diff --git a/.github/workflows/create-release.yml b/.github/workflows/create-release.yml index 2f18f24993..a555e8c012 100644 --- a/.github/workflows/create-release.yml +++ b/.github/workflows/create-release.yml @@ -39,7 +39,7 @@ jobs: tar czvf joystream-node-macos.tar.gz -C ./target/release joystream-node - name: Temporarily save node binary - uses: actions/upload-artifact@v3 + uses: actions/upload-artifact@v4 with: name: joystream-node-macos-${{ steps.compute_shasum.outputs.shasum }} path: joystream-node-macos.tar.gz @@ -80,7 +80,7 @@ jobs: tar -czvf joystream-node-$VERSION_AND_COMMIT-arm64-linux-gnu.tar.gz joystream-node - name: Retrieve saved MacOS binary - uses: actions/download-artifact@v3 + uses: actions/download-artifact@v4 with: name: joystream-node-macos-${{ steps.compute_shasum.outputs.shasum }} diff --git a/.github/workflows/deploy-node-network.yml b/.github/workflows/deploy-node-network.yml index 5d51135494..23f02eb537 100644 --- a/.github/workflows/deploy-node-network.yml +++ b/.github/workflows/deploy-node-network.yml @@ -168,7 +168,7 @@ jobs: 7z a -p${{ steps.network_config.outputs.encryptionKey }} chain-data.7z deploy_artifacts/* - name: Save the output as an artifact - uses: actions/upload-artifact@v3 + uses: actions/upload-artifact@v4 with: name: data-chainspec-auth path: devops/ansible/chain-data.7z diff --git a/.github/workflows/deploy-playground.yml b/.github/workflows/deploy-playground.yml index 2c5abe6813..f15e40cf1e 100644 --- a/.github/workflows/deploy-playground.yml +++ b/.github/workflows/deploy-playground.yml @@ -34,7 +34,7 @@ on: description: 'SURI of treasury account' required: false default: '//Alice' - initialBalances: + initialBalances: description: 'JSON string or http URL to override initial balances and vesting config' default: '' required: false @@ -112,7 +112,7 @@ jobs: --verbose - name: Save the endpoints file as an artifact - uses: actions/upload-artifact@v3 + uses: actions/upload-artifact@v4 with: name: endpoints path: devops/ansible/endpoints.json diff --git a/.github/workflows/run-network-tests.yml b/.github/workflows/run-network-tests.yml index fef9b8be46..89510acf38 100644 --- a/.github/workflows/run-network-tests.yml +++ b/.github/workflows/run-network-tests.yml @@ -141,7 +141,7 @@ jobs: if: steps.check_files.outputs.files_exists == 'false' - name: Save joystream/node image to Artifacts - uses: actions/upload-artifact@v3 + uses: actions/upload-artifact@v4 with: name: ${{ steps.compute_shasum.outputs.shasum }}-joystream-node-docker-image.tar.gz path: joystream-node-docker-image.tar.gz @@ -152,7 +152,7 @@ jobs: runs-on: ubuntu-latest strategy: matrix: - scenario: ['full', 'setupNewChain', 'setupNewChainMultiStorage', 'bonding', 'storageSync'] + scenario: ['full', 'setupNewChain', 'setupNewChainMultiStorage', 'bonding', 'storage'] include: - scenario: 'full' no_storage: 'false' @@ -160,13 +160,15 @@ jobs: no_storage: 'true' - scenario: 'setupNewChainMultiStorage' no_storage: 'true' + - scenario: 'storage' + cleanup_interval: '1' steps: - uses: actions/checkout@v4 - uses: actions/setup-node@v4 with: node-version: '18.x' - name: Get artifacts - uses: actions/download-artifact@v3 + uses: actions/download-artifact@v4 with: name: ${{ needs.build_images.outputs.use_artifact }} - name: Install artifacts @@ -182,4 +184,5 @@ jobs: run: | export RUNTIME=${{ needs.build_images.outputs.runtime }} export NO_STORAGE=${{ matrix.no_storage }} + export CLEANUP_INTERVAL=${{ matrix.cleanup_interval }} tests/network-tests/run-tests.sh ${{ matrix.scenario }} diff 
--git a/devops/extrinsic-ordering/tx-ordering.yml b/devops/extrinsic-ordering/tx-ordering.yml index 0613ed04d3..fbe514174e 100644 --- a/devops/extrinsic-ordering/tx-ordering.yml +++ b/devops/extrinsic-ordering/tx-ordering.yml @@ -74,7 +74,7 @@ jobs: run: pkill polkadot - name: Save output as artifact - uses: actions/upload-artifact@v3 + uses: actions/upload-artifact@v4 with: name: ${{ env.CHAIN }} path: | diff --git a/docker-compose.yml b/docker-compose.yml index dfaf84feb6..2641db4e87 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -8,8 +8,8 @@ services: - chain-data:/data environment: - CHAIN=${CHAIN} - command: "--chain ${CHAIN:-dev} --alice --validator --pruning=archive --unsafe-ws-external --unsafe-rpc-external - --rpc-methods Safe --rpc-cors=all --log runtime --base-path /data --no-hardware-benchmarks" + command: '--chain ${CHAIN:-dev} --alice --validator --pruning=archive --unsafe-ws-external --unsafe-rpc-external + --rpc-methods Safe --rpc-cors=all --log runtime --base-path /data --no-hardware-benchmarks' ports: - 9944:9944 - 9933:9933 @@ -35,16 +35,25 @@ services: - ACCOUNT_URI=${COLOSSUS_1_TRANSACTOR_URI} - OTEL_EXPORTER_OTLP_ENDPOINT=${TELEMETRY_ENDPOINT} - OTEL_RESOURCE_ATTRIBUTES=service.name=colossus-1,deployment.environment=production + - CLEANUP + - CLEANUP_INTERVAL + - CLEANUP_NEW_OBJECT_EXPIRATION_PERIOD + - CLEANUP_MIN_REPLICATION_THRESHOLD entrypoint: ['/joystream/entrypoints/storage.sh'] - command: [ - 'server', '--worker=${COLOSSUS_1_WORKER_ID}', '--port=3333', '--uploads=/data/uploads/', - '--sync', '--syncInterval=1', - '--storageSquidEndpoint=${COLOSSUS_STORAGE_SQUID_URL}', - '--apiUrl=${JOYSTREAM_NODE_WS}', - '--logFilePath=/logs', - '--tempFolder=/data/temp/', - '--pendingFolder=/data/pending/' - ] + command: + [ + 'server', + '--worker=${COLOSSUS_1_WORKER_ID}', + '--port=3333', + '--uploads=/data/uploads/', + '--sync', + '--syncInterval=1', + '--storageSquidEndpoint=${COLOSSUS_STORAGE_SQUID_URL}', + '--apiUrl=${JOYSTREAM_NODE_WS}', + '--logFilePath=/logs', + '--tempFolder=/data/temp/', + '--pendingFolder=/data/pending/', + ] distributor-1: image: node:18 @@ -68,7 +77,7 @@ services: environment: JOYSTREAM_DISTRIBUTOR__ID: distributor-1 JOYSTREAM_DISTRIBUTOR__ENDPOINTS__STORAGE_SQUID: ${DISTRIBUTOR_STORAGE_SQUID_URL} - JOYSTREAM_DISTRIBUTOR__KEYS: "[{\"suri\":\"${DISTRIBUTOR_1_ACCOUNT_URI}\"}]" + JOYSTREAM_DISTRIBUTOR__KEYS: '[{"suri":"${DISTRIBUTOR_1_ACCOUNT_URI}"}]' JOYSTREAM_DISTRIBUTOR__WORKER_ID: ${DISTRIBUTOR_1_WORKER_ID} JOYSTREAM_DISTRIBUTOR__PUBLIC_API__PORT: 3334 JOYSTREAM_DISTRIBUTOR__OPERATOR_API__PORT: 4334 @@ -105,22 +114,25 @@ services: environment: # ACCOUNT_URI overrides command line arg --accountUri - ACCOUNT_URI=${COLOSSUS_2_TRANSACTOR_URI} - # Env that allows testing cleanup - - CLEANUP_NEW_OBJECT_EXPIRATION_PERIOD=10 - - CLEANUP_MIN_REPLICATION_THRESHOLD=1 + - CLEANUP + - CLEANUP_INTERVAL + - CLEANUP_NEW_OBJECT_EXPIRATION_PERIOD + - CLEANUP_MIN_REPLICATION_THRESHOLD entrypoint: ['yarn', 'storage-node'] - command: [ - 'server', '--worker=${COLOSSUS_2_WORKER_ID}', '--port=3333', '--uploads=/data/uploads', - '--sync', '--syncInterval=1', - '--storageSquidEndpoint=${COLOSSUS_STORAGE_SQUID_URL}', - '--apiUrl=${JOYSTREAM_NODE_WS}', - '--logFilePath=/logs', - '--tempFolder=/data/temp/', - '--pendingFolder=/data/pending/', - # Use cleanup on colossus-2 for testing purposes - '--cleanup', - '--cleanupInterval=1' - ] + command: + [ + 'server', + '--worker=${COLOSSUS_2_WORKER_ID}', + '--port=3333', + '--uploads=/data/uploads', + '--sync', + 
'--syncInterval=1', + '--storageSquidEndpoint=${COLOSSUS_STORAGE_SQUID_URL}', + '--apiUrl=${JOYSTREAM_NODE_WS}', + '--logFilePath=/logs', + '--tempFolder=/data/temp/', + '--pendingFolder=/data/pending/', + ] distributor-2: image: node:18 @@ -144,7 +156,7 @@ services: environment: JOYSTREAM_DISTRIBUTOR__ID: distributor-2 JOYSTREAM_DISTRIBUTOR__ENDPOINTS__STORAGE_SQUID: ${DISTRIBUTOR_STORAGE_SQUID_URL} - JOYSTREAM_DISTRIBUTOR__KEYS: "[{\"suri\":\"${DISTRIBUTOR_2_ACCOUNT_URI}\"}]" + JOYSTREAM_DISTRIBUTOR__KEYS: '[{"suri":"${DISTRIBUTOR_2_ACCOUNT_URI}"}]' JOYSTREAM_DISTRIBUTOR__WORKER_ID: ${DISTRIBUTOR_2_WORKER_ID} JOYSTREAM_DISTRIBUTOR__PUBLIC_API__PORT: 3334 JOYSTREAM_DISTRIBUTOR__OPERATOR_API__PORT: 4334 @@ -192,8 +204,8 @@ services: - OTEL_EXPORTER_OTLP_ENDPOINT=${TELEMETRY_ENDPOINT} - OTEL_RESOURCE_ATTRIBUTES=service.name=query-node,deployment.environment=production ports: - - "${GRAPHQL_SERVER_PORT}:${GRAPHQL_SERVER_PORT}" - - "127.0.0.1:${PROCESSOR_STATE_APP_PORT}:${PROCESSOR_STATE_APP_PORT}" + - '${GRAPHQL_SERVER_PORT}:${GRAPHQL_SERVER_PORT}' + - '127.0.0.1:${PROCESSOR_STATE_APP_PORT}:${PROCESSOR_STATE_APP_PORT}' depends_on: - db volumes: @@ -275,7 +287,7 @@ services: - PORT=${HYDRA_INDEXER_GATEWAY_PORT} - PGSSLMODE=disable ports: - - "${HYDRA_INDEXER_GATEWAY_PORT}:${HYDRA_INDEXER_GATEWAY_PORT}" + - '${HYDRA_INDEXER_GATEWAY_PORT}:${HYDRA_INDEXER_GATEWAY_PORT}' depends_on: - db - redis @@ -285,7 +297,7 @@ services: container_name: redis restart: unless-stopped ports: - - "127.0.0.1:6379:6379" + - '127.0.0.1:6379:6379' faucet: image: joystream/faucet:carthage @@ -304,7 +316,7 @@ services: - BALANCE_CREDIT=${BALANCE_CREDIT} - BALANCE_LOCKED=${BALANCE_LOCKED} ports: - - "3002:3002" + - '3002:3002' # PostgerSQL database for Orion orion-db: @@ -437,10 +449,7 @@ services: environment: DATABASE_MAX_CONNECTIONS: 5 RUST_LOG: 'actix_web=info,actix_server=info' - command: [ - '--database-url', - 'postgres://postgres:postgres@orion_archive_db:${ARCHIVE_DB_PORT}/squid-archive', - ] + command: ['--database-url', 'postgres://postgres:postgres@orion_archive_db:${ARCHIVE_DB_PORT}/squid-archive'] ports: - '127.0.0.1:${ARCHIVE_GATEWAY_PORT}:8000' - '[::1]:${ARCHIVE_GATEWAY_PORT}:8000' diff --git a/storage-node/CHANGELOG.md b/storage-node/CHANGELOG.md index 193e9b6a88..f2e953af07 100644 --- a/storage-node/CHANGELOG.md +++ b/storage-node/CHANGELOG.md @@ -1,10 +1,30 @@ +### 4.5.0 + +#### Features + +- New commands to help storage bags / buckets management: + + - `leader:set-replication` - allows adjusting bag-to-bucket assignments in order to achieve a target replication rate. + - `leader:copy-bags` - allows copying all bags from one bucket / set of buckets to a different bucket / set of buckets. + - `leader:empty-bucket` - allows removing all bags from a given bucket. + + All of those commands support generating detailed summaries of planned / executed changes in the storage system thanks to the new `BagsUpdateCreator` and `BagsUpdateSummaryCreator` services. + +- Adds a possibility to set `CLEANUP` and `CLEANUP_INTERVAL` via env in the `server` command. + +#### Small / internal changes + +- Fixes Colossus docker build by removing a deprecated [`@types/winston`](https://www.npmjs.com/package/@types/winston) package. +- Adds a few new utility functions (`stringifyBagId`, `cmpBagId`, `isEvent`, `asStorageSize`, `getBatchResults`). +- Updates `updateStorageBucketsForBags` to rely on the new `getBatchResults` utility function. 
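
(Editor's illustrative note, not part of this patch.) The new `stringifyBagId` and `cmpBagId` helpers mentioned above (exported as `stringifyBagId` / `cmpBagIds` in the `storage-node/src/services/helpers/bagTypes.ts` hunk further below) convert a runtime `BagId` codec into the squid/query-node string form and provide a deterministic ordering. A minimal usage sketch, assuming a connected `ApiPromise` with the Joystream type augmentations; the import path is illustrative:

```typescript
import { ApiPromise } from '@polkadot/api'
// Illustrative path; the helpers live in storage-node/src/services/helpers/bagTypes.ts
import { cmpBagIds, stringifyBagId } from './services/helpers/bagTypes'

// List all on-chain storage bags in a deterministic order, using the
// squid/query-node compatible string ids (e.g. "dynamic:channel:123", "static:council").
async function listBagIds(api: ApiPromise): Promise<string[]> {
  const entries = await api.query.storage.bags.entries()
  return entries
    .map(([storageKey]) => storageKey.args[0]) // BagId codec (first storage key argument)
    .sort(cmpBagIds)                           // hex-value comparison, as in BagsUpdateCreator.setBags()
    .map(stringifyBagId)
}
```

The same ordering is what `BagsUpdateCreator` uses ("Sort entries to ensure deterministic results") so that batch composition stays stable across runs.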
+ ### 4.4.0 - **Optimizations:** The way data objects / data object ids are queried and processed during sync and cleanup has been optimized: - - Sync and cleanup services now process tasks in batches of configurable size (`--syncBatchSize`, `--cleanupBatchSize`) to avoid overflowing the memory. - - Synchronous operations like `sort` or `filter` on larger arrays of data objects have been optimized (for example, by replacing `.filter(Array.includes(...))` with `.filter(Set.has(...))`). - - Enforced a limit of max. results per single GraphQL query to `10,000` and max input arguments per query to `1,000`. - - Added `--cleanupWorkersNumber` flag to limit the number of concurrent async requests during cleanup. + - Sync and cleanup services now process tasks in batches of configurable size (`--syncBatchSize`, `--cleanupBatchSize`) to avoid overflowing the memory. + - Synchronous operations like `sort` or `filter` on larger arrays of data objects have been optimized (for example, by replacing `.filter(Array.includes(...))` with `.filter(Set.has(...))`). + - Enforced a limit of max. results per single GraphQL query to `10,000` and max input arguments per query to `1,000`. + - Added `--cleanupWorkersNumber` flag to limit the number of concurrent async requests during cleanup. - A safety mechanism was added to avoid removing "deleted" objects for which a related `DataObjectDeleted` event cannot be found in storage squid. - Improved logging during sync and cleanup. diff --git a/storage-node/README.md b/storage-node/README.md index e7228f6c62..cac05587ef 100644 --- a/storage-node/README.md +++ b/storage-node/README.md @@ -150,12 +150,15 @@ There is also an option to run Colossus as [Docker container](../colossus.Docker * [`storage-node archive`](#storage-node-archive) * [`storage-node help [COMMAND]`](#storage-node-help-command) * [`storage-node leader:cancel-invite`](#storage-node-leadercancel-invite) +* [`storage-node leader:copy-bags`](#storage-node-leadercopy-bags) * [`storage-node leader:create-bucket`](#storage-node-leadercreate-bucket) * [`storage-node leader:delete-bucket`](#storage-node-leaderdelete-bucket) +* [`storage-node leader:empty-bucket`](#storage-node-leaderempty-bucket) * [`storage-node leader:invite-operator`](#storage-node-leaderinvite-operator) * [`storage-node leader:remove-operator`](#storage-node-leaderremove-operator) * [`storage-node leader:set-bucket-limits`](#storage-node-leaderset-bucket-limits) * [`storage-node leader:set-global-uploading-status`](#storage-node-leaderset-global-uploading-status) +* [`storage-node leader:set-replication`](#storage-node-leaderset-replication) * [`storage-node leader:update-bag-limit`](#storage-node-leaderupdate-bag-limit) * [`storage-node leader:update-bags`](#storage-node-leaderupdate-bags) * [`storage-node leader:update-blacklist`](#storage-node-leaderupdate-blacklist) @@ -305,7 +308,7 @@ OPTIONS [default: 4] Upload workers number (max async operations in progress). ``` -_See code: [src/commands/archive.ts](https://github.com/Joystream/joystream/blob/v4.4.0/src/commands/archive.ts)_ +_See code: [src/commands/archive.ts](https://github.com/Joystream/joystream/blob/v4.5.0/src/commands/archive.ts)_ ## `storage-node help [COMMAND]` @@ -349,7 +352,51 @@ OPTIONS --keyStore=keyStore Path to a folder with multiple key files to load into keystore. 
``` -_See code: [src/commands/leader/cancel-invite.ts](https://github.com/Joystream/joystream/blob/v4.4.0/src/commands/leader/cancel-invite.ts)_ +_See code: [src/commands/leader/cancel-invite.ts](https://github.com/Joystream/joystream/blob/v4.5.0/src/commands/leader/cancel-invite.ts)_ + +## `storage-node leader:copy-bags` + +Copy all storage bags from a given bucket / set of buckets to a different bucket / set of buckets + +``` +USAGE + $ storage-node leader:copy-bags + +OPTIONS + -b, --batchSize=batchSize [default: 100] Number of extrinsics to send in a single utility.batch call + -h, --help show CLI help + -k, --keyFile=keyFile Path to key file to add to the keyring. + -m, --dev Use development mode + -o, --output=output Output result to a file (based on the provided path) instead of stdout + + -p, --password=password Password to unlock keyfiles. Multiple passwords can be passed, to try against all files. + If not specified a single password can be set in ACCOUNT_PWD environment variable. + + -u, --apiUrl=apiUrl [default: ws://localhost:9944] Runtime API URL. Mandatory in non-dev environment. + + -y, --accountUri=accountUri Account URI (optional). If not specified a single key can be set in ACCOUNT_URI + environment variable. + + --copies=copies [default: 1] Number of copies to make (by default each bag is only copied once, to a + single, selected destination bucket) + + --dryRun Assumes all transactions were successful and generates the summary + + --from=from (required) List of bucket ids to copy bags from + + --keyStore=keyStore Path to a folder with multiple key files to load into keystore. + + --skipBucketsSummary Whether to skip a summary of changes by each individual bucket in the final result + + --skipConfirmation Skips asking for confirmation before sending transactions + + --skipTxSummary Whether to skip a summary of changes by each individual batch transaction in the final + result + + --to=to (required) List of bucket ids to copy bags to +``` + +_See code: [src/commands/leader/copy-bags.ts](https://github.com/Joystream/joystream/blob/v4.5.0/src/commands/leader/copy-bags.ts)_ ## `storage-node leader:create-bucket` @@ -380,7 +427,7 @@ OPTIONS --keyStore=keyStore Path to a folder with multiple key files to load into keystore. ``` -_See code: [src/commands/leader/create-bucket.ts](https://github.com/Joystream/joystream/blob/v4.4.0/src/commands/leader/create-bucket.ts)_ +_See code: [src/commands/leader/create-bucket.ts](https://github.com/Joystream/joystream/blob/v4.5.0/src/commands/leader/create-bucket.ts)_ ## `storage-node leader:delete-bucket` @@ -407,7 +454,46 @@ OPTIONS --keyStore=keyStore Path to a folder with multiple key files to load into keystore. ``` -_See code: [src/commands/leader/delete-bucket.ts](https://github.com/Joystream/joystream/blob/v4.4.0/src/commands/leader/delete-bucket.ts)_ +_See code: [src/commands/leader/delete-bucket.ts](https://github.com/Joystream/joystream/blob/v4.5.0/src/commands/leader/delete-bucket.ts)_ + +## `storage-node leader:empty-bucket` + +Removes all storage bags from a given bucket / set of buckets + +``` +USAGE + $ storage-node leader:empty-bucket + +OPTIONS + -b, --batchSize=batchSize [default: 100] Number of extrinsics to send in a single utility.batch call + -h, --help show CLI help + -k, --keyFile=keyFile Path to key file to add to the keyring. + -m, --dev Use development mode + -o, --output=output Output result to a file (based on the provided path) instead of stdout + + -p, --password=password Password to unlock keyfiles. 
Multiple passwords can be passed, to try against all files. + If not specified a single password can be set in ACCOUNT_PWD environment variable. + + -u, --apiUrl=apiUrl [default: ws://localhost:9944] Runtime API URL. Mandatory in non-dev environment. + + -y, --accountUri=accountUri Account URI (optional). If not specified a single key can be set in ACCOUNT_URI + environment variable. + + --dryRun Assumes all transactions were successful and generates the summary + + --id=id (required) Id of the bucket to remove bags from + + --keyStore=keyStore Path to a folder with multiple key files to load into keystore. + + --skipBucketsSummary Whether to skip a summary of changes by each individual bucket in the final result + + --skipConfirmation Skips asking for confirmation before sending transactions + + --skipTxSummary Whether to skip a summary of changes by each individual batch transaction in the final + result +``` + +_See code: [src/commands/leader/empty-bucket.ts](https://github.com/Joystream/joystream/blob/v4.5.0/src/commands/leader/empty-bucket.ts)_ ## `storage-node leader:invite-operator` @@ -436,7 +522,7 @@ OPTIONS --keyStore=keyStore Path to a folder with multiple key files to load into keystore. ``` -_See code: [src/commands/leader/invite-operator.ts](https://github.com/Joystream/joystream/blob/v4.4.0/src/commands/leader/invite-operator.ts)_ +_See code: [src/commands/leader/invite-operator.ts](https://github.com/Joystream/joystream/blob/v4.5.0/src/commands/leader/invite-operator.ts)_ ## `storage-node leader:remove-operator` @@ -463,7 +549,7 @@ OPTIONS --keyStore=keyStore Path to a folder with multiple key files to load into keystore. ``` -_See code: [src/commands/leader/remove-operator.ts](https://github.com/Joystream/joystream/blob/v4.4.0/src/commands/leader/remove-operator.ts)_ +_See code: [src/commands/leader/remove-operator.ts](https://github.com/Joystream/joystream/blob/v4.5.0/src/commands/leader/remove-operator.ts)_ ## `storage-node leader:set-bucket-limits` @@ -493,7 +579,7 @@ OPTIONS --keyStore=keyStore Path to a folder with multiple key files to load into keystore. ``` -_See code: [src/commands/leader/set-bucket-limits.ts](https://github.com/Joystream/joystream/blob/v4.4.0/src/commands/leader/set-bucket-limits.ts)_ +_See code: [src/commands/leader/set-bucket-limits.ts](https://github.com/Joystream/joystream/blob/v4.5.0/src/commands/leader/set-bucket-limits.ts)_ ## `storage-node leader:set-global-uploading-status` @@ -521,7 +607,47 @@ OPTIONS --keyStore=keyStore Path to a folder with multiple key files to load into keystore. ``` -_See code: [src/commands/leader/set-global-uploading-status.ts](https://github.com/Joystream/joystream/blob/v4.4.0/src/commands/leader/set-global-uploading-status.ts)_ +_See code: [src/commands/leader/set-global-uploading-status.ts](https://github.com/Joystream/joystream/blob/v4.5.0/src/commands/leader/set-global-uploading-status.ts)_ + +## `storage-node leader:set-replication` + +Adjusts bag-to-bucket assignments to achieve a given replication rate. + +``` +USAGE + $ storage-node leader:set-replication + +OPTIONS + -a, --[no-]activeOnly Only take active buckets into account when calculating replication rate and updating bags + -b, --batchSize=batchSize [default: 100] Number of extrinsics to send in a single utility.batch call + -h, --help show CLI help + -k, --keyFile=keyFile Path to key file to add to the keyring. 
+ -m, --dev Use development mode + -o, --output=output Output result to a file (based on the provided path) instead of stdout + + -p, --password=password Password to unlock keyfiles. Multiple passwords can be passed, to try against all files. + If not specified a single password can be set in ACCOUNT_PWD environment variable. + + -r, --rate=rate (required) The target replication rate + + -u, --apiUrl=apiUrl [default: ws://localhost:9944] Runtime API URL. Mandatory in non-dev environment. + + -y, --accountUri=accountUri Account URI (optional). If not specified a single key can be set in ACCOUNT_URI + environment variable. + + --dryRun Assumes all transactions were successful and generates the summary + + --keyStore=keyStore Path to a folder with multiple key files to load into keystore. + + --skipBucketsSummary Whether to skip a summary of changes by each individual bucket in the final result + + --skipConfirmation Skips asking for confirmation before sending transactions + + --skipTxSummary Whether to skip a summary of changes by each individual batch transaction in the final + result +``` + +_See code: [src/commands/leader/set-replication.ts](https://github.com/Joystream/joystream/blob/v4.5.0/src/commands/leader/set-replication.ts)_ ## `storage-node leader:update-bag-limit` @@ -548,7 +674,7 @@ OPTIONS --keyStore=keyStore Path to a folder with multiple key files to load into keystore. ``` -_See code: [src/commands/leader/update-bag-limit.ts](https://github.com/Joystream/joystream/blob/v4.4.0/src/commands/leader/update-bag-limit.ts)_ +_See code: [src/commands/leader/update-bag-limit.ts](https://github.com/Joystream/joystream/blob/v4.5.0/src/commands/leader/update-bag-limit.ts)_ ## `storage-node leader:update-bags` @@ -604,7 +730,7 @@ OPTIONS Path to a folder with multiple key files to load into keystore. ``` -_See code: [src/commands/leader/update-bags.ts](https://github.com/Joystream/joystream/blob/v4.4.0/src/commands/leader/update-bags.ts)_ +_See code: [src/commands/leader/update-bags.ts](https://github.com/Joystream/joystream/blob/v4.5.0/src/commands/leader/update-bags.ts)_ ## `storage-node leader:update-blacklist` @@ -633,7 +759,7 @@ OPTIONS --keyStore=keyStore Path to a folder with multiple key files to load into keystore. ``` -_See code: [src/commands/leader/update-blacklist.ts](https://github.com/Joystream/joystream/blob/v4.4.0/src/commands/leader/update-blacklist.ts)_ +_See code: [src/commands/leader/update-blacklist.ts](https://github.com/Joystream/joystream/blob/v4.5.0/src/commands/leader/update-blacklist.ts)_ ## `storage-node leader:update-bucket-status` @@ -662,7 +788,7 @@ OPTIONS --keyStore=keyStore Path to a folder with multiple key files to load into keystore. ``` -_See code: [src/commands/leader/update-bucket-status.ts](https://github.com/Joystream/joystream/blob/v4.4.0/src/commands/leader/update-bucket-status.ts)_ +_See code: [src/commands/leader/update-bucket-status.ts](https://github.com/Joystream/joystream/blob/v4.5.0/src/commands/leader/update-bucket-status.ts)_ ## `storage-node leader:update-data-fee` @@ -689,7 +815,7 @@ OPTIONS --keyStore=keyStore Path to a folder with multiple key files to load into keystore. 
``` -_See code: [src/commands/leader/update-data-fee.ts](https://github.com/Joystream/joystream/blob/v4.4.0/src/commands/leader/update-data-fee.ts)_ +_See code: [src/commands/leader/update-data-fee.ts](https://github.com/Joystream/joystream/blob/v4.5.0/src/commands/leader/update-data-fee.ts)_ ## `storage-node leader:update-data-object-bloat-bond` @@ -717,7 +843,7 @@ OPTIONS --keyStore=keyStore Path to a folder with multiple key files to load into keystore. ``` -_See code: [src/commands/leader/update-data-object-bloat-bond.ts](https://github.com/Joystream/joystream/blob/v4.4.0/src/commands/leader/update-data-object-bloat-bond.ts)_ +_See code: [src/commands/leader/update-data-object-bloat-bond.ts](https://github.com/Joystream/joystream/blob/v4.5.0/src/commands/leader/update-data-object-bloat-bond.ts)_ ## `storage-node leader:update-dynamic-bag-policy` @@ -747,7 +873,7 @@ OPTIONS --keyStore=keyStore Path to a folder with multiple key files to load into keystore. ``` -_See code: [src/commands/leader/update-dynamic-bag-policy.ts](https://github.com/Joystream/joystream/blob/v4.4.0/src/commands/leader/update-dynamic-bag-policy.ts)_ +_See code: [src/commands/leader/update-dynamic-bag-policy.ts](https://github.com/Joystream/joystream/blob/v4.5.0/src/commands/leader/update-dynamic-bag-policy.ts)_ ## `storage-node leader:update-voucher-limits` @@ -776,7 +902,7 @@ OPTIONS --keyStore=keyStore Path to a folder with multiple key files to load into keystore. ``` -_See code: [src/commands/leader/update-voucher-limits.ts](https://github.com/Joystream/joystream/blob/v4.4.0/src/commands/leader/update-voucher-limits.ts)_ +_See code: [src/commands/leader/update-voucher-limits.ts](https://github.com/Joystream/joystream/blob/v4.5.0/src/commands/leader/update-voucher-limits.ts)_ ## `storage-node operator:accept-invitation` @@ -809,7 +935,7 @@ OPTIONS --keyStore=keyStore Path to a folder with multiple key files to load into keystore. ``` -_See code: [src/commands/operator/accept-invitation.ts](https://github.com/Joystream/joystream/blob/v4.4.0/src/commands/operator/accept-invitation.ts)_ +_See code: [src/commands/operator/accept-invitation.ts](https://github.com/Joystream/joystream/blob/v4.5.0/src/commands/operator/accept-invitation.ts)_ ## `storage-node operator:set-metadata` @@ -840,7 +966,7 @@ OPTIONS --keyStore=keyStore Path to a folder with multiple key files to load into keystore. ``` -_See code: [src/commands/operator/set-metadata.ts](https://github.com/Joystream/joystream/blob/v4.4.0/src/commands/operator/set-metadata.ts)_ +_See code: [src/commands/operator/set-metadata.ts](https://github.com/Joystream/joystream/blob/v4.5.0/src/commands/operator/set-metadata.ts)_ ## `storage-node server` @@ -914,6 +1040,12 @@ OPTIONS -z, --logFileChangeFrequency=(yearly|monthly|daily|hourly|none) [default: daily] Log files update frequency. + --cleanupBatchSize=cleanupBatchSize [default: 10000] Maximum number of objects to process + in a single batch during cleanup. + + --cleanupWorkersNumber=cleanupWorkersNumber [default: 100] Cleanup workers number (max async + operations in progress). + --elasticSearchIndexPrefix=elasticSearchIndexPrefix Elasticsearch index prefix. Node ID will be appended to the prefix. Default: logs-colossus. Can be passed through ELASTIC_INDEX_PREFIX environment variable. @@ -937,6 +1069,9 @@ OPTIONS If not specified a subfolder under the uploads directory will be used. + --syncBatchSize=syncBatchSize [default: 10000] Maximum number of objects to process + in a single batch during synchronization. 
+ --syncRetryInterval=syncRetryInterval [default: 3] Interval before retrying failed synchronization run (in minutes) @@ -946,7 +1081,7 @@ OPTIONS directory will be used. ``` -_See code: [src/commands/server.ts](https://github.com/Joystream/joystream/blob/v4.4.0/src/commands/server.ts)_ +_See code: [src/commands/server.ts](https://github.com/Joystream/joystream/blob/v4.5.0/src/commands/server.ts)_ ## `storage-node util:cleanup` @@ -981,10 +1116,13 @@ OPTIONS -y, --accountUri=accountUri Account URI (optional). If not specified a single key can be set in ACCOUNT_URI environment variable. + --cleanupBatchSize=cleanupBatchSize [default: 10000] Maximum number of objects to process in a single + batch during cleanup. + --keyStore=keyStore Path to a folder with multiple key files to load into keystore. ``` -_See code: [src/commands/util/cleanup.ts](https://github.com/Joystream/joystream/blob/v4.4.0/src/commands/util/cleanup.ts)_ +_See code: [src/commands/util/cleanup.ts](https://github.com/Joystream/joystream/blob/v4.5.0/src/commands/util/cleanup.ts)_ ## `storage-node util:fetch-bucket` @@ -1011,13 +1149,16 @@ OPTIONS -t, --syncWorkersTimeout=syncWorkersTimeout [default: 30] Asset downloading timeout for the syncronization (in minutes). + --syncBatchSize=syncBatchSize [default: 10000] Maximum number of objects to process in a single + batch. + --tempFolder=tempFolder Directory to store tempory files during sync and upload (absolute path). ,Temporary directory (absolute path). If not specified a subfolder under the uploads directory will be used. ``` -_See code: [src/commands/util/fetch-bucket.ts](https://github.com/Joystream/joystream/blob/v4.4.0/src/commands/util/fetch-bucket.ts)_ +_See code: [src/commands/util/fetch-bucket.ts](https://github.com/Joystream/joystream/blob/v4.5.0/src/commands/util/fetch-bucket.ts)_ ## `storage-node util:multihash` @@ -1032,7 +1173,7 @@ OPTIONS -h, --help show CLI help ``` -_See code: [src/commands/util/multihash.ts](https://github.com/Joystream/joystream/blob/v4.4.0/src/commands/util/multihash.ts)_ +_See code: [src/commands/util/multihash.ts](https://github.com/Joystream/joystream/blob/v4.5.0/src/commands/util/multihash.ts)_ ## `storage-node util:search-archives` @@ -1049,7 +1190,7 @@ OPTIONS -o, --dataObjects=dataObjects (required) List of the data object ids to look for (comma-separated) ``` -_See code: [src/commands/util/search-archives.ts](https://github.com/Joystream/joystream/blob/v4.4.0/src/commands/util/search-archives.ts)_ +_See code: [src/commands/util/search-archives.ts](https://github.com/Joystream/joystream/blob/v4.5.0/src/commands/util/search-archives.ts)_ ## `storage-node util:verify-bag-id` @@ -1077,5 +1218,5 @@ OPTIONS - dynamic:member:4 ``` -_See code: [src/commands/util/verify-bag-id.ts](https://github.com/Joystream/joystream/blob/v4.4.0/src/commands/util/verify-bag-id.ts)_ +_See code: [src/commands/util/verify-bag-id.ts](https://github.com/Joystream/joystream/blob/v4.5.0/src/commands/util/verify-bag-id.ts)_ diff --git a/storage-node/package.json b/storage-node/package.json index fb5127f42d..824fd2c204 100644 --- a/storage-node/package.json +++ b/storage-node/package.json @@ -1,7 +1,7 @@ { "name": "storage-node", "description": "Joystream storage subsystem.", - "version": "4.4.0", + "version": "4.5.0", "author": "Joystream contributors", "bin": { "storage-node": "./bin/run" @@ -20,6 +20,7 @@ "@oclif/plugin-help": "^3", "@polkadot/api": "10.7.1", "@types/base64url": "^2.0.0", + "@types/cli-progress": "^3.11.6", "@types/express": "4.17.13", 
"@types/file-type": "^10.9.1", "@types/lodash": "^4.14.171", @@ -33,12 +34,12 @@ "@types/superagent": "^4.1.12", "@types/url-join": "^4.0.1", "@types/uuid": "^8.3.1", - "@types/winston": "^2.4.4", "ajv": "^8.0.0", "await-lock": "^2.1.0", "base64url": "^3.0.1", "blake3-wasm": "^2.1.5", "chokidar": "4.0.1", + "cli-progress": "^3.12.0", "cors": "^2.8.5", "cross-fetch": "^3.1.4", "express": "4.17.1", diff --git a/storage-node/src/command-base/ApiCommandBase.ts b/storage-node/src/command-base/ApiCommandBase.ts index 8a803069c2..caa6d108f0 100644 --- a/storage-node/src/command-base/ApiCommandBase.ts +++ b/storage-node/src/command-base/ApiCommandBase.ts @@ -61,6 +61,9 @@ export default abstract class ApiCommandBase extends Command { async finally(err: Error | undefined): Promise { // called after run and catch regardless of whether or not the command errored // We'll force exit here, in case there is no error, to prevent console.log from hanging the process + if (err && process.env.DEBUG) { + console.error(err) + } if (!err) this.exit(0) super.finally(err) } diff --git a/storage-node/src/commands/leader/copy-bags.ts b/storage-node/src/commands/leader/copy-bags.ts new file mode 100644 index 0000000000..355fa42ad9 --- /dev/null +++ b/storage-node/src/commands/leader/copy-bags.ts @@ -0,0 +1,173 @@ +import _ from 'lodash' +import fs from 'fs/promises' +import readline from 'node:readline/promises' +import { stderr, stdin } from 'node:process' +import assert from 'node:assert' +import { SingleBar } from 'cli-progress' +import { flags } from '@oclif/command' +import ExitCodes from '../../command-base/ExitCodes' +import LeaderCommandBase from '../../command-base/LeaderCommandBase' +import logger from '../../services/logger' +import { getBatchResults, ParsedBatchCallResult } from '../../services/runtime/extrinsics' +import { stringifyBagId } from '../../services/helpers/bagTypes' +import { sendAndFollowNamedTx } from '../../services/runtime/api' +import { BagsUpdateCreator } from '../../services/helpers/bagsUpdate' + +/** + * CLI command: + * Copies storage bags from a given bucket / set of buckets to a different bucket / set of buckets. + * + * @remarks + * Storage working group leader command. Requires storage WG leader priviliges. 
+ * Shell command: "leader:copy-bags" + */ +export default class LeaderCopyBags extends LeaderCommandBase { + static description = `Copy all storage bags from a given bucket / set of buckets to a different bucket / set of buckets` + + static flags = { + from: flags.integer({ + multiple: true, + required: true, + description: 'List of bucket ids to copy bags from', + }), + to: flags.integer({ + multiple: true, + required: true, + description: 'List of bucket ids to copy bags to', + }), + copies: flags.integer({ + default: 1, + description: + 'Number of copies to make (by default each bag is only copied once, to a single, selected destination bucket)', + }), + batchSize: flags.integer({ + char: 'b', + default: 100, + description: 'Number of extrinsics to send in a single utility.batch call', + }), + dryRun: flags.boolean({ + default: false, + description: 'Assumes all transactions were successful and generates the summary', + }), + skipBucketsSummary: flags.boolean({ + default: false, + description: 'Whether to skip a summary of changes by each individual bucket in the final result', + }), + skipTxSummary: flags.boolean({ + default: false, + description: 'Whether to skip a summary of changes by each individual batch transaction in the final result', + }), + skipConfirmation: flags.boolean({ + default: false, + description: 'Skips asking for confirmation before sending transactions', + }), + output: flags.string({ + char: 'o', + description: 'Output result to a file (based on the provided path) instead of stdout', + }), + ...LeaderCommandBase.flags, + } + + async promptForConfirmation(): Promise { + const rl = readline.createInterface({ input: stdin, output: stderr }) + const confirmed = await rl.question('Are you sure you want to continue? (y/N) ') + rl.close() + + return confirmed === 'y' + } + + async run(): Promise { + const { + flags: { from, to, batchSize, skipBucketsSummary, skipTxSummary, output, dryRun, skipConfirmation, copies }, + } = this.parse(LeaderCopyBags) + + if (output) { + try { + await fs.writeFile(output, '') + } catch (e) { + logger.error(`Cannot access "${output}" for writing: ${e instanceof Error ? e.message : String(e)}`) + this.exit(ExitCodes.FileError) + } + } + + const api = await this.getApi() + + const bucketsIntersection = _.intersection(from, to).length + if (bucketsIntersection) { + this.error( + `--from and --to arrays cannot intersect! Values: ${JSON.stringify(bucketsIntersection)} appear in both arrays.` + ) + } + + const bucketsToFetch = [...from, ...to] + const bagsUpdateCreator = new BagsUpdateCreator(api) + + logger.info(`Fetching storage buckets (${JSON.stringify(bucketsToFetch)})...`) + await bagsUpdateCreator.loadBucketsByIds(bucketsToFetch) + logger.info(`${bagsUpdateCreator.loadedBucketsCount} storage buckets fetched.`) + + logger.info(`Fetching storage bags of bucket(s) ${from.join(', ')}...`) + await bagsUpdateCreator.loadBagsBy((bag) => from.some((bucketId) => bag.storedBy.has(bucketId))) + logger.info(`${bagsUpdateCreator.loadedBagsCount} storage bags found.`) + + logger.info(`Preparing storage bag updates...`) + bagsUpdateCreator.prepareUpdates((bag) => { + for (let i = 0; i < copies; ++i) { + const bucket = bagsUpdateCreator.pickBucketToAdd(bag, to) + if (!bucket) { + this.error( + `Cannot make ${i} copies of bag ${stringifyBagId(bag.id)}! No more buckets available... 
` + + `Provide more destination buckets, reduce the number of copies or increase storage bucket voucher limits.` + ) + } + bag.addBucket(bucket) + } + }) + const { modifiedBagsCount } = bagsUpdateCreator + logger.info(`${modifiedBagsCount} updates prepared.`) + + const summaryCreator = bagsUpdateCreator.getSummaryCreator({ + logger, + skipBucketsSummary, + skipTxSummary, + }) + + const extrinsicsWithBagUpdates = bagsUpdateCreator.prepareExtrinsics(batchSize) + logger.info( + `Will execute ${modifiedBagsCount} storage bag updates in ${extrinsicsWithBagUpdates.length} utility.forceBatch transactions` + ) + summaryCreator.printExpectedResults() + + const confirmed = skipConfirmation ? true : await this.promptForConfirmation() + + if (confirmed) { + const progressBar = new SingleBar({ noTTYOutput: true }) + progressBar.start(extrinsicsWithBagUpdates.length, 0, { title: `Executing the transactions...` }) + for (const [i, [batchTx, bagUpdates]] of extrinsicsWithBagUpdates.entries()) { + const batchResults: ParsedBatchCallResult[] | void = dryRun + ? Array.from({ length: bagUpdates.length }, () => ({ success: true })) + : await sendAndFollowNamedTx(api, this.getAccount(), batchTx, (result) => + getBatchResults(batchTx, api, result) + ) + assert(batchResults, `Could not parse utility.forceBatch results (tx: ${batchTx.hash.toJSON()})`) + summaryCreator.updateSummaryWithBatchResults( + dryRun ? `tx${i}_hash` : batchTx.hash.toJSON(), + batchResults, + bagUpdates + ) + progressBar.update(i + 1) + } + progressBar.stop() + + const summaryJson = summaryCreator.getSummaryJSON() + if (output) { + logger.info(`Writing output to ${output}...`) + await fs.writeFile(output, summaryJson) + } else { + console.log(summaryJson) + } + } + + this.exit() + } +} diff --git a/storage-node/src/commands/leader/empty-bucket.ts b/storage-node/src/commands/leader/empty-bucket.ts new file mode 100644 index 0000000000..905b6889fd --- /dev/null +++ b/storage-node/src/commands/leader/empty-bucket.ts @@ -0,0 +1,143 @@ +import fs from 'fs/promises' +import readline from 'node:readline/promises' +import { stderr, stdin } from 'node:process' +import assert from 'node:assert' +import { SingleBar } from 'cli-progress' +import { flags } from '@oclif/command' +import ExitCodes from '../../command-base/ExitCodes' +import LeaderCommandBase from '../../command-base/LeaderCommandBase' +import logger from '../../services/logger' +import { getBatchResults, ParsedBatchCallResult } from '../../services/runtime/extrinsics' +import { sendAndFollowNamedTx } from '../../services/runtime/api' +import { BagsUpdateCreator } from '../../services/helpers/bagsUpdate' + +/** + * CLI command: + * Removes all storage bags from a given bucket. + * + * @remarks + * Storage working group leader command. Requires storage WG leader priviliges. 
+ * Shell command: "leader:empty-bucket" + */ +export default class LeaderEmptyBucket extends LeaderCommandBase { + static description = `Removes all storage bags from a given bucket / set of buckets` + + static flags = { + id: flags.integer({ + required: true, + description: 'Id of the bucket to remove bags from', + }), + batchSize: flags.integer({ + char: 'b', + default: 100, + description: 'Number of extrinsics to send in a single utility.batch call', + }), + dryRun: flags.boolean({ + default: false, + description: 'Assumes all transactions were successful and generates the summary', + }), + skipBucketsSummary: flags.boolean({ + default: false, + description: 'Whether to skip a summary of changes by each individual bucket in the final result', + }), + skipTxSummary: flags.boolean({ + default: false, + description: 'Whether to skip a summary of changes by each individual batch transaction in the final result', + }), + skipConfirmation: flags.boolean({ + default: false, + description: 'Skips asking for confirmation before sending transactions', + }), + output: flags.string({ + char: 'o', + description: 'Output result to a file (based on the provided path) instead of stdout', + }), + ...LeaderCommandBase.flags, + } + + async promptForConfirmation(): Promise { + const rl = readline.createInterface({ input: stdin, output: stderr }) + const confirmed = await rl.question('Are you sure you want to continue? (y/N) ') + rl.close() + + return confirmed === 'y' + } + + async run(): Promise { + const { + flags: { id, batchSize, skipBucketsSummary, skipTxSummary, output, dryRun, skipConfirmation }, + } = this.parse(LeaderEmptyBucket) + + if (output) { + try { + await fs.writeFile(output, '') + } catch (e) { + logger.error(`Cannot access "${output}" for writing: ${e instanceof Error ? e.message : String(e)}`) + this.exit(ExitCodes.FileError) + } + } + + const api = await this.getApi() + + const bagsUpdateCreator = new BagsUpdateCreator(api) + logger.info(`Fetching storage bucket ${id}...`) + await bagsUpdateCreator.loadBucketsByIds([id]) + logger.info(`Storage bucket successfully fetched.`) + + logger.info(`Fetching storage bags of bucket ${id}...`) + await bagsUpdateCreator.loadBagsBy((bag) => bag.storedBy.has(id)) + logger.info(`${bagsUpdateCreator.loadedBagsCount} storage bags found.`) + + logger.info(`Preparing storage bag updates...`) + const bucket = bagsUpdateCreator.getBucket(id) + bagsUpdateCreator.prepareUpdates((bag) => { + bag.removeBucket(bucket) + }) + logger.info(`Updates prepared.`) + + const summaryCreator = bagsUpdateCreator.getSummaryCreator({ + logger, + skipBucketsSummary, + skipTxSummary, + }) + + const extrinsicsWithBagUpdates = bagsUpdateCreator.prepareExtrinsics(batchSize) + logger.info( + `Will execute ${bagsUpdateCreator.modifiedBagsCount} storage bag updates in` + + ` ${extrinsicsWithBagUpdates.length} utility.forceBatch transactions` + ) + summaryCreator.printExpectedResults() + + const confirmed = skipConfirmation ? true : await this.promptForConfirmation() + + if (confirmed) { + const progressBar = new SingleBar({ noTTYOutput: true }) + progressBar.start(extrinsicsWithBagUpdates.length, 0, { title: `Executing the transactions...` }) + for (const [i, [batchTx, bagUpdates]] of extrinsicsWithBagUpdates.entries()) { + const batchResults: ParsedBatchCallResult[] | void = dryRun + ? 
Array.from({ length: bagUpdates.length }, () => ({ success: true })) + : await sendAndFollowNamedTx(api, this.getAccount(), batchTx, (result) => + getBatchResults(batchTx, api, result) + ) + assert(batchResults, `Could not parse utility.forceBatch results (tx: ${batchTx.hash.toJSON()})`) + summaryCreator.updateSummaryWithBatchResults( + dryRun ? `tx${i}_hash` : batchTx.hash.toJSON(), + batchResults, + bagUpdates + ) + progressBar.update(i + 1) + } + progressBar.stop() + + const summaryJson = summaryCreator.getSummaryJSON() + if (output) { + logger.info(`Writing output to ${output}...`) + await fs.writeFile(output, summaryJson) + } else { + console.log(summaryJson) + } + } + + this.exit() + } +} diff --git a/storage-node/src/commands/leader/set-replication.ts b/storage-node/src/commands/leader/set-replication.ts new file mode 100644 index 0000000000..86206999fc --- /dev/null +++ b/storage-node/src/commands/leader/set-replication.ts @@ -0,0 +1,181 @@ +import fs from 'fs/promises' +import readline from 'node:readline/promises' +import { stderr, stdin } from 'node:process' +import assert from 'node:assert' +import { SingleBar } from 'cli-progress' +import { flags } from '@oclif/command' +import ExitCodes from '../../command-base/ExitCodes' +import LeaderCommandBase from '../../command-base/LeaderCommandBase' +import logger from '../../services/logger' +import { getBatchResults, ParsedBatchCallResult } from '../../services/runtime/extrinsics' +import { stringifyBagId } from '../../services/helpers/bagTypes' +import { sendAndFollowNamedTx } from '../../services/runtime/api' +import { BagsUpdateCreator } from '../../services/helpers/bagsUpdate' + +/** + * CLI command: + * Adjusts bag-to-bucket assignments to achieve a given replication rate. + * + * @remarks + * Storage working group leader command. Requires storage WG leader priviliges. + * Shell command: "leader:set-replication" + */ +export default class LeaderSetReplication extends LeaderCommandBase { + static description = `Adjusts bag-to-bucket assignments to achieve a given replication rate.` + + static flags = { + rate: flags.integer({ + char: 'r', + required: true, + description: 'The target replication rate', + }), + activeOnly: flags.boolean({ + char: 'a', + allowNo: true, + default: true, + description: 'Only take active buckets into account when calculating replication rate and updating bags', + }), + batchSize: flags.integer({ + char: 'b', + default: 100, + description: 'Number of extrinsics to send in a single utility.batch call', + }), + dryRun: flags.boolean({ + default: false, + description: 'Assumes all transactions were successful and generates the summary', + }), + skipBucketsSummary: flags.boolean({ + default: false, + description: 'Whether to skip a summary of changes by each individual bucket in the final result', + }), + skipTxSummary: flags.boolean({ + default: false, + description: 'Whether to skip a summary of changes by each individual batch transaction in the final result', + }), + skipConfirmation: flags.boolean({ + default: false, + description: 'Skips asking for confirmation before sending transactions', + }), + output: flags.string({ + char: 'o', + description: 'Output result to a file (based on the provided path) instead of stdout', + }), + ...LeaderCommandBase.flags, + } + + async promptForConfirmation(): Promise { + const rl = readline.createInterface({ input: stdin, output: stderr }) + const confirmed = await rl.question('Are you sure you want to continue? 
(y/N) ') + rl.close() + + return confirmed === 'y' + } + + async run(): Promise { + const { + flags: { + rate: targetReplicationRate, + activeOnly, + batchSize, + skipBucketsSummary, + skipTxSummary, + output, + dryRun, + skipConfirmation, + }, + } = this.parse(LeaderSetReplication) + + if (output) { + try { + await fs.writeFile(output, '') + } catch (e) { + logger.error(`Cannot access "${output}" for writing: ${e instanceof Error ? e.message : String(e)}`) + this.exit(ExitCodes.FileError) + } + } + + const api = await this.getApi() + const bagsUpdateCreator = new BagsUpdateCreator(api) + + logger.info(`Fetching${activeOnly ? ' active' : ''} storage buckets...`) + await bagsUpdateCreator.loadBuckets(activeOnly) + logger.info(`${bagsUpdateCreator.loadedBucketsCount}${activeOnly ? ' active' : ''} storage buckets found.`) + + logger.info(`Fetching storage bags...`) + await bagsUpdateCreator.loadBags() + logger.info(`${bagsUpdateCreator.loadedBagsCount} storage bags found.`) + + logger.info(`Preparing storage bag updates...`) + let avgReplicationRate = 0 + bagsUpdateCreator.prepareUpdates((bag) => { + avgReplicationRate += bag.storedBy.size / bagsUpdateCreator.loadedBagsCount + + while (bag.storedBy.size > targetReplicationRate) { + const bucket = bagsUpdateCreator.pickBucketToRemove(bag) + assert(bucket, `Cannot pick a bucket to remove from bag ${stringifyBagId(bag.id)}`) + bag.removeBucket(bucket) + } + + while (bag.storedBy.size < targetReplicationRate) { + const bucket = bagsUpdateCreator.pickBucketToAdd(bag) + assert( + bucket, + 'Storage system capacity too low. Increase some stroage bucket voucher limits or choose a lower replication rate.' + ) + bag.addBucket(bucket) + } + }) + const { modifiedBagsCount } = bagsUpdateCreator + logger.info(`${modifiedBagsCount} updates prepared.`) + + const summaryCreator = bagsUpdateCreator.getSummaryCreator({ + logger, + replicationRate: { + initial: avgReplicationRate, + target: targetReplicationRate, + totalBagsNum: bagsUpdateCreator.loadedBagsCount, + }, + skipBucketsSummary, + skipTxSummary, + }) + + const extrinsicsWithBagUpdates = bagsUpdateCreator.prepareExtrinsics(batchSize) + + logger.info( + `Will execute ${modifiedBagsCount} storage bag updates in ${extrinsicsWithBagUpdates.length} utility.forceBatch transactions` + ) + summaryCreator.printExpectedResults() + + const confirmed = skipConfirmation ? true : await this.promptForConfirmation() + + if (confirmed) { + const progressBar = new SingleBar({ noTTYOutput: true }) + progressBar.start(extrinsicsWithBagUpdates.length, 0, { title: `Executing the transactions...` }) + for (const [i, [batchTx, bagUpdates]] of extrinsicsWithBagUpdates.entries()) { + const batchResults: ParsedBatchCallResult[] | void = dryRun + ? Array.from({ length: bagUpdates.length }, () => ({ success: true })) + : await sendAndFollowNamedTx(api, this.getAccount(), batchTx, (result) => + getBatchResults(batchTx, api, result) + ) + assert(batchResults, `Could not parse utility.forceBatch results (tx: ${batchTx.hash.toJSON()})`) + summaryCreator.updateSummaryWithBatchResults( + dryRun ? 
`tx${i}_hash` : batchTx.hash.toJSON(), + batchResults, + bagUpdates + ) + progressBar.update(i + 1) + } + progressBar.stop() + + const summaryJson = summaryCreator.getSummaryJSON() + if (output) { + logger.info(`Writing output to ${output}...`) + await fs.writeFile(output, summaryJson) + } else { + console.log(summaryJson) + } + } + + this.exit() + } +} diff --git a/storage-node/src/commands/server.ts b/storage-node/src/commands/server.ts index 62be3c75cc..6e94c51c14 100644 --- a/storage-node/src/commands/server.ts +++ b/storage-node/src/commands/server.ts @@ -85,7 +85,8 @@ export default class Server extends ApiCommandBase { cleanup: flags.boolean({ char: 'c', description: 'Enable cleanup/pruning of no-longer assigned assets.', - default: false, + // Setting `env` key doesn't work for boolean flags: https://github.com/oclif/core/issues/487 + default: process.env.CLEANUP === 'true', }), cleanupBatchSize: flags.integer({ description: 'Maximum number of objects to process in a single batch during cleanup.', @@ -95,6 +96,7 @@ export default class Server extends ApiCommandBase { char: 'i', description: 'Interval between periodic cleanup actions (in minutes)', default: 360, + env: 'CLEANUP_INTERVAL', }), cleanupWorkersNumber: flags.integer({ required: false, diff --git a/storage-node/src/services/helpers/bagTypes.ts b/storage-node/src/services/helpers/bagTypes.ts index 32eb8cf444..16226cd7bc 100644 --- a/storage-node/src/services/helpers/bagTypes.ts +++ b/storage-node/src/services/helpers/bagTypes.ts @@ -7,6 +7,7 @@ import { import { createType, keysOf } from '@joystream/types' import ExitCodes from '../../command-base/ExitCodes' import { CLIError } from '@oclif/errors' +import { hexToBigInt } from '@polkadot/util' /** * Special error type for bagId parsing. Extends the CLIError with setting @@ -51,6 +52,36 @@ export function parseBagId(bagId: string): BagId { return parser.parse() } +/** + * Converts a BagId Codec type to string + * (compatible with Storage Squid / Orion / Query Node bag id) + * + * @param bagId Bag id as Codec type + * @returns Bag id as string + */ +export function stringifyBagId(bagId: BagId): string { + if (bagId.isDynamic) { + return bagId.asDynamic.isChannel + ? `dynamic:channel:${bagId.asDynamic.asChannel.toString()}` + : `dynamic:member:${bagId.asDynamic.asMember.toString()}` + } + + return bagId.asStatic.isCouncil ? `static:council` : `static:wg:${bagId.asStatic.asWorkingGroup.type.toLowerCase()}` +} + +/** + * Compares two bag ids by converting them to a BigInt + * (useful for sorting) + * + * @param idA First bag id + * @param idB Second bag id + * @returns -1 if idA < idB, 0 if idA == idB, 1 if idA > idA + */ +export function cmpBagIds(idA: BagId, idB: BagId): number { + const diff = hexToBigInt(idA.toHex()) - hexToBigInt(idB.toHex()) + return diff < 0 ? -1 : diff > 0 ? 1 : 0 +} + /** * Class-helper for actual bag ID parsing. 
*/ diff --git a/storage-node/src/services/helpers/bagsUpdate.ts b/storage-node/src/services/helpers/bagsUpdate.ts new file mode 100644 index 0000000000..14c14edad9 --- /dev/null +++ b/storage-node/src/services/helpers/bagsUpdate.ts @@ -0,0 +1,638 @@ +import _ from 'lodash' +import { + PalletStorageBagIdType, + PalletStorageBagRecord, + PalletStorageStorageBucketRecord, +} from '@polkadot/types/lookup' +import { ParsedBatchCallResult } from '../runtime/extrinsics' +import { cmpBagIds, stringifyBagId } from './bagTypes' +import { Logger } from 'winston' +import { asStorageSize } from './storageSize' +import { createType } from '@joystream/types' +import { ApiPromise } from '@polkadot/api' +import { SubmittableExtrinsic } from '@polkadot/api/types' + +export class UpdateableLimit { + public readonly limit: bigint + public readonly usedBefore: bigint + public change = 0n + + constructor(limit: bigint, usedBefore: bigint) { + this.limit = limit + this.usedBefore = usedBefore + } + + public get usedAfter(): bigint { + return this.usedBefore + this.change + } + + public get availableBefore(): bigint { + return this.limit - this.usedBefore + } + + public get availableAfter(): bigint { + return this.limit - this.usedAfter + } +} + +export class UpdateableBucket { + public readonly id: number + public readonly storage: UpdateableLimit + public readonly objects: UpdateableLimit + public readonly acceptingNewBags: boolean + public readonly active: boolean + public readonly bagsToRemove: Set + public readonly bagsToAdd: Set + + constructor(bucketId: number, storageBucket: PalletStorageStorageBucketRecord) { + const { sizeLimit, sizeUsed, objectsLimit, objectsUsed } = storageBucket.voucher + this.id = bucketId + this.active = storageBucket.operatorStatus.isStorageWorker.valueOf() + this.acceptingNewBags = storageBucket.acceptingNewBags.valueOf() + this.storage = new UpdateableLimit(sizeLimit.toBigInt(), sizeUsed.toBigInt()) + this.objects = new UpdateableLimit(objectsLimit.toBigInt(), objectsUsed.toBigInt()) + this.bagsToRemove = new Set() + this.bagsToAdd = new Set() + } +} + +export type BagUpdateSummary = { + size: bigint + bagId: string + bucketsToAdd: number[] + bucketsToRemove: number[] +} + +type FailedBagUpdate = BagUpdateSummary & { error: string } + +export class UpdateableBag { + public readonly id: PalletStorageBagIdType + public readonly size: bigint + public readonly objectsNum: bigint + public readonly storedBy: Set + public readonly bucketsToRemove: Set + public readonly bucketsToAdd: Set + + constructor(bagId: PalletStorageBagIdType, bag: PalletStorageBagRecord) { + const { objectsTotalSize, objectsNumber, storedBy } = bag + this.id = bagId + this.size = objectsTotalSize.toBigInt() + this.objectsNum = objectsNumber.toBigInt() + this.storedBy = new Set(Array.from(storedBy.values()).map((bucketId) => bucketId.toNumber())) + this.bucketsToRemove = new Set() + this.bucketsToAdd = new Set() + } + + public removeBucket(bucket: UpdateableBucket): void { + if (this.storedBy.has(bucket.id)) { + this.storedBy.delete(bucket.id) + this.bucketsToRemove.add(bucket.id) + bucket.storage.change -= this.size + bucket.objects.change -= this.objectsNum + bucket.bagsToRemove.add(stringifyBagId(this.id)) + } + } + + public addBucket(bucket: UpdateableBucket): void { + if (!this.storedBy.has(bucket.id)) { + this.storedBy.add(bucket.id) + this.bucketsToAdd.add(bucket.id) + bucket.storage.change += this.size + bucket.objects.change += this.objectsNum + bucket.bagsToAdd.add(stringifyBagId(this.id)) + } + } + + 
public toUpdateSummary(): BagUpdateSummary { + return { + bagId: stringifyBagId(this.id), + bucketsToAdd: Array.from(this.bucketsToAdd), + bucketsToRemove: Array.from(this.bucketsToRemove), + size: this.size, + } + } +} + +type BeforeAfterStats = { + before: T + after: T +} + +type BagsSummary = { + totalSize: bigint + bags: { id: string; size: bigint }[] +} + +export type Serialized = T extends Record + ? { [K in keyof T]: undefined extends T[K] ? Serialized> | undefined : Serialized } + : T extends Array + ? Array> + : T extends bigint + ? string + : T + +type BucketUpdateSummary = { + id: number + storageUsed: BeforeAfterStats + removed: BagsSummary + added: BagsSummary + failedToRemove: BagsSummary + failedToAdd: BagsSummary +} + +type TransactionSummary = { + hash: string + totalStorageUsage: BeforeAfterStats + avgReplicationRate?: BeforeAfterStats + successfulUpdates: BagUpdateSummary[] + failedUpdates: FailedBagUpdate[] +} + +export type FinalSummary = { + totalStorageUsage: BeforeAfterStats + avgReplicationRate?: BeforeAfterStats + buckets?: BucketUpdateSummary[] + transactions?: TransactionSummary[] +} + +type BagsUpdateSummaryCreatorConfig = { + logger: Logger + storageBucketsMap: Map + skipBucketsSummary: boolean + skipTxSummary: boolean + replicationRate?: { + initial: number + target: number + totalBagsNum: number + } +} + +export type ExtrinsicWithBagUpdates = [SubmittableExtrinsic<'promise'>, BagUpdateSummary[]] + +export class BagsUpdateCreator { + private storageBucketsMap: Map = new Map() + private bags: UpdateableBag[] = [] + private modifiedBags: UpdateableBag[] = [] + private api: ApiPromise + + constructor(api: ApiPromise) { + this.api = api + } + + public get loadedBucketsCount(): number { + return this.storageBucketsMap.size + } + + public get loadedBagsCount(): number { + return this.bags.length + } + + public get modifiedBagsCount(): number { + return this.modifiedBags.length + } + + public async loadBucketsByIds(ids: number[]): Promise { + const entries = (await this.api.query.storage.storageBucketById.multi(ids)).map((optBucket, i) => { + const bucketId = ids[i] + try { + const storageBucket = optBucket.unwrap() + return [bucketId, new UpdateableBucket(bucketId, storageBucket)] as const + } catch (e) { + throw new Error(`Couldn't find bucket by id: ${bucketId}`) + } + }) + if (ids.length !== entries.length) { + throw new Error(`Failed to retrieve requested buckets!`) + } + this.setStorageBucketsMap(entries) + } + + public async loadBuckets(activeOnly?: boolean): Promise { + const entries = (await this.api.query.storage.storageBucketById.entries()).flatMap(([sKey, optBucket]) => { + const bucketId = sKey.args[0].toNumber() + const updatableBucket = new UpdateableBucket(bucketId, optBucket.unwrap()) + if (activeOnly && !updatableBucket.active) { + return [] + } + return [[bucketId, updatableBucket] as const] + }) + + this.setStorageBucketsMap(entries) + } + + public async loadBags(filterStoredBy = true): Promise { + await this.loadBagsBy(() => true, filterStoredBy) + } + + public async loadBagsBy(predicate: (b: UpdateableBag) => boolean, filterStoredBy = true): Promise { + const bags = (await this.api.query.storage.bags.entries()).flatMap(([sKey, bag]) => { + const bagId = sKey.args[0] + const updatableBag = new UpdateableBag(bagId, bag) + if (filterStoredBy) { + for (const bucketId of updatableBag.storedBy.values()) { + if (!this.storageBucketsMap.has(bucketId)) { + updatableBag.storedBy.delete(bucketId) + } + } + } + if (predicate(updatableBag)) { + return 
[updatableBag] + } + return [] + }) + this.setBags(bags) + } + + public getBucket(id: number): UpdateableBucket { + const bucket = this.storageBucketsMap.get(id) + if (!bucket) { + throw new Error(`Bucket ${id} not found in loaded storage buckets map`) + } + return bucket + } + + public getSummaryCreator( + config: Omit + ): BagsUpdateSummaryCreator { + return new BagsUpdateSummaryCreator({ + ...config, + storageBucketsMap: _.cloneDeep(this.storageBucketsMap), + }) + } + + public pickBucketToAdd(bag: UpdateableBag, choices?: number[]): UpdateableBucket | undefined { + // Pick a bucket with highest storage available among buckets that DON'T store the bag + // (taking into account already scheduled updates) and: + // - have objects.availableAfter >= bag.objectsNum + // - have storage.availableAfter >= bag.size + // - have acceptingNewBags == true + if (!choices) { + choices = Array.from(this.storageBucketsMap.keys()) + } + const availableBuckets = choices + .filter((bucketId) => !bag.storedBy.has(bucketId)) + .map((bucketId) => this.storageBucketsMap.get(bucketId)) + .filter( + (bucket): bucket is UpdateableBucket => + !!bucket && + bucket.acceptingNewBags && + bucket.objects.availableAfter >= bag.objectsNum && + bucket.storage.availableAfter >= bag.size + ) + + return _.maxBy(availableBuckets, (b) => b.storage.availableAfter) + } + + public pickBucketToRemove(bag: UpdateableBag, choices?: number[]): UpdateableBucket | undefined { + // Pick a bucket with lowest storage available (taking into account already scheduled updates) + // among buckets that store the bag + if (!choices) { + choices = Array.from(bag.storedBy) + } + const candidates = choices + .filter((bucketId) => bag.storedBy.has(bucketId)) + .map((bucketId) => this.storageBucketsMap.get(bucketId)) + .filter((bucket): bucket is UpdateableBucket => !!bucket) + + return _.minBy(candidates, (b) => b.storage.availableAfter) + } + + public prepareUpdates(updateFunc: (bag: UpdateableBag) => void): void { + for (const bag of this.bags) { + updateFunc(bag) + if (bag.bucketsToAdd.size || bag.bucketsToRemove.size) { + this.modifiedBags.push(bag) + } + } + } + + public prepareExtrinsics(batchSize: number): ExtrinsicWithBagUpdates[] { + const { modifiedBags } = this + const chunkedBags = _.chunk(modifiedBags, batchSize) + const extrinsicsWithBagUpdates = chunkedBags.map( + (chunk) => + [ + this.api.tx.utility.forceBatch( + chunk.map((bag) => + this.api.tx.storage.updateStorageBucketsForBag( + bag.id, + createType('BTreeSet', bag.bucketsToAdd), + createType('BTreeSet', bag.bucketsToRemove) + ) + ) + ), + chunk.map((bag) => bag.toUpdateSummary()), + ] as ExtrinsicWithBagUpdates + ) + return extrinsicsWithBagUpdates + } + + private setStorageBucketsMap(entries: (readonly [number, UpdateableBucket])[]) { + this.storageBucketsMap = new Map( + // Sort entries to ensure deterministic results + entries.sort(([idA], [idB]) => idA - idB) + ) + + return this.storageBucketsMap + } + + private setBags(bags: UpdateableBag[]) { + this.bags = bags + // Sort entries to ensure deterministic results + .sort(({ id: idA }, { id: idB }) => cmpBagIds(idA, idB)) + + return this.bags + } +} + +export class BagsUpdateSummaryCreator { + private currentTxSummary?: TransactionSummary + private perBucketSummaries?: Map + private summary: FinalSummary + + constructor(private config: BagsUpdateSummaryCreatorConfig) { + this.summary = this.initSummary() + } + + private formatBigInt(value: bigint, sep = ' ') { + const maybeSign = value < 0 ? 
'-' : '' + return ( + maybeSign + + _.chunk(Array.from(value.toString().replace('-', '')).reverse(), 3) + .map((c) => c.reverse().join('')) + .reverse() + .join(sep) + ) + } + + private formatBeforeAfterStats( + stats: BeforeAfterStats, + unit = '', + decimals = 2 + ): string { + const change: number | bigint = stats.after - stats.before + const formatValue = (v: number | bigint, addSign = false) => + (addSign && v >= 0 ? '+' : '') + + (typeof v === 'bigint' ? this.formatBigInt(v) : v.toFixed(decimals)) + + (unit ? ` ${unit}` : '') + return `${formatValue(stats.before)} => ${formatValue(stats.after)} (${formatValue(change, true)})` + } + + public printExpectedResults(includeBuckets = true): void { + const { logger, replicationRate, storageBucketsMap } = this.config + const [storageUsageBefore, storageUnit] = asStorageSize(this.summary.totalStorageUsage.before) + const [storageUsageAfter] = asStorageSize( + this.summary.totalStorageUsage.before + + Array.from(storageBucketsMap.values()).reduce((sum, b) => (sum += b.storage.change), 0n), + storageUnit + ) + + let output = '\n' + + if (replicationRate) { + const replicationRateStats = { + before: replicationRate.initial, + after: replicationRate.target, + } + + output += `Avg. replication rate: ${this.formatBeforeAfterStats(replicationRateStats)}\n` + } + + const storageUsageStats = { + before: storageUsageBefore, + after: storageUsageAfter, + } + output += `Total storage usage (among selected buckets): ${this.formatBeforeAfterStats( + storageUsageStats, + storageUnit + )}\n` + + if (includeBuckets) { + for (const bucket of Array.from(storageBucketsMap.values())) { + const [storageUsageBefore, storageUsageUnit] = asStorageSize(bucket.storage.usedBefore) + const [storageUsageAfter] = asStorageSize(bucket.storage.usedAfter, storageUsageUnit) + + const [storageAvailBefore, storageAvailUnit] = asStorageSize(bucket.storage.availableBefore) + const [storageAvailAfter] = asStorageSize(bucket.storage.availableAfter, storageAvailUnit) + + const storageUsageStats = { before: storageUsageBefore, after: storageUsageAfter } + const storageAvailStats = { before: storageAvailBefore, after: storageAvailAfter } + const objectsUsageStats = { before: bucket.objects.usedBefore, after: bucket.objects.usedAfter } + const objectsAvailStats = { before: bucket.objects.availableBefore, after: bucket.objects.availableAfter } + + output += '\n' + output += `-- Bucket ${bucket.id}:\n` + if (bucket.bagsToAdd.size || bucket.bagsToRemove.size) { + output += `---- Storage usage: ${this.formatBeforeAfterStats(storageUsageStats, storageUsageUnit)}\n` + output += `---- Storage available: ${this.formatBeforeAfterStats(storageAvailStats, storageAvailUnit)}\n` + output += `---- Objects stored: ${this.formatBeforeAfterStats(objectsUsageStats)}\n` + output += `---- Objects limit remaining: ${this.formatBeforeAfterStats(objectsAvailStats)}\n` + output += `---- Bags to remove: ${bucket.bagsToRemove.size}\n` + output += `---- Bags to add: ${bucket.bagsToAdd.size}\n` + } else { + output += '---- NO CHANGES\n' + } + } + } + + logger.info(`Expected results:\n${output}\n`) + } + + private initSummary(): FinalSummary { + const { replicationRate, storageBucketsMap, skipBucketsSummary } = this.config + const initStorageUsage = Array.from(storageBucketsMap.values()).reduce((sum, b) => sum + b.storage.usedBefore, 0n) + const summary: FinalSummary = { + totalStorageUsage: { + before: initStorageUsage, + after: initStorageUsage, + }, + } + + if (replicationRate) { + summary.avgReplicationRate = { 
+ before: replicationRate.initial, + after: replicationRate.initial, + } + } + + if (!skipBucketsSummary) { + this.perBucketSummaries = new Map() + for (const bucket of Array.from(storageBucketsMap.values())) { + this.perBucketSummaries.set(bucket.id, { + id: bucket.id, + storageUsed: { + before: bucket.storage.usedBefore, + after: bucket.storage.usedAfter, + }, + added: { totalSize: 0n, bags: [] }, + removed: { totalSize: 0n, bags: [] }, + failedToAdd: { totalSize: 0n, bags: [] }, + failedToRemove: { totalSize: 0n, bags: [] }, + }) + } + } + + return summary + } + + private initTxSummary(txHash: string): void { + const transactionSummary: TransactionSummary = { + hash: txHash, + totalStorageUsage: { + before: this.summary.totalStorageUsage.after, + after: this.summary.totalStorageUsage.after, + }, + failedUpdates: [], + successfulUpdates: [], + } + + if (this.summary.avgReplicationRate) { + transactionSummary.avgReplicationRate = { + before: this.summary.avgReplicationRate.after, + after: this.summary.avgReplicationRate.after, + } + } + + if (!this.summary.transactions) { + this.summary.transactions = [] + } + + this.summary.transactions.push(transactionSummary) + this.currentTxSummary = transactionSummary + } + + public handleSuccessfulBagUpdate(bagUpdate: BagUpdateSummary): void { + this.updateStorageUsage(this.summary.totalStorageUsage, bagUpdate) + if (this.summary.avgReplicationRate) { + this.updateAvgReplicationRate(this.summary.avgReplicationRate, bagUpdate) + } + if (this.currentTxSummary) { + this.currentTxSummary.successfulUpdates.push(bagUpdate) + } + this.updatePerBucketSummaries(bagUpdate) + } + + private updateBagsSummary(bagsSummary: BagsSummary, bagUpdate: BagUpdateSummary) { + bagsSummary.totalSize += bagUpdate.size + bagsSummary.bags.push({ id: bagUpdate.bagId, size: bagUpdate.size }) + } + + private updateBagsSummaryOfBucket( + bucketId: number, + type: 'added' | 'failedToAdd' | 'removed' | 'failedToRemove', + bagUpdate: BagUpdateSummary + ) { + if (this.perBucketSummaries) { + const bucketSummary = this.perBucketSummaries.get(bucketId) + if (bucketSummary) { + this.updateBagsSummary(bucketSummary[type], bagUpdate) + } + } + } + + private updatePerBucketSummaries(bagUpdate: BagUpdateSummary | FailedBagUpdate): void { + if (this.perBucketSummaries) { + if ('error' in bagUpdate) { + for (const bucketId of bagUpdate.bucketsToAdd) { + this.updateBagsSummaryOfBucket(bucketId, 'failedToAdd', bagUpdate) + } + for (const bucketId of bagUpdate.bucketsToRemove) { + this.updateBagsSummaryOfBucket(bucketId, 'failedToRemove', bagUpdate) + } + } else { + for (const bucketId of bagUpdate.bucketsToAdd) { + this.updateBagsSummaryOfBucket(bucketId, 'added', bagUpdate) + } + for (const bucketId of bagUpdate.bucketsToRemove) { + this.updateBagsSummaryOfBucket(bucketId, 'removed', bagUpdate) + } + } + } + } + + public handleFailedBagUpdate(failedBagUpdate: FailedBagUpdate): void { + if (this.currentTxSummary) { + this.currentTxSummary.failedUpdates.push(failedBagUpdate) + } + this.updatePerBucketSummaries(failedBagUpdate) + } + + private updateAvgReplicationRate(avgReplicationRate: BeforeAfterStats, bagUpdate: BagUpdateSummary) { + if (this.config.replicationRate) { + const { + replicationRate: { target: targetReplicationRate, totalBagsNum }, + } = this.config + const bagPreviousReplicationRate = + targetReplicationRate + bagUpdate.bucketsToRemove.length - bagUpdate.bucketsToAdd.length + avgReplicationRate.after -= bagPreviousReplicationRate / totalBagsNum + avgReplicationRate.after += 
targetReplicationRate / totalBagsNum + } + } + + private updateStorageUsage(stats: BeforeAfterStats<bigint>, bagUpdate: BagUpdateSummary): void { + stats.after += bagUpdate.size * (BigInt(bagUpdate.bucketsToAdd.length) - BigInt(bagUpdate.bucketsToRemove.length)) + } + + public updateSummaryWithBatchResults( + txHash: string, + results: ParsedBatchCallResult[], + bagUpdates: BagUpdateSummary[] + ): void { + if (!this.config.skipTxSummary) { + this.initTxSummary(txHash) + } + + for (const i in results) { + const result = results[i] + const bagUpdate = bagUpdates[i] + if ('error' in result) { + this.handleFailedBagUpdate({ ...bagUpdate, error: result.error }) + } else { + this.handleSuccessfulBagUpdate(bagUpdate) + } + } + + if (this.currentTxSummary) { + if (this.currentTxSummary.avgReplicationRate && this.summary.avgReplicationRate) { + this.currentTxSummary.avgReplicationRate.after = this.summary.avgReplicationRate.after + } + this.currentTxSummary.totalStorageUsage.after = this.summary.totalStorageUsage.after + } + } + + private roundBeforeAfterStat(stat: BeforeAfterStats<number>, precision = 2) { + stat.before = _.round(stat.before, precision) + stat.after = _.round(stat.after, precision) + } + + public getSummary(): FinalSummary { + if (this.perBucketSummaries) { + this.summary.buckets = Array.from(this.perBucketSummaries.values()) + } + if (this.summary.avgReplicationRate) { + this.roundBeforeAfterStat(this.summary.avgReplicationRate) + if (this.summary.transactions) { + for (const txSummary of this.summary.transactions) { + if (txSummary.avgReplicationRate) { + this.roundBeforeAfterStat(txSummary.avgReplicationRate) + } + } + } + } + + return this.summary + } + + public getSummaryJSON(): string { + return JSON.stringify( + this.getSummary(), + (key, value) => { + if (typeof value === 'bigint') { + return value.toString() + } + return value + }, + 2 + ) + } +} diff --git a/storage-node/src/services/helpers/storageSize.ts b/storage-node/src/services/helpers/storageSize.ts new file mode 100644 index 0000000000..6eeda4a972 --- /dev/null +++ b/storage-node/src/services/helpers/storageSize.ts @@ -0,0 +1,34 @@ +export const UNITS = { + 'B': 1, + 'KB': 1_000, + 'MB': 1_000_000, + 'GB': 1_000_000_000, + 'TB': 1_000_000_000_000, + 'PB': 1_000_000_000_000_000, // MAX_SAFE_INTEGER / 9.007 +} as const + +export type UnitType = keyof typeof UNITS + +/** + * Converts storage size in bytes (BigInt) to the most appropriate unit (MB / GB / TB etc.), + * such that the size expressed in this unit is < 1_000 and can be converted to a Number. + * Optionally the target unit can also be forced. + * + * @param bytes Number of bytes (BigInt) + * @param forcedUnit Optional: Target unit to force + * @param decimals Number of digits past the decimal point to include in the result.
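+ * @example
+ * // Illustrative results (computed from the UNITS table above):
+ * // asStorageSize(1_500_000n)         => [1.5, 'MB']
+ * // asStorageSize(1_500_000n, 'KB')   => [1500, 'KB']
+ * // asStorageSize(123_456_789n)       => [123.45, 'MB'] (the fraction is truncated, not rounded)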
+ * @returns [Number, Unit] tuple + */ +export function asStorageSize(bytes: bigint, forcedUnit?: UnitType, decimals = 2): [number, UnitType] { + const unitEntires = Object.entries(UNITS) + let targetUnit = unitEntires.find(([unit]) => unit === forcedUnit) + if (!targetUnit) { + let i = 0 + while (bytes / BigInt(unitEntires[i][1]) >= 1_000 && i < unitEntires.length - 1) { + i += 1 + } + targetUnit = unitEntires[i] + } + const decimalScaler = Math.min(targetUnit[1], Math.pow(10, decimals)) + return [Number(bytes / BigInt(targetUnit[1] / decimalScaler)) / decimalScaler, targetUnit[0] as UnitType] +} diff --git a/storage-node/src/services/runtime/api.ts b/storage-node/src/services/runtime/api.ts index 569bc733ea..5db9abd890 100644 --- a/storage-node/src/services/runtime/api.ts +++ b/storage-node/src/services/runtime/api.ts @@ -5,7 +5,7 @@ import { KeyringPair } from '@polkadot/keyring/types' import { TypeRegistry } from '@polkadot/types' import type { Index } from '@polkadot/types/interfaces/runtime' import { DispatchError } from '@polkadot/types/interfaces/system' -import { IEvent, ISubmittableResult } from '@polkadot/types/types' +import { Codec, IEvent, ISubmittableResult } from '@polkadot/types/types' import { formatBalance } from '@polkadot/util' import AwaitLock from 'await-lock' import stringify from 'fast-safe-stringify' @@ -246,3 +246,11 @@ export function getEvents< >(result: SubmittableResult, section: S, eventNames: M[]): EventType[] { return result.filterRecords(section, eventNames).map((e) => e.event as unknown as EventType) } + +export function isEvent< + S extends keyof ApiPromise['events'] & string, + M extends keyof ApiPromise['events'][S] & string, + EventData extends Codec[] = ApiPromise['events'][S][M] extends AugmentedEvent<'promise', infer T> ? T : Codec[] +>(event: IEvent, section: S, method: M): event is IEvent { + return event.section === section && event.method === method +} diff --git a/storage-node/src/services/runtime/extrinsics.ts b/storage-node/src/services/runtime/extrinsics.ts index 924009f4f8..2acc8346d9 100644 --- a/storage-node/src/services/runtime/extrinsics.ts +++ b/storage-node/src/services/runtime/extrinsics.ts @@ -6,7 +6,11 @@ import { timeout } from 'promise-timeout' import logger from '../../services/logger' import { parseBagId } from '../helpers/bagTypes' import { AcceptPendingDataObjectsParams } from '../sync/acceptPendingObjects' -import { formatDispatchError, getEvent, getEvents, sendAndFollowNamedTx } from './api' +import { formatDispatchError, getEvent, getEvents, isEvent, sendAndFollowNamedTx } from './api' +import { ISubmittableResult } from '@polkadot/types/types' +import { SubmittableExtrinsic } from '@polkadot/api/types' +import { Call } from '@polkadot/types/interfaces' +import { Vec } from '@polkadot/types-codec' /** * Creates storage bucket. 
@@ -107,29 +111,54 @@ export async function updateStorageBucketsForBags( const txBatch = batchFn(txs) failedCalls = await sendAndFollowNamedTx(api, account, txBatch, (result) => { - const [batchCompletedEvent] = getEvents(result, 'utility', ['BatchCompleted']) - if (batchCompletedEvent) { - return [] - } - - // find all the failed calls based on their index - const events = getEvents(result, 'utility', ['ItemCompleted', 'ItemFailed']) - return events - .map((e, i) => { - if (e.method === 'ItemFailed') { - return { - args: txs[i].args.toString(), - error: formatDispatchError(api, e.data[0]), - } - } - }) - .filter(Boolean) + const batchResults = getBatchResults(txBatch, api, result) + return batchResults.flatMap((r, i) => ('error' in r ? [{ args: txs[i].args.toString(), error: r.error }] : [])) }) }) return [success, failedCalls] } +export type ParsedBatchCallResult = { success: true } | { error: string } + +/** + * Extracts individual call results from an utility.(batch|forceBatch|batchAll) + * extrinsic result. + * + * @param tx The utility.(batch|forceBatch|batchAll) extrinsic + * @param api @polkadot/api instance + * @param result Extrinsic result + * + * @returns An array of parsed results + */ +export function getBatchResults( + tx: SubmittableExtrinsic<'promise'>, + api: ApiPromise, + result: ISubmittableResult +): ParsedBatchCallResult[] { + const callsNum = (tx.args[0] as Vec).length + let results: ParsedBatchCallResult[] = [] + for (const { event } of result.events) { + if (isEvent(event, 'utility', 'ItemFailed')) { + const [dispatchError] = event.data + results.push({ error: formatDispatchError(api, dispatchError) }) + } + if (isEvent(event, 'utility', 'ItemCompleted')) { + results.push({ success: true }) + } + } + if (results.length < callsNum) { + results = [ + ...results, + ...Array.from({ length: callsNum - results.length }, () => ({ + error: 'Interrupted', + })), + ] + } + + return results +} + /** * Accepts pending data objects by storage provider in batch transaction. 
* diff --git a/tests/network-tests/run-tests.sh b/tests/network-tests/run-tests.sh index e6aedbf9e6..9ff51076df 100755 --- a/tests/network-tests/run-tests.sh +++ b/tests/network-tests/run-tests.sh @@ -38,6 +38,7 @@ if [ "${NO_QN}" != true ]; then fi if [ "${NO_STORAGE}" != true ]; then + export CLEANUP_INTERVAL ./start-storage.sh fi diff --git a/tests/network-tests/src/fixtures/storage/CopyBagsFixture.ts b/tests/network-tests/src/fixtures/storage/CopyBagsFixture.ts new file mode 100644 index 0000000000..e4062c95eb --- /dev/null +++ b/tests/network-tests/src/fixtures/storage/CopyBagsFixture.ts @@ -0,0 +1,134 @@ +import _ from 'lodash' +import { PalletStorageStorageBucketRecord } from '@polkadot/types/lookup' +import { FinalSummary, Serialized } from 'storage-node/src/services/helpers/bagsUpdate' +import { BaseQueryNodeFixture } from '../../Fixture' +import { QueryNodeApi } from '../../QueryNodeApi' +import { Api } from '../../Api' +import { assert } from 'chai' +import { StorageCLI } from '../../cli/storage' + +export type CopyBagsFixtureParams = { + from: number[] + to: number[] + copies?: number + expectedStorageIncrease: number +} + +export class CopyBagsFixture extends BaseQueryNodeFixture { + private bucketIds: number[] = [] + + constructor(public api: Api, public query: QueryNodeApi, private params: CopyBagsFixtureParams) { + super(api, query) + } + + private async setupStorageCLI() { + const leaderKey = await this.api.getLeadRoleKey('storageWorkingGroup') + const cli = new StorageCLI(this.api.getSuri(leaderKey)) + return cli + } + + private async checkResultAgainsExpectations( + result: Serialized, + storageBucketsBefore: (readonly [number, PalletStorageStorageBucketRecord])[] + ) { + const { from, to, copies = 1, expectedStorageIncrease } = this.params + const expectedStorageUsageBefore = _.sumBy(storageBucketsBefore, ([, b]) => b.voucher.sizeUsed.toNumber()) + const expectedStorageUsageAfter = expectedStorageUsageBefore + expectedStorageIncrease + assert.equal(result.totalStorageUsage.before, expectedStorageUsageBefore.toString()) + assert.equal(result.totalStorageUsage.after, expectedStorageUsageAfter.toString()) + assert(result.transactions) + // Expecting 1 batch transaction only! + const expectedNumUpdates = _.sumBy(storageBucketsBefore, ([id, bucket]) => + from.includes(id) ? 
bucket.assignedBags.toNumber() : 0 + ) + assert.equal(result.transactions[0].failedUpdates.length, 0) + assert.equal(result.transactions[0].successfulUpdates.length, expectedNumUpdates) + assert.equal(result.transactions[0].totalStorageUsage.before, expectedStorageUsageBefore.toString()) + assert.equal(result.transactions[0].totalStorageUsage.after, expectedStorageUsageAfter.toString()) + const resultBuckets = result.buckets + assert(resultBuckets) + assert.sameMembers( + resultBuckets.map((b) => b.id), + [...from, ...to] + ) + for (const bucketId of from) { + const [, bucketBefore] = storageBucketsBefore.find(([id]) => id === bucketId) || [] + const resultBucket = resultBuckets.find(({ id }) => id === bucketId) + assert(bucketBefore) + assert(resultBucket) + assert.equal(resultBucket.storageUsed.before, bucketBefore.voucher.sizeUsed.toString()) + assert.equal(resultBucket.storageUsed.after, bucketBefore.voucher.sizeUsed.toString()) + assert.equal(resultBucket.added.bags.length, 0) + assert.equal(resultBucket.added.totalSize, '0') + assert.equal(resultBucket.failedToAdd.bags.length, 0) + assert.equal(resultBucket.failedToAdd.totalSize, '0') + assert.equal(resultBucket.failedToRemove.bags.length, 0) + assert.equal(resultBucket.failedToRemove.totalSize, '0') + assert.equal(resultBucket.removed.bags.length, 0) + assert.equal(resultBucket.removed.totalSize, '0') + } + for (const bucketId of to) { + const [, bucketBefore] = storageBucketsBefore.find(([id]) => id === bucketId) || [] + const resultBucket = resultBuckets.find(({ id }) => id === bucketId) + assert(bucketBefore) + assert(resultBucket) + assert.equal(resultBucket.storageUsed.before, bucketBefore.voucher.sizeUsed.toString()) + assert.isAtLeast(parseInt(resultBucket.storageUsed.after), parseInt(resultBucket.storageUsed.before)) + assert.isAtLeast(resultBucket.added.bags.length, 0) + assert.isAtLeast(parseInt(resultBucket.added.totalSize), 0) + assert.equal(resultBucket.failedToAdd.bags.length, 0) + assert.equal(resultBucket.failedToAdd.totalSize, '0') + assert.equal(resultBucket.failedToRemove.bags.length, 0) + assert.equal(resultBucket.failedToRemove.totalSize, '0') + assert.equal(resultBucket.removed.bags.length, 0) + assert.equal(resultBucket.removed.totalSize, '0') + assert.equal(resultBucket.added.totalSize, _.sum(resultBucket.added.bags.map((b) => parseInt(b.size))).toString()) + } + assert.equal( + resultBuckets.reduce((a, b) => a + b.added.bags.length, 0), + expectedNumUpdates * copies + ) + assert.equal( + resultBuckets.reduce((a, b) => a + parseInt(b.added.totalSize), 0), + expectedStorageIncrease + ) + } + + private async checkResultAgainstNewChainState(result: Serialized): Promise { + const storageBucketsAfter = await this.fetchBuckets() + + assert.equal( + result.totalStorageUsage.after, + storageBucketsAfter.reduce((sum, [, bucket]) => sum + bucket.voucher.sizeUsed.toBigInt(), 0n).toString() + ) + + for (const [bucketId, bucket] of storageBucketsAfter) { + const resultBucket = result.buckets?.find((b) => b.id === bucketId) + assert(resultBucket) + assert.equal(resultBucket.storageUsed.after, bucket.voucher.sizeUsed.toString()) + } + } + + private async fetchBuckets(): Promise<[number, PalletStorageStorageBucketRecord][]> { + return (await this.api.query.storage.storageBucketById.multi(this.bucketIds)).map( + (b, i) => [this.bucketIds[i], b.unwrap()] as [number, PalletStorageStorageBucketRecord] + ) + } + + /* + Execute this Fixture. 
+ */ + public async execute(): Promise { + const { from, to, copies = 1 } = this.params + this.bucketIds = [...from, ...to] + + const storageBucketsBefore = await this.fetchBuckets() + const storageCli = await this.setupStorageCLI() + const flags = ['--from', from.join(' '), '--to', to.join(' '), '--copies', copies.toString(), '--skipConfirmation'] + const { out } = await storageCli.run('leader:copy-bags', flags) + const result: Serialized = JSON.parse(out) + + await this.checkResultAgainsExpectations(result, storageBucketsBefore) + await this.checkResultAgainstNewChainState(result) + } +} diff --git a/tests/network-tests/src/fixtures/storage/EmptyBucketFixture.ts b/tests/network-tests/src/fixtures/storage/EmptyBucketFixture.ts new file mode 100644 index 0000000000..d1494f30e1 --- /dev/null +++ b/tests/network-tests/src/fixtures/storage/EmptyBucketFixture.ts @@ -0,0 +1,80 @@ +import { PalletStorageStorageBucketRecord } from '@polkadot/types/lookup' +import { FinalSummary, Serialized } from 'storage-node/src/services/helpers/bagsUpdate' +import { BaseQueryNodeFixture } from '../../Fixture' +import { QueryNodeApi } from '../../QueryNodeApi' +import { Api } from '../../Api' +import { assert } from 'chai' +import { StorageCLI } from '../../cli/storage' + +export type EmptyBucketFixtureParams = { + id: number +} + +export class EmptyBucketFixture extends BaseQueryNodeFixture { + constructor(public api: Api, public query: QueryNodeApi, private params: EmptyBucketFixtureParams) { + super(api, query) + } + + private async setupStorageCLI() { + const leaderKey = await this.api.getLeadRoleKey('storageWorkingGroup') + const cli = new StorageCLI(this.api.getSuri(leaderKey)) + return cli + } + + private async checkResultAgainsExpectations( + result: Serialized, + storageBucketBefore: PalletStorageStorageBucketRecord + ) { + const expectedStorageUsageBefore = storageBucketBefore.voucher.sizeUsed.toNumber() + assert.equal(result.totalStorageUsage.before, expectedStorageUsageBefore.toString()) + assert.equal(result.totalStorageUsage.after, '0') + assert(result.transactions) + // Expecting 1 batch transaction only! 
+ const expectedNumUpdates = storageBucketBefore.assignedBags.toNumber() + assert.equal(result.transactions[0].failedUpdates.length, 0) + assert.equal(result.transactions[0].successfulUpdates.length, expectedNumUpdates) + assert.equal(result.transactions[0].totalStorageUsage.before, expectedStorageUsageBefore.toString()) + assert.equal(result.transactions[0].totalStorageUsage.after, '0') + assert(result.buckets) + assert.equal(result.buckets.length, 1) + const [resultBucket] = result.buckets + assert.equal(resultBucket.id, this.params.id) + assert.equal(resultBucket.storageUsed.before, expectedStorageUsageBefore.toString()) + assert.equal(resultBucket.storageUsed.after, '0') + assert.equal(resultBucket.added.bags.length, 0) + assert.equal(resultBucket.added.totalSize, '0') + assert.equal(resultBucket.failedToAdd.bags.length, 0) + assert.equal(resultBucket.failedToAdd.totalSize, '0') + assert.equal(resultBucket.failedToRemove.bags.length, 0) + assert.equal(resultBucket.failedToRemove.totalSize, '0') + assert.equal(resultBucket.removed.bags.length, expectedNumUpdates) + assert.equal(resultBucket.removed.totalSize, expectedStorageUsageBefore.toString()) + } + + private async checkResultAgainstNewChainState(result: Serialized): Promise { + const storageBucketAfter = await this.fetchBucket() + + assert.equal(result.totalStorageUsage.after, storageBucketAfter.voucher.sizeUsed.toString()) + assert(result.buckets) + assert.equal(result.buckets[0].storageUsed.after, storageBucketAfter.voucher.sizeUsed.toString()) + } + + private async fetchBucket(): Promise { + return (await this.api.query.storage.storageBucketById(this.params.id)).unwrap() + } + + /* + Execute this Fixture. + */ + public async execute(): Promise { + const { id } = this.params + const storageBucketBefore = await this.fetchBucket() + const storageCli = await this.setupStorageCLI() + const flags = ['--id', id.toString(), '--skipConfirmation'] + const { out } = await storageCli.run('leader:empty-bucket', flags) + const result: Serialized = JSON.parse(out) + + await this.checkResultAgainsExpectations(result, storageBucketBefore) + await this.checkResultAgainstNewChainState(result) + } +} diff --git a/tests/network-tests/src/fixtures/storage/GenerateChannelAssetsFixture.ts b/tests/network-tests/src/fixtures/storage/GenerateChannelAssetsFixture.ts new file mode 100644 index 0000000000..83d00c5545 --- /dev/null +++ b/tests/network-tests/src/fixtures/storage/GenerateChannelAssetsFixture.ts @@ -0,0 +1,127 @@ +import assert from 'assert' +import { ChannelCreationInputParameters } from '@joystream/cli/src/Types' +import { MemberId } from '@joystream/types/primitives' +import { BaseQueryNodeFixture, FixtureRunner } from '../../Fixture' +import { JoystreamCLI } from '../../cli/joystream' +import { QueryNodeApi } from '../../QueryNodeApi' +import { Api } from '../../Api' +import { BuyMembershipHappyCaseFixture } from '../membership' +import { Utils } from '../../utils' +import { ChannelFieldsFragment } from '../../graphql/generated/queries' +import { verifyAssets, VerifyAssetsInput } from './utils' + +export type GenerateAssetsFixtureParams = { + numberOfChannels: number + avatarGenerator?: (i: number) => string + coverGenerator?: (i: number) => string +} + +export type CreatedChannelData = { + id: number + coverPhotoPath: string + avatarPhotoPath: string + qnData?: ChannelFieldsFragment +} + +export class GenerateAssetsFixture extends BaseQueryNodeFixture { + private _channelsCreated: CreatedChannelData[] = [] + + constructor( + public api: 
Api, + public query: QueryNodeApi, + public cli: JoystreamCLI, + private params: GenerateAssetsFixtureParams + ) { + super(api, query) + } + + public get channelsCreated(): CreatedChannelData[] { + assert(this._channelsCreated.length, 'Trying to retrieve channelsCreated before any results are available') + return this._channelsCreated + } + + private async setupChannelOwner() { + const { api, query } = this + // Create a member that will create the channels + const [memberKeyPair] = await api.createKeyPairs(1) + const memberAddr = memberKeyPair.key.address + const buyMembershipFixture = new BuyMembershipHappyCaseFixture(api, query, [memberAddr]) + await new FixtureRunner(buyMembershipFixture).run() + const [memberId] = buyMembershipFixture.getCreatedMembers() + + // Give member 10 JOY per channel, to be able to pay the fees + await api.treasuryTransferBalance(memberAddr, Utils.joy(10)) + + // Import the member controller key to CLI + this.cli.importAccount(memberKeyPair.key) + + return memberId + } + + /* + Execute this Fixture. + */ + public async execute(): Promise { + this.debug('Setting up channel owner') + const memberId = await this.setupChannelOwner() + + this.debug('Creating channels') + this._channelsCreated = await this.createChannels(memberId) + } + + private defaultAvatarGenerator(i: number): string { + return this.cli.getTmpFileManager().randomImgFile(300, 300) + } + + private defaultCoverGenerator(i: number): string { + return this.cli.getTmpFileManager().randomImgFile(1920, 500) + } + + /** + Generates the channels. + */ + private async createChannels(memberId: MemberId): Promise { + const { avatarGenerator: customAvatarGenerator, coverGenerator: customCoverGenerator } = this.params + const avatarGenerator = customAvatarGenerator || this.defaultAvatarGenerator.bind(this) + const coverGenerator = customCoverGenerator || this.defaultCoverGenerator.bind(this) + const channelsData: CreatedChannelData[] = [] + for (let i = 0; i < this.params.numberOfChannels; ++i) { + const avatarPhotoPath = avatarGenerator(i) + const coverPhotoPath = coverGenerator(i) + const channelInput: ChannelCreationInputParameters = { + title: `GenerateAssetsFixture channel ${i + 1}`, + avatarPhotoPath, + coverPhotoPath, + } + const channelId = await this.cli.createChannel(channelInput, [ + '--context', + 'Member', + '--useMemberId', + memberId.toString(), + ]) + this.debug(`Created channel ${i + 1} / ${this.params.numberOfChannels}`) + channelsData.push({ id: channelId, avatarPhotoPath, coverPhotoPath }) + } + return channelsData + } + + public async runQueryNodeChecks(): Promise { + await super.runQueryNodeChecks() + const { query, channelsCreated } = this + await query.tryQueryWithTimeout( + () => query.channelsByIds(channelsCreated.map(({ id }) => id.toString())), + (r) => { + this.channelsCreated.forEach((channelData) => { + const qnData = r.find((c) => c.id === channelData.id.toString()) + Utils.assert(qnData, `Cannot find channel ${channelData.id} in the query node`) + Utils.assert(qnData.avatarPhoto && qnData.coverPhoto, `Missing some assets in channel ${channelData.id}`) + channelData.qnData = qnData + }) + } + ) + } + + public verifyAssets = async (inputs: VerifyAssetsInput[], retryTime = 10_000, maxRetries = 18): Promise => { + await verifyAssets(inputs, this.channelsCreated, retryTime, maxRetries) + } +} diff --git a/tests/network-tests/src/fixtures/storage/SetReplicationRateFixture.ts b/tests/network-tests/src/fixtures/storage/SetReplicationRateFixture.ts new file mode 100644 index 
0000000000..a48440d483 --- /dev/null +++ b/tests/network-tests/src/fixtures/storage/SetReplicationRateFixture.ts @@ -0,0 +1,127 @@ +import _ from 'lodash' +import { PalletStorageStorageBucketRecord } from '@polkadot/types/lookup' +import { FinalSummary, Serialized } from 'storage-node/src/services/helpers/bagsUpdate' +import { BaseQueryNodeFixture } from '../../Fixture' +import { QueryNodeApi } from '../../QueryNodeApi' +import { Api } from '../../Api' +import { assert } from 'chai' +import { StorageCLI } from '../../cli/storage' + +export type SetReplicationRateFixtureParams = { + oldRate: number + newRate: number + expectedNumUpdates: number + expectedBuckets: { + id: number + removed: { id: string; size: bigint }[] + added: { id: string; size: bigint }[] + }[] +} + +export class SetReplicationRateFixture extends BaseQueryNodeFixture { + constructor(public api: Api, public query: QueryNodeApi, private params: SetReplicationRateFixtureParams) { + super(api, query) + } + + private async setupStorageCLI() { + const leaderKey = await this.api.getLeadRoleKey('storageWorkingGroup') + const cli = new StorageCLI(this.api.getSuri(leaderKey)) + return cli + } + + private async checkResultAgainsExpectations( + result: Serialized, + storageBucketsBefore: (readonly [number, PalletStorageStorageBucketRecord])[] + ) { + const { oldRate, newRate, expectedNumUpdates, expectedBuckets } = this.params + + const expectedStorageUsageBefore = storageBucketsBefore.reduce((a, [, b]) => a + b.voucher.sizeUsed.toBigInt(), 0n) + const expectedStorageUsageAfter = (Number(expectedStorageUsageBefore) * (newRate / oldRate)).toString() + + assert.equal(result.avgReplicationRate?.before, oldRate) + assert.equal(result.avgReplicationRate?.after, newRate) + assert.equal(result.totalStorageUsage.before.toString(), expectedStorageUsageBefore.toString()) + assert.equal(result.totalStorageUsage.after.toString(), expectedStorageUsageAfter.toString()) + assert(result.transactions) + // Expecting 1 batch transaction only! 
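+ // (assumes all bag updates fit into a single utility batch, so result.transactions[0] covers every update)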
+ assert.equal(result.transactions[0].avgReplicationRate?.before, oldRate) + assert.equal(result.transactions[0].avgReplicationRate?.after, newRate) + assert.equal(result.transactions[0].failedUpdates.length, 0) + assert.equal(result.transactions[0].successfulUpdates.length, expectedNumUpdates) + assert.equal(result.transactions[0].totalStorageUsage.before.toString(), expectedStorageUsageBefore.toString()) + assert.equal(result.transactions[0].totalStorageUsage.after.toString(), expectedStorageUsageAfter.toString()) + const resultBuckets = result.buckets + assert(resultBuckets) + assert.equal(resultBuckets.length, expectedBuckets.length) + for (const i of resultBuckets.keys()) { + const [, bucketBefore] = storageBucketsBefore.find(([id]) => id === resultBuckets[i].id) || [] + const expectedBucket = expectedBuckets.find((b) => b.id === resultBuckets[i].id) + assert(bucketBefore) + assert(expectedBucket) + assert.equal(resultBuckets[i].storageUsed.before.toString(), bucketBefore.voucher.sizeUsed.toString()) + assert.sameDeepMembers( + resultBuckets[i].added.bags, + expectedBucket.added.map((v) => ({ ...v, size: v.size.toString() })) + ) + assert.sameDeepMembers( + resultBuckets[i].removed.bags, + expectedBucket.removed.map((v) => ({ ...v, size: v.size.toString() })) + ) + assert.equal( + resultBuckets[i].storageUsed.after.toString(), + ( + bucketBefore.voucher.sizeUsed.toBigInt() - + expectedBucket.removed.reduce((sum, b) => sum + b.size, 0n) + + expectedBucket.added.reduce((sum, b) => sum + b.size, 0n) + ).toString() + ) + assert.equal(resultBuckets[i].failedToAdd.bags.length, 0) + assert.equal(resultBuckets[i].failedToRemove.bags.length, 0) + } + } + + private async checkResultAgainstNewChainState(result: Serialized): Promise { + const storageBucketsAfter = (await this.api.query.storage.storageBucketById.entries()).map(([sKey, bucket]) => { + return [sKey.args[0].toNumber(), bucket.unwrap()] as const + }) + const storageBagsAfter = (await this.api.query.storage.bags.entries()).map(([sKey, bag]) => { + return [sKey.args[0], bag] as const + }) + + assert(result.avgReplicationRate) + assert.closeTo( + result.avgReplicationRate.after, + _.meanBy(storageBagsAfter, ([, bag]) => bag.storedBy.size), + 1e-6 + ) + assert.equal( + result.totalStorageUsage.after, + storageBucketsAfter.reduce((sum, [, bucket]) => sum + bucket.voucher.sizeUsed.toBigInt(), 0n).toString() + ) + + for (const [bucketId, bucket] of storageBucketsAfter) { + const resultBucket = result.buckets?.find((b) => b.id === bucketId) + assert(resultBucket) + assert.equal(resultBucket.storageUsed.after, bucket.voucher.sizeUsed.toString()) + } + } + + /* + Execute this Fixture. 
+ */ + public async execute(): Promise { + const { newRate } = this.params + + const storageBucketsBefore = (await this.api.query.storage.storageBucketById.entries()).map(([sKey, bucket]) => { + return [sKey.args[0].toNumber(), bucket.unwrap()] as const + }) + + const storageCli = await this.setupStorageCLI() + const flags = ['--rate', newRate.toString(), '--skipConfirmation'] + const { out } = await storageCli.run('leader:set-replication', flags) + const result: Serialized = JSON.parse(out) + + await this.checkResultAgainsExpectations(result, storageBucketsBefore) + await this.checkResultAgainstNewChainState(result) + } +} diff --git a/tests/network-tests/src/fixtures/storage/utils.ts b/tests/network-tests/src/fixtures/storage/utils.ts new file mode 100644 index 0000000000..0062f068b0 --- /dev/null +++ b/tests/network-tests/src/fixtures/storage/utils.ts @@ -0,0 +1,42 @@ +import { readFileSync } from 'fs' +import { Utils } from '../../utils' +import { ColossusApi } from '../../../ColossusApi' +import { CreatedChannelData } from './GenerateChannelAssetsFixture' + +export type VerifyAssetsInput = { + api: ColossusApi + channelIds: number[] +} + +export const verifyAssets = async ( + inputs: VerifyAssetsInput[], + channelsData: CreatedChannelData[], + retryTime = 10_000, + maxRetries = 18 +): Promise => { + await Utils.until( + `assets stored by Colossus nodes match expectations`, + async () => { + const verifyAssetsPromises = channelsData.map(async ({ id, avatarPhotoPath, coverPhotoPath, qnData }) => { + Utils.assert(qnData && qnData.avatarPhoto && qnData.coverPhoto) + for (const { api: colossusApi, channelIds: expectedChannelIds } of inputs) { + if (expectedChannelIds.includes(id)) { + await Promise.all([ + colossusApi.fetchAndVerifyAsset(qnData.coverPhoto.id, readFileSync(coverPhotoPath), 'image/bmp'), + colossusApi.fetchAndVerifyAsset(qnData.avatarPhoto.id, readFileSync(avatarPhotoPath), 'image/bmp'), + ]) + } else { + await Promise.all([ + colossusApi.expectAssetNotFound(qnData.coverPhoto.id), + colossusApi.expectAssetNotFound(qnData.avatarPhoto.id), + ]) + } + } + }) + await Promise.all(verifyAssetsPromises) + return true + }, + retryTime, + maxRetries + ) +} diff --git a/tests/network-tests/src/flows/storage/copyAndEmptyBuckets.ts b/tests/network-tests/src/flows/storage/copyAndEmptyBuckets.ts new file mode 100644 index 0000000000..daa6a707c5 --- /dev/null +++ b/tests/network-tests/src/flows/storage/copyAndEmptyBuckets.ts @@ -0,0 +1,167 @@ +import urljoin from 'url-join' +import { assert } from 'chai' +import { FlowProps } from '../../Flow' +import { extendDebug } from '../../Debugger' +import { FixtureRunner } from '../../Fixture' +import { ColossusApi } from '../../../ColossusApi' +import { doubleBucketConfig } from './initStorage' +import { createJoystreamCli } from '../utils' +import { CreatedChannelData, GenerateAssetsFixture } from '../../fixtures/storage/GenerateChannelAssetsFixture' +import { CopyBagsFixture } from '../../fixtures/storage/CopyBagsFixture' +import { verifyAssets } from '../../fixtures/storage/utils' +import { EmptyBucketFixture } from '../../fixtures/storage/EmptyBucketFixture' +import { Utils } from '../../utils' + +export async function copyAndEmptyBuckets({ api, query }: FlowProps): Promise { + const debug = extendDebug('flow:copyAndEmptyBuckets') + api.enableDebugTxLogs() + debug('Started') + + // Get storage leader key + const [, storageLeader] = await api.getLeader('storageWorkingGroup') + const storageLeaderKey = storageLeader.roleAccountId.toString() + + 
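// Overview: the flow below empties both buckets, seeds each bucket with a distinct set of channel assets, + // then exercises leader:copy-bags (0 -> 1), leader:empty-bucket (0) and leader:copy-bags (1 -> 0), + // verifying the assets stored by each Colossus node after every step. + +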
// Check preconditions: + const activeStorageBuckets = (await api.query.storage.storageBucketById.entries()) + .filter(([, b]) => b.unwrap().operatorStatus.isStorageWorker) + .map(([sKey, bucket]) => [sKey.args[0].toNumber(), bucket.unwrap()] as const) + const channelBagPolicies = await api.query.storage.dynamicBagCreationPolicies('Channel') + assert.equal(channelBagPolicies.numberOfStorageBuckets.toNumber(), 2) + assert.equal(activeStorageBuckets.length, 2) + assert.sameMembers( + activeStorageBuckets.map(([id]) => id), + [0, 1] + ) + debug('Preconditions OK') + + // Update number of storage buckets in dynamic bag creation policy to 1 + debug('Updating channel bag creation policy (1)...') + const updateDynamicBagPolicyTx = api.tx.storage.updateNumberOfStorageBucketsInDynamicBagCreationPolicy('Channel', 1) + await api.sendExtrinsicsAndGetResults([updateDynamicBagPolicyTx], storageLeaderKey) + + // Initialize Joystream CLI + const joystreamCli = await createJoystreamCli() + + // Create colossus APIs + const bucketIds = [0, 1] + const colossusApis = bucketIds.map((bucketId) => { + const colossusEndpoint = doubleBucketConfig.buckets[bucketId].metadata.endpoint + assert(colossusEndpoint, `Missing colossus endpoint for bucket ${bucketId}`) + const colossusApi = new ColossusApi(urljoin(colossusEndpoint, 'api/v1')) + return colossusApi + }) + + // Empty existing buckets + debug('Emptying existing buckets...') + for (const bucketId of bucketIds) { + const emptyBucketFixture = new EmptyBucketFixture(api, query, { id: bucketId }) + await new FixtureRunner(emptyBucketFixture).runWithQueryNodeChecks() + } + + // Generate different assets in both buckets + const NUM_CHANNELS_PER_BUCKET = 3 + const allChannelsData: CreatedChannelData[] = [] + const allChannelIds: number[] = [] + let singleChannelAssetsSize = 0 + for (const bucketId of bucketIds) { + debug(`Generating new assets for bucket ${bucketId}...`) + // Prevent the other bucket(s) from accepting the bags + debug(`Disabling other buckets...`) + const disableOtherBucketsExtrinsics = bucketIds.map((otherBucketId) => + api.tx.storage.updateStorageBucketStatus(otherBucketId, bucketId === otherBucketId) + ) + await api.sendExtrinsicsAndGetResults(disableOtherBucketsExtrinsics, storageLeaderKey) + // Waiting till query node syncs the update... + await Utils.until('QN syncs `acceptingNewBags` changes', async () => { + const buckets = await query.storageBucketsForNewChannel() + return buckets.length === 1 && buckets[0].id === bucketId.toString() + }) + const generateAssetsFixture = new GenerateAssetsFixture(api, query, joystreamCli, { + numberOfChannels: NUM_CHANNELS_PER_BUCKET, + }) + // Proceed to create channels + debug(`Creating channels...`) + await new FixtureRunner(generateAssetsFixture).runWithQueryNodeChecks() + const channelsData = generateAssetsFixture.channelsCreated + const channelIds = channelsData.map((c) => c.id).sort() + singleChannelAssetsSize = + parseInt(channelsData[0].qnData?.avatarPhoto?.size || '0') + + parseInt(channelsData[0].qnData?.coverPhoto?.size || '0') + assert(singleChannelAssetsSize > 0) + allChannelIds.push(...channelIds) + allChannelsData.push(...channelsData) + debug(`Verifying assets in Colossus nodes...`) + await generateAssetsFixture.verifyAssets( + colossusApis.map((colossusApi, apiBucketId) => ({ + api: colossusApi, + channelIds: apiBucketId === bucketId ? 
channelIds : [], + })) + ) + } + + // Re-enable all buckets + debug(`Re-enabling all buckets...`) + const reenableBucketsExtrinsics = bucketIds.map((bucketId) => + api.tx.storage.updateStorageBucketStatus(bucketId, true) + ) + await api.sendExtrinsicsAndGetResults(reenableBucketsExtrinsics, storageLeaderKey) + + // Copy bags from bucket 0 to bucket 1 + debug(`Copying bags from bucket 0 to bucket 1...`) + const copyBagsFixture1 = new CopyBagsFixture(api, query, { + from: [0], + to: [1], + expectedStorageIncrease: singleChannelAssetsSize * NUM_CHANNELS_PER_BUCKET, + }) + await new FixtureRunner(copyBagsFixture1).runWithQueryNodeChecks() + + // Verify assets after copying + debug(`Verifying assets in Colossus nodes...`) + await verifyAssets( + [ + { api: colossusApis[0], channelIds: allChannelIds.slice(0, NUM_CHANNELS_PER_BUCKET) }, + { api: colossusApis[1], channelIds: allChannelIds }, + ], + allChannelsData + ) + + // Empty bucket 0 + debug(`Emptying bucket 0...`) + const emptyBucket0Fixture = new EmptyBucketFixture(api, query, { id: 0 }) + await new FixtureRunner(emptyBucket0Fixture).runWithQueryNodeChecks() + + // Verify assets after emptying bucket 0 + debug(`Verifying assets in Colossus nodes...`) + await verifyAssets( + [ + { api: colossusApis[0], channelIds: [] }, + { api: colossusApis[1], channelIds: allChannelIds }, + ], + allChannelsData + ) + + // Copy bags from bucket 1 to bucket 0 + debug(`Copying bags from bucket 1 to bucket 0...`) + const copyBagsFixture2 = new CopyBagsFixture(api, query, { + from: [1], + to: [0], + expectedStorageIncrease: singleChannelAssetsSize * NUM_CHANNELS_PER_BUCKET * 2, + }) + await new FixtureRunner(copyBagsFixture2).runWithQueryNodeChecks() + + // Verify assets after copying + debug(`Verifying assets in Colossus nodes...`) + await verifyAssets( + [ + { api: colossusApis[0], channelIds: allChannelIds }, + { api: colossusApis[1], channelIds: allChannelIds }, + ], + allChannelsData + ) + + debug(`All OK, restoring previous dynamic bag policy...`) + const restoreDynamicBagPolicyTx = api.tx.storage.updateNumberOfStorageBucketsInDynamicBagCreationPolicy('Channel', 2) + await api.sendExtrinsicsAndGetResults([restoreDynamicBagPolicyTx], storageLeaderKey) + + debug('Done') +} diff --git a/tests/network-tests/src/flows/storage/setReplicationRate.ts b/tests/network-tests/src/flows/storage/setReplicationRate.ts new file mode 100644 index 0000000000..0375dc4bf5 --- /dev/null +++ b/tests/network-tests/src/flows/storage/setReplicationRate.ts @@ -0,0 +1,182 @@ +import fs from 'fs/promises' +import urljoin from 'url-join' +import _ from 'lodash' +import { assert } from 'chai' +import { stringifyBagId } from 'storage-node/src/services/helpers/bagTypes' +import { FlowProps } from '../../Flow' +import { extendDebug } from '../../Debugger' +import { FixtureRunner } from '../../Fixture' +import { Utils } from '../../utils' +import { ColossusApi } from '../../../ColossusApi' +import { doubleBucketConfig } from './initStorage' +import { createJoystreamCli } from '../utils' +import { GenerateAssetsFixture } from '../../fixtures/storage/GenerateChannelAssetsFixture' +import { SetReplicationRateFixture } from '../../fixtures/storage/SetReplicationRateFixture' + +export async function setReplicationRate({ api, query }: FlowProps): Promise { + const debug = extendDebug('flow:setReplicationRate') + api.enableDebugTxLogs() + debug('Started') + + // Get storage leader key + const [, storageLeader] = await api.getLeader('storageWorkingGroup') + const storageLeaderKey = 
storageLeader.roleAccountId.toString() + + // Check preconditions: + const activeStorageBuckets = (await api.query.storage.storageBucketById.entries()) + .filter(([, b]) => b.unwrap().operatorStatus.isStorageWorker) + .map(([sKey, bucket]) => [sKey.args[0].toNumber(), bucket.unwrap()] as const) + const channelBagPolicies = await api.query.storage.dynamicBagCreationPolicies('Channel') + assert.equal(channelBagPolicies.numberOfStorageBuckets.toNumber(), 2) + assert.equal(activeStorageBuckets.length, 2) + debug('Preconditions OK') + + // Initialize Joystream CLI + const joystreamCli = await createJoystreamCli() + + // Generate assets + const NUM_CHANNELS = 3 + const generateAssetsFixture = new GenerateAssetsFixture(api, query, joystreamCli, { numberOfChannels: NUM_CHANNELS }) + await new FixtureRunner(generateAssetsFixture).runWithQueryNodeChecks() + const channelsData = generateAssetsFixture.channelsCreated + const channelIds = channelsData.map((c) => c.id).sort() + const singleChannelAssetsSize = + (await fs.stat(channelsData[0].avatarPhotoPath)).size + (await fs.stat(channelsData[0].coverPhotoPath)).size + + // Verify that both storage nodes store all assets of the created channels + const colossus1Endpoint = doubleBucketConfig.buckets[0].metadata.endpoint + const colossus2Endpoint = doubleBucketConfig.buckets[1].metadata.endpoint + Utils.assert(colossus1Endpoint && colossus2Endpoint, 'Missing one of 2 colossus node endpoints!') + + const colossus1Api = new ColossusApi(urljoin(colossus1Endpoint, 'api/v1')) + const colossus2Api = new ColossusApi(urljoin(colossus2Endpoint, 'api/v1')) + + debug('Checking if both storage nodes store all assets...') + await generateAssetsFixture.verifyAssets([ + { api: colossus1Api, channelIds }, + { api: colossus2Api, channelIds }, + ]) + + // Adjust vouchers so that + // 1. bucket0 (colossus1) has `singleChannelAssetsSize - 1` available space + // 2. bucket1 (colossus2) has 0 available space + + // This would cause the channel bags to be REMOVED in the following order when setting replication rate to 1: + // from bucket1, from bucket0, from bucket1 ... + + // After the change: + // 1. bucket0 will have `singleChannelAssetsSize * (floor(NUM_CHANNELS / 2) + 1) - 1` available space + // 2. bucket1 will have `singleChannelAssetsSize * (floor(NUM_CHANNELS / 2) + NUM_CHANNELS % 2)` available space + + // Assuming NUM_CHANNELS % 2 === 1, this would cause the channel bags to be ADDED in the following order + // when setting replication rate back to 2: + // to bucket1, to bucket0, to bucket1 ... 
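+ // Worked example (illustrative, assuming NUM_CHANNELS = 3 and S = singleChannelAssetsSize; empty static bags don't affect the sizes):
+ //   voucher size limits: bucket0 = 4*S - 1, bucket1 = 3*S; both buckets initially store 3*S
+ //   available space:     bucket0 = S - 1,   bucket1 = 0
+ //   rate 2 -> 1 removes each channel bag from the bucket with the LEAST space available:
+ //     bucket1 (0), bucket0 (S - 1), bucket1 (S)  =>  afterwards bucket0 has 2*S - 1 and bucket1 has 2*S available
+ //   rate 1 -> 2 adds each channel bag to the bucket with the MOST space available:
+ //     bucket1 (2*S), bucket0 (2*S - 1), bucket1 (S)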
+ + debug('Updating storage bucket voucher limits...') + const updateLimitTxs = [ + api.tx.storage.setStorageBucketVoucherLimits( + 0, + singleChannelAssetsSize * (NUM_CHANNELS + 1) - 1, + await api.query.storage.voucherMaxObjectsNumberLimit() + ), + api.tx.storage.setStorageBucketVoucherLimits( + 1, + singleChannelAssetsSize * NUM_CHANNELS, + await api.query.storage.voucherMaxObjectsNumberLimit() + ), + ] + await api.sendExtrinsicsAndGetResults(updateLimitTxs, storageLeaderKey) + + // Update number of storage buckets in dynamic bag creation policy to 1 + debug('Updating channel bag creation policy (1)...') + const updateDynamicBagPolicyTx = api.tx.storage.updateNumberOfStorageBucketsInDynamicBagCreationPolicy('Channel', 1) + await api.sendExtrinsicsAndGetResults([updateDynamicBagPolicyTx], storageLeaderKey) + + // Adjust the actual replication rate to match the new policy + const staticBagIds = (await api.query.storage.bags.entries()) + .map(([sKey]) => sKey.args[0]) + .filter((bagId) => bagId.isStatic) + .map(stringifyBagId) + + // Channel bags should be removed alternately starting from bucket1 + // (due to voucher configuration provided above) + const bucket0ExpectedChannelsRemoved = channelIds.filter((c, i) => i % 2 === 1) + const bucket1ExpectedChannelsRemoved = channelIds.filter((c, i) => i % 2 === 0) + + const expectedBagRemovalsByBucket = new Map([ + [ + 0, + bucket0ExpectedChannelsRemoved.map((id) => ({ + id: `dynamic:channel:${id}`, + size: BigInt(singleChannelAssetsSize), + })), + ], + [ + 1, + [ + // Because all static bags are empty, they will be removed from bucket1, + // as it initially has less storage available + ...staticBagIds.map((id) => ({ id, size: BigInt(0) })), + ...bucket1ExpectedChannelsRemoved.map((id) => ({ + id: `dynamic:channel:${id}`, + size: BigInt(singleChannelAssetsSize), + })), + ], + ], + ]) + + debug('Setting replication rate (1)...') + const setReplicationRateFixture = new SetReplicationRateFixture(api, query, { + oldRate: 2, + newRate: 1, + expectedNumUpdates: staticBagIds.length + 3, + expectedBuckets: Array.from(expectedBagRemovalsByBucket.entries()).map(([bucketId, bagRemovals]) => ({ + id: bucketId, + removed: bagRemovals, + added: [], + })), + }) + await new FixtureRunner(setReplicationRateFixture).run() + + debug('Checking if storage nodes only store expected assets...') + await generateAssetsFixture.verifyAssets([ + { api: colossus1Api, channelIds: _.difference(channelIds, bucket0ExpectedChannelsRemoved) }, + { api: colossus2Api, channelIds: _.difference(channelIds, bucket1ExpectedChannelsRemoved) }, + ]) + + // Update number of storage buckets in dynamic bag creation policy back to 2 + debug('Updating channel bag creation policy (2)...') + const updateDynamicBagPolicy2Tx = api.tx.storage.updateNumberOfStorageBucketsInDynamicBagCreationPolicy('Channel', 2) + await api.sendExtrinsicsAndGetResults([updateDynamicBagPolicy2Tx], storageLeaderKey) + + // Adjust the actual replication rate to match the new policy + debug('Setting replication rate (2)...') + const setReplicationRateFixture2 = new SetReplicationRateFixture(api, query, { + oldRate: 1, + newRate: 2, + expectedNumUpdates: staticBagIds.length + 3, + // bucket1 will initially have more storage available, so the order of adding bags to buckets will match + // the order in which they were removed + expectedBuckets: Array.from(expectedBagRemovalsByBucket.entries()).map(([bucketId, bagRemovals]) => ({ + id: bucketId, + removed: [], + added: bagRemovals, + })), + }) + await new
FixtureRunner(setReplicationRateFixture2).run()
+
+  debug('Checking if both storage nodes store all assets...')
+  await generateAssetsFixture.verifyAssets([
+    { api: colossus1Api, channelIds },
+    { api: colossus2Api, channelIds },
+  ])
+
+  // Restore previous storage bucket voucher limits
+  debug('Restoring previous storage bucket voucher limits...')
+  const restoreLimitTxs = activeStorageBuckets.map(([bucketId, bucket]) =>
+    api.tx.storage.setStorageBucketVoucherLimits(bucketId, bucket.voucher.sizeLimit, bucket.voucher.objectsLimit)
+  )
+  await api.sendExtrinsicsAndGetResults(restoreLimitTxs, storageLeaderKey)
+
+  debug('Done')
+}
diff --git a/tests/network-tests/src/flows/storage/storageCleanup.ts b/tests/network-tests/src/flows/storage/storageCleanup.ts
index 75b3ddaf20..b715fd7883 100644
--- a/tests/network-tests/src/flows/storage/storageCleanup.ts
+++ b/tests/network-tests/src/flows/storage/storageCleanup.ts
@@ -1,74 +1,29 @@
+import urljoin from 'url-join'
+import { createType } from '@joystream/types'
 import { FlowProps } from '../../Flow'
 import { extendDebug } from '../../Debugger'
-import { BuyMembershipHappyCaseFixture } from '../../fixtures/membership'
+import { GenerateAssetsFixture } from '../../fixtures/storage/GenerateChannelAssetsFixture'
 import { FixtureRunner } from '../../Fixture'
-import { createType } from '@joystream/types'
-import { ChannelCreationInputParameters } from '@joystream/cli/src/Types'
 import { Utils } from '../../utils'
 import { ColossusApi } from '../../../ColossusApi'
 import { doubleBucketConfig } from './initStorage'
-import { readFileSync } from 'fs'
 import { createJoystreamCli } from '../utils'
-import urljoin from 'url-join'
 
 export async function storageCleanup({ api, query }: FlowProps): Promise<void> {
   const debug = extendDebug('flow:storageCleanup')
   api.enableDebugTxLogs()
   debug('Started')
 
-  // Get sotrage leader key
+  // Get storage leader key
   const [, storageLeader] = await api.getLeader('storageWorkingGroup')
   const storageLeaderKey = storageLeader.roleAccountId.toString()
 
-  // Create a member that will create the channels
-  const [, memberKeyPair] = await api.createKeyPairs(2)
-  const memberAddr = memberKeyPair.key.address
-  const buyMembershipFixture = new BuyMembershipHappyCaseFixture(api, query, [memberAddr])
-  await new FixtureRunner(buyMembershipFixture).run()
-  const [memberId] = buyMembershipFixture.getCreatedMembers()
-
-  // Give member 100 JOY, to be able to create a few channels through CLI
-  await api.treasuryTransferBalance(memberAddr, Utils.joy(100))
-
-  // Use JoystreamCLI to create a few channels w/ some avatarPhoto and coverPhoto objects
+  // Generate channel assets
   const joystreamCli = await createJoystreamCli()
-  await joystreamCli.importAccount(memberKeyPair.key)
-
-  const numChannels = 3
-
-  const channelsData: { channelId: number; avatarPhotoPath: string; coverPhotoPath: string }[] = []
-  for (let i = 0; i < numChannels; ++i) {
-    const avatarPhotoPath = joystreamCli.getTmpFileManager().randomImgFile(300, 300)
-    const coverPhotoPath = joystreamCli.getTmpFileManager().randomImgFile(1920, 500)
-    const channelInput: ChannelCreationInputParameters = {
-      title: `Cleanup test channel ${i + 1}`,
-      avatarPhotoPath,
-      coverPhotoPath,
-      description: `This is a cleanup test channel ${i + 1}`,
-      isPublic: true,
-      language: 'en',
-    }
-    const channelId = await joystreamCli.createChannel(channelInput, [
-      '--context',
-      'Member',
-      '--useMemberId',
-      memberId.toString(),
-    ])
-    debug(`Created channel ${i + 1}`)
-    channelsData.push({ channelId, avatarPhotoPath, coverPhotoPath })
-  }
-  const channelIds = channelsData.map((c) => c.channelId)
-
-  // Wait until QN processes the channels
-  debug('Waiting for QN to process the channels...')
-  const channels = await query.tryQueryWithTimeout(
-    () => query.channelsByIds(channelIds.map((id) => id.toString())),
-    (r) => Utils.assert(r.length === numChannels, `Expected ${numChannels} channels, found: ${r.length}`)
-  )
-
-  // Give colossus nodes some time to sync
-  debug('Giving colossus nodes 120 seconds to sync...')
-  await Utils.wait(120_000)
+  const generateAssetsFixture = new GenerateAssetsFixture(api, query, joystreamCli, { numberOfChannels: 3 })
+  await new FixtureRunner(generateAssetsFixture).runWithQueryNodeChecks()
+  const channelsData = generateAssetsFixture.channelsCreated
+  const channelIds = channelsData.map((c) => c.id)
 
   // Verify that both storage nodes store all the assets of created channels
   const colossus1Endpoint = doubleBucketConfig.buckets[0].metadata.endpoint
@@ -78,39 +33,10 @@ export async function storageCleanup({ api, query }: FlowProps): Promise<void> {
   const colossus1Api = new ColossusApi(urljoin(colossus1Endpoint, 'api/v1'))
   const colossus2Api = new ColossusApi(urljoin(colossus2Endpoint, 'api/v1'))
 
-  const verifyAssets = async (colossus1StoredChannelIds: number[], colossus2StoredChannelIds: number[]) => {
-    const verifyAssetsPromises = channelsData.map(async ({ channelId, avatarPhotoPath, coverPhotoPath }) => {
-      const channel = channels.find((c) => c.id === channelId.toString())
-      Utils.assert(channel, `Channel ${channelId} missing in QN result`)
-      Utils.assert(channel.coverPhoto && channel.avatarPhoto, `Channel assets missing in QN result`)
-      if (colossus1StoredChannelIds.includes(channelId)) {
-        await Promise.all([
-          colossus1Api.fetchAndVerifyAsset(channel.coverPhoto.id, readFileSync(coverPhotoPath), 'image/bmp'),
-          colossus1Api.fetchAndVerifyAsset(channel.avatarPhoto.id, readFileSync(avatarPhotoPath), 'image/bmp'),
-        ])
-      } else {
-        await Promise.all([
-          colossus1Api.expectAssetNotFound(channel.coverPhoto.id),
-          colossus1Api.expectAssetNotFound(channel.avatarPhoto.id),
-        ])
-      }
-      if (colossus2StoredChannelIds.includes(channelId)) {
-        await Promise.all([
-          colossus2Api.fetchAndVerifyAsset(channel.coverPhoto.id, readFileSync(coverPhotoPath), 'image/bmp'),
-          colossus2Api.fetchAndVerifyAsset(channel.avatarPhoto.id, readFileSync(avatarPhotoPath), 'image/bmp'),
-        ])
-      } else {
-        await Promise.all([
-          colossus2Api.expectAssetNotFound(channel.coverPhoto.id),
-          colossus2Api.expectAssetNotFound(channel.avatarPhoto.id),
-        ])
-      }
-    })
-    await Promise.all(verifyAssetsPromises)
-  }
-
-  // At this point we expect both nodes to store all assets
-  await verifyAssets(channelIds, channelIds)
+  await generateAssetsFixture.verifyAssets([
+    { api: colossus1Api, channelIds },
+    { api: colossus2Api, channelIds },
+  ])
   debug('All assets correctly stored!')
 
   // Delete the 1st channel
@@ -126,23 +52,22 @@ export async function storageCleanup({ api, query }: FlowProps): Promise<void> {
     api.tx.storage.updateStorageBucketsForBag(
       bag1Id,
       createType('BTreeSet', []),
-      createType('BTreeSet', [1]) // Remove 1st bucket (colossu2)
+      createType('BTreeSet', [1]) // Remove 1st bucket (colossus2)
     ),
     api.tx.storage.updateStorageBucketsForBag(
      bag2Id,
      createType('BTreeSet', []),
-      createType('BTreeSet', [0]) // Remove 0th bucket (colossu1)
+      createType('BTreeSet', [0]) // Remove 0th bucket (colossus1)
     ),
   ]
   await api.sendExtrinsicsAndGetResults(updateTxs, storageLeaderKey)
 
-  // Wait 2 minutes to make sure cleanup is executed
-  debug('Giving nodes 120 seconds to cleanup...')
-  await Utils.wait(120_000)
-
-  // Verify that Colossus2 (w/ auto cleanup) no longer stores 1st and 2nd channel assets,
-  // while Colossus1 still stores all assets
-  await verifyAssets(channelIds, channelIds.slice(2))
+  // Verify that colossus1 only stores 2nd channel assets,
+  // while colossus2 only stores 3rd channel assets
+  await generateAssetsFixture.verifyAssets([
+    { api: colossus1Api, channelIds: [channelIds[1]] },
+    { api: colossus2Api, channelIds: [channelIds[2]] },
+  ])
 
   debug('Cleanup correctly executed!')
 
   debug('Done')
diff --git a/tests/network-tests/src/scenarios/full.ts b/tests/network-tests/src/scenarios/full.ts
index ae372f5262..9daaec620c 100644
--- a/tests/network-tests/src/scenarios/full.ts
+++ b/tests/network-tests/src/scenarios/full.ts
@@ -22,8 +22,6 @@ import failToElect from '../flows/council/failToElect'
 import exactExecutionBlock from '../flows/proposals/exactExecutionBlock'
 import expireProposal from '../flows/proposals/expireProposal'
 import proposalsDiscussion from '../flows/proposalsDiscussion'
-import initDistributionBucket from '../flows/clis/initDistributionBucket'
-import initStorageBucket from '../flows/clis/initStorageBucket'
 import channelsAndVideos from '../flows/clis/channelsAndVideos'
 import { scenario } from '../Scenario'
 import activeVideoCounters from '../flows/content/activeVideoCounters'
@@ -42,7 +40,6 @@ import { updateApp } from '../flows/content/updateApp'
 import curatorModerationActions from '../flows/content/curatorModerationActions'
 import collaboratorAndCuratorPermissions from '../flows/content/collaboratorAndCuratorPermissions'
 import updateValidatorVerificationStatus from '../flows/membership/updateValidatorVerifications'
-import { storageCleanup } from '../flows/storage/storageCleanup'
 
 // eslint-disable-next-line @typescript-eslint/no-floating-promises
 scenario('Full', async ({ job }) => {
@@ -113,21 +110,10 @@ scenario('Full', async ({ job }) => {
     'curators and collaborators permissions',
     collaboratorAndCuratorPermissions
   ).after(curatorModerationActionsJob)
-  const directChannelPaymentJob = job('direct channel payment by members', directChannelPayment).after(
-    collaboratorAndCuratorPermissionsJob
-  )
+  job('direct channel payment by members', directChannelPayment).after(collaboratorAndCuratorPermissionsJob)
 
   // Apps
   job('create app', createApp).after(hireLeads)
   job('update app', updateApp).after(hireLeads)
   job('create app actions', createAppActions).after(hireLeads)
-
-  const contentDirectoryJob = directChannelPaymentJob // keep updated to last job above
-
-  // Storage cleanup
-  const storageCleanupJob = job('storage cleanup', storageCleanup).after(contentDirectoryJob)
-  // Storage & distribution CLIs
-  job('init storage and distribution buckets via CLI', [initDistributionBucket, initStorageBucket]).after(
-    storageCleanupJob
-  )
 })
diff --git a/tests/network-tests/src/scenarios/storage.ts b/tests/network-tests/src/scenarios/storage.ts
new file mode 100644
index 0000000000..2108b6f31d
--- /dev/null
+++ b/tests/network-tests/src/scenarios/storage.ts
@@ -0,0 +1,19 @@
+import { scenario } from '../Scenario'
+import initDistributionBucket from '../flows/clis/initDistributionBucket'
+import initStorageBucket from '../flows/clis/initStorageBucket'
+import { storageSync } from '../flows/storage/storageSync'
+import { storageCleanup } from '../flows/storage/storageCleanup'
+import { setReplicationRate } from '../flows/storage/setReplicationRate'
+import { copyAndEmptyBuckets } from '../flows/storage/copyAndEmptyBuckets'
+
+// eslint-disable-next-line @typescript-eslint/no-floating-promises
+scenario('Storage', async ({ job }) => {
+  const setReplicationRateJob = job('set replication rate', setReplicationRate)
+  const copyAndEmptyBucketsJob = job('copy/empty buckets', copyAndEmptyBuckets).after(setReplicationRateJob)
+  const storageSyncJob = job('storage sync', storageSync).after(copyAndEmptyBucketsJob)
+  const storageCleanupJob = job('storage cleanup', storageCleanup).after(storageSyncJob)
+  // Storage & distribution CLIs
+  job('init storage and distribution buckets via CLI', [initDistributionBucket, initStorageBucket]).after(
+    storageCleanupJob
+  )
+})
diff --git a/tests/network-tests/src/scenarios/storageSync.ts b/tests/network-tests/src/scenarios/storageSync.ts
deleted file mode 100644
index 2973de2fe8..0000000000
--- a/tests/network-tests/src/scenarios/storageSync.ts
+++ /dev/null
@@ -1,8 +0,0 @@
-import { scenario } from '../Scenario'
-import { storageSync } from '../flows/storage/storageSync'
-
-// eslint-disable-next-line @typescript-eslint/no-floating-promises
-scenario('Storage sync', async ({ job }) => {
-  // DEPENDS OF STORGE BEEING INITIALIZED WITH AT LEAST 2 BUCKETS!
-  job('test storage node sync', storageSync)
-})
diff --git a/tests/network-tests/start-storage.sh b/tests/network-tests/start-storage.sh
index 6acb285a73..caca30e0d5 100755
--- a/tests/network-tests/start-storage.sh
+++ b/tests/network-tests/start-storage.sh
@@ -13,6 +13,19 @@ export COLOSSUS_1_URL="http://${HOST_IP}:3333"
 export DISTRIBUTOR_1_URL="http://${HOST_IP}:3334"
 export COLOSSUS_2_URL="http://${HOST_IP}:3335"
 export DISTRIBUTOR_2_URL="http://${HOST_IP}:3336"
+
+if [ ! -z "$CLEANUP_INTERVAL" ]; then
+  # Cleanup testing configuration
+  export CLEANUP="true"
+  export CLEANUP_INTERVAL
+  export CLEANUP_NEW_OBJECT_EXPIRATION_PERIOD=10 # 10 seconds
+  export CLEANUP_MIN_REPLICATION_THRESHOLD=1
+  echo "Cleanup enabled!"
+  echo "Cleanup interval: ${CLEANUP_INTERVAL}m"
+  echo "New object expiration period: ${CLEANUP_NEW_OBJECT_EXPIRATION_PERIOD}s"
+  echo "Min. replication threshold: ${CLEANUP_MIN_REPLICATION_THRESHOLD}"
+fi
+
 $THIS_DIR/run-test-scenario.sh initStorageAndDistribution
 
 # give QN time to catch up so nodes can get their initial state
diff --git a/yarn.lock b/yarn.lock
index 6e9cdd6401..11d7e3f156 100644
--- a/yarn.lock
+++ b/yarn.lock
@@ -5618,6 +5618,13 @@
   resolved "https://registry.npmjs.org/@types/chai/-/chai-4.3.1.tgz"
   integrity sha512-/zPMqDkzSZ8t3VtxOa4KPq7uzzW978M9Tvh+j7GHKuo6k6GTLxPJ4J5gE5cjfJ26pnXst0N5Hax8Sr0T2Mi9zQ==
 
+"@types/cli-progress@^3.11.6":
+  version "3.11.6"
+  resolved "https://registry.yarnpkg.com/@types/cli-progress/-/cli-progress-3.11.6.tgz#94b334ebe4190f710e51c1bf9b4fedb681fa9e45"
+  integrity sha512-cE3+jb9WRlu+uOSAugewNpITJDt1VF8dHOopPO4IABFc3SXYL5WE/+PTz/FCdZRRfIujiWW3n3aMbv1eIGVRWA==
+  dependencies:
+    "@types/node" "*"
+
 "@types/cli-progress@^3.9.1":
   version "3.11.0"
   resolved "https://registry.npmjs.org/@types/cli-progress/-/cli-progress-3.11.0.tgz"
@@ -6459,13 +6466,6 @@
   dependencies:
     "@types/node" "*"
 
-"@types/winston@^2.4.4":
-  version "2.4.4"
-  resolved "https://registry.npmjs.org/@types/winston/-/winston-2.4.4.tgz"
-  integrity sha512-BVGCztsypW8EYwJ+Hq+QNYiT/MUyCif0ouBH+flrY66O5W+KIXAMML6E/0fJpm7VjIzgangahl5S03bJJQGrZw==
-  dependencies:
-    winston "*"
-
 "@types/ws@^5.1.2":
   version "5.1.2"
   resolved "https://registry.npmjs.org/@types/ws/-/ws-5.1.2.tgz"
@@ -8780,6 +8780,13 @@ cli-highlight@^2.1.11:
     parse5-htmlparser2-tree-adapter "^6.0.0"
     yargs "^16.0.0"
 
+cli-progress@^3.12.0:
+  version "3.12.0"
+  resolved "https://registry.yarnpkg.com/cli-progress/-/cli-progress-3.12.0.tgz#807ee14b66bcc086258e444ad0f19e7d42577942"
+  integrity sha512-tRkV3HJ1ASwm19THiiLIXLO7Im7wlTuKnvkYaTkyoAPefqjNg7W7DHKUlGRxy9vxDvbyCYQkQozvptuMkGCg8A==
+  dependencies:
+    string-width "^4.2.3"
+
 cli-progress@^3.4.0, cli-progress@^3.9.0:
   version "3.11.0"
   resolved "https://registry.npmjs.org/cli-progress/-/cli-progress-3.11.0.tgz"
@@ -23497,11 +23504,12 @@ winston-transport@^4.4.0, winston-transport@^4.5.0:
     readable-stream "^3.6.0"
     triple-beam "^1.3.0"
 
-winston@*, winston@^3.3.3:
-  version "3.7.2"
-  resolved "https://registry.npmjs.org/winston/-/winston-3.7.2.tgz"
-  integrity sha512-QziIqtojHBoyzUOdQvQiar1DH0Xp9nF1A1y7NVy2DGEsz82SBDtOalS0ulTRGVT14xPX3WRWkCsdcJKqNflKng==
+winston@^3.10.0:
+  version "3.11.0"
+  resolved "https://registry.npmjs.org/winston/-/winston-3.11.0.tgz"
+  integrity sha512-L3yR6/MzZAOl0DsysUXHVjOwv8mKZ71TrA/41EIduGpOOV5LQVodqN+QdQ6BS6PJ/RdIshZhq84P/fStEZkk7g==
   dependencies:
+    "@colors/colors" "^1.6.0"
     "@dabh/diagnostics" "^2.0.2"
     async "^3.2.3"
     is-stream "^2.0.0"
@@ -23513,12 +23521,11 @@ winston@*, winston@^3.3.3:
     triple-beam "^1.3.0"
     winston-transport "^4.5.0"
 
-winston@^3.10.0:
-  version "3.11.0"
-  resolved "https://registry.npmjs.org/winston/-/winston-3.11.0.tgz"
-  integrity sha512-L3yR6/MzZAOl0DsysUXHVjOwv8mKZ71TrA/41EIduGpOOV5LQVodqN+QdQ6BS6PJ/RdIshZhq84P/fStEZkk7g==
+winston@^3.3.3:
+  version "3.7.2"
+  resolved "https://registry.npmjs.org/winston/-/winston-3.7.2.tgz"
+  integrity sha512-QziIqtojHBoyzUOdQvQiar1DH0Xp9nF1A1y7NVy2DGEsz82SBDtOalS0ulTRGVT14xPX3WRWkCsdcJKqNflKng==
   dependencies:
-    "@colors/colors" "^1.6.0"
     "@dabh/diagnostics" "^2.0.2"
     async "^3.2.3"
     is-stream "^2.0.0"
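
Note for testers: the cleanup-specific defaults above only take effect when CLEANUP_INTERVAL is already set in the environment before start-storage.sh runs. A minimal local sketch is shown below; the direct script invocation and the interval value are illustrative assumptions, not part of this patch:

  # Hypothetical local run exercising the cleanup path added in start-storage.sh:
  # setting CLEANUP_INTERVAL makes the script export CLEANUP=true, a 10s
  # new-object expiration period and a min. replication threshold of 1.
  export CLEANUP_INTERVAL=1   # interval in minutes, per the echo output above
  ./tests/network-tests/start-storage.sh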