fix(alerting) Use prometheus scrape manager [R8S-940] (#2198)

This commit is contained in:
RHCowan
2026-04-01 14:41:27 +13:00
committed by Robbie Cowan
parent 7f2da7811c
commit ba11fe920b
7 changed files with 511 additions and 12 deletions
+41
View File
@@ -0,0 +1,41 @@
package libhttp
import (
"net"
"net/http"
)
// IsLocalRequest returns true when the request originates from the local host.
// It accepts both loopback requests and self-dials to the listener's bound IP.
func IsLocalRequest(r *http.Request) bool {
remoteIP := ParseRequestIP(r.RemoteAddr)
if remoteIP == nil {
return false
}
if remoteIP.IsLoopback() {
return true
}
localAddr, ok := r.Context().Value(http.LocalAddrContextKey).(net.Addr)
if !ok {
return false
}
localIP := ParseRequestIP(localAddr.String())
if localIP == nil {
return false
}
return remoteIP.Equal(localIP)
}
// ParseRequestIP extracts the IP address from a host:port address string.
func ParseRequestIP(addr string) net.IP {
host, _, err := net.SplitHostPort(addr)
if err != nil {
return nil
}
return net.ParseIP(host)
}
+34
View File
@@ -0,0 +1,34 @@
package libhttp
import (
"context"
"net"
"net/http"
"net/http/httptest"
"testing"
"github.com/stretchr/testify/assert"
)
func TestIsLocalRequestAllowsLoopback(t *testing.T) {
r := httptest.NewRequest(http.MethodGet, "/api/metrics", nil)
r.RemoteAddr = "127.0.0.1:43210"
assert.True(t, IsLocalRequest(r))
}
func TestIsLocalRequestAllowsSelfDialToBoundAddress(t *testing.T) {
r := httptest.NewRequest(http.MethodGet, "/api/metrics", nil)
r.RemoteAddr = "10.0.0.5:43210"
r = r.WithContext(context.WithValue(r.Context(), http.LocalAddrContextKey, &net.TCPAddr{IP: net.ParseIP("10.0.0.5"), Port: 9001}))
assert.True(t, IsLocalRequest(r))
}
func TestIsLocalRequestRejectsRemotePeer(t *testing.T) {
r := httptest.NewRequest(http.MethodGet, "/api/metrics", nil)
r.RemoteAddr = "10.0.0.6:43210"
r = r.WithContext(context.WithValue(r.Context(), http.LocalAddrContextKey, &net.TCPAddr{IP: net.ParseIP("10.0.0.5"), Port: 9001}))
assert.False(t, IsLocalRequest(r))
}
+127 -12
View File
@@ -2,8 +2,10 @@ package libprometheus
import (
"fmt"
"net/url"
"os"
"path/filepath"
"sort"
"strings"
)
@@ -15,23 +17,136 @@ global:
scrape_configs:
- job_name: '%s'
static_configs:
scheme: '%s'
metrics_path: '%s'
%s static_configs:
- targets: ['%s']
%s
`
// WritePrometheusConfig writes a Prometheus config.yaml to dataDir.
// jobName identifies the scrape job (e.g. "portainer" or "edge-agent").
// target is the scrape target address.
func WritePrometheusConfig(dataDir string, scrapeInterval string, jobName string, target string) error {
configDir := filepath.Join(dataDir, "prometheus")
if err := os.MkdirAll(configDir, 0o750); err != nil {
const alertingTemplate = `alerting:
alertmanagers:
- scheme: '%s'
path_prefix: '%s'
api_version: 'v2'
static_configs:
- targets: ['%s']
%s
`
type PrometheusConfigOptions struct {
ScrapeInterval string
JobName string
ScrapeTarget string
AlertmanagerTarget string
AlertmanagerHeaders map[string]string
InsecureSkipVerify bool
}
// WritePrometheusConfig writes a Prometheus config.yaml to dataDir/config.yaml.
// ScrapeTarget is a full scrape URL (e.g. "http://localhost:9001/api/metrics").
// AlertmanagerTarget is an optional base URL used by Prometheus notifier
// (e.g. "https://portainer.example/api/endpoints/1/edge/alerts").
func WritePrometheusConfig(dataDir string, options PrometheusConfigOptions) error {
if err := os.MkdirAll(dataDir, 0o750); err != nil {
return fmt.Errorf("create prometheus config dir: %w", err)
}
// Escape single quotes to prevent YAML injection in interpolated values.
escapeYAMLSingleQuote := func(s string) string { return strings.ReplaceAll(s, "'", "''") }
content := fmt.Sprintf(configTemplate, scrapeInterval, scrapeInterval, escapeYAMLSingleQuote(jobName), escapeYAMLSingleQuote(target))
configPath := filepath.Join(configDir, "config.yaml")
scheme, metricsPath, hostPort, err := parseTarget(options.ScrapeTarget, "/metrics")
if err != nil {
return fmt.Errorf("parse scrape target: %w", err)
}
return os.WriteFile(configPath, []byte(content), 0o600)
escape := func(s string) string { return strings.ReplaceAll(s, "'", "''") }
alertingSection, err := renderAlertingSection(options.AlertmanagerTarget, options.AlertmanagerHeaders, options.InsecureSkipVerify, escape)
if err != nil {
return err
}
scrapeTLSBlock := ""
if scheme == "https" && options.InsecureSkipVerify {
scrapeTLSBlock = " tls_config:\n insecure_skip_verify: true\n"
}
content := fmt.Sprintf(configTemplate,
options.ScrapeInterval, options.ScrapeInterval,
escape(options.JobName),
escape(scheme),
escape(metricsPath),
scrapeTLSBlock,
escape(hostPort),
alertingSection,
)
return os.WriteFile(filepath.Join(dataDir, "config.yaml"), []byte(content), 0o600)
}
// parseTarget splits a full URL into its scheme, path, and host:port components
// for use in a Prometheus static_configs entry.
func parseTarget(target string, defaultPath string) (scheme, parsedPath, hostPort string, err error) {
u, err := url.Parse(target)
if err != nil {
return "", "", "", err
}
scheme = u.Scheme
if scheme == "" {
scheme = "http"
}
parsedPath = u.Path
if parsedPath == "" {
parsedPath = defaultPath
}
hostPort = u.Host
if hostPort == "" {
hostPort = target // fallback: treat the whole string as host:port
}
return scheme, parsedPath, hostPort, nil
}
func renderAlertingSection(target string, headers map[string]string, insecureSkipVerify bool, escape func(string) string) (string, error) {
if target == "" {
return "", nil
}
scheme, pathPrefix, hostPort, err := parseTarget(target, "")
if err != nil {
return "", fmt.Errorf("parse alertmanager target: %w", err)
}
tlsBlock := ""
if insecureSkipVerify {
tlsBlock = " tls_config:\n insecure_skip_verify: true\n"
}
headersBlock := renderHTTPHeaders(headers, escape, " ")
return fmt.Sprintf(alertingTemplate,
escape(scheme),
escape(pathPrefix),
escape(hostPort),
tlsBlock+headersBlock,
), nil
}
func renderHTTPHeaders(headers map[string]string, escape func(string) string, indent string) string {
if len(headers) == 0 {
return ""
}
keys := make([]string, 0, len(headers))
for name := range headers {
keys = append(keys, name)
}
sort.Strings(keys)
var builder strings.Builder
_, _ = fmt.Fprintf(&builder, "%shttp_headers:\n", indent)
for _, name := range keys {
_, _ = fmt.Fprintf(&builder, "%s %s:\n", indent, name)
_, _ = fmt.Fprintf(&builder, "%s values: ['%s']\n", indent, escape(headers[name]))
}
return builder.String()
}
+123
View File
@@ -0,0 +1,123 @@
package libprometheus_test
import (
"testing"
"time"
libprom "github.com/portainer/portainer/pkg/libprometheus"
prometheusreg "github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestWritePrometheusConfigIncludesAlertmanagerSettings(t *testing.T) {
dataDir := t.TempDir()
err := libprom.WritePrometheusConfig(dataDir, libprom.PrometheusConfigOptions{
ScrapeInterval: "60s",
JobName: "edge-agent",
ScrapeTarget: "http://localhost:9001/api/metrics",
AlertmanagerTarget: "https://portainer.example/api/endpoints/7/edge/alerts",
AlertmanagerHeaders: map[string]string{
"X-PortainerAgent-EdgeID": "edge-id-7",
},
})
require.NoError(t, err)
cfg, err := libprom.LoadPrometheusConfig(dataDir)
require.NoError(t, err)
require.Len(t, cfg.ScrapeConfigs, 1)
assert.Equal(t, "edge-agent", cfg.ScrapeConfigs[0].JobName)
assert.Equal(t, model.Duration(60*time.Second), cfg.GlobalConfig.ScrapeInterval)
require.Len(t, cfg.AlertingConfig.AlertmanagerConfigs, 1)
amCfg := cfg.AlertingConfig.AlertmanagerConfigs[0]
assert.Equal(t, "https", amCfg.Scheme)
assert.Equal(t, "/api/endpoints/7/edge/alerts", amCfg.PathPrefix)
require.NotNil(t, amCfg.HTTPClientConfig.HTTPHeaders)
require.Contains(t, amCfg.HTTPClientConfig.HTTPHeaders.Headers, "X-PortainerAgent-EdgeID")
assert.Equal(t, []string{"edge-id-7"}, amCfg.HTTPClientConfig.HTTPHeaders.Headers["X-PortainerAgent-EdgeID"].Values)
notifierMgr, tsets, err := libprom.NewNotifierManagerFromConfig(cfg, prometheusreg.NewRegistry())
require.NoError(t, err)
require.NotNil(t, notifierMgr)
tset := <-tsets
require.Contains(t, tset, "config-0")
require.Len(t, tset["config-0"], 1)
require.Len(t, tset["config-0"][0].Targets, 1)
assert.Equal(t, model.LabelValue("portainer.example"), tset["config-0"][0].Targets[0][model.AddressLabel])
}
func TestWritePrometheusConfigInsecureSkipVerify(t *testing.T) {
dataDir := t.TempDir()
err := libprom.WritePrometheusConfig(dataDir, libprom.PrometheusConfigOptions{
ScrapeInterval: "60s",
JobName: "edge-agent",
ScrapeTarget: "http://10.244.0.15:9001/api/metrics",
AlertmanagerTarget: "https://192.168.68.52:9443/api/endpoints/9/edge/alerts",
InsecureSkipVerify: true,
})
require.NoError(t, err)
cfg, err := libprom.LoadPrometheusConfig(dataDir)
require.NoError(t, err)
require.Len(t, cfg.AlertingConfig.AlertmanagerConfigs, 1)
amCfg := cfg.AlertingConfig.AlertmanagerConfigs[0]
assert.True(t, amCfg.HTTPClientConfig.TLSConfig.InsecureSkipVerify)
}
func TestWritePrometheusConfigHTTPSScrapeWithInsecureSkipVerify(t *testing.T) {
dataDir := t.TempDir()
err := libprom.WritePrometheusConfig(dataDir, libprom.PrometheusConfigOptions{
ScrapeInterval: "15s",
JobName: "portainer",
ScrapeTarget: "https://127.0.0.1:9443/api/metrics",
AlertmanagerTarget: "http://127.0.0.1:9093",
InsecureSkipVerify: true,
})
require.NoError(t, err)
cfg, err := libprom.LoadPrometheusConfig(dataDir)
require.NoError(t, err)
require.Len(t, cfg.ScrapeConfigs, 1)
scrapeCfg := cfg.ScrapeConfigs[0]
assert.Equal(t, "https", scrapeCfg.Scheme)
assert.True(t, scrapeCfg.HTTPClientConfig.TLSConfig.InsecureSkipVerify)
}
func TestBootstrapManagerSetBuildsInitialScrapeAndNotifierTargets(t *testing.T) {
dataDir := t.TempDir()
reg := prometheusreg.NewRegistry()
db, err := libprom.NewInMemoryTSDB(reg)
require.NoError(t, err)
t.Cleanup(func() { require.NoError(t, db.Close()) })
managers, err := libprom.BootstrapManagerSet(dataDir, libprom.PrometheusConfigOptions{
ScrapeInterval: "60s",
JobName: "edge-agent",
ScrapeTarget: "http://localhost:9001/api/metrics",
AlertmanagerTarget: "https://portainer.example/api/endpoints/7/edge/alerts",
}, db, reg)
require.NoError(t, err)
require.NotNil(t, managers.ScrapeManager)
require.NotNil(t, managers.NotifierManager)
scrapeTset := <-managers.ScrapeTargetSets
require.Contains(t, scrapeTset, "edge-agent")
require.Len(t, scrapeTset["edge-agent"], 1)
require.Len(t, scrapeTset["edge-agent"][0].Targets, 1)
assert.Equal(t, model.LabelValue("localhost:9001"), scrapeTset["edge-agent"][0].Targets[0][model.AddressLabel])
notifyTset := <-managers.NotifierTargetSets
require.Contains(t, notifyTset, "config-0")
require.Len(t, notifyTset["config-0"], 1)
require.Len(t, notifyTset["config-0"][0].Targets, 1)
assert.Equal(t, model.LabelValue("portainer.example"), notifyTset["config-0"][0].Targets[0][model.AddressLabel])
}
+56
View File
@@ -0,0 +1,56 @@
package libprometheus
import (
"fmt"
prometheusreg "github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/prometheus/discovery/targetgroup"
"github.com/prometheus/prometheus/notifier"
"github.com/prometheus/prometheus/scrape"
"github.com/prometheus/prometheus/storage"
)
// ManagerSet bundles the Prometheus managers and initial target sets built
// from config.yaml.
type ManagerSet struct {
ScrapeManager *scrape.Manager
ScrapeTargetSets chan map[string][]*targetgroup.Group
NotifierManager *notifier.Manager
NotifierTargetSets chan map[string][]*targetgroup.Group
}
// BootstrapManagerSet writes config.yaml, loads it back, and builds the scrape
// and notifier managers from that file so config.yaml remains the single source
// of truth for runtime configuration.
func BootstrapManagerSet(
dataDir string,
options PrometheusConfigOptions,
appendable storage.Appendable,
reg prometheusreg.Registerer,
) (*ManagerSet, error) {
if err := WritePrometheusConfig(dataDir, options); err != nil {
return nil, fmt.Errorf("write prometheus config: %w", err)
}
cfg, err := LoadPrometheusConfig(dataDir)
if err != nil {
return nil, fmt.Errorf("load prometheus config: %w", err)
}
scrapeManager, scrapeTargetSets, err := NewScrapeManagerFromConfig(cfg, appendable, reg)
if err != nil {
return nil, fmt.Errorf("create scrape manager: %w", err)
}
notifierManager, notifierTargetSets, err := NewNotifierManagerFromConfig(cfg, reg)
if err != nil {
return nil, fmt.Errorf("create notifier manager: %w", err)
}
return &ManagerSet{
ScrapeManager: scrapeManager,
ScrapeTargetSets: scrapeTargetSets,
NotifierManager: notifierManager,
NotifierTargetSets: notifierTargetSets,
}, nil
}
+42
View File
@@ -0,0 +1,42 @@
package libprometheus
import (
"fmt"
prometheusreg "github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/discovery"
"github.com/prometheus/prometheus/discovery/targetgroup"
"github.com/prometheus/prometheus/notifier"
)
// NewNotifierManagerFromConfig creates a notifier.Manager from a loaded
// Prometheus config. It also returns a pre-populated target-sets channel ready
// to pass to Manager.Run() using the config's static alertmanager targets.
func NewNotifierManagerFromConfig(
cfg *config.Config,
reg prometheusreg.Registerer,
) (*notifier.Manager, chan map[string][]*targetgroup.Group, error) {
mgr := notifier.NewManager(
&notifier.Options{
QueueCapacity: 10000,
DrainOnShutdown: true,
Registerer: reg,
},
model.LegacyValidation,
NewZerologSlogger(),
)
if err := mgr.ApplyConfig(cfg); err != nil {
return nil, nil, fmt.Errorf("apply prometheus notifier config: %w", err)
}
alertmanagerConfigs := cfg.AlertingConfig.AlertmanagerConfigs.ToMap()
serviceDiscoveryConfigs := make(map[string]discovery.Configs, len(alertmanagerConfigs))
for key, amCfg := range alertmanagerConfigs {
serviceDiscoveryConfigs[key] = amCfg.ServiceDiscoveryConfigs
}
return mgr, newStaticTargetSetsChannel(serviceDiscoveryConfigs), nil
}
+88
View File
@@ -0,0 +1,88 @@
package libprometheus
import (
"fmt"
"path/filepath"
"time"
prometheusreg "github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/discovery"
"github.com/prometheus/prometheus/discovery/targetgroup"
"github.com/prometheus/prometheus/scrape"
"github.com/prometheus/prometheus/storage"
)
// LoadPrometheusConfig reads and parses dataDir/config.yaml using the
// standard Prometheus config loader.
func LoadPrometheusConfig(dataDir string) (*config.Config, error) {
path := filepath.Join(dataDir, "config.yaml")
cfg, err := config.LoadFile(path, false, NewZerologSlogger())
if err != nil {
return nil, fmt.Errorf("load prometheus config %q: %w", path, err)
}
return cfg, nil
}
// NewScrapeManagerFromConfig creates a scrape.Manager from a loaded Prometheus
// config. It also returns a pre-populated target-sets channel ready to pass to
// Manager.Run() — targets are extracted from the config's static_configs so the
// file remains the single source of truth.
func NewScrapeManagerFromConfig(
cfg *config.Config,
appendable storage.Appendable,
reg prometheusreg.Registerer,
) (*scrape.Manager, chan map[string][]*targetgroup.Group, error) {
// DiscoveryReloadInterval defaults to 5s in Prometheus v0.310.0+.
// For a single static target this is just the delay before the first scrape;
// cap it at 1s so startup latency stays acceptable regardless of the
// configured scrape interval.
reloadInterval := time.Duration(cfg.GlobalConfig.ScrapeInterval)
if reloadInterval == 0 || reloadInterval > time.Second {
reloadInterval = time.Second
}
opts := &scrape.Options{
DiscoveryReloadInterval: model.Duration(reloadInterval),
}
mgr, err := scrape.NewManager(opts, NewZerologSlogger(), nil, appendable, nil, reg)
if err != nil {
return nil, nil, fmt.Errorf("create scrape manager: %w", err)
}
if err := mgr.ApplyConfig(cfg); err != nil {
return nil, nil, fmt.Errorf("apply prometheus config: %w", err)
}
serviceDiscoveryConfigs := make(map[string]discovery.Configs, len(cfg.ScrapeConfigs))
for _, sc := range cfg.ScrapeConfigs {
serviceDiscoveryConfigs[sc.JobName] = sc.ServiceDiscoveryConfigs
}
return mgr, newStaticTargetSetsChannel(serviceDiscoveryConfigs), nil
}
// newStaticTargetSetsChannel builds the initial target-set payload for manager
// startup from static discovery configs only.
func newStaticTargetSetsChannel(serviceDiscoveryConfigs map[string]discovery.Configs) chan map[string][]*targetgroup.Group {
tsets := make(map[string][]*targetgroup.Group, len(serviceDiscoveryConfigs))
for key, configs := range serviceDiscoveryConfigs {
tsets[key] = staticTargetGroups(configs)
}
ch := make(chan map[string][]*targetgroup.Group, 1)
ch <- tsets
return ch
}
func staticTargetGroups(serviceDiscoveryConfigs discovery.Configs) []*targetgroup.Group {
var groups []*targetgroup.Group
for _, sdCfg := range serviceDiscoveryConfigs {
if static, ok := sdCfg.(discovery.StaticConfig); ok {
groups = append(groups, static...)
}
}
return groups
}