Flatten keys for simpler etcd storage

Signed-off-by: dusan <borovcanindusan1@gmail.com>
This commit is contained in:
dusan
2026-02-10 20:14:24 +01:00
parent 25fa02b763
commit 8edb3e5e5d
6 changed files with 192 additions and 55 deletions
+3
View File
@@ -67,6 +67,9 @@ type SubscriptionRouter interface {
// RemoveSubscription removes a subscription for a client.
RemoveSubscription(ctx context.Context, clientID, filter string) error
// RemoveAllSubscriptions removes all subscriptions for a client in a single operation.
RemoveAllSubscriptions(ctx context.Context, clientID string) error
// GetSubscriptionsForClient returns all subscriptions for a specific client.
GetSubscriptionsForClient(ctx context.Context, clientID string) ([]*storage.Subscription, error)
+172 -48
View File
@@ -22,14 +22,17 @@ import (
)
const (
willPrefix = "/mqtt/wills/"
retainedPrefix = "/mqtt/retained/"
subscriptionsPrefix = "/mqtt/subscriptions/"
sessionsPrefix = "/mqtt/sessions/"
queueConsumersPrefix = "/mqtt/queue-consumers/"
// MQTT-specific prefixes
willPrefix = "/mqtt/wills/"
retainedPrefix = "/mqtt/retained/"
electionPrefix = "/mqtt/leader"
urlPrefix = "http://"
// Protocol-agnostic prefixes
subscriptionsPrefix = "/subscriptions/"
sessionsPrefix = "/sessions/"
queueConsumersPrefix = "/queue-consumers/"
electionPrefix = "/leader"
urlPrefix = "http://"
)
var (
@@ -69,6 +72,7 @@ type EtcdCluster struct {
// Local subscription cache for fast topic matching
subCache map[string]*storage.Subscription // key: clientID|filter
clientSubs map[string][]string // clientID → []cacheKey (reverse index)
subCacheMu sync.RWMutex
// Local session owner cache to avoid etcd roundtrips in RoutePublish
@@ -205,6 +209,7 @@ func NewEtcdCluster(cfg *EtcdConfig, localStore storage.Store, logger *slog.Logg
session: s,
logger: logger,
subCache: make(map[string]*storage.Subscription),
clientSubs: make(map[string][]string),
ownerCache: make(map[string]string),
retainedCache: make(map[string]*storage.Message),
localStore: localStore,
@@ -798,52 +803,158 @@ func (c *EtcdCluster) WatchSessionOwner(ctx context.Context, clientID string) <-
}
// AddSubscription adds a subscription to the cluster store.
// Uses read-modify-write with CAS to consolidate all client subscriptions in a single key.
func (c *EtcdCluster) AddSubscription(ctx context.Context, clientID, filter string, qos byte, opts storage.SubscribeOptions) error {
key := fmt.Sprintf("%s%s/%s", subscriptionsPrefix, clientID, filter)
key := subscriptionsPrefix + clientID
sub := &storage.Subscription{
newSub := storage.Subscription{
ClientID: clientID,
Filter: filter,
QoS: qos,
Options: opts,
}
data, err := json.Marshal(sub)
if err != nil {
return fmt.Errorf("failed to marshal subscription: %w", err)
}
for {
resp, err := c.client.Get(ctx, key)
if err != nil {
return fmt.Errorf("failed to get subscriptions: %w", err)
}
_, err = c.client.Put(ctx, key, string(data))
return err
var subs []storage.Subscription
var modRev int64
if len(resp.Kvs) > 0 {
modRev = resp.Kvs[0].ModRevision
if err := json.Unmarshal(resp.Kvs[0].Value, &subs); err != nil {
return fmt.Errorf("failed to unmarshal subscriptions: %w", err)
}
}
replaced := false
for i, s := range subs {
if s.Filter == filter {
subs[i] = newSub
replaced = true
break
}
}
if !replaced {
subs = append(subs, newSub)
}
data, err := json.Marshal(subs)
if err != nil {
return fmt.Errorf("failed to marshal subscriptions: %w", err)
}
var cmp clientv3.Cmp
if modRev == 0 {
cmp = clientv3.Compare(clientv3.CreateRevision(key), "=", 0)
} else {
cmp = clientv3.Compare(clientv3.ModRevision(key), "=", modRev)
}
txnResp, err := c.client.Txn(ctx).
If(cmp).
Then(clientv3.OpPut(key, string(data))).
Commit()
if err != nil {
return fmt.Errorf("failed to commit subscription: %w", err)
}
if txnResp.Succeeded {
return nil
}
}
}
// RemoveSubscription removes a subscription from the cluster store.
// Uses read-modify-write with CAS. Deletes the key if no subscriptions remain.
func (c *EtcdCluster) RemoveSubscription(ctx context.Context, clientID, filter string) error {
key := fmt.Sprintf("%s%s/%s", subscriptionsPrefix, clientID, filter)
key := subscriptionsPrefix + clientID
for {
resp, err := c.client.Get(ctx, key)
if err != nil {
return fmt.Errorf("failed to get subscriptions: %w", err)
}
if len(resp.Kvs) == 0 {
return nil
}
modRev := resp.Kvs[0].ModRevision
var subs []storage.Subscription
if err := json.Unmarshal(resp.Kvs[0].Value, &subs); err != nil {
return fmt.Errorf("failed to unmarshal subscriptions: %w", err)
}
idx := -1
for i, s := range subs {
if s.Filter == filter {
idx = i
break
}
}
if idx == -1 {
return nil
}
subs = append(subs[:idx], subs[idx+1:]...)
cmp := clientv3.Compare(clientv3.ModRevision(key), "=", modRev)
var op clientv3.Op
if len(subs) == 0 {
op = clientv3.OpDelete(key)
} else {
data, err := json.Marshal(subs)
if err != nil {
return fmt.Errorf("failed to marshal subscriptions: %w", err)
}
op = clientv3.OpPut(key, string(data))
}
txnResp, err := c.client.Txn(ctx).
If(cmp).
Then(op).
Commit()
if err != nil {
return fmt.Errorf("failed to commit subscription removal: %w", err)
}
if txnResp.Succeeded {
return nil
}
}
}
// RemoveAllSubscriptions removes all subscriptions for a client in a single DELETE.
func (c *EtcdCluster) RemoveAllSubscriptions(ctx context.Context, clientID string) error {
key := subscriptionsPrefix + clientID
_, err := c.client.Delete(ctx, key)
return err
}
// GetSubscriptionsForClient returns all subscriptions for a client.
func (c *EtcdCluster) GetSubscriptionsForClient(ctx context.Context, clientID string) ([]*storage.Subscription, error) {
prefix := subscriptionsPrefix + clientID + "/"
key := subscriptionsPrefix + clientID
resp, err := c.client.Get(ctx, prefix, clientv3.WithPrefix())
resp, err := c.client.Get(ctx, key)
if err != nil {
return nil, err
}
var subs []*storage.Subscription
for _, kv := range resp.Kvs {
var sub storage.Subscription
if err := json.Unmarshal(kv.Value, &sub); err != nil {
c.logger.Warn("failed to unmarshal subscription", slog.String("error", err.Error()))
continue
}
subs = append(subs, &sub)
if len(resp.Kvs) == 0 {
return nil, nil
}
return subs, nil
var subs []storage.Subscription
if err := json.Unmarshal(resp.Kvs[0].Value, &subs); err != nil {
return nil, fmt.Errorf("failed to unmarshal subscriptions: %w", err)
}
result := make([]*storage.Subscription, len(subs))
for i := range subs {
result[i] = &subs[i]
}
return result, nil
}
// GetSubscribersForTopic returns all subscriptions matching a topic.
@@ -1384,21 +1495,30 @@ func (c *EtcdCluster) loadSubscriptionCache() error {
return fmt.Errorf("failed to load subscriptions: %w", err)
}
fresh := make(map[string]*storage.Subscription, len(resp.Kvs))
fresh := make(map[string]*storage.Subscription)
freshClientSubs := make(map[string][]string)
for _, kv := range resp.Kvs {
var sub storage.Subscription
if err := json.Unmarshal(kv.Value, &sub); err != nil {
c.logger.Warn("failed to unmarshal subscription during cache load", slog.String("error", err.Error()))
clientID := strings.TrimPrefix(string(kv.Key), subscriptionsPrefix)
var subs []storage.Subscription
if err := json.Unmarshal(kv.Value, &subs); err != nil {
c.logger.Warn("failed to unmarshal subscriptions during cache load",
slog.String("client_id", clientID), slog.String("error", err.Error()))
continue
}
cacheKey := fmt.Sprintf("%s|%s", sub.ClientID, sub.Filter)
fresh[cacheKey] = &sub
for i := range subs {
cacheKey := subs[i].ClientID + "|" + subs[i].Filter
fresh[cacheKey] = &subs[i]
freshClientSubs[clientID] = append(freshClientSubs[clientID], cacheKey)
}
}
c.subCacheMu.Lock()
prevSize := len(c.subCache)
c.subCache = fresh
c.clientSubs = freshClientSubs
c.subCacheMu.Unlock()
if staleRemoved := prevSize - len(fresh); staleRemoved > 0 {
@@ -1448,7 +1568,6 @@ func (c *EtcdCluster) watchSubscriptions() {
if c.lifecycleCtx.Err() != nil {
return
}
// Watch channel closed, reload and restart
c.logger.Warn("subscription watch channel closed, reloading cache")
if err := c.loadSubscriptionCache(); err != nil {
c.logger.Error("failed to reload subscriptions", slog.String("error", err.Error()))
@@ -1465,24 +1584,29 @@ func (c *EtcdCluster) watchSubscriptions() {
c.subCacheMu.Lock()
for _, event := range watchResp.Events {
switch event.Type {
case clientv3.EventTypePut:
var sub storage.Subscription
if err := json.Unmarshal(event.Kv.Value, &sub); err != nil {
c.logger.Error("failed to unmarshal subscription in watch", slog.String("error", err.Error()))
clientID := strings.TrimPrefix(string(event.Kv.Key), subscriptionsPrefix)
// Purge all existing cache entries for this client
for _, ck := range c.clientSubs[clientID] {
delete(c.subCache, ck)
}
delete(c.clientSubs, clientID)
if event.Type == clientv3.EventTypePut {
var subs []storage.Subscription
if err := json.Unmarshal(event.Kv.Value, &subs); err != nil {
c.logger.Error("failed to unmarshal subscriptions in watch",
slog.String("client_id", clientID), slog.String("error", err.Error()))
continue
}
cacheKey := fmt.Sprintf("%s|%s", sub.ClientID, sub.Filter)
c.subCache[cacheKey] = &sub
case clientv3.EventTypeDelete:
key := string(event.Kv.Key)
parts := strings.Split(strings.TrimPrefix(key, subscriptionsPrefix), "/")
if len(parts) >= 2 {
cacheKey := fmt.Sprintf("%s|%s", parts[0], parts[1])
delete(c.subCache, cacheKey)
keys := make([]string, 0, len(subs))
for i := range subs {
ck := subs[i].ClientID + "|" + subs[i].Filter
c.subCache[ck] = &subs[i]
keys = append(keys, ck)
}
c.clientSubs[clientID] = keys
}
}
c.subCacheMu.Unlock()
+4
View File
@@ -63,6 +63,10 @@ func (n *NoopCluster) RemoveSubscription(ctx context.Context, clientID, filter s
return nil
}
func (n *NoopCluster) RemoveAllSubscriptions(ctx context.Context, clientID string) error {
return nil
}
func (n *NoopCluster) GetSubscriptionsForClient(ctx context.Context, clientID string) ([]*storage.Subscription, error) {
// Single-node: use local storage
return nil, ErrClusterNotEnabled
+5 -7
View File
@@ -209,7 +209,6 @@ func (b *Broker) destroySessionLocked(s *session.Session) error {
subs := s.GetSubscriptions()
for filter := range subs {
// Check if this is a shared subscription and clean up share groups
if topics.IsShared(filter) {
if b.sharedSubs.Unsubscribe(s.ID, filter) {
shareName, topicFilter, _ := topics.ParseShared(filter)
@@ -219,13 +218,12 @@ func (b *Broker) destroySessionLocked(s *session.Session) error {
} else {
b.router.Unsubscribe(s.ID, filter)
}
}
// Remove subscription from cluster
if b.cluster != nil {
ctx := context.Background()
if err := b.cluster.RemoveSubscription(ctx, s.ID, filter); err != nil {
b.logError("cluster_remove_subscription", err, slog.String("client_id", s.ID), slog.String("filter", filter))
}
if b.cluster != nil {
ctx := context.Background()
if err := b.cluster.RemoveAllSubscriptions(ctx, s.ID); err != nil {
b.logError("cluster_remove_all_subscriptions", err, slog.String("client_id", s.ID))
}
}
+4
View File
@@ -852,6 +852,10 @@ func (c *mockCluster) RemoveSubscription(ctx context.Context, clientID, filter s
return nil
}
func (c *mockCluster) RemoveAllSubscriptions(ctx context.Context, clientID string) error {
return nil
}
func (c *mockCluster) GetSubscriptionsForClient(ctx context.Context, clientID string) ([]*brokerstorage.Subscription, error) {
return nil, nil
}
+4
View File
@@ -57,6 +57,10 @@ func (m *mockCluster) RemoveSubscription(ctx context.Context, clientID, filter s
return nil
}
func (m *mockCluster) RemoveAllSubscriptions(ctx context.Context, clientID string) error {
return nil
}
func (m *mockCluster) GetSubscriptionsForClient(ctx context.Context, clientID string) ([]*storage.Subscription, error) {
return nil, nil
}