文章内容更新请以 WGrape GitHub博客 : 基于Go语言的消费程序设计与实现 为准
前言
本文原创,著作权归WGrape所有,未经授权,严禁转载
一、背景
早前业务队列出现过几次消息堆积的情况,但由于当时缺少消息相关监控,用户反馈后才发现消息已经多个小时未消费,导致未能及时处理此类问题。次之,由于消费程序是基于PHP实现,只能单线程消费,消费的速率很低,即使可以及时发现问题,也需要较长的时间才能把堆积的若干消息完全消费完成。
二、新架构要解决哪些问题
1、报警问题
- 现状 :消费程序自身没有异常报警
- 改进 :添加消费耗时过长、消费异常中止等报警
2、性能优化
- 现状 :单线程消费,非“常驻进程”且每次重启消费者都需要加载整个CI框架
- 改进 :多协程消费,常驻进程
三、新架构为什么使用Go
1、比PHP更高效的消费 2、比PHP更适合作为服务端的常驻进程服务 3、为后续Go在业务中的应用而铺路
四、新架构的设计方案
1、设计介绍
(1) 架构概览
数据层、应用层、基础层从上至下依次分布。其中,数据层是消息的源,应用层是对源消息进行相应处理,基础层用以提供通用性功能
(2) 模块结构
如下图所示,整个项目结构主要分为Control、Task功能模块,和Conf、Util、Lib、Monitor等通用性模块
(3) 进程退出流程
为确保不会出现竞争问题,必须以正确的时序控制每个协程正常退出
用户或程序主动发出退出信号,daemon模块接收信号后,会向dispatch模块发出Done指令,dispatch先向receiver协程发出Done指令,待所有consumer协程全部消费结束后,dispatch才会向向所有consumer协程发送Done指令
2、配置驱动
(1) 配置优先级
任何一个任务都可以配置单独的配置项,如果缺少某个配置项,会默认使用system
中的配置项
[system]
exchange=e.business
queue=q.business.account.update
key=k.business.account.update
block=10
deliverBy=order_id
consumerNum=10
channelLength=1024
# 可配置单独的任务 [业务线_业务名]
[my_business]
(2) 配置热更新
热更新的本质是在保留当前进程的前提下,更新内存中的数据,使整个服务使用更新后最新的内存数据。如更新配置后,在当前的进程空间内,通过IPC机制实现服务把配置信息重新载入内存。
(3) 更多配置项
根据实际情况,再确定是否需要有“消费模式”、“消费最长时间限制”等配置项
3、消息投递规则
接收协程会根据配置的投递规则,将消息投递至不同的通道。常用的投递规则是取模,为N个Channel从0至N-1编号,根据配置的 consumerNum
和 deliverBy
两个字段,通过取模运算,决定如何投递
如配置 consumerNum=10
和 deliverBy="order_id"
,消息体为 {"data":{"order_id":35432}, "url":""}
,则接收协程取到此消息后,会做运算 intval(data.order_id) % 10 = 2
最终会把此消息投至编号为2的通道中。
如果消息中没有deliverBy配置的参数,则默认会使用当前的纳秒和秒之和做取模运算
4、进程状态控制
主要基于IPC机制,通过新启动的“命令进程”对“服务进程”的控制,实现平滑重启、安全停止、配置热更新等功能
5、进程内部通信流程
进程内部通过信号机制实现“中断类”操作的处理,然后通过一种叫做“控制通道”的channel实现协程间通信
6、进程内部失败类型
进程内部出现的失败有时会产生副作用,比如接收到消息时却消费失败,这种失败就可能需要特殊处理。目前内部失败类型以 是否开始正常工作
作为界限,分为ReadyFailed和DoingFailed两类。第一类失败是不会产生副作用的失败,第二类的失败是有副作用的失败
7、进程安全
(1) 进程监控
- 进程状态监控 :CPU、内存的使用情况,防止CPU使用率过高和内存增长引起的内存泄露
- 协程和管道监控 :各个协程和Channel的实时工作状态(消费协程是否异常)和详情数据
(2) 消息持久化
如果消费协程接收消息的len(channel)>1
,必须有相应的消息持久化处理,以保证消息的不丢失
(3) 内存泄露问题处理
运维阶段使用PProf、Prometheus实时监控内存
(4) 堵塞死锁问题处理
运维阶段使用PProf、Prometheus实时监控内存、CPU占用
8、底层服务
(1) 日志服务
使用通用的日志服务和日志收集
(2) 报警服务
通过 Reserved API 机制,接入至通用API Notice报警平台,再为其设置接口级别即可。实现过程即是在出现异常时,手动写入一条uri为MQ专用API的日志
{
"uri" : "/business/mq/consumer",
"level" : "FATAL"
}
注 :不要在代码中添加任何发邮件等报警功能,所有的报警功能都通过写特定日志和API Notice配置的方式实现,关于API Notice报警平台请查看设计与实现基于Kibana Watcher的服务分级报警平台
(3) 监控服务
主要使用PProf、Prometheus实现进程实时监控
9、项目管理
(1) 横纵向扩展
横向扩展指可以支持无限的多个任务加入,纵向扩展指当前的架构需要支持后续可能出现的任务协程整合,即实现由根协程做任务转发的完整服务,或者是支持服务可以快速做适当性的架构调整
(2) K8S部署
通过K8S完成容器化部署
(3) 持续集成
代码规范、静态检查等工作
五、如何使用
1、本地部署
执行 deploy/build.sh
脚本,自动设置 Go Mod
相关配置和相关目录、文件的创建等环境操作
2、服务启动
go run main.go --start development my_business
3、服务重启
go run main.go --restart development my_business
4、配置热更新
go run main.go --update development my_business
5、服务停止
go run main.go --stop development my_business
6、异常停止
使用 Control+C 、kill pid 中止进程