Spring-boot-elasticsearch-timeseries-demo

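This project demonstrates using Elasticsearch as a time-series store. The core idea is to use an ES index template so that time-partitioned indices are created automatically, and to query across all partitions through an index alias. This example partitions by month, producing index names such as my_index_test_2021-6.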

Environment

  • ES cluster (7.1.1):
    • node1:192.168.56.101:9200
    • node2:192.168.56.102:9200
    • node3:192.168.56.103:9200

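Prerequisites

Create the index template first. After that, when a document is inserted into an index that does not exist yet and whose name matches the template's pattern, ES creates the index from the template.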

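  • index_patterns (named template before ES 6.0): the patterns that select which indices the template applies to; the wildcard * matches any characters in the index name
  • aliases: the alias attached to every index created from the template; queries go through it
curl -XPUT "http://192.168.56.101:9200/_template/my_index_test_template" -H 'Content-Type: application/json' -d '{
  "index_patterns": ["my_index_test*"],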
  "settings": {
    "number_of_shards": 3,
    "number_of_replicas": 2
  },
  "mappings": {
        "properties": {
            "createTime": {
                "type": "date",
                "format": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||yyyy/MM/dd HH:mm:ss||yyyy/MM/dd||epoch_millis"
            },
            "id": {
                "type": "keyword"
            },
            "name": {
                "type": "keyword"
            }
        }
    },
  "aliases": {"my_index_test_alias":{}}
}'
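
To make the wildcard rule concrete, the following sketch mimics how a pattern such as my_index_test* selects index names. It is an illustration only and not Elasticsearch's own matching code; the class IndexPatternDemo and the method matches are hypothetical names introduced here.

```java
import java.util.Arrays;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

// Illustration only: a simplified stand-in for wildcard matching of index names.
class IndexPatternDemo {

    // Treats "*" as "any sequence of characters" and every other character literally.
    static boolean matches(String pattern, String indexName) {
        String regex = Arrays.stream(pattern.split("\\*", -1))
                .map(Pattern::quote)                 // escape literal parts
                .collect(Collectors.joining(".*"));  // "*" becomes ".*"
        return indexName.matches(regex);
    }

    public static void main(String[] args) {
        String pattern = "my_index_test*";
        // A monthly index name produced by this demo
        System.out.println(matches(pattern, "my_index_test_2021-6"));
        // An unrelated index name
        System.out.println(matches(pattern, "other_index_2021-6"));
    }
}
```

Any index whose name starts with my_index_test picks up the template's settings, mappings, and alias when it is created, which is why the monthly indices need no explicit creation step.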

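Project structure

  • DemoEsTimeseriesApplication: application entry point
  • ESConfig: ES configuration class
  • ESService: wraps the insert and query logic
  • DemoEntity: demo entity, corresponds to an ES document
  • ESController: test controller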
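
DemoEntity is not listed in this document. A minimal sketch of what it might look like is given below; the three fields are inferred from the template mapping (id, name, createTime) and from the setters that ESController calls, so treat the details as assumptions. In a real project the class would normally be declared public in its own file.

```java
import java.util.Date;

// Hypothetical sketch of DemoEntity: a plain bean whose properties mirror the
// fields declared in the index template mapping.
class DemoEntity {

    private String id;        // mapped as keyword
    private String name;      // mapped as keyword
    private Date createTime;  // mapped as date

    public String getId() { return id; }
    public void setId(String id) { this.id = id; }

    public String getName() { return name; }
    public void setName(String name) { this.name = name; }

    public Date getCreateTime() { return createTime; }
    public void setCreateTime(Date createTime) { this.createTime = createTime; }
}
```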

POM

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-elasticsearch</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>

Class overview

YAML configuration

server:
  port: 8844

elasticsearch:
    cluster-nodes: 192.168.56.101:9200,192.168.56.102:9200,192.168.56.103:9200

ESConfig

@Configuration
public class ESConfig  extends AbstractElasticsearchConfiguration {

    @Value("${elasticsearch.cluster-nodes}")
    private String nodes;

    @Override
    @Bean
    public RestHighLevelClient elasticsearchClient() {

        final ClientConfiguration clientConfiguration = ClientConfiguration.builder()
                .connectedTo(nodes.split(","))
                .build();

        return RestClients.create(clientConfiguration).rest();
    }
}

ESService

@Service
public class ESService {

    @Autowired
    private RestHighLevelClient client;

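    /**
     * Saves a document into the current month's index.
     * @param entity the document to save
     * @param index  the base index name, without the date suffix
     * @return true if a new document was created
     * @throws IOException if the request to ES fails
     */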
    public boolean save(DemoEntity entity, String index) throws IOException {
        String newIndex = getIndexForSave(index);

        IndexRequest request = new IndexRequest(newIndex);
        Map<String, Object> map = JSONObject.parseObject(JSONObject.toJSONString(entity)).getInnerMap();
        request.source(map, XContentType.JSON);
        IndexResponse iResponse = client.index(request, RequestOptions.DEFAULT);
        return "created".equals(iResponse.getResult().getLowercase());
    }

    public List<Map<String, Object>> query(String index, int page, int limit, String name, String order) throws IOException {
        String newIndex = getIndexForSearch(index);
        SearchRequest searchRequest = new SearchRequest(newIndex);

        SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();

        if (!StringUtils.isEmpty(name)) {
            sourceBuilder.query(QueryBuilders.matchQuery("name", name).minimumShouldMatch("100%"));
        }
        sourceBuilder.timeout(new TimeValue(10, TimeUnit.SECONDS));
        sourceBuilder.from((page-1)*limit);
        sourceBuilder.size(limit);

        // Sort by creation time
        switch (order) {
            case "desc":
                sourceBuilder.sort(new FieldSortBuilder("createTime").order(SortOrder.DESC));
                break;
            case "asc":
                sourceBuilder.sort(new FieldSortBuilder("createTime").order(SortOrder.ASC));
                break;
        }

        searchRequest.source(sourceBuilder);

        SearchResponse sResponse = client.search(searchRequest, RequestOptions.DEFAULT);
        List<Map<String, Object>> list = new ArrayList<>();
        Arrays.stream(sResponse.getHits().getHits()).forEach(i -> {
            Map<String, Object> map = i.getSourceAsMap();

            list.add(map);
        });
        return list;
    }

    private String getIndexForSave(String index) {
        LocalDate now = LocalDate.now();
        return index + "_" + now.getYear() + "-" + now.getMonthValue();
    }

    private String getIndexForSearch(String index) {
        return String.format("%s_alias", index);
    }
}
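
Note that getIndexForSave uses LocalDate.now(), so a document always lands in the index for the month in which it is written, regardless of its createTime. If historical data had to be backfilled, one option would be to derive the index name from the document's own timestamp. The sketch below shows that variant; MonthlyIndexNames and indexFor are hypothetical names, not part of the demo project.

```java
import java.time.LocalDate;
import java.time.ZoneId;
import java.util.Date;

// Hypothetical helper: builds "<base>_<year>-<month>" names in the same
// format as getIndexForSave, but for an arbitrary date.
class MonthlyIndexNames {

    static String indexFor(String base, LocalDate date) {
        // getMonthValue() is not zero-padded, e.g. June gives "6"
        return base + "_" + date.getYear() + "-" + date.getMonthValue();
    }

    // Variant for java.util.Date; the zone decides which month a timestamp
    // close to a month boundary belongs to.
    static String indexFor(String base, Date createTime, ZoneId zone) {
        LocalDate date = createTime.toInstant().atZone(zone).toLocalDate();
        return indexFor(base, date);
    }

    public static void main(String[] args) {
        System.out.println(indexFor("my_index_test", LocalDate.of(2021, 6, 15)));
        System.out.println(indexFor("my_index_test", new Date(), ZoneId.systemDefault()));
    }
}
```

Because the names still match the template pattern, indices for past months would be created from the template in the same way and would join the alias automatically.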

ESController

@Controller
public class ESController {

    @Autowired
    private ESService esService;

    private static final String MYINDEX = "my_index_test";

    @ResponseBody
    @RequestMapping("/insert")
    public String insert() {
        DemoEntity entity = new DemoEntity();
        entity.setId(UUID.randomUUID().toString());
        entity.setName("Obj-" + new Random().nextInt(20));
        entity.setCreateTime(new Date());
        try {
            return esService.save(entity, MYINDEX) ? "SUCCESS" : "FAIL";
        } catch (IOException e) {
            e.printStackTrace();
            return "FAIL";
        }
    }

    @ResponseBody
    @RequestMapping("/query")
    public Object query() {
        try {
            List<Map<String, Object>> list = esService.query(MYINDEX, 1, 3, null, "desc");
            return list;
        } catch (IOException e) {
            e.printStackTrace();
            return "FAIL";
        }
    }
}

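Usage

  • Change the elasticsearch addresses in the configuration file to match your cluster
  • Run DemoEsTimeseriesApplication.java
  • Open http://localhost:8844/insert to insert one document; it is stored in the monthly index that corresponds to the current date
  • Open http://localhost:8844/query to query across all monthly indices through the alias
  • TODO: old indices could be deleted periodically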
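
For the TODO above, one possible starting point is to work out which monthly indices have aged out of a retention window. The sketch below does only that selection step, as plain Java over a list of index names; OldIndexFinder, expired, and the keepMonths parameter are hypothetical, and the retention policy itself is an assumption.

```java
import java.time.YearMonth;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper: picks the "<base>_<year>-<month>" indices that are
// older than the retention window.
class OldIndexFinder {

    // Returns the names whose month is strictly before (current - keepMonths).
    static List<String> expired(List<String> indexNames, String base,
                                YearMonth current, int keepMonths) {
        YearMonth cutoff = current.minusMonths(keepMonths);
        String prefix = base + "_";
        List<String> result = new ArrayList<>();
        for (String name : indexNames) {
            if (!name.startsWith(prefix)) {
                continue; // belongs to another index family
            }
            String[] parts = name.substring(prefix.length()).split("-");
            if (parts.length != 2) {
                continue; // suffix is not "<year>-<month>"
            }
            try {
                YearMonth month = YearMonth.of(Integer.parseInt(parts[0]),
                                               Integer.parseInt(parts[1]));
                if (month.isBefore(cutoff)) {
                    result.add(name);
                }
            } catch (RuntimeException e) {
                // Not a valid year or month number: skip this name
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> names = Arrays.asList(
                "my_index_test_2021-1", "my_index_test_2021-5", "my_index_test_2021-6");
        System.out.println(expired(names, "my_index_test", YearMonth.of(2021, 6), 3));
    }
}
```

Each returned name could then be removed with the client's delete index call, for example client.indices().delete(new DeleteIndexRequest(name), RequestOptions.DEFAULT), triggered from a scheduled job.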

References

spring-boot-elasticsearch-timeseries-demo
