wordpress下载网站模板怎么用,商标购买,宁波人流医院,太原做网站要多少钱呢一、上下文
从《Kafka-初识》中可以看到运行kafka-console-producer和 kafka-console-consumer来生产和消费数据时会打印很多参数#xff0c;这些参数给我们应对多种场景提供了遍历#xff0c;除了producer和consumer的提供了参数外#xff0c;Kafka服务器集群中的broker也…一、上下文
从《Kafka-初识》中可以看到运行kafka-console-producer和 kafka-console-consumer来生产和消费数据时会打印很多参数这些参数给我们应对多种场景提供了遍历除了producer和consumer的提供了参数外Kafka服务器集群中的broker也提供了相应的参数我们可以在server.properties中找到对应的参数下面我们就从producer、consumer、broker三方面对Kafka的参数分别做下整理。
二、producer
acks 默认值cdh中为1 官网为all
producer要求分区所属leader在考虑请求完成之前收到的确认数量。这控制了发送的记录的持久性。允许以下设置
acks0如果设置为零则生产者根本不会等待服务器的任何确认。记录将立即添加到套接字缓冲区并被视为已发送。在这种情况下无法保证服务器已收到记录重试配置将不会生效因为客户端通常不会知道任何故障。每条记录的偏移量将始终设置为-1。
acks1这意味着leader将把记录写入其本地日志但不会等待所有followers的完全确认。在这种情况下如果leader在确认记录后立即失败但在followers复制记录之前失败那么记录就会丢失。
acksall这意味着leader将等待整套同步副本确认记录。这保证了只要至少有一个同步副本仍然有效记录就不会丢失。这是最有力的保证。这相当于acks-1设置。
请注意启用幂等性要求此配置值为“all”。如果设置了冲突的配置并且未显式启用幂等性则禁用幂等性。batch.size 默认值16384 16k
每当多条记录被发送到同一分区时生产者将尝试将记录一起批处理成更少的请求。这有助于提高客户端和服务器的性能。此配置控制默认批大小以字节为单位。
将不会尝试批量处理大于此大小的记录。
发送给broker的请求将包含多个批次每个分区一个批次其中包含可发送的数据。
小批量将使批处理不那么常见并可能降低吞吐量批量大小为零将完全禁用批处理。非常大的批处理大小可能会更浪费内存因为我们总是会分配指定批处理大小的缓冲区以应对额外的记录。
注意此设置给出了要发送的批大小的上限。如果此分区的累积字节数少于此数我们将“徘徊”linger.ms时间等待更多记录出现。此linger.ms设置默认为0这意味着即使累积的批大小在此batch.size设置下我们也会立即发送一条记录。bootstrap.servers 默认值cdh中为[cdh1:9092, cdh2:9092, cdh3:9092] 因为cdh整体为你提供的服务只能你选了哪些节点安装kafka。 官网为
用于建立与Kafka集群的初始连接的主机/端口对列表。客户端将使用所有服务器而不管这里指定了哪些服务器进行引导——此列表仅影响用于发现整套服务器的初始主机。此列表的格式应为host1:port1,host2:port2,...由于这些服务器仅用于初始连接以发现完整的集群成员资格这可能会动态变化因此此列表不需要包含完整的服务器集不过如果服务器停机您可能需要多个服务器。buffer.memory 默认值33554432
生产者可以用来缓冲等待发送到服务器的记录的内存总字节数。如果记录的发送速度超过了它们可以传递到服务器的速度生产者将阻止max.block.ms之后将抛出异常。
此设置应大致对应于生产者将使用的总内存但不是硬限制因为生产者使用的并非所有内存都用于缓冲。一些额外的内存将用于压缩如果启用了压缩以及维护正在进行的请求。client.dns.lookup 默认值use_all_dns_ips
控制客户端如何使用DNS查找。如果设置为use_all_dns_ips则按顺序连接到每个返回的IP地址直到建立成功的连接。断开连接后将使用下一个IP。一旦所有IP都被使用一次客户端就会再次从主机名解析IP但是JVM和操作系统缓存DNS名称查找都是如此。如果设置为resolve_canonical_boottrap_servers_only则将每个引导地址解析为规范名称列表。在引导阶段之后其行为与use_all_dns_ips相同。client.id 默认值cdh为console-producer 官网为
发出请求时传递给服务器的id字符串。这样做的目的是通过允许在服务器端请求日志中包含逻辑应用程序名称能够跟踪除ip/port之外的请求来源。compression.type 默认值cdh为none 官网为producer
指定给定topic的最终压缩类型。此配置接受标准压缩编解码器gzip、snappy、lz4、zstd。它还接受“不压缩”这相当于没有压缩以及“producer这意味着保留生产者设置的原始压缩编解码器。connections.max.idle.ms 默认值cdh为540000 官网为600000 10min
空闲连接超时服务器Socket处理器线程关闭空闲时间超过此时间的连接delivery.timeout.ms 默认值120000 2min
send() 调用返回后报告成功或失败的时间上限。这限制了记录在发送之前延迟的总时间、等broker确认的时间如果预期的话以及允许重试发送失败的时间。如果遇到不可恢复的错误、重试次数已用尽或记录被添加到已达到较早交付截止日期的批中则生产者可能会报告未能在此配置之前发送记录。此配置的值应大于或等于request.timeout.ms和linger.ms之和enable.idempotence 默认值cdh为false 官网为true
当设置为“true”时生产者将确保流中只写入每条消息的一个副本。如果为“false”由于代理失败等原因生产者重试可能会在流中写入重试消息的副本。请注意启用幂等性要求max.in.flight.requests.per.connection小于或等于5保留任何允许值的消息顺序重试次数大于0并且acks必须为“all”。
如果没有设置冲突的配置默认情况下会启用Idempotence。如果设置了冲突的配置并且未显式启用幂等性则禁用幂等性。如果显式启用了幂等性并且设置了冲突的配置则会抛出ConfigException。interceptor.classes 默认值[]
用作拦截器的类列表。实现org.apache.kafka.clients.producer。ProducerInterceptor接口允许我们在生产者接收到的记录发布到Kafka集群之前对其进行拦截并可能进行变异。默认情况下没有拦截器。key.serializer 默认值class org.apache.kafka.common.serialization.ByteArraySerializer
实现org.apache.kafka.common.serialization的键的序列化器类。序列化程序接口。linger.ms 默认值cdh为1000 官网为0
生产者将请求传输之间到达的任何记录组合成一个批处理请求。通常只有在记录到达速度快于发送速度时才会在负载下发生这种情况。然而在某些情况下即使在中等负载下客户端也可能希望减少请求数量。此设置通过添加少量的人为延迟来实现这一点也就是说生产者不会立即发送记录而是等待给定的延迟来允许发送其他记录以便可以将发送一起批处理。这可以被认为类似于TCP中的Nagle算法。此设置给出了批处理延迟的上限一旦我们为一个分区获得了batch.size值的记录无论此设置如何它都会立即发送但是如果我们为这个分区累积的字节数少于这个数量我们将在指定的时间内“徘徊”等待更多的记录出现。此设置默认为0即无延迟。例如设置linger.ms5可以减少发送的请求数量但在没有负载的情况下会给发送的记录增加5毫秒的延迟。max.block.ms 默认值60000 1min
该配置控制KafkaProducer的send、partitionsFor、initTransactions、sendOffsetsToTransaction、commitTransaction和abortTransaction方法将阻塞多长时间。对于send此超时限制了等待元数据获取和缓冲区分配的总时间用户提供的序列化器或分区器中的阻塞不计入此超时。对于partitionsFor如果元数据不可用则此超时限制了等待元数据的时间。与事务相关的方法总是阻塞但如果无法发现事务协调器或在超时内没有响应则可能会超时。max.in.flight.requests.per.connection 默认值5
在阻塞之前客户端将在单个连接上发送的未确认请求的最大数量。请注意如果此配置设置为大于1并且enable.idempotence设置为false则由于重试即如果启用了重试导致发送失败后存在消息重新排序的风险如果禁用重试或enable.idempotence设置为true则将保留顺序。此外启用幂等性要求此配置的值小于或等于5。如果设置了冲突的配置并且未显式启用幂等性则禁用幂等性。max.request.size 默认值1048576
请求的最大大小字节。此设置将限制生产者在单个请求中发送的记录批数以避免发送大量请求。这也有效地限制了最大未压缩记录批大小。请注意服务器对记录批大小有自己的上限如果启用了压缩则在压缩后这可能与此不同。metadata.max.age.ms 默认值300000 5min
一段时间毫秒在此之后即使我们没有看到任何分区领导层发生变化我们也会强制刷新元数据以主动发现任何新的broker或分区。metric.reporters 默认值[]
用作指标报告器的类列表。实现org.apache.kafka.common.metrics.MetricsReporter接口允许插入将收到新度量创建通知的类。始终包含JmxReporter来注册JMX统计信息metrics.num.samples 默认值2
为计算度量而维护的样本数量metrics.recording.level 默认值INFO
指标的最高记录级别metrics.sample.window.ms 默认值30000 30s
计算度量样本的时间窗口。partitioner.class 默认值class cdh为 org.apache.kafka.clients.producer.internals.DefaultPartitioner 官网为null
确定生成记录时将记录发送到哪个分区。可用选项包括
1、如果未设置则使用默认分区逻辑。此策略将记录发送到分区直到该分区至少产生batch.size字节。它与以下策略协同工作 a、如果没有指定分区但存在key请根据key的哈希值选择分区。 b、如果没有分区或key请选择在分区产生至少batch.size字节时发生变化的粘性分区。
2、org.apache.kafka.clients.producer.RoundRobinPartitioner一种分区策略其中一系列连续记录中的每条记录都被发送到不同的分区无论是否提供了“key”直到分区用完过程重新开始。注意在创建新批次时有一个已知的问题会导致分布不均。更多详细信息请参见KAFKA-9965。
实现org.apache.kafka.clients.producer.Partitioner接口允许您插入自定义分区器。receive.buffer.bytes 默认值32768 32k
读取数据时使用的TCP接收缓冲区SO_RCVBUF的大小。如果该值为-1则将使用操作系统默认值。reconnect.backoff.max.ms 默认值1000 1s
重新连接到反复连接失败的代理时等待的最长时间毫秒。如果提供每台主机的回退将随着每次连续连接失败呈指数级增长直至达到此最大值。在计算退避增加后添加20%的随机抖动以避免连接风暴。reconnect.backoff.ms 默认值50
尝试重新连接到给定主机之前等待的基本时间量。这避免了在紧环中重复连接到主机。此回退适用于客户端向代理的所有连接尝试。此值是初始回退值每次连续连接失败时都会呈指数级增长最高可达reconnect.backoff.max.ms值。request.timeout.ms 默认值cdh为1500 官网为40000 40s
配置控制客户端等待请求响应的最长时间。如果在超时时间过去之前未收到响应则客户端将在必要时重新发送请求或者在重试次数用尽时使请求失败。retries 默认值cdh为3 官网为0
设置大于零的值将导致客户端重新发送任何失败的请求并可能出现暂时性错误。建议将该值设置为零或“MAX_VALUE”并使用相应的超时参数来控制客户端应重试请求的时间。retry.backoff.ms 默认值100
在尝试重试对给定topic分区的失败请求之前等待的时间量。这避免了在某些故障情况下在紧循环中重复发送请求。此值是初始退避值对于每个失败的请求它将呈指数级增加直到retry.backff.max.ms值。transaction.timeout.ms 默认值60000 1min
事务在协调器主动中止之前保持打开状态的最长时间毫秒。事务的开始时间是在向其添加第一个分区时设置的。如果此值大于代理中的transaction.max.timeout.ms设置则请求将失败并出现InvalidTxnTimeoutException错误。transactional.id 默认值null
用于事务传递的TransactionalId。这实现了跨越多个生产者会话的可靠性语义因为它允许客户端保证在开始任何新事务之前使用相同TransactionalId的事务已经完成。如果没有提供TransactionalId则生产者仅限于幂等传递。如果配置了TransactionalId则暗示了enable.idempotence。默认情况下未配置TransactionId这意味着无法使用事务。请注意默认情况下交易需要至少三个broker的集群这是生产的推荐设置对于开发您可以通过调整代理设置transaction.state.log.replication.factor来更改此设置。value.serializer 默认值class cdh为 org.apache.kafka.common.serialization.ByteArraySerializer 官网为null
实现org.apache.kafka.common.serialization的值的序列化器类
三、consumer
auto.commit.interval.ms 默认值5000 5s
如果enable.auto.commit设置为true则消费者偏移量自动提交到Kafka的频率毫秒auto.offset.reset 默认值cdh为earliest 官网为latest
当Kafka中没有初始偏移量时或者如果服务器上不再存在当前偏移量例如因为该数据已被删除该怎么办
earliest自动将偏移重置为最早的偏移
latest自动将偏移量重置为最新偏移量
none如果没有找到消费者组的先前偏移量则向消费者抛出异常
anything else向消费者抛出例外。
请注意在将此配置设置为最新时更改分区号可能会导致消息传递丢失因为生产者可能会在消费者重置其偏移量之前开始向新添加的分区发送消息即还不存在初始偏移量。bootstrap.servers 默认值因为通过cm界面安装kafka时已经指定了机器因此可以给出具体的列表[cdh1:9092, cdh2:9092, cdh3:9092] 官网localhost:9092
用于建立与Kafka集群的初始连接的主机/端口对列表。客户端将使用所有服务器而不管这里指定了哪些服务器进行引导——此列表仅影响用于发现整套服务器的初始主机。此列表的格式应为host1:port1,host2:port2,...由于这些服务器仅用于初始连接以发现完整的集群成员资格这可能会动态变化因此此列表不需要包含完整的服务器集不过如果服务器停机您可能需要多个服务器。check.crcs 默认值true
自动检查所消耗记录的CRC32。这确保了消息不会在线路或磁盘上发生损坏。此检查会增加一些开销因此在寻求极致性能的情况下可能会将其禁用。client.dns.lookup 默认值cdh为default 官网为use_all_dns_ips
控制客户端如何使用DNS查找。如果设置为use_all_dns_ips则按顺序连接到每个返回的IP地址直到建立成功的连接。断开连接后将使用下一个IP。一旦所有IP都被使用一次客户端就会再次从主机名解析IP但是JVM和操作系统缓存DNS名称查找都是如此。如果设置为resolve_canonical_bootstrap_servers_only则将每个引导地址解析为规范名称列表。在引导阶段之后其行为与use_all_dns_ips相同。client.id
发出请求时传递给服务器的id字符串。这样做的目的是通过允许在服务器端请求日志中包含逻辑应用程序名称能够跟踪除ip/port之外的请求来源。connections.max.idle.ms 默认值cdh为540000 9min 官网为300000 5min
在此配置指定的毫秒数后关闭空闲连接default.api.timeout.ms 默认值60000 1min
指定客户端API的超时时间以毫秒为单位。此配置用作所有未指定超时参数的客户端操作的默认超时。enable.auto.commit 默认值cdh中为false 官网为true
如果为真则消费者的偏移将定期在后台提交。exclude.internal.topics 默认值true
是否应从订阅中排除与订阅模式匹配的内部topic。始终可以明确地订阅内部topic。fetch.max.bytes 默认值52428800 50m
服务器应为获取请求返回的最大数据量。记录由消费者分批提取如果提取的第一个非空分区中的第一个记录批次大于此值则仍将返回记录批次以确保消费者能够取得进展。因此这不是一个绝对的最大值。broker接受的最大记录批大小是通过message.max.bytes代理配置或max.message.bytes主题配置定义的。请注意消费者并行执行多个提取。fetch.max.wait.ms 默认值500
服务器在响应获取请求之前将阻止的最长时间——没有足够的数据来立即满足fetch.min.bytes给出的要求。此配置仅用于本地日志获取。要调整远程获取的最大等待时间请参阅“remote.reache.max.wait.ms”broker 配置fetch.min.bytes 默认值1
服务器应为获取请求返回的最小数据量。如果可用数据不足请求将等待积累那么多数据后再回复请求。1字节的默认设置意味着一旦有那么多字节的数据可用或者获取请求在等待数据到达时超时就会得到响应。将其设置为更大的值将导致服务器等待大量数据累积这可以以一些额外的延迟为代价提高服务器吞吐量。group.id 默认值console-consumer-26289 默认时没有值的需要用户指定如果没有指定kafka给你默认分配了一个
标识此工作程序所属的Connect群集组的唯一字符串heartbeat.interval.ms 默认值3000 3s
使用Kafka的组管理工具时组协调员心跳之间的预期时间。心跳用于确保工人的会话保持活跃并在新成员加入或离开组时促进重新平衡。该值必须设置为低于session.timeout.ms但通常不应设置为高于该值的1/3。它可以调整得更低以控制正常再平衡的预期时间。interceptor.classes 默认值[]
用作拦截器的类列表。实现org.apache.kafka.clients.consumer.ConsumerInterceptor接口允许您拦截并可能修改消费者收到的记录。默认情况下没有拦截器。isolation.level 默认值read_uncommitted
控制如何读取以事务方式编写的消息。如果设置为read_committedconsumer.poll() 将只返回已提交的事务消息。如果设置为read_uncommitted默认值consumer.poll()将返回所有消息甚至是已中止的事务消息。在任何一种模式下非事务性消息都将无条件返回。
消息将始终按偏移顺序返回。因此在read_committed模式下consumer.poll()将只返回直到最后一个稳定偏移量LSO的消息该偏移量小于第一个打开事务的偏移量。特别是在属于正在进行的交易的消息之后出现的任何消息都将被扣留直到相关交易完成。因此当有正在进行的交易时read_committed消费者将无法读取高水位。
此外当处于read_committed状态时seekToEnd方法将返回LSOkey.deserializer 默认值class cdh中为 org.apache.kafka.common.serialization.ByteArrayDeserializer 官方中没有默认值
实现org.apache.kafka.common.serialization的key的反序列化器类max.partition.fetch.bytes 默认值1048576
服务器将返回的每个分区的最大数据量。记录由消费者分批提取。如果获取的第一个非空分区中的第一个记录批大于此限制则仍将返回该批以确保消费者可以取得进展。代理接受的最大记录批大小是通过message.max.bytes代理配置或max.message.bytes主题配置定义的。有关限制消费者请求大小的信息请参阅fetch.max.bytes。max.poll.interval.ms 默认值300000 5min
使用消费者组管理时调用poll()之间的最大延迟。这为消费者在获取更多记录之前可以空闲的时间设置了上限。如果在此超时到期之前未调用poll()则认为消费者失败组将重新平衡以便将分区重新分配给另一个成员。对于使用非空group.instance.id达到此超时的消费者分区不会立即重新分配。相反消费者将停止发送心跳分区将在session.timeout.ms过期后重新分配。这反映了已关闭的静态消费者的行为。max.poll.records 默认值500
单次调用poll()时返回的最大记录数。请注意max.poll.records不会影响底层的获取行为。消费者将缓存每个获取请求的记录并从每次轮询中递增地返回这些记录。metadata.max.age.ms 默认值300000 5min
一段时间毫秒在此之后即使我们没有看到任何分区leader发生变化我们也会强制刷新元数据以主动发现任何新的代理或分区。partition.assignment.strategy 默认值[class cdh为 org.apache.kafka.clients.consumer.RangeAssignor] 官网为
class org.apache.kafka.clients.consumer.RangeAssignor,
class org.apache.kafka.clients.consumer.CooperativeStickyAssignor
按优先级排序的类名或类类型列表其中包含客户端在使用组管理时将用于在消费者实例之间分配分区所有权的支持分区分配策略。可用选项包括
org.apache.kafka.clients.consumer.RangeAssignor按主题分配分区。
org.apache.kafka.clients.consumer.RoundRobinAssignor以循环方式为消费者分配分区。
org.apache.kafka.clients.consumer.StickyAssignor保证分配达到最大平衡同时保留尽可能多的现有分区分配。
org.apache.kafka.clients.consumer.CooperativeStickyAssignor遵循相同的StickyAassignor逻辑但允许合作再平衡。
默认的分配器是[RangeAssignorCooperative StickyAssignor]默认情况下将使用RangeAssignor但允许升级到Cooperative StickeyAssignor只需一次滚动反弹即可从列表中删除RangeAssignor。
实现org.apache.kafka.clients.consumer.ConsumerPartitionAssignor接口允许您插入自定义分配策略。receive.buffer.bytes 默认值65536 64k
读取数据时使用的TCP接收缓冲区SO_RCVBUF的大小。如果该值为-1则将使用操作系统默认值。reconnect.backoff.max.ms 默认值1000 1s
重新连接到反复连接失败的代理时等待的最长时间毫秒。如果提供每台主机的回退将随着每次连续连接失败呈指数级增长直至达到此最大值。在计算退避增加后添加20%的随机抖动以避免连接风暴。reconnect.backoff.ms 默认值50
尝试重新连接到给定主机之前等待的基本时间量。这避免了在紧环中重复连接到主机。此回退适用于客户端向代理的所有连接尝试。此值是初始回退值每次连续连接失败时都会呈指数级增长最高可达reconnect.backoff.max.ms值。request.timeout.ms 默认值cdh为30000 30s 官网为40000 40s
配置控制客户端等待请求响应的最长时间。如果在超时时间过去之前未收到响应则客户端将在必要时重新发送请求或者在重试次数用尽时使请求失败。retry.backoff.ms 默认值100
在尝试重试对给定主题分区的失败请求之前等待的时间量。这避免了在某些故障情况下在紧循环中重复发送请求。此值是初始退避值对于每个失败的请求它将呈指数级增加直到retry.backff.max.ms值。security.protocol 默认值PLAINTEXT
用于与broker通信的协议。有效值为PLAINTEXT、SSL、SASL_PLAINTEXT和SASL_SSL。send.buffer.bytes 默认值131072
发送数据时使用的TCP发送缓冲区SO_SNDBUF的大小。如果该值为-1则将使用操作系统默认值。session.timeout.ms 默认值10000 10s
用于检测工作程序故障的超时时间。worker定期向代理发送心跳以指示其活动状态。如果代理在此会话超时到期之前没有收到心跳则代理将从组中删除该worker并启动重新平衡。请注意该值必须在group.min.session.timeout.ms和group.max.session.ttimeout.ms在代理配置中配置的允许范围内。value.deserializer 默认值class cdh中为
org.apache.kafka.common.serialization.ByteArrayDeserializer 官网为空
实现org.apache.kafka.common.serialization的value的反序列化器类
四、broker
服务器基础配置
broker.id 默认值0
必须为每一个broker设置一个唯一的整数
此服务器的唯一id。如果未设置将生成一个唯一的整数作为id。为了避免ZooKeeper生成的id和用户配置的id之间的冲突生成的id从reserved.broker.max.id1开始。
Socket配置
listeners 默认值PLAINTEXT://:9092
Socket服务器监听的地址,r如果没有配置将从java.net.InetAddress.getCanonicalHostName()获取 advertised.listeners 默认值PLAINTEXT://your.host.name:9092
listener.security.protocol.map 默认值PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
broker 向生产者和消费者展示的主机名和端口如果未设置则使用listeners的值如果已配置。否则它将使用java.net.InetAddress.getCanonicalHostName()。
num.network.threads 默认值3
服务器用于从网络接收请求并向网络发送响应的线程数
num.io.threads 默认值8
服务器用于处理请求的线程数其中可能包括磁盘I/O
socket.send.buffer.bytes 默认值102400 100K
Socket的发送缓冲区(SO_SNDBUF)
socket.receive.buffer.bytes 默认值102400 100K
Socket的接收缓冲区(SO_RCVBUF)
socket.request.max.bytes 默认值104857600 100M
Socket接受请求的最大大小防止OOM
日志设置
log.dirs 默认值/tmp/kafka-logs
以逗号分隔的存储日志文件的目录列表
num.partitions 默认值1
每个topic的默认日志分区数。更多的分区允许更大的并行性用于消费但这也会导致跨broker的文件更多。
num.recovery.threads.per.data.dir 默认值1
启动时用于日志恢复和关闭时用于刷新的每个数据目录的线程数。对于RAID阵列中有数据驱动器的安装建议增加此值。
内部Topic设置
offsets.topic.replication.factor 默认值cdh中是1官网是3 transaction.state.log.replication.factor 默认值cdh中是1官网是3
对于内部的topic例如“__consumer_offsets”和“__transaction_state”的副本数建议使用大于1的值以确保可用性例如3transaction.state.log.min.isr 默认值cdh中是1官网是2
生产者在向topci打数据时需要确认写入的最小副本数量。
日志写入策略
消息会立即写入文件系统但默认情况下我们只使用fsync()来延迟同步操作系统缓存。以下配置控制数据到磁盘的刷新。这里有一些重要的权衡trade-offs 1、持久性如果不使用副本未持久化的数据可能会丢失。 2、延迟当确实发生写入时非常大的写入间隔可能会导致延迟峰值因为将有大量数据需要写入磁盘。 3、吞吐量持久化磁盘通常是最昂贵的操作较小的写入磁盘间隔可能会导致过度磁盘寻道。
以下设置允许用户配置写入磁盘策略以便在一段时间后或每N条消息或两者兼而有之刷新数据。这可以在全局范围内完成并在每个topic的基础上覆盖。
log.flush.interval.messages 默认值cdh中是10000 源码中是9223372036854775807
当接受的消息数达到该值后强制将数据刷新到磁盘
log.flush.interval.ms 默认值cdh中是1000官网是null
消息可以在日志中停留的最长时间当达到该阈值强制将数据刷新到磁盘
日志保留策略
以下配置控制日志段的处置。该策略可以设置为在一段时间后或累积给定大小后进行删除。只要满足这些条件中的任何一个就会删除一个片段。删除总是从日志末尾开始。
log.retention.hours 默认值168
日志保留的小时数当达到这个阈值时就要对数据进行删除
log.retention.bytes 默认值cdh中为1073741824 1G 官网为-1
该删除策略独立于log.retenation.hours
日志保留的最大大小当达到这个阈值时就要对数据进行删除
log.segment.bytes 默认值1073741824 1G
日志段文件的最大大小。当达到此大小时将创建一个新的日志段。
log.retention.check.interval.ms 默认值300000 5min
根据保留策略检查日志段是否可以删除的间隔
Zookeeper配置
zookeeper.connect 默认值localhost:2181
Zookeeper连接字符串。这是一个逗号分隔的主机端口对每个对应一个zk服务器。例如“127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002”。我们还可以在url后附加一个可选的chroot字符串以指定所有kafka-znode的根目录。例如127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002/kafka
zookeeper.connection.timeout.ms 默认值6000
连接到Zookeeper的超时时间毫秒
组协调设置
以下配置指定了GroupCoordinator将延迟初始消费者重新平衡的时间以毫秒为单位。
随着新成员加入组group.initial.rebalance.delay.ms的值将进一步延迟重新平衡最大值为max.poll.interval.ms。
默认值为3秒。
我们在这里将其重写为0因为它为开发和测试提供了更好的开箱即用体验。
然而在生产环境中默认值3秒更合适因为这将有助于避免在应用程序启动期间进行不必要且可能昂贵的重新平衡
group.initial.rebalance.delay.ms 默认值chd中是0 官网是3