126 lines
3.3 KiB
Go
126 lines
3.3 KiB
Go
package azsb
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"fmt"
|
|
|
|
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus"
|
|
"github.com/ThreeDotsLabs/watermill/message"
|
|
)
|
|
|
|
func (a *AzBus) Subscribe(ctx context.Context, topic string) (<-chan *message.Message, error) {
|
|
a.closedMutex.RLock()
|
|
if a.closed {
|
|
a.closedMutex.RUnlock()
|
|
return nil, fmt.Errorf("subscriber is closed")
|
|
}
|
|
a.closedMutex.RUnlock()
|
|
|
|
// Create receiver for the subscription
|
|
// In Azure Service Bus, you need to create a subscription for a topic before subscribing
|
|
// The subscription name should match what was created in Azure Service Bus
|
|
// Default: use topic name with "-subscription" suffix
|
|
// You should create the subscription in Azure Service Bus beforehand or make this configurable
|
|
subscriptionName := topic + "-subscription"
|
|
|
|
receiver, err := a.client.NewReceiverForSubscription(topic, subscriptionName, nil)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to create receiver for topic %s subscription %s: %w. Note: Subscription must be created in Azure Service Bus first", topic, subscriptionName, err)
|
|
}
|
|
|
|
messages := make(chan *message.Message, 100)
|
|
|
|
go func() {
|
|
defer close(messages)
|
|
defer receiver.Close(context.Background())
|
|
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
a.logger.Info().Str("topic", topic).Msg("subscription context cancelled")
|
|
return
|
|
default:
|
|
// Check if closed
|
|
a.closedMutex.RLock()
|
|
if a.closed {
|
|
a.closedMutex.RUnlock()
|
|
return
|
|
}
|
|
a.closedMutex.RUnlock()
|
|
|
|
// Receive messages
|
|
messages2, err := receiver.ReceiveMessages(ctx, 1, nil)
|
|
if err != nil {
|
|
if ctx.Err() != nil {
|
|
return
|
|
}
|
|
a.logger.Error().
|
|
Err(err).
|
|
Str("topic", topic).
|
|
Msg("failed to receive messages from azure service bus")
|
|
continue
|
|
}
|
|
|
|
for _, sbMsg := range messages2 {
|
|
watermillMsg := a.convertToWatermillMessage(sbMsg)
|
|
|
|
select {
|
|
case messages <- watermillMsg:
|
|
// Message sent successfully
|
|
// Complete the message
|
|
if err := receiver.CompleteMessage(ctx, sbMsg, nil); err != nil {
|
|
a.logger.Error().
|
|
Err(err).
|
|
Str("message_id", watermillMsg.UUID).
|
|
Msg("failed to complete message")
|
|
}
|
|
case <-ctx.Done():
|
|
// Context cancelled, abandon the message
|
|
if err := receiver.AbandonMessage(ctx, sbMsg, nil); err != nil {
|
|
a.logger.Error().
|
|
Err(err).
|
|
Str("message_id", watermillMsg.UUID).
|
|
Msg("failed to abandon message")
|
|
}
|
|
return
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}()
|
|
|
|
a.logger.Info().
|
|
Str("topic", topic).
|
|
Str("subscription", subscriptionName).
|
|
Msg("started subscribing to azure service bus")
|
|
|
|
return messages, nil
|
|
}
|
|
|
|
func (a *AzBus) convertToWatermillMessage(sbMsg *azservicebus.ReceivedMessage) *message.Message {
|
|
msg := message.NewMessage("", sbMsg.Body)
|
|
|
|
// Set message ID
|
|
if sbMsg.MessageID != "=" {
|
|
msg.UUID = sbMsg.MessageID
|
|
}
|
|
|
|
// Copy application properties to metadata
|
|
if sbMsg.ApplicationProperties != nil {
|
|
msg.Metadata = make(message.Metadata)
|
|
for key, value := range sbMsg.ApplicationProperties {
|
|
if strValue, ok := value.(string); ok {
|
|
msg.Metadata[key] = strValue
|
|
} else {
|
|
// Convert non-string values to string
|
|
if jsonValue, err := json.Marshal(value); err == nil {
|
|
msg.Metadata[key] = string(jsonValue)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
return msg
|
|
}
|