当前位置: 首页 > news >正文

长春星宿网站建设公司怎么样康乐县网站建设

长春星宿网站建设公司怎么样,康乐县网站建设,网络营销专业就业公司,网站更换服务器教程kafka实战教程(python操作kafka)#xff0c;kafka配置文件详解 Kafka内外网访问的设置 1 kafka简介 根据官网的介绍#xff0c;ApacheKafka是一个分布式流媒体平台#xff0c;它主要有3种功能#xff1a; (1)发布和订阅消息流#xff0c;这个功能类似于消息队列#x…kafka实战教程(python操作kafka)kafka配置文件详解 Kafka内外网访问的设置 1 kafka简介 根据官网的介绍ApacheKafka®是一个分布式流媒体平台它主要有3种功能 (1)发布和订阅消息流这个功能类似于消息队列这也是kafka归类为消息队列框架的原因。 (2)以容错的方式记录消息流kafka以文件的方式来存储消息流。 (3)可以在消息发布的时候进行处理。 使用场景: (1)在系统或应用程序之间构建可靠的用于传输实时数据的管道消息队列功能。 (2)构建实时的流数据处理程序来变换或处理数据流数据处理功能。 1.1 kafka生产者 (1)首先创建ProducerRecord必须包含Topic和Valuekey和partition可选。 (2)然后序列化key和value对象为ByteArray并发送到网络。 (3)接下来消息发送到partitioner。 如果创建ProducerRecord时指定了partition此时partitioner啥也不用做简单的返回指定的partition即可。 如果未指定partitionpartitioner会基于ProducerRecord的key生成partition。 (4)producer选择好partition后增加record到对应topic和partition的batch record。 (5)最后专有线程负责发送batch record到合适的Kafka broker。 (6)当broker收到消息时它会返回一个应答response。 如果消息成功写入Kafkabroker将返回RecordMetadata对象包含topicpartition和offset 相反broker将返回error。这时producer收到error会尝试重试发送消息几次直到producer返回error。 实例化producer后接着发送消息。 这里主要有3种发送消息的方法 (1)立即发送只管发送消息到server端不care消息是否成功发送。大部分情况下这种发送方式会成功因为Kafka自身具有高可用性producer会自动重试但有时也会丢失消息 (2)同步发送通过send()方法发送消息并返回Future对象。get()方法会等待Future对象看send()方法是否成功 (3)异步发送通过带有回调函数的send()方法发送消息当producer收到Kafka broker的response会触发回调函数。 以上所有情况一定要时刻考虑发送消息可能会失败想清楚如何去处理异常。 通常我们是一个producer起一个线程开始发送消息。为了优化producer的性能一般会有下面几种方式单个producer起多个线程发送消息使用多个producer。 生产者在向kafka集群发送消息的时候可以通过指定分区来发送到指定的分区中。 也可以通过指定均衡策略来将消息发送到不同的分区中。 如果不指定就会采用默认的随机均衡策略将消息随机的存储到不同的分区中。 1.2 kafka消费者 Consumer即消费者消费者通过与kafka集群建立长连接的方式不断地从集群中拉取消息然后可以对这些消息进行处理。 kafka的消费模式总共有3种最多一次最少一次正好一次。为什么会有这3种模式是因为客户端处理消息提交反馈commit这两个动作不是原子性。 (1)最多一次客户端收到消息后在处理消息前自动提交这样kafka就认为consumer已经消费过了偏移量增加。 (2)最少一次客户端收到消息处理消息再提交反馈。这样就可能出现消息处理完了在提交反馈前网络中断或者程序挂了那么kafka认为这个消息还没有被consumer消费产生重复消息推送。 (3)正好一次保证消息处理和提交反馈在同一个事务中即有原子性。 详细阐述如何实现以上三种方式。 (1)At-most-once最多一次 设置enable.auto.commit为ture 设置 auto.commit.interval.ms为一个较小的时间间隔. client不要调用commitSync()kafka在特定的时间间隔内自动提交。(2)At-least-once最少一次 方法一 设置enable.auto.commit为false client调用commitSync()增加消息偏移;方法二 设置enable.auto.commit为ture 设置 auto.commit.interval.ms为一个较大的时间间隔. client调用commitSync(),增加消息偏移;(3)Exactly-once正好一次 如果要实现这种方式必须自己控制消息的offset自己记录一下当前的offset对消息的处理和offset的移动必须保持在同一个事务中例如在同一个事务中把消息处理的结果存到mysql数据库同时更新此时的消息的偏移。 设置enable.auto.commit为false 保存ConsumerRecord中的offset到数据库当partition分区发生变化的时候需要rebalance有以下几个事件会触发分区变化。 1 consumer订阅的topic中的分区大小发生变化 2 topic被创建或者被删除 3 consuer所在group中有个成员挂了 4 新的consumer通过调用join加入了group此时 consumer通过实现ConsumerRebalanceListener接口捕捉这些事件对偏移量进行处理。consumer通过调用seek(TopicPartition, long)方法移动到指定的分区的偏移位置。 当新的消费者加入消费组它会消费一个或多个分区而这些分区之前是由其他消费者负责的另外当消费者离开消费组比如重启、宕机等时它所消费的分区会分配给其他分区。这种现象称为重平衡rebalance。重平衡是Kafka一个很重要的性质这个性质保证了高可用和水平扩展。 不过也需要注意到在重平衡期间所有消费者都不能消费消息因此会造成整个消费组短暂的不可用。而且将分区进行重平衡也会导致原来的消费者状态过期从而导致消费者需要重新更新状态这段期间也会降低消费性能。后面我们会讨论如何安全的进行重平衡以及如何尽可能避免。 消费者通过定期发送心跳hearbeat到一个作为组协调者group coordinator的broker来保持在消费组内存活。这个broker不是固定的每个消费组都可能不同。当消费者拉取消息或者提交时便会发送心跳。 如果消费者超过一定时间没有发送心跳那么它的会话session就会过期组协调者会认为该消费者已经宕机然后触发重平衡。可以看到从消费者宕机到会话过期是有一定时间的这段时间内该消费者的分区都不能进行消息消费通常情况下我们可以进行优雅关闭这样消费者会发送离开的消息到组协调者这样组协调者可以立即进行重平衡而不需要等待会话过期。 在0.10.1版本Kafka对心跳机制进行了修改将发送心跳与拉取消息进行分离这样使得发送心跳的频率不受拉取的频率影响。另外更高版本的Kafka支持配置一个消费者多长时间不拉取消息但仍然保持存活这个配置可以避免活锁livelock。活锁是指应用没有故障但是由于某些原因不能进一步消费。 在消费者消费消息时kafka使用offset来记录当前消费的位置。 在kafka的设计中可以有多个不同的group来同时消费同一个topic下的消息如图我们有两个不同的group同时消费他们的的消费的记录位置offset各不相同不互相干扰。 对于一个group而言消费者的数量不应该多于分区的数量因为在一个group中每个分区至多只能绑定到一个消费者上即一个消费者可以消费多个分区一个分区只能给一个消费者消费。 因此若一个group中的消费者数量大于分区数量的话多余的消费者将不会收到任何消息。 1.3 Broker Kafka是一个高吞吐量分布式消息系统采用Scala和Java语言编写它提供了快速、可扩展的、分布式、分区的和可复制的日志订阅服务。它由Producer、Broker、Consumer三部分构成. Producer向某个Topic发布消息而Consumer订阅某个Topic的消息。 一旦有某个Topic新产生的消息Broker会传递给订阅它的所有Consumer每个Topic分为多个分区这样的设计有利于管理数据和负载均衡。 Broker消息中间件处理结点一个Kafka节点就是一个broker多个broker可以组成一个Kafka集群。 Controller中央控制器Control负责管理分区和副本状态并执行管理着这些分区的重新分配。里面涉及到partition leader 选举。 ISR同步副本组。 谈到kafka的存储就不得不提到分区即partitions创建一个topic时同时可以指定分区数目分区数越多其吞吐量也越大但是需要的资源也越多同时也会导致更高的不可用性kafka在接收到生产者发送的消息之后会根据均衡策略将消息存储到不同的分区中。 2 Kafka安装与使用 kafka官方下载地址 (1)首先确保你的机器上安装了jdkkafka需要java运行环境。 (2)以前的kafka还需要zookeeper新版的kafka已经内置了一个zookeeper环境所以我们可以直接使用。 2.1 安装jdk 1、解压 sudo tar -xzvf jdk-8u144-linux-x64.tar.gz -C /usr/local/ 2、配置环境变量 vi /home/zb/.bashrc export JAVA_HOME/usr/local/jdk1.8.0_144 export PATH$PATH:$JAVA_HOME/bin 3、配置立即生效 source /home/zb/.bashrc java -version2.2 安装kafka 1、解压 sudo tar -xzvf kafka_2.13-3.4.0.tgz -C /usr/local/ cd /usr/local/ sudo chmod 777 kafka_2.13-3.4.0 2、配置环境变量 vi /home/zb/.bashrc export KAFKA_HOME/usr/local/kafka_2.13-3.4.0 export PATH$PATH:$KAFKA_HOME/bin 3、配置立即生效 source /home/zb/.bashrc进行最简单的尝试的话我们只需要解压到任意目录即可。 2.3 配置 在kafka解压目录下下有一个config的文件夹里面放置的是我们的配置文件。 一、consumer.properites 消费者配置这个配置文件用于配置开启的消费者此处我们使用默认的即可。 二、producer.properties 生产者配置这个配置文件用于配置开启的生产者此处我们使用默认的即可。 三、server.properties kafka服务器的配置此配置文件用来配置kafka服务器目前仅介绍几个最基础的配置。 1、broker.id 申明当前kafka服务器在集群中的唯一ID需配置为integer, 并且集群中的每一个kafka服务器的id都应是唯一的我们这里采用默认配置即可2、listeners 申明此kafka服务器需要监听的端口号 listenersPLAINTEXT://myubuntu:9092。 并确保服务器的9092端口能够访问 配置好/etc/hosts的内容 10.0.2.11 myubuntu3、zookeeper.connect 申明kafka所连接的zookeeper的地址 需配置为zookeeper的地址 由于本次使用的是kafka高版本中自带zookeeper使用默认配置即可 zookeeper.connectlocalhost:2181四、zookeeper.properties zookeeper配置文件。 2.4 启动 cd /usr/local/kafka_2.13-3.4.0/ (1)启动zookeeper bin/zookeeper-server-start.sh config/zookeeper.properties 后台方式 nohup ./bin/zookeeper-server-start.sh ./config/zookeeper.properties /tmp/zookeeperoutput.log 21 (2)启动kafka bin/kafka-server-start.sh config/server.properties 后台方式 nohup ./bin/kafka-server-start.sh ./config/server.properties /tmp/kafkaoutput.log 21 (3)查看进程 jps 3297 Jps 2137 QuorumPeerMain 2684 Kafka(4)停止kafka cd /usr/local/kafka_2.13-3.4.0/ 停止kafka kafka-server-stop.sh 停止zookeeper zookeeper-server-stop.sh 2.5 应用 cd /usr/local/kafka_2.13-3.4.0/ 创建topic bin/kafka-topics.sh --create --bootstrap-server myubuntu:9092 --replication-factor 1 --partitions 1 --topic test查看已经创建的topic bin/kafka-topics.sh --list --bootstrap-server localhost:9092创建一个消息消费者 bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning创建一个消息生产者 bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test3 kafka的配置 在kafka/config/目录下面有3个配置文件 producer.properties producer consumer.properties consumer server.properties broker3.1 BROKER的全局配置 最为核心的三个配置 broker.id、log.dir、zookeeper.connect 。 3.1.1 系统相关 ------------------------------------------- 系统 相关 ------------------------------------------- ##每一个broker在集群中的唯一标示要求是正数。在改变IP地址不改变broker.id的话不会影响consumers broker.id 1##kafka数据的存放地址多个地址的话用逗号分割 /tmp/kafka-logs-1/tmp/kafka-logs-2 log.dirs /tmp/kafka-logs##提供给客户端响应的端口 port 6667##消息体的最大大小单位是字节 message.max.bytes 1000000## broker 处理消息的最大线程数一般情况下不需要去修改 num.network.threads 3## broker处理磁盘IO 的线程数 数值应该大于你的硬盘数 num.io.threads 8## 一些后台任务处理的线程数例如过期消息文件的删除等一般情况下不需要去做修改 background.threads 4## 等待IO线程处理的请求队列最大数若是等待IO的请求超过这个数值那么会停止接受外部消息算是一种自我保护机制 queued.max.requests 500##broker的主机地址若是设置了那么会绑定到这个地址上若是没有会绑定到所有的接口上并将其中之一发送到ZK一般不设置 host.name## 打广告的地址若是设置的话会提供给producers, consumers,其他broker连接具体如何使用还未深究 advertised.host.name## 广告地址端口必须不同于port中的设置 advertised.port## socket的发送缓冲区socket的调优参数SO_SNDBUFF socket.send.buffer.bytes 100*1024## socket的接受缓冲区socket的调优参数SO_RCVBUFF socket.receive.buffer.bytes 100*1024## socket请求的最大数值防止serverOOMmessage.max.bytes必然要小于socket.request.max.bytes会被topic创建时的指定参数覆盖 socket.request.max.bytes 100*1024*1024 3.1.2 LOG相关 ------------------------------------------- LOG 相关 ------------------------------------------- ## topic的分区是以一堆segment文件存储的这个控制每个segment的大小会被topic创建时的指定参数覆盖 log.segment.bytes 1024*1024*1024## 这个参数会在日志segment没有达到log.segment.bytes设置的大小也会强制新建一个segment 会被 topic创建时的指定参数覆盖 log.roll.hours 24*7## 日志清理策略 选择有delete和compact 主要针对过期数据的处理或是日志文件达到限制的额度会被 topic创建时的指定参数覆盖 log.cleanup.policy delete## 数据存储的最大时间 超过这个时间 会根据log.cleanup.policy设置的策略处理数据也就是消费端能够多久去消费数据 ## log.retention.bytes和log.retention.minutes任意一个达到要求都会执行删除会被topic创建时的指定参数覆盖 log.retention.minutes7days指定日志每隔多久检查看是否可以被删除默认1分钟 log.cleanup.interval.mins1## topic每个分区的最大文件大小一个topic的大小限制 分区数*log.retention.bytes 。-1没有大小限制 ## log.retention.bytes和log.retention.minutes任意一个达到要求都会执行删除会被topic创建时的指定参数覆盖 log.retention.bytes-1## 文件大小检查的周期时间是否处罚 log.cleanup.policy中设置的策略 log.retention.check.interval.ms5minutes## 是否开启日志压缩 log.cleaner.enablefalse## 日志压缩运行的线程数 log.cleaner.threads 1## 日志压缩时候处理的最大大小 log.cleaner.io.max.bytes.per.secondNone## 日志压缩去重时候的缓存空间 在空间允许的情况下越大越好 log.cleaner.dedupe.buffer.size500*1024*1024## 日志清理时候用到的IO块大小 一般不需要修改 log.cleaner.io.buffer.size512*1024## 日志清理中hash表的扩大因子 一般不需要修改 log.cleaner.io.buffer.load.factor 0.9## 检查是否处罚日志清理的间隔 log.cleaner.backoff.ms 15000## 日志清理的频率控制越大意味着更高效的清理同时会存在一些空间上的浪费会被topic创建时的指定参数覆盖 log.cleaner.min.cleanable.ratio0.5## 对于压缩的日志保留的最长时间也是客户端消费消息的最长时间同log.retention.minutes的区别在于一个控制未压缩数据一个控制压缩后的数据。会被topic创建时的指定参数覆盖 log.cleaner.delete.retention.ms 1day## 对于segment日志的索引文件大小限制会被topic创建时的指定参数覆盖 log.index.size.max.bytes 10*1024*1024## 当执行一个fetch操作后需要一定的空间来扫描最近的offset大小设置越大代表扫描速度越快但是也更好内存一般情况下不需要搭理这个参数 log.index.interval.bytes 4096## log文件sync到磁盘之前累积的消息条数 ## 因为磁盘IO操作是一个慢操作,但又是一个数据可靠性的必要手段 ## 所以此参数的设置,需要在数据可靠性与性能之间做必要的权衡. ## 如果此值过大,将会导致每次fsync的时间较长(IO阻塞) ## 如果此值过小,将会导致fsync的次数较多,这也意味着整体的client请求有一定的延迟. ## 物理server故障,将会导致没有fsync的消息丢失. log.flush.interval.messagesNone## 检查是否需要固化到硬盘的时间间隔 log.flush.scheduler.interval.ms 3000## 仅仅通过interval来控制消息的磁盘写入时机,是不足的. ## 此参数用于控制fsync的时间间隔,如果消息量始终没有达到阀值,但是离上一次磁盘同步的时间间隔 ## 达到阀值,也将触发. log.flush.interval.ms None## 文件在索引中清除后保留的时间 一般不需要去修改 log.delete.delay.ms 60000## 控制上次固化硬盘的时间点以便于数据恢复 一般不需要去修改 log.flush.offset.checkpoint.interval.ms 600003.1.3 TOPIC相关 ------------------------------------------- TOPIC 相关 ------------------------------------------- ## 是否允许自动创建topic 若是false就需要通过命令创建topic auto.create.topics.enable true## 一个topic 默认分区的replication个数 不得大于集群中broker的个数 default.replication.factor 1## 每个topic的分区个数若是在topic创建时候没有指定的话 会被topic创建时的指定参数覆盖 num.partitions 1实例 --replication-factor3--partitions1--topic replicated-topic 名称replicated-topic有一个分区分区被复制到三个broker上。3.1.4 复制(Leader、replicas) 相关 ----------------------------------复制(Leader、replicas) 相关 ---------------------------------- ## partition leader与replicas之间通讯时,socket的超时时间 controller.socket.timeout.ms 30000## partition leader与replicas数据同步时,消息的队列尺寸 controller.message.queue.size10## replicas响应partition leader的最长等待时间若是超过这个时间就将replicas列入ISR(in-sync replicas)并认为它是死的不会再加入管理中 replica.lag.time.max.ms 10000## 如果follower落后与leader太多,将会认为此follower[或者说partition relicas]已经失效 ## 通常,在follower与leader通讯时,因为网络延迟或者链接断开,总会导致replicas中消息同步滞后 ## 如果消息之后太多,leader将认为此follower网络延迟较大或者消息吞吐能力有限,将会把此replicas迁移 ## 到其他follower中. ## 在broker数量较少,或者网络不足的环境中,建议提高此值. replica.lag.max.messages 4000##follower与leader之间的socket超时时间 replica.socket.timeout.ms30*1000## leader复制时候的socket缓存大小 replica.socket.receive.buffer.bytes64*1024## replicas每次获取数据的最大大小 replica.fetch.max.bytes 1024*1024## replicas同leader之间通信的最大等待时间失败了会重试 replica.fetch.wait.max.ms 500## fetch的最小数据尺寸,如果leader中尚未同步的数据不足此值,将会阻塞,直到满足条件 replica.fetch.min.bytes 1## leader 进行复制的线程数增大这个数值会增加follower的IO num.replica.fetchers1## 每个replica检查是否将最高水位进行固化的频率 replica.high.watermark.checkpoint.interval.ms 5000## 是否允许控制器关闭broker ,若是设置为true,会关闭所有在这个broker上的leader并转移到其他broker controlled.shutdown.enable false## 控制器关闭的尝试次数 controlled.shutdown.max.retries 3## 每次关闭尝试的时间间隔 controlled.shutdown.retry.backoff.ms 5000## 是否自动平衡broker之间的分配策略 auto.leader.rebalance.enable false## leader的不平衡比例若是超过这个数值会对分区进行重新的平衡 leader.imbalance.per.broker.percentage 10## 检查leader是否不平衡的时间间隔 leader.imbalance.check.interval.seconds 300## 客户端保留offset信息的最大空间大小 offset.metadata.max.bytes3.1.5 ZooKeeper相关 ----------------------------------ZooKeeper 相关---------------------------------- ##zookeeper集群的地址可以是多个多个之间用逗号分割 hostname1:port1,hostname2:port2,hostname3:port3 zookeeper.connect localhost:2181## ZooKeeper的最大超时时间就是心跳的间隔若是没有反映那么认为已经死了不易过大 zookeeper.session.timeout.ms6000## ZooKeeper的连接超时时间 zookeeper.connection.timeout.ms 6000## ZooKeeper集群中leader和follower之间的同步实际那 zookeeper.sync.time.ms 2000 配置的修改 其中一部分配置是可以被每个topic自身的配置所代替例如 新增配置 bin/kafka-topics.sh --zookeeper localhost:2181--create --topic my-topic --partitions1--replication-factor1--config max.message.bytes64000--config flush.messages1修改配置 bin/kafka-topics.sh --zookeeper localhost:2181--alter --topic my-topic --config max.message.bytes128000删除配置 bin/kafka-topics.sh --zookeeper localhost:2181--alter --topic my-topic --deleteConfig max.message.bytes 3.2 CONSUMER配置 最为核心的配置是group.id、zookeeper.connect。 ## Consumer归属的组IDbroker是根据group.id来判断是队列模式还是发布订阅模式非常重要group.id## 消费者的ID若是没有设置的话会自增consumer.id## 一个用于跟踪调查的ID 最好同group.id相同client.id group id value## 对于zookeeper集群的指定可以是多个 hostname1:port1,hostname2:port2,hostname3:port3 必须和broker使用同样的zk配置zookeeper.connectlocalhost:2182## zookeeper的心跳超时时间超过这个时间就认为是dead消费者zookeeper.session.timeout.ms 6000## zookeeper的等待连接时间zookeeper.connection.timeout.ms 6000## zookeeper的follower同leader的同步时间zookeeper.sync.time.ms 2000## 当zookeeper中没有初始的offset时候的处理方式 。smallest 重置为最小值 largest:重置为最大值 anythingelse抛出异常auto.offset.reset largest## socket的超时时间实际的超时时间是max.fetch.wait socket.timeout.ms.socket.timeout.ms30*1000## socket的接受缓存空间大小socket.receive.buffer.bytes64*1024##从每个分区获取的消息大小限制fetch.message.max.bytes 1024*1024## 是否在消费消息后将offset同步到zookeeper当Consumer失败后就能从zookeeper获取最新的offsetauto.commit.enable true## 自动提交的时间间隔auto.commit.interval.ms 60*1000## 用来处理消费消息的块每个块可以等同于fetch.message.max.bytes中数值queued.max.message.chunks 10## 当有新的consumer加入到group时,将会reblance,此后将会有partitions的消费端迁移到新 ## 的consumer上,如果一个consumer获得了某个partition的消费权限,那么它将会向zk注册 ##Partition Owner registry节点信息,但是有可能此时旧的consumer尚没有释放此节点, ## 此值用于控制,注册节点的重试次数.rebalance.max.retries 4## 每次再平衡的时间间隔rebalance.backoff.ms 2000## 每次重新选举leader的时间refresh.leader.backoff.ms## server发送到消费端的最小数据若是不满足这个数值则会等待知道满足数值要求fetch.min.bytes 1## 若是不满足最小大小(fetch.min.bytes)的话等待消费端请求的最长等待时间fetch.wait.max.ms 100## 指定时间内没有消息到达就抛出异常一般不需要改consumer.timeout.ms -1 3.3 PRODUCER的配置 比较核心的配置metadata.broker.list、request.required.acks、producer.type、serializer.class。 ## 消费者获取消息元信息(topics, partitions and replicas)的地址,配置格式是host1:port1,host2:port2也可以在外面设置一个vipmetadata.broker.list##消息的确认模式##0不保证消息的到达确认只管发送低延迟但是会出现消息的丢失在某个server失败的情况下有点像TCP##1发送消息并会等待leader 收到确认后一定的可靠性## -1发送消息等待leader收到确认并进行复制操作后才返回最高的可靠性request.required.acks 0## 消息发送的最长等待时间request.timeout.ms 10000## socket的缓存大小send.buffer.bytes100*1024## key的序列化方式若是没有设置同serializer.classkey.serializer.class## 分区的策略默认是取模partitioner.classkafka.producer.DefaultPartitioner## 消息的压缩模式默认是none可以有gzip和snappycompression.codec none## 可以针对默写特定的topic进行压缩compressed.topicsnull## 消息发送失败后的重试次数message.send.max.retries 3## 每次失败后的间隔时间retry.backoff.ms 100## 生产者定时更新topic元信息的时间间隔 若是设置为0那么会在每个消息发送后都去更新数据topic.metadata.refresh.interval.ms 600*1000## 用户随意指定但是不能重复主要用于跟踪记录消息client.id------------------------------------------- 消息模式 相关 -------------------------------------------## 生产者的类型 async:异步执行消息的发送 sync同步执行消息的发送producer.typesync## 异步模式下那么就会在设置的时间缓存消息并一次性发送queue.buffering.max.ms 5000## 异步的模式下 最长等待的消息数queue.buffering.max.messages 10000## 异步模式下进入队列的等待时间 若是设置为0那么要么进入队列要么直接抛弃queue.enqueue.timeout.ms -1## 异步模式下每次发送的最大消息数前提是触发了queue.buffering.max.messages或是queue.buffering.max.ms的限制batch.num.messages200## 消息体的系列化处理类 转化为字节流进行传输serializer.class kafka.serializer.DefaultEncoder4 Kafka内外网访问的设置 4.1 listeners和advertised.listeners kafka的两个配置listeners和advertised.listeners。 一、listeners kafka监听的网卡的ip假设你机器上有两张网卡内网192.168.0.213和外网101.89.163.1 如下配置 listenersPLAINTEXT://192.168.0.213:9092那么kafka只监听内网网卡即只接收内网网卡的数据如果你不能把外网网卡流量转发到内网网卡那么kafka就接收不到外网网卡数据。 如果配置成外网ip同理。 当然你可以配置成0.0.0.0监听所有网卡。 二、advertised.listeners 我们观察kafka的配置文件server.properties会发现里面记录了zookeeper集群的各个节点的访问地址但是并没有记录kafka兄弟节点的地址。 kafka节点启动后会向zookeeper注册自己同时从zookeeper中获取兄弟节点的地址以便与兄弟节点通信。 同样我们使用客户端连接kafka后kafka返回给客户端的是集群各节点的访问地址这个地址也是上面说的从zookeeper中获得的地址。 这个地址哪里来就是kafka节点向zookeeper注册时提供的advertised.listeners。如果没有就会使用listeners。 4.2 只需要内网访问kafka kafka只监听内网网卡即只接收内网网卡的数据。 listenersPLAINTEXT://192.168.0.213:90924.3 只需要外网访问kafka kafka只监听外网网卡即只接收外网网卡的数据。 listenersPLAINTEXT://101.89.163.1:90924.4 需要内外网访问 使用宿主机通过NAT映射搞出来的外网ip此时kafka无法监听这个外网ip(因为不存在启动就会报错)。这时候就是advertised.listeners真正发挥作用的时候了。 使用如下配置 listenersPLAINTEXT://192.168.0.213:9092 advertised.listenersPLAINTEXT://101.89.163.1:9092此时一个完整的kafka客户端访问服务端的流程 (1)客户端访问101.89.163.1:9092被kafka宿主机所在环境映射到内网192.168.0.213:9092访问到了kafka节点请求获得kafka服务端的访问地址。 (2)kafka从zookeeper拿到自己和其他兄弟节点通过advertised.listeners注册到zookeeper的101.89.163.1:9092等外网地址作为kafka的服务端访问地址返回给客户端。 (3)客户端拿这些地址访问kafka集群被kafka宿主机所在环境映射到各kafka节点的内网ip访问到了kafka服务端…完美循环。 你可能会问已经配置了访问地址为什么还要在第一次访问的时候请求获得kafka的访问地址。因为如果是kafka集群你可以选择只给客户端配置一个kafka节点的地址这样是不推荐的但是客户端必须要访问集群中的每一个节点所以必须通过这个节点获得集群中每一个节点的访问地址。 如果不配置advertised.listenersPLAINTEXT://101.89.163.1:9092你会发现虽然你给kafka客户端配置的访问地址是101.89.163.1:9092但是kafka客户端访问时报错报错原因是Connection to node -1[192.168.0.213:9092] could not be established. Broker may not be available.。这就是因为不配置advertised.listeners则advertised.listeners默认使用listeners配置的地址客户端拿到的就是listeners配置的内网地址。
http://www.w-s-a.com/news/987639/

