NOISSUE - Use NATS as MQTT broker (#2681)

Signed-off-by: Rodney Osodo <socials@rodneyosodo.com>
This commit is contained in:
b1ackd0t
2025-02-03 20:17:35 +03:00
committed by GitHub
parent 7c6bc9cd7e
commit cff6e7f085
23 changed files with 92 additions and 1651 deletions
+1 -1
View File
@@ -48,7 +48,7 @@ jobs:
- name: Install Go
uses: actions/setup-go@v5
with:
go-version: 1.22.x
go-version: 1.23.x
cache-dependency-path: "go.sum"
- name: Build images
+1 -1
View File
@@ -24,7 +24,7 @@ jobs:
- name: Install Go
uses: actions/setup-go@v5
with:
go-version: 1.22.x
go-version: 1.23.x
cache-dependency-path: "go.sum"
- name: Run tests
+1 -1
View File
@@ -21,7 +21,7 @@ jobs:
- name: Install Go
uses: actions/setup-go@v5
with:
go-version: 1.22.x
go-version: 1.23.x
cache-dependency-path: "go.sum"
- name: Check for changes in go.mod
+1 -1
View File
@@ -21,7 +21,7 @@ jobs:
- name: Check License Header
run: |
CHECK=""
for file in $(grep -rl --exclude-dir={.git,build,**vernemq**} \
for file in $(grep -rl --exclude-dir={.git,build} \
--exclude=\*.{crt,key,pem,zed,hcl,md,json,csv,mod,sum,tmpl,args} \
--exclude={CODEOWNERS,LICENSE,MAINTAINERS} \
.); do
+2 -2
View File
@@ -20,7 +20,7 @@ jobs:
- name: Setup Go
uses: actions/setup-go@v5
with:
go-version: 1.22.x
go-version: 1.23.x
cache-dependency-path: "go.sum"
- name: Install protolint
@@ -63,7 +63,7 @@ jobs:
- name: Setup Go
uses: actions/setup-go@v5
with:
go-version: 1.22.x
go-version: 1.23.x
cache-dependency-path: "go.sum"
- name: Check for changes in specific paths
+15 -52
View File
@@ -26,7 +26,6 @@ import (
smqlog "github.com/absmach/supermq/logger"
"github.com/absmach/supermq/mqtt"
"github.com/absmach/supermq/mqtt/events"
mqtttracing "github.com/absmach/supermq/mqtt/tracing"
"github.com/absmach/supermq/pkg/errors"
"github.com/absmach/supermq/pkg/grpcclient"
jaegerclient "github.com/absmach/supermq/pkg/jaeger"
@@ -34,7 +33,6 @@ import (
brokerstracing "github.com/absmach/supermq/pkg/messaging/brokers/tracing"
msgevents "github.com/absmach/supermq/pkg/messaging/events"
"github.com/absmach/supermq/pkg/messaging/handler"
mqttpub "github.com/absmach/supermq/pkg/messaging/mqtt"
"github.com/absmach/supermq/pkg/server"
"github.com/absmach/supermq/pkg/uuid"
"github.com/caarlos0/env/v11"
@@ -50,24 +48,21 @@ const (
)
type config struct {
LogLevel string `env:"SMQ_MQTT_ADAPTER_LOG_LEVEL" envDefault:"info"`
MQTTPort string `env:"SMQ_MQTT_ADAPTER_MQTT_PORT" envDefault:"1883"`
MQTTTargetHost string `env:"SMQ_MQTT_ADAPTER_MQTT_TARGET_HOST" envDefault:"localhost"`
MQTTTargetPort string `env:"SMQ_MQTT_ADAPTER_MQTT_TARGET_PORT" envDefault:"1883"`
MQTTForwarderTimeout time.Duration `env:"SMQ_MQTT_ADAPTER_FORWARDER_TIMEOUT" envDefault:"30s"`
MQTTTargetHealthCheck string `env:"SMQ_MQTT_ADAPTER_MQTT_TARGET_HEALTH_CHECK" envDefault:""`
MQTTQoS uint8 `env:"SMQ_MQTT_ADAPTER_MQTT_QOS" envDefault:"1"`
HTTPPort string `env:"SMQ_MQTT_ADAPTER_WS_PORT" envDefault:"8080"`
HTTPTargetHost string `env:"SMQ_MQTT_ADAPTER_WS_TARGET_HOST" envDefault:"localhost"`
HTTPTargetPort string `env:"SMQ_MQTT_ADAPTER_WS_TARGET_PORT" envDefault:"8080"`
HTTPTargetPath string `env:"SMQ_MQTT_ADAPTER_WS_TARGET_PATH" envDefault:"/mqtt"`
Instance string `env:"SMQ_MQTT_ADAPTER_INSTANCE" envDefault:""`
JaegerURL url.URL `env:"SMQ_JAEGER_URL" envDefault:"http://localhost:4318/v1/traces"`
BrokerURL string `env:"SMQ_MESSAGE_BROKER_URL" envDefault:"nats://localhost:4222"`
SendTelemetry bool `env:"SMQ_SEND_TELEMETRY" envDefault:"true"`
InstanceID string `env:"SMQ_MQTT_ADAPTER_INSTANCE_ID" envDefault:""`
ESURL string `env:"SMQ_ES_URL" envDefault:"nats://localhost:4222"`
TraceRatio float64 `env:"SMQ_JAEGER_TRACE_RATIO" envDefault:"1.0"`
LogLevel string `env:"SMQ_MQTT_ADAPTER_LOG_LEVEL" envDefault:"info"`
MQTTPort string `env:"SMQ_MQTT_ADAPTER_MQTT_PORT" envDefault:"1883"`
MQTTTargetHost string `env:"SMQ_MQTT_ADAPTER_MQTT_TARGET_HOST" envDefault:"localhost"`
MQTTTargetPort string `env:"SMQ_MQTT_ADAPTER_MQTT_TARGET_PORT" envDefault:"1883"`
MQTTTargetHealthCheck string `env:"SMQ_MQTT_ADAPTER_MQTT_TARGET_HEALTH_CHECK" envDefault:""`
HTTPPort string `env:"SMQ_MQTT_ADAPTER_WS_PORT" envDefault:"8080"`
HTTPTargetHost string `env:"SMQ_MQTT_ADAPTER_WS_TARGET_HOST" envDefault:"localhost"`
HTTPTargetPort string `env:"SMQ_MQTT_ADAPTER_WS_TARGET_PORT" envDefault:"8080"`
Instance string `env:"SMQ_MQTT_ADAPTER_INSTANCE" envDefault:""`
JaegerURL url.URL `env:"SMQ_JAEGER_URL" envDefault:"http://localhost:4318/v1/traces"`
BrokerURL string `env:"SMQ_MESSAGE_BROKER_URL" envDefault:"nats://localhost:4222"`
SendTelemetry bool `env:"SMQ_SEND_TELEMETRY" envDefault:"true"`
InstanceID string `env:"SMQ_MQTT_ADAPTER_INSTANCE_ID" envDefault:""`
ESURL string `env:"SMQ_ES_URL" envDefault:"nats://localhost:4222"`
TraceRatio float64 `env:"SMQ_JAEGER_TRACE_RATIO" envDefault:"1.0"`
}
func main() {
@@ -126,38 +121,6 @@ func main() {
}()
tracer := tp.Tracer(svcName)
bsub, err := brokers.NewPubSub(ctx, cfg.BrokerURL, logger)
if err != nil {
logger.Error(fmt.Sprintf("failed to connect to message broker: %s", err))
exitCode = 1
return
}
defer bsub.Close()
bsub = brokerstracing.NewPubSub(serverConfig, tracer, bsub)
mpub, err := mqttpub.NewPublisher(fmt.Sprintf("mqtt://%s:%s", cfg.MQTTTargetHost, cfg.MQTTTargetPort), cfg.MQTTQoS, cfg.MQTTForwarderTimeout)
if err != nil {
logger.Error(fmt.Sprintf("failed to create MQTT publisher: %s", err))
exitCode = 1
return
}
defer mpub.Close()
mpub, err = msgevents.NewPublisherMiddleware(ctx, mpub, cfg.ESURL)
if err != nil {
logger.Error(fmt.Sprintf("failed to create event store middleware: %s", err))
exitCode = 1
return
}
fwd := mqtt.NewForwarder(brokers.SubjectAllChannels, logger)
fwd = mqtttracing.New(serverConfig, tracer, fwd, brokers.SubjectAllChannels)
if err := fwd.Forward(ctx, svcName, bsub, mpub); err != nil {
logger.Error(fmt.Sprintf("failed to forward message broker messages: %s", err))
exitCode = 1
return
}
np, err := brokers.NewPublisher(ctx, cfg.BrokerURL)
if err != nil {
logger.Error(fmt.Sprintf("failed to connect to message broker: %s", err))
+5 -16
View File
@@ -20,7 +20,6 @@ SMQ_NATS_URL=nats://nats:${SMQ_NATS_PORT}
# Configs for nats as MQTT broker
SMQ_NATS_HEALTH_CHECK=http://nats:${SMQ_NATS_HTTP_PORT}/healthz
SMQ_NATS_WS_TARGET_PATH=
SMQ_NATS_MQTT_QOS=1
## RabbitMQ
SMQ_RABBITMQ_PORT=5672
@@ -35,23 +34,14 @@ SMQ_RABBITMQ_URL=amqp://${SMQ_RABBITMQ_USER}:${SMQ_RABBITMQ_PASS}@rabbitmq:${SMQ
SMQ_MESSAGE_BROKER_TYPE=nats
SMQ_MESSAGE_BROKER_URL=${SMQ_NATS_URL}
## VERNEMQ
SMQ_DOCKER_VERNEMQ_ALLOW_ANONYMOUS=on
SMQ_DOCKER_VERNEMQ_LOG__CONSOLE__LEVEL=error
SMQ_VERNEMQ_HEALTH_CHECK=http://vernemq:8888/health
SMQ_VERNEMQ_WS_TARGET_PATH=/mqtt
SMQ_VERNEMQ_MQTT_QOS=2
## MQTT Broker
SMQ_MQTT_BROKER_TYPE=vernemq
SMQ_MQTT_BROKER_HEALTH_CHECK=${SMQ_VERNEMQ_HEALTH_CHECK}
SMQ_MQTT_ADAPTER_MQTT_QOS=${SMQ_VERNEMQ_MQTT_QOS}
SMQ_MQTT_BROKER_TYPE=nats
SMQ_MQTT_BROKER_HEALTH_CHECK=${SMQ_NATS_HEALTH_CHECK}
SMQ_MQTT_ADAPTER_MQTT_TARGET_HOST=${SMQ_MQTT_BROKER_TYPE}
SMQ_MQTT_ADAPTER_MQTT_TARGET_PORT=1883
SMQ_MQTT_ADAPTER_MQTT_TARGET_HEALTH_CHECK=${SMQ_MQTT_BROKER_HEALTH_CHECK}
SMQ_MQTT_ADAPTER_WS_TARGET_HOST=${SMQ_MQTT_BROKER_TYPE}
SMQ_MQTT_ADAPTER_WS_TARGET_PORT=8080
SMQ_MQTT_ADAPTER_WS_TARGET_PATH=${SMQ_VERNEMQ_WS_TARGET_PATH}
## Redis
SMQ_REDIS_TCP_PORT=6379
@@ -70,7 +60,7 @@ SMQ_JAEGER_TRACE_RATIO=1.0
SMQ_JAEGER_MEMORY_MAX_TRACES=5000
## Call home
SMQ_SEND_TELEMETRY=true
SMQ_SEND_TELEMETRY=false
## Postgres
SMQ_POSTGRES_MAX_CONNECTIONS=100
@@ -310,7 +300,7 @@ SMQ_CLIENTS_INSTANCE_ID=
#### Clients Client Config
SMQ_CLIENTS_URL=http://clients:9006
SMQ_CLIENTS_AUTH_GRPC_URL=clients:7006
SMQ_CLIENTS_AUTH_GRPC_TIMEOUT=1s
SMQ_CLIENTS_AUTH_GRPC_TIMEOUT=300s
SMQ_CLIENTS_AUTH_GRPC_CLIENT_CERT=${GRPC_MTLS:+./ssl/certs/clients-grpc-client.crt}
SMQ_CLIENTS_AUTH_GRPC_CLIENT_KEY=${GRPC_MTLS:+./ssl/certs/clients-grpc-client.key}
SMQ_CLIENTS_AUTH_GRPC_CLIENT_CA_CERTS=${GRPC_MTLS:+./ssl/certs/ca.crt}
@@ -338,7 +328,7 @@ SMQ_CHANNELS_INSTANCE_ID=
#### Channels Client Config
SMQ_CHANNELS_URL=http://channels:9005
SMQ_CHANNELS_GRPC_URL=channels:7005
SMQ_CHANNELS_GRPC_TIMEOUT=1s
SMQ_CHANNELS_GRPC_TIMEOUT=300s
SMQ_CHANNELS_GRPC_CLIENT_CERT=${GRPC_MTLS:+./ssl/certs/channels-grpc-client.crt}
SMQ_CHANNELS_GRPC_CLIENT_KEY=${GRPC_MTLS:+./ssl/certs/channels-grpc-client.key}
SMQ_CHANNELS_GRPC_CLIENT_CA_CERTS=${GRPC_MTLS:+./ssl/certs/ca.crt}
@@ -354,7 +344,6 @@ SMQ_HTTP_ADAPTER_INSTANCE_ID=
### MQTT
SMQ_MQTT_ADAPTER_LOG_LEVEL=debug
SMQ_MQTT_ADAPTER_MQTT_PORT=1883
SMQ_MQTT_ADAPTER_FORWARDER_TIMEOUT=30s
SMQ_MQTT_ADAPTER_WS_PORT=8080
SMQ_MQTT_ADAPTER_INSTANCE=
SMQ_MQTT_ADAPTER_INSTANCE_ID=
+9 -27
View File
@@ -26,25 +26,21 @@ To pull docker images from a specific release you need to change the value of `S
SuperMQ supports configurable MQTT broker and Message broker, which also acts as an events store. SuperMQ uses two types of brokers:
1. MQTT_BROKER: Handles MQTT communication between MQTT adapters and message broker. This can either be 'VerneMQ' or 'NATS'.
1. MQTT_BROKER: Handles MQTT communication between MQTT adapters and message broker. This is NATS.
2. MESSAGE_BROKER: Manages message exchange between SuperMQ core, optional, and external services. This can either be 'NATS' or 'RabbitMQ'. This is used to store messages for distributed processing.
Events store: This is used by SuperMQ services to store events for distributed processing. SuperMQ uses a single service to be the message broker and events store. This can either be 'NATS' or 'RabbitMQ'. Redis can also be used as an events store, but it requires a message broker to be deployed along with it for message exchange.
This is the same as MESSAGE_BROKER. This can either be 'NATS' or 'RabbitMQ' or 'Redis'. If Redis is used as an events store, then RabbitMQ or NATS is used as a message broker.
This is the same as MESSAGE_BROKER. This can either be 'NATS' or 'RabbitMQ' or 'Redis'. If Redis is used as an events store, then RabbitMQ or NATS is used as a message broker.
The current deployment strategy for SuperMQ in `docker/docker-compose.yml` is to use VerneMQ as a MQTT_BROKER and NATS as a MESSAGE_BROKER and EVENTS_STORE.
The current deployment strategy for SuperMQ in `docker/docker-compose.yml` is to use NATS as a MQTT_BROKER, MESSAGE_BROKER and EVENTS_STORE.
Therefore, the following combinations are possible:
- MQTT_BROKER: VerneMQ, MESSAGE_BROKER: NATS, EVENTS_STORE: NATS
- MQTT_BROKER: VerneMQ, MESSAGE_BROKER: NATS, EVENTS_STORE: Redis
- MQTT_BROKER: VerneMQ, MESSAGE_BROKER: RabbitMQ, EVENTS_STORE: RabbitMQ
- MQTT_BROKER: VerneMQ, MESSAGE_BROKER: RabbitMQ, EVENTS_STORE: Redis
- MQTT_BROKER: NATS, MESSAGE_BROKER: RabbitMQ, EVENTS_STORE: RabbitMQ
- MQTT_BROKER: NATS, MESSAGE_BROKER: RabbitMQ, EVENTS_STORE: Redis
- MQTT_BROKER: NATS, MESSAGE_BROKER: NATS, EVENTS_STORE: NATS
- MQTT_BROKER: NATS, MESSAGE_BROKER: NATS, EVENTS_STORE: Redis
- MESSAGE_BROKER: RabbitMQ, EVENTS_STORE: RabbitMQ
- MESSAGE_BROKER: RabbitMQ, EVENTS_STORE: Redis
- MESSAGE_BROKER: NATS, EVENTS_STORE: NATS
- MESSAGE_BROKER: NATS, EVENTS_STORE: Redis
For Message brokers other than NATS, you would need to build the docker images with RabbitMQ as the build tag and change the `docker/.env`. For example, to use RabbitMQ as a message broker:
@@ -70,20 +66,6 @@ SMQ_ES_TYPE=redis
SMQ_ES_URL=${SMQ_REDIS_URL}
```
For MQTT broker other than VerneMQ, you would need to change the `docker/.env`. For example, to use NATS as a MQTT broker:
```env
SMQ_MQTT_BROKER_TYPE=nats
SMQ_MQTT_BROKER_HEALTH_CHECK=${SMQ_NATS_HEALTH_CHECK}
SMQ_MQTT_ADAPTER_MQTT_QOS=${SMQ_NATS_MQTT_QOS}
SMQ_MQTT_ADAPTER_MQTT_TARGET_HOST=${SMQ_MQTT_BROKER_TYPE}
SMQ_MQTT_ADAPTER_MQTT_TARGET_PORT=1883
SMQ_MQTT_ADAPTER_MQTT_TARGET_HEALTH_CHECK=${SMQ_MQTT_BROKER_HEALTH_CHECK}
SMQ_MQTT_ADAPTER_WS_TARGET_HOST=${SMQ_MQTT_BROKER_TYPE}
SMQ_MQTT_ADAPTER_WS_TARGET_PORT=8080
SMQ_MQTT_ADAPTER_WS_TARGET_PATH=${SMQ_NATS_WS_TARGET_PATH}
```
### RabbitMQ configuration
```yaml
@@ -125,9 +107,9 @@ By using environment variables file at `docker/.env` you can modify the below gi
`SMQ_NGINX_SERVER_NAME` environmental variable is used to configure nginx directive `server_name`. If environmental variable `SMQ_NGINX_SERVER_NAME` is empty then default value `localhost` will set to `server_name`.
`SMQ_NGINX_SERVER_CERT` environmental variable is used to configure nginx directive `ssl_certificate`. If environmental variable `SMQ_NGINX_SERVER_CERT` is empty then by default server certificate in the path `docker/ssl/certs/supermq-server.crt` will be assigned.
`SMQ_NGINX_SERVER_CERT` environmental variable is used to configure nginx directive `ssl_certificate`. If environmental variable `SMQ_NGINX_SERVER_CERT` is empty then by default server certificate in the path `docker/ssl/certs/supermq-server.crt` will be assigned.
`SMQ_NGINX_SERVER_KEY` environmental variable is used to configure nginx directive `ssl_certificate_key`. If environmental variable `SMQ_NGINX_SERVER_KEY` is empty then by default server certificate key in the path `docker/ssl/certs/supermq-server.key` will be assigned.
`SMQ_NGINX_SERVER_KEY` environmental variable is used to configure nginx directive `ssl_certificate_key`. If environmental variable `SMQ_NGINX_SERVER_KEY` is empty then by default server certificate key in the path `docker/ssl/certs/supermq-server.key` will be assigned.
`SMQ_NGINX_SERVER_CLIENT_CA` environmental variable is used to configure nginx directive `ssl_client_certificate`. If environmental variable `SMQ_NGINX_SERVER_CLIENT_CA` is empty then by default certificate in the path `docker/ssl/certs/ca.crt` will be assigned.
-17
View File
@@ -14,7 +14,6 @@ volumes:
supermq-channels-db-volume:
supermq-clients-redis-volume:
supermq-broker-volume:
supermq-mqtt-broker-volume:
supermq-spicedb-db-volume:
supermq-auth-db-volume:
supermq-pat-db-volume:
@@ -964,7 +963,6 @@ services:
container_name: supermq-mqtt
depends_on:
- clients
- vernemq
- nats
restart: on-failure
environment:
@@ -972,14 +970,11 @@ services:
SMQ_MQTT_ADAPTER_MQTT_PORT: ${SMQ_MQTT_ADAPTER_MQTT_PORT}
SMQ_MQTT_ADAPTER_MQTT_TARGET_HOST: ${SMQ_MQTT_ADAPTER_MQTT_TARGET_HOST}
SMQ_MQTT_ADAPTER_MQTT_TARGET_PORT: ${SMQ_MQTT_ADAPTER_MQTT_TARGET_PORT}
SMQ_MQTT_ADAPTER_FORWARDER_TIMEOUT: ${SMQ_MQTT_ADAPTER_FORWARDER_TIMEOUT}
SMQ_MQTT_ADAPTER_MQTT_TARGET_HEALTH_CHECK: ${SMQ_MQTT_ADAPTER_MQTT_TARGET_HEALTH_CHECK}
SMQ_MQTT_ADAPTER_MQTT_QOS: ${SMQ_MQTT_ADAPTER_MQTT_QOS}
SMQ_MQTT_ADAPTER_WS_PORT: ${SMQ_MQTT_ADAPTER_WS_PORT}
SMQ_MQTT_ADAPTER_INSTANCE_ID: ${SMQ_MQTT_ADAPTER_INSTANCE_ID}
SMQ_MQTT_ADAPTER_WS_TARGET_HOST: ${SMQ_MQTT_ADAPTER_WS_TARGET_HOST}
SMQ_MQTT_ADAPTER_WS_TARGET_PORT: ${SMQ_MQTT_ADAPTER_WS_TARGET_PORT}
SMQ_MQTT_ADAPTER_WS_TARGET_PATH: ${SMQ_MQTT_ADAPTER_WS_TARGET_PATH}
SMQ_MQTT_ADAPTER_INSTANCE: ${SMQ_MQTT_ADAPTER_INSTANCE}
SMQ_ES_URL: ${SMQ_ES_URL}
SMQ_CLIENTS_AUTH_GRPC_URL: ${SMQ_CLIENTS_AUTH_GRPC_URL}
@@ -1285,18 +1280,6 @@ services:
bind:
create_host_path: true
vernemq:
image: supermq/vernemq:${SMQ_RELEASE_TAG}
container_name: supermq-vernemq
restart: on-failure
environment:
DOCKER_VERNEMQ_ALLOW_ANONYMOUS: ${SMQ_DOCKER_VERNEMQ_ALLOW_ANONYMOUS}
DOCKER_VERNEMQ_LOG__CONSOLE__LEVEL: ${SMQ_DOCKER_VERNEMQ_LOG__CONSOLE__LEVEL}
networks:
- supermq-base-net
volumes:
- supermq-mqtt-broker-volume:/var/lib/vernemq
nats:
image: nats:2.10.9-alpine
container_name: supermq-nats
-56
View File
@@ -1,56 +0,0 @@
# Copyright (c) Abstract Machines
# SPDX-License-Identifier: Apache-2.0
# Builder
FROM erlang:25.3.2.8-alpine AS builder
RUN apk add --update git build-base bsd-compat-headers openssl-dev snappy-dev curl \
&& git clone -b 1.13.0 https://github.com/vernemq/vernemq \
&& cd vernemq \
&& make -j 16 rel
# Executor
FROM alpine:3.19
COPY --from=builder /vernemq/_build/default/rel /
RUN apk --no-cache --update --available upgrade && \
apk add --no-cache ncurses-libs openssl libstdc++ jq curl bash snappy-dev && \
addgroup --gid 10000 vernemq && \
adduser --uid 10000 -H -D -G vernemq -h /vernemq vernemq && \
install -d -o vernemq -g vernemq /vernemq
# Defaults
ENV DOCKER_VERNEMQ_KUBERNETES_LABEL_SELECTOR="app=vernemq" \
DOCKER_VERNEMQ_LOG__CONSOLE=console \
PATH="/vernemq/bin:$PATH" \
VERNEMQ_VERSION="1.13.0"
WORKDIR /vernemq
COPY --chown=10000:10000 bin/vernemq.sh /usr/sbin/start_vernemq
COPY --chown=10000:10000 files/vm.args /vernemq/etc/vm.args
RUN chown -R 10000:10000 /vernemq && \
ln -s /vernemq/etc /etc/vernemq && \
ln -s /vernemq/data /var/lib/vernemq && \
ln -s /vernemq/log /var/log/vernemq
# Ports
# 1883 MQTT
# 8883 MQTT/SSL
# 8080 MQTT WebSockets
# 44053 VerneMQ Message Distribution
# 4369 EPMD - Erlang Port Mapper Daemon
# 8888 Health, API, Prometheus Metrics
# 9100 9101 9102 9103 9104 9105 9106 9107 9108 9109 Specific Distributed Erlang Port Range
EXPOSE 1883 8883 8080 44053 4369 8888 \
9100 9101 9102 9103 9104 9105 9106 9107 9108 9109
VOLUME ["/vernemq/log", "/vernemq/data", "/vernemq/etc"]
HEALTHCHECK CMD vernemq ping | grep -q pong
USER vernemq
CMD ["start_vernemq"]
-352
View File
@@ -1,352 +0,0 @@
#!/usr/bin/env sh
NET_INTERFACE=$(route | grep '^default' | grep -o '[^ ]*$')
NET_INTERFACE=${DOCKER_NET_INTERFACE:-${NET_INTERFACE}}
IP_ADDRESS=$(ip -4 addr show ${NET_INTERFACE} | grep -oE '[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}' | sed -e "s/^[[:space:]]*//" | head -n 1)
IP_ADDRESS=${DOCKER_IP_ADDRESS:-${IP_ADDRESS}}
VERNEMQ_ETC_DIR="/vernemq/etc"
VERNEMQ_VM_ARGS_FILE="${VERNEMQ_ETC_DIR}/vm.args"
VERNEMQ_CONF_FILE="${VERNEMQ_ETC_DIR}/vernemq.conf"
VERNEMQ_CONF_LOCAL_FILE="${VERNEMQ_ETC_DIR}/vernemq.conf.local"
SECRETS_KUBERNETES_DIR="/var/run/secrets/kubernetes.io/serviceaccount"
# Function to check istio readiness
istio_health() {
cmd=$(curl -s http://localhost:15021/healthz/ready > /dev/null)
status=$?
return $status
}
# Ensure we have all files and needed directory write permissions
if [ ! -d ${VERNEMQ_ETC_DIR} ]; then
echo "Configuration directory at ${VERNEMQ_ETC_DIR} does not exist, exiting" >&2
exit 1
fi
if [ ! -f ${VERNEMQ_VM_ARGS_FILE} ]; then
echo "ls -l ${VERNEMQ_ETC_DIR}"
ls -l ${VERNEMQ_ETC_DIR}
echo "###" >&2
echo "### Configuration file ${VERNEMQ_VM_ARGS_FILE} does not exist, exiting" >&2
echo "###" >&2
exit 1
fi
if [ ! -w ${VERNEMQ_VM_ARGS_FILE} ]; then
echo "# whoami"
whoami
echo "# ls -l ${VERNEMQ_ETC_DIR}"
ls -l ${VERNEMQ_ETC_DIR}
echo "###" >&2
echo "### Configuration file ${VERNEMQ_VM_ARGS_FILE} exists, but there are no write permissions! Exiting." >&2
echo "###" >&2
exit 1
fi
if [ ! -s ${VERNEMQ_VM_ARGS_FILE} ]; then
echo "ls -l ${VERNEMQ_ETC_DIR}"
ls -l ${VERNEMQ_ETC_DIR}
echo "###" >&2
echo "### Configuration file ${VERNEMQ_VM_ARGS_FILE} is empty! This will not work." >&2
echo "### Exiting now." >&2
echo "###" >&2
exit 1
fi
# Ensure the Erlang node name is set correctly
if env | grep "DOCKER_VERNEMQ_NODENAME" -q; then
sed -i.bak -r "s/-name VerneMQ@.+/-name VerneMQ@${DOCKER_VERNEMQ_NODENAME}/" ${VERNEMQ_VM_ARGS_FILE}
else
if [ -n "$DOCKER_VERNEMQ_SWARM" ]; then
NODENAME=$(hostname -i)
sed -i.bak -r "s/VerneMQ@.+/VerneMQ@${NODENAME}/" ${VERNEMQ_VM_ARGS_FILE}
else
sed -i.bak -r "s/-name VerneMQ@.+/-name VerneMQ@${IP_ADDRESS}/" ${VERNEMQ_VM_ARGS_FILE}
fi
fi
if env | grep "DOCKER_VERNEMQ_DISCOVERY_NODE" -q; then
discovery_node=$DOCKER_VERNEMQ_DISCOVERY_NODE
if [ -n "$DOCKER_VERNEMQ_SWARM" ]; then
tmp=''
while [[ -z "$tmp" ]]; do
tmp=$(getent hosts tasks.$discovery_node | awk '{print $1}' | head -n 1)
sleep 1
done
discovery_node=$tmp
fi
if [ -n "$DOCKER_VERNEMQ_COMPOSE" ]; then
tmp=''
while [[ -z "$tmp" ]]; do
tmp=$(getent hosts $discovery_node | awk '{print $1}' | head -n 1)
sleep 1
done
discovery_node=$tmp
fi
sed -i.bak -r "/-eval.+/d" ${VERNEMQ_VM_ARGS_FILE}
echo "-eval \"vmq_server_cmd:node_join('VerneMQ@$discovery_node')\"" >> ${VERNEMQ_VM_ARGS_FILE}
fi
# If you encounter "SSL certification error (subject name does not match the host name)", you may try to set DOCKER_VERNEMQ_KUBERNETES_INSECURE to "1".
insecure=""
if env | grep "DOCKER_VERNEMQ_KUBERNETES_INSECURE" -q; then
echo "Using curl with \"--insecure\" argument to access kubernetes API without matching SSL certificate"
insecure="--insecure"
fi
if env | grep "DOCKER_VERNEMQ_KUBERNETES_ISTIO_ENABLED" -q; then
istio_health
while [ $status != 0 ]; do
istio_health
sleep 1
done
echo "Istio ready"
fi
# Function to call a HTTP GET request on the given URL Path, using the hostname
# of the current k8s cluster name. Usage: "k8sCurlGet /my/path"
function k8sCurlGet () {
local urlPath=$1
local hostname="kubernetes.default.svc.${DOCKER_VERNEMQ_KUBERNETES_CLUSTER_NAME}"
local certsFile="${SECRETS_KUBERNETES_DIR}/ca.crt"
local token=$(cat ${SECRETS_KUBERNETES_DIR}/token)
local header="Authorization: Bearer ${token}"
local url="https://${hostname}/${urlPath}"
curl -sS ${insecure} --cacert ${certsFile} -H "${header}" ${url} \
|| ( echo "### Error on accessing URL ${url}" )
}
DOCKER_VERNEMQ_KUBERNETES_CLUSTER_NAME=${DOCKER_VERNEMQ_KUBERNETES_CLUSTER_NAME:-cluster.local}
if [ -d "${SECRETS_KUBERNETES_DIR}" ] ; then
# Let's get the namespace if it isn't set
DOCKER_VERNEMQ_KUBERNETES_NAMESPACE=${DOCKER_VERNEMQ_KUBERNETES_NAMESPACE:-$(cat "${SECRETS_KUBERNETES_DIR}/namespace")}
# Check the API access that will be needed in the TERM signal handler
podResponse=$(k8sCurlGet api/v1/namespaces/${DOCKER_VERNEMQ_KUBERNETES_NAMESPACE}/pods/$(hostname) )
statefulSetName=$(echo ${podResponse} | jq -r '.metadata.ownerReferences[0].name')
statefulSetPath="apis/apps/v1/namespaces/${DOCKER_VERNEMQ_KUBERNETES_NAMESPACE}/statefulsets/${statefulSetName}"
statefulSetResponse=$(k8sCurlGet ${statefulSetPath} )
isCodeForbidden=$(echo ${statefulSetResponse} | jq '.code == 403')
if [[ ${isCodeForbidden} == "true" ]]; then
echo "Permission error: Cannot access URL ${statefulSetPath}: $(echo ${statefulSetResponse} | jq '.reason,.code,.message')"
exit 1
else
numReplicas=$(echo ${statefulSetResponse} | jq '.status.replicas')
echo "Permissions ok: Our pod $(hostname) belongs to StatefulSet ${statefulSetName} with ${numReplicas} replicas"
fi
fi
# Set up kubernetes node discovery
start_join_cluster=0
if env | grep "DOCKER_VERNEMQ_DISCOVERY_KUBERNETES" -q; then
# Let's set our nodename correctly
# https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.19/#list-pod-v1-core
podList=$(k8sCurlGet "api/v1/namespaces/${DOCKER_VERNEMQ_KUBERNETES_NAMESPACE}/pods?labelSelector=${DOCKER_VERNEMQ_KUBERNETES_LABEL_SELECTOR}")
VERNEMQ_KUBERNETES_SUBDOMAIN=${DOCKER_VERNEMQ_KUBERNETES_SUBDOMAIN:-$(echo ${podList} | jq '.items[0].spec.subdomain' | tr '\n' '"' | sed 's/"//g')}
if [[ $VERNEMQ_KUBERNETES_SUBDOMAIN == "null" ]]; then
VERNEMQ_KUBERNETES_HOSTNAME=${MY_POD_NAME}.${DOCKER_VERNEMQ_KUBERNETES_NAMESPACE}.svc.${DOCKER_VERNEMQ_KUBERNETES_CLUSTER_NAME}
else
VERNEMQ_KUBERNETES_HOSTNAME=${MY_POD_NAME}.${VERNEMQ_KUBERNETES_SUBDOMAIN}.${DOCKER_VERNEMQ_KUBERNETES_NAMESPACE}.svc.${DOCKER_VERNEMQ_KUBERNETES_CLUSTER_NAME}
fi
sed -i.bak -r "s/VerneMQ@.+/VerneMQ@${VERNEMQ_KUBERNETES_HOSTNAME}/" ${VERNEMQ_VM_ARGS_FILE}
# Hack into K8S DNS resolution (temporarily)
kube_pod_names=$(echo ${podList} | jq '.items[].spec.hostname' | sed 's/"//g' | tr '\n' ' ' | sed 's/ *$//')
for kube_pod_name in $kube_pod_names; do
if [[ $kube_pod_name == "null" ]]; then
echo "Kubernetes discovery selected, but no pods found. Maybe we're the first?"
echo "Anyway, we won't attempt to join any cluster."
break
fi
if [[ $kube_pod_name != $MY_POD_NAME ]]; then
discoveryHostname="${kube_pod_name}.${VERNEMQ_KUBERNETES_SUBDOMAIN}.${DOCKER_VERNEMQ_KUBERNETES_NAMESPACE}.svc.${DOCKER_VERNEMQ_KUBERNETES_CLUSTER_NAME}"
start_join_cluster=1
echo "Will join an existing Kubernetes cluster with discovery node at ${discoveryHostname}"
echo "-eval \"vmq_server_cmd:node_join('VerneMQ@${discoveryHostname}')\"" >> ${VERNEMQ_VM_ARGS_FILE}
echo "Did I previously leave the cluster? If so, purging old state."
curl -fsSL http://${discoveryHostname}:8888/status.json >/dev/null 2>&1 ||
(echo "Can't download status.json, better to exit now" && exit 1)
curl -fsSL http://${discoveryHostname}:8888/status.json | grep -q ${VERNEMQ_KUBERNETES_HOSTNAME} ||
(echo "Cluster doesn't know about me, this means I've left previously. Purging old state..." && rm -rf /vernemq/data/*)
break
fi
done
fi
if [ -f "${VERNEMQ_CONF_LOCAL_FILE}" ]; then
cp "${VERNEMQ_CONF_LOCAL_FILE}" ${VERNEMQ_CONF_FILE}
sed -i -r "s/###IPADDRESS###/${IP_ADDRESS}/" ${VERNEMQ_CONF_FILE}
else
sed -i '/########## Start ##########/,/########## End ##########/d' ${VERNEMQ_CONF_FILE}
echo "########## Start ##########" >> ${VERNEMQ_CONF_FILE}
env | grep DOCKER_VERNEMQ | grep -v 'DISCOVERY_NODE\|KUBERNETES\|SWARM\|COMPOSE\|DOCKER_VERNEMQ_USER' | cut -c 16- | awk '{match($0,/^[A-Z0-9_]*/)}{print tolower(substr($0,RSTART,RLENGTH)) substr($0,RLENGTH+1)}' | sed 's/__/./g' >> ${VERNEMQ_CONF_FILE}
users_are_set=$(env | grep DOCKER_VERNEMQ_USER)
if [ ! -z "$users_are_set" ]; then
echo "vmq_passwd.password_file = /vernemq/etc/vmq.passwd" >> ${VERNEMQ_CONF_FILE}
touch /vernemq/etc/vmq.passwd
fi
for vernemq_user in $(env | grep DOCKER_VERNEMQ_USER); do
username=$(echo $vernemq_user | awk -F '=' '{ print $1 }' | sed 's/DOCKER_VERNEMQ_USER_//g' | tr '[:upper:]' '[:lower:]')
password=$(echo $vernemq_user | awk -F '=' '{ print $2 }')
/vernemq/bin/vmq-passwd /vernemq/etc/vmq.passwd $username <<EOF
$password
$password
EOF
done
if [ -z "$DOCKER_VERNEMQ_ERLANG__DISTRIBUTION__PORT_RANGE__MINIMUM" ]; then
echo "erlang.distribution.port_range.minimum = 9100" >> ${VERNEMQ_CONF_FILE}
fi
if [ -z "$DOCKER_VERNEMQ_ERLANG__DISTRIBUTION__PORT_RANGE__MAXIMUM" ]; then
echo "erlang.distribution.port_range.maximum = 9109" >> ${VERNEMQ_CONF_FILE}
fi
if [ -z "$DOCKER_VERNEMQ_LISTENER__TCP__DEFAULT" ]; then
echo "listener.tcp.default = ${IP_ADDRESS}:1883" >> ${VERNEMQ_CONF_FILE}
fi
if [ -z "$DOCKER_VERNEMQ_LISTENER__WS__DEFAULT" ]; then
echo "listener.ws.default = ${IP_ADDRESS}:8080" >> ${VERNEMQ_CONF_FILE}
fi
if [ -z "$DOCKER_VERNEMQ_LISTENER__VMQ__CLUSTERING" ]; then
echo "listener.vmq.clustering = ${IP_ADDRESS}:44053" >> ${VERNEMQ_CONF_FILE}
fi
if [ -z "$DOCKER_VERNEMQ_LISTENER__HTTP__METRICS" ]; then
echo "listener.http.metrics = ${IP_ADDRESS}:8888" >> ${VERNEMQ_CONF_FILE}
fi
echo "########## End ##########" >> ${VERNEMQ_CONF_FILE}
fi
if [ ! -z "$DOCKER_VERNEMQ_ERLANG__MAX_PORTS" ]; then
sed -i.bak -r "s/\+Q.+/\+Q ${DOCKER_VERNEMQ_ERLANG__MAX_PORTS}/" ${VERNEMQ_VM_ARGS_FILE}
fi
if [ ! -z "$DOCKER_VERNEMQ_ERLANG__PROCESS_LIMIT" ]; then
sed -i.bak -r "s/\+P.+/\+P ${DOCKER_VERNEMQ_ERLANG__PROCESS_LIMIT}/" ${VERNEMQ_VM_ARGS_FILE}
fi
if [ ! -z "$DOCKER_VERNEMQ_ERLANG__MAX_ETS_TABLES" ]; then
sed -i.bak -r "s/\+e.+/\+e ${DOCKER_VERNEMQ_ERLANG__MAX_ETS_TABLES}/" ${VERNEMQ_VM_ARGS_FILE}
fi
if [ ! -z "$DOCKER_VERNEMQ_ERLANG__DISTRIBUTION_BUFFER_SIZE" ]; then
sed -i.bak -r "s/\+zdbbl.+/\+zdbbl ${DOCKER_VERNEMQ_ERLANG__DISTRIBUTION_BUFFER_SIZE}/" ${VERNEMQ_VM_ARGS_FILE}
fi
# Check configuration file
/vernemq/bin/vernemq config generate 2>&1 > /dev/null | tee /tmp/config.out | grep error
if [ $? -ne 1 ]; then
echo "configuration error, exit"
echo "$(cat /tmp/config.out)"
exit $?
fi
pid=0
# SIGUSR1-handler
siguser1_handler() {
echo "stopped"
}
# SIGTERM-handler
sigterm_handler() {
if [ $pid -ne 0 ]; then
if [ -d "${SECRETS_KUBERNETES_DIR}" ] ; then
# this will stop the VerneMQ process, but first drain the node from all existing client sessions (-k)
if [ -n "$VERNEMQ_KUBERNETES_HOSTNAME" ]; then
terminating_node_name=VerneMQ@$VERNEMQ_KUBERNETES_HOSTNAME
else
terminating_node_name=VerneMQ@$IP_ADDRESS
fi
podList=$(k8sCurlGet "api/v1/namespaces/${DOCKER_VERNEMQ_KUBERNETES_NAMESPACE}/pods?labelSelector=${DOCKER_VERNEMQ_KUBERNETES_LABEL_SELECTOR}")
kube_pod_names=$(echo ${podList} | jq '.items[].spec.hostname' | sed 's/"//g' | tr '\n' ' ' | sed 's/ *$//')
if [ "$kube_pod_names" = "$MY_POD_NAME" ]; then
echo "I'm the only pod remaining. Not performing leave and/or state purge."
/vernemq/bin/vmq-admin node stop >/dev/null
else
# https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.19/#read-pod-v1-core
podResponse=$(k8sCurlGet api/v1/namespaces/${DOCKER_VERNEMQ_KUBERNETES_NAMESPACE}/pods/$(hostname) )
statefulSetName=$(echo ${podResponse} | jq -r '.metadata.ownerReferences[0].name')
# https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.19/#-strong-read-operations-statefulset-v1-apps-strong-
statefulSetResponse=$(k8sCurlGet "apis/apps/v1/namespaces/${DOCKER_VERNEMQ_KUBERNETES_NAMESPACE}/statefulsets/${statefulSetName}" )
isCodeForbidden=$(echo ${statefulSetResponse} | jq '.code == 403')
if [[ ${isCodeForbidden} == "true" ]]; then
echo "Permission error: Cannot access URL ${statefulSetPath}: $(echo ${statefulSetResponse} | jq '.reason,.code,.message')"
fi
reschedule=$(echo ${statefulSetResponse} | jq '.status.replicas == .status.readyReplicas')
scaled_down=$(echo ${statefulSetResponse} | jq '.status.currentReplicas == .status.updatedReplicas')
if [[ $reschedule == "true" ]]; then
# Perhaps is an scale down?
if [[ $scaled_down == "true" ]]; then
echo "Seems that this is a scale down scenario. Leaving cluster."
/vernemq/bin/vmq-admin cluster leave node=${terminating_node_name} -k && rm -rf /vernemq/data/*
else
echo "Reschedule is true. Not leaving the cluster."
/vernemq/bin/vmq-admin node stop >/dev/null
fi
else
echo "Reschedule is false. Leaving the cluster."
/vernemq/bin/vmq-admin cluster leave node=${terminating_node_name} -k && rm -rf /vernemq/data/*
fi
fi
else
if [ -n "$DOCKER_VERNEMQ_SWARM" ]; then
terminating_node_name=VerneMQ@$(hostname -i)
# For Swarm we keep the old "cluster leave" approach for now
echo "Swarm node is leaving the cluster."
/vernemq/bin/vmq-admin cluster leave node=${terminating_node_name} -k && rm -rf /vernemq/data/*
else
# In non-k8s mode: Stop the vernemq node gracefully
/vernemq/bin/vmq-admin node stop >/dev/null
fi
fi
kill -s TERM ${pid}
WAITFOR_PID=${pid}
pid=0
wait ${WAITFOR_PID}
fi
exit 143; # 128 + 15 -- SIGTERM
}
if [ ! -s ${VERNEMQ_VM_ARGS_FILE} ]; then
echo "ls -l ${VERNEMQ_ETC_DIR}"
ls -l ${VERNEMQ_ETC_DIR}
echo "###" >&2
echo "### Configuration file ${VERNEMQ_VM_ARGS_FILE} is empty! This will not work." >&2
echo "### Exiting now." >&2
echo "###" >&2
exit 1
fi
# Setup OS signal handlers
trap 'siguser1_handler' SIGUSR1
trap 'sigterm_handler' SIGTERM
# Start VerneMQ
/vernemq/bin/vernemq console -noshell -noinput $@ &
pid=$!
if [ $start_join_cluster -eq 1 ]; then
mkdir -p /var/log/vernemq/log
join_cluster > /var/log/vernemq/log/join_cluster.log &
fi
if [ -n "$API_KEY" ]; then
sleep 10 && echo "Adding API_KEY..." && /vernemq/bin/vmq-admin api-key add key="${API_KEY:-DEFAULT}"
vmq-admin api-key show
fi
wait $pid
-15
View File
@@ -1,15 +0,0 @@
+P 512000
+e 256000
-env ERL_CRASH_DUMP /erl_crash.dump
-env ERL_FULLSWEEP_AFTER 0
+Q 512000
+A 64
-setcookie vmq
-name VerneMQ@127.0.0.1
+K true
+W w
+sbwt none
+sbwtdcpu none
+sbwtdio none
-smp enable
+zdbbl 32768
+18 -18
View File
@@ -5,7 +5,7 @@ go 1.23.4
require (
github.com/0x6flab/namegenerator v1.4.0
github.com/absmach/callhome v0.14.0
github.com/absmach/certs v0.0.0-20241014135535-3f118b801054
github.com/absmach/certs v0.0.0-20250127084046-fb0da0712b2b
github.com/absmach/mgate v0.4.5
github.com/absmach/senml v1.0.6
github.com/authzed/authzed-go v1.3.0
@@ -13,7 +13,6 @@ require (
github.com/authzed/spicedb v1.40.0
github.com/caarlos0/env/v11 v11.3.1
github.com/cenkalti/backoff/v4 v4.3.0
github.com/eclipse/paho.mqtt.golang v1.5.0
github.com/fatih/color v1.18.0
github.com/go-chi/chi/v5 v5.2.0
github.com/go-kit/kit v0.13.0
@@ -52,7 +51,7 @@ require (
golang.org/x/crypto v0.32.0
golang.org/x/oauth2 v0.25.0
golang.org/x/sync v0.10.0
google.golang.org/genproto/googleapis/rpc v0.0.0-20250115164207-1a7da9e5054f
google.golang.org/genproto/googleapis/rpc v0.0.0-20250127172529-29210b9bc287
google.golang.org/grpc v1.70.0
google.golang.org/protobuf v1.36.4
gopkg.in/gomail.v2 v2.0.0-20160411212932-81ebce5c23df
@@ -65,7 +64,7 @@ require (
github.com/Azure/go-ansiterm v0.0.0-20230124172434-306776ec8161 // indirect
github.com/Microsoft/go-winio v0.6.2 // indirect
github.com/Nvveen/Gotty v0.0.0-20120604004816-cd527374f1e5 // indirect
github.com/antlr4-go/antlr/v4 v4.13.0 // indirect
github.com/antlr4-go/antlr/v4 v4.13.1 // indirect
github.com/authzed/cel-go v0.20.2 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/ccoveille/go-safecast v1.5.0 // indirect
@@ -80,6 +79,7 @@ require (
github.com/docker/go-connections v0.5.0 // indirect
github.com/docker/go-units v0.5.0 // indirect
github.com/dsnet/golib/memfile v1.0.0 // indirect
github.com/eclipse/paho.mqtt.golang v1.5.0 // indirect
github.com/emirpasic/gods v1.18.1 // indirect
github.com/envoyproxy/protoc-gen-validate v1.2.1 // indirect
github.com/felixge/httpsnoop v1.0.4 // indirect
@@ -91,7 +91,7 @@ require (
github.com/go-logfmt/logfmt v0.6.0 // indirect
github.com/go-logr/logr v1.4.2 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/goccy/go-json v0.10.3 // indirect
github.com/goccy/go-json v0.10.5 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510 // indirect
github.com/gopherjs/gopherjs v1.17.2 // indirect
@@ -102,9 +102,9 @@ require (
github.com/hashicorp/go-multierror v1.1.1 // indirect
github.com/hashicorp/go-retryablehttp v0.7.7 // indirect
github.com/hashicorp/go-rootcerts v1.0.2 // indirect
github.com/hashicorp/go-secure-stdlib/parseutil v0.1.8 // indirect
github.com/hashicorp/go-secure-stdlib/parseutil v0.1.9 // indirect
github.com/hashicorp/go-secure-stdlib/strutil v0.1.2 // indirect
github.com/hashicorp/go-sockaddr v1.0.6 // indirect
github.com/hashicorp/go-sockaddr v1.0.7 // indirect
github.com/hashicorp/hcl v1.0.0 // indirect
github.com/inconshreveable/mousetrap v1.1.0 // indirect
github.com/jackc/pgio v1.0.0 // indirect
@@ -114,13 +114,13 @@ require (
github.com/jackc/puddle/v2 v2.2.2 // indirect
github.com/jtolds/gls v4.20.0+incompatible // indirect
github.com/jzelinskie/stringz v0.0.3 // indirect
github.com/klauspost/compress v1.17.9 // indirect
github.com/klauspost/compress v1.17.11 // indirect
github.com/lestrrat-go/blackmagic v1.0.2 // indirect
github.com/lestrrat-go/httpcc v1.0.1 // indirect
github.com/lestrrat-go/httprc v1.0.6 // indirect
github.com/lestrrat-go/iter v1.0.2 // indirect
github.com/lestrrat-go/option v1.0.1 // indirect
github.com/mattn/go-colorable v0.1.13 // indirect
github.com/mattn/go-colorable v0.1.14 // indirect
github.com/mattn/go-isatty v0.0.20 // indirect
github.com/mitchellh/go-homedir v1.1.0 // indirect
github.com/moby/docker-image-spec v1.3.1 // indirect
@@ -131,22 +131,22 @@ require (
github.com/opencontainers/go-digest v1.0.0 // indirect
github.com/opencontainers/image-spec v1.1.0 // indirect
github.com/opencontainers/runc v1.1.14 // indirect
github.com/pion/dtls/v3 v3.0.2 // indirect
github.com/pion/logging v0.2.2 // indirect
github.com/pion/dtls/v3 v3.0.4 // indirect
github.com/pion/logging v0.2.3 // indirect
github.com/pion/transport/v3 v3.0.7 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/planetscale/vtprotobuf v0.6.1-0.20240917153116-6f2963f01587 // indirect
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
github.com/prometheus/client_model v0.6.1 // indirect
github.com/prometheus/common v0.61.0 // indirect
github.com/prometheus/common v0.62.0 // indirect
github.com/prometheus/procfs v0.15.1 // indirect
github.com/rs/zerolog v1.33.0 // indirect
github.com/ryanuber/go-glob v1.0.0 // indirect
github.com/samber/lo v1.49.0 // indirect
github.com/samber/lo v1.49.1 // indirect
github.com/segmentio/asm v1.2.0 // indirect
github.com/sirupsen/logrus v1.9.3 // indirect
github.com/smarty/assertions v1.15.0 // indirect
github.com/spf13/pflag v1.0.5 // indirect
github.com/spf13/pflag v1.0.6 // indirect
github.com/stoewer/go-strcase v1.3.0 // indirect
github.com/stretchr/objx v0.5.2 // indirect
github.com/x448/float16 v0.8.4 // indirect
@@ -157,14 +157,14 @@ require (
go.opentelemetry.io/otel/metric v1.34.0 // indirect
go.opentelemetry.io/proto/otlp v1.5.0 // indirect
go.uber.org/atomic v1.11.0 // indirect
golang.org/x/exp v0.0.0-20240909161429-701f63a606c0 // indirect
golang.org/x/exp v0.0.0-20250128182459-e0ece0dbea4c // indirect
golang.org/x/net v0.34.0 // indirect
golang.org/x/sys v0.29.0 // indirect
golang.org/x/text v0.21.0 // indirect
golang.org/x/time v0.8.0 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20250115164207-1a7da9e5054f // indirect
golang.org/x/time v0.9.0 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20250127172529-29210b9bc287 // indirect
gopkg.in/alexcesaro/quotedprintable.v3 v3.0.0-20150716171945-2caba252f4dc // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
k8s.io/utils v0.0.0-20240711033017-18e509b52bc8 // indirect
k8s.io/utils v0.0.0-20241210054802-24370beab758 // indirect
)
+34 -34
View File
@@ -19,14 +19,14 @@ github.com/VividCortex/gohistogram v1.0.0 h1:6+hBz+qvs0JOrrNhhmR7lFxo5sINxBCGXrd
github.com/VividCortex/gohistogram v1.0.0/go.mod h1:Pf5mBqqDxYaXu3hDrrU+w6nw50o/4+TcAqDqk/vUH7g=
github.com/absmach/callhome v0.14.0 h1:zB4tIZJ1YUmZ1VGHFPfMA/Lo6/Mv19y2dvoOiXj2BWs=
github.com/absmach/callhome v0.14.0/go.mod h1:l12UJOfibK4Muvg/AbupHuquNV9qSz/ROdTEPg7f2Vk=
github.com/absmach/certs v0.0.0-20241014135535-3f118b801054 h1:NsIwp+ueKxDx8XftruA4hz8WUgyWq7eBE344nJt0LJg=
github.com/absmach/certs v0.0.0-20241014135535-3f118b801054/go.mod h1:bEAb/HjPztlrMmz8dLeJTke4Tzu9yW3+hY5eldEUtSY=
github.com/absmach/certs v0.0.0-20250127084046-fb0da0712b2b h1:EGIqL1bARjRSS7kH98Q5O/g7lZN/Q0KtAVX5mxRcq84=
github.com/absmach/certs v0.0.0-20250127084046-fb0da0712b2b/go.mod h1:g6Kqge7RVxwt+LRxqt+09cqa2SgPAwXvIPoyPsEqZlQ=
github.com/absmach/mgate v0.4.5 h1:l6RmrEsR9jxkdb9WHUSecmT0HA41TkZZQVffFfUAIfI=
github.com/absmach/mgate v0.4.5/go.mod h1:IvRIHZexZPEIAPmmaJF0L5DY2ERjj+GxRGitOW4s6qo=
github.com/absmach/senml v1.0.6 h1:WPeIl6vQ00k7ghWSZYT/QP0KUxq2+4zQoaC7240pLFk=
github.com/absmach/senml v1.0.6/go.mod h1:QnJNPy1DJPy0+qUW21PTcH/xoh0LgfYZxTfwriMIvmQ=
github.com/antlr4-go/antlr/v4 v4.13.0 h1:lxCg3LAv+EUK6t1i0y1V6/SLeUi0eKEKdhQAlS8TVTI=
github.com/antlr4-go/antlr/v4 v4.13.0/go.mod h1:pfChB/xh/Unjila75QW7+VU4TSnWnnk9UTnmpPaOR2g=
github.com/antlr4-go/antlr/v4 v4.13.1 h1:SqQKkuVZ+zWkMMNkjy5FZe5mr5WURWnlpmOuzYWrPrQ=
github.com/antlr4-go/antlr/v4 v4.13.1/go.mod h1:GKmUxMtwp6ZgGwZSva4eWPC5mS6vUAmOABFgjdkM7Nw=
github.com/authzed/authzed-go v1.3.0 h1:jKIMpYDy+6WoOwl32HRURxLZxNGm+I7ObUlTntEPcXA=
github.com/authzed/authzed-go v1.3.0/go.mod h1:MYkXImtFAxrM/bVZvmC/WO+gZC9RLlvpCM51SLaUZb0=
github.com/authzed/cel-go v0.20.2 h1:GlmLecGry7Z8HU0k+hmaHHUV05ZHrsFxduXHtIePvck=
@@ -99,8 +99,6 @@ github.com/felixge/httpsnoop v1.0.4 h1:NFTV2Zj1bL4mc9sqWACXbQFVBBg2W3GPvqp8/ESS2
github.com/felixge/httpsnoop v1.0.4/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U=
github.com/fxamacker/cbor/v2 v2.7.0 h1:iM5WgngdRBanHcxugY4JySA0nk1wZorNOpTgCMedv5E=
github.com/fxamacker/cbor/v2 v2.7.0/go.mod h1:pxXPTn3joSm21Gbwsv0w9OSA2y1HFR9qXEeXQVeNoDQ=
github.com/go-chi/chi v1.5.5 h1:vOB/HbEMt9QqBqErz07QehcOKHaWFtuj87tTDVz2qXE=
github.com/go-chi/chi v1.5.5/go.mod h1:C9JqLr3tIYjDOZpzn+BCuxY8z8vmca43EeMgyZt7irw=
github.com/go-chi/chi/v5 v5.2.0 h1:Aj1EtB0qR2Rdo2dG4O94RIU35w2lvQSj6BRA4+qwFL0=
github.com/go-chi/chi/v5 v5.2.0/go.mod h1:DslCQbL2OYiznFReuXYUmQ2hGd1aDpCnlMNITLSKoi8=
github.com/go-errors/errors v1.5.1 h1:ZwEMSLRCapFLflTpT7NKaAc7ukJ8ZPEjzlxt8rPN8bk=
@@ -127,8 +125,8 @@ github.com/go-sql-driver/mysql v1.8.1/go.mod h1:wEBSXgmK//2ZFJyE+qWnIsVGmvmEKlqw
github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY=
github.com/go-test/deep v1.0.2 h1:onZX1rnHT3Wv6cqNgYyFOOlgVKJrksuCMCRvJStbMYw=
github.com/go-test/deep v1.0.2/go.mod h1:wGDj63lr65AM2AQyKZd/NYHGb0R+1RLqB8NKt3aSFNA=
github.com/goccy/go-json v0.10.3 h1:KZ5WoDbxAIgm2HNbYckL0se1fHD6rz5j4ywS6ebzDqA=
github.com/goccy/go-json v0.10.3/go.mod h1:oq7eo15ShAhp70Anwd5lgX2pLfOS3QCiwU/PULtXL6M=
github.com/goccy/go-json v0.10.5 h1:Fq85nIqj+gXn/S5ahsiTlK3TmC85qgirsdTP/+DeaC4=
github.com/goccy/go-json v0.10.5/go.mod h1:oq7eo15ShAhp70Anwd5lgX2pLfOS3QCiwU/PULtXL6M=
github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA=
github.com/gofrs/uuid v4.0.0+incompatible/go.mod h1:b2aQJv3Z4Fp6yNu3cdSllBxTCLRxnplIgP/c0N/04lM=
github.com/gofrs/uuid/v5 v5.3.0 h1:m0mUMr+oVYUdxpMLgSYCZiXe7PuVPnI94+OMeVBNedk=
@@ -173,12 +171,12 @@ github.com/hashicorp/go-retryablehttp v0.7.7 h1:C8hUCYzor8PIfXHa4UrZkU4VvK8o9ISH
github.com/hashicorp/go-retryablehttp v0.7.7/go.mod h1:pkQpWZeYWskR+D1tR2O5OcBFOxfA7DoAO6xtkuQnHTk=
github.com/hashicorp/go-rootcerts v1.0.2 h1:jzhAVGtqPKbwpyCPELlgNWhE1znq+qwJtW5Oi2viEzc=
github.com/hashicorp/go-rootcerts v1.0.2/go.mod h1:pqUvnprVnM5bf7AOirdbb01K4ccR319Vf4pU3K5EGc8=
github.com/hashicorp/go-secure-stdlib/parseutil v0.1.8 h1:iBt4Ew4XEGLfh6/bPk4rSYmuZJGizr6/x/AEizP0CQc=
github.com/hashicorp/go-secure-stdlib/parseutil v0.1.8/go.mod h1:aiJI+PIApBRQG7FZTEBx5GiiX+HbOHilUdNxUZi4eV0=
github.com/hashicorp/go-secure-stdlib/parseutil v0.1.9 h1:FW0YttEnUNDJ2WL9XcrrfteS1xW8u+sh4ggM8pN5isQ=
github.com/hashicorp/go-secure-stdlib/parseutil v0.1.9/go.mod h1:Ll013mhdmsVDuoIXVfBtvgGJsXDYkTw1kooNcoCXuE0=
github.com/hashicorp/go-secure-stdlib/strutil v0.1.2 h1:kes8mmyCpxJsI7FTwtzRqEy9CdjCtrXrXGuOpxEA7Ts=
github.com/hashicorp/go-secure-stdlib/strutil v0.1.2/go.mod h1:Gou2R9+il93BqX25LAKCLuM+y9U2T4hlwvT1yprcna4=
github.com/hashicorp/go-sockaddr v1.0.6 h1:RSG8rKU28VTUTvEKghe5gIhIQpv8evvNpnDEyqO4u9I=
github.com/hashicorp/go-sockaddr v1.0.6/go.mod h1:uoUUmtwU7n9Dv3O4SNLeFvg0SxQ3lyjsj6+CCykpaxI=
github.com/hashicorp/go-sockaddr v1.0.7 h1:G+pTkSO01HpR5qCxg7lxfsFEZaG+C0VssTy/9dbT+Fw=
github.com/hashicorp/go-sockaddr v1.0.7/go.mod h1:FZQbEYa1pxkQ7WLpyXJ6cbjpT8q0YgQaK/JakXqGyWw=
github.com/hashicorp/hcl v1.0.0 h1:0Anlzjpi4vEasTeNFn2mLJgTSwt0+6sfsiTG8qcWGx4=
github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ=
github.com/hashicorp/vault/api v1.15.0 h1:O24FYQCWwhwKnF7CuSqP30S51rTV7vz1iACXE/pj5DA=
@@ -253,8 +251,8 @@ github.com/jzelinskie/stringz v0.0.3 h1:0GhG3lVMYrYtIvRbxvQI6zqRTT1P1xyQlpa0FhfU
github.com/jzelinskie/stringz v0.0.3/go.mod h1:hHYbgxJuNLRw91CmpuFsYEOyQqpDVFg8pvEh23vy4P0=
github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/klauspost/compress v1.17.9 h1:6KIumPrER1LHsvBVuDa0r5xaG0Es51mhhB9BQB2qeMA=
github.com/klauspost/compress v1.17.9/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw=
github.com/klauspost/compress v1.17.11 h1:In6xLpyWOi1+C7tXUUWv2ot1QvBjxevKAaI6IXrJmUc=
github.com/klauspost/compress v1.17.11/go.mod h1:pMDklpSncoRMuLFrf1W9Ss9KT+0rH90U12bZKk7uwG0=
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/konsorten/go-windows-terminal-sequences v1.0.2/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
@@ -287,8 +285,9 @@ github.com/lib/pq v1.10.9 h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw=
github.com/lib/pq v1.10.9/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o=
github.com/mattn/go-colorable v0.1.1/go.mod h1:FuOcm+DKB9mbwrcAfNl7/TZVBZ6rcnceauSikq3lYCQ=
github.com/mattn/go-colorable v0.1.6/go.mod h1:u6P/XSegPjTcexA+o6vUJrdnUu04hMope9wVRipJSqc=
github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA=
github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovkB8vQcUbaXHg=
github.com/mattn/go-colorable v0.1.14 h1:9A9LHSqF/7dyVVX6g0U9cwm9pG3kP9gSzcuIPHPsaIE=
github.com/mattn/go-colorable v0.1.14/go.mod h1:6LmQG8QLFO4G5z1gPvYEzlUgJ2wF+stgPZH1UqBm1s8=
github.com/mattn/go-isatty v0.0.5/go.mod h1:Iq45c/XA43vh69/j3iqttzPXn0bhXyGjM0Hdxcsrc5s=
github.com/mattn/go-isatty v0.0.7/go.mod h1:Iq45c/XA43vh69/j3iqttzPXn0bhXyGjM0Hdxcsrc5s=
github.com/mattn/go-isatty v0.0.12/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Kysco4FUpU=
@@ -328,10 +327,10 @@ github.com/ory/dockertest/v3 v3.11.0/go.mod h1:VIPxS1gwT9NpPOrfD3rACs8Y9Z7yhzO4S
github.com/pborman/getopt v0.0.0-20170112200414-7148bc3a4c30/go.mod h1:85jBQOZwpVEaDAr341tbn15RS4fCAsIst0qp7i8ex1o=
github.com/pelletier/go-toml v1.9.5 h1:4yBQzkHv+7BHq2PQUZF3Mx0IYxG7LsP222s7Agd3ve8=
github.com/pelletier/go-toml v1.9.5/go.mod h1:u1nR/EPcESfeI/szUZKdtJ0xRNbUoANCkoOuaOx1Y+c=
github.com/pion/dtls/v3 v3.0.2 h1:425DEeJ/jfuTTghhUDW0GtYZYIwwMtnKKJNMcWccTX0=
github.com/pion/dtls/v3 v3.0.2/go.mod h1:dfIXcFkKoujDQ+jtd8M6RgqKK3DuaUilm3YatAbGp5k=
github.com/pion/logging v0.2.2 h1:M9+AIj/+pxNsDfAT64+MAVgJO0rsyLnoJKCqf//DoeY=
github.com/pion/logging v0.2.2/go.mod h1:k0/tDVsRCX2Mb2ZEmTqNa7CWsQPc+YYCB7Q+5pahoms=
github.com/pion/dtls/v3 v3.0.4 h1:44CZekewMzfrn9pmGrj5BNnTMDCFwr+6sLH+cCuLM7U=
github.com/pion/dtls/v3 v3.0.4/go.mod h1:R373CsjxWqNPf6MEkfdy3aSe9niZvL/JaKlGeFphtMg=
github.com/pion/logging v0.2.3 h1:gHuf0zpoh1GW67Nr6Gj4cv5Z9ZscU7g/EaoC/Ke/igI=
github.com/pion/logging v0.2.3/go.mod h1:z8YfknkquMe1csOrxK5kc+5/ZPAzMxbKLX5aXpbpC90=
github.com/pion/transport/v3 v3.0.7 h1:iRbMH05BzSNwhILHoBoAPxoB9xQgOaJk+591KC9P1o0=
github.com/pion/transport/v3 v3.0.7/go.mod h1:YleKiTZ4vqNxVwh77Z0zytYi7rXHl7j6uPLGhhz9rwo=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
@@ -351,8 +350,8 @@ github.com/prometheus/client_golang v1.20.5/go.mod h1:PIEt8X02hGcP8JWbeHyeZ53Y/j
github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
github.com/prometheus/client_model v0.6.1 h1:ZKSh/rekM+n3CeS952MLRAdFwIKqeY8b62p8ais2e9E=
github.com/prometheus/client_model v0.6.1/go.mod h1:OrxVMOVHjw3lKMa8+x6HeMGkHMQyHDk9E3jmP2AmGiY=
github.com/prometheus/common v0.61.0 h1:3gv/GThfX0cV2lpO7gkTUwZru38mxevy90Bj8YFSRQQ=
github.com/prometheus/common v0.61.0/go.mod h1:zr29OCN/2BsJRaFwG8QOBr41D6kkchKbpeNH7pAjb/s=
github.com/prometheus/common v0.62.0 h1:xasJaQlnWAeyHdUBeGjXmutelfJHWMRr+Fg4QszZ2Io=
github.com/prometheus/common v0.62.0/go.mod h1:vyBcEuLSvWos9B1+CyL7JZ2up+uFzXhkqml0W5zIY1I=
github.com/prometheus/procfs v0.15.1 h1:YagwOFzUgYfKKHX6Dr+sHT7km/hxC76UB0learggepc=
github.com/prometheus/procfs v0.15.1/go.mod h1:fB45yRUv8NstnjriLhBQLuOUt+WW4BsoGhij/e3PBqk=
github.com/rabbitmq/amqp091-go v1.10.0 h1:STpn5XsHlHGcecLmMFCtg7mqq0RnD+zFr4uzukfVhBw=
@@ -373,8 +372,8 @@ github.com/rubenv/sql-migrate v1.7.1/go.mod h1:Ob2Psprc0/3ggbM6wCzyYVFFuc6FyZrb2
github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
github.com/ryanuber/go-glob v1.0.0 h1:iQh3xXAumdQ+4Ufa5b25cRpC5TYKlno6hsv6Cb3pkBk=
github.com/ryanuber/go-glob v1.0.0/go.mod h1:807d1WSdnB0XRJzKNil9Om6lcp/3a0v4qIHxIXzX/Yc=
github.com/samber/lo v1.49.0 h1:AGnTnQrg1jpFuwECPUSoxZCfVH5W22b605kWSry3YxM=
github.com/samber/lo v1.49.0/go.mod h1:dO6KHFzUKXgP8LDhU0oI8d2hekjXnGOu0DB8Jecxd6o=
github.com/samber/lo v1.49.1 h1:4BIFyVfuQSEpluc7Fua+j1NolZHiEHEpaSEKdsH0tew=
github.com/samber/lo v1.49.1/go.mod h1:dO6KHFzUKXgP8LDhU0oI8d2hekjXnGOu0DB8Jecxd6o=
github.com/satori/go.uuid v1.2.0/go.mod h1:dA0hQrYB0VpLJoorglMZABFdXlWrHn1NEOzdhQKdks0=
github.com/segmentio/asm v1.2.0 h1:9BQrFxC+YOHJlTlHGkTrFWf59nbL3XnCoFLTwDCI7ys=
github.com/segmentio/asm v1.2.0/go.mod h1:BqMnlJP91P8d+4ibuonYZw9mfnzI9HfxselHZr5aAcs=
@@ -390,8 +389,9 @@ github.com/smartystreets/goconvey v1.8.1 h1:qGjIddxOk4grTu9JPOU31tVfq3cNdBlNa5sS
github.com/smartystreets/goconvey v1.8.1/go.mod h1:+/u4qLyY6x1jReYOp7GOM2FSt8aP9CzCZL03bI28W60=
github.com/spf13/cobra v1.8.1 h1:e5/vxKd/rZsfSJMUX1agtjeTDf+qv1/JdBF8gg5k9ZM=
github.com/spf13/cobra v1.8.1/go.mod h1:wHxEcudfqmLYa8iTfL+OuZPbBZkmvliBWKIezN3kD9Y=
github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA=
github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg=
github.com/spf13/pflag v1.0.6 h1:jFzHGLGAlb3ruxLB8MhbI6A8+AQX/2eW4qeyNZXNp2o=
github.com/spf13/pflag v1.0.6/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg=
github.com/sqids/sqids-go v0.4.1 h1:eQKYzmAZbLlRwHeHYPF35QhgxwZHLnlmVj9AkIj/rrw=
github.com/sqids/sqids-go v0.4.1/go.mod h1:EMwHuPQgSNFS0A49jESTfIQS+066XQTVhukrzEPScl8=
github.com/stoewer/go-strcase v1.3.0 h1:g0eASXYtp+yvN9fK8sH94oCIk0fau9uV1/ZdJ0AVEzs=
@@ -485,8 +485,8 @@ golang.org/x/crypto v0.20.0/go.mod h1:Xwo95rrVNIoSMx9wa1JroENMToLWn3RNVrTBpLHgZP
golang.org/x/crypto v0.32.0 h1:euUpcYgM8WcP71gNpTqQCn6rC2t6ULUPiOzfWaXVVfc=
golang.org/x/crypto v0.32.0/go.mod h1:ZnnJkOaASj8g0AjIduWNlq2NRxL0PlBrbKVyZ6V/Ugc=
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20240909161429-701f63a606c0 h1:e66Fs6Z+fZTbFBAxKfP3PALWBtpfqks2bwGcexMxgtk=
golang.org/x/exp v0.0.0-20240909161429-701f63a606c0/go.mod h1:2TbTHSBQa924w8M6Xs1QcRcFwyucIwBGpK1p2f1YFFY=
golang.org/x/exp v0.0.0-20250128182459-e0ece0dbea4c h1:KL/ZBHXgKGVmuZBZ01Lt57yE5ws8ZPSkkihmEyq7FXc=
golang.org/x/exp v0.0.0-20250128182459-e0ece0dbea4c/go.mod h1:tujkw807nyEEAamNbDrEGzRav+ilXA7PCRAd6xsmwiU=
golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=
golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU=
golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
@@ -569,8 +569,8 @@ golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8=
golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU=
golang.org/x/text v0.21.0 h1:zyQAAkrwaneQ066sspRyJaG9VNi/YJ1NfzcGB3hZ/qo=
golang.org/x/text v0.21.0/go.mod h1:4IBbMaMmOPCJ8SecivzSH54+73PCFmPWxNTLm+vZkEQ=
golang.org/x/time v0.8.0 h1:9i3RxcPv3PZnitoVGMPDKZSq1xW1gK1Xy3ArNOGZfEg=
golang.org/x/time v0.8.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM=
golang.org/x/time v0.9.0 h1:EsRrnYcQiGH+5FfbgvV4AP7qEZstoyrHB0DzarOQ4ZY=
golang.org/x/time v0.9.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY=
@@ -599,10 +599,10 @@ google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7
google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc=
google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55/go.mod h1:DMBHOl98Agz4BDEuKkezgsaosCRResVns1a3J2ZsMNc=
google.golang.org/genproto v0.0.0-20200423170343-7949de9c1215/go.mod h1:55QSHmfGQM9UVYDPBsyGGes0y52j32PQ3BqQfXhyH3c=
google.golang.org/genproto/googleapis/api v0.0.0-20250115164207-1a7da9e5054f h1:gap6+3Gk41EItBuyi4XX/bp4oqJ3UwuIMl25yGinuAA=
google.golang.org/genproto/googleapis/api v0.0.0-20250115164207-1a7da9e5054f/go.mod h1:Ic02D47M+zbarjYYUlK57y316f2MoN0gjAwI3f2S95o=
google.golang.org/genproto/googleapis/rpc v0.0.0-20250115164207-1a7da9e5054f h1:OxYkA3wjPsZyBylwymxSHa7ViiW1Sml4ToBrncvFehI=
google.golang.org/genproto/googleapis/rpc v0.0.0-20250115164207-1a7da9e5054f/go.mod h1:+2Yz8+CLJbIfL9z73EW45avw8Lmge3xVElCP9zEKi50=
google.golang.org/genproto/googleapis/api v0.0.0-20250127172529-29210b9bc287 h1:A2ni10G3UlplFrWdCDJTl7D7mJ7GSRm37S+PDimaKRw=
google.golang.org/genproto/googleapis/api v0.0.0-20250127172529-29210b9bc287/go.mod h1:iYONQfRdizDB8JJBybql13nArx91jcUk7zCXEsOofM4=
google.golang.org/genproto/googleapis/rpc v0.0.0-20250127172529-29210b9bc287 h1:J1H9f+LEdWAfHcez/4cvaVBox7cOYT+IU6rgqj5x++8=
google.golang.org/genproto/googleapis/rpc v0.0.0-20250127172529-29210b9bc287/go.mod h1:8BS3B93F/U1juMFq9+EDk+qOT5CO1R9IzXxG3PTqiRk=
google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c=
google.golang.org/grpc v1.23.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg=
google.golang.org/grpc v1.25.1/go.mod h1:c3i+UQWmh7LiEpx4sFZnkU36qjEYZ0imhYfXVyQciAY=
@@ -635,7 +635,7 @@ gotest.tools/v3 v3.5.1/go.mod h1:isy3WKz7GK6uNw/sbHzfKBLvlvXwUyV06n6brMxxopU=
honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
honnef.co/go/tools v0.0.1-2019.2.3/go.mod h1:a3bituU0lyd329TUQxRnasdCoJDkEUEAqEt0JzvZhAg=
k8s.io/utils v0.0.0-20240711033017-18e509b52bc8 h1:pUdcCO1Lk/tbT5ztQWOBi5HBgbBP1J8+AsQnQCKsi8A=
k8s.io/utils v0.0.0-20240711033017-18e509b52bc8/go.mod h1:OLgZIPagt7ERELqWJFomSt595RzquPNLL48iOWgYOg0=
k8s.io/utils v0.0.0-20241210054802-24370beab758 h1:sdbE21q2nlQtFh65saZY+rRM6x6aJJI8IUa1AmH/qa0=
k8s.io/utils v0.0.0-20241210054802-24370beab758/go.mod h1:OLgZIPagt7ERELqWJFomSt595RzquPNLL48iOWgYOg0=
moul.io/http2curl v1.0.0 h1:6XwpyZOYsgZJrU8exnG87ncVkU1FVCcTRpwzOkTDUi8=
moul.io/http2curl v1.0.0/go.mod h1:f6cULg+e4Md/oW1cYmwW4IWQOVl2lGbmCNGOHvzX2kE=
+5 -11
View File
@@ -6,22 +6,19 @@ MQTT adapter provides an MQTT API for sending messages through the platform. MQT
The service is configured using the environment variables presented in the following table. Note that any unset variables will be replaced with their default values.
| Variable | Description | Default |
| ---------------------------------------- | ----------------------------------------------------------------------------------- | --------------------------------- |
| Variable | Description | Default |
| ----------------------------------------- | ----------------------------------------------------------------------------------- | --------------------------------- |
| SMQ_MQTT_ADAPTER_LOG_LEVEL | Log level for the MQTT Adapter (debug, info, warn, error) | info |
| SMQ_MQTT_ADAPTER_MQTT_PORT | mProxy port | 1883 |
| SMQ_MQTT_ADAPTER_MQTT_TARGET_HOST | MQTT broker host | localhost |
| SMQ_MQTT_ADAPTER_MQTT_TARGET_PORT | MQTT broker port | 1883 |
| SMQ_MQTT_ADAPTER_MQTT_QOS | MQTT broker QoS | 1 |
| SMQ_MQTT_ADAPTER_FORWARDER_TIMEOUT | MQTT forwarder for multiprotocol communication timeout | 30s |
| SMQ_MQTT_ADAPTER_MQTT_TARGET_HEALTH_CHECK | URL of broker health check | "" |
| SMQ_MQTT_ADAPTER_WS_PORT | mProxy MQTT over WS port | 8080 |
| SMQ_MQTT_ADAPTER_WS_TARGET_HOST | MQTT broker host for MQTT over WS | localhost |
| SMQ_MQTT_ADAPTER_WS_TARGET_PORT | MQTT broker port for MQTT over WS | 8080 |
| SMQ_MQTT_ADAPTER_WS_TARGET_PATH | MQTT broker MQTT over WS path | /mqtt |
| SMQ_MQTT_ADAPTER_INSTANCE | Instance name for MQTT adapter | "" |
| SMQ_CLIENTS_AUTH_GRPC_URL | Clients service Auth gRPC URL | <localhost:7000> |
| SMQ_CLIENTS_AUTH_GRPC_TIMEOUT | Clients service Auth gRPC request timeout in seconds | 1s |
| SMQ_CLIENTS_AUTH_GRPC_URL | Clients service Auth gRPC URL | <localhost:7000> |
| SMQ_CLIENTS_AUTH_GRPC_TIMEOUT | Clients service Auth gRPC request timeout in seconds | 1s |
| SMQ_CLIENTS_AUTH_GRPC_CLIENT_CERT | Path to the PEM encoded clients service Auth gRPC client certificate file | "" |
| SMQ_CLIENTS_AUTH_GRPC_CLIENT_KEY | Path to the PEM encoded clients service Auth gRPC client key file | "" |
| SMQ_CLIENTS_AUTH_GRPC_SERVER_CERTS | Path to the PEM encoded clients server Auth gRPC server trusted CA certificate file | "" |
@@ -29,7 +26,7 @@ The service is configured using the environment variables presented in the follo
| SMQ_MESSAGE_BROKER_URL | Message broker instance URL | <nats://localhost:4222> |
| SMQ_JAEGER_URL | Jaeger server URL | <http://localhost:4318/v1/traces> |
| SMQ_JAEGER_TRACE_RATIO | Jaeger sampling ratio | 1.0 |
| SMQ_SEND_TELEMETRY | Send telemetry to supermq call home server | true |
| SMQ_SEND_TELEMETRY | Send telemetry to supermq call home server | true |
| SMQ_MQTT_ADAPTER_INSTANCE_ID | Service instance ID | "" |
## Deployment
@@ -56,13 +53,10 @@ SMQ_MQTT_ADAPTER_LOG_LEVEL=info \
SMQ_MQTT_ADAPTER_MQTT_PORT=1883 \
SMQ_MQTT_ADAPTER_MQTT_TARGET_HOST=localhost \
SMQ_MQTT_ADAPTER_MQTT_TARGET_PORT=1883 \
SMQ_MQTT_ADAPTER_MQTT_QOS=1 \
SMQ_MQTT_ADAPTER_FORWARDER_TIMEOUT=30s \
SMQ_MQTT_ADAPTER_MQTT_TARGET_HEALTH_CHECK="" \
SMQ_MQTT_ADAPTER_WS_PORT=8080 \
SMQ_MQTT_ADAPTER_WS_TARGET_HOST=localhost \
SMQ_MQTT_ADAPTER_WS_TARGET_PORT=8080 \
SMQ_MQTT_ADAPTER_WS_TARGET_PATH=/mqtt \
SMQ_MQTT_ADAPTER_INSTANCE="" \
SMQ_CLIENTS_AUTH_GRPC_URL=localhost:7000 \
SMQ_CLIENTS_AUTH_GRPC_TIMEOUT=1s \
-75
View File
@@ -1,75 +0,0 @@
// Copyright (c) Abstract Machines
// SPDX-License-Identifier: Apache-2.0
package mqtt
import (
"context"
"fmt"
"log/slog"
"strings"
"github.com/absmach/supermq/pkg/messaging"
)
// Forwarder specifies MQTT forwarder interface API.
type Forwarder interface {
// Forward subscribes to the Subscriber and
// publishes messages using provided Publisher.
Forward(ctx context.Context, id string, sub messaging.Subscriber, pub messaging.Publisher) error
}
type forwarder struct {
topic string
logger *slog.Logger
}
// NewForwarder returns new Forwarder implementation.
func NewForwarder(topic string, logger *slog.Logger) Forwarder {
return forwarder{
topic: topic,
logger: logger,
}
}
func (f forwarder) Forward(ctx context.Context, id string, sub messaging.Subscriber, pub messaging.Publisher) error {
subCfg := messaging.SubscriberConfig{
ID: id,
Topic: f.topic,
Handler: handle(ctx, pub, f.logger),
}
return sub.Subscribe(ctx, subCfg)
}
func handle(ctx context.Context, pub messaging.Publisher, logger *slog.Logger) handleFunc {
return func(msg *messaging.Message) error {
if msg.GetProtocol() == protocol {
return nil
}
// Use concatenation instead of fmt.Sprintf for the
// sake of simplicity and performance.
topic := "channels/" + msg.GetChannel() + "/messages"
if msg.GetSubtopic() != "" {
topic = topic + "/" + strings.ReplaceAll(msg.GetSubtopic(), ".", "/")
}
go func() {
if err := pub.Publish(ctx, topic, msg); err != nil {
logger.Warn(fmt.Sprintf("Failed to forward message: %s", err))
}
}()
return nil
}
}
type handleFunc func(msg *messaging.Message) error
func (h handleFunc) Handle(msg *messaging.Message) error {
return h(msg)
}
func (h handleFunc) Cancel() error {
return nil
}
-12
View File
@@ -1,12 +0,0 @@
// Copyright (c) Abstract Machines
// SPDX-License-Identifier: Apache-2.0
// Package tracing provides tracing instrumentation for SuperMQ MQTT adapter service.
//
// This package provides tracing middleware for SuperMQ MQTT adapter service.
// It can be used to trace incoming requests and add tracing capabilities to
// SuperMQ MQTT adapter service.
//
// For more details about tracing instrumentation for SuperMQ messaging refer
// to the documentation at https://docs.supermq.abstractmachines.fr/tracing/.
package tracing
-63
View File
@@ -1,63 +0,0 @@
// Copyright (c) Abstract Machines
// SPDX-License-Identifier: Apache-2.0
package tracing
import (
"context"
"fmt"
"github.com/absmach/supermq/mqtt"
"github.com/absmach/supermq/pkg/messaging"
"github.com/absmach/supermq/pkg/server"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
)
const forwardOP = "process"
var _ mqtt.Forwarder = (*forwarderMiddleware)(nil)
type forwarderMiddleware struct {
topic string
forwarder mqtt.Forwarder
tracer trace.Tracer
host server.Config
}
// New creates new mqtt forwarder tracing middleware.
func New(config server.Config, tracer trace.Tracer, forwarder mqtt.Forwarder, topic string) mqtt.Forwarder {
return &forwarderMiddleware{
forwarder: forwarder,
tracer: tracer,
topic: topic,
host: config,
}
}
// Forward traces mqtt forward operations.
func (fm *forwarderMiddleware) Forward(ctx context.Context, id string, sub messaging.Subscriber, pub messaging.Publisher) error {
subject := fmt.Sprintf("channels.%s.messages", fm.topic)
spanName := fmt.Sprintf("%s %s", subject, forwardOP)
ctx, span := fm.tracer.Start(ctx,
spanName,
trace.WithAttributes(
attribute.String("messaging.system", "mqtt"),
attribute.Bool("messaging.destination.anonymous", false),
attribute.String("messaging.destination.template", "channels/{channelID}/messages/*"),
attribute.Bool("messaging.destination.temporary", true),
attribute.String("network.protocol.name", "mqtt"),
attribute.String("network.protocol.version", "3.1.1"),
attribute.String("network.transport", "tcp"),
attribute.String("network.type", "ipv4"),
attribute.String("messaging.operation", forwardOP),
attribute.String("messaging.client_id", id),
attribute.String("server.address", fm.host.Host),
attribute.String("server.socket.port", fm.host.Port),
),
)
defer span.End()
return fm.forwarder.Forward(ctx, id, sub, pub)
}
-11
View File
@@ -1,11 +0,0 @@
// Copyright (c) Abstract Machines
// SPDX-License-Identifier: Apache-2.0
// Package mqtt hold the implementation of the Publisher and PubSub
// interfaces for the MQTT messaging system, the internal messaging
// broker of the SuperMQ IoT platform. Due to the practical requirements
// implementation Publisher is created alongside PubSub. The reason for
// this is that Subscriber implementation of MQTT brings the burden of
// additional struct fields which are not used by Publisher. Subscriber
// is not implemented separately because PubSub can be used where Subscriber is needed.
package mqtt
-61
View File
@@ -1,61 +0,0 @@
// Copyright (c) Abstract Machines
// SPDX-License-Identifier: Apache-2.0
package mqtt
import (
"context"
"errors"
"time"
"github.com/absmach/supermq/pkg/messaging"
mqtt "github.com/eclipse/paho.mqtt.golang"
)
var errPublishTimeout = errors.New("failed to publish due to timeout reached")
var _ messaging.Publisher = (*publisher)(nil)
type publisher struct {
client mqtt.Client
timeout time.Duration
qos uint8
}
// NewPublisher returns a new MQTT message publisher.
func NewPublisher(address string, qos uint8, timeout time.Duration) (messaging.Publisher, error) {
client, err := newClient(address, "mqtt-publisher", timeout)
if err != nil {
return nil, err
}
ret := publisher{
client: client,
timeout: timeout,
qos: qos,
}
return ret, nil
}
func (pub publisher) Publish(ctx context.Context, topic string, msg *messaging.Message) error {
if topic == "" {
return ErrEmptyTopic
}
// Publish only the payload and not the whole message.
token := pub.client.Publish(topic, byte(pub.qos), false, msg.GetPayload())
if token.Error() != nil {
return token.Error()
}
if ok := token.WaitTimeout(pub.timeout); !ok {
return errPublishTimeout
}
return nil
}
func (pub publisher) Close() error {
pub.client.Disconnect(uint(pub.timeout))
return nil
}
-230
View File
@@ -1,230 +0,0 @@
// Copyright (c) Abstract Machines
// SPDX-License-Identifier: Apache-2.0
package mqtt
import (
"context"
"errors"
"fmt"
"log/slog"
"sync"
"time"
"github.com/absmach/supermq/pkg/messaging"
mqtt "github.com/eclipse/paho.mqtt.golang"
"google.golang.org/protobuf/proto"
)
const username = "supermq-mqtt"
var (
// ErrConnect indicates that connection to MQTT broker failed.
ErrConnect = errors.New("failed to connect to MQTT broker")
// errSubscribeTimeout indicates that the subscription failed due to timeout.
errSubscribeTimeout = errors.New("failed to subscribe due to timeout reached")
// errUnsubscribeTimeout indicates that unsubscribe failed due to timeout.
errUnsubscribeTimeout = errors.New("failed to unsubscribe due to timeout reached")
// errUnsubscribeDeleteTopic indicates that unsubscribe failed because the topic was deleted.
errUnsubscribeDeleteTopic = errors.New("failed to unsubscribe due to deletion of topic")
// ErrNotSubscribed indicates that the topic is not subscribed to.
ErrNotSubscribed = errors.New("not subscribed")
// ErrEmptyTopic indicates the absence of topic.
ErrEmptyTopic = errors.New("empty topic")
// ErrEmptyID indicates the absence of ID.
ErrEmptyID = errors.New("empty ID")
)
var _ messaging.PubSub = (*pubsub)(nil)
type subscription struct {
client mqtt.Client
topics []string
cancel func() error
}
type pubsub struct {
publisher
logger *slog.Logger
mu sync.RWMutex
address string
timeout time.Duration
subscriptions map[string]subscription
}
// NewPubSub returns MQTT message publisher/subscriber.
func NewPubSub(url string, qos uint8, timeout time.Duration, logger *slog.Logger) (messaging.PubSub, error) {
client, err := newClient(url, "mqtt-publisher", timeout)
if err != nil {
return nil, err
}
ret := &pubsub{
publisher: publisher{
client: client,
timeout: timeout,
qos: qos,
},
address: url,
timeout: timeout,
logger: logger,
subscriptions: make(map[string]subscription),
}
return ret, nil
}
func (ps *pubsub) Subscribe(ctx context.Context, cfg messaging.SubscriberConfig) error {
if cfg.ID == "" {
return ErrEmptyID
}
if cfg.Topic == "" {
return ErrEmptyTopic
}
ps.mu.Lock()
defer ps.mu.Unlock()
s, ok := ps.subscriptions[cfg.ID]
// If the client exists, check if it's subscribed to the topic and unsubscribe if needed.
switch ok {
case true:
if ok := s.contains(cfg.Topic); ok {
if err := s.unsubscribe(cfg.Topic, ps.timeout); err != nil {
return err
}
}
default:
client, err := newClient(ps.address, cfg.ID, ps.timeout)
if err != nil {
return err
}
s = subscription{
client: client,
topics: []string{},
cancel: cfg.Handler.Cancel,
}
}
s.topics = append(s.topics, cfg.Topic)
ps.subscriptions[cfg.ID] = s
token := s.client.Subscribe(cfg.Topic, byte(ps.qos), ps.mqttHandler(cfg.Handler))
if token.Error() != nil {
return token.Error()
}
if ok := token.WaitTimeout(ps.timeout); !ok {
return errSubscribeTimeout
}
return nil
}
func (ps *pubsub) Unsubscribe(ctx context.Context, id, topic string) error {
if id == "" {
return ErrEmptyID
}
if topic == "" {
return ErrEmptyTopic
}
ps.mu.Lock()
defer ps.mu.Unlock()
s, ok := ps.subscriptions[id]
if !ok || !s.contains(topic) {
return ErrNotSubscribed
}
if err := s.unsubscribe(topic, ps.timeout); err != nil {
return err
}
ps.subscriptions[id] = s
if len(s.topics) == 0 {
delete(ps.subscriptions, id)
}
return nil
}
func (s *subscription) unsubscribe(topic string, timeout time.Duration) error {
if s.cancel != nil {
if err := s.cancel(); err != nil {
return err
}
}
token := s.client.Unsubscribe(topic)
if token.Error() != nil {
return token.Error()
}
if ok := token.WaitTimeout(timeout); !ok {
return errUnsubscribeTimeout
}
if ok := s.delete(topic); !ok {
return errUnsubscribeDeleteTopic
}
return token.Error()
}
func newClient(address, id string, timeout time.Duration) (mqtt.Client, error) {
opts := mqtt.NewClientOptions().
SetUsername(username).
AddBroker(address).
SetClientID(id)
client := mqtt.NewClient(opts)
token := client.Connect()
if token.Error() != nil {
return nil, token.Error()
}
if ok := token.WaitTimeout(timeout); !ok {
return nil, ErrConnect
}
return client, nil
}
func (ps *pubsub) mqttHandler(h messaging.MessageHandler) mqtt.MessageHandler {
return func(_ mqtt.Client, m mqtt.Message) {
var msg messaging.Message
if err := proto.Unmarshal(m.Payload(), &msg); err != nil {
ps.logger.Warn(fmt.Sprintf("Failed to unmarshal received message: %s", err))
return
}
if err := h.Handle(&msg); err != nil {
ps.logger.Warn(fmt.Sprintf("Failed to handle SuperMQ message: %s", err))
}
}
}
// Contains checks if a topic is present.
func (s subscription) contains(topic string) bool {
return s.indexOf(topic) != -1
}
// Finds the index of an item in the topics.
func (s subscription) indexOf(element string) int {
for k, v := range s.topics {
if element == v {
return k
}
}
return -1
}
// Deletes a topic from the slice.
func (s *subscription) delete(topic string) bool {
index := s.indexOf(topic)
if index == -1 {
return false
}
topics := make([]string, len(s.topics)-1)
copy(topics[:index], s.topics[:index])
copy(topics[index:], s.topics[index+1:])
s.topics = topics
return true
}
-474
View File
@@ -1,474 +0,0 @@
// Copyright (c) Abstract Machines
// SPDX-License-Identifier: Apache-2.0
package mqtt_test
import (
"context"
"errors"
"fmt"
"testing"
"time"
"github.com/absmach/supermq/pkg/messaging"
mqttpubsub "github.com/absmach/supermq/pkg/messaging/mqtt"
mqtt "github.com/eclipse/paho.mqtt.golang"
"github.com/stretchr/testify/assert"
"google.golang.org/protobuf/proto"
)
const (
topic = "topic"
chansPrefix = "channels"
channel = "9b7b1b3f-b1b0-46a8-a717-b8213f9eda3b"
subtopic = "engine"
tokenTimeout = 100 * time.Millisecond
)
var data = []byte("payload")
// ErrFailedHandleMessage indicates that the message couldn't be handled.
var errFailedHandleMessage = errors.New("failed to handle supermq message")
func TestPublisher(t *testing.T) {
msgChan := make(chan []byte)
// Subscribing with topic, and with subtopic, so that we can publish messages.
client, err := newClient(address, "clientID1", brokerTimeout)
assert.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err))
token := client.Subscribe(topic, qos, func(_ mqtt.Client, m mqtt.Message) {
msgChan <- m.Payload()
})
if ok := token.WaitTimeout(tokenTimeout); !ok {
assert.Fail(t, fmt.Sprintf("failed to subscribe to topic %s", topic))
}
assert.Nil(t, token.Error(), fmt.Sprintf("got unexpected error: %s", token.Error()))
token = client.Subscribe(fmt.Sprintf("%s.%s", topic, subtopic), qos, func(_ mqtt.Client, m mqtt.Message) {
msgChan <- m.Payload()
})
if ok := token.WaitTimeout(tokenTimeout); !ok {
assert.Fail(t, fmt.Sprintf("failed to subscribe to topic %s", fmt.Sprintf("%s.%s", topic, subtopic)))
}
assert.Nil(t, token.Error(), fmt.Sprintf("got unexpected error: %s", token.Error()))
t.Cleanup(func() {
token := client.Unsubscribe(topic, fmt.Sprintf("%s.%s", topic, subtopic))
token.WaitTimeout(tokenTimeout)
assert.Nil(t, token.Error(), fmt.Sprintf("got unexpected error: %s", token.Error()))
client.Disconnect(100)
})
// Test publish with an empty topic.
err = pubsub.Publish(context.TODO(), "", &messaging.Message{Payload: data})
assert.Equal(t, err, mqttpubsub.ErrEmptyTopic, fmt.Sprintf("Publish with empty topic: expected: %s, got: %s", mqttpubsub.ErrEmptyTopic, err))
cases := []struct {
desc string
channel string
subtopic string
payload []byte
}{
{
desc: "publish message with nil payload",
payload: nil,
},
{
desc: "publish message with string payload",
payload: data,
},
{
desc: "publish message with channel",
payload: data,
channel: channel,
},
{
desc: "publish message with subtopic",
payload: data,
subtopic: subtopic,
},
{
desc: "publish message with channel and subtopic",
payload: data,
channel: channel,
subtopic: subtopic,
},
}
for _, tc := range cases {
expectedMsg := messaging.Message{
Publisher: "clientID11",
Channel: tc.channel,
Subtopic: tc.subtopic,
Payload: tc.payload,
}
err := pubsub.Publish(context.TODO(), topic, &expectedMsg)
assert.Nil(t, err, fmt.Sprintf("%s: got unexpected error: %s\n", tc.desc, err))
data, err := proto.Marshal(&expectedMsg)
assert.Nil(t, err, fmt.Sprintf("%s: failed to serialize protobuf error: %s\n", tc.desc, err))
receivedMsg := <-msgChan
if tc.payload != nil {
assert.Equal(t, expectedMsg.GetPayload(), receivedMsg, fmt.Sprintf("%s: expected %+v got %+v\n", tc.desc, data, receivedMsg))
}
}
}
func TestSubscribe(t *testing.T) {
msgChan := make(chan *messaging.Message)
// Creating client to Publish messages to subscribed topic.
client, err := newClient(address, "supermq", brokerTimeout)
assert.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err))
t.Cleanup(func() {
client.Unsubscribe()
client.Disconnect(100)
})
cases := []struct {
desc string
topic string
clientID string
err error
handler messaging.MessageHandler
}{
{
desc: "Subscribe to a topic with an ID",
topic: topic,
clientID: "clientid1",
err: nil,
handler: handler{false, "clientid1", msgChan},
},
{
desc: "Subscribe to the same topic with a different ID",
topic: topic,
clientID: "clientid2",
err: nil,
handler: handler{false, "clientid2", msgChan},
},
{
desc: "Subscribe to an already subscribed topic with an ID",
topic: topic,
clientID: "clientid1",
err: nil,
handler: handler{false, "clientid1", msgChan},
},
{
desc: "Subscribe to a topic with a subtopic with an ID",
topic: fmt.Sprintf("%s.%s", topic, subtopic),
clientID: "clientid1",
err: nil,
handler: handler{false, "clientid1", msgChan},
},
{
desc: "Subscribe to an already subscribed topic with a subtopic with an ID",
topic: fmt.Sprintf("%s.%s", topic, subtopic),
clientID: "clientid1",
err: nil,
handler: handler{false, "clientid1", msgChan},
},
{
desc: "Subscribe to an empty topic with an ID",
topic: "",
clientID: "clientid1",
err: mqttpubsub.ErrEmptyTopic,
handler: handler{false, "clientid1", msgChan},
},
{
desc: "Subscribe to a topic with empty id",
topic: topic,
clientID: "",
err: mqttpubsub.ErrEmptyID,
handler: handler{false, "", msgChan},
},
}
for _, tc := range cases {
subCfg := messaging.SubscriberConfig{
ID: tc.clientID,
Topic: tc.topic,
Handler: tc.handler,
}
err = pubsub.Subscribe(context.TODO(), subCfg)
assert.Equal(t, err, tc.err, fmt.Sprintf("%s: expected: %s, but got: %s", tc.desc, err, tc.err))
if tc.err == nil {
expectedMsg := messaging.Message{
Publisher: "clientID1",
Channel: channel,
Subtopic: subtopic,
Payload: data,
}
data, err := proto.Marshal(&expectedMsg)
assert.Nil(t, err, fmt.Sprintf("%s: failed to serialize protobuf error: %s\n", tc.desc, err))
token := client.Publish(tc.topic, qos, false, data)
token.WaitTimeout(tokenTimeout)
assert.Nil(t, token.Error(), fmt.Sprintf("got unexpected error: %s", token.Error()))
receivedMsg := <-msgChan
assert.Equal(t, expectedMsg.Channel, receivedMsg.Channel, fmt.Sprintf("%s: expected %+v got %+v\n", tc.desc, &expectedMsg, receivedMsg))
assert.Equal(t, expectedMsg.Created, receivedMsg.Created, fmt.Sprintf("%s: expected %+v got %+v\n", tc.desc, &expectedMsg, receivedMsg))
assert.Equal(t, expectedMsg.Protocol, receivedMsg.Protocol, fmt.Sprintf("%s: expected %+v got %+v\n", tc.desc, &expectedMsg, receivedMsg))
assert.Equal(t, expectedMsg.Publisher, receivedMsg.Publisher, fmt.Sprintf("%s: expected %+v got %+v\n", tc.desc, &expectedMsg, receivedMsg))
assert.Equal(t, expectedMsg.Subtopic, receivedMsg.Subtopic, fmt.Sprintf("%s: expected %+v got %+v\n", tc.desc, &expectedMsg, receivedMsg))
assert.Equal(t, expectedMsg.Payload, receivedMsg.Payload, fmt.Sprintf("%s: expected %+v got %+v\n", tc.desc, &expectedMsg, receivedMsg))
}
}
}
func TestPubSub(t *testing.T) {
msgChan := make(chan *messaging.Message)
cases := []struct {
desc string
topic string
clientID string
err error
handler messaging.MessageHandler
}{
{
desc: "Subscribe to a topic with an ID",
topic: topic,
clientID: "clientid7",
err: nil,
handler: handler{false, "clientid7", msgChan},
},
{
desc: "Subscribe to the same topic with a different ID",
topic: topic,
clientID: "clientid8",
err: nil,
handler: handler{false, "clientid8", msgChan},
},
{
desc: "Subscribe to a topic with a subtopic with an ID",
topic: fmt.Sprintf("%s.%s", topic, subtopic),
clientID: "clientid7",
err: nil,
handler: handler{false, "clientid7", msgChan},
},
{
desc: "Subscribe to an empty topic with an ID",
topic: "",
clientID: "clientid7",
err: mqttpubsub.ErrEmptyTopic,
handler: handler{false, "clientid7", msgChan},
},
{
desc: "Subscribe to a topic with empty id",
topic: topic,
clientID: "",
err: mqttpubsub.ErrEmptyID,
handler: handler{false, "", msgChan},
},
}
for _, tc := range cases {
subCfg := messaging.SubscriberConfig{
ID: tc.clientID,
Topic: tc.topic,
Handler: tc.handler,
}
err := pubsub.Subscribe(context.TODO(), subCfg)
assert.Equal(t, err, tc.err, fmt.Sprintf("%s: expected: %s, but got: %s", tc.desc, err, tc.err))
if tc.err == nil {
// Use pubsub to subscribe to a topic, and then publish messages to that topic.
expectedMsg := messaging.Message{
Publisher: "clientID",
Channel: channel,
Subtopic: subtopic,
Payload: data,
}
data, err := proto.Marshal(&expectedMsg)
assert.Nil(t, err, fmt.Sprintf("%s: failed to serialize protobuf error: %s\n", tc.desc, err))
msg := messaging.Message{
Payload: data,
}
// Publish message, and then receive it on message channel.
err = pubsub.Publish(context.TODO(), topic, &msg)
assert.Nil(t, err, fmt.Sprintf("%s: got unexpected error: %s\n", tc.desc, err))
receivedMsg := <-msgChan
assert.Equal(t, expectedMsg.Channel, receivedMsg.Channel, fmt.Sprintf("%s: expected %+v got %+v\n", tc.desc, &expectedMsg, receivedMsg))
assert.Equal(t, expectedMsg.Created, receivedMsg.Created, fmt.Sprintf("%s: expected %+v got %+v\n", tc.desc, &expectedMsg, receivedMsg))
assert.Equal(t, expectedMsg.Protocol, receivedMsg.Protocol, fmt.Sprintf("%s: expected %+v got %+v\n", tc.desc, &expectedMsg, receivedMsg))
assert.Equal(t, expectedMsg.Publisher, receivedMsg.Publisher, fmt.Sprintf("%s: expected %+v got %+v\n", tc.desc, &expectedMsg, receivedMsg))
assert.Equal(t, expectedMsg.Subtopic, receivedMsg.Subtopic, fmt.Sprintf("%s: expected %+v got %+v\n", tc.desc, &expectedMsg, receivedMsg))
assert.Equal(t, expectedMsg.Payload, receivedMsg.Payload, fmt.Sprintf("%s: expected %+v got %+v\n", tc.desc, &expectedMsg, receivedMsg))
}
}
}
func TestUnsubscribe(t *testing.T) {
msgChan := make(chan *messaging.Message)
cases := []struct {
desc string
topic string
clientID string
err error
subscribe bool // True for subscribe and false for unsubscribe.
handler messaging.MessageHandler
}{
{
desc: "Subscribe to a topic with an ID",
topic: fmt.Sprintf("%s.%s", chansPrefix, topic),
clientID: "clientid4",
err: nil,
subscribe: true,
handler: handler{false, "clientid4", msgChan},
},
{
desc: "Subscribe to the same topic with a different ID",
topic: fmt.Sprintf("%s.%s", chansPrefix, topic),
clientID: "clientid9",
err: nil,
subscribe: true,
handler: handler{false, "clientid9", msgChan},
},
{
desc: "Unsubscribe from a topic with an ID",
topic: fmt.Sprintf("%s.%s", chansPrefix, topic),
clientID: "clientid4",
err: nil,
subscribe: false,
handler: handler{false, "clientid4", msgChan},
},
{
desc: "Unsubscribe from same topic with different ID",
topic: fmt.Sprintf("%s.%s", chansPrefix, topic),
clientID: "clientid9",
err: nil,
subscribe: false,
handler: handler{false, "clientid9", msgChan},
},
{
desc: "Unsubscribe from a non-existent topic with an ID",
topic: "h",
clientID: "clientid4",
err: mqttpubsub.ErrNotSubscribed,
subscribe: false,
handler: handler{false, "clientid4", msgChan},
},
{
desc: "Unsubscribe from an already unsubscribed topic with an ID",
topic: fmt.Sprintf("%s.%s", chansPrefix, topic),
clientID: "clientid4",
err: mqttpubsub.ErrNotSubscribed,
subscribe: false,
handler: handler{false, "clientid4", msgChan},
},
{
desc: "Subscribe to a topic with a subtopic with an ID",
topic: fmt.Sprintf("%s.%s.%s", chansPrefix, topic, subtopic),
clientID: "clientidd4",
err: nil,
subscribe: true,
handler: handler{false, "clientidd4", msgChan},
},
{
desc: "Unsubscribe from a topic with a subtopic with an ID",
topic: fmt.Sprintf("%s.%s.%s", chansPrefix, topic, subtopic),
clientID: "clientidd4",
err: nil,
subscribe: false,
handler: handler{false, "clientidd4", msgChan},
},
{
desc: "Unsubscribe from an already unsubscribed topic with a subtopic with an ID",
topic: fmt.Sprintf("%s.%s.%s", chansPrefix, topic, subtopic),
clientID: "clientid4",
err: mqttpubsub.ErrNotSubscribed,
subscribe: false,
handler: handler{false, "clientid4", msgChan},
},
{
desc: "Unsubscribe from an empty topic with an ID",
topic: "",
clientID: "clientid4",
err: mqttpubsub.ErrEmptyTopic,
subscribe: false,
handler: handler{false, "clientid4", msgChan},
},
{
desc: "Unsubscribe from a topic with empty ID",
topic: fmt.Sprintf("%s.%s", chansPrefix, topic),
clientID: "",
err: mqttpubsub.ErrEmptyID,
subscribe: false,
handler: handler{false, "", msgChan},
},
{
desc: "Subscribe to a new topic with an ID",
topic: fmt.Sprintf("%s.%s", chansPrefix, topic+"2"),
clientID: "clientid55",
err: nil,
subscribe: true,
handler: handler{true, "clientid5", msgChan},
},
{
desc: "Unsubscribe from a topic with an ID with failing handler",
topic: fmt.Sprintf("%s.%s", chansPrefix, topic+"2"),
clientID: "clientid55",
err: errFailedHandleMessage,
subscribe: false,
handler: handler{true, "clientid5", msgChan},
},
{
desc: "Subscribe to a new topic with subtopic with an ID",
topic: fmt.Sprintf("%s.%s.%s", chansPrefix, topic+"2", subtopic),
clientID: "clientid55",
err: nil,
subscribe: true,
handler: handler{true, "clientid5", msgChan},
},
{
desc: "Unsubscribe from a topic with subtopic with an ID with failing handler",
topic: fmt.Sprintf("%s.%s.%s", chansPrefix, topic+"2", subtopic),
clientID: "clientid55",
err: errFailedHandleMessage,
subscribe: false,
handler: handler{true, "clientid5", msgChan},
},
}
for _, tc := range cases {
subCfg := messaging.SubscriberConfig{
ID: tc.clientID,
Topic: tc.topic,
Handler: tc.handler,
}
switch tc.subscribe {
case true:
err := pubsub.Subscribe(context.TODO(), subCfg)
assert.Equal(t, tc.err, err, fmt.Sprintf("%s: expected: %s, but got: %s", tc.desc, tc.err, err))
default:
err := pubsub.Unsubscribe(context.TODO(), tc.clientID, tc.topic)
assert.Equal(t, tc.err, err, fmt.Sprintf("%s: expected: %s, but got: %s", tc.desc, tc.err, err))
}
}
}
type handler struct {
fail bool
publisher string
msgChan chan *messaging.Message
}
func (h handler) Handle(msg *messaging.Message) error {
if msg.GetPublisher() != h.publisher {
h.msgChan <- msg
}
return nil
}
func (h handler) Cancel() error {
if h.fail {
return errFailedHandleMessage
}
return nil
}
-121
View File
@@ -1,121 +0,0 @@
// Copyright (c) Abstract Machines
// SPDX-License-Identifier: Apache-2.0
package mqtt_test
import (
"fmt"
"log"
"log/slog"
"os"
"os/signal"
"syscall"
"testing"
"time"
smqlog "github.com/absmach/supermq/logger"
"github.com/absmach/supermq/pkg/messaging"
mqttpubsub "github.com/absmach/supermq/pkg/messaging/mqtt"
mqtt "github.com/eclipse/paho.mqtt.golang"
"github.com/ory/dockertest/v3"
"github.com/ory/dockertest/v3/docker"
)
var (
pubsub messaging.PubSub
logger *slog.Logger
address string
)
const (
username = "supermq-mqtt"
qos = 2
port = "1883/tcp"
brokerTimeout = 30 * time.Second
poolMaxWait = 120 * time.Second
)
func TestMain(m *testing.M) {
pool, err := dockertest.NewPool("")
if err != nil {
log.Fatalf("Could not connect to docker: %s", err)
}
container, err := pool.RunWithOptions(&dockertest.RunOptions{
Repository: "eclipse-mosquitto",
Tag: "1.6.15",
}, func(config *docker.HostConfig) {
config.AutoRemove = true
config.RestartPolicy = docker.RestartPolicy{Name: "no"}
})
if err != nil {
log.Fatalf("Could not start container: %s", err)
}
handleInterrupt(pool, container)
address = fmt.Sprintf("%s:%s", "localhost", container.GetPort(port))
pool.MaxWait = poolMaxWait
logger, err = smqlog.New(os.Stdout, "debug")
if err != nil {
log.Fatal(err.Error())
}
if err := pool.Retry(func() error {
pubsub, err = mqttpubsub.NewPubSub(address, 2, brokerTimeout, logger)
return err
}); err != nil {
log.Fatalf("Could not connect to docker: %s", err)
}
code := m.Run()
if err := pool.Purge(container); err != nil {
log.Fatalf("Could not purge container: %s", err)
}
os.Exit(code)
defer func() {
err = pubsub.Close()
if err != nil {
log.Fatal(err.Error())
}
}()
}
func handleInterrupt(pool *dockertest.Pool, container *dockertest.Resource) {
c := make(chan os.Signal, 2)
signal.Notify(c, os.Interrupt, syscall.SIGTERM)
go func() {
<-c
if err := pool.Purge(container); err != nil {
log.Fatalf("Could not purge container: %s", err)
}
os.Exit(0)
}()
}
func newClient(address, id string, timeout time.Duration) (mqtt.Client, error) {
opts := mqtt.NewClientOptions().
SetUsername(username).
AddBroker(address).
SetClientID(id)
client := mqtt.NewClient(opts)
token := client.Connect()
if token.Error() != nil {
return nil, token.Error()
}
ok := token.WaitTimeout(timeout)
if !ok {
return nil, mqttpubsub.ErrConnect
}
if token.Error() != nil {
return nil, token.Error()
}
return client, nil
}