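Real-time data analysis is a core capability for Industry 4.0 and internet businesses: real-time order statistics in e-commerce, real-time equipment monitoring on production lines, and live dashboards are typical examples. Flink is a top-level Apache project for stream processing, known for low latency, high throughput, and exactly-once state consistency, and it is widely used as the go-to engine for real-time analytics.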

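Starting from scratch, this article uses Flink 1.18.0 + Kafka (optional) + Redis + MySQL + Spring Boot 3.x + WebSocket + ECharts to build a complete real-time order analytics platform. It covers data collection (simulated order generation), real-time cleaning and aggregation in Flink, storage (Redis for real-time results, MySQL for historical data), and a visualization dashboard (ECharts updated through WebSocket pushes).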


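I. Platform Architecture Design

The platform is split into five layers with clear boundaries, which keeps it easy to extend:

[Data collection] → [Message queue (Kafka, optional)] → [Real-time processing (Flink)] → [Storage (Redis/MySQL)] → [Visualization (Spring Boot WebSocket + ECharts)]
  1. Data collection layer: generates simulated e-commerce orders (order ID, user ID, product ID, quantity, amount, time);
  2. Message queue layer: Kafka (optional; it decouples collection from processing and is recommended in production). The code in this article feeds Flink directly from the simulated source;
  3. Real-time processing layer: Flink consumes the orders, cleans and filters them, and aggregates over 1-minute tumbling windows:
    • order count per minute;
    • total order amount per minute;
    • top 5 products per minute;
  4. Storage layer:
    • Redis: holds the latest aggregation result (for fast reads by the dashboard);
    • MySQL: holds the raw orders (for historical queries);
  5. Visualization layer: Spring Boot reads the latest result from Redis and pushes it over WebSocket to the ECharts dashboard, which updates dynamically.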
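
To make the 1-minute tumbling window concrete, here is a minimal plain-Java sketch of how a timestamp maps to its window. It is not Flink code: the class and method names are made up for illustration, and it assumes windows aligned to the epoch with no offset, which is how Flink's tumbling window assigners behave by default.

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;

/** Illustrative only: maps a timestamp to its 1-minute tumbling window. */
final class TumblingWindowSketch {
    static final long WINDOW_SIZE_MS = 60_000L;

    /** Inclusive start of the window containing the given (non-negative) epoch millis. */
    static long windowStart(long epochMillis) {
        return epochMillis - (epochMillis % WINDOW_SIZE_MS);
    }

    /** Exclusive end of the same window. */
    static long windowEnd(long epochMillis) {
        return windowStart(epochMillis) + WINDOW_SIZE_MS;
    }

    public static void main(String[] args) {
        long ts = LocalDateTime.of(2024, 1, 1, 12, 0, 42)
                .toInstant(ZoneOffset.UTC).toEpochMilli();
        // An order seen at 12:00:42 falls into the window [12:00:00, 12:01:00)
        System.out.println(Instant.ofEpochMilli(windowStart(ts))); // 2024-01-01T12:00:00Z
        System.out.println(Instant.ofEpochMilli(windowEnd(ts)));   // 2024-01-01T12:01:00Z
    }
}
```

Because the windows do not overlap, every order is counted in exactly one minute, which is why the per-minute order count and total amount can simply be accumulated per window.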

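II. Technology Choices and Environment Setup

1. Tech stack

Technology     Version   Purpose
Java           17        Programming language
Flink          1.18.0    Real-time computing framework
Kafka          2.8.2     Message queue (optional)
Redis          7.0       Real-time result storage
MySQL          8.0       Historical data storage
Spring Boot    3.2.0     Web framework + WebSocket
ECharts        5.4.3     Dashboard visualization
MyBatis-Plus   3.5.5     Persistence framework

2. Environment setup

  1. Install Java 17
  2. Install Flink 1.18.0: https://flink.apache.org/downloads/
  3. Install Kafka 2.8.2 (optional): https://kafka.apache.org/downloads
  4. Install Redis 7.0
  5. Install MySQL 8.0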

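III. Project Structure and Dependencies

1. Project structure

flink-realtime-platform
├── flink-realtime-job       # Flink real-time processing job
│   └── src/main/java/com/example/flink
│       ├── entity          # Entity classes
│       ├── source          # Data sources (simulated orders / Kafka)
│       ├── sink            # Sinks (Redis / MySQL)
│       └── RealtimeOrderJob.java # Flink main program
├── flink-realtime-web       # Spring Boot dashboard
│   └── src/main/java/com/example/web
│       ├── config          # WebSocket configuration
│       ├── controller      # WebSocket handlers
│       ├── entity          # OrderAgg (same fields as in the job module)
│       ├── service         # Redis service
│       ├── task            # Scheduled push task
│       └── FlinkRealtimeWebApplication.java
└── pom.xml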

2. Parent project dependencies (pom.xml)

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>3.2.0</version>
        <relativePath/>
    </parent>

    <groupId>com.example</groupId>
    <artifactId>flink-realtime-platform</artifactId>
    <version>1.0.0</version>
    <packaging>pom</packaging>
    <modules>
        <module>flink-realtime-job</module>
        <module>flink-realtime-web</module>
    </modules>

    <properties>
        <java.version>17</java.version>
        <flink.version>1.18.0</flink.version>
        <mybatis-plus.version>3.5.5</mybatis-plus.version>
    </properties>

    <dependencyManagement>
        <dependencies>
            <!-- Flink -->
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-java</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-streaming-java</artifactId>
                <version>${flink.version}</version>
            </dependency>
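            <!-- Needed to run the job locally from the IDE -->
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-clients</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <!-- The JDBC connector is released separately from Flink core and follows its own version scheme.
                 Redis is accessed through Jedis, so no Flink Redis connector is declared. -->
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-connector-jdbc</artifactId>
                <version>3.1.2-1.18</version>
            </dependency>
            <!-- MyBatis-Plus (Spring Boot 3 starter) -->
            <dependency>
                <groupId>com.baomidou</groupId>
                <artifactId>mybatis-plus-spring-boot3-starter</artifactId>
                <version>${mybatis-plus.version}</version>
            </dependency>
        </dependencies>
    </dependencyManagement>
</project>

IV. Hands-on 1: Flink Real-Time Processing Job (flink-realtime-job)

1. Module dependencies (flink-realtime-job/pom.xml)

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>com.example</groupId>
        <artifactId>flink-realtime-platform</artifactId>
        <version>1.0.0</version>
    </parent>

    <artifactId>flink-realtime-job</artifactId>

    <dependencies>
        <!-- Flink core: supplied by the cluster at runtime, so it is not bundled into the job jar.
             When running from the IDE, include dependencies with "provided" scope on the classpath. -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <scope>provided</scope>
        </dependency>
        <!-- Flink JDBC connector -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-jdbc</artifactId>
        </dependency>
        <!-- MySQL driver -->
        <dependency>
            <groupId>com.mysql</groupId>
            <artifactId>mysql-connector-j</artifactId>
            <scope>runtime</scope>
        </dependency>
        <!-- Jedis (Redis client) -->
        <dependency>
            <groupId>redis.clients</groupId>
            <artifactId>jedis</artifactId>
            <version>5.1.0</version>
        </dependency>
        <!-- Jackson: used by RedisSink to serialize results as JSON (version managed by the Spring Boot parent) -->
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
        </dependency>
        <!-- Lombok -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <!-- Builds a fat jar so that Jedis, Jackson and the MySQL driver are available on the cluster -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>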
</project>

2. Entity classes (Order.java, OrderAgg.java)

// Order.java
package com.example.flink.entity;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.math.BigDecimal;
import java.time.LocalDateTime;

/**
 * Order entity
 */
@Data
@NoArgsConstructor
@AllArgsConstructor
public class Order {
    private Long id;          // Order ID
    private Long userId;      // User ID
    private Long productId;   // Product ID
    private String productName; // Product name
    private Integer quantity;  // Quantity
    private BigDecimal totalAmount; // Total amount
    private LocalDateTime createTime; // Creation time
}

// OrderAgg.java
package com.example.flink.entity;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.math.BigDecimal;
import java.util.List;

/**
 * Aggregation result for one window of orders
 */
@Data
@NoArgsConstructor
@AllArgsConstructor
public class OrderAgg {
    private String windowStart;   // Window start time
    private Long orderCount;      // Number of orders
    private BigDecimal totalAmount; // Total amount
    private List<String> hotProducts; // Top 5 products, most ordered first
}

3. Simulated order source (OrderSource.java)

package com.example.flink.source;

import com.example.flink.entity.Order;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

import java.math.BigDecimal;
import java.time.LocalDateTime;
import java.util.Random;
import java.util.UUID;

/**
 * Simulated order source: emits 10 orders per second
 */
public class OrderSource implements SourceFunction<Order> {
    private volatile boolean isRunning = true;
    private final Random random = new Random();
    // Simulated product catalog
    private static final String[] PRODUCTS = {"iPhone 15", "MacBook Pro", "AirPods Pro", "iPad Pro", "Apple Watch"};

    @Override
    public void run(SourceContext<Order> ctx) throws Exception {
        while (isRunning) {
            // Generate a burst of 10 orders
            for (int i = 0; i < 10; i++) {
                Order order = new Order();
                // Random 63-bit ID; masking the sign bit keeps it non-negative
                // (Math.abs would still return a negative value for Long.MIN_VALUE)
                order.setId(UUID.randomUUID().getMostSignificantBits() & Long.MAX_VALUE);
                order.setUserId(random.nextLong(1000) + 1);
                int productIndex = random.nextInt(PRODUCTS.length);
                order.setProductId((long) productIndex + 1);
                order.setProductName(PRODUCTS[productIndex]);
                order.setQuantity(random.nextInt(5) + 1);
                order.setTotalAmount(BigDecimal.valueOf(random.nextInt(10000) + 100));
                order.setCreateTime(LocalDateTime.now());
                // Emit the order downstream
                ctx.collect(order);
            }
            // One burst per second
            Thread.sleep(1000);
        }
    }

    @Override
    public void cancel() {
        isRunning = false;
    }
}

4. Flink main program (RealtimeOrderJob.java)

package com.example.flink;

import com.example.flink.entity.Order;
import com.example.flink.entity.OrderAgg;
import com.example.flink.sink.MySQLSink;
import com.example.flink.sink.RedisSink;
import com.example.flink.source.OrderSource;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.math.BigDecimal;
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/**
 * Flink real-time order analysis job
 */
public class RealtimeOrderJob {
    private static final DateTimeFormatter WINDOW_FORMATTER =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss").withZone(ZoneId.systemDefault());

    public static void main(String[] args) throws Exception {
        // 1. Create the streaming execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2); // Default parallelism

        // 2. Add the source: simulated orders
        DataStream<Order> orderStream = env.addSource(new OrderSource());

        // 3. Cleaning: drop orders whose amount is missing or not positive
        SingleOutputStreamOperator<Order> cleanedStream = orderStream.filter(order ->
                order.getTotalAmount() != null && order.getTotalAmount().compareTo(BigDecimal.ZERO) > 0);

        // 4. Write the raw orders to MySQL
        cleanedStream.addSink(new MySQLSink());

        // 5. 1-minute tumbling window: order count, total amount, top products.
        //    windowAll is a non-keyed window, so this operator always runs with parallelism 1.
        SingleOutputStreamOperator<OrderAgg> aggStream = cleanedStream
                .windowAll(TumblingProcessingTimeWindows.of(Time.minutes(1)))
                .aggregate(new OrderAggregateFunction(), new WindowStartFunction());

        // 6. Write the aggregation result to Redis
        aggStream.addSink(new RedisSink());

        // 7. Print the result (for development only)
        aggStream.print("realtime-agg");

        // 8. Launch the job
        env.execute("RealtimeOrderAnalysisJob");
    }

    /**
     * Incrementally aggregates the orders of one window
     */
    private static class OrderAggregateFunction implements AggregateFunction<Order, OrderAggAccumulator, OrderAgg> {
        @Override
        public OrderAggAccumulator createAccumulator() {
            return new OrderAggAccumulator();
        }

        @Override
        public OrderAggAccumulator add(Order order, OrderAggAccumulator acc) {
            acc.orderCount++;
            acc.totalAmount = acc.totalAmount.add(order.getTotalAmount());
            // Count orders per product
            acc.productCountMap.merge(order.getProductName(), 1, Integer::sum);
            return acc;
        }

        @Override
        public OrderAgg getResult(OrderAggAccumulator acc) {
            // Top 5 products by number of orders
            List<String> hotProducts = acc.productCountMap.entrySet().stream()
                    .sorted(Map.Entry.<String, Integer>comparingByValue().reversed())
                    .limit(5)
                    .map(Map.Entry::getKey)
                    .collect(Collectors.toList());
            // windowStart is filled in by WindowStartFunction, which can see the window metadata
            return new OrderAgg(null, acc.orderCount, acc.totalAmount, hotProducts);
        }

        @Override
        public OrderAggAccumulator merge(OrderAggAccumulator a, OrderAggAccumulator b) {
            a.orderCount += b.orderCount;
            a.totalAmount = a.totalAmount.add(b.totalAmount);
            b.productCountMap.forEach((k, v) -> a.productCountMap.merge(k, v, Integer::sum));
            return a;
        }
    }

    /**
     * Stamps the aggregated result with the actual start time of its window
     */
    private static class WindowStartFunction extends ProcessAllWindowFunction<OrderAgg, OrderAgg, TimeWindow> {
        @Override
        public void process(Context context, Iterable<OrderAgg> elements, Collector<OrderAgg> out) {
            // With an incremental aggregate in front, the iterable holds exactly one element
            OrderAgg agg = elements.iterator().next();
            agg.setWindowStart(WINDOW_FORMATTER.format(Instant.ofEpochMilli(context.window().getStart())));
            out.collect(agg);
        }
    }

    /**
     * Accumulator for one window
     */
    private static class OrderAggAccumulator {
        long orderCount = 0L;
        BigDecimal totalAmount = BigDecimal.ZERO;
        Map<String, Integer> productCountMap = new HashMap<>();
    }
}
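
The add / merge / getResult lifecycle of the aggregate function can be tried out without a Flink cluster. The sketch below is plain Java with hypothetical names (OrderAccumulatorSketch, topProducts). Unlike the job above, it keeps the count next to each product name and breaks ties by name; both are assumptions added here so that the output is deterministic and easy to check.

```java
import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Illustrative only: a stand-alone version of the per-window accumulator. */
final class OrderAccumulatorSketch {
    long orderCount = 0L;
    BigDecimal totalAmount = BigDecimal.ZERO;
    final Map<String, Integer> productCounts = new HashMap<>();

    /** Folds one order into the accumulator. */
    void add(String productName, BigDecimal amount) {
        orderCount++;
        totalAmount = totalAmount.add(amount);
        productCounts.merge(productName, 1, Integer::sum);
    }

    /** Combines a partial result from another accumulator into this one. */
    OrderAccumulatorSketch merge(OrderAccumulatorSketch other) {
        orderCount += other.orderCount;
        totalAmount = totalAmount.add(other.totalAmount);
        other.productCounts.forEach((name, count) -> productCounts.merge(name, count, Integer::sum));
        return this;
    }

    /** Top-n products as "name:count", highest count first, ties broken by name. */
    List<String> topProducts(int n) {
        List<Map.Entry<String, Integer>> entries = new ArrayList<>(productCounts.entrySet());
        entries.sort(Map.Entry.<String, Integer>comparingByValue().reversed()
                .thenComparing(Map.Entry.<String, Integer>comparingByKey()));
        List<String> result = new ArrayList<>();
        for (Map.Entry<String, Integer> e : entries.subList(0, Math.min(n, entries.size()))) {
            result.add(e.getKey() + ":" + e.getValue());
        }
        return result;
    }

    public static void main(String[] args) {
        OrderAccumulatorSketch a = new OrderAccumulatorSketch();
        a.add("iPhone 15", new BigDecimal("100"));
        a.add("iPad Pro", new BigDecimal("50"));

        OrderAccumulatorSketch b = new OrderAccumulatorSketch();
        b.add("iPhone 15", new BigDecimal("200"));

        a.merge(b);
        System.out.println(a.orderCount);     // 3
        System.out.println(a.totalAmount);    // 350
        System.out.println(a.topProducts(5)); // [iPhone 15:2, iPad Pro:1]
    }
}
```

Carrying the counts along with the names would also let the dashboard draw bars proportional to real order numbers instead of ranks, at the cost of changing the OrderAgg JSON shape.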

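5. Redis Sink (RedisSink.java)

package com.example.flink.sink;

import com.example.flink.entity.OrderAgg;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;

/**
 * Redis sink: stores the latest aggregation result as JSON.
 * Extending RichSinkFunction gives access to open()/close(), so the connection pool
 * is created once per task and released when the job stops.
 */
public class RedisSink extends RichSinkFunction<OrderAgg> {
    private static final String REDIS_KEY = "realtime:order:agg";

    private transient JedisPool jedisPool;
    private transient ObjectMapper objectMapper;

    @Override
    public void open(Configuration parameters) throws Exception {
        // Initialize the Jedis connection pool
        JedisPoolConfig config = new JedisPoolConfig();
        config.setMaxTotal(10);
        jedisPool = new JedisPool(config, "localhost", 6379);
        objectMapper = new ObjectMapper();
    }

    @Override
    public void invoke(OrderAgg value, Context context) throws Exception {
        try (Jedis jedis = jedisPool.getResource()) {
            // Overwrite the single key realtime:order:agg with the newest window
            String json = objectMapper.writeValueAsString(value);
            jedis.set(REDIS_KEY, json);
            System.out.println("Wrote to Redis: " + json);
        }
    }

    @Override
    public void close() throws Exception {
        if (jedisPool != null) {
            jedisPool.close();
        }
    }
}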

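6. MySQL Sink (MySQLSink.java)

package com.example.flink.sink;

import com.example.flink.entity.Order;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;

/**
 * MySQL sink: writes the raw orders in batches over plain JDBC.
 * A sink built with JdbcSink.sink(...) from flink-connector-jdbc has to be passed to addSink()
 * itself so that Flink manages its lifecycle; because the job attaches this class with
 * new MySQLSink(), the class manages the JDBC connection on its own.
 */
public class MySQLSink extends RichSinkFunction<Order> {
    private static final String INSERT_SQL =
            "INSERT INTO `order` (id, user_id, product_id, product_name, quantity, total_amount, create_time) VALUES (?, ?, ?, ?, ?, ?, ?)";
    private static final int BATCH_SIZE = 100;
    private static final long BATCH_INTERVAL_MS = 1000L;

    private transient Connection connection;
    private transient PreparedStatement statement;
    private transient int pending;
    private transient long lastFlush;

    @Override
    public void open(Configuration parameters) throws Exception {
        Class.forName("com.mysql.cj.jdbc.Driver");
        connection = DriverManager.getConnection(
                "jdbc:mysql://localhost:3306/flink_db?useSSL=false&serverTimezone=Asia/Shanghai",
                "root", "123456");
        statement = connection.prepareStatement(INSERT_SQL);
        lastFlush = System.currentTimeMillis();
    }

    @Override
    public void invoke(Order order, Context context) throws Exception {
        statement.setLong(1, order.getId());
        statement.setLong(2, order.getUserId());
        statement.setLong(3, order.getProductId());
        statement.setString(4, order.getProductName());
        statement.setInt(5, order.getQuantity());
        statement.setBigDecimal(6, order.getTotalAmount());
        statement.setObject(7, order.getCreateTime());
        statement.addBatch();
        pending++;
        // Flush when the batch is full, or when the interval has elapsed (checked as records arrive)
        if (pending >= BATCH_SIZE || System.currentTimeMillis() - lastFlush >= BATCH_INTERVAL_MS) {
            flush();
        }
    }

    private void flush() throws SQLException {
        if (pending > 0) {
            statement.executeBatch();
            pending = 0;
        }
        lastFlush = System.currentTimeMillis();
    }

    @Override
    public void close() throws Exception {
        try {
            if (statement != null) {
                flush(); // Write whatever is still buffered
            }
        } finally {
            if (statement != null) statement.close();
            if (connection != null) connection.close();
        }
    }
}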

V. Hands-on 2: Spring Boot Dashboard (flink-realtime-web)

1. Module dependencies (flink-realtime-web/pom.xml)

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>com.example</groupId>
        <artifactId>flink-realtime-platform</artifactId>
        <version>1.0.0</version>
    </parent>

    <artifactId>flink-realtime-web</artifactId>

    <dependencies>
        <!-- Spring Boot Web -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <!-- Spring Boot WebSocket -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-websocket</artifactId>
        </dependency>
        <!-- Jedis -->
        <dependency>
            <groupId>redis.clients</groupId>
            <artifactId>jedis</artifactId>
            <version>5.1.0</version>
        </dependency>
        <!-- Lombok -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
    </dependencies>
</project>

2. WebSocket configuration (WebSocketConfig.java)

package com.example.web.config;

import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.simp.config.MessageBrokerRegistry;
import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;
import org.springframework.web.socket.config.annotation.StompEndpointRegistry;
import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer;

/**
 * WebSocket configuration using the STOMP protocol
 */
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {
    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        // Register the endpoint that the front end connects to
        registry.addEndpoint("/ws/realtime").withSockJS();
    }

    @Override
    public void configureMessageBroker(MessageBrokerRegistry registry) {
        // Enable the simple in-memory broker; the front end subscribes to /topic/realtime
        registry.enableSimpleBroker("/topic");
        // Prefix for messages sent from the front end to the application
        registry.setApplicationDestinationPrefixes("/app");
    }
}

3. Redis service (RedisService.java)

package com.example.web.service;

import com.example.web.entity.OrderAgg;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.stereotype.Service;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;

/**
 * Redis service: reads the latest aggregation result.
 * com.example.web.entity.OrderAgg is a copy of the OrderAgg class from the job module
 * (same fields), so the JSON written by the Flink job deserializes directly.
 */
@Service
public class RedisService {
    private final JedisPool jedisPool;
    private final ObjectMapper objectMapper = new ObjectMapper();

    public RedisService() {
        // Initialize the Jedis connection pool
        JedisPoolConfig config = new JedisPoolConfig();
        config.setMaxTotal(10);
        this.jedisPool = new JedisPool(config, "localhost", 6379);
    }

    /**
     * Reads the latest aggregation result from Redis; returns null if none is available
     */
    public OrderAgg getRealtimeAgg() {
        try (Jedis jedis = jedisPool.getResource()) {
            String json = jedis.get("realtime:order:agg");
            if (json == null) {
                return null;
            }
            return objectMapper.readValue(json, OrderAgg.class);
        } catch (Exception e) {
            e.printStackTrace();
            return null;
        }
    }
}

4. Scheduled push task (RealtimePushTask.java)

package com.example.web.task;

import com.example.web.entity.OrderAgg;
import com.example.web.service.RedisService;
import org.springframework.messaging.simp.SimpMessagingTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

/**
 * Scheduled push task: reads the latest result from Redis every second and pushes it to the front end.
 * @Scheduled only fires when scheduling is enabled, so FlinkRealtimeWebApplication
 * (or another @Configuration class) must be annotated with @EnableScheduling.
 */
@Component
public class RealtimePushTask {
    private final SimpMessagingTemplate messagingTemplate;
    private final RedisService redisService;

    public RealtimePushTask(SimpMessagingTemplate messagingTemplate, RedisService redisService) {
        this.messagingTemplate = messagingTemplate;
        this.redisService = redisService;
    }

    @Scheduled(fixedRate = 1000) // Runs once per second
    public void pushRealtimeData() {
        OrderAgg agg = redisService.getRealtimeAgg();
        if (agg != null) {
            // Push to subscribers of /topic/realtime
            messagingTemplate.convertAndSend("/topic/realtime", agg);
        }
    }
}

5. Front-end dashboard (index.html)

Create index.html under src/main/resources/static and use ECharts to display the real-time data:

<!DOCTYPE html>
<html>
<head>
    <meta charset="UTF-8">
    <title>Real-Time Order Analytics Dashboard</title>
    <script src="https://cdn.jsdelivr.net/npm/echarts@5.4.3/dist/echarts.min.js"></script>
    <script src="https://cdn.jsdelivr.net/npm/sockjs-client@1.5.1/dist/sockjs.min.js"></script>
    <script src="https://cdn.jsdelivr.net/npm/stompjs@2.3.3/lib/stomp.min.js"></script>
    <style>
        body { margin: 0; padding: 0; background-color: #0f1419; color: white; font-family: Arial, sans-serif; }
        .container { display: flex; flex-wrap: wrap; padding: 20px; }
        .card { flex: 1; min-width: 300px; margin: 10px; padding: 20px; background-color: #1a2129; border-radius: 10px; }
        .title { font-size: 20px; font-weight: bold; margin-bottom: 20px; text-align: center; }
        #orderCountChart, #totalAmountChart, #hotProductChart { width: 100%; height: 300px; }
    </style>
</head>
<body>
    <div class="container">
        <div class="card">
            <div class="title">Orders per minute</div>
            <div id="orderCountChart"></div>
        </div>
        <div class="card">
            <div class="title">Order amount per minute</div>
            <div id="totalAmountChart"></div>
        </div>
        <div class="card">
            <div class="title">Top 5 products</div>
            <div id="hotProductChart"></div>
        </div>
    </div>

    <script>
        // 1. Initialize the ECharts instances
        const orderCountChart = echarts.init(document.getElementById('orderCountChart'));
        const totalAmountChart = echarts.init(document.getElementById('totalAmountChart'));
        const hotProductChart = echarts.init(document.getElementById('hotProductChart'));

        // 2. Chart data buffers (last 10 windows)
        let orderCountData = [];
        let totalAmountData = [];
        let lastWindowStart = null;

        // 3. Connect over WebSocket (SockJS + STOMP)
        const socket = new SockJS('/ws/realtime');
        const stompClient = Stomp.over(socket);
        stompClient.connect({}, function (frame) {
            console.log('WebSocket connected');
            // Subscribe to /topic/realtime
            stompClient.subscribe('/topic/realtime', function (message) {
                const agg = JSON.parse(message.body);
                updateCharts(agg);
            });
        });

        // 4. Update the charts
        function updateCharts(agg) {
            // The server pushes every second, but a new aggregate is produced only once per minute,
            // so a point is appended to the time series only when the window changes.
            if (agg.windowStart !== lastWindowStart) {
                lastWindowStart = agg.windowStart;
                orderCountData.push({ name: agg.windowStart, value: agg.orderCount });
                if (orderCountData.length > 10) orderCountData.shift();
                totalAmountData.push({ name: agg.windowStart, value: agg.totalAmount });
                if (totalAmountData.length > 10) totalAmountData.shift();
            }

            // Order count per window
            orderCountChart.setOption({
                xAxis: { type: 'category', data: orderCountData.map(d => d.name) },
                yAxis: { type: 'value' },
                series: [{ type: 'line', data: orderCountData.map(d => d.value), smooth: true, itemStyle: { color: '#00d4ff' } }]
            });

            // Total amount per window
            totalAmountChart.setOption({
                xAxis: { type: 'category', data: totalAmountData.map(d => d.name) },
                yAxis: { type: 'value' },
                series: [{ type: 'bar', data: totalAmountData.map(d => d.value), itemStyle: { color: '#ff6b6b' } }]
            });

            // Top products. OrderAgg carries only the names (most ordered first), so the bar length
            // is a rank score rather than an order count. The copy is reversed because ECharts
            // draws the first category at the bottom of a horizontal bar chart.
            const ranked = [...(agg.hotProducts || [])].reverse();
            hotProductChart.setOption({
                xAxis: { type: 'value' },
                yAxis: { type: 'category', data: ranked },
                series: [{ type: 'bar', data: ranked.map((p, i) => ({ name: p, value: i + 1 })), itemStyle: { color: '#4ecdc4' } }]
            });
        }

        // 5. Redraw the charts when the browser window is resized
        window.addEventListener('resize', function () {
            orderCountChart.resize();
            totalAmountChart.resize();
            hotProductChart.resize();
        });
    </script>
</body>
</html>

VI. Database Setup

Create the database and table in MySQL:

CREATE DATABASE IF NOT EXISTS flink_db DEFAULT CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;
USE flink_db;

CREATE TABLE IF NOT EXISTS `order` (
    id BIGINT PRIMARY KEY COMMENT 'Order ID',
    user_id BIGINT NOT NULL COMMENT 'User ID',
    product_id BIGINT NOT NULL COMMENT 'Product ID',
    product_name VARCHAR(50) NOT NULL COMMENT 'Product name',
    quantity INT NOT NULL COMMENT 'Quantity',
    total_amount DECIMAL(10,2) NOT NULL COMMENT 'Total amount',
    create_time DATETIME NOT NULL COMMENT 'Creation time',
    INDEX idx_create_time (create_time)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='Order table';

VII. Test Run

1. Start the Flink job

  1. Build flink-realtime-job: mvn clean package
  2. Start the Flink cluster: ./bin/start-cluster.sh
  3. Submit the job: ./bin/flink run -c com.example.flink.RealtimeOrderJob flink-realtime-job/target/flink-realtime-job-1.0.0.jar
  4. Open the Flink Web UI at http://localhost:8081 to check that the job is running.

2. Start the Spring Boot dashboard

  1. Start flink-realtime-web by running FlinkRealtimeWebApplication
  2. Open the dashboard at http://localhost:8080/index.html
  3. The ECharts charts update as new windows complete, showing the order count, total amount, and top 5 products for each minute.

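VIII. Suggestions for Further Improvement

  1. Add Kafka: turn the simulated source into a Kafka producer and let Flink consume from Kafka, decoupling collection from processing;
  2. Exactly-once semantics: combine Flink, Kafka, and checkpointing so that records are neither lost nor duplicated;
  3. More metrics: for example hourly sliding windows, or aggregation by user or by region;
  4. Alerting: have Flink raise an alert (email/SMS/DingTalk) when the order count drops sharply or amounts look abnormal;
  5. Historical queries: expose query endpoints for historical orders with Spring Boot + MyBatis-Plus;
  6. Dashboard polish: add more ECharts chart types (maps, pie charts, radar charts) and refine the layout.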
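
As a starting point for the alerting idea in item 4, the following plain-Java sketch flags a sudden drop by comparing each window's order count with the average of the preceding windows. The class name, the history size, and the 50% threshold are illustrative assumptions; in a real job the check would run on the aggregated stream, and the alert channel (email/SMS/DingTalk) would be wired in separately.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Illustrative only: detects a sharp drop in per-window order counts. */
final class OrderDropDetector {
    private final int historySize;   // number of previous windows to average over
    private final double dropRatio;  // alert when count < average * dropRatio
    private final Deque<Long> history = new ArrayDeque<>();

    OrderDropDetector(int historySize, double dropRatio) {
        this.historySize = historySize;
        this.dropRatio = dropRatio;
    }

    /**
     * Checks the new window against the average of the previous windows,
     * then records it. No alert is raised until the history is full.
     */
    boolean isSuddenDrop(long orderCount) {
        boolean alert = false;
        if (history.size() == historySize) {
            double average = history.stream().mapToLong(Long::longValue).average().orElse(0.0);
            alert = orderCount < average * dropRatio;
        }
        history.addLast(orderCount);
        if (history.size() > historySize) {
            history.removeFirst();
        }
        return alert;
    }

    public static void main(String[] args) {
        OrderDropDetector detector = new OrderDropDetector(3, 0.5);
        long[] countsPerMinute = {600, 610, 590, 200, 580};
        for (long count : countsPerMinute) {
            // Only the fourth window (200 vs. an average of 600) triggers an alert
            System.out.println(count + " -> " + detector.isSuddenDrop(count));
        }
    }
}
```

Note that the dropped window itself enters the history, so a sustained outage lowers the baseline after a few minutes; whether that is desirable depends on how the alerts are meant to be handled.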

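IX. Summary

Starting from scratch, this article used Flink 1.18.0 + Redis + MySQL + Spring Boot 3.x + WebSocket + ECharts to build a complete real-time order analytics platform, covering:

  • Data collection: simulated order generation;
  • Real-time processing in Flink: cleaning, filtering, and 1-minute tumbling window aggregation;
  • Storage: Redis for real-time results, MySQL for historical data;
  • Visualization dashboard: WebSocket pushes with dynamic ECharts rendering.

This platform is a foundation for real-time data analysis. Hopefully it helps you get started quickly with Flink and real-time visualization; feel free to discuss any questions in the comments.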
