当前位置: 首页 > news >正文

网站底部怎么做盐城网站优化

网站底部怎么做,盐城网站优化,做细分领域的同城网站,建一个wordpress网站成本目录 1 可靠的数据传递1.1 Kafka的可靠性保证1.2 复制1.3 Broker配置1.3.1 复制系数1.3.2 broker的位置分布1.3.3 不彻底的首领选举1.3.4 最少同步副本1.3.5 保持副本同步1.3.6 持久化到磁盘flush.messages9223372036854775807flush.ms9223372036854775807 1.2 在可靠的系统中使… 目录 1 可靠的数据传递1.1 Kafka的可靠性保证1.2 复制1.3 Broker配置1.3.1 复制系数1.3.2 broker的位置分布1.3.3 不彻底的首领选举1.3.4 最少同步副本1.3.5 保持副本同步1.3.6 持久化到磁盘flush.messages9223372036854775807flush.ms9223372036854775807 1.2 在可靠的系统中使用生产者1.2.1 根据需求配置恰当的acks1.2.2 配置重试参数1.2.3 处理不可重试错误 1.3 在可靠的系统中使用消费者1.3.1 消费者的可靠性配置1.3.2 自动提交偏移量1.3.3 手动提交偏移量1 总是在处理完消息后提交偏移量2 提交频率时性能和重复消息数量之间的权衡3 在正确的时间点提交正确的偏移量4 消费者再均衡5 消费者重试6 消费者可能需要维护状态 1.4 验证系统可靠性1.4.1 验证配置1.4.2 验证应用程序 1.5 监控系统可靠性 2 精确一次性语义2.1 幂等生产者2.1.1 启用幂等生产者2.1.2 工作原理2.1.3 幂等生产者的局限性 2.2 事务2.2.1 事务的应用场景2.2.2 事务可以解决哪些问题2.2.3 事务是如何保证精确一次性的生产者端transactional.idnull消费者端isolation.levelread_uncommitted 2.2.4 事务不能解决哪些问题1 在流式处理中执行外部操作2 从Kafka中读取数据并写入数据库3 从一个数据库读取数据写入Kafka再从Kafka将数据写入另一个数据库4 将数据从一个集群复制到另一个集群5 发布订阅模式 2.2.5 如何使用事务 1 可靠的数据传递 1.1 Kafka的可靠性保证 分区中的消息时有序的一条消息只有被写入分区所有的同步副本时才被认为是“已提交”只要还有一个副本是活动的已提交的消息就不会丢失消费者只能读取已提交的消息 1.2 复制 同步副本需满足的条件 与ZooKeeper之间有一个活跃的会话在过去的6秒内向ZooKeeper发送过消息在过去的10秒内从首领那里复制过消息在过去的10秒内从首领那里复制过最新消息 1.3 Broker配置 1.3.1 复制系数 broker级别配置default.replication.factor1 topic级别配置replication.factor1 建议非关键数据小于3 1.3.2 broker的位置分布 建议把broker分布在多个不同的机器上 1.3.3 不彻底的首领选举 unclean.leader.election.enablefalse 指示是否启用非同步副本可以被选为首领作为首领选举的最后手段即使这样做可能会导致数据丢失 1.3.4 最少同步副本 min.insync.replicas1 最小同步副本数。min.insync.replicas(默认值为1)代表了正常写入生产者数据所需要的最少ISR个数, 当ISR中的副本数量小于min.insync.replicas时Leader停止写入生产者生产的消息并向生产者抛出NotEnoughReplicas异常阻塞等待更多的 Follower 赶上并重新进入ISR, 因此能够容忍min.insync.replicas-1个副本同时宕机。当与min.insync.replicas和acks一起使用时可以实现更大的耐用性保证。一个典型的场景是创建一个复制因子为3的主题将min.insync.replicas设置为2并使用acks “all”进行生产。 1.3.5 保持副本同步 replica.lag.time.max.ms30000 (30 seconds) 如果一个follower这段时间内没有发送任何fetch请求或者没有消费leader最新偏移量的消息那么leader将从isr中删除该follower。 zookeeper.session.timeout.ms18000 (18 seconds) 允许broker不向ZooKeeper发送心跳的时间间隔。如果超过这个时间不向ZK发送心跳ZK会认为broker已经死亡会将其移除出集群。 1.3.6 持久化到磁盘 Kafka会在重启之前和关闭日志片段的时候将消息冲刷到磁盘上或者等Linux系统页面缓存被填满时冲刷。拥有不同机架上的副本的多个磁盘比只写入首领磁盘更加安全。不过也可以让broker更频繁的写入磁盘。 flush.messages9223372036854775807 此设置允许指定一个间隔在该间隔我们将强制对写入日志的数据进行fsync。例如如果将其设置为1我们将在每条消息之后进行fsync如果是5我们将在每5条消息之后进行fsync。通常我们建议您不要设置此项并使用复制以提高耐用性并允许操作系统的后台刷新功能因为它更高效。此设置可以在每个主题的基础上覆盖请参阅每个主题配置部分。 flush.ms9223372036854775807 此设置允许指定一个时间间隔在该时间间隔内我们将强制对写入日志的数据进行fsync。例如如果将其设置为1000我们将在1000毫秒后进行fsync。通常我们建议您不要设置此项并使用复制以提高耐用性并允许操作系统的后台刷新功能因为它更高效。 1.2 在可靠的系统中使用生产者 1.2.1 根据需求配置恰当的acks acks参数指定了生产者在多少个分区副本收到消息的情况下才会认为消息写入成功。允许以下设置 acks0。如果设置为零则生产者根本不会等待来自服务器的任何确认。该记录将立即添加到套接字缓冲区并被视为已发送。在这种情况下无法保证服务器已收到记录重试配置也不会生效因为客户端通常不会知道任何故障。为每条记录返回的偏移量将始终设置为-1。 acks1。表示只要首领收到消息并将记录成功写入其本地日志就返回成功响应不等待所有追随者的确认。在这种情况下如果首领在确认成功后追随者复制之前崩溃则记录将会丢失。 acksall。表示首领将等待同步复制集合中所有的副本都确认收到了记录。这保证了只要至少有一个同步复制副本保持活动状态记录就不会丢失。这是最有力的保证。这相当于acks-1的设置。 请注意启用幂等性要求此配置值为“all”。如果设置了冲突的配置并且没有显式启用幂等性则会禁用幂等性。 1.2.2 配置重试参数 设置自动重试并使用默认重试次数。 将delivery.timeout.ms设置成愿意等待的时长生产者会在这段时间内重试。 1.2.3 处理不可重试错误 例如 不可重试的broker错误消息大小身份验证在将消息发送给broker之前发生的错误比如序列化错误在生产者道道重试次数上限或重试消息占用的内存达到上限时发生的错误超时 1.3 在可靠的系统中使用消费者 1.3.1 消费者的可靠性配置 group.id auto.offset.reset 1.3.2 自动提交偏移量 如果所有的处理逻辑都是在轮询里进行的并且不需要维护轮询之间的状态比如为了聚合数据那么就很简单可以使用自动提交在轮询结束时提交偏移量。 enable.auto.commit 无法控制应用应用程序可能重复处理的消息的数量 如果应用程序把消息交给另一个后台线程处理那么只能使用手动提交了 auto.commit.interval.ms 自动提交的频率过大会增加重复的概率过小会增加额外开销 1.3.3 手动提交偏移量 手动提交偏移量增加了灵活性但也增加了复杂度并且有可能对性能产生影响所以可能需要考虑如下事项 1 总是在处理完消息后提交偏移量 如果所有的处理逻辑都是在轮询里进行的就很简单选择一个合适的提交频率 如果涉及额外线程该如何呢 2 提交频率时性能和重复消息数量之间的权衡 3 在正确的时间点提交正确的偏移量 一定要在处理完后在提交偏移量 4 消费者再均衡 如何在分区被撤销之前提交偏移量 如何在应用程序被分配到新分区并清理状态时提交偏移量 5 消费者重试 如果遇到批次中的部分消息需要稍后处理。因为消费者不能针对每一条消息提交偏移量而是提交最后一条成功的偏移量所以需要借助额外的工具来处理。 有两种模式来解决这个问题 在遇到可重试错误时提交最后一条处理成功的消息的偏移量然后把还未处理好的消息保存到缓冲区这样下一个轮询就不会把他们覆盖掉并调用消费者的pause()方法确保其他轮询不会返回数据之后继续处理缓冲区里的消息。在遇到可重试错误时把消息写到另一个重试主题并继续处理其他消息。另一个消费者群组负责处理重试主题中的消息或者让一个消费者同时订阅主主题和重试主题。这种模式有点像其他消息系统中的死信队列。 6 消费者可能需要维护状态 1.在提交偏移量的同时状态存入另一个主题中可以开启事务来保证一致性。当一个线程重新启动时就可以读取状态和从偏移量处读取消息。 2. 使用流式处理框架 1.4 验证系统可靠性 1.4.1 验证配置 测试场景 首领选举控制器选举滚动重启不彻底的首领选举 1.4.2 验证应用程序 测试场景 客户端与服务器断开连接客户端与服务器之间存在高延迟磁盘被填满磁盘被挂起首领选举滚动重启broker滚动启动消费者滚动重启生产者 1.5 监控系统可靠性 生产者 错误率重试率 消费者 消费者滞后 broker kafka.server:typeBrokerTopicMetrics,nameFailedProduce-RequestsPerSeckafka.server:typeBrokerTopicMetrics,nameFailedFetch-RequestsPerSec 2 精确一次性语义 上一章主要讨论如何保证不丢失消息但不能保证出现重复消息。 在一些简单的应用程序中处理重复消息比较简单消息都包含有唯一标识。 但在处理聚合事件的流式处理应用程序中因为很难取判断结果的正确性因为结果中可能包含了重复消息。在这种情况下需要提供更强的保证这种保证就是精确一次性处理语义。 2.1 幂等生产者 如果一个操作被执行多次的结果与被执行一次相同那么这个操作就是幂等的。 2.1.1 启用幂等生产者 如果需要启用幂等生产者需要在生产者端做如下配置 enable.idempotencetruemax.in.flight.requests.per.connection5acksall 其实这三个参数都已经是默认配置无需显式添加。 2.1.2 工作原理 如果启用了幂等生产者那么每条消息都将包含生产者IDPID和序列号。所以一条消息的唯一标识由4个字段组成 主题分区生产者ID序列号 broke会用组合唯一标识来跟踪保存在内存和硬盘中写入每个分区的最后5条消息重复的消息会被拒绝记录错误指标kafka.server:typeRequestMetrics,nameErrorsPerSec并返回错误。生产者会记录这个错误record-error-rate并反映在指标当中但不抛出异常也不触发告警。 乱序错误broke收到不连续的序列号将会返回乱序错误。 2.1.3 幂等生产者的局限性 幂等生产者只能防止由生产者内部重试逻辑引起额消息重复。 如果同一个生产者调用多次producer.send()方法发送同一条消息或多个生产者都发送同一条消息Kafka中将产生多条消息。 2.2 事务 Kafka的事务机制为了让流式处理应用程序生成正确的结果要保证每个输入的消息都被精确处理一次即使是在发生故障的情况下。 事务适用于流式处理应用程序的基础模式即“消费-处理-生产”。 2.2.1 事务的应用场景 流式处理应用程序处理过程包包含了聚合或连接操作需要更新内部状态事务对他们来说就会非常有用。 2.2.2 事务可以解决哪些问题 应用程序崩溃导致的重复处理 假设应用程序处理完一批消息并产生结果之后在偏移量提交之前崩溃了。后来的其它实例就会重复处理这些数据并生成结果那么结果数据就会包含重复处理的数据。“僵尸”应用程序导致的重复处理 假设一个应用程序正在处理消息的时候与Kafka暂时断开了连接那么Kafka就会安排其它的实例继续处这批消息。这时候应用程序恢复运行了它继续处理之前的消息批次并写入结果。这样结果就包含了重复处理的数据。 2.2.3 事务是如何保证精确一次性的 Kafka事务引入了原子多分区写入的概念用来保证写入多个分区和提交偏移量是一个原子操作。 生产者端 上图中Producer A是事务性生产者用来启动事务和执行原子多分区写入。需要给事务配置属性transactional.id并用initTransactions()方法初始化。 broker维护了producer.id和transactional.id的映射如果使用同一个transactional.id再次调用initTransactions()方法那么生产者分配到的producer.id和之前是一样的。 在调用initTransactions()方法初始化时broker会为producer分配epoc用来防止隔离僵尸应用。 transactional.idnull 用于事务传递的TransactionalId。这实现了跨越多个生产者会话的可靠性语义因为它允许客户端保证使用相同TransactionalId的事务在开始任何新事务之前已经完成。如果没有提供TransactionalId则生产者仅限于幂等传递。 如果配置了TransactionalId则隐含enable.idempotence。默认情况下未配置TransactionId这意味着无法使用事务。请注意默认情况下交易需要至少三个broker的集群这是建议的生产设置为了进行开发您可以通过调整broker设置transaction.state.log.replication.factor来更改这一点。 消费者端 对于消费者我们通过参数isolation.level来消费者的事务隔离级别用啦控制消费者如何读取以事务方式写入的消息。 isolation.levelread_uncommitted 控制如何读取以事务方式写入的消息。如果设置为read_committedconsumer.poll()将只返回已提交的事务消息。如果设置为read_uncommitted默认值consumer.poll()将返回所有消息甚至是已中止的事务消息。非事务性消息将在任一模式下无条件返回。 消息将始终以偏移顺序返回。因此在read_committed模式下consumer.poll()将只返回最后一个稳定偏移量LSO, last stable offset之前的消息该偏移量小于第一个打开事务的偏移量。特别是在相关交易完成之前属于正在进行的交易的消息之后出现的任何消息都将被扣留。因此当存在进行中事务时read_committed消费者将无法读取高水位线。 此外在read_committed中seekToEnd方法将返回LSO。 在上图中消费者B在事务提交之前只能读取到message 2但是消费者C则可以读取到所有消息。 2.2.4 事务不能解决哪些问题 Kafka事务无法实现精确一次性保证的几种场景 1 在流式处理中执行外部操作 比如处理数据时发送电子邮件并不能保证只发送一次而且无法撤销。 2 从Kafka中读取数据并写入数据库 这种场景的流程是“消费-处理-数据库”状态数据写入数据库。我们可以考虑在数据库中维护状态数据和偏移量借助数据的事务来保证数据一致性。 还有一种场景是“消费-处理-主题-数据库”状态数据既要写入主题又要写入数据库。这个问题的一种常见的解决方案是“发件箱模式”。 应用将消息发送到一个Kafka主题也就是发件箱然后另外一个独立中继服务将会从Kafka读取消息并更新数据库需要确保数据库更新是幂等的。 这个模式可以保证消息最终到达Kafka主题消费者和数据库要么都不到达。 这个模式的反模式是用数据库作为发件箱然后另外一个独立中继服务将确保数据库更新也作为消息发送给Kafka。这种模式可以借助数据的内置约束唯一索引外键保证精确一次性。 3 从一个数据库读取数据写入Kafka再从Kafka将数据写入另一个数据库 4 将数据从一个集群复制到另一个集群 有一个为MM2增加精确一次性语义的改进提议KIP665 5 发布订阅模式 前面讨论的精确一次性保证是针对“消费-处理-生产”的模式而发布订阅在Kafka 中是“生产-消费-处理”的模式。针对这种模式生产者可以开启事务但是消费者只能通过设置隔离级别保证看不到已终止事务的消息无法开启事务保证只消费一次消息。 2.2.5 如何使用事务 package com.qupeng.demo.kafka.kafkaapache.transaction;import org.apache.kafka.clients.consumer.*; import org.apache.kafka.clients.producer.*; import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory;import java.time.Duration; import java.util.*;public class KafkaTransactionK, V {private final static Logger logger LoggerFactory.getLogger(KafkaTransaction.class);public K, V KafkaProducerK, V createKafkaProducer() {Properties producerProps new Properties();producerProps.put(bootstrap.servers, your_broker_list);producerProps.put(key.serializer, org.apache.kafka.common.serialization.StringSerializer);producerProps.put(value.serializer, org.apache.kafka.common.serialization.StringSerializer);producerProps.put(transactional.id, your_transactional_id);ProducerString, String producer new KafkaProducer(producerProps);Properties consumerProps new Properties();consumerProps.put(bootstrap.servers, your_broker_list);consumerProps.put(group.id, your_group_id);consumerProps.put(enable.auto.commit, false);consumerProps.put(key.deserializer, org.apache.kafka.common.serialization.StringDeserializer);consumerProps.put(value.deserializer, org.apache.kafka.common.serialization.StringDeserializer);consumerProps.put(isolation.level, read_committed);ConsumerString, String consumer new KafkaConsumer(consumerProps);producer.initTransactions();consumer.subscribe(Arrays.asList(input_topic));while (true) {ConsumerRecordsString, String records null;try {records consumer.poll(Duration.ofMillis(100));if (records.count() 0) {producer.beginTransaction();for (ConsumerRecordString, String record : records) {// 处理消息String processedValue processMessage(record.value());// 发送处理后的消息到另一个主题producer.send(new ProducerRecord(output_topic, record.key(), processedValue));}MapTopicPartition, OffsetAndMetadata offsetAndMetadataMap consumerOffsets(records);producer.sendOffsetsToTransaction(offsetAndMetadataMap, consumer.groupMetadata());producer.commitTransaction();}} catch (WakeupException e) {// 关闭消费者consumer.close();throw new KafkaException();} catch (ProducerFencedException | InvalidProducerEpochException e) {// 程序已变为僵尸只能退出throw new KafkaException(String.format(The transactional.id %s is used by another process., your_transactional_id));} catch (KafkaException e) {// 其它异常中止事务重置偏移量并进行重试producer.abortTransaction();resetToLatestCommittedPositions(consumer, records);} finally {producer.close();consumer.close();}}}private void resetToLatestCommittedPositions(ConsumerString, String consumer, ConsumerRecordsString, String records) {for (TopicPartition partition : records.partitions()) {ListConsumerRecordString, String partitionRecords records.records(partition);consumer.seek(partition, partitionRecords.get(0).offset());}}private MapTopicPartition, OffsetAndMetadata consumerOffsets(ConsumerRecordsString, String records) {MapTopicPartition, OffsetAndMetadata offsetAndMetadataMap new HashMap();for (TopicPartition partition : records.partitions()) {ListConsumerRecordString, String partitionRecords records.records(partition);offsetAndMetadataMap.put(partition, new OffsetAndMetadata(partitionRecords.get(partitionRecords.size() - 1).offset()));}return offsetAndMetadataMap;}private String processMessage(String value) {return Message has been handled.;} }
http://www.w-s-a.com/news/979777/

相关文章:

  • 郑州做网站九零后网络沧州做网站的专业公司
  • 小游戏网站建设可以自己做图片的软件
  • 湖南地税局官网站水利建设基金app仿制
  • 苏州网站设计kgwl建设网站需要用到哪些技术人员
  • 万户网络做网站如何亚马逊网站建设
  • 门户网站制作费用暴雪公司最新消息
  • 深圳专业建网站公司济南公司做网站的价格
  • 怎么运行自己做的网站网上申请平台怎么申请
  • 旅游公司网站 优帮云新闻近期大事件
  • 电商网站后台报价营销软文小短文
  • 网站建设项目售后服务承诺公司名称邮箱大全
  • 湖南网站建设哪里好做ppt的网站叫什么名字
  • 容城县建设银行网站电子商务网站建设子项目
  • 网站管理助手3.0做淘宝网站用什么软件做
  • 贵阳做网站的公司wordpress趣味插件
  • 自己设置免费网站设计平台南京哪里有做公司网站的
  • 建设公司内网网站的意义自助建站网站的宣传手册
  • 手机建设中网站建立个人网站服务器
  • 网站开发工程师岗位概要网站怎么制作教程
  • 城乡建设主管部门官方网站公司简介模板ppt范文
  • 网站认证必须做么cc0图片素材网站
  • net域名 著名网站国外设计案例网站
  • 淘宝客网站哪里可以做app地推网
  • 宜昌建设厅网站中国最新时事新闻
  • 微网站怎么开发wordpress 发表评论
  • 山东网站建设是什么一页网站首页图如何做
  • 游戏开发与网站开发哪个难万网影
  • 做网站编程语言建筑施工特种证书查询
  • 找人做网站内容自己编辑吗修改wordpress登陆界面
  • 登陆建设银行wap网站湖南网站建设磐石网络答疑