大良网站设计价格,网站添加音乐,制作软件的app有哪些,免费简历一、快速入门
Pulsar 是一个分布式发布-订阅消息平台#xff0c;具有非常灵活的消息模型和直观的客户端 API。
最初由 Yahoo 开发#xff0c;在 2016 年开源#xff0c;并于2018年9月毕业成为 Apache 基金会的顶级项目。Pulsar 已经在 Yahoo 的生产环境使用了三年多#…一、快速入门
Pulsar 是一个分布式发布-订阅消息平台具有非常灵活的消息模型和直观的客户端 API。
最初由 Yahoo 开发在 2016 年开源并于2018年9月毕业成为 Apache 基金会的顶级项目。Pulsar 已经在 Yahoo 的生产环境使用了三年多主要服务于Mail、Finance、Sports、 Flickr、 the Gemini Ads platform、 Sherpa (Yahoo 的 KV 存储)。
号称云原生消息队列之王
主要特征
- 水平扩展
- 低延迟持久存储
- 多租户、认证、授权、配额
- 跨地域复制
- 主题的多种订阅模式独占共享和故障转移。
官网GitHub - apache/pulsar: Apache Pulsar - distributed pub-sub messaging system
B站入门视频 01-Apache Pulsar 课程介绍_哔哩哔哩_bilibili
SpringBoot使用starterGitHub - majusko/pulsar-java-spring-boot-starter: Simple pulsar spring boot starter with annotation based consumer/producer registration. 注意全局一个pulsar Client对象每一个topic生成对应的一个生产者和消费者对象因为不同topic可以灵活设置不同的特性。
消息传递
Pulsar基于publish-subscribe(pub-sub)生产订阅模式生产者将消息发布到Topic消费者可以订阅这些主题来处理消息并在处理完成后发送确认消息 消息生产者
发送模式
生产者将消息发布到Topic上发送消息分可为同步发送和异步发送两种模式 消息压缩 生产者发布消息在传输过程中会对数据进行压缩目前Pulsar支持的压缩方式有LZ4ZLIB, ZSTD, SNAPPY。如果启用了批处理生产者将在单个请求中累积一批消息进行发送批处理大小可以由最大消息数和最大发布延迟定义
批处理发送Batching 如果批处理开启producer将会累积一批消息然后通过一次请求发送出去。批处理的大小取决于最大的消息数量及最大的发布延迟。
消息消费者
消费模式
消费者从Topic上接收消息进行数据处理同样消息接收也分为同步接收和异步接收两种模式
消费确认ack
1消费者成功接收到消息时
当消费者成功处理完一条消息后会发送一个确认请求给broker告诉broker可以删除这条消息了,否则broker会一直存储这条消息。消息可以逐个确认也可以累积确认消费者只需要确认收到的最后一条消息这个流中所涉及到的所有消息都不会再重新传递给这个消费者。
2消费者不成功消费时
当消费者处理消息失败时会给broker发送一个失败确认这个时候broker就会给消费者重新发送这条消息失败确认可以逐条发送也可以累积发送这取决于消费订阅模式。在exclusive和failover订阅模式中消费者只会对收到的最后一条消息进行失败确认。在Pulsar客户端可以通过设置timeout的方式触发broker自动重新传递消息如果在timeout范围内消费者都没有发送确认请求那么broker就会自动重新发送这条消息给消费者。
3确认超时
如果某条消息一直处理失败就会触发broker一直重发这条消息给消费者使消费者无法处理其他消息Dead letter topic机制可以让消费者在无法成功消费某些消息时接收新的消息进行消费在这种机制下无法消费的消息存储在一个单独的topic中Dead letter topic用户可以决定如何处理这个topic中的消息。
消息的持久化
消息的持久化是通过BookKeeper实现的一旦创建了订阅关系Pulsar将保留所有的消息即使消费者断开了链接只有当消费者确认已经成功处理保留的消息时才会将这些消息丢弃消息。
消息的保留分为两种
1.在保留策略内的消息即使消费者已发送了确认也可以持久地存储在Pulsar中保留策略未涵盖的已确认消息将被删除如果没有保留策略所有已确认的消息都将被删除
2.设置消息到期时间会根据应用于namespace的TTL过期时间如果到期了即使消息没有被确认也会被删除
当有某条消息被重复发送时可以选择两种持久化策略
1.是将重复的消息也持久化到BookKeeper中
2.是判断如果是重复消息则不再进行持久化操作
租户tenant
Pulsar 从一开始就支持多租户topic 的名称是层级化的最上层是租户tenant
命名空间namespace
命名空间是租户内部逻辑上的命名术语。一个租户可以通过admin API创建多个命名空间。例如一个对接多个应用的租户可以为每个应用创建不同的namespace。 Topic
与其他pub-sub系统一样Pulsar中的topic被命名为从生产者向消费者传输消息的通道
{persistent|non-persistent}://tenant/namespace/topic producer写入不存在的主题时 了会在提供的命名空间下自动创建该主题。
常规topic只能由单个broker提供这限制了topic的最大吞吐量分区topic是由多个broker处理的一种特殊类型的topic它允许更高的吞吐量。分区topic和普通topic在订阅模式的工作方式上没有区别在创建主题时可以指定分区数。 消息路由模式
发布到分布分区topic主题时必须指定路由模式。默认三个路由模式默认轮询-和Kafka类似。 消息订阅模式subscription
Pulsar具有exclusivesharedfailoverKey_Shared 4种订阅模式 独占exclusive exclusive模式一个topic只允许一个消费者订阅否则会报错
在 exclusive 模式下一个 subscription 只允许被一个 consumer 用于订阅 topic 如果多个 consumer 使用相同的 subscription 去订阅同一个 topic则会发生错误。exclusive 是默认的订阅模式。如下图所示Consumer A-0 和 Consumer A-1 都使用了相同的 subscription相同的消费组只有 Consumer A-0 被允许消费消息。 故障转移|灾备failover failover模式多个消费者订阅同一个topic按照消费者名称进行排序第一个消费者时唯一接收到消息的消费者主消费者当主消费者断开连接时所有的后续消息都将发给下一个消费者
在 failover 模式下多个 consumer 允许使用同一个 subscription 去订阅 topic。但是对于给定的 topicbroker 将选择⼀个 consumer 作为该 topic 的主 consumer 其他 consumer 将被指定为故障转移 consumer 。当主 consumer 失去连接时topic 将被重新分配给其中⼀个故障转移 consumer ⽽新分配的 consumer 将成为新的主 consumer 。发⽣这种情况时所有未确认的消息都将传递给新的主 consumer 这个过程类似于 Kafka 中的 consumer 组重平衡rebalance。
如下图所示Consumer B-0 是 topic 的主 consumer 当 Consumer B-0 失去连接时Consumer B-1 才能成为新的主 consumer 去消费 topic。 共享shared shared模式多个消费者订阅同一个topic消息在消费者之间以循环的方式发送并且给定的某条消息只能发送给一个消费者当消费者断开连接时所有发送给它但没有确认的消息将重新安排发送给其他消费者
在 shared 模式下多个 consumer 可以使用同一个 subscription 去订阅 topic。消息以轮询的方式分发给 consumer 并且每条消费仅发送给一个 consumer 。当有 consumer 失去连接时所有发送给该 consumer 但未被确认的消息将被重新安排以便发送给该 subscription 上剩余的 consumer 。
但是消息不能保证有序以及不支持批量ack
如下图所示Consumer C-1Consumer C-2Consumer C-3 以轮询的方式接受消息。 共享键key_shared key_shared模式多个消费者订阅同一个topic消息以分布方式在消费者之间传递key, value具有相同key的消息传递给同一个消费者当这个消费者断开连接时将导致key对应的消费者更改
在 shared 模式下多个 consumer 可以使用同一个 subscription 去订阅 topic。消息按照 key 分发给 consumer 含有相同 key 的消息只被发送给同一个 consumer 。
如下图所示不同的 consumer 只接受到对应 key 的消息。 二、Pulsar原理架构
体系结构
在最高级别中一个Pulsar实例有一个或多个Pulsar集群组成实例中的集群可以彼此复制数据。在Pulsar集群中一个或多个broker处理和加载来自生产者传入的消息将消息发送给消费者与Pulsar配置存储通信以处理各种协调任务Pulsar集群架构如下所示包括一个或多个broker用于集群级配置和协调的Zookeeper用于持久存储消息的BookKeeper集群可以使用地理复制在集群间进行复制 Pulsar组件
Broker
Pulsar 的 broker 是一个无状态组件本身不存储数据。主要负责处理 producer 和 consumer 的请求消息的复制与分发数据的计算。可以理解成Broker 是 Pulsar 的自身实例
主要有2部分组成
1HTTP服务器向生产者和消费者公开用于管理任务和topic查找端的REST API
2调度程序异步TCP服务器通过用于所有数据传输的自定义二进制协议每个集群都有自己的本地Zookeeper用于存储集群特定的配置和协调如所有权元数据、代理加载报告、簿记员分类帐元数据等等。 Pulsar使用BookKeeper进行持久消息存储BookKeeper是一个分布式预写日志WAL系统。
除了消息数据外消费者的订阅位置cursor也可以持久地存储在BookKeeper中
每个 topic 的 partition 都会分配到某一个 borker 上producer 和 consumer 则会连接到这个 broker从而向该 topic 的 partition 发送和消费消息。broker 主要负责消息的复制与分发数据的计算。
zookeeper
主要用于存储元数据、集群配置任务的协调例如哪个 broker 负责哪个 topic服务的发现例如 broker 发现 bookie 的地址。
bookkeeper
主要用于数据的持久化存储。除了消息数据cursors游标 也会被持久化到 Bookeepercursors 是消费端订阅消费的位移。Bookeeper 中每一个存储节点叫做 bookie。
BookKeeper 是一种优化实时工作负载的存储服务具有可扩展、高容错、低延迟的特点。企业级的实时存储平台应符合以下几项要求
以极低的延迟小于 5 毫秒读写 entry 流能够持久、一致、容错地存储数据在写数据时能够进行流式传输或追尾传输有效地存储、访问历史数据与实时数据
数据存储
数据分区
写入主题的数据可能只有几个MB也有可能是几个TB。所以在某些情况下主题的吞吐量很低有时候又很高完全取决于消费者的数量。那么碰到有些主题吞吐量很高而有些又很低的情况该怎么处理为了解决这个问题Pulsar将一个主题的数据分布到多台机器上也就是所谓的分区。
在处理海量数据时为了保证高吞吐量分区是一种很常见的手段。默认情况下Pulsar的主题是不进行分区的但通过命令行工具或API可以很容易地创建分区主题并指定分区的数量。
在创建好分区主题之后Pulsar可以自动对数据进行分区不会影响到生产者和消费者。也就是说一个应用程序向一个主题写入数据对主题分区之后不需要修改应用程序的代码。分区只是一个运维操作应用程序不需要关心分区是如何进行的。【类似Kafka中的partition】
数据持久性
Pulsar broker在收到消息并进行确认之后就必须确保消息在任何情况下都不会丢失。与其他消息系统不同的是Pulsar使用Apache BookKeeper来保证持久性。BookKeeper提供了低延迟的持久化存储。Pulsar在收到消息之后将消息发送给多个BookKeeper节点具体由复制系数来定节点将数据写入预写式日志write ahead log同时在内存里也保存一份。节点在对消息进行确认之前强制将日志写入到持久化的存储上因此即使出现电力故障数据也不会丢失。因为Pulsar broker将数据发给了多个节点所以只会在大多数节点quorum确认写入成功之后它才会将确认消息发给生产者。Pulsar就是通过这种方式来保证即使在出现了硬件故障、网络故障或其他故障的情况下仍然能够保证数据不丢失。
三、集群搭建
从官网下载bin包修改配置组件集群搭建模式即可。
Set up a standalone Pulsar locally | Apache Pulsar
四、客户端使用
1.申请消费组【内部限制】
2.申请token
Java使用相关
依赖Pulsar官方版本 2.9.5
!-- in your properties block --
pulsar.version2.9.5/pulsar.version!-- in your dependencies block --
dependencygroupIdorg.apache.pulsar/groupIdartifactIdpulsar-client/artifactIdversion${pulsar.version}/version
/dependency
完整的官方指南Pulsar Java client | Apache Pulsar
生产者配置 public static void main(String[] args) throws PulsarClientException {// 1.1 建立客户端PulsarClientclient PulsarClient.builder().serviceUrl(pulsar://abc.demo.com:6651) // 集群连接地址请勿使用pulsar://host1,host2,host3的配置.authentication(AuthenticationFactory.token(************)).build();// 1.2 默认创建的生产者schema为byte[]Producerbyte[] producer client.newProducer().topic(persistent://test/test/my-topic).compressionType(LZ4).enableBatching(true) // 是否开启批量处理消息默认true。需要注意的是enableBatching只在异步发送sendAsync生效因此建议生产环境若想使用批处理则需使用异步发送.batchingMaxMessages(10) // 批处理中允许的最大消息数。默认1000建议设成10.batchingMaxBytes(4*1024*1024) // 一批消息的最大大小。建议设成4MB因为服务端有5MB的消息大小限制.batchingMaxPublishDelay(100, TimeUnit.MILLISECONDS) // 达到该时间就将一批消息发送出去默认10ms建议设大点100ms。可以理解为batch就算没攒够一旦过了这么多时间还是会发送出去.maxPendingMessages(1000).create();// 1.3 同步发送单条消息MessageId messageId producer.send(My message.getBytes());// 1.4 同步关闭生产者producer.close();client.close();System.out.println(发送pulsar消息成功: new String(messageId.toByteArray()));}
消费者配置 public static void main(String[] args) throws PulsarClientException {// 1.1 建立客户端PulsarClientclient PulsarClient.builder().serviceUrl(pulsar://abc.demo.com:6651) // 集群连接地址请勿使用pulsar://host1,host2,host3的配置.authentication(AuthenticationFactory.token(************)).build();// 1.2 创建消费者Consumerbyte[] consumer client.newConsumer(Schema.BYTES) // 消息schema需要与生产者配置一致默认是byte[].topic(persistent://test/test/my-topic).subscriptionName(my-subscription).subscriptionType(SubscriptionType.Shared) // 指定消费模式包含ExclusiveFailoverSharedKey_Shared。默认的是Exclusive模式.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) // 指定创建新订阅时cursor的初始位置默认Latest.subscribe();// 同步接收while (true) {Message msg consumer.receive();try {System.out.println(Message received: new String(msg.getData()));consumer.acknowledge(msg); // 消息需要ack了才不会再次接收} catch (Exception e) {consumer.negativeAcknowledge(msg);}}}
注意topic写上全路径名称如 persistent://test/test/test/web-test2-topic如果不填就默认到这个ns下persistent://public/default/my-topic
SpringBoot Starter使用
GitHub - majusko/pulsar-java-spring-boot-starter: Simple pulsar spring boot starter with annotation based consumer/producer registration.
1.引入依赖
dependencygroupIdio.github.majusko/groupIdartifactIdpulsar-java-spring-boot-starter/artifactIdversion1.1.2/version/dependency
2.定义生产者对象
Configuration
public class TestProducerConfiguration {Beanpublic ProducerFactory producerFactory() {return new ProducerFactory().addProducer(my-topic, MyMsg.class).addProducer(other-topic, String.class);}
}
在使用时可以通过模块来使用
Service
class MyProducer {Autowiredprivate PulsarTemplateMyMsg producer;void sendHelloWorld() throws PulsarClientException {producer.send(my-topic, new MyMsg(Hello world!));}
}
3.配置消费者
Service
class MyConsumer {PulsarConsumer(topicmy-topic, clazzMyMsg.class)void consume(MyMsg msg) {// TODO process your messageSystem.out.println(msg.getData());}// 批量消费PulsarConsumer(topic my-topic,clazzMyMsg.class,consumerName my-consumer,subscriptionName my-subscription,batch true)public void consumeString(MessagesMyMsg msgs) {msgs.forEach((msg) - {System.out.println(msg);});}// 批量消费和自动确认PulsarConsumer(topic my-topic,clazzMyMsg.class,consumerName my-consumer,subscriptionName my-subscription,batch true)public ListMessageId consumeString(MessagesMyMsg msgs) {ListMessageId ackList new ArrayList();msgs.forEach((msg) - {System.out.println(msg);ackList.add(msg.getMessageId());});return ackList;}// 批量消费和手动确认PulsarConsumer(topic my-topic,clazzMyMsg.class,consumerName my-consumer,subscriptionName my-subscription,batch true,batchAckMode BatchAckMode.MANUAL)public void consumeString(MessagesMyMsg msgs,ConsumerMyMsg consumer) {ListMessageId ackList new ArrayList();msgs.forEach((msg) - {try {System.out.println(msg);ackList.add(msg.getMessageId());} catch (Exception ex) {System.err.println(ex.getMessage());consumer.negativeAcknowledge(msg);}});consumer.acknowledge(ackList);}// 消费使用到元数据PulsarConsumer(topicmy-topic, clazzMyMsg.class)void consume(PulsarMessageMyMsg myMsg) { producer.send(TOPIC, msg.getValue()); }// 覆盖默认消费名称PulsarConsumer(topic my-topic,clazz MyMsg.class,consumerName my-consumer,subscriptionName my-subscription)// 支持 spel 表达式PulsarConsumer(topic ${my.custom.topic.name},clazz MyMsg.class,consumerName ${my.custom.consumer.name},subscriptionName ${my.custom.subscription.name})public void consume(MyMsg myMsg) {}
}
4.配置必要参数
#PulsarClient
pulsar.service-urlpulsar://localhost:6650
pulsar.io-threads10
pulsar.listener-threads10
pulsar.enable-tcp-no-delayfalse
pulsar.keep-alive-interval-sec20
pulsar.connection-timeout-sec10
pulsar.operation-timeout-sec15
pulsar.starting-backoff-interval-ms100
pulsar.max-backoff-interval-sec10
pulsar.consumer-name-delimiter
pulsar.namespacedefault
pulsar.tenantpublic
pulsar.auto-starttrue
pulsar.allow-interceptorfalse#Token based
pulsar.token-auth-value43th4398gh340gf34gj349gh304ghryj34fh#Consumer
pulsar.consumer.default.dead-letter-policy-max-redeliver-count-1
pulsar.consumer.default.ack-timeout-ms3000
5.错误处理
Service
public class PulsarErrorHandler {Autowiredprivate ConsumerAggregator aggregator;EventListener(ApplicationReadyEvent.class)public void pulsarErrorHandler() {aggregator.onError(failedMessage -failedMessage.getException().printStackTrace());}
}
6.自定义拦截器 pulsar.allow-interceptortrue Consumer Interceptor Example:
Component
public class PulsarConsumerInterceptor extends DefaultConsumerInterceptorObject {Overridepublic Message beforeConsume(ConsumerObject consumer, Message message) {System.out.println(do something);return super.beforeConsume(consumer, message);}
}
Producer Interceptor Example:
Component
public class PulsarProducerInterceptor extends DefaultProducerInterceptor {Overridepublic Message beforeSend(Producer producer, Message message) {super.beforeSend(producer, message);System.out.println(do something);return message;}Overridepublic void onSendAcknowledgement(Producer producer, Message message, MessageId msgId, Throwable exception) {super.onSendAcknowledgement(producer, message, msgId, exception);}
}
五、常见QA
1. 消费者消费不了消息抛的异常和schema相关
检查消费者的schema和topic生产者的schema是否一致如果不配置schema默认是byte[]字节流
Schema在new生产者或消费者的时候配置例如下列demo
Producerbyte[] producer pulsarClient.newProducer() // 默认byte[]
Consumerbyte[] consumer client.newConsumer(Schema.BYTES) ProducerUser producer client.newProducer(JSONSchema.of(User.class)) // schema为json
ConsumerUser consumer client.newConsumer(JSONSchema.of(User.class))
2. 怎么确定pulsar消息生产成功
用消费者尝试从earliest开始消费看是否能消费出消息