Real-Time Taoke Order Stream Processing and Rebate Status Updates with Flink

Hi everyone, I'm 省赚客, the developer of 微赚淘客系统3.0!

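微赚淘客系统3.0 ingests more than 800,000 Taobao Alliance (淘宝联盟) order callbacks per day. Under the traditional batch-processing model, results lagged by several hours, so users could not see promptly that a rebate had been credited. We built a real-time order pipeline on Apache Flink: it consumes raw order events from Kafka, maps each order to a user, calculates the commission, deduplicates by status, and writes to the database and the notification service, keeping end-to-end latency within 3 seconds.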

1. Order Event Source Definition

Taobao Alliance callbacks are normalized by the gateway and then written to Kafka:

{
  "order_id": "20240515ABCD1234",
  "item_id": "675849302193",
  "tk_status": "3", // 3 = paid, 12 = receipt confirmed, 13 = settled
  "pub_id": "mm_12345678_90123456_78901234",
  "adzone_id": "90123456",
  "tk_commission": "12.50",
  "event_time": 1715769600000
}
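
In the sample event, `tk_commission` is a string and `event_time` is in epoch milliseconds. As a minimal sketch, assuming the event class exposes typed values, the two fields could be converted as below. `TbkOrderFields` and its method names are hypothetical helpers for illustration, not part of the original system.

```java
import java.math.BigDecimal;
import java.time.Instant;

// Hypothetical helper: class and method names are illustrative assumptions.
final class TbkOrderFields {

    // Parse the commission from its string form so no precision is lost to double.
    static BigDecimal parseCommission(String raw) {
        return new BigDecimal(raw);
    }

    // event_time is expressed in epoch milliseconds.
    static Instant toInstant(long eventTimeMillis) {
        return Instant.ofEpochMilli(eventTimeMillis);
    }

    public static void main(String[] args) {
        System.out.println(parseCommission("12.50"));  // 12.50
        System.out.println(toInstant(1715769600000L)); // 2024-05-15T10:40:00Z
    }
}
```

Keeping the amount as a `BigDecimal` from the start avoids binary floating-point rounding when the rebate is calculated later.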

Flink consumes the Kafka topic:

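// juwatech.cn.flink.source.TbkOrderSource
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.formats.json.JsonDeserializationSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class TbkOrderSource {
    public static DataStream<TbkOrderEvent> fromKafka(StreamExecutionEnvironment env) {
        // Connection settings go on the builder; a separate Properties object is not needed
        KafkaSource<TbkOrderEvent> source = KafkaSource.<TbkOrderEvent>builder()
            .setBootstrapServers("kafka.juwatech.cn:9092")
            .setGroupId("flink-rebate-processor")
            .setTopics("tbk.order.raw")
            .setValueOnlyDeserializer(new JsonDeserializationSchema<>(TbkOrderEvent.class))
            // A fresh start reads from the latest offsets; a restore resumes from checkpointed offsets
            .setStartingOffsets(OffsetsInitializer.latest())
            .build();

        // The pipeline has no event-time operators, so no watermarks are generated
        return env.fromSource(source, WatermarkStrategy.noWatermarks(), "TbkOrderSource");
    }
}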

2. User Dimension Enrichment

The adzone_id is mapped to the internal user ID (using Flink Async I/O):

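// juwatech.cn.flink.enrich.UserEnrichFunction
import java.util.Collections;
import java.util.concurrent.*;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.async.*;
import redis.clients.jedis.*;

public class UserEnrichFunction extends RichAsyncFunction<TbkOrderEvent, EnrichedOrderEvent> {

    private transient JedisCluster jedis;
    private transient ExecutorService lookupPool;

    @Override
    public void open(Configuration parameters) {
        jedis = new JedisCluster(new HostAndPort("redis.juwatech.cn", 7000));
        // Jedis calls block, so they run on a dedicated pool (the size of 16 is an assumed value)
        lookupPool = Executors.newFixedThreadPool(16);
    }

    @Override
    public void asyncInvoke(TbkOrderEvent input, ResultFuture<EnrichedOrderEvent> resultFuture) {
        CompletableFuture
            .supplyAsync(() -> jedis.get("adzone:user:" + input.getAdzoneId()), lookupPool)
            .whenComplete((userIdStr, error) -> {
                if (error != null) {
                    resultFuture.completeExceptionally(error);
                } else if (userIdStr == null) {
                    resultFuture.complete(Collections.emptyList()); // drop invalid traffic
                } else {
                    EnrichedOrderEvent enriched = new EnrichedOrderEvent(input);
                    enriched.setUserId(Long.parseLong(userIdStr));
                    resultFuture.complete(Collections.singletonList(enriched));
                }
            });
    }

    @Override
    public void close() throws Exception {
        lookupPool.shutdown();
        jedis.close();
    }
}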

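Wiring it into the main pipeline:

// Async functions are attached through AsyncDataStream (no keyBy needed);
// the 1 s timeout and capacity of 100 are assumed values
DataStream<EnrichedOrderEvent> enrichedStream =
    AsyncDataStream
        .unorderedWait(orderStream, new UserEnrichFunction(), 1000, TimeUnit.MILLISECONDS, 100)
        .name("UserEnrich");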
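
Flink's Async I/O operator expects `asyncInvoke` to return immediately and to complete the `ResultFuture` later from a callback. The plain-Java sketch below shows that pattern without Flink: the lookup runs on a thread pool and the result arrives through a `CompletableFuture`. The class name `AsyncLookupSketch` is hypothetical, and an in-memory map stands in for Redis.

```java
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Plain-Java illustration: names are hypothetical and the map stands in for Redis.
final class AsyncLookupSketch {

    // Returns immediately; the lookup itself runs on the supplied pool.
    static CompletableFuture<List<Long>> lookupUserId(
            Map<String, String> store, String adzoneId, ExecutorService pool) {
        return CompletableFuture
            .supplyAsync(() -> store.get("adzone:user:" + adzoneId), pool)
            .thenApply(AsyncLookupSketch::toResult);
    }

    // An unknown ad zone yields an empty result, which drops the event.
    static List<Long> toResult(String userId) {
        if (userId == null) {
            return Collections.emptyList();
        }
        return Collections.singletonList(Long.parseLong(userId));
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        Map<String, String> store = Map.of("adzone:user:90123456", "42");
        try {
            System.out.println(lookupUserId(store, "90123456", pool).join()); // [42]
            System.out.println(lookupUserId(store, "00000000", pool).join()); // []
        } finally {
            pool.shutdown();
        }
    }
}
```

A client with a native asynchronous API would remove the need for the extra pool; the pool is only one way to keep a blocking client off the operator thread.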

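3. Stateful Deduplication and Idempotent Processing

The same order may be called back several times (for example, on a status change), so events are deduplicated by order_id: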

// juwatech.cn.flink.dedup.OrderDedupFunction
public class OrderDedupFunction extends RichFlatMapFunction<EnrichedOrderEvent, ProcessedOrderEvent> {

    private transient ValueState<String> latestStatus;

    @Override
    public void open(Configuration parameters) {
        ValueStateDescriptor<String> descriptor = 
            new ValueStateDescriptor<>("latest_tk_status", String.class);
        latestStatus = getRuntimeContext().getState(descriptor);
    }

    @Override
    public void flatMap(EnrichedOrderEvent event, Collector<ProcessedOrderEvent> out) throws Exception {
        String currentStatus = latestStatus.value();
        if (currentStatus == null || !currentStatus.equals(event.getTkStatus())) {
            latestStatus.update(event.getTkStatus());
            out.collect(new ProcessedOrderEvent(event));
        }
        // If the status has not changed, the duplicate event is dropped
    }
}

Applying the deduplication:

DataStream<ProcessedOrderEvent> dedupedStream = 
    enrichedStream
        .keyBy(event -> event.getOrderId())
        .flatMap(new OrderDedupFunction())
        .name("OrderDedup");
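
To make the dedup rule concrete, here is a small plain-Java model of it, with a `HashMap` standing in for Flink's keyed `ValueState`. The class name `OrderDedupSketch` is hypothetical; the comparison logic mirrors `OrderDedupFunction`.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of the keyed dedup step; the map plays the role of per-order state.
final class OrderDedupSketch {

    private final Map<String, String> latestStatusByOrder = new HashMap<>();

    // Returns true when the event should be forwarded downstream.
    boolean accept(String orderId, String tkStatus) {
        String previous = latestStatusByOrder.get(orderId);
        if (tkStatus.equals(previous)) {
            return false; // same status seen again: duplicate callback
        }
        latestStatusByOrder.put(orderId, tkStatus);
        return true;
    }

    public static void main(String[] args) {
        OrderDedupSketch dedup = new OrderDedupSketch();
        List<String> forwarded = new ArrayList<>();
        for (String status : new String[] {"3", "3", "12", "12", "13"}) {
            if (dedup.accept("20240515ABCD1234", status)) {
                forwarded.add(status);
            }
        }
        System.out.println(forwarded); // [3, 12, 13]
    }
}
```

Because the check compares only against the most recent status, a late callback carrying an older status (for example "3" arriving after "12") would still be forwarded. Whether that needs a guard depends on the ordering guarantees upstream.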

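4. Rebate Calculation and State Machine Update

The final rebate is calculated according to business rules, and the database is updated: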

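// juwatech.cn.flink.sink.RebateUpdateSink
import com.alibaba.fastjson.JSON;
import com.zaxxer.hikari.HikariConfig;
import com.zaxxer.hikari.HikariDataSource;
import java.math.BigDecimal;
import java.util.Properties;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.jdbc.core.JdbcTemplate;

// open()/close() are lifecycle methods of RichSinkFunction, not of the plain SinkFunction interface
public class RebateUpdateSink extends RichSinkFunction<ProcessedOrderEvent> {

    private transient HikariDataSource dataSource;
    private transient JdbcTemplate jdbcTemplate;
    private transient KafkaProducer<String, String> kafkaProducer;

    @Override
    public void open(Configuration parameters) {
        HikariConfig config = new HikariConfig();
        config.setJdbcUrl("jdbc:mysql://mysql.juwatech.cn:3306/rebate_db");
        config.setUsername("rebate_user");
        config.setPassword("******");
        dataSource = new HikariDataSource(config);
        jdbcTemplate = new JdbcTemplate(dataSource);

        // The original snippet used kafkaProducer without declaring it; this setup,
        // which reuses the source's broker address, is an assumption
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "kafka.juwatech.cn:9092");
        props.setProperty("key.serializer", StringSerializer.class.getName());
        props.setProperty("value.serializer", StringSerializer.class.getName());
        kafkaProducer = new KafkaProducer<>(props);
    }

    @Override
    public void close() {
        if (kafkaProducer != null) {
            kafkaProducer.close();
        }
        if (dataSource != null) {
            dataSource.close();
        }
    }

    @Override
    public void invoke(ProcessedOrderEvent event, Context context) {
        // 1. Calculate the actual rebate (taking the user-level bonus into account)
        BigDecimal finalRebate = calculateRebate(event);
        String status = mapTaoBaoStatus(event.getTkStatus());

        // 2. Insert or update the rebate record. The upsert relies on a unique key
        //    on order_id, which keeps replays after a restart idempotent
        String sql = """
            INSERT INTO rebate_record (order_id, user_id, item_id, amount, status, updated_at)
            VALUES (?, ?, ?, ?, ?, NOW())
            ON DUPLICATE KEY UPDATE
                status = VALUES(status),
                amount = VALUES(amount),
                updated_at = NOW()
            """;
        jdbcTemplate.update(sql,
            event.getOrderId(),
            event.getUserId(),
            event.getItemId(),
            finalRebate,
            status
        );

        // 3. On successful settlement, trigger a notification
        //    (it may be sent again if the record is replayed after a failure)
        if ("SETTLED".equals(status)) {
            sendNotification(event.getUserId(), event.getOrderId(), finalRebate);
        }
    }

    private BigDecimal calculateRebate(ProcessedOrderEvent event) {
        // Call the rule engine or calculate directly
        return event.getTkCommission().multiply(BigDecimal.valueOf(0.9)); // deduct the platform service fee
    }

    private String mapTaoBaoStatus(String tkStatus) {
        return switch (tkStatus) {
            case "3" -> "PAID";
            case "12" -> "CONFIRMED";
            case "13" -> "SETTLED";
            default -> "UNKNOWN";
        };
    }

    private void sendNotification(Long userId, String orderId, BigDecimal amount) {
        // Publish to the notification queue
        NotificationMessage msg = new NotificationMessage(userId, "Rebate credited",
            "A rebate of ¥" + amount + " for order " + orderId + " has been credited");
        kafkaProducer.send(new ProducerRecord<>("user.notification", JSON.toJSONString(msg)));
    }
}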
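
As a worked example of the calculation, the sketch below applies the 0.9 multiplier from `calculateRebate` to the sample commission of 12.50 and maps the status codes. The class name `RebateCalcSketch` is hypothetical, and truncating to two decimal places with `RoundingMode.DOWN` is an assumption: the original code does not round at all.

```java
import java.math.BigDecimal;
import java.math.RoundingMode;

// Illustrative only: the rounding rule below is an assumption, not the original behaviour.
final class RebateCalcSketch {

    // Share of the commission passed on to the user after the platform service fee.
    private static final BigDecimal USER_SHARE = new BigDecimal("0.9");

    static BigDecimal rebateFor(BigDecimal commission) {
        // Truncate to 2 decimal places (assumed), so the payout never exceeds the exact share.
        return commission.multiply(USER_SHARE).setScale(2, RoundingMode.DOWN);
    }

    static String mapStatus(String tkStatus) {
        switch (tkStatus) {
            case "3":  return "PAID";
            case "12": return "CONFIRMED";
            case "13": return "SETTLED";
            default:   return "UNKNOWN";
        }
    }

    public static void main(String[] args) {
        System.out.println(rebateFor(new BigDecimal("12.50"))); // 11.25
        System.out.println(rebateFor(new BigDecimal("9.99")));  // 8.99
        System.out.println(mapStatus("13"));                    // SETTLED
    }
}
```

Without an explicit scale, 12.50 × 0.9 is stored as 11.250, so fixing the scale before writing to the `amount` column keeps stored values uniform.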

5. Job Deployment and Fault-Tolerance Configuration

The Flink job's main class:

// juwatech.cn.flink.job.RealtimeRebateJob
public class RealtimeRebateJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(10000); // checkpoint every 10 seconds
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(5000);
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.of(10, TimeUnit.SECONDS)));

        DataStream<TbkOrderEvent> orders = TbkOrderSource.fromKafka(env);
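        // Async I/O is attached through AsyncDataStream (no keyBy needed);
        // the 1 s timeout and capacity of 100 are assumed values
        DataStream<EnrichedOrderEvent> enriched = AsyncDataStream.unorderedWait(
            orders, new UserEnrichFunction(), 1000, TimeUnit.MILLISECONDS, 100);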

        DataStream<ProcessedOrderEvent> processed = enriched
            .keyBy(e -> e.getOrderId())
            .flatMap(new OrderDedupFunction());

        processed.addSink(new RebateUpdateSink()).name("RebateDBSink");

        env.execute("Realtime Rebate Processing Job");
    }
}

Submit command (the -ys, -yjm and -ytm options are Flink's YARN-specific flags):

flink run -d \
  -p 8 \
  -ys 2 \
  -yjm 1024m \
  -ytm 2048m \
  ./juwatech-flink-rebate-1.0.jar
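
For a rough sense of what these flags request, the sketch below works through the arithmetic. It assumes Flink's default slot sharing, under which the job needs as many slots as its parallelism. The class name `YarnResourceSketch` is hypothetical, and the totals ignore any overhead YARN adds per container.

```java
// Back-of-the-envelope sizing for the submit flags; assumes default slot sharing.
final class YarnResourceSketch {

    // Number of TaskManager containers: ceil(parallelism / slotsPerTaskManager).
    static int taskManagers(int parallelism, int slotsPerTaskManager) {
        return (parallelism + slotsPerTaskManager - 1) / slotsPerTaskManager;
    }

    // One JobManager plus all TaskManagers, in megabytes.
    static int totalMemoryMb(int parallelism, int slotsPerTaskManager,
                             int jobManagerMb, int taskManagerMb) {
        return jobManagerMb + taskManagers(parallelism, slotsPerTaskManager) * taskManagerMb;
    }

    public static void main(String[] args) {
        // -p 8, -ys 2, -yjm 1024m, -ytm 2048m
        System.out.println(taskManagers(8, 2));              // 4
        System.out.println(totalMemoryMb(8, 2, 1024, 2048)); // 9216
    }
}
```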

Copyright of this article belongs to the 微赚淘客系统3.0 R&D team. Please credit the source when reposting!
