From fa112139849f896fe3db0ffe8c9caf2c7f88ced4 Mon Sep 17 00:00:00 2001 From: Jason Mogavero Date: Tue, 24 Jun 2025 11:36:15 -0400 Subject: [PATCH 1/9] cloud armor + load balancer tables added --- Makefile | 8 + gcp/plugin.go | 4 + go.mod | 2 +- go.sum | 4 +- .../cloud_logging_api_source.go | 158 +++++++++ .../cloud_logging_api_source_config.go | 37 ++ tables/requests_log/requests_log.go | 107 ++++++ tables/requests_log/requests_log_mapper.go | 335 ++++++++++++++++++ tables/requests_log/requests_log_table.go | 99 ++++++ 9 files changed, 751 insertions(+), 3 deletions(-) create mode 100644 sources/cloud_logging_api/cloud_logging_api_source.go create mode 100644 sources/cloud_logging_api/cloud_logging_api_source_config.go create mode 100644 tables/requests_log/requests_log.go create mode 100644 tables/requests_log/requests_log_mapper.go create mode 100644 tables/requests_log/requests_log_table.go diff --git a/Makefile b/Makefile index b83511a..2de7f28 100644 --- a/Makefile +++ b/Makefile @@ -6,7 +6,15 @@ PLUGIN_BINARY = $(PLUGIN_DIR)/tailpipe-plugin-gcp.plugin VERSION_JSON = $(PLUGIN_DIR)/version.json VERSIONS_JSON = $(TAILPIPE_INSTALL_DIR)/plugins/versions.json +.PHONY: install debug + install: go build -o $(PLUGIN_BINARY) -tags "${BUILD_TAGS}" *.go $(PLUGIN_BINARY) metadata > $(VERSION_JSON) + rm -f $(VERSIONS_JSON) + +debug: + @echo "Building and installing debug plugin…" + go build -gcflags="all=-N -l" -o $(PLUGIN_BINARY) -tags "${BUILD_TAGS}" *.go + $(PLUGIN_BINARY) metadata > $(VERSION_JSON) rm -f $(VERSIONS_JSON) \ No newline at end of file diff --git a/gcp/plugin.go b/gcp/plugin.go index 4574165..55d82c1 100644 --- a/gcp/plugin.go +++ b/gcp/plugin.go @@ -4,8 +4,10 @@ import ( "github.com/turbot/go-kit/helpers" "github.com/turbot/tailpipe-plugin-gcp/config" "github.com/turbot/tailpipe-plugin-gcp/sources/audit_log_api" + "github.com/turbot/tailpipe-plugin-gcp/sources/cloud_logging_api" "github.com/turbot/tailpipe-plugin-gcp/sources/storage_bucket" 
"github.com/turbot/tailpipe-plugin-gcp/tables/audit_log" + "github.com/turbot/tailpipe-plugin-gcp/tables/requests_log" "github.com/turbot/tailpipe-plugin-sdk/plugin" "github.com/turbot/tailpipe-plugin-sdk/row_source" "github.com/turbot/tailpipe-plugin-sdk/table" @@ -20,9 +22,11 @@ func init() { // 1. row struct // 2. table implementation table.RegisterTable[*audit_log.AuditLog, *audit_log.AuditLogTable]() + table.RegisterTable[*requests_log.RequestsLog, *requests_log.RequestsLogTable]() // register sources row_source.RegisterRowSource[*audit_log_api.AuditLogAPISource]() + row_source.RegisterRowSource[*cloud_logging_api.CloudLoggingAPISource]() row_source.RegisterRowSource[*storage_bucket.GcpStorageBucketSource]() } diff --git a/go.mod b/go.mod index fb1f3e3..902cef6 100644 --- a/go.mod +++ b/go.mod @@ -82,7 +82,7 @@ require ( github.com/gabriel-vasile/mimetype v1.4.3 // indirect github.com/gertd/go-pluralize v0.2.1 // indirect github.com/go-git/go-git/v5 v5.13.0 // indirect - github.com/go-jose/go-jose/v4 v4.0.5 // indirect + github.com/go-jose/go-jose/v4 v4.0.4 // indirect github.com/go-logr/logr v1.4.2 // indirect github.com/go-logr/stdr v1.2.2 // indirect github.com/go-ole/go-ole v1.2.6 // indirect diff --git a/go.sum b/go.sum index 52a34a2..b144132 100644 --- a/go.sum +++ b/go.sum @@ -372,8 +372,8 @@ github.com/go-git/go-git/v5 v5.13.0/go.mod h1:Wjo7/JyVKtQgUNdXYXIepzWfJQkUEIGvkv github.com/go-gl/glfw v0.0.0-20190409004039-e6da0acd62b1/go.mod h1:vR7hzQXu2zJy9AVAgeJqvqgH9Q5CA+iKCZ2gyEVpxRU= github.com/go-gl/glfw/v3.3/glfw v0.0.0-20191125211704-12ad95a8df72/go.mod h1:tQ2UAYgL5IevRw8kRxooKSPJfGvJ9fJQFa0TUsXzTg8= github.com/go-gl/glfw/v3.3/glfw v0.0.0-20200222043503-6f7a984d4dc4/go.mod h1:tQ2UAYgL5IevRw8kRxooKSPJfGvJ9fJQFa0TUsXzTg8= -github.com/go-jose/go-jose/v4 v4.0.5 h1:M6T8+mKZl/+fNNuFHvGIzDz7BTLQPIounk/b9dw3AaE= -github.com/go-jose/go-jose/v4 v4.0.5/go.mod h1:s3P1lRrkT8igV8D9OjyL4WRyHvjB6a4JSllnOrmmBOA= +github.com/go-jose/go-jose/v4 v4.0.4 
h1:VsjPI33J0SB9vQM6PLmNjoHqMQNGPiZ0rHL7Ni7Q6/E= +github.com/go-jose/go-jose/v4 v4.0.4/go.mod h1:NKb5HO1EZccyMpiZNbdUw/14tiXNyUJh188dfnMCAfc= github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY= github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= diff --git a/sources/cloud_logging_api/cloud_logging_api_source.go b/sources/cloud_logging_api/cloud_logging_api_source.go new file mode 100644 index 0000000..857891a --- /dev/null +++ b/sources/cloud_logging_api/cloud_logging_api_source.go @@ -0,0 +1,158 @@ +package cloud_logging_api + +import ( + "context" + "errors" + "fmt" + "strings" // Assuming this is still used in getLogNameFilter + "time" + + "cloud.google.com/go/logging" + loggingpb "cloud.google.com/go/logging/apiv2/loggingpb" + "cloud.google.com/go/logging/logadmin" + "google.golang.org/api/iterator" + proto "google.golang.org/protobuf/proto" + "google.golang.org/protobuf/types/known/anypb" + + "github.com/turbot/tailpipe-plugin-gcp/config" + "github.com/turbot/tailpipe-plugin-sdk/collection_state" + "github.com/turbot/tailpipe-plugin-sdk/row_source" + "github.com/turbot/tailpipe-plugin-sdk/schema" + "github.com/turbot/tailpipe-plugin-sdk/types" +) + +const CloudLoggingAPISourceIdentifier = "gcp_cloud_logging_api" + +// CloudLoggingAPISource source is responsible for collecting cloud logs from GCP +type CloudLoggingAPISource struct { + row_source.RowSourceImpl[*CloudLoggingAPISourceConfig, *config.GcpConnection] +} + +func (s *CloudLoggingAPISource) Init(ctx context.Context, params *row_source.RowSourceParams, opts ...row_source.RowSourceOption) error { + // set the collection state ctor + s.NewCollectionStateFunc = collection_state.NewTimeRangeCollectionState + + // call base init + return s.RowSourceImpl.Init(ctx, params, opts...) 
+} + +func (s *CloudLoggingAPISource) Identifier() string { + return CloudLoggingAPISourceIdentifier +} + +func (s *CloudLoggingAPISource) Collect(ctx context.Context) error { + project := s.Connection.GetProject() + var logTypes []string + if s.Config != nil && s.Config.LogTypes != nil { + logTypes = s.Config.LogTypes + } + + client, err := s.getClient(ctx, project) + if err != nil { + return err + } + defer client.Close() + + sourceName := CloudLoggingAPISourceIdentifier + sourceEnrichmentFields := &schema.SourceEnrichment{ + CommonFields: schema.CommonFields{ + TpSourceName: &sourceName, + TpSourceType: CloudLoggingAPISourceIdentifier, + TpSourceLocation: &project, + }, + } + + filter := s.getLogNameFilter(project, logTypes, s.FromTime) + + // TODO: #ratelimit implement rate limiting + // logEntry will now be the higher-level logging.Entry + var logEntry *logging.Entry + it := client.Entries(ctx, logadmin.Filter(filter), logadmin.PageSize(250)) + for { + logEntry, err = it.Next() + if err != nil && errors.Is(err, iterator.Done) { + break + } + if err != nil { + return fmt.Errorf("error fetching log entries, %w", err) + } + + if logEntry.Payload != nil { + var protoLogEntry loggingpb.LogEntry + // Unmarshal the anypb.Any into loggingpb.LogEntry + if anyPayload, ok := logEntry.Payload.(*anypb.Any); ok { + // If the assertion is successful, 'anyPayload' is now of type *anypb.Any + // and can be used with anypb.UnmarshalTo. + err := anypb.UnmarshalTo(anyPayload, &protoLogEntry, proto.UnmarshalOptions{}) + if err != nil { + return fmt.Errorf("Warning: Could not unmarshal anypb.Any from Payload to loggingpb.LogEntry for log ID %s: %v", logEntry.InsertID, err) + } + } + + // Use pbEntry (the *loggingpb.LogEntry) for collection state and RowData + // Note: CollectionState.ShouldCollect and OnCollected will now use pbEntry's fields + // Ensure pbEntry.Timestamp is converted to time.Time if ShouldCollect expects it. 
+ if s.CollectionState.ShouldCollect(logEntry.InsertID, logEntry.Timestamp) { + row := &types.RowData{ + Data: logEntry, // Pass the *loggingpb.LogEntry to the RowData + SourceEnrichment: sourceEnrichmentFields, + } + + if err = s.CollectionState.OnCollected(logEntry.InsertID, logEntry.Timestamp); err != nil { + return fmt.Errorf("error updating collection state: %w", err) + } + if err = s.OnRow(ctx, row); err != nil { + return fmt.Errorf("error processing row: %w", err) + } + } + } + } + + return nil +} + +func (s *CloudLoggingAPISource) getClient(ctx context.Context, project string) (*logadmin.Client, error) { + opts, err := s.Connection.GetClientOptions(ctx) + if err != nil { + return nil, err + } + + if project == "" { + return nil, errors.New("unable to determine active project, please set project in configuration or env var CLOUDSDK_CORE_PROJECT / GCP_PROJECT") + } + + client, err := logadmin.NewClient(ctx, project, opts...) + if err != nil { + return nil, err + } + + return client, nil +} + +func (s *CloudLoggingAPISource) getLogNameFilter(projectId string, logTypes []string, startTime time.Time) string { + requestsLog := fmt.Sprintf(`"projects/%s/logs/requests"`, projectId) + timePart := fmt.Sprintf(`AND (timestamp > "%s")`, startTime.Format(time.RFC3339Nano)) + + // short-circuit default + if len(logTypes) == 0 { + return fmt.Sprintf("logName=%s %s", requestsLog, timePart) + } + + // Only request logs supported at implementation. 
Append additional cases for other log types as needed + var selected []string + for _, logType := range logTypes { + switch logType { + case "requests": + selected = append(selected, requestsLog) + } + } + + switch len(selected) { + case 0: + return fmt.Sprintf("logName=%s %s", requestsLog, timePart) + case 1: + return fmt.Sprintf("logName=%s %s", selected[0], timePart) + default: + return fmt.Sprintf("logName=(%s) %s", strings.Join(selected, " OR "), timePart) + } +} diff --git a/sources/cloud_logging_api/cloud_logging_api_source_config.go b/sources/cloud_logging_api/cloud_logging_api_source_config.go new file mode 100644 index 0000000..487c46f --- /dev/null +++ b/sources/cloud_logging_api/cloud_logging_api_source_config.go @@ -0,0 +1,37 @@ +package cloud_logging_api + +import ( + "fmt" + "strings" + + "github.com/hashicorp/hcl/v2" +) + +type CloudLoggingAPISourceConfig struct { + // required to allow partial decoding + Remain hcl.Body `hcl:",remain" json:"-"` + LogTypes []string `hcl:"log_types,optional" json:"log_types"` +} + +func (a *CloudLoggingAPISourceConfig) Validate() error { + validLogTypes := []string{"requests"} // Currently, only "requests" is supported, but keeping this a list for future expansion + + for _, logType := range a.LogTypes { + isValid := false + for _, validType := range validLogTypes { + if logType == validType { + isValid = true + break + } + } + + if !isValid { + return fmt.Errorf("invalid log type %q, valid log types are %s", logType, strings.Join(validLogTypes, ", ")) + } + } + return nil +} + +func (a *CloudLoggingAPISourceConfig) Identifier() string { + return CloudLoggingAPISourceIdentifier +} diff --git a/tables/requests_log/requests_log.go b/tables/requests_log/requests_log.go new file mode 100644 index 0000000..25d9c47 --- /dev/null +++ b/tables/requests_log/requests_log.go @@ -0,0 +1,107 @@ +package requests_log + +import ( + "time" + + "github.com/turbot/tailpipe-plugin-sdk/schema" +) + +// RequestsLog represents an enriched 
row ready for parquet writing +type RequestsLog struct { + // embed required enrichment fields + schema.CommonFields + + // Mandatory fields + Timestamp time.Time `json:"timestamp"` + ReceiveTimestamp time.Time `json:"receive_timestamp"` + LogName string `json:"log_name"` + InsertId string `json:"insert_id"` + Severity string `json:"severity"` + Trace string `json:"trace_id"` + SpanId string `json:"span_id"` + TraceSampled bool `json:"trace_sampled"` + + // the json payload fields from the requests log, moved to the top level + BackendTargetProjectNumber string `json:"backend_target_project_number,omitempty"` + CacheDecision []string `json:"cache_decision,omitempty"` + RemoteIp string `json:"remote_ip,omitempty"` + StatusDetails string `json:"status_details,omitempty"` + EnforcedSecurityPolicy *RequestLogSecurityPolicy `json:"enforced_security_policy" parquet:"type=JSON"` + PreviewSecurityPolicy *RequestLogSecurityPolicy `json:"preview_security_policy,omitempty" parquet:"type=JSON"` + SecurityPolicyRequestData *RequestLogSecurityPolicyRequestData `json:"security_policy_request_data,omitempty" parquet:"type=JSON"` + + // other top level fields + Resource *RequestLogResource `json:"resource,omitempty" parquet:"type=JSON"` + HttpRequest *RequestLogHttpRequest `json:"http_request,omitempty" parquet:"type=JSON"` +} + +func NewRequestsLog() *RequestsLog { + return &RequestsLog{} +} + +type RequestLogResource struct { + Type string `json:"type,omitempty"` + Labels map[string]string `json:"labels,omitempty" parquet:"type=JSON"` +} + +type RequestLogHttpRequest struct { + RequestMethod string `json:"request_method,omitempty"` + RequestUrl string `json:"request_url,omitempty"` + RequestSize string `json:"request_size,omitempty"` + Referer string `json:"referer,omitempty"` + Status int32 `json:"status,omitempty"` + ResponseSize string `json:"response_size,omitempty"` + RemoteIp string `json:"remote_ip,omitempty"` + ServerIp string `json:"server_ip,omitempty"` + Latency string 
`json:"latency,omitempty"` + Protocol string `json:"protocol,omitempty"` + CacheHit bool `json:"cache_hit,omitempty"` + CacheLookup bool `json:"cache_lookup,omitempty"` + CacheValidatedWithOriginServer bool `json:"cache_validated_with_origin_server,omitempty"` + CacheFillBytes string `json:"cache_fill_bytes,omitempty"` + UserAgent string `json:"user_agent,omitempty"` +} + +type RequestLogSecurityPolicy struct { + ConfiguredAction string `json:"configured_action,omitempty"` + Name string `json:"name,omitempty"` + Outcome string `json:"outcome,omitempty"` + Priority int `json:"priority,omitempty"` + PreconfiguredExpressionIds []string `json:"preconfigured_expression_ids,omitempty"` +} + +type RequestLogSecurityPolicyRequestData struct { + RemoteIpInfo *RequestLogRemoteIpInfo `json:"remote_ip_info,omitempty"` + TlsJa3Fingerprint string `json:"tls_ja3_fingerprint,omitempty"` + TlsJa4Fingerprint string `json:"tls_ja4_fingerprint,omitempty"` +} + +type RequestLogRemoteIpInfo struct { + Asn int `json:"asn,omitempty"` + RegionCode string `json:"region_code,omitempty"` +} + +func (a *RequestsLog) GetColumnDescriptions() map[string]string { + return map[string]string{ + "timestamp": "The date and time when the request was received, in ISO 8601 format.", + "receive_timestamp": "The time when the log entry was received by Cloud Logging.", + "log_name": "The name of the log that recorded the request, e.g., 'projects/[PROJECT_ID]/logs/requests'.", + "insert_id": "A unique identifier for the log entry, used to prevent duplicate log entries.", + "severity": "The severity level of the log entry (e.g., 'INFO', 'WARNING', 'ERROR', 'CRITICAL').", + "trace_id": "The unique trace ID associated with the request, used for distributed tracing.", + "trace_sampled": "Indicates whether the request trace was sampled for analysis (true or false).", + "span_id": "The span ID for the request, used in distributed tracing to identify specific operations.", + "resource": "The monitored resource 
associated with the log entry, including type and labels.", + "http_request": "Details about the HTTP request associated with the log entry, if available (present in application load balancer logs).", + "backend_target_project_number": "The project number of the backend target.", + "cache_decision": "A list of cache decisions made for the request.", + "enforced_security_policy": "Details about the enforced security policy for the request.", + "preview_security_policy": "Details about the preview security policy for the request, if any.", + "security_policy_request_data": "Additional data about the security policy request.", + "remote_ip": "The remote IP address from which the request originated.", + "status_details": "Additional status details for the request.", + + // Override table specific tp_* column descriptions + "tp_index": "The GCP project.", + } +} diff --git a/tables/requests_log/requests_log_mapper.go b/tables/requests_log/requests_log_mapper.go new file mode 100644 index 0000000..0ef3c46 --- /dev/null +++ b/tables/requests_log/requests_log_mapper.go @@ -0,0 +1,335 @@ +//nolint:staticcheck +package requests_log + +import ( + "context" + "encoding/json" + "fmt" + "strconv" + "time" + + // for debugging + "os" + + loggingpb "cloud.google.com/go/logging/apiv2/loggingpb" + + "github.com/turbot/tailpipe-plugin-sdk/mappers" +) + +func dumpRow(row *RequestsLog) { + f, err := os.Create("/tmp/row_debug.json") + if err == nil { + defer f.Close() + enc := json.NewEncoder(f) + enc.SetIndent("", " ") + enc.Encode(row) + } +} + +type RequestsLogMapper struct { +} + +func (m *RequestsLogMapper) Identifier() string { + return "gcp_requests_log_mapper" +} + +func (m *RequestsLogMapper) Map(_ context.Context, a any, _ ...mappers.MapOption[*RequestsLog]) (*RequestsLog, error) { + switch v := a.(type) { + case string: + return mapFromBucketJson([]byte(v)) + case *loggingpb.LogEntry: + return mapFromSDKType(v) + case []byte: + return mapFromBucketJson(v) + default: + return 
nil, fmt.Errorf("expected loggingpb.LogEntry, string or []byte, got %T", a) + } +} + +func mapFromSDKType(item *loggingpb.LogEntry) (*RequestsLog, error) { + // === 1. Early exit for non-HTTP(S) logs or those missing a payload === + if item.GetHttpRequest() == nil || item.GetJsonPayload() == nil { + return nil, nil + } + + row := NewRequestsLog() + + // === 2. Map common LogEntry fields === + row.Timestamp = item.GetTimestamp().AsTime() + row.LogName = item.GetLogName() + row.InsertId = item.GetInsertId() + row.Severity = item.GetSeverity().String() + row.ReceiveTimestamp = item.GetReceiveTimestamp().AsTime() + row.Trace = item.GetTrace() + row.SpanId = item.GetSpanId() + row.TraceSampled = item.GetTraceSampled() + + // === 3. Map Resource === + if item.GetResource() != nil { + row.Resource = &RequestLogResource{ + Type: item.GetResource().GetType(), + Labels: item.GetResource().GetLabels(), + } + } + + // === 4. Map JsonPayload (for LB/Cloud Armor specific data) === + jsonPayload := item.GetJsonPayload().AsMap() + row.BackendTargetProjectNumber = jsonPayload["backendTargetProjectNumber"].(string) + // Handle CacheDecision specifically: + if rawCacheDecision, ok := jsonPayload["cacheDecision"].([]interface{}); ok { + // Iterate over the []interface{} and append string elements to row.CacheDecision + for _, v := range rawCacheDecision { + if s, ok := v.(string); ok { + row.CacheDecision = append(row.CacheDecision, s) + } + } + } + row.RemoteIp = jsonPayload["remoteIp"].(string) + row.StatusDetails = jsonPayload["statusDetails"].(string) + + securityPolicyMap := jsonPayload["enforcedSecurityPolicy"].(map[string]interface{}) + + row.EnforcedSecurityPolicy = &RequestLogSecurityPolicy{ + // Direct assignments for guaranteed scalar fields + ConfiguredAction: securityPolicyMap["configuredAction"].(string), + Name: securityPolicyMap["name"].(string), + Outcome: securityPolicyMap["outcome"].(string), + Priority: int(securityPolicyMap["priority"].(float64)), // JSON numbers 
are float64 + } + + // Handle PreconfiguredExpressionIds only if it exists *and* has values. + if rawIds, ok := securityPolicyMap["preconfiguredExpressionIds"].([]interface{}); ok && len(rawIds) > 0 { + row.EnforcedSecurityPolicy.PreconfiguredExpressionIds = make([]string, 0, len(rawIds)) + for _, id := range rawIds { + row.EnforcedSecurityPolicy.PreconfiguredExpressionIds = append(row.EnforcedSecurityPolicy.PreconfiguredExpressionIds, id.(string)) + } + } + + if previewPolicyMap, ok := jsonPayload["previewSecurityPolicy"].(map[string]interface{}); ok { + // If it exists, initialize the PreviewSecurityPolicy struct + row.PreviewSecurityPolicy = &RequestLogSecurityPolicy{ + // Direct assignments for its guaranteed scalar fields within previewPolicyMap + ConfiguredAction: previewPolicyMap["configuredAction"].(string), + Name: previewPolicyMap["name"].(string), + Outcome: previewPolicyMap["outcome"].(string), + Priority: int(previewPolicyMap["priority"].(float64)), // JSON numbers are float64 + } + + // Handle PreconfiguredExpressionIds within PreviewSecurityPolicy only if it exists and has values. + if rawIds, ok := previewPolicyMap["preconfiguredExpressionIds"].([]interface{}); ok && len(rawIds) > 0 { + row.PreviewSecurityPolicy.PreconfiguredExpressionIds = make([]string, 0, len(rawIds)) + for _, id := range rawIds { + row.PreviewSecurityPolicy.PreconfiguredExpressionIds = append(row.PreviewSecurityPolicy.PreconfiguredExpressionIds, id.(string)) + } + } + } + + // === 5. Map HTTPRequest (guaranteed to be present due to early exit) === + // No 'if' check needed here for item.GetHttpRequest() because we already filtered. 
+ httpRequestPb := item.GetHttpRequest() + row.HttpRequest = &RequestLogHttpRequest{ + RequestMethod: httpRequestPb.GetRequestMethod(), + RequestUrl: httpRequestPb.GetRequestUrl(), + RequestSize: strconv.FormatInt(httpRequestPb.GetRequestSize(), 10), + Referer: httpRequestPb.GetReferer(), + UserAgent: httpRequestPb.GetUserAgent(), + Status: httpRequestPb.GetStatus(), + ResponseSize: strconv.FormatInt(httpRequestPb.GetResponseSize(), 10), + RemoteIp: httpRequestPb.GetRemoteIp(), + Latency: httpRequestPb.GetLatency().String(), + ServerIp: httpRequestPb.GetServerIp(), + Protocol: httpRequestPb.GetProtocol(), + CacheFillBytes: strconv.FormatInt(httpRequestPb.GetCacheFillBytes(), 10), + CacheLookup: httpRequestPb.GetCacheLookup(), + CacheHit: httpRequestPb.GetCacheHit(), + CacheValidatedWithOriginServer: httpRequestPb.GetCacheValidatedWithOriginServer(), + } + + return row, nil +} + +func mapFromBucketJson(itemBytes []byte) (*RequestsLog, error) { + var log requestsLog + if err := json.Unmarshal(itemBytes, &log); err != nil { + return nil, fmt.Errorf("failed to parse requests log JSON: %w", err) + } + + // Filter out log entries that are not HTTP requests. + if log.HttpRequest == nil || log.JsonPayload == nil { + return nil, nil + } + + row := NewRequestsLog() + + // Map top-level fields + row.Timestamp = log.Timestamp + row.ReceiveTimestamp = log.ReceiveTimestamp + row.LogName = log.LogName + row.InsertId = log.InsertId + row.Severity = log.Severity + row.Trace = log.Trace + row.SpanId = log.SpanId + row.TraceSampled = log.TraceSampled + + // FIX: Only create objects if they exist in the source log. + // This avoids creating empty-but-non-nil objects that the downstream + // validator might reject. 
+ + if log.Resource != nil { + row.Resource = &RequestLogResource{ + Type: log.Resource.Type, + Labels: func() map[string]string { + if log.Resource.Labels != nil { + return log.Resource.Labels + } + return make(map[string]string) + }(), + } + } + + // Map JSON Payload fields + row.BackendTargetProjectNumber = log.JsonPayload.BackendTargetProjectNumber + row.CacheDecision = log.JsonPayload.CacheDecision + row.RemoteIp = log.JsonPayload.RemoteIp + row.StatusDetails = log.JsonPayload.StatusDetails + + if log.JsonPayload.EnforcedSecurityPolicy != nil { + ids := []string{} + if log.JsonPayload.EnforcedSecurityPolicy.PreconfiguredExpressionIds != nil { + ids = log.JsonPayload.EnforcedSecurityPolicy.PreconfiguredExpressionIds + } + row.EnforcedSecurityPolicy = &RequestLogSecurityPolicy{ + ConfiguredAction: log.JsonPayload.EnforcedSecurityPolicy.ConfiguredAction, + Name: log.JsonPayload.EnforcedSecurityPolicy.Name, + Outcome: log.JsonPayload.EnforcedSecurityPolicy.Outcome, + Priority: log.JsonPayload.EnforcedSecurityPolicy.Priority, + PreconfiguredExpressionIds: ids, + } + } + + if log.JsonPayload.PreviewSecurityPolicy != nil { + ids := []string{} + if log.JsonPayload.PreviewSecurityPolicy.PreconfiguredExpressionIds != nil { + ids = log.JsonPayload.PreviewSecurityPolicy.PreconfiguredExpressionIds + } + row.PreviewSecurityPolicy = &RequestLogSecurityPolicy{ + ConfiguredAction: log.JsonPayload.PreviewSecurityPolicy.ConfiguredAction, + Name: log.JsonPayload.PreviewSecurityPolicy.Name, + Outcome: log.JsonPayload.PreviewSecurityPolicy.Outcome, + Priority: log.JsonPayload.PreviewSecurityPolicy.Priority, + PreconfiguredExpressionIds: ids, + } + } + + if log.JsonPayload.SecurityPolicyRequestData != nil { + row.SecurityPolicyRequestData = &RequestLogSecurityPolicyRequestData{ + TlsJa3Fingerprint: log.JsonPayload.SecurityPolicyRequestData.TlsJa3Fingerprint, + TlsJa4Fingerprint: log.JsonPayload.SecurityPolicyRequestData.TlsJa4Fingerprint, + // Always initialize the nested object to 
be safe. + RemoteIpInfo: &RequestLogRemoteIpInfo{}, + } + + if log.JsonPayload.SecurityPolicyRequestData.RemoteIpInfo != nil { + row.SecurityPolicyRequestData.RemoteIpInfo = &RequestLogRemoteIpInfo{ + Asn: log.JsonPayload.SecurityPolicyRequestData.RemoteIpInfo.Asn, + RegionCode: log.JsonPayload.SecurityPolicyRequestData.RemoteIpInfo.RegionCode, + } + } + } + + // HttpRequest is guaranteed non-nil by the filter at the top of the function. + row.HttpRequest = &RequestLogHttpRequest{ + RequestMethod: log.HttpRequest.RequestMethod, + RequestUrl: log.HttpRequest.RequestURL, + RequestSize: log.HttpRequest.RequestSize, + Status: log.HttpRequest.Status, + ResponseSize: log.HttpRequest.ResponseSize, + UserAgent: log.HttpRequest.UserAgent, + RemoteIp: log.HttpRequest.RemoteIP, + ServerIp: log.HttpRequest.ServerIP, + Referer: log.HttpRequest.Referer, + Latency: log.HttpRequest.Latency, + CacheLookup: log.HttpRequest.CacheLookup, + CacheHit: log.HttpRequest.CacheHit, + CacheValidatedWithOriginServer: log.HttpRequest.CacheValidatedWithOriginServer, + CacheFillBytes: log.HttpRequest.CacheFillBytes, + } + + // dumpRow(row) + + return row, nil +} + +type requestsLog struct { + InsertId string `json:"insertId"` + LogName string `json:"logName"` + Resource *resource `json:"resource,omitempty"` + Timestamp time.Time `json:"timestamp"` + Severity string `json:"severity"` + JsonPayload *jsonPayload `json:"jsonPayload,omitempty"` + ReceiveTimestamp time.Time `json:"receiveTimestamp"` + Trace string `json:"trace,omitempty"` + SpanId string `json:"spanId,omitempty"` + HttpRequest *httpRequest `json:"httpRequest,omitempty"` + TraceSampled bool `json:"traceSampled,omitempty"` +} + +type resource struct { + Type string `json:"type"` + Labels map[string]string `json:"labels"` +} + +type jsonPayload struct { + TypeName string `json:"@type"` + BackendTargetProjectNumber string `json:"backendTargetProjectNumber"` + CacheDecision []string `json:"cacheDecision"` + EnforcedSecurityPolicy 
*requestLogEnforcedSecurityPolicy `json:"enforcedSecurityPolicy"` + PreviewSecurityPolicy *requestLogPreviewSecurityPolicy `json:"previewSecurityPolicy,omitempty"` + SecurityPolicyRequestData *requestLogSecurityPolicyRequestData `json:"securityPolicyRequestData"` + RemoteIp string `json:"remoteIp"` + StatusDetails string `json:"statusDetails"` +} + +type httpRequest struct { + RequestMethod string `json:"requestMethod"` + RequestURL string `json:"requestUrl"` + RequestSize string `json:"requestSize,omitempty"` + Status int32 `json:"status"` + ResponseSize string `json:"responseSize,omitempty"` + UserAgent string `json:"userAgent"` + RemoteIP string `json:"remoteIp"` + ServerIP string `json:"serverIp,omitempty"` + Referer string `json:"referer,omitempty"` + Latency string `json:"latency,omitempty"` + CacheLookup bool `json:"cacheLookup,omitempty"` + CacheHit bool `json:"cacheHit,omitempty"` + CacheValidatedWithOriginServer bool `json:"cacheValidatedWithOriginServer,omitempty"` + CacheFillBytes string `json:"cacheFillBytes,omitempty"` +} + +type requestLogEnforcedSecurityPolicy struct { + ConfiguredAction string `json:"configuredAction"` + Name string `json:"name"` + Outcome string `json:"outcome"` + Priority int `json:"priority"` + PreconfiguredExpressionIds []string `json:"preconfiguredExpressionIds,omitempty"` +} + +type requestLogPreviewSecurityPolicy struct { + ConfiguredAction string `json:"configuredAction"` + Name string `json:"name"` + Outcome string `json:"outcome"` + Priority int `json:"priority"` + PreconfiguredExpressionIds []string `json:"preconfiguredExpressionIds,omitempty"` +} + +type requestLogSecurityPolicyRequestData struct { + RemoteIpInfo *requestLogRemoteIpInfo `json:"remoteIpInfo"` + TlsJa3Fingerprint string `json:"tlsJa3Fingerprint"` + TlsJa4Fingerprint string `json:"tlsJa4Fingerprint"` +} + +type requestLogRemoteIpInfo struct { + Asn int `json:"asn"` + RegionCode string `json:"regionCode"` +} diff --git 
a/tables/requests_log/requests_log_table.go b/tables/requests_log/requests_log_table.go new file mode 100644 index 0000000..918d88e --- /dev/null +++ b/tables/requests_log/requests_log_table.go @@ -0,0 +1,99 @@ +package requests_log + +import ( + "time" + + "github.com/rs/xid" + + "github.com/turbot/pipe-fittings/v2/utils" + "github.com/turbot/tailpipe-plugin-gcp/sources/cloud_logging_api" + "github.com/turbot/tailpipe-plugin-gcp/sources/storage_bucket" + "github.com/turbot/tailpipe-plugin-sdk/artifact_source" + "github.com/turbot/tailpipe-plugin-sdk/artifact_source_config" + "github.com/turbot/tailpipe-plugin-sdk/constants" + "github.com/turbot/tailpipe-plugin-sdk/row_source" + "github.com/turbot/tailpipe-plugin-sdk/schema" + "github.com/turbot/tailpipe-plugin-sdk/table" +) + +const RequestsLogTableIdentifier string = "gcp_requests_log" + +type RequestsLogTable struct { +} + +func (c *RequestsLogTable) Identifier() string { + return RequestsLogTableIdentifier +} + +func (c *RequestsLogTable) GetSourceMetadata() ([]*table.SourceMetadata[*RequestsLog], error) { + defaultArtifactConfig := &artifact_source_config.ArtifactSourceConfigImpl{ + FileLayout: utils.ToStringPointer("requests/%{YEAR:year}/%{MONTHNUM:month}/%{MONTHDAY:day}/%{HOUR:hour}:%{MINUTE:minute}:%{SECOND:second}_%{DATA:end_time}_%{DATA:suffix}.json"), + } + + return []*table.SourceMetadata[*RequestsLog]{ + { + SourceName: cloud_logging_api.CloudLoggingAPISourceIdentifier, + Mapper: &RequestsLogMapper{}, + }, + { + SourceName: storage_bucket.GcpStorageBucketSourceIdentifier, + Mapper: &RequestsLogMapper{}, + Options: []row_source.RowSourceOption{ + artifact_source.WithDefaultArtifactSourceConfig(defaultArtifactConfig), + artifact_source.WithRowPerLine(), + }, + }, + { + SourceName: constants.ArtifactSourceIdentifier, + Mapper: &RequestsLogMapper{}, + Options: []row_source.RowSourceOption{ + artifact_source.WithDefaultArtifactSourceConfig(defaultArtifactConfig), + }, + }, + }, nil +} + +func (c 
*RequestsLogTable) EnrichRow(row *RequestsLog, sourceEnrichmentFields schema.SourceEnrichment) (*RequestsLog, error) { + if row == nil { + return nil, nil + } + + row.CommonFields = sourceEnrichmentFields.CommonFields + + row.TpID = xid.New().String() + row.TpTimestamp = row.Timestamp + row.TpIngestTimestamp = time.Now() + row.TpIndex = schema.DefaultIndex + row.TpDate = row.Timestamp.Truncate(24 * time.Hour) + + // Ensure TpIps is always initialized (even if empty) + if row.TpIps == nil { + row.TpIps = []string{} + } + + // Set TpDestinationIP and TpSourceIP to non-nil pointers, even if empty + emptyStr := "" + if row.HttpRequest != nil { + if row.HttpRequest.RemoteIp != "" { + row.TpIps = append(row.TpIps, row.HttpRequest.RemoteIp) + row.TpSourceIP = &row.HttpRequest.RemoteIp + } else { + row.TpSourceIP = &emptyStr + } + if row.HttpRequest.ServerIp != "" { + row.TpIps = append(row.TpIps, row.HttpRequest.ServerIp) + row.TpDestinationIP = &row.HttpRequest.ServerIp + } else { + row.TpDestinationIP = &emptyStr + } + } else { + row.TpDestinationIP = &emptyStr + row.TpSourceIP = &emptyStr + } + dumpRow(row) + return row, nil +} + +func (c *RequestsLogTable) GetDescription() string { + return "GCP Request Logs track requests to Google Cloud services including application load balancer logs and Cloud Armor logs, capturing request events for security and compliance monitoring." 
+} From 389933eb9f281e132145c36be361b090faffa329 Mon Sep 17 00:00:00 2001 From: Bryan Grimes Date: Tue, 24 Jun 2025 16:26:35 -0400 Subject: [PATCH 2/9] corrected nil,nil non-http validation error and added a little more safe type checking just in case --- tables/requests_log/requests_log_mapper.go | 112 +++++++++++++++++---- 1 file changed, 91 insertions(+), 21 deletions(-) diff --git a/tables/requests_log/requests_log_mapper.go b/tables/requests_log/requests_log_mapper.go index 0ef3c46..e233b23 100644 --- a/tables/requests_log/requests_log_mapper.go +++ b/tables/requests_log/requests_log_mapper.go @@ -49,7 +49,8 @@ func (m *RequestsLogMapper) Map(_ context.Context, a any, _ ...mappers.MapOption func mapFromSDKType(item *loggingpb.LogEntry) (*RequestsLog, error) { // === 1. Early exit for non-HTTP(S) logs or those missing a payload === if item.GetHttpRequest() == nil || item.GetJsonPayload() == nil { - return nil, nil + // Return an error to skip this row instead of returning nil + return nil, fmt.Errorf("skipping non-HTTP request log entry") } row := NewRequestsLog() @@ -74,7 +75,15 @@ func mapFromSDKType(item *loggingpb.LogEntry) (*RequestsLog, error) { // === 4. 
Map JsonPayload (for LB/Cloud Armor specific data) === jsonPayload := item.GetJsonPayload().AsMap() - row.BackendTargetProjectNumber = jsonPayload["backendTargetProjectNumber"].(string) + if jsonPayload == nil { + jsonPayload = make(map[string]interface{}) + } + + // Safely extract string fields with type checking + if v, ok := jsonPayload["backendTargetProjectNumber"].(string); ok { + row.BackendTargetProjectNumber = v + } + // Handle CacheDecision specifically: if rawCacheDecision, ok := jsonPayload["cacheDecision"].([]interface{}); ok { // Iterate over the []interface{} and append string elements to row.CacheDecision @@ -84,42 +93,97 @@ func mapFromSDKType(item *loggingpb.LogEntry) (*RequestsLog, error) { } } } - row.RemoteIp = jsonPayload["remoteIp"].(string) - row.StatusDetails = jsonPayload["statusDetails"].(string) + + if v, ok := jsonPayload["remoteIp"].(string); ok { + row.RemoteIp = v + } + + if v, ok := jsonPayload["statusDetails"].(string); ok { + row.StatusDetails = v + } - securityPolicyMap := jsonPayload["enforcedSecurityPolicy"].(map[string]interface{}) + // Safely extract security policy map + securityPolicyMap, hasPolicy := jsonPayload["enforcedSecurityPolicy"].(map[string]interface{}) + if !hasPolicy { + // Skip security policy mapping if not present + goto skipEnforcedPolicy + } - row.EnforcedSecurityPolicy = &RequestLogSecurityPolicy{ - // Direct assignments for guaranteed scalar fields - ConfiguredAction: securityPolicyMap["configuredAction"].(string), - Name: securityPolicyMap["name"].(string), - Outcome: securityPolicyMap["outcome"].(string), - Priority: int(securityPolicyMap["priority"].(float64)), // JSON numbers are float64 + row.EnforcedSecurityPolicy = &RequestLogSecurityPolicy{} + + // Safely extract fields with type checking + if v, ok := securityPolicyMap["configuredAction"].(string); ok { + row.EnforcedSecurityPolicy.ConfiguredAction = v + } + if v, ok := securityPolicyMap["name"].(string); ok { + row.EnforcedSecurityPolicy.Name = v 
+ } + if v, ok := securityPolicyMap["outcome"].(string); ok { + row.EnforcedSecurityPolicy.Outcome = v + } + if v, ok := securityPolicyMap["priority"].(float64); ok { + row.EnforcedSecurityPolicy.Priority = int(v) } // Handle PreconfiguredExpressionIds only if it exists *and* has values. if rawIds, ok := securityPolicyMap["preconfiguredExpressionIds"].([]interface{}); ok && len(rawIds) > 0 { row.EnforcedSecurityPolicy.PreconfiguredExpressionIds = make([]string, 0, len(rawIds)) for _, id := range rawIds { - row.EnforcedSecurityPolicy.PreconfiguredExpressionIds = append(row.EnforcedSecurityPolicy.PreconfiguredExpressionIds, id.(string)) + if idStr, ok := id.(string); ok { + row.EnforcedSecurityPolicy.PreconfiguredExpressionIds = append(row.EnforcedSecurityPolicy.PreconfiguredExpressionIds, idStr) + } } } +skipEnforcedPolicy: + if previewPolicyMap, ok := jsonPayload["previewSecurityPolicy"].(map[string]interface{}); ok { // If it exists, initialize the PreviewSecurityPolicy struct - row.PreviewSecurityPolicy = &RequestLogSecurityPolicy{ - // Direct assignments for its guaranteed scalar fields within previewPolicyMap - ConfiguredAction: previewPolicyMap["configuredAction"].(string), - Name: previewPolicyMap["name"].(string), - Outcome: previewPolicyMap["outcome"].(string), - Priority: int(previewPolicyMap["priority"].(float64)), // JSON numbers are float64 + row.PreviewSecurityPolicy = &RequestLogSecurityPolicy{} + + // Safely extract fields with type checking + if v, ok := previewPolicyMap["configuredAction"].(string); ok { + row.PreviewSecurityPolicy.ConfiguredAction = v + } + if v, ok := previewPolicyMap["name"].(string); ok { + row.PreviewSecurityPolicy.Name = v + } + if v, ok := previewPolicyMap["outcome"].(string); ok { + row.PreviewSecurityPolicy.Outcome = v + } + if v, ok := previewPolicyMap["priority"].(float64); ok { + row.PreviewSecurityPolicy.Priority = int(v) } // Handle PreconfiguredExpressionIds within PreviewSecurityPolicy only if it exists and has 
values. if rawIds, ok := previewPolicyMap["preconfiguredExpressionIds"].([]interface{}); ok && len(rawIds) > 0 { row.PreviewSecurityPolicy.PreconfiguredExpressionIds = make([]string, 0, len(rawIds)) for _, id := range rawIds { - row.PreviewSecurityPolicy.PreconfiguredExpressionIds = append(row.PreviewSecurityPolicy.PreconfiguredExpressionIds, id.(string)) + if idStr, ok := id.(string); ok { + row.PreviewSecurityPolicy.PreconfiguredExpressionIds = append(row.PreviewSecurityPolicy.PreconfiguredExpressionIds, idStr) + } + } + } + } + + // Map SecurityPolicyRequestData if present + if secPolicyData, ok := jsonPayload["securityPolicyRequestData"].(map[string]interface{}); ok { + row.SecurityPolicyRequestData = &RequestLogSecurityPolicyRequestData{} + + if v, ok := secPolicyData["tlsJa3Fingerprint"].(string); ok { + row.SecurityPolicyRequestData.TlsJa3Fingerprint = v + } + if v, ok := secPolicyData["tlsJa4Fingerprint"].(string); ok { + row.SecurityPolicyRequestData.TlsJa4Fingerprint = v + } + + if remoteIpInfo, ok := secPolicyData["remoteIpInfo"].(map[string]interface{}); ok { + row.SecurityPolicyRequestData.RemoteIpInfo = &RequestLogRemoteIpInfo{} + if v, ok := remoteIpInfo["asn"].(float64); ok { + row.SecurityPolicyRequestData.RemoteIpInfo.Asn = int(v) + } + if v, ok := remoteIpInfo["regionCode"].(string); ok { + row.SecurityPolicyRequestData.RemoteIpInfo.RegionCode = v } } } @@ -136,7 +200,12 @@ func mapFromSDKType(item *loggingpb.LogEntry) (*RequestsLog, error) { Status: httpRequestPb.GetStatus(), ResponseSize: strconv.FormatInt(httpRequestPb.GetResponseSize(), 10), RemoteIp: httpRequestPb.GetRemoteIp(), - Latency: httpRequestPb.GetLatency().String(), + Latency: func() string { + if lat := httpRequestPb.GetLatency(); lat != nil { + return lat.String() + } + return "" + }(), ServerIp: httpRequestPb.GetServerIp(), Protocol: httpRequestPb.GetProtocol(), CacheFillBytes: strconv.FormatInt(httpRequestPb.GetCacheFillBytes(), 10), @@ -156,7 +225,8 @@ func 
mapFromBucketJson(itemBytes []byte) (*RequestsLog, error) { // Filter out log entries that are not HTTP requests. if log.HttpRequest == nil || log.JsonPayload == nil { - return nil, nil + // Return an error to skip this row instead of returning nil + return nil, fmt.Errorf("skipping non-HTTP request log entry") } row := NewRequestsLog() From 06ae3818473f39c6c79d447c7ec846fea28b76cf Mon Sep 17 00:00:00 2001 From: Jason Mogavero Date: Tue, 24 Jun 2025 17:42:47 -0400 Subject: [PATCH 3/9] working extraction with api and bucket --- .../cloud_logging_api_source.go | 26 ++---- tables/requests_log/requests_log_mapper.go | 93 ++++++++++--------- 2 files changed, 55 insertions(+), 64 deletions(-) diff --git a/sources/cloud_logging_api/cloud_logging_api_source.go b/sources/cloud_logging_api/cloud_logging_api_source.go index 857891a..7edf51d 100644 --- a/sources/cloud_logging_api/cloud_logging_api_source.go +++ b/sources/cloud_logging_api/cloud_logging_api_source.go @@ -8,11 +8,8 @@ import ( "time" "cloud.google.com/go/logging" - loggingpb "cloud.google.com/go/logging/apiv2/loggingpb" "cloud.google.com/go/logging/logadmin" "google.golang.org/api/iterator" - proto "google.golang.org/protobuf/proto" - "google.golang.org/protobuf/types/known/anypb" "github.com/turbot/tailpipe-plugin-gcp/config" "github.com/turbot/tailpipe-plugin-sdk/collection_state" @@ -65,6 +62,7 @@ func (s *CloudLoggingAPISource) Collect(ctx context.Context) error { filter := s.getLogNameFilter(project, logTypes, s.FromTime) // TODO: #ratelimit implement rate limiting + // logEntry will now be the higher-level logging.Entry var logEntry *logging.Entry it := client.Entries(ctx, logadmin.Filter(filter), logadmin.PageSize(250)) @@ -78,27 +76,19 @@ func (s *CloudLoggingAPISource) Collect(ctx context.Context) error { } if logEntry.Payload != nil { - var protoLogEntry loggingpb.LogEntry - // Unmarshal the anypb.Any into loggingpb.LogEntry - if anyPayload, ok := logEntry.Payload.(*anypb.Any); ok { - // If the 
assertion is successful, 'anyPayload' is now of type *anypb.Any - // and can be used with anypb.UnmarshalTo. - err := anypb.UnmarshalTo(anyPayload, &protoLogEntry, proto.UnmarshalOptions{}) - if err != nil { - return fmt.Errorf("Warning: Could not unmarshal anypb.Any from Payload to loggingpb.LogEntry for log ID %s: %v", logEntry.InsertID, err) - } + logEntry.LogName = "" // remove logName from the log entry due to ToLogEntry requirements + protoLogEntry, err := logging.ToLogEntry(*logEntry, project) + if err != nil { + return fmt.Errorf("error converting log entry to loggingpb.LogEntry: %w", err) } - // Use pbEntry (the *loggingpb.LogEntry) for collection state and RowData - // Note: CollectionState.ShouldCollect and OnCollected will now use pbEntry's fields - // Ensure pbEntry.Timestamp is converted to time.Time if ShouldCollect expects it. - if s.CollectionState.ShouldCollect(logEntry.InsertID, logEntry.Timestamp) { + if s.CollectionState.ShouldCollect(protoLogEntry.GetInsertId(), protoLogEntry.GetTimestamp().AsTime()) { row := &types.RowData{ - Data: logEntry, // Pass the *loggingpb.LogEntry to the RowData + Data: protoLogEntry, // Pass the *loggingpb.LogEntry to the RowData SourceEnrichment: sourceEnrichmentFields, } - if err = s.CollectionState.OnCollected(logEntry.InsertID, logEntry.Timestamp); err != nil { + if err = s.CollectionState.OnCollected(protoLogEntry.GetInsertId(), protoLogEntry.GetTimestamp().AsTime()); err != nil { return fmt.Errorf("error updating collection state: %w", err) } if err = s.OnRow(ctx, row); err != nil { diff --git a/tables/requests_log/requests_log_mapper.go b/tables/requests_log/requests_log_mapper.go index e233b23..bbce2f1 100644 --- a/tables/requests_log/requests_log_mapper.go +++ b/tables/requests_log/requests_log_mapper.go @@ -4,6 +4,7 @@ package requests_log import ( "context" "encoding/json" + "errors" "fmt" "strconv" "time" @@ -11,11 +12,14 @@ import ( // for debugging "os" + "cloud.google.com/go/logging" loggingpb 
"cloud.google.com/go/logging/apiv2/loggingpb" "github.com/turbot/tailpipe-plugin-sdk/mappers" ) +var ErrSkipRow = errors.New("skipping row: not an HTTP request") + func dumpRow(row *RequestsLog) { f, err := os.Create("/tmp/row_debug.json") if err == nil { @@ -39,6 +43,8 @@ func (m *RequestsLogMapper) Map(_ context.Context, a any, _ ...mappers.MapOption return mapFromBucketJson([]byte(v)) case *loggingpb.LogEntry: return mapFromSDKType(v) + case *logging.Entry: + return nil, fmt.Errorf("Logging.Entry did not convert to *loggingpb.LogEntry: %T", a) case []byte: return mapFromBucketJson(v) default: @@ -50,7 +56,8 @@ func mapFromSDKType(item *loggingpb.LogEntry) (*RequestsLog, error) { // === 1. Early exit for non-HTTP(S) logs or those missing a payload === if item.GetHttpRequest() == nil || item.GetJsonPayload() == nil { // Return an error to skip this row instead of returning nil - return nil, fmt.Errorf("skipping non-HTTP request log entry") + // return nil, fmt.Errorf("skipping non-HTTP request log entry") + return nil, ErrSkipRow } row := NewRequestsLog() @@ -78,12 +85,12 @@ func mapFromSDKType(item *loggingpb.LogEntry) (*RequestsLog, error) { if jsonPayload == nil { jsonPayload = make(map[string]interface{}) } - + // Safely extract string fields with type checking if v, ok := jsonPayload["backendTargetProjectNumber"].(string); ok { row.BackendTargetProjectNumber = v } - + // Handle CacheDecision specifically: if rawCacheDecision, ok := jsonPayload["cacheDecision"].([]interface{}); ok { // Iterate over the []interface{} and append string elements to row.CacheDecision @@ -93,54 +100,47 @@ func mapFromSDKType(item *loggingpb.LogEntry) (*RequestsLog, error) { } } } - + if v, ok := jsonPayload["remoteIp"].(string); ok { row.RemoteIp = v } - + if v, ok := jsonPayload["statusDetails"].(string); ok { row.StatusDetails = v } // Safely extract security policy map - securityPolicyMap, hasPolicy := jsonPayload["enforcedSecurityPolicy"].(map[string]interface{}) - if 
!hasPolicy { - // Skip security policy mapping if not present - goto skipEnforcedPolicy - } - - row.EnforcedSecurityPolicy = &RequestLogSecurityPolicy{} - - // Safely extract fields with type checking - if v, ok := securityPolicyMap["configuredAction"].(string); ok { - row.EnforcedSecurityPolicy.ConfiguredAction = v - } - if v, ok := securityPolicyMap["name"].(string); ok { - row.EnforcedSecurityPolicy.Name = v - } - if v, ok := securityPolicyMap["outcome"].(string); ok { - row.EnforcedSecurityPolicy.Outcome = v - } - if v, ok := securityPolicyMap["priority"].(float64); ok { - row.EnforcedSecurityPolicy.Priority = int(v) - } + if securityPolicyMap, ok := jsonPayload["enforcedSecurityPolicy"].(map[string]interface{}); ok && securityPolicyMap != nil { + policy := &RequestLogSecurityPolicy{} + if val, ok := securityPolicyMap["configuredAction"].(string); ok { + policy.ConfiguredAction = val + } + if val, ok := securityPolicyMap["name"].(string); ok { + policy.Name = val + } + if val, ok := securityPolicyMap["outcome"].(string); ok { + policy.Outcome = val + } + if val, ok := securityPolicyMap["priority"].(float64); ok { + policy.Priority = int(val) + } - // Handle PreconfiguredExpressionIds only if it exists *and* has values. 
- if rawIds, ok := securityPolicyMap["preconfiguredExpressionIds"].([]interface{}); ok && len(rawIds) > 0 { - row.EnforcedSecurityPolicy.PreconfiguredExpressionIds = make([]string, 0, len(rawIds)) - for _, id := range rawIds { - if idStr, ok := id.(string); ok { - row.EnforcedSecurityPolicy.PreconfiguredExpressionIds = append(row.EnforcedSecurityPolicy.PreconfiguredExpressionIds, idStr) + ids := []string{} + if rawIds, ok := securityPolicyMap["preconfiguredExpressionIds"].([]interface{}); ok { + for _, id := range rawIds { + if s, ok := id.(string); ok { + ids = append(ids, s) + } } } + policy.PreconfiguredExpressionIds = ids + row.EnforcedSecurityPolicy = policy } -skipEnforcedPolicy: - if previewPolicyMap, ok := jsonPayload["previewSecurityPolicy"].(map[string]interface{}); ok { // If it exists, initialize the PreviewSecurityPolicy struct row.PreviewSecurityPolicy = &RequestLogSecurityPolicy{} - + // Safely extract fields with type checking if v, ok := previewPolicyMap["configuredAction"].(string); ok { row.PreviewSecurityPolicy.ConfiguredAction = v @@ -169,14 +169,14 @@ skipEnforcedPolicy: // Map SecurityPolicyRequestData if present if secPolicyData, ok := jsonPayload["securityPolicyRequestData"].(map[string]interface{}); ok { row.SecurityPolicyRequestData = &RequestLogSecurityPolicyRequestData{} - + if v, ok := secPolicyData["tlsJa3Fingerprint"].(string); ok { row.SecurityPolicyRequestData.TlsJa3Fingerprint = v } if v, ok := secPolicyData["tlsJa4Fingerprint"].(string); ok { row.SecurityPolicyRequestData.TlsJa4Fingerprint = v } - + if remoteIpInfo, ok := secPolicyData["remoteIpInfo"].(map[string]interface{}); ok { row.SecurityPolicyRequestData.RemoteIpInfo = &RequestLogRemoteIpInfo{} if v, ok := remoteIpInfo["asn"].(float64); ok { @@ -192,14 +192,14 @@ skipEnforcedPolicy: // No 'if' check needed here for item.GetHttpRequest() because we already filtered. 
httpRequestPb := item.GetHttpRequest() row.HttpRequest = &RequestLogHttpRequest{ - RequestMethod: httpRequestPb.GetRequestMethod(), - RequestUrl: httpRequestPb.GetRequestUrl(), - RequestSize: strconv.FormatInt(httpRequestPb.GetRequestSize(), 10), - Referer: httpRequestPb.GetReferer(), - UserAgent: httpRequestPb.GetUserAgent(), - Status: httpRequestPb.GetStatus(), - ResponseSize: strconv.FormatInt(httpRequestPb.GetResponseSize(), 10), - RemoteIp: httpRequestPb.GetRemoteIp(), + RequestMethod: httpRequestPb.GetRequestMethod(), + RequestUrl: httpRequestPb.GetRequestUrl(), + RequestSize: strconv.FormatInt(httpRequestPb.GetRequestSize(), 10), + Referer: httpRequestPb.GetReferer(), + UserAgent: httpRequestPb.GetUserAgent(), + Status: httpRequestPb.GetStatus(), + ResponseSize: strconv.FormatInt(httpRequestPb.GetResponseSize(), 10), + RemoteIp: httpRequestPb.GetRemoteIp(), Latency: func() string { if lat := httpRequestPb.GetLatency(); lat != nil { return lat.String() @@ -226,7 +226,8 @@ func mapFromBucketJson(itemBytes []byte) (*RequestsLog, error) { // Filter out log entries that are not HTTP requests. 
if log.HttpRequest == nil || log.JsonPayload == nil { // Return an error to skip this row instead of returning nil - return nil, fmt.Errorf("skipping non-HTTP request log entry") + // return nil, fmt.Errorf("skipping non-HTTP request log entry") + return nil, ErrSkipRow } row := NewRequestsLog() From 90c83c7f0fc609df1e07e7c33ae44e1dfe492576 Mon Sep 17 00:00:00 2001 From: Jason Mogavero Date: Wed, 25 Jun 2025 12:10:06 -0400 Subject: [PATCH 4/9] Implement TCP LB log support, clean up debugging --- .../cloud_logging_api_source.go | 4 +- tables/requests_log/requests_log_mapper.go | 116 +++++++----------- tables/requests_log/requests_log_table.go | 2 +- 3 files changed, 49 insertions(+), 73 deletions(-) diff --git a/sources/cloud_logging_api/cloud_logging_api_source.go b/sources/cloud_logging_api/cloud_logging_api_source.go index 7edf51d..1675926 100644 --- a/sources/cloud_logging_api/cloud_logging_api_source.go +++ b/sources/cloud_logging_api/cloud_logging_api_source.go @@ -76,7 +76,7 @@ func (s *CloudLoggingAPISource) Collect(ctx context.Context) error { } if logEntry.Payload != nil { - logEntry.LogName = "" // remove logName from the log entry due to ToLogEntry requirements + logEntry.LogName = "" // remove logName from the log entry due to ToLogEntry requirements of an empty string protoLogEntry, err := logging.ToLogEntry(*logEntry, project) if err != nil { return fmt.Errorf("error converting log entry to loggingpb.LogEntry: %w", err) @@ -84,7 +84,7 @@ func (s *CloudLoggingAPISource) Collect(ctx context.Context) error { if s.CollectionState.ShouldCollect(protoLogEntry.GetInsertId(), protoLogEntry.GetTimestamp().AsTime()) { row := &types.RowData{ - Data: protoLogEntry, // Pass the *loggingpb.LogEntry to the RowData + Data: protoLogEntry, SourceEnrichment: sourceEnrichmentFields, } diff --git a/tables/requests_log/requests_log_mapper.go b/tables/requests_log/requests_log_mapper.go index bbce2f1..973b4ca 100644 --- a/tables/requests_log/requests_log_mapper.go +++ 
b/tables/requests_log/requests_log_mapper.go @@ -4,13 +4,11 @@ package requests_log import ( "context" "encoding/json" - "errors" "fmt" "strconv" "time" // for debugging - "os" "cloud.google.com/go/logging" loggingpb "cloud.google.com/go/logging/apiv2/loggingpb" @@ -18,18 +16,6 @@ import ( "github.com/turbot/tailpipe-plugin-sdk/mappers" ) -var ErrSkipRow = errors.New("skipping row: not an HTTP request") - -func dumpRow(row *RequestsLog) { - f, err := os.Create("/tmp/row_debug.json") - if err == nil { - defer f.Close() - enc := json.NewEncoder(f) - enc.SetIndent("", " ") - enc.Encode(row) - } -} - type RequestsLogMapper struct { } @@ -53,12 +39,6 @@ func (m *RequestsLogMapper) Map(_ context.Context, a any, _ ...mappers.MapOption } func mapFromSDKType(item *loggingpb.LogEntry) (*RequestsLog, error) { - // === 1. Early exit for non-HTTP(S) logs or those missing a payload === - if item.GetHttpRequest() == nil || item.GetJsonPayload() == nil { - // Return an error to skip this row instead of returning nil - // return nil, fmt.Errorf("skipping non-HTTP request log entry") - return nil, ErrSkipRow - } row := NewRequestsLog() @@ -188,30 +168,32 @@ func mapFromSDKType(item *loggingpb.LogEntry) (*RequestsLog, error) { } } - // === 5. Map HTTPRequest (guaranteed to be present due to early exit) === - // No 'if' check needed here for item.GetHttpRequest() because we already filtered. 
- httpRequestPb := item.GetHttpRequest() - row.HttpRequest = &RequestLogHttpRequest{ - RequestMethod: httpRequestPb.GetRequestMethod(), - RequestUrl: httpRequestPb.GetRequestUrl(), - RequestSize: strconv.FormatInt(httpRequestPb.GetRequestSize(), 10), - Referer: httpRequestPb.GetReferer(), - UserAgent: httpRequestPb.GetUserAgent(), - Status: httpRequestPb.GetStatus(), - ResponseSize: strconv.FormatInt(httpRequestPb.GetResponseSize(), 10), - RemoteIp: httpRequestPb.GetRemoteIp(), - Latency: func() string { - if lat := httpRequestPb.GetLatency(); lat != nil { - return lat.String() - } - return "" - }(), - ServerIp: httpRequestPb.GetServerIp(), - Protocol: httpRequestPb.GetProtocol(), - CacheFillBytes: strconv.FormatInt(httpRequestPb.GetCacheFillBytes(), 10), - CacheLookup: httpRequestPb.GetCacheLookup(), - CacheHit: httpRequestPb.GetCacheHit(), - CacheValidatedWithOriginServer: httpRequestPb.GetCacheValidatedWithOriginServer(), + if item.GetHttpRequest() == nil { + row.HttpRequest = &RequestLogHttpRequest{} + } else { + httpRequestPb := item.GetHttpRequest() + row.HttpRequest = &RequestLogHttpRequest{ + RequestMethod: httpRequestPb.GetRequestMethod(), + RequestUrl: httpRequestPb.GetRequestUrl(), + RequestSize: strconv.FormatInt(httpRequestPb.GetRequestSize(), 10), + Referer: httpRequestPb.GetReferer(), + UserAgent: httpRequestPb.GetUserAgent(), + Status: httpRequestPb.GetStatus(), + ResponseSize: strconv.FormatInt(httpRequestPb.GetResponseSize(), 10), + RemoteIp: httpRequestPb.GetRemoteIp(), + Latency: func() string { + if lat := httpRequestPb.GetLatency(); lat != nil { + return lat.String() + } + return "" + }(), + ServerIp: httpRequestPb.GetServerIp(), + Protocol: httpRequestPb.GetProtocol(), + CacheFillBytes: strconv.FormatInt(httpRequestPb.GetCacheFillBytes(), 10), + CacheLookup: httpRequestPb.GetCacheLookup(), + CacheHit: httpRequestPb.GetCacheHit(), + CacheValidatedWithOriginServer: httpRequestPb.GetCacheValidatedWithOriginServer(), + } } return row, nil @@ 
-223,13 +205,6 @@ func mapFromBucketJson(itemBytes []byte) (*RequestsLog, error) { return nil, fmt.Errorf("failed to parse requests log JSON: %w", err) } - // Filter out log entries that are not HTTP requests. - if log.HttpRequest == nil || log.JsonPayload == nil { - // Return an error to skip this row instead of returning nil - // return nil, fmt.Errorf("skipping non-HTTP request log entry") - return nil, ErrSkipRow - } - row := NewRequestsLog() // Map top-level fields @@ -242,7 +217,7 @@ func mapFromBucketJson(itemBytes []byte) (*RequestsLog, error) { row.SpanId = log.SpanId row.TraceSampled = log.TraceSampled - // FIX: Only create objects if they exist in the source log. + // Only create objects if they exist in the source log. // This avoids creating empty-but-non-nil objects that the downstream // validator might reject. @@ -308,26 +283,27 @@ func mapFromBucketJson(itemBytes []byte) (*RequestsLog, error) { } } - // HttpRequest is guaranteed non-nil by the filter at the top of the function. 
- row.HttpRequest = &RequestLogHttpRequest{ - RequestMethod: log.HttpRequest.RequestMethod, - RequestUrl: log.HttpRequest.RequestURL, - RequestSize: log.HttpRequest.RequestSize, - Status: log.HttpRequest.Status, - ResponseSize: log.HttpRequest.ResponseSize, - UserAgent: log.HttpRequest.UserAgent, - RemoteIp: log.HttpRequest.RemoteIP, - ServerIp: log.HttpRequest.ServerIP, - Referer: log.HttpRequest.Referer, - Latency: log.HttpRequest.Latency, - CacheLookup: log.HttpRequest.CacheLookup, - CacheHit: log.HttpRequest.CacheHit, - CacheValidatedWithOriginServer: log.HttpRequest.CacheValidatedWithOriginServer, - CacheFillBytes: log.HttpRequest.CacheFillBytes, + if log.HttpRequest == nil { + row.HttpRequest = &RequestLogHttpRequest{} + } else { + row.HttpRequest = &RequestLogHttpRequest{ + RequestMethod: log.HttpRequest.RequestMethod, + RequestUrl: log.HttpRequest.RequestURL, + RequestSize: log.HttpRequest.RequestSize, + Status: log.HttpRequest.Status, + ResponseSize: log.HttpRequest.ResponseSize, + UserAgent: log.HttpRequest.UserAgent, + RemoteIp: log.HttpRequest.RemoteIP, + ServerIp: log.HttpRequest.ServerIP, + Referer: log.HttpRequest.Referer, + Latency: log.HttpRequest.Latency, + CacheLookup: log.HttpRequest.CacheLookup, + CacheHit: log.HttpRequest.CacheHit, + CacheValidatedWithOriginServer: log.HttpRequest.CacheValidatedWithOriginServer, + CacheFillBytes: log.HttpRequest.CacheFillBytes, + } } - // dumpRow(row) - return row, nil } diff --git a/tables/requests_log/requests_log_table.go b/tables/requests_log/requests_log_table.go index 918d88e..468828e 100644 --- a/tables/requests_log/requests_log_table.go +++ b/tables/requests_log/requests_log_table.go @@ -90,7 +90,7 @@ func (c *RequestsLogTable) EnrichRow(row *RequestsLog, sourceEnrichmentFields sc row.TpDestinationIP = &emptyStr row.TpSourceIP = &emptyStr } - dumpRow(row) + return row, nil } From d61a1fc0af505a6e95dcd8209c1376b6ee867b7c Mon Sep 17 00:00:00 2001 From: Jason Mogavero Date: Tue, 1 Jul 2025 16:51:02 -0400 
Subject: [PATCH 5/9] added remaining possible log fields --- tables/requests_log/requests_log.go | 54 +++++++++++++--- tables/requests_log/requests_log_mapper.go | 74 ++++++++++++++++++---- 2 files changed, 107 insertions(+), 21 deletions(-) diff --git a/tables/requests_log/requests_log.go b/tables/requests_log/requests_log.go index 25d9c47..c6eec6f 100644 --- a/tables/requests_log/requests_log.go +++ b/tables/requests_log/requests_log.go @@ -29,6 +29,8 @@ type RequestsLog struct { EnforcedSecurityPolicy *RequestLogSecurityPolicy `json:"enforced_security_policy" parquet:"type=JSON"` PreviewSecurityPolicy *RequestLogSecurityPolicy `json:"preview_security_policy,omitempty" parquet:"type=JSON"` SecurityPolicyRequestData *RequestLogSecurityPolicyRequestData `json:"security_policy_request_data,omitempty" parquet:"type=JSON"` + EnforcedEdgeSecurityPolicy *RequestLogEdgeSecurityPolicy `json:"enforced_edge_security_policy,omitempty" parquet:"type=JSON"` + PreviewEdgeSecurityPolicy *RequestLogEdgeSecurityPolicy `json:"preview_edge_security_policy,omitempty" parquet:"type=JSON"` // other top level fields Resource *RequestLogResource `json:"resource,omitempty" parquet:"type=JSON"` @@ -63,17 +65,40 @@ type RequestLogHttpRequest struct { } type RequestLogSecurityPolicy struct { - ConfiguredAction string `json:"configured_action,omitempty"` - Name string `json:"name,omitempty"` - Outcome string `json:"outcome,omitempty"` - Priority int `json:"priority,omitempty"` - PreconfiguredExpressionIds []string `json:"preconfigured_expression_ids,omitempty"` + ConfiguredAction string `json:"configured_action,omitempty"` + RateLimitAction *RequestLogRateLimitAction `json:"rate_limit_action,omitempty"` + Name string `json:"name,omitempty"` + Outcome string `json:"outcome,omitempty"` + Priority int `json:"priority,omitempty"` + PreconfiguredExpressionIds []string `json:"preconfigured_expression_ids,omitempty"` + ThreatIntelligence *RequestLogThreatIntelligence 
`json:"threat_intelligence,omitempty"` + AddressGroup *RequestLogAddressGroup `json:"address_group,omitempty"` + MatchedFieldType string `json:"matched_field_type,omitempty"` + MatchedFieldValue string `json:"matched_field_value,omitempty"` + MatchedFieldName string `json:"matched_field_name,omitempty"` + MatchedOffset int `json:"matched_offset,omitempty"` + MatchedLength int `json:"matched_length,omitempty"` +} + +type RequestLogRateLimitAction struct { + Key string `json:"key,omitempty"` + Outcome string `json:"outcome,omitempty"` +} + +type RequestLogThreatIntelligence struct { + Categories []string `json:"categories,omitempty"` +} + +type RequestLogAddressGroup struct { + Names []string `json:"names,omitempty"` } type RequestLogSecurityPolicyRequestData struct { - RemoteIpInfo *RequestLogRemoteIpInfo `json:"remote_ip_info,omitempty"` - TlsJa3Fingerprint string `json:"tls_ja3_fingerprint,omitempty"` - TlsJa4Fingerprint string `json:"tls_ja4_fingerprint,omitempty"` + RemoteIpInfo *RequestLogRemoteIpInfo `json:"remote_ip_info,omitempty"` + RecaptchaActionToken *RequestLogRecaptchaToken `json:"recaptcha_action_token,omitempty"` + RecaptchaSessionToken *RequestLogRecaptchaToken `json:"recaptcha_session_token,omitempty"` + TlsJa3Fingerprint string `json:"tls_ja3_fingerprint,omitempty"` + TlsJa4Fingerprint string `json:"tls_ja4_fingerprint,omitempty"` } type RequestLogRemoteIpInfo struct { @@ -81,6 +106,17 @@ type RequestLogRemoteIpInfo struct { RegionCode string `json:"region_code,omitempty"` } +type RequestLogEdgeSecurityPolicy struct { + Name string `json:"name,omitempty"` + Priority int `json:"priority,omitempty"` + ConfiguredAction string `json:"configured_action,omitempty"` + Outcome string `json:"outcome,omitempty"` +} + +type RequestLogRecaptchaToken struct { + Score float64 `json:"score,omitempty"` +} + func (a *RequestsLog) GetColumnDescriptions() map[string]string { return map[string]string{ "timestamp": "The date and time when the request was received, in 
ISO 8601 format.", @@ -98,6 +134,8 @@ func (a *RequestsLog) GetColumnDescriptions() map[string]string { "enforced_security_policy": "Details about the enforced security policy for the request.", "preview_security_policy": "Details about the preview security policy for the request, if any.", "security_policy_request_data": "Additional data about the security policy request.", + "enforced_edge_security_policy": "Details about the enforced edge security policy for the request.", + "preview_edge_security_policy": "Details about the preview edge security policy for the request, if any.", "remote_ip": "The remote IP address from which the request originated.", "status_details": "Additional status details for the request.", diff --git a/tables/requests_log/requests_log_mapper.go b/tables/requests_log/requests_log_mapper.go index 973b4ca..b03fe5c 100644 --- a/tables/requests_log/requests_log_mapper.go +++ b/tables/requests_log/requests_log_mapper.go @@ -249,6 +249,11 @@ func mapFromBucketJson(itemBytes []byte) (*RequestsLog, error) { Name: log.JsonPayload.EnforcedSecurityPolicy.Name, Outcome: log.JsonPayload.EnforcedSecurityPolicy.Outcome, Priority: log.JsonPayload.EnforcedSecurityPolicy.Priority, + MatchedFieldType: log.JsonPayload.EnforcedSecurityPolicy.MatchedFieldType, + MatchedFieldValue: log.JsonPayload.EnforcedSecurityPolicy.MatchedFieldValue, + MatchedFieldName: log.JsonPayload.EnforcedSecurityPolicy.MatchedFieldName, + MatchedOffset: log.JsonPayload.EnforcedSecurityPolicy.MatchedOffset, + MatchedLength: log.JsonPayload.EnforcedSecurityPolicy.MatchedLength, PreconfiguredExpressionIds: ids, } } @@ -263,6 +268,11 @@ func mapFromBucketJson(itemBytes []byte) (*RequestsLog, error) { Name: log.JsonPayload.PreviewSecurityPolicy.Name, Outcome: log.JsonPayload.PreviewSecurityPolicy.Outcome, Priority: log.JsonPayload.PreviewSecurityPolicy.Priority, + MatchedFieldType: log.JsonPayload.PreviewSecurityPolicy.MatchedFieldType, + MatchedFieldValue: 
log.JsonPayload.PreviewSecurityPolicy.MatchedFieldValue, + MatchedFieldName: log.JsonPayload.PreviewSecurityPolicy.MatchedFieldName, + MatchedOffset: log.JsonPayload.PreviewSecurityPolicy.MatchedOffset, + MatchedLength: log.JsonPayload.PreviewSecurityPolicy.MatchedLength, PreconfiguredExpressionIds: ids, } } @@ -330,6 +340,8 @@ type jsonPayload struct { TypeName string `json:"@type"` BackendTargetProjectNumber string `json:"backendTargetProjectNumber"` CacheDecision []string `json:"cacheDecision"` + CacheId string `json:"cacheId,omitempty"` + CompressionStatus string `json:"compressionStatus,omitempty"` EnforcedSecurityPolicy *requestLogEnforcedSecurityPolicy `json:"enforcedSecurityPolicy"` PreviewSecurityPolicy *requestLogPreviewSecurityPolicy `json:"previewSecurityPolicy,omitempty"` SecurityPolicyRequestData *requestLogSecurityPolicyRequestData `json:"securityPolicyRequestData"` @@ -355,28 +367,64 @@ type httpRequest struct { } type requestLogEnforcedSecurityPolicy struct { - ConfiguredAction string `json:"configuredAction"` - Name string `json:"name"` - Outcome string `json:"outcome"` - Priority int `json:"priority"` - PreconfiguredExpressionIds []string `json:"preconfiguredExpressionIds,omitempty"` + ConfiguredAction string `json:"configuredAction"` + Name string `json:"name"` + Outcome string `json:"outcome"` + Priority int `json:"priority"` + PreconfiguredExpressionIds []string `json:"preconfiguredExpressionIds,omitempty"` + RateLimitAction *requestLogRateLimitAction `json:"rateLimitAction,omitempty"` + ThreatIntelligence *requestLogThreatIntelligence `json:"threatIntelligence,omitempty"` + AddressGroup *requestLogAddressGroup `json:"addressGroup,omitempty"` + MatchedFieldType string `json:"matchedFieldType,omitempty"` + MatchedFieldValue string `json:"matchedFieldValue,omitempty"` + MatchedFieldName string `json:"matchedFieldName,omitempty"` + MatchedOffset int `json:"matchedOffset,omitempty"` + MatchedLength int `json:"matchedLength,omitempty"` } type 
requestLogPreviewSecurityPolicy struct { - ConfiguredAction string `json:"configuredAction"` - Name string `json:"name"` - Outcome string `json:"outcome"` - Priority int `json:"priority"` - PreconfiguredExpressionIds []string `json:"preconfiguredExpressionIds,omitempty"` + ConfiguredAction string `json:"configuredAction"` + Name string `json:"name"` + Outcome string `json:"outcome"` + Priority int `json:"priority"` + PreconfiguredExpressionIds []string `json:"preconfiguredExpressionIds,omitempty"` + RateLimitAction *requestLogRateLimitAction `json:"rateLimitAction,omitempty"` + ThreatIntelligence *requestLogThreatIntelligence `json:"threatIntelligence,omitempty"` + AddressGroup *requestLogAddressGroup `json:"addressGroup,omitempty"` + MatchedFieldType string `json:"matchedFieldType,omitempty"` + MatchedFieldValue string `json:"matchedFieldValue,omitempty"` + MatchedFieldName string `json:"matchedFieldName,omitempty"` + MatchedFieldLength int `json:"matchedFieldLength,omitempty"` + MatchedOffset int `json:"matchedOffset,omitempty"` + MatchedLength int `json:"matchedLength,omitempty"` } type requestLogSecurityPolicyRequestData struct { - RemoteIpInfo *requestLogRemoteIpInfo `json:"remoteIpInfo"` - TlsJa3Fingerprint string `json:"tlsJa3Fingerprint"` - TlsJa4Fingerprint string `json:"tlsJa4Fingerprint"` + RemoteIpInfo *requestLogRemoteIpInfo `json:"remoteIpInfo"` + TlsJa3Fingerprint string `json:"tlsJa3Fingerprint"` + TlsJa4Fingerprint string `json:"tlsJa4Fingerprint"` + RecaptchaActionToken *requestLogRecaptchaToken `json:"recaptchaActionToken,omitempty"` + RecaptchaSessionToken *requestLogRecaptchaToken `json:"recaptchaSessionToken,omitempty"` } type requestLogRemoteIpInfo struct { Asn int `json:"asn"` RegionCode string `json:"regionCode"` } + +type requestLogRecaptchaToken struct { + Score float64 `json:"score"` +} + +type requestLogRateLimitAction struct { + Key string `json:"key"` + Outcome string `json:"outcome"` +} + +type requestLogThreatIntelligence struct { + 
Categories []string `json:"categories"` +} + +type requestLogAddressGroup struct { + Names []string `json:"names"` +} From 9ad9fdb073a1ce0e5c090a04ba4474dd0a28e8db Mon Sep 17 00:00:00 2001 From: Jason Mogavero Date: Wed, 2 Jul 2025 12:22:42 -0400 Subject: [PATCH 6/9] fix preconfig rule logic, start documentation --- Makefile | 8 +- docs/sources/gcp_cloud_logging_api.md | 1 + docs/tables/gcp_requests_log/index.md | 221 +++++++++++++++++++++ docs/tables/gcp_requests_log/queries.md | 0 tables/requests_log/requests_log.go | 26 +-- tables/requests_log/requests_log_mapper.go | 138 ++++++------- 6 files changed, 305 insertions(+), 89 deletions(-) create mode 100644 docs/sources/gcp_cloud_logging_api.md create mode 100644 docs/tables/gcp_requests_log/index.md create mode 100644 docs/tables/gcp_requests_log/queries.md diff --git a/Makefile b/Makefile index 2de7f28..4299c1c 100644 --- a/Makefile +++ b/Makefile @@ -6,15 +6,9 @@ PLUGIN_BINARY = $(PLUGIN_DIR)/tailpipe-plugin-gcp.plugin VERSION_JSON = $(PLUGIN_DIR)/version.json VERSIONS_JSON = $(TAILPIPE_INSTALL_DIR)/plugins/versions.json -.PHONY: install debug +.PHONY: install install: go build -o $(PLUGIN_BINARY) -tags "${BUILD_TAGS}" *.go $(PLUGIN_BINARY) metadata > $(VERSION_JSON) rm -f $(VERSIONS_JSON) - -debug: - @echo "Building and installing debug plugin…" - go build -gcflags="all=-N -l" -o $(PLUGIN_BINARY) -tags "${BUILD_TAGS}" *.go - $(PLUGIN_BINARY) metadata > $(VERSION_JSON) - rm -f $(VERSIONS_JSON) \ No newline at end of file diff --git a/docs/sources/gcp_cloud_logging_api.md b/docs/sources/gcp_cloud_logging_api.md new file mode 100644 index 0000000..5e40c08 --- /dev/null +++ b/docs/sources/gcp_cloud_logging_api.md @@ -0,0 +1 @@ +asdf \ No newline at end of file diff --git a/docs/tables/gcp_requests_log/index.md b/docs/tables/gcp_requests_log/index.md new file mode 100644 index 0000000..be2c43c --- /dev/null +++ b/docs/tables/gcp_requests_log/index.md @@ -0,0 +1,221 @@ +--- +title: "Tailpipe Table: gcp_requests_log 
- Query GCP request logs" +description: "GCP request logs capture network requests from GCP Load Balancers containing fields for the results of Cloud Armor analysis." +--- + +# Table: gcp_requests_log - Query GCP request logs + +The `gcp_requests_log` table allows you to query data from GCP request logs. This table provides detailed information about the network requests handled by your GCP Load Balancers, including the results of Cloud Armor analysis. + +## Configure + +Create a [partition](https://tailpipe.io/docs/manage/partition) for `gcp_requests_log`: + +```sh +vi ~/.tailpipe/config/gcp.tpc +``` + +```hcl +connection "gcp" "logging_account" { + project = "my-gcp-project" +} + +partition "gcp_requests_log" "my_logs" { + source "gcp_storage_bucket" { + connection = connection.gcp.logging_account + } +} +``` +OR + +```hcl +connection "gcp" "logging_account" { + project = "my-gcp-project" +} + +partition "gcp_requests_log" "my_logs" { + source "gcp_cloud_logging_api" { + connection = connection.gcp.logging_account + } +} +``` + +## Collect + +[Collect](https://tailpipe.io/docs/manage/collection) logs for all `gcp_requests_log` partitions: + +```sh +tailpipe collect gcp_requests_log +``` + +Or for a single partition: + +```sh +tailpipe collect gcp_requests_log.my_logs +``` + +## Query + + +### Blocked Requests + +Count how many requests were blocked by Cloud Armor + +```sql +> SELECT + enforced_security_policy ->> 'name' AS policy_name, + count(*) AS total_blocked_requests +FROM + gcp_requests_log +WHERE + enforced_security_policy ->> 'outcome' = 'DENY' +GROUP BY + policy_name +ORDER BY + total_blocked_requests DESC +``` + +### Top 10 events + +List the 10 most frequently called events. 
+ +```sql +select + service_name, + method_name, + count(*) as event_count +from + gcp_audit_log +group by + service_name, + method_name +order by + event_count desc +limit 10; +``` + +### High Volume IAM Access Token Generation + +Find users generating a high volume of IAM access tokens within a short period, which may indicate potential privilege escalation or compromised credentials. + +```sql +select + authentication_info ->> 'principal_email' as user_email, + count(*) as event_count, + date_trunc('minute', timestamp) as event_minute +from + gcp_audit_log +where + service_name = 'iamcredentials.googleapis.com' + and method_name ilike 'generateaccesstoken' +group by + user_email, + event_minute +having + count(*) > 10 +order by + event_count desc; +``` + +## Example Configurations + +### Collect logs from a Storage bucket + +Collect audit logs stored in a Storage bucket that use the [default log file name format](https://hub.tailpipe.io/plugins/turbot/gcp/tables/gcp_audit_log#gcp_storage_bucket). + +```hcl +connection "gcp" "logging_account" { + project = "my-gcp-project" +} + +partition "gcp_audit_log" "my_logs" { + source "gcp_storage_bucket" { + connection = connection.gcp.logging_account + bucket = "gcp-audit-logs-bucket" + } +} +``` + +### Collect logs from a Storage bucket with a prefix + +Collect audit logs stored with a GCS key prefix. + +```hcl +partition "gcp_audit_log" "my_logs_prefix" { + source "gcp_storage_bucket" { + connection = connection.gcp.logging_account + bucket = "gcp-audit-logs-bucket" + prefix = "my/prefix/" + } +} +``` + +### Collect logs from a Storage Bucket for a single project + +Collect audit logs for a specific project. 
+ +```hcl +partition "gcp_audit_log" "my_logs_prefix" { + filter = "log_name like 'projects/my-project-name/logs/cloudaudit.googleapis.com/%'" + + source "gcp_storage_bucket" { + connection = connection.gcp.logging_account + bucket = "gcp-audit-logs-bucket" + } +} +``` + +### Collect logs from audit logs API + +Collect audit logs stored in a Storage bucket that use the [default log file name format](https://hub.tailpipe.io/plugins/turbot/gcp/tables/gcp_audit_log#gcp_storage_bucket). + +```hcl +connection "gcp" "my_project" { + project = "my-gcp-project" +} + +partition "gcp_audit_log" "my_logs" { + source "gcp_audit_log_api" { + connection = connection.gcp.my_project + } +} +``` + +### Collect specific types of audit logs from audit logs API + +Collect admin activity and data access audit logs for a project. + +```hcl +partition "gcp_audit_log" "my_logs_admin_data_access" { + source "gcp_audit_log_api" { + connection = connection.gcp.my_project + log_types = ["activity", "data_access"] + } +} +``` + +### Exclude INFO level events + +Use the filter argument in your partition to exclude INFO severity level events and reduce log storage size. 
+ +```hcl +partition "gcp_audit_log" "my_logs_severity" { + # Avoid saving specific severity levels + filter = "severity != 'INFO'" + + source "gcp-storage_bucket" { + connection = connection.gcp.logging_account + bucket = "gcp-audit-logs-bucket" + } +} +``` + +## Source Defaults + +### gcp_storage_bucket + +This table sets the following defaults for the [gcp_storage_bucket](https://hub.tailpipe.io/plugins/turbot/gcp/sources/gcp_storage_bucket#arguments): + +| Argument | Default | +|--------------|---------| +| file_layout | `cloudaudit.googleapis.com/%{DATA:type}/%{YEAR:year}/%{MONTHNUM:month}/%{MONTHDAY:day}/%{HOUR:hour}:%{MINUTE:minute}:%{SECOND:second}_%{DATA:end_time}_%{DATA:suffix}.json` | diff --git a/docs/tables/gcp_requests_log/queries.md b/docs/tables/gcp_requests_log/queries.md new file mode 100644 index 0000000..e69de29 diff --git a/tables/requests_log/requests_log.go b/tables/requests_log/requests_log.go index c6eec6f..3033ff8 100644 --- a/tables/requests_log/requests_log.go +++ b/tables/requests_log/requests_log.go @@ -65,19 +65,19 @@ type RequestLogHttpRequest struct { } type RequestLogSecurityPolicy struct { - ConfiguredAction string `json:"configured_action,omitempty"` - RateLimitAction *RequestLogRateLimitAction `json:"rate_limit_action,omitempty"` - Name string `json:"name,omitempty"` - Outcome string `json:"outcome,omitempty"` - Priority int `json:"priority,omitempty"` - PreconfiguredExpressionIds []string `json:"preconfigured_expression_ids,omitempty"` - ThreatIntelligence *RequestLogThreatIntelligence `json:"threat_intelligence,omitempty"` - AddressGroup *RequestLogAddressGroup `json:"address_group,omitempty"` - MatchedFieldType string `json:"matched_field_type,omitempty"` - MatchedFieldValue string `json:"matched_field_value,omitempty"` - MatchedFieldName string `json:"matched_field_name,omitempty"` - MatchedOffset int `json:"matched_offset,omitempty"` - MatchedLength int `json:"matched_length,omitempty"` + ConfiguredAction string 
`json:"configured_action,omitempty"` + RateLimitAction *RequestLogRateLimitAction `json:"rate_limit_action,omitempty"` + Name string `json:"name,omitempty"` + Outcome string `json:"outcome,omitempty"` + Priority int `json:"priority,omitempty"` + PreconfiguredExprId string `json:"preconfigured_expr_id,omitempty"` + ThreatIntelligence *RequestLogThreatIntelligence `json:"threat_intelligence,omitempty"` + AddressGroup *RequestLogAddressGroup `json:"address_group,omitempty"` + MatchedFieldType string `json:"matched_field_type,omitempty"` + MatchedFieldValue string `json:"matched_field_value,omitempty"` + MatchedFieldName string `json:"matched_field_name,omitempty"` + MatchedOffset int `json:"matched_offset,omitempty"` + MatchedLength int `json:"matched_length,omitempty"` } type RequestLogRateLimitAction struct { diff --git a/tables/requests_log/requests_log_mapper.go b/tables/requests_log/requests_log_mapper.go index b03fe5c..9cd0652 100644 --- a/tables/requests_log/requests_log_mapper.go +++ b/tables/requests_log/requests_log_mapper.go @@ -105,15 +105,10 @@ func mapFromSDKType(item *loggingpb.LogEntry) (*RequestsLog, error) { policy.Priority = int(val) } - ids := []string{} if rawIds, ok := securityPolicyMap["preconfiguredExpressionIds"].([]interface{}); ok { - for _, id := range rawIds { - if s, ok := id.(string); ok { - ids = append(ids, s) - } - } + rule_id := rawIds[0].(string) + policy.PreconfiguredExprId = rule_id } - policy.PreconfiguredExpressionIds = ids row.EnforcedSecurityPolicy = policy } @@ -137,12 +132,8 @@ func mapFromSDKType(item *loggingpb.LogEntry) (*RequestsLog, error) { // Handle PreconfiguredExpressionIds within PreviewSecurityPolicy only if it exists and has values. 
if rawIds, ok := previewPolicyMap["preconfiguredExpressionIds"].([]interface{}); ok && len(rawIds) > 0 { - row.PreviewSecurityPolicy.PreconfiguredExpressionIds = make([]string, 0, len(rawIds)) - for _, id := range rawIds { - if idStr, ok := id.(string); ok { - row.PreviewSecurityPolicy.PreconfiguredExpressionIds = append(row.PreviewSecurityPolicy.PreconfiguredExpressionIds, idStr) - } - } + rule_id := rawIds[0].(string) + row.PreviewSecurityPolicy.PreconfiguredExprId = rule_id } } @@ -241,39 +232,48 @@ func mapFromBucketJson(itemBytes []byte) (*RequestsLog, error) { if log.JsonPayload.EnforcedSecurityPolicy != nil { ids := []string{} - if log.JsonPayload.EnforcedSecurityPolicy.PreconfiguredExpressionIds != nil { - ids = log.JsonPayload.EnforcedSecurityPolicy.PreconfiguredExpressionIds + if log.JsonPayload.EnforcedSecurityPolicy.PreconfiguredExprIds != nil { + ids = log.JsonPayload.EnforcedSecurityPolicy.PreconfiguredExprIds + } + var exprId string + if len(ids) > 0 { + exprId = ids[0] } row.EnforcedSecurityPolicy = &RequestLogSecurityPolicy{ - ConfiguredAction: log.JsonPayload.EnforcedSecurityPolicy.ConfiguredAction, - Name: log.JsonPayload.EnforcedSecurityPolicy.Name, - Outcome: log.JsonPayload.EnforcedSecurityPolicy.Outcome, - Priority: log.JsonPayload.EnforcedSecurityPolicy.Priority, - MatchedFieldType: log.JsonPayload.EnforcedSecurityPolicy.MatchedFieldType, - MatchedFieldValue: log.JsonPayload.EnforcedSecurityPolicy.MatchedFieldValue, - MatchedFieldName: log.JsonPayload.EnforcedSecurityPolicy.MatchedFieldName, - MatchedOffset: log.JsonPayload.EnforcedSecurityPolicy.MatchedOffset, - MatchedLength: log.JsonPayload.EnforcedSecurityPolicy.MatchedLength, - PreconfiguredExpressionIds: ids, + ConfiguredAction: log.JsonPayload.EnforcedSecurityPolicy.ConfiguredAction, + Name: log.JsonPayload.EnforcedSecurityPolicy.Name, + Outcome: log.JsonPayload.EnforcedSecurityPolicy.Outcome, + Priority: log.JsonPayload.EnforcedSecurityPolicy.Priority, + MatchedFieldType: 
log.JsonPayload.EnforcedSecurityPolicy.MatchedFieldType, + MatchedFieldValue: log.JsonPayload.EnforcedSecurityPolicy.MatchedFieldValue, + MatchedFieldName: log.JsonPayload.EnforcedSecurityPolicy.MatchedFieldName, + MatchedOffset: log.JsonPayload.EnforcedSecurityPolicy.MatchedOffset, + MatchedLength: log.JsonPayload.EnforcedSecurityPolicy.MatchedLength, + PreconfiguredExprId: exprId, } } if log.JsonPayload.PreviewSecurityPolicy != nil { ids := []string{} - if log.JsonPayload.PreviewSecurityPolicy.PreconfiguredExpressionIds != nil { - ids = log.JsonPayload.PreviewSecurityPolicy.PreconfiguredExpressionIds + if log.JsonPayload.PreviewSecurityPolicy.PreconfiguredExprIds != nil { + ids = log.JsonPayload.PreviewSecurityPolicy.PreconfiguredExprIds + } + // preconfiguredExprIds is always an array of one string, grab the index 0 slice and case this to a string in the row struct + var exprId string + if len(ids) > 0 { + exprId = ids[0] } row.PreviewSecurityPolicy = &RequestLogSecurityPolicy{ - ConfiguredAction: log.JsonPayload.PreviewSecurityPolicy.ConfiguredAction, - Name: log.JsonPayload.PreviewSecurityPolicy.Name, - Outcome: log.JsonPayload.PreviewSecurityPolicy.Outcome, - Priority: log.JsonPayload.PreviewSecurityPolicy.Priority, - MatchedFieldType: log.JsonPayload.PreviewSecurityPolicy.MatchedFieldType, - MatchedFieldValue: log.JsonPayload.PreviewSecurityPolicy.MatchedFieldValue, - MatchedFieldName: log.JsonPayload.PreviewSecurityPolicy.MatchedFieldName, - MatchedOffset: log.JsonPayload.PreviewSecurityPolicy.MatchedOffset, - MatchedLength: log.JsonPayload.PreviewSecurityPolicy.MatchedLength, - PreconfiguredExpressionIds: ids, + ConfiguredAction: log.JsonPayload.PreviewSecurityPolicy.ConfiguredAction, + Name: log.JsonPayload.PreviewSecurityPolicy.Name, + Outcome: log.JsonPayload.PreviewSecurityPolicy.Outcome, + Priority: log.JsonPayload.PreviewSecurityPolicy.Priority, + MatchedFieldType: log.JsonPayload.PreviewSecurityPolicy.MatchedFieldType, + MatchedFieldValue: 
log.JsonPayload.PreviewSecurityPolicy.MatchedFieldValue, + MatchedFieldName: log.JsonPayload.PreviewSecurityPolicy.MatchedFieldName, + MatchedOffset: log.JsonPayload.PreviewSecurityPolicy.MatchedOffset, + MatchedLength: log.JsonPayload.PreviewSecurityPolicy.MatchedLength, + PreconfiguredExprId: exprId, } } @@ -342,8 +342,8 @@ type jsonPayload struct { CacheDecision []string `json:"cacheDecision"` CacheId string `json:"cacheId,omitempty"` CompressionStatus string `json:"compressionStatus,omitempty"` - EnforcedSecurityPolicy *requestLogEnforcedSecurityPolicy `json:"enforcedSecurityPolicy"` - PreviewSecurityPolicy *requestLogPreviewSecurityPolicy `json:"previewSecurityPolicy,omitempty"` + EnforcedSecurityPolicy *requestLogSecurityPolicy `json:"enforcedSecurityPolicy"` + PreviewSecurityPolicy *requestLogSecurityPolicy `json:"previewSecurityPolicy,omitempty"` SecurityPolicyRequestData *requestLogSecurityPolicyRequestData `json:"securityPolicyRequestData"` RemoteIp string `json:"remoteIp"` StatusDetails string `json:"statusDetails"` @@ -366,38 +366,38 @@ type httpRequest struct { CacheFillBytes string `json:"cacheFillBytes,omitempty"` } -type requestLogEnforcedSecurityPolicy struct { - ConfiguredAction string `json:"configuredAction"` - Name string `json:"name"` - Outcome string `json:"outcome"` - Priority int `json:"priority"` - PreconfiguredExpressionIds []string `json:"preconfiguredExpressionIds,omitempty"` - RateLimitAction *requestLogRateLimitAction `json:"rateLimitAction,omitempty"` - ThreatIntelligence *requestLogThreatIntelligence `json:"threatIntelligence,omitempty"` - AddressGroup *requestLogAddressGroup `json:"addressGroup,omitempty"` - MatchedFieldType string `json:"matchedFieldType,omitempty"` - MatchedFieldValue string `json:"matchedFieldValue,omitempty"` - MatchedFieldName string `json:"matchedFieldName,omitempty"` - MatchedOffset int `json:"matchedOffset,omitempty"` - MatchedLength int `json:"matchedLength,omitempty"` +type requestLogSecurityPolicy struct 
{ + ConfiguredAction string `json:"configuredAction"` + Name string `json:"name"` + Outcome string `json:"outcome"` + Priority int `json:"priority"` + PreconfiguredExprIds []string `json:"preconfiguredExprIds,omitempty"` + RateLimitAction *requestLogRateLimitAction `json:"rateLimitAction,omitempty"` + ThreatIntelligence *requestLogThreatIntelligence `json:"threatIntelligence,omitempty"` + AddressGroup *requestLogAddressGroup `json:"addressGroup,omitempty"` + MatchedFieldType string `json:"matchedFieldType,omitempty"` + MatchedFieldValue string `json:"matchedFieldValue,omitempty"` + MatchedFieldName string `json:"matchedFieldName,omitempty"` + MatchedOffset int `json:"matchedOffset,omitempty"` + MatchedLength int `json:"matchedLength,omitempty"` } -type requestLogPreviewSecurityPolicy struct { - ConfiguredAction string `json:"configuredAction"` - Name string `json:"name"` - Outcome string `json:"outcome"` - Priority int `json:"priority"` - PreconfiguredExpressionIds []string `json:"preconfiguredExpressionIds,omitempty"` - RateLimitAction *requestLogRateLimitAction `json:"rateLimitAction,omitempty"` - ThreatIntelligence *requestLogThreatIntelligence `json:"threatIntelligence,omitempty"` - AddressGroup *requestLogAddressGroup `json:"addressGroup,omitempty"` - MatchedFieldType string `json:"matchedFieldType,omitempty"` - MatchedFieldValue string `json:"matchedFieldValue,omitempty"` - MatchedFieldName string `json:"matchedFieldName,omitempty"` - MatchedFieldLength int `json:"matchedFieldLength,omitempty"` - MatchedOffset int `json:"matchedOffset,omitempty"` - MatchedLength int `json:"matchedLength,omitempty"` -} +// type requestLogSecurityPolicy struct { +// ConfiguredAction string `json:"configuredAction"` +// Name string `json:"name"` +// Outcome string `json:"outcome"` +// Priority int `json:"priority"` +// PreconfiguredExprIds []string `json:"preconfiguredExprIds,omitempty"` +// RateLimitAction *requestLogRateLimitAction `json:"rateLimitAction,omitempty"` +// 
ThreatIntelligence *requestLogThreatIntelligence `json:"threatIntelligence,omitempty"` +// AddressGroup *requestLogAddressGroup `json:"addressGroup,omitempty"` +// MatchedFieldType string `json:"matchedFieldType,omitempty"` +// MatchedFieldValue string `json:"matchedFieldValue,omitempty"` +// MatchedFieldName string `json:"matchedFieldName,omitempty"` +// MatchedFieldLength int `json:"matchedFieldLength,omitempty"` +// MatchedOffset int `json:"matchedOffset,omitempty"` +// MatchedLength int `json:"matchedLength,omitempty"` +// } type requestLogSecurityPolicyRequestData struct { RemoteIpInfo *requestLogRemoteIpInfo `json:"remoteIpInfo"` From 46e6f8ba99d86f8558ff3c0e61bc0d36d89c3a6d Mon Sep 17 00:00:00 2001 From: Jason Mogavero Date: Thu, 3 Jul 2025 13:38:21 -0400 Subject: [PATCH 7/9] updated docs --- docs/sources/gcp_cloud_logging_api.md | 41 ++++++++- docs/tables/gcp_requests_log/index.md | 109 ++++++++---------------- docs/tables/gcp_requests_log/queries.md | 0 3 files changed, 77 insertions(+), 73 deletions(-) delete mode 100644 docs/tables/gcp_requests_log/queries.md diff --git a/docs/sources/gcp_cloud_logging_api.md b/docs/sources/gcp_cloud_logging_api.md index 5e40c08..d92b950 100644 --- a/docs/sources/gcp_cloud_logging_api.md +++ b/docs/sources/gcp_cloud_logging_api.md @@ -1 +1,40 @@ -asdf \ No newline at end of file +--- +title: "Source: gcp_cloud_logging_api - Collect logs from GCP Cloud Logging API" +description: "Allows users to collect logs from Google Cloud Platform (GCP) Cloud Logging API." +--- + +# Source: gcp_cloud_logging_api - Obtain logs from GCP Cloud Logging API + +The Google Cloud Platform (GCP) Cloud Logging API provides access to all logs for all GCP services. It allows you to view and manage logs for your GCP projects, services, and applications. 
+ +This source is currently configured only for request logs from Google Load Balancer and Cloud Armor logs, from the source log name `projects/project-id/logs/requests` + +Using this source, you can currently collect, filter, and analyze request logs that have been enriched with Cloud Armor rule findings, in order to gather metrics on blocked requests or to identify and eliminate false-positive findings that would block legitimate application requests. + +All log types other than audit logs are of the `logEntry` type, and this source can potentially collect them with minor changes to the source code. (Tables must still be created for each type of log.) + +## Example Configurations + +### Collect request logs + +Collect all of the request logs for a project. + +```hcl +connection "gcp" "my_project" { + project = "my-gcp-project" +} + +partition "gcp_requests_log" "my_logs" { + source "gcp_cloud_logging_api" { + connection = connection.gcp.my_project + } +} +``` + + +## Arguments + +| Argument | Type | Required | Default | Description | +|------------|------------------|----------|--------------------------|-------------------------------------------------------------------------------------------------------------------------------| +| connection | `connection.gcp` | No | `connection.gcp.default` | The [GCP connection](https://hub.tailpipe.io/plugins/turbot/gcp#connection-credentials) to use to connect to the GCP account. | +| log_types | List(String) | No | [] | This can be any type of non-audit log that conforms to the [logEntry data model](https://cloud.google.com/logging/docs/log-entry-data-model) and is stored in the `_Default` logging bucket in a GCP project. The only restriction is what tables are supported in Tailpipe. 
Currently the only supported type is `requests` diff --git a/docs/tables/gcp_requests_log/index.md b/docs/tables/gcp_requests_log/index.md index be2c43c..31c7477 100644 --- a/docs/tables/gcp_requests_log/index.md +++ b/docs/tables/gcp_requests_log/index.md @@ -59,11 +59,11 @@ tailpipe collect gcp_requests_log.my_logs ### Blocked Requests -Count how many requests were blocked by Cloud Armor +Count how many requests were blocked by Cloud Armor across all policies ```sql -> SELECT - enforced_security_policy ->> 'name' AS policy_name, +SELECT + enforced_security_policy.name AS policy_name, count(*) AS total_blocked_requests FROM gcp_requests_log @@ -77,61 +77,40 @@ ORDER BY ### Top 10 events -List the 10 most frequently called events. +List the 10 most blocked OWASP Core Rule Set rules ```sql -select - service_name, - method_name, - count(*) as event_count -from - gcp_audit_log -group by - service_name, - method_name -order by - event_count desc -limit 10; +SELECT + enforced_security_policy ->> 'preconfigured_expr_id' AS rule_id, + COUNT(*) AS total_occurrences +FROM + gcp_requests_log +WHERE + enforced_security_policy ->> 'outcome' = 'DENY' + AND LENGTH(enforced_security_policy ->> 'preconfigured_expr_id') > 0 +GROUP BY + rule_id +ORDER BY + total_occurrences DESC +LIMIT 10 ``` -### High Volume IAM Access Token Generation - -Find users generating a high volume of IAM access tokens within a short period, which may indicate potential privilege escalation or compromised credentials. 
- -```sql -select - authentication_info ->> 'principal_email' as user_email, - count(*) as event_count, - date_trunc('minute', timestamp) as event_minute -from - gcp_audit_log -where - service_name = 'iamcredentials.googleapis.com' - and method_name ilike 'generateaccesstoken' -group by - user_email, - event_minute -having - count(*) > 10 -order by - event_count desc; -``` ## Example Configurations ### Collect logs from a Storage bucket -Collect audit logs stored in a Storage bucket that use the [default log file name format](https://hub.tailpipe.io/plugins/turbot/gcp/tables/gcp_audit_log#gcp_storage_bucket). +Collect request logs stored in a Storage bucket that use the [default log file name format](https://hub.tailpipe.io/plugins/turbot/gcp/tables/gcp_audit_log#gcp_storage_bucket). ```hcl connection "gcp" "logging_account" { project = "my-gcp-project" } -partition "gcp_audit_log" "my_logs" { +partition "gcp_requests_log" "my_logs" { source "gcp_storage_bucket" { connection = connection.gcp.logging_account - bucket = "gcp-audit-logs-bucket" + bucket = "gcp-cloudarmor-logs-bucket" } } ``` @@ -141,10 +120,10 @@ partition "gcp_audit_log" "my_logs" { Collect audit logs stored with a GCS key prefix. ```hcl -partition "gcp_audit_log" "my_logs_prefix" { +partition "gcp_requests_log" "my_logs_prefix" { source "gcp_storage_bucket" { connection = connection.gcp.logging_account - bucket = "gcp-audit-logs-bucket" + bucket = "gcp-cloudarmor-logs-bucket" prefix = "my/prefix/" } } @@ -155,57 +134,43 @@ partition "gcp_audit_log" "my_logs_prefix" { Collect audit logs for a specific project. 
```hcl -partition "gcp_audit_log" "my_logs_prefix" { - filter = "log_name like 'projects/my-project-name/logs/cloudaudit.googleapis.com/%'" +partition "gcp_requests_log" "my_logs_prefix" { + filter = "log_name like 'projects/my-project-name/logs/requests/%'" source "gcp_storage_bucket" { connection = connection.gcp.logging_account - bucket = "gcp-audit-logs-bucket" + bucket = "gcp-cloudarmor-logs-bucket" } } ``` -### Collect logs from audit logs API +### Collect logs from Cloud Logging API -Collect audit logs stored in a Storage bucket that use the [default log file name format](https://hub.tailpipe.io/plugins/turbot/gcp/tables/gcp_audit_log#gcp_storage_bucket). +Collect request logs directly via the Cloud Logging API. *Note that rate limiting is currently not implemented and this could impact ability to collect a large number of logs* ```hcl connection "gcp" "my_project" { project = "my-gcp-project" } -partition "gcp_audit_log" "my_logs" { - source "gcp_audit_log_api" { +partition "gcp_requests_log" "my_logs" { + source "gcp_cloud_logging_api" { connection = connection.gcp.my_project } } ``` -### Collect specific types of audit logs from audit logs API - -Collect admin activity and data access audit logs for a project. - -```hcl -partition "gcp_audit_log" "my_logs_admin_data_access" { - source "gcp_audit_log_api" { - connection = connection.gcp.my_project - log_types = ["activity", "data_access"] - } -} -``` +### Collect other types of logs from Cloud Logging API -### Exclude INFO level events +The Cloud Logging API source can be expanded upon to retrieve logs other than requests, if the appropriate table is created to enrich and save them. The log name attribute is the filter used for this, and it assumes that a table / partition have been made that match the data type. -Use the filter argument in your partition to exclude INFO severity level events and reduce log storage size. 
+Example: Collecting GCP Dataflow logs ```hcl -partition "gcp_audit_log" "my_logs_severity" { - # Avoid saving specific severity levels - filter = "severity != 'INFO'" - - source "gcp-storage_bucket" { - connection = connection.gcp.logging_account - bucket = "gcp-audit-logs-bucket" +partition "gcp_dataflow_log" "my_logs_prefix" { + filter = "log_name like 'projects/my-project-name/logs/dataflow.googleapis.com%'" + source "gcp_cloud_logging_api" { + connection = connection.gcp.my_project } } ``` @@ -218,4 +183,4 @@ This table sets the following defaults for the [gcp_storage_bucket](https://hub. | Argument | Default | |--------------|---------| -| file_layout | `cloudaudit.googleapis.com/%{DATA:type}/%{YEAR:year}/%{MONTHNUM:month}/%{MONTHDAY:day}/%{HOUR:hour}:%{MINUTE:minute}:%{SECOND:second}_%{DATA:end_time}_%{DATA:suffix}.json` | +| file_layout | `requests/%{YEAR:year}/%{MONTHNUM:month}/%{MONTHDAY:day}/%{HOUR:hour}:%{MINUTE:minute}:%{SECOND:second}_%{DATA:end_time}_%{DATA:suffix}.json` | diff --git a/docs/tables/gcp_requests_log/queries.md b/docs/tables/gcp_requests_log/queries.md deleted file mode 100644 index e69de29..0000000 From ea2f247fdb450d7d192c5c810ae1884e03371707 Mon Sep 17 00:00:00 2001 From: Jason Mogavero Date: Fri, 18 Jul 2025 13:30:53 -0400 Subject: [PATCH 8/9] Apply suggestions from code review Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- .../cloud_logging_api_source.go | 7 +++-- tables/requests_log/requests_log_mapper.go | 31 ++++++------------- 2 files changed, 14 insertions(+), 24 deletions(-) diff --git a/sources/cloud_logging_api/cloud_logging_api_source.go b/sources/cloud_logging_api/cloud_logging_api_source.go index 1675926..3fcc354 100644 --- a/sources/cloud_logging_api/cloud_logging_api_source.go +++ b/sources/cloud_logging_api/cloud_logging_api_source.go @@ -4,7 +4,7 @@ import ( "context" "errors" "fmt" - "strings" // Assuming this is still used in getLogNameFilter + "strings" "time" 
"cloud.google.com/go/logging" @@ -61,7 +61,10 @@ func (s *CloudLoggingAPISource) Collect(ctx context.Context) error { filter := s.getLogNameFilter(project, logTypes, s.FromTime) - // TODO: #ratelimit implement rate limiting + // TODO: #ratelimit Implement rate limiting to ensure compliance with GCP API quotas. + // Use a token bucket algorithm with a maximum of 100 requests per second and a burst capacity of 200. + // Refer to the GCP API rate-limiting documentation: https://cloud.google.com/apis/docs/rate-limits + // This feature should be implemented to prevent potential throttling issues. // logEntry will now be the higher-level logging.Entry var logEntry *logging.Entry diff --git a/tables/requests_log/requests_log_mapper.go b/tables/requests_log/requests_log_mapper.go index 9cd0652..0fe750f 100644 --- a/tables/requests_log/requests_log_mapper.go +++ b/tables/requests_log/requests_log_mapper.go @@ -30,7 +30,7 @@ func (m *RequestsLogMapper) Map(_ context.Context, a any, _ ...mappers.MapOption case *loggingpb.LogEntry: return mapFromSDKType(v) case *logging.Entry: - return nil, fmt.Errorf("Logging.Entry did not convert to *loggingpb.LogEntry: %T", a) + return nil, fmt.Errorf("logging.Entry did not convert to *loggingpb.LogEntry: %T", a) case []byte: return mapFromBucketJson(v) default: @@ -105,9 +105,10 @@ func mapFromSDKType(item *loggingpb.LogEntry) (*RequestsLog, error) { policy.Priority = int(val) } - if rawIds, ok := securityPolicyMap["preconfiguredExpressionIds"].([]interface{}); ok { - rule_id := rawIds[0].(string) - policy.PreconfiguredExprId = rule_id + if rawIds, ok := securityPolicyMap["preconfiguredExpressionIds"].([]interface{}); ok && len(rawIds) > 0 { + if ruleId, ok := rawIds[0].(string); ok { + policy.PreconfiguredExprId = ruleId + } } row.EnforcedSecurityPolicy = policy } @@ -132,8 +133,9 @@ func mapFromSDKType(item *loggingpb.LogEntry) (*RequestsLog, error) { // Handle PreconfiguredExpressionIds within PreviewSecurityPolicy only if it 
exists and has values. if rawIds, ok := previewPolicyMap["preconfiguredExpressionIds"].([]interface{}); ok && len(rawIds) > 0 { - rule_id := rawIds[0].(string) - row.PreviewSecurityPolicy.PreconfiguredExprId = rule_id + if ruleId, ok := rawIds[0].(string); ok { + row.PreviewSecurityPolicy.PreconfiguredExprId = ruleId + } } } @@ -382,22 +384,7 @@ type requestLogSecurityPolicy struct { MatchedLength int `json:"matchedLength,omitempty"` } -// type requestLogSecurityPolicy struct { -// ConfiguredAction string `json:"configuredAction"` -// Name string `json:"name"` -// Outcome string `json:"outcome"` -// Priority int `json:"priority"` -// PreconfiguredExprIds []string `json:"preconfiguredExprIds,omitempty"` -// RateLimitAction *requestLogRateLimitAction `json:"rateLimitAction,omitempty"` -// ThreatIntelligence *requestLogThreatIntelligence `json:"threatIntelligence,omitempty"` -// AddressGroup *requestLogAddressGroup `json:"addressGroup,omitempty"` -// MatchedFieldType string `json:"matchedFieldType,omitempty"` -// MatchedFieldValue string `json:"matchedFieldValue,omitempty"` -// MatchedFieldName string `json:"matchedFieldName,omitempty"` -// MatchedFieldLength int `json:"matchedFieldLength,omitempty"` -// MatchedOffset int `json:"matchedOffset,omitempty"` -// MatchedLength int `json:"matchedLength,omitempty"` -// } +// (No replacement lines; the block is removed entirely.) 
type requestLogSecurityPolicyRequestData struct { RemoteIpInfo *requestLogRemoteIpInfo `json:"remoteIpInfo"` From 237b1b79c4be43ae4bc3ea376299b0993f34aaf5 Mon Sep 17 00:00:00 2001 From: Jason Mogavero Date: Mon, 21 Jul 2025 17:07:07 -0400 Subject: [PATCH 9/9] resolved bugs introduced from upstream changes to schema and FromTime --- sources/cloud_logging_api/cloud_logging_api_source.go | 8 +++++--- tables/requests_log/requests_log_table.go | 1 - 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/sources/cloud_logging_api/cloud_logging_api_source.go b/sources/cloud_logging_api/cloud_logging_api_source.go index 3fcc354..85cb363 100644 --- a/sources/cloud_logging_api/cloud_logging_api_source.go +++ b/sources/cloud_logging_api/cloud_logging_api_source.go @@ -59,7 +59,7 @@ func (s *CloudLoggingAPISource) Collect(ctx context.Context) error { }, } - filter := s.getLogNameFilter(project, logTypes, s.FromTime) + filter := s.getLogNameFilter(project, logTypes, s.CollectionTimeRange) // TODO: #ratelimit Implement rate limiting to ensure compliance with GCP API quotas. // Use a token bucket algorithm with a maximum of 100 requests per second and a burst capacity of 200. 
@@ -122,9 +122,11 @@ func (s *CloudLoggingAPISource) getClient(ctx context.Context, project string) ( return client, nil } -func (s *CloudLoggingAPISource) getLogNameFilter(projectId string, logTypes []string, startTime time.Time) string { +func (s *CloudLoggingAPISource) getLogNameFilter(projectId string, logTypes []string, timeRange collection_state.DirectionalTimeRange) string { requestsLog := fmt.Sprintf(`"projects/%s/logs/requests"`, projectId) - timePart := fmt.Sprintf(`AND (timestamp > "%s")`, startTime.Format(time.RFC3339Nano)) + timePart := fmt.Sprintf(`AND (timestamp >= "%s") AND (timestamp < "%s")`, + timeRange.StartTime().Format(time.RFC3339Nano), + timeRange.EndTime().Format(time.RFC3339Nano)) // short-circuit default if len(logTypes) == 0 { diff --git a/tables/requests_log/requests_log_table.go b/tables/requests_log/requests_log_table.go index 468828e..3d097cc 100644 --- a/tables/requests_log/requests_log_table.go +++ b/tables/requests_log/requests_log_table.go @@ -63,7 +63,6 @@ func (c *RequestsLogTable) EnrichRow(row *RequestsLog, sourceEnrichmentFields sc row.TpID = xid.New().String() row.TpTimestamp = row.Timestamp row.TpIngestTimestamp = time.Now() - row.TpIndex = schema.DefaultIndex row.TpDate = row.Timestamp.Truncate(24 * time.Hour) // Ensure TpIps is always initialized (even if empty)