From 478c9907db8600559a3b8af9fd0d17ccf84a1a0e Mon Sep 17 00:00:00 2001 From: Nataly Musilah <115026536+Musilah@users.noreply.github.com> Date: Wed, 9 Apr 2025 12:34:14 +0300 Subject: [PATCH] NOISSUE - Add Readers GRPC Endpoints (#87) * add ui prefix Signed-off-by: Musilah * add grpc backend Signed-off-by: Musilah * revert email utility and ui Signed-off-by: Musilah * update env variables and docker-composes Signed-off-by: Musilah * update env variables Signed-off-by: Musilah * add middleware folder Signed-off-by: Musilah * fix linter Signed-off-by: Musilah * ui commented Signed-off-by: Musilah * resolve comments and conflicts Signed-off-by: Musilah * resolve comments pt2 Signed-off-by: Musilah * remove token from request Signed-off-by: Musilah * resolve comments Signed-off-by: Musilah * revert makefile and docker changes Signed-off-by: Musilah * force open grpc conn Signed-off-by: Musilah * fix linter Signed-off-by: Musilah * refactor components Signed-off-by: Musilah * fix tests Signed-off-by: Musilah * fix query issue Signed-off-by: Musilah * update protoc Signed-off-by: Musilah * rename variables Signed-off-by: Musilah * remove unused envs Signed-off-by: Musilah * use senml and json types for the messages struct Signed-off-by: Musilah * remove repetitive message fields Signed-off-by: Musilah * update protoc version Signed-off-by: Musilah * return ui to docker Signed-off-by: Musilah --------- Signed-off-by: Musilah --- Makefile | 8 +- api/grpc/readers/v1/readers.pb.go | 849 ++++++++++++++++++ api/grpc/readers/v1/readers_grpc.pb.go | 130 +++ cmd/postgres-reader/main.go | 36 +- cmd/timescale-reader/main.go | 33 +- docker/.env | 20 + .../postgres-reader/docker-compose.yaml | 42 + .../timescale-reader/docker-compose.yaml | 45 +- go.mod | 2 +- internal/proto/readers/v1/readers.proto | 90 ++ pkg/sdk/messages_test.go | 2 +- readers/api/grpc/client.go | 234 +++++ readers/api/grpc/doc.go | 5 + readers/api/grpc/endpoint.go | 31 + readers/api/grpc/endpoint_test.go | 230 +++++ readers/api/grpc/request.go | 69 ++ readers/api/grpc/responses.go | 16 + readers/api/grpc/server.go | 189 ++++ readers/api/grpc/setup_test.go | 24 + readers/api/{ => http}/endpoint.go | 2 +- readers/api/{ => http}/endpoint_test.go | 6 +- readers/api/{ => http}/requests.go | 2 +- readers/api/{ => http}/responses.go | 2 +- readers/api/{ => http}/transport.go | 2 +- readers/middleware/doc.go | 5 + readers/{api => middleware}/logging.go | 2 +- readers/{api => middleware}/metrics.go | 2 +- scripts/ci.sh | 6 +- 28 files changed, 2059 insertions(+), 25 deletions(-) create mode 100644 api/grpc/readers/v1/readers.pb.go create mode 100644 api/grpc/readers/v1/readers_grpc.pb.go create mode 100644 internal/proto/readers/v1/readers.proto create mode 100644 readers/api/grpc/client.go create mode 100644 readers/api/grpc/doc.go create mode 100644 readers/api/grpc/endpoint.go create mode 100644 readers/api/grpc/endpoint_test.go create mode 100644 readers/api/grpc/request.go create mode 100644 readers/api/grpc/responses.go create mode 100644 readers/api/grpc/server.go create mode 100644 readers/api/grpc/setup_test.go rename readers/api/{ => http}/endpoint.go (98%) rename readers/api/{ => http}/endpoint_test.go (99%) rename readers/api/{ => http}/requests.go (99%) rename readers/api/{ => http}/responses.go (97%) rename readers/api/{ => http}/transport.go (99%) create mode 100644 readers/middleware/doc.go rename readers/{api => middleware}/logging.go (98%) rename readers/{api => middleware}/metrics.go (98%) diff --git a/Makefile b/Makefile index c04bc48eb..f7eb8b2fb 100644 --- a/Makefile +++ b/Makefile @@ -20,6 +20,10 @@ DOCKER_COMPOSE_COMMANDS_SUPPORTED := up down config DEFAULT_DOCKER_COMPOSE_COMMAND := up GRPC_MTLS_CERT_FILES_EXISTS = 0 MOCKERY_VERSION=v3.0.0-beta.6 +PKG_PROTO_GEN_OUT_DIR=api/grpc +INTERNAL_PROTO_DIR=internal/proto +INTERNAL_PROTO_FILES := $(shell find $(INTERNAL_PROTO_DIR) -name "*.proto" | sed 's|$(INTERNAL_PROTO_DIR)/||') + ifneq ($(MG_MESSAGE_BROKER_TYPE),) MG_MESSAGE_BROKER_TYPE := $(MG_MESSAGE_BROKER_TYPE) else @@ -174,8 +178,8 @@ $(TEST_API): $(call test_api_service,$(@),$(TEST_API_URL)) proto: - protoc -I. --go_out=. --go_opt=paths=source_relative pkg/messaging/*.proto - protoc -I. --go_out=. --go_opt=paths=source_relative --go-grpc_out=. --go-grpc_opt=paths=source_relative ./*.proto + mkdir -p $(PKG_PROTO_GEN_OUT_DIR) + protoc -I $(INTERNAL_PROTO_DIR) --go_out=$(PKG_PROTO_GEN_OUT_DIR) --go_opt=paths=source_relative --go-grpc_out=$(PKG_PROTO_GEN_OUT_DIR) --go-grpc_opt=paths=source_relative $(INTERNAL_PROTO_FILES) $(FILTERED_SERVICES): $(call compile_service,$(@)) diff --git a/api/grpc/readers/v1/readers.pb.go b/api/grpc/readers/v1/readers.pb.go new file mode 100644 index 000000000..3f74d3880 --- /dev/null +++ b/api/grpc/readers/v1/readers.pb.go @@ -0,0 +1,849 @@ +// Copyright (c) Abstract Machines +// SPDX-License-Identifier: Apache-2.0 + +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.36.6 +// protoc v6.30.2 +// source: readers/v1/readers.proto + +package v1 + +import ( + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + reflect "reflect" + sync "sync" + unsafe "unsafe" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +// Aggregation defines supported data aggregations. +type Aggregation int32 + +const ( + Aggregation_AGGREGATION_UNSPECIFIED Aggregation = 0 + Aggregation_MAX Aggregation = 1 + Aggregation_MIN Aggregation = 2 + Aggregation_SUM Aggregation = 3 + Aggregation_COUNT Aggregation = 4 + Aggregation_AVG Aggregation = 5 +) + +// Enum value maps for Aggregation. +var ( + Aggregation_name = map[int32]string{ + 0: "AGGREGATION_UNSPECIFIED", + 1: "MAX", + 2: "MIN", + 3: "SUM", + 4: "COUNT", + 5: "AVG", + } + Aggregation_value = map[string]int32{ + "AGGREGATION_UNSPECIFIED": 0, + "MAX": 1, + "MIN": 2, + "SUM": 3, + "COUNT": 4, + "AVG": 5, + } +) + +func (x Aggregation) Enum() *Aggregation { + p := new(Aggregation) + *p = x + return p +} + +func (x Aggregation) String() string { + return protoimpl.X.EnumStringOf(x.Descriptor(), protoreflect.EnumNumber(x)) +} + +func (Aggregation) Descriptor() protoreflect.EnumDescriptor { + return file_readers_v1_readers_proto_enumTypes[0].Descriptor() +} + +func (Aggregation) Type() protoreflect.EnumType { + return &file_readers_v1_readers_proto_enumTypes[0] +} + +func (x Aggregation) Number() protoreflect.EnumNumber { + return protoreflect.EnumNumber(x) +} + +// Deprecated: Use Aggregation.Descriptor instead. +func (Aggregation) EnumDescriptor() ([]byte, []int) { + return file_readers_v1_readers_proto_rawDescGZIP(), []int{0} +} + +type PageMetadata struct { + state protoimpl.MessageState `protogen:"open.v1"` + Limit uint64 `protobuf:"varint,1,opt,name=limit,proto3" json:"limit,omitempty"` + Offset uint64 `protobuf:"varint,2,opt,name=offset,proto3" json:"offset,omitempty"` + Protocol string `protobuf:"bytes,3,opt,name=protocol,proto3" json:"protocol,omitempty"` + Name string `protobuf:"bytes,4,opt,name=name,proto3" json:"name,omitempty"` + Value float64 `protobuf:"fixed64,5,opt,name=value,proto3" json:"value,omitempty"` + Publisher string `protobuf:"bytes,6,opt,name=publisher,proto3" json:"publisher,omitempty"` + BoolValue bool `protobuf:"varint,7,opt,name=bool_value,json=boolValue,proto3" json:"bool_value,omitempty"` + StringValue string `protobuf:"bytes,8,opt,name=string_value,json=stringValue,proto3" json:"string_value,omitempty"` + DataValue string `protobuf:"bytes,9,opt,name=data_value,json=dataValue,proto3" json:"data_value,omitempty"` + From float64 `protobuf:"fixed64,10,opt,name=from,proto3" json:"from,omitempty"` + To float64 `protobuf:"fixed64,11,opt,name=to,proto3" json:"to,omitempty"` + Subtopic string `protobuf:"bytes,12,opt,name=subtopic,proto3" json:"subtopic,omitempty"` + Interval string `protobuf:"bytes,13,opt,name=interval,proto3" json:"interval,omitempty"` + Read bool `protobuf:"varint,14,opt,name=read,proto3" json:"read,omitempty"` + Aggregation Aggregation `protobuf:"varint,15,opt,name=aggregation,proto3,enum=readers.v1.Aggregation" json:"aggregation,omitempty"` + Comparator string `protobuf:"bytes,16,opt,name=comparator,proto3" json:"comparator,omitempty"` + Format string `protobuf:"bytes,17,opt,name=format,proto3" json:"format,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *PageMetadata) Reset() { + *x = PageMetadata{} + mi := &file_readers_v1_readers_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *PageMetadata) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*PageMetadata) ProtoMessage() {} + +func (x *PageMetadata) ProtoReflect() protoreflect.Message { + mi := &file_readers_v1_readers_proto_msgTypes[0] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use PageMetadata.ProtoReflect.Descriptor instead. +func (*PageMetadata) Descriptor() ([]byte, []int) { + return file_readers_v1_readers_proto_rawDescGZIP(), []int{0} +} + +func (x *PageMetadata) GetLimit() uint64 { + if x != nil { + return x.Limit + } + return 0 +} + +func (x *PageMetadata) GetOffset() uint64 { + if x != nil { + return x.Offset + } + return 0 +} + +func (x *PageMetadata) GetProtocol() string { + if x != nil { + return x.Protocol + } + return "" +} + +func (x *PageMetadata) GetName() string { + if x != nil { + return x.Name + } + return "" +} + +func (x *PageMetadata) GetValue() float64 { + if x != nil { + return x.Value + } + return 0 +} + +func (x *PageMetadata) GetPublisher() string { + if x != nil { + return x.Publisher + } + return "" +} + +func (x *PageMetadata) GetBoolValue() bool { + if x != nil { + return x.BoolValue + } + return false +} + +func (x *PageMetadata) GetStringValue() string { + if x != nil { + return x.StringValue + } + return "" +} + +func (x *PageMetadata) GetDataValue() string { + if x != nil { + return x.DataValue + } + return "" +} + +func (x *PageMetadata) GetFrom() float64 { + if x != nil { + return x.From + } + return 0 +} + +func (x *PageMetadata) GetTo() float64 { + if x != nil { + return x.To + } + return 0 +} + +func (x *PageMetadata) GetSubtopic() string { + if x != nil { + return x.Subtopic + } + return "" +} + +func (x *PageMetadata) GetInterval() string { + if x != nil { + return x.Interval + } + return "" +} + +func (x *PageMetadata) GetRead() bool { + if x != nil { + return x.Read + } + return false +} + +func (x *PageMetadata) GetAggregation() Aggregation { + if x != nil { + return x.Aggregation + } + return Aggregation_AGGREGATION_UNSPECIFIED +} + +func (x *PageMetadata) GetComparator() string { + if x != nil { + return x.Comparator + } + return "" +} + +func (x *PageMetadata) GetFormat() string { + if x != nil { + return x.Format + } + return "" +} + +type ReadMessagesRes struct { + state protoimpl.MessageState `protogen:"open.v1"` + Total uint64 `protobuf:"varint,1,opt,name=total,proto3" json:"total,omitempty"` + PageMetadata *PageMetadata `protobuf:"bytes,2,opt,name=page_metadata,json=pageMetadata,proto3" json:"page_metadata,omitempty"` + Messages []*Message `protobuf:"bytes,3,rep,name=messages,proto3" json:"messages,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *ReadMessagesRes) Reset() { + *x = ReadMessagesRes{} + mi := &file_readers_v1_readers_proto_msgTypes[1] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *ReadMessagesRes) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*ReadMessagesRes) ProtoMessage() {} + +func (x *ReadMessagesRes) ProtoReflect() protoreflect.Message { + mi := &file_readers_v1_readers_proto_msgTypes[1] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use ReadMessagesRes.ProtoReflect.Descriptor instead. +func (*ReadMessagesRes) Descriptor() ([]byte, []int) { + return file_readers_v1_readers_proto_rawDescGZIP(), []int{1} +} + +func (x *ReadMessagesRes) GetTotal() uint64 { + if x != nil { + return x.Total + } + return 0 +} + +func (x *ReadMessagesRes) GetPageMetadata() *PageMetadata { + if x != nil { + return x.PageMetadata + } + return nil +} + +func (x *ReadMessagesRes) GetMessages() []*Message { + if x != nil { + return x.Messages + } + return nil +} + +type Message struct { + state protoimpl.MessageState `protogen:"open.v1"` + // Types that are valid to be assigned to Payload: + // + // *Message_Senml + // *Message_Json + Payload isMessage_Payload `protobuf_oneof:"payload"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *Message) Reset() { + *x = Message{} + mi := &file_readers_v1_readers_proto_msgTypes[2] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *Message) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*Message) ProtoMessage() {} + +func (x *Message) ProtoReflect() protoreflect.Message { + mi := &file_readers_v1_readers_proto_msgTypes[2] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use Message.ProtoReflect.Descriptor instead. +func (*Message) Descriptor() ([]byte, []int) { + return file_readers_v1_readers_proto_rawDescGZIP(), []int{2} +} + +func (x *Message) GetPayload() isMessage_Payload { + if x != nil { + return x.Payload + } + return nil +} + +func (x *Message) GetSenml() *SenMLMessage { + if x != nil { + if x, ok := x.Payload.(*Message_Senml); ok { + return x.Senml + } + } + return nil +} + +func (x *Message) GetJson() *JsonMessage { + if x != nil { + if x, ok := x.Payload.(*Message_Json); ok { + return x.Json + } + } + return nil +} + +type isMessage_Payload interface { + isMessage_Payload() +} + +type Message_Senml struct { + Senml *SenMLMessage `protobuf:"bytes,1,opt,name=senml,proto3,oneof"` +} + +type Message_Json struct { + Json *JsonMessage `protobuf:"bytes,2,opt,name=json,proto3,oneof"` +} + +func (*Message_Senml) isMessage_Payload() {} + +func (*Message_Json) isMessage_Payload() {} + +type BaseMessage struct { + state protoimpl.MessageState `protogen:"open.v1"` + Channel string `protobuf:"bytes,1,opt,name=channel,proto3" json:"channel,omitempty"` + Subtopic string `protobuf:"bytes,2,opt,name=subtopic,proto3" json:"subtopic,omitempty"` + Publisher string `protobuf:"bytes,3,opt,name=publisher,proto3" json:"publisher,omitempty"` + Protocol string `protobuf:"bytes,4,opt,name=protocol,proto3" json:"protocol,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *BaseMessage) Reset() { + *x = BaseMessage{} + mi := &file_readers_v1_readers_proto_msgTypes[3] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *BaseMessage) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*BaseMessage) ProtoMessage() {} + +func (x *BaseMessage) ProtoReflect() protoreflect.Message { + mi := &file_readers_v1_readers_proto_msgTypes[3] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use BaseMessage.ProtoReflect.Descriptor instead. +func (*BaseMessage) Descriptor() ([]byte, []int) { + return file_readers_v1_readers_proto_rawDescGZIP(), []int{3} +} + +func (x *BaseMessage) GetChannel() string { + if x != nil { + return x.Channel + } + return "" +} + +func (x *BaseMessage) GetSubtopic() string { + if x != nil { + return x.Subtopic + } + return "" +} + +func (x *BaseMessage) GetPublisher() string { + if x != nil { + return x.Publisher + } + return "" +} + +func (x *BaseMessage) GetProtocol() string { + if x != nil { + return x.Protocol + } + return "" +} + +type SenMLMessage struct { + state protoimpl.MessageState `protogen:"open.v1"` + Base *BaseMessage `protobuf:"bytes,1,opt,name=base,proto3" json:"base,omitempty"` + Name string `protobuf:"bytes,2,opt,name=name,proto3" json:"name,omitempty"` + Unit string `protobuf:"bytes,3,opt,name=unit,proto3" json:"unit,omitempty"` + Time float64 `protobuf:"fixed64,4,opt,name=time,proto3" json:"time,omitempty"` + UpdateTime float64 `protobuf:"fixed64,5,opt,name=update_time,json=updateTime,proto3" json:"update_time,omitempty"` + Value float64 `protobuf:"fixed64,6,opt,name=value,proto3" json:"value,omitempty"` + StringValue string `protobuf:"bytes,7,opt,name=string_value,json=stringValue,proto3" json:"string_value,omitempty"` + DataValue string `protobuf:"bytes,8,opt,name=data_value,json=dataValue,proto3" json:"data_value,omitempty"` + BoolValue bool `protobuf:"varint,9,opt,name=bool_value,json=boolValue,proto3" json:"bool_value,omitempty"` + Sum float64 `protobuf:"fixed64,10,opt,name=sum,proto3" json:"sum,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *SenMLMessage) Reset() { + *x = SenMLMessage{} + mi := &file_readers_v1_readers_proto_msgTypes[4] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *SenMLMessage) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*SenMLMessage) ProtoMessage() {} + +func (x *SenMLMessage) ProtoReflect() protoreflect.Message { + mi := &file_readers_v1_readers_proto_msgTypes[4] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use SenMLMessage.ProtoReflect.Descriptor instead. +func (*SenMLMessage) Descriptor() ([]byte, []int) { + return file_readers_v1_readers_proto_rawDescGZIP(), []int{4} +} + +func (x *SenMLMessage) GetBase() *BaseMessage { + if x != nil { + return x.Base + } + return nil +} + +func (x *SenMLMessage) GetName() string { + if x != nil { + return x.Name + } + return "" +} + +func (x *SenMLMessage) GetUnit() string { + if x != nil { + return x.Unit + } + return "" +} + +func (x *SenMLMessage) GetTime() float64 { + if x != nil { + return x.Time + } + return 0 +} + +func (x *SenMLMessage) GetUpdateTime() float64 { + if x != nil { + return x.UpdateTime + } + return 0 +} + +func (x *SenMLMessage) GetValue() float64 { + if x != nil { + return x.Value + } + return 0 +} + +func (x *SenMLMessage) GetStringValue() string { + if x != nil { + return x.StringValue + } + return "" +} + +func (x *SenMLMessage) GetDataValue() string { + if x != nil { + return x.DataValue + } + return "" +} + +func (x *SenMLMessage) GetBoolValue() bool { + if x != nil { + return x.BoolValue + } + return false +} + +func (x *SenMLMessage) GetSum() float64 { + if x != nil { + return x.Sum + } + return 0 +} + +type JsonMessage struct { + state protoimpl.MessageState `protogen:"open.v1"` + Base *BaseMessage `protobuf:"bytes,1,opt,name=base,proto3" json:"base,omitempty"` + Created int64 `protobuf:"varint,2,opt,name=created,proto3" json:"created,omitempty"` + Payload []byte `protobuf:"bytes,3,opt,name=payload,proto3" json:"payload,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *JsonMessage) Reset() { + *x = JsonMessage{} + mi := &file_readers_v1_readers_proto_msgTypes[5] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *JsonMessage) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*JsonMessage) ProtoMessage() {} + +func (x *JsonMessage) ProtoReflect() protoreflect.Message { + mi := &file_readers_v1_readers_proto_msgTypes[5] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use JsonMessage.ProtoReflect.Descriptor instead. +func (*JsonMessage) Descriptor() ([]byte, []int) { + return file_readers_v1_readers_proto_rawDescGZIP(), []int{5} +} + +func (x *JsonMessage) GetBase() *BaseMessage { + if x != nil { + return x.Base + } + return nil +} + +func (x *JsonMessage) GetCreated() int64 { + if x != nil { + return x.Created + } + return 0 +} + +func (x *JsonMessage) GetPayload() []byte { + if x != nil { + return x.Payload + } + return nil +} + +type ReadMessagesReq struct { + state protoimpl.MessageState `protogen:"open.v1"` + ChannelId string `protobuf:"bytes,1,opt,name=channel_id,json=channelId,proto3" json:"channel_id,omitempty"` + DomainId string `protobuf:"bytes,2,opt,name=domain_id,json=domainId,proto3" json:"domain_id,omitempty"` + PageMetadata *PageMetadata `protobuf:"bytes,3,opt,name=page_metadata,json=pageMetadata,proto3" json:"page_metadata,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *ReadMessagesReq) Reset() { + *x = ReadMessagesReq{} + mi := &file_readers_v1_readers_proto_msgTypes[6] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *ReadMessagesReq) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*ReadMessagesReq) ProtoMessage() {} + +func (x *ReadMessagesReq) ProtoReflect() protoreflect.Message { + mi := &file_readers_v1_readers_proto_msgTypes[6] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use ReadMessagesReq.ProtoReflect.Descriptor instead. +func (*ReadMessagesReq) Descriptor() ([]byte, []int) { + return file_readers_v1_readers_proto_rawDescGZIP(), []int{6} +} + +func (x *ReadMessagesReq) GetChannelId() string { + if x != nil { + return x.ChannelId + } + return "" +} + +func (x *ReadMessagesReq) GetDomainId() string { + if x != nil { + return x.DomainId + } + return "" +} + +func (x *ReadMessagesReq) GetPageMetadata() *PageMetadata { + if x != nil { + return x.PageMetadata + } + return nil +} + +var File_readers_v1_readers_proto protoreflect.FileDescriptor + +const file_readers_v1_readers_proto_rawDesc = "" + + "\n" + + "\x18readers/v1/readers.proto\x12\n" + + "readers.v1\"\xe4\x03\n" + + "\fPageMetadata\x12\x14\n" + + "\x05limit\x18\x01 \x01(\x04R\x05limit\x12\x16\n" + + "\x06offset\x18\x02 \x01(\x04R\x06offset\x12\x1a\n" + + "\bprotocol\x18\x03 \x01(\tR\bprotocol\x12\x12\n" + + "\x04name\x18\x04 \x01(\tR\x04name\x12\x14\n" + + "\x05value\x18\x05 \x01(\x01R\x05value\x12\x1c\n" + + "\tpublisher\x18\x06 \x01(\tR\tpublisher\x12\x1d\n" + + "\n" + + "bool_value\x18\a \x01(\bR\tboolValue\x12!\n" + + "\fstring_value\x18\b \x01(\tR\vstringValue\x12\x1d\n" + + "\n" + + "data_value\x18\t \x01(\tR\tdataValue\x12\x12\n" + + "\x04from\x18\n" + + " \x01(\x01R\x04from\x12\x0e\n" + + "\x02to\x18\v \x01(\x01R\x02to\x12\x1a\n" + + "\bsubtopic\x18\f \x01(\tR\bsubtopic\x12\x1a\n" + + "\binterval\x18\r \x01(\tR\binterval\x12\x12\n" + + "\x04read\x18\x0e \x01(\bR\x04read\x129\n" + + "\vaggregation\x18\x0f \x01(\x0e2\x17.readers.v1.AggregationR\vaggregation\x12\x1e\n" + + "\n" + + "comparator\x18\x10 \x01(\tR\n" + + "comparator\x12\x16\n" + + "\x06format\x18\x11 \x01(\tR\x06format\"\x97\x01\n" + + "\x0fReadMessagesRes\x12\x14\n" + + "\x05total\x18\x01 \x01(\x04R\x05total\x12=\n" + + "\rpage_metadata\x18\x02 \x01(\v2\x18.readers.v1.PageMetadataR\fpageMetadata\x12/\n" + + "\bmessages\x18\x03 \x03(\v2\x13.readers.v1.MessageR\bmessages\"u\n" + + "\aMessage\x120\n" + + "\x05senml\x18\x01 \x01(\v2\x18.readers.v1.SenMLMessageH\x00R\x05senml\x12-\n" + + "\x04json\x18\x02 \x01(\v2\x17.readers.v1.JsonMessageH\x00R\x04jsonB\t\n" + + "\apayload\"}\n" + + "\vBaseMessage\x12\x18\n" + + "\achannel\x18\x01 \x01(\tR\achannel\x12\x1a\n" + + "\bsubtopic\x18\x02 \x01(\tR\bsubtopic\x12\x1c\n" + + "\tpublisher\x18\x03 \x01(\tR\tpublisher\x12\x1a\n" + + "\bprotocol\x18\x04 \x01(\tR\bprotocol\"\xa1\x02\n" + + "\fSenMLMessage\x12+\n" + + "\x04base\x18\x01 \x01(\v2\x17.readers.v1.BaseMessageR\x04base\x12\x12\n" + + "\x04name\x18\x02 \x01(\tR\x04name\x12\x12\n" + + "\x04unit\x18\x03 \x01(\tR\x04unit\x12\x12\n" + + "\x04time\x18\x04 \x01(\x01R\x04time\x12\x1f\n" + + "\vupdate_time\x18\x05 \x01(\x01R\n" + + "updateTime\x12\x14\n" + + "\x05value\x18\x06 \x01(\x01R\x05value\x12!\n" + + "\fstring_value\x18\a \x01(\tR\vstringValue\x12\x1d\n" + + "\n" + + "data_value\x18\b \x01(\tR\tdataValue\x12\x1d\n" + + "\n" + + "bool_value\x18\t \x01(\bR\tboolValue\x12\x10\n" + + "\x03sum\x18\n" + + " \x01(\x01R\x03sum\"n\n" + + "\vJsonMessage\x12+\n" + + "\x04base\x18\x01 \x01(\v2\x17.readers.v1.BaseMessageR\x04base\x12\x18\n" + + "\acreated\x18\x02 \x01(\x03R\acreated\x12\x18\n" + + "\apayload\x18\x03 \x01(\fR\apayload\"\x8c\x01\n" + + "\x0fReadMessagesReq\x12\x1d\n" + + "\n" + + "channel_id\x18\x01 \x01(\tR\tchannelId\x12\x1b\n" + + "\tdomain_id\x18\x02 \x01(\tR\bdomainId\x12=\n" + + "\rpage_metadata\x18\x03 \x01(\v2\x18.readers.v1.PageMetadataR\fpageMetadata*Y\n" + + "\vAggregation\x12\x1b\n" + + "\x17AGGREGATION_UNSPECIFIED\x10\x00\x12\a\n" + + "\x03MAX\x10\x01\x12\a\n" + + "\x03MIN\x10\x02\x12\a\n" + + "\x03SUM\x10\x03\x12\t\n" + + "\x05COUNT\x10\x04\x12\a\n" + + "\x03AVG\x10\x052\\\n" + + "\x0eReadersService\x12J\n" + + "\fReadMessages\x12\x1b.readers.v1.ReadMessagesReq\x1a\x1b.readers.v1.ReadMessagesRes\"\x00B3Z1github.com/absmach/magistrala/api/grpc/readers/v1b\x06proto3" + +var ( + file_readers_v1_readers_proto_rawDescOnce sync.Once + file_readers_v1_readers_proto_rawDescData []byte +) + +func file_readers_v1_readers_proto_rawDescGZIP() []byte { + file_readers_v1_readers_proto_rawDescOnce.Do(func() { + file_readers_v1_readers_proto_rawDescData = protoimpl.X.CompressGZIP(unsafe.Slice(unsafe.StringData(file_readers_v1_readers_proto_rawDesc), len(file_readers_v1_readers_proto_rawDesc))) + }) + return file_readers_v1_readers_proto_rawDescData +} + +var file_readers_v1_readers_proto_enumTypes = make([]protoimpl.EnumInfo, 1) +var file_readers_v1_readers_proto_msgTypes = make([]protoimpl.MessageInfo, 7) +var file_readers_v1_readers_proto_goTypes = []any{ + (Aggregation)(0), // 0: readers.v1.Aggregation + (*PageMetadata)(nil), // 1: readers.v1.PageMetadata + (*ReadMessagesRes)(nil), // 2: readers.v1.ReadMessagesRes + (*Message)(nil), // 3: readers.v1.Message + (*BaseMessage)(nil), // 4: readers.v1.BaseMessage + (*SenMLMessage)(nil), // 5: readers.v1.SenMLMessage + (*JsonMessage)(nil), // 6: readers.v1.JsonMessage + (*ReadMessagesReq)(nil), // 7: readers.v1.ReadMessagesReq +} +var file_readers_v1_readers_proto_depIdxs = []int32{ + 0, // 0: readers.v1.PageMetadata.aggregation:type_name -> readers.v1.Aggregation + 1, // 1: readers.v1.ReadMessagesRes.page_metadata:type_name -> readers.v1.PageMetadata + 3, // 2: readers.v1.ReadMessagesRes.messages:type_name -> readers.v1.Message + 5, // 3: readers.v1.Message.senml:type_name -> readers.v1.SenMLMessage + 6, // 4: readers.v1.Message.json:type_name -> readers.v1.JsonMessage + 4, // 5: readers.v1.SenMLMessage.base:type_name -> readers.v1.BaseMessage + 4, // 6: readers.v1.JsonMessage.base:type_name -> readers.v1.BaseMessage + 1, // 7: readers.v1.ReadMessagesReq.page_metadata:type_name -> readers.v1.PageMetadata + 7, // 8: readers.v1.ReadersService.ReadMessages:input_type -> readers.v1.ReadMessagesReq + 2, // 9: readers.v1.ReadersService.ReadMessages:output_type -> readers.v1.ReadMessagesRes + 9, // [9:10] is the sub-list for method output_type + 8, // [8:9] is the sub-list for method input_type + 8, // [8:8] is the sub-list for extension type_name + 8, // [8:8] is the sub-list for extension extendee + 0, // [0:8] is the sub-list for field type_name +} + +func init() { file_readers_v1_readers_proto_init() } +func file_readers_v1_readers_proto_init() { + if File_readers_v1_readers_proto != nil { + return + } + file_readers_v1_readers_proto_msgTypes[2].OneofWrappers = []any{ + (*Message_Senml)(nil), + (*Message_Json)(nil), + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: unsafe.Slice(unsafe.StringData(file_readers_v1_readers_proto_rawDesc), len(file_readers_v1_readers_proto_rawDesc)), + NumEnums: 1, + NumMessages: 7, + NumExtensions: 0, + NumServices: 1, + }, + GoTypes: file_readers_v1_readers_proto_goTypes, + DependencyIndexes: file_readers_v1_readers_proto_depIdxs, + EnumInfos: file_readers_v1_readers_proto_enumTypes, + MessageInfos: file_readers_v1_readers_proto_msgTypes, + }.Build() + File_readers_v1_readers_proto = out.File + file_readers_v1_readers_proto_goTypes = nil + file_readers_v1_readers_proto_depIdxs = nil +} diff --git a/api/grpc/readers/v1/readers_grpc.pb.go b/api/grpc/readers/v1/readers_grpc.pb.go new file mode 100644 index 000000000..d0700e6fa --- /dev/null +++ b/api/grpc/readers/v1/readers_grpc.pb.go @@ -0,0 +1,130 @@ +// Copyright (c) Abstract Machines +// SPDX-License-Identifier: Apache-2.0 + +// Code generated by protoc-gen-go-grpc. DO NOT EDIT. +// versions: +// - protoc-gen-go-grpc v1.5.1 +// - protoc v6.30.2 +// source: readers/v1/readers.proto + +package v1 + +import ( + context "context" + grpc "google.golang.org/grpc" + codes "google.golang.org/grpc/codes" + status "google.golang.org/grpc/status" +) + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the grpc package it is being compiled against. +// Requires gRPC-Go v1.64.0 or later. +const _ = grpc.SupportPackageIsVersion9 + +const ( + ReadersService_ReadMessages_FullMethodName = "/readers.v1.ReadersService/ReadMessages" +) + +// ReadersServiceClient is the client API for ReadersService service. +// +// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. +// +// ReadersService is a service that provides access to +// readers functionalities for Magistrala services. +type ReadersServiceClient interface { + ReadMessages(ctx context.Context, in *ReadMessagesReq, opts ...grpc.CallOption) (*ReadMessagesRes, error) +} + +type readersServiceClient struct { + cc grpc.ClientConnInterface +} + +func NewReadersServiceClient(cc grpc.ClientConnInterface) ReadersServiceClient { + return &readersServiceClient{cc} +} + +func (c *readersServiceClient) ReadMessages(ctx context.Context, in *ReadMessagesReq, opts ...grpc.CallOption) (*ReadMessagesRes, error) { + cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...) + out := new(ReadMessagesRes) + err := c.cc.Invoke(ctx, ReadersService_ReadMessages_FullMethodName, in, out, cOpts...) + if err != nil { + return nil, err + } + return out, nil +} + +// ReadersServiceServer is the server API for ReadersService service. +// All implementations must embed UnimplementedReadersServiceServer +// for forward compatibility. +// +// ReadersService is a service that provides access to +// readers functionalities for Magistrala services. +type ReadersServiceServer interface { + ReadMessages(context.Context, *ReadMessagesReq) (*ReadMessagesRes, error) + mustEmbedUnimplementedReadersServiceServer() +} + +// UnimplementedReadersServiceServer must be embedded to have +// forward compatible implementations. +// +// NOTE: this should be embedded by value instead of pointer to avoid a nil +// pointer dereference when methods are called. +type UnimplementedReadersServiceServer struct{} + +func (UnimplementedReadersServiceServer) ReadMessages(context.Context, *ReadMessagesReq) (*ReadMessagesRes, error) { + return nil, status.Errorf(codes.Unimplemented, "method ReadMessages not implemented") +} +func (UnimplementedReadersServiceServer) mustEmbedUnimplementedReadersServiceServer() {} +func (UnimplementedReadersServiceServer) testEmbeddedByValue() {} + +// UnsafeReadersServiceServer may be embedded to opt out of forward compatibility for this service. +// Use of this interface is not recommended, as added methods to ReadersServiceServer will +// result in compilation errors. +type UnsafeReadersServiceServer interface { + mustEmbedUnimplementedReadersServiceServer() +} + +func RegisterReadersServiceServer(s grpc.ServiceRegistrar, srv ReadersServiceServer) { + // If the following call pancis, it indicates UnimplementedReadersServiceServer was + // embedded by pointer and is nil. This will cause panics if an + // unimplemented method is ever invoked, so we test this at initialization + // time to prevent it from happening at runtime later due to I/O. + if t, ok := srv.(interface{ testEmbeddedByValue() }); ok { + t.testEmbeddedByValue() + } + s.RegisterService(&ReadersService_ServiceDesc, srv) +} + +func _ReadersService_ReadMessages_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(ReadMessagesReq) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(ReadersServiceServer).ReadMessages(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: ReadersService_ReadMessages_FullMethodName, + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(ReadersServiceServer).ReadMessages(ctx, req.(*ReadMessagesReq)) + } + return interceptor(ctx, in, info, handler) +} + +// ReadersService_ServiceDesc is the grpc.ServiceDesc for ReadersService service. +// It's only intended for direct use with grpc.RegisterService, +// and not to be introspected or modified (even as a copy) +var ReadersService_ServiceDesc = grpc.ServiceDesc{ + ServiceName: "readers.v1.ReadersService", + HandlerType: (*ReadersServiceServer)(nil), + Methods: []grpc.MethodDesc{ + { + MethodName: "ReadMessages", + Handler: _ReadersService_ReadMessages_Handler, + }, + }, + Streams: []grpc.StreamDesc{}, + Metadata: "readers/v1/readers.proto", +} diff --git a/cmd/postgres-reader/main.go b/cmd/postgres-reader/main.go index 568cb9be7..57973eb72 100644 --- a/cmd/postgres-reader/main.go +++ b/cmd/postgres-reader/main.go @@ -12,7 +12,10 @@ import ( "os" chclient "github.com/absmach/callhome/pkg/client" - httpapi "github.com/absmach/magistrala/readers/api" + grpcReadersV1 "github.com/absmach/magistrala/api/grpc/readers/v1" + readersgrpcapi "github.com/absmach/magistrala/readers/api/grpc" + httpapi "github.com/absmach/magistrala/readers/api/http" + middleware "github.com/absmach/magistrala/readers/middleware" "github.com/absmach/magistrala/readers/postgres" "github.com/absmach/supermq" smqlog "github.com/absmach/supermq/logger" @@ -21,12 +24,15 @@ import ( pgclient "github.com/absmach/supermq/pkg/postgres" "github.com/absmach/supermq/pkg/prometheus" "github.com/absmach/supermq/pkg/server" + grpcserver "github.com/absmach/supermq/pkg/server/grpc" httpserver "github.com/absmach/supermq/pkg/server/http" "github.com/absmach/supermq/pkg/uuid" "github.com/absmach/supermq/readers" "github.com/caarlos0/env/v11" "github.com/jmoiron/sqlx" "golang.org/x/sync/errgroup" + "google.golang.org/grpc" + "google.golang.org/grpc/reflection" ) const ( @@ -38,6 +44,8 @@ const ( envPrefixChannels = "SMQ_CHANNELS_GRPC_" defDB = "supermq" defSvcHTTPPort = "9009" + defSvcGRPCPort = "7009" + envPrefixGrpc = "MG_POSTGRES_READER_GRPC_" ) type config struct { @@ -85,6 +93,19 @@ func main() { } defer db.Close() + repo := newService(db, logger) + + grpcServerConfig := server.Config{Port: defSvcGRPCPort} + if err := env.ParseWithOptions(&grpcServerConfig, env.Options{Prefix: envPrefixGrpc}); err != nil { + logger.Error(fmt.Sprintf("failed to load %s gRPC server configuration : %s", svcName, err.Error())) + exitCode = 1 + return + } + registerReadersServiceServer := func(srv *grpc.Server) { + reflection.Register(srv) + grpcReadersV1.RegisterReadersServiceServer(srv, readersgrpcapi.NewReadersServer(repo)) + } + clientsClientCfg := grpcclient.Config{} if err := env.ParseWithOptions(&clientsClientCfg, env.Options{Prefix: envPrefixClients}); err != nil { logger.Error(fmt.Sprintf("failed to load clients gRPC client configuration : %s", err)) @@ -99,6 +120,7 @@ func main() { return } defer clientsHandler.Close() + logger.Info("Clients service gRPC client successfully connected to clients gRPC server " + clientsHandler.Secure()) channelsClientCfg := grpcclient.Config{} @@ -133,8 +155,6 @@ func main() { defer authnHandler.Close() logger.Info("authn successfully connected to auth gRPC server " + authnHandler.Secure()) - repo := newService(db, logger) - httpServerConfig := server.Config{Port: defSvcHTTPPort} if err := env.ParseWithOptions(&httpServerConfig, env.Options{Prefix: envPrefixHTTP}); err != nil { logger.Error(fmt.Sprintf("failed to load %s HTTP server configuration : %s", svcName, err)) @@ -148,6 +168,12 @@ func main() { go chc.CallHome(ctx) } + gs := grpcserver.NewServer(ctx, cancel, svcName, grpcServerConfig, registerReadersServiceServer, logger) + + g.Go(func() error { + return gs.Start() + }) + g.Go(func() error { return hs.Start() }) @@ -163,9 +189,9 @@ func main() { func newService(db *sqlx.DB, logger *slog.Logger) readers.MessageRepository { svc := postgres.New(db) - svc = httpapi.LoggingMiddleware(svc, logger) + svc = middleware.LoggingMiddleware(svc, logger) counter, latency := prometheus.MakeMetrics("postgres", "message_reader") - svc = httpapi.MetricsMiddleware(svc, counter, latency) + svc = middleware.MetricsMiddleware(svc, counter, latency) return svc } diff --git a/cmd/timescale-reader/main.go b/cmd/timescale-reader/main.go index 8bbbb210c..b2031d939 100644 --- a/cmd/timescale-reader/main.go +++ b/cmd/timescale-reader/main.go @@ -12,7 +12,10 @@ import ( "os" chclient "github.com/absmach/callhome/pkg/client" - httpapi "github.com/absmach/magistrala/readers/api" + grpcReadersV1 "github.com/absmach/magistrala/api/grpc/readers/v1" + readersgrpcapi "github.com/absmach/magistrala/readers/api/grpc" + httpapi "github.com/absmach/magistrala/readers/api/http" + middleware "github.com/absmach/magistrala/readers/middleware" "github.com/absmach/magistrala/readers/timescale" "github.com/absmach/supermq" smqlog "github.com/absmach/supermq/logger" @@ -21,12 +24,15 @@ import ( pgclient "github.com/absmach/supermq/pkg/postgres" "github.com/absmach/supermq/pkg/prometheus" "github.com/absmach/supermq/pkg/server" + grpcserver "github.com/absmach/supermq/pkg/server/grpc" httpserver "github.com/absmach/supermq/pkg/server/http" "github.com/absmach/supermq/pkg/uuid" "github.com/absmach/supermq/readers" "github.com/caarlos0/env/v11" "github.com/jmoiron/sqlx" "golang.org/x/sync/errgroup" + "google.golang.org/grpc" + "google.golang.org/grpc/reflection" ) const ( @@ -38,6 +44,8 @@ const ( envPrefixChannels = "SMQ_CHANNELS_GRPC_" defDB = "messages" defSvcHTTPPort = "9011" + defSvcGRPCPort = "7011" + envPrefixGrpc = "MG_TIMESCALE_READER_GRPC_" ) type config struct { @@ -85,6 +93,19 @@ func main() { repo := newService(db, logger) + grpcServerConfig := server.Config{ + Port: defSvcGRPCPort, + } + if err := env.ParseWithOptions(&grpcServerConfig, env.Options{Prefix: envPrefixGrpc}); err != nil { + logger.Error(fmt.Sprintf("failed to load %s gRPC server configuration : %s", svcName, err.Error())) + exitCode = 1 + return + } + registerReadersServiceServer := func(srv *grpc.Server) { + reflection.Register(srv) + grpcReadersV1.RegisterReadersServiceServer(srv, readersgrpcapi.NewReadersServer(repo)) + } + clientsClientCfg := grpcclient.Config{} if err := env.ParseWithOptions(&clientsClientCfg, env.Options{Prefix: envPrefixClients}); err != nil { logger.Error(fmt.Sprintf("failed to load %s auth configuration : %s", svcName, err)) @@ -147,6 +168,12 @@ func main() { go chc.CallHome(ctx) } + gs := grpcserver.NewServer(ctx, cancel, svcName, grpcServerConfig, registerReadersServiceServer, logger) + + g.Go(func() error { + return gs.Start() + }) + g.Go(func() error { return hs.Start() }) @@ -162,9 +189,9 @@ func main() { func newService(db *sqlx.DB, logger *slog.Logger) readers.MessageRepository { svc := timescale.New(db) - svc = httpapi.LoggingMiddleware(svc, logger) + svc = middleware.LoggingMiddleware(svc, logger) counter, latency := prometheus.MakeMetrics("timescale", "message_reader") - svc = httpapi.MetricsMiddleware(svc, counter, latency) + svc = middleware.MetricsMiddleware(svc, counter, latency) return svc } diff --git a/docker/.env b/docker/.env index 05a307dc7..2d8e76b0f 100644 --- a/docker/.env +++ b/docker/.env @@ -189,9 +189,19 @@ MG_POSTGRES_WRITER_INSTANCE_ID= MG_POSTGRES_READER_LOG_LEVEL=debug MG_POSTGRES_READER_HTTP_HOST=postgres-reader MG_POSTGRES_READER_HTTP_PORT=9009 +MG_POSTGRES_READER_GRPC_HOST=postgres-reader +MG_POSTGRES_READER_GRPC_PORT=7009 MG_POSTGRES_READER_HTTP_SERVER_CERT= MG_POSTGRES_READER_HTTP_SERVER_KEY= MG_POSTGRES_READER_INSTANCE_ID= +MG_POSTGRES_READER_GRPC_URL=postgres-reader:7011 +MG_POSTGRES_READER_GRPC_TIMEOUT=300s +MG_POSTGRES_READER_GRPC_CLIENT_CERT=${GRPC_MTLS:+./ssl/certs/readers-grpc-client.crt} +MG_POSTGRES_READER_GRPC_CLIENT_CA_CERTS=${GRPC_MTLS:+./ssl/certs/ca.crt} +MG_POSTGRES_READER_GRPC_CLIENT_KEY=${GRPC_MTLS:+./ssl/certs/readers-grpc-client.key} +MG_POSTGRES_READER_GRPC_SERVER_CERT=${GRPC_MTLS:+./ssl/certs/readers-grpc-server.crt}${GRPC_TLS:+./ssl/certs/readers-grpc-server.crt} +MG_POSTGRES_READER_GRPC_SERVER_KEY=${GRPC_MTLS:+./ssl/certs/readers-grpc-server.key}${GRPC_TLS:+./ssl/certs/readers-grpc-server.key} +MG_POSTGRES_READER_GRPC_SERVER_CA_CERTS=${GRPC_MTLS:+./ssl/certs/ca.crt}${GRPC_TLS:+./ssl/certs/ca.crt} ### Timescale MG_TIMESCALE_HOST=timescale @@ -217,9 +227,19 @@ MG_TIMESCALE_WRITER_INSTANCE_ID= MG_TIMESCALE_READER_LOG_LEVEL=debug MG_TIMESCALE_READER_HTTP_HOST=timescale-reader MG_TIMESCALE_READER_HTTP_PORT=9011 +MG_TIMESCALE_READER_GRPC_HOST=timescale-reader +MG_TIMESCALE_READER_GRPC_PORT=7011 MG_TIMESCALE_READER_HTTP_SERVER_CERT= MG_TIMESCALE_READER_HTTP_SERVER_KEY= MG_TIMESCALE_READER_INSTANCE_ID= +MG_TIMESCALE_READER_GRPC_URL=timescale-reader:7011 +MG_TIMESCALE_READER_GRPC_TIMEOUT=300s +MG_TIMESCALE_READER_GRPC_CLIENT_CERT=${GRPC_MTLS:+./ssl/certs/reader-grpc-client.crt} +MG_TIMESCALE_READER_GRPC_CLIENT_CA_CERTS=${GRPC_MTLS:+./ssl/certs/ca.crt} +MG_TIMESCALE_READER_GRPC_CLIENT_KEY=${GRPC_MTLS:+./ssl/certs/readers-grpc-client.key} +MG_TIMESCALE_READER_GRPC_SERVER_CERT=${GRPC_MTLS:+./ssl/certs/readers-grpc-server.crt}${GRPC_TLS:+./ssl/certs/readers-grpc-server.crt} +MG_TIMESCALE_READER_GRPC_SERVER_KEY=${GRPC_MTLS:+./ssl/certs/readers-grpc-server.key}${GRPC_TLS:+./ssl/certs/readers-grpc-server.key} +MG_TIMESCALE_READER_GRPC_SERVER_CA_CERTS=${GRPC_MTLS:+./ssl/certs/ca.crt}${GRPC_TLS:+./ssl/certs/ca.crt} ### GRAFANA and PROMETHEUS SMQ_PROMETHEUS_PORT=9090 diff --git a/docker/addons/postgres-reader/docker-compose.yaml b/docker/addons/postgres-reader/docker-compose.yaml index 55f307d2b..95499061f 100644 --- a/docker/addons/postgres-reader/docker-compose.yaml +++ b/docker/addons/postgres-reader/docker-compose.yaml @@ -35,6 +35,16 @@ services: MG_THINGS_AUTH_GRPC_CLIENT_CERT: ${MG_THINGS_AUTH_GRPC_CLIENT_CERT:+/things-grpc-client.crt} SMQ_CLIENTS_GRPC_CLIENT_KEY: ${SMQ_CLIENTS_GRPC_CLIENT_KEY:+/things-grpc-client.key} SMQ_CLIENTS_GRPC_SERVER_CA_CERTS: ${SMQ_CLIENTS_GRPC_SERVER_CA_CERTS:+/things-grpc-server-ca.crt} + MG_POSTGRES_READER_GRPC_URL: ${MG_POSTGRES_READER_GRPC_URL} + MG_POSTGRES_READER_GRPC_PORT: ${MG_POSTGRES_READER_GRPC_PORT} + MG_POSTGRES_READER_GRPC_HOST: ${MG_POSTGRES_READER_GRPC_HOST} + MG_POSTGRES_READER_GRPC_TIMEOUT: ${MG_POSTGRES_READER_GRPC_TIMEOUT} + MG_POSTGRES_READER_GRPC_CLIENT_CERT: ${MG_POSTGRES_READER_GRPC_CLIENT_CERT:+./ssl/certs/reader-grpc-client.crt} + MG_POSTGRES_READER_GRPC_CLIENT_CA_CERTS: ${MG_POSTGRES_READER_GRPC_CLIENT_CA_CERTS:+./ssl/certs/ca.crt} + MG_POSTGRES_READER_GRPC_SERVER_CA_CERTS: ${MG_POSTGRES_READER_GRPC_SERVER_CA_CERTS:+./ssl/certs/ca.crt} + MG_POSTGRES_READER_GRPC_CLIENT_KEY: ${MG_POSTGRES_READER_GRPC_CLIENT_KEY:+/readers-grpc-client.key} + MG_POSTGRES_READER_GRPC_SERVER_CERT: ${MG_POSTGRES_READER_GRPC_SERVER_CERT:+./ssl/certs/readers-grpc-server.crt} + MG_POSTGRES_READER_GRPC_SERVER_KEY: ${MG_POSTGRES_READER_GRPC_SERVER_KEY:+./ssl/certs/readers-grpc-server.key} SMQ_AUTH_GRPC_URL: ${SMQ_AUTH_GRPC_URL} SMQ_AUTH_GRPC_TIMEOUT: ${SMQ_AUTH_GRPC_TIMEOUT} SMQ_AUTH_GRPC_CLIENT_CERT: ${SMQ_AUTH_GRPC_CLIENT_CERT:+/auth-grpc-client.crt} @@ -44,6 +54,7 @@ services: MG_POSTGRES_READER_INSTANCE_ID: ${MG_POSTGRES_READER_INSTANCE_ID} ports: - ${MG_POSTGRES_READER_HTTP_PORT}:${MG_POSTGRES_READER_HTTP_PORT} + - ${MG_POSTGRES_READER_GRPC_PORT}:${MG_POSTGRES_READER_GRPC_PORT} networks: - magistrala-base-net volumes: @@ -78,3 +89,34 @@ services: target: /things-grpc-server-ca${SMQ_CLIENTS_GRPC_SERVER_CA_CERTS:+.crt} bind: create_host_path: true + # Reader gRPC mTLS client certificates + - type: bind + source: ${MG_POSTGRES_READER_GRPC_SERVER_CERT:-ssl/certs/dummy/server_cert} + target: /readers-grpc-server${MG_POSTGRES_READER_GRPC_SERVER_CERT:+.crt} + bind: + create_host_path: true + - type: bind + source: ${MG_POSTGRES_READER_GRPC_SERVER_KEY:-ssl/certs/dummy/server_key} + target: /readers-grpc-server${MG_POSTGRES_READER_GRPC_SERVER_KEY:+.key} + bind: + create_host_path: true + - type: bind + source: ${MG_POSTGRES_READER_GRPC_SERVER_CA_CERTS:-ssl/certs/dummy/server_ca_certs} + target: /readers-grpc-server-ca${MG_POSTGRES_READER_GRPC_SERVER_CA_CERTS:+.crt} + bind: + create_host_path: true + - type: bind + source: ${MG_POSTGRES_READER_GRPC_CLIENT_CA_CERTS:-ssl/certs/dummy/client_ca_certs} + target: /readers-grpc-server${MG_POSTGRES_READER_GRPC_CLIENT_CA_CERTS:+.crt} + bind: + create_host_path: true + - type: bind + source: ${MG_POSTGRES_READER_GRPC_CLIENT_CERT:-ssl/certs/dummy/client_cert} + target: /readers-grpc-client${MG_POSTGRES_READER_GRPC_CLIENT_CERT:+.crt} + bind: + create_host_path: true + - type: bind + source: ${MG_POSTGRES_READER_GRPC_CLIENT_KEY:-ssl/certs/dummy/client_key} + target: /readers-grpc-client${MG_POSTGRES_READER_GRPC_CLIENT_KEY:+.key} + bind: + create_host_path: true diff --git a/docker/addons/timescale-reader/docker-compose.yaml b/docker/addons/timescale-reader/docker-compose.yaml index 475b04082..f12c07411 100644 --- a/docker/addons/timescale-reader/docker-compose.yaml +++ b/docker/addons/timescale-reader/docker-compose.yaml @@ -9,6 +9,7 @@ networks: magistrala-base-net: + driver: bridge services: timescale-reader: @@ -40,6 +41,16 @@ services: SMQ_CHANNELS_GRPC_CLIENT_CERT: ${SMQ_CHANNELS_GRPC_CLIENT_CERT:+/channels-grpc-client.crt} SMQ_CHANNELS_GRPC_CLIENT_KEY: ${SMQ_CHANNELS_GRPC_CLIENT_KEY:+/channels-grpc-client.key} SMQ_CHANNELS_GRPC_SERVER_CA_CERTS: ${SMQ_CHANNELS_GRPC_SERVER_CA_CERTS:+/channels-grpc-server-ca.crt} + MG_TIMESCALE_READER_GRPC_URL: ${MG_TIMESCALE_READER_GRPC__URL} + MG_TIMESCALE_READER_GRPC_PORT: ${MG_TIMESCALE_READER_GRPC_PORT} + MG_TIMESCALE_READER_GRPC_HOST: ${MG_TIMESCALE_READER_GRPC_HOST} + MG_TIMESCALE_READER_GRPC_TIMEOUT: ${MG_TIMESCALE_READER_GRPC_TIMEOUT} + MG_TIMESCALE_READER_GRPC_CLIENT_CERT: ${MG_TIMESCALE_READER_GRPC_CLIENT_CERT:+./ssl/certs/reader-grpc-client.crt} + MG_TIMESCALE_READER_GRPC_CLIENT_CA_CERTS: ${MG_TIMESCALE_READER_GRPC_CLIENT_CA_CERTS:+./ssl/certs/ca.crt} + MG_TIMESCALE_READER_GRPC_SERVER_CA_CERTS: ${MG_TIMESCALE_READER_GRPC_SERVER_CA_CERTS:+./ssl/certs/ca.crt} + MG_TIMESCALE_READER_GRPC_CLIENT_KEY: ${MG_TIMESCALE_READER_GRPC_CLIENT_KEY:+/readers-grpc-client.key} + MG_TIMESCALE_READER_GRPC_SERVER_CERT: ${MG_TIMESCALE_READER_GRPC_SERVER_CERT:+./ssl/certs/readers-grpc-server.crt} + MG_TIMESCALE_READER_GRPC_SERVER_KEY: ${MG_TIMESCALE_READER_GRPC_SERVER_KEY:+./ssl/certs/readers-grpc-server.key} SMQ_AUTH_GRPC_URL: ${SMQ_AUTH_GRPC_URL} SMQ_AUTH_GRPC_TIMEOUT: ${SMQ_AUTH_GRPC_TIMEOUT} SMQ_AUTH_GRPC_CLIENT_CERT: ${SMQ_AUTH_GRPC_CLIENT_CERT:+/auth-grpc-client.crt} @@ -48,7 +59,8 @@ services: SMQ_SEND_TELEMETRY: ${SMQ_SEND_TELEMETRY} MG_TIMESCALE_READER_INSTANCE_ID: ${MG_TIMESCALE_READER_INSTANCE_ID} ports: - - ${MG_TIMESCALE_READER_HTTP_PORT}:${MG_TIMESCALE_READER_HTTP_PORT} + - ${MG_TIMESCALE_READER_HTTP_PORT}:${MG_TIMESCALE_READER_HTTP_PORT} + - ${MG_TIMESCALE_READER_GRPC_PORT}:${MG_TIMESCALE_READER_GRPC_PORT} networks: - magistrala-base-net volumes: @@ -83,3 +95,34 @@ services: target: /things-grpc-server-ca${SMQ_CLIENTS_GRPC_SERVER_CA_CERTS:+.crt} bind: create_host_path: true + # Reader gRPC mTLS client certificates + - type: bind + source: ${MG_TIMESCALE_READER_GRPC_SERVER_CERT:-ssl/certs/dummy/server_cert} + target: /readers-grpc-server${MG_TIMESCALE_READER_GRPC_SERVER_CERT:+.crt} + bind: + create_host_path: true + - type: bind + source: ${MG_TIMESCALE_READER_GRPC_SERVER_KEY:-ssl/certs/dummy/server_key} + target: /readers-grpc-server${MG_TIMESCALE_READER_GRPC_SERVER_KEY:+.key} + bind: + create_host_path: true + - type: bind + source: ${MG_TIMESCALE_READER_GRPC_SERVER_CA_CERTS:-ssl/certs/dummy/server_ca_certs} + target: /readers-grpc-server-ca${MG_TIMESCALE_READER_GRPC_SERVER_CA_CERTS:+.crt} + bind: + create_host_path: true + - type: bind + source: ${MG_TIMESCALE_READER_GRPC_CLIENT_CA_CERTS:-ssl/certs/dummy/client_ca_certs} + target: /readers-grpc-server${MG_TIMESCALE_READER_GRPC_CLIENT_CA_CERTS:+.crt} + bind: + create_host_path: true + - type: bind + source: ${MG_TIMESCALE_READER_GRPC_CLIENT_CERT:-ssl/certs/dummy/client_cert} + target: /readers-grpc-client${MG_TIMESCALE_READER_GRPC_CLIENT_CERT:+.crt} + bind: + create_host_path: true + - type: bind + source: ${MG_TIMESCALE_READER_GRPC_CLIENT_KEY:-ssl/certs/dummy/client_key} + target: /readers-grpc-client${MG_TIMESCALE_READER_GRPC_CLIENT_KEY:+.key} + bind: + create_host_path: true diff --git a/go.mod b/go.mod index 97719a895..e5403caf4 100644 --- a/go.mod +++ b/go.mod @@ -139,7 +139,7 @@ require ( golang.org/x/text v0.23.0 // indirect google.golang.org/genproto/googleapis/api v0.0.0-20250303144028-a0af3efb3deb // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20250303144028-a0af3efb3deb // indirect - google.golang.org/protobuf v1.36.6 // indirect + google.golang.org/protobuf v1.36.6 gopkg.in/alexcesaro/quotedprintable.v3 v3.0.0-20150716171945-2caba252f4dc // indirect gopkg.in/yaml.v2 v2.4.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect diff --git a/internal/proto/readers/v1/readers.proto b/internal/proto/readers/v1/readers.proto new file mode 100644 index 000000000..a64405d6c --- /dev/null +++ b/internal/proto/readers/v1/readers.proto @@ -0,0 +1,90 @@ +// Copyright (c) Abstract Machines +// SPDX-License-Identifier: Apache-2.0 + +syntax = "proto3"; + +package readers.v1; + +option go_package = "github.com/absmach/magistrala/api/grpc/readers/v1"; + +// ReadersService is a service that provides access to +// readers functionalities for Magistrala services. +service ReadersService { + rpc ReadMessages(ReadMessagesReq) + returns (ReadMessagesRes) {} +} + +message PageMetadata { + uint64 limit = 1; + uint64 offset = 2; + string protocol = 3; + string name = 4; + double value = 5; + string publisher = 6; + bool bool_value = 7; + string string_value = 8; + string data_value = 9; + double from = 10; + double to = 11; + string subtopic = 12; + string interval = 13; + bool read = 14; + Aggregation aggregation = 15; + string comparator = 16; + string format = 17; +} + +message ReadMessagesRes { + uint64 total = 1; + PageMetadata page_metadata = 2; + repeated Message messages = 3; +} + +message Message { + oneof payload { + SenMLMessage senml = 1; + JsonMessage json = 2; + } +} + +message BaseMessage { + string channel = 1; + string subtopic = 2; + string publisher = 3; + string protocol = 4; +} + +message SenMLMessage { + BaseMessage base = 1; + string name = 2; + string unit = 3; + double time = 4; + double update_time = 5; + double value = 6; + string string_value = 7; + string data_value = 8; + bool bool_value = 9; + double sum = 10; +} + +message JsonMessage { + BaseMessage base = 1; + int64 created = 2; + bytes payload = 3; +} + +message ReadMessagesReq { + string channel_id = 1; + string domain_id = 2; + PageMetadata page_metadata = 3; +} + +// Aggregation defines supported data aggregations. +enum Aggregation { + AGGREGATION_UNSPECIFIED = 0; + MAX = 1; + MIN = 2; + SUM = 3; + COUNT = 4; + AVG = 5; +} diff --git a/pkg/sdk/messages_test.go b/pkg/sdk/messages_test.go index 2f7a8d4ca..c8fce8af4 100644 --- a/pkg/sdk/messages_test.go +++ b/pkg/sdk/messages_test.go @@ -9,7 +9,7 @@ import ( "testing" sdk "github.com/absmach/magistrala/pkg/sdk" - readersapi "github.com/absmach/magistrala/readers/api" + readersapi "github.com/absmach/magistrala/readers/api/http" grpcChannelsV1 "github.com/absmach/supermq/api/grpc/channels/v1" apiutil "github.com/absmach/supermq/api/http/util" chmocks "github.com/absmach/supermq/channels/mocks" diff --git a/readers/api/grpc/client.go b/readers/api/grpc/client.go new file mode 100644 index 000000000..28edd4d5b --- /dev/null +++ b/readers/api/grpc/client.go @@ -0,0 +1,234 @@ +// Copyright (c) Abstract Machines +// SPDX-License-Identifier: Apache-2.0 + +package grpc + +import ( + "context" + "encoding/json" + "fmt" + "strings" + "time" + + grpcReadersV1 "github.com/absmach/magistrala/api/grpc/readers/v1" + "github.com/absmach/magistrala/pkg/errors" + svcerr "github.com/absmach/supermq/pkg/errors/service" + "github.com/absmach/supermq/pkg/transformers/senml" + readers "github.com/absmach/supermq/readers" + "github.com/go-kit/kit/endpoint" + kitgrpc "github.com/go-kit/kit/transport/grpc" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" +) + +const readersSvcName = "readers.v1.ReadersService" + +var _ grpcReadersV1.ReadersServiceClient = (*readersGrpcClient)(nil) + +type readersGrpcClient struct { + readMessages endpoint.Endpoint + timeout time.Duration +} + +// NewReadersClient returns new readers gRPC client instance. +func NewReadersClient(conn *grpc.ClientConn, timeout time.Duration) grpcReadersV1.ReadersServiceClient { + return &readersGrpcClient{ + readMessages: kitgrpc.NewClient( + conn, + readersSvcName, + "ReadMessages", + encodeReadMessagesRequest, + decodeReadMessagesResponse, + grpcReadersV1.ReadMessagesRes{}, + ).Endpoint(), + timeout: timeout, + } +} + +func (client readersGrpcClient) ReadMessages(ctx context.Context, in *grpcReadersV1.ReadMessagesReq, opts ...grpc.CallOption) (*grpcReadersV1.ReadMessagesRes, error) { + ctx, cancel := context.WithTimeout(ctx, client.timeout) + defer cancel() + + res, err := client.readMessages(ctx, readMessagesReq{ + chanID: in.GetChannelId(), + domain: in.GetDomainId(), + pageMeta: readers.PageMetadata{ + Offset: in.GetPageMetadata().GetOffset(), + Limit: in.GetPageMetadata().GetLimit(), + Comparator: in.GetPageMetadata().GetComparator(), + Aggregation: in.GetPageMetadata().GetAggregation().String(), + From: in.GetPageMetadata().GetFrom(), + To: in.GetPageMetadata().GetTo(), + Interval: in.GetPageMetadata().GetInterval(), + Subtopic: in.GetPageMetadata().GetSubtopic(), + Publisher: in.GetPageMetadata().GetPublisher(), + Protocol: in.GetPageMetadata().GetProtocol(), + Name: in.GetPageMetadata().GetName(), + Value: in.GetPageMetadata().GetValue(), + BoolValue: in.GetPageMetadata().GetBoolValue(), + StringValue: in.GetPageMetadata().GetStringValue(), + DataValue: in.GetPageMetadata().GetDataValue(), + Format: in.GetPageMetadata().GetFormat(), + }, + }) + if err != nil { + return &grpcReadersV1.ReadMessagesRes{}, decodeError(err) + } + + dpr := res.(readMessagesRes) + return &grpcReadersV1.ReadMessagesRes{ + Total: dpr.Total, + Messages: toResponseMessages(dpr.Messages), + PageMetadata: &grpcReadersV1.PageMetadata{ + Offset: dpr.PageMetadata.Offset, + Limit: dpr.PageMetadata.Limit, + }, + }, nil +} + +func decodeReadMessagesResponse(_ context.Context, grpcRes interface{}) (interface{}, error) { + res := grpcRes.(*grpcReadersV1.ReadMessagesRes) + return readMessagesRes{ + Total: res.Total, + Messages: fromResponseMessages(res.Messages), + PageMetadata: readers.PageMetadata{ + Offset: res.GetPageMetadata().GetOffset(), + Limit: res.GetPageMetadata().GetLimit(), + }, + }, nil +} + +func encodeReadMessagesRequest(_ context.Context, grpcReq interface{}) (interface{}, error) { + req := grpcReq.(readMessagesReq) + return &grpcReadersV1.ReadMessagesReq{ + ChannelId: req.chanID, + DomainId: req.domain, + PageMetadata: &grpcReadersV1.PageMetadata{ + Offset: req.pageMeta.Offset, + Limit: req.pageMeta.Limit, + Comparator: req.pageMeta.Comparator, + Aggregation: parseAggregation(req.pageMeta.Aggregation), + From: req.pageMeta.From, + To: req.pageMeta.To, + Interval: req.pageMeta.Interval, + Subtopic: req.pageMeta.Subtopic, + Publisher: req.pageMeta.Publisher, + Protocol: req.pageMeta.Protocol, + Name: req.pageMeta.Name, + Value: req.pageMeta.Value, + BoolValue: req.pageMeta.BoolValue, + StringValue: req.pageMeta.StringValue, + DataValue: req.pageMeta.DataValue, + Format: req.pageMeta.Format, + }, + }, nil +} + +func fromResponseMessages(protoMessages []*grpcReadersV1.Message) []readers.Message { + var messages []readers.Message + for _, m := range protoMessages { + switch msg := m.Payload.(type) { + case *grpcReadersV1.Message_Senml: + s := msg.Senml + base := s.GetBase() + typed := senml.Message{ + Channel: base.GetChannel(), + Subtopic: base.GetSubtopic(), + Publisher: base.GetPublisher(), + Protocol: base.GetProtocol(), + Name: s.GetName(), + Unit: s.GetUnit(), + Time: s.GetTime(), + UpdateTime: s.GetUpdateTime(), + Value: optionalFloat64(s.GetValue()), + StringValue: optionalString(s.GetStringValue()), + DataValue: optionalString(s.GetDataValue()), + BoolValue: optionalBool(s.GetBoolValue()), + Sum: optionalFloat64(s.GetSum()), + } + messages = append(messages, typed) + case *grpcReadersV1.Message_Json: + j := msg.Json + base := j.GetBase() + var p map[string]interface{} + if err := json.Unmarshal(j.GetPayload(), &p); err != nil { + continue + } + messages = append(messages, map[string]interface{}{ + "channel": base.GetChannel(), + "created": j.GetCreated(), + "subtopic": base.GetSubtopic(), + "publisher": base.GetPublisher(), + "protocol": base.GetProtocol(), + "payload": p, + }) + } + } + return messages +} + +func parseAggregation(agg string) grpcReadersV1.Aggregation { + switch strings.ToUpper(agg) { + case "MAX": + return grpcReadersV1.Aggregation_MAX + case "MIN": + return grpcReadersV1.Aggregation_MIN + case "SUM": + return grpcReadersV1.Aggregation_SUM + case "COUNT": + return grpcReadersV1.Aggregation_COUNT + case "AVG": + return grpcReadersV1.Aggregation_AVG + default: + return grpcReadersV1.Aggregation_AGGREGATION_UNSPECIFIED + } +} + +func decodeError(err error) error { + if st, ok := status.FromError(err); ok { + switch st.Code() { + case codes.Unauthenticated: + return errors.Wrap(svcerr.ErrAuthentication, errors.New(st.Message())) + case codes.PermissionDenied: + return errors.Wrap(svcerr.ErrAuthorization, errors.New(st.Message())) + case codes.InvalidArgument: + return errors.Wrap(errors.ErrMalformedEntity, errors.New(st.Message())) + case codes.FailedPrecondition: + return errors.Wrap(errors.ErrMalformedEntity, errors.New(st.Message())) + case codes.NotFound: + return errors.Wrap(svcerr.ErrNotFound, errors.New(st.Message())) + case codes.AlreadyExists: + return errors.Wrap(svcerr.ErrConflict, errors.New(st.Message())) + case codes.OK: + if msg := st.Message(); msg != "" { + return errors.Wrap(errors.ErrUnidentified, errors.New(msg)) + } + return nil + default: + return errors.Wrap(fmt.Errorf("unexpected gRPC status: %s (status code:%v)", st.Code().String(), st.Code()), errors.New(st.Message())) + } + } + return err +} + +func optionalString(v string) *string { + if v == "" { + return nil + } + return &v +} + +func optionalFloat64(v float64) *float64 { + if v == 0 { + return nil + } + return &v +} + +func optionalBool(v bool) *bool { + if !v { + return nil + } + return &v +} diff --git a/readers/api/grpc/doc.go b/readers/api/grpc/doc.go new file mode 100644 index 000000000..67672c6f0 --- /dev/null +++ b/readers/api/grpc/doc.go @@ -0,0 +1,5 @@ +// Copyright (c) Abstract Machines +// SPDX-License-Identifier: Apache-2.0 + +// Package grpc contains implementation of Readers service gRPC API. +package grpc diff --git a/readers/api/grpc/endpoint.go b/readers/api/grpc/endpoint.go new file mode 100644 index 000000000..0b17d8892 --- /dev/null +++ b/readers/api/grpc/endpoint.go @@ -0,0 +1,31 @@ +// Copyright (c) Abstract Machines +// SPDX-License-Identifier: Apache-2.0 + +package grpc + +import ( + "context" + + readers "github.com/absmach/supermq/readers" + "github.com/go-kit/kit/endpoint" +) + +func readMessagesEndpoint(svc readers.MessageRepository) endpoint.Endpoint { + return func(ctx context.Context, request interface{}) (interface{}, error) { + req := request.(readMessagesReq) + if err := req.validate(); err != nil { + return readMessagesRes{}, err + } + + page, err := svc.ReadAll(req.chanID, req.pageMeta) + if err != nil { + return readMessagesRes{}, err + } + + return readMessagesRes{ + PageMetadata: page.PageMetadata, + Total: page.Total, + Messages: page.Messages, + }, nil + } +} diff --git a/readers/api/grpc/endpoint_test.go b/readers/api/grpc/endpoint_test.go new file mode 100644 index 000000000..69ef2b899 --- /dev/null +++ b/readers/api/grpc/endpoint_test.go @@ -0,0 +1,230 @@ +// Copyright (c) Abstract Machines +// SPDX-License-Identifier: Apache-2.0 + +package grpc_test + +import ( + "context" + "encoding/json" + "fmt" + "net" + "testing" + "time" + + grpcReadersV1 "github.com/absmach/magistrala/api/grpc/readers/v1" + "github.com/absmach/magistrala/pkg/errors" + grpcapi "github.com/absmach/magistrala/readers/api/grpc" + apiutil "github.com/absmach/supermq/api/http/util" + "github.com/absmach/supermq/pkg/transformers/senml" + "github.com/absmach/supermq/readers" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" +) + +const ( + port = 7071 + channelID = "testChannelID" + domain = "testDomain" + validID = "validID" + validToken = "valid" + inValidToken = "invalid" + testOffset = 0 + testLimit = 10 +) + +var authAddr = fmt.Sprintf("localhost:%d", port) + +func startGRPCServer(svc readers.MessageRepository, port int) *grpc.Server { + listener, _ := net.Listen("tcp", fmt.Sprintf(":%d", port)) + server := grpc.NewServer() + grpcReadersV1.RegisterReadersServiceServer(server, grpcapi.NewReadersServer(svc)) + go func() { + err := server.Serve(listener) + assert.Nil(&testing.T{}, err, fmt.Sprintf(`"Unexpected error creating reader server %s"`, err)) + }() + + return server +} + +func TestReadMessages(t *testing.T) { + conn, err := grpc.NewClient(authAddr, grpc.WithTransportCredentials(insecure.NewCredentials())) + assert.Nil(t, err, fmt.Sprintf("Unexpected error creating client connection %s", err)) + grpcClient := grpcapi.NewReadersClient(conn, time.Second) + + tmp := readers.MessagesPage{ + Total: 1, + PageMetadata: readers.PageMetadata{ + Offset: 0, + Limit: 10, + }, + Messages: []readers.Message{ + map[string]interface{}{ + "channel": "testChannel", + "created": int64(123456789), + "subtopic": "testSubtopic", + "publisher": "testPublisher", + "protocol": "testProtocol", + "payload": map[string]interface{}{ + "temp": 23.5, + }, + }, + }, + } + + expectedPayload, err := json.Marshal(tmp.Messages[0].(map[string]interface{})["payload"]) + require.NoError(t, err) + + expectedRes := &grpcReadersV1.ReadMessagesRes{ + Total: 1, + Messages: []*grpcReadersV1.Message{ + { + Payload: &grpcReadersV1.Message_Json{ + Json: &grpcReadersV1.JsonMessage{ + Base: &grpcReadersV1.BaseMessage{ + Channel: "testChannel", + Subtopic: "testSubtopic", + Publisher: "testPublisher", + Protocol: "testProtocol", + }, + Created: 123456789, + Payload: expectedPayload, + }, + }, + }, + }, + PageMetadata: &grpcReadersV1.PageMetadata{ + Offset: 0, + Limit: 10, + }, + } + + cases := []struct { + desc string + token string + svcRes readers.MessagesPage + ReadMessagesReq *grpcReadersV1.ReadMessagesReq + ReadMessagesRes *grpcReadersV1.ReadMessagesRes + err error + }{ + { + desc: "read valid req", + token: validToken, + ReadMessagesReq: &grpcReadersV1.ReadMessagesReq{ + ChannelId: channelID, + DomainId: domain, + PageMetadata: &grpcReadersV1.PageMetadata{ + Offset: testOffset, + Limit: testLimit, + }, + }, + svcRes: tmp, + + ReadMessagesRes: expectedRes, + err: nil, + }, + { + desc: " read missing channel id", + token: validToken, + ReadMessagesReq: &grpcReadersV1.ReadMessagesReq{ + ChannelId: "", + DomainId: domain, + PageMetadata: &grpcReadersV1.PageMetadata{ + Offset: testOffset, + Limit: testLimit, + }, + }, + ReadMessagesRes: &grpcReadersV1.ReadMessagesRes{}, + err: apiutil.ErrMissingID, + }, + { + desc: "read valid SenML message", + token: validToken, + ReadMessagesReq: &grpcReadersV1.ReadMessagesReq{ + ChannelId: channelID, + DomainId: domain, + PageMetadata: &grpcReadersV1.PageMetadata{ + Offset: testOffset, + Limit: testLimit, + }, + }, + svcRes: readers.MessagesPage{ + Total: 1, + PageMetadata: readers.PageMetadata{ + Offset: 0, + Limit: 10, + }, + Messages: []readers.Message{ + senml.Message{ + Channel: "senmlChannel", + Subtopic: "senmlSub", + Publisher: "senmlPublisher", + Protocol: "mqtt", + Name: "temperature", + Unit: "C", + Time: 1672531200, + UpdateTime: 1672531300, + Value: float64Ptr(22.5), + StringValue: stringPtr("ok"), + DataValue: stringPtr("binary"), + BoolValue: boolPtr(true), + Sum: float64Ptr(123.4), + }, + }, + }, + ReadMessagesRes: &grpcReadersV1.ReadMessagesRes{ + Total: 1, + PageMetadata: &grpcReadersV1.PageMetadata{ + Offset: 0, + Limit: 10, + }, + Messages: []*grpcReadersV1.Message{ + { + Payload: &grpcReadersV1.Message_Senml{ + Senml: &grpcReadersV1.SenMLMessage{ + Base: &grpcReadersV1.BaseMessage{ + Channel: "senmlChannel", + Subtopic: "senmlSub", + Publisher: "senmlPublisher", + Protocol: "mqtt", + }, + Name: "temperature", + Unit: "C", + Time: 1672531200, + UpdateTime: 1672531300, + Value: 22.5, + StringValue: "ok", + DataValue: "binary", + BoolValue: true, + Sum: 123.4, + }, + }, + }, + }, + }, + }, + } + + for _, tc := range cases { + repoCall := svc.On("ReadAll", mock.Anything, mock.Anything).Return(tc.svcRes, tc.err) + dpr, err := grpcClient.ReadMessages(context.Background(), tc.ReadMessagesReq) + assert.Equal(t, tc.ReadMessagesRes.Messages, dpr.Messages, fmt.Sprintf("%s: expected %v got %v", tc.desc, tc.ReadMessagesRes.Messages, dpr.Messages)) + + assert.True(t, errors.Contains(err, tc.err), fmt.Sprintf("%s: expected %s got %s\n", tc.desc, tc.err, err)) + repoCall.Unset() + } +} + +func float64Ptr(v float64) *float64 { + return &v +} + +func stringPtr(v string) *string { + return &v +} + +func boolPtr(v bool) *bool { + return &v +} diff --git a/readers/api/grpc/request.go b/readers/api/grpc/request.go new file mode 100644 index 000000000..c53ca5cf2 --- /dev/null +++ b/readers/api/grpc/request.go @@ -0,0 +1,69 @@ +// Copyright (c) Abstract Machines +// SPDX-License-Identifier: Apache-2.0 + +package grpc + +import ( + "slices" + "strings" + "time" + + apiutil "github.com/absmach/supermq/api/http/util" + "github.com/absmach/supermq/readers" +) + +const maxLimitSize = 1000 + +var validAggregations = []string{"MAX", "MIN", "AVG", "SUM", "COUNT"} + +type readMessagesReq struct { + chanID string + domain string + pageMeta readers.PageMetadata +} + +func (req readMessagesReq) validate() error { + if req.chanID == "" { + return apiutil.ErrMissingID + } + if req.domain == "" { + return apiutil.ErrMissingID + } + + if req.pageMeta.Limit < 1 || req.pageMeta.Limit > maxLimitSize { + return apiutil.ErrLimitSize + } + + if req.pageMeta.Comparator != "" && + req.pageMeta.Comparator != readers.EqualKey && + req.pageMeta.Comparator != readers.LowerThanKey && + req.pageMeta.Comparator != readers.LowerThanEqualKey && + req.pageMeta.Comparator != readers.GreaterThanKey && + req.pageMeta.Comparator != readers.GreaterThanEqualKey { + return apiutil.ErrInvalidComparator + } + + if req.pageMeta.Aggregation == "AGGREGATION_UNSPECIFIED" { + req.pageMeta.Aggregation = "" + } + + if agg := strings.ToUpper(req.pageMeta.Aggregation); agg != "" && agg != "AGGREGATION_UNSPECIFIED" { + if req.pageMeta.From == 0 { + return apiutil.ErrMissingFrom + } + + if req.pageMeta.To == 0 { + return apiutil.ErrMissingTo + } + + if !slices.Contains(validAggregations, strings.ToUpper(req.pageMeta.Aggregation)) { + return apiutil.ErrInvalidAggregation + } + + if _, err := time.ParseDuration(req.pageMeta.Interval); err != nil { + return apiutil.ErrInvalidInterval + } + } + + return nil +} diff --git a/readers/api/grpc/responses.go b/readers/api/grpc/responses.go new file mode 100644 index 000000000..fbe06dc74 --- /dev/null +++ b/readers/api/grpc/responses.go @@ -0,0 +1,16 @@ +// Copyright (c) Abstract Machines +// SPDX-License-Identifier: Apache-2.0 + +package grpc + +import ( + "github.com/absmach/supermq/readers" +) + +type readMessagesRes struct { + Total uint64 + Messages []readers.Message + readers.PageMetadata +} + +type Message interface{} diff --git a/readers/api/grpc/server.go b/readers/api/grpc/server.go new file mode 100644 index 000000000..240fecfbf --- /dev/null +++ b/readers/api/grpc/server.go @@ -0,0 +1,189 @@ +// Copyright (c) Abstract Machines +// SPDX-License-Identifier: Apache-2.0 + +package grpc + +import ( + "context" + "encoding/json" + + grpcReadersV1 "github.com/absmach/magistrala/api/grpc/readers/v1" + grpcapi "github.com/absmach/supermq/auth/api/grpc" + "github.com/absmach/supermq/pkg/transformers/senml" + "github.com/absmach/supermq/readers" + kitgrpc "github.com/go-kit/kit/transport/grpc" +) + +var _ grpcReadersV1.ReadersServiceServer = (*readersGrpcServer)(nil) + +type readersGrpcServer struct { + grpcReadersV1.UnimplementedReadersServiceServer + readMessages kitgrpc.Handler +} + +func NewReadersServer(svc readers.MessageRepository) grpcReadersV1.ReadersServiceServer { + return &readersGrpcServer{ + readMessages: kitgrpc.NewServer( + (readMessagesEndpoint(svc)), + decodeReadMessagesRequest, + encodeReadMessagesResponse, + ), + } +} + +func decodeReadMessagesRequest(_ context.Context, grpcReq interface{}) (interface{}, error) { + req := grpcReq.(*grpcReadersV1.ReadMessagesReq) + return readMessagesReq{ + chanID: req.GetChannelId(), + domain: req.GetDomainId(), + pageMeta: readers.PageMetadata{ + Offset: req.GetPageMetadata().GetOffset(), + Limit: req.GetPageMetadata().GetLimit(), + Comparator: req.GetPageMetadata().GetComparator(), + Aggregation: stringifyAggregation(req.GetPageMetadata().GetAggregation()), + From: req.GetPageMetadata().GetFrom(), + To: req.GetPageMetadata().GetTo(), + Interval: req.GetPageMetadata().GetInterval(), + Subtopic: req.GetPageMetadata().GetSubtopic(), + Publisher: req.GetPageMetadata().GetPublisher(), + Protocol: req.GetPageMetadata().GetProtocol(), + Name: req.GetPageMetadata().GetName(), + Value: req.GetPageMetadata().GetValue(), + BoolValue: req.GetPageMetadata().GetBoolValue(), + StringValue: req.GetPageMetadata().GetStringValue(), + DataValue: req.GetPageMetadata().GetDataValue(), + Format: req.GetPageMetadata().GetFormat(), + }, + }, nil +} + +func encodeReadMessagesResponse(_ context.Context, grpcRes interface{}) (interface{}, error) { + res := grpcRes.(readMessagesRes) + + resp := &grpcReadersV1.ReadMessagesRes{ + Total: res.Total, + Messages: toResponseMessages(res.Messages), + PageMetadata: &grpcReadersV1.PageMetadata{ + Offset: res.PageMetadata.Offset, + Limit: res.PageMetadata.Limit, + }, + } + return resp, nil +} + +func (s *readersGrpcServer) ReadMessages(ctx context.Context, req *grpcReadersV1.ReadMessagesReq) (*grpcReadersV1.ReadMessagesRes, error) { + _, res, err := s.readMessages.ServeGRPC(ctx, req) + if err != nil { + return nil, grpcapi.EncodeError(err) + } + return res.(*grpcReadersV1.ReadMessagesRes), nil +} + +func toResponseMessages(messages []readers.Message) []*grpcReadersV1.Message { + var res []*grpcReadersV1.Message + for _, m := range messages { + switch typed := m.(type) { + case senml.Message: + res = append(res, &grpcReadersV1.Message{ + Payload: &grpcReadersV1.Message_Senml{ + Senml: &grpcReadersV1.SenMLMessage{ + Base: &grpcReadersV1.BaseMessage{ + Channel: typed.Channel, + Subtopic: typed.Subtopic, + Publisher: typed.Publisher, + Protocol: typed.Protocol, + }, + Name: typed.Name, + Unit: typed.Unit, + Time: typed.Time, + UpdateTime: typed.UpdateTime, + Value: derefFloat64(typed.Value), + StringValue: derefString(typed.StringValue), + DataValue: derefString(typed.DataValue), + BoolValue: derefBool(typed.BoolValue), + Sum: derefFloat64(typed.Sum), + }, + }, + }) + case map[string]interface{}: + payload := typed["payload"] + data, err := json.Marshal(payload) + if err != nil { + continue + } + res = append(res, &grpcReadersV1.Message{ + Payload: &grpcReadersV1.Message_Json{ + Json: &grpcReadersV1.JsonMessage{ + Base: &grpcReadersV1.BaseMessage{ + Channel: safeString(typed["channel"]), + Subtopic: safeString(typed["subtopic"]), + Publisher: safeString(typed["publisher"]), + Protocol: safeString(typed["protocol"]), + }, + Created: safeInt64(typed["created"]), + Payload: data, + }, + }, + }) + } + } + return res +} + +func stringifyAggregation(agg grpcReadersV1.Aggregation) string { + switch agg { + case grpcReadersV1.Aggregation_AGGREGATION_UNSPECIFIED: + return "" + case grpcReadersV1.Aggregation_MAX: + return "MAX" + case grpcReadersV1.Aggregation_MIN: + return "MIN" + case grpcReadersV1.Aggregation_AVG: + return "AVG" + case grpcReadersV1.Aggregation_SUM: + return "SUM" + case grpcReadersV1.Aggregation_COUNT: + return "COUNT" + default: + return "" + } +} + +func derefString(s *string) string { + if s == nil { + return "" + } + return *s +} + +func derefFloat64(f *float64) float64 { + if f == nil { + return 0 + } + return *f +} + +func derefBool(b *bool) bool { + if b == nil { + return false + } + return *b +} + +func safeString(v interface{}) string { + if s, ok := v.(string); ok { + return s + } + return "" +} + +func safeInt64(v interface{}) int64 { + switch v := v.(type) { + case float64: + return int64(v) + case int64: + return v + default: + return 0 + } +} diff --git a/readers/api/grpc/setup_test.go b/readers/api/grpc/setup_test.go new file mode 100644 index 000000000..001e6219f --- /dev/null +++ b/readers/api/grpc/setup_test.go @@ -0,0 +1,24 @@ +// Copyright (c) Abstract Machines +// SPDX-License-Identifier: Apache-2.0 + +package grpc_test + +import ( + "os" + "testing" + + "github.com/absmach/supermq/readers/mocks" +) + +var svc *mocks.MessageRepository + +func TestMain(m *testing.M) { + svc = new(mocks.MessageRepository) + server := startGRPCServer(svc, port) + + code := m.Run() + + server.GracefulStop() + + os.Exit(code) +} diff --git a/readers/api/endpoint.go b/readers/api/http/endpoint.go similarity index 98% rename from readers/api/endpoint.go rename to readers/api/http/endpoint.go index 61d1d6038..ab9b9f356 100644 --- a/readers/api/endpoint.go +++ b/readers/api/http/endpoint.go @@ -1,7 +1,7 @@ // Copyright (c) Abstract Machines // SPDX-License-Identifier: Apache-2.0 -package api +package http import ( "context" diff --git a/readers/api/endpoint_test.go b/readers/api/http/endpoint_test.go similarity index 99% rename from readers/api/endpoint_test.go rename to readers/api/http/endpoint_test.go index 6c439ca01..ac8076259 100644 --- a/readers/api/endpoint_test.go +++ b/readers/api/http/endpoint_test.go @@ -1,7 +1,7 @@ // Copyright (c) Abstract Machines // SPDX-License-Identifier: Apache-2.0 -package api_test +package http_test import ( "encoding/json" @@ -12,7 +12,7 @@ import ( "time" "github.com/absmach/magistrala/internal/testsutil" - "github.com/absmach/magistrala/readers/api" + customhttp "github.com/absmach/magistrala/readers/api/http" grpcChannelsV1 "github.com/absmach/supermq/api/grpc/channels/v1" grpcClientsV1 "github.com/absmach/supermq/api/grpc/clients/v1" apiutil "github.com/absmach/supermq/api/http/util" @@ -55,7 +55,7 @@ var ( ) func newServer(repo *mocks.MessageRepository, authn *authnmocks.Authentication, clients *climocks.ClientsServiceClient, channels *chmocks.ChannelsServiceClient) *httptest.Server { - mux := api.MakeHandler(repo, authn, clients, channels, svcName, instanceID) + mux := customhttp.MakeHandler(repo, authn, clients, channels, svcName, instanceID) return httptest.NewServer(mux) } diff --git a/readers/api/requests.go b/readers/api/http/requests.go similarity index 99% rename from readers/api/requests.go rename to readers/api/http/requests.go index 38b6af978..15e99612b 100644 --- a/readers/api/requests.go +++ b/readers/api/http/requests.go @@ -1,7 +1,7 @@ // Copyright (c) Abstract Machines // SPDX-License-Identifier: Apache-2.0 -package api +package http import ( "slices" diff --git a/readers/api/responses.go b/readers/api/http/responses.go similarity index 97% rename from readers/api/responses.go rename to readers/api/http/responses.go index c7270856c..2867239a3 100644 --- a/readers/api/responses.go +++ b/readers/api/http/responses.go @@ -1,7 +1,7 @@ // Copyright (c) Abstract Machines // SPDX-License-Identifier: Apache-2.0 -package api +package http import ( "net/http" diff --git a/readers/api/transport.go b/readers/api/http/transport.go similarity index 99% rename from readers/api/transport.go rename to readers/api/http/transport.go index 6ba6cd494..ae46bf448 100644 --- a/readers/api/transport.go +++ b/readers/api/http/transport.go @@ -1,7 +1,7 @@ // Copyright (c) Abstract Machines // SPDX-License-Identifier: Apache-2.0 -package api +package http import ( "context" diff --git a/readers/middleware/doc.go b/readers/middleware/doc.go new file mode 100644 index 000000000..78e1451d1 --- /dev/null +++ b/readers/middleware/doc.go @@ -0,0 +1,5 @@ +// Copyright (c) Abstract Machines +// SPDX-License-Identifier: Apache-2.0 + +// Package middleware provides middleware for Magistrala Readers service. +package middleware diff --git a/readers/api/logging.go b/readers/middleware/logging.go similarity index 98% rename from readers/api/logging.go rename to readers/middleware/logging.go index 30f013ec2..d7df40883 100644 --- a/readers/api/logging.go +++ b/readers/middleware/logging.go @@ -3,7 +3,7 @@ //go:build !test -package api +package middleware import ( "log/slog" diff --git a/readers/api/metrics.go b/readers/middleware/metrics.go similarity index 98% rename from readers/api/metrics.go rename to readers/middleware/metrics.go index 717ab91bc..852113020 100644 --- a/readers/api/metrics.go +++ b/readers/middleware/metrics.go @@ -3,7 +3,7 @@ //go:build !test -package api +package middleware import ( "time" diff --git a/scripts/ci.sh b/scripts/ci.sh index 6944ea2aa..c9c70c5b3 100755 --- a/scripts/ci.sh +++ b/scripts/ci.sh @@ -5,9 +5,9 @@ # This script contains commands to be executed by the CI tool. NPROC=$(nproc) GO_VERSION=1.22.4 -PROTOC_VERSION=27.1 -PROTOC_GEN_VERSION=v1.34.2 -PROTOC_GRPC_VERSION=v1.4.0 +PROTOC_VERSION=30.2 +PROTOC_GEN_VERSION=v1.36.6 +PROTOC_GRPC_VERSION=v1.5.1 GOLANGCI_LINT_VERSION=v1.60.3 function version_gt() { test "$(printf '%s\n' "$@" | sort -V | head -n 1)" != "$1"; }