工具类网站如何做排名,深圳网站建设推荐,西安网页制作培训,男的女的做那个的视频网站该文档演示了fink windows的操作DEMO 环境准备#xff1a; kafka本地运行#xff1a;kafka部署自动生成名字代码#xff1a;随机名自动生成随机IP代码#xff1a;随机IPFlink 1.18 测试数据
自动向kafka推送数据
import cn.hutool.core.date.DateUtil;
import com.alibab…该文档演示了fink windows的操作DEMO 环境准备 kafka本地运行kafka部署自动生成名字代码随机名自动生成随机IP代码随机IPFlink 1.18 测试数据
自动向kafka推送数据
import cn.hutool.core.date.DateUtil;
import com.alibaba.fastjson2.JSONObject;
import com.wfg.flink.example.dto.KafkaPvDto;
import com.wfg.flink.example.utils.RandomGeneratorUtils;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;import java.time.LocalDateTime;
import java.util.Properties;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;import static com.wfg.flink.example.constants.Constants.KAFKA_BROKERS;
import static com.wfg.flink.example.constants.Constants.TOPIC_NAME;public class KafkaTestProducer {public static void main(String[] args) {Properties props new Properties();props.put(bootstrap.servers, KAFKA_BROKERS);props.put(key.serializer, org.apache.kafka.common.serialization.StringSerializer);props.put(value.serializer, org.apache.kafka.common.serialization.StringSerializer);try (ProducerString, String producer new KafkaProducer(props)) {int times 100000;for (int i 0; i times; i) {System.out.println(Send No. : i);CompletableFuture.allOf(CompletableFuture.runAsync(() - sendKafkaMsg(producer)),CompletableFuture.runAsync(() - sendKafkaMsg(producer)),CompletableFuture.runAsync(() - sendKafkaMsg(producer)),CompletableFuture.runAsync(() - sendKafkaMsg(producer)),CompletableFuture.runAsync(() - sendKafkaMsg(producer)),CompletableFuture.runAsync(() - sendKafkaMsg(producer)),CompletableFuture.runAsync(() - sendKafkaMsg(producer)),CompletableFuture.runAsync(() - sendKafkaMsg(producer)),CompletableFuture.runAsync(() - sendKafkaMsg(producer))).join();producer.flush();Random random new Random();int randomNumber random.nextInt(7); // 生成一个0到6的随机数Thread.sleep(1000 * randomNumber);}} catch (InterruptedException e) {throw new RuntimeException(e);}}private static void sendKafkaMsg(ProducerString, String producer) {String msg createMsg();System.out.println(msg);producer.send(new ProducerRecord(TOPIC_NAME, UUID.randomUUID().toString().replaceAll(-, ), msg));}private static String createMsg() {KafkaPvDto dto new KafkaPvDto();dto.setUuid(UUID.randomUUID().toString().replaceAll(-, ));dto.setUserName(RandomGeneratorUtils.generateRandomFullName());dto.setVisitIp(RandomGeneratorUtils.generateRandomIp());
// DateTime begin DateUtil.beginOfDay(new Date());
// String timeStr DateUtil.format(RandomGeneratorUtils.generateRandomDateTime(LocalDateTimeUtil.of(begin).toLocalDate(), LocalDate.now()), yyyy-MM-dd HH:mm:ss);String timeStr DateUtil.format(LocalDateTime.now(), yyyy-MM-dd HH:mm:ss);dto.setVisitTime(timeStr);dto.setVisitServiceIp(RandomGeneratorUtils.generateRandomIp());return JSONObject.toJSONString(dto);}
}注意 kafka本地运行kafka部署自动生成名字代码随机名自动生成随机IP代码随机IP FLINK 数据 /**** author wfg*/
Slf4j
public class DataSplitter implements FlatMapFunctionString, Tuple2String, Integer {Overridepublic void flatMap(String value, CollectorTuple2String, Integer collector) {KafkaPvDto data JSONObject.parseObject(value, KafkaPvDto.class);if (data ! null) {collector.collect(new Tuple2(data.getUserName(), 1));}}
}
基于时间窗口
*** Desc: Flink Window 学习*/
Slf4j
public class WindowsDemo {public static void main(String[] args) throws Exception {final StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();String brokers localhost:9092;KafkaSourceString source KafkaSource.Stringbuilder().setBootstrapServers(brokers).setTopics(TOPIC_NAME).setGroupId(my-group).setStartingOffsets(OffsetsInitializer.earliest()).setValueOnlyDeserializer(new SimpleStringSchema()).build();DataStreamSourceString data env.fromSource(source, WatermarkStrategy.noWatermarks(), wfgxxx);//基于时间窗口data.flatMap(new DataSplitter()).keyBy(1).timeWindow(Time.seconds(30)).sum(0).print();*/env.execute(flink window example);}
}
基于滑动时间窗口
/*** Desc: Flink Window 学习*/
Slf4j
public class WindowsDemo {public static void main(String[] args) throws Exception {final StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();String brokers localhost:9092;KafkaSourceString source KafkaSource.Stringbuilder().setBootstrapServers(brokers).setTopics(TOPIC_NAME).setGroupId(my-group).setStartingOffsets(OffsetsInitializer.earliest()).setValueOnlyDeserializer(new SimpleStringSchema()).build();DataStreamSourceString data env.fromSource(source, WatermarkStrategy.noWatermarks(), wfgxxx);//基于滑动时间窗口data.flatMap(new DataSplitter()).keyBy(1).timeWindow(Time.seconds(60), Time.seconds(30)).sum(0).print();env.execute(flink window example);}
}基于事件数量窗口
/*** Desc: Flink Window 学习*/
Slf4j
public class WindowsDemo {public static void main(String[] args) throws Exception {final StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();String brokers localhost:9092;KafkaSourceString source KafkaSource.Stringbuilder().setBootstrapServers(brokers).setTopics(TOPIC_NAME).setGroupId(my-group).setStartingOffsets(OffsetsInitializer.earliest()).setValueOnlyDeserializer(new SimpleStringSchema()).build();DataStreamSourceString data env.fromSource(source, WatermarkStrategy.noWatermarks(), wfgxxx);//基于事件数量窗口data.flatMap(new DataSplitter()).keyBy(1).countWindow(3).sum(0).print();env.execute(flink window example);}
}基于事件数量滑动窗口
/*** Desc: Flink Window 学习*/
Slf4j
public class WindowsDemo {public static void main(String[] args) throws Exception {final StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();String brokers localhost:9092;KafkaSourceString source KafkaSource.Stringbuilder().setBootstrapServers(brokers).setTopics(TOPIC_NAME).setGroupId(my-group).setStartingOffsets(OffsetsInitializer.earliest()).setValueOnlyDeserializer(new SimpleStringSchema()).build();DataStreamSourceString data env.fromSource(source, WatermarkStrategy.noWatermarks(), wfgxxx);//基于事件数量滑动窗口data.flatMap(new DataSplitter()).keyBy(1).countWindow(4, 3).sum(0).print();*env.execute(flink window example);}
}基于会话时间窗口
/*** Desc: Flink Window 学习*/
Slf4j
public class WindowsDemo {public static void main(String[] args) throws Exception {final StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();String brokers localhost:9092;KafkaSourceString source KafkaSource.Stringbuilder().setBootstrapServers(brokers).setTopics(TOPIC_NAME).setGroupId(my-group).setStartingOffsets(OffsetsInitializer.earliest()).setValueOnlyDeserializer(new SimpleStringSchema()).build();DataStreamSourceString data env.fromSource(source, WatermarkStrategy.noWatermarks(), wfgxxx);//基于会话时间窗口data.flatMap(new DataSplitter()).keyBy(v-v.f0).window(ProcessingTimeSessionWindows.withGap(Time.seconds(5)))//表示如果 5s 内没出现数据则认为超出会话时长然后计算这个窗口的和.sum(1).print();env.execute(flink window example);}
}滚动窗口Tumbling Window
滚动窗口Tumbling Window
/*** Desc: Flink Window 学习*/
Slf4j
public class WindowsDemo {public static void main(String[] args) throws Exception {final StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();String brokers localhost:9092;KafkaSourceString source KafkaSource.Stringbuilder().setBootstrapServers(brokers).setTopics(TOPIC_NAME).setGroupId(my-group).setStartingOffsets(OffsetsInitializer.earliest()).setValueOnlyDeserializer(new SimpleStringSchema()).build();DataStreamSourceString data env.fromSource(source, WatermarkStrategy.noWatermarks(), wfgxxx);//滚动窗口Tumbling Window 基于处理时间的 30 秒滚动窗口data.flatMap(new DataSplitter()).keyBy(v-v.f0).window(TumblingProcessingTimeWindows.of(Time.seconds(30))).sum(1).print();;env.execute(flink window example);}
}基于事件时间
/*** Desc: Flink Window 学习*/
Slf4j
public class WindowsDemo {public static void main(String[] args) throws Exception {final StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();String brokers localhost:9092;KafkaSourceString source KafkaSource.Stringbuilder().setBootstrapServers(brokers).setTopics(TOPIC_NAME).setGroupId(my-group).setStartingOffsets(OffsetsInitializer.earliest()).setValueOnlyDeserializer(new SimpleStringSchema()).build();DataStreamSourceString data env.fromSource(source, WatermarkStrategy.noWatermarks(), wfgxxx);// 基于事件时间的 30 秒滚动窗口data.flatMap(new DataSplitter()).keyBy(v-v.f0).assignTimestampsAndWatermarks(/* 分配时间戳和水印 */).window(TumblingEventTimeWindows.of(Time.seconds(30))).sum(1).print();env.execute(flink window example);}
}滑动窗口Sliding Window
基于处理时间
/*** Desc: Flink Window 学习*/
Slf4j
public class WindowsDemo {public static void main(String[] args) throws Exception {final StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();String brokers localhost:9092;KafkaSourceString source KafkaSource.Stringbuilder().setBootstrapServers(brokers).setTopics(TOPIC_NAME).setGroupId(my-group).setStartingOffsets(OffsetsInitializer.earliest()).setValueOnlyDeserializer(new SimpleStringSchema()).build();DataStreamSourceString data env.fromSource(source, WatermarkStrategy.noWatermarks(), wfgxxx);// 基于处理时间的 30 秒滑动窗口滑动间隔为 10 秒data.flatMap(new DataSplitter()).keyBy(v-v.f0).window(SlidingProcessingTimeWindows.of(Time.seconds(30), Time.seconds(10))).sum(1).print();env.execute(flink window example);}
}基于事件时间
/*** Desc: Flink Window 学习*/
Slf4j
public class WindowsDemo {public static void main(String[] args) throws Exception {final StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();String brokers localhost:9092;KafkaSourceString source KafkaSource.Stringbuilder().setBootstrapServers(brokers).setTopics(TOPIC_NAME).setGroupId(my-group).setStartingOffsets(OffsetsInitializer.earliest()).setValueOnlyDeserializer(new SimpleStringSchema()).build();DataStreamSourceString data env.fromSource(source, WatermarkStrategy.noWatermarks(), wfgxxx);// 基于事件时间的 30 秒滑动窗口滑动间隔为 10 秒 data.flatMap(new DataSplitter()).keyBy(v-v.f0).assignTimestampsAndWatermarks(/* 分配时间戳和水印 */).window(SlidingEventTimeWindows.of(Time.seconds(30), Time.seconds(10))).sum(1).print();env.execute(flink window example);}
}注意 kafka本地运行kafka部署自动生成名字代码随机名自动生成随机IP代码随机IP