From bd7a29673c788d86145564a5732b7164873e965c Mon Sep 17 00:00:00 2001 From: alexcos20 Date: Fri, 6 Feb 2026 12:15:42 +0200 Subject: [PATCH 1/6] use privateRegistry per job --- docs/API.md | 29 +-- docs/env.md | 167 ++++++++++++++++++ src/@types/C2D/C2D.ts | 1 + src/@types/commands.ts | 2 + src/components/c2d/compute_engine_base.ts | 9 +- src/components/c2d/compute_engine_docker.ts | 94 ++++++++-- src/components/c2d/index.ts | 3 +- src/components/core/compute/initialize.ts | 45 ++++- src/components/core/compute/startCompute.ts | 6 +- src/components/httpRoutes/compute.ts | 8 +- .../integration/dockerRegistryAuth.test.ts | 6 +- 11 files changed, 328 insertions(+), 42 deletions(-) diff --git a/docs/API.md b/docs/API.md index f9093b62a..df2d2b17c 100644 --- a/docs/API.md +++ b/docs/API.md @@ -1402,20 +1402,21 @@ starts a free compute job and returns jobId if succesfull #### Parameters -| name | type | required | description | -| ----------------- | ------ | -------- | ----------------------------------------------------------------------------- | -| command | string | v | command name | -| node | string | | if not present it means current node | -| consumerAddress | string | v | consumer address | -| signature | string | v | signature (msg=String(nonce) ) | -| nonce | string | v | nonce for the request | -| datasets | object | | list of ComputeAsset to be used as inputs | -| algorithm | object | | ComputeAlgorithm definition | -| environment | string | v | compute environment to use | -| resources | object | | optional list of required resources | -| metadata | object | | optional metadata for the job, data provided by the user | -| additionalViewers | object | | optional array of addresses that are allowed to fetch the result | -| queueMaxWaitTime | number | | optional max time in seconds a job can wait in the queue before being started | +| name | type | required | description | +| --------------------------- | ------ | -------------------------------------------- | ----------------------------------------------------------------------------- | +| command | string | v | command name | +| node | string | | if not present it means current node | +| consumerAddress | string | v | consumer address | +| signature | string | v | signature (msg=String(nonce) ) | +| nonce | string | v | nonce for the request | +| datasets | object | | list of ComputeAsset to be used as inputs | +| algorithm | object | | ComputeAlgorithm definition | +| environment | string | v | compute environment to use | +| resources | object | | optional list of required resources | +| metadata | object | | optional metadata for the job, data provided by the user | +| additionalViewers | object | | optional array of addresses that are allowed to fetch the result | +| queueMaxWaitTime | number | | optional max time in seconds a job can wait in the queue before being started | +| encryptedDockerRegistryAuth | string | Ecies encrypted docker auth schema for image | #### Request diff --git a/docs/env.md b/docs/env.md index 59e41d03b..36aba3222 100644 --- a/docs/env.md +++ b/docs/env.md @@ -261,3 +261,170 @@ The `DOCKER_COMPUTE_ENVIRONMENTS` environment variable should be a JSON array of - For GitLab Container Registry, use a personal access token (PAT) or deploy token. - The registry URL must match exactly (including protocol) with the registry used in the Docker image reference. - If no credentials are configured for a registry, the node will attempt unauthenticated access (works for public images only). + +--- + +## Private Docker Registries with Per-Job Authentication + +In addition to node-level registry authentication via `DOCKER_REGISTRY_AUTHS`, you can provide encrypted Docker registry authentication credentials on a per-job basis. This allows different users to use different private registries or credentials for their compute jobs. + +### Overview + +The `encryptedDockerRegistryAuth` parameter allows you to securely provide Docker registry credentials that are: + +- Encrypted using ECIES (Elliptic Curve Integrated Encryption Scheme) with the node's public key +- Validated to ensure proper format (either `auth` string OR `username`+`password`) +- Used only for the specific compute job, overriding node-level configuration if provided + +### Encryption Format + +The `encryptedDockerRegistryAuth` must be: + +1. A JSON object matching the Docker registry auth schema (see below) +2. Encrypted using ECIES with the node's public key +3. Hex-encoded as a string + +**Auth Schema Format:** + +The decrypted JSON must follow this structure: + +```json +{ + "username": "myuser", + "password": "mypassword" +} +``` + +OR + +```json +{ + "auth": "base64-encoded-username:password" +} +``` + +OR (all fields present) + +```json +{ + "username": "myuser", + "password": "mypassword", + "auth": "base64-encoded-username:password" +} +``` + +**Validation Rules:** + +- Either `auth` string must be provided (non-empty), OR +- Both `username` AND `password` must be provided (both non-empty) +- Empty strings are not accepted + +### Usage Examples + +#### 1. Paid Compute Start (`POST /api/services/compute`) + +```json +{ + "command": "startCompute", + "consumerAddress": "0x...", + "signature": "...", + "nonce": "123", + "environment": "0x...", + "algorithm": { + "meta": { + "container": { + "image": "registry.example.com/myorg/myimage:latest" + } + } + }, + "datasets": [], + "payment": { ... }, + "encryptedDockerRegistryAuth": "0xdeadbeef..." // ECIES encrypted hex string +} +``` + +#### 2. Free Compute Start (`POST /api/services/freeCompute`) + +```json +{ + "command": "freeStartCompute", + "consumerAddress": "0x...", + "signature": "...", + "nonce": "123", + "environment": "0x...", + "algorithm": { + "meta": { + "container": { + "image": "ghcr.io/myorg/myimage:latest" + } + } + }, + "datasets": [], + "encryptedDockerRegistryAuth": "0xdeadbeef..." // ECIES encrypted hex string +} +``` + +#### 3. Initialize Compute + +The `initialize` command accepts `encryptedDockerRegistryAuth` as part of the command payload, as it validates the image + +```json +{ + "command": "initialize", + "datasets": [...], + "algorithm": { + "meta": { + "container": { + "image": "registry.gitlab.com/myorg/myimage:latest" + } + } + }, + "environment": "0x...", + "payment": { ... }, + "consumerAddress": "0x...", + "maxJobDuration": 3600, + "encryptedDockerRegistryAuth": "0xdeadbeef..." // ECIES encrypted hex string +} +``` + +### Encryption Process + +To create `encryptedDockerRegistryAuth`, you need to: + +1. **Prepare the auth JSON object:** + + ```json + { + "username": "myuser", + "password": "mypassword" + } + ``` + +2. **Get the node's public key** (available via the node's API or P2P interface) + +3. **Encrypt the JSON string** using ECIES with the node's public key + +4. **Hex-encode the encrypted result** + +### Behavior + +- **Priority**: If `encryptedDockerRegistryAuth` is provided, it takes precedence over node-level `DOCKER_REGISTRY_AUTHS` configuration for that specific job +- **Validation**: The encrypted auth is decrypted and validated before the job starts. Invalid formats will result in an error +- **Scope**: The credentials are used for: + - Validating the Docker image exists (during initialize) + - Pulling the Docker image (during job execution) +- **Security**: Credentials are encrypted and only decrypted by the node using its private key + +### Error Handling + +If `encryptedDockerRegistryAuth` is invalid, you'll receive an error: + +- **Decryption failure**: `Invalid encryptedDockerRegistryAuth: failed to parse JSON - [error message]` +- **Schema validation failure**: `Invalid encryptedDockerRegistryAuth: Either 'auth' must be provided, or both 'username' and 'password' must be provided` + +### Notes + +- The `encryptedDockerRegistryAuth` parameter is optional. If not provided, the node will use `DOCKER_REGISTRY_AUTHS` configuration or attempt unauthenticated access +- The registry URL in the Docker image reference must match the registry you're authenticating to +- For Docker Hub, use `registry-1.docker.io` as the registry URL +- Credentials are stored encrypted in the job record and decrypted only when needed for image operations diff --git a/src/@types/C2D/C2D.ts b/src/@types/C2D/C2D.ts index 6a6213a2e..530df3252 100644 --- a/src/@types/C2D/C2D.ts +++ b/src/@types/C2D/C2D.ts @@ -264,6 +264,7 @@ export interface DBComputeJob extends ComputeJob { metadata?: DBComputeJobMetadata additionalViewers?: string[] // addresses of additional addresses that can get results algoDuration: number // duration of the job in seconds + encryptedDockerRegistryAuth?: string } // make sure we keep them both in sync diff --git a/src/@types/commands.ts b/src/@types/commands.ts index 4ebb78db4..5642bb087 100644 --- a/src/@types/commands.ts +++ b/src/@types/commands.ts @@ -200,6 +200,7 @@ export interface ComputeInitializeCommand extends Command { maxJobDuration: number policyServer?: any // object to pass to policy server queueMaxWaitTime?: number // max time in seconds a job can wait in the queue before being started + encryptedDockerRegistryAuth?: string } export interface FreeComputeStartCommand extends Command { @@ -216,6 +217,7 @@ export interface FreeComputeStartCommand extends Command { metadata?: DBComputeJobMetadata additionalViewers?: string[] // addresses of additional addresses that can get results queueMaxWaitTime?: number // max time in seconds a job can wait in the queue before being started + encryptedDockerRegistryAuth?: string } export interface PaidComputeStartCommand extends FreeComputeStartCommand { payment: ComputePayment diff --git a/src/components/c2d/compute_engine_base.ts b/src/components/c2d/compute_engine_base.ts index ec0c21e8e..ee4cfb78e 100644 --- a/src/components/c2d/compute_engine_base.ts +++ b/src/components/c2d/compute_engine_base.ts @@ -72,7 +72,11 @@ export abstract class C2DEngine { } // eslint-disable-next-line require-await - public abstract checkDockerImage(image: string, platform?: any): Promise + public abstract checkDockerImage( + image: string, + encryptedDockerRegistryAuth?: string, + platform?: any + ): Promise public abstract startComputeJob( assets: ComputeAsset[], @@ -86,7 +90,8 @@ export abstract class C2DEngine { jobId: string, metadata?: DBComputeJobMetadata, additionalViewers?: string[], - queueMaxWaitTime?: number + queueMaxWaitTime?: number, + encryptedDockerRegistryAuth?: string ): Promise public abstract stopComputeJob( diff --git a/src/components/c2d/compute_engine_docker.ts b/src/components/c2d/compute_engine_docker.ts index 7f3bd8814..bd04cfe1a 100644 --- a/src/components/c2d/compute_engine_docker.ts +++ b/src/components/c2d/compute_engine_docker.ts @@ -51,7 +51,9 @@ import { decryptFilesObject, omitDBComputeFieldsFromComputeJob } from './index.j import { ValidateParams } from '../httpRoutes/validateCommands.js' import { Service } from '@oceanprotocol/ddo-js' import { getOceanTokenAddressForChain } from '../../utils/address.js' -import { dockerRegistrysAuth } from '../../@types/OceanNode.js' +import { dockerRegistrysAuth, dockerRegistryAuth } from '../../@types/OceanNode.js' +import { EncryptMethod } from '../../@types/fileObject.js' +import { DockerRegistryAuthSchema } from '../../utils/config/schemas.js' export class C2DEngineDocker extends C2DEngine { private envs: ComputeEnvironment[] = [] @@ -474,12 +476,24 @@ export class C2DEngineDocker extends C2DEngine { return { registry, name, ref } } - public async getDockerManifest(image: string): Promise { + public async getDockerManifest( + image: string, + encryptedDockerRegistryAuth?: string + ): Promise { const { registry, name, ref } = this.parseImage(image) const url = `${registry}/v2/${name}/manifests/${ref}` - // Get registry auth from parent class - const dockerRegistryAuth = this.getDockerRegistryAuth(registry) + // Use user provided registry auth or get it from the config + let dockerRegistryAuth: dockerRegistryAuth | null = null + if (encryptedDockerRegistryAuth) { + const decryptedDockerRegistryAuth = await this.keyManager.decrypt( + Uint8Array.from(Buffer.from(encryptedDockerRegistryAuth, 'hex')), + EncryptMethod.ECIES + ) + dockerRegistryAuth = JSON.parse(decryptedDockerRegistryAuth.toString()) + } else { + dockerRegistryAuth = this.getDockerRegistryAuth(registry) + } let headers: Record = { Accept: @@ -549,10 +563,11 @@ export class C2DEngineDocker extends C2DEngine { */ public async checkDockerImage( image: string, + encryptedDockerRegistryAuth?: string, platform?: RunningPlatform ): Promise { try { - const manifest = await this.getDockerManifest(image) + const manifest = await this.getDockerManifest(image, encryptedDockerRegistryAuth) const platforms = Array.isArray(manifest.manifests) ? manifest.manifests.map((entry: any) => entry.platform) @@ -588,7 +603,8 @@ export class C2DEngineDocker extends C2DEngine { jobId: string, metadata?: DBComputeJobMetadata, additionalViewers?: string[], - queueMaxWaitTime?: number + queueMaxWaitTime?: number, + encryptedDockerRegistryAuth?: string ): Promise { if (!this.docker) return [] // TO DO - iterate over resources and get default runtime @@ -636,6 +652,37 @@ export class C2DEngineDocker extends C2DEngine { throw new Error(`additionalDockerFiles cannot be used with queued jobs`) } } + // validate encrypteddockerRegistryAuth + if (encryptedDockerRegistryAuth) { + let decryptedDockerRegistryAuth: dockerRegistryAuth + try { + const decryptedDockerRegistryAuthBuffer = await this.keyManager.decrypt( + Uint8Array.from(Buffer.from(encryptedDockerRegistryAuth, 'hex')), + EncryptMethod.ECIES + ) + + // Convert decrypted buffer to string and parse as JSON + const decryptedDockerRegistryAuthString = + decryptedDockerRegistryAuthBuffer.toString() + + decryptedDockerRegistryAuth = JSON.parse(decryptedDockerRegistryAuthString) + } catch (error: any) { + throw new Error( + `Invalid encryptedDockerRegistryAuth: failed to parse JSON - ${error?.message || String(error)}` + ) + } + + // Validate using schema - ensures either auth or username+password are provided + const validationResult = DockerRegistryAuthSchema.safeParse( + decryptedDockerRegistryAuth + ) + if (!validationResult.success) { + const errorMessage = validationResult.error.errors + .map((err) => err.message) + .join('; ') + throw new Error(`Invalid encryptedDockerRegistryAuth: ${errorMessage}`) + } + } const job: DBComputeJob = { clusterHash: this.getC2DConfig().hash, containerImage: image, @@ -672,7 +719,8 @@ export class C2DEngineDocker extends C2DEngine { additionalViewers, terminationDetails: { exitCode: null, OOMKilled: null }, algoDuration: 0, - queueMaxWaitTime: queueMaxWaitTime || 0 + queueMaxWaitTime: queueMaxWaitTime || 0, + encryptedDockerRegistryAuth // we store the encrypted docker registry auth in the job } if (algorithm.meta.container && algorithm.meta.container.dockerfile) { @@ -683,7 +731,11 @@ export class C2DEngineDocker extends C2DEngine { } } else { // already built, we need to validate it - const validation = await this.checkDockerImage(image, env.platform) + const validation = await this.checkDockerImage( + image, + job.encryptedDockerRegistryAuth, + env.platform + ) if (!validation.valid) throw new Error( `Cannot find image ${image} for ${env.platform.architecture}. Maybe it does not exist or it's build for other arhitectures.` @@ -1682,30 +1734,40 @@ export class C2DEngineDocker extends C2DEngine { try { // Get registry auth for the image const { registry } = this.parseImage(job.containerImage) - const dockerRegistryAuth = this.getDockerRegistryAuth(registry) + // Use user provided registry auth or get it from the config + let dockerRegistryAuthForPull: any + if (originaljob.encryptedDockerRegistryAuth) { + const decryptedDockerRegistryAuth = await this.keyManager.decrypt( + Uint8Array.from(Buffer.from(originaljob.encryptedDockerRegistryAuth, 'hex')), + EncryptMethod.ECIES + ) + dockerRegistryAuthForPull = JSON.parse(decryptedDockerRegistryAuth.toString()) + } else { + dockerRegistryAuthForPull = this.getDockerRegistryAuth(registry) + } // Prepare authconfig for Dockerode if credentials are available const pullOptions: any = {} - if (dockerRegistryAuth) { + if (dockerRegistryAuthForPull) { // Extract hostname from registry URL (remove protocol) const registryUrl = new URL(registry) const serveraddress = registryUrl.hostname + (registryUrl.port ? `:${registryUrl.port}` : '') // Use auth string if available, otherwise encode username:password - const authString = dockerRegistryAuth.auth - ? dockerRegistryAuth.auth + const authString = dockerRegistryAuthForPull.auth + ? dockerRegistryAuthForPull.auth : Buffer.from( - `${dockerRegistryAuth.username}:${dockerRegistryAuth.password}` + `${dockerRegistryAuthForPull.username}:${dockerRegistryAuthForPull.password}` ).toString('base64') pullOptions.authconfig = { serveraddress, - ...(dockerRegistryAuth.auth + ...(dockerRegistryAuthForPull.auth ? { auth: authString } : { - username: dockerRegistryAuth.username, - password: dockerRegistryAuth.password + username: dockerRegistryAuthForPull.username, + password: dockerRegistryAuthForPull.password }) } CORE_LOGGER.debug( diff --git a/src/components/c2d/index.ts b/src/components/c2d/index.ts index 87f4c1300..ccee0616d 100644 --- a/src/components/c2d/index.ts +++ b/src/components/c2d/index.ts @@ -42,7 +42,8 @@ export function omitDBComputeFieldsFromComputeJob(dbCompute: DBComputeJob): Comp 'assets', 'isRunning', 'isStarted', - 'containerImage' + 'containerImage', + 'encryptedDockerRegistryAuth' ]) as ComputeJob return job } diff --git a/src/components/core/compute/initialize.ts b/src/components/core/compute/initialize.ts index 7d4d74a76..9f4b61bb5 100644 --- a/src/components/core/compute/initialize.ts +++ b/src/components/core/compute/initialize.ts @@ -1,5 +1,5 @@ import { Readable } from 'stream' -import { P2PCommandResponse } from '../../../@types/OceanNode.js' +import { P2PCommandResponse, dockerRegistryAuth } from '../../../@types/OceanNode.js' import { C2DClusterType } from '../../../@types/C2D/C2D.js' import { CORE_LOGGER } from '../../../utils/logging/common.js' import { CommandHandler } from '../handler/handler.js' @@ -23,7 +23,11 @@ import { validateCommandParameters } from '../../httpRoutes/validateCommands.js' import { isAddress } from 'ethers' -import { getConfiguration, isPolicyServerConfigured } from '../../../utils/index.js' +import { + DockerRegistryAuthSchema, + getConfiguration, + isPolicyServerConfigured +} from '../../../utils/index.js' import { sanitizeServiceFiles } from '../../../utils/util.js' import { FindDdoHandler } from '../handler/ddoHandler.js' import { isOrderingAllowedForAsset } from '../handler/downloadHandler.js' @@ -388,8 +392,45 @@ export class ComputeInitializeHandler extends CommandHandler { if (hasDockerImages) { const algoImage = getAlgorithmImage(task.algorithm, generateUniqueID(task)) if (algoImage) { + // validate encrypteddockerRegistryAuth + if (task.encryptedDockerRegistryAuth) { + let decryptedDockerRegistryAuth: dockerRegistryAuth + try { + const decryptedDockerRegistryAuthBuffer = + await engine.keyManager.decrypt( + Uint8Array.from( + Buffer.from(task.encryptedDockerRegistryAuth, 'hex') + ), + EncryptMethod.ECIES + ) + + // Convert decrypted buffer to string and parse as JSON + const decryptedDockerRegistryAuthString = + decryptedDockerRegistryAuthBuffer.toString() + + decryptedDockerRegistryAuth = JSON.parse( + decryptedDockerRegistryAuthString + ) + } catch (error: any) { + throw new Error( + `Invalid encryptedDockerRegistryAuth: failed to parse JSON - ${error?.message || String(error)}` + ) + } + + // Validate using schema - ensures either auth or username+password are provided + const validationResult = DockerRegistryAuthSchema.safeParse( + decryptedDockerRegistryAuth + ) + if (!validationResult.success) { + const errorMessage = validationResult.error.errors + .map((err) => err.message) + .join('; ') + throw new Error(`Invalid encryptedDockerRegistryAuth: ${errorMessage}`) + } + } const validation: ValidateParams = await engine.checkDockerImage( algoImage, + task.encryptedDockerRegistryAuth, env.platform ) if (!validation.valid) { diff --git a/src/components/core/compute/startCompute.ts b/src/components/core/compute/startCompute.ts index 91fb2eaad..2024d2c8e 100644 --- a/src/components/core/compute/startCompute.ts +++ b/src/components/core/compute/startCompute.ts @@ -565,7 +565,8 @@ export class PaidComputeStartHandler extends CommandHandler { jobId, task.metadata, task.additionalViewers, - task.queueMaxWaitTime + task.queueMaxWaitTime, + task.encryptedDockerRegistryAuth ) CORE_LOGGER.logMessage( 'ComputeStartCommand Response: ' + JSON.stringify(response, null, 2), @@ -911,7 +912,8 @@ export class FreeComputeStartHandler extends CommandHandler { jobId, task.metadata, task.additionalViewers, - task.queueMaxWaitTime + task.queueMaxWaitTime, + task.encryptedDockerRegistryAuth ) CORE_LOGGER.logMessage( diff --git a/src/components/httpRoutes/compute.ts b/src/components/httpRoutes/compute.ts index c7137b89a..b664001b1 100644 --- a/src/components/httpRoutes/compute.ts +++ b/src/components/httpRoutes/compute.ts @@ -83,7 +83,9 @@ computeRoutes.post(`${SERVICES_API_BASE_PATH}/compute`, async (req, res) => { authorization: req.headers?.authorization, additionalViewers: (req.body.additionalViewers as unknown as string[]) || null, queueMaxWaitTime: req.body.queueMaxWaitTime || 0, - caller: req.caller + caller: req.caller, + encryptedDockerRegistryAuth: + (req.body.encryptedDockerRegistryAuth as string) || null } if (req.body.output) { startComputeTask.output = req.body.output as ComputeOutput @@ -130,7 +132,9 @@ computeRoutes.post(`${SERVICES_API_BASE_PATH}/freeCompute`, async (req, res) => authorization: req.headers?.authorization, additionalViewers: (req.body.additionalViewers as unknown as string[]) || null, queueMaxWaitTime: req.body.queueMaxWaitTime || 0, - caller: req.caller + caller: req.caller, + encryptedDockerRegistryAuth: + (req.body.encryptedDockerRegistryAuth as string) || null } if (req.body.output) { startComputeTask.output = req.body.output as ComputeOutput diff --git a/src/test/integration/dockerRegistryAuth.test.ts b/src/test/integration/dockerRegistryAuth.test.ts index f35660a3e..394bbd1df 100644 --- a/src/test/integration/dockerRegistryAuth.test.ts +++ b/src/test/integration/dockerRegistryAuth.test.ts @@ -37,7 +37,7 @@ describe('Docker Registry Authentication Integration Tests', () => { // Test with a well-known public image const image = 'library/alpine:latest' - const manifest = await dockerEngine.getDockerManifest(image) + const manifest = await dockerEngine.getDockerManifest(image, null) expect(manifest).to.exist expect(manifest).to.have.property('schemaVersion') @@ -64,7 +64,7 @@ describe('Docker Registry Authentication Integration Tests', () => { // Use a simple image reference that will default to Docker Hub const image = 'hello-world:latest' - const manifest = await dockerEngine.getDockerManifest(image) + const manifest = await dockerEngine.getDockerManifest(image, null) expect(manifest).to.exist expect(manifest).to.have.property('schemaVersion') @@ -233,7 +233,7 @@ describe('Docker Registry Authentication Integration Tests', () => { ) try { - await dockerEngine.getDockerManifest('invalid-image-reference') + await dockerEngine.getDockerManifest('invalid-image-reference', null) assert.fail('Should have thrown an error for invalid image') } catch (error: any) { expect(error).to.exist From b6d96e7ae16ba70c8f1db4c432dc7019efdf72fe Mon Sep 17 00:00:00 2001 From: alexcos20 Date: Fri, 6 Feb 2026 13:20:08 +0200 Subject: [PATCH 2/6] add tests --- src/components/c2d/compute_engine_docker.ts | 31 -- src/components/core/compute/initialize.ts | 24 +- src/components/core/compute/startCompute.ts | 97 +++- src/test/integration/compute.test.ts | 462 ++++++++++++++++++++ 4 files changed, 577 insertions(+), 37 deletions(-) diff --git a/src/components/c2d/compute_engine_docker.ts b/src/components/c2d/compute_engine_docker.ts index bd04cfe1a..37c1bd269 100644 --- a/src/components/c2d/compute_engine_docker.ts +++ b/src/components/c2d/compute_engine_docker.ts @@ -53,7 +53,6 @@ import { Service } from '@oceanprotocol/ddo-js' import { getOceanTokenAddressForChain } from '../../utils/address.js' import { dockerRegistrysAuth, dockerRegistryAuth } from '../../@types/OceanNode.js' import { EncryptMethod } from '../../@types/fileObject.js' -import { DockerRegistryAuthSchema } from '../../utils/config/schemas.js' export class C2DEngineDocker extends C2DEngine { private envs: ComputeEnvironment[] = [] @@ -652,37 +651,7 @@ export class C2DEngineDocker extends C2DEngine { throw new Error(`additionalDockerFiles cannot be used with queued jobs`) } } - // validate encrypteddockerRegistryAuth - if (encryptedDockerRegistryAuth) { - let decryptedDockerRegistryAuth: dockerRegistryAuth - try { - const decryptedDockerRegistryAuthBuffer = await this.keyManager.decrypt( - Uint8Array.from(Buffer.from(encryptedDockerRegistryAuth, 'hex')), - EncryptMethod.ECIES - ) - - // Convert decrypted buffer to string and parse as JSON - const decryptedDockerRegistryAuthString = - decryptedDockerRegistryAuthBuffer.toString() - - decryptedDockerRegistryAuth = JSON.parse(decryptedDockerRegistryAuthString) - } catch (error: any) { - throw new Error( - `Invalid encryptedDockerRegistryAuth: failed to parse JSON - ${error?.message || String(error)}` - ) - } - // Validate using schema - ensures either auth or username+password are provided - const validationResult = DockerRegistryAuthSchema.safeParse( - decryptedDockerRegistryAuth - ) - if (!validationResult.success) { - const errorMessage = validationResult.error.errors - .map((err) => err.message) - .join('; ') - throw new Error(`Invalid encryptedDockerRegistryAuth: ${errorMessage}`) - } - } const job: DBComputeJob = { clusterHash: this.getC2DConfig().hash, containerImage: image, diff --git a/src/components/core/compute/initialize.ts b/src/components/core/compute/initialize.ts index 9f4b61bb5..3de386051 100644 --- a/src/components/core/compute/initialize.ts +++ b/src/components/core/compute/initialize.ts @@ -412,9 +412,15 @@ export class ComputeInitializeHandler extends CommandHandler { decryptedDockerRegistryAuthString ) } catch (error: any) { - throw new Error( - `Invalid encryptedDockerRegistryAuth: failed to parse JSON - ${error?.message || String(error)}` - ) + const errorMessage = `Invalid encryptedDockerRegistryAuth: failed to parse JSON - ${error?.message || String(error)}` + CORE_LOGGER.error(errorMessage) + return { + stream: null, + status: { + httpStatus: 400, + error: errorMessage + } + } } // Validate using schema - ensures either auth or username+password are provided @@ -422,10 +428,18 @@ export class ComputeInitializeHandler extends CommandHandler { decryptedDockerRegistryAuth ) if (!validationResult.success) { - const errorMessage = validationResult.error.errors + const errorMessageValidation = validationResult.error.errors .map((err) => err.message) .join('; ') - throw new Error(`Invalid encryptedDockerRegistryAuth: ${errorMessage}`) + const errorMessage = `Invalid encryptedDockerRegistryAuth: ${errorMessageValidation}` + CORE_LOGGER.error(errorMessage) + return { + stream: null, + status: { + httpStatus: 400, + error: errorMessage + } + } } } const validation: ValidateParams = await engine.checkDockerImage( diff --git a/src/components/core/compute/startCompute.ts b/src/components/core/compute/startCompute.ts index 2024d2c8e..251ca1652 100644 --- a/src/components/core/compute/startCompute.ts +++ b/src/components/core/compute/startCompute.ts @@ -28,7 +28,11 @@ import { } from '../../../@types/C2D/C2D.js' // import { verifyProviderFees } from '../utils/feesHandler.js' import { validateOrderTransaction } from '../utils/validateOrders.js' -import { getConfiguration, isPolicyServerConfigured } from '../../../utils/index.js' +import { + getConfiguration, + isPolicyServerConfigured, + DockerRegistryAuthSchema +} from '../../../utils/index.js' import { sanitizeServiceFiles } from '../../../utils/util.js' import { FindDdoHandler } from '../handler/ddoHandler.js' // import { ProviderFeeValidation } from '../../../@types/Fees.js' @@ -38,6 +42,7 @@ import { getNonceAsNumber } from '../utils/nonceHandler.js' import { PolicyServer } from '../../policyServer/index.js' import { checkCredentials } from '../../../utils/credentials.js' import { checkAddressOnAccessList } from '../../../utils/accessList.js' +import { dockerRegistryAuth } from '../../../@types/OceanNode.js' export class PaidComputeStartHandler extends CommandHandler { validate(command: PaidComputeStartCommand): ValidateParams { @@ -151,6 +156,51 @@ export class PaidComputeStartHandler extends CommandHandler { } } } + // validate encrypteddockerRegistryAuth + if (task.encryptedDockerRegistryAuth) { + let decryptedDockerRegistryAuth: dockerRegistryAuth + try { + const decryptedDockerRegistryAuthBuffer = await engine.keyManager.decrypt( + Uint8Array.from(Buffer.from(task.encryptedDockerRegistryAuth, 'hex')), + EncryptMethod.ECIES + ) + + // Convert decrypted buffer to string and parse as JSON + const decryptedDockerRegistryAuthString = + decryptedDockerRegistryAuthBuffer.toString() + + decryptedDockerRegistryAuth = JSON.parse(decryptedDockerRegistryAuthString) + } catch (error: any) { + const errorMessage = `Invalid encryptedDockerRegistryAuth: failed to parse JSON - ${error?.message || String(error)}` + CORE_LOGGER.error(errorMessage) + return { + stream: null, + status: { + httpStatus: 400, + error: errorMessage + } + } + } + + // Validate using schema - ensures either auth or username+password are provided + const validationResult = DockerRegistryAuthSchema.safeParse( + decryptedDockerRegistryAuth + ) + if (!validationResult.success) { + const errorMessageValidation = validationResult.error.errors + .map((err) => err.message) + .join('; ') + const errorMessage = `Invalid encryptedDockerRegistryAuth: ${errorMessageValidation}` + CORE_LOGGER.error(errorMessage) + return { + stream: null, + status: { + httpStatus: 400, + error: errorMessage + } + } + } + } const { algorithm } = task const config = await getConfiguration() @@ -680,6 +730,51 @@ export class FreeComputeStartHandler extends CommandHandler { } } } + // validate encrypteddockerRegistryAuth + if (task.encryptedDockerRegistryAuth) { + let decryptedDockerRegistryAuth: dockerRegistryAuth + try { + const decryptedDockerRegistryAuthBuffer = await engine.keyManager.decrypt( + Uint8Array.from(Buffer.from(task.encryptedDockerRegistryAuth, 'hex')), + EncryptMethod.ECIES + ) + + // Convert decrypted buffer to string and parse as JSON + const decryptedDockerRegistryAuthString = + decryptedDockerRegistryAuthBuffer.toString() + + decryptedDockerRegistryAuth = JSON.parse(decryptedDockerRegistryAuthString) + } catch (error: any) { + const errorMessage = `Invalid encryptedDockerRegistryAuth: failed to parse JSON - ${error?.message || String(error)}` + CORE_LOGGER.error(errorMessage) + return { + stream: null, + status: { + httpStatus: 400, + error: errorMessage + } + } + } + + // Validate using schema - ensures either auth or username+password are provided + const validationResult = DockerRegistryAuthSchema.safeParse( + decryptedDockerRegistryAuth + ) + if (!validationResult.success) { + const errorMessageValidation = validationResult.error.errors + .map((err) => err.message) + .join('; ') + const errorMessage = `Invalid encryptedDockerRegistryAuth: ${errorMessageValidation}` + CORE_LOGGER.error(errorMessage) + return { + stream: null, + status: { + httpStatus: 400, + error: errorMessage + } + } + } + } const policyServer = new PolicyServer() for (const elem of [...[task.algorithm], ...task.datasets]) { if (!('documentId' in elem)) { diff --git a/src/test/integration/compute.test.ts b/src/test/integration/compute.test.ts index ed5c8c0eb..765e130c6 100644 --- a/src/test/integration/compute.test.ts +++ b/src/test/integration/compute.test.ts @@ -1393,6 +1393,468 @@ describe('Compute', () => { }) }) + describe('encryptedDockerRegistryAuth integration tests', () => { + /** + * Helper function to encrypt docker registry auth using ECIES + */ + async function encryptDockerRegistryAuth(auth: { + username?: string + password?: string + auth?: string + }): Promise { + const authJson = JSON.stringify(auth) + const authData = Uint8Array.from(Buffer.from(authJson)) + const encrypted = await oceanNode + .getKeyManager() + .encrypt(authData, EncryptMethod.ECIES) + return Buffer.from(encrypted).toString('hex') + } + + it('should initialize compute with valid encryptedDockerRegistryAuth (username/password)', async () => { + const validAuth = { + username: 'testuser', + password: 'testpass' + } + const encryptedAuth = await encryptDockerRegistryAuth(validAuth) + + const dataset: ComputeAsset = { + documentId: publishedComputeDataset.ddo.id, + serviceId: publishedComputeDataset.ddo.services[0].id, + transferTxId: String(datasetOrderTxId) + } + const algorithm: ComputeAlgorithm = { + documentId: publishedAlgoDataset.ddo.id, + serviceId: publishedAlgoDataset.ddo.services[0].id, + transferTxId: String(algoOrderTxId), + meta: publishedAlgoDataset.ddo.metadata.algorithm + } + const initializeComputeTask: ComputeInitializeCommand = { + datasets: [dataset], + algorithm, + environment: firstEnv.id, + payment: { + chainId: DEVELOPMENT_CHAIN_ID, + token: paymentToken + }, + maxJobDuration: computeJobDuration, + consumerAddress: firstEnv.consumerAddress, + command: PROTOCOL_COMMANDS.COMPUTE_INITIALIZE, + encryptedDockerRegistryAuth: encryptedAuth + } + + const resp = await new ComputeInitializeHandler(oceanNode).handle( + initializeComputeTask + ) + assert(resp, 'Failed to get response') + // Should succeed (200) or fail for other reasons, but not due to auth validation + // Check that error is not a validation error (format validation), even if Docker auth fails + if (resp.status.httpStatus !== 200) { + expect(resp.status.error).to.not.include('Invalid encryptedDockerRegistryAuth') + } + if (resp.status.httpStatus === 200) { + assert(resp.stream, 'Failed to get stream') + expect(resp.stream).to.be.instanceOf(Readable) + } + }) + + it('should initialize compute with valid encryptedDockerRegistryAuth (auth string)', async () => { + const validAuth = { + auth: Buffer.from('testuser:testpass').toString('base64') + } + const encryptedAuth = await encryptDockerRegistryAuth(validAuth) + + const dataset: ComputeAsset = { + documentId: publishedComputeDataset.ddo.id, + serviceId: publishedComputeDataset.ddo.services[0].id, + transferTxId: String(datasetOrderTxId) + } + const algorithm: ComputeAlgorithm = { + documentId: publishedAlgoDataset.ddo.id, + serviceId: publishedAlgoDataset.ddo.services[0].id, + transferTxId: String(algoOrderTxId), + meta: publishedAlgoDataset.ddo.metadata.algorithm + } + const initializeComputeTask: ComputeInitializeCommand = { + datasets: [dataset], + algorithm, + environment: firstEnv.id, + payment: { + chainId: DEVELOPMENT_CHAIN_ID, + token: paymentToken + }, + maxJobDuration: computeJobDuration, + consumerAddress: firstEnv.consumerAddress, + command: PROTOCOL_COMMANDS.COMPUTE_INITIALIZE, + encryptedDockerRegistryAuth: encryptedAuth + } + + const resp = await new ComputeInitializeHandler(oceanNode).handle( + initializeComputeTask + ) + assert(resp, 'Failed to get response') + // Should succeed (200) or fail for other reasons, but not due to auth validation + // Check that error is not a validation error (format validation), even if Docker auth fails + if (resp.status.httpStatus !== 200) { + expect(resp.status.error).to.not.include('Invalid encryptedDockerRegistryAuth') + } + if (resp.status.httpStatus === 200) { + assert(resp.stream, 'Failed to get stream') + expect(resp.stream).to.be.instanceOf(Readable) + } + }) + + it('should fail initialize compute with invalid encryptedDockerRegistryAuth (missing password)', async () => { + const invalidAuth = { + username: 'testuser' + // missing password + } + const encryptedAuth = await encryptDockerRegistryAuth(invalidAuth) + + const dataset: ComputeAsset = { + documentId: publishedComputeDataset.ddo.id, + serviceId: publishedComputeDataset.ddo.services[0].id, + transferTxId: String(datasetOrderTxId) + } + const algorithm: ComputeAlgorithm = { + documentId: publishedAlgoDataset.ddo.id, + serviceId: publishedAlgoDataset.ddo.services[0].id, + transferTxId: String(algoOrderTxId), + meta: publishedAlgoDataset.ddo.metadata.algorithm + } + const initializeComputeTask: ComputeInitializeCommand = { + datasets: [dataset], + algorithm, + environment: firstEnv.id, + payment: { + chainId: DEVELOPMENT_CHAIN_ID, + token: paymentToken + }, + maxJobDuration: computeJobDuration, + consumerAddress: firstEnv.consumerAddress, + command: PROTOCOL_COMMANDS.COMPUTE_INITIALIZE, + encryptedDockerRegistryAuth: encryptedAuth + } + + const resp = await new ComputeInitializeHandler(oceanNode).handle( + initializeComputeTask + ) + assert(resp, 'Failed to get response') + // Should fail with 400 due to validation error + assert( + resp.status.httpStatus === 400, + `Expected 400 but got ${resp.status.httpStatus}: ${resp.status.error}` + ) + expect(resp.status.error).to.include('Invalid encryptedDockerRegistryAuth') + expect(resp.status.error).to.include( + "Either 'auth' must be provided, or both 'username' and 'password' must be provided" + ) + }) + + it('should fail initialize compute with invalid encryptedDockerRegistryAuth (empty object)', async () => { + const invalidAuth = {} + const encryptedAuth = await encryptDockerRegistryAuth(invalidAuth) + + const dataset: ComputeAsset = { + documentId: publishedComputeDataset.ddo.id, + serviceId: publishedComputeDataset.ddo.services[0].id, + transferTxId: String(datasetOrderTxId) + } + const algorithm: ComputeAlgorithm = { + documentId: publishedAlgoDataset.ddo.id, + serviceId: publishedAlgoDataset.ddo.services[0].id, + transferTxId: String(algoOrderTxId), + meta: publishedAlgoDataset.ddo.metadata.algorithm + } + const initializeComputeTask: ComputeInitializeCommand = { + datasets: [dataset], + algorithm, + environment: firstEnv.id, + payment: { + chainId: DEVELOPMENT_CHAIN_ID, + token: paymentToken + }, + maxJobDuration: computeJobDuration, + consumerAddress: firstEnv.consumerAddress, + command: PROTOCOL_COMMANDS.COMPUTE_INITIALIZE, + encryptedDockerRegistryAuth: encryptedAuth + } + + const resp = await new ComputeInitializeHandler(oceanNode).handle( + initializeComputeTask + ) + assert(resp, 'Failed to get response') + assert( + resp.status.httpStatus === 400, + `Expected 400 but got ${resp.status.httpStatus}: ${resp.status.error}` + ) + expect(resp.status.error).to.include('Invalid encryptedDockerRegistryAuth') + }) + + it('should start paid compute job with valid encryptedDockerRegistryAuth', async () => { + const validAuth = { + username: 'testuser', + password: 'testpass' + } + const encryptedAuth = await encryptDockerRegistryAuth(validAuth) + + const nonce = Date.now().toString() + const message = String( + (await consumerAccount.getAddress()) + publishedComputeDataset.ddo.id + nonce + ) + const consumerMessage = ethers.solidityPackedKeccak256( + ['bytes'], + [ethers.hexlify(ethers.toUtf8Bytes(message))] + ) + const messageHashBytes = ethers.toBeArray(consumerMessage) + const signature = await wallet.signMessage(messageHashBytes) + + const startComputeTask: PaidComputeStartCommand = { + command: PROTOCOL_COMMANDS.COMPUTE_START, + consumerAddress: await consumerAccount.getAddress(), + environment: firstEnv.id, + signature, + nonce, + datasets: [ + { + documentId: publishedComputeDataset.ddo.id, + serviceId: publishedComputeDataset.ddo.services[0].id, + transferTxId: String(datasetOrderTxId) + } + ], + algorithm: { + documentId: publishedAlgoDataset.ddo.id, + serviceId: publishedAlgoDataset.ddo.services[0].id, + transferTxId: String(algoOrderTxId), + meta: publishedAlgoDataset.ddo.metadata.algorithm + }, + payment: { + chainId: DEVELOPMENT_CHAIN_ID, + token: paymentToken + }, + maxJobDuration: computeJobDuration, + encryptedDockerRegistryAuth: encryptedAuth + } + + const response = await new PaidComputeStartHandler(oceanNode).handle( + startComputeTask + ) + assert(response, 'Failed to get response') + // Should succeed (200) or fail for other reasons, but not due to auth validation + // Check that error is not a validation error (format validation), even if Docker auth fails + if (response.status.httpStatus !== 200) { + expect(response.status.error).to.not.include( + 'Invalid encryptedDockerRegistryAuth' + ) + } + if (response.status.httpStatus === 200) { + assert(response.stream, 'Failed to get stream') + expect(response.stream).to.be.instanceOf(Readable) + } + }) + + it('should fail paid compute start with invalid encryptedDockerRegistryAuth', async () => { + const invalidAuth = { + username: 'testuser' + // missing password + } + const encryptedAuth = await encryptDockerRegistryAuth(invalidAuth) + + const nonce = Date.now().toString() + const message = String( + (await consumerAccount.getAddress()) + publishedComputeDataset.ddo.id + nonce + ) + const consumerMessage = ethers.solidityPackedKeccak256( + ['bytes'], + [ethers.hexlify(ethers.toUtf8Bytes(message))] + ) + const messageHashBytes = ethers.toBeArray(consumerMessage) + const signature = await wallet.signMessage(messageHashBytes) + + const startComputeTask: PaidComputeStartCommand = { + command: PROTOCOL_COMMANDS.COMPUTE_START, + consumerAddress: await consumerAccount.getAddress(), + environment: firstEnv.id, + signature, + nonce, + datasets: [ + { + documentId: publishedComputeDataset.ddo.id, + serviceId: publishedComputeDataset.ddo.services[0].id, + transferTxId: String(datasetOrderTxId) + } + ], + algorithm: { + documentId: publishedAlgoDataset.ddo.id, + serviceId: publishedAlgoDataset.ddo.services[0].id, + transferTxId: String(algoOrderTxId), + meta: publishedAlgoDataset.ddo.metadata.algorithm + }, + payment: { + chainId: DEVELOPMENT_CHAIN_ID, + token: paymentToken + }, + maxJobDuration: computeJobDuration, + encryptedDockerRegistryAuth: encryptedAuth + } + + const response = await new PaidComputeStartHandler(oceanNode).handle( + startComputeTask + ) + assert(response, 'Failed to get response') + assert( + response.status.httpStatus === 400, + `Expected 400 but got ${response.status.httpStatus}: ${response.status.error}` + ) + expect(response.status.error).to.include('Invalid encryptedDockerRegistryAuth') + }) + + it('should start free compute job with valid encryptedDockerRegistryAuth', async () => { + const validAuth = { + username: 'testuser', + password: 'testpass' + } + const encryptedAuth = await encryptDockerRegistryAuth(validAuth) + + const nonce = Date.now().toString() + const consumerMessage = ethers.solidityPackedKeccak256( + ['bytes'], + [ethers.hexlify(ethers.toUtf8Bytes(nonce))] + ) + const signature = await wallet.signMessage(ethers.toBeArray(consumerMessage)) + + const startComputeTask: FreeComputeStartCommand = { + command: PROTOCOL_COMMANDS.FREE_COMPUTE_START, + consumerAddress: await wallet.getAddress(), + signature, + nonce, + environment: firstEnv.id, + datasets: [ + { + fileObject: computeAsset.services[0].files.files[0], + documentId: publishedComputeDataset.ddo.id, + serviceId: publishedComputeDataset.ddo.services[0].id, + transferTxId: datasetOrderTxId + } + ], + algorithm: { + fileObject: algoAsset.services[0].files.files[0], + documentId: publishedAlgoDataset.ddo.id, + serviceId: publishedAlgoDataset.ddo.services[0].id, + transferTxId: algoOrderTxId, + meta: publishedAlgoDataset.ddo.metadata.algorithm + }, + output: {}, + encryptedDockerRegistryAuth: encryptedAuth + } + + const response = await new FreeComputeStartHandler(oceanNode).handle( + startComputeTask + ) + assert(response, 'Failed to get response') + // Should succeed (200) or fail for other reasons, but not due to auth validation + // Check that error is not a validation error (format validation), even if Docker auth fails + if (response.status.httpStatus !== 200) { + expect(response.status.error).to.not.include( + 'Invalid encryptedDockerRegistryAuth' + ) + } + if (response.status.httpStatus === 200) { + assert(response.stream, 'Failed to get stream') + expect(response.stream).to.be.instanceOf(Readable) + } + }) + + it('should fail free compute start with invalid encryptedDockerRegistryAuth', async () => { + const invalidAuth = { + password: 'testpass' + // missing username + } + const encryptedAuth = await encryptDockerRegistryAuth(invalidAuth) + + const nonce = Date.now().toString() + const consumerMessage = ethers.solidityPackedKeccak256( + ['bytes'], + [ethers.hexlify(ethers.toUtf8Bytes(nonce))] + ) + const signature = await wallet.signMessage(ethers.toBeArray(consumerMessage)) + + const startComputeTask: FreeComputeStartCommand = { + command: PROTOCOL_COMMANDS.FREE_COMPUTE_START, + consumerAddress: await wallet.getAddress(), + signature, + nonce, + environment: firstEnv.id, + datasets: [ + { + fileObject: computeAsset.services[0].files.files[0], + documentId: publishedComputeDataset.ddo.id, + serviceId: publishedComputeDataset.ddo.services[0].id, + transferTxId: datasetOrderTxId + } + ], + algorithm: { + fileObject: algoAsset.services[0].files.files[0], + documentId: publishedAlgoDataset.ddo.id, + serviceId: publishedAlgoDataset.ddo.services[0].id, + transferTxId: algoOrderTxId, + meta: publishedAlgoDataset.ddo.metadata.algorithm + }, + output: {}, + encryptedDockerRegistryAuth: encryptedAuth + } + + const response = await new FreeComputeStartHandler(oceanNode).handle( + startComputeTask + ) + assert(response, 'Failed to get response') + assert( + response.status.httpStatus === 400, + `Expected 400 but got ${response.status.httpStatus}: ${response.status.error}` + ) + expect(response.status.error).to.include('Invalid encryptedDockerRegistryAuth') + }) + + it('should handle invalid hex-encoded encryptedDockerRegistryAuth gracefully', async () => { + const invalidHex = 'not-a-valid-hex-string' + + const dataset: ComputeAsset = { + documentId: publishedComputeDataset.ddo.id, + serviceId: publishedComputeDataset.ddo.services[0].id, + transferTxId: String(datasetOrderTxId) + } + const algorithm: ComputeAlgorithm = { + documentId: publishedAlgoDataset.ddo.id, + serviceId: publishedAlgoDataset.ddo.services[0].id, + transferTxId: String(algoOrderTxId), + meta: publishedAlgoDataset.ddo.metadata.algorithm + } + const initializeComputeTask: ComputeInitializeCommand = { + datasets: [dataset], + algorithm, + environment: firstEnv.id, + payment: { + chainId: DEVELOPMENT_CHAIN_ID, + token: paymentToken + }, + maxJobDuration: computeJobDuration, + consumerAddress: firstEnv.consumerAddress, + command: PROTOCOL_COMMANDS.COMPUTE_INITIALIZE, + encryptedDockerRegistryAuth: invalidHex + } + + const resp = await new ComputeInitializeHandler(oceanNode).handle( + initializeComputeTask + ) + assert(resp, 'Failed to get response') + // Should fail with 500 due to decryption/parsing error + assert( + resp.status.httpStatus === 400, + `Expected 400 but got ${resp.status.httpStatus}: ${resp.status.error}` + ) + expect(resp.status.error).to.include('Invalid encryptedDockerRegistryAuth') + }) + }) + after(async () => { await tearDownEnvironment(previousConfiguration) indexer.stopAllChainIndexers() From 7b23c18c9816c4115158c49e01e846a0234a7b3e Mon Sep 17 00:00:00 2001 From: alexcos20 Date: Fri, 6 Feb 2026 13:52:14 +0200 Subject: [PATCH 3/6] refactor --- docs/API.md | 2 +- src/components/c2d/compute_engine_base.ts | 51 +++++++++++++ src/components/core/compute/initialize.ts | 48 ++---------- src/components/core/compute/startCompute.ts | 81 +++------------------ 4 files changed, 70 insertions(+), 112 deletions(-) diff --git a/docs/API.md b/docs/API.md index df2d2b17c..ad6f1be1c 100644 --- a/docs/API.md +++ b/docs/API.md @@ -1416,7 +1416,7 @@ starts a free compute job and returns jobId if succesfull | metadata | object | | optional metadata for the job, data provided by the user | | additionalViewers | object | | optional array of addresses that are allowed to fetch the result | | queueMaxWaitTime | number | | optional max time in seconds a job can wait in the queue before being started | -| encryptedDockerRegistryAuth | string | Ecies encrypted docker auth schema for image | +| encryptedDockerRegistryAuth | string | Ecies encrypted docker auth schema for image (see [Private Docker Registries with Per-Job Authentication](../env.md#private-docker-registries-with-per-job-authentication)) | #### Request diff --git a/src/components/c2d/compute_engine_base.ts b/src/components/c2d/compute_engine_base.ts index ee4cfb78e..b6cb3e1e5 100644 --- a/src/components/c2d/compute_engine_base.ts +++ b/src/components/c2d/compute_engine_base.ts @@ -23,6 +23,9 @@ import { Escrow } from '../core/utils/escrow.js' import { KeyManager } from '../KeyManager/index.js' import { dockerRegistryAuth, dockerRegistrysAuth } from '../../@types/OceanNode.js' import { ValidateParams } from '../httpRoutes/validateCommands.js' +import { EncryptMethod } from '../../@types/fileObject.js' +import { CORE_LOGGER } from '../../utils/logging/common.js' +import { DockerRegistryAuthSchema } from '../../utils/config/schemas.js' export abstract class C2DEngine { private clusterConfig: C2DClusterInfo public db: C2DDatabase @@ -544,4 +547,52 @@ export abstract class C2DEngine { } return null } + + public async checkEncryptedDockerRegistryAuth( + encryptedDockerRegistryAuth: string + ): Promise { + let decryptedDockerRegistryAuth: dockerRegistryAuth + try { + const decryptedDockerRegistryAuthBuffer = await this.keyManager.decrypt( + Uint8Array.from(Buffer.from(encryptedDockerRegistryAuth, 'hex')), + EncryptMethod.ECIES + ) + + // Convert decrypted buffer to string and parse as JSON + const decryptedDockerRegistryAuthString = + decryptedDockerRegistryAuthBuffer.toString() + + decryptedDockerRegistryAuth = JSON.parse(decryptedDockerRegistryAuthString) + } catch (error: any) { + const errorMessage = `Invalid encryptedDockerRegistryAuth: failed to parse JSON - ${error?.message || String(error)}` + CORE_LOGGER.error(errorMessage) + return { + valid: false, + reason: errorMessage, + status: 400 + } + } + + // Validate using schema - ensures either auth or username+password are provided + const validationResult = DockerRegistryAuthSchema.safeParse( + decryptedDockerRegistryAuth + ) + if (!validationResult.success) { + const errorMessageValidation = validationResult.error.errors + .map((err) => err.message) + .join('; ') + const errorMessage = `Invalid encryptedDockerRegistryAuth: ${errorMessageValidation}` + CORE_LOGGER.error(errorMessage) + return { + valid: false, + reason: errorMessage, + status: 400 + } + } + return { + valid: true, + reason: null, + status: 200 + } + } } diff --git a/src/components/core/compute/initialize.ts b/src/components/core/compute/initialize.ts index 3de386051..de611ac78 100644 --- a/src/components/core/compute/initialize.ts +++ b/src/components/core/compute/initialize.ts @@ -393,56 +393,22 @@ export class ComputeInitializeHandler extends CommandHandler { const algoImage = getAlgorithmImage(task.algorithm, generateUniqueID(task)) if (algoImage) { // validate encrypteddockerRegistryAuth + let validation: ValidateParams if (task.encryptedDockerRegistryAuth) { - let decryptedDockerRegistryAuth: dockerRegistryAuth - try { - const decryptedDockerRegistryAuthBuffer = - await engine.keyManager.decrypt( - Uint8Array.from( - Buffer.from(task.encryptedDockerRegistryAuth, 'hex') - ), - EncryptMethod.ECIES - ) - - // Convert decrypted buffer to string and parse as JSON - const decryptedDockerRegistryAuthString = - decryptedDockerRegistryAuthBuffer.toString() - - decryptedDockerRegistryAuth = JSON.parse( - decryptedDockerRegistryAuthString - ) - } catch (error: any) { - const errorMessage = `Invalid encryptedDockerRegistryAuth: failed to parse JSON - ${error?.message || String(error)}` - CORE_LOGGER.error(errorMessage) - return { - stream: null, - status: { - httpStatus: 400, - error: errorMessage - } - } - } - - // Validate using schema - ensures either auth or username+password are provided - const validationResult = DockerRegistryAuthSchema.safeParse( - decryptedDockerRegistryAuth + validation = await engine.checkEncryptedDockerRegistryAuth( + task.encryptedDockerRegistryAuth ) - if (!validationResult.success) { - const errorMessageValidation = validationResult.error.errors - .map((err) => err.message) - .join('; ') - const errorMessage = `Invalid encryptedDockerRegistryAuth: ${errorMessageValidation}` - CORE_LOGGER.error(errorMessage) + if (!validation.valid) { return { stream: null, status: { - httpStatus: 400, - error: errorMessage + httpStatus: validation.status, + error: `Invalid encryptedDockerRegistryAuth :${validation.reason}` } } } } - const validation: ValidateParams = await engine.checkDockerImage( + validation = await engine.checkDockerImage( algoImage, task.encryptedDockerRegistryAuth, env.platform diff --git a/src/components/core/compute/startCompute.ts b/src/components/core/compute/startCompute.ts index 251ca1652..a2bff3461 100644 --- a/src/components/core/compute/startCompute.ts +++ b/src/components/core/compute/startCompute.ts @@ -158,49 +158,20 @@ export class PaidComputeStartHandler extends CommandHandler { } // validate encrypteddockerRegistryAuth if (task.encryptedDockerRegistryAuth) { - let decryptedDockerRegistryAuth: dockerRegistryAuth - try { - const decryptedDockerRegistryAuthBuffer = await engine.keyManager.decrypt( - Uint8Array.from(Buffer.from(task.encryptedDockerRegistryAuth, 'hex')), - EncryptMethod.ECIES - ) - - // Convert decrypted buffer to string and parse as JSON - const decryptedDockerRegistryAuthString = - decryptedDockerRegistryAuthBuffer.toString() - - decryptedDockerRegistryAuth = JSON.parse(decryptedDockerRegistryAuthString) - } catch (error: any) { - const errorMessage = `Invalid encryptedDockerRegistryAuth: failed to parse JSON - ${error?.message || String(error)}` - CORE_LOGGER.error(errorMessage) - return { - stream: null, - status: { - httpStatus: 400, - error: errorMessage - } - } - } - - // Validate using schema - ensures either auth or username+password are provided - const validationResult = DockerRegistryAuthSchema.safeParse( - decryptedDockerRegistryAuth + const validation = await engine.checkEncryptedDockerRegistryAuth( + task.encryptedDockerRegistryAuth ) - if (!validationResult.success) { - const errorMessageValidation = validationResult.error.errors - .map((err) => err.message) - .join('; ') - const errorMessage = `Invalid encryptedDockerRegistryAuth: ${errorMessageValidation}` - CORE_LOGGER.error(errorMessage) + if (!validation.valid) { return { stream: null, status: { - httpStatus: 400, - error: errorMessage + httpStatus: validation.status, + error: `Invalid encryptedDockerRegistryAuth :${validation.reason}` } } } } + const { algorithm } = task const config = await getConfiguration() @@ -732,45 +703,15 @@ export class FreeComputeStartHandler extends CommandHandler { } // validate encrypteddockerRegistryAuth if (task.encryptedDockerRegistryAuth) { - let decryptedDockerRegistryAuth: dockerRegistryAuth - try { - const decryptedDockerRegistryAuthBuffer = await engine.keyManager.decrypt( - Uint8Array.from(Buffer.from(task.encryptedDockerRegistryAuth, 'hex')), - EncryptMethod.ECIES - ) - - // Convert decrypted buffer to string and parse as JSON - const decryptedDockerRegistryAuthString = - decryptedDockerRegistryAuthBuffer.toString() - - decryptedDockerRegistryAuth = JSON.parse(decryptedDockerRegistryAuthString) - } catch (error: any) { - const errorMessage = `Invalid encryptedDockerRegistryAuth: failed to parse JSON - ${error?.message || String(error)}` - CORE_LOGGER.error(errorMessage) - return { - stream: null, - status: { - httpStatus: 400, - error: errorMessage - } - } - } - - // Validate using schema - ensures either auth or username+password are provided - const validationResult = DockerRegistryAuthSchema.safeParse( - decryptedDockerRegistryAuth + const validation = await engine.checkEncryptedDockerRegistryAuth( + task.encryptedDockerRegistryAuth ) - if (!validationResult.success) { - const errorMessageValidation = validationResult.error.errors - .map((err) => err.message) - .join('; ') - const errorMessage = `Invalid encryptedDockerRegistryAuth: ${errorMessageValidation}` - CORE_LOGGER.error(errorMessage) + if (!validation.valid) { return { stream: null, status: { - httpStatus: 400, - error: errorMessage + httpStatus: validation.status, + error: `Invalid encryptedDockerRegistryAuth :${validation.reason}` } } } From 911b06aa61ce54f32ba4b28f3b4ade836e6b98dc Mon Sep 17 00:00:00 2001 From: alexcos20 Date: Fri, 6 Feb 2026 13:58:43 +0200 Subject: [PATCH 4/6] lint --- src/components/core/compute/initialize.ts | 8 ++------ src/components/core/compute/startCompute.ts | 7 +------ 2 files changed, 3 insertions(+), 12 deletions(-) diff --git a/src/components/core/compute/initialize.ts b/src/components/core/compute/initialize.ts index de611ac78..24b2a5256 100644 --- a/src/components/core/compute/initialize.ts +++ b/src/components/core/compute/initialize.ts @@ -1,5 +1,5 @@ import { Readable } from 'stream' -import { P2PCommandResponse, dockerRegistryAuth } from '../../../@types/OceanNode.js' +import { P2PCommandResponse } from '../../../@types/OceanNode.js' import { C2DClusterType } from '../../../@types/C2D/C2D.js' import { CORE_LOGGER } from '../../../utils/logging/common.js' import { CommandHandler } from '../handler/handler.js' @@ -23,11 +23,7 @@ import { validateCommandParameters } from '../../httpRoutes/validateCommands.js' import { isAddress } from 'ethers' -import { - DockerRegistryAuthSchema, - getConfiguration, - isPolicyServerConfigured -} from '../../../utils/index.js' +import { getConfiguration, isPolicyServerConfigured } from '../../../utils/index.js' import { sanitizeServiceFiles } from '../../../utils/util.js' import { FindDdoHandler } from '../handler/ddoHandler.js' import { isOrderingAllowedForAsset } from '../handler/downloadHandler.js' diff --git a/src/components/core/compute/startCompute.ts b/src/components/core/compute/startCompute.ts index a2bff3461..ac83856e5 100644 --- a/src/components/core/compute/startCompute.ts +++ b/src/components/core/compute/startCompute.ts @@ -28,11 +28,7 @@ import { } from '../../../@types/C2D/C2D.js' // import { verifyProviderFees } from '../utils/feesHandler.js' import { validateOrderTransaction } from '../utils/validateOrders.js' -import { - getConfiguration, - isPolicyServerConfigured, - DockerRegistryAuthSchema -} from '../../../utils/index.js' +import { getConfiguration, isPolicyServerConfigured } from '../../../utils/index.js' import { sanitizeServiceFiles } from '../../../utils/util.js' import { FindDdoHandler } from '../handler/ddoHandler.js' // import { ProviderFeeValidation } from '../../../@types/Fees.js' @@ -42,7 +38,6 @@ import { getNonceAsNumber } from '../utils/nonceHandler.js' import { PolicyServer } from '../../policyServer/index.js' import { checkCredentials } from '../../../utils/credentials.js' import { checkAddressOnAccessList } from '../../../utils/accessList.js' -import { dockerRegistryAuth } from '../../../@types/OceanNode.js' export class PaidComputeStartHandler extends CommandHandler { validate(command: PaidComputeStartCommand): ValidateParams { From 241e7ac6ab2346b011e4071f0bbffd2752513379 Mon Sep 17 00:00:00 2001 From: Alex Coseru Date: Fri, 6 Feb 2026 17:02:35 +0200 Subject: [PATCH 5/6] use local image (#1198) --- src/components/c2d/compute_engine_docker.ts | 54 +++++- src/test/integration/compute.test.ts | 204 ++++++++++++++++++++ 2 files changed, 256 insertions(+), 2 deletions(-) diff --git a/src/components/c2d/compute_engine_docker.ts b/src/components/c2d/compute_engine_docker.ts index 37c1bd269..8b7444eed 100644 --- a/src/components/c2d/compute_engine_docker.ts +++ b/src/components/c2d/compute_engine_docker.ts @@ -556,15 +556,65 @@ export class C2DEngineDocker extends C2DEngine { } /** - * Checks the docker image by looking at the manifest + * Checks the docker image by looking at local images first, then remote manifest * @param image name or tag - * @returns boolean + * @param encryptedDockerRegistryAuth optional encrypted auth for remote registry + * @param platform optional platform to validate against + * @returns ValidateParams with valid flag and platform validation result */ public async checkDockerImage( image: string, encryptedDockerRegistryAuth?: string, platform?: RunningPlatform ): Promise { + // Step 1: Try to check local image first + if (this.docker) { + try { + const dockerImage = this.docker.getImage(image) + const imageInfo = await dockerImage.inspect() + + // Extract platform information from local image + const localPlatform = { + architecture: imageInfo.Architecture || 'amd64', + os: imageInfo.Os || 'linux' + } + + // Normalize architecture (amd64 -> x86_64 for compatibility) + if (localPlatform.architecture === 'amd64') { + localPlatform.architecture = 'x86_64' + } + + // Validate platform if required + const isValidPlatform = platform + ? checkManifestPlatform(localPlatform, platform) + : true + + if (isValidPlatform) { + CORE_LOGGER.debug(`Image ${image} found locally and platform is valid`) + return { valid: true } + } else { + CORE_LOGGER.warn( + `Image ${image} found locally but platform mismatch: ` + + `local=${localPlatform.architecture}/${localPlatform.os}, ` + + `required=${platform.architecture}/${platform.os}` + ) + return { + valid: false, + status: 400, + reason: + `Platform mismatch: image is ${localPlatform.architecture}/${localPlatform.os}, ` + + `but environment requires ${platform.architecture}/${platform.os}` + } + } + } catch (localErr: any) { + // Image not found locally or error inspecting - fall through to remote check + CORE_LOGGER.debug( + `Image ${image} not found locally (${localErr.message}), checking remote registry` + ) + } + } + + // Step 2: Fall back to remote registry check (existing behavior) try { const manifest = await this.getDockerManifest(image, encryptedDockerRegistryAuth) diff --git a/src/test/integration/compute.test.ts b/src/test/integration/compute.test.ts index 765e130c6..96dae67ec 100644 --- a/src/test/integration/compute.test.ts +++ b/src/test/integration/compute.test.ts @@ -75,6 +75,8 @@ import { import { freeComputeStartPayload } from '../data/commands.js' import { DDOManager } from '@oceanprotocol/ddo-js' +import Dockerode from 'dockerode' +import { C2DEngineDocker } from '../../components/c2d/compute_engine_docker.js' const sleep = (ms: number) => new Promise((resolve) => setTimeout(resolve, ms)) @@ -1855,6 +1857,208 @@ describe('Compute', () => { }) }) + describe('Local Docker image checking', () => { + let docker: Dockerode + let dockerEngine: C2DEngineDocker + + before(async function () { + // Skip if Docker not available + try { + docker = new Dockerode() + await docker.info() + } catch (e) { + this.skip() + } + + // Get the Docker engine from oceanNode + const c2dEngines = oceanNode.getC2DEngines() + const engines = (c2dEngines as any).engines as C2DEngineDocker[] + dockerEngine = engines.find((e) => e instanceof C2DEngineDocker) + if (!dockerEngine) { + this.skip() + } + }) + + it('should check local image when it exists locally', async function () { + // Skip if Docker not available + try { + await docker.info() + } catch (e) { + this.skip() + } + + const testImage = 'alpine:3.18' + + // Ensure image exists locally + try { + await docker.pull(testImage) + } catch (e) { + // If pull fails, skip test + this.skip() + } + + // Check the image - should find it locally + const result = await dockerEngine.checkDockerImage(testImage) + + assert(result, 'Result should exist') + assert(result.valid === true, 'Image should be valid') + }).timeout(30000) + + it('should validate platform for local images', async function () { + // Skip if Docker not available + try { + await docker.info() + } catch (e) { + this.skip() + } + + const testImage = 'alpine:3.18' + + // Ensure image exists locally + try { + await docker.pull(testImage) + } catch (e) { + this.skip() + } + + // Get the platform from the local image + const imageInfo = await docker.getImage(testImage).inspect() + const localArch = imageInfo.Architecture || 'amd64' + const localOs = imageInfo.Os || 'linux' + + // Check with matching platform + const matchingPlatform = { + architecture: localArch === 'amd64' ? 'x86_64' : localArch, + os: localOs + } + const resultMatching = await dockerEngine.checkDockerImage( + testImage, + undefined, + matchingPlatform + ) + + assert(resultMatching, 'Result should exist') + assert( + resultMatching.valid === true, + 'Image should be valid with matching platform' + ) + }).timeout(30000) + + it('should detect platform mismatch for local images', async function () { + // Skip if Docker not available + try { + await docker.info() + } catch (e) { + this.skip() + } + + const testImage = 'alpine:3.18' + + // Ensure image exists locally + try { + await docker.pull(testImage) + } catch (e) { + this.skip() + } + + // Check with mismatched platform (assuming local is linux/amd64 or linux/x86_64) + const mismatchedPlatform = { + architecture: 'arm64', // Different architecture + os: 'linux' + } + const resultMismatch = await dockerEngine.checkDockerImage( + testImage, + undefined, + mismatchedPlatform + ) + + assert(resultMismatch, 'Result should exist') + assert( + resultMismatch.valid === false, + 'Image should be invalid with mismatched platform' + ) + assert(resultMismatch.status === 400, 'Status should be 400 for platform mismatch') + assert( + resultMismatch.reason.includes('Platform mismatch'), + 'Reason should include platform mismatch message' + ) + }).timeout(30000) + + it('should fall back to remote registry when local image not found', async function () { + // Skip if Docker not available + try { + await docker.info() + } catch (e) { + this.skip() + } + + const nonExistentLocalImage = 'nonexistent-local-image:latest' + + // Ensure image doesn't exist locally + try { + const image = docker.getImage(nonExistentLocalImage) + await image.inspect() + // If we get here, image exists - remove it for test + await image.remove({ force: true }) + } catch (e) { + // Image doesn't exist locally, which is what we want + } + + // Check the image - should fall back to remote check + // This will likely fail with 404, but we're testing the fallback behavior + const result = await dockerEngine.checkDockerImage(nonExistentLocalImage) + + assert(result, 'Result should exist') + // Should have attempted remote check (will fail, but that's expected) + assert(result.valid === false, 'Image should be invalid (not found)') + assert(result.status === 404, 'Status should be 404 for not found') + }).timeout(30000) + + it('should work without platform validation when platform not specified', async function () { + // Skip if Docker not available + try { + await docker.info() + } catch (e) { + this.skip() + } + + const testImage = 'alpine:3.18' + + // Ensure image exists locally + try { + await docker.pull(testImage) + } catch (e) { + this.skip() + } + + // Check without platform - should succeed if image exists + const result = await dockerEngine.checkDockerImage(testImage) + + assert(result, 'Result should exist') + assert(result.valid === true, 'Image should be valid without platform check') + }).timeout(30000) + + after(async function () { + // Clean up test images if needed + try { + await docker.info() + } catch (e) { + // Docker not available, skip cleanup + } + + // Optionally remove test images to save space + // (commented out to avoid breaking other tests that might use these images) + /* + try { + const image = docker.getImage('alpine:3.18') + await image.remove({ force: true }) + } catch (e) { + // Ignore errors during cleanup + } + */ + }) + }) + after(async () => { await tearDownEnvironment(previousConfiguration) indexer.stopAllChainIndexers() From f14a243fa044ced3af7a408ef6601c4a74093223 Mon Sep 17 00:00:00 2001 From: alexcos20 Date: Fri, 6 Feb 2026 18:33:19 +0200 Subject: [PATCH 6/6] more verbose --- src/test/integration/compute.test.ts | 102 ++++++++------------------- 1 file changed, 29 insertions(+), 73 deletions(-) diff --git a/src/test/integration/compute.test.ts b/src/test/integration/compute.test.ts index 96dae67ec..2dc919b7d 100644 --- a/src/test/integration/compute.test.ts +++ b/src/test/integration/compute.test.ts @@ -1860,12 +1860,40 @@ describe('Compute', () => { describe('Local Docker image checking', () => { let docker: Dockerode let dockerEngine: C2DEngineDocker - + const testImage = 'alpine:3.18' before(async function () { // Skip if Docker not available try { docker = new Dockerode() await docker.info() + const pullStream = await docker.pull(testImage) + await new Promise((resolve, reject) => { + let wroteStatusBanner = false + docker.modem.followProgress( + pullStream, + (err: any, res: any) => { + // onFinished + if (err) { + console.log(err) + return reject(err) + } + console.log(`Successfully pulled image: ${testImage}`) + resolve(res) + }, + (progress: any) => { + // onProgress + if (!wroteStatusBanner) { + wroteStatusBanner = true + console.log('############# Pull docker image status: ##############') + } + // only write the status banner once, its cleaner + let logText = '' + if (progress.id) logText += progress.id + ' : ' + progress.status + else logText = progress.status + console.log('Pulling image : ' + logText) + } + ) + }) } catch (e) { this.skip() } @@ -1880,23 +1908,6 @@ describe('Compute', () => { }) it('should check local image when it exists locally', async function () { - // Skip if Docker not available - try { - await docker.info() - } catch (e) { - this.skip() - } - - const testImage = 'alpine:3.18' - - // Ensure image exists locally - try { - await docker.pull(testImage) - } catch (e) { - // If pull fails, skip test - this.skip() - } - // Check the image - should find it locally const result = await dockerEngine.checkDockerImage(testImage) @@ -1905,22 +1916,6 @@ describe('Compute', () => { }).timeout(30000) it('should validate platform for local images', async function () { - // Skip if Docker not available - try { - await docker.info() - } catch (e) { - this.skip() - } - - const testImage = 'alpine:3.18' - - // Ensure image exists locally - try { - await docker.pull(testImage) - } catch (e) { - this.skip() - } - // Get the platform from the local image const imageInfo = await docker.getImage(testImage).inspect() const localArch = imageInfo.Architecture || 'amd64' @@ -1945,22 +1940,6 @@ describe('Compute', () => { }).timeout(30000) it('should detect platform mismatch for local images', async function () { - // Skip if Docker not available - try { - await docker.info() - } catch (e) { - this.skip() - } - - const testImage = 'alpine:3.18' - - // Ensure image exists locally - try { - await docker.pull(testImage) - } catch (e) { - this.skip() - } - // Check with mismatched platform (assuming local is linux/amd64 or linux/x86_64) const mismatchedPlatform = { architecture: 'arm64', // Different architecture @@ -1985,13 +1964,6 @@ describe('Compute', () => { }).timeout(30000) it('should fall back to remote registry when local image not found', async function () { - // Skip if Docker not available - try { - await docker.info() - } catch (e) { - this.skip() - } - const nonExistentLocalImage = 'nonexistent-local-image:latest' // Ensure image doesn't exist locally @@ -2015,22 +1987,6 @@ describe('Compute', () => { }).timeout(30000) it('should work without platform validation when platform not specified', async function () { - // Skip if Docker not available - try { - await docker.info() - } catch (e) { - this.skip() - } - - const testImage = 'alpine:3.18' - - // Ensure image exists locally - try { - await docker.pull(testImage) - } catch (e) { - this.skip() - } - // Check without platform - should succeed if image exists const result = await dockerEngine.checkDockerImage(testImage)