diff --git a/src/utils/index.ts b/src/utils/index.ts index 68c9dbb..4d6269b 100644 --- a/src/utils/index.ts +++ b/src/utils/index.ts @@ -16,6 +16,8 @@ export * from './godot.js' export * from './help.js' export * from './hooks/index.js' export * from './query/index.js' +export * from './upload.js' +export * from './zip.js' /** * Works the same way that git short commits are generated. diff --git a/src/utils/query/useShip.ts b/src/utils/query/useShip.ts index 2d6db06..cfd93cd 100644 --- a/src/utils/query/useShip.ts +++ b/src/utils/query/useShip.ts @@ -2,16 +2,14 @@ import fs from 'node:fs' import {Command} from '@oclif/core' import {useMutation} from '@tanstack/react-query' -import axios from 'axios' import fg from 'fast-glob' import {v4 as uuid} from 'uuid' -import {ZipFile} from 'yazl' import {getNewUploadTicket, getProject, startJobsFromUpload} from '@cli/api/index.js' import {BaseCommand} from '@cli/baseCommands/index.js' import {DEFAULT_IGNORED_FILES_GLOBS, DEFAULT_SHIPPED_FILES_GLOBS, cacheKeys} from '@cli/constants/index.js' import {Job, Platform, ProjectConfig, ShipGameFlags, UploadDetails} from '@cli/types' -import {getCWDGitInfo, getFileHash, queryClient} from '@cli/utils/index.js' +import {createZip, getCWDGitInfo, getFileHash, queryClient, uploadZip} from '@cli/utils/index.js' // Takes the current command so we can get the project config // This could be made more composable @@ -21,6 +19,22 @@ interface ShipOptions { shipFlags?: ShipGameFlags // If provided, will override command flags } +function formatProgressLog( + label: string, + data: {progress: number; elapsedSeconds: number; speedMBps: number; [key: string]: any}, + bytesKey: 'writtenBytes' | 'loadedBytes', + totalKey: 'estimatedTotalBytes' | 'totalBytes', + isEstimated = false, +): string { + const elapsed = data.elapsedSeconds.toFixed(1) + const transferredMB = (data[bytesKey] / 1024 / 1024).toFixed(2) + const totalMB = (data[totalKey] / 1024 / 1024).toFixed(2) + const progressPercent = Math.round(data.progress * 100) + const speed = data.speedMBps.toFixed(2) + const totalPrefix = isEstimated ? '~' : '' + return `${label}: ${progressPercent}% (${transferredMB}MB / ${totalPrefix}${totalMB}MB) - ${elapsed}s - ${speed}MB/s` +} + export async function ship({command, log = () => {}, shipFlags}: ShipOptions): Promise { const commandFlags = command.getFlags() as ShipGameFlags const finalFlags = shipFlags || commandFlags @@ -51,41 +65,49 @@ export async function ship({command, log = () => {}, shipFlags}: ShipOptions): P const files = await fg(shippedFilesGlobs, {dot: true, ignore: ignoredFilesGlobs}) verbose && log(`Found ${files.length} files, adding to zip...`) - const zipFile = new ZipFile() - for (const file of files) { - zipFile.addFile(file, file) - } - const outputZipToFile = (zip: ZipFile, fileName: string) => - new Promise((resolve) => { - const outputStream = fs.createWriteStream(fileName) - zip.outputStream.pipe(outputStream).on('close', () => resolve()) - zip.end() - }) - - const tmpZipFile = `${process.cwd()}/shipthis-${uuid()}.zip` - log(`Creating zip file: ${tmpZipFile}`) - await outputZipToFile(zipFile, tmpZipFile) + const tmpZipFileName = `shipthis-${uuid()}.zip` + const tmpZipFile = `${process.cwd()}/${tmpZipFileName}` + log(`Creating zip file: ${tmpZipFileName}`) + await createZip({ + files, + outputPath: tmpZipFile, + onProgress: (data) => { + log(formatProgressLog('Zipping', data, 'writtenBytes', 'estimatedTotalBytes', true)) + }, + }) - verbose && log('Reading zip file buffer...') - const zipBuffer = fs.readFileSync(tmpZipFile) const {size} = fs.statSync(tmpZipFile) verbose && log('Requesting upload ticket...') const uploadTicket = await getNewUploadTicket(projectConfig.project.id) log('Uploading zip file...') - await axios.put(uploadTicket.url, zipBuffer, { - headers: { - 'Content-Type': 'application/zip', - 'Content-length': size, + const zipStream = fs.createReadStream(tmpZipFile) + + const response = await uploadZip({ + url: uploadTicket.url, + zipStream, + zipSize: size, + onProgress: (data) => { + log(formatProgressLog('Uploading', data, 'loadedBytes', 'totalBytes', false)) }, }) + verbose && log('Computing zip file hash...') + const zipFileMd5 = await getFileHash(tmpZipFile) + + verbose && log('Cleaning up temporary zip file...') + fs.unlinkSync(tmpZipFile) + + if (!response.ok) { + throw new Error(`Upload failed: ${response.status} ${response.statusText}`) + } + + log(`Upload complete`) + verbose && log('Fetching Git info...') const gitInfo = await getCWDGitInfo() - verbose && log('Computing file hash...') - const zipFileMd5 = await getFileHash(tmpZipFile) const uploadDetails: UploadDetails = { ...gitInfo, zipFileMd5, @@ -104,9 +126,6 @@ export async function ship({command, log = () => {}, shipFlags}: ShipOptions): P const jobs = await startJobsFromUpload(uploadTicket.id, startJobsOptions) - verbose && log('Cleaning up temporary zip file...') - fs.unlinkSync(tmpZipFile) - verbose && log('Job submission complete.') if (jobs.length === 0) { diff --git a/src/utils/upload.ts b/src/utils/upload.ts new file mode 100644 index 0000000..ce7fcec --- /dev/null +++ b/src/utils/upload.ts @@ -0,0 +1,79 @@ +import {Readable, Transform} from 'stream' + +export const ON_PROGRESS_THROTTLE_MS = 2000 + +export function createProgressStream( + totalSize: number, + onProgress: (sent: number, total: number) => void, + throttleMs?: number +): Transform { + let sent = 0 + let lastCallTime = 0 + + return new Transform({ + transform(chunk, encoding, callback) { + sent += chunk.length + + const now = Date.now() + if (!throttleMs || now - lastCallTime >= throttleMs) { + onProgress(sent, totalSize) + lastCallTime = now + } + + callback(null, chunk) + }, + }) +} + +interface ProgressData { + progress: number + loadedBytes: number + totalBytes: number + speedMBps: number + elapsedSeconds: number +} + +interface UploadProps { + url: string + zipStream: Readable + zipSize: number + onProgress: (data: ProgressData) => void +} + +// Uploads a zip file with progress tracking +export function uploadZip({url, zipStream, zipSize, onProgress}: UploadProps): Promise { + const startTime = Date.now() + + const progressStream = createProgressStream(zipSize, (sent, total) => { + const elapsedSeconds = (Date.now() - startTime) / 1000 + const speedMBps = elapsedSeconds < 0.001 ? 0 : sent / elapsedSeconds / 1024 / 1024 + onProgress({ + progress: total ? sent / total : 0, + loadedBytes: sent, + totalBytes: total, + speedMBps, + elapsedSeconds, + }) + }, ON_PROGRESS_THROTTLE_MS) + + const streamWithProgress = zipStream.pipe(progressStream) + const webStream = Readable.toWeb(streamWithProgress) as ReadableStream + + // The 'duplex' property is required when using a ReadableStream as the request body. + // 'duplex: half' indicates half-duplex communication (one direction at a time), + // which is the mode needed for streaming request bodies with fetch(). + // Type assertion is necessary because 'duplex' is not yet part of the standard + // TypeScript RequestInit type definition, though it's required by the fetch spec + // for streaming uploads. + const response = fetch(url, { + method: 'PUT', + headers: { + 'Content-Type': 'application/zip', + 'Content-Length': zipSize.toString(), + }, + body: webStream, + duplex: 'half', + } as RequestInit & {duplex: 'half'}) + + return response +} diff --git a/src/utils/zip.ts b/src/utils/zip.ts new file mode 100644 index 0000000..68a7e59 --- /dev/null +++ b/src/utils/zip.ts @@ -0,0 +1,102 @@ +import fs from 'node:fs' +import {ZipFile} from 'yazl' + +import {createProgressStream, ON_PROGRESS_THROTTLE_MS} from './upload.js' + +// Used to estimate the final zip size +const COMPRESSION_RATIO = 0.65 + +export interface ZipProgressData { + progress: number + writtenBytes: number + estimatedTotalBytes: number + sourceTotalBytes: number + elapsedSeconds: number + speedMBps: number +} + +interface CreateZipProps { + files: string[] + outputPath: string + onProgress: (data: ZipProgressData) => void +} + +// Creates a zip file with progress tracking +export async function createZip({files, outputPath, onProgress}: CreateZipProps): Promise { + const startTime = Date.now() + + const statPromises = files.map(async (file) => { + try { + return await fs.promises.stat(file) + } catch { + // Skip inaccessible files + return null + } + }) + + const statsResults = await Promise.all(statPromises) + + let totalSourceSize = 0 + for (const stats of statsResults) { + if (stats) { + totalSourceSize += stats.size + } + } + + const estimatedZipSize = Math.max(Math.round(totalSourceSize * COMPRESSION_RATIO), 1) + + const zipFile = new ZipFile() + for (const file of files) { + zipFile.addFile(file, file) + } + + return new Promise((resolve, reject) => { + let settled = false + + const handleError = (error: Error) => { + if (settled) return + settled = true + + // Clean up streams + zipFile.outputStream.destroy() + progressStream.destroy() + outputStream.destroy() + + reject(error) + } + + const handleSuccess = () => { + if (settled) return + settled = true + resolve() + } + + const outputStream = fs.createWriteStream(outputPath) + + const progressStream = createProgressStream(estimatedZipSize, (written, total) => { + const elapsedSeconds = (Date.now() - startTime) / 1000 + const speedMBps = elapsedSeconds < 0.001 ? 0 : written / elapsedSeconds / 1024 / 1024 + onProgress({ + progress: total ? Math.min(1, written / total) : 0, + writtenBytes: written, + estimatedTotalBytes: total, + sourceTotalBytes: totalSourceSize, + elapsedSeconds, + speedMBps, + }) + }, ON_PROGRESS_THROTTLE_MS) + + // Add error handlers to all streams in the pipe chain + zipFile.outputStream.on('error', handleError) + progressStream.on('error', handleError) + outputStream.on('error', handleError) + outputStream.on('close', handleSuccess) + + zipFile.outputStream + .pipe(progressStream) + .pipe(outputStream) + + zipFile.end() + }) +} +