NOISSUE - Add Readers GRPC Endpoints (#87)

* add ui prefix

Signed-off-by: Musilah <nataleigh.nk@gmail.com>

* add grpc backend

Signed-off-by: Musilah <nataleigh.nk@gmail.com>

* revert email utility and ui

Signed-off-by: Musilah <nataleigh.nk@gmail.com>

* update env variables and docker-composes

Signed-off-by: Musilah <nataleigh.nk@gmail.com>

* update env variables

Signed-off-by: Musilah <nataleigh.nk@gmail.com>

* add middleware folder

Signed-off-by: Musilah <nataleigh.nk@gmail.com>

* fix linter

Signed-off-by: Musilah <nataleigh.nk@gmail.com>

* ui commented

Signed-off-by: Musilah <nataleigh.nk@gmail.com>

* resolve comments and conflicts

Signed-off-by: Musilah <nataleigh.nk@gmail.com>

* resolve comments pt2

Signed-off-by: Musilah <nataleigh.nk@gmail.com>

* remove token from request

Signed-off-by: Musilah <nataleigh.nk@gmail.com>

* resolve comments

Signed-off-by: Musilah <nataleigh.nk@gmail.com>

* revert makefile and docker changes

Signed-off-by: Musilah <nataleigh.nk@gmail.com>

* force open grpc conn

Signed-off-by: Musilah <nataleigh.nk@gmail.com>

* fix linter

Signed-off-by: Musilah <nataleigh.nk@gmail.com>

* refactor components

Signed-off-by: Musilah <nataleigh.nk@gmail.com>

* fix tests

Signed-off-by: Musilah <nataleigh.nk@gmail.com>

* fix query issue

Signed-off-by: Musilah <nataleigh.nk@gmail.com>

* update protoc

Signed-off-by: Musilah <nataleigh.nk@gmail.com>

* rename variables

Signed-off-by: Musilah <nataleigh.nk@gmail.com>

* remove unused envs

Signed-off-by: Musilah <nataleigh.nk@gmail.com>

* use senml and json types for the messages struct

Signed-off-by: Musilah <nataleigh.nk@gmail.com>

* remove repetitive message fields

Signed-off-by: Musilah <nataleigh.nk@gmail.com>

* update protoc version

Signed-off-by: Musilah <nataleigh.nk@gmail.com>

* return ui to docker

Signed-off-by: Musilah <nataleigh.nk@gmail.com>

---------

Signed-off-by: Musilah <nataleigh.nk@gmail.com>
This commit is contained in:
Nataly Musilah
2025-04-09 12:34:14 +03:00
committed by GitHub
parent 58aca168c8
commit 478c9907db
28 changed files with 2059 additions and 25 deletions
+6 -2
View File
@@ -20,6 +20,10 @@ DOCKER_COMPOSE_COMMANDS_SUPPORTED := up down config
DEFAULT_DOCKER_COMPOSE_COMMAND := up
GRPC_MTLS_CERT_FILES_EXISTS = 0
MOCKERY_VERSION=v3.0.0-beta.6
PKG_PROTO_GEN_OUT_DIR=api/grpc
INTERNAL_PROTO_DIR=internal/proto
INTERNAL_PROTO_FILES := $(shell find $(INTERNAL_PROTO_DIR) -name "*.proto" | sed 's|$(INTERNAL_PROTO_DIR)/||')
ifneq ($(MG_MESSAGE_BROKER_TYPE),)
MG_MESSAGE_BROKER_TYPE := $(MG_MESSAGE_BROKER_TYPE)
else
@@ -174,8 +178,8 @@ $(TEST_API):
$(call test_api_service,$(@),$(TEST_API_URL))
proto:
protoc -I. --go_out=. --go_opt=paths=source_relative pkg/messaging/*.proto
protoc -I. --go_out=. --go_opt=paths=source_relative --go-grpc_out=. --go-grpc_opt=paths=source_relative ./*.proto
mkdir -p $(PKG_PROTO_GEN_OUT_DIR)
protoc -I $(INTERNAL_PROTO_DIR) --go_out=$(PKG_PROTO_GEN_OUT_DIR) --go_opt=paths=source_relative --go-grpc_out=$(PKG_PROTO_GEN_OUT_DIR) --go-grpc_opt=paths=source_relative $(INTERNAL_PROTO_FILES)
$(FILTERED_SERVICES):
$(call compile_service,$(@))
+849
View File
@@ -0,0 +1,849 @@
// Copyright (c) Abstract Machines
// SPDX-License-Identifier: Apache-2.0
// Code generated by protoc-gen-go. DO NOT EDIT.
// versions:
// protoc-gen-go v1.36.6
// protoc v6.30.2
// source: readers/v1/readers.proto
package v1
import (
protoreflect "google.golang.org/protobuf/reflect/protoreflect"
protoimpl "google.golang.org/protobuf/runtime/protoimpl"
reflect "reflect"
sync "sync"
unsafe "unsafe"
)
const (
// Verify that this generated code is sufficiently up-to-date.
_ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion)
// Verify that runtime/protoimpl is sufficiently up-to-date.
_ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20)
)
// Aggregation defines supported data aggregations.
type Aggregation int32
const (
Aggregation_AGGREGATION_UNSPECIFIED Aggregation = 0
Aggregation_MAX Aggregation = 1
Aggregation_MIN Aggregation = 2
Aggregation_SUM Aggregation = 3
Aggregation_COUNT Aggregation = 4
Aggregation_AVG Aggregation = 5
)
// Enum value maps for Aggregation.
var (
Aggregation_name = map[int32]string{
0: "AGGREGATION_UNSPECIFIED",
1: "MAX",
2: "MIN",
3: "SUM",
4: "COUNT",
5: "AVG",
}
Aggregation_value = map[string]int32{
"AGGREGATION_UNSPECIFIED": 0,
"MAX": 1,
"MIN": 2,
"SUM": 3,
"COUNT": 4,
"AVG": 5,
}
)
func (x Aggregation) Enum() *Aggregation {
p := new(Aggregation)
*p = x
return p
}
func (x Aggregation) String() string {
return protoimpl.X.EnumStringOf(x.Descriptor(), protoreflect.EnumNumber(x))
}
func (Aggregation) Descriptor() protoreflect.EnumDescriptor {
return file_readers_v1_readers_proto_enumTypes[0].Descriptor()
}
func (Aggregation) Type() protoreflect.EnumType {
return &file_readers_v1_readers_proto_enumTypes[0]
}
func (x Aggregation) Number() protoreflect.EnumNumber {
return protoreflect.EnumNumber(x)
}
// Deprecated: Use Aggregation.Descriptor instead.
func (Aggregation) EnumDescriptor() ([]byte, []int) {
return file_readers_v1_readers_proto_rawDescGZIP(), []int{0}
}
type PageMetadata struct {
state protoimpl.MessageState `protogen:"open.v1"`
Limit uint64 `protobuf:"varint,1,opt,name=limit,proto3" json:"limit,omitempty"`
Offset uint64 `protobuf:"varint,2,opt,name=offset,proto3" json:"offset,omitempty"`
Protocol string `protobuf:"bytes,3,opt,name=protocol,proto3" json:"protocol,omitempty"`
Name string `protobuf:"bytes,4,opt,name=name,proto3" json:"name,omitempty"`
Value float64 `protobuf:"fixed64,5,opt,name=value,proto3" json:"value,omitempty"`
Publisher string `protobuf:"bytes,6,opt,name=publisher,proto3" json:"publisher,omitempty"`
BoolValue bool `protobuf:"varint,7,opt,name=bool_value,json=boolValue,proto3" json:"bool_value,omitempty"`
StringValue string `protobuf:"bytes,8,opt,name=string_value,json=stringValue,proto3" json:"string_value,omitempty"`
DataValue string `protobuf:"bytes,9,opt,name=data_value,json=dataValue,proto3" json:"data_value,omitempty"`
From float64 `protobuf:"fixed64,10,opt,name=from,proto3" json:"from,omitempty"`
To float64 `protobuf:"fixed64,11,opt,name=to,proto3" json:"to,omitempty"`
Subtopic string `protobuf:"bytes,12,opt,name=subtopic,proto3" json:"subtopic,omitempty"`
Interval string `protobuf:"bytes,13,opt,name=interval,proto3" json:"interval,omitempty"`
Read bool `protobuf:"varint,14,opt,name=read,proto3" json:"read,omitempty"`
Aggregation Aggregation `protobuf:"varint,15,opt,name=aggregation,proto3,enum=readers.v1.Aggregation" json:"aggregation,omitempty"`
Comparator string `protobuf:"bytes,16,opt,name=comparator,proto3" json:"comparator,omitempty"`
Format string `protobuf:"bytes,17,opt,name=format,proto3" json:"format,omitempty"`
unknownFields protoimpl.UnknownFields
sizeCache protoimpl.SizeCache
}
func (x *PageMetadata) Reset() {
*x = PageMetadata{}
mi := &file_readers_v1_readers_proto_msgTypes[0]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
func (x *PageMetadata) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*PageMetadata) ProtoMessage() {}
func (x *PageMetadata) ProtoReflect() protoreflect.Message {
mi := &file_readers_v1_readers_proto_msgTypes[0]
if x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use PageMetadata.ProtoReflect.Descriptor instead.
func (*PageMetadata) Descriptor() ([]byte, []int) {
return file_readers_v1_readers_proto_rawDescGZIP(), []int{0}
}
func (x *PageMetadata) GetLimit() uint64 {
if x != nil {
return x.Limit
}
return 0
}
func (x *PageMetadata) GetOffset() uint64 {
if x != nil {
return x.Offset
}
return 0
}
func (x *PageMetadata) GetProtocol() string {
if x != nil {
return x.Protocol
}
return ""
}
func (x *PageMetadata) GetName() string {
if x != nil {
return x.Name
}
return ""
}
func (x *PageMetadata) GetValue() float64 {
if x != nil {
return x.Value
}
return 0
}
func (x *PageMetadata) GetPublisher() string {
if x != nil {
return x.Publisher
}
return ""
}
func (x *PageMetadata) GetBoolValue() bool {
if x != nil {
return x.BoolValue
}
return false
}
func (x *PageMetadata) GetStringValue() string {
if x != nil {
return x.StringValue
}
return ""
}
func (x *PageMetadata) GetDataValue() string {
if x != nil {
return x.DataValue
}
return ""
}
func (x *PageMetadata) GetFrom() float64 {
if x != nil {
return x.From
}
return 0
}
func (x *PageMetadata) GetTo() float64 {
if x != nil {
return x.To
}
return 0
}
func (x *PageMetadata) GetSubtopic() string {
if x != nil {
return x.Subtopic
}
return ""
}
func (x *PageMetadata) GetInterval() string {
if x != nil {
return x.Interval
}
return ""
}
func (x *PageMetadata) GetRead() bool {
if x != nil {
return x.Read
}
return false
}
func (x *PageMetadata) GetAggregation() Aggregation {
if x != nil {
return x.Aggregation
}
return Aggregation_AGGREGATION_UNSPECIFIED
}
func (x *PageMetadata) GetComparator() string {
if x != nil {
return x.Comparator
}
return ""
}
func (x *PageMetadata) GetFormat() string {
if x != nil {
return x.Format
}
return ""
}
type ReadMessagesRes struct {
state protoimpl.MessageState `protogen:"open.v1"`
Total uint64 `protobuf:"varint,1,opt,name=total,proto3" json:"total,omitempty"`
PageMetadata *PageMetadata `protobuf:"bytes,2,opt,name=page_metadata,json=pageMetadata,proto3" json:"page_metadata,omitempty"`
Messages []*Message `protobuf:"bytes,3,rep,name=messages,proto3" json:"messages,omitempty"`
unknownFields protoimpl.UnknownFields
sizeCache protoimpl.SizeCache
}
func (x *ReadMessagesRes) Reset() {
*x = ReadMessagesRes{}
mi := &file_readers_v1_readers_proto_msgTypes[1]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
func (x *ReadMessagesRes) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*ReadMessagesRes) ProtoMessage() {}
func (x *ReadMessagesRes) ProtoReflect() protoreflect.Message {
mi := &file_readers_v1_readers_proto_msgTypes[1]
if x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use ReadMessagesRes.ProtoReflect.Descriptor instead.
func (*ReadMessagesRes) Descriptor() ([]byte, []int) {
return file_readers_v1_readers_proto_rawDescGZIP(), []int{1}
}
func (x *ReadMessagesRes) GetTotal() uint64 {
if x != nil {
return x.Total
}
return 0
}
func (x *ReadMessagesRes) GetPageMetadata() *PageMetadata {
if x != nil {
return x.PageMetadata
}
return nil
}
func (x *ReadMessagesRes) GetMessages() []*Message {
if x != nil {
return x.Messages
}
return nil
}
type Message struct {
state protoimpl.MessageState `protogen:"open.v1"`
// Types that are valid to be assigned to Payload:
//
// *Message_Senml
// *Message_Json
Payload isMessage_Payload `protobuf_oneof:"payload"`
unknownFields protoimpl.UnknownFields
sizeCache protoimpl.SizeCache
}
func (x *Message) Reset() {
*x = Message{}
mi := &file_readers_v1_readers_proto_msgTypes[2]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
func (x *Message) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*Message) ProtoMessage() {}
func (x *Message) ProtoReflect() protoreflect.Message {
mi := &file_readers_v1_readers_proto_msgTypes[2]
if x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use Message.ProtoReflect.Descriptor instead.
func (*Message) Descriptor() ([]byte, []int) {
return file_readers_v1_readers_proto_rawDescGZIP(), []int{2}
}
func (x *Message) GetPayload() isMessage_Payload {
if x != nil {
return x.Payload
}
return nil
}
func (x *Message) GetSenml() *SenMLMessage {
if x != nil {
if x, ok := x.Payload.(*Message_Senml); ok {
return x.Senml
}
}
return nil
}
func (x *Message) GetJson() *JsonMessage {
if x != nil {
if x, ok := x.Payload.(*Message_Json); ok {
return x.Json
}
}
return nil
}
type isMessage_Payload interface {
isMessage_Payload()
}
type Message_Senml struct {
Senml *SenMLMessage `protobuf:"bytes,1,opt,name=senml,proto3,oneof"`
}
type Message_Json struct {
Json *JsonMessage `protobuf:"bytes,2,opt,name=json,proto3,oneof"`
}
func (*Message_Senml) isMessage_Payload() {}
func (*Message_Json) isMessage_Payload() {}
type BaseMessage struct {
state protoimpl.MessageState `protogen:"open.v1"`
Channel string `protobuf:"bytes,1,opt,name=channel,proto3" json:"channel,omitempty"`
Subtopic string `protobuf:"bytes,2,opt,name=subtopic,proto3" json:"subtopic,omitempty"`
Publisher string `protobuf:"bytes,3,opt,name=publisher,proto3" json:"publisher,omitempty"`
Protocol string `protobuf:"bytes,4,opt,name=protocol,proto3" json:"protocol,omitempty"`
unknownFields protoimpl.UnknownFields
sizeCache protoimpl.SizeCache
}
func (x *BaseMessage) Reset() {
*x = BaseMessage{}
mi := &file_readers_v1_readers_proto_msgTypes[3]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
func (x *BaseMessage) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*BaseMessage) ProtoMessage() {}
func (x *BaseMessage) ProtoReflect() protoreflect.Message {
mi := &file_readers_v1_readers_proto_msgTypes[3]
if x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use BaseMessage.ProtoReflect.Descriptor instead.
func (*BaseMessage) Descriptor() ([]byte, []int) {
return file_readers_v1_readers_proto_rawDescGZIP(), []int{3}
}
func (x *BaseMessage) GetChannel() string {
if x != nil {
return x.Channel
}
return ""
}
func (x *BaseMessage) GetSubtopic() string {
if x != nil {
return x.Subtopic
}
return ""
}
func (x *BaseMessage) GetPublisher() string {
if x != nil {
return x.Publisher
}
return ""
}
func (x *BaseMessage) GetProtocol() string {
if x != nil {
return x.Protocol
}
return ""
}
type SenMLMessage struct {
state protoimpl.MessageState `protogen:"open.v1"`
Base *BaseMessage `protobuf:"bytes,1,opt,name=base,proto3" json:"base,omitempty"`
Name string `protobuf:"bytes,2,opt,name=name,proto3" json:"name,omitempty"`
Unit string `protobuf:"bytes,3,opt,name=unit,proto3" json:"unit,omitempty"`
Time float64 `protobuf:"fixed64,4,opt,name=time,proto3" json:"time,omitempty"`
UpdateTime float64 `protobuf:"fixed64,5,opt,name=update_time,json=updateTime,proto3" json:"update_time,omitempty"`
Value float64 `protobuf:"fixed64,6,opt,name=value,proto3" json:"value,omitempty"`
StringValue string `protobuf:"bytes,7,opt,name=string_value,json=stringValue,proto3" json:"string_value,omitempty"`
DataValue string `protobuf:"bytes,8,opt,name=data_value,json=dataValue,proto3" json:"data_value,omitempty"`
BoolValue bool `protobuf:"varint,9,opt,name=bool_value,json=boolValue,proto3" json:"bool_value,omitempty"`
Sum float64 `protobuf:"fixed64,10,opt,name=sum,proto3" json:"sum,omitempty"`
unknownFields protoimpl.UnknownFields
sizeCache protoimpl.SizeCache
}
func (x *SenMLMessage) Reset() {
*x = SenMLMessage{}
mi := &file_readers_v1_readers_proto_msgTypes[4]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
func (x *SenMLMessage) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*SenMLMessage) ProtoMessage() {}
func (x *SenMLMessage) ProtoReflect() protoreflect.Message {
mi := &file_readers_v1_readers_proto_msgTypes[4]
if x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use SenMLMessage.ProtoReflect.Descriptor instead.
func (*SenMLMessage) Descriptor() ([]byte, []int) {
return file_readers_v1_readers_proto_rawDescGZIP(), []int{4}
}
func (x *SenMLMessage) GetBase() *BaseMessage {
if x != nil {
return x.Base
}
return nil
}
func (x *SenMLMessage) GetName() string {
if x != nil {
return x.Name
}
return ""
}
func (x *SenMLMessage) GetUnit() string {
if x != nil {
return x.Unit
}
return ""
}
func (x *SenMLMessage) GetTime() float64 {
if x != nil {
return x.Time
}
return 0
}
func (x *SenMLMessage) GetUpdateTime() float64 {
if x != nil {
return x.UpdateTime
}
return 0
}
func (x *SenMLMessage) GetValue() float64 {
if x != nil {
return x.Value
}
return 0
}
func (x *SenMLMessage) GetStringValue() string {
if x != nil {
return x.StringValue
}
return ""
}
func (x *SenMLMessage) GetDataValue() string {
if x != nil {
return x.DataValue
}
return ""
}
func (x *SenMLMessage) GetBoolValue() bool {
if x != nil {
return x.BoolValue
}
return false
}
func (x *SenMLMessage) GetSum() float64 {
if x != nil {
return x.Sum
}
return 0
}
type JsonMessage struct {
state protoimpl.MessageState `protogen:"open.v1"`
Base *BaseMessage `protobuf:"bytes,1,opt,name=base,proto3" json:"base,omitempty"`
Created int64 `protobuf:"varint,2,opt,name=created,proto3" json:"created,omitempty"`
Payload []byte `protobuf:"bytes,3,opt,name=payload,proto3" json:"payload,omitempty"`
unknownFields protoimpl.UnknownFields
sizeCache protoimpl.SizeCache
}
func (x *JsonMessage) Reset() {
*x = JsonMessage{}
mi := &file_readers_v1_readers_proto_msgTypes[5]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
func (x *JsonMessage) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*JsonMessage) ProtoMessage() {}
func (x *JsonMessage) ProtoReflect() protoreflect.Message {
mi := &file_readers_v1_readers_proto_msgTypes[5]
if x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use JsonMessage.ProtoReflect.Descriptor instead.
func (*JsonMessage) Descriptor() ([]byte, []int) {
return file_readers_v1_readers_proto_rawDescGZIP(), []int{5}
}
func (x *JsonMessage) GetBase() *BaseMessage {
if x != nil {
return x.Base
}
return nil
}
func (x *JsonMessage) GetCreated() int64 {
if x != nil {
return x.Created
}
return 0
}
func (x *JsonMessage) GetPayload() []byte {
if x != nil {
return x.Payload
}
return nil
}
type ReadMessagesReq struct {
state protoimpl.MessageState `protogen:"open.v1"`
ChannelId string `protobuf:"bytes,1,opt,name=channel_id,json=channelId,proto3" json:"channel_id,omitempty"`
DomainId string `protobuf:"bytes,2,opt,name=domain_id,json=domainId,proto3" json:"domain_id,omitempty"`
PageMetadata *PageMetadata `protobuf:"bytes,3,opt,name=page_metadata,json=pageMetadata,proto3" json:"page_metadata,omitempty"`
unknownFields protoimpl.UnknownFields
sizeCache protoimpl.SizeCache
}
func (x *ReadMessagesReq) Reset() {
*x = ReadMessagesReq{}
mi := &file_readers_v1_readers_proto_msgTypes[6]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
func (x *ReadMessagesReq) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*ReadMessagesReq) ProtoMessage() {}
func (x *ReadMessagesReq) ProtoReflect() protoreflect.Message {
mi := &file_readers_v1_readers_proto_msgTypes[6]
if x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use ReadMessagesReq.ProtoReflect.Descriptor instead.
func (*ReadMessagesReq) Descriptor() ([]byte, []int) {
return file_readers_v1_readers_proto_rawDescGZIP(), []int{6}
}
func (x *ReadMessagesReq) GetChannelId() string {
if x != nil {
return x.ChannelId
}
return ""
}
func (x *ReadMessagesReq) GetDomainId() string {
if x != nil {
return x.DomainId
}
return ""
}
func (x *ReadMessagesReq) GetPageMetadata() *PageMetadata {
if x != nil {
return x.PageMetadata
}
return nil
}
var File_readers_v1_readers_proto protoreflect.FileDescriptor
const file_readers_v1_readers_proto_rawDesc = "" +
"\n" +
"\x18readers/v1/readers.proto\x12\n" +
"readers.v1\"\xe4\x03\n" +
"\fPageMetadata\x12\x14\n" +
"\x05limit\x18\x01 \x01(\x04R\x05limit\x12\x16\n" +
"\x06offset\x18\x02 \x01(\x04R\x06offset\x12\x1a\n" +
"\bprotocol\x18\x03 \x01(\tR\bprotocol\x12\x12\n" +
"\x04name\x18\x04 \x01(\tR\x04name\x12\x14\n" +
"\x05value\x18\x05 \x01(\x01R\x05value\x12\x1c\n" +
"\tpublisher\x18\x06 \x01(\tR\tpublisher\x12\x1d\n" +
"\n" +
"bool_value\x18\a \x01(\bR\tboolValue\x12!\n" +
"\fstring_value\x18\b \x01(\tR\vstringValue\x12\x1d\n" +
"\n" +
"data_value\x18\t \x01(\tR\tdataValue\x12\x12\n" +
"\x04from\x18\n" +
" \x01(\x01R\x04from\x12\x0e\n" +
"\x02to\x18\v \x01(\x01R\x02to\x12\x1a\n" +
"\bsubtopic\x18\f \x01(\tR\bsubtopic\x12\x1a\n" +
"\binterval\x18\r \x01(\tR\binterval\x12\x12\n" +
"\x04read\x18\x0e \x01(\bR\x04read\x129\n" +
"\vaggregation\x18\x0f \x01(\x0e2\x17.readers.v1.AggregationR\vaggregation\x12\x1e\n" +
"\n" +
"comparator\x18\x10 \x01(\tR\n" +
"comparator\x12\x16\n" +
"\x06format\x18\x11 \x01(\tR\x06format\"\x97\x01\n" +
"\x0fReadMessagesRes\x12\x14\n" +
"\x05total\x18\x01 \x01(\x04R\x05total\x12=\n" +
"\rpage_metadata\x18\x02 \x01(\v2\x18.readers.v1.PageMetadataR\fpageMetadata\x12/\n" +
"\bmessages\x18\x03 \x03(\v2\x13.readers.v1.MessageR\bmessages\"u\n" +
"\aMessage\x120\n" +
"\x05senml\x18\x01 \x01(\v2\x18.readers.v1.SenMLMessageH\x00R\x05senml\x12-\n" +
"\x04json\x18\x02 \x01(\v2\x17.readers.v1.JsonMessageH\x00R\x04jsonB\t\n" +
"\apayload\"}\n" +
"\vBaseMessage\x12\x18\n" +
"\achannel\x18\x01 \x01(\tR\achannel\x12\x1a\n" +
"\bsubtopic\x18\x02 \x01(\tR\bsubtopic\x12\x1c\n" +
"\tpublisher\x18\x03 \x01(\tR\tpublisher\x12\x1a\n" +
"\bprotocol\x18\x04 \x01(\tR\bprotocol\"\xa1\x02\n" +
"\fSenMLMessage\x12+\n" +
"\x04base\x18\x01 \x01(\v2\x17.readers.v1.BaseMessageR\x04base\x12\x12\n" +
"\x04name\x18\x02 \x01(\tR\x04name\x12\x12\n" +
"\x04unit\x18\x03 \x01(\tR\x04unit\x12\x12\n" +
"\x04time\x18\x04 \x01(\x01R\x04time\x12\x1f\n" +
"\vupdate_time\x18\x05 \x01(\x01R\n" +
"updateTime\x12\x14\n" +
"\x05value\x18\x06 \x01(\x01R\x05value\x12!\n" +
"\fstring_value\x18\a \x01(\tR\vstringValue\x12\x1d\n" +
"\n" +
"data_value\x18\b \x01(\tR\tdataValue\x12\x1d\n" +
"\n" +
"bool_value\x18\t \x01(\bR\tboolValue\x12\x10\n" +
"\x03sum\x18\n" +
" \x01(\x01R\x03sum\"n\n" +
"\vJsonMessage\x12+\n" +
"\x04base\x18\x01 \x01(\v2\x17.readers.v1.BaseMessageR\x04base\x12\x18\n" +
"\acreated\x18\x02 \x01(\x03R\acreated\x12\x18\n" +
"\apayload\x18\x03 \x01(\fR\apayload\"\x8c\x01\n" +
"\x0fReadMessagesReq\x12\x1d\n" +
"\n" +
"channel_id\x18\x01 \x01(\tR\tchannelId\x12\x1b\n" +
"\tdomain_id\x18\x02 \x01(\tR\bdomainId\x12=\n" +
"\rpage_metadata\x18\x03 \x01(\v2\x18.readers.v1.PageMetadataR\fpageMetadata*Y\n" +
"\vAggregation\x12\x1b\n" +
"\x17AGGREGATION_UNSPECIFIED\x10\x00\x12\a\n" +
"\x03MAX\x10\x01\x12\a\n" +
"\x03MIN\x10\x02\x12\a\n" +
"\x03SUM\x10\x03\x12\t\n" +
"\x05COUNT\x10\x04\x12\a\n" +
"\x03AVG\x10\x052\\\n" +
"\x0eReadersService\x12J\n" +
"\fReadMessages\x12\x1b.readers.v1.ReadMessagesReq\x1a\x1b.readers.v1.ReadMessagesRes\"\x00B3Z1github.com/absmach/magistrala/api/grpc/readers/v1b\x06proto3"
var (
file_readers_v1_readers_proto_rawDescOnce sync.Once
file_readers_v1_readers_proto_rawDescData []byte
)
func file_readers_v1_readers_proto_rawDescGZIP() []byte {
file_readers_v1_readers_proto_rawDescOnce.Do(func() {
file_readers_v1_readers_proto_rawDescData = protoimpl.X.CompressGZIP(unsafe.Slice(unsafe.StringData(file_readers_v1_readers_proto_rawDesc), len(file_readers_v1_readers_proto_rawDesc)))
})
return file_readers_v1_readers_proto_rawDescData
}
var file_readers_v1_readers_proto_enumTypes = make([]protoimpl.EnumInfo, 1)
var file_readers_v1_readers_proto_msgTypes = make([]protoimpl.MessageInfo, 7)
var file_readers_v1_readers_proto_goTypes = []any{
(Aggregation)(0), // 0: readers.v1.Aggregation
(*PageMetadata)(nil), // 1: readers.v1.PageMetadata
(*ReadMessagesRes)(nil), // 2: readers.v1.ReadMessagesRes
(*Message)(nil), // 3: readers.v1.Message
(*BaseMessage)(nil), // 4: readers.v1.BaseMessage
(*SenMLMessage)(nil), // 5: readers.v1.SenMLMessage
(*JsonMessage)(nil), // 6: readers.v1.JsonMessage
(*ReadMessagesReq)(nil), // 7: readers.v1.ReadMessagesReq
}
var file_readers_v1_readers_proto_depIdxs = []int32{
0, // 0: readers.v1.PageMetadata.aggregation:type_name -> readers.v1.Aggregation
1, // 1: readers.v1.ReadMessagesRes.page_metadata:type_name -> readers.v1.PageMetadata
3, // 2: readers.v1.ReadMessagesRes.messages:type_name -> readers.v1.Message
5, // 3: readers.v1.Message.senml:type_name -> readers.v1.SenMLMessage
6, // 4: readers.v1.Message.json:type_name -> readers.v1.JsonMessage
4, // 5: readers.v1.SenMLMessage.base:type_name -> readers.v1.BaseMessage
4, // 6: readers.v1.JsonMessage.base:type_name -> readers.v1.BaseMessage
1, // 7: readers.v1.ReadMessagesReq.page_metadata:type_name -> readers.v1.PageMetadata
7, // 8: readers.v1.ReadersService.ReadMessages:input_type -> readers.v1.ReadMessagesReq
2, // 9: readers.v1.ReadersService.ReadMessages:output_type -> readers.v1.ReadMessagesRes
9, // [9:10] is the sub-list for method output_type
8, // [8:9] is the sub-list for method input_type
8, // [8:8] is the sub-list for extension type_name
8, // [8:8] is the sub-list for extension extendee
0, // [0:8] is the sub-list for field type_name
}
func init() { file_readers_v1_readers_proto_init() }
func file_readers_v1_readers_proto_init() {
if File_readers_v1_readers_proto != nil {
return
}
file_readers_v1_readers_proto_msgTypes[2].OneofWrappers = []any{
(*Message_Senml)(nil),
(*Message_Json)(nil),
}
type x struct{}
out := protoimpl.TypeBuilder{
File: protoimpl.DescBuilder{
GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
RawDescriptor: unsafe.Slice(unsafe.StringData(file_readers_v1_readers_proto_rawDesc), len(file_readers_v1_readers_proto_rawDesc)),
NumEnums: 1,
NumMessages: 7,
NumExtensions: 0,
NumServices: 1,
},
GoTypes: file_readers_v1_readers_proto_goTypes,
DependencyIndexes: file_readers_v1_readers_proto_depIdxs,
EnumInfos: file_readers_v1_readers_proto_enumTypes,
MessageInfos: file_readers_v1_readers_proto_msgTypes,
}.Build()
File_readers_v1_readers_proto = out.File
file_readers_v1_readers_proto_goTypes = nil
file_readers_v1_readers_proto_depIdxs = nil
}
+130
View File
@@ -0,0 +1,130 @@
// Copyright (c) Abstract Machines
// SPDX-License-Identifier: Apache-2.0
// Code generated by protoc-gen-go-grpc. DO NOT EDIT.
// versions:
// - protoc-gen-go-grpc v1.5.1
// - protoc v6.30.2
// source: readers/v1/readers.proto
package v1
import (
context "context"
grpc "google.golang.org/grpc"
codes "google.golang.org/grpc/codes"
status "google.golang.org/grpc/status"
)
// This is a compile-time assertion to ensure that this generated file
// is compatible with the grpc package it is being compiled against.
// Requires gRPC-Go v1.64.0 or later.
const _ = grpc.SupportPackageIsVersion9
const (
ReadersService_ReadMessages_FullMethodName = "/readers.v1.ReadersService/ReadMessages"
)
// ReadersServiceClient is the client API for ReadersService service.
//
// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream.
//
// ReadersService is a service that provides access to
// readers functionalities for Magistrala services.
type ReadersServiceClient interface {
ReadMessages(ctx context.Context, in *ReadMessagesReq, opts ...grpc.CallOption) (*ReadMessagesRes, error)
}
type readersServiceClient struct {
cc grpc.ClientConnInterface
}
func NewReadersServiceClient(cc grpc.ClientConnInterface) ReadersServiceClient {
return &readersServiceClient{cc}
}
func (c *readersServiceClient) ReadMessages(ctx context.Context, in *ReadMessagesReq, opts ...grpc.CallOption) (*ReadMessagesRes, error) {
cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...)
out := new(ReadMessagesRes)
err := c.cc.Invoke(ctx, ReadersService_ReadMessages_FullMethodName, in, out, cOpts...)
if err != nil {
return nil, err
}
return out, nil
}
// ReadersServiceServer is the server API for ReadersService service.
// All implementations must embed UnimplementedReadersServiceServer
// for forward compatibility.
//
// ReadersService is a service that provides access to
// readers functionalities for Magistrala services.
type ReadersServiceServer interface {
ReadMessages(context.Context, *ReadMessagesReq) (*ReadMessagesRes, error)
mustEmbedUnimplementedReadersServiceServer()
}
// UnimplementedReadersServiceServer must be embedded to have
// forward compatible implementations.
//
// NOTE: this should be embedded by value instead of pointer to avoid a nil
// pointer dereference when methods are called.
type UnimplementedReadersServiceServer struct{}
func (UnimplementedReadersServiceServer) ReadMessages(context.Context, *ReadMessagesReq) (*ReadMessagesRes, error) {
return nil, status.Errorf(codes.Unimplemented, "method ReadMessages not implemented")
}
func (UnimplementedReadersServiceServer) mustEmbedUnimplementedReadersServiceServer() {}
func (UnimplementedReadersServiceServer) testEmbeddedByValue() {}
// UnsafeReadersServiceServer may be embedded to opt out of forward compatibility for this service.
// Use of this interface is not recommended, as added methods to ReadersServiceServer will
// result in compilation errors.
type UnsafeReadersServiceServer interface {
mustEmbedUnimplementedReadersServiceServer()
}
func RegisterReadersServiceServer(s grpc.ServiceRegistrar, srv ReadersServiceServer) {
// If the following call pancis, it indicates UnimplementedReadersServiceServer was
// embedded by pointer and is nil. This will cause panics if an
// unimplemented method is ever invoked, so we test this at initialization
// time to prevent it from happening at runtime later due to I/O.
if t, ok := srv.(interface{ testEmbeddedByValue() }); ok {
t.testEmbeddedByValue()
}
s.RegisterService(&ReadersService_ServiceDesc, srv)
}
func _ReadersService_ReadMessages_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(ReadMessagesReq)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(ReadersServiceServer).ReadMessages(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: ReadersService_ReadMessages_FullMethodName,
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(ReadersServiceServer).ReadMessages(ctx, req.(*ReadMessagesReq))
}
return interceptor(ctx, in, info, handler)
}
// ReadersService_ServiceDesc is the grpc.ServiceDesc for ReadersService service.
// It's only intended for direct use with grpc.RegisterService,
// and not to be introspected or modified (even as a copy)
var ReadersService_ServiceDesc = grpc.ServiceDesc{
ServiceName: "readers.v1.ReadersService",
HandlerType: (*ReadersServiceServer)(nil),
Methods: []grpc.MethodDesc{
{
MethodName: "ReadMessages",
Handler: _ReadersService_ReadMessages_Handler,
},
},
Streams: []grpc.StreamDesc{},
Metadata: "readers/v1/readers.proto",
}
+31 -5
View File
@@ -12,7 +12,10 @@ import (
"os"
chclient "github.com/absmach/callhome/pkg/client"
httpapi "github.com/absmach/magistrala/readers/api"
grpcReadersV1 "github.com/absmach/magistrala/api/grpc/readers/v1"
readersgrpcapi "github.com/absmach/magistrala/readers/api/grpc"
httpapi "github.com/absmach/magistrala/readers/api/http"
middleware "github.com/absmach/magistrala/readers/middleware"
"github.com/absmach/magistrala/readers/postgres"
"github.com/absmach/supermq"
smqlog "github.com/absmach/supermq/logger"
@@ -21,12 +24,15 @@ import (
pgclient "github.com/absmach/supermq/pkg/postgres"
"github.com/absmach/supermq/pkg/prometheus"
"github.com/absmach/supermq/pkg/server"
grpcserver "github.com/absmach/supermq/pkg/server/grpc"
httpserver "github.com/absmach/supermq/pkg/server/http"
"github.com/absmach/supermq/pkg/uuid"
"github.com/absmach/supermq/readers"
"github.com/caarlos0/env/v11"
"github.com/jmoiron/sqlx"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc"
"google.golang.org/grpc/reflection"
)
const (
@@ -38,6 +44,8 @@ const (
envPrefixChannels = "SMQ_CHANNELS_GRPC_"
defDB = "supermq"
defSvcHTTPPort = "9009"
defSvcGRPCPort = "7009"
envPrefixGrpc = "MG_POSTGRES_READER_GRPC_"
)
type config struct {
@@ -85,6 +93,19 @@ func main() {
}
defer db.Close()
repo := newService(db, logger)
grpcServerConfig := server.Config{Port: defSvcGRPCPort}
if err := env.ParseWithOptions(&grpcServerConfig, env.Options{Prefix: envPrefixGrpc}); err != nil {
logger.Error(fmt.Sprintf("failed to load %s gRPC server configuration : %s", svcName, err.Error()))
exitCode = 1
return
}
registerReadersServiceServer := func(srv *grpc.Server) {
reflection.Register(srv)
grpcReadersV1.RegisterReadersServiceServer(srv, readersgrpcapi.NewReadersServer(repo))
}
clientsClientCfg := grpcclient.Config{}
if err := env.ParseWithOptions(&clientsClientCfg, env.Options{Prefix: envPrefixClients}); err != nil {
logger.Error(fmt.Sprintf("failed to load clients gRPC client configuration : %s", err))
@@ -99,6 +120,7 @@ func main() {
return
}
defer clientsHandler.Close()
logger.Info("Clients service gRPC client successfully connected to clients gRPC server " + clientsHandler.Secure())
channelsClientCfg := grpcclient.Config{}
@@ -133,8 +155,6 @@ func main() {
defer authnHandler.Close()
logger.Info("authn successfully connected to auth gRPC server " + authnHandler.Secure())
repo := newService(db, logger)
httpServerConfig := server.Config{Port: defSvcHTTPPort}
if err := env.ParseWithOptions(&httpServerConfig, env.Options{Prefix: envPrefixHTTP}); err != nil {
logger.Error(fmt.Sprintf("failed to load %s HTTP server configuration : %s", svcName, err))
@@ -148,6 +168,12 @@ func main() {
go chc.CallHome(ctx)
}
gs := grpcserver.NewServer(ctx, cancel, svcName, grpcServerConfig, registerReadersServiceServer, logger)
g.Go(func() error {
return gs.Start()
})
g.Go(func() error {
return hs.Start()
})
@@ -163,9 +189,9 @@ func main() {
func newService(db *sqlx.DB, logger *slog.Logger) readers.MessageRepository {
svc := postgres.New(db)
svc = httpapi.LoggingMiddleware(svc, logger)
svc = middleware.LoggingMiddleware(svc, logger)
counter, latency := prometheus.MakeMetrics("postgres", "message_reader")
svc = httpapi.MetricsMiddleware(svc, counter, latency)
svc = middleware.MetricsMiddleware(svc, counter, latency)
return svc
}
+30 -3
View File
@@ -12,7 +12,10 @@ import (
"os"
chclient "github.com/absmach/callhome/pkg/client"
httpapi "github.com/absmach/magistrala/readers/api"
grpcReadersV1 "github.com/absmach/magistrala/api/grpc/readers/v1"
readersgrpcapi "github.com/absmach/magistrala/readers/api/grpc"
httpapi "github.com/absmach/magistrala/readers/api/http"
middleware "github.com/absmach/magistrala/readers/middleware"
"github.com/absmach/magistrala/readers/timescale"
"github.com/absmach/supermq"
smqlog "github.com/absmach/supermq/logger"
@@ -21,12 +24,15 @@ import (
pgclient "github.com/absmach/supermq/pkg/postgres"
"github.com/absmach/supermq/pkg/prometheus"
"github.com/absmach/supermq/pkg/server"
grpcserver "github.com/absmach/supermq/pkg/server/grpc"
httpserver "github.com/absmach/supermq/pkg/server/http"
"github.com/absmach/supermq/pkg/uuid"
"github.com/absmach/supermq/readers"
"github.com/caarlos0/env/v11"
"github.com/jmoiron/sqlx"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc"
"google.golang.org/grpc/reflection"
)
const (
@@ -38,6 +44,8 @@ const (
envPrefixChannels = "SMQ_CHANNELS_GRPC_"
defDB = "messages"
defSvcHTTPPort = "9011"
defSvcGRPCPort = "7011"
envPrefixGrpc = "MG_TIMESCALE_READER_GRPC_"
)
type config struct {
@@ -85,6 +93,19 @@ func main() {
repo := newService(db, logger)
grpcServerConfig := server.Config{
Port: defSvcGRPCPort,
}
if err := env.ParseWithOptions(&grpcServerConfig, env.Options{Prefix: envPrefixGrpc}); err != nil {
logger.Error(fmt.Sprintf("failed to load %s gRPC server configuration : %s", svcName, err.Error()))
exitCode = 1
return
}
registerReadersServiceServer := func(srv *grpc.Server) {
reflection.Register(srv)
grpcReadersV1.RegisterReadersServiceServer(srv, readersgrpcapi.NewReadersServer(repo))
}
clientsClientCfg := grpcclient.Config{}
if err := env.ParseWithOptions(&clientsClientCfg, env.Options{Prefix: envPrefixClients}); err != nil {
logger.Error(fmt.Sprintf("failed to load %s auth configuration : %s", svcName, err))
@@ -147,6 +168,12 @@ func main() {
go chc.CallHome(ctx)
}
gs := grpcserver.NewServer(ctx, cancel, svcName, grpcServerConfig, registerReadersServiceServer, logger)
g.Go(func() error {
return gs.Start()
})
g.Go(func() error {
return hs.Start()
})
@@ -162,9 +189,9 @@ func main() {
func newService(db *sqlx.DB, logger *slog.Logger) readers.MessageRepository {
svc := timescale.New(db)
svc = httpapi.LoggingMiddleware(svc, logger)
svc = middleware.LoggingMiddleware(svc, logger)
counter, latency := prometheus.MakeMetrics("timescale", "message_reader")
svc = httpapi.MetricsMiddleware(svc, counter, latency)
svc = middleware.MetricsMiddleware(svc, counter, latency)
return svc
}
+20
View File
@@ -189,9 +189,19 @@ MG_POSTGRES_WRITER_INSTANCE_ID=
MG_POSTGRES_READER_LOG_LEVEL=debug
MG_POSTGRES_READER_HTTP_HOST=postgres-reader
MG_POSTGRES_READER_HTTP_PORT=9009
MG_POSTGRES_READER_GRPC_HOST=postgres-reader
MG_POSTGRES_READER_GRPC_PORT=7009
MG_POSTGRES_READER_HTTP_SERVER_CERT=
MG_POSTGRES_READER_HTTP_SERVER_KEY=
MG_POSTGRES_READER_INSTANCE_ID=
MG_POSTGRES_READER_GRPC_URL=postgres-reader:7011
MG_POSTGRES_READER_GRPC_TIMEOUT=300s
MG_POSTGRES_READER_GRPC_CLIENT_CERT=${GRPC_MTLS:+./ssl/certs/readers-grpc-client.crt}
MG_POSTGRES_READER_GRPC_CLIENT_CA_CERTS=${GRPC_MTLS:+./ssl/certs/ca.crt}
MG_POSTGRES_READER_GRPC_CLIENT_KEY=${GRPC_MTLS:+./ssl/certs/readers-grpc-client.key}
MG_POSTGRES_READER_GRPC_SERVER_CERT=${GRPC_MTLS:+./ssl/certs/readers-grpc-server.crt}${GRPC_TLS:+./ssl/certs/readers-grpc-server.crt}
MG_POSTGRES_READER_GRPC_SERVER_KEY=${GRPC_MTLS:+./ssl/certs/readers-grpc-server.key}${GRPC_TLS:+./ssl/certs/readers-grpc-server.key}
MG_POSTGRES_READER_GRPC_SERVER_CA_CERTS=${GRPC_MTLS:+./ssl/certs/ca.crt}${GRPC_TLS:+./ssl/certs/ca.crt}
### Timescale
MG_TIMESCALE_HOST=timescale
@@ -217,9 +227,19 @@ MG_TIMESCALE_WRITER_INSTANCE_ID=
MG_TIMESCALE_READER_LOG_LEVEL=debug
MG_TIMESCALE_READER_HTTP_HOST=timescale-reader
MG_TIMESCALE_READER_HTTP_PORT=9011
MG_TIMESCALE_READER_GRPC_HOST=timescale-reader
MG_TIMESCALE_READER_GRPC_PORT=7011
MG_TIMESCALE_READER_HTTP_SERVER_CERT=
MG_TIMESCALE_READER_HTTP_SERVER_KEY=
MG_TIMESCALE_READER_INSTANCE_ID=
MG_TIMESCALE_READER_GRPC_URL=timescale-reader:7011
MG_TIMESCALE_READER_GRPC_TIMEOUT=300s
MG_TIMESCALE_READER_GRPC_CLIENT_CERT=${GRPC_MTLS:+./ssl/certs/reader-grpc-client.crt}
MG_TIMESCALE_READER_GRPC_CLIENT_CA_CERTS=${GRPC_MTLS:+./ssl/certs/ca.crt}
MG_TIMESCALE_READER_GRPC_CLIENT_KEY=${GRPC_MTLS:+./ssl/certs/readers-grpc-client.key}
MG_TIMESCALE_READER_GRPC_SERVER_CERT=${GRPC_MTLS:+./ssl/certs/readers-grpc-server.crt}${GRPC_TLS:+./ssl/certs/readers-grpc-server.crt}
MG_TIMESCALE_READER_GRPC_SERVER_KEY=${GRPC_MTLS:+./ssl/certs/readers-grpc-server.key}${GRPC_TLS:+./ssl/certs/readers-grpc-server.key}
MG_TIMESCALE_READER_GRPC_SERVER_CA_CERTS=${GRPC_MTLS:+./ssl/certs/ca.crt}${GRPC_TLS:+./ssl/certs/ca.crt}
### GRAFANA and PROMETHEUS
SMQ_PROMETHEUS_PORT=9090
@@ -35,6 +35,16 @@ services:
MG_THINGS_AUTH_GRPC_CLIENT_CERT: ${MG_THINGS_AUTH_GRPC_CLIENT_CERT:+/things-grpc-client.crt}
SMQ_CLIENTS_GRPC_CLIENT_KEY: ${SMQ_CLIENTS_GRPC_CLIENT_KEY:+/things-grpc-client.key}
SMQ_CLIENTS_GRPC_SERVER_CA_CERTS: ${SMQ_CLIENTS_GRPC_SERVER_CA_CERTS:+/things-grpc-server-ca.crt}
MG_POSTGRES_READER_GRPC_URL: ${MG_POSTGRES_READER_GRPC_URL}
MG_POSTGRES_READER_GRPC_PORT: ${MG_POSTGRES_READER_GRPC_PORT}
MG_POSTGRES_READER_GRPC_HOST: ${MG_POSTGRES_READER_GRPC_HOST}
MG_POSTGRES_READER_GRPC_TIMEOUT: ${MG_POSTGRES_READER_GRPC_TIMEOUT}
MG_POSTGRES_READER_GRPC_CLIENT_CERT: ${MG_POSTGRES_READER_GRPC_CLIENT_CERT:+./ssl/certs/reader-grpc-client.crt}
MG_POSTGRES_READER_GRPC_CLIENT_CA_CERTS: ${MG_POSTGRES_READER_GRPC_CLIENT_CA_CERTS:+./ssl/certs/ca.crt}
MG_POSTGRES_READER_GRPC_SERVER_CA_CERTS: ${MG_POSTGRES_READER_GRPC_SERVER_CA_CERTS:+./ssl/certs/ca.crt}
MG_POSTGRES_READER_GRPC_CLIENT_KEY: ${MG_POSTGRES_READER_GRPC_CLIENT_KEY:+/readers-grpc-client.key}
MG_POSTGRES_READER_GRPC_SERVER_CERT: ${MG_POSTGRES_READER_GRPC_SERVER_CERT:+./ssl/certs/readers-grpc-server.crt}
MG_POSTGRES_READER_GRPC_SERVER_KEY: ${MG_POSTGRES_READER_GRPC_SERVER_KEY:+./ssl/certs/readers-grpc-server.key}
SMQ_AUTH_GRPC_URL: ${SMQ_AUTH_GRPC_URL}
SMQ_AUTH_GRPC_TIMEOUT: ${SMQ_AUTH_GRPC_TIMEOUT}
SMQ_AUTH_GRPC_CLIENT_CERT: ${SMQ_AUTH_GRPC_CLIENT_CERT:+/auth-grpc-client.crt}
@@ -44,6 +54,7 @@ services:
MG_POSTGRES_READER_INSTANCE_ID: ${MG_POSTGRES_READER_INSTANCE_ID}
ports:
- ${MG_POSTGRES_READER_HTTP_PORT}:${MG_POSTGRES_READER_HTTP_PORT}
- ${MG_POSTGRES_READER_GRPC_PORT}:${MG_POSTGRES_READER_GRPC_PORT}
networks:
- magistrala-base-net
volumes:
@@ -78,3 +89,34 @@ services:
target: /things-grpc-server-ca${SMQ_CLIENTS_GRPC_SERVER_CA_CERTS:+.crt}
bind:
create_host_path: true
# Reader gRPC mTLS client certificates
- type: bind
source: ${MG_POSTGRES_READER_GRPC_SERVER_CERT:-ssl/certs/dummy/server_cert}
target: /readers-grpc-server${MG_POSTGRES_READER_GRPC_SERVER_CERT:+.crt}
bind:
create_host_path: true
- type: bind
source: ${MG_POSTGRES_READER_GRPC_SERVER_KEY:-ssl/certs/dummy/server_key}
target: /readers-grpc-server${MG_POSTGRES_READER_GRPC_SERVER_KEY:+.key}
bind:
create_host_path: true
- type: bind
source: ${MG_POSTGRES_READER_GRPC_SERVER_CA_CERTS:-ssl/certs/dummy/server_ca_certs}
target: /readers-grpc-server-ca${MG_POSTGRES_READER_GRPC_SERVER_CA_CERTS:+.crt}
bind:
create_host_path: true
- type: bind
source: ${MG_POSTGRES_READER_GRPC_CLIENT_CA_CERTS:-ssl/certs/dummy/client_ca_certs}
target: /readers-grpc-server${MG_POSTGRES_READER_GRPC_CLIENT_CA_CERTS:+.crt}
bind:
create_host_path: true
- type: bind
source: ${MG_POSTGRES_READER_GRPC_CLIENT_CERT:-ssl/certs/dummy/client_cert}
target: /readers-grpc-client${MG_POSTGRES_READER_GRPC_CLIENT_CERT:+.crt}
bind:
create_host_path: true
- type: bind
source: ${MG_POSTGRES_READER_GRPC_CLIENT_KEY:-ssl/certs/dummy/client_key}
target: /readers-grpc-client${MG_POSTGRES_READER_GRPC_CLIENT_KEY:+.key}
bind:
create_host_path: true
@@ -9,6 +9,7 @@
networks:
magistrala-base-net:
driver: bridge
services:
timescale-reader:
@@ -40,6 +41,16 @@ services:
SMQ_CHANNELS_GRPC_CLIENT_CERT: ${SMQ_CHANNELS_GRPC_CLIENT_CERT:+/channels-grpc-client.crt}
SMQ_CHANNELS_GRPC_CLIENT_KEY: ${SMQ_CHANNELS_GRPC_CLIENT_KEY:+/channels-grpc-client.key}
SMQ_CHANNELS_GRPC_SERVER_CA_CERTS: ${SMQ_CHANNELS_GRPC_SERVER_CA_CERTS:+/channels-grpc-server-ca.crt}
MG_TIMESCALE_READER_GRPC_URL: ${MG_TIMESCALE_READER_GRPC__URL}
MG_TIMESCALE_READER_GRPC_PORT: ${MG_TIMESCALE_READER_GRPC_PORT}
MG_TIMESCALE_READER_GRPC_HOST: ${MG_TIMESCALE_READER_GRPC_HOST}
MG_TIMESCALE_READER_GRPC_TIMEOUT: ${MG_TIMESCALE_READER_GRPC_TIMEOUT}
MG_TIMESCALE_READER_GRPC_CLIENT_CERT: ${MG_TIMESCALE_READER_GRPC_CLIENT_CERT:+./ssl/certs/reader-grpc-client.crt}
MG_TIMESCALE_READER_GRPC_CLIENT_CA_CERTS: ${MG_TIMESCALE_READER_GRPC_CLIENT_CA_CERTS:+./ssl/certs/ca.crt}
MG_TIMESCALE_READER_GRPC_SERVER_CA_CERTS: ${MG_TIMESCALE_READER_GRPC_SERVER_CA_CERTS:+./ssl/certs/ca.crt}
MG_TIMESCALE_READER_GRPC_CLIENT_KEY: ${MG_TIMESCALE_READER_GRPC_CLIENT_KEY:+/readers-grpc-client.key}
MG_TIMESCALE_READER_GRPC_SERVER_CERT: ${MG_TIMESCALE_READER_GRPC_SERVER_CERT:+./ssl/certs/readers-grpc-server.crt}
MG_TIMESCALE_READER_GRPC_SERVER_KEY: ${MG_TIMESCALE_READER_GRPC_SERVER_KEY:+./ssl/certs/readers-grpc-server.key}
SMQ_AUTH_GRPC_URL: ${SMQ_AUTH_GRPC_URL}
SMQ_AUTH_GRPC_TIMEOUT: ${SMQ_AUTH_GRPC_TIMEOUT}
SMQ_AUTH_GRPC_CLIENT_CERT: ${SMQ_AUTH_GRPC_CLIENT_CERT:+/auth-grpc-client.crt}
@@ -48,7 +59,8 @@ services:
SMQ_SEND_TELEMETRY: ${SMQ_SEND_TELEMETRY}
MG_TIMESCALE_READER_INSTANCE_ID: ${MG_TIMESCALE_READER_INSTANCE_ID}
ports:
- ${MG_TIMESCALE_READER_HTTP_PORT}:${MG_TIMESCALE_READER_HTTP_PORT}
- ${MG_TIMESCALE_READER_HTTP_PORT}:${MG_TIMESCALE_READER_HTTP_PORT}
- ${MG_TIMESCALE_READER_GRPC_PORT}:${MG_TIMESCALE_READER_GRPC_PORT}
networks:
- magistrala-base-net
volumes:
@@ -83,3 +95,34 @@ services:
target: /things-grpc-server-ca${SMQ_CLIENTS_GRPC_SERVER_CA_CERTS:+.crt}
bind:
create_host_path: true
# Reader gRPC mTLS client certificates
- type: bind
source: ${MG_TIMESCALE_READER_GRPC_SERVER_CERT:-ssl/certs/dummy/server_cert}
target: /readers-grpc-server${MG_TIMESCALE_READER_GRPC_SERVER_CERT:+.crt}
bind:
create_host_path: true
- type: bind
source: ${MG_TIMESCALE_READER_GRPC_SERVER_KEY:-ssl/certs/dummy/server_key}
target: /readers-grpc-server${MG_TIMESCALE_READER_GRPC_SERVER_KEY:+.key}
bind:
create_host_path: true
- type: bind
source: ${MG_TIMESCALE_READER_GRPC_SERVER_CA_CERTS:-ssl/certs/dummy/server_ca_certs}
target: /readers-grpc-server-ca${MG_TIMESCALE_READER_GRPC_SERVER_CA_CERTS:+.crt}
bind:
create_host_path: true
- type: bind
source: ${MG_TIMESCALE_READER_GRPC_CLIENT_CA_CERTS:-ssl/certs/dummy/client_ca_certs}
target: /readers-grpc-server${MG_TIMESCALE_READER_GRPC_CLIENT_CA_CERTS:+.crt}
bind:
create_host_path: true
- type: bind
source: ${MG_TIMESCALE_READER_GRPC_CLIENT_CERT:-ssl/certs/dummy/client_cert}
target: /readers-grpc-client${MG_TIMESCALE_READER_GRPC_CLIENT_CERT:+.crt}
bind:
create_host_path: true
- type: bind
source: ${MG_TIMESCALE_READER_GRPC_CLIENT_KEY:-ssl/certs/dummy/client_key}
target: /readers-grpc-client${MG_TIMESCALE_READER_GRPC_CLIENT_KEY:+.key}
bind:
create_host_path: true
+1 -1
View File
@@ -139,7 +139,7 @@ require (
golang.org/x/text v0.23.0 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20250303144028-a0af3efb3deb // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20250303144028-a0af3efb3deb // indirect
google.golang.org/protobuf v1.36.6 // indirect
google.golang.org/protobuf v1.36.6
gopkg.in/alexcesaro/quotedprintable.v3 v3.0.0-20150716171945-2caba252f4dc // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
+90
View File
@@ -0,0 +1,90 @@
// Copyright (c) Abstract Machines
// SPDX-License-Identifier: Apache-2.0
syntax = "proto3";
package readers.v1;
option go_package = "github.com/absmach/magistrala/api/grpc/readers/v1";
// ReadersService is a service that provides access to
// readers functionalities for Magistrala services.
service ReadersService {
rpc ReadMessages(ReadMessagesReq)
returns (ReadMessagesRes) {}
}
message PageMetadata {
uint64 limit = 1;
uint64 offset = 2;
string protocol = 3;
string name = 4;
double value = 5;
string publisher = 6;
bool bool_value = 7;
string string_value = 8;
string data_value = 9;
double from = 10;
double to = 11;
string subtopic = 12;
string interval = 13;
bool read = 14;
Aggregation aggregation = 15;
string comparator = 16;
string format = 17;
}
message ReadMessagesRes {
uint64 total = 1;
PageMetadata page_metadata = 2;
repeated Message messages = 3;
}
message Message {
oneof payload {
SenMLMessage senml = 1;
JsonMessage json = 2;
}
}
message BaseMessage {
string channel = 1;
string subtopic = 2;
string publisher = 3;
string protocol = 4;
}
message SenMLMessage {
BaseMessage base = 1;
string name = 2;
string unit = 3;
double time = 4;
double update_time = 5;
double value = 6;
string string_value = 7;
string data_value = 8;
bool bool_value = 9;
double sum = 10;
}
message JsonMessage {
BaseMessage base = 1;
int64 created = 2;
bytes payload = 3;
}
message ReadMessagesReq {
string channel_id = 1;
string domain_id = 2;
PageMetadata page_metadata = 3;
}
// Aggregation defines supported data aggregations.
enum Aggregation {
AGGREGATION_UNSPECIFIED = 0;
MAX = 1;
MIN = 2;
SUM = 3;
COUNT = 4;
AVG = 5;
}
+1 -1
View File
@@ -9,7 +9,7 @@ import (
"testing"
sdk "github.com/absmach/magistrala/pkg/sdk"
readersapi "github.com/absmach/magistrala/readers/api"
readersapi "github.com/absmach/magistrala/readers/api/http"
grpcChannelsV1 "github.com/absmach/supermq/api/grpc/channels/v1"
apiutil "github.com/absmach/supermq/api/http/util"
chmocks "github.com/absmach/supermq/channels/mocks"
+234
View File
@@ -0,0 +1,234 @@
// Copyright (c) Abstract Machines
// SPDX-License-Identifier: Apache-2.0
package grpc
import (
"context"
"encoding/json"
"fmt"
"strings"
"time"
grpcReadersV1 "github.com/absmach/magistrala/api/grpc/readers/v1"
"github.com/absmach/magistrala/pkg/errors"
svcerr "github.com/absmach/supermq/pkg/errors/service"
"github.com/absmach/supermq/pkg/transformers/senml"
readers "github.com/absmach/supermq/readers"
"github.com/go-kit/kit/endpoint"
kitgrpc "github.com/go-kit/kit/transport/grpc"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
const readersSvcName = "readers.v1.ReadersService"
var _ grpcReadersV1.ReadersServiceClient = (*readersGrpcClient)(nil)
type readersGrpcClient struct {
readMessages endpoint.Endpoint
timeout time.Duration
}
// NewReadersClient returns new readers gRPC client instance.
func NewReadersClient(conn *grpc.ClientConn, timeout time.Duration) grpcReadersV1.ReadersServiceClient {
return &readersGrpcClient{
readMessages: kitgrpc.NewClient(
conn,
readersSvcName,
"ReadMessages",
encodeReadMessagesRequest,
decodeReadMessagesResponse,
grpcReadersV1.ReadMessagesRes{},
).Endpoint(),
timeout: timeout,
}
}
func (client readersGrpcClient) ReadMessages(ctx context.Context, in *grpcReadersV1.ReadMessagesReq, opts ...grpc.CallOption) (*grpcReadersV1.ReadMessagesRes, error) {
ctx, cancel := context.WithTimeout(ctx, client.timeout)
defer cancel()
res, err := client.readMessages(ctx, readMessagesReq{
chanID: in.GetChannelId(),
domain: in.GetDomainId(),
pageMeta: readers.PageMetadata{
Offset: in.GetPageMetadata().GetOffset(),
Limit: in.GetPageMetadata().GetLimit(),
Comparator: in.GetPageMetadata().GetComparator(),
Aggregation: in.GetPageMetadata().GetAggregation().String(),
From: in.GetPageMetadata().GetFrom(),
To: in.GetPageMetadata().GetTo(),
Interval: in.GetPageMetadata().GetInterval(),
Subtopic: in.GetPageMetadata().GetSubtopic(),
Publisher: in.GetPageMetadata().GetPublisher(),
Protocol: in.GetPageMetadata().GetProtocol(),
Name: in.GetPageMetadata().GetName(),
Value: in.GetPageMetadata().GetValue(),
BoolValue: in.GetPageMetadata().GetBoolValue(),
StringValue: in.GetPageMetadata().GetStringValue(),
DataValue: in.GetPageMetadata().GetDataValue(),
Format: in.GetPageMetadata().GetFormat(),
},
})
if err != nil {
return &grpcReadersV1.ReadMessagesRes{}, decodeError(err)
}
dpr := res.(readMessagesRes)
return &grpcReadersV1.ReadMessagesRes{
Total: dpr.Total,
Messages: toResponseMessages(dpr.Messages),
PageMetadata: &grpcReadersV1.PageMetadata{
Offset: dpr.PageMetadata.Offset,
Limit: dpr.PageMetadata.Limit,
},
}, nil
}
func decodeReadMessagesResponse(_ context.Context, grpcRes interface{}) (interface{}, error) {
res := grpcRes.(*grpcReadersV1.ReadMessagesRes)
return readMessagesRes{
Total: res.Total,
Messages: fromResponseMessages(res.Messages),
PageMetadata: readers.PageMetadata{
Offset: res.GetPageMetadata().GetOffset(),
Limit: res.GetPageMetadata().GetLimit(),
},
}, nil
}
func encodeReadMessagesRequest(_ context.Context, grpcReq interface{}) (interface{}, error) {
req := grpcReq.(readMessagesReq)
return &grpcReadersV1.ReadMessagesReq{
ChannelId: req.chanID,
DomainId: req.domain,
PageMetadata: &grpcReadersV1.PageMetadata{
Offset: req.pageMeta.Offset,
Limit: req.pageMeta.Limit,
Comparator: req.pageMeta.Comparator,
Aggregation: parseAggregation(req.pageMeta.Aggregation),
From: req.pageMeta.From,
To: req.pageMeta.To,
Interval: req.pageMeta.Interval,
Subtopic: req.pageMeta.Subtopic,
Publisher: req.pageMeta.Publisher,
Protocol: req.pageMeta.Protocol,
Name: req.pageMeta.Name,
Value: req.pageMeta.Value,
BoolValue: req.pageMeta.BoolValue,
StringValue: req.pageMeta.StringValue,
DataValue: req.pageMeta.DataValue,
Format: req.pageMeta.Format,
},
}, nil
}
func fromResponseMessages(protoMessages []*grpcReadersV1.Message) []readers.Message {
var messages []readers.Message
for _, m := range protoMessages {
switch msg := m.Payload.(type) {
case *grpcReadersV1.Message_Senml:
s := msg.Senml
base := s.GetBase()
typed := senml.Message{
Channel: base.GetChannel(),
Subtopic: base.GetSubtopic(),
Publisher: base.GetPublisher(),
Protocol: base.GetProtocol(),
Name: s.GetName(),
Unit: s.GetUnit(),
Time: s.GetTime(),
UpdateTime: s.GetUpdateTime(),
Value: optionalFloat64(s.GetValue()),
StringValue: optionalString(s.GetStringValue()),
DataValue: optionalString(s.GetDataValue()),
BoolValue: optionalBool(s.GetBoolValue()),
Sum: optionalFloat64(s.GetSum()),
}
messages = append(messages, typed)
case *grpcReadersV1.Message_Json:
j := msg.Json
base := j.GetBase()
var p map[string]interface{}
if err := json.Unmarshal(j.GetPayload(), &p); err != nil {
continue
}
messages = append(messages, map[string]interface{}{
"channel": base.GetChannel(),
"created": j.GetCreated(),
"subtopic": base.GetSubtopic(),
"publisher": base.GetPublisher(),
"protocol": base.GetProtocol(),
"payload": p,
})
}
}
return messages
}
func parseAggregation(agg string) grpcReadersV1.Aggregation {
switch strings.ToUpper(agg) {
case "MAX":
return grpcReadersV1.Aggregation_MAX
case "MIN":
return grpcReadersV1.Aggregation_MIN
case "SUM":
return grpcReadersV1.Aggregation_SUM
case "COUNT":
return grpcReadersV1.Aggregation_COUNT
case "AVG":
return grpcReadersV1.Aggregation_AVG
default:
return grpcReadersV1.Aggregation_AGGREGATION_UNSPECIFIED
}
}
func decodeError(err error) error {
if st, ok := status.FromError(err); ok {
switch st.Code() {
case codes.Unauthenticated:
return errors.Wrap(svcerr.ErrAuthentication, errors.New(st.Message()))
case codes.PermissionDenied:
return errors.Wrap(svcerr.ErrAuthorization, errors.New(st.Message()))
case codes.InvalidArgument:
return errors.Wrap(errors.ErrMalformedEntity, errors.New(st.Message()))
case codes.FailedPrecondition:
return errors.Wrap(errors.ErrMalformedEntity, errors.New(st.Message()))
case codes.NotFound:
return errors.Wrap(svcerr.ErrNotFound, errors.New(st.Message()))
case codes.AlreadyExists:
return errors.Wrap(svcerr.ErrConflict, errors.New(st.Message()))
case codes.OK:
if msg := st.Message(); msg != "" {
return errors.Wrap(errors.ErrUnidentified, errors.New(msg))
}
return nil
default:
return errors.Wrap(fmt.Errorf("unexpected gRPC status: %s (status code:%v)", st.Code().String(), st.Code()), errors.New(st.Message()))
}
}
return err
}
func optionalString(v string) *string {
if v == "" {
return nil
}
return &v
}
func optionalFloat64(v float64) *float64 {
if v == 0 {
return nil
}
return &v
}
func optionalBool(v bool) *bool {
if !v {
return nil
}
return &v
}
+5
View File
@@ -0,0 +1,5 @@
// Copyright (c) Abstract Machines
// SPDX-License-Identifier: Apache-2.0
// Package grpc contains implementation of Readers service gRPC API.
package grpc
+31
View File
@@ -0,0 +1,31 @@
// Copyright (c) Abstract Machines
// SPDX-License-Identifier: Apache-2.0
package grpc
import (
"context"
readers "github.com/absmach/supermq/readers"
"github.com/go-kit/kit/endpoint"
)
func readMessagesEndpoint(svc readers.MessageRepository) endpoint.Endpoint {
return func(ctx context.Context, request interface{}) (interface{}, error) {
req := request.(readMessagesReq)
if err := req.validate(); err != nil {
return readMessagesRes{}, err
}
page, err := svc.ReadAll(req.chanID, req.pageMeta)
if err != nil {
return readMessagesRes{}, err
}
return readMessagesRes{
PageMetadata: page.PageMetadata,
Total: page.Total,
Messages: page.Messages,
}, nil
}
}
+230
View File
@@ -0,0 +1,230 @@
// Copyright (c) Abstract Machines
// SPDX-License-Identifier: Apache-2.0
package grpc_test
import (
"context"
"encoding/json"
"fmt"
"net"
"testing"
"time"
grpcReadersV1 "github.com/absmach/magistrala/api/grpc/readers/v1"
"github.com/absmach/magistrala/pkg/errors"
grpcapi "github.com/absmach/magistrala/readers/api/grpc"
apiutil "github.com/absmach/supermq/api/http/util"
"github.com/absmach/supermq/pkg/transformers/senml"
"github.com/absmach/supermq/readers"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
)
const (
port = 7071
channelID = "testChannelID"
domain = "testDomain"
validID = "validID"
validToken = "valid"
inValidToken = "invalid"
testOffset = 0
testLimit = 10
)
var authAddr = fmt.Sprintf("localhost:%d", port)
func startGRPCServer(svc readers.MessageRepository, port int) *grpc.Server {
listener, _ := net.Listen("tcp", fmt.Sprintf(":%d", port))
server := grpc.NewServer()
grpcReadersV1.RegisterReadersServiceServer(server, grpcapi.NewReadersServer(svc))
go func() {
err := server.Serve(listener)
assert.Nil(&testing.T{}, err, fmt.Sprintf(`"Unexpected error creating reader server %s"`, err))
}()
return server
}
func TestReadMessages(t *testing.T) {
conn, err := grpc.NewClient(authAddr, grpc.WithTransportCredentials(insecure.NewCredentials()))
assert.Nil(t, err, fmt.Sprintf("Unexpected error creating client connection %s", err))
grpcClient := grpcapi.NewReadersClient(conn, time.Second)
tmp := readers.MessagesPage{
Total: 1,
PageMetadata: readers.PageMetadata{
Offset: 0,
Limit: 10,
},
Messages: []readers.Message{
map[string]interface{}{
"channel": "testChannel",
"created": int64(123456789),
"subtopic": "testSubtopic",
"publisher": "testPublisher",
"protocol": "testProtocol",
"payload": map[string]interface{}{
"temp": 23.5,
},
},
},
}
expectedPayload, err := json.Marshal(tmp.Messages[0].(map[string]interface{})["payload"])
require.NoError(t, err)
expectedRes := &grpcReadersV1.ReadMessagesRes{
Total: 1,
Messages: []*grpcReadersV1.Message{
{
Payload: &grpcReadersV1.Message_Json{
Json: &grpcReadersV1.JsonMessage{
Base: &grpcReadersV1.BaseMessage{
Channel: "testChannel",
Subtopic: "testSubtopic",
Publisher: "testPublisher",
Protocol: "testProtocol",
},
Created: 123456789,
Payload: expectedPayload,
},
},
},
},
PageMetadata: &grpcReadersV1.PageMetadata{
Offset: 0,
Limit: 10,
},
}
cases := []struct {
desc string
token string
svcRes readers.MessagesPage
ReadMessagesReq *grpcReadersV1.ReadMessagesReq
ReadMessagesRes *grpcReadersV1.ReadMessagesRes
err error
}{
{
desc: "read valid req",
token: validToken,
ReadMessagesReq: &grpcReadersV1.ReadMessagesReq{
ChannelId: channelID,
DomainId: domain,
PageMetadata: &grpcReadersV1.PageMetadata{
Offset: testOffset,
Limit: testLimit,
},
},
svcRes: tmp,
ReadMessagesRes: expectedRes,
err: nil,
},
{
desc: " read missing channel id",
token: validToken,
ReadMessagesReq: &grpcReadersV1.ReadMessagesReq{
ChannelId: "",
DomainId: domain,
PageMetadata: &grpcReadersV1.PageMetadata{
Offset: testOffset,
Limit: testLimit,
},
},
ReadMessagesRes: &grpcReadersV1.ReadMessagesRes{},
err: apiutil.ErrMissingID,
},
{
desc: "read valid SenML message",
token: validToken,
ReadMessagesReq: &grpcReadersV1.ReadMessagesReq{
ChannelId: channelID,
DomainId: domain,
PageMetadata: &grpcReadersV1.PageMetadata{
Offset: testOffset,
Limit: testLimit,
},
},
svcRes: readers.MessagesPage{
Total: 1,
PageMetadata: readers.PageMetadata{
Offset: 0,
Limit: 10,
},
Messages: []readers.Message{
senml.Message{
Channel: "senmlChannel",
Subtopic: "senmlSub",
Publisher: "senmlPublisher",
Protocol: "mqtt",
Name: "temperature",
Unit: "C",
Time: 1672531200,
UpdateTime: 1672531300,
Value: float64Ptr(22.5),
StringValue: stringPtr("ok"),
DataValue: stringPtr("binary"),
BoolValue: boolPtr(true),
Sum: float64Ptr(123.4),
},
},
},
ReadMessagesRes: &grpcReadersV1.ReadMessagesRes{
Total: 1,
PageMetadata: &grpcReadersV1.PageMetadata{
Offset: 0,
Limit: 10,
},
Messages: []*grpcReadersV1.Message{
{
Payload: &grpcReadersV1.Message_Senml{
Senml: &grpcReadersV1.SenMLMessage{
Base: &grpcReadersV1.BaseMessage{
Channel: "senmlChannel",
Subtopic: "senmlSub",
Publisher: "senmlPublisher",
Protocol: "mqtt",
},
Name: "temperature",
Unit: "C",
Time: 1672531200,
UpdateTime: 1672531300,
Value: 22.5,
StringValue: "ok",
DataValue: "binary",
BoolValue: true,
Sum: 123.4,
},
},
},
},
},
},
}
for _, tc := range cases {
repoCall := svc.On("ReadAll", mock.Anything, mock.Anything).Return(tc.svcRes, tc.err)
dpr, err := grpcClient.ReadMessages(context.Background(), tc.ReadMessagesReq)
assert.Equal(t, tc.ReadMessagesRes.Messages, dpr.Messages, fmt.Sprintf("%s: expected %v got %v", tc.desc, tc.ReadMessagesRes.Messages, dpr.Messages))
assert.True(t, errors.Contains(err, tc.err), fmt.Sprintf("%s: expected %s got %s\n", tc.desc, tc.err, err))
repoCall.Unset()
}
}
func float64Ptr(v float64) *float64 {
return &v
}
func stringPtr(v string) *string {
return &v
}
func boolPtr(v bool) *bool {
return &v
}
+69
View File
@@ -0,0 +1,69 @@
// Copyright (c) Abstract Machines
// SPDX-License-Identifier: Apache-2.0
package grpc
import (
"slices"
"strings"
"time"
apiutil "github.com/absmach/supermq/api/http/util"
"github.com/absmach/supermq/readers"
)
const maxLimitSize = 1000
var validAggregations = []string{"MAX", "MIN", "AVG", "SUM", "COUNT"}
type readMessagesReq struct {
chanID string
domain string
pageMeta readers.PageMetadata
}
func (req readMessagesReq) validate() error {
if req.chanID == "" {
return apiutil.ErrMissingID
}
if req.domain == "" {
return apiutil.ErrMissingID
}
if req.pageMeta.Limit < 1 || req.pageMeta.Limit > maxLimitSize {
return apiutil.ErrLimitSize
}
if req.pageMeta.Comparator != "" &&
req.pageMeta.Comparator != readers.EqualKey &&
req.pageMeta.Comparator != readers.LowerThanKey &&
req.pageMeta.Comparator != readers.LowerThanEqualKey &&
req.pageMeta.Comparator != readers.GreaterThanKey &&
req.pageMeta.Comparator != readers.GreaterThanEqualKey {
return apiutil.ErrInvalidComparator
}
if req.pageMeta.Aggregation == "AGGREGATION_UNSPECIFIED" {
req.pageMeta.Aggregation = ""
}
if agg := strings.ToUpper(req.pageMeta.Aggregation); agg != "" && agg != "AGGREGATION_UNSPECIFIED" {
if req.pageMeta.From == 0 {
return apiutil.ErrMissingFrom
}
if req.pageMeta.To == 0 {
return apiutil.ErrMissingTo
}
if !slices.Contains(validAggregations, strings.ToUpper(req.pageMeta.Aggregation)) {
return apiutil.ErrInvalidAggregation
}
if _, err := time.ParseDuration(req.pageMeta.Interval); err != nil {
return apiutil.ErrInvalidInterval
}
}
return nil
}
+16
View File
@@ -0,0 +1,16 @@
// Copyright (c) Abstract Machines
// SPDX-License-Identifier: Apache-2.0
package grpc
import (
"github.com/absmach/supermq/readers"
)
type readMessagesRes struct {
Total uint64
Messages []readers.Message
readers.PageMetadata
}
type Message interface{}
+189
View File
@@ -0,0 +1,189 @@
// Copyright (c) Abstract Machines
// SPDX-License-Identifier: Apache-2.0
package grpc
import (
"context"
"encoding/json"
grpcReadersV1 "github.com/absmach/magistrala/api/grpc/readers/v1"
grpcapi "github.com/absmach/supermq/auth/api/grpc"
"github.com/absmach/supermq/pkg/transformers/senml"
"github.com/absmach/supermq/readers"
kitgrpc "github.com/go-kit/kit/transport/grpc"
)
var _ grpcReadersV1.ReadersServiceServer = (*readersGrpcServer)(nil)
type readersGrpcServer struct {
grpcReadersV1.UnimplementedReadersServiceServer
readMessages kitgrpc.Handler
}
func NewReadersServer(svc readers.MessageRepository) grpcReadersV1.ReadersServiceServer {
return &readersGrpcServer{
readMessages: kitgrpc.NewServer(
(readMessagesEndpoint(svc)),
decodeReadMessagesRequest,
encodeReadMessagesResponse,
),
}
}
func decodeReadMessagesRequest(_ context.Context, grpcReq interface{}) (interface{}, error) {
req := grpcReq.(*grpcReadersV1.ReadMessagesReq)
return readMessagesReq{
chanID: req.GetChannelId(),
domain: req.GetDomainId(),
pageMeta: readers.PageMetadata{
Offset: req.GetPageMetadata().GetOffset(),
Limit: req.GetPageMetadata().GetLimit(),
Comparator: req.GetPageMetadata().GetComparator(),
Aggregation: stringifyAggregation(req.GetPageMetadata().GetAggregation()),
From: req.GetPageMetadata().GetFrom(),
To: req.GetPageMetadata().GetTo(),
Interval: req.GetPageMetadata().GetInterval(),
Subtopic: req.GetPageMetadata().GetSubtopic(),
Publisher: req.GetPageMetadata().GetPublisher(),
Protocol: req.GetPageMetadata().GetProtocol(),
Name: req.GetPageMetadata().GetName(),
Value: req.GetPageMetadata().GetValue(),
BoolValue: req.GetPageMetadata().GetBoolValue(),
StringValue: req.GetPageMetadata().GetStringValue(),
DataValue: req.GetPageMetadata().GetDataValue(),
Format: req.GetPageMetadata().GetFormat(),
},
}, nil
}
func encodeReadMessagesResponse(_ context.Context, grpcRes interface{}) (interface{}, error) {
res := grpcRes.(readMessagesRes)
resp := &grpcReadersV1.ReadMessagesRes{
Total: res.Total,
Messages: toResponseMessages(res.Messages),
PageMetadata: &grpcReadersV1.PageMetadata{
Offset: res.PageMetadata.Offset,
Limit: res.PageMetadata.Limit,
},
}
return resp, nil
}
func (s *readersGrpcServer) ReadMessages(ctx context.Context, req *grpcReadersV1.ReadMessagesReq) (*grpcReadersV1.ReadMessagesRes, error) {
_, res, err := s.readMessages.ServeGRPC(ctx, req)
if err != nil {
return nil, grpcapi.EncodeError(err)
}
return res.(*grpcReadersV1.ReadMessagesRes), nil
}
func toResponseMessages(messages []readers.Message) []*grpcReadersV1.Message {
var res []*grpcReadersV1.Message
for _, m := range messages {
switch typed := m.(type) {
case senml.Message:
res = append(res, &grpcReadersV1.Message{
Payload: &grpcReadersV1.Message_Senml{
Senml: &grpcReadersV1.SenMLMessage{
Base: &grpcReadersV1.BaseMessage{
Channel: typed.Channel,
Subtopic: typed.Subtopic,
Publisher: typed.Publisher,
Protocol: typed.Protocol,
},
Name: typed.Name,
Unit: typed.Unit,
Time: typed.Time,
UpdateTime: typed.UpdateTime,
Value: derefFloat64(typed.Value),
StringValue: derefString(typed.StringValue),
DataValue: derefString(typed.DataValue),
BoolValue: derefBool(typed.BoolValue),
Sum: derefFloat64(typed.Sum),
},
},
})
case map[string]interface{}:
payload := typed["payload"]
data, err := json.Marshal(payload)
if err != nil {
continue
}
res = append(res, &grpcReadersV1.Message{
Payload: &grpcReadersV1.Message_Json{
Json: &grpcReadersV1.JsonMessage{
Base: &grpcReadersV1.BaseMessage{
Channel: safeString(typed["channel"]),
Subtopic: safeString(typed["subtopic"]),
Publisher: safeString(typed["publisher"]),
Protocol: safeString(typed["protocol"]),
},
Created: safeInt64(typed["created"]),
Payload: data,
},
},
})
}
}
return res
}
func stringifyAggregation(agg grpcReadersV1.Aggregation) string {
switch agg {
case grpcReadersV1.Aggregation_AGGREGATION_UNSPECIFIED:
return ""
case grpcReadersV1.Aggregation_MAX:
return "MAX"
case grpcReadersV1.Aggregation_MIN:
return "MIN"
case grpcReadersV1.Aggregation_AVG:
return "AVG"
case grpcReadersV1.Aggregation_SUM:
return "SUM"
case grpcReadersV1.Aggregation_COUNT:
return "COUNT"
default:
return ""
}
}
func derefString(s *string) string {
if s == nil {
return ""
}
return *s
}
func derefFloat64(f *float64) float64 {
if f == nil {
return 0
}
return *f
}
func derefBool(b *bool) bool {
if b == nil {
return false
}
return *b
}
func safeString(v interface{}) string {
if s, ok := v.(string); ok {
return s
}
return ""
}
func safeInt64(v interface{}) int64 {
switch v := v.(type) {
case float64:
return int64(v)
case int64:
return v
default:
return 0
}
}
+24
View File
@@ -0,0 +1,24 @@
// Copyright (c) Abstract Machines
// SPDX-License-Identifier: Apache-2.0
package grpc_test
import (
"os"
"testing"
"github.com/absmach/supermq/readers/mocks"
)
var svc *mocks.MessageRepository
func TestMain(m *testing.M) {
svc = new(mocks.MessageRepository)
server := startGRPCServer(svc, port)
code := m.Run()
server.GracefulStop()
os.Exit(code)
}
@@ -1,7 +1,7 @@
// Copyright (c) Abstract Machines
// SPDX-License-Identifier: Apache-2.0
package api
package http
import (
"context"
@@ -1,7 +1,7 @@
// Copyright (c) Abstract Machines
// SPDX-License-Identifier: Apache-2.0
package api_test
package http_test
import (
"encoding/json"
@@ -12,7 +12,7 @@ import (
"time"
"github.com/absmach/magistrala/internal/testsutil"
"github.com/absmach/magistrala/readers/api"
customhttp "github.com/absmach/magistrala/readers/api/http"
grpcChannelsV1 "github.com/absmach/supermq/api/grpc/channels/v1"
grpcClientsV1 "github.com/absmach/supermq/api/grpc/clients/v1"
apiutil "github.com/absmach/supermq/api/http/util"
@@ -55,7 +55,7 @@ var (
)
func newServer(repo *mocks.MessageRepository, authn *authnmocks.Authentication, clients *climocks.ClientsServiceClient, channels *chmocks.ChannelsServiceClient) *httptest.Server {
mux := api.MakeHandler(repo, authn, clients, channels, svcName, instanceID)
mux := customhttp.MakeHandler(repo, authn, clients, channels, svcName, instanceID)
return httptest.NewServer(mux)
}
@@ -1,7 +1,7 @@
// Copyright (c) Abstract Machines
// SPDX-License-Identifier: Apache-2.0
package api
package http
import (
"slices"
@@ -1,7 +1,7 @@
// Copyright (c) Abstract Machines
// SPDX-License-Identifier: Apache-2.0
package api
package http
import (
"net/http"
@@ -1,7 +1,7 @@
// Copyright (c) Abstract Machines
// SPDX-License-Identifier: Apache-2.0
package api
package http
import (
"context"
+5
View File
@@ -0,0 +1,5 @@
// Copyright (c) Abstract Machines
// SPDX-License-Identifier: Apache-2.0
// Package middleware provides middleware for Magistrala Readers service.
package middleware
@@ -3,7 +3,7 @@
//go:build !test
package api
package middleware
import (
"log/slog"
@@ -3,7 +3,7 @@
//go:build !test
package api
package middleware
import (
"time"
+3 -3
View File
@@ -5,9 +5,9 @@
# This script contains commands to be executed by the CI tool.
NPROC=$(nproc)
GO_VERSION=1.22.4
PROTOC_VERSION=27.1
PROTOC_GEN_VERSION=v1.34.2
PROTOC_GRPC_VERSION=v1.4.0
PROTOC_VERSION=30.2
PROTOC_GEN_VERSION=v1.36.6
PROTOC_GRPC_VERSION=v1.5.1
GOLANGCI_LINT_VERSION=v1.60.3
function version_gt() { test "$(printf '%s\n' "$@" | sort -V | head -n 1)" != "$1"; }