This post customizes the ElasticsearchWriter plugin bundled with DataX to meet our business needs.
Changes (adapted to Elasticsearch 7.15.0):

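  • Removed the forced deletion of an existing index.
  • Added a custom primary key id, used to update written data.
  • Added a check that the custom primary key id exists in the mapping.
  • Added retrieval of the mapping of a given index.
  • Added a duplicate-field check for the custom mapping.
  • Added a comparison between the custom mapping and the mapping of the existing index.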

DataX extensions

Changes in package com.alibaba.datax.plugin.writer.elasticsearchwriter

Key.java: add a getId method that reads the custom key id from the job configuration (an empty string when it is absent)

public static String getId(Configuration conf) {
    return conf.getString("id", "");
}
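Because getId falls back to an empty string, an unset id arrives as "" rather than null, so callers need an emptiness check in addition to a null check. A minimal JDK-only sketch, in which a plain Map<String, String> stands in for DataX's Configuration (an assumption so the example runs on its own) and CustomIdConfig is a hypothetical helper:

```java
import java.util.Map;
import java.util.Optional;

final class CustomIdConfig {
    // Stand-in for Key.getId(conf): a missing "id" key falls back to "", as in the plugin.
    static String getId(Map<String, String> conf) {
        return conf.getOrDefault("id", "");
    }

    // Treats null, empty and blank values alike as "no custom primary key configured".
    static Optional<String> customId(Map<String, String> conf) {
        String id = getId(conf);
        return (id == null || id.trim().isEmpty()) ? Optional.empty() : Optional.of(id.trim());
    }

    public static void main(String[] args) {
        System.out.println(customId(Map.of("id", "id"))); // Optional[id]
        System.out.println(customId(Map.of()));           // Optional.empty
    }
}
```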

Modify ESClient.java to add mapping retrieval

(1) Add a method that fetches the mapping of an index

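// Returns the "properties" object of the index mapping (ES 7.x typeless layout:
// {"<index>": {"mappings": {"properties": {...}}}}), or null if it cannot be read.
public JsonObject getIndexMapping(String indexName) {
    try {
        JestResult rst = jestClient.execute(new GetMapping.Builder().addIndex(indexName).build());
        String jsonString = rst.getJsonString();
        return new JsonParser().parse(jsonString).getAsJsonObject()
                .getAsJsonObject(indexName)
                .getAsJsonObject("mappings")
                .getAsJsonObject("properties");
    } catch (Exception e) {
        // Also reached when indexName is an alias: the response is keyed by the concrete index name
        log.error("failed to get mapping of index " + indexName, e);
    }
    return null;
}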
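getIndexMapping assumes the ES 7.x typeless response layout {"<index>": {"mappings": {"properties": {...}}}}. When indexName is an alias, ES keys the response by the concrete index name, so the lookup by indexName yields null and the method returns null. The sketch below walks the same layout over a response already parsed into nested java.util.Map objects (an assumption; the plugin itself uses Gson) and falls back to the only entry when the name does not match. MappingExtractor is a hypothetical name:

```java
import java.util.Map;

final class MappingExtractor {
    // Navigates {"<index>": {"mappings": {"properties": {...}}}}.
    // Returns null when the shape does not match, like getIndexMapping.
    @SuppressWarnings("unchecked")
    static Map<String, Object> properties(Map<String, Object> response, String indexName) {
        Object indexNode = response.get(indexName);
        // For an alias, the key is the concrete index name; if exactly one index
        // came back, use it instead of failing.
        if (indexNode == null && response.size() == 1) {
            indexNode = response.values().iterator().next();
        }
        if (!(indexNode instanceof Map)) {
            return null;
        }
        Object mappings = ((Map<String, Object>) indexNode).get("mappings");
        if (!(mappings instanceof Map)) {
            return null;
        }
        Object props = ((Map<String, Object>) mappings).get("properties");
        return (props instanceof Map) ? (Map<String, Object>) props : null;
    }
}
```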

(2) Modify the deleteIndex method

public boolean deleteIndex(String indexName) throws Exception {
    log.info("delete index " + indexName);
    if (indicesExists(indexName)) {
        JestResult rst = execute(new DeleteIndex.Builder(indexName).build());
        if (rst.isSucceeded()){
            return true;
        }else {
            throw DataXException.asDataXException(ESWriterErrorCode.ES_INDEX_DELETE,rst.getErrorMessage());
        }
    } else {
        log.info("index not found, skip delete " + indexName);
    }
    return true;
}

(3) Modify the indicesExists method

// Check whether the ES index exists
public boolean indicesExists(String indexName) throws Exception {
    boolean isIndicesExists = false;
    JestResult rst = jestClient.execute(new IndicesExists.Builder(indexName).build());
    if (rst.isSucceeded()) {
        isIndicesExists = true;
    } else {
        switch (rst.getResponseCode()) {
            case 404:
                isIndicesExists = false;
                break;
            case 401:
                // Not authenticated
            case 403:
                // Authenticated but not authorized; both cases fall through to the exception
                throw DataXException.asDataXException(ESWriterErrorCode.ES_INDEX_EXISTS,rst.getErrorMessage());
            default:
                log.warn(rst.getErrorMessage());
                break;
        }
    }
    return isIndicesExists;
}
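indicesExists boils down to interpreting the status of an index-existence request (HEAD /<index>). A sketch of that classification as a pure function, treating any 2xx as success (an assumption about how Jest's isSucceeded behaves); IndexExistence is a hypothetical helper. In the method above, the UNKNOWN case (for example a 5xx) is only logged and then reported as "index does not exist".

```java
final class IndexExistence {
    enum Status { EXISTS, MISSING, FORBIDDEN, UNKNOWN }

    // Maps the HTTP status of "HEAD /<index>" to an outcome, following indicesExists.
    static Status classify(int httpStatus) {
        if (httpStatus >= 200 && httpStatus < 300) {
            return Status.EXISTS;
        }
        switch (httpStatus) {
            case 404:
                return Status.MISSING;
            case 401: // not authenticated
            case 403: // authenticated but not authorized
                return Status.FORBIDDEN;
            default:
                return Status.UNKNOWN; // indicesExists logs a warning and returns false
        }
    }
}
```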

(4) Modify the createIndex method

public boolean createIndex(String indexName, String typeName, Object mappings, String settings, boolean dynamic) throws Exception {
    JestResult rst = null;
    if (!indicesExists(indexName)) {
        log.info("create index " + indexName);
        rst = jestClient.execute(
                new CreateIndex.Builder(indexName)
                        .settings(settings)
                        .setParameter("master_timeout", "5m")
                        .build()
        );
        //index_already_exists_exception
        if (!rst.isSucceeded()) {
            if (getStatus(rst) == 400) {
                log.info(String.format("index [%s] already exists", indexName));
                return true;
            } else {
                log.error(rst.getErrorMessage());
                return false;
            }
        } else {
            log.info(String.format("create [%s] index success", indexName));
        }
    }

    int idx = 0;
    while (idx < 5) {
        if (indicesExists(indexName)) {
            break;
        }
        Thread.sleep(2000);
        idx ++;
    }
    if (idx >= 5) {
        return false;
    }

    if (dynamic) {
        log.info("ignore mappings");
        return true;
    }
    log.info("create mappings for " + indexName + "  " + mappings);
    rst = jestClient.execute(new PutMapping.Builder(indexName, typeName, mappings)
            .setParameter("master_timeout", "5m")
            // ES 7.x rejects a type name in put-mapping requests unless include_type_name=true
            .setParameter("include_type_name", "true").build());
    if (!rst.isSucceeded()) {
        if (getStatus(rst) == 400) {
            log.warn(rst.getErrorMessage());
            log.info(String.format("index [%s] mappings already exists", indexName));
        } else {
            log.error(rst.getErrorMessage());
            return false;
        }
    } else {
        log.info(String.format("index [%s] put mappings success", indexName));
    }
    return true;
}
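After issuing the create request, createIndex polls indicesExists up to five times, two seconds apart, before putting the mapping. The same wait can be factored into a generic helper. This is a sketch: Poll and waitUntil are hypothetical names, and unlike the loop above it does not sleep after the final failed attempt.

```java
import java.util.function.BooleanSupplier;

final class Poll {
    // Evaluates condition up to maxAttempts times, sleeping sleepMillis between attempts.
    // Returns true as soon as the condition holds, false if it never does or if interrupted.
    static boolean waitUntil(BooleanSupplier condition, int maxAttempts, long sleepMillis) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            if (condition.getAsBoolean()) {
                return true;
            }
            if (attempt < maxAttempts) {
                try {
                    Thread.sleep(sleepMillis);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // preserve the interrupt for the caller
                    return false;
                }
            }
        }
        return false;
    }
}
```

Since indicesExists throws a checked Exception, calling it from the BooleanSupplier would need a small wrapper that catches the exception and returns false.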

Modify ESWriterErrorCode.java

Add the ES_INDEX_EXISTS enum constant

BAD_CONFIG_VALUE("ESWriter-00", "The configured value is invalid."),
ES_INDEX_DELETE("ESWriter-01", "Failed to delete index."),
ES_INDEX_CREATE("ESWriter-02", "Failed to create index."),
ES_MAPPINGS("ESWriter-03", "Mappings error."),
ES_INDEX_INSERT("ESWriter-04", "Failed to insert data."),
ES_ALIAS_MODIFY("ESWriter-05", "Failed to modify alias."),
ES_INDEX_EXISTS("ESWriter-06", "Failed to check whether the index exists."),

Modify ESWriter.java

(1) Modify prepare() in the inner class Job to update the checking logic

@Override
public void prepare() {
    /**
     * Note: this method runs only once.
     * Best practice: put any processing the Job needs before data synchronization here; remove the method if none is needed.
     */
    ESClient esClient = new ESClient();
    esClient.createClient(Key.getEndpoint(conf),
            Key.getAccessID(conf),
            Key.getAccessKey(conf),
            false,
            300000,
            false,
            false);

    String indexName = Key.getIndexName(conf);
    String typeName = Key.getTypeName(conf);
    boolean dynamic = Key.getDynamic(conf);
    String id = Key.getId(conf);
    String mappings = genMappings(typeName);
    String settings = JSONObject.toJSONString(Key.getSettings(conf));
    log.info(String.format("index:[%s], type:[%s], mappings:[%s], id:[%s]", indexName, typeName, mappings, id));

    try {
        JSONObject usMap = JSONObject.parseObject(mappings).getJSONObject(typeName).getJSONObject("properties");
        matchMapping(usMap);

        // Key.getId returns "" when no id is configured, so test for emptiness, not only null
        if (id != null && !id.isEmpty()) {
            checkId(id, usMap);
        }
        // Does the index already exist?
        boolean isIndicesExists = esClient.indicesExists(indexName);
        // Should the index be (re)created?
        boolean isCleanup = Key.isCleanup(this.conf);

        if (!isCleanup) {
            // No index is created, so it must already exist
            if (!isIndicesExists) {
                throw DataXException.asDataXException(String.format("index [%s] does not exist and cleanup is set to false", indexName));
            }
        } else {
            if (isIndicesExists) {
                // Delete the existing index before recreating it
                esClient.deleteIndex(indexName);
            }
            if (!esClient.createIndex(indexName, typeName, mappings, settings, dynamic)) {
                throw DataXException.asDataXException(ESWriterErrorCode.ES_INDEX_CREATE, "create index or mapping failed");
            }
        }

        // Check that the job's mapping matches the mapping of the index
        JsonObject indexMapping = esClient.getIndexMapping(indexName);
        checkMapping(indexMapping, usMap);

//                // Previous logic, kept for reference:
//                if (isCleanup && isIndicesExists) {
//                    esClient.deleteIndex(indexName);
//                    // Recreate the index after deleting it
//                    if (!esClient.createIndex(indexName, typeName, mappings, settings, dynamic)) {
//                        throw new IOException("create index or mapping failed");
//                    }
//                }
//                // Force creation; createIndex ignores an index that already exists
//                if (!esClient.createIndex(indexName, typeName, mappings, settings, dynamic)) {
//                    throw new IOException("create index or mapping failed");
//                }
    } catch (Exception ex) {
        throw DataXException.asDataXException(ESWriterErrorCode.ES_MAPPINGS, ex.toString());
    } finally {
        // Close the client on both the success and the failure path
        esClient.closeJestClient();
    }
}
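With the new prepare(), the cleanup setting and whether the index already exists fully determine what happens before any data is written. A sketch of that decision table; PrepareAction and its constant names are hypothetical:

```java
final class PrepareAction {
    enum Action { FAIL_INDEX_MISSING, WRITE_TO_EXISTING, DELETE_AND_RECREATE, CREATE }

    // cleanup=false: never touch the index; it must already exist.
    // cleanup=true : (re)create the index, deleting any existing one first.
    static Action decide(boolean cleanup, boolean indexExists) {
        if (!cleanup) {
            return indexExists ? Action.WRITE_TO_EXISTING : Action.FAIL_INDEX_MISSING;
        }
        return indexExists ? Action.DELETE_AND_RECREATE : Action.CREATE;
    }
}
```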

(2) Add checkId, which checks that the custom primary key exists in the user-defined mapping.

private void checkId(String id,JSONObject usMap) {
    boolean isConsistent = false;
    for (String field : usMap.keySet()) {
        if (id.equals(field)) {
            isConsistent = true;
            break;
        }
    }
    if (!isConsistent) {
        throw DataXException.asDataXException(String.format("id [%s] not exists in mapping [%s]", id,usMap.toString()));
    }
}
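Because usMap is a JSONObject, and therefore a Map, the loop in checkId amounts to a key lookup. A sketch using a plain Set of field names (an assumption for self-containment) that also treats an empty id as "nothing to check"; IdCheck is hypothetical:

```java
import java.util.Set;

final class IdCheck {
    // True when no custom id is configured, or when it names a field of the mapping.
    static boolean idIsMapped(String id, Set<String> mappedFields) {
        if (id == null || id.isEmpty()) {
            return true; // nothing to check: documents get auto-generated _id values
        }
        return mappedFields.contains(id);
    }
}
```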

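(3) Add matchMapping, which checks the user-defined mapping for duplicate fields. Note that usMap is a parsed JSON object whose keys are unique by construction, so a field listed twice in the column configuration has already collapsed into a single key before this method runs and is never reported.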

private void matchMapping(JSONObject usMap) {
    ArrayList<String> arrayList = new ArrayList<String>();
    for (String field:usMap.keySet()) {
        if (arrayList.contains(field)){
            throw DataXException.asDataXException(String.format(" [%s] Field duplication ", field));
        }else {
            arrayList.add(field);
        }
    }
}
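A duplicate check that can actually fire has to look at the column names before they are turned into a JSON object. A sketch over a plain list of names, assuming they are read from the "name" of each entry in the job's column array; DuplicateColumns is hypothetical:

```java
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

final class DuplicateColumns {
    // Returns every name that occurs more than once, in order of its first repeat.
    static Set<String> duplicates(List<String> columnNames) {
        Set<String> seen = new HashSet<>();
        Set<String> dups = new LinkedHashSet<>();
        for (String name : columnNames) {
            if (!seen.add(name)) { // add() returns false when the name was already present
                dups.add(name);
            }
        }
        return dups;
    }
}
```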

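(4) Add checkMapping, which checks whether the user-supplied mapping matches the mapping of the existing ES index. It compares the number of fields and the field names, not the field types.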

// Compare the user-supplied mapping with the mapping stored in ES (field names only)
private void checkMapping(JsonObject esMap, JSONObject usMap) {
    if (esMap == null) {
        // getIndexMapping returns null when the mapping could not be read
        throw DataXException.asDataXException(ESWriterErrorCode.ES_MAPPINGS, "cannot read the mapping of the target index");
    }
    int esMappingSize = esMap.entrySet().size();
    int usMappingSize = usMap.size();
    if (esMappingSize != usMappingSize) {
        throw DataXException.asDataXException(String.format(
                "number of mapping fields does not match (index: %d, job: %d)", esMappingSize, usMappingSize));
    }
    for (String field : usMap.keySet()) {
        if (!esMap.has(field)) {
            log.error(String.format("mapping field [%s] does not exist", field));
            throw DataXException.asDataXException(String.format("mapping field [%s] does not exist", field));
        }
    }
}
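Since checkMapping compares only the field count and the field names, a field declared text in the job and keyword in the index passes. A sketch of a stricter comparison, assuming both mappings have first been flattened into field-name to type-string maps (a hypothetical representation; nested object fields and multi-fields are not handled). MappingDiff is hypothetical:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;

final class MappingDiff {
    // Lists every difference between the job's mapping and the index's mapping (field -> type).
    static List<String> diff(Map<String, String> jobMapping, Map<String, String> indexMapping) {
        List<String> problems = new ArrayList<>();
        for (Map.Entry<String, String> e : jobMapping.entrySet()) {
            String field = e.getKey();
            if (!indexMapping.containsKey(field)) {
                problems.add("missing in index: " + field);
            } else if (!Objects.equals(e.getValue(), indexMapping.get(field))) {
                problems.add("type mismatch for " + field + ": job=" + e.getValue()
                        + ", index=" + indexMapping.get(field));
            }
        }
        for (String field : indexMapping.keySet()) {
            if (!jobMapping.containsKey(field)) {
                problems.add("missing in job: " + field);
            }
        }
        return problems;
    }
}
```

Collecting all differences before failing gives one complete error message instead of stopping at the first mismatched field.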

(5) Add an id field to the inner class Task, holding the name of the custom primary key column

private String id;

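(6) Modify doBatchInsert in the inner class Task to change how the document id of each submitted record is obtained. When a record has an id, the Index action writes it with that _id, and Elasticsearch replaces any existing document with the same _id in full (not a partial merge); this is what makes repeated loads update rather than duplicate data. Without an id, Elasticsearch generates a new _id for every record.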

private long doBatchInsert(final List<Record> writerBuffer) {
    Map<String, Object> data = null;
    final Bulk.Builder bulkaction = new Bulk.Builder().defaultIndex(this.index).defaultType(this.type);
    for (Record record : writerBuffer) {
        data = new HashMap<String, Object>();
        String id = null;
        for (int i = 0; i < record.getColumnNumber(); i++) {
            Column column = record.getColumn(i);
            // Field name
            String columnName = columnList.get(i).getName();
            ESFieldType columnType = typeList.get(i);
            // The value of the custom primary key column becomes (part of) the document id
            if (columnName.equals(this.id)){
                if (id != null) {
                    id += record.getColumn(i).asString();
                } else {
                    id = record.getColumn(i).asString();
                }
            }
            // An array column always arrives as a delimited string
            if (columnList.get(i).isArray() != null && columnList.get(i).isArray()) {
                String[] dataList = column.asString().split(splitter);
                if (!columnType.equals(ESFieldType.DATE)) {
                    data.put(columnName, dataList);
                } else {
                    for (int pos = 0; pos < dataList.length; pos++) {
                        dataList[pos] = getDateStr(columnList.get(i), column);
                    }
                    data.put(columnName, dataList);
                }
            } else {
                switch (columnType) {
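                    case ID: // a column declared with type "id" also contributes to the document id
                        // Skip the custom id column: its value was already appended above
                        if (this.id != null && !columnName.equals(this.id)) {
                            if (id != null) {
                                id += record.getColumn(i).asString();
                            } else {
                                id = record.getColumn(i).asString();
                            }
                        }
                        break;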
                    case DATE:
                        try {
                            String dateStr = getDateStr(columnList.get(i), column);
                            data.put(columnName, dateStr);
                        } catch (Exception e) {
                            getTaskPluginCollector().collectDirtyRecord(record, String.format("failed to parse date [%s:%s] exception: %s", columnName, column.toString(), e.toString()));
                        }
                        break;
                      .....
                    default:
                        getTaskPluginCollector().collectDirtyRecord(record, "type error: unsupported type: " + columnType + " " + columnName);
                }
            }
        }

        if (id == null) {
            //id = UUID.randomUUID().toString();
            bulkaction.addAction(new Index.Builder(data).build());
        } else {
            bulkaction.addAction(new Index.Builder(data).id(id).build());
        }
    }

    try {
    ...
    }
    return 0;
}
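The id logic above concatenates, in column order, the value of the custom id column and of any column declared with type "id". A standalone sketch of that rule, counting a column only once even when it is both, with type names assumed to be the lowercase strings from the job config; DocId is hypothetical. One caveat of plain concatenation: the pairs ("1", "23") and ("12", "3") yield the same _id, so a separator may be worth considering for composite keys.

```java
import java.util.List;

final class DocId {
    // Builds the _id for one row from parallel lists of column names, types and values.
    // Returns null when no column contributed, in which case ES generates the _id itself.
    static String build(List<String> names, List<String> types, List<String> values, String idField) {
        StringBuilder id = null;
        for (int i = 0; i < names.size(); i++) {
            boolean isIdColumn = names.get(i).equals(idField) || "id".equals(types.get(i));
            if (isIdColumn) {
                if (id == null) {
                    id = new StringBuilder();
                }
                id.append(values.get(i));
            }
        }
        return id == null ? null : id.toString();
    }
}
```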

Building the modified DataX project

mvn -U clean package assembly:assembly -Dmaven.test.skip=true
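After the build, replace the original plugin jar (under plugin/writer/elasticsearchwriter in the DataX installation) with elasticsearchwriter-0.0.1-SNAPSHOT.jar from target.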

Usage example

{
    "job": {
        "setting": {
            "speed": {
                "channel": 1
            }
        },
        "content": [
            {
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "username": "root",
                        "password": "123456",
                        "connection": [
                            {
                                "jdbcUrl": ["jdbc:mysql://cdh001:3306/test?useUnicode=yes&character_set_server=utf8mb4&characterEncoding=UTF-8&serverTimezone=Asia/Shanghai"],
                                "querySql": ["select id,number,name,age,height,gander,address,schoolName,introduction,major,grade,intersets,discription,mottor,isLeader,email,phoneNumber,updateTime,now() from pressure_test where DATE_FORMAT(updateTime,'%Y%m%d')='20230106'"]
                            }
                        ]
                    }
                },
                "writer": {
                    "name": "elasticsearchwriter",
                    "parameter": {
                        "endpoint": "http://cdh001:9200",
                        "accessId": "es_w",
                        "accessKey": "123456",
                        "index": "ods_jcjy_xs_pressure_test_1",
                        "type": "_doc",
                        "batchSize": 5000,
                        "trySize": 5,
                        "compression": true,
                        "multiThread": true,
                        "ignoreWriteError": false,
                        "ignoreParseError": false,
                        "aliasMode": "exclusive",
                        "id": "id",
                        "column": [
                            {"name": "id", "type": "long"},
                            {"name": "number", "type": "text"},
                            {"name": "name", "type": "text"},
                            {"name": "age", "type": "integer"},
                            {"name": "height", "type": "double"},
                            {"name": "gander", "type": "text"},
                            {"name": "address", "type": "text"},
                            {"name": "schoolname", "type": "text"},
                            {"name": "introduction", "type": "text"},
                            {"name": "major", "type": "text"},
                            {"name": "grade", "type": "text"},
                            {"name": "intersets", "type": "text"},
                            {"name": "discription", "type": "text"},
                            {"name": "mottor", "type": "text"},
                            {"name": "isleader", "type": "boolean"},
                            {"name": "email", "type": "text"},
                            {"name": "phonenumber", "type": "text"},
                            {"name": "updatetime", "type": "text"},
                            {"name": "time_stamp", "type": "text"}
                        ]
                    }
                }
            }
        ]
    }
}

Errors encountered along the way:

Error when creating the mapping in ES:

{
	"root_cause": [
		{
			"type": "illegal_argument_exception",
			"reason": "Types cannot be provided in put mapping requests, unless the include_type_name parameter is set to true."
		}
	],
	"type": "illegal_argument_exception",
	"reason": "Types cannot be provided in put mapping requests, unless the include_type_name parameter is set to true."
}

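This is a version issue: in Elasticsearch 7.x, mapping types are deprecated and the APIs are typeless by default. This article uses 7.15.0, so the request URL needs the extra parameter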

?include_type_name=true

Equivalent request:

PUT /<index>/_mapping/grid_cell_0?include_type_name=true
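There are two ways to satisfy ES 7.x: keep the type and add include_type_name=true (the parameter is deprecated in 7.x and removed in 8.0), or send a typeless request, PUT /<index>/_mapping, whose body has "properties" at the top level. A sketch that builds both request paths and the typeless body; PutMappingRequest is a hypothetical helper, and switching the plugin to the typeless form would also mean dropping the type level that genMappings wraps around "properties".

```java
final class PutMappingRequest {
    // Typed form: accepted by ES 7.x only with include_type_name=true.
    static String typedPath(String index, String type) {
        return "/" + index + "/_mapping/" + type + "?include_type_name=true";
    }

    // Typeless form, the default in ES 7.x and the only form in 8.x.
    static String typelessPath(String index) {
        return "/" + index + "/_mapping";
    }

    // Typeless body: "properties" sits at the top level, with no type name around it.
    static String typelessBody(String propertiesJson) {
        return "{\"properties\":" + propertiesJson + "}";
    }
}
```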