package azsb import ( "context" "fmt" "sync" "github.com/Azure/azure-sdk-for-go/sdk/azidentity" "github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus" "github.com/ThreeDotsLabs/watermill/message" "github.com/rs/zerolog" ) type AzBus struct { client *azservicebus.Client logger zerolog.Logger closed bool closedMutex sync.RWMutex } type Config struct { ConnectionString string UseManagedIdentity bool Namespace string } // NewAzBus creates a new Azure Service Bus publisher and subscriber func NewAzBus(cfg Config, logger zerolog.Logger) (message.Subscriber, message.Publisher, error) { var client *azservicebus.Client var err error if cfg.UseManagedIdentity { // Use managed identity if cfg.Namespace == "" { return nil, nil, fmt.Errorf("azure service bus namespace is required when using managed identity") } credential, credErr := azidentity.NewDefaultAzureCredential(nil) if credErr != nil { return nil, nil, fmt.Errorf("failed to create azure credential: %w", credErr) } namespace := cfg.Namespace client, err = azservicebus.NewClient(namespace, credential, nil) if err != nil { return nil, nil, fmt.Errorf("failed to create azure service bus client: %w", err) } } else { // Use connection string if cfg.ConnectionString == "" { return nil, nil, fmt.Errorf("azure service bus connection string is not configured") } client, err = azservicebus.NewClientFromConnectionString(cfg.ConnectionString, nil) if err != nil { return nil, nil, fmt.Errorf("failed to create azure service bus client: %w", err) } } azb := &AzBus{client: client, logger: logger, closed: false, closedMutex: sync.RWMutex{}} return azb, azb, nil } func (a *AzBus) Close() error { if a.closed { return nil } if a.client != nil { if err := a.client.Close(context.Background()); err != nil { a.logger.Error().Err(err).Msg("failed to close azure service bus client") return err } } a.closed = true a.logger.Info().Msg("azure service bus publisher closed") return nil }