mirror of
https://github.com/ultravioletrs/cocos.git
synced 2026-06-23 04:10:25 +00:00
NOISSUE - Use a single listener for logs and events (#82)
* add handler Signed-off-by: SammyOina <sammyoina@gmail.com> * Refactor gRPC and Protobuf integration for manager service - Shifted Protobuf message definitions to a separate package `pkg/manager`. - Updated references throughout the codebase to import and use the new package for gRPC service definitions. - Enhanced AgentLog message with additional fields `level` and `timestamp`. - Removed direct dependencies on old Protobuf-generated types in favor of the new package. - Deleted obsolete Protobuf-generated files as they are now superseded by the new `pkg/manager`. - Streamlined event publishing and gRPC handling in the manager service to use the updated Protobuf messages. This refactoring improves modularity by centralizing Protobuf message definitions and decouples internal representation from the gRPC interface, aligning with best practices for microservice architecture. Additionally, the enriched logging structure paves the way for more detailed and fine-grained log analysis. Signed-off-by: SammyOina <sammyoina@gmail.com> * Refactor vsock event/log handling and config Streamlined event and log services in the manager by moving vsock listening functions out of `managerService` initialization and into dedicated `RetrieveAgentEventsLogs` methods. This change decouples the manager service creation from the actual start of log listening, adding clarity and flexibility in service management. Also moved logging middleware invocation outside of network handling loops to avoid unnecessary overhead. Additionally, the agent's vsock port configuration is now dynamically passed to the `New` function in the `events` package instead of relying on a hardcoded constant, allowing for greater configurability and testability. Finally, updated message structures for event and log sending to conform with the `ClientStreamMessage` definitions. These modifications should improve parsing and handling consistency and prepare our system for future enhancements related to inter-process communication. Signed-off-by: SammyOina <sammyoina@gmail.com> * fix linting errors Signed-off-by: SammyOina <sammyoina@gmail.com> * correct path to generated files Signed-off-by: SammyOina <sammyoina@gmail.com> * fix comments Signed-off-by: SammyOina <sammyoina@gmail.com> * remove uneccessary comments Signed-off-by: SammyOina <sammyoina@gmail.com> --------- Signed-off-by: SammyOina <sammyoina@gmail.com>
This commit is contained in:
committed by
GitHub
parent
938dd6cb78
commit
722b463b6a
@@ -6,7 +6,7 @@ on:
|
||||
- main
|
||||
paths:
|
||||
- "manager/manager.proto"
|
||||
- "manager/*.pb.go"
|
||||
- "pkg/manager/*.pb.go"
|
||||
- "agent/agent.proto"
|
||||
- "agent/*.pb.go"
|
||||
pull_request:
|
||||
@@ -14,7 +14,7 @@ on:
|
||||
- main
|
||||
paths:
|
||||
- "manager/manager.proto"
|
||||
- "manager/*.pb.go"
|
||||
- "pkg/manager/*.pb.go"
|
||||
- "agent/agent.proto"
|
||||
- "agent/*.pb.go"
|
||||
|
||||
|
||||
@@ -32,4 +32,4 @@ install-cli: cli
|
||||
|
||||
protoc:
|
||||
protoc -I. --go_out=. --go_opt=paths=source_relative --go-grpc_out=. --go-grpc_opt=paths=source_relative agent/agent.proto
|
||||
protoc -I. --go_out=. --go_opt=paths=source_relative --go-grpc_out=. --go-grpc_opt=paths=source_relative manager/manager.proto
|
||||
protoc -I. --go_out=./pkg --go_opt=paths=source_relative --go-grpc_out=./pkg --go-grpc_opt=paths=source_relative manager/manager.proto
|
||||
|
||||
+11
-10
@@ -7,10 +7,11 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/mdlayher/vsock"
|
||||
"github.com/ultravioletrs/cocos/pkg/manager"
|
||||
"google.golang.org/protobuf/proto"
|
||||
"google.golang.org/protobuf/types/known/timestamppb"
|
||||
)
|
||||
|
||||
const VsockEventsPort uint32 = 9998
|
||||
|
||||
type service struct {
|
||||
service string
|
||||
computationID string
|
||||
@@ -31,8 +32,8 @@ type Service interface {
|
||||
Close() error
|
||||
}
|
||||
|
||||
func New(svc, computationID string) (Service, error) {
|
||||
conn, err := vsock.Dial(vsock.Host, VsockEventsPort, nil)
|
||||
func New(svc, computationID string, sockPort uint32) (Service, error) {
|
||||
conn, err := vsock.Dial(vsock.Host, sockPort, nil)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -44,19 +45,19 @@ func New(svc, computationID string) (Service, error) {
|
||||
}
|
||||
|
||||
func (s *service) SendEvent(event, status string, details json.RawMessage) error {
|
||||
body := AgentEvent{
|
||||
body := manager.ClientStreamMessage{Message: &manager.ClientStreamMessage_AgentEvent{AgentEvent: &manager.AgentEvent{
|
||||
EventType: event,
|
||||
Timestamp: time.Now(),
|
||||
ComputationID: s.computationID,
|
||||
Timestamp: timestamppb.Now(),
|
||||
ComputationId: s.computationID,
|
||||
Originator: s.service,
|
||||
Status: status,
|
||||
Details: details,
|
||||
}
|
||||
jsonBody, err := json.Marshal(body)
|
||||
}}}
|
||||
protoBody, err := proto.Marshal(&body)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if _, err := s.conn.Write(jsonBody); err != nil {
|
||||
if _, err := s.conn.Write(protoBody); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
|
||||
+8
-6
@@ -9,13 +9,13 @@ import (
|
||||
"log"
|
||||
"log/slog"
|
||||
|
||||
mglog "github.com/absmach/magistrala/logger"
|
||||
"github.com/mdlayher/vsock"
|
||||
"github.com/ultravioletrs/cocos/agent"
|
||||
"github.com/ultravioletrs/cocos/agent/api"
|
||||
agentgrpc "github.com/ultravioletrs/cocos/agent/api/grpc"
|
||||
"github.com/ultravioletrs/cocos/agent/events"
|
||||
"github.com/ultravioletrs/cocos/internal"
|
||||
agentlogger "github.com/ultravioletrs/cocos/internal/logger"
|
||||
"github.com/ultravioletrs/cocos/internal/server"
|
||||
grpcserver "github.com/ultravioletrs/cocos/internal/server/grpc"
|
||||
"github.com/ultravioletrs/cocos/manager"
|
||||
@@ -38,18 +38,20 @@ func main() {
|
||||
log.Fatalf("failed to read agent configuration from vsock %s", err.Error())
|
||||
}
|
||||
|
||||
conn, err := vsock.Dial(vsock.Host, manager.VsockLogsPort, nil)
|
||||
conn, err := vsock.Dial(vsock.Host, manager.ManagerVsockPort, nil)
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
defer conn.Close()
|
||||
logger, err := mglog.New(conn, cfg.AgentConfig.LogLevel)
|
||||
if err != nil {
|
||||
log.Print(err.Error())
|
||||
var level slog.Level
|
||||
if err := level.UnmarshalText([]byte(cfg.AgentConfig.LogLevel)); err != nil {
|
||||
log.Println(err)
|
||||
return
|
||||
}
|
||||
handler := agentlogger.NewProtoHandler(conn, &slog.HandlerOptions{Level: level})
|
||||
logger := slog.New(handler)
|
||||
|
||||
eventSvc, err := events.New(svcName, cfg.ID)
|
||||
eventSvc, err := events.New(svcName, cfg.ID, manager.ManagerVsockPort)
|
||||
if err != nil {
|
||||
log.Printf("failed to create events service %s", err.Error())
|
||||
return
|
||||
|
||||
+5
-4
@@ -23,6 +23,7 @@ import (
|
||||
"github.com/ultravioletrs/cocos/manager/tracing"
|
||||
"github.com/ultravioletrs/cocos/pkg/clients/grpc"
|
||||
managergrpc "github.com/ultravioletrs/cocos/pkg/clients/grpc/manager"
|
||||
pkgmanager "github.com/ultravioletrs/cocos/pkg/manager"
|
||||
"go.opentelemetry.io/otel/trace"
|
||||
"golang.org/x/sync/errgroup"
|
||||
)
|
||||
@@ -102,12 +103,12 @@ func main() {
|
||||
logger.Error(err.Error())
|
||||
return
|
||||
}
|
||||
if err := pc.Send(&manager.ClientStreamMessage{Message: &manager.ClientStreamMessage_Whoami{}}); err != nil {
|
||||
if err := pc.Send(&pkgmanager.ClientStreamMessage{Message: &pkgmanager.ClientStreamMessage_Whoami{}}); err != nil {
|
||||
logger.Error(err.Error())
|
||||
return
|
||||
}
|
||||
|
||||
eventsChan := make(chan *manager.ClientStreamMessage)
|
||||
eventsChan := make(chan *pkgmanager.ClientStreamMessage)
|
||||
svc := newService(logger, tracer, qemuCfg, eventsChan)
|
||||
|
||||
mc := managerapi.NewClient(pc, svc, eventsChan)
|
||||
@@ -126,9 +127,9 @@ func main() {
|
||||
}
|
||||
}
|
||||
|
||||
func newService(logger *slog.Logger, tracer trace.Tracer, qemuCfg qemu.Config, eventsChan chan *manager.ClientStreamMessage) manager.Service {
|
||||
func newService(logger *slog.Logger, tracer trace.Tracer, qemuCfg qemu.Config, eventsChan chan *pkgmanager.ClientStreamMessage) manager.Service {
|
||||
svc := manager.New(qemuCfg, logger, eventsChan)
|
||||
|
||||
go svc.RetrieveAgentEventsLogs()
|
||||
svc = api.LoggingMiddleware(svc, logger)
|
||||
counter, latency := internal.MakeMetrics(svcName, "api")
|
||||
svc = api.MetricsMiddleware(svc, counter, latency)
|
||||
|
||||
@@ -0,0 +1,67 @@
|
||||
// Copyright (c) Ultraviolet
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
package logger
|
||||
|
||||
import (
|
||||
"context"
|
||||
"io"
|
||||
"log/slog"
|
||||
|
||||
"github.com/ultravioletrs/cocos/pkg/manager"
|
||||
"google.golang.org/protobuf/proto"
|
||||
"google.golang.org/protobuf/types/known/timestamppb"
|
||||
)
|
||||
|
||||
var _ slog.Handler = (*handler)(nil)
|
||||
|
||||
type handler struct {
|
||||
opts slog.HandlerOptions
|
||||
w io.Writer
|
||||
}
|
||||
|
||||
func NewProtoHandler(w io.Writer, opts *slog.HandlerOptions) slog.Handler {
|
||||
if opts == nil {
|
||||
opts = &slog.HandlerOptions{}
|
||||
}
|
||||
return &handler{
|
||||
opts: *opts,
|
||||
w: w,
|
||||
}
|
||||
}
|
||||
|
||||
// Enabled implements slog.Handler.
|
||||
func (h *handler) Enabled(_ context.Context, l slog.Level) bool {
|
||||
minLevel := slog.LevelInfo
|
||||
if h.opts.Level != nil {
|
||||
minLevel = h.opts.Level.Level()
|
||||
}
|
||||
return l >= minLevel
|
||||
}
|
||||
|
||||
// Handle implements slog.Handler.
|
||||
func (h *handler) Handle(_ context.Context, r slog.Record) error {
|
||||
agentLog := manager.ClientStreamMessage{Message: &manager.ClientStreamMessage_AgentLog{AgentLog: &manager.AgentLog{
|
||||
Timestamp: timestamppb.New(r.Time),
|
||||
Message: r.Message,
|
||||
Level: r.Level.String(),
|
||||
}}}
|
||||
|
||||
b, err := proto.Marshal(&agentLog)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if _, err := h.w.Write(b); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// WithAttrs implements slog.Handler.
|
||||
func (*handler) WithAttrs(attrs []slog.Attr) slog.Handler {
|
||||
panic("unimplemented")
|
||||
}
|
||||
|
||||
// WithGroup implements slog.Handler.
|
||||
func (*handler) WithGroup(name string) slog.Handler {
|
||||
panic("unimplemented")
|
||||
}
|
||||
@@ -10,17 +10,20 @@ import (
|
||||
|
||||
"github.com/absmach/magistrala/pkg/errors"
|
||||
"github.com/mdlayher/vsock"
|
||||
"github.com/ultravioletrs/cocos/pkg/manager"
|
||||
"google.golang.org/protobuf/proto"
|
||||
)
|
||||
|
||||
const (
|
||||
VsockLogsPort = 9997
|
||||
messageSize int = 1024
|
||||
ManagerVsockPort = 9997
|
||||
messageSize int = 1024
|
||||
)
|
||||
|
||||
var errFailedToParseCID = errors.New("failed to parse cid from remote address")
|
||||
|
||||
func (ms *managerService) retrieveAgentLogs() {
|
||||
l, err := vsock.Listen(VsockLogsPort, nil)
|
||||
// RetrieveAgentEventsLogs Retrieve and forward agent logs and events via vsock.
|
||||
func (ms *managerService) RetrieveAgentEventsLogs() {
|
||||
l, err := vsock.Listen(ManagerVsockPort, nil)
|
||||
if err != nil {
|
||||
ms.logger.Warn(err.Error())
|
||||
return
|
||||
@@ -33,11 +36,11 @@ func (ms *managerService) retrieveAgentLogs() {
|
||||
continue
|
||||
}
|
||||
|
||||
go ms.handleLogsConnections(conn)
|
||||
go ms.handleConnections(conn)
|
||||
}
|
||||
}
|
||||
|
||||
func (ms *managerService) handleLogsConnections(conn net.Conn) {
|
||||
func (ms *managerService) handleConnections(conn net.Conn) {
|
||||
defer conn.Close()
|
||||
for {
|
||||
b := make([]byte, messageSize)
|
||||
@@ -51,8 +54,23 @@ func (ms *managerService) handleLogsConnections(conn net.Conn) {
|
||||
ms.logger.Warn(err.Error())
|
||||
continue
|
||||
}
|
||||
ms.logger.Info(fmt.Sprintf("Agent Log, Computation ID: %s, Log: %s", cmpID, string(b[:n])))
|
||||
ms.eventsChan <- &ClientStreamMessage{Message: &ClientStreamMessage_AgentLog{AgentLog: &AgentLog{ComputationId: cmpID, LogMessage: string(b[:n])}}}
|
||||
var message manager.ClientStreamMessage
|
||||
if err := proto.Unmarshal(b[:n], &message); err != nil {
|
||||
ms.logger.Warn(err.Error())
|
||||
continue
|
||||
}
|
||||
switch mes := message.Message.(type) {
|
||||
case *manager.ClientStreamMessage_AgentEvent:
|
||||
mes.AgentEvent.ComputationId = cmpID
|
||||
ms.eventsChan <- &manager.ClientStreamMessage{Message: mes}
|
||||
case *manager.ClientStreamMessage_AgentLog:
|
||||
mes.AgentLog.ComputationId = cmpID
|
||||
ms.eventsChan <- &manager.ClientStreamMessage{Message: mes}
|
||||
default:
|
||||
ms.logger.Warn("Unexpected agent log or event type")
|
||||
}
|
||||
|
||||
ms.logger.Info(fmt.Sprintf("Agent Log/Event, Computation ID: %s, Message: %s", cmpID, message.String()))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,55 +0,0 @@
|
||||
// Copyright (c) Ultraviolet
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
package manager
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"net"
|
||||
|
||||
"github.com/mdlayher/vsock"
|
||||
"github.com/ultravioletrs/cocos/agent/events"
|
||||
"google.golang.org/protobuf/types/known/timestamppb"
|
||||
)
|
||||
|
||||
func (s *managerService) retrieveAgentEvents() {
|
||||
l, err := vsock.Listen(events.VsockEventsPort, nil)
|
||||
if err != nil {
|
||||
s.logger.Warn(err.Error())
|
||||
return
|
||||
}
|
||||
for {
|
||||
conn, err := l.Accept()
|
||||
if err != nil {
|
||||
s.logger.Warn(err.Error())
|
||||
continue
|
||||
}
|
||||
go s.handleEventsConnections(conn)
|
||||
}
|
||||
}
|
||||
|
||||
func (s *managerService) handleEventsConnections(conn net.Conn) {
|
||||
defer conn.Close()
|
||||
for {
|
||||
b := make([]byte, messageSize)
|
||||
n, err := conn.Read(b)
|
||||
if err != nil {
|
||||
s.logger.Warn(err.Error())
|
||||
return
|
||||
}
|
||||
var ev events.AgentEvent
|
||||
if err := json.Unmarshal(b[:n], &ev); err != nil {
|
||||
s.logger.Warn(err.Error())
|
||||
continue
|
||||
}
|
||||
s.eventsChan <- &ClientStreamMessage{
|
||||
Message: &ClientStreamMessage_AgentEvent{AgentEvent: &AgentEvent{
|
||||
EventType: ev.EventType,
|
||||
Timestamp: timestamppb.New(ev.Timestamp),
|
||||
ComputationId: ev.ComputationID,
|
||||
Details: ev.Details,
|
||||
Originator: ev.Originator,
|
||||
Status: ev.Status,
|
||||
}},
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -6,17 +6,18 @@ import (
|
||||
"context"
|
||||
|
||||
"github.com/ultravioletrs/cocos/manager"
|
||||
pkgmanager "github.com/ultravioletrs/cocos/pkg/manager"
|
||||
"golang.org/x/sync/errgroup"
|
||||
)
|
||||
|
||||
type ManagerClient struct {
|
||||
stream manager.ManagerService_ProcessClient
|
||||
stream pkgmanager.ManagerService_ProcessClient
|
||||
svc manager.Service
|
||||
responses chan *manager.ClientStreamMessage
|
||||
responses chan *pkgmanager.ClientStreamMessage
|
||||
}
|
||||
|
||||
// NewClient returns new gRPC client instance.
|
||||
func NewClient(stream manager.ManagerService_ProcessClient, svc manager.Service, responses chan *manager.ClientStreamMessage) ManagerClient {
|
||||
func NewClient(stream pkgmanager.ManagerService_ProcessClient, svc manager.Service, responses chan *pkgmanager.ClientStreamMessage) ManagerClient {
|
||||
return ManagerClient{
|
||||
stream: stream,
|
||||
svc: svc,
|
||||
@@ -37,8 +38,8 @@ func (client ManagerClient) Process(ctx context.Context) error {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
runRes := &manager.ClientStreamMessage_RunRes{RunRes: &manager.RunResponse{AgentPort: port, ComputationId: req.Id}}
|
||||
if err := client.stream.Send(&manager.ClientStreamMessage{Message: runRes}); err != nil {
|
||||
runRes := &pkgmanager.ClientStreamMessage_RunRes{RunRes: &pkgmanager.RunResponse{AgentPort: port, ComputationId: req.Id}}
|
||||
if err := client.stream.Send(&pkgmanager.ClientStreamMessage{Message: runRes}); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
@@ -5,7 +5,7 @@ package grpc
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/ultravioletrs/cocos/manager"
|
||||
"github.com/ultravioletrs/cocos/pkg/manager"
|
||||
"golang.org/x/sync/errgroup"
|
||||
"google.golang.org/grpc/peer"
|
||||
)
|
||||
|
||||
@@ -13,6 +13,7 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/ultravioletrs/cocos/manager"
|
||||
pkgmanager "github.com/ultravioletrs/cocos/pkg/manager"
|
||||
)
|
||||
|
||||
var _ manager.Service = (*loggingMiddleware)(nil)
|
||||
@@ -27,7 +28,7 @@ func LoggingMiddleware(svc manager.Service, logger *slog.Logger) manager.Service
|
||||
return &loggingMiddleware{logger, svc}
|
||||
}
|
||||
|
||||
func (lm *loggingMiddleware) Run(ctx context.Context, mc *manager.ComputationRunReq) (agentAddr string, err error) {
|
||||
func (lm *loggingMiddleware) Run(ctx context.Context, mc *pkgmanager.ComputationRunReq) (agentAddr string, err error) {
|
||||
defer func(begin time.Time) {
|
||||
message := fmt.Sprintf("Method Run for computation took %s to complete", time.Since(begin))
|
||||
if err != nil {
|
||||
@@ -39,3 +40,7 @@ func (lm *loggingMiddleware) Run(ctx context.Context, mc *manager.ComputationRun
|
||||
|
||||
return lm.svc.Run(ctx, mc)
|
||||
}
|
||||
|
||||
func (lm *loggingMiddleware) RetrieveAgentEventsLogs() {
|
||||
lm.svc.RetrieveAgentEventsLogs()
|
||||
}
|
||||
|
||||
@@ -12,6 +12,7 @@ import (
|
||||
|
||||
"github.com/go-kit/kit/metrics"
|
||||
"github.com/ultravioletrs/cocos/manager"
|
||||
pkgmanager "github.com/ultravioletrs/cocos/pkg/manager"
|
||||
)
|
||||
|
||||
var _ manager.Service = (*metricsMiddleware)(nil)
|
||||
@@ -32,7 +33,7 @@ func MetricsMiddleware(svc manager.Service, counter metrics.Counter, latency met
|
||||
}
|
||||
}
|
||||
|
||||
func (ms *metricsMiddleware) Run(ctx context.Context, mc *manager.ComputationRunReq) (string, error) {
|
||||
func (ms *metricsMiddleware) Run(ctx context.Context, mc *pkgmanager.ComputationRunReq) (string, error) {
|
||||
defer func(begin time.Time) {
|
||||
ms.counter.With("method", "Run").Add(1)
|
||||
ms.latency.With("method", "Run").Observe(time.Since(begin).Seconds())
|
||||
@@ -40,3 +41,7 @@ func (ms *metricsMiddleware) Run(ctx context.Context, mc *manager.ComputationRun
|
||||
|
||||
return ms.svc.Run(ctx, mc)
|
||||
}
|
||||
|
||||
func (ms *metricsMiddleware) RetrieveAgentEventsLogs() {
|
||||
ms.svc.RetrieveAgentEventsLogs()
|
||||
}
|
||||
|
||||
@@ -31,8 +31,10 @@ message AgentEvent {
|
||||
}
|
||||
|
||||
message AgentLog {
|
||||
string log_message = 1;
|
||||
string message = 1;
|
||||
string computation_id = 2;
|
||||
string level = 3;
|
||||
google.protobuf.Timestamp timestamp = 4;
|
||||
}
|
||||
|
||||
message ClientStreamMessage {
|
||||
|
||||
+11
-9
@@ -14,6 +14,7 @@ import (
|
||||
"github.com/cenkalti/backoff/v4"
|
||||
"github.com/ultravioletrs/cocos/agent"
|
||||
"github.com/ultravioletrs/cocos/manager/qemu"
|
||||
"github.com/ultravioletrs/cocos/pkg/manager"
|
||||
"google.golang.org/protobuf/types/known/timestamppb"
|
||||
)
|
||||
|
||||
@@ -36,32 +37,33 @@ var (
|
||||
// Service specifies an API that must be fulfilled by the domain service
|
||||
// implementation, and all of its decorators (e.g. logging & metrics).
|
||||
type Service interface {
|
||||
Run(ctx context.Context, c *ComputationRunReq) (string, error)
|
||||
// Run create a computation.
|
||||
Run(ctx context.Context, c *manager.ComputationRunReq) (string, error)
|
||||
// RetrieveAgentEventsLogs Retrieve and forward agent logs and events via vsock.
|
||||
RetrieveAgentEventsLogs()
|
||||
}
|
||||
|
||||
type managerService struct {
|
||||
qemuCfg qemu.Config
|
||||
logger *slog.Logger
|
||||
agents map[int]string // agent map of vsock cid to computationID.
|
||||
eventsChan chan *ClientStreamMessage
|
||||
eventsChan chan *manager.ClientStreamMessage
|
||||
}
|
||||
|
||||
var _ Service = (*managerService)(nil)
|
||||
|
||||
// New instantiates the manager service implementation.
|
||||
func New(qemuCfg qemu.Config, logger *slog.Logger, eventsChan chan *ClientStreamMessage) Service {
|
||||
func New(qemuCfg qemu.Config, logger *slog.Logger, eventsChan chan *manager.ClientStreamMessage) Service {
|
||||
ms := &managerService{
|
||||
qemuCfg: qemuCfg,
|
||||
logger: logger,
|
||||
agents: make(map[int]string),
|
||||
eventsChan: eventsChan,
|
||||
}
|
||||
go ms.retrieveAgentLogs()
|
||||
go ms.retrieveAgentEvents()
|
||||
return ms
|
||||
}
|
||||
|
||||
func (ms *managerService) Run(ctx context.Context, c *ComputationRunReq) (string, error) {
|
||||
func (ms *managerService) Run(ctx context.Context, c *manager.ComputationRunReq) (string, error) {
|
||||
ms.publishEvent("vm-provision", c.Id, "starting", json.RawMessage{})
|
||||
ac := agent.Computation{
|
||||
ID: c.Id,
|
||||
@@ -130,9 +132,9 @@ func getFreePort() (int, error) {
|
||||
}
|
||||
|
||||
func (ms *managerService) publishEvent(event, cmpID, status string, details json.RawMessage) {
|
||||
ms.eventsChan <- &ClientStreamMessage{
|
||||
Message: &ClientStreamMessage_AgentEvent{
|
||||
AgentEvent: &AgentEvent{
|
||||
ms.eventsChan <- &manager.ClientStreamMessage{
|
||||
Message: &manager.ClientStreamMessage_AgentEvent{
|
||||
AgentEvent: &manager.AgentEvent{
|
||||
EventType: event,
|
||||
ComputationId: cmpID,
|
||||
Status: status,
|
||||
|
||||
@@ -6,6 +6,7 @@ import (
|
||||
"context"
|
||||
|
||||
"github.com/ultravioletrs/cocos/manager"
|
||||
pkgmanager "github.com/ultravioletrs/cocos/pkg/manager"
|
||||
"go.opentelemetry.io/otel/trace"
|
||||
)
|
||||
|
||||
@@ -21,9 +22,13 @@ func New(svc manager.Service, tracer trace.Tracer) manager.Service {
|
||||
return &tracingMiddleware{tracer, svc}
|
||||
}
|
||||
|
||||
func (tm *tracingMiddleware) Run(ctx context.Context, mc *manager.ComputationRunReq) (string, error) {
|
||||
func (tm *tracingMiddleware) Run(ctx context.Context, mc *pkgmanager.ComputationRunReq) (string, error) {
|
||||
ctx, span := tm.tracer.Start(ctx, "run")
|
||||
defer span.End()
|
||||
|
||||
return tm.svc.Run(ctx, mc)
|
||||
}
|
||||
|
||||
func (tm *tracingMiddleware) RetrieveAgentEventsLogs() {
|
||||
tm.svc.RetrieveAgentEventsLogs()
|
||||
}
|
||||
|
||||
@@ -3,8 +3,8 @@
|
||||
package manager
|
||||
|
||||
import (
|
||||
"github.com/ultravioletrs/cocos/manager"
|
||||
"github.com/ultravioletrs/cocos/pkg/clients/grpc"
|
||||
"github.com/ultravioletrs/cocos/pkg/manager"
|
||||
)
|
||||
|
||||
// NewManagerClient creates new manager gRPC client instance.
|
||||
|
||||
@@ -209,8 +209,10 @@ type AgentLog struct {
|
||||
sizeCache protoimpl.SizeCache
|
||||
unknownFields protoimpl.UnknownFields
|
||||
|
||||
LogMessage string `protobuf:"bytes,1,opt,name=log_message,json=logMessage,proto3" json:"log_message,omitempty"`
|
||||
ComputationId string `protobuf:"bytes,2,opt,name=computation_id,json=computationId,proto3" json:"computation_id,omitempty"`
|
||||
Message string `protobuf:"bytes,1,opt,name=message,proto3" json:"message,omitempty"`
|
||||
ComputationId string `protobuf:"bytes,2,opt,name=computation_id,json=computationId,proto3" json:"computation_id,omitempty"`
|
||||
Level string `protobuf:"bytes,3,opt,name=level,proto3" json:"level,omitempty"`
|
||||
Timestamp *timestamppb.Timestamp `protobuf:"bytes,4,opt,name=timestamp,proto3" json:"timestamp,omitempty"`
|
||||
}
|
||||
|
||||
func (x *AgentLog) Reset() {
|
||||
@@ -245,9 +247,9 @@ func (*AgentLog) Descriptor() ([]byte, []int) {
|
||||
return file_manager_manager_proto_rawDescGZIP(), []int{3}
|
||||
}
|
||||
|
||||
func (x *AgentLog) GetLogMessage() string {
|
||||
func (x *AgentLog) GetMessage() string {
|
||||
if x != nil {
|
||||
return x.LogMessage
|
||||
return x.Message
|
||||
}
|
||||
return ""
|
||||
}
|
||||
@@ -259,6 +261,20 @@ func (x *AgentLog) GetComputationId() string {
|
||||
return ""
|
||||
}
|
||||
|
||||
func (x *AgentLog) GetLevel() string {
|
||||
if x != nil {
|
||||
return x.Level
|
||||
}
|
||||
return ""
|
||||
}
|
||||
|
||||
func (x *AgentLog) GetTimestamp() *timestamppb.Timestamp {
|
||||
if x != nil {
|
||||
return x.Timestamp
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
type ClientStreamMessage struct {
|
||||
state protoimpl.MessageState
|
||||
sizeCache protoimpl.SizeCache
|
||||
@@ -695,72 +711,77 @@ var file_manager_manager_proto_rawDesc = []byte{
|
||||
0x67, 0x69, 0x6e, 0x61, 0x74, 0x6f, 0x72, 0x18, 0x05, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0a, 0x6f,
|
||||
0x72, 0x69, 0x67, 0x69, 0x6e, 0x61, 0x74, 0x6f, 0x72, 0x12, 0x16, 0x0a, 0x06, 0x73, 0x74, 0x61,
|
||||
0x74, 0x75, 0x73, 0x18, 0x06, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x73, 0x74, 0x61, 0x74, 0x75,
|
||||
0x73, 0x22, 0x52, 0x0a, 0x08, 0x41, 0x67, 0x65, 0x6e, 0x74, 0x4c, 0x6f, 0x67, 0x12, 0x1f, 0x0a,
|
||||
0x0b, 0x6c, 0x6f, 0x67, 0x5f, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x18, 0x01, 0x20, 0x01,
|
||||
0x28, 0x09, 0x52, 0x0a, 0x6c, 0x6f, 0x67, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x12, 0x25,
|
||||
0x0a, 0x0e, 0x63, 0x6f, 0x6d, 0x70, 0x75, 0x74, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x5f, 0x69, 0x64,
|
||||
0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0d, 0x63, 0x6f, 0x6d, 0x70, 0x75, 0x74, 0x61, 0x74,
|
||||
0x69, 0x6f, 0x6e, 0x49, 0x64, 0x22, 0xe6, 0x01, 0x0a, 0x13, 0x43, 0x6c, 0x69, 0x65, 0x6e, 0x74,
|
||||
0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x12, 0x29, 0x0a,
|
||||
0x06, 0x77, 0x68, 0x6f, 0x61, 0x6d, 0x69, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x0f, 0x2e,
|
||||
0x6d, 0x61, 0x6e, 0x61, 0x67, 0x65, 0x72, 0x2e, 0x57, 0x68, 0x6f, 0x41, 0x6d, 0x49, 0x48, 0x00,
|
||||
0x52, 0x06, 0x77, 0x68, 0x6f, 0x61, 0x6d, 0x69, 0x12, 0x30, 0x0a, 0x09, 0x61, 0x67, 0x65, 0x6e,
|
||||
0x74, 0x5f, 0x6c, 0x6f, 0x67, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x11, 0x2e, 0x6d, 0x61,
|
||||
0x6e, 0x61, 0x67, 0x65, 0x72, 0x2e, 0x41, 0x67, 0x65, 0x6e, 0x74, 0x4c, 0x6f, 0x67, 0x48, 0x00,
|
||||
0x52, 0x08, 0x61, 0x67, 0x65, 0x6e, 0x74, 0x4c, 0x6f, 0x67, 0x12, 0x36, 0x0a, 0x0b, 0x61, 0x67,
|
||||
0x65, 0x6e, 0x74, 0x5f, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32,
|
||||
0x13, 0x2e, 0x6d, 0x61, 0x6e, 0x61, 0x67, 0x65, 0x72, 0x2e, 0x41, 0x67, 0x65, 0x6e, 0x74, 0x45,
|
||||
0x76, 0x65, 0x6e, 0x74, 0x48, 0x00, 0x52, 0x0a, 0x61, 0x67, 0x65, 0x6e, 0x74, 0x45, 0x76, 0x65,
|
||||
0x6e, 0x74, 0x12, 0x2f, 0x0a, 0x07, 0x72, 0x75, 0x6e, 0x5f, 0x72, 0x65, 0x73, 0x18, 0x04, 0x20,
|
||||
0x01, 0x28, 0x0b, 0x32, 0x14, 0x2e, 0x6d, 0x61, 0x6e, 0x61, 0x67, 0x65, 0x72, 0x2e, 0x52, 0x75,
|
||||
0x6e, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x48, 0x00, 0x52, 0x06, 0x72, 0x75, 0x6e,
|
||||
0x52, 0x65, 0x73, 0x42, 0x09, 0x0a, 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x22, 0x9f,
|
||||
0x02, 0x0a, 0x11, 0x43, 0x6f, 0x6d, 0x70, 0x75, 0x74, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x75,
|
||||
0x6e, 0x52, 0x65, 0x71, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09,
|
||||
0x52, 0x02, 0x69, 0x64, 0x12, 0x12, 0x0a, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x02, 0x20, 0x01,
|
||||
0x28, 0x09, 0x52, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x12, 0x20, 0x0a, 0x0b, 0x64, 0x65, 0x73, 0x63,
|
||||
0x72, 0x69, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0b, 0x64,
|
||||
0x65, 0x73, 0x63, 0x72, 0x69, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x2c, 0x0a, 0x08, 0x64, 0x61,
|
||||
0x74, 0x61, 0x73, 0x65, 0x74, 0x73, 0x18, 0x04, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x10, 0x2e, 0x6d,
|
||||
0x61, 0x6e, 0x61, 0x67, 0x65, 0x72, 0x2e, 0x44, 0x61, 0x74, 0x61, 0x73, 0x65, 0x74, 0x52, 0x08,
|
||||
0x64, 0x61, 0x74, 0x61, 0x73, 0x65, 0x74, 0x73, 0x12, 0x32, 0x0a, 0x0a, 0x61, 0x6c, 0x67, 0x6f,
|
||||
0x72, 0x69, 0x74, 0x68, 0x6d, 0x73, 0x18, 0x05, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x12, 0x2e, 0x6d,
|
||||
0x61, 0x6e, 0x61, 0x67, 0x65, 0x72, 0x2e, 0x41, 0x6c, 0x67, 0x6f, 0x72, 0x69, 0x74, 0x68, 0x6d,
|
||||
0x52, 0x0a, 0x61, 0x6c, 0x67, 0x6f, 0x72, 0x69, 0x74, 0x68, 0x6d, 0x73, 0x12, 0x29, 0x0a, 0x10,
|
||||
0x72, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x5f, 0x63, 0x6f, 0x6e, 0x73, 0x75, 0x6d, 0x65, 0x72, 0x73,
|
||||
0x18, 0x06, 0x20, 0x03, 0x28, 0x09, 0x52, 0x0f, 0x72, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x43, 0x6f,
|
||||
0x6e, 0x73, 0x75, 0x6d, 0x65, 0x72, 0x73, 0x12, 0x37, 0x0a, 0x0c, 0x61, 0x67, 0x65, 0x6e, 0x74,
|
||||
0x5f, 0x63, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x18, 0x07, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x14, 0x2e,
|
||||
0x6d, 0x61, 0x6e, 0x61, 0x67, 0x65, 0x72, 0x2e, 0x41, 0x67, 0x65, 0x6e, 0x74, 0x43, 0x6f, 0x6e,
|
||||
0x66, 0x69, 0x67, 0x52, 0x0b, 0x61, 0x67, 0x65, 0x6e, 0x74, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67,
|
||||
0x22, 0x35, 0x0a, 0x07, 0x44, 0x61, 0x74, 0x61, 0x73, 0x65, 0x74, 0x12, 0x1a, 0x0a, 0x08, 0x70,
|
||||
0x72, 0x6f, 0x76, 0x69, 0x64, 0x65, 0x72, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x70,
|
||||
0x72, 0x6f, 0x76, 0x69, 0x64, 0x65, 0x72, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x02, 0x20,
|
||||
0x01, 0x28, 0x09, 0x52, 0x02, 0x69, 0x64, 0x22, 0x37, 0x0a, 0x09, 0x41, 0x6c, 0x67, 0x6f, 0x72,
|
||||
0x69, 0x74, 0x68, 0x6d, 0x12, 0x1a, 0x0a, 0x08, 0x70, 0x72, 0x6f, 0x76, 0x69, 0x64, 0x65, 0x72,
|
||||
0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x70, 0x72, 0x6f, 0x76, 0x69, 0x64, 0x65, 0x72,
|
||||
0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x02, 0x69, 0x64,
|
||||
0x22, 0xd6, 0x01, 0x0a, 0x0b, 0x41, 0x67, 0x65, 0x6e, 0x74, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67,
|
||||
0x12, 0x12, 0x0a, 0x04, 0x70, 0x6f, 0x72, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04,
|
||||
0x70, 0x6f, 0x72, 0x74, 0x12, 0x12, 0x0a, 0x04, 0x68, 0x6f, 0x73, 0x74, 0x18, 0x02, 0x20, 0x01,
|
||||
0x28, 0x09, 0x52, 0x04, 0x68, 0x6f, 0x73, 0x74, 0x12, 0x1b, 0x0a, 0x09, 0x63, 0x65, 0x72, 0x74,
|
||||
0x5f, 0x66, 0x69, 0x6c, 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x63, 0x65, 0x72,
|
||||
0x74, 0x46, 0x69, 0x6c, 0x65, 0x12, 0x19, 0x0a, 0x08, 0x6b, 0x65, 0x79, 0x5f, 0x66, 0x69, 0x6c,
|
||||
0x65, 0x18, 0x04, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x6b, 0x65, 0x79, 0x46, 0x69, 0x6c, 0x65,
|
||||
0x12, 0x24, 0x0a, 0x0e, 0x63, 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x5f, 0x63, 0x61, 0x5f, 0x66, 0x69,
|
||||
0x6c, 0x65, 0x18, 0x05, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0c, 0x63, 0x6c, 0x69, 0x65, 0x6e, 0x74,
|
||||
0x43, 0x61, 0x46, 0x69, 0x6c, 0x65, 0x12, 0x24, 0x0a, 0x0e, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72,
|
||||
0x5f, 0x63, 0x61, 0x5f, 0x66, 0x69, 0x6c, 0x65, 0x18, 0x06, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0c,
|
||||
0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x43, 0x61, 0x46, 0x69, 0x6c, 0x65, 0x12, 0x1b, 0x0a, 0x09,
|
||||
0x6c, 0x6f, 0x67, 0x5f, 0x6c, 0x65, 0x76, 0x65, 0x6c, 0x18, 0x07, 0x20, 0x01, 0x28, 0x09, 0x52,
|
||||
0x08, 0x6c, 0x6f, 0x67, 0x4c, 0x65, 0x76, 0x65, 0x6c, 0x32, 0x5b, 0x0a, 0x0e, 0x4d, 0x61, 0x6e,
|
||||
0x61, 0x67, 0x65, 0x72, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x12, 0x49, 0x0a, 0x07, 0x50,
|
||||
0x72, 0x6f, 0x63, 0x65, 0x73, 0x73, 0x12, 0x1c, 0x2e, 0x6d, 0x61, 0x6e, 0x61, 0x67, 0x65, 0x72,
|
||||
0x2e, 0x43, 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x4d, 0x65, 0x73,
|
||||
0x73, 0x61, 0x67, 0x65, 0x1a, 0x1a, 0x2e, 0x6d, 0x61, 0x6e, 0x61, 0x67, 0x65, 0x72, 0x2e, 0x43,
|
||||
0x6f, 0x6d, 0x70, 0x75, 0x74, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x75, 0x6e, 0x52, 0x65, 0x71,
|
||||
0x22, 0x00, 0x28, 0x01, 0x30, 0x01, 0x42, 0x0b, 0x5a, 0x09, 0x2e, 0x2f, 0x6d, 0x61, 0x6e, 0x61,
|
||||
0x67, 0x65, 0x72, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
|
||||
0x73, 0x22, 0x9b, 0x01, 0x0a, 0x08, 0x41, 0x67, 0x65, 0x6e, 0x74, 0x4c, 0x6f, 0x67, 0x12, 0x18,
|
||||
0x0a, 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52,
|
||||
0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x12, 0x25, 0x0a, 0x0e, 0x63, 0x6f, 0x6d, 0x70,
|
||||
0x75, 0x74, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x5f, 0x69, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09,
|
||||
0x52, 0x0d, 0x63, 0x6f, 0x6d, 0x70, 0x75, 0x74, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x49, 0x64, 0x12,
|
||||
0x14, 0x0a, 0x05, 0x6c, 0x65, 0x76, 0x65, 0x6c, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05,
|
||||
0x6c, 0x65, 0x76, 0x65, 0x6c, 0x12, 0x38, 0x0a, 0x09, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61,
|
||||
0x6d, 0x70, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c,
|
||||
0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73,
|
||||
0x74, 0x61, 0x6d, 0x70, 0x52, 0x09, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x22,
|
||||
0xe6, 0x01, 0x0a, 0x13, 0x43, 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d,
|
||||
0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x12, 0x29, 0x0a, 0x06, 0x77, 0x68, 0x6f, 0x61, 0x6d,
|
||||
0x69, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x0f, 0x2e, 0x6d, 0x61, 0x6e, 0x61, 0x67, 0x65,
|
||||
0x72, 0x2e, 0x57, 0x68, 0x6f, 0x41, 0x6d, 0x49, 0x48, 0x00, 0x52, 0x06, 0x77, 0x68, 0x6f, 0x61,
|
||||
0x6d, 0x69, 0x12, 0x30, 0x0a, 0x09, 0x61, 0x67, 0x65, 0x6e, 0x74, 0x5f, 0x6c, 0x6f, 0x67, 0x18,
|
||||
0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x11, 0x2e, 0x6d, 0x61, 0x6e, 0x61, 0x67, 0x65, 0x72, 0x2e,
|
||||
0x41, 0x67, 0x65, 0x6e, 0x74, 0x4c, 0x6f, 0x67, 0x48, 0x00, 0x52, 0x08, 0x61, 0x67, 0x65, 0x6e,
|
||||
0x74, 0x4c, 0x6f, 0x67, 0x12, 0x36, 0x0a, 0x0b, 0x61, 0x67, 0x65, 0x6e, 0x74, 0x5f, 0x65, 0x76,
|
||||
0x65, 0x6e, 0x74, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x13, 0x2e, 0x6d, 0x61, 0x6e, 0x61,
|
||||
0x67, 0x65, 0x72, 0x2e, 0x41, 0x67, 0x65, 0x6e, 0x74, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x48, 0x00,
|
||||
0x52, 0x0a, 0x61, 0x67, 0x65, 0x6e, 0x74, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x12, 0x2f, 0x0a, 0x07,
|
||||
0x72, 0x75, 0x6e, 0x5f, 0x72, 0x65, 0x73, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x14, 0x2e,
|
||||
0x6d, 0x61, 0x6e, 0x61, 0x67, 0x65, 0x72, 0x2e, 0x52, 0x75, 0x6e, 0x52, 0x65, 0x73, 0x70, 0x6f,
|
||||
0x6e, 0x73, 0x65, 0x48, 0x00, 0x52, 0x06, 0x72, 0x75, 0x6e, 0x52, 0x65, 0x73, 0x42, 0x09, 0x0a,
|
||||
0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x22, 0x9f, 0x02, 0x0a, 0x11, 0x43, 0x6f, 0x6d,
|
||||
0x70, 0x75, 0x74, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x75, 0x6e, 0x52, 0x65, 0x71, 0x12, 0x0e,
|
||||
0x0a, 0x02, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x02, 0x69, 0x64, 0x12, 0x12,
|
||||
0x0a, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x6e, 0x61,
|
||||
0x6d, 0x65, 0x12, 0x20, 0x0a, 0x0b, 0x64, 0x65, 0x73, 0x63, 0x72, 0x69, 0x70, 0x74, 0x69, 0x6f,
|
||||
0x6e, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0b, 0x64, 0x65, 0x73, 0x63, 0x72, 0x69, 0x70,
|
||||
0x74, 0x69, 0x6f, 0x6e, 0x12, 0x2c, 0x0a, 0x08, 0x64, 0x61, 0x74, 0x61, 0x73, 0x65, 0x74, 0x73,
|
||||
0x18, 0x04, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x10, 0x2e, 0x6d, 0x61, 0x6e, 0x61, 0x67, 0x65, 0x72,
|
||||
0x2e, 0x44, 0x61, 0x74, 0x61, 0x73, 0x65, 0x74, 0x52, 0x08, 0x64, 0x61, 0x74, 0x61, 0x73, 0x65,
|
||||
0x74, 0x73, 0x12, 0x32, 0x0a, 0x0a, 0x61, 0x6c, 0x67, 0x6f, 0x72, 0x69, 0x74, 0x68, 0x6d, 0x73,
|
||||
0x18, 0x05, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x12, 0x2e, 0x6d, 0x61, 0x6e, 0x61, 0x67, 0x65, 0x72,
|
||||
0x2e, 0x41, 0x6c, 0x67, 0x6f, 0x72, 0x69, 0x74, 0x68, 0x6d, 0x52, 0x0a, 0x61, 0x6c, 0x67, 0x6f,
|
||||
0x72, 0x69, 0x74, 0x68, 0x6d, 0x73, 0x12, 0x29, 0x0a, 0x10, 0x72, 0x65, 0x73, 0x75, 0x6c, 0x74,
|
||||
0x5f, 0x63, 0x6f, 0x6e, 0x73, 0x75, 0x6d, 0x65, 0x72, 0x73, 0x18, 0x06, 0x20, 0x03, 0x28, 0x09,
|
||||
0x52, 0x0f, 0x72, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x43, 0x6f, 0x6e, 0x73, 0x75, 0x6d, 0x65, 0x72,
|
||||
0x73, 0x12, 0x37, 0x0a, 0x0c, 0x61, 0x67, 0x65, 0x6e, 0x74, 0x5f, 0x63, 0x6f, 0x6e, 0x66, 0x69,
|
||||
0x67, 0x18, 0x07, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x14, 0x2e, 0x6d, 0x61, 0x6e, 0x61, 0x67, 0x65,
|
||||
0x72, 0x2e, 0x41, 0x67, 0x65, 0x6e, 0x74, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x52, 0x0b, 0x61,
|
||||
0x67, 0x65, 0x6e, 0x74, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x22, 0x35, 0x0a, 0x07, 0x44, 0x61,
|
||||
0x74, 0x61, 0x73, 0x65, 0x74, 0x12, 0x1a, 0x0a, 0x08, 0x70, 0x72, 0x6f, 0x76, 0x69, 0x64, 0x65,
|
||||
0x72, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x70, 0x72, 0x6f, 0x76, 0x69, 0x64, 0x65,
|
||||
0x72, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x02, 0x69,
|
||||
0x64, 0x22, 0x37, 0x0a, 0x09, 0x41, 0x6c, 0x67, 0x6f, 0x72, 0x69, 0x74, 0x68, 0x6d, 0x12, 0x1a,
|
||||
0x0a, 0x08, 0x70, 0x72, 0x6f, 0x76, 0x69, 0x64, 0x65, 0x72, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09,
|
||||
0x52, 0x08, 0x70, 0x72, 0x6f, 0x76, 0x69, 0x64, 0x65, 0x72, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64,
|
||||
0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x02, 0x69, 0x64, 0x22, 0xd6, 0x01, 0x0a, 0x0b, 0x41,
|
||||
0x67, 0x65, 0x6e, 0x74, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x12, 0x12, 0x0a, 0x04, 0x70, 0x6f,
|
||||
0x72, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x70, 0x6f, 0x72, 0x74, 0x12, 0x12,
|
||||
0x0a, 0x04, 0x68, 0x6f, 0x73, 0x74, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x68, 0x6f,
|
||||
0x73, 0x74, 0x12, 0x1b, 0x0a, 0x09, 0x63, 0x65, 0x72, 0x74, 0x5f, 0x66, 0x69, 0x6c, 0x65, 0x18,
|
||||
0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x63, 0x65, 0x72, 0x74, 0x46, 0x69, 0x6c, 0x65, 0x12,
|
||||
0x19, 0x0a, 0x08, 0x6b, 0x65, 0x79, 0x5f, 0x66, 0x69, 0x6c, 0x65, 0x18, 0x04, 0x20, 0x01, 0x28,
|
||||
0x09, 0x52, 0x07, 0x6b, 0x65, 0x79, 0x46, 0x69, 0x6c, 0x65, 0x12, 0x24, 0x0a, 0x0e, 0x63, 0x6c,
|
||||
0x69, 0x65, 0x6e, 0x74, 0x5f, 0x63, 0x61, 0x5f, 0x66, 0x69, 0x6c, 0x65, 0x18, 0x05, 0x20, 0x01,
|
||||
0x28, 0x09, 0x52, 0x0c, 0x63, 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x43, 0x61, 0x46, 0x69, 0x6c, 0x65,
|
||||
0x12, 0x24, 0x0a, 0x0e, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x5f, 0x63, 0x61, 0x5f, 0x66, 0x69,
|
||||
0x6c, 0x65, 0x18, 0x06, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0c, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72,
|
||||
0x43, 0x61, 0x46, 0x69, 0x6c, 0x65, 0x12, 0x1b, 0x0a, 0x09, 0x6c, 0x6f, 0x67, 0x5f, 0x6c, 0x65,
|
||||
0x76, 0x65, 0x6c, 0x18, 0x07, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x6c, 0x6f, 0x67, 0x4c, 0x65,
|
||||
0x76, 0x65, 0x6c, 0x32, 0x5b, 0x0a, 0x0e, 0x4d, 0x61, 0x6e, 0x61, 0x67, 0x65, 0x72, 0x53, 0x65,
|
||||
0x72, 0x76, 0x69, 0x63, 0x65, 0x12, 0x49, 0x0a, 0x07, 0x50, 0x72, 0x6f, 0x63, 0x65, 0x73, 0x73,
|
||||
0x12, 0x1c, 0x2e, 0x6d, 0x61, 0x6e, 0x61, 0x67, 0x65, 0x72, 0x2e, 0x43, 0x6c, 0x69, 0x65, 0x6e,
|
||||
0x74, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x1a, 0x1a,
|
||||
0x2e, 0x6d, 0x61, 0x6e, 0x61, 0x67, 0x65, 0x72, 0x2e, 0x43, 0x6f, 0x6d, 0x70, 0x75, 0x74, 0x61,
|
||||
0x74, 0x69, 0x6f, 0x6e, 0x52, 0x75, 0x6e, 0x52, 0x65, 0x71, 0x22, 0x00, 0x28, 0x01, 0x30, 0x01,
|
||||
0x42, 0x0b, 0x5a, 0x09, 0x2e, 0x2f, 0x6d, 0x61, 0x6e, 0x61, 0x67, 0x65, 0x72, 0x62, 0x06, 0x70,
|
||||
0x72, 0x6f, 0x74, 0x6f, 0x33,
|
||||
}
|
||||
|
||||
var (
|
||||
@@ -789,21 +810,22 @@ var file_manager_manager_proto_goTypes = []interface{}{
|
||||
(*timestamppb.Timestamp)(nil), // 9: google.protobuf.Timestamp
|
||||
}
|
||||
var file_manager_manager_proto_depIdxs = []int32{
|
||||
9, // 0: manager.AgentEvent.timestamp:type_name -> google.protobuf.Timestamp
|
||||
0, // 1: manager.ClientStreamMessage.whoami:type_name -> manager.WhoAmI
|
||||
3, // 2: manager.ClientStreamMessage.agent_log:type_name -> manager.AgentLog
|
||||
2, // 3: manager.ClientStreamMessage.agent_event:type_name -> manager.AgentEvent
|
||||
1, // 4: manager.ClientStreamMessage.run_res:type_name -> manager.RunResponse
|
||||
6, // 5: manager.ComputationRunReq.datasets:type_name -> manager.Dataset
|
||||
7, // 6: manager.ComputationRunReq.algorithms:type_name -> manager.Algorithm
|
||||
8, // 7: manager.ComputationRunReq.agent_config:type_name -> manager.AgentConfig
|
||||
4, // 8: manager.ManagerService.Process:input_type -> manager.ClientStreamMessage
|
||||
5, // 9: manager.ManagerService.Process:output_type -> manager.ComputationRunReq
|
||||
9, // [9:10] is the sub-list for method output_type
|
||||
8, // [8:9] is the sub-list for method input_type
|
||||
8, // [8:8] is the sub-list for extension type_name
|
||||
8, // [8:8] is the sub-list for extension extendee
|
||||
0, // [0:8] is the sub-list for field type_name
|
||||
9, // 0: manager.AgentEvent.timestamp:type_name -> google.protobuf.Timestamp
|
||||
9, // 1: manager.AgentLog.timestamp:type_name -> google.protobuf.Timestamp
|
||||
0, // 2: manager.ClientStreamMessage.whoami:type_name -> manager.WhoAmI
|
||||
3, // 3: manager.ClientStreamMessage.agent_log:type_name -> manager.AgentLog
|
||||
2, // 4: manager.ClientStreamMessage.agent_event:type_name -> manager.AgentEvent
|
||||
1, // 5: manager.ClientStreamMessage.run_res:type_name -> manager.RunResponse
|
||||
6, // 6: manager.ComputationRunReq.datasets:type_name -> manager.Dataset
|
||||
7, // 7: manager.ComputationRunReq.algorithms:type_name -> manager.Algorithm
|
||||
8, // 8: manager.ComputationRunReq.agent_config:type_name -> manager.AgentConfig
|
||||
4, // 9: manager.ManagerService.Process:input_type -> manager.ClientStreamMessage
|
||||
5, // 10: manager.ManagerService.Process:output_type -> manager.ComputationRunReq
|
||||
10, // [10:11] is the sub-list for method output_type
|
||||
9, // [9:10] is the sub-list for method input_type
|
||||
9, // [9:9] is the sub-list for extension type_name
|
||||
9, // [9:9] is the sub-list for extension extendee
|
||||
0, // [0:9] is the sub-list for field type_name
|
||||
}
|
||||
|
||||
func init() { file_manager_manager_proto_init() }
|
||||
@@ -13,8 +13,8 @@ import (
|
||||
"github.com/ultravioletrs/cocos/internal/env"
|
||||
"github.com/ultravioletrs/cocos/internal/server"
|
||||
grpcserver "github.com/ultravioletrs/cocos/internal/server/grpc"
|
||||
"github.com/ultravioletrs/cocos/manager"
|
||||
managergrpc "github.com/ultravioletrs/cocos/manager/api/grpc"
|
||||
"github.com/ultravioletrs/cocos/pkg/manager"
|
||||
"golang.org/x/sync/errgroup"
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/reflection"
|
||||
|
||||
@@ -11,6 +11,8 @@ import (
|
||||
"log"
|
||||
|
||||
"github.com/mdlayher/vsock"
|
||||
"github.com/ultravioletrs/cocos/pkg/manager"
|
||||
"google.golang.org/protobuf/proto"
|
||||
)
|
||||
|
||||
const VsockConfigPort uint32 = 9999
|
||||
@@ -71,10 +73,6 @@ func main() {
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
l2, err := vsock.Listen(9998, nil)
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
ac := Computation{
|
||||
ID: "123",
|
||||
Datasets: Datasets{Dataset{ID: "1", Provider: "pr1"}},
|
||||
@@ -91,30 +89,23 @@ func main() {
|
||||
for {
|
||||
conn, err := l.Accept()
|
||||
if err != nil {
|
||||
log.Println(err)
|
||||
continue
|
||||
}
|
||||
b := make([]byte, 1024)
|
||||
n, err := conn.Read(b)
|
||||
if err != nil {
|
||||
log.Println(err)
|
||||
continue
|
||||
}
|
||||
conn.Close()
|
||||
fmt.Println(string(b[:n]))
|
||||
var mes manager.ClientStreamMessage
|
||||
if err := proto.Unmarshal(b[:n], &mes); err != nil {
|
||||
log.Println(err)
|
||||
}
|
||||
fmt.Println(mes.String())
|
||||
}
|
||||
}()
|
||||
for {
|
||||
conn, err := l2.Accept()
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
b := make([]byte, 1024)
|
||||
n, err := conn.Read(b)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
conn.Close()
|
||||
fmt.Println(string(b[:n]))
|
||||
}
|
||||
}
|
||||
|
||||
func SendAgentConfig(cid uint32, ac Computation) error {
|
||||
|
||||
Reference in New Issue
Block a user