Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,16 @@ Security notes:
- Replay protection is enforced by deduplicating `eventId` values in the ingestion service.
- Duplicate deliveries are treated as safe no-ops and return `202 Accepted`.

## Soft Delete / Retention policy

A new soft delete flow is available for stream records:

- `DELETE /api/v1/streams/:id`: marks the record as soft-deleted via `deleted_at` timestamp.
- `GET /api/v1/streams`: by default returns non-deleted records; add `?includeDeleted=true` for admin/inspection mode.
- `GET /api/v1/streams/:id`: by default hides soft-deleted records; add `?includeDeleted=true` to retrieve them.

All queries now include `deleted_at IS NULL` unless `includeDeleted` is explicitly true.

## API Versioning Policy

All new features and endpoints must be mounted under the `/api/v1` prefix.
Expand Down
38 changes: 38 additions & 0 deletions src/api/v1/streams.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,44 @@ describe("Stream API Routes", () => {
expect(response.status).toBe(400);
expect(response.body.error).toBe("Invalid stream ID format");
});

it("should allow includeDeleted for get by id", async () => {
const mockStream = { id: validId, payer: "p1", accruedEstimate: "10.5" };
const spy = jest.spyOn(StreamRepository.prototype, "findById").mockResolvedValue(mockStream as never);

const response = await request(app)
.get(`/api/v1/streams/${validId}?includeDeleted=true`)
.set("x-api-key", "test-1234");

expect(response.status).toBe(200);
expect(response.body).toEqual(mockStream);
expect(spy).toHaveBeenCalledWith(validId, true);
spy.mockRestore();
});

it("soft-deletes stream with DELETE", async () => {
const spy = jest.spyOn(StreamRepository.prototype, "softDeleteById").mockResolvedValue(true);

const response = await request(app)
.delete(`/api/v1/streams/${validId}`)
.set("x-api-key", "test-1234");

expect(response.status).toBe(204);
expect(spy).toHaveBeenCalledWith(validId);
spy.mockRestore();
});

it("returns 404 when delete target not found", async () => {
const spy = jest.spyOn(StreamRepository.prototype, "softDeleteById").mockResolvedValue(false);

const response = await request(app)
.delete(`/api/v1/streams/${validId}`)
.set("x-api-key", "test-1234");

expect(response.status).toBe(404);
expect(response.body.error).toBe("Stream not found");
spy.mockRestore();
});
});

describe("GET /api/v1/streams", () => {
Expand Down
71 changes: 69 additions & 2 deletions src/repositories/streamRepository.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import { db } from "../db/index";
jest.mock("../db/index", () => ({
db: {
select: jest.fn(),
update: jest.fn(),
},
}));

Expand Down Expand Up @@ -60,6 +61,53 @@ describe("StreamRepository", () => {

expect(result).toBeNull();
});

it("should not return deleted stream by default", async () => {
(db.select as jest.Mock).mockReturnValue(createMockQuery([]));

const result = await repository.findById("deleted-id");

expect(result).toBeNull();
});

it("should include deleted when includeDeleted is true", async () => {
const mockStream = {
id: "deleted-id",
payer: "payer1",
recipient: "recipient1",
status: "paused",
ratePerSecond: "1",
startTime: new Date(),
endTime: null,
lastSettledAt: new Date(),
createdAt: new Date(),
updatedAt: new Date(),
deletedAt: new Date(),
};

(db.select as jest.Mock).mockReturnValue(createMockQuery([mockStream]));

const result = await repository.findById("deleted-id", true);

expect(result).not.toBeNull();
expect(result?.id).toBe("deleted-id");
});

it("should mark stream deleted with softDeleteById", async () => {
const updateBuilder = {
set: jest.fn().mockReturnThis(),
where: jest.fn().mockResolvedValue(1),
};

(db.update as jest.Mock).mockReturnValue(updateBuilder);

const deleted = await repository.softDeleteById("to-delete-id");

expect(deleted).toBe(true);
expect(db.update).toHaveBeenCalled();
expect(updateBuilder.set).toHaveBeenCalledWith({ deletedAt: expect.any(Date) });
expect(updateBuilder.where).toHaveBeenCalled();
});
});

describe("findAll", () => {
Expand All @@ -69,14 +117,33 @@ describe("StreamRepository", () => {
{ id: "2", payer: "p1", status: "paused", createdAt: new Date() },
];

const firstQuery = createMockQuery(mockStreams);
const countQuery = createMockQuery([{ count: 2 }]);

(db.select as jest.Mock)
.mockReturnValueOnce(createMockQuery(mockStreams)) // for data
.mockReturnValueOnce(createMockQuery([{ count: 2 }])); // for count
.mockReturnValueOnce(firstQuery)
.mockReturnValueOnce(countQuery);

const result = await repository.findAll({ payer: "p1" });

expect(result.streams).toHaveLength(2);
expect(result.total).toBe(2);
expect(firstQuery.where).toHaveBeenCalled();
expect(countQuery.where).toHaveBeenCalled();
});

it("should query deleted rows only when includeDeleted true", async () => {
const firstQuery = createMockQuery([{ id: "1", payer: "p1" }]);
const countQuery = createMockQuery([{ count: 1 }]);

(db.select as jest.Mock)
.mockReturnValueOnce(firstQuery)
.mockReturnValueOnce(countQuery);

await repository.findAll({ payer: "p1", includeDeleted: true });

expect(firstQuery.where).toHaveBeenCalled();
expect(countQuery.where).toHaveBeenCalled();
});
});
});
12 changes: 10 additions & 2 deletions src/repositories/streamRepository.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ export interface FindAllParams {
status?: "active" | "paused" | "cancelled" | "completed";
limit?: number;
offset?: number;
includeDeleted?: boolean;
}

export interface UpdateStreamParams {
Expand All @@ -18,11 +19,14 @@ export interface UpdateStreamParams {
}

export class StreamRepository {
async findById(id: string): Promise<(Stream & { accruedEstimate: string }) | null> {
async findById(id: string, includeDeleted = false): Promise<(Stream & { accruedEstimate: string }) | null> {
const conditions = [eq(streams.id, id)];
if (!includeDeleted) conditions.push(sql`${streams.deletedAt} IS NULL`);

const [result] = await db
.select()
.from(streams)
.where(eq(streams.id, id))
.where(and(...conditions))
.limit(1);

if (!result) return null;
Expand All @@ -44,6 +48,10 @@ export class StreamRepository {
if (params.recipient) conditions.push(eq(streams.recipient, params.recipient));
if (params.status) conditions.push(eq(streams.status, params.status));

if (!params.includeDeleted) {
conditions.push(sql`${streams.deletedAt} IS NULL`);
}

const query = db
.select()
.from(streams)
Expand Down
Loading