当前位置: 首页 > news >正文

乐清建网站公司哪家好flex网站模板

乐清建网站公司哪家好,flex网站模板,网站制作的内容包含,wordpress屏蔽广告插件文章目录 整体设计processMail1.Checkpoint Tigger2.ProcessingTime Timer Trigger processInput兼容SourceStreamTask 整体设计 Mailbox线程模型通过引入阻塞队列配合一个Mailbox线程的方式#xff0c;可以轻松修改StreamTask内部状态的修改。Checkpoint、ProcessingTime Ti… 文章目录 整体设计processMail1.Checkpoint Tigger2.ProcessingTime Timer Trigger processInput兼容SourceStreamTask 整体设计 Mailbox线程模型通过引入阻塞队列配合一个Mailbox线程的方式可以轻松修改StreamTask内部状态的修改。Checkpoint、ProcessingTime Timer的相关操作Runnable任务会以Mail的形式保存到Mailbox内的阻塞队列中。StreamTask在invoke阶段的runMailboxLoop时期就会轮询Mailbox来处理队列中保存的MailMail处理完毕后才会对DataStream上的数据元素执行处理逻辑。 MailboxProcessor的能力就是负责拉取、处理Mail以及执行MailboxDefaultAction默认动作即processInput()方法中对DataStream上的普通消息的处理逻辑包括处理Event、barrier、Watermark等。 /*** 开始轮询Mailbox内的MailCheckpoint和ProcessingTime Timer的触发事件会以Runnable的形式作为Mail添加到Mailbox的队列中等待“Mailbox线程”去处理*/ public void runMailboxLoop() throws Exception {// 获取最新的TaskMailbox主要用于存储提交的Mail并提供获取接口。// TaskMailbox有2个队列// 1.queue阻塞队列通过ReentrantLock控制队列中的读写操作// 2.batch非阻塞队列调用createBatch()方法会将queue中的Mail转存到batch中这样读操作就能通过tryTakeFromBatch()方法从batch队列中批量获取Mail且只能被Mailbox线程消费final TaskMailbox localMailbox mailbox;// 检查当前线程是否为Mailbox线程即StreamTask运行时所在的主线程Preconditions.checkState(localMailbox.isMailboxThread(),Method must be executed by declared mailbox thread!);// 确认Mailbox的状态必须为OPENassert localMailbox.getState() TaskMailbox.State.OPEN : Mailbox must be opened!;// 创建MailboxController实例可以控制Mailbox的循环、临时暂停和恢复MailboxDefaultAction默认动作final MailboxController defaultActionContext new MailboxController(this);/*** 核心事件循环* processMail()方法会检测Mailbox中是否还有Mail需要处理新Mail会在ReentrantLock的保护下被添加到queue队列并转存到batch队列中。* MailboxProcessor处理完batch队列中的全部Mail后执行作为Mail的Runnable#run()方法才会进入到while循环内执行MailboxDefaultAction的默认动作* 即调用StreamTask#processInput()方法对读取到的数据Event、Barrier、Watermark等进行处理*/while (processMail(localMailbox)) {mailboxDefaultAction.runDefaultAction(defaultActionContext); // lock is acquired inside default action as needed} }可以看出对Mail和MailboxDefaultAction的处理是由唯一的Mailbox线程负责的。 processMail 在while轮询时首先会processMail /*** 处理Mailbox中的MailCheckpoint、ProcessingTime Timer的触发事件会以Runnable的形式作为Mail保存在Mailbox的queue队列中* 并在ReentrantLock的保护下将queue队列中的新Mail转移到batch队列中。MailboxProcessor会根据queue队列、batch队列内的Mail情况* 决定处理Mail or processInput。只有当TaskMailbox内的所有Mail全都处理完毕后MailboxProcessor才会去processInput()*/ private boolean processMail(TaskMailbox mailbox) throws Exception {/*** 新Mail写入queue队列TaskMailbox会将queue队列中的新Mail转移到batch队列中MailboxProcessor会根据queue队列、batch队列内的Mail情况* 判断执行Mail的run() or processInput()。只有当TaskMailbox内的所有Mail全部处理完成后MailboxProcessor才会去processInput()。*/if (!mailbox.createBatch()) {return true;}OptionalMail maybeMail;/*** 能走到这说明queue队列中的Mail已被全部转移至batch队列。现在要从batch队列中获取到Mail并执行它作为Runnable的run()方法* 直到batch队列中的所有Mail全都处理完毕*/while (isMailboxLoopRunning() (maybeMail mailbox.tryTakeFromBatch()).isPresent()) {maybeMail.get().run();}/**如果默认操作处于Unavailable状态那就先阻塞住直到它重新回归available状态*/while (isDefaultActionUnavailable() isMailboxLoopRunning()) {mailbox.take(MIN_PRIORITY).run();}// 返回Mailbox是否还在Loopreturn isMailboxLoopRunning(); }很核心的一个点就是Mailbox要去createBatchTaskMailboxImpl提供了具体的实现逻辑。Mailbox引入了2个队列新Mail被add到Mailbox内的queue队列中此过程受ReentrantLock保护。同时为了减少读取queue队列时的同步开销Mailbox还构建了一个batch队列专门用来后续消费避免加锁操作。 /*** 对DequeMail队列的读写通过ReentrantLock加以保护*/ private final ReentrantLock lock new ReentrantLock();/*** Internal queue of mails.* 使用Deque内部队列保存所有的Mail*/ GuardedBy(lock) private final DequeMail queue new ArrayDeque(); /*** 为了减少读取queue队列所造成的同步开销TaskMailbox会创建一个batch队列queue队列中的Mail会被转移到batch队列中* 有效避免了后续消费时的加锁操作*/ private final DequeMail batch new ArrayDeque();Override public boolean createBatch() {checkIsMailboxThread();/*** 如果queue队列中没有新Mail那就要看batch队列是否为空。* 1.如果batch也是空的Mailbox里已经没有任何Mail了需要去processInput()了那processMail()也会return true* MailboxProcessor就会进入到while循环内部执行processInput()来处理DataStream上的数据* 2.如果batch不空说明MailboxProcessor还需要继续processMail()即取出Mail执行它作为Runnable的run()方法* 由此可见Mailbox中的batch队列中的Mail最终一定会被Mailbox线程消耗殆尽轮询、处理然后才会去processInput()*/if (!hasNewMail) { // 只要queue队列里还有MailhasNewMail就为truereturn !batch.isEmpty();}/**能走到这说明queue队列中仍有新Mail接下来需要将它的新Mail向batch队列转移该过程受ReentrantLock保护*/final ReentrantLock lock this.lock;// 获取锁lock.lock();try {Mail mail;/**每次循环都将queue队列中的First Mail转移到batch队列中直至queue队列被消耗殆尽。此时一定return true*/while ((mail queue.pollFirst()) ! null) {batch.addLast(mail);}// 此时queue队列内的所有Mail都被转移到batch队列中了queue中没有新Mail了hasNewMail false;// 此时根据batch队列是否为空MailboxProcessor会判断执行Mail的run() or processInput()return !batch.isEmpty();} finally {// 最终释放锁lock.unlock();} }如果Mailbox内的queue队列中仍有新Mail那就在ReentrantLock的加持下将queue内的Mail全都转移到batch队列中如果Mailbox内的queue队列中没有新Mail那就看batch队列的情况了。决断权交给外层的MailboxProcessor总的来看 如果batch队列中有MailMailboxProcessor会从Mailbox内的batch队列中逐个pollFirst然后执行它作为Runnable#run()方法直到batch队列中的所有Mail全都被“消耗殆尽”为止如果batch队列中没有MailMailboxProcessor此时就没有Mail可处理了那就直接processInput 1.Checkpoint Tigger 对Checkpoint的触发是通过MailboxExecutor向Mailbox提交Mail的 /*** 触发执行StreamTask中的Checkpoint操作异步的通过MailboxExecutor将“执行Checkpoint”的请求封装成Mail后* 提交到TaskMailbox中最终由MailboxProcessor来处理*/ Override public FutureBoolean triggerCheckpointAsync(CheckpointMetaData checkpointMetaData,CheckpointOptions checkpointOptions,boolean advanceToEndOfEventTime) {// 通过MailboxExecutor将“触发执行Checkpoint”的具体逻辑封装成Mail提交到Mailbox中后期会被MailboxProcessor执行return mailboxProcessor.getMainMailboxExecutor().submit(// 触发Checkpoint的具体逻辑() - triggerCheckpoint(checkpointMetaData, checkpointOptions, advanceToEndOfEventTime),checkpoint %s with %s,checkpointMetaData,checkpointOptions); }triggerCheckpoint操作会被封装成Mail添加到Mailbox中等待被处理。 Override public void execute(Nonnull final RunnableWithException command,final String descriptionFormat,final Object... descriptionArgs) {try {mailbox.put(new Mail(command, priority, actionExecutor, descriptionFormat, descriptionArgs));} catch (IllegalStateException mbex) {throw new RejectedExecutionException(mbex);} }当然Checkpoint的完成操作也是同样的套路。 2.ProcessingTime Timer Trigger /*** 借助Mailbox线程模型由MailboxExecutor负责将ProcessingTime Timer触发的消息封装成Mail提交到TaskMailbox中后续由MailboxProcessor处理*/ public ProcessingTimeService getProcessingTimeService(int operatorIndex) {Preconditions.checkState(timerService ! null, The timer service has not been initialized.);MailboxExecutor mailboxExecutor mailboxProcessor.getMailboxExecutor(operatorIndex);// 通过MailboxExecutor将Mail提交到Mailbox中等待处理return new ProcessingTimeServiceImpl(timerService, callback - deferCallbackToMailbox(mailboxExecutor, callback)); }private ProcessingTimeCallback deferCallbackToMailbox(MailboxExecutor mailboxExecutor, ProcessingTimeCallback callback) {return timestamp - {mailboxExecutor.execute(() - invokeProcessingTimeCallback(callback, timestamp),Timer callback for %s %d,callback,timestamp);}; }processInput StreamInputProcessor会对输入的数据进行处理、输出包含StreamTaskInput OperatorChain DataOutput。每次processInput都相当于是在处理一个有界流外层MailboxProcessor在不断地的轮询处理完DataStream上的StreamRecord后会返回InputStatus的枚举值根据InputStatus值来决定下一步该“何去何从”。 /*** StreamTask的执行逻辑处理输入的数据返回InputStatus状态并根据InputStatus决定是否需要结束当前Task。* 该方法会通过MailboxProcessor调度、执行作为MailboxProcessor的默认动作底层调用StreamInputProcessor#processInput()方法*/ protected void processInput(MailboxDefaultAction.Controller controller) throws Exception {/*** 核心借助StreamInputProcessor完成数据的读取并交给算子处理处理完毕后会返回InputStatus。* 每次触发相当于处理一个有界流在外层Mailbox拉取Mail才是while循环无限拉取*/InputStatus status inputProcessor.processInput();/*** case 1上游如果还有数据 RecordWriter是可用的立即返回。意为继续处理*/if (status InputStatus.MORE_AVAILABLE recordWriter.isAvailable()) {return;}/*** case 2当状态为END_OF_INPUT说明本批次的有界流数据已经处理完毕* 通过MailboxCollector来告诉Mailbox*/if (status InputStatus.END_OF_INPUT) {controller.allActionsCompleted();return;}/*** case 3当前有界流中没有数据但未来可能会有。此时处理线程会被挂起直到有新的可用数据到来 RecordWriter可用* 此时会先临时暂停对MailboxDefaultAction的处理等啥时候又有新数据了再重新恢复MailboxDefaultAction的处理。*/CompletableFuture? jointFuture getInputOutputJointFuture(status);// 通过MailboxCollector让Mailbox线程暂时停止对MailboxDefaultAction的处理MailboxDefaultAction.Suspension suspendedDefaultAction controller.suspendDefaultAction();// 等啥时候又有了input、outputRecordWriter也变得可用了以后再重新继续执行默认操作jointFuture.thenRun(suspendedDefaultAction::resume); }MailboxController是MailboxDefaultAction和Mailbox之间交互的桥梁在StreamTask处理DataStream元素的过程中会利用MailboxController将处理状态及时通知给Mailbox。如果这批有界流处理完毕就会通过MailboxController通知Mailbox本质就是向Mailbox发送一个Mail进行下一轮的处理。 private void sendControlMail(RunnableWithException mail, String descriptionFormat, Object... descriptionArgs) {mailbox.putFirst(new Mail(mail,Integer.MAX_VALUE /*not used with putFirst*/,descriptionFormat,descriptionArgs)); }兼容SourceStreamTask 作为DataStream Source是专门用来生产无界流数据的并不能穿插兼顾Mailbox内Mail的检测。如果仅有一个线程生产无界流数据的话那将永远无法检测Mailbox内的Mail。作为StreamTask的子类SourceStreamTask会额外启动另一个独立的LegacySourceFunctionThread线程来执行SourceFunction中的循环生产无界流Mailbox线程主线程依然负责处理Mail和默认操作。 /*** 专门为Source源生产数据的线程*/ private class LegacySourceFunctionThread extends Thread {private final CompletableFutureVoid completionFuture;LegacySourceFunctionThread() {this.completionFuture new CompletableFuture();}Overridepublic void run() {try {// CheckpointLock保证线程安全headOperator.run(getCheckpointLock(), getStreamStatusMaintainer(), operatorChain);completionFuture.complete(null);} catch (Throwable t) {// Note, t can be also an InterruptedExceptioncompletionFuture.completeExceptionally(t);}}public void setTaskDescription(final String taskDescription) {setName(Legacy Source Thread - taskDescription);}CompletableFutureVoid getCompletionFuture() {return isFailing() !isAlive() ? CompletableFuture.completedFuture(null) : completionFuture;} }负责为Source生产无界流数据的LegacySourceFunctionThread线程启动后不管是启动成功 or 出现异常都会封装对应的Mail并发送给Mailbox而Mailbox线程的processMail一直在等待处理Mail。 /*** SourceStreamTask中一个Thread负责专门生产无界流另一个MailBox Thread处理Checkpoint、ProcessingTime Timer等事件Mail*/ Override protected void processInput(MailboxDefaultAction.Controller controller) throws Exception {/*** 通过MailboxDefaultAction.Controller告诉Mailbox让MailboxThread先暂停处理MailboxDefaultAction。* TaskMailbox收到该消息后就会在processMail()中一直等待并处理Mail在MailboxThread中会一直处理Mail*/controller.suspendDefaultAction();/**启动LegacySourceFunctionThread线程专门生产Source无界流数据的和MailboxThread线程一起运行*/sourceThread.setTaskDescription(getName());sourceThread.start();/**LegacySourceFunctionThread线程启动后会通知MailboxMailbox会在processMail()中一直等待并处理mail不会返回即Mailbox线程会一直处理mail*/sourceThread.getCompletionFuture().whenComplete((Void ignore, Throwable sourceThreadThrowable) - {/**LegacySourceFunctionThread线程启动过程中发生的任何异常、以及启动成功都会以Mail的形式发送给Mailbox*/if (isCanceled() ExceptionUtils.findThrowable(sourceThreadThrowable, InterruptedException.class).isPresent()) {mailboxProcessor.reportThrowable(new CancelTaskException(sourceThreadThrowable));} else if (!isFinished sourceThreadThrowable ! null) {mailboxProcessor.reportThrowable(sourceThreadThrowable);} else {mailboxProcessor.allActionsCompleted();}}); }Mailbox主线程和LegacySourceFunctionThread线程线程都在运行通过CheckpointLock锁来保证线程安全。
http://www.w-s-a.com/news/226460/

