-
Notifications
You must be signed in to change notification settings - Fork 0
chore(refactor): simplify graph ops #102
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
folded
commented
May 26, 2025
- Simplify the implementations of graph walking.
- Update comments and move only_stages application
📊 SonarQube Summary
|
MattWellie
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
V interesting, looks much simpler. Shadow compute test is compelling.
📊 SonarQube Summary
|
📊 SonarQube Summary
|
Bring only_stages closer to first/last_stages.
e59422a to
07cb65c
Compare
275a341 to
ecb4f45
Compare
|
Apologies for my delayed response. Feel free to go ahead and bump the requests version to pass the security check. If you could install the pre-commit hooks as well that would be great. Should have said this earlier but you can check it all out in the contirbutors file. https://github.com/populationgenomics/cpg-flow/blob/main/CONTRIBUTING.md If this branch contains a fix have at least one commit See https://www.conventionalcommits.org/en/v1.0.0/ for a full breakdown of the convention |
|
Just need to merge so it's not out of sync with main. |
🐳 Docker Image BuiltA new Docker image has been built for this PR: Image: Pull command: docker pull australia-southeast1-docker.pkg.dev/cpg-common/images-dev/cpg_flow:dfa7e69708407784fd683434db446ff80d390725🔗 View in Google Cloud Console This comment was automatically generated by the Docker workflow. |
MattWellie
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I like this, compelling test case, much more easily explained logical flow
rameshka
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hey @folded, thanks for taking on this work to improve the workflow logic—and sorry for taking too long to review it.
Overall, the proposed improvements look good. I’ve left a few comments for you to take a look at. Since this logic is fairly complex, adding some inline comments would also help make it easier to follow and maintain.
| stages_dict: dict[str, 'Stage'] = {} # noqa: UP037 | ||
|
|
||
| def _make_once(cls) -> tuple['Stage', bool]: | ||
| try: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Instead of catching the KeyError, we can simplify this by using a safe lookup on stages_dict. Something like:
instance = stages_dict.get(cls.__name__)
if instance is not None:
return instance, False
instance = stages_dict[cls.__name__] = cls()
return instance, True
| stages_dict |= implicit_stages | ||
| def _instantiate_stages( | ||
| requested_stages: list['StageDecorator'], skip_stages: list[str], only_stages: list[str] | ||
| ) -> dict[str, 'Stage']: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This method is nice and improves some inefficiencies in the previous logic (eg, processing the same stage more than once).
| if not instance.skipped: | ||
| instance.required_stages.extend( | ||
| filter(None, map(_recursively_make_stage, instance.required_stages_classes)), | ||
| ) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think, if we move the only_stages logic here, we can avoid re-iterating over the stages_dict logic (between lines 543-546).
Something like:
if only_stages:
if cls.__name__ not in only_stages:
instance.skipped = True
| return out | ||
|
|
||
|
|
||
| def _compute_shadow(graph: nx.DiGraph, shadow_casters: set[str]) -> set[str]: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This logic looks more concise than the previous implementation. @folded, I've included a few differences I've noticed between the new logic and the previous implementation. Here, I did a 1:1 comparison, but if these changes are intentional, feel free to skip my comment.
(In the workflow examples, -> points to the execution order and not the edge direction in the DAG object)
- When
last_stagescontains multiple stages on the same path, the previous logic picks the downstream stage (to skip the stages further downstream).
Let's say we have a workflowA->B->C->D. If we definelast_stages= [B,C], the previous logic skips onlyD, but the new logic will skipC,D.
This happens when B becomes a shadow caster with shadowed={C, D}.
- Stage skipping when both
last_stagesandfirst_stagesare defined.
Let's say we have a workflow withfirst_stages = Bandlast_stages=F
A->C
->B->D
->E->F->G
->G
The previous logic will result in,
B->D
->E->F
But in the new logic,
Awill not be skipped - Even thoughBis ashadow caster,Cwill light upAand thelast_stage keptlogic will includeA.Gwill not be skipped - Even thoughFis ashadow caster,Ewill light-upGand thefirst_stage keptlogic will includeG.