feat(queues): apply Confirmation Threshold in write path and persist raw_status (#755)

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
This commit is contained in:
Raj Nandan Sharma
2026-06-13 18:53:11 +05:30
parent 850ebae11a
commit ab527ff7d8
3 changed files with 34 additions and 1 deletions
@@ -92,6 +92,7 @@ interface MonitoringDataInput {
latency?: number;
type: string;
error_message?: string | null;
raw_status?: string | null;
}
interface InterpolatedDataEntry {
@@ -112,6 +113,7 @@ export const InsertMonitoringData = async (data: MonitoringDataInput): Promise<M
latency: data.latency || 0,
type: data.type,
error_message: data.error_message,
raw_status: data.raw_status,
});
};
@@ -7,6 +7,7 @@ import { GetMinuteStartNowTimestampUTC } from "../tool.js";
import db from "../db/db.js";
import monitorResponseQueue from "./monitorResponseQueue";
import GC from "../../global-constants.js";
import { resolveConfirmedStatus } from "../services/confirmationThreshold.js";
let monitorExecuteQueue: Queue | null = null;
let worker: Worker | null = null;
@@ -118,6 +119,25 @@ const addWorker = () => {
let realtimeData: MonitoringResultTS = {};
if (exeResult) {
realtimeData[ts] = exeResult;
// Always record what the check actually observed (forensics + grace counting).
realtimeData[ts].raw_status = exeResult.status;
// Confirmation Threshold damping (#712 / ADR 0009): scheduled checks only.
const threshold = Number(monitor.confirmation_threshold ?? 1);
const isScheduledCheck = ([GC.REALTIME, GC.TIMEOUT, GC.ERROR] as string[]).indexOf(exeResult.type) !== -1;
if (threshold > 1 && isScheduledCheck) {
const resolved = await resolveConfirmedStatus({
monitor_tag: monitor.tag,
ts,
rawStatus: exeResult.status,
threshold,
});
realtimeData[ts].status = resolved.status;
if (resolved.pendingHold) {
realtimeData[ts].latency = 0;
delete realtimeData[ts].error_message;
}
}
}
let incidentData: MonitoringResultTS = await manualIncident(monitor);
@@ -186,6 +206,15 @@ const addWorker = () => {
}
}
// Preserve raw_status from realtime monitoring (overlays replace the merged object wholesale,
// so re-attach the observed value the resolver recorded).
for (const timestamp in mergedData) {
const ts = parseInt(timestamp);
if (realtimeData[ts]?.raw_status !== undefined) {
mergedData[ts].raw_status = realtimeData[ts].raw_status;
}
}
for (const timestamp in mergedData) {
monitorResponseQueue.push(monitor.tag, parseInt(timestamp), mergedData[timestamp]);
}
@@ -17,6 +17,7 @@ interface JobData {
monitorTag: string;
ts: number;
error_message?: string | null;
raw_status?: string | null;
}
const getQueue = () => {
@@ -30,7 +31,7 @@ const addWorker = () => {
if (worker) return worker;
worker = q.createWorker(getQueue(), async (job: Job): Promise<MonitoringData | null> => {
const { monitorTag, ts, status, latency, type, error_message } = job.data as JobData;
const { monitorTag, ts, status, latency, type, error_message, raw_status } = job.data as JobData;
const dbRes = await InsertMonitoringData({
monitor_tag: monitorTag,
@@ -39,6 +40,7 @@ const addWorker = () => {
latency: latency,
type: type,
error_message: error_message,
raw_status: raw_status,
});
if (!dbRes) {