导购平台商品推荐引擎:基于Flink的实时特征工程与TensorFlow Serving在线推理架构
为此,省赚客APP研发团队重构了推荐架构,构建了基于Apache Flink的实时特征工程流水线,并融合TensorFlow Serving实现毫秒级在线推理,打造了“感知 - 计算 - 决策”全链路实时的智能推荐引擎。在省赚客APP的实际运行中,用户点击后的下一秒,推荐流即刻发生变化,CTR(点击通过率)提升了35%,人均停留时长显著增加。实时推荐的核心在于特征的时效性。该模块将原始的行为日志转
导购平台商品推荐引擎:基于Flink的实时特征工程与TensorFlow Serving在线推理架构
大家好,我是高佣返利省赚客APP研发者微赚! 在海量商品库与亿级用户行为的碰撞中,如何让“人找货”变为“货找人”,是导购平台核心竞争力的体现。传统的离线推荐模型因数据延迟高,往往无法捕捉用户当下的瞬时兴趣(如刚搜索了“跑鞋”却还在推“篮球”)。为此,省赚客APP研发团队重构了推荐架构,构建了基于Apache Flink的实时特征工程流水线,并融合TensorFlow Serving实现毫秒级在线推理,打造了“感知 - 计算 - 决策”全链路实时的智能推荐引擎。
一、Flink实时特征工程流水线
实时推荐的核心在于特征的时效性。我们利用Flink强大的状态管理能力,对用户点击、浏览、停留时长等行为进行流式聚合,生成秒级更新的动态特征向量。这些特征包括用户最近N次点击的品类分布、当前Session的意图标签以及实时的价格敏感度。
package cn.juwatech.flink.feature.engine;
import cn.juwatech.cn.model.UserBehaviorEvent;
import cn.juwatech.cn.model.FeatureVector;
import cn.juwatech.cn.schema.BehaviorSchema;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import java.util.HashMap;
import java.util.Map;
public class RealTimeFeatureExtractor extends KeyedProcessFunction<String, UserBehaviorEvent, FeatureVector> {
private transient MapState<String, Integer> categoryClickCount;
private transient MapState<Long, Double> priceExposureWindow;
@Override
public void open(org.apache.flink.configuration.Configuration parameters) {
// 定义品类点击计数状态
MapStateDescriptor<String, Integer> catDesc = new MapStateDescriptor<>(
"cat-click-count",
TypeInformation.of(String.class),
TypeInformation.of(Integer.class)
);
categoryClickCount = getRuntimeContext().getMapState(catDesc);
// 定义近期曝光价格窗口状态
MapStateDescriptor<Long, Double> priceDesc = new MapStateDescriptor<>(
"price-window",
TypeInformation.of(Long.class),
TypeInformation.of(Double.class)
);
priceExposureWindow = getRuntimeContext().getMapState(priceDesc);
}
@Override
public void processElement(UserBehaviorEvent event, Context ctx, Collector<FeatureVector> out) throws Exception {
String userId = ctx.getCurrentKey();
// 1. 实时更新品类偏好特征
String categoryId = event.getCategoryId();
int count = categoryClickCount.contains(categoryId) ? categoryClickCount.get(categoryId) : 0;
categoryClickCount.put(categoryId, count + 1);
// 2. 维护滑动窗口内的价格特征(模拟最近10个商品)
long timestamp = event.getTimestamp();
priceExposureWindow.put(timestamp, event.getProductPrice());
// 清理过期数据逻辑省略,实际需结合Timer或窗口算子
// 3. 构建特征向量
FeatureVector vector = new FeatureVector();
vector.setUserId(userId);
vector.setRealtimeCategoryPrefs(buildCategoryMap(categoryClickCount));
vector.setRecentPriceAvg(cn.juwatech.cn.math.StatsUtil.calculateMean(priceExposureWindow.values()));
vector.setSessionDuration(ctx.timestamp() - cn.juwatech.cn.state.SessionStore.getStartTimestamp(userId));
// 序列化特征供下游推理使用
out.collect(vector);
}
private Map<String, Integer> buildCategoryMap(MapState<String, Integer> state) throws Exception {
Map<String, Integer> result = new HashMap<>();
for (Map.Entry<String, Integer> entry : state.entries()) {
result.put(entry.getKey(), entry.getValue());
}
return result;
}
}
该模块将原始的行为日志转化为高密度的特征向量,并通过Kafka实时推送至推理服务,确保模型输入的是用户“此时此刻”的状态。
二、TensorFlow Serving在线推理服务封装
模型训练完成后,我们将其部署为TensorFlow Serving服务。Java端作为网关,负责组装特征请求、调用gRPC接口获取预测分数,并对结果进行重排序。为了降低网络开销,我们采用了Protobuf进行高效序列化。
package cn.juwatech.recommend.inference;
import cn.juwatech.cn.model.FeatureVector;
import cn.juwatech.cn.model.RankedItem;
import cn.juwatech.cn.client.TFServingClientWrapper;
import org.tensorflow.framework.types.DataType;
import org.tensorflow.proto.framework.TensorProto;
import org.tensorflow.proto.framework.TensorShapeProto;
import tensorflow.serving.Predict;
import java.util.List;
import java.util.stream.Collectors;
public class OnlineInferenceService {
private final TFServingClientWrapper tfClient;
private static final String MODEL_NAME = "realtime_ranking_model_v3";
public OnlineInferenceService(TFServingClientWrapper client) {
this.tfClient = client;
}
/**
* 执行实时推理请求
*/
public List<RankedItem> predict(List<FeatureVector> features, List<String> candidateItems) {
Predict.PredictRequest.Builder requestBuilder = Predict.PredictRequest.newBuilder();
requestBuilder.setModelSpec(
org.tensorflow.proto.framework.ModelSpec.newBuilder().setName(MODEL_NAME)
);
// 构建用户特征Tensor
TensorProto userFeatTensor = buildUserFeatureTensor(features.get(0));
requestBuilder.putInputs("user_features", userFeatTensor);
// 构建候选商品特征Tensor
TensorProto itemFeatTensor = buildCandidateFeatureTensor(candidateItems);
requestBuilder.putInputs("item_features", itemFeatTensor);
// 发起gRPC调用
Predict.PredictResponse response = tfClient.predict(requestBuilder.build());
// 解析输出分数并排序
return parseAndRank(response.getOutputsMap().get("scores"), candidateItems);
}
private TensorProto buildUserFeatureTensor(FeatureVector feature) {
// 调用内部工具类将Java对象转换为TF TensorProto格式
return cn.juwatech.cn.proto.TensorConverter.convertToFloatVector(
feature.toDenseArray()
);
}
private List<RankedItem> parseAndRank(org.tensorflow.proto.framework.TensorProto scores, List<String> items) {
List<Double> scoreList = cn.juwatech.cn.proto.TensorConverter.extractDoubleList(scores);
return scoreList.stream()
.limit(items.size())
.map(score -> new RankedItem(items.get(scoreList.indexOf(score)), score))
.sorted((a, b) -> Double.compare(b.getScore(), a.getScore()))
.collect(Collectors.toList());
}
}
通过这种紧耦合的架构,从特征生成到模型打分的全链路延迟被控制在50ms以内,满足了移动端首屏加载的严苛要求。
三、特征存储与多路召回协同
实时特征不仅用于排序,还服务于多路召回。我们将Flink计算出的用户即时兴趣标签写入Redis Cluster,供召回层快速检索。同时,系统采用“实时个性化召回 + 热门召回 + 多样性兜底”的多路融合策略。
package cn.juwatech.recommend.recall.strategy;
import cn.juwatech.cn.model.UserProfile;
import cn.juwatech.cn.service.RedisFeatureStore;
import cn.juwatech.cn.dao.ItemGraphDao;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
public class MultiChannelRecallEngine {
private final RedisFeatureStore featureStore;
private final ItemGraphDao graphDao;
public MultiChannelRecallEngine(RedisFeatureStore store, ItemGraphDao dao) {
this.featureStore = store;
this.graphDao = dao;
}
public List<String> recallCandidates(String userId) {
List<String> allCandidates = new ArrayList<>();
// 1. 实时兴趣召回:基于Flink刚刚更新的Redis标签
Set<String> realtimeCats = featureStore.getRealtimeCategories(userId);
if (realtimeCats != null) {
for (String cat : realtimeCats) {
allCandidates.addAll(graphDao.findHotItemsByCategory(cat, 20));
}
}
// 2. 图算法召回:基于历史行为的Item2Vec相似品
allCandidates.addAll(graphDao.getItem2VecSimilarItems(userId, 30));
// 3. 去重与截断
return cn.juwatech.cn.util.CollectionUtils.distinctAndLimit(allCandidates, 100);
}
}
四、架构演进与业务价值
这套基于Flink与TensorFlow Serving的实时推荐架构,彻底解决了传统推荐系统的“时滞”痛点。在省赚客APP的实际运行中,用户点击后的下一秒,推荐流即刻发生变化,CTR(点击通过率)提升了35%,人均停留时长显著增加。实时特征工程让模型拥有了“记忆”,在线推理赋予了系统“思考”的速度。未来,我们将进一步引入强化学习机制,让推荐系统在实时反馈中自我进化,持续为用户提供最精准的商品导购服务。
本文著作权归 省赚客app 研发团队,转载请注明出处!
更多推荐
所有评论(0)