展馆的科普网站建设,wordpress收到登录错误,如何微信小程序注册,软文广告经典案例ES的使用#xff08;Elasticsearch#xff09; es是什么#xff1f; es是非关系型数据库#xff0c;是分布式文档数据库#xff0c;本质上是一个JSON 文本 为什么要用es? 搜索速度快#xff0c;近乎是实时的存储、检索数据 怎么使用es? 1.下载es的包#xff08;环境要…ES的使用Elasticsearch es是什么 es是非关系型数据库是分布式文档数据库本质上是一个JSON 文本 为什么要用es? 搜索速度快近乎是实时的存储、检索数据 怎么使用es? 1.下载es的包环境要是jdk1.8及以上我的资源中有 2.下载es的可视化界面包我的资源中有 3.java编写es的工具类 es与关系型数据库对比 1.下载es的包解压运行bat文件windows
下载地址es官网下载地址 elasticsearch.yml配置允许跨域
http.cors.enabled: true
http.cors.allow-origin: *2.下载es的可视化界面包解压使用命令npm run start
下载地址elasticsearch-head-master es可视化工具 打开http:localhost:9100
3.java编写es的工具类
引入es的依赖包 dependencygroupIdorg.elasticsearch.client/groupIdartifactIdelasticsearch-rest-high-level-client/artifactIdversion6.2.4/version/dependencydependencygroupIdorg.elasticsearch.client/groupIdartifactIdelasticsearch-rest-client/artifactIdversion6.2.4/version/dependencydependencygroupIdorg.elasticsearch/groupIdartifactIdelasticsearch/artifactIdversion6.2.4/version/dependencypackage com.next.service;import lombok.extern.slf4j.Slf4j;
import org.apache.http.HttpHost;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.message.BasicHeader;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.get.MultiGetRequest;
import org.elasticsearch.action.get.MultiGetResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.stereotype.Service;Service
Slf4j
public class ESClient implements ApplicationListenerContextRefreshedEvent {private final static int CONNECT_TIMEOUT 100;private final static int SOCKET_TIMEOUT 60 * 1000;private final static int REQUEST_TIMEOUT SOCKET_TIMEOUT;private RestHighLevelClient restHighLevelClient; //JDK8及以上private BasicHeader[] basicHeaders;Overridepublic void onApplicationEvent(ContextRefreshedEvent contextRefreshedEvent) {try {initClient();} catch (Exception e) {log.error(es client init exception, e);try {Thread.sleep(1000);} catch (Exception e1) {}initClient();}}private void initClient() {log.info(es client init start);//请求头时允许的格式basicHeaders new BasicHeader[]{new BasicHeader(Accept, application/json;charsetUTF-8)};//es客户端连接设置初始化RestClientBuilder builder RestClient.builder(new HttpHost(127.0.0.1, 9200, http));builder.setDefaultHeaders(basicHeaders)//设置相关超时间配置.setRequestConfigCallback((RequestConfig.Builder configBuilder) - {configBuilder.setConnectTimeout(CONNECT_TIMEOUT);configBuilder.setSocketTimeout(SOCKET_TIMEOUT);configBuilder.setConnectionRequestTimeout(REQUEST_TIMEOUT);return configBuilder;});restHighLevelClient new RestHighLevelClient(builder);log.info(es client init end);}//es新增操作public IndexResponse index(IndexRequest indexRequest) throws Exception {try {return restHighLevelClient.index(indexRequest);} catch (Exception e) {log.error(es.index exception,indexRequest:{}, indexRequest, e);throw e;}}//更新操作public UpdateResponse update(UpdateRequest updateRequest) throws Exception {try {return restHighLevelClient.update(updateRequest, basicHeaders);} catch (Exception e) {log.error(es.update exception,updateRequest:{}, updateRequest, e);throw e;}}//查询public GetResponse get(GetRequest getRequest) throws Exception {try {return restHighLevelClient.get(getRequest, basicHeaders);} catch (Exception e) {log.error(es.get exception,updateRequest:{}, getRequest, e);throw e;}}//多个查询请求放在一起查public MultiGetResponse multiGet(MultiGetRequest multiGetRequest) throws Exception {try {return restHighLevelClient.multiGet(multiGetRequest);} catch (Exception e) {log.error(es.multiGet exception,getRequest:{}, multiGetRequest, e);throw e;}}/*** desc 批量更新*/public BulkResponse bulk(BulkRequest bulkRequest) throws Exception {try {return restHighLevelClient.bulk(bulkRequest,basicHeaders);} catch (Exception e) {log.error(es.multiGet exception,bulkRequest:{}, bulkRequest, e);throw e;}}
}
es启动
4.使用例子 将车次信息存到es中方便用户查询(从此地到目的地有哪些车可以乘坐) package com.next.service;import com.alibaba.google.common.base.Splitter;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.next.common.TrainEsConstant;
import com.next.dao.TrainNumberDetailMapper;
import com.next.dao.TrainNumberMapper;
import com.next.model.TrainNumber;
import com.next.model.TrainNumberDetail;
import com.next.util.JsonMapper;
import lombok.extern.slf4j.Slf4j;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.get.*;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.common.util.set.Sets;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;import javax.annotation.Resource;
import java.util.List;
import java.util.Map;
import java.util.Set;Service
Slf4j
public class TrainNumberService {Resourceprivate TrainNumberMapper trainNumberMapper;Resourceprivate TrainCacheService trainCacheService;Resourceprivate TrainNumberDetailMapper trainNumberDetailMapper;Resourceprivate ESClient esClient;public void handle(ListCanalEntry.Column columns, CanalEntry.EventType eventType) throws Exception{if (eventType ! CanalEntry.EventType.UPDATE) {log.info(not update,no need care);return;}int trainNumberId 0;//获取数据库的trainNumberIdfor (CanalEntry.Column column : columns) {if (column.getName().equals(id)) {trainNumberId Integer.parseInt(column.getValue());break;}}TrainNumber trainNumber trainNumberMapper.selectByPrimaryKey(trainNumberId);//校验是否有车次if (null trainNumber) {log.error(not found trainNumber,trainNumberId:{}, trainNumberId);return;}ListTrainNumberDetail detailList trainNumberDetailMapper.getByTrainNumberId(trainNumberId);//校验是否有车次详情if (CollectionUtils.isEmpty(detailList)) {log.warn(no detail,no need care,trainNumberId:{}, trainNumber.getName());return;}//将数据写入缓存中trainCacheService.set(TN_ trainNumber.getName(), JsonMapper.obj2String(detailList));log.info(trainNumber:{} detailList update redis, trainNumber.getName());//将数据存入es中saveES(detailList,trainNumber);log.info(trainNumber:{} detailList update es, trainNumber.getName());}//数据保存到es(客户需要查询的数据放到es---从此地到目的地有哪些车可以乘坐)private void saveES(ListTrainNumberDetail detailList, TrainNumber trainNumber) throws Exception{/*** A-B fromStationId- toStationId* 例北京到大连有多少趟车?* 根据车站的开始结束站去找车次即根据fromStationId- toStationId获取到 trainNumberId1,trainNumberId2。。。。* trainNumber: A-B-C* D386:北京-锦州-大连* D387:北京-鞍山-大连** 拆分如下* D386: 北京-锦州 锦州-大连 北京-大连* D387: 北京-鞍山 鞍山-大连 北京-大连*/ListString list Lists.newArrayList();int fromStationId trainNumber.getFromStationId();if (detailList.size() 1) {//单段int toStationId trainNumber.getToStationId();list.add(fromStationId _ toStationId);} else {//多段,枚举所有的车次,要保证多段有序for (int i 0; i detailList.size(); i) {//获取开始车站idint tempFromStationId detailList.get(i).getFromStationId();for (int j i; j detailList.size(); j) {//获取到达车站idint tempToStationId detailList.get(j).getToStationId();list.add(tempFromStationId_tempToStationId);}}}//检查数据是否已经存在存在则不新增不存在则新增//★如果是for循环里面的话要封装成批量操作IOMultiGetRequest multiGetRequest new MultiGetRequest();BulkRequest bulkRequest new BulkRequest();for(String item:list){multiGetRequest.add(new MultiGetRequest.Item(TrainEsConstant.INDEX,TrainEsConstant.TYPE,item));}//获取处理后的结果MultiGetResponse multiGetItemResponses esClient.multiGet(multiGetRequest);for(MultiGetItemResponse itemResponse:multiGetItemResponses.getResponses()){if(itemResponse.isFailed()){log.error(multiGet item failed,itemResponse:{},itemResponse);continue;}GetResponse getResponse itemResponse.getResponse();if(getResponse null){log.error(multiGet item is null,itemResponse:{},itemResponse);continue;}//存储更新es的数据新增用source传入数据 更新用doc传入数据MapString,Object dataMap Maps.newHashMap();MapString,Object map getResponse.getSourceAsMap();if(!getResponse.isExists() || map null){//add indexdataMap.put(TrainEsConstant.COLUMN_TRAIN_NUMBER,trainNumber.getName());IndexRequest indexRequest new IndexRequest(TrainEsConstant.INDEX,TrainEsConstant.TYPE,getResponse.getId()).source(dataMap);bulkRequest.add(indexRequest);continue;}//里面是车次信息 trainNumberId1,trainNumberId2。。。。需要拆分String origin (String) map.get(TrainEsConstant.COLUMN_TRAIN_NUMBER);SetString set Sets.newHashSet(Splitter.on(,).trimResults().omitEmptyStrings().split(origin));if(!set.contains(trainNumber.getName())){//update indexdataMap.put(TrainEsConstant.COLUMN_TRAIN_NUMBER,origin,trainNumber.getName());UpdateRequest updateRequest new UpdateRequest(TrainEsConstant.INDEX,TrainEsConstant.TYPE,getResponse.getId()).doc(dataMap);bulkRequest.add(updateRequest);}}//批量更新es的数据(bulkResponse是批量对象转成string打印日志)BulkResponse bulkResponse esClient.bulk(bulkRequest);log.info(es bulk response:{},JsonMapper.obj2String(bulkResponse));if(bulkResponse.hasFailures()){throw new RuntimeException(es bulk failure);}}}
车次表 车次明细表 修改数据库中车次表的信息会将数据处理后出发站-到达站 车次号存入es