diff --git a/api/src/jobs/Metadata.test.ts b/api/src/jobs/Metadata.test.ts index d32f3bd6..70890b6e 100644 --- a/api/src/jobs/Metadata.test.ts +++ b/api/src/jobs/Metadata.test.ts @@ -21,13 +21,11 @@ describe("Metadata", () => { let dataStream: string; let contentFile; beforeEach(() => { - contentFile = { - name: "test1", - }; + contentFile = "test1"; dataStream = "test2"; fedoraObject = { addMasterMetadataDatastream: jest.fn(), - getDatastreamAsBuffer: jest.fn(), + downloadDatastreamToTempFile: jest.fn(), }; job = { data: { @@ -42,15 +40,13 @@ describe("Metadata", () => { }); it("adds a master data stream", async () => { - fedoraObject.getDatastreamAsBuffer.mockResolvedValue(dataStream); - tmp.fileSync.mockReturnValue(contentFile); + fedoraObject.downloadDatastreamToTempFile.mockResolvedValue(contentFile); const consoleSpy = jest.spyOn(console, "log").mockImplementation(jest.fn()); await metadata.run(job); expect(consoleSpy).toHaveBeenCalledTimes(1); expect(consoleSpy).toHaveBeenCalledWith("Adding metadata...", { pid: 123 }); - expect(fs.writeFileSync).toHaveBeenCalledWith(contentFile.name, dataStream); - expect(fedoraObject.addMasterMetadataDatastream).toHaveBeenCalledWith(contentFile.name); + expect(fedoraObject.addMasterMetadataDatastream).toHaveBeenCalledWith(contentFile); }); }); }); diff --git a/api/src/jobs/Metadata.ts b/api/src/jobs/Metadata.ts index 9f44b2aa..4f97df54 100644 --- a/api/src/jobs/Metadata.ts +++ b/api/src/jobs/Metadata.ts @@ -1,6 +1,5 @@ import { Job as QueueJob } from "bullmq"; import fs = require("fs"); -import tmp = require("tmp"); import Config from "../models/Config"; import { FedoraObject } from "../models/FedoraObject"; import FedoraObjectFactory from "../services/FedoraObjectFactory"; @@ -23,15 +22,15 @@ class MetadataProcessor { async addMasterMetadataDatastream(): Promise { const fedoraObject: FedoraObject = FedoraObject.build(this.pid, null, this.config); - const dataStream: Buffer = await fedoraObject.getDatastreamAsBuffer("MASTER"); - const contentFile = tmp.fileSync(); - fs.writeFileSync(contentFile.name, dataStream); - await fedoraObject.addMasterMetadataDatastream(contentFile.name); - fs.truncateSync(contentFile.name, 0); - fs.rmSync(contentFile.name); + // Stream the MASTER datastream directly to a temporary file to avoid + // buffering very large files into memory, then run FITS on that file. + const contentPath = await fedoraObject.downloadDatastreamToTempFile("MASTER"); + await fedoraObject.addMasterMetadataDatastream(contentPath); + fs.truncateSync(contentPath, 0); + fs.rmSync(contentPath); // FITS XML will have been generated in /tmp as a side-effect; clean it up: - fs.truncateSync(contentFile.name + ".fits.xml", 0); - fs.rmSync(contentFile.name + ".fits.xml"); + fs.truncateSync(contentPath + ".fits.xml", 0); + fs.rmSync(contentPath + ".fits.xml"); } async run(): Promise { diff --git a/api/src/models/FedoraObject.ts b/api/src/models/FedoraObject.ts old mode 100644 new mode 100755 index 684729b0..db96e20a --- a/api/src/models/FedoraObject.ts +++ b/api/src/models/FedoraObject.ts @@ -5,6 +5,7 @@ import Config from "./Config"; import { DatastreamParameters, Fedora } from "../services/Fedora"; import FedoraDataCollector from "../services/FedoraDataCollector"; import { execSync } from "child_process"; +import crypto = require("crypto"); import { Agent } from "../services/interfaces"; export interface ObjectParameters { @@ -76,8 +77,33 @@ export class FedoraObject { await this.fedora.deleteDatastreamTombstone(this.pid, stream); } + async computeDigestHeaderForFile(filename: string): Promise { + // Compute digest by streaming the file once (avoids loading the whole file into memory) + const md5Hash = crypto.createHash("md5"); + const sha512Hash = crypto.createHash("sha512"); + await new Promise((resolve, reject) => { + const rs = fs.createReadStream(filename); + rs.on("data", (chunk: Buffer) => { + md5Hash.update(chunk); + sha512Hash.update(chunk); + }); + rs.on("end", () => resolve()); + rs.on("error", (err) => reject(err)); + }); + const md5 = md5Hash.digest("hex"); + const sha512 = sha512Hash.digest("hex"); + return `md5=${md5}, sha-512=${sha512}`; + } + async addDatastreamFromFile(filename: string, stream: string, mimeType: string): Promise { - await this.addDatastreamFromStringOrBuffer(fs.readFileSync(filename), stream, mimeType, [201]); + // Create a fresh read stream for the upload + const readStream = fs.createReadStream(filename); + const digestHeader = await this.computeDigestHeaderForFile(filename); + const params: DatastreamParameters = { + mimeType: mimeType, + logMessage: "Initial Ingest addDatastream - " + stream, + }; + await this.fedora.addDatastream(this.pid, stream, params, readStream, [201], digestHeader); } async updateDatastreamFromFile(filename: string, stream: string, mimeType: string): Promise { @@ -106,6 +132,16 @@ export class FedoraObject { logMessage: "Initial Ingest addDatastream - MASTER-MD", }; const fitsXml = this.fitsMasterMetadata(filename); + + // Check if MASTER-MD exists and delete it if it does + try { + const checkResponse = await this.fedora.getDatastream(this.pid, "MASTER-MD"); + if (checkResponse.statusCode === 200) { + await this.deleteDatastream("MASTER-MD"); + } + } catch (e) { + // No existing MASTER-MD to delete + } await this.addDatastream("MASTER-MD", params, fitsXml, [201, 204]); } @@ -221,6 +257,10 @@ export class FedoraObject { return this.fedora.getDatastreamAsBuffer(this.pid, datastream); } + async downloadDatastreamToTempFile(datastream: string, treatMissingAsEmpty = false): Promise { + return this.fedora.downloadDatastreamToTempFile(this.pid, datastream, treatMissingAsEmpty); + } + async getDatastreamMetadata(datastream: string): Promise { return await this.fedora.getRdf(`${this.pid}/${datastream}/fcr:metadata`); } diff --git a/api/src/services/Fedora.test.ts b/api/src/services/Fedora.test.ts index 4896b4d5..d550a10c 100644 --- a/api/src/services/Fedora.test.ts +++ b/api/src/services/Fedora.test.ts @@ -74,7 +74,7 @@ describe("Fedora", () => { const putSpy = jest.spyOn(fedora, "putDatastream").mockImplementation(jest.fn()); const patchSpy = jest.spyOn(fedora, "patchRdf").mockReturnValue({ statusCode: 204 }); await fedora.addDatastream(pid, "MASTER", { mimeType: "foo/bar" }, "content"); - expect(putSpy).toHaveBeenCalledWith(pid, "MASTER", "foo/bar", [201], "content", ""); + expect(putSpy).toHaveBeenCalledWith(pid, "MASTER", "foo/bar", [201], "content", "", ""); const expectedTurtle = '<> "A";\n "test4_MASTER".\n'; expect(patchSpy).toHaveBeenCalledWith(`/${pid}/MASTER/fcr:metadata`, expectedTurtle); diff --git a/api/src/services/Fedora.ts b/api/src/services/Fedora.ts old mode 100644 new mode 100755 index b7cff740..5542e205 --- a/api/src/services/Fedora.ts +++ b/api/src/services/Fedora.ts @@ -9,6 +9,8 @@ import xmlescape = require("xml-escape"); import { HttpError } from "../models/HttpError"; import winston = require("winston"); import SolrCache from "./SolrCache"; +import fs = require("fs"); +import tmp = require("tmp"); export interface DatastreamParameters { dsLabel?: string; @@ -58,7 +60,7 @@ export class Fedora { protected _request( method = "get", _path = "/", - data: string | Buffer = null, + data: string | Buffer | NodeJS.ReadableStream = null, _options: Record = {}, ): Promise { const path = _path[0] == "/" ? _path.slice(1) : _path; @@ -70,7 +72,11 @@ export class Fedora { password: this.config.fedoraPassword, }; const options = Object.assign({}, auth, _options); - return http(method, url, data, options); + + return http(method, url, data, options).catch((err) => { + console.error(`Request failed for ${method.toUpperCase()} ${url}:`, err); + throw err; + }); } /** @@ -201,6 +207,65 @@ export class Fedora { ); } + /** + * Download a datastream directly to a temporary file to avoid buffering + * large files into memory. + * + * @param pid Record id + * @param datastream Which stream to request + * @param treatMissingAsEmpty If true, return empty temp file on 404 + */ + async downloadDatastreamToTempFile(pid: string, datastream: string, treatMissingAsEmpty = false): Promise { + const url = this.config.restBaseUrl + "/" + pid + "/" + datastream; + const auth = { + username: this.config.fedoraUsername, + password: this.config.fedoraPassword, + }; + + return new Promise((resolve, reject) => { + const tmpobj = tmp.fileSync(); + const writeStream = fs.createWriteStream(tmpobj.name); + + const req = http.get(url, auth); + + req.on("response", (res) => { + if (res.statusCode === 200) { + req.pipe(writeStream); + writeStream.on("finish", () => { + resolve(tmpobj.name); + }); + writeStream.on("error", (err) => { + try { + fs.unlinkSync(tmpobj.name); + } catch (e) { + console.error(e); + } + reject(err); + }); + } else if (res.statusCode === 404 && treatMissingAsEmpty) { + // create empty file and return its path + writeStream.end(() => resolve(tmpobj.name)); + } else { + try { + fs.unlinkSync(tmpobj.name); + } catch (e) { + console.error(e); + } + reject(new Error("Unexpected response for " + pid + "/" + datastream + ": " + res.statusCode)); + } + }); + + req.on("error", (err) => { + try { + fs.unlinkSync(tmpobj.name); + } catch (e) { + console.error(e); + } + reject(err); + }); + }); + } + /** * Get DC datastream from Fedora * @@ -264,18 +329,31 @@ export class Fedora { stream: string, mimeType: string, expectedStatus = [201], - data: string | Buffer, + data: string | Buffer | NodeJS.ReadableStream, linkHeader = "", + precomputedDigest = "", ): Promise { this.cache.purgeFromCacheIfEnabled(pid); - const md5 = crypto.createHash("md5").update(data).digest("hex"); - const sha = crypto.createHash("sha512").update(data).digest("hex"); const headers: Record = { "Overwrite-Tombstone": "true", "Content-Disposition": 'attachment; filename="' + stream + '"', "Content-Type": mimeType, - Digest: "md5=" + md5 + ", sha-512=" + sha, }; + + // If caller supplied a precomputed digest (for streaming upload), use it. + if (precomputedDigest && precomputedDigest.length > 0) { + headers.Digest = precomputedDigest; + } else { + // For string/Buffer payloads, compute digests here. + if (typeof data === "string" || Buffer.isBuffer(data)) { + const md5 = crypto.createHash("md5").update(data).digest("hex"); + const sha = crypto.createHash("sha512").update(data).digest("hex"); + headers.Digest = "md5=" + md5 + ", sha-512=" + sha; + } else { + // No precomputed digest and data is a stream — cannot compute here. + throw new Error("Streaming data requires a precomputed digest header to be provided"); + } + } const options = { headers: headers }; if (linkHeader.length > 0) { options.headers.Link = linkHeader; @@ -306,13 +384,22 @@ export class Fedora { pid: string, stream: string, params: DatastreamParameters, - data: string | Buffer, + data: string | Buffer | NodeJS.ReadableStream, expectedStatus = [201], + precomputedDigest = "", ): Promise { this.cache.purgeFromCacheIfEnabled(pid); // First create the stream: - await this.putDatastream(pid, stream, params.mimeType, expectedStatus, data, params.linkHeader ?? ""); + await this.putDatastream( + pid, + stream, + params.mimeType, + expectedStatus, + data, + params.linkHeader ?? "", + precomputedDigest, + ); // Now set appropriate metadata: const writer = new N3.Writer({ format: "text/turtle" });