DataX ElasticsearchWriter 插件改造
针对DataX自带的ElasticsearchWriter 插件,按照业务使用需要进行针对性改造。
·
针对 DataX 自带的 ElasticsearchWriter 插件，按照业务使用需要进行了针对性改造（适配 ES 7.15.0 版本），改造内容如下：
- 取消强制对已存在的index 进行删除。
- 新增自定义主键id,对写入数据进行更新。
- 新增自定义主键id是否存在于mapping的检查功能。
- 新增获取自定义index的mapping功能。
- 新增自定义 mapping 中 field 的重复性检查功能。
- 新增自定义mapping与现有index的mapping比较功能。
DataX扩展功能
修改 com.alibaba.datax.plugin.writer.elasticsearchwriter 包下的以下文件：
Key.java 新增 getId 方法，读取配置项 id（自定义主键字段名，未配置时返回空字符串）：
/**
 * Reads the optional "id" job setting: the name of the column whose value is used as the
 * document _id.
 *
 * @param conf writer configuration
 * @return the configured column name, or an empty string when "id" is not set
 */
public static String getId(Configuration conf) {
    final String unsetDefault = "";
    return conf.getString("id", unsetDefault);
}
修改 ESClient.java 文件：新增 mapping 获取方法，并调整索引的删除、存在性判断与创建逻辑。
(1)新增mapping获取方法
/**
 * Fetches the field definitions ("properties") of an index's mapping.
 *
 * @param indexName index (or alias) whose mapping is requested
 * @return the "properties" object of the mapping, or null when the request fails, the
 *         response cannot be parsed, or the mapping has no "properties" member (for
 *         example an index created without an explicit mapping)
 */
public JsonObject getIndexMapping(String indexName) {
    try {
        JestResult rst = jestClient.execute(new GetMapping.Builder().addIndex(indexName).build());
        if (!rst.isSucceeded()) {
            log.error(String.format("get mapping of index [%s] failed: %s", indexName, rst.getErrorMessage()));
            return null;
        }
        JsonObject root = new JsonParser().parse(rst.getJsonString()).getAsJsonObject();
        JsonObject indexNode = root.getAsJsonObject(indexName);
        // When indexName is an alias, ES keys the response by the concrete index name;
        // with exactly one backing index, use that entry.
        if (indexNode == null && root.entrySet().size() == 1) {
            indexNode = root.entrySet().iterator().next().getValue().getAsJsonObject();
        }
        if (indexNode == null) {
            log.error(String.format("mapping response does not contain index [%s]", indexName));
            return null;
        }
        JsonObject mappingsNode = indexNode.getAsJsonObject("mappings");
        if (mappingsNode == null) {
            log.error(String.format("mapping response of index [%s] has no mappings", indexName));
            return null;
        }
        // May be null when no explicit properties have been put yet.
        return mappingsNode.getAsJsonObject("properties");
    } catch (Exception e) {
        log.error(String.format("get mapping of index [%s] failed", indexName), e);
    }
    return null;
}
(2)修改 deleteIndex 方法
/**
 * Deletes {@code indexName} when it exists; a missing index is logged and skipped.
 *
 * @param indexName index to delete
 * @return true whenever the method returns normally
 * @throws Exception DataXException with ES_INDEX_DELETE when ES rejects the delete, or any
 *                   error raised by the existence check / request execution
 */
public boolean deleteIndex(String indexName) throws Exception {
    log.info("delete index " + indexName);
    if (!indicesExists(indexName)) {
        log.info("index cannot found, skip delete " + indexName);
        return true;
    }
    JestResult deleteResult = execute(new DeleteIndex.Builder(indexName).build());
    if (!deleteResult.isSucceeded()) {
        throw DataXException.asDataXException(ESWriterErrorCode.ES_INDEX_DELETE, deleteResult.getErrorMessage());
    }
    return true;
}
(3)修改 indicesExists 方法
/**
 * Checks whether an ES index exists.
 *
 * <p>A successful response means the index exists and a 404 means it does not. 401 (not
 * authenticated) and 403 (not authorized) abort with ES_INDEX_EXISTS. Any other failure
 * is logged as a warning and reported as "does not exist".
 *
 * @param indexName index to look up
 * @return true when ES reports the index as existing
 * @throws Exception DataXException (ES_INDEX_EXISTS) on 401/403, or request execution errors
 */
