NOISSUE - Update timescale reader (#2085)

Signed-off-by: Musilah <nataleigh.nk@gmail.com>
Signed-off-by: Rodney Osodo <28790446+rodneyosodo@users.noreply.github.com>
Co-authored-by: Rodney Osodo <28790446+rodneyosodo@users.noreply.github.com>
This commit is contained in:
Nataly Musilah
2024-03-05 14:12:55 +03:00
committed by GitHub
parent 2be34c42f4
commit 42d433a92f
11 changed files with 298 additions and 27 deletions
+38 -7
View File
@@ -26,7 +26,7 @@ servers:
- url: https://localhost:9009
- url: http://localhost:9011
- url: https://localhost:9011
tags:
- name: readers
description: Everything about your Readers
@@ -57,14 +57,16 @@ paths:
- $ref: "#/components/parameters/DataValue"
- $ref: "#/components/parameters/From"
- $ref: "#/components/parameters/To"
- $ref: "#/components/parameters/Aggregation"
- $ref: "#/components/parameters/Interval"
responses:
'200':
"200":
$ref: "#/components/responses/MessagesPageRes"
'400':
"400":
description: Failed due to malformed query parameters.
'401':
"401":
description: Missing or invalid access token provided.
'500':
"500":
$ref: "#/components/responses/ServiceError"
/health:
get:
@@ -72,9 +74,9 @@ paths:
tags:
- health
responses:
'200':
"200":
$ref: "#/components/responses/HealthRes"
'500':
"500":
$ref: "#/components/responses/ServiceError"
components:
@@ -226,6 +228,7 @@ components:
in: query
schema:
type: number
example: 1709218556069
required: false
To:
name: to
@@ -233,6 +236,34 @@ components:
in: query
schema:
type: number
example: 1709218757503
required: false
Aggregation:
name: aggregation
description: Aggregation function.
in: query
schema:
type: string
enum:
- MAX
- AVG
- MIN
- SUM
- COUNT
- max
- min
- sum
- avg
- count
example: MAX
required: false
Interval:
name: interval
description: Aggregation interval.
in: query
schema:
type: string
example: 10s
required: false
responses:
+5 -3
View File
@@ -38,9 +38,11 @@ var cmdMessages = []cobra.Command{
logUsage(cmd.Use)
return
}
pageMetadata := mgxsdk.PageMetadata{
Offset: Offset,
Limit: Limit,
pageMetadata := mgxsdk.MessagePageMetadata{
PageMetadata: mgxsdk.PageMetadata{
Offset: Offset,
Limit: Limit,
},
}
m, err := sdk.ReadMessages(pageMetadata, args[0], args[1])
+12
View File
@@ -164,4 +164,16 @@ var (
// ErrRollbackTx indicates failed to rollback transaction.
ErrRollbackTx = errors.New("failed to rollback transaction")
// ErrInvalidAggregation indicates invalid aggregation value.
ErrInvalidAggregation = errors.New("invalid aggregation value")
// ErrInvalidInterval indicates invalid interval value.
ErrInvalidInterval = errors.New("invalid interval value")
// ErrMissingFrom indicates missing from value.
ErrMissingFrom = errors.New("missing from time value")
// ErrMissingTo indicates missing to value.
ErrMissingTo = errors.New("missing to time value")
)
+38 -5
View File
@@ -7,6 +7,8 @@ import (
"encoding/json"
"fmt"
"net/http"
"net/url"
"strconv"
"strings"
"github.com/absmach/magistrala/internal/apiutil"
@@ -23,14 +25,14 @@ func (sdk mgSDK) SendMessage(chanName, msg, key string) errors.SDKError {
subtopicPart = fmt.Sprintf("/%s", strings.ReplaceAll(chanNameParts[1], ".", "/"))
}
url := fmt.Sprintf("%s/channels/%s/messages%s", sdk.httpAdapterURL, chanID, subtopicPart)
reqURL := fmt.Sprintf("%s/channels/%s/messages%s", sdk.httpAdapterURL, chanID, subtopicPart)
_, _, err := sdk.processRequest(http.MethodPost, url, ThingPrefix+key, []byte(msg), nil, http.StatusAccepted)
_, _, err := sdk.processRequest(http.MethodPost, reqURL, ThingPrefix+key, []byte(msg), nil, http.StatusAccepted)
return err
}
func (sdk mgSDK) ReadMessages(pm PageMetadata, chanName, token string) (MessagesPage, errors.SDKError) {
func (sdk mgSDK) ReadMessages(pm MessagePageMetadata, chanName, token string) (MessagesPage, errors.SDKError) {
chanNameParts := strings.SplitN(chanName, ".", channelParts)
chanID := chanNameParts[0]
subtopicPart := ""
@@ -39,7 +41,7 @@ func (sdk mgSDK) ReadMessages(pm PageMetadata, chanName, token string) (Messages
}
readMessagesEndpoint := fmt.Sprintf("channels/%s/messages%s", chanID, subtopicPart)
url, err := sdk.withQueryParams(sdk.readerURL, readMessagesEndpoint, pm)
msgURL, err := sdk.withMessageQueryParams(sdk.readerURL, readMessagesEndpoint, pm)
if err != nil {
return MessagesPage{}, errors.NewSDKError(err)
}
@@ -47,7 +49,7 @@ func (sdk mgSDK) ReadMessages(pm PageMetadata, chanName, token string) (Messages
header := make(map[string]string)
header["Content-Type"] = string(sdk.msgContentType)
_, body, sdkerr := sdk.processRequest(http.MethodGet, url, token, nil, header, http.StatusOK)
_, body, sdkerr := sdk.processRequest(http.MethodGet, msgURL, token, nil, header, http.StatusOK)
if sdkerr != nil {
return MessagesPage{}, sdkerr
}
@@ -69,3 +71,34 @@ func (sdk *mgSDK) SetContentType(ct ContentType) errors.SDKError {
return nil
}
func (sdk mgSDK) withMessageQueryParams(baseURL, endpoint string, mpm MessagePageMetadata) (string, error) {
b, err := json.Marshal(mpm)
if err != nil {
return "", err
}
q := map[string]interface{}{}
if err := json.Unmarshal(b, &q); err != nil {
return "", err
}
ret := url.Values{}
for k, v := range q {
switch t := v.(type) {
case string:
ret.Add(k, t)
case float64:
ret.Add(k, strconv.FormatFloat(t, 'f', -1, 64))
case uint64:
ret.Add(k, strconv.FormatUint(t, 10))
case int64:
ret.Add(k, strconv.FormatInt(t, 10))
case json.Number:
ret.Add(k, t.String())
case bool:
ret.Add(k, strconv.FormatBool(t))
}
}
qs := ret.Encode()
return fmt.Sprintf("%s/%s?%s", baseURL, endpoint, qs), nil
}
+18 -2
View File
@@ -69,6 +69,22 @@ var (
ErrInvalidJWT = errors.New("invalid JWT")
)
type MessagePageMetadata struct {
PageMetadata
Subtopic string `json:"subtopic,omitempty"`
Publisher string `json:"publisher,omitempty"`
Comparator string `json:"comparator,omitempty"`
BoolValue *bool `json:"vb,omitempty"`
StringValue string `json:"vs,omitempty"`
DataValue string `json:"vd,omitempty"`
From float64 `json:"from,omitempty"`
To float64 `json:"to,omitempty"`
Aggregation string `json:"aggregation,omitempty"`
Interval string `json:"interval,omitempty"`
Value float64 `json:"value,omitempty"`
Protocol string `json:"protocol,omitempty"`
}
type PageMetadata struct {
Total uint64 `json:"total"`
Offset uint64 `json:"offset"`
@@ -828,13 +844,13 @@ type SDK interface {
// ReadMessages read messages of specified channel.
//
// example:
// pm := sdk.PageMetadata{
// pm := sdk.MessagePageMetadata{
// Offset: 0,
// Limit: 10,
// }
// msgs, _ := sdk.ReadMessages(pm,"channelID", "token")
// fmt.Println(msgs)
ReadMessages(pm PageMetadata, chanID, token string) (MessagesPage, errors.SDKError)
ReadMessages(pm MessagePageMetadata, chanID, token string) (MessagesPage, errors.SDKError)
// SetContentType sets message content type.
//
+4 -4
View File
@@ -1805,7 +1805,7 @@ func (_m *SDK) Parents(id string, pm sdk.PageMetadata, token string) (sdk.Groups
}
// ReadMessages provides a mock function with given fields: pm, chanID, token
func (_m *SDK) ReadMessages(pm sdk.PageMetadata, chanID string, token string) (sdk.MessagesPage, errors.SDKError) {
func (_m *SDK) ReadMessages(pm sdk.MessagePageMetadata, chanID string, token string) (sdk.MessagesPage, errors.SDKError) {
ret := _m.Called(pm, chanID, token)
if len(ret) == 0 {
@@ -1814,16 +1814,16 @@ func (_m *SDK) ReadMessages(pm sdk.PageMetadata, chanID string, token string) (s
var r0 sdk.MessagesPage
var r1 errors.SDKError
if rf, ok := ret.Get(0).(func(sdk.PageMetadata, string, string) (sdk.MessagesPage, errors.SDKError)); ok {
if rf, ok := ret.Get(0).(func(sdk.MessagePageMetadata, string, string) (sdk.MessagesPage, errors.SDKError)); ok {
return rf(pm, chanID, token)
}
if rf, ok := ret.Get(0).(func(sdk.PageMetadata, string, string) sdk.MessagesPage); ok {
if rf, ok := ret.Get(0).(func(sdk.MessagePageMetadata, string, string) sdk.MessagesPage); ok {
r0 = rf(pm, chanID, token)
} else {
r0 = ret.Get(0).(sdk.MessagesPage)
}
if rf, ok := ret.Get(1).(func(sdk.PageMetadata, string, string) errors.SDKError); ok {
if rf, ok := ret.Get(1).(func(sdk.MessagePageMetadata, string, string) errors.SDKError); ok {
r1 = rf(pm, chanID, token)
} else {
if ret.Get(1) != nil {
+124 -1
View File
@@ -414,7 +414,6 @@ func TestReadAll(t *testing.T) {
key: thingToken,
status: http.StatusBadRequest,
},
{
desc: "read page with non-float to as thing",
url: fmt.Sprintf("%s/channels/%s/messages?to=ABCD", ts.URL, chanID),
@@ -431,6 +430,68 @@ func TestReadAll(t *testing.T) {
Messages: messages[5:15],
},
},
{
desc: "read page with aggregation as thing",
url: fmt.Sprintf("%s/channels/%s/messages?aggregation=MAX", ts.URL, chanID),
key: thingToken,
status: http.StatusBadRequest,
},
{
desc: "read page with interval as thing",
url: fmt.Sprintf("%s/channels/%s/messages?interval=10h", ts.URL, chanID),
key: thingToken,
status: http.StatusOK,
res: pageRes{
Total: uint64(len(messages)),
Messages: messages[0:10],
},
},
{
desc: "read page with aggregation and interval as thing",
url: fmt.Sprintf("%s/channels/%s/messages?aggregation=MAX&interval=10h", ts.URL, chanID),
key: thingToken,
status: http.StatusBadRequest,
},
{
desc: "read page with aggregation, interval, to and from as thing",
url: fmt.Sprintf("%s/channels/%s/messages?aggregation=MAX&interval=10h&from=%f&to=%f", ts.URL, chanID, messages[19].Time, messages[4].Time),
key: thingToken,
status: http.StatusOK,
res: pageRes{
Total: uint64(len(messages[5:20])),
Messages: messages[5:15],
},
},
{
desc: "read page with invalid aggregation and valid interval, to and from as thing",
url: fmt.Sprintf("%s/channels/%s/messages?aggregation=invalid&interval=10h&from=%f&to=%f", ts.URL, chanID, messages[19].Time, messages[4].Time),
key: thingToken,
status: http.StatusBadRequest,
},
{
desc: "read page with invalid interval and valid aggregation, to and from as thing",
url: fmt.Sprintf("%s/channels/%s/messages?aggregation=MAX&interval=10hrs&from=%f&to=%f", ts.URL, chanID, messages[19].Time, messages[4].Time),
key: thingToken,
status: http.StatusBadRequest,
},
{
desc: "read page with aggregation, interval and to with missing from as thing",
url: fmt.Sprintf("%s/channels/%s/messages?aggregation=MAX&interval=10h&to=%f", ts.URL, chanID, messages[4].Time),
key: thingToken,
status: http.StatusBadRequest,
},
{
desc: "read page with aggregation, interval and to with invalid from as thing",
url: fmt.Sprintf("%s/channels/%s/messages?aggregation=MAX&interval=10h&to=ABCD&from=%f", ts.URL, chanID, messages[4].Time),
key: thingToken,
status: http.StatusBadRequest,
},
{
desc: "read page with aggregation, interval and to with invalid to as thing",
url: fmt.Sprintf("%s/channels/%s/messages?aggregation=MAX&interval=10h&from=%f&to=ABCD", ts.URL, chanID, messages[4].Time),
key: thingToken,
status: http.StatusBadRequest,
},
{
desc: "read page with valid offset and limit as user",
url: fmt.Sprintf("%s/channels/%s/messages?offset=0&limit=10", ts.URL, chanID),
@@ -711,6 +772,68 @@ func TestReadAll(t *testing.T) {
Messages: messages[5:15],
},
},
{
desc: "read page with aggregation as user",
url: fmt.Sprintf("%s/channels/%s/messages?aggregation=MAX", ts.URL, chanID),
key: userToken,
status: http.StatusBadRequest,
},
{
desc: "read page with interval as user",
url: fmt.Sprintf("%s/channels/%s/messages?interval=10h", ts.URL, chanID),
key: userToken,
status: http.StatusOK,
res: pageRes{
Total: uint64(len(messages)),
Messages: messages[0:10],
},
},
{
desc: "read page with aggregation and interval as user",
url: fmt.Sprintf("%s/channels/%s/messages?aggregation=MAX&interval=10h", ts.URL, chanID),
key: userToken,
status: http.StatusBadRequest,
},
{
desc: "read page with aggregation, interval, to and from as user",
url: fmt.Sprintf("%s/channels/%s/messages?aggregation=MAX&interval=10h&from=%f&to=%f", ts.URL, chanID, messages[19].Time, messages[4].Time),
key: userToken,
status: http.StatusOK,
res: pageRes{
Total: uint64(len(messages[5:20])),
Messages: messages[5:15],
},
},
{
desc: "read page with invalid aggregation and valid interval, to and from as user",
url: fmt.Sprintf("%s/channels/%s/messages?aggregation=invalid&interval=10h&from=%f&to=%f", ts.URL, chanID, messages[19].Time, messages[4].Time),
key: userToken,
status: http.StatusBadRequest,
},
{
desc: "read page with invalid interval and valid aggregation, to and from as user",
url: fmt.Sprintf("%s/channels/%s/messages?aggregation=MAX&interval=10hrs&from=%f&to=%f", ts.URL, chanID, messages[19].Time, messages[4].Time),
key: userToken,
status: http.StatusBadRequest,
},
{
desc: "read page with aggregation, interval and to with missing from as user",
url: fmt.Sprintf("%s/channels/%s/messages?aggregation=MAX&interval=10h&to=%f", ts.URL, chanID, messages[4].Time),
key: userToken,
status: http.StatusBadRequest,
},
{
desc: "read page with aggregation, interval and to with invalid from as user",
url: fmt.Sprintf("%s/channels/%s/messages?aggregation=MAX&interval=10h&to=ABCD&from=%f", ts.URL, chanID, messages[4].Time),
key: userToken,
status: http.StatusBadRequest,
},
{
desc: "read page with aggregation, interval and to with invalid to as user",
url: fmt.Sprintf("%s/channels/%s/messages?aggregation=MAX&interval=10h&from=%f&to=ABCD", ts.URL, chanID, messages[4].Time),
key: userToken,
status: http.StatusBadRequest,
},
}
for _, tc := range cases {
+24
View File
@@ -4,12 +4,18 @@
package api
import (
"slices"
"strings"
"time"
"github.com/absmach/magistrala/internal/apiutil"
"github.com/absmach/magistrala/readers"
)
const maxLimitSize = 1000
var validAggregations = []string{"MAX", "MIN", "AVG", "SUM", "COUNT"}
type listMessagesReq struct {
chanID string
token string
@@ -39,5 +45,23 @@ func (req listMessagesReq) validate() error {
return apiutil.ErrInvalidComparator
}
if req.pageMeta.Aggregation != "" {
if req.pageMeta.From == 0 {
return apiutil.ErrMissingFrom
}
if req.pageMeta.To == 0 {
return apiutil.ErrMissingTo
}
if !slices.Contains(validAggregations, strings.ToUpper(req.pageMeta.Aggregation)) {
return apiutil.ErrInvalidAggregation
}
if _, err := time.ParseDuration(req.pageMeta.Interval); err != nil {
return apiutil.ErrInvalidInterval
}
}
return nil
}
+23 -1
View File
@@ -36,6 +36,9 @@ const (
comparatorKey = "comparator"
fromKey = "from"
toKey = "to"
aggregationKey = "aggregation"
intervalKey = "interval"
defInterval = "1s"
defLimit = 10
defOffset = 0
defFormat = "messages"
@@ -141,6 +144,19 @@ func decodeList(_ context.Context, r *http.Request) (interface{}, error) {
return nil, errors.Wrap(apiutil.ErrValidation, err)
}
aggregation, err := apiutil.ReadStringQuery(r, aggregationKey, "")
if err != nil {
return nil, errors.Wrap(apiutil.ErrValidation, err)
}
var interval string
if aggregation != "" {
interval, err = apiutil.ReadStringQuery(r, intervalKey, defInterval)
if err != nil {
return nil, errors.Wrap(apiutil.ErrValidation, err)
}
}
req := listMessagesReq{
chanID: chi.URLParam(r, "chanID"),
token: apiutil.ExtractBearerToken(r),
@@ -160,6 +176,8 @@ func decodeList(_ context.Context, r *http.Request) (interface{}, error) {
BoolValue: vb,
From: from,
To: to,
Aggregation: aggregation,
Interval: interval,
},
}
return req, nil
@@ -196,7 +214,11 @@ func encodeError(_ context.Context, err error, w http.ResponseWriter) {
errors.Contains(err, apiutil.ErrMissingID),
errors.Contains(err, apiutil.ErrLimitSize),
errors.Contains(err, apiutil.ErrOffsetSize),
errors.Contains(err, apiutil.ErrInvalidComparator):
errors.Contains(err, apiutil.ErrInvalidComparator),
errors.Contains(err, apiutil.ErrInvalidAggregation),
errors.Contains(err, apiutil.ErrInvalidInterval),
errors.Contains(err, apiutil.ErrMissingFrom),
errors.Contains(err, apiutil.ErrMissingTo):
w.WriteHeader(http.StatusBadRequest)
case errors.Contains(err, svcerr.ErrAuthentication),
errors.Contains(err, svcerr.ErrAuthorization),
+2
View File
@@ -55,6 +55,8 @@ type PageMetadata struct {
From float64 `json:"from,omitempty"`
To float64 `json:"to,omitempty"`
Format string `json:"format,omitempty"`
Aggregation string `json:"aggregation,omitempty"`
Interval string `json:"interval,omitempty"`
}
// ParseValueComparator convert comparison operator keys into mathematic anotation.
+10 -4
View File
@@ -37,7 +37,14 @@ func (tr timescaleRepository) ReadAll(chanID string, rpm readers.PageMetadata) (
format = rpm.Format
}
q := fmt.Sprintf(`SELECT * FROM %s WHERE %s ORDER BY %s DESC LIMIT :limit OFFSET :offset;`, format, fmtCondition(chanID, rpm), order)
q := fmt.Sprintf(`SELECT * FROM %s WHERE %s ORDER BY %s DESC LIMIT :limit OFFSET :offset;`, format, fmtCondition(rpm), order)
totalQuery := fmt.Sprintf(`SELECT COUNT(*) FROM %s WHERE %s;`, format, fmtCondition(rpm))
// If aggregation is provided, add time_bucket and aggregation to the query
if rpm.Aggregation != "" {
q = fmt.Sprintf(`SELECT EXTRACT(epoch FROM time_bucket('%s', to_timestamp(time/1000))) AS time, %s(value) AS value FROM %s WHERE %s GROUP BY 1 ORDER BY time DESC LIMIT :limit OFFSET :offset;`, rpm.Interval, rpm.Aggregation, format, fmtCondition(rpm))
totalQuery = fmt.Sprintf(`SELECT COUNT(*) FROM (SELECT EXTRACT(epoch FROM time_bucket('%s', to_timestamp(time/1000))) AS time, %s(value) AS value FROM %s WHERE %s GROUP BY 1) AS subquery;`, rpm.Interval, rpm.Aggregation, format, fmtCondition(rpm))
}
params := map[string]interface{}{
"channel": chanID,
@@ -94,8 +101,7 @@ func (tr timescaleRepository) ReadAll(chanID string, rpm readers.PageMetadata) (
}
}
q = fmt.Sprintf(`SELECT COUNT(*) FROM %s WHERE %s;`, format, fmtCondition(chanID, rpm))
rows, err = tr.db.NamedQuery(q, params)
rows, err = tr.db.NamedQuery(totalQuery, params)
if err != nil {
return readers.MessagesPage{}, errors.Wrap(readers.ErrReadMessages, err)
}
@@ -112,7 +118,7 @@ func (tr timescaleRepository) ReadAll(chanID string, rpm readers.PageMetadata) (
return page, nil
}
func fmtCondition(chanID string, rpm readers.PageMetadata) string {
func fmtCondition(rpm readers.PageMetadata) string {
condition := `channel = :channel`
var query map[string]interface{}