apps/framework-cli/src/cli.rs: 12 changes (11 additions, 1 deletion)
@@ -589,10 +589,20 @@ pub async fn top_command_handler(
)))
}
}
Commands::Dev { no_infra, mcp } => {
Commands::Dev {
no_infra,
mcp,
log_payloads,
} => {
info!("Running dev command");
info!("Moose Version: {}", CLI_VERSION);

// Set environment variable for payload logging if flag is enabled
if *log_payloads {
Collaborator: Instead of the pattern of setting the env variable and re-reading it later, could we just modify the project and have that be used as the reference? It would be less error-prone when reading the code and would add less performance overhead.

std::env::set_var("MOOSE_LOG_PAYLOADS", "true");
info!("Payload logging enabled");
}

let mut project = load_project(commands)?;
project.set_is_production_env(false);
let project_arc = Arc::new(project);
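The collaborator comment above suggests carrying the flag on the project rather than round-tripping it through an environment variable. A minimal sketch of that shape, assuming a hypothetical `log_payloads` field and accessors that are not part of the current `Project` API:

```rust
// Sketch only: a hypothetical settings field replacing the MOOSE_LOG_PAYLOADS env var.
pub struct ProjectSettings {
    pub log_payloads: bool,
    // ... other settings elided
}

impl ProjectSettings {
    // The Dev handler would write the flag once at startup...
    pub fn set_log_payloads(&mut self, enabled: bool) {
        self.log_payloads = enabled;
    }

    // ...and request handlers would read a plain bool from the shared project,
    // with no env-var lookup or string comparison on the hot path.
    pub fn log_payloads_enabled(&self) -> bool {
        self.log_payloads
    }
}
```

Whether this would live on `Project` itself or a nested settings struct is left open by the thread; the point of the suggestion is that consumers read typed state instead of re-parsing the environment.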
apps/framework-cli/src/cli/commands.rs: 4 changes (4 additions, 0 deletions)
@@ -116,6 +116,10 @@ pub enum Commands {
/// Enable or disable the MCP (Model Context Protocol) server
#[arg(long, default_value = "true")]
mcp: bool,

/// Log payloads at ingest API and streaming functions for debugging
#[arg(long)]
log_payloads: bool,
},
/// Start a remote environment for use in cloud deployments
Prod {
apps/framework-cli/src/cli/local_webserver.rs: 22 changes (22 additions, 0 deletions)
@@ -1318,6 +1318,28 @@ async fn handle_json_array_body(

debug!("parsed json array for {}", topic_name);

// Log payload if enabled (compact JSON on one line)
if std::env::var("MOOSE_LOG_PAYLOADS")
Collaborator: Could we preload it? This will get re-executed for every request, and we get a lot of them.

Collaborator: Also, shouldn't that be passed through the CLI flag?

.map(|v| v == "true")
.unwrap_or(false)
{
match serde_json::from_slice::<serde_json::Value>(&body) {
Ok(json_value) => {
if let Ok(compact_json) = serde_json::to_string(&json_value) {
info!("[PAYLOAD:INGEST] {}: {}", topic_name, compact_json);
}
}
Err(_) => {
// If we can't parse it, log the raw body (shouldn't happen since we already parsed it above)
info!(
"[PAYLOAD:INGEST] {}: {}",
topic_name,
String::from_utf8_lossy(&body)
);
}
}
}

let mut records = match parsed {
Err(e) => {
if let Some(dlq) = dead_letter_queue {
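On the per-request cost raised above: even if the env var stays, the check can be read once and cached. A minimal sketch using `std::sync::OnceLock`, shown as an assumption about how it could be wired rather than the code in this PR:

```rust
use std::sync::OnceLock;

/// Reads MOOSE_LOG_PAYLOADS once and caches the result for the life of the process.
fn log_payloads_enabled() -> bool {
    static ENABLED: OnceLock<bool> = OnceLock::new();
    *ENABLED.get_or_init(|| {
        std::env::var("MOOSE_LOG_PAYLOADS")
            .map(|v| v == "true")
            .unwrap_or(false)
    })
}
```

The handler would then call `log_payloads_enabled()` in place of the inline `std::env::var` check, so the lookup and string comparison happen only on the first request; the project-based approach from the earlier comment would remove even that.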
apps/framework-docs/src/pages/moose/local-dev.mdx: 15 changes (15 additions, 0 deletions)
@@ -46,6 +46,21 @@ workflows = false # Controls Temporal startup
apis = true # Enables Analytics APIs server
```

### Debugging Data Flow

To debug data as it flows through your pipeline, use the `--log-payloads` flag:

```bash
moose dev --log-payloads
```

This logs payloads at three key points, each with a searchable prefix:
- `PAYLOAD:INGEST` - Data received at the ingest API
- `PAYLOAD:STREAM_IN` - Data entering a streaming function
- `PAYLOAD:STREAM_OUT` - Data emitted by a streaming function

These payload entries are written to the Moose logs and can be viewed with `moose logs --tail`.

### Extending Docker Infrastructure

You can extend Moose's Docker Compose configuration with custom services by creating a `docker-compose.dev.override.yaml` file in your project root. This allows you to add additional infrastructure (databases, monitoring tools, etc.) that runs alongside your Moose development environment.
apps/framework-docs/src/pages/moose/moose-cli.mdx: 5 changes (3 additions, 2 deletions)
@@ -41,10 +41,11 @@ moose build [--docker] [--amd64] [--arm64]
### Dev
Starts a local development environment with hot reload and automatic infrastructure management.
```bash
moose dev [--mcp] [--docker]
moose dev [--mcp] [--no-infra] [--log-payloads]
```
- `--mcp`: Enable or disable the MCP (Model Context Protocol) server (default: true). The MCP server provides AI-assisted development tools at `http://localhost:4000/mcp`. See [MCP Server documentation](/moose/mcp-server) for details.
- `--docker`: Use Docker for infrastructure (default behavior in dev mode)
- `--no-infra`: Skip starting docker containers for infrastructure
- `--log-payloads`: Log payloads for debugging data flow

The development server includes:
- Hot reload for code changes
(Python streaming function runner; file path not shown in this view)
@@ -15,6 +15,7 @@

import argparse
import dataclasses
import os
import traceback
from datetime import datetime, timezone
from importlib import import_module
@@ -432,6 +433,10 @@ def process_messages():
# Parse the message into the input type
input_data = parse_input(streaming_function_input_type, message.value)

# Log payload before transformation if enabled
if os.getenv('MOOSE_LOG_PAYLOADS') == 'true':
log(f"[PAYLOAD:STREAM_IN] {json.dumps(input_data, cls=EnhancedJSONEncoder)}")

# Run the flow
all_outputs = []
for (streaming_function_callable, dlq) in streaming_function_callables:
@@ -477,6 +482,15 @@ def process_messages():
cli_log(CliLogData(action="Received",
message=f'{log_prefix} {len(output_data_list)} message(s)'))

# Log payload after transformation if enabled (what we're actually sending to Kafka)
if os.getenv('MOOSE_LOG_PAYLOADS') == 'true':
# Filter out None values to match what actually gets sent
outgoing_data = [item for item in all_outputs if item is not None]
if len(outgoing_data) > 0:
log(f"[PAYLOAD:STREAM_OUT] {json.dumps(outgoing_data, cls=EnhancedJSONEncoder)}")
else:
log("[PAYLOAD:STREAM_OUT] (no output from streaming function)")

if producer is not None:
for item in all_outputs:
# Ignore flow function returning null
packages/ts-moose-lib/src/streaming-functions/runner.ts: 21 changes (20 additions, 1 deletion)
@@ -492,6 +492,12 @@ const handleMessage = async (
payloadBuffer = payloadBuffer.subarray(5);
}
const parsedData = JSON.parse(payloadBuffer.toString(), jsonDateReviver);

// Log payload before transformation if enabled
if (process.env.MOOSE_LOG_PAYLOADS === "true") {
logger.log(`[PAYLOAD:STREAM_IN] ${JSON.stringify(parsedData)}`);
}

const transformedData = await Promise.all(
streamingFunctionWithConfigList.map(async ([fn, config]) => {
try {
@@ -545,7 +551,7 @@
}),
);

return transformedData
const processedMessages = transformedData
.map((userFunctionOutput, i) => {
const [_, config] = streamingFunctionWithConfigList[i];
if (userFunctionOutput) {
@@ -578,6 +584,19 @@
})
.flat()
.filter((item) => item !== undefined && item !== null);

// Log payload after transformation if enabled (what we're actually sending to Kafka)
if (process.env.MOOSE_LOG_PAYLOADS === "true") {
if (processedMessages.length > 0) {
// msg.value is already JSON stringified, just construct array format
const outgoingJsonStrings = processedMessages.map((msg) => msg.value);
logger.log(`[PAYLOAD:STREAM_OUT] [${outgoingJsonStrings.join(",")}]`);
} else {
logger.log(`[PAYLOAD:STREAM_OUT] (no output from streaming function)`);
}
}

return processedMessages;
} catch (e) {
// TODO: Track failure rate
logger.error(`Failed to transform data`);