Syncing real-time MongoDB data to TiDB or MySQL with Python
Syncing real-time MongoDB data to TiDB
Architecture diagram: MongoDB oplog → MongoShake collector → Kafka → Python consumer → TiDB/MySQL
1. Install the Go environment
yum install golang golang-bin golang-src -y
2. Install mongo-shake
Download the latest prebuilt mongo-shake package directly; no compilation is needed (the Go environment above is only required when building from source).
Source repository:
https://github.com/alibaba/MongoShake?spm=a2c4e.10696291.0.0.316619a4JXuDrN
Binary releases:
https://github.com/alibaba/MongoShake/releases
Download mongo-shake-v2.4.16.tar.gz:
cd /home/gocode/
tar -zxvf mongo-shake-v2.4.16.tar.gz
mv mongo-shake-v2.4.16 mongo-shake
cd mongo-shake
Edit the configuration: vim collector.conf
mongo_urls = mongodb://user:password@x.x.x.x:0000,x.x.x.x:0000
tunnel = kafka
tunnel.address = testGroup1@x.x.x.x:0000,x.x.x.x:0000,x.x.x.x:0000
tunnel.message = json
incr_sync.mongo_fetch_method = oplog
# namespace black/white list filters
filter.namespace.black =
#filter.namespace.white = test1215
Start the collector:
nohup ./start.sh collector.conf > nohup.log 2>&1 &
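To confirm the collector is running, MongoShake exposes a RESTful status port; a quick check, assuming the default incr_sync.http_port of 9100 in collector.conf:
curl 127.0.0.1:9100/repl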
3. Consume data from Kafka and write it into TiDB
# Python script (kafkaTomysql.py, as referenced by the run command below)
#!/usr/bin/env python
# -*- coding:utf-8 -*-
from kafka import KafkaConsumer
import pymysql
import json
import time
import logging
import datetime
import os
import sys
# logging setup
logging.basicConfig(level=logging.INFO,
                    format='%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s',
                    datefmt='%Y/%m/%d %H:%M:%S',
                    filename='log_tracking.log',
                    filemode='a')
def writeErrorLog(errSrc, errType, errMsg):
    """
    Append an entry to the ERROR log.
    :param errSrc: error source
    :param errType: error type
    :param errMsg: error message
    :return:
    """
    v_file = None
    try:
        v_log_file = 'err_tracking.log'
        v_file = open(v_log_file, 'a')
        v_file.write(datetime.datetime.strftime(datetime.datetime.now(), "%Y-%m-%d %H:%M:%S") + " - " + errSrc + " - " + errType + " : " + errMsg + '\n')
        v_file.flush()
    except Exception as data:
        v_err_file = open('err_tracking.log', 'a')
        v_err_file.write(str(data) + '\n')
        v_err_file.write(datetime.datetime.strftime(datetime.datetime.now(), "%Y-%m-%d %H:%M:%S") + " - " + errSrc + " - " + errType + " : " + errMsg + '\n')
        v_err_file.flush()
        v_err_file.close()
    finally:
        if v_file:  # guard: open() itself may have failed
            v_file.close()
class KafkaPython:
    consumer = server = topic = None
    TOPIC = 'testGroup1'
    BROKER_LIST = 'x.x.x.x:0000'
    DB_USER = 'user'
    DB_PASS = 'password'
    DB_NAME = 'test'
    DB_PORT = 0000
    DB_IP = 'x.x.x.x'
    # oplog op codes mapped to the SQL statement type they produce
    DB_OPT = {'i': 'insert', 'u': 'update', 'd': 'delete'}

    def __init__(self):
        print("init kafka consumer")
        self.server = self.BROKER_LIST
        self.topic = self.TOPIC
        print("init mysql client")
        self.db = pymysql.connect(host=self.DB_IP, user=self.DB_USER,
                                  password=self.DB_PASS, database=self.DB_NAME,
                                  port=self.DB_PORT)
        self.cursor = self.db.cursor()

    def __del__(self):
        print("end")

    def getConnect(self):
        # kafka-python expects 'earliest'/'latest' here, not the old 'smallest'
        self.consumer = KafkaConsumer(self.topic, bootstrap_servers=self.server,
                                      enable_auto_commit=False,
                                      auto_offset_reset='earliest',
                                      group_id='testGroup1')
    def execSQL(self, sql):
        rows = self.cursor.execute(sql)
        # print(sql)
        if rows > 0:
            self.db.commit()
        return rows

    def beginConsumer(self):
        for oneLog in self.consumer:
            mlog = json.loads(oneLog.value)
            if mlog is not None:
                self.getDesc(mlog)
    def getDesc(self, mlog):
        try:
            desc = {}
            data = {}
            desc['operation'] = mlog['op']
            desc['namespace'] = mlog['ns']
            desc['timestamp'] = mlog['ts']
            # MongoShake's json tunnel encodes the oplog "o" field as a
            # list of {Name, Value} pairs
            for o in mlog['o']:
                data[o['Name']] = o['Value']
            desc['o2'] = mlog.get('o2')  # only present for updates
            desc['data'] = data
            sql = self.ruleRoute(desc, data)
            if not sql:  # unsupported op or empty document: nothing to do
                return
            logging.info('execute sql is %s' % (sql,))
            ret = self.execSQL(sql)
            self.consumer.commit()
            dt = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))
            logging.info('%s %s %s rows' % (dt, self.DB_OPT[desc['operation']], ret))
        except Exception as data:
            writeErrorLog('_do', 'exce data Error', str(data))
    def ruleRoute(self, desc, data):
        if desc['operation'] not in self.DB_OPT.keys():
            return
        if desc['operation'] == 'i':
            desc['_id'] = data['_id']
            return self.stmtIns(desc)
        elif desc['operation'] == 'u':
            desc['_id'] = desc['o2']['_id']
            return self.stmtUpd(desc)
        else:
            desc['_id'] = data['_id']
            return self.stmtDel(desc)
    def stmtIns(self, desc):
        mtbl = desc['namespace']
        data = desc['data']
        mid = desc['_id']
        # mts = desc['timestamp']
        data.pop('_id')
        # data.pop('timestamp')
        mlist = data.keys()
        vlist = data.values()
        mlist = mtbl + '(%s,%s)' % ('_id', ','.join(mlist))
        clist = "('%s', " % (mid)
        if len(vlist) < 1:
            return
        for val in vlist:
            # isinstance checks replace the py2-only 'unicode' type test,
            # since the script is run under python3
            if isinstance(val, str):
                clist += "'%s', " % val
            elif isinstance(val, dict):
                # embedded documents are stored as JSON strings
                clist += "'%s', " % json.dumps(val)
            else:
                clist += "%s, " % val
        clist = clist[:-2] + ")"
        sql = "insert into %s values %s" % (mlist, clist)
        return sql
    def stmtUpd(self, desc):
        mtbl = desc['namespace']
        data = desc['data']
        mid = desc['_id']
        mts = desc['timestamp']
        stmt = []
        for k, v in data.items():
            if k != '$set':
                continue
            print('item is :' + str(v))
            # each $set entry is again a {Name, Value} pair
            for item in v:
                col, val = item.values()
                print('%s %s' % (col, val))
                if isinstance(val, dict):
                    ln = "%s = '%s'" % (col, json.dumps(val))
                elif isinstance(val, str):
                    ln = "%s = '%s'" % (col, val)
                else:
                    ln = "%s = %s" % (col, val)
                stmt.append(ln)
        col = ','.join(stmt)
        sql = "update %s set %s where _id = '%s'" % (mtbl, col, mid)
        return sql
    def stmtDel(self, desc):
        mtbl = desc['namespace']
        mid = desc['_id']
        mts = desc['timestamp']
        sql = "delete from %s where _id = '%s' " % (mtbl, mid)
        return sql

    # close the database cursor
    def _release_db(self):
        self.cursor.close()

    # stop consuming and exit the process
    def _exit_consumer(self):
        self._release_db()
        sys.exit()

    def disConnect(self):
        self.consumer.close()
if __name__ == '__main__':
    kp = KafkaPython()
    kp.getConnect()
    kp.beginConsumer()
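The script depends on the kafka-python and PyMySQL packages; install them first if they are missing:
pip3 install kafka-python PyMySQL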
Run:
nohup python3 -u ./kafkaTomysql.py > nohup.log 2>&1 &
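The consumer writes its activity to log_tracking.log and errors to err_tracking.log (both paths defined in the script), so progress can be followed with:
tail -f log_tracking.log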
Log in to MongoDB and create a test database:
use test1215
Insert one document to verify the sync:
db.table20201215.insert({"name":"测试案例"});
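To exercise the consumer's update and delete paths as well, you can follow up with commands like these (hypothetical examples against the same test collection):
db.table20201215.update({"name":"测试案例"}, {$set: {"name":"case1"}});
db.table20201215.remove({"name":"case1"});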
4. Inspect the Kafka data
# list the topics that have been created
./bin/kafka-topics.sh --bootstrap-server x.x.x.x:0000 --list
# view the produced messages
./bin/kafka-console-consumer.sh --bootstrap-server x.x.x.x:0000 --topic testGroup1 --from-beginning
# list the consumer groups
./bin/kafka-consumer-groups.sh --bootstrap-server x.x.x.x:0000 --list
# check consumption progress (offsets and lag)
./bin/kafka-consumer-groups.sh --bootstrap-server x.x.x.x:0000 --group testGroup1 --describe
5. Check the data written to TiDB
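Note that the consumer maps the MongoDB namespace straight to a TiDB table name, so a matching table must exist before the sync starts. A minimal sketch of the DDL plus a verification query; the column types here are assumptions based on the test document above:
create table test1215.table20201215 (_id varchar(64) primary key, name varchar(255));
select * from test1215.table20201215;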
6. Bulk-write test (batches of 10,000 documents)
Python script:
#!/usr/bin/env python
# -*- coding:utf-8 -*-
from pymongo import MongoClient
from pymongo import InsertOne
import time

def insert():
    # connect to MongoDB
    conn = MongoClient('mongodb://user:password@x.x.x.x:0000/admin')
    my_db = conn['test1215']
    my_collection = my_db['kafka1218']
    print(conn)
    # bulk write
    i = 0
    t0 = time.time()
    data = []
    while True:
        # '_id' is the primary key; it increments on every loop and each
        # document is appended to the data list
        data.append(InsertOne({"_id": i, "insert_time": int(time.time() * 1)}))
        i += 1
        # once the list holds 10,000 documents, flush them in one bulk write
        if len(data) == 10000:
            my_collection.bulk_write(data)
            data = []
        # stop once 10,000,000 documents have been queued in total
        if i == 10000000:
            break

if __name__ == '__main__':
    insert()
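After the bulk load, comparing row counts on both ends is a quick sanity check; this assumes a test1215.kafka1218 table was created in TiDB with _id and insert_time columns, mirroring the documents produced by the script above:
db.kafka1218.count();                         // in the mongo shell
select count(*) from test1215.kafka1218;      -- in TiDB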
# Data written into MongoDB
# Messages consumed from Kafka
# Data in TiDB (inserts, updates, and deletes in MongoDB are all reflected in TiDB)