18 changes: 18 additions & 0 deletions docs/guides/psoxy-test-tool.md
@@ -47,6 +47,24 @@ node cli-call.js -u https://us-central1-acme.cloudfunctions.net/outlook-cal/v1.0

(*) You can obtain it by running `gcloud auth print-identity-token` (using [Google Cloud SDK])

### End-to-End Verification (Webhook Collection)

When testing a Webhook Collector, you can use the tool to verify that the posted data was successfully collected and written to the expected output bucket.

```shell
node cli-call.js -u https://us-central1-acme.cloudfunctions.net/webhook-collector --method POST --body '{...}' --verify-collection my-output-bucket
```

This will:
1. Make the POST request to the webhook collector.
2. On GCP, trigger the associated Cloud Scheduler job that batch-processes the pending webhooks (see `--scheduler-job` below).
3. Poll the specified bucket until the output file appears (up to 60s).
4. Verify that the content of the output file matches the uploaded data.

**Options:**
* `--verify-collection <bucketName>`: Enables verification mode and specifies the target bucket.
* `--scheduler-job <jobName>`: (GCP only) Specifies the Cloud Scheduler job that batch-processes pending webhooks from the Pub/Sub topic.
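
For a GCP deployment, you would typically pass both options so the tool can trigger batch processing before polling the bucket. A sketch of such a call (the bucket and job name below are placeholders; the Terraform-generated test scripts pass the job's fully-qualified resource ID):

```shell
node cli-call.js -u https://us-central1-acme.cloudfunctions.net/webhook-collector \
  --method POST --body '{...}' \
  --verify-collection my-output-bucket \
  --scheduler-job projects/acme/locations/us-central1/jobs/webhook-collector-batch
```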

### Psoxy Test Call: Health Check option
Use the `--health-check` option to check if your deploy is correctly configured:

11 changes: 11 additions & 0 deletions infra/modules/aws-host/main.tf
@@ -388,6 +388,17 @@ resource "aws_iam_policy" "invoke_webhook_collector_urls" {
],
"Effect" : "Allow",
"Resource" : flatten([for k, v in module.webhook_collectors : v.provisioned_auth_key_pairs])
},
{ # allow test caller to read from sanitized output buckets to verify collection
"Action" : [
"s3:ListBucket",
"s3:GetObject"
],
"Effect" : "Allow",
"Resource" : flatten([for k, v in module.webhook_collectors : [
"arn:aws:s3:::${v.output_sanitized_bucket_id}",
"arn:aws:s3:::${v.output_sanitized_bucket_id}/*"
]])
}
]
}
1 change: 1 addition & 0 deletions infra/modules/aws-webhook-collector/main.tf
@@ -383,6 +383,7 @@ locals {
example_payload = coalesce(var.example_payload, "{\"test\": \"data\"}")
example_identity = var.example_identity
collection_path = local.collection_path
sanitized_bucket_name = module.sanitized_output.bucket_id
})
}

1 change: 1 addition & 0 deletions infra/modules/aws-webhook-collector/test_script.tftpl
@@ -15,6 +15,7 @@ ${command_cli_call} -u "${collector_endpoint_url}${collection_path}" --method PO
%{ if example_identity != null ~}
--identity-subject '${example_identity}' \
%{ endif ~}
--verify-collection "${sanitized_bucket_name}" \
--body '${example_payload}'
COLLECTION_RC=$?

6 changes: 6 additions & 0 deletions infra/modules/gcp-host/main.tf
@@ -509,6 +509,12 @@ echo "Testing Bulk Connectors ..."
%{for test_script in values(module.bulk_connector)[*].test_script~}
./${test_script}
%{endfor}

echo "Testing Webhook Collectors ..."

%{for test_script in values(module.webhook_collector)[*].test_script~}
./${test_script}
%{endfor}
EOF
}

6 changes: 6 additions & 0 deletions infra/modules/gcp-webhook-collector/main.tf
@@ -235,6 +235,9 @@ locals {
secrets_to_grant_access_to = {
AUTH_ISSUER = {
secret_id = module.auth_issuer_secret.secret_ids_within_project["AUTH_ISSUER"]
},
SERVICE_URL = {
secret_id = module.auth_issuer_secret.secret_ids_within_project["SERVICE_URL"]
}
}
}
@@ -501,6 +504,9 @@ resource "local_file" "test_script" {
example_payload = coalesce(var.example_payload, "{\"test\": \"data\"}")
example_identity = var.example_identity
collection_path = "/"
scheduler_job_name = google_cloud_scheduler_job.trigger_batch_processing.id

Missing IAM grant for Cloud Scheduler run permission

High Severity

The test script passes --scheduler-job to trigger the Cloud Scheduler job via triggerScheduler(), which calls client.runJob(). This requires cloudscheduler.jobs.run permission. However, the Terraform grants test principals only KMS signing and bucket read permissions—there's no IAM binding for Cloud Scheduler. Test principals will receive a permission denied error when the test attempts to trigger the scheduler.
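
A minimal sketch of one possible fix, assuming a project-level binding is acceptable; the variable names below are hypothetical placeholders for whatever inputs this module actually exposes:

```hcl
# Sketch only: grant test callers permission to run the batch-processing
# Cloud Scheduler job. Variable names are hypothetical placeholders.
resource "google_project_iam_member" "test_caller_scheduler_run" {
  for_each = toset(var.gcp_principals_authorized_to_test)

  project = var.project_id
  role    = "roles/cloudscheduler.jobRunner" # includes cloudscheduler.jobs.run
  member  = each.value
}
```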

bucket_name = module.sanitized_webhook_output.bucket_name
output_path_prefix = var.output_path_prefix

Template parameter output_path_prefix passed but unused

Low Severity

The output_path_prefix variable is passed to the test_script.tftpl template but is never used within the template. The template only uses bucket_name, scheduler_job_name, and other variables, making this a dead parameter that adds unnecessary clutter to the template invocation.

})
}

2 changes: 2 additions & 0 deletions infra/modules/gcp-webhook-collector/test_script.tftpl
@@ -15,6 +15,8 @@ ${command_cli_call} -u "${collector_endpoint_url}${collection_path}" --method PO
%{ if example_identity != null ~}
--identity-subject '${example_identity}' \
%{ endif ~}
--verify-collection "${bucket_name}" \
--scheduler-job "${scheduler_job_name}" \
--body '${example_payload}'
COLLECTION_RC=$?

33 changes: 33 additions & 0 deletions tools/psoxy-test/cli-call.js
@@ -4,6 +4,7 @@ import { Command, Option } from 'commander';
import _ from 'lodash';
import { createRequire } from 'module';
import { callDataSourceEndpoints } from './data-sources/runner.js';
import gcp from './lib/gcp.js';
import getLogger from './lib/logger.js';
import psoxyTestCall from './psoxy-test-call.js';

@@ -39,6 +40,8 @@ const AWS_ACCESS_DENIED_EXCEPTION_REGEXP = new RegExp(/(?<arn>arn:aws:iam::\d+:\
.option('--request-no-response', "Request 'No response body' back from proxy (tests side-output case)", false)
.option('--async', 'Process request asynchronously (adds X-Psoxy-Process-Async header)', false)
.option('-b, --body <body>', 'Body to send in request (it expects a JSON string)')
.option('--verify-collection <bucket>', 'Verify that the posted data appears in the specified bucket (GCS/S3)')
.option('--scheduler-job <name>', 'GCP: Cloud Scheduler job name to trigger batch processing')
.addOption(new Option('-d, --data-source <name>',
'Data source to test all available endpoints').choices([
//TODO: pull this list from terraform console or something??
@@ -82,11 +85,41 @@ const AWS_ACCESS_DENIED_EXCEPTION_REGEXP = new RegExp(/(?<arn>arn:aws:iam::\d+:\

let result;
try {
const startTime = Date.now();
if (options.dataSource) {
result = await callDataSourceEndpoints(options);
} else {
result = await psoxyTestCall(options);
}

if (options.verifyCollection && result.status === 200) {
// Delegate based on cloud provider logic
const url = new URL(options.url);


const isGcp = options.force?.toLowerCase() === 'gcp' || gcp.isValidURL(url);
const isAws = options.force?.toLowerCase() === 'aws' || (!isGcp && (url.hostname.endsWith('amazonaws.com') || url.hostname.endsWith('on.aws'))); // rough check or rely on fallback

Unused isAws variable is dead code

Low Severity

The isAws variable is computed at line 101 but never actually used. The conditional logic only checks if (isGcp) and falls through to else for the AWS case, making the isAws computation entirely redundant. This dead code could confuse future maintainers who might expect isAws to influence the branching logic.



if (isGcp) {
await gcp.verifyCollection({
...options,
bucketName: options.verifyCollection,
startTime: startTime
}, logger);
} else {
// Assume AWS or fallback
const aws = (await import('./lib/aws.js')).default;
await aws.verifyCollection({
verifyCollection: options.verifyCollection,
url: options.url,
body: options.body,
startTime: startTime,
role: options.role,
region: options.region,
}, logger);
}
}

} catch (error) {
if (error?.name === 'AccessDenied' && error.message &&
AWS_ACCESS_DENIED_EXCEPTION_REGEXP.test(error.message)) {
149 changes: 131 additions & 18 deletions tools/psoxy-test/lib/aws.js
@@ -1,26 +1,28 @@
import {
CloudWatchLogsClient,
DescribeLogStreamsCommand,
GetLogEventsCommand,
CloudWatchLogsClient,
DescribeLogStreamsCommand,
GetLogEventsCommand,
} from '@aws-sdk/client-cloudwatch-logs';
import {
DeleteObjectCommand,
GetObjectCommand,
ListBucketsCommand,
ListObjectsV2Command,
PutObjectCommand,
S3Client
DeleteObjectCommand,
GetObjectCommand,
ListBucketsCommand,
ListObjectsV2Command,
PutObjectCommand,
S3Client
} from '@aws-sdk/client-s3';
import {
executeWithRetry,
getAWSCredentials,
getCommonHTTPHeaders,
isGzipped,
request,
resolveAWSRegion,
resolveHTTPMethod,
signAWSRequestURL,
signJwtWithAWSKMS
compareContent,
executeWithRetry,
getAWSCredentials,
getCommonHTTPHeaders,
isGzipped,
request,
resolveAWSRegion,
resolveHTTPMethod,
signAWSRequestURL,
signJwtWithAWSKMS,
sleep,
} from './utils.js';

import fs from 'fs';
@@ -374,6 +376,116 @@ async function deleteObject(bucket, key, options, client) {
// BypassGovernanceRetention: true,
}));
}
/**
* Verifies that a file containing the expected content appears in the bucket after startTime.
*
* @param {Object} options
* @param {string} options.verifyCollection - bucket name
* @param {string} options.body - expected content
* @param {number} options.startTime - timestamp in ms
* @param {string} options.role
* @param {string} options.region
* @param {Object} logger
*/
async function verifyCollection(options, logger) {
const bucketName = options.verifyCollection;
const expectedContent = options.body;
const startTime = options.startTime;
const timeout = 90000; // 90 seconds
const pollInterval = 5000; // 5 seconds
const endTime = Date.now() + timeout;

logger.info(`Verifying content in bucket: ${bucketName}. Will wait up to ${timeout / 1000}s`);

const client = await createS3Client(options.role, options.region);

while (Date.now() < endTime) {
const elapsed = Math.round((Date.now() - startTime) / 1000);
logger.info(`Waiting for content to appear in bucket... [${Math.max(0, elapsed)}s elapsed]`);

// List objects
// We might want to list only recent objects or just list all and filter.
// AWS S3 ListObjectsV2 returns up to 1000 keys.
const command = new ListObjectsV2Command({
Bucket: bucketName
});
const response = await client.send(command);

const files = response.Contents || [];

// Filter by LastModified > startTime
const newFiles = files.filter(f => f.LastModified && new Date(f.LastModified).getTime() > startTime)
.sort((a, b) => new Date(b.LastModified).getTime() - new Date(a.LastModified).getTime());

if (newFiles.length > 0) {
const file = newFiles[0];
logger.info(`New file found: ${file.Key} (Created: ${new Date(file.LastModified).toISOString()})`);

// Download content
const getObjCmd = new GetObjectCommand({
Bucket: bucketName,
Key: file.Key
});
const getResponse = await client.send(getObjCmd);

let contentStr = '';
if (getResponse.Body) {
const chunks = [];
for await (const chunk of getResponse.Body) {
chunks.push(chunk);
}
const buffer = Buffer.concat(chunks);

// Check for gzip
const isGzippedContent = (await isGzipped(buffer)) || getResponse.ContentEncoding === 'gzip';
if (isGzippedContent) {
contentStr = (await new Promise((resolve, reject) => {
zlib.gunzip(buffer, (err, res) => {
if (err) reject(err);
else resolve(res);
});
})).toString();
} else {
contentStr = buffer.toString();
}
}

logger.info(`Found Content: ${contentStr}`);

let items = [];
try {
const jsonContent = JSON.parse(contentStr);
if (Array.isArray(jsonContent)) {
items = jsonContent;
} else if (_.isPlainObject(jsonContent)) {
items = [jsonContent];
}
} catch (e) {
logger.error(`Failed to parse file content: ${e.message}`);
throw new Error(`Verification failed: Invalid JSON in file ${file.Key}`);
}

if (items.length > 0) {
const matchFound = compareContent(items, expectedContent, logger);
if (matchFound) {
logger.success(`Verification Successful: Content matches.`);
return;
} else {
logger.error(`Verification Failed: Content does not match.`);
throw new Error(`Verification failed: Content mismatch in file ${file.Key}`);
}
} else {
logger.warn(`File is empty or contains no items.`);
throw new Error(`Verification failed: Empty file ${file.Key}`);
}
}

await sleep(pollInterval);
}

logger.error('No new files found in bucket within timeout.');
throw new Error('Verification failed: Expected content not found in bucket.');
}

export default {
call,
@@ -389,4 +501,5 @@ export default {
listObjects,
parseLogEvents,
upload,
verifyCollection,
}