mirror of
https://github.com/rajnandan1/kener.git
synced 2026-06-23 04:10:22 +00:00
Merge pull request #764 from rajnandan1/fix/db-pool-web-worker-split
fix(database): isolate web and worker connection pools
This commit is contained in:
+39
-12
@@ -17,9 +17,24 @@ const intFromEnv = (name: string, fallback: number): number => {
|
||||
// TCP keepalive on pooled connections, on by default. Cloud networks (Railway,
|
||||
// Docker Swarm overlays, k8s) silently drop idle TCP connections; without
|
||||
// keepalive the pool keeps handing out dead sockets after an idle period or a
|
||||
// database restart. See docs/adr/0003-fail-fast-self-healing-db-pool.md.
|
||||
// database restart. See docs .../setup/database-setup.md.
|
||||
const keepAliveEnabled = process.env.DATABASE_KEEPALIVE !== "false";
|
||||
|
||||
/**
 * Connection-pool settings handed to knex for a single pool.
 * Both the web pool and the worker pool are described by this shape.
 */
interface PoolConfig {
  /** Minimum number of pooled connections; 0 lets idle connections be reclaimed. */
  min: number;

  /** Maximum number of connections the pool may hold open. */
  max: number;

  /** How long, in milliseconds, a connection may sit idle before being closed. */
  idleTimeoutMillis: number;

  /** How long, in milliseconds, a new connection attempt waits before failing. */
  createTimeoutMillis: number;
}
|
||||
|
||||
// Two pools share one process (Postgres/MySQL only): the WEB pool serves
|
||||
// SvelteKit requests; the WORKER pool serves background jobs (BullMQ workers +
|
||||
// schedulers, routed via src/lib/server/db/poolContext.ts). Isolating them
|
||||
// stops a burst of background jobs from exhausting the connections that serve
|
||||
// page loads. Budget across both pools: replicas * (web + worker) must stay
|
||||
// under the database's max_connections. SQLite has no real pool and reuses a
|
||||
// single connection, so the split does not apply there.
|
||||
//
|
||||
// Pool defaults deviate from knex's on purpose:
|
||||
// - min 0: knex's min 2 connections are never reaped, so they are exactly the
|
||||
// ones that go stale and wedge the app until a manual restart
|
||||
@@ -27,14 +42,17 @@ const keepAliveEnabled = process.env.DATABASE_KEEPALIVE !== "false";
|
||||
// knex's default 60s during a database outage
|
||||
// Tarn requires max >= 1 and min <= max; clamp so a bad env value can not
|
||||
// produce a pool that fails every acquire
|
||||
const poolMax = Math.max(1, intFromEnv("DATABASE_POOL_MAX", 10));
|
||||
const poolMin = Math.min(intFromEnv("DATABASE_POOL_MIN", 0), poolMax);
|
||||
const pool = {
|
||||
min: poolMin,
|
||||
max: poolMax,
|
||||
idleTimeoutMillis: intFromEnv("DATABASE_IDLE_TIMEOUT_MS", 30000),
|
||||
createTimeoutMillis: intFromEnv("DATABASE_CREATE_TIMEOUT_MS", 15000),
|
||||
};
|
||||
const idleTimeoutMillis = intFromEnv("DATABASE_IDLE_TIMEOUT_MS", 30000);
|
||||
const createTimeoutMillis = intFromEnv("DATABASE_CREATE_TIMEOUT_MS", 15000);
|
||||
const poolMin = intFromEnv("DATABASE_POOL_MIN", 0);
|
||||
/**
 * Builds the pool settings for a pool capped at `max` connections.
 *
 * The module-level `poolMin`, `idleTimeoutMillis` and `createTimeoutMillis`
 * values are shared by every pool built here; only the ceiling differs.
 *
 * @param max - Upper bound on open connections for this pool (callers pass >= 1).
 * @returns Pool settings whose `min` is clamped into the range `[0, max]`.
 */
const buildPool = (max: number): PoolConfig => ({
  // Tarn rejects both a negative min and a min above max. Clamp on both sides
  // so a bad DATABASE_POOL_MIN value cannot make pool construction fail.
  min: Math.max(0, Math.min(poolMin, max)),
  max,
  idleTimeoutMillis,
  createTimeoutMillis,
});
|
||||
// Pool ceilings are floored at 1 so a zero or negative env value cannot
// produce a pool that can never hand out a connection.
const webPool = buildPool(
  Math.max(1, intFromEnv("DATABASE_POOL_MAX", 10)),
);
const workerPool = buildPool(
  Math.max(1, intFromEnv("DATABASE_WORKER_POOL_MAX", 5)),
);

// How long (ms) a query waits for a free pooled connection before failing.
const acquireConnectionTimeout = intFromEnv("DATABASE_ACQUIRE_TIMEOUT_MS", 15000);
|
||||
|
||||
interface KnexConfig {
|
||||
@@ -44,7 +62,7 @@ interface KnexConfig {
|
||||
client?: string;
|
||||
connection?: string | { filename: string } | Record<string, unknown>;
|
||||
useNullAsDefault?: boolean;
|
||||
pool?: typeof pool;
|
||||
pool?: PoolConfig;
|
||||
acquireConnectionTimeout?: number;
|
||||
}
|
||||
|
||||
@@ -57,6 +75,12 @@ const knexOb: KnexConfig = {
|
||||
},
|
||||
databaseType,
|
||||
};
|
||||
|
||||
// Worker pool config for Postgres/MySQL — same connection as the web config,
|
||||
// but with the worker pool. Stays null for SQLite (single shared connection),
|
||||
// in which case the app reuses the web instance for background work too.
|
||||
let workerKnexOb: KnexConfig | null = null;
|
||||
|
||||
console.log(`Configuring database with type ${databaseType}`);
|
||||
if (databaseType === "sqlite") {
|
||||
knexOb.client = "better-sqlite3";
|
||||
@@ -70,8 +94,9 @@ if (databaseType === "sqlite") {
|
||||
connectionString: databaseURL,
|
||||
keepAlive: keepAliveEnabled,
|
||||
};
|
||||
knexOb.pool = pool;
|
||||
knexOb.pool = webPool;
|
||||
knexOb.acquireConnectionTimeout = acquireConnectionTimeout;
|
||||
workerKnexOb = { ...knexOb, pool: workerPool };
|
||||
} else if (databaseType === "mysql") {
|
||||
knexOb.client = "mysql2";
|
||||
knexOb.connection = {
|
||||
@@ -79,11 +104,13 @@ if (databaseType === "sqlite") {
|
||||
enableKeepAlive: keepAliveEnabled,
|
||||
keepAliveInitialDelay: 10000,
|
||||
};
|
||||
knexOb.pool = pool;
|
||||
knexOb.pool = webPool;
|
||||
knexOb.acquireConnectionTimeout = acquireConnectionTimeout;
|
||||
workerKnexOb = { ...knexOb, pool: workerPool };
|
||||
} else {
|
||||
console.error("Invalid database type");
|
||||
process.exit(1);
|
||||
}
|
||||
|
||||
export { workerKnexOb };
|
||||
export default knexOb;
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
import DbImpl from "./dbimpl";
|
||||
import knexOb from "../../../../knexfile.js";
|
||||
import knexOb, { workerKnexOb } from "../../../../knexfile.js";
|
||||
|
||||
const instance: DbImpl = new DbImpl(knexOb);
|
||||
const instance: DbImpl = new DbImpl(knexOb, workerKnexOb);
|
||||
export default instance;
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
import Knex from "knex";
|
||||
import type { Knex as KnexType } from "knex";
|
||||
import { runWithWorkerKnex } from "./poolContext.js";
|
||||
|
||||
// Import all repositories
|
||||
import { MonitoringRepository } from "./repositories/monitoring.js";
|
||||
@@ -29,6 +30,9 @@ export type * from "../types/db.js";
|
||||
*/
|
||||
class DbImpl {
|
||||
private knex: KnexType;
|
||||
// Dedicated pool for background jobs (Postgres/MySQL). Equals `knex` when
|
||||
// there is no separate worker pool (e.g. SQLite).
|
||||
private workerKnex: KnexType;
|
||||
|
||||
// Domain repositories
|
||||
private monitoring!: MonitoringRepository;
|
||||
@@ -374,8 +378,11 @@ class DbImpl {
|
||||
deleteEmailTemplate!: EmailTemplateConfigRepository["deleteEmailTemplate"];
|
||||
upsertEmailTemplate!: EmailTemplateConfigRepository["upsertEmailTemplate"];
|
||||
|
||||
constructor(opts: KnexType.Config) {
|
||||
constructor(opts: KnexType.Config, workerOpts?: KnexType.Config | null) {
|
||||
this.knex = Knex(opts);
|
||||
// Separate pool for background jobs when configured (Postgres/MySQL);
|
||||
// otherwise reuse the web pool (SQLite has a single connection).
|
||||
this.workerKnex = workerOpts ? Knex(workerOpts) : this.knex;
|
||||
|
||||
// Initialize repositories
|
||||
this.monitoring = new MonitoringRepository(this.knex);
|
||||
@@ -840,6 +847,15 @@ class DbImpl {
|
||||
|
||||
async init(): Promise<void> {}
|
||||
|
||||
/**
 * Executes `fn` so that repository queries issued inside it go through the
 * worker connection pool instead of the web pool.
 *
 * Intended for background work (BullMQ job processors, schedulers): a burst
 * of jobs then cannot use up the web pool that serves page loads.
 *
 * @param fn - Async callback to run inside the worker-pool context.
 * @returns Whatever `runWithWorkerKnex` returns for `fn`.
 */
runInWorkerContext<T>(fn: () => Promise<T>): Promise<T> {
  const workerPool = this.workerKnex;
  return runWithWorkerKnex(workerPool, fn);
}
|
||||
|
||||
/** Probes database connectivity with a trivial query. Never throws. */
|
||||
async ping(): Promise<boolean> {
|
||||
try {
|
||||
@@ -851,7 +867,10 @@ class DbImpl {
|
||||
}
|
||||
|
||||
/**
 * Tears down the database connection pools.
 *
 * The web pool is destroyed first. The worker pool is destroyed as well when
 * it is a distinct Knex instance (it is the same instance as `knex` when no
 * separate worker pool was configured).
 *
 * @returns A promise that settles once every pool has been asked to shut down.
 * @throws Rejects if a pool fails to shut down; when both fail, the worker
 *   pool's error is the one surfaced.
 */
async close(): Promise<void> {
  try {
    await this.knex.destroy();
  } finally {
    // Run the worker teardown even if the web pool's destroy() rejected,
    // so a failure on one pool does not leave the other holding connections.
    if (this.workerKnex !== this.knex) {
      await this.workerKnex.destroy();
    }
  }
}
|
||||
}
|
||||
|
||||
|
||||
@@ -0,0 +1,26 @@
|
||||
import { AsyncLocalStorage } from "node:async_hooks";
|
||||
import type { Knex as KnexType } from "knex";
|
||||
|
||||
// Per-execution-context selection of the database connection pool.
|
||||
//
|
||||
// Kener runs SvelteKit requests, the cron scheduler, and the BullMQ workers in
|
||||
// a single process, all sharing one Knex instance. A burst of background jobs
|
||||
// could therefore exhaust the connection pool and time out user-facing page
|
||||
// loads (KnexTimeoutError on acquire). To prevent that, background work runs
|
||||
// against a dedicated worker pool: queues/q.ts wraps every job processor in
|
||||
// runWithWorkerKnex(), and BaseRepository reads getWorkerKnex() so its queries
|
||||
// route to that pool. Anything outside a job (requests, startup, migrations)
|
||||
// has no store set and falls back to the web pool.
|
||||
//
|
||||
// See knexfile.ts for pool sizing and docs .../setup/database-setup.md.
|
||||
const workerKnexStorage = new AsyncLocalStorage<KnexType>();
|
||||
|
||||
/**
 * Invokes `fn` with `knex` installed as the worker pool for the current
 * async execution context, so `getWorkerKnex()` returns it while `fn` runs.
 *
 * @param knex - Knex instance backing the worker pool.
 * @param fn - Async callback to execute inside that context.
 * @returns The promise produced by `fn`.
 */
export function runWithWorkerKnex<T>(knex: KnexType, fn: () => Promise<T>): Promise<T> {
  const outcome = workerKnexStorage.run(knex, fn);
  return outcome;
}
|
||||
|
||||
/**
 * Looks up the worker pool bound to the current execution context.
 *
 * @returns The Knex instance set by `runWithWorkerKnex`, or `undefined` when
 *   the caller is not running inside such a context.
 */
export function getWorkerKnex(): KnexType | undefined {
  const active = workerKnexStorage.getStore();
  return active;
}
|
||||
@@ -1,4 +1,5 @@
|
||||
import type { Knex as KnexType } from "knex";
|
||||
import { getWorkerKnex } from "../poolContext.js";
|
||||
|
||||
// Filter types for queries
|
||||
export interface MonitorFilter {
|
||||
@@ -35,9 +36,22 @@ export interface CountResult {
|
||||
* Base repository class that provides access to the Knex instance
|
||||
*/
|
||||
export abstract class BaseRepository {
  /**
   * The Knex instance this repository was constructed with. Used whenever no
   * worker-pool context is active.
   */
  private readonly fallbackKnex: KnexType;

  /**
   * @param knex - Knex instance to use outside of a worker-pool context.
   */
  constructor(knex: KnexType) {
    // `knex` is exposed only through the accessor below. Keeping a same-named
    // field as well would be a duplicate identifier, and assigning to it
    // would target a property that has no setter.
    this.fallbackKnex = knex;
  }

  /**
   * The Knex instance for the current execution context.
   *
   * Returns the worker pool when `getWorkerKnex()` reports one for the
   * current context; otherwise returns the instance passed to the
   * constructor. Resolved on every access, so one repository object can
   * serve both kinds of caller.
   */
  protected get knex(): KnexType {
    return getWorkerKnex() ?? this.fallbackKnex;
  }
}
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
import { redisIOConnection } from "../redisConnector.js";
|
||||
import db from "../db/db.js";
|
||||
import {
|
||||
Queue,
|
||||
Worker,
|
||||
@@ -40,7 +41,15 @@ export const createWorker = <T = unknown, R = unknown>(
|
||||
concurrency: 5,
|
||||
...options,
|
||||
};
|
||||
return new Worker<T, R>(queue.name, processor, opts);
|
||||
// Route every job's database access to the worker pool. This is the single
|
||||
// chokepoint all BullMQ workers and schedulers flow through, so wrapping here
|
||||
// isolates background work from the web request pool (see db/poolContext.ts).
|
||||
// Sandboxed (string/URL) processors run out-of-process and pass through.
|
||||
const wrapped: Processor<T, R> =
|
||||
typeof processor === "function"
|
||||
? (job, token) => db.runInWorkerContext(() => Promise.resolve(processor(job, token)))
|
||||
: processor;
|
||||
return new Worker<T, R>(queue.name, wrapped, opts);
|
||||
};
|
||||
|
||||
export default {
|
||||
|
||||
@@ -93,21 +93,27 @@ DATABASE_URL=mysql://kener:password@localhost:3306/kener
|
||||
|
||||
For PostgreSQL and MySQL, Kener ships fail-fast, self-healing pool defaults: no permanently-idle connections, TCP keepalive on, and 15-second connection timeouts. This protects deployments on cloud networks (Railway, Docker Swarm overlays, Kubernetes) that silently drop idle TCP connections, which otherwise causes 500s after idle periods and can require a restart after a database outage.
|
||||
|
||||
Kener uses **two separate pools** so background work cannot starve page loads: a **web pool** (`DATABASE_POOL_MAX`) for HTTP requests and a **worker pool** (`DATABASE_WORKER_POOL_MAX`) for background jobs (monitor checks, alerting, scheduled tasks). A burst of background jobs can only exhaust the worker pool, leaving the web pool free to serve requests.
|
||||
|
||||
Override only if your setup needs it:
|
||||
|
||||
| Variable | Description | Default |
|
||||
| ----------------------------- | --------------------------------------------------------------- | ------- |
|
||||
| `DATABASE_POOL_MIN` | Minimum pool connections (0 lets idle connections be reclaimed) | `0` |
|
||||
| `DATABASE_POOL_MAX` | Maximum pool connections | `10` |
|
||||
| `DATABASE_POOL_MAX` | Max connections for the **web** (HTTP request) pool | `10` |
|
||||
| `DATABASE_WORKER_POOL_MAX` | Max connections for the **worker** (background job) pool | `5` |
|
||||
| `DATABASE_ACQUIRE_TIMEOUT_MS` | How long a query waits for a free connection before failing | `15000` |
|
||||
| `DATABASE_CREATE_TIMEOUT_MS` | How long a new connection attempt waits before failing | `15000` |
|
||||
| `DATABASE_IDLE_TIMEOUT_MS` | How long a connection may sit idle before being closed | `30000` |
|
||||
| `DATABASE_KEEPALIVE` | TCP keepalive on connections (`true`/`false`) | `true` |
|
||||
|
||||
> [!IMPORTANT]
|
||||
> Budget your pools against the database's `max_connections`: `replicas × (DATABASE_POOL_MAX + DATABASE_WORKER_POOL_MAX)` must stay below it. On small managed Postgres tiers (often capped near 20–25 connections), keep the defaults or lower them. Each `GET /` fans out several queries, so a web pool that is too small causes `KnexTimeoutError` under concurrent traffic.
|
||||
|
||||
> [!TIP]
|
||||
> If your database is slow to accept connections (cold starts, cross-region), raise `DATABASE_ACQUIRE_TIMEOUT_MS` and `DATABASE_CREATE_TIMEOUT_MS` instead of disabling keepalive or raising `DATABASE_POOL_MIN`.
|
||||
|
||||
These variables have no effect on SQLite.
|
||||
These variables have no effect on SQLite, which uses a single shared connection.
|
||||
|
||||
## Switching databases {#switching-databases}
|
||||
|
||||
@@ -123,7 +129,7 @@ These variables have no effect on SQLite.
|
||||
- Connection failed: verify host, port, credentials, firewall.
|
||||
- Migration failed: ensure DB exists and user can `CREATE`/`ALTER`.
|
||||
- SQLite write error: ensure directory exists and is writable.
|
||||
- `KnexTimeoutError: Timeout acquiring a connection`: the database is unreachable or too slow to accept connections — check database health first, then see [Connection pool tuning](#connection-pool-tuning).
|
||||
- `KnexTimeoutError: Timeout acquiring a connection`: every pooled connection is busy, or the database is unreachable/too slow to accept new ones. If the database is healthy, the pool is too small for your concurrency — raise `DATABASE_POOL_MAX` (and `DATABASE_WORKER_POOL_MAX`) within your `max_connections` budget. See [Connection pool tuning](#connection-pool-tuning).
|
||||
- `Connection terminated unexpectedly` after idle periods: the network dropped an idle connection; keepalive (on by default) prevents this — verify `DATABASE_KEEPALIVE` is not set to `false`.
|
||||
|
||||
## Environment variables {#environment-variables}
|
||||
|
||||
@@ -242,7 +242,8 @@ SMTP_SECURE=1
|
||||
| :---------------------------- | :----------------------------------------------------------- | :------------------------------------ |
|
||||
| `DATABASE_URL` | Full database connection string | `sqlite://./database/kener.sqlite.db` |
|
||||
| `DATABASE_POOL_MIN` | Minimum pool connections (PostgreSQL/MySQL) | `0` |
|
||||
| `DATABASE_POOL_MAX` | Maximum pool connections (PostgreSQL/MySQL) | `10` |
|
||||
| `DATABASE_POOL_MAX` | Max web/request pool connections (PostgreSQL/MySQL) | `10` |
|
||||
| `DATABASE_WORKER_POOL_MAX` | Max background-job pool connections (PostgreSQL/MySQL) | `5` |
|
||||
| `DATABASE_ACQUIRE_TIMEOUT_MS` | Wait for a free connection before failing (PostgreSQL/MySQL) | `15000` |
|
||||
| `DATABASE_CREATE_TIMEOUT_MS` | Wait for a new connection before failing (PostgreSQL/MySQL) | `15000` |
|
||||
| `DATABASE_IDLE_TIMEOUT_MS` | Idle time before a connection is closed (PostgreSQL/MySQL) | `30000` |
|
||||
|
||||
Reference in New Issue
Block a user