相关文章:

  • 桂林北站离阳朔多远贵州省建设厅住房和城乡建设官网二建考试
  • 浙江省建设厅 网站是多少wordpress淘宝客一键
  • 网站流量少怎么做5个不好的网站
  • 随州网站建设有限公司个人申请注册公司需要多少钱
  • 东莞做商城网站建设wordpress批量下载外链图片
  • 新网站建设运营年计划书仓山区建设局招标网站
  • 网站开发天津网站建设项目组织图
  • 网站开发认证考试石家庄高端网站开发
  • 网站建设第一步怎么弄站酷网页
  • 设备网站模板江西的赣州网站建设
  • 邯郸营销型网站国际招聘人才网
  • hexo wordpress 主题织梦网站优化教程
  • 网站建设方案及上海市建设协会网站
  • 轴承外贸网站怎么做南宁网站排名优化公司哪家好
  • 沈阳企业网站建站郴州优化公司
  • cctv5+手机在线直播观看seo关键词排名优化方法
  • 网站建设公司怎么谈单怎么开通微信小程序商店
  • 深圳做网站案例一个服务器可以备案几个网站
  • 网络营销策划名词解释泉州百度推广排名优化
  • 一键生成网站的软件互联网营销师是干什么
  • 网站后台管理水印怎么做手机优化设置
  • 哪个网站做图文素材多wordpress++优化
  • 建设网站就选用什么样的公司网站类型分类有哪些
  • 找平面设计师网站网站建设须知
  • 建设联结是不是正规网站wordpress 微博同步
  • 瑞安微网站建设广州推广
  • 做旅游宣传网站的流程图中国企业集成网电子商务
  • 开发商城网站开发成交功能网站
  • 网站建设公司专业公司排名搭建网站的企业
  • 网站建设难吗海南智能网站建设报价