diff --git a/.changeset/backpressure-scale-up-freeze.md b/.changeset/backpressure-scale-up-freeze.md new file mode 100644 index 0000000000..b69fad0f26 --- /dev/null +++ b/.changeset/backpressure-scale-up-freeze.md @@ -0,0 +1,5 @@ +--- +"@trigger.dev/core": patch +--- + +Add optional `shouldPauseScaling` to the supervisor consumer pool scaling options to freeze scale-up while it returns true (scale-down stays allowed). diff --git a/.server-changes/supervisor-dequeue-backpressure.md b/.server-changes/supervisor-dequeue-backpressure.md new file mode 100644 index 0000000000..5d62a73c55 --- /dev/null +++ b/.server-changes/supervisor-dequeue-backpressure.md @@ -0,0 +1,6 @@ +--- +area: supervisor +type: feature +--- + +Add opt-in dequeue backpressure to the supervisor. When enabled, the supervisor reads a verdict from Redis and pauses dequeuing while the worker cluster is saturated, then resumes once capacity is available. Disabled by default - no behavior change for existing deployments. diff --git a/apps/supervisor/package.json b/apps/supervisor/package.json index 7456d42185..7a3537dbc0 100644 --- a/apps/supervisor/package.json +++ b/apps/supervisor/package.json @@ -18,6 +18,7 @@ "@kubernetes/client-node": "^1.0.0", "@trigger.dev/core": "workspace:*", "dockerode": "^4.0.6", + "ioredis": "^5.3.2", "p-limit": "^6.2.0", "prom-client": "^15.1.0", "socket.io": "4.7.4", @@ -25,6 +26,7 @@ "zod": "3.25.76" }, "devDependencies": { + "@internal/testcontainers": "workspace:*", "@types/dockerode": "^3.3.33" } } diff --git a/apps/supervisor/src/backpressure/backpressureMetrics.ts b/apps/supervisor/src/backpressure/backpressureMetrics.ts new file mode 100644 index 0000000000..ffe5762854 --- /dev/null +++ b/apps/supervisor/src/backpressure/backpressureMetrics.ts @@ -0,0 +1,34 @@ +import { Counter, Gauge, type Registry } from "prom-client"; + +/** Prometheus metrics for dequeue backpressure. */ +export class BackpressureMetrics { + /** 1 while backpressure is engaged (computed signal, set even in dry-run). */ + readonly engaged: Gauge; + /** 1 when running in dry-run (gates inert). */ + readonly dryRun: Gauge; + /** Dequeue attempts the gate skipped - or would have, in dry-run (labelled). */ + readonly skipsTotal: Counter; + + constructor(opts: { register: Registry; prefix?: string }) { + const prefix = opts.prefix ?? "supervisor_backpressure"; + + this.engaged = new Gauge({ + name: `${prefix}_engaged`, + help: "1 while dequeue backpressure is engaged (computed signal, regardless of dry-run)", + registers: [opts.register], + }); + + this.dryRun = new Gauge({ + name: `${prefix}_dry_run`, + help: "1 when dequeue backpressure is in dry-run mode (gates inert)", + registers: [opts.register], + }); + + this.skipsTotal = new Counter({ + name: `${prefix}_skipped_dequeues_total`, + help: "Dequeue attempts skipped by backpressure (or would be, in dry-run)", + labelNames: ["dry_run"], + registers: [opts.register], + }); + } +} diff --git a/apps/supervisor/src/backpressure/backpressureMonitor.test.ts b/apps/supervisor/src/backpressure/backpressureMonitor.test.ts new file mode 100644 index 0000000000..7af28ffc9f --- /dev/null +++ b/apps/supervisor/src/backpressure/backpressureMonitor.test.ts @@ -0,0 +1,353 @@ +import { describe, it, expect, vi, beforeEach, afterEach } from "vitest"; +import { Registry } from "prom-client"; +import { BackpressureMonitor, type BackpressureSignalSource } from "./backpressureMonitor.js"; +import { BackpressureMetrics } from "./backpressureMetrics.js"; + +function countingSource(verdict: { engaged: boolean } | null): { + source: BackpressureSignalSource; + reads: () => number; +} { + let reads = 0; + return { + source: { + read: async () => { + reads++; + return verdict; + }, + }, + reads: () => reads, + }; +} + +describe("BackpressureMonitor", () => { + beforeEach(() => { + vi.useFakeTimers(); + }); + + afterEach(() => { + vi.useRealTimers(); + }); + + it("when disabled, never skips dequeue and never reads the signal source", () => { + // Even though the source would report "engaged", a disabled monitor must be + // a complete no-op: this is the backwards-compatibility guarantee. + const { source, reads } = countingSource({ engaged: true }); + const monitor = new BackpressureMonitor({ enabled: false, source }); + + monitor.start(); + + expect(monitor.shouldSkipDequeue()).toBe(false); + expect(reads()).toBe(0); + + monitor.stop(); + }); + + it("when enabled and the source reports engaged, skips dequeue after a refresh", async () => { + const { source } = countingSource({ engaged: true }); + const monitor = new BackpressureMonitor({ enabled: true, source, refreshIntervalMs: 1000 }); + + monitor.start(); + await vi.advanceTimersByTimeAsync(0); // flush the initial async read + + expect(monitor.shouldSkipDequeue()).toBe(true); + + monitor.stop(); + }); + + it("when enabled and the source reports clear, does not skip dequeue", async () => { + const { source } = countingSource({ engaged: false }); + const monitor = new BackpressureMonitor({ enabled: true, source, refreshIntervalMs: 1000 }); + + monitor.start(); + await vi.advanceTimersByTimeAsync(0); + + expect(monitor.shouldSkipDequeue()).toBe(false); + + monitor.stop(); + }); + + it("fails open (stops skipping) when the source throws", async () => { + let call = 0; + const source: BackpressureSignalSource = { + read: async () => { + call++; + if (call === 1) { + return { engaged: true }; + } + throw new Error("signal source unreachable"); + }, + }; + const monitor = new BackpressureMonitor({ enabled: true, source, refreshIntervalMs: 1000 }); + + monitor.start(); + await vi.advanceTimersByTimeAsync(0); + expect(monitor.shouldSkipDequeue()).toBe(true); // engaged from the first read + + await vi.advanceTimersByTimeAsync(1000); // next refresh throws + expect(monitor.shouldSkipDequeue()).toBe(false); // fail-open: a dead source must not pin the brake + + monitor.stop(); + }); + + it("fails open when the source reports unknown (null)", async () => { + const { source } = countingSource(null); + const monitor = new BackpressureMonitor({ enabled: true, source, refreshIntervalMs: 1000 }); + + monitor.start(); + await vi.advanceTimersByTimeAsync(0); + + expect(monitor.shouldSkipDequeue()).toBe(false); + + monitor.stop(); + }); + + it("fails open when the cached verdict goes stale (older than max age)", async () => { + // Source stops updating (e.g. hangs) after the first read; the verdict ages out. + const source: BackpressureSignalSource = { + read: async () => ({ engaged: true, ts: Date.now() }), + }; + const monitor = new BackpressureMonitor({ + enabled: true, + source, + refreshIntervalMs: 1_000_000, // effectively only the initial read fires + maxVerdictAgeMs: 15_000, + }); + + monitor.start(); + await vi.advanceTimersByTimeAsync(0); + expect(monitor.shouldSkipDequeue()).toBe(true); + + await vi.advanceTimersByTimeAsync(15_001); // verdict now older than max age + expect(monitor.shouldSkipDequeue()).toBe(false); + + monitor.stop(); + }); + + it("does not read the source on the hot path (reads are driven by the refresh tick)", async () => { + const { source, reads } = countingSource({ engaged: true }); + const monitor = new BackpressureMonitor({ enabled: true, source, refreshIntervalMs: 1000 }); + + monitor.start(); + await vi.advanceTimersByTimeAsync(0); + expect(reads()).toBe(1); // just the initial refresh + + for (let i = 0; i < 1000; i++) { + monitor.shouldSkipDequeue(); + } + + expect(reads()).toBe(1); // hot-path calls performed zero I/O + + monitor.stop(); + }); + + it("does not start an overlapping refresh while one is in flight", async () => { + let reads = 0; + const source: BackpressureSignalSource = { + // Never resolves - simulates a hung read. + read: () => { + reads++; + return new Promise<{ engaged: boolean } | null>(() => {}); + }, + }; + const monitor = new BackpressureMonitor({ enabled: true, source, refreshIntervalMs: 1000 }); + + monitor.start(); + await vi.advanceTimersByTimeAsync(3000); // several intervals while the first read hangs + + expect(reads).toBe(1); // in-flight guard prevents stacking + + monitor.stop(); + }); + + it("stops refreshing after stop()", async () => { + const { source, reads } = countingSource({ engaged: true }); + const monitor = new BackpressureMonitor({ enabled: true, source, refreshIntervalMs: 1000 }); + + monitor.start(); + await vi.advanceTimersByTimeAsync(0); + const readsAtStop = reads(); + + monitor.stop(); + await vi.advanceTimersByTimeAsync(5000); + + expect(reads()).toBe(readsAtStop); + }); + + it("isEngaged reflects the hard engaged state (the signal for freezing scale-up)", async () => { + const { source } = countingSource({ engaged: true }); + const monitor = new BackpressureMonitor({ enabled: true, source, refreshIntervalMs: 1000 }); + + monitor.start(); + await vi.advanceTimersByTimeAsync(0); + + expect(monitor.isEngaged()).toBe(true); + + monitor.stop(); + }); + + it("isEngaged is false when clear and when stale", async () => { + const source: BackpressureSignalSource = { + read: async () => ({ engaged: true, ts: Date.now() }), + }; + const monitor = new BackpressureMonitor({ + enabled: true, + source, + refreshIntervalMs: 1_000_000, + maxVerdictAgeMs: 15_000, + }); + + monitor.start(); + await vi.advanceTimersByTimeAsync(0); + expect(monitor.isEngaged()).toBe(true); + + await vi.advanceTimersByTimeAsync(15_001); // stale → fail-open + expect(monitor.isEngaged()).toBe(false); + + monitor.stop(); + }); + + it("ramps the dequeue gate after release instead of resuming instantly", async () => { + let engaged = true; + let rnd = 0.5; + const source: BackpressureSignalSource = { read: async () => ({ engaged }) }; + const monitor = new BackpressureMonitor({ + enabled: true, + source, + refreshIntervalMs: 1000, + rampMs: 10_000, + random: () => rnd, + }); + + monitor.start(); + await vi.advanceTimersByTimeAsync(0); + expect(monitor.shouldSkipDequeue()).toBe(true); // hard engaged + + // Release: the next refresh observes the clear verdict and starts the ramp. + engaged = false; + await vi.advanceTimersByTimeAsync(1000); + expect(monitor.isEngaged()).toBe(false); + + // Just after release (progress ~0): skip probability ~1, so skip regardless. + rnd = 0.99; + expect(monitor.shouldSkipDequeue()).toBe(true); + + // Halfway through the ramp (progress 0.5): skip probability 0.5. + await vi.advanceTimersByTimeAsync(5000); + rnd = 0.4; + expect(monitor.shouldSkipDequeue()).toBe(true); // 0.4 < 0.5 → skip + rnd = 0.6; + expect(monitor.shouldSkipDequeue()).toBe(false); // 0.6 ≥ 0.5 → allow + + // Past the ramp window: never skip. + await vi.advanceTimersByTimeAsync(5000); + rnd = 0.0; + expect(monitor.shouldSkipDequeue()).toBe(false); + + monitor.stop(); + }); + + it("fails open on an engaged verdict with no timestamp when staleness is enforced", async () => { + // A verdict claiming engaged but carrying no ts can't be checked for freshness; + // when maxVerdictAgeMs is set we must not trust it (else a dead producer could + // pin the brake forever). + const { source } = countingSource({ engaged: true }); // no ts + const monitor = new BackpressureMonitor({ + enabled: true, + source, + refreshIntervalMs: 1000, + maxVerdictAgeMs: 15_000, + }); + + monitor.start(); + await vi.advanceTimersByTimeAsync(0); + + expect(monitor.computeEngaged()).toBe(false); + expect(monitor.shouldSkipDequeue()).toBe(false); + + monitor.stop(); + }); + + it("in dry-run, the gates are inert but computeEngaged still reflects the real signal", async () => { + const { source } = countingSource({ engaged: true }); + const monitor = new BackpressureMonitor({ + enabled: true, + source, + refreshIntervalMs: 1000, + dryRun: true, + }); + + monitor.start(); + await vi.advanceTimersByTimeAsync(0); + + expect(monitor.computeEngaged()).toBe(true); // real signal, for observability/metrics + expect(monitor.isEngaged()).toBe(false); // inert: no scale-up freeze + expect(monitor.shouldSkipDequeue()).toBe(false); // inert: no dequeue skip + + monitor.stop(); + }); + + it("logs on verdict transitions", async () => { + let engaged = true; + const source: BackpressureSignalSource = { read: async () => ({ engaged }) }; + const logs: Array<{ message: string; meta?: Record }> = []; + const logger = { + info: (message: string, meta?: Record) => logs.push({ message, meta }), + }; + const monitor = new BackpressureMonitor({ + enabled: true, + source, + refreshIntervalMs: 1000, + logger, + }); + + monitor.start(); + await vi.advanceTimersByTimeAsync(0); + expect(logs.some((l) => l.meta?.engaged === true)).toBe(true); + + engaged = false; + await vi.advanceTimersByTimeAsync(1000); + expect(logs.some((l) => l.meta?.engaged === false)).toBe(true); + + monitor.stop(); + }); + + it("records prometheus metrics", async () => { + const { source } = countingSource({ engaged: true }); + const register = new Registry(); + const metrics = new BackpressureMetrics({ register }); + const monitor = new BackpressureMonitor({ + enabled: true, + source, + refreshIntervalMs: 1000, + metrics, + }); + + monitor.start(); + await vi.advanceTimersByTimeAsync(0); + + expect(await register.metrics()).toContain("supervisor_backpressure_engaged 1"); + + monitor.shouldSkipDequeue(); + expect(await register.metrics()).toMatch( + /supervisor_backpressure_skipped_dequeues_total\{dry_run="false"\} [1-9]/ + ); + + monitor.stop(); + }); + + it("resumes instantly when no ramp is configured", async () => { + let engaged = true; + const source: BackpressureSignalSource = { read: async () => ({ engaged }) }; + const monitor = new BackpressureMonitor({ enabled: true, source, refreshIntervalMs: 1000 }); + + monitor.start(); + await vi.advanceTimersByTimeAsync(0); + expect(monitor.shouldSkipDequeue()).toBe(true); + + engaged = false; + await vi.advanceTimersByTimeAsync(1000); + expect(monitor.shouldSkipDequeue()).toBe(false); // no ramp → instant resume + + monitor.stop(); + }); +}); diff --git a/apps/supervisor/src/backpressure/backpressureMonitor.ts b/apps/supervisor/src/backpressure/backpressureMonitor.ts new file mode 100644 index 0000000000..6b4170697e --- /dev/null +++ b/apps/supervisor/src/backpressure/backpressureMonitor.ts @@ -0,0 +1,179 @@ +import type { BackpressureMetrics } from "./backpressureMetrics.js"; + +export interface BackpressureLogger { + info(message: string, meta?: Record): void; +} + +export type BackpressureVerdict = { + engaged: boolean; + /** Epoch ms the verdict was produced. Used for consumer-side staleness fail-open. */ + ts?: number; +}; + +/** + * Source of the current backpressure verdict. `read()` returns `null` when the + * verdict is unknown (missing/unreadable) - the monitor treats unknown as + * "not engaged" (fail-open). + */ +export interface BackpressureSignalSource { + read(): Promise; +} + +export type BackpressureMonitorOptions = { + enabled: boolean; + source: BackpressureSignalSource; + refreshIntervalMs?: number; + /** + * If set, a cached verdict older than this is treated as unknown (fail-open). + * Guards against the source silently going stale (e.g. hanging reads). + */ + maxVerdictAgeMs?: number; + /** + * If set, after backpressure releases the dequeue gate stays partially engaged + * for this long, skipping a linearly-decaying fraction of attempts so the + * aggregate dequeue rate ramps from ~0 to full instead of snapping to full and + * re-flooding a freshly-recovered cluster. 0/unset = instant resume. + */ + rampMs?: number; + /** Injectable RNG for the resume ramp; defaults to Math.random. */ + random?: () => number; + /** + * When true, the gates are inert (never skip dequeues, never freeze scale-up). + * computeEngaged() still reflects the real signal so it can be observed. + */ + dryRun?: boolean; + logger?: BackpressureLogger; + metrics?: BackpressureMetrics; +}; + +const DEFAULT_REFRESH_INTERVAL_MS = 1000; + +export class BackpressureMonitor { + private verdict: BackpressureVerdict | null = null; + private timer?: ReturnType; + private refreshInFlight = false; + private wasEngaged = false; + private releasedAt?: number; + + constructor(private readonly opts: BackpressureMonitorOptions) { + this.opts.metrics?.dryRun.set(this.opts.dryRun ? 1 : 0); + } + + start(): void { + if (!this.opts.enabled) { + return; + } + + void this.refreshTick(); + this.timer = setInterval( + () => void this.refreshTick(), + this.opts.refreshIntervalMs ?? DEFAULT_REFRESH_INTERVAL_MS + ); + } + + /** Skip a tick if the previous refresh is still in flight, so slow/hung reads can't stack. */ + private async refreshTick(): Promise { + if (this.refreshInFlight) { + return; + } + this.refreshInFlight = true; + try { + await this.refresh(); + } finally { + this.refreshInFlight = false; + } + } + + stop(): void { + if (this.timer) { + clearInterval(this.timer); + this.timer = undefined; + } + } + + /** + * Raw hard backpressure state: true while the (fresh) verdict says engaged, + * ignoring dry-run. Used for observability/metrics so the real signal is + * visible even when the gates are inert. + */ + computeEngaged(): boolean { + const verdict = this.verdict; + if (verdict?.engaged !== true) { + return false; + } + + // When staleness enforcement is on, an engaged verdict must carry a fresh + // timestamp. A missing or stale ts can't be trusted (a dead producer could + // otherwise pin the brake forever), so fail open. + const maxAge = this.opts.maxVerdictAgeMs; + if (maxAge !== undefined) { + if (verdict.ts === undefined || Date.now() - verdict.ts > maxAge) { + return false; + } + } + + return true; + } + + /** + * Effective hard state: the signal for freezing consumer-pool scale-up. Inert + * (false) in dry-run. Hot-path read, no I/O. + */ + isEngaged(): boolean { + return this.opts.dryRun ? false : this.computeEngaged(); + } + + /** Hot-path read: synchronous, never performs I/O. Inert (false) in dry-run. */ + shouldSkipDequeue(): boolean { + const wouldSkip = this.computeShouldSkip(); + if (wouldSkip) { + this.opts.metrics?.skipsTotal.inc({ dry_run: this.opts.dryRun ? "true" : "false" }); + } + return this.opts.dryRun ? false : wouldSkip; + } + + private computeShouldSkip(): boolean { + if (this.computeEngaged()) { + return true; + } + + // Post-release ramp: skip a linearly-decaying fraction of attempts so the + // aggregate dequeue rate climbs back to full over rampMs rather than snapping. + const rampMs = this.opts.rampMs; + if (rampMs && this.releasedAt !== undefined) { + const elapsed = Date.now() - this.releasedAt; + if (elapsed < rampMs) { + const skipProbability = 1 - elapsed / rampMs; + return (this.opts.random ?? Math.random)() < skipProbability; + } + } + + return false; + } + + private async refresh(): Promise { + try { + this.verdict = await this.opts.source.read(); + } catch { + // Fail-open: a dead/unreachable source must never pin the brake. Treat as + // unknown (no verdict) so dequeue resumes as if backpressure were off. + this.verdict = null; + } + + // Track the engaged→released transition to anchor the resume ramp. Use the + // staleness-aware state so a stale verdict doesn't pin wasEngaged / the gauge. + const nowEngaged = this.computeEngaged(); + this.opts.metrics?.engaged.set(nowEngaged ? 1 : 0); + + if (nowEngaged !== this.wasEngaged) { + this.opts.logger?.info("backpressure verdict changed", { + engaged: nowEngaged, + dryRun: !!this.opts.dryRun, + }); + } + if (this.wasEngaged && !nowEngaged) { + this.releasedAt = Date.now(); + } + this.wasEngaged = nowEngaged; + } +} diff --git a/apps/supervisor/src/backpressure/redisBackpressureSignalSource.test.ts b/apps/supervisor/src/backpressure/redisBackpressureSignalSource.test.ts new file mode 100644 index 0000000000..77a7457b13 --- /dev/null +++ b/apps/supervisor/src/backpressure/redisBackpressureSignalSource.test.ts @@ -0,0 +1,62 @@ +import { redisTest } from "@internal/testcontainers"; +import { Redis } from "ioredis"; +import { describe, expect } from "vitest"; +import { RedisBackpressureSignalSource } from "./redisBackpressureSignalSource.js"; + +const KEY = "backpressure:test"; + +describe("RedisBackpressureSignalSource", () => { + redisTest("returns null when the key is absent", async ({ redisOptions }) => { + const redis = new Redis(redisOptions); + try { + const source = new RedisBackpressureSignalSource(redis, KEY); + expect(await source.read()).toBeNull(); + } finally { + await redis.quit(); + } + }); + + redisTest("parses a valid engaged verdict", async ({ redisOptions }) => { + const redis = new Redis(redisOptions); + try { + await redis.set(KEY, JSON.stringify({ engaged: true, ts: 1_700_000_000_000 })); + const source = new RedisBackpressureSignalSource(redis, KEY); + expect(await source.read()).toEqual({ engaged: true, ts: 1_700_000_000_000 }); + } finally { + await redis.quit(); + } + }); + + redisTest("parses a clear verdict", async ({ redisOptions }) => { + const redis = new Redis(redisOptions); + try { + await redis.set(KEY, JSON.stringify({ engaged: false })); + const source = new RedisBackpressureSignalSource(redis, KEY); + expect(await source.read()).toEqual({ engaged: false }); + } finally { + await redis.quit(); + } + }); + + redisTest("returns null for malformed JSON (fail-open)", async ({ redisOptions }) => { + const redis = new Redis(redisOptions); + try { + await redis.set(KEY, "not json {"); + const source = new RedisBackpressureSignalSource(redis, KEY); + expect(await source.read()).toBeNull(); + } finally { + await redis.quit(); + } + }); + + redisTest("returns null for valid JSON of the wrong shape (fail-open)", async ({ redisOptions }) => { + const redis = new Redis(redisOptions); + try { + await redis.set(KEY, JSON.stringify({ foo: "bar" })); + const source = new RedisBackpressureSignalSource(redis, KEY); + expect(await source.read()).toBeNull(); + } finally { + await redis.quit(); + } + }); +}); diff --git a/apps/supervisor/src/backpressure/redisBackpressureSignalSource.ts b/apps/supervisor/src/backpressure/redisBackpressureSignalSource.ts new file mode 100644 index 0000000000..4f8a54c624 --- /dev/null +++ b/apps/supervisor/src/backpressure/redisBackpressureSignalSource.ts @@ -0,0 +1,35 @@ +import type { Redis } from "ioredis"; +import { z } from "zod"; +import type { BackpressureSignalSource, BackpressureVerdict } from "./backpressureMonitor.js"; + +const VerdictSchema = z.object({ + engaged: z.boolean(), + ts: z.number().optional(), +}); + +/** Reads the backpressure verdict from a Redis key written by the cluster-side aggregator. */ +export class RedisBackpressureSignalSource implements BackpressureSignalSource { + constructor( + private readonly redis: Redis, + private readonly key: string + ) {} + + async read(): Promise { + const raw = await this.redis.get(this.key); + if (raw === null) { + return null; + } + + // A malformed or wrong-shaped value is treated as unknown (null) so the + // monitor fails open rather than acting on garbage. + let json: unknown; + try { + json = JSON.parse(raw); + } catch { + return null; + } + + const parsed = VerdictSchema.safeParse(json); + return parsed.success ? parsed.data : null; + } +} diff --git a/apps/supervisor/src/env.ts b/apps/supervisor/src/env.ts index f9f7e2d086..f3b0d4c023 100644 --- a/apps/supervisor/src/env.ts +++ b/apps/supervisor/src/env.ts @@ -51,6 +51,29 @@ const Env = z TRIGGER_DEQUEUE_SCALING_BATCH_WINDOW_MS: z.coerce.number().int().positive().default(1000), // Batch window for metrics processing (ms) TRIGGER_DEQUEUE_SCALING_DAMPING_FACTOR: z.coerce.number().min(0).max(1).default(0.7), // Smooths consumer count changes after EWMA (0=no scaling, 1=immediate) + // Dequeue backpressure - off by default. When enabled, the supervisor reads a + // verdict from Redis (written by the cluster-side aggregator) and pauses dequeues + // while the worker cluster can't schedule pods. Disabled = total no-op: no Redis + // client is created, no reads happen, and the dequeue loop is unaffected. + TRIGGER_DEQUEUE_BACKPRESSURE_ENABLED: BoolEnv.default(false), + // Safety default: even when enabled, backpressure only logs what it would do. + // Set to false to actually skip dequeues / freeze scale-up. + TRIGGER_DEQUEUE_BACKPRESSURE_DRY_RUN: BoolEnv.default(true), + TRIGGER_DEQUEUE_BACKPRESSURE_REDIS_KEY: z.string().default("engine:dequeue:backpressure"), + TRIGGER_DEQUEUE_BACKPRESSURE_REFRESH_MS: z.coerce.number().int().positive().default(1000), + TRIGGER_DEQUEUE_BACKPRESSURE_RAMP_MS: z.coerce.number().int().min(0).default(30_000), // Resume ramp window after release; 0 = instant resume + + TRIGGER_DEQUEUE_BACKPRESSURE_MAX_VERDICT_AGE_MS: z.coerce + .number() + .int() + .positive() + .default(15_000), // Stale verdict → fail-open (treat as not engaged) + TRIGGER_DEQUEUE_BACKPRESSURE_REDIS_HOST: z.string().optional(), + TRIGGER_DEQUEUE_BACKPRESSURE_REDIS_PORT: z.coerce.number().int().optional(), + TRIGGER_DEQUEUE_BACKPRESSURE_REDIS_USERNAME: z.string().optional(), + TRIGGER_DEQUEUE_BACKPRESSURE_REDIS_PASSWORD: z.string().optional(), + TRIGGER_DEQUEUE_BACKPRESSURE_REDIS_TLS_DISABLED: BoolEnv.default(false), + // Optional services TRIGGER_WARM_START_URL: z.string().optional(), TRIGGER_CHECKPOINT_URL: z.string().optional(), @@ -285,6 +308,14 @@ const Env = z path: ["TRIGGER_WORKLOAD_API_DOMAIN"], }); } + if (data.TRIGGER_DEQUEUE_BACKPRESSURE_ENABLED && !data.TRIGGER_DEQUEUE_BACKPRESSURE_REDIS_HOST) { + ctx.addIssue({ + code: z.ZodIssueCode.custom, + message: + "TRIGGER_DEQUEUE_BACKPRESSURE_REDIS_HOST is required when TRIGGER_DEQUEUE_BACKPRESSURE_ENABLED is true", + path: ["TRIGGER_DEQUEUE_BACKPRESSURE_REDIS_HOST"], + }); + } }) .transform((data) => ({ ...data, diff --git a/apps/supervisor/src/index.ts b/apps/supervisor/src/index.ts index 440459a416..aa9255ef26 100644 --- a/apps/supervisor/src/index.ts +++ b/apps/supervisor/src/index.ts @@ -28,6 +28,10 @@ import { FailedPodHandler } from "./services/failedPodHandler.js"; import { getWorkerToken } from "./workerToken.js"; import { OtlpTraceService } from "./services/otlpTraceService.js"; import { extractTraceparent, getRestoreRunnerId } from "./util.js"; +import { Redis } from "ioredis"; +import { BackpressureMonitor } from "./backpressure/backpressureMonitor.js"; +import { RedisBackpressureSignalSource } from "./backpressure/redisBackpressureSignalSource.js"; +import { BackpressureMetrics } from "./backpressure/backpressureMetrics.js"; import { fromContext, recordPhaseSince, @@ -54,6 +58,8 @@ class ManagedSupervisor { private readonly podCleaner?: PodCleaner; private readonly failedPodHandler?: FailedPodHandler; private readonly tracing?: OtlpTraceService; + private readonly backpressureMonitor?: BackpressureMonitor; + private readonly backpressureRedis?: Redis; private readonly isKubernetes = isKubernetesEnvironment(env.KUBERNETES_FORCE_ENABLED); private readonly warmStartUrl = env.TRIGGER_WARM_START_URL; @@ -66,10 +72,14 @@ class ManagedSupervisor { private readonly wideEventsNoisyRoutes = env.TRIGGER_WIDE_EVENTS_NOISY_ROUTES; constructor() { + // Strip secret-like env vars before debug-logging the rest. Add any new + // secret env var here so it never lands in the DEBUG "Starting up" log. const { TRIGGER_WORKER_TOKEN, MANAGED_WORKER_SECRET, COMPUTE_GATEWAY_AUTH_TOKEN, + DOCKER_REGISTRY_PASSWORD, + TRIGGER_DEQUEUE_BACKPRESSURE_REDIS_PASSWORD, ...envWithoutSecrets } = env; @@ -181,6 +191,42 @@ class ManagedSupervisor { ); } + if (env.TRIGGER_DEQUEUE_BACKPRESSURE_ENABLED) { + this.backpressureRedis = new Redis({ + host: env.TRIGGER_DEQUEUE_BACKPRESSURE_REDIS_HOST, + port: env.TRIGGER_DEQUEUE_BACKPRESSURE_REDIS_PORT, + username: env.TRIGGER_DEQUEUE_BACKPRESSURE_REDIS_USERNAME, + password: env.TRIGGER_DEQUEUE_BACKPRESSURE_REDIS_PASSWORD, + ...(env.TRIGGER_DEQUEUE_BACKPRESSURE_REDIS_TLS_DISABLED ? {} : { tls: {} }), + maxRetriesPerRequest: null, + }); + this.backpressureRedis.on("error", (error) => + this.logger.error("Backpressure redis error", { error: error.message }) + ); + + this.backpressureMonitor = new BackpressureMonitor({ + enabled: true, + source: new RedisBackpressureSignalSource( + this.backpressureRedis, + env.TRIGGER_DEQUEUE_BACKPRESSURE_REDIS_KEY + ), + refreshIntervalMs: env.TRIGGER_DEQUEUE_BACKPRESSURE_REFRESH_MS, + maxVerdictAgeMs: env.TRIGGER_DEQUEUE_BACKPRESSURE_MAX_VERDICT_AGE_MS, + rampMs: env.TRIGGER_DEQUEUE_BACKPRESSURE_RAMP_MS, + dryRun: env.TRIGGER_DEQUEUE_BACKPRESSURE_DRY_RUN, + logger: this.logger, + metrics: new BackpressureMetrics({ register }), + }); + + this.logger.log("🛑 Dequeue backpressure enabled", { + key: env.TRIGGER_DEQUEUE_BACKPRESSURE_REDIS_KEY, + refreshIntervalMs: env.TRIGGER_DEQUEUE_BACKPRESSURE_REFRESH_MS, + maxVerdictAgeMs: env.TRIGGER_DEQUEUE_BACKPRESSURE_MAX_VERDICT_AGE_MS, + rampMs: env.TRIGGER_DEQUEUE_BACKPRESSURE_RAMP_MS, + dryRun: env.TRIGGER_DEQUEUE_BACKPRESSURE_DRY_RUN, + }); + } + this.workerSession = new SupervisorSession({ workerToken: getWorkerToken(), apiUrl: env.TRIGGER_API_URL, @@ -202,18 +248,20 @@ class ManagedSupervisor { ewmaAlpha: env.TRIGGER_DEQUEUE_SCALING_EWMA_ALPHA, batchWindowMs: env.TRIGGER_DEQUEUE_SCALING_BATCH_WINDOW_MS, dampingFactor: env.TRIGGER_DEQUEUE_SCALING_DAMPING_FACTOR, + // Freeze scale-up while backpressure is hard-engaged (not during the resume + // ramp). Undefined when backpressure is disabled → no effect on scaling. + shouldPauseScaling: () => this.backpressureMonitor?.isEngaged() ?? false, }, runNotificationsEnabled: env.TRIGGER_WORKLOAD_API_ENABLED, heartbeatIntervalSeconds: env.TRIGGER_WORKER_HEARTBEAT_INTERVAL_SECONDS, sendRunDebugLogs: env.SEND_RUN_DEBUG_LOGS, preDequeue: async () => { - if (!env.RESOURCE_MONITOR_ENABLED) { - return {}; - } + // Synchronous, hot-path-safe cached read; undefined when backpressure is disabled. + const skipForBackpressure = this.backpressureMonitor?.shouldSkipDequeue() ?? false; - if (this.isKubernetes) { - // Not used in k8s for now - return {}; + if (!env.RESOURCE_MONITOR_ENABLED || this.isKubernetes) { + // Resource monitor is not used in k8s; backpressure is the only gate there. + return { skipDequeue: skipForBackpressure }; } const resources = await this.resourceMonitor.getNodeResources(); @@ -223,7 +271,10 @@ class ManagedSupervisor { cpu: resources.cpuAvailable, memory: resources.memoryAvailable, }, - skipDequeue: resources.cpuAvailable < 0.25 || resources.memoryAvailable < 0.25, + skipDequeue: + skipForBackpressure || + resources.cpuAvailable < 0.25 || + resources.memoryAvailable < 0.25, }; }, preSkip: async () => { @@ -553,6 +604,7 @@ class ManagedSupervisor { this.logger.log("Starting up"); // Optional services + this.backpressureMonitor?.start(); await this.podCleaner?.start(); await this.failedPodHandler?.start(); await this.metricsServer?.start(); @@ -577,6 +629,8 @@ class ManagedSupervisor { await this.workerSession.stop(); // Optional services + this.backpressureMonitor?.stop(); + await this.backpressureRedis?.quit(); await this.podCleaner?.stop(); await this.failedPodHandler?.stop(); await this.metricsServer?.stop(); diff --git a/packages/core/src/v3/runEngineWorker/supervisor/consumerPool.test.ts b/packages/core/src/v3/runEngineWorker/supervisor/consumerPool.test.ts index 6093790b01..d6627b3004 100644 --- a/packages/core/src/v3/runEngineWorker/supervisor/consumerPool.test.ts +++ b/packages/core/src/v3/runEngineWorker/supervisor/consumerPool.test.ts @@ -718,4 +718,65 @@ describe("RunQueueConsumerPool", () => { expect(pool.size).toBe(1); }); }); + + describe("Backpressure scale-up freeze", () => { + it("freezes scale-up while shouldPauseScaling returns true, then resumes", async () => { + let paused = true; + pool = new RunQueueConsumerPool({ + ...defaultOptions, + scaling: { + strategy: "smooth", + minConsumerCount: 1, + maxConsumerCount: 10, + scaleUpCooldownMs: 0, + disableJitter: true, + shouldPauseScaling: () => paused, + }, + }); + await pool.start(); + expect(pool.size).toBe(1); + + // A high queue would normally scale up, but backpressure freezes it. + pool.updateQueueLength(10); + advanceTimeAndProcessMetrics(1100); + expect(pool.size).toBe(1); + + // Once backpressure releases, scaling resumes. + paused = false; + pool.updateQueueLength(10); + advanceTimeAndProcessMetrics(1100); + expect(pool.size).toBeGreaterThan(1); + }); + + it("still allows scale-down while paused", async () => { + let paused = false; + pool = new RunQueueConsumerPool({ + ...defaultOptions, + scaling: { + strategy: "smooth", + minConsumerCount: 1, + maxConsumerCount: 10, + scaleUpCooldownMs: 0, + scaleDownCooldownMs: 0, + disableJitter: true, + shouldPauseScaling: () => paused, + }, + }); + await pool.start(); + + pool.updateQueueLength(10); + advanceTimeAndProcessMetrics(1100); + const scaledUp = pool.size; + expect(scaledUp).toBeGreaterThan(1); + + // Pausing must not block shrinking - we want to drain down, just not grow. + // Loop to let the EWMA-smoothed queue length fall (one batch isn't enough). + paused = true; + for (let i = 0; i < 5; i++) { + pool.updateQueueLength(0); + advanceTimeAndProcessMetrics(1100); + } + expect(pool.size).toBeLessThan(scaledUp); + }); + }); }); diff --git a/packages/core/src/v3/runEngineWorker/supervisor/consumerPool.ts b/packages/core/src/v3/runEngineWorker/supervisor/consumerPool.ts index d72cef75c7..5c73a9819a 100644 --- a/packages/core/src/v3/runEngineWorker/supervisor/consumerPool.ts +++ b/packages/core/src/v3/runEngineWorker/supervisor/consumerPool.ts @@ -22,6 +22,12 @@ export type ScalingOptions = { batchWindowMs?: number; disableJitter?: boolean; dampingFactor?: number; + /** + * When this returns true, scale-up is frozen (scale-down still allowed). Used to + * stop the pool from adding consumers to drain a queue that backpressure is + * deliberately holding. Synchronous and hot-path-safe. + */ + shouldPauseScaling?: () => boolean; }; export type ConsumerPoolOptions = { @@ -49,6 +55,7 @@ export class RunQueueConsumerPool { private readonly maxConsumerCount: number; private readonly scalingStrategy: ScalingStrategy; private readonly disableJitter: boolean; + private readonly shouldPauseScaling?: () => boolean; private consumers: Map = new Map(); private readonly consumerFactory: QueueConsumerFactory; @@ -79,6 +86,7 @@ export class RunQueueConsumerPool { this.scaleUpCooldownMs = opts.scaling.scaleUpCooldownMs ?? 10000; // 10 seconds default this.scaleDownCooldownMs = opts.scaling.scaleDownCooldownMs ?? 60000; // 60 seconds default this.disableJitter = opts.scaling.disableJitter ?? false; + this.shouldPauseScaling = opts.scaling.shouldPauseScaling; // Configure EWMA parameters from options this.ewmaAlpha = opts.scaling.ewmaAlpha ?? 0.3; @@ -259,6 +267,16 @@ export class RunQueueConsumerPool { // Check cooldown periods with jitter if (targetCount > this.consumers.size) { + // Freeze scale-up while backpressure is engaged - don't add consumers to + // drain a queue we're deliberately holding. Scale-down stays allowed. + if (this.shouldPauseScaling?.()) { + this.logger.debug("Scale up frozen by backpressure", { + currentCount: this.consumers.size, + targetCount, + }); + return; + } + // Scale up const effectiveCooldown = this.scaleUpCooldownMs + jitterMs; if (timeSinceLastScale < effectiveCooldown) { diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 1efd8dea94..ce093c8ffc 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -224,6 +224,9 @@ importers: dockerode: specifier: ^4.0.6 version: 4.0.6 + ioredis: + specifier: ^5.3.2 + version: 5.3.2 p-limit: specifier: ^6.2.0 version: 6.2.0 @@ -240,6 +243,9 @@ importers: specifier: 3.25.76 version: 3.25.76 devDependencies: + '@internal/testcontainers': + specifier: workspace:* + version: link:../../internal-packages/testcontainers '@types/dockerode': specifier: ^3.3.33 version: 3.3.35