Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,13 @@ jobs:
- version: '1'
os: windows-latest
arch: x64
- version: '1.6'
- version: '1.10'
os: windows-latest
arch: x64
- version: '1'
os: ubuntu-latest
arch: x64
- version: '1.6'
- version: '1.10'
os: ubuntu-latest
arch: x64
steps:
Expand Down
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -22,3 +22,5 @@ docs/site/
# committed for packages, but should be committed for applications that require a static
# environment.
Manifest.toml

*.log
2 changes: 1 addition & 1 deletion Project.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
name = "JobQueueMPI"
uuid = "32d208e1-246e-420c-b6ff-18b71b410923"
authors = ["pedroripper <pedros.ripper@gmail.com>"]
version = "0.1.3"
version = "0.2.0"

[deps]
MPI = "da04e1cc-30fd-572f-bb4f-1f8673147195"
Expand Down
22 changes: 15 additions & 7 deletions src/controller.jl
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,8 @@ function send_termination_message()
if _is_debug_enabled()
_debug_message("Waiting for termination messsages to be received.")
end
# Waiting on all requests ensures that they were received by the workers
# and that the buffers are deallocated.
_wait_all(requests)
return nothing
end
Expand All @@ -120,25 +122,31 @@ function check_for_job_answers(controller::Controller)
if !is_controller_process()
error("Only the controller process can check for workers' jobs.")
end
for j_i in eachindex(controller.pending_jobs)
for i in eachindex(controller.pending_jobs)
# Test whether the job was received by the worker.
# If it was not, it is not worth checking whether the worker completed the job.
status = MPI.Test(controller.pending_jobs[i].request)
if !status
continue
end
worker_completed_a_job = MPI.Iprobe(
_mpi_comm();
source = controller.pending_jobs[j_i].worker,
tag = controller.pending_jobs[j_i].worker + 32,
source = controller.pending_jobs[i].worker,
tag = controller.pending_jobs[i].worker + 32,
)
if worker_completed_a_job
job_answer = MPI.recv(
_mpi_comm();
source = controller.pending_jobs[j_i].worker,
tag = controller.pending_jobs[j_i].worker + 32,
source = controller.pending_jobs[i].worker,
tag = controller.pending_jobs[i].worker + 32,
)
if _is_debug_enabled()
_debug_message(
"completed job $(job_answer.job_id)",
)
end
controller.worker_status[controller.pending_jobs[j_i].worker] = WORKER_AVAILABLE
deleteat!(controller.pending_jobs, j_i)
controller.worker_status[controller.pending_jobs[i].worker] = WORKER_AVAILABLE
deleteat!(controller.pending_jobs, i)
return job_answer
end
end
Expand Down
4 changes: 3 additions & 1 deletion src/worker.jl
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,9 @@ function send_job_answer_to_controller(worker::Worker, message)
error("Only the controller process can send job answers.")
end
job = JobAnswer(worker.job_id_running, message)
MPI.isend(job, _mpi_comm(); dest = controller_rank(), tag = worker.rank + 32)
# Send the job answer to the controller process with a blocking call to make sure
# the message is sent and the buffer is deallocated.
MPI.send(job, _mpi_comm(); dest = controller_rank(), tag = worker.rank + 32)
if _is_debug_enabled()
_debug_message("Sending job answer $(job.job_id) to controller")
end
Expand Down