Removed VerneMQ auth plugin, Aedes impl. Added mproxy support in docker (#1049)

Signed-off-by: Nikola Marcetic <n.marcetic86@gmail.com>
This commit is contained in:
Nikola Marčetić
2020-03-01 23:55:43 +01:00
committed by GitHub
parent f6c813ea8c
commit dcba41e8c4
34 changed files with 27 additions and 8049 deletions
+2
View File
@@ -79,7 +79,9 @@ MF_HTTP_ADAPTER_PORT=8185
### MQTT
MF_MQTT_ADAPTER_LOG_LEVEL=debug
MF_MQTT_ADAPTER_PORT=1883
MF_MQTT_BROKER_PORT=1883
MF_MQTT_ADAPTER_WS_PORT=8080
MF_MQTT_BROKER_WS_PORT=8881
MF_MQTT_ADAPTER_ES_DB=0
MF_MQTT_ADAPTER_ES_PASS=
+5 -23
View File
@@ -36,13 +36,12 @@ define make_docker_dev
-f docker/Dockerfile.dev ./build
endef
all: $(SERVICES) mqtt
all: $(SERVICES)
.PHONY: all $(SERVICES) dockers dockers_dev latest release mqtt
.PHONY: all $(SERVICES) dockers dockers_dev latest release
clean:
rm -rf ${BUILD_DIR}
rm -rf mqtt/aedes/node_modules
cleandocker:
# Stop all containers (if running)
@@ -82,29 +81,14 @@ $(DOCKERS):
$(DOCKERS_DEV):
$(call make_docker_dev,$(@))
docker_mqtt:
# MQTT Docker build must be done from root dir because it copies .proto files
ifeq ($(GOARCH), arm)
docker build --tag=mainflux/mqtt -f mqtt/aedes/Dockerfile.arm .
else
docker build --tag=mainflux/mqtt -f mqtt/aedes/Dockerfile .
endif
docker_mqtt_verne:
docker build --tag=mainflux/mqtt-verne -f mqtt/verne/Dockerfile .
dockers: $(DOCKERS) docker_mqtt
dockers: $(DOCKERS)
dockers_dev: $(DOCKERS_DEV)
mqtt:
cd mqtt/aedes && npm install
define docker_push
for svc in $(SERVICES); do \
docker push mainflux/$$svc:$(1); \
done
docker push mainflux/mqtt:$(1)
endef
changelog:
@@ -120,22 +104,20 @@ release:
for svc in $(SERVICES); do \
docker tag mainflux/$$svc mainflux/$$svc:$(version); \
done
docker tag mainflux/mqtt mainflux/mqtt:$(version)
$(call docker_push,$(version))
rundev:
cd scripts && ./run.sh
run:
docker-compose -f docker/docker-compose.yml -f docker/aedes.yml up
docker-compose -f docker/docker-compose.yml -f docker/mproxy.yml up
runlora:
docker-compose \
-f docker/docker-compose.yml \
-f docker/aedes.yml up \
-f docker/addons/influxdb-writer/docker-compose.yml \
-f docker/addons/lora-adapter/docker-compose.yml up \
# Run all Mainflux core services except distributed tracing system - Jaeger. Recommended on gateways:
rungw:
MF_JAEGER_URL= docker-compose -f docker/docker-compose.yml -f docker/aedes.yml up --scale jaeger=0
MF_JAEGER_URL= docker-compose -f docker/docker-compose.yml -f docker/mproxy.yml up --scale jaeger=0
+1 -1
View File
@@ -53,7 +53,7 @@ Developing Mainflux will also require:
Once the prerequisites are installed, execute the following commands from the project's root:
```bash
docker-compose -f docker/docker-compose.yml -f docker/aedes.yml up
docker-compose -f docker/docker-compose.yml -f docker/mproxy.yml up
```
This will bring up the Mainflux docker services and interconnect them. This command can also be executed using the project's included Makefile:
-45
View File
@@ -1,45 +0,0 @@
# Copyright (c) Mainflux
# SPDX-License-Identifier: Apache-2.0
version: "3.7"
volumes:
mainflux-mqtt-redis-volume:
services:
nginx:
depends_on:
- mqtt-adapter
mqtt-redis:
image: redis:5.0-alpine
container_name: mainflux-mqtt-redis
restart: on-failure
networks:
- mainflux-base-net
volumes:
- mainflux-mqtt-redis-volume:/data
mqtt-adapter:
image: mainflux/mqtt:latest
container_name: mainflux-mqtt
depends_on:
- things
- nats
- mqtt-redis
restart: on-failure
environment:
MF_MQTT_ADAPTER_LOG_LEVEL: ${MF_MQTT_ADAPTER_LOG_LEVEL}
MF_MQTT_INSTANCE_ID: mqtt-adapter-1
MF_MQTT_ADAPTER_PORT: ${MF_MQTT_ADAPTER_PORT}
MF_MQTT_ADAPTER_WS_PORT: ${MF_MQTT_ADAPTER_WS_PORT}
MF_MQTT_ADAPTER_REDIS_HOST: mqtt-redis
MF_MQTT_ADAPTER_ES_HOST: es-redis
MF_NATS_URL: ${MF_NATS_URL}
MF_THINGS_URL: things:${MF_THINGS_AUTH_GRPC_PORT}
MF_JAEGER_URL: ${MF_JAEGER_URL}
ports:
- 18831:${MF_MQTT_ADAPTER_PORT}
- 8881:${MF_MQTT_ADAPTER_WS_PORT}
networks:
- mainflux-base-net
+17 -11
View File
@@ -4,29 +4,35 @@
version: "3.7"
volumes:
mainflux-mqtt-redis-volume:
mainflux-mqtt-broker-volume:
services:
nginx:
depends_on:
- mqtt-adapter
emqx:
image: emqx/emqx:latest
container_name: mainflux-emqx
vernemq:
image: vernemq/vernemq:1.10.1-alpine
container_name: mainflux-vernemq
restart: on-failure
environment:
EMQX_LISTENER__TCP__EXTERNAL: 1884
DOCKER_VERNEMQ_ALLOW_ANONYMOUS: "on"
DOCKER_VERNEMQ_LOG__CONSOLE__LEVEL: info
DOCKER_VERNEMQ_ACCEPT_EULA: "yes"
ports:
- 1884:1884
- 18831:${MF_MQTT_BROKER_PORT}
- 8881:${MF_MQTT_BROKER_WS_PORT}
- 7777:8888
networks:
- mainflux-base-net
volumes:
- mainflux-mqtt-broker-volume:/var/lib/vernemq
mqtt-adapter:
image: mainflux/mproxy:latest
container_name: mainflux-mqtt
depends_on:
- emqx
- vernemq
- things
- nats
restart: on-failure
@@ -38,10 +44,10 @@ services:
MF_NATS_URL: ${MF_NATS_URL}
MF_THINGS_URL: things:${MF_THINGS_AUTH_GRPC_PORT}
MF_JAEGER_URL: ${MF_JAEGER_URL}
MF_MQTT_ADAPTER_MQTT_TARGET_HOST: emqx
MF_MQTT_ADAPTER_MQTT_TARGET_PORT: 1884
MF_MQTT_ADAPTER_MQTT_TARGET_HOST: vernemq
MF_MQTT_ADAPTER_MQTT_TARGET_PORT: ${MF_MQTT_BROKER_PORT}
ports:
- 18831:${MF_MQTT_ADAPTER_PORT}
- 8881:${MF_MQTT_ADAPTER_WS_PORT}
- 18832:${MF_MQTT_ADAPTER_PORT}
- 8882:${MF_MQTT_ADAPTER_WS_PORT}
networks:
- mainflux-base-net
-72
View File
@@ -1,72 +0,0 @@
# Copyright (c) Mainflux
# SPDX-License-Identifier: Apache-2.0
version: "3.7"
x-mqtt-adapter: &mqtt-adapter
image: mainflux/mqtt-verne:latest
depends_on:
- things
- nats
restart: on-failure
environment: &mqtt-adapter-env
MF_MQTT_ADAPTER_LOG_LEVEL: ${MF_MQTT_ADAPTER_LOG_LEVEL}
MF_MQTT_INSTANCE_ID: mqtt-adapter-1
MF_MQTT_ADAPTER_ES_URL: tcp://es-redis:${MF_REDIS_TCP_PORT}
MF_NATS_URL: ${MF_NATS_URL}
MF_THINGS_AUTH_GRPC_URL: http://things:${MF_THINGS_AUTH_GRPC_PORT}
DOCKER_VERNEMQ_PLUGINS__VMQ_PASSWD: "off"
DOCKER_VERNEMQ_PLUGINS__VMQ_ACL: "off"
DOCKER_VERNEMQ_PLUGINS__MFX_AUTH: "on"
DOCKER_VERNEMQ_PLUGINS__MFX_AUTH__PATH: /mainflux/_build/default
DOCKER_VERNEMQ_LOG__CONSOLE__LEVEL: debug
MF_MQTT_VERNEMQ_GRPC_POOL_SIZE: 1000
networks:
- mainflux-base-net
services:
nginx:
environment:
MF_MQTT_CLUSTER: 1
depends_on:
- mqtt-adapter-1
- mqtt-adapter-2
- mqtt-adapter-3
mqtt-adapter-1:
<< : *mqtt-adapter
container_name: mainflux-mqtt-1
ports:
- 18831:${MF_MQTT_ADAPTER_PORT}
- 8891:${MF_MQTT_ADAPTER_WS_PORT}
- 7777:8888
mqtt-adapter-2:
<< : *mqtt-adapter
container_name: mainflux-mqtt-2
environment:
<< : *mqtt-adapter-env
DOCKER_VERNEMQ_COMPOSE: 1
DOCKER_VERNEMQ_DISCOVERY_NODE: mqtt-adapter-1
ports:
- 18832:1883
- 8892:8080
- 7778:8888
depends_on:
- mqtt-adapter-1
mqtt-adapter-3:
<< : *mqtt-adapter
container_name: mainflux-mqtt-3
environment:
<< : *mqtt-adapter-env
DOCKER_VERNEMQ_COMPOSE: 1
DOCKER_VERNEMQ_DISCOVERY_NODE: mqtt-adapter-1
ports:
- 18833:1883
- 8893:8080
- 7779:8888
depends_on:
- mqtt-adapter-1
-37
View File
@@ -1,37 +0,0 @@
# Copyright (c) Mainflux
# SPDX-License-Identifier: Apache-2.0
version: "3.7"
services:
nginx:
depends_on:
- mqtt-adapter
mqtt-adapter:
image: mainflux/mqtt-verne:latest
container_name: mainflux-mqtt
depends_on:
- things
- nats
restart: on-failure
environment:
MF_MQTT_ADAPTER_LOG_LEVEL: ${MF_MQTT_ADAPTER_LOG_LEVEL}
MF_MQTT_INSTANCE_ID: mqtt-adapter-1
MF_MQTT_ADAPTER_ES_URL: tcp://es-redis:${MF_REDIS_TCP_PORT}
MF_MQTT_ADAPTER_ES_DB: ${MF_MQTT_ADAPTER_ES_DB}
MF_MQTT_ADAPTER_ES_PASS: ${MF_MQTT_ADAPTER_ES_PASS}
MF_NATS_URL: ${MF_NATS_URL}
MF_THINGS_AUTH_GRPC_URL: http://things:${MF_THINGS_AUTH_GRPC_PORT}
DOCKER_VERNEMQ_PLUGINS__VMQ_PASSWD: "off"
DOCKER_VERNEMQ_PLUGINS__VMQ_ACL: "off"
DOCKER_VERNEMQ_PLUGINS__MFX_AUTH: "on"
DOCKER_VERNEMQ_PLUGINS__MFX_AUTH__PATH: /mainflux/_build/default
DOCKER_VERNEMQ_LOG__CONSOLE__LEVEL: debug
MF_MQTT_VERNEMQ_GRPC_POOL_SIZE: 1000
ports:
- 18831:${MF_MQTT_ADAPTER_PORT}
- 8881:${MF_MQTT_ADAPTER_WS_PORT}
- 7777:8888
networks:
- mainflux-base-net
-3
View File
@@ -1,3 +0,0 @@
.git
node_modules/
test
-1
View File
@@ -1 +0,0 @@
node_modules/
-18
View File
@@ -1,18 +0,0 @@
# Copyright (c) Mainflux
# SPDX-License-Identifier: Apache-2.0
FROM node:10.15.1-alpine as builder
## Install build toolchain, install node deps and compile native add-ons
RUN apk add --no-cache python make g++
COPY mqtt/aedes/package.json .
RUN npm rebuild && npm install --only=production
FROM node:10.15.1-alpine
## Copy built node modules and binaries without including the toolchain
COPY --from=builder node_modules/ ./node_modules
COPY *.proto mqtt/* ./
EXPOSE 1883 8880
CMD ["node", "mqtt.js"]
-13
View File
@@ -1,13 +0,0 @@
FROM arm32v7/node:10.16.0-stretch-slim
COPY qemu-arm-static /usr/bin
COPY *.proto mqtt/* ./
RUN npm rebuild && npm install
EXPOSE 1883 8880
CMD ["node", "mqtt.js"]
RUN rm /usr/bin/qemu-arm-static
-159
View File
@@ -1,159 +0,0 @@
# MQTT adapter
MQTT adapter provides an MQTT API for sending and receiving messages through the
platform.
## Configuration
The service is configured using the environment variables presented in the
following table. Note that any unset variables will be replaced with their
default values.
| Variable | Description | Default |
|-----------------------------|-------------------------------------------------------|-----------------------|
| MF_MQTT_ADAPTER_LOG_LEVEL | MQTT adapter log level | error |
| MF_MQTT_INSTANCE_ID | ID of MQTT adapter instance | |
| MF_MQTT_ADAPTER_PORT | Service MQTT port | 1883 |
| MF_MQTT_ADAPTER_WS_PORT | WebSocket port | 8880 |
| MF_NATS_URL | NATS instance URL | nats://localhost:4222 |
| MF_MQTT_ADAPTER_REDIS_PORT | Redis port | 6379 |
| MF_MQTT_ADAPTER_REDIS_HOST | Redis host | localhost |
| MF_MQTT_ADAPTER_REDIS_PASS | Redis pass | mqtt |
| MF_MQTT_ADAPTER_REDIS_DB | Redis db | 0 |
| MF_MQTT_ADAPTER_MESSAGE_TTL | MQTT message TTL in seconds in Redis | 60 |
| MF_MQTT_ADAPTER_ES_PORT | Event stream port | 6379 |
| MF_MQTT_ADAPTER_ES_HOST | Event stream host | localhost |
| MF_MQTT_ADAPTER_ES_PASS | Event stream pass | mqtt |
| MF_MQTT_ADAPTER_ES_DB | Event stream db | 0 |
| MF_MQTT_CONCURRENT_MESSAGES | Number of messages that can be concurrently exchanged | 100 |
| MF_THINGS_URL | Things service URL | localhost:8181 |
| MF_MQTT_ADAPTER_CLIENT_TLS | Flag that indicates if TLS should be turned on | false |
| MF_MQTT_ADAPTER_CA_CERTS | Path to trusted CAs in PEM format | |
## Deployment
The service is distributed as Docker container. The following snippet provides
a compose file template that can be used to deploy the service container locally:
```yaml
version: "2"
services:
mqtt:
image: mainflux/mqtt:[version]
container_name: [instance name]
ports:
- [host machine port]:[configured port]
environment:
MF_THINGS_URL: [Things service URL]
MF_NATS_URL: [NATS instance URL]
MF_MQTT_ADAPTER_LOG_LEVEL: [MQTT adapter log level]
MF_MQTT_INSTANCE_ID: [ID of MQTT adapter instance]
MF_MQTT_ADAPTER_PORT: [Service MQTT port]
MF_MQTT_ADAPTER_WS_PORT: [Service WS port]
MF_MQTT_ADAPTER_REDIS_PORT: [Redis port]
MF_MQTT_ADAPTER_REDIS_HOST: [Redis host]
MF_MQTT_ADAPTER_REDIS_PASS: [Redis pass]
MF_MQTT_ADAPTER_REDIS_DB: [Redis db]
MF_MQTT_ADAPTER_MESSAGE_TTL: [MQTT message TTL in seconds in Redis]
MF_MQTT_ADAPTER_ES_PORT: [Event stream port]
MF_MQTT_ADAPTER_ES_HOST: [Event stream host]
MF_MQTT_ADAPTER_ES_PASS: [Event stream pass]
MF_MQTT_ADAPTER_ES_DB: [Event stream db]
MF_MQTT_CONCURRENT_MESSAGES: [Number of messages that can be concurrently exchanged]
MF_MQTT_ADAPTER_CLIENT_TLS: [Flag that indicates if TLS should be turned on]
MF_MQTT_ADAPTER_CA_CERTS: [Path to trusted CAs in PEM format]
```
To start the service outside of the container, execute the following shell script:
```bash
# download the latest version of the service
git clone https://github.com/mainflux/mainflux
cd mainflux/mqtt
# install dependencies
npm install
# set the environment variables and run the service
MF_THINGS_URL=[Things service URL] MF_NATS_URL=[NATS instance URL] MF_MQTT_ADAPTER_LOG_LEVEL=[MQTT adapter log level] MF_MQTT_INSTANCE_ID=[ID of MQTT adapter instance] MF_MQTT_ADAPTER_PORT=[Service MQTT port] MF_MQTT_ADAPTER_WS_PORT=[Service WS port] MF_MQTT_ADAPTER_REDIS_PORT=[Redis port] MF_MQTT_ADAPTER_REDIS_HOST=[Redis host] MF_MQTT_ADAPTER_REDIS_PASS=[Redis pass] MF_MQTT_ADAPTER_REDIS_DB=[Redis db] MF_MQTT_ADAPTER_MESSAGE_TTL=[MQTT message TTL in seconds in Redis] MF_MQTT_ADAPTER_ES_PORT=[Event stream port] MF_MQTT_ADAPTER_ES_HOST=[Event stream host] MF_MQTT_ADAPTER_ES_PASS=[Event stream pass] MF_MQTT_ADAPTER_ES_DB=[Event stream db] MF_MQTT_CONCURRENT_MESSAGES=[Number of messages that can be concurrently exchanged] MF_MQTT_ADAPTER_CLIENT_TLS=[Flag that indicates if TLS should be turned on] MF_MQTT_ADAPTER_CA_CERTS=[Path to trusted CAs in PEM format] node mqtt.js ..
```
## Usage
To use MQTT adapter you should use `channels/<channel_id>/messages`. Client key should
be passed as user's password. If you want to use MQTT over WebSocket, you could use
[Paho client](https://www.eclipse.org/paho/):
```
<!DOCTYPE html>
<html>
<head>
<meta http-equiv="Content-Type" content="text/html;charset=utf-8"/>
<script src="https://cdnjs.cloudflare.com/ajax/libs/paho-mqtt/1.0.2/mqttws31.min.js" type="text/javascript"></script>
<script type="text/javascript">
var wsbroker = "localhost"; //mqtt websocket enabled broker
var wsport = 443 // port for above
var client = new Paho.MQTT.Client(wsbroker, Number(wsport), '');
client.onConnectionLost = function (responseObject) {
console.log("connection lost: " + responseObject.errorMessage);
};
client.onMessageArrived = function (message) {
console.log("Msg", message.destinationName, ' -- ', message.payloadString);
};
var options = {
timeout: 3,
userName: "<thing_id>", // Replace <:your_thing_id>
password: "<thing_key>", // Replace <:your_thing_key>
useSSL: true,
onSuccess: function () {
console.log("mqtt connected");
// Connection succeeded; subscribe to our topic, you can add multile lines of these
client.subscribe('channels/<channel_id>/messages', {qos: 1}); // Replace <:your_channel_id>
// use the below if you want to publish to a topic on connect
var payload = [
{
"bn":"e35b157f-21b8-4adb-ab59-9df21461c815",
"bt":1.276020076001e+09,
"bu":"A",
"bver":5,
"n":"voltage",
"u":"V",
"v":120.1
},
{
"n":"current",
"t":-5,
"v":1.2
},
{
"n":"current",
"t":-4,
"v":1.3
}
];
var message = new Paho.MQTT.Message(JSON.stringify(payload));
message.destinationName = "channels/<channel_id>/messages"; // Replace <:your_channel_id>
client.send(message);
},
onFailure: function (message) {
console.log("Connection failed: " + message.errorMessage);
}
};
function init() {
client.connect(options);
}
</script>
</head>
<body onload="init();">
</body>
</html>
```
-310
View File
@@ -1,310 +0,0 @@
// Copyright (c) Mainflux
// SPDX-License-Identifier: Apache-2.0
'use strict';
const version = '0.9.0';
var http = require('http'),
redis = require('redis'),
net = require('net'),
protobuf = require('protobufjs'),
websocket = require('websocket-stream'),
grpc = require('grpc'),
protoLoader = require('@grpc/proto-loader'),
fs = require('fs'),
bunyan = require('bunyan'),
logging = require('aedes-logging');
// pass a proto file as a buffer/string or pass a parsed protobuf-schema object
var config = {
log_level: process.env.MF_MQTT_ADAPTER_LOG_LEVEL || 'error',
instance_id: process.env.MF_MQTT_INSTANCE_ID || '',
event_stream: 'mainflux.mqtt',
mqtt_port: Number(process.env.MF_MQTT_ADAPTER_PORT) || 1883,
ws_port: Number(process.env.MF_MQTT_ADAPTER_WS_PORT) || 8880,
nats_url: process.env.MF_NATS_URL || 'nats://localhost:4222',
redis_port: Number(process.env.MF_MQTT_ADAPTER_REDIS_PORT) || 6379,
redis_host: process.env.MF_MQTT_ADAPTER_REDIS_HOST || 'localhost',
redis_pass: process.env.MF_MQTT_ADAPTER_REDIS_PASS || 'mqtt',
redis_db: Number(process.env.MF_MQTT_ADAPTER_REDIS_DB) || 0,
message_ttl: Number(process.env.MF_MQTT_ADAPTER_MESSAGE_TTL) || 60, // in seconds
es_port: Number(process.env.MF_MQTT_ADAPTER_ES_PORT) || 6379,
es_host: process.env.MF_MQTT_ADAPTER_ES_HOST || 'localhost',
es_pass: process.env.MF_MQTT_ADAPTER_ES_PASS || 'mqtt',
es_db: Number(process.env.MF_MQTT_ADAPTER_ES_DB) || 0,
client_tls: (process.env.MF_MQTT_ADAPTER_CLIENT_TLS == 'true') || false,
ca_certs: process.env.MF_MQTT_ADAPTER_CA_CERTS || '',
concurrency: Number(process.env.MF_MQTT_CONCURRENT_MESSAGES) || 100,
auth_url: process.env.MF_THINGS_URL || 'localhost:8181',
schema_dir: process.argv[2] || '.',
},
logger = bunyan.createLogger({
name: 'mqtt',
level: config.log_level
}),
packageDefinition = protoLoader.loadSync(
config.schema_dir + '/authn.proto', {
keepCase: true,
longs: String,
enums: String,
defaults: true,
oneofs: true
}
),
protoDescriptor = grpc.loadPackageDefinition(packageDefinition),
thingsSchema = protoDescriptor.mainflux,
messagesSchema = new protobuf.Root().loadSync(config.schema_dir + '/message.proto'),
Message = messagesSchema.lookupType('mainflux.Message'),
nats = require('nats').connect({
servers: [config.nats_url],
preserveBuffers: true,
}),
aedesRedis = require('aedes-persistence-redis')({
port: config.redis_port,
host: config.redis_host,
password: config.redis_pass,
db: config.redis_db,
packetTTL: function (packet) {
return config.message_ttl; // in seconds
}
}),
mqRedis = require('mqemitter-redis')({
port: config.redis_port,
host: config.redis_host,
password: config.redis_pass,
db: config.redis_db
}),
aedes = require('aedes')({
id: config.instance_id ? config.instance_id : undefined,
mq: mqRedis,
persistence: aedesRedis,
concurrency: config.concurrency
}),
things = (function () {
var certs;
if (config.client_tls) {
certs = grpc.credentials.createSsl(config.ca_certs);
} else {
certs = grpc.credentials.createInsecure();
}
return new thingsSchema.ThingsService(config.auth_url, certs);
})(),
esclient = redis.createClient({
port: config.es_port,
host: config.es_host,
password: config.es_pass,
db: config.es_db
}),
servers = [
startMqtt(),
startWs()
];
logging({
instance: aedes,
servers: servers,
pinoOptions: {
level: 30
}
});
logger.level(config.log_level);
esclient.on('error', function (err) {
logger.warn('error on redis connection: %s', err.message);
});
// MQTT over WebSocket
function startWs() {
var server = http.createServer();
server.on('request', (req, res) => {
res.setHeader('Content-Type', 'application/json');
if (req.url === '/version') {
res.statusCode = 200;
res.end(`{"service":"mqtt-adapter","version":"${version}"}`);
}
res.statusCode = 404;
res.end('{"service":"mqtt-adpater", "message": "not found"}')
});
websocket.createServer({
server: server
}, aedes.handle);
server.listen(config.ws_port);
return server;
}
function startMqtt() {
return net.createServer(aedes.handle).listen(config.mqtt_port);
}
nats.subscribe('channel.>', {
'queue': 'mqtts'
}, function (msg) {
var m = Message.decode(msg),
packet, subtopic, ct;
if (m && m.protocol !== 'mqtt') {
subtopic = m.subtopic !== '' ? '/' + m.subtopic.replace(/\./g, '/') : '';
ct = (m.contentType) ? ('/ct/' + m.contentType.replace('/', '_').replace('+', '-')) : '';
packet = {
cmd: 'publish',
qos: 2,
topic: 'channels/' + m.channel + '/messages' + subtopic + ct,
payload: m.payload,
retain: false
};
aedes.publish(packet);
}
});
function parseTopic(topic) {
// Topics are in the form `channels/<channel_id>/messages`
// Subtopic's are in the form `channels/<channel_id>/messages/<subtopic>`
return /^channels\/(.+?)\/messages\/?.*$/.exec(topic);
}
aedes.authorizePublish = function (client, packet, publish) {
var channel = parseTopic(packet.topic);
if (!channel) {
var err = new Error('unknown topic');
logger.warn(err);
publish(err); // Bad username or password
return;
}
var channelId = channel[1],
accessReq = {
token: client.password,
chanID: channelId
},
// Parse unlimited subtopics
baseLength = 3, // First 3 elements which represents the base part of topic.
isEmpty = function (value) {
return value !== '';
},
parts = packet.topic.split('/'),
elements = parts.slice(baseLength).join('.').split('.').filter(isEmpty),
baseTopic = 'channel.' + channelId;
// Remove empty elements
for (var i = 0; i < elements.length; i++) {
if (elements[i].length > 1 && (elements[i].includes('*') || elements[i].includes('>'))) {
var err = new Error('invalid subtopic');
logger.warn(err);
publish(err);
return;
}
}
var contentType = '',
st = elements;
if (elements.length > 1 && elements[elements.length - 2] === 'ct') {
// If there is ct prefix, read and decode content type.
contentType = elements[elements.length - 1].replace('_', '/').replace('-', '+');
st = elements.slice(0, elements.length - 2);
}
var channelTopic = st.length ? baseTopic + '.' + st.join('.') : baseTopic,
onAuthorize = function (err, res) {
var msg;
if (!err) {
msg = Message.encode({
publisher: client.thingId,
channel: channelId,
subtopic: st.join('.'),
contentType: contentType,
protocol: 'mqtt',
payload: packet.payload
}).finish();
nats.publish(channelTopic, msg);
publish(null);
} else {
logger.warn('unauthorized publish: %s', err.message);
publish(err); // Bad username or password
}
};
things.CanAccessByKey(accessReq, onAuthorize);
};
aedes.authorizeSubscribe = function (client, packet, subscribe) {
var channel = parseTopic(packet.topic);
if (!channel) {
logger.warn('unknown topic');
var err = new Error('unknown topic')
subscribe(err, null); // Bad username or password
return;
}
var channelId = channel[1],
accessReq = {
token: client.password,
chanID: channelId
},
onAuthorize = function (err, res) {
if (!err) {
subscribe(null, packet);
} else {
logger.warn('unauthorized subscribe: %s', err.message);
subscribe(err, null); // Bad username or password
}
};
things.CanAccessByKey(accessReq, onAuthorize);
};
aedes.authenticate = function (client, username, password, acknowledge) {
var pass = (password || '').toString(),
identity = {
value: pass
},
onIdentify = function (err, res) {
if (!err) {
client.thingId = res.value.toString() || '';
client.id = client.id || client.thingId;
client.password = pass;
acknowledge(null, true);
publishConnEvent(client.thingId, 'connect');
} else {
logger.warn('failed to authenticate client with key %s', pass);
err.responseCode = 4;
acknowledge(err, false);
}
};
things.identify(identity, onIdentify);
};
aedes.on('clientDisconnect', function (client) {
logger.info('disconnect client %s', client.id);
client.password = null;
publishConnEvent(client.thingId, 'disconnect');
});
aedes.on('clientError', function (client, err) {
logger.warn('client error: client: %s, error: %s', client.id, err.message);
});
aedes.on('connectionError', function (client, err) {
logger.warn('connection error: client: %s, error: %s', client.id, err.message);
});
aedes.on('error', function (err) {
logger.warn('aedes error: %s', err.message);
});
function publishConnEvent(id, type) {
var onPublish = function (err) {
if (err) {
logger.warn('event publish failed: %s', err);
}
};
esclient.xadd(config.event_stream, '*',
'thing_id', id,
'timestamp', Math.round((new Date()).getTime() / 1000),
'event_type', type,
'instance', config.instance_id,
onPublish);
}
-3603
View File
File diff suppressed because it is too large Load Diff
-43
View File
@@ -1,43 +0,0 @@
{
"name": "mqtt-adapter",
"description": "Mainflux is an open source MIT licensed IoT cloud written in NodeJS",
"main": "mqtt.js",
"repository": {
"type": "git",
"url": "https://github.com/Mainflux/mqtt-adapter"
},
"license": "Apache-2.0",
"scripts": {
"start": "node mqtt.js",
"test": "node_modules/.bin/mocha",
"lint": "eslint mqtt.js"
},
"dependencies": {
"2": "^1.0.2",
"@grpc/proto-loader": "^0.5.0",
"aedes": "^0.39.0",
"aedes-logging": "^2.0.1",
"aedes-persistence-redis": "^6.0.0",
"ajv": "^5.5.2",
"atob": "^2.0.3",
"bunyan": "^1.5.1",
"grpc": "^1.20.3",
"lodash": "^4.17.10",
"mqemitter-redis": "^3.0.0",
"nats": "^1.2.10",
"protobufjs": "^6.8.8",
"redis": "^2.8.0",
"request": "^2.81.0",
"toml": "^2.3.0",
"websocket-stream": "^5.1.2"
},
"devDependencies": {
"chai": "^3.4.0",
"eslint": "^4.7.2",
"eslint-config-airbnb-base": "^12.0.1",
"eslint-plugin-import": "^2.7.0",
"jshint-stylish": "^2.0.1",
"mocha": "^5.2.0",
"supertest": "^3.1.0"
}
}
-7
View File
@@ -1,7 +0,0 @@
# Copyright (c) Mainflux
# SPDX-License-Identifier: Apache-2.0
_build
gpb
grpc
grpc_client
-13
View File
@@ -1,13 +0,0 @@
# Copyright (c) Mainflux
# SPDX-License-Identifier: Apache-2.0
FROM erlang:21-alpine AS builder
WORKDIR /mainflux
COPY ./mqtt/verne .
RUN apk add --no-cache git && \
./rebar3 compile
FROM vernemq/vernemq:1.9.2-alpine
WORKDIR /mainflux
COPY --from=builder --chown=10000:10000 /mainflux .
-114
View File
@@ -1,114 +0,0 @@
# MQTT adapter
MQTT adapter provides an MQTT API for sending and receiving messages through the
platform.
## Configuration
The service is configured using the environment variables presented in the
following table. Note that any unset variables will be replaced with their
default values.
| Variable | Description | Default |
|---------------------------------|-------------------------------------|-----------------------|
| MF_NATS_URL | NATS instance URL | nats://localhost:4222 |
| MF_THINGS_AUTH_GRPC_URL | Things service gRPC URL for Auth | tcp://localhost:8183 |
| MF_MQTT_ADAPTER_ES_URL | Redis ES URL | http://localhost:6379 |
| MF_MQTT_VERNEMQ_GRPC_POOL_SIZE | Number of processes to do gRPC comm | 10 |
Apart from this, VerneMQ configuration found
[here](https://github.com/ThingMesh/docker-vernemq/blob/master/vernemq.conf.default) can be customized.
When run in the Docker,
```yaml
DOCKER_VERNEMQ_PLUGINS__VMQ_PASSWD: "off"
DOCKER_VERNEMQ_PLUGINS__VMQ_ACL: "off"
DOCKER_VERNEMQ_PLUGINS__MFX_AUTH: "on"
DOCKER_VERNEMQ_PLUGINS__MFX_AUTH__PATH: /mainflux/_build/default
DOCKER_VERNEMQ_LISTENER__WS__DEFAULT: "127.0.0.1:8880"
```
> N.B. in this Docker env var setup, `__` replaces `.` in the config file,
> so `plugins.mfx_auth.path` becomes `DOCKER_VERNEMQ_PLUGINS__MFX_AUTH__PATH`
## Deployment
### Docker
The service is distributed as Docker container. The following snippet provides
a compose file template that can be used to deploy the service container locally:
```yaml
version: "2"
services:
mqtt-adapter:
image: mainflux/mqtt:latest
container_name: mainflux-mqtt
depends_on:
- things
- nats
- mqtt-redis
restart: on-failure
environment:
MF_MQTT_ADAPTER_LOG_LEVEL: ${MF_MQTT_ADAPTER_LOG_LEVEL}
MF_MQTT_INSTANCE_ID: mqtt-adapter
MF_MQTT_ADAPTER_PORT: ${MF_MQTT_ADAPTER_PORT}
MF_MQTT_ADAPTER_WS_PORT: ${MF_MQTT_ADAPTER_WS_PORT}
MF_MQTT_ADAPTER_REDIS_URL: tcp://mqtt-redis:${MF_REDIS_TCP_PORT}
MF_MQTT_ADAPTER_ES_URL: tcp://es-redis:${MF_REDIS_TCP_PORT}
MF_NATS_URL: ${MF_NATS_URL}
MF_THINGS_AUTH_HTTP_URL: http://things:${MF_THINGS_AUTH_HTTP_PORT}
DOCKER_VERNEMQ_PLUGINS__VMQ_PASSWD: "off"
DOCKER_VERNEMQ_PLUGINS__VMQ_ACL: "off"
DOCKER_VERNEMQ_PLUGINS__MFX_AUTH: "on"
DOCKER_VERNEMQ_PLUGINS__MFX_AUTH__PATH: /mainflux/_build/default
DOCKER_VERNEMQ_LISTENER__WS__DEFAULT: "127.0.0.1:8880"
ports:
- ${MF_MQTT_ADAPTER_PORT}:${MF_MQTT_ADAPTER_PORT}
- ${MF_MQTT_ADAPTER_WS_PORT}:${MF_MQTT_ADAPTER_WS_PORT}
networks:
- mainflux-base-net
```
### Native
#### Generate protobuf files
```
mkdir -p src/protos
rebar3 grpc gen
```
#### Compile
```
./rebar3 compile
```
#### Load Plugin
First start VerneMQ broker:
```
cd $VERNEMQ_BROKER_PATH
./_build/default/rel/vernemq/bin/vernemq start
```
Remove other plugins:
```
cd $VERNEMQ_BROKER_PATH
./_build/default/rel/vernemq/bin/vmq-admin plugin disable -n vmq_passwd
./_build/default/rel/vernemq/bin/vmq-admin plugin disable -n vmq_acl
```
Enable Mainflux `mfx_auth` plugin:
```
cd $VERNEMQ_BROKER_PATH
./_build/default/rel/vernemq/bin/vmq-admin plugin enable -n mfx_auth -p <path_to_mfx_auth_plugin>/_build/default
```
## Debugging
Inspect logs:
```
cd $VERNEMQ_BROKER_PATH
cat _build/default/rel/vernemq/log/console.log
cat _build/default/rel/vernemq/log/error.log
```
-15
View File
@@ -1,15 +0,0 @@
%%-*- mode: erlang -*-
{deps, [
{vernemq_dev, {git, "git://github.com/erlio/vernemq_dev.git", {branch, "master"}}},
{teacup_nats, "0.4.1"},
{gpb, "4.10.6"},
{grpcbox, "0.11.0"},
{poolboy, "1.5.2"},
{eredis, "1.2.0"}
]}.
{grpc, [{protos, "../.."},
{out_dir, "src/protos"},
{gpb_opts, [{module_name_suffix, "_pb"}]}]}.
{plugins, [grpcbox_plugin]}.
BIN
View File
Binary file not shown.
-30
View File
@@ -1,30 +0,0 @@
{application, mfx_auth,
[
{description, "Mainflux auth plugin for VerneMQ"},
{vsn, "0.0.1"},
{registered, []},
{applications, [
kernel,
stdlib,
teacup,
gpb,
grpcbox,
eredis,
poolboy
]},
{mod, { mfx_auth_app, []}},
{env, [
%% this tells VerneMQ to look in the file 'mfx_auth'
%% for the plugin hook functions. The format is:
%% {ModuleName, HookName, Arity, Opts}
{vmq_plugin_hooks,
[
{mfx_auth, auth_on_register, 5, []},
{mfx_auth, auth_on_publish, 6, []},
{mfx_auth, auth_on_subscribe, 3, []},
{mfx_auth, on_register, 3, []},
{mfx_auth, on_client_offline, 1, []},
{mfx_auth, on_client_gone, 1, []}
]}
]}
]}.
-192
View File
@@ -1,192 +0,0 @@
-module(mfx_auth).
-behaviour(auth_on_register_hook).
-behaviour(auth_on_subscribe_hook).
-behaviour(auth_on_publish_hook).
-behaviour(on_register_hook).
-behaviour(on_client_offline_hook).
-behaviour(on_client_gone_hook).
-export([auth_on_register/5,
auth_on_publish/6,
auth_on_subscribe/3,
on_register/3,
on_client_offline/1,
on_client_gone/1
]).
%% This file demonstrates the hooks you typically want to use
%% if your plugin deals with Authentication or Authorization.
%%
%% All it does is:
%% - authenticate every user and write the log
%% - authorize every PUBLISH and SUBSCRIBE and write it to the log
%%
%% You don't need to implement all of these hooks, just the one
%% needed for your use case.
%%
%% IMPORTANT:
%% these hook functions run in the session context
identify(undefined) ->
error_logger:info_msg("identify undefined", []),
{error, undefined};
identify(Password) ->
error_logger:info_msg("identify: ~p", [Password]),
Token = #{value => binary_to_list(Password)},
Worker = poolboy:checkout(grpc_pool),
Result = gen_server:call(Worker, {identify, Token}),
poolboy:checkin(grpc_pool, Worker),
Result.
access(UserName, ChannelId) ->
error_logger:info_msg("access: ~p ~p", [UserName, ChannelId]),
AccessByIdReq = #{thingID => binary_to_list(UserName), chanID => binary_to_list(ChannelId)},
Worker = poolboy:checkout(grpc_pool),
Result = gen_server:call(Worker, {can_access_by_id, AccessByIdReq}),
poolboy:checkin(grpc_pool, Worker),
Result.
auth_on_register({_IpAddr, _Port} = Peer, {_MountPoint, _ClientId} = SubscriberId, UserName, Password, CleanSession) ->
error_logger:info_msg("auth_on_register: ~p ~p ~p ~p ~p", [Peer, SubscriberId, UserName, Password, CleanSession]),
%% do whatever you like with the params, all that matters
%% is the return value of this function
%%
%% 1. return 'ok' -> CONNECT is authenticated
%% 2. return 'next' -> leave it to other plugins to decide
%% 3. return {ok, [{ModifierKey, NewVal}...]} -> CONNECT is authenticated,
%% but we might want to set some options used throughout the client session:
%% - {mountpoint, NewMountPoint::string}
%% - {clean_session, NewCleanSession::boolean}
%% 4. return {error, invalid_credentials} -> CONNACK_CREDENTIALS is sent
%% 5. return {error, whatever} -> CONNACK_AUTH is sent
case identify(Password) of
{ok, Id} ->
case Id of
UserName ->
ok;
_ ->
error_logger:info_msg("auth_on_register failed - Id: ~p, Username: ~p", [Id, UserName]),
{error, invalid_credentials}
end;
Other ->
Other
end.
% Erlang binary_join/2 can be found here: https://blog.kempkens.io/posts/joining-a-list-of-binaries-in-erlang/
-spec binary_join([binary()], binary()) -> binary().
binary_join([], _Sep) ->
<<>>;
binary_join([Part], _Sep) ->
Part;
binary_join([Head|Tail], Sep) ->
lists:foldl(fun (Value, Acc) -> <<Acc/binary, Sep/binary, Value/binary>> end, Head, Tail).
parseTopic(Topic) when length(Topic) == 3 ->
ChannelId = lists:nth(2, Topic),
NatsSubject = [<<"channel">>, <<".">>, ChannelId],
[{chanel_id, ChannelId}, {content_type, ""}, {subtopic, <<>>}, {nats_subject, NatsSubject}];
parseTopic(Topic) when length(Topic) > 3 ->
ChannelId = lists:nth(2, Topic),
case lists:nth(length(Topic) - 1, Topic) of
<<"ct">> ->
ContentType = lists:last(Topic),
ContentType2 = re:replace(ContentType, "_", "/", [global, {return, list}]),
ContentType3 = re:replace(ContentType2, "-", "\\+", [global, {return, list}]),
% Subtopic is a sublist that starts at 4th element and is of full length
% substracted by 3 elements (<<channels>>, <<ChanId>>, <<messages>>)
% and substracted by 2 (last elements are <<ct>> and <<ContentType>>)
Subtopic = lists:sublist(Topic, 4, length(Topic) - 3 - 2),
Subtopic2 = binary_join(Subtopic, <<".">>),
NatsSubject = [<<"channel">>, <<".">>, ChannelId, <<".">>, Subtopic2],
[{chanel_id, ChannelId}, {content_type, ContentType3}, {subtopic, Subtopic2}, {nats_subject, NatsSubject}];
_ ->
Subtopic = lists:sublist(Topic, 4, length(Topic) - 3),
Subtopic2 = binary_join(Subtopic, <<".">>),
NatsSubject = [<<"channel.">>, ChannelId, <<".">>, Subtopic2],
[{chanel_id, ChannelId}, {content_type, ""}, {subtopic, Subtopic2}, {nats_subject, NatsSubject}]
end.
auth_on_publish(UserName, {_MountPoint, _ClientId} = SubscriberId, QoS, Topic, Payload, IsRetain) ->
error_logger:info_msg("auth_on_publish: ~p ~p ~p ~p ~p ~p", [UserName, SubscriberId, QoS, Topic, Payload, IsRetain]),
%% do whatever you like with the params, all that matters
%% is the return value of this function
%%
%% 1. return 'ok' -> PUBLISH is authorized
%% 2. return 'next' -> leave it to other plugins to decide
%% 3. return {ok, NewPayload::binary} -> PUBLISH is authorized, but we changed the payload
%% 4. return {ok, [{ModifierKey, NewVal}...]} -> PUBLISH is authorized, but we might have changed different Publish Options:
%% - {topic, NewTopic::string}
%% - {payload, NewPayload::binary}
%% - {qos, NewQoS::0..2}
%% - {retain, NewRetainFlag::boolean}
%% 5. return {error, whatever} -> auth chain is stopped, and message is silently dropped (unless it is a Last Will message)
%%
% Topic is list of binaries, ex: [<<"channels">>, <<"1">>, <<"messages">>, <<"subtopic_1">>, ...]
[{chanel_id, ChannelId}, {content_type, ContentType}, {subtopic, Subtopic}, {nats_subject, NatsSubject}] = parseTopic(Topic),
case access(UserName, ChannelId) of
ok ->
Message = #{
channel => ChannelId,
subtopic => Subtopic,
publisher => UserName,
protocol => "mqtt",
contentType => ContentType,
payload => Payload
},
mfx_nats:publish(NatsSubject, message_pb:encode_msg(Message, message)),
ok;
Other ->
error_logger:info_msg("Error auth: ~p", [Other]),
Other
end.
auth_on_subscribe(UserName, ClientId, [{Topic, _QoS}|_] = Topics) ->
error_logger:info_msg("auth_on_subscribe: ~p ~p ~p", [UserName, ClientId, Topics]),
%% do whatever you like with the params, all that matters
%% is the return value of this function
%%
%% 1. return 'ok' -> SUBSCRIBE is authorized
%% 2. return 'next' -> leave it to other plugins to decide
%% 3. return {error, whatever} -> auth chain is stopped, and no SUBACK is sent
[{chanel_id, ChannelId}, _, _, _] = parseTopic(Topic),
access(UserName, ChannelId).
%%% Redis ES
publish_event(UserName, Type) ->
Timestamp = os:system_time(second),
[{_, InstanceId}] = ets:lookup(mfx_cfg, instance_id),
KeyValuePairs = [
"mainflux.mqtt", "*",
"thing_id", binary_to_list(UserName),
"timestamp", integer_to_list(Timestamp),
"event_type", Type,
"instance", InstanceId
],
mfx_redis:publish(KeyValuePairs).
on_register(_Peer, {_Mountpoint, ClientId} = _SubscriberId, UserName) ->
error_logger:info_msg("on_register, UserName: ~p, ClientId: ~p", [UserName, ClientId]),
ets:insert(mfx_client_map, {ClientId, UserName}),
publish_event(UserName, "connect").
publish_erase(ClientId) ->
case ets:lookup(mfx_client_map, ClientId) of
[] ->
error_logger:info_msg("UserName for client ~p not found.", [ClientId]),
error;
[{ClientId, UserName}] ->
ets:delete_object(mfx_client_map, {ClientId, UserName}),
publish_event(UserName, "disconnect")
end.
on_client_offline({_Mountpoint, ClientId} = _SubscriberId) ->
error_logger:info_msg("on_client_offline, ClientId: ~p", [ClientId]),
publish_erase(ClientId).
on_client_gone({_Mountpoint, ClientId} = _SubscriberId) ->
error_logger:info_msg("on_client_gone, ClientId: ~p", [ClientId]),
publish_erase(ClientId).
-43
View File
@@ -1,43 +0,0 @@
-module(mfx_auth_app).
-behaviour(application).
%% Application callbacks
-export([start/2, stop/1]).
%% ===================================================================
%% Application callbacks
%% ===================================================================
start(_StartType, _StartArgs) ->
% Put ENV variables in ETS
ets:new(mfx_cfg, [set, named_table, public]),
NatsUrl = os:getenv("MF_NATS_URL", "nats://localhost:4222"),
GrpcUrl = os:getenv("MF_THINGS_AUTH_GRPC_URL", "tcp://localhost:8183"),
RedisUrl = os:getenv("MF_MQTT_ADAPTER_ES_URL", "tcp://localhost:6379"),
RedisDb = os:getenv("MF_MQTT_ADAPTER_ES_DB", "0"),
RedisPwd = os:getenv("MF_MQTT_ADAPTER_ES_PASS", ""),
InstanceId = os:getenv("MF_MQTT_INSTANCE_ID", ""),
PoolSize = os:getenv("MF_MQTT_VERNEMQ_GRPC_POOL_SIZE", "10"),
ets:insert(mfx_cfg, [
{grpc_url, GrpcUrl},
{nats_url, NatsUrl},
{redis_url, RedisUrl},
{redis_db, list_to_integer(RedisDb)},
{redis_pwd, RedisPwd},
{instance_id, InstanceId}
]),
% Also, init one ETS table for keeping the #{ClientId => Username} mapping
ets:new(mfx_client_map, [set, named_table, public]),
% Start the MFX Auth process
mfx_auth_sup:start_link(list_to_integer(PoolSize)).
stop(_State) ->
ok.
-38
View File
@@ -1,38 +0,0 @@
-module(mfx_auth_sup).
-behaviour(supervisor).
%% API
-export([start_link/1]).
%% Supervisor callbacks
-export([init/1]).
%% Helper macro for declaring children of supervisor
-define(CHILD(I, Type), {I, {I, start_link, []}, permanent, 5000, Type, [I]}).
%% ===================================================================
%% API functions
%% ===================================================================
start_link(PoolSize) ->
supervisor:start_link({local, ?MODULE}, ?MODULE, [PoolSize]).
%% ===================================================================
%% Supervisor callbacks
%% ===================================================================
init([PoolSize]) ->
SizeArgs = [{size, PoolSize}, {max_overflow, PoolSize * 1.5}],
PoolArgs = [{name, {local, grpc_pool}}, {worker_module, mfx_grpc}],
WorkerArgs = [],
PoolSpec = poolboy:child_spec(grpc_pool, PoolArgs ++ SizeArgs, WorkerArgs),
error_logger:info_msg("PoolSpec: ~p", [PoolSpec]),
{ok, { {one_for_one, 5, 10}, [
{mfx_nats, {mfx_nats, start_link, []}, permanent, 2000, worker, [mfx_nats]},
{mfx_redis, {mfx_redis, start_link, []}, permanent, 2000, worker, [mfx_redis]},
PoolSpec
]} }.
-60
View File
@@ -1,60 +0,0 @@
-module(mfx_grpc).
-behaviour(gen_server).
-export([
start_link/0,
start_link/1,
init/1,
handle_call/3,
handle_cast/2,
handle_info/2,
terminate/2
]).
-record(state, {channel}).
init(_Args) ->
error_logger:info_msg("mfx_grpc genserver has started (~w)~n", [self()]),
[{_, GrpcUrl}] = ets:lookup(mfx_cfg, grpc_url),
{ok, {_, _, GrpcHost, GrpcPort, _, _}} = http_uri:parse(GrpcUrl),
error_logger:info_msg("gRPC host: ~p, port: ~p", [GrpcHost, GrpcPort]),
Channel = list_to_atom(pid_to_list(self())),
grpcbox_channel_sup:start_child(Channel, [{http, GrpcHost, GrpcPort, []}], #{}),
{ok, #state{channel = Channel}}.
start_link() ->
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
start_link(Args) ->
gen_server:start_link(?MODULE, Args, []).
handle_call({identify, Message}, _From, #state{channel = Channel} = State) ->
error_logger:info_msg("mfx_grpc message: ~p, channel: ~p", [Message, Channel]),
{ok, Resp, HeadersAndTrailers} = mainflux_things_service_client:identify(Message, #{channel => Channel}),
case maps:get(<<":status">>, maps:get(headers, HeadersAndTrailers)) of
<<"200">> ->
{reply, {ok, maps:get(value, Resp)}, State};
ErrorStatus ->
{reply, {error, ErrorStatus}, State}
end;
handle_call({can_access_by_id, Message}, _From, #state{channel = Channel} = State) ->
error_logger:info_msg("mfx_grpc message: ~p, channel: ~p", [Message, Channel]),
{ok, _, HeadersAndTrailers} = mainflux_things_service_client:can_access_by_id(Message, #{channel => Channel}),
error_logger:info_msg("mfx_grpc can_access_by_id() HeadersAndTrailers: ~p", [HeadersAndTrailers]),
case maps:get(<<":status">>, maps:get(headers, HeadersAndTrailers)) of
<<"200">> ->
{reply, ok, State};
ErrorStatus ->
{reply, {error, ErrorStatus}, State}
end.
handle_cast(_Request, State) ->
{noreply, State}.
handle_info(_Info, State) ->
{noreply, State}.
terminate(Reason, #state{channel = Channel} =State) ->
grpcbox_channel:stop(Channel),
{stop, Reason, State}.
-95
View File
@@ -1,95 +0,0 @@
-module(mfx_nats).
-behaviour(gen_server).
-export([
start_link/0,
init/1,
publish/2,
handle_call/3,
handle_cast/2,
handle_info/2,
terminate/2,
subscribe/1,
loop/1
]).
-record(state, {conn}).
start_link() ->
% Start genserver for PUB
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
init(_Args) ->
error_logger:info_msg("mfx_nats genserver has started (~w)~n", [self()]),
[{_, NatsUrl}] = ets:lookup(mfx_cfg, nats_url),
{ok, {_, _, NatsHost, NatsPort, _, _}} = http_uri:parse(NatsUrl),
{ok, NatsConn} = nats:connect(list_to_binary(NatsHost), NatsPort, #{buffer_size => 10}),
% Spawn SUB process
spawn_link(?MODULE, subscribe, [NatsConn]),
{ok, #state{conn = NatsConn}}.
publish(Subject, Message) ->
error_logger:info_msg("mfx_nats genserver publish ~p ~p", [Subject, Message]),
gen_server:cast(?MODULE, {publish, Subject, Message}).
% Currently unused, but kept to avoid compiler warnings (it expects handle_call/3 in the gen_server)
handle_call(Name, _From, _State) ->
Reply = lists:flatten(io_lib:format("Hello ~s from mfx_nats genserver", [Name])),
{reply, Reply, _State}.
handle_cast({publish, Subject, Message}, #state{conn = NatsConn} = State) ->
error_logger:info_msg("mfx_nats genserver cast ~p ~p ~p", [Subject, NatsConn, Message]),
nats:pub(NatsConn, Subject, #{payload => Message}),
{noreply, State}.
handle_info(_Info, State) ->
{noreply, State}.
terminate(Reason, #state{conn = NatsConn} = State) ->
nats:disconnect(NatsConn),
{stop, Reason, State}.
subscribe(NatsConn) ->
Subject = <<"channel.>">>,
nats:sub(NatsConn, Subject, #{queue_group => <<"mqtts">>}),
loop(NatsConn).
loop(Conn) ->
receive
{Conn, ready} ->
error_logger:info_msg("NATS ready", []),
loop(Conn);
{Conn, {msg, <<"teacup.control">>, _, <<"exit">>}} ->
error_logger:info_msg("NATS received exit msg", []);
{Conn, {msg, Subject, _ReplyTo, NatsMsg}} ->
#{protocol := Protocol, channel := ChannelId, contentType := ContentType,
payload := Payload, subtopic := Subtopic} = message_pb:decode_msg(NatsMsg, message),
error_logger:info_msg("Received NATS protobuf msg with payload: ~p and ContentType: ~p~n", [Payload, ContentType]),
case Protocol of
<<"mqtt">> ->
error_logger:info_msg("Ignoring MQTT message loopback", []),
loop(Conn);
_ ->
error_logger:info_msg("mfx_nats Protocol ~p", [Protocol]),
error_logger:info_msg("Re-publishing on MQTT broker", []),
Subtopic2 = re:split(Subtopic,"\\."),
ContentType2 = re:replace(ContentType, "/", "_", [global, {return, list}]),
ContentType3 = re:replace(ContentType2, "\\+", "-", [global, {return, binary}]),
{_, PublishFun, {_, _}} = vmq_reg:direct_plugin_exports(?MODULE),
% Topic needs to be in the form of the list, like [<<"channel">>,<<"6def78cd-b441-4fd8-8680-af7e3bbea187">>]
Topic = case ContentType of
<<"">> ->
[<<"channels">>, ChannelId, <<"messages">>] ++ Subtopic2;
_ ->
[<<"channels">>, ChannelId, <<"messages">>] ++ Subtopic2 ++ [<<"ct">>, ContentType3]
end,
error_logger:info_msg("Subject: ~p, Topic: ~p, PublishFunction: ~p~n", [Subject, Topic, PublishFun]),
PublishFun(Topic, Payload, #{qos => 0, retain => false}),
loop(Conn)
end;
Other ->
error_logger:info_msg("Received other msg: ~p~n", [Other]),
loop(Conn)
end.
-49
View File
@@ -1,49 +0,0 @@
-module(mfx_redis).
-behaviour(gen_server).
-export([
start_link/0,
init/1,
publish/1,
handle_call/3,
handle_cast/2,
handle_info/2,
terminate/2
]).
-record(state, {conn}).
start_link() ->
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
init(_Args) ->
error_logger:info_msg("mfx_redis genserver has started (~w)~n", [self()]),
[{_, RedisUrl}] = ets:lookup(mfx_cfg, redis_url),
[{_, RedisDb}] = ets:lookup(mfx_cfg, redis_db),
[{_, RedisPwd}] = ets:lookup(mfx_cfg, redis_pwd),
{ok, {_, _, RedisHost, RedisPort, _, _}} = http_uri:parse(RedisUrl),
error_logger:info_msg("mfx_redis host: ~p, port: ~p", [RedisHost, RedisPort]),
{ok, RedisConn} = eredis:start_link(RedisHost, RedisPort, RedisDb, RedisPwd),
{ok, #state{conn = RedisConn}}.
publish(Message) ->
gen_server:cast(?MODULE, {publish, Message}).
% Currently unused, but kept to avoid compiler warnings (it expects handle_call/3 in the gen_server)
handle_call(Name, _From, _State) ->
Reply = lists:flatten(io_lib:format("Hello ~s from mfx_redis genserver", [Name])),
{reply, Reply, _State}.
handle_cast({publish, Message}, #state{conn = RedisConn} = State) ->
error_logger:info_msg("mfx_redis genserver cast ~p ~p", [RedisConn, Message]),
eredis:q(RedisConn, ["XADD" | Message]),
{noreply, State}.
handle_info(_Info, State) ->
{noreply, State}.
terminate(Reason, #state{conn = RedisConn} = State) ->
eredis:stop(RedisConn),
{stop, Reason, State}.
File diff suppressed because it is too large Load Diff
@@ -1,17 +0,0 @@
%%%-------------------------------------------------------------------
%% @doc Behaviour to implement for grpc service mainflux.AuthNService.
%% @end
%%%-------------------------------------------------------------------
%% this module was generated on 2019-12-22T13:46:21+00:00 and should not be modified manually
-module(mainflux_auth_n_service_bhvr).
%% @doc Unary RPC
-callback issue(ctx:ctx(), authn_pb:issue_req()) ->
{ok, authn_pb:token(), ctx:ctx()} | grpcbox_stream:grpc_error_response().
%% @doc Unary RPC
-callback identify(ctx:ctx(), authn_pb:token()) ->
{ok, authn_pb:user_id(), ctx:ctx()} | grpcbox_stream:grpc_error_response().
@@ -1,61 +0,0 @@
%%%-------------------------------------------------------------------
%% @doc Client module for grpc service mainflux.AuthNService.
%% @end
%%%-------------------------------------------------------------------
%% this module was generated on 2019-12-22T13:46:21+00:00 and should not be modified manually
-module(mainflux_auth_n_service_client).
-compile(export_all).
-compile(nowarn_export_all).
-include_lib("grpcbox/include/grpcbox.hrl").
-define(is_ctx(Ctx), is_tuple(Ctx) andalso element(1, Ctx) =:= ctx).
-define(SERVICE, 'mainflux.AuthNService').
-define(PROTO_MODULE, 'authn_pb').
-define(MARSHAL_FUN(T), fun(I) -> ?PROTO_MODULE:encode_msg(I, T) end).
-define(UNMARSHAL_FUN(T), fun(I) -> ?PROTO_MODULE:decode_msg(I, T) end).
-define(DEF(Input, Output, MessageType), #grpcbox_def{service=?SERVICE,
message_type=MessageType,
marshal_fun=?MARSHAL_FUN(Input),
unmarshal_fun=?UNMARSHAL_FUN(Output)}).
%% @doc Unary RPC
-spec issue(authn_pb:issue_req()) ->
{ok, authn_pb:token(), grpcbox:metadata()} | grpcbox_stream:grpc_error_response().
issue(Input) ->
issue(ctx:new(), Input, #{}).
-spec issue(ctx:t() | authn_pb:issue_req(), authn_pb:issue_req() | grpcbox_client:options()) ->
{ok, authn_pb:token(), grpcbox:metadata()} | grpcbox_stream:grpc_error_response().
issue(Ctx, Input) when ?is_ctx(Ctx) ->
issue(Ctx, Input, #{});
issue(Input, Options) ->
issue(ctx:new(), Input, Options).
-spec issue(ctx:t(), authn_pb:issue_req(), grpcbox_client:options()) ->
{ok, authn_pb:token(), grpcbox:metadata()} | grpcbox_stream:grpc_error_response().
issue(Ctx, Input, Options) ->
grpcbox_client:unary(Ctx, <<"/mainflux.AuthNService/Issue">>, Input, ?DEF(issue_req, token, <<"mainflux.IssueReq">>), Options).
%% @doc Unary RPC
-spec identify(authn_pb:token()) ->
{ok, authn_pb:user_id(), grpcbox:metadata()} | grpcbox_stream:grpc_error_response().
identify(Input) ->
identify(ctx:new(), Input, #{}).
-spec identify(ctx:t() | authn_pb:token(), authn_pb:token() | grpcbox_client:options()) ->
{ok, authn_pb:user_id(), grpcbox:metadata()} | grpcbox_stream:grpc_error_response().
identify(Ctx, Input) when ?is_ctx(Ctx) ->
identify(Ctx, Input, #{});
identify(Input, Options) ->
identify(ctx:new(), Input, Options).
-spec identify(ctx:t(), authn_pb:token(), grpcbox_client:options()) ->
{ok, authn_pb:user_id(), grpcbox:metadata()} | grpcbox_stream:grpc_error_response().
identify(Ctx, Input, Options) ->
grpcbox_client:unary(Ctx, <<"/mainflux.AuthNService/Identify">>, Input, ?DEF(token, user_id, <<"mainflux.Token">>), Options).
@@ -1,21 +0,0 @@
%%%-------------------------------------------------------------------
%% @doc Behaviour to implement for grpc service mainflux.ThingsService.
%% @end
%%%-------------------------------------------------------------------
%% this module was generated on 2019-12-22T13:46:21+00:00 and should not be modified manually
-module(mainflux_things_service_bhvr).
%% @doc Unary RPC
-callback can_access_by_key(ctx:ctx(), authn_pb:access_by_key_req()) ->
{ok, authn_pb:thing_id(), ctx:ctx()} | grpcbox_stream:grpc_error_response().
%% @doc Unary RPC
-callback can_access_by_id(ctx:ctx(), authn_pb:access_by_id_req()) ->
{ok, authn_pb:empty(), ctx:ctx()} | grpcbox_stream:grpc_error_response().
%% @doc Unary RPC
-callback identify(ctx:ctx(), authn_pb:token()) ->
{ok, authn_pb:thing_id(), ctx:ctx()} | grpcbox_stream:grpc_error_response().
@@ -1,79 +0,0 @@
%%%-------------------------------------------------------------------
%% @doc Client module for grpc service mainflux.ThingsService.
%% @end
%%%-------------------------------------------------------------------
%% this module was generated on 2019-12-22T13:46:21+00:00 and should not be modified manually
-module(mainflux_things_service_client).
-compile(export_all).
-compile(nowarn_export_all).
-include_lib("grpcbox/include/grpcbox.hrl").
-define(is_ctx(Ctx), is_tuple(Ctx) andalso element(1, Ctx) =:= ctx).
-define(SERVICE, 'mainflux.ThingsService').
-define(PROTO_MODULE, 'authn_pb').
-define(MARSHAL_FUN(T), fun(I) -> ?PROTO_MODULE:encode_msg(I, T) end).
-define(UNMARSHAL_FUN(T), fun(I) -> ?PROTO_MODULE:decode_msg(I, T) end).
-define(DEF(Input, Output, MessageType), #grpcbox_def{service=?SERVICE,
message_type=MessageType,
marshal_fun=?MARSHAL_FUN(Input),
unmarshal_fun=?UNMARSHAL_FUN(Output)}).
%% @doc Unary RPC
-spec can_access_by_key(authn_pb:access_by_key_req()) ->
{ok, authn_pb:thing_id(), grpcbox:metadata()} | grpcbox_stream:grpc_error_response().
can_access_by_key(Input) ->
can_access_by_key(ctx:new(), Input, #{}).
-spec can_access_by_key(ctx:t() | authn_pb:access_by_key_req(), authn_pb:access_by_key_req() | grpcbox_client:options()) ->
{ok, authn_pb:thing_id(), grpcbox:metadata()} | grpcbox_stream:grpc_error_response().
can_access_by_key(Ctx, Input) when ?is_ctx(Ctx) ->
can_access_by_key(Ctx, Input, #{});
can_access_by_key(Input, Options) ->
can_access_by_key(ctx:new(), Input, Options).
-spec can_access_by_key(ctx:t(), authn_pb:access_by_key_req(), grpcbox_client:options()) ->
{ok, authn_pb:thing_id(), grpcbox:metadata()} | grpcbox_stream:grpc_error_response().
can_access_by_key(Ctx, Input, Options) ->
grpcbox_client:unary(Ctx, <<"/mainflux.ThingsService/CanAccessByKey">>, Input, ?DEF(access_by_key_req, thing_id, <<"mainflux.AccessByKeyReq">>), Options).
%% @doc Unary RPC
-spec can_access_by_id(authn_pb:access_by_id_req()) ->
{ok, authn_pb:empty(), grpcbox:metadata()} | grpcbox_stream:grpc_error_response().
can_access_by_id(Input) ->
can_access_by_id(ctx:new(), Input, #{}).
-spec can_access_by_id(ctx:t() | authn_pb:access_by_id_req(), authn_pb:access_by_id_req() | grpcbox_client:options()) ->
{ok, authn_pb:empty(), grpcbox:metadata()} | grpcbox_stream:grpc_error_response().
can_access_by_id(Ctx, Input) when ?is_ctx(Ctx) ->
can_access_by_id(Ctx, Input, #{});
can_access_by_id(Input, Options) ->
can_access_by_id(ctx:new(), Input, Options).
-spec can_access_by_id(ctx:t(), authn_pb:access_by_id_req(), grpcbox_client:options()) ->
{ok, authn_pb:empty(), grpcbox:metadata()} | grpcbox_stream:grpc_error_response().
can_access_by_id(Ctx, Input, Options) ->
grpcbox_client:unary(Ctx, <<"/mainflux.ThingsService/CanAccessByID">>, Input, ?DEF(access_by_id_req, empty, <<"mainflux.AccessByIDReq">>), Options).
%% @doc Unary RPC
-spec identify(authn_pb:token()) ->
{ok, authn_pb:thing_id(), grpcbox:metadata()} | grpcbox_stream:grpc_error_response().
identify(Input) ->
identify(ctx:new(), Input, #{}).
-spec identify(ctx:t() | authn_pb:token(), authn_pb:token() | grpcbox_client:options()) ->
{ok, authn_pb:thing_id(), grpcbox:metadata()} | grpcbox_stream:grpc_error_response().
identify(Ctx, Input) when ?is_ctx(Ctx) ->
identify(Ctx, Input, #{});
identify(Input, Options) ->
identify(ctx:new(), Input, Options).
-spec identify(ctx:t(), authn_pb:token(), grpcbox_client:options()) ->
{ok, authn_pb:thing_id(), grpcbox:metadata()} | grpcbox_stream:grpc_error_response().
identify(Ctx, Input, Options) ->
grpcbox_client:unary(Ctx, <<"/mainflux.ThingsService/Identify">>, Input, ?DEF(token, thing_id, <<"mainflux.Token">>), Options).
-974
View File
@@ -1,974 +0,0 @@
%% -*- coding: utf-8 -*-
%% @private
%% Automatically generated, do not edit
%% Generated by gpb_compile version 4.7.3
-module(message_pb).
-export([encode_msg/2, encode_msg/3]).
-export([decode_msg/2, decode_msg/3]).
-export([merge_msgs/3, merge_msgs/4]).
-export([verify_msg/2, verify_msg/3]).
-export([get_msg_defs/0]).
-export([get_msg_names/0]).
-export([get_group_names/0]).
-export([get_msg_or_group_names/0]).
-export([get_enum_names/0]).
-export([find_msg_def/1, fetch_msg_def/1]).
-export([find_enum_def/1, fetch_enum_def/1]).
-export([enum_symbol_by_value/2, enum_value_by_symbol/2]).
-export([get_service_names/0]).
-export([get_service_def/1]).
-export([get_rpc_names/1]).
-export([find_rpc_def/2, fetch_rpc_def/2]).
-export([fqbin_to_service_name/1]).
-export([service_name_to_fqbin/1]).
-export([fqbins_to_service_and_rpc_name/2]).
-export([service_and_rpc_name_to_fqbins/2]).
-export([fqbin_to_msg_name/1]).
-export([msg_name_to_fqbin/1]).
-export([fqbin_to_enum_name/1]).
-export([enum_name_to_fqbin/1]).
-export([get_package_name/0]).
-export([uses_packages/0]).
-export([source_basename/0]).
-export([get_all_source_basenames/0]).
-export([get_all_proto_names/0]).
-export([get_msg_containment/1]).
-export([get_pkg_containment/1]).
-export([get_service_containment/1]).
-export([get_rpc_containment/1]).
-export([get_enum_containment/1]).
-export([get_proto_by_msg_name_as_fqbin/1]).
-export([get_proto_by_service_name_as_fqbin/1]).
-export([get_proto_by_enum_name_as_fqbin/1]).
-export([get_protos_by_pkg_name_as_fqbin/1]).
-export([gpb_version_as_string/0, gpb_version_as_list/0]).
%% enumerated types
-export_type([]).
%% message types
-type message() ::
#{channel => iodata(), % = 1
subtopic => iodata(), % = 2
publisher => iodata(), % = 3
protocol => iodata(), % = 4
contentType => iodata(), % = 5
payload => iodata() % = 6
}.
-export_type(['message'/0]).
-spec encode_msg(message(), atom()) -> binary().
encode_msg(Msg, MsgName) when is_atom(MsgName) ->
encode_msg(Msg, MsgName, []).
-spec encode_msg(message(), atom(), list()) -> binary().
encode_msg(Msg, MsgName, Opts) ->
case proplists:get_bool(verify, Opts) of
true -> verify_msg(Msg, MsgName, Opts);
false -> ok
end,
TrUserData = proplists:get_value(user_data, Opts),
case MsgName of
message ->
encode_msg_message(id(Msg, TrUserData), TrUserData)
end.
encode_msg_message(Msg, TrUserData) ->
encode_msg_message(Msg, <<>>, TrUserData).
encode_msg_message(#{} = M, Bin, TrUserData) ->
B1 = case M of
#{channel := F1} ->
begin
TrF1 = id(F1, TrUserData),
case is_empty_string(TrF1) of
true -> Bin;
false ->
e_type_string(TrF1, <<Bin/binary, 10>>, TrUserData)
end
end;
_ -> Bin
end,
B2 = case M of
#{subtopic := F2} ->
begin
TrF2 = id(F2, TrUserData),
case is_empty_string(TrF2) of
true -> B1;
false ->
e_type_string(TrF2, <<B1/binary, 18>>, TrUserData)
end
end;
_ -> B1
end,
B3 = case M of
#{publisher := F3} ->
begin
TrF3 = id(F3, TrUserData),
case is_empty_string(TrF3) of
true -> B2;
false ->
e_type_string(TrF3, <<B2/binary, 26>>, TrUserData)
end
end;
_ -> B2
end,
B4 = case M of
#{protocol := F4} ->
begin
TrF4 = id(F4, TrUserData),
case is_empty_string(TrF4) of
true -> B3;
false ->
e_type_string(TrF4, <<B3/binary, 34>>, TrUserData)
end
end;
_ -> B3
end,
B5 = case M of
#{contentType := F5} ->
begin
TrF5 = id(F5, TrUserData),
case is_empty_string(TrF5) of
true -> B4;
false ->
e_type_string(TrF5, <<B4/binary, 42>>, TrUserData)
end
end;
_ -> B4
end,
case M of
#{payload := F6} ->
begin
TrF6 = id(F6, TrUserData),
case iolist_size(TrF6) of
0 -> B5;
_ -> e_type_bytes(TrF6, <<B5/binary, 50>>, TrUserData)
end
end;
_ -> B5
end.
-compile({nowarn_unused_function,e_type_sint/3}).
e_type_sint(Value, Bin, _TrUserData) when Value >= 0 ->
e_varint(Value * 2, Bin);
e_type_sint(Value, Bin, _TrUserData) ->
e_varint(Value * -2 - 1, Bin).
-compile({nowarn_unused_function,e_type_int32/3}).
e_type_int32(Value, Bin, _TrUserData)
when 0 =< Value, Value =< 127 ->
<<Bin/binary, Value>>;
e_type_int32(Value, Bin, _TrUserData) ->
<<N:64/unsigned-native>> = <<Value:64/signed-native>>,
e_varint(N, Bin).
-compile({nowarn_unused_function,e_type_int64/3}).
e_type_int64(Value, Bin, _TrUserData)
when 0 =< Value, Value =< 127 ->
<<Bin/binary, Value>>;
e_type_int64(Value, Bin, _TrUserData) ->
<<N:64/unsigned-native>> = <<Value:64/signed-native>>,
e_varint(N, Bin).
-compile({nowarn_unused_function,e_type_bool/3}).
e_type_bool(true, Bin, _TrUserData) ->
<<Bin/binary, 1>>;
e_type_bool(false, Bin, _TrUserData) ->
<<Bin/binary, 0>>;
e_type_bool(1, Bin, _TrUserData) -> <<Bin/binary, 1>>;
e_type_bool(0, Bin, _TrUserData) -> <<Bin/binary, 0>>.
-compile({nowarn_unused_function,e_type_string/3}).
e_type_string(S, Bin, _TrUserData) ->
Utf8 = unicode:characters_to_binary(S),
Bin2 = e_varint(byte_size(Utf8), Bin),
<<Bin2/binary, Utf8/binary>>.
-compile({nowarn_unused_function,e_type_bytes/3}).
e_type_bytes(Bytes, Bin, _TrUserData)
when is_binary(Bytes) ->
Bin2 = e_varint(byte_size(Bytes), Bin),
<<Bin2/binary, Bytes/binary>>;
e_type_bytes(Bytes, Bin, _TrUserData)
when is_list(Bytes) ->
BytesBin = iolist_to_binary(Bytes),
Bin2 = e_varint(byte_size(BytesBin), Bin),
<<Bin2/binary, BytesBin/binary>>.
-compile({nowarn_unused_function,e_type_fixed32/3}).
e_type_fixed32(Value, Bin, _TrUserData) ->
<<Bin/binary, Value:32/little>>.
-compile({nowarn_unused_function,e_type_sfixed32/3}).
e_type_sfixed32(Value, Bin, _TrUserData) ->
<<Bin/binary, Value:32/little-signed>>.
-compile({nowarn_unused_function,e_type_fixed64/3}).
e_type_fixed64(Value, Bin, _TrUserData) ->
<<Bin/binary, Value:64/little>>.
-compile({nowarn_unused_function,e_type_sfixed64/3}).
e_type_sfixed64(Value, Bin, _TrUserData) ->
<<Bin/binary, Value:64/little-signed>>.
-compile({nowarn_unused_function,e_type_float/3}).
e_type_float(V, Bin, _) when is_number(V) ->
<<Bin/binary, V:32/little-float>>;
e_type_float(infinity, Bin, _) ->
<<Bin/binary, 0:16, 128, 127>>;
e_type_float('-infinity', Bin, _) ->
<<Bin/binary, 0:16, 128, 255>>;
e_type_float(nan, Bin, _) ->
<<Bin/binary, 0:16, 192, 127>>.
-compile({nowarn_unused_function,e_type_double/3}).
e_type_double(V, Bin, _) when is_number(V) ->
<<Bin/binary, V:64/little-float>>;
e_type_double(infinity, Bin, _) ->
<<Bin/binary, 0:48, 240, 127>>;
e_type_double('-infinity', Bin, _) ->
<<Bin/binary, 0:48, 240, 255>>;
e_type_double(nan, Bin, _) ->
<<Bin/binary, 0:48, 248, 127>>.
-compile({nowarn_unused_function,e_varint/3}).
e_varint(N, Bin, _TrUserData) -> e_varint(N, Bin).
-compile({nowarn_unused_function,e_varint/2}).
e_varint(N, Bin) when N =< 127 -> <<Bin/binary, N>>;
e_varint(N, Bin) ->
Bin2 = <<Bin/binary, (N band 127 bor 128)>>,
e_varint(N bsr 7, Bin2).
is_empty_string("") -> true;
is_empty_string(<<>>) -> true;
is_empty_string(L) when is_list(L) ->
not string_has_chars(L);
is_empty_string(B) when is_binary(B) -> false.
string_has_chars([C | _]) when is_integer(C) -> true;
string_has_chars([H | T]) ->
case string_has_chars(H) of
true -> true;
false -> string_has_chars(T)
end;
string_has_chars(B)
when is_binary(B), byte_size(B) =/= 0 ->
true;
string_has_chars(C) when is_integer(C) -> true;
string_has_chars(<<>>) -> false;
string_has_chars([]) -> false.
decode_msg(Bin, MsgName) when is_binary(Bin) ->
decode_msg(Bin, MsgName, []).
decode_msg(Bin, MsgName, Opts) when is_binary(Bin) ->
TrUserData = proplists:get_value(user_data, Opts),
decode_msg_1_catch(Bin, MsgName, TrUserData).
-ifdef('OTP_RELEASE').
decode_msg_1_catch(Bin, MsgName, TrUserData) ->
try decode_msg_2_doit(MsgName, Bin, TrUserData)
catch Class:Reason:StackTrace -> error({gpb_error,{decoding_failure, {Bin, MsgName, {Class, Reason, StackTrace}}}})
end.
-else.
decode_msg_1_catch(Bin, MsgName, TrUserData) ->
try decode_msg_2_doit(MsgName, Bin, TrUserData)
catch Class:Reason ->
StackTrace = erlang:get_stacktrace(),
error({gpb_error,{decoding_failure, {Bin, MsgName, {Class, Reason, StackTrace}}}})
end.
-endif.
decode_msg_2_doit(message, Bin, TrUserData) ->
id(decode_msg_message(Bin, TrUserData), TrUserData).
decode_msg_message(Bin, TrUserData) ->
dfp_read_field_def_message(Bin, 0, 0,
id(<<>>, TrUserData), id(<<>>, TrUserData),
id(<<>>, TrUserData), id(<<>>, TrUserData),
id(<<>>, TrUserData), id(<<>>, TrUserData),
TrUserData).
dfp_read_field_def_message(<<10, Rest/binary>>, Z1, Z2,
F@_1, F@_2, F@_3, F@_4, F@_5, F@_6, TrUserData) ->
d_field_message_channel(Rest, Z1, Z2, F@_1, F@_2, F@_3,
F@_4, F@_5, F@_6, TrUserData);
dfp_read_field_def_message(<<18, Rest/binary>>, Z1, Z2,
F@_1, F@_2, F@_3, F@_4, F@_5, F@_6, TrUserData) ->
d_field_message_subtopic(Rest, Z1, Z2, F@_1, F@_2, F@_3,
F@_4, F@_5, F@_6, TrUserData);
dfp_read_field_def_message(<<26, Rest/binary>>, Z1, Z2,
F@_1, F@_2, F@_3, F@_4, F@_5, F@_6, TrUserData) ->
d_field_message_publisher(Rest, Z1, Z2, F@_1, F@_2,
F@_3, F@_4, F@_5, F@_6, TrUserData);
dfp_read_field_def_message(<<34, Rest/binary>>, Z1, Z2,
F@_1, F@_2, F@_3, F@_4, F@_5, F@_6, TrUserData) ->
d_field_message_protocol(Rest, Z1, Z2, F@_1, F@_2, F@_3,
F@_4, F@_5, F@_6, TrUserData);
dfp_read_field_def_message(<<42, Rest/binary>>, Z1, Z2,
F@_1, F@_2, F@_3, F@_4, F@_5, F@_6, TrUserData) ->
d_field_message_contentType(Rest, Z1, Z2, F@_1, F@_2,
F@_3, F@_4, F@_5, F@_6, TrUserData);
dfp_read_field_def_message(<<50, Rest/binary>>, Z1, Z2,
F@_1, F@_2, F@_3, F@_4, F@_5, F@_6, TrUserData) ->
d_field_message_payload(Rest, Z1, Z2, F@_1, F@_2, F@_3,
F@_4, F@_5, F@_6, TrUserData);
dfp_read_field_def_message(<<>>, 0, 0, F@_1, F@_2, F@_3,
F@_4, F@_5, F@_6, _) ->
#{channel => F@_1, subtopic => F@_2, publisher => F@_3,
protocol => F@_4, contentType => F@_5, payload => F@_6};
dfp_read_field_def_message(Other, Z1, Z2, F@_1, F@_2,
F@_3, F@_4, F@_5, F@_6, TrUserData) ->
dg_read_field_def_message(Other, Z1, Z2, F@_1, F@_2,
F@_3, F@_4, F@_5, F@_6, TrUserData).
dg_read_field_def_message(<<1:1, X:7, Rest/binary>>, N,
Acc, F@_1, F@_2, F@_3, F@_4, F@_5, F@_6, TrUserData)
when N < 32 - 7 ->
dg_read_field_def_message(Rest, N + 7, X bsl N + Acc,
F@_1, F@_2, F@_3, F@_4, F@_5, F@_6, TrUserData);
dg_read_field_def_message(<<0:1, X:7, Rest/binary>>, N,
Acc, F@_1, F@_2, F@_3, F@_4, F@_5, F@_6,
TrUserData) ->
Key = X bsl N + Acc,
case Key of
10 ->
d_field_message_channel(Rest, 0, 0, F@_1, F@_2, F@_3,
F@_4, F@_5, F@_6, TrUserData);
18 ->
d_field_message_subtopic(Rest, 0, 0, F@_1, F@_2, F@_3,
F@_4, F@_5, F@_6, TrUserData);
26 ->
d_field_message_publisher(Rest, 0, 0, F@_1, F@_2, F@_3,
F@_4, F@_5, F@_6, TrUserData);
34 ->
d_field_message_protocol(Rest, 0, 0, F@_1, F@_2, F@_3,
F@_4, F@_5, F@_6, TrUserData);
42 ->
d_field_message_contentType(Rest, 0, 0, F@_1, F@_2,
F@_3, F@_4, F@_5, F@_6, TrUserData);
50 ->
d_field_message_payload(Rest, 0, 0, F@_1, F@_2, F@_3,
F@_4, F@_5, F@_6, TrUserData);
_ ->
case Key band 7 of
0 ->
skip_varint_message(Rest, 0, 0, F@_1, F@_2, F@_3, F@_4,
F@_5, F@_6, TrUserData);
1 ->
skip_64_message(Rest, 0, 0, F@_1, F@_2, F@_3, F@_4,
F@_5, F@_6, TrUserData);
2 ->
skip_length_delimited_message(Rest, 0, 0, F@_1, F@_2,
F@_3, F@_4, F@_5, F@_6,
TrUserData);
3 ->
skip_group_message(Rest, Key bsr 3, 0, F@_1, F@_2, F@_3,
F@_4, F@_5, F@_6, TrUserData);
5 ->
skip_32_message(Rest, 0, 0, F@_1, F@_2, F@_3, F@_4,
F@_5, F@_6, TrUserData)
end
end;
dg_read_field_def_message(<<>>, 0, 0, F@_1, F@_2, F@_3,
F@_4, F@_5, F@_6, _) ->
#{channel => F@_1, subtopic => F@_2, publisher => F@_3,
protocol => F@_4, contentType => F@_5, payload => F@_6}.
d_field_message_channel(<<1:1, X:7, Rest/binary>>, N,
Acc, F@_1, F@_2, F@_3, F@_4, F@_5, F@_6, TrUserData)
when N < 57 ->
d_field_message_channel(Rest, N + 7, X bsl N + Acc,
F@_1, F@_2, F@_3, F@_4, F@_5, F@_6, TrUserData);
d_field_message_channel(<<0:1, X:7, Rest/binary>>, N,
Acc, _, F@_2, F@_3, F@_4, F@_5, F@_6, TrUserData) ->
{NewFValue, RestF} = begin
Len = X bsl N + Acc,
<<Bytes:Len/binary, Rest2/binary>> = Rest,
{id(binary:copy(Bytes), TrUserData), Rest2}
end,
dfp_read_field_def_message(RestF, 0, 0, NewFValue, F@_2,
F@_3, F@_4, F@_5, F@_6, TrUserData).
d_field_message_subtopic(<<1:1, X:7, Rest/binary>>, N,
Acc, F@_1, F@_2, F@_3, F@_4, F@_5, F@_6, TrUserData)
when N < 57 ->
d_field_message_subtopic(Rest, N + 7, X bsl N + Acc,
F@_1, F@_2, F@_3, F@_4, F@_5, F@_6, TrUserData);
d_field_message_subtopic(<<0:1, X:7, Rest/binary>>, N,
Acc, F@_1, _, F@_3, F@_4, F@_5, F@_6, TrUserData) ->
{NewFValue, RestF} = begin
Len = X bsl N + Acc,
<<Bytes:Len/binary, Rest2/binary>> = Rest,
{id(binary:copy(Bytes), TrUserData), Rest2}
end,
dfp_read_field_def_message(RestF, 0, 0, F@_1, NewFValue,
F@_3, F@_4, F@_5, F@_6, TrUserData).
d_field_message_publisher(<<1:1, X:7, Rest/binary>>, N,
Acc, F@_1, F@_2, F@_3, F@_4, F@_5, F@_6, TrUserData)
when N < 57 ->
d_field_message_publisher(Rest, N + 7, X bsl N + Acc,
F@_1, F@_2, F@_3, F@_4, F@_5, F@_6, TrUserData);
d_field_message_publisher(<<0:1, X:7, Rest/binary>>, N,
Acc, F@_1, F@_2, _, F@_4, F@_5, F@_6, TrUserData) ->
{NewFValue, RestF} = begin
Len = X bsl N + Acc,
<<Bytes:Len/binary, Rest2/binary>> = Rest,
{id(binary:copy(Bytes), TrUserData), Rest2}
end,
dfp_read_field_def_message(RestF, 0, 0, F@_1, F@_2,
NewFValue, F@_4, F@_5, F@_6, TrUserData).
d_field_message_protocol(<<1:1, X:7, Rest/binary>>, N,
Acc, F@_1, F@_2, F@_3, F@_4, F@_5, F@_6, TrUserData)
when N < 57 ->
d_field_message_protocol(Rest, N + 7, X bsl N + Acc,
F@_1, F@_2, F@_3, F@_4, F@_5, F@_6, TrUserData);
d_field_message_protocol(<<0:1, X:7, Rest/binary>>, N,
Acc, F@_1, F@_2, F@_3, _, F@_5, F@_6, TrUserData) ->
{NewFValue, RestF} = begin
Len = X bsl N + Acc,
<<Bytes:Len/binary, Rest2/binary>> = Rest,
{id(binary:copy(Bytes), TrUserData), Rest2}
end,
dfp_read_field_def_message(RestF, 0, 0, F@_1, F@_2,
F@_3, NewFValue, F@_5, F@_6, TrUserData).
d_field_message_contentType(<<1:1, X:7, Rest/binary>>,
N, Acc, F@_1, F@_2, F@_3, F@_4, F@_5, F@_6,
TrUserData)
when N < 57 ->
d_field_message_contentType(Rest, N + 7, X bsl N + Acc,
F@_1, F@_2, F@_3, F@_4, F@_5, F@_6, TrUserData);
d_field_message_contentType(<<0:1, X:7, Rest/binary>>,
N, Acc, F@_1, F@_2, F@_3, F@_4, _, F@_6,
TrUserData) ->
{NewFValue, RestF} = begin
Len = X bsl N + Acc,
<<Bytes:Len/binary, Rest2/binary>> = Rest,
{id(binary:copy(Bytes), TrUserData), Rest2}
end,
dfp_read_field_def_message(RestF, 0, 0, F@_1, F@_2,
F@_3, F@_4, NewFValue, F@_6, TrUserData).
d_field_message_payload(<<1:1, X:7, Rest/binary>>, N,
Acc, F@_1, F@_2, F@_3, F@_4, F@_5, F@_6, TrUserData)
when N < 57 ->
d_field_message_payload(Rest, N + 7, X bsl N + Acc,
F@_1, F@_2, F@_3, F@_4, F@_5, F@_6, TrUserData);
d_field_message_payload(<<0:1, X:7, Rest/binary>>, N,
Acc, F@_1, F@_2, F@_3, F@_4, F@_5, _, TrUserData) ->
{NewFValue, RestF} = begin
Len = X bsl N + Acc,
<<Bytes:Len/binary, Rest2/binary>> = Rest,
{id(binary:copy(Bytes), TrUserData), Rest2}
end,
dfp_read_field_def_message(RestF, 0, 0, F@_1, F@_2,
F@_3, F@_4, F@_5, NewFValue, TrUserData).
skip_varint_message(<<1:1, _:7, Rest/binary>>, Z1, Z2,
F@_1, F@_2, F@_3, F@_4, F@_5, F@_6, TrUserData) ->
skip_varint_message(Rest, Z1, Z2, F@_1, F@_2, F@_3,
F@_4, F@_5, F@_6, TrUserData);
skip_varint_message(<<0:1, _:7, Rest/binary>>, Z1, Z2,
F@_1, F@_2, F@_3, F@_4, F@_5, F@_6, TrUserData) ->
dfp_read_field_def_message(Rest, Z1, Z2, F@_1, F@_2,
F@_3, F@_4, F@_5, F@_6, TrUserData).
skip_length_delimited_message(<<1:1, X:7, Rest/binary>>,
N, Acc, F@_1, F@_2, F@_3, F@_4, F@_5, F@_6,
TrUserData)
when N < 57 ->
skip_length_delimited_message(Rest, N + 7,
X bsl N + Acc, F@_1, F@_2, F@_3, F@_4, F@_5,
F@_6, TrUserData);
skip_length_delimited_message(<<0:1, X:7, Rest/binary>>,
N, Acc, F@_1, F@_2, F@_3, F@_4, F@_5, F@_6,
TrUserData) ->
Length = X bsl N + Acc,
<<_:Length/binary, Rest2/binary>> = Rest,
dfp_read_field_def_message(Rest2, 0, 0, F@_1, F@_2,
F@_3, F@_4, F@_5, F@_6, TrUserData).
skip_group_message(Bin, FNum, Z2, F@_1, F@_2, F@_3,
F@_4, F@_5, F@_6, TrUserData) ->
{_, Rest} = read_group(Bin, FNum),
dfp_read_field_def_message(Rest, 0, Z2, F@_1, F@_2,
F@_3, F@_4, F@_5, F@_6, TrUserData).
skip_32_message(<<_:32, Rest/binary>>, Z1, Z2, F@_1,
F@_2, F@_3, F@_4, F@_5, F@_6, TrUserData) ->
dfp_read_field_def_message(Rest, Z1, Z2, F@_1, F@_2,
F@_3, F@_4, F@_5, F@_6, TrUserData).
skip_64_message(<<_:64, Rest/binary>>, Z1, Z2, F@_1,
F@_2, F@_3, F@_4, F@_5, F@_6, TrUserData) ->
dfp_read_field_def_message(Rest, Z1, Z2, F@_1, F@_2,
F@_3, F@_4, F@_5, F@_6, TrUserData).
read_group(Bin, FieldNum) ->
{NumBytes, EndTagLen} = read_gr_b(Bin, 0, 0, 0, 0, FieldNum),
<<Group:NumBytes/binary, _:EndTagLen/binary, Rest/binary>> = Bin,
{Group, Rest}.
%% Like skipping over fields, but record the total length,
%% Each field is <(FieldNum bsl 3) bor FieldType> ++ <FieldValue>
%% Record the length because varints may be non-optimally encoded.
%%
%% Groups can be nested, but assume the same FieldNum cannot be nested
%% because group field numbers are shared with the rest of the fields
%% numbers. Thus we can search just for an group-end with the same
%% field number.
%%
%% (The only time the same group field number could occur would
%% be in a nested sub message, but then it would be inside a
%% length-delimited entry, which we skip-read by length.)
read_gr_b(<<1:1, X:7, Tl/binary>>, N, Acc, NumBytes, TagLen, FieldNum)
when N < (32-7) ->
read_gr_b(Tl, N+7, X bsl N + Acc, NumBytes, TagLen+1, FieldNum);
read_gr_b(<<0:1, X:7, Tl/binary>>, N, Acc, NumBytes, TagLen,
FieldNum) ->
Key = X bsl N + Acc,
TagLen1 = TagLen + 1,
case {Key bsr 3, Key band 7} of
{FieldNum, 4} -> % 4 = group_end
{NumBytes, TagLen1};
{_, 0} -> % 0 = varint
read_gr_vi(Tl, 0, NumBytes + TagLen1, FieldNum);
{_, 1} -> % 1 = bits64
<<_:64, Tl2/binary>> = Tl,
read_gr_b(Tl2, 0, 0, NumBytes + TagLen1 + 8, 0, FieldNum);
{_, 2} -> % 2 = length_delimited
read_gr_ld(Tl, 0, 0, NumBytes + TagLen1, FieldNum);
{_, 3} -> % 3 = group_start
read_gr_b(Tl, 0, 0, NumBytes + TagLen1, 0, FieldNum);
{_, 4} -> % 4 = group_end
read_gr_b(Tl, 0, 0, NumBytes + TagLen1, 0, FieldNum);
{_, 5} -> % 5 = bits32
<<_:32, Tl2/binary>> = Tl,
read_gr_b(Tl2, 0, 0, NumBytes + TagLen1 + 4, 0, FieldNum)
end.
read_gr_vi(<<1:1, _:7, Tl/binary>>, N, NumBytes, FieldNum)
when N < (64-7) ->
read_gr_vi(Tl, N+7, NumBytes+1, FieldNum);
read_gr_vi(<<0:1, _:7, Tl/binary>>, _, NumBytes, FieldNum) ->
read_gr_b(Tl, 0, 0, NumBytes+1, 0, FieldNum).
read_gr_ld(<<1:1, X:7, Tl/binary>>, N, Acc, NumBytes, FieldNum)
when N < (64-7) ->
read_gr_ld(Tl, N+7, X bsl N + Acc, NumBytes+1, FieldNum);
read_gr_ld(<<0:1, X:7, Tl/binary>>, N, Acc, NumBytes, FieldNum) ->
Len = X bsl N + Acc,
NumBytes1 = NumBytes + 1,
<<_:Len/binary, Tl2/binary>> = Tl,
read_gr_b(Tl2, 0, 0, NumBytes1 + Len, 0, FieldNum).
merge_msgs(Prev, New, MsgName) when is_atom(MsgName) ->
merge_msgs(Prev, New, MsgName, []).
merge_msgs(Prev, New, MsgName, Opts) ->
TrUserData = proplists:get_value(user_data, Opts),
case MsgName of
message -> merge_msg_message(Prev, New, TrUserData)
end.
-compile({nowarn_unused_function,merge_msg_message/3}).
merge_msg_message(PMsg, NMsg, _) ->
S1 = #{},
S2 = case {PMsg, NMsg} of
{_, #{channel := NFchannel}} ->
S1#{channel => NFchannel};
{#{channel := PFchannel}, _} ->
S1#{channel => PFchannel};
_ -> S1
end,
S3 = case {PMsg, NMsg} of
{_, #{subtopic := NFsubtopic}} ->
S2#{subtopic => NFsubtopic};
{#{subtopic := PFsubtopic}, _} ->
S2#{subtopic => PFsubtopic};
_ -> S2
end,
S4 = case {PMsg, NMsg} of
{_, #{publisher := NFpublisher}} ->
S3#{publisher => NFpublisher};
{#{publisher := PFpublisher}, _} ->
S3#{publisher => PFpublisher};
_ -> S3
end,
S5 = case {PMsg, NMsg} of
{_, #{protocol := NFprotocol}} ->
S4#{protocol => NFprotocol};
{#{protocol := PFprotocol}, _} ->
S4#{protocol => PFprotocol};
_ -> S4
end,
S6 = case {PMsg, NMsg} of
{_, #{contentType := NFcontentType}} ->
S5#{contentType => NFcontentType};
{#{contentType := PFcontentType}, _} ->
S5#{contentType => PFcontentType};
_ -> S5
end,
case {PMsg, NMsg} of
{_, #{payload := NFpayload}} ->
S6#{payload => NFpayload};
{#{payload := PFpayload}, _} ->
S6#{payload => PFpayload};
_ -> S6
end.
verify_msg(Msg, MsgName) when is_atom(MsgName) ->
verify_msg(Msg, MsgName, []).
verify_msg(Msg, MsgName, Opts) ->
TrUserData = proplists:get_value(user_data, Opts),
case MsgName of
message -> v_msg_message(Msg, [MsgName], TrUserData);
_ -> mk_type_error(not_a_known_message, Msg, [])
end.
-compile({nowarn_unused_function,v_msg_message/3}).
-dialyzer({nowarn_function,v_msg_message/3}).
v_msg_message(#{} = M, Path, TrUserData) ->
case M of
#{channel := F1} ->
v_type_string(F1, [channel | Path], TrUserData);
_ -> ok
end,
case M of
#{subtopic := F2} ->
v_type_string(F2, [subtopic | Path], TrUserData);
_ -> ok
end,
case M of
#{publisher := F3} ->
v_type_string(F3, [publisher | Path], TrUserData);
_ -> ok
end,
case M of
#{protocol := F4} ->
v_type_string(F4, [protocol | Path], TrUserData);
_ -> ok
end,
case M of
#{contentType := F5} ->
v_type_string(F5, [contentType | Path], TrUserData);
_ -> ok
end,
case M of
#{payload := F6} ->
v_type_bytes(F6, [payload | Path], TrUserData);
_ -> ok
end,
lists:foreach(fun (channel) -> ok;
(subtopic) -> ok;
(publisher) -> ok;
(protocol) -> ok;
(contentType) -> ok;
(payload) -> ok;
(OtherKey) ->
mk_type_error({extraneous_key, OtherKey}, M, Path)
end,
maps:keys(M)),
ok;
v_msg_message(M, Path, _TrUserData) when is_map(M) ->
mk_type_error({missing_fields, [] -- maps:keys(M),
message},
M, Path);
v_msg_message(X, Path, _TrUserData) ->
mk_type_error({expected_msg, message}, X, Path).
-compile({nowarn_unused_function,v_type_string/3}).
-dialyzer({nowarn_function,v_type_string/3}).
v_type_string(S, Path, _TrUserData)
when is_list(S); is_binary(S) ->
try unicode:characters_to_binary(S) of
B when is_binary(B) -> ok;
{error, _, _} ->
mk_type_error(bad_unicode_string, S, Path)
catch
error:badarg ->
mk_type_error(bad_unicode_string, S, Path)
end;
v_type_string(X, Path, _TrUserData) ->
mk_type_error(bad_unicode_string, X, Path).
-compile({nowarn_unused_function,v_type_bytes/3}).
-dialyzer({nowarn_function,v_type_bytes/3}).
v_type_bytes(B, _Path, _TrUserData) when is_binary(B) ->
ok;
v_type_bytes(B, _Path, _TrUserData) when is_list(B) ->
ok;
v_type_bytes(X, Path, _TrUserData) ->
mk_type_error(bad_binary_value, X, Path).
-compile({nowarn_unused_function,mk_type_error/3}).
-spec mk_type_error(_, _, list()) -> no_return().
mk_type_error(Error, ValueSeen, Path) ->
Path2 = prettify_path(Path),
erlang:error({gpb_type_error,
{Error, [{value, ValueSeen}, {path, Path2}]}}).
-compile({nowarn_unused_function,prettify_path/1}).
-dialyzer({nowarn_function,prettify_path/1}).
prettify_path([]) -> top_level;
prettify_path(PathR) ->
list_to_atom(lists:append(lists:join(".",
lists:map(fun atom_to_list/1,
lists:reverse(PathR))))).
-compile({nowarn_unused_function,id/2}).
-compile({inline,id/2}).
id(X, _TrUserData) -> X.
-compile({nowarn_unused_function,v_ok/3}).
-compile({inline,v_ok/3}).
v_ok(_Value, _Path, _TrUserData) -> ok.
-compile({nowarn_unused_function,m_overwrite/3}).
-compile({inline,m_overwrite/3}).
m_overwrite(_Prev, New, _TrUserData) -> New.
-compile({nowarn_unused_function,cons/3}).
-compile({inline,cons/3}).
cons(Elem, Acc, _TrUserData) -> [Elem | Acc].
-compile({nowarn_unused_function,lists_reverse/2}).
-compile({inline,lists_reverse/2}).
'lists_reverse'(L, _TrUserData) -> lists:reverse(L).
-compile({nowarn_unused_function,'erlang_++'/3}).
-compile({inline,'erlang_++'/3}).
'erlang_++'(A, B, _TrUserData) -> A ++ B.
get_msg_defs() ->
[{{msg, message},
[#{name => channel, fnum => 1, rnum => 2,
type => string, occurrence => optional, opts => []},
#{name => subtopic, fnum => 2, rnum => 3,
type => string, occurrence => optional, opts => []},
#{name => publisher, fnum => 3, rnum => 4,
type => string, occurrence => optional, opts => []},
#{name => protocol, fnum => 4, rnum => 5,
type => string, occurrence => optional, opts => []},
#{name => contentType, fnum => 5, rnum => 6,
type => string, occurrence => optional, opts => []},
#{name => payload, fnum => 6, rnum => 7, type => bytes,
occurrence => optional, opts => []}]}].
get_msg_names() -> [message].
get_group_names() -> [].
get_msg_or_group_names() -> [message].
get_enum_names() -> [].
fetch_msg_def(MsgName) ->
case find_msg_def(MsgName) of
Fs when is_list(Fs) -> Fs;
error -> erlang:error({no_such_msg, MsgName})
end.
-spec fetch_enum_def(_) -> no_return().
fetch_enum_def(EnumName) ->
erlang:error({no_such_enum, EnumName}).
find_msg_def(message) ->
[#{name => channel, fnum => 1, rnum => 2,
type => string, occurrence => optional, opts => []},
#{name => subtopic, fnum => 2, rnum => 3,
type => string, occurrence => optional, opts => []},
#{name => publisher, fnum => 3, rnum => 4,
type => string, occurrence => optional, opts => []},
#{name => protocol, fnum => 4, rnum => 5,
type => string, occurrence => optional, opts => []},
#{name => contentType, fnum => 5, rnum => 6,
type => string, occurrence => optional, opts => []},
#{name => payload, fnum => 6, rnum => 7, type => bytes,
occurrence => optional, opts => []}];
find_msg_def(_) -> error.
find_enum_def(_) -> error.
-spec enum_symbol_by_value(_, _) -> no_return().
enum_symbol_by_value(E, V) ->
erlang:error({no_enum_defs, E, V}).
-spec enum_value_by_symbol(_, _) -> no_return().
enum_value_by_symbol(E, V) ->
erlang:error({no_enum_defs, E, V}).
get_service_names() -> [].
get_service_def(_) -> error.
get_rpc_names(_) -> error.
find_rpc_def(_, _) -> error.
-spec fetch_rpc_def(_, _) -> no_return().
fetch_rpc_def(ServiceName, RpcName) ->
erlang:error({no_such_rpc, ServiceName, RpcName}).
%% Convert a a fully qualified (ie with package name) service name
%% as a binary to a service name as an atom.
-spec fqbin_to_service_name(_) -> no_return().
fqbin_to_service_name(X) ->
error({gpb_error, {badservice, X}}).
%% Convert a service name as an atom to a fully qualified
%% (ie with package name) name as a binary.
-spec service_name_to_fqbin(_) -> no_return().
service_name_to_fqbin(X) ->
error({gpb_error, {badservice, X}}).
%% Convert a a fully qualified (ie with package name) service name
%% and an rpc name, both as binaries to a service name and an rpc
%% name, as atoms.
-spec fqbins_to_service_and_rpc_name(_, _) -> no_return().
fqbins_to_service_and_rpc_name(S, R) ->
error({gpb_error, {badservice_or_rpc, {S, R}}}).
%% Convert a service name and an rpc name, both as atoms,
%% to a fully qualified (ie with package name) service name and
%% an rpc name as binaries.
-spec service_and_rpc_name_to_fqbins(_, _) -> no_return().
service_and_rpc_name_to_fqbins(S, R) ->
error({gpb_error, {badservice_or_rpc, {S, R}}}).
fqbin_to_msg_name(<<"mainflux.Message">>) -> message;
fqbin_to_msg_name(E) -> error({gpb_error, {badmsg, E}}).
msg_name_to_fqbin(message) -> <<"mainflux.Message">>;
msg_name_to_fqbin(E) -> error({gpb_error, {badmsg, E}}).
-spec fqbin_to_enum_name(_) -> no_return().
fqbin_to_enum_name(E) ->
error({gpb_error, {badenum, E}}).
-spec enum_name_to_fqbin(_) -> no_return().
enum_name_to_fqbin(E) ->
error({gpb_error, {badenum, E}}).
get_package_name() -> mainflux.
%% Whether or not the message names
%% are prepended with package name or not.
uses_packages() -> true.
source_basename() -> "message.proto".
%% Retrieve all proto file names, also imported ones.
%% The order is top-down. The first element is always the main
%% source file. The files are returned with extension,
%% see get_all_proto_names/0 for a version that returns
%% the basenames sans extension
get_all_source_basenames() -> ["message.proto"].
%% Retrieve all proto file names, also imported ones.
%% The order is top-down. The first element is always the main
%% source file. The files are returned sans .proto extension,
%% to make it easier to use them with the various get_xyz_containment
%% functions.
get_all_proto_names() -> ["message"].
get_msg_containment("message") -> [message];
get_msg_containment(P) ->
error({gpb_error, {badproto, P}}).
get_pkg_containment("message") -> mainflux;
get_pkg_containment(P) ->
error({gpb_error, {badproto, P}}).
get_service_containment("message") -> [];
get_service_containment(P) ->
error({gpb_error, {badproto, P}}).
get_rpc_containment("message") -> [];
get_rpc_containment(P) ->
error({gpb_error, {badproto, P}}).
get_enum_containment("message") -> [];
get_enum_containment(P) ->
error({gpb_error, {badproto, P}}).
get_proto_by_msg_name_as_fqbin(<<"mainflux.Message">>) -> "message";
get_proto_by_msg_name_as_fqbin(E) ->
error({gpb_error, {badmsg, E}}).
-spec get_proto_by_service_name_as_fqbin(_) -> no_return().
get_proto_by_service_name_as_fqbin(E) ->
error({gpb_error, {badservice, E}}).
-spec get_proto_by_enum_name_as_fqbin(_) -> no_return().
get_proto_by_enum_name_as_fqbin(E) ->
error({gpb_error, {badenum, E}}).
get_protos_by_pkg_name_as_fqbin(<<"mainflux">>) -> ["message"];
get_protos_by_pkg_name_as_fqbin(E) ->
error({gpb_error, {badpkg, E}}).
gpb_version_as_string() ->
"4.7.3".
gpb_version_as_list() ->
[4,7,3].
+2 -6
View File
@@ -56,13 +56,9 @@ MF_HTTP_ADAPTER_LOG_LEVEL=info MF_HTTP_ADAPTER_PORT=8185 MF_THINGS_URL=localhost
MF_WS_ADAPTER_LOG_LEVEL=info MF_WS_ADAPTER_PORT=8186 MF_THINGS_URL=localhost:8183 $BUILD_DIR/mainflux-ws &
###
# MQTT
# MQTT mproxy
###
# Switch to top dir to find *.proto stuff when running MQTT broker
cd ..
MF_MQTT_ADAPTER_LOG_LEVEL=info MF_THINGS_URL=localhost:8183 node mqtt/aedes/mqtt.js &
cd -
MF_MQTT_ADAPTER_LOG_LEVEL=info MF_THINGS_URL=localhost:8183 $BUILD_DIR/mainflux-mproxy &
###
# CoAP