潍坊高端网站建设价格,2017优秀网站设计,建筑网官网查询,领导交给你一个网站你该怎么做文章收录在网站#xff1a;http://hardyfish.top/
文章收录在网站#xff1a;http://hardyfish.top/
文章收录在网站#xff1a;http://hardyfish.top/
文章收录在网站#xff1a;http://hardyfish.top/ 事务
事务Producer保证消息写入分区的原子性#xff0c;即这批消…文章收录在网站http://hardyfish.top/
文章收录在网站http://hardyfish.top/
文章收录在网站http://hardyfish.top/
文章收录在网站http://hardyfish.top/ 事务
事务Producer保证消息写入分区的原子性即这批消息要么全部写入成功要么全失败。此外Producer重启回来后kafka依然保证它们发送消息的精确一次处理。
开启enable.idempotence true
设置Producer端参数transctional.id
数据的发送需要放在beginTransaction和commitTransaction之间。
Consumer端的代码也需要加上isolation.level参数用以处理事务提交的数据。
producer.initTransactions();
try {producer.beginTransaction();producer.send(record1);producer.send(record2);producer.commitTransaction();
} catch (KafkaException e) {producer.abortTransaction();
}事务Producer虽然在多分区的数据处理上保证了幂等但是处理性能上相应的是会有一些下降的。
数据存储 Kafka 消息以 Partition 作为存储单元每个 Topic 的消息被一个或者多个 Partition 进行管理。 Partition 是一个有序的不变的消息队列消息总是被追加到尾部。一个 Partition 不能被切分成多个散落在多个 Broker 上或者多个磁盘上。 Partition 又划分成多个 Segment 来组织数据。 Segment 在它的下面还有两个组成部分 索引文件以 .index 后缀结尾存储当前数据文件的索引。数据文件以 .log 后缀结尾存储当前索引文件名对应的数据文件。 请求模型 请求到Broker后也会通过类似于请求转发的组件Acceptor转发到对应的工作线程上Kafka中被称为网络线程池一般默认每个Broker上为3个工作线程可以通过参数 num.network.threads 进行配置。
并且采用轮询的策略可以很均匀的将请求分发到不同的网络线程中进行处理。 但是实际的处理请求并不是由网络线程池进行处理的而是会交给后续的IO线程池当网络线程接受到请求的时候会将请求写入到共享的请求队列中而IO线程池会进行异步的处理默认情况下是8个可以通过 num.io.threads 进行配置。 常见场景
重复消费
consumer 在消费过程中应用进程被强制kill掉或发生异常退出。 例如在一次poll500条消息后消费到200条时进程被强制kill消费到offset未提交或出现异常退出导致消费到offset未提交。 下次重启时依然会重新拉取500消息造成之前消费到200条消息重复消费了两次。 消费者消费时间过长。 max.poll.interval.ms参数定义了两次poll的最大间隔它的默认值是 5 分钟表示你的 Consumer 程序如果在 5 分钟之内无法消费完 poll 方法返回的消息那么 Consumer 会主动发起 离开组 的请求Coordinator 也会开启新一轮 Rebalance。 因为上次消费的offset未提交再次拉取的消息是之前消费过的消息造成重复消费。 提高消费能力提高单条消息的处理速度根据实际场景max.poll.interval.ms值设置大一点避免不必要的rebalance 可适当减小max.poll.records的值默认值是500可根据实际消息速率适当调小。 消息丢失
消费者程序丢失数据 Consumer 程序从 Kafka 获取到消息后开启了多个线程异步处理消息而 Consumer 程序自动地向前更新位移。 假如某个线程运行失败了它负责的消息没有被成功处理但位移已经被更新了因此这条消息对于 Consumer 而言实际上是丢失了。 最佳配置: 不要使用 producer.send(msg)而要使用 producer.send(msg, callback)。 设置 acks all 设置成 all则表明所有副本 Broker 都要接收到消息该消息才算是 已提交。 设置 retries 为一个较大的值。 当出现网络的瞬时抖动时消息发送可能会失败此时配置了retries 0 的 Producer 能够自动重试消息发送避免消息丢失。 设置 unclean.leader.election.enable false。 设置 replication.factor 3。 防止消息丢失的主要机制就是冗余。 设置 min.insync.replicas 1。 控制的是消息至少要被写入到多少个副本才算是 已提交 。设置成大于 1 可以提升消息持久性。在实际环境中千万不要使用默认值 1。 确保 replication.factor min.insync.replicas。 如果两者相等那么只要有一个副本挂机整个分区就无法正常工作了。 确保消息消费完成再提交。 Consumer 端有个参数 enable.auto.commit最好把它设置成 false并采用手动提交位移的方式。 消息顺序
乱序场景一
因为一个topic可以有多个partitionkafka只能保证partition内部有序。 1、可以设置topic 有且只有一个partition。 2、根据业务需要需要顺序的指定为同一个partition。 乱序场景二
对于同一业务进入了同一个消费者组之后用了多线程来处理消息会导致消息的乱序。 消费者内部根据线程数量创建等量的内存队列对于需要顺序的一系列业务数据根据key或者业务数据放到同一个内存队列中然后线程从对应的内存队列中取出并操作。