mirror of
https://github.com/portainer/portainer.git
synced 2026-06-23 04:10:29 +00:00
feat(api/gitops): list and filter kubernetes git workflows (#2474)
This commit is contained in:
@@ -6,6 +6,7 @@ import (
|
||||
portainer "github.com/portainer/portainer/api"
|
||||
"github.com/portainer/portainer/api/dataservices"
|
||||
"github.com/portainer/portainer/api/http/security"
|
||||
"github.com/portainer/portainer/api/kubernetes/cli"
|
||||
httperror "github.com/portainer/portainer/pkg/libhttp/error"
|
||||
|
||||
"github.com/gorilla/mux"
|
||||
@@ -21,7 +22,7 @@ type Handler struct {
|
||||
fileService portainer.FileService
|
||||
}
|
||||
|
||||
func NewHandler(bouncer security.BouncerService, dataStore dataservices.DataStore, gitService portainer.GitService, fileService portainer.FileService) *Handler {
|
||||
func NewHandler(bouncer security.BouncerService, dataStore dataservices.DataStore, gitService portainer.GitService, fileService portainer.FileService, k8sFactory *cli.ClientFactory) *Handler {
|
||||
h := &Handler{
|
||||
Router: mux.NewRouter(),
|
||||
dataStore: dataStore,
|
||||
@@ -34,7 +35,7 @@ func NewHandler(bouncer security.BouncerService, dataStore dataservices.DataStor
|
||||
|
||||
authenticatedRouter.Handle("/gitops/repo/file/preview", httperror.LoggerHandler(h.gitOperationRepoFilePreview)).Methods(http.MethodPost)
|
||||
|
||||
workflowsHandler := workflows.NewHandler(dataStore, gitService)
|
||||
workflowsHandler := workflows.NewHandler(dataStore, gitService, k8sFactory)
|
||||
authenticatedRouter.PathPrefix("/gitops/workflows").Handler(workflowsHandler)
|
||||
|
||||
return h
|
||||
|
||||
@@ -7,6 +7,7 @@ import (
|
||||
"github.com/portainer/portainer/api/dataservices"
|
||||
"github.com/portainer/portainer/api/http/security"
|
||||
"github.com/portainer/portainer/api/internal/authorization"
|
||||
"github.com/portainer/portainer/api/internal/endpointutils"
|
||||
"github.com/portainer/portainer/api/internal/snapshot"
|
||||
"github.com/portainer/portainer/api/set"
|
||||
"github.com/portainer/portainer/api/slicesx"
|
||||
@@ -19,6 +20,8 @@ func endpointMatchesStackType(ep portainer.Endpoint, stackType portainer.StackTy
|
||||
return len(ep.Snapshots) > 0 && ep.Snapshots[0].Swarm
|
||||
case portainer.DockerComposeStack:
|
||||
return len(ep.Snapshots) == 0 || !ep.Snapshots[0].Swarm
|
||||
case portainer.KubernetesStack:
|
||||
return endpointutils.IsKubernetesEndpoint(&ep)
|
||||
default:
|
||||
return true
|
||||
}
|
||||
|
||||
@@ -33,7 +33,7 @@ func TestWorkflowsList_RBAC_NonAdminNoAccess(t *testing.T) {
|
||||
GitConfig: gitConfig("https://github.com/x/no-rc"),
|
||||
}))
|
||||
|
||||
h := NewHandler(store, nil)
|
||||
h := NewHandler(store, nil, nil)
|
||||
rr := httptest.NewRecorder()
|
||||
h.ServeHTTP(rr, buildWorkflowsReq(t, 1, portainer.StandardUserRole, ""))
|
||||
|
||||
@@ -70,7 +70,7 @@ func TestWorkflowsList_RBAC_NonAdminWithAccess(t *testing.T) {
|
||||
},
|
||||
}))
|
||||
|
||||
h := NewHandler(store, nil)
|
||||
h := NewHandler(store, nil, nil)
|
||||
rr := httptest.NewRecorder()
|
||||
h.ServeHTTP(rr, buildWorkflowsReq(t, 1, portainer.StandardUserRole, ""))
|
||||
|
||||
|
||||
@@ -7,6 +7,7 @@ import (
|
||||
gocache "github.com/patrickmn/go-cache"
|
||||
portainer "github.com/portainer/portainer/api"
|
||||
"github.com/portainer/portainer/api/dataservices"
|
||||
"github.com/portainer/portainer/api/kubernetes/cli"
|
||||
httperror "github.com/portainer/portainer/pkg/libhttp/error"
|
||||
|
||||
"github.com/gorilla/mux"
|
||||
@@ -22,14 +23,16 @@ type Handler struct {
|
||||
dataStore dataservices.DataStore
|
||||
gitService portainer.GitService
|
||||
cache *gocache.Cache
|
||||
k8sFactory *cli.ClientFactory
|
||||
}
|
||||
|
||||
func NewHandler(dataStore dataservices.DataStore, gitService portainer.GitService) *Handler {
|
||||
func NewHandler(dataStore dataservices.DataStore, gitService portainer.GitService, k8sFactory *cli.ClientFactory) *Handler {
|
||||
h := &Handler{
|
||||
Router: mux.NewRouter(),
|
||||
dataStore: dataStore,
|
||||
gitService: gitService,
|
||||
cache: gocache.New(cacheTTL, cacheCleanupInterval),
|
||||
k8sFactory: k8sFactory,
|
||||
}
|
||||
|
||||
h.Handle("/gitops/workflows", httperror.LoggerHandler(h.list)).Methods(http.MethodGet)
|
||||
|
||||
@@ -12,8 +12,11 @@ import (
|
||||
portainer "github.com/portainer/portainer/api"
|
||||
"github.com/portainer/portainer/api/dataservices"
|
||||
svc "github.com/portainer/portainer/api/gitops/workflows"
|
||||
"github.com/portainer/portainer/api/http/models/kubernetes"
|
||||
"github.com/portainer/portainer/api/http/security"
|
||||
"github.com/portainer/portainer/api/http/utils/filters"
|
||||
"github.com/portainer/portainer/api/internal/endpointutils"
|
||||
"github.com/portainer/portainer/api/kubernetes/cli"
|
||||
"github.com/portainer/portainer/api/set"
|
||||
"github.com/portainer/portainer/api/slicesx"
|
||||
httperror "github.com/portainer/portainer/pkg/libhttp/error"
|
||||
@@ -160,17 +163,17 @@ func (h *Handler) fetchWorkflows(ctx context.Context, sc *security.RestrictedReq
|
||||
for i := range stacks {
|
||||
s := stacks[i]
|
||||
|
||||
// TODO show kube stacks when there's a kube stacks view [BE-12867]
|
||||
if s.Type == portainer.KubernetesStack {
|
||||
continue
|
||||
}
|
||||
|
||||
if ep, ok := endpointMap[s.EndpointID]; ok && !endpointMatchesStackType(ep, s.Type) {
|
||||
continue
|
||||
}
|
||||
entries = append(entries, s)
|
||||
}
|
||||
|
||||
entries, err = filterK8SStacks(entries, endpointMap, h.k8sFactory, sc.UserID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
@@ -186,6 +189,58 @@ func (h *Handler) fetchWorkflows(ctx context.Context, sc *security.RestrictedReq
|
||||
return items, nil
|
||||
}
|
||||
|
||||
// lookup only if env is kube and either not edge or (edge + not async)
|
||||
func shouldPerformEnvLookup(endpoint *portainer.Endpoint) bool {
|
||||
return endpointutils.IsKubernetesEndpoint(endpoint) &&
|
||||
(!endpointutils.IsEdgeEndpoint(endpoint) ||
|
||||
(endpointutils.IsEdgeEndpoint(endpoint) && !endpoint.Edge.AsyncMode))
|
||||
}
|
||||
|
||||
func filterK8SStacks(items []portainer.Stack, endpointMap map[portainer.EndpointID]portainer.Endpoint, k8sFactory *cli.ClientFactory, userId portainer.UserID) ([]portainer.Stack, error) {
|
||||
k8sStacks, result := slicesx.Partition(items, func(s portainer.Stack) bool {
|
||||
return s.Type == portainer.KubernetesStack
|
||||
})
|
||||
|
||||
groupedByEnvId := slicesx.GroupBy(k8sStacks, func(s portainer.Stack) portainer.EndpointID {
|
||||
return s.EndpointID
|
||||
})
|
||||
|
||||
for envID, stacks := range groupedByEnvId {
|
||||
ep, ok := endpointMap[envID]
|
||||
if !ok || !shouldPerformEnvLookup(&ep) {
|
||||
continue
|
||||
}
|
||||
|
||||
kcl, err := k8sFactory.GetPrivilegedUserKubeClient(&ep, userId)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
apps, err := kcl.GetApplications("", "")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
for _, s := range stacks {
|
||||
idx := slices.IndexFunc(apps, func(app kubernetes.K8sApplication) bool {
|
||||
return app.StackKind != "edge" && app.StackID == strconv.Itoa(int(s.ID))
|
||||
})
|
||||
if idx == -1 {
|
||||
// if we don't find a matching application (deployment/statefulset/daemonset) in the environment workloads
|
||||
// this workflow (stack) wouldn't show in the Applications list, so we don't keep it
|
||||
continue
|
||||
}
|
||||
|
||||
app := apps[idx]
|
||||
|
||||
s.Name = app.Name
|
||||
s.Namespace = app.ResourcePool
|
||||
|
||||
result = append(result, s)
|
||||
}
|
||||
}
|
||||
return result, nil
|
||||
}
|
||||
|
||||
func cacheKey(sc *security.RestrictedRequestContext, endpointIDs []portainer.EndpointID) string {
|
||||
ids := make([]string, len(endpointIDs))
|
||||
for i, id := range endpointIDs {
|
||||
|
||||
@@ -32,7 +32,7 @@ func TestWorkflowsList_GitConfigFilter(t *testing.T) {
|
||||
return nil
|
||||
}))
|
||||
|
||||
h := NewHandler(store, nil)
|
||||
h := NewHandler(store, nil, nil)
|
||||
rr := httptest.NewRecorder()
|
||||
h.ServeHTTP(rr, buildWorkflowsReq(t, 1, portainer.AdministratorRole, ""))
|
||||
|
||||
@@ -61,7 +61,7 @@ func TestWorkflowsList_EndpointIDsFilter(t *testing.T) {
|
||||
return nil
|
||||
}))
|
||||
|
||||
h := NewHandler(store, nil)
|
||||
h := NewHandler(store, nil, nil)
|
||||
rr := httptest.NewRecorder()
|
||||
h.ServeHTTP(rr, buildWorkflowsReq(t, 1, portainer.AdministratorRole, "endpointIds[]=1&endpointIds[]=2"))
|
||||
|
||||
@@ -89,7 +89,7 @@ func TestWorkflowsList_Pagination(t *testing.T) {
|
||||
return nil
|
||||
}))
|
||||
|
||||
h := NewHandler(store, nil)
|
||||
h := NewHandler(store, nil, nil)
|
||||
rr := httptest.NewRecorder()
|
||||
h.ServeHTTP(rr, buildWorkflowsReq(t, 1, portainer.AdministratorRole, "start=0&limit=2"))
|
||||
|
||||
@@ -114,7 +114,7 @@ func TestWorkflowsList_Search(t *testing.T) {
|
||||
return nil
|
||||
}))
|
||||
|
||||
h := NewHandler(store, nil)
|
||||
h := NewHandler(store, nil, nil)
|
||||
rr := httptest.NewRecorder()
|
||||
h.ServeHTTP(rr, buildWorkflowsReq(t, 1, portainer.AdministratorRole, "search=alpha"))
|
||||
|
||||
@@ -141,7 +141,7 @@ func TestWorkflowsList_SearchByURL(t *testing.T) {
|
||||
return nil
|
||||
}))
|
||||
|
||||
h := NewHandler(store, nil)
|
||||
h := NewHandler(store, nil, nil)
|
||||
rr := httptest.NewRecorder()
|
||||
h.ServeHTTP(rr, buildWorkflowsReq(t, 1, portainer.AdministratorRole, "search=org1"))
|
||||
|
||||
@@ -166,7 +166,7 @@ func TestWorkflowsList_Sort(t *testing.T) {
|
||||
return nil
|
||||
}))
|
||||
|
||||
h := NewHandler(store, nil)
|
||||
h := NewHandler(store, nil, nil)
|
||||
rr := httptest.NewRecorder()
|
||||
h.ServeHTTP(rr, buildWorkflowsReq(t, 1, portainer.AdministratorRole, "sort=name&order=desc"))
|
||||
|
||||
@@ -199,7 +199,7 @@ func TestWorkflowsList_Cache(t *testing.T) {
|
||||
|
||||
// Create the handler outside the bubble so the go-cache cleanup goroutine
|
||||
// is not part of the bubble and does not block synctest.Test from returning.
|
||||
h := NewHandler(store, nil)
|
||||
h := NewHandler(store, nil, nil)
|
||||
|
||||
synctest.Test(t, func(t *testing.T) {
|
||||
// First request at fake T=0: populates cache.
|
||||
@@ -248,7 +248,7 @@ func TestWorkflowsList_CacheImmutableAfterSort(t *testing.T) {
|
||||
}))
|
||||
}
|
||||
|
||||
h := NewHandler(store, nil)
|
||||
h := NewHandler(store, nil, nil)
|
||||
|
||||
// First request: no sort — cache miss, populates cache as [alpha, beta, gamma].
|
||||
rr := httptest.NewRecorder()
|
||||
@@ -290,7 +290,7 @@ func TestWorkflowsList_CacheSeparateKeys(t *testing.T) {
|
||||
return nil
|
||||
}))
|
||||
|
||||
h := NewHandler(store, nil)
|
||||
h := NewHandler(store, nil, nil)
|
||||
|
||||
rr1 := httptest.NewRecorder()
|
||||
h.ServeHTTP(rr1, buildWorkflowsReq(t, 1, portainer.AdministratorRole, "endpointIds[]=1"))
|
||||
@@ -323,7 +323,7 @@ func TestWorkflowsList_StatusFilter(t *testing.T) {
|
||||
return nil
|
||||
}))
|
||||
|
||||
h := NewHandler(store, nil)
|
||||
h := NewHandler(store, nil, nil)
|
||||
|
||||
t.Run("status=healthy returns only healthy workflows", func(t *testing.T) {
|
||||
rr := httptest.NewRecorder()
|
||||
@@ -346,7 +346,7 @@ func TestWorkflowsList_InvalidFilterParams(t *testing.T) {
|
||||
t.Parallel()
|
||||
_, store := datastore.MustNewTestStore(t, false, true)
|
||||
require.NoError(t, store.User().Create(&portainer.User{ID: 1, Role: portainer.AdministratorRole}))
|
||||
h := NewHandler(store, nil)
|
||||
h := NewHandler(store, nil, nil)
|
||||
|
||||
for _, query := range []string{"status=garbage", "type=garbage", "platform=garbage"} {
|
||||
t.Run(query, func(t *testing.T) {
|
||||
@@ -373,7 +373,7 @@ func TestWorkflowsList_RedactsCredentials(t *testing.T) {
|
||||
return nil
|
||||
}))
|
||||
|
||||
h := NewHandler(store, nil)
|
||||
h := NewHandler(store, nil, nil)
|
||||
rr := httptest.NewRecorder()
|
||||
h.ServeHTTP(rr, buildWorkflowsReq(t, 1, portainer.AdministratorRole, ""))
|
||||
|
||||
|
||||
@@ -72,7 +72,7 @@ func TestWorkflowsList_StackStatusDerivation(t *testing.T) {
|
||||
return nil
|
||||
}))
|
||||
|
||||
h := NewHandler(store, nil)
|
||||
h := NewHandler(store, nil, nil)
|
||||
rr := httptest.NewRecorder()
|
||||
h.ServeHTTP(rr, buildWorkflowsReq(t, 1, portainer.AdministratorRole, ""))
|
||||
|
||||
|
||||
@@ -2,7 +2,6 @@ package kubernetes
|
||||
|
||||
import (
|
||||
"net/http"
|
||||
"strconv"
|
||||
|
||||
"github.com/portainer/portainer/api/http/middlewares"
|
||||
"github.com/portainer/portainer/api/http/security"
|
||||
@@ -33,7 +32,7 @@ func (handler *Handler) prepareKubeClient(r *http.Request) (*cli.KubeClient, *ht
|
||||
return nil, httperror.InternalServerError("Unable to retrieve token data associated to the request.", err)
|
||||
}
|
||||
|
||||
pcli, err := handler.KubernetesClientFactory.GetPrivilegedUserKubeClient(endpoint, strconv.Itoa(int(tokenData.ID)))
|
||||
pcli, err := handler.KubernetesClientFactory.GetPrivilegedUserKubeClient(endpoint, tokenData.ID)
|
||||
if err != nil {
|
||||
log.Error().Err(err).Str("context", "prepareKubeClient").Msg("Unable to get a privileged Kubernetes client for the user.")
|
||||
return nil, httperror.InternalServerError("Unable to get a privileged Kubernetes client for the user.", err)
|
||||
|
||||
+1
-1
@@ -208,7 +208,7 @@ func (server *Server) Start(ctx context.Context) error {
|
||||
|
||||
var endpointHelmHandler = helm.NewHandler(requestBouncer, server.DataStore, server.JWTService, server.KubernetesDeployer, server.HelmPackageManager, server.KubeClusterAccessService)
|
||||
|
||||
var gitOperationHandler = gitops.NewHandler(requestBouncer, server.DataStore, server.GitService, server.FileService)
|
||||
var gitOperationHandler = gitops.NewHandler(requestBouncer, server.DataStore, server.GitService, server.FileService, server.KubernetesClientFactory)
|
||||
|
||||
var helmTemplatesHandler = helm.NewTemplateHandler(requestBouncer, server.HelmPackageManager)
|
||||
|
||||
|
||||
@@ -125,8 +125,8 @@ func (factory *ClientFactory) GetPrivilegedKubeClient(endpoint *portainer.Endpoi
|
||||
|
||||
// GetPrivilegedUserKubeClient checks if an existing admin client is already registered for the environment(endpoint) and user and returns it if one is found.
|
||||
// If no client is registered, it will create a new client, register it, and returns it.
|
||||
func (factory *ClientFactory) GetPrivilegedUserKubeClient(endpoint *portainer.Endpoint, userID string) (*KubeClient, error) {
|
||||
key := strconv.Itoa(int(endpoint.ID)) + ".admin." + userID
|
||||
func (factory *ClientFactory) GetPrivilegedUserKubeClient(endpoint *portainer.Endpoint, userID portainer.UserID) (*KubeClient, error) {
|
||||
key := strconv.Itoa(int(endpoint.ID)) + ".admin." + strconv.Itoa(int(userID))
|
||||
pcl, ok := factory.endpointProxyClients.Get(key)
|
||||
if ok {
|
||||
return pcl.(*KubeClient), nil
|
||||
|
||||
@@ -15,7 +15,7 @@ func Filter[T any](input []T, predicate func(T) bool) []T {
|
||||
return result
|
||||
}
|
||||
|
||||
// Filter in place all elements from input that predicate returns truthy for and returns an array of the removed elements.
|
||||
// Filter in place all elements from input that predicate returns truthy for.
|
||||
//
|
||||
// Note: Unlike `Filter`, this method mutates input.
|
||||
func FilterInPlace[T any](input []T, predicate func(T) bool) []T {
|
||||
|
||||
@@ -0,0 +1,10 @@
|
||||
package slicesx
|
||||
|
||||
func GroupBy[A any, K comparable](input []A, f func(A) K) map[K][]A {
|
||||
result := make(map[K][]A)
|
||||
for _, v := range input {
|
||||
key := f(v)
|
||||
result[key] = append(result[key], v)
|
||||
}
|
||||
return result
|
||||
}
|
||||
@@ -0,0 +1,21 @@
|
||||
package slicesx_test
|
||||
|
||||
import (
|
||||
"reflect"
|
||||
"testing"
|
||||
|
||||
"github.com/portainer/portainer/api/slicesx"
|
||||
)
|
||||
|
||||
func TestGroupBy(t *testing.T) {
|
||||
t.Parallel()
|
||||
input := []string{"apple", "banana", "cherry", "date", "elderberry"}
|
||||
f := func(a string) int {
|
||||
return len(a)
|
||||
}
|
||||
expected := map[int][]string{5: {"apple"}, 6: {"banana", "cherry"}, 4: {"date"}, 10: {"elderberry"}}
|
||||
result := slicesx.GroupBy(input, f)
|
||||
if !reflect.DeepEqual(expected, result) {
|
||||
t.Errorf("Expected %v, got %v", expected, result)
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,19 @@
|
||||
package slicesx
|
||||
|
||||
// Split elements in two slices.
|
||||
// The first contains elements predicate returns truthy for
|
||||
// The second contains elements predicate returns falsey for
|
||||
// The predicate is invoked with one argument: (value).
|
||||
func Partition[T any](input []T, predicate func(T) bool) ([]T, []T) {
|
||||
truthy := make([]T, 0)
|
||||
falsey := make([]T, 0)
|
||||
|
||||
for i := range input {
|
||||
if predicate(input[i]) {
|
||||
truthy = append(truthy, input[i])
|
||||
} else {
|
||||
falsey = append(falsey, input[i])
|
||||
}
|
||||
}
|
||||
return truthy, falsey
|
||||
}
|
||||
@@ -0,0 +1,32 @@
|
||||
package slicesx_test
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/portainer/portainer/api/slicesx"
|
||||
)
|
||||
|
||||
func partition[T any](input []T, predicate func(T) bool) [2][]T {
|
||||
left, right := slicesx.Partition(input, predicate)
|
||||
return [2][]T{left, right}
|
||||
}
|
||||
|
||||
func Test_Partition(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
test(t, partition, "Partition even and odd",
|
||||
[]int{1, 2, 3, 4, 5, 6, 7, 8, 9},
|
||||
[2][]int{{2, 4, 6, 8}, {1, 3, 5, 7, 9}},
|
||||
func(x int) bool { return x%2 == 0 },
|
||||
)
|
||||
test(t, partition, "Partition strings starting with 'A'",
|
||||
[]string{"Apple", "Banana", "Avocado", "Grapes", "Apricot"},
|
||||
[2][]string{{"Apple", "Avocado", "Apricot"}, {"Banana", "Grapes"}},
|
||||
func(s string) bool { return s[0] == 'A' },
|
||||
)
|
||||
test(t, partition, "Partition strings longer than 5 chars",
|
||||
[]string{"Apple", "Banana", "Avocado", "Grapes", "Apricot"},
|
||||
[2][]string{{"Banana", "Avocado", "Grapes", "Apricot"}, {"Apple"}},
|
||||
func(s string) bool { return len(s) > 5 },
|
||||
)
|
||||
}
|
||||
@@ -116,7 +116,7 @@ export function WorkflowsView() {
|
||||
groups={groups}
|
||||
totalCount={totalCount}
|
||||
isLoading={workflowsQuery.isLoading}
|
||||
getItemKey={(item) => item.id}
|
||||
getItemKey={(item) => `${item.type}-${item.id}`}
|
||||
showGroupHeaders
|
||||
emptyMessage="No workflows found"
|
||||
searchPlaceholder="Search"
|
||||
|
||||
Reference in New Issue
Block a user