diff --git a/cloud_storage/cloud_storage/overrides/file.py b/cloud_storage/cloud_storage/overrides/file.py
index 02d9b2c..659d2e1 100644
--- a/cloud_storage/cloud_storage/overrides/file.py
+++ b/cloud_storage/cloud_storage/overrides/file.py
@@ -12,6 +12,7 @@
 from pathlib import Path
 from urllib.parse import quote, unquote
 from urllib.request import urlopen
+import tempfile
 
 import frappe
 from boto3.exceptions import S3UploadFailedError
@@ -346,36 +347,61 @@ def get_pdf_preview(self):
 		if self.is_folder:
 			frappe.throw(_("Cannot get file contents of a Folder"))
 
-		import tempfile
-
 		ext = self.file_name.split(".")[-1].lower()
-		client = get_cloud_storage_client()
-		ppt_s3_key = self.s3_key
-		with tempfile.NamedTemporaryFile(suffix=f".{ext}", delete=False) as temp_ppt:
-			ppt_bytes = client.get_object(Bucket=client.bucket, Key=ppt_s3_key)["Body"].read()
-			temp_ppt.write(ppt_bytes)
-			temp_ppt.flush()
-			ppt_path = Path(temp_ppt.name)
-		with tempfile.TemporaryDirectory() as tmpdir:
-			tmpdir_path = Path(tmpdir)
-			subprocess.run(
-				[
-					"libreoffice",
-					"--headless",
-					"--convert-to",
-					"pdf",
-					"--outdir",
-					str(tmpdir_path),
-					str(ppt_path),
-				],
-				check=True,
-			)
-			pdf_filename = ppt_path.with_suffix(".pdf").name
-			pdf_path = tmpdir_path / pdf_filename
-			with open(pdf_path, "rb") as f:
-				pdf_bytes = f.read()
-			encoded = base64.b64encode(pdf_bytes).decode("utf-8")
-			return encoded
+
+		if (self.file_url or "").startswith("/api/method/retrieve"):
+			client = get_cloud_storage_client()
+			file_bytes = client.get_object(Bucket=client.bucket, Key=self.s3_key)["Body"].read()
+
+			# delete=False so LibreOffice can reopen the file by name; it is removed explicitly below
+			with tempfile.NamedTemporaryFile(suffix=f".{ext}", delete=False) as temp_file:
+				temp_file.write(file_bytes)
+
+			file_path = Path(temp_file.name)
+			try:
+				return convert_to_pdf_base64(file_path)
+			finally:
+				# Never leave the downloaded copy behind in the system temp directory
+				file_path.unlink(missing_ok=True)
+
+		# Files kept on local disk are converted straight from the site's files directory
+		visibility = "private" if self.is_private else "public"
+		file_path = Path(frappe.get_site_path(visibility, "files", self.file_name))
+		return convert_to_pdf_base64(file_path)
+
+
+def convert_to_pdf_base64(file_path: Path) -> str:
+	"""Convert the document at ``file_path`` to PDF with headless LibreOffice.
+
+	The PDF is written to a throwaway directory that is deleted before returning.
+
+	:param file_path: location of the source document on the local filesystem
+	:return: the generated PDF, base64-encoded and decoded to a UTF-8 string
+	:raises subprocess.CalledProcessError: if LibreOffice exits with a non-zero status
+	"""
+	with tempfile.TemporaryDirectory() as tmpdir:
+		tmpdir_path = Path(tmpdir)
+
+		subprocess.run(
+			[
+				"libreoffice",
+				"--headless",
+				"--convert-to",
+				"pdf",
+				"--outdir",
+				str(tmpdir_path),
+				str(file_path),
+			],
+			check=True,
+		)
+
+		# The output is looked up under the input's file name with the suffix swapped for .pdf
+		pdf_path = tmpdir_path / file_path.with_suffix(".pdf").name
+
+		with open(pdf_path, "rb") as f:
+			pdf_bytes = f.read()
+
+	return base64.b64encode(pdf_bytes).decode("utf-8")
 
 
 def is_safe_path(path: str) -> bool:
