普通网站逻辑设计数据流图,在建设网站入账,网站图片优化,北京购物网站建设Apache Pulusar是一个分布式、多租户、高性能的发布/订阅#xff08;Pub/Sub#xff09;消息系统#xff0c;最初由Yahoo开发并开源。它结合了Kafka和传统消息队列的优点#xff0c;提供高吞吐量、低延迟、强一致性和可扩展的消息传递能力#xff0c;适用于大规模分布式系…Apache Pulusar是一个分布式、多租户、高性能的发布/订阅Pub/Sub消息系统最初由Yahoo开发并开源。它结合了Kafka和传统消息队列的优点提供高吞吐量、低延迟、强一致性和可扩展的消息传递能力适用于大规模分布式系统的实时数据处理和异步通信。 Pulsar的架构设计结合了消息队列和流处理的特点既可以作为传统消息队列使用也可以作为流处理平台支持实时数据处理。
主要特点
分布式架构Pulsar采用分层架构将消息存储与代理服务分离提供了更好的水平扩展能力和故障隔离。多租户支持Pulsar支持多租户部署不同租户可以共享同一集群同时保证资源隔离和安全性。持久化和一致性Pulsar支持消息的持久化存储并通过BookKeeper提供强一致性保障。灵活的消息模型Pulsar支持多种消息传递模式包括Pub/Sub、P2P和Key_Shared订阅模式。多语言支持Pulsar提供了多种编程语言的客户端库如Java、Python、Go、C等。丰富的生态Pulsar拥有活跃的社区和丰富的生态系统支持与其他工具和服务集成如Kafka Connect、Flink、Spark等。
1、核心概念
1、命名空间Namespace
命名空间是Pulsar中的一个逻辑单元用于组织和管理主题Topic。每个命名空间可以包含多个主题并且可以为不同的命名空间设置不同的配置例如保留策略、订阅类型等。命名空间通常用于实现多租户隔离。
2、主题Topic
主题是Pulsar中的消息通道生产者Producer将消息发送到主题消费者Consumer从主题中消费消息。
Pulsar支持两种类型的主题
持久化主题Persistent Topic消息会被持久化存储确保即使在broker故障的情况下也不会丢失。非持久化主题Non-Persistent Topic消息不会被持久化存储适用于对延迟敏感但对可靠性要求较低的场景。
3、订阅Subscription
订阅是消费者与主题之间的绑定关系。Pulsar支持多种订阅类型每种订阅类型决定了消息的分发方式
独占订阅Exclusive Subscription只有一个消费者可以订阅该主题其他消费者无法订阅。共享订阅Shared Subscription多个消费者可以订阅同一个主题消息会被轮询分发给不同的消费者。故障转移订阅Failover Subscription多个消费者可以订阅同一个主题但只有一个是活跃的消费者其他消费者作为备用。当活跃消费者失败时备用消费者会接管消息消费。Key_Shared 订阅基于消息的key进行分区确保相同key的消息总是被分发给同一个消费者。
4、消息Message
消息是Pulsar中的基本数据单位由生产者发送到主题。
每个消息可以包含以下属性
消息体Payload消息的实际内容可以是任意二进制数据。消息IDMessage ID唯一标识每条消息的ID用于确认消息的消费状态。属性Properties用户可以为消息添加自定义的键值对属性方便后续处理。时间戳Timestamp消息的创建时间或发送时间。
5、分区Partition
Pulsar支持主题分区即将一个主题划分为多个分区每个分区可以独立地处理消息。分区可以提高主题的吞吐量和并发性特别是在高负载场景下。Pulsar会自动将消息均匀分布到不同的分区中。
6、Broker
Broker是Pulsar的核心组件之一负责接收生产者的消息并将其分发给消费者。注意Broker不直接存储消息而是将消息委托给BookKeeper进行持久化存储。Broker负责管理主题、订阅和消费者的连接并处理消息的路由和分发。
7、BookKeeper
BookKeeper是Pulsar的持久化存储层负责将消息持久化到磁盘。BookKeeper采用分布式日志存储机制提供了高可用性和强一致性保障。每个消息会被写入多个BookKeeper节点确保即使部分节点故障也不会丢失数据。
8、ZooKeeper
ZooKeeper是Pulsar的元数据管理组件用于存储集群的配置信息、主题和命名空间的元数据、以及Broker和BookKeeper的状态信息。ZooKeeper提供了分布式协调服务确保Pulsar集群的一致性和可靠性。
2、架构设计
Pulsar的架构设计采用了分层结构将消息存储与代理服务分离使得系统更加模块化和可扩展。
结构示例图
Pulsar的主要组件及其作用 Broker负责接收生产者的消息并将其分发给消费者。Broker不直接存储消息而是将消息委托给BookKeeper进行持久化存储。Broker还负责管理主题、订阅和消费者的连接。 BookKeeper即上图BK Client。负责将消息持久化到磁盘提供高可用性和强一致性保障。BookKeeper采用分布式日志存储机制确保消息的安全性和可靠性。 BookieBookie是BookKeeper的存储节点组成持久化地存储消息。BookKeeper采用分布式日志存储的方式将消息以日志的形式存储在多个Bookie节点上。这种设计确保了消息的可靠性和持久性即使在节点故障的情况下也能保证消息不丢失。 ZooKeeper负责存储集群的元数据包括主题、命名空间、Broker和BookKeeper的状态信息。ZooKeeper提供了分布式协调服务确保集群的一致性和可靠性。 Proxy可选Pulsar提供了一个可选的代理层Proxy允许客户端通过HTTP或WebSocket协议与Pulsar集群进行通信。Proxy可以简化客户端的连接管理并提供跨区域访问的能力。 FunctionPulsar提供了一个轻量级的流处理框架Pulsar Functions允许用户编写简单的流处理逻辑并将其部署到Pulsar集群中。Pulsar Functions可以用于实时数据处理、事件驱动计算等场景。 SQLPulsar提供了一个SQL查询引擎Pulsar SQL允许用户通过SQL语句查询Pulsar中的消息数据。Pulsar SQL可以用于数据分析、监控和告警等场景。
3、特性与优势
1、高吞吐量和低延迟
Pulsar采用了分层架构将消息存储与代理服务分离使得系统能够同时具备高吞吐量和低延迟。Broker负责处理消息的路由和分发而BookKeeper负责持久化存储两者相互协作确保消息的高效传递。
2、多租户支持
Pulsar支持多租户部署不同租户可以共享同一集群同时保证资源隔离和安全性。每个租户可以拥有自己的命名空间并可以根据需要设置不同的配置例如保留策略、订阅类型等。 即类似Nacos的命名空间实现配置服务等隔离。
3、持久化和一致性
Pulsar支持消息的持久化存储并通过BookKeeper提供强一致性保障。每个消息会被写入多个Bookie节点确保即使部分节点故障也不会丢失数据。Pulsar还支持事务和幂等性确保消息的可靠传递。
4、灵活的消息模型
Pulsar支持多种消息传递模式包括Pub/Sub、P2P和Key_Shared订阅模式。用户可以根据实际需求选择合适的订阅类型满足不同的业务场景。Pulsar还支持消息的重播、回溯和跳过等功能方便用户进行调试和故障排查。
5、多语言支持
Pulsar提供了多种编程语言的客户端库包括Java、Python、Go、C等。用户可以根据自己的技术栈选择合适的客户端库快速集成Pulsar到应用程序中。
6、丰富的生态
Pulsar拥有活跃的社区和丰富的生态系统支持与其他工具和服务集成。例如Pulsar可以与Kafka Connect、Flink、Spark等工具集成实现数据的实时处理和分析。Pulsar还提供了Pulsar Functions和Pulsar SQL等功能进一步扩展了其应用场景。
4、应用场景
1、实时数据处理
Pulsar的高吞吐量和低延迟特性使其非常适合用于实时数据处理场景。例如电商网站可以使用Pulsar来处理订单、支付、库存等实时数据确保数据的及时性和准确性。
2、物联网IoT
Pulsar的分布式架构和多租户支持使其非常适合用于物联网场景。物联网设备可以将传感器数据发送到PulsarPulsar可以将这些数据分发给不同的消费者进行处理。Pulsar还支持消息的重播和回溯功能方便用户进行历史数据分析。
3、微服务架构
Pulsar可以作为微服务之间的消息总线实现服务间的异步通信。微服务可以通过Pulsar发送和接收消息避免阻塞主线程提高系统的响应速度和稳定性。
4、日志收集和监控
Pulsar可以用于日志收集和监控场景将应用的日志数据发送到PulsarPulsar可以将这些数据分发给不同的消费者进行处理。Pulsar还支持消息的持久化存储确保日志数据不会丢失。
5、事件驱动架构
Pulsar支持事件驱动架构用户可以将事件发送到PulsarPulsar可以将这些事件分发给不同的消费者进行处理。Pulsar还支持消息的重播和回溯功能方便用户进行事件的回放和调试。
5、代码示例
1、生产者示例
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.MessageId;public class PulsarProducerExample {public static void main(String[] args) throws Exception {// 1、创建Pulsar客户端try (PulsarClient client PulsarClient.builder().serviceUrl(pulsar://localhost:6650).build()) {// 2、创建生产者try (Producerbyte[] producer client.newProducer().topic(persistent://public/default/example-topic) // 指定主题.create()) {// 3、发送消息for (int i 0; i 10; i) {String message Hello, Pulsar! i;MessageId msgId producer.send(message.getBytes()); // 发送消息System.out.println( [x] Sent message: message , msgId: msgId);}}}}
}2、消费者示例
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.SubscriptionType;public class PulsarConsumerExample {public static void main(String[] args) throws Exception {// 1、创建Pulsar客户端try (PulsarClient client PulsarClient.builder().serviceUrl(pulsar://localhost:6650).build()) {// 2、创建消费者try (Consumerbyte[] consumer client.newConsumer().topic(persistent://public/default/example-topic) // 监听的主题.subscriptionName(example-subscription).subscriptionType(SubscriptionType.Shared).subscribe()) {// 3、接收和消费消息while (true) { // 利用循环接收消息Messagebyte[] msg consumer.receive(); // 具体接收消息try {System.out.println( [x] Received message: new String(msg.getData()));consumer.acknowledge(msg); // 4、确认消息已消费} catch (Exception e) {consumer.negativeAcknowledge(msg); // 5、处理失败重新投递}}}}}
}6、Pulsar总结
Apache Pulsar是一个功能强大、架构灵活的消息系统特别适合大规模分布式系统的实时数据处理和异步通信。它的分层架构、多租户支持、持久化和一致性保障、灵活的消息模型等特点使其在性能、可靠性和可扩展性方面表现出色。Pulsar还拥有丰富的生态系统支持与其他工具和服务集成适用于多种应用场景。
乘风破浪会有时直挂云帆济沧海