Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/ci-tests.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -66,13 +66,13 @@ jobs:
if: ${{ startsWith(matrix.on, 'macos-') }}
- name: "Download CWL conformance test script from StreamFlow"
run: |
curl -fsSL https://raw.githubusercontent.com/alpha-unito/streamflow/refs/tags/0.2.0.dev14/cwl-conformance-test.sh -o cwl-conformance-test.sh
curl -fsSL https://raw.githubusercontent.com/alpha-unito/streamflow/refs/tags/0.2.0rc1/cwl-conformance-test.sh -o cwl-conformance-test.sh
chmod 755 cwl-conformance-test.sh
- name: "Test CWL v1.3 conformance"
env:
VERSION: "v1.3"
COMMIT: "2063e9095f421f2a2ce12abaf196b2ba06ca5aae"
EXCLUDE: "docker_entrypoint,modify_file_content,iwd-container-entryname1"
EXCLUDE: "docker_entrypoint,modify_file_content"
DOCKER: "docker"
run: ./cwl-conformance-test.sh
- name: "Upload test results"
Expand Down
4 changes: 2 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,11 @@ flake8:

format:
isort streamflow tests
black streamflow tests
black --target-version py310 streamflow tests

format-check:
isort --check-only streamflow tests
black --diff --check streamflow tests
black --target-version py310 --diff --check streamflow tests

pyupgrade:
pyupgrade --py3-only --py310-plus $(shell git ls-files | grep .py)
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ classifiers = [
]
dependencies = [
"asyncpg==0.31.0",
"streamflow==0.2.0.dev14"
"streamflow==0.2.0rc1"
]
version = "0.0.10"

Expand Down
14 changes: 7 additions & 7 deletions streamflow/plugins/unito/postgresql/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@
from typing import Any

import asyncpg
from cachebox import cached
from streamflow.core import utils
from streamflow.core.asyncache import cachedmethod
from streamflow.core.context import StreamFlowContext
from streamflow.core.deployment import Target
from streamflow.core.persistence import DependencyType
Expand Down Expand Up @@ -325,7 +325,7 @@ async def get_dependers(
token_id,
)

@cachedmethod(lambda self: self.deployment_cache)
@cached(cache=lambda self: self.deployment_cache)
async def get_deployment(self, deployment_id: int) -> MutableMapping[str, Any]:
async with self.pool as pool:
async with pool.acquire() as conn:
Expand Down Expand Up @@ -368,7 +368,7 @@ async def get_executions_by_step(
for row in rows
]

@cachedmethod(lambda self: self.filter_cache)
@cached(cache=lambda self: self.filter_cache)
async def get_filter(self, filter_id: int) -> MutableMapping[str, Any]:
async with self.pool as pool:
async with pool.acquire() as conn:
Expand Down Expand Up @@ -426,7 +426,7 @@ async def get_output_steps(
DependencyType.INPUT.value,
)

@cachedmethod(lambda self: self.port_cache)
@cached(cache=lambda self: self.port_cache)
async def get_port(self, port_id: int) -> MutableMapping[str, Any]:
async with self.pool as pool:
async with pool.acquire() as conn:
Expand Down Expand Up @@ -490,7 +490,7 @@ async def get_reports(
)
return list(result.values())

@cachedmethod(lambda self: self.step_cache)
@cached(cache=lambda self: self.step_cache)
async def get_step(self, step_id: int) -> MutableMapping[str, Any]:
async with self.pool as pool:
async with pool.acquire() as conn:
Expand All @@ -500,7 +500,7 @@ async def get_step(self, step_id: int) -> MutableMapping[str, Any]:
)
)

@cachedmethod(lambda self: self.target_cache)
@cached(cache=lambda self: self.target_cache)
async def get_target(self, target_id: int) -> MutableMapping[str, Any]:
async with self.pool as pool:
async with pool.acquire() as conn:
Expand All @@ -512,7 +512,7 @@ async def get_target(self, target_id: int) -> MutableMapping[str, Any]:
)
)

@cachedmethod(lambda self: self.token_cache)
@cached(cache=lambda self: self.token_cache)
async def get_token(self, token_id: int) -> MutableMapping[str, Any]:
async with self.pool as pool:
async with pool.acquire() as conn:
Expand Down
73 changes: 37 additions & 36 deletions tests/test_cwl_persistence.py
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ def _create_cwl_token_processor(name: str, workflow: CWLWorkflow) -> CWLTokenPro


