consumer.ts ×
retry-queue.ts ×
schema.ts ×
telemetry.ts ×
1234567
891011121314
●151617181920
212223242526
27282930
// retry-queue.ts · exponential back-off with jitter, bounded memory.
// Fixes #2817 — queue grew unbounded under sustained upstream 503s.
import { Clock, now } from "./clock";
import { Telemetry } from "./telemetry";
export interface RetryOptions {
maxAttempts: number;
baseDelayMs: number;
capDelayMs: number;
jitter: "full" | "equal" | "none";
}
export class RetryQueue<T> {
private items: Entry<T>[] = [];
private readonly max = 10_000; // drop-head when exceeded
constructor(
private opts: RetryOptions,
private clock: Clock = now,
private tel?: Telemetry,
) {}
enqueue(item: T): void {
if (this.items.length >= this.max) {
this.items.shift(); // drop oldest
this.tel?.count("queue.overflow", 1);
}
this.items.push({ item, attempt: 0, nextAt: this.clock() });
}
// ghost suggestion below — press Tab to accept
dequeueReady(): T[] { ... }
}