基于Go语言的消费程序设计与实现

Posted by WGrape的博客 on December 10, 2020

文章内容更新请以 WGrape GitHub博客 : 基于Go语言的消费程序设计与实现 为准

前言

本文原创,著作权归WGrape所有,未经授权,严禁转载

一、背景

早前业务队列出现过几次消息堆积的情况,但由于当时缺少消息相关监控,用户反馈后才发现消息已经多个小时未消费,导致未能及时处理此类问题。次之,由于消费程序是基于PHP实现,只能单线程消费,消费的速率很低,即使可以及时发现问题,也需要较长的时间才能把堆积的若干消息完全消费完成。

二、新架构要解决哪些问题

1、报警问题

  • 现状 :消费程序自身没有异常报警
  • 改进 :添加消费耗时过长、消费异常中止等报警

2、性能优化

  • 现状 :单线程消费,非“常驻进程”且每次重启消费者都需要加载整个CI框架
  • 改进 :多协程消费,常驻进程

三、新架构为什么使用Go

1、比PHP更高效的消费 2、比PHP更适合作为服务端的常驻进程服务 3、为后续Go在业务中的应用而铺路

四、新架构的设计方案

1、设计介绍

(1) 架构概览

数据层、应用层、基础层从上至下依次分布。其中,数据层是消息的源,应用层是对源消息进行相应处理,基础层用以提供通用性功能 image

MQ消费架构10.xml

(2) 模块结构

如下图所示,整个项目结构主要分为Control、Task功能模块,和Conf、Util、Lib、Monitor等通用性模块

image

(3) 进程退出流程

为确保不会出现竞争问题,必须以正确的时序控制每个协程正常退出

用户或程序主动发出退出信号,daemon模块接收信号后,会向dispatch模块发出Done指令,dispatch先向receiver协程发出Done指令,待所有consumer协程全部消费结束后,dispatch才会向向所有consumer协程发送Done指令

2、配置驱动

(1) 配置优先级

任何一个任务都可以配置单独的配置项,如果缺少某个配置项,会默认使用system中的配置项

[system]
exchange=e.business
queue=q.business.account.update
key=k.business.account.update
block=10
deliverBy=order_id
consumerNum=10
channelLength=1024

# 可配置单独的任务 [业务线_业务名]
[my_business]

(2) 配置热更新

热更新的本质是在保留当前进程的前提下,更新内存中的数据,使整个服务使用更新后最新的内存数据。如更新配置后,在当前的进程空间内,通过IPC机制实现服务把配置信息重新载入内存。

(3) 更多配置项

根据实际情况,再确定是否需要有“消费模式”、“消费最长时间限制”等配置项

3、消息投递规则

接收协程会根据配置的投递规则,将消息投递至不同的通道。常用的投递规则是取模,为N个Channel从0至N-1编号,根据配置的 consumerNumdeliverBy 两个字段,通过取模运算,决定如何投递

image

如配置 consumerNum=10deliverBy="order_id",消息体为 {"data":{"order_id":35432}, "url":""},则接收协程取到此消息后,会做运算 intval(data.order_id) % 10 = 2 最终会把此消息投至编号为2的通道中。

如果消息中没有deliverBy配置的参数,则默认会使用当前的纳秒和秒之和做取模运算

4、进程状态控制

主要基于IPC机制,通过新启动的“命令进程”对“服务进程”的控制,实现平滑重启、安全停止、配置热更新等功能

5、进程内部通信流程

进程内部通过信号机制实现“中断类”操作的处理,然后通过一种叫做“控制通道”的channel实现协程间通信

image

6、进程内部失败类型

进程内部出现的失败有时会产生副作用,比如接收到消息时却消费失败,这种失败就可能需要特殊处理。目前内部失败类型以 是否开始正常工作 作为界限,分为ReadyFailed和DoingFailed两类。第一类失败是不会产生副作用的失败,第二类的失败是有副作用的失败

image

7、进程安全

(1) 进程监控

  • 进程状态监控 :CPU、内存的使用情况,防止CPU使用率过高和内存增长引起的内存泄露
  • 协程和管道监控 :各个协程和Channel的实时工作状态(消费协程是否异常)和详情数据

(2) 消息持久化

如果消费协程接收消息的len(channel)>1,必须有相应的消息持久化处理,以保证消息的不丢失

(3) 内存泄露问题处理

运维阶段使用PProf、Prometheus实时监控内存

(4) 堵塞死锁问题处理

运维阶段使用PProf、Prometheus实时监控内存、CPU占用

8、底层服务

(1) 日志服务

使用通用的日志服务和日志收集

(2) 报警服务

通过 Reserved API 机制,接入至通用API Notice报警平台,再为其设置接口级别即可。实现过程即是在出现异常时,手动写入一条uri为MQ专用API的日志

{
    "uri" : "/business/mq/consumer",
    "level" : "FATAL"
}

注 :不要在代码中添加任何发邮件等报警功能,所有的报警功能都通过写特定日志和API Notice配置的方式实现,关于API Notice报警平台请查看设计与实现基于Kibana Watcher的服务分级报警平台

(3) 监控服务

主要使用PProf、Prometheus实现进程实时监控 image

9、项目管理

(1) 横纵向扩展

横向扩展指可以支持无限的多个任务加入,纵向扩展指当前的架构需要支持后续可能出现的任务协程整合,即实现由根协程做任务转发的完整服务,或者是支持服务可以快速做适当性的架构调整

(2) K8S部署

通过K8S完成容器化部署

(3) 持续集成

代码规范、静态检查等工作

五、如何使用

1、本地部署

执行 deploy/build.sh 脚本,自动设置 Go Mod 相关配置和相关目录、文件的创建等环境操作

2、服务启动

go run main.go --start development my_business

3、服务重启

go run main.go --restart development my_business

4、配置热更新

go run main.go --update development my_business

5、服务停止

go run main.go --stop development my_business

6、异常停止

使用 Control+C 、kill pid 中止进程