fix: add timeout functionality to kafka (#7472)

Co-authored-by: joaofilipesilva <joao.silva@clearops.com>
This commit is contained in:
João Filipe Silva
2026-06-04 11:36:40 +01:00
committed by GitHub
parent d0a63d56cf
commit 8e27fd1925
5 changed files with 36 additions and 3 deletions
+1
View File
@@ -939,6 +939,7 @@ class Monitor extends BeanModel {
ssl: this.kafkaProducerSsl, ssl: this.kafkaProducerSsl,
clientId: `Uptime-Kuma/${version}`, clientId: `Uptime-Kuma/${version}`,
interval: this.interval, interval: this.interval,
connectionTimeout: this.timeout,
}, },
JSON.parse(this.kafkaProducerSaslOptions) JSON.parse(this.kafkaProducerSaslOptions)
); );
+8 -1
View File
@@ -213,7 +213,13 @@ exports.pingAsync = function (
*/ */
exports.kafkaProducerAsync = function (brokers, topic, message, options = {}, saslOptions = {}) { exports.kafkaProducerAsync = function (brokers, topic, message, options = {}, saslOptions = {}) {
return new Promise((resolve, reject) => { return new Promise((resolve, reject) => {
const { interval = 20, allowAutoTopicCreation = false, ssl = false, clientId = "Uptime-Kuma" } = options; const {
interval = 20,
allowAutoTopicCreation = false,
ssl = false,
clientId = "Uptime-Kuma",
connectionTimeout = 1,
} = options;
let connectedToKafka = false; let connectedToKafka = false;
@@ -238,6 +244,7 @@ exports.kafkaProducerAsync = function (brokers, topic, message, options = {}, sa
retries: 0, retries: 0,
}, },
ssl: ssl, ssl: ssl,
connectionTimeout: connectionTimeout * 1000,
}); });
let producer = client.producer({ let producer = client.producer({
+1
View File
@@ -87,6 +87,7 @@
"Path": "Path", "Path": "Path",
"Heartbeat Interval": "Heartbeat Interval", "Heartbeat Interval": "Heartbeat Interval",
"Request Timeout": "Request Timeout", "Request Timeout": "Request Timeout",
"Connection Timeout": "Connection Timeout",
"timeoutAfter": "Timeout after {0} seconds", "timeoutAfter": "Timeout after {0} seconds",
"Retries": "Retries", "Retries": "Retries",
"Heartbeat Retry Interval": "Heartbeat Retry Interval", "Heartbeat Retry Interval": "Heartbeat Retry Interval",
+11 -2
View File
@@ -1499,12 +1499,19 @@
monitor.type === 'ping' || monitor.type === 'ping' ||
monitor.type === 'rabbitmq' || monitor.type === 'rabbitmq' ||
monitor.type === 'snmp' || monitor.type === 'snmp' ||
monitor.type === 'websocket-upgrade' monitor.type === 'websocket-upgrade' ||
monitor.type === 'kafka-producer'
" "
class="my-3" class="my-3"
> >
<label for="timeout" class="form-label"> <label for="timeout" class="form-label">
{{ monitor.type === "ping" ? $t("pingGlobalTimeoutLabel") : $t("Request Timeout") }} {{
monitor.type === "ping"
? $t("pingGlobalTimeoutLabel")
: monitor.type === "kafka-producer"
? $t("Connection Timeout")
: $t("Request Timeout")
}}
<span v-if="monitor.type !== 'ping'"> <span v-if="monitor.type !== 'ping'">
({{ $t("timeoutAfter", [monitor.timeout || clampTimeout(monitor.interval)]) }}) ({{ $t("timeoutAfter", [monitor.timeout || clampTimeout(monitor.interval)]) }})
</span> </span>
@@ -3659,6 +3666,8 @@ message HealthCheckResponse {
this.monitor.timeout = 5; this.monitor.timeout = 5;
} else if (this.monitor.type === "ping") { } else if (this.monitor.type === "ping") {
this.monitor.timeout = 10; this.monitor.timeout = 10;
} else if (this.monitor.type === "kafka-producer") {
this.monitor.timeout = 1;
} else { } else {
this.monitor.timeout = 48; this.monitor.timeout = 48;
} }
@@ -0,0 +1,15 @@
const { describe, test } = require("node:test");
const assert = require("node:assert");
const { kafkaProducerAsync } = require("../../../server/util-server");
describe("Kafka Producer", () => {
test("rejects when broker is not reachable", async () => {
await assert.rejects(
kafkaProducerAsync(["localhost:19092"], "test-topic", "test-message", {
interval: 5,
connectionTimeout: 1,
}),
/.*/ // any error
);
});
});