Skip to content

Commit 1868f51

Browse files
committed
[ADD] comprehensive tests for queue job tracking
1 parent f88aa22 commit 1868f51

File tree

3 files changed

+298
-0
lines changed

3 files changed

+298
-0
lines changed

spp_base_common/CHANGELOG.md

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
# Changelog
2+
3+
## 2025-11-20
4+
5+
### 2025-11-20 16:00:00 - [ADD] comprehensive tests for queue job tracking
6+
7+
- Added `test_queue_job_tracking.py` with 10 comprehensive test cases
8+
- Tests verify `res_id` and `res_model` field functionality
9+
- Tests verify `_compute_job_ids` correctly filters jobs by record
10+
- Tests verify `_compute_has_ongoing_jobs` detects all job states (pending, enqueued, started)
11+
- Tests verify completed jobs (done/failed) don't trigger ongoing flag
12+
- Tests verify model isolation - jobs from different models don't interfere
13+
- Tests cover edge cases including empty results and mixed job states
14+
- Updated `__init__.py` to import new test module

spp_base_common/tests/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,3 @@
11
from . import test_ir_module_module
22
from . import test_phone_number_validation
3+
from . import test_queue_job_tracking
Lines changed: 283 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,283 @@
1+
# Part of OpenSPP. See LICENSE file for full copyright and licensing details.
2+
3+
from odoo.tests.common import TransactionCase
4+
5+
6+
class TestQueueJobTracking(TransactionCase):
7+
@classmethod
8+
def setUpClass(cls):
9+
super().setUpClass()
10+
cls.queue_job_model = cls.env["queue.job"]
11+
cls.area_import_model = cls.env["spp.area.import"]
12+
13+
# Create test area import records
14+
cls.area_import_1 = cls.area_import_model.create(
15+
{
16+
"name": "Test Area Import 1",
17+
}
18+
)
19+
cls.area_import_2 = cls.area_import_model.create(
20+
{
21+
"name": "Test Area Import 2",
22+
}
23+
)
24+
25+
def test_01_queue_job_res_fields(self):
26+
"""Test that queue.job model has res_id and res_model fields"""
27+
job = self.queue_job_model.create(
28+
{
29+
"name": "Test Job",
30+
"model_name": "spp.area.import",
31+
"method_name": "test_method",
32+
"res_model": "spp.area.import",
33+
"res_id": self.area_import_1.id,
34+
}
35+
)
36+
37+
self.assertEqual(job.res_model, "spp.area.import")
38+
self.assertEqual(job.res_id, self.area_import_1.id)
39+
40+
def test_02_compute_job_ids(self):
41+
"""Test that area import correctly computes related jobs"""
42+
# Create jobs for area_import_1
43+
job1 = self.queue_job_model.create(
44+
{
45+
"name": "Job 1 for Import 1",
46+
"model_name": "spp.area.import",
47+
"method_name": "test_method",
48+
"res_model": "spp.area.import",
49+
"res_id": self.area_import_1.id,
50+
"state": "done",
51+
}
52+
)
53+
job2 = self.queue_job_model.create(
54+
{
55+
"name": "Job 2 for Import 1",
56+
"model_name": "spp.area.import",
57+
"method_name": "test_method",
58+
"res_model": "spp.area.import",
59+
"res_id": self.area_import_1.id,
60+
"state": "pending",
61+
}
62+
)
63+
64+
# Create job for area_import_2
65+
job3 = self.queue_job_model.create(
66+
{
67+
"name": "Job 1 for Import 2",
68+
"model_name": "spp.area.import",
69+
"method_name": "test_method",
70+
"res_model": "spp.area.import",
71+
"res_id": self.area_import_2.id,
72+
"state": "done",
73+
}
74+
)
75+
76+
# Trigger compute
77+
self.area_import_1._compute_job_ids()
78+
self.area_import_2._compute_job_ids()
79+
80+
# Assert area_import_1 has 2 jobs
81+
self.assertEqual(len(self.area_import_1.job_ids), 2)
82+
self.assertIn(job1, self.area_import_1.job_ids)
83+
self.assertIn(job2, self.area_import_1.job_ids)
84+
self.assertNotIn(job3, self.area_import_1.job_ids)
85+
86+
# Assert area_import_2 has 1 job
87+
self.assertEqual(len(self.area_import_2.job_ids), 1)
88+
self.assertIn(job3, self.area_import_2.job_ids)
89+
self.assertNotIn(job1, self.area_import_2.job_ids)
90+
self.assertNotIn(job2, self.area_import_2.job_ids)
91+
92+
def test_03_has_ongoing_jobs_no_jobs(self):
93+
"""Test has_ongoing_jobs when there are no jobs"""
94+
self.area_import_1._compute_has_ongoing_jobs()
95+
self.area_import_2._compute_has_ongoing_jobs()
96+
97+
self.assertFalse(self.area_import_1.has_ongoing_jobs)
98+
self.assertFalse(self.area_import_2.has_ongoing_jobs)
99+
100+
def test_04_has_ongoing_jobs_with_pending_job(self):
101+
"""Test has_ongoing_jobs when there is a pending job"""
102+
# Create a pending job
103+
self.queue_job_model.create(
104+
{
105+
"name": "Pending Job",
106+
"model_name": "spp.area.import",
107+
"method_name": "test_method",
108+
"res_model": "spp.area.import",
109+
"res_id": self.area_import_1.id,
110+
"state": "pending",
111+
}
112+
)
113+
114+
# Trigger compute on both records
115+
self.area_import_1._compute_has_ongoing_jobs()
116+
self.area_import_2._compute_has_ongoing_jobs()
117+
118+
# Both should show has_ongoing_jobs = True because it checks the entire model
119+
self.assertTrue(self.area_import_1.has_ongoing_jobs)
120+
self.assertTrue(self.area_import_2.has_ongoing_jobs)
121+
122+
def test_05_has_ongoing_jobs_with_enqueued_job(self):
123+
"""Test has_ongoing_jobs when there is an enqueued job"""
124+
# Clean up previous jobs
125+
self.queue_job_model.search([("res_model", "=", "spp.area.import")]).unlink()
126+
127+
# Create an enqueued job
128+
self.queue_job_model.create(
129+
{
130+
"name": "Enqueued Job",
131+
"model_name": "spp.area.import",
132+
"method_name": "test_method",
133+
"res_model": "spp.area.import",
134+
"res_id": self.area_import_2.id,
135+
"state": "enqueued",
136+
}
137+
)
138+
139+
# Trigger compute
140+
self.area_import_1._compute_has_ongoing_jobs()
141+
self.area_import_2._compute_has_ongoing_jobs()
142+
143+
# Both should show has_ongoing_jobs = True
144+
self.assertTrue(self.area_import_1.has_ongoing_jobs)
145+
self.assertTrue(self.area_import_2.has_ongoing_jobs)
146+
147+
def test_06_has_ongoing_jobs_with_started_job(self):
148+
"""Test has_ongoing_jobs when there is a started job"""
149+
# Clean up previous jobs
150+
self.queue_job_model.search([("res_model", "=", "spp.area.import")]).unlink()
151+
152+
# Create a started job
153+
self.queue_job_model.create(
154+
{
155+
"name": "Started Job",
156+
"model_name": "spp.area.import",
157+
"method_name": "test_method",
158+
"res_model": "spp.area.import",
159+
"res_id": self.area_import_1.id,
160+
"state": "started",
161+
}
162+
)
163+
164+
# Trigger compute
165+
self.area_import_1._compute_has_ongoing_jobs()
166+
self.area_import_2._compute_has_ongoing_jobs()
167+
168+
# Both should show has_ongoing_jobs = True
169+
self.assertTrue(self.area_import_1.has_ongoing_jobs)
170+
self.assertTrue(self.area_import_2.has_ongoing_jobs)
171+
172+
def test_07_has_ongoing_jobs_with_done_job(self):
173+
"""Test has_ongoing_jobs when all jobs are done"""
174+
# Clean up previous jobs
175+
self.queue_job_model.search([("res_model", "=", "spp.area.import")]).unlink()
176+
177+
# Create only done/failed jobs
178+
self.queue_job_model.create(
179+
{
180+
"name": "Done Job",
181+
"model_name": "spp.area.import",
182+
"method_name": "test_method",
183+
"res_model": "spp.area.import",
184+
"res_id": self.area_import_1.id,
185+
"state": "done",
186+
}
187+
)
188+
self.queue_job_model.create(
189+
{
190+
"name": "Failed Job",
191+
"model_name": "spp.area.import",
192+
"method_name": "test_method",
193+
"res_model": "spp.area.import",
194+
"res_id": self.area_import_2.id,
195+
"state": "failed",
196+
}
197+
)
198+
199+
# Trigger compute
200+
self.area_import_1._compute_has_ongoing_jobs()
201+
self.area_import_2._compute_has_ongoing_jobs()
202+
203+
# Both should show has_ongoing_jobs = False
204+
self.assertFalse(self.area_import_1.has_ongoing_jobs)
205+
self.assertFalse(self.area_import_2.has_ongoing_jobs)
206+
207+
def test_08_has_ongoing_jobs_mixed_states(self):
208+
"""Test has_ongoing_jobs with mixed job states"""
209+
# Clean up previous jobs
210+
self.queue_job_model.search([("res_model", "=", "spp.area.import")]).unlink()
211+
212+
# Create jobs with different states
213+
self.queue_job_model.create(
214+
{
215+
"name": "Done Job",
216+
"model_name": "spp.area.import",
217+
"method_name": "test_method",
218+
"res_model": "spp.area.import",
219+
"res_id": self.area_import_1.id,
220+
"state": "done",
221+
}
222+
)
223+
self.queue_job_model.create(
224+
{
225+
"name": "Pending Job",
226+
"model_name": "spp.area.import",
227+
"method_name": "test_method",
228+
"res_model": "spp.area.import",
229+
"res_id": self.area_import_2.id,
230+
"state": "pending",
231+
}
232+
)
233+
234+
# Trigger compute
235+
self.area_import_1._compute_has_ongoing_jobs()
236+
self.area_import_2._compute_has_ongoing_jobs()
237+
238+
# Both should show has_ongoing_jobs = True because there's one pending job
239+
self.assertTrue(self.area_import_1.has_ongoing_jobs)
240+
self.assertTrue(self.area_import_2.has_ongoing_jobs)
241+
242+
def test_09_job_ids_empty_for_different_model(self):
243+
"""Test that job_ids is empty when jobs belong to different model"""
244+
# Create a job with different res_model
245+
self.queue_job_model.create(
246+
{
247+
"name": "Job for different model",
248+
"model_name": "res.partner",
249+
"method_name": "test_method",
250+
"res_model": "res.partner",
251+
"res_id": 1,
252+
"state": "done",
253+
}
254+
)
255+
256+
# Trigger compute
257+
self.area_import_1._compute_job_ids()
258+
259+
# Should not include jobs from other models
260+
self.assertEqual(len(self.area_import_1.job_ids), 0)
261+
262+
def test_10_has_ongoing_jobs_different_model(self):
263+
"""Test that has_ongoing_jobs is not affected by jobs from different models"""
264+
# Clean up previous jobs
265+
self.queue_job_model.search([("res_model", "=", "spp.area.import")]).unlink()
266+
267+
# Create a pending job for a different model
268+
self.queue_job_model.create(
269+
{
270+
"name": "Job for different model",
271+
"model_name": "res.partner",
272+
"method_name": "test_method",
273+
"res_model": "res.partner",
274+
"res_id": 1,
275+
"state": "pending",
276+
}
277+
)
278+
279+
# Trigger compute
280+
self.area_import_1._compute_has_ongoing_jobs()
281+
282+
# Should not be affected by jobs from other models
283+
self.assertFalse(self.area_import_1.has_ongoing_jobs)

0 commit comments

Comments
 (0)