当前位置: 首页 > news >正文

为什么要建设营销型网站备案添加网站

为什么要建设营销型网站,备案添加网站,wordpress windows 慢,网页设计速成培训生产者管理TCP连接 Kafka生产者程序概览 Kafka的Java生产者API主要的对象就是KafkaProducer。通常我们开发一个生产者的步骤有4步#xff1a; 第1步#xff1a;构造生产者对象所需的参数对象。 第2步#xff1a;利用第1步的参数对象#xff0c;创建KafkaProducer对象实例…生产者管理TCP连接 Kafka生产者程序概览 Kafka的Java生产者API主要的对象就是KafkaProducer。通常我们开发一个生产者的步骤有4步  第1步构造生产者对象所需的参数对象。 第2步利用第1步的参数对象创建KafkaProducer对象实例。 第3步使用KafkaProducer的send方法发送消息。 第4步调用KafkaProducer的close方法关闭生产者并释放各种系统资源。 上面这4步写成Java代码的话大概是这个样子 Properties props new Properties (); props.put(“参数1”, “参数1的值”) props.put(“参数2”, “参数2的值”) …… try (ProducerString, String producer new KafkaProducer(props)) {producer.send(new ProducerRecordString, String(……), callback);…… } 何时创建TCP连接 1. TCP连接在创建KafkaProducer实例时建立 在创建KafkaProducer实例时生产者应用会在后台创建并启动一个名为 Sender的线程该Sender线程开始运行时首先会创建与Broker的TCP连接的。 如果不调用send方法这个Producer都不知道给哪个主题发消息它又怎么能知道连接哪个Broker呢这是通过Producer的核心参数之一bootstrap.servers参数来指定的。 如果为这个参数指定了1000个Broker连接信息Producer启动时会首先创建与这1000个Broker的TCP连接所以不建议把集群中所有的Broker信息都配置到bootstrap.servers中因为Producer一旦连接到集群中的任一台Broker就能拿到整个集群的Broker信息故没必要为bootstrap.servers指定所有的Broker。通过日志可以看出 [2018-12-09 09:35:45,828] DEBUG[ProducerclientIdproducer-1] Sendingmetadatarequest (typeMetadataRequest, topics) to nodelocalhost:9093 (id:-2 rack: null) (org.apache.kafka.clients.NetworkClient:1068) Producer向某一台Broker发送了MetadataRequest请求尝试获取集群的元数据信息——这就是前面提到的Producer能够获取集群所有信息的方法。  2. 在更新元数据后 和 在消息发送时 (1) 当Producer尝试给一个不存在的主题发送消息时Broker会告诉Producer说这个主题不存在。此时Producer会发送METADATA请求给Kafka集群去尝试获取最新的元数据信息。 (2) Producer通过metadata.max.age.ms参数定期地去更新元数据信息。该参数的默认值是300000即5分钟也就是说不管集群那边是否有变化Producer每5分钟都会强制刷新一次元数据以保证它是最及时的数据。 何时关闭TCP连接 1. 用户主动关闭 这里的主动关闭实际上是广义的主动关闭甚至包括用户调用kill-9主动“杀掉”Producer应用。当然最推荐的方式还是调用producer.close()方法来关闭。 2. Kafka自动关闭 这与Producer端参数connections.max.idle.ms的值有关。默认情况下该参数值是9分钟即如果在9分钟内没有任何请求“流过”某个TCP连接那么Kafka会主动帮你把该TCP连接关 闭。用户可以在Producer端设置connections.max.idle.ms-1禁掉这种机制。一旦被设置成-1TCP连接将成为永久长连接。当然这只是软件层面的“长连接”机制由于Kafka创建的这些Socket连接都开启了 keepalive因此keepalive探活机制还是会遵守的。 自动关闭中TCP连接是在Broker端被关闭的但其实这个TCP连接的发起方是客户端因此在TCP看来这属于被动关闭的场景即passive close。被动关闭的后果就是会产生大量的 CLOSE_WAIT连接因此Producer端或Client端没有机会显式地观测到此连接已被中断。 小结 Java Producer端管理TCP连接的方式是 1. KafkaProducer实例创建时启动Sender线程从而创建与bootstrap.servers中所有Broker的TCP连接。 2. KafkaProducer实例首次更新元数据信息之后还会再次创建与集群中所有Broker的TCP连接。 3. 如果Producer端发送消息到某台Broker时发现没有与该Broker的TCP连接那么也会立即创建连接。 4. 如果设置Producer端connections.max.idle.ms参数大于0则步骤1中创建的TCP连接会被自动关闭如果设置该参数-1那么步骤1中创建的TCP连接将无法被关闭从而成为“僵尸”连接。 消费者管理TCP连接 何时创建TCP连接 和生产者不同的是构建KafkaConsumer实例时是不会创建任何TCP连接的。TCP连接是在调用KafkaConsumer.poll方法时被创建的。再细粒度地说在poll方法内部有3个时机可以创建TCP连接 1.发起FindCoordinator请求时。 消费者端有个组件叫协调者Coordinator它驻留在Broker端的内存中负责消费者组的组成员管理和各个消费者的位移提交管理。当消费者程序首次启动调用poll方法时它需要向Kafka集群 发送一个名为FindCoordinator的请求向集群中当前负载最小的那台Broker发送请求。希望Kafka集群告诉它哪个Broker是管理它的协调者。 2.连接协调者时。 Broker处理完上一步发送的FindCoordinator请求之后会返还对应的响应结果Response显式地告诉消费者哪个Broker是真正的协调者因此在这一步消费者知晓了真正的协调者后会创建连向该 Broker的Socket连接。只有成功连入协调者协调者才能开启正常的组协调操作比如加入组、等待组分配方案、心跳请求处理、位移获取、位移提交等。 3.消费数据时。 消费者会为每个要消费的分区创建与该分区领导者副本所在Broker连接的TCP。举个例子假设消费者要消费5个分区的数据这5个分区各自的领导者副本分布在4台Broker上那么该消费者在消费时会创建与这4台Broker的Socket连接。 注意当第三类TCP连接成功创建后消费者程序就会废弃第一类TCP连接第一类TCP连接会在后台被默默地关闭掉。对一个运行了一段时间的消费者程序来说只会有后面两类TCP连接存在。 通过日志查看 [2019-05-27 10:00:54,142] DEBUG [ConsumerclientIdconsumer-1, groupIdtest] Initiating connection to nodelocalhost:9092 (id: -1 rack: null) using address localhost/127.0.0.1 (org.apache.kafka.clients.NetworkClient:944) … [2019-05-27 10:00:54,188] DEBUG [ConsumerclientIdconsumer-1, groupIdtest] Sending metadata request MetadataRequestData(topics[MetadataRequestTopic(name‘t4’)], allowAutoTopicCreationtrue, includeClusterAuthorizedOperationsfalse, includeTopicAuthorizedOperationsfalse) to nodelocalhost:9092 (id: -1 rack: null) (org.apache.kafka.clients.NetworkClient:1097) … [2019-05-27 10:00:54,188] TRACE [ConsumerclientIdconsumer-1, groupIdtest] Sending FIND_COORDINATOR {keytest,key_type0} with correlation id 0 to node-1 (org.apache.kafka.clients.NetworkClient:496) [2019-05-27 10:00:54,203] TRACE [ConsumerclientIdconsumer-1, groupIdtest] Completed receivefrom node-1 for FIND_COORDINATORwith correlation id 0, received {throttle_time_ms0,error_code0,error_messagenull, node_id2,hostlocalhost,port9094}(org.apache.kafka.clients.NetworkClient:837) … [2019-05-27 10:00:54,204] DEBUG [ConsumerclientIdconsumer-1, groupIdtest] Initiating connection to nodelocalhost:9094 (id: 2147483645 rack: null) using address localhost/127.0.0.1 (org.apache.kafka.clients.NetworkClient:944) … [2019-05-27 10:00:54,237] DEBUG [ConsumerclientIdconsumer-1, groupIdtest] Initiating connection to nodelocalhost:9094 (id: 2 rack: null) using address localhost/127.0.0.1 (org.apache.kafka.clients.NetworkClient:944) [2019-05-27 10:00:54,237] DEBUG [ConsumerclientIdconsumer-1, groupIdtest] Initiating connection to nodelocalhost:9092 (id: 0 rack: null) using address localhost/127.0.0.1 (org.apache.kafka.clients.NetworkClient:944) [2019-05-27 10:00:54,238] DEBUG [ConsumerclientIdconsumer-1, groupIdtest] Initiating connection to nodelocalhost:9093 (id: 1 rack: null) using address localhost/127.0.0.1 (org.apache.kafka.clients.NetworkClient:944) 日志的第一行是消费者程序创建的第一个TCP连接就像我们前面说的这个Socket用于发送FindCoordinator请求。由于这是消费者程序创建的第一个连接此时消费者对于要连接 的Kafka集群一无所知因此它连接的Broker节点的ID是-1表示消费者尚未获取到Broker数据。 日志的第二行消费者复用了刚才创建的那个Socket连接向Kafka集群发送元数据请求以获取整个集群的信息。 日志的第三行表明消费者程序开始发送FindCoordinator请求给第一步中连接的Broker即localhost:9092也就是nodeId等于-1的那个。在十几毫秒之后消费者程序成功地获悉协调者所在的Broker信息 也就是第四行的“node_id 2”。 完成这些之后消费者就已经知道协调者Broker的连接信息了因此在日志的第五行发起了第二个Socket连接创建了连向localhost:9094的TCP。只有连接了协调者消费者进程才能正常地开启消费者组 的各种功能以及后续的消息消费。 在日志的最后三行中消费者又分别创建了新的TCP连接主要用于实际的消息获取。要消费的分区的领导者副本在哪台Broker上消费者就要创建连向哪台Broker的TCP。 那么2147483645是怎么来的呢 它是由Integer.MAX_VALUE减去协调者所在Broker的真实ID计算得来的。看第四行的内容我们可以知道协调者ID是2因此这个Socket连接的节点ID就是 Integer.MAX_VALUE减去2即2147483647减去2也就是2147483645。这种节点ID的标记方式目的就是要让组协调请求和真正的数据获取请求使用不同的Socket连接。 至于后面的0、1、2那就很好解释了。它们表征了真实的Broker ID也就是我们在server.properties中配置的broker.id值。 何时关闭TCP连接 1. 手动关闭 手动调用KafkaConsumer.close()方法或者是执行Kill命令。 2. 自动关闭 自动关闭是由消费者端参数connection.max.idle.ms控制的该参数现在的默认值是9分钟即如果某个Socket连接上连续9分钟都没有任何请求“过境”的话那么消费者会强行“杀掉”这个Socket连接。 注意和生产者有些不同的是如果在编写消费者程序时使用了循环的方式来调用poll方法消费消息那么上面提到的所有请求都会被定期发送到Broker因此这些Socket连接上总是能保证有请求在发送从而也就实现了“长连接”的效果。 参考Kafka 核心技术与实战 (geekbang.org)
http://www.w-s-a.com/news/550901/

