简介
NSQ 是 Go 语言编写的,开源的分布式消息队列中间件,其设计的目的是用来大规模地处理每天数以十亿计级别的消息。NSQ 具有分布式和去中心化拓扑结构,该结构具有无单点故障、故障容错、高可用性以及能够保证消息的可靠传递的特征,是一个成熟的、已在大规模生成环境下应用的产品。
NSQ 在国内公司用的很少,在使用当中愈发的觉得惊喜,比如他的简单易用、部署快捷,再比如之前比较困扰的 延时定时消息,发现 nsq 也支持,官方文档比较全,咨询问题时回复也非常的耐心和即时,所以我觉得有必要发布一篇文章来介绍下 nsq,惠及大众。
nsq 有三个必要的组建 nsqd、nsqlookupd、nsqadmin 其中 nsqd 和 nsqlookup 是必须部署的 下面我们一一介绍。
nsqd
负责接收消息,存储队列和将消息发送给客户端,nsqd 可以多机器部署,当你使用客户端向一个 topic 发送消息时,可以配置多个 nsqd 地址,消息会随机的分配到各个 nsqd 上,nsqd 优先把消息存储到内存 channel 中,当内存 channel 满了之后,则把消息写到磁盘文件中。他监听了两个 tcp 端口,一个用来服务客户端,一个用来提供 http 的接口 ,nsqd 启动时置顶下 nsqlookupd 地址即可:
nsqd –lookupd-tcp-address=127.0.0.1:4160
也可以指定端口 与数据目录
nsqd –lookupd-tcp-address=127.0.0.1:4160 --broadcast-address=127.0.0.1 -tcp-address=127.0.0.1:4154 -http-address=”0.0.0.0:4155″ –data-path=/data/nsqdata
其他配置项可详见官网
nsqlookupd
主要负责服务发现 负责 nsqd 的心跳、状态监测,给客户端、nsqadmin 提供 nsqd 地址与状态
nsqadmin:
nsqadmin 是一个 web 管理界面 启动方式如下:
nsqadmin –lookupd-http-address=127.0.0.1:4161
channel 详情页示例图如下 ,empty 可以清空当前 channel 的信息,delete 删除当前 channel, pause 是暂停消息消费。
图中也有几个比较重要的参数 depth 当前的积压量,in-flight 代表已经投递还未消费掉的消息,deferred 是未消费的定时(延时)消息数,ready count 比较重要,go 的客户端是通过设置 max-in-flight 除以客户端连接数得到的,代表一次推给客户端多少条消息,或者客户端准备一次性接受多少条消息,谨慎设置其值,因为可能造成服务器压力,如果消费能力比较弱,rdy 建议设置的低一点比如 3
Topic 和 Channel
其实 nsqd 相当于 kafka 当中的分区,channel 和 consumers 客户端的多个连接 相当于 kafka 的消费组,但 nsq 比 kafka 使用方式便捷概念上更容易理解
抛开与 kafka 的对比,nsq 的 topic 可以设置多个 channel,因为有可能有多个业务方需要定值 topic 的消息,这样互不影响,
当然一个消息会发送 topic 下的所有 channel, 然后会分配到不同客户端的连接上,如下图。
这篇文章主要介绍 nsq 的使用,源码就不展开讲,如果有兴趣的同学多的话 过几天我会再开一篇专门叙述 nsq 的源码与分析。
这里提下延时消息
nsq 支持延时消息的投递,比如我想这条消息 5 分钟之后才被投递出去被客户端消费,较于普通的消息投递,多了个毫秒数,默认支持最大的毫秒数为 3600000 毫秒也就是 60 分钟,不过这个值可以在 nsqd 启动的时候 用 -max-req-timeout 参数修改最大值。
延时消息可用于以下场景,比如一个订单超过 30 分钟未付款,修改其状态 或者给客户发短信提醒,比如之前看到的滴滴打车订单完成后 一定时间内未评价的可以未其设置默认值,再比如用户的积分过期,等等场景避免了全表扫描,异步处理,kafka 不支持延时消息的投递,目前知道支持的有 rabbitmq rocketmq, 但是 rabbitmq 有坑,有可能会超时投递,而 rocketmq 只有阿里云付费版支持的比较好。
nsq 延时消息的实现是用最小堆算法完成,作者继承实现 heap 的一系类接口,专门写了一个 pqueque 最小堆的优先队列,在 internal/pequeque 目录可以看到相关实现,pub 的时候如果 chanMsg.deferred != 0 则会调用 channel.PutMessageDeferred 方法,最终会调用继承了 go heap 接口的 pqueque.push 方法
延时消息的处理 和普通消息一样都是 nsqd/protocol_v2.go 下 messagePump 中把消息发送给客户端 然后在 queueScanWorker 中分别处理,pop 是 peekAndShift 方法中,拿当前时间 和 deferred[0]对比如果大于 就弹出发送给客户端 如下代码:
func (n *NSQD) queueScanWorker(workCh chan *Channel, responseCh chan bool, closeCh chan int) { for { select { case c := <-workCh: now := time.Now().UnixNano() dirty := false if c.processInFlightQueue(now) { dirty = true } if c.processDeferredQueue(now) { dirty = true } responseCh <- dirty case <-closeCh: return } } } func (c *Channel) processDeferredQueue(t int64) bool { c.exitMutex.RLock() defer c.exitMutex.RUnlock() if c.Exiting() { return false } dirty := false for { c.deferredMutex.Lock() item, _ := c.deferredPQ.PeekAndShift(t) c.deferredMutex.Unlock() if item == nil { goto exit } dirty = true msg := item.Value.(*Message) _, err := c.popDeferredMessage(msg.ID) if err != nil { goto exit } c.put(msg) } exit: return dirty } func (pq *PriorityQueue) PeekAndShift(max int64) (*Item, int64) { if pq.Len() == 0 { return nil, 0 } item := (*pq)[0] if item.Priority > max { return nil, item.Priority - max } heap.Remove(pq, 0) return item, 0 }
php 和 go 的客户端的使用
官网客户端链接:Client Libraries php 客户端之前官网有一个 5 年前比较老的客户端,已经没人维护 甚至无法运行,于是我贡献了一个 php72 扩展版本 php-nsq,速度块了近三倍,正在逐步完善,支持各种配置与特性,目前已被官网收纳,简单介绍下使用 顺便求下 star
php-nsq pub :
$nsqd_addr = array( "127.0.0.1:4150", "127.0.0.1:4154" ); $nsq = new Nsq(); $is_true = $nsq->connect_nsqd($nsqd_addr); for($i = 0; $i < 20; $i++){ $nsq->publish("test", "nihao"); }
php-nsq 延时 pub :
参数 仅仅多一个毫秒参数,so easy!
$deferred = new Nsq(); $isTrue = $deferred->connectNsqd($nsqdAddr); for($i = 0; $i < 20; $i++){ $deferred->deferredPublish("test", "message daly", 3000); // 第三值默认范围 millisecond default : [0 < millisecond < 3600000] ,可以更改 上面已提到 }
php-nsq sub :
抛异常消息可以自动重试,重试时间可以有 retry_delay_time 设定,多少时间后再次接收被重试的消息
$nsq_lookupd = new NsqLookupd("127.0.0.1:4161"); $nsq = new Nsq(); $config = array( "topic" => "test", "channel" => "struggle", "rdy" => 2, "connect_num" => 1, "retry_delay_time" => 5000, ); $nsq->subscribe($nsq_lookupd, $config, function($msg){ echo $msg->payload; echo $msg->attempts; echo $msg->message_id; echo $msg->timestamp; });
go client pub
package main import ( "github.com/nsqio/go-nsq" ) var producer *nsq.Producer func main() { nsqd := "127.0.0.1:4150" producer, err := nsq.NewProducer(nsqd, nsq.NewConfig()) producer.Publish("test", []byte("nihao")) if err != nil { panic(err) } }
go client sub
package main import ( "fmt" "sync" "github.com/nsqio/go-nsq" ) type NSQHandler struct { } func (this *NSQHandler) HandleMessage(msg *nsq.Message) error { fmt.Println("receive", msg.NSQDAddress, "message:", string(msg.Body)) return nil } func testNSQ() { waiter := sync.WaitGroup{} waiter.Add(1) go func() { defer waiter.Done() config:=nsq.NewConfig() config.MaxInFlight=9 for i := 0; i<10; i++ { consumer, err := nsq.NewConsumer("test", "struggle", config) if nil != err { fmt.Println("err", err) return } consumer.AddHandler(&NSQHandler{}) err = consumer.ConnectToNSQD("127.0.0.1:4150") if nil != err { fmt.Println("err", err) return } } select{} }() waiter.Wait() } func main() { testNSQ(); }