济南网站制作套餐,wordpress修改文章浏览次数,wordpress页面模板下载,微网站搭建1、本文架构
本文目标是使用微服务Spring Cloud集成Kafka实现异步通信。其中Kafka Server部署在Ubuntu虚拟机上#xff0c;微服务部署在Windows 11系统上#xff0c;Kafka Producer微服务和Kafka Consumer微服务分别注册到Eureka注册中心。Kafka Producer和Kafka Consumer之…1、本文架构
本文目标是使用微服务Spring Cloud集成Kafka实现异步通信。其中Kafka Server部署在Ubuntu虚拟机上微服务部署在Windows 11系统上Kafka Producer微服务和Kafka Consumer微服务分别注册到Eureka注册中心。Kafka Producer和Kafka Consumer之间通过Kafka Server实现异步通信。
出于便于测试的目的我通过浏览器触发Kafka Producer发送消息观察Kafka Consumer的后台是否打印出接收到的消息内容。
Ubuntu 上部署Kafka Server详见博文Ubuntu下Kafka安装及使用-CSDN博客
Eureka注册中心的搭建过程和完整代码详见博文微服务1搭建微服务注册中心命令行简易版不使用IDE-CSDN博客
Kafka Producer微服务的完整代码详见博文使用微服务Spring Cloud集成Kafka实现异步通信-CSDN博客
本文的重点是实现下图中的深蓝色部分Kafka Consumer微服务。 2、创建Spring boot项目Kafka Consumer微服务项目
mvn archetype:generate -DgroupIdcom.test -DartifactIdmicroservice-kafka-consumer -DarchetypeArtifactIdmaven-archetype-quickstart
项目代码的完整目录如下图所示 编辑pom.xml添加依赖包 dependency groupIdorg.springframework.cloud/groupId artifactIdspring-cloud-stream-binder-kafka/artifactId /dependency
project xmlnshttp://maven.apache.org/POM/4.0.0 xmlns:xsihttp://www.w3.org/2001/XMLSchema-instancexsi:schemaLocationhttp://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsdmodelVersion4.0.0/modelVersiongroupIdcom.test/groupIdartifactIdmicroservice-kafka-consumer/artifactIdpackagingjar/packagingversion1.0-SNAPSHOT/versionnamemicroservice-kafka-consumer/nameurlhttp://maven.apache.org/urlparentgroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-parent/artifactIdversion2.3.0.RELEASE/versionrelativePath/ /parentdependenciesdependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-web/artifactId/dependencydependencygroupIdorg.springframework.cloud/groupIdartifactIdspring-cloud-starter-netflix-eureka-client/artifactId/dependency dependencygroupIdjunit/groupIdartifactIdjunit/artifactIdversion3.8.1/versionscopetest/scope/dependencydependencygroupIdorg.springframework.cloud/groupIdartifactIdspring-cloud-stream-binder-kafka/artifactId/dependency/dependenciesdependencyManagementdependenciesdependencygroupIdorg.springframework.cloud/groupIdartifactIdspring-cloud-dependencies/artifactIdversionHoxton.SR4/versiontypepom/typescopeimport/scope/dependency /dependencies/dependencyManagementbuildpluginsplugingroupIdorg.springframework.boot/groupIdartifactIdspring-boot-maven-plugin/artifactId/plugin/plugins/build/project编辑application.yml配置kafka消费者
consumer: #消费的主题 topic: test-topic #消费者组id group-id: test-group #是否自动提交偏移量 enable-auto-commit: true #提交偏移量的间隔-毫秒 auto-commit-ms: 1000 #客户端消费的会话超时时间-毫秒 session-timeout-ms: 10000 #实现DeSerializer接口的反序列化类键 key-deserializer: org.apache.kafka.common.serialization.StringDeserializer #实现DeSerializer接口的反序列化类值 value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
server:port: 8030
spring:application:name: microservice-kafka-consumerkafka:bootstrap-servers: 192.168.23.131:9092consumer:group-id: test-groupenable-auto-commit: trueauto-commit-ms: 1000session-timeout-ms: 10000key-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializereureka:client:serviceUrl:defaultZone: http://localhost:8080/eureka/instance:prefer-ip-address: true App.java的完整代码如下
package com.test;import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
import org.springframework.kafka.annotation.KafkaListener;SpringBootApplication
EnableDiscoveryClient
public class App
{KafkaListener(topics mydemo1)public void listen(String msg) throws Exception {System.out.println( ----- Recv a msg: msg );}public static void main( String[] args ){System.out.println( Hello World! );SpringApplication.run(App.class, args);}
}3、测试
在浏览器输入触发Kafka Producer向Kafka Server发送消息
http://localhost:8020/kafka/sendMsg?msg测试消息testmsg
在Kafka Consumer的后台打印出收到的消息