Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
77 changes: 48 additions & 29 deletions cloud_storage/cloud_storage/overrides/file.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from pathlib import Path
from urllib.parse import quote, unquote
from urllib.request import urlopen
import tempfile

import frappe
from boto3.exceptions import S3UploadFailedError
Expand Down Expand Up @@ -346,36 +347,54 @@ def get_pdf_preview(self):
if self.is_folder:
frappe.throw(_("Cannot get file contents of a Folder"))

import tempfile

ext = self.file_name.split(".")[-1].lower()
client = get_cloud_storage_client()
ppt_s3_key = self.s3_key
with tempfile.NamedTemporaryFile(suffix=f".{ext}", delete=False) as temp_ppt:
ppt_bytes = client.get_object(Bucket=client.bucket, Key=ppt_s3_key)["Body"].read()
temp_ppt.write(ppt_bytes)
temp_ppt.flush()
ppt_path = Path(temp_ppt.name)
with tempfile.TemporaryDirectory() as tmpdir:
tmpdir_path = Path(tmpdir)
subprocess.run(
[
"libreoffice",
"--headless",
"--convert-to",
"pdf",
"--outdir",
str(tmpdir_path),
str(ppt_path),
],
check=True,
)
pdf_filename = ppt_path.with_suffix(".pdf").name
pdf_path = tmpdir_path / pdf_filename
with open(pdf_path, "rb") as f:
pdf_bytes = f.read()
encoded = base64.b64encode(pdf_bytes).decode("utf-8")
return encoded

if self.file_url.startswith("/api/method/retrieve"):
client = get_cloud_storage_client()
ppt_s3_key = self.s3_key

with tempfile.NamedTemporaryFile(suffix=f".{ext}", delete=False) as temp_file:
file_bytes = client.get_object(Bucket=client.bucket, Key=ppt_s3_key)["Body"].read()

temp_file.write(file_bytes)
temp_file.flush()

file_path = Path(temp_file.name)

return convert_to_pdf_base64(file_path)

else:
if not self.is_private:
file_path = Path(frappe.get_site_path("public", "files", self.file_name))
else:
file_path = Path(frappe.get_site_path("private", "files", self.file_name))

return convert_to_pdf_base64(file_path)


def convert_to_pdf_base64(file_path: Path):
    """Convert the document at ``file_path`` to PDF and return it base64-encoded.

    A headless ``libreoffice`` process is asked to write the converted PDF
    into a throwaway temporary directory. The PDF is then read back and
    returned as a base64 string decoded as UTF-8.

    Raises:
        subprocess.CalledProcessError: when the ``libreoffice`` command exits
            with a non-zero status (``check=True``).
    """
    with tempfile.TemporaryDirectory() as workdir:
        output_dir = Path(workdir)
        command = [
            "libreoffice",
            "--headless",
            "--convert-to",
            "pdf",
            "--outdir",
            str(output_dir),
            str(file_path),
        ]
        subprocess.run(command, check=True)

        # The converted file is looked up in the output directory under the
        # input's file name with its suffix swapped for ".pdf".
        converted_pdf = output_dir / file_path.with_suffix(".pdf").name
        with open(converted_pdf, "rb") as pdf_handle:
            payload = pdf_handle.read()

    return base64.b64encode(payload).decode("utf-8")


def is_safe_path(path: str) -> bool:
Expand Down
Binary file added cloud_storage/tests/fixtures/sample.docx
Binary file not shown.
Binary file added cloud_storage/tests/fixtures/sample.odp
Binary file not shown.
Binary file added cloud_storage/tests/fixtures/sample.ppt
Binary file not shown.
Binary file added cloud_storage/tests/fixtures/sample.pptx
Binary file not shown.
153 changes: 153 additions & 0 deletions cloud_storage/tests/test_file_preview.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
# Copyright (c) 2026, AgriTheory and contributors
# For license information, please see license.txt

import base64
from pathlib import Path
from unittest.mock import patch, MagicMock

import frappe
import pytest

from cloud_storage.cloud_storage.overrides.file import CloudStorageFile


TEST_FILES = Path(__file__).parent / "fixtures"


def _create_file_doc(file_name, file_url=None, is_private=0):
    """Build a File document (not inserted) and re-class it as ``CloudStorageFile``.

    When ``file_url`` is falsy it defaults to ``/files/<file_name>``.
    """
    fields = {
        "doctype": "File",
        "file_name": file_name,
        "file_url": file_url or f"/files/{file_name}",
        "is_private": is_private,
    }
    file_doc = frappe.get_doc(fields)

    # Swap the instance's class so the override's methods are the ones under test.
    file_doc.__class__ = CloudStorageFile
    return file_doc


