From 2475752e71d2757b9d863a63cfc68fa28703b271 Mon Sep 17 00:00:00 2001 From: Simon Juba Date: Tue, 2 Sep 2025 19:15:50 +0900 Subject: [PATCH 1/3] Add support for otfMetadata in Kinesis firehose response metadata for iceberg table routing --- events/firehose.go | 16 ++++- .../testdata/kinesis-firehose-response.json | 65 ++++++++++--------- 2 files changed, 50 insertions(+), 31 deletions(-) diff --git a/events/firehose.go b/events/firehose.go index 85b8fd18..09a87ef9 100644 --- a/events/firehose.go +++ b/events/firehose.go @@ -25,6 +25,13 @@ const ( KinesisFirehoseTransformedStateProcessingFailed = "ProcessingFailed" ) +// Constants used for otf operation for the record +const ( + KinesisFirehoseOTFOperationInsert = "insert" + KinesisFirehoseOTFOperationUpdate = "update" + KinesisFirehoseOTFOperationDelete = "delete" +) + type KinesisFirehoseResponse struct { Records []KinesisFirehoseResponseRecord `json:"records"` } @@ -37,7 +44,14 @@ type KinesisFirehoseResponseRecord struct { } type KinesisFirehoseResponseRecordMetadata struct { - PartitionKeys map[string]string `json:"partitionKeys"` + PartitionKeys map[string]string `json:"partitionKeys"` + OTFMetadata KinesisFirehoseResponseRecordOTFMetadata `json:"otfMetadata"` +} + +type KinesisFirehoseResponseRecordOTFMetadata struct { + DestinationTableName string `json:"destinationTableName"` + DestinationDatabaseName string `json:"destinationDatabaseName"` + Operation string `json:"operation"` // The Operation field must have one of the following values – insert, update, or delete. } type KinesisFirehoseRecordMetadata struct { diff --git a/events/testdata/kinesis-firehose-response.json b/events/testdata/kinesis-firehose-response.json index c7c4466c..d1f3aa4e 100644 --- a/events/testdata/kinesis-firehose-response.json +++ b/events/testdata/kinesis-firehose-response.json @@ -1,31 +1,36 @@ { - "records": [ - { - "data": "SGVsbG8gV29ybGQ=", - "recordId": "record1", - "result": "TRANSFORMED_STATE_OK", - "metadata": { - "partitionKeys": {} - } - }, - { - "data": "SGVsbG8gV29ybGQ=", - "recordId": "record2", - "result": "TRANSFORMED_STATE_DROPPED", - "metadata": { - "partitionKeys": {} - } - }, - { - "data": "SGVsbG8gV29ybGQ=", - "recordId": "record3", - "result": "TransformedStateOk", - "metadata": { - "partitionKeys": { - "iamKey1": "iamValue1", - "iamKey2": "iamValue2" - } - } - } - ] - } + "records": [ + { + "data": "SGVsbG8gV29ybGQ=", + "recordId": "record1", + "result": "TRANSFORMED_STATE_OK", + "metadata": { + "partitionKeys": {}, + "otfMetadata": { + "destinationTableName": "", + "destinationDatabaseName": "", + "operation": "" + } + } + }, + { + "data": "SGVsbG8gV29ybGQ=", + "recordId": "record2", + "result": "TRANSFORMED_STATE_DROPPED", + "metadata": { + "partitionKeys": {} + } + }, + { + "data": "SGVsbG8gV29ybGQ=", + "recordId": "record3", + "result": "TransformedStateOk", + "metadata": { + "partitionKeys": { + "iamKey1": "iamValue1", + "iamKey2": "iamValue2" + } + } + } + ] +} From 5ec6ac4ba9e8494422c263c4703119b7f0e68983 Mon Sep 17 00:00:00 2001 From: Simon Juba Date: Tue, 2 Sep 2025 19:42:40 +0900 Subject: [PATCH 2/3] Update tests --- events/testdata/kinesis-firehose-response.json | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/events/testdata/kinesis-firehose-response.json b/events/testdata/kinesis-firehose-response.json index d1f3aa4e..4ad248d4 100644 --- a/events/testdata/kinesis-firehose-response.json +++ b/events/testdata/kinesis-firehose-response.json @@ -18,7 +18,12 @@ "recordId": "record2", "result": "TRANSFORMED_STATE_DROPPED", "metadata": { - "partitionKeys": {} + "partitionKeys": {}, + "otfMetadata": { + "destinationTableName": "", + "destinationDatabaseName": "", + "operation": "" + } } }, { @@ -29,6 +34,11 @@ "partitionKeys": { "iamKey1": "iamValue1", "iamKey2": "iamValue2" + }, + "otfMetadata": { + "destinationTableName": "table1", + "destinationDatabaseName": "database1", + "operation": "update" } } } From 6d3b3e27ade309bccda097eddfaf20de4dd3be99 Mon Sep 17 00:00:00 2001 From: Simon Juba Date: Mon, 6 Oct 2025 14:06:17 +0900 Subject: [PATCH 3/3] Use specific type for otf operations --- events/firehose.go | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/events/firehose.go b/events/firehose.go index 09a87ef9..350c5b31 100644 --- a/events/firehose.go +++ b/events/firehose.go @@ -25,11 +25,13 @@ const ( KinesisFirehoseTransformedStateProcessingFailed = "ProcessingFailed" ) -// Constants used for otf operation for the record +// KinesisFirehoseOTFOperation represents the operation to apply on the record during on-the-fly record routing. +type KinesisFirehoseOTFOperation string + const ( - KinesisFirehoseOTFOperationInsert = "insert" - KinesisFirehoseOTFOperationUpdate = "update" - KinesisFirehoseOTFOperationDelete = "delete" + KinesisFirehoseOTFOperationInsert KinesisFirehoseOTFOperation = "insert" + KinesisFirehoseOTFOperationUpdate KinesisFirehoseOTFOperation = "update" + KinesisFirehoseOTFOperationDelete KinesisFirehoseOTFOperation = "delete" ) type KinesisFirehoseResponse struct { @@ -49,9 +51,9 @@ type KinesisFirehoseResponseRecordMetadata struct { } type KinesisFirehoseResponseRecordOTFMetadata struct { - DestinationTableName string `json:"destinationTableName"` - DestinationDatabaseName string `json:"destinationDatabaseName"` - Operation string `json:"operation"` // The Operation field must have one of the following values – insert, update, or delete. + DestinationDatabaseName string `json:"destinationDatabaseName"` + DestinationTableName string `json:"destinationTableName"` + Operation KinesisFirehoseOTFOperation `json:"operation"` // The Operation field must have one of the following values – insert, update, or delete. } type KinesisFirehoseRecordMetadata struct {