当前位置: 首页 > news >正文

非法网站开发是什么意思做网站租服务器

非法网站开发是什么意思,做网站租服务器,息烽县抖音seo推广,企业邮箱在哪里查整体架构如下所示#xff1a; responseQueue不在RequestChannel中#xff0c;在Processor中#xff0c;每个Processor内部有一个responseQueue 客户端发送的请求被Acceptor转发给Processor处理处理器将请求放到RequestChannel的requestQueue中KafkaRequestHandler取出reque…整体架构如下所示 responseQueue不在RequestChannel中在Processor中每个Processor内部有一个responseQueue 客户端发送的请求被Acceptor转发给Processor处理处理器将请求放到RequestChannel的requestQueue中KafkaRequestHandler取出requestQueue中的请求调用KafkaApis进行业务逻辑处理KafkaApis将响应结果放到对应的Processor的responseQueue中processor从responseQueue中取出响应结果processor将响应结果返回给客户端 KafkaServer是Kafka服务端的主类KafkaServer中和网络成相关的服务组件包括SocketServer、KafkaApis和KafkaRequestHandlerPool。SocketServer主要关注网络层的通信协议具体的业务处理逻辑则交给KafkaRequestHandler和KafkaApis来完成。 class KafkaServer(val config: KafkaConfig) {def startup() {socketServer new SocketServer(config, metrics, time, credentialProvider)socketServer.startup(startupProcessors false)/* start processing requests */apis new KafkaApis(socketServer.requestChannel, ...)requestHandlerPool new KafkaRequestHandlerPool(config.brokerId, ...)}}SocketServer def startup(startupProcessors: Boolean true) {this.synchronized {...createAcceptorAndProcessors(config.numNetworkThreads, config.listeners)if (startupProcessors) {startProcessors()}}private def createAcceptorAndProcessors(processorsPerListener: Int,endpoints: Seq[EndPoint]): Unit synchronized {...endpoints.foreach { endpoint ...val acceptor new Acceptor(endpoint, ...)addProcessors(acceptor, endpoint, processorsPerListener)KafkaThread.nonDaemon(skafka-socket-acceptor-$listenerName-$securityProtocol-${endpoint.port}, acceptor).start()acceptor.awaitStartup()acceptors.put(endpoint, acceptor)}}可以看出SocketServer.startup()中会根据listener的个数创建相同个数的acceptor每个acceptor关联数个processor。这是一种典型的Reactor模式acceptor负责与客户端建立连接并将连接分发给processorprocessor负责所分连接后续的所有读写交互。 Acceptor def run() {serverChannel.register(nioSelector, SelectionKey.OP_ACCEPT)startupComplete()try {var currentProcessor 0while (isRunning) {try {val ready nioSelector.select(500)if (ready 0) {val keys nioSelector.selectedKeys()val iter keys.iterator()while (iter.hasNext isRunning) {try {val key iter.nextiter.remove()if (key.isAcceptable) {val processor synchronized {currentProcessor currentProcessor % processors.sizeprocessors(currentProcessor)}accept(key, processor)} elsethrow new IllegalStateException(Unrecognized key state for acceptor thread.)// round robin to the next processor thread, mod(numProcessors) will be done latercurrentProcessor currentProcessor 1} catch {case e: Throwable error(Error while accepting connection, e)}}}}catch {// We catch all the throwables to prevent the acceptor thread from exiting on exceptions due// to a select operation on a specific channel or a bad request. We dont want// the broker to stop responding to requests from other clients in these scenarios.case e: ControlThrowable throw ecase e: Throwable error(Error occurred, e)}}} finally {debug(Closing server socket and selector.)CoreUtils.swallow(serverChannel.close(), this, Level.ERROR)CoreUtils.swallow(nioSelector.close(), this, Level.ERROR)shutdownComplete()}}上面是Acceptor的run()方法可以看出Acceptor在通道上注册了SelectionKey.OP_ACCEPT事件OP_READ、OP_WRITE、OP_CONNECT、OP_ACCEPT客户端监听OP_CONNECT事件负责发起连接服务端监听OP_CONNECT事件负责建立连接负责与客户端建立连接。并将建立的连接通过轮询的方式指派给processor。 Processor 每个Processor都会分到数个与客户端的连接。Processor的处理逻辑如下所示 override def run() {startupComplete()try {while (isRunning) {try {// 在新分到的客户端连接上注册OP_READ事件configureNewConnections()// 从responseQueue中取响应赋值给KafkaChannel的send等待poll时发送processNewResponses()// selector轮询各种事件读取请求或者发送响应poll()// 封装selector.completedReceives中的请求放入requestQueueprocessCompletedReceives()// 处理selector.completedSends响应移除inflightResponses中的记录执行响应的回调函数processCompletedSends()processDisconnected()} catch {...}}} finally {...}}Processor线程的名字中有kafka-network字样可以通过jstack -l pid | grep kafka-network进行筛选。 KafkaRequestHandlerPool KafkaServer会创建请求处理线程池KafkaRequestHandlerPool在请求处理线程池中会创建并启动多个请求处理线程KafkaRequestHandler。KafkaRequestHandler会获取RequestChannel.requestQueue中的请求进行处理在内部实际处理会交给KafkaApis完成。 class KafkaRequestHandlerPool(val brokerId: Int, ...) {...for (i - 0 until numThreads) {createHandler(i)}def createHandler(id: Int): Unit synchronized {runnables new KafkaRequestHandler(..., requestChannel, apis, time)KafkaThread.daemon(kafka-request-handler- id, runnables(id)).start()} }KafkaRequestHandler的run()方法如下 class KafkaRequestHandler(id: Int,...) extends Runnable with Logging {...def run() {while (!stopped) {val req requestChannel.receiveRequest(300)req match {case RequestChannel.ShutdownRequest shutdownComplete.countDown()returncase request: RequestChannel.Request try {request.requestDequeueTimeNanos endTimeapis.handle(request)} catch {case e: FatalExitError shutdownComplete.countDown()Exit.exit(e.statusCode)case e: Throwable error(Exception when handling request, e)} finally {request.releaseBuffer()}case null // continue}}shutdownComplete.countDown()}}
http://www.w-s-a.com/news/821731/

