mirror of
https://github.com/absmach/supermq.git
synced 2026-06-23 07:20:19 +00:00
NOISSUE - Use methods for accessing in generated code (#204)
Signed-off-by: Rodney Osodo <28790446+rodneyosodo@users.noreply.github.com>
This commit is contained in:
+1
-1
@@ -217,7 +217,7 @@ func newService(ctx context.Context, id string, ps messaging.PubSub, cfg config,
|
||||
|
||||
func handle(ctx context.Context, logger mglog.Logger, chanID string, svc twins.Service) handlerFunc {
|
||||
return func(msg *messaging.Message) error {
|
||||
if msg.Channel == chanID {
|
||||
if msg.GetChannel() == chanID {
|
||||
return nil
|
||||
}
|
||||
|
||||
|
||||
+2
-2
@@ -58,7 +58,7 @@ func (svc *adapterService) Publish(ctx context.Context, key string, msg *messagi
|
||||
SubjectType: auth.ThingType,
|
||||
Permission: auth.PublishPermission,
|
||||
Subject: key,
|
||||
Object: msg.Channel,
|
||||
Object: msg.GetChannel(),
|
||||
ObjectType: auth.GroupType,
|
||||
}
|
||||
res, err := svc.auth.Authorize(ctx, ar)
|
||||
@@ -70,7 +70,7 @@ func (svc *adapterService) Publish(ctx context.Context, key string, msg *messagi
|
||||
}
|
||||
msg.Publisher = res.GetId()
|
||||
|
||||
return svc.pubsub.Publish(ctx, msg.Channel, msg)
|
||||
return svc.pubsub.Publish(ctx, msg.GetChannel(), msg)
|
||||
}
|
||||
|
||||
func (svc *adapterService) Subscribe(ctx context.Context, key, chanID, subtopic string, c Client) error {
|
||||
|
||||
+3
-3
@@ -31,9 +31,9 @@ func LoggingMiddleware(svc coap.Service, logger mglog.Logger) coap.Service {
|
||||
// If the request fails, it logs the error.
|
||||
func (lm *loggingMiddleware) Publish(ctx context.Context, key string, msg *messaging.Message) (err error) {
|
||||
defer func(begin time.Time) {
|
||||
destChannel := msg.Channel
|
||||
if msg.Subtopic != "" {
|
||||
destChannel = fmt.Sprintf("%s.%s", destChannel, msg.Subtopic)
|
||||
destChannel := msg.GetChannel()
|
||||
if msg.GetSubtopic() != "" {
|
||||
destChannel = fmt.Sprintf("%s.%s", destChannel, msg.GetSubtopic())
|
||||
}
|
||||
message := fmt.Sprintf("Method publish to %s took %s to complete", destChannel, time.Since(begin))
|
||||
if err != nil {
|
||||
|
||||
@@ -124,9 +124,9 @@ func handleGet(ctx context.Context, m *mux.Message, c mux.Client, msg *messaging
|
||||
}
|
||||
if obs == startObserve {
|
||||
c := coap.NewClient(c, m.Token, logger)
|
||||
return service.Subscribe(ctx, key, msg.Channel, msg.Subtopic, c)
|
||||
return service.Subscribe(ctx, key, msg.GetChannel(), msg.GetSubtopic(), c)
|
||||
}
|
||||
return service.Unsubscribe(ctx, key, msg.Channel, msg.Subtopic, m.Token.String())
|
||||
return service.Unsubscribe(ctx, key, msg.GetChannel(), msg.GetSubtopic(), m.Token.String())
|
||||
}
|
||||
|
||||
func decodeMessage(msg *mux.Message) (*messaging.Message, error) {
|
||||
|
||||
@@ -102,9 +102,9 @@ func (ns *notifierService) ConsumeBlocking(ctx context.Context, message interfac
|
||||
if !ok {
|
||||
return ErrMessage
|
||||
}
|
||||
topic := msg.Channel
|
||||
if msg.Subtopic != "" {
|
||||
topic = fmt.Sprintf("%s.%s", msg.Channel, msg.Subtopic)
|
||||
topic := msg.GetChannel()
|
||||
if msg.GetSubtopic() != "" {
|
||||
topic = fmt.Sprintf("%s.%s", msg.GetChannel(), msg.GetSubtopic())
|
||||
}
|
||||
pm := PageMetadata{
|
||||
Topic: topic,
|
||||
@@ -136,9 +136,9 @@ func (ns *notifierService) ConsumeAsync(ctx context.Context, message interface{}
|
||||
ns.errCh <- ErrMessage
|
||||
return
|
||||
}
|
||||
topic := msg.Channel
|
||||
if msg.Subtopic != "" {
|
||||
topic = fmt.Sprintf("%s.%s", msg.Channel, msg.Subtopic)
|
||||
topic := msg.GetChannel()
|
||||
if msg.GetSubtopic() != "" {
|
||||
topic = fmt.Sprintf("%s.%s", msg.GetChannel(), msg.GetSubtopic())
|
||||
}
|
||||
pm := PageMetadata{
|
||||
Topic: topic,
|
||||
|
||||
@@ -56,7 +56,7 @@ func (n *notifier) Notify(from string, to []string, msg *messaging.Message) erro
|
||||
DestAddrTON: n.destAddrTON,
|
||||
SourceAddrNPI: n.sourceAddrNPI,
|
||||
DestAddrNPI: n.destAddrNPI,
|
||||
Text: pdutext.Raw(msg.Payload),
|
||||
Text: pdutext.Raw(msg.GetPayload()),
|
||||
Register: pdufield.NoDeliveryReceipt,
|
||||
}
|
||||
_, err := n.transmitter.Submit(send)
|
||||
|
||||
@@ -28,13 +28,13 @@ func New(agent *email.Agent) notifiers.Notifier {
|
||||
}
|
||||
|
||||
func (n *notifier) Notify(from string, to []string, msg *messaging.Message) error {
|
||||
subject := fmt.Sprintf(`Notification for Channel %s`, msg.Channel)
|
||||
if msg.Subtopic != "" {
|
||||
subject = fmt.Sprintf("%s and subtopic %s", subject, msg.Subtopic)
|
||||
subject := fmt.Sprintf(`Notification for Channel %s`, msg.GetChannel())
|
||||
if msg.GetSubtopic() != "" {
|
||||
subject = fmt.Sprintf("%s and subtopic %s", subject, msg.GetSubtopic())
|
||||
}
|
||||
|
||||
values := string(msg.Payload)
|
||||
content := fmt.Sprintf(contentTemplate, msg.Publisher, msg.Protocol, values)
|
||||
values := string(msg.GetPayload())
|
||||
content := fmt.Sprintf(contentTemplate, msg.GetPublisher(), msg.GetProtocol(), values)
|
||||
|
||||
return n.agent.Send(to, from, subject, "", "", content, footer)
|
||||
}
|
||||
|
||||
+4
-4
@@ -44,14 +44,14 @@ func (f forwarder) Forward(ctx context.Context, id string, sub messaging.Subscri
|
||||
|
||||
func handle(ctx context.Context, pub messaging.Publisher, logger mglog.Logger) handleFunc {
|
||||
return func(msg *messaging.Message) error {
|
||||
if msg.Protocol == protocol {
|
||||
if msg.GetProtocol() == protocol {
|
||||
return nil
|
||||
}
|
||||
// Use concatenation instead of fmt.Sprintf for the
|
||||
// sake of simplicity and performance.
|
||||
topic := "channels/" + msg.Channel + "/messages"
|
||||
if msg.Subtopic != "" {
|
||||
topic = topic + "/" + strings.ReplaceAll(msg.Subtopic, ".", "/")
|
||||
topic := "channels/" + msg.GetChannel() + "/messages"
|
||||
if msg.GetSubtopic() != "" {
|
||||
topic = topic + "/" + strings.ReplaceAll(msg.GetSubtopic(), ".", "/")
|
||||
}
|
||||
|
||||
go func() {
|
||||
|
||||
+1
-1
@@ -169,7 +169,7 @@ func (h *handler) Publish(ctx context.Context, topic *string, payload *[]byte) e
|
||||
Created: time.Now().UnixNano(),
|
||||
}
|
||||
|
||||
if err := h.publisher.Publish(ctx, msg.Channel, &msg); err != nil {
|
||||
if err := h.publisher.Publish(ctx, msg.GetChannel(), &msg); err != nil {
|
||||
return errors.Wrap(ErrFailedPublishToMsgBroker, err)
|
||||
}
|
||||
|
||||
|
||||
@@ -242,7 +242,7 @@ func (c client) publish(ctx context.Context, token string, m message) error {
|
||||
Created: time.Now().UnixNano(),
|
||||
}
|
||||
|
||||
if err := c.publisher.Publish(ctx, msg.Channel, &msg); err != nil {
|
||||
if err := c.publisher.Publish(ctx, msg.GetChannel(), &msg); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
|
||||
@@ -460,7 +460,7 @@ type handler struct {
|
||||
}
|
||||
|
||||
func (h handler) Handle(msg *messaging.Message) error {
|
||||
if msg.Publisher != h.publisher {
|
||||
if msg.GetPublisher() != h.publisher {
|
||||
h.msgChan <- msg
|
||||
}
|
||||
return nil
|
||||
|
||||
@@ -73,8 +73,8 @@ func (pub *publisher) Publish(ctx context.Context, topic string, msg *messaging.
|
||||
}
|
||||
|
||||
subject := fmt.Sprintf("%s.%s", pub.prefix, topic)
|
||||
if msg.Subtopic != "" {
|
||||
subject = fmt.Sprintf("%s.%s", subject, msg.Subtopic)
|
||||
if msg.GetSubtopic() != "" {
|
||||
subject = fmt.Sprintf("%s.%s", subject, msg.GetSubtopic())
|
||||
}
|
||||
|
||||
_, err = pub.js.Publish(ctx, subject, data)
|
||||
|
||||
@@ -40,7 +40,7 @@ func NewPublisher(config server.Config, tracer trace.Tracer, publisher messaging
|
||||
}
|
||||
|
||||
func (pm *publisherMiddleware) Publish(ctx context.Context, topic string, msg *messaging.Message) error {
|
||||
ctx, span := tracing.CreateSpan(ctx, publishOP, msg.Publisher, topic, msg.Subtopic, len(msg.Payload), pm.host, trace.SpanKindClient, pm.tracer)
|
||||
ctx, span := tracing.CreateSpan(ctx, publishOP, msg.GetPublisher(), topic, msg.GetSubtopic(), len(msg.GetPayload()), pm.host, trace.SpanKindClient, pm.tracer)
|
||||
defer span.End()
|
||||
span.SetAttributes(defaultAttributes...)
|
||||
|
||||
|
||||
@@ -82,7 +82,7 @@ type traceHandler struct {
|
||||
|
||||
// Handle instruments the message handling operation.
|
||||
func (h *traceHandler) Handle(msg *messaging.Message) error {
|
||||
_, span := tracing.CreateSpan(h.ctx, processOp, h.clientID, h.topic, msg.Subtopic, len(msg.Payload), h.host, trace.SpanKindConsumer, h.tracer)
|
||||
_, span := tracing.CreateSpan(h.ctx, processOp, h.clientID, h.topic, msg.GetSubtopic(), len(msg.GetPayload()), h.host, trace.SpanKindConsumer, h.tracer)
|
||||
defer span.End()
|
||||
|
||||
span.SetAttributes(defaultAttributes...)
|
||||
|
||||
@@ -62,8 +62,8 @@ func (pub *publisher) Publish(ctx context.Context, topic string, msg *messaging.
|
||||
}
|
||||
|
||||
subject := fmt.Sprintf("%s.%s", pub.prefix, topic)
|
||||
if msg.Subtopic != "" {
|
||||
subject = fmt.Sprintf("%s.%s", subject, msg.Subtopic)
|
||||
if msg.GetSubtopic() != "" {
|
||||
subject = fmt.Sprintf("%s.%s", subject, msg.GetSubtopic())
|
||||
}
|
||||
subject = formatTopic(subject)
|
||||
|
||||
|
||||
@@ -446,7 +446,7 @@ type handler struct {
|
||||
}
|
||||
|
||||
func (h handler) Handle(msg *messaging.Message) error {
|
||||
if msg.Publisher != h.publisher {
|
||||
if msg.GetPublisher() != h.publisher {
|
||||
msgChan <- msg
|
||||
}
|
||||
return nil
|
||||
|
||||
@@ -41,7 +41,7 @@ func NewPublisher(config server.Config, tracer trace.Tracer, publisher messaging
|
||||
}
|
||||
|
||||
func (pm *publisherMiddleware) Publish(ctx context.Context, topic string, msg *messaging.Message) error {
|
||||
ctx, span := tracing.CreateSpan(ctx, publishOP, msg.Publisher, topic, msg.Subtopic, len(msg.Payload), pm.host, trace.SpanKindClient, pm.tracer)
|
||||
ctx, span := tracing.CreateSpan(ctx, publishOP, msg.GetPublisher(), topic, msg.GetSubtopic(), len(msg.GetPayload()), pm.host, trace.SpanKindClient, pm.tracer)
|
||||
defer span.End()
|
||||
|
||||
span.SetAttributes(defaultAttributes...)
|
||||
|
||||
@@ -82,7 +82,7 @@ type traceHandler struct {
|
||||
|
||||
// Handle instruments the message handling operation.
|
||||
func (h *traceHandler) Handle(msg *messaging.Message) error {
|
||||
_, span := tracing.CreateSpan(h.ctx, processOp, h.clientID, h.topic, msg.Subtopic, len(msg.Payload), h.host, trace.SpanKindConsumer, h.tracer)
|
||||
_, span := tracing.CreateSpan(h.ctx, processOp, h.clientID, h.topic, msg.GetSubtopic(), len(msg.GetPayload()), h.host, trace.SpanKindConsumer, h.tracer)
|
||||
defer span.End()
|
||||
|
||||
span.SetAttributes(defaultAttributes...)
|
||||
|
||||
@@ -50,11 +50,11 @@ func New(tfs []TimeField) transformers.Transformer {
|
||||
// Transform transforms Magistrala message to a list of JSON messages.
|
||||
func (ts *transformerService) Transform(msg *messaging.Message) (interface{}, error) {
|
||||
ret := Message{
|
||||
Publisher: msg.Publisher,
|
||||
Created: msg.Created,
|
||||
Protocol: msg.Protocol,
|
||||
Channel: msg.Channel,
|
||||
Subtopic: msg.Subtopic,
|
||||
Publisher: msg.GetPublisher(),
|
||||
Created: msg.GetCreated(),
|
||||
Protocol: msg.GetProtocol(),
|
||||
Channel: msg.GetChannel(),
|
||||
Subtopic: msg.GetSubtopic(),
|
||||
}
|
||||
|
||||
if ret.Subtopic == "" {
|
||||
@@ -68,7 +68,7 @@ func (ts *transformerService) Transform(msg *messaging.Message) (interface{}, er
|
||||
|
||||
format := subs[len(subs)-1]
|
||||
var payload interface{}
|
||||
if err := json.Unmarshal(msg.Payload, &payload); err != nil {
|
||||
if err := json.Unmarshal(msg.GetPayload(), &payload); err != nil {
|
||||
return nil, errors.Wrap(ErrTransform, err)
|
||||
}
|
||||
|
||||
|
||||
@@ -44,7 +44,7 @@ func New(contentFormat string) transformers.Transformer {
|
||||
}
|
||||
|
||||
func (t transformer) Transform(msg *messaging.Message) (interface{}, error) {
|
||||
raw, err := senml.Decode(msg.Payload, t.format)
|
||||
raw, err := senml.Decode(msg.GetPayload(), t.format)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(errDecode, err)
|
||||
}
|
||||
@@ -60,14 +60,14 @@ func (t transformer) Transform(msg *messaging.Message) (interface{}, error) {
|
||||
t := v.Time
|
||||
if t == 0 {
|
||||
// Convert the Unix timestamp in nanoseconds to float64
|
||||
t = float64(msg.Created) / float64(1e9)
|
||||
t = float64(msg.GetCreated()) / float64(1e9)
|
||||
}
|
||||
|
||||
msgs[i] = Message{
|
||||
Channel: msg.Channel,
|
||||
Subtopic: msg.Subtopic,
|
||||
Publisher: msg.Publisher,
|
||||
Protocol: msg.Protocol,
|
||||
Channel: msg.GetChannel(),
|
||||
Subtopic: msg.GetSubtopic(),
|
||||
Publisher: msg.GetPublisher(),
|
||||
Protocol: msg.GetProtocol(),
|
||||
Name: v.Name,
|
||||
Unit: v.Unit,
|
||||
Time: t,
|
||||
|
||||
@@ -24,7 +24,7 @@ func NewBroker(sub map[string]string) messaging.Publisher {
|
||||
}
|
||||
|
||||
func (mb mockBroker) Publish(ctx context.Context, topic string, msg *messaging.Message) error {
|
||||
if len(msg.Payload) == 0 {
|
||||
if len(msg.GetPayload()) == 0 {
|
||||
return errors.New("failed to publish")
|
||||
}
|
||||
return nil
|
||||
|
||||
+11
-11
@@ -248,7 +248,7 @@ func (ts *twinsService) ListStates(ctx context.Context, token string, offset, li
|
||||
func (ts *twinsService) SaveStates(ctx context.Context, msg *messaging.Message) error {
|
||||
var ids []string
|
||||
|
||||
channel, subtopic := msg.Channel, msg.Subtopic
|
||||
channel, subtopic := msg.GetChannel(), msg.GetSubtopic()
|
||||
ids, err := ts.twinCache.IDs(ctx, channel, subtopic)
|
||||
if err != nil {
|
||||
return err
|
||||
@@ -283,17 +283,17 @@ func (ts *twinsService) saveState(ctx context.Context, msg *messaging.Message, t
|
||||
|
||||
tw, err := ts.twins.RetrieveByID(ctx, twinID)
|
||||
if err != nil {
|
||||
return fmt.Errorf("retrieving twin for %s failed: %s", msg.Publisher, err)
|
||||
return fmt.Errorf("retrieving twin for %s failed: %s", msg.GetPublisher(), err)
|
||||
}
|
||||
|
||||
var recs []senml.Record
|
||||
if err := json.Unmarshal(msg.Payload, &recs); err != nil {
|
||||
return fmt.Errorf("unmarshal payload for %s failed: %s", msg.Publisher, err)
|
||||
if err := json.Unmarshal(msg.GetPayload(), &recs); err != nil {
|
||||
return fmt.Errorf("unmarshal payload for %s failed: %s", msg.GetPublisher(), err)
|
||||
}
|
||||
|
||||
st, err := ts.states.RetrieveLast(ctx, tw.ID)
|
||||
if err != nil {
|
||||
return fmt.Errorf("retrieve last state for %s failed: %s", msg.Publisher, err)
|
||||
return fmt.Errorf("retrieve last state for %s failed: %s", msg.GetPublisher(), err)
|
||||
}
|
||||
|
||||
for _, rec := range recs {
|
||||
@@ -303,17 +303,17 @@ func (ts *twinsService) saveState(ctx context.Context, msg *messaging.Message, t
|
||||
return nil
|
||||
case update:
|
||||
if err := ts.states.Update(ctx, st); err != nil {
|
||||
return fmt.Errorf("update state for %s failed: %s", msg.Publisher, err)
|
||||
return fmt.Errorf("update state for %s failed: %s", msg.GetPublisher(), err)
|
||||
}
|
||||
case save:
|
||||
if err := ts.states.Save(ctx, st); err != nil {
|
||||
return fmt.Errorf("save state for %s failed: %s", msg.Publisher, err)
|
||||
return fmt.Errorf("save state for %s failed: %s", msg.GetPublisher(), err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
twinID = msg.Publisher
|
||||
b = msg.Payload
|
||||
twinID = msg.GetPublisher()
|
||||
b = msg.GetPayload()
|
||||
|
||||
return nil
|
||||
}
|
||||
@@ -345,7 +345,7 @@ func (ts *twinsService) prepareState(st *State, tw *Twin, rec senml.Record, msg
|
||||
if !attr.PersistState {
|
||||
continue
|
||||
}
|
||||
if attr.Channel == msg.Channel && (attr.Subtopic == SubtopicWildcard || attr.Subtopic == msg.Subtopic) {
|
||||
if attr.Channel == msg.GetChannel() && (attr.Subtopic == SubtopicWildcard || attr.Subtopic == msg.GetSubtopic()) {
|
||||
action = update
|
||||
delta := math.Abs(float64(st.Created.UnixNano()) - recNano)
|
||||
if recNano == 0 || delta > float64(def.Delta) {
|
||||
@@ -419,7 +419,7 @@ func (ts *twinsService) publish(ctx context.Context, twinID *string, err *error,
|
||||
Created: time.Now().UnixNano(),
|
||||
}
|
||||
|
||||
if err := ts.publisher.Publish(ctx, msg.Channel, &msg); err != nil {
|
||||
if err := ts.publisher.Publish(ctx, msg.GetChannel(), &msg); err != nil {
|
||||
ts.logger.Warn(fmt.Sprintf("Failed to publish notification on Message Broker: %s", err))
|
||||
}
|
||||
}
|
||||
|
||||
+2
-1
@@ -36,5 +36,6 @@ func (c *Client) Handle(msg *messaging.Message) error {
|
||||
if msg.GetPublisher() == c.id {
|
||||
return nil
|
||||
}
|
||||
return c.conn.WriteMessage(websocket.TextMessage, msg.Payload)
|
||||
|
||||
return c.conn.WriteMessage(websocket.TextMessage, msg.GetPayload())
|
||||
}
|
||||
|
||||
+1
-1
@@ -189,7 +189,7 @@ func (h *handler) Publish(ctx context.Context, topic *string, payload *[]byte) e
|
||||
Created: time.Now().UnixNano(),
|
||||
}
|
||||
|
||||
if err := h.pubsub.Publish(ctx, msg.Channel, &msg); err != nil {
|
||||
if err := h.pubsub.Publish(ctx, msg.GetChannel(), &msg); err != nil {
|
||||
return errors.Wrap(ErrFailedPublishToMsgBroker, err)
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user