NOISSUE - Remove blocking on vsock (#301)

* no blocking

Signed-off-by: Sammy Oina <sammyoina@gmail.com>

* return error

Signed-off-by: Sammy Oina <sammyoina@gmail.com>

* add test cases

Signed-off-by: Sammy Oina <sammyoina@gmail.com>

---------

Signed-off-by: Sammy Oina <sammyoina@gmail.com>
This commit is contained in:
Sammy Kerata Oina
2024-11-06 04:31:04 +03:00
committed by GitHub
parent 0380b2323a
commit d3636de824
5 changed files with 48 additions and 50 deletions
+3 -3
View File
@@ -121,9 +121,9 @@ func main() {
return
}
eventsSvc := events.New(logger, svc.ReportBrokenConnection, eventsChan)
if eventsSvc == nil {
logger.Error("Failed to create events service")
eventsSvc, err := events.New(logger, svc.ReportBrokenConnection, eventsChan)
if err != nil {
logger.Error(err.Error())
exitCode = 1
return
}
+1 -24
View File
@@ -80,32 +80,9 @@ func (aw *AckWriter) Write(p []byte) (int, error) {
copy(message.Content, p)
aw.messageStore.Store(messageID, message)
select {
case aw.pendingMessages <- message:
timer := time.NewTimer(ackTimeout)
defer timer.Stop()
for {
if msg, ok := aw.messageStore.Load(messageID); ok {
m := msg.(*Message)
if m.Status == StatusAcknowledged {
return len(p), nil
}
if m.Status == StatusFailed {
return 0, fmt.Errorf("message delivery failed after %d retries", maxRetries)
}
}
select {
case <-timer.C:
return 0, fmt.Errorf("timeout waiting for acknowledgment")
case <-aw.ctx.Done():
return 0, fmt.Errorf("writer closed while waiting for acknowledgment")
case <-time.After(100 * time.Millisecond):
continue
}
}
return len(p), nil
case <-aw.ctx.Done():
return 0, fmt.Errorf("writer is closed")
}
+39 -19
View File
@@ -6,6 +6,7 @@ import (
"bytes"
"encoding/binary"
"errors"
"fmt"
"io"
"net"
"sync"
@@ -80,6 +81,7 @@ func TestAckReader_Read(t *testing.T) {
{"Valid message", []byte("Hello, World!"), false},
{"Empty message", []byte{}, false},
{"Message at max size", make([]byte, maxMessageSize), false},
{"Message exceeds max size", make([]byte, maxMessageSize+1), true},
}
for _, tt := range tests {
@@ -208,25 +210,19 @@ func TestAckWriter_Write(t *testing.T) {
tests := []struct {
name string
input []byte
mockBehavior func(*MockConn)
expectErr bool
expectedError string
}{
{
name: "Message exceeds max size",
input: make([]byte, maxMessageSize+1),
mockBehavior: func(m *MockConn) {},
expectErr: true,
expectedError: "message size exceeds maximum allowed size",
},
{
name: "Timeout waiting for acknowledgment",
input: []byte("timeout message"),
mockBehavior: func(m *MockConn) {
// Don't send ACK, let it timeout
},
expectErr: true,
expectedError: "timeout waiting for acknowledgment",
name: "Write succeeds",
input: []byte("Hello, world!"),
expectErr: false,
},
}
@@ -235,13 +231,16 @@ func TestAckWriter_Write(t *testing.T) {
mockConn := &MockConn{
mu: sync.Mutex{},
}
if tt.mockBehavior != nil {
tt.mockBehavior(mockConn)
}
writer := NewAckWriter(mockConn)
defer writer.Close()
if tt.expectErr {
writer.(*AckWriter).ctx.Done()
}
time.Sleep(100 * time.Millisecond)
n, err := writer.Write(tt.input)
if tt.expectErr {
@@ -253,13 +252,6 @@ func TestAckWriter_Write(t *testing.T) {
} else {
assert.NoError(t, err)
assert.Equal(t, len(tt.input), n)
assert.GreaterOrEqual(t, len(mockConn.WrittenData), 8+len(tt.input))
messageLen := binary.LittleEndian.Uint32(mockConn.WrittenData[4:8])
assert.Equal(t, uint32(len(tt.input)), messageLen)
assert.Equal(t, tt.input, mockConn.WrittenData[8:8+len(tt.input)])
}
})
}
@@ -315,3 +307,31 @@ func TestAckReader_LargeMessage(t *testing.T) {
ackID := binary.LittleEndian.Uint32(mockConn.WrittenData)
assert.Equal(t, messageID, ackID)
}
func TestAckWriter_FailedSends(t *testing.T) {
mockConn := &MockConn{
WriteErr: errors.New("write error"),
}
writer := NewAckWriter(mockConn).(*AckWriter)
defer writer.Close()
// Add some messages to the channel
for i := 0; i < 5; i++ {
msg := &Message{
ID: uint32(i + 1),
Content: []byte(fmt.Sprintf("Message %d", i+1)),
Status: StatusPending,
}
writer.pendingMessages <- msg
}
// Wait for the messages to be sent
time.Sleep(100 * time.Millisecond)
// Check that the messages were marked as failed
writer.messageStore.Range(func(key, value interface{}) bool {
msg := value.(*Message)
assert.Equal(t, StatusFailed, msg.Status)
return true
})
}
+3 -3
View File
@@ -25,17 +25,17 @@ type events struct {
eventsChan chan *manager.ClientStreamMessage
}
func New(logger *slog.Logger, reportBrokenConnection ReportBrokenConnectionFunc, eventsChan chan *manager.ClientStreamMessage) Listener {
func New(logger *slog.Logger, reportBrokenConnection ReportBrokenConnectionFunc, eventsChan chan *manager.ClientStreamMessage) (Listener, error) {
l, err := vsock.Listen(ManagerVsockPort, nil)
if err != nil {
return nil
return nil, err
}
return &events{
lis: l,
reportBrokenConnection: reportBrokenConnection,
logger: logger,
eventsChan: eventsChan,
}
}, nil
}
func (e *events) Listen(ctx context.Context) {
+2 -1
View File
@@ -95,7 +95,8 @@ func TestNew(t *testing.T) {
reportBrokenConnection := func(address string) {}
eventsChan := make(chan *manager.ClientStreamMessage)
e := New(logger, reportBrokenConnection, eventsChan)
e, err := New(logger, reportBrokenConnection, eventsChan)
assert.NoError(t, err)
assert.NotNil(t, e)
assert.IsType(t, &events{}, e)