当前位置: 首页 > news >正文

织梦网站后台如何做百度优化网站建设的公司哪家是上市公司

织梦网站后台如何做百度优化,网站建设的公司哪家是上市公司,高端个性化网站建设,无锡市政务服务网站建设项目从本篇开始 打算用三篇文章 分别介绍下Producer生产消费#xff0c;Consumer消费消息 以及Spring是如何集成Kafka 三部分#xff0c;致于对于Broker的源码解析#xff0c;因为是scala语言写的#xff0c;暂时不打算进行学习分享。 总体介绍 clients : 保存的是Kafka客户端…从本篇开始 打算用三篇文章 分别介绍下Producer生产消费Consumer消费消息 以及Spring是如何集成Kafka 三部分致于对于Broker的源码解析因为是scala语言写的暂时不打算进行学习分享。 总体介绍 clients : 保存的是Kafka客户端代码主要就是生产者和消费者代码config保存Kafka的配置文件其中比较重要的配置文件是server.properties。connect目录保存Connect组件的源代码。我在开篇词里提到过Kafka Connect组件是用来实现Kafka与外部系统之间的实时数据传输的。core目录保存Broker端代码。Kafka服务器端代码全部保存在该目录下。 而一条消息的整体流转过程其实就是经过三部分也就是Producer\Broker\Consumer。 因为是对主要核心流程的分析所以只会截核心代码。具体后面细节在说。 producer整体流程 对于Producer来说其实就是几部分。 初始化、发送流程、缓冲区 初始化流程 设置分区器 // 设置分区器this.partitioner config.getConfiguredInstance(ProducerConfig.PARTITIONER_CLASS_CONFIG,Partitioner.class,Collections.singletonMap(ProducerConfig.CLIENT_ID_CONFIG, clientId));设置重试时间默认100ms如果配置Kafka可以重试retries制定重试次数retryBackoffMs指定重试的间隔 long retryBackoffMs config.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG);获取Key和Value的序列化器 // 序列化器if (keySerializer null) {this.keySerializer config.getConfiguredInstance(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,Serializer.class);this.keySerializer.configure(config.originals(Collections.singletonMap(ProducerConfig.CLIENT_ID_CONFIG, clientId)), true);} else {config.ignore(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG);this.keySerializer keySerializer;}if (valueSerializer null) {this.valueSerializer config.getConfiguredInstance(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,Serializer.class);this.valueSerializer.configure(config.originals(Collections.singletonMap(ProducerConfig.CLIENT_ID_CONFIG, clientId)), false);} else {config.ignore(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG);this.valueSerializer valueSerializer;}拦截器 // 设置拦截器ListProducerInterceptorK, V interceptorList (List) config.getConfiguredInstances(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,ProducerInterceptor.class,Collections.singletonMap(ProducerConfig.CLIENT_ID_CONFIG, clientId));if (interceptors ! null)this.interceptors interceptors;elsethis.interceptors new ProducerInterceptors(interceptorList);其他参数 // 设置最大消息为多大,默认是1M 默认是16384this.maxRequestSize config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG);// 设置缓存大小 默认是32M 默认是33554432 RecordAccumulator32MBthis.totalMemorySize config.getLong(ProducerConfig.BUFFER_MEMORY_CONFIG);// 设置压缩类型 可以提升性能this.compressionType CompressionType.forName(config.getString(ProducerConfig.COMPRESSION_TYPE_CONFIG));this.accumulator new RecordAccumulator(logContext, // 因为是通过缓冲区发送消息的,所以需要消息累计器RecordAccumulator.PartitionerConfig partitionerConfig new RecordAccumulator.PartitionerConfig(enableAdaptivePartitioning,config.getLong(ProducerConfig.PARTITIONER_AVAILABILITY_TIMEOUT_MS_CONFIG));初始化元数据 // 初始化集群元数据if (metadata ! null) {this.metadata metadata;} else {this.metadata new ProducerMetadata(retryBackoffMs,config.getLong(ProducerConfig.METADATA_MAX_AGE_CONFIG),config.getLong(ProducerConfig.METADATA_MAX_IDLE_CONFIG),logContext,clusterResourceListeners,Time.SYSTEM);this.metadata.bootstrap(addresses);}创建Sender线程其中包含一个重要的网络组件NetWorkClient // 创建sender线程this.sender newSender(logContext, kafkaClient, this.metadata);// 线程nameString ioThreadName NETWORK_THREAD_PREFIX | clientId;// 封装起来 设置为守护线程 并启动this.ioThread new KafkaThread(ioThreadName, this.sender, true);// 线程启动this.ioThread.start();发送消息流程 发送消息的过程 public FutureRecordMetadata send(ProducerRecordK, V record, Callback callback) {// 执行拦截器逻辑ProducerRecordK, V interceptedRecord this.interceptors.onSend(record);return doSend(interceptedRecord, callback);}先执行拦截器可以发现就是遍历拦截器然后执行对应的onSend()方法。当我们想增加一个拦截器直接实现对应的接口重写onSend()方法然后Kafka就会调用我们的onSend方法。通过提供一个拓展点进行使用。 public ProducerRecordK, V onSend(ProducerRecordK, V record) {ProducerRecordK, V interceptRecord record;for (ProducerInterceptorK, V interceptor : this.interceptors) {try {interceptRecord interceptor.onSend(interceptRecord);} catch (Exception e) {}}return interceptRecord;}从Kafka Broker集群获取元数据metadata // 从broker获取元数据clusterAndWaitTime waitOnMetadata(record.topic(), record.partition(), nowMs, maxBlockTimeMs);对key和value进行序列化调用对应的serialize的方法。 byte[] serializedKey;try {// 选择对应的序列化进行操作serializedKey keySerializer.serialize(record.topic(), record.headers(), record.key());} catch (ClassCastException cce) {}byte[] serializedValue;try {serializedValue valueSerializer.serialize(record.topic(), record.headers(), record.value());} catch (ClassCastException cce) {}// 选择具体的分区int partition partition(record, serializedKey, serializedValue, cluster);// 消息缓存到RecoredAccumulatorresult accumulator.append(record.topic(), partition, timestamp, serializedKey,serializedValue, headers, appendCallbacks, remainingWaitMs, false, nowMs, cluster);// 消息发送的条件// 缓冲区数据大小达到batch.size 或者linnger.ms达到上限后 唤醒sneder线程。if (result.batchIsFull || result.newBatchCreated) {log.trace(Waking up the sender since topic {} partition {} is either full or getting a new batch, record.topic(), appendCallbacks.getPartition());this.sender.wakeup();}Sender线程 runOnce();long pollTimeout sendProducerData(currentTimeMs);缓冲区、 这篇讲解很详细 https://www.cnblogs.com/rwxwsblog/p/14754810.html 生产者核心参数配置 bootstrap.servers连接Broker配置一般就是xxxx:9092 key.serializer 和 value.serializer对key和value进行序列化器可以自定义一般就是String方式 buffer.memoryRecordAccumulator 缓冲区总大小默认32m。 batch.size 消息会以batch的方式进行发送这是一批数据的大小 默认是16K linger.ms发送消息的时机如果没有达到batch.size or linger.ms的时间就会发送 默认是0ms 立即发送 acks 0: 不落盘 1:只有leader落盘 -1(all) : leader和所有从节点持久化成功 默认是-1 max.in.flight.requests.per.connection允许最多没有返回 ack 的次数默认为 5 retries: 消息发送失败时系统重发消息 默认值 2147483647 retry.backoff.ms两次重试间隔 默认是100ms enable.idempotence: 开启幂等性 默认true compression.type: 压缩格式 默认是none
http://www.w-s-a.com/news/459/

相关文章:

  • 商品网站怎么做wordpress 表情拉长
  • 商城网站设计费用网络公司怎样推广网站
  • 视频公司的网站设计工图网
  • 免费快速网站十八个免费的舆情网站