相关文章:

  • 网站设计网页设计系统没有安装wordpress
  • 建网站做哪方面公司百度官网优化
  • 山西网站seo网站采集信息怎么做
  • 同江佳木斯网站建设seo学徒培训
  • 淘宝不能发布网站源码做商品怀化网站制作建设
  • 买空间哪个网站好做我的世界背景图的网站
  • 南京哪里做网站wordpress 增加子目录
  • 刚做的网站搜全名查不到网站很难被百度收录
  • 网站建设与管理期末做网站买空间用共享ip
  • 网络合同怎么签有效南京seo公司哪家
  • 厦门建设网官方网站上海网络网站建
  • 网站制作西安郑州网站建设动态
  • 外贸网站免费推广温州做网站技术员
  • 武冈 网站建设做网站能收回吗
  • 网站做前端把网站扒下来以后怎么做
  • 网站模板素材下载手机做任务佣金的网站
  • 机关网站建设考核测评总结做网站sqlserver排序
  • 凉山州建设厅官方网站html5下载教程
  • 内网网站建设方面政策id97网站怎么做的
  • 福州企业建站系统七米网站建设
  • 长春seo建站北京做机床的公司网站
  • 网站维护具体做啥如何开发wap网站
  • 公司网站设计费计入什么科目潍坊公司网站制作
  • 拖拽式网站开发模具钢东莞网站建设
  • 彩票娱乐网站建设模块化网站开发
  • 孝感网站设计用自己的名字设计头像
  • 高明网站建设哪家好深圳vi设计公司全力设计
  • 工程技术cpu游戏优化加速软件
  • 一起做网店网站入驻收费wordpress 自定义评论样式
  • 深圳高端网站建设公司排名app软件开发sh365