From 00101746d44d59bbb88fffb3aa9fa803184b3701 Mon Sep 17 00:00:00 2001 From: michaelkedar Date: Tue, 21 Apr 2026 07:14:30 +0000 Subject: [PATCH 1/8] feat: begin laying out things for go worker --- .../database/datastore/vulnerability.go | 13 +++ go/internal/importer/mock_test.go | 13 +++ go/internal/models/vulnerability.go | 30 +++++ .../worker/enrich/source_link_adder.go | 31 ++++++ go/internal/worker/interfaces.go | 22 ++++ go/internal/worker/subscriber.go | 103 ++++++++++++++++++ go/internal/worker/worker.go | 99 +++++++++++++++++ 7 files changed, 311 insertions(+) create mode 100644 go/internal/worker/enrich/source_link_adder.go create mode 100644 go/internal/worker/interfaces.go create mode 100644 go/internal/worker/subscriber.go create mode 100644 go/internal/worker/worker.go diff --git a/go/internal/database/datastore/vulnerability.go b/go/internal/database/datastore/vulnerability.go index a4abfebde5c..162772fdebd 100644 --- a/go/internal/database/datastore/vulnerability.go +++ b/go/internal/database/datastore/vulnerability.go @@ -10,6 +10,7 @@ import ( "cloud.google.com/go/datastore" "github.com/google/osv.dev/go/internal/models" + "github.com/ossf/osv-schema/bindings/go/osvschema" "google.golang.org/api/iterator" ) @@ -80,3 +81,15 @@ func (s *VulnerabilityStore) GetSourceModified(ctx context.Context, id string) ( return v.ModifiedRaw, nil } + +func (s *VulnerabilityStore) Get(_ context.Context, _ string) (*osvschema.Vulnerability, error) { + panic("not implemented") +} + +func (s *VulnerabilityStore) Write(_ context.Context, _ models.WriteRequest) error { + panic("not implemented") +} + +func (s *VulnerabilityStore) Withdraw(_ context.Context, _ string) error { + panic("not implemented") +} diff --git a/go/internal/importer/mock_test.go b/go/internal/importer/mock_test.go index dc7f73e106a..fea2d38ec11 100644 --- a/go/internal/importer/mock_test.go +++ b/go/internal/importer/mock_test.go @@ -8,6 +8,7 @@ import ( "time" 
"github.com/google/osv.dev/go/internal/models" + "github.com/ossf/osv-schema/bindings/go/osvschema" ) type mockSourceRepositoryStore struct { @@ -53,6 +54,18 @@ func (m *mockVulnerabilityStore) GetSourceModified(_ context.Context, vuln strin return time.Time{}, models.ErrNotFound } +func (m *mockVulnerabilityStore) Get(_ context.Context, _ string) (*osvschema.Vulnerability, error) { + panic("not implemented") +} + +func (m *mockVulnerabilityStore) Write(_ context.Context, _ models.WriteRequest) error { + panic("not implemented") +} + +func (m *mockVulnerabilityStore) Withdraw(_ context.Context, _ string) error { + panic("not implemented") +} + type mockSourceRecord struct { DataToRead []byte ReadError error diff --git a/go/internal/models/vulnerability.go b/go/internal/models/vulnerability.go index f761409a9b4..77d99875026 100644 --- a/go/internal/models/vulnerability.go +++ b/go/internal/models/vulnerability.go @@ -5,6 +5,8 @@ import ( "context" "iter" "time" + + "github.com/ossf/osv-schema/bindings/go/osvschema" ) // VulnSourceRef represents a minimal vulnerability entry for indexing/reconciliation. @@ -15,10 +17,38 @@ type VulnSourceRef struct { ModifiedRaw time.Time } +// WriteRequest bundles everything needed to perform a permanent update to an OSV record. +type WriteRequest struct { + ID string + Source string // The source name (e.g. "debian") + Path string // The relative path in the source (e.g. "CVE-2023.json") + Raw *osvschema.Vulnerability // The original input proto + Processed *osvschema.Vulnerability // The final enriched proto + AffectedCommits AffectedCommitsResult // Derived affected commits +} + +// AffectedCommitsResult handles the distinction between 'no change' and 'set to empty'. +type AffectedCommitsResult struct { + Commits [][]byte + Skip bool // If true, the store should not modify existing affected commits for this ID. +} + type VulnerabilityStore interface { // ListBySource returns an iterator over vulnerabilities for a given source. 
ListBySource(ctx context.Context, source string, skipWithdrawn bool) iter.Seq2[*VulnSourceRef, error] + // GetSourceModified returns the modified time of a vulnerability according to the source. // Returns ErrNotFound if the vulnerability is not found. GetSourceModified(ctx context.Context, id string) (time.Time, error) + + // Get returns the fully enriched vulnerability. + Get(ctx context.Context, id string) (*osvschema.Vulnerability, error) + + // Write atomically updates the base record and its derived indexes. + Write(ctx context.Context, req WriteRequest) error + + // Withdraw marks a vulnerability as withdrawn/deleted. + // Sets the withdrawn + modified dates to the current time. + // If the vulnerability is already withdrawn, this is a no-op. + Withdraw(ctx context.Context, id string) error } diff --git a/go/internal/worker/enrich/source_link_adder.go b/go/internal/worker/enrich/source_link_adder.go new file mode 100644 index 00000000000..cca8271d32f --- /dev/null +++ b/go/internal/worker/enrich/source_link_adder.go @@ -0,0 +1,31 @@ +// Package enrich contains individual vulnerability enrichers for the worker pipeline. +package enrich + +import ( + "context" + + "github.com/google/osv.dev/go/internal/worker" + "github.com/ossf/osv-schema/bindings/go/osvschema" + "google.golang.org/protobuf/types/known/structpb" +) + +type SourceLinkAdder struct{} + +var _ worker.Enricher = (*SourceLinkAdder)(nil) + +func (*SourceLinkAdder) Enrich(_ context.Context, vuln *osvschema.Vulnerability, params *worker.EnrichParams) error { + if params.SourceRepo == nil || params.SourceRepo.Link == "" { + return nil + } + sourceLink := structpb.NewStringValue(params.SourceRepo.Link + params.PathInSource) + + for _, affected := range vuln.GetAffected() { + if affected.GetDatabaseSpecific() == nil { + // The error would only be from an invalid map value, passing nil is fine. 
+ affected.DatabaseSpecific, _ = structpb.NewStruct(nil) + } + affected.DatabaseSpecific.Fields["source"] = sourceLink + } + + return nil +} diff --git a/go/internal/worker/interfaces.go b/go/internal/worker/interfaces.go new file mode 100644 index 00000000000..b6fcd4d79f2 --- /dev/null +++ b/go/internal/worker/interfaces.go @@ -0,0 +1,22 @@ +package worker + +import ( + "context" + + "github.com/google/osv.dev/go/internal/models" + "github.com/ossf/osv-schema/bindings/go/osvschema" +) + +type Stores struct { + SourceRepo models.SourceRepositoryStore + Vulnerability models.VulnerabilityStore +} + +type EnrichParams struct { + PathInSource string + SourceRepo *models.SourceRepository +} + +type Enricher interface { + Enrich(ctx context.Context, vuln *osvschema.Vulnerability, params *EnrichParams) error +} diff --git a/go/internal/worker/subscriber.go b/go/internal/worker/subscriber.go new file mode 100644 index 00000000000..f0515a686ab --- /dev/null +++ b/go/internal/worker/subscriber.go @@ -0,0 +1,103 @@ +package worker + +import ( + "context" + "log/slog" + "strconv" + "time" + + "cloud.google.com/go/pubsub/v2" + "github.com/google/osv.dev/go/logger" + "github.com/klauspost/compress/zstd" + "github.com/ossf/osv-schema/bindings/go/osvschema" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/propagation" + "google.golang.org/protobuf/proto" +) + +type Subscriber struct { + Engine Engine + PubSubSub *pubsub.Subscriber +} + +func (s *Subscriber) Run(ctx context.Context) error { + return s.PubSubSub.Receive(ctx, func(ctx context.Context, m *pubsub.Message) { + if taskType := m.Attributes["type"]; taskType != "update" { + logger.InfoContext(ctx, "Skipping message, not an update", slog.Any("task_type", taskType)) + m.Ack() + + return + } + + taskCtx := otel.GetTextMapPropagator().Extract(ctx, propagation.MapCarrier(m.Attributes)) + taskCtx, span := otel.Tracer("worker").Start(taskCtx, "process_message") + defer span.End() + task := Task{ + SourceID: 
m.Attributes["source"], + PathInSource: m.Attributes["path"], + } + logInfo := []any{ + slog.String("source", task.SourceID), + slog.String("path", task.PathInSource), + } + if len(m.Data) != 0 { + if m.Attributes["content_encoding"] != "zstd" { + logger.ErrorContext(taskCtx, "Unrecognized content encoding", append(logInfo, slog.String("encoding", m.Attributes["content_encoding"]))...) + m.Nack() + + return + } + buf := make([]byte, 0, len(m.Data)*3) // let's guess 3x compression + buf, err := zstd.DecodeTo(buf, m.Data) + if err != nil { + logger.ErrorContext(taskCtx, "Failed to decompress vulnerability", append(logInfo, slog.Any("error", err))...) + m.Nack() + + return + } + task.Vuln = &osvschema.Vulnerability{} + if err := proto.Unmarshal(buf, task.Vuln); err != nil { + logger.ErrorContext(taskCtx, "Failed to unmarshal vulnerability", append(logInfo, slog.Any("error", err))...) + m.Nack() + + return + } + } + deleted, err := strconv.ParseBool(m.Attributes["deleted"]) + if err != nil { + logger.ErrorContext(taskCtx, "Failed to parse deleted attribute, defaulting to false", append(logInfo, slog.Any("error", err))...) + deleted = false + } + task.IsDeleted = deleted + + timestamp, err := strconv.ParseInt(m.Attributes["req_timestamp"], 10, 64) + if err != nil { + logger.ErrorContext(taskCtx, "Failed to parse req_timestamp attribute, ignoring", append(logInfo, slog.Any("error", err))...) + } else { + ts := time.Unix(timestamp, 0) + task.ReceivedTime = &ts + } + srcTime := m.Attributes["src_timestamp"] + if srcTime != "" { + timestamp, err = strconv.ParseInt(srcTime, 10, 64) + if err != nil { + logger.ErrorContext(taskCtx, "Failed to parse src_timestamp attribute, ignoring", append(logInfo, slog.Any("error", err))...) 
+ } else { + ts := time.Unix(timestamp, 0) + task.SourceTime = &ts + } + } + + skipHash, ok := m.Attributes["skip_hash_check"] + if !ok || skipHash != "true" { + task.SHA256 = m.Attributes["original_sha256"] + } + + if err := s.Engine.RunTask(taskCtx, task); err != nil { + logger.ErrorContext(taskCtx, "Failed to process task", append(logInfo, slog.Any("error", err))...) + m.Nack() + } else { + m.Ack() + } + }) +} diff --git a/go/internal/worker/worker.go b/go/internal/worker/worker.go new file mode 100644 index 00000000000..fa0fe18b39c --- /dev/null +++ b/go/internal/worker/worker.go @@ -0,0 +1,99 @@ +// Package worker contains the implementation for the vulnerability enrichment worker pipeline. +package worker + +import ( + "context" + "errors" + "fmt" + "log/slog" + "time" + + "github.com/google/go-cmp/cmp" + "github.com/google/osv.dev/go/internal/models" + "github.com/google/osv.dev/go/logger" + "github.com/ossf/osv-schema/bindings/go/osvschema" + "google.golang.org/protobuf/proto" + "google.golang.org/protobuf/testing/protocmp" + "google.golang.org/protobuf/types/known/timestamppb" +) + +type Task struct { + Vuln *osvschema.Vulnerability + SourceID string + PathInSource string + IsDeleted bool + // ReceivedTime is when the importer requested the vuln to be processed. 
+ ReceivedTime *time.Time + // SourceTime is the modified time according to the source + SourceTime *time.Time + // SHA256 is only used when Vuln is not provided + SHA256 string +} + +type Engine struct { + Stores Stores + Pipeline []Enricher +} + +func (e *Engine) RunTask(ctx context.Context, task Task) error { + if task.IsDeleted { + return e.handleDelete(ctx, task) + } + params := EnrichParams{ + PathInSource: task.PathInSource, + } + var err error + params.SourceRepo, err = e.Stores.SourceRepo.Get(ctx, task.SourceID) + if err != nil { + return err + } + if task.Vuln == nil { + // TODO: Download Vuln from source + return errors.New("vuln not provided") + } + + enriched := proto.Clone(task.Vuln).(*osvschema.Vulnerability) + for _, enricher := range e.Pipeline { + if err := enricher.Enrich(ctx, enriched, &params); err != nil { + return err + } + } + + // TODO: affected commits + + // Get the current state of the vuln to check against + current, err := e.Stores.Vulnerability.Get(ctx, enriched.GetId()) + if errors.Is(err, models.ErrNotFound) { + enriched.Modified = timestamppb.Now() + } else if err != nil { + logger.ErrorContext(ctx, "Failed to get current vuln state", slog.String("vuln_id", enriched.GetId()), slog.Any("error", err)) + return fmt.Errorf("failed to get current vuln state: %w", err) + } else if e.isSemanticallyDifferent(current, enriched) { + enriched.Modified = timestamppb.Now() + } else if current.GetModified().AsTime().After(enriched.GetModified().AsTime()) { + enriched.Modified = current.GetModified() + } + + return e.Stores.Vulnerability.Write(ctx, models.WriteRequest{ + ID: enriched.GetId(), + Source: task.SourceID, + Path: task.PathInSource, + Raw: task.Vuln, + Processed: enriched, + AffectedCommits: models.AffectedCommitsResult{ + Skip: true, + }, + }) +} + +func (e *Engine) isSemanticallyDifferent(v1, v2 *osvschema.Vulnerability) bool { + return !cmp.Equal(v1, v2, + protocmp.Transform(), + protocmp.IgnoreFields(&osvschema.Vulnerability{},
"modified", "published"), + ) +} + +func (e *Engine) handleDelete(_ context.Context, _ Task) error { + // TODO + return nil +} From 974e401d885d8052b0478e8e9621b654a2bcc313 Mon Sep 17 00:00:00 2001 From: michaelkedar Date: Wed, 22 Apr 2026 00:28:10 +0000 Subject: [PATCH 2/8] add a log --- go/internal/worker/worker.go | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/go/internal/worker/worker.go b/go/internal/worker/worker.go index fa0fe18b39c..a861da1ba92 100644 --- a/go/internal/worker/worker.go +++ b/go/internal/worker/worker.go @@ -6,6 +6,7 @@ import ( "errors" "fmt" "log/slog" + "reflect" "time" "github.com/google/go-cmp/cmp" @@ -55,6 +56,12 @@ func (e *Engine) RunTask(ctx context.Context, task Task) error { enriched := proto.Clone(task.Vuln).(*osvschema.Vulnerability) for _, enricher := range e.Pipeline { if err := enricher.Enrich(ctx, enriched, &params); err != nil { + logger.ErrorContext(ctx, "Enricher failed with error", + slog.String("id", task.Vuln.GetId()), + slog.String("enricher", reflect.TypeOf(enricher).Name()), + slog.Any("error", err), + ) + return err } } From 9725af0757b51aacb3fd83a77b933a1cf53d7ecb Mon Sep 17 00:00:00 2001 From: michaelkedar Date: Thu, 23 Apr 2026 03:38:39 +0000 Subject: [PATCH 3/8] some review comments --- go/internal/worker/engine.go | 103 +++++++++++++++++++++++++++++++ go/internal/worker/interfaces.go | 22 ------- go/internal/worker/subscriber.go | 77 ++++++++++++++--------- go/internal/worker/worker.go | 95 +++++----------------------- 4 files changed, 168 insertions(+), 129 deletions(-) create mode 100644 go/internal/worker/engine.go delete mode 100644 go/internal/worker/interfaces.go diff --git a/go/internal/worker/engine.go b/go/internal/worker/engine.go new file mode 100644 index 00000000000..bf06c96ba0d --- /dev/null +++ b/go/internal/worker/engine.go @@ -0,0 +1,103 @@ +// Package worker contains the implementation for the vulnerability enrichment worker pipeline.
+package worker + +import ( + "context" + "errors" + "fmt" + "log/slog" + "reflect" + + "github.com/google/go-cmp/cmp" + "github.com/google/osv.dev/go/internal/models" + "github.com/google/osv.dev/go/logger" + "github.com/ossf/osv-schema/bindings/go/osvschema" + "google.golang.org/protobuf/proto" + "google.golang.org/protobuf/testing/protocmp" + "google.golang.org/protobuf/types/known/timestamppb" +) + +type Engine struct { + Stores Stores + Pipeline []Enricher +} + +func (e *Engine) RunTask(ctx context.Context, task Task) error { + switch task.Type { + case TaskDelete: + return e.handleDelete(ctx, task) + case TaskUpdate: + return e.handleUpdate(ctx, task) + default: + return fmt.Errorf("unknown task type: %v", task.Type) + } +} + +func (e *Engine) handleUpdate(ctx context.Context, task Task) error { + params := EnrichParams{ + PathInSource: task.PathInSource, + } + var err error + params.SourceRepo, err = e.Stores.SourceRepo.Get(ctx, task.SourceID) + if err != nil { + return err + } + if task.Vuln == nil { + // TODO: Download Vuln from source + return errors.New("vuln not provided") + } + + enriched := proto.Clone(task.Vuln).(*osvschema.Vulnerability) + for _, enricher := range e.Pipeline { + if err := enricher.Enrich(ctx, enriched, &params); err != nil { + logger.ErrorContext(ctx, "Enricher failed with error", + slog.String("id", task.Vuln.GetId()), + slog.String("enricher", reflect.TypeOf(enricher).Name()), + slog.Any("error", err), + ) + + return err + } + } + + // TODO: affected commits + + // Get the current state of the vuln to check against + current, err := e.Stores.Vulnerability.Get(ctx, enriched.GetId()) + isNotFound := errors.Is(err, models.ErrNotFound) + + if err != nil && !isNotFound { + logger.ErrorContext(ctx, "Failed to get current vuln state", slog.String("vuln_id", enriched.GetId()), slog.Any("error", err)) + + return fmt.Errorf("failed to get current vuln state: %w", err) + } + + if isNotFound || e.isSemanticallyDifferent(current, enriched) { +
enriched.Modified = timestamppb.Now() + } else if current.GetModified().AsTime().After(enriched.GetModified().AsTime()) { + enriched.Modified = current.GetModified() + } + + return e.Stores.Vulnerability.Write(ctx, models.WriteRequest{ + ID: enriched.GetId(), + Source: task.SourceID, + Path: task.PathInSource, + Raw: task.Vuln, + Processed: enriched, + AffectedCommits: models.AffectedCommitsResult{ + Skip: true, + }, + }) +} + +func (e *Engine) isSemanticallyDifferent(v1, v2 *osvschema.Vulnerability) bool { + return !cmp.Equal(v1, v2, + protocmp.Transform(), + protocmp.IgnoreFields(&osvschema.Vulnerability{}, "modified", "published"), + ) +} + +func (e *Engine) handleDelete(_ context.Context, _ Task) error { + // TODO + return nil +} diff --git a/go/internal/worker/interfaces.go b/go/internal/worker/interfaces.go deleted file mode 100644 index b6fcd4d79f2..00000000000 --- a/go/internal/worker/interfaces.go +++ /dev/null @@ -1,22 +0,0 @@ -package worker - -import ( - "context" - - "github.com/google/osv.dev/go/internal/models" - "github.com/ossf/osv-schema/bindings/go/osvschema" -) - -type Stores struct { - SourceRepo models.SourceRepositoryStore - Vulnerability models.VulnerabilityStore -} - -type EnrichParams struct { - PathInSource string - SourceRepo *models.SourceRepository -} - -type Enricher interface { - Enrich(ctx context.Context, vuln *osvschema.Vulnerability, params *EnrichParams) error -} diff --git a/go/internal/worker/subscriber.go b/go/internal/worker/subscriber.go index f0515a686ab..ebb3358e4f4 100644 --- a/go/internal/worker/subscriber.go +++ b/go/internal/worker/subscriber.go @@ -2,6 +2,7 @@ package worker import ( "context" + "fmt" "log/slog" "strconv" "time" @@ -36,55 +37,41 @@ func (s *Subscriber) Run(ctx context.Context) error { SourceID: m.Attributes["source"], PathInSource: m.Attributes["path"], } + logInfo := []any{ slog.String("source", task.SourceID), slog.String("path", task.PathInSource), } - if len(m.Data) != 0 { - if 
m.Attributes["content_encoding"] != "zstd" { - logger.ErrorContext(taskCtx, "Unrecognized content encoding", append(logInfo, slog.String("encoding", m.Attributes["content_encoding"]))...) - m.Nack() - return - } - buf := make([]byte, 0, len(m.Data)*3) // let's guess 3x compression - buf, err := zstd.DecodeTo(buf, m.Data) - if err != nil { - logger.ErrorContext(taskCtx, "Failed to decompress vulnerability", append(logInfo, slog.Any("error", err))...) - m.Nack() - - return - } - task.Vuln = &osvschema.Vulnerability{} - if err := proto.Unmarshal(buf, task.Vuln); err != nil { - logger.ErrorContext(taskCtx, "Failed to unmarshal vulnerability", append(logInfo, slog.Any("error", err))...) - m.Nack() + var err error + task.Vuln, err = s.parseVuln(m) + if err != nil { + logger.ErrorContext(taskCtx, "Failed to parse vulnerability", append(logInfo, slog.Any("error", err))...) + m.Nack() - return - } + return } + deleted, err := strconv.ParseBool(m.Attributes["deleted"]) if err != nil { logger.ErrorContext(taskCtx, "Failed to parse deleted attribute, defaulting to false", append(logInfo, slog.Any("error", err))...) deleted = false } - task.IsDeleted = deleted + if deleted { + task.Type = TaskDelete + } else { + task.Type = TaskUpdate + } - timestamp, err := strconv.ParseInt(m.Attributes["req_timestamp"], 10, 64) + task.ReceivedTime, err = s.timeFromUnixSeconds(m.Attributes["req_timestamp"]) if err != nil { logger.ErrorContext(taskCtx, "Failed to parse req_timestamp attribute, ignoring", append(logInfo, slog.Any("error", err))...) - } else { - ts := time.Unix(timestamp, 0) - task.ReceivedTime = &ts } srcTime := m.Attributes["src_timestamp"] if srcTime != "" { - timestamp, err = strconv.ParseInt(srcTime, 10, 64) + task.SourceTime, err = s.timeFromUnixSeconds(srcTime) if err != nil { logger.ErrorContext(taskCtx, "Failed to parse src_timestamp attribute, ignoring", append(logInfo, slog.Any("error", err))...) 
- } else { - ts := time.Unix(timestamp, 0) - task.SourceTime = &ts } } @@ -101,3 +88,35 @@ func (s *Subscriber) Run(ctx context.Context) error { } }) } + +func (s *Subscriber) parseVuln(m *pubsub.Message) (*osvschema.Vulnerability, error) { + if len(m.Data) == 0 { + //nolint:nilnil // this is expected for delete requests + return nil, nil + } + if m.Attributes["content_encoding"] != "zstd" { + return nil, fmt.Errorf("unrecognized content encoding: %s", m.Attributes["content_encoding"]) + } + // TODO: try to extract the actual uncompressed size from the zstd frame. + buf := make([]byte, 0, len(m.Data)*3) + buf, err := zstd.DecodeTo(buf, m.Data) + if err != nil { + return nil, fmt.Errorf("failed to decompress vulnerability: %w", err) + } + v := &osvschema.Vulnerability{} + if err := proto.Unmarshal(buf, v); err != nil { + return nil, fmt.Errorf("failed to unmarshal vulnerability: %w", err) + } + + return v, nil +} + +func (s *Subscriber) timeFromUnixSeconds(tsString string) (*time.Time, error) { + timestamp, err := strconv.ParseInt(tsString, 10, 64) + if err != nil { + return nil, err + } + ts := time.Unix(timestamp, 0) + + return &ts, nil +} diff --git a/go/internal/worker/worker.go b/go/internal/worker/worker.go index a861da1ba92..a92df65813d 100644 --- a/go/internal/worker/worker.go +++ b/go/internal/worker/worker.go @@ -3,26 +3,25 @@ package worker import ( "context" - "errors" - "fmt" - "log/slog" - "reflect" "time" - "github.com/google/go-cmp/cmp" "github.com/google/osv.dev/go/internal/models" - "github.com/google/osv.dev/go/logger" "github.com/ossf/osv-schema/bindings/go/osvschema" - "google.golang.org/protobuf/proto" - "google.golang.org/protobuf/testing/protocmp" - "google.golang.org/protobuf/types/known/timestamppb" +) + +type TaskType int + +const ( + TaskUnknown TaskType = iota + TaskUpdate + TaskDelete ) type Task struct { + Type TaskType Vuln *osvschema.Vulnerability SourceID string PathInSource string - IsDeleted bool // ReceivedTime is when the 
importer requested the vuln to be processed. ReceivedTime *time.Time // SourceTime is the modified time according to the source @@ -31,76 +30,16 @@ type Task struct { SHA256 string } -type Engine struct { - Stores Stores - Pipeline []Enricher +type Stores struct { + SourceRepo models.SourceRepositoryStore + Vulnerability models.VulnerabilityStore } -func (e *Engine) RunTask(ctx context.Context, task Task) error { - if task.IsDeleted { - return e.handleDelete(ctx, task) - } - params := EnrichParams{ - PathInSource: task.PathInSource, - } - var err error - params.SourceRepo, err = e.Stores.SourceRepo.Get(ctx, task.SourceID) - if err != nil { - return err - } - if task.Vuln == nil { - // TODO: Download Vuln from source - return errors.New("vuln not provided") - } - - enriched := proto.Clone(task.Vuln).(*osvschema.Vulnerability) - for _, enricher := range e.Pipeline { - if err := enricher.Enrich(ctx, enriched, &params); err != nil { - logger.ErrorContext(ctx, "Enricher failed with error", - slog.String("id", task.Vuln.GetId()), - slog.String("enricher", reflect.TypeOf(enricher).Name()), - slog.Any("error", err), - ) - - return err - } - } - - // TODO: affected commits - - // Get the current state of the vuln to check against - current, err := e.Stores.Vulnerability.Get(ctx, enriched.GetId()) - if errors.Is(err, models.ErrNotFound) { - enriched.Modified = timestamppb.Now() - } else if err != nil { - logger.ErrorContext(ctx, "Failed to get current vuln state", slog.String("vuln_id", enriched.GetId()), slog.Any("error", err)) - return fmt.Errorf("failed to get current vuln state: %w", err) - } else if e.isSemanticallyDifferent(current, enriched) { - enriched.Modified = timestamppb.Now() - } else if current.GetModified().AsTime().After(enriched.GetModified().AsTime()) { - enriched.Modified = current.GetModified() - } - - return e.Stores.Vulnerability.Write(ctx, models.WriteRequest{ - ID: enriched.GetId(), - Source: task.SourceID, - Path: task.PathInSource, - Raw: task.Vuln, -
Processed: enriched, - AffectedCommits: models.AffectedCommitsResult{ - Skip: true, - }, - }) -} - -func (e *Engine) isSemanticallyDifferent(v1, v2 *osvschema.Vulnerability) bool { - return !cmp.Equal(v1, v2, - protocmp.Transform(), - protocmp.IgnoreFields(&osvschema.Vulnerability{}, "modified", "published"), - ) +type EnrichParams struct { + PathInSource string + SourceRepo *models.SourceRepository } -func (e *Engine) handleDelete(_ context.Context, _ Task) error { - // TODO - return nil +type Enricher interface { + Enrich(ctx context.Context, vuln *osvschema.Vulnerability, params *EnrichParams) error } From 18dff425605cddcc516e0161cd35a53e508e01e0 Mon Sep 17 00:00:00 2001 From: michaelkedar Date: Thu, 23 Apr 2026 03:41:15 +0000 Subject: [PATCH 4/8] de-anonymize function --- go/internal/worker/subscriber.go | 110 ++++++++++++++++--------------- 1 file changed, 56 insertions(+), 54 deletions(-) diff --git a/go/internal/worker/subscriber.go b/go/internal/worker/subscriber.go index ebb3358e4f4..e4da5ea2016 100644 --- a/go/internal/worker/subscriber.go +++ b/go/internal/worker/subscriber.go @@ -22,71 +22,73 @@ type Subscriber struct { } func (s *Subscriber) Run(ctx context.Context) error { - return s.PubSubSub.Receive(ctx, func(ctx context.Context, m *pubsub.Message) { - if taskType := m.Attributes["type"]; taskType != "update" { - logger.InfoContext(ctx, "Skipping message, not an update", slog.Any("task_type", taskType)) - m.Ack() + return s.PubSubSub.Receive(ctx, s.handleMessage) +} - return - } +func (s *Subscriber) handleMessage(ctx context.Context, m *pubsub.Message) { + if taskType := m.Attributes["type"]; taskType != "update" { + logger.InfoContext(ctx, "Skipping message, not an update", slog.Any("task_type", taskType)) + m.Ack() - taskCtx := otel.GetTextMapPropagator().Extract(ctx, propagation.MapCarrier(m.Attributes)) - taskCtx, span := otel.Tracer("worker").Start(taskCtx, "process_message") - defer span.End() - task := Task{ - SourceID: 
m.Attributes["source"], - PathInSource: m.Attributes["path"], - } + return + } - logInfo := []any{ - slog.String("source", task.SourceID), - slog.String("path", task.PathInSource), - } + taskCtx := otel.GetTextMapPropagator().Extract(ctx, propagation.MapCarrier(m.Attributes)) + taskCtx, span := otel.Tracer("worker").Start(taskCtx, "process_message") + defer span.End() + task := Task{ + SourceID: m.Attributes["source"], + PathInSource: m.Attributes["path"], + } - var err error - task.Vuln, err = s.parseVuln(m) - if err != nil { - logger.ErrorContext(taskCtx, "Failed to parse vulnerability", append(logInfo, slog.Any("error", err))...) - m.Nack() + logInfo := []any{ + slog.String("source", task.SourceID), + slog.String("path", task.PathInSource), + } - return - } + var err error + task.Vuln, err = s.parseVuln(m) + if err != nil { + logger.ErrorContext(taskCtx, "Failed to parse vulnerability", append(logInfo, slog.Any("error", err))...) + m.Nack() - deleted, err := strconv.ParseBool(m.Attributes["deleted"]) - if err != nil { - logger.ErrorContext(taskCtx, "Failed to parse deleted attribute, defaulting to false", append(logInfo, slog.Any("error", err))...) - deleted = false - } - if deleted { - task.Type = TaskDelete - } else { - task.Type = TaskUpdate - } + return + } - task.ReceivedTime, err = s.timeFromUnixSeconds(m.Attributes["req_timestamp"]) + deleted, err := strconv.ParseBool(m.Attributes["deleted"]) + if err != nil { + logger.ErrorContext(taskCtx, "Failed to parse deleted attribute, defaulting to false", append(logInfo, slog.Any("error", err))...) + deleted = false + } + if deleted { + task.Type = TaskDelete + } else { + task.Type = TaskUpdate + } + + task.ReceivedTime, err = s.timeFromUnixSeconds(m.Attributes["req_timestamp"]) + if err != nil { + logger.ErrorContext(taskCtx, "Failed to parse req_timestamp attribute, ignoring", append(logInfo, slog.Any("error", err))...) 
+ } + srcTime := m.Attributes["src_timestamp"] + if srcTime != "" { + task.SourceTime, err = s.timeFromUnixSeconds(srcTime) if err != nil { - logger.ErrorContext(taskCtx, "Failed to parse req_timestamp attribute, ignoring", append(logInfo, slog.Any("error", err))...) - } - srcTime := m.Attributes["src_timestamp"] - if srcTime != "" { - task.SourceTime, err = s.timeFromUnixSeconds(srcTime) - if err != nil { - logger.ErrorContext(taskCtx, "Failed to parse src_timestamp attribute, ignoring", append(logInfo, slog.Any("error", err))...) - } + logger.ErrorContext(taskCtx, "Failed to parse src_timestamp attribute, ignoring", append(logInfo, slog.Any("error", err))...) } + } - skipHash, ok := m.Attributes["skip_hash_check"] - if !ok || skipHash != "true" { - task.SHA256 = m.Attributes["original_sha256"] - } + skipHash, ok := m.Attributes["skip_hash_check"] + if !ok || skipHash != "true" { + task.SHA256 = m.Attributes["original_sha256"] + } - if err := s.Engine.RunTask(taskCtx, task); err != nil { - logger.ErrorContext(taskCtx, "Failed to process task", append(logInfo, slog.Any("error", err))...) - m.Nack() - } else { - m.Ack() - } - }) + if err := s.Engine.RunTask(taskCtx, task); err != nil { + logger.ErrorContext(taskCtx, "Failed to process task", append(logInfo, slog.Any("error", err))...) 
+ m.Nack() + } else { + m.Ack() + } } func (s *Subscriber) parseVuln(m *pubsub.Message) (*osvschema.Vulnerability, error) { From a6b32dd3e182dc60a2ba9efdf82127643b6d9128 Mon Sep 17 00:00:00 2001 From: michaelkedar Date: Thu, 23 Apr 2026 04:04:50 +0000 Subject: [PATCH 5/8] do some package shenanigans --- go/internal/worker/engine.go | 5 +++-- go/internal/worker/pipeline/enrich.go | 18 ++++++++++++++++++ .../worker/pipeline/registry/registry.go | 12 ++++++++++++ .../sourcelink/sourcelink.go} | 14 ++++++++------ go/internal/worker/worker.go | 10 ---------- 5 files changed, 41 insertions(+), 18 deletions(-) create mode 100644 go/internal/worker/pipeline/enrich.go create mode 100644 go/internal/worker/pipeline/registry/registry.go rename go/internal/worker/{enrich/source_link_adder.go => pipeline/sourcelink/sourcelink.go} (52%) diff --git a/go/internal/worker/engine.go b/go/internal/worker/engine.go index bf06c96ba0d..6a3362f49c7 100644 --- a/go/internal/worker/engine.go +++ b/go/internal/worker/engine.go @@ -10,6 +10,7 @@ import ( "github.com/google/go-cmp/cmp" "github.com/google/osv.dev/go/internal/models" + "github.com/google/osv.dev/go/internal/worker/pipeline" "github.com/google/osv.dev/go/logger" "github.com/ossf/osv-schema/bindings/go/osvschema" "google.golang.org/protobuf/proto" @@ -19,7 +20,7 @@ import ( type Engine struct { Stores Stores - Pipeline []Enricher + Pipeline []pipeline.Enricher } func (e *Engine) RunTask(ctx context.Context, task Task) error { @@ -34,7 +35,7 @@ func (e *Engine) RunTask(ctx context.Context, task Task) error { } func (e *Engine) handleUpdate(ctx context.Context, task Task) error { - params := EnrichParams{ + params := pipeline.EnrichParams{ PathInSource: task.PathInSource, } var err error diff --git a/go/internal/worker/pipeline/enrich.go b/go/internal/worker/pipeline/enrich.go new file mode 100644 index 00000000000..d189bbfb174 --- /dev/null +++ b/go/internal/worker/pipeline/enrich.go @@ -0,0 +1,18 @@ +// Package pipeline contains 
individual vulnerability enrichers for the worker pipeline. +package pipeline + +import ( + "context" + + "github.com/google/osv.dev/go/internal/models" + "github.com/ossf/osv-schema/bindings/go/osvschema" +) + +type EnrichParams struct { + PathInSource string + SourceRepo *models.SourceRepository +} + +type Enricher interface { + Enrich(ctx context.Context, vuln *osvschema.Vulnerability, params *EnrichParams) error +} diff --git a/go/internal/worker/pipeline/registry/registry.go b/go/internal/worker/pipeline/registry/registry.go new file mode 100644 index 00000000000..e71043840aa --- /dev/null +++ b/go/internal/worker/pipeline/registry/registry.go @@ -0,0 +1,12 @@ +// Package registry contains all the enrichers that are used in the worker pipeline. +package registry + +import ( + "github.com/google/osv.dev/go/internal/worker/pipeline" + "github.com/google/osv.dev/go/internal/worker/pipeline/sourcelink" +) + +// List is the list of all enrichers used in the worker pipeline. +var List = []pipeline.Enricher{ + &sourcelink.Enricher{}, +} diff --git a/go/internal/worker/enrich/source_link_adder.go b/go/internal/worker/pipeline/sourcelink/sourcelink.go similarity index 52% rename from go/internal/worker/enrich/source_link_adder.go rename to go/internal/worker/pipeline/sourcelink/sourcelink.go index cca8271d32f..829c2b2eb02 100644 --- a/go/internal/worker/enrich/source_link_adder.go +++ b/go/internal/worker/pipeline/sourcelink/sourcelink.go @@ -1,19 +1,21 @@ -// Package enrich contains individual vulnerability enrichers for the worker pipeline. -package enrich +// Package sourcelink implements an enricher that adds the source link to the vulnerability. +// The source link is added under the database_specific field of each affected entry, +// with the key "source" and the value being the full path to the vulnerability in the source repo.
+package sourcelink import ( "context" - "github.com/google/osv.dev/go/internal/worker" + "github.com/google/osv.dev/go/internal/worker/pipeline" "github.com/ossf/osv-schema/bindings/go/osvschema" "google.golang.org/protobuf/types/known/structpb" ) -type SourceLinkAdder struct{} +type Enricher struct{} -var _ worker.Enricher = (*SourceLinkAdder)(nil) +var _ pipeline.Enricher = (*Enricher)(nil) -func (*SourceLinkAdder) Enrich(_ context.Context, vuln *osvschema.Vulnerability, params *worker.EnrichParams) error { +func (*Enricher) Enrich(_ context.Context, vuln *osvschema.Vulnerability, params *pipeline.EnrichParams) error { if params.SourceRepo == nil || params.SourceRepo.Link == "" { return nil } diff --git a/go/internal/worker/worker.go b/go/internal/worker/worker.go index a92df65813d..9d043abfe9a 100644 --- a/go/internal/worker/worker.go +++ b/go/internal/worker/worker.go @@ -2,7 +2,6 @@ package worker import ( - "context" "time" "github.com/google/osv.dev/go/internal/models" @@ -34,12 +33,3 @@ type Stores struct { SourceRepo models.SourceRepositoryStore Vulnerability models.VulnerabilityStore } - -type EnrichParams struct { - PathInSource string - SourceRepo *models.SourceRepository -} - -type Enricher interface { - Enrich(ctx context.Context, vuln *osvschema.Vulnerability, params *EnrichParams) error -} From 54993abd76661dfee7c93ce13174647d8bb6ac66 Mon Sep 17 00:00:00 2001 From: michaelkedar Date: Thu, 23 Apr 2026 04:11:01 +0000 Subject: [PATCH 6/8] Processed -> Enriched --- go/internal/models/vulnerability.go | 2 +- go/internal/worker/engine.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/go/internal/models/vulnerability.go b/go/internal/models/vulnerability.go index 77d99875026..06b1b23298c 100644 --- a/go/internal/models/vulnerability.go +++ b/go/internal/models/vulnerability.go @@ -23,7 +23,7 @@ type WriteRequest struct { Source string // The source name (e.g. "debian") Path string // The relative path in the source (e.g. 
"CVE-2023.json") Raw *osvschema.Vulnerability // The original input proto - Processed *osvschema.Vulnerability // The final enriched proto + Enriched *osvschema.Vulnerability // The final enriched proto AffectedCommits AffectedCommitsResult // Derived affected commits } diff --git a/go/internal/worker/engine.go b/go/internal/worker/engine.go index 6a3362f49c7..0dd4136179f 100644 --- a/go/internal/worker/engine.go +++ b/go/internal/worker/engine.go @@ -84,7 +84,7 @@ func (e *Engine) handleUpdate(ctx context.Context, task Task) error { Source: task.SourceID, Path: task.PathInSource, Raw: task.Vuln, - Processed: enriched, + Enriched: enriched, AffectedCommits: models.AffectedCommitsResult{ Skip: true, }, From d1470ff532552fec847724e00fa43e965acdb30a Mon Sep 17 00:00:00 2001 From: michaelkedar Date: Thu, 23 Apr 2026 04:16:42 +0000 Subject: [PATCH 7/8] formaterattering --- go/internal/models/vulnerability.go | 2 +- go/internal/worker/engine.go | 8 ++++---- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/go/internal/models/vulnerability.go b/go/internal/models/vulnerability.go index 06b1b23298c..898c2423365 100644 --- a/go/internal/models/vulnerability.go +++ b/go/internal/models/vulnerability.go @@ -23,7 +23,7 @@ type WriteRequest struct { Source string // The source name (e.g. "debian") Path string // The relative path in the source (e.g. 
"CVE-2023.json") Raw *osvschema.Vulnerability // The original input proto - Enriched *osvschema.Vulnerability // The final enriched proto + Enriched *osvschema.Vulnerability // The final enriched proto AffectedCommits AffectedCommitsResult // Derived affected commits } diff --git a/go/internal/worker/engine.go b/go/internal/worker/engine.go index 0dd4136179f..6fbb3a71944 100644 --- a/go/internal/worker/engine.go +++ b/go/internal/worker/engine.go @@ -80,10 +80,10 @@ func (e *Engine) handleUpdate(ctx context.Context, task Task) error { } return e.Stores.Vulnerability.Write(ctx, models.WriteRequest{ - ID: enriched.GetId(), - Source: task.SourceID, - Path: task.PathInSource, - Raw: task.Vuln, + ID: enriched.GetId(), + Source: task.SourceID, + Path: task.PathInSource, + Raw: task.Vuln, Enriched: enriched, AffectedCommits: models.AffectedCommitsResult{ Skip: true, From fdc872b30cb66c3683ac4d3f7b5499d1db3b73c4 Mon Sep 17 00:00:00 2001 From: michaelkedar Date: Thu, 23 Apr 2026 06:17:19 +0000 Subject: [PATCH 8/8] fix some logging naming --- go/internal/worker/engine.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/go/internal/worker/engine.go b/go/internal/worker/engine.go index 6fbb3a71944..1f518731a0a 100644 --- a/go/internal/worker/engine.go +++ b/go/internal/worker/engine.go @@ -6,7 +6,6 @@ import ( "errors" "fmt" "log/slog" - "reflect" "github.com/google/go-cmp/cmp" "github.com/google/osv.dev/go/internal/models" @@ -53,7 +52,7 @@ func (e *Engine) handleUpdate(ctx context.Context, task Task) error { if err := enricher.Enrich(ctx, enriched, ¶ms); err != nil { logger.ErrorContext(ctx, "Enricher failed with error", slog.String("id", task.Vuln.GetId()), - slog.String("enricher", reflect.TypeOf(enricher).Name()), + slog.String("enricher", fmt.Sprintf("%T", enricher)), slog.Any("error", err), )