网站加速代码,济宁网架有多少网架公司,免费网站的建设,3d云打印网站开发a#xff09;概述
本节将描述 FLIP-27 中引入的新 Source API 的主要接口。
b#xff09;Source
Source API 是一个工厂模式的接口#xff0c;用于创建以下组件。
Split EnumeratorSource ReaderSplit SerializerEnumerator Checkpoint Serializer
此外#xff0c;Sou…a概述
本节将描述 FLIP-27 中引入的新 Source API 的主要接口。
bSource
Source API 是一个工厂模式的接口用于创建以下组件。
Split EnumeratorSource ReaderSplit SerializerEnumerator Checkpoint Serializer
此外Source 还提供了 Boundedness【有界】的特性使 Flink 可以选择合适的模式来运行 Flink 任务。
Source 实现应该是可序列化的因为 Source 实例会在运行时被序列化并上传到 Flink 集群。
cSplitEnumerator
SplitEnumerator 典型实现如下
SourceReader 的注册处理SourceReader 的失败处理 SourceReader 失败时会调用 addSplitsBack() 方法SplitEnumerator 会收回已经被分配但尚未被该 SourceReader 确认acknowledged的分片。 SourceEvent 的处理 SourceEvents 是 SplitEnumerator 和 SourceReader 之间来回传递的自定义事件可以利用此机制来执行复杂的协调任务。 分片的发现以及分配 SplitEnumerator 可以将分片分配到 SourceReader 从而响应各种事件包括发现新的分片、新 SourceReader 的注册、SourceReader 的失败处理等。
SplitEnumerator 可以在 SplitEnumeratorContext 的帮助下完成上述工作SplitEnumeratorContext 会在 SplitEnumerator 创建或者恢复的时候提供给 Source。
SplitEnumeratorContext 允许 SplitEnumerator 检索到 reader 的必要信息并执行协调操作而在 Source 的实现中会将 SplitEnumeratorContext 传递给 SplitEnumerator 实例。
SplitEnumerator 的实现可以仅采用被动工作方式仅在其方法被调用时采取协调操作但是一些 SplitEnumerator 的实现会采取主动的工作方式例如 SplitEnumerator 定期寻找分片并分配给 SourceReader这类问题使用 SplitEnumeratorContext 类中的 callAsync() 方法比较方便。
示例如何在 SplitEnumerator 不需要自己维护线程的条件下实现这一点。
class MySplitEnumerator implements SplitEnumeratorMySplit, MyCheckpoint {private final long DISCOVER_INTERVAL 60_000L;/*** 一种发现分片的方法*/private ListMySplit discoverSplits() {...}Overridepublic void start() {...enumContext.callAsync(this::discoverSplits, splits - {MapInteger, ListMySplit assignments new HashMap();int parallelism enumContext.currentParallelism();for (MySplit split : splits) {int owner split.splitId().hashCode() % parallelism;assignments.computeIfAbsent(owner, new ArrayList()).add(split);}enumContext.assignSplits(new SplitsAssignment(assignments));}, 0L, DISCOVER_INTERVAL);...}...
}dSourceReader
SourceReader 是一个运行在 Task Manager 上的组件用于处理来自分片的记录。
SourceReader 提供了一个拉取式的pull-based处理接口Flink 任务会在循环中不断调用 pollNext(ReaderOutput) 轮询来自 SourceReader 的记录pollNext(ReaderOutput) 方法的返回值指示 SourceReader 的状态。
MORE_AVAILABLE - SourceReader 有可用的记录。NOTHING_AVAILABLE - SourceReader 现在没有可用的记录但是将来可能会有记录可用。END_OF_INPUT - SourceReader 已经处理完所有记录到达数据的尾部。即 SourceReader 可以终止任务了。
pollNext(ReaderOutput) 会使用 ReaderOutput 作为参数为了提高性能且在必要情况下SourceReader 可以在一次 pollNext() 调用中返回多条记录例如外部系统的工作粒度为块而一个块可以包含多个记录但是 source 只能在块的边界处设置 Checkpoint此时SourceReader 可以一次将一个块中的所有记录通过 ReaderOutput 发送至下游。
**注意SourceReader 的实现应该避免在一次 pollNext(ReaderOutput) 的调用中发送多个记录**因为对 SourceReader 轮询的任务线程工作在一个事件循环event-loop中且不能阻塞。
在创建 SourceReader 时相应的 SourceReaderContext 会提供给 Source而 Source 会将相应的上下文传递给 SourceReader 实例SourceReader 可以通过 SourceReaderContext 将 SourceEvent 传递给相应的 SplitEnumerator Source 的一个典型设计模式是让 SourceReader 发送它们的本地信息给 SplitEnumerator后者则会全局性地做出决定。
SourceReader API 是一个底层low-level) API允许用户自行处理分片并使用自己的线程模型来获取和移交记录为了帮助实现 SourceReaderFlink 提供了 SourceReaderBase 类可以显著减少编写 SourceReader 所需要的工作量。
强烈建议连接器开发人员充分利用 SourceReaderBase 而不是从头开始编写 SourceReader。
eSource 使用方法
为了通过 Source 创建 DataStream需要将 Source 传递给 StreamExecutionEnvironment。
final StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();Source mySource new MySource(...);DataStreamInteger stream env.fromSource(mySource,WatermarkStrategy.noWatermarks(),MySourceName);