相关文章:

  • 有哪些ui的设计网站网上商城网站建设设计方案
  • iis中怎样配置网站绑定运城可以做网站的公司
  • 品牌网站建设开发价格dedecms电影网站模板
  • 网站设计外包合同帝国网站后台认证码错误
  • 网站设计公司深圳怎么免费做公司网站
  • 90设计网站几次是什么意思swipe类网站
  • 安康微网站建设网站域名使用费用
  • 网站建设执招标评分表微信代理网站模板
  • ps做网站分辨率自适应地方网站盈利
  • 免费自助小型网站专业网站建设组织
  • 猎聘网网站建设目标查看别人wordpress主题
  • 免费建设网站入驻网站备案不能更新吗
  • 个人网站制作代码西安建筑类公司
  • 网站备案要营业执照吗网站建设如何记账
  • 新手学做网站难吗外包服务商
  • 公司网站建设的项目工作分解结构wordpress插件后端页面
  • 四川省建设人才网站2018南京专业建站
  • ppt制作网站推荐seo教程百度网盘
  • 网站建设多少钱一平米网上商城网站开发报告
  • 福州网站建设招聘信息哈尔滨中企动力科技股份有限公司
  • 军事新闻最新seo关键词查询排名软件
  • 免费网站建设官网项目建设表态发言
  • 平谷建站推广广告投放平台主要有哪些
  • 网站备案掉了什么原因步骤怎么读
  • 徐州市建设监理协会网站做一个公司官网需要多少钱
  • 网站开发学什么数据库做公司网站注意事项
  • 游戏开发网站建设国际战事最新消息
  • 达州+网站建设网站里自己怎么做推广
  • 看网站建设公司的网站案例熊掌号接入wordpress
  • 黄石下陆区建设局网站wordpress如何拖移小工具