diff --git a/src/dstack/_internal/core/models/instances.py b/src/dstack/_internal/core/models/instances.py
index f1f802d54..ef4fbc17b 100644
--- a/src/dstack/_internal/core/models/instances.py
+++ b/src/dstack/_internal/core/models/instances.py
@@ -216,6 +216,18 @@ def finished_statuses(cls) -> List["InstanceStatus"]:
         return [cls.TERMINATING, cls.TERMINATED]
 
 
+class InstanceTerminationReason(str, Enum):
+    IDLE_TIMEOUT = "idle_timeout"
+    PROVISIONING_TIMEOUT = "provisioning_timeout"
+    ERROR = "error"
+    JOB_FINISHED = "job_finished"
+    TERMINATION_TIMEOUT = "termination_timeout"
+    STARTING_TIMEOUT = "starting_timeout"
+    NO_OFFERS = "no_offers"
+    MASTER_FAILED = "master_failed"
+    NO_BALANCE = "no_balance"
+
+
 class Instance(CoreModel):
     id: UUID
     project_name: str
@@ -231,6 +243,7 @@ class Instance(CoreModel):
     unreachable: bool = False
     health_status: HealthStatus = HealthStatus.HEALTHY
     termination_reason: Optional[str] = None
+    termination_reason_message: Optional[str] = None
     created: datetime.datetime
     region: Optional[str] = None
     availability_zone: Optional[str] = None
diff --git a/src/dstack/_internal/core/models/runs.py b/src/dstack/_internal/core/models/runs.py
index 969b336b9..dce8ef8e0 100644
--- a/src/dstack/_internal/core/models/runs.py
+++ b/src/dstack/_internal/core/models/runs.py
@@ -138,6 +138,7 @@ class JobTerminationReason(str, Enum):
     TERMINATED_BY_SERVER = "terminated_by_server"
     INACTIVITY_DURATION_EXCEEDED = "inactivity_duration_exceeded"
     TERMINATED_DUE_TO_UTILIZATION_POLICY = "terminated_due_to_utilization_policy"
+    NO_BALANCE = "no_balance"
     # Set by the runner
     CONTAINER_EXITED_WITH_ERROR = "container_exited_with_error"
     PORTS_BINDING_FAILED = "ports_binding_failed"
@@ -161,6 +162,7 @@ def to_status(self) -> JobStatus:
             self.TERMINATED_BY_SERVER: JobStatus.TERMINATED,
             self.INACTIVITY_DURATION_EXCEEDED: JobStatus.TERMINATED,
             self.TERMINATED_DUE_TO_UTILIZATION_POLICY: JobStatus.TERMINATED,
+            self.NO_BALANCE: JobStatus.TERMINATED,
             self.CONTAINER_EXITED_WITH_ERROR: JobStatus.FAILED,
             self.PORTS_BINDING_FAILED: JobStatus.FAILED,
             self.CREATING_CONTAINER_ERROR: JobStatus.FAILED,
diff --git a/src/dstack/_internal/server/background/tasks/process_instances.py b/src/dstack/_internal/server/background/tasks/process_instances.py
index ec7ca8f7e..c08b5f0b7 100644
--- a/src/dstack/_internal/server/background/tasks/process_instances.py
+++ b/src/dstack/_internal/server/background/tasks/process_instances.py
@@ -45,6 +45,7 @@
     InstanceOfferWithAvailability,
     InstanceRuntime,
     InstanceStatus,
+    InstanceTerminationReason,
     RemoteConnectionInfo,
     SSHKey,
 )
@@ -240,7 +241,7 @@ def _check_and_mark_terminating_if_idle_duration_expired(instance: InstanceModel
     delta = datetime.timedelta(seconds=idle_seconds)
     if idle_duration > delta:
         instance.status = InstanceStatus.TERMINATING
-        instance.termination_reason = "Idle timeout"
+        instance.termination_reason = InstanceTerminationReason.IDLE_TIMEOUT.value
         logger.info(
             "Instance %s idle duration expired: idle time %ss. Terminating",
             instance.name,
@@ -262,7 +263,7 @@ async def _add_remote(instance: InstanceModel) -> None:
     retry_duration_deadline = instance.created_at + timedelta(seconds=PROVISIONING_TIMEOUT_SECONDS)
     if retry_duration_deadline < get_current_datetime():
         instance.status = InstanceStatus.TERMINATED
-        instance.termination_reason = "Provisioning timeout expired"
+        instance.termination_reason = InstanceTerminationReason.PROVISIONING_TIMEOUT.value
         logger.warning(
             "Failed to start instance %s in %d seconds. Terminating...",
             instance.name,
@@ -285,7 +286,8 @@ async def _add_remote(instance: InstanceModel) -> None:
             ssh_proxy_pkeys = None
     except (ValueError, PasswordRequiredException):
         instance.status = InstanceStatus.TERMINATED
-        instance.termination_reason = "Unsupported private SSH key type"
+        instance.termination_reason = InstanceTerminationReason.ERROR.value
+        instance.termination_reason_message = "Unsupported private SSH key type"
         logger.warning(
             "Failed to add instance %s: unsupported private SSH key type",
             instance.name,
@@ -343,7 +345,10 @@ async def _add_remote(instance: InstanceModel) -> None:
         )
         if instance_network is not None and internal_ip is None:
             instance.status = InstanceStatus.TERMINATED
-            instance.termination_reason = "Failed to locate internal IP address on the given network"
+            instance.termination_reason = InstanceTerminationReason.ERROR.value
+            instance.termination_reason_message = (
+                "Failed to locate internal IP address on the given network"
+            )
             logger.warning(
                 "Failed to add instance %s: failed to locate internal IP address on the given network",
                 instance.name,
@@ -356,7 +361,8 @@ async def _add_remote(instance: InstanceModel) -> None:
     if internal_ip is not None:
         if not is_ip_among_addresses(ip_address=internal_ip, addresses=host_network_addresses):
             instance.status = InstanceStatus.TERMINATED
-            instance.termination_reason = (
+            instance.termination_reason = InstanceTerminationReason.ERROR.value
+            instance.termination_reason_message = (
                 "Specified internal IP not found among instance interfaces"
             )
             logger.warning(
@@ -378,7 +384,8 @@ async def _add_remote(instance: InstanceModel) -> None:
             instance.total_blocks = blocks
     else:
         instance.status = InstanceStatus.TERMINATED
-        instance.termination_reason = "Cannot split into blocks"
+        instance.termination_reason = InstanceTerminationReason.ERROR.value
+        instance.termination_reason_message = "Cannot split into blocks"
         logger.warning(
             "Failed to add instance %s: cannot split into blocks",
             instance.name,
@@ -497,7 +504,8 @@ async def _create_instance(session: AsyncSession, instance: InstanceModel) -> No
         requirements = get_instance_requirements(instance)
     except ValidationError as e:
         instance.status = InstanceStatus.TERMINATED
-        instance.termination_reason = (
+        instance.termination_reason = InstanceTerminationReason.ERROR.value
+        instance.termination_reason_message = (
             f"Error to parse profile, requirements or instance_configuration: {e}"
         )
         logger.warning(
@@ -645,7 +653,7 @@ async def _create_instance(session: AsyncSession, instance: InstanceModel) -> No
         )
         return
 
-    _mark_terminated(instance, "All offers failed" if offers else "No offers found")
+    _mark_terminated(instance, InstanceTerminationReason.NO_OFFERS.value)
     if (
         instance.fleet
         and _is_fleet_master_instance(instance)
@@ -656,7 +664,7 @@ async def _create_instance(session: AsyncSession, instance: InstanceModel) -> No
         for sibling_instance in instance.fleet.instances:
             if sibling_instance.id == instance.id:
                 continue
-            _mark_terminated(sibling_instance, "Master instance failed to start")
+            _mark_terminated(sibling_instance, InstanceTerminationReason.MASTER_FAILED.value)
 
 
 def _mark_terminated(instance: InstanceModel, termination_reason: str) -> None:
@@ -681,7 +689,7 @@ async def _check_instance(session: AsyncSession, instance: InstanceModel) -> Non
     ):
         # A busy instance could have no active jobs due to this bug: https://github.com/dstackai/dstack/issues/2068
         instance.status = InstanceStatus.TERMINATING
-        instance.termination_reason = "Instance job finished"
+        instance.termination_reason = InstanceTerminationReason.JOB_FINISHED.value
         logger.info(
             "Detected busy instance %s with finished job. Marked as TERMINATING",
             instance.name,
@@ -810,7 +818,7 @@ async def _check_instance(session: AsyncSession, instance: InstanceModel) -> Non
         deadline = instance.termination_deadline
         if get_current_datetime() > deadline:
             instance.status = InstanceStatus.TERMINATING
-            instance.termination_reason = "Termination deadline"
+            instance.termination_reason = InstanceTerminationReason.TERMINATION_TIMEOUT.value
             logger.warning(
                 "Instance %s shim waiting timeout. Marked as TERMINATING",
                 instance.name,
@@ -839,7 +847,7 @@ async def _wait_for_instance_provisioning_data(
             "Instance %s failed because instance has not become running in time", instance.name
         )
         instance.status = InstanceStatus.TERMINATING
-        instance.termination_reason = "Instance has not become running in time"
+        instance.termination_reason = InstanceTerminationReason.STARTING_TIMEOUT.value
         return
 
     backend = await backends_services.get_project_backend_by_type(
@@ -852,7 +860,8 @@ async def _wait_for_instance_provisioning_data(
             instance.name,
         )
         instance.status = InstanceStatus.TERMINATING
-        instance.termination_reason = "Backend not available"
+        instance.termination_reason = InstanceTerminationReason.ERROR.value
+        instance.termination_reason_message = "Backend not available"
         return
     try:
         await run_async(
@@ -869,7 +878,8 @@ async def _wait_for_instance_provisioning_data(
             repr(e),
         )
         instance.status = InstanceStatus.TERMINATING
-        instance.termination_reason = "Error while waiting for instance to become running"
+        instance.termination_reason = InstanceTerminationReason.ERROR.value
+        instance.termination_reason_message = "Error while waiting for instance to become running"
     except Exception:
         logger.exception(
             "Got exception when updating instance %s provisioning data", instance.name
diff --git a/src/dstack/_internal/server/background/tasks/process_running_jobs.py b/src/dstack/_internal/server/background/tasks/process_running_jobs.py
index aa69f2f33..080f8ffd2 100644
--- a/src/dstack/_internal/server/background/tasks/process_running_jobs.py
+++ b/src/dstack/_internal/server/background/tasks/process_running_jobs.py
@@ -18,6 +18,7 @@
 from dstack._internal.core.models.files import FileArchiveMapping
 from dstack._internal.core.models.instances import (
     InstanceStatus,
+    InstanceTerminationReason,
     RemoteConnectionInfo,
     SSHConnectionParams,
 )
@@ -58,6 +59,7 @@
 from dstack._internal.server.services.jobs import (
     find_job,
     get_job_attached_volumes,
+    get_job_provisioning_data,
     get_job_runtime_data,
     job_model_to_job_submission,
 )
@@ -381,9 +383,22 @@ async def _process_running_job(session: AsyncSession, job_model: JobModel):
                 fmt(job_model),
                 job_submission.age,
             )
-            # TODO: Replace with JobTerminationReason.INSTANCE_UNREACHABLE in 0.20 or
-            # when CLI <= 0.19.8 is no longer supported
-            job_model.termination_reason = JobTerminationReason.INTERRUPTED_BY_NO_CAPACITY
+            if (
+                job_model.instance is not None
+                and job_model.instance.termination_reason
+                == InstanceTerminationReason.NO_BALANCE.value
+            ):
+                # if instance was terminated due to no balance, set job termination reason accordingly
+                job_model.termination_reason = JobTerminationReason.NO_BALANCE
+            else:
+                job_provisioning_data = get_job_provisioning_data(job_model)
+                # use JobTerminationReason.INSTANCE_UNREACHABLE for on-demand instances only
+                job_model.termination_reason = (
+                    JobTerminationReason.INSTANCE_UNREACHABLE
+                    if job_provisioning_data
+                    and not job_provisioning_data.instance_type.resources.spot
+                    else JobTerminationReason.INTERRUPTED_BY_NO_CAPACITY
+                )
             job_model.status = JobStatus.TERMINATING
         else:
             logger.warning(
diff --git a/src/dstack/_internal/server/migrations/versions/a16a05249504_instance_termination_reason_message.py b/src/dstack/_internal/server/migrations/versions/a16a05249504_instance_termination_reason_message.py
new file mode 100644
index 000000000..7b5cfae47
--- /dev/null
+++ b/src/dstack/_internal/server/migrations/versions/a16a05249504_instance_termination_reason_message.py
@@ -0,0 +1,34 @@
+"""instance.termination_reason_message
+
+Revision ID: a16a05249504
+Revises: 2498ab323443
+Create Date: 2025-10-13 15:29:56.691164
+
+"""
+
+import sqlalchemy as sa
+from alembic import op
+
+# revision identifiers, used by Alembic.
+revision = "a16a05249504"
+down_revision = "2498ab323443"
+branch_labels = None
+depends_on = None
+
+
+def upgrade() -> None:
+    # ### commands auto generated by Alembic - please adjust! ###
+    with op.batch_alter_table("instances", schema=None) as batch_op:
+        batch_op.add_column(
+            sa.Column("termination_reason_message", sa.String(length=4000), nullable=True)
+        )
+
+    # ### end Alembic commands ###
+
+
+def downgrade() -> None:
+    # ### commands auto generated by Alembic - please adjust! ###
+    with op.batch_alter_table("instances", schema=None) as batch_op:
+        batch_op.drop_column("termination_reason_message")
+
+    # ### end Alembic commands ###
diff --git a/src/dstack/_internal/server/models.py b/src/dstack/_internal/server/models.py
index b21ba81a4..b6537b65b 100644
--- a/src/dstack/_internal/server/models.py
+++ b/src/dstack/_internal/server/models.py
@@ -614,7 +614,9 @@ class InstanceModel(BaseModel):
 
     # instance termination handling
     termination_deadline: Mapped[Optional[datetime]] = mapped_column(NaiveDateTime)
+    # TODO: Migrate to EnumAsString(InstanceTerminationReason, 100) after enough releases to ensure backward compatibility
     termination_reason: Mapped[Optional[str]] = mapped_column(String(4000))
+    termination_reason_message: Mapped[Optional[str]] = mapped_column(String(4000))
     # Deprecated since 0.19.22, not used
     health_status: Mapped[Optional[str]] = mapped_column(String(4000))
     health: Mapped[HealthStatus] = mapped_column(
diff --git a/src/dstack/_internal/server/services/instances.py b/src/dstack/_internal/server/services/instances.py
index 7c679b0cc..aba50cfc5 100644
--- a/src/dstack/_internal/server/services/instances.py
+++ b/src/dstack/_internal/server/services/instances.py
@@ -122,6 +122,7 @@ def instance_model_to_instance(instance_model: InstanceModel) -> Instance:
         unreachable=instance_model.unreachable,
         health_status=instance_model.health,
         termination_reason=instance_model.termination_reason,
+        termination_reason_message=instance_model.termination_reason_message,
         created=instance_model.created_at,
         total_blocks=instance_model.total_blocks,
         busy_blocks=instance_model.busy_blocks,
diff --git a/src/dstack/_internal/server/services/jobs/__init__.py b/src/dstack/_internal/server/services/jobs/__init__.py
index ffea0c72e..e82300513 100644
--- a/src/dstack/_internal/server/services/jobs/__init__.py
+++ b/src/dstack/_internal/server/services/jobs/__init__.py
@@ -749,6 +749,8 @@ def _get_job_status_message(job_model: JobModel) -> str:
             return "stopped"
         elif job_model.termination_reason == JobTerminationReason.ABORTED_BY_USER:
             return "aborted"
+        elif job_model.termination_reason == JobTerminationReason.NO_BALANCE:
+            return "no balance"
     return job_model.status.value
 
 
diff --git a/src/tests/_internal/core/models/test_runs.py b/src/tests/_internal/core/models/test_runs.py
index 23b27c018..5e847beb4 100644
--- a/src/tests/_internal/core/models/test_runs.py
+++ b/src/tests/_internal/core/models/test_runs.py
@@ -43,6 +43,7 @@ def test_get_error_returns_expected_messages():
         JobTerminationReason.ABORTED_BY_USER,
         JobTerminationReason.TERMINATED_BY_SERVER,
         JobTerminationReason.CONTAINER_EXITED_WITH_ERROR,
+        JobTerminationReason.NO_BALANCE,
     ]
 
     for reason in JobTerminationReason:
diff --git a/src/tests/_internal/server/background/tasks/test_process_instances.py b/src/tests/_internal/server/background/tasks/test_process_instances.py
index 690cb71d9..68dc5dea7 100644
--- a/src/tests/_internal/server/background/tasks/test_process_instances.py
+++ b/src/tests/_internal/server/background/tasks/test_process_instances.py
@@ -27,6 +27,7 @@
     InstanceOffer,
     InstanceOfferWithAvailability,
     InstanceStatus,
+    InstanceTerminationReason,
     InstanceType,
     Resources,
 )
@@ -251,7 +252,7 @@ async def test_check_shim_terminate_instance_by_deadline(self, test_db, session:
         assert instance is not None
         assert instance.status == InstanceStatus.TERMINATING
         assert instance.termination_deadline == termination_deadline_time
-        assert instance.termination_reason == "Termination deadline"
+        assert instance.termination_reason == InstanceTerminationReason.TERMINATION_TIMEOUT.value
 
     @pytest.mark.asyncio
     @pytest.mark.parametrize(
@@ -510,7 +511,7 @@ async def test_terminate_by_idle_timeout(self, test_db, session: AsyncSession):
         await session.refresh(instance)
         assert instance is not None
         assert instance.status == InstanceStatus.TERMINATING
-        assert instance.termination_reason == "Idle timeout"
+        assert instance.termination_reason == InstanceTerminationReason.IDLE_TIMEOUT.value
 
 
 class TestSSHInstanceTerminateProvisionTimeoutExpired:
@@ -531,7 +532,7 @@ async def test_terminate_by_idle_timeout(self, test_db, session: AsyncSession):
         await session.refresh(instance)
 
         assert instance.status == InstanceStatus.TERMINATED
-        assert instance.termination_reason == "Provisioning timeout expired"
+        assert instance.termination_reason == InstanceTerminationReason.PROVISIONING_TIMEOUT.value
 
 
 class TestTerminate:
@@ -800,7 +801,7 @@ async def test_fails_if_all_offers_fail(self, session: AsyncSession, err: Except
         await session.refresh(instance)
 
         assert instance.status == InstanceStatus.TERMINATED
-        assert instance.termination_reason == "All offers failed"
+        assert instance.termination_reason == InstanceTerminationReason.NO_OFFERS.value
 
     async def test_fails_if_no_offers(self, session: AsyncSession):
         project = await create_project(session=session)
@@ -813,19 +814,22 @@ async def test_fails_if_no_offers(self, session: AsyncSession):
         await session.refresh(instance)
 
         assert instance.status == InstanceStatus.TERMINATED
-        assert instance.termination_reason == "No offers found"
+        assert instance.termination_reason == InstanceTerminationReason.NO_OFFERS.value
 
     @pytest.mark.parametrize(
         ("placement", "expected_termination_reasons"),
         [
             pytest.param(
                 InstanceGroupPlacement.CLUSTER,
-                {"No offers found": 1, "Master instance failed to start": 3},
+                {
+                    InstanceTerminationReason.NO_OFFERS.value: 1,
+                    InstanceTerminationReason.MASTER_FAILED.value: 3,
+                },
                 id="cluster",
             ),
             pytest.param(
                 None,
-                {"No offers found": 4},
+                {InstanceTerminationReason.NO_OFFERS.value: 4},
                 id="non-cluster",
             ),
         ],
diff --git a/src/tests/_internal/server/background/tasks/test_process_running_jobs.py b/src/tests/_internal/server/background/tasks/test_process_running_jobs.py
index e3cc011be..d13959b48 100644
--- a/src/tests/_internal/server/background/tasks/test_process_running_jobs.py
+++ b/src/tests/_internal/server/background/tasks/test_process_running_jobs.py
@@ -528,7 +528,7 @@ async def test_pulling_shim_failed(self, test_db, session: AsyncSession):
         assert SSHTunnelMock.call_count == 3
         await session.refresh(job)
         assert job.status == JobStatus.TERMINATING
-        assert job.termination_reason == JobTerminationReason.INTERRUPTED_BY_NO_CAPACITY
+        assert job.termination_reason == JobTerminationReason.INSTANCE_UNREACHABLE
         assert job.remove_at is None
 
     @pytest.mark.asyncio
diff --git a/src/tests/_internal/server/routers/test_fleets.py b/src/tests/_internal/server/routers/test_fleets.py
index 934f333b6..7b34b20a6 100644
--- a/src/tests/_internal/server/routers/test_fleets.py
+++ b/src/tests/_internal/server/routers/test_fleets.py
@@ -402,6 +402,7 @@ async def test_creates_fleet(self, test_db, session: AsyncSession, client: Async
                     "unreachable": False,
                     "health_status": "healthy",
                     "termination_reason": None,
+                    "termination_reason_message": None,
                     "created": "2023-01-02T03:04:00+00:00",
                     "backend": None,
                     "region": None,
@@ -537,6 +538,7 @@ async def test_creates_ssh_fleet(self, test_db, session: AsyncSession, client: A
                     "unreachable": False,
                     "health_status": "healthy",
                     "termination_reason": None,
+                    "termination_reason_message": None,
                     "created": "2023-01-02T03:04:00+00:00",
                     "region": "remote",
                     "availability_zone": None,
@@ -708,6 +710,7 @@ async def test_updates_ssh_fleet(self, test_db, session: AsyncSession, client: A
                     "unreachable": False,
                     "health_status": "healthy",
                     "termination_reason": None,
+                    "termination_reason_message": None,
                     "created": "2023-01-02T03:04:00+00:00",
                     "region": "remote",
                     "availability_zone": None,
@@ -741,6 +744,7 @@ async def test_updates_ssh_fleet(self, test_db, session: AsyncSession, client: A
                     "unreachable": False,
                     "health_status": "healthy",
                     "termination_reason": None,
+                    "termination_reason_message": None,
                     "created": "2023-01-02T03:04:00+00:00",
                     "region": "remote",
                     "availability_zone": None,
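A minimal usage sketch (illustrative only, not part of the patch): it shows the split the diff introduces between the machine-readable termination_reason, which now stores an InstanceTerminationReason value, and the new free-form termination_reason_message. The helpers mark_instance_error and format_termination are hypothetical names, not dstack APIs.

# Illustrative sketch, assuming the post-patch InstanceTerminationReason enum is importable.
from typing import Optional

from dstack._internal.core.models.instances import InstanceTerminationReason


def mark_instance_error(instance, message: str) -> None:
    # instance is an InstanceModel-like object with the two columns added by the patch.
    # Stable, machine-readable reason goes into termination_reason;
    # the human-readable detail goes into termination_reason_message.
    instance.termination_reason = InstanceTerminationReason.ERROR.value
    instance.termination_reason_message = message


def format_termination(reason: Optional[str], message: Optional[str]) -> str:
    # Prefer the detailed message when present; fall back to the enum value.
    if message:
        return f"{reason}: {message}" if reason else message
    return reason or "unknown"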