NOISSUE - Add Messages page ordering (#306)

* add messaging ordering

Signed-off-by: musilah <nataleigh.nk@gmail.com>

* fix lint

Signed-off-by: musilah <nataleigh.nk@gmail.com>

* use orderByTime

Signed-off-by: musilah <nataleigh.nk@gmail.com>

* fix endpoint tests

Signed-off-by: musilah <nataleigh.nk@gmail.com>

* return ui build

Signed-off-by: musilah <nataleigh.nk@gmail.com>

* use switch cases

Signed-off-by: musilah <nataleigh.nk@gmail.com>

---------

Signed-off-by: musilah <nataleigh.nk@gmail.com>
This commit is contained in:
Nataly Musilah
2025-09-12 13:17:34 +03:00
committed by GitHub
parent 9a621f4a88
commit a22685f168
3 changed files with 158 additions and 55 deletions
+45 -45
View File
@@ -33,8 +33,6 @@ const (
clientToken = "1"
userToken = "token"
invalidToken = "invalid"
email = "user@example.com"
invalid = "invalid"
numOfMessages = 100
valueFields = 5
subtopic = "topic"
@@ -158,7 +156,7 @@ func TestReadAll(t *testing.T) {
authResponse: true,
status: http.StatusOK,
res: pageRes{
PageMetadata: readers.PageMetadata{Limit: 10, Format: "messages"},
PageMetadata: readers.PageMetadata{Limit: 10, Format: "messages", Order: "time", Dir: "desc"},
Total: uint64(len(messages)),
Messages: messages[0:10],
},
@@ -170,7 +168,7 @@ func TestReadAll(t *testing.T) {
authResponse: true,
status: http.StatusOK,
res: pageRes{
PageMetadata: readers.PageMetadata{Limit: 10, Format: "messages"},
PageMetadata: readers.PageMetadata{Limit: 10, Format: "messages", Order: "time", Dir: "desc"},
Total: uint64(len(messages)),
Messages: messages[0:10],
},
@@ -247,7 +245,7 @@ func TestReadAll(t *testing.T) {
authResponse: true,
status: http.StatusOK,
res: pageRes{
PageMetadata: readers.PageMetadata{Limit: 10, Format: "messages"},
PageMetadata: readers.PageMetadata{Limit: 10, Format: "messages", Order: "time", Dir: "desc"},
Total: uint64(len(messages)),
Messages: messages[0:10],
},
@@ -259,7 +257,7 @@ func TestReadAll(t *testing.T) {
authResponse: true,
status: http.StatusOK,
res: pageRes{
PageMetadata: readers.PageMetadata{Limit: 10, Format: "messages"},
PageMetadata: readers.PageMetadata{Limit: 10, Format: "messages", Order: "time", Dir: "desc"},
Total: uint64(len(messages)),
Messages: messages[0:10],
},
@@ -271,7 +269,7 @@ func TestReadAll(t *testing.T) {
authResponse: true,
status: http.StatusOK,
res: pageRes{
PageMetadata: readers.PageMetadata{Limit: 10, Format: "messages"},
PageMetadata: readers.PageMetadata{Limit: 10, Format: "messages", Order: "time", Dir: "desc"},
Total: uint64(len(messages)),
Messages: messages[0:10],
},
@@ -283,7 +281,7 @@ func TestReadAll(t *testing.T) {
authResponse: true,
status: http.StatusOK,
res: pageRes{
PageMetadata: readers.PageMetadata{Limit: 10, Subtopic: subtopic, Format: "messages", Protocol: httpProt},
PageMetadata: readers.PageMetadata{Limit: 10, Subtopic: subtopic, Format: "messages", Protocol: httpProt, Order: "time", Dir: "desc"},
Total: uint64(len(queryMsgs)),
Messages: queryMsgs[0:10],
},
@@ -295,7 +293,7 @@ func TestReadAll(t *testing.T) {
authResponse: true,
status: http.StatusOK,
res: pageRes{
PageMetadata: readers.PageMetadata{Limit: 10, Subtopic: subtopic, Format: "messages", Protocol: httpProt},
PageMetadata: readers.PageMetadata{Limit: 10, Subtopic: subtopic, Format: "messages", Protocol: httpProt, Order: "time", Dir: "desc"},
Total: uint64(len(queryMsgs)),
Messages: queryMsgs[0:10],
},
@@ -307,7 +305,7 @@ func TestReadAll(t *testing.T) {
authResponse: true,
status: http.StatusOK,
res: pageRes{
PageMetadata: readers.PageMetadata{Limit: 10, Format: "messages", Publisher: pubID2},
PageMetadata: readers.PageMetadata{Limit: 10, Format: "messages", Publisher: pubID2, Order: "time", Dir: "desc"},
Total: uint64(len(queryMsgs)),
Messages: queryMsgs[0:10],
},
@@ -319,7 +317,7 @@ func TestReadAll(t *testing.T) {
authResponse: true,
status: http.StatusOK,
res: pageRes{
PageMetadata: readers.PageMetadata{Limit: 10, Format: "messages", Protocol: httpProt},
PageMetadata: readers.PageMetadata{Limit: 10, Format: "messages", Protocol: httpProt, Order: "time", Dir: "desc"},
Total: uint64(len(queryMsgs)),
Messages: queryMsgs[0:10],
},
@@ -331,7 +329,7 @@ func TestReadAll(t *testing.T) {
authResponse: true,
status: http.StatusOK,
res: pageRes{
PageMetadata: readers.PageMetadata{Limit: 10, Format: "messages", Name: msgName},
PageMetadata: readers.PageMetadata{Limit: 10, Format: "messages", Name: msgName, Order: "time", Dir: "desc"},
Total: uint64(len(queryMsgs)),
Messages: queryMsgs[0:10],
},
@@ -343,7 +341,7 @@ func TestReadAll(t *testing.T) {
authResponse: true,
status: http.StatusOK,
res: pageRes{
PageMetadata: readers.PageMetadata{Limit: 10, Format: "messages", Value: v},
PageMetadata: readers.PageMetadata{Limit: 10, Format: "messages", Value: v, Order: "time", Dir: "desc"},
Total: uint64(len(valueMsgs)),
Messages: valueMsgs[0:10],
},
@@ -355,7 +353,7 @@ func TestReadAll(t *testing.T) {
authResponse: true,
status: http.StatusOK,
res: pageRes{
PageMetadata: readers.PageMetadata{Limit: 10, Format: "messages", Value: v, Comparator: readers.EqualKey},
PageMetadata: readers.PageMetadata{Limit: 10, Format: "messages", Value: v, Comparator: readers.EqualKey, Order: "time", Dir: "desc"},
Total: uint64(len(valueMsgs)),
Messages: valueMsgs[0:10],
},
@@ -367,7 +365,7 @@ func TestReadAll(t *testing.T) {
authResponse: true,
status: http.StatusOK,
res: pageRes{
PageMetadata: readers.PageMetadata{Limit: 10, Format: "messages", Value: v + 1, Comparator: readers.LowerThanKey},
PageMetadata: readers.PageMetadata{Limit: 10, Format: "messages", Value: v + 1, Comparator: readers.LowerThanKey, Order: "time", Dir: "desc"},
Total: uint64(len(valueMsgs)),
Messages: valueMsgs[0:10],
},
@@ -379,7 +377,7 @@ func TestReadAll(t *testing.T) {
authResponse: true,
status: http.StatusOK,
res: pageRes{
PageMetadata: readers.PageMetadata{Limit: 10, Format: "messages", Value: v + 1, Comparator: readers.LowerThanEqualKey},
PageMetadata: readers.PageMetadata{Limit: 10, Format: "messages", Value: v + 1, Comparator: readers.LowerThanEqualKey, Order: "time", Dir: "desc"},
Total: uint64(len(valueMsgs)),
Messages: valueMsgs[0:10],
},
@@ -391,7 +389,7 @@ func TestReadAll(t *testing.T) {
authResponse: true,
status: http.StatusOK,
res: pageRes{
PageMetadata: readers.PageMetadata{Limit: 10, Format: "messages", Value: v - 1, Comparator: readers.GreaterThanKey},
PageMetadata: readers.PageMetadata{Limit: 10, Format: "messages", Value: v - 1, Comparator: readers.GreaterThanKey, Order: "time", Dir: "desc"},
Total: uint64(len(valueMsgs)),
Messages: valueMsgs[0:10],
},
@@ -403,7 +401,7 @@ func TestReadAll(t *testing.T) {
authResponse: true,
status: http.StatusOK,
res: pageRes{
PageMetadata: readers.PageMetadata{Limit: 10, Format: "messages", Value: v - 1, Comparator: readers.GreaterThanEqualKey},
PageMetadata: readers.PageMetadata{Limit: 10, Format: "messages", Value: v - 1, Comparator: readers.GreaterThanEqualKey, Order: "time", Dir: "desc"},
Total: uint64(len(valueMsgs)),
Messages: valueMsgs[0:10],
},
@@ -429,7 +427,7 @@ func TestReadAll(t *testing.T) {
authResponse: true,
status: http.StatusOK,
res: pageRes{
PageMetadata: readers.PageMetadata{Limit: 10, Format: "messages", BoolValue: true},
PageMetadata: readers.PageMetadata{Limit: 10, Format: "messages", BoolValue: true, Order: "time", Dir: "desc"},
Total: uint64(len(boolMsgs)),
Messages: boolMsgs[0:10],
},
@@ -448,7 +446,7 @@ func TestReadAll(t *testing.T) {
authResponse: true,
status: http.StatusOK,
res: pageRes{
PageMetadata: readers.PageMetadata{Limit: 10, Format: "messages", StringValue: vs},
PageMetadata: readers.PageMetadata{Limit: 10, Format: "messages", StringValue: vs, Order: "time", Dir: "desc"},
Total: uint64(len(stringMsgs)),
Messages: stringMsgs[0:10],
},
@@ -460,7 +458,7 @@ func TestReadAll(t *testing.T) {
authResponse: true,
status: http.StatusOK,
res: pageRes{
PageMetadata: readers.PageMetadata{Limit: 10, Format: "messages", DataValue: vd},
PageMetadata: readers.PageMetadata{Limit: 10, Format: "messages", DataValue: vd, Order: "time", Dir: "desc"},
Total: uint64(len(dataMsgs)),
Messages: dataMsgs[0:10],
},
@@ -486,7 +484,7 @@ func TestReadAll(t *testing.T) {
authResponse: true,
status: http.StatusOK,
res: pageRes{
PageMetadata: readers.PageMetadata{Limit: 10, Format: "messages", From: messages[19].Time, To: messages[4].Time},
PageMetadata: readers.PageMetadata{Limit: 10, Format: "messages", From: messages[19].Time, To: messages[4].Time, Order: "time", Dir: "desc"},
Total: uint64(len(messages[5:20])),
Messages: messages[5:15],
},
@@ -505,7 +503,7 @@ func TestReadAll(t *testing.T) {
authResponse: true,
status: http.StatusOK,
res: pageRes{
PageMetadata: readers.PageMetadata{Limit: 10, Format: "messages"},
PageMetadata: readers.PageMetadata{Limit: 10, Format: "messages", Order: "time", Dir: "desc"},
Total: uint64(len(messages)),
Messages: messages[0:10],
},
@@ -517,6 +515,7 @@ func TestReadAll(t *testing.T) {
authResponse: true,
status: http.StatusBadRequest,
},
{
desc: "read page with aggregation, interval, to and from as client",
url: fmt.Sprintf("%s/%s/channels/%s/messages?aggregation=MAX&interval=10h&from=%f&to=%f", ts.URL, domainID, chanID, messages[19].Time, messages[4].Time),
@@ -524,11 +523,12 @@ func TestReadAll(t *testing.T) {
authResponse: true,
status: http.StatusOK,
res: pageRes{
PageMetadata: readers.PageMetadata{Limit: 10, Format: "messages", Aggregation: "MAX", Interval: "10h", From: messages[19].Time, To: messages[4].Time},
PageMetadata: readers.PageMetadata{Limit: 10, Format: "messages", Aggregation: "MAX", Interval: "10h", From: messages[19].Time, To: messages[4].Time, Order: "time", Dir: "desc"},
Total: uint64(len(messages[5:20])),
Messages: messages[5:15],
},
},
{
desc: "read page with invalid aggregation and valid interval, to and from as client",
url: fmt.Sprintf("%s/%s/channels/%s/messages?aggregation=invalid&interval=10h&from=%f&to=%f", ts.URL, domainID, chanID, messages[19].Time, messages[4].Time),
@@ -571,7 +571,7 @@ func TestReadAll(t *testing.T) {
authResponse: true,
status: http.StatusOK,
res: pageRes{
PageMetadata: readers.PageMetadata{Limit: 10, Format: "messages"},
PageMetadata: readers.PageMetadata{Limit: 10, Format: "messages", Order: "time", Dir: "desc"},
Total: uint64(len(messages)),
Messages: messages[0:10],
},
@@ -655,7 +655,7 @@ func TestReadAll(t *testing.T) {
authResponse: true,
status: http.StatusOK,
res: pageRes{
PageMetadata: readers.PageMetadata{Limit: 10, Format: "messages"},
PageMetadata: readers.PageMetadata{Limit: 10, Format: "messages", Order: "time", Dir: "desc"},
Total: uint64(len(messages)),
Messages: messages[0:10],
},
@@ -667,7 +667,7 @@ func TestReadAll(t *testing.T) {
authResponse: true,
status: http.StatusOK,
res: pageRes{
PageMetadata: readers.PageMetadata{Limit: 10, Format: "messages"},
PageMetadata: readers.PageMetadata{Limit: 10, Format: "messages", Order: "time", Dir: "desc"},
Total: uint64(len(messages)),
Messages: messages[0:10],
},
@@ -679,7 +679,7 @@ func TestReadAll(t *testing.T) {
authResponse: true,
status: http.StatusOK,
res: pageRes{
PageMetadata: readers.PageMetadata{Limit: 10, Format: "messages"},
PageMetadata: readers.PageMetadata{Limit: 10, Format: "messages", Order: "time", Dir: "desc"},
Total: uint64(len(messages)),
Messages: messages[0:10],
},
@@ -691,7 +691,7 @@ func TestReadAll(t *testing.T) {
authResponse: true,
status: http.StatusOK,
res: pageRes{
PageMetadata: readers.PageMetadata{Limit: 10, Format: "messages", Subtopic: subtopic, Protocol: httpProt},
PageMetadata: readers.PageMetadata{Limit: 10, Format: "messages", Subtopic: subtopic, Protocol: httpProt, Order: "time", Dir: "desc"},
Total: uint64(len(queryMsgs)),
Messages: queryMsgs[0:10],
},
@@ -703,7 +703,7 @@ func TestReadAll(t *testing.T) {
authResponse: true,
status: http.StatusOK,
res: pageRes{
PageMetadata: readers.PageMetadata{Limit: 10, Format: "messages", Subtopic: subtopic, Protocol: httpProt},
PageMetadata: readers.PageMetadata{Limit: 10, Format: "messages", Subtopic: subtopic, Protocol: httpProt, Order: "time", Dir: "desc"},
Total: uint64(len(queryMsgs)),
Messages: queryMsgs[0:10],
},
@@ -715,7 +715,7 @@ func TestReadAll(t *testing.T) {
authResponse: true,
status: http.StatusOK,
res: pageRes{
PageMetadata: readers.PageMetadata{Limit: 10, Format: "messages", Publisher: pubID2},
PageMetadata: readers.PageMetadata{Limit: 10, Format: "messages", Publisher: pubID2, Order: "time", Dir: "desc"},
Total: uint64(len(queryMsgs)),
Messages: queryMsgs[0:10],
},
@@ -727,7 +727,7 @@ func TestReadAll(t *testing.T) {
authResponse: true,
status: http.StatusOK,
res: pageRes{
PageMetadata: readers.PageMetadata{Limit: 10, Format: "messages", Protocol: httpProt},
PageMetadata: readers.PageMetadata{Limit: 10, Format: "messages", Protocol: httpProt, Order: "time", Dir: "desc"},
Total: uint64(len(queryMsgs)),
Messages: queryMsgs[0:10],
},
@@ -739,7 +739,7 @@ func TestReadAll(t *testing.T) {
authResponse: true,
status: http.StatusOK,
res: pageRes{
PageMetadata: readers.PageMetadata{Limit: 10, Format: "messages", Name: msgName},
PageMetadata: readers.PageMetadata{Limit: 10, Format: "messages", Name: msgName, Order: "time", Dir: "desc"},
Total: uint64(len(queryMsgs)),
Messages: queryMsgs[0:10],
},
@@ -751,7 +751,7 @@ func TestReadAll(t *testing.T) {
authResponse: true,
status: http.StatusOK,
res: pageRes{
PageMetadata: readers.PageMetadata{Limit: 10, Format: "messages", Value: v},
PageMetadata: readers.PageMetadata{Limit: 10, Format: "messages", Value: v, Order: "time", Dir: "desc"},
Total: uint64(len(valueMsgs)),
Messages: valueMsgs[0:10],
},
@@ -763,7 +763,7 @@ func TestReadAll(t *testing.T) {
authResponse: true,
status: http.StatusOK,
res: pageRes{
PageMetadata: readers.PageMetadata{Limit: 10, Format: "messages", Value: v, Comparator: readers.EqualKey},
PageMetadata: readers.PageMetadata{Limit: 10, Format: "messages", Value: v, Comparator: readers.EqualKey, Order: "time", Dir: "desc"},
Total: uint64(len(valueMsgs)),
Messages: valueMsgs[0:10],
},
@@ -775,7 +775,7 @@ func TestReadAll(t *testing.T) {
authResponse: true,
status: http.StatusOK,
res: pageRes{
PageMetadata: readers.PageMetadata{Limit: 10, Format: "messages", Value: v + 1, Comparator: readers.LowerThanKey},
PageMetadata: readers.PageMetadata{Limit: 10, Format: "messages", Value: v + 1, Comparator: readers.LowerThanKey, Order: "time", Dir: "desc"},
Total: uint64(len(valueMsgs)),
Messages: valueMsgs[0:10],
},
@@ -787,7 +787,7 @@ func TestReadAll(t *testing.T) {
authResponse: true,
status: http.StatusOK,
res: pageRes{
PageMetadata: readers.PageMetadata{Limit: 10, Format: "messages", Value: v + 1, Comparator: readers.LowerThanEqualKey},
PageMetadata: readers.PageMetadata{Limit: 10, Format: "messages", Value: v + 1, Comparator: readers.LowerThanEqualKey, Order: "time", Dir: "desc"},
Total: uint64(len(valueMsgs)),
Messages: valueMsgs[0:10],
},
@@ -799,7 +799,7 @@ func TestReadAll(t *testing.T) {
status: http.StatusOK,
authResponse: true,
res: pageRes{
PageMetadata: readers.PageMetadata{Limit: 10, Format: "messages", Value: v - 1, Comparator: readers.GreaterThanKey},
PageMetadata: readers.PageMetadata{Limit: 10, Order: "time", Dir: "desc", Format: "messages", Value: v - 1, Comparator: readers.GreaterThanKey},
Total: uint64(len(valueMsgs)),
Messages: valueMsgs[0:10],
},
@@ -811,7 +811,7 @@ func TestReadAll(t *testing.T) {
authResponse: true,
status: http.StatusOK,
res: pageRes{
PageMetadata: readers.PageMetadata{Limit: 10, Format: "messages", Value: v - 1, Comparator: readers.GreaterThanEqualKey},
PageMetadata: readers.PageMetadata{Order: "time", Dir: "desc", Limit: 10, Format: "messages", Value: v - 1, Comparator: readers.GreaterThanEqualKey},
Total: uint64(len(valueMsgs)),
Messages: valueMsgs[0:10],
},
@@ -837,7 +837,7 @@ func TestReadAll(t *testing.T) {
authResponse: true,
status: http.StatusOK,
res: pageRes{
PageMetadata: readers.PageMetadata{Limit: 10, Format: "messages", BoolValue: true},
PageMetadata: readers.PageMetadata{Limit: 10, Format: "messages", BoolValue: true, Order: "time", Dir: "desc"},
Total: uint64(len(boolMsgs)),
Messages: boolMsgs[0:10],
},
@@ -856,7 +856,7 @@ func TestReadAll(t *testing.T) {
authResponse: true,
status: http.StatusOK,
res: pageRes{
PageMetadata: readers.PageMetadata{Limit: 10, Format: "messages", StringValue: vs},
PageMetadata: readers.PageMetadata{Limit: 10, Format: "messages", StringValue: vs, Order: "time", Dir: "desc"},
Total: uint64(len(stringMsgs)),
Messages: stringMsgs[0:10],
},
@@ -868,7 +868,7 @@ func TestReadAll(t *testing.T) {
authResponse: true,
status: http.StatusOK,
res: pageRes{
PageMetadata: readers.PageMetadata{Limit: 10, Format: "messages", DataValue: vd},
PageMetadata: readers.PageMetadata{Limit: 10, Format: "messages", DataValue: vd, Order: "time", Dir: "desc"},
Total: uint64(len(dataMsgs)),
Messages: dataMsgs[0:10],
},
@@ -894,7 +894,7 @@ func TestReadAll(t *testing.T) {
authResponse: true,
status: http.StatusOK,
res: pageRes{
PageMetadata: readers.PageMetadata{Limit: 10, Format: "messages", From: messages[19].Time, To: messages[4].Time},
PageMetadata: readers.PageMetadata{Limit: 10, Format: "messages", From: messages[19].Time, To: messages[4].Time, Order: "time", Dir: "desc"},
Total: uint64(len(messages[5:20])),
Messages: messages[5:15],
},
@@ -913,7 +913,7 @@ func TestReadAll(t *testing.T) {
authResponse: true,
status: http.StatusOK,
res: pageRes{
PageMetadata: readers.PageMetadata{Limit: 10, Format: "messages"},
PageMetadata: readers.PageMetadata{Limit: 10, Format: "messages", Order: "time", Dir: "desc"},
Total: uint64(len(messages)),
Messages: messages[0:10],
},
@@ -932,7 +932,7 @@ func TestReadAll(t *testing.T) {
authResponse: true,
status: http.StatusOK,
res: pageRes{
PageMetadata: readers.PageMetadata{Limit: 10, Format: "messages", Aggregation: "MAX", Interval: "10h", From: messages[19].Time, To: messages[4].Time},
PageMetadata: readers.PageMetadata{Limit: 10, Format: "messages", Aggregation: "MAX", Interval: "10h", From: messages[19].Time, To: messages[4].Time, Order: "time", Dir: "desc"},
Total: uint64(len(messages[5:20])),
Messages: messages[5:15],
},
+13
View File
@@ -11,6 +11,7 @@ import (
"github.com/absmach/supermq"
grpcChannelsV1 "github.com/absmach/supermq/api/grpc/channels/v1"
grpcClientsV1 "github.com/absmach/supermq/api/grpc/clients/v1"
api "github.com/absmach/supermq/api/http"
apiutil "github.com/absmach/supermq/api/http/util"
smqauthn "github.com/absmach/supermq/pkg/authn"
"github.com/absmach/supermq/pkg/connections"
@@ -143,6 +144,16 @@ func decodeList(_ context.Context, r *http.Request) (any, error) {
return nil, errors.Wrap(apiutil.ErrValidation, err)
}
order, err := apiutil.ReadStringQuery(r, api.OrderKey, "time")
if err != nil {
return nil, errors.Wrap(apiutil.ErrValidation, err)
}
dir, err := apiutil.ReadStringQuery(r, api.DirKey, "desc")
if err != nil {
return nil, errors.Wrap(apiutil.ErrValidation, err)
}
var interval string
if aggregation != "" {
interval, err = apiutil.ReadStringQuery(r, intervalKey, defInterval)
@@ -173,6 +184,8 @@ func decodeList(_ context.Context, r *http.Request) (any, error) {
To: to,
Aggregation: aggregation,
Interval: interval,
Order: order,
Dir: dir,
},
}
return req, nil
+100 -10
View File
@@ -8,6 +8,7 @@ import (
"fmt"
"strings"
api "github.com/absmach/supermq/api/http"
"github.com/absmach/supermq/pkg/errors"
"github.com/absmach/supermq/pkg/transformers/senml"
"github.com/absmach/supermq/readers"
@@ -17,7 +18,11 @@ import (
)
// Table for SenML messages.
const defTable = "messages"
const (
defTable = "messages"
orderByTime = "time"
orderByCreated = "created"
)
var _ readers.MessageRepository = (*timescaleRepository)(nil)
@@ -33,21 +38,46 @@ func New(db *sqlx.DB) readers.MessageRepository {
}
func (tr timescaleRepository) ReadAll(chanID string, rpm readers.PageMetadata) (readers.MessagesPage, error) {
order := "time"
format := defTable
if rpm.Format != "" && rpm.Format != defTable {
order = "created"
format = rpm.Format
}
q := fmt.Sprintf(`SELECT * FROM %s WHERE %s ORDER BY %s DESC LIMIT :limit OFFSET :offset;`, format, fmtCondition(rpm), order)
totalQuery := fmt.Sprintf(`SELECT COUNT(*) FROM %s WHERE %s;`, format, fmtCondition(rpm))
isSenml := (format == defTable)
// If aggregation is provided, add time_bucket and aggregation to the query
const timeDivisor = 1000000000
isAggregated := isSenml && rpm.Aggregation != "" && rpm.Interval != ""
if rpm.Aggregation != "" {
if rpm.Order == "" {
switch {
case isSenml:
rpm.Order = orderByTime
default:
rpm.Order = orderByCreated
}
}
orderClause := applyOrdering(rpm, isAggregated, isSenml)
pgData := ""
if rpm.Limit != 0 {
pgData = "LIMIT :limit"
}
if rpm.Offset != 0 {
if pgData != "" {
pgData += " "
}
pgData += "OFFSET :offset"
}
where := fmtCondition(rpm)
var q string
totalQuery := fmt.Sprintf(`SELECT COUNT(*) FROM %s WHERE %s;`, format, where)
if isAggregated {
q = fmt.Sprintf(`
SELECT
EXTRACT(epoch FROM time_bucket('%s', to_timestamp(time/%d))) *%d AS time,
@@ -62,12 +92,14 @@ func (tr timescaleRepository) ReadAll(chanID string, rpm readers.PageMetadata) (
WHERE
%s
GROUP BY 1
ORDER BY time DESC
LIMIT :limit OFFSET :offset;
%s
%s;
`,
rpm.Interval, timeDivisor, timeDivisor, rpm.Aggregation, format, fmtCondition(rpm))
rpm.Interval, timeDivisor, timeDivisor, rpm.Aggregation, format, where, orderClause, pgData)
totalQuery = fmt.Sprintf(`SELECT COUNT(*) FROM (SELECT EXTRACT(epoch FROM time_bucket('%s', to_timestamp(time/%d))) AS time, %s(value) AS value FROM %s WHERE %s GROUP BY 1) AS subquery;`, rpm.Interval, timeDivisor, rpm.Aggregation, format, fmtCondition(rpm))
totalQuery = fmt.Sprintf(`SELECT COUNT(*) FROM (SELECT EXTRACT(epoch FROM time_bucket('%s', to_timestamp(time/%d))) AS time, %s(value) AS value FROM %s WHERE %s GROUP BY 1) AS subquery;`, rpm.Interval, timeDivisor, rpm.Aggregation, format, where)
} else {
q = fmt.Sprintf(`SELECT * FROM %s WHERE %s %s %s;`, format, where, orderClause, pgData)
}
params := map[string]any{
@@ -101,6 +133,7 @@ func (tr timescaleRepository) ReadAll(chanID string, rpm readers.PageMetadata) (
PageMetadata: rpm,
Messages: []readers.Message{},
}
switch format {
case defTable:
for rows.Next() {
@@ -242,3 +275,60 @@ func (msg jsonMessage) toMap() (map[string]any, error) {
ret["payload"] = pld
return ret, nil
}
func applyOrdering(pm readers.PageMetadata, isAggregated bool, isSenml bool) string {
timeCol := orderByTime
if !isSenml {
timeCol = orderByCreated
}
dir := pm.Dir
if dir != api.AscDir && dir != api.DescDir {
dir = api.DescDir
}
aggCols := map[string]bool{
orderByTime: true, "value": true, "publisher": true, "protocol": true,
"subtopic": true, "name": true, "unit": true,
}
senmlCols := map[string]bool{
orderByTime: true, "value": true, "publisher": true, "name": true,
"protocol": true, "channel": true, "subtopic": true, "unit": true,
}
jsonCols := map[string]bool{
orderByCreated: true, "publisher": true, "protocol": true,
"channel": true, "subtopic": true,
}
if isAggregated {
col := pm.Order
if !aggCols[col] {
col = orderByTime
}
if col == orderByTime {
return fmt.Sprintf("ORDER BY time %s", dir)
}
return fmt.Sprintf("ORDER BY %s %s, time %s", col, dir, dir)
}
col := pm.Order
switch {
case isSenml:
if !senmlCols[col] {
col = orderByTime
}
case !isSenml:
if !jsonCols[col] {
col = orderByCreated
}
}
secondary := fmt.Sprintf("%s DESC", timeCol)
if col == timeCol {
return fmt.Sprintf("ORDER BY %s %s", col, dir)
}
return fmt.Sprintf("ORDER BY %s %s, %s", col, dir, secondary)
}