public boolean indicesExists(String indexName) throws Exception {
    JestResult existsResult = jestClient.execute(new IndicesExists.Builder(indexName).build());
    if (existsResult.isSucceeded()) {
        return true;
    }
    int status = existsResult.getResponseCode();
    if (status == 401 || status == 403) {
        throw DataXException.asDataXException(ESWriterErrorCode.ES_INDEX_EXISTS, existsResult.getErrorMessage());
    }
    if (status != 404) {
        log.warn(existsResult.getErrorMessage());
    }
    return false;
}
(4)修改 createIndex 方法
/**
 * Creates an index when it does not exist yet, waits until it is reported as existing and,
 * unless {@code dynamic} is set, puts the job's type mapping on it.
 *
 * @param indexName target index
 * @param typeName  mapping type; sent with include_type_name=true, which ES 7.x requires
 *                  when a put-mapping request carries a type
 * @param mappings  mapping body passed to PutMapping
 * @param settings  index settings JSON used when the index is created
 * @param dynamic   when true, no mapping is put
 * @return true when the index exists (and, unless dynamic, the mapping was accepted);
 *         false when creation failed, the index was not visible after 5 checks, or ES
 *         rejected the mapping
 * @throws Exception request execution errors, interruption while waiting, or the
 *                   ES_INDEX_EXISTS DataXException raised by indicesExists
 */
