NOISSUE - Enhance event status (#235)

* enhance timeline

Signed-off-by: WashingtonKK <washingtonkigan@gmail.com>

* fix: remove redundant event

Signed-off-by: WashingtonKK <washingtonkigan@gmail.com>

* use constant

Signed-off-by: WashingtonKK <washingtonkigan@gmail.com>

* lint

Signed-off-by: WashingtonKK <washingtonkigan@gmail.com>

* use typed constant for status

Signed-off-by: WashingtonKK <washingtonkigan@gmail.com>

* refactor status

Signed-off-by: WashingtonKK <washingtonkigan@gmail.com>

* export agent status and state

Signed-off-by: WashingtonKK <washingtonkigan@gmail.com>

* ehance event states

Signed-off-by: WashingtonKK <washingtonkigan@gmail.com>

* fix tests

Signed-off-by: WashingtonKK <washingtonkigan@gmail.com>

* use manager states and status

Signed-off-by: WashingtonKK <washingtonkigan@gmail.com>

* move algo-run to agent package

Signed-off-by: WashingtonKK <washingtonkigan@gmail.com>

* replace literal with constant

Signed-off-by: WashingtonKK <washingtonkigan@gmail.com>

* replace manager variable with constant

Signed-off-by: WashingtonKK <washingtonkigan@gmail.com>

---------

Signed-off-by: WashingtonKK <washingtonkigan@gmail.com>
This commit is contained in:
Washington Kigani Kamadi
2024-09-17 16:58:15 +03:00
committed by GitHub
parent 355f95771d
commit c14a6338cc
15 changed files with 210 additions and 92 deletions
+6 -2
View File
@@ -16,7 +16,11 @@ var (
_ io.Writer = &Stderr{}
)
const bufSize = 1024
const (
bufSize = 1024
algorithmRun = "AlgorithmRun"
errorStatus = "Error"
)
type Stdout struct {
Logger *slog.Logger
@@ -66,7 +70,7 @@ func (s *Stderr) Write(p []byte) (n int, err error) {
s.Logger.Error(string(buf[:n]))
}
if err := s.EventSvc.SendEvent("algorithm-run", "error", json.RawMessage{}); err != nil {
if err := s.EventSvc.SendEvent(algorithmRun, errorStatus, json.RawMessage{}); err != nil {
return len(p), err
}
+2 -1
View File
@@ -10,6 +10,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/ultravioletrs/cocos/agent/events/mocks"
"github.com/ultravioletrs/cocos/pkg/manager"
)
func TestStdoutWrite(t *testing.T) {
@@ -72,7 +73,7 @@ func TestStderrWrite(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
mockEventService := mocks.NewService(t)
mockEventService.On("SendEvent", "algorithm-run", "error", mock.Anything).Return(nil)
mockEventService.On("SendEvent", "AlgorithmRun", manager.Error.String(), mock.Anything).Return(nil)
stderr := &Stderr{Logger: mglog.NewMock(), EventSvc: mockEventService}
n, err := stderr.Write([]byte(tt.input))
+12 -12
View File
@@ -92,14 +92,14 @@ func New(ctx context.Context, logger *slog.Logger, eventSvc events.Service, cmp
go svc.sm.Start(ctx)
svc.sm.SendEvent(start)
svc.sm.StateFunctions[idle] = svc.publishEvent("in-progress", json.RawMessage{})
svc.sm.StateFunctions[receivingManifest] = svc.publishEvent("in-progress", json.RawMessage{})
svc.sm.StateFunctions[receivingAlgorithm] = svc.publishEvent("in-progress", json.RawMessage{})
svc.sm.StateFunctions[receivingData] = svc.publishEvent("in-progress", json.RawMessage{})
svc.sm.StateFunctions[resultsReady] = svc.publishEvent("in-progress", json.RawMessage{})
svc.sm.StateFunctions[complete] = svc.publishEvent("in-progress", json.RawMessage{})
svc.sm.StateFunctions[running] = svc.runComputation
svc.sm.StateFunctions[failed] = svc.publishEvent("failed", json.RawMessage{})
svc.sm.StateFunctions[Idle] = svc.publishEvent(IdleState.String(), json.RawMessage{})
svc.sm.StateFunctions[ReceivingManifest] = svc.publishEvent(InProgress.String(), json.RawMessage{})
svc.sm.StateFunctions[ReceivingAlgorithm] = svc.publishEvent(InProgress.String(), json.RawMessage{})
svc.sm.StateFunctions[ReceivingData] = svc.publishEvent(InProgress.String(), json.RawMessage{})
svc.sm.StateFunctions[ConsumingResults] = svc.publishEvent(Ready.String(), json.RawMessage{})
svc.sm.StateFunctions[Complete] = svc.publishEvent(Completed.String(), json.RawMessage{})
svc.sm.StateFunctions[Running] = svc.runComputation
svc.sm.StateFunctions[Failed] = svc.publishEvent(Failed.String(), json.RawMessage{})
svc.computation = cmp
@@ -108,7 +108,7 @@ func New(ctx context.Context, logger *slog.Logger, eventSvc events.Service, cmp
}
func (as *agentService) Algo(ctx context.Context, algo Algorithm) error {
if as.sm.GetState() != receivingAlgorithm {
if as.sm.GetState() != ReceivingAlgorithm {
return ErrStateNotReady
}
if as.algorithm != nil {
@@ -189,7 +189,7 @@ func (as *agentService) Algo(ctx context.Context, algo Algorithm) error {
}
func (as *agentService) Data(ctx context.Context, dataset Dataset) error {
if as.sm.GetState() != receivingData {
if as.sm.GetState() != ReceivingData {
return ErrStateNotReady
}
if len(as.computation.Datasets) == 0 {
@@ -242,7 +242,7 @@ func (as *agentService) Data(ctx context.Context, dataset Dataset) error {
}
func (as *agentService) Result(ctx context.Context) ([]byte, error) {
if as.sm.GetState() != resultsReady && as.sm.GetState() != failed {
if as.sm.GetState() != ConsumingResults && as.sm.GetState() != Failed {
return []byte{}, ErrResultsNotReady
}
if len(as.computation.ResultConsumers) == 0 {
@@ -254,7 +254,7 @@ func (as *agentService) Result(ctx context.Context) ([]byte, error) {
}
as.computation.ResultConsumers = slices.Delete(as.computation.ResultConsumers, index, index+1)
if len(as.computation.ResultConsumers) == 0 && as.sm.GetState() == resultsReady {
if len(as.computation.ResultConsumers) == 0 && as.sm.GetState() == ConsumingResults {
as.sm.SendEvent(resultsConsumed)
}
+3 -3
View File
@@ -251,7 +251,7 @@ func TestResult(t *testing.T) {
name: "Test all results consumed",
err: ErrAllResultsConsumed,
setup: func(svc *agentService) {
svc.sm.SetState(resultsReady)
svc.sm.SetState(ConsumingResults)
svc.computation.ResultConsumers = []ResultConsumer{}
},
ctxSetup: func(ctx context.Context) context.Context {
@@ -262,7 +262,7 @@ func TestResult(t *testing.T) {
name: "Test undeclared consumer",
err: ErrUndeclaredConsumer,
setup: func(svc *agentService) {
svc.sm.SetState(resultsReady)
svc.sm.SetState(ConsumingResults)
svc.computation.ResultConsumers = []ResultConsumer{{UserKey: []byte("user")}}
},
ctxSetup: func(ctx context.Context) context.Context {
@@ -273,7 +273,7 @@ func TestResult(t *testing.T) {
name: "Test results consumed and event sent",
err: nil,
setup: func(svc *agentService) {
svc.sm.SetState(resultsReady)
svc.sm.SetState(ConsumingResults)
svc.computation.ResultConsumers = []ResultConsumer{{UserKey: []byte("key")}}
},
ctxSetup: func(ctx context.Context) context.Context {
+45 -32
View File
@@ -9,18 +9,31 @@ import (
"sync"
)
//go:generate stringer -type=state
type state uint8
//go:generate stringer -type=State
type State uint8
const (
idle state = iota
receivingManifest
receivingAlgorithm
receivingData
running
resultsReady
complete
failed
Idle State = iota
ReceivingManifest
ReceivingAlgorithm
ReceivingData
Running
ConsumingResults
Complete
Failed
AlgorithmRun
)
//go:generate stringer -type=Status
type Status uint8
const (
IdleState Status = iota
InProgress
Ready
Completed
Terminated
Error
)
type event uint8
@@ -38,10 +51,10 @@ const (
// StateMachine represents the state machine.
type StateMachine struct {
mu sync.Mutex
State state
State State
EventChan chan event
Transitions map[state]map[event]state
StateFunctions map[state]func()
Transitions map[State]map[event]State
StateFunctions map[State]func()
logger *slog.Logger
wg *sync.WaitGroup
}
@@ -49,37 +62,37 @@ type StateMachine struct {
// NewStateMachine creates a new StateMachine.
func NewStateMachine(logger *slog.Logger, cmp Computation) *StateMachine {
sm := &StateMachine{
State: idle,
State: Idle,
EventChan: make(chan event),
Transitions: make(map[state]map[event]state),
StateFunctions: make(map[state]func()),
Transitions: make(map[State]map[event]State),
StateFunctions: make(map[State]func()),
logger: logger,
wg: &sync.WaitGroup{},
}
sm.Transitions[idle] = make(map[event]state)
sm.Transitions[idle][start] = receivingManifest
sm.Transitions[Idle] = make(map[event]State)
sm.Transitions[Idle][start] = ReceivingManifest
sm.Transitions[receivingManifest] = make(map[event]state)
sm.Transitions[receivingManifest][manifestReceived] = receivingAlgorithm
sm.Transitions[ReceivingManifest] = make(map[event]State)
sm.Transitions[ReceivingManifest][manifestReceived] = ReceivingAlgorithm
sm.Transitions[receivingAlgorithm] = make(map[event]state)
sm.Transitions[ReceivingAlgorithm] = make(map[event]State)
switch len(cmp.Datasets) {
case 0:
sm.Transitions[receivingAlgorithm][algorithmReceived] = running
sm.Transitions[ReceivingAlgorithm][algorithmReceived] = Running
default:
sm.Transitions[receivingAlgorithm][algorithmReceived] = receivingData
sm.Transitions[ReceivingAlgorithm][algorithmReceived] = ReceivingData
}
sm.Transitions[receivingData] = make(map[event]state)
sm.Transitions[receivingData][dataReceived] = running
sm.Transitions[ReceivingData] = make(map[event]State)
sm.Transitions[ReceivingData][dataReceived] = Running
sm.Transitions[running] = make(map[event]state)
sm.Transitions[running][runComplete] = resultsReady
sm.Transitions[running][runFailed] = failed
sm.Transitions[Running] = make(map[event]State)
sm.Transitions[Running][runComplete] = ConsumingResults
sm.Transitions[Running][runFailed] = Failed
sm.Transitions[resultsReady] = make(map[event]state)
sm.Transitions[resultsReady][resultsConsumed] = complete
sm.Transitions[ConsumingResults] = make(map[event]State)
sm.Transitions[ConsumingResults][resultsConsumed] = Complete
return sm
}
@@ -118,13 +131,13 @@ func (sm *StateMachine) SendEvent(event event) {
sm.EventChan <- event
}
func (sm *StateMachine) GetState() state {
func (sm *StateMachine) GetState() State {
sm.mu.Lock()
defer sm.mu.Unlock()
return sm.State
}
func (sm *StateMachine) SetState(state state) {
func (sm *StateMachine) SetState(state State) {
sm.mu.Lock()
defer sm.mu.Unlock()
sm.State = state
+16 -15
View File
@@ -1,4 +1,4 @@
// Code generated by "stringer -type=state"; DO NOT EDIT.
// Code generated by "stringer -type=State"; DO NOT EDIT.
package agent
@@ -8,23 +8,24 @@ func _() {
// An "invalid array index" compiler error signifies that the constant values have changed.
// Re-run the stringer command to generate them again.
var x [1]struct{}
_ = x[idle-0]
_ = x[receivingManifest-1]
_ = x[receivingAlgorithm-2]
_ = x[receivingData-3]
_ = x[running-4]
_ = x[resultsReady-5]
_ = x[complete-6]
_ = x[failed-7]
_ = x[Idle-0]
_ = x[ReceivingManifest-1]
_ = x[ReceivingAlgorithm-2]
_ = x[ReceivingData-3]
_ = x[Running-4]
_ = x[ConsumingResults-5]
_ = x[Complete-6]
_ = x[Failed-7]
_ = x[AlgorithmRun-8]
}
const _state_name = "idlereceivingManifestreceivingAlgorithmreceivingDatarunningresultsReadycompletefailed"
const _State_name = "IdleReceivingManifestReceivingAlgorithmReceivingDataRunningConsumingResultsCompleteFailedAlgorithmRun"
var _state_index = [...]uint8{0, 4, 21, 39, 52, 59, 71, 79, 85}
var _State_index = [...]uint8{0, 4, 21, 39, 52, 59, 75, 83, 89, 101}
func (i state) String() string {
if i >= state(len(_state_index)-1) {
return "state(" + strconv.FormatInt(int64(i), 10) + ")"
func (i State) String() string {
if i >= State(len(_State_index)-1) {
return "State(" + strconv.FormatInt(int64(i), 10) + ")"
}
return _state_name[_state_index[i]:_state_index[i+1]]
return _State_name[_State_index[i]:_State_index[i+1]]
}
+11 -11
View File
@@ -21,18 +21,18 @@ var cmp = Computation{
func TestStateMachineTransitions(t *testing.T) {
cases := []struct {
fromState state
fromState State
event event
expected state
expected State
cmp Computation
}{
{idle, start, receivingManifest, cmp},
{receivingManifest, manifestReceived, receivingAlgorithm, cmp},
{receivingAlgorithm, algorithmReceived, receivingData, cmp},
{receivingAlgorithm, algorithmReceived, running, Computation{}},
{receivingData, dataReceived, running, cmp},
{running, runComplete, resultsReady, cmp},
{resultsReady, resultsConsumed, complete, cmp},
{Idle, start, ReceivingManifest, cmp},
{ReceivingManifest, manifestReceived, ReceivingAlgorithm, cmp},
{ReceivingAlgorithm, algorithmReceived, ReceivingData, cmp},
{ReceivingAlgorithm, algorithmReceived, Running, Computation{}},
{ReceivingData, dataReceived, Running, cmp},
{Running, runComplete, ConsumingResults, cmp},
{ConsumingResults, resultsConsumed, Complete, cmp},
}
for _, tc := range cases {
@@ -61,11 +61,11 @@ func TestStateMachineInvalidTransition(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
go sm.Start(ctx)
sm.SetState(idle)
sm.SetState(Idle)
sm.SendEvent(dataReceived)
if sm.State != idle {
if sm.State != Idle {
t.Errorf("State should not change on an invalid event, but got %v", sm.State)
}
cancel()
+28
View File
@@ -0,0 +1,28 @@
// Code generated by "stringer -type=Status"; DO NOT EDIT.
package agent
import "strconv"
func _() {
// An "invalid array index" compiler error signifies that the constant values have changed.
// Re-run the stringer command to generate them again.
var x [1]struct{}
_ = x[IdleState-0]
_ = x[InProgress-1]
_ = x[Ready-2]
_ = x[Completed-3]
_ = x[Terminated-4]
_ = x[Error-5]
}
const _Status_name = "IdleStateInProgressReadyCompletedTerminatedError"
var _Status_index = [...]uint8{0, 9, 19, 24, 33, 43, 48}
func (i Status) String() string {
if i >= Status(len(_Status_index)-1) {
return "Status(" + strconv.FormatInt(int64(i), 10) + ")"
}
return _Status_name[_Status_index[i]:_Status_index[i+1]]
}
+2 -2
View File
@@ -124,8 +124,8 @@ func (v *qemuVM) checkVMProcessPeriodically() {
Message: &manager.ClientStreamMessage_AgentEvent{
AgentEvent: &manager.AgentEvent{
ComputationId: v.computationId,
EventType: "vm-running",
Status: "stopped",
EventType: manager.VmRunning.String(),
Status: manager.Stopped.String(),
Timestamp: timestamppb.Now(),
Originator: "manager",
},
+10 -10
View File
@@ -111,7 +111,7 @@ func New(cfg qemu.Config, backendMeasurementBinPath string, logger *slog.Logger,
}
func (ms *managerService) Run(ctx context.Context, c *manager.ComputationRunReq) (string, error) {
ms.publishEvent("vm-provision", c.Id, "starting", json.RawMessage{})
ms.publishEvent(manager.VmProvision.String(), c.Id, manager.Starting.String(), json.RawMessage{})
ac := agent.Computation{
ID: c.Id,
Name: c.Name,
@@ -130,7 +130,7 @@ func (ms *managerService) Run(ctx context.Context, c *manager.ComputationRunReq)
for _, data := range c.Datasets {
if len(data.Hash) != hashLength {
ms.publishEvent("vm-provision", c.Id, "failed", json.RawMessage{})
ms.publishEvent(manager.VmProvision.String(), c.Id, agent.Failed.String(), json.RawMessage{})
return "", errInvalidHashLength
}
ac.Datasets = append(ac.Datasets, agent.Dataset{Hash: [hashLength]byte(data.Hash), UserKey: data.UserKey, Filename: data.Filename})
@@ -142,7 +142,7 @@ func (ms *managerService) Run(ctx context.Context, c *manager.ComputationRunReq)
agentPort, err := getFreePort(ms.portRangeMin, ms.portRangeMax)
if err != nil {
ms.publishEvent("vm-provision", c.Id, "failed", json.RawMessage{})
ms.publishEvent(manager.VmProvision.String(), c.Id, agent.Failed.String(), json.RawMessage{})
return "", errors.Wrap(ErrFailedToAllocatePort, err)
}
ms.qemuCfg.HostFwdAgent = agentPort
@@ -150,7 +150,7 @@ func (ms *managerService) Run(ctx context.Context, c *manager.ComputationRunReq)
ch, err := computationHash(ac)
if err != nil {
ms.publishEvent("vm-provision", c.Id, "failed", json.RawMessage{})
ms.publishEvent(manager.VmProvision.String(), c.Id, agent.Failed.String(), json.RawMessage{})
return "", errors.Wrap(ErrFailedToCalculateHash, err)
}
@@ -158,9 +158,9 @@ func (ms *managerService) Run(ctx context.Context, c *manager.ComputationRunReq)
ms.qemuCfg.SevConfig.HostData = base64.StdEncoding.EncodeToString(ch[:])
cvm := ms.vmFactory(ms.qemuCfg, ms.eventsChan, c.Id)
ms.publishEvent("vm-provision", c.Id, "in-progress", json.RawMessage{})
ms.publishEvent(manager.VmProvision.String(), c.Id, agent.InProgress.String(), json.RawMessage{})
if err = cvm.Start(); err != nil {
ms.publishEvent("vm-provision", c.Id, "failed", json.RawMessage{})
ms.publishEvent(manager.VmProvision.String(), c.Id, agent.Failed.String(), json.RawMessage{})
return "", err
}
ms.mu.Lock()
@@ -187,7 +187,7 @@ func (ms *managerService) Run(ctx context.Context, c *manager.ComputationRunReq)
ms.qemuCfg.VSockConfig.Vnc++
ms.publishEvent("vm-provision", c.Id, "complete", json.RawMessage{})
ms.publishEvent(manager.VmProvision.String(), c.Id, agent.Completed.String(), json.RawMessage{})
return fmt.Sprint(ms.qemuCfg.HostFwdAgent), nil
}
@@ -196,11 +196,11 @@ func (ms *managerService) Stop(ctx context.Context, computationID string) error
defer ms.mu.Unlock()
cvm, ok := ms.vms[computationID]
if !ok {
defer ms.publishEvent("stop-computation", computationID, "failed", json.RawMessage{})
defer ms.publishEvent(manager.StopComputationRun.String(), computationID, agent.Failed.String(), json.RawMessage{})
return ErrNotFound
}
if err := cvm.Stop(); err != nil {
defer ms.publishEvent("stop-computation", computationID, "failed", json.RawMessage{})
defer ms.publishEvent(manager.StopComputationRun.String(), computationID, agent.Failed.String(), json.RawMessage{})
return err
}
delete(ms.vms, computationID)
@@ -209,7 +209,7 @@ func (ms *managerService) Stop(ctx context.Context, computationID string) error
ms.logger.Error("Failed to delete persisted VM state", "error", err)
}
defer ms.publishEvent("stop-computation", computationID, "complete", json.RawMessage{})
defer ms.publishEvent(manager.StopComputationRun.String(), computationID, agent.Completed.String(), json.RawMessage{})
return nil
}
+2 -2
View File
@@ -118,10 +118,10 @@ func (s *Stderr) Write(p []byte) (n int, err error) {
Message: &manager.ClientStreamMessage_AgentEvent{
AgentEvent: &manager.AgentEvent{
ComputationId: s.ComputationId,
EventType: "vm-provision",
EventType: manager.VmProvision.String(),
Timestamp: timestamppb.Now(),
Originator: "manager",
Status: "error",
Status: manager.Error.String(),
},
},
}
+2 -2
View File
@@ -121,8 +121,8 @@ func TestStderrWrite(t *testing.T) {
agentEvent := msg.GetAgentEvent()
assert.NotNil(t, agentEvent)
assert.Equal(t, "test-computation", agentEvent.ComputationId)
assert.Equal(t, "vm-provision", agentEvent.EventType)
assert.Equal(t, "error", agentEvent.Status)
assert.Equal(t, manager.VmProvision.String(), agentEvent.EventType)
assert.Equal(t, manager.Error.String(), agentEvent.Status)
assert.NotNil(t, agentEvent.Timestamp)
}
case <-time.After(time.Second):
+21
View File
@@ -0,0 +1,21 @@
// Copyright (c) Ultraviolet
// SPDX-License-Identifier: Apache-2.0
package manager
//go:generate stringer -type=ManagerState
type ManagerState uint8
const (
VmProvision ManagerState = iota
StopComputationRun
VmRunning
)
//go:generate stringer -type=ManagerStatus
type ManagerStatus uint8
const (
Starting ManagerStatus = iota
Stopped
Error
)
+25
View File
@@ -0,0 +1,25 @@
// Code generated by "stringer -type=ManagerState"; DO NOT EDIT.
package manager
import "strconv"
func _() {
// An "invalid array index" compiler error signifies that the constant values have changed.
// Re-run the stringer command to generate them again.
var x [1]struct{}
_ = x[VmProvision-0]
_ = x[StopComputationRun-1]
_ = x[VmRunning-2]
}
const _ManagerState_name = "VmProvisionStopComputationRunVmRunning"
var _ManagerState_index = [...]uint8{0, 11, 29, 38}
func (i ManagerState) String() string {
if i >= ManagerState(len(_ManagerState_index)-1) {
return "ManagerState(" + strconv.FormatInt(int64(i), 10) + ")"
}
return _ManagerState_name[_ManagerState_index[i]:_ManagerState_index[i+1]]
}
+25
View File
@@ -0,0 +1,25 @@
// Code generated by "stringer -type=ManagerStatus"; DO NOT EDIT.
package manager
import "strconv"
func _() {
// An "invalid array index" compiler error signifies that the constant values have changed.
// Re-run the stringer command to generate them again.
var x [1]struct{}
_ = x[Starting-0]
_ = x[Stopped-1]
_ = x[Error-2]
}
const _ManagerStatus_name = "StartingStoppedError"
var _ManagerStatus_index = [...]uint8{0, 8, 15, 20}
func (i ManagerStatus) String() string {
if i >= ManagerStatus(len(_ManagerStatus_index)-1) {
return "ManagerStatus(" + strconv.FormatInt(int64(i), 10) + ")"
}
return _ManagerStatus_name[_ManagerStatus_index[i]:_ManagerStatus_index[i+1]]
}