Kafka Connect 是一个强大的工具,简化了 Kafka 与其他系统的数据集成过程。通过合理配置和使用 Kafka Connect,可以高效地实现数据的实时同步和处理,提高了数据流动的效率和可靠性。

我们为什么选择Kafka Connect?

  • 减少开发工作量:Kafka Connect 提供了大量的预构建连接器,可以直接用于常见的数据源(如 MySQL、PostgreSQL、Elasticsearch 等),减少了自定义开发的工作量。

  • 标准化流程:通过标准的配置文件进行配置,确保数据集成过程的一致性和可维护性。

  • 分布式架构:Kafka Connect 支持分布式模式,可以水平扩展以处理大量数据,确保高吞吐量和低延迟。

  • 容错机制:在分布式模式下,多个 Worker 节点协同工作,即使某个节点失败,其他节点也能继续处理任务,提高系统的可靠性和可用性。

  • 低延迟传输:Kafka Connect 可以实现实时的数据流式传输,确保数据从源头到目的地的时间延迟最小化。

  • 支持多种协议:Kafka Connect 支持多种数据格式和传输协议,满足不同的实时数据需求。

  • JMX 监控:Kafka Connect 集成了 JMX 监控,方便管理和调试。

  • 开源免费:Kafka Connect 是 Apache 开源项目,免费使用且无许可费用。

关键组件

  • Connector:负责管理与外部系统的交互。每个连接器可以有一个或多个任务。

  • Task:实际执行数据导入导出工作的单元。

  • Converter:负责在 Kafka 消息格式(通常是字节数组)和其他格式之间进行转换。

  • Transformations:在数据进入或离开 Kafka 之前对数据进行处理和转换。

  • Worker:运行连接器和任务的进程。在分布式模式下,有多个 Worker 协同工作。

配置Kafka Connect

定义Kafka Connect如何从Kafka主题读取消息并将其写入Elasticsearch。

name=order-sink-connector
connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
tasks.max=1
topics=orders
connection.url=http://localhost:9200
type.name=kafka-connect
key.ignore=true
schema.ignore=true

代码实操

<dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
    </dependency>

配置Kafka生产者

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

import java.util.HashMap;
import java.util.Map;

@Configuration
publicclass KafkaProducerConfig {

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        returnnew DefaultKafkaProducerFactory<>(configProps);
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        returnnew KafkaTemplate<>(producerFactory());
    }
}

Controller

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("/api/orders")
publicclass OrderController {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @PostMapping
    public void sendOrder(@RequestBody String orderData) {
        kafkaTemplate.send("orders", orderData);
    }
}

Application

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class OrderApplication {

    public static void main(String[] args) {
        SpringApplication.run(OrderApplication.class, args);
    }
}

测试

发送订单数据

curl -X POST http://localhost:8080/api/orders -H "Content-Type: application/json" -d '{"orderId": "1", "productName": "Laptop", "quantity": 1, "price": 999.99}'

验证Elasticsearch中的数据

curl -X GET "http://localhost:9200/orders/_search?pretty"

Respons

{
  "took" : 2,
"timed_out" : false,
"_shards" : {
    "total" : 1,
    "successful" : 1,
    "skipped" : 0,
    "failed" : 0
  },
"hits" : {
    "total" : {
      "value" : 1,
      "relation" : "eq"
    },
    "max_score" : 1.0,
    "hits" : [
      {
        "_index" : "orders",
        "_type" : "kafka-connect",
        "_id" : "AWyGvZJxQhWqoOeTzFwA",
        "_score" : 1.0,
        "_source" : {
          "orderId" : "1",
          "productName" : "Laptop",
          "quantity" : 1,
          "price" : 999.99
        }
      }
    ]
  }
}

关注我,送Java福利

/**
 * 这段代码只有Java开发者才能看得懂!
 * 关注我微信公众号之后,
 * 发送:"666",
 * 即可获得一本由Java大神一手面试经验诚意出品
 * 《Java开发者面试百宝书》Pdf电子书
 * 福利截止日期为2025年02月28日止
 * 手快有手慢没!!!
*/
System.out.println("请关注我的微信公众号:");
System.out.println("Java知识日历");
Logo

腾讯云面向开发者汇聚海量精品云计算使用和开发经验,营造开放的云计算技术生态圈。

更多推荐