Fix error handling in auto queue create

Signed-off-by: dusan <borovcanindusan1@gmail.com>
This commit is contained in:
dusan
2026-02-08 20:43:57 +01:00
parent c03e3fbb31
commit 8d2e0d14d4
4 changed files with 203 additions and 0 deletions
+18
View File
@@ -172,8 +172,22 @@ func (a *Adapter) FindMatchingQueues(ctx context.Context, topic string) ([]strin
return a.topicIndex.FindMatching(topic), nil
}
func (a *Adapter) queueConfigExists(queueName string) error {
if _, err := a.queueStore.Get(queueName); err != nil {
if err == ErrQueueNotFound {
return storage.ErrQueueNotFound
}
return err
}
return nil
}
// Append adds a message to the end of a queue's log.
func (a *Adapter) Append(ctx context.Context, queueName string, msg *types.Message) (uint64, error) {
if err := a.queueConfigExists(queueName); err != nil {
return 0, err
}
value := msg.GetPayload()
key := []byte{}
@@ -197,6 +211,10 @@ func (a *Adapter) AppendBatch(ctx context.Context, queueName string, msgs []*typ
return 0, ErrEmptyBatch
}
if err := a.queueConfigExists(queueName); err != nil {
return 0, err
}
batch := NewBatch(0)
for _, msg := range msgs {
+14
View File
@@ -7,6 +7,7 @@ import (
"context"
"testing"
"github.com/absmach/fluxmq/queue/storage"
"github.com/absmach/fluxmq/queue/types"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
@@ -19,6 +20,8 @@ func TestAdapter_ReadBatch(t *testing.T) {
defer adapter.Close()
ctx := context.Background()
cfg := types.DefaultQueueConfig("q1", "$queue/q1/#")
require.NoError(t, adapter.CreateQueue(ctx, cfg))
msgs := []*types.Message{
{ID: "1", Topic: "t", Payload: []byte("a")},
@@ -49,6 +52,17 @@ func TestAdapter_ReadBatch(t *testing.T) {
assert.Len(t, got, 0)
}
func TestAdapter_AppendRequiresQueueConfig(t *testing.T) {
dir := t.TempDir()
adapter, err := NewAdapter(dir, DefaultAdapterConfig())
require.NoError(t, err)
defer adapter.Close()
ctx := context.Background()
_, err = adapter.Append(ctx, "missing", &types.Message{ID: "1", Topic: "$queue/missing", Payload: []byte("x")})
require.ErrorIs(t, err, storage.ErrQueueNotFound)
}
func TestAdapter_StreamCursorAndCommitDoNotRegress(t *testing.T) {
dir := t.TempDir()
adapter, err := NewAdapter(dir, DefaultAdapterConfig())
+26
View File
@@ -153,6 +153,15 @@ func (f *LogFSM) applyAppend(ctx context.Context, op *Operation) *ApplyResult {
}
offset, err := f.queueStore.Append(ctx, op.QueueName, op.Message)
if err == storage.ErrQueueNotFound {
if createErr := f.ensureQueueExists(ctx, op.QueueName); createErr != nil {
f.logger.Error("failed to auto-create queue for append",
slog.String("queue", op.QueueName),
slog.String("error", createErr.Error()))
return &ApplyResult{Error: createErr}
}
offset, err = f.queueStore.Append(ctx, op.QueueName, op.Message)
}
if err != nil {
f.logger.Error("failed to apply append",
slog.String("queue", op.QueueName),
@@ -175,6 +184,15 @@ func (f *LogFSM) applyAppendBatch(ctx context.Context, op *Operation) *ApplyResu
}
offset, err := f.queueStore.AppendBatch(ctx, op.QueueName, op.Messages)
if err == storage.ErrQueueNotFound {
if createErr := f.ensureQueueExists(ctx, op.QueueName); createErr != nil {
f.logger.Error("failed to auto-create queue for append batch",
slog.String("queue", op.QueueName),
slog.String("error", createErr.Error()))
return &ApplyResult{Error: createErr}
}
offset, err = f.queueStore.AppendBatch(ctx, op.QueueName, op.Messages)
}
if err != nil {
f.logger.Error("failed to apply append batch",
slog.String("queue", op.QueueName),
@@ -191,6 +209,14 @@ func (f *LogFSM) applyAppendBatch(ctx context.Context, op *Operation) *ApplyResu
return &ApplyResult{Offset: offset}
}
func (f *LogFSM) ensureQueueExists(ctx context.Context, queueName string) error {
cfg := types.DefaultEphemeralQueueConfig(queueName, "$queue/"+queueName+"/#")
if err := f.queueStore.CreateQueue(ctx, cfg); err != nil && err != storage.ErrQueueAlreadyExists {
return err
}
return nil
}
func (f *LogFSM) applyTruncate(ctx context.Context, op *Operation) *ApplyResult {
err := f.queueStore.Truncate(ctx, op.QueueName, op.MinOffset)
if err != nil {
+145
View File
@@ -0,0 +1,145 @@
// Copyright (c) Abstract Machines
// SPDX-License-Identifier: Apache-2.0
package raft
import (
"context"
"io"
"log/slog"
"testing"
"time"
"github.com/absmach/fluxmq/queue/storage"
memlog "github.com/absmach/fluxmq/queue/storage/memory/log"
"github.com/absmach/fluxmq/queue/types"
)
type noopGroupStore struct{}
func (noopGroupStore) CreateConsumerGroup(context.Context, *types.ConsumerGroup) error {
return nil
}
func (noopGroupStore) GetConsumerGroup(context.Context, string, string) (*types.ConsumerGroup, error) {
return nil, storage.ErrConsumerNotFound
}
func (noopGroupStore) UpdateConsumerGroup(context.Context, *types.ConsumerGroup) error {
return nil
}
func (noopGroupStore) DeleteConsumerGroup(context.Context, string, string) error {
return nil
}
func (noopGroupStore) ListConsumerGroups(context.Context, string) ([]*types.ConsumerGroup, error) {
return nil, nil
}
func (noopGroupStore) AddPendingEntry(context.Context, string, string, *types.PendingEntry) error {
return nil
}
func (noopGroupStore) RemovePendingEntry(context.Context, string, string, string, uint64) error {
return nil
}
func (noopGroupStore) GetPendingEntries(context.Context, string, string, string) ([]*types.PendingEntry, error) {
return nil, nil
}
func (noopGroupStore) GetAllPendingEntries(context.Context, string, string) ([]*types.PendingEntry, error) {
return nil, nil
}
func (noopGroupStore) TransferPendingEntry(context.Context, string, string, uint64, string, string) error {
return nil
}
func (noopGroupStore) UpdateCursor(context.Context, string, string, uint64) error {
return nil
}
func (noopGroupStore) UpdateCommitted(context.Context, string, string, uint64) error {
return nil
}
func (noopGroupStore) RegisterConsumer(context.Context, string, string, *types.ConsumerInfo) error {
return nil
}
func (noopGroupStore) UnregisterConsumer(context.Context, string, string, string) error {
return nil
}
func (noopGroupStore) ListConsumers(context.Context, string, string) ([]*types.ConsumerInfo, error) {
return nil, nil
}
func newTestLogFSM() (*LogFSM, *memlog.Store) {
queueStore := memlog.New()
logger := slog.New(slog.NewTextHandler(io.Discard, nil))
return NewLogFSM(queueStore, noopGroupStore{}, logger), queueStore
}
func TestLogFSM_ApplyAppendAutoCreatesMissingQueue(t *testing.T) {
fsm, store := newTestLogFSM()
ctx := context.Background()
queueName := "demo-events"
result := fsm.applyAppend(ctx, &Operation{
QueueName: queueName,
Message: &types.Message{
ID: "msg-1",
Topic: "$queue/" + queueName,
Payload: []byte("payload-1"),
State: types.StateQueued,
CreatedAt: time.Now(),
},
})
if result.Error != nil {
t.Fatalf("applyAppend returned error: %v", result.Error)
}
if result.Offset != 0 {
t.Fatalf("expected first offset to be 0, got %d", result.Offset)
}
if _, err := store.GetQueue(ctx, queueName); err != nil {
t.Fatalf("expected queue %q to be auto-created, got error: %v", queueName, err)
}
msg, err := store.Read(ctx, queueName, 0)
if err != nil {
t.Fatalf("expected appended message at offset 0, got error: %v", err)
}
if got := string(msg.GetPayload()); got != "payload-1" {
t.Fatalf("unexpected payload: %q", got)
}
}
func TestLogFSM_ApplyAppendBatchAutoCreatesMissingQueue(t *testing.T) {
fsm, store := newTestLogFSM()
ctx := context.Background()
queueName := "demo-batch"
result := fsm.applyAppendBatch(ctx, &Operation{
QueueName: queueName,
Messages: []*types.Message{
{
ID: "msg-1",
Topic: "$queue/" + queueName,
Payload: []byte("one"),
State: types.StateQueued,
CreatedAt: time.Now(),
},
{
ID: "msg-2",
Topic: "$queue/" + queueName,
Payload: []byte("two"),
State: types.StateQueued,
CreatedAt: time.Now(),
},
},
})
if result.Error != nil {
t.Fatalf("applyAppendBatch returned error: %v", result.Error)
}
if result.Offset != 0 {
t.Fatalf("expected first offset to be 0, got %d", result.Offset)
}
count, err := store.Count(ctx, queueName)
if err != nil {
t.Fatalf("count failed: %v", err)
}
if count != 2 {
t.Fatalf("expected 2 messages, got %d", count)
}
}