当前位置: 首页 > news >正文

什么直播可以做游戏视频网站吗怎么查找关键词排名

什么直播可以做游戏视频网站吗,怎么查找关键词排名,软件工程师一个月工资多少,网站托管价格一、写在前面 在实际的生产环境中#xff0c;我们经常会把Flink处理的数据写入MySQL、Doris等数据库中#xff0c;下面以MySQL为例#xff0c;使用JDBC的方式将Flink的数据实时数据写入MySQL。 二、代码示例 2.1 版本说明 flink.version1.14.6/flink.version…一、写在前面 在实际的生产环境中我们经常会把Flink处理的数据写入MySQL、Doris等数据库中下面以MySQL为例使用JDBC的方式将Flink的数据实时数据写入MySQL。 二、代码示例 2.1 版本说明 flink.version1.14.6/flink.versionspark.version2.4.3/spark.versionhadoop.version2.8.5/hadoop.versionhbase.version1.4.9/hbase.versionhive.version2.3.5/hive.versionjava.version1.8/java.versionscala.version2.11.8/scala.versionmysql.version8.0.22/mysql.versionscala.binary.version2.11/scala.binary.version2.2 导入相关依赖 dependencygroupIdorg.apache.flink/groupIdartifactIdflink-connector-jdbc_2.11/artifactIdversion${flink.version}/version /dependency !--mysql连接器依赖-- dependencygroupIdmysql/groupIdartifactIdmysql-connector-java/artifactIdversion8.0.22/version /dependency2.3 连接数据库创建表 mysql CREATE TABLE ws ( id varchar(100) NOT NULL,ts bigint(20) DEFAULT NULL,vc int(11) DEFAULT NULL, PRIMARY KEY (id) ) ENGINEInnoDB DEFAULT CHARSETutf82.4 创建POJO类 package com.flink.POJOs;import java.util.Objects;/*** TODO POJO类的特点* 类是公有public的* 有一个无参的构造方法* 所有属性都是公有public的* 所有属性的类型都是可以序列化的*/ public class WaterSensor {//类的公共属性public String id;public Long ts;public Integer vc;//无参构造方法public WaterSensor() {//System.out.println(调用了无参数的构造方法);}public WaterSensor(String id, Long ts, Integer vc) {this.id id;this.ts ts;this.vc vc;}//生成get和set方法public void setId(String id) {this.id id;}public void setTs(Long ts) {this.ts ts;}public void setVc(Integer vc) {this.vc vc;}public String getId() {return id;}public Long getTs() {return ts;}public Integer getVc() {return vc;}//重写toString方法Overridepublic String toString() {return WaterSensor{ id id \ , ts ts , vc vc };}//重写equals和hasCode方法Overridepublic boolean equals(Object o) {if (this o) return true;if (o null || getClass() ! o.getClass()) return false;WaterSensor that (WaterSensor) o;return id.equals(that.id) ts.equals(that.ts) vc.equals(that.vc);}Overridepublic int hashCode() {return Objects.hash(id, ts, vc);} } //scala的case类2.5 自定义map函数 package com.flink.POJOs;import org.apache.flink.api.common.functions.MapFunction;public class WaterSensorMapFunction implements MapFunctionString, WaterSensor {Overridepublic WaterSensor map(String value) throws Exception {String[] datas value.split(,);return new WaterSensor(datas[0], Long.valueOf(datas[1]), Integer.valueOf(datas[2]));} }2.5 Flink2MySQL package com.flink.DataStream.Sink;import com.flink.POJOs.WaterSensor; import com.flink.POJOs.WaterSensorMapFunction; import org.apache.flink.connector.jdbc.JdbcConnectionOptions; import org.apache.flink.connector.jdbc.JdbcExecutionOptions; import org.apache.flink.connector.jdbc.JdbcSink; import org.apache.flink.connector.jdbc.JdbcStatementBuilder; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.sink.SinkFunction;import java.sql.PreparedStatement; import java.sql.SQLException;/*** Flink 输出到 MySQLJDBC*/ public class flinkSinkJdbc {public static void main(String[] args) throws Exception {//TODO 创建Flink上下文执行环境StreamExecutionEnvironment streamExecutionEnvironment StreamExecutionEnvironment.getExecutionEnvironment();streamExecutionEnvironment.setParallelism(1);//TODO SourceDataStreamSourceString dataStreamSource streamExecutionEnvironment.socketTextStream(localhost, 8888);//TODO TransferSingleOutputStreamOperatorWaterSensor waterSensorSingleOutputStreamOperator dataStreamSource.map(new WaterSensorMapFunction());/**TODO 写入 mysql* 1、只能用老的 sink 写法* 2、JDBCSink 的 4 个参数:* 第一个参数 执行的 sql一般就是 insert into* 第二个参数 预编译 sql 对占位符填充值* 第三个参数 执行选项 ----攒批、重试* 第四个参数 连接选项----url、用户名、密码*/SinkFunctionWaterSensor sinkFunction JdbcSink.sink(insert into ws values(?,?,?),new JdbcStatementBuilderWaterSensor() {Overridepublic void accept(PreparedStatement preparedStatement, WaterSensor waterSensor) throws SQLException {preparedStatement.setString(1, waterSensor.getId());preparedStatement.setLong(2, waterSensor.getTs());preparedStatement.setInt(3, waterSensor.getVc());System.out.println(数据写入成功(waterSensor.getId(),waterSensor.getTs(),waterSensor.getVc()));}}, JdbcExecutionOptions.builder().withMaxRetries(3) // 重试次数.withBatchSize(100) // 批次的大小条数.withBatchIntervalMs(3000) // 批次的时间.build(),new JdbcConnectionOptions.JdbcConnectionOptionsBuilder().withUrl(jdbc:mysql://localhost:3306/dw?serverTimezoneAsia/ShanghaiuseUnicodetruecharacterEncodingUTF-8).withUsername(root).withPassword(********).withConnectionCheckTimeoutSeconds(60) // 重试的超时时间.build());//TODO 写入到MysqlwaterSensorSingleOutputStreamOperator.addSink(sinkFunction);streamExecutionEnvironment.execute();} }2.6 启动necat、Flink观察数据库写入情况 nc -lk 9999 #启动necat、并监听8888端口写入数据启动Flink程序 查看数据库写入是否正常
http://www.w-s-a.com/news/769431/

