RabbitMQ优先级队列

TrumanWong
5/18/2024
TrumanWong

创建队列时需要指定x-max-priority属性,并设置一个优先级数值。

生产者

package main

import (
	"fmt"
	amqp "github.com/rabbitmq/amqp091-go"
	"github.com/trumanwong/go-tools/mq"
	"os"
	"time"
)

func main() {
	arguments := make(amqp.Table)
	arguments["x-max-priority"] = int64(9)
	queue := mq.NewRabbitMQ(&mq.Options{
		Name:          os.Getenv("RABBITMQ_QUEUE_NAME"),
		Addr:          os.Getenv("RABBITMQ_ADDR"),
		PrefetchCount: 0,
		PrefetchSize:  0,
		Global:        false,
		Consume:       nil,
		Arguments:     arguments,
	})
	messages := []string{"Hello, World!", "Hello, RabbitMQ!", "Hello, Go!"}
	// Attempt to push a message every 2 seconds
	for _, message := range messages {
		for {
			if err := queue.PushV2(amqp.Publishing{
				ContentType: "text/plain",
				Body:        []byte(message),
				Priority:    0,
			}); err != nil {
				fmt.Printf("Push failed: %s\n", err)
				time.Sleep(time.Second * 3)
			} else {
				fmt.Println("Push succeeded!")
				break
			}
		}
	}
	for {
		if err := queue.PushV2(amqp.Publishing{
			ContentType: "text/plain",
			Body:        []byte("Test Priority"),
			Priority:    9,
		}); err != nil {
			time.Sleep(time.Second * 3)
			fmt.Printf("Push failed: %s\n", err)
		} else {
			fmt.Println("Push succeeded!")
			break
		}
	}
}

消费者

package main

import (
	"fmt"
	amqp "github.com/rabbitmq/amqp091-go"
	"github.com/trumanwong/go-tools/mq"
	"os"
	"time"
)

func main() {
	arguments := make(amqp.Table)
	arguments["x-max-priority"] = int64(9)
	mq.NewRabbitMQ(&mq.Options{
		Name:          os.Getenv("RABBITMQ_QUEUE_NAME"),
		Addr:          os.Getenv("RABBITMQ_ADDR"),
		PrefetchCount: 1,
		PrefetchSize:  0,
		Global:        false,
		Consume: func(msgs <-chan amqp.Delivery) {
			for d := range msgs {
				fmt.Println("receive data: ", string(d.Body))
				time.Sleep(10 * time.Second)
				_ = d.Ack(false)
			}
		},
		Arguments: arguments,
	})
	select {}
}

控制台输出结果:

2024/05/17 18:28:53 Attempting to connect
2024/05/17 18:28:53 Connected!
receive data:  Test Priority
receive data:  Hello, World!
receive data:  Hello, RabbitMQ!
receive data:  Hello, Go!

小结

x-max-priority属性

如果设置的优先级小于等于队列设置的x-max-priority属性,优先级有效。 如果设置的优先级大于队列设置的x-max-priority属性,则优先级失效。

创建优先级队列,需要增加x-max-priority参数,指定一个数字。表示最大的优先级,建议优先级设置为1~10之间。 发送消息的时候,需要设置priority属性,最好不要超过指定的最大的优先级。 如果生产端发送很慢,消费者消息很快,则有可能不会严格的按照优先级来进行消费。

prefetch_count参数

prefetch_count指定了每个消费者从队列中预取的消息数。如果将prefetch count设置为30,则每个消费者将在开始消费消息之前预取30条消息。这意味着消费者可以在处理完这30条消息之前,不必等待RabbitMQ向它们发送更多的消息。

prefetch_count参数仅仅在basic.consumeautoAck参数设置为false的前提下才生效,也就是不能使用自动确认,自动确认的消息没有办法限流。

关于prefetch_count参数的设置,RabbitMQ官方有一篇文章进行了分析:《Finding bottlenecks with RabbitMQ 3.3》

该文章分析了消息流控的整个流程,其中提到了prefetch_count参数的一些指标:

Prefetch limitConsumer utilisation
114%
325%
1046%
3070%
100074%

这里指出了,如果prefetch_count的值超过了30,那么网络带宽限制开始占主导地位,此时进一步增加prefetch_count的值就会变得收效甚微。也就是说,官方是建议把prefetch_count设置为30

应该综合带宽、每条消息的数据报大小、消费者线程处理的速率等等角度去考虑prefetch_count的设置。总结如下(个人经验仅供参考):

prefetch_size参数

prefetch_size指定了每个消费者从队列中预取的字节数。这意味着,如果将prefetch size设置为1MB,则每个消费者将在开始消费消息之前预取1MB的消息。这样做的好处是可以确保消费者不会因为处理速度太慢而在RabbitMQ中积压过多的消息。

在实际使用中,根据应用场景的不同,可以选择使用prefetch_countprefetch_size或者两者结合使用。例如,如果处理速度比较快并且消息较小,则可以使用prefetch_count。但是,如果处理速度比较慢并且消息较大,则可以使用prefetch_size

文中代码已上传github