mirror of
https://github.com/ultravioletrs/cocos.git
synced 2026-06-23 04:10:25 +00:00
NOISSUE - Remove CID tracking (#218)
* remove cid tracking Signed-off-by: Sammy Oina <sammyoina@gmail.com> * remove unused code Signed-off-by: Sammy Oina <sammyoina@gmail.com> --------- Signed-off-by: Sammy Oina <sammyoina@gmail.com>
This commit is contained in:
committed by
GitHub
parent
01c502e1a1
commit
d0c99479db
+1
-1
@@ -54,7 +54,7 @@ func main() {
|
||||
log.Println(err)
|
||||
return
|
||||
}
|
||||
handler := agentlogger.NewProtoHandler(conn, &slog.HandlerOptions{Level: level})
|
||||
handler := agentlogger.NewProtoHandler(conn, &slog.HandlerOptions{Level: level}, cfg.ID)
|
||||
logger := slog.New(handler)
|
||||
|
||||
eventSvc, err := events.New(svcName, cfg.ID, manager.ManagerVsockPort)
|
||||
|
||||
@@ -15,17 +15,19 @@ import (
|
||||
var _ slog.Handler = (*handler)(nil)
|
||||
|
||||
type handler struct {
|
||||
opts slog.HandlerOptions
|
||||
w io.Writer
|
||||
opts slog.HandlerOptions
|
||||
w io.Writer
|
||||
cmpID string
|
||||
}
|
||||
|
||||
func NewProtoHandler(w io.Writer, opts *slog.HandlerOptions) slog.Handler {
|
||||
func NewProtoHandler(w io.Writer, opts *slog.HandlerOptions, cmpID string) slog.Handler {
|
||||
if opts == nil {
|
||||
opts = &slog.HandlerOptions{}
|
||||
}
|
||||
return &handler{
|
||||
opts: *opts,
|
||||
w: w,
|
||||
opts: *opts,
|
||||
w: w,
|
||||
cmpID: cmpID,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -62,9 +64,10 @@ func (h *handler) Handle(_ context.Context, r slog.Record) error {
|
||||
agentLog := manager.ClientStreamMessage{
|
||||
Message: &manager.ClientStreamMessage_AgentLog{
|
||||
AgentLog: &manager.AgentLog{
|
||||
Timestamp: timestamp,
|
||||
Message: chunk,
|
||||
Level: level,
|
||||
Timestamp: timestamp,
|
||||
Message: chunk,
|
||||
Level: level,
|
||||
ComputationId: h.cmpID,
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
@@ -5,10 +5,7 @@ package manager
|
||||
import (
|
||||
"fmt"
|
||||
"net"
|
||||
"regexp"
|
||||
"strconv"
|
||||
|
||||
"github.com/absmach/magistrala/pkg/errors"
|
||||
"github.com/mdlayher/vsock"
|
||||
"github.com/ultravioletrs/cocos/pkg/manager"
|
||||
"google.golang.org/protobuf/proto"
|
||||
@@ -19,8 +16,6 @@ const (
|
||||
messageSize int = 1024
|
||||
)
|
||||
|
||||
var errFailedToParseCID = errors.New("failed to parse cid from remote address")
|
||||
|
||||
// RetrieveAgentEventsLogs Retrieve and forward agent logs and events via vsock.
|
||||
func (ms *managerService) RetrieveAgentEventsLogs() {
|
||||
l, err := vsock.Listen(ManagerVsockPort, nil)
|
||||
@@ -49,22 +44,18 @@ func (ms *managerService) handleConnections(conn net.Conn) {
|
||||
ms.logger.Warn(err.Error())
|
||||
return
|
||||
}
|
||||
cmpID, err := ms.computationIDFromAddress(conn.RemoteAddr().String())
|
||||
if err != nil {
|
||||
ms.logger.Warn(err.Error())
|
||||
continue
|
||||
}
|
||||
var message manager.ClientStreamMessage
|
||||
if err := proto.Unmarshal(b[:n], &message); err != nil {
|
||||
ms.logger.Warn(err.Error())
|
||||
continue
|
||||
}
|
||||
cmpID := ""
|
||||
switch mes := message.Message.(type) {
|
||||
case *manager.ClientStreamMessage_AgentEvent:
|
||||
mes.AgentEvent.ComputationId = cmpID
|
||||
cmpID = mes.AgentEvent.ComputationId
|
||||
ms.eventsChan <- &manager.ClientStreamMessage{Message: mes}
|
||||
case *manager.ClientStreamMessage_AgentLog:
|
||||
mes.AgentLog.ComputationId = cmpID
|
||||
cmpID = mes.AgentLog.ComputationId
|
||||
ms.eventsChan <- &manager.ClientStreamMessage{Message: mes}
|
||||
default:
|
||||
ms.logger.Warn("Unexpected agent log or event type")
|
||||
@@ -73,17 +64,3 @@ func (ms *managerService) handleConnections(conn net.Conn) {
|
||||
ms.logger.Info(fmt.Sprintf("Agent Log/Event, Computation ID: %s, Message: %s", cmpID, message.String()))
|
||||
}
|
||||
}
|
||||
|
||||
func (ms *managerService) computationIDFromAddress(address string) (string, error) {
|
||||
re := regexp.MustCompile(`vm\((\d+)\)`)
|
||||
matches := re.FindStringSubmatch(address)
|
||||
|
||||
if len(matches) > 1 {
|
||||
cid, err := strconv.Atoi(matches[1])
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
return ms.agents[cid], nil
|
||||
}
|
||||
return "", errFailedToParseCID
|
||||
}
|
||||
|
||||
@@ -63,7 +63,6 @@ type managerService struct {
|
||||
qemuCfg qemu.Config
|
||||
backendMeasurementBinaryPath string
|
||||
logger *slog.Logger
|
||||
agents map[int]string // agent map of vsock cid to computationID.
|
||||
eventsChan chan *manager.ClientStreamMessage
|
||||
vms map[string]vm.VM
|
||||
vmFactory vm.Provider
|
||||
@@ -82,7 +81,6 @@ func New(cfg qemu.Config, backendMeasurementBinPath string, logger *slog.Logger,
|
||||
ms := &managerService{
|
||||
qemuCfg: cfg,
|
||||
logger: logger,
|
||||
agents: make(map[int]string),
|
||||
vms: make(map[string]vm.VM),
|
||||
eventsChan: eventsChan,
|
||||
vmFactory: vmFactory,
|
||||
@@ -147,8 +145,6 @@ func (ms *managerService) Run(ctx context.Context, c *manager.ComputationRunReq)
|
||||
}
|
||||
ms.vms[c.Id] = cvm
|
||||
|
||||
ms.agents[ms.qemuCfg.VSockConfig.GuestCID] = c.Id
|
||||
|
||||
err = backoff.Retry(func() error {
|
||||
return cvm.SendAgentConfig(ac)
|
||||
}, backoff.NewExponentialBackOff())
|
||||
|
||||
@@ -92,7 +92,6 @@ func TestRun(t *testing.T) {
|
||||
ms := &managerService{
|
||||
qemuCfg: qemuCfg,
|
||||
logger: logger,
|
||||
agents: make(map[int]string),
|
||||
vms: make(map[string]vm.VM),
|
||||
eventsChan: eventsChan,
|
||||
vmFactory: vmf.Execute,
|
||||
@@ -110,7 +109,6 @@ func TestRun(t *testing.T) {
|
||||
assert.NoError(t, err)
|
||||
assert.NotEmpty(t, port)
|
||||
assert.Len(t, ms.vms, 1)
|
||||
assert.Len(t, ms.agents, 1)
|
||||
}
|
||||
|
||||
vmf.AssertExpectations(t)
|
||||
|
||||
Reference in New Issue
Block a user