NOISSUE - Cache and retry message sending (#222)

* cache and retry message sending

Signed-off-by: Sammy Oina <sammyoina@gmail.com>

* cache and retry message sending

Signed-off-by: SammyOina <sammyoina@gmail.com>

* remove safeconn

Signed-off-by: Sammy Oina <sammyoina@gmail.com>

* simplify retry

Signed-off-by: Sammy Oina <sammyoina@gmail.com>

* debug disconnect

Signed-off-by: Sammy Oina <sammyoina@gmail.com>

* remove debug

Signed-off-by: Sammy Oina <sammyoina@gmail.com>

* simplify

Signed-off-by: SammyOina <sammyoina@gmail.com>

---------

Signed-off-by: Sammy Oina <sammyoina@gmail.com>
Signed-off-by: SammyOina <sammyoina@gmail.com>
This commit is contained in:
Sammy Kerata Oina
2024-09-06 18:02:30 +03:00
committed by GitHub
parent 51b129c3a2
commit c2a4b44769
6 changed files with 135 additions and 54 deletions
+55 -15
View File
@@ -4,6 +4,7 @@ package events
import (
"encoding/json"
"sync"
"time"
"github.com/mdlayher/vsock"
@@ -12,10 +13,15 @@ import (
"google.golang.org/protobuf/types/known/timestamppb"
)
const retryInterval = 5 * time.Second
type service struct {
service string
computationID string
conn *vsock.Conn
service string
computationID string
conn *vsock.Conn
cachedMessages [][]byte
mutex sync.Mutex
stopRetry chan struct{}
}
type AgentEvent struct {
@@ -30,19 +36,21 @@ type AgentEvent struct {
//go:generate mockery --name Service --output=./mocks --filename events.go --quiet --note "Copyright (c) Ultraviolet \n // SPDX-License-Identifier: Apache-2.0"
type Service interface {
SendEvent(event, status string, details json.RawMessage) error
Close() error
Close()
}
func New(svc, computationID string, sockPort uint32) (Service, error) {
conn, err := vsock.Dial(vsock.Host, sockPort, nil)
if err != nil {
return nil, err
func New(svc, computationID string, conn *vsock.Conn) (Service, error) {
s := &service{
service: svc,
computationID: computationID,
conn: conn,
cachedMessages: make([][]byte, 0),
stopRetry: make(chan struct{}),
}
return &service{
service: svc,
computationID: computationID,
conn: conn,
}, nil
go s.periodicRetry()
return s, nil
}
func (s *service) SendEvent(event, status string, details json.RawMessage) error {
@@ -58,12 +66,44 @@ func (s *service) SendEvent(event, status string, details json.RawMessage) error
if err != nil {
return err
}
s.mutex.Lock()
defer s.mutex.Unlock()
if _, err := s.conn.Write(protoBody); err != nil {
s.cachedMessages = append(s.cachedMessages, protoBody)
return err
}
return nil
}
func (s *service) Close() error {
return s.conn.Close()
func (s *service) periodicRetry() {
ticker := time.NewTicker(retryInterval)
defer ticker.Stop()
for {
select {
case <-ticker.C:
s.retrySendCachedMessages()
case <-s.stopRetry:
return
}
}
}
func (s *service) retrySendCachedMessages() {
s.mutex.Lock()
defer s.mutex.Unlock()
tmp := [][]byte{}
for _, msg := range s.cachedMessages {
if _, err := s.conn.Write(msg); err != nil {
tmp = append(tmp, msg)
}
}
s.cachedMessages = tmp
}
func (s *service) Close() {
close(s.stopRetry)
}
+2 -15
View File
@@ -17,21 +17,8 @@ type Service struct {
}
// Close provides a mock function with given fields:
func (_m *Service) Close() error {
ret := _m.Called()
if len(ret) == 0 {
panic("no return value specified for Close")
}
var r0 error
if rf, ok := ret.Get(0).(func() error); ok {
r0 = rf()
} else {
r0 = ret.Error(0)
}
return r0
func (_m *Service) Close() {
_m.Called()
}
// SendEvent provides a mock function with given fields: event, status, details
+2 -1
View File
@@ -62,10 +62,11 @@ func main() {
exitCode = 1
return
}
handler := agentlogger.NewProtoHandler(conn, &slog.HandlerOptions{Level: level}, cfg.ID)
logger := slog.New(handler)
eventSvc, err := events.New(svcName, cfg.ID, manager.ManagerVsockPort)
eventSvc, err := events.New(svcName, cfg.ID, conn)
if err != nil {
logger.Error(fmt.Sprintf("failed to create events service %s", err.Error()))
exitCode = 1
+3
View File
@@ -24,12 +24,14 @@ require (
require (
github.com/Microsoft/go-winio v0.6.1 // indirect
github.com/containerd/log v0.1.0 // indirect
github.com/distribution/reference v0.6.0 // indirect
github.com/docker/go-connections v0.5.0 // indirect
github.com/docker/go-units v0.5.0 // indirect
github.com/felixge/httpsnoop v1.0.4 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/moby/docker-image-spec v1.3.1 // indirect
github.com/morikuni/aec v1.0.0 // indirect
github.com/opencontainers/go-digest v1.0.0 // indirect
github.com/opencontainers/image-spec v1.1.0 // indirect
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.53.0 // indirect
@@ -39,6 +41,7 @@ require (
go.opentelemetry.io/otel/sdk v1.28.0 // indirect
golang.org/x/mod v0.19.0 // indirect
golang.org/x/tools v0.23.0 // indirect
gotest.tools/v3 v3.5.1 // indirect
)
require (
+14 -2
View File
@@ -1,3 +1,5 @@
github.com/Azure/go-ansiterm v0.0.0-20230124172434-306776ec8161 h1:L/gRVlceqvL25UVaW/CKtUDjefjrs0SPonmDGUVOYP0=
github.com/Azure/go-ansiterm v0.0.0-20230124172434-306776ec8161/go.mod h1:xomTg63KZ2rFqZQzSB4Vz2SUXa1BpHTVz9L5PTmPC4E=
github.com/Microsoft/go-winio v0.6.1 h1:9/kr64B9VUZrLm5YYwbGtUJnMgqWVOdUAXu6Migciow=
github.com/Microsoft/go-winio v0.6.1/go.mod h1:LRdKpFKfdobln8UmuiYcKPot9D2v6svN5+sAH+4kjUM=
github.com/VividCortex/gohistogram v1.0.0 h1:6+hBz+qvs0JOrrNhhmR7lFxo5sINxBCGXrdtl/UvroE=
@@ -12,6 +14,8 @@ github.com/cenkalti/backoff/v4 v4.3.0 h1:MyRJ/UdXutAwSAT+s3wNd7MfTIcy71VQueUuFK3
github.com/cenkalti/backoff/v4 v4.3.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE=
github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs=
github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/containerd/log v0.1.0 h1:TCJt7ioM2cr/tfR8GPbGf9/VRAX8D2B4PjzCpfX540I=
github.com/containerd/log v0.1.0/go.mod h1:VRRf09a7mHDIRezVKTRCrOq78v577GXq3bSa3EhrzVo=
github.com/cpuguy83/go-md2man/v2 v2.0.4/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o=
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM=
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
@@ -71,6 +75,10 @@ github.com/mdlayher/vsock v1.2.1 h1:pC1mTJTvjo1r9n9fbm7S1j04rCgCzhCOS5DY0zqHlnQ=
github.com/mdlayher/vsock v1.2.1/go.mod h1:NRfCibel++DgeMD8z/hP+PPTjlNJsdPOmxcnENvE+SE=
github.com/moby/docker-image-spec v1.3.1 h1:jMKff3w6PgbfSa69GfNg+zN/XLhfXJGnEx3Nl2EsFP0=
github.com/moby/docker-image-spec v1.3.1/go.mod h1:eKmb5VW8vQEh/BAr2yvVNvuiJuY6UIocYsFu/DxxRpo=
github.com/moby/term v0.5.0 h1:xt8Q1nalod/v7BqbG21f8mQPqH+xAaC9C3N3wfWbVP0=
github.com/moby/term v0.5.0/go.mod h1:8FzsFHVUBGZdbDsJw/ot+X+d5HLUbvklYLJ9uGfcI3Y=
github.com/morikuni/aec v1.0.0 h1:nP9CBfwrvYnBRgY6qfDQkygYDmYwOilePFkwzv4dU8A=
github.com/morikuni/aec v1.0.0/go.mod h1:BbKIizmSmc5MMPqRYbxO4ZU0S0+P200+tUnFx7PXmsc=
github.com/opencontainers/go-digest v1.0.0 h1:apOUWs51W5PlhuyGyz9FCeeBIOUDA/6nW8Oi/yOhh5U=
github.com/opencontainers/go-digest v1.0.0/go.mod h1:0JzlMkj0TRzQZfJkVvzbP0HBR3IKzErnv2BNG4W4MAM=
github.com/opencontainers/image-spec v1.1.0 h1:8SG7/vwALn54lVB/0yZ/MMwhFrPYtpEHQb2IpWsCzug=
@@ -92,6 +100,8 @@ github.com/prometheus/procfs v0.13.0/go.mod h1:cd4PFCR54QLnGKPaKGA6l+cfuNXtht43Z
github.com/rogpeppe/go-internal v1.12.0 h1:exVL4IDcn6na9z1rAb56Vxr+CgyK3nn3O+epU5NdKM8=
github.com/rogpeppe/go-internal v1.12.0/go.mod h1:E+RYuTGaKKdloAfM02xzb0FW3Paa99yedzYV+kq4uf4=
github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ=
github.com/sirupsen/logrus v1.9.3/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ=
github.com/spf13/cobra v1.8.1 h1:e5/vxKd/rZsfSJMUX1agtjeTDf+qv1/JdBF8gg5k9ZM=
github.com/spf13/cobra v1.8.1/go.mod h1:wHxEcudfqmLYa8iTfL+OuZPbBZkmvliBWKIezN3kD9Y=
github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA=
@@ -135,8 +145,6 @@ golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
golang.org/x/net v0.26.0 h1:soB7SVo0PWrY4vPW/+ay0jKDNScG2X9wFeYlXIvJsOQ=
golang.org/x/net v0.26.0/go.mod h1:5YKkiSynbBIh3p6iOc/vibscux0x38BZDkn8sCUPxHE=
golang.org/x/net v0.27.0 h1:5K3Njcw06/l2y9vpGCSdcxWOYHOUk3dVNGDXN+FvAys=
golang.org/x/net v0.27.0/go.mod h1:dDi0PyhWNoiUOrAS8uXv/vnScO4wnHQO4mj9fn/RytE=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
@@ -156,6 +164,8 @@ golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.16.0 h1:a94ExnEXNtEwYLGJSIUxnWoxoRz/ZcCsV63ROupILh4=
golang.org/x/text v0.16.0/go.mod h1:GhwF1Be+LQoKShO3cGOHzqOgRrGaYc9AvblQOmPVHnI=
golang.org/x/time v0.5.0 h1:o7cqy6amK/52YcAKIPlM3a+Fpj35zvRj2TP+e1xFSfk=
golang.org/x/time v0.5.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE=
@@ -179,3 +189,5 @@ gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntN
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gotest.tools/v3 v3.5.1 h1:EENdUnS3pdur5nybKYIh2Vfgc8IUNBjxDPSjtiJcOzU=
gotest.tools/v3 v3.5.1/go.mod h1:isy3WKz7GK6uNw/sbHzfKBLvlvXwUyV06n6brMxxopU=
+59 -21
View File
@@ -6,32 +6,44 @@ import (
"context"
"io"
"log/slog"
"sync"
"time"
"github.com/ultravioletrs/cocos/pkg/manager"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/timestamppb"
)
const retryInterval = 5 * time.Second
var _ slog.Handler = (*handler)(nil)
type handler struct {
opts slog.HandlerOptions
w io.Writer
cmpID string
opts slog.HandlerOptions
w io.Writer
cmpID string
cachedMessages [][]byte
mutex sync.Mutex
stopRetry chan struct{}
}
func NewProtoHandler(w io.Writer, opts *slog.HandlerOptions, cmpID string) slog.Handler {
func NewProtoHandler(conn io.Writer, opts *slog.HandlerOptions, cmpID string) slog.Handler {
if opts == nil {
opts = &slog.HandlerOptions{}
}
return &handler{
opts: *opts,
w: w,
cmpID: cmpID,
h := &handler{
opts: *opts,
w: conn,
cmpID: cmpID,
cachedMessages: make([][]byte, 0),
stopRetry: make(chan struct{}),
}
go h.periodicRetry()
return h
}
// Enabled implements slog.Handler.
func (h *handler) Enabled(_ context.Context, l slog.Level) bool {
minLevel := slog.LevelInfo
if h.opts.Level != nil {
@@ -40,13 +52,11 @@ func (h *handler) Enabled(_ context.Context, l slog.Level) bool {
return l >= minLevel
}
// Handle implements slog.Handler.
func (h *handler) Handle(_ context.Context, r slog.Record) error {
message := r.Message
timestamp := timestamppb.New(r.Time)
level := r.Level.String()
// Calculate the number of chunks
chunkSize := 500
numChunks := (len(message) + chunkSize - 1) / chunkSize
@@ -57,10 +67,8 @@ func (h *handler) Handle(_ context.Context, r slog.Record) error {
end = len(message)
}
// Create a chunk of the message
chunk := message[start:end]
// Create the agent log with the chunk
agentLog := manager.ClientStreamMessage{
Message: &manager.ClientStreamMessage_AgentLog{
AgentLog: &manager.AgentLog{
@@ -72,27 +80,57 @@ func (h *handler) Handle(_ context.Context, r slog.Record) error {
},
}
// Marshal the chunk to protobuf
b, err := proto.Marshal(&agentLog)
if err != nil {
return err
}
// Write the chunk to the writer
if _, err := h.w.Write(b); err != nil {
return err
h.mutex.Lock()
_, err = h.w.Write(b)
if err != nil {
h.cachedMessages = append(h.cachedMessages, b)
}
h.mutex.Unlock()
}
return nil
}
// WithAttrs implements slog.Handler.
func (*handler) WithAttrs(attrs []slog.Attr) slog.Handler {
func (h *handler) periodicRetry() {
ticker := time.NewTicker(retryInterval)
defer ticker.Stop()
for {
select {
case <-ticker.C:
h.retrySendCachedMessages()
case <-h.stopRetry:
return
}
}
}
func (h *handler) retrySendCachedMessages() {
h.mutex.Lock()
defer h.mutex.Unlock()
tmp := [][]byte{}
for _, msg := range h.cachedMessages {
if _, err := h.w.Write(msg); err != nil {
tmp = append(tmp, msg)
}
}
h.cachedMessages = tmp
}
func (h *handler) WithAttrs(attrs []slog.Attr) slog.Handler {
panic("unimplemented")
}
// WithGroup implements slog.Handler.
func (*handler) WithGroup(name string) slog.Handler {
func (h *handler) WithGroup(name string) slog.Handler {
panic("unimplemented")
}
func (h *handler) Close() error {
close(h.stopRetry)
return nil
}