COCOS-157 - Provide abstractions for VM management (#171)

* abstract vm creation and allow stopping computation

Signed-off-by: SammyOina <sammyoina@gmail.com>

* Refactor QEMU configuration loading and execution in main.go

Signed-off-by: SammyOina <sammyoina@gmail.com>

* * feat(agent-config): add support for sending agent configuration to manager

Signed-off-by: SammyOina <sammyoina@gmail.com>

* * chore(checkproto.yaml): update protoc-gen and protoc-grpc versions

Signed-off-by: SammyOina <sammyoina@gmail.com>

* * chore(auth): update mockery version to v2.43.2
* chore(main.go): update import path for vm package in agent
* chore(main.go): update import path for vm package in manager
* chore(go.mod): add github.com/google/logger v1.1.1 as a required dependency
* chore(manager_test.go): update import path for vm package in manager
* chore(logging.go): move logging.go to manager/qemu/vm package
* chore(logging_test.go): move logging_test.go to manager/qemu/vm package
* chore(vm_factory.go): rename vm_factory.go to provider.go in manager/qemu/vm/mocks package
* chore(vm.go): move vm.go to manager/qemu/vm package
* chore(vm.go): update import path for vm package in manager
* chore(vm_test.go): move vm_test.go to manager/qemu/vm package
* chore(vsock.go): move vsock.go to manager

Signed-off-by: SammyOina <sammyoina@gmail.com>

* * fix(main.go): change import path for 'github.com/ultravioletrs/cocos/manager/qemu/vm' to 'github.com/ultravioletrs/cocos/manager/vm'
* fix(main.go): change vsock.Dial argument from 'vm.VsockConfigPort' to 'qemu.VsockConfigPort'
* fix(main.go): change import path for 'github.com/ultravioletrs/cocos/manager/qemu' to 'github.com/ultravioletrs/cocos/manager/qemu'

Signed-off-by: SammyOina <sammyoina@gmail.com>

---------

Signed-off-by: SammyOina <sammyoina@gmail.com>
This commit is contained in:
Sammy Kerata Oina
2024-07-10 16:33:54 +03:00
committed by GitHub
parent d1c9834d86
commit f4e3e8e09c
28 changed files with 1175 additions and 289 deletions
+2 -2
View File
@@ -34,8 +34,8 @@ jobs:
- name: Set up protoc
run: |
PROTOC_VERSION=25.3
PROTOC_GEN_VERSION=v1.33.0
PROTOC_GRPC_VERSION=v1.3.0
PROTOC_GEN_VERSION=v1.34.2
PROTOC_GRPC_VERSION=v1.4.0
# Download and install protoc
PROTOC_ZIP=protoc-$PROTOC_VERSION-linux-x86_64.zip
+10 -10
View File
@@ -3,7 +3,7 @@
// Code generated by protoc-gen-go. DO NOT EDIT.
// versions:
// protoc-gen-go v1.33.0
// protoc-gen-go v1.34.2
// protoc v4.25.3
// source: agent/agent.proto
@@ -426,7 +426,7 @@ func file_agent_agent_proto_rawDescGZIP() []byte {
}
var file_agent_agent_proto_msgTypes = make([]protoimpl.MessageInfo, 8)
var file_agent_agent_proto_goTypes = []interface{}{
var file_agent_agent_proto_goTypes = []any{
(*AlgoRequest)(nil), // 0: agent.AlgoRequest
(*AlgoResponse)(nil), // 1: agent.AlgoResponse
(*DataRequest)(nil), // 2: agent.DataRequest
@@ -458,7 +458,7 @@ func file_agent_agent_proto_init() {
return
}
if !protoimpl.UnsafeEnabled {
file_agent_agent_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} {
file_agent_agent_proto_msgTypes[0].Exporter = func(v any, i int) any {
switch v := v.(*AlgoRequest); i {
case 0:
return &v.state
@@ -470,7 +470,7 @@ func file_agent_agent_proto_init() {
return nil
}
}
file_agent_agent_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} {
file_agent_agent_proto_msgTypes[1].Exporter = func(v any, i int) any {
switch v := v.(*AlgoResponse); i {
case 0:
return &v.state
@@ -482,7 +482,7 @@ func file_agent_agent_proto_init() {
return nil
}
}
file_agent_agent_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} {
file_agent_agent_proto_msgTypes[2].Exporter = func(v any, i int) any {
switch v := v.(*DataRequest); i {
case 0:
return &v.state
@@ -494,7 +494,7 @@ func file_agent_agent_proto_init() {
return nil
}
}
file_agent_agent_proto_msgTypes[3].Exporter = func(v interface{}, i int) interface{} {
file_agent_agent_proto_msgTypes[3].Exporter = func(v any, i int) any {
switch v := v.(*DataResponse); i {
case 0:
return &v.state
@@ -506,7 +506,7 @@ func file_agent_agent_proto_init() {
return nil
}
}
file_agent_agent_proto_msgTypes[4].Exporter = func(v interface{}, i int) interface{} {
file_agent_agent_proto_msgTypes[4].Exporter = func(v any, i int) any {
switch v := v.(*ResultRequest); i {
case 0:
return &v.state
@@ -518,7 +518,7 @@ func file_agent_agent_proto_init() {
return nil
}
}
file_agent_agent_proto_msgTypes[5].Exporter = func(v interface{}, i int) interface{} {
file_agent_agent_proto_msgTypes[5].Exporter = func(v any, i int) any {
switch v := v.(*ResultResponse); i {
case 0:
return &v.state
@@ -530,7 +530,7 @@ func file_agent_agent_proto_init() {
return nil
}
}
file_agent_agent_proto_msgTypes[6].Exporter = func(v interface{}, i int) interface{} {
file_agent_agent_proto_msgTypes[6].Exporter = func(v any, i int) any {
switch v := v.(*AttestationRequest); i {
case 0:
return &v.state
@@ -542,7 +542,7 @@ func file_agent_agent_proto_init() {
return nil
}
}
file_agent_agent_proto_msgTypes[7].Exporter = func(v interface{}, i int) interface{} {
file_agent_agent_proto_msgTypes[7].Exporter = func(v any, i int) any {
switch v := v.(*AttestationResponse); i {
case 0:
return &v.state
+15 -11
View File
@@ -3,7 +3,7 @@
// Code generated by protoc-gen-go-grpc. DO NOT EDIT.
// versions:
// - protoc-gen-go-grpc v1.3.0
// - protoc-gen-go-grpc v1.4.0
// - protoc v4.25.3
// source: agent/agent.proto
@@ -18,8 +18,8 @@ import (
// This is a compile-time assertion to ensure that this generated file
// is compatible with the grpc package it is being compiled against.
// Requires gRPC-Go v1.32.0 or later.
const _ = grpc.SupportPackageIsVersion7
// Requires gRPC-Go v1.62.0 or later.
const _ = grpc.SupportPackageIsVersion8
const (
AgentService_Algo_FullMethodName = "/agent.AgentService/Algo"
@@ -47,11 +47,12 @@ func NewAgentServiceClient(cc grpc.ClientConnInterface) AgentServiceClient {
}
func (c *agentServiceClient) Algo(ctx context.Context, opts ...grpc.CallOption) (AgentService_AlgoClient, error) {
stream, err := c.cc.NewStream(ctx, &AgentService_ServiceDesc.Streams[0], AgentService_Algo_FullMethodName, opts...)
cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...)
stream, err := c.cc.NewStream(ctx, &AgentService_ServiceDesc.Streams[0], AgentService_Algo_FullMethodName, cOpts...)
if err != nil {
return nil, err
}
x := &agentServiceAlgoClient{stream}
x := &agentServiceAlgoClient{ClientStream: stream}
return x, nil
}
@@ -81,11 +82,12 @@ func (x *agentServiceAlgoClient) CloseAndRecv() (*AlgoResponse, error) {
}
func (c *agentServiceClient) Data(ctx context.Context, opts ...grpc.CallOption) (AgentService_DataClient, error) {
stream, err := c.cc.NewStream(ctx, &AgentService_ServiceDesc.Streams[1], AgentService_Data_FullMethodName, opts...)
cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...)
stream, err := c.cc.NewStream(ctx, &AgentService_ServiceDesc.Streams[1], AgentService_Data_FullMethodName, cOpts...)
if err != nil {
return nil, err
}
x := &agentServiceDataClient{stream}
x := &agentServiceDataClient{ClientStream: stream}
return x, nil
}
@@ -115,8 +117,9 @@ func (x *agentServiceDataClient) CloseAndRecv() (*DataResponse, error) {
}
func (c *agentServiceClient) Result(ctx context.Context, in *ResultRequest, opts ...grpc.CallOption) (*ResultResponse, error) {
cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...)
out := new(ResultResponse)
err := c.cc.Invoke(ctx, AgentService_Result_FullMethodName, in, out, opts...)
err := c.cc.Invoke(ctx, AgentService_Result_FullMethodName, in, out, cOpts...)
if err != nil {
return nil, err
}
@@ -124,8 +127,9 @@ func (c *agentServiceClient) Result(ctx context.Context, in *ResultRequest, opts
}
func (c *agentServiceClient) Attestation(ctx context.Context, in *AttestationRequest, opts ...grpc.CallOption) (*AttestationResponse, error) {
cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...)
out := new(AttestationResponse)
err := c.cc.Invoke(ctx, AgentService_Attestation_FullMethodName, in, out, opts...)
err := c.cc.Invoke(ctx, AgentService_Attestation_FullMethodName, in, out, cOpts...)
if err != nil {
return nil, err
}
@@ -173,7 +177,7 @@ func RegisterAgentServiceServer(s grpc.ServiceRegistrar, srv AgentServiceServer)
}
func _AgentService_Algo_Handler(srv interface{}, stream grpc.ServerStream) error {
return srv.(AgentServiceServer).Algo(&agentServiceAlgoServer{stream})
return srv.(AgentServiceServer).Algo(&agentServiceAlgoServer{ServerStream: stream})
}
type AgentService_AlgoServer interface {
@@ -199,7 +203,7 @@ func (x *agentServiceAlgoServer) Recv() (*AlgoRequest, error) {
}
func _AgentService_Data_Handler(srv interface{}, stream grpc.ServerStream) error {
return srv.(AgentServiceServer).Data(&agentServiceDataServer{stream})
return srv.(AgentServiceServer).Data(&agentServiceDataServer{ServerStream: stream})
}
type AgentService_DataServer interface {
+1 -1
View File
@@ -1,4 +1,4 @@
// Code generated by mockery v2.42.0. DO NOT EDIT.
// Code generated by mockery v2.43.2. DO NOT EDIT.
// Copyright (c) Ultraviolet
// SPDX-License-Identifier: Apache-2.0
+2 -1
View File
@@ -21,6 +21,7 @@ import (
"github.com/ultravioletrs/cocos/internal/server"
grpcserver "github.com/ultravioletrs/cocos/internal/server/grpc"
"github.com/ultravioletrs/cocos/manager"
"github.com/ultravioletrs/cocos/manager/qemu"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc"
"google.golang.org/grpc/reflection"
@@ -108,7 +109,7 @@ func newService(ctx context.Context, logger *slog.Logger, eventSvc events.Servic
}
func readConfig() (agent.Computation, error) {
l, err := vsock.Listen(manager.VsockConfigPort, nil)
l, err := vsock.Listen(qemu.VsockConfigPort, nil)
if err != nil {
return agent.Computation{}, err
}
+3 -7
View File
@@ -78,12 +78,8 @@ func main() {
logger.Error(fmt.Sprintf("failed to load QEMU configuration: %s", err))
return
}
exe, args, err := qemu.ExecutableAndArgs(qemuCfg)
if err != nil {
logger.Error(fmt.Sprintf("failed to parse QEMU configuration: %s", err))
return
}
logger.Info(fmt.Sprintf("%s %s", exe, strings.Join(args, " ")))
args := qemuCfg.ConstructQemuArgs()
logger.Info(strings.Join(args, " "))
managerGRPCConfig := grpc.Config{}
if err := env.Parse(&managerGRPCConfig, env.Options{Prefix: envPrefixGRPC}); err != nil {
@@ -124,7 +120,7 @@ func main() {
}
func newService(logger *slog.Logger, tracer trace.Tracer, qemuCfg qemu.Config, eventsChan chan *pkgmanager.ClientStreamMessage) manager.Service {
svc := manager.New(qemuCfg, logger, eventsChan)
svc := manager.New(qemuCfg, logger, eventsChan, qemu.NewVM)
go svc.RetrieveAgentEventsLogs()
svc = api.LoggingMiddleware(svc, logger)
counter, latency := internal.MakeMetrics(svcName, "api")
+6 -7
View File
@@ -1,6 +1,6 @@
module github.com/ultravioletrs/cocos
go 1.22.0
go 1.22.4
require (
github.com/absmach/magistrala v0.0.0-20240119191055-d95283d31472
@@ -10,6 +10,7 @@ require (
github.com/go-kit/kit v0.13.0
github.com/gofrs/uuid v4.4.0+incompatible
github.com/google/go-sev-guest v0.10.1
github.com/google/logger v1.1.1
github.com/mdlayher/vsock v1.2.1
github.com/prometheus/client_golang v1.18.0
github.com/spf13/cobra v1.8.0
@@ -23,12 +24,11 @@ require (
go.opentelemetry.io/otel/trace v1.21.0
golang.org/x/crypto v0.19.0
golang.org/x/sync v0.6.0
google.golang.org/grpc v1.60.1
google.golang.org/grpc v1.62.0
google.golang.org/protobuf v1.32.0
)
require (
cloud.google.com/go/compute v1.23.3 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
@@ -38,8 +38,7 @@ require (
github.com/go-logr/stdr v1.2.2 // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/google/go-configfs-tsm v0.2.2 // indirect
github.com/google/logger v1.1.1 // indirect
github.com/google/uuid v1.5.0 // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/grpc-ecosystem/grpc-gateway/v2 v2.19.0 // indirect
github.com/inconshreveable/mousetrap v1.1.0 // indirect
github.com/matttproud/golang_protobuf_extensions/v2 v2.0.0 // indirect
@@ -58,7 +57,7 @@ require (
golang.org/x/sys v0.21.0 // indirect
golang.org/x/term v0.21.0
golang.org/x/text v0.14.0 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20240108191215-35c7eff3a6b1 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240108191215-35c7eff3a6b1 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20240123012728-ef4313101c80 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240123012728-ef4313101c80 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
+16 -17
View File
@@ -1,4 +1,3 @@
cloud.google.com/go v0.110.8 h1:tyNdfIxjzaWctIiLYOTalaLKZ17SI44SKFW26QbOhME=
cloud.google.com/go/compute v1.23.3 h1:6sVlXXBmbd7jNX0Ipq0trII3e4n1/MsADLK6a+aiVlk=
cloud.google.com/go/compute v1.23.3/go.mod h1:VCgBUoMnIVIR0CscqQiPJLAG25E3ZRZMzcFZeQ+h8CI=
cloud.google.com/go/compute/metadata v0.2.3 h1:mg4jlk7mCAj6xXp9UJ4fjI9VUI5rubuGBW5aJ7UnBMY=
@@ -15,8 +14,8 @@ github.com/cenkalti/backoff/v4 v4.2.1 h1:y4OZtCnogmCPw98Zjyt5a6+QwPLGkiQsYW5oUqy
github.com/cenkalti/backoff/v4 v4.2.1/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE=
github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44=
github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/cncf/xds/go v0.0.0-20230607035331-e9ce68804cb4 h1:/inchEIKaYC1Akx+H+gqO04wryn5h75LSazbRlnya1k=
github.com/cncf/xds/go v0.0.0-20230607035331-e9ce68804cb4/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs=
github.com/cncf/xds/go v0.0.0-20231128003011-0fa0005c9caa h1:jQCWAUqqlij9Pgj2i/PB79y4KOPYVyFYdROxgaCwdTQ=
github.com/cncf/xds/go v0.0.0-20231128003011-0fa0005c9caa/go.mod h1:x/1Gn8zydmfq8dk6e9PdstVsDgu9RuyIIJqAaF//0IM=
github.com/cpuguy83/go-md2man/v2 v2.0.3/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
@@ -24,8 +23,8 @@ github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/digitalocean/go-libvirt v0.0.0-20221205150000-2939327a8519 h1:OpkN/n40cmKenDQS+IOAeW9DLhYy4DADSeZnouCEV/E=
github.com/digitalocean/go-libvirt v0.0.0-20221205150000-2939327a8519/go.mod h1:WyJJyfmJ0gWJvjV+ZH4DOgtOYZc1KOvYyBXWCLKxsUU=
github.com/envoyproxy/protoc-gen-validate v1.0.2 h1:QkIBuU5k+x7/QXPvPPnWXWlCdaBFApVqftFV6k087DA=
github.com/envoyproxy/protoc-gen-validate v1.0.2/go.mod h1:GpiZQP3dDbg4JouG/NNS7QWXpgx6x8QiMKdmN72jogE=
github.com/envoyproxy/protoc-gen-validate v1.0.4 h1:gVPz/FMfvh57HdSJQyvBtF00j8JU4zdyUgIUNhlgg0A=
github.com/envoyproxy/protoc-gen-validate v1.0.4/go.mod h1:qys6tmnRsYrQqIhm2bvKZH4Blx/1gTIZ2UKVY1M+Yew=
github.com/go-kit/kit v0.13.0 h1:OoneCcHKHQ03LfBpoQCUfCluwd2Vt3ohz+kvbJneZAU=
github.com/go-kit/kit v0.13.0/go.mod h1:phqEHMMUbyrCFCTgH48JueqrM3md2HcAZ8N3XE4FKDg=
github.com/go-kit/log v0.2.1 h1:MRVx0/zhvdseW+Gza6N9rVzU/IVzaeE1SFI4raAhmBU=
@@ -52,8 +51,8 @@ github.com/google/go-sev-guest v0.10.1/go.mod h1:/5hrgGWqG7+MPTXKhQz+v9ZE+Eh4MCB
github.com/google/logger v1.1.1 h1:+6Z2geNxc9G+4D4oDO9njjjn2d0wN5d7uOo0vOIW1NQ=
github.com/google/logger v1.1.1/go.mod h1:BkeJZ+1FhQ+/d087r4dzojEg1u2ZX+ZqG1jTUrLM+zQ=
github.com/google/uuid v1.0.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/google/uuid v1.5.0 h1:1p67kYwdtXjb0gL0BPiP1Av9wiZPo5A8z2cWkTZ+eyU=
github.com/google/uuid v1.5.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/grpc-ecosystem/grpc-gateway/v2 v2.19.0 h1:Wqo399gCIufwto+VfwCSvsnfGpF/w5E9CNxSwbpD6No=
github.com/grpc-ecosystem/grpc-gateway/v2 v2.19.0/go.mod h1:qmOFXW2epJhM0qSnUUYpldc7gVz2KMQwJ/QYCDIa7XU=
github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8=
@@ -130,8 +129,8 @@ golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v
golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c=
golang.org/x/net v0.20.0 h1:aCL9BSgETF1k+blQaYUBx9hJ9LOGP3gAVemcZlf1Kpo=
golang.org/x/net v0.20.0/go.mod h1:z8BVo6PvndSri0LbOE3hAn0apkU+1YvI6E70E9jsnvY=
golang.org/x/oauth2 v0.15.0 h1:s8pnnxNVzjWyrvYdFUQq5llS1PX2zhPXmccZv99h7uQ=
golang.org/x/oauth2 v0.15.0/go.mod h1:q48ptWNTY5XWf+JNten23lcvHpLJ0ZSxF5ttTHKVCAM=
golang.org/x/oauth2 v0.16.0 h1:aDkGMBSYxElaoP81NpoUoz2oo2R2wHdZpGToUxfyQrQ=
golang.org/x/oauth2 v0.16.0/go.mod h1:hqZ+0LWXsiVoZpeld6jVt06P3adbS2Uu911W1SsJv2o=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.6.0 h1:5BMeUDZ7vkXGfEr1x9B4bRcTH4lpkTkpdh0T/J+qjbQ=
@@ -162,14 +161,14 @@ golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8T
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/appengine v1.6.8 h1:IhEN5q69dyKagZPYMSdIjS2HqprW324FRQZJcGqPAsM=
google.golang.org/appengine v1.6.8/go.mod h1:1jJ3jBArFh5pcgW8gCtRJnepW8FzD1V44FJffLiz/Ds=
google.golang.org/genproto v0.0.0-20240102182953-50ed04b92917 h1:nz5NESFLZbJGPFxDT/HCn+V1mZ8JGNoY4nUpmW/Y2eg=
google.golang.org/genproto v0.0.0-20240102182953-50ed04b92917/go.mod h1:pZqR+glSb11aJ+JQcczCvgf47+duRuzNSKqE8YAQnV0=
google.golang.org/genproto/googleapis/api v0.0.0-20240108191215-35c7eff3a6b1 h1:OPXtXn7fNMaXwO3JvOmF1QyTc00jsSFFz1vXXBOdCDo=
google.golang.org/genproto/googleapis/api v0.0.0-20240108191215-35c7eff3a6b1/go.mod h1:B5xPO//w8qmBDjGReYLpR6UJPnkldGkCSMoH/2vxJeg=
google.golang.org/genproto/googleapis/rpc v0.0.0-20240108191215-35c7eff3a6b1 h1:gphdwh0npgs8elJ4T6J+DQJHPVF7RsuJHCfwztUb4J4=
google.golang.org/genproto/googleapis/rpc v0.0.0-20240108191215-35c7eff3a6b1/go.mod h1:daQN87bsDqDoe316QbbvX60nMoJQa4r6Ds0ZuoAe5yA=
google.golang.org/grpc v1.60.1 h1:26+wFr+cNqSGFcOXcabYC0lUVJVRa2Sb2ortSK7VrEU=
google.golang.org/grpc v1.60.1/go.mod h1:OlCHIeLYqSSsLi6i49B5QGdzaMZK9+M7LXN2FKz4eGM=
google.golang.org/genproto v0.0.0-20240123012728-ef4313101c80 h1:KAeGQVN3M9nD0/bQXnr/ClcEMJ968gUXJQ9pwfSynuQ=
google.golang.org/genproto v0.0.0-20240123012728-ef4313101c80/go.mod h1:cc8bqMqtv9gMOr0zHg2Vzff5ULhhL2IXP4sbcn32Dro=
google.golang.org/genproto/googleapis/api v0.0.0-20240123012728-ef4313101c80 h1:Lj5rbfG876hIAYFjqiJnPHfhXbv+nzTWfm04Fg/XSVU=
google.golang.org/genproto/googleapis/api v0.0.0-20240123012728-ef4313101c80/go.mod h1:4jWUdICTdgc3Ibxmr8nAJiiLHwQBY0UI0XZcEMaFKaA=
google.golang.org/genproto/googleapis/rpc v0.0.0-20240123012728-ef4313101c80 h1:AjyfHzEPEFp/NpvfN5g+KDla3EMojjhRVZc1i7cj+oM=
google.golang.org/genproto/googleapis/rpc v0.0.0-20240123012728-ef4313101c80/go.mod h1:PAREbraiVEVGVdTZsVWjSbbTtSyGbAgIIvni8a8CD5s=
google.golang.org/grpc v1.62.0 h1:HQKZ/fa1bXkX1oFOvSjmZEUL8wLSaZTjCcLAlmZRtdk=
google.golang.org/grpc v1.62.0/go.mod h1:IWTG0VlJLCh1SkC58F7np9ka9mx/WNkjl4PGJaiq+QE=
google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
google.golang.org/protobuf v1.32.0 h1:pPC6BG5ex8PDFnkbrGU3EixyhKcQ2aDuBS36lqK/C7I=
+4
View File
@@ -50,6 +50,10 @@ func (client ManagerClient) Process(ctx context.Context, cancel context.CancelFu
case *pkgmanager.ServerStreamMessage_TerminateReq:
cancel()
return errors.Join(errTerminationFromServer, errors.New(mes.TerminateReq.Message))
case *pkgmanager.ServerStreamMessage_StopComputation:
if err := client.svc.Stop(ctx, mes.StopComputation.ComputationId); err != nil {
return err
}
}
}
})
+13
View File
@@ -41,6 +41,19 @@ func (lm *loggingMiddleware) Run(ctx context.Context, mc *pkgmanager.Computation
return lm.svc.Run(ctx, mc)
}
func (lm *loggingMiddleware) Stop(ctx context.Context, computationID string) (err error) {
defer func(begin time.Time) {
message := fmt.Sprintf("Method Stop for computation took %s to complete", time.Since(begin))
if err != nil {
lm.logger.Warn(fmt.Sprintf("%s with error: %s.", message, err))
return
}
lm.logger.Info(message)
}(time.Now())
return lm.svc.Stop(ctx, computationID)
}
func (lm *loggingMiddleware) RetrieveAgentEventsLogs() {
lm.svc.RetrieveAgentEventsLogs()
}
+9
View File
@@ -42,6 +42,15 @@ func (ms *metricsMiddleware) Run(ctx context.Context, mc *pkgmanager.Computation
return ms.svc.Run(ctx, mc)
}
func (ms *metricsMiddleware) Stop(ctx context.Context, computationID string) error {
defer func(begin time.Time) {
ms.counter.With("method", "Stop").Add(1)
ms.latency.With("method", "Stop").Observe(time.Since(begin).Seconds())
}(time.Now())
return ms.svc.Stop(ctx, computationID)
}
func (ms *metricsMiddleware) RetrieveAgentEventsLogs() {
ms.svc.RetrieveAgentEventsLogs()
}
+5
View File
@@ -17,6 +17,10 @@ message Terminate {
string message = 1;
}
message StopComputation {
string computation_id = 1;
}
message RunResponse{
string agent_port = 1;
string computation_id = 2;
@@ -50,6 +54,7 @@ message ServerStreamMessage {
oneof message {
ComputationRunReq runReq = 1;
Terminate terminateReq = 2;
StopComputation stopComputation = 3;
}
}
+232
View File
@@ -0,0 +1,232 @@
// Copyright (c) Ultraviolet
// SPDX-License-Identifier: Apache-2.0
package manager
import (
"context"
"encoding/json"
"log/slog"
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/ultravioletrs/cocos/manager/qemu"
"github.com/ultravioletrs/cocos/manager/vm"
"github.com/ultravioletrs/cocos/manager/vm/mocks"
"github.com/ultravioletrs/cocos/pkg/manager"
)
func TestNew(t *testing.T) {
qemuCfg := qemu.Config{}
logger := slog.Default()
eventsChan := make(chan *manager.ClientStreamMessage)
vmf := new(mocks.Provider)
service := New(qemuCfg, logger, eventsChan, vmf.Execute)
assert.NotNil(t, service)
assert.IsType(t, &managerService{}, service)
}
func TestRun(t *testing.T) {
vmf := new(mocks.Provider)
vmMock := new(mocks.VM)
vmf.On("Execute", mock.Anything, mock.Anything, mock.Anything).Return(vmMock)
tests := []struct {
name string
req *manager.ComputationRunReq
vmStartError error
expectedError error
}{
{
name: "Successful run",
req: &manager.ComputationRunReq{
Id: "test-computation",
Name: "Test Computation",
Algorithm: &manager.Algorithm{
Hash: make([]byte, hashLength),
},
AgentConfig: &manager.AgentConfig{},
},
vmStartError: nil,
expectedError: nil,
},
{
name: "VM start failure",
req: &manager.ComputationRunReq{
Id: "test-computation",
Name: "Test Computation",
Algorithm: &manager.Algorithm{
Hash: make([]byte, hashLength),
},
AgentConfig: &manager.AgentConfig{},
},
vmStartError: assert.AnError,
expectedError: assert.AnError,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if tt.vmStartError == nil {
vmMock.On("Start").Return(nil).Once()
} else {
vmMock.On("Start").Return(tt.vmStartError).Once()
}
vmMock.On("SendAgentConfig", mock.Anything).Return(nil)
qemuCfg := qemu.Config{
VSockConfig: qemu.VSockConfig{
GuestCID: 3,
Vnc: 5900,
},
}
logger := slog.Default()
eventsChan := make(chan *manager.ClientStreamMessage, 10)
ms := &managerService{
qemuCfg: qemuCfg,
logger: logger,
agents: make(map[int]string),
vms: make(map[string]vm.VM),
eventsChan: eventsChan,
vmFactory: vmf.Execute,
}
ctx := context.Background()
port, err := ms.Run(ctx, tt.req)
if tt.expectedError != nil {
assert.Error(t, err)
assert.ErrorIs(t, err, tt.expectedError)
assert.Empty(t, port)
} else {
assert.NoError(t, err)
assert.NotEmpty(t, port)
assert.Len(t, ms.vms, 1)
assert.Len(t, ms.agents, 1)
}
vmf.AssertExpectations(t)
// Clear the events channel
for len(eventsChan) > 0 {
<-eventsChan
}
})
}
}
func TestStop(t *testing.T) {
tests := []struct {
name string
computationID string
vmStopError error
expectedError error
initialVMCount int
}{
{
name: "Successful stop",
computationID: "existing-computation",
vmStopError: nil,
expectedError: nil,
initialVMCount: 1,
},
{
name: "Non-existent computation",
computationID: "non-existent-computation",
vmStopError: nil,
expectedError: ErrNotFound,
initialVMCount: 0,
},
{
name: "VM stop error",
computationID: "error-computation",
vmStopError: assert.AnError,
expectedError: assert.AnError,
initialVMCount: 1,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
ms := &managerService{
vms: make(map[string]vm.VM),
}
vmMock := new(mocks.VM)
if tt.vmStopError == nil {
vmMock.On("Stop").Return(nil).Once()
} else {
vmMock.On("Stop").Return(assert.AnError).Once()
}
if tt.initialVMCount > 0 {
ms.vms[tt.computationID] = vmMock
}
err := ms.Stop(context.Background(), tt.computationID)
if tt.expectedError != nil {
assert.Error(t, err)
assert.ErrorIs(t, err, tt.expectedError)
} else {
assert.NoError(t, err)
assert.Len(t, ms.vms, 0)
}
})
}
}
func TestGetFreePort(t *testing.T) {
port, err := getFreePort()
assert.NoError(t, err)
assert.Greater(t, port, 0)
}
func TestPublishEvent(t *testing.T) {
tests := []struct {
name string
event string
computationID string
status string
details json.RawMessage
}{
{
name: "Standard event",
event: "test-event",
computationID: "test-computation",
status: "test-status",
details: nil,
},
{
name: "Event with details",
event: "detailed-event",
computationID: "detailed-computation",
status: "detailed-status",
details: json.RawMessage(`{"key": "value"}`),
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
eventsChan := make(chan *manager.ClientStreamMessage, 1)
ms := &managerService{
eventsChan: eventsChan,
}
ms.publishEvent(tt.event, tt.computationID, tt.status, tt.details)
assert.Len(t, eventsChan, 1)
event := <-eventsChan
assert.Equal(t, tt.event, event.GetAgentEvent().EventType)
assert.Equal(t, tt.computationID, event.GetAgentEvent().ComputationId)
assert.Equal(t, tt.status, event.GetAgentEvent().Status)
assert.Equal(t, "manager", event.GetAgentEvent().Originator)
assert.Equal(t, tt.details, json.RawMessage(event.GetAgentEvent().Details))
})
}
}
+1 -1
View File
@@ -101,7 +101,7 @@ type Config struct {
Monitor string `env:"MONITOR" envDefault:"pty"`
}
func constructQemuArgs(config Config) []string {
func (config Config) ConstructQemuArgs() []string {
args := []string{}
// virtualization
-93
View File
@@ -1,93 +0,0 @@
// Copyright (c) Ultraviolet
// SPDX-License-Identifier: Apache-2.0
package qemu
import (
"context"
"fmt"
"os/exec"
"github.com/gofrs/uuid"
"github.com/ultravioletrs/cocos/internal"
)
const (
firmwareVars = "OVMF_VARS"
KernelFile = "bzImage"
rootfsFile = "rootfs.cpio"
)
func CreateVM(ctx context.Context, cfg Config) (*exec.Cmd, error) {
// Create unique emu device identifiers
id, err := uuid.NewV4()
if err != nil {
return &exec.Cmd{}, err
}
qemuCfg := cfg
qemuCfg.NetDevConfig.ID = fmt.Sprintf("%s-%s", qemuCfg.NetDevConfig.ID, id)
qemuCfg.SevConfig.ID = fmt.Sprintf("%s-%s", qemuCfg.SevConfig.ID, id)
if !cfg.KernelHash {
// Copy firmware vars file
srcFile := qemuCfg.OVMFVarsConfig.File
dstFile := fmt.Sprintf("%s/%s-%s.fd", cfg.TmpFileLoc, firmwareVars, id)
err = internal.CopyFile(srcFile, dstFile)
if err != nil {
return &exec.Cmd{}, err
}
qemuCfg.OVMFVarsConfig.File = dstFile
}
// Copy img files
srcFile := qemuCfg.DiskImgConfig.KernelFile
dstFile := fmt.Sprintf("%s/%s-%s", cfg.TmpFileLoc, KernelFile, id)
err = internal.CopyFile(srcFile, dstFile)
if err != nil {
return &exec.Cmd{}, err
}
qemuCfg.DiskImgConfig.KernelFile = dstFile
srcFile = qemuCfg.DiskImgConfig.RootFsFile
dstFile = fmt.Sprintf("%s/%s-%s.gz", cfg.TmpFileLoc, rootfsFile, id)
err = internal.CopyFile(srcFile, dstFile)
if err != nil {
return &exec.Cmd{}, err
}
qemuCfg.DiskImgConfig.RootFsFile = dstFile
exe, args, err := ExecutableAndArgs(qemuCfg)
if err != nil {
return &exec.Cmd{}, err
}
cmd, err := runQemuVM(exe, args)
if err != nil {
return cmd, err
}
return cmd, nil
}
func ExecutableAndArgs(cfg Config) (string, []string, error) {
exe, err := exec.LookPath(cfg.QemuBinPath)
if err != nil {
return "", nil, err
}
args := constructQemuArgs(cfg)
if cfg.UseSudo {
args = append([]string{exe}, args...)
exe = "sudo"
}
return exe, args, nil
}
func runQemuVM(exe string, args []string) (*exec.Cmd, error) {
cmd, err := internal.RunCmdStart(exe, args...)
if err != nil {
return nil, err
}
return cmd, nil
}
+104
View File
@@ -0,0 +1,104 @@
// Copyright (c) Ultraviolet
// SPDX-License-Identifier: Apache-2.0
package qemu
import (
"fmt"
"os/exec"
"github.com/gofrs/uuid"
"github.com/ultravioletrs/cocos/internal"
"github.com/ultravioletrs/cocos/manager/vm"
"github.com/ultravioletrs/cocos/pkg/manager"
)
const (
firmwareVars = "OVMF_VARS"
KernelFile = "bzImage"
rootfsFile = "rootfs.cpio"
)
type qemuVM struct {
config Config
cmd *exec.Cmd
logsChan chan *manager.ClientStreamMessage
computationId string
}
func NewVM(config interface{}, logsChan chan *manager.ClientStreamMessage, computationId string) vm.VM {
return &qemuVM{
config: config.(Config),
logsChan: logsChan,
computationId: computationId,
}
}
func (v *qemuVM) Start() error {
// Create unique qemu device identifiers
id, err := uuid.NewV4()
if err != nil {
return err
}
qemuCfg := v.config
qemuCfg.NetDevConfig.ID = fmt.Sprintf("%s-%s", qemuCfg.NetDevConfig.ID, id)
qemuCfg.SevConfig.ID = fmt.Sprintf("%s-%s", qemuCfg.SevConfig.ID, id)
if !v.config.KernelHash {
// Copy firmware vars file
srcFile := qemuCfg.OVMFVarsConfig.File
dstFile := fmt.Sprintf("%s/%s-%s.fd", v.config.TmpFileLoc, firmwareVars, id)
err = internal.CopyFile(srcFile, dstFile)
if err != nil {
return err
}
qemuCfg.OVMFVarsConfig.File = dstFile
}
// Copy img files
srcFile := qemuCfg.DiskImgConfig.KernelFile
dstFile := fmt.Sprintf("%s/%s-%s", v.config.TmpFileLoc, KernelFile, id)
err = internal.CopyFile(srcFile, dstFile)
if err != nil {
return err
}
qemuCfg.DiskImgConfig.KernelFile = dstFile
srcFile = qemuCfg.DiskImgConfig.RootFsFile
dstFile = fmt.Sprintf("%s/%s-%s.gz", v.config.TmpFileLoc, rootfsFile, id)
err = internal.CopyFile(srcFile, dstFile)
if err != nil {
return err
}
qemuCfg.DiskImgConfig.RootFsFile = dstFile
exe, args, err := v.executableAndArgs()
if err != nil {
return err
}
v.cmd = exec.Command(exe, args...)
v.cmd.Stdout = &vm.Stdout{LogsChan: v.logsChan, ComputationId: v.computationId}
v.cmd.Stderr = &vm.Stderr{LogsChan: v.logsChan, ComputationId: v.computationId}
return v.cmd.Start()
}
func (v *qemuVM) Stop() error {
return v.cmd.Process.Kill()
}
func (v *qemuVM) executableAndArgs() (string, []string, error) {
exe, err := exec.LookPath(v.config.QemuBinPath)
if err != nil {
return "", nil, err
}
args := v.config.ConstructQemuArgs()
if v.config.UseSudo {
args = append([]string{exe}, args...)
exe = "sudo"
}
return exe, args, nil
}
+39
View File
@@ -0,0 +1,39 @@
// Copyright (c) Ultraviolet
// SPDX-License-Identifier: Apache-2.0
package qemu
import (
"os/exec"
"testing"
"github.com/stretchr/testify/assert"
"github.com/ultravioletrs/cocos/pkg/manager"
)
func TestNewVM(t *testing.T) {
config := Config{}
logsChan := make(chan *manager.ClientStreamMessage)
computationId := "test-computation"
nvm := NewVM(config, logsChan, computationId)
assert.NotNil(t, nvm)
assert.IsType(t, &qemuVM{}, nvm)
}
func TestVM_Stop(t *testing.T) {
// Setup
v := &qemuVM{
cmd: exec.Command("sleep", "1"),
}
err := v.cmd.Start()
assert.NoError(t, err)
// Test
err = v.Stop()
// Assert
assert.NoError(t, err)
assert.Error(t, v.cmd.Wait()) // Process should have been killed
}
+3 -3
View File
@@ -1,6 +1,6 @@
// Copyright (c) Ultraviolet
// SPDX-License-Identifier: Apache-2.0
package manager
package qemu
import (
"encoding/json"
@@ -11,8 +11,8 @@ import (
const VsockConfigPort uint32 = 9999
func SendAgentConfig(cid uint32, ac agent.Computation) error {
conn, err := vsock.Dial(cid, VsockConfigPort, nil)
func (v *qemuVM) SendAgentConfig(ac agent.Computation) error {
conn, err := vsock.Dial(uint32(v.config.GuestCID), VsockConfigPort, nil)
if err != nil {
return err
}
+24 -3
View File
@@ -15,6 +15,7 @@ import (
"github.com/cenkalti/backoff/v4"
"github.com/ultravioletrs/cocos/agent"
"github.com/ultravioletrs/cocos/manager/qemu"
"github.com/ultravioletrs/cocos/manager/vm"
"github.com/ultravioletrs/cocos/pkg/manager"
"golang.org/x/crypto/sha3"
"google.golang.org/protobuf/types/known/timestamppb"
@@ -48,6 +49,8 @@ var (
type Service interface {
// Run create a computation.
Run(ctx context.Context, c *manager.ComputationRunReq) (string, error)
// Stop stops a computation.
Stop(ctx context.Context, computationID string) error
// RetrieveAgentEventsLogs Retrieve and forward agent logs and events via vsock.
RetrieveAgentEventsLogs()
}
@@ -57,17 +60,21 @@ type managerService struct {
logger *slog.Logger
agents map[int]string // agent map of vsock cid to computationID.
eventsChan chan *manager.ClientStreamMessage
vms map[string]vm.VM
vmFactory vm.Provider
}
var _ Service = (*managerService)(nil)
// New instantiates the manager service implementation.
func New(qemuCfg qemu.Config, logger *slog.Logger, eventsChan chan *manager.ClientStreamMessage) Service {
func New(qemuCfg qemu.Config, logger *slog.Logger, eventsChan chan *manager.ClientStreamMessage, vmFactory vm.Provider) Service {
ms := &managerService{
qemuCfg: qemuCfg,
logger: logger,
agents: make(map[int]string),
vms: make(map[string]vm.VM),
eventsChan: eventsChan,
vmFactory: vmFactory,
}
return ms
}
@@ -118,16 +125,18 @@ func (ms *managerService) Run(ctx context.Context, c *manager.ComputationRunReq)
// Define host-data value of QEMU for SEV-SNP, with a base64 encoding of the computation hash.
ms.qemuCfg.SevConfig.HostData = base64.StdEncoding.EncodeToString(ch[:])
cvm := ms.vmFactory(ms.qemuCfg, ms.eventsChan, c.Id)
ms.publishEvent("vm-provision", c.Id, "in-progress", json.RawMessage{})
if _, err = qemu.CreateVM(ctx, ms.qemuCfg); err != nil {
if err = cvm.Start(); err != nil {
ms.publishEvent("vm-provision", c.Id, "failed", json.RawMessage{})
return "", err
}
ms.vms[c.Id] = cvm
ms.agents[ms.qemuCfg.VSockConfig.GuestCID] = c.Id
err = backoff.Retry(func() error {
return SendAgentConfig(uint32(ms.qemuCfg.VSockConfig.GuestCID), ac)
return cvm.SendAgentConfig(ac)
}, backoff.NewExponentialBackOff())
if err != nil {
return "", err
@@ -139,6 +148,18 @@ func (ms *managerService) Run(ctx context.Context, c *manager.ComputationRunReq)
return fmt.Sprint(ms.qemuCfg.HostFwdAgent), nil
}
func (ms *managerService) Stop(ctx context.Context, computationID string) error {
cvm, ok := ms.vms[computationID]
if !ok {
return ErrNotFound
}
if err := cvm.Stop(); err != nil {
return err
}
delete(ms.vms, computationID)
return nil
}
func getFreePort() (int, error) {
listener, err := net.Listen("tcp", "")
if err != nil {
+7
View File
@@ -29,6 +29,13 @@ func (tm *tracingMiddleware) Run(ctx context.Context, mc *pkgmanager.Computation
return tm.svc.Run(ctx, mc)
}
func (tm *tracingMiddleware) Stop(ctx context.Context, computationID string) error {
ctx, span := tm.tracer.Start(ctx, "stop")
defer span.End()
return tm.svc.Stop(ctx, computationID)
}
func (tm *tracingMiddleware) RetrieveAgentEventsLogs() {
tm.svc.RetrieveAgentEventsLogs()
}
+133
View File
@@ -0,0 +1,133 @@
// Copyright (c) Ultraviolet
// SPDX-License-Identifier: Apache-2.0
package vm
import (
"bytes"
"errors"
"io"
"github.com/ultravioletrs/cocos/pkg/manager"
"google.golang.org/protobuf/types/known/timestamppb"
)
var (
_ io.Writer = &Stdout{}
_ io.Writer = &Stderr{}
ErrFailedToSendMessage = errors.New("failed to send message to channel")
ErrPanicRecovered = errors.New("panic recovered: channel may be closed")
)
const bufSize = 1024
type Stdout struct {
LogsChan chan *manager.ClientStreamMessage
ComputationId string
}
// safeSend safely sends a message to the channel and returns an error on failure.
func safeSend(ch chan *manager.ClientStreamMessage, msg *manager.ClientStreamMessage) (err error) {
defer func() {
if r := recover(); r != nil {
// Recover from panic if the channel is closed
err = ErrPanicRecovered
}
}()
select {
case ch <- msg:
return nil
default:
// Channel is full or closed
return ErrFailedToSendMessage
}
}
// Write implements io.Writer.
func (s *Stdout) Write(p []byte) (n int, err error) {
inBuf := bytes.NewBuffer(p)
buf := make([]byte, bufSize)
for {
n, err := inBuf.Read(buf)
if err != nil {
if err == io.EOF {
break
}
return len(p) - inBuf.Len(), err
}
msg := &manager.ClientStreamMessage{
Message: &manager.ClientStreamMessage_AgentLog{
AgentLog: &manager.AgentLog{
Message: string(buf[:n]),
ComputationId: s.ComputationId,
Level: "debug",
Timestamp: timestamppb.Now(),
},
},
}
if err := safeSend(s.LogsChan, msg); err != nil {
return len(p) - inBuf.Len(), err
}
}
return len(p), nil
}
type Stderr struct {
LogsChan chan *manager.ClientStreamMessage
ComputationId string
}
// Write implements io.Writer.
func (s *Stderr) Write(p []byte) (n int, err error) {
inBuf := bytes.NewBuffer(p)
buf := make([]byte, bufSize)
for {
n, err := inBuf.Read(buf)
if err != nil {
if err == io.EOF {
break
}
return len(p) - inBuf.Len(), err
}
msg := &manager.ClientStreamMessage{
Message: &manager.ClientStreamMessage_AgentLog{
AgentLog: &manager.AgentLog{
Message: string(buf[:n]),
ComputationId: s.ComputationId,
Level: "error",
Timestamp: timestamppb.Now(),
},
},
}
if err := safeSend(s.LogsChan, msg); err != nil {
return len(p) - inBuf.Len(), err
}
}
// Ensure vm-provision failure message is sent
eventMsg := &manager.ClientStreamMessage{
Message: &manager.ClientStreamMessage_AgentEvent{
AgentEvent: &manager.AgentEvent{
ComputationId: s.ComputationId,
EventType: "vm-provision",
Timestamp: timestamppb.Now(),
Originator: "manager",
Status: "failed",
},
},
}
if err := safeSend(s.LogsChan, eventMsg); err != nil {
return len(p), err
}
return len(p), nil
}
+171
View File
@@ -0,0 +1,171 @@
// Copyright (c) Ultraviolet
// SPDX-License-Identifier: Apache-2.0
package vm
import (
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/ultravioletrs/cocos/pkg/manager"
)
func TestStdoutWrite(t *testing.T) {
tests := []struct {
name string
input string
expectedWrites int
}{
{
name: "Single write within buffer size",
input: "Hello, World!",
expectedWrites: 1,
},
{
name: "Multiple writes within buffer size",
input: "This is a longer message that will be split into multiple writes.",
expectedWrites: 1,
},
{
name: "Large write exceeding buffer size",
input: string(make([]byte, bufSize*2+1)),
expectedWrites: 3,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
logsChan := make(chan *manager.ClientStreamMessage, 10)
s := &Stdout{
LogsChan: logsChan,
ComputationId: "test-computation",
}
n, err := s.Write([]byte(tt.input))
assert.NoError(t, err)
assert.Equal(t, len(tt.input), n)
var receivedWrites int
for i := 0; i < tt.expectedWrites; i++ {
select {
case msg := <-logsChan:
receivedWrites++
agentLog := msg.GetAgentLog()
assert.NotNil(t, agentLog)
assert.Equal(t, "test-computation", agentLog.ComputationId)
assert.Equal(t, "debug", agentLog.Level)
assert.NotEmpty(t, agentLog.Message)
assert.NotNil(t, agentLog.Timestamp)
case <-time.After(time.Second):
t.Fatal("Timed out waiting for log message")
}
}
assert.Equal(t, tt.expectedWrites, receivedWrites)
})
}
}
func TestStderrWrite(t *testing.T) {
tests := []struct {
name string
input string
expectedWrites int
}{
{
name: "Single write within buffer size",
input: "Error: Something went wrong",
expectedWrites: 1,
},
{
name: "Multiple writes within buffer size",
input: "This is a longer error message that will be split into multiple writes.",
expectedWrites: 1,
},
{
name: "Large write exceeding buffer size",
input: string(make([]byte, bufSize*2)),
expectedWrites: 3,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
logsChan := make(chan *manager.ClientStreamMessage, 10)
s := &Stderr{
LogsChan: logsChan,
ComputationId: "test-computation",
}
n, err := s.Write([]byte(tt.input))
assert.NoError(t, err)
assert.Equal(t, len(tt.input), n)
var receivedWrites int
for i := 0; i < tt.expectedWrites; i++ {
select {
case msg := <-logsChan:
receivedWrites++
switch msg.Message.(type) {
case *manager.ClientStreamMessage_AgentLog:
agentLog := msg.GetAgentLog()
assert.NotNil(t, agentLog)
assert.Equal(t, "test-computation", agentLog.ComputationId)
assert.Equal(t, "error", agentLog.Level)
assert.NotEmpty(t, agentLog.Message)
assert.NotNil(t, agentLog.Timestamp)
case *manager.ClientStreamMessage_AgentEvent:
agentEvent := msg.GetAgentEvent()
assert.NotNil(t, agentEvent)
assert.Equal(t, "test-computation", agentEvent.ComputationId)
assert.Equal(t, "vm-provision", agentEvent.EventType)
assert.Equal(t, "failed", agentEvent.Status)
assert.NotNil(t, agentEvent.Timestamp)
}
case <-time.After(time.Second):
t.Fatal("Timed out waiting for log message")
}
}
assert.Equal(t, tt.expectedWrites, receivedWrites)
})
}
}
func TestStdoutWriteErrorHandling(t *testing.T) {
logsChan := make(chan *manager.ClientStreamMessage, 1)
s := &Stdout{
LogsChan: logsChan,
ComputationId: "test-computation",
}
// Test with a closed channel to simulate an error condition
close(logsChan)
message := []byte("This should fail")
n, err := s.Write(message)
assert.Error(t, err)
assert.Equal(t, len(message), n)
assert.Equal(t, ErrPanicRecovered, err)
}
func TestStderrWriteErrorHandling(t *testing.T) {
logsChan := make(chan *manager.ClientStreamMessage, 1)
s := &Stderr{
LogsChan: logsChan,
ComputationId: "test-computation",
}
// Test with a closed channel to simulate an error condition
close(logsChan)
message := []byte("This should fail")
n, err := s.Write(message)
assert.Error(t, err)
assert.Equal(t, len(message), n)
assert.Equal(t, ErrPanicRecovered, err)
}
+52
View File
@@ -0,0 +1,52 @@
// Code generated by mockery v2.43.2. DO NOT EDIT.
// Copyright (c) Ultraviolet
// SPDX-License-Identifier: Apache-2.0
package mocks
import (
mock "github.com/stretchr/testify/mock"
manager "github.com/ultravioletrs/cocos/pkg/manager"
vm "github.com/ultravioletrs/cocos/manager/vm"
)
// Provider is an autogenerated mock type for the Provider type
type Provider struct {
mock.Mock
}
// Execute provides a mock function with given fields: config, logsChan, computationId
func (_m *Provider) Execute(config interface{}, logsChan chan *manager.ClientStreamMessage, computationId string) vm.VM {
ret := _m.Called(config, logsChan, computationId)
if len(ret) == 0 {
panic("no return value specified for Execute")
}
var r0 vm.VM
if rf, ok := ret.Get(0).(func(interface{}, chan *manager.ClientStreamMessage, string) vm.VM); ok {
r0 = rf(config, logsChan, computationId)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(vm.VM)
}
}
return r0
}
// NewProvider creates a new instance of Provider. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
// The first argument is typically a *testing.T value.
func NewProvider(t interface {
mock.TestingT
Cleanup(func())
}) *Provider {
mock := &Provider{}
mock.Mock.Test(t)
t.Cleanup(func() { mock.AssertExpectations(t) })
return mock
}
+84
View File
@@ -0,0 +1,84 @@
// Code generated by mockery v2.43.2. DO NOT EDIT.
// Copyright (c) Ultraviolet
// SPDX-License-Identifier: Apache-2.0
package mocks
import (
mock "github.com/stretchr/testify/mock"
agent "github.com/ultravioletrs/cocos/agent"
)
// VM is an autogenerated mock type for the VM type
type VM struct {
mock.Mock
}
// SendAgentConfig provides a mock function with given fields: ac
func (_m *VM) SendAgentConfig(ac agent.Computation) error {
ret := _m.Called(ac)
if len(ret) == 0 {
panic("no return value specified for SendAgentConfig")
}
var r0 error
if rf, ok := ret.Get(0).(func(agent.Computation) error); ok {
r0 = rf(ac)
} else {
r0 = ret.Error(0)
}
return r0
}
// Start provides a mock function with given fields:
func (_m *VM) Start() error {
ret := _m.Called()
if len(ret) == 0 {
panic("no return value specified for Start")
}
var r0 error
if rf, ok := ret.Get(0).(func() error); ok {
r0 = rf()
} else {
r0 = ret.Error(0)
}
return r0
}
// Stop provides a mock function with given fields:
func (_m *VM) Stop() error {
ret := _m.Called()
if len(ret) == 0 {
panic("no return value specified for Stop")
}
var r0 error
if rf, ok := ret.Get(0).(func() error); ok {
r0 = rf()
} else {
r0 = ret.Error(0)
}
return r0
}
// NewVM creates a new instance of VM. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
// The first argument is typically a *testing.T value.
func NewVM(t interface {
mock.TestingT
Cleanup(func())
}) *VM {
mock := &VM{}
mock.Mock.Test(t)
t.Cleanup(func() { mock.AssertExpectations(t) })
return mock
}
+20
View File
@@ -0,0 +1,20 @@
// Copyright (c) Ultraviolet
// SPDX-License-Identifier: Apache-2.0
package vm
import (
"github.com/ultravioletrs/cocos/agent"
"github.com/ultravioletrs/cocos/pkg/manager"
)
// VM represents a virtual machine.
//
//go:generate mockery --name VM --output=./mocks --filename vm.go --quiet --note "Copyright (c) Ultraviolet \n // SPDX-License-Identifier: Apache-2.0"
type VM interface {
Start() error
Stop() error
SendAgentConfig(ac agent.Computation) error
}
//go:generate mockery --name Provider --output=./mocks --filename provider.go --quiet --note "Copyright (c) Ultraviolet \n // SPDX-License-Identifier: Apache-2.0"
type Provider func(config interface{}, logsChan chan *manager.ClientStreamMessage, computationId string) VM
+210 -126
View File
@@ -3,7 +3,7 @@
// Code generated by protoc-gen-go. DO NOT EDIT.
// versions:
// protoc-gen-go v1.33.0
// protoc-gen-go v1.34.2
// protoc v4.25.3
// source: manager/manager.proto
@@ -71,6 +71,53 @@ func (x *Terminate) GetMessage() string {
return ""
}
type StopComputation struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
ComputationId string `protobuf:"bytes,1,opt,name=computation_id,json=computationId,proto3" json:"computation_id,omitempty"`
}
func (x *StopComputation) Reset() {
*x = StopComputation{}
if protoimpl.UnsafeEnabled {
mi := &file_manager_manager_proto_msgTypes[1]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}
func (x *StopComputation) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*StopComputation) ProtoMessage() {}
func (x *StopComputation) ProtoReflect() protoreflect.Message {
mi := &file_manager_manager_proto_msgTypes[1]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use StopComputation.ProtoReflect.Descriptor instead.
func (*StopComputation) Descriptor() ([]byte, []int) {
return file_manager_manager_proto_rawDescGZIP(), []int{1}
}
func (x *StopComputation) GetComputationId() string {
if x != nil {
return x.ComputationId
}
return ""
}
type RunResponse struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
@@ -83,7 +130,7 @@ type RunResponse struct {
func (x *RunResponse) Reset() {
*x = RunResponse{}
if protoimpl.UnsafeEnabled {
mi := &file_manager_manager_proto_msgTypes[1]
mi := &file_manager_manager_proto_msgTypes[2]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
@@ -96,7 +143,7 @@ func (x *RunResponse) String() string {
func (*RunResponse) ProtoMessage() {}
func (x *RunResponse) ProtoReflect() protoreflect.Message {
mi := &file_manager_manager_proto_msgTypes[1]
mi := &file_manager_manager_proto_msgTypes[2]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
@@ -109,7 +156,7 @@ func (x *RunResponse) ProtoReflect() protoreflect.Message {
// Deprecated: Use RunResponse.ProtoReflect.Descriptor instead.
func (*RunResponse) Descriptor() ([]byte, []int) {
return file_manager_manager_proto_rawDescGZIP(), []int{1}
return file_manager_manager_proto_rawDescGZIP(), []int{2}
}
func (x *RunResponse) GetAgentPort() string {
@@ -142,7 +189,7 @@ type AgentEvent struct {
func (x *AgentEvent) Reset() {
*x = AgentEvent{}
if protoimpl.UnsafeEnabled {
mi := &file_manager_manager_proto_msgTypes[2]
mi := &file_manager_manager_proto_msgTypes[3]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
@@ -155,7 +202,7 @@ func (x *AgentEvent) String() string {
func (*AgentEvent) ProtoMessage() {}
func (x *AgentEvent) ProtoReflect() protoreflect.Message {
mi := &file_manager_manager_proto_msgTypes[2]
mi := &file_manager_manager_proto_msgTypes[3]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
@@ -168,7 +215,7 @@ func (x *AgentEvent) ProtoReflect() protoreflect.Message {
// Deprecated: Use AgentEvent.ProtoReflect.Descriptor instead.
func (*AgentEvent) Descriptor() ([]byte, []int) {
return file_manager_manager_proto_rawDescGZIP(), []int{2}
return file_manager_manager_proto_rawDescGZIP(), []int{3}
}
func (x *AgentEvent) GetEventType() string {
@@ -227,7 +274,7 @@ type AgentLog struct {
func (x *AgentLog) Reset() {
*x = AgentLog{}
if protoimpl.UnsafeEnabled {
mi := &file_manager_manager_proto_msgTypes[3]
mi := &file_manager_manager_proto_msgTypes[4]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
@@ -240,7 +287,7 @@ func (x *AgentLog) String() string {
func (*AgentLog) ProtoMessage() {}
func (x *AgentLog) ProtoReflect() protoreflect.Message {
mi := &file_manager_manager_proto_msgTypes[3]
mi := &file_manager_manager_proto_msgTypes[4]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
@@ -253,7 +300,7 @@ func (x *AgentLog) ProtoReflect() protoreflect.Message {
// Deprecated: Use AgentLog.ProtoReflect.Descriptor instead.
func (*AgentLog) Descriptor() ([]byte, []int) {
return file_manager_manager_proto_rawDescGZIP(), []int{3}
return file_manager_manager_proto_rawDescGZIP(), []int{4}
}
func (x *AgentLog) GetMessage() string {
@@ -300,7 +347,7 @@ type ClientStreamMessage struct {
func (x *ClientStreamMessage) Reset() {
*x = ClientStreamMessage{}
if protoimpl.UnsafeEnabled {
mi := &file_manager_manager_proto_msgTypes[4]
mi := &file_manager_manager_proto_msgTypes[5]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
@@ -313,7 +360,7 @@ func (x *ClientStreamMessage) String() string {
func (*ClientStreamMessage) ProtoMessage() {}
func (x *ClientStreamMessage) ProtoReflect() protoreflect.Message {
mi := &file_manager_manager_proto_msgTypes[4]
mi := &file_manager_manager_proto_msgTypes[5]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
@@ -326,7 +373,7 @@ func (x *ClientStreamMessage) ProtoReflect() protoreflect.Message {
// Deprecated: Use ClientStreamMessage.ProtoReflect.Descriptor instead.
func (*ClientStreamMessage) Descriptor() ([]byte, []int) {
return file_manager_manager_proto_rawDescGZIP(), []int{4}
return file_manager_manager_proto_rawDescGZIP(), []int{5}
}
func (m *ClientStreamMessage) GetMessage() isClientStreamMessage_Message {
@@ -388,13 +435,14 @@ type ServerStreamMessage struct {
//
// *ServerStreamMessage_RunReq
// *ServerStreamMessage_TerminateReq
// *ServerStreamMessage_StopComputation
Message isServerStreamMessage_Message `protobuf_oneof:"message"`
}
func (x *ServerStreamMessage) Reset() {
*x = ServerStreamMessage{}
if protoimpl.UnsafeEnabled {
mi := &file_manager_manager_proto_msgTypes[5]
mi := &file_manager_manager_proto_msgTypes[6]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
@@ -407,7 +455,7 @@ func (x *ServerStreamMessage) String() string {
func (*ServerStreamMessage) ProtoMessage() {}
func (x *ServerStreamMessage) ProtoReflect() protoreflect.Message {
mi := &file_manager_manager_proto_msgTypes[5]
mi := &file_manager_manager_proto_msgTypes[6]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
@@ -420,7 +468,7 @@ func (x *ServerStreamMessage) ProtoReflect() protoreflect.Message {
// Deprecated: Use ServerStreamMessage.ProtoReflect.Descriptor instead.
func (*ServerStreamMessage) Descriptor() ([]byte, []int) {
return file_manager_manager_proto_rawDescGZIP(), []int{5}
return file_manager_manager_proto_rawDescGZIP(), []int{6}
}
func (m *ServerStreamMessage) GetMessage() isServerStreamMessage_Message {
@@ -444,6 +492,13 @@ func (x *ServerStreamMessage) GetTerminateReq() *Terminate {
return nil
}
func (x *ServerStreamMessage) GetStopComputation() *StopComputation {
if x, ok := x.GetMessage().(*ServerStreamMessage_StopComputation); ok {
return x.StopComputation
}
return nil
}
type isServerStreamMessage_Message interface {
isServerStreamMessage_Message()
}
@@ -456,10 +511,16 @@ type ServerStreamMessage_TerminateReq struct {
TerminateReq *Terminate `protobuf:"bytes,2,opt,name=terminateReq,proto3,oneof"`
}
type ServerStreamMessage_StopComputation struct {
StopComputation *StopComputation `protobuf:"bytes,3,opt,name=stopComputation,proto3,oneof"`
}
func (*ServerStreamMessage_RunReq) isServerStreamMessage_Message() {}
func (*ServerStreamMessage_TerminateReq) isServerStreamMessage_Message() {}
func (*ServerStreamMessage_StopComputation) isServerStreamMessage_Message() {}
type ComputationRunReq struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
@@ -477,7 +538,7 @@ type ComputationRunReq struct {
func (x *ComputationRunReq) Reset() {
*x = ComputationRunReq{}
if protoimpl.UnsafeEnabled {
mi := &file_manager_manager_proto_msgTypes[6]
mi := &file_manager_manager_proto_msgTypes[7]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
@@ -490,7 +551,7 @@ func (x *ComputationRunReq) String() string {
func (*ComputationRunReq) ProtoMessage() {}
func (x *ComputationRunReq) ProtoReflect() protoreflect.Message {
mi := &file_manager_manager_proto_msgTypes[6]
mi := &file_manager_manager_proto_msgTypes[7]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
@@ -503,7 +564,7 @@ func (x *ComputationRunReq) ProtoReflect() protoreflect.Message {
// Deprecated: Use ComputationRunReq.ProtoReflect.Descriptor instead.
func (*ComputationRunReq) Descriptor() ([]byte, []int) {
return file_manager_manager_proto_rawDescGZIP(), []int{6}
return file_manager_manager_proto_rawDescGZIP(), []int{7}
}
func (x *ComputationRunReq) GetId() string {
@@ -566,7 +627,7 @@ type ResultConsumer struct {
func (x *ResultConsumer) Reset() {
*x = ResultConsumer{}
if protoimpl.UnsafeEnabled {
mi := &file_manager_manager_proto_msgTypes[7]
mi := &file_manager_manager_proto_msgTypes[8]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
@@ -579,7 +640,7 @@ func (x *ResultConsumer) String() string {
func (*ResultConsumer) ProtoMessage() {}
func (x *ResultConsumer) ProtoReflect() protoreflect.Message {
mi := &file_manager_manager_proto_msgTypes[7]
mi := &file_manager_manager_proto_msgTypes[8]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
@@ -592,7 +653,7 @@ func (x *ResultConsumer) ProtoReflect() protoreflect.Message {
// Deprecated: Use ResultConsumer.ProtoReflect.Descriptor instead.
func (*ResultConsumer) Descriptor() ([]byte, []int) {
return file_manager_manager_proto_rawDescGZIP(), []int{7}
return file_manager_manager_proto_rawDescGZIP(), []int{8}
}
func (x *ResultConsumer) GetUserKey() []byte {
@@ -614,7 +675,7 @@ type Dataset struct {
func (x *Dataset) Reset() {
*x = Dataset{}
if protoimpl.UnsafeEnabled {
mi := &file_manager_manager_proto_msgTypes[8]
mi := &file_manager_manager_proto_msgTypes[9]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
@@ -627,7 +688,7 @@ func (x *Dataset) String() string {
func (*Dataset) ProtoMessage() {}
func (x *Dataset) ProtoReflect() protoreflect.Message {
mi := &file_manager_manager_proto_msgTypes[8]
mi := &file_manager_manager_proto_msgTypes[9]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
@@ -640,7 +701,7 @@ func (x *Dataset) ProtoReflect() protoreflect.Message {
// Deprecated: Use Dataset.ProtoReflect.Descriptor instead.
func (*Dataset) Descriptor() ([]byte, []int) {
return file_manager_manager_proto_rawDescGZIP(), []int{8}
return file_manager_manager_proto_rawDescGZIP(), []int{9}
}
func (x *Dataset) GetHash() []byte {
@@ -669,7 +730,7 @@ type Algorithm struct {
func (x *Algorithm) Reset() {
*x = Algorithm{}
if protoimpl.UnsafeEnabled {
mi := &file_manager_manager_proto_msgTypes[9]
mi := &file_manager_manager_proto_msgTypes[10]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
@@ -682,7 +743,7 @@ func (x *Algorithm) String() string {
func (*Algorithm) ProtoMessage() {}
func (x *Algorithm) ProtoReflect() protoreflect.Message {
mi := &file_manager_manager_proto_msgTypes[9]
mi := &file_manager_manager_proto_msgTypes[10]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
@@ -695,7 +756,7 @@ func (x *Algorithm) ProtoReflect() protoreflect.Message {
// Deprecated: Use Algorithm.ProtoReflect.Descriptor instead.
func (*Algorithm) Descriptor() ([]byte, []int) {
return file_manager_manager_proto_rawDescGZIP(), []int{9}
return file_manager_manager_proto_rawDescGZIP(), []int{10}
}
func (x *Algorithm) GetHash() []byte {
@@ -730,7 +791,7 @@ type AgentConfig struct {
func (x *AgentConfig) Reset() {
*x = AgentConfig{}
if protoimpl.UnsafeEnabled {
mi := &file_manager_manager_proto_msgTypes[10]
mi := &file_manager_manager_proto_msgTypes[11]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
@@ -743,7 +804,7 @@ func (x *AgentConfig) String() string {
func (*AgentConfig) ProtoMessage() {}
func (x *AgentConfig) ProtoReflect() protoreflect.Message {
mi := &file_manager_manager_proto_msgTypes[10]
mi := &file_manager_manager_proto_msgTypes[11]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
@@ -756,7 +817,7 @@ func (x *AgentConfig) ProtoReflect() protoreflect.Message {
// Deprecated: Use AgentConfig.ProtoReflect.Descriptor instead.
func (*AgentConfig) Descriptor() ([]byte, []int) {
return file_manager_manager_proto_rawDescGZIP(), []int{10}
return file_manager_manager_proto_rawDescGZIP(), []int{11}
}
func (x *AgentConfig) GetPort() string {
@@ -824,57 +885,65 @@ var file_manager_manager_proto_rawDesc = []byte{
0x66, 0x2f, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x2e, 0x70, 0x72, 0x6f, 0x74,
0x6f, 0x22, 0x25, 0x0a, 0x09, 0x54, 0x65, 0x72, 0x6d, 0x69, 0x6e, 0x61, 0x74, 0x65, 0x12, 0x18,
0x0a, 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52,
0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x22, 0x53, 0x0a, 0x0b, 0x52, 0x75, 0x6e, 0x52,
0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x1d, 0x0a, 0x0a, 0x61, 0x67, 0x65, 0x6e, 0x74,
0x5f, 0x70, 0x6f, 0x72, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x61, 0x67, 0x65,
0x6e, 0x74, 0x50, 0x6f, 0x72, 0x74, 0x12, 0x25, 0x0a, 0x0e, 0x63, 0x6f, 0x6d, 0x70, 0x75, 0x74,
0x61, 0x74, 0x69, 0x6f, 0x6e, 0x5f, 0x69, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0d,
0x63, 0x6f, 0x6d, 0x70, 0x75, 0x74, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x49, 0x64, 0x22, 0xde, 0x01,
0x0a, 0x0a, 0x41, 0x67, 0x65, 0x6e, 0x74, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x12, 0x1d, 0x0a, 0x0a,
0x65, 0x76, 0x65, 0x6e, 0x74, 0x5f, 0x74, 0x79, 0x70, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09,
0x52, 0x09, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x54, 0x79, 0x70, 0x65, 0x12, 0x38, 0x0a, 0x09, 0x74,
0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a,
0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66,
0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, 0x09, 0x74, 0x69, 0x6d, 0x65,
0x73, 0x74, 0x61, 0x6d, 0x70, 0x12, 0x25, 0x0a, 0x0e, 0x63, 0x6f, 0x6d, 0x70, 0x75, 0x74, 0x61,
0x74, 0x69, 0x6f, 0x6e, 0x5f, 0x69, 0x64, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0d, 0x63,
0x6f, 0x6d, 0x70, 0x75, 0x74, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x49, 0x64, 0x12, 0x18, 0x0a, 0x07,
0x64, 0x65, 0x74, 0x61, 0x69, 0x6c, 0x73, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x07, 0x64,
0x65, 0x74, 0x61, 0x69, 0x6c, 0x73, 0x12, 0x1e, 0x0a, 0x0a, 0x6f, 0x72, 0x69, 0x67, 0x69, 0x6e,
0x61, 0x74, 0x6f, 0x72, 0x18, 0x05, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0a, 0x6f, 0x72, 0x69, 0x67,
0x69, 0x6e, 0x61, 0x74, 0x6f, 0x72, 0x12, 0x16, 0x0a, 0x06, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73,
0x18, 0x06, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x22, 0x9b,
0x01, 0x0a, 0x08, 0x41, 0x67, 0x65, 0x6e, 0x74, 0x4c, 0x6f, 0x67, 0x12, 0x18, 0x0a, 0x07, 0x6d,
0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x6d, 0x65,
0x73, 0x73, 0x61, 0x67, 0x65, 0x12, 0x25, 0x0a, 0x0e, 0x63, 0x6f, 0x6d, 0x70, 0x75, 0x74, 0x61,
0x74, 0x69, 0x6f, 0x6e, 0x5f, 0x69, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0d, 0x63,
0x6f, 0x6d, 0x70, 0x75, 0x74, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x49, 0x64, 0x12, 0x14, 0x0a, 0x05,
0x6c, 0x65, 0x76, 0x65, 0x6c, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x6c, 0x65, 0x76,
0x65, 0x6c, 0x12, 0x38, 0x0a, 0x09, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x18,
0x04, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70,
0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d,
0x70, 0x52, 0x09, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x22, 0xbb, 0x01, 0x0a,
0x13, 0x43, 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x4d, 0x65, 0x73,
0x73, 0x61, 0x67, 0x65, 0x12, 0x30, 0x0a, 0x09, 0x61, 0x67, 0x65, 0x6e, 0x74, 0x5f, 0x6c, 0x6f,
0x67, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x11, 0x2e, 0x6d, 0x61, 0x6e, 0x61, 0x67, 0x65,
0x72, 0x2e, 0x41, 0x67, 0x65, 0x6e, 0x74, 0x4c, 0x6f, 0x67, 0x48, 0x00, 0x52, 0x08, 0x61, 0x67,
0x65, 0x6e, 0x74, 0x4c, 0x6f, 0x67, 0x12, 0x36, 0x0a, 0x0b, 0x61, 0x67, 0x65, 0x6e, 0x74, 0x5f,
0x65, 0x76, 0x65, 0x6e, 0x74, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x13, 0x2e, 0x6d, 0x61,
0x6e, 0x61, 0x67, 0x65, 0x72, 0x2e, 0x41, 0x67, 0x65, 0x6e, 0x74, 0x45, 0x76, 0x65, 0x6e, 0x74,
0x48, 0x00, 0x52, 0x0a, 0x61, 0x67, 0x65, 0x6e, 0x74, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x12, 0x2f,
0x0a, 0x07, 0x72, 0x75, 0x6e, 0x5f, 0x72, 0x65, 0x73, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32,
0x14, 0x2e, 0x6d, 0x61, 0x6e, 0x61, 0x67, 0x65, 0x72, 0x2e, 0x52, 0x75, 0x6e, 0x52, 0x65, 0x73,
0x70, 0x6f, 0x6e, 0x73, 0x65, 0x48, 0x00, 0x52, 0x06, 0x72, 0x75, 0x6e, 0x52, 0x65, 0x73, 0x42,
0x09, 0x0a, 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x22, 0x90, 0x01, 0x0a, 0x13, 0x53,
0x65, 0x72, 0x76, 0x65, 0x72, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x4d, 0x65, 0x73, 0x73, 0x61,
0x67, 0x65, 0x12, 0x34, 0x0a, 0x06, 0x72, 0x75, 0x6e, 0x52, 0x65, 0x71, 0x18, 0x01, 0x20, 0x01,
0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x6d, 0x61, 0x6e, 0x61, 0x67, 0x65, 0x72, 0x2e, 0x43, 0x6f, 0x6d,
0x70, 0x75, 0x74, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x75, 0x6e, 0x52, 0x65, 0x71, 0x48, 0x00,
0x52, 0x06, 0x72, 0x75, 0x6e, 0x52, 0x65, 0x71, 0x12, 0x38, 0x0a, 0x0c, 0x74, 0x65, 0x72, 0x6d,
0x69, 0x6e, 0x61, 0x74, 0x65, 0x52, 0x65, 0x71, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x12,
0x2e, 0x6d, 0x61, 0x6e, 0x61, 0x67, 0x65, 0x72, 0x2e, 0x54, 0x65, 0x72, 0x6d, 0x69, 0x6e, 0x61,
0x74, 0x65, 0x48, 0x00, 0x52, 0x0c, 0x74, 0x65, 0x72, 0x6d, 0x69, 0x6e, 0x61, 0x74, 0x65, 0x52,
0x65, 0x71, 0x42, 0x09, 0x0a, 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x22, 0xb6, 0x02,
0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x22, 0x38, 0x0a, 0x0f, 0x53, 0x74, 0x6f, 0x70,
0x43, 0x6f, 0x6d, 0x70, 0x75, 0x74, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x25, 0x0a, 0x0e, 0x63,
0x6f, 0x6d, 0x70, 0x75, 0x74, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20,
0x01, 0x28, 0x09, 0x52, 0x0d, 0x63, 0x6f, 0x6d, 0x70, 0x75, 0x74, 0x61, 0x74, 0x69, 0x6f, 0x6e,
0x49, 0x64, 0x22, 0x53, 0x0a, 0x0b, 0x52, 0x75, 0x6e, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73,
0x65, 0x12, 0x1d, 0x0a, 0x0a, 0x61, 0x67, 0x65, 0x6e, 0x74, 0x5f, 0x70, 0x6f, 0x72, 0x74, 0x18,
0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x61, 0x67, 0x65, 0x6e, 0x74, 0x50, 0x6f, 0x72, 0x74,
0x12, 0x25, 0x0a, 0x0e, 0x63, 0x6f, 0x6d, 0x70, 0x75, 0x74, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x5f,
0x69, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0d, 0x63, 0x6f, 0x6d, 0x70, 0x75, 0x74,
0x61, 0x74, 0x69, 0x6f, 0x6e, 0x49, 0x64, 0x22, 0xde, 0x01, 0x0a, 0x0a, 0x41, 0x67, 0x65, 0x6e,
0x74, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x12, 0x1d, 0x0a, 0x0a, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x5f,
0x74, 0x79, 0x70, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x65, 0x76, 0x65, 0x6e,
0x74, 0x54, 0x79, 0x70, 0x65, 0x12, 0x38, 0x0a, 0x09, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61,
0x6d, 0x70, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c,
0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73,
0x74, 0x61, 0x6d, 0x70, 0x52, 0x09, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x12,
0x25, 0x0a, 0x0e, 0x63, 0x6f, 0x6d, 0x70, 0x75, 0x74, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x5f, 0x69,
0x64, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0d, 0x63, 0x6f, 0x6d, 0x70, 0x75, 0x74, 0x61,
0x74, 0x69, 0x6f, 0x6e, 0x49, 0x64, 0x12, 0x18, 0x0a, 0x07, 0x64, 0x65, 0x74, 0x61, 0x69, 0x6c,
0x73, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x07, 0x64, 0x65, 0x74, 0x61, 0x69, 0x6c, 0x73,
0x12, 0x1e, 0x0a, 0x0a, 0x6f, 0x72, 0x69, 0x67, 0x69, 0x6e, 0x61, 0x74, 0x6f, 0x72, 0x18, 0x05,
0x20, 0x01, 0x28, 0x09, 0x52, 0x0a, 0x6f, 0x72, 0x69, 0x67, 0x69, 0x6e, 0x61, 0x74, 0x6f, 0x72,
0x12, 0x16, 0x0a, 0x06, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x18, 0x06, 0x20, 0x01, 0x28, 0x09,
0x52, 0x06, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x22, 0x9b, 0x01, 0x0a, 0x08, 0x41, 0x67, 0x65,
0x6e, 0x74, 0x4c, 0x6f, 0x67, 0x12, 0x18, 0x0a, 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65,
0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x12,
0x25, 0x0a, 0x0e, 0x63, 0x6f, 0x6d, 0x70, 0x75, 0x74, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x5f, 0x69,
0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0d, 0x63, 0x6f, 0x6d, 0x70, 0x75, 0x74, 0x61,
0x74, 0x69, 0x6f, 0x6e, 0x49, 0x64, 0x12, 0x14, 0x0a, 0x05, 0x6c, 0x65, 0x76, 0x65, 0x6c, 0x18,
0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x6c, 0x65, 0x76, 0x65, 0x6c, 0x12, 0x38, 0x0a, 0x09,
0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0b, 0x32,
0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75,
0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, 0x09, 0x74, 0x69, 0x6d,
0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x22, 0xbb, 0x01, 0x0a, 0x13, 0x43, 0x6c, 0x69, 0x65, 0x6e,
0x74, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x12, 0x30,
0x0a, 0x09, 0x61, 0x67, 0x65, 0x6e, 0x74, 0x5f, 0x6c, 0x6f, 0x67, 0x18, 0x01, 0x20, 0x01, 0x28,
0x0b, 0x32, 0x11, 0x2e, 0x6d, 0x61, 0x6e, 0x61, 0x67, 0x65, 0x72, 0x2e, 0x41, 0x67, 0x65, 0x6e,
0x74, 0x4c, 0x6f, 0x67, 0x48, 0x00, 0x52, 0x08, 0x61, 0x67, 0x65, 0x6e, 0x74, 0x4c, 0x6f, 0x67,
0x12, 0x36, 0x0a, 0x0b, 0x61, 0x67, 0x65, 0x6e, 0x74, 0x5f, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x18,
0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x13, 0x2e, 0x6d, 0x61, 0x6e, 0x61, 0x67, 0x65, 0x72, 0x2e,
0x41, 0x67, 0x65, 0x6e, 0x74, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x48, 0x00, 0x52, 0x0a, 0x61, 0x67,
0x65, 0x6e, 0x74, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x12, 0x2f, 0x0a, 0x07, 0x72, 0x75, 0x6e, 0x5f,
0x72, 0x65, 0x73, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x14, 0x2e, 0x6d, 0x61, 0x6e, 0x61,
0x67, 0x65, 0x72, 0x2e, 0x52, 0x75, 0x6e, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x48,
0x00, 0x52, 0x06, 0x72, 0x75, 0x6e, 0x52, 0x65, 0x73, 0x42, 0x09, 0x0a, 0x07, 0x6d, 0x65, 0x73,
0x73, 0x61, 0x67, 0x65, 0x22, 0xd6, 0x01, 0x0a, 0x13, 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, 0x53,
0x74, 0x72, 0x65, 0x61, 0x6d, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x12, 0x34, 0x0a, 0x06,
0x72, 0x75, 0x6e, 0x52, 0x65, 0x71, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x6d,
0x61, 0x6e, 0x61, 0x67, 0x65, 0x72, 0x2e, 0x43, 0x6f, 0x6d, 0x70, 0x75, 0x74, 0x61, 0x74, 0x69,
0x6f, 0x6e, 0x52, 0x75, 0x6e, 0x52, 0x65, 0x71, 0x48, 0x00, 0x52, 0x06, 0x72, 0x75, 0x6e, 0x52,
0x65, 0x71, 0x12, 0x38, 0x0a, 0x0c, 0x74, 0x65, 0x72, 0x6d, 0x69, 0x6e, 0x61, 0x74, 0x65, 0x52,
0x65, 0x71, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x12, 0x2e, 0x6d, 0x61, 0x6e, 0x61, 0x67,
0x65, 0x72, 0x2e, 0x54, 0x65, 0x72, 0x6d, 0x69, 0x6e, 0x61, 0x74, 0x65, 0x48, 0x00, 0x52, 0x0c,
0x74, 0x65, 0x72, 0x6d, 0x69, 0x6e, 0x61, 0x74, 0x65, 0x52, 0x65, 0x71, 0x12, 0x44, 0x0a, 0x0f,
0x73, 0x74, 0x6f, 0x70, 0x43, 0x6f, 0x6d, 0x70, 0x75, 0x74, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x18,
0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x18, 0x2e, 0x6d, 0x61, 0x6e, 0x61, 0x67, 0x65, 0x72, 0x2e,
0x53, 0x74, 0x6f, 0x70, 0x43, 0x6f, 0x6d, 0x70, 0x75, 0x74, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x48,
0x00, 0x52, 0x0f, 0x73, 0x74, 0x6f, 0x70, 0x43, 0x6f, 0x6d, 0x70, 0x75, 0x74, 0x61, 0x74, 0x69,
0x6f, 0x6e, 0x42, 0x09, 0x0a, 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x22, 0xb6, 0x02,
0x0a, 0x11, 0x43, 0x6f, 0x6d, 0x70, 0x75, 0x74, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x75, 0x6e,
0x52, 0x65, 0x71, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52,
0x02, 0x69, 0x64, 0x12, 0x12, 0x0a, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28,
@@ -942,40 +1011,42 @@ func file_manager_manager_proto_rawDescGZIP() []byte {
return file_manager_manager_proto_rawDescData
}
var file_manager_manager_proto_msgTypes = make([]protoimpl.MessageInfo, 11)
var file_manager_manager_proto_goTypes = []interface{}{
var file_manager_manager_proto_msgTypes = make([]protoimpl.MessageInfo, 12)
var file_manager_manager_proto_goTypes = []any{
(*Terminate)(nil), // 0: manager.Terminate
(*RunResponse)(nil), // 1: manager.RunResponse
(*AgentEvent)(nil), // 2: manager.AgentEvent
(*AgentLog)(nil), // 3: manager.AgentLog
(*ClientStreamMessage)(nil), // 4: manager.ClientStreamMessage
(*ServerStreamMessage)(nil), // 5: manager.ServerStreamMessage
(*ComputationRunReq)(nil), // 6: manager.ComputationRunReq
(*ResultConsumer)(nil), // 7: manager.ResultConsumer
(*Dataset)(nil), // 8: manager.Dataset
(*Algorithm)(nil), // 9: manager.Algorithm
(*AgentConfig)(nil), // 10: manager.AgentConfig
(*timestamppb.Timestamp)(nil), // 11: google.protobuf.Timestamp
(*StopComputation)(nil), // 1: manager.StopComputation
(*RunResponse)(nil), // 2: manager.RunResponse
(*AgentEvent)(nil), // 3: manager.AgentEvent
(*AgentLog)(nil), // 4: manager.AgentLog
(*ClientStreamMessage)(nil), // 5: manager.ClientStreamMessage
(*ServerStreamMessage)(nil), // 6: manager.ServerStreamMessage
(*ComputationRunReq)(nil), // 7: manager.ComputationRunReq
(*ResultConsumer)(nil), // 8: manager.ResultConsumer
(*Dataset)(nil), // 9: manager.Dataset
(*Algorithm)(nil), // 10: manager.Algorithm
(*AgentConfig)(nil), // 11: manager.AgentConfig
(*timestamppb.Timestamp)(nil), // 12: google.protobuf.Timestamp
}
var file_manager_manager_proto_depIdxs = []int32{
11, // 0: manager.AgentEvent.timestamp:type_name -> google.protobuf.Timestamp
11, // 1: manager.AgentLog.timestamp:type_name -> google.protobuf.Timestamp
3, // 2: manager.ClientStreamMessage.agent_log:type_name -> manager.AgentLog
2, // 3: manager.ClientStreamMessage.agent_event:type_name -> manager.AgentEvent
1, // 4: manager.ClientStreamMessage.run_res:type_name -> manager.RunResponse
6, // 5: manager.ServerStreamMessage.runReq:type_name -> manager.ComputationRunReq
12, // 0: manager.AgentEvent.timestamp:type_name -> google.protobuf.Timestamp
12, // 1: manager.AgentLog.timestamp:type_name -> google.protobuf.Timestamp
4, // 2: manager.ClientStreamMessage.agent_log:type_name -> manager.AgentLog
3, // 3: manager.ClientStreamMessage.agent_event:type_name -> manager.AgentEvent
2, // 4: manager.ClientStreamMessage.run_res:type_name -> manager.RunResponse
7, // 5: manager.ServerStreamMessage.runReq:type_name -> manager.ComputationRunReq
0, // 6: manager.ServerStreamMessage.terminateReq:type_name -> manager.Terminate
8, // 7: manager.ComputationRunReq.datasets:type_name -> manager.Dataset
9, // 8: manager.ComputationRunReq.algorithm:type_name -> manager.Algorithm
7, // 9: manager.ComputationRunReq.result_consumers:type_name -> manager.ResultConsumer
10, // 10: manager.ComputationRunReq.agent_config:type_name -> manager.AgentConfig
4, // 11: manager.ManagerService.Process:input_type -> manager.ClientStreamMessage
5, // 12: manager.ManagerService.Process:output_type -> manager.ServerStreamMessage
12, // [12:13] is the sub-list for method output_type
11, // [11:12] is the sub-list for method input_type
11, // [11:11] is the sub-list for extension type_name
11, // [11:11] is the sub-list for extension extendee
0, // [0:11] is the sub-list for field type_name
1, // 7: manager.ServerStreamMessage.stopComputation:type_name -> manager.StopComputation
9, // 8: manager.ComputationRunReq.datasets:type_name -> manager.Dataset
10, // 9: manager.ComputationRunReq.algorithm:type_name -> manager.Algorithm
8, // 10: manager.ComputationRunReq.result_consumers:type_name -> manager.ResultConsumer
11, // 11: manager.ComputationRunReq.agent_config:type_name -> manager.AgentConfig
5, // 12: manager.ManagerService.Process:input_type -> manager.ClientStreamMessage
6, // 13: manager.ManagerService.Process:output_type -> manager.ServerStreamMessage
13, // [13:14] is the sub-list for method output_type
12, // [12:13] is the sub-list for method input_type
12, // [12:12] is the sub-list for extension type_name
12, // [12:12] is the sub-list for extension extendee
0, // [0:12] is the sub-list for field type_name
}
func init() { file_manager_manager_proto_init() }
@@ -984,7 +1055,7 @@ func file_manager_manager_proto_init() {
return
}
if !protoimpl.UnsafeEnabled {
file_manager_manager_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} {
file_manager_manager_proto_msgTypes[0].Exporter = func(v any, i int) any {
switch v := v.(*Terminate); i {
case 0:
return &v.state
@@ -996,7 +1067,19 @@ func file_manager_manager_proto_init() {
return nil
}
}
file_manager_manager_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} {
file_manager_manager_proto_msgTypes[1].Exporter = func(v any, i int) any {
switch v := v.(*StopComputation); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
file_manager_manager_proto_msgTypes[2].Exporter = func(v any, i int) any {
switch v := v.(*RunResponse); i {
case 0:
return &v.state
@@ -1008,7 +1091,7 @@ func file_manager_manager_proto_init() {
return nil
}
}
file_manager_manager_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} {
file_manager_manager_proto_msgTypes[3].Exporter = func(v any, i int) any {
switch v := v.(*AgentEvent); i {
case 0:
return &v.state
@@ -1020,7 +1103,7 @@ func file_manager_manager_proto_init() {
return nil
}
}
file_manager_manager_proto_msgTypes[3].Exporter = func(v interface{}, i int) interface{} {
file_manager_manager_proto_msgTypes[4].Exporter = func(v any, i int) any {
switch v := v.(*AgentLog); i {
case 0:
return &v.state
@@ -1032,7 +1115,7 @@ func file_manager_manager_proto_init() {
return nil
}
}
file_manager_manager_proto_msgTypes[4].Exporter = func(v interface{}, i int) interface{} {
file_manager_manager_proto_msgTypes[5].Exporter = func(v any, i int) any {
switch v := v.(*ClientStreamMessage); i {
case 0:
return &v.state
@@ -1044,7 +1127,7 @@ func file_manager_manager_proto_init() {
return nil
}
}
file_manager_manager_proto_msgTypes[5].Exporter = func(v interface{}, i int) interface{} {
file_manager_manager_proto_msgTypes[6].Exporter = func(v any, i int) any {
switch v := v.(*ServerStreamMessage); i {
case 0:
return &v.state
@@ -1056,7 +1139,7 @@ func file_manager_manager_proto_init() {
return nil
}
}
file_manager_manager_proto_msgTypes[6].Exporter = func(v interface{}, i int) interface{} {
file_manager_manager_proto_msgTypes[7].Exporter = func(v any, i int) any {
switch v := v.(*ComputationRunReq); i {
case 0:
return &v.state
@@ -1068,7 +1151,7 @@ func file_manager_manager_proto_init() {
return nil
}
}
file_manager_manager_proto_msgTypes[7].Exporter = func(v interface{}, i int) interface{} {
file_manager_manager_proto_msgTypes[8].Exporter = func(v any, i int) any {
switch v := v.(*ResultConsumer); i {
case 0:
return &v.state
@@ -1080,7 +1163,7 @@ func file_manager_manager_proto_init() {
return nil
}
}
file_manager_manager_proto_msgTypes[8].Exporter = func(v interface{}, i int) interface{} {
file_manager_manager_proto_msgTypes[9].Exporter = func(v any, i int) any {
switch v := v.(*Dataset); i {
case 0:
return &v.state
@@ -1092,7 +1175,7 @@ func file_manager_manager_proto_init() {
return nil
}
}
file_manager_manager_proto_msgTypes[9].Exporter = func(v interface{}, i int) interface{} {
file_manager_manager_proto_msgTypes[10].Exporter = func(v any, i int) any {
switch v := v.(*Algorithm); i {
case 0:
return &v.state
@@ -1104,7 +1187,7 @@ func file_manager_manager_proto_init() {
return nil
}
}
file_manager_manager_proto_msgTypes[10].Exporter = func(v interface{}, i int) interface{} {
file_manager_manager_proto_msgTypes[11].Exporter = func(v any, i int) any {
switch v := v.(*AgentConfig); i {
case 0:
return &v.state
@@ -1117,14 +1200,15 @@ func file_manager_manager_proto_init() {
}
}
}
file_manager_manager_proto_msgTypes[4].OneofWrappers = []interface{}{
file_manager_manager_proto_msgTypes[5].OneofWrappers = []any{
(*ClientStreamMessage_AgentLog)(nil),
(*ClientStreamMessage_AgentEvent)(nil),
(*ClientStreamMessage_RunRes)(nil),
}
file_manager_manager_proto_msgTypes[5].OneofWrappers = []interface{}{
file_manager_manager_proto_msgTypes[6].OneofWrappers = []any{
(*ServerStreamMessage_RunReq)(nil),
(*ServerStreamMessage_TerminateReq)(nil),
(*ServerStreamMessage_StopComputation)(nil),
}
type x struct{}
out := protoimpl.TypeBuilder{
@@ -1132,7 +1216,7 @@ func file_manager_manager_proto_init() {
GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
RawDescriptor: file_manager_manager_proto_rawDesc,
NumEnums: 0,
NumMessages: 11,
NumMessages: 12,
NumExtensions: 0,
NumServices: 1,
},
+7 -6
View File
@@ -3,7 +3,7 @@
// Code generated by protoc-gen-go-grpc. DO NOT EDIT.
// versions:
// - protoc-gen-go-grpc v1.3.0
// - protoc-gen-go-grpc v1.4.0
// - protoc v4.25.3
// source: manager/manager.proto
@@ -18,8 +18,8 @@ import (
// This is a compile-time assertion to ensure that this generated file
// is compatible with the grpc package it is being compiled against.
// Requires gRPC-Go v1.32.0 or later.
const _ = grpc.SupportPackageIsVersion7
// Requires gRPC-Go v1.62.0 or later.
const _ = grpc.SupportPackageIsVersion8
const (
ManagerService_Process_FullMethodName = "/manager.ManagerService/Process"
@@ -41,11 +41,12 @@ func NewManagerServiceClient(cc grpc.ClientConnInterface) ManagerServiceClient {
}
func (c *managerServiceClient) Process(ctx context.Context, opts ...grpc.CallOption) (ManagerService_ProcessClient, error) {
stream, err := c.cc.NewStream(ctx, &ManagerService_ServiceDesc.Streams[0], ManagerService_Process_FullMethodName, opts...)
cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...)
stream, err := c.cc.NewStream(ctx, &ManagerService_ServiceDesc.Streams[0], ManagerService_Process_FullMethodName, cOpts...)
if err != nil {
return nil, err
}
x := &managerServiceProcessClient{stream}
x := &managerServiceProcessClient{ClientStream: stream}
return x, nil
}
@@ -100,7 +101,7 @@ func RegisterManagerServiceServer(s grpc.ServiceRegistrar, srv ManagerServiceSer
}
func _ManagerService_Process_Handler(srv interface{}, stream grpc.ServerStream) error {
return srv.(ManagerServiceServer).Process(&managerServiceProcessServer{stream})
return srv.(ManagerServiceServer).Process(&managerServiceProcessServer{ServerStream: stream})
}
type ManagerService_ProcessServer interface {
+2 -1
View File
@@ -16,6 +16,7 @@ import (
"github.com/mdlayher/vsock"
"github.com/ultravioletrs/cocos/agent"
"github.com/ultravioletrs/cocos/manager"
"github.com/ultravioletrs/cocos/manager/qemu"
pkgmanager "github.com/ultravioletrs/cocos/pkg/manager"
"golang.org/x/crypto/sha3"
"google.golang.org/protobuf/proto"
@@ -89,7 +90,7 @@ func main() {
}
func SendAgentConfig(cid uint32, ac agent.Computation) error {
conn, err := vsock.Dial(cid, manager.VsockConfigPort, nil)
conn, err := vsock.Dial(cid, qemu.VsockConfigPort, nil)
if err != nil {
return err
}