# pulsar简介

plusar是和kafak同类型的消息处理平台,这两年开始走入大众视野。相比于老牌kafka,新秀pulsar带来了一些让我喜欢的新功能:

  1. 多种订阅方式: 独占、灾备、共享、Key_Shared
  2. 延迟发送: 消息发送到topic后,consumer过一段时间才消费到这条消息
  3. 易用: 不需要像kafka那样去计算分区、副本数量等,pulsar不够用了直接无脑扩broker

详细的说明就不赘述了,我们来快速使用体验下。

# 安装

在安装前需要有java环境,本次使用的系统是Ubuntu 20.04, java版本是17.02。

本次使用的pulsar版本是2.10.1。

  1. 获取安装包
$ wget https://archive.apache.org/dist/pulsar/pulsar-2.10.1/apache-pulsar-2.10.1-bin.tar.gz
  1. 解压
$ tar xvfz apache-pulsar-2.10.1-bin.tar.gz
$ cd apache-pulsar-2.10.1
  1. 启动服务端
$ ./bin/pulsar standalone

看到如下信息就是启动成功了:

2022-08-13T21:34:46,652+0800 [worker-scheduler-0] INFO  org.apache.pulsar.functions.worker.SchedulerManager - Schedule summary - execution time: 0.033926031 sec | total unassigned: 0 | stats: {"Added": 0, "Updated": 0, "removed": 0}
{
  "c-standalone-fw-localhost-8080" : {
    "originalNumAssignments" : 0,
    "finalNumAssignments" : 0,
    "instancesAdded" : 0,
    "instancesRemoved" : 0,
    "instancesUpdated" : 0,
    "alive" : true
  }
}

如果想后台启动可以执行如下命令:

$ ./bin/pulsar-daemon start standalone

# 使用

  1. 查看健康状态
$ ./bin/pulsar-admin brokers healthcheck
ok
  1. 发送一个消息
$ ./bin/pulsar-client produce k-topic --messages "hello test"

如果正常发送的话可以看到如下提示:

2022-08-13T21:43:50,559+0800 [main] INFO  org.apache.pulsar.client.cli.PulsarClientTool - 1 messages successfully produced
  1. 消费一个消息
$ ./bin/pulsar-client consume j-topic -s "first-subscription"

如果正常发送的话可以看到刚才发的那个消息:

----- got message -----
key:[null], properties:[], content:hello test

可以看到整个过程我们不需要预先做其他的设置,不需要手动去处理zookeeper,启动pulsar后就可以直接使用了。

# Go使用

pulsar官方客户端除了有java之外,还有Python、go、c++、nodejs、C#,还支持websocket、以及REST api 来发送消息,可以说是很方便了。

我们来用go试试。

  1. 准备好相关扩展

创建一个测试目录,并初始化go mod:

$ mkdir test_dir && cd test_dir
$ go mod init test_dir 
$ go mod tidy 

安装pulsar客户端:

$ go get -u "github.com/apache/pulsar-client-go/pulsar"
  1. 创建生产者
package main

import (
	"context"
	"fmt"
	"github.com/apache/pulsar-client-go/pulsar"
	"log"
	"time"
)

func main() {

	// 创建客户端
	client, err := pulsar.NewClient(pulsar.ClientOptions{
		URL:               "pulsar://localhost:6650",
		OperationTimeout:  30 * time.Second,
		ConnectionTimeout: 30 * time.Second,
	})
	if err != nil {
		log.Fatalf("Could not instantiate Pulsar client: %v", err)
	}

	// 创建生产者
	producer, err := client.CreateProducer(pulsar.ProducerOptions{
		Topic: "go-topic",
	})
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()

	// 每隔3秒发一次消息
	num := 0
	for {
		msg := fmt.Sprintf("hello %d", num)
		_, err = producer.Send(context.Background(), &pulsar.ProducerMessage{
			Payload: []byte(msg),
		})
		num += 1
		time.Sleep(time.Duration(3) * time.Second)
	}
}
  1. 创建消费者
