当前位置: 首页 > news >正文

白银市城乡建设局网站海外房产网站建设

白银市城乡建设局网站,海外房产网站建设,抖音代运营计划书,pageadmin 制作网站怎么绑定域名目录 keyed state Keyed DataStream 使用 Keyed State 实现了一个简单的计数窗口 状态有效期 (TTL) 过期数据的清理 全量快照时进行清理 增量数据清理 在 RocksDB 压缩时清理 Operator State算子状态 Broadcast State广播状态 keyed state Keyed DataStream 使用 k…目录 keyed state Keyed DataStream 使用 Keyed State 实现了一个简单的计数窗口 状态有效期 (TTL) 过期数据的清理 全量快照时进行清理 增量数据清理 在 RocksDB 压缩时清理 Operator State算子状态 Broadcast State广播状态 keyed state Keyed DataStream 使用 keyed state首先需要为DataStream指定 key主键。这个主键用于状态分区也会给数据流中的记录本身分区。 使用 DataStream 中 Java/Scala API 的 keyBy(KeySelector) 或者是 Python API 的 key_by(KeySelector) 来指定 key。它将生成 KeyedStream接下来允许使用 keyed state 操作。 Keyselector函数接收单条记录作为输入返回这条记录的 key。该 key 可以为任何类型但是它的计算产生方式必须是具备确定性的。 Flink的数据模型不基于key-value对因此实际上将数据集在物理上封装成 key和 value是没有必要的。Key是“虚拟”的。它们定义为基于实际数据的函数用以操纵分组算子。 使用 Keyed State keyed state接口提供不同类型状态的访问接口这些状态都作用于当前输入数据的key。 换句话说这些状态仅可在KeyedStream上使用在Java/Scala API上可以通过stream.keyBy(...)得到 KeyedStream在Python API上可以通过 stream.key_by(...) 得到 KeyedStream。 所有支持的状态类型如下所示 ValueStateT: 保存一个可以更新和检索的值 ListstateT: 保存一个元素的列表。可以往这个列表中追加数据并在当前的列表上进行检索。 ReducingStateT: 保存一个单值表示添加到状态的所有值的聚合。但使用 add(T) 增加元素会使用提供的 ReduceFunction 进行聚合。 AggregatingStateIN, OUT: 保留一个单值表示添加到状态的所有值的聚合。使用 add(IN) 添加的元素会用指定的 AggregateFunction 进行聚合。 MapStateUK, UV: 维护了一个映射列表。 你可以添加键值对到状态中也可以获得反映当前所有映射的迭代器。 所有类型的状态还有一个clear() 方法清除当前 key 下的状态数据也就是当前输入元素的 key。 实现了一个简单的计数窗口 我们把元组的第一个元素当作 key。 该函数将出现的次数以及总和存储在 “ValueState” 中。 一旦出现次数达到 2则将平均值发送到下游并清除状态重新开始。 请注意我们会为每个不同的 key元组中第一个元素保存一个单独的值。 必须创建一个 StateDescriptor才能得到对应的状态句柄。 这保存了状态名称状态所持有值的类型并且可能包含用户指定的函数例如ReduceFunction。 根据不同的状态类型可以创建ValueStateDescriptorListstateDescriptor AggregatingStateDescriptor, ReducingStateDescriptor 或MapStateDescriptor。 状态通过 RuntimeContext 进行访问因此只能在 rich functions 中使用。 from pyflink.common.typeinfo import Types from pyflink.datastream import StreamExecutionEnvironment, FlatMapFunction, RuntimeContext from pyflink.datastream.state import ValueStateDescriptorclass CountWindowAverage(FlatMapFunction):def __init__(self):self.sum Nonedef open(self, runtime_context: RuntimeContext):descriptor ValueStateDescriptor(average,  # the state nameTypes.PICKLED_BYTE_ARRAY()  # type information)self.sum runtime_context.get_state(descriptor)def flat_map(self, value):# access the state valuecurrent_sum self.sum.value()if current_sum is None:current_sum (0, 0)# update the countcurrent_sum (current_sum[0] 1, current_sum[1] value[1])# update the stateself.sum.update(current_sum)# if the count reaches 2, emit the average and clear the stateif current_sum[0] 2:self.sum.clear()yield value[0], int(current_sum[1] / current_sum[0])env StreamExecutionEnvironment.get_execution_environment() env.from_collection([(1, 3), (1, 5), (1, 7), (1, 4), (1, 2)]) \.key_by(lambda row: row[0]) \.flat_map(CountWindowAverage()) \.print()env.execute() # the printed output will be (1,4) and (1,5) 状态有效期 (TTL) 任何类型的keyed state都可以有有效期(TTL)。如果配置了TTL且状态值已过期则会尽最大可能清除对应的值。所有状态类型都支持单元素的TTL。 这意味着列表元素和映射元素将独立到期。 在使用状态 TTL 前需要先构建一个StateTtlConfig 配置对象。 然后把配置传递到state descriptor中启用TTL功能。 from pyflink.common.time import Time from pyflink.common.typeinfo import Types from pyflink.datastream.state import ValueStateDescriptor, StateTtlConfigttl_config StateTtlConfig \.new_builder(Time.seconds(1)) \.set_update_type(StateTtlConfig.UpdateType.OnCreateAndWrite) \.set_state_visibility(StateTtlConfig.StateVisibility.NeverReturnExpired) \.build()state_descriptor ValueStateDescriptor(text state, Types.STRING()) state_descriptor.enable_time_to_live(ttl_config) TTL配置有以下几个选项 newBuilder 的第一个参数表示数据的有效期是必选项。 TTL 的更新策略默认是 OnCreateAndWrite StateTtlConfig.UpdateType.OnCreateAndWrite - 仅在创建和写入时更新 StateTtlConfig.UpdateType.OnReadAndWrite - 读取时也更新 数据在过期但还未被清理时的可见性配置如下默认为 NeverReturnExpired): StateTtlConfig.StateVisibility.NeverReturnExpired - 不返回过期数据 (注意: 在PyFlink作业中状态的读写缓存都将失效这将导致一部分的性能损失) NeverReturnExpired 情况下过期数据就像不存在一样不管是否被物理删除。这对于不能访问过期数据的场景下非常有用比如敏感数据。 StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp - 会返回过期但未清理的数据 (注意: 在PyFlink作业中状态的读缓存将会失效这将导致一部分的性能损失) ReturnExpiredIfNotCleanedUp 在数据被物理删除前都会返回。 过期数据的清理 默认情况下过期数据会在读取的时候被删除例如 ValueState#value同时会有后台线程定期清理如果 StateBackend 支持的话。可以通过 StateTtlConfig 配置关闭后台清理. from pyflink.common.time import Time from pyflink.datastream.state import StateTtlConfigttl_config StateTtlConfig \.new_builder(Time.seconds(1)) \.disable_cleanup_in_background() \.build() 可以配置更细粒度的后台清理策略。当前的实现中 HeapStateBackend 依赖增量数据清理RocksDBStateBackend 利用压缩过滤器进行后台清理。 全量快照时进行清理 可以启用全量快照时进行清理的策略这可以减少整个快照的大小。当前实现中不会清理本地的状态但从上次快照恢复时不会恢复那些已经删除的过期数据。 该策略可以通过 StateTtlConfig 配置进行配置这种策略在 RocksDBStateBackend 的增量 checkpoint 模式下无效。 from pyflink.common.time import Time from pyflink.datastream.state import StateTtlConfigttl_config StateTtlConfig \.new_builder(Time.seconds(1)) \.cleanup_full_snapshot() \.build() 这种清理方式可以在任何时候通过 StateTtlConfig 启用或者关闭比如在从 savepoint 恢复时。 增量数据清理 现在仅 Heap state backend 支持增量清除机制。 增量式清理状态数据在状态访问或/和处理时进行。如果没有 state 访问也没有处理数据则不会清理过期数据。增量清理会增加数据处理的耗时。 如果某个状态开启了该清理策略则会在存储后端保留一个所有状态的惰性全局迭代器。 每次触发增量清理时从迭代器中选择已经过期的数进行清理。 from pyflink.common.time import Time from pyflink.datastream.state import StateTtlConfigttl_config StateTtlConfig \.new_builder(Time.seconds(1)) \.cleanup_incrementally(10, True) \.build() 该策略有两个参数。 第一个是每次清理时检查状态的条目数在每个状态访问时触发。 第二个参数表示是否在处理每条记录时触发清理。 Heap backend 默认会检查 5 条状态并且关闭在每条记录时触发清理。 在 RocksDB 压缩时清理 如果使用 RocksDB state backend则会启用 Flink 为 RocksDB 定制的压缩过滤器。RocksDB 会周期性的对数据进行合并压缩从而减少存储空间。 Flink 提供的 RocksDB 压缩过滤器会在压缩时过滤掉已经过期的状态数据。 from pyflink.common.time import Time from pyflink.datastream.state import StateTtlConfigttl_config StateTtlConfig \.new_builder(Time.seconds(1)) \.cleanup_in_rocksdb_compact_filter(1000) \.build() Flink 处理一定条数的状态数据后会使用当前时间戳来检测 RocksDB 中的状态是否已经过期 你可以通过 StateTtlConfig.newBuilder(...).cleanupInRocksdbCompactFilter(long queryTimeAfterNumEntries) 方法指定处理状态的条数。 时间戳更新的越频繁状态的清理越及时但由于压缩会有调用 JNI 的开销因此会影响整体的压缩性能。 RocksDB backend 的默认后台清理策略会每处理 1000 条数据进行一次。 注意: 压缩时调用 TTL 过滤器会降低速度。TTL 过滤器需要解析上次访问的时间戳并对每个将参与压缩的状态进行是否过期检查。 对于集合型状态类型比如 list 和 map会对集合中每个元素进行检查。 对于元素序列化后长度不固定的列表状态TTL 过滤器需要在每次 JNI 调用过程中额外调用 Flink 的 java 序列化器 从而确定下一个未过期数据的位置。 对已有的作业这个清理方式可以在任何时候通过 StateTtlConfig 启用或禁用该特性比如从 savepoint 重启后。 Operator State算子状态 Python DataStream API 仍无法支持算子状态 算子状态或者非 keyed 状态是绑定到一个并行算子实例的状态。Kafka Connector 是 Flink 中使用算子状态一个很具有启发性的例子。 Kafka consumer 每个并行实例维护了 topic partitions 和偏移量的 map 作为它的算子状态。 当并行度改变的时候算子状态支持将状态重新分发给各并行算子实例。处理重分发过程有多种不同的方案。 在典型的有状态 Flink 应用中你无需使用算子状态。它大都作为一种特殊类型的状态使用。用于实现 source/sink以及无法对 state 进行分区而没有主键的这类场景中。 Broadcast State广播状态 Python DataStream API 仍无法支持广播状态 广播状态是一种特殊的算子状态。引入它的目的在于支持一个流中的元素需要广播到所有下游任务的使用情形。在这些任务中广播状态用于保持所有子任务状态相同。 该状态接下来可在第二个处理记录的数据流中访问。可以设想包含了一系列用于处理其他流中元素规则的低吞吐量数据流这个例子自然而然地运用了广播状态。 考虑到上述这类使用情形广播状态和其他算子状态的不同之处在于 它具有 map 格式 它仅在一些特殊的算子中可用。这些算子的输入为一个广播数据流和非广播数据流 这类算子可以拥有不同命名的多个广播状态 。
http://www.w-s-a.com/news/687636/

相关文章:

  • 北海住房和城乡建设局网站wordpress标题去掉私密
  • 织梦网站安装视频做网站都有那些步骤
  • 网站空间大小选择沈阳微信网站制作
  • 网站分享对联广告网站结构的类型
  • 三维家是在网站上做还是在app上北京网站建设 专业10年
  • 模拟网站建设网站建设认准猪八戒网
  • godaddy网站建设教程微信手机网站搭建
  • 网站建设 商城淄博网站制作哪家好
  • 廊坊手机网站团队国际贸易进口代理公司
  • 运用django做网站网站建设问题及解决办法
  • 商丘企业网站服务c mvc 网站开发
  • 太仓建设工程网站广州seo排名外包
  • 成都的教育品牌网站建设做网站不给提供ftp
  • 建站行业市场人力管理系统
  • qq钓鱼网站在线生成器google关键词搜索工具
  • 化妆网站模板移动网站建设模板
  • 欢迎访问语文建设杂志网站网站蓝色配色
  • 最新网站发布重庆城乡建设部网站首页
  • 网站建设费用高鄂尔多斯seo
  • dw做网站怎么发布网站无后台可以上框架
  • 网络公司如何建网站网站的建设需要多少钱
  • 代刷网站推广快速泉州网页定制
  • 网站优秀网站地址做宣传册的公司
  • 苏州高端网站建设咨询wordpress云图插件
  • 河北省建设厅网站重新安装优秀中文网页设计
  • 如何在腾讯云做网站开源站群cms
  • 公司网站建设的意义网易做网站
  • 网络营销案例分析与实践搜外seo
  • 手机建网站挣钱吗wordpress面包屑
  • 淘客做网站怎么备案网站开发工具的是什么