diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index ed694ef..8412c15 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -15,13 +15,13 @@ jobs: - version: '1' os: windows-latest arch: x64 - - version: '1.6' + - version: '1.10' os: windows-latest arch: x64 - version: '1' os: ubuntu-latest arch: x64 - - version: '1.6' + - version: '1.10' os: ubuntu-latest arch: x64 steps: diff --git a/.gitignore b/.gitignore index 29126e4..02adee2 100644 --- a/.gitignore +++ b/.gitignore @@ -22,3 +22,5 @@ docs/site/ # committed for packages, but should be committed for applications that require a static # environment. Manifest.toml + +*.log diff --git a/Project.toml b/Project.toml index 1500afd..ccc198c 100644 --- a/Project.toml +++ b/Project.toml @@ -1,7 +1,7 @@ name = "JobQueueMPI" uuid = "32d208e1-246e-420c-b6ff-18b71b410923" authors = ["pedroripper "] -version = "0.1.3" +version = "0.2.0" [deps] MPI = "da04e1cc-30fd-572f-bb4f-1f8673147195" diff --git a/src/controller.jl b/src/controller.jl index 94cb36b..ff0b703 100644 --- a/src/controller.jl +++ b/src/controller.jl @@ -107,6 +107,8 @@ function send_termination_message() if _is_debug_enabled() _debug_message("Waiting for termination messsages to be received.") end + # Waiting on all requests ensures that they were received by the workers + # and that the buffers are deallocated. _wait_all(requests) return nothing end @@ -120,25 +122,31 @@ function check_for_job_answers(controller::Controller) if !is_controller_process() error("Only the controller process can check for workers' jobs.") end - for j_i in eachindex(controller.pending_jobs) + for i in eachindex(controller.pending_jobs) + # Test whether the job was received by the worker. + # If it was not, it is not worth checking whether the worker completed the job. 
+ status = MPI.Test(controller.pending_jobs[i].request) + if !status + continue + end worker_completed_a_job = MPI.Iprobe( _mpi_comm(); - source = controller.pending_jobs[j_i].worker, - tag = controller.pending_jobs[j_i].worker + 32, + source = controller.pending_jobs[i].worker, + tag = controller.pending_jobs[i].worker + 32, ) if worker_completed_a_job job_answer = MPI.recv( _mpi_comm(); - source = controller.pending_jobs[j_i].worker, - tag = controller.pending_jobs[j_i].worker + 32, + source = controller.pending_jobs[i].worker, + tag = controller.pending_jobs[i].worker + 32, ) if _is_debug_enabled() _debug_message( "completed job $(job_answer.job_id)", ) end - controller.worker_status[controller.pending_jobs[j_i].worker] = WORKER_AVAILABLE - deleteat!(controller.pending_jobs, j_i) + controller.worker_status[controller.pending_jobs[i].worker] = WORKER_AVAILABLE + deleteat!(controller.pending_jobs, i) return job_answer end end diff --git a/src/worker.jl b/src/worker.jl index 1d03c0f..f80796e 100644 --- a/src/worker.jl +++ b/src/worker.jl @@ -28,7 +28,9 @@ function send_job_answer_to_controller(worker::Worker, message) error("Only the controller process can send job answers.") end job = JobAnswer(worker.job_id_running, message) - MPI.isend(job, _mpi_comm(); dest = controller_rank(), tag = worker.rank + 32) + # Send the job answer to the controller process with a blocking call to make sure + # the message is sent and the buffer is deallocated. + MPI.send(job, _mpi_comm(); dest = controller_rank(), tag = worker.rank + 32) if _is_debug_enabled() _debug_message("Sending job answer $(job.job_id) to controller") end