Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions .changeset/consent-queue-guardrails.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
---
"@junctionjs/core": minor
---

feat(core): add consent queue guardrails — maxQueueSize, queue:drop telemetry, flush() drains queue

- Add `maxQueueSize` option to `ConsentConfig` — drops oldest events when exceeded
- Emit `queue:drop` events with `{ count, reason }` when events are lost to timeout or overflow
- `flush()` now drains the consent queue for permitted events, critical for page unload
124 changes: 124 additions & 0 deletions packages/core/src/collector.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -904,4 +904,128 @@ describe("Collector", () => {
expect(dest.onConsent).toHaveBeenCalledWith({ necessary: true, analytics: false });
});
});

describe("queue:drop telemetry", () => {
it("emits queue:drop when events expire from consent queue", () => {
const dropHandler = vi.fn();
const options = makeOptions();
options.config.consent.defaultState = {}; // analytics pending
options.config.consent.queueTimeout = 5000;
const dest = mockDestination({ consent: ["analytics"] });
options.config.destinations = [{ destination: dest, config: {} }];

const collector = createCollector(options);
collector.on("queue:drop", dropHandler);

collector.track("page", "viewed");

// Flush buffer so event enters consent queue
vi.advanceTimersByTime(3000);

// Advance past queue timeout — cleanup runs every 5s,
// event was queued at ~3s, so need to reach the 10s mark
// for the second cleanup pass to find it expired.
vi.advanceTimersByTime(7000);

expect(dropHandler).toHaveBeenCalledTimes(1);
expect(dropHandler).toHaveBeenCalledWith(
expect.objectContaining({
type: "queue:drop",
payload: { count: 1, reason: "timeout" },
}),
);
});

it("emits queue:drop when maxQueueSize is exceeded", () => {
const dropHandler = vi.fn();
const options = makeOptions();
options.config.consent.defaultState = {}; // analytics pending
options.config.consent.maxQueueSize = 2;
const dest = mockDestination({ consent: ["analytics"] });
options.config.destinations = [{ destination: dest, config: {} }];

const collector = createCollector(options);
collector.on("queue:drop", dropHandler);

collector.track("test", "one");
collector.track("test", "two");
collector.track("test", "three");

// Flush buffer so all 3 enter consent queue (max 2)
vi.advanceTimersByTime(3000);

expect(dropHandler).toHaveBeenCalledWith(
expect.objectContaining({
type: "queue:drop",
payload: { count: 1, reason: "overflow" },
}),
);
});
});

