81 lines
2.0 KiB
Go
81 lines
2.0 KiB
Go
package azsb
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"sync"
|
|
|
|
"github.com/Azure/azure-sdk-for-go/sdk/azidentity"
|
|
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus"
|
|
"github.com/ThreeDotsLabs/watermill/message"
|
|
"github.com/rs/zerolog"
|
|
)
|
|
|
|
type AzBus struct {
|
|
client *azservicebus.Client
|
|
logger zerolog.Logger
|
|
closed bool
|
|
closedMutex sync.RWMutex
|
|
}
|
|
|
|
// Config selects how NewAzBus authenticates against Azure Service Bus.
type Config struct {
	// ConnectionString is used when UseManagedIdentity is false; it must be
	// non-empty in that mode.
	ConnectionString string
	// UseManagedIdentity switches authentication from the connection string to
	// an Azure credential obtained via azidentity.NewDefaultAzureCredential.
	UseManagedIdentity bool
	// Namespace is handed to azservicebus.NewClient when UseManagedIdentity is
	// true; it must be non-empty in that mode.
	// NOTE(review): presumably the fully qualified namespace host name —
	// confirm against the azservicebus.NewClient documentation.
	Namespace string
}
|
|
|
|
// NewAzBus creates a new Azure Service Bus publisher and subscriber
|
|
func NewAzBus(cfg Config, logger zerolog.Logger) (message.Subscriber, message.Publisher, error) {
|
|
var client *azservicebus.Client
|
|
var err error
|
|
|
|
if cfg.UseManagedIdentity {
|
|
// Use managed identity
|
|
if cfg.Namespace == "" {
|
|
return nil, nil, fmt.Errorf("azure service bus namespace is required when using managed identity")
|
|
}
|
|
|
|
credential, credErr := azidentity.NewDefaultAzureCredential(nil)
|
|
if credErr != nil {
|
|
return nil, nil, fmt.Errorf("failed to create azure credential: %w", credErr)
|
|
}
|
|
|
|
namespace := cfg.Namespace
|
|
client, err = azservicebus.NewClient(namespace, credential, nil)
|
|
if err != nil {
|
|
return nil, nil, fmt.Errorf("failed to create azure service bus client: %w", err)
|
|
}
|
|
} else {
|
|
// Use connection string
|
|
if cfg.ConnectionString == "" {
|
|
return nil, nil, fmt.Errorf("azure service bus connection string is not configured")
|
|
}
|
|
|
|
client, err = azservicebus.NewClientFromConnectionString(cfg.ConnectionString, nil)
|
|
if err != nil {
|
|
return nil, nil, fmt.Errorf("failed to create azure service bus client: %w", err)
|
|
}
|
|
}
|
|
|
|
azb := &AzBus{client: client, logger: logger, closed: false, closedMutex: sync.RWMutex{}}
|
|
|
|
return azb, azb, nil
|
|
}
|
|
|
|
func (a *AzBus) Close() error {
|
|
if a.closed {
|
|
return nil
|
|
}
|
|
|
|
if a.client != nil {
|
|
if err := a.client.Close(context.Background()); err != nil {
|
|
a.logger.Error().Err(err).Msg("failed to close azure service bus client")
|
|
return err
|
|
}
|
|
}
|
|
|
|
a.closed = true
|
|
a.logger.Info().Msg("azure service bus publisher closed")
|
|
return nil
|
|
}
|