diff --git a/package.json b/package.json index 00cbd15..a8c1601 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "@nevermined-io/cli", - "version": "0.8.1", + "version": "0.9.0", "main": "index.js", "repository": "git@github.com:nevermined-io/cli.git", "author": "Nevermined", @@ -22,9 +22,9 @@ "ncli": "./dist/src/index.js" }, "dependencies": { - "@nevermined-io/nevermined-sdk-js": "0.26.0", - "@nevermined-io/nevermined-sdk-dtp": "0.2.2", - "@truffle/hdwallet-provider": "^2.0.9", + "@nevermined-io/nevermined-sdk-js": "0.27.3", + "@nevermined-io/nevermined-sdk-dtp": "0.2.4", + "@truffle/hdwallet-provider": "^2.1.3", "chalk": "^4.1.2", "cross-fetch": "~3.1.5", "dotenv": "^16.0.1", diff --git a/resources/commands.json b/resources/commands.json index 79703dd..c523a7c 100644 --- a/resources/commands.json +++ b/resources/commands.json @@ -277,6 +277,44 @@ "description": "The type of the asset to register" } ] + }, { + "name": "register-workflow", + "description": "Register a new workflow", + "details": "This command registers a new workflow on the Nevermined network. The workflow allows to specify a an operation that needs to be executed. This typically involve the execution of an algorithm (referenced by a DID) and the input data that is given to that algorithm (referenced by DIDs too).", + "examples": ["ncli assets register-workflow --name 'Word count of English Enciclopedia' --input did:nv:abc --algorithm did:nv:012 "], + "commandHandler": "registerAsset", + "optionalArguments": [{ + "name": "name", + "type": "string", + "demandOption": true, + "description": "The asset name" + }, + { + "name": "author", + "type": "string", + "demandOption": false, + "description": "The author of the asset" + }, + { + "name": "input", + "type": "array", + "demandOption": true, + "description": "The DID of the input asset" + }, + { + "name": "algorithm", + "type": "string", + "demandOption": true, + "description": "The DID of the algorithm asset to process the input" + }, + { + "name": "assetType", + "type": "string", + "default": "workflow", + "hidden": true, + "description": "The type of the asset to register" + } + ] }, { "name": "import [metadata]", "description": "Import an asset using the metadata in JSON format", @@ -450,6 +488,81 @@ "description": "The asset DID" }] }] + }, { + "name": "compute", + "description": "Allows the execution of remote computation", + "usage": "usage: $0 compute parameters [options]", + "subcommands": [{ + "name": "order [did]", + "description": "Order a compute asset given a DID", + "details": "This method makes the payment and retrieve a serviceAgreementId that can be used later to execute a compute job", + "examples": ["ncli compute order did:nv:912e7a547bcd675ffbc5d2063ef770e15744029f048f706f6bb0281df4f4700f"], + "commandHandler": "orderAsset", + "positionalArguments": [{ + "name": "did", + "type": "string", + "description": "The asset DID" + }], + "optionalArguments": [{ + "name": "orderType", + "type": "string", + "default": "compute", + "hidden": true, + "description": "The type of asset to order" + }] + }, { + "name": "execute [did]", + "description": "Order & download or download directly a previously purchased asset", + "details": "This commands is the best entry point to access the files attached to a Nevermined asset. Depending on the parameters provided, it allows to order and download the files of an asset, or if this was already purchased, provides the service agreement to download them.", + "examples": ["ncli assets get did:nv:912e7a547bcd675ffbc5d2063ef770e15744029f048f706f6bb0281df4f4700f --destination /tmp", "ncli assets get did:nv:912e7a547bcd675ffbc5d2063ef770e15744029f048f706f6bb0281df4f4700f --agreementId 0x412dceaa0c5506095daa6b221be93c680e8a49bfd5b63ce54522d85d2b0e1384 --destination /tmp"], + "commandHandler": "execCompute", + "positionalArguments": [{ + "name": "did", + "type": "string", + "description": "The asset DID" + }], + "optionalArguments": [{ + "name": "agreementId", + "type": "string", + "default": "", + "description": "Agreement Id of a previously purchased asset. If not given a new purchase will be executed" + } + ] + }, { + "name": "status [agreementId] [jobId]", + "description": "Shows the status about the execution of a job", + "details": "When a user triggers an execution Nevermined orchestrates the infrastructure allowing to put together the input data and the algorithm. This process can require some time and this command checks the status of that operation.", + "examples": ["ncli compute status 0xf29bebaeacf865b4f57373aeb84635cc68c7719761607aec2802f1ad87213777 abxckdsaofksdadsa"], + "commandHandler": "statusJob", + "positionalArguments": [{ + "name": "agreementId", + "type": "string", + "default": "", + "description": "Agreement Id of a previously purchased asset" + }, { + "name": "jobId", + "type": "string", + "description": "The id of the user execution" + }], + "optionalArguments": [] + }, { + "name": "logs [agreementId] [jobId]", + "description": "Shows the logs about the execution of a job", + "details": "When a user triggers an execution Nevermined orchestrates the infrastructure allowing to put together the input data and the algorithm. This command fetches the logs generated by the execution of the algorithm.", + "examples": ["ncli compute logs 0xf29bebaeacf865b4f57373aeb84635cc68c7719761607aec2802f1ad87213777 abxckdsaofksdadsa"], + "commandHandler": "logsJob", + "positionalArguments": [{ + "name": "agreementId", + "type": "string", + "default": "", + "description": "Agreement Id of a previously purchased asset" + }, { + "name": "jobId", + "type": "string", + "description": "The id of the user execution" + }], + "optionalArguments": [] + }] }, { "name": "agreements", "description": "Get information about the Service Execution Agreements", diff --git a/src/commands/assets/orderAsset.ts b/src/commands/assets/orderAsset.ts index 946d636..51aa9fd 100644 --- a/src/commands/assets/orderAsset.ts +++ b/src/commands/assets/orderAsset.ts @@ -19,7 +19,7 @@ export const orderAsset = async ( config: ConfigEntry, logger: Logger ): Promise => { - const { did } = argv + const { did, orderType } = argv // TODO: Enable DTP when `sdk-dtp` is ready // const keyTransfer = await makeKeyTransfer() @@ -28,7 +28,10 @@ export const orderAsset = async ( logger.debug(chalk.dim(`Using account: '${account.getId()}'`)) - const agreementId = await nvm.assets.order(did, 'access', account) + const agreementId = await nvm.assets.order( + did, + orderType === 'compute'?'compute':'access', + account) // } logger.info(chalk.dim(`Agreement Id: ${agreementId}`)) diff --git a/src/commands/assets/registerAsset.ts b/src/commands/assets/registerAsset.ts index a7f9b81..8546e2e 100644 --- a/src/commands/assets/registerAsset.ts +++ b/src/commands/assets/registerAsset.ts @@ -68,14 +68,16 @@ export const registerAsset = async ( }) _fileIndex++ } - argv.urls.forEach((_url: string) => { - _files.push({ - index: _fileIndex, - url: _url, - contentType: argv.contentType + if (assetType!== 'workflow') { + argv.urls.forEach((_url: string) => { + _files.push({ + index: _fileIndex, + url: _url, + contentType: argv.contentType + }) + _fileIndex++ }) - _fileIndex++ - }) + } ddoMetadata = { main: { @@ -113,6 +115,41 @@ export const registerAsset = async ( } } } + + // TODO Add support for multiple stages/inputs when ComputePods does + if (assetType === 'workflow') { + const argvInput = argv.input as string + const algorithm = argv.algorithm + + ddoMetadata.main.workflow = { + coordinationType: 'argo', + stages: [ + { + index: 0, + // TODO - irrelevant. this info is included in algorithm ddo. update sdk-js to remove this from metadata + requirements: { + container: { + image: '', + tag: '', + checksum: '' + } + }, + input: [{ + index: 0, + id: argvInput + }], + transformation: { + id: algorithm + }, + output: { + metadataUrl: `${config.nvm.marketplaceUri}/api/v1/metadata/assets/ddo/`, + accessProxyUrl: `${config.nvm.neverminedNodeUri}/api/v1/node/`, + metadata: {} as any + } + } + ] + } + } } else { ddoMetadata = JSON.parse(fs.readFileSync(metadata).toString()) } @@ -135,7 +172,7 @@ export const registerAsset = async ( ddoMetadata, account, assetRewards, - ['access'], + assetType ==='compute'?['compute']:['access'], [], DEFAULT_ENCRYPTION_METHOD, // eslint-disable-next-line @typescript-eslint/no-non-null-assertion diff --git a/src/commands/compute/execCompute.ts b/src/commands/compute/execCompute.ts new file mode 100644 index 0000000..1b6f051 --- /dev/null +++ b/src/commands/compute/execCompute.ts @@ -0,0 +1,55 @@ +import { Account, Nevermined } from '@nevermined-io/nevermined-sdk-js' +import { StatusCodes } from '../../utils' +import chalk from 'chalk' +import { Logger } from 'log4js' +import { ExecutionOutput } from '../../models/ExecutionOutput' +import { ConfigEntry } from '../../models/ConfigDefinition' + + +export const execCompute = async ( + nvm: Nevermined, + account: Account, + argv: any, + config: ConfigEntry, + logger: Logger +): Promise => { + const { did } = argv + + let agreementId + + + if (!argv.agreementId) { + logger.info(chalk.dim(`Ordering asset: ${did}`)) + agreementId = await nvm.assets.order(did, 'compute', account) + } else { + agreementId = argv.agreementId + } + + logger.info( + chalk.dim(`Executing asset: ${did} with agreement id: ${agreementId}`) + ) + + try { + const jobId = await nvm.assets.execute(agreementId, did, account) + + logger.info( + chalk.dim(`Created Job ${jobId}`) + ) + + return { + status: StatusCodes.OK, + results: JSON.stringify({ + did, + agreementId, + jobId + }) + } + } catch (error) { + return { + status: StatusCodes.ERROR, + errorMessage: `Unable to execute the asset ${did} with agreement id: ${agreementId}: ${ + (error as Error).message + }` + } + } +} diff --git a/src/commands/compute/logsJob.ts b/src/commands/compute/logsJob.ts new file mode 100644 index 0000000..5244142 --- /dev/null +++ b/src/commands/compute/logsJob.ts @@ -0,0 +1,44 @@ +import { Account, Nevermined } from '@nevermined-io/nevermined-sdk-js' +import { StatusCodes } from '../../utils' +import chalk from 'chalk' +import { Logger } from 'log4js' +import { ExecutionOutput } from '../../models/ExecutionOutput' +import { ConfigEntry } from '../../models/ConfigDefinition' + +export const logsJob = async ( + nvm: Nevermined, + account: Account, + argv: any, + config: ConfigEntry, + logger: Logger +): Promise => { + const { agreementId, jobId } = argv + + logger.info( + chalk.dim(`Fetching logs of jobId: ${jobId} and agreement id: ${agreementId}`) + ) + + try { + const computeLogs = await nvm.assets.computeLogs(agreementId, jobId, account) + + logger.info( + chalk.dim(`Logs for ${jobId} fetched correctly`) + ) + + return { + status: StatusCodes.OK, + results: JSON.stringify({ + agreementId, + jobId, + computeLogs + }) + } + }catch (error) { + return { + status: StatusCodes.ERROR, + errorMessage: `Unable to fetch the logs of jobId: ${jobId} and agreement id: ${agreementId}: ${ + (error as Error).message + }` + } + } +} diff --git a/src/commands/compute/statusJob.ts b/src/commands/compute/statusJob.ts new file mode 100644 index 0000000..9b670fe --- /dev/null +++ b/src/commands/compute/statusJob.ts @@ -0,0 +1,45 @@ +import { Account, Nevermined } from '@nevermined-io/nevermined-sdk-js' +import { StatusCodes } from '../../utils' +import chalk from 'chalk' +import { Logger } from 'log4js' +import { ExecutionOutput } from '../../models/ExecutionOutput' +import { ConfigEntry } from '../../models/ConfigDefinition' + +export const statusJob = async ( + nvm: Nevermined, + account: Account, + argv: any, + config: ConfigEntry, + logger: Logger +): Promise => { + const { agreementId, jobId } = argv + + logger.info( + chalk.dim(`Fetching status of jobId: ${jobId} and agreement id: ${agreementId}`) + ) + + try{ + + const computeStatus = await nvm.assets.computeStatus(agreementId, jobId, account) + + logger.info( + chalk.dim(`Status fetched: ${computeStatus}`) + ) + + return { + status: StatusCodes.OK, + results: JSON.stringify({ + agreementId, + jobId, + computeStatus + }) + } + } catch (error) { + return { + status: StatusCodes.ERROR, + errorMessage: `Unable to fetch the status of jobId: ${jobId} and agreement id: ${agreementId}: ${ + (error as Error).message + }` + } + } +} diff --git a/src/commands/index.ts b/src/commands/index.ts index c122f15..ff7d0ff 100644 --- a/src/commands/index.ts +++ b/src/commands/index.ts @@ -20,6 +20,11 @@ export * from './nfts/transferNft' export * from './nfts/downloadNft' export * from './nfts/accessNft' +// compute +export * from './compute/execCompute' +export * from './compute/statusJob' +export * from './compute/logsJob' + // accounts export * from './accounts/new' export * from './accounts/list' diff --git a/src/utils/config.ts b/src/utils/config.ts index 95a7fa0..ade55ed 100644 --- a/src/utils/config.ts +++ b/src/utils/config.ts @@ -178,32 +178,52 @@ export function getConfig( process.env.KEYFILE_PATH!, process.env.KEYFILE_PASSWORD! ) - hdWalletProvider = new HDWalletProvider( - [ + hdWalletProvider = new HDWalletProvider({ + privateKeys: [ getPrivateKey( process.env.KEYFILE_PATH!, process.env.KEYFILE_PASSWORD! ) ], - config.nvm.web3ProviderUri - ) + providerOrUrl: config.nvm.web3ProviderUri, + }) + // hdWalletProvider = new HDWalletProvider( + // [ + // getPrivateKey( + // process.env.KEYFILE_PATH!, + // process.env.KEYFILE_PASSWORD! + // ) + // ], + // config.nvm.web3ProviderUri + // ) } else { signer = Wallet.fromMnemonic(config.seed!) - hdWalletProvider = new HDWalletProvider( - config.seed!, - config.nvm.web3ProviderUri, - accountIndex, - 10 - ) + hdWalletProvider = new HDWalletProvider({ + mnemonic: config.seed!, + providerOrUrl: config.nvm.web3ProviderUri, + addressIndex: accountIndex, + numberOfAddresses: 10 + }) + // hdWalletProvider = new HDWalletProvider( + // config.seed!, + // config.nvm.web3ProviderUri, + // accountIndex, + // 10 + // ) } } else { signer = Wallet.fromMnemonic(DUMMY_SEED_WORDS) - hdWalletProvider = new HDWalletProvider( - DUMMY_SEED_WORDS, - config.nvm.web3ProviderUri, - 0, - 1 - ) + hdWalletProvider = new HDWalletProvider({ + mnemonic: DUMMY_SEED_WORDS, + providerOrUrl: config.nvm.web3ProviderUri, + addressIndex: 0, + numberOfAddresses: 1 + }) + // DUMMY_SEED_WORDS, + // config.nvm.web3ProviderUri, + // 0, + // 1 + // ) } return { diff --git a/test/helpers/Config.ts b/test/helpers/Config.ts index eab3f39..a7796b8 100644 --- a/test/helpers/Config.ts +++ b/test/helpers/Config.ts @@ -110,5 +110,11 @@ export const baseCommands = { decrypt: `${BASE_COMMAND} ${VERBOSE} utils decrypt `, publishMetadata: `${BASE_COMMAND} ${VERBOSE} utils publish-nft-metadata `, getMetadata: `${BASE_COMMAND} ${VERBOSE} utils get-nft-metadata ` + }, + compute: { + order: `${BASE_COMMAND} ${VERBOSE} compute order`, + execute: `${BASE_COMMAND} ${VERBOSE} compute execute`, + status: `${BASE_COMMAND} ${VERBOSE} compute status`, + logs: `${BASE_COMMAND} ${VERBOSE} compute logs` } } diff --git a/test/helpers/StdoutParser.ts b/test/helpers/StdoutParser.ts index 2867e84..7d061fa 100644 --- a/test/helpers/StdoutParser.ts +++ b/test/helpers/StdoutParser.ts @@ -30,6 +30,11 @@ export const commandRegex = { }, utils: { upload: new RegExp('URL: (.*)\\nPassword: (.*)\\n', 'gm') + }, + compute: { + execute: new RegExp('.*Created Job (.*)', 'gm'), + algorithm: new RegExp('.*Created Asset.(.{71}).*', 'g'), + workflow: new RegExp('.*Created Asset.(.{71}).*', 'g') } } @@ -149,3 +154,28 @@ export const parseAddressOfContractDeployed = (stdout: string): string => { } return '' } + +export const parseComputeJobId = (stdout: string): string => { + const jobId = commandRegex.compute.execute.exec(stdout) + if (jobId != null) { + return jobId[1] + } + return '' +} + +export const parseDIDFromNewAlgorithm = (stdout: string): string => { + const did = commandRegex.compute.algorithm.exec(stdout) + if (did != null) { + return did[1] + } + return '' +} + +export const parseDIDFromNewWorkflow = (stdout: string): string => { + const did = commandRegex.compute.workflow.exec(stdout) + if (did != null) { + return did[1] + } + return '' +} + diff --git a/test/integration-tests/Compute.test.ts b/test/integration-tests/Compute.test.ts new file mode 100644 index 0000000..dab00a5 --- /dev/null +++ b/test/integration-tests/Compute.test.ts @@ -0,0 +1,135 @@ +import { execOpts, metadataConfig, baseCommands } from '../helpers/Config' +import { + parseDIDFromNewAsset, + parseServiceAgreementId, + parseComputeJobId, + parseDIDFromNewAlgorithm, + parseDIDFromNewWorkflow +} from '../helpers/StdoutParser' +import execCommand from '../helpers/ExecCommand' + +describe('Compute e2e Testing', () => { + let did = '' + let algoDid = '' + let workflowDid = '' + let agreementId: string | null= '' + let jobId = '' + + beforeAll(async () => { + console.log(`NETWORK: ${execOpts.env.NETWORK}`) + try { + if ( + execOpts.env.NETWORK === 'spree' || + execOpts.env.NETWORK === 'geth-localnet' || + execOpts.env.NETWORK === 'polygon-localnet' + ) { + console.log( + `Funding accounts: ${execOpts.accounts[0]} + ${execOpts.accounts[1]}` + ) + + let fundCommand = `${baseCommands.accounts.fund} "${execOpts.accounts[0]}" --token both` + console.log(fundCommand) + console.log(execCommand(fundCommand, execOpts)) + fundCommand = `${baseCommands.accounts.fund} "${execOpts.accounts[1]}" --token both` + console.log(fundCommand) + console.log(execCommand(fundCommand, execOpts)) + } + } catch (error) { + console.warn(`Unable to fund accounts`) + } + + }) + + test.skip('Registering a new compute asset and resolve the DID', async () => { + + const registerAssetCommand = `${baseCommands.assets.registerAsset} --assetType compute --accountIndex 0 --name "${metadataConfig.name}" --author "${metadataConfig.author}" --price "${metadataConfig.price}" --urls ${metadataConfig.url} --contentType ${metadataConfig.contentType}` + console.debug(`COMMAND: ${registerAssetCommand}`) + + const registerStdout = execCommand(registerAssetCommand, execOpts) + + console.log(`STDOUT: ${registerStdout}`) + did = parseDIDFromNewAsset(registerStdout) + console.log(`DID: ${did}`) + expect(did === '' ? false : did.startsWith('did:nv:')) + + const resolveDIDCommand = `${baseCommands.assets.resolveDID} ${did}` + const stdoutResolve = execCommand(resolveDIDCommand, execOpts) + + console.log(`Resolved: ${stdoutResolve}`) + expect(stdoutResolve.includes('DID found')) + expect(stdoutResolve.includes(did)) + }) + + test.skip('Order a compute DDO', async () => { + + const orderComputeCommand = `${baseCommands.compute.order} ${did}` + + const stdout = execCommand(orderComputeCommand, execOpts) + + console.log(`STDOUT: ${stdout}`) + agreementId = parseServiceAgreementId(stdout) + console.log(`Agreement Id: ${agreementId}`) + expect(agreementId === null ? false : agreementId.startsWith('0x')) + + }) + + test.skip('Registering a new algorithm', async () => { + const registerAlgorithmCommand = `${baseCommands.assets.registerAlgorithm} --name "Test Algorithm" --author "${metadataConfig.author}" --price "0" --language "python" --entrypoint "python word_count.py" --container "python:3.8-alpine" --urls ${metadataConfig.url} --contentType ${metadataConfig.contentType}` + console.debug(`COMMAND: ${registerAlgorithmCommand}`) + + const stdout = execCommand(registerAlgorithmCommand, execOpts) + + console.log(`STDOUT: ${stdout}`) + algoDid = parseDIDFromNewAlgorithm(stdout) + console.log(`ALGO DID: ${algoDid}`) + expect(algoDid === null ? false : algoDid.startsWith('did:nv:')) + }) + + test.skip('Registering a workflow associating the data and the algorithm', async () => { + + const registerWorkflowCommand = `${baseCommands.assets.registerWorkflow} --name "Test Worfklow" --author "${metadataConfig.author}" --price "0" --input ${did} --algorithm ${algoDid}` + + const stdout = execCommand(registerWorkflowCommand, execOpts) + + console.log(`STDOUT: ${stdout}`) + workflowDid = parseDIDFromNewWorkflow(stdout) + console.log(`WORKFLOW DID: ${workflowDid}`) + expect(workflowDid === null ? false : workflowDid.startsWith('did:nv:')) + + }) + + + test.skip('Executing a compute job', async () => { + + const execComputeCommand = `${baseCommands.compute.execute} ${workflowDid} --agreementId ${agreementId}` + + const stdout = execCommand(execComputeCommand, execOpts) + + console.log(`STDOUT: ${stdout}`) + jobId = parseComputeJobId(stdout) + console.log(`jobId: ${jobId}`) + expect(jobId !== null) + + }) + + const sleep = (ms: number) => new Promise(r => setTimeout(r, ms)) + + test.skip('Fetching status of a compute job', async () => { + + // wait a couple of seconds to make sure the status of the job is created + await sleep(4000) + + const execComputeCommand = `${baseCommands.compute.status} --jobId ${jobId} --agreementId ${agreementId}` + const stdout = execCommand(execComputeCommand, execOpts) + expect(stdout.includes('Status fetched:')) + + }) + + test.skip('Fetching logs of a compute job', async () => { + + const execComputeCommand = `${baseCommands.compute.logs} --jobId ${jobId} --agreementId ${agreementId}` + const stdout = execCommand(execComputeCommand, execOpts) + expect(stdout.includes(`Logs for ${jobId} fetched correctly`)) + + }) +})