NOISSUE - update mqtt prov tool and some refactor (#831)

* refactor code

Signed-off-by: Mirko Teodorovic <mirko.teodorovic@gmail.com>

* connect each thing with each channel

Signed-off-by: Mirko Teodorovic <mirko.teodorovic@gmail.com>

* add some comments

Signed-off-by: Mirko Teodorovic <mirko.teodorovic@gmail.com>

* connect each thing with each channel

Signed-off-by: Mirko Teodorovic <mirko.teodorovic@gmail.com>

* reverting - structure fields must be exported

Signed-off-by: Mirko Teodorovic <mirko.teodorovic@gmail.com>

* reverting - structure fields must be exported

Signed-off-by: Mirko Teodorovic <mirko.teodorovic@gmail.com>

* revert some names

Signed-off-by: Mirko Teodorovic <mirko.teodorovic@gmail.com>

* move meausuring time start

Signed-off-by: Mirko Teodorovic <mirko.teodorovic@gmail.com>

* revert changes to .gitignore

Signed-off-by: Mirko Teodorovic <mirko.teodorovic@gmail.com>

* small changes

Signed-off-by: Mirko Teodorovic <mirko.teodorovic@gmail.com>
This commit is contained in:
Mirko Teodorovic
2019-09-05 11:39:58 +00:00
committed by Drasko DRASKOVIC
parent e17a3de1d0
commit 640dfb7e19
5 changed files with 66 additions and 48 deletions
-2
View File
@@ -1,4 +1,2 @@
build
.vscode/
site/
*.crt
+30 -24
View File
@@ -54,16 +54,20 @@ type mainfluxFile struct {
ConnFile string `toml:"connections_file" mapstructure:"connections_file"`
}
type mfConn struct {
type mfThing struct {
ThingID string `toml:"thing_id" mapstructure:"thing_id"`
ThingKey string `toml:"thing_key" mapstructure:"thing_key"`
MTLSCert string `toml:"mtls_cert" mapstructure:"mtls_cert"`
MTLSKey string `toml:"mtls_key" mapstructure:"mtls_key"`
}
type mfChannel struct {
ChannelID string `toml:"channel_id" mapstructure:"channel_id"`
ThingID string `toml:"thing_id" mapstructure:"thing_id"`
ThingKey string `toml:"thing_key" mapstructure:"thing_key"`
MTLSCert string `toml:"mtls_cert" mapstructure:"mtls_cert"`
MTLSKey string `toml:"mtls_key" mapstructure:"mtls_key"`
}
type mainflux struct {
Conns []mfConn `toml:"mainflux" mapstructure:"mainflux"`
Things []mfThing `toml:"things" mapstructure:"things"`
Channels []mfChannel `toml:"channels" mapstructure:"channels"`
}
// Config struct holds benchmark configuration
@@ -76,8 +80,8 @@ type Config struct {
// JSONResults are used to export results as a JSON document
type JSONResults struct {
Runs []*RunResults `json:"runs"`
Totals *TotalResults `json:"totals"`
Runs []*runResults `json:"runs"`
Totals *totalResults `json:"totals"`
}
// Benchmark - main benckhmarking function
@@ -86,7 +90,7 @@ func Benchmark(cfg Config) {
var err error
checkConnection(cfg.MQTT.Broker.URL, 1)
subTimes := make(SubTimes)
subTimes := make(subTimes)
var caByte []byte
if cfg.MQTT.TLS.MTLS {
caFile, err := os.Open(cfg.MQTT.TLS.CA)
@@ -105,19 +109,19 @@ func Benchmark(cfg Config) {
log.Fatalf("Cannot load Mainflux connections config %s \nuse tools/provision to create file", cfg.Mf.ConnFile)
}
resCh := make(chan *RunResults)
resCh := make(chan *runResults)
done := make(chan bool)
start := time.Now()
n := len(mf.Conns)
n := len(mf.Channels)
var cert tls.Certificate
// Subscribers
for i := 0; i < cfg.Test.Subs; i++ {
mfConn := mf.Conns[i%n]
mfChann := mf.Channels[i%n]
mfThing := mf.Things[i%n]
if cfg.MQTT.TLS.MTLS {
cert, err = tls.X509KeyPair([]byte(mfConn.MTLSCert), []byte(mfConn.MTLSKey))
cert, err = tls.X509KeyPair([]byte(mfThing.MTLSCert), []byte(mfThing.MTLSKey))
if err != nil {
log.Fatal(err)
}
@@ -126,9 +130,9 @@ func Benchmark(cfg Config) {
c := &Client{
ID: strconv.Itoa(i),
BrokerURL: cfg.MQTT.Broker.URL,
BrokerUser: mfConn.ThingID,
BrokerPass: mfConn.ThingKey,
MsgTopic: fmt.Sprintf("channels/%s/messages/test", mfConn.ChannelID),
BrokerUser: mfThing.ThingID,
BrokerPass: mfThing.ThingKey,
MsgTopic: fmt.Sprintf("channels/%s/messages/test", mfChann.ChannelID),
MsgSize: cfg.MQTT.Message.Size,
MsgCount: cfg.Test.Count,
MsgQoS: byte(cfg.MQTT.Message.QoS),
@@ -148,12 +152,14 @@ func Benchmark(cfg Config) {
wg.Wait()
start := time.Now()
// Publishers
for i := 0; i < cfg.Test.Pubs; i++ {
mfConn := mf.Conns[i%n]
mfChann := mf.Channels[i%n]
mfThing := mf.Things[i%n]
if cfg.MQTT.TLS.MTLS {
cert, err = tls.X509KeyPair([]byte(mfConn.MTLSCert), []byte(mfConn.MTLSKey))
cert, err = tls.X509KeyPair([]byte(mfThing.MTLSCert), []byte(mfThing.MTLSKey))
if err != nil {
log.Fatal(err)
}
@@ -162,9 +168,9 @@ func Benchmark(cfg Config) {
c := &Client{
ID: strconv.Itoa(i),
BrokerURL: cfg.MQTT.Broker.URL,
BrokerUser: mfConn.ThingID,
BrokerPass: mfConn.ThingKey,
MsgTopic: fmt.Sprintf("channels/%s/messages/test", mfConn.ChannelID),
BrokerUser: mfThing.ThingID,
BrokerPass: mfThing.ThingKey,
MsgTopic: fmt.Sprintf("channels/%s/messages/test", mfChann.ChannelID),
MsgSize: cfg.MQTT.Message.Size,
MsgCount: cfg.Test.Count,
MsgQoS: byte(cfg.MQTT.Message.QoS),
@@ -181,9 +187,9 @@ func Benchmark(cfg Config) {
}
// Collect the results
var results []*RunResults
var results []*runResults
if cfg.Test.Pubs > 0 {
results = make([]*RunResults, cfg.Test.Pubs)
results = make([]*runResults, cfg.Test.Pubs)
}
for i := 0; i < cfg.Test.Pubs; i++ {
+4 -4
View File
@@ -59,12 +59,12 @@ type message struct {
}
// Publisher
func (c *Client) runPublisher(r chan *RunResults) {
func (c *Client) runPublisher(r chan *runResults) {
newMsgs := make(chan *message)
pubMsgs := make(chan *message)
doneGen := make(chan bool)
donePub := make(chan bool)
runResults := new(RunResults)
runResults := new(runResults)
started := time.Now()
@@ -106,7 +106,7 @@ func (c *Client) runPublisher(r chan *RunResults) {
}
// Subscriber
func (c *Client) runSubscriber(wg *sync.WaitGroup, subTimes *SubTimes, done *chan bool) {
func (c *Client) runSubscriber(wg *sync.WaitGroup, subTimes *subTimes, done *chan bool) {
defer wg.Done()
// Start subscriber
@@ -127,7 +127,7 @@ func (c *Client) generate(ch chan *message, done chan bool) {
return
}
func (c *Client) subscribe(wg *sync.WaitGroup, subTimes *SubTimes, done *chan bool) {
func (c *Client) subscribe(wg *sync.WaitGroup, subTimes *subTimes, done *chan bool) {
clientID := fmt.Sprintf("sub-%v-%v", time.Now().Format(time.RFC3339Nano), c.ID)
c.ID = clientID
+7 -10
View File
@@ -14,8 +14,7 @@ import (
stat "gonum.org/v1/gonum/stat"
)
// RunResults describes results of a single client / run
type RunResults struct {
type runResults struct {
ID string `json:"id"`
Successes int64 `json:"successes"`
Failures int64 `json:"failures"`
@@ -31,11 +30,9 @@ type RunResults struct {
MsgsPerSec float64 `json:"msgs_per_sec"`
}
// SubTimes - measuring time of arrival of message in subs
type SubTimes map[string][]float64
type subTimes map[string][]float64
// TotalResults describes results of all clients / runs
type TotalResults struct {
type totalResults struct {
Ratio float64 `json:"ratio"`
Successes int64 `json:"successes"`
Failures int64 `json:"failures"`
@@ -53,12 +50,12 @@ type TotalResults struct {
AvgMsgsPerSec float64 `json:"avg_msgs_per_sec"`
}
func calculateTotalResults(results []*RunResults, totalTime time.Duration, subTimes *SubTimes) *TotalResults {
func calculateTotalResults(results []*runResults, totalTime time.Duration, subTimes *subTimes) *totalResults {
if results == nil || len(results) < 1 {
return nil
}
totals := new(TotalResults)
subTimeRunResults := RunResults{}
totals := new(totalResults)
subTimeRunResults := runResults{}
msgTimeMeans := make([]float64, len(results))
msgTimeMeansDelivered := make([]float64, len(results))
msgsPerSecs := make([]float64, len(results))
@@ -121,7 +118,7 @@ func calculateTotalResults(results []*RunResults, totalTime time.Duration, subTi
return totals
}
func printResults(results []*RunResults, totals *TotalResults, format string, quiet bool) {
func printResults(results []*runResults, totals *totalResults, format string, quiet bool) {
switch format {
case "json":
jr := JSONResults{
+25 -8
View File
@@ -18,6 +18,7 @@ import (
"log"
"math/big"
"os"
"sync"
"time"
"github.com/docker/docker/pkg/namesgenerator"
@@ -116,6 +117,11 @@ func Provision(conf Config) {
}
// Create things and channels
things := make([]*sdk.Thing, conf.Num)
channels := make([]*string, conf.Num)
fmt.Println("# List of things that can be connected to MQTT broker")
for i := 0; i < conf.Num; i++ {
tid, err := s.CreateThing(sdk.Thing{Name: fmt.Sprintf("%s-thing-%d", conf.Prefix, i)}, token)
if err != nil {
@@ -123,6 +129,8 @@ func Provision(conf Config) {
}
thing, err := s.Thing(tid, token)
things[i] = &thing
if err != nil {
log.Fatalf("Failed to fetch the thing: %s", err.Error())
}
@@ -132,10 +140,7 @@ func Provision(conf Config) {
log.Fatalf(err.Error())
}
if err := s.ConnectThing(tid, cid, token); err != nil {
log.Printf("Failed to create thing: %s", err)
return
}
channels[i] = &cid
cert := ""
key := ""
@@ -195,16 +200,28 @@ func Provision(conf Config) {
}
// Print output
fmt.Println("[[mainflux]]")
fmt.Printf("channel_id = \"%s\"\n", cid)
fmt.Printf("thing_id = \"%s\"\n", tid)
fmt.Printf("thing_key = \"%s\"\n", thing.Key)
fmt.Printf("[[things]]\nthing_id = \"%s\"\nthing_key = \"%s\"\n", tid, thing.Key)
if conf.SSL {
fmt.Printf("mtls_cert = \"\"\"%s\"\"\"\n", cert)
fmt.Printf("mtls_key = \"\"\"%s\"\"\"\n", key)
}
fmt.Println("")
}
var wg sync.WaitGroup
fmt.Printf("# List of channels that things can publish to\n" +
"# each channel is connected to each thing from things list\n")
for i := 0; i < conf.Num; i++ {
for j := 0; j < conf.Num; j++ {
wg.Add(1)
go func(wg *sync.WaitGroup, i, j int) {
defer wg.Done()
s.ConnectThing(things[j].ID, *channels[i], token)
}(&wg, i, j)
}
fmt.Printf("[[channels]]\nchannel_id = \"%s\"\n\n", *channels[i])
}
wg.Wait()
}
func publicKey(priv interface{}) interface{} {