describe("flush drains consent queue", () => {
it("dispatches queued events on flush() when consent is now granted", async () => {
const dest = mockDestination({ consent: ["analytics"] });
const options = makeOptions();
options.config.consent.defaultState = {}; // analytics pending
options.config.destinations = [{ destination: dest, config: {} }];

const collector = createCollector(options);
await vi.advanceTimersByTimeAsync(0); // init destinations

collector.track("page", "viewed");

// Flush buffer — event enters consent queue (analytics pending)
vi.advanceTimersByTime(3000);
expect(dest.transform).not.toHaveBeenCalled();

// Grant consent without triggering onChange replay
// (simulate: consent granted, then flush() called on unload)
collector.consent({ analytics: true });
// onChange already drained and replayed — but let's test flush() independently.
// Reset the mock to see if flush() also works
(dest.transform as any).mockClear();
(dest.send as any).mockClear();

// Track another event while analytics is now granted but still in buffer
collector.track("page", "scrolled");

// flush() should dispatch the buffered event
await collector.flush();

expect(dest.transform).toHaveBeenCalled();
const event = (dest.transform as any).mock.calls[0][0] as JctEvent;
expect(event.entity).toBe("page");
expect(event.action).toBe("scrolled");
});

it("flush() dispatches consent-queued events that are now permitted", async () => {
const exemptDest = mockDestination({ name: "exempt-dest", consent: ["exempt"] });
const analyticsDest = mockDestination({ name: "analytics-dest", consent: ["analytics"] });
const options = makeOptions();
options.config.consent.defaultState = {}; // analytics pending
options.config.destinations = [
{ destination: exemptDest, config: {} },
{ destination: analyticsDest, config: {} },
];

const collector = createCollector(options);
await vi.advanceTimersByTimeAsync(0); // init destinations

collector.track("page", "viewed");
vi.advanceTimersByTime(3000); // flush buffer

// exempt dest got the event, analytics dest didn't
expect(exemptDest.transform).toHaveBeenCalledTimes(1);
expect(analyticsDest.transform).not.toHaveBeenCalled();

// Grant consent — onChange fires and drains queue
collector.consent({ analytics: true });

// Analytics dest should have received the replayed event
expect(analyticsDest.transform).toHaveBeenCalledTimes(1);
const event = (analyticsDest.transform as any).mock.calls[0][0] as JctEvent;
expect(event.context.was_queued).toBe(true);
});
});
});
29 changes: 29 additions & 0 deletions packages/core/src/collector.ts
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,15 @@ export function createCollector(options: CreateCollectorOptions): Collector {
// Start initialization (fire and forget — destinations buffer until ready)
const initPromise = initDestinations();

// ── Queue drop telemetry ─────────────────────────────────

consent.onDrop((count, reason) => {
emit("queue:drop", { count, reason });
if (config.debug) {
console.warn(`[Junction] Dropped ${count} queued event(s): ${reason}`);
}
});

// ── Consent change handler ─────────────────────────────

consent.onChange((state, _previous) => {
Expand Down Expand Up @@ -383,6 +392,26 @@ export function createCollector(options: CreateCollectorOptions): Collector {
async flush(): Promise<void> {
await initPromise; // ensure destinations are ready
flushBuffer();

// Also drain the consent queue — any events where consent is now
// granted get dispatched. Critical for page unload: the client calls
// flush() on pagehide, and we need to dispatch everything we can.
const queued = consent.drain();
if (queued.length > 0) {
for (const { event, sentTo } of queued) {
const updatedEvent = {
...event,
user: { ...user },
context: {
...event.context,
was_queued: true,
consent: consent.getState(),
},
};
dispatchToDestinations(updatedEvent, sentTo);
}
emit("queue:flush", { count: queued.length });
}
},

async shutdown(): Promise<void> {
Expand Down
98 changes: 98 additions & 0 deletions packages/core/src/consent.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -461,4 +461,102 @@ describe("ConsentManager", () => {
expect(listener).not.toHaveBeenCalled();
});
});

describe("queue drop telemetry", () => {
it("emits drop event when events expire from queue", () => {
const dropListener = vi.fn();
manager = createConsentManager(makeConfig({ queueTimeout: 5000 }));
manager.onDrop(dropListener);

manager.enqueue(makeEvent({ id: "evt-1" }), new Set());
manager.enqueue(makeEvent({ id: "evt-2" }), new Set());

// Advance past the queue timeout
vi.advanceTimersByTime(6000);

expect(dropListener).toHaveBeenCalledTimes(1);
expect(dropListener).toHaveBeenCalledWith(2, "timeout");
expect(manager.queueSize()).toBe(0);
});

it("does not emit drop event when no events expired", () => {
const dropListener = vi.fn();
manager = createConsentManager(makeConfig({ queueTimeout: 10_000 }));
manager.onDrop(dropListener);

manager.enqueue(makeEvent({ id: "evt-1" }), new Set());

// Advance but not past timeout
vi.advanceTimersByTime(5000);

expect(dropListener).not.toHaveBeenCalled();
expect(manager.queueSize()).toBe(1);
});

it("unsubscribes drop listener", () => {
const dropListener = vi.fn();
manager = createConsentManager(makeConfig({ queueTimeout: 5000 }));
const unsub = manager.onDrop(dropListener);

manager.enqueue(makeEvent({ id: "evt-1" }), new Set());
unsub();

vi.advanceTimersByTime(6000);

expect(dropListener).not.toHaveBeenCalled();
});
});

describe("maxQueueSize", () => {
it("drops oldest events when queue exceeds maxQueueSize", () => {
const dropListener = vi.fn();
manager = createConsentManager(makeConfig({ maxQueueSize: 3 }));
manager.onDrop(dropListener);

manager.enqueue(makeEvent({ id: "evt-1" }), new Set());
manager.enqueue(makeEvent({ id: "evt-2" }), new Set());
manager.enqueue(makeEvent({ id: "evt-3" }), new Set());
expect(manager.queueSize()).toBe(3);
expect(dropListener).not.toHaveBeenCalled();

// Adding a 4th should drop the oldest
manager.enqueue(makeEvent({ id: "evt-4" }), new Set());

expect(manager.queueSize()).toBe(3);
expect(dropListener).toHaveBeenCalledWith(1, "overflow");

// Verify the oldest was dropped: drain and check IDs
const drained = manager.drain();
expect(drained.map((q) => q.event.id)).toEqual(["evt-2", "evt-3", "evt-4"]);
});

it("does not enforce limit when maxQueueSize is not set", () => {
manager = createConsentManager(makeConfig());

for (let i = 0; i < 50; i++) {
manager.enqueue(makeEvent({ id: `evt-${i}` }), new Set());
}

expect(manager.queueSize()).toBe(50);
});

it("drops multiple oldest when queue is far over limit", () => {
const dropListener = vi.fn();
manager = createConsentManager(makeConfig({ maxQueueSize: 2 }));
manager.onDrop(dropListener);

// Fill to capacity
manager.enqueue(makeEvent({ id: "evt-1" }), new Set());
manager.enqueue(makeEvent({ id: "evt-2" }), new Set());

// Add one more — should drop 1
manager.enqueue(makeEvent({ id: "evt-3" }), new Set());

expect(dropListener).toHaveBeenCalledWith(1, "overflow");
expect(manager.queueSize()).toBe(2);

const drained = manager.drain();
expect(drained.map((q) => q.event.id)).toEqual(["evt-2", "evt-3"]);
});
});
});
37 changes: 37 additions & 0 deletions packages/core/src/consent.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,10 @@ export interface QueuedEvent {
sentTo: Set<string>;
}

export type DropReason = "timeout" | "overflow";

type ConsentListener = (state: ConsentState, previous: ConsentState) => void;
type DropListener = (count: number, reason: DropReason) => void;

export interface ConsentManager {
/** Get current consent state */
Expand All @@ -50,6 +53,9 @@ export interface ConsentManager {
/** Subscribe to consent changes */
onChange: (listener: ConsentListener) => () => void;

/** Subscribe to queue drop events (timeout or overflow) */
onDrop: (listener: DropListener) => () => void;

/** Number of queued events */
queueSize: () => number;

Expand All @@ -63,6 +69,7 @@ export function createConsentManager(config: ConsentConfig): ConsentManager {
let state: ConsentState = { necessary: true, ...config.defaultState };
let queue: QueuedEvent[] = [];
const listeners = new Set<ConsentListener>();
const dropListeners = new Set<DropListener>();

// Check DNT/GPC on creation
if (typeof globalThis.navigator !== "undefined") {
Expand All @@ -74,14 +81,29 @@ export function createConsentManager(config: ConsentConfig): ConsentManager {
}
}

function notifyDrop(count: number, reason: DropReason): void {
for (const listener of dropListeners) {
try {
listener(count, reason);
} catch (e) {
console.error("[Junction] Drop listener error:", e);
}
}
}

// Queue cleanup timer — expire old events
let _cleanupTimer: ReturnType<typeof setInterval> | null = null;

if (!config.strictMode && config.queueTimeout > 0) {
_cleanupTimer = setInterval(
() => {
const now = Date.now();
const before = queue.length;
queue = queue.filter((item) => now - item.queuedAt < config.queueTimeout);
const dropped = before - queue.length;
if (dropped > 0) {
notifyDrop(dropped, "timeout");
}
},
Math.min(config.queueTimeout, 30_000),
);
Expand Down Expand Up @@ -161,6 +183,14 @@ export function createConsentManager(config: ConsentConfig): ConsentManager {
for (const name of sentTo) existing.sentTo.add(name);
return;
}

// Enforce max queue size — drop oldest events to make room
if (config.maxQueueSize && config.maxQueueSize > 0 && queue.length >= config.maxQueueSize) {
const overflow = queue.length - config.maxQueueSize + 1;
queue.splice(0, overflow);
notifyDrop(overflow, "overflow");
}

queue.push({ event, queuedAt: Date.now(), sentTo: new Set(sentTo) });
},

Expand All @@ -177,6 +207,13 @@ export function createConsentManager(config: ConsentConfig): ConsentManager {
};
},

onDrop(listener: DropListener) {
dropListeners.add(listener);
return () => {
dropListeners.delete(listener);
};
},

queueSize() {
return queue.length;
},
Expand Down
8 changes: 8 additions & 0 deletions packages/core/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,13 @@ export interface ConsentConfig {
/** Vendor consent protocol signals — fired on every consent state change */
signals?: ConsentSignal[];

/**
* Max number of events to hold in the consent queue.
* When exceeded, the oldest events are dropped (with a `queue:drop` event).
* Default: no limit.
*/
maxQueueSize?: number;

/**
* Strict GDPR mode. When true:
* - Event queuing is disabled (pending = denied)
Expand Down Expand Up @@ -461,6 +468,7 @@ export type CollectorEvent =
| "destination:error" // destination send failed
| "destination:init" // destination initialized
| "queue:flush" // consent queue flushed
| "queue:drop" // events dropped from consent queue (timeout or overflow)
| "error"; // any error

export type CollectorEventHandler = (data: {
Expand Down
Loading