Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 36 additions & 20 deletions torchx/runner/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -426,26 +426,42 @@ def dryrun(

sched._pre_build_validate(app, scheduler, resolved_cfg)

if workspace and isinstance(sched, WorkspaceMixin):
role = app.roles[0]
old_img = role.image

logger.info(f"Checking for changes in workspace `{workspace}`...")
logger.info(
'To disable workspaces pass: --workspace="" from CLI or workspace=None programmatically.'
)
sched.build_workspace_and_update_role2(role, workspace, resolved_cfg)

if old_img != role.image:
logger.info(
f"Built new image `{role.image}` based on original image `{old_img}`"
f" and changes in workspace `{workspace}` for role[0]={role.name}."
)
else:
logger.info(
f"Reusing original image `{old_img}` for role[0]={role.name}."
" Either a patch was built or no changes to workspace was detected."
)
if isinstance(sched, WorkspaceMixin):
for i, role in enumerate(app.roles):
role_workspace = role.workspace

if i == 0 and workspace:
# NOTE: torchx originally took workspace as a runner arg and only applied the workspace to role[0]
# later, torchx added support for the workspace attr in Role
# for BC, give precedence to the workspace argument over the workspace attr for role[0]
if role_workspace:
logger.info(
f"Using workspace={workspace} over role[{i}].workspace={role_workspace} for role[{i}]={role.name}."
" To use the role's workspace attr pass: --workspace='' from CLI or workspace=None programmatically." # noqa: B950
)
role_workspace = workspace

if role_workspace:
old_img = role.image
logger.info(
f"Checking for changes in workspace `{role_workspace}` for role[{i}]={role.name}..."
)
# TODO kiuk@ once we deprecate the `workspace` argument in runner APIs we can simplify the signature of
# build_workspace_and_update_role2() to just taking the role and resolved_cfg
sched.build_workspace_and_update_role2(
role, role_workspace, resolved_cfg
)

if old_img != role.image:
logger.info(
f"Built new image `{role.image}` based on original image `{old_img}`"
f" and changes in workspace `{role_workspace}` for role[{i}]={role.name}."
)
else:
logger.info(
f"Reusing original image `{old_img}` for role[{i}]={role.name}."
" Either a patch was built or no changes to workspace was detected."
)

sched._validate(app, scheduler, resolved_cfg)
dryrun_info = sched.submit_dryrun(app, resolved_cfg)
Expand Down
98 changes: 74 additions & 24 deletions torchx/runner/test/api_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,18 @@
create_scheduler,
LocalDirectoryImageProvider,
)
from torchx.specs import AppDryRunInfo, CfgVal
from torchx.specs.api import (
from torchx.specs import (
AppDef,
AppDryRunInfo,
AppHandle,
AppState,
CfgVal,
parse_app_handle,
Resource,
Role,
runopts,
UnknownAppException,
Workspace,
)
from torchx.specs.finder import ComponentNotFoundException
from torchx.test.fixtures import TestWithTmpDir
Expand Down Expand Up @@ -400,6 +402,16 @@ def build_workspace_and_update_role(
) -> None:
if self.build_new_img:
role.image = f"{role.image}_new"
role.env["SRC_WORKSPACE"] = workspace

def create_role(image: str, workspace: str | None = None) -> Role:
return Role(
name="noop",
image=image,
resource=resource.SMALL,
entrypoint="/bin/true",
workspace=Workspace.from_str(workspace),
)

with Runner(
name=SESSION_NAME,
Expand All @@ -411,33 +423,71 @@ def build_workspace_and_update_role(
"builds-img": lambda name, **kwargs: TestScheduler(build_new_img=True),
},
) as runner:
app = AppDef(
"ignored",
roles=[create_role(image="foo"), create_role(image="bar")],
)
roles = runner.dryrun(
app, "no-build-img", workspace="//workspace"
).request.roles
self.assertEqual("foo", roles[0].image)
self.assertEqual("bar", roles[1].image)

roles = runner.dryrun(
app, "builds-img", workspace="//workspace"
).request.roles

# workspace is attached to role[0] when role[0].workspace is `None`
self.assertEqual("foo_new", roles[0].image)
self.assertEqual("bar", roles[1].image)

# now run with role[0] having workspace attribute defined
app = AppDef(
"ignored",
roles=[
Role(
name="sleep",
image="foo",
resource=resource.SMALL,
entrypoint="sleep",
args=["1"],
),
Role(
name="sleep",
image="bar",
resource=resource.SMALL,
entrypoint="sleep",
args=["1"],
),
create_role(image="foo", workspace="//should_be_overriden"),
create_role(image="bar"),
],
)
roles = runner.dryrun(
app, "builds-img", workspace="//workspace"
).request.roles
# workspace argument should override role[0].workspace attribute
self.assertEqual("foo_new", roles[0].image)
self.assertEqual("//workspace", roles[0].env["SRC_WORKSPACE"])
self.assertEqual("bar", roles[1].image)

# now run with both role[0] and role[1] having workspace attr
app = AppDef(
"ignored",
roles=[
create_role(image="foo", workspace="//foo"),
create_role(image="bar", workspace="//bar"),
],
)
roles = runner.dryrun(
app, "builds-img", workspace="//workspace"
).request.roles

# workspace argument should override role[0].workspace attribute
self.assertEqual("foo_new", roles[0].image)
self.assertEqual("//workspace", roles[0].env["SRC_WORKSPACE"])
self.assertEqual("bar_new", roles[1].image)
self.assertEqual("//bar", roles[1].env["SRC_WORKSPACE"])

# now run with both role[0] and role[1] having workspace attr but no workspace arg
app = AppDef(
"ignored",
roles=[
create_role(image="foo", workspace="//foo"),
create_role(image="bar", workspace="//bar"),
],
)
dryruninfo = runner.dryrun(app, "no-build-img", workspace="//workspace")
self.assertEqual("foo", dryruninfo.request.roles[0].image)
self.assertEqual("bar", dryruninfo.request.roles[1].image)

dryruninfo = runner.dryrun(app, "builds-img", workspace="//workspace")
# workspace is attached to role[0] by default
self.assertEqual("foo_new", dryruninfo.request.roles[0].image)
self.assertEqual("bar", dryruninfo.request.roles[1].image)
roles = runner.dryrun(app, "builds-img", workspace=None).request.roles
self.assertEqual("foo_new", roles[0].image)
self.assertEqual("//foo", roles[0].env["SRC_WORKSPACE"])
self.assertEqual("bar_new", roles[1].image)
self.assertEqual("//bar", roles[1].env["SRC_WORKSPACE"])

def test_describe(self, _) -> None:
with self.get_runner() as runner:
Expand Down
3 changes: 3 additions & 0 deletions torchx/specs/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
UnknownAppException,
UnknownSchedulerException,
VolumeMount,
Workspace,
)
from torchx.specs.builders import make_app_handle, materialize_appdef, parse_mounts