@pytest.mark.asyncio
async def test_clone_transformer(context: StreamFlowContext):
async def test_clone_transformer(context: StreamFlowContext) -> None:
"""Test saving and loading CloneTransformer from database"""
workflow, (in_port, replica_port, out_port) = await create_workflow(
context=context, num_port=3
Expand All @@ -184,7 +184,7 @@ async def test_clone_transformer(context: StreamFlowContext):


@pytest.mark.asyncio
async def test_cwl_file_token(context: StreamFlowContext):
async def test_cwl_file_token(context: StreamFlowContext) -> None:
"""Test saving and loading CWLFileToken from database"""
token = get_full_instantiation(
cls_=CWLFileToken,
Expand All @@ -207,7 +207,7 @@ async def test_cwl_file_token(context: StreamFlowContext):

@pytest.mark.asyncio
@pytest.mark.parametrize("processor_t", ["none", "primitive", "map", "object", "union"])
async def test_cwl_command(context: StreamFlowContext, processor_t: str):
async def test_cwl_command(context: StreamFlowContext, processor_t: str) -> None:
"""Test saving and loading ExecuteStep with CWLCommand and CWLCommandTokenProcessor classes from database"""
workflow = CWLWorkflow(
context=context, name=utils.random_name(), config={}, cwl_version=CWL_VERSION
Expand Down Expand Up @@ -272,7 +272,7 @@ async def test_cwl_command(context: StreamFlowContext, processor_t: str):


@pytest.mark.asyncio
async def test_cwl_expression_command(context: StreamFlowContext):
async def test_cwl_expression_command(context: StreamFlowContext) -> None:
"""Test saving and loading ExecuteStep with CWLExpressionCommand from database"""
workflow = CWLWorkflow(
context=context, name=utils.random_name(), config={}, cwl_version=CWL_VERSION
Expand Down Expand Up @@ -301,7 +301,7 @@ async def test_cwl_expression_command(context: StreamFlowContext):
"output_type",
["no_output", "default", "expression", "primitive", "object", "union"],
)
async def test_cwl_execute_step(context: StreamFlowContext, output_type: str):
async def test_cwl_execute_step(context: StreamFlowContext, output_type: str) -> None:
"""Test saving and loading CWLExecuteStep from database"""
workflow = CWLWorkflow(
context=context, name=utils.random_name(), config={}, cwl_version=CWL_VERSION
Expand Down Expand Up @@ -385,7 +385,7 @@ async def test_cwl_execute_step(context: StreamFlowContext, output_type: str):


@pytest.mark.asyncio
async def test_list_merge_combinator(context: StreamFlowContext):
async def test_list_merge_combinator(context: StreamFlowContext) -> None:
"""Test saving and loading CombinatorStep with ListMergeCombinator from database"""
workflow, (port,) = await create_workflow(context=context, num_port=1)
name = utils.random_name()
Expand Down Expand Up @@ -419,7 +419,7 @@ async def test_list_merge_combinator(context: StreamFlowContext):


@pytest.mark.asyncio
async def test_default_transformer(context: StreamFlowContext):
async def test_default_transformer(context: StreamFlowContext) -> None:
"""Test saving and loading DefaultTransformer from database"""
workflow, (port,) = await create_workflow(context=context, num_port=1)
step = get_full_instantiation(
Expand All @@ -433,7 +433,7 @@ async def test_default_transformer(context: StreamFlowContext):


@pytest.mark.asyncio
async def test_default_retag_transformer(context: StreamFlowContext):
async def test_default_retag_transformer(context: StreamFlowContext) -> None:
"""Test saving and loading DefaultRetagTransformer from database"""
workflow, (port,) = await create_workflow(context=context, num_port=1)
step = get_full_instantiation(
Expand All @@ -451,7 +451,9 @@ async def test_default_retag_transformer(context: StreamFlowContext):
@pytest.mark.parametrize(
"processor_t", ["primitive", "map", "object", "union", "optional"]
)
async def test_cwl_token_transformer(context: StreamFlowContext, processor_t: str):
async def test_cwl_token_transformer(
context: StreamFlowContext, processor_t: str
) -> None:
"""Test saving and loading CWLTokenTransformer with different TokenProcessor classes from database"""
workflow = CWLWorkflow(
context=context, name=utils.random_name(), config={}, cwl_version=CWL_VERSION
Expand Down Expand Up @@ -511,7 +513,7 @@ async def test_cwl_token_transformer(context: StreamFlowContext, processor_t: st


@pytest.mark.asyncio
async def test_value_from_transformer(context: StreamFlowContext):
async def test_value_from_transformer(context: StreamFlowContext) -> None:
"""Test saving and loading ValueFromTransformer with CWLTokenProcessor from database"""
workflow = CWLWorkflow(
context=context, name=utils.random_name(), config={}, cwl_version=CWL_VERSION
Expand All @@ -536,7 +538,7 @@ async def test_value_from_transformer(context: StreamFlowContext):


@pytest.mark.asyncio
async def test_loop_value_from_transformer(context: StreamFlowContext):
async def test_loop_value_from_transformer(context: StreamFlowContext) -> None:
"""Test saving and loading LoopValueFromTransformer with CWLTokenProcessor from database"""
workflow = CWLWorkflow(
context=context, name=utils.random_name(), config={}, cwl_version=CWL_VERSION
Expand All @@ -547,18 +549,15 @@ async def test_loop_value_from_transformer(context: StreamFlowContext):
workflow.format_graph = Graph()
await workflow.save(context)

step = cast(
LoopValueFromTransformer,
get_full_instantiation(
cls_=LoopValueFromTransformer,
name=f"{utils.random_name()}-loop-value-from-transformer",
processor=_create_cwl_token_processor(port_name, workflow),
port_name=port_name,
expression_lib=True,
full_js=True,
value_from="$(1 + 1 == 0)",
workflow=workflow,
),
step = get_full_instantiation(
cls_=LoopValueFromTransformer,
name=f"{utils.random_name()}-loop-value-from-transformer",
processor=_create_cwl_token_processor(port_name, workflow),
port_name=port_name,
expression_lib=True,
full_js=True,
value_from="$(1 + 1 == 0)",
workflow=workflow,
)
step.add_loop_input_port(port_name, ports[0])
step.add_loop_source_port(port_name, ports[1])
Expand All @@ -567,7 +566,7 @@ async def test_loop_value_from_transformer(context: StreamFlowContext):


@pytest.mark.asyncio
async def test_all_non_null_transformer(context: StreamFlowContext):
async def test_all_non_null_transformer(context: StreamFlowContext) -> None:
"""Test saving and loading AllNonNullTransformer from database"""
workflow, (in_port, out_port) = await create_workflow(context=context)
name = utils.random_name()
Expand All @@ -583,7 +582,7 @@ async def test_all_non_null_transformer(context: StreamFlowContext):


@pytest.mark.asyncio
async def test_first_non_null_transformer(context: StreamFlowContext):
async def test_first_non_null_transformer(context: StreamFlowContext) -> None:
"""Test saving and loading FirstNonNullTransformer from database"""
workflow, (in_port, out_port) = await create_workflow(context=context)
name = utils.random_name()
Expand All @@ -599,7 +598,7 @@ async def test_first_non_null_transformer(context: StreamFlowContext):


@pytest.mark.asyncio
async def test_forward_transformer(context: StreamFlowContext):
async def test_forward_transformer(context: StreamFlowContext) -> None:
"""Test saving and loading ForwardTransformer from database"""
workflow, (in_port, out_port) = await create_workflow(context=context)
name = utils.random_name()
Expand All @@ -615,7 +614,7 @@ async def test_forward_transformer(context: StreamFlowContext):


@pytest.mark.asyncio
async def test_list_to_element_transformer(context: StreamFlowContext):
async def test_list_to_element_transformer(context: StreamFlowContext) -> None:
"""Test saving and loading ListToElementTransformer from database"""
workflow, (in_port, out_port) = await create_workflow(context=context)
name = utils.random_name()
Expand All @@ -631,7 +630,7 @@ async def test_list_to_element_transformer(context: StreamFlowContext):


@pytest.mark.asyncio
async def test_only_non_null_transformer(context: StreamFlowContext):
async def test_only_non_null_transformer(context: StreamFlowContext) -> None:
"""Test saving and loading OnlyNonNullTransformer from database"""
workflow, (in_port, out_port) = await create_workflow(context=context)
name = utils.random_name()
Expand All @@ -647,7 +646,7 @@ async def test_only_non_null_transformer(context: StreamFlowContext):


@pytest.mark.asyncio
async def test_cwl_empty_scatter_conditional_step(context: StreamFlowContext):
async def test_cwl_empty_scatter_conditional_step(context: StreamFlowContext) -> None:
"""Test saving and loading CWLEmptyScatterConditionalStep from database"""
workflow, (in_port, out_port) = await create_workflow(context=context)
name = utils.random_name()
Expand All @@ -664,7 +663,7 @@ async def test_cwl_empty_scatter_conditional_step(context: StreamFlowContext):


@pytest.mark.asyncio
async def test_cwl_conditional_step(context: StreamFlowContext):
async def test_cwl_conditional_step(context: StreamFlowContext) -> None:
"""Test saving and loading CWLConditionalStep from database"""
workflow, (skip_port,) = await create_workflow(context=context, num_port=1)
step = get_full_instantiation(
Expand All @@ -681,7 +680,7 @@ async def test_cwl_conditional_step(context: StreamFlowContext):


@pytest.mark.asyncio
async def test_cwl_loop_conditional_step(context: StreamFlowContext):
async def test_cwl_loop_conditional_step(context: StreamFlowContext) -> None:
"""Test saving and loading CWLLoopConditionalStep from database"""
workflow, (skip_port,) = await create_workflow(context=context, num_port=1)
step = get_full_instantiation(
Expand All @@ -698,7 +697,7 @@ async def test_cwl_loop_conditional_step(context: StreamFlowContext):


@pytest.mark.asyncio
async def test_cwl_transfer_step(context: StreamFlowContext):
async def test_cwl_transfer_step(context: StreamFlowContext) -> None:
"""Test saving and loading CWLTransferStep from database"""
workflow, (job_port,) = await create_workflow(context=context, num_port=1)
await workflow.save(context)
Expand All @@ -715,7 +714,7 @@ async def test_cwl_transfer_step(context: StreamFlowContext):


@pytest.mark.asyncio
async def test_cwl_input_injector_step(context: StreamFlowContext):
async def test_cwl_input_injector_step(context: StreamFlowContext) -> None:
"""Test saving and loading CWLInputInjectorStep from database"""
workflow, (job_port,) = await create_workflow(context=context, num_port=1)
await workflow.save(context)
Expand All @@ -731,7 +730,9 @@ async def test_cwl_input_injector_step(context: StreamFlowContext):

@pytest.mark.asyncio
@pytest.mark.parametrize("step_cls", [CWLLoopOutputAllStep, CWLLoopOutputLastStep])
async def test_cwl_loop_output(context: StreamFlowContext, step_cls: type[Step]):
async def test_cwl_loop_output(
context: StreamFlowContext, step_cls: type[Step]
) -> None:
"""Test saving and loading CWLLoopOutput from database"""
workflow, _ = await create_workflow(context=context, num_port=0)
step = get_full_instantiation(
Expand All @@ -744,7 +745,7 @@ async def test_cwl_loop_output(context: StreamFlowContext, step_cls: type[Step])


@pytest.mark.asyncio
async def test_cwl_schedule_step(context: StreamFlowContext):
async def test_cwl_schedule_step(context: StreamFlowContext) -> None:
"""Test saving and loading CWLScheduleStep with a CWLHardwareRequirement from database"""
workflow, (job_port,) = await create_workflow(context, 1)
binding_config = get_full_instantiation(
Expand Down Expand Up @@ -790,7 +791,7 @@ async def test_cwl_schedule_step(context: StreamFlowContext):


@pytest.mark.asyncio
async def test_cwl_workflow(context: StreamFlowContext):
async def test_cwl_workflow(context: StreamFlowContext) -> None:
"""Test saving and loading CWLWorkflow from database"""
g = Graph()
g.add(
Expand Down
2 changes: 1 addition & 1 deletion tests/test_database.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@


@pytest.mark.asyncio
async def test_get_steps_queries(context: StreamFlowContext):
async def test_get_steps_queries(context: StreamFlowContext) -> None:
"""Test get_input_steps and get_output_steps queries"""
workflow, (port_a, job_port, job_port_2, port_b, port_c) = await create_workflow(
context, num_port=5
Expand Down
Loading
Loading