Skip to content

Commit a1d91be

Browse files
THardy98mjameswh
andauthored
Special behaviour for temporal prefixes (#1644)
Co-authored-by: James Watkins-Harvey <mjameswh@users.noreply.github.com> Co-authored-by: James Watkins-Harvey <james.watkinsharvey@temporal.io>
1 parent 98b7004 commit a1d91be

File tree

10 files changed

+436
-92
lines changed

10 files changed

+436
-92
lines changed

packages/common/src/reserved.ts

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
export const TEMPORAL_RESERVED_PREFIX = '__temporal_';
2+
export const STACK_TRACE_QUERY_NAME = '__stack_trace';
3+
export const ENHANCED_STACK_TRACE_QUERY_NAME = '__enhanced_stack_trace';
4+
5+
/**
6+
* Valid entity types that can be checked for reserved name violations
7+
*/
8+
export type ReservedNameEntityType = 'query' | 'signal' | 'update' | 'activity' | 'task queue' | 'sink' | 'workflow';
9+
10+
/**
11+
* Validates if the provided name contains any reserved prefixes or matches any reserved names.
12+
* Throws a TypeError if validation fails, with a specific message indicating whether the issue
13+
* is with a reserved prefix or an exact match to a reserved name.
14+
*
15+
* @param type The entity type being checked
16+
* @param name The name to check against reserved prefixes/names
17+
*/
18+
export function throwIfReservedName(type: ReservedNameEntityType, name: string): void {
19+
if (name.startsWith(TEMPORAL_RESERVED_PREFIX)) {
20+
throw new TypeError(`Cannot use ${type} name: '${name}', with reserved prefix: '${TEMPORAL_RESERVED_PREFIX}'`);
21+
}
22+
23+
if (name === STACK_TRACE_QUERY_NAME || name === ENHANCED_STACK_TRACE_QUERY_NAME) {
24+
throw new TypeError(`Cannot use ${type} name: '${name}', which is a reserved name`);
25+
}
26+
}

packages/test/src/helpers.ts

Lines changed: 0 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -293,21 +293,6 @@ export async function getRandomPort(fn = (_port: number) => Promise.resolve()):
293293
});
294294
}
295295

