Skip to content

Commit 3e4bc36

Browse files
Spread Object Level lifecycle (#3088)
Previously this format was failing to propagate lifecycle to table ```export const SomePipeline = new IngestPipeline<SomeType>("FooPipeline", { table: { engine: ClickHouseEngines.MergeTree, orderByFields: ['session'] }, stream: true, ingestApi: true, version: "0.0", lifeCycle: LifeCycle.DELETION_PROTECTED }); ``` <!-- CURSOR_SUMMARY --> --- > [!NOTE] > Propagates top-level `lifeCycle` to `table`, `stream`, and `deadLetterQueue` when not explicitly set, and adds tests covering propagation and overrides. > > - **SDK (`packages/ts-moose-lib/src/dmv2/sdk/ingestPipeline.ts`)**: > - Propagate top-level `lifeCycle` to component configs when omitted: > - `table`: set `lifeCycle` to `config.table.lifeCycle ?? config.lifeCycle` for object configs; include `lifeCycle` when `table: true`. > - `stream`: set `lifeCycle` to `config.stream.lifeCycle ?? config.lifeCycle` for object configs; include `lifeCycle` when `stream: true`. > - `deadLetterQueue`: set `lifeCycle` to `config.deadLetterQueue.lifeCycle ?? config.lifeCycle` for object configs; include `lifeCycle` when `deadLetterQueue: true`. > - **Tests (`packages/ts-moose-lib/tests/ingestPipeline-lifecycle.test.ts`)**: > - Add comprehensive tests verifying `lifeCycle` propagation, per-component overrides, boolean config cases, and absence when unspecified. > > <sup>Written by [Cursor Bugbot](https://cursor.com/dashboard?tab=bugbot) for commit 7ad6022. This will update automatically on new commits. Configure [here](https://cursor.com/dashboard?tab=bugbot).</sup> <!-- /CURSOR_SUMMARY -->
1 parent 4acbe90 commit 3e4bc36

File tree

2 files changed

+266
-3
lines changed

2 files changed

+266
-3
lines changed

packages/ts-moose-lib/src/dmv2/sdk/ingestPipeline.ts

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -236,6 +236,7 @@ export class IngestPipeline<T> extends TypedBase<T, IngestPipelineConfig<T>> {
236236
typeof config.table === "object" ?
237237
{
238238
...config.table,
239+
lifeCycle: config.table.lifeCycle ?? config.lifeCycle,
239240
...(config.version && { version: config.version }),
240241
}
241242
: {
@@ -256,8 +257,11 @@ export class IngestPipeline<T> extends TypedBase<T, IngestPipelineConfig<T>> {
256257
const streamConfig = {
257258
destination: undefined,
258259
...(typeof config.deadLetterQueue === "object" ?
259-
config.deadLetterQueue
260-
: {}),
260+
{
261+
...config.deadLetterQueue,
262+
lifeCycle: config.deadLetterQueue.lifeCycle ?? config.lifeCycle,
263+
}
264+
: { lifeCycle: config.lifeCycle }),
261265
...(config.version && { version: config.version }),
262266
};
263267
this.deadLetterQueue = new DeadLetterQueue<T>(
@@ -273,7 +277,10 @@ export class IngestPipeline<T> extends TypedBase<T, IngestPipelineConfig<T>> {
273277
destination: this.table,
274278
defaultDeadLetterQueue: this.deadLetterQueue,
275279
...(typeof config.stream === "object" ?
276-
config.stream
280+
{
281+
...config.stream,
282+
lifeCycle: config.stream.lifeCycle ?? config.lifeCycle,
283+
}
277284
: { lifeCycle: config.lifeCycle }),
278285
...(config.version && { version: config.version }),
279286
};
Lines changed: 256 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,256 @@
1+
import { expect } from "chai";
2+
import { IngestPipeline } from "../src/dmv2/sdk/ingestPipeline";
3+
import { LifeCycle } from "../src/dmv2/sdk/lifeCycle";
4+
import { getMooseInternal } from "../src/dmv2/internal";
5+
6+
interface TestData {
7+
id: string;
8+
value: number;
9+
}
10+
11+
describe("IngestPipeline", () => {
12+
beforeEach(() => {
13+
// Clear the registry before each test
14+
const registry = getMooseInternal();
15+
registry.tables.clear();
16+
registry.streams.clear();
17+
registry.ingestApis.clear();
18+
});
19+
20+
describe("lifeCycle propagation", () => {
21+
it("should propagate top-level lifeCycle to table when table.lifeCycle is not specified", () => {
22+
const pipeline = new IngestPipeline<TestData>("TestPipeline", {
23+
table: {
24+
orderByFields: ["id"],
25+
},
26+
stream: false,
27+
ingestApi: false,
28+
lifeCycle: LifeCycle.EXTERNALLY_MANAGED,
29+
});
30+
31+
expect(pipeline.table).to.not.be.undefined;
32+
expect(pipeline.table?.config.lifeCycle).to.equal(
33+
LifeCycle.EXTERNALLY_MANAGED,
34+
);
35+
});
36+
37+
it("should propagate top-level lifeCycle to stream when stream.lifeCycle is not specified", () => {
38+
const pipeline = new IngestPipeline<TestData>("TestPipeline", {
39+
table: true,
40+
stream: {
41+
parallelism: 1,
42+
},
43+
ingestApi: false,
44+
lifeCycle: LifeCycle.DELETION_PROTECTED,
45+
});
46+
47+
expect(pipeline.stream).to.not.be.undefined;
48+
expect(pipeline.stream?.config.lifeCycle).to.equal(
49+
LifeCycle.DELETION_PROTECTED,
50+
);
51+
});
52+
53+
it("should propagate top-level lifeCycle to both table and stream", () => {
54+
const pipeline = new IngestPipeline<TestData>("TestPipeline", {
55+
table: {
56+
orderByFields: ["id"],
57+
},
58+
stream: {
59+
parallelism: 2,
60+
},
61+
ingestApi: false,
62+
lifeCycle: LifeCycle.FULLY_MANAGED,
63+
});
64+
65+
expect(pipeline.table).to.not.be.undefined;
66+
expect(pipeline.table?.config.lifeCycle).to.equal(
67+
LifeCycle.FULLY_MANAGED,
68+
);
69+
expect(pipeline.stream).to.not.be.undefined;
70+
expect(pipeline.stream?.config.lifeCycle).to.equal(
71+
LifeCycle.FULLY_MANAGED,
72+
);
73+
});
74+
75+
it("should respect table-specific lifeCycle over top-level lifeCycle", () => {
76+
const pipeline = new IngestPipeline<TestData>("TestPipeline", {
77+
table: {
78+
orderByFields: ["id"],
79+
lifeCycle: LifeCycle.DELETION_PROTECTED,
80+
},
81+
stream: false,
82+
ingestApi: false,
83+
lifeCycle: LifeCycle.EXTERNALLY_MANAGED,
84+
});
85+
86+
expect(pipeline.table).to.not.be.undefined;
87+
expect(pipeline.table?.config.lifeCycle).to.equal(
88+
LifeCycle.DELETION_PROTECTED,
89+
);
90+
});
91+
92+
it("should respect stream-specific lifeCycle over top-level lifeCycle", () => {
93+
const pipeline = new IngestPipeline<TestData>("TestPipeline", {
94+
table: true,
95+
stream: {
96+
parallelism: 1,
97+
lifeCycle: LifeCycle.FULLY_MANAGED,
98+
},
99+
ingestApi: false,
100+
lifeCycle: LifeCycle.EXTERNALLY_MANAGED,
101+
});
102+
103+
expect(pipeline.stream).to.not.be.undefined;
104+
expect(pipeline.stream?.config.lifeCycle).to.equal(
105+
LifeCycle.FULLY_MANAGED,
106+
);
107+
});
108+
109+
it("should allow different lifeCycles for table and stream with top-level default", () => {
110+
const pipeline = new IngestPipeline<TestData>("TestPipeline", {
111+
table: {
112+
orderByFields: ["id"],
113+
lifeCycle: LifeCycle.DELETION_PROTECTED,
114+
},
115+
stream: {
116+
parallelism: 1,
117+
},
118+
ingestApi: false,
119+
lifeCycle: LifeCycle.EXTERNALLY_MANAGED,
120+
});
121+
122+
expect(pipeline.table).to.not.be.undefined;
123+
expect(pipeline.table?.config.lifeCycle).to.equal(
124+
LifeCycle.DELETION_PROTECTED,
125+
);
126+
expect(pipeline.stream).to.not.be.undefined;
127+
expect(pipeline.stream?.config.lifeCycle).to.equal(
128+
LifeCycle.EXTERNALLY_MANAGED,
129+
);
130+
});
131+
132+
it("should work with table: true and propagate lifeCycle", () => {
133+
const pipeline = new IngestPipeline<TestData>("TestPipeline", {
134+
table: true,
135+
stream: false,
136+
ingestApi: false,
137+
lifeCycle: LifeCycle.DELETION_PROTECTED,
138+
});
139+
140+
expect(pipeline.table).to.not.be.undefined;
141+
expect(pipeline.table?.config.lifeCycle).to.equal(
142+
LifeCycle.DELETION_PROTECTED,
143+
);
144+
});
145+
146+
it("should work with stream: true and propagate lifeCycle", () => {
147+
const pipeline = new IngestPipeline<TestData>("TestPipeline", {
148+
table: true,
149+
stream: true,
150+
ingestApi: false,
151+
lifeCycle: LifeCycle.EXTERNALLY_MANAGED,
152+
});
153+
154+
expect(pipeline.stream).to.not.be.undefined;
155+
expect(pipeline.stream?.config.lifeCycle).to.equal(
156+
LifeCycle.EXTERNALLY_MANAGED,
157+
);
158+
});
159+
160+
it("should not set lifeCycle when not specified at any level", () => {
161+
const pipeline = new IngestPipeline<TestData>("TestPipeline", {
162+
table: {
163+
orderByFields: ["id"],
164+
},
165+
stream: {
166+
parallelism: 1,
167+
},
168+
ingestApi: false,
169+
});
170+
171+
expect(pipeline.table).to.not.be.undefined;
172+
expect(pipeline.table?.config.lifeCycle).to.be.undefined;
173+
expect(pipeline.stream).to.not.be.undefined;
174+
expect(pipeline.stream?.config.lifeCycle).to.be.undefined;
175+
});
176+
177+
it("should propagate top-level lifeCycle to deadLetterQueue when deadLetterQueue.lifeCycle is not specified", () => {
178+
const pipeline = new IngestPipeline<TestData>("TestPipeline", {
179+
table: true,
180+
stream: true,
181+
ingestApi: false,
182+
deadLetterQueue: {
183+
parallelism: 1,
184+
},
185+
lifeCycle: LifeCycle.DELETION_PROTECTED,
186+
});
187+
188+
expect(pipeline.deadLetterQueue).to.not.be.undefined;
189+
expect(pipeline.deadLetterQueue?.config.lifeCycle).to.equal(
190+
LifeCycle.DELETION_PROTECTED,
191+
);
192+
});
193+
194+
it("should propagate top-level lifeCycle to deadLetterQueue when deadLetterQueue is true", () => {
195+
const pipeline = new IngestPipeline<TestData>("TestPipeline", {
196+
table: true,
197+
stream: true,
198+
ingestApi: false,
199+
deadLetterQueue: true,
200+
lifeCycle: LifeCycle.EXTERNALLY_MANAGED,
201+
});
202+
203+
expect(pipeline.deadLetterQueue).to.not.be.undefined;
204+
expect(pipeline.deadLetterQueue?.config.lifeCycle).to.equal(
205+
LifeCycle.EXTERNALLY_MANAGED,
206+
);
207+
});
208+
209+
it("should respect deadLetterQueue-specific lifeCycle over top-level lifeCycle", () => {
210+
const pipeline = new IngestPipeline<TestData>("TestPipeline", {
211+
table: true,
212+
stream: true,
213+
ingestApi: false,
214+
deadLetterQueue: {
215+
parallelism: 2,
216+
lifeCycle: LifeCycle.FULLY_MANAGED,
217+
},
218+
lifeCycle: LifeCycle.EXTERNALLY_MANAGED,
219+
});
220+
221+
expect(pipeline.deadLetterQueue).to.not.be.undefined;
222+
expect(pipeline.deadLetterQueue?.config.lifeCycle).to.equal(
223+
LifeCycle.FULLY_MANAGED,
224+
);
225+
});
226+
227+
it("should propagate lifeCycle to all components (table, stream, deadLetterQueue)", () => {
228+
const pipeline = new IngestPipeline<TestData>("TestPipeline", {
229+
table: {
230+
orderByFields: ["id"],
231+
},
232+
stream: {
233+
parallelism: 1,
234+
},
235+
ingestApi: false,
236+
deadLetterQueue: {
237+
parallelism: 1,
238+
},
239+
lifeCycle: LifeCycle.DELETION_PROTECTED,
240+
});
241+
242+
expect(pipeline.table).to.not.be.undefined;
243+
expect(pipeline.table?.config.lifeCycle).to.equal(
244+
LifeCycle.DELETION_PROTECTED,
245+
);
246+
expect(pipeline.stream).to.not.be.undefined;
247+
expect(pipeline.stream?.config.lifeCycle).to.equal(
248+
LifeCycle.DELETION_PROTECTED,
249+
);
250+
expect(pipeline.deadLetterQueue).to.not.be.undefined;
251+
expect(pipeline.deadLetterQueue?.config.lifeCycle).to.equal(
252+
LifeCycle.DELETION_PROTECTED,
253+
);
254+
});
255+
});
256+
});

0 commit comments

Comments
 (0)