diff --git a/terraform/account-wide-infrastructure/modules/glue/glue.tf b/terraform/account-wide-infrastructure/modules/glue/glue.tf index e7cd3b810..240ea7677 100644 --- a/terraform/account-wide-infrastructure/modules/glue/glue.tf +++ b/terraform/account-wide-infrastructure/modules/glue/glue.tf @@ -1,7 +1,7 @@ # Create Glue Data Catalog Database resource "aws_glue_catalog_database" "log_database" { name = "${var.name_prefix}-reporting" - location_uri = "${aws_s3_bucket.target-data-bucket.id}/logs/" + location_uri = "${aws_s3_bucket.target-data-bucket.id}/" } # Create Glue Crawler @@ -10,7 +10,37 @@ resource "aws_glue_crawler" "log_crawler" { database_name = aws_glue_catalog_database.log_database.name role = aws_iam_role.glue_service_role.name s3_target { - path = "${aws_s3_bucket.target-data-bucket.id}/logs/" + path = "${aws_s3_bucket.target-data-bucket.id}/consumer_countDocumentReference/" + } + s3_target { + path = "${aws_s3_bucket.target-data-bucket.id}/consumer_readDocumentReference/" + } + s3_target { + path = "${aws_s3_bucket.target-data-bucket.id}/consumer_searchDocumentReference/" + } + s3_target { + path = "${aws_s3_bucket.target-data-bucket.id}/consumer_searchPostDocumentReference/" + } + s3_target { + path = "${aws_s3_bucket.target-data-bucket.id}/producer_createDocumentReference/" + } + s3_target { + path = "${aws_s3_bucket.target-data-bucket.id}/producer_deleteDocumentReference/" + } + s3_target { + path = "${aws_s3_bucket.target-data-bucket.id}/producer_readDocumentReference/" + } + s3_target { + path = "${aws_s3_bucket.target-data-bucket.id}/producer_searchDocumentReference/" + } + s3_target { + path = "${aws_s3_bucket.target-data-bucket.id}/producer_searchPostDocumentReference/" + } + s3_target { + path = "${aws_s3_bucket.target-data-bucket.id}/producer_updateDocumentReference/" + } + s3_target { + path = "${aws_s3_bucket.target-data-bucket.id}/producer_upsertDocumentReference//" } schema_change_policy { delete_behavior = "LOG" @@ -49,8 +79,8 @@ resource "aws_glue_job" "glue_job" { "--enable-auto-scaling" = "true" "--enable-continous-cloudwatch-log" = "true" "--datalake-formats" = "delta" - "--source_path" = "s3://${aws_s3_bucket.source-data-bucket.id}/" # Specify the source S3 path - "--target_path" = "s3://${aws_s3_bucket.target-data-bucket.id}/logs" # Specify the destination S3 path + "--source_path" = "s3://${aws_s3_bucket.source-data-bucket.id}/" # Specify the source S3 path + "--target_path" = "s3://${aws_s3_bucket.target-data-bucket.id}/" # Specify the destination S3 path "--job_name" = "${var.name_prefix}-glue-job" "--partition_cols" = "date" "--enable-continuous-log-filter" = "true" diff --git a/terraform/account-wide-infrastructure/modules/glue/src/consumer_schemas.py b/terraform/account-wide-infrastructure/modules/glue/src/consumer_schemas.py new file mode 100644 index 000000000..41e45edd4 --- /dev/null +++ b/terraform/account-wide-infrastructure/modules/glue/src/consumer_schemas.py @@ -0,0 +1,1070 @@ +from pyspark.sql.types import ( + BooleanType, + DoubleType, + LongType, + StringType, + StructField, + StructType, +) + +searchPostDocumentReferenceSchema = StructType( + [ + StructField("raw_message", StringType(), True), + StructField("time", DoubleType(), True), + StructField("index", StringType(), True), + StructField("host", StringType(), True), + StructField("source", StringType(), True), + StructField( + "event", + StructType( + [ + StructField("level", StringType(), True), + StructField("location", StringType(), True), + StructField("message", StringType(), True), + StructField("timestamp", StringType(), True), + StructField("service", StringType(), True), + StructField("cold_start", BooleanType(), True), + StructField("function_name", StringType(), True), + StructField("function_memory_size", StringType(), True), + StructField("function_arn", StringType(), True), + StructField("function_request_id", StringType(), True), + StructField("correlation_id", StringType(), True), + StructField("method", StringType(), True), + StructField("path", StringType(), True), + StructField( + "headers", + StructType( + [ + StructField("accept", StringType(), True), + StructField("accept-encoding", StringType(), True), + StructField("authorization", StringType(), True), + StructField("content-type", StringType(), True), + StructField("host", StringType(), True), + StructField( + "nhsd-client-rp-details", StringType(), True + ), + StructField( + "nhsd-connection-metadata", StringType(), True + ), + StructField("nhsd-correlation-id", StringType(), True), + StructField( + "nhsd-end-user-organisation-ods", StringType(), True + ), + StructField("nhsd-request-id", StringType(), True), + StructField("user-agent", StringType(), True), + StructField("x-correlation-id", StringType(), True), + StructField("x-forwarded-for", StringType(), True), + StructField("x-forwarded-port", StringType(), True), + StructField("x-forwarded-proto", StringType(), True), + StructField("x-request-id", StringType(), True), + StructField("cache-control", StringType(), True), + StructField("postman-token", StringType(), True), + ] + ), + True, + ), + StructField("log_reference", StringType(), True), + StructField("xray_trace_id", StringType(), True), + StructField( + "config", + StructType( + [ + StructField("aws_region", StringType(), True), + StructField("prefix", StringType(), True), + StructField("environment", StringType(), True), + StructField("splunk_index", StringType(), True), + StructField("source", StringType(), True), + StructField("auth_store", StringType(), True), + StructField("table_name", StringType(), True), + ] + ), + True, + ), + StructField( + "metadata", + StructType( + [ + StructField("ods_code", StringType(), True), + StructField("nrl_app_id", StringType(), True), + StructField( + "client_rp_details", + StructType( + [ + StructField( + "developer_app_name", StringType(), True + ), + StructField( + "developer_app_id", StringType(), True + ), + ] + ), + True, + ), + ] + ), + True, + ), + StructField("pointer_types", StringType(), True), + StructField("body", StringType(), True), + StructField("model", StringType(), True), + StructField( + "parsed_body", + StructType( + [ + StructField("subject_identifier", StringType(), True), + StructField("type", StringType(), True), + StructField("custodian_identifier", StringType(), True), + ] + ), + True, + ), + StructField("table_name", StringType(), True), + StructField("item_type", StringType(), True), + StructField("original_kwargs_keys", StringType(), True), + StructField("filtered_kwargs_keys", StringType(), True), + StructField("nhs_number", StringType(), True), + StructField( + "query", + StructType( + [ + StructField("indexname", StringType(), True), + StructField( + "keyconditionexpression", StringType(), True + ), + StructField( + "expressionattributevalues", + StructType( + [ + StructField( + ":patient_key", StringType(), True + ), + StructField(":type_0", StringType(), True), + StructField(":type_1", StringType(), True), + StructField(":type_2", StringType(), True), + StructField(":type_3", StringType(), True), + StructField(":type_4", StringType(), True), + StructField(":type_5", StringType(), True), + StructField(":type_6", StringType(), True), + StructField(":type_7", StringType(), True), + StructField(":type_8", StringType(), True), + StructField(":type_9", StringType(), True), + StructField(":type_10", StringType(), True), + StructField(":type_11", StringType(), True), + StructField(":type_12", StringType(), True), + StructField( + ":patient_sort", StringType(), True + ), + ] + ), + True, + ), + StructField( + "returnconsumedcapacity", StringType(), True + ), + StructField("filterexpression", StringType(), True), + StructField( + "expressionattributenames", + StructType( + [ + StructField( + "#pointer_type", StringType(), True + ) + ] + ), + True, + ), + ] + ), + True, + ), + StructField("table", StringType(), True), + StructField( + "stats", + StructType( + [ + StructField("count", LongType(), True), + StructField("scanned_count", LongType(), True), + ] + ), + True, + ), + StructField( + "result", + StructType( + [ + StructField("items", StructType([]), True), + StructField("count", LongType(), True), + StructField("scannedcount", LongType(), True), + StructField( + "consumedcapacity", + StructType( + [ + StructField( + "tablename", StringType(), True + ), + StructField( + "capacityunits", DoubleType(), True + ), + StructField("table", StructType([]), True), + StructField( + "globalsecondaryindexes", + StructType( + [ + StructField( + "patient_gsi", + StructType( + [ + StructField( + "capacityunits", + DoubleType(), + True, + ) + ] + ), + True, + ) + ] + ), + True, + ), + ] + ), + True, + ), + StructField( + "responsemetadata", + StructType( + [ + StructField( + "requestid", StringType(), True + ), + StructField( + "httpstatuscode", LongType(), True + ), + StructField( + "httpheaders", + StructType( + [ + StructField( + "server", StringType(), True + ), + StructField( + "date", StringType(), True + ), + StructField( + "content-type", + StringType(), + True, + ), + StructField( + "content-length", + StringType(), + True, + ), + StructField( + "connection", + StringType(), + True, + ), + StructField( + "x-amzn-requestid", + StringType(), + True, + ), + StructField( + "x-amz-crc32", + StringType(), + True, + ), + ] + ), + True, + ), + ] + ), + True, + ), + ] + ), + True, + ), + StructField("producer_id", StringType(), True), + StructField("document_id", StringType(), True), + StructField("custodian", StringType(), True), + StructField("id", StringType(), True), + StructField("count", LongType(), True), + StructField("status_code", StringType(), True), + StructField( + "response", + StructType( + [ + StructField("statuscode", StringType(), True), + StructField("body", StringType(), True), + ] + ), + True, + ), + StructField("error", StringType(), True), + StructField("document", StringType(), True), + StructField("exception", StringType(), True), + StructField("exception_name", StringType(), True), + StructField( + "stack_trace", + StructType( + [ + StructField("type", StringType(), True), + StructField("value", StringType(), True), + StructField("module", StringType(), True), + StructField("frames", StructType([]), True), + ] + ), + True, + ), + StructField("bucket", StringType(), True), + StructField("key", StringType(), True), + StructField("subject_identifier", StringType(), True), + StructField("type", StringType(), True), + ] + ), + True, + ), + ] +) + +searchDocumentReferenceSchema = StructType( + [ + StructField("raw_message", StringType(), True), + StructField("time", DoubleType(), True), + StructField("index", StringType(), True), + StructField("host", StringType(), True), + StructField("source", StringType(), True), + StructField( + "event", + StructType( + [ + StructField("level", StringType(), True), + StructField("location", StringType(), True), + StructField("message", StringType(), True), + StructField("timestamp", StringType(), True), + StructField("service", StringType(), True), + StructField("cold_start", BooleanType(), True), + StructField("function_name", StringType(), True), + StructField("function_memory_size", StringType(), True), + StructField("function_arn", StringType(), True), + StructField("function_request_id", StringType(), True), + StructField("correlation_id", StringType(), True), + StructField("method", StringType(), True), + StructField("path", StringType(), True), + StructField( + "headers", + StructType( + [ + StructField("accept", StringType(), True), + StructField("accept-encoding", StringType(), True), + StructField("authorization", StringType(), True), + StructField("cache-control", StringType(), True), + StructField("host", StringType(), True), + StructField( + "nhsd-client-rp-details", StringType(), True + ), + StructField( + "nhsd-connection-metadata", StringType(), True + ), + StructField("nhsd-correlation-id", StringType(), True), + StructField( + "nhsd-end-user-organisation-ods", StringType(), True + ), + StructField("nhsd-request-id", StringType(), True), + StructField("postman-token", StringType(), True), + StructField("user-agent", StringType(), True), + StructField("x-correlation-id", StringType(), True), + StructField("x-forwarded-for", StringType(), True), + StructField("x-forwarded-port", StringType(), True), + StructField("x-forwarded-proto", StringType(), True), + StructField("x-request-id", StringType(), True), + ] + ), + True, + ), + StructField("log_reference", StringType(), True), + StructField("xray_trace_id", StringType(), True), + StructField( + "config", + StructType( + [ + StructField("aws_region", StringType(), True), + StructField("prefix", StringType(), True), + StructField("environment", StringType(), True), + StructField("splunk_index", StringType(), True), + StructField("source", StringType(), True), + StructField("auth_store", StringType(), True), + StructField("table_name", StringType(), True), + ] + ), + True, + ), + StructField( + "metadata", + StructType( + [ + StructField("ods_code", StringType(), True), + StructField("nrl_app_id", StringType(), True), + StructField( + "client_rp_details", + StructType( + [ + StructField( + "developer_app_name", StringType(), True + ), + StructField( + "developer_app_id", StringType(), True + ), + ] + ), + True, + ), + ] + ), + True, + ), + StructField("pointer_types", StringType(), True), + StructField( + "params", + StructType( + [ + StructField("subject:identifier", StringType(), True), + StructField("type", StringType(), True), + ] + ), + True, + ), + StructField("model", StringType(), True), + StructField( + "parsed_params", + StructType( + [ + StructField("subject_identifier", StringType(), True), + StructField("type", StringType(), True), + ] + ), + True, + ), + StructField("table_name", StringType(), True), + StructField("item_type", StringType(), True), + StructField("original_kwargs_keys", StringType(), True), + StructField("filtered_kwargs_keys", StringType(), True), + StructField("type", StringType(), True), + StructField("status_code", StringType(), True), + StructField( + "response", + StructType( + [ + StructField("statuscode", StringType(), True), + StructField("body", StringType(), True), + ] + ), + True, + ), + StructField("nhs_number", StringType(), True), + StructField( + "query", + StructType( + [ + StructField("indexname", StringType(), True), + StructField( + "keyconditionexpression", StringType(), True + ), + StructField( + "expressionattributevalues", + StructType( + [ + StructField( + ":patient_key", StringType(), True + ), + StructField(":type_0", StringType(), True), + StructField(":type_1", StringType(), True), + StructField(":type_2", StringType(), True), + StructField(":type_3", StringType(), True), + StructField( + ":patient_sort", StringType(), True + ), + StructField(":type_4", StringType(), True), + StructField(":type_5", StringType(), True), + StructField(":type_6", StringType(), True), + StructField(":type_7", StringType(), True), + StructField(":type_8", StringType(), True), + StructField(":type_9", StringType(), True), + StructField(":type_10", StringType(), True), + StructField(":type_11", StringType(), True), + StructField(":type_12", StringType(), True), + ] + ), + True, + ), + StructField( + "returnconsumedcapacity", StringType(), True + ), + StructField("filterexpression", StringType(), True), + StructField( + "expressionattributenames", + StructType( + [ + StructField( + "#pointer_type", StringType(), True + ) + ] + ), + True, + ), + ] + ), + True, + ), + StructField("table", StringType(), True), + StructField( + "stats", + StructType( + [ + StructField("count", LongType(), True), + StructField("scanned_count", LongType(), True), + ] + ), + True, + ), + StructField( + "result", + StructType( + [ + StructField("items", StructType([]), True), + StructField("count", LongType(), True), + StructField("scannedcount", LongType(), True), + StructField( + "consumedcapacity", + StructType( + [ + StructField( + "tablename", StringType(), True + ), + StructField( + "capacityunits", DoubleType(), True + ), + StructField("table", StructType([]), True), + StructField( + "globalsecondaryindexes", + StructType( + [ + StructField( + "patient_gsi", + StructType( + [ + StructField( + "capacityunits", + DoubleType(), + True, + ) + ] + ), + True, + ) + ] + ), + True, + ), + ] + ), + True, + ), + StructField( + "responsemetadata", + StructType( + [ + StructField( + "requestid", StringType(), True + ), + StructField( + "httpstatuscode", LongType(), True + ), + StructField( + "httpheaders", + StructType( + [ + StructField( + "server", StringType(), True + ), + StructField( + "date", StringType(), True + ), + StructField( + "content-type", + StringType(), + True, + ), + StructField( + "content-length", + StringType(), + True, + ), + StructField( + "connection", + StringType(), + True, + ), + StructField( + "x-amzn-requestid", + StringType(), + True, + ), + StructField( + "x-amz-crc32", + StringType(), + True, + ), + ] + ), + True, + ), + ] + ), + True, + ), + ] + ), + True, + ), + StructField("producer_id", StringType(), True), + StructField("document_id", StringType(), True), + StructField("custodian", StringType(), True), + StructField("id", StringType(), True), + StructField("count", LongType(), True), + ] + ), + True, + ), + ] +) + +readDocumentReferenceSchema = StructType( + [ + StructField("raw_message", StringType(), True), + StructField("time", DoubleType(), True), + StructField("index", StringType(), True), + StructField("host", StringType(), True), + StructField("source", StringType(), True), + StructField( + "event", + StructType( + [ + StructField("level", StringType(), True), + StructField("location", StringType(), True), + StructField("message", StringType(), True), + StructField("timestamp", StringType(), True), + StructField("service", StringType(), True), + StructField("cold_start", BooleanType(), True), + StructField("function_name", StringType(), True), + StructField("function_memory_size", StringType(), True), + StructField("function_arn", StringType(), True), + StructField("function_request_id", StringType(), True), + StructField("correlation_id", StringType(), True), + StructField("method", StringType(), True), + StructField( + "path", + StructType([StructField("id", StringType(), True)]), + True, + ), + StructField( + "headers", + StructType( + [ + StructField("accept", StringType(), True), + StructField("accept-encoding", StringType(), True), + StructField("authorization", StringType(), True), + StructField("cache-control", StringType(), True), + StructField("host", StringType(), True), + StructField( + "nhsd-client-rp-details", StringType(), True + ), + StructField( + "nhsd-connection-metadata", StringType(), True + ), + StructField("nhsd-correlation-id", StringType(), True), + StructField( + "nhsd-end-user-organisation-ods", StringType(), True + ), + StructField("nhsd-request-id", StringType(), True), + StructField("postman-token", StringType(), True), + StructField("user-agent", StringType(), True), + StructField("x-correlation-id", StringType(), True), + StructField("x-forwarded-for", StringType(), True), + StructField("x-forwarded-port", StringType(), True), + StructField("x-forwarded-proto", StringType(), True), + StructField("x-request-id", StringType(), True), + ] + ), + True, + ), + StructField("log_reference", StringType(), True), + StructField("xray_trace_id", StringType(), True), + StructField( + "config", + StructType( + [ + StructField("aws_region", StringType(), True), + StructField("prefix", StringType(), True), + StructField("environment", StringType(), True), + StructField("splunk_index", StringType(), True), + StructField("source", StringType(), True), + StructField("auth_store", StringType(), True), + StructField("table_name", StringType(), True), + ] + ), + True, + ), + StructField( + "metadata", + StructType( + [ + StructField("ods_code", StringType(), True), + StructField("nrl_app_id", StringType(), True), + StructField( + "client_rp_details", + StructType( + [ + StructField( + "developer_app_name", StringType(), True + ), + StructField( + "developer_app_id", StringType(), True + ), + ] + ), + True, + ), + ] + ), + True, + ), + StructField("pointer_types", StringType(), True), + StructField("model", StringType(), True), + StructField( + "parsed_path", + StructType([StructField("id", StringType(), True)]), + True, + ), + StructField("table_name", StringType(), True), + StructField("item_type", StringType(), True), + StructField("original_kwargs_keys", StringType(), True), + StructField("filtered_kwargs_keys", StringType(), True), + StructField("status_code", StringType(), True), + StructField( + "response", + StructType( + [ + StructField("statuscode", StringType(), True), + StructField("body", StringType(), True), + ] + ), + True, + ), + StructField("producer_id", StringType(), True), + StructField("document_id", StringType(), True), + StructField("custodian", StringType(), True), + StructField( + "result", + StructType( + [ + StructField("id", StringType(), True), + StructField("nhs_number", StringType(), True), + StructField("custodian", StringType(), True), + StructField("producer_id", StringType(), True), + StructField("category_id", StringType(), True), + StructField("category", StringType(), True), + StructField("type_id", StringType(), True), + StructField("type", StringType(), True), + StructField("author", StringType(), True), + StructField("source", StringType(), True), + StructField("version", LongType(), True), + StructField("document", StringType(), True), + StructField("created_on", StringType(), True), + StructField("pk", StringType(), True), + StructField("sk", StringType(), True), + StructField("patient_key", StringType(), True), + StructField("patient_sort", StringType(), True), + ] + ), + True, + ), + ] + ), + True, + ), + ] +) + +countDocumentReferenceSchema = StructType( + [ + StructField("raw_message", StringType(), True), + StructField("time", DoubleType(), True), + StructField("index", StringType(), True), + StructField("host", StringType(), True), + StructField("source", StringType(), True), + StructField( + "event", + StructType( + [ + StructField("level", StringType(), True), + StructField("location", StringType(), True), + StructField("message", StringType(), True), + StructField("timestamp", StringType(), True), + StructField("service", StringType(), True), + StructField("cold_start", BooleanType(), True), + StructField("function_name", StringType(), True), + StructField("function_memory_size", StringType(), True), + StructField("function_arn", StringType(), True), + StructField("function_request_id", StringType(), True), + StructField("correlation_id", StringType(), True), + StructField("method", StringType(), True), + StructField("path", StringType(), True), + StructField( + "headers", + StructType( + [ + StructField("accept", StringType(), True), + StructField("accept-encoding", StringType(), True), + StructField("authorization", StringType(), True), + StructField("host", StringType(), True), + StructField( + "nhsd-client-rp-details", StringType(), True + ), + StructField( + "nhsd-connection-metadata", StringType(), True + ), + StructField("nhsd-correlation-id", StringType(), True), + StructField("user-agent", StringType(), True), + StructField("x-forwarded-for", StringType(), True), + StructField("x-request-id", StringType(), True), + StructField( + "nhsd-end-user-organisation-ods", StringType(), True + ), + StructField("nhsd-request-id", StringType(), True), + StructField("x-forwarded-port", StringType(), True), + StructField("x-forwarded-proto", StringType(), True), + ] + ), + True, + ), + StructField("log_reference", StringType(), True), + StructField("xray_trace_id", StringType(), True), + StructField( + "config", + StructType( + [ + StructField("aws_region", StringType(), True), + StructField("prefix", StringType(), True), + StructField("environment", StringType(), True), + StructField("splunk_index", StringType(), True), + StructField("source", StringType(), True), + StructField("auth_store", StringType(), True), + StructField("table_name", StringType(), True), + ] + ), + True, + ), + StructField( + "metadata", + StructType( + [ + StructField("ods_code", StringType(), True), + StructField("nrl_app_id", StringType(), True), + StructField( + "client_rp_details", + StructType( + [ + StructField( + "developer_app_name", StringType(), True + ), + StructField( + "developer_app_id", StringType(), True + ), + ] + ), + True, + ), + ] + ), + True, + ), + StructField("pointer_types", StringType(), True), + StructField( + "params", + StructType( + [StructField("subject:identifier", StringType(), True)] + ), + True, + ), + StructField("model", StringType(), True), + StructField( + "parsed_params", + StructType( + [StructField("subject_identifier", StringType(), True)] + ), + True, + ), + StructField("table_name", StringType(), True), + StructField("item_type", StringType(), True), + StructField("original_kwargs_keys", StringType(), True), + StructField("filtered_kwargs_keys", StringType(), True), + StructField("nhs_number", StringType(), True), + StructField( + "query", + StructType( + [ + StructField("indexname", StringType(), True), + StructField( + "keyconditionexpression", StringType(), True + ), + StructField( + "expressionattributevalues", + StructType( + [ + StructField( + ":patient_key", StringType(), True + ), + StructField( + ":patient_sort", StringType(), True + ), + ] + ), + True, + ), + StructField("select", StringType(), True), + StructField( + "returnconsumedcapacity", StringType(), True + ), + ] + ), + True, + ), + StructField("count", LongType(), True), + StructField( + "result", + StructType( + [ + StructField("count", LongType(), True), + StructField("scannedcount", LongType(), True), + StructField( + "consumedcapacity", + StructType( + [ + StructField( + "tablename", StringType(), True + ), + StructField( + "capacityunits", DoubleType(), True + ), + StructField("table", StructType([]), True), + StructField( + "globalsecondaryindexes", + StructType( + [ + StructField( + "patient_gsi", + StructType( + [ + StructField( + "capacityunits", + DoubleType(), + True, + ) + ] + ), + True, + ) + ] + ), + True, + ), + ] + ), + True, + ), + StructField( + "responsemetadata", + StructType( + [ + StructField( + "requestid", StringType(), True + ), + StructField( + "httpstatuscode", LongType(), True + ), + StructField( + "httpheaders", + StructType( + [ + StructField( + "server", StringType(), True + ), + StructField( + "date", StringType(), True + ), + StructField( + "content-type", + StringType(), + True, + ), + StructField( + "content-length", + StringType(), + True, + ), + StructField( + "connection", + StringType(), + True, + ), + StructField( + "x-amzn-requestid", + StringType(), + True, + ), + StructField( + "x-amz-crc32", + StringType(), + True, + ), + ] + ), + True, + ), + ] + ), + True, + ), + ] + ), + True, + ), + StructField("status_code", StringType(), True), + StructField( + "response", + StructType( + [ + StructField("statuscode", StringType(), True), + StructField("body", StringType(), True), + ] + ), + True, + ), + ] + ), + True, + ), + ] +) + +consumerSchemaList = { + "consumer--countDocumentReference": countDocumentReferenceSchema, + "consumer--searchPostDocumentReference": searchPostDocumentReferenceSchema, + "consumer--searchDocumentReference": searchDocumentReferenceSchema, + "consumer--readDocumentReference": readDocumentReferenceSchema, +} diff --git a/terraform/account-wide-infrastructure/modules/glue/src/main.py b/terraform/account-wide-infrastructure/modules/glue/src/main.py index 5450da0cd..e461ecbd4 100644 --- a/terraform/account-wide-infrastructure/modules/glue/src/main.py +++ b/terraform/account-wide-infrastructure/modules/glue/src/main.py @@ -1,9 +1,11 @@ import sys from awsglue.utils import getResolvedOptions +from consumer_schemas import consumerSchemaList from pipeline import LogPipeline +from producer_schemas import producerSchemaList from pyspark.context import SparkContext -from transformations import dtype_conversion, flatten_df, logSchema +from transformations import dtype_conversion, flatten_df # Get arguments from AWS Glue job args = getResolvedOptions( @@ -15,12 +17,14 @@ partition_cols = args["partition_cols"].split(",") if "partition_cols" in args else [] +consumerSchemaList.update(producerSchemaList) + # Initialize ETL process etl_job = LogPipeline( spark_context=sc, source_path=args["source_path"], target_path=args["target_path"], - schema=logSchema, + schemas=consumerSchemaList, job_name=args["job_name"], partition_cols=partition_cols, transformations=[flatten_df, dtype_conversion], diff --git a/terraform/account-wide-infrastructure/modules/glue/src/pipeline.py b/terraform/account-wide-infrastructure/modules/glue/src/pipeline.py index 2fb30ff91..2893c6100 100644 --- a/terraform/account-wide-infrastructure/modules/glue/src/pipeline.py +++ b/terraform/account-wide-infrastructure/modules/glue/src/pipeline.py @@ -1,5 +1,6 @@ import boto3 from instances import GlueContextSingleton, LoggerSingleton +from pyspark.sql.functions import col class LogPipeline: @@ -8,7 +9,7 @@ def __init__( spark_context, source_path, target_path, - schema, + schemas, job_name, partition_cols=[], transformations=[], @@ -19,7 +20,7 @@ def __init__( self.logger = LoggerSingleton().logger self.source_path = source_path self.target_path = target_path - self.schema = schema + self.schemas = schemas self.partition_cols = partition_cols self.transformations = transformations self.glue = boto3.client( @@ -33,11 +34,12 @@ def run(self): """Runs ETL""" try: self.logger.info("ETL Process started.") - df = self.extract() + data = self.extract() self.logger.info(f"Data extracted from {self.source_path}.") - df = self.transform(df) + for name, df in data.items(): + data[name] = self.transform(df) self.logger.info("Data transformed successfully.") - self.load(df) + self.load(data) self.logger.info(f"Data loaded into {self.target_path}.") self.logger.info("Trigger glue crawler") self.trigger_crawler() @@ -48,11 +50,14 @@ def run(self): def extract(self): """Extract JSON data from S3""" self.logger.info(f"Extracting data from {self.source_path} as JSON") - return ( - self.spark.read.option("recursiveFileLookup", "true") - .schema(self.schema) - .json(self.source_path) - ) + data = {} + for name, schema in self.schemas.items(): + data[name] = ( + self.spark.read.option("recursiveFileLookup", "true") + .schema(schema) + .json(self.source_path) + ).where(col("host").contains(name)) + return data def transform(self, dataframe): """Apply a list of transformations on the dataframe""" @@ -61,12 +66,14 @@ def transform(self, dataframe): dataframe = transformation(dataframe) return dataframe - def load(self, dataframe): + def load(self, data): """Load transformed data into Parquet format""" self.logger.info(f"Loading data into {self.target_path} as Parquet") - dataframe.write.mode("append").partitionBy(*self.partition_cols).parquet( - self.target_path - ) + for name, dataframe in data.items(): + name = name.replace("--", "_") + dataframe.write.mode("append").partitionBy(*self.partition_cols).parquet( + f"{self.target_path}{name}" + ) def trigger_crawler(self): self.glue.start_crawler(Name=f"{self.name_prefix}-log-crawler") diff --git a/terraform/account-wide-infrastructure/modules/glue/src/producer_schemas.py b/terraform/account-wide-infrastructure/modules/glue/src/producer_schemas.py new file mode 100644 index 000000000..410917925 --- /dev/null +++ b/terraform/account-wide-infrastructure/modules/glue/src/producer_schemas.py @@ -0,0 +1,2374 @@ +from pyspark.sql.types import ( + BooleanType, + DoubleType, + LongType, + StringType, + StructField, + StructType, +) + +upsertDocumentReferenceSchema = StructType( + [ + StructField("raw_message", StringType(), True), + StructField("time", DoubleType(), True), + StructField("index", StringType(), True), + StructField("host", StringType(), True), + StructField("source", StringType(), True), + StructField( + "event", + StructType( + [ + StructField("level", StringType(), True), + StructField("location", StringType(), True), + StructField("message", StringType(), True), + StructField("timestamp", StringType(), True), + StructField("service", StringType(), True), + StructField("cold_start", BooleanType(), True), + StructField("function_name", StringType(), True), + StructField("function_memory_size", StringType(), True), + StructField("function_arn", StringType(), True), + StructField("function_request_id", StringType(), True), + StructField("correlation_id", StringType(), True), + StructField("method", StringType(), True), + StructField("path", StringType(), True), + StructField( + "headers", + StructType( + [ + StructField("accept", StringType(), True), + StructField("accept-encoding", StringType(), True), + StructField("authorization", StringType(), True), + StructField("host", StringType(), True), + StructField( + "nhsd-client-rp-details", StringType(), True + ), + StructField( + "nhsd-connection-metadata", StringType(), True + ), + StructField("nhsd-correlation-id", StringType(), True), + StructField( + "nhsd-end-user-organisation-ods", StringType(), True + ), + StructField("nhsd-request-id", StringType(), True), + StructField("user-agent", StringType(), True), + StructField("x-correlation-id", StringType(), True), + StructField("x-forwarded-for", StringType(), True), + StructField("x-forwarded-port", StringType(), True), + StructField("x-forwarded-proto", StringType(), True), + StructField("x-request-id", StringType(), True), + StructField("cache-control", StringType(), True), + StructField("content-type", StringType(), True), + StructField("postman-token", StringType(), True), + ] + ), + True, + ), + StructField("log_reference", StringType(), True), + StructField("xray_trace_id", StringType(), True), + StructField( + "config", + StructType( + [ + StructField("aws_region", StringType(), True), + StructField("prefix", StringType(), True), + StructField("environment", StringType(), True), + StructField("splunk_index", StringType(), True), + StructField("source", StringType(), True), + StructField("auth_store", StringType(), True), + StructField("table_name", StringType(), True), + ] + ), + True, + ), + StructField( + "metadata", + StructType( + [ + StructField("ods_code", StringType(), True), + StructField("nrl_permissions", StringType(), True), + StructField("nrl_app_id", StringType(), True), + StructField( + "client_rp_details", + StructType( + [ + StructField( + "developer_app_name", StringType(), True + ), + StructField( + "developer_app_id", StringType(), True + ), + ] + ), + True, + ), + ] + ), + True, + ), + StructField("body", StringType(), True), + StructField("model", StringType(), True), + StructField( + "parsed_body", + StructType( + [ + StructField("resourcetype", StringType(), True), + StructField("id", StringType(), True), + StructField("status", StringType(), True), + StructField( + "type", + StructType( + [StructField("coding", StructType([]), True)] + ), + True, + ), + StructField("category", StructType([]), True), + StructField( + "subject", + StructType( + [ + StructField( + "identifier", + StructType( + [ + StructField( + "system", StringType(), True + ), + StructField( + "value", StringType(), True + ), + ] + ), + True, + ) + ] + ), + True, + ), + StructField("date", StringType(), True), + StructField("author", StructType([]), True), + StructField( + "custodian", + StructType( + [ + StructField( + "identifier", + StructType( + [ + StructField( + "system", StringType(), True + ), + StructField( + "value", StringType(), True + ), + ] + ), + True, + ) + ] + ), + True, + ), + StructField("content", StructType([]), True), + StructField( + "context", + StructType( + [ + StructField( + "period", + StructType( + [ + StructField( + "start", StringType(), True + ), + StructField( + "end", StringType(), True + ), + ] + ), + True, + ), + StructField( + "practicesetting", + StructType( + [ + StructField( + "coding", + StructType([]), + True, + ) + ] + ), + True, + ), + StructField( + "related", StructType([]), True + ), + ] + ), + True, + ), + StructField( + "masteridentifier", + StructType( + [ + StructField("system", StringType(), True), + StructField("value", StringType(), True), + ] + ), + True, + ), + ] + ), + True, + ), + StructField("table_name", StringType(), True), + StructField("item_type", StringType(), True), + StructField("original_kwargs_keys", StringType(), True), + StructField("filtered_kwargs_keys", StringType(), True), + StructField("resource", StringType(), True), + StructField("resource_type", StringType(), True), + StructField("step", StringType(), True), + StructField("required_fields", StringType(), True), + StructField("reason", StringType(), True), + StructField("is_valid", BooleanType(), True), + StructField("id", StringType(), True), + StructField("date", StringType(), True), + StructField("producer_id", StringType(), True), + StructField("document_id", StringType(), True), + StructField("custodian", StringType(), True), + StructField("type", StringType(), True), + StructField("pointer_id", StringType(), True), + StructField( + "indexes", + StructType( + [ + StructField("pk", StringType(), True), + StructField("sk", StringType(), True), + StructField("patient_key", StringType(), True), + StructField("patient_sort", StringType(), True), + StructField("masterid_key", StringType(), True), + ] + ), + True, + ), + StructField("source", StringType(), True), + StructField("version", LongType(), True), + StructField( + "result", + StructType( + [ + StructField( + "consumedcapacity", + StructType( + [ + StructField( + "tablename", StringType(), True + ), + StructField( + "capacityunits", DoubleType(), True + ), + StructField( + "table", + StructType( + [ + StructField( + "capacityunits", + DoubleType(), + True, + ) + ] + ), + True, + ), + StructField( + "globalsecondaryindexes", + StructType( + [ + StructField( + "patient_gsi", + StructType( + [ + StructField( + "capacityunits", + DoubleType(), + True, + ) + ] + ), + True, + ), + StructField( + "masterid_gsi", + StructType( + [ + StructField( + "capacityunits", + DoubleType(), + True, + ) + ] + ), + True, + ), + ] + ), + True, + ), + ] + ), + True, + ), + StructField( + "responsemetadata", + StructType( + [ + StructField( + "requestid", StringType(), True + ), + StructField( + "httpstatuscode", LongType(), True + ), + StructField( + "httpheaders", + StructType( + [ + StructField( + "server", StringType(), True + ), + StructField( + "date", StringType(), True + ), + StructField( + "content-type", + StringType(), + True, + ), + StructField( + "content-length", + StringType(), + True, + ), + StructField( + "connection", + StringType(), + True, + ), + StructField( + "x-amzn-requestid", + StringType(), + True, + ), + StructField( + "x-amz-crc32", + StringType(), + True, + ), + ] + ), + True, + ), + ] + ), + True, + ), + ] + ), + True, + ), + StructField("status_code", StringType(), True), + StructField( + "response", + StructType( + [ + StructField("statuscode", StringType(), True), + StructField("body", StringType(), True), + StructField( + "headers", + StructType( + [StructField("location", StringType(), True)] + ), + True, + ), + ] + ), + True, + ), + StructField("pointer_types", StringType(), True), + StructField("error", StringType(), True), + StructField("exception", StringType(), True), + StructField("exception_name", StringType(), True), + StructField( + "stack_trace", + StructType( + [ + StructField("type", StringType(), True), + StructField("value", StringType(), True), + StructField("module", StringType(), True), + StructField("frames", StructType([]), True), + ] + ), + True, + ), + StructField("bucket", StringType(), True), + StructField("key", StringType(), True), + StructField("ods_code", StringType(), True), + ] + ), + True, + ), + ] +) + +updateDocumentReferenceSchema = StructType( + [ + StructField("raw_message", StringType(), True), + StructField("time", DoubleType(), True), + StructField("index", StringType(), True), + StructField("host", StringType(), True), + StructField("source", StringType(), True), + StructField( + "event", + StructType( + [ + StructField("level", StringType(), True), + StructField("location", StringType(), True), + StructField("message", StringType(), True), + StructField("timestamp", StringType(), True), + StructField("service", StringType(), True), + StructField("cold_start", BooleanType(), True), + StructField("function_name", StringType(), True), + StructField("function_memory_size", StringType(), True), + StructField("function_arn", StringType(), True), + StructField("function_request_id", StringType(), True), + StructField("correlation_id", StringType(), True), + StructField("method", StringType(), True), + StructField( + "path", + StructType([StructField("id", StringType(), True)]), + True, + ), + StructField( + "headers", + StructType( + [ + StructField("accept", StringType(), True), + StructField("accept-encoding", StringType(), True), + StructField("authorization", StringType(), True), + StructField("cache-control", StringType(), True), + StructField("content-type", StringType(), True), + StructField("host", StringType(), True), + StructField( + "nhsd-client-rp-details", StringType(), True + ), + StructField( + "nhsd-connection-metadata", StringType(), True + ), + StructField("nhsd-correlation-id", StringType(), True), + StructField( + "nhsd-end-user-organisation-ods", StringType(), True + ), + StructField("nhsd-request-id", StringType(), True), + StructField("postman-token", StringType(), True), + StructField("user-agent", StringType(), True), + StructField("x-correlation-id", StringType(), True), + StructField("x-forwarded-for", StringType(), True), + StructField("x-forwarded-port", StringType(), True), + StructField("x-forwarded-proto", StringType(), True), + StructField("x-request-id", StringType(), True), + ] + ), + True, + ), + StructField("log_reference", StringType(), True), + StructField("xray_trace_id", StringType(), True), + StructField( + "config", + StructType( + [ + StructField("aws_region", StringType(), True), + StructField("prefix", StringType(), True), + StructField("environment", StringType(), True), + StructField("splunk_index", StringType(), True), + StructField("source", StringType(), True), + StructField("auth_store", StringType(), True), + StructField("table_name", StringType(), True), + ] + ), + True, + ), + StructField( + "metadata", + StructType( + [ + StructField("ods_code", StringType(), True), + StructField("nrl_app_id", StringType(), True), + StructField( + "client_rp_details", + StructType( + [ + StructField( + "developer_app_name", StringType(), True + ), + StructField( + "developer_app_id", StringType(), True + ), + ] + ), + True, + ), + ] + ), + True, + ), + StructField("pointer_types", StringType(), True), + StructField("body", StringType(), True), + StructField("model", StringType(), True), + StructField( + "parsed_body", + StructType( + [ + StructField("resourcetype", StringType(), True), + StructField("id", StringType(), True), + StructField( + "masteridentifier", + StructType( + [ + StructField("system", StringType(), True), + StructField("value", StringType(), True), + ] + ), + True, + ), + StructField("status", StringType(), True), + StructField( + "type", + StructType( + [ + StructField( + "coding", + StructType( + [ + StructField( + "id", StringType(), True + ) + ] + ), + True, + ) + ] + ), + True, + ), + StructField( + "category", + StructType([StructField("id", StringType(), True)]), + True, + ), + StructField( + "subject", + StructType( + [ + StructField( + "identifier", + StructType( + [ + StructField( + "system", StringType(), True + ), + StructField( + "value", StringType(), True + ), + ] + ), + True, + ) + ] + ), + True, + ), + StructField("date", StringType(), True), + StructField( + "author", + StructType([StructField("id", StringType(), True)]), + True, + ), + StructField( + "custodian", + StructType( + [ + StructField( + "identifier", + StructType( + [ + StructField( + "system", StringType(), True + ), + StructField( + "value", StringType(), True + ), + ] + ), + True, + ) + ] + ), + True, + ), + StructField( + "content", + StructType([StructField("id", StringType(), True)]), + True, + ), + StructField( + "context", + StructType( + [ + StructField( + "period", + StructType( + [ + StructField( + "start", StringType(), True + ) + ] + ), + True, + ), + StructField( + "practicesetting", + StructType( + [ + StructField( + "coding", + StructType( + [ + StructField( + "id", + StringType(), + True, + ) + ] + ), + True, + ) + ] + ), + True, + ), + StructField( + "related", + StructType( + [ + StructField( + "id", StringType(), True + ) + ] + ), + True, + ), + ] + ), + True, + ), + ] + ), + True, + ), + StructField( + "parsed_path", + StructType([StructField("id", StringType(), True)]), + True, + ), + StructField("table_name", StringType(), True), + StructField("item_type", StringType(), True), + StructField("original_kwargs_keys", StringType(), True), + StructField("filtered_kwargs_keys", StringType(), True), + StructField("resource_type", StringType(), True), + StructField("step", StringType(), True), + StructField("required_fields", StringType(), True), + StructField("reason", StringType(), True), + StructField("is_valid", BooleanType(), True), + StructField("producer_id", StringType(), True), + StructField("document_id", StringType(), True), + StructField("custodian", StringType(), True), + StructField("type", StringType(), True), + StructField( + "result", + StructType( + [ + StructField("id", StringType(), True), + StructField("nhs_number", StringType(), True), + StructField("custodian", StringType(), True), + StructField("producer_id", StringType(), True), + StructField("category_id", StringType(), True), + StructField("category", StringType(), True), + StructField("type_id", StringType(), True), + StructField("type", StringType(), True), + StructField("author", StringType(), True), + StructField("source", StringType(), True), + StructField("version", LongType(), True), + StructField("document", StringType(), True), + StructField("created_on", StringType(), True), + StructField("updated_on", StringType(), True), + StructField("pk", StringType(), True), + StructField("sk", StringType(), True), + StructField("patient_key", StringType(), True), + StructField("patient_sort", StringType(), True), + StructField( + "consumedcapacity", + StructType( + [ + StructField( + "tablename", StringType(), True + ), + StructField( + "capacityunits", DoubleType(), True + ), + StructField( + "table", + StructType( + [ + StructField( + "capacityunits", + DoubleType(), + True, + ) + ] + ), + True, + ), + StructField( + "globalsecondaryindexes", + StructType( + [ + StructField( + "patient_gsi", + StructType( + [ + StructField( + "capacityunits", + DoubleType(), + True, + ) + ] + ), + True, + ) + ] + ), + True, + ), + ] + ), + True, + ), + StructField( + "responsemetadata", + StructType( + [ + StructField( + "requestid", StringType(), True + ), + StructField( + "httpstatuscode", LongType(), True + ), + StructField( + "httpheaders", + StructType( + [ + StructField( + "server", StringType(), True + ), + StructField( + "date", StringType(), True + ), + StructField( + "content-type", + StringType(), + True, + ), + StructField( + "content-length", + StringType(), + True, + ), + StructField( + "connection", + StringType(), + True, + ), + StructField( + "x-amzn-requestid", + StringType(), + True, + ), + StructField( + "x-amz-crc32", + StringType(), + True, + ), + ] + ), + True, + ), + ] + ), + True, + ), + ] + ), + True, + ), + StructField("field", StringType(), True), + StructField("provided", StringType(), True), + StructField("status_code", StringType(), True), + StructField( + "response", + StructType( + [ + StructField("statuscode", StringType(), True), + StructField("body", StringType(), True), + ] + ), + True, + ), + ] + ), + True, + ), + ] +) + +searchPostDocumentReferenceSchema = StructType( + [ + StructField("raw_message", StringType(), True), + StructField("time", DoubleType(), True), + StructField("index", StringType(), True), + StructField("host", StringType(), True), + StructField("source", StringType(), True), + StructField( + "event", + StructType( + [ + StructField("level", StringType(), True), + StructField("location", StringType(), True), + StructField("message", StringType(), True), + StructField("timestamp", StringType(), True), + StructField("service", StringType(), True), + StructField("cold_start", BooleanType(), True), + StructField("function_name", StringType(), True), + StructField("function_memory_size", StringType(), True), + StructField("function_arn", StringType(), True), + StructField("function_request_id", StringType(), True), + StructField("correlation_id", StringType(), True), + StructField("method", StringType(), True), + StructField("path", StringType(), True), + StructField( + "headers", + StructType( + [ + StructField("accept", StringType(), True), + StructField("accept-encoding", StringType(), True), + StructField("authorization", StringType(), True), + StructField("cache-control", StringType(), True), + StructField("content-type", StringType(), True), + StructField("host", StringType(), True), + StructField( + "nhsd-client-rp-details", StringType(), True + ), + StructField( + "nhsd-connection-metadata", StringType(), True + ), + StructField("nhsd-correlation-id", StringType(), True), + StructField( + "nhsd-end-user-organisation-ods", StringType(), True + ), + StructField("nhsd-request-id", StringType(), True), + StructField("postman-token", StringType(), True), + StructField("user-agent", StringType(), True), + StructField("x-correlation-id", StringType(), True), + StructField("x-forwarded-for", StringType(), True), + StructField("x-forwarded-port", StringType(), True), + StructField("x-forwarded-proto", StringType(), True), + StructField("x-request-id", StringType(), True), + ] + ), + True, + ), + StructField("log_reference", StringType(), True), + StructField("xray_trace_id", StringType(), True), + StructField( + "config", + StructType( + [ + StructField("aws_region", StringType(), True), + StructField("prefix", StringType(), True), + StructField("environment", StringType(), True), + StructField("splunk_index", StringType(), True), + StructField("source", StringType(), True), + StructField("auth_store", StringType(), True), + StructField("table_name", StringType(), True), + ] + ), + True, + ), + StructField( + "metadata", + StructType( + [ + StructField("ods_code", StringType(), True), + StructField("nrl_app_id", StringType(), True), + StructField( + "client_rp_details", + StructType( + [ + StructField( + "developer_app_name", StringType(), True + ), + StructField( + "developer_app_id", StringType(), True + ), + ] + ), + True, + ), + ] + ), + True, + ), + StructField("pointer_types", StringType(), True), + StructField("body", StringType(), True), + StructField("model", StringType(), True), + StructField( + "parsed_body", + StructType( + [ + StructField("subject_identifier", StringType(), True), + StructField("type", StringType(), True), + ] + ), + True, + ), + StructField("table_name", StringType(), True), + StructField("item_type", StringType(), True), + StructField("original_kwargs_keys", StringType(), True), + StructField("filtered_kwargs_keys", StringType(), True), + StructField("custodian", StringType(), True), + StructField("nhs_number", StringType(), True), + StructField("expression", StringType(), True), + StructField("values", StringType(), True), + StructField( + "query", + StructType( + [ + StructField("indexname", StringType(), True), + StructField( + "keyconditionexpression", StringType(), True + ), + StructField( + "expressionattributevalues", + StructType( + [ + StructField( + ":patient_key", StringType(), True + ), + StructField(":type_0", StringType(), True), + StructField(":type_1", StringType(), True), + StructField(":type_2", StringType(), True), + StructField(":type_3", StringType(), True), + StructField(":type_4", StringType(), True), + StructField(":type_5", StringType(), True), + StructField(":type_6", StringType(), True), + StructField(":type_7", StringType(), True), + StructField(":type_8", StringType(), True), + StructField(":type_9", StringType(), True), + StructField(":type_10", StringType(), True), + StructField(":type_11", StringType(), True), + StructField(":type_12", StringType(), True), + StructField( + ":custodian", StringType(), True + ), + StructField( + ":patient_sort", StringType(), True + ), + ] + ), + True, + ), + StructField( + "returnconsumedcapacity", StringType(), True + ), + StructField("filterexpression", StringType(), True), + StructField( + "expressionattributenames", + StructType( + [ + StructField( + "#pointer_type", StringType(), True + ) + ] + ), + True, + ), + ] + ), + True, + ), + StructField("table", StringType(), True), + StructField( + "stats", + StructType( + [ + StructField("count", LongType(), True), + StructField("scanned_count", LongType(), True), + ] + ), + True, + ), + StructField( + "result", + StructType( + [ + StructField("items", StructType([]), True), + StructField("count", LongType(), True), + StructField("scannedcount", LongType(), True), + StructField( + "consumedcapacity", + StructType( + [ + StructField( + "tablename", StringType(), True + ), + StructField( + "capacityunits", DoubleType(), True + ), + StructField("table", StructType([]), True), + StructField( + "globalsecondaryindexes", + StructType( + [ + StructField( + "patient_gsi", + StructType( + [ + StructField( + "capacityunits", + DoubleType(), + True, + ) + ] + ), + True, + ) + ] + ), + True, + ), + ] + ), + True, + ), + StructField( + "responsemetadata", + StructType( + [ + StructField( + "requestid", StringType(), True + ), + StructField( + "httpstatuscode", LongType(), True + ), + StructField( + "httpheaders", + StructType( + [ + StructField( + "server", StringType(), True + ), + StructField( + "date", StringType(), True + ), + StructField( + "content-type", + StringType(), + True, + ), + StructField( + "content-length", + StringType(), + True, + ), + StructField( + "connection", + StringType(), + True, + ), + StructField( + "x-amzn-requestid", + StringType(), + True, + ), + StructField( + "x-amz-crc32", + StringType(), + True, + ), + ] + ), + True, + ), + ] + ), + True, + ), + ] + ), + True, + ), + StructField("producer_id", StringType(), True), + StructField("document_id", StringType(), True), + StructField("id", StringType(), True), + StructField("count", LongType(), True), + StructField("status_code", StringType(), True), + StructField( + "response", + StructType( + [ + StructField("statuscode", StringType(), True), + StructField("body", StringType(), True), + ] + ), + True, + ), + StructField("error", StringType(), True), + StructField("exception", StringType(), True), + StructField("exception_name", StringType(), True), + StructField( + "stack_trace", + StructType( + [ + StructField("type", StringType(), True), + StructField("value", StringType(), True), + StructField("module", StringType(), True), + StructField("frames", StructType([]), True), + ] + ), + True, + ), + StructField("bucket", StringType(), True), + StructField("key", StringType(), True), + StructField("ods_code", StringType(), True), + ] + ), + True, + ), + ] +) + +searchDocumentReferenceSchema = StructType( + [ + StructField("raw_message", StringType(), True), + StructField("time", DoubleType(), True), + StructField("index", StringType(), True), + StructField("host", StringType(), True), + StructField("source", StringType(), True), + StructField( + "event", + StructType( + [ + StructField("level", StringType(), True), + StructField("location", StringType(), True), + StructField("message", StringType(), True), + StructField("timestamp", StringType(), True), + StructField("service", StringType(), True), + StructField("cold_start", BooleanType(), True), + StructField("function_name", StringType(), True), + StructField("function_memory_size", StringType(), True), + StructField("function_arn", StringType(), True), + StructField("function_request_id", StringType(), True), + StructField("correlation_id", StringType(), True), + StructField("method", StringType(), True), + StructField("path", StringType(), True), + StructField( + "headers", + StructType( + [ + StructField("accept", StringType(), True), + StructField("accept-encoding", StringType(), True), + StructField("authorization", StringType(), True), + StructField("cache-control", StringType(), True), + StructField("host", StringType(), True), + StructField( + "nhsd-client-rp-details", StringType(), True + ), + StructField( + "nhsd-connection-metadata", StringType(), True + ), + StructField("nhsd-correlation-id", StringType(), True), + StructField( + "nhsd-end-user-organisation-ods", StringType(), True + ), + StructField("nhsd-request-id", StringType(), True), + StructField("postman-token", StringType(), True), + StructField("user-agent", StringType(), True), + StructField("x-correlation-id", StringType(), True), + StructField("x-forwarded-for", StringType(), True), + StructField("x-forwarded-port", StringType(), True), + StructField("x-forwarded-proto", StringType(), True), + StructField("x-request-id", StringType(), True), + ] + ), + True, + ), + StructField("log_reference", StringType(), True), + StructField("xray_trace_id", StringType(), True), + StructField( + "config", + StructType( + [ + StructField("aws_region", StringType(), True), + StructField("prefix", StringType(), True), + StructField("environment", StringType(), True), + StructField("splunk_index", StringType(), True), + StructField("source", StringType(), True), + StructField("auth_store", StringType(), True), + StructField("table_name", StringType(), True), + ] + ), + True, + ), + StructField( + "metadata", + StructType( + [ + StructField("ods_code", StringType(), True), + StructField("nrl_app_id", StringType(), True), + StructField( + "client_rp_details", + StructType( + [ + StructField( + "developer_app_name", StringType(), True + ), + StructField( + "developer_app_id", StringType(), True + ), + ] + ), + True, + ), + ] + ), + True, + ), + StructField("pointer_types", StringType(), True), + StructField( + "params", + StructType( + [ + StructField("subject:identifier", StringType(), True), + StructField("type", StringType(), True), + ] + ), + True, + ), + StructField("model", StringType(), True), + StructField( + "parsed_params", + StructType( + [ + StructField("subject_identifier", StringType(), True), + StructField("type", StringType(), True), + ] + ), + True, + ), + StructField("table_name", StringType(), True), + StructField("item_type", StringType(), True), + StructField("original_kwargs_keys", StringType(), True), + StructField("filtered_kwargs_keys", StringType(), True), + StructField("custodian", StringType(), True), + StructField("nhs_number", StringType(), True), + StructField("expression", StringType(), True), + StructField("values", StringType(), True), + StructField( + "query", + StructType( + [ + StructField("indexname", StringType(), True), + StructField( + "keyconditionexpression", StringType(), True + ), + StructField( + "expressionattributevalues", + StructType( + [ + StructField( + ":patient_key", StringType(), True + ), + StructField(":type_0", StringType(), True), + StructField(":type_1", StringType(), True), + StructField(":type_2", StringType(), True), + StructField(":type_3", StringType(), True), + StructField( + ":custodian", StringType(), True + ), + StructField( + ":patient_sort", StringType(), True + ), + StructField(":type_4", StringType(), True), + StructField(":type_5", StringType(), True), + StructField(":type_6", StringType(), True), + StructField(":type_7", StringType(), True), + StructField(":type_8", StringType(), True), + StructField(":type_9", StringType(), True), + StructField(":type_10", StringType(), True), + StructField(":type_11", StringType(), True), + StructField(":type_12", StringType(), True), + ] + ), + True, + ), + StructField( + "returnconsumedcapacity", StringType(), True + ), + StructField("filterexpression", StringType(), True), + StructField( + "expressionattributenames", + StructType( + [ + StructField( + "#pointer_type", StringType(), True + ) + ] + ), + True, + ), + ] + ), + True, + ), + StructField("table", StringType(), True), + StructField( + "stats", + StructType( + [ + StructField("count", LongType(), True), + StructField("scanned_count", LongType(), True), + ] + ), + True, + ), + StructField( + "result", + StructType( + [ + StructField("items", StructType([]), True), + StructField("count", LongType(), True), + StructField("scannedcount", LongType(), True), + StructField( + "consumedcapacity", + StructType( + [ + StructField( + "tablename", StringType(), True + ), + StructField( + "capacityunits", DoubleType(), True + ), + StructField("table", StructType([]), True), + StructField( + "globalsecondaryindexes", + StructType( + [ + StructField( + "patient_gsi", + StructType( + [ + StructField( + "capacityunits", + DoubleType(), + True, + ) + ] + ), + True, + ) + ] + ), + True, + ), + ] + ), + True, + ), + StructField( + "responsemetadata", + StructType( + [ + StructField( + "requestid", StringType(), True + ), + StructField( + "httpstatuscode", LongType(), True + ), + StructField( + "httpheaders", + StructType( + [ + StructField( + "server", StringType(), True + ), + StructField( + "date", StringType(), True + ), + StructField( + "content-type", + StringType(), + True, + ), + StructField( + "content-length", + StringType(), + True, + ), + StructField( + "connection", + StringType(), + True, + ), + StructField( + "x-amzn-requestid", + StringType(), + True, + ), + StructField( + "x-amz-crc32", + StringType(), + True, + ), + ] + ), + True, + ), + ] + ), + True, + ), + ] + ), + True, + ), + StructField("producer_id", StringType(), True), + StructField("document_id", StringType(), True), + StructField("id", StringType(), True), + StructField("count", LongType(), True), + StructField("status_code", StringType(), True), + StructField( + "response", + StructType( + [ + StructField("statuscode", StringType(), True), + StructField("body", StringType(), True), + ] + ), + True, + ), + StructField("type", StringType(), True), + StructField("error", StringType(), True), + StructField("exception", StringType(), True), + StructField("exception_name", StringType(), True), + StructField( + "stack_trace", + StructType( + [ + StructField("type", StringType(), True), + StructField("value", StringType(), True), + StructField("module", StringType(), True), + StructField("frames", StructType([]), True), + ] + ), + True, + ), + StructField("bucket", StringType(), True), + StructField("key", StringType(), True), + StructField("ods_code", StringType(), True), + ] + ), + True, + ), + ] +) + +readDocumentReferenceSchema = StructType( + [ + StructField("raw_message", StringType(), True), + StructField("time", DoubleType(), True), + StructField("index", StringType(), True), + StructField("host", StringType(), True), + StructField("source", StringType(), True), + StructField( + "event", + StructType( + [ + StructField("level", StringType(), True), + StructField("location", StringType(), True), + StructField("message", StringType(), True), + StructField("timestamp", StringType(), True), + StructField("service", StringType(), True), + StructField("cold_start", BooleanType(), True), + StructField("function_name", StringType(), True), + StructField("function_memory_size", StringType(), True), + StructField("function_arn", StringType(), True), + StructField("function_request_id", StringType(), True), + StructField("correlation_id", StringType(), True), + StructField("method", StringType(), True), + StructField( + "path", + StructType([StructField("id", StringType(), True)]), + True, + ), + StructField( + "headers", + StructType( + [ + StructField("accept", StringType(), True), + StructField("accept-encoding", StringType(), True), + StructField("authorization", StringType(), True), + StructField("cache-control", StringType(), True), + StructField("host", StringType(), True), + StructField( + "nhsd-client-rp-details", StringType(), True + ), + StructField( + "nhsd-connection-metadata", StringType(), True + ), + StructField("nhsd-correlation-id", StringType(), True), + StructField( + "nhsd-end-user-organisation-ods", StringType(), True + ), + StructField("nhsd-request-id", StringType(), True), + StructField("postman-token", StringType(), True), + StructField("user-agent", StringType(), True), + StructField("x-correlation-id", StringType(), True), + StructField("x-forwarded-for", StringType(), True), + StructField("x-forwarded-port", StringType(), True), + StructField("x-forwarded-proto", StringType(), True), + StructField("x-request-id", StringType(), True), + ] + ), + True, + ), + StructField("log_reference", StringType(), True), + StructField("xray_trace_id", StringType(), True), + StructField( + "config", + StructType( + [ + StructField("aws_region", StringType(), True), + StructField("prefix", StringType(), True), + StructField("environment", StringType(), True), + StructField("splunk_index", StringType(), True), + StructField("source", StringType(), True), + StructField("auth_store", StringType(), True), + StructField("table_name", StringType(), True), + ] + ), + True, + ), + StructField( + "metadata", + StructType( + [ + StructField("ods_code", StringType(), True), + StructField("nrl_app_id", StringType(), True), + StructField( + "client_rp_details", + StructType( + [ + StructField( + "developer_app_name", StringType(), True + ), + StructField( + "developer_app_id", StringType(), True + ), + ] + ), + True, + ), + ] + ), + True, + ), + StructField("pointer_types", StringType(), True), + StructField("model", StringType(), True), + StructField( + "parsed_path", + StructType([StructField("id", StringType(), True)]), + True, + ), + StructField("table_name", StringType(), True), + StructField("item_type", StringType(), True), + StructField("original_kwargs_keys", StringType(), True), + StructField("filtered_kwargs_keys", StringType(), True), + StructField("status_code", StringType(), True), + StructField( + "response", + StructType( + [ + StructField("statuscode", StringType(), True), + StructField("body", StringType(), True), + ] + ), + True, + ), + StructField("producer_id", StringType(), True), + StructField("document_id", StringType(), True), + StructField("custodian", StringType(), True), + StructField( + "result", + StructType( + [ + StructField("id", StringType(), True), + StructField("nhs_number", StringType(), True), + StructField("custodian", StringType(), True), + StructField("producer_id", StringType(), True), + StructField("category_id", StringType(), True), + StructField("category", StringType(), True), + StructField("type_id", StringType(), True), + StructField("type", StringType(), True), + StructField("author", StringType(), True), + StructField("source", StringType(), True), + StructField("version", LongType(), True), + StructField("document", StringType(), True), + StructField("created_on", StringType(), True), + StructField("pk", StringType(), True), + StructField("sk", StringType(), True), + StructField("patient_key", StringType(), True), + StructField("patient_sort", StringType(), True), + ] + ), + True, + ), + StructField("exception", StringType(), True), + StructField("exception_name", StringType(), True), + StructField( + "stack_trace", + StructType( + [ + StructField("type", StringType(), True), + StructField("value", StringType(), True), + StructField("module", StringType(), True), + StructField( + "frames", + StructType([StructField("id", StringType(), True)]), + True, + ), + ] + ), + True, + ), + StructField("ods_code_parts", StringType(), True), + ] + ), + True, + ), + ] +) + +deleteDocumentReferenceSchema = StructType( + [ + StructField("raw_message", StringType(), True), + StructField("time", DoubleType(), True), + StructField("index", StringType(), True), + StructField("host", StringType(), True), + StructField("source", StringType(), True), + StructField( + "event", + StructType( + [ + StructField("level", StringType(), True), + StructField("location", StringType(), True), + StructField("message", StringType(), True), + StructField("timestamp", StringType(), True), + StructField("service", StringType(), True), + StructField("cold_start", BooleanType(), True), + StructField("function_name", StringType(), True), + StructField("function_memory_size", StringType(), True), + StructField("function_arn", StringType(), True), + StructField("function_request_id", StringType(), True), + StructField("correlation_id", StringType(), True), + StructField("method", StringType(), True), + StructField( + "path", + StructType([StructField("id", StringType(), True)]), + True, + ), + StructField( + "headers", + StructType( + [ + StructField("accept", StringType(), True), + StructField("accept-encoding", StringType(), True), + StructField("authorization", StringType(), True), + StructField("host", StringType(), True), + StructField( + "nhsd-client-rp-details", StringType(), True + ), + StructField( + "nhsd-connection-metadata", StringType(), True + ), + StructField("nhsd-correlation-id", StringType(), True), + StructField("user-agent", StringType(), True), + StructField("x-forwarded-for", StringType(), True), + StructField("x-request-id", StringType(), True), + StructField( + "nhsd-end-user-organisation-ods", StringType(), True + ), + StructField("nhsd-request-id", StringType(), True), + StructField("x-forwarded-port", StringType(), True), + StructField("x-forwarded-proto", StringType(), True), + StructField("cache-control", StringType(), True), + StructField("postman-token", StringType(), True), + StructField("x-correlation-id", StringType(), True), + ] + ), + True, + ), + StructField("log_reference", StringType(), True), + StructField("xray_trace_id", StringType(), True), + StructField( + "config", + StructType( + [ + StructField("aws_region", StringType(), True), + StructField("prefix", StringType(), True), + StructField("environment", StringType(), True), + StructField("splunk_index", StringType(), True), + StructField("source", StringType(), True), + StructField("auth_store", StringType(), True), + StructField("table_name", StringType(), True), + ] + ), + True, + ), + StructField( + "metadata", + StructType( + [ + StructField("ods_code", StringType(), True), + StructField("nrl_app_id", StringType(), True), + StructField( + "client_rp_details", + StructType( + [ + StructField( + "developer_app_name", StringType(), True + ), + StructField( + "developer_app_id", StringType(), True + ), + ] + ), + True, + ), + ] + ), + True, + ), + StructField("pointer_types", StringType(), True), + StructField("model", StringType(), True), + StructField( + "parsed_path", + StructType([StructField("id", StringType(), True)]), + True, + ), + StructField("table_name", StringType(), True), + StructField("item_type", StringType(), True), + StructField("original_kwargs_keys", StringType(), True), + StructField("filtered_kwargs_keys", StringType(), True), + StructField("producer_id", StringType(), True), + StructField("document_id", StringType(), True), + StructField("custodian", StringType(), True), + StructField( + "result", + StructType( + [ + StructField("id", StringType(), True), + StructField("nhs_number", StringType(), True), + StructField("custodian", StringType(), True), + StructField("producer_id", StringType(), True), + StructField("category_id", StringType(), True), + StructField("category", StringType(), True), + StructField("type_id", StringType(), True), + StructField("type", StringType(), True), + StructField("author", StringType(), True), + StructField("source", StringType(), True), + StructField("version", LongType(), True), + StructField("document", StringType(), True), + StructField("created_on", StringType(), True), + StructField("pk", StringType(), True), + StructField("sk", StringType(), True), + StructField("patient_key", StringType(), True), + StructField("patient_sort", StringType(), True), + StructField( + "consumedcapacity", + StructType( + [ + StructField( + "tablename", StringType(), True + ), + StructField( + "capacityunits", DoubleType(), True + ), + StructField( + "table", + StructType( + [ + StructField( + "capacityunits", + DoubleType(), + True, + ) + ] + ), + True, + ), + StructField( + "globalsecondaryindexes", + StructType( + [ + StructField( + "patient_gsi", + StructType( + [ + StructField( + "capacityunits", + DoubleType(), + True, + ) + ] + ), + True, + ) + ] + ), + True, + ), + ] + ), + True, + ), + StructField( + "responsemetadata", + StructType( + [ + StructField( + "requestid", StringType(), True + ), + StructField( + "httpstatuscode", LongType(), True + ), + StructField( + "httpheaders", + StructType( + [ + StructField( + "server", StringType(), True + ), + StructField( + "date", StringType(), True + ), + StructField( + "content-type", + StringType(), + True, + ), + StructField( + "content-length", + StringType(), + True, + ), + StructField( + "connection", + StringType(), + True, + ), + StructField( + "x-amzn-requestid", + StringType(), + True, + ), + StructField( + "x-amz-crc32", + StringType(), + True, + ), + ] + ), + True, + ), + ] + ), + True, + ), + StructField("updated_on", StringType(), True), + ] + ), + True, + ), + StructField("partition_key", StringType(), True), + StructField("sort_key", StringType(), True), + StructField("status_code", StringType(), True), + StructField( + "response", + StructType( + [ + StructField("statuscode", StringType(), True), + StructField("body", StringType(), True), + ] + ), + True, + ), + StructField("pointer_id", StringType(), True), + ] + ), + True, + ), + ] +) + +createDocumentReferenceSchema = StructType( + [ + StructField("raw_message", StringType(), True), + StructField("time", DoubleType(), True), + StructField("index", StringType(), True), + StructField("host", StringType(), True), + StructField("source", StringType(), True), + StructField( + "event", + StructType( + [ + StructField("level", StringType(), True), + StructField("location", StringType(), True), + StructField("message", StringType(), True), + StructField("timestamp", StringType(), True), + StructField("service", StringType(), True), + StructField("cold_start", BooleanType(), True), + StructField("function_name", StringType(), True), + StructField("function_memory_size", StringType(), True), + StructField("function_arn", StringType(), True), + StructField("function_request_id", StringType(), True), + StructField("correlation_id", StringType(), True), + StructField("method", StringType(), True), + StructField("path", StringType(), True), + StructField( + "headers", + StructType( + [ + StructField("accept", StringType(), True), + StructField("accept-encoding", StringType(), True), + StructField("authorization", StringType(), True), + StructField("cache-control", StringType(), True), + StructField("content-type", StringType(), True), + StructField("host", StringType(), True), + StructField( + "nhsd-client-rp-details", StringType(), True + ), + StructField( + "nhsd-connection-metadata", StringType(), True + ), + StructField("nhsd-correlation-id", StringType(), True), + StructField( + "nhsd-end-user-organisation-ods", StringType(), True + ), + StructField("nhsd-request-id", StringType(), True), + StructField("postman-token", StringType(), True), + StructField("user-agent", StringType(), True), + StructField("x-correlation-id", StringType(), True), + StructField("x-forwarded-for", StringType(), True), + StructField("x-forwarded-port", StringType(), True), + StructField("x-forwarded-proto", StringType(), True), + StructField("x-request-id", StringType(), True), + ] + ), + True, + ), + StructField("log_reference", StringType(), True), + StructField("xray_trace_id", StringType(), True), + StructField( + "config", + StructType( + [ + StructField("aws_region", StringType(), True), + StructField("prefix", StringType(), True), + StructField("environment", StringType(), True), + StructField("splunk_index", StringType(), True), + StructField("source", StringType(), True), + StructField("auth_store", StringType(), True), + StructField("table_name", StringType(), True), + ] + ), + True, + ), + StructField( + "metadata", + StructType( + [ + StructField("ods_code", StringType(), True), + StructField("nrl_app_id", StringType(), True), + StructField( + "client_rp_details", + StructType( + [ + StructField( + "developer_app_name", StringType(), True + ), + StructField( + "developer_app_id", StringType(), True + ), + ] + ), + True, + ), + ] + ), + True, + ), + StructField("pointer_types", StringType(), True), + StructField("body", StringType(), True), + StructField("model", StringType(), True), + StructField( + "parsed_body", + StructType( + [ + StructField("resourcetype", StringType(), True), + StructField("status", StringType(), True), + StructField( + "type", + StructType( + [StructField("coding", StructType([]), True)] + ), + True, + ), + StructField("category", StructType([]), True), + StructField( + "subject", + StructType( + [ + StructField( + "identifier", + StructType( + [ + StructField( + "system", StringType(), True + ), + StructField( + "value", StringType(), True + ), + ] + ), + True, + ) + ] + ), + True, + ), + StructField("date", StringType(), True), + StructField("author", StructType([]), True), + StructField( + "custodian", + StructType( + [ + StructField( + "identifier", + StructType( + [ + StructField( + "system", StringType(), True + ), + StructField( + "value", StringType(), True + ), + ] + ), + True, + ) + ] + ), + True, + ), + StructField("content", StructType([]), True), + StructField( + "context", + StructType( + [ + StructField( + "period", + StructType( + [ + StructField( + "start", StringType(), True + ), + StructField( + "end", StringType(), True + ), + ] + ), + True, + ), + StructField( + "practicesetting", + StructType( + [ + StructField( + "coding", + StructType([]), + True, + ) + ] + ), + True, + ), + StructField( + "related", StructType([]), True + ), + StructField("event", StructType([]), True), + ] + ), + True, + ), + StructField("relatesto", StructType([]), True), + StructField("id", StringType(), True), + StructField( + "meta", + StructType( + [StructField("lastupdated", StringType(), True)] + ), + True, + ), + StructField( + "masteridentifier", + StructType( + [ + StructField("system", StringType(), True), + StructField("value", StringType(), True), + ] + ), + True, + ), + StructField("identifier", StructType([]), True), + ] + ), + True, + ), + StructField("table_name", StringType(), True), + StructField("item_type", StringType(), True), + StructField("original_kwargs_keys", StringType(), True), + StructField("filtered_kwargs_keys", StringType(), True), + StructField("resource", StringType(), True), + StructField("resource_type", StringType(), True), + StructField("step", StringType(), True), + StructField("required_fields", StringType(), True), + StructField("reason", StringType(), True), + StructField("is_valid", BooleanType(), True), + StructField("producer_id", StringType(), True), + StructField("document_id", StringType(), True), + StructField("custodian", StringType(), True), + StructField("type", StringType(), True), + StructField("pointer_id", StringType(), True), + StructField( + "indexes", + StructType( + [ + StructField("pk", StringType(), True), + StructField("sk", StringType(), True), + StructField("patient_key", StringType(), True), + StructField("patient_sort", StringType(), True), + StructField("masterid_key", StringType(), True), + ] + ), + True, + ), + StructField("source", StringType(), True), + StructField("version", LongType(), True), + StructField( + "result", + StructType( + [ + StructField( + "consumedcapacity", + StructType( + [ + StructField( + "tablename", StringType(), True + ), + StructField( + "capacityunits", DoubleType(), True + ), + StructField( + "table", + StructType( + [ + StructField( + "capacityunits", + DoubleType(), + True, + ) + ] + ), + True, + ), + StructField( + "globalsecondaryindexes", + StructType( + [ + StructField( + "patient_gsi", + StructType( + [ + StructField( + "capacityunits", + DoubleType(), + True, + ) + ] + ), + True, + ), + StructField( + "masterid_gsi", + StructType( + [ + StructField( + "capacityunits", + DoubleType(), + True, + ) + ] + ), + True, + ), + ] + ), + True, + ), + ] + ), + True, + ), + StructField( + "responsemetadata", + StructType( + [ + StructField( + "requestid", StringType(), True + ), + StructField( + "httpstatuscode", LongType(), True + ), + StructField( + "httpheaders", + StructType( + [ + StructField( + "server", StringType(), True + ), + StructField( + "date", StringType(), True + ), + StructField( + "content-type", + StringType(), + True, + ), + StructField( + "content-length", + StringType(), + True, + ), + StructField( + "connection", + StringType(), + True, + ), + StructField( + "x-amzn-requestid", + StringType(), + True, + ), + StructField( + "x-amz-crc32", + StringType(), + True, + ), + ] + ), + True, + ), + ] + ), + True, + ), + ] + ), + True, + ), + StructField("status_code", StringType(), True), + StructField( + "response", + StructType( + [ + StructField("statuscode", StringType(), True), + StructField("body", StringType(), True), + StructField( + "headers", + StructType( + [StructField("location", StringType(), True)] + ), + True, + ), + ] + ), + True, + ), + StructField("error", StringType(), True), + StructField( + "issue", + StructType( + [ + StructField("severity", StringType(), True), + StructField("code", StringType(), True), + StructField( + "details", + StructType( + [StructField("coding", StructType([]), True)] + ), + True, + ), + StructField("diagnostics", StringType(), True), + StructField("expression", StringType(), True), + ] + ), + True, + ), + StructField("issue_count", LongType(), True), + StructField("ods_code", StringType(), True), + StructField("relatesto", StringType(), True), + StructField("related_identifier", StringType(), True), + ] + ), + True, + ), + ] +) + +producerSchemaList = { + "producer--searchPostDocumentReference": searchPostDocumentReferenceSchema, + "producer--searchDocumentReference": searchDocumentReferenceSchema, + "producer--readDocumentReference": readDocumentReferenceSchema, + "producer--upsertDocumentReference": upsertDocumentReferenceSchema, + "producer--updateDocumentReference": updateDocumentReferenceSchema, + "producer--deleteDocumentReference": deleteDocumentReferenceSchema, + "producer--createDocumentReference": createDocumentReferenceSchema, +} diff --git a/terraform/account-wide-infrastructure/modules/glue/src/transformations.py b/terraform/account-wide-infrastructure/modules/glue/src/transformations.py index e4d7b8f1d..eb39d6a4f 100644 --- a/terraform/account-wide-infrastructure/modules/glue/src/transformations.py +++ b/terraform/account-wide-infrastructure/modules/glue/src/transformations.py @@ -5,296 +5,7 @@ to_date, to_timestamp, ) -from pyspark.sql.types import ( - BooleanType, - DoubleType, - LongType, - StringType, - StructField, - StructType, -) - -logSchema = StructType( - [ - StructField("time", DoubleType(), True), - StructField("index", StringType(), True), - StructField("host", StringType(), True), - StructField("source", StringType(), True), - StructField( - "event", - StructType( - [ - StructField("level", StringType(), True), - StructField("location", StringType(), True), - StructField("message", StringType(), True), - StructField("timestamp", StringType(), True), - StructField("service", StringType(), True), - StructField("cold_start", BooleanType(), True), - StructField("function_name", StringType(), True), - StructField("function_memory_size", StringType(), True), - StructField("function_arn", StringType(), True), - StructField("function_request_id", StringType(), True), - StructField("correlation_id", StringType(), True), - StructField("method", StringType(), True), - StructField("path", StringType(), True), - StructField( - "headers", - StructType( - [ - StructField("accept", StringType(), True), - StructField("accept-encoding", StringType(), True), - StructField("Authorization", StringType(), True), - StructField("Host", StringType(), True), - StructField( - "NHSD-Client-RP-Details", StringType(), True - ), - StructField( - "NHSD-Connection-Metadata", StringType(), True - ), - StructField("NHSD-Correlation-Id", StringType(), True), - StructField("User-Agent", StringType(), True), - StructField("X-Forwarded-For", StringType(), True), - StructField("X-Request-Id", StringType(), True), - StructField( - "NHSD-End-User-Organisation-ODS", StringType(), True - ), - StructField("NHSD-Request-ID", StringType(), True), - StructField("X-Forwarded-Port", StringType(), True), - StructField("X-Forwarded-Proto", StringType(), True), - ] - ), - True, - ), - StructField("log_reference", StringType(), True), - StructField("xray_trace_id", StringType(), True), - StructField( - "config", - StructType( - [ - StructField("AWS_REGION", StringType(), True), - StructField("PREFIX", StringType(), True), - StructField("ENVIRONMENT", StringType(), True), - StructField("SPLUNK_INDEX", StringType(), True), - StructField("SOURCE", StringType(), True), - StructField("AUTH_STORE", StringType(), True), - StructField("TABLE_NAME", StringType(), True), - ] - ), - True, - ), - StructField( - "metadata", - StructType( - [ - StructField("ods_code", StringType(), True), - StructField("ods_code_extension", StringType(), True), - StructField("nrl_app_id", StringType(), True), - StructField("is_test_event", BooleanType(), True), - StructField( - "client_rp_details", - StructType( - [ - StructField( - "developer_app_name", StringType(), True - ), - StructField( - "developer_app_id", StringType(), True - ), - ] - ), - True, - ), - ] - ), - True, - ), - StructField( - "params", - StructType( - [ - StructField("subject:identifier", StringType(), True), - ] - ), - True, - ), - StructField("model", StringType(), True), - StructField( - "parsed_params", - StructType( - [ - StructField("subject_identifier", StringType(), True), - ] - ), - True, - ), - StructField("table_name", StringType(), True), - StructField("item_type", StringType(), True), - StructField("original_kwargs_keys", StringType(), True), - StructField("filtered_kwargs_keys", StringType(), True), - StructField("nhs_number", StringType(), True), - StructField( - "query", - StructType( - [ - StructField("IndexName", StringType(), True), - StructField( - "KeyConditionExpression", StringType(), True - ), - StructField( - "ExpressionAttributeValues", - StructType( - [ - StructField( - ":patient_key", StringType(), True - ), - StructField( - ":patient_sort", StringType(), True - ), - ] - ), - True, - ), - StructField("Select", StringType(), True), - StructField( - "ReturnConsumedCapacity", StringType(), True - ), - ] - ), - True, - ), - StructField("count", LongType(), True), - StructField( - "result", - StructType( - [ - StructField("Count", LongType(), True), - StructField("ScannedCount", LongType(), True), - StructField( - "ConsumedCapacity", - StructType( - [ - StructField( - "TableName", StringType(), True - ), - StructField( - "CapacityUnits", DoubleType(), True - ), - StructField( - "Table", - StructType( - [ - StructField( - "CapacityUnits", - DoubleType(), - True, - ), - ] - ), - True, - ), - StructField( - "GlobalSecondaryIndexes", - StructType( - [ - StructField( - "patient_gsi", - StructType( - [ - StructField( - "CapacityUnits", - DoubleType(), - True, - ), - ] - ), - True, - ), - ] - ), - True, - ), - ] - ), - True, - ), - StructField( - "ResponseMetadata", - StructType( - [ - StructField( - "RequestId", StringType(), True - ), - StructField( - "HTTPStatusCode", LongType(), True - ), - StructField( - "HTTPHeaders", - StructType( - [ - StructField( - "server", StringType(), True - ), - StructField( - "date", StringType(), True - ), - StructField( - "content-type", - StringType(), - True, - ), - StructField( - "content-length", - StringType(), - True, - ), - StructField( - "connection", - StringType(), - True, - ), - StructField( - "x-amzn-requestid", - StringType(), - True, - ), - StructField( - "x-amz-crc32", - StringType(), - True, - ), - ] - ), - True, - ), - StructField( - "RetryAttempts", LongType(), True - ), - ] - ), - True, - ), - ] - ), - True, - ), - StructField("status_code", StringType(), True), - StructField( - "response", - StructType( - [ - StructField("statusCode", StringType(), True), - StructField("body", StringType(), True), - StructField("isBase64Encoded", BooleanType(), True), - ] - ), - True, - ), - ] - ), - True, - ), - ] -) +from pyspark.sql.types import StructType def flatten_df(df):