public boolean createIndex(String indexName, String typeName, Object mappings, String settings, boolean dynamic) throws Exception {
    JestResult rst = null;
    if (!indicesExists(indexName)) {
        log.info("create index " + indexName);
        rst = jestClient.execute(
                new CreateIndex.Builder(indexName)
                        .settings(settings)
                        .setParameter("master_timeout", "5m")
                        .build()
        );
        if (!rst.isSucceeded()) {
            // Only "already exists" is tolerated. This presumably means another writer created
            // the index concurrently (resource_already_exists_exception, or
            // index_already_exists_exception on older ES). Any other 400, such as invalid
            // settings or an invalid index name, is a real failure.
            String body = rst.getJsonString();
            boolean alreadyExists = getStatus(rst) == 400
                    && body != null && body.contains("already_exists_exception");
            if (alreadyExists) {
                log.info(String.format("index [%s] already exists", indexName));
                return true;
            }
            log.error(rst.getErrorMessage());
            return false;
        }
        log.info(String.format("create [%s] index success", indexName));
    }
    // Poll (up to 5 times, 2 s apart) until the index is reported as existing.
    final int maxChecks = 5;
    int idx = 0;
    while (idx < maxChecks) {
        if (indicesExists(indexName)) {
            break;
        }
        Thread.sleep(2000);
        idx++;
    }
    if (idx >= maxChecks) {
        log.error(String.format("index [%s] is still not visible after %d checks", indexName, maxChecks));
        return false;
    }
    if (dynamic) {
        log.info("ignore mappings");
        return true;
    }
    log.info("create mappings for " + indexName + " " + mappings);
    rst = jestClient.execute(new PutMapping.Builder(indexName, typeName, mappings)
            .setParameter("master_timeout", "5m")
            // ES 7.x rejects typed put-mapping requests unless include_type_name=true
            .setParameter("include_type_name", "true").build());
    if (!rst.isSucceeded()) {
        // ES answers 400 for invalid or conflicting mappings (e.g. a field type change or the
        // include_type_name error). Reporting that as success would leave a wrongly mapped index.
        log.error(String.format("index [%s] put mappings failed: %s", indexName, rst.getErrorMessage()));
        return false;
    }
    log.info(String.format("index [%s] put mappings success", indexName));
    return true;
}
修改ESWriterErrorCode.java文件
新增ES_INDEX_EXISTS 枚举项
// Error codes of the ES writer plugin.
// ES_INDEX_EXISTS (ESWriter-06) is new: indicesExists raises it when ES answers 401/403.
// NOTE(review): it is declared between ESWriter-02 and ESWriter-03; appending it after
// ES_ALIAS_MODIFY would keep declaration order aligned with the numeric codes (moving it
// changes ordinal()/values() order, so check for callers relying on those first).
BAD_CONFIG_VALUE("ESWriter-00", "您配置的值不合法."),
ES_INDEX_DELETE("ESWriter-01", "删除index错误."),
ES_INDEX_CREATE("ESWriter-02", "创建index错误."),
ES_INDEX_EXISTS("ESWriter-06", "判别index是否存在错误."),
ES_MAPPINGS("ESWriter-03", "mappings错误."),
ES_INDEX_INSERT("ESWriter-04", "插入数据错误."),
ES_ALIAS_MODIFY("ESWriter-05", "别名修改错误."),
修改ESWriter.java文件
(1)修改内部类Job的prepare方法,更新逻辑检查
@Override
public void prepare() {
    /*
     * Runs once per job, before any task writes data:
     *  1. validates the job-defined mapping (duplicate columns, custom id column);
     *  2. with cleanup enabled, drops and recreates the index; otherwise requires it to exist;
     *  3. unless dynamic mapping is requested, checks that the index mapping has exactly the
     *     job-defined fields.
     * The ES client is always closed, even when validation fails.
     */
    ESClient esClient = new ESClient();
    esClient.createClient(Key.getEndpoint(conf),
            Key.getAccessID(conf),
            Key.getAccessKey(conf),
            false,
            300000,
            false,
            false);
    String indexName = Key.getIndexName(conf);
    String typeName = Key.getTypeName(conf);
    boolean dynamic = Key.getDynamic(conf);
    String id = Key.getId(conf);
    String mappings = genMappings(typeName);
    String settings = JSONObject.toJSONString(Key.getSettings(conf));
    log.info(String.format("index:[%s], type:[%s], mappings:[%s], id:[%s]", indexName, typeName, mappings,id));
    try {
        JSONObject usMap = JSONObject.parseObject(mappings).getJSONObject(typeName).getJSONObject("properties");
        matchMapping(usMap);
        // Key.getId returns "" when "id" is not configured; only validate a real column name.
        if (id != null && !id.isEmpty()) {
            checkId(id, usMap);
        }
        boolean isIndicesExists = esClient.indicesExists(indexName);
        // cleanup=true: (re)create the index; cleanup=false: write into an existing index only.
        boolean isCleanup = Key.isCleanup(this.conf);
        if (!isCleanup) {
            if (!isIndicesExists) {
                throw DataXException.asDataXException(String.format("index [%s] not exists and isCleanup is set false !",indexName));
            }
        } else {
            if (isIndicesExists) {
                esClient.deleteIndex(indexName);
            }
            if (!esClient.createIndex(indexName, typeName, mappings, settings, dynamic)) {
                throw DataXException.asDataXException(ESWriterErrorCode.ES_INDEX_CREATE,"create index or mapping failed");
            }
        }
        // With dynamic mapping no explicit mapping is put, so there is nothing to compare.
        if (!dynamic) {
            JsonObject indexMapping = esClient.getIndexMapping(indexName);
            if (indexMapping == null) {
                throw DataXException.asDataXException(ESWriterErrorCode.ES_MAPPINGS,
                        String.format("cannot read mapping properties of index [%s]", indexName));
            }
            checkMapping(indexMapping, usMap);
        }
    } catch (DataXException ex) {
        // Already carries a specific error code/message; do not re-wrap it as ES_MAPPINGS.
        throw ex;
    } catch (Exception ex) {
        // Keep the original exception as cause so its stack trace is not lost.
        throw DataXException.asDataXException(ESWriterErrorCode.ES_MAPPINGS, ex.toString(), ex);
    } finally {
        esClient.closeJestClient();
    }
}
(2)新增方法checkId,检查用户自定义主键是否存在于自定义的mapping中。
/**
 * Verifies that the custom primary-key column is one of the fields of the job-defined mapping.
 *
 * @param id    name of the id column (dereferenced, so expected to be non-null)
 * @param usMap "properties" object produced from genMappings
 * @throws DataXException when no key of {@code usMap} equals {@code id}
 */
private void checkId(String id,JSONObject usMap) {
    for (String fieldName : usMap.keySet()) {
        if (id.equals(fieldName)) {
            return;
        }
    }
    throw DataXException.asDataXException(String.format("id [%s] not exists in mapping [%s]", id, usMap.toString()));
}
(3)新增方法matchMapping,检查用户自定义mapping是否存在重复的字段。
/**
 * Rejects job configurations that declare the same column name more than once.
 *
 * <p>The check runs over the raw "column" list of the job configuration. {@code usMap} is
 * a JSON object, so its keys are unique by construction and cannot reveal a duplicate.
 *
 * @param usMap job-defined mapping properties (kept for signature compatibility)
 * @throws DataXException naming the first duplicated column
 */
