Add gRPC Log API

Signed-off-by: dusan <borovcanindusan1@gmail.com>
This commit is contained in:
dusan
2026-01-25 20:05:42 +01:00
parent c04383a456
commit 362e3f0cdb
28 changed files with 9296 additions and 1429 deletions
+9 -5
View File
@@ -10,9 +10,6 @@ LDFLAGS := -s -w
GOFLAGS := -trimpath
PKG_PROTO_GEN_OUT_DIR=cluster/grpc
INTERNAL_PROTO_DIR=proto
INTERNAL_PROTO_FILES := $(shell find $(INTERNAL_PROTO_DIR) -name "*.proto" | sed 's|$(INTERNAL_PROTO_DIR)/||')
# Default target
.PHONY: all
@@ -159,8 +156,15 @@ clean-data:
.PHONY: proto
proto:
mkdir -p $(PKG_PROTO_GEN_OUT_DIR)
protoc -I $(INTERNAL_PROTO_DIR) --go_out=$(PKG_PROTO_GEN_OUT_DIR) --go_opt=paths=source_relative --go-grpc_out=$(PKG_PROTO_GEN_OUT_DIR) --go-grpc_opt=paths=source_relative $(INTERNAL_PROTO_FILES)
buf generate
# Lint the protobuf definitions with buf
.PHONY: proto-lint
proto-lint:
buf lint
# Check for breaking protobuf changes against the main branch of the local git repository
.PHONY: proto-breaking
proto-breaking:
buf breaking --against '.git#branch=main'
# Show help
.PHONY: help
+7
View File
@@ -152,6 +152,13 @@ func (b *Broker) SetQueueManager(qm QueueManager) error {
return nil
}
// GetQueueManager returns the queue manager currently attached to the broker.
// The field is read while holding the broker's read lock.
func (b *Broker) GetQueueManager() QueueManager {
	b.mu.RLock()
	qm := b.queueManager
	b.mu.RUnlock()
	return qm
}
// Get returns a session by client ID.
func (b *Broker) Get(clientID string) *session.Session {
return b.sessionsMap.Get(clientID)
+11 -11
View File
@@ -10,7 +10,7 @@ import (
"time"
"github.com/absmach/fluxmq/broker/events"
"github.com/absmach/fluxmq/cluster/grpc"
clusterv1 "github.com/absmach/fluxmq/pkg/proto/cluster/v1"
"github.com/absmach/fluxmq/session"
"github.com/absmach/fluxmq/storage"
"github.com/absmach/fluxmq/storage/messages"
@@ -25,7 +25,7 @@ func (b *Broker) CreateSession(clientID string, version byte, opts session.Optio
defer b.mu.Unlock()
// Check if session is owned by another node in the cluster
var takeoverState *grpc.SessionState
var takeoverState *clusterv1.SessionState
if b.cluster != nil {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
@@ -383,7 +383,7 @@ func (b *Broker) restoreSessionFromStorage(s *session.Session, clientID string,
}
// restoreInflightFromTakeover restores inflight messages from takeover state.
func (b *Broker) restoreInflightFromTakeover(state *grpc.SessionState, tracker messages.Inflight) error {
func (b *Broker) restoreInflightFromTakeover(state *clusterv1.SessionState, tracker messages.Inflight) error {
if state == nil || state.InflightMessages == nil {
return nil
}
@@ -406,7 +406,7 @@ func (b *Broker) restoreInflightFromTakeover(state *grpc.SessionState, tracker m
}
// restoreQueueFromTakeover restores offline queue from takeover state.
func (b *Broker) restoreQueueFromTakeover(state *grpc.SessionState, queue messages.Queue) error {
func (b *Broker) restoreQueueFromTakeover(state *clusterv1.SessionState, queue messages.Queue) error {
if state == nil || state.QueuedMessages == nil {
return nil
}
@@ -428,7 +428,7 @@ func (b *Broker) restoreQueueFromTakeover(state *grpc.SessionState, queue messag
}
// restoreSubscriptionsFromTakeover restores subscriptions from takeover state.
func (b *Broker) restoreSubscriptionsFromTakeover(s *session.Session, state *grpc.SessionState) error {
func (b *Broker) restoreSubscriptionsFromTakeover(s *session.Session, state *clusterv1.SessionState) error {
if state == nil || state.Subscriptions == nil {
return nil
}
@@ -466,7 +466,7 @@ func (b *Broker) restoreSubscriptionsFromTakeover(s *session.Session, state *grp
// GetSessionStateAndClose disconnects a session, retrieves its state, and returns it.
// This is used during session takeover.
func (b *Broker) GetSessionStateAndClose(ctx context.Context, clientID string) (*grpc.SessionState, error) {
func (b *Broker) GetSessionStateAndClose(ctx context.Context, clientID string) (*clusterv1.SessionState, error) {
b.mu.Lock()
defer b.mu.Unlock()
@@ -476,7 +476,7 @@ func (b *Broker) GetSessionStateAndClose(ctx context.Context, clientID string) (
}
// Capture state before destroying
state := &grpc.SessionState{
state := &clusterv1.SessionState{
ExpiryInterval: uint32(s.ExpiryInterval),
CleanStart: s.CleanStart,
}
@@ -486,7 +486,7 @@ func (b *Broker) GetSessionStateAndClose(ctx context.Context, clientID string) (
subs, err := b.subscriptions.GetForClient(s.ID)
if err == nil {
for _, sub := range subs {
state.Subscriptions = append(state.Subscriptions, &grpc.Subscription{
state.Subscriptions = append(state.Subscriptions, &clusterv1.Subscription{
Filter: sub.Filter,
Qos: uint32(sub.QoS),
})
@@ -496,7 +496,7 @@ func (b *Broker) GetSessionStateAndClose(ctx context.Context, clientID string) (
// Capture inflight messages
for _, msg := range s.Inflight().GetAll() {
state.InflightMessages = append(state.InflightMessages, &grpc.InflightMessage{
state.InflightMessages = append(state.InflightMessages, &clusterv1.InflightMessage{
PacketId: uint32(msg.PacketID),
Topic: msg.Message.Topic,
Payload: msg.Message.Payload,
@@ -508,7 +508,7 @@ func (b *Broker) GetSessionStateAndClose(ctx context.Context, clientID string) (
// Capture queued messages
for _, msg := range s.OfflineQueue().Drain() {
state.QueuedMessages = append(state.QueuedMessages, &grpc.QueuedMessage{
state.QueuedMessages = append(state.QueuedMessages, &clusterv1.QueuedMessage{
Topic: msg.Topic,
Payload: msg.GetPayload(),
Qos: uint32(msg.QoS),
@@ -519,7 +519,7 @@ func (b *Broker) GetSessionStateAndClose(ctx context.Context, clientID string) (
// Capture will message
if will := s.GetWill(); will != nil {
state.Will = &grpc.WillMessage{
state.Will = &clusterv1.WillMessage{
Topic: will.Topic,
Payload: will.Payload,
Qos: uint32(will.QoS),
+23
View File
@@ -0,0 +1,23 @@
# Copyright (c) Abstract Machines
# SPDX-License-Identifier: Apache-2.0
version: v2
managed:
enabled: true
override:
- file_option: go_package_prefix
value: github.com/absmach/fluxmq/pkg/proto
plugins:
# Standard protobuf Go code
- remote: buf.build/protocolbuffers/go
out: pkg/proto
opt:
- paths=source_relative
# Connect-Go for gRPC + HTTP support
- remote: buf.build/connectrpc/go
out: pkg/proto
opt:
- paths=source_relative
+6
View File
@@ -0,0 +1,6 @@
# Generated by buf. DO NOT EDIT.
version: v2
deps:
- name: buf.build/googleapis/googleapis
commit: 004180b77378443887d3b55cabc00384
digest: b5:e8f475fe3330f31f5fd86ac689093bcd274e19611a09db91f41d637cb9197881ce89882b94d13a58738e53c91c6e4bae7dc1feba85f590164c975a89e25115dc
+22
View File
@@ -0,0 +1,22 @@
# Copyright (c) Abstract Machines
# SPDX-License-Identifier: Apache-2.0
version: v2
modules:
- path: proto
name: buf.build/absmach/fluxmq
# Lint configuration: start from the STANDARD rule set.
lint:
use:
- STANDARD
# Rules listed here are exempted from linting.
except:
- PACKAGE_VERSION_SUFFIX
- PACKAGE_DIRECTORY_MATCH
- RPC_REQUEST_RESPONSE_UNIQUE
- RPC_REQUEST_STANDARD_NAME
- RPC_RESPONSE_STANDARD_NAME
# Breaking-change detection uses the FILE rule set.
breaking:
use:
- FILE
+3 -3
View File
@@ -7,8 +7,8 @@ import (
"context"
"time"
"github.com/absmach/fluxmq/cluster/grpc"
"github.com/absmach/fluxmq/core"
clusterv1 "github.com/absmach/fluxmq/pkg/proto/cluster/v1"
"github.com/absmach/fluxmq/storage"
)
@@ -116,7 +116,7 @@ type Cluster interface {
// This is called when a client reconnects to a different node.
// The old node disconnects the client and returns its full state.
// Returns the session state to be restored, or nil if no state exists.
TakeoverSession(ctx context.Context, clientID, fromNode, toNode string) (*grpc.SessionState, error)
TakeoverSession(ctx context.Context, clientID, fromNode, toNode string) (*clusterv1.SessionState, error)
// EnqueueRemote sends an enqueue request to a remote partition owner.
// This is called when a message needs to be enqueued on a partition owned by another node.
@@ -168,7 +168,7 @@ type MessageHandler interface {
// GetSessionStateAndClose captures the full state of a session and closes it.
// This is called when another node is taking over the session.
// Returns nil if the session doesn't exist on this node.
GetSessionStateAndClose(ctx context.Context, clientID string) (*grpc.SessionState, error)
GetSessionStateAndClose(ctx context.Context, clientID string) (*clusterv1.SessionState, error)
// GetRetainedMessage fetches a retained message from the local store.
// This is called when another node requests a large retained message payload.
+4 -4
View File
@@ -13,8 +13,8 @@ import (
"sync"
"time"
"github.com/absmach/fluxmq/cluster/grpc"
"github.com/absmach/fluxmq/core"
clusterv1 "github.com/absmach/fluxmq/pkg/proto/cluster/v1"
"github.com/absmach/fluxmq/storage"
clientv3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/client/v3/concurrency"
@@ -957,7 +957,7 @@ func (c *EtcdCluster) RoutePublish(ctx context.Context, topic string, payload []
}
// TakeoverSession initiates session takeover from one node to another.
func (c *EtcdCluster) TakeoverSession(ctx context.Context, clientID, fromNode, toNode string) (*grpc.SessionState, error) {
func (c *EtcdCluster) TakeoverSession(ctx context.Context, clientID, fromNode, toNode string) (*clusterv1.SessionState, error) {
if fromNode == toNode {
// Same node, no takeover needed
return nil, nil
@@ -1028,7 +1028,7 @@ func (c *EtcdCluster) DeliverToClient(ctx context.Context, clientID string, msg
// GetSessionStateAndClose implements MessageHandler.GetSessionStateAndClose.
// Delegates to the broker to capture session state and close the session.
func (c *EtcdCluster) GetSessionStateAndClose(ctx context.Context, clientID string) (*grpc.SessionState, error) {
func (c *EtcdCluster) GetSessionStateAndClose(ctx context.Context, clientID string) (*clusterv1.SessionState, error) {
if c.msgHandler == nil {
return nil, fmt.Errorf("no message handler configured")
}
@@ -1074,7 +1074,7 @@ func (c *EtcdCluster) HandlePublish(ctx context.Context, clientID, topic string,
// HandleTakeover implements TransportHandler.HandleTakeover.
// Called when another broker requests to take over a session from this node.
func (c *EtcdCluster) HandleTakeover(ctx context.Context, clientID, fromNode, toNode string, state *grpc.SessionState) (*grpc.SessionState, error) {
func (c *EtcdCluster) HandleTakeover(ctx context.Context, clientID, fromNode, toNode string, state *clusterv1.SessionState) (*clusterv1.SessionState, error) {
// Verify this is the node being asked to give up the session
if fromNode != c.nodeID {
return nil, fmt.Errorf("takeover request for wrong node: expected %s, got %s", c.nodeID, fromNode)
-457
View File
@@ -1,457 +0,0 @@
// Code generated by protoc-gen-go-grpc. DO NOT EDIT.
// versions:
// - protoc-gen-go-grpc v1.6.0
// - protoc v6.33.0
// source: broker.proto
package grpc
import (
context "context"
grpc "google.golang.org/grpc"
codes "google.golang.org/grpc/codes"
status "google.golang.org/grpc/status"
)
// This is a compile-time assertion to ensure that this generated file
// is compatible with the grpc package it is being compiled against.
// Requires gRPC-Go v1.64.0 or later.
const _ = grpc.SupportPackageIsVersion9
// Fully-qualified gRPC method names for the BrokerService RPCs, in the form
// "/cluster.BrokerService/<Method>". The client stubs pass them to Invoke and
// the server handlers report them in grpc.UnaryServerInfo.FullMethod.
const (
BrokerService_RoutePublish_FullMethodName = "/cluster.BrokerService/RoutePublish"
BrokerService_TakeoverSession_FullMethodName = "/cluster.BrokerService/TakeoverSession"
BrokerService_FetchRetained_FullMethodName = "/cluster.BrokerService/FetchRetained"
BrokerService_FetchWill_FullMethodName = "/cluster.BrokerService/FetchWill"
BrokerService_EnqueueRemote_FullMethodName = "/cluster.BrokerService/EnqueueRemote"
BrokerService_RouteQueueMessage_FullMethodName = "/cluster.BrokerService/RouteQueueMessage"
BrokerService_AppendEntries_FullMethodName = "/cluster.BrokerService/AppendEntries"
BrokerService_RequestVote_FullMethodName = "/cluster.BrokerService/RequestVote"
BrokerService_InstallSnapshot_FullMethodName = "/cluster.BrokerService/InstallSnapshot"
)
// BrokerServiceClient is the client API for BrokerService service.
//
// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream.
//
// BrokerService defines RPC methods for inter-broker communication in the cluster.
//
// Every method is a unary request/response call; NewBrokerServiceClient returns
// the implementation backed by a grpc.ClientConnInterface.
type BrokerServiceClient interface {
// RoutePublish forwards a PUBLISH packet to the broker that owns the target session.
RoutePublish(ctx context.Context, in *PublishRequest, opts ...grpc.CallOption) (*PublishResponse, error)
// TakeoverSession migrates a session from one broker to another.
TakeoverSession(ctx context.Context, in *TakeoverRequest, opts ...grpc.CallOption) (*TakeoverResponse, error)
// FetchRetained fetches a retained message payload from the owning broker node.
// Used when a large retained message (above threshold) needs to be delivered to a subscriber on a different node.
FetchRetained(ctx context.Context, in *FetchRetainedRequest, opts ...grpc.CallOption) (*FetchRetainedResponse, error)
// FetchWill fetches a will message payload from the owning broker node.
// Used when a large will message (above threshold) needs to be published after client disconnect.
FetchWill(ctx context.Context, in *FetchWillRequest, opts ...grpc.CallOption) (*FetchWillResponse, error)
// EnqueueRemote routes an enqueue operation to the partition owner node.
// Used in distributed queue mode when a message needs to be enqueued on a remote partition.
EnqueueRemote(ctx context.Context, in *EnqueueRemoteRequest, opts ...grpc.CallOption) (*EnqueueRemoteResponse, error)
// RouteQueueMessage delivers a queue message to a consumer on a different node.
// Used when partition owner needs to deliver to a consumer connected to another node.
RouteQueueMessage(ctx context.Context, in *RouteQueueMessageRequest, opts ...grpc.CallOption) (*RouteQueueMessageResponse, error)
// AppendEntries is invoked by the Raft leader to replicate log entries.
// Also used as a heartbeat to maintain leadership.
AppendEntries(ctx context.Context, in *AppendEntriesRequest, opts ...grpc.CallOption) (*AppendEntriesResponse, error)
// RequestVote is invoked by candidates during leader election.
RequestVote(ctx context.Context, in *RequestVoteRequest, opts ...grpc.CallOption) (*RequestVoteResponse, error)
// InstallSnapshot is invoked by leader to transfer snapshot to a follower.
InstallSnapshot(ctx context.Context, in *InstallSnapshotRequest, opts ...grpc.CallOption) (*InstallSnapshotResponse, error)
}
// brokerServiceClient is the default BrokerServiceClient implementation; every
// RPC is issued as a unary call over the wrapped connection.
type brokerServiceClient struct {
	cc grpc.ClientConnInterface
}

// NewBrokerServiceClient wraps cc in a BrokerServiceClient.
func NewBrokerServiceClient(cc grpc.ClientConnInterface) BrokerServiceClient {
	return &brokerServiceClient{cc: cc}
}

// RoutePublish invokes the RoutePublish RPC and returns the decoded response.
// grpc.StaticMethod() is placed ahead of the caller-supplied options.
func (c *brokerServiceClient) RoutePublish(ctx context.Context, in *PublishRequest, opts ...grpc.CallOption) (*PublishResponse, error) {
	callOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...)
	resp := new(PublishResponse)
	if err := c.cc.Invoke(ctx, BrokerService_RoutePublish_FullMethodName, in, resp, callOpts...); err != nil {
		return nil, err
	}
	return resp, nil
}

// TakeoverSession invokes the TakeoverSession RPC and returns the decoded response.
func (c *brokerServiceClient) TakeoverSession(ctx context.Context, in *TakeoverRequest, opts ...grpc.CallOption) (*TakeoverResponse, error) {
	callOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...)
	resp := new(TakeoverResponse)
	if err := c.cc.Invoke(ctx, BrokerService_TakeoverSession_FullMethodName, in, resp, callOpts...); err != nil {
		return nil, err
	}
	return resp, nil
}

// FetchRetained invokes the FetchRetained RPC and returns the decoded response.
func (c *brokerServiceClient) FetchRetained(ctx context.Context, in *FetchRetainedRequest, opts ...grpc.CallOption) (*FetchRetainedResponse, error) {
	callOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...)
	resp := new(FetchRetainedResponse)
	if err := c.cc.Invoke(ctx, BrokerService_FetchRetained_FullMethodName, in, resp, callOpts...); err != nil {
		return nil, err
	}
	return resp, nil
}

// FetchWill invokes the FetchWill RPC and returns the decoded response.
func (c *brokerServiceClient) FetchWill(ctx context.Context, in *FetchWillRequest, opts ...grpc.CallOption) (*FetchWillResponse, error) {
	callOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...)
	resp := new(FetchWillResponse)
	if err := c.cc.Invoke(ctx, BrokerService_FetchWill_FullMethodName, in, resp, callOpts...); err != nil {
		return nil, err
	}
	return resp, nil
}

// EnqueueRemote invokes the EnqueueRemote RPC and returns the decoded response.
func (c *brokerServiceClient) EnqueueRemote(ctx context.Context, in *EnqueueRemoteRequest, opts ...grpc.CallOption) (*EnqueueRemoteResponse, error) {
	callOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...)
	resp := new(EnqueueRemoteResponse)
	if err := c.cc.Invoke(ctx, BrokerService_EnqueueRemote_FullMethodName, in, resp, callOpts...); err != nil {
		return nil, err
	}
	return resp, nil
}

// RouteQueueMessage invokes the RouteQueueMessage RPC and returns the decoded response.
func (c *brokerServiceClient) RouteQueueMessage(ctx context.Context, in *RouteQueueMessageRequest, opts ...grpc.CallOption) (*RouteQueueMessageResponse, error) {
	callOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...)
	resp := new(RouteQueueMessageResponse)
	if err := c.cc.Invoke(ctx, BrokerService_RouteQueueMessage_FullMethodName, in, resp, callOpts...); err != nil {
		return nil, err
	}
	return resp, nil
}

// AppendEntries invokes the AppendEntries RPC and returns the decoded response.
func (c *brokerServiceClient) AppendEntries(ctx context.Context, in *AppendEntriesRequest, opts ...grpc.CallOption) (*AppendEntriesResponse, error) {
	callOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...)
	resp := new(AppendEntriesResponse)
	if err := c.cc.Invoke(ctx, BrokerService_AppendEntries_FullMethodName, in, resp, callOpts...); err != nil {
		return nil, err
	}
	return resp, nil
}

// RequestVote invokes the RequestVote RPC and returns the decoded response.
func (c *brokerServiceClient) RequestVote(ctx context.Context, in *RequestVoteRequest, opts ...grpc.CallOption) (*RequestVoteResponse, error) {
	callOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...)
	resp := new(RequestVoteResponse)
	if err := c.cc.Invoke(ctx, BrokerService_RequestVote_FullMethodName, in, resp, callOpts...); err != nil {
		return nil, err
	}
	return resp, nil
}

// InstallSnapshot invokes the InstallSnapshot RPC and returns the decoded response.
func (c *brokerServiceClient) InstallSnapshot(ctx context.Context, in *InstallSnapshotRequest, opts ...grpc.CallOption) (*InstallSnapshotResponse, error) {
	callOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...)
	resp := new(InstallSnapshotResponse)
	if err := c.cc.Invoke(ctx, BrokerService_InstallSnapshot_FullMethodName, in, resp, callOpts...); err != nil {
		return nil, err
	}
	return resp, nil
}
// BrokerServiceServer is the server API for BrokerService service.
// All implementations must embed UnimplementedBrokerServiceServer
// for forward compatibility.
//
// BrokerService defines RPC methods for inter-broker communication in the cluster.
type BrokerServiceServer interface {
// RoutePublish forwards a PUBLISH packet to the broker that owns the target session.
RoutePublish(context.Context, *PublishRequest) (*PublishResponse, error)
// TakeoverSession migrates a session from one broker to another.
TakeoverSession(context.Context, *TakeoverRequest) (*TakeoverResponse, error)
// FetchRetained fetches a retained message payload from the owning broker node.
// Used when a large retained message (above threshold) needs to be delivered to a subscriber on a different node.
FetchRetained(context.Context, *FetchRetainedRequest) (*FetchRetainedResponse, error)
// FetchWill fetches a will message payload from the owning broker node.
// Used when a large will message (above threshold) needs to be published after client disconnect.
FetchWill(context.Context, *FetchWillRequest) (*FetchWillResponse, error)
// EnqueueRemote routes an enqueue operation to the partition owner node.
// Used in distributed queue mode when a message needs to be enqueued on a remote partition.
EnqueueRemote(context.Context, *EnqueueRemoteRequest) (*EnqueueRemoteResponse, error)
// RouteQueueMessage delivers a queue message to a consumer on a different node.
// Used when partition owner needs to deliver to a consumer connected to another node.
RouteQueueMessage(context.Context, *RouteQueueMessageRequest) (*RouteQueueMessageResponse, error)
// AppendEntries is invoked by the Raft leader to replicate log entries.
// Also used as a heartbeat to maintain leadership.
AppendEntries(context.Context, *AppendEntriesRequest) (*AppendEntriesResponse, error)
// RequestVote is invoked by candidates during leader election.
RequestVote(context.Context, *RequestVoteRequest) (*RequestVoteResponse, error)
// InstallSnapshot is invoked by leader to transfer snapshot to a follower.
InstallSnapshot(context.Context, *InstallSnapshotRequest) (*InstallSnapshotResponse, error)
// mustEmbedUnimplementedBrokerServiceServer is unexported, so types outside
// this package can only satisfy the interface by embedding a type that
// already provides it.
mustEmbedUnimplementedBrokerServiceServer()
}
// UnimplementedBrokerServiceServer supplies a stub for every
// BrokerServiceServer method; each stub returns a codes.Unimplemented status
// error. Embed it to keep implementations forward compatible.
//
// NOTE: embed it by value rather than by pointer so that calling a stub can
// never dereference a nil pointer.
type UnimplementedBrokerServiceServer struct{}

// RoutePublish reports that the RPC is not implemented.
func (UnimplementedBrokerServiceServer) RoutePublish(context.Context, *PublishRequest) (*PublishResponse, error) {
	err := status.Error(codes.Unimplemented, "method RoutePublish not implemented")
	return nil, err
}

// TakeoverSession reports that the RPC is not implemented.
func (UnimplementedBrokerServiceServer) TakeoverSession(context.Context, *TakeoverRequest) (*TakeoverResponse, error) {
	err := status.Error(codes.Unimplemented, "method TakeoverSession not implemented")
	return nil, err
}

// FetchRetained reports that the RPC is not implemented.
func (UnimplementedBrokerServiceServer) FetchRetained(context.Context, *FetchRetainedRequest) (*FetchRetainedResponse, error) {
	err := status.Error(codes.Unimplemented, "method FetchRetained not implemented")
	return nil, err
}

// FetchWill reports that the RPC is not implemented.
func (UnimplementedBrokerServiceServer) FetchWill(context.Context, *FetchWillRequest) (*FetchWillResponse, error) {
	err := status.Error(codes.Unimplemented, "method FetchWill not implemented")
	return nil, err
}

// EnqueueRemote reports that the RPC is not implemented.
func (UnimplementedBrokerServiceServer) EnqueueRemote(context.Context, *EnqueueRemoteRequest) (*EnqueueRemoteResponse, error) {
	err := status.Error(codes.Unimplemented, "method EnqueueRemote not implemented")
	return nil, err
}

// RouteQueueMessage reports that the RPC is not implemented.
func (UnimplementedBrokerServiceServer) RouteQueueMessage(context.Context, *RouteQueueMessageRequest) (*RouteQueueMessageResponse, error) {
	err := status.Error(codes.Unimplemented, "method RouteQueueMessage not implemented")
	return nil, err
}

// AppendEntries reports that the RPC is not implemented.
func (UnimplementedBrokerServiceServer) AppendEntries(context.Context, *AppendEntriesRequest) (*AppendEntriesResponse, error) {
	err := status.Error(codes.Unimplemented, "method AppendEntries not implemented")
	return nil, err
}

// RequestVote reports that the RPC is not implemented.
func (UnimplementedBrokerServiceServer) RequestVote(context.Context, *RequestVoteRequest) (*RequestVoteResponse, error) {
	err := status.Error(codes.Unimplemented, "method RequestVote not implemented")
	return nil, err
}

// InstallSnapshot reports that the RPC is not implemented.
func (UnimplementedBrokerServiceServer) InstallSnapshot(context.Context, *InstallSnapshotRequest) (*InstallSnapshotResponse, error) {
	err := status.Error(codes.Unimplemented, "method InstallSnapshot not implemented")
	return nil, err
}

// mustEmbedUnimplementedBrokerServiceServer is the marker method that
// BrokerServiceServer requires.
func (UnimplementedBrokerServiceServer) mustEmbedUnimplementedBrokerServiceServer() {}

// testEmbeddedByValue is the no-op hook probed by RegisterBrokerServiceServer
// at registration time.
func (UnimplementedBrokerServiceServer) testEmbeddedByValue() {}
// UnsafeBrokerServiceServer may be embedded to opt out of forward compatibility for this service.
// Use of this interface is not recommended, as added methods to BrokerServiceServer will
// result in compilation errors.
type UnsafeBrokerServiceServer interface {
// Same marker method that BrokerServiceServer declares.
mustEmbedUnimplementedBrokerServiceServer()
}
// RegisterBrokerServiceServer registers srv with s under BrokerService_ServiceDesc.
func RegisterBrokerServiceServer(s grpc.ServiceRegistrar, srv BrokerServiceServer) {
	// Probe the embedded UnimplementedBrokerServiceServer now. If it was
	// embedded by pointer and is nil, the call below panics at registration
	// time instead of later, when an unimplemented method is first invoked.
	type embedProbe interface{ testEmbeddedByValue() }
	if probe, ok := srv.(embedProbe); ok {
		probe.testEmbeddedByValue()
	}
	s.RegisterService(&BrokerService_ServiceDesc, srv)
}
// _BrokerService_RoutePublish_Handler decodes a PublishRequest with dec and
// dispatches it to srv's RoutePublish method, routing the call through
// interceptor when one is configured.
func _BrokerService_RoutePublish_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
	msg := new(PublishRequest)
	if decodeErr := dec(msg); decodeErr != nil {
		return nil, decodeErr
	}
	if interceptor != nil {
		callInfo := &grpc.UnaryServerInfo{
			Server:     srv,
			FullMethod: BrokerService_RoutePublish_FullMethodName,
		}
		// srv is type-asserted only when the interceptor actually calls next.
		next := func(ctx context.Context, req interface{}) (interface{}, error) {
			return srv.(BrokerServiceServer).RoutePublish(ctx, req.(*PublishRequest))
		}
		return interceptor(ctx, msg, callInfo, next)
	}
	return srv.(BrokerServiceServer).RoutePublish(ctx, msg)
}
// _BrokerService_TakeoverSession_Handler decodes a TakeoverRequest with dec and
// dispatches it to srv's TakeoverSession method, routing the call through
// interceptor when one is configured.
func _BrokerService_TakeoverSession_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
	msg := new(TakeoverRequest)
	if decodeErr := dec(msg); decodeErr != nil {
		return nil, decodeErr
	}
	if interceptor != nil {
		callInfo := &grpc.UnaryServerInfo{
			Server:     srv,
			FullMethod: BrokerService_TakeoverSession_FullMethodName,
		}
		// srv is type-asserted only when the interceptor actually calls next.
		next := func(ctx context.Context, req interface{}) (interface{}, error) {
			return srv.(BrokerServiceServer).TakeoverSession(ctx, req.(*TakeoverRequest))
		}
		return interceptor(ctx, msg, callInfo, next)
	}
	return srv.(BrokerServiceServer).TakeoverSession(ctx, msg)
}
// _BrokerService_FetchRetained_Handler decodes a FetchRetainedRequest with dec
// and dispatches it to srv's FetchRetained method, routing the call through
// interceptor when one is configured.
func _BrokerService_FetchRetained_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
	msg := new(FetchRetainedRequest)
	if decodeErr := dec(msg); decodeErr != nil {
		return nil, decodeErr
	}
	if interceptor != nil {
		callInfo := &grpc.UnaryServerInfo{
			Server:     srv,
			FullMethod: BrokerService_FetchRetained_FullMethodName,
		}
		// srv is type-asserted only when the interceptor actually calls next.
		next := func(ctx context.Context, req interface{}) (interface{}, error) {
			return srv.(BrokerServiceServer).FetchRetained(ctx, req.(*FetchRetainedRequest))
		}
		return interceptor(ctx, msg, callInfo, next)
	}
	return srv.(BrokerServiceServer).FetchRetained(ctx, msg)
}
// _BrokerService_FetchWill_Handler decodes a FetchWillRequest with dec and
// dispatches it to srv's FetchWill method, routing the call through
// interceptor when one is configured.
func _BrokerService_FetchWill_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
	msg := new(FetchWillRequest)
	if decodeErr := dec(msg); decodeErr != nil {
		return nil, decodeErr
	}
	if interceptor != nil {
		callInfo := &grpc.UnaryServerInfo{
			Server:     srv,
			FullMethod: BrokerService_FetchWill_FullMethodName,
		}
		// srv is type-asserted only when the interceptor actually calls next.
		next := func(ctx context.Context, req interface{}) (interface{}, error) {
			return srv.(BrokerServiceServer).FetchWill(ctx, req.(*FetchWillRequest))
		}
		return interceptor(ctx, msg, callInfo, next)
	}
	return srv.(BrokerServiceServer).FetchWill(ctx, msg)
}
// _BrokerService_EnqueueRemote_Handler decodes an EnqueueRemoteRequest with dec
// and dispatches it to srv's EnqueueRemote method, routing the call through
// interceptor when one is configured.
func _BrokerService_EnqueueRemote_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
	msg := new(EnqueueRemoteRequest)
	if decodeErr := dec(msg); decodeErr != nil {
		return nil, decodeErr
	}
	if interceptor != nil {
		callInfo := &grpc.UnaryServerInfo{
			Server:     srv,
			FullMethod: BrokerService_EnqueueRemote_FullMethodName,
		}
		// srv is type-asserted only when the interceptor actually calls next.
		next := func(ctx context.Context, req interface{}) (interface{}, error) {
			return srv.(BrokerServiceServer).EnqueueRemote(ctx, req.(*EnqueueRemoteRequest))
		}
		return interceptor(ctx, msg, callInfo, next)
	}
	return srv.(BrokerServiceServer).EnqueueRemote(ctx, msg)
}
// _BrokerService_RouteQueueMessage_Handler decodes a RouteQueueMessageRequest
// with dec and dispatches it to srv's RouteQueueMessage method, routing the
// call through interceptor when one is configured.
func _BrokerService_RouteQueueMessage_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
	msg := new(RouteQueueMessageRequest)
	if decodeErr := dec(msg); decodeErr != nil {
		return nil, decodeErr
	}
	if interceptor != nil {
		callInfo := &grpc.UnaryServerInfo{
			Server:     srv,
			FullMethod: BrokerService_RouteQueueMessage_FullMethodName,
		}
		// srv is type-asserted only when the interceptor actually calls next.
		next := func(ctx context.Context, req interface{}) (interface{}, error) {
			return srv.(BrokerServiceServer).RouteQueueMessage(ctx, req.(*RouteQueueMessageRequest))
		}
		return interceptor(ctx, msg, callInfo, next)
	}
	return srv.(BrokerServiceServer).RouteQueueMessage(ctx, msg)
}
// _BrokerService_AppendEntries_Handler decodes an AppendEntriesRequest with dec
// and dispatches it to srv's AppendEntries method, routing the call through
// interceptor when one is configured.
func _BrokerService_AppendEntries_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
	msg := new(AppendEntriesRequest)
	if decodeErr := dec(msg); decodeErr != nil {
		return nil, decodeErr
	}
	if interceptor != nil {
		callInfo := &grpc.UnaryServerInfo{
			Server:     srv,
			FullMethod: BrokerService_AppendEntries_FullMethodName,
		}
		// srv is type-asserted only when the interceptor actually calls next.
		next := func(ctx context.Context, req interface{}) (interface{}, error) {
			return srv.(BrokerServiceServer).AppendEntries(ctx, req.(*AppendEntriesRequest))
		}
		return interceptor(ctx, msg, callInfo, next)
	}
	return srv.(BrokerServiceServer).AppendEntries(ctx, msg)
}
// _BrokerService_RequestVote_Handler decodes a RequestVoteRequest with dec and
// dispatches it to srv's RequestVote method, routing the call through
// interceptor when one is configured.
func _BrokerService_RequestVote_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
	msg := new(RequestVoteRequest)
	if decodeErr := dec(msg); decodeErr != nil {
		return nil, decodeErr
	}
	if interceptor != nil {
		callInfo := &grpc.UnaryServerInfo{
			Server:     srv,
			FullMethod: BrokerService_RequestVote_FullMethodName,
		}
		// srv is type-asserted only when the interceptor actually calls next.
		next := func(ctx context.Context, req interface{}) (interface{}, error) {
			return srv.(BrokerServiceServer).RequestVote(ctx, req.(*RequestVoteRequest))
		}
		return interceptor(ctx, msg, callInfo, next)
	}
	return srv.(BrokerServiceServer).RequestVote(ctx, msg)
}
// _BrokerService_InstallSnapshot_Handler decodes an InstallSnapshotRequest with
// dec and dispatches it to srv's InstallSnapshot method, routing the call
// through interceptor when one is configured.
func _BrokerService_InstallSnapshot_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
	msg := new(InstallSnapshotRequest)
	if decodeErr := dec(msg); decodeErr != nil {
		return nil, decodeErr
	}
	if interceptor != nil {
		callInfo := &grpc.UnaryServerInfo{
			Server:     srv,
			FullMethod: BrokerService_InstallSnapshot_FullMethodName,
		}
		// srv is type-asserted only when the interceptor actually calls next.
		next := func(ctx context.Context, req interface{}) (interface{}, error) {
			return srv.(BrokerServiceServer).InstallSnapshot(ctx, req.(*InstallSnapshotRequest))
		}
		return interceptor(ctx, msg, callInfo, next)
	}
	return srv.(BrokerServiceServer).InstallSnapshot(ctx, msg)
}
// BrokerService_ServiceDesc is the grpc.ServiceDesc for BrokerService service.
// It's only intended for direct use with grpc.RegisterService,
// and not to be introspected or modified (even as a copy)
//
// It maps each of the nine RPC method names to its unary handler function;
// the Streams list is empty, so the service declares no streaming methods.
var BrokerService_ServiceDesc = grpc.ServiceDesc{
ServiceName: "cluster.BrokerService",
HandlerType: (*BrokerServiceServer)(nil),
Methods: []grpc.MethodDesc{
{
MethodName: "RoutePublish",
Handler: _BrokerService_RoutePublish_Handler,
},
{
MethodName: "TakeoverSession",
Handler: _BrokerService_TakeoverSession_Handler,
},
{
MethodName: "FetchRetained",
Handler: _BrokerService_FetchRetained_Handler,
},
{
MethodName: "FetchWill",
Handler: _BrokerService_FetchWill_Handler,
},
{
MethodName: "EnqueueRemote",
Handler: _BrokerService_EnqueueRemote_Handler,
},
{
MethodName: "RouteQueueMessage",
Handler: _BrokerService_RouteQueueMessage_Handler,
},
{
MethodName: "AppendEntries",
Handler: _BrokerService_AppendEntries_Handler,
},
{
MethodName: "RequestVote",
Handler: _BrokerService_RequestVote_Handler,
},
{
MethodName: "InstallSnapshot",
Handler: _BrokerService_InstallSnapshot_Handler,
},
},
// No streaming RPCs are registered for this service.
Streams: []grpc.StreamDesc{},
// Name of the .proto source file this descriptor was generated from.
Metadata: "broker.proto",
}
+2 -2
View File
@@ -8,7 +8,7 @@ import (
"errors"
"time"
"github.com/absmach/fluxmq/cluster/grpc"
clusterv1 "github.com/absmach/fluxmq/pkg/proto/cluster/v1"
"github.com/absmach/fluxmq/storage"
)
@@ -153,7 +153,7 @@ func (n *NoopCluster) RoutePublish(ctx context.Context, topic string, payload []
// Session takeover - not applicable in single-node
func (n *NoopCluster) TakeoverSession(ctx context.Context, clientID, fromNode, toNode string) (*grpc.SessionState, error) {
func (n *NoopCluster) TakeoverSession(ctx context.Context, clientID, fromNode, toNode string) (*clusterv1.SessionState, error) {
// Single-node: no remote nodes to take over from
return nil, nil
}
+319 -280
View File
@@ -10,14 +10,17 @@ import (
"fmt"
"log/slog"
"net"
"net/http"
"os"
"sync"
"time"
"github.com/absmach/fluxmq/cluster/grpc"
"connectrpc.com/connect"
"github.com/absmach/fluxmq/core"
gogrpc "google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"
clusterv1 "github.com/absmach/fluxmq/pkg/proto/cluster/v1"
"github.com/absmach/fluxmq/pkg/proto/cluster/v1/clusterv1connect"
"golang.org/x/net/http2"
"golang.org/x/net/http2/h2c"
)
// QueueHandler defines callbacks for queue distribution operations.
@@ -29,31 +32,39 @@ type QueueHandler interface {
DeliverQueueMessage(ctx context.Context, clientID string, msg any) error
}
// Transport handles inter-broker gRPC communication.
// Transport handles inter-broker communication using Connect protocol.
type Transport struct {
grpc.UnimplementedBrokerServiceServer
mu sync.RWMutex
nodeID string
bindAddr string
grpcServer *gogrpc.Server
httpServer *http.Server
listener net.Listener
peerClients map[string]grpc.BrokerServiceClient
peerClients map[string]clusterv1connect.BrokerServiceClient
logger *slog.Logger
handler MessageHandler
queueHandler QueueHandler
stopCh chan struct{}
tlsConfig *TransportTLSConfig
clientCreds credentials.TransportCredentials
httpClient *http.Client
}
// NewTransport creates a new gRPC transport.
// NewTransport creates a new Connect transport.
// If tlsCfg is nil, the transport uses insecure connections (development mode only).
func NewTransport(nodeID, bindAddr string, handler MessageHandler, tlsCfg *TransportTLSConfig, logger *slog.Logger) (*Transport, error) {
var listener net.Listener
var grpcServer *gogrpc.Server
var clientCreds credentials.TransportCredentials
var httpClient *http.Client
var err error
t := &Transport{
nodeID: nodeID,
bindAddr: bindAddr,
peerClients: make(map[string]clusterv1connect.BrokerServiceClient),
logger: logger,
handler: handler,
stopCh: make(chan struct{}),
tlsConfig: tlsCfg,
}
if tlsCfg != nil {
// Load server certificate and key
cert, err := tls.LoadX509KeyPair(tlsCfg.CertFile, tlsCfg.KeyFile)
@@ -78,6 +89,7 @@ func NewTransport(nodeID, bindAddr string, handler MessageHandler, tlsCfg *Trans
ClientCAs: caPool,
ClientAuth: tls.RequireAndVerifyClientCert,
MinVersion: tls.VersionTLS12,
NextProtos: []string{"h2"},
}
// Client TLS config (for connecting to peers)
@@ -93,12 +105,13 @@ func NewTransport(nodeID, bindAddr string, handler MessageHandler, tlsCfg *Trans
return nil, fmt.Errorf("failed to create TLS listener on %s: %w", bindAddr, err)
}
// Create gRPC server with TLS credentials
serverCreds := credentials.NewTLS(serverTLSConfig)
grpcServer = gogrpc.NewServer(gogrpc.Creds(serverCreds))
// Store client credentials for peer connections
clientCreds = credentials.NewTLS(clientTLSConfig)
// Create HTTP client with TLS for peer connections
httpClient = &http.Client{
Transport: &http2.Transport{
TLSClientConfig: clientTLSConfig,
},
Timeout: 30 * time.Second,
}
logger.Info("transport TLS enabled", slog.String("address", bindAddr))
} else {
@@ -108,65 +121,76 @@ func NewTransport(nodeID, bindAddr string, handler MessageHandler, tlsCfg *Trans
return nil, fmt.Errorf("failed to listen on %s: %w", bindAddr, err)
}
grpcServer = gogrpc.NewServer()
clientCreds = insecure.NewCredentials()
// Create HTTP client for insecure connections
httpClient = &http.Client{
Transport: &http2.Transport{
AllowHTTP: true,
DialTLSContext: func(ctx context.Context, network, addr string, _ *tls.Config) (net.Conn, error) {
return net.Dial(network, addr)
},
},
Timeout: 30 * time.Second,
}
logger.Warn("transport TLS disabled - using insecure connections (development mode only)")
}
t := &Transport{
nodeID: nodeID,
bindAddr: bindAddr,
grpcServer: grpcServer,
listener: listener,
peerClients: make(map[string]grpc.BrokerServiceClient),
logger: logger,
handler: handler,
stopCh: make(chan struct{}),
tlsConfig: tlsCfg,
clientCreds: clientCreds,
t.listener = listener
t.httpClient = httpClient
// Create Connect handler
mux := http.NewServeMux()
path, connectHandler := clusterv1connect.NewBrokerServiceHandler(t)
mux.Handle(path, connectHandler)
// Create HTTP server with h2c support for HTTP/2 without TLS
var httpHandler http.Handler
if tlsCfg == nil {
httpHandler = h2c.NewHandler(mux, &http2.Server{})
} else {
httpHandler = mux
}
// Register gRPC service
grpc.RegisterBrokerServiceServer(grpcServer, t)
t.httpServer = &http.Server{
Handler: httpHandler,
ReadHeaderTimeout: 10 * time.Second,
}
return t, nil
}
// Start starts the gRPC server.
// Start starts the HTTP server.
func (t *Transport) Start() error {
go func() {
t.logger.Info("starting gRPC transport server", slog.String("address", t.bindAddr))
if err := t.grpcServer.Serve(t.listener); err != nil {
t.logger.Error("gRPC server error", slog.String("error", err.Error()))
t.logger.Info("starting Connect transport server", slog.String("address", t.bindAddr))
if err := t.httpServer.Serve(t.listener); err != nil && err != http.ErrServerClosed {
t.logger.Error("HTTP server error", slog.String("error", err.Error()))
}
}()
return nil
}
// Stop gracefully stops the gRPC server.
// Stop gracefully stops the HTTP server.
func (t *Transport) Stop() error {
close(t.stopCh)
// Close peer connections
t.mu.Lock()
for _, client := range t.peerClients {
if conn, ok := client.(interface{ Close() error }); ok {
conn.Close()
}
}
t.peerClients = nil
t.mu.Unlock()
// Stop gRPC server
if t.grpcServer != nil {
t.grpcServer.GracefulStop()
// Shutdown HTTP server
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
if t.httpServer != nil {
return t.httpServer.Shutdown(ctx)
}
return nil
}
// ConnectPeer establishes a gRPC connection to a peer node.
// ConnectPeer establishes a Connect client connection to a peer node.
func (t *Transport) ConnectPeer(nodeID, addr string) error {
t.mu.Lock()
defer t.mu.Unlock()
@@ -176,13 +200,15 @@ func (t *Transport) ConnectPeer(nodeID, addr string) error {
return nil
}
// Create gRPC connection using stored credentials (TLS or insecure)
conn, err := gogrpc.NewClient(addr, gogrpc.WithTransportCredentials(t.clientCreds))
if err != nil {
return fmt.Errorf("failed to connect to peer %s at %s: %w", nodeID, addr, err)
// Determine URL scheme based on TLS config
scheme := "http"
if t.tlsConfig != nil {
scheme = "https"
}
baseURL := fmt.Sprintf("%s://%s", scheme, addr)
client := grpc.NewBrokerServiceClient(conn)
// Create Connect client
client := clusterv1connect.NewBrokerServiceClient(t.httpClient, baseURL)
t.peerClients[nodeID] = client
t.logger.Info("connected to peer",
@@ -192,8 +218,8 @@ func (t *Transport) ConnectPeer(nodeID, addr string) error {
return nil
}
// GetPeerClient returns the gRPC client for a peer node.
func (t *Transport) GetPeerClient(nodeID string) (grpc.BrokerServiceClient, error) {
// GetPeerClient returns the Connect client for a peer node.
func (t *Transport) GetPeerClient(nodeID string) (clusterv1connect.BrokerServiceClient, error) {
t.mu.RLock()
defer t.mu.RUnlock()
@@ -205,7 +231,7 @@ func (t *Transport) GetPeerClient(nodeID string) (grpc.BrokerServiceClient, erro
return client, nil
}
// HasPeerConnection checks if we have an active gRPC connection to a peer.
// HasPeerConnection checks if we have an active connection to a peer.
func (t *Transport) HasPeerConnection(nodeID string) bool {
t.mu.RLock()
defer t.mu.RUnlock()
@@ -214,145 +240,84 @@ func (t *Transport) HasPeerConnection(nodeID string) bool {
return exists
}
// RoutePublish implements BrokerServiceServer.RoutePublish.
// This is called by peer brokers to deliver a message to a local client.
func (t *Transport) RoutePublish(ctx context.Context, req *grpc.PublishRequest) (*grpc.PublishResponse, error) {
// RoutePublish implements BrokerServiceHandler.RoutePublish.
func (t *Transport) RoutePublish(ctx context.Context, req *connect.Request[clusterv1.PublishRequest]) (*connect.Response[clusterv1.PublishResponse], error) {
if t.handler == nil {
return &grpc.PublishResponse{
return connect.NewResponse(&clusterv1.PublishResponse{
Success: false,
Error: "no handler configured",
}, nil
}), nil
}
msg := &core.Message{
Topic: req.Topic,
Payload: req.Payload,
QoS: byte(req.Qos),
Retain: req.Retain,
Dup: req.Dup,
Properties: req.Properties,
Topic: req.Msg.Topic,
Payload: req.Msg.Payload,
QoS: byte(req.Msg.Qos),
Retain: req.Msg.Retain,
Dup: req.Msg.Dup,
Properties: req.Msg.Properties,
}
err := t.handler.DeliverToClient(ctx, req.ClientId, msg)
err := t.handler.DeliverToClient(ctx, req.Msg.ClientId, msg)
if err != nil {
return &grpc.PublishResponse{
return connect.NewResponse(&clusterv1.PublishResponse{
Success: false,
Error: err.Error(),
}, nil
}), nil
}
return &grpc.PublishResponse{
return connect.NewResponse(&clusterv1.PublishResponse{
Success: true,
}, nil
}), nil
}
// TakeoverSession implements BrokerServiceServer.TakeoverSession.
// This is called by peer brokers to take over a session.
func (t *Transport) TakeoverSession(ctx context.Context, req *grpc.TakeoverRequest) (*grpc.TakeoverResponse, error) {
// TakeoverSession implements BrokerServiceHandler.TakeoverSession.
func (t *Transport) TakeoverSession(ctx context.Context, req *connect.Request[clusterv1.TakeoverRequest]) (*connect.Response[clusterv1.TakeoverResponse], error) {
if t.handler == nil {
return &grpc.TakeoverResponse{
return connect.NewResponse(&clusterv1.TakeoverResponse{
Success: false,
Error: "no handler configured",
}, nil
}), nil
}
// Get session state from the handler (which will disconnect the client)
sessionState, err := t.handler.GetSessionStateAndClose(ctx, req.ClientId)
sessionState, err := t.handler.GetSessionStateAndClose(ctx, req.Msg.ClientId)
if err != nil {
return &grpc.TakeoverResponse{
return connect.NewResponse(&clusterv1.TakeoverResponse{
Success: false,
Error: err.Error(),
}, nil
}), nil
}
return &grpc.TakeoverResponse{
return connect.NewResponse(&clusterv1.TakeoverResponse{
Success: true,
SessionState: sessionState,
}, nil
}), nil
}
// SendPublish sends a PUBLISH message to a specific peer node.
func (t *Transport) SendPublish(ctx context.Context, nodeID, clientID, topic string, payload []byte, qos byte, retain, dup bool, properties map[string]string) error {
client, err := t.GetPeerClient(nodeID)
if err != nil {
return err
}
req := &grpc.PublishRequest{
ClientId: clientID,
Topic: topic,
Payload: payload,
Qos: uint32(qos),
Retain: retain,
Dup: dup,
Properties: properties,
}
resp, err := client.RoutePublish(ctx, req)
if err != nil {
return fmt.Errorf("grpc call failed: %w", err)
}
if !resp.Success {
return fmt.Errorf("publish failed: %s", resp.Error)
}
return nil
}
// SendTakeover sends a session takeover request to a peer node and returns the session state.
func (t *Transport) SendTakeover(ctx context.Context, nodeID, clientID, fromNode, toNode string) (*grpc.SessionState, error) {
client, err := t.GetPeerClient(nodeID)
if err != nil {
return nil, err
}
req := &grpc.TakeoverRequest{
ClientId: clientID,
FromNode: fromNode,
ToNode: toNode,
}
resp, err := client.TakeoverSession(ctx, req)
if err != nil {
return nil, fmt.Errorf("grpc call failed: %w", err)
}
if !resp.Success {
return nil, fmt.Errorf("takeover failed: %s", resp.Error)
}
return resp.SessionState, nil
}
// FetchRetained implements BrokerServiceServer.FetchRetained.
// This is called by peer brokers to fetch a retained message payload from the local store.
func (t *Transport) FetchRetained(ctx context.Context, req *grpc.FetchRetainedRequest) (*grpc.FetchRetainedResponse, error) {
// FetchRetained implements BrokerServiceHandler.FetchRetained.
func (t *Transport) FetchRetained(ctx context.Context, req *connect.Request[clusterv1.FetchRetainedRequest]) (*connect.Response[clusterv1.FetchRetainedResponse], error) {
if t.handler == nil {
return &grpc.FetchRetainedResponse{
return connect.NewResponse(&clusterv1.FetchRetainedResponse{
Found: false,
Error: "no handler configured",
}, nil
}), nil
}
// Get retained message from local storage via handler
msg, err := t.handler.GetRetainedMessage(ctx, req.Topic)
msg, err := t.handler.GetRetainedMessage(ctx, req.Msg.Topic)
if err != nil {
return &grpc.FetchRetainedResponse{
return connect.NewResponse(&clusterv1.FetchRetainedResponse{
Found: false,
Error: err.Error(),
}, nil
}), nil
}
// Message not found
if msg == nil {
return &grpc.FetchRetainedResponse{
return connect.NewResponse(&clusterv1.FetchRetainedResponse{
Found: false,
}, nil
}), nil
}
// Convert storage.Message to clusterv1.RetainedMessage
grpcMsg := &grpc.RetainedMessage{
grpcMsg := &clusterv1.RetainedMessage{
Topic: msg.Topic,
Payload: msg.Payload,
Qos: uint32(msg.QoS),
@@ -361,66 +326,36 @@ func (t *Transport) FetchRetained(ctx context.Context, req *grpc.FetchRetainedRe
Timestamp: msg.PublishTime.Unix(),
}
return &grpc.FetchRetainedResponse{
return connect.NewResponse(&clusterv1.FetchRetainedResponse{
Found: true,
Message: grpcMsg,
}, nil
}), nil
}
// SendFetchRetained fetches a retained message from a peer node.
func (t *Transport) SendFetchRetained(ctx context.Context, nodeID, topic string) (*grpc.RetainedMessage, error) {
client, err := t.GetPeerClient(nodeID)
if err != nil {
return nil, err
}
req := &grpc.FetchRetainedRequest{
Topic: topic,
}
resp, err := client.FetchRetained(ctx, req)
if err != nil {
return nil, fmt.Errorf("grpc call failed: %w", err)
}
if resp.Error != "" {
return nil, fmt.Errorf("fetch failed: %s", resp.Error)
}
if !resp.Found {
return nil, nil // Message not found (not an error)
}
return resp.Message, nil
}
// FetchWill handles incoming requests to fetch a will message from this node.
func (t *Transport) FetchWill(ctx context.Context, req *grpc.FetchWillRequest) (*grpc.FetchWillResponse, error) {
// FetchWill implements BrokerServiceHandler.FetchWill.
func (t *Transport) FetchWill(ctx context.Context, req *connect.Request[clusterv1.FetchWillRequest]) (*connect.Response[clusterv1.FetchWillResponse], error) {
if t.handler == nil {
return &grpc.FetchWillResponse{
return connect.NewResponse(&clusterv1.FetchWillResponse{
Found: false,
Error: "no handler configured",
}, nil
}), nil
}
// Get will message from local storage via handler
will, err := t.handler.GetWillMessage(ctx, req.ClientId)
will, err := t.handler.GetWillMessage(ctx, req.Msg.ClientId)
if err != nil {
return &grpc.FetchWillResponse{
return connect.NewResponse(&clusterv1.FetchWillResponse{
Found: false,
Error: err.Error(),
}, nil
}), nil
}
// Will message not found
if will == nil {
return &grpc.FetchWillResponse{
return connect.NewResponse(&clusterv1.FetchWillResponse{
Found: false,
}, nil
}), nil
}
// Convert storage.WillMessage to clusterv1.WillMessage
grpcWill := &grpc.WillMessage{
grpcWill := &clusterv1.WillMessage{
Topic: will.Topic,
Payload: will.Payload,
Qos: uint32(will.QoS),
@@ -428,37 +363,98 @@ func (t *Transport) FetchWill(ctx context.Context, req *grpc.FetchWillRequest) (
Delay: will.Delay,
}
return &grpc.FetchWillResponse{
return connect.NewResponse(&clusterv1.FetchWillResponse{
Found: true,
Message: grpcWill,
}, nil
}), nil
}
// SendFetchWill fetches a will message from a peer node.
func (t *Transport) SendFetchWill(ctx context.Context, nodeID, clientID string) (*grpc.WillMessage, error) {
client, err := t.GetPeerClient(nodeID)
// EnqueueRemote implements BrokerServiceHandler.EnqueueRemote.
func (t *Transport) EnqueueRemote(ctx context.Context, req *connect.Request[clusterv1.EnqueueRemoteRequest]) (*connect.Response[clusterv1.EnqueueRemoteResponse], error) {
t.mu.RLock()
handler := t.queueHandler
t.mu.RUnlock()
if handler == nil {
return connect.NewResponse(&clusterv1.EnqueueRemoteResponse{
Success: false,
Error: "no queue handler configured",
}), nil
}
messageID, err := handler.EnqueueLocal(ctx, req.Msg.QueueName, req.Msg.Payload, req.Msg.Properties)
if err != nil {
return nil, err
return connect.NewResponse(&clusterv1.EnqueueRemoteResponse{
Success: false,
Error: err.Error(),
}), nil
}
req := &grpc.FetchWillRequest{
ClientId: clientID,
return connect.NewResponse(&clusterv1.EnqueueRemoteResponse{
Success: true,
MessageId: messageID,
}), nil
}
// RouteQueueMessage implements BrokerServiceHandler.RouteQueueMessage.
func (t *Transport) RouteQueueMessage(ctx context.Context, req *connect.Request[clusterv1.RouteQueueMessageRequest]) (*connect.Response[clusterv1.RouteQueueMessageResponse], error) {
t.mu.RLock()
handler := t.queueHandler
t.mu.RUnlock()
if handler == nil {
return connect.NewResponse(&clusterv1.RouteQueueMessageResponse{
Success: false,
Error: "no queue handler configured",
}), nil
}
resp, err := client.FetchWill(ctx, req)
msg := map[string]interface{}{
"id": req.Msg.MessageId,
"queueName": req.Msg.QueueName,
"payload": req.Msg.Payload,
"properties": req.Msg.Properties,
"sequence": req.Msg.Sequence,
"partitionId": req.Msg.PartitionId,
}
err := handler.DeliverQueueMessage(ctx, req.Msg.ClientId, msg)
if err != nil {
return nil, fmt.Errorf("grpc call failed: %w", err)
return connect.NewResponse(&clusterv1.RouteQueueMessageResponse{
Success: false,
Error: err.Error(),
}), nil
}
if resp.Error != "" {
return nil, fmt.Errorf("fetch failed: %s", resp.Error)
}
return connect.NewResponse(&clusterv1.RouteQueueMessageResponse{
Success: true,
}), nil
}
if !resp.Found {
return nil, nil // Will message not found (not an error)
}
// AppendEntries is the BrokerServiceHandler endpoint for Raft log replication.
//
// Raft consensus is not implemented yet: the handler accepts nothing and
// answers with the caller's own term and Success set to false. The returned
// error is always nil.
func (t *Transport) AppendEntries(ctx context.Context, req *connect.Request[clusterv1.AppendEntriesRequest]) (*connect.Response[clusterv1.AppendEntriesResponse], error) {
	// TODO: Implement Raft consensus
	reply := &clusterv1.AppendEntriesResponse{
		Term:    req.Msg.Term, // echoed back unchanged
		Success: false,
	}
	return connect.NewResponse(reply), nil
}
return resp.Message, nil
// RequestVote is the BrokerServiceHandler endpoint for Raft leader election.
//
// Raft consensus is not implemented yet: the vote is never granted and the
// caller's own term is echoed back. The returned error is always nil.
func (t *Transport) RequestVote(ctx context.Context, req *connect.Request[clusterv1.RequestVoteRequest]) (*connect.Response[clusterv1.RequestVoteResponse], error) {
	// TODO: Implement Raft consensus
	reply := &clusterv1.RequestVoteResponse{
		Term:        req.Msg.Term, // echoed back unchanged
		VoteGranted: false,
	}
	return connect.NewResponse(reply), nil
}
// InstallSnapshot is the BrokerServiceHandler endpoint for Raft snapshot
// transfer.
//
// Raft consensus is not implemented yet: no snapshot data is read from the
// request; the reply only echoes the caller's term. The returned error is
// always nil.
func (t *Transport) InstallSnapshot(ctx context.Context, req *connect.Request[clusterv1.InstallSnapshotRequest]) (*connect.Response[clusterv1.InstallSnapshotResponse], error) {
	// TODO: Implement Raft consensus
	reply := &clusterv1.InstallSnapshotResponse{
		Term: req.Msg.Term, // echoed back unchanged
	}
	return connect.NewResponse(reply), nil
}
// SetQueueHandler sets the queue handler for queue distribution operations.
@@ -468,32 +464,112 @@ func (t *Transport) SetQueueHandler(handler QueueHandler) {
t.queueHandler = handler
}
// EnqueueRemote implements BrokerServiceServer.EnqueueRemote.
// This is called by peer brokers to enqueue a message on this node.
func (t *Transport) EnqueueRemote(ctx context.Context, req *grpc.EnqueueRemoteRequest) (*grpc.EnqueueRemoteResponse, error) {
t.mu.RLock()
handler := t.queueHandler
t.mu.RUnlock()
if handler == nil {
return &grpc.EnqueueRemoteResponse{
Success: false,
Error: "no queue handler configured",
}, nil
}
messageID, err := handler.EnqueueLocal(ctx, req.QueueName, req.Payload, req.Properties)
// SendPublish sends a PUBLISH message to a specific peer node.
func (t *Transport) SendPublish(ctx context.Context, nodeID, clientID, topic string, payload []byte, qos byte, retain, dup bool, properties map[string]string) error {
client, err := t.GetPeerClient(nodeID)
if err != nil {
return &grpc.EnqueueRemoteResponse{
Success: false,
Error: err.Error(),
}, nil
return err
}
return &grpc.EnqueueRemoteResponse{
Success: true,
MessageId: messageID,
}, nil
req := connect.NewRequest(&clusterv1.PublishRequest{
ClientId: clientID,
Topic: topic,
Payload: payload,
Qos: uint32(qos),
Retain: retain,
Dup: dup,
Properties: properties,
})
resp, err := client.RoutePublish(ctx, req)
if err != nil {
return fmt.Errorf("connect call failed: %w", err)
}
if !resp.Msg.Success {
return fmt.Errorf("publish failed: %s", resp.Msg.Error)
}
return nil
}
// SendTakeover issues a TakeoverSession call to the peer registered under
// nodeID and returns the session state carried in the peer's reply.
//
// It returns an error when GetPeerClient fails for nodeID, when the Connect
// call itself fails (wrapped with %w), or when the peer replies with
// Success == false (the peer's Error text is included in the message).
func (t *Transport) SendTakeover(ctx context.Context, nodeID, clientID, fromNode, toNode string) (*clusterv1.SessionState, error) {
	peer, err := t.GetPeerClient(nodeID)
	if err != nil {
		return nil, err
	}

	payload := &clusterv1.TakeoverRequest{
		ClientId: clientID,
		FromNode: fromNode,
		ToNode:   toNode,
	}

	reply, err := peer.TakeoverSession(ctx, connect.NewRequest(payload))
	switch {
	case err != nil:
		// Transport-level failure: the peer never produced a response.
		return nil, fmt.Errorf("connect call failed: %w", err)
	case !reply.Msg.Success:
		// Application-level failure reported by the peer.
		return nil, fmt.Errorf("takeover failed: %s", reply.Msg.Error)
	}

	return reply.Msg.SessionState, nil
}
// SendFetchRetained issues a FetchRetained call for topic to the peer
// registered under nodeID.
//
// Outcomes:
//   - (message, nil) when the peer reports Found.
//   - (nil, nil) when the peer reports neither an error nor a match.
//   - (nil, err) when GetPeerClient fails, the Connect call fails, or the
//     peer's reply carries a non-empty Error string.
func (t *Transport) SendFetchRetained(ctx context.Context, nodeID, topic string) (*clusterv1.RetainedMessage, error) {
	peer, err := t.GetPeerClient(nodeID)
	if err != nil {
		return nil, err
	}

	payload := &clusterv1.FetchRetainedRequest{Topic: topic}

	reply, err := peer.FetchRetained(ctx, connect.NewRequest(payload))
	if err != nil {
		return nil, fmt.Errorf("connect call failed: %w", err)
	}

	body := reply.Msg
	switch {
	case body.Error != "":
		// A reported error takes precedence over the Found flag.
		return nil, fmt.Errorf("fetch failed: %s", body.Error)
	case !body.Found:
		return nil, nil
	default:
		return body.Message, nil
	}
}
// SendFetchWill issues a FetchWill call for clientID to the peer registered
// under nodeID.
//
// Outcomes:
//   - (will, nil) when the peer reports Found.
//   - (nil, nil) when the peer reports neither an error nor a match.
//   - (nil, err) when GetPeerClient fails, the Connect call fails, or the
//     peer's reply carries a non-empty Error string.
func (t *Transport) SendFetchWill(ctx context.Context, nodeID, clientID string) (*clusterv1.WillMessage, error) {
	peer, err := t.GetPeerClient(nodeID)
	if err != nil {
		return nil, err
	}

	payload := &clusterv1.FetchWillRequest{ClientId: clientID}

	reply, err := peer.FetchWill(ctx, connect.NewRequest(payload))
	if err != nil {
		return nil, fmt.Errorf("connect call failed: %w", err)
	}

	body := reply.Msg
	switch {
	case body.Error != "":
		// A reported error takes precedence over the Found flag.
		return nil, fmt.Errorf("fetch failed: %s", body.Error)
	case !body.Found:
		return nil, nil
	default:
		return body.Message, nil
	}
}
// SendEnqueueRemote sends an enqueue request to a peer node.
@@ -503,59 +579,22 @@ func (t *Transport) SendEnqueueRemote(ctx context.Context, nodeID, queueName str
return "", err
}
req := &grpc.EnqueueRemoteRequest{
req := connect.NewRequest(&clusterv1.EnqueueRemoteRequest{
QueueName: queueName,
Payload: payload,
Properties: properties,
}
})
resp, err := client.EnqueueRemote(ctx, req)
if err != nil {
return "", fmt.Errorf("grpc call failed: %w", err)
return "", fmt.Errorf("connect call failed: %w", err)
}
if !resp.Success {
return "", fmt.Errorf("enqueue failed: %s", resp.Error)
if !resp.Msg.Success {
return "", fmt.Errorf("enqueue failed: %s", resp.Msg.Error)
}
return resp.MessageId, nil
}
// RouteQueueMessage implements BrokerServiceServer.RouteQueueMessage.
// This is called by peer brokers to deliver a queue message to a local consumer.
func (t *Transport) RouteQueueMessage(ctx context.Context, req *grpc.RouteQueueMessageRequest) (*grpc.RouteQueueMessageResponse, error) {
t.mu.RLock()
handler := t.queueHandler
t.mu.RUnlock()
if handler == nil {
return &grpc.RouteQueueMessageResponse{
Success: false,
Error: "no queue handler configured",
}, nil
}
// Create a simplified message structure for delivery
msg := map[string]interface{}{
"id": req.MessageId,
"queueName": req.QueueName,
"payload": req.Payload,
"properties": req.Properties,
"sequence": req.Sequence,
"partitionId": req.PartitionId,
}
err := handler.DeliverQueueMessage(ctx, req.ClientId, msg)
if err != nil {
return &grpc.RouteQueueMessageResponse{
Success: false,
Error: err.Error(),
}, nil
}
return &grpc.RouteQueueMessageResponse{
Success: true,
}, nil
return resp.Msg.MessageId, nil
}
// SendRouteQueueMessage sends a queue message delivery request to a peer node.
@@ -565,7 +604,7 @@ func (t *Transport) SendRouteQueueMessage(ctx context.Context, nodeID, clientID,
return err
}
req := &grpc.RouteQueueMessageRequest{
req := connect.NewRequest(&clusterv1.RouteQueueMessageRequest{
ClientId: clientID,
QueueName: queueName,
MessageId: messageID,
@@ -573,15 +612,15 @@ func (t *Transport) SendRouteQueueMessage(ctx context.Context, nodeID, clientID,
Properties: properties,
Sequence: sequence,
PartitionId: int32(partitionID),
}
})
resp, err := client.RouteQueueMessage(ctx, req)
if err != nil {
return fmt.Errorf("grpc call failed: %w", err)
return fmt.Errorf("connect call failed: %w", err)
}
if !resp.Success {
return fmt.Errorf("route queue message failed: %s", resp.Error)
if !resp.Msg.Success {
return fmt.Errorf("route queue message failed: %s", resp.Msg.Error)
}
return nil
+33
View File
@@ -23,6 +23,7 @@ import (
"github.com/absmach/fluxmq/queue"
logStorage "github.com/absmach/fluxmq/logstorage"
"github.com/absmach/fluxmq/ratelimit"
"github.com/absmach/fluxmq/server/api"
"github.com/absmach/fluxmq/server/coap"
"github.com/absmach/fluxmq/server/health"
"github.com/absmach/fluxmq/server/http"
@@ -447,6 +448,38 @@ func main() {
}()
}
// Start Queue API server (gRPC/HTTP via Connect protocol)
if cfg.Server.APIEnabled {
apiCfg := api.Config{
Address: cfg.Server.APIAddr,
ShutdownTimeout: cfg.Server.ShutdownTimeout,
}
// Get the queue manager from the broker and log store
queueManager := b.GetQueueManager()
if qm, ok := queueManager.(*queue.Manager); ok {
queueDir := cfg.Storage.BadgerDir + "_queue"
logStore, err := logStorage.NewAdapter(queueDir, logStorage.DefaultAdapterConfig())
if err == nil {
defer logStore.Close()
apiServer := api.New(apiCfg, qm, logStore, logStore, logger)
wg.Add(1)
go func() {
defer wg.Done()
slog.Info("Starting Queue API server", "address", cfg.Server.APIAddr)
if err := apiServer.Listen(ctx); err != nil {
serverErr <- err
}
}()
} else {
slog.Warn("Failed to create API server log store", "error", err)
}
} else {
slog.Warn("Queue manager not available or wrong type, API server disabled")
}
}
slog.Info("MQTT broker started successfully")
sigChan := make(chan os.Signal, 1)
+4
View File
@@ -89,6 +89,10 @@ type ServerConfig struct {
OtelTracesEnabled bool `yaml:"otel_traces_enabled"`
OtelMetricsEnabled bool `yaml:"otel_metrics_enabled"`
OtelTraceSampleRate float64 `yaml:"otel_trace_sample_rate"` // 0.0 to 1.0
// Queue API server (Connect/gRPC)
APIEnabled bool `yaml:"api_enabled"`
APIAddr string `yaml:"api_addr"`
}
// BrokerConfig holds broker-specific settings.
+3 -2
View File
@@ -3,6 +3,7 @@ module github.com/absmach/fluxmq
go 1.24.3
require (
connectrpc.com/connect v1.19.1
github.com/dgraph-io/badger/v4 v4.9.0
github.com/eclipse/paho.mqtt.golang v1.5.1
github.com/google/uuid v1.6.0
@@ -22,8 +23,8 @@ require (
go.opentelemetry.io/otel/sdk v1.39.0
go.opentelemetry.io/otel/sdk/metric v1.39.0
go.opentelemetry.io/otel/trace v1.39.0
golang.org/x/net v0.47.0
golang.org/x/time v0.0.0-20210220033141-f8bda1e9f3ba
google.golang.org/grpc v1.77.0
google.golang.org/protobuf v1.36.11
gopkg.in/yaml.v3 v3.0.1
)
@@ -86,13 +87,13 @@ require (
go.uber.org/zap v1.17.0 // indirect
golang.org/x/crypto v0.44.0 // indirect
golang.org/x/exp v0.0.0-20240904232852-e7e105dedf7e // indirect
golang.org/x/net v0.47.0 // indirect
golang.org/x/sync v0.18.0 // indirect
golang.org/x/sys v0.39.0 // indirect
golang.org/x/text v0.31.0 // indirect
google.golang.org/genproto v0.0.0-20230822172742-b8732ec3820d // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20251202230838-ff82c1b0f217 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20251202230838-ff82c1b0f217 // indirect
google.golang.org/grpc v1.77.0 // indirect
gopkg.in/natefinch/lumberjack.v2 v2.0.0 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
sigs.k8s.io/yaml v1.2.0 // indirect
+2
View File
@@ -4,6 +4,8 @@ cloud.google.com/go v0.110.7 h1:rJyC7nWRg2jWGZ4wSJ5nY65GTdYJkg0cd/uXb+ACI6o=
cloud.google.com/go/compute v1.23.0 h1:tP41Zoavr8ptEqaW6j+LQOnyBBhO7OkOMAGrgLopTwY=
cloud.google.com/go/compute/metadata v0.9.0 h1:pDUj4QMoPejqq20dK0Pg2N4yG9zIkYGdBtwLoEkH9Zs=
cloud.google.com/go/compute/metadata v0.9.0/go.mod h1:E0bWwX5wTnLPedCKqk3pJmVgCBSM6qQI1yTBdEb3C10=
connectrpc.com/connect v1.19.1 h1:R5M57z05+90EfEvCY1b7hBxDVOUl45PrtXtAV2fOC14=
connectrpc.com/connect v1.19.1/go.mod h1:tN20fjdGlewnSFeZxLKb0xwIZ6ozc3OQs2hTXy4du9w=
github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/DataDog/datadog-go v3.2.0+incompatible/go.mod h1:LButxg5PwREeZtORoXG3tL4fMGNddJ+vMq1mwgfaqoQ=
File diff suppressed because it is too large Load Diff
@@ -0,0 +1,361 @@
// Copyright (c) Abstract Machines
// SPDX-License-Identifier: Apache-2.0
// Code generated by protoc-gen-connect-go. DO NOT EDIT.
//
// Source: cluster/v1/broker.proto
package clusterv1connect
import (
connect "connectrpc.com/connect"
context "context"
errors "errors"
v1 "github.com/absmach/fluxmq/pkg/proto/cluster/v1"
http "net/http"
strings "strings"
)
// This is a compile-time assertion to ensure that this generated file and the connect package are
// compatible. If you get a compiler error that this constant is not defined, this code was
// generated with a version of connect newer than the one compiled into your binary. You can fix the
// problem by either regenerating this code with an older version of connect or updating the connect
// version compiled into your binary.
const _ = connect.IsAtLeastVersion1_13_0
const (
// BrokerServiceName is the fully-qualified name of the BrokerService service.
BrokerServiceName = "fluxmq.cluster.v1.BrokerService"
)
// These constants are the fully-qualified names of the RPCs defined in this package. They're
// exposed at runtime as Spec.Procedure and as the final two segments of the HTTP route.
//
// Note that these are different from the fully-qualified method names used by
// google.golang.org/protobuf/reflect/protoreflect. To convert from these constants to
// reflection-formatted method names, remove the leading slash and convert the remaining slash to a
// period.
const (
// BrokerServiceRoutePublishProcedure is the fully-qualified name of the BrokerService's
// RoutePublish RPC.
BrokerServiceRoutePublishProcedure = "/fluxmq.cluster.v1.BrokerService/RoutePublish"
// BrokerServiceTakeoverSessionProcedure is the fully-qualified name of the BrokerService's
// TakeoverSession RPC.
BrokerServiceTakeoverSessionProcedure = "/fluxmq.cluster.v1.BrokerService/TakeoverSession"
// BrokerServiceFetchRetainedProcedure is the fully-qualified name of the BrokerService's
// FetchRetained RPC.
BrokerServiceFetchRetainedProcedure = "/fluxmq.cluster.v1.BrokerService/FetchRetained"
// BrokerServiceFetchWillProcedure is the fully-qualified name of the BrokerService's FetchWill RPC.
BrokerServiceFetchWillProcedure = "/fluxmq.cluster.v1.BrokerService/FetchWill"
// BrokerServiceEnqueueRemoteProcedure is the fully-qualified name of the BrokerService's
// EnqueueRemote RPC.
BrokerServiceEnqueueRemoteProcedure = "/fluxmq.cluster.v1.BrokerService/EnqueueRemote"
// BrokerServiceRouteQueueMessageProcedure is the fully-qualified name of the BrokerService's
// RouteQueueMessage RPC.
BrokerServiceRouteQueueMessageProcedure = "/fluxmq.cluster.v1.BrokerService/RouteQueueMessage"
// BrokerServiceAppendEntriesProcedure is the fully-qualified name of the BrokerService's
// AppendEntries RPC.
BrokerServiceAppendEntriesProcedure = "/fluxmq.cluster.v1.BrokerService/AppendEntries"
// BrokerServiceRequestVoteProcedure is the fully-qualified name of the BrokerService's RequestVote
// RPC.
BrokerServiceRequestVoteProcedure = "/fluxmq.cluster.v1.BrokerService/RequestVote"
// BrokerServiceInstallSnapshotProcedure is the fully-qualified name of the BrokerService's
// InstallSnapshot RPC.
BrokerServiceInstallSnapshotProcedure = "/fluxmq.cluster.v1.BrokerService/InstallSnapshot"
)
// BrokerServiceClient is a client for the fluxmq.cluster.v1.BrokerService service.
type BrokerServiceClient interface {
// RoutePublish forwards a PUBLISH packet to the broker that owns the target session.
RoutePublish(context.Context, *connect.Request[v1.PublishRequest]) (*connect.Response[v1.PublishResponse], error)
// TakeoverSession migrates a session from one broker to another.
TakeoverSession(context.Context, *connect.Request[v1.TakeoverRequest]) (*connect.Response[v1.TakeoverResponse], error)
// FetchRetained fetches a retained message payload from the owning broker node.
FetchRetained(context.Context, *connect.Request[v1.FetchRetainedRequest]) (*connect.Response[v1.FetchRetainedResponse], error)
// FetchWill fetches a will message payload from the owning broker node.
FetchWill(context.Context, *connect.Request[v1.FetchWillRequest]) (*connect.Response[v1.FetchWillResponse], error)
// EnqueueRemote routes an enqueue operation to the partition owner node.
EnqueueRemote(context.Context, *connect.Request[v1.EnqueueRemoteRequest]) (*connect.Response[v1.EnqueueRemoteResponse], error)
// RouteQueueMessage delivers a queue message to a consumer on a different node.
RouteQueueMessage(context.Context, *connect.Request[v1.RouteQueueMessageRequest]) (*connect.Response[v1.RouteQueueMessageResponse], error)
// AppendEntries is invoked by the Raft leader to replicate log entries.
AppendEntries(context.Context, *connect.Request[v1.AppendEntriesRequest]) (*connect.Response[v1.AppendEntriesResponse], error)
// RequestVote is invoked by candidates during leader election.
RequestVote(context.Context, *connect.Request[v1.RequestVoteRequest]) (*connect.Response[v1.RequestVoteResponse], error)
// InstallSnapshot is invoked by leader to transfer snapshot to a follower.
InstallSnapshot(context.Context, *connect.Request[v1.InstallSnapshotRequest]) (*connect.Response[v1.InstallSnapshotResponse], error)
}
// NewBrokerServiceClient builds a BrokerServiceClient for the
// fluxmq.cluster.v1.BrokerService service.
//
// Unless overridden through opts, the client speaks the Connect protocol with
// the binary Protobuf codec, asks for gzipped responses and sends uncompressed
// requests. Pass connect.WithGRPC() or connect.WithGRPCWeb() to switch to the
// gRPC or gRPC-Web protocols.
//
// baseURL is the base URL of the Connect or gRPC server (for example,
// http://api.acme.com or https://acme.com/grpc). Trailing slashes are removed
// before the procedure path is appended.
func NewBrokerServiceClient(httpClient connect.HTTPClient, baseURL string, opts ...connect.ClientOption) BrokerServiceClient {
	root := strings.TrimRight(baseURL, "/")
	methods := v1.File_cluster_v1_broker_proto.Services().ByName("BrokerService").Methods()
	// The caller's options are identical for every RPC, so bundle them once.
	shared := connect.WithClientOptions(opts...)

	client := new(brokerServiceClient)
	client.routePublish = connect.NewClient[v1.PublishRequest, v1.PublishResponse](
		httpClient,
		root+BrokerServiceRoutePublishProcedure,
		connect.WithSchema(methods.ByName("RoutePublish")),
		shared,
	)
	client.takeoverSession = connect.NewClient[v1.TakeoverRequest, v1.TakeoverResponse](
		httpClient,
		root+BrokerServiceTakeoverSessionProcedure,
		connect.WithSchema(methods.ByName("TakeoverSession")),
		shared,
	)
	client.fetchRetained = connect.NewClient[v1.FetchRetainedRequest, v1.FetchRetainedResponse](
		httpClient,
		root+BrokerServiceFetchRetainedProcedure,
		connect.WithSchema(methods.ByName("FetchRetained")),
		shared,
	)
	client.fetchWill = connect.NewClient[v1.FetchWillRequest, v1.FetchWillResponse](
		httpClient,
		root+BrokerServiceFetchWillProcedure,
		connect.WithSchema(methods.ByName("FetchWill")),
		shared,
	)
	client.enqueueRemote = connect.NewClient[v1.EnqueueRemoteRequest, v1.EnqueueRemoteResponse](
		httpClient,
		root+BrokerServiceEnqueueRemoteProcedure,
		connect.WithSchema(methods.ByName("EnqueueRemote")),
		shared,
	)
	client.routeQueueMessage = connect.NewClient[v1.RouteQueueMessageRequest, v1.RouteQueueMessageResponse](
		httpClient,
		root+BrokerServiceRouteQueueMessageProcedure,
		connect.WithSchema(methods.ByName("RouteQueueMessage")),
		shared,
	)
	client.appendEntries = connect.NewClient[v1.AppendEntriesRequest, v1.AppendEntriesResponse](
		httpClient,
		root+BrokerServiceAppendEntriesProcedure,
		connect.WithSchema(methods.ByName("AppendEntries")),
		shared,
	)
	client.requestVote = connect.NewClient[v1.RequestVoteRequest, v1.RequestVoteResponse](
		httpClient,
		root+BrokerServiceRequestVoteProcedure,
		connect.WithSchema(methods.ByName("RequestVote")),
		shared,
	)
	client.installSnapshot = connect.NewClient[v1.InstallSnapshotRequest, v1.InstallSnapshotResponse](
		httpClient,
		root+BrokerServiceInstallSnapshotProcedure,
		connect.WithSchema(methods.ByName("InstallSnapshot")),
		shared,
	)
	return client
}
// brokerServiceClient implements BrokerServiceClient.
//
// It holds one typed connect client per RPC of the service; every field is
// populated by NewBrokerServiceClient. Each method below simply forwards its
// context and request to the corresponding field as a unary call.
type brokerServiceClient struct {
routePublish *connect.Client[v1.PublishRequest, v1.PublishResponse] // backs RoutePublish
takeoverSession *connect.Client[v1.TakeoverRequest, v1.TakeoverResponse] // backs TakeoverSession
fetchRetained *connect.Client[v1.FetchRetainedRequest, v1.FetchRetainedResponse] // backs FetchRetained
fetchWill *connect.Client[v1.FetchWillRequest, v1.FetchWillResponse] // backs FetchWill
enqueueRemote *connect.Client[v1.EnqueueRemoteRequest, v1.EnqueueRemoteResponse] // backs EnqueueRemote
routeQueueMessage *connect.Client[v1.RouteQueueMessageRequest, v1.RouteQueueMessageResponse] // backs RouteQueueMessage
appendEntries *connect.Client[v1.AppendEntriesRequest, v1.AppendEntriesResponse] // backs AppendEntries
requestVote *connect.Client[v1.RequestVoteRequest, v1.RequestVoteResponse] // backs RequestVote
installSnapshot *connect.Client[v1.InstallSnapshotRequest, v1.InstallSnapshotResponse] // backs InstallSnapshot
}
// RoutePublish calls fluxmq.cluster.v1.BrokerService.RoutePublish.
// The response and error of the underlying unary call are returned unchanged.
func (c *brokerServiceClient) RoutePublish(ctx context.Context, req *connect.Request[v1.PublishRequest]) (*connect.Response[v1.PublishResponse], error) {
return c.routePublish.CallUnary(ctx, req)
}
// TakeoverSession calls fluxmq.cluster.v1.BrokerService.TakeoverSession.
// The response and error of the underlying unary call are returned unchanged.
func (c *brokerServiceClient) TakeoverSession(ctx context.Context, req *connect.Request[v1.TakeoverRequest]) (*connect.Response[v1.TakeoverResponse], error) {
return c.takeoverSession.CallUnary(ctx, req)
}
// FetchRetained calls fluxmq.cluster.v1.BrokerService.FetchRetained.
// The response and error of the underlying unary call are returned unchanged.
func (c *brokerServiceClient) FetchRetained(ctx context.Context, req *connect.Request[v1.FetchRetainedRequest]) (*connect.Response[v1.FetchRetainedResponse], error) {
return c.fetchRetained.CallUnary(ctx, req)
}
// FetchWill calls fluxmq.cluster.v1.BrokerService.FetchWill.
// The response and error of the underlying unary call are returned unchanged.
func (c *brokerServiceClient) FetchWill(ctx context.Context, req *connect.Request[v1.FetchWillRequest]) (*connect.Response[v1.FetchWillResponse], error) {
return c.fetchWill.CallUnary(ctx, req)
}
// EnqueueRemote calls fluxmq.cluster.v1.BrokerService.EnqueueRemote.
// The response and error of the underlying unary call are returned unchanged.
func (c *brokerServiceClient) EnqueueRemote(ctx context.Context, req *connect.Request[v1.EnqueueRemoteRequest]) (*connect.Response[v1.EnqueueRemoteResponse], error) {
return c.enqueueRemote.CallUnary(ctx, req)
}
// RouteQueueMessage calls fluxmq.cluster.v1.BrokerService.RouteQueueMessage.
// The response and error of the underlying unary call are returned unchanged.
func (c *brokerServiceClient) RouteQueueMessage(ctx context.Context, req *connect.Request[v1.RouteQueueMessageRequest]) (*connect.Response[v1.RouteQueueMessageResponse], error) {
return c.routeQueueMessage.CallUnary(ctx, req)
}
// AppendEntries calls fluxmq.cluster.v1.BrokerService.AppendEntries.
// The response and error of the underlying unary call are returned unchanged.
func (c *brokerServiceClient) AppendEntries(ctx context.Context, req *connect.Request[v1.AppendEntriesRequest]) (*connect.Response[v1.AppendEntriesResponse], error) {
return c.appendEntries.CallUnary(ctx, req)
}
// RequestVote calls fluxmq.cluster.v1.BrokerService.RequestVote.
// The response and error of the underlying unary call are returned unchanged.
func (c *brokerServiceClient) RequestVote(ctx context.Context, req *connect.Request[v1.RequestVoteRequest]) (*connect.Response[v1.RequestVoteResponse], error) {
return c.requestVote.CallUnary(ctx, req)
}
// InstallSnapshot calls fluxmq.cluster.v1.BrokerService.InstallSnapshot.
// The response and error of the underlying unary call are returned unchanged.
func (c *brokerServiceClient) InstallSnapshot(ctx context.Context, req *connect.Request[v1.InstallSnapshotRequest]) (*connect.Response[v1.InstallSnapshotResponse], error) {
return c.installSnapshot.CallUnary(ctx, req)
}
// BrokerServiceHandler is the server-side contract of the
// fluxmq.cluster.v1.BrokerService service. An implementation is passed to
// NewBrokerServiceHandler, which exposes each method under its procedure path.
type BrokerServiceHandler interface {
	// RoutePublish forwards a PUBLISH packet to the broker that owns the target session.
	RoutePublish(ctx context.Context, req *connect.Request[v1.PublishRequest]) (*connect.Response[v1.PublishResponse], error)
	// TakeoverSession migrates a session from one broker to another.
	TakeoverSession(ctx context.Context, req *connect.Request[v1.TakeoverRequest]) (*connect.Response[v1.TakeoverResponse], error)
	// FetchRetained fetches a retained message payload from the owning broker node.
	FetchRetained(ctx context.Context, req *connect.Request[v1.FetchRetainedRequest]) (*connect.Response[v1.FetchRetainedResponse], error)
	// FetchWill fetches a will message payload from the owning broker node.
	FetchWill(ctx context.Context, req *connect.Request[v1.FetchWillRequest]) (*connect.Response[v1.FetchWillResponse], error)
	// EnqueueRemote routes an enqueue operation to the partition owner node.
	EnqueueRemote(ctx context.Context, req *connect.Request[v1.EnqueueRemoteRequest]) (*connect.Response[v1.EnqueueRemoteResponse], error)
	// RouteQueueMessage delivers a queue message to a consumer on a different node.
	RouteQueueMessage(ctx context.Context, req *connect.Request[v1.RouteQueueMessageRequest]) (*connect.Response[v1.RouteQueueMessageResponse], error)
	// AppendEntries is invoked by the Raft leader to replicate log entries.
	AppendEntries(ctx context.Context, req *connect.Request[v1.AppendEntriesRequest]) (*connect.Response[v1.AppendEntriesResponse], error)
	// RequestVote is invoked by candidates during leader election.
	RequestVote(ctx context.Context, req *connect.Request[v1.RequestVoteRequest]) (*connect.Response[v1.RequestVoteResponse], error)
	// InstallSnapshot is invoked by leader to transfer snapshot to a follower.
	InstallSnapshot(ctx context.Context, req *connect.Request[v1.InstallSnapshotRequest]) (*connect.Response[v1.InstallSnapshotResponse], error)
}
// NewBrokerServiceHandler wraps a BrokerServiceHandler implementation in an
// HTTP handler. It returns the path on which the handler should be mounted
// together with the handler itself.
//
// By default, handlers support the Connect, gRPC, and gRPC-Web protocols with
// the binary Protobuf and JSON codecs. They also support gzip compression.
//
// Requests are dispatched on the exact URL path; a path that matches none of
// the service's procedures is answered with http.NotFound.
func NewBrokerServiceHandler(svc BrokerServiceHandler, opts ...connect.HandlerOption) (string, http.Handler) {
	methods := v1.File_cluster_v1_broker_proto.Services().ByName("BrokerService").Methods()
	// The caller's options are identical for every RPC, so bundle them once.
	shared := connect.WithHandlerOptions(opts...)

	// One unary handler per procedure, keyed by the procedure's URL path.
	routes := map[string]*connect.Handler{
		BrokerServiceRoutePublishProcedure: connect.NewUnaryHandler(
			BrokerServiceRoutePublishProcedure,
			svc.RoutePublish,
			connect.WithSchema(methods.ByName("RoutePublish")),
			shared,
		),
		BrokerServiceTakeoverSessionProcedure: connect.NewUnaryHandler(
			BrokerServiceTakeoverSessionProcedure,
			svc.TakeoverSession,
			connect.WithSchema(methods.ByName("TakeoverSession")),
			shared,
		),
		BrokerServiceFetchRetainedProcedure: connect.NewUnaryHandler(
			BrokerServiceFetchRetainedProcedure,
			svc.FetchRetained,
			connect.WithSchema(methods.ByName("FetchRetained")),
			shared,
		),
		BrokerServiceFetchWillProcedure: connect.NewUnaryHandler(
			BrokerServiceFetchWillProcedure,
			svc.FetchWill,
			connect.WithSchema(methods.ByName("FetchWill")),
			shared,
		),
		BrokerServiceEnqueueRemoteProcedure: connect.NewUnaryHandler(
			BrokerServiceEnqueueRemoteProcedure,
			svc.EnqueueRemote,
			connect.WithSchema(methods.ByName("EnqueueRemote")),
			shared,
		),
		BrokerServiceRouteQueueMessageProcedure: connect.NewUnaryHandler(
			BrokerServiceRouteQueueMessageProcedure,
			svc.RouteQueueMessage,
			connect.WithSchema(methods.ByName("RouteQueueMessage")),
			shared,
		),
		BrokerServiceAppendEntriesProcedure: connect.NewUnaryHandler(
			BrokerServiceAppendEntriesProcedure,
			svc.AppendEntries,
			connect.WithSchema(methods.ByName("AppendEntries")),
			shared,
		),
		BrokerServiceRequestVoteProcedure: connect.NewUnaryHandler(
			BrokerServiceRequestVoteProcedure,
			svc.RequestVote,
			connect.WithSchema(methods.ByName("RequestVote")),
			shared,
		),
		BrokerServiceInstallSnapshotProcedure: connect.NewUnaryHandler(
			BrokerServiceInstallSnapshotProcedure,
			svc.InstallSnapshot,
			connect.WithSchema(methods.ByName("InstallSnapshot")),
			shared,
		),
	}

	dispatch := func(w http.ResponseWriter, r *http.Request) {
		target, known := routes[r.URL.Path]
		if !known {
			http.NotFound(w, r)
			return
		}
		target.ServeHTTP(w, r)
	}
	return "/fluxmq.cluster.v1.BrokerService/", http.HandlerFunc(dispatch)
}
// UnimplementedBrokerServiceHandler is a BrokerServiceHandler whose methods
// all fail with connect.CodeUnimplemented and a nil response.
type UnimplementedBrokerServiceHandler struct{}

// RoutePublish always reports that the RPC is not implemented.
func (UnimplementedBrokerServiceHandler) RoutePublish(context.Context, *connect.Request[v1.PublishRequest]) (*connect.Response[v1.PublishResponse], error) {
	cause := errors.New("fluxmq.cluster.v1.BrokerService.RoutePublish is not implemented")
	return nil, connect.NewError(connect.CodeUnimplemented, cause)
}

// TakeoverSession always reports that the RPC is not implemented.
func (UnimplementedBrokerServiceHandler) TakeoverSession(context.Context, *connect.Request[v1.TakeoverRequest]) (*connect.Response[v1.TakeoverResponse], error) {
	cause := errors.New("fluxmq.cluster.v1.BrokerService.TakeoverSession is not implemented")
	return nil, connect.NewError(connect.CodeUnimplemented, cause)
}

// FetchRetained always reports that the RPC is not implemented.
func (UnimplementedBrokerServiceHandler) FetchRetained(context.Context, *connect.Request[v1.FetchRetainedRequest]) (*connect.Response[v1.FetchRetainedResponse], error) {
	cause := errors.New("fluxmq.cluster.v1.BrokerService.FetchRetained is not implemented")
	return nil, connect.NewError(connect.CodeUnimplemented, cause)
}

// FetchWill always reports that the RPC is not implemented.
func (UnimplementedBrokerServiceHandler) FetchWill(context.Context, *connect.Request[v1.FetchWillRequest]) (*connect.Response[v1.FetchWillResponse], error) {
	cause := errors.New("fluxmq.cluster.v1.BrokerService.FetchWill is not implemented")
	return nil, connect.NewError(connect.CodeUnimplemented, cause)
}

// EnqueueRemote always reports that the RPC is not implemented.
func (UnimplementedBrokerServiceHandler) EnqueueRemote(context.Context, *connect.Request[v1.EnqueueRemoteRequest]) (*connect.Response[v1.EnqueueRemoteResponse], error) {
	cause := errors.New("fluxmq.cluster.v1.BrokerService.EnqueueRemote is not implemented")
	return nil, connect.NewError(connect.CodeUnimplemented, cause)
}

// RouteQueueMessage always reports that the RPC is not implemented.
func (UnimplementedBrokerServiceHandler) RouteQueueMessage(context.Context, *connect.Request[v1.RouteQueueMessageRequest]) (*connect.Response[v1.RouteQueueMessageResponse], error) {
	cause := errors.New("fluxmq.cluster.v1.BrokerService.RouteQueueMessage is not implemented")
	return nil, connect.NewError(connect.CodeUnimplemented, cause)
}

// AppendEntries always reports that the RPC is not implemented.
func (UnimplementedBrokerServiceHandler) AppendEntries(context.Context, *connect.Request[v1.AppendEntriesRequest]) (*connect.Response[v1.AppendEntriesResponse], error) {
	cause := errors.New("fluxmq.cluster.v1.BrokerService.AppendEntries is not implemented")
	return nil, connect.NewError(connect.CodeUnimplemented, cause)
}

// RequestVote always reports that the RPC is not implemented.
func (UnimplementedBrokerServiceHandler) RequestVote(context.Context, *connect.Request[v1.RequestVoteRequest]) (*connect.Response[v1.RequestVoteResponse], error) {
	cause := errors.New("fluxmq.cluster.v1.BrokerService.RequestVote is not implemented")
	return nil, connect.NewError(connect.CodeUnimplemented, cause)
}

// InstallSnapshot always reports that the RPC is not implemented.
func (UnimplementedBrokerServiceHandler) InstallSnapshot(context.Context, *connect.Request[v1.InstallSnapshotRequest]) (*connect.Response[v1.InstallSnapshotResponse], error) {
	cause := errors.New("fluxmq.cluster.v1.BrokerService.InstallSnapshot is not implemented")
	return nil, connect.NewError(connect.CodeUnimplemented, cause)
}
File diff suppressed because it is too large Load Diff
File diff suppressed because it is too large Load Diff
-247
View File
@@ -1,247 +0,0 @@
syntax = "proto3";
package cluster;
option go_package = "github.com/absmach/fluxmq/cluster/grpc";
// BrokerService defines RPC methods for inter-broker communication in the cluster.
// Every RPC below is unary: no request or response is declared with the stream keyword.
service BrokerService {
// RoutePublish forwards a PUBLISH packet to the broker that owns the target session.
rpc RoutePublish(PublishRequest) returns (PublishResponse);
// TakeoverSession migrates a session from one broker to another.
rpc TakeoverSession(TakeoverRequest) returns (TakeoverResponse);
// FetchRetained fetches a retained message payload from the owning broker node.
// Used when a large retained message (above threshold) needs to be delivered to a subscriber on a different node.
rpc FetchRetained(FetchRetainedRequest) returns (FetchRetainedResponse);
// FetchWill fetches a will message payload from the owning broker node.
// Used when a large will message (above threshold) needs to be published after client disconnect.
rpc FetchWill(FetchWillRequest) returns (FetchWillResponse);
// EnqueueRemote routes an enqueue operation to the partition owner node.
// Used in distributed queue mode when a message needs to be enqueued on a remote partition.
rpc EnqueueRemote(EnqueueRemoteRequest) returns (EnqueueRemoteResponse);
// RouteQueueMessage delivers a queue message to a consumer on a different node.
// Used when partition owner needs to deliver to a consumer connected to another node.
rpc RouteQueueMessage(RouteQueueMessageRequest) returns (RouteQueueMessageResponse);
// Raft RPCs for queue partition replication
// AppendEntries is invoked by the Raft leader to replicate log entries.
// Also used as a heartbeat to maintain leadership.
rpc AppendEntries(AppendEntriesRequest) returns (AppendEntriesResponse);
// RequestVote is invoked by candidates during leader election.
rpc RequestVote(RequestVoteRequest) returns (RequestVoteResponse);
// InstallSnapshot is invoked by leader to transfer snapshot to a follower.
rpc InstallSnapshot(InstallSnapshotRequest) returns (InstallSnapshotResponse);
}
// PublishRequest contains a serialized PUBLISH packet to route to a client.
// It is the request type of BrokerService.RoutePublish.
message PublishRequest {
string client_id = 1; // Target client ID
string topic = 2; // MQTT topic
bytes payload = 3; // Message payload
uint32 qos = 4; // QoS level (0, 1, 2)
bool retain = 5; // Retain flag
bool dup = 6; // Duplicate flag
map<string, string> properties = 7; // MQTT v5 properties (if any)
}
// PublishResponse indicates whether the publish was delivered.
message PublishResponse {
bool success = 1; // True if the publish was delivered
string error = 2; // Error message if success=false
}
// TakeoverRequest requests session migration from one node to another.
message TakeoverRequest {
string client_id = 1; // Client ID of the session to migrate
string from_node = 2; // Node currently owning the session
string to_node = 3; // Node taking over the session
}
// SessionState contains the full state of an MQTT session.
// It is carried in TakeoverResponse.session_state.
message SessionState {
uint32 expiry_interval = 1; // NOTE(review): presumably the MQTT session expiry interval; unit not stated here
bool clean_start = 2; // NOTE(review): presumably the MQTT clean-start flag of the session; confirm
// In-flight messages (QoS 1/2 not yet acknowledged)
repeated InflightMessage inflight_messages = 3;
// Offline queue (messages waiting for delivery)
repeated QueuedMessage queued_messages = 4;
// Subscriptions
repeated Subscription subscriptions = 5;
// Will message
WillMessage will = 6;
}
// InflightMessage represents a message awaiting acknowledgment.
message InflightMessage {
uint32 packet_id = 1; // Packet ID of the unacknowledged message
string topic = 2; // Topic of the message
bytes payload = 3; // Message payload
uint32 qos = 4; // QoS level
bool retain = 5; // Retain flag
int64 timestamp = 6; // Timestamp of the message (unit and epoch not stated in this file)
}
// QueuedMessage represents a message in the offline queue.
message QueuedMessage {
string topic = 1; // Topic of the message
bytes payload = 2; // Message payload
uint32 qos = 3; // QoS level
bool retain = 4; // Retain flag
int64 timestamp = 5; // Timestamp of the message (unit and epoch not stated in this file)
}
// Subscription represents a topic subscription.
message Subscription {
string filter = 1; // Topic filter of the subscription
uint32 qos = 2; // QoS level of the subscription
}
// WillMessage represents a last will and testament.
// It is used both inside SessionState and as the payload of FetchWillResponse.
message WillMessage {
string topic = 1; // Topic of the will message
bytes payload = 2; // Will payload
uint32 qos = 3; // QoS level
bool retain = 4; // Retain flag
uint32 delay = 5; // NOTE(review): presumably the will delay interval; unit not stated here
int64 disconnect_time = 6; // NOTE(review): presumably when the client disconnected; representation not stated here
}
// TakeoverResponse indicates whether the takeover succeeded and returns session state.
message TakeoverResponse {
bool success = 1; // True if the takeover succeeded
string error = 2; // Error message (presumably set when success=false, as in the other responses; confirm)
// Session state transferred from the old node
SessionState session_state = 3;
}
// FetchRetainedRequest requests a retained message from the owning node.
message FetchRetainedRequest {
string topic = 1; // Topic of the retained message to fetch
}
// RetainedMessage represents a complete retained message with all properties.
message RetainedMessage {
string topic = 1; // Topic of the retained message
bytes payload = 2; // Message payload
uint32 qos = 3; // QoS level
bool retain = 4; // Retain flag
map<string, string> properties = 5; // MQTT v5 properties
int64 timestamp = 6; // When the message was retained
}
// FetchRetainedResponse returns the retained message if found.
// Three outcomes are expressible: found, not found, or failed (error set).
message FetchRetainedResponse {
bool found = 1; // True if message exists
RetainedMessage message = 2; // The retained message (only if found=true)
string error = 3; // Error message if request failed
}
// FetchWillRequest requests a will message from the owning node.
// The will is looked up by client ID rather than by topic.
message FetchWillRequest {
string client_id = 1; // Client ID whose will message to fetch
}
// FetchWillResponse returns the will message if found.
// It has the same found/message/error layout as FetchRetainedResponse.
message FetchWillResponse {
bool found = 1; // True if will message exists
WillMessage message = 2; // The will message (only if found=true)
string error = 3; // Error message if request failed
}
// EnqueueRemoteRequest requests enqueueing a message on a remote partition owner.
message EnqueueRemoteRequest {
string queue_name = 1; // Queue topic name
bytes payload = 2; // Message payload
map<string, string> properties = 3; // Message properties (including partition-key)
}
// EnqueueRemoteResponse indicates whether the enqueue succeeded.
message EnqueueRemoteResponse {
bool success = 1; // True if the enqueue succeeded
string error = 2; // Error message if success=false
string message_id = 3; // Generated message ID
}
// RouteQueueMessageRequest delivers a queue message to a consumer on another node.
message RouteQueueMessageRequest {
string client_id = 1; // Target consumer client ID
string queue_name = 2; // Queue name
string message_id = 3; // Message ID
bytes payload = 4; // Message payload
map<string, string> properties = 5; // Message properties
int64 sequence = 6; // Message sequence number
int32 partition_id = 7; // Partition ID
}
// RouteQueueMessageResponse indicates whether the delivery succeeded.
message RouteQueueMessageResponse {
bool success = 1; // True if the delivery succeeded
string error = 2; // Error message if success=false
}
// Raft RPC messages
// Every request below carries queue_name and partition_id for routing, and
// every response carries the responder's current term.
// AppendEntriesRequest is sent by the leader to replicate log entries.
message AppendEntriesRequest {
string queue_name = 1; // Queue name for routing
int32 partition_id = 2; // Partition ID for routing
uint64 term = 3; // Leader's term
string leader_id = 4; // Leader's node ID
uint64 prev_log_index = 5; // Index of log entry immediately preceding new ones
uint64 prev_log_term = 6; // Term of prev_log_index entry
repeated bytes entries = 7; // Log entries to store (serialized)
uint64 leader_commit = 8; // Leader's commit index
}
// AppendEntriesResponse is the response to AppendEntries.
message AppendEntriesResponse {
uint64 term = 1; // Current term, for leader to update itself
bool success = 2; // True if follower contained entry matching prev_log_index and prev_log_term
uint64 last_log_index = 3; // Follower's last log index (for leader to update next_index)
}
// RequestVoteRequest is sent by candidates during leader election.
message RequestVoteRequest {
string queue_name = 1; // Queue name for routing
int32 partition_id = 2; // Partition ID for routing
uint64 term = 3; // Candidate's term
string candidate_id = 4; // Candidate's node ID
uint64 last_log_index = 5; // Index of candidate's last log entry
uint64 last_log_term = 6; // Term of candidate's last log entry
}
// RequestVoteResponse is the response to RequestVote.
message RequestVoteResponse {
uint64 term = 1; // Current term, for candidate to update itself
bool vote_granted = 2; // True if candidate received vote
}
// InstallSnapshotRequest is sent by leader to transfer snapshot to follower.
// The data field holds one chunk; done marks the last chunk.
message InstallSnapshotRequest {
string queue_name = 1; // Queue name for routing
int32 partition_id = 2; // Partition ID for routing
uint64 term = 3; // Leader's term
string leader_id = 4; // Leader's node ID
uint64 last_included_index = 5; // Snapshot replaces all entries up through this index
uint64 last_included_term = 6; // Term of last_included_index
bytes data = 7; // Snapshot data (chunk)
bool done = 8; // True if this is the last chunk
}
// InstallSnapshotResponse is the response to InstallSnapshot.
message InstallSnapshotResponse {
uint64 term = 1; // Current term, for leader to update itself
}
+232
View File
@@ -0,0 +1,232 @@
// Copyright (c) Abstract Machines
// SPDX-License-Identifier: Apache-2.0
syntax = "proto3";
package fluxmq.cluster.v1;
// BrokerService defines RPC methods for inter-broker communication in the cluster.
// This is an internal API not exposed to clients.
// Every RPC below is unary: no request or response is declared with the stream keyword.
service BrokerService {
// RoutePublish forwards a PUBLISH packet to the broker that owns the target session.
rpc RoutePublish(PublishRequest) returns (PublishResponse);
// TakeoverSession migrates a session from one broker to another.
rpc TakeoverSession(TakeoverRequest) returns (TakeoverResponse);
// FetchRetained fetches a retained message payload from the owning broker node.
// Used when a large retained message (above threshold) needs to be delivered to a subscriber on a different node.
rpc FetchRetained(FetchRetainedRequest) returns (FetchRetainedResponse);
// FetchWill fetches a will message payload from the owning broker node.
// Used when a large will message (above threshold) needs to be published after client disconnect.
rpc FetchWill(FetchWillRequest) returns (FetchWillResponse);
// EnqueueRemote routes an enqueue operation to the partition owner node.
// Used in distributed queue mode when a message needs to be enqueued on a remote partition.
rpc EnqueueRemote(EnqueueRemoteRequest) returns (EnqueueRemoteResponse);
// RouteQueueMessage delivers a queue message to a consumer on a different node.
// Used when partition owner needs to deliver to a consumer connected to another node.
rpc RouteQueueMessage(RouteQueueMessageRequest) returns (RouteQueueMessageResponse);
// Raft RPCs for queue partition replication
// AppendEntries is invoked by the Raft leader to replicate log entries.
// Also used as a heartbeat to maintain leadership.
rpc AppendEntries(AppendEntriesRequest) returns (AppendEntriesResponse);
// RequestVote is invoked by candidates during leader election.
rpc RequestVote(RequestVoteRequest) returns (RequestVoteResponse);
// InstallSnapshot is invoked by leader to transfer snapshot to a follower.
rpc InstallSnapshot(InstallSnapshotRequest) returns (InstallSnapshotResponse);
}
// =============================================================================
// Publish Routing
// =============================================================================
// PublishRequest contains a serialized PUBLISH packet to route to a client.
message PublishRequest {
string client_id = 1; // Target client ID
string topic = 2; // MQTT topic
bytes payload = 3; // Message payload
uint32 qos = 4; // QoS level (0, 1, 2)
bool retain = 5; // Retain flag
bool dup = 6; // Duplicate flag
map<string, string> properties = 7; // MQTT v5 properties (if any)
}
// PublishResponse indicates whether the publish was delivered.
message PublishResponse {
bool success = 1; // True if the publish was delivered
string error = 2; // Error message if success=false
}
// =============================================================================
// Session Takeover
// =============================================================================
// TakeoverRequest requests session migration from one node to another.
message TakeoverRequest {
string client_id = 1; // Client ID of the session to migrate
string from_node = 2; // Node currently owning the session
string to_node = 3; // Node taking over the session
}
// TakeoverResponse indicates whether the takeover succeeded and returns session state.
message TakeoverResponse {
bool success = 1; // True if the takeover succeeded
string error = 2; // Error message (presumably set when success=false, as in the other responses; confirm)
SessionState session_state = 3; // Session state transferred from the old node
}
// SessionState contains the full state of an MQTT session.
message SessionState {
uint32 expiry_interval = 1; // NOTE(review): presumably the MQTT session expiry interval; unit not stated here
bool clean_start = 2; // NOTE(review): presumably the MQTT clean-start flag of the session; confirm
repeated InflightMessage inflight_messages = 3; // In-flight messages (QoS 1/2 not yet acknowledged)
repeated QueuedMessage queued_messages = 4; // Offline queue (messages waiting for delivery)
repeated Subscription subscriptions = 5; // Subscriptions
WillMessage will = 6; // Will message
}
// InflightMessage represents a message awaiting acknowledgment.
message InflightMessage {
uint32 packet_id = 1; // Packet ID of the unacknowledged message
string topic = 2; // Topic of the message
bytes payload = 3; // Message payload
uint32 qos = 4; // QoS level
bool retain = 5; // Retain flag
int64 timestamp = 6; // Timestamp of the message (unit and epoch not stated in this file)
}
// QueuedMessage represents a message in the offline queue.
message QueuedMessage {
string topic = 1; // Topic of the message
bytes payload = 2; // Message payload
uint32 qos = 3; // QoS level
bool retain = 4; // Retain flag
int64 timestamp = 5; // Timestamp of the message (unit and epoch not stated in this file)
}
// Subscription represents a topic subscription.
message Subscription {
string filter = 1; // Topic filter of the subscription
uint32 qos = 2; // QoS level of the subscription
}
// WillMessage represents a last will and testament.
// It is used both inside SessionState and as the payload of FetchWillResponse.
message WillMessage {
string topic = 1; // Topic of the will message
bytes payload = 2; // Will payload
uint32 qos = 3; // QoS level
bool retain = 4; // Retain flag
uint32 delay = 5; // NOTE(review): presumably the will delay interval; unit not stated here
int64 disconnect_time = 6; // NOTE(review): presumably when the client disconnected; representation not stated here
}
// =============================================================================
// Retained Messages
// =============================================================================
// FetchRetainedRequest requests a retained message from the owning node.
message FetchRetainedRequest {
string topic = 1; // Topic of the retained message to fetch
}
// FetchRetainedResponse returns the retained message if found.
message FetchRetainedResponse {
bool found = 1; // True if message exists
RetainedMessage message = 2; // The retained message (only if found=true)
string error = 3; // Error message if request failed
}
// RetainedMessage represents a complete retained message with all properties.
message RetainedMessage {
string topic = 1; // Topic of the retained message
bytes payload = 2; // Message payload
uint32 qos = 3; // QoS level
bool retain = 4; // Retain flag
map<string, string> properties = 5; // MQTT v5 properties
int64 timestamp = 6; // When the message was retained
}
// =============================================================================
// Will Messages
// =============================================================================
// FetchWillRequest requests a will message from the owning node.
message FetchWillRequest {
string client_id = 1; // Client ID whose will message to fetch
}
// FetchWillResponse returns the will message if found.
message FetchWillResponse {
bool found = 1; // True if will message exists
WillMessage message = 2; // The will message (only if found=true)
string error = 3; // Error message if request failed
}
// =============================================================================
// Queue Routing
// =============================================================================
// EnqueueRemoteRequest requests enqueueing a message on a remote partition owner.
message EnqueueRemoteRequest {
string queue_name = 1; // Queue topic name
bytes payload = 2; // Message payload
map<string, string> properties = 3; // Message properties (including partition-key)
}
// EnqueueRemoteResponse indicates whether the enqueue succeeded.
message EnqueueRemoteResponse {
bool success = 1; // True if the enqueue succeeded
string error = 2; // Error message if success=false
string message_id = 3; // Generated message ID
}
// RouteQueueMessageRequest delivers a queue message to a consumer on another node.
message RouteQueueMessageRequest {
string client_id = 1; // Target consumer client ID
string queue_name = 2; // Queue name
string message_id = 3; // Message ID
bytes payload = 4; // Message payload
map<string, string> properties = 5; // Message properties
int64 sequence = 6; // Message sequence number
int32 partition_id = 7; // Partition ID
}
// RouteQueueMessageResponse indicates whether the delivery succeeded.
message RouteQueueMessageResponse {
bool success = 1; // True if the delivery succeeded
string error = 2; // Error message if success=false
}
// =============================================================================
// Raft Consensus
// =============================================================================
// AppendEntriesRequest is sent by the leader to replicate log entries.
message AppendEntriesRequest {
string queue_name = 1; // Queue name for routing
int32 partition_id = 2; // Partition ID for routing
uint64 term = 3; // Leader's term
string leader_id = 4; // Leader's node ID
uint64 prev_log_index = 5; // Index of log entry immediately preceding new ones
uint64 prev_log_term = 6; // Term of prev_log_index entry
repeated bytes entries = 7; // Log entries to store (serialized)
uint64 leader_commit = 8; // Leader's commit index
}
// AppendEntriesResponse is the response to AppendEntries.
message AppendEntriesResponse {
uint64 term = 1; // Current term, for leader to update itself
bool success = 2; // True if follower contained entry matching prev_log_index and prev_log_term
uint64 last_log_index = 3; // Follower's last log index (for leader to update next_index)
}
// RequestVoteRequest is sent by candidates during leader election.
message RequestVoteRequest {
string queue_name = 1; // Queue name for routing
int32 partition_id = 2; // Partition ID for routing
uint64 term = 3; // Candidate's term
string candidate_id = 4; // Candidate's node ID
uint64 last_log_index = 5; // Index of candidate's last log entry
uint64 last_log_term = 6; // Term of candidate's last log entry
}
// RequestVoteResponse is the response to RequestVote.
message RequestVoteResponse {
uint64 term = 1; // Current term, for candidate to update itself
bool vote_granted = 2; // True if candidate received vote
}
// InstallSnapshotRequest is sent by leader to transfer snapshot to follower.
// The data field holds one chunk; done marks the last chunk.
message InstallSnapshotRequest {
string queue_name = 1; // Queue name for routing
int32 partition_id = 2; // Partition ID for routing
uint64 term = 3; // Leader's term
string leader_id = 4; // Leader's node ID
uint64 last_included_index = 5; // Snapshot replaces all entries up through this index
uint64 last_included_term = 6; // Term of last_included_index
bytes data = 7; // Snapshot data (chunk)
bool done = 8; // True if this is the last chunk
}
// InstallSnapshotResponse is the response to InstallSnapshot.
message InstallSnapshotResponse {
uint64 term = 1; // Current term, for leader to update itself
}
+800
View File
@@ -0,0 +1,800 @@
// Copyright (c) Abstract Machines
// SPDX-License-Identifier: Apache-2.0
syntax = "proto3";
package fluxmq.queue.v1;
import "google/protobuf/timestamp.proto";
import "google/protobuf/duration.proto";
import "google/protobuf/empty.proto";
// =============================================================================
// QueueService - Client-facing API for log-based queues
// =============================================================================
service QueueService {
// ---------------------------------------------------------------------------
// Queue Management
// ---------------------------------------------------------------------------
// CreateQueue creates a new queue with the specified configuration.
rpc CreateQueue(CreateQueueRequest) returns (Queue);
// GetQueue retrieves queue metadata and current state.
rpc GetQueue(GetQueueRequest) returns (Queue);
// ListQueues returns all queues, with optional filtering.
rpc ListQueues(ListQueuesRequest) returns (ListQueuesResponse);
// DeleteQueue removes a queue and all its data.
rpc DeleteQueue(DeleteQueueRequest) returns (google.protobuf.Empty);
// UpdateQueue updates queue configuration (retention, limits, etc).
rpc UpdateQueue(UpdateQueueRequest) returns (Queue);
// ---------------------------------------------------------------------------
// Publishing (Append)
// ---------------------------------------------------------------------------
// Append adds a single message to a queue. Returns the assigned offset.
rpc Append(AppendRequest) returns (AppendResponse);
// AppendBatch adds multiple messages atomically. All messages go to the same partition.
rpc AppendBatch(AppendBatchRequest) returns (AppendBatchResponse);
// AppendStream allows streaming multiple messages. Messages may go to different partitions.
rpc AppendStream(stream AppendRequest) returns (AppendBatchResponse);
// ---------------------------------------------------------------------------
// Reading (Fetch)
// ---------------------------------------------------------------------------
// Read fetches a single message at a specific offset.
rpc Read(ReadRequest) returns (Message);
// ReadBatch fetches multiple messages starting from an offset.
rpc ReadBatch(ReadBatchRequest) returns (ReadBatchResponse);
// Tail streams new messages as they arrive (server-side streaming).
// Starts from specified offset and continues indefinitely.
rpc Tail(TailRequest) returns (stream Message);
// ---------------------------------------------------------------------------
// Seeking
// ---------------------------------------------------------------------------
// SeekToOffset returns partition state at a given offset.
rpc SeekToOffset(SeekToOffsetRequest) returns (SeekResponse);
// SeekToTimestamp finds the offset closest to a timestamp.
rpc SeekToTimestamp(SeekToTimestampRequest) returns (SeekResponse);
// ---------------------------------------------------------------------------
// Consumer Groups
// ---------------------------------------------------------------------------
// CreateConsumerGroup creates a new consumer group for a queue.
rpc CreateConsumerGroup(CreateConsumerGroupRequest) returns (ConsumerGroup);
// GetConsumerGroup retrieves consumer group state.
rpc GetConsumerGroup(GetConsumerGroupRequest) returns (ConsumerGroup);
// ListConsumerGroups lists all consumer groups for a queue.
rpc ListConsumerGroups(ListConsumerGroupsRequest) returns (ListConsumerGroupsResponse);
// DeleteConsumerGroup removes a consumer group.
rpc DeleteConsumerGroup(DeleteConsumerGroupRequest) returns (google.protobuf.Empty);
// JoinGroup adds a consumer to a group and receives partition assignments.
rpc JoinGroup(JoinGroupRequest) returns (JoinGroupResponse);
// LeaveGroup removes a consumer from a group.
rpc LeaveGroup(LeaveGroupRequest) returns (google.protobuf.Empty);
// Heartbeat keeps consumer session alive and receives rebalance notifications.
rpc Heartbeat(HeartbeatRequest) returns (HeartbeatResponse);
// ---------------------------------------------------------------------------
// Consuming (with acknowledgments)
// ---------------------------------------------------------------------------
// Consume fetches the next batch of messages for a consumer.
// Messages are tracked in the Pending Entry List (PEL) until acknowledged.
rpc Consume(ConsumeRequest) returns (ConsumeResponse);
// ConsumeStream continuously delivers messages to a consumer (server streaming).
rpc ConsumeStream(ConsumeStreamRequest) returns (stream Message);
// Ack acknowledges successful processing of messages.
rpc Ack(AckRequest) returns (AckResponse);
// Nack indicates processing failure. Messages will be redelivered.
rpc Nack(NackRequest) returns (google.protobuf.Empty);
// Claim transfers pending messages from one consumer to another (work stealing).
rpc Claim(ClaimRequest) returns (ClaimResponse);
// GetPending retrieves pending (unacknowledged) messages for a consumer or group.
rpc GetPending(GetPendingRequest) returns (GetPendingResponse);
// ---------------------------------------------------------------------------
// Partition Info
// ---------------------------------------------------------------------------
// GetPartitionInfo returns metadata for a specific partition.
rpc GetPartitionInfo(GetPartitionInfoRequest) returns (PartitionInfo);
// ListPartitions returns info for all partitions in a queue.
rpc ListPartitions(ListPartitionsRequest) returns (ListPartitionsResponse);
// ---------------------------------------------------------------------------
// Admin Operations
// ---------------------------------------------------------------------------
// GetStats returns queue statistics.
rpc GetStats(GetStatsRequest) returns (QueueStats);
// Purge removes all messages from a queue (or partition).
rpc Purge(PurgeRequest) returns (PurgeResponse);
// Truncate removes messages before a given offset (retention enforcement).
rpc Truncate(TruncateRequest) returns (google.protobuf.Empty);
}
// =============================================================================
// Core Types
// =============================================================================
// Message represents a single message in the log.
// It is the element type returned by the Read, Tail and ConsumeStream RPCs and
// is embedded in the ReadBatch, Consume and Claim responses.
message Message {
// Unique offset within the partition (assigned by server).
uint64 offset = 1;
// Partition this message belongs to.
uint32 partition_id = 2;
// Timestamp when message was appended.
google.protobuf.Timestamp timestamp = 3;
// Optional key for partitioning and compaction.
bytes key = 4;
// Message payload.
bytes value = 5;
// Optional headers (metadata), keyed by header name.
map<string, bytes> headers = 6;
// Delivery metadata (only set when consuming via consumer group).
DeliveryInfo delivery_info = 7;
}
// DeliveryInfo contains metadata about message delivery to a consumer.
// It is carried in Message.delivery_info.
message DeliveryInfo {
// Number of times this message has been delivered.
uint32 delivery_count = 1;
// Timestamp of first delivery.
google.protobuf.Timestamp first_delivered_at = 2;
// Timestamp of this delivery.
google.protobuf.Timestamp delivered_at = 3;
// Consumer group this delivery belongs to.
string group_id = 4;
// Consumer receiving this delivery.
string consumer_id = 5;
}
// Queue represents a queue and its configuration.
// It is the response type of UpdateQueue and the element type of
// ListQueuesResponse.queues.
message Queue {
// Queue name (unique identifier).
string name = 1;
// Number of partitions.
uint32 partitions = 2;
// Queue configuration.
QueueConfig config = 3;
// Current queue state.
QueueState state = 4;
// Creation timestamp.
google.protobuf.Timestamp created_at = 5;
// Last modification timestamp.
google.protobuf.Timestamp updated_at = 6;
}
// QueueConfig groups the configurable settings of a queue: retention,
// compression, message size limit and segment settings.
message QueueConfig {
// Retention policy.
RetentionConfig retention = 1;
// Compression type: NONE, S2, ZSTD.
CompressionType compression = 2;
// Maximum message size in bytes.
uint32 max_message_size = 3;
// Segment configuration.
SegmentConfig segment = 4;
}
// RetentionConfig describes how long and how much data a queue retains.
message RetentionConfig {
// Maximum age of messages (0 = unlimited).
google.protobuf.Duration max_age = 1;
// Maximum total size in bytes (0 = unlimited).
uint64 max_bytes = 2;
// Minimum number of messages to retain regardless of age/size.
uint64 min_messages = 3;
}
// SegmentConfig describes when log segments roll and how they are indexed.
message SegmentConfig {
// Maximum segment size before rolling.
// NOTE(review): unit is presumably bytes — confirm against the storage layer.
uint64 max_size = 1;
// Maximum segment age before rolling.
google.protobuf.Duration max_age = 2;
// Index interval in bytes.
uint32 index_interval = 3;
}
// QueueState is the aggregate runtime state of a queue, reported in Queue.state.
message QueueState {
// Total messages across all partitions.
uint64 total_messages = 1;
// Total size in bytes.
uint64 total_bytes = 2;
// Number of active consumer groups.
uint32 consumer_group_count = 3;
// Oldest message timestamp.
google.protobuf.Timestamp oldest_message = 4;
// Newest message timestamp.
google.protobuf.Timestamp newest_message = 5;
}
// CompressionType selects the compression setting used in QueueConfig.compression.
enum CompressionType {
// Zero value: no compression type was specified.
COMPRESSION_TYPE_UNSPECIFIED = 0;
// No compression.
COMPRESSION_TYPE_NONE = 1;
// S2 compression.
COMPRESSION_TYPE_S2 = 2;
// Zstandard compression.
COMPRESSION_TYPE_ZSTD = 3;
}
// PartitionInfo contains metadata about a single partition.
// It is returned by GetPartitionInfo and listed in ListPartitionsResponse.
message PartitionInfo {
// Identifier of the partition described by this message.
uint32 partition_id = 1;
// First valid offset (after truncation).
uint64 head_offset = 2;
// Next offset to be assigned.
uint64 tail_offset = 3;
// Number of messages (tail - head).
uint64 message_count = 4;
// Size in bytes.
uint64 size_bytes = 5;
// Number of segments.
uint32 segment_count = 6;
// Oldest message timestamp.
google.protobuf.Timestamp oldest_timestamp = 7;
// Newest message timestamp.
google.protobuf.Timestamp newest_timestamp = 8;
}
// =============================================================================
// Consumer Group Types
// =============================================================================
// ConsumerGroup describes a consumer group and its current state.
// It is returned by CreateConsumerGroup and GetConsumerGroup and listed in
// ListConsumerGroupsResponse.
message ConsumerGroup {
// Queue this group belongs to.
string queue_name = 1;
// Group identifier.
string group_id = 2;
// Group configuration.
ConsumerGroupConfig config = 3;
// Per-partition state.
repeated PartitionCursor cursors = 4;
// Active consumers.
repeated ConsumerInfo consumers = 5;
// Total pending (unacked) messages.
uint64 pending_count = 6;
// Creation timestamp.
google.protobuf.Timestamp created_at = 7;
}
// ConsumerGroupConfig holds the delivery settings of a consumer group.
message ConsumerGroupConfig {
// How long to wait for ack before redelivery.
google.protobuf.Duration ack_timeout = 1;
// Maximum redelivery attempts before moving to DLQ.
uint32 max_redeliveries = 2;
// Initial position when group starts: EARLIEST or LATEST (see InitialPosition).
InitialPosition initial_position = 3;
}
// InitialPosition selects where a new consumer group starts reading.
enum InitialPosition {
// Zero value: no initial position was specified.
INITIAL_POSITION_UNSPECIFIED = 0;
INITIAL_POSITION_EARLIEST = 1; // Start from beginning
INITIAL_POSITION_LATEST = 2; // Start from end (new messages only)
}
// PartitionCursor is a consumer group's read position within one partition.
// It appears in ConsumerGroup.cursors and AckResponse.committed.
message PartitionCursor {
// Partition this cursor applies to.
uint32 partition_id = 1;
// Next offset to deliver.
uint64 cursor = 2;
// Highest contiguously acknowledged offset.
uint64 committed = 3;
// Lag (tail - cursor).
uint64 lag = 4;
// Last activity timestamp.
google.protobuf.Timestamp updated_at = 5;
}
// ConsumerInfo describes one member of a consumer group.
// It appears in ConsumerGroup.consumers and JoinGroupResponse.consumers.
message ConsumerInfo {
// Consumer identifier.
string consumer_id = 1;
// Assigned partitions.
repeated uint32 partitions = 2;
// Number of pending messages for this consumer.
uint64 pending_count = 3;
// Last heartbeat timestamp.
google.protobuf.Timestamp last_heartbeat = 4;
// Consumer metadata (client version, hostname, etc).
map<string, string> metadata = 5;
}
// PendingEntry describes one pending (unacknowledged) message, as listed in
// GetPendingResponse.entries.
message PendingEntry {
// Offset of the pending message.
uint64 offset = 1;
// Partition the pending message belongs to.
uint32 partition_id = 2;
// NOTE(review): presumably the consumer the message was delivered to — confirm.
string consumer_id = 3;
// NOTE(review): presumably the time of the most recent delivery — confirm.
google.protobuf.Timestamp delivered_at = 4;
// Number of deliveries recorded for this entry.
uint32 delivery_count = 5;
// NOTE(review): presumably time elapsed since delivered_at — confirm.
google.protobuf.Duration idle_time = 6;
}
// =============================================================================
// Request/Response Messages
// =============================================================================
// --- Queue Management ---
// CreateQueueRequest asks the server to create a queue.
message CreateQueueRequest {
// Name of the queue to create.
string name = 1;
// Number of partitions for the new queue.
uint32 partitions = 2;
// Optional queue configuration.
QueueConfig config = 3;
}
// GetQueueRequest identifies the queue to retrieve.
message GetQueueRequest {
// Name of the queue.
string name = 1;
}
// ListQueuesRequest is the request for the ListQueues RPC.
message ListQueuesRequest {
// Optional prefix filter.
string prefix = 1;
// Pagination: maximum number of queues to return.
uint32 limit = 2;
// Pagination: token from a previous ListQueuesResponse.next_page_token.
string page_token = 3;
}
// ListQueuesResponse is the response of the ListQueues RPC.
message ListQueuesResponse {
// Queues in this page.
repeated Queue queues = 1;
// Token to pass as ListQueuesRequest.page_token to fetch the next page.
string next_page_token = 2;
}
// DeleteQueueRequest identifies the queue to delete (DeleteQueue RPC).
message DeleteQueueRequest {
// Name of the queue to delete.
string name = 1;
}
// UpdateQueueRequest is the request for the UpdateQueue RPC.
message UpdateQueueRequest {
// Name of the queue to update.
string name = 1;
// New configuration (retention, limits, etc).
QueueConfig config = 2;
}
// --- Append ---
// AppendRequest carries one message to append; it is used by the Append RPC
// and as the stream element of AppendStream.
message AppendRequest {
// Queue to append to.
string queue_name = 1;
// Optional partition key (hashed to determine partition).
// If not set, round-robin assignment.
bytes partition_key = 2;
// Or explicit partition ID (overrides partition_key).
optional uint32 partition_id = 3;
// Message key (for compaction, optional).
bytes key = 4;
// Message value.
bytes value = 5;
// Optional headers.
map<string, bytes> headers = 6;
}
// AppendResponse is the response of the Append RPC.
message AppendResponse {
// Offset assigned to the appended message.
uint64 offset = 1;
// Partition the message was appended to.
uint32 partition_id = 2;
// Timestamp reported for the append.
google.protobuf.Timestamp timestamp = 3;
}
// AppendBatchRequest is the request for the AppendBatch RPC.
message AppendBatchRequest {
// Queue to append to.
string queue_name = 1;
// All messages go to the same partition.
bytes partition_key = 2;
// Optional explicit partition ID for the whole batch.
optional uint32 partition_id = 3;
// Messages to append.
repeated BatchMessage messages = 4;
}
// BatchMessage is one entry of AppendBatchRequest.messages.
message BatchMessage {
// Message key.
bytes key = 1;
// Message value.
bytes value = 2;
// Optional headers.
map<string, bytes> headers = 3;
}
// AppendBatchResponse is the response of the AppendBatch and AppendStream RPCs.
message AppendBatchResponse {
// First offset assigned.
uint64 first_offset = 1;
// Last offset assigned.
uint64 last_offset = 2;
// Partition the offsets refer to.
uint32 partition_id = 3;
// Number of messages appended.
uint32 count = 4;
// Timestamp reported for the append.
google.protobuf.Timestamp timestamp = 5;
}
// --- Read ---
// ReadRequest identifies the single message to fetch with the Read RPC.
message ReadRequest {
// Queue to read from.
string queue_name = 1;
// Partition to read from.
uint32 partition_id = 2;
// Offset of the message to fetch.
uint64 offset = 3;
}
// ReadBatchRequest is the request for the ReadBatch RPC.
message ReadBatchRequest {
// Queue to read from.
string queue_name = 1;
// Partition to read from.
uint32 partition_id = 2;
// Offset of the first message to return.
uint64 start_offset = 3;
// Maximum messages to return.
uint32 limit = 4;
// Maximum bytes to return (soft limit).
uint32 max_bytes = 5;
}
// ReadBatchResponse is the response of the ReadBatch RPC.
message ReadBatchResponse {
// Messages read, starting at the requested offset.
repeated Message messages = 1;
// Next offset to read (for pagination).
uint64 next_offset = 2;
// True if end of partition reached.
bool end_of_partition = 3;
}
// TailRequest is the request for the server-streaming Tail RPC.
message TailRequest {
// Queue to tail.
string queue_name = 1;
// Partition to tail.
uint32 partition_id = 2;
// Starting offset. To start at the earliest or latest position instead,
// set start_from (see StartFrom below).
uint64 start_offset = 3;
// Special offset values.
enum StartFrom {
// Zero value: no start position was specified.
START_FROM_UNSPECIFIED = 0;
START_FROM_EARLIEST = 1; // Start from head
START_FROM_LATEST = 2; // Start from tail (new messages only)
START_FROM_OFFSET = 3; // Use start_offset value
}
// Selects how the starting position is determined.
StartFrom start_from = 4;
}
// --- Seek ---
// SeekToOffsetRequest is the request for the SeekToOffset RPC.
message SeekToOffsetRequest {
// Queue to seek in.
string queue_name = 1;
// Partition to seek in.
uint32 partition_id = 2;
// Offset to seek to.
uint64 offset = 3;
}
// SeekToTimestampRequest is the request for the SeekToTimestamp RPC.
message SeekToTimestampRequest {
// Queue to seek in.
string queue_name = 1;
// Partition to seek in.
uint32 partition_id = 2;
// Timestamp whose closest offset is requested.
google.protobuf.Timestamp timestamp = 3;
}
// SeekResponse is the response of the SeekToOffset and SeekToTimestamp RPCs.
message SeekResponse {
// Resolved offset.
uint64 offset = 1;
// Partition the offset refers to.
uint32 partition_id = 2;
// NOTE(review): presumably the timestamp of the message at offset — confirm.
google.protobuf.Timestamp timestamp = 3;
// True if exact match found.
bool exact_match = 4;
}
// --- Consumer Groups ---
// CreateConsumerGroupRequest is the request for the CreateConsumerGroup RPC.
message CreateConsumerGroupRequest {
// Queue the new group consumes from.
string queue_name = 1;
// Identifier of the group to create.
string group_id = 2;
// Optional group configuration.
ConsumerGroupConfig config = 3;
}
// GetConsumerGroupRequest identifies the group to retrieve (GetConsumerGroup RPC).
message GetConsumerGroupRequest {
// Queue the group belongs to.
string queue_name = 1;
// Group identifier.
string group_id = 2;
}
// ListConsumerGroupsRequest is the request for the ListConsumerGroups RPC.
message ListConsumerGroupsRequest {
// Queue whose groups are listed.
string queue_name = 1;
// Pagination: maximum number of groups to return.
uint32 limit = 2;
// Pagination: token from a previous response's next_page_token.
string page_token = 3;
}
// ListConsumerGroupsResponse is the response of the ListConsumerGroups RPC.
message ListConsumerGroupsResponse {
// Consumer groups in this page.
repeated ConsumerGroup groups = 1;
// Token to pass as page_token to fetch the next page.
string next_page_token = 2;
}
// DeleteConsumerGroupRequest identifies the group to remove (DeleteConsumerGroup RPC).
message DeleteConsumerGroupRequest {
// Queue the group belongs to.
string queue_name = 1;
// Group identifier.
string group_id = 2;
}
// JoinGroupRequest is the request for the JoinGroup RPC, which adds a
// consumer to a group.
message JoinGroupRequest {
// Queue the group belongs to.
string queue_name = 1;
// Group to join.
string group_id = 2;
// Identifier of the joining consumer.
string consumer_id = 3;
// Optional consumer metadata.
map<string, string> metadata = 4;
// Session timeout - consumer removed if no heartbeat.
google.protobuf.Duration session_timeout = 5;
}
// JoinGroupResponse is the response of the JoinGroup RPC.
message JoinGroupResponse {
// Generation ID (increments on rebalance).
uint64 generation_id = 1;
// Partitions assigned to this consumer.
repeated uint32 assigned_partitions = 2;
// All consumers in the group.
repeated ConsumerInfo consumers = 3;
}
// LeaveGroupRequest is the request for the LeaveGroup RPC, which removes a
// consumer from a group.
message LeaveGroupRequest {
// Queue the group belongs to.
string queue_name = 1;
// Group to leave.
string group_id = 2;
// Identifier of the leaving consumer.
string consumer_id = 3;
}
// HeartbeatRequest is the request for the Heartbeat RPC, which keeps a
// consumer session alive.
message HeartbeatRequest {
// Queue the group belongs to.
string queue_name = 1;
// Group the consumer is a member of.
string group_id = 2;
// Identifier of the consumer sending the heartbeat.
string consumer_id = 3;
// NOTE(review): presumably the generation ID last seen by the consumer
// (from JoinGroupResponse or HeartbeatResponse) — confirm.
uint64 generation_id = 4;
}
// HeartbeatResponse is the response of the Heartbeat RPC.
message HeartbeatResponse {
// If true, consumer should rejoin (rebalance occurred).
bool should_rejoin = 1;
// Current generation ID.
uint64 generation_id = 2;
}
// --- Consuming ---
// ConsumeRequest is the request for the Consume RPC, which fetches the next
// batch of messages for a consumer.
message ConsumeRequest {
// Queue to consume from.
string queue_name = 1;
// Consumer group the consumer belongs to.
string group_id = 2;
// Identifier of the requesting consumer.
string consumer_id = 3;
// Maximum messages to return.
uint32 max_messages = 4;
// Maximum bytes to return.
uint32 max_bytes = 5;
// How long to wait if no messages available.
google.protobuf.Duration wait_time = 6;
}
// ConsumeResponse is the response of the Consume RPC.
message ConsumeResponse {
// Messages delivered to the consumer.
repeated Message messages = 1;
}
// ConsumeStreamRequest is the request for the server-streaming ConsumeStream RPC.
message ConsumeStreamRequest {
// Queue to consume from.
string queue_name = 1;
// Consumer group the consumer belongs to.
string group_id = 2;
// Identifier of the requesting consumer.
string consumer_id = 3;
// Maximum in-flight (unacked) messages.
uint32 max_in_flight = 4;
}
// AckRequest is the request for the Ack RPC, which acknowledges successful
// processing of messages.
message AckRequest {
// Queue the messages belong to.
string queue_name = 1;
// Consumer group acknowledging the messages.
string group_id = 2;
// Consumer acknowledging the messages.
string consumer_id = 3;
// Offsets to acknowledge (grouped by partition).
repeated PartitionOffsets offsets = 4;
}
// PartitionOffsets is a list of offsets within one partition; it is used by
// AckRequest and NackRequest.
message PartitionOffsets {
// Partition the offsets belong to.
uint32 partition_id = 1;
// Offsets within that partition.
repeated uint64 offsets = 2;
}
// AckResponse is the response of the Ack RPC.
message AckResponse {
// Number of messages acknowledged.
uint32 acked_count = 1;
// New committed offsets after this ack.
repeated PartitionCursor committed = 2;
}
// NackRequest is the request for the Nack RPC, which reports a processing
// failure so the messages are redelivered.
message NackRequest {
// Queue the messages belong to.
string queue_name = 1;
// Consumer group rejecting the messages.
string group_id = 2;
// Consumer rejecting the messages.
string consumer_id = 3;
// Offsets to reject (grouped by partition).
repeated PartitionOffsets offsets = 4;
// Optional delay before redelivery.
google.protobuf.Duration delay = 5;
}
// ClaimRequest is the request for the Claim RPC, which transfers pending
// messages to the claiming consumer.
message ClaimRequest {
// Queue the pending messages belong to.
string queue_name = 1;
// Consumer group whose pending messages are claimed.
string group_id = 2;
// Consumer claiming the messages.
string consumer_id = 3;
// Claim messages idle longer than this duration.
google.protobuf.Duration min_idle_time = 4;
// Maximum messages to claim.
uint32 limit = 5;
// Optional: claim from specific partition only.
optional uint32 partition_id = 6;
}
// ClaimResponse is the response of the Claim RPC.
message ClaimResponse {
// Messages claimed by the requesting consumer.
repeated Message messages = 1;
}
// GetPendingRequest is the request for the GetPending RPC, which retrieves
// pending (unacknowledged) messages for a consumer or group.
message GetPendingRequest {
// Queue the group belongs to.
string queue_name = 1;
// Consumer group to inspect.
string group_id = 2;
// Optional: filter by consumer.
string consumer_id = 3;
// Optional: filter by partition.
optional uint32 partition_id = 4;
// Pagination: maximum number of entries to return.
uint32 limit = 5;
// Pagination: token from a previous GetPendingResponse.next_page_token.
string page_token = 6;
}
// GetPendingResponse is the response of the GetPending RPC.
message GetPendingResponse {
// Pending entries in this page.
repeated PendingEntry entries = 1;
// Token to pass as GetPendingRequest.page_token to fetch the next page.
string next_page_token = 2;
// NOTE(review): presumably the total pending count across all pages — confirm.
uint64 total_pending = 3;
}
// --- Partition Info ---
// GetPartitionInfoRequest identifies the partition to describe (GetPartitionInfo RPC).
message GetPartitionInfoRequest {
// Queue the partition belongs to.
string queue_name = 1;
// Partition to describe.
uint32 partition_id = 2;
}
// ListPartitionsRequest identifies the queue whose partitions are listed.
message ListPartitionsRequest {
// Queue name.
string queue_name = 1;
}
// ListPartitionsResponse is the response of the ListPartitions RPC.
message ListPartitionsResponse {
// Info for all partitions in the queue.
repeated PartitionInfo partitions = 1;
}
// --- Admin ---
// GetStatsRequest identifies the queue whose statistics are requested (GetStats RPC).
message GetStatsRequest {
// Queue name.
string queue_name = 1;
}
// QueueStats is the response of the GetStats RPC.
message QueueStats {
// Queue the statistics describe.
string queue_name = 1;
// Per-partition stats.
repeated PartitionStats partitions = 2;
// Aggregate stats: message count and size in bytes.
uint64 total_messages = 3;
uint64 total_bytes = 4;
// Throughput (messages/sec, rolling average).
double append_rate = 5;
double read_rate = 6;
// Consumer group summaries.
repeated ConsumerGroupSummary consumer_groups = 7;
}
// PartitionStats holds the per-partition figures reported in QueueStats.partitions.
// The field names mirror those of PartitionInfo.
message PartitionStats {
// Partition these figures describe.
uint32 partition_id = 1;
// Number of messages in the partition.
uint64 message_count = 2;
// Size in bytes.
uint64 size_bytes = 3;
// NOTE(review): presumably first valid offset, as in PartitionInfo — confirm.
uint64 head_offset = 4;
// NOTE(review): presumably next offset to be assigned, as in PartitionInfo — confirm.
uint64 tail_offset = 5;
// Number of segments.
uint32 segment_count = 6;
}
// ConsumerGroupSummary is the per-group summary reported in QueueStats.consumer_groups.
message ConsumerGroupSummary {
// Group identifier.
string group_id = 1;
// Number of consumers in the group.
uint32 consumer_count = 2;
// NOTE(review): presumably the sum of per-partition lag — confirm.
uint64 total_lag = 3;
// Number of pending (unacked) messages.
uint64 pending_count = 4;
}
// PurgeRequest is the request for the Purge RPC, which removes all messages
// from a queue (or a single partition).
message PurgeRequest {
// Queue to purge.
string queue_name = 1;
// Optional: purge specific partition only.
optional uint32 partition_id = 2;
}
// PurgeResponse is the response of the Purge RPC.
message PurgeResponse {
// Number of messages removed.
uint64 messages_deleted = 1;
// Number of bytes reclaimed.
uint64 bytes_freed = 2;
}
// TruncateRequest is the request for the Truncate RPC, which removes messages
// before a given offset.
message TruncateRequest {
// Queue to truncate.
string queue_name = 1;
// Partition to truncate.
uint32 partition_id = 2;
// Remove messages with offset < min_offset.
uint64 min_offset = 3;
}
+2 -2
View File
@@ -12,7 +12,7 @@ import (
"time"
"github.com/absmach/fluxmq/cluster"
"github.com/absmach/fluxmq/cluster/grpc"
clusterv1 "github.com/absmach/fluxmq/pkg/proto/cluster/v1"
queueStorage "github.com/absmach/fluxmq/queue/storage"
"github.com/absmach/fluxmq/queue/storage/memory"
"github.com/absmach/fluxmq/queue/types"
@@ -719,7 +719,7 @@ func (m *MockCluster) RoutePublish(ctx context.Context, topic string, payload []
return nil
}
func (m *MockCluster) TakeoverSession(ctx context.Context, clientID, fromNode, toNode string) (*grpc.SessionState, error) {
// TakeoverSession is a no-op mock: it ignores its arguments and returns a nil
// session state with a nil error.
func (m *MockCluster) TakeoverSession(ctx context.Context, clientID, fromNode, toNode string) (*clusterv1.SessionState, error) {
return nil, nil
}
// IsLeader always reports true in this mock.
func (m *MockCluster) IsLeader() bool { return true }
+2 -2
View File
@@ -10,7 +10,7 @@ import (
"time"
"github.com/absmach/fluxmq/cluster"
"github.com/absmach/fluxmq/cluster/grpc"
clusterv1 "github.com/absmach/fluxmq/pkg/proto/cluster/v1"
"github.com/absmach/fluxmq/queue/consumer"
"github.com/absmach/fluxmq/queue/storage/memory"
"github.com/absmach/fluxmq/queue/types"
@@ -113,7 +113,7 @@ func (c *MockCluster) RoutePublish(ctx context.Context, topic string, payload []
return nil
}
func (c *MockCluster) TakeoverSession(ctx context.Context, clientID, fromNode, toNode string) (*grpc.SessionState, error) {
// TakeoverSession is a no-op mock: it ignores its arguments and returns a nil
// session state with a nil error.
func (c *MockCluster) TakeoverSession(ctx context.Context, clientID, fromNode, toNode string) (*clusterv1.SessionState, error) {
return nil, nil
}
// ID returns the mock's configured local node ID.
func (c *MockCluster) ID() string { return c.localNodeID }
+2 -2
View File
@@ -11,7 +11,7 @@ import (
"time"
"github.com/absmach/fluxmq/cluster"
"github.com/absmach/fluxmq/cluster/grpc"
clusterv1 "github.com/absmach/fluxmq/pkg/proto/cluster/v1"
"github.com/absmach/fluxmq/queue/storage"
"github.com/absmach/fluxmq/queue/types"
brokerStorage "github.com/absmach/fluxmq/storage"
@@ -902,7 +902,7 @@ func (c *mockCluster) RoutePublish(ctx context.Context, topic string, payload []
return nil
}
func (c *mockCluster) TakeoverSession(ctx context.Context, clientID, fromNode, toNode string) (*grpc.SessionState, error) {
// TakeoverSession is a no-op mock: it ignores its arguments and returns a nil
// session state with a nil error.
func (c *mockCluster) TakeoverSession(ctx context.Context, clientID, fromNode, toNode string) (*clusterv1.SessionState, error) {
return nil, nil
}
+97
View File
@@ -0,0 +1,97 @@
// Copyright (c) Abstract Machines
// SPDX-License-Identifier: Apache-2.0
package api
import (
"context"
"fmt"
"log/slog"
"net/http"
"time"
"github.com/absmach/fluxmq/pkg/proto/queue/v1/queuev1connect"
"github.com/absmach/fluxmq/queue"
"github.com/absmach/fluxmq/queue/storage"
serverqueue "github.com/absmach/fluxmq/server/queue"
"golang.org/x/net/http2"
"golang.org/x/net/http2/h2c"
)
// Config holds configuration for the API server.
type Config struct {
// Address is the listen address handed to http.Server.Addr.
Address string
// ShutdownTimeout bounds the graceful shutdown performed when the context
// passed to Listen is cancelled.
ShutdownTimeout time.Duration
// TLSCertFile and TLSKeyFile are passed to ListenAndServeTLS. TLS is used
// only when both are non-empty; otherwise the server listens in cleartext.
TLSCertFile string
TLSKeyFile string
}
// Server provides the HTTP/gRPC API server using Connect protocol.
// Instances are built by New and run with Listen.
type Server struct {
// config is the configuration supplied to New.
config Config
// httpServer serves the Connect handlers and the /health endpoint.
httpServer *http.Server
// logger receives startup and shutdown messages.
logger *slog.Logger
}
// New creates a new API server.
//
// It registers the Connect QueueService handler (backed by manager, logStore
// and groupStore) and a /health endpoint on a fresh mux, and wraps the mux in
// an h2c handler so HTTP/2 is available without TLS. A nil logger falls back
// to slog.Default(). The returned server does not listen until Listen is
// called.
func New(config Config, manager *queue.Manager, logStore storage.LogStore, groupStore storage.ConsumerGroupStore, logger *slog.Logger) *Server {
	if logger == nil {
		logger = slog.Default()
	}

	mux := http.NewServeMux()

	queueHandler := serverqueue.NewHandler(manager, logStore, groupStore, logger)
	path, handler := queuev1connect.NewQueueServiceHandler(queueHandler)
	mux.Handle(path, handler)

	mux.HandleFunc("/health", func(w http.ResponseWriter, r *http.Request) {
		// The body is JSON, so say so; headers must be set before WriteHeader.
		w.Header().Set("Content-Type", "application/json")
		w.WriteHeader(http.StatusOK)
		if _, err := w.Write([]byte(`{"status":"ok"}`)); err != nil {
			logger.Debug("Failed to write health response", slog.String("error", err.Error()))
		}
	})

	httpServer := &http.Server{
		Addr:    config.Address,
		Handler: h2c.NewHandler(mux, &http2.Server{}),
		// Only the header read is bounded. ReadTimeout and WriteTimeout cover
		// the whole request/response, so they would cut off the long-lived
		// streaming RPCs served here (Tail, ConsumeStream, AppendStream).
		ReadHeaderTimeout: 30 * time.Second,
	}

	return &Server{
		config:     config,
		httpServer: httpServer,
		logger:     logger,
	}
}
// Listen starts the API server and blocks until ctx is cancelled or the
// server fails.
//
// The server runs with TLS when both TLSCertFile and TLSKeyFile are set and in
// cleartext (h2c) otherwise. When ctx is cancelled the server is shut down
// gracefully and the result of Shutdown is returned. If serving fails for any
// reason other than http.ErrServerClosed, that error is returned wrapped.
func (s *Server) Listen(ctx context.Context) error {
	// Used when Config.ShutdownTimeout is unset. A zero timeout would create
	// an already-expired context and abort the graceful shutdown immediately.
	const defaultShutdownTimeout = 30 * time.Second

	// Buffered so the serving goroutine never blocks on send after Listen has
	// already returned through the ctx.Done branch.
	errCh := make(chan error, 1)

	go func() {
		var err error
		if s.config.TLSCertFile != "" && s.config.TLSKeyFile != "" {
			s.logger.Info("Starting API server with TLS",
				slog.String("address", s.config.Address))
			err = s.httpServer.ListenAndServeTLS(s.config.TLSCertFile, s.config.TLSKeyFile)
		} else {
			s.logger.Info("Starting API server (h2c)",
				slog.String("address", s.config.Address))
			err = s.httpServer.ListenAndServe()
		}
		if err != nil && err != http.ErrServerClosed {
			errCh <- err
		}
	}()

	select {
	case <-ctx.Done():
		s.logger.Info("Shutting down API server")
		timeout := s.config.ShutdownTimeout
		if timeout <= 0 {
			timeout = defaultShutdownTimeout
		}
		shutdownCtx, cancel := context.WithTimeout(context.Background(), timeout)
		defer cancel()
		return s.httpServer.Shutdown(shutdownCtx)
	case err := <-errCh:
		return fmt.Errorf("API server error: %w", err)
	}
}
+2 -2
View File
@@ -14,7 +14,7 @@ import (
"github.com/absmach/fluxmq/broker"
"github.com/absmach/fluxmq/cluster"
"github.com/absmach/fluxmq/cluster/grpc"
clusterv1 "github.com/absmach/fluxmq/pkg/proto/cluster/v1"
"github.com/absmach/fluxmq/storage"
)
@@ -104,7 +104,7 @@ func (m *mockCluster) RoutePublish(ctx context.Context, topic string, payload []
return nil
}
func (m *mockCluster) TakeoverSession(ctx context.Context, clientID, fromNode, toNode string) (*grpc.SessionState, error) {
// TakeoverSession is a no-op mock: it ignores its arguments and returns a nil
// session state with a nil error.
func (m *mockCluster) TakeoverSession(ctx context.Context, clientID, fromNode, toNode string) (*clusterv1.SessionState, error) {
return nil, nil
}
+944
View File
@@ -0,0 +1,944 @@
// Copyright (c) Abstract Machines
// SPDX-License-Identifier: Apache-2.0
package queue
import (
	"context"
	"errors"
	"fmt"
	"log/slog"
	"strings"
	"time"

	"connectrpc.com/connect"
	queuev1 "github.com/absmach/fluxmq/pkg/proto/queue/v1"
	"github.com/absmach/fluxmq/pkg/proto/queue/v1/queuev1connect"
	"github.com/absmach/fluxmq/queue"
	"github.com/absmach/fluxmq/queue/storage"
	"github.com/absmach/fluxmq/queue/types"
	"google.golang.org/protobuf/types/known/durationpb"
	"google.golang.org/protobuf/types/known/emptypb"
	"google.golang.org/protobuf/types/known/timestamppb"
)
// Handler implements the QueueServiceHandler interface.
// RPCs it does not define itself fall through to the embedded
// UnimplementedQueueServiceHandler.
type Handler struct {
queuev1connect.UnimplementedQueueServiceHandler
// manager performs queue creation, deletion and message enqueueing.
manager *queue.Manager
// logStore provides queue metadata and partition log access
// (GetQueue, ListQueues, Head, Tail, Read, ReadBatch).
logStore storage.LogStore
// groupStore persists and retrieves consumer group state.
groupStore storage.ConsumerGroupStore
// logger is the structured logger supplied to NewHandler.
logger *slog.Logger
}
// NewHandler builds a queue service Handler around the given manager and
// stores. When logger is nil the handler uses slog.Default().
func NewHandler(manager *queue.Manager, logStore storage.LogStore, groupStore storage.ConsumerGroupStore, logger *slog.Logger) *Handler {
	h := &Handler{
		manager:    manager,
		logStore:   logStore,
		groupStore: groupStore,
		logger:     logger,
	}
	if h.logger == nil {
		h.logger = slog.Default()
	}
	return h
}
// --- Queue Management ---
// CreateQueue creates a queue from the request and returns its configuration.
//
// A zero partition count defaults to 10, and the message TTL defaults to
// 7 days unless config.retention.max_age is supplied. An empty name is
// rejected with CodeInvalidArgument before the manager is called.
//
// NOTE(review): every manager.CreateQueue failure is reported as
// CodeAlreadyExists; confirm whether the manager exposes a sentinel error
// that would let other failures map to a more accurate code.
func (h *Handler) CreateQueue(ctx context.Context, req *connect.Request[queuev1.CreateQueueRequest]) (*connect.Response[queuev1.Queue], error) {
	msg := req.Msg

	if msg.Name == "" {
		return nil, connect.NewError(connect.CodeInvalidArgument, errors.New("queue name is required"))
	}

	config := types.QueueConfig{
		Name:       msg.Name,
		Partitions: int(msg.Partitions),
		MessageTTL: 7 * 24 * time.Hour,
	}
	if config.Partitions == 0 {
		config.Partitions = 10
	}
	if msg.Config != nil && msg.Config.Retention != nil && msg.Config.Retention.MaxAge != nil {
		config.MessageTTL = msg.Config.Retention.MaxAge.AsDuration()
	}

	if err := h.manager.CreateQueue(ctx, config); err != nil {
		return nil, connect.NewError(connect.CodeAlreadyExists, err)
	}

	return connect.NewResponse(h.queueToProto(&config)), nil
}
// GetQueue returns the stored configuration of the queue named in the request.
//
// A missing queue yields CodeNotFound; any other store failure yields
// CodeInternal. The sentinel is matched with errors.Is so that a wrapped
// storage.ErrQueueNotFound is still recognized.
func (h *Handler) GetQueue(ctx context.Context, req *connect.Request[queuev1.GetQueueRequest]) (*connect.Response[queuev1.Queue], error) {
	config, err := h.logStore.GetQueue(ctx, req.Msg.Name)
	if err != nil {
		if errors.Is(err, storage.ErrQueueNotFound) {
			return nil, connect.NewError(connect.CodeNotFound, err)
		}
		return nil, connect.NewError(connect.CodeInternal, err)
	}

	return connect.NewResponse(h.queueToProto(config)), nil
}
// ListQueues returns the queues known to the log store, restricted to names
// starting with the request's prefix when one is given.
//
// Store failures are reported as CodeInternal.
//
// NOTE(review): limit and page_token are not applied and next_page_token is
// never set; the full (filtered) list is returned in a single response.
func (h *Handler) ListQueues(ctx context.Context, req *connect.Request[queuev1.ListQueuesRequest]) (*connect.Response[queuev1.ListQueuesResponse], error) {
	configs, err := h.logStore.ListQueues(ctx)
	if err != nil {
		return nil, connect.NewError(connect.CodeInternal, err)
	}

	prefix := req.Msg.Prefix
	queues := make([]*queuev1.Queue, 0, len(configs))
	for i := range configs {
		// An empty prefix matches every name, so the unfiltered behavior is
		// preserved for callers that do not set it.
		if !strings.HasPrefix(configs[i].Name, prefix) {
			continue
		}
		queues = append(queues, h.queueToProto(&configs[i]))
	}

	return connect.NewResponse(&queuev1.ListQueuesResponse{
		Queues: queues,
	}), nil
}
// DeleteQueue removes the queue named in the request via the queue manager.
//
// A missing queue yields CodeNotFound; any other failure yields CodeInternal.
// The sentinel is matched with errors.Is so that a wrapped
// storage.ErrQueueNotFound is still recognized.
func (h *Handler) DeleteQueue(ctx context.Context, req *connect.Request[queuev1.DeleteQueueRequest]) (*connect.Response[emptypb.Empty], error) {
	if err := h.manager.DeleteQueue(ctx, req.Msg.Name); err != nil {
		if errors.Is(err, storage.ErrQueueNotFound) {
			return nil, connect.NewError(connect.CodeNotFound, err)
		}
		return nil, connect.NewError(connect.CodeInternal, err)
	}

	return connect.NewResponse(&emptypb.Empty{}), nil
}
// UpdateQueue looks up the queue named in the request and returns its current
// configuration.
//
// A missing queue yields CodeNotFound; any other store failure yields
// CodeInternal. The sentinel is matched with errors.Is so that a wrapped
// storage.ErrQueueNotFound is still recognized.
//
// NOTE(review): the request's config is not applied — nothing is written
// back to the store, so this currently behaves like GetQueue. Confirm whether
// that is an intentional placeholder.
func (h *Handler) UpdateQueue(ctx context.Context, req *connect.Request[queuev1.UpdateQueueRequest]) (*connect.Response[queuev1.Queue], error) {
	config, err := h.logStore.GetQueue(ctx, req.Msg.Name)
	if err != nil {
		if errors.Is(err, storage.ErrQueueNotFound) {
			return nil, connect.NewError(connect.CodeNotFound, err)
		}
		return nil, connect.NewError(connect.CodeInternal, err)
	}

	return connect.NewResponse(h.queueToProto(config)), nil
}
// --- Append Operations ---
// Append enqueues a single message and reports the offset it was stored at.
//
// Request headers are converted to string properties, and a non-empty
// partition key is forwarded as the "partition-key" property. After the
// enqueue, the partition is derived from the explicit partition_id (if set)
// or from the partition key, and the offset is computed as that partition's
// tail minus one.
//
// Errors: enqueue and store failures yield CodeInternal; a queue that cannot
// be found after the enqueue yields CodeNotFound. A failure to read the tail
// is returned as an error instead of producing a bogus offset.
//
// NOTE(review): the offset is derived from the tail after the fact, so it
// can be wrong under concurrent appends, and an explicit partition_id only
// affects the reported partition — it is not passed to Enqueue. msg.Key is
// not forwarded either.
func (h *Handler) Append(ctx context.Context, req *connect.Request[queuev1.AppendRequest]) (*connect.Response[queuev1.AppendResponse], error) {
	msg := req.Msg

	properties := make(map[string]string, len(msg.Headers)+1)
	for k, v := range msg.Headers {
		properties[k] = string(v)
	}
	if len(msg.PartitionKey) > 0 {
		properties["partition-key"] = string(msg.PartitionKey)
	}

	if err := h.manager.Enqueue(ctx, msg.QueueName, msg.Value, properties); err != nil {
		return nil, connect.NewError(connect.CodeInternal, err)
	}

	config, err := h.logStore.GetQueue(ctx, msg.QueueName)
	if err != nil {
		if errors.Is(err, storage.ErrQueueNotFound) {
			return nil, connect.NewError(connect.CodeNotFound, err)
		}
		return nil, connect.NewError(connect.CodeInternal, err)
	}

	partitionID := h.getPartitionID(string(msg.PartitionKey), config.Partitions)
	if msg.PartitionId != nil {
		partitionID = int(*msg.PartitionId)
	}

	tail, err := h.logStore.Tail(ctx, msg.QueueName, partitionID)
	if err != nil {
		return nil, connect.NewError(connect.CodeInternal,
			fmt.Errorf("message enqueued but resolving its offset failed: %w", err))
	}

	// Guard the subtraction: tail is unsigned, so 0 - 1 would wrap around.
	var offset uint64
	if tail > 0 {
		offset = tail - 1
	}

	return connect.NewResponse(&queuev1.AppendResponse{
		Offset:      offset,
		PartitionId: uint32(partitionID),
		Timestamp:   timestamppb.Now(),
	}), nil
}
// AppendBatch enqueues every message of the request and reports the first and
// last offsets observed, the target partition and the number of messages.
//
// The partition is derived once, from the explicit partition_id (if set) or
// from the partition key. As in Append, a non-empty partition key is forwarded
// to Enqueue as the "partition-key" property so the messages are routed by the
// same key the reported partition was computed from.
//
// Errors: an unknown queue yields CodeNotFound; enqueue, store and tail
// failures yield CodeInternal. Messages are enqueued one by one, so a failure
// part-way through leaves the earlier messages appended.
//
// NOTE(review): offsets are derived from the partition tail after each
// enqueue, so they can be wrong under concurrent appends. An explicit
// partition_id only affects the reported partition; it is not passed to
// Enqueue.
func (h *Handler) AppendBatch(ctx context.Context, req *connect.Request[queuev1.AppendBatchRequest]) (*connect.Response[queuev1.AppendBatchResponse], error) {
	msg := req.Msg

	config, err := h.logStore.GetQueue(ctx, msg.QueueName)
	if err != nil {
		if errors.Is(err, storage.ErrQueueNotFound) {
			return nil, connect.NewError(connect.CodeNotFound, err)
		}
		return nil, connect.NewError(connect.CodeInternal, err)
	}

	partitionID := h.getPartitionID(string(msg.PartitionKey), config.Partitions)
	if msg.PartitionId != nil {
		partitionID = int(*msg.PartitionId)
	}

	var firstOffset, lastOffset uint64
	var count uint32

	for i, entry := range msg.Messages {
		properties := make(map[string]string, len(entry.Headers)+1)
		for k, v := range entry.Headers {
			properties[k] = string(v)
		}
		if len(msg.PartitionKey) > 0 {
			properties["partition-key"] = string(msg.PartitionKey)
		}

		if err := h.manager.Enqueue(ctx, msg.QueueName, entry.Value, properties); err != nil {
			return nil, connect.NewError(connect.CodeInternal, err)
		}

		tail, err := h.logStore.Tail(ctx, msg.QueueName, partitionID)
		if err != nil {
			return nil, connect.NewError(connect.CodeInternal,
				fmt.Errorf("message %d enqueued but resolving its offset failed: %w", i, err))
		}

		// Guard the subtraction: tail is unsigned, so 0 - 1 would wrap around.
		var offset uint64
		if tail > 0 {
			offset = tail - 1
		}

		if i == 0 {
			firstOffset = offset
		}
		lastOffset = offset
		count++
	}

	return connect.NewResponse(&queuev1.AppendBatchResponse{
		FirstOffset: firstOffset,
		LastOffset:  lastOffset,
		PartitionId: uint32(partitionID),
		Count:       count,
		Timestamp:   timestamppb.Now(),
	}), nil
}
// AppendStream enqueues each message received on the client stream and
// returns a summary once the stream ends.
//
// The stream is best-effort: a message that fails to enqueue is logged and
// skipped instead of aborting the stream. Count reflects the messages that
// were actually enqueued. FirstOffset, LastOffset and PartitionId describe the
// first and last messages whose offset could be resolved; a message whose
// offset lookup fails is still counted but does not update them.
//
// A stream-level receive error is returned as CodeInternal.
//
// NOTE(review): offsets are derived from the partition tail after each
// enqueue, so they can be wrong under concurrent appends. An explicit
// partition_id only affects the reported partition; it is not passed to
// Enqueue.
func (h *Handler) AppendStream(ctx context.Context, stream *connect.ClientStream[queuev1.AppendRequest]) (*connect.Response[queuev1.AppendBatchResponse], error) {
	var firstOffset, lastOffset uint64
	var count uint32
	var lastPartitionID uint32
	first := true

	for stream.Receive() {
		msg := stream.Msg()

		properties := make(map[string]string, len(msg.Headers)+1)
		for k, v := range msg.Headers {
			properties[k] = string(v)
		}
		if len(msg.PartitionKey) > 0 {
			properties["partition-key"] = string(msg.PartitionKey)
		}

		if err := h.manager.Enqueue(ctx, msg.QueueName, msg.Value, properties); err != nil {
			h.logger.Warn("AppendStream: message dropped, enqueue failed",
				slog.String("queue", msg.QueueName),
				slog.String("error", err.Error()))
			continue
		}
		// The message is stored from here on, so count it even if the offset
		// lookup below fails.
		count++

		config, err := h.logStore.GetQueue(ctx, msg.QueueName)
		if err != nil {
			h.logger.Warn("AppendStream: offset unknown, queue lookup failed",
				slog.String("queue", msg.QueueName),
				slog.String("error", err.Error()))
			continue
		}

		partitionID := h.getPartitionID(string(msg.PartitionKey), config.Partitions)
		if msg.PartitionId != nil {
			partitionID = int(*msg.PartitionId)
		}

		tail, err := h.logStore.Tail(ctx, msg.QueueName, partitionID)
		if err != nil {
			h.logger.Warn("AppendStream: offset unknown, tail lookup failed",
				slog.String("queue", msg.QueueName),
				slog.Int("partition", partitionID),
				slog.String("error", err.Error()))
			continue
		}
		if tail == 0 {
			// tail is unsigned; tail - 1 would wrap around to a bogus offset.
			continue
		}
		offset := tail - 1

		if first {
			firstOffset = offset
			first = false
		}
		lastOffset = offset
		lastPartitionID = uint32(partitionID)
	}

	if err := stream.Err(); err != nil {
		return nil, connect.NewError(connect.CodeInternal, err)
	}

	return connect.NewResponse(&queuev1.AppendBatchResponse{
		FirstOffset: firstOffset,
		LastOffset:  lastOffset,
		PartitionId: lastPartitionID,
		Count:       count,
		Timestamp:   timestamppb.Now(),
	}), nil
}
// --- Read Operations ---
// Read fetches the single message stored at the requested queue, partition
// and offset.
//
// An offset outside the partition yields CodeOutOfRange; any other store
// failure yields CodeInternal. The sentinel is matched with errors.Is so that
// a wrapped storage.ErrOffsetOutOfRange is still recognized.
func (h *Handler) Read(ctx context.Context, req *connect.Request[queuev1.ReadRequest]) (*connect.Response[queuev1.Message], error) {
	msg := req.Msg

	message, err := h.logStore.Read(ctx, msg.QueueName, int(msg.PartitionId), msg.Offset)
	if err != nil {
		if errors.Is(err, storage.ErrOffsetOutOfRange) {
			return nil, connect.NewError(connect.CodeOutOfRange, err)
		}
		return nil, connect.NewError(connect.CodeInternal, err)
	}

	return connect.NewResponse(h.messageToProto(message)), nil
}
// ReadBatch fetches up to limit messages (default 100) from a partition,
// starting at start_offset.
//
// The response carries the pagination fields: next_offset is one past the last
// returned message (or start_offset when nothing was returned), and
// end_of_partition is true when next_offset has reached the partition tail.
// If the tail cannot be read, end_of_partition is left false and the failure
// is logged, since the messages themselves were read successfully.
//
// An out-of-range start offset yields CodeOutOfRange, consistent with Read;
// any other store failure yields CodeInternal.
//
// NOTE(review): max_bytes is not applied.
func (h *Handler) ReadBatch(ctx context.Context, req *connect.Request[queuev1.ReadBatchRequest]) (*connect.Response[queuev1.ReadBatchResponse], error) {
	msg := req.Msg
	partitionID := int(msg.PartitionId)

	limit := int(msg.Limit)
	if limit == 0 {
		limit = 100
	}

	messages, err := h.logStore.ReadBatch(ctx, msg.QueueName, partitionID, msg.StartOffset, limit)
	if err != nil {
		if errors.Is(err, storage.ErrOffsetOutOfRange) {
			return nil, connect.NewError(connect.CodeOutOfRange, err)
		}
		return nil, connect.NewError(connect.CodeInternal, err)
	}

	protoMsgs := make([]*queuev1.Message, len(messages))
	for i, m := range messages {
		protoMsgs[i] = h.messageToProto(m)
	}

	nextOffset := msg.StartOffset
	if n := len(messages); n > 0 {
		nextOffset = messages[n-1].Sequence + 1
	}

	endOfPartition := false
	if tail, err := h.logStore.Tail(ctx, msg.QueueName, partitionID); err != nil {
		h.logger.Warn("ReadBatch: cannot determine end of partition, tail lookup failed",
			slog.String("queue", msg.QueueName),
			slog.Int("partition", partitionID),
			slog.String("error", err.Error()))
	} else {
		endOfPartition = nextOffset >= tail
	}

	return connect.NewResponse(&queuev1.ReadBatchResponse{
		Messages:       protoMsgs,
		NextOffset:     nextOffset,
		EndOfPartition: endOfPartition,
	}), nil
}
// Tail streams messages from a partition to the client, polling the log store
// for new messages until the request context is cancelled.
//
// The starting position honors start_from: START_FROM_EARLIEST begins at the
// partition head and START_FROM_LATEST at the partition tail. Any other value
// (including the unspecified zero value) begins at start_offset, which is what
// callers that never set start_from already get.
//
// Messages are read in batches of 10. When a read returns nothing, or reports
// storage.ErrOffsetOutOfRange, the handler waits 100ms before polling again;
// the wait ends early if the context is cancelled.
//
// Returns nil when the context is cancelled, the stream's own error if a send
// fails, and CodeInternal for any other store failure.
func (h *Handler) Tail(ctx context.Context, req *connect.Request[queuev1.TailRequest], stream *connect.ServerStream[queuev1.Message]) error {
	const (
		batchSize    = 10
		pollInterval = 100 * time.Millisecond
	)

	msg := req.Msg
	partitionID := int(msg.PartitionId)

	offset := msg.StartOffset
	switch msg.StartFrom {
	case queuev1.TailRequest_START_FROM_EARLIEST:
		head, err := h.logStore.Head(ctx, msg.QueueName, partitionID)
		if err != nil {
			return connect.NewError(connect.CodeInternal, err)
		}
		offset = head
	case queuev1.TailRequest_START_FROM_LATEST:
		tail, err := h.logStore.Tail(ctx, msg.QueueName, partitionID)
		if err != nil {
			return connect.NewError(connect.CodeInternal, err)
		}
		offset = tail
	}

	// wait pauses for one poll interval and reports false if the context was
	// cancelled first, so a disconnecting client is noticed immediately.
	wait := func() bool {
		timer := time.NewTimer(pollInterval)
		defer timer.Stop()
		select {
		case <-ctx.Done():
			return false
		case <-timer.C:
			return true
		}
	}

	for {
		if ctx.Err() != nil {
			return nil
		}

		messages, err := h.logStore.ReadBatch(ctx, msg.QueueName, partitionID, offset, batchSize)
		if err != nil {
			if ctx.Err() != nil {
				// The read failed because the client went away; not an error.
				return nil
			}
			if !errors.Is(err, storage.ErrOffsetOutOfRange) {
				return connect.NewError(connect.CodeInternal, err)
			}
			// Out of range is treated as "nothing to read yet".
			messages = nil
		}

		for _, m := range messages {
			if err := stream.Send(h.messageToProto(m)); err != nil {
				return err
			}
			offset = m.Sequence + 1
		}

		if len(messages) == 0 && !wait() {
			return nil
		}
	}
}
// --- Seek Operations ---
// SeekToOffset resolves the requested offset against the partition's bounds.
//
// The partition head and tail are read from the log store. An offset below
// the head is raised to the head, and the result is then capped at the tail.
// Only the offset and partition ID are set in the response. A failure reading
// either bound is reported as CodeInternal.
func (h *Handler) SeekToOffset(ctx context.Context, req *connect.Request[queuev1.SeekToOffsetRequest]) (*connect.Response[queuev1.SeekResponse], error) {
	in := req.Msg
	partition := int(in.PartitionId)

	lowest, err := h.logStore.Head(ctx, in.QueueName, partition)
	if err != nil {
		return nil, connect.NewError(connect.CodeInternal, err)
	}

	highest, err := h.logStore.Tail(ctx, in.QueueName, partition)
	if err != nil {
		return nil, connect.NewError(connect.CodeInternal, err)
	}

	// Raise to the head first, then cap at the tail.
	resolved := min(max(in.Offset, lowest), highest)

	resp := &queuev1.SeekResponse{
		Offset:      resolved,
		PartitionId: in.PartitionId,
	}
	return connect.NewResponse(resp), nil
}
// SeekToTimestamp answers with the partition's head offset.
//
// A failure reading the head is reported as CodeInternal.
//
// NOTE(review): the request's timestamp is not consulted — the head offset is
// returned regardless. Confirm whether this is an intentional placeholder.
func (h *Handler) SeekToTimestamp(ctx context.Context, req *connect.Request[queuev1.SeekToTimestampRequest]) (*connect.Response[queuev1.SeekResponse], error) {
	in := req.Msg

	earliest, err := h.logStore.Head(ctx, in.QueueName, int(in.PartitionId))
	if err != nil {
		return nil, connect.NewError(connect.CodeInternal, err)
	}

	resp := &queuev1.SeekResponse{
		PartitionId: in.PartitionId,
		Offset:      earliest,
	}
	return connect.NewResponse(resp), nil
}
// --- Consumer Group Operations ---
// CreateConsumerGroup creates a consumer group on an existing queue.
//
// The new group has no consumers and an empty pending-entry list. One cursor
// is created per partition of the queue, with both Cursor and Committed set to
// that partition's current head offset.
//
// Errors:
//   - CodeNotFound when the queue lookup returns storage.ErrQueueNotFound.
//   - CodeAlreadyExists when the group store returns
//     storage.ErrConsumerGroupExists.
//   - CodeInternal for any other failure, including a failure to read a
//     partition head. The head error is surfaced rather than discarded so a
//     group is never persisted with a cursor derived from a failed lookup.
func (h *Handler) CreateConsumerGroup(ctx context.Context, req *connect.Request[queuev1.CreateConsumerGroupRequest]) (*connect.Response[queuev1.ConsumerGroup], error) {
	msg := req.Msg

	config, err := h.logStore.GetQueue(ctx, msg.QueueName)
	if err != nil {
		if err == storage.ErrQueueNotFound {
			return nil, connect.NewError(connect.CodeNotFound, err)
		}
		return nil, connect.NewError(connect.CodeInternal, err)
	}

	group := &types.ConsumerGroupState{
		ID:        msg.GroupId,
		QueueName: msg.QueueName,
		Pattern:   "",
		Cursors:   make(map[int]*types.PartitionCursor),
		Consumers: make(map[string]*types.ConsumerInfo),
		PEL:       make(map[string][]*types.PendingEntry),
		CreatedAt: time.Now(),
	}

	for i := 0; i < config.Partitions; i++ {
		head, err := h.logStore.Head(ctx, msg.QueueName, i)
		if err != nil {
			return nil, connect.NewError(connect.CodeInternal, fmt.Errorf("reading head of partition %d: %w", i, err))
		}
		group.Cursors[i] = &types.PartitionCursor{
			PartitionID: i,
			Cursor:      head,
			Committed:   head,
		}
	}

	if err := h.groupStore.CreateConsumerGroup(ctx, group); err != nil {
		if err == storage.ErrConsumerGroupExists {
			return nil, connect.NewError(connect.CodeAlreadyExists, err)
		}
		return nil, connect.NewError(connect.CodeInternal, err)
	}

	return connect.NewResponse(h.groupToProto(group)), nil
}
// GetConsumerGroup looks up a consumer group by queue name and group ID and
// returns its protobuf representation.
//
// A store error equal to storage.ErrConsumerNotFound is mapped to CodeNotFound;
// every other store error is mapped to CodeInternal.
func (h *Handler) GetConsumerGroup(ctx context.Context, req *connect.Request[queuev1.GetConsumerGroupRequest]) (*connect.Response[queuev1.ConsumerGroup], error) {
	in := req.Msg

	state, err := h.groupStore.GetConsumerGroup(ctx, in.QueueName, in.GroupId)
	switch err {
	case nil:
		return connect.NewResponse(h.groupToProto(state)), nil
	case storage.ErrConsumerNotFound:
		return nil, connect.NewError(connect.CodeNotFound, err)
	default:
		return nil, connect.NewError(connect.CodeInternal, err)
	}
}
// ListConsumerGroups returns every consumer group the group store reports for
// the requested queue, converted to protobuf form in the order the store
// returned them. A store failure is reported as CodeInternal.
func (h *Handler) ListConsumerGroups(ctx context.Context, req *connect.Request[queuev1.ListConsumerGroupsRequest]) (*connect.Response[queuev1.ListConsumerGroupsResponse], error) {
	states, err := h.groupStore.ListConsumerGroups(ctx, req.Msg.QueueName)
	if err != nil {
		return nil, connect.NewError(connect.CodeInternal, err)
	}

	converted := make([]*queuev1.ConsumerGroup, 0, len(states))
	for _, state := range states {
		converted = append(converted, h.groupToProto(state))
	}

	out := &queuev1.ListConsumerGroupsResponse{Groups: converted}
	return connect.NewResponse(out), nil
}
// DeleteConsumerGroup asks the group store to delete the group identified by
// the request's queue name and group ID. Any store error is reported as
// CodeInternal; on success an empty response is returned.
func (h *Handler) DeleteConsumerGroup(ctx context.Context, req *connect.Request[queuev1.DeleteConsumerGroupRequest]) (*connect.Response[emptypb.Empty], error) {
	in := req.Msg

	err := h.groupStore.DeleteConsumerGroup(ctx, in.QueueName, in.GroupId)
	if err != nil {
		return nil, connect.NewError(connect.CodeInternal, err)
	}

	return connect.NewResponse(&emptypb.Empty{}), nil
}
// JoinGroup registers the calling consumer with a consumer group and reports
// its partition assignment.
//
// The consumer is registered with its ID used as both ID and ClientID, and
// with heartbeat and registration times set to the current time. The response
// lists every partition that has a cursor in the group, in map iteration order
// (unordered), and always carries generation ID 1.
//
// Failures from the group store are reported as CodeInternal.
func (h *Handler) JoinGroup(ctx context.Context, req *connect.Request[queuev1.JoinGroupRequest]) (*connect.Response[queuev1.JoinGroupResponse], error) {
	in := req.Msg

	member := &types.ConsumerInfo{
		ID:            in.ConsumerId,
		ClientID:      in.ConsumerId,
		LastHeartbeat: time.Now(),
		RegisteredAt:  time.Now(),
	}

	err := h.groupStore.RegisterConsumer(ctx, in.QueueName, in.GroupId, member)
	if err != nil {
		return nil, connect.NewError(connect.CodeInternal, err)
	}

	state, err := h.groupStore.GetConsumerGroup(ctx, in.QueueName, in.GroupId)
	if err != nil {
		return nil, connect.NewError(connect.CodeInternal, err)
	}

	partitions := make([]uint32, 0, len(state.Cursors))
	for id := range state.Cursors {
		partitions = append(partitions, uint32(id))
	}

	out := &queuev1.JoinGroupResponse{
		GenerationId:       1,
		AssignedPartitions: partitions,
	}
	return connect.NewResponse(out), nil
}
// LeaveGroup unregisters the given consumer from a consumer group. Any error
// from the group store is reported as CodeInternal; on success an empty
// response is returned.
func (h *Handler) LeaveGroup(ctx context.Context, req *connect.Request[queuev1.LeaveGroupRequest]) (*connect.Response[emptypb.Empty], error) {
	in := req.Msg

	err := h.groupStore.UnregisterConsumer(ctx, in.QueueName, in.GroupId, in.ConsumerId)
	if err != nil {
		return nil, connect.NewError(connect.CodeInternal, err)
	}

	return connect.NewResponse(&emptypb.Empty{}), nil
}
// Heartbeat records that a consumer is still alive.
//
// The group is fetched from the group store and the consumer is looked up in
// its Consumers map: a missing consumer yields CodeNotFound, a failed group
// lookup yields CodeInternal. The response always has ShouldRejoin set to
// false.
//
// NOTE(review): the new LastHeartbeat is set on the ConsumerInfo inside the
// fetched group value and no store write follows. Whether the update survives
// this call depends on what GetConsumerGroup returns — confirm.
func (h *Handler) Heartbeat(ctx context.Context, req *connect.Request[queuev1.HeartbeatRequest]) (*connect.Response[queuev1.HeartbeatResponse], error) {
	in := req.Msg

	state, err := h.groupStore.GetConsumerGroup(ctx, in.QueueName, in.GroupId)
	if err != nil {
		return nil, connect.NewError(connect.CodeInternal, err)
	}

	member, ok := state.Consumers[in.ConsumerId]
	if !ok {
		return nil, connect.NewError(connect.CodeNotFound, fmt.Errorf("consumer not found"))
	}
	member.LastHeartbeat = time.Now()

	out := &queuev1.HeartbeatResponse{ShouldRejoin: false}
	return connect.NewResponse(out), nil
}
// --- Consume Operations ---
// Consume returns up to MaxMessages messages (10 when MaxMessages is zero) for
// a consumer in a group, reading from each partition at the group's cursor.
//
// Partitions are visited in map iteration order (unordered) until the limit is
// reached. For every message read, a pending entry owned by the requesting
// consumer is handed to the group store before the message is added to the
// response.
//
// Error handling is best-effort: a partition whose read fails is skipped, and
// the outcome of AddPendingEntry is not inspected. This handler does not
// advance the group's cursors. A failed group lookup is reported as
// CodeInternal.
func (h *Handler) Consume(ctx context.Context, req *connect.Request[queuev1.ConsumeRequest]) (*connect.Response[queuev1.ConsumeResponse], error) {
	in := req.Msg

	state, err := h.groupStore.GetConsumerGroup(ctx, in.QueueName, in.GroupId)
	if err != nil {
		return nil, connect.NewError(connect.CodeInternal, err)
	}

	quota := int(in.MaxMessages)
	if quota == 0 {
		quota = 10
	}

	var delivered []*queuev1.Message
	for partition, position := range state.Cursors {
		remaining := quota - len(delivered)
		batch, err := h.logStore.ReadBatch(ctx, in.QueueName, partition, position.Cursor, remaining)
		if err != nil {
			continue
		}

		for _, item := range batch {
			pending := &types.PendingEntry{
				PartitionID: partition,
				Offset:      item.Sequence,
				ConsumerID:  in.ConsumerId,
				ClaimedAt:   time.Now(),
			}
			h.groupStore.AddPendingEntry(ctx, in.QueueName, in.GroupId, pending)
			delivered = append(delivered, h.messageToProto(item))
		}

		if len(delivered) >= quota {
			break
		}
	}

	out := &queuev1.ConsumeResponse{Messages: delivered}
	return connect.NewResponse(out), nil
}
// ConsumeStream continuously delivers messages to a consumer in a group.
//
// Read positions are kept in a map local to this call, seeded from the group's
// cursors; the handler does not write advanced positions back to the group
// store. On every pass each partition is read in batches of up to batchSize
// messages. For each message a pending entry owned by the requesting consumer
// is handed to the group store (its outcome is not inspected) and the message
// is sent on the stream. Partitions whose read fails or returns nothing are
// skipped for that pass.
//
// When a full pass sends nothing, the handler waits pollInterval before the
// next pass. The wait is tied to ctx, so a cancelled stream releases the
// handler immediately instead of sleeping out the rest of the interval.
//
// It returns nil once ctx is done, the error from stream.Send if a send fails,
// and CodeInternal if the initial group lookup fails.
func (h *Handler) ConsumeStream(ctx context.Context, req *connect.Request[queuev1.ConsumeStreamRequest], stream *connect.ServerStream[queuev1.Message]) error {
	const (
		batchSize    = 10
		pollInterval = 100 * time.Millisecond
	)

	msg := req.Msg

	group, err := h.groupStore.GetConsumerGroup(ctx, msg.QueueName, msg.GroupId)
	if err != nil {
		return connect.NewError(connect.CodeInternal, err)
	}

	cursors := make(map[int]uint64)
	for partitionID, cursor := range group.Cursors {
		cursors[partitionID] = cursor.Cursor
	}

	for {
		if ctx.Err() != nil {
			return nil
		}

		sent := false
		for partitionID, offset := range cursors {
			messages, err := h.logStore.ReadBatch(ctx, msg.QueueName, partitionID, offset, batchSize)
			if err != nil || len(messages) == 0 {
				continue
			}

			for _, m := range messages {
				entry := &types.PendingEntry{
					PartitionID: partitionID,
					Offset:      m.Sequence,
					ConsumerID:  msg.ConsumerId,
					ClaimedAt:   time.Now(),
				}
				h.groupStore.AddPendingEntry(ctx, msg.QueueName, msg.GroupId, entry)

				if err := stream.Send(h.messageToProto(m)); err != nil {
					return err
				}
				cursors[partitionID] = m.Sequence + 1
				sent = true
			}
		}

		if !sent {
			select {
			case <-ctx.Done():
				return nil
			case <-time.After(pollInterval):
			}
		}
	}
}
// Ack acknowledges messages on behalf of a consumer.
//
// For every partition listed in the request, each offset's pending entry is
// removed from the group store, and the partition's committed position is then
// set to one past the highest offset in that list. The highest offset is
// computed explicitly rather than taken from the last element, so the
// committed position is correct even when the client sends offsets unsorted.
// Partitions with an empty offset list are skipped.
//
// The response reports how many pending entries were removed without error.
// Removal failures are not surfaced individually, and the outcome of
// UpdateCommitted is not inspected.
func (h *Handler) Ack(ctx context.Context, req *connect.Request[queuev1.AckRequest]) (*connect.Response[queuev1.AckResponse], error) {
	msg := req.Msg

	var acked uint32
	for _, partitionOffsets := range msg.Offsets {
		if len(partitionOffsets.Offsets) == 0 {
			continue
		}
		partitionID := int(partitionOffsets.PartitionId)

		highest := partitionOffsets.Offsets[0]
		for _, offset := range partitionOffsets.Offsets {
			if err := h.groupStore.RemovePendingEntry(ctx, msg.QueueName, msg.GroupId, msg.ConsumerId, partitionID, offset); err == nil {
				acked++
			}
			if offset > highest {
				highest = offset
			}
		}

		h.groupStore.UpdateCommitted(ctx, msg.QueueName, msg.GroupId, partitionID, highest+1)
	}

	return connect.NewResponse(&queuev1.AckResponse{
		AckedCount: acked,
	}), nil
}
// Nack removes the pending entry for every offset listed in the request, for
// the given consumer and group. Removal errors are discarded and the handler
// always returns an empty response; no committed position is updated.
func (h *Handler) Nack(ctx context.Context, req *connect.Request[queuev1.NackRequest]) (*connect.Response[emptypb.Empty], error) {
	in := req.Msg

	for _, perPartition := range in.Offsets {
		partition := int(perPartition.PartitionId)
		for _, off := range perPartition.Offsets {
			h.groupStore.RemovePendingEntry(ctx, in.QueueName, in.GroupId, in.ConsumerId, partition, off)
		}
	}

	return connect.NewResponse(&emptypb.Empty{}), nil
}
// Claim takes over pending entries held by other consumers in the group and
// returns the corresponding messages, up to Limit (10 when Limit is zero).
//
// An entry is eligible when it belongs to a different consumer, has been held
// for at least MinIdleTime (zero when unset) and, if the request names a
// partition, belongs to that partition. Entries whose message cannot be read
// from the log are skipped. A claimed entry gets the caller as owner, a fresh
// ClaimedAt and an incremented DeliveryCount.
//
// NOTE(review): the entry fields are changed on the group value returned by
// GetConsumerGroup and no store write follows — confirm that this persists.
//
// A failed group lookup is reported as CodeInternal.
func (h *Handler) Claim(ctx context.Context, req *connect.Request[queuev1.ClaimRequest]) (*connect.Response[queuev1.ClaimResponse], error) {
	in := req.Msg

	quota := int(in.Limit)
	if quota == 0 {
		quota = 10
	}

	var taken []*queuev1.Message

	state, err := h.groupStore.GetConsumerGroup(ctx, in.QueueName, in.GroupId)
	if err != nil {
		return nil, connect.NewError(connect.CodeInternal, err)
	}

	var minIdle time.Duration
	if in.MinIdleTime != nil {
		minIdle = in.MinIdleTime.AsDuration()
	}

	for _, pending := range state.PEL {
		for _, candidate := range pending {
			// Skip entries the caller already owns, entries that have not been
			// idle long enough, and entries outside the requested partition.
			if candidate.ConsumerID == in.ConsumerId ||
				time.Since(candidate.ClaimedAt) < minIdle ||
				(in.PartitionId != nil && candidate.PartitionID != int(*in.PartitionId)) {
				continue
			}

			stored, err := h.logStore.Read(ctx, in.QueueName, candidate.PartitionID, candidate.Offset)
			if err != nil {
				continue
			}

			candidate.ConsumerID = in.ConsumerId
			candidate.ClaimedAt = time.Now()
			candidate.DeliveryCount++
			taken = append(taken, h.messageToProto(stored))

			if len(taken) >= quota {
				break
			}
		}
		if len(taken) >= quota {
			break
		}
	}

	out := &queuev1.ClaimResponse{Messages: taken}
	return connect.NewResponse(out), nil
}
// GetPending lists pending entries of a consumer group. When the request
// carries a consumer ID only that consumer's entries are fetched; otherwise
// all pending entries of the group are fetched. Each entry's ClaimedAt is
// reported as DeliveredAt. A store failure is reported as CodeInternal.
func (h *Handler) GetPending(ctx context.Context, req *connect.Request[queuev1.GetPendingRequest]) (*connect.Response[queuev1.GetPendingResponse], error) {
	in := req.Msg

	var (
		pending []*types.PendingEntry
		err     error
	)
	if in.ConsumerId == "" {
		pending, err = h.groupStore.GetAllPendingEntries(ctx, in.QueueName, in.GroupId)
	} else {
		pending, err = h.groupStore.GetPendingEntries(ctx, in.QueueName, in.GroupId, in.ConsumerId)
	}
	if err != nil {
		return nil, connect.NewError(connect.CodeInternal, err)
	}

	converted := make([]*queuev1.PendingEntry, 0, len(pending))
	for _, p := range pending {
		converted = append(converted, &queuev1.PendingEntry{
			PartitionId:   uint32(p.PartitionID),
			Offset:        p.Offset,
			ConsumerId:    p.ConsumerID,
			DeliveredAt:   timestamppb.New(p.ClaimedAt),
			DeliveryCount: uint32(p.DeliveryCount),
		})
	}

	out := &queuev1.GetPendingResponse{Entries: converted}
	return connect.NewResponse(out), nil
}
// --- Partition Info ---
// GetPartitionInfo reports the head offset, tail offset and message count of
// one partition of a queue.
//
// A failure of any of the three lookups is reported as CodeInternal. The count
// lookup is checked like the other two so that a storage failure is never
// presented to the caller as a valid message count.
func (h *Handler) GetPartitionInfo(ctx context.Context, req *connect.Request[queuev1.GetPartitionInfoRequest]) (*connect.Response[queuev1.PartitionInfo], error) {
	msg := req.Msg
	partitionID := int(msg.PartitionId)

	head, err := h.logStore.Head(ctx, msg.QueueName, partitionID)
	if err != nil {
		return nil, connect.NewError(connect.CodeInternal, err)
	}

	tail, err := h.logStore.Tail(ctx, msg.QueueName, partitionID)
	if err != nil {
		return nil, connect.NewError(connect.CodeInternal, err)
	}

	count, err := h.logStore.Count(ctx, msg.QueueName, partitionID)
	if err != nil {
		return nil, connect.NewError(connect.CodeInternal, err)
	}

	return connect.NewResponse(&queuev1.PartitionInfo{
		PartitionId:  msg.PartitionId,
		HeadOffset:   head,
		TailOffset:   tail,
		MessageCount: count,
	}), nil
}
// ListPartitions reports head offset, tail offset and message count for every
// partition of a queue, indexed by partition ID.
//
// A failed queue lookup is reported as CodeInternal. The per-partition lookups
// are best-effort: their errors are discarded and whatever value accompanied
// the error is reported as-is.
func (h *Handler) ListPartitions(ctx context.Context, req *connect.Request[queuev1.ListPartitionsRequest]) (*connect.Response[queuev1.ListPartitionsResponse], error) {
	in := req.Msg

	queue, err := h.logStore.GetQueue(ctx, in.QueueName)
	if err != nil {
		return nil, connect.NewError(connect.CodeInternal, err)
	}

	infos := make([]*queuev1.PartitionInfo, queue.Partitions)
	for id := range infos {
		first, _ := h.logStore.Head(ctx, in.QueueName, id)
		last, _ := h.logStore.Tail(ctx, in.QueueName, id)
		total, _ := h.logStore.Count(ctx, in.QueueName, id)

		infos[id] = &queuev1.PartitionInfo{
			PartitionId:  uint32(id),
			HeadOffset:   first,
			TailOffset:   last,
			MessageCount: total,
		}
	}

	out := &queuev1.ListPartitionsResponse{Partitions: infos}
	return connect.NewResponse(out), nil
}
// --- Stats ---
// GetStats reports the total message count of a queue together with head
// offset, tail offset and message count for each of its partitions.
//
// A failed queue lookup is reported as CodeInternal. The total-count and
// per-partition lookups are best-effort: their errors are discarded and
// whatever value accompanied the error is reported as-is.
func (h *Handler) GetStats(ctx context.Context, req *connect.Request[queuev1.GetStatsRequest]) (*connect.Response[queuev1.QueueStats], error) {
	in := req.Msg

	queue, err := h.logStore.GetQueue(ctx, in.QueueName)
	if err != nil {
		return nil, connect.NewError(connect.CodeInternal, err)
	}

	overall, _ := h.logStore.TotalCount(ctx, in.QueueName)

	perPartition := make([]*queuev1.PartitionStats, queue.Partitions)
	for id := range perPartition {
		first, _ := h.logStore.Head(ctx, in.QueueName, id)
		last, _ := h.logStore.Tail(ctx, in.QueueName, id)
		total, _ := h.logStore.Count(ctx, in.QueueName, id)

		perPartition[id] = &queuev1.PartitionStats{
			PartitionId:  uint32(id),
			HeadOffset:   first,
			TailOffset:   last,
			MessageCount: total,
		}
	}

	out := &queuev1.QueueStats{
		QueueName:     in.QueueName,
		Partitions:    perPartition,
		TotalMessages: overall,
	}
	return connect.NewResponse(out), nil
}
// --- Admin Operations ---
// Purge truncates a queue's partitions at their current tail offset and
// reports how many messages those partitions held beforehand.
//
// When the request names a partition only that partition is purged; otherwise
// every partition of the queue is purged in order of partition ID.
//
// Any failure — looking up the queue, counting a partition, reading its tail,
// or truncating it — is reported as CodeInternal, so a failed truncate is
// never reported as deleted messages. When purging all partitions, a failure
// stops the operation; partitions purged before the failure stay purged.
func (h *Handler) Purge(ctx context.Context, req *connect.Request[queuev1.PurgeRequest]) (*connect.Response[queuev1.PurgeResponse], error) {
	msg := req.Msg

	config, err := h.logStore.GetQueue(ctx, msg.QueueName)
	if err != nil {
		return nil, connect.NewError(connect.CodeInternal, err)
	}

	// purgePartition truncates one partition at its tail and returns the
	// message count observed just before the truncate.
	purgePartition := func(partitionID int) (uint64, error) {
		count, err := h.logStore.Count(ctx, msg.QueueName, partitionID)
		if err != nil {
			return 0, fmt.Errorf("counting partition %d: %w", partitionID, err)
		}
		tail, err := h.logStore.Tail(ctx, msg.QueueName, partitionID)
		if err != nil {
			return 0, fmt.Errorf("reading tail of partition %d: %w", partitionID, err)
		}
		if err := h.logStore.Truncate(ctx, msg.QueueName, partitionID, tail); err != nil {
			return 0, fmt.Errorf("truncating partition %d: %w", partitionID, err)
		}
		return count, nil
	}

	var purged uint64
	if msg.PartitionId != nil {
		count, err := purgePartition(int(*msg.PartitionId))
		if err != nil {
			return nil, connect.NewError(connect.CodeInternal, err)
		}
		purged = count
	} else {
		for i := 0; i < config.Partitions; i++ {
			count, err := purgePartition(i)
			if err != nil {
				return nil, connect.NewError(connect.CodeInternal, err)
			}
			purged += count
		}
	}

	return connect.NewResponse(&queuev1.PurgeResponse{
		MessagesDeleted: purged,
	}), nil
}
// Truncate forwards a truncate request for one partition to the log store,
// passing the request's MinOffset. Any store error is reported as
// CodeInternal; on success an empty response is returned.
func (h *Handler) Truncate(ctx context.Context, req *connect.Request[queuev1.TruncateRequest]) (*connect.Response[emptypb.Empty], error) {
	in := req.Msg

	err := h.logStore.Truncate(ctx, in.QueueName, int(in.PartitionId), in.MinOffset)
	if err != nil {
		return nil, connect.NewError(connect.CodeInternal, err)
	}

	return connect.NewResponse(&emptypb.Empty{}), nil
}
// --- Helper Functions ---
// queueToProto converts a stored queue configuration to its protobuf form.
// Only the name, the partition count and the message TTL (as the retention
// max age) are carried over.
func (h *Handler) queueToProto(config *types.QueueConfig) *queuev1.Queue {
	retention := &queuev1.RetentionConfig{
		MaxAge: durationpb.New(config.MessageTTL),
	}

	out := &queuev1.Queue{
		Name:       config.Name,
		Partitions: uint32(config.Partitions),
		Config:     &queuev1.QueueConfig{Retention: retention},
	}
	return out
}
// messageToProto converts a stored message to its protobuf form. The sequence
// becomes the offset, the payload becomes the value and the partition key
// becomes the key. Headers are populated from the message properties only when
// at least one property is present; otherwise Headers is left unset.
func (h *Handler) messageToProto(msg *types.Message) *queuev1.Message {
	out := &queuev1.Message{
		Offset:      msg.Sequence,
		PartitionId: uint32(msg.PartitionID),
		Timestamp:   timestamppb.New(msg.CreatedAt),
		Value:       msg.GetPayload(),
		Key:         []byte(msg.PartitionKey),
	}

	if len(msg.Properties) == 0 {
		return out
	}

	headers := make(map[string][]byte, len(msg.Properties))
	for name, value := range msg.Properties {
		headers[name] = []byte(value)
	}
	out.Headers = headers

	return out
}
// groupToProto converts consumer group state to its protobuf form. Consumers
// and cursors are emitted in map iteration order (unordered). PendingCount is
// the total number of entries across all lists in the group's PEL.
func (h *Handler) groupToProto(group *types.ConsumerGroupState) *queuev1.ConsumerGroup {
	var pending uint64
	for _, list := range group.PEL {
		pending += uint64(len(list))
	}

	members := make([]*queuev1.ConsumerInfo, 0, len(group.Consumers))
	for _, member := range group.Consumers {
		info := &queuev1.ConsumerInfo{
			ConsumerId:    member.ID,
			LastHeartbeat: timestamppb.New(member.LastHeartbeat),
		}
		members = append(members, info)
	}

	positions := make([]*queuev1.PartitionCursor, 0, len(group.Cursors))
	for id, pos := range group.Cursors {
		positions = append(positions, &queuev1.PartitionCursor{
			PartitionId: uint32(id),
			Cursor:      pos.Cursor,
			Committed:   pos.Committed,
		})
	}

	out := &queuev1.ConsumerGroup{
		GroupId:      group.ID,
		QueueName:    group.QueueName,
		Consumers:    members,
		Cursors:      positions,
		PendingCount: pending,
		CreatedAt:    timestamppb.New(group.CreatedAt),
	}
	return out
}
// getPartitionID maps a partition key onto a partition index in the range
// [0, partitions). It returns 0 when partitions is not positive or when the
// key is empty. Otherwise the key's bytes are folded into a 32-bit hash
// (XOR each byte, then multiply) and the hash is reduced modulo partitions.
func (h *Handler) getPartitionID(key string, partitions int) int {
	const (
		seed  uint32 = 2166136261
		prime uint32 = 16777619
	)

	if partitions <= 0 || key == "" {
		return 0
	}

	sum := seed
	for _, b := range []byte(key) {
		sum ^= uint32(b)
		sum *= prime
	}

	return int(sum % uint32(partitions))
}