diff --git a/apps/framework-cli/src/cli.rs b/apps/framework-cli/src/cli.rs index afcb6bebe2..dc29db3a21 100644 --- a/apps/framework-cli/src/cli.rs +++ b/apps/framework-cli/src/cli.rs @@ -589,10 +589,20 @@ pub async fn top_command_handler( ))) } } - Commands::Dev { no_infra, mcp } => { + Commands::Dev { + no_infra, + mcp, + log_payloads, + } => { info!("Running dev command"); info!("Moose Version: {}", CLI_VERSION); + // Set environment variable for payload logging if flag is enabled + if *log_payloads { + std::env::set_var("MOOSE_LOG_PAYLOADS", "true"); + info!("Payload logging enabled"); + } + let mut project = load_project(commands)?; project.set_is_production_env(false); let project_arc = Arc::new(project); diff --git a/apps/framework-cli/src/cli/commands.rs b/apps/framework-cli/src/cli/commands.rs index 88c7523ac3..e0d8a1ec1b 100644 --- a/apps/framework-cli/src/cli/commands.rs +++ b/apps/framework-cli/src/cli/commands.rs @@ -116,6 +116,10 @@ pub enum Commands { /// Enable or disable the MCP (Model Context Protocol) server #[arg(long, default_value = "true")] mcp: bool, + + /// Log payloads at ingest API and streaming functions for debugging + #[arg(long)] + log_payloads: bool, }, /// Start a remote environment for use in cloud deployments Prod { diff --git a/apps/framework-cli/src/cli/local_webserver.rs b/apps/framework-cli/src/cli/local_webserver.rs index db1ad3b32a..239ea2aeb7 100644 --- a/apps/framework-cli/src/cli/local_webserver.rs +++ b/apps/framework-cli/src/cli/local_webserver.rs @@ -1318,6 +1318,28 @@ async fn handle_json_array_body( debug!("parsed json array for {}", topic_name); + // Log payload if enabled (compact JSON on one line) + if std::env::var("MOOSE_LOG_PAYLOADS") + .map(|v| v == "true") + .unwrap_or(false) + { + match serde_json::from_slice::(&body) { + Ok(json_value) => { + if let Ok(compact_json) = serde_json::to_string(&json_value) { + info!("[PAYLOAD:INGEST] {}: {}", topic_name, compact_json); + } + } + Err(_) => { + // If we can't 
parse it, log the raw body (shouldn't happen since we already parsed it above) + info!( + "[PAYLOAD:INGEST] {}: {}", + topic_name, + String::from_utf8_lossy(&body) + ); + } + } + } + let mut records = match parsed { Err(e) => { if let Some(dlq) = dead_letter_queue { diff --git a/apps/framework-docs/src/pages/moose/local-dev.mdx b/apps/framework-docs/src/pages/moose/local-dev.mdx index b9ee8c1e2a..1759f9ff4c 100644 --- a/apps/framework-docs/src/pages/moose/local-dev.mdx +++ b/apps/framework-docs/src/pages/moose/local-dev.mdx @@ -46,6 +46,21 @@ workflows = false # Controls Temporal startup apis = true # Enables Analytics APIs server ``` +### Debugging Data Flow + +To debug data as it flows through your pipeline, use the `--log-payloads` flag: + +```bash +moose dev --log-payloads +``` + +This logs payloads at three key points with searchable prefixes: +- `PAYLOAD:INGEST` - Data received at ingest API +- `PAYLOAD:STREAM_IN` - Data before streaming function +- `PAYLOAD:STREAM_OUT` - Data after streaming function + +All logs appear in Moose logs and can be viewed using `moose logs --tail`. + ### Extending Docker Infrastructure You can extend Moose's Docker Compose configuration with custom services by creating a `docker-compose.dev.override.yaml` file in your project root. This allows you to add additional infrastructure (databases, monitoring tools, etc.) that runs alongside your Moose development environment. diff --git a/apps/framework-docs/src/pages/moose/moose-cli.mdx b/apps/framework-docs/src/pages/moose/moose-cli.mdx index eac9f5eec8..48a1d23218 100644 --- a/apps/framework-docs/src/pages/moose/moose-cli.mdx +++ b/apps/framework-docs/src/pages/moose/moose-cli.mdx @@ -41,10 +41,11 @@ moose build [--docker] [--amd64] [--arm64] ### Dev Starts a local development environment with hot reload and automatic infrastructure management. 
```bash -moose dev [--mcp] [--docker] +moose dev [--mcp] [--no-infra] [--log-payloads] ``` - `--mcp`: Enable or disable the MCP (Model Context Protocol) server (default: true). The MCP server provides AI-assisted development tools at `http://localhost:4000/mcp`. See [MCP Server documentation](/moose/mcp-server) for details. -- `--docker`: Use Docker for infrastructure (default behavior in dev mode) +- `--no-infra`: Skip starting Docker containers for infrastructure +- `--log-payloads`: Log payloads at the ingest API and streaming functions for debugging data flow The development server includes: - Hot reload for code changes diff --git a/packages/py-moose-lib/moose_lib/streaming/streaming_function_runner.py b/packages/py-moose-lib/moose_lib/streaming/streaming_function_runner.py index b8bf7e522a..87f0a66e18 100644 --- a/packages/py-moose-lib/moose_lib/streaming/streaming_function_runner.py +++ b/packages/py-moose-lib/moose_lib/streaming/streaming_function_runner.py @@ -15,6 +15,7 @@ import argparse import dataclasses +import os import traceback from datetime import datetime, timezone from importlib import import_module @@ -432,6 +433,10 @@ def process_messages(): # Parse the message into the input type input_data = parse_input(streaming_function_input_type, message.value) + # Log payload before transformation if enabled + if os.getenv('MOOSE_LOG_PAYLOADS') == 'true': + log(f"[PAYLOAD:STREAM_IN] {json.dumps(input_data, cls=EnhancedJSONEncoder)}") + # Run the flow all_outputs = [] for (streaming_function_callable, dlq) in streaming_function_callables: @@ -477,6 +482,15 @@ def process_messages(): cli_log(CliLogData(action="Received", message=f'{log_prefix} {len(output_data_list)} message(s)')) + # Log payload after transformation if enabled (what we're actually sending to Kafka) + if os.getenv('MOOSE_LOG_PAYLOADS') == 'true': + # Filter out None values to match what actually gets sent + outgoing_data = [item for item in all_outputs if item is not None] + if len(outgoing_data) > 0: + log(f"[PAYLOAD:STREAM_OUT] 
{json.dumps(outgoing_data, cls=EnhancedJSONEncoder)}") + else: + log("[PAYLOAD:STREAM_OUT] (no output from streaming function)") + if producer is not None: for item in all_outputs: # Ignore flow function returning null diff --git a/packages/ts-moose-lib/src/streaming-functions/runner.ts b/packages/ts-moose-lib/src/streaming-functions/runner.ts index bd492d4586..c5abdba79a 100755 --- a/packages/ts-moose-lib/src/streaming-functions/runner.ts +++ b/packages/ts-moose-lib/src/streaming-functions/runner.ts @@ -492,6 +492,12 @@ const handleMessage = async ( payloadBuffer = payloadBuffer.subarray(5); } const parsedData = JSON.parse(payloadBuffer.toString(), jsonDateReviver); + + // Log payload before transformation if enabled + if (process.env.MOOSE_LOG_PAYLOADS === "true") { + logger.log(`[PAYLOAD:STREAM_IN] ${JSON.stringify(parsedData)}`); + } + const transformedData = await Promise.all( streamingFunctionWithConfigList.map(async ([fn, config]) => { try { @@ -545,7 +551,7 @@ const handleMessage = async ( }), ); - return transformedData + const processedMessages = transformedData .map((userFunctionOutput, i) => { const [_, config] = streamingFunctionWithConfigList[i]; if (userFunctionOutput) { @@ -578,6 +584,19 @@ const handleMessage = async ( }) .flat() .filter((item) => item !== undefined && item !== null); + + // Log payload after transformation if enabled (what we're actually sending to Kafka) + if (process.env.MOOSE_LOG_PAYLOADS === "true") { + if (processedMessages.length > 0) { + // msg.value is already JSON stringified, just construct array format + const outgoingJsonStrings = processedMessages.map((msg) => msg.value); + logger.log(`[PAYLOAD:STREAM_OUT] [${outgoingJsonStrings.join(",")}]`); + } else { + logger.log(`[PAYLOAD:STREAM_OUT] (no output from streaming function)`); + } + } + + return processedMessages; } catch (e) { // TODO: Track failure rate logger.error(`Failed to transform data`);