“Pulsar is a distributed pub-sub messaging platform with a very flexible messaging model and an intuitive client API.”
Pulsar是pub-sub模式的分布式消息平台,拥有灵活的消息模型和直观的客户端API。
Pulsar由雅虎开发并开源的下一代消息系统,目前是Apache软件基金会的孵化器项目。
本片文章简单介绍Pulsar的Producer,包含以下内容:
Producer的设计
消息发送的实现
1. Producer设计 1.1 创建Producer以上是Pulsar官网上创建一个Producer的示例代码。
创建的过程如下:
指定serviceUrl创建PulsarClient
指定Producer发送消息的Topic,通过PulsarClient创建Producer
通过上述的创建代码可以推测:
serviceUrl应该是用于做服务发现的,通过serviceUrl查找Broker的信息
Producer指定了Topic,那么一个Producer只能往特定的Topic发送消息
1.2 Producer APIPulsar中,发送相关的接口为Producer,如上图所示:
Producer定义了发送接口
ProducerBase作为抽象类,提供了基础实现
ProducerImpl则是真正的实现类
PartitionedProducerImpl看着和分区相关,这个之后再看
Producer 接口具体如下:
public interface Producer<T> extends Closeable { /** * 返回Producer发送消息的Topic */ String getTopic(); /** * Producer的名称 */ String getProducerName(); /** * 同步发送消息 */ MessageId send(T message) throws PulsarClientException; /** * 有发送消息 */ CompletableFuture<MessageId> sendAsync(T message); /** * Flush客户端完成中的消息并等待所有消息成功持久化 * @since 2.1.0 * @see #flushAsync() */ void flush() throws PulsarClientException; /** * 异步Flush客户端完成中的消息并等待所有消息成功持久化 * @since 2.1.0 * @see #flush() */ CompletableFuture<Void> flushAsync(); /** * 创建TypedMessageBuilder,用于构建消息 */ TypedMessageBuilder<T> newMessage(); /** * 同步发送消息,已经被弃用 */ @Deprecated MessageId send(Message<T> message) throws PulsarClientException; /** * 异步发送消息,已经被弃用 */ @Deprecated CompletableFuture<MessageId> sendAsync(Message<T> message); /** * 获取Producer发送的最后一个序列号 */ long getLastSequenceId(); /** * 获取Producer的统计信息 */ ProducerStats getStats(); /** * 异步关闭Producer并且释放资源 */ CompletableFuture<Void> closeAsync(); /** * 返回Producer是否连接到Broker上 */ boolean isConnected(); }