NOISSUE - Streamline message processing to prevent potential message loss (#228)

* fix dropping of message response from manager

Signed-off-by: Sammy Oina <sammyoina@gmail.com>

* remove change

Signed-off-by: Sammy Oina <sammyoina@gmail.com>

* simplify

Signed-off-by: Sammy Oina <sammyoina@gmail.com>

* add message send timeout

Signed-off-by: Sammy Oina <sammyoina@gmail.com>

---------

Signed-off-by: Sammy Oina <sammyoina@gmail.com>
This commit is contained in:
Sammy Kerata Oina
2024-09-03 12:29:07 +03:00
committed by GitHub
parent 9ca045b06a
commit 7ba34b93bc
4 changed files with 36 additions and 30 deletions
+5 -4
View File
@@ -32,9 +32,10 @@ import (
)
const (
svcName = "manager"
envPrefixGRPC = "MANAGER_GRPC_"
envPrefixQemu = "MANAGER_QEMU_"
svcName = "manager"
envPrefixGRPC = "MANAGER_GRPC_"
envPrefixQemu = "MANAGER_QEMU_"
clientBufferSize = 100
)
type config struct {
@@ -112,7 +113,7 @@ func main() {
return
}
eventsChan := make(chan *pkgmanager.ClientStreamMessage)
eventsChan := make(chan *pkgmanager.ClientStreamMessage, clientBufferSize)
svc, err := newService(logger, tracer, qemuCfg, eventsChan, cfg.BackendMeasurementBinary)
if err != nil {
logger.Error(err.Error())
+18 -14
View File
@@ -6,6 +6,7 @@ import (
"bytes"
"context"
"log/slog"
"time"
"github.com/absmach/magistrala/pkg/errors"
"github.com/ultravioletrs/cocos/manager"
@@ -17,22 +18,23 @@ import (
var (
errTerminationFromServer = errors.New("server requested client termination")
errCorruptedManifest = errors.New("received manifest may be corrupted")
sendTimeout = 5 * time.Second
)
type ManagerClient struct {
stream pkgmanager.ManagerService_ProcessClient
svc manager.Service
responses chan *pkgmanager.ClientStreamMessage
logger *slog.Logger
stream pkgmanager.ManagerService_ProcessClient
svc manager.Service
messageQueue chan *pkgmanager.ClientStreamMessage
logger *slog.Logger
}
// NewClient returns new gRPC client instance.
func NewClient(stream pkgmanager.ManagerService_ProcessClient, svc manager.Service, responses chan *pkgmanager.ClientStreamMessage, logger *slog.Logger) ManagerClient {
func NewClient(stream pkgmanager.ManagerService_ProcessClient, svc manager.Service, messageQueue chan *pkgmanager.ClientStreamMessage, logger *slog.Logger) ManagerClient {
return ManagerClient{
stream: stream,
svc: svc,
responses: responses,
logger: logger,
stream: stream,
svc: svc,
messageQueue: messageQueue,
logger: logger,
}
}
@@ -147,7 +149,7 @@ func (client ManagerClient) handleOutgoingMessages(ctx context.Context) error {
select {
case <-ctx.Done():
return ctx.Err()
case mes := <-client.responses:
case mes := <-client.messageQueue:
if err := client.stream.Send(mes); err != nil {
return err
}
@@ -156,10 +158,12 @@ func (client ManagerClient) handleOutgoingMessages(ctx context.Context) error {
}
func (client ManagerClient) sendMessage(mes *pkgmanager.ClientStreamMessage) {
ctx, cancel := context.WithTimeout(context.Background(), sendTimeout)
defer cancel()
select {
case client.responses <- mes:
return
default:
client.logger.Warn("failed to send message to client")
case client.messageQueue <- mes:
case <-ctx.Done():
client.logger.Warn("Failed to send message: timeout exceeded")
}
}
+1
View File
@@ -41,6 +41,7 @@ func NewServer(incoming chan *manager.ClientStreamMessage, svc Service) manager.
func (s *grpcServer) Process(stream manager.ManagerService_ProcessServer) error {
runReqChan := make(chan *manager.ServerStreamMessage)
defer close(runReqChan)
client, ok := peer.FromContext(stream.Context())
if ok {
go s.svc.Run(client.Addr.String(), runReqChan, client.AuthInfo)
+12 -12
View File
@@ -65,6 +65,7 @@ type Service interface {
}
type managerService struct {
mu sync.Mutex
qemuCfg qemu.Config
backendMeasurementBinaryPath string
logger *slog.Logger
@@ -162,7 +163,9 @@ func (ms *managerService) Run(ctx context.Context, c *manager.ComputationRunReq)
ms.publishEvent("vm-provision", c.Id, "failed", json.RawMessage{})
return "", err
}
ms.mu.Lock()
ms.vms[c.Id] = cvm
ms.mu.Unlock()
pid := cvm.GetProcess()
@@ -189,6 +192,8 @@ func (ms *managerService) Run(ctx context.Context, c *manager.ComputationRunReq)
}
func (ms *managerService) Stop(ctx context.Context, computationID string) error {
ms.mu.Lock()
defer ms.mu.Unlock()
cvm, ok := ms.vms[computationID]
if !ok {
defer ms.publishEvent("stop-computation", computationID, "failed", json.RawMessage{})
@@ -307,13 +312,7 @@ func (ms *managerService) restoreVMs() error {
}
for _, state := range states {
exists, err := processExists(state.PID)
if err != nil {
ms.logger.Warn("Failed to check process existence", "computation", state.ID, "pid", state.PID, "error", err)
continue
}
if !exists {
if !ms.processExists(state.PID) {
if err := ms.persistence.DeleteVM(state.ID); err != nil {
ms.logger.Error("Failed to delete persisted VM state", "computation", state.ID, "error", err)
}
@@ -335,17 +334,18 @@ func (ms *managerService) restoreVMs() error {
return nil
}
func processExists(pid int) (bool, error) {
func (ms *managerService) processExists(pid int) bool {
process, err := os.FindProcess(pid)
if err != nil {
return false, err
ms.logger.Warn("Failed to find process", "pid", pid, "error", err)
return false
}
if err = process.Signal(syscall.Signal(0)); err == nil {
return true, nil
return true
}
if err == syscall.ESRCH {
return false, nil
return false
}
return false, err
return false
}