Skip to content

Commit d497bc7

Browse files
first pass
1 parent be90c8d commit d497bc7

File tree

7 files changed

+85
-4
lines changed

7 files changed

+85
-4
lines changed

apps/framework-cli/src/cli.rs

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -588,10 +588,20 @@ pub async fn top_command_handler(
588588
)))
589589
}
590590
}
591-
Commands::Dev { no_infra, mcp } => {
591+
Commands::Dev {
592+
no_infra,
593+
mcp,
594+
log_payloads,
595+
} => {
592596
info!("Running dev command");
593597
info!("Moose Version: {}", CLI_VERSION);
594598

599+
// Set environment variable for payload logging if flag is enabled
600+
if *log_payloads {
601+
std::env::set_var("MOOSE_LOG_PAYLOADS", "true");
602+
info!("Payload logging enabled");
603+
}
604+
595605
let mut project = load_project(commands)?;
596606
project.set_is_production_env(false);
597607
let project_arc = Arc::new(project);

apps/framework-cli/src/cli/commands.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,10 @@ pub enum Commands {
116116
/// Enable or disable the MCP (Model Context Protocol) server
117117
#[arg(long, default_value = "true")]
118118
mcp: bool,
119+
120+
/// Log payloads at ingest API and streaming functions for debugging
121+
#[arg(long)]
122+
log_payloads: bool,
119123
},
120124
/// Start a remote environment for use in cloud deployments
121125
Prod {

apps/framework-cli/src/cli/local_webserver.rs

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1318,6 +1318,25 @@ async fn handle_json_array_body(
13181318

13191319
debug!("parsed json array for {}", topic_name);
13201320

1321+
// Log payload if enabled (compact JSON on one line)
1322+
if std::env::var("MOOSE_LOG_PAYLOADS").is_ok() {
1323+
match serde_json::from_slice::<serde_json::Value>(&body) {
1324+
Ok(json_value) => {
1325+
if let Ok(compact_json) = serde_json::to_string(&json_value) {
1326+
info!("[PAYLOAD:INGEST] {}: {}", topic_name, compact_json);
1327+
}
1328+
}
1329+
Err(_) => {
1330+
// If we can't parse it, log the raw body (shouldn't happen since we already parsed it above)
1331+
info!(
1332+
"[PAYLOAD:INGEST] {}: {}",
1333+
topic_name,
1334+
String::from_utf8_lossy(&body)
1335+
);
1336+
}
1337+
}
1338+
}
1339+
13211340
let mut records = match parsed {
13221341
Err(e) => {
13231342
if let Some(dlq) = dead_letter_queue {

apps/framework-docs/src/pages/moose/local-dev.mdx

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,21 @@ workflows = false # Controls Temporal startup
4646
apis = true # Enables Analytics APIs server
4747
```
4848

49+
### Debugging Data Flow
50+
51+
To debug data as it flows through your pipeline, use the `--log-payloads` flag:
52+
53+
```bash
54+
moose dev --log-payloads
55+
```
56+
57+
This logs payloads at three key points with searchable prefixes:
58+
- `PAYLOAD:INGEST` - Data received at ingest API
59+
- `PAYLOAD:STREAM_IN` - Data before streaming function
60+
- `PAYLOAD:STREAM_OUT` - Data after streaming function
61+
62+
All logs appear in Moose logs and can be viewed using `moose logs --tail`.
63+
4964
### Extending Docker Infrastructure
5065

5166
You can extend Moose's Docker Compose configuration with custom services by creating a `docker-compose.dev.override.yaml` file in your project root. This allows you to add additional infrastructure (databases, monitoring tools, etc.) that runs alongside your Moose development environment.

apps/framework-docs/src/pages/moose/moose-cli.mdx

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,10 +41,11 @@ moose build [--docker] [--amd64] [--arm64]
4141
### Dev
4242
Starts a local development environment with hot reload and automatic infrastructure management.
4343
```bash
44-
moose dev [--mcp] [--docker]
44+
moose dev [--mcp] [--no-infra] [--log-payloads]
4545
```
4646
- `--mcp`: Enable or disable the MCP (Model Context Protocol) server (default: true). The MCP server provides AI-assisted development tools at `http://localhost:4000/mcp`. See [MCP Server documentation](/moose/mcp-server) for details.
47-
- `--docker`: Use Docker for infrastructure (default behavior in dev mode)
47+
- `--no-infra`: Skip starting docker containers for infrastructure
48+
- `--log-payloads`: Log payloads for debugging data flow
4849

4950
The development server includes:
5051
- Hot reload for code changes

packages/py-moose-lib/moose_lib/streaming/streaming_function_runner.py

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -432,6 +432,10 @@ def process_messages():
432432
# Parse the message into the input type
433433
input_data = parse_input(streaming_function_input_type, message.value)
434434

435+
# Log payload before transformation if enabled
436+
if os.getenv('MOOSE_LOG_PAYLOADS') == 'true':
437+
log(f"[PAYLOAD:STREAM_IN] {log_prefix}: {json.dumps(input_data, cls=EnhancedJSONEncoder)}")
438+
435439
# Run the flow
436440
all_outputs = []
437441
for (streaming_function_callable, dlq) in streaming_function_callables:
@@ -477,6 +481,15 @@ def process_messages():
477481
cli_log(CliLogData(action="Received",
478482
message=f'{log_prefix} {len(output_data_list)} message(s)'))
479483

484+
# Log payload after transformation if enabled (what we're actually sending to Kafka)
485+
if os.getenv('MOOSE_LOG_PAYLOADS') == 'true':
486+
# Filter out None values to match what actually gets sent
487+
outgoing_data = [item for item in all_outputs if item is not None]
488+
if len(outgoing_data) > 0:
489+
log(f"[PAYLOAD:STREAM_OUT] {log_prefix}: {json.dumps(outgoing_data, cls=EnhancedJSONEncoder)}")
490+
else:
491+
log(f"[PAYLOAD:STREAM_OUT] {log_prefix}: (no output from streaming function)")
492+
480493
if producer is not None:
481494
for item in all_outputs:
482495
# Ignore flow function returning null

packages/ts-moose-lib/src/streaming-functions/runner.ts

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -492,6 +492,12 @@ const handleMessage = async (
492492
payloadBuffer = payloadBuffer.subarray(5);
493493
}
494494
const parsedData = JSON.parse(payloadBuffer.toString(), jsonDateReviver);
495+
496+
// Log payload before transformation if enabled
497+
if (process.env.MOOSE_LOG_PAYLOADS === "true") {
498+
logger.log(`[PAYLOAD:STREAM_IN] ${JSON.stringify(parsedData)}`);
499+
}
500+
495501
const transformedData = await Promise.all(
496502
streamingFunctionWithConfigList.map(async ([fn, config]) => {
497503
try {
@@ -545,7 +551,7 @@ const handleMessage = async (
545551
}),
546552
);
547553

548-
return transformedData
554+
const processedMessages = transformedData
549555
.map((userFunctionOutput, i) => {
550556
const [_, config] = streamingFunctionWithConfigList[i];
551557
if (userFunctionOutput) {
@@ -578,6 +584,19 @@ const handleMessage = async (
578584
})
579585
.flat()
580586
.filter((item) => item !== undefined && item !== null);
587+
588+
// Log payload after transformation if enabled (what we're actually sending to Kafka)
589+
if (process.env.MOOSE_LOG_PAYLOADS === "true") {
590+
if (processedMessages.length > 0) {
591+
// msg.value is already JSON stringified, just construct array format
592+
const outgoingJsonStrings = processedMessages.map((msg) => msg.value);
593+
logger.log(`[PAYLOAD:STREAM_OUT] [${outgoingJsonStrings.join(",")}]`);
594+
} else {
595+
logger.log(`[PAYLOAD:STREAM_OUT] (no output from streaming function)`);
596+
}
597+
}
598+
599+
return processedMessages;
581600
} catch (e) {
582601
// TODO: Track failure rate
583602
logger.error(`Failed to transform data`);

0 commit comments

Comments
 (0)