<span class=“js_title_inner“>SpringBoot与Connect整合,实现订单数据实时同步至Elasticsearch功能</span>
Kafka Connect 是一个强大的工具,简化了 Kafka 与其他系统的数据集成过程。通过合理配置和使用 Kafka Connect,可以高效地实现数据的实时同步和处理,提高了数据流动的效率和可靠性。
我们为什么选择Kafka Connect?
-
减少开发工作量:Kafka Connect 提供了大量的预构建连接器,可以直接用于常见的数据源(如 MySQL、PostgreSQL、Elasticsearch 等),减少了自定义开发的工作量。
-
标准化流程:通过标准的配置文件进行配置,确保数据集成过程的一致性和可维护性。
-
分布式架构:Kafka Connect 支持分布式模式,可以水平扩展以处理大量数据,确保高吞吐量和低延迟。
-
容错机制:在分布式模式下,多个 Worker 节点协同工作,即使某个节点失败,其他节点也能继续处理任务,提高系统的可靠性和可用性。
-
低延迟传输:Kafka Connect 可以实现实时的数据流式传输,确保数据从源头到目的地的时间延迟最小化。
-
支持多种协议:Kafka Connect 支持多种数据格式和传输协议,满足不同的实时数据需求。
-
JMX 监控:Kafka Connect 集成了 JMX 监控,方便管理和调试。
-
开源免费:Kafka Connect 是 Apache 开源项目,免费使用且无许可费用。
关键组件
-
Connector:负责管理与外部系统的交互。每个连接器可以有一个或多个任务。
-
Task:实际执行数据导入导出工作的单元。
-
Converter:负责在 Kafka 消息格式(通常是字节数组)和其他格式之间进行转换。
-
Transformations:在数据进入或离开 Kafka 之前对数据进行处理和转换。
-
Worker:运行连接器和任务的进程。在分布式模式下,有多个 Worker 协同工作。
配置Kafka Connect
“定义Kafka Connect如何从Kafka主题读取消息并将其写入Elasticsearch。
name=order-sink-connector
connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
tasks.max=1
topics=orders
connection.url=http://localhost:9200
type.name=kafka-connect
key.ignore=true
schema.ignore=true
代码实操
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
</dependency>
配置Kafka生产者
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import java.util.HashMap;
import java.util.Map;
@Configuration
publicclass KafkaProducerConfig {
@Bean
public ProducerFactory<String, String> producerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
returnnew DefaultKafkaProducerFactory<>(configProps);
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
returnnew KafkaTemplate<>(producerFactory());
}
}
Controller
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
@RequestMapping("/api/orders")
publicclass OrderController {
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
@PostMapping
public void sendOrder(@RequestBody String orderData) {
kafkaTemplate.send("orders", orderData);
}
}
Application
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class OrderApplication {
public static void main(String[] args) {
SpringApplication.run(OrderApplication.class, args);
}
}
测试
发送订单数据
curl -X POST http://localhost:8080/api/orders -H "Content-Type: application/json" -d '{"orderId": "1", "productName": "Laptop", "quantity": 1, "price": 999.99}'
验证Elasticsearch中的数据
curl -X GET "http://localhost:9200/orders/_search?pretty"
Respons
{
"took" : 2,
"timed_out" : false,
"_shards" : {
"total" : 1,
"successful" : 1,
"skipped" : 0,
"failed" : 0
},
"hits" : {
"total" : {
"value" : 1,
"relation" : "eq"
},
"max_score" : 1.0,
"hits" : [
{
"_index" : "orders",
"_type" : "kafka-connect",
"_id" : "AWyGvZJxQhWqoOeTzFwA",
"_score" : 1.0,
"_source" : {
"orderId" : "1",
"productName" : "Laptop",
"quantity" : 1,
"price" : 999.99
}
}
]
}
}
关注我,送Java福利
/**
* 这段代码只有Java开发者才能看得懂!
* 关注我微信公众号之后,
* 发送:"666",
* 即可获得一本由Java大神一手面试经验诚意出品
* 《Java开发者面试百宝书》Pdf电子书
* 福利截止日期为2025年02月28日止
* 手快有手慢没!!!
*/
System.out.println("请关注我的微信公众号:");
System.out.println("Java知识日历");
更多推荐
所有评论(0)