SMQ-3094 - Improve test coverage in channels and journals (#3238)

Signed-off-by: nyagamunene <stevenyaga2014@gmail.com>
This commit is contained in:
Steve Munene
2025-12-01 20:15:57 +03:00
committed by GitHub
parent b05ae65e24
commit c5a4336d43
6 changed files with 2091 additions and 42 deletions
+21 -16
View File
@@ -149,25 +149,30 @@ func decodeListChannels(_ context.Context, r *http.Request) (any, error) {
if err != nil {
return listChannelsReq{}, errors.Wrap(apiutil.ErrValidation, err)
}
connectionType, err := apiutil.ReadStringQuery(r, api.ConnTypeKey, "")
if err != nil {
return listChannelsReq{}, errors.Wrap(apiutil.ErrValidation, err)
}
req := listChannelsReq{
Page: channels.Page{
Name: name,
Tag: tag,
Status: status,
Metadata: meta,
RoleName: roleName,
RoleID: roleID,
Actions: actions,
AccessType: accessType,
Order: order,
Dir: dir,
Offset: offset,
Limit: limit,
Group: groupID,
Client: clientID,
ID: id,
OnlyTotal: ot,
Name: name,
Tag: tag,
Status: status,
Metadata: meta,
RoleName: roleName,
RoleID: roleID,
Actions: actions,
AccessType: accessType,
Order: order,
Dir: dir,
Offset: offset,
Limit: limit,
Group: groupID,
Client: clientID,
ConnectionType: connectionType,
ID: id,
OnlyTotal: ot,
},
userID: userID,
}
+58
View File
@@ -786,6 +786,64 @@ func TestListChannels(t *testing.T) {
status: http.StatusBadRequest,
err: apiutil.ErrInvalidQueryParams,
},
{
desc: "list channels with client ID",
domainID: validID,
token: validToken,
listChannelsResponse: channels.ChannelsPage{
Page: channels.Page{
Total: 1,
},
Channels: []channels.Channel{validChannelResp},
},
query: "client=" + validID,
status: http.StatusOK,
err: nil,
},
{
desc: "list channels with client ID and connection type publish",
domainID: validID,
token: validToken,
listChannelsResponse: channels.ChannelsPage{
Page: channels.Page{
Total: 1,
},
Channels: []channels.Channel{validChannelResp},
},
query: "client=" + validID + "&connection_type=publish",
status: http.StatusOK,
err: nil,
},
{
desc: "list channels with client ID and connection type subscribe",
domainID: validID,
token: validToken,
listChannelsResponse: channels.ChannelsPage{
Page: channels.Page{
Total: 1,
},
Channels: []channels.Channel{validChannelResp},
},
query: "client=" + validID + "&connection_type=subscribe",
status: http.StatusOK,
err: nil,
},
{
desc: "list channels with invalid connection type",
domainID: validID,
token: validToken,
query: "client=" + validID + "&connection_type=invalid",
status: http.StatusBadRequest,
err: apiutil.ErrValidation,
},
{
desc: "list channels with duplicate connection type",
domainID: validID,
token: validToken,
query: "connection_type=publish&connection_type=subscribe",
status: http.StatusBadRequest,
err: apiutil.ErrInvalidQueryParams,
},
}
for _, tc := range cases {
+6
View File
@@ -103,6 +103,12 @@ func (req listChannelsReq) validate() error {
return apiutil.ErrInvalidDirection
}
if req.ConnectionType != "" {
if _, err := connections.ParseConnType(req.ConnectionType); err != nil {
return apiutil.ErrValidation
}
}
return nil
}
+13 -3
View File
@@ -1326,7 +1326,7 @@ func PageQuery(pm channels.Page) (string, error) {
if pm.Client != "" {
query = append(query, "conn.client_id = :client_id ")
if pm.ConnectionType != "" {
query = append(query, "conn.type = :conn_type ")
query = append(query, ":conn_type = ANY(conn.connection_types) ")
}
}
if pm.AccessType != "" {
@@ -1381,6 +1381,16 @@ func toDBChannelsPage(pm channels.Page) (dbChannelsPage, error) {
if err != nil {
return dbChannelsPage{}, errors.Wrap(repoerr.ErrViewEntity, err)
}
var connType uint8
if pm.ConnectionType != "" {
ct, err := connections.ParseConnType(pm.ConnectionType)
if err != nil {
return dbChannelsPage{}, errors.Wrap(repoerr.ErrViewEntity, err)
}
connType = uint8(ct)
}
return dbChannelsPage{
Limit: pm.Limit,
Offset: pm.Offset,
@@ -1392,7 +1402,7 @@ func toDBChannelsPage(pm channels.Page) (dbChannelsPage, error) {
Status: pm.Status,
GroupID: sql.NullString{Valid: pm.Group.Valid, String: pm.Group.Value},
ClientID: pm.Client,
ConnType: pm.ConnectionType,
ConnType: connType,
RoleName: pm.RoleName,
RoleID: pm.RoleID,
Actions: pm.Actions,
@@ -1411,7 +1421,7 @@ type dbChannelsPage struct {
Status channels.Status `db:"status"`
GroupID sql.NullString `db:"group_id"`
ClientID string `db:"client_id"`
ConnType string `db:"type"`
ConnType uint8 `db:"conn_type"`
RoleName string `db:"role_name"`
RoleID string `db:"role_id"`
Actions pq.StringArray `db:"actions"`
File diff suppressed because it is too large Load Diff
+550
View File
@@ -278,6 +278,35 @@ func TestJournalSave(t *testing.T) {
},
err: nil,
},
{
desc: "with domain in attributes",
journal: journal.Journal{
ID: testsutil.GenerateUUID(t),
Operation: operation + ".with.domain.in.attributes",
OccurredAt: time.Now(),
Attributes: map[string]any{
"domain": testsutil.GenerateUUID(t),
"data": "test",
},
Metadata: payload,
},
err: nil,
},
{
desc: "with domain operation prefix",
journal: journal.Journal{
ID: testsutil.GenerateUUID(t),
Operation: "domain.create",
OccurredAt: time.Now(),
Attributes: map[string]any{
"id": testsutil.GenerateUUID(t),
"name": "test-domain",
"status": "enabled",
},
Metadata: payload,
},
err: nil,
},
{
desc: "with empty journal",
journal: journal.Journal{},
@@ -779,3 +808,524 @@ func extractEntities(journals []journal.Journal, entityType journal.EntityType,
return entities
}
func TestSaveClientTelemetry(t *testing.T) {
t.Cleanup(func() {
_, err := db.Exec("DELETE FROM clients_telemetry")
require.Nil(t, err, fmt.Sprintf("clean clients_telemetry unexpected error: %s", err))
})
repo := postgres.NewRepository(database)
clientID := testsutil.GenerateUUID(t)
domainID := testsutil.GenerateUUID(t)
firstSeen := time.Now().UTC().Truncate(time.Millisecond)
lastSeen := time.Now().UTC().Add(time.Hour).Truncate(time.Millisecond)
cases := []struct {
desc string
telemetry journal.ClientTelemetry
err error
}{
{
desc: "save client telemetry successfully",
telemetry: journal.ClientTelemetry{
ClientID: clientID,
DomainID: domainID,
InboundMessages: 10,
OutboundMessages: 5,
FirstSeen: firstSeen,
LastSeen: lastSeen,
},
err: nil,
},
{
desc: "save duplicate client telemetry",
telemetry: journal.ClientTelemetry{
ClientID: clientID,
DomainID: domainID,
InboundMessages: 20,
OutboundMessages: 10,
FirstSeen: firstSeen,
LastSeen: lastSeen,
},
err: repoerr.ErrConflict,
},
{
desc: "save client telemetry with zero messages",
telemetry: journal.ClientTelemetry{
ClientID: testsutil.GenerateUUID(t),
DomainID: domainID,
InboundMessages: 0,
OutboundMessages: 0,
FirstSeen: firstSeen,
LastSeen: time.Time{},
},
err: nil,
},
{
desc: "save client telemetry with high message counts",
telemetry: journal.ClientTelemetry{
ClientID: testsutil.GenerateUUID(t),
DomainID: testsutil.GenerateUUID(t),
InboundMessages: 1000000,
OutboundMessages: 999999,
FirstSeen: firstSeen,
LastSeen: lastSeen,
},
err: nil,
},
}
for _, tc := range cases {
t.Run(tc.desc, func(t *testing.T) {
err := repo.SaveClientTelemetry(context.Background(), tc.telemetry)
assert.True(t, errors.Contains(err, tc.err), fmt.Sprintf("%s: expected %v got %v", tc.desc, tc.err, err))
})
}
}
func TestDeleteClientTelemetry(t *testing.T) {
t.Cleanup(func() {
_, err := db.Exec("DELETE FROM clients_telemetry")
require.Nil(t, err, fmt.Sprintf("clean clients_telemetry unexpected error: %s", err))
})
repo := postgres.NewRepository(database)
clientID := testsutil.GenerateUUID(t)
domainID := testsutil.GenerateUUID(t)
ct := journal.ClientTelemetry{
ClientID: clientID,
DomainID: domainID,
InboundMessages: 10,
OutboundMessages: 5,
FirstSeen: time.Now().UTC(),
LastSeen: time.Now().UTC(),
}
err := repo.SaveClientTelemetry(context.Background(), ct)
require.Nil(t, err)
cases := []struct {
desc string
clientID string
domainID string
err error
}{
{
desc: "delete existing client telemetry",
clientID: clientID,
domainID: domainID,
err: nil,
},
{
desc: "delete non-existing client telemetry",
clientID: testsutil.GenerateUUID(t),
domainID: domainID,
err: repoerr.ErrNotFound,
},
}
for _, tc := range cases {
t.Run(tc.desc, func(t *testing.T) {
err := repo.DeleteClientTelemetry(context.Background(), tc.clientID, tc.domainID)
assert.True(t, errors.Contains(err, tc.err), fmt.Sprintf("%s: expected %v got %v", tc.desc, tc.err, err))
})
}
}
func TestRetrieveClientTelemetry(t *testing.T) {
t.Cleanup(func() {
_, err := db.Exec("DELETE FROM clients_telemetry")
require.Nil(t, err, fmt.Sprintf("clean clients_telemetry unexpected error: %s", err))
})
repo := postgres.NewRepository(database)
clientID := testsutil.GenerateUUID(t)
domainID := testsutil.GenerateUUID(t)
firstSeen := time.Now().UTC().Truncate(time.Millisecond)
lastSeen := time.Now().UTC().Add(time.Hour).Truncate(time.Millisecond)
ct := journal.ClientTelemetry{
ClientID: clientID,
DomainID: domainID,
InboundMessages: 10,
OutboundMessages: 5,
FirstSeen: firstSeen,
LastSeen: lastSeen,
}
err := repo.SaveClientTelemetry(context.Background(), ct)
require.Nil(t, err)
cases := []struct {
desc string
clientID string
domainID string
response journal.ClientTelemetry
err error
}{
{
desc: "retrieve existing client telemetry",
clientID: clientID,
domainID: domainID,
response: ct,
err: nil,
},
{
desc: "retrieve non-existing client telemetry",
clientID: testsutil.GenerateUUID(t),
domainID: domainID,
response: journal.ClientTelemetry{},
err: repoerr.ErrNotFound,
},
}
for _, tc := range cases {
t.Run(tc.desc, func(t *testing.T) {
result, err := repo.RetrieveClientTelemetry(context.Background(), tc.clientID, tc.domainID)
assert.True(t, errors.Contains(err, tc.err), fmt.Sprintf("%s: expected %v got %v", tc.desc, tc.err, err))
if err == nil {
assert.Equal(t, tc.response.ClientID, result.ClientID)
assert.Equal(t, tc.response.DomainID, result.DomainID)
assert.Equal(t, tc.response.InboundMessages, result.InboundMessages)
assert.Equal(t, tc.response.OutboundMessages, result.OutboundMessages)
assert.Equal(t, tc.response.FirstSeen.Unix(), result.FirstSeen.Unix())
assert.Equal(t, tc.response.LastSeen.Unix(), result.LastSeen.Unix())
}
})
}
}
func TestAddSubscription(t *testing.T) {
t.Cleanup(func() {
_, err := db.Exec("DELETE FROM subscriptions")
require.Nil(t, err)
_, err = db.Exec("DELETE FROM clients_telemetry")
require.Nil(t, err)
})
repo := postgres.NewRepository(database)
clientID := testsutil.GenerateUUID(t)
domainID := testsutil.GenerateUUID(t)
ct := journal.ClientTelemetry{
ClientID: clientID,
DomainID: domainID,
FirstSeen: time.Now().UTC(),
}
err := repo.SaveClientTelemetry(context.Background(), ct)
require.Nil(t, err)
cases := []struct {
desc string
subscription journal.ClientSubscription
err error
}{
{
desc: "add subscription successfully",
subscription: journal.ClientSubscription{
ID: testsutil.GenerateUUID(t),
SubscriberID: testsutil.GenerateUUID(t),
ChannelID: testsutil.GenerateUUID(t),
Subtopic: "subtopic",
ClientID: clientID,
},
err: nil,
},
{
desc: "add subscription with empty subtopic",
subscription: journal.ClientSubscription{
ID: testsutil.GenerateUUID(t),
SubscriberID: testsutil.GenerateUUID(t),
ChannelID: testsutil.GenerateUUID(t),
Subtopic: "",
ClientID: clientID,
},
err: nil,
},
{
desc: "add duplicate subscription",
subscription: journal.ClientSubscription{
ID: testsutil.GenerateUUID(t),
SubscriberID: testsutil.GenerateUUID(t),
ChannelID: testsutil.GenerateUUID(t),
Subtopic: "another-subtopic",
ClientID: clientID,
},
err: nil,
},
}
for _, tc := range cases {
t.Run(tc.desc, func(t *testing.T) {
err := repo.AddSubscription(context.Background(), tc.subscription)
assert.True(t, errors.Contains(err, tc.err), fmt.Sprintf("%s: expected %v got %v", tc.desc, tc.err, err))
})
}
}
func TestCountSubscriptions(t *testing.T) {
t.Cleanup(func() {
_, err := db.Exec("DELETE FROM subscriptions")
require.Nil(t, err)
_, err = db.Exec("DELETE FROM clients_telemetry")
require.Nil(t, err)
})
repo := postgres.NewRepository(database)
clientID := testsutil.GenerateUUID(t)
domainID := testsutil.GenerateUUID(t)
ct := journal.ClientTelemetry{
ClientID: clientID,
DomainID: domainID,
FirstSeen: time.Now().UTC(),
}
err := repo.SaveClientTelemetry(context.Background(), ct)
require.Nil(t, err)
for i := 0; i < 3; i++ {
sub := journal.ClientSubscription{
ID: testsutil.GenerateUUID(t),
SubscriberID: testsutil.GenerateUUID(t),
ChannelID: testsutil.GenerateUUID(t),
Subtopic: fmt.Sprintf("subtopic%d", i),
ClientID: clientID,
}
err := repo.AddSubscription(context.Background(), sub)
require.Nil(t, err)
}
cases := []struct {
desc string
clientID string
count uint64
err error
}{
{
desc: "count subscriptions for existing client",
clientID: clientID,
count: 3,
err: nil,
},
{
desc: "count subscriptions for non-existing client",
clientID: testsutil.GenerateUUID(t),
count: 0,
err: nil,
},
}
for _, tc := range cases {
t.Run(tc.desc, func(t *testing.T) {
count, err := repo.CountSubscriptions(context.Background(), tc.clientID)
assert.True(t, errors.Contains(err, tc.err), fmt.Sprintf("%s: expected %v got %v", tc.desc, tc.err, err))
assert.Equal(t, tc.count, count)
})
}
}
func TestRemoveSubscription(t *testing.T) {
t.Cleanup(func() {
_, err := db.Exec("DELETE FROM subscriptions")
require.Nil(t, err)
_, err = db.Exec("DELETE FROM clients_telemetry")
require.Nil(t, err)
})
repo := postgres.NewRepository(database)
clientID := testsutil.GenerateUUID(t)
domainID := testsutil.GenerateUUID(t)
subscriberID := testsutil.GenerateUUID(t)
ct := journal.ClientTelemetry{
ClientID: clientID,
DomainID: domainID,
FirstSeen: time.Now().UTC(),
}
err := repo.SaveClientTelemetry(context.Background(), ct)
require.Nil(t, err)
sub := journal.ClientSubscription{
ID: testsutil.GenerateUUID(t),
SubscriberID: subscriberID,
ChannelID: testsutil.GenerateUUID(t),
Subtopic: "subtopic",
ClientID: clientID,
}
err = repo.AddSubscription(context.Background(), sub)
require.Nil(t, err)
cases := []struct {
desc string
subscriberID string
err error
}{
{
desc: "remove existing subscription",
subscriberID: subscriberID,
err: nil,
},
{
desc: "remove non-existing subscription",
subscriberID: testsutil.GenerateUUID(t),
err: nil,
},
}
for _, tc := range cases {
t.Run(tc.desc, func(t *testing.T) {
err := repo.RemoveSubscription(context.Background(), tc.subscriberID)
assert.True(t, errors.Contains(err, tc.err), fmt.Sprintf("%s: expected %v got %v", tc.desc, tc.err, err))
})
}
}
func TestIncrementInboundMessages(t *testing.T) {
t.Cleanup(func() {
_, err := db.Exec("DELETE FROM clients_telemetry")
require.Nil(t, err)
})
repo := postgres.NewRepository(database)
clientID := testsutil.GenerateUUID(t)
domainID := testsutil.GenerateUUID(t)
firstSeen := time.Now().UTC().Truncate(time.Millisecond)
cases := []struct {
desc string
telemetry journal.ClientTelemetry
expectedInbound uint64
err error
setupExisting bool
existingInbound uint64
}{
{
desc: "increment inbound messages for new client",
telemetry: journal.ClientTelemetry{
ClientID: clientID,
DomainID: domainID,
FirstSeen: firstSeen,
LastSeen: firstSeen,
},
expectedInbound: 1,
setupExisting: false,
err: nil,
},
{
desc: "increment inbound messages for existing client",
telemetry: journal.ClientTelemetry{
ClientID: clientID,
DomainID: domainID,
FirstSeen: firstSeen,
LastSeen: firstSeen.Add(time.Hour),
},
expectedInbound: 2,
setupExisting: true,
existingInbound: 1,
err: nil,
},
}
for _, tc := range cases {
t.Run(tc.desc, func(t *testing.T) {
err := repo.IncrementInboundMessages(context.Background(), tc.telemetry)
assert.True(t, errors.Contains(err, tc.err), fmt.Sprintf("%s: expected %v got %v", tc.desc, tc.err, err))
if err == nil {
result, err := repo.RetrieveClientTelemetry(context.Background(), tc.telemetry.ClientID, tc.telemetry.DomainID)
require.Nil(t, err)
assert.Equal(t, tc.expectedInbound, result.InboundMessages)
}
})
}
}
func TestIncrementOutboundMessages(t *testing.T) {
t.Cleanup(func() {
_, err := db.Exec("DELETE FROM subscriptions")
require.Nil(t, err)
_, err = db.Exec("DELETE FROM clients_telemetry")
require.Nil(t, err)
})
repo := postgres.NewRepository(database)
clientID1 := testsutil.GenerateUUID(t)
clientID2 := testsutil.GenerateUUID(t)
domainID := testsutil.GenerateUUID(t)
channelID := testsutil.GenerateUUID(t)
subtopic := "test/subtopic"
for i, cid := range []string{clientID1, clientID2} {
ct := journal.ClientTelemetry{
ClientID: cid,
DomainID: domainID,
FirstSeen: time.Now().UTC(),
}
err := repo.SaveClientTelemetry(context.Background(), ct)
require.Nil(t, err)
for j := 0; j < 2; j++ {
sub := journal.ClientSubscription{
ID: testsutil.GenerateUUID(t),
SubscriberID: fmt.Sprintf("subscriber-%d-%d", i, j),
ChannelID: channelID,
Subtopic: subtopic,
ClientID: cid,
}
err = repo.AddSubscription(context.Background(), sub)
require.Nil(t, err)
}
}
cases := []struct {
desc string
channelID string
subtopic string
expectedIncrement uint64
setupAdditional bool
err error
}{
{
desc: "increment outbound messages for subscribed clients with multiple subscriptions",
channelID: channelID,
subtopic: subtopic,
expectedIncrement: 2,
err: nil,
},
{
desc: "increment for non-existing channel",
channelID: testsutil.GenerateUUID(t),
subtopic: subtopic,
expectedIncrement: 0,
err: nil,
},
{
desc: "increment with different subtopic",
channelID: channelID,
subtopic: "different/subtopic",
expectedIncrement: 0,
err: nil,
},
}
for _, tc := range cases {
t.Run(tc.desc, func(t *testing.T) {
err := repo.IncrementOutboundMessages(context.Background(), tc.channelID, tc.subtopic)
assert.True(t, errors.Contains(err, tc.err), fmt.Sprintf("%s: expected %v got %v", tc.desc, tc.err, err))
if err == nil && tc.expectedIncrement > 0 {
for _, cid := range []string{clientID1, clientID2} {
result, err := repo.RetrieveClientTelemetry(context.Background(), cid, domainID)
require.Nil(t, err)
assert.Equal(t, tc.expectedIncrement, result.OutboundMessages)
}
}
})
}
}