diff --git a/.changeset/consent-queue-guardrails.md b/.changeset/consent-queue-guardrails.md new file mode 100644 index 0000000..5e95eaa --- /dev/null +++ b/.changeset/consent-queue-guardrails.md @@ -0,0 +1,9 @@ +--- +"@junctionjs/core": minor +--- + +feat(core): add consent queue guardrails — maxQueueSize, queue:drop telemetry, flush() drains queue + +- Add `maxQueueSize` option to `ConsentConfig` — drops oldest events when exceeded +- Emit `queue:drop` events with `{ count, reason }` when events are lost to timeout or overflow +- `flush()` now drains the consent queue for permitted events, critical for page unload diff --git a/packages/core/src/collector.test.ts b/packages/core/src/collector.test.ts index 745e6f5..76edaad 100644 --- a/packages/core/src/collector.test.ts +++ b/packages/core/src/collector.test.ts @@ -904,4 +904,128 @@ describe("Collector", () => { expect(dest.onConsent).toHaveBeenCalledWith({ necessary: true, analytics: false }); }); }); + + describe("queue:drop telemetry", () => { + it("emits queue:drop when events expire from consent queue", () => { + const dropHandler = vi.fn(); + const options = makeOptions(); + options.config.consent.defaultState = {}; // analytics pending + options.config.consent.queueTimeout = 5000; + const dest = mockDestination({ consent: ["analytics"] }); + options.config.destinations = [{ destination: dest, config: {} }]; + + const collector = createCollector(options); + collector.on("queue:drop", dropHandler); + + collector.track("page", "viewed"); + + // Flush buffer so event enters consent queue + vi.advanceTimersByTime(3000); + + // Advance past queue timeout — cleanup runs every 5s, + // event was queued at ~3s, so need to reach the 10s mark + // for the second cleanup pass to find it expired. + vi.advanceTimersByTime(7000); + + expect(dropHandler).toHaveBeenCalledTimes(1); + expect(dropHandler).toHaveBeenCalledWith( + expect.objectContaining({ + type: "queue:drop", + payload: { count: 1, reason: "timeout" }, + }), + ); + }); + + it("emits queue:drop when maxQueueSize is exceeded", () => { + const dropHandler = vi.fn(); + const options = makeOptions(); + options.config.consent.defaultState = {}; // analytics pending + options.config.consent.maxQueueSize = 2; + const dest = mockDestination({ consent: ["analytics"] }); + options.config.destinations = [{ destination: dest, config: {} }]; + + const collector = createCollector(options); + collector.on("queue:drop", dropHandler); + + collector.track("test", "one"); + collector.track("test", "two"); + collector.track("test", "three"); + + // Flush buffer so all 3 enter consent queue (max 2) + vi.advanceTimersByTime(3000); + + expect(dropHandler).toHaveBeenCalledWith( + expect.objectContaining({ + type: "queue:drop", + payload: { count: 1, reason: "overflow" }, + }), + ); + }); + }); + + describe("flush drains consent queue", () => { + it("dispatches queued events on flush() when consent is now granted", async () => { + const dest = mockDestination({ consent: ["analytics"] }); + const options = makeOptions(); + options.config.consent.defaultState = {}; // analytics pending + options.config.destinations = [{ destination: dest, config: {} }]; + + const collector = createCollector(options); + await vi.advanceTimersByTimeAsync(0); // init destinations + + collector.track("page", "viewed"); + + // Flush buffer — event enters consent queue (analytics pending) + vi.advanceTimersByTime(3000); + expect(dest.transform).not.toHaveBeenCalled(); + + // Grant consent without triggering onChange replay + // (simulate: consent granted, then flush() called on unload) + collector.consent({ analytics: true }); + // onChange already drained and replayed — but let's test flush() independently. + // Reset the mock to see if flush() also works + (dest.transform as any).mockClear(); + (dest.send as any).mockClear(); + + // Track another event while analytics is now granted but still in buffer + collector.track("page", "scrolled"); + + // flush() should dispatch the buffered event + await collector.flush(); + + expect(dest.transform).toHaveBeenCalled(); + const event = (dest.transform as any).mock.calls[0][0] as JctEvent; + expect(event.entity).toBe("page"); + expect(event.action).toBe("scrolled"); + }); + + it("flush() dispatches consent-queued events that are now permitted", async () => { + const exemptDest = mockDestination({ name: "exempt-dest", consent: ["exempt"] }); + const analyticsDest = mockDestination({ name: "analytics-dest", consent: ["analytics"] }); + const options = makeOptions(); + options.config.consent.defaultState = {}; // analytics pending + options.config.destinations = [ + { destination: exemptDest, config: {} }, + { destination: analyticsDest, config: {} }, + ]; + + const collector = createCollector(options); + await vi.advanceTimersByTimeAsync(0); // init destinations + + collector.track("page", "viewed"); + vi.advanceTimersByTime(3000); // flush buffer + + // exempt dest got the event, analytics dest didn't + expect(exemptDest.transform).toHaveBeenCalledTimes(1); + expect(analyticsDest.transform).not.toHaveBeenCalled(); + + // Grant consent — onChange fires and drains queue + collector.consent({ analytics: true }); + + // Analytics dest should have received the replayed event + expect(analyticsDest.transform).toHaveBeenCalledTimes(1); + const event = (analyticsDest.transform as any).mock.calls[0][0] as JctEvent; + expect(event.context.was_queued).toBe(true); + }); + }); }); diff --git a/packages/core/src/collector.ts b/packages/core/src/collector.ts index 8bd1281..85881b3 100644 --- a/packages/core/src/collector.ts +++ b/packages/core/src/collector.ts @@ -126,6 +126,15 @@ export function createCollector(options: CreateCollectorOptions): Collector { // Start initialization (fire and forget — destinations buffer until ready) const initPromise = initDestinations(); + // ── Queue drop telemetry ───────────────────────────────── + + consent.onDrop((count, reason) => { + emit("queue:drop", { count, reason }); + if (config.debug) { + console.warn(`[Junction] Dropped ${count} queued event(s): ${reason}`); + } + }); + // ── Consent change handler ───────────────────────────── consent.onChange((state, _previous) => { @@ -383,6 +392,26 @@ export function createCollector(options: CreateCollectorOptions): Collector { async flush(): Promise { await initPromise; // ensure destinations are ready flushBuffer(); + + // Also drain the consent queue — any events where consent is now + // granted get dispatched. Critical for page unload: the client calls + // flush() on pagehide, and we need to dispatch everything we can. + const queued = consent.drain(); + if (queued.length > 0) { + for (const { event, sentTo } of queued) { + const updatedEvent = { + ...event, + user: { ...user }, + context: { + ...event.context, + was_queued: true, + consent: consent.getState(), + }, + }; + dispatchToDestinations(updatedEvent, sentTo); + } + emit("queue:flush", { count: queued.length }); + } }, async shutdown(): Promise { diff --git a/packages/core/src/consent.test.ts b/packages/core/src/consent.test.ts index 3828b50..f50ea92 100644 --- a/packages/core/src/consent.test.ts +++ b/packages/core/src/consent.test.ts @@ -461,4 +461,102 @@ describe("ConsentManager", () => { expect(listener).not.toHaveBeenCalled(); }); }); + + describe("queue drop telemetry", () => { + it("emits drop event when events expire from queue", () => { + const dropListener = vi.fn(); + manager = createConsentManager(makeConfig({ queueTimeout: 5000 })); + manager.onDrop(dropListener); + + manager.enqueue(makeEvent({ id: "evt-1" }), new Set()); + manager.enqueue(makeEvent({ id: "evt-2" }), new Set()); + + // Advance past the queue timeout + vi.advanceTimersByTime(6000); + + expect(dropListener).toHaveBeenCalledTimes(1); + expect(dropListener).toHaveBeenCalledWith(2, "timeout"); + expect(manager.queueSize()).toBe(0); + }); + + it("does not emit drop event when no events expired", () => { + const dropListener = vi.fn(); + manager = createConsentManager(makeConfig({ queueTimeout: 10_000 })); + manager.onDrop(dropListener); + + manager.enqueue(makeEvent({ id: "evt-1" }), new Set()); + + // Advance but not past timeout + vi.advanceTimersByTime(5000); + + expect(dropListener).not.toHaveBeenCalled(); + expect(manager.queueSize()).toBe(1); + }); + + it("unsubscribes drop listener", () => { + const dropListener = vi.fn(); + manager = createConsentManager(makeConfig({ queueTimeout: 5000 })); + const unsub = manager.onDrop(dropListener); + + manager.enqueue(makeEvent({ id: "evt-1" }), new Set()); + unsub(); + + vi.advanceTimersByTime(6000); + + expect(dropListener).not.toHaveBeenCalled(); + }); + }); + + describe("maxQueueSize", () => { + it("drops oldest events when queue exceeds maxQueueSize", () => { + const dropListener = vi.fn(); + manager = createConsentManager(makeConfig({ maxQueueSize: 3 })); + manager.onDrop(dropListener); + + manager.enqueue(makeEvent({ id: "evt-1" }), new Set()); + manager.enqueue(makeEvent({ id: "evt-2" }), new Set()); + manager.enqueue(makeEvent({ id: "evt-3" }), new Set()); + expect(manager.queueSize()).toBe(3); + expect(dropListener).not.toHaveBeenCalled(); + + // Adding a 4th should drop the oldest + manager.enqueue(makeEvent({ id: "evt-4" }), new Set()); + + expect(manager.queueSize()).toBe(3); + expect(dropListener).toHaveBeenCalledWith(1, "overflow"); + + // Verify the oldest was dropped: drain and check IDs + const drained = manager.drain(); + expect(drained.map((q) => q.event.id)).toEqual(["evt-2", "evt-3", "evt-4"]); + }); + + it("does not enforce limit when maxQueueSize is not set", () => { + manager = createConsentManager(makeConfig()); + + for (let i = 0; i < 50; i++) { + manager.enqueue(makeEvent({ id: `evt-${i}` }), new Set()); + } + + expect(manager.queueSize()).toBe(50); + }); + + it("drops multiple oldest when queue is far over limit", () => { + const dropListener = vi.fn(); + manager = createConsentManager(makeConfig({ maxQueueSize: 2 })); + manager.onDrop(dropListener); + + // Fill to capacity + manager.enqueue(makeEvent({ id: "evt-1" }), new Set()); + manager.enqueue(makeEvent({ id: "evt-2" }), new Set()); + + // Add one more — should drop 1 + manager.enqueue(makeEvent({ id: "evt-3" }), new Set()); + + expect(dropListener).toHaveBeenCalledWith(1, "overflow"); + expect(manager.queueSize()).toBe(2); + + const drained = manager.drain(); + expect(drained.map((q) => q.event.id)).toEqual(["evt-2", "evt-3"]); + }); + }); }); diff --git a/packages/core/src/consent.ts b/packages/core/src/consent.ts index 12a7ac6..404d1cb 100644 --- a/packages/core/src/consent.ts +++ b/packages/core/src/consent.ts @@ -23,7 +23,10 @@ export interface QueuedEvent { sentTo: Set; } +export type DropReason = "timeout" | "overflow"; + type ConsentListener = (state: ConsentState, previous: ConsentState) => void; +type DropListener = (count: number, reason: DropReason) => void; export interface ConsentManager { /** Get current consent state */ @@ -50,6 +53,9 @@ export interface ConsentManager { /** Subscribe to consent changes */ onChange: (listener: ConsentListener) => () => void; + /** Subscribe to queue drop events (timeout or overflow) */ + onDrop: (listener: DropListener) => () => void; + /** Number of queued events */ queueSize: () => number; @@ -63,6 +69,7 @@ export function createConsentManager(config: ConsentConfig): ConsentManager { let state: ConsentState = { necessary: true, ...config.defaultState }; let queue: QueuedEvent[] = []; const listeners = new Set(); + const dropListeners = new Set(); // Check DNT/GPC on creation if (typeof globalThis.navigator !== "undefined") { @@ -74,6 +81,16 @@ export function createConsentManager(config: ConsentConfig): ConsentManager { } } + function notifyDrop(count: number, reason: DropReason): void { + for (const listener of dropListeners) { + try { + listener(count, reason); + } catch (e) { + console.error("[Junction] Drop listener error:", e); + } + } + } + // Queue cleanup timer — expire old events let _cleanupTimer: ReturnType | null = null; @@ -81,7 +98,12 @@ export function createConsentManager(config: ConsentConfig): ConsentManager { _cleanupTimer = setInterval( () => { const now = Date.now(); + const before = queue.length; queue = queue.filter((item) => now - item.queuedAt < config.queueTimeout); + const dropped = before - queue.length; + if (dropped > 0) { + notifyDrop(dropped, "timeout"); + } }, Math.min(config.queueTimeout, 30_000), ); @@ -161,6 +183,14 @@ export function createConsentManager(config: ConsentConfig): ConsentManager { for (const name of sentTo) existing.sentTo.add(name); return; } + + // Enforce max queue size — drop oldest events to make room + if (config.maxQueueSize && config.maxQueueSize > 0 && queue.length >= config.maxQueueSize) { + const overflow = queue.length - config.maxQueueSize + 1; + queue.splice(0, overflow); + notifyDrop(overflow, "overflow"); + } + queue.push({ event, queuedAt: Date.now(), sentTo: new Set(sentTo) }); }, @@ -177,6 +207,13 @@ export function createConsentManager(config: ConsentConfig): ConsentManager { }; }, + onDrop(listener: DropListener) { + dropListeners.add(listener); + return () => { + dropListeners.delete(listener); + }; + }, + queueSize() { return queue.length; }, diff --git a/packages/core/src/types.ts b/packages/core/src/types.ts index 610e22c..33184a7 100644 --- a/packages/core/src/types.ts +++ b/packages/core/src/types.ts @@ -157,6 +157,13 @@ export interface ConsentConfig { /** Vendor consent protocol signals — fired on every consent state change */ signals?: ConsentSignal[]; + /** + * Max number of events to hold in the consent queue. + * When exceeded, the oldest events are dropped (with a `queue:drop` event). + * Default: no limit. + */ + maxQueueSize?: number; + /** * Strict GDPR mode. When true: * - Event queuing is disabled (pending = denied) @@ -461,6 +468,7 @@ export type CollectorEvent = | "destination:error" // destination send failed | "destination:init" // destination initialized | "queue:flush" // consent queue flushed + | "queue:drop" // events dropped from consent queue (timeout or overflow) | "error"; // any error export type CollectorEventHandler = (data: {