当前位置: 首页 > news >正文

广州公司建设网站运行时间 wordpress

广州公司建设网站,运行时间 wordpress,网站建设需求分析的实施,郑州做商城网站目 录 1. 新建一个消息生产者2. 新建一个消息消费者3. 测 试 在开始之前#xff0c;需要先做点准备工作#xff0c;用 IDEA 新建一个 Maven 项目#xff0c;取名 kafka-study#xff0c;然后删掉它的 src 目录#xff0c;接着在 pom.xml 里面引入下面的依赖。这个项目的作… 目 录 1. 新建一个消息生产者2. 新建一个消息消费者3. 测 试 在开始之前需要先做点准备工作用 IDEA 新建一个 Maven 项目取名 kafka-study然后删掉它的 src 目录接着在 pom.xml 里面引入下面的依赖。这个项目的作用是作为整个工程的父项目后面的项目都在此基础上新建 module 即可。 这里用到的环境信息如下 JDK1.8SpringBoot2.7.1IDEAMaven3.6.3 ?xml version1.0 encodingUTF-8? project xmlnshttp://maven.apache.org/POM/4.0.0xmlns:xsihttp://www.w3.org/2001/XMLSchema-instancexsi:schemaLocationhttp://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsdmodelVersion4.0.0/modelVersionpackagingpom/packagingparentgroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-parent/artifactIdversion2.7.1/versionrelativePath//parentgroupIdorg.yuhuofei/groupIdartifactIdkafka-study/artifactIdversion1.0-SNAPSHOT/versionnamekafka-study/namedescriptionStudy project for Kafka/descriptionpropertiesjava.version1.8/java.version/propertiesdependenciesdependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-web/artifactId/dependencydependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-test/artifactIdscopetest/scope/dependency/dependenciesbuildpluginsplugingroupIdorg.springframework.boot/groupIdartifactIdspring-boot-maven-plugin/artifactId/plugin/plugins/build /project1. 新建一个消息生产者 使用 IDEA 在 kafka-study 的基础上新建一个 Maven 类型的 module 命名为 producer-01直至完成。接着我们改造这个 module 把它变成我们想要的消息生产者。 1、在 java 目录下新建一个包 com.yuhuofei 然后再创建一个启动类 ProducerApplication.java 内容如下: package com.yuhuofei;import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication;SpringBootApplication public class ProducerApplication {public static void main(String[] args) {SpringApplication.run(ProducerApplication.class, args);}} 2、在 resources 目录下新建配置文件 application.properties 内容如下 server.port8080 server.servlet.context-path/producer-01 #Kafka配置 topic.namemy-kafka-topic3、在自己的 pom.xml 文件中引入 Kafka 客户端的依赖。这里需要注意的是由于我们在上一篇博客里安装的是 kafka_2.11-2.2.1 版本的服务端所以引入的 Kafka 客户端的版本最好对应也选择 2.2.1 版本的。 ?xml version1.0 encodingUTF-8? project xmlnshttp://maven.apache.org/POM/4.0.0xmlns:xsihttp://www.w3.org/2001/XMLSchema-instancexsi:schemaLocationhttp://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsdparentartifactIdkafka-study/artifactIdgroupIdorg.yuhuofei/groupIdversion1.0-SNAPSHOT/version/parentmodelVersion4.0.0/modelVersionartifactIdproducer-01/artifactIddependencies!--官方原生的java版kafka客户端依赖--dependencygroupIdorg.apache.kafka/groupIdartifactIdkafka-clients/artifactIdversion2.2.1/version/dependency!--fastjson依赖--dependencygroupIdcom.alibaba/groupIdartifactIdfastjson/artifactIdversion2.0.11.graal/version/dependency/dependencies/project4、由于 Kafka 进行消息的发布或者消费是需要有 Topic 的因此我们需要要创建一个 Topic。在 zookeeper 和 Kafka 服务端都启动的情况下打开 D:\Kafka\kafka_2.11-2.2.1\bin\windows 文件夹在地址栏输入 cmd打开控制台然后用下面的命令创建、查看、删除 topic 。 如下图所示我已经创建了一个名字为 my-kafka-topic 的 topic 这个 topic 的分区为 1副本也是 1。 # 创建topic kafka-topics.bat --create --zookeeper 127.0.0.1:2181 --replication-factor 1 --partitions 1 --topic my-kafka-topic # 查询topic列表 kafka-topics.bat --zookeeper 127.0.0.1:2181 --list # 删除topic kafka-topics.bat --delete --zookeeper 127.0.0.1:2181 --topic my-kafka-topic # 查询某个topic信息 kafka-topics.bat --describe --zookeeper localhost:2181 --topic my-kafka-topic #kafka查看topic数据内容 kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic my-kafka-topic --from-beginning5、实现生产者发送消息 controller层 package com.yuhuofei.controller;import com.yuhuofei.service.ProducerInterface; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController;import javax.annotation.Resource;/*** Description* ClassName ProducerController* Author yuhuofei* Date 2023/8/13 16:38* Version 1.0*/ RequestMapping(/info) RestController public class ProducerController {Resourceprivate ProducerInterface producerInterface;GetMapping(/create-message)public void createTopic() {producerInterface.createMessage();} } service层 package com.yuhuofei.service;/*** Description* InterfaceName ProducerController* Author yuhuofei* Date 2023/8/13 16:39* Version 1.0*/ public interface ProducerInterface {void createMessage(); } serviceImpl层 package com.yuhuofei.service.impl;import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import com.yuhuofei.service.ProducerInterface; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Service;import javax.annotation.Resource; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future;/*** Description* ClassName ProducerInterfaceImpl* Author yuhuofei* Date 2023/8/13 16:39* Version 1.0*/ Service public class ProducerInterfaceImpl implements ProducerInterface {Value(${topic.name})private String topicName;Resourceprivate Producer producer;Overridepublic void createMessage() {//创建测试数据JSONObject jsonObject new JSONObject();jsonObject.put(word, 这是生产者生产的测试消息);String jsonString JSON.toJSONString(jsonObject);//使用kafka发送消息ProducerRecordString, String producerRecord new ProducerRecord(topicName, jsonString);producer.send(producerRecord);} } 客户端配置类 package com.yuhuofei.config;import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.serialization.StringSerializer; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration;import java.util.Properties;/*** Description 配置生产者客户端参数* ClassName KafkaProperties* Author yuhuofei* Date 2023/8/13 22:18* Version 1.0*/ Configuration public class KafkaConfig {Bean(producer)public ProducerString, String getKafkaProducer() {Properties properties new Properties();//配置Kafka的服务端IP和端口properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 127.0.0.1:9092);//指定key使用的序列化类properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());//指定value使用的序列化类properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());return new KafkaProducer(properties);} } 6、启动服务调用接口 http://localhost:8080/producer-01/info/create-message 然后在控制台用命令查看一下消息是否写入如下图所见虽然是乱码但是已经可以看出消息成功写入到 Kafka 到这里消息生产者的创建暂时告一段落。 2. 新建一个消息消费者 和前面建立生产者类似的步骤只是文件信息和名字不一样这里步骤就不重复了直接给出相关的配置和类。这个消费者取名 consumer-01 。 pom.xml文件 ?xml version1.0 encodingUTF-8? project xmlnshttp://maven.apache.org/POM/4.0.0xmlns:xsihttp://www.w3.org/2001/XMLSchema-instancexsi:schemaLocationhttp://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsdparentartifactIdkafka-study/artifactIdgroupIdorg.yuhuofei/groupIdversion1.0-SNAPSHOT/version/parentmodelVersion4.0.0/modelVersionartifactIdconsumer-01/artifactIddependencies!--官方原生的java版kafka客户端依赖--dependencygroupIdorg.apache.kafka/groupIdartifactIdkafka-clients/artifactIdversion2.2.1/version/dependency!--fastjson依赖--dependencygroupIdcom.alibaba/groupIdartifactIdfastjson/artifactIdversion2.0.11.graal/version/dependency/dependencies /projectproperties文件 server.port8081 server.servlet.context-path/consumer-01 #Kafka配置 topic.namemy-kafka-topic启动类 ConsumerApplication.java package com.yuhuofei;import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication;SpringBootApplication public class ConsumerApplication {public static void main(String[] args) {SpringApplication.run(ConsumerApplication.class, args);}} 控制层ConsumerController.java package com.yuhuofei.controller;import com.yuhuofei.service.ConsumerInterface; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController;import javax.annotation.Resource;/*** Description* ClassName ConsumerController* Author yuhuofei* Date 2023/8/14 0:19* Version 1.0*/ RestController RequestMapping(/consumer) public class ConsumerController {Resourceprivate ConsumerInterface consumerInterface;GetMapping(/getData)public void getData() {consumerInterface.getData();} }service层 package com.yuhuofei.service;/*** Description* ClassName ConsumerInterface* Author yuhuofei* Date 2023/8/14 0:20* Version 1.0*/ public interface ConsumerInterface {void getData(); } serviceImpl层 package com.yuhuofei.service.impl;import com.yuhuofei.service.ConsumerInterface; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Service;import javax.annotation.Resource; import java.time.Duration; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.List;/*** Description* ClassName ConsumerInterfaceImpl* Author yuhuofei* Date 2023/8/14 0:20* Version 1.0*/ Service public class ConsumerInterfaceImpl implements ConsumerInterface {Value(${topic.name})private String topicName;Resourceprivate Consumer consumer;Overridepublic void getData() {CollectionString topics Collections.singletonList(topicName);consumer.subscribe(topics);ListString result new ArrayList();while (true) {ConsumerRecordsString, String consumerRecords consumer.poll(Duration.ofMillis(1000));for (ConsumerRecord consumerRecord : consumerRecords) {//从ConsumerRecord中获取消费数据String res (String) consumerRecord.value();System.out.println(从Kafka中消费的原始数据: res);result.add(res);System.out.println(result);}}} }config配置类 package com.yuhuofei.config;import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration;import java.util.Properties;/*** Description 配置消费者参数* ClassName KafkaConsumerConfig* Author yuhuofei* Date 2023/8/14 0:08* Version 1.0*/ Configuration public class KafkaConsumerConfig {Bean(consumer)public ConsumerString, String getKafkaConsumer() {Properties properties new Properties();//配置Kafka的服务端IP和端口properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 127.0.0.1:9092);//指定key使用的序列化类properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());//指定value使用的序列化类properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());//指定消费者组properties.put(ConsumerConfig.GROUP_ID_CONFIG, consumer-01-group);properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,true);properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,100);properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,latest);return new KafkaConsumer(properties);} } 最终整个项目目录如下图所示。 3. 测 试 1、检查 zookeeper 和 Kafka 服务端是否是启动的没启动就先启动。 2、接着启动消费者服务然后调用一下接口 http://localhost:8081/consumer-01/consumer/getData 这么做的目的启动一个线程监听 topic 。 3、最后启动生产者服务调用接口 http://localhost:8080/producer-01/info/create-message 向 Kafka 服务器写数据。 结果如下所示 控制台看到的所有消息记录 生产者发送一条消息 消费者监听到一条消息并接收 在 SpringBoot 中使用官方原生 java 版 Kafka 客户端的入门介绍到这里结束还有更多使用方式等待挖掘。
http://www.w-s-a.com/news/351937/

