利用 Flink 实现实时淘客订单流处理与返利状态更新
微赚淘客系统3.0每日接入超80万笔淘宝联盟订单回调,传统批处理模式存在数小时延迟,用户无法及时感知返利到账。我们基于 Apache Flink 构建实时订单处理流水线,从 Kafka 消费原始订单事件,经关联用户、计算佣金、状态去重后写入数据库与通知服务,端到端延迟控制在3秒内。本文著作权归 微赚淘客系统3.0 研发团队,转载请注明出处!大家好,我是 微赚淘客系统3.0 的研发者省赚客!同一订单
·
利用 Flink 实现实时淘客订单流处理与返利状态更新
大家好,我是 微赚淘客系统3.0 的研发者省赚客!
微赚淘客系统3.0每日接入超80万笔淘宝联盟订单回调,传统批处理模式存在数小时延迟,用户无法及时感知返利到账。我们基于 Apache Flink 构建实时订单处理流水线,从 Kafka 消费原始订单事件,经关联用户、计算佣金、状态去重后写入数据库与通知服务,端到端延迟控制在3秒内。
一、订单事件数据源定义
淘宝联盟回调经网关标准化后写入 Kafka:
{
"order_id": "20240515ABCD1234",
"item_id": "675849302193",
"tk_status": "3", // 3=付款,12=确认收货,13=结算成功
"pub_id": "mm_12345678_90123456_78901234",
"adzone_id": "90123456",
"tk_commission": "12.50",
"event_time": 1715769600000
}
Flink 消费 Kafka 主题:
// juwatech.cn.flink.source.TbkOrderSource
public class TbkOrderSource {
public static DataStream<TbkOrderEvent> fromKafka(StreamExecutionEnvironment env) {
Properties props = new Properties();
props.setProperty("bootstrap.servers", "kafka.juwatech.cn:9092");
props.setProperty("group.id", "flink-rebate-processor");
KafkaSource<TbkOrderEvent> source = KafkaSource.<TbkOrderEvent>builder()
.setBootstrapServers("kafka.juwatech.cn:9092")
.setGroupId("flink-rebate-processor")
.setTopics("tbk.order.raw")
.setValueOnlyDeserializer(new JsonDeserializationSchema<>(TbkOrderEvent.class))
.setStartingOffsets(OffsetsInitializer.latest())
.build();
return env.fromSource(source, WatermarkStrategy.noWatermarks(), "TbkOrderSource");
}
}
二、用户维度数据关联
通过 adzone_id 关联内部用户ID(使用 Flink Async I/O):
// juwatech.cn.flink.enrich.UserEnrichFunction
public class UserEnrichFunction extends RichAsyncFunction<TbkOrderEvent, EnrichedOrderEvent> {
private transient JedisCluster jedis;
@Override
public void open(Configuration parameters) {
jedis = new JedisCluster(new HostAndPort("redis.juwatech.cn", 7000));
}
@Override
public void asyncInvoke(TbkOrderEvent input, ResultFuture<EnrichedOrderEvent> resultFuture) {
String userIdStr = jedis.get("adzone:user:" + input.getAdzoneId());
if (userIdStr != null) {
EnrichedOrderEvent enriched = new EnrichedOrderEvent(input);
enriched.setUserId(Long.parseLong(userIdStr));
resultFuture.complete(Collections.singletonList(enriched));
} else {
resultFuture.complete(Collections.emptyList()); // 丢弃无效流量
}
}
}
主流程调用:
DataStream<EnrichedOrderEvent> enrichedStream =
orderStream
.keyBy(event -> event.getAdzoneId())
.asyncFlatMap(new UserEnrichFunction())
.name("UserEnrich");
三、状态去重与幂等处理
同一订单可能多次回调(如状态变更),需按 order_id 去重:
// juwatech.cn.flink.dedup.OrderDedupFunction
public class OrderDedupFunction extends RichFlatMapFunction<EnrichedOrderEvent, ProcessedOrderEvent> {
private transient ValueState<String> latestStatus;
@Override
public void open(Configuration parameters) {
ValueStateDescriptor<String> descriptor =
new ValueStateDescriptor<>("latest_tk_status", String.class);
latestStatus = getRuntimeContext().getState(descriptor);
}
@Override
public void flatMap(EnrichedOrderEvent event, Collector<ProcessedOrderEvent> out) throws Exception {
String currentStatus = latestStatus.value();
if (currentStatus == null || !currentStatus.equals(event.getTkStatus())) {
latestStatus.update(event.getTkStatus());
out.collect(new ProcessedOrderEvent(event));
}
// 若状态未变,丢弃重复事件
}
}
应用去重:
DataStream<ProcessedOrderEvent> dedupedStream =
enrichedStream
.keyBy(event -> event.getOrderId())
.flatMap(new OrderDedupFunction())
.name("OrderDedup");
四、返利金额计算与状态机更新
根据业务规则计算最终返利并更新数据库:
// juwatech.cn.flink.sink.RebateUpdateSink
public class RebateUpdateSink implements SinkFunction<ProcessedOrderEvent> {
private transient DataSource dataSource;
private transient JdbcTemplate jdbcTemplate;
@Override
public void open(Configuration parameters) {
HikariConfig config = new HikariConfig();
config.setJdbcUrl("jdbc:mysql://mysql.juwatech.cn:3306/rebate_db");
config.setUsername("rebate_user");
config.setPassword("******");
dataSource = new HikariDataSource(config);
jdbcTemplate = new JdbcTemplate(dataSource);
}
@Override
public void invoke(ProcessedOrderEvent event, Context context) {
// 1. 计算实际返利(考虑用户等级加成)
BigDecimal finalRebate = calculateRebate(event);
// 2. 插入或更新返利记录
String sql = """
INSERT INTO rebate_record (order_id, user_id, item_id, amount, status, updated_at)
VALUES (?, ?, ?, ?, ?, NOW())
ON DUPLICATE KEY UPDATE
status = VALUES(status),
amount = VALUES(amount),
updated_at = NOW()
""";
jdbcTemplate.update(sql,
event.getOrderId(),
event.getUserId(),
event.getItemId(),
finalRebate,
mapTaoBaoStatus(event.getTkStatus())
);
// 3. 若为结算成功,触发通知
if ("SETTLED".equals(mapTaoBaoStatus(event.getTkStatus()))) {
sendNotification(event.getUserId(), event.getOrderId(), finalRebate);
}
}
private BigDecimal calculateRebate(ProcessedOrderEvent event) {
// 调用规则引擎或直接计算
return event.getTkCommission().multiply(BigDecimal.valueOf(0.9)); // 扣除平台服务费
}
private String mapTaoBaoStatus(String tkStatus) {
return switch (tkStatus) {
case "3" -> "PAID";
case "12" -> "CONFIRMED";
case "13" -> "SETTLED";
default -> "UNKNOWN";
};
}
private void sendNotification(Long userId, String orderId, BigDecimal amount) {
// 发送至通知队列
NotificationMessage msg = new NotificationMessage(userId, "返利到账",
"订单 " + orderId + " 返利 ¥" + amount + " 已到账");
kafkaProducer.send("user.notification", JSON.toJSONString(msg));
}
}
五、作业部署与容错配置
Flink 作业主类:
// juwatech.cn.flink.job.RealtimeRebateJob
public class RealtimeRebateJob {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(10000); // 每10秒 checkpoint
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(5000);
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.of(10, TimeUnit.SECONDS)));
DataStream<TbkOrderEvent> orders = TbkOrderSource.fromKafka(env);
DataStream<EnrichedOrderEvent> enriched = orders
.keyBy(e -> e.getAdzoneId())
.asyncFlatMap(new UserEnrichFunction());
DataStream<ProcessedOrderEvent> processed = enriched
.keyBy(e -> e.getOrderId())
.flatMap(new OrderDedupFunction());
processed.addSink(new RebateUpdateSink()).name("RebateDBSink");
env.execute("Realtime Rebate Processing Job");
}
}
提交命令:
flink run -d \
-p 8 \
-ys 2 \
-yjm 1024m \
-ytm 2048m \
./juwatech-flink-rebate-1.0.jar
本文著作权归 微赚淘客系统3.0 研发团队,转载请注明出处!
更多推荐
所有评论(0)