COCOS-77 - Stream agent logs and events over gRPC (#78)

* Refactor GRPC manager service and client

The manager service and client have been restructured for stream communication, facilitating real-time agent events, logs, and run responses. The `Run` RPC is replaced by the `Process` stream RPC, enabling bidirectional streaming between clients and the manager service. This allows continuous interchange of different message types including `WhoAmIRequest`, `AgentLog`, `AgentEvent`, and `RunResponse`.

Several message types have been adjusted and new fields introduced, like `AgentPort` in `RunResponse` and various agent-config attributes including CA files and instance IDs, to support TLS client authentication and distinguish between agent instances.

We've also incorporated `google.protobuf.Timestamp` in `AgentEvent` for precise event logging. The client code reflects these modifications with updated method calls and stream handling logic for ongoing communication. Moreover, the updates necessitate corresponding changes throughout service, grpc, and sdk layers to interoperate with the new streaming approach.

The transition to streaming paves the way for a more interactive, flexible communication system that can accommodate future expansion and real-time monitoring features.

Signed-off-by: SammyOina <sammyoina@gmail.com>

* fix lint

Signed-off-by: SammyOina <sammyoina@gmail.com>

* Update GitHub Actions to Latest Versions

Upgraded GitHub Actions 'checkout' to version 4 and 'setup-go' to version 5 across various workflow files to leverage the latest features and improvements for better performance and reliability. This also ensures compatibility with Go version 1.21.x which is specified in the workflows.

Signed-off-by: SammyOina <sammyoina@gmail.com>

* Refactor event handling and logging

Reworked event and log processing to use channels instead of direct HTTP calls. Removed obsolete events package and consolidated event structures, leading to cleaner and more maintainable code. Updated agent events to use channels, enhanced error handling in log forwarding, and simplified manager `New` function signature to accept an event channel directly.

- Removed `events` and `agentevents` packages to reduce complexity.
- Replaced direct event server communication with internal channel usage.
- Introduced `AgentEvent` struct in events.go for standardized event objects.
- Adapted `managerService` to dispatch events and logs through channels.
- Streamlined manager construction by removing the now-unnecessary event service and host IP parameters.

This change results in a more robust and easier to extend event and log management system within the agent-manager interaction.

Signed-off-by: SammyOina <sammyoina@gmail.com>

* fix ci

Signed-off-by: SammyOina <sammyoina@gmail.com>

* remove unused code

Signed-off-by: SammyOina <sammyoina@gmail.com>

* add comments

Signed-off-by: SammyOina <sammyoina@gmail.com>

---------

Signed-off-by: SammyOina <sammyoina@gmail.com>
This commit is contained in:
Sammy Kerata Oina
2024-02-07 14:08:39 +03:00
committed by GitHub
parent 3e2be03047
commit 8975e28437
29 changed files with 877 additions and 659 deletions
-5
View File
@@ -1,5 +1,3 @@
# Docker: Environment variables in Compose
## Jaeger
COCOS_JAEGER_PORT=6831
COCOS_JAEGER_FRONTEND=16686
@@ -8,9 +6,6 @@ COCOS_JAEGER_CONFIGS=5778
COCOS_JAEGER_URL=http://jaeger:14268/api/traces
COCOS_JAEGER_TRACE_RATIO=1.0
## Notification Server
COCOS_NOTIFICATION_SERVER_URL="http://localhost:9000"
## Core Services
### Manager
+2 -2
View File
@@ -24,10 +24,10 @@ jobs:
steps:
- name: Checkout code
uses: actions/checkout@v2
uses: actions/checkout@v4
- name: Install Go
uses: actions/setup-go@v3
uses: actions/setup-go@v5
with:
go-version: 1.21.x
+1 -1
View File
@@ -16,7 +16,7 @@ jobs:
sudo apt-get upgrade -y
- name: Install Go
uses: actions/setup-go@v4
uses: actions/setup-go@v5
with:
go-version: 1.21.x
cache-dependency-path: "go.sum"
+2 -2
View File
@@ -14,10 +14,10 @@ jobs:
steps:
- name: Checkout code
uses: actions/checkout@v2
uses: actions/checkout@v4
- name: Install Go
uses: actions/setup-go@v3
uses: actions/setup-go@v5
with:
go-version: 1.21.x
+13 -10
View File
@@ -7,22 +7,32 @@ import (
"time"
"github.com/mdlayher/vsock"
"github.com/ultravioletrs/cocos/manager/agentevents"
)
const VsockEventsPort uint32 = 9998
type service struct {
service string
computationID string
conn *vsock.Conn
}
type AgentEvent struct {
EventType string `json:"event_type"`
Timestamp time.Time `json:"timestamp"`
ComputationID string `json:"computation_id,omitempty"`
Details json.RawMessage `json:"details,omitempty"`
Originator string `json:"originator"`
Status string `json:"status,omitempty"`
}
type Service interface {
SendEvent(event, status string, details json.RawMessage) error
Close() error
}
func New(svc, computationID string) (Service, error) {
conn, err := vsock.Dial(vsock.Host, agentevents.VsockEventsPort, nil)
conn, err := vsock.Dial(vsock.Host, VsockEventsPort, nil)
if err != nil {
return nil, err
}
@@ -34,14 +44,7 @@ func New(svc, computationID string) (Service, error) {
}
func (s *service) SendEvent(event, status string, details json.RawMessage) error {
body := struct {
EventType string `json:"event_type"`
Timestamp time.Time `json:"timestamp"`
ComputationID string `json:"computation_id,omitempty"`
Details json.RawMessage `json:"details,omitempty"`
Originator string `json:"originator"`
Status string `json:"status,omitempty"`
}{
body := AgentEvent{
EventType: event,
Timestamp: time.Now(),
ComputationID: s.computationID,
+9 -10
View File
@@ -56,19 +56,18 @@ type Service interface {
}
type agentService struct {
computation Computation
algorithms [][]byte
datasets [][]byte
result []byte
sm *StateMachine
runError error
eventSvc events.Service
computation Computation // Holds the current computation request details.
algorithms [][]byte // Stores the algorithms received for the computation.
datasets [][]byte // Stores the datasets received for the computation.
result []byte // Stores the result of the computation.
sm *StateMachine // Manages the state transitions of the agent service.
runError error // Stores any error encountered during the computation run.
eventSvc events.Service // Service for publishing events related to computation.
}
const (
socketPath = "unix_socket"
pyRuntime = "python3"
notificationTopic = "agent"
socketPath = "unix_socket"
pyRuntime = "python3"
)
var _ Service = (*agentService)(nil)
+1 -1
View File
@@ -25,7 +25,7 @@ func (cli *CLI) NewRunCmd() *cobra.Command {
log.Fatalf("Error unmarshling computation json: %v", err)
}
req := manager.Computation{
req := manager.ComputationRunReq{
Id: cmp.ID,
Description: cmp.Description,
Name: cmp.Name,
+15 -12
View File
@@ -6,15 +6,17 @@ import (
"fmt"
"log"
"os"
"strings"
mglog "github.com/absmach/magistrala/logger"
"github.com/spf13/cobra"
"github.com/spf13/pflag"
"github.com/ultravioletrs/cocos/cli"
"github.com/ultravioletrs/cocos/internal/env"
managersvc "github.com/ultravioletrs/cocos/manager"
"github.com/ultravioletrs/cocos/manager/qemu"
"github.com/ultravioletrs/cocos/pkg/clients/grpc"
"github.com/ultravioletrs/cocos/pkg/clients/grpc/agent"
"github.com/ultravioletrs/cocos/pkg/clients/grpc/manager"
"github.com/ultravioletrs/cocos/pkg/sdk"
)
@@ -23,10 +25,11 @@ const (
envPrefixAgentGRPC = "AGENT_GRPC_"
envPrefixManagerGRPC = "MANAGER_GRPC_"
completion = "completion"
envPrefixQemu = "MANAGER_QEMU_"
)
type config struct {
LogLevel string `env:"AGENT_LOG_LEVEL" envDefault:"info"`
LogLevel string `env:"AGENT_LOG_LEVEL" envDefault:"info"`
}
func main() {
@@ -45,11 +48,6 @@ func main() {
logger.Error(fmt.Sprintf("failed to load %s gRPC client configuration : %s", svcName, err))
return
}
managerGRPCConfig := grpc.Config{}
if err := env.Parse(&managerGRPCConfig, env.Options{Prefix: envPrefixManagerGRPC}); err != nil {
logger.Error(fmt.Sprintf("failed to load %s gRPC client configuration : %s", svcName, err))
return
}
agentGRPCClient, agentClient, err := agent.NewAgentClient(agentGRPCConfig)
if err != nil {
@@ -58,15 +56,20 @@ func main() {
}
defer agentGRPCClient.Close()
managerGRPCClient, managerClient, err := manager.NewManagerClient(managerGRPCConfig)
if err != nil {
logger.Error(err.Error())
qemuCfg := qemu.Config{}
if err := env.Parse(&qemuCfg, env.Options{Prefix: envPrefixQemu}); err != nil {
logger.Error(fmt.Sprintf("failed to load QEMU configuration: %s", err))
return
}
defer managerGRPCClient.Close()
exe, args, err := qemu.ExecutableAndArgs(qemuCfg)
if err != nil {
logger.Error(fmt.Sprintf("failed to parse QEMU configuration: %s", err))
return
}
logger.Info(fmt.Sprintf("%s %s", exe, strings.Join(args, " ")))
agentSDK := sdk.NewAgentSDK(logger, agentClient)
managerSDK := sdk.NewManagerSDK(managerClient, logger)
managerSDK := managersvc.New(qemuCfg, logger, make(chan *managersvc.ClientStreamMessage))
cliSVC := cli.New(agentSDK, managerSDK)
+35 -47
View File
@@ -15,42 +15,34 @@ import (
"github.com/absmach/magistrala/pkg/uuid"
"github.com/ultravioletrs/cocos/internal"
"github.com/ultravioletrs/cocos/internal/env"
"github.com/ultravioletrs/cocos/internal/events"
jaegerclient "github.com/ultravioletrs/cocos/internal/jaeger"
"github.com/ultravioletrs/cocos/internal/server"
grpcserver "github.com/ultravioletrs/cocos/internal/server/grpc"
"github.com/ultravioletrs/cocos/manager"
"github.com/ultravioletrs/cocos/manager/agentevents"
"github.com/ultravioletrs/cocos/manager/api"
managergrpc "github.com/ultravioletrs/cocos/manager/api/grpc"
managerapi "github.com/ultravioletrs/cocos/manager/api/grpc"
"github.com/ultravioletrs/cocos/manager/qemu"
"github.com/ultravioletrs/cocos/manager/tracing"
"github.com/ultravioletrs/cocos/pkg/clients/grpc"
managergrpc "github.com/ultravioletrs/cocos/pkg/clients/grpc/manager"
"go.opentelemetry.io/otel/trace"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc"
"google.golang.org/grpc/reflection"
)
const (
svcName = "manager"
envPrefixHTTP = "MANAGER_HTTP_"
envPrefixGRPC = "MANAGER_GRPC_"
envPrefixAgentGRPC = "AGENT_GRPC_"
envPrefixQemu = "MANAGER_QEMU_"
defSvcGRPCPort = "7001"
defSvcHTTPPort = "9021"
svcName = "manager"
envPrefixGRPC = "MANAGER_GRPC_"
envPrefixQemu = "MANAGER_QEMU_"
defSvcGRPCPort = "7001"
defSvcHTTPPort = "9021"
)
type config struct {
LogLevel string `env:"MANAGER_LOG_LEVEL" envDefault:"info"`
JaegerURL string `env:"COCOS_JAEGER_URL" envDefault:"http://localhost:14268/api/traces"`
InstanceID string `env:"MANAGER_INSTANCE_ID" envDefault:""`
NotificationServerURL string `env:"COCOS_NOTIFICATION_SERVER_URL" envDefault:"http://localhost:9000"`
HostIP string `env:"MANAGER_HOST_IP" envDefault:"localhost"`
LogLevel string `env:"MANAGER_LOG_LEVEL" envDefault:"info"`
JaegerURL string `env:"COCOS_JAEGER_URL" envDefault:"http://localhost:14268/api/traces"`
InstanceID string `env:"MANAGER_INSTANCE_ID" envDefault:""`
}
func main() {
ctx, cancel := context.WithCancel(context.Background())
ctx := context.Background()
g, ctx := errgroup.WithContext(ctx)
var cfg config
@@ -94,40 +86,36 @@ func main() {
}
logger.Info(fmt.Sprintf("%s %s", exe, strings.Join(args, " ")))
agEvents, err := agentevents.New(cfg.NotificationServerURL)
managerGRPCConfig := grpc.Config{}
if err := env.Parse(&managerGRPCConfig, env.Options{Prefix: envPrefixGRPC}); err != nil {
logger.Error(fmt.Sprintf("failed to load %s gRPC client configuration : %s", svcName, err))
return
}
managerGRPCClient, managerClient, err := managergrpc.NewManagerClient(managerGRPCConfig)
if err != nil {
logger.Error(fmt.Sprintf("failed to start agent events service: %s", err))
logger.Error(err.Error())
return
}
errChan := make(chan error)
go agEvents.Forward(ctx, errChan)
go func() {
for err := range errChan {
logger.Warn(err.Error())
}
}()
defer managerGRPCClient.Close()
svc := newService(logger, tracer, qemuCfg, events.New(svcName, cfg.NotificationServerURL), cfg)
httpServerConfig := server.Config{Port: defSvcHTTPPort}
if err := env.Parse(&httpServerConfig, env.Options{Prefix: envPrefixHTTP}); err != nil {
logger.Error(fmt.Sprintf("failed to load %s gRPC server configuration: %s", svcName, err))
pc, err := managerClient.Process(ctx)
if err != nil {
logger.Error(err.Error())
return
}
if err := pc.Send(&manager.ClientStreamMessage{Message: &manager.ClientStreamMessage_WhoamiRequest{}}); err != nil {
logger.Error(err.Error())
return
}
grpcServerConfig := server.Config{Port: defSvcGRPCPort}
if err := env.Parse(&grpcServerConfig, env.Options{Prefix: envPrefixGRPC}); err != nil {
log.Printf("failed to load %s gRPC server configuration: %s", svcName, err.Error())
return
}
registerManagerServiceServer := func(srv *grpc.Server) {
reflection.Register(srv)
manager.RegisterManagerServiceServer(srv, managergrpc.NewServer(svc))
}
gs := grpcserver.New(ctx, cancel, svcName, grpcServerConfig, registerManagerServiceServer, logger)
eventsChan := make(chan *manager.ClientStreamMessage)
svc := newService(logger, tracer, qemuCfg, eventsChan)
mc := managerapi.NewClient(pc, svc, eventsChan)
g.Go(func() error {
return gs.Start()
return mc.Process(ctx)
})
if err := g.Wait(); err != nil {
@@ -140,8 +128,8 @@ func main() {
}
}
func newService(logger *slog.Logger, tracer trace.Tracer, qemuCfg qemu.Config, eventSvc events.Service, cfg config) manager.Service {
svc := manager.New(qemuCfg, logger, eventSvc, cfg.HostIP)
func newService(logger *slog.Logger, tracer trace.Tracer, qemuCfg qemu.Config, eventsChan chan *manager.ClientStreamMessage) manager.Service {
svc := manager.New(qemuCfg, logger, eventsChan)
svc = api.LoggingMiddleware(svc, logger)
counter, latency := internal.MakeMetrics(svcName, "api")
-11
View File
@@ -1,11 +0,0 @@
# Events package
## Overview
The events package provides a simple client for sending events to a server. It is designed to be used in conjunction with a server that can handle and process events.
`service`: A string representing the name of the service. Agent or manager.
`serverUrl`: A string representing the URL of the events server.
`event`: A string representing the event type of the events.
`computationId`: A string representing the computation ID associated with the events. This can be null for idle events.
The notifications are sent to endpoint `<server_url>/notifications/events` using post method.
-72
View File
@@ -1,72 +0,0 @@
// Copyright (c) Ultraviolet
// SPDX-License-Identifier: Apache-2.0
package events
import (
"bytes"
"encoding/json"
"errors"
"fmt"
"net/http"
"time"
)
var errFailedToCreateNotification = errors.New("failed to create notification on server")
type service struct {
service string
serverUrl string
}
type Event struct {
EventType string `json:"event_type"`
Timestamp time.Time `json:"timestamp"`
ComputationID string `json:"computation_id,omitempty"`
Details json.RawMessage `json:"details,omitempty"`
Originator string `json:"originator"`
Status string `json:"status,omitempty"`
}
type Service interface {
SendEvent(event, computationId, status string, details json.RawMessage) error
SendRaw(body []byte) error
}
func New(svc, serverUrl string) Service {
return &service{
service: svc,
serverUrl: serverUrl,
}
}
func (s *service) SendEvent(event, computationId, status string, details json.RawMessage) error {
body := Event{
EventType: event,
Timestamp: time.Now(),
ComputationID: computationId,
Originator: s.service,
Status: status,
Details: details,
}
jsonBody, err := json.Marshal(body)
if err != nil {
return err
}
return s.SendRaw(jsonBody)
}
func (s *service) SendRaw(body []byte) error {
req, err := http.NewRequest(http.MethodPost, fmt.Sprintf("%s/computations/events", s.serverUrl), bytes.NewReader(body))
if err != nil {
return err
}
req.Header.Set("Content-Type", "application/json")
res, err := http.DefaultClient.Do(req)
if err != nil {
return err
}
if res.StatusCode != http.StatusCreated {
return errFailedToCreateNotification
}
return nil
}
-2
View File
@@ -15,8 +15,6 @@ The service is configured using the environment variables from the following tab
| MANAGER_GRPC_SERVER_KEY | Path to server key in pem format | |
| COCOS_JAEGER_URL | Jaeger server URL | http://localhost:14268/api/traces |
| MANAGER_INSTANCE_ID | Manager service instance ID | |
| COCOS_NOTIFICATION_SERVER_URL | Server to receive notification events from agent. | http:/localhost:9000 |
| MANAGER_HOST_IP | Mnagaer host IP address | localhost |
## Setup
+55
View File
@@ -0,0 +1,55 @@
// Copyright (c) Ultraviolet
// SPDX-License-Identifier: Apache-2.0
package manager
import (
"encoding/json"
"net"
"github.com/mdlayher/vsock"
"github.com/ultravioletrs/cocos/agent/events"
"google.golang.org/protobuf/types/known/timestamppb"
)
func (s *managerService) retrieveAgentEvents() {
l, err := vsock.Listen(events.VsockEventsPort, nil)
if err != nil {
s.logger.Warn(err.Error())
return
}
for {
conn, err := l.Accept()
if err != nil {
s.logger.Warn(err.Error())
continue
}
go s.handleEventsConnections(conn)
}
}
func (s *managerService) handleEventsConnections(conn net.Conn) {
defer conn.Close()
for {
b := make([]byte, messageSize)
n, err := conn.Read(b)
if err != nil {
s.logger.Warn(err.Error())
return
}
var ev events.AgentEvent
if err := json.Unmarshal(b[:n], &ev); err != nil {
s.logger.Warn(err.Error())
continue
}
s.eventsChan <- &ClientStreamMessage{
Message: &ClientStreamMessage_AgentEvent{AgentEvent: &AgentEvent{
EventType: ev.EventType,
Timestamp: timestamppb.New(ev.Timestamp),
ComputationId: ev.ComputationID,
Details: ev.Details,
Originator: ev.Originator,
Status: ev.Status,
}},
}
}
}
-64
View File
@@ -1,64 +0,0 @@
// Copyright (c) Ultraviolet
// SPDX-License-Identifier: Apache-2.0
package agentevents
import (
"context"
"net"
"github.com/mdlayher/vsock"
"github.com/ultravioletrs/cocos/internal/events"
)
const (
VsockEventsPort uint32 = 9998
svc string = "agent"
messageSize int = 1024
)
type service struct {
svc events.Service
listener *vsock.Listener
}
type Service interface {
Forward(ctx context.Context, errChan chan<- error)
}
func New(eventServerUrl string) (Service, error) {
l, err := vsock.Listen(VsockEventsPort, nil)
if err != nil {
return nil, err
}
return &service{
svc: events.New(svc, eventServerUrl),
listener: l,
}, nil
}
func (s *service) Forward(ctx context.Context, errChan chan<- error) {
for {
conn, err := s.listener.Accept()
if err != nil {
errChan <- err
continue
}
go s.handleConnections(conn, errChan)
}
}
func (s *service) handleConnections(conn net.Conn, errCh chan<- error) {
defer conn.Close()
for {
b := make([]byte, messageSize)
n, err := conn.Read(b)
if err != nil {
errCh <- err
return
}
if err := s.svc.SendRaw(b[:n]); err != nil {
errCh <- err
continue
}
}
}
+3 -2
View File
@@ -33,11 +33,11 @@ func (ms *managerService) retrieveAgentLogs() {
continue
}
go ms.handleConnections(conn)
go ms.handleLogsConnections(conn)
}
}
func (ms *managerService) handleConnections(conn net.Conn) {
func (ms *managerService) handleLogsConnections(conn net.Conn) {
defer conn.Close()
for {
b := make([]byte, messageSize)
@@ -52,6 +52,7 @@ func (ms *managerService) handleConnections(conn net.Conn) {
continue
}
ms.logger.Info(fmt.Sprintf("Agent Log, Computation ID: %s, Log: %s", cmpID, string(b[:n])))
ms.eventsChan <- &ClientStreamMessage{Message: &ClientStreamMessage_AgentLog{AgentLog: &AgentLog{ComputationId: cmpID, LogMessage: string(b[:n])}}}
}
}
+40 -58
View File
@@ -4,72 +4,54 @@ package grpc
import (
"context"
"fmt"
"time"
"github.com/go-kit/kit/endpoint"
kitgrpc "github.com/go-kit/kit/transport/grpc"
"github.com/ultravioletrs/cocos/manager"
"google.golang.org/grpc"
"golang.org/x/sync/errgroup"
)
const svcName = "manager.ManagerService"
type grpcClient struct {
run endpoint.Endpoint
timeout time.Duration
type ManagerClient struct {
stream manager.ManagerService_ProcessClient
svc manager.Service
responses chan *manager.ClientStreamMessage
}
// NewClient returns new gRPC client instance.
func NewClient(conn *grpc.ClientConn, timeout time.Duration) manager.ManagerServiceClient {
return &grpcClient{
run: kitgrpc.NewClient(
conn,
svcName,
"Run",
encodeRunRequest,
decodeRunResponse,
manager.RunResponse{},
).Endpoint(),
timeout: timeout,
func NewClient(stream manager.ManagerService_ProcessClient, svc manager.Service, responses chan *manager.ClientStreamMessage) ManagerClient {
return ManagerClient{
stream: stream,
svc: svc,
responses: responses,
}
}
// encodeRunRequest is a transport/grpc.EncodeRequestFunc that
// converts a user-domain runReq to a gRPC request.
func encodeRunRequest(_ context.Context, request interface{}) (interface{}, error) {
req, ok := request.(runReq)
if !ok {
return nil, fmt.Errorf("invalid request type: %T", request)
}
return &manager.RunRequest{
Computation: req.Computation,
}, nil
}
// decodeRunResponse is a transport/grpc.DecodeResponseFunc that
// converts a gRPC RunResponse to a user-domain response.
func decodeRunResponse(_ context.Context, grpcResponse interface{}) (interface{}, error) {
res, ok := grpcResponse.(*manager.RunResponse)
if !ok {
return nil, fmt.Errorf("invalid response type: %T", grpcResponse)
}
return runRes{AgentAddress: res.AgentAddress}, nil
}
func (client grpcClient) Run(ctx context.Context, req *manager.RunRequest, _ ...grpc.CallOption) (*manager.RunResponse, error) {
ctx, cancel := context.WithTimeout(ctx, client.timeout)
defer cancel()
runReq := runReq{
Computation: req.GetComputation(),
}
res, err := client.run(ctx, runReq)
if err != nil {
return nil, err
}
runRes := res.(runRes)
return &manager.RunResponse{AgentAddress: runRes.AgentAddress}, nil
func (client ManagerClient) Process(ctx context.Context) error {
eg, ctx := errgroup.WithContext(ctx)
eg.Go(func() error {
for {
req, err := client.stream.Recv()
if err != nil {
return err
}
port, err := client.svc.Run(ctx, req)
if err != nil {
return err
}
runRes := &manager.ClientStreamMessage_RunRes{RunRes: &manager.RunResponse{AgentPort: port}}
if err := client.stream.Send(&manager.ClientStreamMessage{Message: runRes}); err != nil {
return err
}
}
})
eg.Go(func() error {
for mes := range client.responses {
if err := client.stream.Send(mes); err != nil {
return err
}
}
return nil
})
return eg.Wait()
}
-27
View File
@@ -1,27 +0,0 @@
// Copyright (c) Ultraviolet
// SPDX-License-Identifier: Apache-2.0
package grpc
import (
"context"
"github.com/go-kit/kit/endpoint"
"github.com/ultravioletrs/cocos/manager"
)
func runEndpoint(svc manager.Service) endpoint.Endpoint {
return func(ctx context.Context, request interface{}) (interface{}, error) {
req := request.(runReq)
if err := req.validate(); err != nil {
return runRes{}, err
}
agAddr, err := svc.Run(ctx, req.Computation)
if err != nil {
return runRes{}, err
}
return runRes{AgentAddress: agAddr}, nil
}
}
-15
View File
@@ -1,15 +0,0 @@
// Copyright (c) Ultraviolet
// SPDX-License-Identifier: Apache-2.0
package grpc
import (
"github.com/ultravioletrs/cocos/manager"
)
type runReq struct {
Computation *manager.Computation `json:"computation"`
}
func (req runReq) validate() error {
return nil
}
-7
View File
@@ -1,7 +0,0 @@
// Copyright (c) Ultraviolet
// SPDX-License-Identifier: Apache-2.0
package grpc
type runRes struct {
AgentAddress string `json:"agent_address"`
}
+28 -25
View File
@@ -5,44 +5,47 @@ package grpc
import (
"context"
"github.com/go-kit/kit/transport/grpc"
"github.com/ultravioletrs/cocos/manager"
"golang.org/x/sync/errgroup"
)
type grpcServer struct {
run grpc.Handler
manager.UnimplementedManagerServiceServer
incoming chan *manager.ClientStreamMessage
responses chan *manager.ComputationRunReq
ctx context.Context
}
// NewServer returns new AuthServiceServer instance.
func NewServer(svc manager.Service) manager.ManagerServiceServer {
func NewServer(ctx context.Context, incoming chan *manager.ClientStreamMessage, responses chan *manager.ComputationRunReq) manager.ManagerServiceServer {
return &grpcServer{
run: grpc.NewServer(
runEndpoint(svc),
decodeRunRequest,
encodeRunResponse,
),
incoming: incoming,
responses: responses,
}
}
func decodeRunRequest(_ context.Context, grpcReq interface{}) (interface{}, error) {
req := grpcReq.(*manager.RunRequest)
func (s *grpcServer) Process(stream manager.ManagerService_ProcessServer) error {
eg, _ := errgroup.WithContext(s.ctx)
return runReq{
Computation: req.GetComputation(),
}, nil
}
eg.Go(func() error {
for {
req, err := stream.Recv()
if err != nil {
return err
}
func encodeRunResponse(_ context.Context, response interface{}) (interface{}, error) {
res := response.(runRes)
return &manager.RunResponse{AgentAddress: res.AgentAddress}, nil
}
s.incoming <- req
}
})
func (s *grpcServer) Run(ctx context.Context, req *manager.RunRequest) (*manager.RunResponse, error) {
_, res, err := s.run.ServeGRPC(ctx, req)
if err != nil {
return nil, err
}
rr := res.(*manager.RunResponse)
return rr, nil
eg.Go(func() error {
for resp := range s.responses {
if err := stream.Send(resp); err != nil {
return err
}
}
return nil
})
return eg.Wait()
}
+1 -1
View File
@@ -27,7 +27,7 @@ func LoggingMiddleware(svc manager.Service, logger *slog.Logger) manager.Service
return &loggingMiddleware{logger, svc}
}
func (lm *loggingMiddleware) Run(ctx context.Context, mc *manager.Computation) (agentAddr string, err error) {
func (lm *loggingMiddleware) Run(ctx context.Context, mc *manager.ComputationRunReq) (agentAddr string, err error) {
defer func(begin time.Time) {
message := fmt.Sprintf("Method Run for computation took %s to complete", time.Since(begin))
if err != nil {
+1 -1
View File
@@ -32,7 +32,7 @@ func MetricsMiddleware(svc manager.Service, counter metrics.Counter, latency met
}
}
func (ms *metricsMiddleware) Run(ctx context.Context, mc *manager.Computation) (string, error) {
func (ms *metricsMiddleware) Run(ctx context.Context, mc *manager.ComputationRunReq) (string, error) {
defer func(begin time.Time) {
ms.counter.With("method", "Run").Add(1)
ms.latency.With("method", "Run").Observe(time.Since(begin).Seconds())
+548 -188
View File
@@ -12,6 +12,7 @@ package manager
import (
protoreflect "google.golang.org/protobuf/reflect/protoreflect"
protoimpl "google.golang.org/protobuf/runtime/protoimpl"
timestamppb "google.golang.org/protobuf/types/known/timestamppb"
reflect "reflect"
sync "sync"
)
@@ -23,16 +24,14 @@ const (
_ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20)
)
type RunRequest struct {
type WhoAmIRequest struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
Computation *Computation `protobuf:"bytes,1,opt,name=computation,proto3" json:"computation,omitempty"`
}
func (x *RunRequest) Reset() {
*x = RunRequest{}
func (x *WhoAmIRequest) Reset() {
*x = WhoAmIRequest{}
if protoimpl.UnsafeEnabled {
mi := &file_manager_manager_proto_msgTypes[0]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
@@ -40,13 +39,13 @@ func (x *RunRequest) Reset() {
}
}
func (x *RunRequest) String() string {
func (x *WhoAmIRequest) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*RunRequest) ProtoMessage() {}
func (*WhoAmIRequest) ProtoMessage() {}
func (x *RunRequest) ProtoReflect() protoreflect.Message {
func (x *WhoAmIRequest) ProtoReflect() protoreflect.Message {
mi := &file_manager_manager_proto_msgTypes[0]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
@@ -58,19 +57,310 @@ func (x *RunRequest) ProtoReflect() protoreflect.Message {
return mi.MessageOf(x)
}
// Deprecated: Use RunRequest.ProtoReflect.Descriptor instead.
func (*RunRequest) Descriptor() ([]byte, []int) {
// Deprecated: Use WhoAmIRequest.ProtoReflect.Descriptor instead.
func (*WhoAmIRequest) Descriptor() ([]byte, []int) {
return file_manager_manager_proto_rawDescGZIP(), []int{0}
}
func (x *RunRequest) GetComputation() *Computation {
type RunResponse struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
AgentPort string `protobuf:"bytes,1,opt,name=agent_port,json=agentPort,proto3" json:"agent_port,omitempty"`
}
func (x *RunResponse) Reset() {
*x = RunResponse{}
if protoimpl.UnsafeEnabled {
mi := &file_manager_manager_proto_msgTypes[1]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}
func (x *RunResponse) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*RunResponse) ProtoMessage() {}
func (x *RunResponse) ProtoReflect() protoreflect.Message {
mi := &file_manager_manager_proto_msgTypes[1]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use RunResponse.ProtoReflect.Descriptor instead.
func (*RunResponse) Descriptor() ([]byte, []int) {
return file_manager_manager_proto_rawDescGZIP(), []int{1}
}
func (x *RunResponse) GetAgentPort() string {
if x != nil {
return x.Computation
return x.AgentPort
}
return ""
}
type AgentEvent struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
EventType string `protobuf:"bytes,1,opt,name=event_type,json=eventType,proto3" json:"event_type,omitempty"`
Timestamp *timestamppb.Timestamp `protobuf:"bytes,2,opt,name=timestamp,proto3" json:"timestamp,omitempty"`
ComputationId string `protobuf:"bytes,3,opt,name=computation_id,json=computationId,proto3" json:"computation_id,omitempty"`
Details []byte `protobuf:"bytes,4,opt,name=details,proto3" json:"details,omitempty"`
Originator string `protobuf:"bytes,5,opt,name=originator,proto3" json:"originator,omitempty"`
Status string `protobuf:"bytes,6,opt,name=status,proto3" json:"status,omitempty"`
}
func (x *AgentEvent) Reset() {
*x = AgentEvent{}
if protoimpl.UnsafeEnabled {
mi := &file_manager_manager_proto_msgTypes[2]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}
func (x *AgentEvent) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*AgentEvent) ProtoMessage() {}
func (x *AgentEvent) ProtoReflect() protoreflect.Message {
mi := &file_manager_manager_proto_msgTypes[2]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use AgentEvent.ProtoReflect.Descriptor instead.
func (*AgentEvent) Descriptor() ([]byte, []int) {
return file_manager_manager_proto_rawDescGZIP(), []int{2}
}
func (x *AgentEvent) GetEventType() string {
if x != nil {
return x.EventType
}
return ""
}
func (x *AgentEvent) GetTimestamp() *timestamppb.Timestamp {
if x != nil {
return x.Timestamp
}
return nil
}
type Computation struct {
func (x *AgentEvent) GetComputationId() string {
if x != nil {
return x.ComputationId
}
return ""
}
func (x *AgentEvent) GetDetails() []byte {
if x != nil {
return x.Details
}
return nil
}
func (x *AgentEvent) GetOriginator() string {
if x != nil {
return x.Originator
}
return ""
}
func (x *AgentEvent) GetStatus() string {
if x != nil {
return x.Status
}
return ""
}
type AgentLog struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
LogMessage string `protobuf:"bytes,1,opt,name=log_message,json=logMessage,proto3" json:"log_message,omitempty"`
ComputationId string `protobuf:"bytes,2,opt,name=computation_id,json=computationId,proto3" json:"computation_id,omitempty"`
}
func (x *AgentLog) Reset() {
*x = AgentLog{}
if protoimpl.UnsafeEnabled {
mi := &file_manager_manager_proto_msgTypes[3]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}
func (x *AgentLog) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*AgentLog) ProtoMessage() {}
func (x *AgentLog) ProtoReflect() protoreflect.Message {
mi := &file_manager_manager_proto_msgTypes[3]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use AgentLog.ProtoReflect.Descriptor instead.
func (*AgentLog) Descriptor() ([]byte, []int) {
return file_manager_manager_proto_rawDescGZIP(), []int{3}
}
func (x *AgentLog) GetLogMessage() string {
if x != nil {
return x.LogMessage
}
return ""
}
func (x *AgentLog) GetComputationId() string {
if x != nil {
return x.ComputationId
}
return ""
}
type ClientStreamMessage struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
// Types that are assignable to Message:
//
// *ClientStreamMessage_WhoamiRequest
// *ClientStreamMessage_AgentLog
// *ClientStreamMessage_AgentEvent
// *ClientStreamMessage_RunRes
Message isClientStreamMessage_Message `protobuf_oneof:"message"`
}
func (x *ClientStreamMessage) Reset() {
*x = ClientStreamMessage{}
if protoimpl.UnsafeEnabled {
mi := &file_manager_manager_proto_msgTypes[4]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}
func (x *ClientStreamMessage) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*ClientStreamMessage) ProtoMessage() {}
func (x *ClientStreamMessage) ProtoReflect() protoreflect.Message {
mi := &file_manager_manager_proto_msgTypes[4]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use ClientStreamMessage.ProtoReflect.Descriptor instead.
func (*ClientStreamMessage) Descriptor() ([]byte, []int) {
return file_manager_manager_proto_rawDescGZIP(), []int{4}
}
func (m *ClientStreamMessage) GetMessage() isClientStreamMessage_Message {
if m != nil {
return m.Message
}
return nil
}
func (x *ClientStreamMessage) GetWhoamiRequest() *WhoAmIRequest {
if x, ok := x.GetMessage().(*ClientStreamMessage_WhoamiRequest); ok {
return x.WhoamiRequest
}
return nil
}
func (x *ClientStreamMessage) GetAgentLog() *AgentLog {
if x, ok := x.GetMessage().(*ClientStreamMessage_AgentLog); ok {
return x.AgentLog
}
return nil
}
func (x *ClientStreamMessage) GetAgentEvent() *AgentEvent {
if x, ok := x.GetMessage().(*ClientStreamMessage_AgentEvent); ok {
return x.AgentEvent
}
return nil
}
func (x *ClientStreamMessage) GetRunRes() *RunResponse {
if x, ok := x.GetMessage().(*ClientStreamMessage_RunRes); ok {
return x.RunRes
}
return nil
}
type isClientStreamMessage_Message interface {
isClientStreamMessage_Message()
}
type ClientStreamMessage_WhoamiRequest struct {
WhoamiRequest *WhoAmIRequest `protobuf:"bytes,1,opt,name=whoami_request,json=whoamiRequest,proto3,oneof"`
}
type ClientStreamMessage_AgentLog struct {
AgentLog *AgentLog `protobuf:"bytes,2,opt,name=agent_log,json=agentLog,proto3,oneof"`
}
type ClientStreamMessage_AgentEvent struct {
AgentEvent *AgentEvent `protobuf:"bytes,3,opt,name=agent_event,json=agentEvent,proto3,oneof"`
}
type ClientStreamMessage_RunRes struct {
RunRes *RunResponse `protobuf:"bytes,4,opt,name=run_res,json=runRes,proto3,oneof"`
}
func (*ClientStreamMessage_WhoamiRequest) isClientStreamMessage_Message() {}
func (*ClientStreamMessage_AgentLog) isClientStreamMessage_Message() {}
func (*ClientStreamMessage_AgentEvent) isClientStreamMessage_Message() {}
func (*ClientStreamMessage_RunRes) isClientStreamMessage_Message() {}
type ComputationRunReq struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
@@ -84,23 +374,23 @@ type Computation struct {
AgentConfig *AgentConfig `protobuf:"bytes,7,opt,name=agent_config,json=agentConfig,proto3" json:"agent_config,omitempty"`
}
func (x *Computation) Reset() {
*x = Computation{}
func (x *ComputationRunReq) Reset() {
*x = ComputationRunReq{}
if protoimpl.UnsafeEnabled {
mi := &file_manager_manager_proto_msgTypes[1]
mi := &file_manager_manager_proto_msgTypes[5]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}
func (x *Computation) String() string {
func (x *ComputationRunReq) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*Computation) ProtoMessage() {}
func (*ComputationRunReq) ProtoMessage() {}
func (x *Computation) ProtoReflect() protoreflect.Message {
mi := &file_manager_manager_proto_msgTypes[1]
func (x *ComputationRunReq) ProtoReflect() protoreflect.Message {
mi := &file_manager_manager_proto_msgTypes[5]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
@@ -111,54 +401,54 @@ func (x *Computation) ProtoReflect() protoreflect.Message {
return mi.MessageOf(x)
}
// Deprecated: Use Computation.ProtoReflect.Descriptor instead.
func (*Computation) Descriptor() ([]byte, []int) {
return file_manager_manager_proto_rawDescGZIP(), []int{1}
// Deprecated: Use ComputationRunReq.ProtoReflect.Descriptor instead.
func (*ComputationRunReq) Descriptor() ([]byte, []int) {
return file_manager_manager_proto_rawDescGZIP(), []int{5}
}
func (x *Computation) GetId() string {
func (x *ComputationRunReq) GetId() string {
if x != nil {
return x.Id
}
return ""
}
func (x *Computation) GetName() string {
func (x *ComputationRunReq) GetName() string {
if x != nil {
return x.Name
}
return ""
}
func (x *Computation) GetDescription() string {
func (x *ComputationRunReq) GetDescription() string {
if x != nil {
return x.Description
}
return ""
}
func (x *Computation) GetDatasets() []*Dataset {
func (x *ComputationRunReq) GetDatasets() []*Dataset {
if x != nil {
return x.Datasets
}
return nil
}
func (x *Computation) GetAlgorithms() []*Algorithm {
func (x *ComputationRunReq) GetAlgorithms() []*Algorithm {
if x != nil {
return x.Algorithms
}
return nil
}
func (x *Computation) GetResultConsumers() []string {
func (x *ComputationRunReq) GetResultConsumers() []string {
if x != nil {
return x.ResultConsumers
}
return nil
}
func (x *Computation) GetAgentConfig() *AgentConfig {
func (x *ComputationRunReq) GetAgentConfig() *AgentConfig {
if x != nil {
return x.AgentConfig
}
@@ -177,7 +467,7 @@ type Dataset struct {
func (x *Dataset) Reset() {
*x = Dataset{}
if protoimpl.UnsafeEnabled {
mi := &file_manager_manager_proto_msgTypes[2]
mi := &file_manager_manager_proto_msgTypes[6]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
@@ -190,7 +480,7 @@ func (x *Dataset) String() string {
func (*Dataset) ProtoMessage() {}
func (x *Dataset) ProtoReflect() protoreflect.Message {
mi := &file_manager_manager_proto_msgTypes[2]
mi := &file_manager_manager_proto_msgTypes[6]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
@@ -203,7 +493,7 @@ func (x *Dataset) ProtoReflect() protoreflect.Message {
// Deprecated: Use Dataset.ProtoReflect.Descriptor instead.
func (*Dataset) Descriptor() ([]byte, []int) {
return file_manager_manager_proto_rawDescGZIP(), []int{2}
return file_manager_manager_proto_rawDescGZIP(), []int{6}
}
func (x *Dataset) GetProvider() string {
@@ -232,7 +522,7 @@ type Algorithm struct {
func (x *Algorithm) Reset() {
*x = Algorithm{}
if protoimpl.UnsafeEnabled {
mi := &file_manager_manager_proto_msgTypes[3]
mi := &file_manager_manager_proto_msgTypes[7]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
@@ -245,7 +535,7 @@ func (x *Algorithm) String() string {
func (*Algorithm) ProtoMessage() {}
func (x *Algorithm) ProtoReflect() protoreflect.Message {
mi := &file_manager_manager_proto_msgTypes[3]
mi := &file_manager_manager_proto_msgTypes[7]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
@@ -258,7 +548,7 @@ func (x *Algorithm) ProtoReflect() protoreflect.Message {
// Deprecated: Use Algorithm.ProtoReflect.Descriptor instead.
func (*Algorithm) Descriptor() ([]byte, []int) {
return file_manager_manager_proto_rawDescGZIP(), []int{3}
return file_manager_manager_proto_rawDescGZIP(), []int{7}
}
func (x *Algorithm) GetProvider() string {
@@ -275,69 +565,25 @@ func (x *Algorithm) GetId() string {
return ""
}
type RunResponse struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
AgentAddress string `protobuf:"bytes,1,opt,name=agent_address,json=agentAddress,proto3" json:"agent_address,omitempty"`
}
func (x *RunResponse) Reset() {
*x = RunResponse{}
if protoimpl.UnsafeEnabled {
mi := &file_manager_manager_proto_msgTypes[4]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}
func (x *RunResponse) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*RunResponse) ProtoMessage() {}
func (x *RunResponse) ProtoReflect() protoreflect.Message {
mi := &file_manager_manager_proto_msgTypes[4]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use RunResponse.ProtoReflect.Descriptor instead.
func (*RunResponse) Descriptor() ([]byte, []int) {
return file_manager_manager_proto_rawDescGZIP(), []int{4}
}
func (x *RunResponse) GetAgentAddress() string {
if x != nil {
return x.AgentAddress
}
return ""
}
type AgentConfig struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
Port string `protobuf:"bytes,1,opt,name=port,proto3" json:"port,omitempty"`
Host string `protobuf:"bytes,2,opt,name=host,proto3" json:"host,omitempty"`
CertFile string `protobuf:"bytes,3,opt,name=cert_file,json=certFile,proto3" json:"cert_file,omitempty"`
KeyFile string `protobuf:"bytes,4,opt,name=key_file,json=keyFile,proto3" json:"key_file,omitempty"`
LogLevel string `protobuf:"bytes,5,opt,name=log_level,json=logLevel,proto3" json:"log_level,omitempty"`
Port string `protobuf:"bytes,1,opt,name=port,proto3" json:"port,omitempty"`
Host string `protobuf:"bytes,2,opt,name=host,proto3" json:"host,omitempty"`
CertFile string `protobuf:"bytes,3,opt,name=cert_file,json=certFile,proto3" json:"cert_file,omitempty"`
KeyFile string `protobuf:"bytes,4,opt,name=key_file,json=keyFile,proto3" json:"key_file,omitempty"`
ClientCaFile string `protobuf:"bytes,5,opt,name=client_ca_file,json=clientCaFile,proto3" json:"client_ca_file,omitempty"`
ServerCaFile string `protobuf:"bytes,6,opt,name=server_ca_file,json=serverCaFile,proto3" json:"server_ca_file,omitempty"`
LogLevel string `protobuf:"bytes,7,opt,name=log_level,json=logLevel,proto3" json:"log_level,omitempty"`
InstanceId string `protobuf:"bytes,8,opt,name=instance_id,json=instanceId,proto3" json:"instance_id,omitempty"`
}
func (x *AgentConfig) Reset() {
*x = AgentConfig{}
if protoimpl.UnsafeEnabled {
mi := &file_manager_manager_proto_msgTypes[5]
mi := &file_manager_manager_proto_msgTypes[8]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
@@ -350,7 +596,7 @@ func (x *AgentConfig) String() string {
func (*AgentConfig) ProtoMessage() {}
func (x *AgentConfig) ProtoReflect() protoreflect.Message {
mi := &file_manager_manager_proto_msgTypes[5]
mi := &file_manager_manager_proto_msgTypes[8]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
@@ -363,7 +609,7 @@ func (x *AgentConfig) ProtoReflect() protoreflect.Message {
// Deprecated: Use AgentConfig.ProtoReflect.Descriptor instead.
func (*AgentConfig) Descriptor() ([]byte, []int) {
return file_manager_manager_proto_rawDescGZIP(), []int{5}
return file_manager_manager_proto_rawDescGZIP(), []int{8}
}
func (x *AgentConfig) GetPort() string {
@@ -394,6 +640,20 @@ func (x *AgentConfig) GetKeyFile() string {
return ""
}
func (x *AgentConfig) GetClientCaFile() string {
if x != nil {
return x.ClientCaFile
}
return ""
}
func (x *AgentConfig) GetServerCaFile() string {
if x != nil {
return x.ServerCaFile
}
return ""
}
func (x *AgentConfig) GetLogLevel() string {
if x != nil {
return x.LogLevel
@@ -401,58 +661,108 @@ func (x *AgentConfig) GetLogLevel() string {
return ""
}
func (x *AgentConfig) GetInstanceId() string {
if x != nil {
return x.InstanceId
}
return ""
}
var File_manager_manager_proto protoreflect.FileDescriptor
var file_manager_manager_proto_rawDesc = []byte{
0x0a, 0x15, 0x6d, 0x61, 0x6e, 0x61, 0x67, 0x65, 0x72, 0x2f, 0x6d, 0x61, 0x6e, 0x61, 0x67, 0x65,
0x72, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x07, 0x6d, 0x61, 0x6e, 0x61, 0x67, 0x65, 0x72,
0x22, 0x44, 0x0a, 0x0a, 0x52, 0x75, 0x6e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x36,
0x0a, 0x0b, 0x63, 0x6f, 0x6d, 0x70, 0x75, 0x74, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x18, 0x01, 0x20,
0x01, 0x28, 0x0b, 0x32, 0x14, 0x2e, 0x6d, 0x61, 0x6e, 0x61, 0x67, 0x65, 0x72, 0x2e, 0x43, 0x6f,
0x6d, 0x70, 0x75, 0x74, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x0b, 0x63, 0x6f, 0x6d, 0x70, 0x75,
0x74, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x22, 0x99, 0x02, 0x0a, 0x0b, 0x43, 0x6f, 0x6d, 0x70, 0x75,
0x74, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01,
0x28, 0x09, 0x52, 0x02, 0x69, 0x64, 0x12, 0x12, 0x0a, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x02,
0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x12, 0x20, 0x0a, 0x0b, 0x64, 0x65,
0x73, 0x63, 0x72, 0x69, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52,
0x0b, 0x64, 0x65, 0x73, 0x63, 0x72, 0x69, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x2c, 0x0a, 0x08,
0x64, 0x61, 0x74, 0x61, 0x73, 0x65, 0x74, 0x73, 0x18, 0x04, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x10,
0x2e, 0x6d, 0x61, 0x6e, 0x61, 0x67, 0x65, 0x72, 0x2e, 0x44, 0x61, 0x74, 0x61, 0x73, 0x65, 0x74,
0x52, 0x08, 0x64, 0x61, 0x74, 0x61, 0x73, 0x65, 0x74, 0x73, 0x12, 0x32, 0x0a, 0x0a, 0x61, 0x6c,
0x67, 0x6f, 0x72, 0x69, 0x74, 0x68, 0x6d, 0x73, 0x18, 0x05, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x12,
0x2e, 0x6d, 0x61, 0x6e, 0x61, 0x67, 0x65, 0x72, 0x2e, 0x41, 0x6c, 0x67, 0x6f, 0x72, 0x69, 0x74,
0x68, 0x6d, 0x52, 0x0a, 0x61, 0x6c, 0x67, 0x6f, 0x72, 0x69, 0x74, 0x68, 0x6d, 0x73, 0x12, 0x29,
0x0a, 0x10, 0x72, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x5f, 0x63, 0x6f, 0x6e, 0x73, 0x75, 0x6d, 0x65,
0x72, 0x73, 0x18, 0x06, 0x20, 0x03, 0x28, 0x09, 0x52, 0x0f, 0x72, 0x65, 0x73, 0x75, 0x6c, 0x74,
0x43, 0x6f, 0x6e, 0x73, 0x75, 0x6d, 0x65, 0x72, 0x73, 0x12, 0x37, 0x0a, 0x0c, 0x61, 0x67, 0x65,
0x6e, 0x74, 0x5f, 0x63, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x18, 0x07, 0x20, 0x01, 0x28, 0x0b, 0x32,
0x14, 0x2e, 0x6d, 0x61, 0x6e, 0x61, 0x67, 0x65, 0x72, 0x2e, 0x41, 0x67, 0x65, 0x6e, 0x74, 0x43,
0x6f, 0x6e, 0x66, 0x69, 0x67, 0x52, 0x0b, 0x61, 0x67, 0x65, 0x6e, 0x74, 0x43, 0x6f, 0x6e, 0x66,
0x69, 0x67, 0x22, 0x35, 0x0a, 0x07, 0x44, 0x61, 0x74, 0x61, 0x73, 0x65, 0x74, 0x12, 0x1a, 0x0a,
0x08, 0x70, 0x72, 0x6f, 0x76, 0x69, 0x64, 0x65, 0x72, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52,
0x08, 0x70, 0x72, 0x6f, 0x76, 0x69, 0x64, 0x65, 0x72, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18,
0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x02, 0x69, 0x64, 0x22, 0x37, 0x0a, 0x09, 0x41, 0x6c, 0x67,
0x6f, 0x72, 0x69, 0x74, 0x68, 0x6d, 0x12, 0x1a, 0x0a, 0x08, 0x70, 0x72, 0x6f, 0x76, 0x69, 0x64,
0x65, 0x72, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x70, 0x72, 0x6f, 0x76, 0x69, 0x64,
0x65, 0x72, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x02,
0x69, 0x64, 0x22, 0x32, 0x0a, 0x0b, 0x52, 0x75, 0x6e, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73,
0x65, 0x12, 0x23, 0x0a, 0x0d, 0x61, 0x67, 0x65, 0x6e, 0x74, 0x5f, 0x61, 0x64, 0x64, 0x72, 0x65,
0x73, 0x73, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0c, 0x61, 0x67, 0x65, 0x6e, 0x74, 0x41,
0x64, 0x64, 0x72, 0x65, 0x73, 0x73, 0x22, 0x8a, 0x01, 0x0a, 0x0b, 0x41, 0x67, 0x65, 0x6e, 0x74,
0x1a, 0x1f, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75,
0x66, 0x2f, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x2e, 0x70, 0x72, 0x6f, 0x74,
0x6f, 0x22, 0x0f, 0x0a, 0x0d, 0x57, 0x68, 0x6f, 0x41, 0x6d, 0x49, 0x52, 0x65, 0x71, 0x75, 0x65,
0x73, 0x74, 0x22, 0x2c, 0x0a, 0x0b, 0x52, 0x75, 0x6e, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73,
0x65, 0x12, 0x1d, 0x0a, 0x0a, 0x61, 0x67, 0x65, 0x6e, 0x74, 0x5f, 0x70, 0x6f, 0x72, 0x74, 0x18,
0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x61, 0x67, 0x65, 0x6e, 0x74, 0x50, 0x6f, 0x72, 0x74,
0x22, 0xde, 0x01, 0x0a, 0x0a, 0x41, 0x67, 0x65, 0x6e, 0x74, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x12,
0x1d, 0x0a, 0x0a, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x5f, 0x74, 0x79, 0x70, 0x65, 0x18, 0x01, 0x20,
0x01, 0x28, 0x09, 0x52, 0x09, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x54, 0x79, 0x70, 0x65, 0x12, 0x38,
0x0a, 0x09, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x18, 0x02, 0x20, 0x01, 0x28,
0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f,
0x62, 0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, 0x09, 0x74,
0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x12, 0x25, 0x0a, 0x0e, 0x63, 0x6f, 0x6d, 0x70,
0x75, 0x74, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x5f, 0x69, 0x64, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09,
0x52, 0x0d, 0x63, 0x6f, 0x6d, 0x70, 0x75, 0x74, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x49, 0x64, 0x12,
0x18, 0x0a, 0x07, 0x64, 0x65, 0x74, 0x61, 0x69, 0x6c, 0x73, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0c,
0x52, 0x07, 0x64, 0x65, 0x74, 0x61, 0x69, 0x6c, 0x73, 0x12, 0x1e, 0x0a, 0x0a, 0x6f, 0x72, 0x69,
0x67, 0x69, 0x6e, 0x61, 0x74, 0x6f, 0x72, 0x18, 0x05, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0a, 0x6f,
0x72, 0x69, 0x67, 0x69, 0x6e, 0x61, 0x74, 0x6f, 0x72, 0x12, 0x16, 0x0a, 0x06, 0x73, 0x74, 0x61,
0x74, 0x75, 0x73, 0x18, 0x06, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x73, 0x74, 0x61, 0x74, 0x75,
0x73, 0x22, 0x52, 0x0a, 0x08, 0x41, 0x67, 0x65, 0x6e, 0x74, 0x4c, 0x6f, 0x67, 0x12, 0x1f, 0x0a,
0x0b, 0x6c, 0x6f, 0x67, 0x5f, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x18, 0x01, 0x20, 0x01,
0x28, 0x09, 0x52, 0x0a, 0x6c, 0x6f, 0x67, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x12, 0x25,
0x0a, 0x0e, 0x63, 0x6f, 0x6d, 0x70, 0x75, 0x74, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x5f, 0x69, 0x64,
0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0d, 0x63, 0x6f, 0x6d, 0x70, 0x75, 0x74, 0x61, 0x74,
0x69, 0x6f, 0x6e, 0x49, 0x64, 0x22, 0xfc, 0x01, 0x0a, 0x13, 0x43, 0x6c, 0x69, 0x65, 0x6e, 0x74,
0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x12, 0x3f, 0x0a,
0x0e, 0x77, 0x68, 0x6f, 0x61, 0x6d, 0x69, 0x5f, 0x72, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x18,
0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x16, 0x2e, 0x6d, 0x61, 0x6e, 0x61, 0x67, 0x65, 0x72, 0x2e,
0x57, 0x68, 0x6f, 0x41, 0x6d, 0x49, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x48, 0x00, 0x52,
0x0d, 0x77, 0x68, 0x6f, 0x61, 0x6d, 0x69, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x30,
0x0a, 0x09, 0x61, 0x67, 0x65, 0x6e, 0x74, 0x5f, 0x6c, 0x6f, 0x67, 0x18, 0x02, 0x20, 0x01, 0x28,
0x0b, 0x32, 0x11, 0x2e, 0x6d, 0x61, 0x6e, 0x61, 0x67, 0x65, 0x72, 0x2e, 0x41, 0x67, 0x65, 0x6e,
0x74, 0x4c, 0x6f, 0x67, 0x48, 0x00, 0x52, 0x08, 0x61, 0x67, 0x65, 0x6e, 0x74, 0x4c, 0x6f, 0x67,
0x12, 0x36, 0x0a, 0x0b, 0x61, 0x67, 0x65, 0x6e, 0x74, 0x5f, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x18,
0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x13, 0x2e, 0x6d, 0x61, 0x6e, 0x61, 0x67, 0x65, 0x72, 0x2e,
0x41, 0x67, 0x65, 0x6e, 0x74, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x48, 0x00, 0x52, 0x0a, 0x61, 0x67,
0x65, 0x6e, 0x74, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x12, 0x2f, 0x0a, 0x07, 0x72, 0x75, 0x6e, 0x5f,
0x72, 0x65, 0x73, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x14, 0x2e, 0x6d, 0x61, 0x6e, 0x61,
0x67, 0x65, 0x72, 0x2e, 0x52, 0x75, 0x6e, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x48,
0x00, 0x52, 0x06, 0x72, 0x75, 0x6e, 0x52, 0x65, 0x73, 0x42, 0x09, 0x0a, 0x07, 0x6d, 0x65, 0x73,
0x73, 0x61, 0x67, 0x65, 0x22, 0x9f, 0x02, 0x0a, 0x11, 0x43, 0x6f, 0x6d, 0x70, 0x75, 0x74, 0x61,
0x74, 0x69, 0x6f, 0x6e, 0x52, 0x75, 0x6e, 0x52, 0x65, 0x71, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64,
0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x02, 0x69, 0x64, 0x12, 0x12, 0x0a, 0x04, 0x6e, 0x61,
0x6d, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x12, 0x20,
0x0a, 0x0b, 0x64, 0x65, 0x73, 0x63, 0x72, 0x69, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x18, 0x03, 0x20,
0x01, 0x28, 0x09, 0x52, 0x0b, 0x64, 0x65, 0x73, 0x63, 0x72, 0x69, 0x70, 0x74, 0x69, 0x6f, 0x6e,
0x12, 0x2c, 0x0a, 0x08, 0x64, 0x61, 0x74, 0x61, 0x73, 0x65, 0x74, 0x73, 0x18, 0x04, 0x20, 0x03,
0x28, 0x0b, 0x32, 0x10, 0x2e, 0x6d, 0x61, 0x6e, 0x61, 0x67, 0x65, 0x72, 0x2e, 0x44, 0x61, 0x74,
0x61, 0x73, 0x65, 0x74, 0x52, 0x08, 0x64, 0x61, 0x74, 0x61, 0x73, 0x65, 0x74, 0x73, 0x12, 0x32,
0x0a, 0x0a, 0x61, 0x6c, 0x67, 0x6f, 0x72, 0x69, 0x74, 0x68, 0x6d, 0x73, 0x18, 0x05, 0x20, 0x03,
0x28, 0x0b, 0x32, 0x12, 0x2e, 0x6d, 0x61, 0x6e, 0x61, 0x67, 0x65, 0x72, 0x2e, 0x41, 0x6c, 0x67,
0x6f, 0x72, 0x69, 0x74, 0x68, 0x6d, 0x52, 0x0a, 0x61, 0x6c, 0x67, 0x6f, 0x72, 0x69, 0x74, 0x68,
0x6d, 0x73, 0x12, 0x29, 0x0a, 0x10, 0x72, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x5f, 0x63, 0x6f, 0x6e,
0x73, 0x75, 0x6d, 0x65, 0x72, 0x73, 0x18, 0x06, 0x20, 0x03, 0x28, 0x09, 0x52, 0x0f, 0x72, 0x65,
0x73, 0x75, 0x6c, 0x74, 0x43, 0x6f, 0x6e, 0x73, 0x75, 0x6d, 0x65, 0x72, 0x73, 0x12, 0x37, 0x0a,
0x0c, 0x61, 0x67, 0x65, 0x6e, 0x74, 0x5f, 0x63, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x18, 0x07, 0x20,
0x01, 0x28, 0x0b, 0x32, 0x14, 0x2e, 0x6d, 0x61, 0x6e, 0x61, 0x67, 0x65, 0x72, 0x2e, 0x41, 0x67,
0x65, 0x6e, 0x74, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x52, 0x0b, 0x61, 0x67, 0x65, 0x6e, 0x74,
0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x22, 0x35, 0x0a, 0x07, 0x44, 0x61, 0x74, 0x61, 0x73, 0x65,
0x74, 0x12, 0x1a, 0x0a, 0x08, 0x70, 0x72, 0x6f, 0x76, 0x69, 0x64, 0x65, 0x72, 0x18, 0x01, 0x20,
0x01, 0x28, 0x09, 0x52, 0x08, 0x70, 0x72, 0x6f, 0x76, 0x69, 0x64, 0x65, 0x72, 0x12, 0x0e, 0x0a,
0x02, 0x69, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x02, 0x69, 0x64, 0x22, 0x37, 0x0a,
0x09, 0x41, 0x6c, 0x67, 0x6f, 0x72, 0x69, 0x74, 0x68, 0x6d, 0x12, 0x1a, 0x0a, 0x08, 0x70, 0x72,
0x6f, 0x76, 0x69, 0x64, 0x65, 0x72, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x70, 0x72,
0x6f, 0x76, 0x69, 0x64, 0x65, 0x72, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x02, 0x20, 0x01,
0x28, 0x09, 0x52, 0x02, 0x69, 0x64, 0x22, 0xf7, 0x01, 0x0a, 0x0b, 0x41, 0x67, 0x65, 0x6e, 0x74,
0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x12, 0x12, 0x0a, 0x04, 0x70, 0x6f, 0x72, 0x74, 0x18, 0x01,
0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x70, 0x6f, 0x72, 0x74, 0x12, 0x12, 0x0a, 0x04, 0x68, 0x6f,
0x73, 0x74, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x68, 0x6f, 0x73, 0x74, 0x12, 0x1b,
0x0a, 0x09, 0x63, 0x65, 0x72, 0x74, 0x5f, 0x66, 0x69, 0x6c, 0x65, 0x18, 0x03, 0x20, 0x01, 0x28,
0x09, 0x52, 0x08, 0x63, 0x65, 0x72, 0x74, 0x46, 0x69, 0x6c, 0x65, 0x12, 0x19, 0x0a, 0x08, 0x6b,
0x65, 0x79, 0x5f, 0x66, 0x69, 0x6c, 0x65, 0x18, 0x04, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x6b,
0x65, 0x79, 0x46, 0x69, 0x6c, 0x65, 0x12, 0x1b, 0x0a, 0x09, 0x6c, 0x6f, 0x67, 0x5f, 0x6c, 0x65,
0x76, 0x65, 0x6c, 0x18, 0x05, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x6c, 0x6f, 0x67, 0x4c, 0x65,
0x76, 0x65, 0x6c, 0x32, 0x44, 0x0a, 0x0e, 0x4d, 0x61, 0x6e, 0x61, 0x67, 0x65, 0x72, 0x53, 0x65,
0x72, 0x76, 0x69, 0x63, 0x65, 0x12, 0x32, 0x0a, 0x03, 0x52, 0x75, 0x6e, 0x12, 0x13, 0x2e, 0x6d,
0x61, 0x6e, 0x61, 0x67, 0x65, 0x72, 0x2e, 0x52, 0x75, 0x6e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73,
0x74, 0x1a, 0x14, 0x2e, 0x6d, 0x61, 0x6e, 0x61, 0x67, 0x65, 0x72, 0x2e, 0x52, 0x75, 0x6e, 0x52,
0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x42, 0x0b, 0x5a, 0x09, 0x2e, 0x2f, 0x6d,
0x61, 0x6e, 0x61, 0x67, 0x65, 0x72, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
0x65, 0x79, 0x46, 0x69, 0x6c, 0x65, 0x12, 0x24, 0x0a, 0x0e, 0x63, 0x6c, 0x69, 0x65, 0x6e, 0x74,
0x5f, 0x63, 0x61, 0x5f, 0x66, 0x69, 0x6c, 0x65, 0x18, 0x05, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0c,
0x63, 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x43, 0x61, 0x46, 0x69, 0x6c, 0x65, 0x12, 0x24, 0x0a, 0x0e,
0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x5f, 0x63, 0x61, 0x5f, 0x66, 0x69, 0x6c, 0x65, 0x18, 0x06,
0x20, 0x01, 0x28, 0x09, 0x52, 0x0c, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x43, 0x61, 0x46, 0x69,
0x6c, 0x65, 0x12, 0x1b, 0x0a, 0x09, 0x6c, 0x6f, 0x67, 0x5f, 0x6c, 0x65, 0x76, 0x65, 0x6c, 0x18,
0x07, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x6c, 0x6f, 0x67, 0x4c, 0x65, 0x76, 0x65, 0x6c, 0x12,
0x1f, 0x0a, 0x0b, 0x69, 0x6e, 0x73, 0x74, 0x61, 0x6e, 0x63, 0x65, 0x5f, 0x69, 0x64, 0x18, 0x08,
0x20, 0x01, 0x28, 0x09, 0x52, 0x0a, 0x69, 0x6e, 0x73, 0x74, 0x61, 0x6e, 0x63, 0x65, 0x49, 0x64,
0x32, 0x5b, 0x0a, 0x0e, 0x4d, 0x61, 0x6e, 0x61, 0x67, 0x65, 0x72, 0x53, 0x65, 0x72, 0x76, 0x69,
0x63, 0x65, 0x12, 0x49, 0x0a, 0x07, 0x50, 0x72, 0x6f, 0x63, 0x65, 0x73, 0x73, 0x12, 0x1c, 0x2e,
0x6d, 0x61, 0x6e, 0x61, 0x67, 0x65, 0x72, 0x2e, 0x43, 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x53, 0x74,
0x72, 0x65, 0x61, 0x6d, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x1a, 0x1a, 0x2e, 0x6d, 0x61,
0x6e, 0x61, 0x67, 0x65, 0x72, 0x2e, 0x43, 0x6f, 0x6d, 0x70, 0x75, 0x74, 0x61, 0x74, 0x69, 0x6f,
0x6e, 0x52, 0x75, 0x6e, 0x52, 0x65, 0x71, 0x22, 0x00, 0x28, 0x01, 0x30, 0x01, 0x42, 0x0b, 0x5a,
0x09, 0x2e, 0x2f, 0x6d, 0x61, 0x6e, 0x61, 0x67, 0x65, 0x72, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74,
0x6f, 0x33,
}
var (
@@ -467,27 +777,35 @@ func file_manager_manager_proto_rawDescGZIP() []byte {
return file_manager_manager_proto_rawDescData
}
var file_manager_manager_proto_msgTypes = make([]protoimpl.MessageInfo, 6)
var file_manager_manager_proto_msgTypes = make([]protoimpl.MessageInfo, 9)
var file_manager_manager_proto_goTypes = []interface{}{
(*RunRequest)(nil), // 0: manager.RunRequest
(*Computation)(nil), // 1: manager.Computation
(*Dataset)(nil), // 2: manager.Dataset
(*Algorithm)(nil), // 3: manager.Algorithm
(*RunResponse)(nil), // 4: manager.RunResponse
(*AgentConfig)(nil), // 5: manager.AgentConfig
(*WhoAmIRequest)(nil), // 0: manager.WhoAmIRequest
(*RunResponse)(nil), // 1: manager.RunResponse
(*AgentEvent)(nil), // 2: manager.AgentEvent
(*AgentLog)(nil), // 3: manager.AgentLog
(*ClientStreamMessage)(nil), // 4: manager.ClientStreamMessage
(*ComputationRunReq)(nil), // 5: manager.ComputationRunReq
(*Dataset)(nil), // 6: manager.Dataset
(*Algorithm)(nil), // 7: manager.Algorithm
(*AgentConfig)(nil), // 8: manager.AgentConfig
(*timestamppb.Timestamp)(nil), // 9: google.protobuf.Timestamp
}
var file_manager_manager_proto_depIdxs = []int32{
1, // 0: manager.RunRequest.computation:type_name -> manager.Computation
2, // 1: manager.Computation.datasets:type_name -> manager.Dataset
3, // 2: manager.Computation.algorithms:type_name -> manager.Algorithm
5, // 3: manager.Computation.agent_config:type_name -> manager.AgentConfig
0, // 4: manager.ManagerService.Run:input_type -> manager.RunRequest
4, // 5: manager.ManagerService.Run:output_type -> manager.RunResponse
5, // [5:6] is the sub-list for method output_type
4, // [4:5] is the sub-list for method input_type
4, // [4:4] is the sub-list for extension type_name
4, // [4:4] is the sub-list for extension extendee
0, // [0:4] is the sub-list for field type_name
9, // 0: manager.AgentEvent.timestamp:type_name -> google.protobuf.Timestamp
0, // 1: manager.ClientStreamMessage.whoami_request:type_name -> manager.WhoAmIRequest
3, // 2: manager.ClientStreamMessage.agent_log:type_name -> manager.AgentLog
2, // 3: manager.ClientStreamMessage.agent_event:type_name -> manager.AgentEvent
1, // 4: manager.ClientStreamMessage.run_res:type_name -> manager.RunResponse
6, // 5: manager.ComputationRunReq.datasets:type_name -> manager.Dataset
7, // 6: manager.ComputationRunReq.algorithms:type_name -> manager.Algorithm
8, // 7: manager.ComputationRunReq.agent_config:type_name -> manager.AgentConfig
4, // 8: manager.ManagerService.Process:input_type -> manager.ClientStreamMessage
5, // 9: manager.ManagerService.Process:output_type -> manager.ComputationRunReq
9, // [9:10] is the sub-list for method output_type
8, // [8:9] is the sub-list for method input_type
8, // [8:8] is the sub-list for extension type_name
8, // [8:8] is the sub-list for extension extendee
0, // [0:8] is the sub-list for field type_name
}
func init() { file_manager_manager_proto_init() }
@@ -497,7 +815,7 @@ func file_manager_manager_proto_init() {
}
if !protoimpl.UnsafeEnabled {
file_manager_manager_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*RunRequest); i {
switch v := v.(*WhoAmIRequest); i {
case 0:
return &v.state
case 1:
@@ -509,42 +827,6 @@ func file_manager_manager_proto_init() {
}
}
file_manager_manager_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*Computation); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
file_manager_manager_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*Dataset); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
file_manager_manager_proto_msgTypes[3].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*Algorithm); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
file_manager_manager_proto_msgTypes[4].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*RunResponse); i {
case 0:
return &v.state
@@ -556,7 +838,79 @@ func file_manager_manager_proto_init() {
return nil
}
}
file_manager_manager_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*AgentEvent); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
file_manager_manager_proto_msgTypes[3].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*AgentLog); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
file_manager_manager_proto_msgTypes[4].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*ClientStreamMessage); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
file_manager_manager_proto_msgTypes[5].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*ComputationRunReq); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
file_manager_manager_proto_msgTypes[6].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*Dataset); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
file_manager_manager_proto_msgTypes[7].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*Algorithm); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
file_manager_manager_proto_msgTypes[8].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*AgentConfig); i {
case 0:
return &v.state
@@ -569,13 +923,19 @@ func file_manager_manager_proto_init() {
}
}
}
file_manager_manager_proto_msgTypes[4].OneofWrappers = []interface{}{
(*ClientStreamMessage_WhoamiRequest)(nil),
(*ClientStreamMessage_AgentLog)(nil),
(*ClientStreamMessage_AgentEvent)(nil),
(*ClientStreamMessage_RunRes)(nil),
}
type x struct{}
out := protoimpl.TypeBuilder{
File: protoimpl.DescBuilder{
GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
RawDescriptor: file_manager_manager_proto_rawDesc,
NumEnums: 0,
NumMessages: 6,
NumMessages: 9,
NumExtensions: 0,
NumServices: 1,
},
+37 -10
View File
@@ -3,19 +3,47 @@
syntax = "proto3";
import "google/protobuf/timestamp.proto";
package manager;
option go_package = "./manager";
service ManagerService {
rpc Run(RunRequest) returns (RunResponse) {}
rpc Process(stream ClientStreamMessage) returns (stream ComputationRunReq) {}
}
message RunRequest {
Computation computation = 1;
message WhoAmIRequest {
}
message Computation {
message RunResponse{
string agent_port = 1;
}
message AgentEvent {
string event_type = 1;
google.protobuf.Timestamp timestamp = 2;
string computation_id = 3;
bytes details = 4;
string originator = 5;
string status = 6;
}
message AgentLog {
string log_message = 1;
string computation_id = 2;
}
message ClientStreamMessage {
oneof message {
WhoAmIRequest whoami_request = 1;
AgentLog agent_log = 2;
AgentEvent agent_event = 3;
RunResponse run_res = 4;
}
}
message ComputationRunReq {
string id = 1;
string name = 2;
string description = 3;
@@ -35,14 +63,13 @@ message Algorithm {
string id = 2;
}
message RunResponse {
string agent_address = 1;
}
message AgentConfig {
string port = 1;
string host = 2;
string cert_file = 3;
string key_file = 4;
string log_level = 5;
}
string client_ca_file = 5;
string server_ca_file = 6;
string log_level = 7;
string instance_id = 8;
}
+59 -27
View File
@@ -22,14 +22,14 @@ import (
const _ = grpc.SupportPackageIsVersion7
const (
ManagerService_Run_FullMethodName = "/manager.ManagerService/Run"
ManagerService_Process_FullMethodName = "/manager.ManagerService/Process"
)
// ManagerServiceClient is the client API for ManagerService service.
//
// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream.
type ManagerServiceClient interface {
Run(ctx context.Context, in *RunRequest, opts ...grpc.CallOption) (*RunResponse, error)
Process(ctx context.Context, opts ...grpc.CallOption) (ManagerService_ProcessClient, error)
}
type managerServiceClient struct {
@@ -40,20 +40,42 @@ func NewManagerServiceClient(cc grpc.ClientConnInterface) ManagerServiceClient {
return &managerServiceClient{cc}
}
func (c *managerServiceClient) Run(ctx context.Context, in *RunRequest, opts ...grpc.CallOption) (*RunResponse, error) {
out := new(RunResponse)
err := c.cc.Invoke(ctx, ManagerService_Run_FullMethodName, in, out, opts...)
func (c *managerServiceClient) Process(ctx context.Context, opts ...grpc.CallOption) (ManagerService_ProcessClient, error) {
stream, err := c.cc.NewStream(ctx, &ManagerService_ServiceDesc.Streams[0], ManagerService_Process_FullMethodName, opts...)
if err != nil {
return nil, err
}
return out, nil
x := &managerServiceProcessClient{stream}
return x, nil
}
type ManagerService_ProcessClient interface {
Send(*ClientStreamMessage) error
Recv() (*ComputationRunReq, error)
grpc.ClientStream
}
type managerServiceProcessClient struct {
grpc.ClientStream
}
func (x *managerServiceProcessClient) Send(m *ClientStreamMessage) error {
return x.ClientStream.SendMsg(m)
}
func (x *managerServiceProcessClient) Recv() (*ComputationRunReq, error) {
m := new(ComputationRunReq)
if err := x.ClientStream.RecvMsg(m); err != nil {
return nil, err
}
return m, nil
}
// ManagerServiceServer is the server API for ManagerService service.
// All implementations must embed UnimplementedManagerServiceServer
// for forward compatibility
type ManagerServiceServer interface {
Run(context.Context, *RunRequest) (*RunResponse, error)
Process(ManagerService_ProcessServer) error
mustEmbedUnimplementedManagerServiceServer()
}
@@ -61,8 +83,8 @@ type ManagerServiceServer interface {
type UnimplementedManagerServiceServer struct {
}
func (UnimplementedManagerServiceServer) Run(context.Context, *RunRequest) (*RunResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method Run not implemented")
func (UnimplementedManagerServiceServer) Process(ManagerService_ProcessServer) error {
return status.Errorf(codes.Unimplemented, "method Process not implemented")
}
func (UnimplementedManagerServiceServer) mustEmbedUnimplementedManagerServiceServer() {}
@@ -77,22 +99,30 @@ func RegisterManagerServiceServer(s grpc.ServiceRegistrar, srv ManagerServiceSer
s.RegisterService(&ManagerService_ServiceDesc, srv)
}
func _ManagerService_Run_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(RunRequest)
if err := dec(in); err != nil {
func _ManagerService_Process_Handler(srv interface{}, stream grpc.ServerStream) error {
return srv.(ManagerServiceServer).Process(&managerServiceProcessServer{stream})
}
type ManagerService_ProcessServer interface {
Send(*ComputationRunReq) error
Recv() (*ClientStreamMessage, error)
grpc.ServerStream
}
type managerServiceProcessServer struct {
grpc.ServerStream
}
func (x *managerServiceProcessServer) Send(m *ComputationRunReq) error {
return x.ServerStream.SendMsg(m)
}
func (x *managerServiceProcessServer) Recv() (*ClientStreamMessage, error) {
m := new(ClientStreamMessage)
if err := x.ServerStream.RecvMsg(m); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(ManagerServiceServer).Run(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: ManagerService_Run_FullMethodName,
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(ManagerServiceServer).Run(ctx, req.(*RunRequest))
}
return interceptor(ctx, in, info, handler)
return m, nil
}
// ManagerService_ServiceDesc is the grpc.ServiceDesc for ManagerService service.
@@ -101,12 +131,14 @@ func _ManagerService_Run_Handler(srv interface{}, ctx context.Context, dec func(
var ManagerService_ServiceDesc = grpc.ServiceDesc{
ServiceName: "manager.ManagerService",
HandlerType: (*ManagerServiceServer)(nil),
Methods: []grpc.MethodDesc{
Methods: []grpc.MethodDesc{},
Streams: []grpc.StreamDesc{
{
MethodName: "Run",
Handler: _ManagerService_Run_Handler,
StreamName: "Process",
Handler: _ManagerService_Process_Handler,
ServerStreams: true,
ClientStreams: true,
},
},
Streams: []grpc.StreamDesc{},
Metadata: "manager/manager.proto",
}
+25 -17
View File
@@ -13,8 +13,8 @@ import (
"github.com/absmach/magistrala/pkg/errors"
"github.com/cenkalti/backoff/v4"
"github.com/ultravioletrs/cocos/agent"
"github.com/ultravioletrs/cocos/internal/events"
"github.com/ultravioletrs/cocos/manager/qemu"
"google.golang.org/protobuf/types/known/timestamppb"
)
var (
@@ -36,33 +36,32 @@ var (
// Service specifies an API that must be fulfilled by the domain service
// implementation, and all of its decorators (e.g. logging & metrics).
type Service interface {
Run(ctx context.Context, c *Computation) (string, error)
Run(ctx context.Context, c *ComputationRunReq) (string, error)
}
type managerService struct {
qemuCfg qemu.Config
logger *slog.Logger
eventSvc events.Service
hostIP string
agents map[int]string // agent map of vsock cid to computationID.
qemuCfg qemu.Config
logger *slog.Logger
agents map[int]string // agent map of vsock cid to computationID.
eventsChan chan *ClientStreamMessage
}
var _ Service = (*managerService)(nil)
// New instantiates the manager service implementation.
func New(qemuCfg qemu.Config, logger *slog.Logger, eventSvc events.Service, hostIP string) Service {
func New(qemuCfg qemu.Config, logger *slog.Logger, eventsChan chan *ClientStreamMessage) Service {
ms := &managerService{
qemuCfg: qemuCfg,
eventSvc: eventSvc,
hostIP: hostIP,
logger: logger,
agents: make(map[int]string),
qemuCfg: qemuCfg,
logger: logger,
agents: make(map[int]string),
eventsChan: eventsChan,
}
go ms.retrieveAgentLogs()
go ms.retrieveAgentEvents()
return ms
}
func (ms *managerService) Run(ctx context.Context, c *Computation) (string, error) {
func (ms *managerService) Run(ctx context.Context, c *ComputationRunReq) (string, error) {
ms.publishEvent("vm-provision", c.Id, "starting", json.RawMessage{})
ac := agent.Computation{
ID: c.Id,
@@ -108,7 +107,7 @@ func (ms *managerService) Run(ctx context.Context, c *Computation) (string, erro
ms.qemuCfg.VSockConfig.GuestCID++
ms.publishEvent("vm-provision", c.Id, "complete", json.RawMessage{})
return fmt.Sprintf("%s:%d", ms.hostIP, ms.qemuCfg.HostFwdAgent), nil
return fmt.Sprint(ms.qemuCfg.HostFwdAgent), nil
}
func getFreePort() (int, error) {
@@ -129,7 +128,16 @@ func getFreePort() (int, error) {
}
func (ms *managerService) publishEvent(event, cmpID, status string, details json.RawMessage) {
if err := ms.eventSvc.SendEvent(event, cmpID, status, details); err != nil {
ms.logger.Warn(err.Error())
ms.eventsChan <- &ClientStreamMessage{
Message: &ClientStreamMessage_AgentEvent{
AgentEvent: &AgentEvent{
EventType: event,
ComputationId: cmpID,
Status: status,
Details: details,
Timestamp: timestamppb.Now(),
Originator: "manager",
},
},
}
}
+1 -1
View File
@@ -21,7 +21,7 @@ func New(svc manager.Service, tracer trace.Tracer) manager.Service {
return &tracingMiddleware{tracer, svc}
}
func (tm *tracingMiddleware) Run(ctx context.Context, mc *manager.Computation) (string, error) {
func (tm *tracingMiddleware) Run(ctx context.Context, mc *manager.ComputationRunReq) (string, error) {
ctx, span := tm.tracer.Start(ctx, "run")
defer span.End()
+1 -2
View File
@@ -4,7 +4,6 @@ package manager
import (
"github.com/ultravioletrs/cocos/manager"
managerapi "github.com/ultravioletrs/cocos/manager/api/grpc"
"github.com/ultravioletrs/cocos/pkg/clients/grpc"
)
@@ -15,5 +14,5 @@ func NewManagerClient(cfg grpc.Config) (grpc.Client, manager.ManagerServiceClien
return nil, nil, err
}
return client, managerapi.NewClient(client.Connection(), cfg.Timeout), nil
return client, manager.NewManagerServiceClient(client.Connection()), nil
}
-39
View File
@@ -1,39 +0,0 @@
// Copyright (c) Ultraviolet
// SPDX-License-Identifier: Apache-2.0
package sdk
import (
"context"
"log/slog"
"github.com/ultravioletrs/cocos/manager"
)
var _ manager.Service = (*managerSDK)(nil)
type managerSDK struct {
client manager.ManagerServiceClient
logger *slog.Logger
}
func NewManagerSDK(client manager.ManagerServiceClient, logger *slog.Logger) manager.Service {
return &managerSDK{
client: client,
logger: logger,
}
}
// Run deploys a new agent in a virtual machine.
func (sdk *managerSDK) Run(ctx context.Context, c *manager.Computation) (string, error) {
request := &manager.RunRequest{
Computation: c,
}
response, err := sdk.client.Run(ctx, request)
if err != nil {
sdk.logger.Error("Failed to call Run RPC")
return "", err
}
return response.AgentAddress, nil
}