当前位置: 首页 > news >正文

网站建设与管理读书心得南通市区有哪几家做网站的

网站建设与管理读书心得,南通市区有哪几家做网站的,wordpress中文seo插件,做视频网站技术壁垒在哪里一、写在前面 在实际的生产环境中#xff0c;我们经常会把Flink处理的数据写入MySQL、Doris等数据库中#xff0c;下面以MySQL为例#xff0c;使用JDBC的方式将Flink的数据实时数据写入MySQL。 二、代码示例 2.1 版本说明 flink.version1.14.6/flink.version…一、写在前面 在实际的生产环境中我们经常会把Flink处理的数据写入MySQL、Doris等数据库中下面以MySQL为例使用JDBC的方式将Flink的数据实时数据写入MySQL。 二、代码示例 2.1 版本说明 flink.version1.14.6/flink.versionspark.version2.4.3/spark.versionhadoop.version2.8.5/hadoop.versionhbase.version1.4.9/hbase.versionhive.version2.3.5/hive.versionjava.version1.8/java.versionscala.version2.11.8/scala.versionmysql.version8.0.22/mysql.versionscala.binary.version2.11/scala.binary.version2.2 导入相关依赖 dependencygroupIdorg.apache.flink/groupIdartifactIdflink-connector-jdbc_2.11/artifactIdversion${flink.version}/version /dependency !--mysql连接器依赖-- dependencygroupIdmysql/groupIdartifactIdmysql-connector-java/artifactIdversion8.0.22/version /dependency2.3 连接数据库创建表 mysql CREATE TABLE ws ( id varchar(100) NOT NULL,ts bigint(20) DEFAULT NULL,vc int(11) DEFAULT NULL, PRIMARY KEY (id) ) ENGINEInnoDB DEFAULT CHARSETutf82.4 创建POJO类 package com.flink.POJOs;import java.util.Objects;/*** TODO POJO类的特点* 类是公有public的* 有一个无参的构造方法* 所有属性都是公有public的* 所有属性的类型都是可以序列化的*/ public class WaterSensor {//类的公共属性public String id;public Long ts;public Integer vc;//无参构造方法public WaterSensor() {//System.out.println(调用了无参数的构造方法);}public WaterSensor(String id, Long ts, Integer vc) {this.id id;this.ts ts;this.vc vc;}//生成get和set方法public void setId(String id) {this.id id;}public void setTs(Long ts) {this.ts ts;}public void setVc(Integer vc) {this.vc vc;}public String getId() {return id;}public Long getTs() {return ts;}public Integer getVc() {return vc;}//重写toString方法Overridepublic String toString() {return WaterSensor{ id id \ , ts ts , vc vc };}//重写equals和hasCode方法Overridepublic boolean equals(Object o) {if (this o) return true;if (o null || getClass() ! o.getClass()) return false;WaterSensor that (WaterSensor) o;return id.equals(that.id) ts.equals(that.ts) vc.equals(that.vc);}Overridepublic int hashCode() {return Objects.hash(id, ts, vc);} } //scala的case类2.5 自定义map函数 package com.flink.POJOs;import org.apache.flink.api.common.functions.MapFunction;public class WaterSensorMapFunction implements MapFunctionString, WaterSensor {Overridepublic WaterSensor map(String value) throws Exception {String[] datas value.split(,);return new WaterSensor(datas[0], Long.valueOf(datas[1]), Integer.valueOf(datas[2]));} }2.5 Flink2MySQL package com.flink.DataStream.Sink;import com.flink.POJOs.WaterSensor; import com.flink.POJOs.WaterSensorMapFunction; import org.apache.flink.connector.jdbc.JdbcConnectionOptions; import org.apache.flink.connector.jdbc.JdbcExecutionOptions; import org.apache.flink.connector.jdbc.JdbcSink; import org.apache.flink.connector.jdbc.JdbcStatementBuilder; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.sink.SinkFunction;import java.sql.PreparedStatement; import java.sql.SQLException;/*** Flink 输出到 MySQLJDBC*/ public class flinkSinkJdbc {public static void main(String[] args) throws Exception {//TODO 创建Flink上下文执行环境StreamExecutionEnvironment streamExecutionEnvironment StreamExecutionEnvironment.getExecutionEnvironment();streamExecutionEnvironment.setParallelism(1);//TODO SourceDataStreamSourceString dataStreamSource streamExecutionEnvironment.socketTextStream(localhost, 8888);//TODO TransferSingleOutputStreamOperatorWaterSensor waterSensorSingleOutputStreamOperator dataStreamSource.map(new WaterSensorMapFunction());/**TODO 写入 mysql* 1、只能用老的 sink 写法* 2、JDBCSink 的 4 个参数:* 第一个参数 执行的 sql一般就是 insert into* 第二个参数 预编译 sql 对占位符填充值* 第三个参数 执行选项 ----攒批、重试* 第四个参数 连接选项----url、用户名、密码*/SinkFunctionWaterSensor sinkFunction JdbcSink.sink(insert into ws values(?,?,?),new JdbcStatementBuilderWaterSensor() {Overridepublic void accept(PreparedStatement preparedStatement, WaterSensor waterSensor) throws SQLException {preparedStatement.setString(1, waterSensor.getId());preparedStatement.setLong(2, waterSensor.getTs());preparedStatement.setInt(3, waterSensor.getVc());System.out.println(数据写入成功(waterSensor.getId(),waterSensor.getTs(),waterSensor.getVc()));}}, JdbcExecutionOptions.builder().withMaxRetries(3) // 重试次数.withBatchSize(100) // 批次的大小条数.withBatchIntervalMs(3000) // 批次的时间.build(),new JdbcConnectionOptions.JdbcConnectionOptionsBuilder().withUrl(jdbc:mysql://localhost:3306/dw?serverTimezoneAsia/ShanghaiuseUnicodetruecharacterEncodingUTF-8).withUsername(root).withPassword(********).withConnectionCheckTimeoutSeconds(60) // 重试的超时时间.build());//TODO 写入到MysqlwaterSensorSingleOutputStreamOperator.addSink(sinkFunction);streamExecutionEnvironment.execute();} }2.6 启动necat、Flink观察数据库写入情况 nc -lk 9999 #启动necat、并监听8888端口写入数据启动Flink程序 查看数据库写入是否正常
http://www.w-s-a.com/news/62760/

