From a45cac8bf3eee86fce15368117445350d0f6c60b Mon Sep 17 00:00:00 2001 From: Abdullatif Alrashdan Date: Sun, 1 Feb 2026 16:12:17 +0000 Subject: [PATCH 1/3] Add background-workers agent example with Node.js Worker Threads This commit adds a new sample demonstrating how to use Node.js Worker Threads to handle CPU-blocking operations without freezing the agent's main thread. New Features: - Background worker agent using worker_threads for CPU-intensive tasks - Fire-and-forget pattern: agent returns immediately without waiting for workers - get_task_status tool using app.getAsyncTaskInfo() for monitoring active tasks - Non-blocking architecture allowing concurrent request handling - Comprehensive documentation on blocking vs non-blocking operations Changes: - primitives/runtime/background-workers/: New sample with agent, worker, and docs - primitives/runtime/async-agent/: Added get_task_status tool for consistency - primitives/runtime/README.md: Added link to new background-workers sample The example demonstrates: - Worker threads for blocking operations (loops, dataset processing, calculations) - Main thread stays responsive while workers process in background - Multiple concurrent workers can run simultaneously - Health status tracking (Healthy/HealthyBusy) - Message passing between main thread and worker threads --- primitives/runtime/README.md | 4 + primitives/runtime/async-agent/README.md | 53 ++- primitives/runtime/async-agent/agent.ts | 29 +- .../runtime/background-workers/README.md | 414 ++++++++++++++++++ .../runtime/background-workers/agent.ts | 99 +++++ .../runtime/background-workers/package.json | 25 ++ .../runtime/background-workers/tsconfig.json | 14 + .../runtime/background-workers/worker.ts | 27 ++ 8 files changed, 661 insertions(+), 4 deletions(-) create mode 100644 primitives/runtime/background-workers/README.md create mode 100644 primitives/runtime/background-workers/agent.ts create mode 100644 primitives/runtime/background-workers/package.json create mode 100644 primitives/runtime/background-workers/tsconfig.json create mode 100644 primitives/runtime/background-workers/worker.ts diff --git a/primitives/runtime/README.md b/primitives/runtime/README.md index 0515763..7271922 100644 --- a/primitives/runtime/README.md +++ b/primitives/runtime/README.md @@ -150,6 +150,10 @@ Full-duplex WebSocket communication for real-time applications. Long-running background tasks with automatic health status tracking. +### [background-workers](./background-workers/) + +Node.js Worker Threads for CPU-blocking operations. Demonstrates how to offload blocking tasks (loops, dataset processing, calculations) to separate threads, keeping the agent responsive and able to handle concurrent requests. + --- ## Running Locally diff --git a/primitives/runtime/async-agent/README.md b/primitives/runtime/async-agent/README.md index 51b1af5..0e42b95 100644 --- a/primitives/runtime/async-agent/README.md +++ b/primitives/runtime/async-agent/README.md @@ -14,7 +14,9 @@ Agent code communicates its processing status using the "/ping" endpoint health ## How It Works -The agent uses the AgentCore Runtime's task tracking system to manage background jobs: +The agent provides two tools for managing and monitoring background tasks: + +### 1. Start Background Task ```typescript const startBackgroundTask = tool({ @@ -37,6 +39,29 @@ const startBackgroundTask = tool({ When a task is registered with `addAsyncTask()`, the runtime's health endpoint (`/ping`) automatically returns `HealthyBusy`. Once `completeAsyncTask()` is called, the status returns to `Healthy`. +### 2. Get Task Status + +```typescript +const getTaskStatus = tool({ + name: 'get_task_status', + description: 'Get information about currently running background tasks', + inputSchema: z.object({}), + callback: async (): Promise => { + const taskStatus = app.getAsyncTaskInfo() + // Returns: { activeCount: number, runningJobs: Array<{ name: string, duration: number }> } + + if (taskStatus.activeCount === 0) { + return 'No background tasks currently running. Agent status is Healthy.' + } + + // Return task details + return `Currently running tasks: ${taskStatus.activeCount}...` + } +}) +``` + +This tool uses `app.getAsyncTaskInfo()` to query all currently active async tasks, allowing users to check what background tasks are running. + ## Build and Run Locally ### Prerequisites @@ -67,7 +92,7 @@ BedrockAgentCoreApp server listening on port 8080 ## Testing the Async Agent -**Note:** The `/ping` endpoint is only accessible when running locally. In deployed agents, AgentCore uses this endpoint internally for health monitoring and does not expose it to external traffic. +**Note:** The `/ping` endpoint is only accessible when running locally for testing purposes. When the agent is deployed publicly on Amazon Bedrock AgentCore, this endpoint is not exposed to external traffic. AgentCore uses the `/ping` endpoint internally for health monitoring to manage agent session lifecycle. ### 1. Check Initial Health Status @@ -116,7 +141,29 @@ Response while task is active: } ``` -### 4. Check Status After Task Completes +### 4. Query Running Tasks + +While tasks are running, query their status using the agent: + +```bash +curl -X POST http://localhost:8080/invocations \ + -H "Content-Type: application/json" \ + -d '{"prompt": "what tasks are running?"}' +``` + +Response will show currently active tasks: + +``` +Currently running background tasks (1): + +Task 1: + - Name: background_processing + - Duration: 20 seconds + +Agent status: HealthyBusy +``` + +### 5. Check Status After Task Completes Wait for the task duration to complete, then check again: diff --git a/primitives/runtime/async-agent/agent.ts b/primitives/runtime/async-agent/agent.ts index 59958a4..4c01f40 100644 --- a/primitives/runtime/async-agent/agent.ts +++ b/primitives/runtime/async-agent/agent.ts @@ -2,6 +2,33 @@ import { Agent, BedrockModel, tool } from '@strands-agents/sdk' import { BedrockAgentCoreApp } from 'bedrock-agentcore/runtime' import { z } from 'zod' +const getTaskStatus = tool({ + name: 'get_task_status', + description: 'Get information about currently running background tasks and agent status', + inputSchema: z.object({}), + callback: async (): Promise => { + const taskStatus = app.getAsyncTaskInfo() + + if (taskStatus.activeCount === 0) { + return 'No background tasks are currently running. Agent status is Healthy.' + } + + const taskList = taskStatus.runningJobs + .map((job, index) => { + return `Task ${index + 1}: + - Name: ${job.name} + - Duration: ${job.duration} seconds` + }) + .join('\n\n') + + return `Currently running background tasks (${taskStatus.activeCount}): + +${taskList} + +Agent status: HealthyBusy` + }, +}) + const startBackgroundTask = tool({ name: 'start_background_task', description: 'Start a simple background task that runs for specified duration', @@ -25,7 +52,7 @@ const agent = new Agent({ modelId: 'global.amazon.nova-2-lite-v1:0', region: process.env['AWS_REGION'] ?? 'us-east-1', }), - tools: [startBackgroundTask], + tools: [startBackgroundTask, getTaskStatus], }) const app = new BedrockAgentCoreApp({ diff --git a/primitives/runtime/background-workers/README.md b/primitives/runtime/background-workers/README.md new file mode 100644 index 0000000..a7f6300 --- /dev/null +++ b/primitives/runtime/background-workers/README.md @@ -0,0 +1,414 @@ +# Background Workers Agent - Strands + +Deploy an AI agent that uses Node.js Worker Threads to handle CPU-blocking tasks without freezing the main thread. This demonstrates how to keep your agent responsive and able to accept new requests while performing blocking operations like loops, dataset processing, cryptographic operations, or complex calculations. + +## What This Sample Demonstrates + +- **Worker Threads for Blocking Tasks**: Offload CPU-blocking operations (loops, data processing, calculations) to separate threads +- **Non-Blocking Architecture**: Main thread stays responsive and accepts new requests while workers process +- **Concurrent Request Handling**: Agent can handle multiple requests simultaneously without blocking +- **Message Passing**: Communication between main thread and worker threads via events +- **Health Status Tracking**: Agent status changes from `Healthy` to `HealthyBusy` during processing which is used by AgentCore to keep session alive. +- **Real-World Use Cases**: Image processing, log analysis, data transformation, encryption, etc. + +## Why Use Background Workers? + +### The Problem: Blocking Operations Freeze Your Agent + +Without worker threads, blocking operations completely freeze the main thread, preventing your agent from accepting ANY new incoming requests: + +```typescript +// ❌ BAD: Blocks the main thread +callback: async (): Promise => { + // Tight loop blocks for 10 seconds + for (let i = 0; i < 10000000000; i++) { + // CPU-intensive calculation + } + + // OR processing large datasets + const data = readFileSync('large-file.json') + for (const record of millionsOfRecords) { + // Process each record - blocks for minutes + } + + return 'Done' +} + +// Result: +// - Main thread is frozen during processing +// - Agent cannot accept new requests +// - All incoming requests wait or timeout +// - Other users cannot be served +``` + +**Impact on concurrent users:** +- User A: "Process this dataset" → Blocks agent for 10 seconds +- User B: "What's 2+2?" → Request blocked, must wait 10 seconds for User A to complete +- User C: Attempts to connect → Connection refused or times out while agent is blocked + +### The Solution: Worker Threads Keep Agent Responsive + +Worker threads run blocking operations in separate OS threads, keeping the main thread free to accept requests: + +```typescript +// ✅ GOOD: Offloads blocking work to worker thread +callback: async (): Promise => { + const worker = new Worker('./worker.js') + + return new Promise((resolve) => { + // Main thread sends work to worker (non-blocking) + worker.postMessage({ task: 'process' }) + + // Main thread waits for response (async, non-blocking) + worker.on('message', (result) => { + resolve(result) + }) + }) +} + +// Result: +// - Main thread stays free +// - Agent accepts new requests immediately +// - Worker thread does blocking work in parallel +// - All users can be served concurrently +``` + +**Impact on concurrent users:** +- User A: "Process this dataset" → Worker spawned, processes for 10 seconds in background +- User B: "What's 2+2?" → Responds immediately (main thread remains available) +- User C: "Tell me a joke" → Responds immediately (main thread remains available) + +### Common Blocking Operations That Need Workers + +Use worker threads for these CPU-blocking tasks: + +1. **Tight loops**: `while`, `for` loops that run millions of iterations +2. **Large dataset processing**: Parsing/transforming megabytes of data +3. **Complex calculations**: Mathematical computations, simulations +4. **File processing**: Reading/parsing large files (logs, CSVs, JSON) +5. **Cryptographic operations**: Hashing, encryption, key generation +6. **Image/video processing**: Resizing, compression, format conversion +7. **Data aggregation**: Sorting, filtering, grouping large datasets + +### Benefits + +1. **Agent stays responsive** - Main thread free to accept new requests +2. **No request blocking** - Incoming requests don't wait for processing +3. **Concurrent processing** - Multiple workers run in parallel +4. **Better scalability** - Serve multiple users simultaneously +5. **Prevents timeouts** - Long tasks don't cause connection timeouts + +## How It Works + +### Agent Tools (agent.ts) + +The agent provides two tools: + +#### 1. Start Background Worker +```typescript +const startBackgroundWorker = tool({ + name: 'start_background_worker', + description: 'Start a background worker for CPU-intensive processing', + inputSchema: z.object({ + duration: z.number().default(5) + }), + callback: async (input: { duration: number }): Promise => { + const taskId = app.addAsyncTask('background_worker_processing', { duration }) + const worker = new Worker(workerPath) + + // Submit job to worker + worker.postMessage({ duration }) + + // Set up completion handlers (fire and forget) + worker.on('message', (message) => { + app.completeAsyncTask(taskId) + worker.terminate() + }) + + // Return immediately without waiting + return `Worker started (Task ID: ${taskId}). Processing in background...` + } +}) +``` + +**Key Point:** The callback returns immediately after starting the worker, without waiting for it to complete. This is true "fire and forget" - the agent can accept new requests while the worker processes in the background. + +#### 2. Get Task Status + +```typescript +const getTaskStatus = tool({ + name: 'get_task_status', + description: 'Get information about currently running background tasks', + inputSchema: z.object({}), + callback: async (): Promise => { + const taskStatus = app.getAsyncTaskInfo() + // Returns: { activeCount: number, runningJobs: Array<{ name: string, duration: number }> } + + if (taskStatus.activeCount === 0) { + return 'No background tasks currently running. Agent status is Healthy.' + } + + // Return task details: name, duration + return `Currently running tasks: ${taskStatus.activeCount}...` + } +}) +``` + +This tool uses `app.getAsyncTaskInfo()` to query the async task status, which returns: +- `activeCount`: Number of currently running background tasks +- `runningJobs`: Array of jobs with `name` and `duration` properties + +### Worker Thread (worker.ts) +```typescript +parentPort.on('message', (message: { duration: number }) => { + // CPU-intensive blocking loop + // This blocks the WORKER thread, NOT the main thread + const end = Date.now() + message.duration * 1000 + while (Date.now() < end) { /* busy loop */ } + + // Send completion message back to main thread + parentPort?.postMessage({ status: 'completed' }) +}) +``` + +### Health Status +When a task is registered with `addAsyncTask()`, the `/ping` endpoint automatically returns `HealthyBusy`. Once `completeAsyncTask()` is called, status returns to `Healthy`. + +**Note:** The `/ping` endpoint is used by Amazon Bedrock AgentCore for internal health monitoring and session management. It is only accessible locally during development and is not exposed when the agent is deployed publicly. + +## Build and Run Locally + +### Prerequisites + +- Node.js 20+ +- AWS credentials configured (for Bedrock API access) +- AgentCore Starter Toolkit installed + +### Run Locally with Hot Reload + +```bash +# Start dev server with automatic reload +agentcore dev +``` + +Test with the CLI: + +```bash +agentcore invoke --dev '{"prompt": "What is 25 * 4?"}' +``` + +Or with curl: + +``` +curl -X POST http://localhost:8080/invocations -H "Content-Type: application/json" -d '{"prompt": "start a 3 second worker"}' +BedrockAgentCoreApp server listening on port 8080 +``` + +## Demonstrating Non-Blocking Behavior + +**Note:** The `/ping` endpoint is only accessible when running locally for testing purposes. When the agent is deployed publicly on Amazon Bedrock AgentCore, this endpoint is not exposed to external traffic. AgentCore uses the `/ping` endpoint internally for health monitoring to manage agent session lifecycle. + +### Test 1: Basic Worker Execution + +Start a 5-second background worker: + +```bash +curl -X POST http://localhost:8080/invocations \ + -H "Content-Type: application/json" \ + -d '{"prompt": "start a 5 second background task"}' +``` + +The agent will: +1. Start the worker thread +2. Return immediately with "Worker started" message +3. Worker processes for 5 seconds in the background +4. Agent is available to handle new requests immediately + +**Important:** The agent does not wait for the worker to complete. It returns immediately after starting the worker. + +### Test 1b: Query Task Status + +While a worker is running, check what tasks are currently active: + +```bash +curl -X POST http://localhost:8080/invocations \ + -H "Content-Type: application/json" \ + -d '{"prompt": "what background tasks are running?"}' +``` + +The agent will use the `get_task_status` tool to query `app.getAsyncTaskInfo()` and return: +- Number of active tasks (`activeCount`) +- List of running jobs with name and duration +- Current agent status (Healthy or HealthyBusy) + +### Test 2: Concurrent Requests (The Key Benefit) + +This demonstrates that the agent returns immediately and can handle concurrent requests while workers process in the background. + +**Terminal 1:** Start a long-running worker (10 seconds) +```bash +curl -X POST http://localhost:8080/invocations \ + -H "Content-Type: application/json" \ + -d '{"prompt": "run a background task for 10 seconds"}' +# Response: "Worker started..." (returns in ~500ms) +``` + +**Terminal 2:** Immediately send another request +```bash +curl -X POST http://localhost:8080/invocations \ + -H "Content-Type: application/json" \ + -d '{"prompt": "what is 25 times 4?"}' +# Response: "625" (returns immediately, ~500ms) +``` + +**Result:** Both requests return immediately. The first request starts a worker and returns right away without waiting. The second request is handled normally. The 10-second worker processes in the background without blocking either request. + +### Test 3: Health Status Monitoring + +Monitor the agent's health status while a worker processes in the background. + +**Check initial status:** +```bash +curl http://localhost:8080/ping +# Response: {"status": "Healthy"} +``` + +**Start a 10-second worker:** +```bash +curl -X POST http://localhost:8080/invocations \ + -d '{"prompt": "start 10 second worker"}' +# Response: "Worker started..." (returns immediately) +``` + +**Immediately check status (worker is now processing):** +```bash +curl http://localhost:8080/ping +# Response: {"status": "HealthyBusy"} +``` + +**Wait 10 seconds for worker to complete, then check again:** +```bash +sleep 10 +curl http://localhost:8080/ping +# Response: {"status": "Healthy"} +``` + +This demonstrates that: +- Agent returns immediately after starting the worker +- Health status changes to HealthyBusy while worker processes +- Agent can still accept requests (try sending another request while status is HealthyBusy) +- Status returns to Healthy when worker completes + +### Test 4: Query Running Tasks + +Start multiple workers and query their status: + +```bash +# Start first worker +curl -X POST http://localhost:8080/invocations \ + -d '{"prompt": "start a 15 second worker"}' + +# Start second worker +curl -X POST http://localhost:8080/invocations \ + -d '{"prompt": "start a 10 second worker"}' + +# Query task status +curl -X POST http://localhost:8080/invocations \ + -d '{"prompt": "what tasks are running?"}' +``` + +Response will show: +``` +Currently running background tasks (2): + +Task 1: + - Name: background_worker_processing + - Duration: 15 seconds + +Task 2: + - Name: background_worker_processing + - Duration: 10 seconds + +Agent status: HealthyBusy +``` + +## Deploying to AWS + +### Prerequisites + +- AWS credentials configured +- AgentCore Starter Toolkit installed + +### Deployment Steps + +1. **Configure the Agent** + +```bash +agentcore configure +``` + +This will prompt you to configure deployment settings for your agent. + +2. **Deploy to AWS** + +```bash +agentcore deploy +``` + +This will: + +- Build and containerize the agent +- Push the container image to Amazon ECR +- Create necessary IAM roles and permissions +- Deploy the agent to Amazon Bedrock AgentCore Runtime + +The deployment outputs the Runtime ARN which you can use to invoke the agent. + +### Testing the Deployed Agent + +```bash +# Invoke the deployed agent +agentcore invoke --payload '{"prompt": "run a background task for 10 seconds"}' +``` + +## Architecture + +This sample demonstrates the integration of Node.js Worker Threads with Strands SDK and Amazon Bedrock AgentCore Runtime: + +### Components + +1. **Agent (agent.ts)**: + - Defines the `start_background_worker` tool + - Spawns worker threads (fire and forget) + - Returns immediately without waiting for worker completion + - Tracks async tasks for health status reporting + +2. **Worker (worker.ts)**: + - Runs CPU-intensive processing in separate thread + - Performs blocking operations without affecting main thread + - Sends completion message back to main thread to mark async task as complete, changing agent status from HealthyBusy to Healthy + +3. **Message Passing Flow (Fire and Forget)**: + ``` + User Request → Agent → Spawn Worker → Return "Worker started" + ↓ + (Agent already responded) + ↓ + Worker processes in background + ↓ + Worker completes, updates health status + ``` + + **Key:** The agent does not wait for the worker. It starts the worker and returns immediately, enabling true non-blocking concurrent request handling. + +4. **Health Status**: + - `addAsyncTask()`: Marks agent as `HealthyBusy` + - Worker processes in background + - `completeAsyncTask()`: Returns to `Healthy` + +### Key Benefits + +- **Non-blocking**: Main thread handles new requests while workers process +- **Concurrent**: Multiple workers can run simultaneously +- **Scalable**: Each worker runs in its own OS thread +- **Responsive**: Agent stays available during heavy processing diff --git a/primitives/runtime/background-workers/agent.ts b/primitives/runtime/background-workers/agent.ts new file mode 100644 index 0000000..0c72686 --- /dev/null +++ b/primitives/runtime/background-workers/agent.ts @@ -0,0 +1,99 @@ +import { Agent, BedrockModel, tool } from '@strands-agents/sdk' +import { BedrockAgentCoreApp } from 'bedrock-agentcore/runtime' +import { Worker } from 'worker_threads' +import { fileURLToPath } from 'url' +import { dirname, join } from 'path' +import { z } from 'zod' + +const getTaskStatus = tool({ + name: 'get_task_status', + description: 'Get information about currently running background tasks and agent status', + inputSchema: z.object({}), + callback: async (): Promise => { + const taskStatus = app.getAsyncTaskInfo() + + if (taskStatus.activeCount === 0) { + return 'No background tasks are currently running. Agent status is Healthy.' + } + + const taskList = taskStatus.runningJobs + .map((job, index) => { + return `Task ${index + 1}: + - Name: ${job.name} + - Duration: ${job.duration} seconds` + }) + .join('\n\n') + + return `Currently running background tasks (${taskStatus.activeCount}): + +${taskList} + +Agent status: HealthyBusy` + }, +}) + +const startBackgroundWorker = tool({ + name: 'start_background_worker', + description: 'Start a background worker that performs CPU-intensive processing without blocking', + inputSchema: z.object({ + duration: z.number().default(5).describe('Duration in seconds for CPU-intensive processing'), + }), + callback: async (input: { duration: number }): Promise => { + const duration = input.duration + const taskId = app.addAsyncTask('background_worker_processing', { duration }) + + const __filename = fileURLToPath(import.meta.url) + const __dirname = dirname(__filename) + const workerPath = join(__dirname, 'worker.js') + + const worker = new Worker(workerPath) + + console.log(`Submitting ${duration}s CPU-intensive task to background worker...`) + worker.postMessage({ duration }) + + // Set up event handlers for when worker completes (fire and forget) + worker.on('message', (message) => { + console.log('Worker completed:', message) + app.completeAsyncTask(taskId) + worker.terminate() + }) + + worker.on('error', (error) => { + console.error('Worker error:', error) + app.completeAsyncTask(taskId) + worker.terminate() + }) + + worker.on('exit', (code) => { + if (code !== 0) { + console.error(`Worker stopped with exit code ${code}`) + } + }) + + // Return immediately without waiting for worker to complete + return `Background worker started (Task ID: ${taskId}) for ${duration} seconds of CPU-intensive processing. + +The worker is now processing in the background. The agent is immediately available to handle new requests. +Check the /ping endpoint to monitor status: it will show HealthyBusy while the worker processes, then return to Healthy when complete.` + }, +}) + +const agent = new Agent({ + model: new BedrockModel({ + modelId: 'global.amazon.nova-2-lite-v1:0', + region: process.env['AWS_REGION'] ?? 'us-east-1', + }), + tools: [startBackgroundWorker, getTaskStatus], +}) + +const app = new BedrockAgentCoreApp({ + invocationHandler: { + requestSchema: z.object({ prompt: z.string() }), + process: async function* (request, _context) { + const response = await agent.invoke(request.prompt); + yield { event: 'message', data: { text: response.lastMessage } } + }, + }, +}) + +app.run() diff --git a/primitives/runtime/background-workers/package.json b/primitives/runtime/background-workers/package.json new file mode 100644 index 0000000..9e9da1b --- /dev/null +++ b/primitives/runtime/background-workers/package.json @@ -0,0 +1,25 @@ +{ + "name": "background-workers-agent", + "version": "1.0.0", + "type": "module", + "main": "dist/agent.js", + "scripts": { + "build": "./node_modules/typescript/bin/tsc", + "dev": "tsx watch agent.ts", + "start": "node dist/agent.js" + }, + "engines": { + "node": ">=20" + }, + "dependencies": { + "@opentelemetry/auto-instrumentations-node": "^0.68.0", + "@strands-agents/sdk": "^0.1.5", + "bedrock-agentcore": "^0.2.0", + "prettier": "^3.8.0" + }, + "devDependencies": { + "@types/node": "^25.0.9", + "tsx": "^4.21.0", + "typescript": "^5.9.3" + } +} diff --git a/primitives/runtime/background-workers/tsconfig.json b/primitives/runtime/background-workers/tsconfig.json new file mode 100644 index 0000000..db283bc --- /dev/null +++ b/primitives/runtime/background-workers/tsconfig.json @@ -0,0 +1,14 @@ +{ + "compilerOptions": { + "target": "ES2022", + "module": "NodeNext", + "moduleResolution": "NodeNext", + "outDir": "./dist", + "rootDir": ".", + "strict": true, + "esModuleInterop": true, + "skipLibCheck": true + }, + "include": ["agent.ts", "worker.ts"], + "exclude": ["node_modules", "dist"] +} diff --git a/primitives/runtime/background-workers/worker.ts b/primitives/runtime/background-workers/worker.ts new file mode 100644 index 0000000..e4f3b09 --- /dev/null +++ b/primitives/runtime/background-workers/worker.ts @@ -0,0 +1,27 @@ +import { parentPort } from 'worker_threads' + +if (!parentPort) { + throw new Error('This script must be run as a Worker thread') +} + +parentPort.on('message', (message: { duration: number }) => { + console.log('Worker received message:', message) + + const duration = message.duration || 5 + console.log(`Worker starting CPU-intensive task for ${duration} seconds...`) + + // CPU-intensive blocking loop - simulates heavy processing + // This blocks the WORKER thread, NOT the main thread + const end = Date.now() + duration * 1000 + while (Date.now() < end) { + // Intentional busy loop to simulate CPU work + } + + console.log(`Worker completed ${duration}s of processing`) + + parentPort?.postMessage({ + status: 'completed', + duration, + timestamp: new Date().toISOString(), + }) +}) From 99f756b3d27ae3b8d0cb816d5c0370d4ce72140c Mon Sep 17 00:00:00 2001 From: Abdullatif Alrashdan Date: Sun, 1 Feb 2026 16:21:44 +0000 Subject: [PATCH 2/3] fix prettier formatting --- .../runtime/background-workers/README.md | 27 ++++++++++++++++--- .../runtime/background-workers/agent.ts | 2 +- 2 files changed, 24 insertions(+), 5 deletions(-) diff --git a/primitives/runtime/background-workers/README.md b/primitives/runtime/background-workers/README.md index a7f6300..9e94000 100644 --- a/primitives/runtime/background-workers/README.md +++ b/primitives/runtime/background-workers/README.md @@ -42,6 +42,7 @@ callback: async (): Promise => { ``` **Impact on concurrent users:** + - User A: "Process this dataset" → Blocks agent for 10 seconds - User B: "What's 2+2?" → Request blocked, must wait 10 seconds for User A to complete - User C: Attempts to connect → Connection refused or times out while agent is blocked @@ -74,6 +75,7 @@ callback: async (): Promise => { ``` **Impact on concurrent users:** + - User A: "Process this dataset" → Worker spawned, processes for 10 seconds in background - User B: "What's 2+2?" → Responds immediately (main thread remains available) - User C: "Tell me a joke" → Responds immediately (main thread remains available) @@ -105,12 +107,13 @@ Use worker threads for these CPU-blocking tasks: The agent provides two tools: #### 1. Start Background Worker + ```typescript const startBackgroundWorker = tool({ name: 'start_background_worker', description: 'Start a background worker for CPU-intensive processing', inputSchema: z.object({ - duration: z.number().default(5) + duration: z.number().default(5), }), callback: async (input: { duration: number }): Promise => { const taskId = app.addAsyncTask('background_worker_processing', { duration }) @@ -127,7 +130,7 @@ const startBackgroundWorker = tool({ // Return immediately without waiting return `Worker started (Task ID: ${taskId}). Processing in background...` - } + }, }) ``` @@ -150,21 +153,25 @@ const getTaskStatus = tool({ // Return task details: name, duration return `Currently running tasks: ${taskStatus.activeCount}...` - } + }, }) ``` This tool uses `app.getAsyncTaskInfo()` to query the async task status, which returns: + - `activeCount`: Number of currently running background tasks - `runningJobs`: Array of jobs with `name` and `duration` properties ### Worker Thread (worker.ts) + ```typescript parentPort.on('message', (message: { duration: number }) => { // CPU-intensive blocking loop // This blocks the WORKER thread, NOT the main thread const end = Date.now() + message.duration * 1000 - while (Date.now() < end) { /* busy loop */ } + while (Date.now() < end) { + /* busy loop */ + } // Send completion message back to main thread parentPort?.postMessage({ status: 'completed' }) @@ -172,6 +179,7 @@ parentPort.on('message', (message: { duration: number }) => { ``` ### Health Status + When a task is registered with `addAsyncTask()`, the `/ping` endpoint automatically returns `HealthyBusy`. Once `completeAsyncTask()` is called, status returns to `Healthy`. **Note:** The `/ping` endpoint is used by Amazon Bedrock AgentCore for internal health monitoring and session management. It is only accessible locally during development and is not exposed when the agent is deployed publicly. @@ -219,6 +227,7 @@ curl -X POST http://localhost:8080/invocations \ ``` The agent will: + 1. Start the worker thread 2. Return immediately with "Worker started" message 3. Worker processes for 5 seconds in the background @@ -237,6 +246,7 @@ curl -X POST http://localhost:8080/invocations \ ``` The agent will use the `get_task_status` tool to query `app.getAsyncTaskInfo()` and return: + - Number of active tasks (`activeCount`) - List of running jobs with name and duration - Current agent status (Healthy or HealthyBusy) @@ -246,6 +256,7 @@ The agent will use the `get_task_status` tool to query `app.getAsyncTaskInfo()` This demonstrates that the agent returns immediately and can handle concurrent requests while workers process in the background. **Terminal 1:** Start a long-running worker (10 seconds) + ```bash curl -X POST http://localhost:8080/invocations \ -H "Content-Type: application/json" \ @@ -254,6 +265,7 @@ curl -X POST http://localhost:8080/invocations \ ``` **Terminal 2:** Immediately send another request + ```bash curl -X POST http://localhost:8080/invocations \ -H "Content-Type: application/json" \ @@ -268,12 +280,14 @@ curl -X POST http://localhost:8080/invocations \ Monitor the agent's health status while a worker processes in the background. **Check initial status:** + ```bash curl http://localhost:8080/ping # Response: {"status": "Healthy"} ``` **Start a 10-second worker:** + ```bash curl -X POST http://localhost:8080/invocations \ -d '{"prompt": "start 10 second worker"}' @@ -281,12 +295,14 @@ curl -X POST http://localhost:8080/invocations \ ``` **Immediately check status (worker is now processing):** + ```bash curl http://localhost:8080/ping # Response: {"status": "HealthyBusy"} ``` **Wait 10 seconds for worker to complete, then check again:** + ```bash sleep 10 curl http://localhost:8080/ping @@ -294,6 +310,7 @@ curl http://localhost:8080/ping ``` This demonstrates that: + - Agent returns immediately after starting the worker - Health status changes to HealthyBusy while worker processes - Agent can still accept requests (try sending another request while status is HealthyBusy) @@ -318,6 +335,7 @@ curl -X POST http://localhost:8080/invocations \ ``` Response will show: + ``` Currently running background tasks (2): @@ -389,6 +407,7 @@ This sample demonstrates the integration of Node.js Worker Threads with Strands - Sends completion message back to main thread to mark async task as complete, changing agent status from HealthyBusy to Healthy 3. **Message Passing Flow (Fire and Forget)**: + ``` User Request → Agent → Spawn Worker → Return "Worker started" ↓ diff --git a/primitives/runtime/background-workers/agent.ts b/primitives/runtime/background-workers/agent.ts index 0c72686..452d32c 100644 --- a/primitives/runtime/background-workers/agent.ts +++ b/primitives/runtime/background-workers/agent.ts @@ -90,7 +90,7 @@ const app = new BedrockAgentCoreApp({ invocationHandler: { requestSchema: z.object({ prompt: z.string() }), process: async function* (request, _context) { - const response = await agent.invoke(request.prompt); + const response = await agent.invoke(request.prompt) yield { event: 'message', data: { text: response.lastMessage } } }, }, From 2b3a2f6b421eb5bf85a38cf8fc74dbed277e7372 Mon Sep 17 00:00:00 2001 From: Abdullatif Alrashdan Date: Sun, 1 Feb 2026 16:27:33 +0000 Subject: [PATCH 3/3] updating README.md for async agent - prettier formatting --- primitives/runtime/async-agent/README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/primitives/runtime/async-agent/README.md b/primitives/runtime/async-agent/README.md index 0e42b95..75451ef 100644 --- a/primitives/runtime/async-agent/README.md +++ b/primitives/runtime/async-agent/README.md @@ -56,7 +56,7 @@ const getTaskStatus = tool({ // Return task details return `Currently running tasks: ${taskStatus.activeCount}...` - } + }, }) ```