广东购物网站建设,中国前500强企业名单,网站建设个人主要事迹,网站js代码检测消息中间件对比#xff1a; 1、吞吐、可靠性、性能
Kafka安装
Kafka对于zookeeper是强依赖#xff0c;保存kafka相关的节点数据#xff0c;所以安装Kafka之前必须先安装zookeeper
Docker安装zookeeper
下载镜像#xff1a;
docker pull zookeeper:3.4.14创建容器
do…消息中间件对比 1、吞吐、可靠性、性能
Kafka安装
Kafka对于zookeeper是强依赖保存kafka相关的节点数据所以安装Kafka之前必须先安装zookeeper
Docker安装zookeeper
下载镜像
docker pull zookeeper:3.4.14创建容器
docker run -d --name zookeeper -p 2181:2181 zookeeper:3.4.14Docker安装kafka
下载镜像
docker pull wurstmeister/kafka:2.12-2.3.1创建容器
docker run -d --name kafka \
--env KAFKA_ADVERTISED_HOST_NAME192.168.200.130 \
--env KAFKA_ZOOKEEPER_CONNECT192.168.200.130:2181 \
--env KAFKA_ADVERTISED_LISTENERSPLAINTEXT://192.168.200.130:9092 \
--env KAFKA_LISTENERSPLAINTEXT://0.0.0.0:9092 \
--env KAFKA_HEAP_OPTS-Xmx256M -Xms256M \
--nethost wurstmeister/kafka:2.12-2.3.1kafka入门
生产者发送消息多个消费者只能有一个消费者接收到消息生产者发送消息多个消费者都可以接收到消息
1创建kafka-demo项目导入依赖
dependencygroupIdorg.apache.kafka/groupIdartifactIdkafka-clients/artifactId
/dependency2生产者发送消息
package com.heima.kafka.sample;import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;import java.util.Properties;/*** 生产者*/
public class ProducerQuickStart {public static void main(String[] args) {//1.kafka的配置信息Properties properties new Properties();//kafka的连接地址properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,192.168.200.130:9092);//发送失败失败的重试次数properties.put(ProducerConfig.RETRIES_CONFIG,5);//消息key的序列化器properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,org.apache.kafka.common.serialization.StringSerializer);//消息value的序列化器properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,org.apache.kafka.common.serialization.StringSerializer);//2.生产者对象KafkaProducerString,String producer new KafkaProducerString, String(properties);//封装发送的消息ProducerRecordString,String record new ProducerRecordString, String(itheima-topic,100001,hello kafka);//3.发送消息producer.send(record);//4.关闭消息通道必须关闭否则消息发送不成功producer.close();}}3消费者接收消息
package com.heima.kafka.sample;import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;import java.time.Duration;
import java.util.Collections;
import java.util.Properties;/*** 消费者*/
public class ConsumerQuickStart {public static void main(String[] args) {//1.添加kafka的配置信息Properties properties new Properties();//kafka的连接地址properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 192.168.200.130:9092);//消费者组properties.put(ConsumerConfig.GROUP_ID_CONFIG, group2);//消息的反序列化器properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringDeserializer);properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringDeserializer);//2.消费者对象KafkaConsumerString, String consumer new KafkaConsumerString, String(properties);//3.订阅主题consumer.subscribe(Collections.singletonList(itheima-topic));//当前线程一直处于监听状态while (true) {//4.获取消息ConsumerRecordsString, String consumerRecords consumer.poll(Duration.ofMillis(1000));for (ConsumerRecordString, String consumerRecord : consumerRecords) {System.out.println(consumerRecord.key());System.out.println(consumerRecord.value());}}}}kafka高可用设计
1、设计集群模式
Kafka的服务器端由被称为 Broker 的服务进程构成即一个 Kafka 集群由多个Broker 组成。当一个机器宕机了另外一个机器就会替补山
2、备份机制
Kafka定义了两类副本
领导者副本(Leader Replica)追随者副本 (Follower Replica) 追随者副本分为两类 1、一种是ISR副本同步保存 2、普通的副本异步保存 出现主节点宕机会先选ISR副本中的一个成为新的主节点保证数据一致性没有ISR节点再从普通节点中挑选 针对全部节点宕机的情况有两种策略 1、等待第一个ISR副本保证了数据的尽可能一致 2、等待一个复活的追随者无论是ISR还是普通提高系统的高可用性。
kafka生产者详解
1发送类型 同步发送 使用send()方法发送它会返回一个Future对象调用get()方法进行等待就可以知道消息是否发送成功
RecordMetadata recordMetadata producer.send(kvProducerRecord).get();
System.out.println(recordMetadata.offset());异步发送 调用send()方法并指定一个回调函数服务器在返回响应时调用函数
//异步消息发送
producer.send(kvProducerRecord, new Callback() {Overridepublic void onCompletion(RecordMetadata recordMetadata, Exception e) {if(e ! null){System.out.println(记录异常信息到日志表中);}System.out.println(recordMetadata.offset());}
});2参数详解
ack
代码的配置方式
//ack配置 消息确认机制
prop.put(ProducerConfig.ACKS_CONFIG,all);参数的选择说明
确认机制说明acks0生产者在成功写入消息之前不会等待任何来自服务器的响应,消息有丢失的风险但是速度最快acks1默认值只要集群首领节点收到消息生产者就会收到一个来自服务器的成功响应acksall只有当所有参与赋值的节点全部收到消息时生产者才会收到一个来自服务器的成功响应
retries
生产者从服务器收到的错误有可能是临时性错误在这种情况下retries参数的值决定了生产者可以重发消息的次数如果达到这个次数生产者会放弃重试返回错误默认情况下生产者会在每次重试之间等待100ms
代码中配置方式
//重试次数
prop.put(ProducerConfig.RETRIES_CONFIG,10);消息压缩
默认情况下 消息发送时不会被压缩。
代码中配置方式
//数据压缩
prop.put(ProducerConfig.COMPRESSION_TYPE_CONFIG,lz4);压缩算法说明snappy占用较少的 CPU 却能提供较好的性能和相当可观的压缩比 如果看重性能和网络带宽建议采用lz4占用较少的 CPU 压缩和解压缩速度较快压缩比也很客观gzip占用较多的 CPU但会提供更高的压缩比网络带宽有限可以使用这种算法
使用压缩可以降低网络传输开销和存储开销而这往往是向 Kafka 发送消息的瓶颈所在。
kafka消费者
消息的有序性
方法一个topic分区能保证自己的数据是按照先后消费的但是不能保证跨分区消息处理的先后顺序。我么只能使用一个分区在单分区种消息可以保证严格顺序消费
提交和偏移量 自动提交 当enable.auto.commit被设置为true提交方式就是让消费者自动提交偏移量每隔5秒消费者会自动把从poll0方法接收的最大偏移量提交上去这样只是记录了规定时间内的最大偏移量其实与数据提交的偏移量存在偏差因此可能会出现数据的重复提交或者丢失 手动提交 当enableauto.commit被设置为false可以有以下三种提交方式
提交当前偏移量(同步提交)异步提交同步和异步组合提交
同步提交commitSync()方法会一直尝试直至提交成功如果提交失败也可以记录到错误日志里。
while (true){ConsumerRecordsString, String records consumer.poll(Duration.ofMillis(1000));for (ConsumerRecordString, String record : records) {System.out.println(record.value());System.out.println(record.key());try {consumer.commitSync();//同步提交当前最新的偏移量}catch (CommitFailedException e){System.out.println(记录提交失败的异常e);}}
}异步提交手动提交有一个缺点那就是当发起提交调用时应用会阻塞。消息没有重试机制
while (true){ConsumerRecordsString, String records consumer.poll(Duration.ofMillis(1000));for (ConsumerRecordString, String record : records) {System.out.println(record.value());System.out.println(record.key());}consumer.commitAsync(new OffsetCommitCallback() {Overridepublic void onComplete(MapTopicPartition, OffsetAndMetadata map, Exception e) {if(e!null){System.out.println(记录错误的提交偏移量 map,异常信息e);}}});
}同步和异步组合提交
异步提交也有个缺点那就是如果服务器返回提交失败异步提交不会进行重试。相比较起来同步提交会进行重试直到成功或者最后抛出异常给应用。异步提交没有实现重试是因为如果同时存在多个异步提交进行重试可能会导致位移覆盖。
举个例子假如我们发起了一个异步提交commitA此时的提交位移为2000随后又发起了一个异步提交commitB且位移为3000commitA提交失败但commitB提交成功此时commitA进行重试并成功的话会将实际上将已经提交的位移从3000回滚到2000导致消息重复消费。
try {while (true){ConsumerRecordsString, String records consumer.poll(Duration.ofMillis(1000));for (ConsumerRecordString, String record : records) {System.out.println(record.value());System.out.println(record.key());}consumer.commitAsync();}
}catch (Exception e){e.printStackTrace();System.out.println(记录错误信息e);
}finally {try {consumer.commitSync();}finally {consumer.close();}
}springboot整合kafka
1、在父类中的pop文件中导入依赖包
xml
!-- kafkfa --
dependencygroupIdorg.springframework.kafka/groupIdartifactIdspring-kafka/artifactId
/dependency
dependencygroupIdorg.apache.kafka/groupIdartifactIdkafka-clients/artifactId
/dependency2、在需要用到kafka的微服务的naco中分别配置生产者和消费者配置
spring:kafka:bootstrap-servers: 192.168.200.130:9092producer:retries: 10key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializerspring:kafka:bootstrap-servers: 192.168.200.130:9092consumer:group-id: ${spring.application.name}key-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializer传递消息为对象
目前springboot整合后的kafka因为序列化器是StringSerializer这个时候如果需要传递对象可以有两种方式
方式一可以自定义序列化器对象类型众多这种方式通用性不强本章节不介绍
方式二可以把要传递的对象进行转json字符串接收消息后再转为对象即可本项目采用这种方式
发送消息
GetMapping(/hello)
public String hello(){User user new User();user.setUsername(xiaowang);user.setAge(18);kafkaTemplate.send(user-topic, JSON.toJSONString(user));return ok;
}接收消息
package com.heima.kafka.listener;import com.alibaba.fastjson.JSON;
import com.heima.kafka.pojo.User;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;Component
public class HelloListener {KafkaListener(topics user-topic)public void onMessage(String message){if(!StringUtils.isEmpty(message)){User user JSON.parseObject(message, User.class);System.out.println(user);}}
}