Skip to content

Commit f87d0a8

Browse files
committed
fix job queue
1 parent e6e8354 commit f87d0a8

File tree

2 files changed

+28
-18
lines changed

2 files changed

+28
-18
lines changed

xtraplatform-jobs/src/main/java/de/ii/xtraplatform/jobs/app/JobQueueBackendLocal.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -126,7 +126,7 @@ protected Optional<Job> getJob(String jobId) {
126126
}
127127

128128
@Override
129-
protected void queueJob(Job job, boolean untake) {
129+
protected synchronized void queueJob(Job job, boolean untake) {
130130
Deque<String> queue = getQueue(job.getType(), job.getPriority());
131131
updateJob(job);
132132

@@ -178,7 +178,7 @@ protected Job doneJob(Job job) {
178178
}
179179

180180
@Override
181-
protected Optional<Job> takeJob(Deque<String> queue) {
181+
protected synchronized Optional<Job> takeJob(Deque<String> queue) {
182182
if (!queue.isEmpty()) {
183183
String jobId = queue.remove();
184184
takenQueue.add(jobId);
@@ -190,7 +190,7 @@ protected Optional<Job> takeJob(Deque<String> queue) {
190190
}
191191

192192
@Override
193-
protected Optional<Job> untakeJob(String jobId) {
193+
protected synchronized Optional<Job> untakeJob(String jobId) {
194194
Optional<String> id =
195195
takenQueue.stream().filter(takenId -> Objects.equals(takenId, jobId)).findFirst();
196196

xtraplatform-jobs/src/main/java/de/ii/xtraplatform/jobs/app/OpsEndpointJobs.java

Lines changed: 25 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -169,23 +169,34 @@ public synchronized Response takeJob(Map<String, String> executor)
169169
@ApiResponse(responseCode = "500", description = "Internal server error")
170170
})
171171
public synchronized Response updateJob(
172-
@PathParam("jobId") String jobId, Map<String, Object> progress)
173-
throws JsonProcessingException {
174-
Optional<Job> job =
175-
jobQueue.getTaken().stream()
176-
.filter(job1 -> Objects.equals(job1.getId(), jobId))
177-
.findFirst();
172+
@PathParam("jobId") String jobId, Map<String, Object> progress) {
173+
try {
174+
Optional<Job> job =
175+
jobQueue.getTaken().stream()
176+
.filter(job1 -> Objects.equals(job1.getId(), jobId))
177+
.findFirst();
178178

179-
if (job.isPresent()) {
180-
int delta =
181-
progress.containsKey("delta") ? Integer.parseInt((String) progress.get("delta")) : 0;
179+
if (job.isPresent()) {
180+
int delta = (Integer) progress.getOrDefault("delta", 0);
182181

183-
jobQueue.updateJob(job.get(), delta);
182+
jobQueue.updateJob(job.get(), delta);
183+
184+
if (delta > 0 && job.get().getPartOf().isPresent()) {
185+
JobSet set = jobQueue.getSet(job.get().getPartOf().get());
186+
jobQueue.updateJobSet(set, delta, progress);
187+
}
184188

185-
if (delta > 0 && job.get().getPartOf().isPresent()) {
186-
JobSet set = jobQueue.getSet(job.get().getPartOf().get());
187-
jobQueue.updateJobSet(set, delta, progress);
189+
if (LOGGER.isTraceEnabled() || LOGGER.isTraceEnabled(MARKER.JOBS)) {
190+
LOGGER.trace(
191+
MARKER.JOBS, "Job {} progress updated by remote executor ({})", jobId, progress);
192+
}
193+
} else {
194+
if (LOGGER.isWarnEnabled() || LOGGER.isWarnEnabled(MARKER.JOBS)) {
195+
LOGGER.warn(MARKER.JOBS, "Received progress update for unknown job {}", jobId);
196+
}
188197
}
198+
} catch (Throwable e) {
199+
LOGGER.error("Error while updating job {}", jobId, e);
189200
}
190201

191202
return Response.noContent().build();
@@ -202,8 +213,7 @@ public synchronized Response updateJob(
202213
@ApiResponse(responseCode = "500", description = "Internal server error")
203214
})
204215
public synchronized Response closeJob(
205-
@PathParam("jobId") String jobId, @Parameter(hidden = true) Map<String, String> result)
206-
throws JsonProcessingException {
216+
@PathParam("jobId") String jobId, @Parameter(hidden = true) Map<String, String> result) {
207217
if (result.containsKey("error") && Objects.nonNull(result.get("error"))) {
208218
boolean retry =
209219
jobQueue.error(jobId, result.get("error"), Boolean.parseBoolean(result.get("retry")));

0 commit comments

Comments
 (0)