diff --git a/Makefile b/Makefile index b83511a..4299c1c 100644 --- a/Makefile +++ b/Makefile @@ -6,7 +6,9 @@ PLUGIN_BINARY = $(PLUGIN_DIR)/tailpipe-plugin-gcp.plugin VERSION_JSON = $(PLUGIN_DIR)/version.json VERSIONS_JSON = $(TAILPIPE_INSTALL_DIR)/plugins/versions.json +.PHONY: install + install: go build -o $(PLUGIN_BINARY) -tags "${BUILD_TAGS}" *.go $(PLUGIN_BINARY) metadata > $(VERSION_JSON) - rm -f $(VERSIONS_JSON) \ No newline at end of file + rm -f $(VERSIONS_JSON) diff --git a/docs/sources/gcp_cloud_logging_api.md b/docs/sources/gcp_cloud_logging_api.md new file mode 100644 index 0000000..d92b950 --- /dev/null +++ b/docs/sources/gcp_cloud_logging_api.md @@ -0,0 +1,40 @@ +--- +title: "Source: gcp_cloud_logging_api - Collect logs from GCP Cloud Logging API" +description: "Allows users to collect logs from Google Cloud Platform (GCP) Cloud Logging API." +--- + +# Source: gcp_cloud_logging_api - Obtain logs from GCP Cloud Logging API + +The Google Cloud Platform (GCP) Cloud Logging API provides access to all logs for all GCP services. It allows you to view and manage logs for your GCP projects, services, and applications. + +This source is currently configured only for request logs from Google Load Balancer and Cloud Armor logs, from the source log name `projects/project-id/logs/requests` + +Using this source, currently you can collect, filter, and analyze request logs that have been enriched with Cloud Armor rule findings, in order to collect metrics on blocking or analyze to eliminate false positive findings that would block wanted application requests. + +Any other log type except for audit logs are of the `logEntry` type, and this source can potentially collect them with minor changes to the source code. (Tables must still be created for each type of log) + +## Example Configurations + +### Collect request logs + +Collect all of the request logs for a project. 
+ +```hcl +connection "gcp" "my_project" { + project = "my-gcp-project" +} + +partition "gcp_requests_log" "my_logs" { + source "gcp_cloud_logging_api" { + connection = connection.gcp.my_project + } +} +``` + + +## Arguments + +| Argument | Type | Required | Default | Description | +|------------|------------------|----------|--------------------------|-------------------------------------------------------------------------------------------------------------------------------| +| connection | `connection.gcp` | No | `connection.gcp.default` | The [GCP connection](https://hub.tailpipe.io/plugins/turbot/gcp#connection-credentials) to use to connect to the GCP account. | +| log_types | List(String) | No | [] | This could any type of non-audit log that confirms to the [logEntry data model](https://cloud.google.com/logging/docs/log-entry-data-model) and is stored in the `_Default` logging bucket in a GCP project. The only restriction is what tables are supported in Tailpipe. Currently the only supported type is `requests` diff --git a/docs/tables/gcp_requests_log/index.md b/docs/tables/gcp_requests_log/index.md new file mode 100644 index 0000000..31c7477 --- /dev/null +++ b/docs/tables/gcp_requests_log/index.md @@ -0,0 +1,186 @@ +--- +title: "Tailpipe Table: gcp_requests_log - Query GCP request logs" +description: "GCP request logs capture network requests from GCP Load Balancers containing fields for the results of Cloud Armor analysis." +--- + +# Table: gcp_requests_log - Query GCP request logs + +The `gcp_requests_log` table allows you to query data from GCP audit logs. This table provides detailed information about API calls made within your Google Cloud environment, including the event name, resource affected, user identity, and more. 
+ +## Configure + +Create a [partition](https://tailpipe.io/docs/manage/partition) for `gcp_requests_log`: + +```sh +vi ~/.tailpipe/config/gcp.tpc +``` + +```hcl +connection "gcp" "logging_account" { + project = "my-gcp-project" +} + +partition "gcp_requests_log" "my_logs" { + source "gcp_storage_bucket" { + connection = connection.gcp.logging_account + } +} +``` +OR + +```hcl +connection "gcp" "logging_account" { + project = "my-gcp-project" +} + +partition "gcp_requests_log" "my_logs" { + source "gcp_cloud_logging_api" { + connection = connection.gcp.logging_account + } +} +``` + +## Collect + +[Collect](https://tailpipe.io/docs/manage/collection) logs for all `gcp_requests_log` partitions: + +```sh +tailpipe collect gcp_requests_log +``` + +Or for a single partition: + +```sh +tailpipe collect gcp_requests_log.my_logs +``` + +## Query + + +### Blocked Requests + +Count how many requests were blocked by Cloud Armor across all policies + +```sql +SELECT + enforced_security_policy.name AS policy_name, + count(*) AS total_blocked_requests +FROM + gcp_requests_log +WHERE + enforced_security_policy ->> 'outcome' = 'DENY' +GROUP BY + policy_name +ORDER BY + total_blocked_requests DESC +``` + +### Top 10 events + +List the 10 most blocked OWASP Core Rule Set rules + +```sql +SELECT + enforced_security_policy ->> 'preconfigured_expr_id' AS rule_id, + COUNT(*) AS total_occurrences +FROM + gcp_requests_log +WHERE + enforced_security_policy ->> 'outcome' = 'DENY' + AND LENGTH(enforced_security_policy ->> 'preconfigured_expr_id') > 0 +GROUP BY + rule_id +ORDER BY + total_occurrences DESC +LIMIT 10 +``` + + +## Example Configurations + +### Collect logs from a Storage bucket + +Collect request logs stored in a Storage bucket that use the [default log file name format](https://hub.tailpipe.io/plugins/turbot/gcp/tables/gcp_audit_log#gcp_storage_bucket). 
+ +```hcl +connection "gcp" "logging_account" { + project = "my-gcp-project" +} + +partition "gcp_requests_log" "my_logs" { + source "gcp_storage_bucket" { + connection = connection.gcp.logging_account + bucket = "gcp-cloudarmor-logs-bucket" + } +} +``` + +### Collect logs from a Storage bucket with a prefix + +Collect audit logs stored with a GCS key prefix. + +```hcl +partition "gcp_requests_log" "my_logs_prefix" { + source "gcp_storage_bucket" { + connection = connection.gcp.logging_account + bucket = "gcp-cloudarmor-logs-bucket" + prefix = "my/prefix/" + } +} +``` + +### Collect logs from a Storage Bucket for a single project + +Collect audit logs for a specific project. + +```hcl +partition "gcp_requests_log" "my_logs_prefix" { + filter = "log_name like 'projects/my-project-name/logs/requests/%'" + + source "gcp_storage_bucket" { + connection = connection.gcp.logging_account + bucket = "gcp-cloudarmor-logs-bucket" + } +} +``` + +### Collect logs from Cloud Logging API + +Collect request logs directly via the Cloud Logging API. *Note that rate limiting is currently not implemented and this could impact ability to collect a large number of logs* + +```hcl +connection "gcp" "my_project" { + project = "my-gcp-project" +} + +partition "gcp_requests_log" "my_logs" { + source "gcp_cloud_logging_api" { + connection = connection.gcp.my_project + } +} +``` + +### Collect other types of logs from Cloud Logging API + +The Cloud Logging API source can be expanded upon to retrieve logs other than requests, if the appropriate table is created to enrich and save them. The log name attribute is the filter used for this, and it assumes that a table / partition have been made that match the data type. 
+ +Example: Collecting GCP Dataflow logs + +```hcl +partition "gcp_dataflow_log" "my_logs_prefix" { + filter = "log_name like 'projects/my-project-name/logs/dataflow.googleapis.com%'" + source "gcp_cloud_logging_api" { + connection = connection.gcp.my_project + } +} +``` + +## Source Defaults + +### gcp_storage_bucket + +This table sets the following defaults for the [gcp_storage_bucket](https://hub.tailpipe.io/plugins/turbot/gcp/sources/gcp_storage_bucket#arguments): + +| Argument | Default | +|--------------|---------| +| file_layout | `requests/%{YEAR:year}/%{MONTHNUM:month}/%{MONTHDAY:day}/%{HOUR:hour}:%{MINUTE:minute}:%{SECOND:second}_%{DATA:end_time}_%{DATA:suffix}.json` | diff --git a/gcp/plugin.go b/gcp/plugin.go index 4574165..55d82c1 100644 --- a/gcp/plugin.go +++ b/gcp/plugin.go @@ -4,8 +4,10 @@ import ( "github.com/turbot/go-kit/helpers" "github.com/turbot/tailpipe-plugin-gcp/config" "github.com/turbot/tailpipe-plugin-gcp/sources/audit_log_api" + "github.com/turbot/tailpipe-plugin-gcp/sources/cloud_logging_api" "github.com/turbot/tailpipe-plugin-gcp/sources/storage_bucket" "github.com/turbot/tailpipe-plugin-gcp/tables/audit_log" + "github.com/turbot/tailpipe-plugin-gcp/tables/requests_log" "github.com/turbot/tailpipe-plugin-sdk/plugin" "github.com/turbot/tailpipe-plugin-sdk/row_source" "github.com/turbot/tailpipe-plugin-sdk/table" @@ -20,9 +22,11 @@ func init() { // 1. row struct // 2. 
table implementation table.RegisterTable[*audit_log.AuditLog, *audit_log.AuditLogTable]() + table.RegisterTable[*requests_log.RequestsLog, *requests_log.RequestsLogTable]() // register sources row_source.RegisterRowSource[*audit_log_api.AuditLogAPISource]() + row_source.RegisterRowSource[*cloud_logging_api.CloudLoggingAPISource]() row_source.RegisterRowSource[*storage_bucket.GcpStorageBucketSource]() } diff --git a/go.mod b/go.mod index d712d83..90d8cc6 100644 --- a/go.mod +++ b/go.mod @@ -82,7 +82,7 @@ require ( github.com/gabriel-vasile/mimetype v1.4.3 // indirect github.com/gertd/go-pluralize v0.2.1 // indirect github.com/go-git/go-git/v5 v5.13.0 // indirect - github.com/go-jose/go-jose/v4 v4.0.5 // indirect + github.com/go-jose/go-jose/v4 v4.0.4 // indirect github.com/go-logr/logr v1.4.2 // indirect github.com/go-logr/stdr v1.2.2 // indirect github.com/go-ole/go-ole v1.2.6 // indirect diff --git a/go.sum b/go.sum index 14d6d47..0462668 100644 --- a/go.sum +++ b/go.sum @@ -372,8 +372,8 @@ github.com/go-git/go-git/v5 v5.13.0/go.mod h1:Wjo7/JyVKtQgUNdXYXIepzWfJQkUEIGvkv github.com/go-gl/glfw v0.0.0-20190409004039-e6da0acd62b1/go.mod h1:vR7hzQXu2zJy9AVAgeJqvqgH9Q5CA+iKCZ2gyEVpxRU= github.com/go-gl/glfw/v3.3/glfw v0.0.0-20191125211704-12ad95a8df72/go.mod h1:tQ2UAYgL5IevRw8kRxooKSPJfGvJ9fJQFa0TUsXzTg8= github.com/go-gl/glfw/v3.3/glfw v0.0.0-20200222043503-6f7a984d4dc4/go.mod h1:tQ2UAYgL5IevRw8kRxooKSPJfGvJ9fJQFa0TUsXzTg8= -github.com/go-jose/go-jose/v4 v4.0.5 h1:M6T8+mKZl/+fNNuFHvGIzDz7BTLQPIounk/b9dw3AaE= -github.com/go-jose/go-jose/v4 v4.0.5/go.mod h1:s3P1lRrkT8igV8D9OjyL4WRyHvjB6a4JSllnOrmmBOA= +github.com/go-jose/go-jose/v4 v4.0.4 h1:VsjPI33J0SB9vQM6PLmNjoHqMQNGPiZ0rHL7Ni7Q6/E= +github.com/go-jose/go-jose/v4 v4.0.4/go.mod h1:NKb5HO1EZccyMpiZNbdUw/14tiXNyUJh188dfnMCAfc= github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY= github.com/go-logr/logr 
v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= diff --git a/sources/cloud_logging_api/cloud_logging_api_source.go b/sources/cloud_logging_api/cloud_logging_api_source.go new file mode 100644 index 0000000..85cb363 --- /dev/null +++ b/sources/cloud_logging_api/cloud_logging_api_source.go @@ -0,0 +1,153 @@ +package cloud_logging_api + +import ( + "context" + "errors" + "fmt" + "strings" + "time" + + "cloud.google.com/go/logging" + "cloud.google.com/go/logging/logadmin" + "google.golang.org/api/iterator" + + "github.com/turbot/tailpipe-plugin-gcp/config" + "github.com/turbot/tailpipe-plugin-sdk/collection_state" + "github.com/turbot/tailpipe-plugin-sdk/row_source" + "github.com/turbot/tailpipe-plugin-sdk/schema" + "github.com/turbot/tailpipe-plugin-sdk/types" +) + +const CloudLoggingAPISourceIdentifier = "gcp_cloud_logging_api" + +// CloudLoggingAPISource source is responsible for collecting cloud logs from GCP +type CloudLoggingAPISource struct { + row_source.RowSourceImpl[*CloudLoggingAPISourceConfig, *config.GcpConnection] +} + +func (s *CloudLoggingAPISource) Init(ctx context.Context, params *row_source.RowSourceParams, opts ...row_source.RowSourceOption) error { + // set the collection state ctor + s.NewCollectionStateFunc = collection_state.NewTimeRangeCollectionState + + // call base init + return s.RowSourceImpl.Init(ctx, params, opts...) 
+} + +func (s *CloudLoggingAPISource) Identifier() string { + return CloudLoggingAPISourceIdentifier +} + +func (s *CloudLoggingAPISource) Collect(ctx context.Context) error { + project := s.Connection.GetProject() + var logTypes []string + if s.Config != nil && s.Config.LogTypes != nil { + logTypes = s.Config.LogTypes + } + + client, err := s.getClient(ctx, project) + if err != nil { + return err + } + defer client.Close() + + sourceName := CloudLoggingAPISourceIdentifier + sourceEnrichmentFields := &schema.SourceEnrichment{ + CommonFields: schema.CommonFields{ + TpSourceName: &sourceName, + TpSourceType: CloudLoggingAPISourceIdentifier, + TpSourceLocation: &project, + }, + } + + filter := s.getLogNameFilter(project, logTypes, s.CollectionTimeRange) + + // TODO: #ratelimit Implement rate limiting to ensure compliance with GCP API quotas. + // Use a token bucket algorithm with a maximum of 100 requests per second and a burst capacity of 200. + // Refer to the GCP API rate-limiting documentation: https://cloud.google.com/apis/docs/rate-limits + // This feature should be implemented by Q4 2023 to prevent potential throttling issues. 
+ + // logEntry will now be the higher-level logging.Entry + var logEntry *logging.Entry + it := client.Entries(ctx, logadmin.Filter(filter), logadmin.PageSize(250)) + for { + logEntry, err = it.Next() + if err != nil && errors.Is(err, iterator.Done) { + break + } + if err != nil { + return fmt.Errorf("error fetching log entries, %w", err) + } + + if logEntry.Payload != nil { + logEntry.LogName = "" // remove logName from the log entry due to ToLogEntry requirements of an empty string + protoLogEntry, err := logging.ToLogEntry(*logEntry, project) + if err != nil { + return fmt.Errorf("error converting log entry to loggingpb.LogEntry: %w", err) + } + + if s.CollectionState.ShouldCollect(protoLogEntry.GetInsertId(), protoLogEntry.GetTimestamp().AsTime()) { + row := &types.RowData{ + Data: protoLogEntry, + SourceEnrichment: sourceEnrichmentFields, + } + + if err = s.CollectionState.OnCollected(protoLogEntry.GetInsertId(), protoLogEntry.GetTimestamp().AsTime()); err != nil { + return fmt.Errorf("error updating collection state: %w", err) + } + if err = s.OnRow(ctx, row); err != nil { + return fmt.Errorf("error processing row: %w", err) + } + } + } + } + + return nil +} + +func (s *CloudLoggingAPISource) getClient(ctx context.Context, project string) (*logadmin.Client, error) { + opts, err := s.Connection.GetClientOptions(ctx) + if err != nil { + return nil, err + } + + if project == "" { + return nil, errors.New("unable to determine active project, please set project in configuration or env var CLOUDSDK_CORE_PROJECT / GCP_PROJECT") + } + + client, err := logadmin.NewClient(ctx, project, opts...) 
+ if err != nil { + return nil, err + } + + return client, nil +} + +func (s *CloudLoggingAPISource) getLogNameFilter(projectId string, logTypes []string, timeRange collection_state.DirectionalTimeRange) string { + requestsLog := fmt.Sprintf(`"projects/%s/logs/requests"`, projectId) + timePart := fmt.Sprintf(`AND (timestamp >= "%s") AND (timestamp < "%s")`, + timeRange.StartTime().Format(time.RFC3339Nano), + timeRange.EndTime().Format(time.RFC3339Nano)) + + // short-circuit default + if len(logTypes) == 0 { + return fmt.Sprintf("logName=%s %s", requestsLog, timePart) + } + + // Only request logs supported at implementation. Append additional cases for other log types as needed + var selected []string + for _, logType := range logTypes { + switch logType { + case "requests": + selected = append(selected, requestsLog) + } + } + + switch len(selected) { + case 0: + return fmt.Sprintf("logName=%s %s", requestsLog, timePart) + case 1: + return fmt.Sprintf("logName=%s %s", selected[0], timePart) + default: + return fmt.Sprintf("logName=(%s) %s", strings.Join(selected, " OR "), timePart) + } +} diff --git a/sources/cloud_logging_api/cloud_logging_api_source_config.go b/sources/cloud_logging_api/cloud_logging_api_source_config.go new file mode 100644 index 0000000..487c46f --- /dev/null +++ b/sources/cloud_logging_api/cloud_logging_api_source_config.go @@ -0,0 +1,37 @@ +package cloud_logging_api + +import ( + "fmt" + "strings" + + "github.com/hashicorp/hcl/v2" +) + +type CloudLoggingAPISourceConfig struct { + // required to allow partial decoding + Remain hcl.Body `hcl:",remain" json:"-"` + LogTypes []string `hcl:"log_types,optional" json:"log_types"` +} + +func (a *CloudLoggingAPISourceConfig) Validate() error { + validLogTypes := []string{"requests"} // Currently, only "requests" is supported, but keeping this a list for future expansion + + for _, logType := range a.LogTypes { + isValid := false + for _, validType := range validLogTypes { + if logType == validType { + 
isValid = true + break + } + } + + if !isValid { + return fmt.Errorf("invalid log type %q, valid log types are %s", logType, strings.Join(validLogTypes, ", ")) + } + } + return nil +} + +func (a *CloudLoggingAPISourceConfig) Identifier() string { + return CloudLoggingAPISourceIdentifier +} diff --git a/tables/requests_log/requests_log.go b/tables/requests_log/requests_log.go new file mode 100644 index 0000000..3033ff8 --- /dev/null +++ b/tables/requests_log/requests_log.go @@ -0,0 +1,145 @@ +package requests_log + +import ( + "time" + + "github.com/turbot/tailpipe-plugin-sdk/schema" +) + +// RequestsLog represents an enriched row ready for parquet writing +type RequestsLog struct { + // embed required enrichment fields + schema.CommonFields + + // Mandatory fields + Timestamp time.Time `json:"timestamp"` + ReceiveTimestamp time.Time `json:"receive_timestamp"` + LogName string `json:"log_name"` + InsertId string `json:"insert_id"` + Severity string `json:"severity"` + Trace string `json:"trace_id"` + SpanId string `json:"span_id"` + TraceSampled bool `json:"trace_sampled"` + + // the json payload fields from the requests log, moved to the top level + BackendTargetProjectNumber string `json:"backend_target_project_number,omitempty"` + CacheDecision []string `json:"cache_decision,omitempty"` + RemoteIp string `json:"remote_ip,omitempty"` + StatusDetails string `json:"status_details,omitempty"` + EnforcedSecurityPolicy *RequestLogSecurityPolicy `json:"enforced_security_policy" parquet:"type=JSON"` + PreviewSecurityPolicy *RequestLogSecurityPolicy `json:"preview_security_policy,omitempty" parquet:"type=JSON"` + SecurityPolicyRequestData *RequestLogSecurityPolicyRequestData `json:"security_policy_request_data,omitempty" parquet:"type=JSON"` + EnforcedEdgeSecurityPolicy *RequestLogEdgeSecurityPolicy `json:"enforced_edge_security_policy,omitempty" parquet:"type=JSON"` + PreviewEdgeSecurityPolicy *RequestLogEdgeSecurityPolicy `json:"preview_edge_security_policy,omitempty" 
parquet:"type=JSON"` + + // other top level fields + Resource *RequestLogResource `json:"resource,omitempty" parquet:"type=JSON"` + HttpRequest *RequestLogHttpRequest `json:"http_request,omitempty" parquet:"type=JSON"` +} + +func NewRequestsLog() *RequestsLog { + return &RequestsLog{} +} + +type RequestLogResource struct { + Type string `json:"type,omitempty"` + Labels map[string]string `json:"labels,omitempty" parquet:"type=JSON"` +} + +type RequestLogHttpRequest struct { + RequestMethod string `json:"request_method,omitempty"` + RequestUrl string `json:"request_url,omitempty"` + RequestSize string `json:"request_size,omitempty"` + Referer string `json:"referer,omitempty"` + Status int32 `json:"status,omitempty"` + ResponseSize string `json:"response_size,omitempty"` + RemoteIp string `json:"remote_ip,omitempty"` + ServerIp string `json:"server_ip,omitempty"` + Latency string `json:"latency,omitempty"` + Protocol string `json:"protocol,omitempty"` + CacheHit bool `json:"cache_hit,omitempty"` + CacheLookup bool `json:"cache_lookup,omitempty"` + CacheValidatedWithOriginServer bool `json:"cache_validated_with_origin_server,omitempty"` + CacheFillBytes string `json:"cache_fill_bytes,omitempty"` + UserAgent string `json:"user_agent,omitempty"` +} + +type RequestLogSecurityPolicy struct { + ConfiguredAction string `json:"configured_action,omitempty"` + RateLimitAction *RequestLogRateLimitAction `json:"rate_limit_action,omitempty"` + Name string `json:"name,omitempty"` + Outcome string `json:"outcome,omitempty"` + Priority int `json:"priority,omitempty"` + PreconfiguredExprId string `json:"preconfigured_expr_id,omitempty"` + ThreatIntelligence *RequestLogThreatIntelligence `json:"threat_intelligence,omitempty"` + AddressGroup *RequestLogAddressGroup `json:"address_group,omitempty"` + MatchedFieldType string `json:"matched_field_type,omitempty"` + MatchedFieldValue string `json:"matched_field_value,omitempty"` + MatchedFieldName string 
`json:"matched_field_name,omitempty"` + MatchedOffset int `json:"matched_offset,omitempty"` + MatchedLength int `json:"matched_length,omitempty"` +} + +type RequestLogRateLimitAction struct { + Key string `json:"key,omitempty"` + Outcome string `json:"outcome,omitempty"` +} + +type RequestLogThreatIntelligence struct { + Categories []string `json:"categories,omitempty"` +} + +type RequestLogAddressGroup struct { + Names []string `json:"names,omitempty"` +} + +type RequestLogSecurityPolicyRequestData struct { + RemoteIpInfo *RequestLogRemoteIpInfo `json:"remote_ip_info,omitempty"` + RecaptchaActionToken *RequestLogRecaptchaToken `json:"recaptcha_action_token,omitempty"` + RecaptchaSessionToken *RequestLogRecaptchaToken `json:"recaptcha_session_token,omitempty"` + TlsJa3Fingerprint string `json:"tls_ja3_fingerprint,omitempty"` + TlsJa4Fingerprint string `json:"tls_ja4_fingerprint,omitempty"` +} + +type RequestLogRemoteIpInfo struct { + Asn int `json:"asn,omitempty"` + RegionCode string `json:"region_code,omitempty"` +} + +type RequestLogEdgeSecurityPolicy struct { + Name string `json:"name,omitempty"` + Priority int `json:"priority,omitempty"` + ConfiguredAction string `json:"configured_action,omitempty"` + Outcome string `json:"outcome,omitempty"` +} + +type RequestLogRecaptchaToken struct { + Score float64 `json:"score,omitempty"` +} + +func (a *RequestsLog) GetColumnDescriptions() map[string]string { + return map[string]string{ + "timestamp": "The date and time when the request was received, in ISO 8601 format.", + "receive_timestamp": "The time when the log entry was received by Cloud Logging.", + "log_name": "The name of the log that recorded the request, e.g., 'projects/[PROJECT_ID]/logs/requests'.", + "insert_id": "A unique identifier for the log entry, used to prevent duplicate log entries.", + "severity": "The severity level of the log entry (e.g., 'INFO', 'WARNING', 'ERROR', 'CRITICAL').", + "trace_id": "The unique trace ID associated with the request, used 
for distributed tracing.", + "trace_sampled": "Indicates whether the request trace was sampled for analysis (true or false).", + "span_id": "The span ID for the request, used in distributed tracing to identify specific operations.", + "resource": "The monitored resource associated with the log entry, including type and labels.", + "http_request": "Details about the HTTP request associated with the log entry, if available (present in application load balancer logs).", + "backend_target_project_number": "The project number of the backend target.", + "cache_decision": "A list of cache decisions made for the request.", + "enforced_security_policy": "Details about the enforced security policy for the request.", + "preview_security_policy": "Details about the preview security policy for the request, if any.", + "security_policy_request_data": "Additional data about the security policy request.", + "enforced_edge_security_policy": "Details about the enforced edge security policy for the request.", + "preview_edge_security_policy": "Details about the preview edge security policy for the request, if any.", + "remote_ip": "The remote IP address from which the request originated.", + "status_details": "Additional status details for the request.", + + // Override table specific tp_* column descriptions + "tp_index": "The GCP project.", + } +} diff --git a/tables/requests_log/requests_log_mapper.go b/tables/requests_log/requests_log_mapper.go new file mode 100644 index 0000000..0fe750f --- /dev/null +++ b/tables/requests_log/requests_log_mapper.go @@ -0,0 +1,417 @@ +//nolint:staticcheck +package requests_log + +import ( + "context" + "encoding/json" + "fmt" + "strconv" + "time" + + // for debugging + + "cloud.google.com/go/logging" + loggingpb "cloud.google.com/go/logging/apiv2/loggingpb" + + "github.com/turbot/tailpipe-plugin-sdk/mappers" +) + +type RequestsLogMapper struct { +} + +func (m *RequestsLogMapper) Identifier() string { + return "gcp_requests_log_mapper" +} + +func 
(m *RequestsLogMapper) Map(_ context.Context, a any, _ ...mappers.MapOption[*RequestsLog]) (*RequestsLog, error) { + switch v := a.(type) { + case string: + return mapFromBucketJson([]byte(v)) + case *loggingpb.LogEntry: + return mapFromSDKType(v) + case *logging.Entry: + return nil, fmt.Errorf("logging.Entry did not convert to *loggingpb.LogEntry: %T", a) + case []byte: + return mapFromBucketJson(v) + default: + return nil, fmt.Errorf("expected loggingpb.LogEntry, string or []byte, got %T", a) + } +} + +func mapFromSDKType(item *loggingpb.LogEntry) (*RequestsLog, error) { + + row := NewRequestsLog() + + // === 2. Map common LogEntry fields === + row.Timestamp = item.GetTimestamp().AsTime() + row.LogName = item.GetLogName() + row.InsertId = item.GetInsertId() + row.Severity = item.GetSeverity().String() + row.ReceiveTimestamp = item.GetReceiveTimestamp().AsTime() + row.Trace = item.GetTrace() + row.SpanId = item.GetSpanId() + row.TraceSampled = item.GetTraceSampled() + + // === 3. Map Resource === + if item.GetResource() != nil { + row.Resource = &RequestLogResource{ + Type: item.GetResource().GetType(), + Labels: item.GetResource().GetLabels(), + } + } + + // === 4. 
Map JsonPayload (for LB/Cloud Armor specific data) === + jsonPayload := item.GetJsonPayload().AsMap() + if jsonPayload == nil { + jsonPayload = make(map[string]interface{}) + } + + // Safely extract string fields with type checking + if v, ok := jsonPayload["backendTargetProjectNumber"].(string); ok { + row.BackendTargetProjectNumber = v + } + + // Handle CacheDecision specifically: + if rawCacheDecision, ok := jsonPayload["cacheDecision"].([]interface{}); ok { + // Iterate over the []interface{} and append string elements to row.CacheDecision + for _, v := range rawCacheDecision { + if s, ok := v.(string); ok { + row.CacheDecision = append(row.CacheDecision, s) + } + } + } + + if v, ok := jsonPayload["remoteIp"].(string); ok { + row.RemoteIp = v + } + + if v, ok := jsonPayload["statusDetails"].(string); ok { + row.StatusDetails = v + } + + // Safely extract security policy map + if securityPolicyMap, ok := jsonPayload["enforcedSecurityPolicy"].(map[string]interface{}); ok && securityPolicyMap != nil { + policy := &RequestLogSecurityPolicy{} + if val, ok := securityPolicyMap["configuredAction"].(string); ok { + policy.ConfiguredAction = val + } + if val, ok := securityPolicyMap["name"].(string); ok { + policy.Name = val + } + if val, ok := securityPolicyMap["outcome"].(string); ok { + policy.Outcome = val + } + if val, ok := securityPolicyMap["priority"].(float64); ok { + policy.Priority = int(val) + } + + if rawIds, ok := securityPolicyMap["preconfiguredExpressionIds"].([]interface{}); ok && len(rawIds) > 0 { + if ruleId, ok := rawIds[0].(string); ok { + policy.PreconfiguredExprId = ruleId + } + } + row.EnforcedSecurityPolicy = policy + } + + if previewPolicyMap, ok := jsonPayload["previewSecurityPolicy"].(map[string]interface{}); ok { + // If it exists, initialize the PreviewSecurityPolicy struct + row.PreviewSecurityPolicy = &RequestLogSecurityPolicy{} + + // Safely extract fields with type checking + if v, ok := previewPolicyMap["configuredAction"].(string); ok 
{ + row.PreviewSecurityPolicy.ConfiguredAction = v + } + if v, ok := previewPolicyMap["name"].(string); ok { + row.PreviewSecurityPolicy.Name = v + } + if v, ok := previewPolicyMap["outcome"].(string); ok { + row.PreviewSecurityPolicy.Outcome = v + } + if v, ok := previewPolicyMap["priority"].(float64); ok { + row.PreviewSecurityPolicy.Priority = int(v) + } + + // Handle PreconfiguredExpressionIds within PreviewSecurityPolicy only if it exists and has values. + if rawIds, ok := previewPolicyMap["preconfiguredExpressionIds"].([]interface{}); ok && len(rawIds) > 0 { + if ruleId, ok := rawIds[0].(string); ok { + row.PreviewSecurityPolicy.PreconfiguredExprId = ruleId + } + } + } + + // Map SecurityPolicyRequestData if present + if secPolicyData, ok := jsonPayload["securityPolicyRequestData"].(map[string]interface{}); ok { + row.SecurityPolicyRequestData = &RequestLogSecurityPolicyRequestData{} + + if v, ok := secPolicyData["tlsJa3Fingerprint"].(string); ok { + row.SecurityPolicyRequestData.TlsJa3Fingerprint = v + } + if v, ok := secPolicyData["tlsJa4Fingerprint"].(string); ok { + row.SecurityPolicyRequestData.TlsJa4Fingerprint = v + } + + if remoteIpInfo, ok := secPolicyData["remoteIpInfo"].(map[string]interface{}); ok { + row.SecurityPolicyRequestData.RemoteIpInfo = &RequestLogRemoteIpInfo{} + if v, ok := remoteIpInfo["asn"].(float64); ok { + row.SecurityPolicyRequestData.RemoteIpInfo.Asn = int(v) + } + if v, ok := remoteIpInfo["regionCode"].(string); ok { + row.SecurityPolicyRequestData.RemoteIpInfo.RegionCode = v + } + } + } + + if item.GetHttpRequest() == nil { + row.HttpRequest = &RequestLogHttpRequest{} + } else { + httpRequestPb := item.GetHttpRequest() + row.HttpRequest = &RequestLogHttpRequest{ + RequestMethod: httpRequestPb.GetRequestMethod(), + RequestUrl: httpRequestPb.GetRequestUrl(), + RequestSize: strconv.FormatInt(httpRequestPb.GetRequestSize(), 10), + Referer: httpRequestPb.GetReferer(), + UserAgent: httpRequestPb.GetUserAgent(), + Status: 
httpRequestPb.GetStatus(), + ResponseSize: strconv.FormatInt(httpRequestPb.GetResponseSize(), 10), + RemoteIp: httpRequestPb.GetRemoteIp(), + Latency: func() string { + if lat := httpRequestPb.GetLatency(); lat != nil { + return lat.String() + } + return "" + }(), + ServerIp: httpRequestPb.GetServerIp(), + Protocol: httpRequestPb.GetProtocol(), + CacheFillBytes: strconv.FormatInt(httpRequestPb.GetCacheFillBytes(), 10), + CacheLookup: httpRequestPb.GetCacheLookup(), + CacheHit: httpRequestPb.GetCacheHit(), + CacheValidatedWithOriginServer: httpRequestPb.GetCacheValidatedWithOriginServer(), + } + } + + return row, nil +} + +func mapFromBucketJson(itemBytes []byte) (*RequestsLog, error) { + var log requestsLog + if err := json.Unmarshal(itemBytes, &log); err != nil { + return nil, fmt.Errorf("failed to parse requests log JSON: %w", err) + } + + row := NewRequestsLog() + + // Map top-level fields + row.Timestamp = log.Timestamp + row.ReceiveTimestamp = log.ReceiveTimestamp + row.LogName = log.LogName + row.InsertId = log.InsertId + row.Severity = log.Severity + row.Trace = log.Trace + row.SpanId = log.SpanId + row.TraceSampled = log.TraceSampled + + // Only create objects if they exist in the source log. + // This avoids creating empty-but-non-nil objects that the downstream + // validator might reject. 
+ + if log.Resource != nil { + row.Resource = &RequestLogResource{ + Type: log.Resource.Type, + Labels: func() map[string]string { + if log.Resource.Labels != nil { + return log.Resource.Labels + } + return make(map[string]string) + }(), + } + } + + // Map JSON Payload fields + row.BackendTargetProjectNumber = log.JsonPayload.BackendTargetProjectNumber + row.CacheDecision = log.JsonPayload.CacheDecision + row.RemoteIp = log.JsonPayload.RemoteIp + row.StatusDetails = log.JsonPayload.StatusDetails + + if log.JsonPayload.EnforcedSecurityPolicy != nil { + ids := []string{} + if log.JsonPayload.EnforcedSecurityPolicy.PreconfiguredExprIds != nil { + ids = log.JsonPayload.EnforcedSecurityPolicy.PreconfiguredExprIds + } + var exprId string + if len(ids) > 0 { + exprId = ids[0] + } + row.EnforcedSecurityPolicy = &RequestLogSecurityPolicy{ + ConfiguredAction: log.JsonPayload.EnforcedSecurityPolicy.ConfiguredAction, + Name: log.JsonPayload.EnforcedSecurityPolicy.Name, + Outcome: log.JsonPayload.EnforcedSecurityPolicy.Outcome, + Priority: log.JsonPayload.EnforcedSecurityPolicy.Priority, + MatchedFieldType: log.JsonPayload.EnforcedSecurityPolicy.MatchedFieldType, + MatchedFieldValue: log.JsonPayload.EnforcedSecurityPolicy.MatchedFieldValue, + MatchedFieldName: log.JsonPayload.EnforcedSecurityPolicy.MatchedFieldName, + MatchedOffset: log.JsonPayload.EnforcedSecurityPolicy.MatchedOffset, + MatchedLength: log.JsonPayload.EnforcedSecurityPolicy.MatchedLength, + PreconfiguredExprId: exprId, + } + } + + if log.JsonPayload.PreviewSecurityPolicy != nil { + ids := []string{} + if log.JsonPayload.PreviewSecurityPolicy.PreconfiguredExprIds != nil { + ids = log.JsonPayload.PreviewSecurityPolicy.PreconfiguredExprIds + } + // preconfiguredExprIds is always an array of one string, grab the index 0 slice and case this to a string in the row struct + var exprId string + if len(ids) > 0 { + exprId = ids[0] + } + row.PreviewSecurityPolicy = &RequestLogSecurityPolicy{ + ConfiguredAction: 
log.JsonPayload.PreviewSecurityPolicy.ConfiguredAction, + Name: log.JsonPayload.PreviewSecurityPolicy.Name, + Outcome: log.JsonPayload.PreviewSecurityPolicy.Outcome, + Priority: log.JsonPayload.PreviewSecurityPolicy.Priority, + MatchedFieldType: log.JsonPayload.PreviewSecurityPolicy.MatchedFieldType, + MatchedFieldValue: log.JsonPayload.PreviewSecurityPolicy.MatchedFieldValue, + MatchedFieldName: log.JsonPayload.PreviewSecurityPolicy.MatchedFieldName, + MatchedOffset: log.JsonPayload.PreviewSecurityPolicy.MatchedOffset, + MatchedLength: log.JsonPayload.PreviewSecurityPolicy.MatchedLength, + PreconfiguredExprId: exprId, + } + } + + if log.JsonPayload.SecurityPolicyRequestData != nil { + row.SecurityPolicyRequestData = &RequestLogSecurityPolicyRequestData{ + TlsJa3Fingerprint: log.JsonPayload.SecurityPolicyRequestData.TlsJa3Fingerprint, + TlsJa4Fingerprint: log.JsonPayload.SecurityPolicyRequestData.TlsJa4Fingerprint, + // Always initialize the nested object to be safe. + RemoteIpInfo: &RequestLogRemoteIpInfo{}, + } + + if log.JsonPayload.SecurityPolicyRequestData.RemoteIpInfo != nil { + row.SecurityPolicyRequestData.RemoteIpInfo = &RequestLogRemoteIpInfo{ + Asn: log.JsonPayload.SecurityPolicyRequestData.RemoteIpInfo.Asn, + RegionCode: log.JsonPayload.SecurityPolicyRequestData.RemoteIpInfo.RegionCode, + } + } + } + + if log.HttpRequest == nil { + row.HttpRequest = &RequestLogHttpRequest{} + } else { + row.HttpRequest = &RequestLogHttpRequest{ + RequestMethod: log.HttpRequest.RequestMethod, + RequestUrl: log.HttpRequest.RequestURL, + RequestSize: log.HttpRequest.RequestSize, + Status: log.HttpRequest.Status, + ResponseSize: log.HttpRequest.ResponseSize, + UserAgent: log.HttpRequest.UserAgent, + RemoteIp: log.HttpRequest.RemoteIP, + ServerIp: log.HttpRequest.ServerIP, + Referer: log.HttpRequest.Referer, + Latency: log.HttpRequest.Latency, + CacheLookup: log.HttpRequest.CacheLookup, + CacheHit: log.HttpRequest.CacheHit, + CacheValidatedWithOriginServer: 
// requestsLog mirrors the JSON shape of a single Cloud Logging request-log
// entry as stored in a bucket export. Pointer fields are optional in the
// source JSON; nil means the object was absent from the entry.
type requestsLog struct {
	InsertId         string       `json:"insertId"`
	LogName          string       `json:"logName"`
	Resource         *resource    `json:"resource,omitempty"`
	Timestamp        time.Time    `json:"timestamp"`
	Severity         string       `json:"severity"`
	JsonPayload      *jsonPayload `json:"jsonPayload,omitempty"`
	ReceiveTimestamp time.Time    `json:"receiveTimestamp"`
	Trace            string       `json:"trace,omitempty"`
	SpanId           string       `json:"spanId,omitempty"`
	HttpRequest      *httpRequest `json:"httpRequest,omitempty"`
	TraceSampled     bool         `json:"traceSampled,omitempty"`
}

// resource is the monitored-resource descriptor attached to the entry.
type resource struct {
	Type   string            `json:"type"`
	Labels map[string]string `json:"labels"`
}

// jsonPayload holds the load-balancer / Cloud Armor specific payload of a
// request-log entry.
type jsonPayload struct {
	TypeName                   string                               `json:"@type"`
	BackendTargetProjectNumber string                               `json:"backendTargetProjectNumber"`
	CacheDecision              []string                             `json:"cacheDecision"`
	CacheId                    string                               `json:"cacheId,omitempty"`
	CompressionStatus          string                               `json:"compressionStatus,omitempty"`
	EnforcedSecurityPolicy     *requestLogSecurityPolicy            `json:"enforcedSecurityPolicy"`
	PreviewSecurityPolicy      *requestLogSecurityPolicy            `json:"previewSecurityPolicy,omitempty"`
	SecurityPolicyRequestData  *requestLogSecurityPolicyRequestData `json:"securityPolicyRequestData"`
	RemoteIp                   string                               `json:"remoteIp"`
	StatusDetails              string                               `json:"statusDetails"`
}

// httpRequest is the HTTP request/response summary for the entry. Sizes
// (RequestSize, ResponseSize, CacheFillBytes) arrive as JSON strings, not
// numbers, matching the Cloud Logging wire format.
type httpRequest struct {
	RequestMethod                  string `json:"requestMethod"`
	RequestURL                     string `json:"requestUrl"`
	RequestSize                    string `json:"requestSize,omitempty"`
	Status                         int32  `json:"status"`
	ResponseSize                   string `json:"responseSize,omitempty"`
	UserAgent                      string `json:"userAgent"`
	RemoteIP                       string `json:"remoteIp"`
	ServerIP                       string `json:"serverIp,omitempty"`
	Referer                        string `json:"referer,omitempty"`
	Latency                        string `json:"latency,omitempty"`
	CacheLookup                    bool   `json:"cacheLookup,omitempty"`
	CacheHit                       bool   `json:"cacheHit,omitempty"`
	CacheValidatedWithOriginServer bool   `json:"cacheValidatedWithOriginServer,omitempty"`
	CacheFillBytes                 string `json:"cacheFillBytes,omitempty"`
}

// requestLogSecurityPolicy describes the outcome of a Cloud Armor security
// policy evaluation (used for both enforced and preview policies).
type requestLogSecurityPolicy struct {
	ConfiguredAction     string                        `json:"configuredAction"`
	Name                 string                        `json:"name"`
	Outcome              string                        `json:"outcome"`
	Priority             int                           `json:"priority"`
	PreconfiguredExprIds []string                      `json:"preconfiguredExprIds,omitempty"`
	RateLimitAction      *requestLogRateLimitAction    `json:"rateLimitAction,omitempty"`
	ThreatIntelligence   *requestLogThreatIntelligence `json:"threatIntelligence,omitempty"`
	AddressGroup         *requestLogAddressGroup       `json:"addressGroup,omitempty"`
	MatchedFieldType     string                        `json:"matchedFieldType,omitempty"`
	MatchedFieldValue    string                        `json:"matchedFieldValue,omitempty"`
	MatchedFieldName     string                        `json:"matchedFieldName,omitempty"`
	MatchedOffset        int                           `json:"matchedOffset,omitempty"`
	MatchedLength        int                           `json:"matchedLength,omitempty"`
}

// requestLogSecurityPolicyRequestData carries per-request data Cloud Armor
// collected while evaluating the request (TLS fingerprints, client IP info,
// reCAPTCHA tokens).
type requestLogSecurityPolicyRequestData struct {
	RemoteIpInfo          *requestLogRemoteIpInfo   `json:"remoteIpInfo"`
	TlsJa3Fingerprint     string                    `json:"tlsJa3Fingerprint"`
	TlsJa4Fingerprint     string                    `json:"tlsJa4Fingerprint"`
	RecaptchaActionToken  *requestLogRecaptchaToken `json:"recaptchaActionToken,omitempty"`
	RecaptchaSessionToken *requestLogRecaptchaToken `json:"recaptchaSessionToken,omitempty"`
}

// requestLogRemoteIpInfo is geo/ASN information about the client IP.
type requestLogRemoteIpInfo struct {
	Asn        int    `json:"asn"`
	RegionCode string `json:"regionCode"`
}

// requestLogRecaptchaToken holds the reCAPTCHA score attached to a token.
type requestLogRecaptchaToken struct {
	Score float64 `json:"score"`
}

// requestLogRateLimitAction describes a rate-limit decision for the request.
type requestLogRateLimitAction struct {
	Key     string `json:"key"`
	Outcome string `json:"outcome"`
}

// requestLogThreatIntelligence lists threat-intel categories matched by the
// request.
type requestLogThreatIntelligence struct {
	Categories []string `json:"categories"`
}

// requestLogAddressGroup lists the address groups the client IP matched.
type requestLogAddressGroup struct {
	Names []string `json:"names"`
}
b/tables/requests_log/requests_log_table.go @@ -0,0 +1,98 @@ +package requests_log + +import ( + "time" + + "github.com/rs/xid" + + "github.com/turbot/pipe-fittings/v2/utils" + "github.com/turbot/tailpipe-plugin-gcp/sources/cloud_logging_api" + "github.com/turbot/tailpipe-plugin-gcp/sources/storage_bucket" + "github.com/turbot/tailpipe-plugin-sdk/artifact_source" + "github.com/turbot/tailpipe-plugin-sdk/artifact_source_config" + "github.com/turbot/tailpipe-plugin-sdk/constants" + "github.com/turbot/tailpipe-plugin-sdk/row_source" + "github.com/turbot/tailpipe-plugin-sdk/schema" + "github.com/turbot/tailpipe-plugin-sdk/table" +) + +const RequestsLogTableIdentifier string = "gcp_requests_log" + +type RequestsLogTable struct { +} + +func (c *RequestsLogTable) Identifier() string { + return RequestsLogTableIdentifier +} + +func (c *RequestsLogTable) GetSourceMetadata() ([]*table.SourceMetadata[*RequestsLog], error) { + defaultArtifactConfig := &artifact_source_config.ArtifactSourceConfigImpl{ + FileLayout: utils.ToStringPointer("requests/%{YEAR:year}/%{MONTHNUM:month}/%{MONTHDAY:day}/%{HOUR:hour}:%{MINUTE:minute}:%{SECOND:second}_%{DATA:end_time}_%{DATA:suffix}.json"), + } + + return []*table.SourceMetadata[*RequestsLog]{ + { + SourceName: cloud_logging_api.CloudLoggingAPISourceIdentifier, + Mapper: &RequestsLogMapper{}, + }, + { + SourceName: storage_bucket.GcpStorageBucketSourceIdentifier, + Mapper: &RequestsLogMapper{}, + Options: []row_source.RowSourceOption{ + artifact_source.WithDefaultArtifactSourceConfig(defaultArtifactConfig), + artifact_source.WithRowPerLine(), + }, + }, + { + SourceName: constants.ArtifactSourceIdentifier, + Mapper: &RequestsLogMapper{}, + Options: []row_source.RowSourceOption{ + artifact_source.WithDefaultArtifactSourceConfig(defaultArtifactConfig), + }, + }, + }, nil +} + +func (c *RequestsLogTable) EnrichRow(row *RequestsLog, sourceEnrichmentFields schema.SourceEnrichment) (*RequestsLog, error) { + if row == nil { + return nil, nil + } + 
+ row.CommonFields = sourceEnrichmentFields.CommonFields + + row.TpID = xid.New().String() + row.TpTimestamp = row.Timestamp + row.TpIngestTimestamp = time.Now() + row.TpDate = row.Timestamp.Truncate(24 * time.Hour) + + // Ensure TpIps is always initialized (even if empty) + if row.TpIps == nil { + row.TpIps = []string{} + } + + // Set TpDestinationIP and TpSourceIP to non-nil pointers, even if empty + emptyStr := "" + if row.HttpRequest != nil { + if row.HttpRequest.RemoteIp != "" { + row.TpIps = append(row.TpIps, row.HttpRequest.RemoteIp) + row.TpSourceIP = &row.HttpRequest.RemoteIp + } else { + row.TpSourceIP = &emptyStr + } + if row.HttpRequest.ServerIp != "" { + row.TpIps = append(row.TpIps, row.HttpRequest.ServerIp) + row.TpDestinationIP = &row.HttpRequest.ServerIp + } else { + row.TpDestinationIP = &emptyStr + } + } else { + row.TpDestinationIP = &emptyStr + row.TpSourceIP = &emptyStr + } + + return row, nil +} + +func (c *RequestsLogTable) GetDescription() string { + return "GCP Request Logs track requests to Google Cloud services including application load balancer logs and Cloud Armor logs, capturing request events for security and compliance monitoring." +}