基于NSQ的PHP消息队列架构和原理

Posted by WGrape的博客 on August 5, 2022

文章内容更新请以 WGrape GitHub博客 : 基于NSQ的PHP消息队列架构和原理 为准

前言

本文原创,著作权归WGrape所有,未经授权,严禁转载

一、架构

image

二、消息的生命周期

1、产生

消息由业务方产生,由于不同的Topic代表着不同的路由策略,所以Topic一般也用于区分不同的业务,业务方只需要关心消息所属的Topic

2、发布

业务方通过调用nsqd提供的HTTP接口(/pub?topic=name&defer=10),实现消息的发布,这时会向nsqd传输一个具有固定格式的数据结构

image

00000140qwertyuiopasdfgh{"topic":"test","classname":"UserService","methodname":"addUser","param":["userid", "username", "password"],"addtime":"2020-11-27 14:30:34"}

3、接收

NSQ内部的Http模块当监听到请求/pub接口时,会调用doPUB方法,由doPUB方法实现消息的接收

image

doPUB方法内部主要做以下事情 (1) 消息内容的检查,主要是长度的检查、参数的检查(如defer) (2) 从请求体中获取消息所属的Topic,并创建一个Topic对象 (3) 由Topic对象实现消息的发布

image

image

在Topic对象的PutMessage方法中,主要会进行一次锁处理,以保证消息发布的线程安全

image

消息的Pub最终由Topic的put方法完成,并把消息送入memoryMsgChan管道中,初始状态下,由于没有订阅此Topic的消费者客户端, 所以消息会在管道中排队,如果管道已满,会把消息写入后备队列中。

writeMessageToBackend是一个抽象的后备队列写入接口,以实现不同的后备队列,如磁盘

image

4、订阅

在NSQProxy中,startConsume()方法实现对Topic的订阅

image

image

在startConsume方法中,核心需要完成两个工作 : 1、创建消费者客户端实例,包括消费配置、消息回调等 2、连接至nsqlookupd,实现消费者客户端与nsqd的动态连接

image

5、传输

在nsqd启动成功后,会执行IOLoop,IOLoop会监听Topic对应的管道,当管道中有消息到达时,接收消息并传输给消费者客户端

image

image

5、回调

当消费者客户端接收到消息后,会执行HandleMessage方法,并通过Socket把消息继续向下传递给对应消费机上的MeepoPS进程

image

image

6、消费

在Worker机器上运行的MeepoPS进程,接收到消息后,会先检查数据包的长度,当数据包完全接收完后,会进行一个简单的解码操作,把消息头部的8字节数据去掉,并调用callbackNewData方法,把去掉头部数据的消息传递过去,最终实现对消息的消费

image

image

image

三、核心原理

1、消息的发布

nsqd提供消息发布的功能,可以通过HTTP接口实现,也可以通过Socket通信,传输PUB指令实现。

2、消息的存储

消息存储在Channel中,如果Channel已满,会把消息存储在后备队列中。其中对Channel和后备队列的存取的切换,由nsqd底层实现,使用无感知

3、消息的拓扑结构

消费者客户端通过注册中心(nsqlookupd)实现与nsqd的动态绑定后,nsqd内部会记录Topic、Channel、Consumer的拓扑结构,其中Topic与Channel是一对多(Broadcast)的关系,Channel和Consumer也是一对多(Load Balance)的关系。

image

4、NSQProxy工作原理

在启动NSQProxy后,会从数据库中获取所有Topic、Channel与Worker的对应关系,依次创建消费者客户端,并通过连接至nsqlookupd,实现消息从NSQ到NSQProxy的转发

image