From 0a34b3be23173b8f46a5d513e7e2dcd5d26d6e21 Mon Sep 17 00:00:00 2001 From: nicoschmdt Date: Mon, 6 Apr 2026 17:38:00 -0300 Subject: [PATCH 1/3] core: streaming: cancel inner task when client disconnects --- .../src/commonwealth/utils/streaming.py | 28 +++++++++++++++---- 1 file changed, 22 insertions(+), 6 deletions(-) diff --git a/core/libs/commonwealth/src/commonwealth/utils/streaming.py b/core/libs/commonwealth/src/commonwealth/utils/streaming.py index f12d7daa05..3048c6c3ec 100644 --- a/core/libs/commonwealth/src/commonwealth/utils/streaming.py +++ b/core/libs/commonwealth/src/commonwealth/utils/streaming.py @@ -73,15 +73,27 @@ async def generator_wrapper(gen: AsyncGenerator[str | bytes, None], queue: async finally: if heartbeat_task: heartbeat_task.cancel() + try: + await gen.aclose() + except Exception: + pass await queue.put(None) - asyncio.create_task(generator_wrapper(gen, queue)) + wrapper_task = asyncio.create_task(generator_wrapper(gen, queue)) - while True: - item = await queue.get() - if item is None: - break - yield item + try: + while True: + item = await queue.get() + if item is None: + break + yield item + finally: + if not wrapper_task.done(): + wrapper_task.cancel() + try: + await wrapper_task + except asyncio.CancelledError: + pass async def _fetch_stream( @@ -93,6 +105,10 @@ async def _fetch_stream( except Exception as e: await queue.put((None, e)) finally: + try: + await gen.aclose() + except Exception: + pass await queue.put((None, None)) From c3358c661caeea77a99222f2eccddda0bcd0a271 Mon Sep 17 00:00:00 2001 From: Nico Date: Thu, 26 Mar 2026 11:38:08 -0300 Subject: [PATCH 2/3] kraken: defer state mutations until after image pull succeeds --- .../kraken/api/v1/routers/extension.py | 2 +- .../kraken/api/v2/routers/extension.py | 8 +-- core/services/kraken/extension/extension.py | 49 +++++++++---------- 3 files changed, 27 insertions(+), 32 deletions(-) diff --git a/core/services/kraken/api/v1/routers/extension.py b/core/services/kraken/api/v1/routers/extension.py index 12e5a757a4..5f406fd6ba 100644 --- a/core/services/kraken/api/v1/routers/extension.py +++ b/core/services/kraken/api/v1/routers/extension.py @@ -21,7 +21,7 @@ @extension_to_http_exception async def install_extension(body: ExtensionSource) -> StreamingResponse: extension = Extension(body) - return StreamingResponse(streamer(extension.install(atomic=True))) + return StreamingResponse(streamer(extension.install())) @extension_router_v1.post("/uninstall", status_code=status.HTTP_200_OK) diff --git a/core/services/kraken/api/v2/routers/extension.py b/core/services/kraken/api/v2/routers/extension.py index e05952b297..0d3861323a 100644 --- a/core/services/kraken/api/v2/routers/extension.py +++ b/core/services/kraken/api/v2/routers/extension.py @@ -83,7 +83,7 @@ async def install(body: ExtensionSource) -> StreamingResponse: can install incompatible extensions. Make sure to check the extension source before installing it. """ extension = Extension(body) - return StreamingResponse(streamer(extension.install(atomic=True))) + return StreamingResponse(streamer(extension.install())) @extension_router_v2.post("/{identifier}/install", status_code=status.HTTP_201_CREATED) @@ -149,13 +149,13 @@ async def update_to_latest(identifier: str, purge: bool = True, stable: bool = T @extension_router_v2.put("/{identifier}/{tag}", status_code=status.HTTP_200_OK) @extension_to_http_exception -async def update_to_tag(identifier: str, tag: str, purge: bool = True) -> Response: +async def update_to_tag(identifier: str, tag: str, purge: bool = True, should_enable: bool = True) -> Response: """ Update a given extension by its identifier and tag to latest version on the higher priority manifest and by default purge all other tags, if purge is set to false it will keep all other versions disabled only. """ extension = cast(Extension, await Extension.from_manifest(identifier, tag)) - return StreamingResponse(streamer(extension.update(purge))) + return StreamingResponse(streamer(extension.update(purge, should_enable))) @extension_router_v2.delete("/{identifier}", status_code=status.HTTP_202_ACCEPTED) @@ -254,4 +254,4 @@ async def finalize_extension( new_extension = await Extension.finalize_temporary_extension(temp_extension, body.identifier, body) # Install the extension - return StreamingResponse(streamer(new_extension.install(atomic=True))) + return StreamingResponse(streamer(new_extension.install())) diff --git a/core/services/kraken/extension/extension.py b/core/services/kraken/extension/extension.py index 53257a2649..5ba8636e0a 100644 --- a/core/services/kraken/extension/extension.py +++ b/core/services/kraken/extension/extension.py @@ -171,7 +171,7 @@ async def _disable_running_extension(self) -> Optional["Extension"]: except ExtensionNotRunning: return None - def _create_extension_settings(self) -> ExtensionSettings: + def _create_extension_settings(self, should_enable: bool = True) -> ExtensionSettings: """Create and save extension settings.""" new_extension = ExtensionSettings( identifier=self.identifier, @@ -179,10 +179,9 @@ def _create_extension_settings(self) -> ExtensionSettings: docker=self.source.docker, tag=self.tag, permissions=self.source.permissions, - enabled=True, + enabled=should_enable, user_permissions=self.source.user_permissions, ) - # Save in settings first, if the image fails to install it will try to fetch after in main kraken check loop self._save_settings(new_extension) return new_extension @@ -216,6 +215,10 @@ async def _pull_docker_image(self, docker_auth: Optional[str]) -> AsyncGenerator if self.digest: await client.images.tag(tag, f"{self.source.docker}:{self.tag}") + @staticmethod + def _status_message(message: str) -> bytes: + return json.dumps({"status": message}).encode("utf-8") + async def _clear_remaining_tags(self) -> None: """Uninstall all other tags for this extension.""" logger.info(f"Clearing remaining tags for extension {self.identifier}") @@ -223,13 +226,11 @@ async def _clear_remaining_tags(self) -> None: to_clear = [version for version in to_clear if version.source.tag != self.tag] await asyncio.gather(*(version.uninstall() for version in to_clear)) - async def install(self, clear_remaining_tags: bool = True, atomic: bool = False) -> AsyncGenerator[bytes, None]: + async def install( + self, clear_remaining_tags: bool = True, should_enable: bool = True + ) -> AsyncGenerator[bytes, None]: logger.info(f"Installing extension {self.identifier}:{self.tag}") - # First we should make sure no other tag is running - running_ext = await self._disable_running_extension() - - self._create_extension_settings() try: self.lock(self.unique_entry) @@ -237,33 +238,27 @@ async def install(self, clear_remaining_tags: bool = True, atomic: bool = False) async for line in self._pull_docker_image(docker_auth): yield line except Exception as error: - # In case of some external installs kraken shouldn't try to install it again so we remove from settings - if atomic: - should_raise = False - if await self._image_is_available_locally(): - logger.info(f"Pull failed but image {self.identifier}:{self.tag} is already available locally") - else: - if not running_ext or self.unique_entry != running_ext.unique_entry: - should_raise = True - await self.uninstall() - if running_ext: - await running_ext.enable() - - if should_raise: - raise ExtensionPullFailed(f"Failed to pull extension {self.identifier}:{self.tag}") from error - # Reached only if the extensions are the same, the change is in permissions, not installation failure. - return + if await self._image_is_available_locally(): + logger.info(f"Pull failed but image {self.identifier}:{self.tag} is already available locally") + else: + raise ExtensionPullFailed(f"Failed to pull extension {self.identifier}:{self.tag}") from error finally: self.unlock(self.unique_entry) self.reset_start_attempt(self.unique_entry) - logger.info(f"Extension {self.identifier}:{self.tag} installed") + await self._disable_running_extension() + self._create_extension_settings(should_enable) + yield self._status_message(f"Extension {self.identifier}:{self.tag} registered") + # Uninstall all other tags in case user wants to clear them if clear_remaining_tags: await self._clear_remaining_tags() + yield self._status_message(f"Previous versions of {self.identifier} cleared") + + logger.info(f"Extension {self.identifier}:{self.tag} installed") - async def update(self, clear_remaining_tags: bool) -> AsyncGenerator[bytes, None]: - async for data in self.install(clear_remaining_tags): + async def update(self, clear_remaining_tags: bool, should_enable: bool = True) -> AsyncGenerator[bytes, None]: + async for data in self.install(clear_remaining_tags, should_enable=should_enable): yield data async def uninstall(self) -> None: From f525fc74f670b0385a8e441138d62c93498eff1a Mon Sep 17 00:00:00 2001 From: Nico Date: Thu, 26 Mar 2026 11:38:48 -0300 Subject: [PATCH 3/3] frontend: kraken: add cancellation support for extension install and update --- .../src/components/kraken/KrakenManager.ts | 19 ++++ .../src/components/utils/PullProgress.vue | 9 ++ .../src/views/ExtensionManagerView.vue | 98 ++++++++++++++----- 3 files changed, 100 insertions(+), 26 deletions(-) diff --git a/core/frontend/src/components/kraken/KrakenManager.ts b/core/frontend/src/components/kraken/KrakenManager.ts index 451c4b6d54..55f70202ab 100644 --- a/core/frontend/src/components/kraken/KrakenManager.ts +++ b/core/frontend/src/components/kraken/KrakenManager.ts @@ -196,6 +196,7 @@ export async function setManifestSourceOrder(identifier: string, order: number): export async function installExtension( extension: InstalledExtensionData, progressHandler: (event: any) => void, + signal?: AbortSignal, ): Promise { await back_axios({ url: `${KRAKEN_API_V2_URL}/extension/install`, @@ -211,6 +212,7 @@ export async function installExtension( }, timeout: 600000, onDownloadProgress: progressHandler, + signal, }) } @@ -250,6 +252,18 @@ export async function uninstallExtension(identifier: string): Promise { }) } +/** + * Uninstall a specific version of an extension by its identifier and tag, uses API v2 + * @param {string} identifier The identifier of the extension + * @param {string} tag The tag of the version to uninstall + */ +export async function uninstallExtensionVersion(identifier: string, tag: string): Promise { + await back_axios({ + method: 'DELETE', + url: `${KRAKEN_API_V2_URL}/extension/${identifier}/${tag}`, + }) +} + /** * Restart an extension by its identifier, uses API v2 * @param {string} identifier The identifier of the extension @@ -272,12 +286,14 @@ export async function updateExtensionToVersion( identifier: string, version: string, progressHandler: (event: any) => void, + signal?: AbortSignal, ): Promise { await back_axios({ url: `${KRAKEN_API_V2_URL}/extension/${identifier}/${version}`, method: 'PUT', timeout: 120000, onDownloadProgress: progressHandler, + signal, }) } @@ -370,6 +386,7 @@ export async function finalizeExtension( extension: InstalledExtensionData, tempTag: string, progressHandler: (event: any) => void, + signal?: AbortSignal, ): Promise { await back_axios({ method: 'POST', @@ -385,6 +402,7 @@ export async function finalizeExtension( }, timeout: 120000, onDownloadProgress: progressHandler, + signal, }) } @@ -430,6 +448,7 @@ export default { enableExtension, disableExtension, uninstallExtension, + uninstallExtensionVersion, restartExtension, listContainers, getContainersStats, diff --git a/core/frontend/src/components/utils/PullProgress.vue b/core/frontend/src/components/utils/PullProgress.vue index 001a8d4df7..880a8a51ec 100755 --- a/core/frontend/src/components/utils/PullProgress.vue +++ b/core/frontend/src/components/utils/PullProgress.vue @@ -50,6 +50,11 @@ + + + Cancel + + @@ -80,6 +85,10 @@ export default Vue.extend({ type: String, required: true, }, + cancelable: { + type: Boolean, + default: false, + }, }, data() { return { diff --git a/core/frontend/src/views/ExtensionManagerView.vue b/core/frontend/src/views/ExtensionManagerView.vue index ffd9ebfe02..54a2d55747 100644 --- a/core/frontend/src/views/ExtensionManagerView.vue +++ b/core/frontend/src/views/ExtensionManagerView.vue @@ -7,6 +7,8 @@ :download="download_percentage" :extraction="extraction_percentage" :statustext="status_text" + :cancelable="!!active_abort_controller" + @cancel="cancelInstallOperation" /> this.handleDownloadProgress(progressEvent.event, tracker), + controller.signal, ) .then(() => { - this.fetchInstalledExtensions() notifier.pushSuccess('EXTENSION_UPDATE_SUCCESS', `${extension.name} updated successfully.`, true) }) - .catch((error) => { - this.alerter = true - this.alerter_error = String(error) + .catch(async (error) => { + if (axios.isCancel(error)) { + if (controller !== this.active_abort_controller) return + await kraken.uninstallExtensionVersion(extension.identifier, version) + .catch(() => { /* version may not be registered yet */ }) + notifier.pushInfo('EXTENSION_UPDATE_CANCELLED', 'Extension update was cancelled.', true) + return + } + this.showAlertError(error) notifier.pushBackError('EXTENSION_UPDATE_FAIL', error) }) .finally(() => { - this.clearInstallingState() - this.resetPullOutput() + if (controller === this.active_abort_controller) this.finishInstallOperation() }) }, metricsFor(extension: InstalledExtensionData): { cpu: number, memory: number} | Record { @@ -929,24 +959,30 @@ export default Vue.extend({ this.setInstallingState(extension.identifier, 'install') this.show_dialog = false this.show_pull_output = true - const tracker = this.getTracker() + const controller = this.beginInstallOperation() + const tracker = this.getTracker(controller.signal) kraken.installExtension( extension, (progressEvent) => this.handleDownloadProgress(progressEvent.event, tracker), + controller.signal, ) .then(() => { - this.fetchInstalledExtensions() notifier.pushSuccess('EXTENSION_INSTALL_SUCCESS', `${extension.name} installed successfully.`, true) }) - .catch((error) => { - this.alerter = true - this.alerter_error = String(error) - notifier.pushBackError('EXTENSIONS_INSTALL_FAIL', error) + .catch(async (error) => { + if (axios.isCancel(error)) { + if (controller !== this.active_abort_controller) return + await kraken.uninstallExtensionVersion(extension.identifier, extension.tag) + .catch(() => { /* version may not be registered yet */ }) + notifier.pushInfo('EXTENSION_INSTALL_CANCELLED', 'Extension install was cancelled.', true) + return + } + this.showAlertError(error) + notifier.pushBackError('EXTENSION_INSTALL_FAIL', error) }) .finally(() => { - this.clearInstallingState() - this.resetPullOutput() + if (controller === this.active_abort_controller) this.finishInstallOperation() }) }, async performActionFromModal( @@ -1055,7 +1091,7 @@ export default Vue.extend({ temp[extension.identifier].loading = loading this.installed_extensions = temp }, - getTracker(): PullTracker { + getTracker(signal: AbortSignal): PullTracker { return new PullTracker( () => { setTimeout(() => { @@ -1063,9 +1099,9 @@ export default Vue.extend({ }, 1000) }, (error) => { - this.alerter = true - this.alerter_error = String(error) - notifier.pushBackError('EXTENSIONS_INSTALL_FAIL', error) + if (signal.aborted) return + this.showAlertError(error) + notifier.pushBackError('EXTENSION_INSTALL_FAIL', error) this.show_pull_output = false }, ) @@ -1119,7 +1155,8 @@ export default Vue.extend({ } this.show_pull_output = true - const tracker = this.getTracker() + const controller = this.beginInstallOperation() + const tracker = this.getTracker(controller.signal) this.setInstallFromFilePhase('installing') this.install_from_file_install_progress = 0 this.install_from_file_status_text = 'Starting installation...' @@ -1129,20 +1166,29 @@ export default Vue.extend({ extension, this.upload_temp_tag, (progressEvent) => this.handleDownloadProgress(progressEvent.event, tracker), + controller.signal, ) this.setInstallFromFilePhase('success') this.install_from_file_status_text = 'Extension installed successfully' this.stopUploadKeepAlive() this.upload_temp_tag = null this.upload_metadata = null - this.fetchInstalledExtensions() } catch (error) { - this.applyInstallFromFileError(String(error)) - this.alerter = true - this.alerter_error = String(error) - notifier.pushBackError('EXTENSION_FINALIZE_FAIL', error) + if (axios.isCancel(error)) { + if (controller === this.active_abort_controller) { + this.setInstallFromFilePhase('ready') + this.install_from_file_status_text = '' + await kraken.uninstallExtensionVersion(extension.identifier, extension.tag) + .catch(() => { /* version may not be registered yet */ }) + notifier.pushInfo('EXTENSION_INSTALL_CANCELLED', 'Installation from file was cancelled.', true) + } + } else { + this.applyInstallFromFileError(String(error)) + this.showAlertError(error) + notifier.pushBackError('EXTENSION_FINALIZE_FAIL', error) + } } finally { - this.resetPullOutput() + if (controller === this.active_abort_controller) this.finishInstallOperation() } }, setInstallingState(identifier: string, action: 'install' | 'update'): void {