From 331bf2e06a4a3b7790b2ab5922b0f382a8ef4843 Mon Sep 17 00:00:00 2001 From: nicktrn <55853254+nicktrn@users.noreply.github.com> Date: Thu, 4 Jun 2026 17:35:00 +0100 Subject: [PATCH 1/9] feat(supervisor): add BackpressureMonitor for dequeue gating Cached, fail-open monitor that decides whether to skip dequeues based on a pluggable signal source. Disabled is a total no-op (no refresh, no reads). The hot-path read is synchronous and never performs I/O; every failure mode (source throws, returns null, or verdict goes stale) fails open. --- .../backpressure/backpressureMonitor.test.ts | 154 ++++++++++++++++++ .../src/backpressure/backpressureMonitor.ts | 78 +++++++++ 2 files changed, 232 insertions(+) create mode 100644 apps/supervisor/src/backpressure/backpressureMonitor.test.ts create mode 100644 apps/supervisor/src/backpressure/backpressureMonitor.ts diff --git a/apps/supervisor/src/backpressure/backpressureMonitor.test.ts b/apps/supervisor/src/backpressure/backpressureMonitor.test.ts new file mode 100644 index 0000000000..4306a229af --- /dev/null +++ b/apps/supervisor/src/backpressure/backpressureMonitor.test.ts @@ -0,0 +1,154 @@ +import { describe, it, expect, vi, beforeEach, afterEach } from "vitest"; +import { BackpressureMonitor, type BackpressureSignalSource } from "./backpressureMonitor.js"; + +function countingSource(verdict: { engaged: boolean } | null): { + source: BackpressureSignalSource; + reads: () => number; +} { + let reads = 0; + return { + source: { + read: async () => { + reads++; + return verdict; + }, + }, + reads: () => reads, + }; +} + +describe("BackpressureMonitor", () => { + beforeEach(() => { + vi.useFakeTimers(); + }); + + afterEach(() => { + vi.useRealTimers(); + }); + + it("when disabled, never skips dequeue and never reads the signal source", () => { + // Even though the source would report "engaged", a disabled monitor must be + // a complete no-op: this is the backwards-compatibility guarantee. + const { source, reads } = countingSource({ engaged: true }); + const monitor = new BackpressureMonitor({ enabled: false, source }); + + monitor.start(); + + expect(monitor.shouldSkipDequeue()).toBe(false); + expect(reads()).toBe(0); + + monitor.stop(); + }); + + it("when enabled and the source reports engaged, skips dequeue after a refresh", async () => { + const { source } = countingSource({ engaged: true }); + const monitor = new BackpressureMonitor({ enabled: true, source, refreshIntervalMs: 1000 }); + + monitor.start(); + await vi.advanceTimersByTimeAsync(0); // flush the initial async read + + expect(monitor.shouldSkipDequeue()).toBe(true); + + monitor.stop(); + }); + + it("when enabled and the source reports clear, does not skip dequeue", async () => { + const { source } = countingSource({ engaged: false }); + const monitor = new BackpressureMonitor({ enabled: true, source, refreshIntervalMs: 1000 }); + + monitor.start(); + await vi.advanceTimersByTimeAsync(0); + + expect(monitor.shouldSkipDequeue()).toBe(false); + + monitor.stop(); + }); + + it("fails open (stops skipping) when the source throws", async () => { + let call = 0; + const source: BackpressureSignalSource = { + read: async () => { + call++; + if (call === 1) { + return { engaged: true }; + } + throw new Error("signal source unreachable"); + }, + }; + const monitor = new BackpressureMonitor({ enabled: true, source, refreshIntervalMs: 1000 }); + + monitor.start(); + await vi.advanceTimersByTimeAsync(0); + expect(monitor.shouldSkipDequeue()).toBe(true); // engaged from the first read + + await vi.advanceTimersByTimeAsync(1000); // next refresh throws + expect(monitor.shouldSkipDequeue()).toBe(false); // fail-open: a dead source must not pin the brake + + monitor.stop(); + }); + + it("fails open when the source reports unknown (null)", async () => { + const { source } = countingSource(null); + const monitor = new BackpressureMonitor({ enabled: true, source, refreshIntervalMs: 1000 }); + + monitor.start(); + await vi.advanceTimersByTimeAsync(0); + + expect(monitor.shouldSkipDequeue()).toBe(false); + + monitor.stop(); + }); + + it("fails open when the cached verdict goes stale (older than max age)", async () => { + // Source stops updating (e.g. hangs) after the first read; the verdict ages out. + const source: BackpressureSignalSource = { + read: async () => ({ engaged: true, ts: Date.now() }), + }; + const monitor = new BackpressureMonitor({ + enabled: true, + source, + refreshIntervalMs: 1_000_000, // effectively only the initial read fires + maxVerdictAgeMs: 15_000, + }); + + monitor.start(); + await vi.advanceTimersByTimeAsync(0); + expect(monitor.shouldSkipDequeue()).toBe(true); + + await vi.advanceTimersByTimeAsync(15_001); // verdict now older than max age + expect(monitor.shouldSkipDequeue()).toBe(false); + + monitor.stop(); + }); + + it("does not read the source on the hot path (reads are driven by the refresh tick)", async () => { + const { source, reads } = countingSource({ engaged: true }); + const monitor = new BackpressureMonitor({ enabled: true, source, refreshIntervalMs: 1000 }); + + monitor.start(); + await vi.advanceTimersByTimeAsync(0); + expect(reads()).toBe(1); // just the initial refresh + + for (let i = 0; i < 1000; i++) { + monitor.shouldSkipDequeue(); + } + + expect(reads()).toBe(1); // hot-path calls performed zero I/O + + monitor.stop(); + }); + + it("stops refreshing after stop()", async () => { + const { source, reads } = countingSource({ engaged: true }); + const monitor = new BackpressureMonitor({ enabled: true, source, refreshIntervalMs: 1000 }); + + monitor.start(); + await vi.advanceTimersByTimeAsync(0); + const readsAtStop = reads(); + + monitor.stop(); + await vi.advanceTimersByTimeAsync(5000); + + expect(reads()).toBe(readsAtStop); + }); +}); diff --git a/apps/supervisor/src/backpressure/backpressureMonitor.ts b/apps/supervisor/src/backpressure/backpressureMonitor.ts new file mode 100644 index 0000000000..d62ee43875 --- /dev/null +++ b/apps/supervisor/src/backpressure/backpressureMonitor.ts @@ -0,0 +1,78 @@ +export type BackpressureVerdict = { + engaged: boolean; + /** Epoch ms the verdict was produced. Used for consumer-side staleness fail-open. */ + ts?: number; +}; + +/** + * Source of the current backpressure verdict. `read()` returns `null` when the + * verdict is unknown (missing/unreadable) - the monitor treats unknown as + * "not engaged" (fail-open). + */ +export interface BackpressureSignalSource { + read(): Promise; +} + +export type BackpressureMonitorOptions = { + enabled: boolean; + source: BackpressureSignalSource; + refreshIntervalMs?: number; + /** + * If set, a cached verdict older than this is treated as unknown (fail-open). + * Guards against the source silently going stale (e.g. hanging reads). + */ + maxVerdictAgeMs?: number; +}; + +const DEFAULT_REFRESH_INTERVAL_MS = 1000; + +export class BackpressureMonitor { + private verdict: BackpressureVerdict | null = null; + private timer?: ReturnType; + + constructor(private readonly opts: BackpressureMonitorOptions) {} + + start(): void { + if (!this.opts.enabled) { + return; + } + + void this.refresh(); + this.timer = setInterval( + () => void this.refresh(), + this.opts.refreshIntervalMs ?? DEFAULT_REFRESH_INTERVAL_MS + ); + } + + stop(): void { + if (this.timer) { + clearInterval(this.timer); + this.timer = undefined; + } + } + + /** Hot-path read: synchronous, never performs I/O. */ + shouldSkipDequeue(): boolean { + const verdict = this.verdict; + if (verdict?.engaged !== true) { + return false; + } + + const maxAge = this.opts.maxVerdictAgeMs; + if (maxAge !== undefined && verdict.ts !== undefined && Date.now() - verdict.ts > maxAge) { + return false; + } + + return true; + } + + private async refresh(): Promise { + try { + this.verdict = await this.opts.source.read(); + } catch { + // Fail-open: a dead/unreachable source must never pin the brake. Treat as + // unknown (no verdict) so dequeue resumes as if backpressure were off. + this.verdict = null; + } + } +} From 72450381dc4ae72a69a3c88913c673b2a8ae41ee Mon Sep 17 00:00:00 2001 From: nicktrn <55853254+nicktrn@users.noreply.github.com> Date: Thu, 4 Jun 2026 17:37:38 +0100 Subject: [PATCH 2/9] feat(supervisor): add RedisBackpressureSignalSource Reads the backpressure verdict from a Redis key (written by the cluster-side aggregator). Malformed or wrong-shaped values are treated as unknown so the monitor fails open. Adds @internal/redis + @internal/testcontainers deps. --- apps/supervisor/package.json | 2 + .../redisBackpressureSignalSource.test.ts | 62 +++++++++++++++++++ .../redisBackpressureSignalSource.ts | 35 +++++++++++ pnpm-lock.yaml | 6 ++ 4 files changed, 105 insertions(+) create mode 100644 apps/supervisor/src/backpressure/redisBackpressureSignalSource.test.ts create mode 100644 apps/supervisor/src/backpressure/redisBackpressureSignalSource.ts diff --git a/apps/supervisor/package.json b/apps/supervisor/package.json index 7456d42185..0e35bcfca8 100644 --- a/apps/supervisor/package.json +++ b/apps/supervisor/package.json @@ -15,6 +15,7 @@ "dependencies": { "@aws-sdk/client-ecr": "^3.839.0", "@internal/compute": "workspace:*", + "@internal/redis": "workspace:*", "@kubernetes/client-node": "^1.0.0", "@trigger.dev/core": "workspace:*", "dockerode": "^4.0.6", @@ -25,6 +26,7 @@ "zod": "3.25.76" }, "devDependencies": { + "@internal/testcontainers": "workspace:*", "@types/dockerode": "^3.3.33" } } diff --git a/apps/supervisor/src/backpressure/redisBackpressureSignalSource.test.ts b/apps/supervisor/src/backpressure/redisBackpressureSignalSource.test.ts new file mode 100644 index 0000000000..d3ba9d3897 --- /dev/null +++ b/apps/supervisor/src/backpressure/redisBackpressureSignalSource.test.ts @@ -0,0 +1,62 @@ +import { redisTest } from "@internal/testcontainers"; +import { Redis } from "@internal/redis"; +import { describe, expect } from "vitest"; +import { RedisBackpressureSignalSource } from "./redisBackpressureSignalSource.js"; + +const KEY = "backpressure:test"; + +describe("RedisBackpressureSignalSource", () => { + redisTest("returns null when the key is absent", async ({ redisOptions }) => { + const redis = new Redis(redisOptions); + try { + const source = new RedisBackpressureSignalSource(redis, KEY); + expect(await source.read()).toBeNull(); + } finally { + await redis.quit(); + } + }); + + redisTest("parses a valid engaged verdict", async ({ redisOptions }) => { + const redis = new Redis(redisOptions); + try { + await redis.set(KEY, JSON.stringify({ engaged: true, ts: 1_700_000_000_000 })); + const source = new RedisBackpressureSignalSource(redis, KEY); + expect(await source.read()).toEqual({ engaged: true, ts: 1_700_000_000_000 }); + } finally { + await redis.quit(); + } + }); + + redisTest("parses a clear verdict", async ({ redisOptions }) => { + const redis = new Redis(redisOptions); + try { + await redis.set(KEY, JSON.stringify({ engaged: false })); + const source = new RedisBackpressureSignalSource(redis, KEY); + expect(await source.read()).toEqual({ engaged: false }); + } finally { + await redis.quit(); + } + }); + + redisTest("returns null for malformed JSON (fail-open)", async ({ redisOptions }) => { + const redis = new Redis(redisOptions); + try { + await redis.set(KEY, "not json {"); + const source = new RedisBackpressureSignalSource(redis, KEY); + expect(await source.read()).toBeNull(); + } finally { + await redis.quit(); + } + }); + + redisTest("returns null for valid JSON of the wrong shape (fail-open)", async ({ redisOptions }) => { + const redis = new Redis(redisOptions); + try { + await redis.set(KEY, JSON.stringify({ foo: "bar" })); + const source = new RedisBackpressureSignalSource(redis, KEY); + expect(await source.read()).toBeNull(); + } finally { + await redis.quit(); + } + }); +}); diff --git a/apps/supervisor/src/backpressure/redisBackpressureSignalSource.ts b/apps/supervisor/src/backpressure/redisBackpressureSignalSource.ts new file mode 100644 index 0000000000..ed5958c832 --- /dev/null +++ b/apps/supervisor/src/backpressure/redisBackpressureSignalSource.ts @@ -0,0 +1,35 @@ +import type { Redis } from "@internal/redis"; +import { z } from "zod"; +import type { BackpressureSignalSource, BackpressureVerdict } from "./backpressureMonitor.js"; + +const VerdictSchema = z.object({ + engaged: z.boolean(), + ts: z.number().optional(), +}); + +/** Reads the backpressure verdict from a Redis key written by the cluster-side aggregator. */ +export class RedisBackpressureSignalSource implements BackpressureSignalSource { + constructor( + private readonly redis: Redis, + private readonly key: string + ) {} + + async read(): Promise { + const raw = await this.redis.get(this.key); + if (raw === null) { + return null; + } + + // A malformed or wrong-shaped value is treated as unknown (null) so the + // monitor fails open rather than acting on garbage. + let json: unknown; + try { + json = JSON.parse(raw); + } catch { + return null; + } + + const parsed = VerdictSchema.safeParse(json); + return parsed.success ? parsed.data : null; + } +} diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 3c5e9afcbc..741ad21cbf 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -215,6 +215,9 @@ importers: '@internal/compute': specifier: workspace:* version: link:../../internal-packages/compute + '@internal/redis': + specifier: workspace:* + version: link:../../internal-packages/redis '@kubernetes/client-node': specifier: ^1.0.0 version: 1.0.0(patch_hash=ba1a06f46256cdb8d6faf7167246692c0de2e7cd846a9dc0f13be0137e1c3745)(bufferutil@4.0.9)(encoding@0.1.13) @@ -240,6 +243,9 @@ importers: specifier: 3.25.76 version: 3.25.76 devDependencies: + '@internal/testcontainers': + specifier: workspace:* + version: link:../../internal-packages/testcontainers '@types/dockerode': specifier: ^3.3.33 version: 3.3.35 From 9a3cf7b1d3d60ccbca4f34110344f41b06c62bf8 Mon Sep 17 00:00:00 2001 From: nicktrn <55853254+nicktrn@users.noreply.github.com> Date: Thu, 4 Jun 2026 17:41:01 +0100 Subject: [PATCH 3/9] feat(supervisor): wire dequeue backpressure into preDequeue Gate dequeues on the backpressure verdict via the existing preDequeue hook, on all paths including k8s (where the resource monitor is a no-op). Construct the Redis-backed monitor only when TRIGGER_DEQUEUE_BACKPRESSURE_ENABLED is set; require a Redis host when enabled. Off by default - no Redis client, no effect. --- .../supervisor-dequeue-backpressure.md | 6 +++ apps/supervisor/src/env.ts | 26 +++++++++ apps/supervisor/src/index.ts | 54 ++++++++++++++++--- 3 files changed, 79 insertions(+), 7 deletions(-) create mode 100644 .server-changes/supervisor-dequeue-backpressure.md diff --git a/.server-changes/supervisor-dequeue-backpressure.md b/.server-changes/supervisor-dequeue-backpressure.md new file mode 100644 index 0000000000..5d62a73c55 --- /dev/null +++ b/.server-changes/supervisor-dequeue-backpressure.md @@ -0,0 +1,6 @@ +--- +area: supervisor +type: feature +--- + +Add opt-in dequeue backpressure to the supervisor. When enabled, the supervisor reads a verdict from Redis and pauses dequeuing while the worker cluster is saturated, then resumes once capacity is available. Disabled by default - no behavior change for existing deployments. diff --git a/apps/supervisor/src/env.ts b/apps/supervisor/src/env.ts index 071de4cc81..6476d34978 100644 --- a/apps/supervisor/src/env.ts +++ b/apps/supervisor/src/env.ts @@ -47,6 +47,24 @@ const Env = z TRIGGER_DEQUEUE_SCALING_BATCH_WINDOW_MS: z.coerce.number().int().positive().default(1000), // Batch window for metrics processing (ms) TRIGGER_DEQUEUE_SCALING_DAMPING_FACTOR: z.coerce.number().min(0).max(1).default(0.7), // Smooths consumer count changes after EWMA (0=no scaling, 1=immediate) + // Dequeue backpressure - off by default. When enabled, the supervisor reads a + // verdict from Redis (written by the cluster-side aggregator) and pauses dequeues + // while the worker cluster can't schedule pods. Disabled = total no-op: no Redis + // client is created, no reads happen, and the dequeue loop is unaffected. + TRIGGER_DEQUEUE_BACKPRESSURE_ENABLED: BoolEnv.default(false), + TRIGGER_DEQUEUE_BACKPRESSURE_REDIS_KEY: z.string().default("engine:dequeue:backpressure"), + TRIGGER_DEQUEUE_BACKPRESSURE_REFRESH_MS: z.coerce.number().int().positive().default(1000), + TRIGGER_DEQUEUE_BACKPRESSURE_MAX_VERDICT_AGE_MS: z.coerce + .number() + .int() + .positive() + .default(15_000), // Stale verdict → fail-open (treat as not engaged) + TRIGGER_DEQUEUE_BACKPRESSURE_REDIS_HOST: z.string().optional(), + TRIGGER_DEQUEUE_BACKPRESSURE_REDIS_PORT: z.coerce.number().int().optional(), + TRIGGER_DEQUEUE_BACKPRESSURE_REDIS_USERNAME: z.string().optional(), + TRIGGER_DEQUEUE_BACKPRESSURE_REDIS_PASSWORD: z.string().optional(), + TRIGGER_DEQUEUE_BACKPRESSURE_REDIS_TLS_DISABLED: BoolEnv.default(false), + // Optional services TRIGGER_WARM_START_URL: z.string().optional(), TRIGGER_CHECKPOINT_URL: z.string().optional(), @@ -281,6 +299,14 @@ const Env = z path: ["TRIGGER_WORKLOAD_API_DOMAIN"], }); } + if (data.TRIGGER_DEQUEUE_BACKPRESSURE_ENABLED && !data.TRIGGER_DEQUEUE_BACKPRESSURE_REDIS_HOST) { + ctx.addIssue({ + code: z.ZodIssueCode.custom, + message: + "TRIGGER_DEQUEUE_BACKPRESSURE_REDIS_HOST is required when TRIGGER_DEQUEUE_BACKPRESSURE_ENABLED is true", + path: ["TRIGGER_DEQUEUE_BACKPRESSURE_REDIS_HOST"], + }); + } }) .transform((data) => ({ ...data, diff --git a/apps/supervisor/src/index.ts b/apps/supervisor/src/index.ts index 3a1e6165fc..ed59fd5abd 100644 --- a/apps/supervisor/src/index.ts +++ b/apps/supervisor/src/index.ts @@ -28,6 +28,9 @@ import { FailedPodHandler } from "./services/failedPodHandler.js"; import { getWorkerToken } from "./workerToken.js"; import { OtlpTraceService } from "./services/otlpTraceService.js"; import { extractTraceparent, getRestoreRunnerId } from "./util.js"; +import { createRedisClient } from "@internal/redis"; +import { BackpressureMonitor } from "./backpressure/backpressureMonitor.js"; +import { RedisBackpressureSignalSource } from "./backpressure/redisBackpressureSignalSource.js"; import { fromContext, recordPhaseSince, @@ -54,6 +57,7 @@ class ManagedSupervisor { private readonly podCleaner?: PodCleaner; private readonly failedPodHandler?: FailedPodHandler; private readonly tracing?: OtlpTraceService; + private readonly backpressureMonitor?: BackpressureMonitor; private readonly isKubernetes = isKubernetesEnvironment(env.KUBERNETES_FORCE_ENABLED); private readonly warmStartUrl = env.TRIGGER_WARM_START_URL; @@ -181,6 +185,38 @@ class ManagedSupervisor { ); } + if (env.TRIGGER_DEQUEUE_BACKPRESSURE_ENABLED) { + const backpressureRedis = createRedisClient( + { + host: env.TRIGGER_DEQUEUE_BACKPRESSURE_REDIS_HOST, + port: env.TRIGGER_DEQUEUE_BACKPRESSURE_REDIS_PORT, + username: env.TRIGGER_DEQUEUE_BACKPRESSURE_REDIS_USERNAME, + password: env.TRIGGER_DEQUEUE_BACKPRESSURE_REDIS_PASSWORD, + ...(env.TRIGGER_DEQUEUE_BACKPRESSURE_REDIS_TLS_DISABLED ? {} : { tls: {} }), + }, + { + onError: (error) => + this.logger.error("Backpressure redis error", { error: error.message }), + } + ); + + this.backpressureMonitor = new BackpressureMonitor({ + enabled: true, + source: new RedisBackpressureSignalSource( + backpressureRedis, + env.TRIGGER_DEQUEUE_BACKPRESSURE_REDIS_KEY + ), + refreshIntervalMs: env.TRIGGER_DEQUEUE_BACKPRESSURE_REFRESH_MS, + maxVerdictAgeMs: env.TRIGGER_DEQUEUE_BACKPRESSURE_MAX_VERDICT_AGE_MS, + }); + + this.logger.log("đŸ›‘ Dequeue backpressure enabled", { + key: env.TRIGGER_DEQUEUE_BACKPRESSURE_REDIS_KEY, + refreshIntervalMs: env.TRIGGER_DEQUEUE_BACKPRESSURE_REFRESH_MS, + maxVerdictAgeMs: env.TRIGGER_DEQUEUE_BACKPRESSURE_MAX_VERDICT_AGE_MS, + }); + } + this.workerSession = new SupervisorSession({ workerToken: getWorkerToken(), apiUrl: env.TRIGGER_API_URL, @@ -206,13 +242,12 @@ class ManagedSupervisor { heartbeatIntervalSeconds: env.TRIGGER_WORKER_HEARTBEAT_INTERVAL_SECONDS, sendRunDebugLogs: env.SEND_RUN_DEBUG_LOGS, preDequeue: async () => { - if (!env.RESOURCE_MONITOR_ENABLED) { - return {}; - } + // Synchronous, hot-path-safe cached read; undefined when backpressure is disabled. + const skipForBackpressure = this.backpressureMonitor?.shouldSkipDequeue() ?? false; - if (this.isKubernetes) { - // Not used in k8s for now - return {}; + if (!env.RESOURCE_MONITOR_ENABLED || this.isKubernetes) { + // Resource monitor is not used in k8s; backpressure is the only gate there. + return { skipDequeue: skipForBackpressure }; } const resources = await this.resourceMonitor.getNodeResources(); @@ -222,7 +257,10 @@ class ManagedSupervisor { cpu: resources.cpuAvailable, memory: resources.memoryAvailable, }, - skipDequeue: resources.cpuAvailable < 0.25 || resources.memoryAvailable < 0.25, + skipDequeue: + skipForBackpressure || + resources.cpuAvailable < 0.25 || + resources.memoryAvailable < 0.25, }; }, preSkip: async () => { @@ -552,6 +590,7 @@ class ManagedSupervisor { this.logger.log("Starting up"); // Optional services + this.backpressureMonitor?.start(); await this.podCleaner?.start(); await this.failedPodHandler?.start(); await this.metricsServer?.start(); @@ -576,6 +615,7 @@ class ManagedSupervisor { await this.workerSession.stop(); // Optional services + this.backpressureMonitor?.stop(); await this.podCleaner?.stop(); await this.failedPodHandler?.stop(); await this.metricsServer?.stop(); From 5399677360bf56bd8453da7ac016a4d2e17e0865 Mon Sep 17 00:00:00 2001 From: nicktrn <55853254+nicktrn@users.noreply.github.com> Date: Thu, 4 Jun 2026 18:01:01 +0100 Subject: [PATCH 4/9] feat(supervisor): split backpressure signal and add resume ramp isEngaged() exposes the hard backpressure state (drives scale-up freeze), while shouldSkipDequeue() additionally ramps after release - skipping a linearly-decaying fraction of attempts over rampMs so the aggregate dequeue rate climbs back to full instead of snapping and re-flooding the cluster. --- .../backpressure/backpressureMonitor.test.ts | 89 +++++++++++++++++++ .../src/backpressure/backpressureMonitor.ts | 47 +++++++++- 2 files changed, 134 insertions(+), 2 deletions(-) diff --git a/apps/supervisor/src/backpressure/backpressureMonitor.test.ts b/apps/supervisor/src/backpressure/backpressureMonitor.test.ts index 4306a229af..0c5c8a2149 100644 --- a/apps/supervisor/src/backpressure/backpressureMonitor.test.ts +++ b/apps/supervisor/src/backpressure/backpressureMonitor.test.ts @@ -151,4 +151,93 @@ describe("BackpressureMonitor", () => { expect(reads()).toBe(readsAtStop); }); + + it("isEngaged reflects the hard engaged state (the signal for freezing scale-up)", async () => { + const { source } = countingSource({ engaged: true }); + const monitor = new BackpressureMonitor({ enabled: true, source, refreshIntervalMs: 1000 }); + + monitor.start(); + await vi.advanceTimersByTimeAsync(0); + + expect(monitor.isEngaged()).toBe(true); + + monitor.stop(); + }); + + it("isEngaged is false when clear and when stale", async () => { + const source: BackpressureSignalSource = { + read: async () => ({ engaged: true, ts: Date.now() }), + }; + const monitor = new BackpressureMonitor({ + enabled: true, + source, + refreshIntervalMs: 1_000_000, + maxVerdictAgeMs: 15_000, + }); + + monitor.start(); + await vi.advanceTimersByTimeAsync(0); + expect(monitor.isEngaged()).toBe(true); + + await vi.advanceTimersByTimeAsync(15_001); // stale → fail-open + expect(monitor.isEngaged()).toBe(false); + + monitor.stop(); + }); + + it("ramps the dequeue gate after release instead of resuming instantly", async () => { + let engaged = true; + let rnd = 0.5; + const source: BackpressureSignalSource = { read: async () => ({ engaged }) }; + const monitor = new BackpressureMonitor({ + enabled: true, + source, + refreshIntervalMs: 1000, + rampMs: 10_000, + random: () => rnd, + }); + + monitor.start(); + await vi.advanceTimersByTimeAsync(0); + expect(monitor.shouldSkipDequeue()).toBe(true); // hard engaged + + // Release: the next refresh observes the clear verdict and starts the ramp. + engaged = false; + await vi.advanceTimersByTimeAsync(1000); + expect(monitor.isEngaged()).toBe(false); + + // Just after release (progress ~0): skip probability ~1, so skip regardless. + rnd = 0.99; + expect(monitor.shouldSkipDequeue()).toBe(true); + + // Halfway through the ramp (progress 0.5): skip probability 0.5. + await vi.advanceTimersByTimeAsync(5000); + rnd = 0.4; + expect(monitor.shouldSkipDequeue()).toBe(true); // 0.4 < 0.5 → skip + rnd = 0.6; + expect(monitor.shouldSkipDequeue()).toBe(false); // 0.6 ≥ 0.5 → allow + + // Past the ramp window: never skip. + await vi.advanceTimersByTimeAsync(5000); + rnd = 0.0; + expect(monitor.shouldSkipDequeue()).toBe(false); + + monitor.stop(); + }); + + it("resumes instantly when no ramp is configured", async () => { + let engaged = true; + const source: BackpressureSignalSource = { read: async () => ({ engaged }) }; + const monitor = new BackpressureMonitor({ enabled: true, source, refreshIntervalMs: 1000 }); + + monitor.start(); + await vi.advanceTimersByTimeAsync(0); + expect(monitor.shouldSkipDequeue()).toBe(true); + + engaged = false; + await vi.advanceTimersByTimeAsync(1000); + expect(monitor.shouldSkipDequeue()).toBe(false); // no ramp → instant resume + + monitor.stop(); + }); }); diff --git a/apps/supervisor/src/backpressure/backpressureMonitor.ts b/apps/supervisor/src/backpressure/backpressureMonitor.ts index d62ee43875..d1aa85483c 100644 --- a/apps/supervisor/src/backpressure/backpressureMonitor.ts +++ b/apps/supervisor/src/backpressure/backpressureMonitor.ts @@ -22,6 +22,15 @@ export type BackpressureMonitorOptions = { * Guards against the source silently going stale (e.g. hanging reads). */ maxVerdictAgeMs?: number; + /** + * If set, after backpressure releases the dequeue gate stays partially engaged + * for this long, skipping a linearly-decaying fraction of attempts so the + * aggregate dequeue rate ramps from ~0 to full instead of snapping to full and + * re-flooding a freshly-recovered cluster. 0/unset = instant resume. + */ + rampMs?: number; + /** Injectable RNG for the resume ramp; defaults to Math.random. */ + random?: () => number; }; const DEFAULT_REFRESH_INTERVAL_MS = 1000; @@ -29,6 +38,8 @@ const DEFAULT_REFRESH_INTERVAL_MS = 1000; export class BackpressureMonitor { private verdict: BackpressureVerdict | null = null; private timer?: ReturnType; + private wasEngaged = false; + private releasedAt?: number; constructor(private readonly opts: BackpressureMonitorOptions) {} @@ -51,8 +62,12 @@ export class BackpressureMonitor { } } - /** Hot-path read: synchronous, never performs I/O. */ - shouldSkipDequeue(): boolean { + /** + * Hard backpressure state: true while the (fresh) verdict says engaged. This is + * the signal for freezing consumer-pool scale-up - distinct from the dequeue + * gate, which additionally ramps after release. Hot-path read, no I/O. + */ + isEngaged(): boolean { const verdict = this.verdict; if (verdict?.engaged !== true) { return false; @@ -66,6 +81,26 @@ export class BackpressureMonitor { return true; } + /** Hot-path read: synchronous, never performs I/O. */ + shouldSkipDequeue(): boolean { + if (this.isEngaged()) { + return true; + } + + // Post-release ramp: skip a linearly-decaying fraction of attempts so the + // aggregate dequeue rate climbs back to full over rampMs rather than snapping. + const rampMs = this.opts.rampMs; + if (rampMs && this.releasedAt !== undefined) { + const elapsed = Date.now() - this.releasedAt; + if (elapsed < rampMs) { + const skipProbability = 1 - elapsed / rampMs; + return (this.opts.random ?? Math.random)() < skipProbability; + } + } + + return false; + } + private async refresh(): Promise { try { this.verdict = await this.opts.source.read(); @@ -74,5 +109,13 @@ export class BackpressureMonitor { // unknown (no verdict) so dequeue resumes as if backpressure were off. this.verdict = null; } + + // Track the engaged→released transition to anchor the resume ramp. Based on + // the raw refreshed verdict, not the staleness-adjusted read. + const nowEngaged = this.verdict?.engaged === true; + if (this.wasEngaged && !nowEngaged) { + this.releasedAt = Date.now(); + } + this.wasEngaged = nowEngaged; } } From 8190215de242612824658f9137cb24b09004add7 Mon Sep 17 00:00:00 2001 From: nicktrn <55853254+nicktrn@users.noreply.github.com> Date: Thu, 4 Jun 2026 18:06:24 +0100 Subject: [PATCH 5/9] feat(core): freeze consumer-pool scale-up under backpressure Add optional shouldPauseScaling to ScalingOptions; when it returns true the pool stops scaling up (scale-down still allowed), so it won't add consumers to drain a queue backpressure is deliberately holding. --- .changeset/backpressure-scale-up-freeze.md | 5 ++ .../supervisor/consumerPool.test.ts | 61 +++++++++++++++++++ .../supervisor/consumerPool.ts | 18 ++++++ 3 files changed, 84 insertions(+) create mode 100644 .changeset/backpressure-scale-up-freeze.md diff --git a/.changeset/backpressure-scale-up-freeze.md b/.changeset/backpressure-scale-up-freeze.md new file mode 100644 index 0000000000..b69fad0f26 --- /dev/null +++ b/.changeset/backpressure-scale-up-freeze.md @@ -0,0 +1,5 @@ +--- +"@trigger.dev/core": patch +--- + +Add optional `shouldPauseScaling` to the supervisor consumer pool scaling options to freeze scale-up while it returns true (scale-down stays allowed). diff --git a/packages/core/src/v3/runEngineWorker/supervisor/consumerPool.test.ts b/packages/core/src/v3/runEngineWorker/supervisor/consumerPool.test.ts index 6093790b01..d6627b3004 100644 --- a/packages/core/src/v3/runEngineWorker/supervisor/consumerPool.test.ts +++ b/packages/core/src/v3/runEngineWorker/supervisor/consumerPool.test.ts @@ -718,4 +718,65 @@ describe("RunQueueConsumerPool", () => { expect(pool.size).toBe(1); }); }); + + describe("Backpressure scale-up freeze", () => { + it("freezes scale-up while shouldPauseScaling returns true, then resumes", async () => { + let paused = true; + pool = new RunQueueConsumerPool({ + ...defaultOptions, + scaling: { + strategy: "smooth", + minConsumerCount: 1, + maxConsumerCount: 10, + scaleUpCooldownMs: 0, + disableJitter: true, + shouldPauseScaling: () => paused, + }, + }); + await pool.start(); + expect(pool.size).toBe(1); + + // A high queue would normally scale up, but backpressure freezes it. + pool.updateQueueLength(10); + advanceTimeAndProcessMetrics(1100); + expect(pool.size).toBe(1); + + // Once backpressure releases, scaling resumes. + paused = false; + pool.updateQueueLength(10); + advanceTimeAndProcessMetrics(1100); + expect(pool.size).toBeGreaterThan(1); + }); + + it("still allows scale-down while paused", async () => { + let paused = false; + pool = new RunQueueConsumerPool({ + ...defaultOptions, + scaling: { + strategy: "smooth", + minConsumerCount: 1, + maxConsumerCount: 10, + scaleUpCooldownMs: 0, + scaleDownCooldownMs: 0, + disableJitter: true, + shouldPauseScaling: () => paused, + }, + }); + await pool.start(); + + pool.updateQueueLength(10); + advanceTimeAndProcessMetrics(1100); + const scaledUp = pool.size; + expect(scaledUp).toBeGreaterThan(1); + + // Pausing must not block shrinking - we want to drain down, just not grow. + // Loop to let the EWMA-smoothed queue length fall (one batch isn't enough). + paused = true; + for (let i = 0; i < 5; i++) { + pool.updateQueueLength(0); + advanceTimeAndProcessMetrics(1100); + } + expect(pool.size).toBeLessThan(scaledUp); + }); + }); }); diff --git a/packages/core/src/v3/runEngineWorker/supervisor/consumerPool.ts b/packages/core/src/v3/runEngineWorker/supervisor/consumerPool.ts index d72cef75c7..5c73a9819a 100644 --- a/packages/core/src/v3/runEngineWorker/supervisor/consumerPool.ts +++ b/packages/core/src/v3/runEngineWorker/supervisor/consumerPool.ts @@ -22,6 +22,12 @@ export type ScalingOptions = { batchWindowMs?: number; disableJitter?: boolean; dampingFactor?: number; + /** + * When this returns true, scale-up is frozen (scale-down still allowed). Used to + * stop the pool from adding consumers to drain a queue that backpressure is + * deliberately holding. Synchronous and hot-path-safe. + */ + shouldPauseScaling?: () => boolean; }; export type ConsumerPoolOptions = { @@ -49,6 +55,7 @@ export class RunQueueConsumerPool { private readonly maxConsumerCount: number; private readonly scalingStrategy: ScalingStrategy; private readonly disableJitter: boolean; + private readonly shouldPauseScaling?: () => boolean; private consumers: Map = new Map(); private readonly consumerFactory: QueueConsumerFactory; @@ -79,6 +86,7 @@ export class RunQueueConsumerPool { this.scaleUpCooldownMs = opts.scaling.scaleUpCooldownMs ?? 10000; // 10 seconds default this.scaleDownCooldownMs = opts.scaling.scaleDownCooldownMs ?? 60000; // 60 seconds default this.disableJitter = opts.scaling.disableJitter ?? false; + this.shouldPauseScaling = opts.scaling.shouldPauseScaling; // Configure EWMA parameters from options this.ewmaAlpha = opts.scaling.ewmaAlpha ?? 0.3; @@ -259,6 +267,16 @@ export class RunQueueConsumerPool { // Check cooldown periods with jitter if (targetCount > this.consumers.size) { + // Freeze scale-up while backpressure is engaged - don't add consumers to + // drain a queue we're deliberately holding. Scale-down stays allowed. + if (this.shouldPauseScaling?.()) { + this.logger.debug("Scale up frozen by backpressure", { + currentCount: this.consumers.size, + targetCount, + }); + return; + } + // Scale up const effectiveCooldown = this.scaleUpCooldownMs + jitterMs; if (timeSinceLastScale < effectiveCooldown) { From 30e476b0f538d5329bc1641705be021809f40d01 Mon Sep 17 00:00:00 2001 From: nicktrn <55853254+nicktrn@users.noreply.github.com> Date: Thu, 4 Jun 2026 18:06:30 +0100 Subject: [PATCH 6/9] feat(supervisor): wire scale-up freeze and resume ramp Pass shouldPauseScaling (monitor.isEngaged) into the consumer pool so scale-up freezes while hard-engaged, and feed TRIGGER_DEQUEUE_BACKPRESSURE_RAMP_MS into the monitor's post-release ramp. Off by default. --- apps/supervisor/src/env.ts | 2 ++ apps/supervisor/src/index.ts | 5 +++++ 2 files changed, 7 insertions(+) diff --git a/apps/supervisor/src/env.ts b/apps/supervisor/src/env.ts index 6476d34978..3ff7644275 100644 --- a/apps/supervisor/src/env.ts +++ b/apps/supervisor/src/env.ts @@ -54,6 +54,8 @@ const Env = z TRIGGER_DEQUEUE_BACKPRESSURE_ENABLED: BoolEnv.default(false), TRIGGER_DEQUEUE_BACKPRESSURE_REDIS_KEY: z.string().default("engine:dequeue:backpressure"), TRIGGER_DEQUEUE_BACKPRESSURE_REFRESH_MS: z.coerce.number().int().positive().default(1000), + TRIGGER_DEQUEUE_BACKPRESSURE_RAMP_MS: z.coerce.number().int().min(0).default(30_000), // Resume ramp window after release; 0 = instant resume + TRIGGER_DEQUEUE_BACKPRESSURE_MAX_VERDICT_AGE_MS: z.coerce .number() .int() diff --git a/apps/supervisor/src/index.ts b/apps/supervisor/src/index.ts index ed59fd5abd..d3b3eb5445 100644 --- a/apps/supervisor/src/index.ts +++ b/apps/supervisor/src/index.ts @@ -208,12 +208,14 @@ class ManagedSupervisor { ), refreshIntervalMs: env.TRIGGER_DEQUEUE_BACKPRESSURE_REFRESH_MS, maxVerdictAgeMs: env.TRIGGER_DEQUEUE_BACKPRESSURE_MAX_VERDICT_AGE_MS, + rampMs: env.TRIGGER_DEQUEUE_BACKPRESSURE_RAMP_MS, }); this.logger.log("đŸ›‘ Dequeue backpressure enabled", { key: env.TRIGGER_DEQUEUE_BACKPRESSURE_REDIS_KEY, refreshIntervalMs: env.TRIGGER_DEQUEUE_BACKPRESSURE_REFRESH_MS, maxVerdictAgeMs: env.TRIGGER_DEQUEUE_BACKPRESSURE_MAX_VERDICT_AGE_MS, + rampMs: env.TRIGGER_DEQUEUE_BACKPRESSURE_RAMP_MS, }); } @@ -237,6 +239,9 @@ class ManagedSupervisor { ewmaAlpha: env.TRIGGER_DEQUEUE_SCALING_EWMA_ALPHA, batchWindowMs: env.TRIGGER_DEQUEUE_SCALING_BATCH_WINDOW_MS, dampingFactor: env.TRIGGER_DEQUEUE_SCALING_DAMPING_FACTOR, + // Freeze scale-up while backpressure is hard-engaged (not during the resume + // ramp). Undefined when backpressure is disabled → no effect on scaling. + shouldPauseScaling: () => this.backpressureMonitor?.isEngaged() ?? false, }, runNotificationsEnabled: env.TRIGGER_WORKLOAD_API_ENABLED, heartbeatIntervalSeconds: env.TRIGGER_WORKER_HEARTBEAT_INTERVAL_SECONDS, From e8cfce99556852b978f04401f6855f83267fa8f9 Mon Sep 17 00:00:00 2001 From: nicktrn <55853254+nicktrn@users.noreply.github.com> Date: Thu, 4 Jun 2026 18:35:20 +0100 Subject: [PATCH 7/9] feat(supervisor): backpressure dry-run mode and prometheus metrics Dry-run (default on via env) keeps the gates inert while computeEngaged still reflects the real signal and verdict transitions are logged. Adds BackpressureMetrics (engaged/dry_run gauges, skipped-dequeues counter). --- .../src/backpressure/backpressureMetrics.ts | 34 +++++++++ .../backpressure/backpressureMonitor.test.ts | 70 +++++++++++++++++++ .../src/backpressure/backpressureMonitor.ts | 53 ++++++++++++-- apps/supervisor/src/env.ts | 3 + apps/supervisor/src/index.ts | 5 ++ 5 files changed, 158 insertions(+), 7 deletions(-) create mode 100644 apps/supervisor/src/backpressure/backpressureMetrics.ts diff --git a/apps/supervisor/src/backpressure/backpressureMetrics.ts b/apps/supervisor/src/backpressure/backpressureMetrics.ts new file mode 100644 index 0000000000..ffe5762854 --- /dev/null +++ b/apps/supervisor/src/backpressure/backpressureMetrics.ts @@ -0,0 +1,34 @@ +import { Counter, Gauge, type Registry } from "prom-client"; + +/** Prometheus metrics for dequeue backpressure. */ +export class BackpressureMetrics { + /** 1 while backpressure is engaged (computed signal, set even in dry-run). */ + readonly engaged: Gauge; + /** 1 when running in dry-run (gates inert). */ + readonly dryRun: Gauge; + /** Dequeue attempts the gate skipped - or would have, in dry-run (labelled). */ + readonly skipsTotal: Counter; + + constructor(opts: { register: Registry; prefix?: string }) { + const prefix = opts.prefix ?? "supervisor_backpressure"; + + this.engaged = new Gauge({ + name: `${prefix}_engaged`, + help: "1 while dequeue backpressure is engaged (computed signal, regardless of dry-run)", + registers: [opts.register], + }); + + this.dryRun = new Gauge({ + name: `${prefix}_dry_run`, + help: "1 when dequeue backpressure is in dry-run mode (gates inert)", + registers: [opts.register], + }); + + this.skipsTotal = new Counter({ + name: `${prefix}_skipped_dequeues_total`, + help: "Dequeue attempts skipped by backpressure (or would be, in dry-run)", + labelNames: ["dry_run"], + registers: [opts.register], + }); + } +} diff --git a/apps/supervisor/src/backpressure/backpressureMonitor.test.ts b/apps/supervisor/src/backpressure/backpressureMonitor.test.ts index 0c5c8a2149..0b90957981 100644 --- a/apps/supervisor/src/backpressure/backpressureMonitor.test.ts +++ b/apps/supervisor/src/backpressure/backpressureMonitor.test.ts @@ -1,5 +1,7 @@ import { describe, it, expect, vi, beforeEach, afterEach } from "vitest"; +import { Registry } from "prom-client"; import { BackpressureMonitor, type BackpressureSignalSource } from "./backpressureMonitor.js"; +import { BackpressureMetrics } from "./backpressureMetrics.js"; function countingSource(verdict: { engaged: boolean } | null): { source: BackpressureSignalSource; @@ -225,6 +227,74 @@ describe("BackpressureMonitor", () => { monitor.stop(); }); + it("in dry-run, the gates are inert but computeEngaged still reflects the real signal", async () => { + const { source } = countingSource({ engaged: true }); + const monitor = new BackpressureMonitor({ + enabled: true, + source, + refreshIntervalMs: 1000, + dryRun: true, + }); + + monitor.start(); + await vi.advanceTimersByTimeAsync(0); + + expect(monitor.computeEngaged()).toBe(true); // real signal, for observability/metrics + expect(monitor.isEngaged()).toBe(false); // inert: no scale-up freeze + expect(monitor.shouldSkipDequeue()).toBe(false); // inert: no dequeue skip + + monitor.stop(); + }); + + it("logs on verdict transitions", async () => { + let engaged = true; + const source: BackpressureSignalSource = { read: async () => ({ engaged }) }; + const logs: Array<{ message: string; meta?: Record }> = []; + const logger = { + info: (message: string, meta?: Record) => logs.push({ message, meta }), + }; + const monitor = new BackpressureMonitor({ + enabled: true, + source, + refreshIntervalMs: 1000, + logger, + }); + + monitor.start(); + await vi.advanceTimersByTimeAsync(0); + expect(logs.some((l) => l.meta?.engaged === true)).toBe(true); + + engaged = false; + await vi.advanceTimersByTimeAsync(1000); + expect(logs.some((l) => l.meta?.engaged === false)).toBe(true); + + monitor.stop(); + }); + + it("records prometheus metrics", async () => { + const { source } = countingSource({ engaged: true }); + const register = new Registry(); + const metrics = new BackpressureMetrics({ register }); + const monitor = new BackpressureMonitor({ + enabled: true, + source, + refreshIntervalMs: 1000, + metrics, + }); + + monitor.start(); + await vi.advanceTimersByTimeAsync(0); + + expect(await register.metrics()).toContain("supervisor_backpressure_engaged 1"); + + monitor.shouldSkipDequeue(); + expect(await register.metrics()).toMatch( + /supervisor_backpressure_skipped_dequeues_total\{dry_run="false"\} [1-9]/ + ); + + monitor.stop(); + }); + it("resumes instantly when no ramp is configured", async () => { let engaged = true; const source: BackpressureSignalSource = { read: async () => ({ engaged }) }; diff --git a/apps/supervisor/src/backpressure/backpressureMonitor.ts b/apps/supervisor/src/backpressure/backpressureMonitor.ts index d1aa85483c..381f2dbfbf 100644 --- a/apps/supervisor/src/backpressure/backpressureMonitor.ts +++ b/apps/supervisor/src/backpressure/backpressureMonitor.ts @@ -1,3 +1,9 @@ +import type { BackpressureMetrics } from "./backpressureMetrics.js"; + +export interface BackpressureLogger { + info(message: string, meta?: Record): void; +} + export type BackpressureVerdict = { engaged: boolean; /** Epoch ms the verdict was produced. Used for consumer-side staleness fail-open. */ @@ -31,6 +37,13 @@ export type BackpressureMonitorOptions = { rampMs?: number; /** Injectable RNG for the resume ramp; defaults to Math.random. */ random?: () => number; + /** + * When true, the gates are inert (never skip dequeues, never freeze scale-up). + * computeEngaged() still reflects the real signal so it can be observed. + */ + dryRun?: boolean; + logger?: BackpressureLogger; + metrics?: BackpressureMetrics; }; const DEFAULT_REFRESH_INTERVAL_MS = 1000; @@ -41,7 +54,9 @@ export class BackpressureMonitor { private wasEngaged = false; private releasedAt?: number; - constructor(private readonly opts: BackpressureMonitorOptions) {} + constructor(private readonly opts: BackpressureMonitorOptions) { + this.opts.metrics?.dryRun.set(this.opts.dryRun ? 1 : 0); + } start(): void { if (!this.opts.enabled) { @@ -63,11 +78,11 @@ export class BackpressureMonitor { } /** - * Hard backpressure state: true while the (fresh) verdict says engaged. This is - * the signal for freezing consumer-pool scale-up - distinct from the dequeue - * gate, which additionally ramps after release. Hot-path read, no I/O. + * Raw hard backpressure state: true while the (fresh) verdict says engaged, + * ignoring dry-run. Used for observability/metrics so the real signal is + * visible even when the gates are inert. */ - isEngaged(): boolean { + computeEngaged(): boolean { const verdict = this.verdict; if (verdict?.engaged !== true) { return false; @@ -81,9 +96,25 @@ export class BackpressureMonitor { return true; } - /** Hot-path read: synchronous, never performs I/O. */ + /** + * Effective hard state: the signal for freezing consumer-pool scale-up. Inert + * (false) in dry-run. Hot-path read, no I/O. + */ + isEngaged(): boolean { + return this.opts.dryRun ? false : this.computeEngaged(); + } + + /** Hot-path read: synchronous, never performs I/O. Inert (false) in dry-run. */ shouldSkipDequeue(): boolean { - if (this.isEngaged()) { + const wouldSkip = this.computeShouldSkip(); + if (wouldSkip) { + this.opts.metrics?.skipsTotal.inc({ dry_run: this.opts.dryRun ? "true" : "false" }); + } + return this.opts.dryRun ? false : wouldSkip; + } + + private computeShouldSkip(): boolean { + if (this.computeEngaged()) { return true; } @@ -113,6 +144,14 @@ export class BackpressureMonitor { // Track the engaged→released transition to anchor the resume ramp. Based on // the raw refreshed verdict, not the staleness-adjusted read. const nowEngaged = this.verdict?.engaged === true; + this.opts.metrics?.engaged.set(nowEngaged ? 1 : 0); + + if (nowEngaged !== this.wasEngaged) { + this.opts.logger?.info("backpressure verdict changed", { + engaged: nowEngaged, + dryRun: !!this.opts.dryRun, + }); + } if (this.wasEngaged && !nowEngaged) { this.releasedAt = Date.now(); } diff --git a/apps/supervisor/src/env.ts b/apps/supervisor/src/env.ts index 3ff7644275..176b98a8b4 100644 --- a/apps/supervisor/src/env.ts +++ b/apps/supervisor/src/env.ts @@ -52,6 +52,9 @@ const Env = z // while the worker cluster can't schedule pods. Disabled = total no-op: no Redis // client is created, no reads happen, and the dequeue loop is unaffected. TRIGGER_DEQUEUE_BACKPRESSURE_ENABLED: BoolEnv.default(false), + // Safety default: even when enabled, backpressure only logs what it would do. + // Set to false to actually skip dequeues / freeze scale-up. + TRIGGER_DEQUEUE_BACKPRESSURE_DRY_RUN: BoolEnv.default(true), TRIGGER_DEQUEUE_BACKPRESSURE_REDIS_KEY: z.string().default("engine:dequeue:backpressure"), TRIGGER_DEQUEUE_BACKPRESSURE_REFRESH_MS: z.coerce.number().int().positive().default(1000), TRIGGER_DEQUEUE_BACKPRESSURE_RAMP_MS: z.coerce.number().int().min(0).default(30_000), // Resume ramp window after release; 0 = instant resume diff --git a/apps/supervisor/src/index.ts b/apps/supervisor/src/index.ts index d3b3eb5445..64052cadd6 100644 --- a/apps/supervisor/src/index.ts +++ b/apps/supervisor/src/index.ts @@ -31,6 +31,7 @@ import { extractTraceparent, getRestoreRunnerId } from "./util.js"; import { createRedisClient } from "@internal/redis"; import { BackpressureMonitor } from "./backpressure/backpressureMonitor.js"; import { RedisBackpressureSignalSource } from "./backpressure/redisBackpressureSignalSource.js"; +import { BackpressureMetrics } from "./backpressure/backpressureMetrics.js"; import { fromContext, recordPhaseSince, @@ -209,6 +210,9 @@ class ManagedSupervisor { refreshIntervalMs: env.TRIGGER_DEQUEUE_BACKPRESSURE_REFRESH_MS, maxVerdictAgeMs: env.TRIGGER_DEQUEUE_BACKPRESSURE_MAX_VERDICT_AGE_MS, rampMs: env.TRIGGER_DEQUEUE_BACKPRESSURE_RAMP_MS, + dryRun: env.TRIGGER_DEQUEUE_BACKPRESSURE_DRY_RUN, + logger: this.logger, + metrics: new BackpressureMetrics({ register }), }); this.logger.log("đŸ›‘ Dequeue backpressure enabled", { @@ -216,6 +220,7 @@ class ManagedSupervisor { refreshIntervalMs: env.TRIGGER_DEQUEUE_BACKPRESSURE_REFRESH_MS, maxVerdictAgeMs: env.TRIGGER_DEQUEUE_BACKPRESSURE_MAX_VERDICT_AGE_MS, rampMs: env.TRIGGER_DEQUEUE_BACKPRESSURE_RAMP_MS, + dryRun: env.TRIGGER_DEQUEUE_BACKPRESSURE_DRY_RUN, }); } From e6b5a67bae2c6973a58a1f4712a39fea3dfcb66b Mon Sep 17 00:00:00 2001 From: nicktrn <55853254+nicktrn@users.noreply.github.com> Date: Thu, 4 Jun 2026 19:15:42 +0100 Subject: [PATCH 8/9] fix(supervisor): address review feedback on backpressure monitor Guard against overlapping refresh ticks when a read hangs; use the staleness-aware computeEngaged() for transition/ramp/gauge bookkeeping; close the backpressure Redis client on supervisor shutdown. --- .../backpressure/backpressureMonitor.test.ts | 19 +++++++++++++++ .../src/backpressure/backpressureMonitor.ts | 24 +++++++++++++++---- apps/supervisor/src/index.ts | 8 ++++--- 3 files changed, 43 insertions(+), 8 deletions(-) diff --git a/apps/supervisor/src/backpressure/backpressureMonitor.test.ts b/apps/supervisor/src/backpressure/backpressureMonitor.test.ts index 0b90957981..e54314ad82 100644 --- a/apps/supervisor/src/backpressure/backpressureMonitor.test.ts +++ b/apps/supervisor/src/backpressure/backpressureMonitor.test.ts @@ -140,6 +140,25 @@ describe("BackpressureMonitor", () => { monitor.stop(); }); + it("does not start an overlapping refresh while one is in flight", async () => { + let reads = 0; + const source: BackpressureSignalSource = { + // Never resolves - simulates a hung read. + read: () => { + reads++; + return new Promise<{ engaged: boolean } | null>(() => {}); + }, + }; + const monitor = new BackpressureMonitor({ enabled: true, source, refreshIntervalMs: 1000 }); + + monitor.start(); + await vi.advanceTimersByTimeAsync(3000); // several intervals while the first read hangs + + expect(reads).toBe(1); // in-flight guard prevents stacking + + monitor.stop(); + }); + it("stops refreshing after stop()", async () => { const { source, reads } = countingSource({ engaged: true }); const monitor = new BackpressureMonitor({ enabled: true, source, refreshIntervalMs: 1000 }); diff --git a/apps/supervisor/src/backpressure/backpressureMonitor.ts b/apps/supervisor/src/backpressure/backpressureMonitor.ts index 381f2dbfbf..d9ee995197 100644 --- a/apps/supervisor/src/backpressure/backpressureMonitor.ts +++ b/apps/supervisor/src/backpressure/backpressureMonitor.ts @@ -51,6 +51,7 @@ const DEFAULT_REFRESH_INTERVAL_MS = 1000; export class BackpressureMonitor { private verdict: BackpressureVerdict | null = null; private timer?: ReturnType; + private refreshInFlight = false; private wasEngaged = false; private releasedAt?: number; @@ -63,13 +64,26 @@ export class BackpressureMonitor { return; } - void this.refresh(); + void this.refreshTick(); this.timer = setInterval( - () => void this.refresh(), + () => void this.refreshTick(), this.opts.refreshIntervalMs ?? DEFAULT_REFRESH_INTERVAL_MS ); } + /** Skip a tick if the previous refresh is still in flight, so slow/hung reads can't stack. */ + private async refreshTick(): Promise { + if (this.refreshInFlight) { + return; + } + this.refreshInFlight = true; + try { + await this.refresh(); + } finally { + this.refreshInFlight = false; + } + } + stop(): void { if (this.timer) { clearInterval(this.timer); @@ -141,9 +155,9 @@ export class BackpressureMonitor { this.verdict = null; } - // Track the engaged→released transition to anchor the resume ramp. Based on - // the raw refreshed verdict, not the staleness-adjusted read. - const nowEngaged = this.verdict?.engaged === true; + // Track the engaged→released transition to anchor the resume ramp. Use the + // staleness-aware state so a stale verdict doesn't pin wasEngaged / the gauge. + const nowEngaged = this.computeEngaged(); this.opts.metrics?.engaged.set(nowEngaged ? 1 : 0); if (nowEngaged !== this.wasEngaged) { diff --git a/apps/supervisor/src/index.ts b/apps/supervisor/src/index.ts index 64052cadd6..829073e484 100644 --- a/apps/supervisor/src/index.ts +++ b/apps/supervisor/src/index.ts @@ -28,7 +28,7 @@ import { FailedPodHandler } from "./services/failedPodHandler.js"; import { getWorkerToken } from "./workerToken.js"; import { OtlpTraceService } from "./services/otlpTraceService.js"; import { extractTraceparent, getRestoreRunnerId } from "./util.js"; -import { createRedisClient } from "@internal/redis"; +import { createRedisClient, type Redis } from "@internal/redis"; import { BackpressureMonitor } from "./backpressure/backpressureMonitor.js"; import { RedisBackpressureSignalSource } from "./backpressure/redisBackpressureSignalSource.js"; import { BackpressureMetrics } from "./backpressure/backpressureMetrics.js"; @@ -59,6 +59,7 @@ class ManagedSupervisor { private readonly failedPodHandler?: FailedPodHandler; private readonly tracing?: OtlpTraceService; private readonly backpressureMonitor?: BackpressureMonitor; + private readonly backpressureRedis?: Redis; private readonly isKubernetes = isKubernetesEnvironment(env.KUBERNETES_FORCE_ENABLED); private readonly warmStartUrl = env.TRIGGER_WARM_START_URL; @@ -187,7 +188,7 @@ class ManagedSupervisor { } if (env.TRIGGER_DEQUEUE_BACKPRESSURE_ENABLED) { - const backpressureRedis = createRedisClient( + this.backpressureRedis = createRedisClient( { host: env.TRIGGER_DEQUEUE_BACKPRESSURE_REDIS_HOST, port: env.TRIGGER_DEQUEUE_BACKPRESSURE_REDIS_PORT, @@ -204,7 +205,7 @@ class ManagedSupervisor { this.backpressureMonitor = new BackpressureMonitor({ enabled: true, source: new RedisBackpressureSignalSource( - backpressureRedis, + this.backpressureRedis, env.TRIGGER_DEQUEUE_BACKPRESSURE_REDIS_KEY ), refreshIntervalMs: env.TRIGGER_DEQUEUE_BACKPRESSURE_REFRESH_MS, @@ -626,6 +627,7 @@ class ManagedSupervisor { // Optional services this.backpressureMonitor?.stop(); + await this.backpressureRedis?.quit(); await this.podCleaner?.stop(); await this.failedPodHandler?.stop(); await this.metricsServer?.stop(); From 44842f959f85d4d93e469d6477c9f7e381655068 Mon Sep 17 00:00:00 2001 From: nicktrn <55853254+nicktrn@users.noreply.github.com> Date: Thu, 4 Jun 2026 19:34:03 +0100 Subject: [PATCH 9/9] fix(supervisor): strip backpressure redis password from debug env log Add TRIGGER_DEQUEUE_BACKPRESSURE_REDIS_PASSWORD to the secret strip-list so it never lands in the DEBUG startup log, with a comment to keep new secrets out. --- apps/supervisor/src/index.ts | 3 +++ 1 file changed, 3 insertions(+) diff --git a/apps/supervisor/src/index.ts b/apps/supervisor/src/index.ts index 829073e484..572121c7e8 100644 --- a/apps/supervisor/src/index.ts +++ b/apps/supervisor/src/index.ts @@ -72,10 +72,13 @@ class ManagedSupervisor { private readonly wideEventsNoisyRoutes = env.TRIGGER_WIDE_EVENTS_NOISY_ROUTES; constructor() { + // Strip secret-like env vars before debug-logging the rest. Add any new + // secret env var here so it never lands in the DEBUG "Starting up" log. const { TRIGGER_WORKER_TOKEN, MANAGED_WORKER_SECRET, COMPUTE_GATEWAY_AUTH_TOKEN, + TRIGGER_DEQUEUE_BACKPRESSURE_REDIS_PASSWORD, ...envWithoutSecrets } = env;