当前位置: 首页 > news >正文

工具类网站如何做排名深圳网站建设推荐

工具类网站如何做排名,深圳网站建设推荐,西安网页制作培训,男的女的做那个的视频网站该文档演示了fink windows的操作DEMO 环境准备#xff1a; kafka本地运行#xff1a;kafka部署自动生成名字代码#xff1a;随机名自动生成随机IP代码#xff1a;随机IPFlink 1.18 测试数据 自动向kafka推送数据 import cn.hutool.core.date.DateUtil; import com.alibab…该文档演示了fink windows的操作DEMO 环境准备 kafka本地运行kafka部署自动生成名字代码随机名自动生成随机IP代码随机IPFlink 1.18 测试数据 自动向kafka推送数据 import cn.hutool.core.date.DateUtil; import com.alibaba.fastjson2.JSONObject; import com.wfg.flink.example.dto.KafkaPvDto; import com.wfg.flink.example.utils.RandomGeneratorUtils; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord;import java.time.LocalDateTime; import java.util.Properties; import java.util.Random; import java.util.UUID; import java.util.concurrent.CompletableFuture;import static com.wfg.flink.example.constants.Constants.KAFKA_BROKERS; import static com.wfg.flink.example.constants.Constants.TOPIC_NAME;public class KafkaTestProducer {public static void main(String[] args) {Properties props new Properties();props.put(bootstrap.servers, KAFKA_BROKERS);props.put(key.serializer, org.apache.kafka.common.serialization.StringSerializer);props.put(value.serializer, org.apache.kafka.common.serialization.StringSerializer);try (ProducerString, String producer new KafkaProducer(props)) {int times 100000;for (int i 0; i times; i) {System.out.println(Send No. : i);CompletableFuture.allOf(CompletableFuture.runAsync(() - sendKafkaMsg(producer)),CompletableFuture.runAsync(() - sendKafkaMsg(producer)),CompletableFuture.runAsync(() - sendKafkaMsg(producer)),CompletableFuture.runAsync(() - sendKafkaMsg(producer)),CompletableFuture.runAsync(() - sendKafkaMsg(producer)),CompletableFuture.runAsync(() - sendKafkaMsg(producer)),CompletableFuture.runAsync(() - sendKafkaMsg(producer)),CompletableFuture.runAsync(() - sendKafkaMsg(producer)),CompletableFuture.runAsync(() - sendKafkaMsg(producer))).join();producer.flush();Random random new Random();int randomNumber random.nextInt(7); // 生成一个0到6的随机数Thread.sleep(1000 * randomNumber);}} catch (InterruptedException e) {throw new RuntimeException(e);}}private static void sendKafkaMsg(ProducerString, String producer) {String msg createMsg();System.out.println(msg);producer.send(new ProducerRecord(TOPIC_NAME, UUID.randomUUID().toString().replaceAll(-, ), msg));}private static String createMsg() {KafkaPvDto dto new KafkaPvDto();dto.setUuid(UUID.randomUUID().toString().replaceAll(-, ));dto.setUserName(RandomGeneratorUtils.generateRandomFullName());dto.setVisitIp(RandomGeneratorUtils.generateRandomIp()); // DateTime begin DateUtil.beginOfDay(new Date()); // String timeStr DateUtil.format(RandomGeneratorUtils.generateRandomDateTime(LocalDateTimeUtil.of(begin).toLocalDate(), LocalDate.now()), yyyy-MM-dd HH:mm:ss);String timeStr DateUtil.format(LocalDateTime.now(), yyyy-MM-dd HH:mm:ss);dto.setVisitTime(timeStr);dto.setVisitServiceIp(RandomGeneratorUtils.generateRandomIp());return JSONObject.toJSONString(dto);} }注意 kafka本地运行kafka部署自动生成名字代码随机名自动生成随机IP代码随机IP FLINK 数据 /**** author wfg*/ Slf4j public class DataSplitter implements FlatMapFunctionString, Tuple2String, Integer {Overridepublic void flatMap(String value, CollectorTuple2String, Integer collector) {KafkaPvDto data JSONObject.parseObject(value, KafkaPvDto.class);if (data ! null) {collector.collect(new Tuple2(data.getUserName(), 1));}} } 基于时间窗口 *** Desc: Flink Window 学习*/ Slf4j public class WindowsDemo {public static void main(String[] args) throws Exception {final StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();String brokers localhost:9092;KafkaSourceString source KafkaSource.Stringbuilder().setBootstrapServers(brokers).setTopics(TOPIC_NAME).setGroupId(my-group).setStartingOffsets(OffsetsInitializer.earliest()).setValueOnlyDeserializer(new SimpleStringSchema()).build();DataStreamSourceString data env.fromSource(source, WatermarkStrategy.noWatermarks(), wfgxxx);//基于时间窗口data.flatMap(new DataSplitter()).keyBy(1).timeWindow(Time.seconds(30)).sum(0).print();*/env.execute(flink window example);} } 基于滑动时间窗口 /*** Desc: Flink Window 学习*/ Slf4j public class WindowsDemo {public static void main(String[] args) throws Exception {final StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();String brokers localhost:9092;KafkaSourceString source KafkaSource.Stringbuilder().setBootstrapServers(brokers).setTopics(TOPIC_NAME).setGroupId(my-group).setStartingOffsets(OffsetsInitializer.earliest()).setValueOnlyDeserializer(new SimpleStringSchema()).build();DataStreamSourceString data env.fromSource(source, WatermarkStrategy.noWatermarks(), wfgxxx);//基于滑动时间窗口data.flatMap(new DataSplitter()).keyBy(1).timeWindow(Time.seconds(60), Time.seconds(30)).sum(0).print();env.execute(flink window example);} }基于事件数量窗口 /*** Desc: Flink Window 学习*/ Slf4j public class WindowsDemo {public static void main(String[] args) throws Exception {final StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();String brokers localhost:9092;KafkaSourceString source KafkaSource.Stringbuilder().setBootstrapServers(brokers).setTopics(TOPIC_NAME).setGroupId(my-group).setStartingOffsets(OffsetsInitializer.earliest()).setValueOnlyDeserializer(new SimpleStringSchema()).build();DataStreamSourceString data env.fromSource(source, WatermarkStrategy.noWatermarks(), wfgxxx);//基于事件数量窗口data.flatMap(new DataSplitter()).keyBy(1).countWindow(3).sum(0).print();env.execute(flink window example);} }基于事件数量滑动窗口 /*** Desc: Flink Window 学习*/ Slf4j public class WindowsDemo {public static void main(String[] args) throws Exception {final StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();String brokers localhost:9092;KafkaSourceString source KafkaSource.Stringbuilder().setBootstrapServers(brokers).setTopics(TOPIC_NAME).setGroupId(my-group).setStartingOffsets(OffsetsInitializer.earliest()).setValueOnlyDeserializer(new SimpleStringSchema()).build();DataStreamSourceString data env.fromSource(source, WatermarkStrategy.noWatermarks(), wfgxxx);//基于事件数量滑动窗口data.flatMap(new DataSplitter()).keyBy(1).countWindow(4, 3).sum(0).print();*env.execute(flink window example);} }基于会话时间窗口 /*** Desc: Flink Window 学习*/ Slf4j public class WindowsDemo {public static void main(String[] args) throws Exception {final StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();String brokers localhost:9092;KafkaSourceString source KafkaSource.Stringbuilder().setBootstrapServers(brokers).setTopics(TOPIC_NAME).setGroupId(my-group).setStartingOffsets(OffsetsInitializer.earliest()).setValueOnlyDeserializer(new SimpleStringSchema()).build();DataStreamSourceString data env.fromSource(source, WatermarkStrategy.noWatermarks(), wfgxxx);//基于会话时间窗口data.flatMap(new DataSplitter()).keyBy(v-v.f0).window(ProcessingTimeSessionWindows.withGap(Time.seconds(5)))//表示如果 5s 内没出现数据则认为超出会话时长然后计算这个窗口的和.sum(1).print();env.execute(flink window example);} }滚动窗口Tumbling Window 滚动窗口Tumbling Window /*** Desc: Flink Window 学习*/ Slf4j public class WindowsDemo {public static void main(String[] args) throws Exception {final StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();String brokers localhost:9092;KafkaSourceString source KafkaSource.Stringbuilder().setBootstrapServers(brokers).setTopics(TOPIC_NAME).setGroupId(my-group).setStartingOffsets(OffsetsInitializer.earliest()).setValueOnlyDeserializer(new SimpleStringSchema()).build();DataStreamSourceString data env.fromSource(source, WatermarkStrategy.noWatermarks(), wfgxxx);//滚动窗口Tumbling Window 基于处理时间的 30 秒滚动窗口data.flatMap(new DataSplitter()).keyBy(v-v.f0).window(TumblingProcessingTimeWindows.of(Time.seconds(30))).sum(1).print();;env.execute(flink window example);} }基于事件时间 /*** Desc: Flink Window 学习*/ Slf4j public class WindowsDemo {public static void main(String[] args) throws Exception {final StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();String brokers localhost:9092;KafkaSourceString source KafkaSource.Stringbuilder().setBootstrapServers(brokers).setTopics(TOPIC_NAME).setGroupId(my-group).setStartingOffsets(OffsetsInitializer.earliest()).setValueOnlyDeserializer(new SimpleStringSchema()).build();DataStreamSourceString data env.fromSource(source, WatermarkStrategy.noWatermarks(), wfgxxx);// 基于事件时间的 30 秒滚动窗口data.flatMap(new DataSplitter()).keyBy(v-v.f0).assignTimestampsAndWatermarks(/* 分配时间戳和水印 */).window(TumblingEventTimeWindows.of(Time.seconds(30))).sum(1).print();env.execute(flink window example);} }滑动窗口Sliding Window 基于处理时间 /*** Desc: Flink Window 学习*/ Slf4j public class WindowsDemo {public static void main(String[] args) throws Exception {final StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();String brokers localhost:9092;KafkaSourceString source KafkaSource.Stringbuilder().setBootstrapServers(brokers).setTopics(TOPIC_NAME).setGroupId(my-group).setStartingOffsets(OffsetsInitializer.earliest()).setValueOnlyDeserializer(new SimpleStringSchema()).build();DataStreamSourceString data env.fromSource(source, WatermarkStrategy.noWatermarks(), wfgxxx);// 基于处理时间的 30 秒滑动窗口滑动间隔为 10 秒data.flatMap(new DataSplitter()).keyBy(v-v.f0).window(SlidingProcessingTimeWindows.of(Time.seconds(30), Time.seconds(10))).sum(1).print();env.execute(flink window example);} }基于事件时间 /*** Desc: Flink Window 学习*/ Slf4j public class WindowsDemo {public static void main(String[] args) throws Exception {final StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();String brokers localhost:9092;KafkaSourceString source KafkaSource.Stringbuilder().setBootstrapServers(brokers).setTopics(TOPIC_NAME).setGroupId(my-group).setStartingOffsets(OffsetsInitializer.earliest()).setValueOnlyDeserializer(new SimpleStringSchema()).build();DataStreamSourceString data env.fromSource(source, WatermarkStrategy.noWatermarks(), wfgxxx);// 基于事件时间的 30 秒滑动窗口滑动间隔为 10 秒 data.flatMap(new DataSplitter()).keyBy(v-v.f0).assignTimestampsAndWatermarks(/* 分配时间戳和水印 */).window(SlidingEventTimeWindows.of(Time.seconds(30), Time.seconds(10))).sum(1).print();env.execute(flink window example);} }注意 kafka本地运行kafka部署自动生成名字代码随机名自动生成随机IP代码随机IP
http://www.w-s-a.com/news/859753/

