Files
Sammy Kerata Oina a3265bc346 NOISSUE - Introduce computation runner, log forwarder, ingress, and egress proxy services. (#559)
* feat: Introduce computation runner, log forwarder, ingress, and egress proxy services.

Signed-off-by: Sammy Oina <sammyoina@gmail.com>

* feat: Update Go environment variable parsing and build system to use new architecture and repository.

Signed-off-by: Sammy Oina <sammyoina@gmail.com>

* feat: Update package sources to `sammyoina/cocos-ai` at a specific commit, add log-forwarder pre-start hook, and rename proxy binaries.

Signed-off-by: Sammy Oina <sammyoina@gmail.com>

* chore: Update build system references to a specific commit and enhance logging for service connections and message processing.

Signed-off-by: Sammy Oina <sammyoina@gmail.com>

* build: Update package source repositories and versions, migrate client logging to slog, and adjust ingress/egress proxy build and install steps.

Signed-off-by: Sammy Oina <sammyoina@gmail.com>

* debug stuck

Signed-off-by: Sammy Oina <sammyoina@gmail.com>

* debug

Signed-off-by: Sammy Oina <sammyoina@gmail.com>

* debug

Signed-off-by: Sammy Oina <sammyoina@gmail.com>

* feat: add HTTP/2 support to egress proxy and update build system to use specific commit hashes

Signed-off-by: Sammy Oina <sammyoina@gmail.com>

* feat: enhance egress proxy CONNECT handling, update package sources, and add gRPC test utility

Signed-off-by: Sammy Oina <sammyoina@gmail.com>

* feat: Update build system for various services to a specific commit from a new repository, change agent gRPC port to 7001, and add a gRPC test client.

Signed-off-by: Sammy Oina <sammyoina@gmail.com>

* feat: Migrate agent-internal gRPC communication to Unix sockets, set ingress proxy to port 7002, and update build hashes.

Signed-off-by: Sammy Oina <sammyoina@gmail.com>

* refactor: Remove standalone ingress-proxy systemd service and update component versions.

Signed-off-by: Sammy Oina <sammyoina@gmail.com>

* fix: Prevent computation re-initialization in agent and update component versions across several packages.

Signed-off-by: Sammy Oina <sammyoina@gmail.com>

* feat: update package versions and enable h2c support in ingress proxy.

Signed-off-by: Sammy Oina <sammyoina@gmail.com>

* feat: refactor ingress proxy to support HTTP/2 over Unix sockets and update component versions.

Signed-off-by: Sammy Oina <sammyoina@gmail.com>

* feat: Update build system package sources to `ultravioletrs/cocos` and reduce agent logging verbosity.

Signed-off-by: Sammy Oina <sammyoina@gmail.com>

* refactor: improve error handling in proxy commands and remove unused gRPC test

Signed-off-by: Sammy Oina <sammyoina@gmail.com>

* test: add mock service state return value in handleRunReqChunks test

Signed-off-by: Sammy Oina <sammyoina@gmail.com>

* feat: add comprehensive tests for service and proxy components

Signed-off-by: Sammy Oina <sammyoina@gmail.com>

* fix linter

Signed-off-by: Sammy Oina <sammyoina@gmail.com>

* improve coverage

Signed-off-by: Sammy Oina <sammyoina@gmail.com>

* test: add gRPC client and ingress adapter tests, and update egress proxy tests.

Signed-off-by: Sammy Oina <sammyoina@gmail.com>

* improve coverage

Signed-off-by: Sammy Oina <sammyoina@gmail.com>

---------

Signed-off-by: Sammy Oina <sammyoina@gmail.com>
2026-02-09 10:38:21 +01:00

117 lines
2.3 KiB
Go

// Copyright (c) Ultraviolet
// SPDX-License-Identifier: Apache-2.0
package logger
import (
"context"
"io"
"log/slog"
"github.com/ultravioletrs/cocos/agent/cvms"
"github.com/ultravioletrs/cocos/agent/events"
"google.golang.org/protobuf/encoding/protojson"
"google.golang.org/protobuf/types/known/timestamppb"
)
// Compile-time assertion that *handler implements slog.Handler.
var _ slog.Handler = (*handler)(nil)
// handler is a slog.Handler that forwards log records as agent-log protobuf
// messages, both to an io.Writer and to a channel of cvms.ClientStreamMessage.
type handler struct {
// opts is the caller-supplied handler configuration; Enabled reads its
// Level field to decide the minimum level.
opts slog.HandlerOptions
// w receives one protojson-encoded events.EventsLogs per line from Handle.
w io.Writer
// cmpID is the computation ID stamped on every emitted log entry; it is
// set by WithGroup and is empty until then.
cmpID string
// queue receives one cvms.ClientStreamMessage per emitted log entry.
queue chan *cvms.ClientStreamMessage
}
// NewProtoHandler builds a slog.Handler that emits every log record as an
// agent-log protobuf message, written to conn and also sent on queue.
//
// opts may be nil, in which case the zero slog.HandlerOptions is used. The
// options are copied by value, so later changes to *opts do not affect the
// returned handler.
func NewProtoHandler(conn io.Writer, opts *slog.HandlerOptions, queue chan *cvms.ClientStreamMessage) slog.Handler {
	var resolved slog.HandlerOptions
	if opts != nil {
		resolved = *opts
	}

	return &handler{
		opts:  resolved,
		w:     conn,
		queue: queue,
	}
}
// Enabled reports whether a record at level l should be handled. The
// threshold is the configured opts.Level when one was supplied, and
// slog.LevelInfo otherwise.
func (h *handler) Enabled(_ context.Context, l slog.Level) bool {
	if leveler := h.opts.Level; leveler != nil {
		return l >= leveler.Level()
	}
	return l >= slog.LevelInfo
}
// Handle converts a log record into agent-log protobuf messages and publishes
// each one twice: as a cvms.ClientStreamMessage sent on h.queue, and as a
// single line of protojson-encoded events.EventsLogs written to h.w.
//
// Only the record's message, time and level are forwarded; attributes are
// not. Messages longer than 500 bytes are split into several entries that
// share the same timestamp, level and computation ID. A record with an empty
// message produces no output.
//
// The send on h.queue blocks until the message is accepted. Handle returns the
// first marshalling or write error it encounters; entries already queued or
// written before the error are not rolled back.
//
// Handle deliberately does not log through the package-level slog functions:
// if this handler is reachable from the default logger, doing so would
// re-enter Handle without bound.
func (h *handler) Handle(_ context.Context, r slog.Record) error {
	const chunkSize = 500

	message := r.Message
	timestamp := timestamppb.New(r.Time)
	level := r.Level.String()

	for start := 0; start < len(message); {
		end := runeSafeChunkEnd(message, start, chunkSize)
		chunk := message[start:end]
		start = end

		h.queue <- &cvms.ClientStreamMessage{
			Message: &cvms.ClientStreamMessage_AgentLog{
				AgentLog: &cvms.AgentLog{
					Timestamp:     timestamp,
					Message:       chunk,
					Level:         level,
					ComputationId: h.cmpID,
				},
			},
		}

		agentLog := events.EventsLogs{
			Message: &events.EventsLogs_AgentLog{
				AgentLog: &events.AgentLog{
					Timestamp:     timestamp,
					Message:       chunk,
					Level:         level,
					ComputationId: h.cmpID,
				},
			},
		}

		b, err := protojson.Marshal(&agentLog)
		if err != nil {
			return err
		}

		// Emit payload and line terminator in one Write so that lines from
		// concurrent Handle calls cannot be interleaved between the two.
		if _, err := h.w.Write(append(b, '\n')); err != nil {
			return err
		}
	}

	return nil
}

// runeSafeChunkEnd returns the exclusive end index of the chunk of s that
// begins at start and holds at most size bytes. When the plain cut at
// start+size would land inside a multi-byte UTF-8 sequence, the cut is moved
// back to the start of that sequence so the chunk remains valid UTF-8, which
// protobuf string fields require. size must be positive.
func runeSafeChunkEnd(s string, start, size int) int {
	end := start + size
	if end >= len(s) {
		return len(s)
	}

	// A UTF-8 sequence has at most three continuation bytes (10xxxxxx), so
	// looking back further than that cannot find a better boundary. The
	// cut > start+1 guard keeps every chunk non-empty.
	cut := end
	for back := 0; back < 3 && cut > start+1 && s[cut]&0xC0 == 0x80; back++ {
		cut--
	}
	if s[cut]&0xC0 == 0x80 {
		// No rune boundary nearby: the input is not valid UTF-8 here, so
		// fall back to the plain byte cut.
		return end
	}
	return cut
}
// WithAttrs returns the receiver unchanged. Handle forwards only a record's
// message, time and level, so there is nowhere to carry extra attributes;
// they are dropped instead of panicking, which would otherwise crash any
// caller that derives a logger with Logger.With.
func (h *handler) WithAttrs(_ []slog.Attr) slog.Handler {
	return h
}
// WithGroup stores name as the computation ID that Handle stamps on every
// subsequently emitted log entry, and returns the receiver itself.
//
// NOTE(review): unlike the usual slog.Handler contract this mutates the
// handler in place rather than returning a copy, so every logger sharing this
// handler sees the new ID, and an empty name clears it. cmpID is written
// without synchronization — confirm callers never invoke this concurrently
// with Handle.
func (h *handler) WithGroup(name string) slog.Handler {
h.cmpID = name
return h
}
// Close is a no-op that always returns nil; it does not close the underlying
// writer or the queue channel.
// NOTE(review): presumably present so the handler can be used as an
// io.Closer — confirm against callers.
func (h *handler) Close() error {
return nil
}