Add grpc server skeleton

Signed-off-by: Darko Draskovic <darko.draskovic@gmail.com>
This commit is contained in:
Darko Draskovic
2023-03-28 17:17:12 +02:00
parent 122b214299
commit 6f014156b3
11 changed files with 579 additions and 499 deletions
+43
View File
@@ -1 +1,44 @@
package grpc
import (
"context"
kitot "github.com/go-kit/kit/tracing/opentracing"
kitgrpc "github.com/go-kit/kit/transport/grpc"
"github.com/opentracing/opentracing-go"
"github.com/ultravioletrs/manager/manager"
)
type grpcServer struct {
createDomain kitgrpc.Handler
manager.UnimplementedManagerServiceServer
}
// NewServer returns new AuthServiceServer instance.
func NewServer(tracer opentracing.Tracer, svc manager.Service) manager.ManagerServiceServer {
return &grpcServer{
createDomain: kitgrpc.NewServer(
kitot.TraceServer(tracer, "createDomain")(createDomainEndpoint(svc)),
decodeCreateDomainRequest,
encodeCreateDomainResponse,
),
}
}
func decodeCreateDomainRequest(_ context.Context, grpcReq interface{}) (interface{}, error) {
req := grpcReq.(*manager.CreateDomainRequest)
return createDomainReq{Pool: req.GetPool(), Volume: req.GetVolume(), Domain: req.GetDomain()}, nil
}
func encodeCreateDomainResponse(_ context.Context, response interface{}) (interface{}, error) {
res := response.(createDomainRes)
return manager.CreateDomainResponse{Name: res.Name}, nil
}
func (s *grpcServer) CreateDomain(ctx context.Context, req *manager.CreateDomainRequest) (*manager.CreateDomainResponse, error) {
_, res, err := s.createDomain.ServeGRPC(ctx, req)
if err != nil {
return nil, err
}
return res.(*manager.CreateDomainResponse), nil
}
+37 -407
View File
@@ -4,11 +4,12 @@
// protoc v3.12.4
// source: proto/manager.proto
// import "google/protobuf/timestamp.proto";
package manager
import (
proto "github.com/golang/protobuf/proto"
timestamp "github.com/golang/protobuf/ptypes/timestamp"
protoreflect "google.golang.org/protobuf/reflect/protoreflect"
protoimpl "google.golang.org/protobuf/runtime/protoimpl"
reflect "reflect"
@@ -26,273 +27,6 @@ const (
// of the legacy proto package is being used.
const _ = proto.ProtoPackageIsVersion4
type HealthRequest struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
}
func (x *HealthRequest) Reset() {
*x = HealthRequest{}
if protoimpl.UnsafeEnabled {
mi := &file_proto_manager_proto_msgTypes[0]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}
func (x *HealthRequest) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*HealthRequest) ProtoMessage() {}
func (x *HealthRequest) ProtoReflect() protoreflect.Message {
mi := &file_proto_manager_proto_msgTypes[0]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use HealthRequest.ProtoReflect.Descriptor instead.
func (*HealthRequest) Descriptor() ([]byte, []int) {
return file_proto_manager_proto_rawDescGZIP(), []int{0}
}
type HealthResponse struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
Status string `protobuf:"bytes,1,opt,name=status,proto3" json:"status,omitempty"`
}
func (x *HealthResponse) Reset() {
*x = HealthResponse{}
if protoimpl.UnsafeEnabled {
mi := &file_proto_manager_proto_msgTypes[1]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}
func (x *HealthResponse) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*HealthResponse) ProtoMessage() {}
func (x *HealthResponse) ProtoReflect() protoreflect.Message {
mi := &file_proto_manager_proto_msgTypes[1]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use HealthResponse.ProtoReflect.Descriptor instead.
func (*HealthResponse) Descriptor() ([]byte, []int) {
return file_proto_manager_proto_rawDescGZIP(), []int{1}
}
func (x *HealthResponse) GetStatus() string {
if x != nil {
return x.Status
}
return ""
}
type RunRequest struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
Name string `protobuf:"bytes,2,opt,name=name,proto3" json:"name,omitempty"`
Description string `protobuf:"bytes,3,opt,name=description,proto3" json:"description,omitempty"`
Status string `protobuf:"bytes,4,opt,name=status,proto3" json:"status,omitempty"`
Owner string `protobuf:"bytes,5,opt,name=owner,proto3" json:"owner,omitempty"`
StartTime *timestamp.Timestamp `protobuf:"bytes,6,opt,name=start_time,json=startTime,proto3" json:"start_time,omitempty"`
EndTime *timestamp.Timestamp `protobuf:"bytes,7,opt,name=end_time,json=endTime,proto3" json:"end_time,omitempty"`
Datasets []string `protobuf:"bytes,8,rep,name=datasets,proto3" json:"datasets,omitempty"`
Algorithms []string `protobuf:"bytes,9,rep,name=algorithms,proto3" json:"algorithms,omitempty"`
DatasetProviders []string `protobuf:"bytes,10,rep,name=dataset_providers,json=datasetProviders,proto3" json:"dataset_providers,omitempty"`
AlgorithmProviders []string `protobuf:"bytes,11,rep,name=algorithm_providers,json=algorithmProviders,proto3" json:"algorithm_providers,omitempty"`
ResultConsumers []string `protobuf:"bytes,13,rep,name=result_consumers,json=resultConsumers,proto3" json:"result_consumers,omitempty"`
Ttl int32 `protobuf:"varint,12,opt,name=ttl,proto3" json:"ttl,omitempty"`
}
func (x *RunRequest) Reset() {
*x = RunRequest{}
if protoimpl.UnsafeEnabled {
mi := &file_proto_manager_proto_msgTypes[2]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}
func (x *RunRequest) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*RunRequest) ProtoMessage() {}
func (x *RunRequest) ProtoReflect() protoreflect.Message {
mi := &file_proto_manager_proto_msgTypes[2]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use RunRequest.ProtoReflect.Descriptor instead.
func (*RunRequest) Descriptor() ([]byte, []int) {
return file_proto_manager_proto_rawDescGZIP(), []int{2}
}
func (x *RunRequest) GetName() string {
if x != nil {
return x.Name
}
return ""
}
func (x *RunRequest) GetDescription() string {
if x != nil {
return x.Description
}
return ""
}
func (x *RunRequest) GetStatus() string {
if x != nil {
return x.Status
}
return ""
}
func (x *RunRequest) GetOwner() string {
if x != nil {
return x.Owner
}
return ""
}
func (x *RunRequest) GetStartTime() *timestamp.Timestamp {
if x != nil {
return x.StartTime
}
return nil
}
func (x *RunRequest) GetEndTime() *timestamp.Timestamp {
if x != nil {
return x.EndTime
}
return nil
}
func (x *RunRequest) GetDatasets() []string {
if x != nil {
return x.Datasets
}
return nil
}
func (x *RunRequest) GetAlgorithms() []string {
if x != nil {
return x.Algorithms
}
return nil
}
func (x *RunRequest) GetDatasetProviders() []string {
if x != nil {
return x.DatasetProviders
}
return nil
}
func (x *RunRequest) GetAlgorithmProviders() []string {
if x != nil {
return x.AlgorithmProviders
}
return nil
}
func (x *RunRequest) GetResultConsumers() []string {
if x != nil {
return x.ResultConsumers
}
return nil
}
func (x *RunRequest) GetTtl() int32 {
if x != nil {
return x.Ttl
}
return 0
}
type RunResponse struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
Message string `protobuf:"bytes,1,opt,name=message,proto3" json:"message,omitempty"`
}
func (x *RunResponse) Reset() {
*x = RunResponse{}
if protoimpl.UnsafeEnabled {
mi := &file_proto_manager_proto_msgTypes[3]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}
func (x *RunResponse) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*RunResponse) ProtoMessage() {}
func (x *RunResponse) ProtoReflect() protoreflect.Message {
mi := &file_proto_manager_proto_msgTypes[3]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use RunResponse.ProtoReflect.Descriptor instead.
func (*RunResponse) Descriptor() ([]byte, []int) {
return file_proto_manager_proto_rawDescGZIP(), []int{3}
}
func (x *RunResponse) GetMessage() string {
if x != nil {
return x.Message
}
return ""
}
type CreateDomainRequest struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
@@ -306,7 +40,7 @@ type CreateDomainRequest struct {
func (x *CreateDomainRequest) Reset() {
*x = CreateDomainRequest{}
if protoimpl.UnsafeEnabled {
mi := &file_proto_manager_proto_msgTypes[4]
mi := &file_proto_manager_proto_msgTypes[0]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
@@ -319,7 +53,7 @@ func (x *CreateDomainRequest) String() string {
func (*CreateDomainRequest) ProtoMessage() {}
func (x *CreateDomainRequest) ProtoReflect() protoreflect.Message {
mi := &file_proto_manager_proto_msgTypes[4]
mi := &file_proto_manager_proto_msgTypes[0]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
@@ -332,7 +66,7 @@ func (x *CreateDomainRequest) ProtoReflect() protoreflect.Message {
// Deprecated: Use CreateDomainRequest.ProtoReflect.Descriptor instead.
func (*CreateDomainRequest) Descriptor() ([]byte, []int) {
return file_proto_manager_proto_rawDescGZIP(), []int{4}
return file_proto_manager_proto_rawDescGZIP(), []int{0}
}
func (x *CreateDomainRequest) GetPool() string {
@@ -367,7 +101,7 @@ type CreateDomainResponse struct {
func (x *CreateDomainResponse) Reset() {
*x = CreateDomainResponse{}
if protoimpl.UnsafeEnabled {
mi := &file_proto_manager_proto_msgTypes[5]
mi := &file_proto_manager_proto_msgTypes[1]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
@@ -380,7 +114,7 @@ func (x *CreateDomainResponse) String() string {
func (*CreateDomainResponse) ProtoMessage() {}
func (x *CreateDomainResponse) ProtoReflect() protoreflect.Message {
mi := &file_proto_manager_proto_msgTypes[5]
mi := &file_proto_manager_proto_msgTypes[1]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
@@ -393,7 +127,7 @@ func (x *CreateDomainResponse) ProtoReflect() protoreflect.Message {
// Deprecated: Use CreateDomainResponse.ProtoReflect.Descriptor instead.
func (*CreateDomainResponse) Descriptor() ([]byte, []int) {
return file_proto_manager_proto_rawDescGZIP(), []int{5}
return file_proto_manager_proto_rawDescGZIP(), []int{1}
}
func (x *CreateDomainResponse) GetName() string {
@@ -408,68 +142,23 @@ var File_proto_manager_proto protoreflect.FileDescriptor
var file_proto_manager_proto_rawDesc = []byte{
0x0a, 0x13, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x6d, 0x61, 0x6e, 0x61, 0x67, 0x65, 0x72, 0x2e,
0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x0d, 0x6d, 0x61, 0x6e, 0x61, 0x67, 0x65, 0x72, 0x5f, 0x70,
0x72, 0x6f, 0x74, 0x6f, 0x1a, 0x1f, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2f, 0x70, 0x72, 0x6f,
0x74, 0x6f, 0x62, 0x75, 0x66, 0x2f, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x2e,
0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0x0f, 0x0a, 0x0d, 0x48, 0x65, 0x61, 0x6c, 0x74, 0x68, 0x52,
0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x22, 0x28, 0x0a, 0x0e, 0x48, 0x65, 0x61, 0x6c, 0x74, 0x68,
0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x16, 0x0a, 0x06, 0x73, 0x74, 0x61, 0x74,
0x75, 0x73, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73,
0x22, 0xb9, 0x03, 0x0a, 0x0a, 0x52, 0x75, 0x6e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12,
0x12, 0x0a, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x6e,
0x61, 0x6d, 0x65, 0x12, 0x20, 0x0a, 0x0b, 0x64, 0x65, 0x73, 0x63, 0x72, 0x69, 0x70, 0x74, 0x69,
0x6f, 0x6e, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0b, 0x64, 0x65, 0x73, 0x63, 0x72, 0x69,
0x70, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x16, 0x0a, 0x06, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x18,
0x04, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x12, 0x14, 0x0a,
0x05, 0x6f, 0x77, 0x6e, 0x65, 0x72, 0x18, 0x05, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x6f, 0x77,
0x6e, 0x65, 0x72, 0x12, 0x39, 0x0a, 0x0a, 0x73, 0x74, 0x61, 0x72, 0x74, 0x5f, 0x74, 0x69, 0x6d,
0x65, 0x18, 0x06, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65,
0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74,
0x61, 0x6d, 0x70, 0x52, 0x09, 0x73, 0x74, 0x61, 0x72, 0x74, 0x54, 0x69, 0x6d, 0x65, 0x12, 0x35,
0x0a, 0x08, 0x65, 0x6e, 0x64, 0x5f, 0x74, 0x69, 0x6d, 0x65, 0x18, 0x07, 0x20, 0x01, 0x28, 0x0b,
0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62,
0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, 0x07, 0x65, 0x6e,
0x64, 0x54, 0x69, 0x6d, 0x65, 0x12, 0x1a, 0x0a, 0x08, 0x64, 0x61, 0x74, 0x61, 0x73, 0x65, 0x74,
0x73, 0x18, 0x08, 0x20, 0x03, 0x28, 0x09, 0x52, 0x08, 0x64, 0x61, 0x74, 0x61, 0x73, 0x65, 0x74,
0x73, 0x12, 0x1e, 0x0a, 0x0a, 0x61, 0x6c, 0x67, 0x6f, 0x72, 0x69, 0x74, 0x68, 0x6d, 0x73, 0x18,
0x09, 0x20, 0x03, 0x28, 0x09, 0x52, 0x0a, 0x61, 0x6c, 0x67, 0x6f, 0x72, 0x69, 0x74, 0x68, 0x6d,
0x73, 0x12, 0x2b, 0x0a, 0x11, 0x64, 0x61, 0x74, 0x61, 0x73, 0x65, 0x74, 0x5f, 0x70, 0x72, 0x6f,
0x76, 0x69, 0x64, 0x65, 0x72, 0x73, 0x18, 0x0a, 0x20, 0x03, 0x28, 0x09, 0x52, 0x10, 0x64, 0x61,
0x74, 0x61, 0x73, 0x65, 0x74, 0x50, 0x72, 0x6f, 0x76, 0x69, 0x64, 0x65, 0x72, 0x73, 0x12, 0x2f,
0x0a, 0x13, 0x61, 0x6c, 0x67, 0x6f, 0x72, 0x69, 0x74, 0x68, 0x6d, 0x5f, 0x70, 0x72, 0x6f, 0x76,
0x69, 0x64, 0x65, 0x72, 0x73, 0x18, 0x0b, 0x20, 0x03, 0x28, 0x09, 0x52, 0x12, 0x61, 0x6c, 0x67,
0x6f, 0x72, 0x69, 0x74, 0x68, 0x6d, 0x50, 0x72, 0x6f, 0x76, 0x69, 0x64, 0x65, 0x72, 0x73, 0x12,
0x29, 0x0a, 0x10, 0x72, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x5f, 0x63, 0x6f, 0x6e, 0x73, 0x75, 0x6d,
0x65, 0x72, 0x73, 0x18, 0x0d, 0x20, 0x03, 0x28, 0x09, 0x52, 0x0f, 0x72, 0x65, 0x73, 0x75, 0x6c,
0x74, 0x43, 0x6f, 0x6e, 0x73, 0x75, 0x6d, 0x65, 0x72, 0x73, 0x12, 0x10, 0x0a, 0x03, 0x74, 0x74,
0x6c, 0x18, 0x0c, 0x20, 0x01, 0x28, 0x05, 0x52, 0x03, 0x74, 0x74, 0x6c, 0x22, 0x27, 0x0a, 0x0b,
0x52, 0x75, 0x6e, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x18, 0x0a, 0x07, 0x6d,
0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x6d, 0x65,
0x73, 0x73, 0x61, 0x67, 0x65, 0x22, 0x59, 0x0a, 0x13, 0x43, 0x72, 0x65, 0x61, 0x74, 0x65, 0x44,
0x6f, 0x6d, 0x61, 0x69, 0x6e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x12, 0x0a, 0x04,
0x70, 0x6f, 0x6f, 0x6c, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x70, 0x6f, 0x6f, 0x6c,
0x12, 0x16, 0x0a, 0x06, 0x76, 0x6f, 0x6c, 0x75, 0x6d, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09,
0x52, 0x06, 0x76, 0x6f, 0x6c, 0x75, 0x6d, 0x65, 0x12, 0x16, 0x0a, 0x06, 0x64, 0x6f, 0x6d, 0x61,
0x69, 0x6e, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x64, 0x6f, 0x6d, 0x61, 0x69, 0x6e,
0x22, 0x2a, 0x0a, 0x14, 0x43, 0x72, 0x65, 0x61, 0x74, 0x65, 0x44, 0x6f, 0x6d, 0x61, 0x69, 0x6e,
0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x12, 0x0a, 0x04, 0x6e, 0x61, 0x6d, 0x65,
0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x32, 0xf4, 0x01, 0x0a,
0x0e, 0x4d, 0x61, 0x6e, 0x61, 0x67, 0x65, 0x72, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x12,
0x47, 0x0a, 0x06, 0x48, 0x65, 0x61, 0x6c, 0x74, 0x68, 0x12, 0x1c, 0x2e, 0x6d, 0x61, 0x6e, 0x61,
0x67, 0x65, 0x72, 0x5f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x48, 0x65, 0x61, 0x6c, 0x74, 0x68,
0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1d, 0x2e, 0x6d, 0x61, 0x6e, 0x61, 0x67, 0x65,
0x72, 0x5f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x48, 0x65, 0x61, 0x6c, 0x74, 0x68, 0x52, 0x65,
0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x3e, 0x0a, 0x03, 0x52, 0x75, 0x6e, 0x12,
0x19, 0x2e, 0x6d, 0x61, 0x6e, 0x61, 0x67, 0x65, 0x72, 0x5f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e,
0x52, 0x75, 0x6e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1a, 0x2e, 0x6d, 0x61, 0x6e,
0x61, 0x67, 0x65, 0x72, 0x5f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x52, 0x75, 0x6e, 0x52, 0x65,
0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x59, 0x0a, 0x0c, 0x43, 0x72, 0x65, 0x61,
0x74, 0x65, 0x44, 0x6f, 0x6d, 0x61, 0x69, 0x6e, 0x12, 0x22, 0x2e, 0x6d, 0x61, 0x6e, 0x61, 0x67,
0x65, 0x72, 0x5f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x43, 0x72, 0x65, 0x61, 0x74, 0x65, 0x44,
0x6f, 0x6d, 0x61, 0x69, 0x6e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x23, 0x2e, 0x6d,
0x61, 0x6e, 0x61, 0x67, 0x65, 0x72, 0x5f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x43, 0x72, 0x65,
0x61, 0x74, 0x65, 0x44, 0x6f, 0x6d, 0x61, 0x69, 0x6e, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73,
0x65, 0x22, 0x00, 0x42, 0x0b, 0x5a, 0x09, 0x2e, 0x2f, 0x6d, 0x61, 0x6e, 0x61, 0x67, 0x65, 0x72,
0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
0x72, 0x6f, 0x74, 0x6f, 0x22, 0x59, 0x0a, 0x13, 0x43, 0x72, 0x65, 0x61, 0x74, 0x65, 0x44, 0x6f,
0x6d, 0x61, 0x69, 0x6e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x12, 0x0a, 0x04, 0x70,
0x6f, 0x6f, 0x6c, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x70, 0x6f, 0x6f, 0x6c, 0x12,
0x16, 0x0a, 0x06, 0x76, 0x6f, 0x6c, 0x75, 0x6d, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52,
0x06, 0x76, 0x6f, 0x6c, 0x75, 0x6d, 0x65, 0x12, 0x16, 0x0a, 0x06, 0x64, 0x6f, 0x6d, 0x61, 0x69,
0x6e, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x64, 0x6f, 0x6d, 0x61, 0x69, 0x6e, 0x22,
0x2a, 0x0a, 0x14, 0x43, 0x72, 0x65, 0x61, 0x74, 0x65, 0x44, 0x6f, 0x6d, 0x61, 0x69, 0x6e, 0x52,
0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x12, 0x0a, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x18,
0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x32, 0x6b, 0x0a, 0x0e, 0x4d,
0x61, 0x6e, 0x61, 0x67, 0x65, 0x72, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x12, 0x59, 0x0a,
0x0c, 0x43, 0x72, 0x65, 0x61, 0x74, 0x65, 0x44, 0x6f, 0x6d, 0x61, 0x69, 0x6e, 0x12, 0x22, 0x2e,
0x6d, 0x61, 0x6e, 0x61, 0x67, 0x65, 0x72, 0x5f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x43, 0x72,
0x65, 0x61, 0x74, 0x65, 0x44, 0x6f, 0x6d, 0x61, 0x69, 0x6e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73,
0x74, 0x1a, 0x23, 0x2e, 0x6d, 0x61, 0x6e, 0x61, 0x67, 0x65, 0x72, 0x5f, 0x70, 0x72, 0x6f, 0x74,
0x6f, 0x2e, 0x43, 0x72, 0x65, 0x61, 0x74, 0x65, 0x44, 0x6f, 0x6d, 0x61, 0x69, 0x6e, 0x52, 0x65,
0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x42, 0x0b, 0x5a, 0x09, 0x2e, 0x2f, 0x6d, 0x61,
0x6e, 0x61, 0x67, 0x65, 0x72, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
}
var (
@@ -484,30 +173,19 @@ func file_proto_manager_proto_rawDescGZIP() []byte {
return file_proto_manager_proto_rawDescData
}
var file_proto_manager_proto_msgTypes = make([]protoimpl.MessageInfo, 6)
var file_proto_manager_proto_msgTypes = make([]protoimpl.MessageInfo, 2)
var file_proto_manager_proto_goTypes = []interface{}{
(*HealthRequest)(nil), // 0: manager_proto.HealthRequest
(*HealthResponse)(nil), // 1: manager_proto.HealthResponse
(*RunRequest)(nil), // 2: manager_proto.RunRequest
(*RunResponse)(nil), // 3: manager_proto.RunResponse
(*CreateDomainRequest)(nil), // 4: manager_proto.CreateDomainRequest
(*CreateDomainResponse)(nil), // 5: manager_proto.CreateDomainResponse
(*timestamp.Timestamp)(nil), // 6: google.protobuf.Timestamp
(*CreateDomainRequest)(nil), // 0: manager_proto.CreateDomainRequest
(*CreateDomainResponse)(nil), // 1: manager_proto.CreateDomainResponse
}
var file_proto_manager_proto_depIdxs = []int32{
6, // 0: manager_proto.RunRequest.start_time:type_name -> google.protobuf.Timestamp
6, // 1: manager_proto.RunRequest.end_time:type_name -> google.protobuf.Timestamp
0, // 2: manager_proto.ManagerService.Health:input_type -> manager_proto.HealthRequest
2, // 3: manager_proto.ManagerService.Run:input_type -> manager_proto.RunRequest
4, // 4: manager_proto.ManagerService.CreateDomain:input_type -> manager_proto.CreateDomainRequest
1, // 5: manager_proto.ManagerService.Health:output_type -> manager_proto.HealthResponse
3, // 6: manager_proto.ManagerService.Run:output_type -> manager_proto.RunResponse
5, // 7: manager_proto.ManagerService.CreateDomain:output_type -> manager_proto.CreateDomainResponse
5, // [5:8] is the sub-list for method output_type
2, // [2:5] is the sub-list for method input_type
2, // [2:2] is the sub-list for extension type_name
2, // [2:2] is the sub-list for extension extendee
0, // [0:2] is the sub-list for field type_name
0, // 0: manager_proto.ManagerService.CreateDomain:input_type -> manager_proto.CreateDomainRequest
1, // 1: manager_proto.ManagerService.CreateDomain:output_type -> manager_proto.CreateDomainResponse
1, // [1:2] is the sub-list for method output_type
0, // [0:1] is the sub-list for method input_type
0, // [0:0] is the sub-list for extension type_name
0, // [0:0] is the sub-list for extension extendee
0, // [0:0] is the sub-list for field type_name
}
func init() { file_proto_manager_proto_init() }
@@ -517,54 +195,6 @@ func file_proto_manager_proto_init() {
}
if !protoimpl.UnsafeEnabled {
file_proto_manager_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*HealthRequest); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
file_proto_manager_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*HealthResponse); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
file_proto_manager_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*RunRequest); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
file_proto_manager_proto_msgTypes[3].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*RunResponse); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
file_proto_manager_proto_msgTypes[4].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*CreateDomainRequest); i {
case 0:
return &v.state
@@ -576,7 +206,7 @@ func file_proto_manager_proto_init() {
return nil
}
}
file_proto_manager_proto_msgTypes[5].Exporter = func(v interface{}, i int) interface{} {
file_proto_manager_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*CreateDomainResponse); i {
case 0:
return &v.state
@@ -595,7 +225,7 @@ func file_proto_manager_proto_init() {
GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
RawDescriptor: file_proto_manager_proto_rawDesc,
NumEnums: 0,
NumMessages: 6,
NumMessages: 2,
NumExtensions: 0,
NumServices: 1,
},
+4 -72
View File
@@ -18,8 +18,8 @@ const _ = grpc.SupportPackageIsVersion7
//
// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream.
type ManagerServiceClient interface {
Health(ctx context.Context, in *HealthRequest, opts ...grpc.CallOption) (*HealthResponse, error)
Run(ctx context.Context, in *RunRequest, opts ...grpc.CallOption) (*RunResponse, error)
// rpc Health(HealthRequest) returns (HealthResponse) {}
// rpc Run(RunRequest) returns (RunResponse) {}
CreateDomain(ctx context.Context, in *CreateDomainRequest, opts ...grpc.CallOption) (*CreateDomainResponse, error)
}
@@ -31,24 +31,6 @@ func NewManagerServiceClient(cc grpc.ClientConnInterface) ManagerServiceClient {
return &managerServiceClient{cc}
}
func (c *managerServiceClient) Health(ctx context.Context, in *HealthRequest, opts ...grpc.CallOption) (*HealthResponse, error) {
out := new(HealthResponse)
err := c.cc.Invoke(ctx, "/manager_proto.ManagerService/Health", in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *managerServiceClient) Run(ctx context.Context, in *RunRequest, opts ...grpc.CallOption) (*RunResponse, error) {
out := new(RunResponse)
err := c.cc.Invoke(ctx, "/manager_proto.ManagerService/Run", in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *managerServiceClient) CreateDomain(ctx context.Context, in *CreateDomainRequest, opts ...grpc.CallOption) (*CreateDomainResponse, error) {
out := new(CreateDomainResponse)
err := c.cc.Invoke(ctx, "/manager_proto.ManagerService/CreateDomain", in, out, opts...)
@@ -62,8 +44,8 @@ func (c *managerServiceClient) CreateDomain(ctx context.Context, in *CreateDomai
// All implementations must embed UnimplementedManagerServiceServer
// for forward compatibility
type ManagerServiceServer interface {
Health(context.Context, *HealthRequest) (*HealthResponse, error)
Run(context.Context, *RunRequest) (*RunResponse, error)
// rpc Health(HealthRequest) returns (HealthResponse) {}
// rpc Run(RunRequest) returns (RunResponse) {}
CreateDomain(context.Context, *CreateDomainRequest) (*CreateDomainResponse, error)
mustEmbedUnimplementedManagerServiceServer()
}
@@ -72,12 +54,6 @@ type ManagerServiceServer interface {
type UnimplementedManagerServiceServer struct {
}
func (UnimplementedManagerServiceServer) Health(context.Context, *HealthRequest) (*HealthResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method Health not implemented")
}
func (UnimplementedManagerServiceServer) Run(context.Context, *RunRequest) (*RunResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method Run not implemented")
}
func (UnimplementedManagerServiceServer) CreateDomain(context.Context, *CreateDomainRequest) (*CreateDomainResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method CreateDomain not implemented")
}
@@ -94,42 +70,6 @@ func RegisterManagerServiceServer(s grpc.ServiceRegistrar, srv ManagerServiceSer
s.RegisterService(&ManagerService_ServiceDesc, srv)
}
func _ManagerService_Health_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(HealthRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(ManagerServiceServer).Health(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/manager_proto.ManagerService/Health",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(ManagerServiceServer).Health(ctx, req.(*HealthRequest))
}
return interceptor(ctx, in, info, handler)
}
func _ManagerService_Run_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(RunRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(ManagerServiceServer).Run(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/manager_proto.ManagerService/Run",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(ManagerServiceServer).Run(ctx, req.(*RunRequest))
}
return interceptor(ctx, in, info, handler)
}
func _ManagerService_CreateDomain_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(CreateDomainRequest)
if err := dec(in); err != nil {
@@ -155,14 +95,6 @@ var ManagerService_ServiceDesc = grpc.ServiceDesc{
ServiceName: "manager_proto.ManagerService",
HandlerType: (*ManagerServiceServer)(nil),
Methods: []grpc.MethodDesc{
{
MethodName: "Health",
Handler: _ManagerService_Health_Handler,
},
{
MethodName: "Run",
Handler: _ManagerService_Run_Handler,
},
{
MethodName: "CreateDomain",
Handler: _ManagerService_CreateDomain_Handler,
+20 -20
View File
@@ -1,36 +1,36 @@
syntax = "proto3";
import "google/protobuf/timestamp.proto";
// import "google/protobuf/timestamp.proto";
package manager_proto;
option go_package = "./manager";
service ManagerService {
rpc Health(HealthRequest) returns (HealthResponse) {}
rpc Run(RunRequest) returns (RunResponse) {}
// rpc Health(HealthRequest) returns (HealthResponse) {}
// rpc Run(RunRequest) returns (RunResponse) {}
rpc CreateDomain(CreateDomainRequest) returns (CreateDomainResponse) {}
}
message HealthRequest {}
message HealthResponse { string status = 1; }
// message HealthRequest {}
// message HealthResponse { string status = 1; }
message RunRequest {
string name = 2;
string description = 3;
string status = 4;
string owner = 5;
google.protobuf.Timestamp start_time = 6;
google.protobuf.Timestamp end_time = 7;
repeated string datasets = 8;
repeated string algorithms = 9;
repeated string dataset_providers = 10;
repeated string algorithm_providers = 11;
repeated string result_consumers = 13;
int32 ttl = 12;
}
// message RunRequest {
// string name = 2;
// string description = 3;
// string status = 4;
// string owner = 5;
// google.protobuf.Timestamp start_time = 6;
// google.protobuf.Timestamp end_time = 7;
// repeated string datasets = 8;
// repeated string algorithms = 9;
// repeated string dataset_providers = 10;
// repeated string algorithm_providers = 11;
// repeated string result_consumers = 13;
// int32 ttl = 12;
// }
message RunResponse { string message = 1; }
// message RunResponse { string message = 1; }
message CreateDomainRequest {
string pool = 1;
+54
View File
@@ -0,0 +1,54 @@
# grpc
[gRPC](http://www.grpc.io/) is an excellent, modern IDL and transport for
microservices. If you're starting a greenfield project, go-kit strongly
recommends gRPC as your default transport.
One important note is that while gRPC supports streaming requests and replies,
go-kit does not. You can still use streams in your service, but their
implementation will not be able to take advantage of many go-kit features like middleware.
Using gRPC and go-kit together is very simple.
First, define your service using protobuf3. This is explained
[in gRPC documentation](http://www.grpc.io/docs/#defining-a-service).
See
[addsvc.proto](https://github.com/go-kit/examples/blob/master/addsvc/pb/addsvc.proto)
for an example. Make sure the proto definition matches your service's go-kit
(interface) definition.
Next, get the protoc compiler.
You can download pre-compiled binaries from the
[protobuf release page](https://github.com/google/protobuf/releases).
You will unzip a folder called `protoc3` with a subdirectory `bin` containing
an executable. Move that executable somewhere in your `$PATH` and you're good
to go!
It can also be built from source.
```sh
brew install autoconf automake libtool
git clone https://github.com/google/protobuf
cd protobuf
./autogen.sh ; ./configure ; make ; make install
```
Then, compile your service definition, from .proto to .go.
```sh
protoc add.proto --go_out=plugins=grpc:.
```
Finally, write a tiny binding from your service definition to the gRPC
definition. It's a simple conversion from one domain to another.
See
[grpc.go](https://github.com/go-kit/examples/blob/master/addsvc/pkg/addtransport/grpc.go)
for an example.
That's it!
The gRPC binding can be bound to a listener and serve normal gRPC requests.
And within your service, you can use standard go-kit components and idioms.
See [addsvc](https://github.com/go-kit/examples/tree/master/addsvc/) for
a complete working example with gRPC support. And remember: go-kit services
can support multiple transports simultaneously.
+140
View File
@@ -0,0 +1,140 @@
package grpc
import (
"context"
"fmt"
"reflect"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"
"github.com/go-kit/kit/endpoint"
)
// Client wraps a gRPC connection and provides a method that implements
// endpoint.Endpoint.
type Client struct {
client *grpc.ClientConn
serviceName string
method string
enc EncodeRequestFunc
dec DecodeResponseFunc
grpcReply reflect.Type
before []ClientRequestFunc
after []ClientResponseFunc
finalizer []ClientFinalizerFunc
}
// NewClient constructs a usable Client for a single remote endpoint.
// Pass an zero-value protobuf message of the RPC response type as
// the grpcReply argument.
func NewClient(
cc *grpc.ClientConn,
serviceName string,
method string,
enc EncodeRequestFunc,
dec DecodeResponseFunc,
grpcReply interface{},
options ...ClientOption,
) *Client {
c := &Client{
client: cc,
method: fmt.Sprintf("/%s/%s", serviceName, method),
enc: enc,
dec: dec,
// We are using reflect.Indirect here to allow both reply structs and
// pointers to these reply structs. New consumers of the client should
// use structs directly, while existing consumers will not break if they
// remain to use pointers to structs.
grpcReply: reflect.TypeOf(
reflect.Indirect(
reflect.ValueOf(grpcReply),
).Interface(),
),
before: []ClientRequestFunc{},
after: []ClientResponseFunc{},
}
for _, option := range options {
option(c)
}
return c
}
// ClientOption sets an optional parameter for clients.
type ClientOption func(*Client)
// ClientBefore sets the RequestFuncs that are applied to the outgoing gRPC
// request before it's invoked.
func ClientBefore(before ...ClientRequestFunc) ClientOption {
return func(c *Client) { c.before = append(c.before, before...) }
}
// ClientAfter sets the ClientResponseFuncs that are applied to the incoming
// gRPC response prior to it being decoded. This is useful for obtaining
// response metadata and adding onto the context prior to decoding.
func ClientAfter(after ...ClientResponseFunc) ClientOption {
return func(c *Client) { c.after = append(c.after, after...) }
}
// ClientFinalizer is executed at the end of every gRPC request.
// By default, no finalizer is registered.
func ClientFinalizer(f ...ClientFinalizerFunc) ClientOption {
return func(s *Client) { s.finalizer = append(s.finalizer, f...) }
}
// Endpoint returns a usable endpoint that will invoke the gRPC specified by the
// client.
func (c Client) Endpoint() endpoint.Endpoint {
return func(ctx context.Context, request interface{}) (response interface{}, err error) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
if c.finalizer != nil {
defer func() {
for _, f := range c.finalizer {
f(ctx, err)
}
}()
}
ctx = context.WithValue(ctx, ContextKeyRequestMethod, c.method)
req, err := c.enc(ctx, request)
if err != nil {
return nil, err
}
md := &metadata.MD{}
for _, f := range c.before {
ctx = f(ctx, md)
}
ctx = metadata.NewOutgoingContext(ctx, *md)
var header, trailer metadata.MD
grpcReply := reflect.New(c.grpcReply).Interface()
if err = c.client.Invoke(
ctx, c.method, req, grpcReply, grpc.Header(&header),
grpc.Trailer(&trailer),
); err != nil {
return nil, err
}
for _, f := range c.after {
ctx = f(ctx, header, trailer)
}
response, err = c.dec(ctx, grpcReply)
if err != nil {
return nil, err
}
return response, nil
}
}
// ClientFinalizerFunc can be used to perform work at the end of a client gRPC
// request, after the response is returned. The principal
// intended use is for error logging. Additional response parameters are
// provided in the context under keys with the ContextKeyResponse prefix.
// Note: err may be nil. There maybe also no additional response parameters depending on
// when an error occurs.
type ClientFinalizerFunc func(ctx context.Context, err error)
+2
View File
@@ -0,0 +1,2 @@
// Package grpc provides a gRPC binding for endpoints.
package grpc
+29
View File
@@ -0,0 +1,29 @@
package grpc
import (
"context"
)
// DecodeRequestFunc extracts a user-domain request object from a gRPC request.
// It's designed to be used in gRPC servers, for server-side endpoints. One
// straightforward DecodeRequestFunc could be something that decodes from the
// gRPC request message to the concrete request type.
type DecodeRequestFunc func(context.Context, interface{}) (request interface{}, err error)
// EncodeRequestFunc encodes the passed request object into the gRPC request
// object. It's designed to be used in gRPC clients, for client-side endpoints.
// One straightforward EncodeRequestFunc could something that encodes the object
// directly to the gRPC request message.
type EncodeRequestFunc func(context.Context, interface{}) (request interface{}, err error)
// EncodeResponseFunc encodes the passed response object to the gRPC response
// message. It's designed to be used in gRPC servers, for server-side endpoints.
// One straightforward EncodeResponseFunc could be something that encodes the
// object directly to the gRPC response message.
type EncodeResponseFunc func(context.Context, interface{}) (response interface{}, err error)
// DecodeResponseFunc extracts a user-domain response object from a gRPC
// response object. It's designed to be used in gRPC clients, for client-side
// endpoints. One straightforward DecodeResponseFunc could be something that
// decodes from the gRPC response message to the concrete response type.
type DecodeResponseFunc func(context.Context, interface{}) (response interface{}, err error)
+81
View File
@@ -0,0 +1,81 @@
package grpc
import (
"context"
"encoding/base64"
"strings"
"google.golang.org/grpc/metadata"
)
const (
binHdrSuffix = "-bin"
)
// ClientRequestFunc may take information from context and use it to construct
// metadata headers to be transported to the server. ClientRequestFuncs are
// executed after creating the request but prior to sending the gRPC request to
// the server.
type ClientRequestFunc func(context.Context, *metadata.MD) context.Context
// ServerRequestFunc may take information from the received metadata header and
// use it to place items in the request scoped context. ServerRequestFuncs are
// executed prior to invoking the endpoint.
type ServerRequestFunc func(context.Context, metadata.MD) context.Context
// ServerResponseFunc may take information from a request context and use it to
// manipulate the gRPC response metadata headers and trailers. ResponseFuncs are
// only executed in servers, after invoking the endpoint but prior to writing a
// response.
type ServerResponseFunc func(ctx context.Context, header *metadata.MD, trailer *metadata.MD) context.Context
// ClientResponseFunc may take information from a gRPC metadata header and/or
// trailer and make the responses available for consumption. ClientResponseFuncs
// are only executed in clients, after a request has been made, but prior to it
// being decoded.
type ClientResponseFunc func(ctx context.Context, header metadata.MD, trailer metadata.MD) context.Context
// SetRequestHeader returns a ClientRequestFunc that sets the specified metadata
// key-value pair.
func SetRequestHeader(key, val string) ClientRequestFunc {
return func(ctx context.Context, md *metadata.MD) context.Context {
key, val := EncodeKeyValue(key, val)
(*md)[key] = append((*md)[key], val)
return ctx
}
}
// SetResponseHeader returns a ResponseFunc that sets the specified metadata
// key-value pair.
func SetResponseHeader(key, val string) ServerResponseFunc {
return func(ctx context.Context, md *metadata.MD, _ *metadata.MD) context.Context {
key, val := EncodeKeyValue(key, val)
(*md)[key] = append((*md)[key], val)
return ctx
}
}
// SetResponseTrailer returns a ResponseFunc that sets the specified metadata
// key-value pair.
func SetResponseTrailer(key, val string) ServerResponseFunc {
return func(ctx context.Context, _ *metadata.MD, md *metadata.MD) context.Context {
key, val := EncodeKeyValue(key, val)
(*md)[key] = append((*md)[key], val)
return ctx
}
}
// EncodeKeyValue sanitizes a key-value pair for use in gRPC metadata headers.
func EncodeKeyValue(key, val string) (string, string) {
key = strings.ToLower(key)
if strings.HasSuffix(key, binHdrSuffix) {
val = base64.StdEncoding.EncodeToString([]byte(val))
}
return key, val
}
type contextKey int
const (
ContextKeyRequestMethod contextKey = iota
)
+168
View File
@@ -0,0 +1,168 @@
package grpc
import (
"context"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"
"github.com/go-kit/kit/endpoint"
"github.com/go-kit/kit/transport"
"github.com/go-kit/log"
)
// Handler which should be called from the gRPC binding of the service
// implementation. The incoming request parameter, and returned response
// parameter, are both gRPC types, not user-domain.
type Handler interface {
ServeGRPC(ctx context.Context, request interface{}) (context.Context, interface{}, error)
}
// Server wraps an endpoint and implements grpc.Handler.
type Server struct {
e endpoint.Endpoint
dec DecodeRequestFunc
enc EncodeResponseFunc
before []ServerRequestFunc
after []ServerResponseFunc
finalizer []ServerFinalizerFunc
errorHandler transport.ErrorHandler
}
// NewServer constructs a new server, which implements wraps the provided
// endpoint and implements the Handler interface. Consumers should write
// bindings that adapt the concrete gRPC methods from their compiled protobuf
// definitions to individual handlers. Request and response objects are from the
// caller business domain, not gRPC request and reply types.
func NewServer(
e endpoint.Endpoint,
dec DecodeRequestFunc,
enc EncodeResponseFunc,
options ...ServerOption,
) *Server {
s := &Server{
e: e,
dec: dec,
enc: enc,
errorHandler: transport.NewLogErrorHandler(log.NewNopLogger()),
}
for _, option := range options {
option(s)
}
return s
}
// ServerOption sets an optional parameter for servers.
type ServerOption func(*Server)
// ServerBefore functions are executed on the gRPC request object before the
// request is decoded.
func ServerBefore(before ...ServerRequestFunc) ServerOption {
return func(s *Server) { s.before = append(s.before, before...) }
}
// ServerAfter functions are executed on the gRPC response writer after the
// endpoint is invoked, but before anything is written to the client.
func ServerAfter(after ...ServerResponseFunc) ServerOption {
return func(s *Server) { s.after = append(s.after, after...) }
}
// ServerErrorLogger is used to log non-terminal errors. By default, no errors
// are logged.
// Deprecated: Use ServerErrorHandler instead.
func ServerErrorLogger(logger log.Logger) ServerOption {
return func(s *Server) { s.errorHandler = transport.NewLogErrorHandler(logger) }
}
// ServerErrorHandler is used to handle non-terminal errors. By default, non-terminal errors
// are ignored.
func ServerErrorHandler(errorHandler transport.ErrorHandler) ServerOption {
return func(s *Server) { s.errorHandler = errorHandler }
}
// ServerFinalizer is executed at the end of every gRPC request.
// By default, no finalizer is registered.
func ServerFinalizer(f ...ServerFinalizerFunc) ServerOption {
return func(s *Server) { s.finalizer = append(s.finalizer, f...) }
}
// ServeGRPC implements the Handler interface.
func (s Server) ServeGRPC(ctx context.Context, req interface{}) (retctx context.Context, resp interface{}, err error) {
// Retrieve gRPC metadata.
md, ok := metadata.FromIncomingContext(ctx)
if !ok {
md = metadata.MD{}
}
if len(s.finalizer) > 0 {
defer func() {
for _, f := range s.finalizer {
f(ctx, err)
}
}()
}
for _, f := range s.before {
ctx = f(ctx, md)
}
var (
request interface{}
response interface{}
grpcResp interface{}
)
request, err = s.dec(ctx, req)
if err != nil {
s.errorHandler.Handle(ctx, err)
return ctx, nil, err
}
response, err = s.e(ctx, request)
if err != nil {
s.errorHandler.Handle(ctx, err)
return ctx, nil, err
}
var mdHeader, mdTrailer metadata.MD
for _, f := range s.after {
ctx = f(ctx, &mdHeader, &mdTrailer)
}
grpcResp, err = s.enc(ctx, response)
if err != nil {
s.errorHandler.Handle(ctx, err)
return ctx, nil, err
}
if len(mdHeader) > 0 {
if err = grpc.SendHeader(ctx, mdHeader); err != nil {
s.errorHandler.Handle(ctx, err)
return ctx, nil, err
}
}
if len(mdTrailer) > 0 {
if err = grpc.SetTrailer(ctx, mdTrailer); err != nil {
s.errorHandler.Handle(ctx, err)
return ctx, nil, err
}
}
return ctx, grpcResp, nil
}
// ServerFinalizerFunc can be used to perform work at the end of an gRPC
// request, after the response has been written to the client.
type ServerFinalizerFunc func(ctx context.Context, err error)
// Interceptor is a grpc UnaryInterceptor that injects the method name into
// context so it can be consumed by Go kit gRPC middlewares. The Interceptor
// typically is added at creation time of the grpc-go server.
// Like this: `grpc.NewServer(grpc.UnaryInterceptor(kitgrpc.Interceptor))`
func Interceptor(
ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler,
) (resp interface{}, err error) {
ctx = context.WithValue(ctx, ContextKeyRequestMethod, info.FullMethod)
return handler(ctx, req)
}
+1
View File
@@ -23,6 +23,7 @@ github.com/go-kit/kit/sd
github.com/go-kit/kit/sd/lb
github.com/go-kit/kit/tracing/opentracing
github.com/go-kit/kit/transport
github.com/go-kit/kit/transport/grpc
github.com/go-kit/kit/transport/http
# github.com/go-kit/log v0.2.0
## explicit; go 1.17