相关文章:

  • 嘉定专业网站制作公司七星彩网站开发
  • 网站建设人员培训企业网站开发模型图
  • 自己开发一个网站应该怎么做国外设计网站 绿色的
  • 南昌外贸网站设计推广任务发布平台app
  • 建立网站成本书店网站建设可行性分析
  • 高端网站设计官网乌海学校网站建设
  • 哪些网站适合新手编程做项目优秀网页设计赏析
  • 永州网站seo德阳网站建设优化
  • 网站建设高端网站本地建设网站软件下载
  • 网站后台账号密码破解杭州酒店网站设计公司推荐
  • 和县网站开发秦皇岛建设工程信息网站
  • 国外网站用什么dns好建一个下载网站要什么cms系统
  • 礼品工艺品网站建设手机做网站哪家好
  • 泉州网站建设方案维护怎样选择网站建设
  • 江苏建站速度忿先进的网站建设
  • 广州天河建站公司com域名注册多少钱
  • 成都网站建设推广好vs2013如何做网站
  • 茶叶网站建设模板企业网站备案要多少钱
  • 怎么查网站找谁做的win主机伪静态规则 wordpress
  • 轻云服务器菁英版 多个网站北京it外包服务商
  • 售后服务 网站建设阳江seo优化
  • 网站建设后怎么赚钱wordpress调用导航栏
  • 特产网站设计六色网站
  • 服务器网站备案做网站公司如何赚钱
  • 怎样进行站点优化荣成市有做网站的吗
  • 合肥建设工会网站芜湖做网站建设公司
  • 玉林市住房和城乡建设局网站网站开发百灵鸟
  • 网站怎么做双机房切换建设部网站2015年第158号
  • 郑州服务设计公司网站色块的网站
  • 网站设计所用到的技术做网站添加mp3