【flink cdc】flink读mysql写clickhouse
1. 使用 Flink CDC 读 MySQL 写 ClickHouse
2. ClickHouse 使用 CollapsingMergeTree:sign 为 1 表示 “state” 行,-1 表示 “cancel” 行
3. 解决时间差 8 个小时问题
·
<!-- Flink CDC connector for MySQL (groupId com.ververica), pinned to version 2.0.2. -->
<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-connector-mysql-cdc</artifactId>
<version>2.0.2</version>
</dependency>
操作类型
println(Envelope.operationFor(sourceRecord))
- READ: 初始化
- Struct: after
- CREATE: 插入
- Struct: after
- UPDATE: 更新
- Struct: before + after
- DELETE: 删除
- Struct: before
import com.alibaba.fastjson.JSONObject
import com.google.gson.{Gson, JsonParser}
import com.ververica.cdc.connectors.mysql.MySqlSource
import com.ververica.cdc.connectors.mysql.table.StartupOptions
import com.ververica.cdc.debezium.DebeziumDeserializationSchema
import io.debezium.data.Envelope
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.typeutils.TypeExtractor
import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.connector.jdbc.{JdbcConnectionOptions, JdbcExecutionOptions, JdbcSink, JdbcStatementBuilder}
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.util.Collector
import org.apache.kafka.connect.data.Struct
import org.apache.kafka.connect.source.SourceRecord
import test.com.bean.MiniTodoListBean
import test.com.conf.{ConfigParser => CP}
import java.sql.{PreparedStatement, Types}
import scala.collection.JavaConversions._
/**
 * Job entry point: streams change events of `bike_v3.mini_todolist_test` from MySQL
 * through Flink CDC and writes them to the ClickHouse table `cdc_etl.mini_todolist`
 * via the JDBC sink.
 */
object MiniTodoList2Ck {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    // CP.setEnv(env, "mini_todolist_etl")

    // The element type must be given explicitly: without it Scala infers
    // Builder[Nothing] and the String deserializer below does not type-check.
    val source = MySqlSource.builder[String]()
      .hostname(CP._mysqlIP)
      .port(CP._mysqlPort)
      .username(CP._mysqlSinkUser)
      .password(CP._mysqlSinkPasswd)
      .databaseList("bike_v3")
      .tableList("bike_v3.mini_todolist_test")
      .startupOptions(StartupOptions.initial())
      .deserializer(new MiniTodoListDebeziumDeserializationSchema())
      .build()

    // One "?" per inserted column (id, description, modifier, sign).
    val placeholders = Seq.fill(4)("?").mkString(",")
    val insertSql =
      s"""
         |insert into cdc_etl.mini_todolist(id,description,modifier,sign) values(${placeholders})
         |""".stripMargin

    // Flush a batch every second or every 10000 rows, whichever comes first.
    val executionOptions = new JdbcExecutionOptions.Builder()
      .withBatchIntervalMs(1000L)
      .withBatchSize(10000)
      .build()

    val connectionOptions = new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
      .withDriverName(CP._ckDriverName)
      .withUrl(CP._ckSinkUrl)
      .withUsername(CP._ckUser)
      .withPassword(CP._ckPasswd)
      .build()

    env.addSource(source)
      .process(new MiniTodoListProcessFun())
      .addSink(JdbcSink.sink[MiniTodoListBean](
        insertSql,
        new MiniTodoListSinkBuilder,
        executionOptions,
        connectionOptions))
      .uid("mini-todo-list-ck-sink")
      .name("工单表写ck")

    env.execute("flink-cdc")
  }
}
/**
 * Converts Debezium change events into flat JSON rows carrying a `sign` column
 * for a ClickHouse CollapsingMergeTree table (1 = "state" row, -1 = "cancel" row).
 *
 *  - READ / CREATE: one row built from `after` with sign 1
 *  - UPDATE: a cancel row built from `before` (sign -1), then a state row built
 *    from `after` (sign 1)
 *  - DELETE: one row built from `before` with sign -1
 *
 * Records without a recognised operation, or without a Struct value, emit nothing.
 */
class MiniTodoListDebeziumDeserializationSchema extends DebeziumDeserializationSchema[String] {
  import io.debezium.data.Envelope.Operation
  import io.debezium.time.Timestamp
  import java.time.{Instant, ZoneOffset}
  import scala.collection.JavaConverters._

