在 Java 领域,数据采集(Data Collection / Data Ingestion)是构建大数据平台、物联网系统、监控系统、日志平台等的关键环节。以下是目前 主流和前沿的 Java 数据采集技术栈,涵盖从设备、数据库、日志、网络协议到 API 的各种场景。


📡 一、数据采集常见场景

场景 示例
设备数据采集 工业传感器、PLC、水表、电表、气体检测仪等
日志采集 Nginx 日志、业务日志、异常日志
数据库采集 MySQL、Oracle、PostgreSQL 等增量同步
接口采集 HTTP API、WebSocket、MQTT、Modbus TCP/RTU
文件采集 CSV、TSV、Excel、XML、JSON 等文件导入

🛠️ 二、Java 数据采集常用技术与框架

1. 通用通信协议支持

协议 Java 支持方式 框架/工具
[HTTP] HttpURLConnection, Apache HttpClient, OkHttp Retrofit, Feign
[HTTPS] 同上 + SSLContext OkHttp, Apache HttpClient
WebSocket Java-WebSocket, Tyrus, Spring WebSocket Netty
MQTT Moquette, HiveMQ Client, Eclipse Paho MQTTX(调试)
Modbus jamod, modbus4j, djamod 工业设备对接
TCP/UDP Socket ServerSocket, DatagramSocket Netty, MINA
Serial Port RXTX, jSerialComm 串口通信(RS232/RS485)

2. 数据采集中间件 / 工具

工具 特点 适用场景
Flume 分布式、可靠、高可用的日志采集 日志收集、流式处理
Logstash ELK 套件之一,插件丰富 多源数据采集、转换
Filebeat 轻量级日志采集器,常用于 Kubernetes 容器日志采集
Apache Kafka Connect Kafka 生态的一部分,可连接 DB、API 等 实时数据管道
Canal / Debezium 基于 MySQL binlog 的增量数据订阅 数据库实时同步
NiFi / MiNiFi 图形化数据流管理工具 ETL、多源异构采集
Telegraf InfluxData 开发,支持多种输入输出插件 监控指标采集

3. 数据库采集方案

方案 技术 特点
全量采集 JDBC + SQL 查询 简单直接,适合低频次任务
增量采集 binlog + Canal/Debezium 实时性强,适用于主从同步
时间戳采集 WHERE update_time > last_time 实现简单,但可能漏数据
触发器采集 MySQL Trigger + RabbitMQ/Kafka 实时性好,但影响性能
CDC(Change Data Capture) Oracle GoldenGate、Maxwell、Debezium 实时捕获结构化变更

4. 日志采集方案

方案 工具 特点
应用内埋点 Logback / Log4j + Async Appender 可控性强,需代码配合
文件采集 Filebeat + Kafka 轻量,适合容器环境
APM 埋点 SkyWalking Agent / Pinpoint Agent 无侵入式监控采集
日志聚合 Fluentd / Loki 支持结构化日志分析

5. 工业 & 物联网设备采集

协议 Java 支持框架 场景
Modbus jamod, djamod, modbus4j PLC、仪表
OPC UA Eclipse Milo 工厂自动化设备
BACnet BACnet4J 楼宇自控系统
MQTT HiveMQ、Paho 物联网设备通信
CoAP Californium 资源受限设备通信
DL/T645 / GB/T 19582 自定义解析 电力仪表、智能电表

🧱 三、数据采集架构示例(推荐)

采集层:
  - Modbus TCP: jModbus / djamod
  - MQTT: Paho Client
  - HTTP API: RestTemplate / WebClient / Feign
传输层:
  - Kafka / RocketMQ / RabbitMQ
处理层:
  - Flink / Spark Streaming / Storm
存储层:
  - MySQL / PostgreSQL / TiDB (关系型)
  - InfluxDB / TDengine / ClickHouse (时序/OLAP)
  - Elasticsearch / MongoDB (非结构化)
展示层:
  - Grafana / Superset / FineBI / 自研大屏

📦 四、典型 Java 项目结构建议

com.example.datacollector
├── collector
│   ├── modbus.ModbusCollector
│   ├── mqtt.MqttDataListener
│   └── http.HttpDataFetcher
├── processor
│   ├── DataTransformer
│   └── DataValidator
├── publisher
│   ├── KafkaDataPublisher
│   └── MqttDataPublisher
├── config
│   ├── DeviceConfigLoader
│   └── AppConfig
└── util
    └── ByteUtils

🔍 五、选型建议(按需求分类)

需求 推荐技术
实时采集数据库变更 Debezium / Canal
工业设备通信 Modbus4j / jamod
物联网设备接入 MQTT + Paho / CoAP
日志采集 Filebeat + Kafka
多源异构采集 NiFi / Telegraf
流式处理 Apache Flink / Spark Streaming
存储 InfluxDB / ClickHouse / ES

🧪 示例:Modbus TCP 采集代码片段

import com.serotonin.modbus4j.ModbusFactory;
import com.serotonin.modbus4j.ModbusMaster;
import com.serotonin.modbus4j.ip.IpParameters;

public class ModbusCollector {
    public static void main(String[] args) throws Exception {
        ModbusFactory factory = new ModbusFactory();
        IpParameters params = new IpParameters();
        params.setHost("192.168.1.100");
        params.setPort(502);

        ModbusMaster master = factory.createTcpMaster(params, false);
        master.init();

        // 读取保持寄存器
        int slaveId = 1;
        int startOffset = 0;
        int numberOfRegisters = 10;

        short[] values = master.readMultipleRegisters(slaveId, startOffset, numberOfRegisters);
        for (short v : values) {
            System.out.println("Value: " + v);
        }

        master.destroy();
    }
}

✅ 总结

类型 推荐技术
工业采集 Modbus4j、OPC UA、DL/T645
物联网采集 MQTT、CoAP、HTTP API
数据库采集 Debezium、Canal、JDBC
日志采集 Filebeat、Flume、Logstash
中间件传输 Kafka、RocketMQ、RabbitMQ
存储引擎 InfluxDB、ClickHouse、MySQL
Logo

腾讯云面向开发者汇聚海量精品云计算使用和开发经验,营造开放的云计算技术生态圈。

更多推荐