Golang RabbitMQ自动重连/重发生产者

TrumanWong
5/14/2023
TrumanWong

代码如下:

package mq

import (
	"context"
	"errors"
	amqp "github.com/rabbitmq/amqp091-go"
	"log"
	"os"
	"time"
)

// RabbitMQ is a client bound to a single named queue. It keeps the current
// AMQP connection and channel together with the notification channels the
// background reconnect loop selects on.
//
// NOTE(review): the fields are written by the reconnect goroutine and read by
// the public methods without any locking visible in this file — confirm
// whether callers are expected to tolerate that.
type RabbitMQ struct {
	// name is the queue name: declared in init, used as the routing key in
	// UnsafePush and as the queue to consume from in Stream.
	name            string
	// logger is used by Push for its progress messages.
	logger          *log.Logger
	// connection is the most recent connection installed by changeConnection.
	connection      *amqp.Connection
	// channel is the most recent channel installed by changeChannel.
	channel         *amqp.Channel
	// done is closed by Close to tell the retry loops to stop.
	done            chan bool
	// notifyConnClose is registered with connection.NotifyClose.
	notifyConnClose chan *amqp.Error
	// notifyChanClose is registered with channel.NotifyClose.
	notifyChanClose chan *amqp.Error
	// notifyConfirm is registered with channel.NotifyPublish; Push reads
	// publisher confirmations from it.
	notifyConfirm   chan amqp.Confirmation
	// isReady is set to true at the end of a successful init and reset to
	// false by the reconnect loops and by Close.
	isReady         bool
	// consume is an optional callback; when non-nil, init starts it in a new
	// goroutine with the delivery channel returned by Stream.
	consume         func(<-chan amqp.Delivery)
}

// Delays used by the retry loops in this file.
const (
	// reconnectDelay is how long handleReconnect waits after a failed dial
	// before trying to connect again.
	reconnectDelay = 5 * time.Second

	// reInitDelay is how long handleReInit waits after a failed channel
	// setup before calling init again.
	reInitDelay = 2 * time.Second

	// resendDelay is how long Push waits for a publisher confirmation, and
	// how long it waits after a failed publish, before sending again.
	resendDelay = 5 * time.Second
)

// Sentinel errors returned by the client's methods.
var (
	// errNotConnected is returned by UnsafePush and Stream while the client
	// is not ready.
	errNotConnected  = errors.New("not connected to a server")
	// errAlreadyClosed is returned by Close when the client is not ready.
	errAlreadyClosed = errors.New("already closed: not connected to the server")
	// errShutdown is returned by Push when the done channel is closed while
	// it is waiting to retry a failed publish.
	errShutdown      = errors.New("rabbitMQ is shutting down")
)

// NewRabbitMQ builds a client for the queue called name and starts a
// background goroutine that connects to the broker at addr and keeps
// reconnecting. It does not wait for the connection to be established.
//
// consume may be nil for a publish-only client. When it is non-nil, init
// invokes it in its own goroutine with the delivery stream each time a
// channel has been set up.
func NewRabbitMQ(name string, addr string, consume func(<-chan amqp.Delivery)) *RabbitMQ {
	client := &RabbitMQ{
		name:    name,
		consume: consume,
		done:    make(chan bool),
		logger:  log.New(os.Stdout, "", log.LstdFlags),
	}

	go client.handleReconnect(addr)

	return client
}

// handleReconnect is the body of the background goroutine. It dials addr in
// a loop; every successful connection is handed to handleReInit, and a failed
// dial is retried after reconnectDelay. The loop ends when done is closed,
// either while waiting to retry or as reported by handleReInit.
func (rabbitMQ *RabbitMQ) handleReconnect(addr string) {
	for {
		rabbitMQ.isReady = false
		log.Println("Attempting to connect")

		conn, err := rabbitMQ.connect(addr)
		if err == nil {
			// handleReInit reports true on shutdown and false when the
			// connection was lost and a new one has to be dialed.
			if rabbitMQ.handleReInit(conn) {
				return
			}
			continue
		}

		log.Println("Failed to connect. Retrying..., err:", err)

		select {
		case <-rabbitMQ.done:
			return
		case <-time.After(reconnectDelay):
		}
	}
}

// connect dials addr. On success the new connection is installed through
// changeConnection and returned; on failure the dial error is returned and
// the client's state is left untouched.
func (rabbitMQ *RabbitMQ) connect(addr string) (*amqp.Connection, error) {
	conn, dialErr := amqp.Dial(addr)
	if dialErr == nil {
		rabbitMQ.changeConnection(conn)
		log.Println("Connected!")
		return conn, nil
	}
	return nil, dialErr
}

