Skip to content

feat: begin laying out things for go worker#5275

Draft
michaelkedar wants to merge 8 commits intogoogle:masterfrom
michaelkedar:👷🙅🐍

Hidden character warning

The head ref may contain hidden characters: "\ud83d\udc77\ud83d\ude45\ud83d\udc0d"
Draft

feat: begin laying out things for go worker#5275
michaelkedar wants to merge 8 commits intogoogle:masterfrom
michaelkedar:👷🙅🐍

Conversation

@michaelkedar
Copy link
Copy Markdown
Member

@michaelkedar michaelkedar commented Apr 22, 2026

Very much not complete, but think of this as a design doc without the doc part.

The basic idea is to have a bunch of Enrichers in charge of populating fields in the records as the worker currently does, but in a more concrete and modular pipeline.
I've Implemented the thing that adds the source link into database_specific as an example, but Enrichers will include version enumeration, PURL generated, etc.

Affected commit computation and enumeration will happen after the enricher pipeline, since it needs to return the list of commits that are not part of the OSV record.

I've made two separate structs for the pub/sub subscriber/parser and the actual vuln processing struct (the 'engine') which hopefully makes it less coupled to pub/sub.

Comment thread go/internal/worker/subscriber.go Outdated
}

func (s *Subscriber) Run(ctx context.Context) error {
return s.PubSubSub.Receive(ctx, func(ctx context.Context, m *pubsub.Message) {
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

by they way, you can configure parallelism on this pubsub.Subscriber, so we wouldn't have to manually manage a worker pool

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you have an example of how this is done? It just calls the Receive function in multiple goroutines?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah

There's Subscriber.ReceiveSettings.MaxOutstandingMessages to configure the number of messages (and therefore goroutines) to be processed at once

Copy link
Copy Markdown
Contributor

@another-rex another-rex left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Architecture LGTM!

Comment thread go/internal/worker/subscriber.go Outdated

return
}
buf := make([]byte, 0, len(m.Data)*3) // let's guess 3x compression
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

zstd probably has the full size in the encoded data's metadata, it's apparently an optional header, but we can just make sure we encode that on the encoding end.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually had a quick look at how to do this, seems kind of annoying to do, so probably just add a TODO here and leave it.

Comment thread go/internal/worker/subscriber.go Outdated
slog.String("source", task.SourceID),
slog.String("path", task.PathInSource),
}
if len(m.Data) != 0 {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we add some new lines and comments to break up this function a bit. (Or probably better to put it into separate functions)

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

functionized bits

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we just call this file engine?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

renamed this to engine; moved some other things to what once was interfaces and renamed that to worker 🙂

Comment thread go/internal/worker/worker.go Outdated
} else if err != nil {
logger.ErrorContext(ctx, "Failed to get current vuln state", slog.String("vuln_id", enriched.GetId()), slog.Any("error", err))
return fmt.Errorf("failed to get current vuln state: %w", err)
} else if e.isSemanticallyDifferent(current, enriched) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: remove else since it's already returngin

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This still needs the else since the first if doesn't return

maybe this would be neater as a switch statement?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

didn't make it a switch, but changed the logic to be a bit easier to follow

Comment thread go/internal/worker/worker.go Outdated
Comment thread go/internal/worker/worker.go Outdated
}
if task.Vuln == nil {
// TODO: Download Vuln from source
return errors.New("vuln not provided")
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think returning the error is the correct move. We shouldn't try to download the vuln in worker, that should be importer's job.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess I'm worried about what if the compressed vuln is too big to send over pub/sub (which, I guess would be really big), and also if we ever want to request an update to a record from a datafix tool or something we'd need to download it there as well.

Copy link
Copy Markdown
Contributor

@another-rex another-rex Apr 23, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm still think the vuln should be sent along by the datafix tool right?

For the really big records, I think that's something we should solve with a side channel to pass the vuln along. I just think it's going to get really messy / buggy if we start adding logic for getting the record from upstream into the worker. Probably something we don't need to worry about for now as I think with zstd compression it has to be something truly monstrous for it to be a problem, and it'll likely cause other breakages as well.


var _ worker.Enricher = (*SourceLinkAdder)(nil)

func (*SourceLinkAdder) Enrich(_ context.Context, vuln *osvschema.Vulnerability, params *worker.EnrichParams) error {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From the package comment, I think this is likely intended as an example, rather than the full complement of enrichers? Perhaps say so explicitly in a comment.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

moved some things around and moved each enricher to its own package so that the package comments are more specific to the package

Vulnerability models.VulnerabilityStore
}

type EnrichParams struct {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are these the only things that would be needed for any possible enricher? If not, would you expect that we would include all possible types as (optional, I guess) fields here?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This I don't have a great design for.

This struct should contain all things needed across all enrichers, so if a new enricher needs something else we'd have to add it here and populate it.

I guess technically all the fields are optional, (if a specific enricher doesn't need it) but practically, we'd be running every enricher so all the fields become required.

Comment thread go/internal/models/vulnerability.go Outdated
Source string // The source name (e.g. "debian")
Path string // The relative path in the source (e.g. "CVE-2023.json")
Raw *osvschema.Vulnerability // The original input proto
Processed *osvschema.Vulnerability // The final enriched proto
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Elsewhere you seem to have called this "enriched". Is the difference meaningful?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Processed felt a bit more generic, but I can change it to enriched for consistency

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants