Fix ghost connection goroutine leak via write deadline and TCP keepalive

Connections closed by the kernel but not detected by endlessh-go cause
goroutines to run indefinitely, drifting open/closed Prometheus counters.
This happens because conn.Write() succeeds on dead connections when the
kernel buffers data. Kernel 6.12 (Debian 13) is more aggressive at
buffering, making ghosts permanent rather than self-healing.

Changes:
- Add SetWriteDeadline before conn.Write to detect dead connections
- Enable TCP keepalive (30s) on accepted connections for kernel-level
  dead peer detection
- Add defer/recover in send goroutine for robustness
- Add -write_deadline_ms flag (default 30000, 0 to disable)

No new dependencies - uses only Go stdlib net package functions.
This commit is contained in:
darkwolf
2026-03-02 19:14:07 +01:00
parent 3fd26b15db
commit ecdfc514d0
2 changed files with 38 additions and 16 deletions
+26 -13
View File
@@ -41,28 +41,35 @@ func randStringBytes(n int64) []byte {
}
type Client struct {
conn net.Conn
next time.Time
start time.Time
last time.Time
interval time.Duration
bytesSent int
conn net.Conn
next time.Time
start time.Time
last time.Time
interval time.Duration
writeDeadline time.Duration
bytesSent int
}
func NewClient(conn net.Conn, interval time.Duration, maxClients int64) *Client {
func NewClient(conn net.Conn, interval time.Duration, maxClients int64, writeDeadline time.Duration) *Client {
for numCurrentClients >= maxClients {
time.Sleep(interval)
}
atomic.AddInt64(&numCurrentClients, 1)
// Enable TCP keepalive to detect dead peers at the kernel level.
if tcpConn, ok := conn.(*net.TCPConn); ok {
tcpConn.SetKeepAlive(true)
tcpConn.SetKeepAlivePeriod(30 * time.Second)
}
addr := conn.RemoteAddr().(*net.TCPAddr)
glog.V(1).Infof("ACCEPT host=%v port=%v n=%v/%v\n", addr.IP, addr.Port, numCurrentClients, maxClients)
return &Client{
conn: conn,
next: time.Now().Add(interval),
start: time.Now(),
last: time.Now(),
interval: interval,
bytesSent: 0,
conn: conn,
next: time.Now().Add(interval),
start: time.Now(),
last: time.Now(),
interval: interval,
writeDeadline: writeDeadline,
bytesSent: 0,
}
}
@@ -80,6 +87,12 @@ func (c *Client) Send(bannerMaxLength int64) (int, error) {
}
c.next = time.Now().Add(c.interval)
length := rand.Int63n(bannerMaxLength)
// Set a write deadline to detect dead connections where the kernel
// buffers data but the remote peer is gone. Without this, Write()
// can succeed indefinitely on dead connections, causing goroutine leaks.
if c.writeDeadline > 0 {
c.conn.SetWriteDeadline(time.Now().Add(c.writeDeadline))
}
bytesSent, err := c.conn.Write(randStringBytes(length))
if err != nil {
return 0, err
+12 -3
View File
@@ -41,6 +41,13 @@ func startSending(maxClients int64, bannerMaxLength int64, records chan<- metric
return
}
go func() {
// Recover from unexpected panics to ensure client cleanup.
defer func() {
if r := recover(); r != nil {
glog.Errorf("Recovered panic in send goroutine: %v", r)
c.Close()
}
}()
bytesSent, err := c.Send(bannerMaxLength)
remoteIpAddr := c.RemoteIpAddr()
localPort := c.LocalPort()
@@ -69,7 +76,7 @@ func startSending(maxClients int64, bannerMaxLength int64, records chan<- metric
return clients
}
func startAccepting(maxClients int64, connType, connHost, connPort string, interval time.Duration, clients chan<- *client.Client, records chan<- metrics.RecordEntry, proxyProtocolEnabled bool, proxyProtocolReadHeaderTimeout int) {
func startAccepting(maxClients int64, connType, connHost, connPort string, interval time.Duration, writeDeadline time.Duration, clients chan<- *client.Client, records chan<- metrics.RecordEntry, proxyProtocolEnabled bool, proxyProtocolReadHeaderTimeout int) {
go func() {
connPortInt, err := strconv.Atoi(connPort)
if err != nil {
@@ -102,7 +109,7 @@ func startAccepting(maxClients int64, connType, connHost, connPort string, inter
glog.Errorf("Error accepting connection from port %v: %v", connPort, err)
os.Exit(1)
}
c := client.NewClient(conn, interval, maxClients)
c := client.NewClient(conn, interval, maxClients, writeDeadline)
remoteIpAddr := c.RemoteIpAddr()
records <- metrics.RecordEntry{
RecordType: metrics.RecordEntryTypeStart,
@@ -143,6 +150,7 @@ func main() {
prometheusCleanUnseenSeconds := flag.Int("prometheus_clean_unseen_seconds", 0, "Remove series if the IP is not seen for the given time. Set to 0 to disable. (default 0)")
geoipSupplier := flag.String("geoip_supplier", "off", "Supplier to obtain Geohash of IPs. Possible values are \"off\", \"ip-api\", \"max-mind-db\"")
maxMindDbFileName := flag.String("max_mind_db", "", "Path to the MaxMind DB file.")
writeDeadlineMs := flag.Int("write_deadline_ms", 30000, "Write deadline in milliseconds for sending tarpit data. Detects dead connections where the kernel buffers data but the remote peer is gone. Set to 0 to disable. (default 30000)")
proxyProtocolEnabled := flag.Bool("proxy_protocol_enabled", false, "Enable PROXY protocol support. This causes the server to expect PROXY protocol headers on incoming connections.")
proxyProtocolReadHeaderTimeout := flag.Int("proxy_protocol_read_header_timeout_ms", 200, "Timeout for reading the PROXY protocol header in milliseconds. If the connection does not send a valid PROXY protocol header in this time, the header is ignored.")
@@ -167,6 +175,7 @@ func main() {
clients := startSending(*maxClients, *bannerMaxLength, records)
interval := time.Duration(*intervalMs) * time.Millisecond
writeDeadline := time.Duration(*writeDeadlineMs) * time.Millisecond
// Listen for incoming connections.
if *connType == "tcp6" && *connHost == "0.0.0.0" {
*connHost = "[::]"
@@ -175,7 +184,7 @@ func main() {
connPorts = append(connPorts, defaultPort)
}
for _, connPort := range connPorts {
startAccepting(*maxClients, *connType, *connHost, connPort, interval, clients, records, *proxyProtocolEnabled, *proxyProtocolReadHeaderTimeout)
startAccepting(*maxClients, *connType, *connHost, connPort, interval, writeDeadline, clients, records, *proxyProtocolEnabled, *proxyProtocolReadHeaderTimeout)
}
for {
if *prometheusCleanUnseenSeconds <= 0 {