SMQ-2842 - Fix client telemetry handling in journal service (#2864)

Signed-off-by: Felix Gateru <felix.gateru@gmail.com>
This commit is contained in:
Felix Gateru
2025-05-19 17:50:43 +03:00
committed by GitHub
parent 16eff6bebb
commit 2b5a670bef
10 changed files with 96 additions and 65 deletions
-7
View File
@@ -148,13 +148,6 @@ func main() {
}
defer mpub.Close()
mpub, err = msgevents.NewPublisherMiddleware(ctx, mpub, cfg.ESURL)
if err != nil {
logger.Error(fmt.Sprintf("failed to create event store middleware: %s", err))
exitCode = 1
return
}
fwd := mqtt.NewForwarder(brokers.SubjectAllChannels, logger)
fwd = mqtttracing.New(serverConfig, tracer, fwd, brokers.SubjectAllChannels)
if err := fwd.Forward(ctx, svcName, bsub, mpub); err != nil {
+22 -7
View File
@@ -6,6 +6,7 @@ package events
import (
"context"
"errors"
"log/slog"
"time"
"github.com/absmach/supermq/journal"
@@ -13,7 +14,12 @@ import (
"github.com/absmach/supermq/pkg/events/store"
)
var ErrMissingOccurredAt = errors.New("missing occurred_at")
var (
ErrMissingOccurredAt = errors.New("missing occurred_at")
errMissingOperation = errors.New("missing operation")
errMissingAttributes = errors.New("missing attributes")
errMsg = "failed to save journal"
)
// Start method starts consuming messages received from Event store.
func Start(ctx context.Context, consumer string, sub events.Subscriber, service journal.Service) error {
@@ -35,22 +41,27 @@ func Handle(service journal.Service) handleFunc {
operation, ok := data["operation"].(string)
if !ok {
return errors.New("missing operation")
// Error is logged instead of being returned to avoid redelivering of the event.
slog.Error(errMsg, "error", errMissingOperation)
return nil
}
delete(data, "operation")
if operation == "" {
return errors.New("missing operation")
slog.Error(errMsg, "error", errMissingOperation)
return nil
}
occurredAt, ok := data["occurred_at"].(float64)
if !ok {
return ErrMissingOccurredAt
slog.Error(errMsg, "error", ErrMissingOccurredAt)
return nil
}
delete(data, "occurred_at")
if occurredAt == 0 {
return ErrMissingOccurredAt
slog.Error(errMsg, "error", ErrMissingOccurredAt)
return nil
}
metadata, ok := data["metadata"].(map[string]interface{})
@@ -60,7 +71,8 @@ func Handle(service journal.Service) handleFunc {
delete(data, "metadata")
if len(data) == 0 {
return errors.New("missing attributes")
slog.Error(errMsg, "error", errMissingAttributes)
return nil
}
j := journal.Journal{
@@ -69,8 +81,11 @@ func Handle(service journal.Service) handleFunc {
Attributes: data,
Metadata: metadata,
}
if err := service.Save(ctx, j); err != nil {
slog.Error(errMsg, "error", err)
}
return service.Save(ctx, j)
return nil
}
}
+8 -8
View File
@@ -94,7 +94,7 @@ func TestHandle(t *testing.T) {
"number": float64(rand.Intn(1000)),
"metadata": payload,
},
err: errors.New("missing operation"),
err: nil,
},
{
desc: "with empty operation",
@@ -106,7 +106,7 @@ func TestHandle(t *testing.T) {
"number": float64(rand.Intn(1000)),
"metadata": payload,
},
err: errors.New("missing operation"),
err: nil,
},
{
desc: "with invalid operation",
@@ -118,7 +118,7 @@ func TestHandle(t *testing.T) {
"number": float64(rand.Intn(1000)),
"metadata": payload,
},
err: errors.New("missing operation"),
err: nil,
},
{
desc: "with missing occurred_at",
@@ -129,7 +129,7 @@ func TestHandle(t *testing.T) {
"number": float64(rand.Intn(1000)),
"metadata": payload,
},
err: aevents.ErrMissingOccurredAt,
err: nil,
},
{
desc: "with empty occurred_at",
@@ -141,7 +141,7 @@ func TestHandle(t *testing.T) {
"number": float64(rand.Intn(1000)),
"metadata": payload,
},
err: aevents.ErrMissingOccurredAt,
err: nil,
},
{
desc: "with invalid occurred_at",
@@ -153,7 +153,7 @@ func TestHandle(t *testing.T) {
"number": float64(rand.Intn(1000)),
"metadata": payload,
},
err: aevents.ErrMissingOccurredAt,
err: nil,
},
{
desc: "with missing metadata",
@@ -197,7 +197,7 @@ func TestHandle(t *testing.T) {
"occurred_at": float64(time.Now().UnixNano()),
"metadata": payload,
},
err: errors.New("missing attributes"),
err: nil,
},
{
desc: "with empty attributes",
@@ -249,7 +249,7 @@ func TestHandle(t *testing.T) {
"metadata": payload,
},
repoErr: repoerr.ErrCreateEntity,
err: repoerr.ErrCreateEntity,
err: nil,
},
}
+1 -1
View File
@@ -181,7 +181,7 @@ type Repository interface {
RemoveSubscription(ctx context.Context, subscriberID string) error
// IncrementInboundMessages increments the inbound messages count for a client.
IncrementInboundMessages(ctx context.Context, clientID string) error
IncrementInboundMessages(ctx context.Context, ct ClientTelemetry) error
// IncrementOutboundMessages increments the outbound messages count for a client.
IncrementOutboundMessages(ctx context.Context, channelID, subtopic string) error
+10 -10
View File
@@ -190,16 +190,16 @@ func (_c *Repository_DeleteClientTelemetry_Call) RunAndReturn(run func(ctx conte
}
// IncrementInboundMessages provides a mock function for the type Repository
func (_mock *Repository) IncrementInboundMessages(ctx context.Context, clientID string) error {
ret := _mock.Called(ctx, clientID)
func (_mock *Repository) IncrementInboundMessages(ctx context.Context, ct journal.ClientTelemetry) error {
ret := _mock.Called(ctx, ct)
if len(ret) == 0 {
panic("no return value specified for IncrementInboundMessages")
}
var r0 error
if returnFunc, ok := ret.Get(0).(func(context.Context, string) error); ok {
r0 = returnFunc(ctx, clientID)
if returnFunc, ok := ret.Get(0).(func(context.Context, journal.ClientTelemetry) error); ok {
r0 = returnFunc(ctx, ct)
} else {
r0 = ret.Error(0)
}
@@ -213,14 +213,14 @@ type Repository_IncrementInboundMessages_Call struct {
// IncrementInboundMessages is a helper method to define mock.On call
// - ctx
// - clientID
func (_e *Repository_Expecter) IncrementInboundMessages(ctx interface{}, clientID interface{}) *Repository_IncrementInboundMessages_Call {
return &Repository_IncrementInboundMessages_Call{Call: _e.mock.On("IncrementInboundMessages", ctx, clientID)}
// - ct
func (_e *Repository_Expecter) IncrementInboundMessages(ctx interface{}, ct interface{}) *Repository_IncrementInboundMessages_Call {
return &Repository_IncrementInboundMessages_Call{Call: _e.mock.On("IncrementInboundMessages", ctx, ct)}
}
func (_c *Repository_IncrementInboundMessages_Call) Run(run func(ctx context.Context, clientID string)) *Repository_IncrementInboundMessages_Call {
func (_c *Repository_IncrementInboundMessages_Call) Run(run func(ctx context.Context, ct journal.ClientTelemetry)) *Repository_IncrementInboundMessages_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(context.Context), args[1].(string))
run(args[0].(context.Context), args[1].(journal.ClientTelemetry))
})
return _c
}
@@ -230,7 +230,7 @@ func (_c *Repository_IncrementInboundMessages_Call) Return(err error) *Repositor
return _c
}
func (_c *Repository_IncrementInboundMessages_Call) RunAndReturn(run func(ctx context.Context, clientID string) error) *Repository_IncrementInboundMessages_Call {
func (_c *Repository_IncrementInboundMessages_Call) RunAndReturn(run func(ctx context.Context, ct journal.ClientTelemetry) error) *Repository_IncrementInboundMessages_Call {
_c.Call.Return(run)
return _c
}
+15 -17
View File
@@ -46,6 +46,7 @@ func (repo *repository) DeleteClientTelemetry(ctx context.Context, clientID, dom
if rows, _ := result.RowsAffected(); rows == 0 {
return repoerr.ErrNotFound
}
return nil
}
@@ -81,19 +82,19 @@ func (repo *repository) RetrieveClientTelemetry(ctx context.Context, clientID, d
}
func (repo *repository) AddSubscription(ctx context.Context, sub journal.ClientSubscription) error {
q := `INSERT INTO subscriptions (id, subscriber_id, channel_id, subtopic, client_id)
VALUES (:id, :subscriber_id, :channel_id, :subtopic, :client_id);
q := `
INSERT INTO subscriptions (id, subscriber_id, channel_id, subtopic, client_id)
SELECT :id, :subscriber_id, :channel_id, :subtopic, :client_id
FROM clients_telemetry
WHERE client_id = :client_id
RETURNING id;
`
result, err := repo.db.NamedExecContext(ctx, q, sub)
_, err := repo.db.NamedExecContext(ctx, q, sub)
if err != nil {
return postgres.HandleError(repoerr.ErrUpdateEntity, err)
}
if rows, _ := result.RowsAffected(); rows == 0 {
return repoerr.ErrNotFound
}
return nil
}
@@ -127,18 +128,15 @@ func (repo *repository) RemoveSubscription(ctx context.Context, subscriberID str
return nil
}
func (repo *repository) IncrementInboundMessages(ctx context.Context, clientID string) error {
q := `
UPDATE clients_telemetry
SET inbound_messages = inbound_messages + 1,
last_seen = :last_seen
WHERE client_id = :client_id;
func (repo *repository) IncrementInboundMessages(ctx context.Context, ct journal.ClientTelemetry) error {
q := `INSERT INTO clients_telemetry (client_id,domain_id, inbound_messages,first_seen, last_seen)
VALUES (:client_id, :domain_id, 1, :first_seen, :last_seen)
ON CONFLICT (client_id)
DO UPDATE SET
inbound_messages = clients_telemetry.inbound_messages + 1,
last_seen = EXCLUDED.last_seen;
`
ct := journal.ClientTelemetry{
ClientID: clientID,
LastSeen: time.Now(),
}
dbct, err := toDBClientsTelemetry(ct)
if err != nil {
return errors.Wrap(repoerr.ErrUpdateEntity, err)
+36 -15
View File
@@ -25,6 +25,11 @@ const (
messagingUnsubscribe = "messaging.client_unsubscribe"
)
var (
errSaveJournal = errors.New("failed to save journal")
errHandleTelemetry = errors.New("failed to handle client telemetry")
)
type service struct {
idProvider supermq.IDProvider
repository Repository
@@ -45,10 +50,10 @@ func (svc *service) Save(ctx context.Context, journal Journal) error {
journal.ID = id
if err := svc.repository.Save(ctx, journal); err != nil {
return err
return errors.Wrap(errSaveJournal, err)
}
if err := svc.handleTelemetry(ctx, journal); err != nil {
return err
return errors.Wrap(errHandleTelemetry, err)
}
return nil
@@ -108,7 +113,7 @@ func (svc *service) handleTelemetry(ctx context.Context, journal Journal) error
}
func (svc *service) addClientTelemetry(ctx context.Context, journal Journal) error {
ce, err := toClientEvent(journal)
ce, err := toClientEvent(journal, true)
if err != nil {
return err
}
@@ -122,7 +127,7 @@ func (svc *service) addClientTelemetry(ctx context.Context, journal Journal) err
}
func (svc *service) removeClientTelemetry(ctx context.Context, journal Journal) error {
ce, err := toClientEvent(journal)
ce, err := toClientEvent(journal, false)
if err != nil {
return err
}
@@ -201,7 +206,14 @@ func (svc *service) updateMessageCount(ctx context.Context, journal Journal) err
if err != nil {
return err
}
if err := svc.repository.IncrementInboundMessages(ctx, ae.clientID); err != nil {
ct := ClientTelemetry{
ClientID: ae.clientID,
DomainID: ae.domainID,
FirstSeen: ae.occurredAt,
LastSeen: ae.occurredAt,
}
if err := svc.repository.IncrementInboundMessages(ctx, ct); err != nil {
return err
}
if err := svc.repository.IncrementOutboundMessages(ctx, ae.channelID, ae.subtopic); err != nil {
@@ -216,9 +228,8 @@ type clientEvent struct {
createdAt time.Time
}
func toClientEvent(journal Journal) (clientEvent, error) {
func toClientEvent(journal Journal, isCreate bool) (clientEvent, error) {
var createdAt time.Time
var err error
id, err := getStringAttribute(journal, "id")
if err != nil {
return clientEvent{}, err
@@ -228,11 +239,13 @@ func toClientEvent(journal Journal) (clientEvent, error) {
return clientEvent{}, err
}
createdAtStr := journal.Attributes["created_at"].(string)
if createdAtStr != "" {
createdAt, err = time.Parse(time.RFC3339, createdAtStr)
if err != nil {
return clientEvent{}, fmt.Errorf("invalid created_at format")
if isCreate {
createdAtStr := journal.Attributes["created_at"].(string)
if createdAtStr != "" {
createdAt, err = time.Parse(time.RFC3339, createdAtStr)
if err != nil {
return clientEvent{}, fmt.Errorf("invalid created_at format")
}
}
}
return clientEvent{
@@ -245,9 +258,11 @@ func toClientEvent(journal Journal) (clientEvent, error) {
type adapterEvent struct {
clientID string
channelID string
domainID string
subscriberID string
topic string
subtopic string
occurredAt time.Time
}
func toPublishEvent(journal Journal) (adapterEvent, error) {
@@ -259,15 +274,21 @@ func toPublishEvent(journal Journal) (adapterEvent, error) {
if err != nil {
return adapterEvent{}, err
}
domainID, err := getStringAttribute(journal, "domain_id")
if err != nil {
return adapterEvent{}, err
}
subtopic, err := getStringAttribute(journal, "subtopic")
if err != nil {
return adapterEvent{}, err
}
return adapterEvent{
clientID: clientID,
channelID: channelID,
subtopic: subtopic,
clientID: clientID,
channelID: channelID,
domainID: domainID,
subtopic: subtopic,
occurredAt: journal.OccurredAt,
}, nil
}
+2
View File
@@ -18,6 +18,7 @@ var (
)
type publishEvent struct {
domainID string
channelID string
clientID string
subtopic string
@@ -26,6 +27,7 @@ type publishEvent struct {
func (pe publishEvent) Encode() (map[string]interface{}, error) {
return map[string]interface{}{
"operation": clientPublish,
"domain_id": pe.domainID,
"channel_id": pe.channelID,
"client_id": pe.clientID,
"subtopic": pe.subtopic,
+1
View File
@@ -36,6 +36,7 @@ func (es *publisherES) Publish(ctx context.Context, topic string, msg *messaging
}
me := publishEvent{
domainID: msg.Domain,
channelID: msg.Channel,
clientID: msg.Publisher,
subtopic: msg.Subtopic,
+1
View File
@@ -43,6 +43,7 @@ func (es *pubsubES) Publish(ctx context.Context, topic string, msg *messaging.Me
}
me := publishEvent{
domainID: msg.Domain,
channelID: msg.Channel,
clientID: msg.Publisher,
subtopic: msg.Subtopic,