购物网站 建设,合肥培训网站建设,ppt模板免费下载 素材百度网盘,网店设计说明基础概念
Flink是一个框架和分布式处理引擎#xff0c;用于对无界数据流和有界数据流进行有状态计算#xff0c;它的核心目标是“数据流上的有状态计算”。 有界流和无界流
有界流#xff1a;具有明确的开始和结束时间#xff0c;数据量有限。适合使用批处理技术#xf…基础概念
Flink是一个框架和分布式处理引擎用于对无界数据流和有界数据流进行有状态计算它的核心目标是“数据流上的有状态计算”。 有界流和无界流
有界流具有明确的开始和结束时间数据量有限。适合使用批处理技术可以在处理前将所有数据一次性读入内存进行处理。有界流通常用于历史数据分析、数据迁移等场景。无界流没有明确的开始和结束时间数据连续不断生成。由于数据是无限且持续的无界流需要实时处理并且必须持续摄取和处理数据不能等待所有数据到达后再进行处理。适合适用于流处理。
名词
源算子source
Flink可以从各种来源获取数据然后构建DataStream进行转换处理。一般将数据的输入来源称为数据源data source而读取数据的算子就是源算子source operator。 // 创建执行环境
StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();// 从集合读取数据
DataStreamSourceInteger collectionSource env.fromCollection(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15));// 从文件中读取数据
DataStreamSourceString fileSource env.fromSource(FileSource.forRecordStreamFormat(new TextLineInputFormat(),new Path(input/word.txt)).build(),WatermarkStrategy.noWatermarks(),fileSource);// 从kafka读取数据
DataStreamSourceString kafkaSource env.fromSource(KafkaSource.Stringbuilder()// kafka地址--可配置多个.setBootstrapServers()// topic名称--可配置多个.setTopics()// 消费组id.setGroupId()// 反序列化方式.setValueOnlyDeserializer(new SimpleStringSchema())// kafka 消费偏移量 方式 earliest一定从最早的开始消费、latest一定从最新的开始消费或者手动设置偏移量 默认是earliest.setStartingOffsets(OffsetsInitializer.latest())// 水位线自定义数据源算子名称.build(), WatermarkStrategy.noWatermarks(), kafkaSource);// 从socket读取数据
DataStreamSourceString socketSource env.socketTextStream(...,1234);
基本转换算子
Map
对元素的数据类型和内容做转换。
// 第一个参数为输入流第二个参数为输出流
SingleOutputStreamOperatorUserDto userDataStream kafkaSource.map(new MapFunctionString, UserDto() { Override public UserDto map(String message) throws Exception { return JSONObject.parseObject(message, UserDto.class); }
});
FlatMap
输入一个元素同时产生零个、一个或多个元素。
// 第一个参数为输入流第二个参数为输出流
// 可做转换可做条件过滤
SingleOutputStreamOperatorUserDto userDataStream kafkaSource.flatMap(new FlatMapFunctionString, UserDto() { Override public void flatMap(String message, CollectorUserDto collector) throws Exception { UserDto userDto JSONObject.parseObject(message, UserDto.class); collector.collect(userDto);}
});
Filter
对数据源根据条件过滤数据保留满足条件的数据 // 过滤出年龄大于18的用户
SingleOutputStreamOperatorUserDto filterDataStream userDataStream.filter(new FilterFunctionUserDto() { Override public boolean filter(UserDto userDto) throws Exception { return userDto.getAge() 18; }
});
聚合算子
KeyBy
根据指定的字段(key)将数据划分到不相交的分区中。相同key的元素会被分到同一个分区中。 // 将用户id一样的用户分到一个分区内
KeyedStreamUserDto, Integer userKeyedStream userDataStream.keyBy(new KeySelectorUserDto, Integer() { Override public Integer getKey(UserDto userDto) throws Exception { return userDto.getId(); }
});
Reduce 仅支持同类型的数据
对流的数据来一条计算一条将当前元素和上一次聚合后的数据组合输出新值并将新值进行保存作为下一次计算的元素。 聚合前和聚合后的数据类型是一致的。 当第一条数据进来时不会触发计算。 // 计算一个用户的订单总价格
SingleOutputStreamOperatorUserDto reduce userKeyedStream.reduce(new ReduceFunctionUserDto() {Overridepublic UserDto reduce(UserDto t1, UserDto t2) throws Exception {int totalPrice t1.getTotalPrice() t2.getOrderPrice();UserDto userDto new UserDto();userDto.setId(t1.getId());userDto.setAge(t1.getAge());userDto.setTotalPrice(totalPrice);return userDto;}
});
Aggregate 支持不同类型的数据 SingleOutputStreamOperatorString aggregate windowedStream.aggregate(new AggregateFunctionUserDto, Integer, String() { /** * 创建累加器就是初始化累加器 * return */ Override public Integer createAccumulator() { return 0; } /** * 计算逻辑或者是聚合逻辑 * param userDto * param beforeData * return */ Override public Integer add(UserDto userDto, Integer beforeData) { return beforeData userDto.getAge(); } /** * 获取最终结果窗口触发时输出 * param integer * return */ Override public String getResult(Integer integer) { return 计算结束最终结果为 integer.toString(); } /** * 只有会话窗口才会使用到 * param integer * param acc1 * return */ Override public Integer merge(Integer integer, Integer acc1) { return 0; }
});
窗口window
把流切割成有限大小的多个“存储桶”每个数据都会分发到对应的桶中当到达窗口结束时间时就对每个桶中收集的数据进行计算处理。窗口不是静态生成的是动态创建的。当这个窗口范围的进入第一条数据时才会创建对应的窗口。
滚动窗口
有固定的大小是一种对数据进行“均匀切片”的划分方式。窗口之间没有重叠也不会有间隔是“首尾相接”的状态。每个数据都会分配到一个窗口而且只会属于一个窗口。滚动窗口可以基于时间定义也可以基于数据的个数定义需要的参数只有一个就是窗口的大小。
// 分组
KeyedStreamTuple2String, Integer, String keyedStream dataStream.keyBy(p - p.f0);// 基于处理时间开窗窗口长度为10s窗口开始时间为 窗口长度整数倍向下取整结束时间为开始时间窗口长度
WindowedStreamTuple2String, Integer, String, TimeWindow tumblingProcessingTimeStream keyByStream.window(TumblingProcessingTimeWindows.of(Time.seconds(10)));// 基于事件时间开窗窗口长度为10s窗口开始时间为数据源事件时间结束时间为开始时间窗口长度
WindowedStreamTuple2String, Integer, String, TimeWindow tumblingEventTimeStream keyedStream.window(TumblingEventTimeWindows.of(Time.seconds(10)));// 基于次数开窗
WindowedStreamTuple2String, Integer, String, GlobalWindow countWindowStream keyedStream.countWindow(10);
滑动窗口
大小是固定的但是窗口之间不是收尾相接的而是可以“错开”一定的位置。定义滑动窗口的参数有2个窗口大小和滑动步长滑动步长代表了窗口计算的频率。因此如果 slide 小于窗口大小滑动窗口可以允许窗口重叠。这种情况下一个元素可能会被分发到多个窗口。
// 分组
KeyedStreamTuple2String, Integer, String keyedStream dataStream.keyBy(p - p.f0);// 基于处理事件开窗窗口长度为10s滑动步长为1s
WindowedStreamTuple2String, Integer, String, TimeWindow slidingProcessingTimeWindowStream keyedStream.window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(1)));// 基于事件事件开窗窗口长度为10s滑动步长为1s
WindowedStreamTuple2String, Integer, String, TimeWindow slidingEventTimeWindowStream keyedStream.window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(1)));// 基于次数开窗
WindowedStreamTuple2String, Integer, String, GlobalWindow countWindowStream keyedStream.countWindow(10, 1);
会话窗口
是基于会话来对数据进行分组的。会话窗口只能基于时间来定义。会话窗口中最重要的参数就是会话的超时时间也就是两个会话窗口之间的最小距离。如果相邻两个数据到来的时间间隔Gap小于指定的大小size那说明还在保持会话他们就属于同一个窗口如果gap大于Size那么新来的数据就应该属于新的会话窗口而前一个窗口就应该关闭了。会话窗口的长度不固定起始和结束时间也不是确定的各个分区之间窗口没有任何关联。会话窗口之间一定不会重叠的而且会保留至少size的间隔。
// 分组
KeyedStreamTuple2String, Integer, String keyedStream dataStream.keyBy(p - p.f0);// 基于处理时间开窗会话间隔时间为10s
WindowedStreamTuple2String, Integer, String, TimeWindow sessionWindow keyedStream.window(ProcessingTimeSessionWindows.withGap(Time.seconds(10)));// 基于事件时间开窗会话间隔时间为10s
WindowedStreamTuple2String, Integer, String, TimeWindow sessionWindow keyedStream.window(EventTimeSessionWindows.withGap(Time.seconds(10)));
全局窗口
这种窗口全局有效会把相同key的所有数据都分配到同一个窗口中。这种窗口没有结束的时候默认是不会触发计算的。如果希望它能对数据进行计算还需要自定义“触发器”Trigger。全局窗口没有结束的时间点所以一般在希望做更加灵活的窗口处理时自定义使用。Flink中的计数窗口底层就是用全局窗口实现的。
窗口触发器trigger
定义了窗口何时被触发并决定触发后的行为如进行窗口数据的计算或清理。
EventTimeTrigger
基于事件时间和水印机制来触发窗口计算。当窗口的最大时间戳小于等于当前的水印时立即触发窗口计算。
ProcessingTimeTrigger
基于处理时间即机器的系统时间来触发窗口计算。当处理时间达到窗口的结束时间时触发窗口计算。
CountTrigger
根据窗口内元素的数量来触发计算。当窗口内的元素数量达到预设的阈值时触发窗口计算。
关键方法
onElement(T element, long timestamp, W window, TriggerContext ctx) 当元素被添加到窗口时调用用于注册定时器或更新窗口状态。onEventTime(long time, W window, TriggerContext ctx) 当事件时间计时器触发时调用用于处理事件时间相关的触发逻辑。onProcessingTime(long time, W window, TriggerContext ctx) 当处理时间计时器触发时调用用于处理处理时间相关的触发逻辑。onMerge(W window, OnMergeContext ctx) 当两个窗口合并时调用用于合并窗口的状态和定时器。clear(W window, TriggerContext ctx) 当窗口被删除时调用用于清理窗口的状态和定时器。
Override
public TriggerResult onElement(BatteryRuntimeFlinkDto batteryRuntimeDto, long l, GlobalWindow globalWindow, TriggerContext triggerContext) throws Exception { ReducingStateLong countState triggerContext.getPartitionedState(countStateDescriptor); }Override
public TriggerResult onProcessingTime(long l, GlobalWindow globalWindow, TriggerContext triggerContext) throws Exception { log.info(窗口清除定时器触发清除计数器和定时器并关窗); this.clear(globalWindow, triggerContext); return TriggerResult.PURGE;
}Override
public TriggerResult onEventTime(long time, GlobalWindow globalWindow, TriggerContext triggerContext) throws Exception { return TriggerResult.CONTINUE;
}Override
public void clear(GlobalWindow globalWindow, TriggerContext triggerContext) throws Exception { // 清除计数器 triggerContext.getPartitionedState(countStateDescriptor).clear(); // 清除定时器 triggerContext.deleteProcessingTimeTimer(triggerContext.getPartitionedState(processTimerDescription).get());
}
处理算子process
ProcessFunction
最基本的处理函数基于DataStream直接调用.process()时作为参数传入。
public class CabinetDetailProcessFunction extends ProcessFunctionCabinetDetailDto, BatteryPutTakeLogDataSourceDto { //往redis中写入 private transient RedisService redisService; private String platform; public CabinetDetailProcessFunction(String platform) { this.platform platform; } Override public void open(Configuration parameters) throws Exception { super.open(parameters); this.redisService ApplicationContextHolder.getBean(RedisService.class); } Override public void processElement(CabinetDetailDto cabinetDetailDto, Context context, CollectorBatteryPutTakeLogDataSourceDto collector) throws Exception { }
}
KeyedProcessFunction
对流按键分区后的处理函数基于KeyedStream调用.process()时作为参数传入。
ProcessWindowFunction
开窗之后的处理函数也是全窗口函数的代表。基于WindowedStream调用.process()时作为参数传入。
public class BatteryRuntimeProcessFunction extends ProcessWindowFunctionBatteryRuntimeFlinkDto, BatteryRuntimeFlinkDto, String, GlobalWindow { Override
public void process(String s, Context context, IterableBatteryRuntimeFlinkDto iterable, CollectorBatteryRuntimeFlinkDto collector) throws Exception { ListBatteryRuntimeFlinkDto batteryRuntimeDtos new ArrayList(); iterable.forEach(p - batteryRuntimeDtos.add(p)); if (CollectionUtils.isEmpty(batteryRuntimeDtos)) { return; } BatteryRuntimeFlinkDto batteryRuntimeFlinkDto batteryRuntimeDtos.get(0); collector.collect(batteryRuntimeFlinkDto);
}}
ProcessAllWindowFunction
同样是开窗之后的处理函数基于AllWindowedStream调用.process()时作为参数传入。
CoProcessFunction
合并connect两条流之后的处理函数基于ConnectedStreams调用.process()时作为参数传入。
ProcessJoinFunction
间隔连接interval join两条流之后的处理函数基于IntervalJoined调用.process()时作为参数传入。
BroadcastProcessFunction
广播连接流处理函数基于BroadcastConnectedStream调用.process()时作为参数传入。这里的“广播连接流”BroadcastConnectedStream是一个未keyBy的普通DataStream与一个广播流BroadcastStream做连接conncet之后的产物。 public class BatteryRuntimeConnectProcessFunction extends BroadcastProcessFunctionBatteryRuntimeDto, BatteryPutTakeLogDataSourceDto, BatteryRuntimeFlinkDto {
// 状态
MapStateDescriptorString, BatteryInBoxStatusDto descriptor new MapStateDescriptor(boxInStatus, BasicTypeInfo.STRING_TYPE_INFO, TypeInformation.of(new TypeHintBatteryInBoxStatusDto(){})); Override
public void processElement(BatteryRuntimeDto batteryRuntimeDto, ReadOnlyContext readOnlyContext, CollectorBatteryRuntimeFlinkDto collector) throws Exception {
// dosometing
}Override
public void processBroadcastElement(BatteryPutTakeLogDataSourceDto batteryPutTakeLogDataSourceDto, Context context, CollectorBatteryRuntimeFlinkDto collector) throws Exception {
// dosometing
}
KeyedBroadcastProcessFunction
按键分区的广播连接流处理函数同样是基于BroadcastConnectedStream调用.process()时作为参数传入。与BroadcastProcessFunction不同的是这时的广播连接流是一个KeyedStream与广播流BroadcastStream做连接之后的产物。
输出算子sink
输出算子就是经过一系列处理算子后的数据输出到某个位置。例如kafkaredis数据库等等。
KafkaSink DataStream stream...;
KafkaSinkString kafkaSink KafkaSink.Stringbuilder()
// 指定 kafka 的地址和端口
.setBootstrapServers(kafka地址和端口)
// 指定序列化器指定Topic名称、具体的序列化
.setRecordSerializer(KafkaRecordSerializationSchema.builder() .setTopic(topic名称) .setValueSerializationSchema(new SimpleStringSchema()) .build() )
/**
* EXACTLY_ONCE: 精准一次投送。这是最严格最理想的数据投送保证。数据不丢失不重复。
* AT_LEAST_ONCE: 至少一次投送。数据保证不丢失但可能会重复。
* NONE: 无任何额外机制保证。数据有可能丢失或者重复。
*/
// sink设置保证级别为 至少一次投送。数据保证不丢失但可能会重复
.setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE) .build(); stream.sinkTo(kafkaSink);
JDBCSink DataStreamUserDto reduceStream...;
// 构建jdbc sink
SinkFunctionUserDto jdbcSink JdbcSink.sink(
// 数据插入sql语句
insert into user (name, age) values(?, ?),
new JdbcStatementBuilderUserDto() {
Override
// 字段映射配置
public void accept(PreparedStatement pStmt, UserDto userDto) throws SQLException {
pStmt.setString(1, userDto.getUserName());
pStmt.setInt(2, userDto.getAge()); } },
JdbcExecutionOptions
.builder()
// 批次大小,条数
.withBatchSize(10)
// 批次最大等待时间
.withBatchIntervalMs(5000)
// 重复次数
.withMaxRetries(1) .build(),
// jdbc信息配置
new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
.withDriverName(com.mysql.jdbc.Driver)
.withUrl(数据库地址)
.withUsername(root)
.withPassword(password)
.build() );
// 添加jdbc sink
reduceStream.addSink(jdbcSink);
其他方式的sink: File、MongoDB、RabbitMQ、Elasticsearch、Apache Pulsar 等使用方式可参考官方文档Apache Flink Documentation。
Flink 相关依赖 dependency groupIdorg.apache.flink/groupId artifactIdflink-java/artifactId version1.17.0/version
/dependency
dependency groupIdorg.apache.flink/groupId artifactIdflink-streaming-java/artifactIdversion1.17.0/version
/dependency
dependency groupIdorg.apache.flink/groupIdartifactIdflink-runtime-web/artifactIdversion1.17.0/version
/dependency
dependency groupIdorg.apache.flink/groupIdartifactIdflink-clients/artifactIdversion1.17.0/version
/dependency
!-- File连接器 --
dependencygroupIdorg.apache.flink/groupIdartifactIdflink-connector-files/artifactId version1.17.0/version
/dependency
!-- kafka连接器 --
dependency groupIdorg.apache.flink/groupIdartifactIdflink-connector-kafka/artifactId version1.17.0/version
/dependency
!-- jdbc连接器 --
dependencygroupIdorg.apache.flink/groupIdartifactIdflink-connector-jdbc/artifactId version1.16.0/version
/dependency