diff --git a/cmd/timescale-reader/main.go b/cmd/timescale-reader/main.go index b2031d939..45393973f 100644 --- a/cmd/timescale-reader/main.go +++ b/cmd/timescale-reader/main.go @@ -13,6 +13,7 @@ import ( chclient "github.com/absmach/callhome/pkg/client" grpcReadersV1 "github.com/absmach/magistrala/api/grpc/readers/v1" + pgclient "github.com/absmach/magistrala/pkg/postgres" readersgrpcapi "github.com/absmach/magistrala/readers/api/grpc" httpapi "github.com/absmach/magistrala/readers/api/http" middleware "github.com/absmach/magistrala/readers/middleware" @@ -21,7 +22,6 @@ import ( smqlog "github.com/absmach/supermq/logger" "github.com/absmach/supermq/pkg/authn/authsvc" "github.com/absmach/supermq/pkg/grpcclient" - pgclient "github.com/absmach/supermq/pkg/postgres" "github.com/absmach/supermq/pkg/prometheus" "github.com/absmach/supermq/pkg/server" grpcserver "github.com/absmach/supermq/pkg/server/grpc" diff --git a/cmd/timescale-writer/main.go b/cmd/timescale-writer/main.go index 951a26374..2f88cddb9 100644 --- a/cmd/timescale-writer/main.go +++ b/cmd/timescale-writer/main.go @@ -17,12 +17,12 @@ import ( httpapi "github.com/absmach/magistrala/consumers/writers/api" "github.com/absmach/magistrala/consumers/writers/brokers" "github.com/absmach/magistrala/consumers/writers/timescale" + pgclient "github.com/absmach/magistrala/pkg/postgres" "github.com/absmach/supermq" "github.com/absmach/supermq/consumers" smqlog "github.com/absmach/supermq/logger" jaegerclient "github.com/absmach/supermq/pkg/jaeger" brokerstracing "github.com/absmach/supermq/pkg/messaging/brokers/tracing" - pgclient "github.com/absmach/supermq/pkg/postgres" "github.com/absmach/supermq/pkg/prometheus" "github.com/absmach/supermq/pkg/server" httpserver "github.com/absmach/supermq/pkg/server/http" diff --git a/consumers/writers/timescale/init.go b/consumers/writers/timescale/init.go index 3c1f6fa5f..d4e9525c1 100644 --- a/consumers/writers/timescale/init.go +++ b/consumers/writers/timescale/init.go @@ -26,14 +26,47 @@ func Migration() *migrate.MemoryMigrationSource { data_value BYTEA, sum FLOAT, update_time FLOAT, - PRIMARY KEY (time, publisher, channel, subtopic, name) - ); - SELECT create_hypertable('messages', 'time', create_default_indexes => FALSE, chunk_time_interval => 86400000, if_not_exists => TRUE);`, + PRIMARY KEY (time, channel, subtopic, protocol, publisher, name) + );`, + + // Creating HyperTable with chunks interval of 1 day = 86400000000000 Nanoseconds + "SELECT create_hypertable('messages', by_range('time', 86400000000000 ), if_not_exists => TRUE, migrate_data => TRUE);", }, Down: []string{ "DROP TABLE messages", }, }, + { + Id: "messages_2", + Up: []string{ + // Index on channel, time + "CREATE INDEX IF NOT EXISTS idx_channel_time ON messages (channel, time DESC) WITH (timescaledb.transaction_per_chunk);", + + // Index on channel, name, time + "CREATE INDEX IF NOT EXISTS idx_channel_name_time ON messages (channel, name, time DESC) WITH (timescaledb.transaction_per_chunk);", + + // Index on channel, subtopic, name, time + "CREATE INDEX IF NOT EXISTS idx_channel_subtopic_name_time ON messages (channel, subtopic, name, time DESC) WITH (timescaledb.transaction_per_chunk);", + + // Index on channel, publisher, name, time + "CREATE INDEX IF NOT EXISTS idx_channel_publisher_name_time ON messages (channel, publisher, name, time DESC) WITH (timescaledb.transaction_per_chunk);", + + // Index on channel, subtopic, publisher, name, time + "CREATE INDEX IF NOT EXISTS idx_channel_subtopic_publisher_name_time ON messages (channel, subtopic, publisher, name, time DESC) WITH (timescaledb.transaction_per_chunk);", + }, + DisableTransactionUp: true, + Down: []string{ + "DROP INDEX IF EXISTS idx_channel_time ;", + + "DROP INDEX IF EXISTS idx_channel_name_time ;", + + "DROP INDEX IF EXISTS idx_channel_subtopic_name_time ;", + + "DROP INDEX IF EXISTS idx_channel_publisher_name_time ;", + + "DROP INDEX IF EXISTS idx_channel_subtopic_publisher_name_time ;", + }, + }, }, } } diff --git a/docker/addons/timescale-writer/docker-compose.yaml b/docker/addons/timescale-writer/docker-compose.yaml index 224ffc8b1..1c4e26757 100644 --- a/docker/addons/timescale-writer/docker-compose.yaml +++ b/docker/addons/timescale-writer/docker-compose.yaml @@ -17,7 +17,7 @@ volumes: services: timescale: - image: timescale/timescaledb:2.13.1-pg16 + image: timescale/timescaledb:2.19.3-pg16-oss container_name: magistrala-timescale restart: on-failure environment: diff --git a/pkg/postgres/client.go b/pkg/postgres/client.go new file mode 100644 index 000000000..5d95a05b0 --- /dev/null +++ b/pkg/postgres/client.go @@ -0,0 +1,138 @@ +// Copyright (c) Abstract Machines +// SPDX-License-Identifier: Apache-2.0 + +package postgres + +import ( + "context" + "strings" + "time" + + "github.com/absmach/supermq/pkg/errors" + "github.com/jackc/pgx/v5/pgxpool" + "github.com/jackc/pgx/v5/stdlib" + "github.com/jmoiron/sqlx" + migrate "github.com/rubenv/sql-migrate" +) + +var ( + errMigration = errors.New("failed to apply migrations") + errInvalidConnectionString = errors.New("invalid connection string") +) + +type PoolConfig struct { + // MaxConnLifetime is the duration since creation after which a connection will be automatically closed. + MaxConnLifetime time.Duration `env:"MAX_CONN_LIFETIME" envDefault:"1h"` + + // pool_max_conn_lifetime_jitter + MaxConnLifetimeJitter time.Duration `env:"MAX_CONN_LIFETIME_JITTER" envDefault:"0"` + + // MaxConnIdleTime is the duration after which an idle connection will be automatically closed by the health check. + MaxConnIdleTime time.Duration `env:"MAX_CONN_IDLE_TIME" envDefault:"15m"` + + // MaxConnLifetime is the duration since creation after which a connection will be automatically closed. + MaxConns uint16 `env:"MAX_CONNS" envDefault:"5"` + + // MinConns is the minimum size of the pool. After connection closes, the pool might dip below MinConns. A low + // number of MinConns might mean the pool is empty after MaxConnLifetime until the health check has a chance + // to create new connections. + MinConns uint16 `env:"MIN_CONNS" envDefault:"1"` + + // MinIdleConns is the minimum number of idle connections in the pool. You can increase this to ensure that + // there are always idle connections available. This can help reduce tail latencies during request processing, + // as you can avoid the latency of establishing a new connection while handling requests. It is superior + // to MinConns for this purpose. + // Similar to MinConns, the pool might temporarily dip below MinIdleConns after connection closes. + MinIdleConns uint16 `env:"MIN_IDLE_CONNS" envDefault:"1"` + + // HealthCheckPeriod is the duration between checks of the health of idle connections. + HealthCheckPeriod time.Duration `env:"HEALTH_CHECK_PERIOD" envDefault:"1m"` +} + +// Config defines the options that are used when connecting to a TimescaleSQL instance. +type Config struct { + Host string `env:"HOST" envDefault:"localhost"` + Port string `env:"PORT" envDefault:"5432"` + User string `env:"USER" envDefault:"supermq"` + Pass string `env:"PASS" envDefault:"supermq"` + Name string `env:"NAME" envDefault:""` + SSLMode string `env:"SSL_MODE" envDefault:"disable"` + SSLCert string `env:"SSL_CERT" envDefault:""` + SSLKey string `env:"SSL_KEY" envDefault:""` + SSLRootCert string `env:"SSL_ROOT_CERT" envDefault:""` + Pool PoolConfig `envPrefix:"POOL_"` +} + +// Setup creates a connection to the Postgres instance and applies any +// unapplied database migrations. A non-nil error is returned to indicate +// failure. +func Setup(cfg Config, migrations migrate.MemoryMigrationSource) (*sqlx.DB, error) { + db, err := Connect(cfg) + if err != nil { + return nil, err + } + + if _, err = migrate.Exec(db.DB, "postgres", migrations, migrate.Up); err != nil { + return nil, errors.Wrap(errMigration, err) + } + + return db, nil +} + +// Connect creates a connection to the Postgres instance. +func Connect(cfg Config) (*sqlx.DB, error) { + pgxPoolConfig, err := pgxpool.ParseConfig(cfg.dbConnURL()) + if err != nil { + return nil, errors.Wrap(errInvalidConnectionString, err) + } + + pgxPoolConfig.MaxConnIdleTime = cfg.Pool.MaxConnIdleTime + pgxPoolConfig.MaxConnLifetimeJitter = cfg.Pool.MaxConnLifetimeJitter + pgxPoolConfig.MaxConnLifetime = cfg.Pool.MaxConnLifetime + pgxPoolConfig.MaxConns = int32(cfg.Pool.MaxConns) + pgxPoolConfig.MinConns = int32(cfg.Pool.MinConns) + pgxPoolConfig.MinIdleConns = int32(cfg.Pool.MinIdleConns) + pgxPoolConfig.HealthCheckPeriod = cfg.Pool.HealthCheckPeriod + + dbpool, err := pgxpool.NewWithConfig(context.Background(), pgxPoolConfig) + if err != nil { + return nil, err + } + + sqlDB := stdlib.OpenDBFromPool(dbpool) + + return sqlx.NewDb(sqlDB, "pgx"), nil +} + +func (cfg Config) dbConnURL() string { + urlParts := []string{} + + if cfg.Host != "" { + urlParts = append(urlParts, "host="+cfg.Host) + } + if cfg.Port != "" { + urlParts = append(urlParts, "port="+cfg.Port) + } + if cfg.User != "" { + urlParts = append(urlParts, "user="+cfg.User) + } + if cfg.Pass != "" { + urlParts = append(urlParts, "password="+cfg.Pass) + } + if cfg.Name != "" { + urlParts = append(urlParts, "dbname="+cfg.Name) + } + if cfg.SSLMode != "" { + urlParts = append(urlParts, "sslmode="+cfg.SSLMode) + } + if cfg.SSLCert != "" { + urlParts = append(urlParts, "sslcert="+cfg.SSLCert) + } + if cfg.SSLKey != "" { + urlParts = append(urlParts, "sslkey="+cfg.SSLKey) + } + if cfg.SSLRootCert != "" { + urlParts = append(urlParts, "sslrootcert="+cfg.SSLRootCert) + } + return strings.Join(urlParts, " ") +} diff --git a/pkg/sdk/sdk.go b/pkg/sdk/sdk.go index 2069df19c..5a0fa7fda 100644 --- a/pkg/sdk/sdk.go +++ b/pkg/sdk/sdk.go @@ -42,6 +42,8 @@ type MessagePageMetadata struct { PageMetadata Subtopic string `json:"subtopic,omitempty"` Publisher string `json:"publisher,omitempty"` + Limit int `json:"limit,omitempty"` + Name string `json:"name,omitempty"` Comparator string `json:"comparator,omitempty"` BoolValue *bool `json:"vb,omitempty"` StringValue string `json:"vs,omitempty"` diff --git a/readers/timescale/init.go b/readers/timescale/init.go deleted file mode 100644 index adf1d06fa..000000000 --- a/readers/timescale/init.go +++ /dev/null @@ -1,80 +0,0 @@ -// Copyright (c) Abstract Machines -// SPDX-License-Identifier: Apache-2.0 - -package timescale - -import ( - "fmt" - - "github.com/jmoiron/sqlx" - migrate "github.com/rubenv/sql-migrate" -) - -// Table for SenML messages. -const defTable = "messages" - -// Config defines the options that are used when connecting to a TimescaleSQL instance. -type Config struct { - Host string `env:"HOST" envDefault:"localhost"` - Port string `env:"PORT" envDefault:"5432"` - User string `env:"USER" envDefault:"supermq"` - Pass string `env:"PASS" envDefault:"supermq"` - Name string `env:"NAME" envDefault:""` - SSLMode string `env:"SSL_MODE" envDefault:"disable"` - SSLCert string `env:"SSL_CERT" envDefault:""` - SSLKey string `env:"SSL_KEY" envDefault:""` - SSLRootCert string `env:"SSL_ROOT_CERT" envDefault:""` -} - -// Connect creates a connection to the TimescaleSQL instance and applies any -// unapplied database migrations. A non-nil error is returned to indicate -// failure. -func Connect(cfg Config) (*sqlx.DB, error) { - url := fmt.Sprintf("host=%s port=%s user=%s dbname=%s password=%s sslmode=%s sslcert=%s sslkey=%s sslrootcert=%s", cfg.Host, cfg.Port, cfg.User, cfg.Name, cfg.Pass, cfg.SSLMode, cfg.SSLCert, cfg.SSLKey, cfg.SSLRootCert) - - db, err := sqlx.Open("pgx", url) - if err != nil { - return nil, err - } - - if err := migrateDB(db); err != nil { - return nil, err - } - - return db, nil -} - -func migrateDB(db *sqlx.DB) error { - migrations := &migrate.MemoryMigrationSource{ - Migrations: []*migrate.Migration{ - { - Id: "messages_1", - Up: []string{ - `CREATE TABLE IF NOT EXISTS messages ( - time BIGINT NOT NULL, - channel UUID, - subtopic VARCHAR(254), - publisher UUID, - protocol TEXT, - name VARCHAR(254), - unit TEXT, - value FLOAT, - string_value TEXT, - bool_value BOOL, - data_value BYTEA, - sum FLOAT, - update_time FLOAT, - PRIMARY KEY (time, publisher, subtopic, name) - ); - SELECT create_hypertable('messages', 'time', create_default_indexes => FALSE, chunk_time_interval => 86400000, if_not_exists => TRUE);`, - }, - Down: []string{ - "DROP TABLE messages", - }, - }, - }, - } - - _, err := migrate.Exec(db.DB, "postgres", migrations, migrate.Up) - return err -} diff --git a/readers/timescale/messages.go b/readers/timescale/messages.go index 94c96bdd5..e9a1198a8 100644 --- a/readers/timescale/messages.go +++ b/readers/timescale/messages.go @@ -6,6 +6,7 @@ package timescale import ( "encoding/json" "fmt" + "strings" "github.com/absmach/supermq/pkg/errors" "github.com/absmach/supermq/pkg/transformers/senml" @@ -15,6 +16,9 @@ import ( "github.com/jmoiron/sqlx" // required for DB access ) +// Table for SenML messages. +const defTable = "messages" + var _ readers.MessageRepository = (*timescaleRepository)(nil) type timescaleRepository struct { @@ -44,7 +48,24 @@ func (tr timescaleRepository) ReadAll(chanID string, rpm readers.PageMetadata) ( const timeDivisor = 1000000000 if rpm.Aggregation != "" { - q = fmt.Sprintf(`SELECT EXTRACT(epoch FROM time_bucket('%s', to_timestamp(time/%d))) *%d AS time, %s(value) AS value, FIRST(publisher, time) AS publisher, FIRST(protocol, time) AS protocol, FIRST(subtopic, time) AS subtopic, FIRST(name,time) AS name, FIRST(unit, time) AS unit FROM %s WHERE %s GROUP BY 1 ORDER BY time DESC LIMIT :limit OFFSET :offset;`, rpm.Interval, timeDivisor, timeDivisor, rpm.Aggregation, format, fmtCondition(rpm)) + q = fmt.Sprintf(` + SELECT + EXTRACT(epoch FROM time_bucket('%s', to_timestamp(time/%d))) *%d AS time, + %s(value) AS value, + FIRST(publisher, time) AS publisher, + FIRST(protocol, time) AS protocol, + FIRST(subtopic, time) AS subtopic, + FIRST(name,time) AS name, + FIRST(unit, time) AS unit + FROM + %s + WHERE + %s + GROUP BY 1 + ORDER BY time DESC + LIMIT :limit OFFSET :offset; + `, + rpm.Interval, timeDivisor, timeDivisor, rpm.Aggregation, format, fmtCondition(rpm)) totalQuery = fmt.Sprintf(`SELECT COUNT(*) FROM (SELECT EXTRACT(epoch FROM time_bucket('%s', to_timestamp(time/%d))) AS time, %s(value) AS value FROM %s WHERE %s GROUP BY 1) AS subquery;`, rpm.Interval, timeDivisor, rpm.Aggregation, format, fmtCondition(rpm)) } @@ -122,54 +143,73 @@ func (tr timescaleRepository) ReadAll(chanID string, rpm readers.PageMetadata) ( } func fmtCondition(rpm readers.PageMetadata) string { - condition := `channel = :channel` + // Indexed columns conditions based on indices order. + chCondition := " channel = :channel " var query map[string]interface{} meta, err := json.Marshal(rpm) if err != nil { - return condition + return chCondition } if err := json.Unmarshal(meta, &query); err != nil { - return condition + return chCondition + } + + conditions := []string{chCondition} + + if _, ok := query["subtopic"]; ok { + conditions = append(conditions, " subtopic = :subtopic ") + } + + if _, ok := query["publisher"]; ok { + conditions = append(conditions, " publisher = :publisher ") + } + + if _, ok := query["name"]; ok { + conditions = append(conditions, " name = :name ") + } + + if _, ok := query["from"]; ok { + conditions = append(conditions, " time >= :from ") + } + + if _, ok := query["to"]; ok { + conditions = append(conditions, " time < :to ") + } + + // Non Indexed columns conditions added after indexed columns conditions order. + if _, ok := query["protocol"]; ok { + conditions = append(conditions, " protocol = :protocol ") } for name := range query { switch name { - case - "subtopic", - "publisher", - "name", - "protocol": - condition = fmt.Sprintf(`%s AND %s = :%s`, condition, name, name) case "v": comparator := readers.ParseValueComparator(query) - condition = fmt.Sprintf(`%s AND value %s :value`, condition, comparator) + conditions = append(conditions, fmt.Sprintf(" value %s :value ", comparator)) case "vb": - condition = fmt.Sprintf(`%s AND bool_value = :bool_value`, condition) + conditions = append(conditions, "bool_value = :bool_value") case "vs": comparator := readers.ParseValueComparator(query) switch comparator { case "=": - condition = fmt.Sprintf("%s AND string_value = :string_value ", condition) + conditions = append(conditions, " string_value = :string_value ") case ">": - condition = fmt.Sprintf("%s AND string_value LIKE '%%' || :string_value || '%%' AND string_value <> :string_value", condition) + conditions = append(conditions, " string_value LIKE '%%' || :string_value || '%%' AND string_value <> :string_value ") case ">=": - condition = fmt.Sprintf("%s AND string_value LIKE '%%' || :string_value || '%%'", condition) + conditions = append(conditions, " string_value LIKE '%%' || :string_value || '%%' ") case "<=": - condition = fmt.Sprintf("%s AND :string_value LIKE '%%' || string_value || '%%'", condition) + conditions = append(conditions, " :string_value LIKE '%%' || string_value || '%%' ") case "<": - condition = fmt.Sprintf("%s AND :string_value LIKE '%%' || string_value || '%%' AND string_value <> :string_value", condition) + conditions = append(conditions, " :string_value LIKE '%%' || string_value || '%%' AND string_value <> :string_value ") } case "vd": comparator := readers.ParseValueComparator(query) - condition = fmt.Sprintf(`%s AND data_value %s :data_value`, condition, comparator) - case "from": - condition = fmt.Sprintf(`%s AND time >= :from`, condition) - case "to": - condition = fmt.Sprintf(`%s AND time < :to`, condition) + conditions = append(conditions, fmt.Sprintf(" data_value %s :data_value ", comparator)) } } - return condition + + return strings.Join(conditions, " AND ") } type senmlMessage struct { diff --git a/readers/timescale/setup_test.go b/readers/timescale/setup_test.go index b4d14da50..26f84f3e3 100644 --- a/readers/timescale/setup_test.go +++ b/readers/timescale/setup_test.go @@ -10,8 +10,10 @@ import ( "log" "os" "testing" + "time" - "github.com/absmach/magistrala/readers/timescale" + tsWriter "github.com/absmach/magistrala/consumers/writers/timescale" + pgclient "github.com/absmach/magistrala/pkg/postgres" _ "github.com/jackc/pgx/v5/stdlib" // required for SQL access "github.com/jmoiron/sqlx" "github.com/ory/dockertest/v3" @@ -28,7 +30,7 @@ func TestMain(m *testing.M) { container, err := pool.RunWithOptions(&dockertest.RunOptions{ Repository: "timescale/timescaledb", - Tag: "2.13.1-pg16", + Tag: "2.19.3-pg16-oss", Env: []string{ "POSTGRES_USER=test", "POSTGRES_PASSWORD=test", @@ -56,7 +58,7 @@ func TestMain(m *testing.M) { log.Fatalf("Could not connect to docker: %s", err) } - dbConfig := timescale.Config{ + dbConfig := pgclient.Config{ Host: "localhost", Port: port, User: "test", @@ -66,9 +68,18 @@ func TestMain(m *testing.M) { SSLCert: "", SSLKey: "", SSLRootCert: "", + Pool: pgclient.PoolConfig{ + MaxConnLifetime: 1 * time.Hour, + MaxConnLifetimeJitter: time.Duration(0), + MaxConnIdleTime: 15 * time.Minute, + MaxConns: 5, + MinConns: 1, + MinIdleConns: 1, + HealthCheckPeriod: 1 * time.Minute, + }, } - if db, err = timescale.Connect(dbConfig); err != nil { + if db, err = pgclient.Setup(dbConfig, *tsWriter.Migration()); err != nil { log.Fatalf("Could not setup test DB connection: %s", err) } diff --git a/scripts/gen-ts-data/.gitignore b/scripts/gen-ts-data/.gitignore new file mode 100644 index 000000000..7607ea1c5 --- /dev/null +++ b/scripts/gen-ts-data/.gitignore @@ -0,0 +1,4 @@ +# Copyright (c) Abstract Machines +# SPDX-License-Identifier: Apache-2.0 + +*.csv diff --git a/scripts/gen-ts-data/gen-messages.py b/scripts/gen-ts-data/gen-messages.py new file mode 100644 index 000000000..7781e7c24 --- /dev/null +++ b/scripts/gen-ts-data/gen-messages.py @@ -0,0 +1,70 @@ +## Copyright (c) Abstract Machines +## SPDX-License-Identifier: Apache-2.0 + +## Script used to generate random data to do testing the timescaledb "messages" table +## The script generate CSV file, which can be loaded into data base using the below command +### psql command to copy csv +### psql -h 127.0.0.1 -U supermq -d supermq -p 5433 -W -c "\COPY messages (time, channel, subtopic, publisher, protocol, name, unit, value) FROM 'scripts/data-gen/messages.csv' WITH (FORMAT csv, HEADER)" + +import random +import uuid +from datetime import datetime, timedelta +import os + +num_channels=100 +num_subtopic=1 ## subtopic per channel +num_publisher=3 ## publisher per subtopic +num_metrics=10 ## metrics per publisher +last_num_days=30 +data_interval_minutes=15 + +# Prepare rows (all unique channel, subtopic, publisher, name combinations) + + +print(f"{num_channels} channels") +print(f"{num_subtopic} subtopics (per channel)") +print(f"{num_publisher} publishers (per subtopic)") +print(f"{num_metrics} names (per publisher)") +rows = [] +for channel_idx in range(1, num_channels+1): # (num_channels) channels + channel_id = uuid.uuid4() + for subtopic_idx in range(1, num_subtopic+1): # (num_subtopic_per_channel) subtopics per channel + subtopic = f"subtopic_{subtopic_idx}" + for publisher_idx in range(1, num_publisher+1): # (num_publisher_per_subtopic) publishers per subtopic + publisher_id = uuid.uuid4() + for name_idx in range(1, num_metrics+1): # (num_metrics_per_publisher) names per publisher + name = f"metric_{name_idx}" + rows.append({ + "channel": channel_id, + "subtopic": subtopic, + "publisher": publisher_id, + "name": name, + }) + +print(f"Total unique metric: {len(rows)}") + +start_time = datetime.now() - timedelta(days=last_num_days) +interval = timedelta(minutes=data_interval_minutes) +num_intervals = int((last_num_days * 24 * 60) / data_interval_minutes) # number of data_interval_minutes in last_num_days + +print(f"generating data for last {last_num_days} days with data interval of {data_interval_minutes} minutes which is {num_intervals} timestamps") + +script_directory = os.path.dirname(os.path.abspath(__file__)) + +data_file_path=f"{script_directory}/messages.csv" + + +# Open CSV file for writing +with open(data_file_path, 'w') as f: + f.write("time,channel,subtopic,publisher,protocol,name,unit,value\n") + + for i in range(num_intervals): + timestamp = start_time + (i * interval) + timestamp_ns = int(timestamp.timestamp() * 1_000_000_000) # nanoseconds + for row in rows: + value = random.uniform(0, 100) # generate a float64 value between 0 and 100 + f.write(f"{timestamp_ns},{row['channel']},{row['subtopic']},{row['publisher']},mqtt,{row['name']},unit,{value}\n") + +print(f"Finished writing CSV at {data_file_path} with {len(rows) * num_intervals} rows = {len(rows)} unique metrics (channel + subtopic + publisher + metric) x {num_intervals} timestamps") + +