Takeout Catering API Development: Real-Time Statistics and Report Generation for Free-Meal (霸王餐) Campaign Data in Java

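In high-concurrency takeout scenarios, a free-meal (霸王餐) campaign needs second-level statistics on metrics such as redeemed orders, user participation, and merchant subsidies, along with multi-dimensional dynamic reports. This article builds a low-latency, high-throughput statistics system on Flink for real-time computation, Redis for aggregation caching, MyBatis dynamic SQL for queries, and EasyExcel for export.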

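1. Real-Time Stream Ingestion and Preprocessing

Redemption events arrive through Kafka; a Flink job cleans them and writes the results to an intermediate topic: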

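// Flink job (runs outside the Spring context)
import java.math.BigDecimal;
import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema;

public class RedemptionEventProcessor {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(5000); // checkpoint every 5 seconds

        // Kafka connection settings; the broker address and group id are placeholders
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("group.id", "redemption-stat");

        DataStream<RedemptionEvent> stream = env
            // JSONKeyValueDeserializationSchema emits a Jackson ObjectNode with "key" and "value" fields
            .addSource(new FlinkKafkaConsumer<>("redemption-events",
                new JSONKeyValueDeserializationSchema(false), props))
            .map(record -> {
                JsonNode value = record.get("value");
                return new RedemptionEvent(
                    value.get("orderId").asText(),
                    value.get("userId").asText(),
                    value.get("merchantId").asText(),
                    new BigDecimal(value.get("subsidyAmount").asText()),
                    value.get("timestamp").asLong()
                );
            })
            // keep only events that carry a positive subsidy
            .filter(event -> event.getSubsidyAmount().compareTo(BigDecimal.ZERO) > 0);

        // Key by merchant ID and write to the aggregation topic
        // (MerchantAggProcessFunction is not shown here; it must emit String records)
        stream.keyBy(RedemptionEvent::getMerchantId)
              .process(new MerchantAggProcessFunction())
              .addSink(new FlinkKafkaProducer<>("merchant-agg", new SimpleStringSchema(), props));

        env.execute("Redemption Aggregation");
    }
}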
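
The job above delegates the per-merchant work to MerchantAggProcessFunction, which the article does not show. As a rough, framework-free sketch of the kind of running totals such a function might keep per key, the class below folds events into a subsidy sum and an order count. The class name, the HashMap standing in for keyed state, and the comma-separated output format are all assumptions made for illustration, not the author's implementation.

```java
import java.math.BigDecimal;
import java.util.HashMap;
import java.util.Map;

class MerchantAggSketch {
    /** Running totals for one merchant. */
    static final class Totals {
        BigDecimal subsidy = BigDecimal.ZERO;
        long orders = 0;
    }

    // Stand-in for Flink keyed state: one Totals entry per merchant ID (assumption)
    private final Map<String, Totals> state = new HashMap<>();

    /** Folds one event into the merchant's totals and returns the updated record. */
    String accept(String merchantId, BigDecimal subsidyAmount) {
        Totals t = state.computeIfAbsent(merchantId, k -> new Totals());
        t.subsidy = t.subsidy.add(subsidyAmount);
        t.orders++;
        // Hypothetical output format: merchantId,totalSubsidy,orderCount
        return merchantId + "," + t.subsidy.toPlainString() + "," + t.orders;
    }

    public static void main(String[] args) {
        MerchantAggSketch agg = new MerchantAggSketch();
        System.out.println(agg.accept("m1", new BigDecimal("12.50"))); // m1,12.50,1
        System.out.println(agg.accept("m1", new BigDecimal("7.50")));  // m1,20.00,2
        System.out.println(agg.accept("m2", new BigDecimal("3.00")));  // m2,3.00,1
    }
}
```

In a real Flink job the map would be replaced by keyed state so that the totals survive checkpoints and restarts.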


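2. Redis Real-Time Aggregation Cache Design

Redis Hashes store the minute-level (5-minute buckets in the code below), hourly, and daily metrics: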

package baodanbao.com.cn.catering.stat;

import java.math.BigDecimal;
import java.time.Instant;
import java.time.LocalDate;
import java.time.ZoneId;
import java.time.ZonedDateTime;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Service;

@Service
public class RealTimeStatService {

    @Autowired
    private StringRedisTemplate redisTemplate;

    private static final String KEY_PREFIX = "baodan:stat:merchant:";

    // userId is passed in with the event; a stream consumer has no request context to read it from
    public void updateMerchantStat(String merchantId, String userId, BigDecimal amount, long timestamp) {
        ZonedDateTime time = Instant.ofEpochMilli(timestamp).atZone(ZoneId.systemDefault());
        String dateKey = time.toLocalDate().toString();
        String hourKey = dateKey + ":" + time.getHour();
        String minuteKey = hourKey + ":" + (time.getMinute() / 5 * 5); // 5-minute granularity

        // Hash increments use doubles; store integer cents instead if exact amounts are required
        double delta = amount.doubleValue();

        // Daily total
        redisTemplate.opsForHash().increment(KEY_PREFIX + merchantId + ":daily", dateKey, delta);
        // Hourly total
        redisTemplate.opsForHash().increment(KEY_PREFIX + merchantId + ":hourly", hourKey, delta);
        // 5-minute total
        redisTemplate.opsForHash().increment(KEY_PREFIX + merchantId + ":min5", minuteKey, delta);

        // Distinct participants per day (the Set deduplicates user IDs)
        redisTemplate.opsForSet().add(KEY_PREFIX + merchantId + ":users:daily:" + dateKey, userId);
    }

    public MerchantStat getTodayStat(String merchantId) {
        String today = LocalDate.now().toString();
        // StringRedisTemplate returns hash values as String, so parse the value instead of casting it
        Object raw = redisTemplate.opsForHash().get(KEY_PREFIX + merchantId + ":daily", today);
        double amount = raw == null ? 0.0 : Double.parseDouble(raw.toString());
        Long userCount = redisTemplate.opsForSet().size(KEY_PREFIX + merchantId + ":users:daily:" + today);
        return new MerchantStat(amount, userCount == null ? 0L : userCount);
    }
}
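
The hash field names are derived from the event timestamp, so the time zone used for the conversion decides which bucket an event lands in. The following stand-alone sketch reproduces the same key layout (date, date:hour, date:hour:5-minute-floor) with an explicit zone instead of the system default; the class name StatBucketKeys and the sample timestamp are illustrative assumptions.

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZonedDateTime;

class StatBucketKeys {
    final String day;
    final String hour;
    final String min5;

    StatBucketKeys(long epochMillis, ZoneId zone) {
        ZonedDateTime t = Instant.ofEpochMilli(epochMillis).atZone(zone);
        day = t.toLocalDate().toString();            // e.g. 2024-03-15
        hour = day + ":" + t.getHour();              // hour is not zero-padded
        min5 = hour + ":" + (t.getMinute() / 5 * 5); // minute floored to a multiple of 5
    }

    public static void main(String[] args) {
        long ts = Instant.parse("2024-03-15T17:02:00Z").toEpochMilli();
        StatBucketKeys utc = new StatBucketKeys(ts, ZoneId.of("UTC"));
        StatBucketKeys cst = new StatBucketKeys(ts, ZoneId.of("Asia/Shanghai"));
        System.out.println(utc.day + " | " + utc.hour + " | " + utc.min5); // 2024-03-15 | 2024-03-15:17 | 2024-03-15:17:0
        System.out.println(cst.day + " | " + cst.hour + " | " + cst.min5); // 2024-03-16 | 2024-03-16:1 | 2024-03-16:1:0
    }
}
```

The same instant falls on different days in the two zones, so writers and readers of these keys need to agree on one zone; pinning it explicitly is one way to avoid depending on each server's default.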

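3. Multi-Dimensional Queries with Dynamic SQL

Filters on time, region, and merchant type can be combined freely; the mapper below covers a time range, a region, and a list of merchant IDs: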

@Mapper
public interface RedemptionReportMapper {

    @Select({
        "<script>",
        "SELECT",
        "  DATE(r.create_time) AS stat_date,",
        "  r.merchant_id,",
        "  m.region,",
        "  SUM(r.subsidy_amount) AS total_subsidy,",
        "  COUNT(*) AS order_count",
        "FROM redemption_record r",
        "JOIN merchant m ON r.merchant_id = m.id",
        "WHERE r.status = 'SUCCESS'",
        "<if test='startTime != null'>AND r.create_time >= #{startTime}</if>",
        "<if test='endTime != null'>AND r.create_time &lt;= #{endTime}</if>",
        "<if test='region != null'>AND m.region = #{region}</if>",
        "<if test='merchantIds != null and !merchantIds.isEmpty()'>",
        "  AND r.merchant_id IN",
        "  <foreach item='id' collection='merchantIds' open='(' separator=',' close=')'>#{id}</foreach>",
        "</if>",
        "GROUP BY DATE(r.create_time), r.merchant_id, m.region",
        "ORDER BY stat_date DESC",
        "</script>"
    })
    List<RedemptionDailyStat> queryDailyStats(@Param("startTime") LocalDateTime startTime,
                                              @Param("endTime") LocalDateTime endTime,
                                              @Param("region") String region,
                                              @Param("merchantIds") List<String> merchantIds);
}
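
To make the effect of the conditional fragments concrete, the sketch below mimics in plain Java how the WHERE clause grows as filters are supplied. It is only an illustration of the idea: MyBatis evaluates the script itself, the class and method names here are hypothetical, and the exact whitespace MyBatis emits may differ.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

class DailyStatWhereSketch {
    /** Builds the WHERE clause the way the <if>/<foreach> fragments would, with ? placeholders. */
    static String whereClause(Object startTime, Object endTime, String region, List<String> merchantIds) {
        StringBuilder sb = new StringBuilder("WHERE r.status = 'SUCCESS'");
        if (startTime != null) sb.append(" AND r.create_time >= ?");
        if (endTime != null) sb.append(" AND r.create_time <= ?");
        if (region != null) sb.append(" AND m.region = ?");
        if (merchantIds != null && !merchantIds.isEmpty()) {
            // One placeholder per ID, like <foreach open='(' separator=',' close=')'>
            sb.append(" AND r.merchant_id IN (")
              .append(String.join(",", Collections.nCopies(merchantIds.size(), "?")))
              .append(")");
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        // No filters: only the fixed status condition remains
        System.out.println(whereClause(null, null, null, null));
        // Start time, region, and two merchant IDs
        System.out.println(whereClause("2024-03-01T00:00", null, "east", Arrays.asList("m1", "m2")));
    }
}
```

Because every value is bound through a placeholder (#{...} in the mapper), the filters can be combined without concatenating user input into the SQL text.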

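4. Asynchronous Report Generation and Download

Running the export in the background keeps large queries from blocking the request thread: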

@RestController
public class ReportExportController {

    // The mapper is an instance bean, so it is injected rather than called statically
    @Autowired
    private baodanbao.com.cn.catering.mapper.RedemptionReportMapper reportMapper;

    // One shared pool for all export requests; creating a pool per request would leak threads
    private final ExecutorService exportPool = Executors.newFixedThreadPool(2);

    @PostMapping("/api/report/export")
    public ResponseEntity<String> triggerExport(@RequestBody ReportExportRequest request) {
        String taskId = UUID.randomUUID().toString();
        CompletableFuture.runAsync(() -> {
            try {
                List<RedemptionDailyStat> data = reportMapper
                    .queryDailyStats(request.getStartTime(), request.getEndTime(), request.getRegion(), request.getMerchantIds());

                String filePath = "/tmp/report_" + taskId + ".xlsx";
                EasyExcel.write(filePath, RedemptionDailyStat.class)
                         .sheet("霸王餐日报") // sheet name: "free-meal daily report"
                         .doWrite(data);

                // Upload to OSS and record the task status
                String ossUrl = baodanbao.com.cn.catering.storage.OssClient.upload(filePath);
                baodanbao.com.cn.catering.task.ExportTaskRegistry.markSuccess(taskId, ossUrl);
            } catch (Exception e) {
                baodanbao.com.cn.catering.task.ExportTaskRegistry.markFailed(taskId, e.getMessage());
            }
        }, exportPool);
        // Return the task ID immediately; the client polls the status endpoint below
        return ResponseEntity.ok(taskId);
    }

    @GetMapping("/api/report/status/{taskId}")
    public ResponseEntity<ExportStatus> getExportStatus(@PathVariable String taskId) {
        return ResponseEntity.ok(baodanbao.com.cn.catering.task.ExportTaskRegistry.getStatus(taskId));
    }
}
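
ExportTaskRegistry is not shown in the article. The following framework-free sketch illustrates the submit-then-poll pattern it supports, using an in-memory map for task status and a single shared thread pool. Everything here (class name, status strings, the oss://demo URL) is a hypothetical stand-in; a production registry would more likely live in Redis or a database so that status survives restarts and is visible across instances.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

class ExportTaskSketch {
    // Shared pool for all export jobs
    private final ExecutorService pool = Executors.newFixedThreadPool(2);
    // taskId -> "PENDING", "SUCCESS:<url>" or "FAILED:<message>" (illustrative encoding)
    private final Map<String, String> status = new ConcurrentHashMap<>();

    /** Registers the task as PENDING, runs the job in the background, and returns the id at once. */
    String submit(Supplier<String> job) {
        String taskId = UUID.randomUUID().toString();
        status.put(taskId, "PENDING");
        pool.submit(() -> {
            try {
                status.put(taskId, "SUCCESS:" + job.get());
            } catch (RuntimeException e) {
                status.put(taskId, "FAILED:" + e.getMessage());
            }
        });
        return taskId;
    }

    String statusOf(String taskId) {
        return status.getOrDefault(taskId, "UNKNOWN");
    }

    /** Stops accepting jobs and waits for the running ones to finish. */
    void shutdownAndWait() throws InterruptedException {
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
    }

    public static void main(String[] args) throws InterruptedException {
        ExportTaskSketch tasks = new ExportTaskSketch();
        String ok = tasks.submit(() -> "oss://demo/report.xlsx");
        String bad = tasks.submit(() -> { throw new IllegalStateException("boom"); });
        tasks.shutdownAndWait();
        System.out.println(tasks.statusOf(ok));
        System.out.println(tasks.statusOf(bad));
    }
}
```

Recording PENDING before the job starts means a client that polls right after receiving the task ID gets a defined answer instead of an unknown task.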

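5. Cache Pre-Warming for High-Frequency Metrics

Shortly after midnight each day, the full data for the previous day is precomputed: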

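@Component
public class DailyStatPreWarmer {

    @Autowired
    private StringRedisTemplate redisTemplate;

    // Assumed to be a Spring bean; if it only exposes static methods, call it directly instead
    @Autowired
    private baodanbao.com.cn.catering.service.StatisticalService statisticalService;

    @Scheduled(cron = "0 5 0 * * ?") // runs every day at 00:05
    public void warmYesterdayStat() {
        LocalDate yesterday = LocalDate.now().minusDays(1);
        List<MerchantSummary> summaries = statisticalService.computeMerchantSummary(yesterday);

        for (MerchantSummary s : summaries) {
            String key = "baodan:stat:merchant:" + s.getMerchantId() + ":daily:" + yesterday;
            // Cache the summary as JSON and keep it for 7 days
            redisTemplate.opsForValue().set(key, JSON.toJSONString(s), 7, TimeUnit.DAYS);
        }
    }
}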

When the front end queries yesterday's data, it reads this cache first, which cuts response time from seconds to milliseconds.
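
The read side of this pre-warming is not shown in the article. A minimal cache-first lookup might look like the sketch below, which uses a ConcurrentHashMap as a stand-in for Redis and a Function as a stand-in for the slow database aggregation. The class name, the loader, and the call counter are assumptions added for illustration; only the key format is taken from the pre-warmer above.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

class YesterdayStatReader {
    private final Map<String, String> cache = new ConcurrentHashMap<>(); // stand-in for Redis
    private final Function<String, String> slowLoader;                   // stand-in for the DB query
    int loaderCalls = 0;                                                 // counts fallbacks, for the demo only

    YesterdayStatReader(Function<String, String> slowLoader) {
        this.slowLoader = slowLoader;
    }

    /** Returns the cached JSON if present; otherwise loads it once and caches it. */
    String read(String merchantId, String date) {
        String key = "baodan:stat:merchant:" + merchantId + ":daily:" + date;
        String cached = cache.get(key);
        if (cached != null) {
            return cached; // fast path: served from the cache
        }
        loaderCalls++;
        String fresh = slowLoader.apply(key);
        cache.put(key, fresh);
        return fresh;
    }

    public static void main(String[] args) {
        YesterdayStatReader reader = new YesterdayStatReader(k -> "{\"key\":\"" + k + "\"}");
        reader.read("m1", "2024-03-14");
        reader.read("m1", "2024-03-14");
        System.out.println("loader calls: " + reader.loaderCalls); // loader calls: 1
    }
}
```

The fallback path matters because the scheduled job can fail or a key can expire; without it, a cache miss would surface as missing data rather than a slower response.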

Copyright of this article belongs to 俱美开放平台; please credit the source when reposting.