  /**
   * Emits zero, one or two JSON strings for the given change event.
   *
   * @param sourceRecord change event handed over by the CDC source
   * @param collector    receives one JSON string per produced row
   */
  override def deserialize(sourceRecord: SourceRecord, collector: Collector[String]): Unit = {
    sourceRecord.value() match {
      case envelope: Struct =>
        Envelope.operationFor(sourceRecord) match {
          case Operation.READ | Operation.CREATE =>
            emit(envelope, "after", 1, collector)
          case Operation.UPDATE =>
            // Cancel the previous state before writing the new one so the table collapses correctly.
            emit(envelope, "before", -1, collector)
            emit(envelope, "after", 1, collector)
          case Operation.DELETE =>
            emit(envelope, "before", -1, collector)
          case _ => // no row-level operation (operationFor may return null): nothing to write
        }
      case _ => // null or non-Struct value: nothing to write
    }
  }

  /** Serialises the `fieldName` sub-struct (if present) together with `sign` and collects it. */
  private def emit(envelope: Struct, fieldName: String, sign: Int, collector: Collector[String]): Unit =
    Option(envelope.getStruct(fieldName)).foreach { row =>
      val result = new JSONObject()
      put(result, row, sign)
      collector.collect(result.toJSONString)
    }

  /**
   * Re-reads the UTC wall-clock time of `l` (epoch millis) as a UTC+8 wall-clock time,
   * i.e. shifts the value back by eight hours. Milliseconds are preserved.
   * NOTE(review): presumably the MySQL server stores local time in UTC+8 - confirm.
   */
  private def getTimestamp(l: Long): Long =
    Instant.ofEpochMilli(l)
      .atOffset(ZoneOffset.UTC)
      .withOffsetSameLocal(ZoneOffset.ofHours(8))
      .toInstant
      .toEpochMilli

  /**
   * Copies every field of `struct` into `result`, shifting Debezium millisecond
   * timestamps through [[getTimestamp]], then writes the `sign` column.
   * CollapsingMergeTree: sign 1 is a "state" row, -1 is a "cancel" row.
   */
  private def put(result: JSONObject, struct: Struct, sign: Int): Unit = {
    struct.schema().fields().asScala.foreach { field =>
      val converted: AnyRef = (field.schema().name(), struct.get(field)) match {
        case (Timestamp.SCHEMA_NAME, millis: java.lang.Long) =>
          Long.box(getTimestamp(millis.longValue()))
        case (_, other) => other
      }
      result.put(field.name(), converted)
    }
    // Written last so it always wins over a source column that happens to be named "sign".
    result.put("sign", Int.box(sign))
  }

  override def getProducedType: TypeInformation[String] = TypeExtractor.getForClass(classOf[String])
}
/**
 * Parses each JSON string produced by the deserialization schema into a
 * [[MiniTodoListBean]].
 */
class MiniTodoListProcessFun extends ProcessFunction[String, MiniTodoListBean] {
  // Gson does not implement java.io.Serializable, so it is kept out of the serialized
  // function and created lazily on first use instead of once per record.
  @transient private lazy val gson = new Gson()

  /**
   * @param record    JSON text of one row
   * @param context   unused
   * @param collector receives the parsed bean; nothing is emitted when Gson yields null
   *                  (e.g. for an empty or literal-null document)
   */
  override def processElement(record: String, context: ProcessFunction[String, MiniTodoListBean]#Context, collector: Collector[MiniTodoListBean]): Unit =
    Option(gson.fromJson(record, classOf[MiniTodoListBean]))
      .foreach(bean => collector.collect(bean))
}
/**
 * Binds a [[MiniTodoListBean]] to the four positional parameters of the insert
 * statement: 1 = id, 2 = description, 3 = modifier, 4 = sign.
 */
class MiniTodoListSinkBuilder extends JdbcStatementBuilder[MiniTodoListBean] {
  override def accept(ps: PreparedStatement, v: MiniTodoListBean): Unit = {
    import ps.{setInt, setString}

    // Numeric key column first, then the two text columns, then the collapsing sign.
    setInt(1, v.id)
    setString(2, v.description)
    setString(3, v.modifier)
    setInt(4, v.sign)
  }
}
- 注意:serverTimeZone 设置不生效,时间字段需要自行转换;可参考源码 RowDataDebeziumDeserializeSchema 中的 convertToTimestamp。
更多推荐
已为社区贡献1条内容
所有评论(0)