外卖餐饮API开发:Java实现霸王餐数据的实时统计与报表生成技巧
在高并发外卖场景中,霸王餐活动需对核销订单、用户参与、商家补贴等指标进行秒级统计,并支持多维度动态报表。本文基于Flink实时计算、Redis聚合缓存、MyBatis动态SQL及EasyExcel导出,构建低延迟、高吞吐的统计系统。前端查询昨日数据时优先读取该缓存,响应时间从秒级降至毫秒级。本文著作权归 俱美开放平台 ,转载请注明出处!
·
外卖餐饮API开发:Java实现霸王餐数据的实时统计与报表生成技巧
在高并发外卖场景中,霸王餐活动需对核销订单、用户参与、商家补贴等指标进行秒级统计,并支持多维度动态报表。本文基于Flink实时计算、Redis聚合缓存、MyBatis动态SQL及EasyExcel导出,构建低延迟、高吞吐的统计系统。
1. 实时数据流接入与预处理
通过Kafka接收核销事件,Flink作业清洗并写入中间Topic:
// Flink作业(非Spring上下文)
public class RedemptionEventProcessor {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(5000);
DataStream<RedemptionEvent> stream = env
.addSource(new FlinkKafkaConsumer<>("redemption-events",
new JSONKeyValueDeserializationSchema(), props))
.map(record -> {
JSONObject value = record.getObject("value");
return new RedemptionEvent(
value.getString("orderId"),
value.getString("userId"),
value.getString("merchantId"),
value.getBigDecimal("subsidyAmount"),
value.getLong("timestamp")
);
})
.filter(event -> event.getSubsidyAmount().compareTo(BigDecimal.ZERO) > 0);
// 按商户ID分组,写入聚合Topic
stream.keyBy(RedemptionEvent::getMerchantId)
.process(new MerchantAggProcessFunction())
.addSink(new FlinkKafkaProducer<>("merchant-agg", new SimpleStringSchema(), props));
env.execute("Redemption Aggregation");
}
}

