Real-Time Taoke (Taobao Affiliate) Order Stream Processing with Flink: Anomaly Detection and Instant Rebate Notification
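Hi everyone, I'm 省赚客, a developer of 微赚淘客系统3.0!
In high-concurrency Taoke scenarios, order validation, commission calculation, and rebate notification must all complete within seconds of the user placing an order. Traditional batch processing suffers from high latency and slow reaction to anomalies. 微赚淘客系统3.0 uses Apache Flink to build a real-time order processing pipeline that covers the whole path with low end-to-end latency: ingesting order events from Kafka, detecting anomalies, computing multi-level rebates, and sending WeChat notifications.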
1. Data source and order event structure
Taobao Alliance (淘宝联盟) pushes orders to Kafka through a webhook. The topic is taobao_order_events, and each message has the following format:
{
  "tradeId": "T20260130142301",
  "userId": 10086,
  "itemId": "675849302",
  "itemTitle": "iPhone 15 保护壳",
  "price": 29.90,
  "commissionRate": 0.10,
  "status": "CREATE",
  "timestamp": 1706611381000,
  "ip": "203.0.113.45"
}
The status field is one of CREATE, SETTLE, or INVALID. Flink consumes this topic and parses each message into a POJO:
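package juwatech.cn.flink.model;

// Order event POJO: public fields plus the implicit no-arg constructor,
// so Flink can treat it as a POJO type.
public class TaobaoOrderEvent {
    public String tradeId;
    public Long userId;
    public String itemId;
    public String itemTitle;
    public Double price;
    public Double commissionRate;
    public String status;      // CREATE / SETTLE / INVALID
    public Long timestamp;     // event time, epoch milliseconds
    public String ip;
}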
2. Main flow of the Flink job
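package juwatech.cn.flink.job;

import java.time.Duration;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.formats.json.JsonDeserializationSchema; // assumed to be the flink-json class
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
// Project classes (TaobaoOrderEvent, RebateContext, RebateRecord, ...) and the
// kafkaProps() helper are not shown in this article; import them from their own packages.

public class RealtimeRebateJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(10000); // checkpoint every 10 seconds

        // 1. Read the order stream from Kafka
        DataStream<TaobaoOrderEvent> orderStream = env
            .addSource(new FlinkKafkaConsumer<>(
                "taobao_order_events",
                new JsonDeserializationSchema<>(TaobaoOrderEvent.class),
                kafkaProps()
            ))
            .name("kafka-source")
            // The event-time window in step 5 needs timestamps and watermarks to fire.
            // The 5-second out-of-orderness bound is an assumed value.
            .assignTimestampsAndWatermarks(
                WatermarkStrategy.<TaobaoOrderEvent>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                    .withTimestampAssigner((event, recordTs) -> event.timestamp));

        // 2. Keep valid orders (only CREATE and SETTLE are processed)
        DataStream<TaobaoOrderEvent> validOrders = orderStream
            .filter(event -> "CREATE".equals(event.status) || "SETTLE".equals(event.status))
            .name("filter-valid-orders");

        // 3. Attach the user's invite chain (closure table preloaded into state)
        DataStream<RebateContext> rebateStream = validOrders
            .keyBy(event -> event.userId)
            .process(new UserInviteEnrichFunction())
            .name("enrich-invite-chain");

        // 4. Compute multi-level rebates
        DataStream<RebateRecord> rebateRecords = rebateStream
            .flatMap(new MultiLevelRebateCalculator())
            .name("calculate-rebates");

        // 5. Anomaly detection: many orders from the same IP in a short time
        DataStream<AbnormalOrderAlert> alerts = orderStream
            .keyBy(event -> event.ip)
            .window(TumblingEventTimeWindows.of(Time.minutes(5)))
            .aggregate(new OrderCountAgg(), new AbnormalIpAlertFunction())
            .name("abnormal-ip-detection");

        // 6. Write rebate records to Kafka
        rebateRecords.addSink(new FlinkKafkaProducer<>(
            "rebate_records",
            new RebateRecordSerializationSchema(),
            kafkaProps()
        )).name("sink-rebates");

        // 7. Send alerts to the WeCom (企业微信) group robot
        alerts.addSink(new WeComAlertSink()).name("sink-alerts");

        env.execute("Realtime Rebate Processing Job");
    }
}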
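The tumbling event-time window in step 5 only emits a result once the watermark passes the end of the window. As a rough illustration of that mechanism, the plain-Java sketch below mimics the bounded-out-of-orderness rule (watermark = largest timestamp seen, minus the bound, minus 1 ms). It is not Flink code; the class name WatermarkSketch and the 5-second bound are assumptions made for this example.

```java
/** Illustrative only: a hand-rolled bounded-out-of-orderness watermark, not a Flink class. */
final class WatermarkSketch {
    private final long boundMillis;
    private long maxTimestamp;

    WatermarkSketch(long boundMillis) {
        this.boundMillis = boundMillis;
        // Chosen so that the watermark before any event is Long.MIN_VALUE without underflow.
        this.maxTimestamp = Long.MIN_VALUE + boundMillis + 1;
    }

    /** Records an event timestamp and returns the current watermark. */
    long onEvent(long eventTimestamp) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
        return maxTimestamp - boundMillis - 1;
    }

    /** A window [start, end) may fire once the watermark reaches end - 1. */
    static boolean windowCanFire(long windowEndExclusive, long watermark) {
        return watermark >= windowEndExclusive - 1;
    }

    public static void main(String[] args) {
        WatermarkSketch wm = new WatermarkSketch(5_000L); // assumed 5 s bound
        long windowEnd = 300_000L;                        // window [0, 300000)
        System.out.println(windowCanFire(windowEnd, wm.onEvent(299_000L))); // false
        System.out.println(windowCanFire(windowEnd, wm.onEvent(305_000L))); // true
    }
}
```

With a 5-second bound, a 5-minute window's alert is therefore delayed until an event at least 5 seconds past the window end arrives on that source.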
3. State management for user invite relationships
A KeyedProcessFunction loads and caches each user's chain of inviters (upstream users):
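package juwatech.cn.flink.function;

import java.util.List;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class UserInviteEnrichFunction
        extends KeyedProcessFunction<Long, TaobaoOrderEvent, RebateContext> {

    // Keyed by userId: one cached invite chain per user.
    private transient ValueState<List<Long>> inviteChainState;

    @Override
    public void open(Configuration parameters) {
        ValueStateDescriptor<List<Long>> descriptor =
            new ValueStateDescriptor<>("inviteChain", TypeInformation.of(new TypeHint<List<Long>>() {}));
        inviteChainState = getRuntimeContext().getState(descriptor);
    }

    @Override
    public void processElement(TaobaoOrderEvent event, Context ctx, Collector<RebateContext> out)
            throws Exception {
        List<Long> chain = inviteChainState.value();
        if (chain == null) {
            // First order seen for this user: load from the MySQL closure table.
            // (In production this should be synced into a dimension table via CDC,
            // since a blocking DB call here stalls the operator.)
            chain = loadInviteChainFromDb(event.userId);
            inviteChainState.update(chain);
        }
        out.collect(new RebateContext(event, chain));
    }

    private List<Long> loadInviteChainFromDb(Long userId) {
        // Calls juwatech.cn.rebate.dao.UserClosureDao: ancestors at depth 1 to 3
        return UserClosureDao.findAncestorsWithinDepth(userId, 1, 3);
    }
}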
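The UserClosureDao used above is not shown in this article. To make the shape of its result concrete, here is a minimal in-memory sketch of a closure-table lookup. It assumes each closure row stores (ancestor, descendant, depth) and that the result is ordered nearest ancestor first, which matches the [parent, grandparent, ...] order the rebate calculator expects. ClosureTableSketch, Row, and the sample user IDs are hypothetical names and data for illustration only.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

/** Hypothetical in-memory stand-in for a closure-table query; not the project's UserClosureDao. */
final class ClosureTableSketch {
    /** One closure-table row: `ancestor` sits `depth` levels above `descendant`. */
    static final class Row {
        final long ancestor;
        final long descendant;
        final int depth;

        Row(long ancestor, long descendant, int depth) {
            this.ancestor = ancestor;
            this.descendant = descendant;
            this.depth = depth;
        }
    }

    /** Returns the ancestors of userId with minDepth <= depth <= maxDepth, nearest first. */
    static List<Long> findAncestorsWithinDepth(List<Row> rows, long userId, int minDepth, int maxDepth) {
        List<Row> matches = new ArrayList<>();
        for (Row r : rows) {
            if (r.descendant == userId && r.depth >= minDepth && r.depth <= maxDepth) {
                matches.add(r);
            }
        }
        matches.sort(Comparator.comparingInt(r -> r.depth));
        List<Long> chain = new ArrayList<>();
        for (Row r : matches) {
            chain.add(r.ancestor);
        }
        return chain;
    }

    public static void main(String[] args) {
        // Made-up invite chain: 1 invited 2, 2 invited 3, 3 invited 4, 4 invited 10086.
        List<Row> rows = Arrays.asList(
            new Row(1, 10086, 4), new Row(4, 10086, 1),
            new Row(3, 10086, 2), new Row(2, 10086, 3));
        System.out.println(findAncestorsWithinDepth(rows, 10086, 1, 3)); // [4, 3, 2]
    }
}
```

User 1 is four levels up and falls outside the depth range, so it receives no rebate.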
4. Multi-level rebate calculation with a FlatMapFunction
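package juwatech.cn.flink.function;

import java.math.BigDecimal;
import java.math.RoundingMode;
import java.util.List;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.util.Collector;

public class MultiLevelRebateCalculator
        implements FlatMapFunction<RebateContext, RebateRecord> {

    // Index = invite level; index 0 is unused, levels 1 to 3 get 10%, 5%, 2% of the commission.
    private static final BigDecimal[] RATES = {
        BigDecimal.ZERO,
        new BigDecimal("0.10"),
        new BigDecimal("0.05"),
        new BigDecimal("0.02")
    };

    @Override
    public void flatMap(RebateContext ctx, Collector<RebateRecord> out) {
        TaobaoOrderEvent order = ctx.order;
        // Multiply as BigDecimal rather than as double to avoid binary floating-point error.
        BigDecimal commission = BigDecimal.valueOf(order.price)
            .multiply(BigDecimal.valueOf(order.commissionRate));
        List<Long> chain = ctx.inviteChain; // [parent, grandparent, ...]
        for (int i = 0; i < chain.size() && i < RATES.length - 1; i++) {
            Long benefiter = chain.get(i);
            BigDecimal rate = RATES[i + 1];
            // Round down to whole cents so the payout never exceeds the commission share.
            BigDecimal amount = commission.multiply(rate).setScale(2, RoundingMode.DOWN);
            out.collect(new RebateRecord(
                order.tradeId,
                order.userId,
                benefiter,
                i + 1,
                amount.doubleValue(),
                System.currentTimeMillis()
            ));
        }
    }
}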
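As a worked example of the calculation above, the standalone sketch below applies the same level rates (10%, 5%, 2%) and the same round-down-to-cents rule to the sample order from section 1 (price 29.90, commission rate 0.10). It takes the inputs as decimal strings to keep the arithmetic exact. RebateSketch and its method are illustrative names, not part of the project.

```java
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.util.ArrayList;
import java.util.List;

/** Standalone illustration of the multi-level rebate arithmetic; not the project's class. */
final class RebateSketch {
    // Level 1, 2, 3 rates, taken from the RATES table in the article.
    private static final BigDecimal[] LEVEL_RATES = {
        new BigDecimal("0.10"), new BigDecimal("0.05"), new BigDecimal("0.02")
    };

    /** Returns the rebate for each level, for an invite chain of the given length. */
    static List<BigDecimal> rebates(String price, String commissionRate, int chainLength) {
        BigDecimal commission = new BigDecimal(price).multiply(new BigDecimal(commissionRate));
        List<BigDecimal> amounts = new ArrayList<>();
        for (int i = 0; i < chainLength && i < LEVEL_RATES.length; i++) {
            amounts.add(commission.multiply(LEVEL_RATES[i]).setScale(2, RoundingMode.DOWN));
        }
        return amounts;
    }

    public static void main(String[] args) {
        // commission = 29.90 * 0.10 = 2.99
        // level 1: 2.99 * 0.10 = 0.2990 -> 0.29
        // level 2: 2.99 * 0.05 = 0.1495 -> 0.14
        // level 3: 2.99 * 0.02 = 0.0598 -> 0.05
        System.out.println(rebates("29.90", "0.10", 3)); // [0.29, 0.14, 0.05]
    }
}
```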
5. Anomaly detection: high-frequency IP alerts
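package juwatech.cn.flink.function;

import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

// Note: the two public classes below belong in separate source files.

// Incrementally counts the orders in each (IP, window) pair.
public class OrderCountAgg implements AggregateFunction<
        TaobaoOrderEvent,
        Long,
        Tuple2<String, Long>> {
    @Override
    public Long createAccumulator() { return 0L; }

    @Override
    public Long add(TaobaoOrderEvent event, Long acc) { return acc + 1; }

    @Override
    public Tuple2<String, Long> getResult(Long acc) {
        // Simplified: the IP is not carried in the accumulator. The window function
        // below receives the real IP as its key, so this placeholder is never read.
        return new Tuple2<>("PLACEHOLDER_IP", acc);
    }

    @Override
    public Long merge(Long a, Long b) { return a + b; }
}

// Receives the single pre-aggregated count per window and emits an alert above the threshold.
public class AbnormalIpAlertFunction
        extends ProcessWindowFunction<Tuple2<String, Long>, AbnormalOrderAlert, String, TimeWindow> {
    @Override
    public void process(String ip, Context context, Iterable<Tuple2<String, Long>> elements,
                        Collector<AbnormalOrderAlert> out) {
        Long count = elements.iterator().next().f1;
        if (count > 50) { // more than 50 orders in 5 minutes counts as abnormal
            out.collect(new AbnormalOrderAlert(ip, count, context.window().getEnd()));
        }
    }
}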
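To show what "keyed by IP, 5-minute tumbling window" means for individual events, here is a small plain-Java sketch that assigns each event to its window by rounding the timestamp down to a multiple of the window size, then counts events per (IP, window) pair. It assumes non-negative epoch-millisecond timestamps and zero window offset. IpWindowCountSketch and the "ip@windowStart" key format are inventions for this example, not how Flink stores window state.

```java
import java.util.HashMap;
import java.util.Map;

/** Illustrative only: manual tumbling-window bucketing, not Flink's window operator. */
final class IpWindowCountSketch {
    static final long WINDOW_MILLIS = 5 * 60 * 1000L;

    /** Start of the 5-minute window containing the timestamp (non-negative, no offset). */
    static long windowStart(long timestampMillis) {
        return timestampMillis - (timestampMillis % WINDOW_MILLIS);
    }

    /** Counts events per (ip, window); the map key is "ip@windowStart". */
    static Map<String, Long> countPerWindow(String[] ips, long[] timestamps) {
        Map<String, Long> counts = new HashMap<>();
        for (int i = 0; i < ips.length; i++) {
            String key = ips[i] + "@" + windowStart(timestamps[i]);
            counts.merge(key, 1L, Long::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        String[] ips = {"203.0.113.45", "203.0.113.45", "203.0.113.45"};
        // The first two events fall in the same window; the third is 5 minutes later.
        long[] ts = {1706611381000L, 1706611382000L, 1706611681000L};
        System.out.println(windowStart(ts[0])); // 1706611200000
        System.out.println(countPerWindow(ips, ts));
    }
}
```

Because tumbling windows have fixed boundaries, a burst that straddles a boundary is split across two windows and may stay under the 50-order threshold in each.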
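6. Output and notification
Rebate records are written to Kafka and consumed by a downstream service that credits user accounts. Anomaly alerts are sent through a WeCom (企业微信) webhook: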
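package juwatech.cn.flink.sink;

import java.util.Date;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;

public class WeComAlertSink implements SinkFunction<AbnormalOrderAlert> {
    private final String webhookUrl = "https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=xxx";

    @Override
    public void invoke(AbnormalOrderAlert alert, Context context) {
        String msg = String.format(
            "[Abnormal order alert]\nIP: %s\nOrders in 5 minutes: %d\nTime: %s",
            alert.ip, alert.count, new Date(alert.windowEnd)
        );
        // Escape backslashes, quotes, and newlines: a raw newline inside a JSON string is invalid.
        String escaped = msg.replace("\\", "\\\\").replace("\"", "\\\"").replace("\n", "\\n");
        // HttpClient.post is the project's own HTTP helper (not shown in this article).
        HttpClient.post(webhookUrl, "{\"msgtype\":\"text\",\"text\":{\"content\":\"" + escaped + "\"}}");
    }
}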
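The webhook body is JSON, so any alert text placed in the content field has to be escaped first; raw newlines or quotes in the message would produce an invalid payload. The sketch below builds the same text-message structure used by the sink, with a small hand-written escaper. WeComPayloadSketch and its methods are hypothetical helpers for illustration; in a real project a JSON library would normally do this.

```java
/** Illustrative helper for building a text-message webhook body; not the project's code. */
final class WeComPayloadSketch {
    /** Escapes a string for use inside a JSON string literal. */
    static String escapeJson(String s) {
        StringBuilder sb = new StringBuilder(s.length() + 16);
        for (int i = 0; i < s.length(); i++) {
            char c = s.charAt(i);
            switch (c) {
                case '"':  sb.append("\\\""); break;
                case '\\': sb.append("\\\\"); break;
                case '\n': sb.append("\\n");  break;
                case '\r': sb.append("\\r");  break;
                case '\t': sb.append("\\t");  break;
                default:
                    if (c < 0x20) {
                        // Remaining control characters use the \\uXXXX form.
                        sb.append(String.format("\\u%04x", (int) c));
                    } else {
                        sb.append(c);
                    }
            }
        }
        return sb.toString();
    }

    /** Wraps the content in the {"msgtype":"text","text":{"content":...}} structure. */
    static String textPayload(String content) {
        return "{\"msgtype\":\"text\",\"text\":{\"content\":\"" + escapeJson(content) + "\"}}";
    }

    public static void main(String[] args) {
        // The newline in the message is emitted as the two characters \ and n.
        System.out.println(textPayload("IP: 203.0.113.45\nOrders: 51"));
    }
}
```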
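In production this Flink job handles peaks of 5000+ TPS with end-to-end latency under 800 ms, which significantly improves both rebate timeliness and risk control.
Copyright of this article belongs to the 微赚淘客系统3.0 development team. Please credit the source when reposting!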