Fix replication (#5)

* Fix replication

Signed-off-by: dusan <borovcanindusan1@gmail.com>

* Fix forwarding for write policy forward

Signed-off-by: dusan <borovcanindusan1@gmail.com>

* Use replicate mode forwarding for ephemeral queues

Signed-off-by: dusan <borovcanindusan1@gmail.com>

---------

Signed-off-by: dusan <borovcanindusan1@gmail.com>
This commit is contained in:
Dušan Borovčanin
2026-02-05 12:28:32 +01:00
committed by GitHub
parent 203c23c830
commit e3916f91af
28 changed files with 590 additions and 124 deletions
+1 -1
View File
@@ -36,7 +36,7 @@ type QueueManager interface {
CreateQueue(ctx context.Context, config qtypes.QueueConfig) error
UpdateQueue(ctx context.Context, config qtypes.QueueConfig) error
GetQueue(ctx context.Context, queueName string) (*qtypes.QueueConfig, error)
Publish(ctx context.Context, topic string, payload []byte, properties map[string]string) error
Publish(ctx context.Context, publish qtypes.PublishRequest) error
Subscribe(ctx context.Context, queueName, pattern, clientID, groupID, proxyNodeID string) error
SubscribeWithCursor(ctx context.Context, queueName, pattern, clientID, groupID, proxyNodeID string, cursor *qtypes.CursorOption) error
Unsubscribe(ctx context.Context, queueName, pattern, clientID, groupID string) error
+15 -3
View File
@@ -367,7 +367,11 @@ func (ch *Channel) completePublish() {
if exchangeName == "" && strings.HasPrefix(routingKey, "$queue/") {
qm := ch.conn.broker.getQueueManager()
if qm != nil {
if err := qm.Publish(context.Background(), routingKey, body, props); err != nil {
if err := qm.Publish(context.Background(), qtypes.PublishRequest{
Topic: routingKey,
Payload: body,
Properties: props,
}); err != nil {
ch.conn.logger.Error("queue publish failed", "queue", routingKey, "error", err)
}
if ch.confirmMode {
@@ -382,7 +386,11 @@ func (ch *Channel) completePublish() {
qm := ch.conn.broker.getQueueManager()
if qm != nil {
queueTopic := "$queue/" + routingKey
if err := qm.Publish(context.Background(), queueTopic, body, props); err != nil {
if err := qm.Publish(context.Background(), qtypes.PublishRequest{
Topic: queueTopic,
Payload: body,
Properties: props,
}); err != nil {
ch.conn.logger.Error("queue publish failed", "queue", routingKey, "error", err)
}
if ch.confirmMode {
@@ -412,7 +420,11 @@ func (ch *Channel) completePublish() {
if routingKey != "" {
queueTopic = queueTopic + "/" + routingKey
}
if err := qm.Publish(context.Background(), queueTopic, body, props); err != nil {
if err := qm.Publish(context.Background(), qtypes.PublishRequest{
Topic: queueTopic,
Payload: body,
Properties: props,
}); err != nil {
ch.conn.logger.Error("queue publish failed", "queue", b.queue, "error", err)
}
}
+17 -17
View File
@@ -9,7 +9,7 @@ import (
"reflect"
"testing"
"github.com/absmach/fluxmq/amqp091/codec"
"github.com/absmach/fluxmq/amqp/codec"
)
func TestReadWriteOctet(t *testing.T) {
@@ -264,8 +264,8 @@ func TestReadWriteTable(t *testing.T) {
func TestConnectionMethods(t *testing.T) {
tests := []struct {
name string
method interface {
name string
method interface {
Read(*bytes.Reader) error
Write(io.Writer) error
}
@@ -376,12 +376,12 @@ func TestConnectionMethods(t *testing.T) {
{
"ConnectionOpen",
&codec.ConnectionOpen{
VirtualHost: "/",
VirtualHost: "/",
Capabilities: "",
Insist: true,
},
&codec.ConnectionOpen{
VirtualHost: "/",
VirtualHost: "/",
Capabilities: "",
Insist: true,
},
@@ -512,8 +512,8 @@ func TestFieldValueTypes(t *testing.T) {
func TestChannelMethods(t *testing.T) {
tests := []struct {
name string
method interface {
name string
method interface {
Read(*bytes.Reader) error
Write(io.Writer) error
}
@@ -623,8 +623,8 @@ func TestChannelMethods(t *testing.T) {
func TestExchangeMethods(t *testing.T) {
tests := []struct {
name string
method interface {
name string
method interface {
Read(*bytes.Reader) error
Write(io.Writer) error
}
@@ -804,8 +804,8 @@ func TestExchangeMethods(t *testing.T) {
func TestQueueMethods(t *testing.T) {
tests := []struct {
name string
method interface {
name string
method interface {
Read(*bytes.Reader) error
Write(io.Writer) error
}
@@ -1012,8 +1012,8 @@ func TestQueueMethods(t *testing.T) {
func TestBasicMethods(t *testing.T) {
tests := []struct {
name string
method interface {
name string
method interface {
Read(*bytes.Reader) error
Write(io.Writer) error
}
@@ -1304,8 +1304,8 @@ func TestBasicMethods(t *testing.T) {
func TestTxMethods(t *testing.T) {
tests := []struct {
name string
method interface {
name string
method interface {
Read(*bytes.Reader) error
Write(io.Writer) error
}
@@ -1405,8 +1405,8 @@ func TestTxMethods(t *testing.T) {
func TestConfirmMethods(t *testing.T) {
tests := []struct {
name string
method interface {
name string
method interface {
Read(*bytes.Reader) error
Write(io.Writer) error
}
+5 -1
View File
@@ -282,7 +282,11 @@ func (l *Link) receiveTransfer(transfer *performatives.Transfer, payload []byte)
props[k] = s
}
}
if err := qm.Publish(context.Background(), publishTopic, data, props); err != nil {
if err := qm.Publish(context.Background(), qtypes.PublishRequest{
Topic: publishTopic,
Payload: data,
Properties: props,
}); err != nil {
l.logger.Error("queue publish failed", "topic", publishTopic, "error", err)
}
}
+1 -1
View File
@@ -20,7 +20,7 @@ type QueueManager interface {
Start(ctx context.Context) error
Stop() error
// Publish adds a message to all queues whose topic patterns match the topic.
Publish(ctx context.Context, topic string, payload []byte, properties map[string]string) error
Publish(ctx context.Context, publish types.PublishRequest) error
// Subscribe adds a consumer to a queue with optional pattern matching.
Subscribe(ctx context.Context, queueName, pattern, clientID, groupID, proxyNodeID string) error
// SubscribeWithCursor adds a consumer with explicit cursor positioning.
+1 -1
View File
@@ -96,7 +96,7 @@ type QueueConsumerRegistry interface {
// ForwardQueuePublish forwards a queue publish to a remote node.
// The remote node will store the message in its local matching queues.
ForwardQueuePublish(ctx context.Context, nodeID, topic string, payload []byte, properties map[string]string) error
ForwardQueuePublish(ctx context.Context, nodeID, topic string, payload []byte, properties map[string]string, forwardToLeader bool) error
}
type Lifecycle interface {
+3 -10
View File
@@ -1028,7 +1028,7 @@ func (c *EtcdCluster) EnqueueRemote(ctx context.Context, nodeID, queueName strin
if c.transport == nil {
return "", fmt.Errorf("transport not configured")
}
return c.transport.SendEnqueueRemote(ctx, nodeID, queueName, payload, properties)
return c.transport.SendEnqueueRemote(ctx, nodeID, queueName, payload, properties, false, false)
}
// RouteQueueMessage sends a queue message to a remote consumer.
@@ -1219,20 +1219,13 @@ func (c *EtcdCluster) ListAllQueueConsumers(ctx context.Context) ([]*QueueConsum
}
// ForwardQueuePublish forwards a queue publish to a remote node.
func (c *EtcdCluster) ForwardQueuePublish(ctx context.Context, nodeID, topic string, payload []byte, properties map[string]string) error {
func (c *EtcdCluster) ForwardQueuePublish(ctx context.Context, nodeID, topic string, payload []byte, properties map[string]string, forwardToLeader bool) error {
if c.transport == nil {
return fmt.Errorf("transport not configured")
}
// Add special property to indicate this is a forwarded publish
props := make(map[string]string)
for k, v := range properties {
props[k] = v
}
props["_forward_publish"] = "true"
// Use SendEnqueueRemote with topic in queueName field
_, err := c.transport.SendEnqueueRemote(ctx, nodeID, topic, payload, props)
_, err := c.transport.SendEnqueueRemote(ctx, nodeID, topic, payload, properties, true, forwardToLeader)
return err
}
+1 -1
View File
@@ -174,7 +174,7 @@ func (n *NoopCluster) ListAllQueueConsumers(ctx context.Context) ([]*QueueConsum
return nil, nil
}
func (n *NoopCluster) ForwardQueuePublish(ctx context.Context, nodeID, topic string, payload []byte, properties map[string]string) error {
func (n *NoopCluster) ForwardQueuePublish(ctx context.Context, nodeID, topic string, payload []byte, properties map[string]string, forwardToLeader bool) error {
// Single-node: no remote nodes to forward to
return ErrClusterNotEnabled
}
+32 -23
View File
@@ -18,6 +18,7 @@ import (
"connectrpc.com/connect"
clusterv1 "github.com/absmach/fluxmq/pkg/proto/cluster/v1"
"github.com/absmach/fluxmq/pkg/proto/cluster/v1/clusterv1connect"
queueTypes "github.com/absmach/fluxmq/queue/types"
"golang.org/x/net/http2"
"golang.org/x/net/http2/h2c"
)
@@ -30,8 +31,8 @@ type QueueHandler interface {
// DeliverQueueMessage delivers a queue message to a local consumer.
DeliverQueueMessage(ctx context.Context, clientID string, msg any) error
// PublishLocal publishes a message to local matching queues (called by remote forward).
PublishLocal(ctx context.Context, topic string, payload []byte, properties map[string]string) error
// HandleQueuePublish handles a publish with the given mode.
HandleQueuePublish(ctx context.Context, publish queueTypes.PublishRequest, mode queueTypes.PublishMode) error
}
// Transport handles inter-broker communication using Connect protocol.
@@ -246,7 +247,7 @@ func (t *Transport) HasPeerConnection(nodeID string) bool {
}
// RoutePublish implements BrokerServiceHandler.RoutePublish.
func (t *Transport) RoutePublish(ctx context.Context, req *connect.Request[clusterv1.PublishRequest]) (*connect.Response[clusterv1.PublishResponse], error) {
func (t *Transport) RoutePublish(ctx context.Context, req *PublishReq) (*PublishResp, error) {
if t.handler == nil {
return connect.NewResponse(&clusterv1.PublishResponse{
Success: false,
@@ -277,7 +278,7 @@ func (t *Transport) RoutePublish(ctx context.Context, req *connect.Request[clust
}
// TakeoverSession implements BrokerServiceHandler.TakeoverSession.
func (t *Transport) TakeoverSession(ctx context.Context, req *connect.Request[clusterv1.TakeoverRequest]) (*connect.Response[clusterv1.TakeoverResponse], error) {
func (t *Transport) TakeoverSession(ctx context.Context, req *TakeoverReq) (*TakeoverResp, error) {
if t.handler == nil {
return connect.NewResponse(&clusterv1.TakeoverResponse{
Success: false,
@@ -300,7 +301,7 @@ func (t *Transport) TakeoverSession(ctx context.Context, req *connect.Request[cl
}
// FetchRetained implements BrokerServiceHandler.FetchRetained.
func (t *Transport) FetchRetained(ctx context.Context, req *connect.Request[clusterv1.FetchRetainedRequest]) (*connect.Response[clusterv1.FetchRetainedResponse], error) {
func (t *Transport) FetchRetained(ctx context.Context, req *FetchRetainedReq) (*FetchRetainedResp, error) {
if t.handler == nil {
return connect.NewResponse(&clusterv1.FetchRetainedResponse{
Found: false,
@@ -338,7 +339,7 @@ func (t *Transport) FetchRetained(ctx context.Context, req *connect.Request[clus
}
// FetchWill implements BrokerServiceHandler.FetchWill.
func (t *Transport) FetchWill(ctx context.Context, req *connect.Request[clusterv1.FetchWillRequest]) (*connect.Response[clusterv1.FetchWillResponse], error) {
func (t *Transport) FetchWill(ctx context.Context, req *FetchWillReq) (*FetchWillResp, error) {
if t.handler == nil {
return connect.NewResponse(&clusterv1.FetchWillResponse{
Found: false,
@@ -375,7 +376,7 @@ func (t *Transport) FetchWill(ctx context.Context, req *connect.Request[clusterv
}
// EnqueueRemote implements BrokerServiceHandler.EnqueueRemote.
func (t *Transport) EnqueueRemote(ctx context.Context, req *connect.Request[clusterv1.EnqueueRemoteRequest]) (*connect.Response[clusterv1.EnqueueRemoteResponse], error) {
func (t *Transport) EnqueueRemote(ctx context.Context, req *EnqueueRemoteReq) (*EnqueueRemoteResp, error) {
t.mu.RLock()
handler := t.queueHandler
t.mu.RUnlock()
@@ -387,17 +388,23 @@ func (t *Transport) EnqueueRemote(ctx context.Context, req *connect.Request[clus
}), nil
}
forwardedPublish := req.Msg.ForwardedPublish
forwardToLeader := req.Msg.ForwardToLeader
// Check if this is a forwarded publish (topic-based) vs direct enqueue (queue-based)
if req.Msg.Properties != nil && req.Msg.Properties["_forward_publish"] == "true" {
// This is a forwarded publish - call PublishLocal with the topic
if forwardedPublish {
// This is a forwarded publish - handle with mode
topic := req.Msg.QueueName // topic is passed in queueName field for forwards
props := make(map[string]string)
for k, v := range req.Msg.Properties {
if k != "_forward_publish" {
props[k] = v
}
mode := queueTypes.PublishForwarded
if forwardToLeader {
mode = queueTypes.PublishNormal
}
err := handler.PublishLocal(ctx, topic, req.Msg.Payload, props)
err := handler.HandleQueuePublish(ctx, queueTypes.PublishRequest{
Topic: topic,
Payload: req.Msg.Payload,
Properties: req.Msg.Properties,
}, mode)
if err != nil {
return connect.NewResponse(&clusterv1.EnqueueRemoteResponse{
Success: false,
@@ -425,7 +432,7 @@ func (t *Transport) EnqueueRemote(ctx context.Context, req *connect.Request[clus
}
// RouteQueueMessage implements BrokerServiceHandler.RouteQueueMessage.
func (t *Transport) RouteQueueMessage(ctx context.Context, req *connect.Request[clusterv1.RouteQueueMessageRequest]) (*connect.Response[clusterv1.RouteQueueMessageResponse], error) {
func (t *Transport) RouteQueueMessage(ctx context.Context, req *RouteQueueMessageReq) (*RouteQueueMessageResp, error) {
t.mu.RLock()
handler := t.queueHandler
t.mu.RUnlock()
@@ -459,7 +466,7 @@ func (t *Transport) RouteQueueMessage(ctx context.Context, req *connect.Request[
}
// AppendEntries implements BrokerServiceHandler.AppendEntries (Raft).
func (t *Transport) AppendEntries(ctx context.Context, req *connect.Request[clusterv1.AppendEntriesRequest]) (*connect.Response[clusterv1.AppendEntriesResponse], error) {
func (t *Transport) AppendEntries(ctx context.Context, req *AppendEntriesReq) (*AppendEntriesResp, error) {
// TODO: Implement Raft consensus
return connect.NewResponse(&clusterv1.AppendEntriesResponse{
Term: req.Msg.Term,
@@ -468,7 +475,7 @@ func (t *Transport) AppendEntries(ctx context.Context, req *connect.Request[clus
}
// RequestVote implements BrokerServiceHandler.RequestVote (Raft).
func (t *Transport) RequestVote(ctx context.Context, req *connect.Request[clusterv1.RequestVoteRequest]) (*connect.Response[clusterv1.RequestVoteResponse], error) {
func (t *Transport) RequestVote(ctx context.Context, req *RequestVoteReq) (*RequestVoteResp, error) {
// TODO: Implement Raft consensus
return connect.NewResponse(&clusterv1.RequestVoteResponse{
Term: req.Msg.Term,
@@ -477,7 +484,7 @@ func (t *Transport) RequestVote(ctx context.Context, req *connect.Request[cluste
}
// InstallSnapshot implements BrokerServiceHandler.InstallSnapshot (Raft).
func (t *Transport) InstallSnapshot(ctx context.Context, req *connect.Request[clusterv1.InstallSnapshotRequest]) (*connect.Response[clusterv1.InstallSnapshotResponse], error) {
func (t *Transport) InstallSnapshot(ctx context.Context, req *InstallSnapshotReq) (*InstallSnapshotResp, error) {
// TODO: Implement Raft consensus
return connect.NewResponse(&clusterv1.InstallSnapshotResponse{
Term: req.Msg.Term,
@@ -619,7 +626,7 @@ func (t *Transport) SendFetchWill(ctx context.Context, nodeID, clientID string)
}
// SendEnqueueRemote sends an enqueue request to a peer node with retry and circuit breaker.
func (t *Transport) SendEnqueueRemote(ctx context.Context, nodeID, queueName string, payload []byte, properties map[string]string) (string, error) {
func (t *Transport) SendEnqueueRemote(ctx context.Context, nodeID, queueName string, payload []byte, properties map[string]string, forwarded, forwardToLeader bool) (string, error) {
var messageID string
err := retryWithBreaker(ctx, t.breakers, nodeID, func() error {
client, err := t.GetPeerClient(nodeID)
@@ -628,9 +635,11 @@ func (t *Transport) SendEnqueueRemote(ctx context.Context, nodeID, queueName str
}
req := connect.NewRequest(&clusterv1.EnqueueRemoteRequest{
QueueName: queueName,
Payload: payload,
Properties: properties,
QueueName: queueName,
Payload: payload,
Properties: properties,
ForwardedPublish: forwarded,
ForwardToLeader: forwardToLeader,
})
resp, err := client.EnqueueRemote(ctx, req)
+30
View File
@@ -0,0 +1,30 @@
// Copyright (c) Abstract Machines
// SPDX-License-Identifier: Apache-2.0
package cluster
import (
"connectrpc.com/connect"
clusterv1 "github.com/absmach/fluxmq/pkg/proto/cluster/v1"
)
// Short aliases for the connect.Request / connect.Response wrappers around
// the clusterv1 messages. The Transport RPC handler methods (RoutePublish,
// TakeoverSession, FetchRetained, FetchWill, EnqueueRemote,
// RouteQueueMessage, AppendEntries, RequestVote, InstallSnapshot) use them
// in their signatures in place of the fully spelled-out generic types.
// These are alias declarations (=), so each name is interchangeable with
// the generic type it stands for.
type (
PublishReq = connect.Request[clusterv1.PublishRequest]
PublishResp = connect.Response[clusterv1.PublishResponse]
TakeoverReq = connect.Request[clusterv1.TakeoverRequest]
TakeoverResp = connect.Response[clusterv1.TakeoverResponse]
FetchRetainedReq = connect.Request[clusterv1.FetchRetainedRequest]
FetchRetainedResp = connect.Response[clusterv1.FetchRetainedResponse]
FetchWillReq = connect.Request[clusterv1.FetchWillRequest]
FetchWillResp = connect.Response[clusterv1.FetchWillResponse]
EnqueueRemoteReq = connect.Request[clusterv1.EnqueueRemoteRequest]
EnqueueRemoteResp = connect.Response[clusterv1.EnqueueRemoteResponse]
RouteQueueMessageReq = connect.Request[clusterv1.RouteQueueMessageRequest]
RouteQueueMessageResp = connect.Response[clusterv1.RouteQueueMessageResponse]
AppendEntriesReq = connect.Request[clusterv1.AppendEntriesRequest]
AppendEntriesResp = connect.Response[clusterv1.AppendEntriesResponse]
RequestVoteReq = connect.Request[clusterv1.RequestVoteRequest]
RequestVoteResp = connect.Response[clusterv1.RequestVoteResponse]
InstallSnapshotReq = connect.Request[clusterv1.InstallSnapshotRequest]
InstallSnapshotResp = connect.Response[clusterv1.InstallSnapshotResponse]
)
+2
View File
@@ -335,6 +335,8 @@ func main() {
// Convert queue configs from main config to queue types
queueCfg := queue.DefaultConfig()
queueCfg.AutoCommitInterval = cfg.QueueManager.AutoCommitInterval
queueCfg.WritePolicy = queue.WritePolicy(cfg.Cluster.Raft.WritePolicy)
queueCfg.DistributionMode = queue.DistributionMode(cfg.Cluster.Raft.DistributionMode)
for _, qc := range cfg.Queues {
queueCfg.QueueConfigs = append(queueCfg.QueueConfigs, queueTypes.FromInput(queueTypes.QueueConfigInput{
Name: qc.Name,
+22 -3
View File
@@ -289,9 +289,11 @@ type RaftConfig struct {
SyncMode bool `yaml:"sync_mode"` // true=wait for quorum, false=async
MinInSyncReplicas int `yaml:"min_in_sync_replicas"`
AckTimeout time.Duration `yaml:"ack_timeout"`
BindAddr string `yaml:"bind_addr"` // Base address for Raft (e.g., "127.0.0.1:7100")
DataDir string `yaml:"data_dir"` // Directory for Raft data
Peers map[string]string `yaml:"peers"` // Map of nodeID -> raft base address
WritePolicy string `yaml:"write_policy"` // local, reject, forward
DistributionMode string `yaml:"distribution_mode"` // forward, replicate
BindAddr string `yaml:"bind_addr"` // Base address for Raft (e.g., "127.0.0.1:7100")
DataDir string `yaml:"data_dir"` // Directory for Raft data
Peers map[string]string `yaml:"peers"` // Map of nodeID -> raft base address
// Raft tuning
HeartbeatTimeout time.Duration `yaml:"heartbeat_timeout"`
@@ -494,6 +496,8 @@ func Default() *Config {
SyncMode: true,
MinInSyncReplicas: 2,
AckTimeout: 5 * time.Second,
WritePolicy: "forward",
DistributionMode: "replicate",
BindAddr: "127.0.0.1:7100",
DataDir: "/tmp/fluxmq/raft",
Peers: map[string]string{},
@@ -862,6 +866,21 @@ func (c *Config) Validate() error {
return fmt.Errorf("cluster.transport.tls_ca_file required when transport TLS is enabled")
}
}
if c.Cluster.Raft.WritePolicy != "" {
switch strings.ToLower(c.Cluster.Raft.WritePolicy) {
case "local", "reject", "forward":
default:
return fmt.Errorf("cluster.raft.write_policy must be one of: local, reject, forward")
}
}
if c.Cluster.Raft.DistributionMode != "" {
switch strings.ToLower(c.Cluster.Raft.DistributionMode) {
case "forward", "replicate":
default:
return fmt.Errorf("cluster.raft.distribution_mode must be one of: forward, replicate")
}
}
}
// Webhook validation (only if enabled)
+10
View File
@@ -699,6 +699,8 @@ cluster:
sync_mode: true
min_in_sync_replicas: 2
ack_timeout: "5s"
write_policy: "forward"
distribution_mode: "replicate"
bind_addr: "127.0.0.1:7100"
data_dir: "/tmp/fluxmq/raft"
peers: {}
@@ -903,6 +905,8 @@ raft:
sync_mode: true
min_in_sync_replicas: 2
ack_timeout: "5s"
write_policy: "forward" # local | reject | forward
distribution_mode: "replicate" # forward | replicate
bind_addr: "127.0.0.1:7100"
data_dir: "/tmp/fluxmq/raft"
peers:
@@ -914,6 +918,12 @@ raft:
snapshot_threshold: 8192
```
Notes:
- `write_policy` controls how non-leader nodes handle queue writes when Raft is enabled.
`local` appends on the receiving node (the previous behavior), `reject` returns an error (reporting the leader address when one is known), `forward` proxies the publish to the leader.
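- `distribution_mode` controls how publishes reach consumers across nodes.
`forward` uses cluster forwarding, `replicate` relies on Raft log replication. `replicate` requires Raft to be enabled; if it is not, the queue manager logs a warning at startup and falls back to `forward`.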
## Rate Limiting Configuration
Rate limiting can be enabled per IP and per client.
+2 -1
View File
@@ -365,8 +365,9 @@ Implemented:
Raft notes:
- Only leader appends go through Raft.
- Non-leader nodes currently append locally.
- Non-leader write behavior is configurable via `cluster.raft.write_policy` (`local`, `reject`, `forward`).
- Ack/Nack/Reject and consumer state are not replicated via Raft yet.
- Cross-node distribution is configurable via `cluster.raft.distribution_mode` (`forward`, `replicate`).
---
+8 -1
View File
@@ -867,10 +867,15 @@ func (b *Broker) setupSignalHandler() {
### Current Implementation Notes
- Single Raft group shared by all queues (`queue/raft/manager.go`)
- Append operations go through Raft only on the leader; non-leader appends are local
- Append operations go through Raft only on the leader; non-leader behavior is configurable (`write_policy`)
- Ack/Nack/Reject, cursor/PEL updates, and retention truncation are not replicated
- Benchmarks and failover tests for the Raft layer are not present in-tree
### Planned (Phase 2.x)
- Replicate consumer state (cursor/PEL/ACK/NACK/REJECT) through Raft
- Replicate retention truncation through Raft
### Configuration Example
```yaml
@@ -883,6 +888,8 @@ cluster:
sync_mode: true
min_in_sync_replicas: 2
ack_timeout: 5s
write_policy: "forward"
distribution_mode: "replicate"
bind_addr: "127.0.0.1:7100"
data_dir: "/tmp/fluxmq/raft"
peers:
+2
View File
@@ -92,6 +92,8 @@ cluster:
sync_mode: true
min_in_sync_replicas: 2
ack_timeout: "5s"
write_policy: "forward"
distribution_mode: "replicate"
bind_addr: "127.0.0.1:7100"
data_dir: "/tmp/fluxmq/node1/raft"
peers:
+2
View File
@@ -92,6 +92,8 @@ cluster:
sync_mode: true
min_in_sync_replicas: 2
ack_timeout: "5s"
write_policy: "forward"
distribution_mode: "replicate"
bind_addr: "127.0.0.1:7200"
data_dir: "/tmp/fluxmq/node2/raft"
peers:
+2
View File
@@ -92,6 +92,8 @@ cluster:
sync_mode: true
min_in_sync_replicas: 2
ack_timeout: "5s"
write_policy: "forward"
distribution_mode: "replicate"
bind_addr: "127.0.0.1:7300"
data_dir: "/tmp/fluxmq/node3/raft"
peers:
+3 -3
View File
@@ -11,11 +11,11 @@ import (
"github.com/absmach/fluxmq/broker"
"github.com/absmach/fluxmq/broker/router"
qtypes "github.com/absmach/fluxmq/queue/types"
"github.com/absmach/fluxmq/cluster"
"github.com/absmach/fluxmq/config"
"github.com/absmach/fluxmq/server/otel"
"github.com/absmach/fluxmq/mqtt/session"
qtypes "github.com/absmach/fluxmq/queue/types"
"github.com/absmach/fluxmq/server/otel"
"github.com/absmach/fluxmq/storage"
"github.com/absmach/fluxmq/storage/memory"
"go.opentelemetry.io/otel/trace"
@@ -36,7 +36,7 @@ type Notifier interface {
type QueueManager interface {
Start(ctx context.Context) error
Stop() error
Publish(ctx context.Context, topic string, payload []byte, properties map[string]string) error
Publish(ctx context.Context, publish qtypes.PublishRequest) error
Subscribe(ctx context.Context, queueName, pattern, clientID, groupID, proxyNodeID string) error
SubscribeWithCursor(ctx context.Context, queueName, pattern, clientID, groupID, proxyNodeID string, cursor *qtypes.CursorOption) error
Unsubscribe(ctx context.Context, queueName, pattern, clientID, groupID string) error
+6 -1
View File
@@ -11,6 +11,7 @@ import (
"time"
"github.com/absmach/fluxmq/broker/events"
"github.com/absmach/fluxmq/queue/types"
"github.com/absmach/fluxmq/storage"
)
@@ -36,7 +37,11 @@ func (b *Broker) Publish(msg *storage.Message) error {
if isQueueTopic(msg.Topic) {
// Route to queue manager - use existing properties or nil (avoid allocation)
return b.queueManager.Publish(context.Background(), msg.Topic, msg.GetPayload(), msg.Properties)
return b.queueManager.Publish(context.Background(), types.PublishRequest{
Topic: msg.Topic,
Payload: msg.GetPayload(),
Properties: msg.Properties,
})
}
}
+26 -8
View File
@@ -961,12 +961,14 @@ func (x *FetchWillResponse) GetError() string {
}
type EnqueueRemoteRequest struct {
state protoimpl.MessageState `protogen:"open.v1"`
QueueName string `protobuf:"bytes,1,opt,name=queue_name,json=queueName,proto3" json:"queue_name,omitempty"`
Payload []byte `protobuf:"bytes,2,opt,name=payload,proto3" json:"payload,omitempty"`
Properties map[string]string `protobuf:"bytes,3,rep,name=properties,proto3" json:"properties,omitempty" protobuf_key:"bytes,1,opt,name=key" protobuf_val:"bytes,2,opt,name=value"`
unknownFields protoimpl.UnknownFields
sizeCache protoimpl.SizeCache
state protoimpl.MessageState `protogen:"open.v1"`
QueueName string `protobuf:"bytes,1,opt,name=queue_name,json=queueName,proto3" json:"queue_name,omitempty"`
Payload []byte `protobuf:"bytes,2,opt,name=payload,proto3" json:"payload,omitempty"`
Properties map[string]string `protobuf:"bytes,3,rep,name=properties,proto3" json:"properties,omitempty" protobuf_key:"bytes,1,opt,name=key" protobuf_val:"bytes,2,opt,name=value"`
ForwardedPublish bool `protobuf:"varint,4,opt,name=forwarded_publish,json=forwardedPublish,proto3" json:"forwarded_publish,omitempty"`
ForwardToLeader bool `protobuf:"varint,5,opt,name=forward_to_leader,json=forwardToLeader,proto3" json:"forward_to_leader,omitempty"`
unknownFields protoimpl.UnknownFields
sizeCache protoimpl.SizeCache
}
func (x *EnqueueRemoteRequest) Reset() {
@@ -1020,6 +1022,20 @@ func (x *EnqueueRemoteRequest) GetProperties() map[string]string {
return nil
}
func (x *EnqueueRemoteRequest) GetForwardedPublish() bool {
if x != nil {
return x.ForwardedPublish
}
return false
}
func (x *EnqueueRemoteRequest) GetForwardToLeader() bool {
if x != nil {
return x.ForwardToLeader
}
return false
}
type EnqueueRemoteResponse struct {
state protoimpl.MessageState `protogen:"open.v1"`
Success bool `protobuf:"varint,1,opt,name=success,proto3" json:"success,omitempty"`
@@ -1715,14 +1731,16 @@ const file_cluster_v1_broker_proto_rawDesc = "" +
"\x11FetchWillResponse\x12\x14\n" +
"\x05found\x18\x01 \x01(\bR\x05found\x128\n" +
"\amessage\x18\x02 \x01(\v2\x1e.fluxmq.cluster.v1.WillMessageR\amessage\x12\x14\n" +
"\x05error\x18\x03 \x01(\tR\x05error\"\xe7\x01\n" +
"\x05error\x18\x03 \x01(\tR\x05error\"\xc0\x02\n" +
"\x14EnqueueRemoteRequest\x12\x1d\n" +
"\n" +
"queue_name\x18\x01 \x01(\tR\tqueueName\x12\x18\n" +
"\apayload\x18\x02 \x01(\fR\apayload\x12W\n" +
"\n" +
"properties\x18\x03 \x03(\v27.fluxmq.cluster.v1.EnqueueRemoteRequest.PropertiesEntryR\n" +
"properties\x1a=\n" +
"properties\x12+\n" +
"\x11forwarded_publish\x18\x04 \x01(\bR\x10forwardedPublish\x12*\n" +
"\x11forward_to_leader\x18\x05 \x01(\bR\x0fforwardToLeader\x1a=\n" +
"\x0fPropertiesEntry\x12\x10\n" +
"\x03key\x18\x01 \x01(\tR\x03key\x12\x14\n" +
"\x05value\x18\x02 \x01(\tR\x05value:\x028\x01\"f\n" +
+2
View File
@@ -158,6 +158,8 @@ message EnqueueRemoteRequest {
string queue_name = 1;
bytes payload = 2;
map<string, string> properties = 3;
bool forwarded_publish = 4;
bool forward_to_leader = 5;
}
message EnqueueRemoteResponse {
+138 -41
View File
@@ -27,12 +27,14 @@ type DeliverFn func(ctx context.Context, clientID string, msg any) error
// Manager is the queue-based queue manager.
// It uses append-only logs with cursor-based consumer groups, NATS JetQueue-style.
type Manager struct {
queueStore storage.QueueStore
groupStore storage.ConsumerGroupStore
consumerManager *consumer.Manager
deliverFn DeliverFn
logger *slog.Logger
config Config
queueStore storage.QueueStore
groupStore storage.ConsumerGroupStore
consumerManager *consumer.Manager
deliverFn DeliverFn
logger *slog.Logger
config Config
writePolicy WritePolicy
distributionMode DistributionMode
// Raft replication manager
raftManager *raft.Manager
@@ -80,6 +82,10 @@ type Config struct {
// Retention configuration
RetentionCheckInterval time.Duration
// Replication/distribution configuration
WritePolicy WritePolicy
DistributionMode DistributionMode
// Queue configurations from main config
QueueConfigs []types.QueueConfig
}
@@ -99,6 +105,8 @@ func DefaultConfig() Config {
StealInterval: 5 * time.Second,
StealEnabled: true,
RetentionCheckInterval: 5 * time.Minute,
WritePolicy: WritePolicyLocal,
DistributionMode: DistributionForward,
}
}
@@ -127,21 +135,28 @@ func NewManager(queueStore storage.QueueStore, groupStore storage.ConsumerGroupS
}
return &Manager{
queueStore: queueStore,
groupStore: groupStore,
consumerManager: consumerMgr,
deliverFn: deliverFn,
logger: logger,
config: config,
cluster: cl,
localNodeID: localNodeID,
stopCh: make(chan struct{}),
metrics: metrics,
queueStore: queueStore,
groupStore: groupStore,
consumerManager: consumerMgr,
deliverFn: deliverFn,
logger: logger,
config: config,
writePolicy: normalizeWritePolicy(config.WritePolicy),
distributionMode: normalizeDistributionMode(config.DistributionMode),
cluster: cl,
localNodeID: localNodeID,
stopCh: make(chan struct{}),
metrics: metrics,
}
}
// Start starts background workers.
func (m *Manager) Start(ctx context.Context) error {
if m.distributionMode == DistributionReplicate && (m.raftManager == nil || !m.raftManager.IsEnabled()) {
m.logger.Warn("distribution_mode=replicate requires raft to be enabled; falling back to forward")
m.distributionMode = DistributionForward
}
// Ensure reserved queues exist
if err := m.ensureReservedQueues(ctx); err != nil {
return fmt.Errorf("failed to create reserved queues: %w", err)
@@ -292,45 +307,81 @@ func (m *Manager) ListQueues(ctx context.Context) ([]types.QueueConfig, error) {
// Publish adds a message to all queues whose topic patterns match the topic.
// This is the NATS JetQueue-style "multi-queue" routing.
// It also forwards the publish to remote nodes that have consumers for the topic.
func (m *Manager) Publish(ctx context.Context, topic string, payload []byte, properties map[string]string) error {
func (m *Manager) Publish(ctx context.Context, publish types.PublishRequest) error {
if m.raftManager != nil && m.raftManager.IsEnabled() && !m.raftManager.IsLeader() {
switch m.writePolicy {
case WritePolicyReject:
leaderAddr := m.raftManager.Leader()
if leaderAddr == "" {
return fmt.Errorf("raft leader unavailable")
}
return fmt.Errorf("raft leader is at %s", leaderAddr)
case WritePolicyForward:
return m.forwardPublishToLeader(ctx, publish)
case WritePolicyLocal:
// fall through to local append
default:
// Unknown policy - default to local append for backward compatibility.
}
}
// Store locally in matching queues
if err := m.PublishLocal(ctx, topic, payload, properties); err != nil {
if err := m.publishLocal(ctx, publish); err != nil {
return err
}
// Forward to remote nodes that have consumers
if m.cluster != nil {
m.forwardToRemoteNodes(ctx, topic, payload, properties)
switch m.distributionMode {
case DistributionForward:
m.forwardToRemoteNodes(ctx, publish, false)
case DistributionReplicate:
// In replicate mode, only forward to nodes whose queues do not exist locally.
// This avoids duplicates while still supporting locally-created queues on remote nodes.
m.forwardToRemoteNodes(ctx, publish, true)
}
}
return nil
}
// PublishLocal stores a message in local matching queues without forwarding.
// This is called directly when receiving forwarded publishes from other nodes.
func (m *Manager) PublishLocal(ctx context.Context, topic string, payload []byte, properties map[string]string) error {
// HandleQueuePublish is the Manager's implementation of
// cluster.QueueHandler.HandleQueuePublish. It routes a publish by mode:
// types.PublishLocal and types.PublishForwarded are handed to publishLocal,
// while types.PublishNormal and any unrecognized mode go through Publish.
func (m *Manager) HandleQueuePublish(ctx context.Context, publish types.PublishRequest, mode types.PublishMode) error {
	localOnly := mode == types.PublishLocal || mode == types.PublishForwarded
	if localOnly {
		return m.publishLocal(ctx, publish)
	}

	// PublishNormal and every other value take the regular publish path.
	return m.Publish(ctx, publish)
}
func (m *Manager) publishLocal(ctx context.Context, publish types.PublishRequest) error {
// Find all matching queues
queues, err := m.queueStore.FindMatchingQueues(ctx, topic)
queues, err := m.queueStore.FindMatchingQueues(ctx, publish.Topic)
if err != nil {
return fmt.Errorf("failed to find matching queues: %w", err)
}
if len(queues) == 0 {
m.logger.Debug("no queues match topic, creating new queue", slog.String("topic", topic))
m.logger.Debug("no queues match topic, creating new queue", slog.String("topic", publish.Topic))
// Use the topic as the queue name and the topic itself as the matching pattern.
if _, err := m.GetOrCreateQueue(ctx, topic, topic); err != nil {
m.logger.Error("failed to create ephemeral queue", slog.String("topic", topic), slog.String("error", err.Error()))
if _, err := m.GetOrCreateQueue(ctx, publish.Topic, publish.Topic); err != nil {
m.logger.Error("failed to create ephemeral queue", slog.String("topic", publish.Topic), slog.String("error", err.Error()))
return err
}
// After creating, find it again.
queues, err = m.queueStore.FindMatchingQueues(ctx, topic)
queues, err = m.queueStore.FindMatchingQueues(ctx, publish.Topic)
if err != nil {
return fmt.Errorf("failed to find matching queues after creation: %w", err)
}
}
if len(queues) == 0 {
m.logger.Debug("no queues match topic", slog.String("topic", topic))
m.logger.Debug("no queues match topic", slog.String("topic", publish.Topic))
return nil
}
@@ -345,9 +396,9 @@ func (m *Manager) PublishLocal(ctx context.Context, topic string, payload []byte
// Create message for this queue
msg := &types.Message{
ID: generateMessageID(),
Payload: payload,
Topic: topic,
Properties: properties,
Payload: publish.Payload,
Topic: publish.Topic,
Properties: publish.Properties,
State: types.StateQueued,
CreatedAt: time.Now(),
ExpiresAt: time.Now().Add(queueConfig.MessageTTL),
@@ -363,14 +414,14 @@ func (m *Manager) PublishLocal(ctx context.Context, topic string, payload []byte
if err != nil {
m.logger.Warn("failed to append to queue",
slog.String("queue", queueName),
slog.String("topic", topic),
slog.String("topic", publish.Topic),
slog.String("error", err.Error()))
continue
}
m.logger.Debug("message published",
slog.String("queue", queueName),
slog.String("topic", topic),
slog.String("topic", publish.Topic),
slog.Uint64("offset", offset))
}
@@ -378,7 +429,7 @@ func (m *Manager) PublishLocal(ctx context.Context, topic string, payload []byte
}
// forwardToRemoteNodes forwards a publish to nodes that have consumers for the topic.
func (m *Manager) forwardToRemoteNodes(ctx context.Context, topic string, payload []byte, properties map[string]string) {
func (m *Manager) forwardToRemoteNodes(ctx context.Context, publish types.PublishRequest, unknownOnly bool) {
// Get all consumers from the cluster
consumers, err := m.cluster.ListAllQueueConsumers(ctx)
if err != nil {
@@ -387,6 +438,27 @@ func (m *Manager) forwardToRemoteNodes(ctx context.Context, topic string, payloa
return
}
// queueExistsCache memoizes the result of the local queue-store lookup per queue name,
// so each distinct name is looked up at most once through queueExists.
queueExistsCache := make(map[string]bool)
// queueExists reports whether m.queueStore.GetQueue succeeds for queueName.
queueExists := func(queueName string) bool {
if exists, ok := queueExistsCache[queueName]; ok {
return exists
}
_, err := m.queueStore.GetQueue(ctx, queueName)
if err == nil {
queueExistsCache[queueName] = true
return true
}
// Only errors other than "not found" are worth a warning.
// NOTE(review): this is a direct comparison, so a wrapped ErrQueueNotFound would be
// logged as an unexpected failure; consider errors.Is — confirm what GetQueue returns.
if err != storage.ErrQueueNotFound {
m.logger.Warn("failed to check queue existence for forwarding",
slog.String("queue", queueName),
slog.String("error", err.Error()))
}
// Any lookup error, not-found or otherwise, is cached and reported as "does not exist".
queueExistsCache[queueName] = false
return false
}
// Find unique remote nodes that have consumers for queues matching this topic
remoteNodes := make(map[string]bool)
for _, c := range consumers {
@@ -395,24 +467,28 @@ func (m *Manager) forwardToRemoteNodes(ctx context.Context, topic string, payloa
continue
}
if unknownOnly && queueExists(c.QueueName) {
continue
}
// Check if this consumer's queue pattern matches the topic
queuePattern := "$queue/" + c.QueueName + "/#"
if matchesTopic(queuePattern, topic) {
if matchesTopic(queuePattern, publish.Topic) {
remoteNodes[c.ProxyNodeID] = true
}
}
// Forward to each unique remote node
for nodeID := range remoteNodes {
if err := m.cluster.ForwardQueuePublish(ctx, nodeID, topic, payload, properties); err != nil {
if err := m.cluster.ForwardQueuePublish(ctx, nodeID, publish.Topic, publish.Payload, publish.Properties, false); err != nil {
m.logger.Warn("failed to forward publish to remote node",
slog.String("node", nodeID),
slog.String("topic", topic),
slog.String("topic", publish.Topic),
slog.String("error", err.Error()))
} else {
m.logger.Debug("forwarded publish to remote node",
slog.String("node", nodeID),
slog.String("topic", topic))
slog.String("topic", publish.Topic))
}
}
}
@@ -424,7 +500,11 @@ func matchesTopic(filter, topic string) bool {
// Enqueue is an alias for Publish for backward compatibility.
func (m *Manager) Enqueue(ctx context.Context, topic string, payload []byte, properties map[string]string) error {
return m.Publish(ctx, topic, payload, properties)
return m.Publish(ctx, types.PublishRequest{
Topic: topic,
Payload: payload,
Properties: properties,
})
}
// --- Subscribe Operations ---
@@ -905,12 +985,25 @@ func (m *Manager) deliverMessages() {
}
// Also deliver to remote consumers registered in cluster
if m.cluster != nil {
if m.cluster != nil && m.distributionMode == DistributionForward {
m.deliverToRemoteConsumers(ctx, &queueConfig)
}
}
}
// forwardPublishToLeader hands the publish to the node that the Raft manager
// reports as leader, flagging the call as a forward-to-leader request.
//
// It returns an error when no cluster is configured or when the Raft manager
// reports an empty leader ID; otherwise it returns whatever
// ForwardQueuePublish returns.
func (m *Manager) forwardPublishToLeader(ctx context.Context, publish types.PublishRequest) error {
	if m.cluster == nil {
		return fmt.Errorf("cluster not configured for leader forward")
	}

	if leaderID := m.raftManager.LeaderID(); leaderID != "" {
		return m.cluster.ForwardQueuePublish(ctx, leaderID, publish.Topic, publish.Payload, publish.Properties, true)
	}
	return fmt.Errorf("raft leader unavailable")
}
// deliverToRemoteConsumers delivers messages to consumers registered on remote nodes.
// This enables cross-node queue message routing.
func (m *Manager) deliverToRemoteConsumers(ctx context.Context, config *types.QueueConfig) {
@@ -1548,7 +1641,11 @@ func (m *Manager) CommitOffset(ctx context.Context, queueName, groupID string, o
// EnqueueLocal implements cluster.QueueHandler.EnqueueLocal.
func (m *Manager) EnqueueLocal(ctx context.Context, topic string, payload []byte, properties map[string]string) (string, error) {
err := m.Publish(ctx, topic, payload, properties)
err := m.Publish(ctx, types.PublishRequest{
Topic: topic,
Payload: payload,
Properties: properties,
})
if err != nil {
return "", err
}
+182 -4
View File
@@ -7,16 +7,20 @@ import (
"context"
"io"
"log/slog"
"reflect"
"sync"
"testing"
"time"
"unsafe"
"github.com/absmach/fluxmq/cluster"
clusterv1 "github.com/absmach/fluxmq/pkg/proto/cluster/v1"
queueraft "github.com/absmach/fluxmq/queue/raft"
"github.com/absmach/fluxmq/queue/storage"
memlog "github.com/absmach/fluxmq/queue/storage/memory/log"
"github.com/absmach/fluxmq/queue/types"
brokerstorage "github.com/absmach/fluxmq/storage"
hraft "github.com/hashicorp/raft"
)
// mockGroupStore implements storage.ConsumerGroupStore for testing.
@@ -358,7 +362,11 @@ func TestStreamGroupDeliversWithoutPEL(t *testing.T) {
t.Fatalf("SubscribeWithCursor failed: %v", err)
}
if err := mgr.Publish(context.Background(), "$queue/events/test", []byte("hello"), nil); err != nil {
if err := mgr.Publish(context.Background(), types.PublishRequest{
Topic: "$queue/events/test",
Payload: []byte("hello"),
Properties: nil,
}); err != nil {
t.Fatalf("Publish failed: %v", err)
}
@@ -427,7 +435,11 @@ func TestRetentionOffsetMessages(t *testing.T) {
}
for i := 0; i < 3; i++ {
if err := mgr.Publish(context.Background(), "$queue/events/test", []byte("msg"), nil); err != nil {
if err := mgr.Publish(context.Background(), types.PublishRequest{
Topic: "$queue/events/test",
Payload: []byte("msg"),
Properties: nil,
}); err != nil {
t.Fatalf("Publish failed: %v", err)
}
}
@@ -703,6 +715,9 @@ type mockCluster struct {
nodeID string
routedMessages []routedMessage
routedMessagesMu sync.Mutex
forwardCalls []forwardPublishCall
forwardCallsMu sync.Mutex
queueConsumers []*cluster.QueueConsumerInfo
}
type routedMessage struct {
@@ -714,10 +729,19 @@ type routedMessage struct {
sequence int64
}
// forwardPublishCall records the arguments of a single
// mockCluster.ForwardQueuePublish invocation so tests can inspect them.
type forwardPublishCall struct {
	nodeID, topic   string            // target node and publish topic
	payload         []byte            // message body as passed by the caller
	properties      map[string]string // message properties as passed by the caller
	forwardToLeader bool              // value of the forwardToLeader argument
}
// newMockCluster returns a mockCluster identified by nodeID with empty,
// non-nil slices for recorded routed messages and forward calls.
func newMockCluster(nodeID string) *mockCluster {
	mc := new(mockCluster)
	mc.nodeID = nodeID
	mc.routedMessages = make([]routedMessage, 0)
	mc.forwardCalls = make([]forwardPublishCall, 0)
	return mc
}
@@ -745,6 +769,18 @@ func (c *mockCluster) GetRoutedMessages() []routedMessage {
return result
}
// GetForwardCalls returns a snapshot of the ForwardQueuePublish calls recorded
// so far. The returned slice is a fresh copy taken under forwardCallsMu, so
// appending to or overwriting its elements does not affect the mock.
func (c *mockCluster) GetForwardCalls() []forwardPublishCall {
	c.forwardCallsMu.Lock()
	defer c.forwardCallsMu.Unlock()

	snapshot := make([]forwardPublishCall, len(c.forwardCalls))
	for i := range c.forwardCalls {
		snapshot[i] = c.forwardCalls[i]
	}
	return snapshot
}
// SetQueueConsumers replaces the consumer list held by the mock.
// The slice is stored as given (no copy is made) and the assignment is not
// guarded by a lock.
func (c *mockCluster) SetQueueConsumers(consumers []*cluster.QueueConsumerInfo) {
c.queueConsumers = consumers
}
func (c *mockCluster) Start() error { return nil }
func (c *mockCluster) Stop() error { return nil }
func (c *mockCluster) IsLeader() bool { return true }
@@ -804,13 +840,66 @@ func (c *mockCluster) ListQueueConsumersByGroup(ctx context.Context, queueName,
}
func (c *mockCluster) ListAllQueueConsumers(ctx context.Context) ([]*cluster.QueueConsumerInfo, error) {
return nil, nil
if c.queueConsumers == nil {
return nil, nil
}
consumers := make([]*cluster.QueueConsumerInfo, len(c.queueConsumers))
copy(consumers, c.queueConsumers)
return consumers, nil
}
func (c *mockCluster) ForwardQueuePublish(ctx context.Context, nodeID, topic string, payload []byte, properties map[string]string) error {
// ForwardQueuePublish records its arguments as a forwardPublishCall and always
// returns nil. ctx is ignored. payload and properties are stored as passed,
// without copying.
func (c *mockCluster) ForwardQueuePublish(ctx context.Context, nodeID, topic string, payload []byte, properties map[string]string, forwardToLeader bool) error {
	recorded := forwardPublishCall{
		nodeID:          nodeID,
		topic:           topic,
		payload:         payload,
		properties:      properties,
		forwardToLeader: forwardToLeader,
	}

	c.forwardCallsMu.Lock()
	c.forwardCalls = append(c.forwardCalls, recorded)
	c.forwardCallsMu.Unlock()
	return nil
}
// setUnexportedField writes value into the named field of the struct that
// target points to, even when that field is unexported.
//
// target must be a non-nil pointer, fieldName must name a field of the
// pointed-to struct, and value must be assignable to the field's type; any
// violation fails the test via t.Fatalf.
func setUnexportedField(t *testing.T, target any, fieldName string, value any) {
	t.Helper()

	ptr := reflect.ValueOf(target)
	if ptr.Kind() != reflect.Ptr || ptr.IsNil() {
		t.Fatalf("target must be a non-nil pointer")
	}

	dst := ptr.Elem().FieldByName(fieldName)
	if !dst.IsValid() {
		t.Fatalf("field %q not found on %T", fieldName, target)
	}

	src := reflect.ValueOf(value)
	switch {
	case !src.IsValid():
		t.Fatalf("value for %q is invalid", fieldName)
	case !src.Type().AssignableTo(dst.Type()):
		t.Fatalf("cannot assign %s to %s for %q", src.Type(), dst.Type(), fieldName)
	}

	// A Value obtained through an unexported field is read-only. Rebuilding it
	// from the field's address yields a settable Value for the same memory.
	settable := reflect.NewAt(dst.Type(), unsafe.Pointer(dst.UnsafeAddr())).Elem()
	settable.Set(src)
}
// newTestRaftManager assembles a queueraft.Manager from zero values for tests.
// Using setUnexportedField, it sets the manager's config to
// ManagerConfig{Enabled: true} and points its raft field at a zero-value
// hraft.Raft whose leaderID is leaderID.
func newTestRaftManager(t *testing.T, leaderID string) *queueraft.Manager {
	t.Helper()

	mgr, stubRaft := new(queueraft.Manager), new(hraft.Raft)

	setUnexportedField(t, mgr, "config", queueraft.ManagerConfig{Enabled: true})
	setUnexportedField(t, stubRaft, "leaderID", hraft.ServerID(leaderID))
	setUnexportedField(t, mgr, "raft", stubRaft)

	return mgr
}
func TestCrossNodeMessageRouting(t *testing.T) {
logStore := memlog.New()
groupStore := newMockGroupStore()
@@ -880,6 +969,95 @@ func TestCrossNodeMessageRouting(t *testing.T) {
}
}
func TestPublishForwardPolicySkipsRemoteForwarding(t *testing.T) {
logStore := memlog.New()
groupStore := newMockGroupStore()
logger := slog.Default()
mockCl := newMockCluster("node-1")
mockCl.SetQueueConsumers([]*cluster.QueueConsumerInfo{
{
QueueName: "test",
ProxyNodeID: "node-2",
},
})
config := DefaultConfig()
config.WritePolicy = WritePolicyForward
config.DistributionMode = DistributionForward
manager := NewManager(logStore, groupStore, func(ctx context.Context, clientID string, msg any) error {
return nil
}, config, logger, mockCl)
manager.SetRaftManager(newTestRaftManager(t, "node-2"))
ctx := context.Background()
err := manager.Publish(ctx, types.PublishRequest{
Topic: "$queue/test/msg",
Payload: []byte("hello"),
})
if err != nil {
t.Fatalf("Publish returned error: %v", err)
}
calls := mockCl.GetForwardCalls()
if len(calls) != 1 {
t.Fatalf("expected 1 forward call, got %d", len(calls))
}
if !calls[0].forwardToLeader {
t.Fatalf("expected forward-to-leader call, got forwardToLeader=%v", calls[0].forwardToLeader)
}
if calls[0].nodeID != "node-2" {
t.Fatalf("expected leader nodeID node-2, got %s", calls[0].nodeID)
}
}
func TestPublishReplicateModeForwardsUnknownQueues(t *testing.T) {
logStore := memlog.New()
groupStore := newMockGroupStore()
logger := slog.Default()
mockCl := newMockCluster("node-1")
mockCl.SetQueueConsumers([]*cluster.QueueConsumerInfo{
{
QueueName: "+",
ProxyNodeID: "node-2",
},
})
config := DefaultConfig()
config.DistributionMode = DistributionReplicate
manager := NewManager(logStore, groupStore, func(ctx context.Context, clientID string, msg any) error {
return nil
}, config, logger, mockCl)
ctx := context.Background()
err := manager.Publish(ctx, types.PublishRequest{
Topic: "$queue/test/tpc/msg",
Payload: []byte("hello"),
})
if err != nil {
t.Fatalf("Publish returned error: %v", err)
}
calls := mockCl.GetForwardCalls()
if len(calls) != 1 {
t.Fatalf("expected 1 forward call, got %d", len(calls))
}
if calls[0].forwardToLeader {
t.Fatalf("expected forward-to-remote call, got forwardToLeader=%v", calls[0].forwardToLeader)
}
if calls[0].nodeID != "node-2" {
t.Fatalf("expected remote nodeID node-2, got %s", calls[0].nodeID)
}
}
func TestDeliverQueueMessage(t *testing.T) {
logStore := memlog.New()
groupStore := newMockGroupStore()
+41
View File
@@ -0,0 +1,41 @@
// Copyright (c) Abstract Machines
// SPDX-License-Identifier: Apache-2.0
package queue
import "strings"
// WritePolicy controls how non-leader nodes handle queue writes when Raft is enabled.
// normalizeWritePolicy matches values case-insensitively and maps any
// unrecognized value to WritePolicyLocal.
type WritePolicy string
const (
WritePolicyLocal WritePolicy = "local" // Append locally (no Raft redirect)
WritePolicyReject WritePolicy = "reject" // Reject writes on non-leaders
WritePolicyForward WritePolicy = "forward" // Forward writes to the Raft leader
)
// DistributionMode controls how queue messages reach consumers across nodes.
// normalizeDistributionMode matches values case-insensitively and maps any
// unrecognized value to DistributionForward.
type DistributionMode string
const (
DistributionForward DistributionMode = "forward" // Forward publishes to nodes with consumers
DistributionReplicate DistributionMode = "replicate" // Rely on Raft to replicate queue logs
)
// normalizeWritePolicy returns the canonical lower-case form of policy when it
// names a known write policy (compared case-insensitively). Any other value,
// including the empty string, yields WritePolicyLocal.
func normalizeWritePolicy(policy WritePolicy) WritePolicy {
	lowered := WritePolicy(strings.ToLower(string(policy)))
	if lowered == WritePolicyLocal || lowered == WritePolicyReject || lowered == WritePolicyForward {
		return lowered
	}
	return WritePolicyLocal
}
// normalizeDistributionMode returns the canonical lower-case form of mode when
// it names a known distribution mode (compared case-insensitively). Any other
// value, including the empty string, yields DistributionForward.
func normalizeDistributionMode(mode DistributionMode) DistributionMode {
	lowered := DistributionMode(strings.ToLower(string(mode)))
	if lowered == DistributionForward || lowered == DistributionReplicate {
		return lowered
	}
	return DistributionForward
}
+20
View File
@@ -0,0 +1,20 @@
// Copyright (c) Abstract Machines
// SPDX-License-Identifier: Apache-2.0
package types
// PublishRequest encapsulates publish data for queue routing.
// It bundles the topic, payload and properties that the publish entry points
// previously took as separate arguments.
type PublishRequest struct {
// Topic is the publish topic; the queue manager looks up matching queues by it.
Topic string
// Payload is the message body.
Payload []byte
// Properties are optional message properties; callers may leave it nil.
Properties map[string]string
}
// PublishMode controls how the queue manager should handle a publish.
type PublishMode int
const (
// PublishNormal is the zero value. Manager.HandleQueuePublish routes it, and any
// unrecognized mode, through the full Publish path.
PublishNormal PublishMode = iota
// PublishLocal makes Manager.HandleQueuePublish store the message via publishLocal only.
PublishLocal
// PublishForwarded is currently handled by Manager.HandleQueuePublish exactly like
// PublishLocal: the message is stored via publishLocal only.
PublishForwarded
)
+1 -1
View File
@@ -85,7 +85,7 @@ func (m *mockCluster) ListAllQueueConsumers(ctx context.Context) ([]*cluster.Que
return nil, nil
}
func (m *mockCluster) ForwardQueuePublish(ctx context.Context, nodeID, topic string, payload []byte, properties map[string]string) error {
// ForwardQueuePublish is a no-op stub: it ignores every argument, including
// forwardToLeader, and always returns nil.
func (m *mockCluster) ForwardQueuePublish(ctx context.Context, nodeID, topic string, payload []byte, properties map[string]string, forwardToLeader bool) error {
return nil
}
+15 -3
View File
@@ -127,7 +127,11 @@ func (h *Handler) Append(ctx context.Context, req *connect.Request[queuev1.Appen
}
}
if err := h.manager.Publish(ctx, msg.QueueName, msg.Value, properties); err != nil {
if err := h.manager.Publish(ctx, types.PublishRequest{
Topic: msg.QueueName,
Payload: msg.Value,
Properties: properties,
}); err != nil {
return nil, connect.NewError(connect.CodeInternal, err)
}
@@ -153,7 +157,11 @@ func (h *Handler) AppendBatch(ctx context.Context, req *connect.Request[queuev1.
}
}
if err := h.manager.Publish(ctx, msg.QueueName, entry.Value, properties); err != nil {
if err := h.manager.Publish(ctx, types.PublishRequest{
Topic: msg.QueueName,
Payload: entry.Value,
Properties: properties,
}); err != nil {
return nil, connect.NewError(connect.CodeInternal, err)
}
@@ -189,7 +197,11 @@ func (h *Handler) AppendStream(ctx context.Context, stream *connect.ClientStream
}
}
if err := h.manager.Publish(ctx, msg.QueueName, msg.Value, properties); err != nil {
if err := h.manager.Publish(ctx, types.PublishRequest{
Topic: msg.QueueName,
Payload: msg.Value,
Properties: properties,
}); err != nil {
continue
}