基于Elasticsearch构建外卖试吃活动参与日志的实时分析看板

在“霸王餐”试吃活动中,运营需实时监控用户参与行为:如每分钟新增参与数、各城市分布、失败原因统计、高并发峰值等。传统关系型数据库难以支撑高写入吞吐与低延迟聚合查询。Elasticsearch(ES)凭借其分布式、近实时索引和强大的聚合能力,成为构建参与日志分析看板的理想选择。

定义参与日志文档结构

首先设计 ES 索引映射(mapping),确保字段类型精准:

PUT /free_meal_participation_log
{
  "mappings": {
    "properties": {
      "userId": { "type": "keyword" },
      "activityId": { "type": "keyword" },
      "city": { "type": "keyword" },
      "status": { "type": "keyword" },
      "errorCode": { "type": "keyword" },
      "ip": { "type": "ip" },
      "deviceType": { "type": "keyword" },
      "timestamp": { "type": "date", "format": "strict_date_optional_time||epoch_millis" },
      "durationMs": { "type": "integer" }
    }
  }
}

keyword 类型用于精确匹配与 Terms 聚合,date 支持时间范围查询。
在这里插入图片描述

Java 应用写入日志到 Elasticsearch

使用 Spring Data Elasticsearch 客户端:

package baodanbao.com.cn.meituan.log.model;

import org.springframework.data.annotation.Id;
import org.springframework.data.elasticsearch.annotations.Document;
import org.springframework.data.elasticsearch.annotations.Field;
import org.springframework.data.elasticsearch.annotations.FieldType;

import java.time.Instant;

@Document(indexName = "free_meal_participation_log")
public class ParticipationLog {

    @Id
    private String id;

    @Field(type = FieldType.Keyword)
    private String userId;

    @Field(type = FieldType.Keyword)
    private String activityId;

    @Field(type = FieldType.Keyword)
    private String city;

    @Field(type = FieldType.Keyword)
    private String status; // SUCCESS, FAILED

    @Field(type = FieldType.Keyword)
    private String errorCode;

    @Field(type = FieldType.Ip)
    private String ip;

    @Field(type = FieldType.Keyword)
    private String deviceType;

    @Field(type = FieldType.Date)
    private Instant timestamp;

    @Field(type = FieldType.Integer)
    private Integer durationMs;

    // constructors, getters, setters
}

Repository 接口:

package baodanbao.com.cn.meituan.log.repository;

import baodanbao.com.cn.meituan.log.model.ParticipationLog;
import org.springframework.data.elasticsearch.repository.ElasticsearchRepository;
import org.springframework.stereotype.Repository;

@Repository
public interface ParticipationLogRepository 
    extends ElasticsearchRepository<ParticipationLog, String> {
}

服务层异步写入(避免阻塞主流程):

package baodanbao.com.cn.meituan.log.service;

import baodanbao.com.cn.meituan.log.model.ParticipationLog;
import baodanbao.com.cn.meituan.log.repository.ParticipationLogRepository;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;

@Service
public class ParticipationLogService {

    private final ParticipationLogRepository logRepository;

    public ParticipationLogService(ParticipationLogRepository logRepository) {
        this.logRepository = logRepository;
    }

    @Async
    public void saveLog(ParticipationLog log) {
        logRepository.save(log);
    }
}

关键聚合查询示例

实时参与趋势(按分钟)

GET /free_meal_participation_log/_search
{
  "size": 0,
  "aggs": {
    "participation_over_time": {
      "date_histogram": {
        "field": "timestamp",
        "calendar_interval": "1m"
      }
    }
  }
}

各城市参与量 Top 10

GET /free_meal_participation_log/_search
{
  "size": 0,
  "aggs": {
    "by_city": {
      "terms": {
        "field": "city",
        "size": 10
      }
    }
  }
}

失败原因分布

GET /free_meal_participation_log/_search
{
  "query": {
    "term": { "status": "FAILED" }
  },
  "size": 0,
  "aggs": {
    "error_codes": {
      "terms": { "field": "errorCode", "size": 20 }
    }
  }
}

Java 封装聚合查询服务

package baodanbao.com.cn.meituan.dashboard.service;

import co.elastic.clients.elasticsearch.ElasticsearchClient;
import co.elastic.clients.elasticsearch._types.aggregations.*;
import co.elastic.clients.elasticsearch.core.SearchRequest;
import co.elastic.clients.elasticsearch.core.SearchResponse;
import org.springframework.stereotype.Service;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

@Service
public class DashboardAggregationService {

    private final ElasticsearchClient esClient;

    public DashboardAggregationService(ElasticsearchClient esClient) {
        this.esClient = esClient;
    }

    public Map<String, Long> getCityDistribution() throws IOException {
        SearchResponse<Void> response = esClient.search(
            SearchRequest.of(s -> s
                .index("free_meal_participation_log")
                .size(0)
                .aggregations("by_city", a -> a
                    .terms(t -> t.field("city").size(10))
                )
            ),
            Void.class
        );

        Map<String, Long> result = new HashMap<>();
        TermsAggregate terms = response.aggregations().get("by_city").sterms();
        for (TermsBucket bucket : terms.buckets().array()) {
            result.put(bucket.key(), bucket.docCount());
        }
        return result;
    }

    public Map<String, Long> getFailureReasons() throws IOException {
        SearchResponse<Void> response = esClient.search(
            SearchRequest.of(s -> s
                .index("free_meal_participation_log")
                .query(q -> q.term(t -> t.field("status").value("FAILED")))
                .size(0)
                .aggregations("errors", a -> a
                    .terms(t -> t.field("errorCode").size(20))
                )
            ),
            Void.class
        );

        Map<String, Long> result = new HashMap<>();
        TermsAggregate terms = response.aggregations().get("errors").sterms();
        for (TermsBucket bucket : terms.buckets().array()) {
            result.put(bucket.key(), bucket.docCount());
        }
        return result;
    }
}

前端看板对接

后端提供 REST 接口供前端调用:

@RestController
@RequestMapping("/api/dashboard")
public class DashboardController {

    private final DashboardAggregationService aggregationService;

    @GetMapping("/city-distribution")
    public Map<String, Long> cityDistribution() throws IOException {
        return aggregationService.getCityDistribution();
    }

    @GetMapping("/failure-reasons")
    public Map<String, Long> failureReasons() throws IOException {
        return aggregationService.getFailureReasons();
    }
}

前端使用 ECharts 渲染柱状图、饼图,实现秒级刷新。

性能与运维建议

  • 写入使用 Bulk API 批量提交(Spring Data 默认支持)。
  • timestamp 字段设置 ILM(Index Lifecycle Management)策略,自动滚动索引并删除过期数据。
  • 对高频查询字段(如 city, status)确保未被分词,使用 keyword 类型。
  • 部署专用协调节点(coordinating node)处理聚合请求,避免影响数据节点。

通过 Elasticsearch 的实时索引与多维聚合能力,外卖试吃活动的参与日志可被高效转化为运营决策依据,支撑精细化活动调控。

本文著作权归 俱美开放平台 ,转载请注明出处!

Logo

腾讯云面向开发者汇聚海量精品云计算使用和开发经验,营造开放的云计算技术生态圈。

更多推荐