Elasticsearch作为时序数据库使用(Springboot版)
Spring-boot-elasticsearch-timeseries-demo该项目作为Elasticsearch用于时序数据库用途的demo,核心是使用ES模板来快速生成分片索引,查询时借助索引别名完成分片的聚合查询,本例按月进行分片,例如my_index_test_2021-6环境ES集群(7.1.1):node1:192.168.56.101:9200node2:192.168.56.1
·
Spring-boot-elasticsearch-timeseries-demo
该项目作为Elasticsearch用于时序数据库用途的demo,核心是使用ES模板来快速生成分片索引,查询时借助索引别名完成分片的聚合查询,本例按月进行分片,例如my_index_test_2021-6
环境
- ES集群(7.1.1):
- node1:192.168.56.101:9200
- node2:192.168.56.102:9200
- node3:192.168.56.103:9200
前提
需要先创建好模板,完成后当插入文档时,若文档的索引名符合模板,则使用模板创建
- template属性:模板匹配,通配符*用于匹配索引
- aliases:别名,用于查询使用
curl -XPUT "http://192.168.56.101:9200/_template/my_index_test_template" -d '{
"template": "my_index_test*",
"settings": {
"number_of_shards": 3,
"number_of_replicas": 2
},
"mappings": {
"properties": {
"createTime": {
"type": "date",
"format": "yyyy-MM-dd HH:mm:ss || yyyy-MM-dd || yyyy/MM/dd HH:mm:ss|| yyyy/MM/dd ||epoch_millis"
},
"id": {
"type": "keyword"
},
"name": {
"type": "keyword"
}
}
},
"aliases": {"my_index_test_alias":{}}
}'
项目结构
- DemoEsTimeseriesApplication:启动类
- ESConfig:ES配置类
- ESService:插入/查询业务封装
- DemoEntity:demo实体,对应es文档
- ESController:测试控制器
POM
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-elasticsearch</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
类简介
yml配置
server:
port: 8844
elasticsearch:
cluster-nodes: 192.168.56.101:9200,192.168.56.102:9200,192.168.56.103:9200
ESConfig
@Configuration
public class ESConfig extends AbstractElasticsearchConfiguration {
@Value("${elasticsearch.cluster-nodes}")
private String nodes;
@Override
@Bean
public RestHighLevelClient elasticsearchClient() {
final ClientConfiguration clientConfiguration = ClientConfiguration.builder()
.connectedTo(nodes.split(","))
.build();
return RestClients.create(clientConfiguration).rest();
}
}
ESService
@Service
public class ESService {
@Autowired
private RestHighLevelClient client;
/**
* 保存消息
* @param entity 消息
* @throws IOException
*/
public boolean save(DemoEntity entity, String index) throws IOException {
String newIndex = getIndexForSave(index);
IndexRequest request = new IndexRequest(newIndex);
Map<String, Object> map = JSONObject.parseObject(JSONObject.toJSONString(entity)).getInnerMap();
request.source(map, XContentType.JSON);
IndexResponse iResponse = client.index(request, RequestOptions.DEFAULT);
return "created".equals(iResponse.getResult().getLowercase());
}
public List<Map<String, Object>> query(String index, int page, int limit, String name, String order) throws IOException {
String newIndex = getIndexForSearch(index);
SearchRequest searchRequest = new SearchRequest(newIndex);
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
if (!StringUtils.isEmpty(name)) {
sourceBuilder.query(QueryBuilders.matchQuery(name, "name").minimumShouldMatch("100%"));
}
sourceBuilder.timeout(new TimeValue(10, TimeUnit.SECONDS));
sourceBuilder.from((page-1)*limit);
sourceBuilder.size(limit);
// 排序
switch (order) {
case "desc":
sourceBuilder.sort(new FieldSortBuilder("createTime").order(SortOrder.DESC));
break;
case "asc":
sourceBuilder.sort(new FieldSortBuilder("createTime").order(SortOrder.ASC));
break;
}
searchRequest.source(sourceBuilder);
SearchResponse sResponse = client.search(searchRequest, RequestOptions.DEFAULT);
List<Map<String, Object>> list = new ArrayList<>();
Arrays.stream(sResponse.getHits().getHits()).forEach(i -> {
Map<String, Object> map = i.getSourceAsMap();
list.add(map);
});
return list;
}
private String getIndexForSave(String index) {
LocalDate now = LocalDate.now();
return index + "_" + now.getYear() + "-" + now.getMonthValue();
}
private String getIndexForSearch(String index) {
return String.format("%s_alias", index);
}
}
ESController
@Controller
public class ESController {
@Autowired
private ESService esService;
private static final String MYINDEX = "my_index_test";
@ResponseBody
@RequestMapping("/insert")
public String insert() {
DemoEntity entity = new DemoEntity();
entity.setId(UUID.randomUUID().toString());
entity.setName("Obj-"+ new Random(10).nextInt(20));
entity.setCreateTime(new Date());
try {
return esService.save(entity, MYINDEX) ? "SUCCESS" : "FAIL";
} catch (IOException e) {
e.printStackTrace();
return "FAIL";
}
}
@ResponseBody
@RequestMapping("/query")
public Object query() {
try {
List<Map<String, Object>> list = esService.query(MYINDEX, 1, 3, null, "desc");
return list;
} catch (IOException e) {
e.printStackTrace();
return "FAIL";
}
}
}
使用
- 自行修改配置文件中的elasticsearch地址
- 运行DemoEsTimeseriesApplication.java
- 访问http://localhost:8844/insert,插入一条数据,会自动根据当前时间存入相应的分片索引中
- 访问http://localhost:8844/query,查询所有分片
- TODO:可以定期删除旧索引
参考
更多推荐
已为社区贡献1条内容
所有评论(0)