做ps从哪个网站上下载图片大小,固安做网站的,画册设计多少钱一页,简易手工制作目录 0.为什么用消息队列1.代码文件创建结构2.pom.xml文件3.三个配置文件开发和生产环境4.Rabbitmq 基础配置类 TtlQueueConfig5.建立netty服务器 rabbitmq消息生产者6.建立常规队列的消费者 Consumer7.建立死信队列的消费者 DeadLetterConsumer8.建立mapper.xml文件9.建立map… 目录 0.为什么用消息队列1.代码文件创建结构2.pom.xml文件3.三个配置文件开发和生产环境4.Rabbitmq 基础配置类 TtlQueueConfig5.建立netty服务器 rabbitmq消息生产者6.建立常规队列的消费者 Consumer7.建立死信队列的消费者 DeadLetterConsumer8.建立mapper.xml文件9.建立mapper文件接口10.建立接口ProducerController 测试11.测试接口请求112.测试接口请求213.网络助手测试NetAssist.exe14.观察rabbitmq界面管理简单介绍 0.为什么用消息队列
流量消峰应用解耦异步确认
1.代码文件创建结构 2.pom.xml文件
?xml version1.0 encodingUTF-8?
project xmlnshttp://maven.apache.org/POM/4.0.0 xmlns:xsihttp://www.w3.org/2001/XMLSchema-instancexsi:schemaLocationhttp://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsdmodelVersion4.0.0/modelVersionparentgroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-parent/artifactIdversion2.3.4.RELEASE/versionrelativePath/ !-- lookup parent from repository --/parentgroupIdcom.test/groupIdartifactIddemo/artifactIdversion0.0.1-SNAPSHOT/versionnamedemo/namedescriptionDemo project for Spring Boot/descriptionpackagingwar/packagingurl/licenseslicense//licensesdevelopersdeveloper//developersscmconnection/developerConnection/tag/url//scmpropertiesjava.version1.8/java.version/propertiesdependenciesdependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-web/artifactId/dependencydependencygroupIdorg.projectlombok/groupIdartifactIdlombok/artifactIdoptionaltrue/optional/dependencydependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-test/artifactIdscopetest/scope/dependencydependencygroupIdorg.springframework.amqp/groupIdartifactIdspring-rabbit/artifactId/dependencydependencygroupIdio.netty/groupIdartifactIdnetty-all/artifactIdversion4.1.86.Final/version !-- 根据需要选择版本 --/dependencydependencygroupIdcom.baomidou/groupIdartifactIddynamic-datasource-spring-boot-starter/artifactIdversion3.5.0/version/dependencydependencygroupIdcom.alibaba/groupIdartifactIdeasyexcel/artifactIdversion3.1.1/version/dependencydependencygroupIdorg.mybatis.spring.boot/groupIdartifactIdmybatis-spring-boot-starter/artifactIdversion2.2.0/version/dependencydependencygroupIdmysql/groupIdartifactIdmysql-connector-java/artifactIdscoperuntime/scope/dependency/dependenciesbuildpluginsplugingroupIdorg.springframework.boot/groupIdartifactIdspring-boot-maven-plugin/artifactIdconfigurationexcludesexcludegroupIdorg.projectlombok/groupIdartifactIdlombok/artifactId/exclude/excludes/configuration/pluginplugingroupIdorg.apache.maven.plugins/groupIdartifactIdmaven-surefire-plugin/artifactIdconfigurationskiptrue/skip/configuration/plugin/plugins/build
/project 3.三个配置文件开发和生产环境 文件一 application.properties
#开发环境
spring.profiles.activedev
#生产环境
#spring.profiles.activeprod文件二 application-dev.properties 开发环境
spring.application.namedemo
server.servlet.context-path/demo1
server.port1001
#spring.main.allow-circular-referencestrue
spring.rabbitmq.host实际ip地址
spring.rabbitmq.port5672
spring.rabbitmq.usernameroot
spring.rabbitmq.passwordroot
spring.rabbitmq.virtual-host/
##创建单线程监听容器 本项目目前用的是单线程 这里预期值2000 需根据实际情况调整
spring.rabbitmq.listener.simple.prefetch2000
##创建多线程监听容器
#spring.rabbitmq.listener.direct.prefetch2000
#spring.rabbitmq.listener.simple.acknowledge-modeauto
# application.properties 示例
spring.rabbitmq.listener.simple.acknowledge-modeMANUAL
#开启消息确认机制
spring.rabbitmq.publisher-confirm-typecorrelated
spring.rabbitmq.publisher-returnstruenetty.server.port1002
netty.server.bossThreads1
netty.server.workerThreads1server.max-http-header-size655360
mybatis.mapper-locationsclasspath:mapper/*Mapper.xml
mybatis.type-aliases-packagecom.mt.entity
spring.mvc.pathmatch.matching-strategyant_path_matcher
logging.level.com.ysd.mapperinfo
logging.file.namedemo.log
logging.level.com.mt.mapperinfo
#mybatis.configuration.log-implorg.apache.ibatis.logging.stdout.StdOutImpl
mybatis.configuration.map-underscore-to-camel-casetrue
pagehelper.helper-dialectmysql
pagehelper.reasonabletrue
spring.main.allow-circular-referencestrue
spring.jackson.default-property-inclusionnon_null#!--数据库1 --
spring.datasource.dynamic.英文数据库名称1.urljdbc:mysql://ip地址:3306/英文数据库名称?serverTimezoneUTCuseUnicodetruecharacterEncodingutf8useSSLfalse
spring.datasource.dynamic.datasource.英文数据库名称1.usernameroot
spring.datasource.dynamic.datasource.英文数据库名称1.passwordroot
spring.datasource.dynamic.datasource.英文数据库名称1.driver-class-namecom.mysql.cj.jdbc.Driver
#!--数据库2 --
spring.datasource.dynamic.英文数据库名称2.urljdbc:mysql://ip地址:3306/英文数据库名称?serverTimezoneUTCuseUnicodetruecharacterEncodingutf8useSSLfalse
spring.datasource.dynamic.datasource.英文数据库名称2.usernameroot
spring.datasource.dynamic.datasource.英文数据库名称2.passwordroot
spring.datasource.dynamic.datasource.英文数据库名称2.driver-class-namecom.mysql.cj.jdbc.Driver4.Rabbitmq 基础配置类 TtlQueueConfig
rabbitmq基础信息配置已经在application-dev.properities中进行配置过一部分
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;Configuration
public class TtlQueueConfig {public static final String X_EXCHANGE X;public static final String QUEUE_A QA;public static final String QUEUE_B QB;public static final String Y_DEAD_LETTER_EXCHANGE Y;public static final String DEAD_LETTER_QUEUE QD;Bean(xExchange)public DirectExchange xExchange() {return new DirectExchange(X_EXCHANGE);}Bean(yExchange)public DirectExchange yExchange() {return new DirectExchange(Y_DEAD_LETTER_EXCHANGE);}Bean(queueA)public Queue queueA() {MapString, Object args new HashMap(3);//声明当前队列绑定的死信交换机args.put(x-dead-letter-exchange, Y_DEAD_LETTER_EXCHANGE);//声明当前队列的死信路由 keyargs.put(x-dead-letter-routing-key, YD);//声明队列的 TTLargs.put(x-message-ttl, 150000);//超出150秒没有被消费 就会进入死信队列return QueueBuilder.durable(QUEUE_A).withArguments(args).build();}Beanpublic Binding queueaBindingX(Qualifier(queueA) org.springframework.amqp.core.Queue queueA,Qualifier(xExchange) DirectExchange xExchange) {return BindingBuilder.bind(queueA).to(xExchange).with(XA);}//声明队列 B ttl 为 40s 并绑定到对应的死信交换机Bean(queueB)public Queue queueB() {MapString, Object args new HashMap(3);//声明当前队列绑定的死信交换机args.put(x-dead-letter-exchange, Y_DEAD_LETTER_EXCHANGE);//声明当前队列的死信路由 keyargs.put(x-dead-letter-routing-key, YD);//声明队列的 TTLargs.put(x-message-ttl, 40000);//超出40秒没有被消费 就会进入死信队列return QueueBuilder.durable(QUEUE_B).withArguments(args).build();}//声明队列 B 绑定 X 交换机Beanpublic Binding queuebBindingX(Qualifier(queueB) Queue queue1B,Qualifier(xExchange) DirectExchange xExchange) {return BindingBuilder.bind(queue1B).to(xExchange).with(XB);}//声明死信队列 QDBean(queueD)public Queue queueD() {return new Queue(DEAD_LETTER_QUEUE);}//声明死信队列 QD 绑定关系Beanpublic Binding deadLetterBindingQAD(Qualifier(queueD) Queue queueD,Qualifier(yExchange) DirectExchange yExchange) {return BindingBuilder.bind(queueD).to(yExchange).with(YD);}Beanpublic SimpleRabbitListenerContainerFactory simpleRabbitListenerContainerFactory(ConnectionFactory connectionFactory) {SimpleRabbitListenerContainerFactory simpleRabbitListenerContainerFactory new SimpleRabbitListenerContainerFactory();//这个connectionFactory就是我们自己配置的连接工厂直接注入进来simpleRabbitListenerContainerFactory.setConnectionFactory(connectionFactory);//这边设置消息确认方式由自动确认变为手动确认simpleRabbitListenerContainerFactory.setAcknowledgeMode(AcknowledgeMode.MANUAL);//设置消息预取数量// simpleRabbitListenerContainerFactory.setPrefetchCount(1);return simpleRabbitListenerContainerFactory;}
}5.建立netty服务器 rabbitmq消息生产者
创建服务器类 初始化启动服务器NettyServer
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;Component
Slf4j
public class NettyServer implements ApplicationRunner {Autowiredprivate RabbitTemplate rabbitTemplate;//有可能在项目初始化的时候加载不出来导致项目隐形报错//加载application-dev.proerities文件中 对应参数配置项Value(${netty.server.port})private int port;public int getPort () {return port;}Overridepublic void run(ApplicationArguments args) throws Exception {log.info(netty服务启动端口getPort());EventLoopGroup bossGroup new NioEventLoopGroup(); // 用于接收进来的连接EventLoopGroup workerGroup new NioEventLoopGroup(); // 用于处理已经被接收的连接ServerBootstrap b new ServerBootstrap();b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class) // 使用Nio的通道类型.childHandler(new ChannelInitializerSocketChannel() { // 添加一个处理器来处理接收到的数据Overridepublic void initChannel(SocketChannel ch) throws Exception {ChannelPipeline p ch.pipeline();p.addLast(new StringDecoder());p.addLast(new StringEncoder());p.addLast(new SimpleChannelInboundHandlerString() {Overrideprotected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {//netty监听端口消息发送消息通过rabbitmq生产者发送到消息队列sendMessage(ctx,msg);}Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {cause.printStackTrace();ctx.close();}});}});// 绑定端口开始接收进来的连接ChannelFuture f b.bind(getPort()).sync();// 等待服务器socket关闭f.channel().closeFuture().sync();}//netty监听端口消息发送消息通过rabbitmq生产者发送到消息队列中//充当消息的生产者发送public void sendMessage(ChannelHandlerContext ctx, String msg){// 接收netty监听信息来源作为消息生产者rabbitTemplate.convertAndSend(X,XA,来自QAmsg);ctx.writeAndFlush(msg);}
}
6.建立常规队列的消费者 Consumer
import com.rabbitmq.client.Channel;
import com.test.demo.service.MessageProcessService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import javax.annotation.Resource;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;Slf4j
Component
public class Consumer {Resourceprivate MessageProcessService messProceService;private MapInteger, Message messageMap new HashMap();private int i 1;private int j 1;RabbitListener(queues QA)public void receiveQA(Message message, Channel channel) throws InterruptedException, IOException {i;synchronized (this) {messageMap.put(i,message);if (messageMap.size() 1500) {// 模拟数据库插入操作System.out.println(模拟插入数据库操作QA队列处理消息数 messageMap.size());processMessagesBatch(channel);System.out.println(模拟插入成功, 准备进行下一次收集);}}}// RabbitListener(queues QB)
// public void receiveQB(Message message, Channel channel) throws IOException {
// String msg new String(message.getBody());
// log.info(当前时间{},监听QB队列信息{}, new Date().toString(), msg);
// }public static class SleepUtils {public static void sleep(int second) {try {Thread.sleep(1000 * second);} catch (InterruptedException _ignored) {Thread.currentThread().interrupt();}}}private void processMessagesBatch(Channel channel) throws IOException {ListString list new ArrayList();long startTime System.currentTimeMillis();MapInteger, Message tempMessageMap new HashMap(messageMap);messageMap.clear();for (Map.EntryInteger, Message map : tempMessageMap.entrySet()) {list.add(C (j) : QA队列收集 new String(map.getValue().getBody()));}int count messProceService.add(list);if (count list.size()) {System.out.println(插入数据成功);for (Map.EntryInteger, Message map : tempMessageMap.entrySet()) {channel.basicAck(map.getValue().getMessageProperties().getDeliveryTag(), false);}list.clear();}long durationSeconds (System.currentTimeMillis() - startTime) / 1000;System.out.println(插入1500条数据执行时间: durationSeconds);}
}7.建立死信队列的消费者 DeadLetterConsumer
import com.rabbitmq.client.Channel;
import com.test.demo.service.MessageProcessService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import javax.annotation.Resource;
import java.io.IOException;
import java.util.*;Slf4j
Component
public class DeadLetterConsumer {Resourceprivate MessageProcessService messProceService;private MapInteger, Message messageMap new HashMap();private int i 1;private int j 1;RabbitListener(queues QD)public void receiveD(Message message, Channel channel) throws IOException {i;synchronized (this) {messageMap.put(i, message);if (messageMap.size() 100) {// 模拟数据库插入操作System.out.println(模拟插入数据库操作死信队列处理消息数 messageMap.size());processMessagesBatch(channel);System.out.println(模拟插入成功, 准备进行下一次收集);// 确认当前消息以便RabbitMQ知道它可以释放此消息}}}private void processMessagesBatch(Channel channel) throws IOException {ListString list new ArrayList();// 复制当前消息映射以避免在迭代时修改MapInteger, Message tempMessageMap new HashMap(messageMap);messageMap.clear(); long startTime System.currentTimeMillis();for (Map.EntryInteger, Message map : tempMessageMap.entrySet()) {list.add(C (j) : 死信队列收集 new String(map.getValue().getBody()));}int count messProceService.add(list);long durationSeconds (System.currentTimeMillis() - startTime) / 1000;System.out.println(插入50条数据执行时间: durationSeconds);if (count list.size()) {System.out.println(插入数据成功);list.clear();for (Map.EntryInteger, Message map : tempMessageMap.entrySet()) {channel.basicAck(map.getValue().getMessageProperties().getDeliveryTag(), false);}}}}8.建立mapper.xml文件
MessageProcessMapper.xml
?xml version1.0 encodingUTF-8?
!DOCTYPE mapper PUBLIC -//mybatis.org//DTD Mapper 3.0//EN http://mybatis.org/dtd/mybatis-3-mapper.dtd
mapper namespacecom.test.demo.mapper.MessageProcessMapperinsert idaddINSERT INTO t_xinyang_direct (direct,create_date)VALUESforeach collectionlist itemitem separator,(#{item},SYSDATE())/foreach/insert
/mapper
9.建立mapper文件接口
MessageProcessMapper import com.baomidou.dynamic.datasource.annotation.DS;
import org.apache.ibatis.annotations.Mapper;import java.util.List;Mapper
DS(英文数据库名称1)
public interface MessageProcessMapper {int add(ListString list);
}10.建立接口ProducerController 测试
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;Slf4j
RestController
RequestMapping(/api)
public class ProducerController {GetMapping(/hello/{message})public ResponseEntityString sayHello(PathVariable String message) {return ResponseEntity.ok(Hello, RabbitMQ!message);}Autowiredprivate RabbitTemplate rabbitTemplate;GetMapping(/sendMsg/{message})public void sendMsg(PathVariable String message){log.info(接收接口发送的消息请求);rabbitTemplate.convertAndSend(X,XA,消息来自tt1为10s的队列message);rabbitTemplate.convertAndSend(X,XB,消息来自tt1为150s的队列message);}}11.测试接口请求1
测试请求接口 /hello http://实际本机ip地址:1001/demo1/api/hello/返回浏览器输入内容
12.测试接口请求2
测试请求接口 /hello http://实际本机ip地址:1001/demo1/api/sendRabbitMq/发给rabbitmq消息
13.网络助手测试NetAssist.exe
主要目的模拟向netty端口进行发送数据通过netty监听到的信息然后通过rabbitmq的生产者发送rabbitmq的队列中让消费者进行消费如果消费者绑定死信队列,那么消费者从队列中取出消息后经过一定时间未确认即不进行消费确认或者拒绝然后入之前绑定好的死信队列中供死信队列绑定的死信消费者进行消费处理。
14.观察rabbitmq界面管理简单介绍