private void matchMapping(JSONObject usMap) {
    List column = conf.getList("column");
    if (column == null) {
        return;
    }
    java.util.Set<String> seen = new java.util.HashSet<String>();
    for (Object col : column) {
        // Each entry is expected to be an object such as {"name": "id", "type": "long"}.
        String field = JSONObject.parseObject(col.toString()).getString("name");
        if (!seen.add(field)) {
            throw DataXException.asDataXException(String.format(" [%s] Field duplication ", field));
        }
    }
}
(4)新增方法checkMapping,检查用户填写的mapping是否匹配es已有索引的mapping。
/**
 * Checks that the index mapping in ES has exactly the fields of the job-defined mapping.
 *
 * <p>Only field names are compared; field types are not checked.
 *
 * @param esMap "properties" of the index mapping as returned by ES (may be null when it
 *              could not be read)
 * @param usMap "properties" of the job-defined mapping
 * @throws DataXException when the ES mapping is missing, the field counts differ, or a
 *                        job-defined field is absent from the ES mapping
 */
private void checkMapping(JsonObject esMap,JSONObject usMap) {
    if (esMap == null) {
        log.error("mapping of the target index could not be read");
        throw DataXException.asDataXException("mapping of the target index could not be read");
    }
    int esMappingSize = esMap.entrySet().size();
    int usMappingSize = usMap.size();
    if (esMappingSize != usMappingSize) {
        log.error(String.format("mapping nums not match: index has [%d] fields, job defines [%d]",
                esMappingSize, usMappingSize));
        throw DataXException.asDataXException("mapping nums not match");
    }
    // Equal sizes plus "every job field exists in ES" means the name sets are identical.
    for (String field : usMap.keySet()) {
        if (!esMap.has(field)) {
            log.error(String.format("mapping field [%s] not exists", field));
            throw DataXException.asDataXException(String.format("mapping field [%s] not exists", field));
        }
    }
}
(5)修改内部类Task,新增 id属性
// Name of the column whose value is used as the ES document _id (custom primary key).
// NOTE(review): the assignment (presumably this.id = Key.getId(conf) in Task.init) is not
// shown here; confirm it is set. If it stays null, ID-typed columns no longer produce an _id.
private String id;
(6)修改内部类Task,修改doBatchInsert方法,更新提交数据获取id的逻辑
/**
 * Converts a batch of DataX records into ES index actions and adds them to one bulk request.
 * The document _id is built from the column named by this.id and/or ID-typed columns; when no
 * value is collected, the action is added without an id.
 * NOTE(review): parts of this method are elided in this snippet ("....." and "..."), so the
 * remaining type cases, the bulk submission and the real return value are not visible here.
 */
