NOISSUE - Remove run channel (#231)

Signed-off-by: Sammy Oina <sammyoina@gmail.com>
This commit is contained in:
Sammy Kerata Oina
2024-09-05 13:27:06 +03:00
committed by GitHub
parent 7ba34b93bc
commit 00980639d5
3 changed files with 64 additions and 48 deletions
+50 -42
View File
@@ -4,6 +4,7 @@ package grpc
import (
"bytes"
"context"
"errors"
"io"
@@ -28,7 +29,7 @@ type grpcServer struct {
}
type Service interface {
Run(ipAddress string, runReqChan chan *manager.ServerStreamMessage, authInfo credentials.AuthInfo)
Run(ctx context.Context, ipAddress string, sendMessage func(*manager.ServerStreamMessage) error, authInfo credentials.AuthInfo)
}
// NewServer returns new AuthServiceServer instance.
@@ -40,12 +41,11 @@ func NewServer(incoming chan *manager.ClientStreamMessage, svc Service) manager.
}
func (s *grpcServer) Process(stream manager.ManagerService_ProcessServer) error {
runReqChan := make(chan *manager.ServerStreamMessage)
defer close(runReqChan)
client, ok := peer.FromContext(stream.Context())
if ok {
go s.svc.Run(client.Addr.String(), runReqChan, client.AuthInfo)
if !ok {
return errors.New("failed to get peer info")
}
eg, ctx := errgroup.WithContext(stream.Context())
eg.Go(func() error {
@@ -60,45 +60,53 @@ func (s *grpcServer) Process(stream manager.ManagerService_ProcessServer) error
})
eg.Go(func() error {
for {
select {
case <-ctx.Done():
return nil
case req := <-runReqChan:
switch msg := req.Message.(type) {
case *manager.ServerStreamMessage_RunReq:
data, err := proto.Marshal(msg.RunReq)
if err != nil {
return err
}
dataBuffer := bytes.NewBuffer(data)
buf := make([]byte, bufferSize)
for {
n, err := dataBuffer.Read(buf)
chunk := &manager.ServerStreamMessage{
Message: &manager.ServerStreamMessage_RunReqChunks{
RunReqChunks: &manager.RunReqChunks{
Data: buf[:n],
},
},
}
if err := stream.Send(chunk); err != nil {
return err
}
if err == io.EOF {
break
}
}
default:
if err := stream.Send(req); err != nil {
return err
}
}
sendMessage := func(msg *manager.ServerStreamMessage) error {
switch m := msg.Message.(type) {
case *manager.ServerStreamMessage_RunReq:
return s.sendRunReqInChunks(stream, m.RunReq)
default:
return stream.Send(msg)
}
}
s.svc.Run(ctx, client.Addr.String(), sendMessage, client.AuthInfo)
return nil
})
return eg.Wait()
}
func (s *grpcServer) sendRunReqInChunks(stream manager.ManagerService_ProcessServer, runReq *manager.ComputationRunReq) error {
data, err := proto.Marshal(runReq)
if err != nil {
return err
}
dataBuffer := bytes.NewBuffer(data)
buf := make([]byte, bufferSize)
for {
n, err := dataBuffer.Read(buf)
if err != nil && err != io.EOF {
return err
}
chunk := &manager.ServerStreamMessage{
Message: &manager.ServerStreamMessage_RunReqChunks{
RunReqChunks: &manager.RunReqChunks{
Data: buf[:n],
},
},
}
if err := stream.Send(chunk); err != nil {
return err
}
if err == io.EOF {
break
}
}
return nil
}
+8 -3
View File
@@ -64,7 +64,7 @@ func bufDialer(context.Context, string) (net.Conn, error) {
return lis.Dial()
}
func (s *svc) Run(ipAddress string, runReqChan chan *manager.ServerStreamMessage, authInfo credentials.AuthInfo) {
func (s *svc) Run(ctx context.Context, ipAddress string, sendMessage func(*manager.ServerStreamMessage) error, authInfo credentials.AuthInfo) {
privKey, err := rsa.GenerateKey(rand.Reader, keyBitSize)
if err != nil {
s.t.Fatalf("Error generating public key: %v", err)
@@ -82,10 +82,12 @@ func (s *svc) Run(ipAddress string, runReqChan chan *manager.ServerStreamMessage
go func() {
time.Sleep(time.Millisecond * 100)
runReqChan <- &manager.ServerStreamMessage{
if err := sendMessage(&manager.ServerStreamMessage{
Message: &manager.ServerStreamMessage_TerminateReq{
TerminateReq: &manager.Terminate{Message: "test terminate"},
},
}); err != nil {
s.t.Fatalf("failed to send terminate request: %s", err)
}
}()
@@ -105,7 +107,8 @@ func (s *svc) Run(ipAddress string, runReqChan chan *manager.ServerStreamMessage
pubPem, _ := pem.Decode(pubPemBytes)
algoHash := sha3.Sum256(algo)
dataHash := sha3.Sum256(data)
runReqChan <- &manager.ServerStreamMessage{
if err := sendMessage(&manager.ServerStreamMessage{
Message: &manager.ServerStreamMessage_RunReq{
RunReq: &manager.ComputationRunReq{
Id: "1",
@@ -121,6 +124,8 @@ func (s *svc) Run(ipAddress string, runReqChan chan *manager.ServerStreamMessage
},
},
},
}); err != nil {
s.t.Fatalf("failed to send run request: %s", err)
}
}()
}
+6 -3
View File
@@ -42,8 +42,8 @@ type svc struct {
logger *slog.Logger
}
func (s *svc) Run(ipAdress string, reqChan chan *manager.ServerStreamMessage, auth credentials.AuthInfo) {
s.logger.Debug(fmt.Sprintf("received who am on ip address %s", ipAdress))
func (s *svc) Run(ctx context.Context, ipAddress string, sendMessage func(*manager.ServerStreamMessage) error, authInfo credentials.AuthInfo) {
s.logger.Debug(fmt.Sprintf("received who am on ip address %s", ipAddress))
pubKey, err := os.ReadFile(pubKeyFile)
if err != nil {
@@ -73,7 +73,7 @@ func (s *svc) Run(ipAdress string, reqChan chan *manager.ServerStreamMessage, au
return
}
reqChan <- &manager.ServerStreamMessage{
if err := sendMessage(&manager.ServerStreamMessage{
Message: &manager.ServerStreamMessage_RunReq{
RunReq: &manager.ComputationRunReq{
Id: "1",
@@ -89,6 +89,9 @@ func (s *svc) Run(ipAdress string, reqChan chan *manager.ServerStreamMessage, au
},
},
},
}); err != nil {
s.logger.Error(fmt.Sprintf("failed to send run request: %s", err))
return
}
}