根据需要动态创建超级表和设备表，灵活适配测量参数的名称、类型和 tags，不必再为每个测试单独修改程序，实现全自动建表和数据入库。
// ZngValue is one reported measurement: who sent it, when, the measured
// values keyed by name, and the tags attached to it. It is decoded from the
// JSON request body using the field names given in the struct tags.
type ZngValue struct {
	DeviceID    string                 `json:"deviceid"`  // device name
	Metric      string                 `json:"metric"`    // physical quantity; used as the table name
	Endpoint    string                 `json:"endpoint"`  // IP address
	Timestamp   int64                  `json:"timestamp"` // timestamp
	Step        int64                  `json:"step"`      // time interval
	ValuesMap   map[string]interface{} `json:"valuesMap"` // measured data, keyed by value name
	CounterType string                 `json:"counterType"`
	Tags        string                 `json:"tags"`
	TagsMap     map[string]string      `json:"tagsMap"` // both tag formats are kept for the convenience of backend components
	Extra       string                 `json:"extra"`
}
// autogencreatezngvaluesql completes an INSERT fragment for one measurement.
//
// sql is the statement prefix that already ends with the opening of the
// column list (the caller passes " <table>(ts,"). The function appends one
// column name per entry of metric.ValuesMap, then ") values(", the timestamp
// with "000" appended, and the matching values, and terminates the fragment
// with ")\n".
//
// Supported value types are string, float32, float64, uint64, int64, int,
// int16, int8 and bool. byte (uint8) values are skipped. Any other type
// yields a "not support" error. An error is also returned when no column is
// left to insert, instead of slicing an empty string.
//
// NOTE(review): column names are interpolated verbatim; they are assumed to
// be valid identifiers — confirm that callers validate them.
func (taosd *TDengineDataSource) autogencreatezngvaluesql(sql string, metric ZngValue) (sqlcmd string, err error) {
	names := make([]string, 0, len(metric.ValuesMap))
	values := make([]string, 0, len(metric.ValuesMap))

	// Names and values are collected in the same iteration so that they stay
	// paired even though map iteration order is random.
	for name, value := range metric.ValuesMap {
		var text string
		switch vv := value.(type) {
		case string:
			// A string is a regular column value: it needs its name in the
			// column list and a quoted, escaped literal in the value list.
			escaped := strings.Replace(vv, `\`, `\\`, -1)
			escaped = strings.Replace(escaped, `'`, `\'`, -1)
			text = "'" + escaped + "'"
		case float32:
			text = strconv.FormatFloat(float64(vv), 'E', -1, 64)
		case float64:
			text = strconv.FormatFloat(vv, 'E', -1, 64)
		case uint64:
			// Kept as a signed conversion, as before.
			text = strconv.FormatInt(int64(vv), 10)
		case int64:
			text = strconv.FormatInt(vv, 10)
		case int:
			text = strconv.FormatInt(int64(vv), 10)
		case int16:
			text = strconv.FormatInt(int64(vv), 10)
		case int8:
			text = strconv.FormatInt(int64(vv), 10)
		case bool:
			text = strconv.FormatBool(vv)
		case byte:
			// byte values are not inserted.
			continue
		default:
			return "", errors.New("not support")
		}
		logger.Debug(value)
		names = append(names, name)
		values = append(values, text)
	}

	if len(names) == 0 {
		return "", errors.New("no supported values to insert")
	}

	// Appending "000" scales the timestamp by 1000 without arithmetic.
	ts := strconv.FormatInt(metric.Timestamp, 10)
	sqlcmd = sql + strings.Join(names, ",") + ") values(" + ts + "000," + strings.Join(values, ",") + ")\n"
	return sqlcmd, nil
}

// serilizeZngTDengine turns one measurement into an INSERT fragment for table
// tbn and hands it to one of the insert batch channels.
//
// The channel is picked by hashing tbn with TAOShashID and reducing the
// result modulo the configured number of insert workers. db is not used
// here. An error from building the fragment is returned and nothing is
// queued in that case.
func (taosd *TDengineDataSource) serilizeZngTDengine(m ZngValue, tbn string, db *sql.DB) error {
	idx := taosd.TAOShashID([]byte(tbn))

	// The fragment starts with the table name and the opening of the column
	// list; the helper fills in the remaining columns and the values.
	sqlcmd, err := taosd.autogencreatezngvaluesql(" "+tbn+"(ts,", m)
	if err != nil {
		logger.Debugf("sqlcmd err %v", err)
		return err
	}

	id := idx % taosd.Section.config.insertsqlworkers
	// The SQL is passed as an argument, not as part of the format string, so
	// a '%' inside a value cannot corrupt the log line.
	logger.Debugf("Send-->GO %v%s", id, sqlcmd)
	taosd.insertbatchChans[id] <- sqlcmd
	return nil
}
// autogencreatezngtabelsql builds the leading part of a
// "create table if not exists" statement for super table stbname.
//
// The column list always starts with "ts timestamp" and gets one column per
// entry of metric.ValuesMap, typed after the Go type of the sample value:
// string -> BINARY(100), float32 -> FLOAT, float64 -> DOUBLE,
// uint64/int64 -> BIGINT, int -> INT, bool -> BOOL, int8 -> TINYINT,
// int16 -> SMALLINT. byte values produce no column; any other type returns a
// "not support" error.
//
// The returned text ends inside the tag list, right after
// "tags(taghash binary(34)"; the caller appends further tags and the closing
// parenthesis.
func (taosd *TDengineDataSource) autogencreatezngtabelsql(stbname string, metric ZngValue) (sqlcmd string, err error) {
	columns := " (ts timestamp"
	for column, sample := range metric.ValuesMap {
		var sqltype string
		switch sample.(type) {
		case string:
			sqltype = " BINARY(100)"
		case float32:
			sqltype = " FLOAT"
		case float64:
			sqltype = " DOUBLE"
		case uint64, int64:
			sqltype = " BIGINT"
		case int:
			sqltype = " INT"
		case bool:
			sqltype = " BOOL"
		case int8:
			sqltype = " TINYINT"
		case int16:
			sqltype = " SMALLINT"
		case byte:
			// No column is generated for byte values.
			continue
		default:
			return "", errors.New("not support")
		}
		columns += "," + column + sqltype
	}
	return "create table if not exists " + stbname + columns + ") tags(taghash binary(34)", nil
}

// HandleZngStable makes sure the super table and the child table for metric
// exist, then queues the measurement for insertion.
//
// The work is done in four steps:
//  1. Collect the tags of the metric: "ip" from Endpoint, "deviceid" from
//     DeviceID, then the entries of TagsMap (lower-cased names) until the
//     configured tag limit is reached. The values are also concatenated into
//     the child-table name seed.
//  2. Reconcile the tags with the super table: use the locally cached
//     schema if present, otherwise read the tag description from the
//     database, and create the super table when neither exists. Missing tags
//     are added with "alter table ... add tag".
//  3. Create the child table "MD5_<md5 of seed>" using the super table, once
//     per process (tracked in IsTableCreated).
//  4. Serialize the values and push them to the insert queue.
//
// A metric without a name is logged and ignored (nil is returned). Errors
// from table creation and from serialization are returned.
//
// NOTE(review): tag names and tag values are interpolated into SQL without
// escaping; a double quote inside a tag value breaks the statement. They
// originate from request data, so they should be validated or escaped —
// confirm where that belongs.
func (taosd *TDengineDataSource) HandleZngStable(metric ZngValue, db *sql.DB) error {
	// taglist receives every accepted tag name; it is only filled here.
	taglist := list.New()
	// tbtaglist holds the tag names destined for the super table.
	tbtaglist := list.New()
	// tagmap maps tag name -> value carried by this metric.
	tagmap := make(map[string]string)
	// tbtagmap is the set of names stored in tbtaglist.
	tbtagmap := make(map[string]string)

	tagnum := taosd.Section.config.tagnumlimit
	taglen := taosd.Section.config.taglen
	var nt nametag

	metricsName := metric.Metric
	if metricsName == "" {
		// Deliberately log-and-ignore: callers keep receiving nil here.
		logger.Errorf("no name metric")
		return nil
	}

	// tbn is the seed of the child-table name: metric name followed by the
	// identifying values of the device.
	tbn := metricsName
	j := 0
	if metric.Endpoint != "" {
		tagmap["ip"] = metric.Endpoint
		taosd.OrderInsertS("ip", taglist)
		taosd.OrderInsertS("ip", tbtaglist)
		tbtagmap["ip"] = "y"
		tbn += strings.ToLower(metric.Endpoint)
		j++
	}
	if metric.DeviceID != "" {
		tagmap["deviceid"] = metric.DeviceID
		taosd.OrderInsertS("deviceid", taglist)
		taosd.OrderInsertS("deviceid", tbtaglist)
		tbtagmap["deviceid"] = "y"
		tbn += strings.ToLower(metric.DeviceID)
		j++
	}

	// Walk the tags in sorted key order. Map iteration order is random, and
	// both the table-name seed and the choice of which tags fit under the
	// limit depend on the order, so an unsorted walk could map the same
	// device to different child tables from one call to the next.
	tagkeys := make([]string, 0, len(metric.TagsMap))
	for k := range metric.TagsMap {
		tagkeys = append(tagkeys, k)
	}
	sort.Strings(tagkeys)
	for _, k := range tagkeys {
		j++
		if j > tagnum {
			// Tag limit reached; the remaining tags are dropped.
			break
		}
		ln := strings.ToLower(k)
		s := metric.TagsMap[k]
		// The seed uses the full value; only the stored tag is truncated.
		tbn += s
		taosd.OrderInsertS(ln, taglist)
		taosd.OrderInsertS(ln, tbtaglist)
		if len(s) > taglen {
			s = s[:taglen]
		}
		tagmap[ln] = s
		tbtagmap[ln] = "y"
	}

	if taosd.Section.config.debugprt == 2 {
		logger.Debugf(" Ts: %v, value: %v, ", time.Unix(metric.Timestamp, 0), metric.ValuesMap)
	}

	stbname := taosd.tablenameEscape(metricsName)
	schema, ok := taosd.IsSTableCreated.Load(stbname)
	if !ok {
		// No local record of the super table structure: ask the database
		// for the tag columns of the table.
		tags := taosd.taosdGetTableTagDescribe(db, stbname)
		if tags != nil {
			// The super table already exists in TDengine.
			nt.taglist = list.New()
			nt.tagmap = make(map[string]string)
			for _, tag := range tags {
				nt.taglist.PushBack(tag)
				nt.tagmap[tag] = "y"
			}
			for e := tbtaglist.Front(); e != nil; e = e.Next() {
				k := e.Value.(string)
				if _, known := nt.tagmap[k]; known {
					continue
				}
				// The tag is missing from the super table: add it.
				alterSQL := "alter table " + stbname + " add tag " + k + taosd.Section.tagstr + "\n"
				if _, err := taosd.execSql(taosd.Section.config.dbName, alterSQL, db); err != nil {
					logger.Error(err)
					// A duplicate means the tag exists after all; any other
					// failure is logged and the tag is left out (this
					// branch does not abort, unlike the cached-schema one).
					if !strings.Contains(err.Error(), "duplicated column names") {
						continue
					}
				}
				nt.taglist.PushBack(k)
				nt.tagmap[k] = "y"
			}
			// From here on use the tag layout of the real table.
			tbtaglist = nt.taglist
			taosd.IsSTableCreated.Store(stbname, nt)
		} else {
			// The super table does not exist yet: create it with the tags
			// collected from this metric.
			nt.taglist = tbtaglist
			nt.tagmap = tbtagmap
			createSQL, err := taosd.autogencreatezngtabelsql(stbname, metric)
			if err != nil {
				logger.Errorf("autogencreatetabelsql%s", err.Error())
				return err
			}
			for e := tbtaglist.Front(); e != nil; e = e.Next() {
				createSQL += "," + e.Value.(string) + taosd.Section.tagstr
			}
			createSQL += ")\n"
			if taosd.Section.config.debugprt == 2 {
				fmt.Printf("SQL:%s", createSQL)
			}
			if _, err = taosd.execSql(taosd.Section.config.dbName, createSQL, db); err != nil {
				logger.Error(err)
				return err
			}
			taosd.IsSTableCreated.Store(stbname, nt)
		}
	} else {
		// A local schema record exists: compare the tags of this metric
		// with it and add the ones that are missing.
		ntag := schema.(nametag)
		for e := tbtaglist.Front(); e != nil; e = e.Next() {
			k := e.Value.(string)
			if _, known := ntag.tagmap[k]; known {
				continue
			}
			alterSQL := "alter table " + stbname + " add tag " + k + taosd.Section.tagstr + "\n"
			if _, err := taosd.execSql(taosd.Section.config.dbName, alterSQL, db); err != nil {
				logger.Error(err)
				if !strings.Contains(err.Error(), "duplicated column names") {
					return err
				}
			}
			ntag.taglist.PushBack(k)
			ntag.tagmap[k] = "y"
		}
		tbtaglist = ntag.taglist
		taosd.IsSTableCreated.Store(stbname, ntag)
	}

	// Child (device) table: created on first use. tbtaglist now reflects the
	// latest tag layout of the super table.
	tbnhash := "MD5_" + taosd.md5V2(tbn)
	if _, tbcreated := taosd.IsTableCreated.Load(tbnhash); !tbcreated {
		// taghash is the md5 of all tag values joined in sorted key order.
		keys := make([]string, 0, len(tagmap))
		for t := range tagmap {
			keys = append(keys, t)
		}
		sort.Strings(keys)
		var tagHash string
		for _, k := range keys {
			tagHash += tagmap[k]
		}

		// Collecting the values and joining them avoids a dangling comma
		// when there is no tag besides taghash.
		tagvalues := []string{"\"" + taosd.md5V2(tagHash) + "\""}
		for e := tbtaglist.Front(); e != nil; e = e.Next() {
			name := e.Value.(string)
			if name == "taghash" {
				// Already emitted as the first value.
				continue
			}
			tagvalue, has := tagmap[name]
			if !has {
				// The table has this tag but the metric does not carry it.
				tagvalues = append(tagvalues, "null")
				continue
			}
			if len(tagvalue) > taglen {
				tagvalue = tagvalue[:taglen]
			}
			tagvalues = append(tagvalues, "\""+tagvalue+"\"")
		}

		createSQL := "create table if not exists " + tbnhash + " using " + stbname +
			" tags(" + strings.Join(tagvalues, ",") + ")\n"
		if _, err := taosd.execSql(taosd.Section.config.dbName, createSQL, db); err != nil {
			return err
		}
		taosd.IsTableCreated.Store(tbnhash, true)
	}

	if taosd.Section.config.debugprt == 2 {
		fmt.Println("stable:", metric)
	}
	// Propagate serialization failures instead of dropping them.
	return taosd.serilizeZngTDengine(metric, tbnhash, db)
}
// PushZng2Queue distributes measurements over the input worker channels.
//
// Each item is routed by its Endpoint, or by its DeviceID when Endpoint is
// empty: the address is hashed with TAOShashID and the result, modulo the
// configured number of input workers, selects the channel. Items that are
// nil or carry neither field are skipped. A copy of the item is sent.
func (taosd *TDengineDataSource) PushZng2Queue(items []*ZngValue) {
	for _, item := range items {
		if item == nil {
			// Guard against a nil entry instead of panicking on dereference.
			continue
		}

		// Endpoint, or else DeviceID, is the routing key.
		address := item.Endpoint
		if address == "" {
			address = item.DeviceID
		}
		if address == "" {
			continue
		}

		logger.Debugf("taosd Pushzng2Queue :%s", address)
		if taosd.Section.config.debugprt == 2 {
			fmt.Println("taosd Pushzng2Queue:", *item)
		}

		idx := taosd.TAOShashID([]byte(address))
		// Write the measurement to the channel queue of the chosen worker.
		taosd.inputnodeChans[idx%taosd.Section.config.inputworkers] <- *item
	}
}
// Heart is an example HTTP handler: it decodes the JSON request body into a
// taosd.ZngValue and pushes it to the ingestion queue.
//
// A body that cannot be bound is reported through e.Error with status 500.
// On success nothing is written to the response here.
//
// NOTE(review): this calls taosd.Push2Queue, while the queueing method shown
// alongside it is named PushZng2Queue — confirm which one is intended.
func (e Deviceinfo) Heart(c *gin.Context) {
	var values taosd.ZngValue
	if err := c.ShouldBindBodyWith(&values, binding.JSON); err != nil {
		fmt.Println("api heart", "ShouldBindBodyWith error", err)
		e.Error(500, err, fmt.Sprintf("设备实时状态更新 失败,\r\n失败信息 %s", err.Error()))
		return
	}
	fmt.Println("api heart values ", values)

	// The queue API takes a slice; a request carries exactly one value, so
	// the slice is never empty and needs no failure branch.
	items := []*taosd.ZngValue{&values}
	fmt.Println("\r\napi heart", items)
	taosd.Push2Queue(items)
}
//http 接口的 body
{
    "deviceid": "1002",
    "metric": "heart",
    "endpoint": "192.168.0.11",
    "timestamp": 1629037578, 
    "valuesMap":{
        "cputemp": 65,
        "cpuload": 78,
        "gputemp": 76,
        "gpuload": 66,
        "memempty": 30,
        "diskempty": 50
    },
    "tagsMap": {
        "mac": "ae:23:55:ed:66",
        "localaddr": "北京",
        "devicename": "智能设备1"
    }
}

Logo

腾讯云面向开发者汇聚海量精品云计算使用和开发经验,营造开放的云计算技术生态圈。

更多推荐