nsqadmin: 简单的管理界面,展示了topic, channel以及channel上的消费者,也可以创建topic,channel
/*
特性:
1. 提供一个对topic和channel统一管理的操作界面以及各种实时监控数据的展示,界面设计的很简洁,操作也很简单
2. 展示所有message的数量
3. 能够在后台创建topic和channel
4. nsqadmin的所有功能都必须依赖于nsqlookupd,nsqadmin只是向nsqlookupd传递用户操作并展示来自nsqlookupd的数据
*/
NSQ工作模式

Topic和Channel
每个nsqd实例旨在一次处理多个数据流。这些数据流称为“topics”,一个topic具有1个或多个“channels”。每个channel都会收到topic所有消息的副本,实际上下游的服务是通过对应的channel来消费topic消息。
topic和channel不是预先配置的。topic在首次使用时创建,方法是将其发布到指定topic,或者订阅指定topic上的channel。channel是通过订阅指定的channel在第一次使用时创建的。
topic和channel都相互独立地缓冲数据,防止缓慢的消费者导致其他chennel的积压(同样适用于topic级别)。
channel可以并且通常会连接多个客户端。假设所有连接的客户端都处于准备接收消息的状态,则每条消息将被传递到随机客户端。例如:

生产者向某个topic中发送消息,如果topic有一个或者多个channle,那么该消息会被复制多分发送到每一个channel中。类似 rabbitmq中的fanout类型,channle类似队列。 官方说 nsq 是分布式的消息队列服务,但是在我看来只有channel到消费者这部分提现出来分布式的感觉,nsqd 这个模块其实就是单点的,nsqd 将 topic、channel、以及消息都存储在了本地磁盘,官方还建议一个生产者使用一个 nsqd,这样不仅浪费资源还没有数据备份的保障。一旦 nsqd 所在的主机磁损坏,数据都将丢失。
总而言之,消息是从topic--> channel (每个channel接受该topic的所有消息的副本)多播的,但是从channel --> consumers均匀分布 (每个消费者接受该channel的一部分消息)
NSQ接受和发送消息流程

Centos安装NSQ
下载
wget https://s3.amazonaws.com/bitly-downloads/nsq/nsq-1.2.0.linux-amd64.go1.12.9.tar.gz
tar xf nsq-1.2.0.linux-amd64.go1.12.9.tar.gz -C /usr/local/
本地解析Hosts
[root@nsq-47 ~]# tail -1 /etc/hosts
192.168.43.47 nsq-47
启动
# 开三个终端,分别按顺序启动
./nsqlookupd
./nsqd --lookupd-tcp-address=192.168.43.47:4160
./nsqadmin --lookupd-http-address=192.168.43.47:4161
# 访问
:4171

Go操作NSQ
安装go客户端
/*
go get -u github.com/nsqio/go-nsq
*/
生产者
// nsq_producer/main.go
package main
import (
"bufio"
"fmt"
"os"
"strings"
"github.com/nsqio/go-nsq"
)
// NSQ Producer Demo
var producer *nsq.Producer
// 初始化生产者
func initProducer(str string) (err error) {
config := nsq.NewConfig()
producer, err = nsq.NewProducer(str, config)
if err != nil {
fmt.Printf("create producer failed, err:%v\n", err)
return err
}
return nil
}
func main() {
nsqAddress := "192.168.43.47:4150"
err := initProducer(nsqAddress)
if err != nil {
fmt.Printf("init producer failed, err:%v\n", err)
return
}
reader := bufio.NewReader(os.Stdin) // 从标准输入读取
for {
data, err := reader.ReadString('\n')
if err != nil {
fmt.Printf("read string from stdin failed, err:%v\n", err)
continue
}
data = strings.TrimSpace(data)
if strings.ToUpper(data) == "Q" { // 输入Q退出
break
}
// 向 'topic_demo' publish 数据
err = producer.Publish("topic_demo", []byte(data))
if err != nil {
fmt.Printf("publish msg to nsq failed, err:%v\n", err)
continue
}
}
}
消费者
在/nodes这个页面我们能够很方便的查看当前接入lookupd的nsqd节点。

这个/counter页面显示了处理的消息数量,因为我们没有接入消费者,所以处理的消息数量为0。