NOISSUE - Update Timescale DB and queries (#147)

* updated timescaledb and queries

Signed-off-by: Arvindh <arvindh91@gmail.com>

* add pgx connection pool

Signed-off-by: Arvindh <arvindh91@gmail.com>

* add pgx config

Signed-off-by: Arvindh <arvindh91@gmail.com>

* add pgx config

Signed-off-by: Arvindh <arvindh91@gmail.com>

* add pgx config

Signed-off-by: Arvindh <arvindh91@gmail.com>

* fix message sdk and sql migration

Signed-off-by: Arvindh <arvindh91@gmail.com>

* updated pgx pool set config

Signed-off-by: Arvindh <arvindh91@gmail.com>

* clean up values

Signed-off-by: Arvindh <arvindh91@gmail.com>

* clean up values

Signed-off-by: Arvindh <arvindh91@gmail.com>

* change pgclient in test

Signed-off-by: Arvindh <arvindh91@gmail.com>

* remove comments

Signed-off-by: Arvindh <arvindh91@gmail.com>

* fix test

Signed-off-by: Arvindh <arvindh91@gmail.com>

---------

Signed-off-by: Arvindh <arvindh91@gmail.com>
This commit is contained in:
Arvindh
2025-05-14 14:10:53 +05:30
committed by GitHub
parent 47ed0aa112
commit c9a51947a3
11 changed files with 331 additions and 113 deletions
+1 -1
View File
@@ -13,6 +13,7 @@ import (
chclient "github.com/absmach/callhome/pkg/client"
grpcReadersV1 "github.com/absmach/magistrala/api/grpc/readers/v1"
pgclient "github.com/absmach/magistrala/pkg/postgres"
readersgrpcapi "github.com/absmach/magistrala/readers/api/grpc"
httpapi "github.com/absmach/magistrala/readers/api/http"
middleware "github.com/absmach/magistrala/readers/middleware"
@@ -21,7 +22,6 @@ import (
smqlog "github.com/absmach/supermq/logger"
"github.com/absmach/supermq/pkg/authn/authsvc"
"github.com/absmach/supermq/pkg/grpcclient"
pgclient "github.com/absmach/supermq/pkg/postgres"
"github.com/absmach/supermq/pkg/prometheus"
"github.com/absmach/supermq/pkg/server"
grpcserver "github.com/absmach/supermq/pkg/server/grpc"
+1 -1
View File
@@ -17,12 +17,12 @@ import (
httpapi "github.com/absmach/magistrala/consumers/writers/api"
"github.com/absmach/magistrala/consumers/writers/brokers"
"github.com/absmach/magistrala/consumers/writers/timescale"
pgclient "github.com/absmach/magistrala/pkg/postgres"
"github.com/absmach/supermq"
"github.com/absmach/supermq/consumers"
smqlog "github.com/absmach/supermq/logger"
jaegerclient "github.com/absmach/supermq/pkg/jaeger"
brokerstracing "github.com/absmach/supermq/pkg/messaging/brokers/tracing"
pgclient "github.com/absmach/supermq/pkg/postgres"
"github.com/absmach/supermq/pkg/prometheus"
"github.com/absmach/supermq/pkg/server"
httpserver "github.com/absmach/supermq/pkg/server/http"
+36 -3
View File
@@ -26,14 +26,47 @@ func Migration() *migrate.MemoryMigrationSource {
data_value BYTEA,
sum FLOAT,
update_time FLOAT,
PRIMARY KEY (time, publisher, channel, subtopic, name)
);
SELECT create_hypertable('messages', 'time', create_default_indexes => FALSE, chunk_time_interval => 86400000, if_not_exists => TRUE);`,
PRIMARY KEY (time, channel, subtopic, protocol, publisher, name)
);`,
// Creating HyperTable with chunks interval of 1 day = 86400000000000 Nanoseconds
"SELECT create_hypertable('messages', by_range('time', 86400000000000 ), if_not_exists => TRUE, migrate_data => TRUE);",
},
Down: []string{
"DROP TABLE messages",
},
},
{
Id: "messages_2",
Up: []string{
// Index on channel, time
"CREATE INDEX IF NOT EXISTS idx_channel_time ON messages (channel, time DESC) WITH (timescaledb.transaction_per_chunk);",
// Index on channel, name, time
"CREATE INDEX IF NOT EXISTS idx_channel_name_time ON messages (channel, name, time DESC) WITH (timescaledb.transaction_per_chunk);",
// Index on channel, subtopic, name, time
"CREATE INDEX IF NOT EXISTS idx_channel_subtopic_name_time ON messages (channel, subtopic, name, time DESC) WITH (timescaledb.transaction_per_chunk);",
// Index on channel, publisher, name, time
"CREATE INDEX IF NOT EXISTS idx_channel_publisher_name_time ON messages (channel, publisher, name, time DESC) WITH (timescaledb.transaction_per_chunk);",
// Index on channel, subtopic, publisher, name, time
"CREATE INDEX IF NOT EXISTS idx_channel_subtopic_publisher_name_time ON messages (channel, subtopic, publisher, name, time DESC) WITH (timescaledb.transaction_per_chunk);",
},
DisableTransactionUp: true,
Down: []string{
"DROP INDEX IF EXISTS idx_channel_time ;",
"DROP INDEX IF EXISTS idx_channel_name_time ;",
"DROP INDEX IF EXISTS idx_channel_subtopic_name_time ;",
"DROP INDEX IF EXISTS idx_channel_publisher_name_time ;",
"DROP INDEX IF EXISTS idx_channel_subtopic_publisher_name_time ;",
},
},
},
}
}
@@ -17,7 +17,7 @@ volumes:
services:
timescale:
image: timescale/timescaledb:2.13.1-pg16
image: timescale/timescaledb:2.19.3-pg16-oss
container_name: magistrala-timescale
restart: on-failure
environment:
+138
View File
@@ -0,0 +1,138 @@
// Copyright (c) Abstract Machines
// SPDX-License-Identifier: Apache-2.0
package postgres
import (
"context"
"strings"
"time"
"github.com/absmach/supermq/pkg/errors"
"github.com/jackc/pgx/v5/pgxpool"
"github.com/jackc/pgx/v5/stdlib"
"github.com/jmoiron/sqlx"
migrate "github.com/rubenv/sql-migrate"
)
var (
errMigration = errors.New("failed to apply migrations")
errInvalidConnectionString = errors.New("invalid connection string")
)
type PoolConfig struct {
// MaxConnLifetime is the duration since creation after which a connection will be automatically closed.
MaxConnLifetime time.Duration `env:"MAX_CONN_LIFETIME" envDefault:"1h"`
// pool_max_conn_lifetime_jitter
MaxConnLifetimeJitter time.Duration `env:"MAX_CONN_LIFETIME_JITTER" envDefault:"0"`
// MaxConnIdleTime is the duration after which an idle connection will be automatically closed by the health check.
MaxConnIdleTime time.Duration `env:"MAX_CONN_IDLE_TIME" envDefault:"15m"`
// MaxConnLifetime is the duration since creation after which a connection will be automatically closed.
MaxConns uint16 `env:"MAX_CONNS" envDefault:"5"`
// MinConns is the minimum size of the pool. After connection closes, the pool might dip below MinConns. A low
// number of MinConns might mean the pool is empty after MaxConnLifetime until the health check has a chance
// to create new connections.
MinConns uint16 `env:"MIN_CONNS" envDefault:"1"`
// MinIdleConns is the minimum number of idle connections in the pool. You can increase this to ensure that
// there are always idle connections available. This can help reduce tail latencies during request processing,
// as you can avoid the latency of establishing a new connection while handling requests. It is superior
// to MinConns for this purpose.
// Similar to MinConns, the pool might temporarily dip below MinIdleConns after connection closes.
MinIdleConns uint16 `env:"MIN_IDLE_CONNS" envDefault:"1"`
// HealthCheckPeriod is the duration between checks of the health of idle connections.
HealthCheckPeriod time.Duration `env:"HEALTH_CHECK_PERIOD" envDefault:"1m"`
}
// Config defines the options that are used when connecting to a TimescaleSQL instance.
type Config struct {
Host string `env:"HOST" envDefault:"localhost"`
Port string `env:"PORT" envDefault:"5432"`
User string `env:"USER" envDefault:"supermq"`
Pass string `env:"PASS" envDefault:"supermq"`
Name string `env:"NAME" envDefault:""`
SSLMode string `env:"SSL_MODE" envDefault:"disable"`
SSLCert string `env:"SSL_CERT" envDefault:""`
SSLKey string `env:"SSL_KEY" envDefault:""`
SSLRootCert string `env:"SSL_ROOT_CERT" envDefault:""`
Pool PoolConfig `envPrefix:"POOL_"`
}
// Setup creates a connection to the Postgres instance and applies any
// unapplied database migrations. A non-nil error is returned to indicate
// failure.
func Setup(cfg Config, migrations migrate.MemoryMigrationSource) (*sqlx.DB, error) {
db, err := Connect(cfg)
if err != nil {
return nil, err
}
if _, err = migrate.Exec(db.DB, "postgres", migrations, migrate.Up); err != nil {
return nil, errors.Wrap(errMigration, err)
}
return db, nil
}
// Connect creates a connection to the Postgres instance.
func Connect(cfg Config) (*sqlx.DB, error) {
pgxPoolConfig, err := pgxpool.ParseConfig(cfg.dbConnURL())
if err != nil {
return nil, errors.Wrap(errInvalidConnectionString, err)
}
pgxPoolConfig.MaxConnIdleTime = cfg.Pool.MaxConnIdleTime
pgxPoolConfig.MaxConnLifetimeJitter = cfg.Pool.MaxConnLifetimeJitter
pgxPoolConfig.MaxConnLifetime = cfg.Pool.MaxConnLifetime
pgxPoolConfig.MaxConns = int32(cfg.Pool.MaxConns)
pgxPoolConfig.MinConns = int32(cfg.Pool.MinConns)
pgxPoolConfig.MinIdleConns = int32(cfg.Pool.MinIdleConns)
pgxPoolConfig.HealthCheckPeriod = cfg.Pool.HealthCheckPeriod
dbpool, err := pgxpool.NewWithConfig(context.Background(), pgxPoolConfig)
if err != nil {
return nil, err
}
sqlDB := stdlib.OpenDBFromPool(dbpool)
return sqlx.NewDb(sqlDB, "pgx"), nil
}
func (cfg Config) dbConnURL() string {
urlParts := []string{}
if cfg.Host != "" {
urlParts = append(urlParts, "host="+cfg.Host)
}
if cfg.Port != "" {
urlParts = append(urlParts, "port="+cfg.Port)
}
if cfg.User != "" {
urlParts = append(urlParts, "user="+cfg.User)
}
if cfg.Pass != "" {
urlParts = append(urlParts, "password="+cfg.Pass)
}
if cfg.Name != "" {
urlParts = append(urlParts, "dbname="+cfg.Name)
}
if cfg.SSLMode != "" {
urlParts = append(urlParts, "sslmode="+cfg.SSLMode)
}
if cfg.SSLCert != "" {
urlParts = append(urlParts, "sslcert="+cfg.SSLCert)
}
if cfg.SSLKey != "" {
urlParts = append(urlParts, "sslkey="+cfg.SSLKey)
}
if cfg.SSLRootCert != "" {
urlParts = append(urlParts, "sslrootcert="+cfg.SSLRootCert)
}
return strings.Join(urlParts, " ")
}
+2
View File
@@ -42,6 +42,8 @@ type MessagePageMetadata struct {
PageMetadata
Subtopic string `json:"subtopic,omitempty"`
Publisher string `json:"publisher,omitempty"`
Limit int `json:"limit,omitempty"`
Name string `json:"name,omitempty"`
Comparator string `json:"comparator,omitempty"`
BoolValue *bool `json:"vb,omitempty"`
StringValue string `json:"vs,omitempty"`
-80
View File
@@ -1,80 +0,0 @@
// Copyright (c) Abstract Machines
// SPDX-License-Identifier: Apache-2.0
package timescale
import (
"fmt"
"github.com/jmoiron/sqlx"
migrate "github.com/rubenv/sql-migrate"
)
// Table for SenML messages.
const defTable = "messages"
// Config defines the options that are used when connecting to a TimescaleSQL instance.
type Config struct {
Host string `env:"HOST" envDefault:"localhost"`
Port string `env:"PORT" envDefault:"5432"`
User string `env:"USER" envDefault:"supermq"`
Pass string `env:"PASS" envDefault:"supermq"`
Name string `env:"NAME" envDefault:""`
SSLMode string `env:"SSL_MODE" envDefault:"disable"`
SSLCert string `env:"SSL_CERT" envDefault:""`
SSLKey string `env:"SSL_KEY" envDefault:""`
SSLRootCert string `env:"SSL_ROOT_CERT" envDefault:""`
}
// Connect creates a connection to the TimescaleSQL instance and applies any
// unapplied database migrations. A non-nil error is returned to indicate
// failure.
func Connect(cfg Config) (*sqlx.DB, error) {
url := fmt.Sprintf("host=%s port=%s user=%s dbname=%s password=%s sslmode=%s sslcert=%s sslkey=%s sslrootcert=%s", cfg.Host, cfg.Port, cfg.User, cfg.Name, cfg.Pass, cfg.SSLMode, cfg.SSLCert, cfg.SSLKey, cfg.SSLRootCert)
db, err := sqlx.Open("pgx", url)
if err != nil {
return nil, err
}
if err := migrateDB(db); err != nil {
return nil, err
}
return db, nil
}
func migrateDB(db *sqlx.DB) error {
migrations := &migrate.MemoryMigrationSource{
Migrations: []*migrate.Migration{
{
Id: "messages_1",
Up: []string{
`CREATE TABLE IF NOT EXISTS messages (
time BIGINT NOT NULL,
channel UUID,
subtopic VARCHAR(254),
publisher UUID,
protocol TEXT,
name VARCHAR(254),
unit TEXT,
value FLOAT,
string_value TEXT,
bool_value BOOL,
data_value BYTEA,
sum FLOAT,
update_time FLOAT,
PRIMARY KEY (time, publisher, subtopic, name)
);
SELECT create_hypertable('messages', 'time', create_default_indexes => FALSE, chunk_time_interval => 86400000, if_not_exists => TRUE);`,
},
Down: []string{
"DROP TABLE messages",
},
},
},
}
_, err := migrate.Exec(db.DB, "postgres", migrations, migrate.Up)
return err
}
+63 -23
View File
@@ -6,6 +6,7 @@ package timescale
import (
"encoding/json"
"fmt"
"strings"
"github.com/absmach/supermq/pkg/errors"
"github.com/absmach/supermq/pkg/transformers/senml"
@@ -15,6 +16,9 @@ import (
"github.com/jmoiron/sqlx" // required for DB access
)
// Table for SenML messages.
const defTable = "messages"
var _ readers.MessageRepository = (*timescaleRepository)(nil)
type timescaleRepository struct {
@@ -44,7 +48,24 @@ func (tr timescaleRepository) ReadAll(chanID string, rpm readers.PageMetadata) (
const timeDivisor = 1000000000
if rpm.Aggregation != "" {
q = fmt.Sprintf(`SELECT EXTRACT(epoch FROM time_bucket('%s', to_timestamp(time/%d))) *%d AS time, %s(value) AS value, FIRST(publisher, time) AS publisher, FIRST(protocol, time) AS protocol, FIRST(subtopic, time) AS subtopic, FIRST(name,time) AS name, FIRST(unit, time) AS unit FROM %s WHERE %s GROUP BY 1 ORDER BY time DESC LIMIT :limit OFFSET :offset;`, rpm.Interval, timeDivisor, timeDivisor, rpm.Aggregation, format, fmtCondition(rpm))
q = fmt.Sprintf(`
SELECT
EXTRACT(epoch FROM time_bucket('%s', to_timestamp(time/%d))) *%d AS time,
%s(value) AS value,
FIRST(publisher, time) AS publisher,
FIRST(protocol, time) AS protocol,
FIRST(subtopic, time) AS subtopic,
FIRST(name,time) AS name,
FIRST(unit, time) AS unit
FROM
%s
WHERE
%s
GROUP BY 1
ORDER BY time DESC
LIMIT :limit OFFSET :offset;
`,
rpm.Interval, timeDivisor, timeDivisor, rpm.Aggregation, format, fmtCondition(rpm))
totalQuery = fmt.Sprintf(`SELECT COUNT(*) FROM (SELECT EXTRACT(epoch FROM time_bucket('%s', to_timestamp(time/%d))) AS time, %s(value) AS value FROM %s WHERE %s GROUP BY 1) AS subquery;`, rpm.Interval, timeDivisor, rpm.Aggregation, format, fmtCondition(rpm))
}
@@ -122,54 +143,73 @@ func (tr timescaleRepository) ReadAll(chanID string, rpm readers.PageMetadata) (
}
func fmtCondition(rpm readers.PageMetadata) string {
condition := `channel = :channel`
// Indexed columns conditions based on indices order.
chCondition := " channel = :channel "
var query map[string]interface{}
meta, err := json.Marshal(rpm)
if err != nil {
return condition
return chCondition
}
if err := json.Unmarshal(meta, &query); err != nil {
return condition
return chCondition
}
conditions := []string{chCondition}
if _, ok := query["subtopic"]; ok {
conditions = append(conditions, " subtopic = :subtopic ")
}
if _, ok := query["publisher"]; ok {
conditions = append(conditions, " publisher = :publisher ")
}
if _, ok := query["name"]; ok {
conditions = append(conditions, " name = :name ")
}
if _, ok := query["from"]; ok {
conditions = append(conditions, " time >= :from ")
}
if _, ok := query["to"]; ok {
conditions = append(conditions, " time < :to ")
}
// Non Indexed columns conditions added after indexed columns conditions order.
if _, ok := query["protocol"]; ok {
conditions = append(conditions, " protocol = :protocol ")
}
for name := range query {
switch name {
case
"subtopic",
"publisher",
"name",
"protocol":
condition = fmt.Sprintf(`%s AND %s = :%s`, condition, name, name)
case "v":
comparator := readers.ParseValueComparator(query)
condition = fmt.Sprintf(`%s AND value %s :value`, condition, comparator)
conditions = append(conditions, fmt.Sprintf(" value %s :value ", comparator))
case "vb":
condition = fmt.Sprintf(`%s AND bool_value = :bool_value`, condition)
conditions = append(conditions, "bool_value = :bool_value")
case "vs":
comparator := readers.ParseValueComparator(query)
switch comparator {
case "=":
condition = fmt.Sprintf("%s AND string_value = :string_value ", condition)
conditions = append(conditions, " string_value = :string_value ")
case ">":
condition = fmt.Sprintf("%s AND string_value LIKE '%%' || :string_value || '%%' AND string_value <> :string_value", condition)
conditions = append(conditions, " string_value LIKE '%%' || :string_value || '%%' AND string_value <> :string_value ")
case ">=":
condition = fmt.Sprintf("%s AND string_value LIKE '%%' || :string_value || '%%'", condition)
conditions = append(conditions, " string_value LIKE '%%' || :string_value || '%%' ")
case "<=":
condition = fmt.Sprintf("%s AND :string_value LIKE '%%' || string_value || '%%'", condition)
conditions = append(conditions, " :string_value LIKE '%%' || string_value || '%%' ")
case "<":
condition = fmt.Sprintf("%s AND :string_value LIKE '%%' || string_value || '%%' AND string_value <> :string_value", condition)
conditions = append(conditions, " :string_value LIKE '%%' || string_value || '%%' AND string_value <> :string_value ")
}
case "vd":
comparator := readers.ParseValueComparator(query)
condition = fmt.Sprintf(`%s AND data_value %s :data_value`, condition, comparator)
case "from":
condition = fmt.Sprintf(`%s AND time >= :from`, condition)
case "to":
condition = fmt.Sprintf(`%s AND time < :to`, condition)
conditions = append(conditions, fmt.Sprintf(" data_value %s :data_value ", comparator))
}
}
return condition
return strings.Join(conditions, " AND ")
}
type senmlMessage struct {
+15 -4
View File
@@ -10,8 +10,10 @@ import (
"log"
"os"
"testing"
"time"
"github.com/absmach/magistrala/readers/timescale"
tsWriter "github.com/absmach/magistrala/consumers/writers/timescale"
pgclient "github.com/absmach/magistrala/pkg/postgres"
_ "github.com/jackc/pgx/v5/stdlib" // required for SQL access
"github.com/jmoiron/sqlx"
"github.com/ory/dockertest/v3"
@@ -28,7 +30,7 @@ func TestMain(m *testing.M) {
container, err := pool.RunWithOptions(&dockertest.RunOptions{
Repository: "timescale/timescaledb",
Tag: "2.13.1-pg16",
Tag: "2.19.3-pg16-oss",
Env: []string{
"POSTGRES_USER=test",
"POSTGRES_PASSWORD=test",
@@ -56,7 +58,7 @@ func TestMain(m *testing.M) {
log.Fatalf("Could not connect to docker: %s", err)
}
dbConfig := timescale.Config{
dbConfig := pgclient.Config{
Host: "localhost",
Port: port,
User: "test",
@@ -66,9 +68,18 @@ func TestMain(m *testing.M) {
SSLCert: "",
SSLKey: "",
SSLRootCert: "",
Pool: pgclient.PoolConfig{
MaxConnLifetime: 1 * time.Hour,
MaxConnLifetimeJitter: time.Duration(0),
MaxConnIdleTime: 15 * time.Minute,
MaxConns: 5,
MinConns: 1,
MinIdleConns: 1,
HealthCheckPeriod: 1 * time.Minute,
},
}
if db, err = timescale.Connect(dbConfig); err != nil {
if db, err = pgclient.Setup(dbConfig, *tsWriter.Migration()); err != nil {
log.Fatalf("Could not setup test DB connection: %s", err)
}
+4
View File
@@ -0,0 +1,4 @@
# Copyright (c) Abstract Machines
# SPDX-License-Identifier: Apache-2.0
*.csv
+70
View File
@@ -0,0 +1,70 @@
## Copyright (c) Abstract Machines
## SPDX-License-Identifier: Apache-2.0
## Script used to generate random data to do testing the timescaledb "messages" table
## The script generate CSV file, which can be loaded into data base using the below command
### psql command to copy csv
### psql -h 127.0.0.1 -U supermq -d supermq -p 5433 -W -c "\COPY messages (time, channel, subtopic, publisher, protocol, name, unit, value) FROM 'scripts/data-gen/messages.csv' WITH (FORMAT csv, HEADER)"
import random
import uuid
from datetime import datetime, timedelta
import os
num_channels=100
num_subtopic=1 ## subtopic per channel
num_publisher=3 ## publisher per subtopic
num_metrics=10 ## metrics per publisher
last_num_days=30
data_interval_minutes=15
# Prepare rows (all unique channel, subtopic, publisher, name combinations)
print(f"{num_channels} channels")
print(f"{num_subtopic} subtopics (per channel)")
print(f"{num_publisher} publishers (per subtopic)")
print(f"{num_metrics} names (per publisher)")
rows = []
for channel_idx in range(1, num_channels+1): # (num_channels) channels
channel_id = uuid.uuid4()
for subtopic_idx in range(1, num_subtopic+1): # (num_subtopic_per_channel) subtopics per channel
subtopic = f"subtopic_{subtopic_idx}"
for publisher_idx in range(1, num_publisher+1): # (num_publisher_per_subtopic) publishers per subtopic
publisher_id = uuid.uuid4()
for name_idx in range(1, num_metrics+1): # (num_metrics_per_publisher) names per publisher
name = f"metric_{name_idx}"
rows.append({
"channel": channel_id,
"subtopic": subtopic,
"publisher": publisher_id,
"name": name,
})
print(f"Total unique metric: {len(rows)}")
start_time = datetime.now() - timedelta(days=last_num_days)
interval = timedelta(minutes=data_interval_minutes)
num_intervals = int((last_num_days * 24 * 60) / data_interval_minutes) # number of data_interval_minutes in last_num_days
print(f"generating data for last {last_num_days} days with data interval of {data_interval_minutes} minutes which is {num_intervals} timestamps")
script_directory = os.path.dirname(os.path.abspath(__file__))
data_file_path=f"{script_directory}/messages.csv"
# Open CSV file for writing
with open(data_file_path, 'w') as f:
f.write("time,channel,subtopic,publisher,protocol,name,unit,value\n")
for i in range(num_intervals):
timestamp = start_time + (i * interval)
timestamp_ns = int(timestamp.timestamp() * 1_000_000_000) # nanoseconds
for row in rows:
value = random.uniform(0, 100) # generate a float64 value between 0 and 100
f.write(f"{timestamp_ns},{row['channel']},{row['subtopic']},{row['publisher']},mqtt,{row['name']},unit,{value}\n")
print(f"Finished writing CSV at {data_file_path} with {len(rows) * num_intervals} rows = {len(rows)} unique metrics (channel + subtopic + publisher + metric) x {num_intervals} timestamps")