相关文章:

  • 做酒招代理的网站赣icp南昌网站建设
  • 怎样做网站內链大连市建设工程信息网官网
  • 网站软件免费下载安装泰安网站建设收费标准
  • 部署iis网站校园网站设计毕业设计
  • 网站快慢由什么决定塘沽手机网站建设
  • 苏州那家公司做网站比较好装修队做网站
  • 外贸网站推广中山网站流量团队
  • 网站前端设计培训做一份网站的步zou
  • 网站备案拍照茶叶网页设计素材
  • wordpress 手机商城模板关键词优化软件有哪些
  • 网站301做排名python做的网站如何部署
  • 昆山做企业网站工信部网站 备案
  • 做英文的小说网站有哪些网站做qq登录
  • 湖州建设局招投标网站深圳广告公司集中在哪里
  • 重庆主城推广网站建设商城网站建设预算
  • 宁波品牌网站推广优化公司开发公司工程部工作总结
  • 长沙建站模板微信网站建设方案
  • 不让网站在手机怎么做门户网站 模板之家
  • 网站建设及推广图片wordpress文章摘要调用
  • 手机版网站案例全国信息企业公示系统
  • 模仿别人网站建设银行广州招聘网站
  • 沧州网站建设沧州内页优化
  • 代加工网站有哪些专门做网站关键词排名
  • 郑州做景区网站建设公司软件开发者模式怎么打开
  • 长沙企业网站建设哪家好做app一般多少钱
  • 南宁一站网网络技术有限公司网站开发技术应用领域
  • 公司网站建设方案ppt专业构建网站的公司
  • 深圳网站建设方维网络网站框架设计好后怎么做
  • 合肥网站建设过程网站栏目建设调研
  • 手机访问网站页面丢失北京电商平台网站建设