• 达梦数据库自定义插件

    • 达梦8的依赖引入

    • 定义reader module

    • 定义writer module

    • 修改核心配置数据库类型支持

    • 打包插件

  • 测试

    • 以mysql到dm数据库为例

    • 配置mysql2dm.json

    • 执行任务

    • 查询下结果

DataX二次开发之达梦数据库插件

DataX 是阿里巴巴集团内被广泛使用的离线数据同步工具/平台,支持大部分主流的数据库之间的数据同步,也提供了数据库插件的自定义开发。

达梦数据库自定义插件

插件自定义开发详细看官网dataxPluginDev.md说明文档

达梦8的依赖引入

<!-- dm driver -->
  <!-- https://mvnrepository.com/artifact/com.dameng/DmJdbcDriver18 -->
  <dependency>
   <groupId>com.dameng</groupId>
   <artifactId>DmJdbcDriver18</artifactId>
   <version>8.1.3.140</version>
  </dependency>

定义reader module

427e4eba9382d60f4f333f4321cf9652.png

定义writer module

753f83ef60536194647e87db09ab7ac9.png

修改核心配置数据库类型支持

public enum DataBaseType {
    Dm("dm", "dm.jdbc.driver.DmDriver"),
   //...省略其他

    private String typeName;
    private String driverClassName;

    DataBaseType(String typeName, String driverClassName) {
        this.typeName = typeName;
        this.driverClassName = driverClassName;
    }

    public String getDriverClassName() {
        return this.driverClassName;
    }

    public String appendJDBCSuffixForReader(String jdbc) {
      
            case Oracle:
                break;
            case Dm:
              //...省略其他
            default:
                throw DataXException.asDataXException(DBUtilErrorCode.UNSUPPORTED_TYPE, "unsupported database type.");
        }

        return result;
    }

    public String appendJDBCSuffixForWriter(String jdbc) {
        String result = jdbc;
        String suffix = null;
        switch (this) {
       
            case Oracle:
                break;
            case Dm:
                break;
          //...省略其他
            default:
                throw DataXException.asDataXException(DBUtilErrorCode.UNSUPPORTED_TYPE, "unsupported database type.");
        }

        return result;
    }

    public String formatPk(String splitPk) {
        String result = splitPk;

        switch (this) {
            case MySql:
            
            case Dm:
                if (splitPk.length() >= 2 && splitPk.startsWith("`") && splitPk.endsWith("`")) {
                    result = splitPk.substring(1, splitPk.length() - 1).toLowerCase();
                }
                break;
              //...省略其他
            default:
                throw DataXException.asDataXException(DBUtilErrorCode.UNSUPPORTED_TYPE, "unsupported database type.");
        }

        return result;
    }


    public String quoteColumnName(String columnName) {
        String result = columnName;

        switch (this) {
          
            case Dm:
               //...省略其他
            default:
                throw DataXException.asDataXException(DBUtilErrorCode.UNSUPPORTED_TYPE, "unsupported database type");
        }

        return result;
    }

    public String quoteTableName(String tableName) {
        String result = tableName;

        switch (this) {
            case MySql:
                result = "`" + tableName.replace("`", "``") + "`";
                break;
            case Oracle:
                break;
            case Dm:
                break;
              //...省略其他
            default:
                throw DataXException.asDataXException(DBUtilErrorCode.UNSUPPORTED_TYPE, "unsupported database type");
        }

        return result;
    }

    private static Pattern mysqlPattern = Pattern.compile("jdbc:mysql://(.+):\\d+/.+");
    private static Pattern oraclePattern = Pattern.compile("jdbc:oracle:thin:@(.+):\\d+:.+");
    private static Pattern dmPattern = Pattern.compile("jdbc:dm://:@(.+):\\d+:.+");

    /**
     * 注意:目前只实现了从 mysql/oracle 中识别出ip 信息.未识别到则返回 null.
     */
    public static String parseIpFromJdbcUrl(String jdbcUrl) {
        Matcher mysql = mysqlPattern.matcher(jdbcUrl);
        if (mysql.matches()) {
            return mysql.group(1);
        }
        Matcher oracle = oraclePattern.matcher(jdbcUrl);
        if (oracle.matches()) {
            return oracle.group(1);
        }
        Matcher dm = dmPattern.matcher(jdbcUrl);
        if (dm.matches()) {
            return dm.group(1);
        }
        return null;
    }
      //...省略其他

}
62703333fb8b7fecf2676a973b78a347.png

打包插件

需要在父类的插件里边配置模块以及插件打包配置

cbc91b208a8998075770c582df403a96.png

maven执行以下命令

mvn -U clean package assembly:assembly -Dmaven.test.skip=true
7eafa8eeaa66d2915ce299753a49af7a.png

测试

以mysql到dm数据库为例

mysql建立表并插入数据

-- test.psn definition

CREATE TABLE `psn` (
  `id` int(11) NOT NULL,
  `name` text,
  `address` text
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
INSERT INTO psn
(id, name, address)
VALUES(1, 'elite', 'gz');
INSERT INTO psn
(id, name, address)
VALUES(2, 'tom', 'bj');
INSERT INTO psn
(id, name, address)
VALUES(3, 'jack', 'sz');
INSERT INTO psn
(id, name, address)
VALUES(4, 'json', 'sh');

在达梦数据库里边创建一个表psn

CREATE TABLE test.psn
(
 id int NOT NULL,
 name VARCHAR(40) NULL,
 address VARCHAR(100)
);

配置mysql2dm.json

关系型数据库插件都差不多的配置

{
   "job": {
    "setting": {
      "speed": {
        "channel":2
      }
    },
    "content": [
      {
        "reader": {
          "name": "mysqlreader",
          "parameter": {
            "username": "root",
            "password": "123456",
            "splitPk": "id",
            "column": ["id","name","address"],
            "connection": [
              {
                "jdbcUrl": ["jdbc:mysql://mysqlip:3306/test?useUnicode=true&characterEncoding=UTF-8&useSSL=false"],
                "table": ["psn"]
              }
            ]
          }
        },
        "writer": {
          "name": "dmwriter",
          "parameter": {
            "username": "test",
            "password": "123456@dm",
            "column": ["id","name","address"],
            "connection": [
              {
                "table": [
                  "psn"
                ],
                "jdbcUrl": "jdbc:dm://ip:5236?schema=TEST"
              }
            ]
          }
        }
      }
    ]
  }
}

执行任务

执行任务可以用命令测试,详细可以参考官网,或者用java,本例以java为例

84640ca02f501a3a10ecb940fe2ba05c.png

查询下结果

016b602737ec549814115ddc1557c06f.png

c96703240207aac0930e9d818bfd55c9.jpeg

6db9125d83b547ff71727b23ab5de297.png

Logo

腾讯云面向开发者汇聚海量精品云计算使用和开发经验,营造开放的云计算技术生态圈。

更多推荐