一、前言

在大数据处理中,经常需要在不同的数据库之间进行数据的导入导出操作。本文将介绍如何使用 Python 中的 Spark 框架实现将 Hive 数据导入到 MySQL 以及从 MySQL 数据导出到 Hive 的功能,以替代传统的 datax 和 sqoop 工具。这里我用到的配置环境皆可根据自己情况进行修改

二、环境准备

  1. 安装 JDK并配置环境变量。
  2. 下载并解压 Hadoop ,配置环境变量`。
  3. 如果想在本地测试需安装 Miniconda3,并配置环境变量PYSPARK_PYTHONPYSPARK_DRIVER_PYTHONF:\APP\Miniconda3/python.exe
  4. 设置 Hadoop 用户名为root,即配置环境变量HADOOP_USER_NAME = 'root'这里是为了解决权限问题伪装一下用户。

三、代码实现

以下是使用 Spark 将 Hive 数据导入导出到 MySQL 的 Python 代码:

import os

from pyspark.sql import SparkSession

"""
------------------------------------------
  Description : TODO:
  SourceFile : World_count
  Author  : BJ
  Date  : 2024/11/4
-------------------------------------------
"""

if __name__ == '__main__':
    # 配置环境
    os.environ['JAVA_HOME'] = '你的JDK路径'
    # 配置Hadoop的路径,就是前面解压的那个路径
    os.environ['HADOOP_HOME'] = '你的Hadoop路径'
    # 配置base环境Python解析器的路径
    os.environ['PYSPARK_PYTHON'] = '你的Miniconda3路径/python.exe'  # 配置base环境Python解析器的路径
    os.environ['PYSPARK_DRIVER_PYTHON'] = '你的Miniconda3路径/python.exe'
    os.environ['HADOOP_USER_NAME'] = 'root'
    spark = SparkSession \
      .builder \
      .appName("HiveAPP") \
      .master("local[2]") \
      .config("spark.sql.warehouse.dir", 'hdfs://hadoop102:8020/user/hive/warehouse') \
      .config('hive.metastore.uris', 'thrift://hadoop102:9083') \
      .config("spark.sql.shuffle.partitions", 1) \
      .enableHiveSupport() \
      .getOrCreate()

    # 从Hive表中读取数据
    hive_df = spark.sql("SELECT * FROM edu.ads_examination_question_accuracy") #hive数据库中的表

    # 配置MySQL连接信息
    mysql_url = "jdbc:mysql://你的MySQL服务器地址:3306/你的数据库名"
    mysql_properties = {
        "user": "你的MySQL用户名",
        "password": "你的MySQL密码",
        "driver": "com.mysql.jdbc.Driver"
    }

    # 将数据写入到MySQL表
    hive_df.write.jdbc(mysql_url, "ads_examination_question_accuracy", "overwrite", mysql_properties)
    # 这里写的是MySQL中的表名,可以不存在,当然也可以存在将overwrite改为append否则会删除创建好的表结构

    spark.stop()

四、代码解释

  1. 首先,通过设置环境变量配置 Java、Hadoop 和 Python 解释器的路径,以及设置 Hadoop 用户名为有权限用户。
  2. 创建 SparkSession 对象,设置应用名称为“HiveAPP”,主节点为“local[2]”,配置 Spark SQL 的仓库目录和 Hive 元数据存储的 URI,并启用 Hive 支持。
  3. 使用spark.sql()方法从 Hive 表中读取数据。
  4. 配置 MySQL 的连接 URL、用户名、密码和 JDBC 驱动类名等连接信息。
  5. 使用write.jdbc()方法将从 Hive 读取的数据写入到 MySQL 表中,指定写入模式为“overwrite”表示覆盖原有数据。
  6. 最后,调用spark.stop()方法停止 SparkSession。

五、总结

通过以上代码,我们可以使用 Spark 框架轻松地实现将 Hive 数据导入到 MySQL 的功能。这种方法具有高效、灵活的特点,可以替代传统的数据迁移工具如 datax 和 sqoop。同时,我们还可以根据实际需求进行扩展和优化,例如处理大规模数据、进行数据转换和清洗等操作。希望本文对你在大数据处理中的数据迁移工作有所帮助。

Logo

腾讯云面向开发者汇聚海量精品云计算使用和开发经验,营造开放的云计算技术生态圈。

更多推荐