当前位置: 首页 > news >正文

沙洋网站定制如果自己建立网站

沙洋网站定制,如果自己建立网站,北京网页制作案例,做二维码推送网站要真正驾驭 Flink 并构建出高效、稳定、可扩展的流处理应用#xff0c;仅仅停留在 API 的表面使用是远远不够的。深入理解其内部的运行机制#xff0c;洞悉数据从代码到分布式执行的完整生命周期#xff0c;以及明晰各个核心组件之间错综复杂而又协同工作的关系#xff0c;…要真正驾驭 Flink 并构建出高效、稳定、可扩展的流处理应用仅仅停留在 API 的表面使用是远远不够的。深入理解其内部的运行机制洞悉数据从代码到分布式执行的完整生命周期以及明晰各个核心组件之间错综复杂而又协同工作的关系对于我们进行性能调优、故障排查以及设计更优的应用程序架构至关重要。 本文将带领大家一起揭开 Flink 的神秘面纱我们将首先详细梳理一个 Flink 作业从客户端提交到在 TaskManager 上具体执行的完整启动流程理解 StreamGraph、JobGraph 到 ExecutionGraph 的演变。 紧接着我们将深入剖析 Flink 中那些我们既熟悉又可能感到困惑的核心概念如 DataStream 如何通过 Transformation 承载用户的 UDF最终又是如何在 StreamOperator 和 StreamTask 中焕发生机以及它们之间是如何相互关联、协同工作的。希望通过这次探索能帮助构建起对 Flink 内部原理更为清晰和系统的认识。 Flink 任务启动流程 客户端Client准备与提交作业: 用户通过 Flink 客户端例如执行 flink run 命令的 CLI或者通过 REST API 提交的程序或者在 IDE 中直接运行提交一个 Flink 应用程序例如用 DataStream API 编写的程序。在客户端用户的程序例如 StreamExecutionEnvironment.execute()首先会将用户定义的 DataStream 操作转换为一个 StreamGraph。StreamGraph 是作业的最初的、面向流的逻辑表示它包含了所有的算子、UDF、并行度设置、数据流向等信息。客户端随后将这个 StreamGraph 提交给 JobManager具体来说是 JobManager 中的 Dispatcher 组件。 JobManager 接收与处理作业:【stream Graph转化Job Graph 版本有变动但是通信链路是一致的】 Dispatcher: JobManager 中的 Dispatcher 接收到 StreamGraph 后会为这个作业启动一个新的 JobMaster。Dispatcher 负责作业的提交、JobMaster 的生命周期管理并提供 REST 接口。JobMaster: 每个作业都有其专属的 JobMaster。JobMaster 负责该作业的整个生命周期管理。 StreamGraph - JobGraph: JobMaster 首先将接收到的 StreamGraph 转换为 JobGraph。JobGraph 是一个更通用的、并行的作业表示它将 StreamGraph 中的算子链Operator Chains优化考虑在内并确定了 JobVertex逻辑上的并行算子。每个 JobVertex 对应 StreamGraph 中的一个或多个链式算子。 如 docs/content.zh/docs/internals/job_scheduling.md 中提到“JobManager 会接收到一个 JobGraph用来描述由多个算子顶点 (JobVertex) 组成的数据流图”。 JobGraph - ExecutionGraph: JobMaster 接着将 JobGraph 转换为 ExecutionGraph。ExecutionGraph 是作业的物理执行计划是 JobGraph 的并行化版本。 它将每个 JobVertex 根据其并行度展开为多个并行的 ExecutionVertex。每个 ExecutionVertex 代表了一个逻辑算子或算子链的一个并行实例。ExecutionGraph 中的每个 ExecutionVertex 会有一个或多个 Execution 对象来跟踪其执行尝试例如初次执行、故障恢复后的重试。ExecutionGraph 是 JobMaster 调度和监控作业执行的核心数据结构。 调度器 (Scheduler): JobMaster 内部的调度器负责将 ExecutionGraph 中的任务具体来说是 Execution 对象代表的执行尝试部署到可用的 TaskManager Slot 上。 调度器会向 ResourceManager (如果使用了如 YARN, Kubernetes 等资源管理器或者是 Flink 自身的 Standalone ResourceManager) 请求所需的 Task Slot。一旦 Slot 分配成功调度器就会将任务部署到相应的 TaskManager。 TaskManager 执行任务: TaskManager 接收到 JobManager (JobMaster) 分配的任务部署指令后会在其管理的某个空闲的 Task Slot 中为该任务启动执行。Slot 与线程: 一个 Task Slot 代表了 TaskManager 提供的一份固定的计算资源通常与 CPU核心数相关。一个 Task Slot 会运行一个或多个如果启用了 Slot Sharing Group 且任务属于同一共享组任务每个任务Task都在其自己的独立线程中执行。 这个线程本身不属于槽而属于taskSlot 是资源的划分线程是执行的载体。StreamTask: 在 TaskManager 内部每个被部署的流处理任务的实际体现就是一个 StreamTask或其特定子类如 SourceTask, OneInputStreamTask, TwoInputStreamTask 等的实例。 如 docs/content/docs/internals/task_lifecycle.md 所述: The StreamTask is the base for all different task sub-types in Flinks streaming engine.StreamTask 负责初始化和运行其内部的 StreamOperator 链OperatorChain处理输入数据执行用户定义的函数 (UDF)并产生输出数据。StreamTask 的生命周期包括创建、部署、运行、取消、完成或失败等状态。如 flink-runtime/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java 文件所示StreamTask 包含了大量的逻辑来管理算子链 (operatorChain)、配置 (configuration)、状态后端 (stateBackend)、检查点协调 (subtaskCheckpointCoordinator) 等。 关键补充点总结 StreamGraph: 在客户端生成是最初的逻辑图。Dispatcher 与 JobMaster: JobManager 内部组件Dispatcher 负责接收作业并为每个作业启动一个 JobMaster。JobMaster 负责单个作业的完整生命周期。JobGraph: 由 StreamGraph 转换而来是并行的逻辑图。ExecutionGraph: 由  JobGraph 转换而来是物理执行图包含 ExecutionVertex 和 Execution。ResourceManager: 负责 Task Slot 的分配。Task Slot 与线程: Slot 是资源单位Task 在 Slot 内的独立线程中运行。OperatorChain: StreamTask 内部可以运行一个算子链这是 Flink 的一项重要优化。 Flink 核心概念关系从 API 到执行 DataStream API 与 UDF (User-Defined Function - 用户定义函数): 起点: 用户通过 DataStream API (例如 dataStream.map(myMapFunction).filter(myFilterFunction)) 来声明式地构建数据处理流程。业务逻辑: UDF (例如 MyMapFunction, MyFilterFunction, KeyedProcessFunction) 是用户编写的包含具体业务处理逻辑的 Java/Scala 函数或类。它们是数据转换的核心。 Transformation (转换): 逻辑蓝图: 每当在 DataStream 上调用一个操作如 map, filter, keyBy就会创建一个或多个 Transformation 对象。Transformation 树是 Flink 作业的逻辑表示或蓝图。它详细描述了数据如何从一个操作流向下一个操作包括操作类型、应用的 UDF、输入/输出数据类型、并行度设置等。它本身不执行计算。 逻辑 Operator (算子) 与逻辑 Subtask (子任务): 逻辑处理单元: 在 Transformation 层面我们可以认为每个转换操作对应一个逻辑 Operator (例如 Map Operator, Filter Operator)。并行实例 (逻辑): 如果一个逻辑 Operator 的并行度parallelism被设置为 N那么在逻辑规划中这个 Operator 就拥有 N 个并行的逻辑实例这些逻辑实例通常被称为 Subtask。每个逻辑 Subtask 代表了该 Operator 的一个独立、并行的处理单元。 StreamOperator (运行时算子/流算子): 物理执行体: StreamOperator 是 Flink 运行时的核心组件是 Transformation 中定义的逻辑算子在物理执行时的具体实现和承载体。例如StreamMap、KeyedProcessOperator 都是 StreamOperator 的具体实现。封装 UDF: StreamOperator 负责封装用户提供的 UDF。它管理 UDF 的生命周期如调用 open(), close() 方法并调用 UDF 的核心处理方法如 map(), filter(), processElement()来处理数据。AbstractUdfStreamOperator 是许多包含 UDF 的 StreamOperator 的通用基类它简化了 UDF 的管理。 StreamTask (流任务/物理任务): 执行单元: StreamTask 是 Flink 在 TaskManager 上进行物理执行的基本单元。它是一个实现了 java.lang.Runnable 的对象在 TaskManager 的一个 Slot 中的一个独立线程内运行。核心职责: StreamTask 负责 管理其内部一个或多个 StreamOperator 的完整生命周期初始化、打开、运行数据处理循环、响应 Checkpoint、关闭、清理。处理数据的输入从网络或上游 Task和输出到网络或下游 Task。协调 Checkpoint 过程。 执行入口: StreamTask 的 invoke() 方法是其执行逻辑的入口点它启动了数据处理的主循环。 算子链 (Operator Chaining) 的影响: 优化: Flink 会尽可能地将满足条件的多个逻辑 Operator (及其对应的逻辑 Subtask) 链接chain在一起。例如连续的 map - filter - map 操作如果并行度相同且数据传输直接非重分区它们通常会被链接。结果: 被链接起来的一系列 StreamOperator 实例会运行在同一个 StreamTask 内部由一个 OperatorChain 对象管理。这意味着一个 StreamTask 可能只包含一个单独的 StreamOperator (如果没有发生链接)或者包含一串链式连接的 StreamOperator。这样做能显著减少线程切换、数据序列化/反序列化以及网络传输的开销提升性能。 总结关系流程: 用户使用 DataStream API 编写代码并提供 UDF 来定义业务逻辑。这些 API 调用会构建一个 Transformation 树这是作业的逻辑蓝图。Flink 编译器将 Transformation 树转换为物理执行图 (JobGraph - ExecutionGraph): 每个逻辑 Operator (在 Transformation 中定义) 根据其并行度被实例化为多个逻辑 Subtask。满足条件的逻辑 Subtask (来自不同的逻辑 Operator) 会被优化策略链接chain起来。 在运行时 (TaskManager): 每个可能经过链接的Subtask 序列作为一个整体被调度为一个 StreamTask 实例并在一个独立的线程中执行。StreamTask 内部运行着一个或多个如果发生链接则形成 OperatorChainStreamOperator 实例。每个 StreamOperator 实例则封装并调用用户编写的 UDF 来对流经它的数据执行具体的业务逻辑处理。 结合源码说明 我们来看一些关键的源码片段来理解这个流程 StreamTask 的构造与初始化: 当 TaskManager 接收到部署任务的指令后会创建 StreamTask 实例。 // ... existing code ... public abstract class StreamTaskOUT, OP extends StreamOperatorOUTimplements TaskInvokable,CheckpointableTask,CoordinatedTask,AsyncExceptionHandler,ContainingTaskDetails {// ... existing code ...protected StreamTask(Environment environment,Nullable TimerService timerService,Thread.UncaughtExceptionHandler uncaughtExceptionHandler,StreamTaskActionExecutor actionExecutor,TaskMailbox taskMailbox)throws Exception {this.environment Preconditions.checkNotNull(environment);this.configuration new StreamConfig(environment.getTaskConfiguration());this.recordWriter createRecordWriterDelegate(configuration, environment);this.resourceCloser new AutoCloseableRegistry();this.mailboxProcessor new MailboxProcessor(this::processInput,taskMailbox,actionExecutor,resourceCloser,this::shouldBeTerminated,this::handleAsyncExceptionDuringNormalExecution); // ... existing code ...this.asyncOperationsThreadPool MdcUtils.scopeToJob(getEnvironment().getJobID(),new ThreadPoolExecutor(0,configuration.getMaxConcurrentCheckpoints() 1,60L,TimeUnit.SECONDS,new LinkedBlockingQueue(),new ExecutorThreadFactory(AsyncOperations, uncaughtExceptionHandler))); // ... existing code ...this.stateBackend createStateBackend();this.checkpointStorage createCheckpointStorage(stateBackend); // ... existing code ...this.subtaskCheckpointCoordinator new SubtaskCheckpointCoordinatorImpl(checkpointStorageAccess,getName(),actionExecutor,getAsyncOperationsThreadPool(),environment,this,this::prepareInputSnapshot,configuration.getMaxConcurrentCheckpoints(),channelStateWriter,configuration.getConfiguration().get(CheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH),BarrierAlignmentUtil.createRegisterTimerCallback(mainMailboxExecutor, systemTimerService),environment.getTaskStateManager().getFileMergingSnapshotManager()); // ... existing code ... }在构造函数中StreamTask 会初始化运行环境、配置、状态后端、Checkpoint 存储和协调器等。 StreamTask 的执行入口 invoke(): 这是 TaskManager 启动 Task 后调用的核心方法。 // ... existing code ... Override public final void invoke() throws Exception {SubTaskInitializationMetricsBuilder initializationMetrics SubTaskInitializationMetricsBuilder.create(getEnvironment().getMetricGroup());final long initializationStarted SystemClock.getInstance().absoluteTimeMillis();initializationMetrics.addTimestampMetric(INITIALIZATION_START_TIMESTAMP, initializationStarted);// 初始化 OperatorChain这里会创建 StreamOperator 实例// RegularOperatorChain 或 FinishedOperatorChaintry {operatorChain getEnvironment().getTaskStateManager().isTaskDeployedAsFinished()? new FinishedOperatorChain(this, recordWriter): new RegularOperatorChain(this, recordWriter);mainOperator operatorChain.getMainOperator();getEnvironment().getTaskStateManager().getRestoreCheckpointId().ifPresent(restoreId - latestReportCheckpointId restoreId);// task specific initialization调用子类实现的 init() 方法init();configuration.clearInitialConfigs();// save the work of reloading state, etc, if the task is already canceledensureNotCanceled();// -------- Invoke --------LOG.debug(Invoking {}, getName());// we need to make sure that any triggers scheduled in open() cannot be// executed before all operators are opened// 恢复状态并打开算子CompletableFutureVoid allGatesRecoveredFuture actionExecutor.call(() - restoreStateAndGates(initializationMetrics));// Run mailbox until all gates will be recovered.// 启动邮箱处理循环这是任务处理数据的主要逻辑mailboxProcessor.runMailboxLoop();// ... existing code ...// make sure this is executed in any case!LOG.debug(Finished task {}, getName());} finally {// ... cleanup ...actionExecutor.runThrowing(() - {// only set the StreamTask to not running after all operators have been// finished! // ... existing code ...disableInterruptOnCancel();// ... existing code ...// clean up everything we initializedisRunning false;// ... existing code ...try {resourceCloser.close();} catch (Throwable t) {Exception e t instanceof Exception ? (Exception) t : new Exception(t);throw firstOrSuppressed(e, cancelException);}} }在 invoke() 方法中 创建 OperatorChain (RegularOperatorChain 或 FinishedOperatorChain)它包含了这个 StreamTask 要执行的一个或多个 StreamOperator。调用 init() 方法进行特定于任务类型的初始化例如SourceOperatorStreamTask 会在这里启动 SourceReader。调用 restoreStateAndGates()其中会调用 operatorChain.initializeStateAndOpenOperators()。 Operator 的初始化和开: OperatorChain 的 initializeStateAndOpenOperators 方法会遍历链上的所有算子调用它们的 initializeState() 和 open() 方法。 // ... existing code ...operatorChain.initializeStateAndOpenOperators(createStreamTaskStateInitializer(initializationMetrics));initializeStateEndTs SystemClock.getInstance().absoluteTimeMillis(); // ... existing code ...而 createStreamTaskStateInitializer 会创建一个 StreamTaskStateInitializerImpl 实例用于初始化算子的状态。 // ... existing code ... public StreamTaskStateInitializer createStreamTaskStateInitializer(SubTaskInitializationMetricsBuilder initializationMetrics) {InternalTimeServiceManager.Provider timerServiceProvider configuration.getTimerServiceProvider(getUserCodeClassLoader());return new StreamTaskStateInitializerImpl(getEnvironment(),stateBackend, // ... existing code ...UDF 的生命周期调用: 以 AbstractUdfStreamOperator 为例它的 initializeState() 和 open() 方法会进一步调用 UDF 的相应方法。 // ... existing code ... Override public void initializeState(StateInitializationContext context) throws Exception {super.initializeState(context);StreamingFunctionUtils.restoreFunctionState(context, userFunction); }Override public void open() throws Exception {super.open();FunctionUtils.openFunction(userFunction, DefaultOpenContext.INSTANCE); }Override public void finish() throws Exception {super.finish();if (userFunction instanceof SinkFunction) {((SinkFunction?) userFunction).finish();} }Override public void close() throws Exception {super.close();FunctionUtils.closeFunction(userFunction); } // ... existing code ...这里 userFunction 就是用户定义的 UDF。FunctionUtils.openFunction 会调用 UDF 的 open() 方法并传入 RuntimeContext。 数据处理循环: StreamTask 的 mailboxProcessor.runMailboxLoop() 启动后会不断调用 processInput() 方法如果邮箱中有待处理的邮件或默认操作可以执行。 // ... existing code ... protected void processInput(MailboxDefaultAction.Controller controller) throws Exception {DataInputStatus status inputProcessor.processInput();switch (status) {case MORE_AVAILABLE:if (taskIsAvailable()) {return;}break;case NOTHING_AVAILABLE:break;case END_OF_RECOVERY:throw new IllegalStateException(We should not receive this event here.); // ... existing code ...inputProcessor.processInput() 会从输入源读取数据并通过 OperatorChain 将数据传递给第一个 StreamOperator然后数据会在算子链中依次处理每个算子会调用其内部 UDF 的处理逻辑如 map(), filter(), processElement()。 文档中的相关描述 Task Lifecycle: /flink/docs/content/docs/internals/task_lifecycle.md 描述了 StreamTask 和 Operator 的生命周期。 The StreamTask is the base for all different task sub-types in Flinks streaming engine. ... OPERATOR::setup - UDF::setRuntimeContext OPERATOR::initializeState OPERATOR::open - UDF::open OPERATOR::processElement - UDF::run Flink Architecture - Tasks and Operator Chains: flink/docs/content/docs/concepts/flink-architecture.md For distributed execution, Flink chains operator subtasks together into tasks. Each task is executed by one thread. Chaining operators together into tasks is a useful optimization... 通过以上分析应该对 Flink 任务的启动流程以及 Subtask、StreamTask、Operator 和 UDF 之间的关系有了更清晰的理解。 StreamTask 是核心的执行单元它承载了 Operator而 Operator 又驱动着 UDF 的执行。整个过程由 JobManager 调度并在 TaskManager 上实际运行。
http://www.w-s-a.com/news/428330/

