原创:https://blog.csdn.net/ndzjx/article/details/120743147

目录

一:go-zero bulkinsert的简单逻辑

二:我的方案


数据库的高效执行(查询、写入),除了,设置索引、数据库配置调优、表结构在业务上设置合理、分库分表?、读写分离等等之外,要想改善数据的写入速度,一大杀器是批量插入,此批量并不是大事务,而是利用数据库的特性,insert xxx values (?,?,?),(?,?,?) 的特性,实测效率比单个单个的插入 快10倍以上。

那么批量执行,效率如此之高,能够写出一个自动的批量执行的控件就显得尤为必要

1:能够到一定数据,开始批量插入

2:没到一定的数目,但是时间点到了,也要批量插入。

满足这两个条件,同时避免代码随处乱用,就有了如下的代码方案:

我看到go-zero中也有这样的实现控件,但是看了看,觉得与整体耦合性太大,不易单独拿出来,封装性有点过度,封装的使用,锁的使用,还是有点云里雾里,不太直观。

倒不如,理解其意,自己写一个来的方便。

一:go-zero bulkinsert的简单逻辑

增加任务时,做判断,
	加入 Task, 
	开启定时器,分别等待任务执行通知和定时器到期
	if >= macCount
	{
		任务(args) -> commander 
		等待任务执行完
	}

二:我的方案

       1:增加了 一个缓冲区,保证执行时,能够继续入缓冲,不必等待语句执行,

       2:可以自己控制缓冲区的大小,可以控制批量执行的数目(比如sql过长,执行不成功,控制数目可以避免这种情况。)

batchinsert.go

package main

import (
	"fmt"
	"runtime/debug"
	"strings"
	"sync"
	"time"
)

// DataChanSize benchmark 数据chan加大,把所有数据都第一时间接收下来,可以灵活调整
const DataChanSize = 1000000 // benchmark

type BatchDataType struct {
	sync.Mutex
	data          []interface{}    // params to batchFun
	dataChan      chan interface{} // buf
	timerCh       chan struct{}    // notify
	batchCh       chan struct{}    // notify
	batchFun      func([]interface{}) error
	batchIntervel time.Duration
	batchCount    int
}

func NewBatchOperator(count int, interval time.Duration, customBatchFun func([]interface{}) error) *BatchDataType {
	if count < 1 {
		panic("batch count not < 1")
	}
	if nil == customBatchFun {
		panic("batch fun must not nil")
	}
	m := &BatchDataType{
		data: make([]interface{}, 0, count),
		// dataChan的作用,是在批量执行data数据时,其他地方还可以继续放入dataChan,不然都得等待数据库批量执行
		dataChan:      make(chan interface{}, DataChanSize), // benchmark
		timerCh:       make(chan struct{}),
		batchCh:       make(chan struct{}),
		batchFun:      customBatchFun,
		batchIntervel: interval,
		batchCount:    count,
	}
	go m.prepareBatchData()
	return m
}

func (m *BatchDataType) prepareBatchData() {
	for {
		v := <-m.dataChan
		if m.batchCount == len(m.data) {
			close(m.timerCh)
			// 等待批量执行完
			//Log.Info("-----begin batch", time.Now())
			<-m.batchCh
			//Log.Info("-----end batch", time.Now())
		}
		m.Lock()
		m.data = append(m.data, v)
		if 1 == len(m.data) {
			m.timerCh = make(chan struct{})
			m.batchCh = make(chan struct{})
			go m.taskTimer(time.NewTicker(m.batchIntervel))
		}
		m.Unlock()
	}
}

func (m *BatchDataType) PutData(v interface{}) {
	m.dataChan <- v
}

func (m *BatchDataType) batchInsertData() {
	defer func() {
		if err := recover(); err != nil {
			Log.Error("panic error: ", string(debug.Stack()))
		}
	}()
	m.Lock()
	defer m.Unlock()
	// 通知执行完了
	defer close(m.batchCh)
	//Log.Infof("----begin insert----len:[%d]\n", len(m.data))
	// 执行外部的批处理函数
	m.batchFun(m.data)
	// 清空数据
	m.data = m.data[:0]
	//Log.Infof("-----end insert----len:[%d]\n", len(m.data))
}

func (m *BatchDataType) taskTimer(ticker *time.Ticker) {
	defer ticker.Stop()
	select {
	case <-ticker.C:
		//Log.Info("-----timer----")
		m.batchInsertData()
	case <-m.timerCh:
		//Log.Info("----ch-----")
		m.batchInsertData()
	}
}

func insertDataBatch(stmtBase string, valueStrings []string, valueArgs []interface{}, fieldNum int) (err error) {
	lenAllRows := len(valueStrings)
	for start := 0; start < lenAllRows; start += _batchNum {
		end := start + _batchNum
		if end > lenAllRows {
			end = lenAllRows
		}
		batchValueStrings := valueStrings[start:end]
		batchValueArgs := valueArgs[start*fieldNum : end*fieldNum]
		stmt := fmt.Sprintf(stmtBase, strings.Join(batchValueStrings, ","))
		_, err = _db.Exec(stmt, batchValueArgs...)
		if nil != err {
			Log.Errorf("insertDataBatch err[%v], stmtBase[%s]", err, stmtBase)
			return err
		}
	}
	return
}

其他地方的使用:

// 1:定义批处理个数
var _batchNum = 1000

// 2:初始化时定义 执行对象
var _myDataBatcher = NewBatchOperator(_batchNum, 2*time.Second, batchInsertMyData)

// 3:定义 消息结构体
type MyDataBatchInfo struct {
	idStr    string
	datetime string
	// 或者其他数据
	protoData []*ProtoData.MyDataInfo
}

// 4:定义 自定义处理函数
func batchInsertMyData(arrMyData []interface{}) error {
	sParam := "(" + strings.Repeat(",?", 26)[1:] + ")"
	valueStrings := make([]string, 0, len(arrMyData))
	fieldNum := strings.Count(sParam, "?")
	valueArgs := make([]interface{}, 0, len(arrMyData)*fieldNum)

	for _, v0 := range arrMyData {
		v1, ok := v0.(*MyDataBatchInfo)
		if !ok {
			Log.Error("error:not *MyDataBatchInfo")
			return errors.New("batch myData not needed type")
		}
		for _, v2 := range v1.protoData {
			valueStrings = append(valueStrings, sParam)
			valueArgs = append(valueArgs, v1.idStr)
			valueArgs = append(valueArgs, v1.datetime)
			valueArgs = append(valueArgs, v2.GetOsInfo())
			// 其他参数的设置。。。
		}
	}

	if len(valueStrings) > 0 {
		stmtBase := "INSERT INTO myData_baseinfo (mac,infotime,os,/*等等其他参数*/) VALUES %s ON DUPLICATE KEY UPDATE infotime=values(infotime),os=values(os),/*等等其他更新参数*/ "
		err := insertDataBatch(stmtBase, valueStrings, valueArgs, fieldNum)
		if err != nil {
			return err
		}
	}

	return nil
}

至此,批处理 被完美的解决。还有了异步缓存的功能。

Logo

腾讯云面向开发者汇聚海量精品云计算使用和开发经验,营造开放的云计算技术生态圈。

更多推荐