Expand Down Expand Up @@ -236,4 +237,6 @@ def gpu_x_1() -> Dict[str, Resource]:
"torchx_run_args_from_json",
"TorchXRunArgs",
"ALL",
"TORCHX_HOME",
"Workspace",
]
83 changes: 80 additions & 3 deletions torchx/specs/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -350,6 +350,78 @@ class DeviceMount:
permissions: str = "rwm"


@dataclass
class Workspace:
"""
Specifies a local "workspace" (a set of directories). Workspaces are ad-hoc built
into an (usually ephemeral) image. This effectively mirrors the local code changes
at job submission time.

For example:

1. ``projects={"~/github/torch": "torch"}`` copies ``~/github/torch/**`` into ``$REMOTE_WORKSPACE_ROOT/torch/**``
2. ``projects={"~/github/torch": ""}`` copies ``~/github/torch/**`` into ``$REMOTE_WORKSPACE_ROOT/**``

The exact location of ``$REMOTE_WORKSPACE_ROOT`` is implementation dependent and varies between
different implementations of :py:class:`~torchx.workspace.api.WorkspaceMixin`.
Check the scheduler documentation for details on which workspace it supports.

Note: ``projects`` maps the location of the local project to a sub-directory in the remote workspace root directory.
Typically the local project location is a directory path (e.g. ``/home/foo/github/torch``).


Attributes:
projects: mapping of local project to the sub-dir in the remote workspace dir.
"""

projects: dict[str, str]

def __bool__(self) -> bool:
"""False if no projects mapping. Lets us use workspace object in an if-statement"""
return bool(self.projects)

def is_unmapped_single_project(self) -> bool:
"""
Returns ``True`` if this workspace only has 1 project
and its target mapping is an empty string.
"""
return len(self.projects) == 1 and not next(iter(self.projects.values()))

@staticmethod
def from_str(workspace: str | None) -> "Workspace":
import yaml

if not workspace:
return Workspace({})

projects = yaml.safe_load(workspace)
if isinstance(projects, str): # single project workspace
projects = {projects: ""}
else: # multi-project workspace
# Replace None mappings with "" (empty string)
projects = {k: ("" if v is None else v) for k, v in projects.items()}

return Workspace(projects)

def __str__(self) -> str:
"""
Returns a string representation of the Workspace by concatenating
the project mappings using ';' as a delimiter and ':' between key and value.
If the single-project workspace with no target mapping, then simply
returns the src (local project dir)

NOTE: meant to be used for logging purposes not serde.
Therefore not symmetric with :py:func:`Workspace.from_str`.

"""
if self.is_unmapped_single_project():
return next(iter(self.projects))
else:
return ";".join(
k if not v else f"{k}:{v}" for k, v in self.projects.items()
)


@dataclass
class Role:
"""
Expand Down Expand Up @@ -402,6 +474,10 @@ class Role:
metadata: Free form information that is associated with the role, for example
scheduler specific data. The key should follow the pattern: ``$scheduler.$key``
mounts: a list of mounts on the machine
workspace: local project directories to be mirrored on the remote job.
NOTE: The workspace argument provided to the :py:class:`~torchx.runner.api.Runner` APIs
only takes effect on ``appdef.role[0]`` and overrides this attribute.

"""

name: str
Expand All @@ -417,9 +493,10 @@ class Role:
resource: Resource = field(default_factory=_null_resource)
port_map: Dict[str, int] = field(default_factory=dict)
metadata: Dict[str, Any] = field(default_factory=dict)
mounts: List[Union[BindMount, VolumeMount, DeviceMount]] = field(
default_factory=list
)
mounts: List[BindMount | VolumeMount | DeviceMount] = field(default_factory=list)
workspace: Workspace | None = None

# DEPRECATED DO NOT SET, WILL BE REMOVED SOON
overrides: Dict[str, Any] = field(default_factory=dict)

# pyre-ignore
Expand Down
Loading
Loading