mirror of
https://github.com/shizunge/endlessh-go.git
synced 2026-06-23 04:10:08 +00:00
clean metrics, remove series if the ip is not seen for a while.
This commit is contained in:
@@ -73,6 +73,8 @@ Usage of ./endlessh-go
|
||||
Path to the MaxMind DB file.
|
||||
-port value
|
||||
SSH listening port. You may provide multiple -port flags to listen to multiple ports. (default "2222")
|
||||
-prometheus_clean_unseen_seconds int
|
||||
Remove series if the IP is not seen for the given time. Set to 0 to disable. (default 0)
|
||||
-prometheus_entry string
|
||||
Entry point for prometheus (default "metrics")
|
||||
-prometheus_host string
|
||||
|
||||
@@ -121,6 +121,7 @@ func main() {
|
||||
prometheusHost := flag.String("prometheus_host", "0.0.0.0", "The address for prometheus")
|
||||
prometheusPort := flag.String("prometheus_port", "2112", "The port for prometheus")
|
||||
prometheusEntry := flag.String("prometheus_entry", "metrics", "Entry point for prometheus")
|
||||
prometheusCleanUnseenSeconds := flag.Int("prometheus_clean_unseen_seconds", 0, "Remove series if the IP is not seen for the given time. Set to 0 to disable. (default 0)")
|
||||
geoipSupplier := flag.String("geoip_supplier", "off", "Supplier to obtain Geohash of IPs. Possible values are \"off\", \"ip-api\", \"max-mind-db\"")
|
||||
maxMindDbFileName := flag.String("max_mind_db", "", "Path to the MaxMind DB file.")
|
||||
|
||||
@@ -137,10 +138,11 @@ func main() {
|
||||
metrics.InitPrometheus(*prometheusHost, *prometheusPort, *prometheusEntry)
|
||||
}
|
||||
|
||||
records := metrics.StartRecording(*maxClients, *prometheusEnabled, geoip.GeoOption{
|
||||
GeoipSupplier: *geoipSupplier,
|
||||
MaxMindDbFileName: *maxMindDbFileName,
|
||||
})
|
||||
records := metrics.StartRecording(*maxClients, *prometheusEnabled, *prometheusCleanUnseenSeconds,
|
||||
geoip.GeoOption{
|
||||
GeoipSupplier: *geoipSupplier,
|
||||
MaxMindDbFileName: *maxMindDbFileName,
|
||||
})
|
||||
clients := startSending(*maxClients, *bannerMaxLength, records)
|
||||
|
||||
interval := time.Duration(*intervalMs) * time.Millisecond
|
||||
@@ -155,6 +157,13 @@ func main() {
|
||||
startAccepting(*maxClients, *connType, *connHost, connPort, interval, clients, records)
|
||||
}
|
||||
for {
|
||||
time.Sleep(time.Duration(1<<63 - 1))
|
||||
if *prometheusCleanUnseenSeconds <= 0 {
|
||||
time.Sleep(time.Duration(1<<63 - 1))
|
||||
} else {
|
||||
time.Sleep(time.Second * time.Duration(60))
|
||||
records <- metrics.RecordEntry{
|
||||
RecordType: metrics.RecordEntryTypeClean,
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
+22
-2
@@ -19,6 +19,8 @@ package metrics
|
||||
import (
|
||||
"endlessh-go/geoip"
|
||||
"net/http"
|
||||
"os"
|
||||
"time"
|
||||
|
||||
"github.com/golang/glog"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
@@ -26,6 +28,7 @@ import (
|
||||
)
|
||||
|
||||
var (
|
||||
pq *UpdatablePriorityQueue
|
||||
totalClients *prometheus.CounterVec
|
||||
totalClientsClosed *prometheus.CounterVec
|
||||
totalBytes *prometheus.CounterVec
|
||||
@@ -35,6 +38,7 @@ var (
|
||||
)
|
||||
|
||||
func InitPrometheus(prometheusHost, prometheusPort, prometheusEntry string) {
|
||||
pq = NewUpdatablePriorityQueue()
|
||||
totalClients = prometheus.NewCounterVec(
|
||||
prometheus.CounterOpts{
|
||||
Name: "endlessh_client_open_count_total",
|
||||
@@ -84,7 +88,10 @@ func InitPrometheus(prometheusHost, prometheusPort, prometheusEntry string) {
|
||||
http.Handle("/"+prometheusEntry, handler)
|
||||
go func() {
|
||||
glog.Infof("Starting Prometheus on %v:%v, entry point is /%v", prometheusHost, prometheusPort, prometheusEntry)
|
||||
http.ListenAndServe(prometheusHost+":"+prometheusPort, nil)
|
||||
if err := http.ListenAndServe(prometheusHost+":"+prometheusPort, nil); err != nil {
|
||||
glog.Errorf("Error starting Prometheus at port %v:%v: %v", prometheusHost, prometheusPort, err)
|
||||
os.Exit(1)
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
@@ -92,6 +99,7 @@ const (
|
||||
RecordEntryTypeStart = iota
|
||||
RecordEntryTypeSend = iota
|
||||
RecordEntryTypeStop = iota
|
||||
RecordEntryTypeClean = iota
|
||||
)
|
||||
|
||||
type RecordEntry struct {
|
||||
@@ -102,7 +110,7 @@ type RecordEntry struct {
|
||||
MillisecondsSpent int64
|
||||
}
|
||||
|
||||
func StartRecording(maxClients int64, prometheusEnabled bool, geoOption geoip.GeoOption) chan RecordEntry {
|
||||
func StartRecording(maxClients int64, prometheusEnabled bool, prometheusCleanUnseenSeconds int, geoOption geoip.GeoOption) chan RecordEntry {
|
||||
records := make(chan RecordEntry, maxClients)
|
||||
go func() {
|
||||
for {
|
||||
@@ -126,6 +134,7 @@ func StartRecording(maxClients int64, prometheusEnabled bool, geoOption geoip.Ge
|
||||
"country": country,
|
||||
"location": location}).Inc()
|
||||
totalClients.With(prometheus.Labels{"local_port": r.LocalPort}).Inc()
|
||||
pq.Update(r.IpAddr, time.Now())
|
||||
case RecordEntryTypeSend:
|
||||
secondsSpent := float64(r.MillisecondsSpent) / 1000
|
||||
clientSeconds.With(prometheus.Labels{
|
||||
@@ -133,8 +142,19 @@ func StartRecording(maxClients int64, prometheusEnabled bool, geoOption geoip.Ge
|
||||
"local_port": r.LocalPort}).Add(secondsSpent)
|
||||
totalBytes.With(prometheus.Labels{"local_port": r.LocalPort}).Add(float64(r.BytesSent))
|
||||
totalSeconds.With(prometheus.Labels{"local_port": r.LocalPort}).Add(secondsSpent)
|
||||
pq.Update(r.IpAddr, time.Now())
|
||||
case RecordEntryTypeStop:
|
||||
totalClientsClosed.With(prometheus.Labels{"local_port": r.LocalPort}).Inc()
|
||||
pq.Update(r.IpAddr, time.Now())
|
||||
case RecordEntryTypeClean:
|
||||
top := pq.Peek()
|
||||
deadline := time.Now().Add(-time.Second * time.Duration(prometheusCleanUnseenSeconds))
|
||||
for top != nil && top.Value.Before(deadline) {
|
||||
clientIP.DeletePartialMatch(prometheus.Labels{"ip": top.Key})
|
||||
clientSeconds.DeletePartialMatch(prometheus.Labels{"ip": top.Key})
|
||||
pq.Pop()
|
||||
top = pq.Peek()
|
||||
}
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
@@ -0,0 +1,94 @@
|
||||
package metrics
|
||||
|
||||
import (
|
||||
"container/heap"
|
||||
"time"
|
||||
)
|
||||
|
||||
// Pair represents a key-value pair with a timestamp
|
||||
type Pair struct {
|
||||
Key string
|
||||
Value time.Time
|
||||
HeapIdx int // Index in the heap for efficient updates
|
||||
}
|
||||
|
||||
// PriorityQueue is a min-heap implementation for Pairs
|
||||
type PriorityQueue []*Pair
|
||||
|
||||
// Len returns the length of the priority queue
|
||||
func (pq PriorityQueue) Len() int { return len(pq) }
|
||||
|
||||
// Less compares two pairs based on their values (timestamps)
|
||||
func (pq PriorityQueue) Less(i, j int) bool {
|
||||
return pq[i].Value.Before(pq[j].Value)
|
||||
}
|
||||
|
||||
// Swap swaps two pairs in the priority queue
|
||||
func (pq PriorityQueue) Swap(i, j int) {
|
||||
pq[i], pq[j] = pq[j], pq[i]
|
||||
pq[i].HeapIdx = i
|
||||
pq[j].HeapIdx = j
|
||||
}
|
||||
|
||||
// Push adds a pair to the priority queue
|
||||
func (pq *PriorityQueue) Push(x interface{}) {
|
||||
pair := x.(*Pair)
|
||||
pair.HeapIdx = len(*pq)
|
||||
*pq = append(*pq, pair)
|
||||
}
|
||||
|
||||
// Pop removes the pair with the minimum value (timestamp) from the priority queue
|
||||
func (pq *PriorityQueue) Pop() interface{} {
|
||||
old := *pq
|
||||
n := len(old)
|
||||
pair := old[n-1]
|
||||
pair.HeapIdx = -1 // for safety
|
||||
*pq = old[0 : n-1]
|
||||
return pair
|
||||
}
|
||||
|
||||
// UpdatablePriorityQueue represents the data structure with the priority queue
|
||||
type UpdatablePriorityQueue struct {
|
||||
pq PriorityQueue
|
||||
keyMap map[string]*Pair
|
||||
}
|
||||
|
||||
// NewUpdatablePriorityQueue initializes a new UpdatablePriorityQueue
|
||||
func NewUpdatablePriorityQueue() *UpdatablePriorityQueue {
|
||||
return &UpdatablePriorityQueue{
|
||||
pq: make(PriorityQueue, 0),
|
||||
keyMap: make(map[string]*Pair),
|
||||
}
|
||||
}
|
||||
|
||||
// Update adds or updates a key-value pair in the data structure
|
||||
func (ds *UpdatablePriorityQueue) Update(key string, value time.Time) {
|
||||
if pair, ok := ds.keyMap[key]; ok {
|
||||
// Key exists, update the time
|
||||
pair.Value = value
|
||||
heap.Fix(&ds.pq, pair.HeapIdx)
|
||||
} else {
|
||||
// Key does not exist, create a new entry
|
||||
pair := &Pair{Key: key, Value: value}
|
||||
heap.Push(&ds.pq, pair)
|
||||
ds.keyMap[key] = pair
|
||||
}
|
||||
}
|
||||
|
||||
// Peek returns the entry with the minimal time
|
||||
func (ds *UpdatablePriorityQueue) Peek() *Pair {
|
||||
if ds.pq.Len() == 0 {
|
||||
return nil
|
||||
}
|
||||
return ds.pq[0]
|
||||
}
|
||||
|
||||
// Pop removes the entry with the minimal time
|
||||
func (ds *UpdatablePriorityQueue) Pop() *Pair {
|
||||
if ds.pq.Len() == 0 {
|
||||
return nil
|
||||
}
|
||||
pair := heap.Pop(&ds.pq).(*Pair)
|
||||
delete(ds.keyMap, pair.Key)
|
||||
return pair
|
||||
}
|
||||
Reference in New Issue
Block a user