package main

import (
	"context"
	"fmt"
	"github.com/apache/pulsar-client-go/pulsar"
	"log"
	"time"
)

func main() {

	// 创建客户端
	client, err := pulsar.NewClient(pulsar.ClientOptions{
		URL:               "pulsar://localhost:6650",
		OperationTimeout:  30 * time.Second,
		ConnectionTimeout: 30 * time.Second,
	})
	if err != nil {
		log.Fatalf("Could not instantiate Pulsar client: %v", err)
	}

	// 创建消费者
	consumer, err := client.Subscribe(pulsar.ConsumerOptions{
		Topic:            "go-topic",
		SubscriptionName: "my-sub",
		Type:             pulsar.Shared,
	})
	if err != nil {
		log.Fatal(err)
	}
	defer consumer.Close()

	// 消费消息
	for {
		msg, err := consumer.Receive(context.Background())
		if err != nil {
			log.Fatal(err)
		}

		fmt.Printf("Received message msgId: %#v -- content: '%s'\n",
			msg.ID(), string(msg.Payload()))

		consumer.Ack(msg)
	}
}

  1. 打包测试
$ go build -o consumer consumer.go
$ go build -o produce produce.go
$ chmod +x consumer produce

启动一个生产者

$ ./produce 
INFO[0000] [Connecting to broker]                        remote_addr="pulsar://localhost:6650"
INFO[0000] [TCP connection established]                  local_addr="127.0.0.1:33210" remote_addr="pulsar://localhost:6650"
INFO[0000] [Connection is ready]                         local_addr="127.0.0.1:33210" remote_addr="pulsar://localhost:6650"
INFO[0000] [Created producer]                            cnx="127.0.0.1:33210 -> 127.0.0.1:6650" producerID=1 producer_name=standalone-2-9 topic="persistent://public/default/go-topic"

再启动一个消费者,获取消息

$ ./consumer 
INFO[0000] [Connecting to broker]                        remote_addr="pulsar://localhost:6650"
INFO[0000] [TCP connection established]                  local_addr="127.0.0.1:33212" remote_addr="pulsar://localhost:6650"
INFO[0000] [Connection is ready]                         local_addr="127.0.0.1:33212" remote_addr="pulsar://localhost:6650"
INFO[0000] [Connected consumer]                          consumerID=1 name=xgqua subscription=my-sub topic="persistent://public/default/go-topic"
INFO[0000] [Created consumer]                            consumerID=1 name=xgqua subscription=my-sub topic="persistent://public/default/go-topic"
Received message msgId: pulsar.trackingMessageID{messageID:pulsar.messageID{ledgerID:242, entryID:5, batchIdx:0, partitionIdx:0}, tracker:(*pulsar.ackTracker)(nil), consumer:(*pulsar.partitionConsumer)(0xc000274ea0), receivedTime:time.Time{wall:0xc0b60af1958c2114, ext:2499737025, loc:(*time.Location)(0x1024400)}} -- content: 'hello 4'
Received message msgId: pulsar.trackingMessageID{messageID:pulsar.messageID{ledgerID:242, entryID:6, batchIdx:0, partitionIdx:0}, tracker:(*pulsar.ackTracker)(nil), consumer:(*pulsar.partitionConsumer)(0xc000274ea0), receivedTime:time.Time{wall:0xc0b60af255a034ea, ext:5501052821, loc:(*time.Location)(0x1024400)}} -- content: 'hello 5'
Received message msgId: pulsar.trackingMessageID{messageID:pulsar.messageID{ledgerID:242, entryID:7, batchIdx:0, partitionIdx:0}, tracker:(*pulsar.ackTracker)(nil), consumer:(*pulsar.partitionConsumer)(0xc000274ea0), receivedTime:time.Time{wall:0xc0b60af315c8fc87, ext:8503725362, loc:(*time.Location)(0x1024400)}} -- content: 'hello 6'

今天的介绍就到这里了,感谢阅读。