296-
export function asSdkLoggerSink(
297-
fn: (info: WorkflowInfo, message: string, attrs?: Record<string, unknown>) => Promise<void>,
298-
opts?: Omit<worker.InjectedSinkFunction<any>, 'fn'>
299-
): worker.InjectedSinks<DefaultLoggerSinks> {
300-
return {
301-
__temporal_logger: {
302-
trace: { fn, ...opts },
303-
debug: { fn, ...opts },
304-
info: { fn, ...opts },
305-
warn: { fn, ...opts },
306-
error: { fn, ...opts },
307-
},
308-
};
309-
}
310-
311296
export async function loadHistory(fname: string): Promise<iface.temporal.api.history.v1.History> {
312297
const isJson = fname.endsWith('json');
313298
const fpath = path.resolve(__dirname, `../history_files/${fname}`);

packages/test/src/test-integration-workflows.ts

Lines changed: 247 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,24 @@
11
import { setTimeout as setTimeoutPromise } from 'timers/promises';
22
import { randomUUID } from 'crypto';
3+
import asyncRetry from 'async-retry';
34
import { ExecutionContext } from 'ava';
45
import { firstValueFrom, Subject } from 'rxjs';
5-
import { WorkflowFailedError } from '@temporalio/client';
6+
import { WorkflowFailedError, WorkflowHandle } from '@temporalio/client';
67
import * as activity from '@temporalio/activity';
78
import { msToNumber, tsToMs } from '@temporalio/common/lib/time';
89
import { TestWorkflowEnvironment } from '@temporalio/testing';
910
import { CancelReason } from '@temporalio/worker/lib/activity';
1011
import * as workflow from '@temporalio/workflow';
11-
import { defineQuery, defineSignal } from '@temporalio/workflow';
12+
import {
13+
condition,
14+
defineQuery,
15+
defineSignal,
16+
defineUpdate,
17+
setDefaultQueryHandler,
18+
setDefaultSignalHandler,
19+
setDefaultUpdateHandler,
20+
setHandler,
21+
} from '@temporalio/workflow';
1222
import { SdkFlags } from '@temporalio/workflow/lib/flags';
1323
import {
1424
ActivityCancellationType,
@@ -20,12 +30,17 @@ import {
2030
TypedSearchAttributes,
2131
WorkflowExecutionAlreadyStartedError,
2232
} from '@temporalio/common';
33+
import {
34+
TEMPORAL_RESERVED_PREFIX,
35+
STACK_TRACE_QUERY_NAME,
36+
ENHANCED_STACK_TRACE_QUERY_NAME,
37+
} from '@temporalio/common/lib/reserved';
2338
import { signalSchedulingWorkflow } from './activities/helpers';
2439
import { activityStartedSignal } from './workflows/definitions';
2540
import * as workflows from './workflows';
2641
import { Context, createLocalTestEnvironment, helpers, makeTestFunction } from './helpers-integration';
2742
import { overrideSdkInternalFlag } from './mock-internal-flags';
28-
import { asSdkLoggerSink, loadHistory, RUN_TIME_SKIPPING_TESTS, waitUntil } from './helpers';
43+
import { loadHistory, RUN_TIME_SKIPPING_TESTS, waitUntil } from './helpers';
2944

3045
const test = makeTestFunction({
3146
workflowsPath: __filename,
@@ -1126,53 +1141,6 @@ test('Workflow can upsert memo', async (t) => {
11261141
});
11271142
});
11281143

1129-
test('Sink functions contains upserted memo', async (t) => {
1130-
const { createWorker, executeWorkflow } = helpers(t);
1131-
const recordedMessages = Array<{ message: string; memo: Record<string, unknown> | undefined }>();
1132-
const sinks = asSdkLoggerSink(async (info, message, _attrs) => {
1133-
recordedMessages.push({
1134-
message,
1135-
memo: info.memo,
1136-
});
1137-
});
1138-
const worker = await createWorker({ sinks });
1139-
await worker.runUntil(async () => {
1140-
await executeWorkflow(upsertAndReadMemo, {
1141-
memo: {
1142-
note1: 'aaa',
1143-
note2: 'bbb',
1144-
note4: 'eee',
1145-
},
1146-
args: [
1147-
{
1148-
note2: 'ccc',
1149-
note3: 'ddd',
1150-
note4: null,
1151-
},
1152-
],
1153-
});
1154-
});
1155-
1156-
t.deepEqual(recordedMessages, [
1157-
{
1158-
message: 'Workflow started',
1159-
memo: {
1160-
note1: 'aaa',
1161-
note2: 'bbb',
1162-
note4: 'eee',
1163-
},
1164-
},
1165-
{
1166-
message: 'Workflow completed',
1167-
memo: {
1168-
note1: 'aaa',
1169-
note2: 'ccc',
1170-
note3: 'ddd',
1171-
},
1172-
},
1173-
]);
1174-
});
1175-
11761144
export async function langFlagsReplayCorrectly(): Promise<void> {
11771145
const { noopActivity } = workflow.proxyActivities({ scheduleToCloseTimeout: '10s' });
11781146
await workflow.CancellationScope.withTimeout('10s', async () => {
@@ -1440,3 +1408,232 @@ test('Workflow can return root workflow', async (t) => {
14401408
t.deepEqual(result, 'empty test-root-workflow-length');
14411409
});
14421410
});
1411+
1412+
const reservedNames = [TEMPORAL_RESERVED_PREFIX, STACK_TRACE_QUERY_NAME, ENHANCED_STACK_TRACE_QUERY_NAME];
1413+
1414+
test('Cannot register activities using reserved prefixes', async (t) => {
1415+
const { createWorker } = helpers(t);
1416+
1417+
for (const name of reservedNames) {
1418+
const activityName = name === TEMPORAL_RESERVED_PREFIX ? name + '_test' : name;
1419+
await t.throwsAsync(
1420+
createWorker({
1421+
activities: { [activityName]: () => {} },
1422+
}),
1423+
{
1424+
name: 'TypeError',
1425+
message:
1426+
name === TEMPORAL_RESERVED_PREFIX
1427+
? `Cannot use activity name: '${activityName}', with reserved prefix: '${name}'`
1428+
: `Cannot use activity name: '${activityName}', which is a reserved name`,
1429+
}
1430+
);
1431+
}
1432+
});
1433+
1434+
test('Cannot register task queues using reserved prefixes', async (t) => {
1435+
const { createWorker } = helpers(t);
1436+
1437+
for (const name of reservedNames) {
1438+
const taskQueue = name === TEMPORAL_RESERVED_PREFIX ? name + '_test' : name;
1439+
1440+
await t.throwsAsync(
1441+
createWorker({
1442+
taskQueue,
1443+
}),
1444+
{
1445+
name: 'TypeError',
1446+
message:
1447+
name === TEMPORAL_RESERVED_PREFIX
1448+
? `Cannot use task queue name: '${taskQueue}', with reserved prefix: '${name}'`
1449+
: `Cannot use task queue name: '${taskQueue}', which is a reserved name`,
1450+
}
1451+
);
1452+
}
1453+
});
1454+
1455+
test('Cannot register sinks using reserved prefixes', async (t) => {
1456+
const { createWorker } = helpers(t);
1457+
1458+
for (const name of reservedNames) {
1459+
const sinkName = name === TEMPORAL_RESERVED_PREFIX ? name + '_test' : name;
1460+
await t.throwsAsync(
1461+
createWorker({
1462+
sinks: {
1463+
[sinkName]: {
1464+
test: {
1465+
fn: () => {},
1466+
},
1467+
},
1468+
},
1469+
}),
1470+
{
1471+
name: 'TypeError',
1472+
message:
1473+
name === TEMPORAL_RESERVED_PREFIX
1474+
? `Cannot use sink name: '${sinkName}', with reserved prefix: '${name}'`
1475+
: `Cannot use sink name: '${sinkName}', which is a reserved name`,
1476+
}
1477+
);
1478+
}
1479+
});
1480+
1481+
interface HandlerError {
1482+
name: string;
1483+
message: string;
1484+
}
1485+
1486+
export async function workflowReservedNameHandler(name: string): Promise<HandlerError[]> {
1487+
// Re-package errors, default payload converter has trouble converting native errors (no 'data' field).
1488+
const expectedErrors: HandlerError[] = [];
1489+
try {
1490+
setHandler(defineSignal(name === TEMPORAL_RESERVED_PREFIX ? name + '_signal' : name), () => {});
1491+
} catch (e) {
1492+
if (e instanceof Error) {
1493+
expectedErrors.push({ name: e.name, message: e.message });
1494+
}
1495+
}
1496+
try {
1497+
setHandler(defineUpdate(name === TEMPORAL_RESERVED_PREFIX ? name + '_update' : name), () => {});
1498+
} catch (e) {
1499+
if (e instanceof Error) {
1500+
expectedErrors.push({ name: e.name, message: e.message });
1501+
}
1502+
}
1503+
try {
1504+
setHandler(defineQuery(name === TEMPORAL_RESERVED_PREFIX ? name + '_query' : name), () => {});
1505+
} catch (e) {
1506+
if (e instanceof Error) {
1507+
expectedErrors.push({ name: e.name, message: e.message });
1508+
}
1509+
}
1510+
return expectedErrors;
1511+
}
1512+
1513+
test('Workflow failure if define signals/updates/queries with reserved prefixes', async (t) => {
1514+
const { createWorker, executeWorkflow } = helpers(t);
1515+
const worker = await createWorker();
1516+
await worker.runUntil(async () => {
1517+
for (const name of reservedNames) {
1518+
const result = await executeWorkflow(workflowReservedNameHandler, {
1519+
args: [name],
1520+
});
1521+
t.deepEqual(result, [
1522+
{
1523+
name: 'TypeError',
1524+
message:
1525+
name === TEMPORAL_RESERVED_PREFIX
1526+
? `Cannot use signal name: '${name}_signal', with reserved prefix: '${name}'`
1527+
: `Cannot use signal name: '${name}', which is a reserved name`,
1528+
},
1529+
{
1530+
name: 'TypeError',
1531+
message:
1532+
name === TEMPORAL_RESERVED_PREFIX
1533+
? `Cannot use update name: '${name}_update', with reserved prefix: '${name}'`
1534+
: `Cannot use update name: '${name}', which is a reserved name`,
1535+
},
1536+
{
1537+
name: 'TypeError',
1538+
message:
1539+
name === TEMPORAL_RESERVED_PREFIX
1540+
? `Cannot use query name: '${name}_query', with reserved prefix: '${name}'`
1541+
: `Cannot use query name: '${name}', which is a reserved name`,
1542+
},
1543+
]);
1544+
}
1545+
});
1546+
});
1547+
1548+
export const wfReadyQuery = defineQuery<boolean>('wf-ready');
1549+
export async function workflowWithDefaultHandlers(): Promise<void> {
1550+
let unblocked = false;
1551+
setHandler(defineSignal('unblock'), () => {
1552+
unblocked = true;
1553+
});
1554+
1555+
setDefaultQueryHandler(() => {});
1556+
setDefaultSignalHandler(() => {});
1557+
setDefaultUpdateHandler(() => {});
1558+
setHandler(wfReadyQuery, () => true);
1559+
1560+
await condition(() => unblocked);
1561+
}
1562+
1563+
test('Default handlers fail given reserved prefix', async (t) => {
1564+
const { createWorker, startWorkflow } = helpers(t);
1565+
const worker = await createWorker();
1566+
1567+
const assertWftFailure = async (handle: WorkflowHandle, errMsg: string) => {
1568+
await asyncRetry(
1569+
async () => {
1570+
const history = await handle.fetchHistory();
1571+
const wftFailedEvent = history.events?.findLast((ev) => ev.workflowTaskFailedEventAttributes);
1572+
if (wftFailedEvent === undefined) {
1573+
throw new Error('No WFT failed event found');
1574+
}
1575+
const { failure } = wftFailedEvent.workflowTaskFailedEventAttributes ?? {};
1576+
if (!failure) {
1577+
return t.fail('Expected failure in workflowTaskFailedEventAttributes');
1578+
}
1579+
t.is(failure.message, errMsg);
1580+
},
1581+
{ minTimeout: 300, factor: 1, retries: 10 }
1582+
);
1583+
};
1584+
1585+
await worker.runUntil(async () => {
1586+
// Reserved query
1587+
let handle = await startWorkflow(workflowWithDefaultHandlers);
1588+
await asyncRetry(async () => {
1589+
if (!(await handle.query(wfReadyQuery))) {
1590+
throw new Error('Workflow not ready yet');
1591+
}
1592+
});
1593+
const queryName = `${TEMPORAL_RESERVED_PREFIX}_query`;
1594+
await t.throwsAsync(
1595+
handle.query(queryName),
1596+
{
1597+
// TypeError transforms to a QueryNotRegisteredError on the way back from server
1598+
name: 'QueryNotRegisteredError',
1599+
message: `Cannot use query name: '${queryName}', with reserved prefix: '${TEMPORAL_RESERVED_PREFIX}'`,
1600+
},
1601+
`Query ${queryName} should fail`
1602+
);
1603+
await handle.terminate();
1604+
1605+
// Reserved signal
1606+
handle = await startWorkflow(workflowWithDefaultHandlers);
1607+
await asyncRetry(async () => {
1608+
if (!(await handle.query(wfReadyQuery))) {
1609+
throw new Error('Workflow not ready yet');
1610+
}
1611+
});
1612+
const signalName = `${TEMPORAL_RESERVED_PREFIX}_signal`;
1613+
await handle.signal(signalName);
1614+
await assertWftFailure(
1615+
handle,
1616+
`Cannot use signal name: '${signalName}', with reserved prefix: '${TEMPORAL_RESERVED_PREFIX}'`
1617+
);
1618+
await handle.terminate();
1619+
1620+
// Reserved update
1621+
handle = await startWorkflow(workflowWithDefaultHandlers);
1622+
await asyncRetry(async () => {
1623+
if (!(await handle.query(wfReadyQuery))) {
1624+
throw new Error('Workflow not ready yet');
1625+
}
1626+
});
1627+
const updateName = `${TEMPORAL_RESERVED_PREFIX}_update`;
1628+
handle.executeUpdate(updateName).catch(() => {
1629+
// Expect failure. The error caught here is a WorkflowNotFound because
1630+
// the workflow will have already failed, so the update cannot go through.
1631+
// We assert on the expected failure below.
1632+
});
1633+
await assertWftFailure(
1634+
handle,
1635+
`Cannot use update name: '${updateName}', with reserved prefix: '${TEMPORAL_RESERVED_PREFIX}'`
1636+
);
1637+
await handle.terminate();
1638+
});
1639+
});

0 commit comments

Comments
 (0)