<dependency>
    <groupId>com.ververica</groupId>
    <artifactId>flink-connector-mysql-cdc</artifactId>
    <version>2.0.2</version>
</dependency>
操作类型
println(Envelope.operationFor(sourceRecord))
  • READ: 初始化
    • Struct: after
  • CREATE: 插入
    • Struct: after
  • UPDATE: 更新
    • Struct: before + after
  • DELETE: 删除
    • Struct: before
import com.alibaba.fastjson.JSONObject
import com.google.gson.{Gson, JsonParser}
import com.ververica.cdc.connectors.mysql.MySqlSource
import com.ververica.cdc.connectors.mysql.table.StartupOptions
import com.ververica.cdc.debezium.DebeziumDeserializationSchema
import io.debezium.data.Envelope
import io.debezium.data.Envelope.Operation
import io.debezium.time.Timestamp
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.typeutils.TypeExtractor
import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.connector.jdbc.{JdbcConnectionOptions, JdbcExecutionOptions, JdbcSink, JdbcStatementBuilder}
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.data.TimestampData
import org.apache.flink.util.Collector
import org.apache.kafka.connect.data.Struct
import org.apache.kafka.connect.source.SourceRecord
import test.com.bean.MiniTodoListBean
import test.com.conf.{ConfigParser => CP}

import java.sql.{PreparedStatement, Types}
import java.time.ZoneOffset
import scala.collection.JavaConversions._

/**
 * Flink job that captures changes of the MySQL table `bike_v3.mini_todolist_test`
 * through the MySQL CDC source and writes them to `cdc_etl.mini_todolist` over JDBC.
 */
object MiniTodoList2Ck {

  /**
   * Builds and runs the pipeline: CDC source -> JSON-to-bean conversion -> JDBC sink.
   *
   * @param args command-line arguments; not read by this job
   */
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    //    CP.setEnv(env, "mini_todolist_etl")

    // The element type has to be given explicitly: without it Scala infers
    // `Nothing` for the Java generic builder, and `.deserializer(...)` then
    // rejects a DebeziumDeserializationSchema[String].
    val source = MySqlSource.builder[String]()
      .hostname(CP._mysqlIP)
      .port(CP._mysqlPort)
      .username(CP._mysqlSinkUser)
      .password(CP._mysqlSinkPasswd)
      .databaseList("bike_v3")
      .tableList("bike_v3.mini_todolist_test")
      // initial(): snapshot the existing rows first, then follow the binlog.
      .startupOptions(StartupOptions.initial())
      .deserializer(new MiniTodoListDebeziumDeserializationSchema())
      .build()

    // One "?" per column of the insert statement below.
    val placeholders = Seq.fill(4)("?").mkString(",")

    val insertSql =
      s"""
         |insert into cdc_etl.mini_todolist(id,description,modifier,sign) values(${placeholders})
         |""".stripMargin

    // Rows are flushed once 10000 are buffered or every 1000 ms.
    val executionOptions = new JdbcExecutionOptions.Builder()
      .withBatchIntervalMs(1000L)
      .withBatchSize(10000)
      .build()

    val connectionOptions = new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
      .withDriverName(CP._ckDriverName)
      .withUrl(CP._ckSinkUrl)
      .withUsername(CP._ckUser)
      .withPassword(CP._ckPasswd)
      .build()

    env.addSource(source)
      .process(new MiniTodoListProcessFun())
      .addSink(JdbcSink.sink[MiniTodoListBean](
        insertSql,
        new MiniTodoListSinkBuilder,
        executionOptions,
        connectionOptions))
      .uid("mini-todo-list-ck-sink")
      .name("工单表写ck")

    env.execute("flink-cdc")
  }
}

/**
 * Converts Debezium change events into flat JSON strings, one per row image,
 * each carrying a `sign` field for a CollapsingMergeTree table
 * (1 = "state" row, -1 = "cancel" row).
 *
 * Mapping from operation to emitted rows:
 *  - READ / CREATE: `after` with sign 1
 *  - UPDATE:        `before` with sign -1, then `after` with sign 1
 *  - DELETE:        `before` with sign -1
 *  - anything else: nothing is emitted
 */
class MiniTodoListDebeziumDeserializationSchema extends DebeziumDeserializationSchema[String] {