private long doBatchInsert(final List<Record> writerBuffer) {
Map<String, Object> data = null;
final Bulk.Builder bulkaction = new Bulk.Builder().defaultIndex(this.index).defaultType(this.type);
for (Record record : writerBuffer) {
data = new HashMap<String, Object>();
String id = null;
for (int i = 0; i < record.getColumnNumber(); i++) {
Column column = record.getColumn(i);
// field name
String columnName = columnList.get(i).getName();
ESFieldType columnType = typeList.get(i);
// Collect the custom primary key: the value of the column named by this.id.
// NOTE(review): if that column is also declared with type "id", the `case ID` branch below
// appends the same value again (e.g. "1" becomes "11"). Parts are also concatenated without a
// separator, so ("1","23") and ("12","3") yield the same _id.
if (columnName.equals(this.id)){
if (id != null) {
id += record.getColumn(i).asString();
} else {
id = record.getColumn(i).asString();
}
}
// Array columns arrive as one string that is split on `splitter`.
if (columnList.get(i).isArray() != null && columnList.get(i).isArray()) {
String[] dataList = column.asString().split(splitter);
if (!columnType.equals(ESFieldType.DATE)) {
data.put(columnName, dataList);
} else {
// NOTE(review): every element is overwritten with getDateStr(..., column), i.e. converted from
// the whole column value rather than dataList[pos]. Unlike the DATE case below, a parse
// failure here is not caught.
for (int pos = 0; pos < dataList.length; pos++) {
dataList[pos] = getDateStr(columnList.get(i), column);
}
data.put(columnName, dataList);
}
} else {
switch (columnType) {
case ID: // ID-typed column: its value contributes to the document _id
// Key.getId defaults to "" (non-null), so this runs unless this.id was never assigned.
if (this.id!=null){
if (id != null) {
id += record.getColumn(i).asString();
} else {
id = record.getColumn(i).asString();
}
}
break;
case DATE:
// A parse failure marks the record dirty. NOTE(review): the record is still indexed
// below, just without this field.
try {
String dateStr = getDateStr(columnList.get(i), column);
data.put(columnName, dateStr);
} catch (Exception e) {
getTaskPluginCollector().collectDirtyRecord(record, String.format("时间类型解析失败 [%s:%s] exception: %s", columnName, column.toString(), e.toString()));
}
break;
.....
default:
// NOTE(review): as with DATE, the record is reported dirty but still added to the bulk.
getTaskPluginCollector().collectDirtyRecord(record, "类型错误:不支持的类型:" + columnType + " " + columnName);
}
}
}
// No id value collected: add the action without an explicit _id.
if (id == null) {
//id = UUID.randomUUID().toString();
bulkaction.addAction(new Index.Builder(data).build());
} else {
bulkaction.addAction(new Index.Builder(data).id(id).build());
}
}
try {
...
}
return 0;
}
DataX 项目修改后重新打包
mvn -U clean package assembly:assembly -Dmaven.test.skip=true
打包完成后，用 target 目录中的 elasticsearchwriter-0.0.1-SNAPSHOT.jar 替换原有插件中的同名 jar 文件即可。
使用样例
{
"job": {
"setting": {
"speed": {
"channel": 1
}
},
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"username": "root",
"password": "123456",
"connection": [
{
"jdbcUrl": ["jdbc:mysql://cdh001:3306/test?useUnicode=yes&character_set_server=utf8mb4&characterEncoding=UTF-8&serverTimezone=Asia/Shanghai"],
"querySql": ["select id,number,name,age,height,gander,address,schoolName,introduction,major,grade,intersets,discription,mottor,isLeader,email,phoneNumber,updateTime,now() from pressure_test where DATE_FORMAT(updateTime,'%Y%m%d')='20230106'"]
}
]
}
},
"writer": {
"name": "elasticsearchwriter",
"parameter": {
"endpoint":"http://cdh001:9200",
"accessId":"es_w",
"accessKey":"123456",
"index":"ods_jcjy_xs_pressure_test_1",
"type":"_doc",
"batchSize":5000,
"trySize":5,
"compression":true,
"multiThread":true,
"ignoreWriteError":false,
"ignoreParseError":false,
"aliasMode":"exclusive",
"id":"id",
"column": [
{"name": "id","type": "long"},
{"name": "number","type": "text"},
{"name": "name","type": "text"},
{"name": "age","type": "integer"},
{"name": "height","type": "double"},
{"name": "gander","type": "text"},
{"name": "address","type": "text"},
{"name": "schoolname","type": "text"},
{"name": "introduction","type": "text"},
{"name": "major","type": "text"},
{"name": "grade","type": "text"},
{"name": "intersets","type": "text"},
{"name": "discription","type": "text"},
{"name": "mottor","type": "text"},
{"name": "isleader","type": "boolean"},
{"name": "email","type": "text"},
{"name": "phonenumber","type": "text"},
{"name": "updatetime","type": "text"},
{"name": "time_stamp","type": "text"}
]
}
}
}
]
}
}
期间遇到的错误：
ES 创建 mapping 时报错：
{
"root_cause": [
{
"type": "illegal_argument_exception",
"reason": "Types cannot be provided in put mapping requests, unless the include_type_name parameter is set to true."
}
],
"type": "illegal_argument_exception",
"reason": "Types cannot be provided in put mapping requests, unless the include_type_name parameter is set to true."
}
这是版本问题：ES 7.0 之后默认不再支持在 put mapping 请求中指定 type。本文使用的版本为 7.15.0，需要在请求 URL 中额外添加参数（注意：该参数在 7.x 中已被标记为废弃，ES 8.0 起已被移除）：
?include_type_name=true
等价的 REST 命令为：
PUT /索引/_mapping/grid_cell_0?include_type_name=true
更多推荐
所有评论(0)