From 9c68952a7a84202dca2d6976819d27e404376709 Mon Sep 17 00:00:00 2001 From: Jonathan Widjaja Date: Mon, 10 Nov 2025 16:18:43 -0700 Subject: [PATCH 1/3] first pass --- apps/framework-cli/src/cli.rs | 12 ++++++++++- apps/framework-cli/src/cli/commands.rs | 4 ++++ apps/framework-cli/src/cli/local_webserver.rs | 19 +++++++++++++++++ .../src/pages/moose/local-dev.mdx | 15 +++++++++++++ .../src/pages/moose/moose-cli.mdx | 5 +++-- .../streaming/streaming_function_runner.py | 13 ++++++++++++ .../src/streaming-functions/runner.ts | 21 ++++++++++++++++++- 7 files changed, 85 insertions(+), 4 deletions(-) diff --git a/apps/framework-cli/src/cli.rs b/apps/framework-cli/src/cli.rs index afcb6bebe2..dc29db3a21 100644 --- a/apps/framework-cli/src/cli.rs +++ b/apps/framework-cli/src/cli.rs @@ -589,10 +589,20 @@ pub async fn top_command_handler( ))) } } - Commands::Dev { no_infra, mcp } => { + Commands::Dev { + no_infra, + mcp, + log_payloads, + } => { info!("Running dev command"); info!("Moose Version: {}", CLI_VERSION); + // Set environment variable for payload logging if flag is enabled + if *log_payloads { + std::env::set_var("MOOSE_LOG_PAYLOADS", "true"); + info!("Payload logging enabled"); + } + let mut project = load_project(commands)?; project.set_is_production_env(false); let project_arc = Arc::new(project); diff --git a/apps/framework-cli/src/cli/commands.rs b/apps/framework-cli/src/cli/commands.rs index 88c7523ac3..e0d8a1ec1b 100644 --- a/apps/framework-cli/src/cli/commands.rs +++ b/apps/framework-cli/src/cli/commands.rs @@ -116,6 +116,10 @@ pub enum Commands { /// Enable or disable the MCP (Model Context Protocol) server #[arg(long, default_value = "true")] mcp: bool, + + /// Log payloads at ingest API and streaming functions for debugging + #[arg(long)] + log_payloads: bool, }, /// Start a remote environment for use in cloud deployments Prod { diff --git a/apps/framework-cli/src/cli/local_webserver.rs b/apps/framework-cli/src/cli/local_webserver.rs index db1ad3b32a..a1b3ecaa2a 100644 --- a/apps/framework-cli/src/cli/local_webserver.rs +++ b/apps/framework-cli/src/cli/local_webserver.rs @@ -1318,6 +1318,25 @@ async fn handle_json_array_body( debug!("parsed json array for {}", topic_name); + // Log payload if enabled (compact JSON on one line) + if std::env::var("MOOSE_LOG_PAYLOADS").is_ok() { + match serde_json::from_slice::(&body) { + Ok(json_value) => { + if let Ok(compact_json) = serde_json::to_string(&json_value) { + info!("[PAYLOAD:INGEST] {}: {}", topic_name, compact_json); + } + } + Err(_) => { + // If we can't parse it, log the raw body (shouldn't happen since we already parsed it above) + info!( + "[PAYLOAD:INGEST] {}: {}", + topic_name, + String::from_utf8_lossy(&body) + ); + } + } + } + let mut records = match parsed { Err(e) => { if let Some(dlq) = dead_letter_queue { diff --git a/apps/framework-docs/src/pages/moose/local-dev.mdx b/apps/framework-docs/src/pages/moose/local-dev.mdx index b9ee8c1e2a..1759f9ff4c 100644 --- a/apps/framework-docs/src/pages/moose/local-dev.mdx +++ b/apps/framework-docs/src/pages/moose/local-dev.mdx @@ -46,6 +46,21 @@ workflows = false # Controls Temporal startup apis = true # Enables Analytics APIs server ``` +### Debugging Data Flow + +To debug data as it flows through your pipeline, use the `--log-payloads` flag: + +```bash +moose dev --log-payloads +``` + +This logs payloads at three key points with searchable prefixes: +- `PAYLOAD:INGEST` - Data received at ingest API +- `PAYLOAD:STREAM_IN` - Data before streaming function +- `PAYLOAD:STREAM_OUT` - Data after streaming function + +All logs appear in Moose logs and can be viewed using `moose logs --tail`. + ### Extending Docker Infrastructure You can extend Moose's Docker Compose configuration with custom services by creating a `docker-compose.dev.override.yaml` file in your project root. This allows you to add additional infrastructure (databases, monitoring tools, etc.) that runs alongside your Moose development environment. diff --git a/apps/framework-docs/src/pages/moose/moose-cli.mdx b/apps/framework-docs/src/pages/moose/moose-cli.mdx index eac9f5eec8..48a1d23218 100644 --- a/apps/framework-docs/src/pages/moose/moose-cli.mdx +++ b/apps/framework-docs/src/pages/moose/moose-cli.mdx @@ -41,10 +41,11 @@ moose build [--docker] [--amd64] [--arm64] ### Dev Starts a local development environment with hot reload and automatic infrastructure management. ```bash -moose dev [--mcp] [--docker] +moose dev [--mcp] [--no-infra] [--log-payloads] ``` - `--mcp`: Enable or disable the MCP (Model Context Protocol) server (default: true). The MCP server provides AI-assisted development tools at `http://localhost:4000/mcp`. See [MCP Server documentation](/moose/mcp-server) for details. -- `--docker`: Use Docker for infrastructure (default behavior in dev mode) +- `--no-infra`: Skip starting docker containers for infrastructure +- `--log-payloads`: Log payloads for debugging data flow The development server includes: - Hot reload for code changes diff --git a/packages/py-moose-lib/moose_lib/streaming/streaming_function_runner.py b/packages/py-moose-lib/moose_lib/streaming/streaming_function_runner.py index b8bf7e522a..b4acbd474a 100644 --- a/packages/py-moose-lib/moose_lib/streaming/streaming_function_runner.py +++ b/packages/py-moose-lib/moose_lib/streaming/streaming_function_runner.py @@ -432,6 +432,10 @@ def process_messages(): # Parse the message into the input type input_data = parse_input(streaming_function_input_type, message.value) + # Log payload before transformation if enabled + if os.getenv('MOOSE_LOG_PAYLOADS') == 'true': + log(f"[PAYLOAD:STREAM_IN] {log_prefix}: {json.dumps(input_data, cls=EnhancedJSONEncoder)}") + # Run the flow all_outputs = [] for (streaming_function_callable, dlq) in streaming_function_callables: @@ -477,6 +481,15 @@ def process_messages(): cli_log(CliLogData(action="Received", message=f'{log_prefix} {len(output_data_list)} message(s)')) + # Log payload after transformation if enabled (what we're actually sending to Kafka) + if os.getenv('MOOSE_LOG_PAYLOADS') == 'true': + # Filter out None values to match what actually gets sent + outgoing_data = [item for item in all_outputs if item is not None] + if len(outgoing_data) > 0: + log(f"[PAYLOAD:STREAM_OUT] {log_prefix}: {json.dumps(outgoing_data, cls=EnhancedJSONEncoder)}") + else: + log(f"[PAYLOAD:STREAM_OUT] {log_prefix}: (no output from streaming function)") + if producer is not None: for item in all_outputs: # Ignore flow function returning null diff --git a/packages/ts-moose-lib/src/streaming-functions/runner.ts b/packages/ts-moose-lib/src/streaming-functions/runner.ts index bd492d4586..c5abdba79a 100755 --- a/packages/ts-moose-lib/src/streaming-functions/runner.ts +++ b/packages/ts-moose-lib/src/streaming-functions/runner.ts @@ -492,6 +492,12 @@ const handleMessage = async ( payloadBuffer = payloadBuffer.subarray(5); } const parsedData = JSON.parse(payloadBuffer.toString(), jsonDateReviver); + + // Log payload before transformation if enabled + if (process.env.MOOSE_LOG_PAYLOADS === "true") { + logger.log(`[PAYLOAD:STREAM_IN] ${JSON.stringify(parsedData)}`); + } + const transformedData = await Promise.all( streamingFunctionWithConfigList.map(async ([fn, config]) => { try { @@ -545,7 +551,7 @@ const handleMessage = async ( }), ); - return transformedData + const processedMessages = transformedData .map((userFunctionOutput, i) => { const [_, config] = streamingFunctionWithConfigList[i]; if (userFunctionOutput) { @@ -578,6 +584,19 @@ const handleMessage = async ( }) .flat() .filter((item) => item !== undefined && item !== null); + + // Log payload after transformation if enabled (what we're actually sending to Kafka) + if (process.env.MOOSE_LOG_PAYLOADS === "true") { + if (processedMessages.length > 0) { + // msg.value is already JSON stringified, just construct array format + const outgoingJsonStrings = processedMessages.map((msg) => msg.value); + logger.log(`[PAYLOAD:STREAM_OUT] [${outgoingJsonStrings.join(",")}]`); + } else { + logger.log(`[PAYLOAD:STREAM_OUT] (no output from streaming function)`); + } + } + + return processedMessages; } catch (e) { // TODO: Track failure rate logger.error(`Failed to transform data`); From 19200ff20915ad0ff164b62f292665baf0ce9111 Mon Sep 17 00:00:00 2001 From: Jonathan Widjaja Date: Tue, 11 Nov 2025 10:10:01 -0700 Subject: [PATCH 2/3] fix import --- .../moose_lib/streaming/streaming_function_runner.py | 1 + 1 file changed, 1 insertion(+) diff --git a/packages/py-moose-lib/moose_lib/streaming/streaming_function_runner.py b/packages/py-moose-lib/moose_lib/streaming/streaming_function_runner.py index b4acbd474a..a87fb3a64b 100644 --- a/packages/py-moose-lib/moose_lib/streaming/streaming_function_runner.py +++ b/packages/py-moose-lib/moose_lib/streaming/streaming_function_runner.py @@ -15,6 +15,7 @@ import argparse import dataclasses +import os import traceback from datetime import datetime, timezone from importlib import import_module From 32a68fbce156f9a057cbb6a3da112d1a78d5734f Mon Sep 17 00:00:00 2001 From: Jonathan Widjaja Date: Tue, 11 Nov 2025 11:46:06 -0700 Subject: [PATCH 3/3] ok --- apps/framework-cli/src/cli/local_webserver.rs | 5 ++++- .../moose_lib/streaming/streaming_function_runner.py | 6 +++--- 2 files changed, 7 insertions(+), 4 deletions(-) diff --git a/apps/framework-cli/src/cli/local_webserver.rs b/apps/framework-cli/src/cli/local_webserver.rs index a1b3ecaa2a..239ea2aeb7 100644 --- a/apps/framework-cli/src/cli/local_webserver.rs +++ b/apps/framework-cli/src/cli/local_webserver.rs @@ -1319,7 +1319,10 @@ async fn handle_json_array_body( debug!("parsed json array for {}", topic_name); // Log payload if enabled (compact JSON on one line) - if std::env::var("MOOSE_LOG_PAYLOADS").is_ok() { + if std::env::var("MOOSE_LOG_PAYLOADS") + .map(|v| v == "true") + .unwrap_or(false) + { match serde_json::from_slice::(&body) { Ok(json_value) => { if let Ok(compact_json) = serde_json::to_string(&json_value) { diff --git a/packages/py-moose-lib/moose_lib/streaming/streaming_function_runner.py b/packages/py-moose-lib/moose_lib/streaming/streaming_function_runner.py index a87fb3a64b..87f0a66e18 100644 --- a/packages/py-moose-lib/moose_lib/streaming/streaming_function_runner.py +++ b/packages/py-moose-lib/moose_lib/streaming/streaming_function_runner.py @@ -435,7 +435,7 @@ def process_messages(): # Log payload before transformation if enabled if os.getenv('MOOSE_LOG_PAYLOADS') == 'true': - log(f"[PAYLOAD:STREAM_IN] {log_prefix}: {json.dumps(input_data, cls=EnhancedJSONEncoder)}") + log(f"[PAYLOAD:STREAM_IN] {json.dumps(input_data, cls=EnhancedJSONEncoder)}") # Run the flow all_outputs = [] @@ -487,9 +487,9 @@ def process_messages(): # Filter out None values to match what actually gets sent outgoing_data = [item for item in all_outputs if item is not None] if len(outgoing_data) > 0: - log(f"[PAYLOAD:STREAM_OUT] {log_prefix}: {json.dumps(outgoing_data, cls=EnhancedJSONEncoder)}") + log(f"[PAYLOAD:STREAM_OUT] {json.dumps(outgoing_data, cls=EnhancedJSONEncoder)}") else: - log(f"[PAYLOAD:STREAM_OUT] {log_prefix}: (no output from streaming function)") + log("[PAYLOAD:STREAM_OUT] (no output from streaming function)") if producer is not None: for item in all_outputs: