刘兴起

I am coder. Currently doing more in backend, focused in Ruby and Python.

NSQ Introduction

29 Jan 2018 » nsq

本篇博客并不介绍 NSQ 的基本使用方法,你可以在网上搜索到很多相关的博客和实例,比如:

  1. NSQ:分布式的实时消息平台
  2. NSQ使用总结

本文将着重介绍 NSQ 的设计和使用过程中需要注意的一些问题,可以帮助你更好的理解和使用 NSQ。

NSQ 的数据流模型

NSQ 数据流模型

以上是 nsq 官网的数据流模型图,我们可以看到 nsq 在设计上的一些特点:

  • 每一个 channel 保存了topic 所有消息的副本
  • channel 中的消息是推送给 Consumers,采用的是 push model
  • channel 中的消息是随机推送给每一个 Consumer,即每一个 Consumer 只接收到 channel 中的一部分数据

nsq 的 topic 和 channel 无需提前创建,在 pub/sub 的时候会自动创建

pull vs push

nsq 之所以是一个实时的消息平台,和使用 push model 有很大的关系,push model 保证了消息可以尽快的到达 Consumers,只要 Consumers 的消费能力足够,消息就可以被实时的处理;一旦 Consumers 的消费能力跟不上,消息就会出现积压,一方面 Consumers 会不堪重负,一方面也会增大 nsqd 的压力,因为 nsqd 需要将消息重新入队或刷入磁盘。这与另外一个非常流行的消息队列 kafka 的设计完全不同,kafka 采用的是 pull model。pull model 和 push model 都各有优劣,可以从两种场景来分析:

1. Producer 的速率大于 Consumer 的速率。

对于 push model 来说,这种情况会导致消息的积压,如果 Consumer 端的处理逻辑很复杂,比如涉及到大量的磁盘和网络 I/O 操作,会加重 Consumer 的负载,甚至是崩溃。对于 nsq 来讲,为了保证消息的可靠传递,服务端必须维护消息的状态,要考虑 requeue 和持久化的处理,也会加重服务端的压力和实现复杂度。

对于 pull model 来说,这种情况的影响就小得多,因为服务端并不关心 Consumer 是否可以及时消费数据。

2. Producer 的速率小于 Consumer 的速率。

对于 push model, 如果 Consumer 有大量的线程在处理消息,而消息的生成速率更不上,就到导致 CPU 空转,造成系统资源的浪费。

对于 pull model 的情况就有点糟糕,Consumer 需要不断的去轮询服务器获取数据,但是很难准确的判断应该什么时候去拉取消息,频繁的轮询同样会给服务器造成压力。不过一般都会有对应的算法去优化这样的问题。

综合来讲,无论使用哪种模型,都是为了能够最大效率的处理数据。个人觉得,pull model 确实比 push model 有更大的灵活性,尤其是对于批量数据的处理,pull model 需要 Producer 或者 Consumer 积累到一定的数据才能处理,这无疑增大了系统的延迟和实现的复杂度。但从另一方面来说,对于 pull model, 由于 Consumer 不关心消息的状态,会让 Consumer 的逻辑更加简单,无疑也增加了易用性。

kafka pull design

消息传递的保证

NSQ 保证消息至少被传递一次,但是存在消息重复的可能,需要 Consumer 来保证操作的幂等性。

NSQ 实现这样的保证基于以下几个设计:

  1. client 会通知 nsq 它已经做好接收消息的准备,nsq 不会在 client 还没连接进来的时候发送消息;
  2. nsq 发送的消息会在本地临时存储;
  3. client 会回复 nsq 是否成功的处理了数据。如果 nsq 没有收到回复,会在配置的规定时间内将消息重新入队;

但是会存在一种极端的情况导致 NSQ 丢数据,那就是 nsqd 被 unclean shutdown,这会导致所有在内存中的消息丢失。有一种方法可以解决这个问题:在一台服务器上启动两个 nsqd 来接收消息,防止其中一个失败导致消息丢失。(很少有人这样做,会增大部署的难度,通常情况下建议 nsqd 和 producer 部署在同一台机器上,除非人为的操作失误,否则只有 nsqd 崩溃的几率是比较低的)

还有一种增加消息可靠传递的方法:将 --mem-queue-size 设置为 1,甚至是0。该值控制 channel 中的最大消息数,一旦超过了这个阈值,消息将被写入磁盘。将这个值设置为0就意味着每产生一条消息,消息就会被写入磁盘,消息丢失的概率就大大降低。

由于消息在未被成功消费的情况下(可能是超时或者 Consumer 处理失败)会重新入队,所以 nsq 不能保证消息是有序被消费的,所以 nsq 不适用于要求消息有序的系统。

关于 max-in-flight

max-in-flight

max-in-flight 是很多人都不好理解的概念,用官方开发人员的解释来说:

“max in flight” is a configuration knob for consumers to adjust the number of messages it will tell nsqd to send concurrently.

max-in-flight 就是 Consumer 告诉 nsqd 同时发送给 client 的消息数。从上图可以看出 max-in-flight 的值是 2,即 nsqd 同时发送了 2 个消息给 Consumer。可见这是 Consumer 控制的值,一方面可以方便批处理消息,也可以提高性能(不用每次处理消息后就通知 nsq 是否处理成功)。需要注意的是,如果 channel 的消息数小于 max-in-flight 的值,nsqd 依然会把消息推送给 Consumer, 而不会等到消息数达到 max-in-flight 的值后再推送。

我们是如何使用 NSQ 处理 7500 亿消息的