
Python 自动化程序:将 MongoDB 集合的数据导入 PostgreSQL 数据库
假设 MongoDB 中有集合 A,需要将 A 的数据导入 PostgreSQL 数据库的 B 表中,并自动完成建表和建字段。
实现思路:以集合中第一条文档的键(去掉 `_id` 和空键名)作为列名,所有列统一建为 text 类型,然后按每批 1000 行批量写入 B 表。
import pymongo
import psycopg2
from psycopg2.extras import execute_batch
from bson import ObjectId
class MongoDBToPostgreSQL:
    """Copy one MongoDB collection into a PostgreSQL table.

    The target table is created automatically when it does not exist yet.
    The column set is taken from the keys of a single sample document
    (``collection.find_one()``): ``_id`` and empty key names are skipped and
    every remaining key becomes a ``text`` column.  Keys that appear only in
    other documents are therefore not migrated.

    Args:
        mongo_dbname: Name of the source MongoDB database.
        mongo_collection: Name of the source MongoDB collection.
        pg_tablename: Name of the destination PostgreSQL table.  It is
            inserted into the SQL text as-is (unquoted), so it must come
            from a trusted source.
        mongo_uri: MongoDB connection URI.  Defaults to the placeholder
            used by the original script.
        pg_connect_kwargs: Keyword arguments for ``psycopg2.connect``.
            Defaults to the placeholders used by the original script.
    """

    def __init__(self, mongo_dbname, mongo_collection, pg_tablename,
                 mongo_uri='mongodb://IP地址:27017', pg_connect_kwargs=None):
        self.mongo_dbname = mongo_dbname
        self.mongo_collection = mongo_collection
        self.pg_tablename = pg_tablename
        self.mongo_uri = mongo_uri
        if pg_connect_kwargs is None:
            # Built per instance so no mutable default is shared.
            pg_connect_kwargs = {
                'dbname': '数据库名称',
                'user': '用户名',
                'password': '密码',
                'host': 'IP地址',
                'port': '5432',
            }
        self.pg_connect_kwargs = dict(pg_connect_kwargs)

    # ------------------------------------------------------------------
    # Pure helpers (no database access)
    # ------------------------------------------------------------------
    @staticmethod
    def _data_columns(keys):
        """Return the document keys that become table columns.

        ``_id`` and empty key names are dropped; order is preserved.
        """
        return [key for key in keys if key and key != '_id']

    @staticmethod
    def _quote_identifier(name):
        """Wrap *name* in double quotes, doubling any embedded double quote."""
        return '"' + str(name).replace('"', '""') + '"'

    @staticmethod
    def _to_text(value):
        """Render *value* as a string and replace NUL characters with U+FFFD.

        Non-string values (including ``ObjectId``) are converted with
        ``str()``, which is what the original f-string formatting did.
        """
        text = value if isinstance(value, str) else str(value)
        return text.replace("\x00", "\uFFFD")

    @classmethod
    def _row_from_doc(cls, doc, columns):
        """Build one row (list of strings) for *doc*, ordered like *columns*.

        A key that is missing from the document yields an empty string.
        """
        return [cls._to_text(doc[col]) if col in doc else '' for col in columns]

    @classmethod
    def _iter_batches(cls, docs, columns, batch_size):
        """Yield lists of at most *batch_size* rows built from *docs*."""
        batch = []
        for doc in docs:
            batch.append(cls._row_from_doc(doc, columns))
            if len(batch) >= batch_size:
                yield batch
                batch = []
        if batch:
            yield batch

    def _build_insert_sql(self, columns):
        """Return the parameterized INSERT statement for *columns*.

        The number of ``%s`` placeholders always equals ``len(columns)``.
        """
        column_list = ', '.join(self._quote_identifier(col) for col in columns)
        placeholders = ','.join(['%s'] * len(columns))
        return f"INSERT INTO {self.pg_tablename} ({column_list}) VALUES ({placeholders})"

    # ------------------------------------------------------------------
    # Connections
    # ------------------------------------------------------------------
    def connect_mongo(self):
        """Connect to MongoDB and return the source collection object."""
        client = pymongo.MongoClient(self.mongo_uri)
        db = client[self.mongo_dbname]
        collection = db[self.mongo_collection]
        return collection

    def connect_pgsql(self):
        """Open and return a PostgreSQL connection."""
        return psycopg2.connect(**self.pg_connect_kwargs)

    # ------------------------------------------------------------------
    # PostgreSQL table handling
    # ------------------------------------------------------------------
    def create_pg_table(self, conn, columns):
        """Create the destination table if it does not exist and commit.

        Args:
            conn: Open PostgreSQL connection.
            columns: Column definition list as SQL text,
                e.g. ``'"name" text, "age" text'``.
        """
        statement = f"CREATE TABLE IF NOT EXISTS {self.pg_tablename}({columns})"
        print(statement)
        cursor = conn.cursor()
        try:
            cursor.execute(statement)
            conn.commit()
        finally:
            cursor.close()

    def check_pg_table_exists(self, conn):
        """Return True if ``information_schema.tables`` lists the table name."""
        cursor = conn.cursor()
        try:
            # The name is passed as a query parameter, not interpolated.
            cursor.execute(
                "SELECT EXISTS (SELECT 1 FROM information_schema.tables WHERE table_name = %s)",
                (self.pg_tablename,))
            result = cursor.fetchone()
        finally:
            cursor.close()
        return bool(result[0])

    # ------------------------------------------------------------------
    # Migration
    # ------------------------------------------------------------------
    def migrate_data(self, collection, conn, batch_size=1000):
        """Copy every document of *collection* into the PostgreSQL table.

        Args:
            collection: Source MongoDB collection.
            conn: Open PostgreSQL connection.
            batch_size: Number of rows handed to ``execute_batch`` per call.

        Raises:
            ValueError: If *batch_size* is smaller than 1.
        """
        if batch_size < 1:
            raise ValueError("batch_size must be at least 1")

        sample = collection.find_one()
        if sample is None:
            # No document means no keys to derive a table layout from.
            print(f"MongoDB collection {self.mongo_collection!r} is empty; nothing to migrate.")
            return

        # One column list drives the CREATE TABLE, the INSERT and the row
        # builder, so the three can never disagree on the number of values.
        columns = self._data_columns(sample.keys())

        if not self.check_pg_table_exists(conn):
            column_defs = ', '.join(
                f"{self._quote_identifier(col)} text" for col in columns)
            self.create_pg_table(conn, column_defs)

        if not columns:
            # Only `_id`/empty keys were found: there is nothing to insert.
            return

        sql = self._build_insert_sql(columns)
        cursor = conn.cursor()
        try:
            for data_batch in self._iter_batches(collection.find(), columns, batch_size):
                execute_batch(cursor, sql, data_batch)
            conn.commit()
        finally:
            cursor.close()

    def run_migration(self):
        """Connect to both databases, migrate the data, then close both."""
        collection = self.connect_mongo()
        try:
            conn = self.connect_pgsql()
            try:
                self.migrate_data(collection, conn)
            finally:
                conn.close()
        finally:
            # connect_mongo() only returns the collection, so the client is
            # reached through it in order to release its connections.
            collection.database.client.close()
if __name__ == '__main__':
    # Source MongoDB database, source collection, and destination
    # PostgreSQL table, in that order.
    mongo_dbname, mongo_collection, pg_tablename = (
        "mongo数据库名称",
        "A表",
        "B表",
    )
    migrator = MongoDBToPostgreSQL(
        mongo_dbname,
        mongo_collection,
        pg_tablename,
    )
    migrator.run_migration()
更多推荐
所有评论(0)