Skip to content

Commit 2475752

Browse files
committed
Add support for otfMetadata in Kinesis firehose response metadata for iceberg table routing
1 parent 42a01a9 commit 2475752

File tree

2 files changed

+50
-31
lines changed

2 files changed

+50
-31
lines changed

events/firehose.go

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,13 @@ const (
2525
KinesisFirehoseTransformedStateProcessingFailed = "ProcessingFailed"
2626
)
2727

28+
// Constants used for otf operation for the record
29+
const (
30+
KinesisFirehoseOTFOperationInsert = "insert"
31+
KinesisFirehoseOTFOperationUpdate = "update"
32+
KinesisFirehoseOTFOperationDelete = "delete"
33+
)
34+
2835
type KinesisFirehoseResponse struct {
2936
Records []KinesisFirehoseResponseRecord `json:"records"`
3037
}
@@ -37,7 +44,14 @@ type KinesisFirehoseResponseRecord struct {
3744
}
3845

3946
type KinesisFirehoseResponseRecordMetadata struct {
40-
PartitionKeys map[string]string `json:"partitionKeys"`
47+
PartitionKeys map[string]string `json:"partitionKeys"`
48+
OTFMetadata KinesisFirehoseResponseRecordOTFMetadata `json:"otfMetadata"`
49+
}
50+
51+
type KinesisFirehoseResponseRecordOTFMetadata struct {
52+
DestinationTableName string `json:"destinationTableName"`
53+
DestinationDatabaseName string `json:"destinationDatabaseName"`
54+
Operation string `json:"operation"` // The Operation field must have one of the following values – insert, update, or delete.
4155
}
4256

4357
type KinesisFirehoseRecordMetadata struct {
Lines changed: 35 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -1,31 +1,36 @@
11
{
2-
"records": [
3-
{
4-
"data": "SGVsbG8gV29ybGQ=",
5-
"recordId": "record1",
6-
"result": "TRANSFORMED_STATE_OK",
7-
"metadata": {
8-
"partitionKeys": {}
9-
}
10-
},
11-
{
12-
"data": "SGVsbG8gV29ybGQ=",
13-
"recordId": "record2",
14-
"result": "TRANSFORMED_STATE_DROPPED",
15-
"metadata": {
16-
"partitionKeys": {}
17-
}
18-
},
19-
{
20-
"data": "SGVsbG8gV29ybGQ=",
21-
"recordId": "record3",
22-
"result": "TransformedStateOk",
23-
"metadata": {
24-
"partitionKeys": {
25-
"iamKey1": "iamValue1",
26-
"iamKey2": "iamValue2"
27-
}
28-
}
29-
}
30-
]
31-
}
2+
"records": [
3+
{
4+
"data": "SGVsbG8gV29ybGQ=",
5+
"recordId": "record1",
6+
"result": "TRANSFORMED_STATE_OK",
7+
"metadata": {
8+
"partitionKeys": {},
9+
"otfMetadata": {
10+
"destinationTableName": "",
11+
"destinationDatabaseName": "",
12+
"operation": ""
13+
}
14+
}
15+
},
16+
{
17+
"data": "SGVsbG8gV29ybGQ=",
18+
"recordId": "record2",
19+
"result": "TRANSFORMED_STATE_DROPPED",
20+
"metadata": {
21+
"partitionKeys": {}
22+
}
23+
},
24+
{
25+
"data": "SGVsbG8gV29ybGQ=",
26+
"recordId": "record3",
27+
"result": "TransformedStateOk",
28+
"metadata": {
29+
"partitionKeys": {
30+
"iamKey1": "iamValue1",
31+
"iamKey2": "iamValue2"
32+
}
33+
}
34+
}
35+
]
36+
}

0 commit comments

Comments
 (0)