jsp网站开发中常见问题,阿里巴巴网站如何做固定背景,有人看片吗免费观看,青海省教育厅门户网站首页提示#xff1a;文章写完后#xff0c;目录可以自动生成#xff0c;如何生成可参考右边的帮助文档 文章目录 前言一、写入到Elasticsearch5二、写入到Elasticsearch7总结 前言
Flink sink 流数据写入到es5和es7的简单示例。 一、写入到Elasticsearch5
pom maven依赖 d… 提示文章写完后目录可以自动生成如何生成可参考右边的帮助文档 文章目录 前言一、写入到Elasticsearch5二、写入到Elasticsearch7总结 前言
Flink sink 流数据写入到es5和es7的简单示例。 一、写入到Elasticsearch5
pom maven依赖 dependencygroupIdorg.apache.flink/groupIdartifactIdflink-connector-elasticsearch5_2.11/artifactIdversion${flink.version}/version/dependency代码如下示例
public class Es5SinkDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();Row rowRow.of(张三,001,getTimestamp(2016-10-24 21:59:06));Row row2Row.of(张三,002,getTimestamp(2016-10-24 21:50:06));Row row3Row.of(张三,002,getTimestamp(2016-10-24 21:51:06));Row row4Row.of(李四,003,getTimestamp(2016-10-24 21:50:56));Row row5Row.of(李四,004,getTimestamp(2016-10-24 00:48:36));Row row6Row.of(王五,005,getTimestamp(2016-10-24 00:48:36));DataStreamSourceRow source env.fromElements(row,row2,row3,row4,row5,row6);MapString, String config new HashMap();
// config.put(cluster.name, my-cluster-name);
// config.put(bulk.flush.max.actions, 1);ListInetSocketAddress transportAddresses new ArrayList();transportAddresses.add(new InetSocketAddress(InetAddress.getByName(10.68.8.60), 9300));//Sink操作DataStreamSinkRow rowDataStreamSink source.addSink(new ElasticsearchSink(config, transportAddresses, new ElasticsearchSinkFunctionRow() {public IndexRequest createIndexRequest(Row element) {MapString, Object json new HashMap();json.put(name22, element.getField(0).toString());json.put(no22, element.getField(1));json.put(age, 34);json.put(create_time, element.getField(2));return Requests.indexRequest().index(cc).type(mtype).id(element.getField(1).toString()).source(json);}Overridepublic void process(Row element, RuntimeContext ctx, RequestIndexer indexer) {//利用requestIndexer进行发送请求写入数据indexer.add(createIndexRequest(element));}}));env.execute(es demo);}private static java.sql.Timestamp getTimestamp(String str) throws Exception {
// String string 2016-10-24 21:59:06;SimpleDateFormat sdf new SimpleDateFormat(yyyy-MM-dd HH:mm:ss);java.util.Date datesdf.parse(str);java.sql.Timestamp s new java.sql.Timestamp(date.getTime());return s;}二、写入到Elasticsearch7
pom maven依赖 dependencygroupIdorg.apache.flink/groupIdartifactIdflink-connector-elasticsearch7_2.11/artifactIdversion${flink.version}/versionscopeprovided/scope/dependency
代码如下示例
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink;
import org.apache.flink.types.Row;
import org.apache.http.HttpHost;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class EsSinkDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();Row rowRow.of(张三,001,getTimestamp(2016-10-24 21:59:06));Row row2Row.of(张三,002,getTimestamp(2016-10-24 21:50:06));Row row3Row.of(张三,002,getTimestamp(2016-10-24 21:51:06));Row row4Row.of(李四,003,getTimestamp(2016-10-24 21:50:56));Row row5Row.of(李四,004,getTimestamp(2016-10-24 00:48:36));Row row6Row.of(王五,005,getTimestamp(2016-10-24 00:48:36));DataStreamSourceRow source env.fromElements(row,row2,row3,row4,row5,row6);MapString, String config new HashMap();
// config.put(cluster.name, my-cluster-name);
// This instructs the sink to emit after every element, otherwise they would be buffered
// config.put(bulk.flush.max.actions, 1);ListHttpHost hosts new ArrayList();hosts.add(new HttpHost(10.68.8.69,9200,http));ElasticsearchSink.BuilderRow esSinkBuilder new ElasticsearchSink.BuilderRow(hosts,new ElasticsearchSinkFunctionRow() {public IndexRequest createIndexRequest(Row element) {MapString, Object json new HashMap();json.put(name22, element.getField(0).toString());json.put(no22, element.getField(1));json.put(age, 34);
// json.put(create_time, element.getField(2));return Requests.indexRequest().index(cc).id(element.getField(1).toString()).source(json);}Overridepublic void process(Row element, RuntimeContext ctx, RequestIndexer indexer) {//利用requestIndexer进行发送请求写入数据indexer.add(createIndexRequest(element));}});esSinkBuilder.setBulkFlushMaxActions(100);//Sink操作source.addSink(esSinkBuilder.build());env.execute(es demo);}private static java.sql.Timestamp getTimestamp(String str) throws Exception {
// String string 2016-10-24 21:59:06;SimpleDateFormat sdf new SimpleDateFormat(yyyy-MM-dd HH:mm:ss);java.util.Date datesdf.parse(str);java.sql.Timestamp s new java.sql.Timestamp(date.getTime());return s;}
} 总结
flink写入es5和es7 的区别是引入不同的flink-connector-elasticsearch,es7已没有type的概念故无需再设置type。