NOISSUE - Add VM state machine and filter on qemu logs (#272)

* add vm state machine and filter on qemu logs

Signed-off-by: Sammy Oina <sammyoina@gmail.com>

* fix lint

Signed-off-by: Sammy Oina <sammyoina@gmail.com>

* fix failing test

Signed-off-by: Sammy Oina <sammyoina@gmail.com>

* fix logging test

Signed-off-by: Sammy Oina <sammyoina@gmail.com>

* fix tests

Signed-off-by: Sammy Oina <sammyoina@gmail.com>

* fix failing test

Signed-off-by: Sammy Oina <sammyoina@gmail.com>

---------

Signed-off-by: Sammy Oina <sammyoina@gmail.com>
This commit is contained in:
Sammy Kerata Oina
2024-10-08 17:02:17 +03:00
committed by GitHub
parent 643c132ff7
commit 034547d667
11 changed files with 175 additions and 33 deletions
+1 -1
View File
@@ -125,7 +125,7 @@ func (ms *managerService) reportBrokenConnection(cmpID string) {
ms.eventsChan <- &manager.ClientStreamMessage{
Message: &manager.ClientStreamMessage_AgentEvent{
AgentEvent: &manager.AgentEvent{
EventType: manager.VmRunning.String(),
EventType: ms.vms[cmpID].State(),
ComputationId: cmpID,
Status: manager.Disconnected.String(),
Timestamp: timestamppb.Now(),
+5 -2
View File
@@ -125,9 +125,9 @@ func TestHandleConnection(t *testing.T) {
msg := &manager.ClientStreamMessage{
Message: &manager.ClientStreamMessage_AgentEvent{
AgentEvent: &manager.AgentEvent{
EventType: manager.VmRunning.String(),
EventType: manager.VmProvision.String(),
ComputationId: "comp1",
Status: manager.VmRunning.String(),
Status: manager.VmProvision.String(),
Timestamp: timestamppb.Now(),
Originator: "agent",
},
@@ -153,6 +153,9 @@ func TestHandleConnection(t *testing.T) {
func TestReportBrokenConnection(t *testing.T) {
ms := &managerService{
eventsChan: make(chan *manager.ClientStreamMessage, 1),
vms: map[string]vm.VM{
"comp1": qemu.NewVM(qemu.Config{VSockConfig: qemu.VSockConfig{GuestCID: 3}}, make(chan *manager.ClientStreamMessage), "comp1"),
},
}
ms.reportBrokenConnection("comp1")
+20 -2
View File
@@ -30,6 +30,7 @@ type qemuVM struct {
cmd *exec.Cmd
logsChan chan *manager.ClientStreamMessage
computationId string
vm.StateMachine
}
func NewVM(config interface{}, logsChan chan *manager.ClientStreamMessage, computationId string) vm.VM {
@@ -37,6 +38,7 @@ func NewVM(config interface{}, logsChan chan *manager.ClientStreamMessage, compu
config: config.(Config),
logsChan: logsChan,
computationId: computationId,
StateMachine: vm.NewStateMachine(),
}
}
@@ -73,12 +75,28 @@ func (v *qemuVM) Start() (err error) {
v.cmd = exec.Command(exe, args...)
v.cmd.Stdout = &vm.Stdout{LogsChan: v.logsChan, ComputationId: v.computationId}
v.cmd.Stderr = &vm.Stderr{LogsChan: v.logsChan, ComputationId: v.computationId}
v.cmd.Stderr = &vm.Stderr{LogsChan: v.logsChan, ComputationId: v.computationId, StateMachine: v.StateMachine}
return v.cmd.Start()
}
func (v *qemuVM) Stop() error {
defer func() {
err := v.StateMachine.Transition(manager.StopComputationRun)
if err != nil {
v.logsChan <- &manager.ClientStreamMessage{
Message: &manager.ClientStreamMessage_AgentEvent{
AgentEvent: &manager.AgentEvent{
ComputationId: v.computationId,
EventType: v.StateMachine.State(),
Status: manager.Warning.String(),
Timestamp: timestamppb.Now(),
Originator: "manager",
},
},
}
}
}()
err := v.cmd.Process.Signal(syscall.SIGTERM)
if err != nil {
return fmt.Errorf("failed to send SIGTERM: %v", err)
@@ -146,7 +164,7 @@ func (v *qemuVM) checkVMProcessPeriodically() {
Message: &manager.ClientStreamMessage_AgentEvent{
AgentEvent: &manager.AgentEvent{
ComputationId: v.computationId,
EventType: manager.VmRunning.String(),
EventType: v.StateMachine.State(),
Status: manager.Stopped.String(),
Timestamp: timestamppb.Now(),
Originator: "manager",
+4 -1
View File
@@ -9,6 +9,7 @@ import (
"time"
"github.com/stretchr/testify/assert"
"github.com/ultravioletrs/cocos/manager/vm"
"github.com/ultravioletrs/cocos/pkg/manager"
)
@@ -57,6 +58,7 @@ func TestStop(t *testing.T) {
cmd: &exec.Cmd{
Process: cmd.Process,
},
StateMachine: vm.NewStateMachine(),
}
err = vm.Stop()
@@ -110,6 +112,7 @@ func TestCheckVMProcessPeriodically(t *testing.T) {
cmd: &exec.Cmd{
Process: &os.Process{Pid: -1}, // Use an invalid PID to simulate a stopped process
},
StateMachine: vm.NewStateMachine(),
}
go vm.checkVMProcessPeriodically()
@@ -118,7 +121,7 @@ func TestCheckVMProcessPeriodically(t *testing.T) {
case msg := <-logsChan:
assert.NotNil(t, msg.GetAgentEvent())
assert.Equal(t, "test-computation", msg.GetAgentEvent().ComputationId)
assert.Equal(t, manager.VmRunning.String(), msg.GetAgentEvent().EventType)
assert.Equal(t, manager.VmProvision.String(), msg.GetAgentEvent().EventType)
assert.Equal(t, manager.Stopped.String(), msg.GetAgentEvent().Status)
case <-time.After(2 * interval):
t.Fatal("Timeout waiting for VM stopped message")
+8
View File
@@ -187,6 +187,10 @@ func (ms *managerService) Run(ctx context.Context, c *manager.ComputationRunReq)
return "", err
}
if err := ms.vms[c.Id].Transition(manager.VmRunning); err != nil {
ms.logger.Warn("Failed to transition VM state", "computation", c.Id, "error", err)
}
ms.publishEvent(manager.VmProvision.String(), c.Id, agent.Completed.String(), json.RawMessage{})
return fmt.Sprint(ms.qemuCfg.HostFwdAgent), nil
}
@@ -327,6 +331,10 @@ func (ms *managerService) restoreVMs() error {
continue
}
if err := cvm.Transition(manager.VmRunning); err != nil {
ms.logger.Warn("Failed to transition VM state", "computation", state.ID, "error", err)
}
ms.vms[state.ID] = cvm
ms.logger.Info("Successfully restored VM state", "id", state.ID, "computationId", state.ID, "pid", state.PID)
}
+2
View File
@@ -86,6 +86,7 @@ func TestRun(t *testing.T) {
vmMock.On("SendAgentConfig", mock.Anything).Return(nil)
vmMock.On("GetProcess").Return(1234)
vmMock.On("Transition", mock.Anything).Return(nil)
persistence.On("SaveVM", mock.Anything).Return(nil)
@@ -324,6 +325,7 @@ func TestRestoreVMs(t *testing.T) {
vmMock := new(mocks.VM)
vmf.On("Execute", mock.Anything, mock.Anything, mock.Anything).Return(vmMock)
vmMock.On("SetProcess", mock.Anything).Return(nil)
vmMock.On("Transition", mock.Anything).Return(nil)
ms := &managerService{
persistence: mockPersistence,
vms: make(map[string]vm.VM),
+36 -25
View File
@@ -7,6 +7,7 @@ import (
"errors"
"io"
"log/slog"
"strings"
"github.com/ultravioletrs/cocos/pkg/manager"
"google.golang.org/protobuf/types/known/timestamppb"
@@ -58,18 +59,7 @@ func (s *Stdout) Write(p []byte) (n int, err error) {
return len(p) - inBuf.Len(), err
}
msg := &manager.ClientStreamMessage{
Message: &manager.ClientStreamMessage_AgentLog{
AgentLog: &manager.AgentLog{
Message: string(buf[:n]),
ComputationId: s.ComputationId,
Level: slog.LevelDebug.String(),
Timestamp: timestamppb.Now(),
},
},
}
if err := safeSend(s.LogsChan, msg); err != nil {
if err := sendLog(s.LogsChan, s.ComputationId, string(buf[:n]), slog.LevelDebug.String()); err != nil {
return len(p) - inBuf.Len(), err
}
}
@@ -80,6 +70,7 @@ func (s *Stdout) Write(p []byte) (n int, err error) {
type Stderr struct {
LogsChan chan *manager.ClientStreamMessage
ComputationId string
StateMachine StateMachine
}
// Write implements io.Writer.
@@ -97,18 +88,7 @@ func (s *Stderr) Write(p []byte) (n int, err error) {
return len(p) - inBuf.Len(), err
}
msg := &manager.ClientStreamMessage{
Message: &manager.ClientStreamMessage_AgentLog{
AgentLog: &manager.AgentLog{
Message: string(buf[:n]),
ComputationId: s.ComputationId,
Level: slog.LevelError.String(),
Timestamp: timestamppb.Now(),
},
},
}
if err := safeSend(s.LogsChan, msg); err != nil {
if err := sendLog(s.LogsChan, s.ComputationId, string(buf[:n]), ""); err != nil {
return len(p) - inBuf.Len(), err
}
}
@@ -118,7 +98,7 @@ func (s *Stderr) Write(p []byte) (n int, err error) {
Message: &manager.ClientStreamMessage_AgentEvent{
AgentEvent: &manager.AgentEvent{
ComputationId: s.ComputationId,
EventType: manager.VmRunning.String(),
EventType: s.StateMachine.State(),
Timestamp: timestamppb.Now(),
Originator: "manager",
Status: manager.Warning.String(),
@@ -132,3 +112,34 @@ func (s *Stderr) Write(p []byte) (n int, err error) {
return len(p), nil
}
func sendLog(logsChan chan *manager.ClientStreamMessage, computationID, message, level string) error {
if len(message) < 3 {
return nil
}
if level == "" {
if strings.Contains(strings.ToLower(message), "warning") {
level = slog.LevelWarn.String()
} else {
level = slog.LevelError.String()
}
}
msg := &manager.ClientStreamMessage{
Message: &manager.ClientStreamMessage_AgentLog{
AgentLog: &manager.AgentLog{
Message: message,
ComputationId: computationID,
Level: level,
Timestamp: timestamppb.Now(),
},
},
}
if err := safeSend(logsChan, msg); err != nil {
return err
}
return nil
}
+5 -1
View File
@@ -29,7 +29,7 @@ func TestStdoutWrite(t *testing.T) {
},
{
name: "Large write exceeding buffer size",
input: string(make([]byte, bufSize*2+1)),
input: string(make([]byte, bufSize*2+3)),
expectedWrites: 3,
},
}
@@ -97,8 +97,12 @@ func TestStderrWrite(t *testing.T) {
s := &Stderr{
LogsChan: logsChan,
ComputationId: "test-computation",
StateMachine: NewStateMachine(),
}
err := s.StateMachine.Transition(manager.VmRunning)
assert.NoError(t, err)
n, err := s.Write([]byte(tt.input))
assert.NoError(t, err)
+39 -1
View File
@@ -6,8 +6,10 @@
package mocks
import (
mock "github.com/stretchr/testify/mock"
agent "github.com/ultravioletrs/cocos/agent"
manager "github.com/ultravioletrs/cocos/pkg/manager"
mock "github.com/stretchr/testify/mock"
)
// VM is an autogenerated mock type for the VM type
@@ -105,6 +107,24 @@ func (_m *VM) Start() error {
return r0
}
// State provides a mock function with given fields:
func (_m *VM) State() string {
ret := _m.Called()
if len(ret) == 0 {
panic("no return value specified for State")
}
var r0 string
if rf, ok := ret.Get(0).(func() string); ok {
r0 = rf()
} else {
r0 = ret.Get(0).(string)
}
return r0
}
// Stop provides a mock function with given fields:
func (_m *VM) Stop() error {
ret := _m.Called()
@@ -123,6 +143,24 @@ func (_m *VM) Stop() error {
return r0
}
// Transition provides a mock function with given fields: newState
func (_m *VM) Transition(newState manager.ManagerState) error {
ret := _m.Called(newState)
if len(ret) == 0 {
panic("no return value specified for Transition")
}
var r0 error
if rf, ok := ret.Get(0).(func(manager.ManagerState) error); ok {
r0 = rf(newState)
} else {
r0 = ret.Error(0)
}
return r0
}
// NewVM creates a new instance of VM. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
// The first argument is typically a *testing.T value.
func NewVM(t interface {
+53
View File
@@ -0,0 +1,53 @@
// Copyright (c) Ultraviolet
// SPDX-License-Identifier: Apache-2.0
package vm
import (
"errors"
"sync"
"github.com/ultravioletrs/cocos/pkg/manager"
)
type sm struct {
sync.Mutex
state manager.ManagerState
}
type StateMachine interface {
Transition(newState manager.ManagerState) error
State() string
}
func NewStateMachine() StateMachine {
return &sm{state: manager.VmProvision}
}
func (sm *sm) Transition(newState manager.ManagerState) error {
sm.Lock()
defer sm.Unlock()
switch sm.state {
case manager.VmProvision:
if newState == manager.VmRunning || newState == manager.StopComputationRun {
sm.state = newState
return nil
}
case manager.VmRunning:
if newState == manager.StopComputationRun {
sm.state = newState
return nil
}
case manager.StopComputationRun:
if newState == manager.VmRunning {
sm.state = newState
return nil
}
}
return errors.New("invalid state transition")
}
func (sm *sm) State() string {
sm.Lock()
defer sm.Unlock()
return sm.state.String()
}
+2
View File
@@ -17,6 +17,8 @@ type VM interface {
SetProcess(pid int) error
GetProcess() int
GetCID() int
Transition(newState manager.ManagerState) error
State() string
}
//go:generate mockery --name Provider --output=./mocks --filename provider.go --quiet --note "Copyright (c) Ultraviolet \n // SPDX-License-Identifier: Apache-2.0"