在 Java 领域,数据采集(Data Collection / Data Ingestion)是构建大数据平台、物联网系统、监控系统、日志平台等的关键环节。以下是目前 主流和前沿的 Java 数据采集技术栈,涵盖从设备、数据库、日志、网络协议到 API 的各种场景。
📡 一、数据采集常见场景
| 场景 |
示例 |
| 设备数据采集 |
工业传感器、PLC、水表、电表、气体检测仪等 |
| 日志采集 |
Nginx 日志、业务日志、异常日志 |
| 数据库采集 |
MySQL、Oracle、PostgreSQL 等增量同步 |
| 接口采集 |
HTTP API、WebSocket、MQTT、Modbus TCP/RTU |
| 文件采集 |
CSV、TSV、Excel、XML、JSON 等文件导入 |
🛠️ 二、Java 数据采集常用技术与框架
1. 通用通信协议支持
| 协议 |
Java 支持方式 |
框架/工具 |
| [HTTP] |
HttpURLConnection, Apache HttpClient, OkHttp |
Retrofit, Feign |
| [HTTPS] |
同上 + SSLContext |
OkHttp, Apache HttpClient |
WebSocket |
Java-WebSocket, Tyrus, Spring WebSocket |
Netty |
MQTT |
Moquette, HiveMQ Client, Eclipse Paho |
MQTTX(调试) |
Modbus |
jamod, modbus4j, djamod |
工业设备对接 |
TCP/UDP Socket |
ServerSocket, DatagramSocket |
Netty, MINA |
Serial Port |
RXTX, jSerialComm |
串口通信(RS232/RS485) |
2. 数据采集中间件 / 工具
| 工具 |
特点 |
适用场景 |
Flume |
分布式、可靠、高可用的日志采集 |
日志收集、流式处理 |
Logstash |
ELK 套件之一,插件丰富 |
多源数据采集、转换 |
Filebeat |
轻量级日志采集器,常用于 Kubernetes |
容器日志采集 |
Apache Kafka Connect |
Kafka 生态的一部分,可连接 DB、API 等 |
实时数据管道 |
Canal / Debezium |
基于 MySQL binlog 的增量数据订阅 |
数据库实时同步 |
NiFi / MiNiFi |
图形化数据流管理工具 |
ETL、多源异构采集 |
Telegraf |
InfluxData 开发,支持多种输入输出插件 |
监控指标采集 |
3. 数据库采集方案
| 方案 |
技术 |
特点 |
| 全量采集 |
JDBC + SQL 查询 |
简单直接,适合低频次任务 |
| 增量采集 |
binlog + Canal/Debezium |
实时性强,适用于主从同步 |
| 时间戳采集 |
WHERE update_time > last_time |
实现简单,但可能漏数据 |
| 触发器采集 |
MySQL Trigger + RabbitMQ/Kafka |
实时性好,但影响性能 |
| CDC(Change Data Capture) |
Oracle GoldenGate、Maxwell、Debezium |
实时捕获结构化变更 |
4. 日志采集方案
| 方案 |
工具 |
特点 |
| 应用内埋点 |
Logback / Log4j + Async Appender |
可控性强,需代码配合 |
| 文件采集 |
Filebeat + Kafka |
轻量,适合容器环境 |
| APM 埋点 |
SkyWalking Agent / Pinpoint Agent |
无侵入式监控采集 |
| 日志聚合 |
Fluentd / Loki |
支持结构化日志分析 |
5. 工业 & 物联网设备采集
| 协议 |
Java 支持框架 |
场景 |
Modbus |
jamod, djamod, modbus4j |
PLC、仪表 |
OPC UA |
Eclipse Milo |
工厂自动化设备 |
BACnet |
BACnet4J |
楼宇自控系统 |
MQTT |
HiveMQ、Paho |
物联网设备通信 |
CoAP |
Californium |
资源受限设备通信 |
DL/T645 / GB/T 19582 |
自定义解析 |
电力仪表、智能电表 |
🧱 三、数据采集架构示例(推荐)
采集层:
- Modbus TCP: jModbus / djamod
- MQTT: Paho Client
- HTTP API: RestTemplate / WebClient / Feign
传输层:
- Kafka / RocketMQ / RabbitMQ
处理层:
- Flink / Spark Streaming / Storm
存储层:
- MySQL / PostgreSQL / TiDB (关系型)
- InfluxDB / TDengine / ClickHouse (时序/OLAP)
- Elasticsearch / MongoDB (非结构化)
展示层:
- Grafana / Superset / FineBI / 自研大屏
📦 四、典型 Java 项目结构建议
com.example.datacollector
├── collector
│ ├── modbus.ModbusCollector
│ ├── mqtt.MqttDataListener
│ └── http.HttpDataFetcher
├── processor
│ ├── DataTransformer
│ └── DataValidator
├── publisher
│ ├── KafkaDataPublisher
│ └── MqttDataPublisher
├── config
│ ├── DeviceConfigLoader
│ └── AppConfig
└── util
└── ByteUtils
🔍 五、选型建议(按需求分类)
| 需求 |
推荐技术 |
| 实时采集数据库变更 |
Debezium / Canal |
| 工业设备通信 |
Modbus4j / jamod |
| 物联网设备接入 |
MQTT + Paho / CoAP |
| 日志采集 |
Filebeat + Kafka |
| 多源异构采集 |
NiFi / Telegraf |
| 流式处理 |
Apache Flink / Spark Streaming |
| 存储 |
InfluxDB / ClickHouse / ES |
🧪 示例:Modbus TCP 采集代码片段
import com.serotonin.modbus4j.ModbusFactory;
import com.serotonin.modbus4j.ModbusMaster;
import com.serotonin.modbus4j.ip.IpParameters;
public class ModbusCollector {
public static void main(String[] args) throws Exception {
ModbusFactory factory = new ModbusFactory();
IpParameters params = new IpParameters();
params.setHost("192.168.1.100");
params.setPort(502);
ModbusMaster master = factory.createTcpMaster(params, false);
master.init();
int slaveId = 1;
int startOffset = 0;
int numberOfRegisters = 10;
short[] values = master.readMultipleRegisters(slaveId, startOffset, numberOfRegisters);
for (short v : values) {
System.out.println("Value: " + v);
}
master.destroy();
}
}
✅ 总结
| 类型 |
推荐技术 |
| 工业采集 |
Modbus4j、OPC UA、DL/T645 |
| 物联网采集 |
MQTT、CoAP、HTTP API |
| 数据库采集 |
Debezium、Canal、JDBC |
| 日志采集 |
Filebeat、Flume、Logstash |
| 中间件传输 |
Kafka、RocketMQ、RabbitMQ |
| 存储引擎 |
InfluxDB、ClickHouse、MySQL |
所有评论(0)