NOISSUE - Exit on network failures only (#227)

Signed-off-by: Sammy Oina <sammyoina@gmail.com>
This commit is contained in:
Sammy Kerata Oina
2024-08-30 16:30:51 +03:00
committed by GitHub
parent dc349e1f1f
commit 5383f4465b
2 changed files with 121 additions and 71 deletions
+1 -1
View File
@@ -109,7 +109,7 @@ func main() {
return
}
mc := managerapi.NewClient(pc, svc, eventsChan)
mc := managerapi.NewClient(pc, svc, eventsChan, logger)
g.Go(func() error {
return mc.Process(ctx, cancel)
+120 -70
View File
@@ -5,6 +5,7 @@ package grpc
import (
"bytes"
"context"
"log/slog"
"github.com/absmach/magistrala/pkg/errors"
"github.com/ultravioletrs/cocos/manager"
@@ -22,14 +23,16 @@ type ManagerClient struct {
stream pkgmanager.ManagerService_ProcessClient
svc manager.Service
responses chan *pkgmanager.ClientStreamMessage
logger *slog.Logger
}
// NewClient returns new gRPC client instance.
func NewClient(stream pkgmanager.ManagerService_ProcessClient, svc manager.Service, responses chan *pkgmanager.ClientStreamMessage) ManagerClient {
func NewClient(stream pkgmanager.ManagerService_ProcessClient, svc manager.Service, responses chan *pkgmanager.ClientStreamMessage, logger *slog.Logger) ManagerClient {
return ManagerClient{
stream: stream,
svc: svc,
responses: responses,
logger: logger,
}
}
@@ -37,79 +40,126 @@ func (client ManagerClient) Process(ctx context.Context, cancel context.CancelFu
eg, ctx := errgroup.WithContext(ctx)
eg.Go(func() error {
var runReqBuffer bytes.Buffer
for {
req, err := client.stream.Recv()
if err != nil {
return err
}
switch mes := req.Message.(type) {
case *pkgmanager.ServerStreamMessage_RunReqChunks:
if len(mes.RunReqChunks.Data) == 0 {
var runReq pkgmanager.ComputationRunReq
if err = proto.Unmarshal(runReqBuffer.Bytes(), &runReq); err != nil {
return errors.Wrap(err, errCorruptedManifest)
}
port, err := client.svc.Run(ctx, &runReq)
if err != nil {
return err
}
runRes := &pkgmanager.ClientStreamMessage_RunRes{
RunRes: &pkgmanager.RunResponse{
AgentPort: port,
ComputationId: runReq.Id,
},
}
if err := client.stream.Send(&pkgmanager.ClientStreamMessage{Message: runRes}); err != nil {
return err
}
}
if _, err := runReqBuffer.Write(mes.RunReqChunks.Data); err != nil {
return err
}
case *pkgmanager.ServerStreamMessage_TerminateReq:
cancel()
return errors.Wrap(errTerminationFromServer, errors.New(mes.TerminateReq.Message))
case *pkgmanager.ServerStreamMessage_StopComputation:
msg := &pkgmanager.ClientStreamMessage_StopComputationRes{StopComputationRes: &pkgmanager.StopComputationResponse{
ComputationId: mes.StopComputation.ComputationId,
}}
if err := client.svc.Stop(ctx, mes.StopComputation.ComputationId); err != nil {
msg.StopComputationRes.Message = err.Error()
}
if err := client.stream.Send(&pkgmanager.ClientStreamMessage{Message: msg}); err != nil {
return err
}
case *pkgmanager.ServerStreamMessage_BackendInfoReq:
res, err := client.svc.FetchBackendInfo()
if err != nil {
return err
}
info := &pkgmanager.ClientStreamMessage_BackendInfo{BackendInfo: &pkgmanager.BackendInfo{
Info: res,
Id: mes.BackendInfoReq.Id,
}}
if err := client.stream.Send(&pkgmanager.ClientStreamMessage{Message: info}); err != nil {
return err
}
}
}
return client.handleIncomingMessages(ctx)
})
eg.Go(func() error {
for {
select {
case <-ctx.Done():
return nil
case mes := <-client.responses:
if err := client.stream.Send(mes); err != nil {
return err
}
}
}
return client.handleOutgoingMessages(ctx)
})
return eg.Wait()
}
func (client ManagerClient) handleIncomingMessages(ctx context.Context) error {
var runReqBuffer bytes.Buffer
for {
select {
case <-ctx.Done():
return ctx.Err()
default:
req, err := client.stream.Recv()
if err != nil {
return err
}
if err := client.processIncomingMessage(ctx, req, &runReqBuffer); err != nil {
return err
}
}
}
}
func (client ManagerClient) processIncomingMessage(ctx context.Context, req *pkgmanager.ServerStreamMessage, runReqBuffer *bytes.Buffer) error {
switch mes := req.Message.(type) {
case *pkgmanager.ServerStreamMessage_RunReqChunks:
return client.handleRunReqChunks(ctx, mes, runReqBuffer)
case *pkgmanager.ServerStreamMessage_TerminateReq:
return client.handleTerminateReq(mes)
case *pkgmanager.ServerStreamMessage_StopComputation:
go client.handleStopComputation(ctx, mes)
case *pkgmanager.ServerStreamMessage_BackendInfoReq:
go client.handleBackendInfoReq(ctx, mes)
default:
return errors.New("unknown message type")
}
return nil
}
func (client ManagerClient) handleRunReqChunks(ctx context.Context, mes *pkgmanager.ServerStreamMessage_RunReqChunks, runReqBuffer *bytes.Buffer) error {
if len(mes.RunReqChunks.Data) == 0 {
var runReq pkgmanager.ComputationRunReq
if err := proto.Unmarshal(runReqBuffer.Bytes(), &runReq); err != nil {
return errors.Wrap(err, errCorruptedManifest)
}
go client.executeRun(ctx, &runReq)
}
_, err := runReqBuffer.Write(mes.RunReqChunks.Data)
return err
}
func (client ManagerClient) executeRun(ctx context.Context, runReq *pkgmanager.ComputationRunReq) {
port, err := client.svc.Run(ctx, runReq)
if err != nil {
client.logger.Warn(err.Error())
return
}
runRes := &pkgmanager.ClientStreamMessage_RunRes{
RunRes: &pkgmanager.RunResponse{
AgentPort: port,
ComputationId: runReq.Id,
},
}
client.sendMessage(&pkgmanager.ClientStreamMessage{Message: runRes})
}
func (client ManagerClient) handleTerminateReq(mes *pkgmanager.ServerStreamMessage_TerminateReq) error {
return errors.Wrap(errTerminationFromServer, errors.New(mes.TerminateReq.Message))
}
func (client ManagerClient) handleStopComputation(ctx context.Context, mes *pkgmanager.ServerStreamMessage_StopComputation) {
msg := &pkgmanager.ClientStreamMessage_StopComputationRes{
StopComputationRes: &pkgmanager.StopComputationResponse{
ComputationId: mes.StopComputation.ComputationId,
},
}
if err := client.svc.Stop(ctx, mes.StopComputation.ComputationId); err != nil {
msg.StopComputationRes.Message = err.Error()
}
client.sendMessage(&pkgmanager.ClientStreamMessage{Message: msg})
}
func (client ManagerClient) handleBackendInfoReq(ctx context.Context, mes *pkgmanager.ServerStreamMessage_BackendInfoReq) {
res, err := client.svc.FetchBackendInfo()
if err != nil {
client.logger.Warn(err.Error())
return
}
info := &pkgmanager.ClientStreamMessage_BackendInfo{
BackendInfo: &pkgmanager.BackendInfo{
Info: res,
Id: mes.BackendInfoReq.Id,
},
}
client.sendMessage(&pkgmanager.ClientStreamMessage{Message: info})
}
func (client ManagerClient) handleOutgoingMessages(ctx context.Context) error {
for {
select {
case <-ctx.Done():
return ctx.Err()
case mes := <-client.responses:
if err := client.stream.Send(mes); err != nil {
return err
}
}
}
}
func (client ManagerClient) sendMessage(mes *pkgmanager.ClientStreamMessage) {
select {
case client.responses <- mes:
return
default:
client.logger.Warn("failed to send message to client")
}
}