Files
base/pkg/rabbit/consumer.go
2026-04-10 18:25:21 +03:30

201 lines
4.6 KiB
Go

package rabbitmq
import (
"context"
"errors"
"fmt"
"sync"
"time"
amqp "github.com/rabbitmq/amqp091-go"
"github.com/rs/zerolog"
)
type consumer struct {
connectionManager ConnectionManager
handler MessageHandler
opts *ConsumerOptions
logger zerolog.Logger
isConsuming bool
consumeMutex sync.RWMutex
shutdownCh chan struct{}
wg sync.WaitGroup
}
func NewConsumer(connectionManager ConnectionManager, handler MessageHandler, opts *ConsumerOptions, logger zerolog.Logger) Consumer {
return &consumer{
connectionManager: connectionManager,
handler: handler,
opts: opts,
logger: logger,
shutdownCh: make(chan struct{}),
}
}
func (c *consumer) Consume(ctx context.Context) error {
c.consumeMutex.Lock()
if c.isConsuming {
c.consumeMutex.Unlock()
return fmt.Errorf("consumer is already consuming")
}
c.isConsuming = true
c.consumeMutex.Unlock()
defer func() {
c.consumeMutex.Lock()
c.isConsuming = false
c.consumeMutex.Unlock()
}()
c.logger.Info().Msgf("starting consumer for queue %s", c.opts.Queue)
for {
select {
case <-ctx.Done():
c.logger.Info().Bool("withErr", ctx.Err() != nil).Msgf("stopping consumer for queue %s", c.opts.Queue)
return ctx.Err()
case <-c.shutdownCh:
c.logger.Info().Msgf("stopping consumer for queue %s with shoutdown", c.opts.Queue)
return nil
default:
if err := c.consumeLoop(ctx, c.opts.Queue, c.handler); err != nil {
c.logger.Error().
Err(err).
Str("errType", fmt.Sprintf("%T", err)).
Msgf("error consuming message for queue %s: %s", c.opts.Queue, err)
// If it's a connection error, wait and retry
var connectionError *ConnectionError
if errors.As(err, &connectionError) {
select {
case <-ctx.Done():
return ctx.Err()
case <-time.After(c.opts.ReconnectWait):
continue
}
}
// if consume error occurred (including delivery channel closed), wait and retry
var consumeErr *ConsumeError
if errors.As(err, &consumeErr) {
c.logger.Warn().Err(errors.Unwrap(consumeErr)).Msg("consume error, will retry")
select {
case <-ctx.Done():
return ctx.Err()
case <-time.After(c.opts.ReconnectWait):
continue
}
}
continue
}
}
}
}
func (c *consumer) consumeLoop(ctx context.Context, queue string, handler MessageHandler) error {
ch, err := c.connectionManager.GetChannel()
if err != nil {
return NewConsumeError(queue, err)
}
if c.opts.PrefetchCount > 0 {
err = ch.Qos(
c.opts.PrefetchCount,
c.opts.PrefetchSize,
false,
)
if err != nil {
ch.Close()
return NewConnectionError("set channel QoS", err)
}
}
defer c.connectionManager.ReturnChannel(ch)
// Start consuming
deliveries, err := ch.Consume(
queue,
c.opts.ConsumerTag,
c.opts.AutoAck,
c.opts.Exclusive,
c.opts.NoLocal,
c.opts.NoWait,
c.opts.Args,
)
if err != nil {
return NewConsumeError(queue, fmt.Errorf("failed to start consuming: %w", err))
}
c.logger.Info().Msgf("starting consumer for queue %s", queue)
for {
select {
case <-ctx.Done():
return ctx.Err()
case <-c.shutdownCh:
return nil
case delivery, ok := <-deliveries:
if !ok {
c.logger.Warn().Msgf("delivery channel closed for queue %s, will retry", queue)
return NewConsumeError(queue, fmt.Errorf("delivery channel closed"))
}
c.wg.Add(1)
go func(d amqp.Delivery) {
defer c.wg.Done()
c.handleDelivery(ctx, d, handler)
}(delivery)
}
}
}
func (c *consumer) handleDelivery(ctx context.Context, delivery amqp.Delivery, handler MessageHandler) {
msg := c.deliveryToMessage(delivery)
handler(ctx, msg)
}
func (c *consumer) deliveryToMessage(delivery amqp.Delivery) *Message {
headers := make(map[string]interface{})
for k, v := range delivery.Headers {
headers[k] = v
}
msg := &Message{
ID: delivery.MessageId,
Body: delivery.Body,
ContentType: delivery.ContentType,
Headers: headers,
Timestamp: delivery.Timestamp,
Expiration: delivery.Expiration,
Priority: delivery.Priority,
DeliveryMode: delivery.DeliveryMode,
ReplyTo: delivery.ReplyTo,
CorrelationID: delivery.CorrelationId,
delivery: &delivery, // Attach delivery for acknowledgment
acknowledged: false,
}
// Set ID from headers if not available in MessageId
if msg.ID == "" {
if id, ok := headers["x-message-id"].(string); ok {
msg.ID = id
}
}
return msg
}
func (c *consumer) Close() error {
c.logger.Info().Msg("closing consumer")
// Signal shutdown
close(c.shutdownCh)
// Wait for all message handlers to complete
c.wg.Wait()
return nil
}