相关文章:

  • 南昌县建设局网站微信分销小程序开发
  • 网站设计师需要什么知识与技能wordpress个性
  • 做茶叶网站的目的和规划有什么做照片书的网站
  • 开福区城乡建设局门户网站关键词挖掘查询工具爱站网
  • 网站建设全国排名沈阳seo按天计费
  • 成都公司网站设计无锡seo网站推广费用
  • 建网站平台要多少钱购物网站界面设计策划
  • 学完js了可以做哪些网站长沙建站官网
  • 怎么样做问卷网站多少钱英语
  • 房产网站建设方案建筑公司是干什么的
  • wordpress建的大型网站柳州市网站建设
  • 石家庄做网站的公司有哪些微信自媒体网站建设
  • 池州哪里有做网站注册公司有哪些风险
  • 做古代风格头像的网站对网站政务建设的建议
  • 网站搜索栏怎么做设计个网站要多少钱
  • 阿里巴巴网站建设目标wamp wordpress
  • 自己做的网站怎么挂网上金蝶erp
  • 网站的页面由什么组成淘宝网网站建设的需求分析
  • 软文网站推广法dede5.7内核qq个性门户网站源码
  • 个人备案网站名称校园网站建设特色
  • vr超市门户网站建设班级网站怎么做ppt模板
  • 网站建设一般是用哪个软件刚开始做写手上什么网站
  • 用jsp做的网站源代码下载有哪些做红色旅游景点的网站
  • 网站开发的技术选型黄石市网站建设
  • 做直播网站需要证书吗专做宝宝的用品网站
  • 网站标题用什么符号网站制作交易流程
  • dede模板网站教程jsp网站搭建
  • 上海网站开发外包公司鲜花导购网页制作
  • 宿州外贸网站建设公司个人注册网站一般做什么
  • 小公司做网站用哪种服务器什么是网站代理