NOISSUE - Fix aggregation query (#2370)

Signed-off-by: Musilah <nataleigh.nk@gmail.com>
This commit is contained in:
Nataly Musilah
2024-09-02 13:07:12 +03:00
committed by GitHub
parent 2eccae8097
commit 092e679103
2 changed files with 131 additions and 2 deletions
+5 -2
View File
@@ -41,9 +41,12 @@ func (tr timescaleRepository) ReadAll(chanID string, rpm readers.PageMetadata) (
totalQuery := fmt.Sprintf(`SELECT COUNT(*) FROM %s WHERE %s;`, format, fmtCondition(rpm))
// If aggregation is provided, add time_bucket and aggregation to the query
const timeDivisor = 1000000000
if rpm.Aggregation != "" {
q = fmt.Sprintf(`SELECT EXTRACT(epoch FROM time_bucket('%s', to_timestamp(time/1000000))) *1000000 AS time, publisher, protocol, subtopic, name, unit, %s(value) AS value FROM %s WHERE %s GROUP BY time, publisher, protocol, subtopic, name, unit ORDER BY time DESC LIMIT :limit OFFSET :offset;`, rpm.Interval, rpm.Aggregation, format, fmtCondition(rpm))
totalQuery = fmt.Sprintf(`SELECT COUNT(*) FROM (SELECT EXTRACT(epoch FROM time_bucket('%s', to_timestamp(time/1000000))) AS time, %s(value) AS value FROM %s WHERE %s GROUP BY time) AS subquery;`, rpm.Interval, rpm.Aggregation, format, fmtCondition(rpm))
q = fmt.Sprintf(`SELECT EXTRACT(epoch FROM time_bucket('%s', to_timestamp(time/%d))) *%d AS time, %s(value) AS value, FIRST(publisher, time) AS publisher, FIRST(protocol, time) AS protocol, FIRST(subtopic, time) AS subtopic, FIRST(name,time) AS name, FIRST(unit, time) AS unit FROM %s WHERE %s GROUP BY 1 ORDER BY time DESC LIMIT :limit OFFSET :offset;`, rpm.Interval, timeDivisor, timeDivisor, rpm.Aggregation, format, fmtCondition(rpm))
totalQuery = fmt.Sprintf(`SELECT COUNT(*) FROM (SELECT EXTRACT(epoch FROM time_bucket('%s', to_timestamp(time/%d))) AS time, %s(value) AS value FROM %s WHERE %s GROUP BY 1) AS subquery;`, rpm.Interval, timeDivisor, rpm.Aggregation, format, fmtCondition(rpm))
}
params := map[string]interface{}{
+126
View File
@@ -518,6 +518,132 @@ func TestReadSenml(t *testing.T) {
}
}
func TestReadMessagesWithAggregation(t *testing.T) {
writer := twriter.New(db)
chanID := testsutil.GenerateUUID(t)
pubID := testsutil.GenerateUUID(t)
messages := []senml.Message{}
now := float64(time.Now().UnixNano())
value := 10.0
for i := 0; i < 100; i++ {
if i%10 == 0 {
value += 10.0
}
v := value
msg := senml.Message{
Channel: chanID,
Publisher: pubID,
Time: now - float64(i*1000000000), // over 100 seconds
Value: &v,
Protocol: mqttProt,
}
messages = append(messages, msg)
}
err := writer.ConsumeBlocking(context.TODO(), messages)
require.Nil(t, err, "expected no error got %s\n", err)
reader := treader.New(db)
// Set up cases for aggregation readAll
cases := []struct {
desc string
chanID string
pageMeta readers.PageMetadata
page readers.MessagesPage
}{
{
desc: "read message page for existing channel with AVG aggregation over an hour",
chanID: chanID,
pageMeta: readers.PageMetadata{
Limit: 100,
Offset: 0,
Aggregation: "AVG",
Interval: "1 hour",
From: now - float64(100000000000),
To: now,
},
page: readers.MessagesPage{
Messages: fromSenml(messages),
},
},
{
desc: "read message page for existing channel with MAX aggregation over an hour",
chanID: chanID,
pageMeta: readers.PageMetadata{
Limit: 100,
Offset: 0,
Aggregation: "MAX",
Interval: "1 hour",
From: now - float64(100000000000),
To: now,
},
page: readers.MessagesPage{
Messages: fromSenml(messages),
},
},
{
desc: "read message page for existing channel with MIN aggregation over an hour",
chanID: chanID,
pageMeta: readers.PageMetadata{
Limit: 100,
Offset: 0,
Aggregation: "MIN",
Interval: "1 hour",
From: now - float64(100000000000),
To: now,
},
page: readers.MessagesPage{
Messages: fromSenml(messages),
},
},
{
desc: "read message page for existing channel with SUM aggregation over an hour",
chanID: chanID,
pageMeta: readers.PageMetadata{
Limit: 100,
Offset: 0,
Aggregation: "SUM",
Interval: "1 hour",
From: now - float64(100000000000),
To: now,
},
page: readers.MessagesPage{
Messages: fromSenml(messages),
},
},
{
desc: "read message page for existing channel with COUNT aggregation over an hour",
chanID: chanID,
pageMeta: readers.PageMetadata{
Limit: 100,
Offset: 0,
Aggregation: "COUNT",
Interval: "1 hour",
From: now - float64(100000000000),
To: now,
},
page: readers.MessagesPage{
Messages: fromSenml(messages),
},
},
}
for _, tc := range cases {
resultPage, err := reader.ReadAll(tc.chanID, tc.pageMeta)
assert.Nil(t, err, fmt.Sprintf("%s: expected no error got %s", tc.desc, err))
assert.NotEmpty(t, resultPage.Messages, "expected non-empty result set")
for i := range resultPage.Messages {
msg, ok := resultPage.Messages[i].(senml.Message)
if ok && msg.Value != nil {
assert.GreaterOrEqual(t, *msg.Value, resultPage.Value, "expected aggregated value to be greater or equal to the expected value")
}
}
}
}
func TestReadJSON(t *testing.T) {
writer := twriter.New(db)