TrumanWong

Golang RabbitMQ自动重连/重发生产者

TrumanWong
5/14/2023

代码如下:

package mq

import (
    "context"
    "errors"
    amqp "github.com/rabbitmq/amqp091-go"
    "log"
    "os"
    "time"
)

type RabbitMQ struct {
    name            string
    logger          *log.Logger
    connection      *amqp.Connection
    channel         *amqp.Channel
    done            chan bool
    notifyConnClose chan *amqp.Error
    notifyChanClose chan *amqp.Error
    notifyConfirm   chan amqp.Confirmation
    isReady         bool
    consume         func(<-chan amqp.Delivery)
}

const (
    // When reconnecting to the server after connection failure
    reconnectDelay = 5 * time.Second

    // When setting up the channel after a channel exception
    reInitDelay = 2 * time.Second

    // When resending messages the server didn't confirm
    resendDelay = 5 * time.Second
)

var (
    errNotConnected  = errors.New("not connected to a server")
    errAlreadyClosed = errors.New("already closed: not connected to the server")
    errShutdown      = errors.New("rabbitMQ is shutting down")
)

// NewRabbitMQ creates a new consumer state instance, and automatically
// attempts to connect to the server.
func NewRabbitMQ(name string, addr string, consume func(<-chan amqp.Delivery)) *RabbitMQ {
    rabbitMQ := RabbitMQ{
        logger:  log.New(os.Stdout, "", log.LstdFlags),
        name:    name,
        done:    make(chan bool),
        consume: consume,
    }
    go rabbitMQ.handleReconnect(addr)
    return &rabbitMQ
}

// handleReconnect will wait for a connection error on
// notifyConnClose, and then continuously attempt to reconnect.
func (rabbitMQ *RabbitMQ) handleReconnect(addr string) {
    for {
        rabbitMQ.isReady = false
        log.Println("Attempting to connect")

        conn, err := rabbitMQ.connect(addr)

        if err != nil {
            log.Println("Failed to connect. Retrying..., err:", err)

            select {
            case <-rabbitMQ.done:
                return
            case <-time.After(reconnectDelay):
            }
            continue
        }

        if done := rabbitMQ.handleReInit(conn); done {
            break
        }
    }
}

// connect will create a new AMQP connection
func (rabbitMQ *RabbitMQ) connect(addr string) (*amqp.Connection, error) {
    conn, err := amqp.Dial(addr)

    if err != nil {
        return nil, err
    }

    rabbitMQ.changeConnection(conn)
    log.Println("Connected!")
    return conn, nil
}

// handleReconnect will wait for a channel error
// and then continuously attempt to re-initialize both channels
func (rabbitMQ *RabbitMQ) handleReInit(conn *amqp.Connection) bool {
    for {
        rabbitMQ.isReady = false

        err := rabbitMQ.init(conn)

        if err != nil {
            log.Println("Failed to initialize channel. Retrying..., err:", err)

            select {
            case <-rabbitMQ.done:
                return true
            case <-time.After(reInitDelay):
            }
            continue
        }

        select {
        case <-rabbitMQ.done:
            return true
        case <-rabbitMQ.notifyConnClose:
            log.Println("Connection closed. Reconnecting...")
            return false
        case <-rabbitMQ.notifyChanClose:
            log.Println("Channel closed. Re-running init...")
        }
    }
}

// init will initialize channel & declare queue
func (rabbitMQ *RabbitMQ) init(conn *amqp.Connection) error {
    ch, err := conn.Channel()

    if err != nil {
        return err
    }

    err = ch.Confirm(false)

    if err != nil {
        return err
    }
    _, err = ch.QueueDeclare(
        rabbitMQ.name,
        true,  // Durable
        false, // Delete when unused
        false, // Exclusive
        false, // No-wait
        nil,   // Arguments
    )

    if err != nil {
        return err
    }

    rabbitMQ.changeChannel(ch)
    rabbitMQ.isReady = true
    log.Println("Setup!")

    if rabbitMQ.consume != nil {
        go func() {
            msgs, err := rabbitMQ.Stream()
            if err == nil {
                rabbitMQ.consume(msgs)
            }
        }()
    }

    return nil
}

// changeConnection takes a new connection to the queue,
// and updates the close listener to reflect this.
func (rabbitMQ *RabbitMQ) changeConnection(connection *amqp.Connection) {
    rabbitMQ.connection = connection
    rabbitMQ.notifyConnClose = make(chan *amqp.Error)
    rabbitMQ.connection.NotifyClose(rabbitMQ.notifyConnClose)
}