@pytest.mark.parametrize("ext", ["doc", "docx"])
def test_doc_preview_content(monkeypatch, ext):
    """``get_content`` yields a non-None value for doc/docx files with ``open`` stubbed."""
    fake_content = b"fake document content"

    def _fake_open(*args, **kwargs):
        # Stand-in for builtins.open: any call returns an object whose read()
        # gives back the canned bytes.
        return MagicMock(read=lambda: fake_content)

    doc = _create_file_doc(f"sample.{ext}")
    monkeypatch.setattr("builtins.open", _fake_open)

    assert doc.get_content() is not None


@pytest.mark.parametrize("ext", ["ppt", "pptx", "odp", "key"])
@patch("cloud_storage.cloud_storage.overrides.file.subprocess.run")
def test_local_presentation_pdf_preview(mock_run, tmp_path, monkeypatch, ext):
    """A locally stored presentation is previewed as a base64-encoded PDF."""
    source_file = tmp_path / f"slides.{ext}"
    source_file.write_bytes(b"fake presentation content")

    # Pre-baked "conversion result" that the patched helpers hand back.
    converted_pdf = tmp_path / "slides.pdf"
    converted_pdf.write_bytes(b"%PDF fake")

    doc = _create_file_doc(f"slides.{ext}")
    doc.is_private = 0

    monkeypatch.setattr("frappe.get_site_path", lambda *args: str(source_file))

    # The conversion subprocess is never actually started.
    mock_run.return_value = None

    def _open_converted(*args, **kwargs):
        return converted_pdf.open("rb")

    with_suffix_patch = patch(
        "cloud_storage.cloud_storage.overrides.file.Path.with_suffix",
        return_value=converted_pdf,
    )
    with with_suffix_patch, patch("builtins.open", _open_converted):
        result = doc.get_pdf_preview()

    assert base64.b64decode(result).startswith(b"%PDF")


@pytest.mark.parametrize("ext", ["ppt", "pptx", "odp", "key"])
@patch("cloud_storage.cloud_storage.overrides.file.subprocess.run")
def test_s3_presentation_preview(mock_run, mocked_s3_client, monkeypatch, tmp_path, ext):
    """A presentation fetched through the storage client is previewed as a PDF."""
    object_key = f"slides.{ext}"

    # Seed the mocked bucket with the object the preview will download.
    mocked_s3_client.put_object(
        Bucket=mocked_s3_client.bucket,
        Key=object_key,
        Body=b"presentation binary",
    )

    converted_pdf = tmp_path / "slides.pdf"
    converted_pdf.write_bytes(b"%PDF mock")

    doc = _create_file_doc(
        object_key,
        file_url=f"/api/method/retrieve?key=slides.{ext}",
    )
    doc.s3_key = object_key

    monkeypatch.setattr(
        "cloud_storage.cloud_storage.overrides.file.get_cloud_storage_client",
        lambda: mocked_s3_client,
    )

    # The conversion subprocess is never actually started.
    mock_run.return_value = None

    def _open_converted(*args, **kwargs):
        return converted_pdf.open("rb")

    with patch("builtins.open", _open_converted):
        result = doc.get_pdf_preview()

    assert base64.b64decode(result).startswith(b"%PDF")


def test_preview_folder_error():
    """Asking for a PDF preview of a folder raises ``frappe.ValidationError``."""
    folder_doc = _create_file_doc("folder")
    folder_doc.is_folder = 1

    with pytest.raises(frappe.ValidationError):
        folder_doc.get_pdf_preview()


def test_safe_path():
    """A path under the site's public files directory passes ``is_safe_path``."""
    from cloud_storage.cloud_storage.overrides.file import is_safe_path

    candidate = frappe.get_site_path("public", "files", "test.txt")

    assert is_safe_path(candidate)


def test_generate_sharing_link(monkeypatch):
    """The sharing link generated for an inserted File contains ``share?key=``."""
    from cloud_storage.cloud_storage.overrides.file import get_sharing_link

    file_doc = frappe.get_doc(
        {
            "doctype": "File",
            "file_name": "sample.txt",
            "is_private": 0,
        }
    )
    # Use whatever insert() returns, exactly as the chained call would.
    inserted = file_doc.insert()

    sharing_url = get_sharing_link(inserted.name)

    assert "share?key=" in sharing_url
Loading