COCOS-83 - Add hash verification for datasets and algorithms (#84)

* Add hash verification for datasets and algorithms

Enhanced data integrity checking by incorporating hash fields in Dataset and Algorithm structures, and modified the corresponding service logic to validate these hashes during processing. The update includes SHA-3 for hash computation, replacing the former SHA-256 usage, and ensures the provided data matches the expected hash from the manifest to prevent processing malformed or corrupted data.

- Introduce `Hash` field to both Dataset and Algorithm structs to store the expected hash value.
- Implement SHA-3 hashing within service methods that process the data, ensuring consistency with newly added `Hash` fields.
- Add error handling for hash mismatches, preventing further processing and alerting to potential data integrity issues.
- Update Protocol Buffers serialization to accommodate the new hash fields for gRPC communication.
- Modify manager service's Run method to pass the hash information when creating agent configurations.

Go module dependencies were updated to include the new SHA-3 package and upgrade Go version to 1.21.6 for improved stability and compatibility.

Signed-off-by: SammyOina <sammyoina@gmail.com>

* Remove identifiers from protobuf and related code

The protobuf definitions and related service handling code have been revised to drop specific identifier fields (`AlgorithmID` and `DatasetID`) to simplify API responses and internal function signatures. These removals streamline the overall data flow between components, reduce unnecessary data transmission, and lead to an aligned server-client expectation where identifiers are no longer a part of the response payload. Consequently, these changes simplify the logic within various functions and client commands, reinforcing encapsulation by ensuring that internal identifiers do not need to be managed or exposed unnecessarily.

Signed-off-by: SammyOina <sammyoina@gmail.com>

* fix lint

Signed-off-by: SammyOina <sammyoina@gmail.com>

---------

Signed-off-by: SammyOina <sammyoina@gmail.com>
This commit is contained in:
Sammy Kerata Oina
2024-02-19 17:31:35 +03:00
committed by GitHub
parent 722b463b6a
commit 997fb3bf48
19 changed files with 147 additions and 211 deletions
+8 -30
View File
@@ -90,8 +90,6 @@ type AlgoResponse struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
AlgorithmID string `protobuf:"bytes,1,opt,name=algorithmID,proto3" json:"algorithmID,omitempty"`
}
func (x *AlgoResponse) Reset() {
@@ -126,13 +124,6 @@ func (*AlgoResponse) Descriptor() ([]byte, []int) {
return file_agent_agent_proto_rawDescGZIP(), []int{1}
}
func (x *AlgoResponse) GetAlgorithmID() string {
if x != nil {
return x.AlgorithmID
}
return ""
}
type DataRequest struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
@@ -200,8 +191,6 @@ type DataResponse struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
DatasetID string `protobuf:"bytes,1,opt,name=datasetID,proto3" json:"datasetID,omitempty"`
}
func (x *DataResponse) Reset() {
@@ -236,13 +225,6 @@ func (*DataResponse) Descriptor() ([]byte, []int) {
return file_agent_agent_proto_rawDescGZIP(), []int{3}
}
func (x *DataResponse) GetDatasetID() string {
if x != nil {
return x.DatasetID
}
return ""
}
type ResultRequest struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
@@ -441,18 +423,14 @@ var file_agent_agent_proto_rawDesc = []byte{
0x67, 0x6f, 0x72, 0x69, 0x74, 0x68, 0x6d, 0x12, 0x1a, 0x0a, 0x08, 0x70, 0x72, 0x6f, 0x76, 0x69,
0x64, 0x65, 0x72, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x70, 0x72, 0x6f, 0x76, 0x69,
0x64, 0x65, 0x72, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52,
0x02, 0x69, 0x64, 0x22, 0x30, 0x0a, 0x0c, 0x41, 0x6c, 0x67, 0x6f, 0x52, 0x65, 0x73, 0x70, 0x6f,
0x6e, 0x73, 0x65, 0x12, 0x20, 0x0a, 0x0b, 0x61, 0x6c, 0x67, 0x6f, 0x72, 0x69, 0x74, 0x68, 0x6d,
0x49, 0x44, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0b, 0x61, 0x6c, 0x67, 0x6f, 0x72, 0x69,
0x74, 0x68, 0x6d, 0x49, 0x44, 0x22, 0x53, 0x0a, 0x0b, 0x44, 0x61, 0x74, 0x61, 0x52, 0x65, 0x71,
0x75, 0x65, 0x73, 0x74, 0x12, 0x18, 0x0a, 0x07, 0x64, 0x61, 0x74, 0x61, 0x73, 0x65, 0x74, 0x18,
0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x07, 0x64, 0x61, 0x74, 0x61, 0x73, 0x65, 0x74, 0x12, 0x1a,
0x0a, 0x08, 0x70, 0x72, 0x6f, 0x76, 0x69, 0x64, 0x65, 0x72, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09,
0x52, 0x08, 0x70, 0x72, 0x6f, 0x76, 0x69, 0x64, 0x65, 0x72, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64,
0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x02, 0x69, 0x64, 0x22, 0x2c, 0x0a, 0x0c, 0x44, 0x61,
0x74, 0x61, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x1c, 0x0a, 0x09, 0x64, 0x61,
0x74, 0x61, 0x73, 0x65, 0x74, 0x49, 0x44, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x64,
0x61, 0x74, 0x61, 0x73, 0x65, 0x74, 0x49, 0x44, 0x22, 0x2b, 0x0a, 0x0d, 0x52, 0x65, 0x73, 0x75,
0x02, 0x69, 0x64, 0x22, 0x0e, 0x0a, 0x0c, 0x41, 0x6c, 0x67, 0x6f, 0x52, 0x65, 0x73, 0x70, 0x6f,
0x6e, 0x73, 0x65, 0x22, 0x53, 0x0a, 0x0b, 0x44, 0x61, 0x74, 0x61, 0x52, 0x65, 0x71, 0x75, 0x65,
0x73, 0x74, 0x12, 0x18, 0x0a, 0x07, 0x64, 0x61, 0x74, 0x61, 0x73, 0x65, 0x74, 0x18, 0x01, 0x20,
0x01, 0x28, 0x0c, 0x52, 0x07, 0x64, 0x61, 0x74, 0x61, 0x73, 0x65, 0x74, 0x12, 0x1a, 0x0a, 0x08,
0x70, 0x72, 0x6f, 0x76, 0x69, 0x64, 0x65, 0x72, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08,
0x70, 0x72, 0x6f, 0x76, 0x69, 0x64, 0x65, 0x72, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x03,
0x20, 0x01, 0x28, 0x09, 0x52, 0x02, 0x69, 0x64, 0x22, 0x0e, 0x0a, 0x0c, 0x44, 0x61, 0x74, 0x61,
0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x2b, 0x0a, 0x0d, 0x52, 0x65, 0x73, 0x75,
0x6c, 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x1a, 0x0a, 0x08, 0x63, 0x6f, 0x6e,
0x73, 0x75, 0x6d, 0x65, 0x72, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x63, 0x6f, 0x6e,
0x73, 0x75, 0x6d, 0x65, 0x72, 0x22, 0x24, 0x0a, 0x0e, 0x52, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x52,
+8 -4
View File
@@ -20,7 +20,7 @@ message AlgoRequest {
string id = 3;
}
message AlgoResponse { string algorithmID = 1; }
message AlgoResponse {}
message DataRequest {
bytes dataset = 1;
@@ -28,16 +28,20 @@ message DataRequest {
string id = 3;
}
message DataResponse { string datasetID = 1; }
message DataResponse {}
message ResultRequest {
string consumer = 1;
}
message ResultResponse { bytes file = 1; }
message ResultResponse {
bytes file = 1;
}
message AttestationRequest {
bytes report_data = 1;
}
message AttestationResponse { bytes file = 1; }
message AttestationResponse {
bytes file = 1;
}
+8 -14
View File
@@ -80,14 +80,12 @@ func encodeAlgoRequest(_ context.Context, request interface{}) (interface{}, err
// decodeAlgoResponse is a transport/grpc.DecodeResponseFunc that
// converts a gRPC AlgoResponse to a user-domain response.
func decodeAlgoResponse(_ context.Context, grpcResponse interface{}) (interface{}, error) {
response, ok := grpcResponse.(*agent.AlgoResponse)
_, ok := grpcResponse.(*agent.AlgoResponse)
if !ok {
return nil, fmt.Errorf("invalid response type: %T", grpcResponse)
}
return algoRes{
AlgorithmID: response.AlgorithmID,
}, nil
return algoRes{}, nil
}
// encodeDataRequest is a transport/grpc.EncodeRequestFunc that
@@ -108,14 +106,12 @@ func encodeDataRequest(_ context.Context, request interface{}) (interface{}, err
// decodeDataResponse is a transport/grpc.DecodeResponseFunc that
// converts a gRPC DataResponse to a user-domain response.
func decodeDataResponse(_ context.Context, grpcResponse interface{}) (interface{}, error) {
response, ok := grpcResponse.(*agent.DataResponse)
_, ok := grpcResponse.(*agent.DataResponse)
if !ok {
return nil, fmt.Errorf("invalid response type: %T", grpcResponse)
}
return dataRes{
DatasetID: response.DatasetID,
}, nil
return dataRes{}, nil
}
// encodeResultRequest is a transport/grpc.EncodeRequestFunc that
@@ -172,13 +168,12 @@ func (c grpcClient) Algo(ctx context.Context, request *agent.AlgoRequest, _ ...g
ctx, cancel := context.WithTimeout(ctx, c.timeout)
defer cancel()
res, err := c.algo(ctx, &algoReq{Algorithm: request.Algorithm, Provider: request.Provider, Id: request.Id})
_, err := c.algo(ctx, &algoReq{Algorithm: request.Algorithm, Provider: request.Provider, Id: request.Id})
if err != nil {
return nil, err
}
algoRes := res.(algoRes)
return &agent.AlgoResponse{AlgorithmID: algoRes.AlgorithmID}, nil
return &agent.AlgoResponse{}, nil
}
// Data implements the Data method of the agent.AgentServiceClient interface.
@@ -186,13 +181,12 @@ func (c grpcClient) Data(ctx context.Context, request *agent.DataRequest, _ ...g
ctx, cancel := context.WithTimeout(ctx, c.timeout)
defer cancel()
res, err := c.data(ctx, &dataReq{Dataset: request.Dataset, Provider: request.Provider, Id: request.Id})
_, err := c.data(ctx, &dataReq{Dataset: request.Dataset, Provider: request.Provider, Id: request.Id})
if err != nil {
return nil, err
}
dataRes := res.(dataRes)
return &agent.DataResponse{DatasetID: dataRes.DatasetID}, nil
return &agent.DataResponse{}, nil
}
// Result implements the Result method of the agent.AgentServiceClient interface.
+4 -4
View File
@@ -19,12 +19,12 @@ func algoEndpoint(svc agent.Service) endpoint.Endpoint {
algo := agent.Algorithm{Algorithm: req.Algorithm, Provider: req.Provider, ID: req.Id}
algorithmID, err := svc.Algo(ctx, algo)
err := svc.Algo(ctx, algo)
if err != nil {
return algoRes{}, err
}
return algoRes{AlgorithmID: algorithmID}, nil
return algoRes{}, nil
}
}
@@ -38,12 +38,12 @@ func dataEndpoint(svc agent.Service) endpoint.Endpoint {
dataset := agent.Dataset{Dataset: req.Dataset, Provider: req.Provider, ID: req.Id}
datasetID, err := svc.Data(ctx, dataset)
err := svc.Data(ctx, dataset)
if err != nil {
return dataRes{}, err
}
return dataRes{DatasetID: datasetID}, nil
return dataRes{}, nil
}
}
+2 -6
View File
@@ -2,13 +2,9 @@
// SPDX-License-Identifier: Apache-2.0
package grpc
type algoRes struct {
AlgorithmID string `json:"algorithmId,omitempty"`
}
type algoRes struct{}
type dataRes struct {
DatasetID string `json:"datasetId,omitempty"`
}
type dataRes struct{}
type resultRes struct {
File []byte `protobuf:"bytes,1,opt,name=id,proto3" json:"id,omitempty"`
+2 -8
View File
@@ -54,10 +54,7 @@ func decodeAlgoRequest(_ context.Context, grpcReq interface{}) (interface{}, err
}
func encodeAlgoResponse(_ context.Context, response interface{}) (interface{}, error) {
res := response.(algoRes)
return &agent.AlgoResponse{
AlgorithmID: res.AlgorithmID,
}, nil
return &agent.AlgoResponse{}, nil
}
func decodeDataRequest(_ context.Context, grpcReq interface{}) (interface{}, error) {
@@ -71,10 +68,7 @@ func decodeDataRequest(_ context.Context, grpcReq interface{}) (interface{}, err
}
func encodeDataResponse(_ context.Context, response interface{}) (interface{}, error) {
res := response.(dataRes)
return &agent.DataResponse{
DatasetID: res.DatasetID,
}, nil
return &agent.DataResponse{}, nil
}
func decodeResultRequest(_ context.Context, grpcReq interface{}) (interface{}, error) {
+2 -15
View File
@@ -27,20 +27,7 @@ func LoggingMiddleware(svc agent.Service, logger *slog.Logger) agent.Service {
return &loggingMiddleware{logger, svc}
}
func (lm *loggingMiddleware) Run(ac agent.Computation) (response string, err error) {
defer func(begin time.Time) {
message := fmt.Sprintf("Method Run for computation %s took %s to complete", ac.ID, time.Since(begin))
if err != nil {
lm.logger.Warn(fmt.Sprintf("%s with error: %s.", message, err))
return
}
lm.logger.Info(fmt.Sprintf("%s without errors.", message))
}(time.Now())
return lm.svc.Run(ac)
}
func (lm *loggingMiddleware) Algo(ctx context.Context, algorithm agent.Algorithm) (response string, err error) {
func (lm *loggingMiddleware) Algo(ctx context.Context, algorithm agent.Algorithm) (err error) {
defer func(begin time.Time) {
message := fmt.Sprintf("Method Algo took %s to complete", time.Since(begin))
if err != nil {
@@ -53,7 +40,7 @@ func (lm *loggingMiddleware) Algo(ctx context.Context, algorithm agent.Algorithm
return lm.svc.Algo(ctx, algorithm)
}
func (lm *loggingMiddleware) Data(ctx context.Context, dataset agent.Dataset) (response string, err error) {
func (lm *loggingMiddleware) Data(ctx context.Context, dataset agent.Dataset) (err error) {
defer func(begin time.Time) {
message := fmt.Sprintf("Method Data took %s to complete", time.Since(begin))
if err != nil {
+2 -11
View File
@@ -32,16 +32,7 @@ func MetricsMiddleware(svc agent.Service, counter metrics.Counter, latency metri
}
}
func (ms *metricsMiddleware) Run(ac agent.Computation) (string, error) {
defer func(begin time.Time) {
ms.counter.With("method", "run").Add(1)
ms.latency.With("method", "run").Observe(time.Since(begin).Seconds())
}(time.Now())
return ms.svc.Run(ac)
}
func (ms *metricsMiddleware) Algo(ctx context.Context, algorithm agent.Algorithm) (string, error) {
func (ms *metricsMiddleware) Algo(ctx context.Context, algorithm agent.Algorithm) error {
defer func(begin time.Time) {
ms.counter.With("method", "algo").Add(1)
ms.latency.With("method", "algo").Observe(time.Since(begin).Seconds())
@@ -50,7 +41,7 @@ func (ms *metricsMiddleware) Algo(ctx context.Context, algorithm agent.Algorithm
return ms.svc.Algo(ctx, algorithm)
}
func (ms *metricsMiddleware) Data(ctx context.Context, dataset agent.Dataset) (string, error) {
func (ms *metricsMiddleware) Data(ctx context.Context, dataset agent.Dataset) error {
defer func(begin time.Time) {
ms.counter.With("method", "data").Add(1)
ms.latency.With("method", "data").Observe(time.Since(begin).Seconds())
+8 -6
View File
@@ -50,17 +50,19 @@ func (a *Algorithms) String() string {
}
type Dataset struct {
Dataset []byte `json:"-"`
Provider string `json:"provider,omitempty"`
ID string `json:"id,omitempty"`
Dataset []byte `json:"-"`
Hash [32]byte `json:"hash,omitempty"`
Provider string `json:"provider,omitempty"`
ID string `json:"id,omitempty"`
}
type Datasets []Dataset
type Algorithm struct {
Algorithm []byte `json:"-"`
Provider string `json:"provider,omitempty"`
ID string `json:"id,omitempty"`
Algorithm []byte `json:"-"`
Hash [32]byte `json:"hash,omitempty"`
Provider string `json:"provider,omitempty"`
ID string `json:"id,omitempty"`
}
type Algorithms []Algorithm
+33 -47
View File
@@ -5,9 +5,7 @@ package agent
import (
"context"
"crypto/sha256"
"crypto/sha512"
"encoding/hex"
"encoding/json"
"errors"
"fmt"
@@ -18,6 +16,7 @@ import (
"github.com/google/go-sev-guest/client"
"github.com/ultravioletrs/cocos/agent/events"
"github.com/ultravioletrs/cocos/pkg/socket"
"golang.org/x/crypto/sha3"
)
var _ Service = (*agentService)(nil)
@@ -43,14 +42,15 @@ var (
errResultsNotReady = errors.New("computation results are not yet ready")
// errStateNotReady agent received a request in the wrong state.
errStateNotReady = errors.New("agent not expecting this operation in the current state")
// errHashMismatch provided algorithm/dataset does not match hash in manifest.
errHashMismatch = errors.New("malformed data, hash does not match manifest")
)
// Service specifies an API that must be fullfiled by the domain service
// implementation, and all of its decorators (e.g. logging & metrics).
type Service interface {
Run(c Computation) (string, error)
Algo(ctx context.Context, algorithm Algorithm) (string, error)
Data(ctx context.Context, dataset Dataset) (string, error)
Algo(ctx context.Context, algorithm Algorithm) error
Data(ctx context.Context, dataset Dataset) error
Result(ctx context.Context, consumer string) ([]byte, error)
Attestation(ctx context.Context, reportData []byte) ([]byte, error)
}
@@ -73,7 +73,7 @@ const (
var _ Service = (*agentService)(nil)
// New instantiates the agent service implementation.
func New(ctx context.Context, logger *slog.Logger, eventSvc events.Service) Service {
func New(ctx context.Context, logger *slog.Logger, eventSvc events.Service, cmp Computation) Service {
svc := &agentService{
sm: NewStateMachine(logger),
eventSvc: eventSvc,
@@ -87,42 +87,32 @@ func New(ctx context.Context, logger *slog.Logger, eventSvc events.Service) Serv
svc.sm.StateFunctions[resultsReady] = svc.publishEvent("in-progress", json.RawMessage{})
svc.sm.StateFunctions[complete] = svc.publishEvent("in-progress", json.RawMessage{})
svc.sm.StateFunctions[running] = svc.runComputation
svc.computation = cmp
svc.sm.SendEvent(manifestsReceived)
return svc
}
func (as *agentService) Run(c Computation) (string, error) {
if as.sm.GetState() != receivingManifests {
return "", errStateNotReady
}
cmpJSON, err := json.Marshal(c)
if err != nil {
return "", err
}
as.computation = c
as.sm.SendEvent(manifestsReceived)
// Calculate the SHA-256 hash of the algorithm
hash := sha256.Sum256(cmpJSON)
cmpHash := hex.EncodeToString(hash[:])
return cmpHash, nil // return computation hash.
}
func (as *agentService) Algo(ctx context.Context, algorithm Algorithm) (string, error) {
func (as *agentService) Algo(ctx context.Context, algorithm Algorithm) error {
if as.sm.GetState() != receivingAlgorithms {
return "", errStateNotReady
return errStateNotReady
}
if len(as.computation.Algorithms) == 0 {
return "", errAllManifestItemsReceived
return errAllManifestItemsReceived
}
hash := sha3.Sum256(algorithm.Algorithm)
index := containsID(as.computation.Algorithms, algorithm.ID)
switch index {
case -1:
return "", errUndeclaredAlgorithm
return errUndeclaredAlgorithm
default:
if as.computation.Algorithms[index].Provider != algorithm.Provider {
return "", errProviderMissmatch
return errProviderMissmatch
}
if hash != as.computation.Algorithms[index].Hash {
return errHashMismatch
}
as.computation.Algorithms = slices.Delete(as.computation.Algorithms, index, index+1)
}
@@ -133,28 +123,29 @@ func (as *agentService) Algo(ctx context.Context, algorithm Algorithm) (string,
as.sm.SendEvent(algorithmsReceived)
}
// Calculate the SHA-256 hash of the algorithm.
hash := sha256.Sum256(algorithm.Algorithm)
algorithmHash := hex.EncodeToString(hash[:])
// Return the algorithm hash or an error.
return algorithmHash, nil
return nil
}
func (as *agentService) Data(ctx context.Context, dataset Dataset) (string, error) {
func (as *agentService) Data(ctx context.Context, dataset Dataset) error {
if as.sm.GetState() != receivingData {
return "", errStateNotReady
return errStateNotReady
}
if len(as.computation.Datasets) == 0 {
return "", errAllManifestItemsReceived
return errAllManifestItemsReceived
}
hash := sha3.Sum256(dataset.Dataset)
index := containsID(as.computation.Datasets, dataset.ID)
switch index {
case -1:
return "", errUndeclaredDataset
return errUndeclaredDataset
default:
if as.computation.Datasets[index].Provider != dataset.Provider {
return "", errProviderMissmatch
return errProviderMissmatch
}
if hash != as.computation.Datasets[index].Hash {
return errHashMismatch
}
as.computation.Datasets = slices.Delete(as.computation.Datasets, index, index+1)
}
@@ -165,12 +156,7 @@ func (as *agentService) Data(ctx context.Context, dataset Dataset) (string, erro
as.sm.SendEvent(dataReceived)
}
// Calculate the SHA-256 hash of the dataset.
hash := sha256.Sum256(dataset.Dataset)
datasetHash := hex.EncodeToString(hash[:])
// Return the dataset hash or an error.
return datasetHash, nil
return nil
}
func (as *agentService) Result(ctx context.Context, consumer string) ([]byte, error) {
+2 -3
View File
@@ -32,12 +32,11 @@ func (cli *CLI) NewAlgorithmsCmd() *cobra.Command {
Provider: args[2],
}
response, err := cli.agentSDK.Algo(cmd.Context(), algoReq)
if err != nil {
if err := cli.agentSDK.Algo(cmd.Context(), algoReq); err != nil {
log.Fatalf("Error uploading algorithm with ID %s and provider %s: %v", algoReq.ID, algoReq.Provider, err)
}
log.Println("Successfully uploaded algorithm:", response)
log.Println("Successfully uploaded algorithm")
},
}
}
+2 -3
View File
@@ -32,12 +32,11 @@ func (cli *CLI) NewDatasetsCmd() *cobra.Command {
Provider: args[2],
}
response, err := cli.agentSDK.Data(cmd.Context(), dataReq)
if err != nil {
if err := cli.agentSDK.Data(cmd.Context(), dataReq); err != nil {
log.Fatalf("Error uploading dataset: %v", err)
}
log.Println("Response:", response)
log.Println("Successfully uploaded dataset")
},
}
}
+3 -11
View File
@@ -57,15 +57,7 @@ func main() {
return
}
defer eventSvc.Close()
svc := newService(ctx, logger, eventSvc)
if _, err := svc.Run(cfg); err != nil {
if err := eventSvc.SendEvent("init", "failed", json.RawMessage{}); err != nil {
logger.Warn(err.Error())
}
logger.Error(fmt.Sprintf("failed to run computation with err: %s", err))
return
}
svc := newService(ctx, logger, eventSvc, cfg)
grpcServerConfig := server.Config{
Port: cfg.AgentConfig.Port,
@@ -95,8 +87,8 @@ func main() {
}
}
func newService(ctx context.Context, logger *slog.Logger, eventSvc events.Service) agent.Service {
svc := agent.New(ctx, logger, eventSvc)
func newService(ctx context.Context, logger *slog.Logger, eventSvc events.Service, cmp agent.Computation) agent.Service {
svc := agent.New(ctx, logger, eventSvc, cmp)
svc = api.LoggingMiddleware(svc, logger)
counter, latency := internal.MakeMetrics(svcName, "api")
+3 -3
View File
@@ -1,6 +1,6 @@
module github.com/ultravioletrs/cocos
go 1.21.4
go 1.21.6
require (
github.com/absmach/magistrala v0.0.0-20240119191055-d95283d31472
@@ -20,6 +20,7 @@ require (
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.21.0
go.opentelemetry.io/otel/sdk v1.21.0
go.opentelemetry.io/otel/trace v1.21.0
golang.org/x/crypto v0.19.0
golang.org/x/sync v0.6.0
google.golang.org/grpc v1.60.1
google.golang.org/protobuf v1.32.0
@@ -49,9 +50,8 @@ require (
go.opentelemetry.io/otel/metric v1.21.0 // indirect
go.opentelemetry.io/proto/otlp v1.0.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
golang.org/x/crypto v0.18.0 // indirect
golang.org/x/net v0.20.0 // indirect
golang.org/x/sys v0.16.0 // indirect
golang.org/x/sys v0.17.0 // indirect
golang.org/x/text v0.14.0 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20240108191215-35c7eff3a6b1 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240108191215-35c7eff3a6b1 // indirect
+4 -4
View File
@@ -113,8 +113,8 @@ go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0=
go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
golang.org/x/crypto v0.18.0 h1:PGVlW0xEltQnzFZ55hkuX5+KLyrMYhHld1YHO4AKcdc=
golang.org/x/crypto v0.18.0/go.mod h1:R0j02AL6hcrfOiy9T4ZYp/rcWeMxM3L6QYxlOuEG1mg=
golang.org/x/crypto v0.19.0 h1:ENy+Az/9Y1vSrlrvBSyna3PITt4tiZLf7sgCjZBX7Wo=
golang.org/x/crypto v0.19.0/go.mod h1:Iy9bg/ha4yyC70EfRS8jz+B6ybOBKMaSxLj6P6oBDfU=
golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4=
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
@@ -133,8 +133,8 @@ golang.org/x/sys v0.0.0-20210426230700-d19ff857e887/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.16.0 h1:xWw16ngr6ZMtmxDyKyIgsE93KNKz5HKmMa3b8ALHidU=
golang.org/x/sys v0.16.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.17.0 h1:25cE3gD+tdBA7lp7QfhuV+rJiE9YXTcS3VG1SqssI/Y=
golang.org/x/sys v0.17.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
+2
View File
@@ -59,11 +59,13 @@ message ComputationRunReq {
message Dataset {
string provider = 1;
string id = 2;
bytes hash = 3;
}
message Algorithm {
string provider = 1;
string id = 2;
bytes hash = 3;
}
message AgentConfig {
+2 -2
View File
@@ -81,10 +81,10 @@ func (ms *managerService) Run(ctx context.Context, c *manager.ComputationRunReq)
},
}
for _, algo := range c.Algorithms {
ac.Algorithms = append(ac.Algorithms, agent.Algorithm{ID: algo.Id, Provider: algo.Provider})
ac.Algorithms = append(ac.Algorithms, agent.Algorithm{ID: algo.Id, Provider: algo.Provider, Hash: [32]byte(algo.Hash)})
}
for _, data := range c.Datasets {
ac.Datasets = append(ac.Datasets, agent.Dataset{ID: data.Id, Provider: data.Provider})
ac.Datasets = append(ac.Datasets, agent.Dataset{ID: data.Id, Provider: data.Provider, Hash: [32]byte(data.Hash)})
}
agentPort, err := getFreePort()
+44 -26
View File
@@ -486,6 +486,7 @@ type Dataset struct {
Provider string `protobuf:"bytes,1,opt,name=provider,proto3" json:"provider,omitempty"`
Id string `protobuf:"bytes,2,opt,name=id,proto3" json:"id,omitempty"`
Hash []byte `protobuf:"bytes,3,opt,name=hash,proto3" json:"hash,omitempty"`
}
func (x *Dataset) Reset() {
@@ -534,6 +535,13 @@ func (x *Dataset) GetId() string {
return ""
}
func (x *Dataset) GetHash() []byte {
if x != nil {
return x.Hash
}
return nil
}
type Algorithm struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
@@ -541,6 +549,7 @@ type Algorithm struct {
Provider string `protobuf:"bytes,1,opt,name=provider,proto3" json:"provider,omitempty"`
Id string `protobuf:"bytes,2,opt,name=id,proto3" json:"id,omitempty"`
Hash []byte `protobuf:"bytes,3,opt,name=hash,proto3" json:"hash,omitempty"`
}
func (x *Algorithm) Reset() {
@@ -589,6 +598,13 @@ func (x *Algorithm) GetId() string {
return ""
}
func (x *Algorithm) GetHash() []byte {
if x != nil {
return x.Hash
}
return nil
}
type AgentConfig struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
@@ -753,35 +769,37 @@ var file_manager_manager_proto_rawDesc = []byte{
0x73, 0x12, 0x37, 0x0a, 0x0c, 0x61, 0x67, 0x65, 0x6e, 0x74, 0x5f, 0x63, 0x6f, 0x6e, 0x66, 0x69,
0x67, 0x18, 0x07, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x14, 0x2e, 0x6d, 0x61, 0x6e, 0x61, 0x67, 0x65,
0x72, 0x2e, 0x41, 0x67, 0x65, 0x6e, 0x74, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x52, 0x0b, 0x61,
0x67, 0x65, 0x6e, 0x74, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x22, 0x35, 0x0a, 0x07, 0x44, 0x61,
0x67, 0x65, 0x6e, 0x74, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x22, 0x49, 0x0a, 0x07, 0x44, 0x61,
0x74, 0x61, 0x73, 0x65, 0x74, 0x12, 0x1a, 0x0a, 0x08, 0x70, 0x72, 0x6f, 0x76, 0x69, 0x64, 0x65,
0x72, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x70, 0x72, 0x6f, 0x76, 0x69, 0x64, 0x65,
0x72, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x02, 0x69,
0x64, 0x22, 0x37, 0x0a, 0x09, 0x41, 0x6c, 0x67, 0x6f, 0x72, 0x69, 0x74, 0x68, 0x6d, 0x12, 0x1a,
0x0a, 0x08, 0x70, 0x72, 0x6f, 0x76, 0x69, 0x64, 0x65, 0x72, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09,
0x52, 0x08, 0x70, 0x72, 0x6f, 0x76, 0x69, 0x64, 0x65, 0x72, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64,
0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x02, 0x69, 0x64, 0x22, 0xd6, 0x01, 0x0a, 0x0b, 0x41,
0x67, 0x65, 0x6e, 0x74, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x12, 0x12, 0x0a, 0x04, 0x70, 0x6f,
0x72, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x70, 0x6f, 0x72, 0x74, 0x12, 0x12,
0x0a, 0x04, 0x68, 0x6f, 0x73, 0x74, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x68, 0x6f,
0x73, 0x74, 0x12, 0x1b, 0x0a, 0x09, 0x63, 0x65, 0x72, 0x74, 0x5f, 0x66, 0x69, 0x6c, 0x65, 0x18,
0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x63, 0x65, 0x72, 0x74, 0x46, 0x69, 0x6c, 0x65, 0x12,
0x19, 0x0a, 0x08, 0x6b, 0x65, 0x79, 0x5f, 0x66, 0x69, 0x6c, 0x65, 0x18, 0x04, 0x20, 0x01, 0x28,
0x09, 0x52, 0x07, 0x6b, 0x65, 0x79, 0x46, 0x69, 0x6c, 0x65, 0x12, 0x24, 0x0a, 0x0e, 0x63, 0x6c,
0x69, 0x65, 0x6e, 0x74, 0x5f, 0x63, 0x61, 0x5f, 0x66, 0x69, 0x6c, 0x65, 0x18, 0x05, 0x20, 0x01,
0x28, 0x09, 0x52, 0x0c, 0x63, 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x43, 0x61, 0x46, 0x69, 0x6c, 0x65,
0x12, 0x24, 0x0a, 0x0e, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x5f, 0x63, 0x61, 0x5f, 0x66, 0x69,
0x6c, 0x65, 0x18, 0x06, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0c, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72,
0x43, 0x61, 0x46, 0x69, 0x6c, 0x65, 0x12, 0x1b, 0x0a, 0x09, 0x6c, 0x6f, 0x67, 0x5f, 0x6c, 0x65,
0x76, 0x65, 0x6c, 0x18, 0x07, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x6c, 0x6f, 0x67, 0x4c, 0x65,
0x76, 0x65, 0x6c, 0x32, 0x5b, 0x0a, 0x0e, 0x4d, 0x61, 0x6e, 0x61, 0x67, 0x65, 0x72, 0x53, 0x65,
0x72, 0x76, 0x69, 0x63, 0x65, 0x12, 0x49, 0x0a, 0x07, 0x50, 0x72, 0x6f, 0x63, 0x65, 0x73, 0x73,
0x12, 0x1c, 0x2e, 0x6d, 0x61, 0x6e, 0x61, 0x67, 0x65, 0x72, 0x2e, 0x43, 0x6c, 0x69, 0x65, 0x6e,
0x74, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x1a, 0x1a,
0x2e, 0x6d, 0x61, 0x6e, 0x61, 0x67, 0x65, 0x72, 0x2e, 0x43, 0x6f, 0x6d, 0x70, 0x75, 0x74, 0x61,
0x74, 0x69, 0x6f, 0x6e, 0x52, 0x75, 0x6e, 0x52, 0x65, 0x71, 0x22, 0x00, 0x28, 0x01, 0x30, 0x01,
0x42, 0x0b, 0x5a, 0x09, 0x2e, 0x2f, 0x6d, 0x61, 0x6e, 0x61, 0x67, 0x65, 0x72, 0x62, 0x06, 0x70,
0x72, 0x6f, 0x74, 0x6f, 0x33,
0x64, 0x12, 0x12, 0x0a, 0x04, 0x68, 0x61, 0x73, 0x68, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0c, 0x52,
0x04, 0x68, 0x61, 0x73, 0x68, 0x22, 0x4b, 0x0a, 0x09, 0x41, 0x6c, 0x67, 0x6f, 0x72, 0x69, 0x74,
0x68, 0x6d, 0x12, 0x1a, 0x0a, 0x08, 0x70, 0x72, 0x6f, 0x76, 0x69, 0x64, 0x65, 0x72, 0x18, 0x01,
0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x70, 0x72, 0x6f, 0x76, 0x69, 0x64, 0x65, 0x72, 0x12, 0x0e,
0x0a, 0x02, 0x69, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x02, 0x69, 0x64, 0x12, 0x12,
0x0a, 0x04, 0x68, 0x61, 0x73, 0x68, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x04, 0x68, 0x61,
0x73, 0x68, 0x22, 0xd6, 0x01, 0x0a, 0x0b, 0x41, 0x67, 0x65, 0x6e, 0x74, 0x43, 0x6f, 0x6e, 0x66,
0x69, 0x67, 0x12, 0x12, 0x0a, 0x04, 0x70, 0x6f, 0x72, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09,
0x52, 0x04, 0x70, 0x6f, 0x72, 0x74, 0x12, 0x12, 0x0a, 0x04, 0x68, 0x6f, 0x73, 0x74, 0x18, 0x02,
0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x68, 0x6f, 0x73, 0x74, 0x12, 0x1b, 0x0a, 0x09, 0x63, 0x65,
0x72, 0x74, 0x5f, 0x66, 0x69, 0x6c, 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x63,
0x65, 0x72, 0x74, 0x46, 0x69, 0x6c, 0x65, 0x12, 0x19, 0x0a, 0x08, 0x6b, 0x65, 0x79, 0x5f, 0x66,
0x69, 0x6c, 0x65, 0x18, 0x04, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x6b, 0x65, 0x79, 0x46, 0x69,
0x6c, 0x65, 0x12, 0x24, 0x0a, 0x0e, 0x63, 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x5f, 0x63, 0x61, 0x5f,
0x66, 0x69, 0x6c, 0x65, 0x18, 0x05, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0c, 0x63, 0x6c, 0x69, 0x65,
0x6e, 0x74, 0x43, 0x61, 0x46, 0x69, 0x6c, 0x65, 0x12, 0x24, 0x0a, 0x0e, 0x73, 0x65, 0x72, 0x76,
0x65, 0x72, 0x5f, 0x63, 0x61, 0x5f, 0x66, 0x69, 0x6c, 0x65, 0x18, 0x06, 0x20, 0x01, 0x28, 0x09,
0x52, 0x0c, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x43, 0x61, 0x46, 0x69, 0x6c, 0x65, 0x12, 0x1b,
0x0a, 0x09, 0x6c, 0x6f, 0x67, 0x5f, 0x6c, 0x65, 0x76, 0x65, 0x6c, 0x18, 0x07, 0x20, 0x01, 0x28,
0x09, 0x52, 0x08, 0x6c, 0x6f, 0x67, 0x4c, 0x65, 0x76, 0x65, 0x6c, 0x32, 0x5b, 0x0a, 0x0e, 0x4d,
0x61, 0x6e, 0x61, 0x67, 0x65, 0x72, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x12, 0x49, 0x0a,
0x07, 0x50, 0x72, 0x6f, 0x63, 0x65, 0x73, 0x73, 0x12, 0x1c, 0x2e, 0x6d, 0x61, 0x6e, 0x61, 0x67,
0x65, 0x72, 0x2e, 0x43, 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x4d,
0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x1a, 0x1a, 0x2e, 0x6d, 0x61, 0x6e, 0x61, 0x67, 0x65, 0x72,
0x2e, 0x43, 0x6f, 0x6d, 0x70, 0x75, 0x74, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x75, 0x6e, 0x52,
0x65, 0x71, 0x22, 0x00, 0x28, 0x01, 0x30, 0x01, 0x42, 0x0b, 0x5a, 0x09, 0x2e, 0x2f, 0x6d, 0x61,
0x6e, 0x61, 0x67, 0x65, 0x72, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
}
var (
+8 -14
View File
@@ -23,40 +23,34 @@ func NewAgentSDK(log *slog.Logger, agentClient agent.AgentServiceClient) *agentS
}
}
func (sdk *agentSDK) Run(ac agent.Computation) (string, error) {
return "", nil
}
func (sdk *agentSDK) Algo(ctx context.Context, algorithm agent.Algorithm) (string, error) {
func (sdk *agentSDK) Algo(ctx context.Context, algorithm agent.Algorithm) error {
request := &agent.AlgoRequest{
Algorithm: algorithm.Algorithm,
Provider: algorithm.Provider,
Id: algorithm.ID,
}
response, err := sdk.client.Algo(ctx, request)
if err != nil {
if _, err := sdk.client.Algo(ctx, request); err != nil {
sdk.logger.Error("Failed to call Algo RPC")
return "", err
return err
}
return response.AlgorithmID, nil
return nil
}
func (sdk *agentSDK) Data(ctx context.Context, dataset agent.Dataset) (string, error) {
func (sdk *agentSDK) Data(ctx context.Context, dataset agent.Dataset) error {
request := &agent.DataRequest{
Dataset: dataset.Dataset,
Provider: dataset.Provider,
Id: dataset.ID,
}
response, err := sdk.client.Data(ctx, request)
if err != nil {
if _, err := sdk.client.Data(ctx, request); err != nil {
sdk.logger.Error("Failed to call Data RPC")
return "", err
return err
}
return response.DatasetID, nil
return nil
}
func (sdk *agentSDK) Result(ctx context.Context, consumer string) ([]byte, error) {