Go之NSQ简介,原理和使用 (2)

nsqadmin: 简单的管理界面,展示了topic, channel以及channel上的消费者,也可以创建topic,channel

/* 特性: 1. 提供一个对topic和channel统一管理的操作界面以及各种实时监控数据的展示,界面设计的很简洁,操作也很简单 2. 展示所有message的数量 3. 能够在后台创建topic和channel 4. nsqadmin的所有功能都必须依赖于nsqlookupd,nsqadmin只是向nsqlookupd传递用户操作并展示来自nsqlookupd的数据 */ NSQ工作模式

Go之NSQ简介,原理和使用

Topic和Channel

每个nsqd实例旨在一次处理多个数据流。这些数据流称为“topics”,一个topic具有1个或多个“channels”。每个channel都会收到topic所有消息的副本,实际上下游的服务是通过对应的channel来消费topic消息。

topic和channel不是预先配置的。topic在首次使用时创建,方法是将其发布到指定topic,或者订阅指定topic上的channel。channel是通过订阅指定的channel在第一次使用时创建的。

topic和channel都相互独立地缓冲数据,防止缓慢的消费者导致其他chennel的积压(同样适用于topic级别)。

channel可以并且通常会连接多个客户端。假设所有连接的客户端都处于准备接收消息的状态,则每条消息将被传递到随机客户端。例如:

Go之NSQ简介,原理和使用

生产者向某个topic中发送消息,如果topic有一个或者多个channle,那么该消息会被复制多分发送到每一个channel中。类似 rabbitmq中的fanout类型,channle类似队列。 官方说 nsq 是分布式的消息队列服务,但是在我看来只有channel到消费者这部分提现出来分布式的感觉,nsqd 这个模块其实就是单点的,nsqd 将 topic、channel、以及消息都存储在了本地磁盘,官方还建议一个生产者使用一个 nsqd,这样不仅浪费资源还没有数据备份的保障。一旦 nsqd 所在的主机磁损坏,数据都将丢失。

总而言之,消息是从topic--> channel (每个channel接受该topic的所有消息的副本)多播的,但是从channel --> consumers均匀分布 (每个消费者接受该channel的一部分消息)

NSQ接受和发送消息流程

Go之NSQ简介,原理和使用

Centos安装NSQ 下载 wget https://s3.amazonaws.com/bitly-downloads/nsq/nsq-1.2.0.linux-amd64.go1.12.9.tar.gz tar xf nsq-1.2.0.linux-amd64.go1.12.9.tar.gz -C /usr/local/ 本地解析Hosts [root@nsq-47 ~]# tail -1 /etc/hosts 192.168.43.47 nsq-47 启动 # 开三个终端,分别按顺序启动 ./nsqlookupd ./nsqd --lookupd-tcp-address=192.168.43.47:4160 ./nsqadmin --lookupd-http-address=192.168.43.47:4161 # 访问 :4171

Go之NSQ简介,原理和使用

Go操作NSQ 安装go客户端 /* go get -u github.com/nsqio/go-nsq */ 生产者 // nsq_producer/main.go package main import ( "bufio" "fmt" "os" "strings" "github.com/nsqio/go-nsq" ) // NSQ Producer Demo var producer *nsq.Producer // 初始化生产者 func initProducer(str string) (err error) { config := nsq.NewConfig() producer, err = nsq.NewProducer(str, config) if err != nil { fmt.Printf("create producer failed, err:%v\n", err) return err } return nil } func main() { nsqAddress := "192.168.43.47:4150" err := initProducer(nsqAddress) if err != nil { fmt.Printf("init producer failed, err:%v\n", err) return } reader := bufio.NewReader(os.Stdin) // 从标准输入读取 for { data, err := reader.ReadString('\n') if err != nil { fmt.Printf("read string from stdin failed, err:%v\n", err) continue } data = strings.TrimSpace(data) if strings.ToUpper(data) == "Q" { // 输入Q退出 break } // 向 'topic_demo' publish 数据 err = producer.Publish("topic_demo", []byte(data)) if err != nil { fmt.Printf("publish msg to nsq failed, err:%v\n", err) continue } } } 消费者

在/nodes这个页面我们能够很方便的查看当前接入lookupd的nsqd节点。

Go之NSQ简介,原理和使用

这个/counter页面显示了处理的消息数量,因为我们没有接入消费者,所以处理的消息数量为0。

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/wsxdfs.html