相关文章:

  • 凡科网站怎么做建站关键字搜索网站怎么做
  • 小说网站建站程序企业邮箱地址
  • 福州市住房和城乡建设网站网站开发方案论文
  • 在线教育网站开发网站推广常用方法包括
  • 东莞高端品牌网站建设软件开发模型及特点
  • 个人网站的设计与实现的主要内容网站开发公司架构
  • 浏览器收录网站什么是新媒体营销
  • 上海营销网站建设公司下面哪个不是网页制作工具
  • 有哪些网站可以做设计比赛苏州设计公司排名前十
  • 公益网站建设需求车陂手机网站开发
  • 高端网站建设专业营销团队宁德网站建设51yunsou
  • 网站如何做cdn购物网站建设app开发
  • 简单的手机网站模板好看大方的企业网站源码.net
  • 沈阳住房和城乡建设厅网站网站个人备案做论坛
  • 企业建网站的目的开家网站建设培训班
  • 做怎么网站网站优化和推广
  • 建站工具 风铃网站每年空间域名费用及维护费
  • 网站开发工具 知乎工业软件开发技术就业前景
  • 永济微网站建设费用新手如何自学编程
  • 在本地怎么做网站深圳保障房申请条件2022
  • 广州天河区网站建设公司东莞网络游戏制作开发
  • 哪个网站做免费小程序rio门户网站的制作
  • 短网站生成查询网站所有关键词排名
  • 阿里云购买网站登录技术服务外包公司
  • 淘宝单页面网站手机制作游戏的软件
  • 汉中市网站建设wordpress编辑器好麻烦
  • 织梦做的网站快照被攻击在线看crm系统
  • 青岛物流公司网站建设网站建设提议
  • 企业网站建设高端品牌宿州注册公司多少钱
  • 个人微信公众号怎么做微网站吗湛江网站制作方案