|
| 1 | +# celery/preprocessing.py |
| 2 | + |
1 | 3 | from openai import OpenAI |
2 | 4 |
|
3 | 5 | from .. import models |
4 | 6 | from ..dependencies import get_db |
5 | | -from ..utils.preprocessing import preprocess_files |
| 7 | +from ..utils.preprocessing import PreprocessingPipeline |
6 | 8 | from .celery_config import celery_app |
7 | 9 |
|
8 | 10 | if celery_app: |
9 | 11 |
|
10 | | - @celery_app.task(autoretry_for=(Exception,), retry_backoff=True) |
| 12 | + @celery_app.task( |
| 13 | + bind=True, |
| 14 | + autoretry_for=(Exception,), |
| 15 | + retry_backoff=True, |
| 16 | + max_retries=3, |
| 17 | + default_retry_delay=60 |
| 18 | + ) |
| 19 | + def process_files_async(self, task_id: int): |
| 20 | + """ |
| 21 | + Celery task for asynchronous file processing with the new pipeline. |
| 22 | +
|
| 23 | + Parameters: |
| 24 | + ----------- |
| 25 | + task_id : int |
| 26 | + The ID of the PreprocessingTask to process |
| 27 | + """ |
| 28 | + with next(get_db()) as db: |
| 29 | + try: |
| 30 | + # Update celery task ID for cancellation support |
| 31 | + task = db.get(models.PreprocessingTask, task_id) |
| 32 | + if task and self.request.id: |
| 33 | + task.celery_task_id = self.request.id |
| 34 | + db.commit() |
| 35 | + |
| 36 | + # Process using the new pipeline |
| 37 | + pipeline = PreprocessingPipeline(db, task_id) |
| 38 | + pipeline.process() |
| 39 | + |
| 40 | + except Exception as e: |
| 41 | + # Update task status on failure |
| 42 | + task = db.get(models.PreprocessingTask, task_id) |
| 43 | + if task: |
| 44 | + task.status = models.PreprocessingStatus.FAILED |
| 45 | + task.message = f"Processing failed: {str(e)}" |
| 46 | + db.commit() |
| 47 | + raise |
| 48 | + |
| 49 | + @celery_app.task( |
| 50 | + autoretry_for=(Exception,), |
| 51 | + retry_backoff=True, |
| 52 | + max_retries=3 |
| 53 | + ) |
11 | 54 | def preprocess_file_celery( |
12 | | - file_ids: list[int], |
13 | | - client: OpenAI | None = None, |
14 | | - base_url: str | None = None, |
15 | | - api_key: str | None = None, |
16 | | - pdf_backend: str = "pymupdf4llm", |
17 | | - ocr_backend: str = "ocrmypdf", |
18 | | - llm_model: str | None = None, |
19 | | - use_ocr: bool = True, |
20 | | - force_ocr: bool = False, |
21 | | - ocr_languages: list[str] | None = None, |
22 | | - ocr_model: str | None = None, |
23 | | - project_id: int | None = None, |
24 | | - preprocessing_task_id: int | None = None, |
25 | | - output_file: bool = True, |
| 55 | + file_ids: list[int], |
| 56 | + client: OpenAI | None = None, |
| 57 | + base_url: str | None = None, |
| 58 | + api_key: str | None = None, |
| 59 | + pdf_backend: str = "pymupdf4llm", |
| 60 | + ocr_backend: str = "ocrmypdf", |
| 61 | + llm_model: str | None = None, |
| 62 | + use_ocr: bool = True, |
| 63 | + force_ocr: bool = False, |
| 64 | + ocr_languages: list[str] | None = None, |
| 65 | + ocr_model: str | None = None, |
| 66 | + project_id: int | None = None, |
| 67 | + preprocessing_task_id: int | None = None, |
| 68 | + output_file: bool = True, |
26 | 69 | ): |
27 | | - with next(get_db()) as db_session: |
28 | | - files = ( |
29 | | - db_session.query(models.File).filter(models.File.id.in_(file_ids)).all() |
30 | | - ) |
31 | | - return preprocess_files( |
32 | | - files=files, |
33 | | - client=client, |
34 | | - ocr_backend=ocr_backend, |
35 | | - pdf_backend=pdf_backend, |
36 | | - llm_model=llm_model, |
37 | | - use_ocr=use_ocr, |
38 | | - force_ocr=force_ocr, |
39 | | - ocr_languages=ocr_languages, |
40 | | - ocr_model=ocr_model, |
41 | | - base_url=base_url, |
42 | | - api_key=api_key, |
43 | | - db_session=db_session, |
| 70 | + """ |
| 71 | + Legacy celery task for backward compatibility. |
| 72 | + Creates a preprocessing configuration and task, then processes files. |
| 73 | + """ |
| 74 | + with next(get_db()) as db: |
| 75 | + if not project_id: |
| 76 | + raise ValueError("project_id is required") |
| 77 | + |
| 78 | + # Get or create a configuration for these settings |
| 79 | + config_snapshot = { |
| 80 | + "file_type": models.FileType.APPLICATION_PDF, # Default, will be overridden |
| 81 | + "preprocessing_strategy": models.PreprocessingStrategy.FULL_DOCUMENT, |
| 82 | + "pdf_backend": pdf_backend, |
| 83 | + "ocr_backend": ocr_backend, |
| 84 | + "use_ocr": use_ocr, |
| 85 | + "force_ocr": force_ocr, |
| 86 | + "ocr_languages": ocr_languages, |
| 87 | + "ocr_model": ocr_model, |
| 88 | + "llm_model": llm_model, |
| 89 | + "additional_settings": { |
| 90 | + "base_url": base_url, |
| 91 | + "api_key": api_key, |
| 92 | + "output_file": output_file, |
| 93 | + } |
| 94 | + } |
| 95 | + |
| 96 | + # Create temporary configuration |
| 97 | + config = models.PreprocessingConfiguration( |
44 | 98 | project_id=project_id, |
45 | | - preprocessing_task_id=preprocessing_task_id, |
46 | | - output_file=output_file, |
| 99 | + name=f"Legacy config - {ocr_backend}", |
| 100 | + description="Created from legacy preprocess_file_celery call", |
| 101 | + **{k: v for k, v in config_snapshot.items() if k != "additional_settings"}, |
| 102 | + additional_settings=config_snapshot.get("additional_settings", {}) |
47 | 103 | ) |
| 104 | + db.add(config) |
| 105 | + db.commit() |
| 106 | + db.refresh(config) |
| 107 | + |
| 108 | + # Use existing task or create new one |
| 109 | + if preprocessing_task_id: |
| 110 | + task = db.get(models.PreprocessingTask, preprocessing_task_id) |
| 111 | + if not task: |
| 112 | + raise ValueError(f"PreprocessingTask {preprocessing_task_id} not found") |
| 113 | + task.configuration_id = config.id |
| 114 | + else: |
| 115 | + task = models.PreprocessingTask( |
| 116 | + project_id=project_id, |
| 117 | + configuration_id=config.id, |
| 118 | + total_files=len(file_ids), |
| 119 | + rollback_on_cancel=True |
| 120 | + ) |
| 121 | + db.add(task) |
| 122 | + |
| 123 | + db.commit() |
| 124 | + |
| 125 | + # Create file tasks |
| 126 | + for file_id in file_ids: |
| 127 | + file_task = models.FilePreprocessingTask( |
| 128 | + preprocessing_task_id=task.id, |
| 129 | + file_id=file_id |
| 130 | + ) |
| 131 | + db.add(file_task) |
| 132 | + |
| 133 | + db.commit() |
| 134 | + |
| 135 | + # Process using new pipeline |
| 136 | + pipeline = PreprocessingPipeline(db, task.id) |
| 137 | + # Pass OpenAI client if provided |
| 138 | + if client or (base_url and api_key): |
| 139 | + pipeline.client = client or OpenAI(base_url=base_url, api_key=api_key) |
| 140 | + |
| 141 | + pipeline.process() |
| 142 | + |
| 143 | + # Return document IDs for backward compatibility |
| 144 | + db.refresh(task) |
| 145 | + document_ids = [] |
| 146 | + for file_task in task.file_tasks: |
| 147 | + document_ids.extend([doc.id for doc in file_task.documents]) |
| 148 | + |
| 149 | + return document_ids |
0 commit comments