// handleReInit sets up a channel on conn and then waits for something to go
// wrong. A failed setup is retried after reInitDelay, and a closed channel
// triggers another setup on the same connection.
//
// It returns true when done has been closed (the caller must stop) and false
// when the connection itself was closed (the caller must dial again).
func (rabbitMQ *RabbitMQ) handleReInit(conn *amqp.Connection) bool {
	for {
		rabbitMQ.isReady = false

		err := rabbitMQ.init(conn)

		if err != nil {
			log.Println("Failed to initialize channel. Retrying..., err:", err)

			select {
			case <-rabbitMQ.done:
				return true
			case <-rabbitMQ.notifyConnClose:
				// The connection is gone, so init can never succeed on it.
				// Without this case the loop would retry on the dead
				// connection forever instead of letting the caller redial.
				log.Println("Connection closed. Reconnecting...")
				return false
			case <-time.After(reInitDelay):
			}
			continue
		}

		select {
		case <-rabbitMQ.done:
			return true
		case <-rabbitMQ.notifyConnClose:
			log.Println("Connection closed. Reconnecting...")
			return false
		case <-rabbitMQ.notifyChanClose:
			// Only the channel failed; loop around and build a new one on
			// the same connection.
			log.Println("Channel closed. Re-running init...")
		}
	}
}

// init opens a channel on conn, puts it into publisher-confirm mode, declares
// the durable queue rabbitMQ.name, installs the channel through changeChannel
// and marks the client ready. When a consume callback is configured it is
// started in a new goroutine with the delivery stream of the new channel.
//
// On any setup failure the half-configured channel is closed and the error is
// returned, leaving the previously installed channel and the ready flag
// untouched.
func (rabbitMQ *RabbitMQ) init(conn *amqp.Connection) error {
	ch, err := conn.Channel()
	if err != nil {
		return err
	}

	if err = ch.Confirm(false); err != nil {
		// Best effort: the caller retries init periodically, so an abandoned
		// channel would otherwise pile up on the connection.
		_ = ch.Close()
		return err
	}

	_, err = ch.QueueDeclare(
		rabbitMQ.name,
		true,  // Durable
		false, // Delete when unused
		false, // Exclusive
		false, // No-wait
		nil,   // Arguments
	)
	if err != nil {
		_ = ch.Close() // best effort, see above
		return err
	}

	rabbitMQ.changeChannel(ch)
	rabbitMQ.isReady = true
	log.Println("Setup!")

	if rabbitMQ.consume != nil {
		go func() {
			msgs, err := rabbitMQ.Stream()
			if err != nil {
				// Surface the failure instead of silently running without
				// a consumer.
				log.Println("Failed to start consuming, err:", err)
				return
			}
			rabbitMQ.consume(msgs)
		}()
	}

	return nil
}

// changeConnection installs connection as the current connection and
// registers a fresh close-notification channel on it.
//
// The notification channel has a buffer of one: the library delivers a close
// error with a blocking send, so an unbuffered channel would stall the
// library's shutdown whenever the reconnect loop is not receiving at that
// moment (for example while it is waiting to retry init).
func (rabbitMQ *RabbitMQ) changeConnection(connection *amqp.Connection) {
	rabbitMQ.connection = connection
	rabbitMQ.notifyConnClose = make(chan *amqp.Error, 1)
	rabbitMQ.connection.NotifyClose(rabbitMQ.notifyConnClose)
}

// changeChannel installs channel as the current channel and registers fresh
// close and publish-confirmation listeners on it.
//
// Both listener channels have a buffer of one. For the close listener this
// matters when the whole connection fails: the reconnect loop returns on the
// connection-close notification and never reads the channel-close one, so an
// unbuffered channel would leave the library blocked on that send.
func (rabbitMQ *RabbitMQ) changeChannel(channel *amqp.Channel) {
	rabbitMQ.channel = channel
	rabbitMQ.notifyChanClose = make(chan *amqp.Error, 1)
	rabbitMQ.notifyConfirm = make(chan amqp.Confirmation, 1)
	rabbitMQ.channel.NotifyClose(rabbitMQ.notifyChanClose)
	rabbitMQ.channel.NotifyPublish(rabbitMQ.notifyConfirm)
}