// changeChannel takes a new channel to the queue,
// and updates the channel listeners to reflect this.
func (rabbitMQ *RabbitMQ) changeChannel(channel *amqp.Channel) {
    rabbitMQ.channel = channel
    rabbitMQ.notifyChanClose = make(chan *amqp.Error)
    rabbitMQ.notifyConfirm = make(chan amqp.Confirmation, 1)
    rabbitMQ.channel.NotifyClose(rabbitMQ.notifyChanClose)
    rabbitMQ.channel.NotifyPublish(rabbitMQ.notifyConfirm)
}

// Push will push data onto the queue, and wait for a confirm.
// If no confirms are received until within the resendTimeout,
// it continuously re-sends messages until a confirm is received.
// This will block until the server sends a confirm. Errors are
// only returned if the push action itself fails, see UnsafePush.
func (rabbitMQ *RabbitMQ) Push(data []byte) error {
    if !rabbitMQ.isReady {
        return errors.New("failed to push push: not connected")
    }
    for {
        err := rabbitMQ.UnsafePush(data)
        if err != nil {
            rabbitMQ.logger.Println("Push failed. Retrying...")
            select {
            case <-rabbitMQ.done:
                return errShutdown
            case <-time.After(resendDelay):
            }
            continue
        }
        select {
        case confirm := <-rabbitMQ.notifyConfirm:
            if confirm.Ack {
                rabbitMQ.logger.Println("Push confirmed!")
                return nil
            }
        case <-time.After(resendDelay):
        }
        rabbitMQ.logger.Println("Push didn't confirm. Retrying...")
    }
}

// UnsafePush will push to the queue without checking for
// confirmation. It returns an error if it fails to connect.
// No guarantees are provided for whether the server will
// recieve the message.
func (rabbitMQ *RabbitMQ) UnsafePush(data []byte) error {
    if !rabbitMQ.isReady {
        return errNotConnected
    }
    return rabbitMQ.channel.PublishWithContext(
        context.Background(),
        "",            // Exchange
        rabbitMQ.name, // Routing key
        false,         // Mandatory
        false,         // Immediate
        amqp.Publishing{
            ContentType: "text/plain",
            Body:        data,
        },
    )
}

// Stream will continuously put queue items on the channel.
// It is required to call delivery.Ack when it has been
// successfully processed, or delivery.Nack when it fails.
// Ignoring this will cause data to build up on the server.
func (rabbitMQ *RabbitMQ) Stream() (<-chan amqp.Delivery, error) {
    if !rabbitMQ.isReady {
        return nil, errNotConnected
    }
    return rabbitMQ.channel.Consume(
        rabbitMQ.name,
        "",    // Consumer
        false, // Auto-Ack
        false, // Exclusive
        false, // No-local
        false, // No-Wait
        nil,   // Args
    )
}

// Close will cleanly shutdown the channel and connection.
func (rabbitMQ *RabbitMQ) Close() error {
    if !rabbitMQ.isReady {
        return errAlreadyClosed
    }
    err := rabbitMQ.channel.Close()
    if err != nil {
        return err
    }
    err = rabbitMQ.connection.Close()
    if err != nil {
        return err
    }
    close(rabbitMQ.done)
    rabbitMQ.isReady = false
    return nil
}

// GetIsReady returns whether the queue is ready to be used.
func (rabbitMQ *RabbitMQ) GetIsReady() bool {
    return rabbitMQ.isReady
}

使用示例:

func ExampleRabbitMQ_Push() {
    name := "job_queue"
    addr := "amqp://guest:guest@localhost:5672/"
    queue := NewRabbitMQ(name, addr, nil)
    message := []byte("message")
    // Attempt to push a message every 2 seconds
    for {
        time.Sleep(time.Second * 3)
        if err := queue.Push(message); err != nil {
            fmt.Printf("Push failed: %s\n", err)
        } else {
            fmt.Println("Push succeeded!")
        }
    }
}

func ExampleRabbitMQ_Stream() {
    name := "job_queue"
    addr := "amqp://guest:guest@localhost:5672/"
    NewRabbitMQ(name, addr, func(deliveries <-chan amqp.Delivery) {
        // Consume messages
    })
}

文中代码已上传github