园岭中小网站建设,网站开发用什么系统比较好,沈阳微信网站制作,建网站公司专业消息中间件-RocketMQ入门 消息发送的三种方式消息中间件简介应用场景常用消息中间件RocketMQ核心概念入门案例-生产者和消费者代码逻辑消息发送的三种方式同步发送异步发送一次性消息消息中间件简介
应用场景
假设现在有订单微服务和积分微服务,正常请求流程之后是不是一个订…
消息中间件-RocketMQ入门 消息发送的三种方式消息中间件简介应用场景常用消息中间件RocketMQ核心概念入门案例-生产者和消费者代码逻辑消息发送的三种方式同步发送异步发送一次性消息消息中间件简介
应用场景
假设现在有订单微服务和积分微服务,正常请求流程之后是不是一个订单完成后给对应的用户加上积分但如果积分微服务坏掉了正常来说会回滚但实际中情况中积分完全可以晚一点加没有什么影响 1.解决代码耦合的问题 解决问题的方法 这样订单微服务把参数发送给中间件之后就完成了它自己的任务使微服务不用依赖其它微服务就算中间件挂了也不需要担心它虽然默认存储在内存里面但也会在磁盘里面存一份 2.进行流量的削峰 3.数据分发 解决办法
常用消息中间件 1.ActiveMQ是Apache出品,比较老的一个开源的消息中间件,以前在中小企业应用广泛. 2.Kafka是由Apache软件基金会开发的一个开源流处理平台由Scala和Java编写。Kafka是一种高吞吐量的分布式发布订阅消息系统。它可以处理消费者规模的网站中的所有动作流数据。这种动作(网页浏览搜索和其他用户的行动)是在现代网络上的许多社会功能的一个关键因素。这些数据通常是由于吞吐量的要求而通过处理日志和日志聚合来解决. 3.RabbitMQ是一个基于Erlang语言开发的消息中间件, RabbitMQ最初起源于金融系统用于在分布式系统中存储转发消息在易用性、扩展性、高可用性等方面表现不俗。 对数据的一致性,稳定性和可靠性要求比较高的场景 4.RocketMQ是阿里巴巴在2012年开源的分布式消息中间件目前已经捐赠给Apache软件基金会并于2017年9月25日成为 Apache的顶级项目。作为经历过多次阿里巴巴双十一这种超级工程的洗礼并有稳定出色表现的国产中间件以其高性能、低延时和高可靠等特性近年来已经也被越来越多的国内企业使用。 淘宝内部的交易系统使用了淘宝自主研发的Noify消息中间件使用MySQL作为消息存储媒介可完全水平扩容为了进一步降低成本我们认为存储部分可以进一步优化2011年初Linkin开源了Kafka这个优秀的消息中间件淘宝中间件团队在对Kafka做过充分Review之后Kaka无限消息堆积高效的持久化速度吸引了我们但是同时发现这个消息系统主要定位于日志传输对于使用在淘宝交易、订单、充值等场景下还有诸多特性不满足为此我们重新用Javai语言编写了RocketMQ定位于非日志的可靠消息传输〈(日志场最也OK)目前RocketMQ在阿里集团被广泛应用在订单交易充值流计算消息推送日志流式处理, binlog分发等场景。 RocketMQ核心概念 消息中间件里面集群了多个代理服务器如何做到负载? 在创造RocketMQ的时候它本身有一个轻量级的注册中心称为NameServer命名服务因为像Nacos和zookeeper这样复杂的注册中心运行起来对性能肯定也会有一定的影响倘若有一天该注册中心不开源不维护了该中间件是不是也会因此遇到很大的麻烦
入门案例-生产者和消费者代码逻辑 第一步创建两个两个项目分别为生产者和消费者 创建生产者 第一步导入依赖 dependenciesdependencygroupIdorg.apache.rocketmq/groupIdartifactIdrocketmq-client/artifactIdversion4.4.0/version/dependency
/dependencies第二步:创建生产类模拟生产 public class Producer {public static void main(String[] args) throws Exception {//定义一个生产者对象DefaultMQProducer producer new DefaultMQProducer(helloGroup);//连接nameServerproducer.setNamesrvAddr(43.143.161.59:9876);//启动生产者producer.start();//设置消息发送的目的地String topic helloTopic;//发送消息for(int i0;i10;i){Message msg new Message(topic,(RocketMQ普通消息i).getBytes(Charset.defaultCharset()));SendResult result producer.send(msg);System.out.println(发送状态result.getSendStatus());}System.out.println(消息发送完毕.);//关闭资源producer.shutdown();}
}创建消费者 第一步导入依赖 dependenciesdependencygroupIdorg.apache.rocketmq/groupIdartifactIdrocketmq-client/artifactIdversion4.4.0/version/dependency
/dependencies第二步创建消费类模拟接收 public class Consumer {public static void main(String[] args) throws Exception {//定义消息消费者(在同一个JVM中,消费者的组名不能重复)DefaultMQPushConsumer consumer new DefaultMQPushConsumer(helloConsumerGroup);//设置nameServer地址consumer.setNamesrvAddr(43.143.161.59:9876);//设置订阅的主题consumer.subscribe(helloTopic,*);//设置消息的监听器consumer.setMessageListener(new MessageListenerConcurrently() {Overridepublic ConsumeConcurrentlyStatus consumeMessage(ListMessageExt list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {for(MessageExt msg:list){String s new String(msg.getBody(), Charset.defaultCharset());System.out.println(线程:Thread.currentThread(),消息的内容:s);}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});//启动消费者consumer.start();}
}消息发送的三种方式
同步发送 应用程序给消息中间件发送消息的时候。需要等待消息中间件将消息存储完毕之后才响应回去。业务代码才能往下执行
异步发送 应用程序给消息中间件发送消息的时候,消息中间件收到这个消息之后直接给应用程序响应了.(此时消息并没有完全存储到磁盘),消息中间件继续存储消息。存储完成(成功或者失败),通过回调地址通知有应用程序。消息存储的结果 示例代码
public class Producer {public static void main(String[] args) throws Exception {//定义一个生产者对象DefaultMQProducer producer new DefaultMQProducer(helloGroup);//连接nameServerproducer.setNamesrvAddr(43.143.161.59:9876);//启动生产者producer.start();//设置消息发送的目的地String topic helloTopic;//发送消息Message msg new Message(topic,(RocketMQ异步消息).getBytes(Charset.defaultCharset()));System.out.println(消息发送前);//异步发送producer.send(msg, new SendCallback() {Overridepublic void onSuccess(SendResult sendResult) {System.out.println(消息存储状态:sendResult.getSendStatus());}Overridepublic void onException(Throwable throwable) {System.out.println(消息发送出现异常);}});System.out.println(消息发送完毕.);TimeUnit.SECONDS.sleep(5);//关闭资源producer.shutdown();}运行结果 业务逻辑处理 ---- 执行send方法,不需要等待消息中间件存储消息,可以直接执行业务逻辑代码 与同步发送相比异步发送时间更短一点响应更快一点为了使响应时间更快的可以选择异步发送但同步发送也有它自己的意义同步发送更加可靠
一次性消息
应用程序给消息中间件发送消息的时候不需要知道消息是否在消息中间存储了只管发就是了.
public class Producer {public static void main(String[] args) throws Exception {//定义一个生产者对象DefaultMQProducer producer new DefaultMQProducer(helloGroup);//连接nameServerproducer.setNamesrvAddr(43.143.161.59:9876);//启动生产者producer.start();//设置消息发送的目的地String topic helloTopic;//发送消息Message msg new Message(topic,(RocketMQ一次性消息).getBytes(Charset.defaultCharset()));System.out.println(消息发送前);producer.sendOneway(msg);System.out.println(消息发送完毕.);TimeUnit.SECONDS.sleep(5);//关闭资源producer.shutdown();}
}运行结果