NOISSUE - Add broker options (#119)

Signed-off-by: Arvindh <arvindh91@gmail.com>
This commit is contained in:
Arvindh
2025-04-16 17:51:57 +05:30
committed by GitHub
parent 17fa535502
commit aef424c5da
4 changed files with 30 additions and 12 deletions
+7 -3
View File
@@ -16,7 +16,11 @@ import (
"github.com/nats-io/nats.go/jetstream"
)
const AllTopic = "alarms.>"
const (
AllTopic = "alarms.>"
prefix = "writers"
)
var cfg = jetstream.StreamConfig{
Name: "alarms",
@@ -31,7 +35,7 @@ var cfg = jetstream.StreamConfig{
}
func NewPubSub(ctx context.Context, url string, logger *slog.Logger) (messaging.PubSub, error) {
pb, err := broker.NewPubSub(ctx, url, logger, broker.JSStreamConfig(cfg))
pb, err := broker.NewPubSub(ctx, url, logger, broker.Prefix(prefix), broker.JSStreamConfig(cfg))
if err != nil {
return nil, err
}
@@ -40,7 +44,7 @@ func NewPubSub(ctx context.Context, url string, logger *slog.Logger) (messaging.
}
func NewPublisher(ctx context.Context, url string) (messaging.Publisher, error) {
pb, err := broker.NewPublisher(ctx, url, broker.JSStreamConfig(cfg))
pb, err := broker.NewPublisher(ctx, url, broker.Prefix(prefix), broker.JSStreamConfig(cfg))
if err != nil {
return nil, err
}
+8 -3
View File
@@ -14,10 +14,15 @@ import (
broker "github.com/absmach/supermq/pkg/messaging/rabbitmq"
)
const AllTopic = "alarms.#"
const (
AllTopic = "alarms.#"
exchangeName = "writers"
prefix = "writers"
)
func NewPubSub(_ context.Context, url string, logger *slog.Logger) (messaging.PubSub, error) {
pb, err := broker.NewPubSub(url, logger, broker.Prefix("writers"))
pb, err := broker.NewPubSub(url, logger, broker.Prefix("writers"), broker.Exchange(exchangeName))
if err != nil {
return nil, err
}
@@ -26,7 +31,7 @@ func NewPubSub(_ context.Context, url string, logger *slog.Logger) (messaging.Pu
}
func NewPublisher(_ context.Context, url string) (messaging.Publisher, error) {
pb, err := broker.NewPublisher(url, broker.Prefix("writers"))
pb, err := broker.NewPublisher(url, broker.Prefix("writers"), broker.Exchange(exchangeName))
if err != nil {
return nil, err
}
+7 -3
View File
@@ -16,7 +16,11 @@ import (
"github.com/nats-io/nats.go/jetstream"
)
const AllTopic = "writers.>"
const (
AllTopic = "writers.>"
prefix = "writers"
)
var cfg = jetstream.StreamConfig{
Name: "writers",
@@ -31,7 +35,7 @@ var cfg = jetstream.StreamConfig{
}
func NewPubSub(ctx context.Context, url string, logger *slog.Logger) (messaging.PubSub, error) {
pb, err := broker.NewPubSub(ctx, url, logger, broker.JSStreamConfig(cfg))
pb, err := broker.NewPubSub(ctx, url, logger, broker.Prefix(prefix), broker.JSStreamConfig(cfg))
if err != nil {
return nil, err
}
@@ -40,7 +44,7 @@ func NewPubSub(ctx context.Context, url string, logger *slog.Logger) (messaging.
}
func NewPublisher(ctx context.Context, url string) (messaging.Publisher, error) {
pb, err := broker.NewPublisher(ctx, url, broker.JSStreamConfig(cfg))
pb, err := broker.NewPublisher(ctx, url, broker.Prefix(prefix), broker.JSStreamConfig(cfg))
if err != nil {
return nil, err
}
@@ -14,10 +14,15 @@ import (
broker "github.com/absmach/supermq/pkg/messaging/rabbitmq"
)
const AllTopic = "writers.#"
const (
AllTopic = "writers.#"
exchangeName = "writers"
prefix = "writers"
)
func NewPubSub(_ context.Context, url string, logger *slog.Logger) (messaging.PubSub, error) {
pb, err := broker.NewPubSub(url, logger, broker.Prefix("writers"))
pb, err := broker.NewPubSub(url, logger, broker.Prefix(prefix), broker.Exchange(exchangeName))
if err != nil {
return nil, err
}
@@ -26,7 +31,7 @@ func NewPubSub(_ context.Context, url string, logger *slog.Logger) (messaging.Pu
}
func NewPublisher(_ context.Context, url string) (messaging.Publisher, error) {
pb, err := broker.NewPublisher(url, broker.Prefix("writers"))
pb, err := broker.NewPublisher(url, broker.Prefix(prefix), broker.Exchange(exchangeName))
if err != nil {
return nil, err
}