2. Redis实时聚合缓存设计
使用Hash结构存储分钟级、小时级、日级指标:
package baodanbao.com.cn.catering.stat;
@Service
public class RealTimeStatService {
@Autowired
private StringRedisTemplate redisTemplate;
private static final String KEY_PREFIX = "baodan:stat:merchant:";
public void updateMerchantStat(String merchantId, BigDecimal amount, long timestamp) {
String dateKey = LocalDate.ofInstant(Instant.ofEpochMilli(timestamp), ZoneId.systemDefault()).toString();
String hourKey = dateKey + ":" + LocalDateTime.ofInstant(Instant.ofEpochMilli(timestamp), ZoneId.systemDefault()).getHour();
String minuteKey = hourKey + ":" + (LocalDateTime.ofInstant(Instant.ofEpochMilli(timestamp), ZoneId.systemDefault()).getMinute() / 5 * 5); // 5分钟粒度
// 日汇总
redisTemplate.opsForHash().increment(KEY_PREFIX + merchantId + ":daily", dateKey, amount.doubleValue());
// 小时汇总
redisTemplate.opsForHash().increment(KEY_PREFIX + merchantId + ":hourly", hourKey, amount.doubleValue());
// 5分钟汇总
redisTemplate.opsForHash().increment(KEY_PREFIX + merchantId + ":min5", minuteKey, amount.doubleValue());
// 总参与人数(Set去重)
String userId = getCurrentUserId(); // 从上下文获取
redisTemplate.opsForSet().add(KEY_PREFIX + merchantId + ":users:daily:" + dateKey, userId);
}
public MerchantStat getTodayStat(String merchantId) {
String today = LocalDate.now().toString();
Double amount = (Double) redisTemplate.opsForHash().get(KEY_PREFIX + merchantId + ":daily", today);
Long userCount = redisTemplate.opsForSet().size(KEY_PREFIX + merchantId + ":users:daily:" + today);
return new MerchantStat(amount == null ? 0.0 : amount, userCount == null ? 0L : userCount);
}
}
3. 动态SQL构建多维查询
支持按时间、区域、商户类型组合筛选:
@Mapper
public interface RedemptionReportMapper {
@Select({
"<script>",
"SELECT",
" DATE(r.create_time) AS stat_date,",
" r.merchant_id,",
" m.region,",
" SUM(r.subsidy_amount) AS total_subsidy,",
" COUNT(*) AS order_count",
"FROM redemption_record r",
"JOIN merchant m ON r.merchant_id = m.id",
"WHERE r.status = 'SUCCESS'",
"<if test='startTime != null'>AND r.create_time >= #{startTime}</if>",
"<if test='endTime != null'>AND r.create_time <= #{endTime}</if>",
"<if test='region != null'>AND m.region = #{region}</if>",
"<if test='merchantIds != null and !merchantIds.isEmpty()'>",
" AND r.merchant_id IN",
" <foreach item='id' collection='merchantIds' open='(' separator=',' close=')'>#{id}</foreach>",
"</if>",
"GROUP BY DATE(r.create_time), r.merchant_id, m.region",
"ORDER BY stat_date DESC",
"</script>"
})
List<RedemptionDailyStat> queryDailyStats(@Param("startTime") LocalDateTime startTime,
@Param("endTime") LocalDateTime endTime,
@Param("region") String region,
@Param("merchantIds") List<String> merchantIds);
}
4. 报表异步生成与下载
避免大查询阻塞主线程:
@RestController
public class ReportExportController {
@PostMapping("/api/report/export")
public ResponseEntity<String> triggerExport(@RequestBody ReportExportRequest request) {
String taskId = UUID.randomUUID().toString();
CompletableFuture.runAsync(() -> {
try {
List<RedemptionDailyStat> data = baodanbao.com.cn.catering.mapper.RedemptionReportMapper
.queryDailyStats(request.getStartTime(), request.getEndTime(), request.getRegion(), request.getMerchantIds());
String filePath = "/tmp/report_" + taskId + ".xlsx";
EasyExcel.write(filePath, RedemptionDailyStat.class)
.sheet("霸王餐日报")
.doWrite(data);
// 上传至OSS并记录状态
String ossUrl = baodanbao.com.cn.catering.storage.OssClient.upload(filePath);
baodanbao.com.cn.catering.task.ExportTaskRegistry.markSuccess(taskId, ossUrl);
} catch (Exception e) {
baodanbao.com.cn.catering.task.ExportTaskRegistry.markFailed(taskId, e.getMessage());
}
}, Executors.newFixedThreadPool(2));
return ResponseEntity.ok(taskId);
}
@GetMapping("/api/report/status/{taskId}")
public ResponseEntity<ExportStatus> getExportStatus(@PathVariable String taskId) {
return ResponseEntity.ok(baodanbao.com.cn.catering.task.ExportTaskRegistry.getStatus(taskId));
}
}
5. 高频指标缓存预热
每日凌晨预计算昨日全量数据:
@Component
public class DailyStatPreWarmer {
@Scheduled(cron = "0 5 0 * * ?") // 每天00:05执行
public void warmYesterdayStat() {
LocalDate yesterday = LocalDate.now().minusDays(1);
List<MerchantSummary> summaries = baodanbao.com.cn.catering.service.StatisticalService
.computeMerchantSummary(yesterday);
for (MerchantSummary s : summaries) {
String key = "baodan:stat:merchant:" + s.getMerchantId() + ":daily:" + yesterday;
redisTemplate.opsForValue().set(key, JSON.toJSONString(s), 7, TimeUnit.DAYS);
}
}
}
前端查询昨日数据时优先读取该缓存,响应时间从秒级降至毫秒级。
本文著作权归 俱美开放平台 ,转载请注明出处!
更多推荐
所有评论(0)