开源 RocketMQ 消费端仅有一个消费线程池,多个 topic 的消费会互相影响。另外同一个消费端仅有一个 listener,如果是多个 topic,需要上层业务分发。云音乐同一个应用都会有多个 topic 消费,有的多达 30 个。
因此优先级高的 topic 需要自定义自己的消费线程池,优先级低的使用公共的。另外 每个 topic 也要有自己的 listener,这样就不用上层分发。基于上述要求,我们增加订阅可以同时指定 listener 和 consumeThreadExecutor 的方式。
消费限流与降级
开源 RocketMQ 并没有提供消费限流能力,基于 Sentinel 的是 pull 模式,而不是 push 模式,不能很好满足云音乐的业务需求。
云音乐的消息消费不少都要写数据库或者访问第三方,这些消费和在线业务都是同一个应用,消息队列自身虽然具备削峰填谷的能力,但是消费端会最大化使用消费线程池,这会对在线业务也产生影响。为保证在线业务优先,需要限制消费的速度,减少瞬时高峰消息消费对在线业务的影响。
这部分可以通过调整消费线程池和消费 qps 来调整。我们选择了调整消费 qps消费限流的方式,这样能和监控数据对起来并提供控制台动态调整能力,消费线程池调整虽然我们也提供动态调整线程池能力但是并不是线性的,调整起来没有参考,比较困难。消费限流做在了底层而不是让应用层自己做,和上层的区别时,上层限流了会触发消息消费一次并且失败,底层不会失败也不算消费一次,因为还没投递上层业务。
多机房网络 Bug 修复
云音乐有部分业务部署在建德,需要消费杭州的数据。这部分消费的机器总是隔三差五报 timeout。经过排查,发现 client 新建的连接总是在关闭创建循环,非常不稳定,这部分排查 remoting 层的代码发现 client 建立连接存在并发问题,会将建立好的链接关闭。定位待开源 client 的 remoting bug 后,我们进行了修复。
另外后来几天,曲库的业务同学也报发送 3s 超时,经过仔细排查,发现异常日志和连接建立日志和网络建立并发问题的日志一致,协同业务升级到最新修复的客户端解决。业务升级上线后如上所示,发送非常平稳,不再超时。
其他
作为开源的受益者,部分改动我们会提交到 Apache RocketMQ 官方,如消费限流,消费线程池,网络 bug 修复等。