网站开发需要用例图吗,深圳pc端网站开发,网络舆情案例,常见的网络推广方式有哪些目录
一、引入Kafka的依赖
二、配置Kafka
三、创建主题
1、自动创建(不推荐)
2、手动动创建
四、生产者代码
五、消费者代码
六、常用的KafKa的命令 Kafka是一个高性能、分布式的消息发布-订阅系统#xff0c;被广泛应用于大数据处理、实时日志分析等场景。Spring B…
目录
一、引入Kafka的依赖
二、配置Kafka
三、创建主题
1、自动创建(不推荐)
2、手动动创建
四、生产者代码
五、消费者代码
六、常用的KafKa的命令 Kafka是一个高性能、分布式的消息发布-订阅系统被广泛应用于大数据处理、实时日志分析等场景。Spring Boot作为目前最流行的Java开发框架之一其简洁的配置和丰富的工具使得与Kafka的集成变得更加容易。本文将介绍如何使用Spring Boot整合Kafka实现高效的数据处理和消息传递。
一、引入Kafka的依赖 dependency groupIdorg.springframework.cloud/groupId artifactIdspring-cloud-starter-stream-kafka/artifactId /dependency 二、配置Kafka
spring:kafka:bootstrap-servers: 156.65.20.76:9092,156.65.20.77:9092,156.65.20.78:9092 #指定Kafka集群的地址这里有三个地址用逗号分隔。listener:ack-mode: manual_immediate #设置消费者的确认模式为manual_immediate表示消费者在接收到消息后立即手动确认。concurrency: 3 #设置消费者的并发数为3missing-topics-fatal: false #设置为false表示如果消费者订阅的主题不存在不会抛出异常。producer:key-serializer: org.apache.kafka.common.serialization.StringSerializer # 设置消息键的序列化器value-serializer: org.apache.kafka.common.serialization.StringSerializer #设置消息值的序列化器acks: 1 #一般就是选择1兼顾可靠性和吞吐量 如果想要更高的吞吐量设置为0如果要求更高的可靠性就设置为-1consumer:auto-offset-reset: earliest #设置为earliest表示将从最早的可用消息开始消费即从分区的起始位置开始读取消息。enable-auto-commit: false #禁用了自动提交偏移量的功能为了避免出现重复数据和数据丢失一般都是手动提交key-deserializer: org.apache.kafka.common.serialization.StringDeserializer # 设置消息键的反序列化器value-deserializer: org.apache.kafka.common.serialization.StringDeserializer #设置消息值的反序列化器
注kafka的acks有三个值可以根据实际情况和需求平衡消息系统的吞吐量和数据安全性来选择对应的值。
acks0这是最不可靠的模式。当设置为acks0时生产者在发送消息后不会等待任何服务器端的确认响应。这种模式下生产者可以迅速继续发送下一批消息效率最高但风险也最大。如果在此模式下发生网络问题或broker故障发送的消息可能会永久丢失生产者无法得知消息是否成功到达Kafka broker。因此这种配置适合于能够容忍少量数据丢失的场景例如实时数据分析或生成非关键的实时报表。acks1这是默认的配置模式也是一种折衷方案。在这种模式下生产者会等待分区的领导者节点leader确认消息已经成功写入磁盘才会发送确认信息给生产者。这提高了数据的安全性因为只要领导者节点保存了消息即使跟随者replicas没有及时同步消息也不会丢失。然而如果领导者在同步给所有追随者之前崩溃那么尚未同步的副本将无法获取该消息仍然存在消息丢失的风险。acksall或-1这是最可靠的模式。在这个模式下生产者不仅需要领导者节点确认还会等待所有同步副本In-sync replicas, ISR都确认写入消息后才会收到确认。这极大地增强了数据的持久性保证确保了即使在多个节点故障的情况下消息也不会丢失。此模式适用于数据可靠性要求非常高的场景如金融交易系统或重要的日志记录
三、创建主题 1、自动创建(不推荐) 不存在的主题会自动创建分区数和副本数均为默认值。而默认值可能会不符合某些场景的要求。 在kafka的安装目录conf目录下找到该配置文件server.properties添加如下配置 num.partitions3 #默认3个分区 auto.create.topics.enabletrue #开启自动创建主题 default.replication.factor3 #默认3个副本 2、手动动创建 在kafka的安装目录bin目录下执行如下命令 //创建一个有三个分区和三个副本名为zhuoye的主题 ./kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 3 --partitions 3 --topic zhuoye 四、生产者代码 Slf4j
Component
public class ALiYunServiceImpl implents IALiYunService {Autowiredprivate KafkaTemplate kafkaTemplate;Autowiredprivate ExecutorService executorService;String topicName zhuoye;Overridepublic void queryECSMetricInfo() {//发送到kafka的消息集合,因为使用了多线程并且在多线程中往该集合进行添加操作所以需要线程安全的ListMessage messages Collections.synchronizedList(new ArrayList());boolean flag true;//获取上次查询时间Long startTime Long.valueOf(queryTimeRecordMapper.selectTimeByBelongId(3)) * 1000;Long endTime System.currentTimeMillis();try {//查询出所有的运行中的实例ListCloudInstanceAssetDto cloudInstances cloudInstanceAssetMapper.queryAllRunningInstance(1, Running);if (CollectionUtils.isEmpty(cloudInstances)) {return;}//定义计数器CountDownLatch latch new CountDownLatch(cloudInstances.size());//遍历查询for (CloudInstanceAssetDto instance : cloudInstances) {executorService.submit(() - {try {//获取内网流出带宽并将结果封装到消息集合中dealMetricDataToMessage(ALiYunConstant.ECS_INTRANET_OUT_RATE, ALiYunConstant.INTRANET_OUT_RATE_NAME, ALiYunConstant.LW_INTRANET_OUT_RATE_CODE,startTime, endTime, instance, messages);} catch (Exception e) {log.error(获取ECS的指标数据-多线程处理任务异常, e);} finally {latch.countDown();}});}//等待任务执行完毕latch.await();//将最终的消息集合发送到kafkaif (CollectionUtils.isNotEmpty(messages)) {for (int i 0; i messages.size(); i) {if (StringUtils.isNotBlank(messages.get(i).getValue()) noSuchInstance.equals(messages.get(i).getValue())) {continue;}kafkaTemplate.send(topicName, messages.get(i));}}} catch (Exception e) {flag false;log.error(获取ECS的指标数据失败, e);}//更新记录上次查询时间if (flag) {QueryTimeRecord queryTimeRecord new QueryTimeRecord();queryTimeRecord.setBelongId(3).setLastQueryTime(String.valueOf((endTime - 1000 * 60 * 1) / 1000)); //开始时间往前推1分钟queryTimeRecordMapper.updateByBelongId(queryTimeRecord);}}
这个时候如果你想看有没有把消息发送到kafka的指定主题可以使用如下命令 kafka-console-consumer.sh --bootstrap-server localhost:9093 --topic zhuoye 五、消费者代码
Slf4j
Component
public class KafkaConsumer {// 消费监听KafkaListener(topics zhuoye,groupId zhuoye-aliyunmetric)public void consumeExtractorChangeMessage(ConsumerRecordString, String record, Acknowledgment ack){try {String value record.value();//处理数据存入openTsDb.................................ack.acknowledge();//手动提交}catch (Exception e){log.error(kafa-topic【zhuoye】消费阿里云指标源消息【失败】);log.error(e.getMessage());}}
}
六、常用的KafKa的命令 //创建主题 ./kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 3 --partitions 3 --topic zhuoye //查看kafka是否接收对应的消息 kafka-console-consumer.sh --bootstrap-server localhost:9093 --topic zhuoye // 修改kafka-topic分区数 ./kafka-topics.sh --zookeeper localhost:2181 -alter --partitions 6 --topic zhuoye // 查看topic分区数 ./kafka-topics.sh --zookeeper localhost:2181 --describe --topic zhuoye // 查看用户组消费情况 ./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group zhuoye-aliyunmetric --describe