创建队列时需要指定x-max-priority
属性,并设置一个优先级数值。
package main
import (
"fmt"
amqp "github.com/rabbitmq/amqp091-go"
"github.com/trumanwong/go-tools/mq"
"os"
"time"
)
func main() {
arguments := make(amqp.Table)
arguments["x-max-priority"] = int64(9)
queue := mq.NewRabbitMQ(&mq.Options{
Name: os.Getenv("RABBITMQ_QUEUE_NAME"),
Addr: os.Getenv("RABBITMQ_ADDR"),
PrefetchCount: 0,
PrefetchSize: 0,
Global: false,
Consume: nil,
Arguments: arguments,
})
messages := []string{"Hello, World!", "Hello, RabbitMQ!", "Hello, Go!"}
// Attempt to push a message every 2 seconds
for _, message := range messages {
for {
if err := queue.PushV2(amqp.Publishing{
ContentType: "text/plain",
Body: []byte(message),
Priority: 0,
}); err != nil {
fmt.Printf("Push failed: %s\n", err)
time.Sleep(time.Second * 3)
} else {
fmt.Println("Push succeeded!")
break
}
}
}
for {
if err := queue.PushV2(amqp.Publishing{
ContentType: "text/plain",
Body: []byte("Test Priority"),
Priority: 9,
}); err != nil {
time.Sleep(time.Second * 3)
fmt.Printf("Push failed: %s\n", err)
} else {
fmt.Println("Push succeeded!")
break
}
}
}
package main
import (
"fmt"
amqp "github.com/rabbitmq/amqp091-go"
"github.com/trumanwong/go-tools/mq"
"os"
"time"
)
func main() {
arguments := make(amqp.Table)
arguments["x-max-priority"] = int64(9)
mq.NewRabbitMQ(&mq.Options{
Name: os.Getenv("RABBITMQ_QUEUE_NAME"),
Addr: os.Getenv("RABBITMQ_ADDR"),
PrefetchCount: 1,
PrefetchSize: 0,
Global: false,
Consume: func(msgs <-chan amqp.Delivery) {
for d := range msgs {
fmt.Println("receive data: ", string(d.Body))
time.Sleep(10 * time.Second)
_ = d.Ack(false)
}
},
Arguments: arguments,
})
select {}
}
控制台输出结果:
2024/05/17 18:28:53 Attempting to connect
2024/05/17 18:28:53 Connected!
receive data: Test Priority
receive data: Hello, World!
receive data: Hello, RabbitMQ!
receive data: Hello, Go!
x-max-priority
属性如果设置的优先级小于等于队列设置的x-max-priority
属性,优先级有效。
如果设置的优先级大于队列设置的x-max-priority
属性,则优先级失效。
创建优先级队列,需要增加x-max-priority
参数,指定一个数字。表示最大的优先级,建议优先级设置为1~10之间。
发送消息的时候,需要设置priority
属性,最好不要超过指定的最大的优先级。
如果生产端发送很慢,消费者消息很快,则有可能不会严格的按照优先级来进行消费。
prefetch_count
参数prefetch_count
指定了每个消费者从队列中预取的消息数。如果将prefetch count
设置为30,则每个消费者将在开始消费消息之前预取30条消息。这意味着消费者可以在处理完这30条消息之前,不必等待RabbitMQ
向它们发送更多的消息。
prefetch_count
参数仅仅在basic.consume
的autoAck
参数设置为false
的前提下才生效,也就是不能使用自动确认,自动确认的消息没有办法限流。
关于prefetch_count
参数的设置,RabbitMQ
官方有一篇文章进行了分析:《Finding bottlenecks with RabbitMQ 3.3》。
该文章分析了消息流控的整个流程,其中提到了prefetch_count
参数的一些指标:
Prefetch limit | Consumer utilisation |
---|---|
1 | 14% |
3 | 25% |
10 | 46% |
30 | 70% |
1000 | 74% |
这里指出了,如果prefetch_count
的值超过了30
,那么网络带宽限制开始占主导地位,此时进一步增加prefetch_count
的值就会变得收效甚微。也就是说,官方是建议把prefetch_count
设置为30
。
应该综合带宽、每条消息的数据报大小、消费者线程处理的速率等等角度去考虑prefetch_count
的设置。总结如下(个人经验仅供参考):
prefetch_size
指定了每个消费者从队列中预取的字节数。这意味着,如果将prefetch size
设置为1MB
,则每个消费者将在开始消费消息之前预取1MB
的消息。这样做的好处是可以确保消费者不会因为处理速度太慢而在RabbitMQ
中积压过多的消息。
在实际使用中,根据应用场景的不同,可以选择使用prefetch_count
或prefetch_size
或者两者结合使用。例如,如果处理速度比较快并且消息较小,则可以使用prefetch_count
。但是,如果处理速度比较慢并且消息较大,则可以使用prefetch_size
。
文中代码已上传github
。