Merged
13 changes: 12 additions & 1 deletion server/routes.ts
@@ -19,7 +19,7 @@ import { ErrorLogger } from "./services/logger";
import { notificationTablesExist, channelTablesExist } from "./services/notificationReady";
import { BrowserlessUsageTracker, getMonthResetDate } from "./services/browserlessTracker";
import { ResendUsageTracker, getResendResetDate } from "./services/resendTracker";
import { errorLogs, monitorMetrics } from "@shared/schema";
import { errorLogs, monitorMetrics, monitors } from "@shared/schema";
import {
generalRateLimiter,
createMonitorRateLimiter,
@@ -248,6 +248,7 @@ export async function registerRoutes(
pauseReason: null,
healthAlertSentAt: null,
lastHealthyAt: null,
pendingRetryAt: null,
createdAt: new Date()
};

@@ -585,6 +586,16 @@ export async function registerRoutes(
if (!existing) return res.status(404).json({ message: "Not found" });
if (String(existing.userId) !== String(req.user.claims.sub)) return res.status(403).json({ message: "Forbidden" });

// Clear any pending auto-retry before the manual check to prevent
// a narrow race where the scheduler cron fires a duplicate check.
      await db.update(monitors)
        .set({ pendingRetryAt: null })
        .where(eq(monitors.id, id))
        .catch((err: unknown) => {
          console.error(`[AutoRetry] Failed to clear pendingRetryAt for monitor ${id}:`,
            err instanceof Error ? err.message : err);
        });
Comment on lines +589 to +597
⚠️ Potential issue | 🟠 Major

Don't fail open when clearing pendingRetryAt.

If this write fails, the manual check still runs while the due retry remains armed, so the scheduler can execute the same monitor again and duplicate change rows or notifications. This guard should abort the request on failure, and the write should stay behind IStorage.

💡 Safer change
-      await db.update(monitors)
-        .set({ pendingRetryAt: null })
-        .where(eq(monitors.id, id))
-        .catch((err: unknown) => {
-          console.error(`[AutoRetry] Failed to clear pendingRetryAt for monitor ${id}:`,
-            err instanceof Error ? err.message : err);
-        });
+      try {
+        await storage.updateMonitor(id, { pendingRetryAt: null });
+      } catch (err: unknown) {
+        console.error(`[AutoRetry] Failed to clear pendingRetryAt for monitor ${id}:`,
+          err instanceof Error ? err.message : err);
+        return res.status(503).json({
+          message: "Unable to start the manual check right now.",
+          code: "RETRY_STATE_UPDATE_FAILED",
+        });
+      }
As per coding guidelines, `Never put database queries or Drizzle ORM calls directly in route handlers — all database access must go through methods on the IStorage interface implemented in server/storage.ts`.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@server/routes.ts` around lines 589 - 597, the current route handler directly
calls db.update(...).set({ pendingRetryAt: null }) and catches errors, which
fails open; move this update behind the IStorage abstraction by adding a storage
method (e.g., IStorage.clearPendingRetry(monitorId) or
IStorage.setPendingRetryNull(id)) implemented in server/storage.ts that performs
the db.update on the monitors table and returns/throws on error, then call that
method from the route instead of the direct db.update; ensure the route
aborts/returns an error (does not continue with the manual check) if the storage
call fails so the armed retry remains cleared only on success.
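The fail-closed shape the review asks for can be sketched as follows (the `IStorage.clearPendingRetry` method and the `runManualCheck` wrapper are hypothetical names for illustration, not the repository's actual API):

```typescript
// Hypothetical interface and handler shape, for illustration only.
interface IStorage {
  clearPendingRetry(monitorId: number): Promise<void>;
}

// Fail closed: if the retry state cannot be cleared, abort the manual
// check instead of running it alongside a still-armed scheduler retry.
async function runManualCheck(
  storage: IStorage,
  monitorId: number,
): Promise<{ status: number; body: Record<string, unknown> }> {
  try {
    await storage.clearPendingRetry(monitorId);
  } catch (err: unknown) {
    console.error(
      `[AutoRetry] Failed to clear pendingRetryAt for monitor ${monitorId}:`,
      err instanceof Error ? err.message : err,
    );
    return {
      status: 503,
      body: {
        message: "Unable to start the manual check right now.",
        code: "RETRY_STATE_UPDATE_FAILED",
      },
    };
  }
  // Only reached once the pending retry is disarmed; run the check here.
  return { status: 200, body: { ok: true } };
}
```

The key design choice is that the `catch` returns early instead of merely logging, so the manual check never races a still-scheduled auto-retry.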


const result = await checkMonitor(existing);
res.json(result);
} catch (error: any) {
81 changes: 81 additions & 0 deletions server/services/scheduler.test.ts
@@ -8,6 +8,7 @@ const {
mockGetAllActiveMonitors,
mockCleanupPollutedValues,
mockDbExecute,
mockDbUpdateSet,
cronCallbacks,
mockMonitorsNeedingRetry,
mockDeliverWebhook,
@@ -16,6 +17,7 @@
mockGetAllActiveMonitors: vi.fn().mockResolvedValue([]),
mockCleanupPollutedValues: vi.fn().mockResolvedValue(undefined),
mockDbExecute: vi.fn().mockResolvedValue({ rowCount: 0 }),
mockDbUpdateSet: vi.fn(),
cronCallbacks: {} as Record<string, Array<() => Promise<void>>>,
mockMonitorsNeedingRetry: new Set<number>(),
mockDeliverWebhook: vi.fn().mockResolvedValue({ success: true, statusCode: 200 }),
@@ -60,6 +62,14 @@ vi.mock("./logger", () => ({
vi.mock("../db", () => ({
db: {
execute: (...args: any[]) => mockDbExecute(...args),
    update: vi.fn().mockReturnValue({
      set: (...args: any[]) => {
        mockDbUpdateSet(...args);
        const whereResult = Promise.resolve();
        const whereFn = vi.fn().mockReturnValue(whereResult);
        return { where: whereFn };
      },
    }),
Comment on lines +65 to +72
⚠️ Potential issue | 🟠 Major

Capture and assert the where(...) clause for retry-clear updates.

Line 70 returns a where function, but its args are not recorded. With current assertions (Lines 462, 480), a regression that removes where(...) could still pass and hide a full-table update bug.

Suggested hardening for the DB update mock and assertions
 const {
   mockCheckMonitor,
   mockGetAllActiveMonitors,
   mockCleanupPollutedValues,
   mockDbExecute,
   mockDbUpdateSet,
+  mockDbUpdateWhere,
   cronCallbacks,
   mockMonitorsNeedingRetry,
   mockDeliverWebhook,
 } = vi.hoisted(() => ({
@@
   mockDbExecute: vi.fn().mockResolvedValue({ rowCount: 0 }),
   mockDbUpdateSet: vi.fn(),
+  mockDbUpdateWhere: vi.fn(),
@@
     update: vi.fn().mockReturnValue({
       set: (...args: any[]) => {
         mockDbUpdateSet(...args);
-        const whereResult = Promise.resolve();
-        const whereFn = vi.fn().mockReturnValue(whereResult);
+        const whereFn = vi.fn((...whereArgs: any[]) => {
+          mockDbUpdateWhere(...whereArgs);
+          return Promise.resolve();
+        });
         return { where: whereFn };
       },
     }),
 expect(mockDbUpdateSet).toHaveBeenCalledWith({ pendingRetryAt: null });
+expect(mockDbUpdateWhere).toHaveBeenCalled();
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
      update: vi.fn().mockReturnValue({
        set: (...args: any[]) => {
          mockDbUpdateSet(...args);
          const whereResult = Promise.resolve();
          const whereFn = vi.fn().mockReturnValue(whereResult);
          return { where: whereFn };
        },
      }),

      update: vi.fn().mockReturnValue({
        set: (...args: any[]) => {
          mockDbUpdateSet(...args);
          const whereFn = vi.fn((...whereArgs: any[]) => {
            mockDbUpdateWhere(...whereArgs);
            return Promise.resolve();
          });
          return { where: whereFn };
        },
      }),
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@server/services/scheduler.test.ts` around lines 65 - 72, the update mock
returns a set that creates a where function but doesn't record its args; change
the mock inside the update: replace the inline creation of whereFn with a
vi.fn(...) assigned to a shared test-level recorder (e.g., mockDbUpdateWhere or
reuse mockDbUpdateSet to store whereArgs) so every call to where(...) is
captured (returning the same whereResult Promise). Then update the tests to
assert that mockDbUpdateWhere was called with the expected predicates (in
addition to existing assertions on mockDbUpdateSet) to prevent a silent removal
of the where(...) clause in scheduler.update.

},
}));

@@ -163,6 +173,7 @@ function makeMonitor(overrides: Partial<Monitor> = {}): Monitor {
pauseReason: null,
healthAlertSentAt: null,
lastHealthyAt: null,
pendingRetryAt: null,
createdAt: new Date(),
...overrides,
};
@@ -399,6 +410,75 @@ describe("startScheduler", () => {
resolver!();
await Promise.resolve();
});

// -----------------------------------------------------------------------
// auto-retry scheduler pickup (pendingRetryAt)
// -----------------------------------------------------------------------

it("triggers check for monitor with pendingRetryAt <= now", async () => {
const monitor = makeMonitor({
frequency: "hourly",
lastChecked: new Date(Date.now() - 30 * 60 * 1000), // 30 min ago — not normally due
pendingRetryAt: new Date(Date.now() - 1000), // 1 second in the past
});
mockGetAllActiveMonitors.mockResolvedValueOnce([monitor]);

await startScheduler();
await runCron("* * * * *");
await vi.advanceTimersByTimeAsync(31000);

expect(mockCheckMonitor).toHaveBeenCalledWith(monitor);
});

it("does NOT trigger check for monitor with pendingRetryAt in the future", async () => {
const monitor = makeMonitor({
frequency: "hourly",
lastChecked: new Date(Date.now() - 30 * 60 * 1000), // 30 min ago — not normally due
pendingRetryAt: new Date(Date.now() + 30 * 60 * 1000), // 30 min in the future
});
mockGetAllActiveMonitors.mockResolvedValueOnce([monitor]);

await startScheduler();
await runCron("* * * * *");
await vi.advanceTimersByTimeAsync(31000);

expect(mockCheckMonitor).not.toHaveBeenCalled();
});
Comment on lines +418 to +446
⚠️ Potential issue | 🟡 Minor

Add explicit boundary test for pendingRetryAt === now.

You cover past and future, but not the exact boundary for the <= now condition. Locking that case prevents subtle regressions.

Boundary test to add
+  it("triggers check for monitor with pendingRetryAt exactly now", async () => {
+    const now = new Date();
+    vi.setSystemTime(now);
+    const monitor = makeMonitor({
+      frequency: "hourly",
+      lastChecked: new Date(now.getTime() - 30 * 60 * 1000),
+      pendingRetryAt: now,
+    });
+    mockGetAllActiveMonitors.mockResolvedValueOnce([monitor]);
+
+    await startScheduler();
+    await runCron("* * * * *");
+    await vi.advanceTimersByTimeAsync(31000);
+
+    expect(mockCheckMonitor).toHaveBeenCalledWith(monitor);
+  });

As per coding guidelines, "Tests must cover edge cases and error paths - include assertions for edge cases (empty inputs, boundary values, null/undefined)".

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@server/services/scheduler.test.ts` around lines 418 - 446, Add a new unit
test alongside the existing cases that creates a monitor via makeMonitor with
pendingRetryAt set to new Date() (exactly "now"), mockGetAllActiveMonitors to
return that monitor, then call startScheduler(), runCron("* * * * *") and
advance timers (vi.advanceTimersByTimeAsync(31000)), and finally assert
mockCheckMonitor was called with that monitor to verify the <= now boundary;
place this test next to the existing "pendingRetryAt <= now" and "in the future"
tests so it uses the same helpers (makeMonitor, mockGetAllActiveMonitors,
startScheduler, runCron, mockCheckMonitor).


it("clears pendingRetryAt after retry fires (success path)", async () => {
const monitor = makeMonitor({
frequency: "hourly",
lastChecked: new Date(Date.now() - 30 * 60 * 1000),
pendingRetryAt: new Date(Date.now() - 1000),
});
mockGetAllActiveMonitors.mockResolvedValueOnce([monitor]);

await startScheduler();
await runCron("* * * * *");
await vi.advanceTimersByTimeAsync(31000);

expect(mockCheckMonitor).toHaveBeenCalledWith(monitor);
// The finally block should clear pendingRetryAt
expect(mockDbUpdateSet).toHaveBeenCalledWith({ pendingRetryAt: null });
});
Comment on lines +456 to +463
⚠️ Potential issue | 🟠 Major

Add await flushPromises() before assertions to prevent race conditions with detached async work.

Both tests at lines 456–463 (success path) and 474–480 (failure path) invoke void runCheckWithLimit(), which schedules a fire-and-forget async task that clears pendingRetryAt in its finally block. The vi.advanceTimersByTimeAsync() call advances timers but does not drain the microtask queue, so the clear may not execute before the assertions run, causing intermittent failures.

Add microtask flush before assertions
   await startScheduler();
   await runCron("* * * * *");
   await vi.advanceTimersByTimeAsync(31000);
+  await flushPromises();

   expect(mockCheckMonitor).toHaveBeenCalledWith(monitor);
   // The finally block should clear pendingRetryAt
   expect(mockDbUpdateSet).toHaveBeenCalledWith({ pendingRetryAt: null });
@@
   await startScheduler();
   await runCron("* * * * *");
   await vi.advanceTimersByTimeAsync(31000);
+  await flushPromises();

   expect(mockCheckMonitor).toHaveBeenCalledWith(monitor);
   // Even on failure, pendingRetryAt should be cleared
   expect(mockDbUpdateSet).toHaveBeenCalledWith({ pendingRetryAt: null });
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@server/services/scheduler.test.ts` around lines 456 - 463, the tests call
fire-and-forget work via runCheckWithLimit (triggered by runCron/startScheduler)
which clears pendingRetryAt in a finally block, but timers advancement doesn't
drain microtasks; before the assertions that check mockCheckMonitor and
mockDbUpdateSet({ pendingRetryAt: null }) add an awaited microtask drain (e.g.,
await flushPromises()) so the detached async finally block completes and the
assertions observe the cleared pendingRetryAt.
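The helper this fix relies on is a standard microtask drain. A minimal sketch (the test file's own `flushPromises` may differ in detail):

```typescript
// Sketch of a microtask-draining helper for tests using fake timers.
async function flushPromises(): Promise<void> {
  // Each await yields one microtask tick; a handful of ticks is enough
  // to let fire-and-forget async work reach its finally block.
  for (let i = 0; i < 10; i++) {
    await Promise.resolve();
  }
}

// Demonstration: a detached async task whose finally block sets a flag,
// standing in for the scheduler clearing pendingRetryAt.
let cleared = false;
void (async () => {
  try {
    await Promise.resolve(); // stands in for checkMonitor(...)
  } finally {
    cleared = true; // stands in for the pendingRetryAt clear
  }
})();
```

An assertion placed immediately after the synchronous setup could observe `cleared === false`; after `await flushPromises()` the detached `finally` block has run.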


it("clears pendingRetryAt after retry fires (failure path)", async () => {
mockCheckMonitor.mockRejectedValueOnce(new Error("Scrape failed"));
const monitor = makeMonitor({
frequency: "hourly",
lastChecked: new Date(Date.now() - 30 * 60 * 1000),
pendingRetryAt: new Date(Date.now() - 1000),
});
mockGetAllActiveMonitors.mockResolvedValueOnce([monitor]);

await startScheduler();
await runCron("* * * * *");
await vi.advanceTimersByTimeAsync(31000);

expect(mockCheckMonitor).toHaveBeenCalledWith(monitor);
// Even on failure, pendingRetryAt should be cleared
expect(mockDbUpdateSet).toHaveBeenCalledWith({ pendingRetryAt: null });
});
});

describe("concurrency limiting (runCheckWithLimit)", () => {
@@ -1290,3 +1370,4 @@ describe("webhook retry cumulative backoff", () => {
}));
});
});

26 changes: 25 additions & 1 deletion server/services/scheduler.ts
@@ -10,7 +10,8 @@ import { ensureMonitorConditionsTable } from "./ensureTables";
import { processAutomatedCampaigns } from "./automatedCampaigns";
import { isTransientDbError } from "../utils/dbErrors";
import { db } from "../db";
import { sql } from "drizzle-orm";
import { eq, sql } from "drizzle-orm";
import { monitors } from "@shared/schema";

// Keep below DB pool max (3, see db.ts) to leave headroom for cron jobs and
// API requests. Browser POOL_MAX is 1 (browserPool.ts), so the second
@@ -110,9 +111,17 @@ async function runCheckWithLimit(monitor: Parameters<typeof checkMonitor>[0]): P
console.debug(`[Scheduler] Concurrency limit reached, deferring monitor ${monitor.id}`);
return false;
}

const hadPendingRetry = !!(
monitor.pendingRetryAt && new Date(monitor.pendingRetryAt) <= new Date()
);

activeChecks++;
try {
await checkMonitor(monitor);
if (hadPendingRetry) {
console.log(`[AutoRetry] Monitor ${monitor.id} — retry completed`);
}
return true;
} catch (error) {
await ErrorLogger.error("scheduler", `"${monitor.name}" — scheduled check failed. This is usually a temporary issue. If it persists, verify the URL is still valid and the selector matches the page.`, error instanceof Error ? error : null, {
@@ -124,6 +133,16 @@
return true;
} finally {
activeChecks--;
      if (hadPendingRetry) {
        try {
          await db.update(monitors)
            .set({ pendingRetryAt: null })
            .where(eq(monitors.id, monitor.id));
        } catch (err: unknown) {
          console.error(`[AutoRetry] Failed to clear pendingRetryAt for monitor ${monitor.id}:`,
            err instanceof Error ? err.message : err);
        }
      }
}
}

@@ -218,6 +237,11 @@ export async function startScheduler() {
}
}

      // Auto-retry: fire if pendingRetryAt window has elapsed
      if (!shouldCheck && monitor.pendingRetryAt && new Date(monitor.pendingRetryAt) <= now) {
        shouldCheck = true;
      }

if (shouldCheck) {
const jitterMs = Math.floor(Math.random() * 30000);
trackTimeout(() => {
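The scheduler pickup condition in the diff above (`pendingRetryAt <= now`) can be factored into a pure helper, which makes the `=== now` boundary the review calls out trivial to unit-test. A sketch (the helper name is hypothetical, not code from the repository):

```typescript
// Hypothetical pure extraction of the scheduler's auto-retry due check.
function isAutoRetryDue(
  pendingRetryAt: Date | null,
  now: Date = new Date(),
): boolean {
  // Inclusive comparison: a retry scheduled for exactly `now` is due.
  return pendingRetryAt !== null && pendingRetryAt <= now;
}
```

Keeping the comparison in one place means the boundary test asserts against the helper directly instead of re-deriving the condition through cron callbacks and fake timers.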