Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions src/dstack/_internal/core/models/instances.py
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,18 @@ def finished_statuses(cls) -> List["InstanceStatus"]:
return [cls.TERMINATING, cls.TERMINATED]


class InstanceTerminationReason(str, Enum):
    """Machine-readable reason for an instance's termination.

    Members are ``str`` subclasses, so each member compares equal to its
    string value (e.g. ``InstanceTerminationReason.ERROR == "error"``).
    """

    IDLE_TIMEOUT = "idle_timeout"
    PROVISIONING_TIMEOUT = "provisioning_timeout"
    # Misspelled name kept as an enum alias (same value, same member object)
    # so existing references keep working. Prefer PROVISIONING_TIMEOUT.
    PROOVISIONING_TIMEOUT = "provisioning_timeout"
    ERROR = "error"
    JOB_FINISHED = "job_finished"
    TERMINATION_TIMEOUT = "termination_timeout"
    STARTING_TIMEOUT = "starting_timeout"
    NO_OFFERS = "no_offers"
    MASTER_FAILED = "master_failed"
    NO_BALANCE = "no_balance"


class Instance(CoreModel):
id: UUID
project_name: str
Expand All @@ -231,6 +243,7 @@ class Instance(CoreModel):
unreachable: bool = False
health_status: HealthStatus = HealthStatus.HEALTHY
termination_reason: Optional[str] = None
termination_reason_message: Optional[str] = None
created: datetime.datetime
region: Optional[str] = None
availability_zone: Optional[str] = None
Expand Down
2 changes: 2 additions & 0 deletions src/dstack/_internal/core/models/runs.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ class JobTerminationReason(str, Enum):
TERMINATED_BY_SERVER = "terminated_by_server"
INACTIVITY_DURATION_EXCEEDED = "inactivity_duration_exceeded"
TERMINATED_DUE_TO_UTILIZATION_POLICY = "terminated_due_to_utilization_policy"
NO_BALANCE = "no_balance"
# Set by the runner
CONTAINER_EXITED_WITH_ERROR = "container_exited_with_error"
PORTS_BINDING_FAILED = "ports_binding_failed"
Expand All @@ -161,6 +162,7 @@ def to_status(self) -> JobStatus:
self.TERMINATED_BY_SERVER: JobStatus.TERMINATED,
self.INACTIVITY_DURATION_EXCEEDED: JobStatus.TERMINATED,
self.TERMINATED_DUE_TO_UTILIZATION_POLICY: JobStatus.TERMINATED,
self.NO_BALANCE: JobStatus.TERMINATED,
self.CONTAINER_EXITED_WITH_ERROR: JobStatus.FAILED,
self.PORTS_BINDING_FAILED: JobStatus.FAILED,
self.CREATING_CONTAINER_ERROR: JobStatus.FAILED,
Expand Down
38 changes: 24 additions & 14 deletions src/dstack/_internal/server/background/tasks/process_instances.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
InstanceOfferWithAvailability,
InstanceRuntime,
InstanceStatus,
InstanceTerminationReason,
RemoteConnectionInfo,
SSHKey,
)
Expand Down Expand Up @@ -240,7 +241,7 @@ def _check_and_mark_terminating_if_idle_duration_expired(instance: InstanceModel
delta = datetime.timedelta(seconds=idle_seconds)
if idle_duration > delta:
instance.status = InstanceStatus.TERMINATING
instance.termination_reason = "Idle timeout"
instance.termination_reason = InstanceTerminationReason.IDLE_TIMEOUT.value
logger.info(
"Instance %s idle duration expired: idle time %ss. Terminating",
instance.name,
Expand All @@ -262,7 +263,7 @@ async def _add_remote(instance: InstanceModel) -> None:
retry_duration_deadline = instance.created_at + timedelta(seconds=PROVISIONING_TIMEOUT_SECONDS)
if retry_duration_deadline < get_current_datetime():
instance.status = InstanceStatus.TERMINATED
instance.termination_reason = "Provisioning timeout expired"
instance.termination_reason = InstanceTerminationReason.PROOVISIONING_TIMEOUT.value
logger.warning(
"Failed to start instance %s in %d seconds. Terminating...",
instance.name,
Expand All @@ -285,7 +286,8 @@ async def _add_remote(instance: InstanceModel) -> None:
ssh_proxy_pkeys = None
except (ValueError, PasswordRequiredException):
instance.status = InstanceStatus.TERMINATED
instance.termination_reason = "Unsupported private SSH key type"
instance.termination_reason = InstanceTerminationReason.ERROR.value
instance.termination_reason_message = "Unsupported private SSH key type"
logger.warning(
"Failed to add instance %s: unsupported private SSH key type",
instance.name,
Expand Down Expand Up @@ -343,7 +345,10 @@ async def _add_remote(instance: InstanceModel) -> None:
)
if instance_network is not None and internal_ip is None:
instance.status = InstanceStatus.TERMINATED
instance.termination_reason = "Failed to locate internal IP address on the given network"
instance.termination_reason = InstanceTerminationReason.ERROR.value
instance.termination_reason_message = (
"Failed to locate internal IP address on the given network"
)
logger.warning(
"Failed to add instance %s: failed to locate internal IP address on the given network",
instance.name,
Expand All @@ -356,7 +361,8 @@ async def _add_remote(instance: InstanceModel) -> None:
if internal_ip is not None:
if not is_ip_among_addresses(ip_address=internal_ip, addresses=host_network_addresses):
instance.status = InstanceStatus.TERMINATED
instance.termination_reason = (
instance.termination_reason = InstanceTerminationReason.ERROR.value
instance.termination_reason_message = (
"Specified internal IP not found among instance interfaces"
)
logger.warning(
Expand All @@ -378,7 +384,8 @@ async def _add_remote(instance: InstanceModel) -> None:
instance.total_blocks = blocks
else:
instance.status = InstanceStatus.TERMINATED
instance.termination_reason = "Cannot split into blocks"
instance.termination_reason = InstanceTerminationReason.ERROR.value
instance.termination_reason_message = "Cannot split into blocks"
logger.warning(
"Failed to add instance %s: cannot split into blocks",
instance.name,
Expand Down Expand Up @@ -497,7 +504,8 @@ async def _create_instance(session: AsyncSession, instance: InstanceModel) -> No
requirements = get_instance_requirements(instance)
except ValidationError as e:
instance.status = InstanceStatus.TERMINATED
instance.termination_reason = (
instance.termination_reason = InstanceTerminationReason.ERROR.value
instance.termination_reason_message = (
f"Error to parse profile, requirements or instance_configuration: {e}"
)
logger.warning(
Expand Down Expand Up @@ -645,7 +653,7 @@ async def _create_instance(session: AsyncSession, instance: InstanceModel) -> No
)
return

_mark_terminated(instance, "All offers failed" if offers else "No offers found")
_mark_terminated(instance, InstanceTerminationReason.NO_OFFERS.value)
if (
instance.fleet
and _is_fleet_master_instance(instance)
Expand All @@ -656,7 +664,7 @@ async def _create_instance(session: AsyncSession, instance: InstanceModel) -> No
for sibling_instance in instance.fleet.instances:
if sibling_instance.id == instance.id:
continue
_mark_terminated(sibling_instance, "Master instance failed to start")
_mark_terminated(sibling_instance, InstanceTerminationReason.MASTER_FAILED.value)


def _mark_terminated(instance: InstanceModel, termination_reason: str) -> None:
Expand All @@ -681,7 +689,7 @@ async def _check_instance(session: AsyncSession, instance: InstanceModel) -> Non
):
# A busy instance could have no active jobs due to this bug: https://github.com/dstackai/dstack/issues/2068
instance.status = InstanceStatus.TERMINATING
instance.termination_reason = "Instance job finished"
instance.termination_reason = InstanceTerminationReason.JOB_FINISHED.value
logger.info(
"Detected busy instance %s with finished job. Marked as TERMINATING",
instance.name,
Expand Down Expand Up @@ -810,7 +818,7 @@ async def _check_instance(session: AsyncSession, instance: InstanceModel) -> Non
deadline = instance.termination_deadline
if get_current_datetime() > deadline:
instance.status = InstanceStatus.TERMINATING
instance.termination_reason = "Termination deadline"
instance.termination_reason = InstanceTerminationReason.TERMINATION_TIMEOUT.value
logger.warning(
"Instance %s shim waiting timeout. Marked as TERMINATING",
instance.name,
Expand Down Expand Up @@ -839,7 +847,7 @@ async def _wait_for_instance_provisioning_data(
"Instance %s failed because instance has not become running in time", instance.name
)
instance.status = InstanceStatus.TERMINATING
instance.termination_reason = "Instance has not become running in time"
instance.termination_reason = InstanceTerminationReason.STARTING_TIMEOUT.value
return

backend = await backends_services.get_project_backend_by_type(
Expand All @@ -852,7 +860,8 @@ async def _wait_for_instance_provisioning_data(
instance.name,
)
instance.status = InstanceStatus.TERMINATING
instance.termination_reason = "Backend not available"
instance.termination_reason = InstanceTerminationReason.ERROR.value
instance.termination_reason_message = "Backend not available"
return
try:
await run_async(
Expand All @@ -869,7 +878,8 @@ async def _wait_for_instance_provisioning_data(
repr(e),
)
instance.status = InstanceStatus.TERMINATING
instance.termination_reason = "Error while waiting for instance to become running"
instance.termination_reason = InstanceTerminationReason.ERROR.value
instance.termination_reason_message = "Error while waiting for instance to become running"
except Exception:
logger.exception(
"Got exception when updating instance %s provisioning data", instance.name
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from dstack._internal.core.models.files import FileArchiveMapping
from dstack._internal.core.models.instances import (
InstanceStatus,
InstanceTerminationReason,
RemoteConnectionInfo,
SSHConnectionParams,
)
Expand Down Expand Up @@ -58,6 +59,7 @@
from dstack._internal.server.services.jobs import (
find_job,
get_job_attached_volumes,
get_job_provisioning_data,
get_job_runtime_data,
job_model_to_job_submission,
)
Expand Down Expand Up @@ -381,9 +383,22 @@ async def _process_running_job(session: AsyncSession, job_model: JobModel):
fmt(job_model),
job_submission.age,
)
# TODO: Replace with JobTerminationReason.INSTANCE_UNREACHABLE in 0.20 or
# when CLI <= 0.19.8 is no longer supported
job_model.termination_reason = JobTerminationReason.INTERRUPTED_BY_NO_CAPACITY
if (
job_model.instance is not None
and job_model.instance.termination_reason
== InstanceTerminationReason.NO_BALANCE.value
):
# If the instance was terminated due to no balance, set the job termination reason accordingly
job_model.termination_reason = JobTerminationReason.NO_BALANCE
else:
job_provisioning_data = get_job_provisioning_data(job_model)
# use JobTerminationReason.INSTANCE_UNREACHABLE for on-demand instances only
job_model.termination_reason = (
JobTerminationReason.INSTANCE_UNREACHABLE
if job_provisioning_data
and not job_provisioning_data.instance_type.resources.spot
else JobTerminationReason.INTERRUPTED_BY_NO_CAPACITY
)
job_model.status = JobStatus.TERMINATING
else:
logger.warning(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
"""instance.termination_reason_message

Revision ID: a16a05249504
Revises: 2498ab323443
Create Date: 2025-10-13 15:29:56.691164

"""

import sqlalchemy as sa
from alembic import op

# revision identifiers, used by Alembic.
# `revision` is this migration's ID and `down_revision` is the migration it
# revises (they match "Revision ID" / "Revises" in the module docstring).
revision = "a16a05249504"
down_revision = "2498ab323443"
branch_labels = None
depends_on = None


def upgrade() -> None:
    """Add the nullable ``termination_reason_message`` column to ``instances``."""
    # ### commands auto generated by Alembic - please adjust! ###
    message_column = sa.Column(
        "termination_reason_message",
        sa.String(length=4000),
        nullable=True,
    )
    with op.batch_alter_table("instances", schema=None) as instances_table:
        instances_table.add_column(message_column)

    # ### end Alembic commands ###


def downgrade() -> None:
    """Drop the ``termination_reason_message`` column from ``instances``."""
    # ### commands auto generated by Alembic - please adjust! ###
    dropped_column = "termination_reason_message"
    with op.batch_alter_table("instances", schema=None) as instances_table:
        instances_table.drop_column(dropped_column)

    # ### end Alembic commands ###
2 changes: 2 additions & 0 deletions src/dstack/_internal/server/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -614,7 +614,9 @@ class InstanceModel(BaseModel):

# instance termination handling
termination_deadline: Mapped[Optional[datetime]] = mapped_column(NaiveDateTime)
# TODO: Migrate to EnumAsString(InstanceTerminationReason, 100) after enough releases to ensure backward compatibility
termination_reason: Mapped[Optional[str]] = mapped_column(String(4000))
Comment on lines +617 to 618
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What happens after enough releases? Old instances continue to have non-enum values forever.

termination_reason_message: Mapped[Optional[str]] = mapped_column(String(4000))
# Deprecated since 0.19.22, not used
health_status: Mapped[Optional[str]] = mapped_column(String(4000))
health: Mapped[HealthStatus] = mapped_column(
Expand Down
1 change: 1 addition & 0 deletions src/dstack/_internal/server/services/instances.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ def instance_model_to_instance(instance_model: InstanceModel) -> Instance:
unreachable=instance_model.unreachable,
health_status=instance_model.health,
termination_reason=instance_model.termination_reason,
termination_reason_message=instance_model.termination_reason_message,
created=instance_model.created_at,
total_blocks=instance_model.total_blocks,
busy_blocks=instance_model.busy_blocks,
Expand Down
2 changes: 2 additions & 0 deletions src/dstack/_internal/server/services/jobs/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -749,6 +749,8 @@ def _get_job_status_message(job_model: JobModel) -> str:
return "stopped"
elif job_model.termination_reason == JobTerminationReason.ABORTED_BY_USER:
return "aborted"
elif job_model.termination_reason == JobTerminationReason.NO_BALANCE:
return "no balance"
return job_model.status.value


Expand Down
1 change: 1 addition & 0 deletions src/tests/_internal/core/models/test_runs.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ def test_get_error_returns_expected_messages():
JobTerminationReason.ABORTED_BY_USER,
JobTerminationReason.TERMINATED_BY_SERVER,
JobTerminationReason.CONTAINER_EXITED_WITH_ERROR,
JobTerminationReason.NO_BALANCE,
]

for reason in JobTerminationReason:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
InstanceOffer,
InstanceOfferWithAvailability,
InstanceStatus,
InstanceTerminationReason,
InstanceType,
Resources,
)
Expand Down Expand Up @@ -251,7 +252,7 @@ async def test_check_shim_terminate_instance_by_deadline(self, test_db, session:
assert instance is not None
assert instance.status == InstanceStatus.TERMINATING
assert instance.termination_deadline == termination_deadline_time
assert instance.termination_reason == "Termination deadline"
assert instance.termination_reason == InstanceTerminationReason.TERMINATION_TIMEOUT.value

@pytest.mark.asyncio
@pytest.mark.parametrize(
Expand Down Expand Up @@ -510,7 +511,7 @@ async def test_terminate_by_idle_timeout(self, test_db, session: AsyncSession):
await session.refresh(instance)
assert instance is not None
assert instance.status == InstanceStatus.TERMINATING
assert instance.termination_reason == "Idle timeout"
assert instance.termination_reason == InstanceTerminationReason.IDLE_TIMEOUT.value


class TestSSHInstanceTerminateProvisionTimeoutExpired:
Expand All @@ -531,7 +532,7 @@ async def test_terminate_by_idle_timeout(self, test_db, session: AsyncSession):

await session.refresh(instance)
assert instance.status == InstanceStatus.TERMINATED
assert instance.termination_reason == "Provisioning timeout expired"
assert instance.termination_reason == InstanceTerminationReason.PROOVISIONING_TIMEOUT.value


class TestTerminate:
Expand Down Expand Up @@ -800,7 +801,7 @@ async def test_fails_if_all_offers_fail(self, session: AsyncSession, err: Except

await session.refresh(instance)
assert instance.status == InstanceStatus.TERMINATED
assert instance.termination_reason == "All offers failed"
assert instance.termination_reason == InstanceTerminationReason.NO_OFFERS.value

async def test_fails_if_no_offers(self, session: AsyncSession):
project = await create_project(session=session)
Expand All @@ -813,19 +814,22 @@ async def test_fails_if_no_offers(self, session: AsyncSession):

await session.refresh(instance)
assert instance.status == InstanceStatus.TERMINATED
assert instance.termination_reason == "No offers found"
assert instance.termination_reason == InstanceTerminationReason.NO_OFFERS.value

@pytest.mark.parametrize(
("placement", "expected_termination_reasons"),
[
pytest.param(
InstanceGroupPlacement.CLUSTER,
{"No offers found": 1, "Master instance failed to start": 3},
{
InstanceTerminationReason.NO_OFFERS.value: 1,
InstanceTerminationReason.MASTER_FAILED.value: 3,
},
id="cluster",
),
pytest.param(
None,
{"No offers found": 4},
{InstanceTerminationReason.NO_OFFERS.value: 4},
id="non-cluster",
),
],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -528,7 +528,7 @@ async def test_pulling_shim_failed(self, test_db, session: AsyncSession):
assert SSHTunnelMock.call_count == 3
await session.refresh(job)
assert job.status == JobStatus.TERMINATING
assert job.termination_reason == JobTerminationReason.INTERRUPTED_BY_NO_CAPACITY
assert job.termination_reason == JobTerminationReason.INSTANCE_UNREACHABLE
assert job.remove_at is None

@pytest.mark.asyncio
Expand Down
Loading