Files
Sammy Kerata Oina b44780df95
CI / lint (push) Has been cancelled
CI / test (agent) (push) Has been cancelled
CI / test (cli) (push) Has been cancelled
CI / test (cmd) (push) Has been cancelled
CI / test (internal) (push) Has been cancelled
CI / test (manager, true) (push) Has been cancelled
CI / test (pkg) (push) Has been cancelled
CI / upload-coverage (push) Has been cancelled
NOISSUE - Enhance OCI image extraction to return algorithm and requirements paths, and add deferred cleanup for temporary files (#586)
* feat: Enhance OCI image extraction to return algorithm and requirements paths, and add deferred cleanup for temporary files.

Signed-off-by: Sammy Oina <sammyoina@gmail.com>

* feat: implement deterministic zipping and enhance checksum verification for resources

Signed-off-by: Sammy Oina <sammyoina@gmail.com>

* feat: Update component build sources, add gRPC health checks to the CVM server, and refine algorithm argument handling and documentation.

Signed-off-by: Sammy Oina <sammyoina@gmail.com>

* docs: Update remote resources testing guide with `sudo` for KBS, algorithm result saving, `requirements.txt`, and `algo-args` for RVPS.

Signed-off-by: Sammy Oina <sammyoina@gmail.com>

* refactor: Explicitly ignore `stderr.Write` return values and add minor whitespace in tests.

Signed-off-by: Sammy Oina <sammyoina@gmail.com>

* test: add comprehensive error path and edge case tests for file, zip, OCI, and agent components.

Signed-off-by: Sammy Oina <sammyoina@gmail.com>

* feat: Add mutexes for thread-safe algorithm execution and expand recognized data file extensions to include common archive formats.

Signed-off-by: Sammy Oina <sammyoina@gmail.com>

* feat: Add OCI extraction tests for Python algorithms and multi-layer datasets, refactor algorithm execution for testability, and enhance algorithm stop and error handling tests.

Signed-off-by: Sammy Oina <sammyoina@gmail.com>

* test: Add error assertions to OCI extraction test helpers and remove an unused mock exec command.

Signed-off-by: Sammy Oina <sammyoina@gmail.com>

* test: Improve error handling test coverage for algorithm execution and OCI resource extraction.

Signed-off-by: Sammy Oina <sammyoina@gmail.com>

* fix: Improve algorithm process termination, enhance computation error handling, and add concurrency safety to agent service.

Signed-off-by: Sammy Oina <sammyoina@gmail.com>

---------

Signed-off-by: Sammy Oina <sammyoina@gmail.com>
2026-03-27 14:23:52 +01:00

371 lines
10 KiB
Go

// Copyright (c) Ultraviolet
// SPDX-License-Identifier: Apache-2.0
package oci
import (
"archive/tar"
"compress/gzip"
"context"
"encoding/json"
"fmt"
"io"
"log/slog"
"os"
"path/filepath"
"strings"
)
// OCILayout represents the OCI image layout.
//
// It decodes only the "imageLayoutVersion" field; any other keys in the
// source JSON are ignored by encoding/json.
type OCILayout struct {
	// ImageLayoutVersion is the layout version string as written in the JSON.
	ImageLayoutVersion string `json:"imageLayoutVersion"`
}
// OCIIndex represents the OCI index.json.
//
// Only the fields listed here are decoded. ExtractAlgorithm and ExtractDataset
// use just the first manifest entry's Digest, which is turned into a blob path
// under <ociDir>/blobs by replacing its first ':' with '/' (so a digest is
// expected to look like "<algorithm>:<hex>").
type OCIIndex struct {
	// SchemaVersion is decoded from "schemaVersion"; ExtractAlgorithm and
	// ExtractDataset do not check it.
	SchemaVersion int `json:"schemaVersion"`
	// Manifests lists the manifest descriptors in index order.
	Manifests []struct {
		MediaType string `json:"mediaType"` // not inspected by ExtractAlgorithm/ExtractDataset
		Digest    string `json:"digest"`    // content address used to locate the manifest blob
		Size      int    `json:"size"`      // declared size; not verified by ExtractAlgorithm/ExtractDataset
	} `json:"manifests"`
}
// ExtractAlgorithm extracts the algorithm file and optionally requirements.txt from an OCI image directory.
//
// It reads ociDir/index.json, follows the first manifest listed there and
// walks that manifest's layers from the top (last) layer down, extracting
// candidate files into destPath via extractLayerAndFindAlgorithm. A layer that
// fails to extract is logged and skipped. The walk stops as soon as an
// algorithm file has been found and, when algoType is "python", a
// requirements.txt as well.
//
// It returns the path of the extracted algorithm file and the path of the
// extracted requirements.txt (empty when none was found). An error is
// returned when index.json or the manifest cannot be read or parsed, when the
// index lists no manifests, when ctx is done before all needed layers have
// been processed, or when no layer yields an algorithm file.
//
// A nil logger falls back to slog.Default().
func ExtractAlgorithm(ctx context.Context, logger *slog.Logger, ociDir, destPath, algoType string) (string, string, error) {
	if logger == nil {
		logger = slog.Default()
	}

	// Read index.json to find manifest
	indexPath := filepath.Join(ociDir, "index.json")
	indexData, err := os.ReadFile(indexPath)
	if err != nil {
		return "", "", fmt.Errorf("failed to read index.json: %w", err)
	}
	var index OCIIndex
	if err := json.Unmarshal(indexData, &index); err != nil {
		return "", "", fmt.Errorf("failed to parse index.json: %w", err)
	}
	if len(index.Manifests) == 0 {
		return "", "", fmt.Errorf("no manifests found in index.json")
	}

	// Get the first manifest digest; "sha256:abc" lives at blobs/sha256/abc.
	manifestDigest := index.Manifests[0].Digest
	manifestPath := filepath.Join(ociDir, "blobs", strings.Replace(manifestDigest, ":", "/", 1))

	// Read manifest to find layers
	manifestData, err := os.ReadFile(manifestPath)
	if err != nil {
		return "", "", fmt.Errorf("failed to read manifest: %w", err)
	}
	var manifest struct {
		Layers []struct {
			Digest string `json:"digest"`
		} `json:"layers"`
	}
	if err := json.Unmarshal(manifestData, &manifest); err != nil {
		return "", "", fmt.Errorf("failed to parse manifest: %w", err)
	}

	logger.Debug("found layers in manifest", "count", len(manifest.Layers))

	var algorithmPath string
	var requirementsPath string
	var allSeenFiles []string

	// Process layers in reverse order (top layers first)
	for i := len(manifest.Layers) - 1; i >= 0; i-- {
		// Layer extraction can be slow for large images; honour cancellation
		// between layers rather than ignoring ctx entirely.
		if err := ctx.Err(); err != nil {
			return "", "", fmt.Errorf("algorithm extraction cancelled: %w", err)
		}

		layer := manifest.Layers[i]
		layerPath := filepath.Join(ociDir, "blobs", strings.Replace(layer.Digest, ":", "/", 1))

		algoP, reqP, seenFiles, err := extractLayerAndFindAlgorithm(logger, layerPath, destPath, algoType)
		// Keep the file listing even for failed layers: it feeds the
		// diagnostic error below.
		allSeenFiles = append(allSeenFiles, seenFiles...)
		if err != nil {
			logger.Warn("failed to extract layer", "digest", layer.Digest, "error", err)
			continue
		}

		// Upper layers win: only take a path if nothing was found above.
		if algoP != "" && algorithmPath == "" {
			algorithmPath = algoP
		}
		if reqP != "" && requirementsPath == "" {
			requirementsPath = reqP
		}

		// Only Python algorithms need to keep looking for requirements.txt.
		if algorithmPath != "" && (algoType != "python" || requirementsPath != "") {
			break
		}
	}

	if algorithmPath == "" {
		return "", "", fmt.Errorf("no algorithm file found. Seen files: %v", allSeenFiles)
	}
	return algorithmPath, requirementsPath, nil
}
// extractLayerAndFindAlgorithm extracts a layer and searches for algorithm files.
//
// It streams the gzip-compressed tar layer at layerPath. It writes every
// regular file that isAlgorithmFile accepts for algoType, plus any file whose
// base name is requirements.txt, under destPath, keeping the archive's
// relative directory structure.
//
// It returns, in order:
//   - the destination path of the first matching algorithm file written from
//     this layer ("" if none);
//   - the destination path of the first requirements.txt written ("" if none);
//   - the names of all non-directory entries encountered, for diagnostics.
//     This list is returned even when an error aborts the walk part-way.
//   - an error if the layer cannot be opened, decompressed, read, or a file
//     cannot be written.
//
// Entries whose names would escape destPath (absolute paths or ".."
// components) are skipped. So are non-regular entries such as symlinks, hard
// links and device nodes: their tar payload is empty, so writing them would
// produce zero-byte placeholder files that could be chosen as the algorithm.
//
// NOTE(review): ExtractAlgorithm calls this top layer first, so if a lower
// layer contains the same path, its copy overwrites the upper one on disk.
// Confirm whether upper-layer files should take precedence here.
func extractLayerAndFindAlgorithm(logger *slog.Logger, layerPath, destPath, algoType string) (string, string, []string, error) {
	layerFile, err := os.Open(layerPath)
	if err != nil {
		return "", "", nil, fmt.Errorf("failed to open layer: %w", err)
	}
	defer layerFile.Close()

	gzReader, err := gzip.NewReader(layerFile)
	if err != nil {
		return "", "", nil, fmt.Errorf("failed to create gzip reader: %w", err)
	}
	defer gzReader.Close()

	tarReader := tar.NewReader(gzReader)

	var algorithmPath, requirementsPath string
	seenFiles := []string{}

	for {
		header, err := tarReader.Next()
		if err == io.EOF {
			break
		}
		if err != nil {
			return "", "", seenFiles, fmt.Errorf("failed to read tar header: %w", err)
		}
		logger.Debug("inspecting file in layer", "name", header.Name, "type", header.Typeflag)

		// Directories are created on demand when a file below them is written.
		if header.Typeflag == tar.TypeDir {
			continue
		}
		seenFiles = append(seenFiles, header.Name)

		// Links, devices, FIFOs etc. have no content to extract.
		if header.Typeflag != tar.TypeReg {
			continue
		}

		isAlgo := isAlgorithmFile(header.Name, header.Mode, algoType)
		isReq := filepath.Base(header.Name) == "requirements.txt"
		if !isAlgo && !isReq {
			continue
		}

		// Refuse anything that would land outside destPath.
		cleanName := filepath.Clean(header.Name)
		if !filepath.IsLocal(cleanName) {
			continue
		}
		targetPath := filepath.Join(destPath, cleanName)

		if err := os.MkdirAll(filepath.Dir(targetPath), 0o755); err != nil {
			return "", "", seenFiles, fmt.Errorf("failed to create dir: %w", err)
		}
		outFile, err := os.OpenFile(targetPath, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, os.FileMode(header.Mode).Perm())
		if err != nil {
			return "", "", seenFiles, fmt.Errorf("failed to create file: %w", err)
		}
		_, copyErr := io.Copy(outFile, tarReader)
		// Close can surface deferred write errors, so it is checked too.
		closeErr := outFile.Close()
		if copyErr != nil {
			return "", "", seenFiles, fmt.Errorf("failed to write file: %w", copyErr)
		}
		if closeErr != nil {
			return "", "", seenFiles, fmt.Errorf("failed to close file: %w", closeErr)
		}

		if isAlgo && algorithmPath == "" {
			algorithmPath = targetPath
		}
		if isReq && requirementsPath == "" {
			requirementsPath = targetPath
		}
	}

	return algorithmPath, requirementsPath, seenFiles, nil
}
// isAlgorithmFile reports whether a tar entry looks like the algorithm for
// algoType. It judges by the entry's base name (case-insensitively) and, for
// "bin", by its permission bits:
//
//   - "python": the name ends in .py
//   - "wasm":   the name ends in .wasm or .wat
//   - "bin":    the name has none of the script/data extensions listed below,
//     and either contains algorithm/main/run/execute or has at least one
//     execute bit set in mode
//   - "docker", "" and any other value: never
func isAlgorithmFile(filename string, mode int64, algoType string) bool {
	name := strings.ToLower(filepath.Base(filename))
	ext := filepath.Ext(name)

	switch algoType {
	case "python":
		return ext == ".py"
	case "wasm":
		return ext == ".wasm" || ext == ".wat"
	case "bin":
		switch ext {
		case ".py", ".wasm", ".wat", ".js", ".sh", ".csv", ".json", ".txt", ".md":
			return false
		}
		for _, hint := range [...]string{"algorithm", "main", "run", "execute"} {
			if strings.Contains(name, hint) {
				return true
			}
		}
		return mode&0o111 != 0
	}

	// Docker algorithms are the whole image; unknown or empty types get no
	// generic fallback so callers must state the type explicitly.
	return false
}
// ExtractDataset extracts dataset files from an OCI image directory.
//
// It reads ociDir/index.json, follows the first manifest listed there and
// extracts every file that isDataFile accepts from each of that manifest's
// layers into destPath, keeping the archive's relative directory structure.
// A layer that fails to extract is logged via the default slog logger and
// skipped.
//
// Layers are extracted bottom (first) layer up. When several layers contain
// the same path, the copy left on disk is therefore the uppermost one. The
// returned paths are ordered top layer first, because user data usually lives
// in the top layers, and each path appears only once.
//
// An error is returned when index.json or the manifest cannot be read or
// parsed, when the index lists no manifests, or when no data file was
// extracted from any layer.
func ExtractDataset(ociDir, destPath string) ([]string, error) {
	// Read index.json to find manifest
	indexPath := filepath.Join(ociDir, "index.json")
	indexData, err := os.ReadFile(indexPath)
	if err != nil {
		return nil, fmt.Errorf("failed to read index.json: %w", err)
	}
	var index OCIIndex
	if err := json.Unmarshal(indexData, &index); err != nil {
		return nil, fmt.Errorf("failed to parse index.json: %w", err)
	}
	if len(index.Manifests) == 0 {
		return nil, fmt.Errorf("no manifests found in index.json")
	}

	// Get the first manifest digest; "sha256:abc" lives at blobs/sha256/abc.
	manifestDigest := index.Manifests[0].Digest
	manifestPath := filepath.Join(ociDir, "blobs", strings.Replace(manifestDigest, ":", "/", 1))

	// Read manifest to find layers
	manifestData, err := os.ReadFile(manifestPath)
	if err != nil {
		return nil, fmt.Errorf("failed to read manifest: %w", err)
	}
	var manifest struct {
		Layers []struct {
			Digest string `json:"digest"`
		} `json:"layers"`
	}
	if err := json.Unmarshal(manifestData, &manifest); err != nil {
		return nil, fmt.Errorf("failed to parse manifest: %w", err)
	}

	// Extract bottom-up so that upper layers overwrite lower ones on disk,
	// remembering which files each layer produced.
	layerFiles := make([][]string, len(manifest.Layers))
	for i, layer := range manifest.Layers {
		layerPath := filepath.Join(ociDir, "blobs", strings.Replace(layer.Digest, ":", "/", 1))
		files, err := extractLayerDataFiles(layerPath, destPath)
		if err != nil {
			slog.Warn("error extracting layer", "digest", layer.Digest, "error", err)
			continue
		}
		layerFiles[i] = files
	}

	// Report top layer first (user data usually lives there), listing each
	// destination path once even if several layers wrote it.
	var datasetFiles []string
	listed := make(map[string]struct{})
	for i := len(layerFiles) - 1; i >= 0; i-- {
		for _, f := range layerFiles[i] {
			if _, dup := listed[f]; dup {
				continue
			}
			listed[f] = struct{}{}
			datasetFiles = append(datasetFiles, f)
		}
	}

	if len(datasetFiles) == 0 {
		return nil, fmt.Errorf("no dataset files found in OCI image layers")
	}
	return datasetFiles, nil
}
// extractLayerDataFiles extracts data files from a layer.
//
// It streams the gzip-compressed tar layer at layerPath and writes every
// regular file that isDataFile accepts under destPath, keeping the archive's
// relative directory structure. It returns the destination paths written, in
// archive order.
//
// Entries whose names would escape destPath (absolute paths or ".."
// components) are skipped. So are non-regular entries such as symlinks, hard
// links and device nodes, whose tar payload is empty and would otherwise be
// written out as zero-byte "data" files.
//
// On error the partial list is discarded and nil is returned, but files
// already written remain on disk.
func extractLayerDataFiles(layerPath, destPath string) ([]string, error) {
	layerFile, err := os.Open(layerPath)
	if err != nil {
		return nil, fmt.Errorf("failed to open layer: %w", err)
	}
	defer layerFile.Close()

	gzReader, err := gzip.NewReader(layerFile)
	if err != nil {
		return nil, fmt.Errorf("failed to create gzip reader: %w", err)
	}
	defer gzReader.Close()

	tarReader := tar.NewReader(gzReader)
	var extractedFiles []string

	for {
		header, err := tarReader.Next()
		if err == io.EOF {
			break
		}
		if err != nil {
			return nil, fmt.Errorf("failed to read tar header: %w", err)
		}

		// Directories are created on demand below; links, devices etc. carry
		// no file content.
		if header.Typeflag != tar.TypeReg || !isDataFile(header.Name) {
			continue
		}

		// Refuse anything that would land outside destPath.
		cleanName := filepath.Clean(header.Name)
		if !filepath.IsLocal(cleanName) {
			continue
		}
		targetPath := filepath.Join(destPath, cleanName)

		if err := os.MkdirAll(filepath.Dir(targetPath), 0o755); err != nil {
			return nil, fmt.Errorf("failed to create dir: %w", err)
		}
		outFile, err := os.OpenFile(targetPath, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, os.FileMode(header.Mode).Perm())
		if err != nil {
			return nil, fmt.Errorf("failed to create file: %w", err)
		}
		_, copyErr := io.Copy(outFile, tarReader)
		// Close can surface deferred write errors, so it is checked too.
		closeErr := outFile.Close()
		if copyErr != nil {
			return nil, fmt.Errorf("failed to write file: %w", copyErr)
		}
		if closeErr != nil {
			return nil, fmt.Errorf("failed to close file: %w", closeErr)
		}

		extractedFiles = append(extractedFiles, targetPath)
	}

	return extractedFiles, nil
}
// isDataFile reports whether filename's base name ends, case-insensitively,
// in one of the recognised dataset extensions. These are tabular/binary data
// formats plus common archive formats (.zip, .tar, .gz, .tgz); ".tar.gz" is
// matched through its final ".gz".
func isDataFile(filename string) bool {
	switch filepath.Ext(strings.ToLower(filepath.Base(filename))) {
	case ".csv", ".json", ".txt", ".parquet", ".arrow", ".dat",
		".zip", ".tar", ".gz", ".tgz":
		return true
	default:
		return false
	}
}