|
|
|
@@ -1,8 +1,10 @@
|
|
|
|
|
package postinit
|
|
|
|
|
|
|
|
|
|
import (
|
|
|
|
|
"cmp"
|
|
|
|
|
"context"
|
|
|
|
|
"fmt"
|
|
|
|
|
"slices"
|
|
|
|
|
|
|
|
|
|
"github.com/docker/docker/api/types/container"
|
|
|
|
|
"github.com/docker/docker/client"
|
|
|
|
@@ -44,40 +46,65 @@ func NewPostInitMigrator(
|
|
|
|
|
|
|
|
|
|
// PostInitMigrate will run all post-init migrations, which require docker/kube clients for all edge or non-edge environments
|
|
|
|
|
func (postInitMigrator *PostInitMigrator) PostInitMigrate() error {
|
|
|
|
|
environments, err := postInitMigrator.dataStore.Endpoint().Endpoints()
|
|
|
|
|
if err != nil {
|
|
|
|
|
log.Error().Err(err).Msg("Error getting environments")
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
var environments []portainer.Endpoint
|
|
|
|
|
|
|
|
|
|
for _, environment := range environments {
|
|
|
|
|
// edge environments will run after the server starts, in pending actions
|
|
|
|
|
if endpoints.IsEdgeEndpoint(&environment) {
|
|
|
|
|
// Skip edge environments that do not have direct connectivity
|
|
|
|
|
if !endpoints.HasDirectConnectivity(&environment) {
|
|
|
|
|
if err := postInitMigrator.dataStore.UpdateTx(func(tx dataservices.DataStoreTx) error {
|
|
|
|
|
var err error
|
|
|
|
|
if environments, err = tx.Endpoint().ReadAll(func(endpoint portainer.Endpoint) bool {
|
|
|
|
|
return endpoints.HasDirectConnectivity(&endpoint)
|
|
|
|
|
}); err != nil {
|
|
|
|
|
return fmt.Errorf("failed to retrieve environments: %w", err)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
var pendingActions []portainer.PendingAction
|
|
|
|
|
if pendingActions, err = tx.PendingActions().ReadAll(func(action portainer.PendingAction) bool {
|
|
|
|
|
return action.Action == actions.PostInitMigrateEnvironment
|
|
|
|
|
}); err != nil {
|
|
|
|
|
return fmt.Errorf("failed to retrieve pending actions: %w", err)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Sort for the binary search in createPostInitMigrationPendingAction()
|
|
|
|
|
slices.SortFunc(pendingActions, func(a, b portainer.PendingAction) int {
|
|
|
|
|
return cmp.Compare(a.EndpointID, b.EndpointID)
|
|
|
|
|
})
|
|
|
|
|
|
|
|
|
|
for _, environment := range environments {
|
|
|
|
|
if !endpoints.IsEdgeEndpoint(&environment) {
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Edge environments will run after the server starts, in pending actions
|
|
|
|
|
log.Info().
|
|
|
|
|
Int("endpoint_id", int(environment.ID)).
|
|
|
|
|
Msg("adding pending action 'PostInitMigrateEnvironment' for environment")
|
|
|
|
|
|
|
|
|
|
if err := postInitMigrator.createPostInitMigrationPendingAction(environment.ID); err != nil {
|
|
|
|
|
if err := postInitMigrator.createPostInitMigrationPendingAction(tx, environment.ID, pendingActions); err != nil {
|
|
|
|
|
log.Error().
|
|
|
|
|
Err(err).
|
|
|
|
|
Int("endpoint_id", int(environment.ID)).
|
|
|
|
|
Msg("error creating pending action for environment")
|
|
|
|
|
}
|
|
|
|
|
} else {
|
|
|
|
|
// Non-edge environments will run before the server starts.
|
|
|
|
|
if err := postInitMigrator.MigrateEnvironment(&environment); err != nil {
|
|
|
|
|
log.Error().
|
|
|
|
|
Err(err).
|
|
|
|
|
Int("endpoint_id", int(environment.ID)).
|
|
|
|
|
Msg("error running post-init migrations for non-edge environment")
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return err
|
|
|
|
|
}); err != nil {
|
|
|
|
|
log.Error().Err(err).Msg("error running post-init migrations")
|
|
|
|
|
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
for _, environment := range environments {
|
|
|
|
|
if endpoints.IsEdgeEndpoint(&environment) {
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Non-edge environments will run before the server starts.
|
|
|
|
|
if err := postInitMigrator.MigrateEnvironment(&environment); err != nil {
|
|
|
|
|
log.Error().
|
|
|
|
|
Err(err).
|
|
|
|
|
Int("endpoint_id", int(environment.ID)).
|
|
|
|
|
Msg("error running post-init migrations for non-edge environment")
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return nil
|
|
|
|
@@ -85,44 +112,47 @@ func (postInitMigrator *PostInitMigrator) PostInitMigrate() error {
|
|
|
|
|
|
|
|
|
|
// try to create a post init migration pending action. If it already exists, do nothing
|
|
|
|
|
// this function exists for readability, not reusability
|
|
|
|
|
func (postInitMigrator *PostInitMigrator) createPostInitMigrationPendingAction(environmentID portainer.EndpointID) error {
|
|
|
|
|
// pending actions must be passed in ascending order by endpoint ID
|
|
|
|
|
func (postInitMigrator *PostInitMigrator) createPostInitMigrationPendingAction(tx dataservices.DataStoreTx, environmentID portainer.EndpointID, pendingActions []portainer.PendingAction) error {
|
|
|
|
|
action := portainer.PendingAction{
|
|
|
|
|
EndpointID: environmentID,
|
|
|
|
|
Action: actions.PostInitMigrateEnvironment,
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
pendingActions, err := postInitMigrator.dataStore.PendingActions().ReadAll()
|
|
|
|
|
if err != nil {
|
|
|
|
|
return fmt.Errorf("failed to retrieve pending actions: %w", err)
|
|
|
|
|
if _, found := slices.BinarySearchFunc(pendingActions, environmentID, func(e portainer.PendingAction, id portainer.EndpointID) int {
|
|
|
|
|
return cmp.Compare(e.EndpointID, id)
|
|
|
|
|
}); found {
|
|
|
|
|
log.Debug().
|
|
|
|
|
Str("action", action.Action).
|
|
|
|
|
Int("endpoint_id", int(action.EndpointID)).
|
|
|
|
|
Msg("pending action already exists for environment, skipping...")
|
|
|
|
|
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
for _, dba := range pendingActions {
|
|
|
|
|
if dba.EndpointID == action.EndpointID && dba.Action == action.Action {
|
|
|
|
|
log.Debug().
|
|
|
|
|
Str("action", action.Action).
|
|
|
|
|
Int("endpoint_id", int(action.EndpointID)).
|
|
|
|
|
Msg("pending action already exists for environment, skipping...")
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return postInitMigrator.dataStore.PendingActions().Create(&action)
|
|
|
|
|
return tx.PendingActions().Create(&action)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// MigrateEnvironment runs migrations on a single environment
|
|
|
|
|
func (migrator *PostInitMigrator) MigrateEnvironment(environment *portainer.Endpoint) error {
|
|
|
|
|
log.Info().Msgf("Executing post init migration for environment %d", environment.ID)
|
|
|
|
|
log.Info().
|
|
|
|
|
Int("endpoint_id", int(environment.ID)).
|
|
|
|
|
Msg("executing post init migration for environment")
|
|
|
|
|
|
|
|
|
|
switch {
|
|
|
|
|
case endpointutils.IsKubernetesEndpoint(environment):
|
|
|
|
|
// get the kubeclient for the environment, and skip all kube migrations if there's an error
|
|
|
|
|
kubeclient, err := migrator.kubeFactory.GetPrivilegedKubeClient(environment)
|
|
|
|
|
if err != nil {
|
|
|
|
|
log.Error().Err(err).Msgf("Error creating kubeclient for environment: %d", environment.ID)
|
|
|
|
|
log.Error().
|
|
|
|
|
Err(err).
|
|
|
|
|
Int("endpoint_id", int(environment.ID)).
|
|
|
|
|
Msg("error creating kubeclient for environment")
|
|
|
|
|
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// if one environment fails, it is logged and the next migration runs. The error is returned at the end and handled by pending actions
|
|
|
|
|
// If one environment fails, it is logged and the next migration runs. The error is returned at the end and handled by pending actions
|
|
|
|
|
if err := migrator.MigrateIngresses(*environment, kubeclient); err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
@@ -132,12 +162,21 @@ func (migrator *PostInitMigrator) MigrateEnvironment(environment *portainer.Endp
|
|
|
|
|
// get the docker client for the environment, and skip all docker migrations if there's an error
|
|
|
|
|
dockerClient, err := migrator.dockerFactory.CreateClient(environment, "", nil)
|
|
|
|
|
if err != nil {
|
|
|
|
|
log.Error().Err(err).Msgf("Error creating docker client for environment: %d", environment.ID)
|
|
|
|
|
log.Error().
|
|
|
|
|
Err(err).
|
|
|
|
|
Int("endpoint_id", int(environment.ID)).
|
|
|
|
|
Msg("error creating docker client for environment")
|
|
|
|
|
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
defer logs.CloseAndLogErr(dockerClient)
|
|
|
|
|
|
|
|
|
|
if err := migrator.MigrateGPUs(*environment, dockerClient); err != nil {
|
|
|
|
|
log.Error().
|
|
|
|
|
Err(err).
|
|
|
|
|
Int("endpoint_id", int(environment.ID)).
|
|
|
|
|
Msg("error migrating GPUs for environment")
|
|
|
|
|
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
@@ -150,13 +189,20 @@ func (migrator *PostInitMigrator) MigrateIngresses(environment portainer.Endpoin
|
|
|
|
|
if !environment.PostInitMigrations.MigrateIngresses {
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
log.Debug().Msgf("Migrating ingresses for environment %d", environment.ID)
|
|
|
|
|
|
|
|
|
|
err := migrator.kubeFactory.MigrateEndpointIngresses(&environment, migrator.dataStore, kubeclient)
|
|
|
|
|
if err != nil {
|
|
|
|
|
log.Error().Err(err).Msgf("Error migrating ingresses for environment %d", environment.ID)
|
|
|
|
|
log.Debug().
|
|
|
|
|
Int("endpoint_id", int(environment.ID)).
|
|
|
|
|
Msg("migrating ingresses for environment")
|
|
|
|
|
|
|
|
|
|
if err := migrator.kubeFactory.MigrateEndpointIngresses(&environment, migrator.dataStore, kubeclient); err != nil {
|
|
|
|
|
log.Error().
|
|
|
|
|
Err(err).
|
|
|
|
|
Int("endpoint_id", int(environment.ID)).
|
|
|
|
|
Msg("error migrating ingresses for environment")
|
|
|
|
|
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@@ -166,29 +212,42 @@ func (migrator *PostInitMigrator) MigrateGPUs(e portainer.Endpoint, dockerClient
|
|
|
|
|
return migrator.dataStore.UpdateTx(func(tx dataservices.DataStoreTx) error {
|
|
|
|
|
environment, err := tx.Endpoint().Endpoint(e.ID)
|
|
|
|
|
if err != nil {
|
|
|
|
|
log.Error().Err(err).Msgf("Error getting environment %d", e.ID)
|
|
|
|
|
log.Error().
|
|
|
|
|
Err(err).
|
|
|
|
|
Int("endpoint_id", int(e.ID)).
|
|
|
|
|
Msg("error getting environment")
|
|
|
|
|
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Early exit if we do not need to migrate!
|
|
|
|
|
if !environment.PostInitMigrations.MigrateGPUs {
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
log.Debug().Msgf("Migrating GPUs for environment %d", e.ID)
|
|
|
|
|
|
|
|
|
|
// get all containers
|
|
|
|
|
log.Debug().
|
|
|
|
|
Int("endpoint_id", int(e.ID)).
|
|
|
|
|
Msg("migrating GPUs for environment")
|
|
|
|
|
|
|
|
|
|
// Get all containers
|
|
|
|
|
containers, err := dockerClient.ContainerList(context.Background(), container.ListOptions{All: true})
|
|
|
|
|
if err != nil {
|
|
|
|
|
log.Error().Err(err).Msgf("failed to list containers for environment %d", environment.ID)
|
|
|
|
|
log.Error().
|
|
|
|
|
Err(err).
|
|
|
|
|
Int("endpoint_id", int(environment.ID)).
|
|
|
|
|
Msg("failed to list containers for environment")
|
|
|
|
|
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// check for a gpu on each container. If even one GPU is found, set EnableGPUManagement to true for the whole environment
|
|
|
|
|
// Check for a gpu on each container. If even one GPU is found, set EnableGPUManagement to true for the whole environment
|
|
|
|
|
containersLoop:
|
|
|
|
|
for _, container := range containers {
|
|
|
|
|
// https://www.sobyte.net/post/2022-10/go-docker/ has nice documentation on the docker client with GPUs
|
|
|
|
|
containerDetails, err := dockerClient.ContainerInspect(context.Background(), container.ID)
|
|
|
|
|
if err != nil {
|
|
|
|
|
log.Error().Err(err).Msg("failed to inspect container")
|
|
|
|
|
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@@ -202,10 +261,14 @@ func (migrator *PostInitMigrator) MigrateGPUs(e portainer.Endpoint, dockerClient
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// set the MigrateGPUs flag to false so we don't run this again
|
|
|
|
|
// Set the MigrateGPUs flag to false so we don't run this again
|
|
|
|
|
environment.PostInitMigrations.MigrateGPUs = false
|
|
|
|
|
if err := tx.Endpoint().UpdateEndpoint(environment.ID, environment); err != nil {
|
|
|
|
|
log.Error().Err(err).Msgf("Error updating EnableGPUManagement flag for environment %d", environment.ID)
|
|
|
|
|
log.Error().
|
|
|
|
|
Err(err).
|
|
|
|
|
Int("endpoint_id", int(environment.ID)).
|
|
|
|
|
Msg("error updating EnableGPUManagement flag for environment")
|
|
|
|
|
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|