Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions core/frontend/src/components/kraken/KrakenManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,7 @@ export async function setManifestSourceOrder(identifier: string, order: number):
export async function installExtension(
extension: InstalledExtensionData,
progressHandler: (event: any) => void,
signal?: AbortSignal,
): Promise<void> {
await back_axios({
url: `${KRAKEN_API_V2_URL}/extension/install`,
Expand All @@ -211,6 +212,7 @@ export async function installExtension(
},
timeout: 600000,
onDownloadProgress: progressHandler,
signal,
})
}

Expand Down Expand Up @@ -250,6 +252,18 @@ export async function uninstallExtension(identifier: string): Promise<void> {
})
}

/**
* Uninstall a specific version of an extension by its identifier and tag, uses API v2
* @param {string} identifier The identifier of the extension
* @param {string} tag The tag of the version to uninstall
*/
export async function uninstallExtensionVersion(identifier: string, tag: string): Promise<void> {
await back_axios({
method: 'DELETE',
url: `${KRAKEN_API_V2_URL}/extension/${identifier}/${tag}`,
})
}

/**
* Restart an extension by its identifier, uses API v2
* @param {string} identifier The identifier of the extension
Expand All @@ -272,12 +286,14 @@ export async function updateExtensionToVersion(
identifier: string,
version: string,
progressHandler: (event: any) => void,
signal?: AbortSignal,
): Promise<void> {
await back_axios({
url: `${KRAKEN_API_V2_URL}/extension/${identifier}/${version}`,
method: 'PUT',
timeout: 120000,
onDownloadProgress: progressHandler,
signal,
})
}

Expand Down Expand Up @@ -370,6 +386,7 @@ export async function finalizeExtension(
extension: InstalledExtensionData,
tempTag: string,
progressHandler: (event: any) => void,
signal?: AbortSignal,
): Promise<void> {
await back_axios({
method: 'POST',
Expand All @@ -385,6 +402,7 @@ export async function finalizeExtension(
},
timeout: 120000,
onDownloadProgress: progressHandler,
signal,
})
}

Expand Down Expand Up @@ -430,6 +448,7 @@ export default {
enableExtension,
disableExtension,
uninstallExtension,
uninstallExtensionVersion,
restartExtension,
listContainers,
getContainersStats,
Expand Down
9 changes: 9 additions & 0 deletions core/frontend/src/components/utils/PullProgress.vue
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,11 @@
</v-expansion-panel>
</v-expansion-panels>
</v-card-text>
<v-card-actions v-if="cancelable" class="justify-end">
<v-btn color="primary" @click="$emit('cancel')">
Cancel
</v-btn>
</v-card-actions>
</v-card>
</v-dialog>
</template>
Expand Down Expand Up @@ -80,6 +85,10 @@ export default Vue.extend({
type: String,
required: true,
},
cancelable: {
type: Boolean,
default: false,
},
},
data() {
return {
Expand Down
98 changes: 72 additions & 26 deletions core/frontend/src/views/ExtensionManagerView.vue
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
:download="download_percentage"
:extraction="extraction_percentage"
:statustext="status_text"
:cancelable="!!active_abort_controller"
@cancel="cancelInstallOperation"
/>
<v-dialog
v-model="show_dialog"
Expand Down Expand Up @@ -510,6 +512,7 @@ export default Vue.extend({
active_operation_identifier: localStorage.getItem(ACTIVE_OPERATION_KEY) as null | string,
active_operation_type: (localStorage.getItem(ACTIVE_OPERATION_TYPE_KEY) ?? null) as null | 'install' | 'update',
session: null as Session | null,
active_abort_controller: null as null | AbortController,
}
},
computed: {
Expand Down Expand Up @@ -619,6 +622,8 @@ export default Vue.extend({
clearInterval(this.metrics_interval)
this.stopUploadKeepAlive()
this.session = null
this.active_abort_controller?.abort()
this.active_abort_controller = null
},
methods: {
async initializeZenohSession() {
Expand All @@ -635,6 +640,25 @@ export default Vue.extend({
clearEditedExtension() {
this.edited_extension = null
},
beginInstallOperation(): AbortController {
this.active_abort_controller?.abort()
const controller = new AbortController()
this.active_abort_controller = controller
return controller
},
cancelInstallOperation(): void {
this.active_abort_controller?.abort()
},
showAlertError(error: unknown): void {
this.alerter = true
this.alerter_error = String(error)
},
finishInstallOperation(): void {
this.active_abort_controller = null
this.clearInstallingState()
this.resetPullOutput()
this.fetchInstalledExtensions()
},
setInstallFromFilePhase(phase: TarInstallPhase) {
this.install_from_file_phase = phase
if (phase !== 'error') {
Expand Down Expand Up @@ -791,24 +815,30 @@ export default Vue.extend({
async update(extension: InstalledExtensionData, version: string) {
this.setInstallingState(extension.identifier, 'update')
this.show_pull_output = true
const tracker = this.getTracker()
const controller = this.beginInstallOperation()
const tracker = this.getTracker(controller.signal)
kraken.updateExtensionToVersion(
extension.identifier,
version,
(progressEvent) => this.handleDownloadProgress(progressEvent.event, tracker),
controller.signal,
)
.then(() => {
this.fetchInstalledExtensions()
notifier.pushSuccess('EXTENSION_UPDATE_SUCCESS', `${extension.name} updated successfully.`, true)
})
.catch((error) => {
this.alerter = true
this.alerter_error = String(error)
.catch(async (error) => {
if (axios.isCancel(error)) {
if (controller !== this.active_abort_controller) return
await kraken.uninstallExtensionVersion(extension.identifier, version)
.catch(() => { /* version may not be registered yet */ })
notifier.pushInfo('EXTENSION_UPDATE_CANCELLED', 'Extension update was cancelled.', true)
return
}
this.showAlertError(error)
notifier.pushBackError('EXTENSION_UPDATE_FAIL', error)
})
.finally(() => {
this.clearInstallingState()
this.resetPullOutput()
if (controller === this.active_abort_controller) this.finishInstallOperation()
})
},
metricsFor(extension: InstalledExtensionData): { cpu: number, memory: number} | Record<string, never> {
Expand Down Expand Up @@ -929,24 +959,30 @@ export default Vue.extend({
this.setInstallingState(extension.identifier, 'install')
this.show_dialog = false
this.show_pull_output = true
const tracker = this.getTracker()
const controller = this.beginInstallOperation()
const tracker = this.getTracker(controller.signal)

kraken.installExtension(
extension,
(progressEvent) => this.handleDownloadProgress(progressEvent.event, tracker),
controller.signal,
)
.then(() => {
this.fetchInstalledExtensions()
notifier.pushSuccess('EXTENSION_INSTALL_SUCCESS', `${extension.name} installed successfully.`, true)
})
.catch((error) => {
this.alerter = true
this.alerter_error = String(error)
notifier.pushBackError('EXTENSIONS_INSTALL_FAIL', error)
.catch(async (error) => {
if (axios.isCancel(error)) {
if (controller !== this.active_abort_controller) return
await kraken.uninstallExtensionVersion(extension.identifier, extension.tag)
.catch(() => { /* version may not be registered yet */ })
notifier.pushInfo('EXTENSION_INSTALL_CANCELLED', 'Extension install was cancelled.', true)
return
}
this.showAlertError(error)
notifier.pushBackError('EXTENSION_INSTALL_FAIL', error)
})
.finally(() => {
this.clearInstallingState()
this.resetPullOutput()
if (controller === this.active_abort_controller) this.finishInstallOperation()
})
},
async performActionFromModal(
Expand Down Expand Up @@ -1055,17 +1091,17 @@ export default Vue.extend({
temp[extension.identifier].loading = loading
this.installed_extensions = temp
},
getTracker(): PullTracker {
getTracker(signal: AbortSignal): PullTracker {
return new PullTracker(
() => {
setTimeout(() => {
this.show_pull_output = false
}, 1000)
},
(error) => {
this.alerter = true
this.alerter_error = String(error)
notifier.pushBackError('EXTENSIONS_INSTALL_FAIL', error)
if (signal.aborted) return
this.showAlertError(error)
notifier.pushBackError('EXTENSION_INSTALL_FAIL', error)
this.show_pull_output = false
},
)
Expand Down Expand Up @@ -1119,7 +1155,8 @@ export default Vue.extend({
}

this.show_pull_output = true
const tracker = this.getTracker()
const controller = this.beginInstallOperation()
const tracker = this.getTracker(controller.signal)
this.setInstallFromFilePhase('installing')
this.install_from_file_install_progress = 0
this.install_from_file_status_text = 'Starting installation...'
Expand All @@ -1129,20 +1166,29 @@ export default Vue.extend({
extension,
this.upload_temp_tag,
(progressEvent) => this.handleDownloadProgress(progressEvent.event, tracker),
controller.signal,
)
this.setInstallFromFilePhase('success')
this.install_from_file_status_text = 'Extension installed successfully'
this.stopUploadKeepAlive()
this.upload_temp_tag = null
this.upload_metadata = null
this.fetchInstalledExtensions()
} catch (error) {
this.applyInstallFromFileError(String(error))
this.alerter = true
this.alerter_error = String(error)
notifier.pushBackError('EXTENSION_FINALIZE_FAIL', error)
if (axios.isCancel(error)) {
if (controller === this.active_abort_controller) {
this.setInstallFromFilePhase('ready')
this.install_from_file_status_text = ''
await kraken.uninstallExtensionVersion(extension.identifier, extension.tag)
.catch(() => { /* version may not be registered yet */ })
notifier.pushInfo('EXTENSION_INSTALL_CANCELLED', 'Installation from file was cancelled.', true)
}
} else {
this.applyInstallFromFileError(String(error))
this.showAlertError(error)
notifier.pushBackError('EXTENSION_FINALIZE_FAIL', error)
}
} finally {
this.resetPullOutput()
if (controller === this.active_abort_controller) this.finishInstallOperation()
}
},
setInstallingState(identifier: string, action: 'install' | 'update'): void {
Expand Down
28 changes: 22 additions & 6 deletions core/libs/commonwealth/src/commonwealth/utils/streaming.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,15 +73,27 @@ async def generator_wrapper(gen: AsyncGenerator[str | bytes, None], queue: async
finally:
if heartbeat_task:
heartbeat_task.cancel()
try:
await gen.aclose()
except Exception:
pass
await queue.put(None)

asyncio.create_task(generator_wrapper(gen, queue))
wrapper_task = asyncio.create_task(generator_wrapper(gen, queue))

while True:
item = await queue.get()
if item is None:
break
yield item
try:
while True:
item = await queue.get()
if item is None:
break
yield item
finally:
if not wrapper_task.done():
wrapper_task.cancel()
try:
await wrapper_task
except asyncio.CancelledError:
pass


async def _fetch_stream(
Expand All @@ -93,6 +105,10 @@ async def _fetch_stream(
except Exception as e:
await queue.put((None, e))
finally:
try:
await gen.aclose()
except Exception:
pass
await queue.put((None, None))


Expand Down
2 changes: 1 addition & 1 deletion core/services/kraken/api/v1/routers/extension.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
@extension_to_http_exception
async def install_extension(body: ExtensionSource) -> StreamingResponse:
extension = Extension(body)
return StreamingResponse(streamer(extension.install(atomic=True)))
return StreamingResponse(streamer(extension.install()))


@extension_router_v1.post("/uninstall", status_code=status.HTTP_200_OK)
Expand Down
8 changes: 4 additions & 4 deletions core/services/kraken/api/v2/routers/extension.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ async def install(body: ExtensionSource) -> StreamingResponse:
can install incompatible extensions. Make sure to check the extension source before installing it.
"""
extension = Extension(body)
return StreamingResponse(streamer(extension.install(atomic=True)))
return StreamingResponse(streamer(extension.install()))


@extension_router_v2.post("/{identifier}/install", status_code=status.HTTP_201_CREATED)
Expand Down Expand Up @@ -149,13 +149,13 @@ async def update_to_latest(identifier: str, purge: bool = True, stable: bool = T

@extension_router_v2.put("/{identifier}/{tag}", status_code=status.HTTP_200_OK)
@extension_to_http_exception
async def update_to_tag(identifier: str, tag: str, purge: bool = True) -> Response:
async def update_to_tag(identifier: str, tag: str, purge: bool = True, should_enable: bool = True) -> Response:
"""
Update a given extension by its identifier and tag to latest version on the higher priority manifest and by default
purge all other tags, if purge is set to false it will keep all other versions disabled only.
"""
extension = cast(Extension, await Extension.from_manifest(identifier, tag))
return StreamingResponse(streamer(extension.update(purge)))
return StreamingResponse(streamer(extension.update(purge, should_enable)))


@extension_router_v2.delete("/{identifier}", status_code=status.HTTP_202_ACCEPTED)
Expand Down Expand Up @@ -254,4 +254,4 @@ async def finalize_extension(
new_extension = await Extension.finalize_temporary_extension(temp_extension, body.identifier, body)

# Install the extension
return StreamingResponse(streamer(new_extension.install(atomic=True)))
return StreamingResponse(streamer(new_extension.install()))
Loading
Loading