  /**
   * Emits the JSON rows for one change event.
   *
   * @param sourceRecord the Debezium record; records whose value is not a
   *                     [[Struct]] (including null) are ignored
   * @param collector    receives zero, one or two JSON strings
   */
  override def deserialize(sourceRecord: SourceRecord, collector: Collector[String]): Unit = {
    sourceRecord.value() match {
      case envelope: Struct =>
        Envelope.operationFor(sourceRecord) match { // operation type of the event
          case Operation.READ | Operation.CREATE =>
            emit(collector, envelope.getStruct("after"), 1)
          case Operation.UPDATE =>
            // Cancel the previous state before writing the new one; otherwise
            // the old state row is never collapsed away.
            emit(collector, envelope.getStruct("before"), -1)
            emit(collector, envelope.getStruct("after"), 1)
          case Operation.DELETE =>
            emit(collector, envelope.getStruct("before"), -1)
          case _ => // unhandled or missing operation: emit nothing
        }
      case _ => // no struct payload: nothing to emit
    }
  }

  /** Serializes one row image to JSON and emits it; a null image is skipped. */
  private def emit(collector: Collector[String], row: Struct, sign: Int): Unit =
    Option(row).foreach { image =>
      val result = new JSONObject()
      put(result, image, sign)
      collector.collect(result.toJSONString)
    }

  /**
   * Re-bases an epoch-millisecond value to UTC+8: the wall-clock reading the
   * value has in UTC is taken as a UTC+8 local time, so the result is in
   * effect 8 hours earlier. The result is truncated to whole seconds and
   * returned in milliseconds.
   */
  private def getTimestamp(l: Long): Long = {
    TimestampData.fromEpochMillis(l)
      .toLocalDateTime.atOffset(ZoneOffset.ofHours(8))
      .toEpochSecond * 1000L
  }

  /**
   * Copies every field of `struct` into `result` and then sets `sign`.
   *
   * Fields whose schema name is `Timestamp.SCHEMA_NAME` and whose value is a
   * Long are passed through [[getTimestamp]]; all other values are copied as-is.
   *
   * @param sign CollapsingMergeTree sign: 1 is a "state" row, -1 is a "cancel" row
   */
  private def put(result: JSONObject, struct: Struct, sign: Int): Unit = {
    // Explicit decorators, scoped to this method, instead of implicit conversions.
    import scala.collection.JavaConverters._

    struct.schema().fields().asScala.foreach { field =>
      struct.get(field) match {
        case millis: java.lang.Long if field.schema().name() == Timestamp.SCHEMA_NAME =>
          result.put(field.name(), getTimestamp(millis))
        case other =>
          result.put(field.name(), other)
      }
    }
    // Written last so it wins over a source column that is also called "sign".
    result.put("sign", sign)
  }

  /** The produced element type: plain `String`. */
  override def getProducedType: TypeInformation[String] = TypeExtractor.getForClass(classOf[String])
}

/**
 * Parses each incoming JSON string into a [[MiniTodoListBean]] using Gson.
 */
class MiniTodoListProcessFun extends ProcessFunction[String, MiniTodoListBean] {

  // One parser per function instance instead of one per element. Marked
  // transient + lazy so it is not shipped with the serialized function and is
  // created on first use.
  @transient private lazy val gson: Gson = new Gson()

  /**
   * Converts one JSON record into a bean and forwards it.
   *
   * @param record    JSON text of a single row
   * @param context   unused
   * @param collector receives the parsed bean; nothing is emitted when Gson
   *                  yields null for the input
   */
  override def processElement(record: String, context: ProcessFunction[String, MiniTodoListBean]#Context, collector: Collector[MiniTodoListBean]): Unit = {
    Option(gson.fromJson(record, classOf[MiniTodoListBean]))
      .foreach(bean => collector.collect(bean))
  }
}

/**
 * Binds the fields of a [[MiniTodoListBean]] to the four positional parameters
 * of a prepared statement, in the order id, description, modifier, sign.
 */
class MiniTodoListSinkBuilder extends JdbcStatementBuilder[MiniTodoListBean] {

  /**
   * Fills the statement for a single row.
   *
   * @param ps statement whose parameters 1 to 4 are set
   * @param v  bean providing the values
   */
  override def accept(ps: PreparedStatement, v: MiniTodoListBean): Unit = {
    // Integer parameters: 1 and 4.
    ps.setInt(1, v.id)
    ps.setInt(4, v.sign)

    // Text parameters: 2 and 3.
    ps.setString(2, v.description)
    ps.setString(3, v.modifier)
  }
}
  • 注意: serverTimeZone 设置不生效; 时区转换可参考源码 RowDataDebeziumDeserializeSchema 中的 convertToTimestamp。
Logo

腾讯云面向开发者汇聚海量精品云计算使用和开发经验,营造开放的云计算技术生态圈。

更多推荐