
创建队列时需要指定x-max-priority属性,并设置一个优先级数值。
package main
import (
"fmt"
amqp "github.com/rabbitmq/amqp091-go"
"github.com/trumanwong/go-tools/mq"
"os"
"time"
)
func main() {
arguments := make(amqp.Table)
arguments["x-max-priority"] = int64(9)
queue := mq.NewRabbitMQ(&mq.Options{
Name: os.Getenv("RABBITMQ_QUEUE_NAME"),
Addr: os.Getenv("RABBITMQ_ADDR"),
PrefetchCount: 0,
PrefetchSize: 0,
Global: false,
Consume: nil,
Arguments: arguments,
})
messages := []string{"Hello, World!", "Hello, RabbitMQ!", "Hello, Go!"}
// Attempt to push a message every 2 seconds
for _, message := range messages {
for {
if err := queue.PushV2(amqp.Publishing{
ContentType: "text/plain",
Body: []byte(message),
Priority: 0,
}); err != nil {
fmt.Printf("Push failed: %s\n", err)
time.Sleep(time.Second * 3)
} else {
fmt.Println("Push succeeded!")
break
}
}
}
for {
if err := queue.PushV2(amqp.Publishing{
ContentType: "text/plain",
Body: []byte("Test Priority"),
Priority: 9,
}); err != nil {
time.Sleep(time.Second * 3)
fmt.Printf("Push failed: %s\n", err)
} else {
fmt.Println("Push succeeded!")
break
}
}
}package main
import (
"fmt"
amqp "github.com/rabbitmq/amqp091-go"
"github.com/trumanwong/go-tools/mq"
"os"
"time"
)
func main() {
arguments := make(amqp.Table)
arguments["x-max-priority"] = int64(9)
mq.NewRabbitMQ(&mq.Options{
Name: os.Getenv("RABBITMQ_QUEUE_NAME"),
Addr: os.Getenv("RABBITMQ_ADDR"),
PrefetchCount: 1,
PrefetchSize: 0,
Global: false,
Consume: func(msgs <-chan amqp.Delivery) {
for d := range msgs {
fmt.Println("receive data: ", string(d.Body))
time.Sleep(10 * time.Second)
_ = d.Ack(false)
}
},
Arguments: arguments,
})
select {}
}控制台输出结果:
2024/05/17 18:28:53 Attempting to connect
2024/05/17 18:28:53 Connected!
receive data: Test Priority
receive data: Hello, World!
receive data: Hello, RabbitMQ!
receive data: Hello, Go!x-max-priority属性如果设置的优先级小于等于队列设置的x-max-priority属性,优先级有效。
如果设置的优先级大于队列设置的x-max-priority属性,则优先级失效。
创建优先级队列,需要增加x-max-priority参数,指定一个数字。表示最大的优先级,建议优先级设置为1~10之间。
发送消息的时候,需要设置priority属性,最好不要超过指定的最大的优先级。
如果生产端发送很慢,消费者消息很快,则有可能不会严格的按照优先级来进行消费。
prefetch_count参数prefetch_count指定了每个消费者从队列中预取的消息数。如果将prefetch count设置为30,则每个消费者将在开始消费消息之前预取30条消息。这意味着消费者可以在处理完这30条消息之前,不必等待RabbitMQ向它们发送更多的消息。
prefetch_count参数仅仅在basic.consume的autoAck参数设置为false的前提下才生效,也就是不能使用自动确认,自动确认的消息没有办法限流。
关于prefetch_count参数的设置,RabbitMQ官方有一篇文章进行了分析:《Finding bottlenecks with RabbitMQ 3.3》。
该文章分析了消息流控的整个流程,其中提到了prefetch_count参数的一些指标:
| Prefetch limit | Consumer utilisation |
|---|---|
| 1 | 14% |
| 3 | 25% |
| 10 | 46% |
| 30 | 70% |
| 1000 | 74% |
这里指出了,如果prefetch_count的值超过了30,那么网络带宽限制开始占主导地位,此时进一步增加prefetch_count的值就会变得收效甚微。也就是说,官方是建议把prefetch_count设置为30。
应该综合带宽、每条消息的数据报大小、消费者线程处理的速率等等角度去考虑prefetch_count的设置。总结如下(个人经验仅供参考):
prefetch_size指定了每个消费者从队列中预取的字节数。这意味着,如果将prefetch size设置为1MB,则每个消费者将在开始消费消息之前预取1MB的消息。这样做的好处是可以确保消费者不会因为处理速度太慢而在RabbitMQ中积压过多的消息。
在实际使用中,根据应用场景的不同,可以选择使用prefetch_count或prefetch_size或者两者结合使用。例如,如果处理速度比较快并且消息较小,则可以使用prefetch_count。但是,如果处理速度比较慢并且消息较大,则可以使用prefetch_size。
文中代码已上传github。