// Push publishes data to the queue and blocks until the server acknowledges
// it.
//
// If the client is not ready when Push is called, an error is returned
// immediately. Otherwise Push loops:
//   - a failed publish is retried after resendDelay, unless done is closed
//     in the meantime, in which case errShutdown is returned;
//   - after a successful publish it waits up to resendDelay for a
//     confirmation and returns nil on an ack;
//   - on a negative confirmation or a timeout the message is published
//     again, so the queue may receive the same message more than once.
func (rabbitMQ *RabbitMQ) Push(data []byte) error {
	if !rabbitMQ.isReady {
		return errors.New("failed to push: not connected")
	}
	for {
		err := rabbitMQ.UnsafePush(data)
		if err != nil {
			rabbitMQ.logger.Println("Push failed. Retrying...")
			select {
			case <-rabbitMQ.done:
				return errShutdown
			case <-time.After(resendDelay):
			}
			continue
		}
		select {
		case confirm := <-rabbitMQ.notifyConfirm:
			if confirm.Ack {
				rabbitMQ.logger.Println("Push confirmed!")
				return nil
			}
		case <-time.After(resendDelay):
		}
		rabbitMQ.logger.Println("Push didn't confirm. Retrying...")
	}
}

// UnsafePush publishes data once, with the queue name as routing key, and
// does not wait for a publisher confirmation. It returns errNotConnected when
// the client is not ready, otherwise whatever the publish call returns. A nil
// result therefore does not prove that the server received the message.
func (rabbitMQ *RabbitMQ) UnsafePush(data []byte) error {
	if !rabbitMQ.isReady {
		return errNotConnected
	}

	const (
		exchange  = "" // empty exchange name
		mandatory = false
		immediate = false
	)
	msg := amqp.Publishing{
		ContentType: "text/plain",
		Body:        data,
	}

	return rabbitMQ.channel.PublishWithContext(
		context.Background(),
		exchange,
		rabbitMQ.name,
		mandatory,
		immediate,
		msg,
	)
}

// Stream starts consuming from the queue on the current channel and returns
// the delivery channel. It returns errNotConnected when the client is not
// ready.
//
// Auto-ack is disabled, so the receiver must call Ack on each delivery it
// has processed (or Nack on failure); otherwise unacknowledged messages
// build up on the server.
func (rabbitMQ *RabbitMQ) Stream() (<-chan amqp.Delivery, error) {
	if !rabbitMQ.isReady {
		return nil, errNotConnected
	}

	const (
		consumerTag = "" // empty consumer tag
		autoAck     = false
		exclusive   = false
		noLocal     = false
		noWait      = false
	)

	return rabbitMQ.channel.Consume(
		rabbitMQ.name,
		consumerTag,
		autoAck,
		exclusive,
		noLocal,
		noWait,
		nil,
	)
}

// Close shuts the client down: it stops the background reconnect loop and
// closes the channel and the connection. It returns errAlreadyClosed when the
// client is not ready, otherwise the first error reported while closing.
//
// NOTE(review): nothing here guards against concurrent calls; two Close calls
// racing each other could both reach close(done) — confirm callers serialize.
func (rabbitMQ *RabbitMQ) Close() error {
	if !rabbitMQ.isReady {
		return errAlreadyClosed
	}

	// Signal shutdown before touching the channel or connection. If done
	// were closed last, the reconnect loop would see the channel/connection
	// close notifications first and start re-initializing or redialing.
	// Clearing isReady up front also keeps a later Close call from closing
	// done a second time.
	rabbitMQ.isReady = false
	close(rabbitMQ.done)

	// Always attempt both closes so a channel error does not leave the
	// connection open.
	chErr := rabbitMQ.channel.Close()
	connErr := rabbitMQ.connection.Close()
	if chErr != nil {
		return chErr
	}
	return connErr
}

// GetIsReady reports the current value of the ready flag, which init sets to
// true after a successful channel setup and which the reconnect loops and
// Close reset to false. The value is a snapshot and may change right after
// the call returns.
func (rabbitMQ *RabbitMQ) GetIsReady() bool {
	return rabbitMQ.isReady
}

使用示例:

// ExampleRabbitMQ_Push shows a publish-only client: no consume callback is
// passed, and the same payload is pushed in an endless loop.
func ExampleRabbitMQ_Push() {
	const (
		queueName = "job_queue"
		brokerURL = "amqp://guest:guest@localhost:5672/"
	)
	producer := NewRabbitMQ(queueName, brokerURL, nil)
	payload := []byte("message")

	// Pause 3 seconds before every attempt; Push itself may block longer
	// while it waits for a confirmation.
	for {
		time.Sleep(3 * time.Second)

		err := producer.Push(payload)
		if err == nil {
			fmt.Println("Push succeeded!")
			continue
		}
		fmt.Printf("Push failed: %s\n", err)
	}
}

func ExampleRabbitMQ_Stream() {
	name := "job_queue"
	addr := "amqp://guest:guest@localhost:5672/"
	NewRabbitMQ(name, addr, func(deliveries <-chan amqp.Delivery) {
		// Consume messages
	})
}

文中代码已上传至 GitHub。