mirror of
https://github.com/ultravioletrs/cocos.git
synced 2026-06-23 04:10:25 +00:00
NOISSUE - Connect to agent when vm is created (#33)
* Add gRPC TLS config and update protoc versions Enhanced manager service with TLS configuration options and timeout settings for gRPC communication with agent services. Updated corresponding protobuf definitions to include new fields for CA certificates, TLS status, and timeouts. The added TLS support ensures secure inter-service communication while flexible timeout configurations improve the robustness of network interactions. The following adjustments and additions were made: - Included new gRPC configuration fields (ca_certs, client_tls, timeout) for agent communication in protobuf definitions. - Injected agent gRPC configuration into service endpoints, ensuring secure TLS setup and compliance with provided settings. - Revised main service function signatures to accept the gRPC configuration object. - Incremented port forwarding counters post-computation to avoid port conflicts. - Conducted compatibility update of protobuf version comments to reflect minor version bump (v4.25.0 to v4.25.1). This change impacts service deployment that requires proper configuration of TLS credentials and mindful determination of timeouts for efficient network use. Signed-off-by: SammyOina <sammyoina@gmail.com> * update proto Signed-off-by: SammyOina <sammyoina@gmail.com> * Set default timeout for agent configuration Introduced a default timeout of 60 seconds in both gRPC and HTTP endpoints for agent configuration when none is specified. This change ensures that operations do not hang indefinitely and provides a reasonable default for client interactions. Additionally, the instantiation of a new agent client is now outside the retry loop to avoid repeated setup on transient failures. Refactors service logic to optimize client connection handling by moving the agent client setup to occur before attempting retries, which should reduce overhead and improve clarity in error situations. Signed-off-by: SammyOina <sammyoina@gmail.com> * Subject: Move agent client creation into retry loop Body: Refactored the service manager's Run method to initialize the agent gRPC client inside the exponential backoff retry loop. This change addresses intermittent connection issues by reattempting client creation on temporary network failures, ensuring a robust setup before calling the Run method on the client. Signed-off-by: SammyOina <sammyoina@gmail.com> --------- Signed-off-by: SammyOina <sammyoina@gmail.com>
This commit is contained in:
committed by
GitHub
parent
34c5472485
commit
a3c5c765b8
@@ -33,7 +33,7 @@ jobs:
|
||||
|
||||
- name: Set up protoc
|
||||
run: |
|
||||
PROTOC_VERSION=25.0
|
||||
PROTOC_VERSION=25.1
|
||||
PROTOC_GEN_VERSION=v1.31.0
|
||||
PROTOC_GRPC_VERSION=v1.3.0
|
||||
|
||||
|
||||
+1
-1
@@ -4,7 +4,7 @@
|
||||
// Code generated by protoc-gen-go. DO NOT EDIT.
|
||||
// versions:
|
||||
// protoc-gen-go v1.31.0
|
||||
// protoc v4.25.0
|
||||
// protoc v4.25.1
|
||||
// source: agent/agent.proto
|
||||
|
||||
package agent
|
||||
|
||||
@@ -4,7 +4,7 @@
|
||||
// Code generated by protoc-gen-go-grpc. DO NOT EDIT.
|
||||
// versions:
|
||||
// - protoc-gen-go-grpc v1.3.0
|
||||
// - protoc v4.25.0
|
||||
// - protoc v4.25.1
|
||||
// source: agent/agent.proto
|
||||
|
||||
package agent
|
||||
|
||||
@@ -132,6 +132,15 @@ components:
|
||||
type: array
|
||||
items:
|
||||
type: integer
|
||||
ca_certs:
|
||||
description: agent grpc ca_certs
|
||||
type: string
|
||||
client_tls:
|
||||
description: agent grpc client_tls enabled
|
||||
type: boolean
|
||||
timeout:
|
||||
description: agent grpc timeout
|
||||
type: string
|
||||
|
||||
responses:
|
||||
ServiceError:
|
||||
|
||||
+3
-18
@@ -13,7 +13,6 @@ import (
|
||||
mglog "github.com/absmach/magistrala/logger"
|
||||
"github.com/absmach/magistrala/pkg/messaging/brokers"
|
||||
"github.com/absmach/magistrala/pkg/uuid"
|
||||
"github.com/ultravioletrs/cocos/agent"
|
||||
"github.com/ultravioletrs/cocos/internal"
|
||||
"github.com/ultravioletrs/cocos/internal/env"
|
||||
jaegerclient "github.com/ultravioletrs/cocos/internal/jaeger"
|
||||
@@ -26,8 +25,6 @@ import (
|
||||
httpapi "github.com/ultravioletrs/cocos/manager/api/http"
|
||||
"github.com/ultravioletrs/cocos/manager/qemu"
|
||||
"github.com/ultravioletrs/cocos/manager/tracing"
|
||||
pkggrpc "github.com/ultravioletrs/cocos/pkg/clients/grpc"
|
||||
agentgrpc "github.com/ultravioletrs/cocos/pkg/clients/grpc/agent"
|
||||
"go.opentelemetry.io/otel/trace"
|
||||
"golang.org/x/sync/errgroup"
|
||||
"google.golang.org/grpc"
|
||||
@@ -83,18 +80,6 @@ func main() {
|
||||
}()
|
||||
tracer := tp.Tracer(svcName)
|
||||
|
||||
agentGRPCConfig := pkggrpc.Config{}
|
||||
if err := env.Parse(&agentGRPCConfig, env.Options{Prefix: envPrefixAgentGRPC}); err != nil {
|
||||
logger.Fatal(fmt.Sprintf("failed to load %s gRPC client configuration: %s", svcName, err))
|
||||
}
|
||||
agentGRPCClient, agentClient, err := agentgrpc.NewAgentClient(agentGRPCConfig)
|
||||
if err != nil {
|
||||
logger.Fatal(err.Error())
|
||||
}
|
||||
defer agentGRPCClient.Close()
|
||||
|
||||
logger.Info(fmt.Sprintf("Successfully connected to agent grpc server at %s %s", agentGRPCConfig.URL, agentGRPCClient.Secure()))
|
||||
|
||||
qemuCfg := qemu.Config{}
|
||||
if err := env.Parse(&qemuCfg, env.Options{Prefix: envPrefixQemu}); err != nil {
|
||||
logger.Fatal(fmt.Sprintf("failed to load QEMU configuration: %s", err))
|
||||
@@ -113,7 +98,7 @@ func main() {
|
||||
logger.Fatal(err.Error())
|
||||
}
|
||||
|
||||
svc := newService(agentClient, logger, tracer, qemuCfg)
|
||||
svc := newService(logger, tracer, qemuCfg)
|
||||
|
||||
httpServerConfig := server.Config{Port: defSvcHTTPPort}
|
||||
if err := env.Parse(&httpServerConfig, env.Options{Prefix: envPrefixHTTP}); err != nil {
|
||||
@@ -154,8 +139,8 @@ func main() {
|
||||
}
|
||||
}
|
||||
|
||||
func newService(agentClient agent.AgentServiceClient, logger mglog.Logger, tracer trace.Tracer, qemuCfg qemu.Config) manager.Service {
|
||||
svc := manager.New(agentClient, qemuCfg)
|
||||
func newService(logger mglog.Logger, tracer trace.Tracer, qemuCfg qemu.Config) manager.Service {
|
||||
svc := manager.New(qemuCfg)
|
||||
|
||||
svc = api.LoggingMiddleware(svc, logger)
|
||||
counter, latency := internal.MakeMetrics(svcName, "api")
|
||||
|
||||
@@ -17,10 +17,6 @@ The service is configured using the environment variables from the following tab
|
||||
| MANAGER_GRPC_PORT | Manager service gRPC port | 7001 |
|
||||
| MANAGER_GRPC_SERVER_CERT | Path to server certificate in pem format | |
|
||||
| MANAGER_GRPC_SERVER_KEY | Path to server key in pem format | |
|
||||
| AGENT_GRPC_URL | Agent service gRPC URL | localhost:7002 |
|
||||
| AGENT_GRPC_TIMEOUT | Agent service gRPC timeout | 1s |
|
||||
| AGENT_GRPC_CA_CERTS | Agent service gRPC CA certificates | |
|
||||
| AGENT_GRPC_CLIENT_TLS | Agent service gRPC client TLS | false |
|
||||
| COCOS_JAEGER_URL | Jaeger server URL | http://localhost:14268/api/traces |
|
||||
| MANAGER_INSTANCE_ID | Manager service instance ID | |
|
||||
| COCOS_MESSAGE_BROKER_URL | Mesage broker url | nats://localhost:4222 |
|
||||
|
||||
@@ -44,6 +44,9 @@ func encodeRunRequest(_ context.Context, request interface{}) (interface{}, erro
|
||||
}
|
||||
return &manager.RunRequest{
|
||||
Computation: req.Computation,
|
||||
CaCerts: req.CACerts,
|
||||
ClientTls: req.ClientTLS,
|
||||
Timeout: req.Timeout.String(),
|
||||
}, nil
|
||||
}
|
||||
|
||||
@@ -63,8 +66,16 @@ func (client grpcClient) Run(ctx context.Context, req *manager.RunRequest, _ ...
|
||||
ctx, cancel := context.WithTimeout(ctx, client.timeout)
|
||||
defer cancel()
|
||||
|
||||
dur, err := time.ParseDuration(req.GetTimeout())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
runReq := runReq{
|
||||
Computation: req.GetComputation(),
|
||||
ClientTLS: req.GetClientTls(),
|
||||
CACerts: req.GetCaCerts(),
|
||||
Timeout: dur,
|
||||
}
|
||||
|
||||
res, err := client.run(ctx, runReq)
|
||||
|
||||
@@ -4,9 +4,11 @@ package grpc
|
||||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
"github.com/go-kit/kit/endpoint"
|
||||
"github.com/ultravioletrs/cocos/manager"
|
||||
"github.com/ultravioletrs/cocos/pkg/clients/grpc"
|
||||
)
|
||||
|
||||
func runEndpoint(svc manager.Service) endpoint.Endpoint {
|
||||
@@ -17,7 +19,15 @@ func runEndpoint(svc manager.Service) endpoint.Endpoint {
|
||||
return runRes{}, err
|
||||
}
|
||||
|
||||
id, err := svc.Run(ctx, req.Computation)
|
||||
agentConf := grpc.Config{
|
||||
ClientTLS: req.ClientTLS,
|
||||
CACerts: req.CACerts,
|
||||
Timeout: req.Timeout,
|
||||
}
|
||||
if agentConf.Timeout == 0 {
|
||||
agentConf.Timeout = 60 * time.Second
|
||||
}
|
||||
id, err := svc.Run(ctx, req.Computation, agentConf)
|
||||
if err != nil {
|
||||
return runRes{}, err
|
||||
}
|
||||
|
||||
@@ -2,10 +2,17 @@
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
package grpc
|
||||
|
||||
import "github.com/ultravioletrs/cocos/manager"
|
||||
import (
|
||||
"time"
|
||||
|
||||
"github.com/ultravioletrs/cocos/manager"
|
||||
)
|
||||
|
||||
type runReq struct {
|
||||
Computation []byte `json:"computation,omitempty"`
|
||||
Computation []byte `json:"computation,omitempty"`
|
||||
ClientTLS bool `json:"client_tls,omitempty"`
|
||||
CACerts string `json:"ca_certs,omitempty"`
|
||||
Timeout time.Duration `json:"timeout,omitempty"`
|
||||
}
|
||||
|
||||
func (req runReq) validate() error {
|
||||
|
||||
@@ -4,6 +4,7 @@ package grpc
|
||||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
"github.com/go-kit/kit/transport/grpc"
|
||||
"github.com/ultravioletrs/cocos/manager"
|
||||
@@ -27,8 +28,15 @@ func NewServer(svc manager.Service) manager.ManagerServiceServer {
|
||||
|
||||
func decodeRunRequest(_ context.Context, grpcReq interface{}) (interface{}, error) {
|
||||
req := grpcReq.(*manager.RunRequest)
|
||||
dur, err := time.ParseDuration(req.GetTimeout())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return runReq{
|
||||
Computation: req.GetComputation(),
|
||||
ClientTLS: req.ClientTls,
|
||||
CACerts: req.CaCerts,
|
||||
Timeout: dur,
|
||||
}, nil
|
||||
}
|
||||
|
||||
|
||||
@@ -4,9 +4,11 @@ package http
|
||||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
"github.com/go-kit/kit/endpoint"
|
||||
"github.com/ultravioletrs/cocos/manager"
|
||||
"github.com/ultravioletrs/cocos/pkg/clients/grpc"
|
||||
)
|
||||
|
||||
func runEndpoint(svc manager.Service) endpoint.Endpoint {
|
||||
@@ -17,8 +19,16 @@ func runEndpoint(svc manager.Service) endpoint.Endpoint {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
agentConf := grpc.Config{
|
||||
ClientTLS: req.ClientTLS,
|
||||
CACerts: req.CACerts,
|
||||
Timeout: req.Timeout.Duration,
|
||||
}
|
||||
if agentConf.Timeout == 0 {
|
||||
agentConf.Timeout = 60 * time.Second
|
||||
}
|
||||
// Call the Run method on the service
|
||||
runID, err := svc.Run(ctx, req.Computation)
|
||||
runID, err := svc.Run(ctx, req.Computation, agentConf)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
@@ -2,7 +2,10 @@
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
package http
|
||||
|
||||
import "github.com/ultravioletrs/cocos/manager"
|
||||
import (
|
||||
"github.com/ultravioletrs/cocos/agent"
|
||||
"github.com/ultravioletrs/cocos/manager"
|
||||
)
|
||||
|
||||
var _ apiReq = (*runReq)(nil)
|
||||
|
||||
@@ -11,7 +14,10 @@ type apiReq interface {
|
||||
}
|
||||
|
||||
type runReq struct {
|
||||
Computation []byte `json:"computation,omitempty"`
|
||||
Computation []byte `json:"computation,omitempty"`
|
||||
ClientTLS bool `json:"client_tls,omitempty"`
|
||||
CACerts string `json:"ca_certs,omitempty"`
|
||||
Timeout agent.Duration `json:"timeout,omitempty"`
|
||||
}
|
||||
|
||||
func (req runReq) validate() error {
|
||||
|
||||
@@ -13,6 +13,7 @@ import (
|
||||
|
||||
mglog "github.com/absmach/magistrala/logger"
|
||||
"github.com/ultravioletrs/cocos/manager"
|
||||
"github.com/ultravioletrs/cocos/pkg/clients/grpc"
|
||||
)
|
||||
|
||||
var _ manager.Service = (*loggingMiddleware)(nil)
|
||||
@@ -27,7 +28,7 @@ func LoggingMiddleware(svc manager.Service, logger mglog.Logger) manager.Service
|
||||
return &loggingMiddleware{logger, svc}
|
||||
}
|
||||
|
||||
func (lm *loggingMiddleware) Run(ctx context.Context, computation []byte) (id string, err error) {
|
||||
func (lm *loggingMiddleware) Run(ctx context.Context, computation []byte, agentConfig grpc.Config) (id string, err error) {
|
||||
defer func(begin time.Time) {
|
||||
message := fmt.Sprintf("Method Run for computation took %s to complete", time.Since(begin))
|
||||
if err != nil {
|
||||
@@ -37,5 +38,5 @@ func (lm *loggingMiddleware) Run(ctx context.Context, computation []byte) (id st
|
||||
lm.logger.Info(fmt.Sprintf("%s with ID: %s", message, id))
|
||||
}(time.Now())
|
||||
|
||||
return lm.svc.Run(ctx, computation)
|
||||
return lm.svc.Run(ctx, computation, agentConfig)
|
||||
}
|
||||
|
||||
@@ -12,6 +12,7 @@ import (
|
||||
|
||||
"github.com/go-kit/kit/metrics"
|
||||
"github.com/ultravioletrs/cocos/manager"
|
||||
"github.com/ultravioletrs/cocos/pkg/clients/grpc"
|
||||
)
|
||||
|
||||
var _ manager.Service = (*metricsMiddleware)(nil)
|
||||
@@ -32,11 +33,11 @@ func MetricsMiddleware(svc manager.Service, counter metrics.Counter, latency met
|
||||
}
|
||||
}
|
||||
|
||||
func (ms *metricsMiddleware) Run(ctx context.Context, computation []byte) (string, error) {
|
||||
func (ms *metricsMiddleware) Run(ctx context.Context, computation []byte, agentConfig grpc.Config) (string, error) {
|
||||
defer func(begin time.Time) {
|
||||
ms.counter.With("method", "Run").Add(1)
|
||||
ms.latency.With("method", "Run").Observe(time.Since(begin).Seconds())
|
||||
}(time.Now())
|
||||
|
||||
return ms.svc.Run(ctx, computation)
|
||||
return ms.svc.Run(ctx, computation, agentConfig)
|
||||
}
|
||||
|
||||
+41
-12
@@ -4,7 +4,7 @@
|
||||
// Code generated by protoc-gen-go. DO NOT EDIT.
|
||||
// versions:
|
||||
// protoc-gen-go v1.31.0
|
||||
// protoc v4.25.0
|
||||
// protoc v4.25.1
|
||||
// source: manager/manager.proto
|
||||
|
||||
package manager
|
||||
@@ -29,6 +29,9 @@ type RunRequest struct {
|
||||
unknownFields protoimpl.UnknownFields
|
||||
|
||||
Computation []byte `protobuf:"bytes,1,opt,name=computation,proto3" json:"computation,omitempty"`
|
||||
CaCerts string `protobuf:"bytes,2,opt,name=ca_certs,json=caCerts,proto3" json:"ca_certs,omitempty"`
|
||||
ClientTls bool `protobuf:"varint,3,opt,name=client_tls,json=clientTls,proto3" json:"client_tls,omitempty"`
|
||||
Timeout string `protobuf:"bytes,4,opt,name=timeout,proto3" json:"timeout,omitempty"`
|
||||
}
|
||||
|
||||
func (x *RunRequest) Reset() {
|
||||
@@ -70,6 +73,27 @@ func (x *RunRequest) GetComputation() []byte {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (x *RunRequest) GetCaCerts() string {
|
||||
if x != nil {
|
||||
return x.CaCerts
|
||||
}
|
||||
return ""
|
||||
}
|
||||
|
||||
func (x *RunRequest) GetClientTls() bool {
|
||||
if x != nil {
|
||||
return x.ClientTls
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
func (x *RunRequest) GetTimeout() string {
|
||||
if x != nil {
|
||||
return x.Timeout
|
||||
}
|
||||
return ""
|
||||
}
|
||||
|
||||
type RunResponse struct {
|
||||
state protoimpl.MessageState
|
||||
sizeCache protoimpl.SizeCache
|
||||
@@ -122,17 +146,22 @@ var File_manager_manager_proto protoreflect.FileDescriptor
|
||||
var file_manager_manager_proto_rawDesc = []byte{
|
||||
0x0a, 0x15, 0x6d, 0x61, 0x6e, 0x61, 0x67, 0x65, 0x72, 0x2f, 0x6d, 0x61, 0x6e, 0x61, 0x67, 0x65,
|
||||
0x72, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x07, 0x6d, 0x61, 0x6e, 0x61, 0x67, 0x65, 0x72,
|
||||
0x22, 0x2e, 0x0a, 0x0a, 0x52, 0x75, 0x6e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x20,
|
||||
0x0a, 0x0b, 0x63, 0x6f, 0x6d, 0x70, 0x75, 0x74, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x18, 0x01, 0x20,
|
||||
0x01, 0x28, 0x0c, 0x52, 0x0b, 0x63, 0x6f, 0x6d, 0x70, 0x75, 0x74, 0x61, 0x74, 0x69, 0x6f, 0x6e,
|
||||
0x22, 0x1d, 0x0a, 0x0b, 0x52, 0x75, 0x6e, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12,
|
||||
0x0e, 0x0a, 0x02, 0x49, 0x44, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x02, 0x49, 0x44, 0x32,
|
||||
0x44, 0x0a, 0x0e, 0x4d, 0x61, 0x6e, 0x61, 0x67, 0x65, 0x72, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63,
|
||||
0x65, 0x12, 0x32, 0x0a, 0x03, 0x52, 0x75, 0x6e, 0x12, 0x13, 0x2e, 0x6d, 0x61, 0x6e, 0x61, 0x67,
|
||||
0x65, 0x72, 0x2e, 0x52, 0x75, 0x6e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x14, 0x2e,
|
||||
0x6d, 0x61, 0x6e, 0x61, 0x67, 0x65, 0x72, 0x2e, 0x52, 0x75, 0x6e, 0x52, 0x65, 0x73, 0x70, 0x6f,
|
||||
0x6e, 0x73, 0x65, 0x22, 0x00, 0x42, 0x0b, 0x5a, 0x09, 0x2e, 0x2f, 0x6d, 0x61, 0x6e, 0x61, 0x67,
|
||||
0x65, 0x72, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
|
||||
0x22, 0x82, 0x01, 0x0a, 0x0a, 0x52, 0x75, 0x6e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12,
|
||||
0x20, 0x0a, 0x0b, 0x63, 0x6f, 0x6d, 0x70, 0x75, 0x74, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x18, 0x01,
|
||||
0x20, 0x01, 0x28, 0x0c, 0x52, 0x0b, 0x63, 0x6f, 0x6d, 0x70, 0x75, 0x74, 0x61, 0x74, 0x69, 0x6f,
|
||||
0x6e, 0x12, 0x19, 0x0a, 0x08, 0x63, 0x61, 0x5f, 0x63, 0x65, 0x72, 0x74, 0x73, 0x18, 0x02, 0x20,
|
||||
0x01, 0x28, 0x09, 0x52, 0x07, 0x63, 0x61, 0x43, 0x65, 0x72, 0x74, 0x73, 0x12, 0x1d, 0x0a, 0x0a,
|
||||
0x63, 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x5f, 0x74, 0x6c, 0x73, 0x18, 0x03, 0x20, 0x01, 0x28, 0x08,
|
||||
0x52, 0x09, 0x63, 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x54, 0x6c, 0x73, 0x12, 0x18, 0x0a, 0x07, 0x74,
|
||||
0x69, 0x6d, 0x65, 0x6f, 0x75, 0x74, 0x18, 0x04, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x74, 0x69,
|
||||
0x6d, 0x65, 0x6f, 0x75, 0x74, 0x22, 0x1d, 0x0a, 0x0b, 0x52, 0x75, 0x6e, 0x52, 0x65, 0x73, 0x70,
|
||||
0x6f, 0x6e, 0x73, 0x65, 0x12, 0x0e, 0x0a, 0x02, 0x49, 0x44, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09,
|
||||
0x52, 0x02, 0x49, 0x44, 0x32, 0x44, 0x0a, 0x0e, 0x4d, 0x61, 0x6e, 0x61, 0x67, 0x65, 0x72, 0x53,
|
||||
0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x12, 0x32, 0x0a, 0x03, 0x52, 0x75, 0x6e, 0x12, 0x13, 0x2e,
|
||||
0x6d, 0x61, 0x6e, 0x61, 0x67, 0x65, 0x72, 0x2e, 0x52, 0x75, 0x6e, 0x52, 0x65, 0x71, 0x75, 0x65,
|
||||
0x73, 0x74, 0x1a, 0x14, 0x2e, 0x6d, 0x61, 0x6e, 0x61, 0x67, 0x65, 0x72, 0x2e, 0x52, 0x75, 0x6e,
|
||||
0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x42, 0x0b, 0x5a, 0x09, 0x2e, 0x2f,
|
||||
0x6d, 0x61, 0x6e, 0x61, 0x67, 0x65, 0x72, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
|
||||
}
|
||||
|
||||
var (
|
||||
|
||||
@@ -11,6 +11,11 @@ service ManagerService {
|
||||
rpc Run(RunRequest) returns (RunResponse) {}
|
||||
}
|
||||
|
||||
message RunRequest { bytes computation = 1; }
|
||||
message RunRequest {
|
||||
bytes computation = 1;
|
||||
string ca_certs = 2;
|
||||
bool client_tls = 3;
|
||||
string timeout = 4;
|
||||
}
|
||||
|
||||
message RunResponse { string ID = 1; }
|
||||
|
||||
@@ -4,7 +4,7 @@
|
||||
// Code generated by protoc-gen-go-grpc. DO NOT EDIT.
|
||||
// versions:
|
||||
// - protoc-gen-go-grpc v1.3.0
|
||||
// - protoc v4.25.0
|
||||
// - protoc v4.25.1
|
||||
// source: manager/manager.proto
|
||||
|
||||
package manager
|
||||
|
||||
+19
-9
@@ -5,10 +5,13 @@ package manager
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
|
||||
"github.com/cenkalti/backoff/v4"
|
||||
"github.com/ultravioletrs/cocos/agent"
|
||||
"github.com/ultravioletrs/cocos/manager/qemu"
|
||||
"github.com/ultravioletrs/cocos/pkg/clients/grpc"
|
||||
agentgrpc "github.com/ultravioletrs/cocos/pkg/clients/grpc/agent"
|
||||
)
|
||||
|
||||
var (
|
||||
@@ -27,38 +30,45 @@ var (
|
||||
// Service specifies an API that must be fulfilled by the domain service
|
||||
// implementation, and all of its decorators (e.g. logging & metrics).
|
||||
type Service interface {
|
||||
Run(ctx context.Context, computation []byte) (string, error)
|
||||
Run(ctx context.Context, computation []byte, agentConfig grpc.Config) (string, error)
|
||||
}
|
||||
|
||||
type managerService struct {
|
||||
agent agent.AgentServiceClient
|
||||
qemuCfg qemu.Config
|
||||
}
|
||||
|
||||
var _ Service = (*managerService)(nil)
|
||||
|
||||
// New instantiates the manager service implementation.
|
||||
func New(agentClient agent.AgentServiceClient, qemuCfg qemu.Config) Service {
|
||||
func New(qemuCfg qemu.Config) Service {
|
||||
return &managerService{
|
||||
agent: agentClient,
|
||||
qemuCfg: qemuCfg,
|
||||
}
|
||||
}
|
||||
|
||||
func (ms *managerService) Run(ctx context.Context, computation []byte) (string, error) {
|
||||
func (ms *managerService) Run(ctx context.Context, computation []byte, agentConfig grpc.Config) (string, error) {
|
||||
_, err := qemu.CreateVM(ctx, ms.qemuCfg)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
// different VM guests can't forward ports to the same ports on the same host
|
||||
ms.qemuCfg.HostFwd1++
|
||||
ms.qemuCfg.NetDevConfig.HostFwd2++
|
||||
ms.qemuCfg.NetDevConfig.HostFwd3++
|
||||
defer func() {
|
||||
ms.qemuCfg.HostFwd1++
|
||||
ms.qemuCfg.NetDevConfig.HostFwd2++
|
||||
ms.qemuCfg.NetDevConfig.HostFwd3++
|
||||
}()
|
||||
|
||||
var res *agent.RunResponse
|
||||
|
||||
agentConfig.URL = fmt.Sprintf("localhost:%d", ms.qemuCfg.HostFwd3)
|
||||
|
||||
err = backoff.Retry(func() error {
|
||||
res, err = ms.agent.Run(ctx, &agent.RunRequest{Computation: computation})
|
||||
agentGRPCClient, agentClient, err := agentgrpc.NewAgentClient(agentConfig)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer agentGRPCClient.Close()
|
||||
res, err = agentClient.Run(ctx, &agent.RunRequest{Computation: computation})
|
||||
return err
|
||||
}, backoff.NewExponentialBackOff())
|
||||
|
||||
|
||||
@@ -6,6 +6,7 @@ import (
|
||||
"context"
|
||||
|
||||
"github.com/ultravioletrs/cocos/manager"
|
||||
"github.com/ultravioletrs/cocos/pkg/clients/grpc"
|
||||
"go.opentelemetry.io/otel/trace"
|
||||
)
|
||||
|
||||
@@ -21,9 +22,9 @@ func New(svc manager.Service, tracer trace.Tracer) manager.Service {
|
||||
return &tracingMiddleware{tracer, svc}
|
||||
}
|
||||
|
||||
func (tm *tracingMiddleware) Run(ctx context.Context, computation []byte) (string, error) {
|
||||
func (tm *tracingMiddleware) Run(ctx context.Context, computation []byte, agentConfig grpc.Config) (string, error) {
|
||||
ctx, span := tm.tracer.Start(ctx, "run")
|
||||
defer span.End()
|
||||
|
||||
return tm.svc.Run(ctx, computation)
|
||||
return tm.svc.Run(ctx, computation, agentConfig)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user