相关文章:

  • 做高防鞋 哪个网站能上架net网站开发net网站开发
  • 做网站公司郑州推广计划步骤
  • 网站建设计无形资产外国做美食视频网站
  • 创立一个网站需要什么网推技巧
  • 网站的会员功能怎么做wordpress主题开拓右边栏
  • 做个一般的网站要多少钱nas 建网站
  • 网页设计作品源代码彼岸花坊网站seo测评
  • 用什么软件做动漫视频网站好环保网站设计价格
  • 合肥网站设计服投稿网站源码
  • 为什么很多网站用php做上海口碑最好的装修公司排名
  • 运城网站推广找人做小程序要多少钱
  • 做外链哪个网站好seo诊断网站
  • 网站建设与管理考查方案上海公司免费起名
  • 哪个网站做h5好做汽车网站
  • 汝州网站制作住房和城乡建设部官网进行查询
  • 怎么做整人点不完的网站获取网站访客qq号码源码
  • 自建网站软件网站如何减少404跳转
  • 我想学制作网站吗公司起名网站十大排名
  • 广州白云手机网站建设淘宝店铺怎么推广
  • 青海省住房与城乡建设厅网站珠海高端网站制作公司
  • 深圳个性化建网站公司简便网站建设
  • 网站安全狗十大免费ppt网站在线
  • 进网站后台显示空白图片模板 网站源码
  • dedecms 英文网站怎么在网站上做模式题库
  • 轻网站怎么建立国外做评论的网站
  • 拉米拉网站建设乐清网站网站建设
  • 获取网站全站代码申请免费域名的方法
  • 网站制作建设公司哪家好wordpress仪表盘打不开
  • 最佳网站制作模板用手机能创建网站吗
  • 只做黑白摄影的网站网站建设好后给领导作介绍