TrumanWong

RabbitMQ优先级队列

TrumanWong
5/18/2024

创建队列时需要指定x-max-priority属性,并设置一个优先级数值。

生产者

package main

import (
    "fmt"
    amqp "github.com/rabbitmq/amqp091-go"
    "github.com/trumanwong/go-tools/mq"
    "os"
    "time"
)

func main() {
    arguments := make(amqp.Table)
    arguments["x-max-priority"] = int64(9)
    queue := mq.NewRabbitMQ(&mq.Options{
        Name:          os.Getenv("RABBITMQ_QUEUE_NAME"),
        Addr:          os.Getenv("RABBITMQ_ADDR"),
        PrefetchCount: 0,
        PrefetchSize:  0,
        Global:        false,
        Consume:       nil,
        Arguments:     arguments,
    })
    messages := []string{"Hello, World!", "Hello, RabbitMQ!", "Hello, Go!"}
    // Attempt to push a message every 2 seconds
    for _, message := range messages {
        for {
            if err := queue.PushV2(amqp.Publishing{
                ContentType: "text/plain",
                Body:        []byte(message),
                Priority:    0,
            }); err != nil {
                fmt.Printf("Push failed: %s\n", err)
                time.Sleep(time.Second * 3)
            } else {
                fmt.Println("Push succeeded!")
                break
            }
        }
    }
    for {
        if err := queue.PushV2(amqp.Publishing{
            ContentType: "text/plain",
            Body:        []byte("Test Priority"),
            Priority:    9,
        }); err != nil {
            time.Sleep(time.Second * 3)
            fmt.Printf("Push failed: %s\n", err)
        } else {
            fmt.Println("Push succeeded!")
            break
        }
    }
}

消费者

package main

import (
    "fmt"
    amqp "github.com/rabbitmq/amqp091-go"
    "github.com/trumanwong/go-tools/mq"
    "os"
    "time"
)

func main() {
    arguments := make(amqp.Table)
    arguments["x-max-priority"] = int64(9)
    mq.NewRabbitMQ(&mq.Options{
        Name:          os.Getenv("RABBITMQ_QUEUE_NAME"),
        Addr:          os.Getenv("RABBITMQ_ADDR"),
        PrefetchCount: 1,
        PrefetchSize:  0,
        Global:        false,
        Consume: func(msgs <-chan amqp.Delivery) {
            for d := range msgs {
                fmt.Println("receive data: ", string(d.Body))
                time.Sleep(10 * time.Second)
                _ = d.Ack(false)
            }
        },
        Arguments: arguments,
    })
    select {}
}

控制台输出结果:

2024/05/17 18:28:53 Attempting to connect
2024/05/17 18:28:53 Connected!
receive data:  Test Priority
receive data:  Hello, World!
receive data:  Hello, RabbitMQ!
receive data:  Hello, Go!

小结

x-max-priority属性

如果设置的优先级小于等于队列设置的x-max-priority属性,优先级有效。 如果设置的优先级大于队列设置的x-max-priority属性,则优先级失效。

创建优先级队列,需要增加x-max-priority参数,指定一个数字。表示最大的优先级,建议优先级设置为1~10之间。 发送消息的时候,需要设置priority属性,最好不要超过指定的最大的优先级。 如果生产端发送很慢,消费者消息很快,则有可能不会严格的按照优先级来进行消费。

  • 如果发送的消息的优先级属性小于设置的队列属性x-max-priority值,则按优先级的高低进行消费,数字越高则优先级越高。
  • 如果发送的消息的优先级属性都大于设置的队列属性x-max-priority值,则设置的优先级失效,按照入队列的顺序进行消费。
  • 如果消费端一直进行监听(prefetch_count为0或者消费速度大于生产的速度),而发送端一条条的发送消息,优先级属性也会失效。

prefetch_count参数

prefetch_count指定了每个消费者从队列中预取的消息数。如果将prefetch count设置为30,则每个消费者将在开始消费消息之前预取30条消息。这意味着消费者可以在处理完这30条消息之前,不必等待RabbitMQ向它们发送更多的消息。

prefetch_count参数仅仅在basic.consumeautoAck参数设置为false的前提下才生效,也就是不能使用自动确认,自动确认的消息没有办法限流。

关于prefetch_count参数的设置,RabbitMQ官方有一篇文章进行了分析:《Finding bottlenecks with RabbitMQ 3.3》

该文章分析了消息流控的整个流程,其中提到了prefetch_count参数的一些指标:

Prefetch limit Consumer utilisation
1 14%
3 25%
10 46%
30 70%
1000 74%

这里指出了,如果prefetch_count的值超过了30,那么网络带宽限制开始占主导地位,此时进一步增加prefetch_count的值就会变得收效甚微。也就是说,官方是建议把prefetch_count设置为30

应该综合带宽、每条消息的数据报大小、消费者线程处理的速率等等角度去考虑prefetch_count的设置。总结如下(个人经验仅供参考):

  • 当消费者线程的处理速度十分慢,而队列的消息量十分少的场景下,可以考虑把prefetch_count设置为1
  • 当队列中的每条消息的数据报十分大的时候,要计算好客户端可以容纳的未ack总消息量的内存极限,从而设计一个合理的prefetch_count值。
  • 当消费者线程的处理速度十分快,远远大于RabbitMQ服务端的消息分发,在网络带宽充足的前提下,设置可以把prefetch_count值设置为0,不做任何的消息流控。
  • 一般场景下,建议使用RabbitMQ官方的建议值30

prefetch_size参数

prefetch_size指定了每个消费者从队列中预取的字节数。这意味着,如果将prefetch size设置为1MB,则每个消费者将在开始消费消息之前预取1MB的消息。这样做的好处是可以确保消费者不会因为处理速度太慢而在RabbitMQ中积压过多的消息。

在实际使用中,根据应用场景的不同,可以选择使用prefetch_countprefetch_size或者两者结合使用。例如,如果处理速度比较快并且消息较小,则可以使用prefetch_count。但是,如果处理速度比较慢并且消息较大,则可以使用prefetch_size

文中代码已上传github