This library is a wrapper of Rabbitmq official library amqp091-go. Users do not need to care about connections and channels, but only need to focus on their specific business.
gorabbit implements connection closing and reconnection. The amqp091-go library maintained by the RabbitMQ team does not care about the connection status. When registering a client according to a simple example, it may not be a safe solution.
- Maintain a RabbitMQ connection, monitor the connection status and cache it globally. When a connection is disconnected, it will reconnect and update to the global cache.
- Different operation roles will open separate channels, and while opening the channel, they will also monitor its status. Once closed, the channel will be reopened. And notify the channel caller, such as the consumer
gorabbit encapsulates the producer (publisher). Users only need to declare some configurations to register. Currently, it supports publishing simple messages and custom messages.
gorabbit encapsulates the consumer (consumer). Users only need to declare some configurations and implement a listening function to register.
- The consumer listening interface function will be called back to the implemented listening function when a message is received. You only need to focus on the business logic.
You can view the _examples directory, which contains consumer and producer examples, or you can view the instructions below. You can also submit an issue.
is a unified and integrated client configuration structure, which includes client connection configuration and role declaration configuration. The specific structure is as follows.
type Client struct {
Config ConnectionConfig // Connection configurationConsumers
[]IConsumer // Consumer listPublishers
[]IPublisher // Producer list
When using, you need to declare the connection configuration to provide to gorabbit for connection and reconnection.
rabbitClient := &gorabbit.Client{
Config: gorabbit.ConnectionConfig{
Host: "",
Port: "5672",
UserName: "admin",
Password: "admin",
VHost: "/",
- gorabbit.Consumer Description
type Consumer struct {
Queue Queue // Queue configuration
ConsumerConfig ConsumerConfig // Consumer configuration
QueueBinding QueueBinding // Queue binding configuration
Complete configuration set
Queue: gorabbit.Queue{
Name: "", // Queue name
Durable: false, // Whether to persist
AutoDelete: false, // Whether to automatically delete
Exclusive: false, // Whether exclusive
NoWait: false, // Whether to wait for RabbitMQ server confirmation
Args: nil, // Other custom parameters
ConsumerConfig: gorabbit.ConsumerConfig{
Tag: "", // Consumer tag
AutoAck: false, // Automatic confirmation
Exclusive: false, // Exclusive
NoLocal: false, // No matter
NoWait: false, // Wait for RabbitMQ server confirmation
Args: nil, // Other custom parameters
QueueBinding: gorabbit.QueueBinding{
Exchange: gorabbit.Exchange{ // Exchange configuration, automatic creation
Name: "", // Exchange name
Kind: "", // Exchange type
Durable: false, // Whether to persist
AutoDelete: false, // Whether to automatically delete
Internal: false, // Is it an internal switch, don't worry about it
NoWait: false, // Whether to wait for RabbitMQ server confirmation
Args: nil, // Other custom parameters
RoutingKey: []string{QueueName}, // routingkey
Register a simple consumer example
Declare a consumer structure and inherit gorabbit.Consumer
Implement the Listener function
type ExampleConsumer struct { gorabbit.Consumer } func (c *ExampleConsumer) Listener(delivery *amqp.Delivery) { body := string(delivery.Body) log.Printf("a message was received:%s", body) }
Then register to the client
rabbitClient := &gorabbit.Client{ Config: gorabbit.ConnectionConfig{ Host: "", Port: : "5672", UserName: "admin", Password: "admin", VHost: "/", }, Consumers: []gorabbit.IConsumer{ &ExampleConsumer{ Consumer: gorabbit.Consumer{ Queue: gorabbit.Queue{ Name: "example-queue-1", Durable: true, AutoDelete: false, }, ConsumerConfig: gorabbit.ConsumerConfig{ AutoAck: true, }, QueueBinding: gorabbit.QueueBinding{ Exchange: gorabbit.Exchange{ Name: "example-exchange-1", Kind: "direct", Durable: true, }, RoutingKey: []string{"example-queue-1"}, }, }, }, }, } rabbitClient.Init()
Another way to create a consumer (optional)
Declare a consumer struct and inherit gorabbit.Consumer
Implement Listener function and BuildConsumer function
type ExampleConsumer struct { gorabbit.Consumer } func (c *ExampleConsumer) BuildConsumer() gorabbit.Consumer { c.Consumer = gorabbit.Consumer{ Queue: gorabbit.Queue{ Name: QueueName2, Durable: true, AutoDelete: false, }, ConsumerConfig: gorabbit.ConsumerConfig{ AutoAck: true, }, QueueBinding: gorabbit.QueueBinding{ Exchange: gorabbit.Exchange{ Name: ExchangeName2, Kind: "direct", Durable: true, }, RoutingKey: []string{QueueName2}, NoWait: false, }, } return c.Consumer } func (c *ExampleConsumer) Listener(delivery *amqp.Delivery) { body := string(delivery.Body) log.Printf("a message was received: %s, ConsumeConfig: %s", body, c.Queue.Name) }
Then register to the client
rabbitClient := &gorabbit.Client{ Config: gorabbit.ConnectionConfig{ Host: "", Port: "5672", UserName: "admin", Password: "admin", VHost: "/", }, Consumers: []gorabbit.IConsumer{ &ExampleConsumer{}, }, } rabbitClient.Init()
- gorabbit.Publisher Description
type Publisher struct {
mtx sync.RWMutex // read-write lock
ch *amqp.Channel // connection channel
PublisherConfig PublisherConfig // producer configuration
Complete configuration set
ExchangeName: "", // exchange name
RoutingKey: "", // routingkey
Mandatory: false, // Whether to respond to the producer when the corresponding switch and routingkey cannot find the corresponding queue
Immediate: false, // Whether to detect the existence of consumers after the message is sent, set to true if there is no consumer in the queue and the message will not be entered into the queue and returned
A simple producer example
Declare a producer structure and inherit gorabbit.Publisher
type ExamplePublisher struct { gorabbit.Publisher }
Then register it in the client
publisher := &ExamplePublisher{ Publisher: gorabbit.Publisher{ PublisherConfig: gorabbit.PublisherConfig{ ExchangeName: "", RoutingKey: "", Mandatory: false, Immediate: false, }, }, } rabbitClient := &gorabbit.Client{ Config: gorabbit.ConnectionConfig{ Host: "", Port: "5672", UserName: "admin", Password: "admin", VHost: "/", }, Publishers: []gorabbit.IPublisher{ publisher, }, } rabbitClient.Init() for { time.Sleep(5 * time.Second) err := publisher.SimpleSend([]byte("a test message")) if err != nil { log.Println(err) } }
Another way to create a producer (optional)
Declare a producer structure and inherit goribbit.Publisher
Implement the BuildPublisher function
type ExamplePublisher struct { gorabbit.Publisher } func (p *ExamplePublisher) BuildPublisher() *gorabbit.PublisherConfig { p.PublisherConfig = PublisherConfig: gorabbit.PublisherConfig{ ExchangeName: "", RoutingKey: "", Mandatory: false, Immediate: false, }, return &p.PublisherConfig }
Then register to the client
publisher := &ExamplePublisher{} rabbitClient := &gorabbit.Client{ Config: gorabbit.ConnectionConfig{ Host: "", Port: "5672", UserName: "admin", Password: "admin", VHost: "/", }, Publishers: []gorabbit.IPublisher{ publisher, }, } rabbitClient.Init() for { time.Sleep(5 * time.Second) err := publisher.SimpleSend([]byte("a test message")) if err != nil { log.Println(err) } }
Producer message sending
Currently, there are only two functions for producers to send messages:
publisher.SimpleSend, simple message delivery, all parameters are default amqp default, only need to pass in Body
publisher.CustomSend, custom message delivery, use amqp.Publishing as a parameter, you can customize the delivery parameters
publisher.SimpleSend([]byte("a test message")) publisher.CustomSend(&amqp.Publishing{ Headers: nil, ContentType: "", ContentEncoding: "", DeliveryMode: 0, Priority: 0, CorrelationId: "", ReplyTo: "", Expiration: "", MessageId: "", Timestamp: time.Time{}, Type: "", UserId: "", AppId: "", Body: nil, })
Rewrite the send function
The sending function supports rewriting and user customization
func (p *ExamplePublisher) SimpleSend(body []byte) error {
// do something .........
err := p.CustomSend(&amqp.Publishing{
Headers: nil,
ContentType: "application/octet-stream",
ContentEncoding: "",
DeliveryMode: 0,
Priority: 0,
CorrelationId: "",
ReplyTo: "",
Expiration: "",
MessageId: "",
Timestamp: time.Time{},
Type: "",
UserId: "",
AppId: "",
Body: body,
// do something .........
return err
func (p *ExamplePublisher) CustomSend(msg *amqp.Publishing) error {
// do something .........
ch := p.GetCh()
if ch.IsClosed() {
return errors.New("publisher send failed, because channel is closed")
err := ch.Publish(
p.PublisherConfig.ExchangeName, // exchange
p.PublisherConfig.RoutingKey, // routing key
p.PublisherConfig.Mandatory, // mandatory
p.PublisherConfig.Immediate, // immediate
if err != nil {
return errors.New(fmt.Sprintf("publisher send failed, error: %v", err))
// do something .........
return nil