数据库批量执行实践(go语言)
数据库的高效执行(查询、写入),除了,设置索引、数据库配置调优、表结构在业务上设置合理、分库分表?、读写分离等等之外,要想改善数据的写入速度,一大杀气是批量插入,此批量并不是大事务,而是利用数据库的特性,insert xxx values (?,?,?),(?,?,?) 的特性,实测效率比单个单个的插入 快10倍以上。那么批量执行,效率如此之高,能够写出一个自动的批量执行的控件就显得尤为必要1:能
原创:https://blog.csdn.net/ndzjx/article/details/120743147
目录
数据库的高效执行(查询、写入),除了,设置索引、数据库配置调优、表结构在业务上设置合理、分库分表?、读写分离等等之外,要想改善数据的写入速度,一大杀器是批量插入,此批量并不是大事务,而是利用数据库的特性,insert xxx values (?,?,?),(?,?,?) 的特性,实测效率比单个单个的插入 快10倍以上。
那么批量执行,效率如此之高,能够写出一个自动的批量执行的控件就显得尤为必要
1:能够到一定数据,开始批量插入
2:没到一定的数目,但是时间点到了,也要批量插入。
满足这两个条件,同时避免代码随处乱用,就有了如下的代码方案:
我看到go-zero中也有这样的实现控件,但是看了看,觉得与整体耦合性太大,不易单独拿出来,封装性有点过度,封装的使用,锁的使用,还是有点云里雾里,不太直观。
倒不如,理解其意,自己写一个来的方便。
一:go-zero bulkinsert的简单逻辑
增加任务时,做判断,
加入 Task,
开启定时器,分别等待任务执行通知和定时器到期
if >= macCount
{
任务(args) -> commander
等待任务执行完
}
二:我的方案
1:增加了 一个缓冲区,保证执行时,能够继续入缓冲,不必等待语句执行,
2:可以自己控制缓冲区的大小,可以控制批量执行的数目(比如sql过长,执行不成功,控制数目可以避免这种情况。)
batchinsert.go
package main
import (
"fmt"
"runtime/debug"
"strings"
"sync"
"time"
)
// DataChanSize benchmark 数据chan加大,把所有数据都第一时间接收下来,可以灵活调整
const DataChanSize = 1000000 // benchmark
type BatchDataType struct {
sync.Mutex
data []interface{} // params to batchFun
dataChan chan interface{} // buf
timerCh chan struct{} // notify
batchCh chan struct{} // notify
batchFun func([]interface{}) error
batchIntervel time.Duration
batchCount int
}
func NewBatchOperator(count int, interval time.Duration, customBatchFun func([]interface{}) error) *BatchDataType {
if count < 1 {
panic("batch count not < 1")
}
if nil == customBatchFun {
panic("batch fun must not nil")
}
m := &BatchDataType{
data: make([]interface{}, 0, count),
// dataChan的作用,是在批量执行data数据时,其他地方还可以继续放入dataChan,不然都得等待数据库批量执行
dataChan: make(chan interface{}, DataChanSize), // benchmark
timerCh: make(chan struct{}),
batchCh: make(chan struct{}),
batchFun: customBatchFun,
batchIntervel: interval,
batchCount: count,
}
go m.prepareBatchData()
return m
}
func (m *BatchDataType) prepareBatchData() {
for {
v := <-m.dataChan
if m.batchCount == len(m.data) {
close(m.timerCh)
// 等待批量执行完
//Log.Info("-----begin batch", time.Now())
<-m.batchCh
//Log.Info("-----end batch", time.Now())
}
m.Lock()
m.data = append(m.data, v)
if 1 == len(m.data) {
m.timerCh = make(chan struct{})
m.batchCh = make(chan struct{})
go m.taskTimer(time.NewTicker(m.batchIntervel))
}
m.Unlock()
}
}
func (m *BatchDataType) PutData(v interface{}) {
m.dataChan <- v
}
func (m *BatchDataType) batchInsertData() {
defer func() {
if err := recover(); err != nil {
Log.Error("panic error: ", string(debug.Stack()))
}
}()
m.Lock()
defer m.Unlock()
// 通知执行完了
defer close(m.batchCh)
//Log.Infof("----begin insert----len:[%d]\n", len(m.data))
// 执行外部的批处理函数
m.batchFun(m.data)
// 清空数据
m.data = m.data[:0]
//Log.Infof("-----end insert----len:[%d]\n", len(m.data))
}
func (m *BatchDataType) taskTimer(ticker *time.Ticker) {
defer ticker.Stop()
select {
case <-ticker.C:
//Log.Info("-----timer----")
m.batchInsertData()
case <-m.timerCh:
//Log.Info("----ch-----")
m.batchInsertData()
}
}
func insertDataBatch(stmtBase string, valueStrings []string, valueArgs []interface{}, fieldNum int) (err error) {
lenAllRows := len(valueStrings)
for start := 0; start < lenAllRows; start += _batchNum {
end := start + _batchNum
if end > lenAllRows {
end = lenAllRows
}
batchValueStrings := valueStrings[start:end]
batchValueArgs := valueArgs[start*fieldNum : end*fieldNum]
stmt := fmt.Sprintf(stmtBase, strings.Join(batchValueStrings, ","))
_, err = _db.Exec(stmt, batchValueArgs...)
if nil != err {
Log.Errorf("insertDataBatch err[%v], stmtBase[%s]", err, stmtBase)
return err
}
}
return
}
其他地方的使用:
// 1:定义批处理个数
var _batchNum = 1000
// 2:初始化时定义 执行对象
var _myDataBatcher = NewBatchOperator(_batchNum, 2*time.Second, batchInsertMyData)
// 3:定义 消息结构体
type MyDataBatchInfo struct {
idStr string
datetime string
// 或者其他数据
protoData []*ProtoData.MyDataInfo
}
// 4:定义 自定义处理函数
func batchInsertMyData(arrMyData []interface{}) error {
sParam := "(" + strings.Repeat(",?", 26)[1:] + ")"
valueStrings := make([]string, 0, len(arrMyData))
fieldNum := strings.Count(sParam, "?")
valueArgs := make([]interface{}, 0, len(arrMyData)*fieldNum)
for _, v0 := range arrMyData {
v1, ok := v0.(*MyDataBatchInfo)
if !ok {
Log.Error("error:not *MyDataBatchInfo")
return errors.New("batch myData not needed type")
}
for _, v2 := range v1.protoData {
valueStrings = append(valueStrings, sParam)
valueArgs = append(valueArgs, v1.idStr)
valueArgs = append(valueArgs, v1.datetime)
valueArgs = append(valueArgs, v2.GetOsInfo())
// 其他参数的设置。。。
}
}
if len(valueStrings) > 0 {
stmtBase := "INSERT INTO myData_baseinfo (mac,infotime,os,/*等等其他参数*/) VALUES %s ON DUPLICATE KEY UPDATE infotime=values(infotime),os=values(os),/*等等其他更新参数*/ "
err := insertDataBatch(stmtBase, valueStrings, valueArgs, fieldNum)
if err != nil {
return err
}
}
return nil
}
至此,批处理 被完美的解决。还有了异步缓存的功能。
更多推荐
所有评论(0)