Java + Flink Real-Time Data Analytics Platform: From Data Collection to a Live Dashboard
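Real-time data analysis is a core capability for Industry 4.0 and internet businesses: real-time order statistics in e-commerce, live equipment monitoring on production lines, and live dashboards. Flink is a top-level Apache project for real-time computation, known for low latency, high throughput, and exactly-once semantics, and it is widely treated as a de facto standard for real-time analytics.
Starting from scratch, this article uses Flink 1.18.0 + Kafka (optional) + Redis + MySQL + Spring Boot 3.x + WebSocket + ECharts to build a complete real-time order analytics platform, covering: data collection (simulated order generation), real-time cleaning and aggregation in Flink, storage (Redis for real-time results, MySQL for historical data), and a dashboard (ECharts charts updated through WebSocket pushes).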
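I. Platform Architecture Design
The platform is split into 5 layers, which keeps the architecture clear and extensible:
[Data collection] → [Message queue (Kafka, optional)] → [Real-time processing (Flink)] → [Storage (Redis/MySQL)] → [Visualization (Spring Boot WebSocket + ECharts)]
- Data collection layer: generates simulated e-commerce orders (order ID, user ID, product ID, quantity, amount, time);
- Message queue layer: Kafka (optional; decouples collection from processing and is recommended for production);
- Real-time processing layer: Flink consumes the data, cleans and filters it, and aggregates over 1-minute tumbling windows:
  - total number of orders per minute;
  - total order amount per minute;
  - top 5 hot products per minute;
- Storage layer:
  - Redis: stores the real-time aggregation result (for fast reads by the dashboard);
  - MySQL: stores the raw order data (for historical queries);
- Visualization layer: Spring Boot reads the real-time result from Redis and pushes it over WebSocket to the ECharts dashboard, which updates dynamically.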
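The 1-minute tumbling windows used by the processing layer assign every record to exactly one fixed-size window. As a rough illustration in plain Java (no Flink dependency), the sketch below shows how a timestamp maps to its window start when windows are aligned to the epoch with no offset. The class and method names are hypothetical and not part of the project; Flink's own window assigner additionally supports an offset.

```java
import java.time.Instant;

/** Hypothetical helper for illustration only; not part of the article's project. */
final class TumblingWindowSketch {

    /**
     * Start (epoch millis) of the fixed-size window that contains the timestamp.
     * Assumes a non-negative timestamp and windows aligned to the epoch (offset 0).
     */
    static long windowStart(long timestampMillis, long windowSizeMillis) {
        return timestampMillis - (timestampMillis % windowSizeMillis);
    }

    public static void main(String[] args) {
        long size = 60_000L; // 1 minute
        long ts = Instant.parse("2024-01-01T10:15:42Z").toEpochMilli();
        long start = windowStart(ts, size);
        // The window covers [start, start + size)
        System.out.println(Instant.ofEpochMilli(start) + " .. " + Instant.ofEpochMilli(start + size));
    }
}
```

Every order created between 10:15:00 and 10:15:59.999 lands in the same window, which is why the job emits one aggregate per minute.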
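II. Technology Choices and Environment Setup
1. Technology stack
| Technology | Version | Purpose |
|---|---|---|
| Java | 17 | Development language |
| Flink | 1.18.0 | Real-time computation framework |
| Kafka | 2.8.2 | Message queue (optional) |
| Redis | 7.0 | Real-time result storage |
| MySQL | 8.0 | Historical data storage |
| Spring Boot | 3.2.0 | Web framework + WebSocket |
| ECharts | 5.4.3 | Dashboard charts |
| MyBatis-Plus | 3.5.5 | Persistence framework |
2. Environment setup
- Install Java 17;
- Install Flink 1.18.0: https://flink.apache.org/downloads/;
- Install Kafka 2.8.2 (optional): https://kafka.apache.org/downloads;
- Install Redis 7.0;
- Install MySQL 8.0.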
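III. Project Structure and Dependencies
1. Project structure
flink-realtime-platform
├── flink-realtime-job                    # Flink real-time processing job
│   └── src/main/java/com/example/flink
│       ├── entity                        # Entity classes
│       ├── source                        # Data sources (simulated orders / Kafka)
│       ├── sink                          # Sinks (Redis / MySQL)
│       └── RealtimeOrderJob.java         # Flink main program
├── flink-realtime-web                    # Spring Boot dashboard
│   └── src/main/java/com/example/web
│       ├── config                        # WebSocket configuration
│       ├── controller                    # WebSocket handlers
│       ├── entity                        # Entity classes (OrderAgg)
│       ├── service                       # Redis service
│       ├── task                          # Scheduled push task
│       └── FlinkRealtimeWebApplication.java
└── pom.xml
2. Parent project dependencies (pom.xml)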
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>3.2.0</version>
<relativePath/>
</parent>
<groupId>com.example</groupId>
<artifactId>flink-realtime-platform</artifactId>
<version>1.0.0</version>
<packaging>pom</packaging>
<modules>
<module>flink-realtime-job</module>
<module>flink-realtime-web</module>
</modules>
<properties>
<java.version>17</java.version>
<flink.version>1.18.0</flink.version>
<mybatis-plus.version>3.5.5</mybatis-plus.version>
</properties>
<dependencyManagement>
<dependencies>
<!-- Flink -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- No org.apache.flink:flink-connector-redis artifact is published for Flink 1.18; the Redis sink in this project uses Jedis directly. -->
<!-- The JDBC connector is versioned separately from Flink core; 3.1.2-1.18 is the build for Flink 1.18. -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-jdbc</artifactId>
<version>3.1.2-1.18</version>
</dependency>
<!-- MyBatis-Plus -->
<dependency>
<groupId>com.baomidou</groupId>
<artifactId>mybatis-plus-boot-starter</artifactId>
<version>${mybatis-plus.version}</version>
</dependency>
</dependencies>
</dependencyManagement>
</project>
IV. Hands-on 1: Flink Real-Time Processing Job (flink-realtime-job)
1. Submodule dependencies (flink-realtime-job/pom.xml)
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>com.example</groupId>
<artifactId>flink-realtime-platform</artifactId>
<version>1.0.0</version>
</parent>
<artifactId>flink-realtime-job</artifactId>
<dependencies>
<!-- Flink core -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
</dependency>
<!-- Jackson: RedisSink serializes the aggregate to JSON (version managed by the Spring Boot parent) -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
<!-- Flink JDBC Connector -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-jdbc</artifactId>
</dependency>
<!-- MySQL driver -->
<dependency>
<groupId>com.mysql</groupId>
<artifactId>mysql-connector-j</artifactId>
<scope>runtime</scope>
</dependency>
<!-- Jedis (Redis client) -->
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>5.1.0</version>
</dependency>
<!-- Lombok -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
</dependencies>
</project>
2. Entity classes (Order.java, OrderAgg.java)
package com.example.flink.entity;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.math.BigDecimal;
import java.time.LocalDateTime;
/**
 * Order entity.
 */
@Data
@NoArgsConstructor
@AllArgsConstructor
public class Order {
    private Long id;                  // Order ID
    private Long userId;              // User ID
    private Long productId;           // Product ID
    private String productName;       // Product name
    private Integer quantity;         // Quantity
    private BigDecimal totalAmount;   // Total amount
    private LocalDateTime createTime; // Creation time
}
package com.example.flink.entity;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.math.BigDecimal;
import java.util.List;
/**
 * Aggregated order result.
 */
@Data
@NoArgsConstructor
@AllArgsConstructor
public class OrderAgg {
    private String windowStart;       // Window start time
    private Long orderCount;          // Number of orders
    private BigDecimal totalAmount;   // Total amount
    private List<String> hotProducts; // Top 5 hot products (names, most ordered first)
}
3. Simulated order source (OrderSource.java)
package com.example.flink.source;
import com.example.flink.entity.Order;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import java.math.BigDecimal;
import java.time.LocalDateTime;
import java.util.Random;
import java.util.UUID;
/**
 * Simulated order source: emits 10 orders per second.
 */
public class OrderSource implements SourceFunction<Order> {
    private volatile boolean isRunning = true;
    private final Random random = new Random();
    // Simulated product catalog
    private static final String[] PRODUCTS = {"iPhone 15", "MacBook Pro", "AirPods Pro", "iPad Pro", "Apple Watch"};
    @Override
    public void run(SourceContext<Order> ctx) throws Exception {
        while (isRunning) {
            // Generate 10 orders
            for (int i = 0; i < 10; i++) {
                Order order = new Order();
                // Mask the sign bit instead of Math.abs, which stays negative for Long.MIN_VALUE
                order.setId(UUID.randomUUID().getMostSignificantBits() & Long.MAX_VALUE);
                order.setUserId(random.nextLong(1000) + 1); // Random.nextLong(bound) requires Java 17+
                int productIndex = random.nextInt(PRODUCTS.length);
                order.setProductId((long) productIndex + 1);
                order.setProductName(PRODUCTS[productIndex]);
                order.setQuantity(random.nextInt(5) + 1);
                order.setTotalAmount(new BigDecimal(random.nextInt(10000) + 100));
                order.setCreateTime(LocalDateTime.now());
                // Emit the order
                ctx.collect(order);
            }
            // One batch per second
            Thread.sleep(1000);
        }
}
@Override
public void cancel() {
isRunning = false;
}
}
4. Flink main program (RealtimeOrderJob.java)
package com.example.flink;
import com.example.flink.entity.Order;
import com.example.flink.entity.OrderAgg;
import com.example.flink.sink.MySQLSink;
import com.example.flink.sink.RedisSink;
import com.example.flink.source.OrderSource;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import java.math.BigDecimal;
import java.time.format.DateTimeFormatter;
import java.util.*;
import java.util.stream.Collectors;
/**
 * Main program for real-time order analysis with Flink.
 */
public class RealtimeOrderJob {
    private static final DateTimeFormatter WINDOW_FORMAT = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    public static void main(String[] args) throws Exception {
        // 1. Create the Flink streaming execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2); // Default parallelism
        // 2. Add the source: simulated orders
        DataStream<Order> orderStream = env.addSource(new OrderSource());
        // 3. Cleaning: drop orders with a missing or non-positive amount
        SingleOutputStreamOperator<Order> cleanedStream = orderStream.filter(
                order -> order.getTotalAmount() != null && order.getTotalAmount().compareTo(BigDecimal.ZERO) > 0);
        // 4. Write raw orders to MySQL
        cleanedStream.addSink(new MySQLSink());
        // 5. Aggregate over 1-minute tumbling windows: order count, total amount, hot products
        //    (windowAll is not keyed, so this operator runs with parallelism 1)
        SingleOutputStreamOperator<OrderAgg> aggStream = cleanedStream
                .windowAll(TumblingProcessingTimeWindows.of(Time.minutes(1)))
                .aggregate(new OrderAggregateFunction());
        // 6. Write the real-time aggregate to Redis
        aggStream.addSink(new RedisSink());
        // 7. Print the result (for development)
        aggStream.print("Real-time aggregate");
        // 8. Start the Flink job
        env.execute("RealtimeOrderAnalysisJob");
    }

    /**
     * Aggregate function: folds the orders of one 1-minute window into an OrderAgg.
     */
    private static class OrderAggregateFunction implements AggregateFunction<Order, OrderAggAccumulator, OrderAgg> {
        @Override
        public OrderAggAccumulator createAccumulator() {
            return new OrderAggAccumulator();
        }

        @Override
        public OrderAggAccumulator add(Order order, OrderAggAccumulator acc) {
            acc.orderCount++;
            acc.totalAmount = acc.totalAmount.add(order.getTotalAmount());
            // Count orders per product
            acc.productCountMap.merge(order.getProductName(), 1, Integer::sum);
            // Approximate the window start by truncating the first order's creation time to the minute.
            // The exact boundary is only available from the window itself (e.g. in a ProcessAllWindowFunction).
            if (acc.windowStart == null) {
                acc.windowStart = order.getCreateTime()
                        .truncatedTo(java.time.temporal.ChronoUnit.MINUTES)
                        .format(WINDOW_FORMAT);
            }
            return acc;
        }

        @Override
        public OrderAgg getResult(OrderAggAccumulator acc) {
            // Top 5 products by order count
            List<String> hotProducts = acc.productCountMap.entrySet().stream()
                    .sorted(Map.Entry.<String, Integer>comparingByValue().reversed())
                    .limit(5)
                    .map(Map.Entry::getKey)
                    .collect(Collectors.toList());
            return new OrderAgg(acc.windowStart, acc.orderCount, acc.totalAmount, hotProducts);
        }

        @Override
        public OrderAggAccumulator merge(OrderAggAccumulator a, OrderAggAccumulator b) {
            a.orderCount += b.orderCount;
            a.totalAmount = a.totalAmount.add(b.totalAmount);
            b.productCountMap.forEach((k, v) -> a.productCountMap.merge(k, v, Integer::sum));
            if (a.windowStart == null) {
                a.windowStart = b.windowStart;
            }
            return a;
        }
    }

    /**
     * Accumulator for the aggregation.
     */
    private static class OrderAggAccumulator {
        String windowStart;
        Long orderCount = 0L;
        BigDecimal totalAmount = BigDecimal.ZERO;
        Map<String, Integer> productCountMap = new HashMap<>();
    }
}
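The aggregate function above keeps only product names in the top-5 list, so the dashboard cannot show how many orders each product received, and the ordering of products with equal counts depends on HashMap iteration order. As one possible variation, the plain-Java sketch below ranks a count map and returns both name and count with a deterministic tie-break. `TopProductsSketch` and `ProductCount` are hypothetical names introduced only for this illustration.

```java
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/** Hypothetical helper for illustration only; not part of the article's project. */
final class TopProductsSketch {

    /** Product name together with its order count in the window. */
    record ProductCount(String name, int count) {}

    /** Returns the n most ordered products, highest count first; ties are broken by name. */
    static List<ProductCount> topN(Map<String, Integer> counts, int n) {
        return counts.entrySet().stream()
                .sorted(Map.Entry.<String, Integer>comparingByValue(Comparator.reverseOrder())
                        .thenComparing(Map.Entry.<String, Integer>comparingByKey()))
                .limit(n)
                .map(e -> new ProductCount(e.getKey(), e.getValue()))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        Map<String, Integer> counts = new HashMap<>();
        counts.put("iPhone 15", 120);
        counts.put("MacBook Pro", 95);
        counts.put("AirPods Pro", 120);
        System.out.println(topN(counts, 2));
    }
}
```

Carrying the counts through to the result would let the hot-product chart plot real order numbers instead of a rank.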
5. Redis Sink (RedisSink.java)
package com.example.flink.sink;
import com.example.flink.entity.OrderAgg;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;
/**
 * Redis sink: writes the latest real-time aggregate.
 * Extends RichSinkFunction so the connection pool is created in open() and released in close().
 */
public class RedisSink extends RichSinkFunction<OrderAgg> {
    private transient JedisPool jedisPool;
    private transient ObjectMapper objectMapper;

    @Override
    public void open(Configuration parameters) {
        // Initialize the Jedis connection pool once per parallel sink instance
        JedisPoolConfig config = new JedisPoolConfig();
        config.setMaxTotal(10);
        jedisPool = new JedisPool(config, "localhost", 6379);
        objectMapper = new ObjectMapper();
    }

    @Override
    public void invoke(OrderAgg value, Context context) throws Exception {
        try (Jedis jedis = jedisPool.getResource()) {
            // Overwrite the key realtime:order:agg with the latest window result
            String json = objectMapper.writeValueAsString(value);
            jedis.set("realtime:order:agg", json);
            System.out.println("Wrote to Redis: " + json);
        }
    }

    @Override
    public void close() {
        if (jedisPool != null) {
            jedisPool.close();
        }
    }
}
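6. MySQL Sink (MySQLSink.java)
package com.example.flink.sink;
import com.example.flink.entity.Order;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
/**
 * MySQL sink: writes raw orders in batches over plain JDBC.
 * A sink built with JdbcSink.sink(...) is a rich function that must be opened by Flink;
 * creating it lazily inside invoke() and calling it directly skips that step, so this
 * version manages the connection itself in open()/close().
 */
public class MySQLSink extends RichSinkFunction<Order> {
    private static final String URL =
            "jdbc:mysql://localhost:3306/flink_db?useSSL=false&serverTimezone=Asia/Shanghai";
    private static final String INSERT_SQL =
            "INSERT INTO `order` (id, user_id, product_id, product_name, quantity, total_amount, create_time) VALUES (?, ?, ?, ?, ?, ?, ?)";
    private static final int BATCH_SIZE = 100;

    private transient Connection connection;
    private transient PreparedStatement statement;
    private transient int pending;

    @Override
    public void open(Configuration parameters) throws Exception {
        Class.forName("com.mysql.cj.jdbc.Driver");
        connection = DriverManager.getConnection(URL, "root", "123456");
        statement = connection.prepareStatement(INSERT_SQL);
        pending = 0;
    }

    @Override
    public void invoke(Order order, Context context) throws Exception {
        statement.setLong(1, order.getId());
        statement.setLong(2, order.getUserId());
        statement.setLong(3, order.getProductId());
        statement.setString(4, order.getProductName());
        statement.setInt(5, order.getQuantity());
        statement.setBigDecimal(6, order.getTotalAmount());
        statement.setObject(7, order.getCreateTime());
        statement.addBatch();
        // Flush once BATCH_SIZE rows are buffered (size-based only; there is no timer-based flush)
        if (++pending >= BATCH_SIZE) {
            flush();
        }
    }

    private void flush() throws SQLException {
        if (pending > 0) {
            statement.executeBatch();
            pending = 0;
        }
    }

    @Override
    public void close() throws Exception {
        if (statement != null) {
            flush(); // Write any remaining buffered rows
            statement.close();
        }
        if (connection != null) {
            connection.close();
        }
    }
}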
V. Hands-on 2: Spring Boot Dashboard (flink-realtime-web)
1. Submodule dependencies (flink-realtime-web/pom.xml)
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>com.example</groupId>
<artifactId>flink-realtime-platform</artifactId>
<version>1.0.0</version>
</parent>
<artifactId>flink-realtime-web</artifactId>
<dependencies>
<!-- Spring Boot Web -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- Spring Boot WebSocket -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
<!-- Jedis -->
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>5.1.0</version>
</dependency>
<!-- Lombok -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
</dependencies>
</project>
2. WebSocket configuration (WebSocketConfig.java)
package com.example.web.config;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.simp.config.MessageBrokerRegistry;
import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;
import org.springframework.web.socket.config.annotation.StompEndpointRegistry;
import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer;
/**
 * WebSocket configuration: uses the STOMP protocol.
 */
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {
    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        // Register the WebSocket endpoint that the front end connects to
        registry.addEndpoint("/ws/realtime").withSockJS();
    }
    @Override
    public void configureMessageBroker(MessageBrokerRegistry registry) {
        // Enable the simple in-memory broker; the front end subscribes to /topic/realtime
        registry.enableSimpleBroker("/topic");
        // Application destination prefix: messages sent by the front end go to /app
        registry.setApplicationDestinationPrefixes("/app");
    }
}
3. Redis service (RedisService.java)
package com.example.web.service;
import com.example.web.entity.OrderAgg;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.stereotype.Service;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;
/**
 * Redis service: reads the real-time aggregate.
 * com.example.web.entity.OrderAgg is not listed in this article; it is assumed to be a copy
 * of the job module's OrderAgg with the same fields.
 */
@Service
public class RedisService {
    private final JedisPool jedisPool;
    private final ObjectMapper objectMapper = new ObjectMapper();

    public RedisService() {
        // Initialize the Jedis connection pool
        JedisPoolConfig config = new JedisPoolConfig();
        config.setMaxTotal(10);
        this.jedisPool = new JedisPool(config, "localhost", 6379);
    }

    /**
     * Reads the real-time aggregate from Redis; returns null if the key is absent or unreadable.
     */
    public OrderAgg getRealtimeAgg() {
        try (Jedis jedis = jedisPool.getResource()) {
            String json = jedis.get("realtime:order:agg");
            if (json == null) {
                return null;
            }
            return objectMapper.readValue(json, OrderAgg.class);
        } catch (Exception e) {
            e.printStackTrace();
            return null;
        }
    }
}
4. Scheduled push task (RealtimePushTask.java)
package com.example.web.task;
import com.example.web.entity.OrderAgg;
import com.example.web.service.RedisService;
import org.springframework.messaging.simp.SimpMessagingTemplate;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
/**
 * Scheduled push task: reads the aggregate from Redis every second and pushes it to the front end.
 * @Scheduled only fires when scheduling is enabled. The application class is not shown in this
 * article, so @EnableScheduling is declared here; it is more commonly placed on
 * FlinkRealtimeWebApplication.
 */
@Component
@EnableScheduling
public class RealtimePushTask {
    private final SimpMessagingTemplate messagingTemplate;
    private final RedisService redisService;

    public RealtimePushTask(SimpMessagingTemplate messagingTemplate, RedisService redisService) {
        this.messagingTemplate = messagingTemplate;
        this.redisService = redisService;
    }

    @Scheduled(fixedRate = 1000) // Runs once per second
    public void pushRealtimeData() {
        OrderAgg agg = redisService.getRealtimeAgg();
        if (agg != null) {
            // Push to /topic/realtime, which the front end subscribes to
            messagingTemplate.convertAndSend("/topic/realtime", agg);
        }
    }
}
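The push task runs every second, while the aggregate in Redis only changes once per 1-minute window, so most pushes repeat the previous payload. One way to suppress the repeats on the server side is to remember the last pushed window start, as in the plain-Java sketch below. `WindowChangeTracker` is a hypothetical helper, not part of the project, and it assumes `windowStart` uniquely identifies a window.

```java
import java.util.Objects;

/** Hypothetical helper for illustration only; not part of the article's project. */
final class WindowChangeTracker {
    private String lastWindowStart;

    /** Returns true only when windowStart differs from the previously accepted value. */
    synchronized boolean isNewWindow(String windowStart) {
        if (windowStart == null || Objects.equals(windowStart, lastWindowStart)) {
            return false;
        }
        lastWindowStart = windowStart;
        return true;
    }

    public static void main(String[] args) {
        WindowChangeTracker tracker = new WindowChangeTracker();
        // Simulated results of three consecutive polls
        String[] polled = {"2024-01-01 10:15:00", "2024-01-01 10:15:00", "2024-01-01 10:16:00"};
        for (String windowStart : polled) {
            System.out.println(windowStart + " -> push? " + tracker.isNewWindow(windowStart));
        }
    }
}
```

A trade-off of this design is that a client connecting mid-window receives nothing until the next window closes, so it may be worth sending the current value once when a client subscribes.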
5. Front-end dashboard (index.html)
Create index.html under src/main/resources/static and use ECharts to display the real-time data:
<!DOCTYPE html>
<html>
<head>
<meta charset="UTF-8">
<title>Real-Time Order Analytics Dashboard</title>
<script src="https://cdn.jsdelivr.net/npm/echarts@5.4.3/dist/echarts.min.js"></script>
<script src="https://cdn.jsdelivr.net/npm/sockjs-client@1.5.1/dist/sockjs.min.js"></script>
<script src="https://cdn.jsdelivr.net/npm/stompjs@2.3.3/lib/stomp.min.js"></script>
<style>
body { margin: 0; padding: 0; background-color: #0f1419; color: white; font-family: Arial, sans-serif; }
.container { display: flex; flex-wrap: wrap; padding: 20px; }
.card { flex: 1; min-width: 300px; margin: 10px; padding: 20px; background-color: #1a2129; border-radius: 10px; }
.title { font-size: 20px; font-weight: bold; margin-bottom: 20px; text-align: center; }
#orderCountChart, #totalAmountChart, #hotProductChart { width: 100%; height: 300px; }
</style>
</head>
<body>
<div class="container">
<div class="card">
<div class="title">Orders per Minute</div>
<div id="orderCountChart"></div>
</div>
<div class="card">
<div class="title">Order Amount per Minute</div>
<div id="totalAmountChart"></div>
</div>
<div class="card">
<div class="title">Top 5 Hot Products</div>
<div id="hotProductChart"></div>
</div>
</div>
<script>
// 1. Initialize ECharts
const orderCountChart = echarts.init(document.getElementById('orderCountChart'));
const totalAmountChart = echarts.init(document.getElementById('totalAmountChart'));
const hotProductChart = echarts.init(document.getElementById('hotProductChart'));
// 2. Chart data buffers
let orderCountData = [];
let totalAmountData = [];
let lastWindowStart = null;
// 3. Connect over WebSocket
const socket = new SockJS('/ws/realtime');
const stompClient = Stomp.over(socket);
stompClient.connect({}, function (frame) {
  console.log('WebSocket connected');
  // Subscribe to /topic/realtime
  stompClient.subscribe('/topic/realtime', function (message) {
    const agg = JSON.parse(message.body);
    updateCharts(agg);
  });
});
// 4. Update the charts
function updateCharts(agg) {
  // The server pushes every second but the aggregate changes once per window; skip repeats
  if (agg.windowStart === lastWindowStart) return;
  lastWindowStart = agg.windowStart;
  // Order count (keep the last 10 windows)
  orderCountData.push({ name: agg.windowStart, value: agg.orderCount });
  if (orderCountData.length > 10) orderCountData.shift();
  orderCountChart.setOption({
    xAxis: { type: 'category', data: orderCountData.map(d => d.name) },
    yAxis: { type: 'value' },
    series: [{ type: 'line', data: orderCountData.map(d => d.value), smooth: true, itemStyle: { color: '#00d4ff' } }]
  });
  // Total amount (keep the last 10 windows)
  totalAmountData.push({ name: agg.windowStart, value: agg.totalAmount });
  if (totalAmountData.length > 10) totalAmountData.shift();
  totalAmountChart.setOption({
    xAxis: { type: 'category', data: totalAmountData.map(d => d.name) },
    yAxis: { type: 'value' },
    series: [{ type: 'bar', data: totalAmountData.map(d => d.value), itemStyle: { color: '#ff6b6b' } }]
  });
  // Hot products: hotProducts holds names only, so the bar length is a rank score, not an order count.
  // Reverse a copy so the most ordered product is drawn at the top with the longest bar.
  const ranked = [...(agg.hotProducts || [])].reverse();
  hotProductChart.setOption({
    xAxis: { type: 'value' },
    yAxis: { type: 'category', data: ranked },
    series: [{ type: 'bar', data: ranked.map((p, i) => ({ value: i + 1, name: p })), itemStyle: { color: '#4ecdc4' } }]
  });
}
// 5. Redraw the charts when the window is resized
window.addEventListener('resize', function () {
  orderCountChart.resize();
  totalAmountChart.resize();
  hotProductChart.resize();
});
</script>
</body>
</html>
VI. Database Setup
Create the database and table in MySQL:
CREATE DATABASE IF NOT EXISTS flink_db DEFAULT CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;
USE flink_db;
CREATE TABLE IF NOT EXISTS `order` (
id BIGINT PRIMARY KEY COMMENT 'Order ID',
user_id BIGINT NOT NULL COMMENT 'User ID',
product_id BIGINT NOT NULL COMMENT 'Product ID',
product_name VARCHAR(50) NOT NULL COMMENT 'Product name',
quantity INT NOT NULL COMMENT 'Quantity',
total_amount DECIMAL(10,2) NOT NULL COMMENT 'Total amount',
create_time DATETIME NOT NULL COMMENT 'Creation time',
INDEX idx_create_time (create_time)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='Order table';
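VII. Test Run
1. Start the Flink job
- Build flink-realtime-job: mvn clean package;
- Start the Flink cluster: ./bin/start-cluster.sh;
- Submit the Flink job: ./bin/flink run -c com.example.flink.RealtimeOrderJob flink-realtime-job/target/flink-realtime-job-1.0.0.jar;
- Open the Flink Web UI at http://localhost:8081 to check the job status.
Note that Jedis, Jackson, and the MySQL driver are not on the Flink cluster's classpath by default, so the submitted jar needs to bundle them (for example as a fat jar) or they need to be copied into Flink's lib directory.
2. Start the Spring Boot dashboard
- Run FlinkRealtimeWebApplication in flink-realtime-web;
- Open the dashboard at http://localhost:8080/index.html;
- The ECharts charts update as each window closes, showing the per-minute order count, total amount, and top 5 hot products.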
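VIII. Suggestions for Further Improvement
- Add Kafka: turn the simulated order source into a Kafka producer and have Flink consume from Kafka, decoupling collection from processing;
- Exactly-once semantics: combine Flink, Kafka, and checkpointing so that state is neither lost nor double-counted; end-to-end guarantees also depend on the sinks being idempotent or transactional (overwriting a single Redis key is idempotent, while plain MySQL inserts are not);
- More aggregation metrics: for example hourly sliding windows, or aggregation by user or by region;
- Alerting: have Flink send alerts (email/SMS/DingTalk) when the order count drops sharply or amounts look abnormal;
- Historical queries: expose query endpoints for historical orders with Spring Boot + MyBatis-Plus;
- Dashboard polish: use more ECharts chart types (maps, pie charts, radar charts) to improve the dashboard.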
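To make the alerting suggestion more concrete, a very simple rule is to compare each window's order count with the average of the previous few windows. The plain-Java sketch below is one such rule under stated assumptions: the class name, the history size, and the drop ratio are all hypothetical choices for illustration, and in a real job this logic would live inside a Flink function with the history kept in managed state.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Hypothetical helper for illustration only; not part of the article's project. */
final class OrderDropDetector {
    private final int historySize;   // number of previous windows to average over
    private final double dropRatio;  // alert when current < average * dropRatio
    private final Deque<Long> recentCounts = new ArrayDeque<>();

    OrderDropDetector(int historySize, double dropRatio) {
        this.historySize = historySize;
        this.dropRatio = dropRatio;
    }

    /** Feeds one window's order count; returns true if it is a sharp drop versus recent history. */
    boolean isSuddenDrop(long currentCount) {
        boolean alert = false;
        // Only judge once a full history is available
        if (recentCounts.size() == historySize) {
            double average = recentCounts.stream().mapToLong(Long::longValue).average().orElse(0.0);
            alert = average > 0 && currentCount < average * dropRatio;
            recentCounts.removeFirst();
        }
        recentCounts.addLast(currentCount);
        return alert;
    }

    public static void main(String[] args) {
        OrderDropDetector detector = new OrderDropDetector(3, 0.5);
        long[] perMinuteCounts = {600, 610, 590, 200};
        for (long count : perMinuteCounts) {
            System.out.println(count + " -> alert? " + detector.isSuddenDrop(count));
        }
    }
}
```

The dropped window is still added to the history, so a sustained low level stops alerting after a few windows; whether that is desirable depends on the use case.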
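IX. Summary
Starting from scratch, this article used Flink 1.18.0 + Redis + MySQL + Spring Boot 3.x + WebSocket + ECharts to build a complete real-time order analytics platform, covering:
- Data collection: simulated order generation;
- Real-time processing in Flink: cleaning, filtering, and aggregation over 1-minute tumbling windows;
- Storage: Redis for real-time results, MySQL for historical data;
- Dashboard: real-time pushes over WebSocket and dynamic ECharts charts.
This platform is a foundation for real-time data analysis. I hope it helps you get started quickly with Flink and real-time visualization; feel free to discuss any questions in the comments!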