Fix queue client example

Signed-off-by: dusan <borovcanindusan1@gmail.com>
This commit is contained in:
dusan
2026-02-03 13:52:28 +01:00
parent 9587c01500
commit 8d5eda3a8d
3 changed files with 89 additions and 9 deletions
+86 -9
View File
@@ -1,21 +1,23 @@
// Copyright (c) Abstract Machines
// SPDX-License-Identifier: Apache-2.0
// Package main demonstrates cross-protocol queue interop between MQTT and AMQP 1.0.
// Package main demonstrates cross-protocol queue interop between MQTT, AMQP 1.0, and AMQP 0.9.1.
//
// Scenario: Order Processing Pipeline
// - 3 MQTT Publishers send orders from different customers
// - 2 Consumer Groups process the same queue:
// - 3 Consumer Groups process the same queue:
// - "order-validators" (2 MQTT consumers) - validates each order
// - "order-fulfillment" (1 AMQP 1.0 consumer) - fulfills validated orders
// - "order-shipper" (1 AMQP 0.9.1 consumer) - ships fulfilled orders
//
// Key behaviors demonstrated:
// 1. MQTT publishers write to a durable queue
// 2. MQTT consumers receive messages via the FluxMQ client library
// 3. An AMQP 1.0 consumer (using Azure/go-amqp) receives the same messages
// 4. AMQP consumer uses Accepted/Released dispositions for ack/nack
// 5. Both consumer groups receive ALL messages independently (fan-out)
// 6. Within a group, messages are load-balanced across consumers
// 4. An AMQP 0.9.1 consumer (using rabbitmq/amqp091-go) receives the same messages
// 5. AMQP consumers use dispositions/acks for message acknowledgment
// 6. All three consumer groups receive ALL messages independently (fan-out)
// 7. Within a group, messages are load-balanced across consumers
package main
import (
@@ -33,6 +35,7 @@ import (
"github.com/Azure/go-amqp"
"github.com/absmach/fluxmq/client"
amqp091 "github.com/rabbitmq/amqp091-go"
)
const queueName = "tasks/orders"
@@ -40,6 +43,7 @@ const queueName = "tasks/orders"
var (
mqttAddr = flag.String("mqtt", "localhost:1883", "MQTT broker address")
amqpAddr = flag.String("amqp", "localhost:5672", "AMQP 1.0 broker address")
amqp091Addr = flag.String("amqp091", "localhost:5682", "AMQP 0.9.1 broker address")
numMessages = flag.Int("messages", 10, "Number of messages per publisher")
publishRate = flag.Duration("rate", 200*time.Millisecond, "Delay between publishes")
)
@@ -59,6 +63,7 @@ func main() {
publishedCount int64
validatorCount int64
fulfillmentCount int64
shipperCount int64
)
log.Println("Starting consumers...")
@@ -79,6 +84,13 @@ func main() {
runAMQPFulfillment(ctx, &fulfillmentCount)
}()
// Consumer Group 3: order-shipper (1 AMQP 0.9.1 consumer)
wg.Add(1)
go func() {
defer wg.Done()
runAMQP091Fulfillment(ctx, &shipperCount)
}()
time.Sleep(time.Second)
log.Println("Starting publishers...")
@@ -113,14 +125,16 @@ func main() {
wg.Wait()
fmt.Println("\n=== Statistics ===")
fmt.Printf("Messages published: %d\n", atomic.LoadInt64(&publishedCount))
fmt.Printf("Validator group processed (MQTT): %d\n", atomic.LoadInt64(&validatorCount))
fmt.Printf("Fulfillment group processed (AMQP): %d\n", atomic.LoadInt64(&fulfillmentCount))
fmt.Printf("Messages published: %d\n", atomic.LoadInt64(&publishedCount))
fmt.Printf("Validator group processed (MQTT): %d\n", atomic.LoadInt64(&validatorCount))
fmt.Printf("Fulfillment group processed (AMQP 1.0): %d\n", atomic.LoadInt64(&fulfillmentCount))
fmt.Printf("Shipper group processed (AMQP 0.9.1): %d\n", atomic.LoadInt64(&shipperCount))
fmt.Println()
fmt.Println("Expected behavior:")
fmt.Printf(" - Each group receives all %d messages\n", atomic.LoadInt64(&publishedCount))
fmt.Println(" - Validators split work across 2 MQTT consumers")
fmt.Println(" - Fulfillment processes all messages via AMQP 1.0")
fmt.Println(" - Shipper processes all messages via AMQP 0.9.1")
}
// runPublisher publishes orders via MQTT with QoS 2.
@@ -306,7 +320,7 @@ func runAMQPFulfillment(ctx context.Context, processed *int64) {
time.Sleep(200 * time.Millisecond)
atomic.AddInt64(processed, 1)
log.Printf("[%s] Fulfilled order (AMQP): %s", clientID, string(payload))
log.Printf("[%s] Fulfilled order (AMQP 1.0): %s", clientID, string(payload))
// Demonstrate different disposition types
if rand.Float32() < 0.05 {
@@ -326,3 +340,66 @@ func runAMQPFulfillment(ctx context.Context, processed *int64) {
}
}
}
// runAMQP091Fulfillment runs the order shipping consumer over AMQP 0.9.1.
func runAMQP091Fulfillment(ctx context.Context, processed *int64) {
clientID := "amqp091-shipper-1"
consumerGroup := "order-shipper"
addr := fmt.Sprintf("amqp://guest:guest@%s/", *amqp091Addr)
conn, err := amqp091.Dial(addr)
if err != nil {
log.Printf("[%s] Failed to connect: %v", clientID, err)
return
}
defer conn.Close()
ch, err := conn.Channel()
if err != nil {
log.Printf("[%s] Failed to open a channel: %v", clientID, err)
return
}
defer ch.Close()
// FluxMQ uses consumer arguments to specify the group
args := amqp091.Table{"x-consumer-group": consumerGroup}
msgs, err := ch.Consume(
"$queue/"+queueName,
clientID, // consumer tag
false, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
args,
)
if err != nil {
log.Printf("[%s] Failed to register a consumer: %v", clientID, err)
return
}
log.Printf("[%s] Connected to %s (AMQP 0.9.1), receiving from queue '%s' in group '%s'",
clientID, *amqp091Addr, "$queue/"+queueName, consumerGroup)
for {
select {
case <-ctx.Done():
log.Printf("[%s] Shutting down", clientID)
return
case d, ok := <-msgs:
if !ok {
log.Printf("[%s] Consumer channel closed", clientID)
return
}
time.Sleep(250 * time.Millisecond) // Simulate shipping work
atomic.AddInt64(processed, 1)
log.Printf("[%s] Shipped order (AMQP 0.9.1): %s", clientID, string(d.Body))
if err := d.Ack(false); err != nil {
log.Printf("[%s] Failed to ack message: %v", clientID, err)
}
}
}
}
+1
View File
@@ -13,6 +13,7 @@ require (
github.com/klauspost/compress v1.18.3
github.com/pion/dtls/v3 v3.0.7
github.com/plgd-dev/go-coap/v3 v3.4.1
github.com/rabbitmq/amqp091-go v1.10.0
github.com/sony/gobreaker v1.0.0
go.etcd.io/etcd/client/v3 v3.5.17
go.etcd.io/etcd/server/v3 v3.5.17
+2
View File
@@ -221,6 +221,8 @@ github.com/prometheus/procfs v0.0.8/go.mod h1:7Qr8sr6344vo1JqZ6HhLceV9o3AJ1Ff+Gx
github.com/prometheus/procfs v0.1.3/go.mod h1:lV6e/gmhEcM9IjHGsFOCxxuZ+z1YqCvr4OA4YeYWdaU=
github.com/prometheus/procfs v0.6.0 h1:mxy4L2jP6qMonqmq+aTtOx1ifVWUgG/TAmntgbh3xv4=
github.com/prometheus/procfs v0.6.0/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1xBZuNvfVA=
github.com/rabbitmq/amqp091-go v1.10.0 h1:STpn5XsHlHGcecLmMFCtg7mqq0RnD+zFr4uzukfVhBw=
github.com/rabbitmq/amqp091-go v1.10.0/go.mod h1:Hy4jKW5kQART1u+JkDTF9YYOQUHXqMuhrgxOEeS7G4o=
github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ=
github.com/rogpeppe/go-internal v1.14.1 h1:UQB4HGPB6osV0SQTLymcB4TgvyWu6ZyliaW0tI/otEQ=
github.com/rogpeppe/go-internal v1.14.1/go.mod h1:MaRKkUm5W0goXpeCfT7UZI6fk/L7L7so1lCWt35ZSgc=