From ab527ff7d829f41f5d5fa6aff6a0109332b798fb Mon Sep 17 00:00:00 2001 From: Raj Nandan Sharma Date: Sat, 13 Jun 2026 18:53:11 +0530 Subject: [PATCH] feat(queues): apply Confirmation Threshold in write path and persist raw_status (#755) Co-Authored-By: Claude Fable 5 --- .../server/controllers/monitorsController.ts | 2 ++ src/lib/server/queues/monitorExecuteQueue.ts | 29 +++++++++++++++++++ src/lib/server/queues/monitorResponseQueue.ts | 4 ++- 3 files changed, 34 insertions(+), 1 deletion(-) diff --git a/src/lib/server/controllers/monitorsController.ts b/src/lib/server/controllers/monitorsController.ts index 749e9ff7..65dd6de6 100644 --- a/src/lib/server/controllers/monitorsController.ts +++ b/src/lib/server/controllers/monitorsController.ts @@ -92,6 +92,7 @@ interface MonitoringDataInput { latency?: number; type: string; error_message?: string | null; + raw_status?: string | null; } interface InterpolatedDataEntry { @@ -112,6 +113,7 @@ export const InsertMonitoringData = async (data: MonitoringDataInput): Promise { let realtimeData: MonitoringResultTS = {}; if (exeResult) { realtimeData[ts] = exeResult; + // Always record what the check actually observed (forensics + grace counting). + realtimeData[ts].raw_status = exeResult.status; + + // Confirmation Threshold damping (#712 / ADR 0009): scheduled checks only. + const threshold = Number(monitor.confirmation_threshold ?? 1); + const isScheduledCheck = ([GC.REALTIME, GC.TIMEOUT, GC.ERROR] as string[]).indexOf(exeResult.type) !== -1; + if (threshold > 1 && isScheduledCheck) { + const resolved = await resolveConfirmedStatus({ + monitor_tag: monitor.tag, + ts, + rawStatus: exeResult.status, + threshold, + }); + realtimeData[ts].status = resolved.status; + if (resolved.pendingHold) { + realtimeData[ts].latency = 0; + delete realtimeData[ts].error_message; + } + } } let incidentData: MonitoringResultTS = await manualIncident(monitor); @@ -186,6 +206,15 @@ const addWorker = () => { } } + // Preserve raw_status from realtime monitoring (overlays replace the merged object wholesale, + // so re-attach the observed value the resolver recorded). + for (const timestamp in mergedData) { + const ts = parseInt(timestamp); + if (realtimeData[ts]?.raw_status !== undefined) { + mergedData[ts].raw_status = realtimeData[ts].raw_status; + } + } + for (const timestamp in mergedData) { monitorResponseQueue.push(monitor.tag, parseInt(timestamp), mergedData[timestamp]); } diff --git a/src/lib/server/queues/monitorResponseQueue.ts b/src/lib/server/queues/monitorResponseQueue.ts index 817600e9..346f8c6f 100644 --- a/src/lib/server/queues/monitorResponseQueue.ts +++ b/src/lib/server/queues/monitorResponseQueue.ts @@ -17,6 +17,7 @@ interface JobData { monitorTag: string; ts: number; error_message?: string | null; + raw_status?: string | null; } const getQueue = () => { @@ -30,7 +31,7 @@ const addWorker = () => { if (worker) return worker; worker = q.createWorker(getQueue(), async (job: Job): Promise => { - const { monitorTag, ts, status, latency, type, error_message } = job.data as JobData; + const { monitorTag, ts, status, latency, type, error_message, raw_status } = job.data as JobData; const dbRes = await InsertMonitoringData({ monitor_tag: monitorTag, @@ -39,6 +40,7 @@ const addWorker = () => { latency: latency, type: type, error_message: error_message, + raw_status: raw_status, }); if (!dbRes) {