diff --git a/cloud_storage/tests/fixtures/sample.docx b/cloud_storage/tests/fixtures/sample.docx
new file mode 100644
index 0000000..325731e
Binary files /dev/null and b/cloud_storage/tests/fixtures/sample.docx differ
diff --git a/cloud_storage/tests/fixtures/sample.odp b/cloud_storage/tests/fixtures/sample.odp
new file mode 100644
index 0000000..3db51e1
Binary files /dev/null and b/cloud_storage/tests/fixtures/sample.odp differ
diff --git a/cloud_storage/tests/fixtures/sample.ppt b/cloud_storage/tests/fixtures/sample.ppt
new file mode 100644
index 0000000..4cfa725
Binary files /dev/null and b/cloud_storage/tests/fixtures/sample.ppt differ
diff --git a/cloud_storage/tests/fixtures/sample.pptx b/cloud_storage/tests/fixtures/sample.pptx
new file mode 100644
index 0000000..c0524a7
Binary files /dev/null and b/cloud_storage/tests/fixtures/sample.pptx differ
diff --git a/cloud_storage/tests/test_file_preview.py b/cloud_storage/tests/test_file_preview.py
new file mode 100644
index 0000000..f98a802
--- /dev/null
+++ b/cloud_storage/tests/test_file_preview.py
@@ -0,0 +1,157 @@
+# Copyright (c) 2026, AgriTheory and contributors
+# For license information, please see license.txt
+
+import base64
+from pathlib import Path
+from unittest.mock import patch, MagicMock
+
+import frappe
+import pytest
+
+from cloud_storage.cloud_storage.overrides.file import CloudStorageFile
+
+
+TEST_FILES = Path(__file__).parent / "fixtures"
+
+
+def _create_file_doc(file_name, file_url=None, is_private=0):
+	doc = frappe.get_doc(
+		{
+			"doctype": "File",
+			"file_name": file_name,
+			"file_url": file_url or f"/files/{file_name}",
+			"is_private": is_private,
+		}
+	)
+
+	doc.__class__ = CloudStorageFile
+	return doc
+
+
+@pytest.mark.parametrize("ext", ["doc", "docx"])
+def test_doc_preview_content(monkeypatch, ext):
+	"""Test preview for doc/docx files using get_content"""
+
+	fake_content = b"fake document content"
+
+	doc = _create_file_doc(f"sample.{ext}")
+
+	monkeypatch.setattr(
+		"builtins.open",
+		lambda *args, **kwargs: MagicMock(read=lambda: fake_content),
+	)
+
+	content = doc.get_content()
+
+	assert content is not None
+
+
+@pytest.mark.parametrize("ext", ["ppt", "pptx", "odp", "key"])
+@patch("cloud_storage.cloud_storage.overrides.file.subprocess.run")
+def test_local_presentation_pdf_preview(mock_run, tmp_path, monkeypatch, ext):
+	"""Test local presentation preview converted to pdf"""
+
+	pres_file = tmp_path / f"slides.{ext}"
+	pres_file.write_bytes(b"fake presentation content")
+
+	pdf_file = tmp_path / "slides.pdf"
+	pdf_file.write_bytes(b"%PDF fake")
+
+	doc = _create_file_doc(f"slides.{ext}")
+	doc.is_private = 0
+
+	monkeypatch.setattr(
+		"frappe.get_site_path",
+		lambda *args: str(pres_file),
+	)
+
+	mock_run.return_value = None
+
+	with patch(
+		"cloud_storage.cloud_storage.overrides.file.Path.with_suffix",
+		return_value=pdf_file,
+	):
+		with patch("builtins.open", lambda *args, **kwargs: pdf_file.open("rb")):
+			result = doc.get_pdf_preview()
+
+	decoded = base64.b64decode(result)
+
+	assert decoded.startswith(b"%PDF")
+
+
+@pytest.mark.parametrize("ext", ["ppt", "pptx", "odp", "key"])
+@patch("cloud_storage.cloud_storage.overrides.file.subprocess.run")
+def test_s3_presentation_preview(mock_run, mocked_s3_client, monkeypatch, tmp_path, ext):
+	"""Test preview when file comes from S3"""
+
+	pres_content = b"presentation binary"
+
+	mocked_s3_client.put_object(
+		Bucket=mocked_s3_client.bucket,
+		Key=f"slides.{ext}",
+		Body=pres_content,
+	)
+
+	pdf_file = tmp_path / "slides.pdf"
+	pdf_file.write_bytes(b"%PDF mock")
+
+	doc = _create_file_doc(
+		f"slides.{ext}",
+		file_url=f"/api/method/retrieve?key=slides.{ext}",
+	)
+
+	doc.s3_key = f"slides.{ext}"
+
+	monkeypatch.setattr(
+		"cloud_storage.cloud_storage.overrides.file.get_cloud_storage_client",
+		lambda: mocked_s3_client,
+	)
+
+	mock_run.return_value = None
+
+	with patch(
+		"builtins.open",
+		lambda *args, **kwargs: pdf_file.open("rb"),
+	):
+		result = doc.get_pdf_preview()
+
+	decoded = base64.b64decode(result)
+
+	assert decoded.startswith(b"%PDF")
+
+	# The temporary copy downloaded from S3 must be removed once the preview is built
+	source_path = Path(mock_run.call_args.args[0][-1])
+	assert not source_path.exists()
+
+
+def test_preview_folder_error():
+	doc = _create_file_doc("folder")
+	doc.is_folder = 1
+
+	with pytest.raises(frappe.ValidationError):
+		doc.get_pdf_preview()
+
+
+def test_safe_path():
+	from cloud_storage.cloud_storage.overrides.file import is_safe_path
+
+	safe = frappe.get_site_path("public", "files", "test.txt")
+
+	assert is_safe_path(safe)
+
+
+def test_generate_sharing_link(monkeypatch):
+
+	doc = frappe.get_doc(
+		{
+			"doctype": "File",
+			"file_name": "sample.txt",
+			"is_private": 0,
+		}
+	).insert()
+
+	from cloud_storage.cloud_storage.overrides.file import get_sharing_link
+
+	url = get_sharing_link(doc.name)
+
+	assert "share?key=" in url