相关文章:

  • 网站多久才会被收录服务器租用泰海
  • 电商网站建设合同模板临汾推广型网站建设
  • 天猫商务网站建设目的长春网站设计
  • 公司网站建设会议纪要昆山高端网站建设机构
  • 做消费网站流程深圳网站设计价格
  • 做电影网站怎么接广告中国最新军事新闻视频
  • 网站推广设计做哪些设置自动删除的wordpress
  • 东莞东坑网站设计专业网站制作设
  • 网站怎么做现场直播视频成都科技网站建设找
  • 个人网页设计步骤网站没有内容 能做优化吗
  • 专业网站建设公司招聘网站排行榜
  • 网站建设规范方法企业解决方案架构
  • ae做网站导航wordpress门户
  • 重庆市网站备案材料云南做网站
  • 网页设计模板网站免费珠海视窗网
  • 茂名模板建站定制WordPress注册不提示
  • 陕西营销型手机网站建设深圳制作网站服务
  • 受欢迎的锦州网站建设Wordpress 图片左右滑动
  • 湖南优化网站建设线上网站建设需求
  • 建什么类型的网站访问量比较大哪些外包公司比较好
  • php网站地图外贸建站哪家强外贸网站怎么做
  • 宁波五金网站建设中国建筑网官网投诉查询
  • 哪个网站注册域名便宜免费流程图制作网站
  • 潍坊做网站南宁网站seo优化公司
  • 网站建设的基本技术步骤无网站营销
  • 我国旅游网站的建设网站开发 混合式 数据库
  • 淘宝客网站域名家居网站开发项目计划书
  • 网站打不开显示asp苏州注册公司需要多少钱
  • 凡科建站登录官网wordpress主题有什么用
  • 西安双语网站建设怎么做网页动图