SMQ-2761 - Support domain and channel routes in message topic (#2775)

Signed-off-by: Felix Gateru <felix.gateru@gmail.com>
This commit is contained in:
Felix Gateru
2025-06-20 10:57:36 +03:00
committed by GitHub
parent 2aa247b3c3
commit 5b7b759131
48 changed files with 1447 additions and 134 deletions
+13 -9
View File
@@ -327,12 +327,13 @@ const file_channels_v1_channels_proto_rawDesc = "" +
"\bAuthzRes\x12\x1e\n" +
"\n" +
"authorized\x18\x01 \x01(\bR\n" +
"authorized2\x8b\x03\n" +
"authorized2\xdd\x03\n" +
"\x0fChannelsService\x12;\n" +
"\tAuthorize\x12\x15.channels.v1.AuthzReq\x1a\x15.channels.v1.AuthzRes\"\x00\x12m\n" +
"\x17RemoveClientConnections\x12'.channels.v1.RemoveClientConnectionsReq\x1a'.channels.v1.RemoveClientConnectionsRes\"\x00\x12|\n" +
"\x1cUnsetParentGroupFromChannels\x12,.channels.v1.UnsetParentGroupFromChannelsReq\x1a,.channels.v1.UnsetParentGroupFromChannelsRes\"\x00\x12N\n" +
"\x0eRetrieveEntity\x12\x1c.common.v1.RetrieveEntityReq\x1a\x1c.common.v1.RetrieveEntityRes\"\x00B1Z/github.com/absmach/supermq/api/grpc/channels/v1b\x06proto3"
"\x0eRetrieveEntity\x12\x1c.common.v1.RetrieveEntityReq\x1a\x1c.common.v1.RetrieveEntityRes\"\x00\x12P\n" +
"\x0fRetrieveByRoute\x12\x1d.common.v1.RetrieveByRouteReq\x1a\x1c.common.v1.RetrieveEntityRes\"\x00B1Z/github.com/absmach/supermq/api/grpc/channels/v1b\x06proto3"
var (
file_channels_v1_channels_proto_rawDescOnce sync.Once
@@ -355,19 +356,22 @@ var file_channels_v1_channels_proto_goTypes = []any{
(*AuthzReq)(nil), // 4: channels.v1.AuthzReq
(*AuthzRes)(nil), // 5: channels.v1.AuthzRes
(*v1.RetrieveEntityReq)(nil), // 6: common.v1.RetrieveEntityReq
(*v1.RetrieveEntityRes)(nil), // 7: common.v1.RetrieveEntityRes
(*v1.RetrieveByRouteReq)(nil), // 7: common.v1.RetrieveByRouteReq
(*v1.RetrieveEntityRes)(nil), // 8: common.v1.RetrieveEntityRes
}
var file_channels_v1_channels_proto_depIdxs = []int32{
4, // 0: channels.v1.ChannelsService.Authorize:input_type -> channels.v1.AuthzReq
0, // 1: channels.v1.ChannelsService.RemoveClientConnections:input_type -> channels.v1.RemoveClientConnectionsReq
2, // 2: channels.v1.ChannelsService.UnsetParentGroupFromChannels:input_type -> channels.v1.UnsetParentGroupFromChannelsReq
6, // 3: channels.v1.ChannelsService.RetrieveEntity:input_type -> common.v1.RetrieveEntityReq
5, // 4: channels.v1.ChannelsService.Authorize:output_type -> channels.v1.AuthzRes
1, // 5: channels.v1.ChannelsService.RemoveClientConnections:output_type -> channels.v1.RemoveClientConnectionsRes
3, // 6: channels.v1.ChannelsService.UnsetParentGroupFromChannels:output_type -> channels.v1.UnsetParentGroupFromChannelsRes
7, // 7: channels.v1.ChannelsService.RetrieveEntity:output_type -> common.v1.RetrieveEntityRes
4, // [4:8] is the sub-list for method output_type
0, // [0:4] is the sub-list for method input_type
7, // 4: channels.v1.ChannelsService.RetrieveByRoute:input_type -> common.v1.RetrieveByRouteReq
5, // 5: channels.v1.ChannelsService.Authorize:output_type -> channels.v1.AuthzRes
1, // 6: channels.v1.ChannelsService.RemoveClientConnections:output_type -> channels.v1.RemoveClientConnectionsRes
3, // 7: channels.v1.ChannelsService.UnsetParentGroupFromChannels:output_type -> channels.v1.UnsetParentGroupFromChannelsRes
8, // 8: channels.v1.ChannelsService.RetrieveEntity:output_type -> common.v1.RetrieveEntityRes
8, // 9: channels.v1.ChannelsService.RetrieveByRoute:output_type -> common.v1.RetrieveEntityRes
5, // [5:10] is the sub-list for method output_type
0, // [0:5] is the sub-list for method input_type
0, // [0:0] is the sub-list for extension type_name
0, // [0:0] is the sub-list for extension extendee
0, // [0:0] is the sub-list for field type_name
+38
View File
@@ -27,6 +27,7 @@ const (
ChannelsService_RemoveClientConnections_FullMethodName = "/channels.v1.ChannelsService/RemoveClientConnections"
ChannelsService_UnsetParentGroupFromChannels_FullMethodName = "/channels.v1.ChannelsService/UnsetParentGroupFromChannels"
ChannelsService_RetrieveEntity_FullMethodName = "/channels.v1.ChannelsService/RetrieveEntity"
ChannelsService_RetrieveByRoute_FullMethodName = "/channels.v1.ChannelsService/RetrieveByRoute"
)
// ChannelsServiceClient is the client API for ChannelsService service.
@@ -37,6 +38,7 @@ type ChannelsServiceClient interface {
RemoveClientConnections(ctx context.Context, in *RemoveClientConnectionsReq, opts ...grpc.CallOption) (*RemoveClientConnectionsRes, error)
UnsetParentGroupFromChannels(ctx context.Context, in *UnsetParentGroupFromChannelsReq, opts ...grpc.CallOption) (*UnsetParentGroupFromChannelsRes, error)
RetrieveEntity(ctx context.Context, in *v1.RetrieveEntityReq, opts ...grpc.CallOption) (*v1.RetrieveEntityRes, error)
RetrieveByRoute(ctx context.Context, in *v1.RetrieveByRouteReq, opts ...grpc.CallOption) (*v1.RetrieveEntityRes, error)
}
type channelsServiceClient struct {
@@ -87,6 +89,16 @@ func (c *channelsServiceClient) RetrieveEntity(ctx context.Context, in *v1.Retri
return out, nil
}
func (c *channelsServiceClient) RetrieveByRoute(ctx context.Context, in *v1.RetrieveByRouteReq, opts ...grpc.CallOption) (*v1.RetrieveEntityRes, error) {
cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...)
out := new(v1.RetrieveEntityRes)
err := c.cc.Invoke(ctx, ChannelsService_RetrieveByRoute_FullMethodName, in, out, cOpts...)
if err != nil {
return nil, err
}
return out, nil
}
// ChannelsServiceServer is the server API for ChannelsService service.
// All implementations must embed UnimplementedChannelsServiceServer
// for forward compatibility.
@@ -95,6 +107,7 @@ type ChannelsServiceServer interface {
RemoveClientConnections(context.Context, *RemoveClientConnectionsReq) (*RemoveClientConnectionsRes, error)
UnsetParentGroupFromChannels(context.Context, *UnsetParentGroupFromChannelsReq) (*UnsetParentGroupFromChannelsRes, error)
RetrieveEntity(context.Context, *v1.RetrieveEntityReq) (*v1.RetrieveEntityRes, error)
RetrieveByRoute(context.Context, *v1.RetrieveByRouteReq) (*v1.RetrieveEntityRes, error)
mustEmbedUnimplementedChannelsServiceServer()
}
@@ -117,6 +130,9 @@ func (UnimplementedChannelsServiceServer) UnsetParentGroupFromChannels(context.C
func (UnimplementedChannelsServiceServer) RetrieveEntity(context.Context, *v1.RetrieveEntityReq) (*v1.RetrieveEntityRes, error) {
return nil, status.Errorf(codes.Unimplemented, "method RetrieveEntity not implemented")
}
func (UnimplementedChannelsServiceServer) RetrieveByRoute(context.Context, *v1.RetrieveByRouteReq) (*v1.RetrieveEntityRes, error) {
return nil, status.Errorf(codes.Unimplemented, "method RetrieveByRoute not implemented")
}
func (UnimplementedChannelsServiceServer) mustEmbedUnimplementedChannelsServiceServer() {}
func (UnimplementedChannelsServiceServer) testEmbeddedByValue() {}
@@ -210,6 +226,24 @@ func _ChannelsService_RetrieveEntity_Handler(srv interface{}, ctx context.Contex
return interceptor(ctx, in, info, handler)
}
func _ChannelsService_RetrieveByRoute_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(v1.RetrieveByRouteReq)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(ChannelsServiceServer).RetrieveByRoute(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: ChannelsService_RetrieveByRoute_FullMethodName,
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(ChannelsServiceServer).RetrieveByRoute(ctx, req.(*v1.RetrieveByRouteReq))
}
return interceptor(ctx, in, info, handler)
}
// ChannelsService_ServiceDesc is the grpc.ServiceDesc for ChannelsService service.
// It's only intended for direct use with grpc.RegisterService,
// and not to be introspected or modified (even as a copy)
@@ -233,6 +267,10 @@ var ChannelsService_ServiceDesc = grpc.ServiceDesc{
MethodName: "RetrieveEntity",
Handler: _ChannelsService_RetrieveEntity_Handler,
},
{
MethodName: "RetrieveByRoute",
Handler: _ChannelsService_RetrieveByRoute_Handler,
},
},
Streams: []grpc.StreamDesc{},
Metadata: "channels/v1/channels.proto",
+59 -3
View File
@@ -536,6 +536,58 @@ func (x *Connection) GetType() uint32 {
return 0
}
type RetrieveByRouteReq struct {
state protoimpl.MessageState `protogen:"open.v1"`
Route string `protobuf:"bytes,1,opt,name=route,proto3" json:"route,omitempty"`
DomainId string `protobuf:"bytes,2,opt,name=domain_id,json=domainId,proto3" json:"domain_id,omitempty"`
unknownFields protoimpl.UnknownFields
sizeCache protoimpl.SizeCache
}
func (x *RetrieveByRouteReq) Reset() {
*x = RetrieveByRouteReq{}
mi := &file_common_v1_common_proto_msgTypes[10]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
func (x *RetrieveByRouteReq) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*RetrieveByRouteReq) ProtoMessage() {}
func (x *RetrieveByRouteReq) ProtoReflect() protoreflect.Message {
mi := &file_common_v1_common_proto_msgTypes[10]
if x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use RetrieveByRouteReq.ProtoReflect.Descriptor instead.
func (*RetrieveByRouteReq) Descriptor() ([]byte, []int) {
return file_common_v1_common_proto_rawDescGZIP(), []int{10}
}
func (x *RetrieveByRouteReq) GetRoute() string {
if x != nil {
return x.Route
}
return ""
}
func (x *RetrieveByRouteReq) GetDomainId() string {
if x != nil {
return x.DomainId
}
return ""
}
var File_common_v1_common_proto protoreflect.FileDescriptor
const file_common_v1_common_proto_rawDesc = "" +
@@ -571,7 +623,10 @@ const file_common_v1_common_proto_rawDesc = "" +
"\n" +
"channel_id\x18\x02 \x01(\tR\tchannelId\x12\x1b\n" +
"\tdomain_id\x18\x03 \x01(\tR\bdomainId\x12\x12\n" +
"\x04type\x18\x04 \x01(\rR\x04typeB/Z-github.com/absmach/supermq/api/grpc/common/v1b\x06proto3"
"\x04type\x18\x04 \x01(\rR\x04type\"G\n" +
"\x12RetrieveByRouteReq\x12\x14\n" +
"\x05route\x18\x01 \x01(\tR\x05route\x12\x1b\n" +
"\tdomain_id\x18\x02 \x01(\tR\bdomainIdB/Z-github.com/absmach/supermq/api/grpc/common/v1b\x06proto3"
var (
file_common_v1_common_proto_rawDescOnce sync.Once
@@ -585,7 +640,7 @@ func file_common_v1_common_proto_rawDescGZIP() []byte {
return file_common_v1_common_proto_rawDescData
}
var file_common_v1_common_proto_msgTypes = make([]protoimpl.MessageInfo, 10)
var file_common_v1_common_proto_msgTypes = make([]protoimpl.MessageInfo, 11)
var file_common_v1_common_proto_goTypes = []any{
(*RetrieveEntitiesReq)(nil), // 0: common.v1.RetrieveEntitiesReq
(*RetrieveEntitiesRes)(nil), // 1: common.v1.RetrieveEntitiesRes
@@ -597,6 +652,7 @@ var file_common_v1_common_proto_goTypes = []any{
(*RemoveConnectionsReq)(nil), // 7: common.v1.RemoveConnectionsReq
(*RemoveConnectionsRes)(nil), // 8: common.v1.RemoveConnectionsRes
(*Connection)(nil), // 9: common.v1.Connection
(*RetrieveByRouteReq)(nil), // 10: common.v1.RetrieveByRouteReq
}
var file_common_v1_common_proto_depIdxs = []int32{
4, // 0: common.v1.RetrieveEntitiesRes.entities:type_name -> common.v1.EntityBasic
@@ -621,7 +677,7 @@ func file_common_v1_common_proto_init() {
GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
RawDescriptor: unsafe.Slice(unsafe.StringData(file_common_v1_common_proto_rawDesc), len(file_common_v1_common_proto_rawDesc)),
NumEnums: 0,
NumMessages: 10,
NumMessages: 11,
NumExtensions: 0,
NumServices: 0,
},
+14 -10
View File
@@ -122,10 +122,11 @@ const file_domains_v1_domains_proto_rawDesc = "" +
"\rDeleteUserRes\x12\x18\n" +
"\adeleted\x18\x01 \x01(\bR\adeleted\"\x1f\n" +
"\rDeleteUserReq\x12\x0e\n" +
"\x02id\x18\x01 \x01(\tR\x02id2\xb1\x01\n" +
"\x02id\x18\x01 \x01(\tR\x02id2\x83\x02\n" +
"\x0eDomainsService\x12O\n" +
"\x15DeleteUserFromDomains\x12\x19.domains.v1.DeleteUserReq\x1a\x19.domains.v1.DeleteUserRes\"\x00\x12N\n" +
"\x0eRetrieveEntity\x12\x1c.common.v1.RetrieveEntityReq\x1a\x1c.common.v1.RetrieveEntityRes\"\x00B5Z3github.com/absmach/supermq/internal/grpc/domains/v1b\x06proto3"
"\x0eRetrieveEntity\x12\x1c.common.v1.RetrieveEntityReq\x1a\x1c.common.v1.RetrieveEntityRes\"\x00\x12P\n" +
"\x0fRetrieveByRoute\x12\x1d.common.v1.RetrieveByRouteReq\x1a\x1c.common.v1.RetrieveEntityRes\"\x00B5Z3github.com/absmach/supermq/internal/grpc/domains/v1b\x06proto3"
var (
file_domains_v1_domains_proto_rawDescOnce sync.Once
@@ -141,18 +142,21 @@ func file_domains_v1_domains_proto_rawDescGZIP() []byte {
var file_domains_v1_domains_proto_msgTypes = make([]protoimpl.MessageInfo, 2)
var file_domains_v1_domains_proto_goTypes = []any{
(*DeleteUserRes)(nil), // 0: domains.v1.DeleteUserRes
(*DeleteUserReq)(nil), // 1: domains.v1.DeleteUserReq
(*v1.RetrieveEntityReq)(nil), // 2: common.v1.RetrieveEntityReq
(*v1.RetrieveEntityRes)(nil), // 3: common.v1.RetrieveEntityRes
(*DeleteUserRes)(nil), // 0: domains.v1.DeleteUserRes
(*DeleteUserReq)(nil), // 1: domains.v1.DeleteUserReq
(*v1.RetrieveEntityReq)(nil), // 2: common.v1.RetrieveEntityReq
(*v1.RetrieveByRouteReq)(nil), // 3: common.v1.RetrieveByRouteReq
(*v1.RetrieveEntityRes)(nil), // 4: common.v1.RetrieveEntityRes
}
var file_domains_v1_domains_proto_depIdxs = []int32{
1, // 0: domains.v1.DomainsService.DeleteUserFromDomains:input_type -> domains.v1.DeleteUserReq
2, // 1: domains.v1.DomainsService.RetrieveEntity:input_type -> common.v1.RetrieveEntityReq
0, // 2: domains.v1.DomainsService.DeleteUserFromDomains:output_type -> domains.v1.DeleteUserRes
3, // 3: domains.v1.DomainsService.RetrieveEntity:output_type -> common.v1.RetrieveEntityRes
2, // [2:4] is the sub-list for method output_type
0, // [0:2] is the sub-list for method input_type
3, // 2: domains.v1.DomainsService.RetrieveByRoute:input_type -> common.v1.RetrieveByRouteReq
0, // 3: domains.v1.DomainsService.DeleteUserFromDomains:output_type -> domains.v1.DeleteUserRes
4, // 4: domains.v1.DomainsService.RetrieveEntity:output_type -> common.v1.RetrieveEntityRes
4, // 5: domains.v1.DomainsService.RetrieveByRoute:output_type -> common.v1.RetrieveEntityRes
3, // [3:6] is the sub-list for method output_type
0, // [0:3] is the sub-list for method input_type
0, // [0:0] is the sub-list for extension type_name
0, // [0:0] is the sub-list for extension extendee
0, // [0:0] is the sub-list for field type_name
+38
View File
@@ -25,6 +25,7 @@ const _ = grpc.SupportPackageIsVersion9
const (
DomainsService_DeleteUserFromDomains_FullMethodName = "/domains.v1.DomainsService/DeleteUserFromDomains"
DomainsService_RetrieveEntity_FullMethodName = "/domains.v1.DomainsService/RetrieveEntity"
DomainsService_RetrieveByRoute_FullMethodName = "/domains.v1.DomainsService/RetrieveByRoute"
)
// DomainsServiceClient is the client API for DomainsService service.
@@ -36,6 +37,7 @@ const (
type DomainsServiceClient interface {
DeleteUserFromDomains(ctx context.Context, in *DeleteUserReq, opts ...grpc.CallOption) (*DeleteUserRes, error)
RetrieveEntity(ctx context.Context, in *v1.RetrieveEntityReq, opts ...grpc.CallOption) (*v1.RetrieveEntityRes, error)
RetrieveByRoute(ctx context.Context, in *v1.RetrieveByRouteReq, opts ...grpc.CallOption) (*v1.RetrieveEntityRes, error)
}
type domainsServiceClient struct {
@@ -66,6 +68,16 @@ func (c *domainsServiceClient) RetrieveEntity(ctx context.Context, in *v1.Retrie
return out, nil
}
func (c *domainsServiceClient) RetrieveByRoute(ctx context.Context, in *v1.RetrieveByRouteReq, opts ...grpc.CallOption) (*v1.RetrieveEntityRes, error) {
cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...)
out := new(v1.RetrieveEntityRes)
err := c.cc.Invoke(ctx, DomainsService_RetrieveByRoute_FullMethodName, in, out, cOpts...)
if err != nil {
return nil, err
}
return out, nil
}
// DomainsServiceServer is the server API for DomainsService service.
// All implementations must embed UnimplementedDomainsServiceServer
// for forward compatibility.
@@ -75,6 +87,7 @@ func (c *domainsServiceClient) RetrieveEntity(ctx context.Context, in *v1.Retrie
type DomainsServiceServer interface {
DeleteUserFromDomains(context.Context, *DeleteUserReq) (*DeleteUserRes, error)
RetrieveEntity(context.Context, *v1.RetrieveEntityReq) (*v1.RetrieveEntityRes, error)
RetrieveByRoute(context.Context, *v1.RetrieveByRouteReq) (*v1.RetrieveEntityRes, error)
mustEmbedUnimplementedDomainsServiceServer()
}
@@ -91,6 +104,9 @@ func (UnimplementedDomainsServiceServer) DeleteUserFromDomains(context.Context,
func (UnimplementedDomainsServiceServer) RetrieveEntity(context.Context, *v1.RetrieveEntityReq) (*v1.RetrieveEntityRes, error) {
return nil, status.Errorf(codes.Unimplemented, "method RetrieveEntity not implemented")
}
func (UnimplementedDomainsServiceServer) RetrieveByRoute(context.Context, *v1.RetrieveByRouteReq) (*v1.RetrieveEntityRes, error) {
return nil, status.Errorf(codes.Unimplemented, "method RetrieveByRoute not implemented")
}
func (UnimplementedDomainsServiceServer) mustEmbedUnimplementedDomainsServiceServer() {}
func (UnimplementedDomainsServiceServer) testEmbeddedByValue() {}
@@ -148,6 +164,24 @@ func _DomainsService_RetrieveEntity_Handler(srv interface{}, ctx context.Context
return interceptor(ctx, in, info, handler)
}
func _DomainsService_RetrieveByRoute_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(v1.RetrieveByRouteReq)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(DomainsServiceServer).RetrieveByRoute(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: DomainsService_RetrieveByRoute_FullMethodName,
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(DomainsServiceServer).RetrieveByRoute(ctx, req.(*v1.RetrieveByRouteReq))
}
return interceptor(ctx, in, info, handler)
}
// DomainsService_ServiceDesc is the grpc.ServiceDesc for DomainsService service.
// It's only intended for direct use with grpc.RegisterService,
// and not to be introspected or modified (even as a copy)
@@ -163,6 +197,10 @@ var DomainsService_ServiceDesc = grpc.ServiceDesc{
MethodName: "RetrieveEntity",
Handler: _DomainsService_RetrieveEntity_Handler,
},
{
MethodName: "RetrieveByRoute",
Handler: _DomainsService_RetrieveByRoute_Handler,
},
},
Streams: []grpc.StreamDesc{},
Metadata: "domains/v1/domains.proto",
+2 -2
View File
@@ -176,7 +176,6 @@ func EncodeError(_ context.Context, err error, w http.ResponseWriter) {
errors.Contains(err, errors.ErrMalformedEntity),
errors.Contains(err, apiutil.ErrMissingID),
errors.Contains(err, apiutil.ErrMissingName),
errors.Contains(err, apiutil.ErrMissingRoute),
errors.Contains(err, apiutil.ErrMissingEmail),
errors.Contains(err, apiutil.ErrInvalidEmail),
errors.Contains(err, apiutil.ErrMissingHost),
@@ -234,7 +233,8 @@ func EncodeError(_ context.Context, err error, w http.ResponseWriter) {
errors.Contains(err, apiutil.ErrMissingDescription),
errors.Contains(err, apiutil.ErrMissingEntityID),
errors.Contains(err, apiutil.ErrInvalidRouteFormat),
errors.Contains(err, svcerr.ErrRetainOneMember):
errors.Contains(err, svcerr.ErrRetainOneMember),
errors.Contains(err, apiutil.ErrMissingRoute):
err = unwrap(err)
w.WriteHeader(http.StatusBadRequest)
+9 -11
View File
@@ -37,14 +37,14 @@ servers:
- user-password: []
channels:
/m/{domainID}/c/{channelID}/{subtopic}:
/m/{domainPrefix}/c/{channelPrefix}/{subtopic}:
parameters:
domainID:
$ref: '#/components/parameters/domainID'
domainPrefix:
$ref: '#/components/parameters/domainPrefix'
in: path
required: true
channelID:
$ref: '#/components/parameters/channelID'
channelPrefix:
$ref: '#/components/parameters/channelPrefix'
in: path
required: true
subtopic:
@@ -91,16 +91,14 @@ components:
```
parameters:
domainID:
description: Domain ID associated with the channel and client.
domainPrefix:
description: ID or route of the domain associated with the channel and client.
schema:
type: string
format: uuid
channelID:
description: Channel ID connected to the Client ID defined in the username.
channelPrefix:
description: ID or route of the channel connected to the Client ID defined in the username.
schema:
type: string
format: uuid
subtopic:
description: Arbitrary message subtopic.
schema:
+9 -11
View File
@@ -32,14 +32,14 @@ servers:
default: '8186'
channels:
'm/{domainID}/c/{channelID}/{subtopic}':
'm/{domainPrefix}/c/{channelPrefix}/{subtopic}':
parameters:
domainID:
$ref: '#/components/parameters/domainID'
domainPrefix:
$ref: '#/components/parameters/domainPrefix'
in: path
required: true
channelID:
$ref: '#/components/parameters/channelID'
channelPrefix:
$ref: '#/components/parameters/channelPrefix'
in: path
required: true
subtopic:
@@ -129,16 +129,14 @@ components:
```
parameters:
domainID:
description: Domain ID associated with the channel and client.
domainPrefix:
description: ID or route of the domain associated with the channel and client.
schema:
type: string
format: uuid
channelID:
description: Channel ID connected to the Client ID defined in the username.
channelPrefix:
description: ID or route of the channel connected to the Client ID defined in the username.
schema:
type: string
format: uuid
subtopic:
description: Arbitrary message subtopic.
schema:
+9 -11
View File
@@ -27,7 +27,7 @@ tags:
url: https://docs.supermq.abstractmachines.fr/
paths:
/m/{domainID}/c/{channelID}:
/m/{domainPrefix}/c/{channelPrefix}:
post:
summary: Sends message to the communication channel
description: |
@@ -36,8 +36,8 @@ paths:
tags:
- messages
parameters:
- $ref: "#/components/parameters/domainID"
- $ref: "#/components/parameters/channelID"
- $ref: "#/components/parameters/domainPrefix"
- $ref: "#/components/parameters/channelPrefix"
requestBody:
$ref: "#/components/requestBodies/MessageReq"
responses:
@@ -130,21 +130,19 @@ components:
$ref: "#/components/schemas/SenMLRecord"
parameters:
domainID:
name: domainID
description: Unique domain identifier.
domainPrefix:
name: domainPrefix
description: ID or route of the domain associated with the channel and client.
in: path
schema:
type: string
format: uuid
required: true
channelID:
name: channelID
description: Unique channel identifier.
channelPrefix:
name: channelPrefix
description: ID or route of the channel connected to the client.
in: path
schema:
type: string
format: uuid
required: true
requestBodies:
+29
View File
@@ -30,6 +30,7 @@ type grpcClient struct {
removeClientConnections endpoint.Endpoint
unsetParentGroupFromChannels endpoint.Endpoint
retrieveEntity endpoint.Endpoint
retrieveByRoute endpoint.Endpoint
}
// NewClient returns new gRPC client instance.
@@ -67,6 +68,14 @@ func NewClient(conn *grpc.ClientConn, timeout time.Duration) grpcChannelsV1.Chan
decodeRetrieveEntityResponse,
grpcCommonV1.RetrieveEntityRes{},
).Endpoint(),
retrieveByRoute: kitgrpc.NewClient(
conn,
svcName,
"RetrieveByRoute",
encodeRetrieveByRouteRequest,
decodeRetrieveByRouteResponse,
grpcCommonV1.RetrieveEntityRes{},
).Endpoint(),
timeout: timeout,
}
}
@@ -167,6 +176,26 @@ func decodeRetrieveEntityResponse(_ context.Context, grpcRes interface{}) (inter
return grpcRes.(*grpcCommonV1.RetrieveEntityRes), nil
}
func (client grpcClient) RetrieveByRoute(ctx context.Context, req *grpcCommonV1.RetrieveByRouteReq, _ ...grpc.CallOption) (r *grpcCommonV1.RetrieveEntityRes, err error) {
ctx, cancel := context.WithTimeout(ctx, client.timeout)
defer cancel()
res, err := client.retrieveByRoute(ctx, req)
if err != nil {
return &grpcCommonV1.RetrieveEntityRes{}, decodeError(err)
}
return res.(*grpcCommonV1.RetrieveEntityRes), nil
}
func encodeRetrieveByRouteRequest(_ context.Context, grpcReq interface{}) (interface{}, error) {
return grpcReq.(*grpcCommonV1.RetrieveByRouteReq), nil
}
func decodeRetrieveByRouteResponse(_ context.Context, grpcRes interface{}) (interface{}, error) {
return grpcRes.(*grpcCommonV1.RetrieveEntityRes), nil
}
func decodeError(err error) error {
if st, ok := status.FromError(err); ok {
switch st.Code() {
+16
View File
@@ -67,3 +67,19 @@ func retrieveEntityEndpoint(svc channels.Service) endpoint.Endpoint {
return retrieveEntityRes{id: channel.ID, domain: channel.Domain, parentGroup: channel.ParentGroup, status: uint8(channel.Status)}, nil
}
}
func retrieveByRouteEndpoint(svc channels.Service) endpoint.Endpoint {
return func(ctx context.Context, request interface{}) (interface{}, error) {
req := request.(retrieveByRouteReq)
if err := req.validate(); err != nil {
return retrieveEntityRes{}, err
}
channel, err := svc.RetrieveByRoute(ctx, req.route, req.domainID)
if err != nil {
return retrieveEntityRes{}, err
}
return retrieveEntityRes{id: channel.ID, status: uint8(channel.Status)}, nil
}
}
+85
View File
@@ -12,6 +12,7 @@ import (
grpcChannelsV1 "github.com/absmach/supermq/api/grpc/channels/v1"
grpcCommonV1 "github.com/absmach/supermq/api/grpc/common/v1"
apiutil "github.com/absmach/supermq/api/http/util"
"github.com/absmach/supermq/channels"
ch "github.com/absmach/supermq/channels"
grpcapi "github.com/absmach/supermq/channels/api/grpc"
@@ -265,3 +266,87 @@ func TestRetrieveEntity(t *testing.T) {
})
}
}
func TestRetrieveByRoute(t *testing.T) {
svc := new(mocks.Service)
server := startGRPCServer(svc, port)
defer server.GracefulStop()
authAddr := fmt.Sprintf("localhost:%d", port)
conn, _ := grpc.NewClient(authAddr, grpc.WithTransportCredentials(insecure.NewCredentials()))
client := grpcapi.NewClient(conn, time.Second)
validRoute := "validRoute"
domainID := testsutil.GenerateUUID(t)
channel := ch.Channel{
ID: validID,
Route: validRoute,
Status: channels.EnabledStatus,
}
cases := []struct {
desc string
retrieveReq *grpcCommonV1.RetrieveByRouteReq
svcRes ch.Channel
svcErr error
retrieveRes *grpcCommonV1.RetrieveEntityRes
err error
}{
{
desc: "retrieve entity by route successfully",
retrieveReq: &grpcCommonV1.RetrieveByRouteReq{
Route: validRoute,
DomainId: domainID,
},
svcRes: channel,
retrieveRes: &grpcCommonV1.RetrieveEntityRes{
Entity: &grpcCommonV1.EntityBasic{
Id: channel.ID,
Status: uint32(ch.EnabledStatus),
},
},
err: nil,
},
{
desc: "retrieve entity by route with empty route",
retrieveReq: &grpcCommonV1.RetrieveByRouteReq{
Route: "",
DomainId: domainID,
},
svcRes: ch.Channel{},
retrieveRes: &grpcCommonV1.RetrieveEntityRes{},
err: apiutil.ErrMissingRoute,
},
{
desc: "retrieve entity by route with empty domain ID",
retrieveReq: &grpcCommonV1.RetrieveByRouteReq{
Route: validRoute,
DomainId: "",
},
svcRes: ch.Channel{},
retrieveRes: &grpcCommonV1.RetrieveEntityRes{},
err: apiutil.ErrMissingDomainID,
},
{
desc: "retrieve entity by route with invalid route",
retrieveReq: &grpcCommonV1.RetrieveByRouteReq{
Route: "invalidRoute",
DomainId: domainID,
},
svcRes: ch.Channel{},
svcErr: svcerr.ErrNotFound,
retrieveRes: &grpcCommonV1.RetrieveEntityRes{},
err: svcerr.ErrNotFound,
},
}
for _, tc := range cases {
t.Run(tc.desc, func(t *testing.T) {
svcCall := svc.On("RetrieveByRoute", mock.Anything, tc.retrieveReq.Route, tc.retrieveReq.DomainId).Return(tc.svcRes, tc.svcErr)
res, err := client.RetrieveByRoute(context.Background(), tc.retrieveReq)
assert.True(t, errors.Contains(err, tc.err), fmt.Sprintf("%s: expected %s got %s", tc.desc, tc.err, err))
assert.Equal(t, tc.retrieveRes.Entity, res.Entity)
svcCall.Unset()
})
}
}
+17
View File
@@ -4,6 +4,7 @@
package grpc
import (
apiutil "github.com/absmach/supermq/api/http/util"
"github.com/absmach/supermq/pkg/connections"
"github.com/absmach/supermq/pkg/errors"
"github.com/absmach/supermq/pkg/policies"
@@ -37,3 +38,19 @@ type unsetParentGroupFromChannelsReq struct {
type retrieveEntityReq struct {
Id string
}
type retrieveByRouteReq struct {
route string
domainID string
}
func (req retrieveByRouteReq) validate() error {
if req.route == "" {
return apiutil.ErrMissingRoute
}
if req.domainID == "" {
return apiutil.ErrMissingDomainID
}
return nil
}
+33
View File
@@ -27,6 +27,7 @@ type grpcServer struct {
removeClientConnections kitgrpc.Handler
unsetParentGroupFromChannels kitgrpc.Handler
retrieveEntity kitgrpc.Handler
retrieveByRoute kitgrpc.Handler
}
// NewServer returns new AuthServiceServer instance.
@@ -52,6 +53,11 @@ func NewServer(svc channels.Service) grpcChannelsV1.ChannelsServiceServer {
decodeRetrieveEntityRequest,
encodeRetrieveEntityResponse,
),
retrieveByRoute: kitgrpc.NewServer(
retrieveByRouteEndpoint(svc),
decodeRetrieveByRouteRequest,
encodeRetrieveByRouteResponse,
),
}
}
@@ -154,6 +160,33 @@ func encodeRetrieveEntityResponse(_ context.Context, grpcRes interface{}) (inter
}, nil
}
func decodeRetrieveByRouteRequest(_ context.Context, grpcReq interface{}) (interface{}, error) {
req := grpcReq.(*grpcCommonV1.RetrieveByRouteReq)
return retrieveByRouteReq{
route: req.GetRoute(),
domainID: req.GetDomainId(),
}, nil
}
func encodeRetrieveByRouteResponse(_ context.Context, grpcRes interface{}) (interface{}, error) {
res := grpcRes.(retrieveEntityRes)
return &grpcCommonV1.RetrieveEntityRes{
Entity: &grpcCommonV1.EntityBasic{
Id: res.id,
Status: uint32(res.status),
},
}, nil
}
func (s *grpcServer) RetrieveByRoute(ctx context.Context, req *grpcCommonV1.RetrieveByRouteReq) (*grpcCommonV1.RetrieveEntityRes, error) {
_, res, err := s.retrieveByRoute.ServeGRPC(ctx, req)
if err != nil {
return nil, encodeError(err)
}
return res.(*grpcCommonV1.RetrieveEntityRes), nil
}
func encodeError(err error) error {
switch {
case errors.Contains(err, nil):
+1
View File
@@ -72,6 +72,7 @@ func TestCreateChannelEndpoint(t *testing.T) {
Metadata: map[string]interface{}{
"name": "test",
},
Route: valid,
}
reqWithRoute := reqChannel
reqWithRoute.Route = valid
+71
View File
@@ -185,6 +185,77 @@ func (_c *ChannelsServiceClient_RemoveClientConnections_Call) RunAndReturn(run f
return _c
}
// RetrieveByRoute provides a mock function for the type ChannelsServiceClient
func (_mock *ChannelsServiceClient) RetrieveByRoute(ctx context.Context, in *v10.RetrieveByRouteReq, opts ...grpc.CallOption) (*v10.RetrieveEntityRes, error) {
var tmpRet mock.Arguments
if len(opts) > 0 {
tmpRet = _mock.Called(ctx, in, opts)
} else {
tmpRet = _mock.Called(ctx, in)
}
ret := tmpRet
if len(ret) == 0 {
panic("no return value specified for RetrieveByRoute")
}
var r0 *v10.RetrieveEntityRes
var r1 error
if returnFunc, ok := ret.Get(0).(func(context.Context, *v10.RetrieveByRouteReq, []grpc.CallOption) (*v10.RetrieveEntityRes, error)); ok {
return returnFunc(ctx, in, opts)
}
if returnFunc, ok := ret.Get(0).(func(context.Context, *v10.RetrieveByRouteReq, ...grpc.CallOption) *v10.RetrieveEntityRes); ok {
r0 = returnFunc(ctx, in, opts...)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(*v10.RetrieveEntityRes)
}
}
if returnFunc, ok := ret.Get(1).(func(context.Context, *v10.RetrieveByRouteReq, ...grpc.CallOption) error); ok {
r1 = returnFunc(ctx, in, opts...)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// ChannelsServiceClient_RetrieveByRoute_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'RetrieveByRoute'
type ChannelsServiceClient_RetrieveByRoute_Call struct {
*mock.Call
}
// RetrieveByRoute is a helper method to define mock.On call
// - ctx
// - in
// - opts
func (_e *ChannelsServiceClient_Expecter) RetrieveByRoute(ctx interface{}, in interface{}, opts ...interface{}) *ChannelsServiceClient_RetrieveByRoute_Call {
return &ChannelsServiceClient_RetrieveByRoute_Call{Call: _e.mock.On("RetrieveByRoute",
append([]interface{}{ctx, in}, opts...)...)}
}
func (_c *ChannelsServiceClient_RetrieveByRoute_Call) Run(run func(ctx context.Context, in *v10.RetrieveByRouteReq, opts ...grpc.CallOption)) *ChannelsServiceClient_RetrieveByRoute_Call {
_c.Call.Run(func(args mock.Arguments) {
variadicArgs := make([]grpc.CallOption, len(args)-2)
for i, a := range args[2:] {
if a != nil {
variadicArgs[i] = a.(grpc.CallOption)
}
}
run(args[0].(context.Context), args[1].(*v10.RetrieveByRouteReq), variadicArgs...)
})
return _c
}
func (_c *ChannelsServiceClient_RetrieveByRoute_Call) Return(retrieveEntityRes *v10.RetrieveEntityRes, err error) *ChannelsServiceClient_RetrieveByRoute_Call {
_c.Call.Return(retrieveEntityRes, err)
return _c
}
func (_c *ChannelsServiceClient_RetrieveByRoute_Call) RunAndReturn(run func(ctx context.Context, in *v10.RetrieveByRouteReq, opts ...grpc.CallOption) (*v10.RetrieveEntityRes, error)) *ChannelsServiceClient_RetrieveByRoute_Call {
_c.Call.Return(run)
return _c
}
// RetrieveEntity provides a mock function for the type ChannelsServiceClient
func (_mock *ChannelsServiceClient) RetrieveEntity(ctx context.Context, in *v10.RetrieveEntityReq, opts ...grpc.CallOption) (*v10.RetrieveEntityRes, error) {
var tmpRet mock.Arguments
+56
View File
@@ -188,6 +188,62 @@ func (_c *Service_RetrieveByID_Call) RunAndReturn(run func(ctx context.Context,
return _c
}
// RetrieveByRoute provides a mock function for the type Service
func (_mock *Service) RetrieveByRoute(ctx context.Context, route string, domainID string) (channels.Channel, error) {
ret := _mock.Called(ctx, route, domainID)
if len(ret) == 0 {
panic("no return value specified for RetrieveByRoute")
}
var r0 channels.Channel
var r1 error
if returnFunc, ok := ret.Get(0).(func(context.Context, string, string) (channels.Channel, error)); ok {
return returnFunc(ctx, route, domainID)
}
if returnFunc, ok := ret.Get(0).(func(context.Context, string, string) channels.Channel); ok {
r0 = returnFunc(ctx, route, domainID)
} else {
r0 = ret.Get(0).(channels.Channel)
}
if returnFunc, ok := ret.Get(1).(func(context.Context, string, string) error); ok {
r1 = returnFunc(ctx, route, domainID)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// Service_RetrieveByRoute_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'RetrieveByRoute'
type Service_RetrieveByRoute_Call struct {
*mock.Call
}
// RetrieveByRoute is a helper method to define mock.On call
// - ctx
// - route
// - domainID
func (_e *Service_Expecter) RetrieveByRoute(ctx interface{}, route interface{}, domainID interface{}) *Service_RetrieveByRoute_Call {
return &Service_RetrieveByRoute_Call{Call: _e.mock.On("RetrieveByRoute", ctx, route, domainID)}
}
func (_c *Service_RetrieveByRoute_Call) Run(run func(ctx context.Context, route string, domainID string)) *Service_RetrieveByRoute_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(context.Context), args[1].(string), args[2].(string))
})
return _c
}
func (_c *Service_RetrieveByRoute_Call) Return(channel channels.Channel, err error) *Service_RetrieveByRoute_Call {
_c.Call.Return(channel, err)
return _c
}
func (_c *Service_RetrieveByRoute_Call) RunAndReturn(run func(ctx context.Context, route string, domainID string) (channels.Channel, error)) *Service_RetrieveByRoute_Call {
_c.Call.Return(run)
return _c
}
// UnsetParentGroupFromChannels provides a mock function for the type Service
func (_mock *Service) UnsetParentGroupFromChannels(ctx context.Context, parentGroupID string) error {
ret := _mock.Called(ctx, parentGroupID)
+5
View File
@@ -22,6 +22,7 @@ type Service interface {
UnsetParentGroupFromChannels(ctx context.Context, parentGroupID string) error
RemoveClientConnections(ctx context.Context, clientID string) error
RetrieveByID(ctx context.Context, id string) (channels.Channel, error)
RetrieveByRoute(ctx context.Context, route string, domainID string) (channels.Channel, error)
}
type service struct {
@@ -120,3 +121,7 @@ func (svc service) UnsetParentGroupFromChannels(ctx context.Context, parentGroup
func (svc service) RetrieveByID(ctx context.Context, id string) (channels.Channel, error) {
return svc.repo.RetrieveByID(ctx, id)
}
func (svc service) RetrieveByRoute(ctx context.Context, route string, domainID string) (channels.Channel, error) {
return svc.repo.RetrieveByRoute(ctx, route, domainID)
}
+19 -1
View File
@@ -17,6 +17,7 @@ import (
httpapi "github.com/absmach/supermq/coap/api"
"github.com/absmach/supermq/coap/tracing"
smqlog "github.com/absmach/supermq/logger"
domainsAuthz "github.com/absmach/supermq/pkg/domains/grpcclient"
"github.com/absmach/supermq/pkg/grpcclient"
jaegerclient "github.com/absmach/supermq/pkg/jaeger"
"github.com/absmach/supermq/pkg/messaging/brokers"
@@ -37,6 +38,7 @@ const (
envPrefixHTTP = "SMQ_COAP_ADAPTER_HTTP_"
envPrefixClients = "SMQ_CLIENTS_GRPC_"
envPrefixChannels = "SMQ_CHANNELS_GRPC_"
envPrefixDomains = "SMQ_DOMAINS_GRPC_"
defSvcHTTPPort = "5683"
defSvcCoAPPort = "5683"
)
@@ -90,6 +92,22 @@ func main() {
return
}
domsGrpcCfg := grpcclient.Config{}
if err := env.ParseWithOptions(&domsGrpcCfg, env.Options{Prefix: envPrefixDomains}); err != nil {
logger.Error(fmt.Sprintf("failed to load domains gRPC client configuration : %s", err))
exitCode = 1
return
}
_, domainsClient, domainsHandler, err := domainsAuthz.NewAuthorization(ctx, domsGrpcCfg)
if err != nil {
logger.Error(err.Error())
exitCode = 1
return
}
defer domainsHandler.Close()
logger.Info("Domains service gRPC client successfully connected to domains gRPC server " + domainsHandler.Secure())
clientsClientCfg := grpcclient.Config{}
if err := env.ParseWithOptions(&clientsClientCfg, env.Options{Prefix: envPrefixClients}); err != nil {
logger.Error(fmt.Sprintf("failed to load %s auth configuration : %s", svcName, err))
@@ -163,7 +181,7 @@ func main() {
hs := httpserver.NewServer(ctx, cancel, svcName, httpServerConfig, httpapi.MakeHandler(cfg.InstanceID), logger)
cs := coapserver.NewServer(ctx, cancel, svcName, coapServerConfig, httpapi.MakeCoAPHandler(svc, logger), logger)
cs := coapserver.NewServer(ctx, cancel, svcName, coapServerConfig, httpapi.MakeCoAPHandler(svc, channelsClient, domainsClient, logger), logger)
if cfg.SendTelemetry {
chc := chclient.New(svcName, supermq.Version, logger, cancel)
+22 -3
View File
@@ -21,11 +21,13 @@ import (
"github.com/absmach/supermq"
grpcChannelsV1 "github.com/absmach/supermq/api/grpc/channels/v1"
grpcClientsV1 "github.com/absmach/supermq/api/grpc/clients/v1"
grpcDomainsV1 "github.com/absmach/supermq/api/grpc/domains/v1"
adapter "github.com/absmach/supermq/http"
httpapi "github.com/absmach/supermq/http/api"
smqlog "github.com/absmach/supermq/logger"
smqauthn "github.com/absmach/supermq/pkg/authn"
"github.com/absmach/supermq/pkg/authn/authsvc"
domainsAuthz "github.com/absmach/supermq/pkg/domains/grpcclient"
"github.com/absmach/supermq/pkg/grpcclient"
jaegerclient "github.com/absmach/supermq/pkg/jaeger"
"github.com/absmach/supermq/pkg/messaging"
@@ -48,6 +50,7 @@ const (
envPrefixClients = "SMQ_CLIENTS_GRPC_"
envPrefixChannels = "SMQ_CHANNELS_GRPC_"
envPrefixAuth = "SMQ_AUTH_GRPC_"
envPrefixDomains = "SMQ_DOMAINS_GRPC_"
defSvcHTTPPort = "80"
targetHTTPProtocol = "http"
targetHTTPHost = "localhost"
@@ -97,6 +100,22 @@ func main() {
return
}
domsGrpcCfg := grpcclient.Config{}
if err := env.ParseWithOptions(&domsGrpcCfg, env.Options{Prefix: envPrefixDomains}); err != nil {
logger.Error(fmt.Sprintf("failed to load domains gRPC client configuration : %s", err))
exitCode = 1
return
}
_, domainsClient, domainsHandler, err := domainsAuthz.NewAuthorization(ctx, domsGrpcCfg)
if err != nil {
logger.Error(err.Error())
exitCode = 1
return
}
defer domainsHandler.Close()
logger.Info("Domains service gRPC client successfully connected to domains gRPC server " + domainsHandler.Secure())
clientsClientCfg := grpcclient.Config{}
if err := env.ParseWithOptions(&clientsClientCfg, env.Options{Prefix: envPrefixClients}); err != nil {
logger.Error(fmt.Sprintf("failed to load clients gRPC client configuration : %s", err))
@@ -174,7 +193,7 @@ func main() {
return
}
svc := newService(pub, authn, clientsClient, channelsClient, logger, tracer)
svc := newService(pub, authn, clientsClient, channelsClient, domainsClient, logger, tracer)
targetServerCfg := server.Config{Port: targetHTTPPort}
hs := httpserver.NewServer(ctx, cancel, svcName, targetServerCfg, httpapi.MakeHandler(logger, cfg.InstanceID), logger)
@@ -201,8 +220,8 @@ func main() {
}
}
func newService(pub messaging.Publisher, authn smqauthn.Authentication, clients grpcClientsV1.ClientsServiceClient, channels grpcChannelsV1.ChannelsServiceClient, logger *slog.Logger, tracer trace.Tracer) session.Handler {
svc := adapter.NewHandler(pub, authn, clients, channels, logger)
func newService(pub messaging.Publisher, authn smqauthn.Authentication, clients grpcClientsV1.ClientsServiceClient, channels grpcChannelsV1.ChannelsServiceClient, domains grpcDomainsV1.DomainsServiceClient, logger *slog.Logger, tracer trace.Tracer) session.Handler {
svc := adapter.NewHandler(pub, authn, clients, channels, domains, logger)
svc = handler.NewTracing(tracer, svc)
svc = handler.LoggingMiddleware(svc, logger)
counter, latency := prometheus.MakeMetrics(svcName, "api")
+125 -11
View File
@@ -23,13 +23,19 @@ import (
"github.com/absmach/mgate/pkg/mqtt/websocket"
"github.com/absmach/mgate/pkg/session"
"github.com/absmach/supermq"
grpcChannelsV1 "github.com/absmach/supermq/api/grpc/channels/v1"
grpcCommonV1 "github.com/absmach/supermq/api/grpc/common/v1"
grpcDomainsV1 "github.com/absmach/supermq/api/grpc/domains/v1"
api "github.com/absmach/supermq/api/http"
smqlog "github.com/absmach/supermq/logger"
"github.com/absmach/supermq/mqtt"
"github.com/absmach/supermq/mqtt/events"
mqtttracing "github.com/absmach/supermq/mqtt/tracing"
domainsAuthz "github.com/absmach/supermq/pkg/domains/grpcclient"
"github.com/absmach/supermq/pkg/errors"
"github.com/absmach/supermq/pkg/grpcclient"
jaegerclient "github.com/absmach/supermq/pkg/jaeger"
"github.com/absmach/supermq/pkg/messaging"
"github.com/absmach/supermq/pkg/messaging/brokers"
brokerstracing "github.com/absmach/supermq/pkg/messaging/brokers/tracing"
msgevents "github.com/absmach/supermq/pkg/messaging/events"
@@ -47,9 +53,15 @@ const (
svcName = "mqtt"
envPrefixClients = "SMQ_CLIENTS_GRPC_"
envPrefixChannels = "SMQ_CHANNELS_GRPC_"
envPrefixDomains = "SMQ_DOMAINS_GRPC_"
wsPathPrefix = "/mqtt"
)
var (
errFailedResolveDomain = errors.New("failed to resolve domain route")
errFailedResolveChannel = errors.New("failed to resolve channel route")
)
type config struct {
LogLevel string `env:"SMQ_MQTT_ADAPTER_LOG_LEVEL" envDefault:"info"`
MQTTPort string `env:"SMQ_MQTT_ADAPTER_MQTT_PORT" envDefault:"1883"`
@@ -172,6 +184,20 @@ func main() {
return
}
domsGrpcCfg := grpcclient.Config{}
if err := env.ParseWithOptions(&domsGrpcCfg, env.Options{Prefix: envPrefixDomains}); err != nil {
logger.Error(fmt.Sprintf("failed to load domains gRPC client configuration : %s", err))
exitCode = 1
return
}
_, domainsClient, domainsHandler, err := domainsAuthz.NewAuthorization(ctx, domsGrpcCfg)
if err != nil {
logger.Error(err.Error())
exitCode = 1
return
}
defer domainsHandler.Close()
clientsClientCfg := grpcclient.Config{}
if err := env.ParseWithOptions(&clientsClientCfg, env.Options{Prefix: envPrefixClients}); err != nil {
logger.Error(fmt.Sprintf("failed to load %s auth configuration : %s", svcName, err))
@@ -220,18 +246,23 @@ func main() {
go chc.CallHome(ctx)
}
interceptor := interceptor{
beforeHandler := beforeHandler{
domains: domainsClient,
channels: channelsClient,
}
afterHandler := afterHandler{
username: cfg.MQTTTargetUsername,
password: cfg.MQTTTargetPassword,
}
logger.Info(fmt.Sprintf("Starting MQTT proxy on port %s", cfg.MQTTPort))
g.Go(func() error {
return proxyMQTT(ctx, cfg, logger, h, interceptor)
return proxyMQTT(ctx, cfg, logger, h, beforeHandler, afterHandler)
})
logger.Info(fmt.Sprintf("Starting MQTT over WS proxy on port %s", cfg.HTTPPort))
g.Go(func() error {
return proxyWS(ctx, cfg, logger, h, interceptor)
return proxyWS(ctx, cfg, logger, h, afterHandler)
})
g.Go(func() error {
@@ -243,13 +274,13 @@ func main() {
}
}
func proxyMQTT(ctx context.Context, cfg config, logger *slog.Logger, sessionHandler session.Handler, interceptor session.Interceptor) error {
func proxyMQTT(ctx context.Context, cfg config, logger *slog.Logger, sessionHandler session.Handler, beforeHandler, afterHandler session.Interceptor) error {
config := mgate.Config{
Port: cfg.MQTTPort,
TargetHost: cfg.MQTTTargetHost,
TargetPort: cfg.MQTTTargetPort,
}
mproxy := mgatemqtt.New(config, sessionHandler, nil, interceptor, logger)
mproxy := mgatemqtt.New(config, sessionHandler, beforeHandler, afterHandler, logger)
errCh := make(chan error)
go func() {
@@ -324,21 +355,21 @@ func stopSignalHandler(ctx context.Context, cancel context.CancelFunc, logger *s
}
}
type interceptor struct {
type afterHandler struct {
username string
password string
}
// This interceptor adds the correct credentials to upstream MQTT broker since the downstream clients
// are authenticated to the MQTT adapter but not upstream MQTT broker.
func (ic interceptor) Intercept(ctx context.Context, pkt packets.ControlPacket, dir session.Direction) (packets.ControlPacket, error) {
func (ah afterHandler) Intercept(ctx context.Context, pkt packets.ControlPacket, dir session.Direction) (packets.ControlPacket, error) {
if connectPkt, ok := pkt.(*packets.ConnectPacket); ok {
if ic.username != "" {
connectPkt.Username = ic.username
if ah.username != "" {
connectPkt.Username = ah.username
connectPkt.UsernameFlag = true
}
if ic.password != "" {
connectPkt.Password = []byte(ic.password)
if ah.password != "" {
connectPkt.Password = []byte(ah.password)
connectPkt.PasswordFlag = true
}
@@ -347,3 +378,86 @@ func (ic interceptor) Intercept(ctx context.Context, pkt packets.ControlPacket,
return pkt, nil
}
type beforeHandler struct {
domains grpcDomainsV1.DomainsServiceClient
channels grpcChannelsV1.ChannelsServiceClient
}
// This interceptor is used to replace domain and channel routes with relevant domain and channel IDs in the message topic.
func (bh beforeHandler) Intercept(ctx context.Context, pkt packets.ControlPacket, dir session.Direction) (packets.ControlPacket, error) {
switch pt := pkt.(type) {
case *packets.SubscribePacket:
for i, topic := range pt.Topics {
ft, err := bh.resolveTopic(ctx, topic)
if err != nil {
return nil, err
}
pt.Topics[i] = ft
}
return pt, nil
case *packets.UnsubscribePacket:
for i, topic := range pt.Topics {
ft, err := bh.resolveTopic(ctx, topic)
if err != nil {
return nil, err
}
pt.Topics[i] = ft
}
return pt, nil
case *packets.PublishPacket:
ft, err := bh.resolveTopic(ctx, pt.TopicName)
if err != nil {
return nil, err
}
pt.TopicName = ft
return pt, nil
}
return pkt, nil
}
func (bh beforeHandler) resolveTopic(ctx context.Context, topic string) (string, error) {
matches := messaging.TopicRegExp.FindStringSubmatch(topic)
if len(matches) < 4 {
return "", messaging.ErrMalformedTopic
}
domainID, err := bh.resolveDomain(ctx, matches[1])
if err != nil {
return "", errors.Wrap(errFailedResolveDomain, err)
}
channelID, err := bh.resolveChannel(ctx, matches[2], domainID)
if err != nil {
return "", errors.Wrap(errFailedResolveChannel, err)
}
return fmt.Sprintf("m/%s/c/%s%s", domainID, channelID, matches[3]), nil
}
func (bh beforeHandler) resolveDomain(ctx context.Context, domain string) (string, error) {
if api.ValidateUUID(domain) == nil {
return domain, nil
}
resp, err := bh.domains.RetrieveByRoute(ctx, &grpcCommonV1.RetrieveByRouteReq{Route: domain})
if err != nil {
return "", err
}
return resp.Entity.Id, nil
}
func (bh beforeHandler) resolveChannel(ctx context.Context, channel, domainID string) (string, error) {
if api.ValidateUUID(channel) == nil {
return channel, nil
}
resp, err := bh.channels.RetrieveByRoute(ctx, &grpcCommonV1.RetrieveByRouteReq{
Route: channel,
DomainId: domainID,
})
if err != nil {
return "", err
}
return resp.Entity.Id, nil
}
+23 -4
View File
@@ -19,8 +19,10 @@ import (
"github.com/absmach/supermq"
grpcChannelsV1 "github.com/absmach/supermq/api/grpc/channels/v1"
grpcClientsV1 "github.com/absmach/supermq/api/grpc/clients/v1"
grpcDomainsV1 "github.com/absmach/supermq/api/grpc/domains/v1"
smqlog "github.com/absmach/supermq/logger"
"github.com/absmach/supermq/pkg/authn/authsvc"
domainsAuthz "github.com/absmach/supermq/pkg/domains/grpcclient"
"github.com/absmach/supermq/pkg/grpcclient"
jaegerclient "github.com/absmach/supermq/pkg/jaeger"
"github.com/absmach/supermq/pkg/messaging"
@@ -45,6 +47,7 @@ const (
envPrefixClients = "SMQ_CLIENTS_GRPC_"
envPrefixChannels = "SMQ_CHANNELS_GRPC_"
envPrefixAuth = "SMQ_AUTH_GRPC_"
envPrefixDomains = "SMQ_DOMAINS_GRPC_"
defSvcHTTPPort = "8190"
targetWSProtocol = "http"
targetWSHost = "localhost"
@@ -98,6 +101,22 @@ func main() {
Host: targetWSHost,
}
domsGrpcCfg := grpcclient.Config{}
if err := env.ParseWithOptions(&domsGrpcCfg, env.Options{Prefix: envPrefixDomains}); err != nil {
logger.Error(fmt.Sprintf("failed to load domains gRPC client configuration : %s", err))
exitCode = 1
return
}
_, domainsClient, domainsHandler, err := domainsAuthz.NewAuthorization(ctx, domsGrpcCfg)
if err != nil {
logger.Error(err.Error())
exitCode = 1
return
}
defer domainsHandler.Close()
logger.Info("Domains service gRPC client successfully connected to domains gRPC server " + domainsHandler.Secure())
clientsClientCfg := grpcclient.Config{}
if err := env.ParseWithOptions(&clientsClientCfg, env.Options{Prefix: envPrefixClients}); err != nil {
logger.Error(fmt.Sprintf("failed to load %s auth configuration : %s", svcName, err))
@@ -176,7 +195,7 @@ func main() {
return
}
svc := newService(clientsClient, channelsClient, nps, logger, tracer)
svc := newService(clientsClient, channelsClient, domainsClient, nps, logger, tracer)
hs := httpserver.NewServer(ctx, cancel, svcName, targetServerConfig, httpapi.MakeHandler(ctx, svc, logger, cfg.InstanceID), logger)
@@ -190,7 +209,7 @@ func main() {
})
g.Go(func() error {
handler := ws.NewHandler(nps, logger, authn, clientsClient, channelsClient)
handler := ws.NewHandler(nps, logger, authn, clientsClient, channelsClient, domainsClient)
return proxyWS(ctx, httpServerConfig, targetServerConfig, logger, handler)
})
@@ -203,8 +222,8 @@ func main() {
}
}
func newService(clientsClient grpcClientsV1.ClientsServiceClient, channels grpcChannelsV1.ChannelsServiceClient, nps messaging.PubSub, logger *slog.Logger, tracer trace.Tracer) ws.Service {
svc := ws.New(clientsClient, channels, nps)
func newService(clientsClient grpcClientsV1.ClientsServiceClient, channels grpcChannelsV1.ChannelsServiceClient, domains grpcDomainsV1.DomainsServiceClient, nps messaging.PubSub, logger *slog.Logger, tracer trace.Tracer) ws.Service {
svc := ws.New(clientsClient, channels, domains, nps)
svc = tracing.New(tracer, svc)
svc = httpapi.LoggingMiddleware(svc, logger)
counter, latency := prometheus.MakeMetrics("ws_adapter", "api")
+57 -8
View File
@@ -13,6 +13,10 @@ import (
"time"
"github.com/absmach/supermq"
grpcChannelsV1 "github.com/absmach/supermq/api/grpc/channels/v1"
grpcCommonV1 "github.com/absmach/supermq/api/grpc/common/v1"
grpcDomainsV1 "github.com/absmach/supermq/api/grpc/domains/v1"
api "github.com/absmach/supermq/api/http"
"github.com/absmach/supermq/coap"
"github.com/absmach/supermq/pkg/errors"
svcerr "github.com/absmach/supermq/pkg/errors/service"
@@ -32,13 +36,17 @@ const (
)
var (
errBadOptions = errors.New("bad options")
errMethodNotAllowed = errors.New("method not allowed")
errBadOptions = errors.New("bad options")
errMethodNotAllowed = errors.New("method not allowed")
errFailedResolveDomain = errors.New("failed to resolve domain route")
errFailedResolveChannel = errors.New("failed to resolve channel route")
)
var (
logger *slog.Logger
service coap.Service
logger *slog.Logger
service coap.Service
channels grpcChannelsV1.ChannelsServiceClient
domains grpcDomainsV1.DomainsServiceClient
)
// MakeHandler returns a HTTP handler for API endpoints.
@@ -51,9 +59,11 @@ func MakeHandler(instanceID string) http.Handler {
}
// MakeCoAPHandler creates handler for CoAP messages.
func MakeCoAPHandler(svc coap.Service, l *slog.Logger) mux.HandlerFunc {
func MakeCoAPHandler(svc coap.Service, channelsClient grpcChannelsV1.ChannelsServiceClient, domainsClient grpcDomainsV1.DomainsServiceClient, l *slog.Logger) mux.HandlerFunc {
logger = l
service = svc
channels = channelsClient
domains = domainsClient
return handler
}
@@ -138,17 +148,27 @@ func decodeMessage(msg *mux.Message) (*messaging.Message, error) {
return &messaging.Message{}, err
}
var domainID, channelID, subTopic string
var domain, channel, subTopic string
switch msg.Code() {
case codes.GET:
domainID, channelID, subTopic, err = messaging.ParseSubscribeTopic(path)
domain, channel, subTopic, err = messaging.ParseSubscribeTopic(path)
case codes.POST:
domainID, channelID, subTopic, err = messaging.ParsePublishTopic(path)
domain, channel, subTopic, err = messaging.ParsePublishTopic(path)
}
if err != nil {
return &messaging.Message{}, err
}
domainID, err := resolveDomain(msg.Context(), domain)
if err != nil {
return &messaging.Message{}, errors.Wrap(errFailedResolveDomain, err)
}
channelID, err := resolveChannel(msg.Context(), channel, domainID)
if err != nil {
return &messaging.Message{}, errors.Wrap(errFailedResolveChannel, err)
}
ret := &messaging.Message{
Protocol: protocol,
Domain: domainID,
@@ -179,3 +199,32 @@ func parseKey(msg *mux.Message) (string, error) {
}
return vars[1], nil
}
func resolveDomain(ctx context.Context, domain string) (string, error) {
if api.ValidateUUID(domain) == nil {
return domain, nil
}
d, err := domains.RetrieveByRoute(ctx, &grpcCommonV1.RetrieveByRouteReq{
Route: domain,
})
if err != nil {
return "", err
}
return d.Entity.Id, nil
}
func resolveChannel(ctx context.Context, channel, domainID string) (string, error) {
if api.ValidateUUID(channel) == nil {
return channel, nil
}
c, err := channels.RetrieveByRoute(ctx, &grpcCommonV1.RetrieveByRouteReq{
Route: channel,
DomainId: domainID,
})
if err != nil {
return "", err
}
return c.Entity.Id, nil
}
+85 -2
View File
@@ -23,7 +23,6 @@ volumes:
supermq-domains-redis-volume:
supermq-auth-redis-volume:
services:
spicedb:
image: "authzed/spicedb:v1.37.0"
@@ -69,7 +68,7 @@ services:
POSTGRES_DB: ${SMQ_SPICEDB_DB_NAME}
volumes:
- supermq-spicedb-db-volume:/var/lib/postgresql/data
command: [ "postgres", "-c", "track_commit_timestamp=on" ]
command: ["postgres", "-c", "track_commit_timestamp=on"]
auth-db:
image: postgres:16.2-alpine
@@ -975,6 +974,11 @@ services:
SMQ_CHANNELS_GRPC_CLIENT_CERT: ${SMQ_CHANNELS_GRPC_CLIENT_CERT:+/channels-grpc-client.crt}
SMQ_CHANNELS_GRPC_CLIENT_KEY: ${SMQ_CHANNELS_GRPC_CLIENT_KEY:+/channels-grpc-client.key}
SMQ_CHANNELS_GRPC_SERVER_CA_CERTS: ${SMQ_CHANNELS_GRPC_SERVER_CA_CERTS:+/channels-grpc-server-ca.crt}
SMQ_DOMAINS_GRPC_URL: ${SMQ_DOMAINS_GRPC_URL}
SMQ_DOMAINS_GRPC_TIMEOUT: ${SMQ_DOMAINS_GRPC_TIMEOUT}
SMQ_DOMAINS_GRPC_CLIENT_CERT: ${SMQ_DOMAINS_GRPC_CLIENT_CERT:+/domains-grpc-client.crt}
SMQ_DOMAINS_GRPC_CLIENT_KEY: ${SMQ_DOMAINS_GRPC_CLIENT_KEY:+/domains-grpc-client.key}
SMQ_DOMAINS_GRPC_SERVER_CA_CERTS: ${SMQ_DOMAINS_GRPC_SERVER_CA_CERTS:+/domains-grpc-server-ca.crt}
SMQ_JAEGER_URL: ${SMQ_JAEGER_URL}
SMQ_MESSAGE_BROKER_URL: ${SMQ_MESSAGE_BROKER_URL}
SMQ_JAEGER_TRACE_RATIO: ${SMQ_JAEGER_TRACE_RATIO}
@@ -1014,6 +1018,22 @@ services:
target: /channels-grpc-server-ca${SMQ_CHANNELS_GRPC_SERVER_CA_CERTS:+.crt}
bind:
create_host_path: true
# Domains gRPC mTLS client certificates
- type: bind
source: ${SMQ_DOMAINS_GRPC_CLIENT_CERT:-ssl/certs/dummy/client_cert}
target: /domains-grpc-server${SMQ_DOMAINS_GRPC_CLIENT_CERT:+.crt}
bind:
create_host_path: true
- type: bind
source: ${SMQ_DOMAINS_GRPC_CLIENT_KEY:-ssl/certs/dummy/client_key}
target: /domains-grpc-server${SMQ_DOMAINS_GRPC_CLIENT_KEY:+.key}
bind:
create_host_path: true
- type: bind
source: ${SMQ_DOMAINS_GRPC_SERVER_CA_CERTS:-ssl/certs/dummy/server_ca}
target: /domains-grpc-server-ca${SMQ_DOMAINS_GRPC_SERVER_CA_CERTS:+.crt}
bind:
create_host_path: true
http-adapter:
image: supermq/http:${SMQ_RELEASE_TAG}
@@ -1038,6 +1058,11 @@ services:
SMQ_CHANNELS_GRPC_CLIENT_CERT: ${SMQ_CHANNELS_GRPC_CLIENT_CERT:+/channels-grpc-client.crt}
SMQ_CHANNELS_GRPC_CLIENT_KEY: ${SMQ_CHANNELS_GRPC_CLIENT_KEY:+/channels-grpc-client.key}
SMQ_CHANNELS_GRPC_SERVER_CA_CERTS: ${SMQ_CHANNELS_GRPC_SERVER_CA_CERTS:+/channels-grpc-server-ca.crt}
SMQ_DOMAINS_GRPC_URL: ${SMQ_DOMAINS_GRPC_URL}
SMQ_DOMAINS_GRPC_TIMEOUT: ${SMQ_DOMAINS_GRPC_TIMEOUT}
SMQ_DOMAINS_GRPC_CLIENT_CERT: ${SMQ_DOMAINS_GRPC_CLIENT_CERT:+/domains-grpc-client.crt}
SMQ_DOMAINS_GRPC_CLIENT_KEY: ${SMQ_DOMAINS_GRPC_CLIENT_KEY:+/domains-grpc-client.key}
SMQ_DOMAINS_GRPC_SERVER_CA_CERTS: ${SMQ_DOMAINS_GRPC_SERVER_CA_CERTS:+/domains-grpc-server-ca.crt}
SMQ_AUTH_GRPC_URL: ${SMQ_AUTH_GRPC_URL}
SMQ_AUTH_GRPC_TIMEOUT: ${SMQ_AUTH_GRPC_TIMEOUT}
SMQ_AUTH_GRPC_CLIENT_CERT: ${SMQ_AUTH_GRPC_CLIENT_CERT:+/auth-grpc-client.crt}
@@ -1102,6 +1127,22 @@ services:
target: /auth-grpc-server-ca${SMQ_AUTH_GRPC_SERVER_CA_CERTS:+.crt}
bind:
create_host_path: true
# Domains gRPC mTLS client certificates
- type: bind
source: ${SMQ_DOMAINS_GRPC_CLIENT_CERT:-ssl/certs/dummy/client_cert}
target: /domains-grpc-server${SMQ_DOMAINS_GRPC_CLIENT_CERT:+.crt}
bind:
create_host_path: true
- type: bind
source: ${SMQ_DOMAINS_GRPC_CLIENT_KEY:-ssl/certs/dummy/client_key}
target: /domains-grpc-server${SMQ_DOMAINS_GRPC_CLIENT_KEY:+.key}
bind:
create_host_path: true
- type: bind
source: ${SMQ_DOMAINS_GRPC_SERVER_CA_CERTS:-ssl/certs/dummy/server_ca}
target: /domains-grpc-server-ca${SMQ_DOMAINS_GRPC_SERVER_CA_CERTS:+.crt}
bind:
create_host_path: true
coap-adapter:
image: supermq/coap:${SMQ_RELEASE_TAG}
@@ -1130,6 +1171,11 @@ services:
SMQ_CHANNELS_GRPC_CLIENT_CERT: ${SMQ_CHANNELS_GRPC_CLIENT_CERT:+/channels-grpc-client.crt}
SMQ_CHANNELS_GRPC_CLIENT_KEY: ${SMQ_CHANNELS_GRPC_CLIENT_KEY:+/channels-grpc-client.key}
SMQ_CHANNELS_GRPC_SERVER_CA_CERTS: ${SMQ_CHANNELS_GRPC_SERVER_CA_CERTS:+/channels-grpc-server-ca.crt}
SMQ_DOMAINS_GRPC_URL: ${SMQ_DOMAINS_GRPC_URL}
SMQ_DOMAINS_GRPC_TIMEOUT: ${SMQ_DOMAINS_GRPC_TIMEOUT}
SMQ_DOMAINS_GRPC_CLIENT_CERT: ${SMQ_DOMAINS_GRPC_CLIENT_CERT:+/domains-grpc-client.crt}
SMQ_DOMAINS_GRPC_CLIENT_KEY: ${SMQ_DOMAINS_GRPC_CLIENT_KEY:+/domains-grpc-client.key}
SMQ_DOMAINS_GRPC_SERVER_CA_CERTS: ${SMQ_DOMAINS_GRPC_SERVER_CA_CERTS:+/domains-grpc-server-ca.crt}
SMQ_MESSAGE_BROKER_URL: ${SMQ_MESSAGE_BROKER_URL}
SMQ_JAEGER_URL: ${SMQ_JAEGER_URL}
SMQ_JAEGER_TRACE_RATIO: ${SMQ_JAEGER_TRACE_RATIO}
@@ -1179,6 +1225,22 @@ services:
target: /channels-grpc-client-ca${SMQ_CHANNELS_GRPC_CLIENT_CA_CERTS:+.crt}
bind:
create_host_path: true
# Domains gRPC mTLS client certificates
- type: bind
source: ${SMQ_DOMAINS_GRPC_CLIENT_CERT:-ssl/certs/dummy/client_cert}
target: /domains-grpc-server${SMQ_DOMAINS_GRPC_CLIENT_CERT:+.crt}
bind:
create_host_path: true
- type: bind
source: ${SMQ_DOMAINS_GRPC_CLIENT_KEY:-ssl/certs/dummy/client_key}
target: /domains-grpc-server${SMQ_DOMAINS_GRPC_CLIENT_KEY:+.key}
bind:
create_host_path: true
- type: bind
source: ${SMQ_DOMAINS_GRPC_SERVER_CA_CERTS:-ssl/certs/dummy/server_ca}
target: /domains-grpc-server-ca${SMQ_DOMAINS_GRPC_SERVER_CA_CERTS:+.crt}
bind:
create_host_path: true
ws-adapter:
image: supermq/ws:${SMQ_RELEASE_TAG}
@@ -1203,6 +1265,11 @@ services:
SMQ_CHANNELS_GRPC_CLIENT_CERT: ${SMQ_CHANNELS_GRPC_CLIENT_CERT:+/channels-grpc-client.crt}
SMQ_CHANNELS_GRPC_CLIENT_KEY: ${SMQ_CHANNELS_GRPC_CLIENT_KEY:+/channels-grpc-client.key}
SMQ_CHANNELS_GRPC_SERVER_CA_CERTS: ${SMQ_CHANNELS_GRPC_SERVER_CA_CERTS:+/channels-grpc-server-ca.crt}
SMQ_DOMAINS_GRPC_URL: ${SMQ_DOMAINS_GRPC_URL}
SMQ_DOMAINS_GRPC_TIMEOUT: ${SMQ_DOMAINS_GRPC_TIMEOUT}
SMQ_DOMAINS_GRPC_CLIENT_CERT: ${SMQ_DOMAINS_GRPC_CLIENT_CERT:+/domains-grpc-client.crt}
SMQ_DOMAINS_GRPC_CLIENT_KEY: ${SMQ_DOMAINS_GRPC_CLIENT_KEY:+/domains-grpc-client.key}
SMQ_DOMAINS_GRPC_SERVER_CA_CERTS: ${SMQ_DOMAINS_GRPC_SERVER_CA_CERTS:+/domains-grpc-server-ca.crt}
SMQ_AUTH_GRPC_URL: ${SMQ_AUTH_GRPC_URL}
SMQ_AUTH_GRPC_TIMEOUT: ${SMQ_AUTH_GRPC_TIMEOUT}
SMQ_AUTH_GRPC_CLIENT_CERT: ${SMQ_AUTH_GRPC_CLIENT_CERT:+/auth-grpc-client.crt}
@@ -1267,6 +1334,22 @@ services:
target: /auth-grpc-server-ca${SMQ_AUTH_GRPC_SERVER_CA_CERTS:+.crt}
bind:
create_host_path: true
# Domains gRPC mTLS client certificates
- type: bind
source: ${SMQ_DOMAINS_GRPC_CLIENT_CERT:-ssl/certs/dummy/client_cert}
target: /domains-grpc-server${SMQ_DOMAINS_GRPC_CLIENT_CERT:+.crt}
bind:
create_host_path: true
- type: bind
source: ${SMQ_DOMAINS_GRPC_CLIENT_KEY:-ssl/certs/dummy/client_key}
target: /domains-grpc-server${SMQ_DOMAINS_GRPC_CLIENT_KEY:+.key}
bind:
create_host_path: true
- type: bind
source: ${SMQ_DOMAINS_GRPC_SERVER_CA_CERTS:-ssl/certs/dummy/server_ca}
target: /domains-grpc-server-ca${SMQ_DOMAINS_GRPC_SERVER_CA_CERTS:+.crt}
bind:
create_host_path: true
rabbitmq:
image: rabbitmq:4.0.5-management-alpine
+41
View File
@@ -22,6 +22,7 @@ var _ grpcDomainsV1.DomainsServiceClient = (*domainsGrpcClient)(nil)
type domainsGrpcClient struct {
deleteUserFromDomains endpoint.Endpoint
retrieveEntity endpoint.Endpoint
retrieveByRoute endpoint.Endpoint
timeout time.Duration
}
@@ -44,6 +45,14 @@ func NewDomainsClient(conn *grpc.ClientConn, timeout time.Duration) grpcDomainsV
decodeRetrieveEntityResponse,
grpcCommonV1.RetrieveEntityRes{},
).Endpoint(),
retrieveByRoute: kitgrpc.NewClient(
conn,
domainsSvcName,
"RetrieveByRoute",
encodeRetrieveByRouteRequest,
decodeRetrieveByRouteResponse,
grpcCommonV1.RetrieveEntityRes{},
).Endpoint(),
timeout: timeout,
}
}
@@ -106,3 +115,35 @@ func encodeRetrieveEntityRequest(_ context.Context, grpcReq interface{}) (interf
Id: req.ID,
}, nil
}
func (client domainsGrpcClient) RetrieveByRoute(ctx context.Context, in *grpcCommonV1.RetrieveByRouteReq, opts ...grpc.CallOption) (*grpcCommonV1.RetrieveEntityRes, error) {
ctx, cancel := context.WithTimeout(ctx, client.timeout)
defer cancel()
res, err := client.retrieveByRoute(ctx, retrieveByRouteReq{
Route: in.GetRoute(),
})
if err != nil {
return &grpcCommonV1.RetrieveEntityRes{}, grpcapi.DecodeError(err)
}
rbr := res.(retrieveEntityRes)
return &grpcCommonV1.RetrieveEntityRes{
Entity: &grpcCommonV1.EntityBasic{
Id: rbr.id,
Status: uint32(rbr.status),
},
}, nil
}
func decodeRetrieveByRouteResponse(_ context.Context, grpcRes interface{}) (interface{}, error) {
res := grpcRes.(*grpcCommonV1.RetrieveEntityRes)
return retrieveEntityRes{id: res.Entity.GetId(), status: uint8(res.Entity.GetStatus())}, nil
}
func encodeRetrieveByRouteRequest(_ context.Context, grpcReq interface{}) (interface{}, error) {
req := grpcReq.(retrieveByRouteReq)
return &grpcCommonV1.RetrieveByRouteReq{
Route: req.Route,
}, nil
}
+19
View File
@@ -43,3 +43,22 @@ func retrieveEntityEndpoint(svc domains.Service) endpoint.Endpoint {
}, nil
}
}
func retrieveByRouteEndpoint(svc domains.Service) endpoint.Endpoint {
return func(ctx context.Context, request interface{}) (interface{}, error) {
req := request.(retrieveByRouteReq)
if err := req.validate(); err != nil {
return retrieveEntityRes{}, err
}
dom, err := svc.RetrieveByRoute(ctx, req.Route)
if err != nil {
return retrieveEntityRes{}, err
}
return retrieveEntityRes{
id: dom.ID,
status: uint8(dom.Status),
}, nil
}
}
+130 -2
View File
@@ -10,11 +10,14 @@ import (
"testing"
"time"
grpcCommonV1 "github.com/absmach/supermq/api/grpc/common/v1"
grpcDomainsV1 "github.com/absmach/supermq/api/grpc/domains/v1"
apiutil "github.com/absmach/supermq/api/http/util"
"github.com/absmach/supermq/domains"
grpcapi "github.com/absmach/supermq/domains/api/grpc"
domains "github.com/absmach/supermq/domains/private"
pDomains "github.com/absmach/supermq/domains/private"
"github.com/absmach/supermq/pkg/errors"
svcerr "github.com/absmach/supermq/pkg/errors/service"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"google.golang.org/grpc"
@@ -43,7 +46,7 @@ const (
var authAddr = fmt.Sprintf("localhost:%d", port)
func startGRPCServer(svc domains.Service, port int) *grpc.Server {
func startGRPCServer(svc pDomains.Service, port int) *grpc.Server {
listener, _ := net.Listen("tcp", fmt.Sprintf(":%d", port))
server := grpc.NewServer()
grpcDomainsV1.RegisterDomainsServiceServer(server, grpcapi.NewDomainsServer(svc))
@@ -101,3 +104,128 @@ func TestDeleteUserFromDomains(t *testing.T) {
repoCall.Unset()
}
}
func TestRetrieveEntity(t *testing.T) {
conn, err := grpc.NewClient(authAddr, grpc.WithTransportCredentials(insecure.NewCredentials()))
assert.Nil(t, err, fmt.Sprintf("Unexpected error creating client connection %s", err))
grpcClient := grpcapi.NewDomainsClient(conn, time.Second)
dom := domains.Domain{
ID: id,
Status: domains.EnabledStatus,
}
cases := []struct {
desc string
token string
retrieveReq *grpcCommonV1.RetrieveEntityReq
svcRes domains.Domain
svcErr error
retrieveRes *grpcCommonV1.RetrieveEntityRes
err error
}{
{
desc: "retrieve entity with valid req",
token: validToken,
retrieveReq: &grpcCommonV1.RetrieveEntityReq{
Id: id,
},
svcRes: dom,
retrieveRes: &grpcCommonV1.RetrieveEntityRes{
Entity: &grpcCommonV1.EntityBasic{
Id: id,
Status: uint32(domains.EnabledStatus),
},
},
err: nil,
},
{
desc: "retrieve entity with empty id",
retrieveReq: &grpcCommonV1.RetrieveEntityReq{
Id: "",
},
svcRes: domains.Domain{},
retrieveRes: &grpcCommonV1.RetrieveEntityRes{},
err: apiutil.ErrMissingID,
},
{
desc: "retrieve entity with invalid id",
retrieveReq: &grpcCommonV1.RetrieveEntityReq{
Id: "invalid",
},
svcRes: domains.Domain{},
svcErr: svcerr.ErrNotFound,
retrieveRes: &grpcCommonV1.RetrieveEntityRes{},
err: svcerr.ErrNotFound,
},
}
for _, tc := range cases {
svcCall := svc.On("RetrieveEntity", mock.Anything, tc.retrieveReq.Id).Return(tc.svcRes, tc.svcErr)
dpr, err := grpcClient.RetrieveEntity(context.Background(), tc.retrieveReq)
assert.Equal(t, tc.retrieveRes.Entity, dpr.Entity, fmt.Sprintf("%s: expected %v got %v", tc.desc, tc.retrieveRes.Entity, dpr.Entity))
assert.True(t, errors.Contains(err, tc.err), fmt.Sprintf("%s: expected %s got %s\n", tc.desc, tc.err, err))
svcCall.Unset()
}
}
func TestRetrieveByRoute(t *testing.T) {
conn, err := grpc.NewClient(authAddr, grpc.WithTransportCredentials(insecure.NewCredentials()))
assert.Nil(t, err, fmt.Sprintf("Unexpected error creating client connection %s", err))
grpcClient := grpcapi.NewDomainsClient(conn, time.Second)
validRoute := "validRoute"
dom := domains.Domain{
ID: id,
Route: validRoute,
Status: domains.EnabledStatus,
}
cases := []struct {
desc string
retrieveReq *grpcCommonV1.RetrieveByRouteReq
svcRes domains.Domain
svcErr error
retrieveRes *grpcCommonV1.RetrieveEntityRes
err error
}{
{
desc: "retrieve entity with valid req",
retrieveReq: &grpcCommonV1.RetrieveByRouteReq{
Route: validRoute,
},
svcRes: dom,
retrieveRes: &grpcCommonV1.RetrieveEntityRes{
Entity: &grpcCommonV1.EntityBasic{
Id: id,
Status: uint32(domains.EnabledStatus),
},
},
err: nil,
},
{
desc: "retrieve entity with empty route",
retrieveReq: &grpcCommonV1.RetrieveByRouteReq{
Route: "",
},
svcRes: domains.Domain{},
retrieveRes: &grpcCommonV1.RetrieveEntityRes{},
err: apiutil.ErrMissingRoute,
},
{
desc: "retrieve entity with invalid route",
retrieveReq: &grpcCommonV1.RetrieveByRouteReq{
Route: "invalid",
},
svcRes: domains.Domain{},
svcErr: svcerr.ErrNotFound,
retrieveRes: &grpcCommonV1.RetrieveEntityRes{},
err: svcerr.ErrNotFound,
},
}
for _, tc := range cases {
svcCall := svc.On("RetrieveByRoute", mock.Anything, tc.retrieveReq.Route).Return(tc.svcRes, tc.svcErr)
dpr, err := grpcClient.RetrieveByRoute(context.Background(), tc.retrieveReq)
assert.Equal(t, tc.retrieveRes.Entity, dpr.Entity, fmt.Sprintf("%s: expected %v got %v", tc.desc, tc.retrieveRes.Entity, dpr.Entity))
assert.True(t, errors.Contains(err, tc.err), fmt.Sprintf("%s: expected %s got %s\n", tc.desc, tc.err, err))
svcCall.Unset()
}
}
+12
View File
@@ -30,3 +30,15 @@ func (req retrieveEntityReq) validate() error {
return nil
}
type retrieveByRouteReq struct {
Route string
}
func (req retrieveByRouteReq) validate() error {
if req.Route == "" {
return apiutil.ErrMissingRoute
}
return nil
}
+34
View File
@@ -19,6 +19,7 @@ type domainsGrpcServer struct {
grpcDomainsV1.UnimplementedDomainsServiceServer
deleteUserFromDomains kitgrpc.Handler
retrieveEntity kitgrpc.Handler
retrieveByRoute kitgrpc.Handler
}
func NewDomainsServer(svc domains.Service) grpcDomainsV1.DomainsServiceServer {
@@ -33,6 +34,11 @@ func NewDomainsServer(svc domains.Service) grpcDomainsV1.DomainsServiceServer {
decodeRetrieveEntityRequest,
encodeRetrieveEntityResponse,
),
retrieveByRoute: kitgrpc.NewServer(
retrieveByRouteEndpoint(svc),
decodeRetrieveByRouteRequest,
encodeRetrieveByRouteResponse,
),
}
}
@@ -83,3 +89,31 @@ func (s *domainsGrpcServer) RetrieveEntity(ctx context.Context, req *grpcCommonV
return res.(*grpcCommonV1.RetrieveEntityRes), nil
}
func decodeRetrieveByRouteRequest(_ context.Context, grpcReq interface{}) (interface{}, error) {
req := grpcReq.(*grpcCommonV1.RetrieveByRouteReq)
return retrieveByRouteReq{
Route: req.GetRoute(),
}, nil
}
func encodeRetrieveByRouteResponse(_ context.Context, grpcRes interface{}) (interface{}, error) {
res := grpcRes.(retrieveEntityRes)
return &grpcCommonV1.RetrieveEntityRes{
Entity: &grpcCommonV1.EntityBasic{
Id: res.id,
Status: uint32(res.status),
},
}, nil
}
func (s *domainsGrpcServer) RetrieveByRoute(ctx context.Context, req *grpcCommonV1.RetrieveByRouteReq) (*grpcCommonV1.RetrieveEntityRes, error) {
_, res, err := s.retrieveByRoute.ServeGRPC(ctx, req)
if err != nil {
return nil, grpcapi.EncodeError(err)
}
return res.(*grpcCommonV1.RetrieveEntityRes), nil
}
+71
View File
@@ -114,6 +114,77 @@ func (_c *DomainsServiceClient_DeleteUserFromDomains_Call) RunAndReturn(run func
return _c
}
// RetrieveByRoute provides a mock function for the type DomainsServiceClient
func (_mock *DomainsServiceClient) RetrieveByRoute(ctx context.Context, in *v10.RetrieveByRouteReq, opts ...grpc.CallOption) (*v10.RetrieveEntityRes, error) {
var tmpRet mock.Arguments
if len(opts) > 0 {
tmpRet = _mock.Called(ctx, in, opts)
} else {
tmpRet = _mock.Called(ctx, in)
}
ret := tmpRet
if len(ret) == 0 {
panic("no return value specified for RetrieveByRoute")
}
var r0 *v10.RetrieveEntityRes
var r1 error
if returnFunc, ok := ret.Get(0).(func(context.Context, *v10.RetrieveByRouteReq, []grpc.CallOption) (*v10.RetrieveEntityRes, error)); ok {
return returnFunc(ctx, in, opts)
}
if returnFunc, ok := ret.Get(0).(func(context.Context, *v10.RetrieveByRouteReq, ...grpc.CallOption) *v10.RetrieveEntityRes); ok {
r0 = returnFunc(ctx, in, opts...)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(*v10.RetrieveEntityRes)
}
}
if returnFunc, ok := ret.Get(1).(func(context.Context, *v10.RetrieveByRouteReq, ...grpc.CallOption) error); ok {
r1 = returnFunc(ctx, in, opts...)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// DomainsServiceClient_RetrieveByRoute_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'RetrieveByRoute'
type DomainsServiceClient_RetrieveByRoute_Call struct {
*mock.Call
}
// RetrieveByRoute is a helper method to define mock.On call
// - ctx
// - in
// - opts
func (_e *DomainsServiceClient_Expecter) RetrieveByRoute(ctx interface{}, in interface{}, opts ...interface{}) *DomainsServiceClient_RetrieveByRoute_Call {
return &DomainsServiceClient_RetrieveByRoute_Call{Call: _e.mock.On("RetrieveByRoute",
append([]interface{}{ctx, in}, opts...)...)}
}
func (_c *DomainsServiceClient_RetrieveByRoute_Call) Run(run func(ctx context.Context, in *v10.RetrieveByRouteReq, opts ...grpc.CallOption)) *DomainsServiceClient_RetrieveByRoute_Call {
_c.Call.Run(func(args mock.Arguments) {
variadicArgs := make([]grpc.CallOption, len(args)-2)
for i, a := range args[2:] {
if a != nil {
variadicArgs[i] = a.(grpc.CallOption)
}
}
run(args[0].(context.Context), args[1].(*v10.RetrieveByRouteReq), variadicArgs...)
})
return _c
}
func (_c *DomainsServiceClient_RetrieveByRoute_Call) Return(retrieveEntityRes *v10.RetrieveEntityRes, err error) *DomainsServiceClient_RetrieveByRoute_Call {
_c.Call.Return(retrieveEntityRes, err)
return _c
}
func (_c *DomainsServiceClient_RetrieveByRoute_Call) RunAndReturn(run func(ctx context.Context, in *v10.RetrieveByRouteReq, opts ...grpc.CallOption) (*v10.RetrieveEntityRes, error)) *DomainsServiceClient_RetrieveByRoute_Call {
_c.Call.Return(run)
return _c
}
// RetrieveEntity provides a mock function for the type DomainsServiceClient
func (_mock *DomainsServiceClient) RetrieveEntity(ctx context.Context, in *v10.RetrieveEntityReq, opts ...grpc.CallOption) (*v10.RetrieveEntityRes, error) {
var tmpRet mock.Arguments
+55
View File
@@ -87,6 +87,61 @@ func (_c *Service_DeleteUserFromDomains_Call) RunAndReturn(run func(ctx context.
return _c
}
// RetrieveByRoute provides a mock function for the type Service
func (_mock *Service) RetrieveByRoute(ctx context.Context, route string) (domains.Domain, error) {
ret := _mock.Called(ctx, route)
if len(ret) == 0 {
panic("no return value specified for RetrieveByRoute")
}
var r0 domains.Domain
var r1 error
if returnFunc, ok := ret.Get(0).(func(context.Context, string) (domains.Domain, error)); ok {
return returnFunc(ctx, route)
}
if returnFunc, ok := ret.Get(0).(func(context.Context, string) domains.Domain); ok {
r0 = returnFunc(ctx, route)
} else {
r0 = ret.Get(0).(domains.Domain)
}
if returnFunc, ok := ret.Get(1).(func(context.Context, string) error); ok {
r1 = returnFunc(ctx, route)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// Service_RetrieveByRoute_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'RetrieveByRoute'
type Service_RetrieveByRoute_Call struct {
*mock.Call
}
// RetrieveByRoute is a helper method to define mock.On call
// - ctx
// - route
func (_e *Service_Expecter) RetrieveByRoute(ctx interface{}, route interface{}) *Service_RetrieveByRoute_Call {
return &Service_RetrieveByRoute_Call{Call: _e.mock.On("RetrieveByRoute", ctx, route)}
}
func (_c *Service_RetrieveByRoute_Call) Run(run func(ctx context.Context, route string)) *Service_RetrieveByRoute_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(context.Context), args[1].(string))
})
return _c
}
func (_c *Service_RetrieveByRoute_Call) Return(domain domains.Domain, err error) *Service_RetrieveByRoute_Call {
_c.Call.Return(domain, err)
return _c
}
func (_c *Service_RetrieveByRoute_Call) RunAndReturn(run func(ctx context.Context, route string) (domains.Domain, error)) *Service_RetrieveByRoute_Call {
_c.Call.Return(run)
return _c
}
// RetrieveEntity provides a mock function for the type Service
func (_mock *Service) RetrieveEntity(ctx context.Context, id string) (domains.Domain, error) {
ret := _mock.Called(ctx, id)
+10
View File
@@ -16,6 +16,7 @@ const defLimit = 100
type Service interface {
RetrieveEntity(ctx context.Context, id string) (domains.Domain, error)
DeleteUserFromDomains(ctx context.Context, id string) error
RetrieveByRoute(ctx context.Context, route string) (domains.Domain, error)
}
var _ Service = (*service)(nil)
@@ -68,3 +69,12 @@ func (svc service) DeleteUserFromDomains(ctx context.Context, id string) (err er
return nil
}
func (svc service) RetrieveByRoute(ctx context.Context, route string) (domains.Domain, error) {
dom, err := svc.repo.RetrieveDomainByRoute(ctx, route)
if err != nil {
return domains.Domain{}, errors.Wrap(svcerr.ErrViewEntity, err)
}
return domains.Domain{ID: dom.ID, Status: dom.Status}, nil
}
+9 -3
View File
@@ -18,9 +18,12 @@ import (
"github.com/absmach/mgate/pkg/session"
grpcChannelsV1 "github.com/absmach/supermq/api/grpc/channels/v1"
grpcClientsV1 "github.com/absmach/supermq/api/grpc/clients/v1"
grpcCommonV1 "github.com/absmach/supermq/api/grpc/common/v1"
grpcDomainsV1 "github.com/absmach/supermq/api/grpc/domains/v1"
apiutil "github.com/absmach/supermq/api/http/util"
chmocks "github.com/absmach/supermq/channels/mocks"
climocks "github.com/absmach/supermq/clients/mocks"
dmocks "github.com/absmach/supermq/domains/mocks"
server "github.com/absmach/supermq/http"
"github.com/absmach/supermq/http/api"
"github.com/absmach/supermq/internal/testsutil"
@@ -46,9 +49,9 @@ var (
domainID = testsutil.GenerateUUID(&testing.T{})
)
func newService(authn smqauthn.Authentication, clients grpcClientsV1.ClientsServiceClient, channels grpcChannelsV1.ChannelsServiceClient) (session.Handler, *pubsub.PubSub) {
func newService(authn smqauthn.Authentication, clients grpcClientsV1.ClientsServiceClient, channels grpcChannelsV1.ChannelsServiceClient, domains grpcDomainsV1.DomainsServiceClient) (session.Handler, *pubsub.PubSub) {
pub := new(pubsub.PubSub)
return server.NewHandler(pub, authn, clients, channels, smqlog.NewMock()), pub
return server.NewHandler(pub, authn, clients, channels, domains, smqlog.NewMock()), pub
}
func newTargetHTTPServer() *httptest.Server {
@@ -107,6 +110,7 @@ func TestPublish(t *testing.T) {
clients := new(climocks.ClientsServiceClient)
authn := new(authnMocks.Authentication)
channels := new(chmocks.ChannelsServiceClient)
domains := new(dmocks.DomainsServiceClient)
ctSenmlJSON := "application/senml+json"
ctSenmlCBOR := "application/senml+cbor"
ctJSON := "application/json"
@@ -115,7 +119,7 @@ func TestPublish(t *testing.T) {
msg := `[{"n":"current","t":-1,"v":1.6}]`
msgJSON := `{"field1":"val1","field2":"val2"}`
msgCBOR := `81A3616E6763757272656E746174206176FB3FF999999999999A`
svc, pub := newService(authn, clients, channels)
svc, pub := newService(authn, clients, channels, domains)
target := newTargetHTTPServer()
defer target.Close()
ts, err := newProxyHTPPServer(svc, target)
@@ -251,6 +255,7 @@ func TestPublish(t *testing.T) {
for _, tc := range cases {
t.Run(tc.desc, func(t *testing.T) {
clientsCall := clients.On("Authenticate", mock.Anything, &grpcClientsV1.AuthnReq{ClientSecret: tc.key}).Return(tc.authnRes, tc.authnErr)
domainsCall := domains.On("RetrieveByRoute", mock.Anything, mock.Anything).Return(&grpcCommonV1.RetrieveEntityRes{Entity: &grpcCommonV1.EntityBasic{Id: tc.domainID}}, nil)
channelsCall := channels.On("Authorize", mock.Anything, &grpcChannelsV1.AuthzReq{
DomainId: tc.domainID,
ChannelId: tc.chanID,
@@ -274,6 +279,7 @@ func TestPublish(t *testing.T) {
svcCall.Unset()
clientsCall.Unset()
channelsCall.Unset()
domainsCall.Unset()
})
}
}
+2 -2
View File
@@ -33,14 +33,14 @@ func MakeHandler(logger *slog.Logger, instanceID string) http.Handler {
}
r := chi.NewRouter()
r.Post("/m/{domainID}/c/{chanID}", otelhttp.NewHandler(kithttp.NewServer(
r.Post("/m/{domain}/c/{channel}", otelhttp.NewHandler(kithttp.NewServer(
sendMessageEndpoint(),
decodeRequest,
api.EncodeResponse,
opts...,
), "publish").ServeHTTP)
r.Post("/m/{domainID}/c/{chanID}/*", otelhttp.NewHandler(kithttp.NewServer(
r.Post("/m/{domain}/c/{channel}/*", otelhttp.NewHandler(kithttp.NewServer(
sendMessageEndpoint(),
decodeRequest,
api.EncodeResponse,
+47 -3
View File
@@ -15,6 +15,9 @@ import (
"github.com/absmach/mgate/pkg/session"
grpcChannelsV1 "github.com/absmach/supermq/api/grpc/channels/v1"
grpcClientsV1 "github.com/absmach/supermq/api/grpc/clients/v1"
grpcCommonV1 "github.com/absmach/supermq/api/grpc/common/v1"
grpcDomainsV1 "github.com/absmach/supermq/api/grpc/domains/v1"
api "github.com/absmach/supermq/api/http"
apiutil "github.com/absmach/supermq/api/http/util"
smqauthn "github.com/absmach/supermq/pkg/authn"
"github.com/absmach/supermq/pkg/connections"
@@ -49,6 +52,8 @@ var (
errFailedPublishToMsgBroker = errors.New("failed to publish to supermq message broker")
errMalformedTopic = mgate.NewHTTPProxyError(http.StatusBadRequest, errors.New("malformed topic"))
errMissingTopicPub = mgate.NewHTTPProxyError(http.StatusBadRequest, errors.New("failed to publish due to missing topic"))
errFailedResolveDomain = mgate.NewHTTPProxyError(http.StatusBadRequest, errors.New("failed to resolve domain route"))
errFailedResolveChannel = mgate.NewHTTPProxyError(http.StatusBadRequest, errors.New("failed to resolve channel route"))
)
// Event implements events.Event interface.
@@ -56,17 +61,19 @@ type handler struct {
publisher messaging.Publisher
clients grpcClientsV1.ClientsServiceClient
channels grpcChannelsV1.ChannelsServiceClient
domains grpcDomainsV1.DomainsServiceClient
authn smqauthn.Authentication
logger *slog.Logger
}
// NewHandler creates new Handler entity.
func NewHandler(publisher messaging.Publisher, authn smqauthn.Authentication, clients grpcClientsV1.ClientsServiceClient, channels grpcChannelsV1.ChannelsServiceClient, logger *slog.Logger) session.Handler {
func NewHandler(publisher messaging.Publisher, authn smqauthn.Authentication, clients grpcClientsV1.ClientsServiceClient, channels grpcChannelsV1.ChannelsServiceClient, domains grpcDomainsV1.DomainsServiceClient, logger *slog.Logger) session.Handler {
return &handler{
publisher: publisher,
authn: authn,
clients: clients,
channels: channels,
domains: domains,
logger: logger,
}
}
@@ -119,10 +126,18 @@ func (h *handler) Publish(ctx context.Context, topic *string, payload *[]byte) e
return errors.Wrap(errFailedPublish, errClientNotInitialized)
}
domainID, chanID, subtopic, err := messaging.ParsePublishTopic(*topic)
domain, channel, subtopic, err := messaging.ParsePublishTopic(*topic)
if err != nil {
return errors.Wrap(errMalformedTopic, err)
}
domainID, err := h.resolveDomain(ctx, domain)
if err != nil {
return errors.Wrap(errFailedResolveDomain, err)
}
channelID, err := h.resolveChannel(ctx, channel, domainID)
if err != nil {
return errors.Wrap(errFailedResolveChannel, err)
}
var clientID, clientType string
switch {
@@ -155,7 +170,7 @@ func (h *handler) Publish(ctx context.Context, topic *string, payload *[]byte) e
msg := messaging.Message{
Protocol: protocol,
Domain: domainID,
Channel: chanID,
Channel: channelID,
Subtopic: subtopic,
Payload: *payload,
Created: time.Now().UnixNano(),
@@ -203,3 +218,32 @@ func (h *handler) Unsubscribe(ctx context.Context, topics *[]string) error {
func (h *handler) Disconnect(ctx context.Context) error {
return nil
}
func (h *handler) resolveDomain(ctx context.Context, domain string) (string, error) {
if api.ValidateUUID(domain) == nil {
return domain, nil
}
d, err := h.domains.RetrieveByRoute(ctx, &grpcCommonV1.RetrieveByRouteReq{
Route: domain,
})
if err != nil {
return "", err
}
return d.Entity.Id, nil
}
func (h *handler) resolveChannel(ctx context.Context, channel, domainID string) (string, error) {
if api.ValidateUUID(channel) == nil {
return channel, nil
}
c, err := h.channels.RetrieveByRoute(ctx, &grpcCommonV1.RetrieveByRouteReq{
Route: channel,
DomainId: domainID,
})
if err != nil {
return "", err
}
return c.Entity.Id, nil
}
+4 -1
View File
@@ -17,6 +17,7 @@ import (
apiutil "github.com/absmach/supermq/api/http/util"
chmocks "github.com/absmach/supermq/channels/mocks"
clmocks "github.com/absmach/supermq/clients/mocks"
dmocks "github.com/absmach/supermq/domains/mocks"
mhttp "github.com/absmach/supermq/http"
"github.com/absmach/supermq/internal/testsutil"
smqlog "github.com/absmach/supermq/logger"
@@ -65,6 +66,7 @@ var (
channels = new(chmocks.ChannelsServiceClient)
authn = new(authnmocks.Authentication)
publisher = new(mocks.PubSub)
domains = new(dmocks.DomainsServiceClient)
)
func newHandler() session.Handler {
@@ -72,9 +74,10 @@ func newHandler() session.Handler {
authn = new(authnmocks.Authentication)
clients = new(clmocks.ClientsServiceClient)
channels = new(chmocks.ChannelsServiceClient)
domains = new(dmocks.DomainsServiceClient)
publisher = new(mocks.PubSub)
return mhttp.NewHandler(publisher, authn, clients, channels, logger)
return mhttp.NewHandler(publisher, authn, clients, channels, domains, logger)
}
func TestAuthConnect(t *testing.T) {
@@ -21,6 +21,9 @@ service ChannelsService {
rpc RetrieveEntity(common.v1.RetrieveEntityReq)
returns (common.v1.RetrieveEntityRes) {}
rpc RetrieveByRoute(common.v1.RetrieveByRouteReq)
returns (common.v1.RetrieveEntityRes) {}
}
message RemoveClientConnectionsReq {
@@ -50,3 +53,4 @@ message AuthzReq {
message AuthzRes {
bool authorized = 1;
}
+5
View File
@@ -55,3 +55,8 @@ message Connection {
string domain_id = 3;
uint32 type = 4;
}
message RetrieveByRouteReq{
string route = 1;
string domain_id = 2;
}
+2
View File
@@ -17,6 +17,8 @@ service DomainsService {
returns (DeleteUserRes) {}
rpc RetrieveEntity(common.v1.RetrieveEntityReq)
returns (common.v1.RetrieveEntityRes) {}
rpc RetrieveByRoute(common.v1.RetrieveByRouteReq)
returns (common.v1.RetrieveEntityRes) {}
}
message DeleteUserRes {
+3 -3
View File
@@ -26,7 +26,7 @@ var (
ErrMalformedTopic = errors.New("malformed topic")
ErrMalformedSubtopic = errors.New("malformed subtopic")
// Regex to group topic in format m.<domain_id>.c.<channel_id>.<sub_topic> `^\/?m\/([\w\-]+)\/c\/([\w\-]+)(\/[^?]*)?(\?.*)?$`.
topicRegExp = regexp.MustCompile(`^\/?` + MsgTopicPrefix + `\/([\w\-]+)\/` + ChannelTopicPrefix + `\/([\w\-]+)(\/[^?]*)?(\?.*)?$`)
TopicRegExp = regexp.MustCompile(`^\/?` + MsgTopicPrefix + `\/([\w\-]+)\/` + ChannelTopicPrefix + `\/([\w\-]+)(\/[^?]*)?(\?.*)?$`)
mqWildcards = "+#"
wildcards = "*>"
subtopicInvalidChars = " #+"
@@ -35,7 +35,7 @@ var (
)
func ParsePublishTopic(topic string) (domainID, chanID, subtopic string, err error) {
msgParts := topicRegExp.FindStringSubmatch(topic)
msgParts := TopicRegExp.FindStringSubmatch(topic)
if len(msgParts) < numGroups {
return "", "", "", ErrMalformedTopic
}
@@ -74,7 +74,7 @@ func ParsePublishSubtopic(subtopic string) (parseSubTopic string, err error) {
}
func ParseSubscribeTopic(topic string) (domainID string, chanID string, subtopic string, err error) {
msgParts := topicRegExp.FindStringSubmatch(topic)
msgParts := TopicRegExp.FindStringSubmatch(topic)
if len(msgParts) < numGroups {
return "", "", "", ErrMalformedTopic
}
+9 -1
View File
@@ -17,9 +17,11 @@ import (
proxy "github.com/absmach/mgate/pkg/http"
grpcChannelsV1 "github.com/absmach/supermq/api/grpc/channels/v1"
grpcClientsV1 "github.com/absmach/supermq/api/grpc/clients/v1"
grpcCommonV1 "github.com/absmach/supermq/api/grpc/common/v1"
apiutil "github.com/absmach/supermq/api/http/util"
chmocks "github.com/absmach/supermq/channels/mocks"
climocks "github.com/absmach/supermq/clients/mocks"
dmocks "github.com/absmach/supermq/domains/mocks"
adapter "github.com/absmach/supermq/http"
"github.com/absmach/supermq/http/api"
smqlog "github.com/absmach/supermq/logger"
@@ -35,14 +37,16 @@ import (
var (
channelsGRPCClient *chmocks.ChannelsServiceClient
clientsGRPCClient *climocks.ClientsServiceClient
domainsGRPCClient *dmocks.DomainsServiceClient
)
func setupMessages() (*httptest.Server, *pubsub.PubSub) {
clientsGRPCClient = new(climocks.ClientsServiceClient)
channelsGRPCClient = new(chmocks.ChannelsServiceClient)
domainsGRPCClient = new(dmocks.DomainsServiceClient)
pub := new(pubsub.PubSub)
authn := new(authnmocks.Authentication)
handler := adapter.NewHandler(pub, authn, clientsGRPCClient, channelsGRPCClient, smqlog.NewMock())
handler := adapter.NewHandler(pub, authn, clientsGRPCClient, channelsGRPCClient, domainsGRPCClient, smqlog.NewMock())
mux := api.MakeHandler(smqlog.NewMock(), "")
target := httptest.NewServer(mux)
@@ -179,6 +183,8 @@ func TestSendMessage(t *testing.T) {
authzCall := clientsGRPCClient.On("Authenticate", mock.Anything, mock.Anything).Return(tc.authRes, tc.authErr)
authnCall := channelsGRPCClient.On("Authorize", mock.Anything, mock.Anything).Return(&grpcChannelsV1.AuthzRes{Authorized: true}, nil)
svcCall := pub.On("Publish", mock.Anything, internalTopic, mock.Anything).Return(tc.svcErr)
domainsCall := domainsGRPCClient.On("RetrieveByRoute", mock.Anything, mock.Anything).Return(&grpcCommonV1.RetrieveEntityRes{Entity: &grpcCommonV1.EntityBasic{Id: tc.domainID}}, nil)
channelsCall := channelsGRPCClient.On("RetrieveByRoute", mock.Anything, mock.Anything).Return(&grpcCommonV1.RetrieveEntityRes{Entity: &grpcCommonV1.EntityBasic{Id: channelID}}, nil)
err := mgsdk.SendMessage(context.Background(), tc.domainID, tc.topic, tc.msg, tc.secret)
assert.Equal(t, tc.err, err)
if tc.err == nil {
@@ -188,6 +194,8 @@ func TestSendMessage(t *testing.T) {
svcCall.Unset()
authzCall.Unset()
authnCall.Unset()
domainsCall.Unset()
channelsCall.Unset()
})
}
}
+63 -9
View File
@@ -9,6 +9,9 @@ import (
grpcChannelsV1 "github.com/absmach/supermq/api/grpc/channels/v1"
grpcClientsV1 "github.com/absmach/supermq/api/grpc/clients/v1"
grpcCommonV1 "github.com/absmach/supermq/api/grpc/common/v1"
grpcDomainsV1 "github.com/absmach/supermq/api/grpc/domains/v1"
api "github.com/absmach/supermq/api/http"
"github.com/absmach/supermq/pkg/connections"
"github.com/absmach/supermq/pkg/errors"
svcerr "github.com/absmach/supermq/pkg/errors/service"
@@ -19,11 +22,14 @@ import (
var (
// ErrFailedSubscription indicates that client couldn't subscribe to specified channel.
ErrFailedSubscription = errors.New("failed to subscribe to a channel")
// ErrFailedPublish indicates that client couldn't publish to specified channel.
ErrFailedSubscribe = errors.New("failed to unsubscribe from topic")
// ErrEmptyTopic indicate absence of clientKey in the request.
ErrEmptyTopic = errors.New("empty topic")
// errFailedResolveDomain indicates that the domain route could not be resolved.
errFailedResolveDomain = errors.New("failed to resolve domain route")
// errFailedResolveChannel indicates that the channel route could not be resolved.
errFailedResolveChannel = errors.New("failed to resolve channel route")
)
// Service specifies web socket service API.
@@ -42,31 +48,42 @@ var _ Service = (*adapterService)(nil)
type adapterService struct {
clients grpcClientsV1.ClientsServiceClient
channels grpcChannelsV1.ChannelsServiceClient
domains grpcDomainsV1.DomainsServiceClient
pubsub messaging.PubSub
}
// New instantiates the WS adapter implementation.
func New(clients grpcClientsV1.ClientsServiceClient, channels grpcChannelsV1.ChannelsServiceClient, pubsub messaging.PubSub) Service {
func New(clients grpcClientsV1.ClientsServiceClient, channels grpcChannelsV1.ChannelsServiceClient, domains grpcDomainsV1.DomainsServiceClient, pubsub messaging.PubSub) Service {
return &adapterService{
clients: clients,
channels: channels,
domains: domains,
pubsub: pubsub,
}
}
func (svc *adapterService) Subscribe(ctx context.Context, sessionID, clientKey, domainID, chanID, subtopic string, c *Client) error {
if chanID == "" || clientKey == "" || domainID == "" {
func (svc *adapterService) Subscribe(ctx context.Context, sessionID, clientKey, domain, channel, subtopic string, c *Client) error {
if channel == "" || clientKey == "" || domain == "" {
return svcerr.ErrAuthentication
}
clientID, err := svc.authorize(ctx, clientKey, domainID, chanID, connections.Subscribe)
domainID, err := svc.resolveDomain(ctx, domain)
if err != nil {
return errFailedResolveDomain
}
channelID, err := svc.resolveChannel(ctx, channel, domainID)
if err != nil {
return errFailedResolveChannel
}
clientID, err := svc.authorize(ctx, clientKey, domainID, channelID, connections.Subscribe)
if err != nil {
return svcerr.ErrAuthorization
}
c.id = clientID
subject := messaging.EncodeTopic(domainID, chanID, subtopic)
subject := messaging.EncodeTopic(domainID, channelID, subtopic)
subCfg := messaging.SubscriberConfig{
ID: sessionID,
ClientID: clientID,
@@ -80,8 +97,16 @@ func (svc *adapterService) Subscribe(ctx context.Context, sessionID, clientKey,
return nil
}
func (svc *adapterService) Unsubscribe(ctx context.Context, sessionID, domainID, chanID, subtopic string) error {
topic := messaging.EncodeTopic(domainID, chanID, subtopic)
func (svc *adapterService) Unsubscribe(ctx context.Context, sessionID, domain, channel, subtopic string) error {
domainID, err := svc.resolveDomain(ctx, domain)
if err != nil {
return errors.Wrap(errFailedResolveDomain, err)
}
channelID, err := svc.resolveChannel(ctx, channel, domainID)
if err != nil {
return errors.Wrap(errFailedResolveChannel, err)
}
topic := messaging.EncodeTopic(domainID, channelID, subtopic)
if err := svc.pubsub.Unsubscribe(ctx, sessionID, topic); err != nil {
return errors.Wrap(ErrFailedSubscribe, err)
@@ -123,3 +148,32 @@ func (svc *adapterService) authorize(ctx context.Context, clientKey, domainID, c
return authnRes.GetId(), nil
}
func (svc *adapterService) resolveDomain(ctx context.Context, domain string) (string, error) {
if api.ValidateUUID(domain) == nil {
return domain, nil
}
d, err := svc.domains.RetrieveByRoute(ctx, &grpcCommonV1.RetrieveByRouteReq{
Route: domain,
})
if err != nil {
return "", err
}
return d.Entity.Id, nil
}
func (svc *adapterService) resolveChannel(ctx context.Context, channel, domainID string) (string, error) {
if api.ValidateUUID(channel) == nil {
return channel, nil
}
c, err := svc.channels.RetrieveByRoute(ctx, &grpcCommonV1.RetrieveByRouteReq{
Route: channel,
DomainId: domainID,
})
if err != nil {
return "", err
}
return c.Entity.Id, nil
}
+11 -4
View File
@@ -12,8 +12,10 @@ import (
grpcChannelsV1 "github.com/absmach/supermq/api/grpc/channels/v1"
grpcClientsV1 "github.com/absmach/supermq/api/grpc/clients/v1"
grpcCommonV1 "github.com/absmach/supermq/api/grpc/common/v1"
chmocks "github.com/absmach/supermq/channels/mocks"
climocks "github.com/absmach/supermq/clients/mocks"
dmocks "github.com/absmach/supermq/domains/mocks"
"github.com/absmach/supermq/internal/testsutil"
"github.com/absmach/supermq/pkg/connections"
"github.com/absmach/supermq/pkg/errors"
@@ -50,16 +52,17 @@ var (
sessionID = "sessionID"
)
func newService() (ws.Service, *mocks.PubSub, *climocks.ClientsServiceClient, *chmocks.ChannelsServiceClient) {
func newService() (ws.Service, *mocks.PubSub, *climocks.ClientsServiceClient, *chmocks.ChannelsServiceClient, *dmocks.DomainsServiceClient) {
pubsub := new(mocks.PubSub)
clients := new(climocks.ClientsServiceClient)
channels := new(chmocks.ChannelsServiceClient)
domains := new(dmocks.DomainsServiceClient)
return ws.New(clients, channels, pubsub), pubsub, clients, channels
return ws.New(clients, channels, domains, pubsub), pubsub, clients, channels, domains
}
func TestSubscribe(t *testing.T) {
svc, pubsub, clients, channels := newService()
svc, pubsub, clients, channels, domains := newService()
c := ws.NewClient(slog.Default(), nil, sessionID)
@@ -110,7 +113,7 @@ func TestSubscribe(t *testing.T) {
{
desc: "subscribe to channel with invalid clientKey",
clientKey: invalidKey,
chanID: invalidID,
chanID: chanID,
domainID: domainID,
subtopic: subTopic,
authNRes: &grpcClientsV1.AuthnRes{Authenticated: false},
@@ -194,6 +197,7 @@ func TestSubscribe(t *testing.T) {
if strings.HasPrefix(tc.clientKey, "Client") {
authReq.ClientSecret = strings.TrimPrefix(tc.clientKey, "Client ")
}
domainsCall := domains.On("RetrieveByRoute", mock.Anything, mock.Anything).Return(&grpcCommonV1.RetrieveEntityRes{Entity: &grpcCommonV1.EntityBasic{Id: tc.domainID}}, nil)
clientsCall := clients.On("Authenticate", mock.Anything, authReq).Return(tc.authNRes, tc.authNErr)
channelsCall := channels.On("Authorize", mock.Anything, &grpcChannelsV1.AuthzReq{
ClientType: policies.ClientType,
@@ -202,11 +206,14 @@ func TestSubscribe(t *testing.T) {
ChannelId: tc.chanID,
DomainId: tc.domainID,
}).Return(tc.authZRes, tc.authZErr)
channelsCall1 := channels.On("RetrieveByRoute", mock.Anything, mock.Anything).Return(&grpcCommonV1.RetrieveEntityRes{Entity: &grpcCommonV1.EntityBasic{Id: tc.chanID}}, nil)
repocall := pubsub.On("Subscribe", mock.Anything, subConfig).Return(tc.subErr)
err := svc.Subscribe(context.Background(), sessionID, tc.clientKey, tc.domainID, tc.chanID, tc.subtopic, c)
assert.True(t, errors.Contains(err, tc.err), fmt.Sprintf("%s: expected %s got %s\n", tc.desc, tc.err, err))
repocall.Unset()
clientsCall.Unset()
channelsCall.Unset()
domainsCall.Unset()
channelsCall1.Unset()
}
}
+8 -5
View File
@@ -18,8 +18,10 @@ import (
"github.com/absmach/mgate/pkg/session"
grpcChannelsV1 "github.com/absmach/supermq/api/grpc/channels/v1"
grpcClientsV1 "github.com/absmach/supermq/api/grpc/clients/v1"
grpcDomainsV1 "github.com/absmach/supermq/api/grpc/domains/v1"
chmocks "github.com/absmach/supermq/channels/mocks"
climocks "github.com/absmach/supermq/clients/mocks"
dmocks "github.com/absmach/supermq/domains/mocks"
"github.com/absmach/supermq/internal/testsutil"
smqlog "github.com/absmach/supermq/logger"
smqauthn "github.com/absmach/supermq/pkg/authn"
@@ -34,7 +36,6 @@ import (
)
const (
id = "1"
clientKey = "c02ff576-ccd5-40f6-ba5f-c85377aad529"
protocol = "ws"
instanceID = "5de9b29a-feb9-11ed-be56-0242ac120002"
@@ -43,11 +44,12 @@ const (
var (
msg = []byte(`[{"n":"current","t":-1,"v":1.6}]`)
domainID = testsutil.GenerateUUID(&testing.T{})
id = testsutil.GenerateUUID(&testing.T{})
)
func newService(clients grpcClientsV1.ClientsServiceClient, channels grpcChannelsV1.ChannelsServiceClient) (ws.Service, *mocks.PubSub) {
func newService(clients grpcClientsV1.ClientsServiceClient, channels grpcChannelsV1.ChannelsServiceClient, domains grpcDomainsV1.DomainsServiceClient) (ws.Service, *mocks.PubSub) {
pubsub := new(mocks.PubSub)
return ws.New(clients, channels, pubsub), pubsub
return ws.New(clients, channels, domains, pubsub), pubsub
}
func newHTTPServer(svc ws.Service) *httptest.Server {
@@ -113,10 +115,11 @@ func TestHandshake(t *testing.T) {
clients := new(climocks.ClientsServiceClient)
channels := new(chmocks.ChannelsServiceClient)
authn := new(authnMocks.Authentication)
svc, pubsub := newService(clients, channels)
domains := new(dmocks.DomainsServiceClient)
svc, pubsub := newService(clients, channels, domains)
target := newHTTPServer(svc)
defer target.Close()
handler := ws.NewHandler(pubsub, smqlog.NewMock(), authn, clients, channels)
handler := ws.NewHandler(pubsub, smqlog.NewMock(), authn, clients, channels, domains)
ts, err := newProxyHTPPServer(handler, target)
require.Nil(t, err)
defer ts.Close()
+7 -7
View File
@@ -51,17 +51,17 @@ func handshake(ctx context.Context, svc ws.Service, logger *slog.Logger) http.Ha
client := ws.NewClient(logger, conn, sessionID)
client.SetCloseHandler(func(code int, text string) error {
return svc.Unsubscribe(ctx, sessionID, req.domainID, req.chanID, req.subtopic)
return svc.Unsubscribe(ctx, sessionID, req.domain, req.channel, req.subtopic)
})
go client.Start(ctx)
if err := svc.Subscribe(ctx, sessionID, req.clientKey, req.domainID, req.chanID, req.subtopic, client); err != nil {
if err := svc.Subscribe(ctx, sessionID, req.clientKey, req.domain, req.channel, req.subtopic, client); err != nil {
conn.Close()
return
}
logger.Debug(fmt.Sprintf("Successfully upgraded communication to WS on channel %s", req.chanID))
logger.Debug(fmt.Sprintf("Successfully upgraded communication to WS on channel %s", req.channel))
}
}
@@ -76,13 +76,13 @@ func decodeRequest(r *http.Request, logger *slog.Logger) (connReq, error) {
authKey = authKeys[0]
}
domainID := chi.URLParam(r, "domainID")
chanID := chi.URLParam(r, "chanID")
domain := chi.URLParam(r, "domain")
channel := chi.URLParam(r, "channel")
req := connReq{
clientKey: authKey,
chanID: chanID,
domainID: domainID,
channel: channel,
domain: domain,
}
subTopic := chi.URLParam(r, "*")
+2 -2
View File
@@ -5,7 +5,7 @@ package api
type connReq struct {
clientKey string
chanID string
domainID string
channel string
domain string
subtopic string
}
+2 -2
View File
@@ -40,8 +40,8 @@ func MakeHandler(ctx context.Context, svc ws.Service, l *slog.Logger, instanceID
logger = l
mux := chi.NewRouter()
mux.Get("/m/{domainID}/c/{chanID}", handshake(ctx, svc, l))
mux.Get("/m/{domainID}/c/{chanID}/*", handshake(ctx, svc, l))
mux.Get("/m/{domain}/c/{channel}", handshake(ctx, svc, l))
mux.Get("/m/{domain}/c/{channel}/*", handshake(ctx, svc, l))
mux.Get("/health", supermq.Health(service, instanceID))
mux.Handle("/metrics", promhttp.Handler())
+62 -4
View File
@@ -15,6 +15,9 @@ import (
"github.com/absmach/mgate/pkg/session"
grpcChannelsV1 "github.com/absmach/supermq/api/grpc/channels/v1"
grpcClientsV1 "github.com/absmach/supermq/api/grpc/clients/v1"
grpcCommonV1 "github.com/absmach/supermq/api/grpc/common/v1"
grpcDomainsV1 "github.com/absmach/supermq/api/grpc/domains/v1"
api "github.com/absmach/supermq/api/http"
apiutil "github.com/absmach/supermq/api/http/util"
smqauthn "github.com/absmach/supermq/pkg/authn"
"github.com/absmach/supermq/pkg/connections"
@@ -50,18 +53,20 @@ type handler struct {
pubsub messaging.PubSub
clients grpcClientsV1.ClientsServiceClient
channels grpcChannelsV1.ChannelsServiceClient
domains grpcDomainsV1.DomainsServiceClient
authn smqauthn.Authentication
logger *slog.Logger
}
// NewHandler creates new Handler entity.
func NewHandler(pubsub messaging.PubSub, logger *slog.Logger, authn smqauthn.Authentication, clients grpcClientsV1.ClientsServiceClient, channels grpcChannelsV1.ChannelsServiceClient) session.Handler {
func NewHandler(pubsub messaging.PubSub, logger *slog.Logger, authn smqauthn.Authentication, clients grpcClientsV1.ClientsServiceClient, channels grpcChannelsV1.ChannelsServiceClient, domains grpcDomainsV1.DomainsServiceClient) session.Handler {
return &handler{
logger: logger,
pubsub: pubsub,
authn: authn,
clients: clients,
channels: channels,
domains: domains,
}
}
@@ -90,10 +95,18 @@ func (h *handler) AuthPublish(ctx context.Context, topic *string, payload *[]byt
token = string(s.Password)
}
domainID, chanID, _, err := messaging.ParsePublishTopic(*topic)
domain, channel, _, err := messaging.ParsePublishTopic(*topic)
if err != nil {
return err
}
domainID, err := h.resolveDomain(ctx, domain)
if err != nil {
return mgate.NewHTTPProxyError(http.StatusBadRequest, errors.Wrap(errFailedResolveDomain, err))
}
chanID, err := h.resolveChannel(ctx, channel, domainID)
if err != nil {
return mgate.NewHTTPProxyError(http.StatusBadRequest, errors.Wrap(errFailedResolveChannel, err))
}
clientID, clientType, err := h.authAccess(ctx, token, domainID, chanID, connections.Publish)
if err != nil {
@@ -119,10 +132,18 @@ func (h *handler) AuthSubscribe(ctx context.Context, topics *[]string) error {
}
for _, topic := range *topics {
domainID, chanID, _, err := messaging.ParseSubscribeTopic(topic)
domain, channel, _, err := messaging.ParseSubscribeTopic(topic)
if err != nil {
return err
}
domainID, err := h.resolveDomain(ctx, domain)
if err != nil {
return mgate.NewHTTPProxyError(http.StatusBadRequest, errors.Wrap(errFailedResolveDomain, err))
}
chanID, err := h.resolveChannel(ctx, channel, domainID)
if err != nil {
return mgate.NewHTTPProxyError(http.StatusBadRequest, errors.Wrap(errFailedResolveChannel, err))
}
if _, _, err := h.authAccess(ctx, string(s.Password), domainID, chanID, connections.Subscribe); err != nil {
return err
}
@@ -148,10 +169,18 @@ func (h *handler) Publish(ctx context.Context, topic *string, payload *[]byte) e
return nil
}
domainID, chanID, subtopic, err := messaging.ParsePublishTopic(*topic)
domain, channel, subtopic, err := messaging.ParsePublishTopic(*topic)
if err != nil {
return errors.Wrap(errFailedPublish, err)
}
domainID, err := h.resolveDomain(ctx, domain)
if err != nil {
return mgate.NewHTTPProxyError(http.StatusBadRequest, errors.Wrap(errFailedResolveDomain, err))
}
chanID, err := h.resolveChannel(ctx, channel, domainID)
if err != nil {
return mgate.NewHTTPProxyError(http.StatusBadRequest, errors.Wrap(errFailedResolveChannel, err))
}
msg := messaging.Message{
Protocol: protocol,
@@ -228,6 +257,35 @@ func (h *handler) authAccess(ctx context.Context, token, domainID, chanID string
return clientID, clientType, nil
}
func (h *handler) resolveDomain(ctx context.Context, domain string) (string, error) {
if api.ValidateUUID(domain) == nil {
return domain, nil
}
d, err := h.domains.RetrieveByRoute(ctx, &grpcCommonV1.RetrieveByRouteReq{
Route: domain,
})
if err != nil {
return "", err
}
return d.Entity.Id, nil
}
func (h *handler) resolveChannel(ctx context.Context, channel, domainID string) (string, error) {
if api.ValidateUUID(channel) == nil {
return channel, nil
}
c, err := h.channels.RetrieveByRoute(ctx, &grpcCommonV1.RetrieveByRouteReq{
Route: channel,
DomainId: domainID,
})
if err != nil {
return "", err
}
return c.Entity.Id, nil
}
// extractClientSecret returns value of the client secret. If there is no client key - an empty value is returned.
func extractClientSecret(token string) string {
if !strings.HasPrefix(token, apiutil.ClientPrefix) {