From 08bf97219210f6d421e8cbfeba03b601b4194fd0 Mon Sep 17 00:00:00 2001 From: Taimoor Sohail Date: Mon, 20 Apr 2026 20:15:11 +1000 Subject: [PATCH 1/8] Add distributed checkpoint reproducibility test --- test/test_distributed_utils.jl | 102 +++++++++++++++++++++++++++++++++ 1 file changed, 102 insertions(+) diff --git a/test/test_distributed_utils.jl b/test/test_distributed_utils.jl index a594bb4bb..0d7221475 100644 --- a/test/test_distributed_utils.jl +++ b/test/test_distributed_utils.jl @@ -1,5 +1,6 @@ include("runtests_setup.jl") +using Glob using MPI MPI.Init() @@ -9,6 +10,7 @@ using NCDatasets using NumericalEarth.DataWrangling: metadata_path using Oceananigans.DistributedComputations using Oceananigans.DistributedComputations: reconstruct_global_grid +using Oceananigans.OutputWriters: Checkpointer # We start by building a fake bathymetry on rank 0 and save it to file rm("./trivial_bathymetry.nc", force=true) @@ -103,4 +105,104 @@ end end end +@testset "Distributed EarthSystemModel checkpointing" begin + mpi_ranks = MPI.Comm_size(MPI.COMM_WORLD) + + if mpi_ranks == 1 + @warn "Skipping distributed checkpointing test because only one MPI rank is active." + else + # Use an x-only partition that matches the active MPI communicator. + arch = Distributed(CPU(), partition=Partition(mpi_ranks, 1)) + + # Choose a size divisible by mpi_ranks for robust domain decomposition. + grid = LatitudeLongitudeGrid(arch; + size = (12 * mpi_ranks, 24, 4), + z = (-100, 0), + latitude = (-80, 80), + longitude = (0, 360), + halo = (6, 6, 6)) + + function make_coupled_model(grid, arch) + @inline hi(λ, φ) = φ > 70 || φ < -70 + + ocean = ocean_simulation(grid, closure=nothing) + set!(ocean.model, T=20, S=35, u=0.01, v=-0.005) + + sea_ice = sea_ice_simulation(grid, ocean) + set!(sea_ice.model, h=hi, ℵ=hi) + + backend = JRA55NetCDFBackend(4) + atmosphere = JRA55PrescribedAtmosphere(arch; backend) + + return OceanSeaIceModel(ocean, sea_ice; atmosphere) + end + + # Reference run: run to 3, then continue to 6. + model = make_coupled_model(grid, arch) + simulation = Simulation(model, Δt=60, stop_iteration=3) + run!(simulation) + + simulation = Simulation(model, Δt=60, stop_iteration=6) + run!(simulation) + + ref_T = Array(interior(model.ocean.model.tracers.T)) + ref_S = Array(interior(model.ocean.model.tracers.S)) + ref_u = Array(interior(model.ocean.model.velocities.u)) + ref_v = Array(interior(model.ocean.model.velocities.v)) + ref_h = Array(interior(model.sea_ice.model.ice_thickness)) + ref_ui = Array(interior(model.sea_ice.model.velocities.u)) + ref_vi = Array(interior(model.sea_ice.model.velocities.v)) + ref_time = model.clock.time + ref_iteration = model.clock.iteration + + # Checkpointed run: run to 3 and checkpoint. + model = make_coupled_model(grid, arch) + simulation = Simulation(model, Δt=60, stop_iteration=3) + + prefix = "distributed_osm_checkpointer_test_rank$(MPI.Comm_rank(MPI.COMM_WORLD))" + simulation.output_writers[:checkpointer] = Checkpointer(simulation.model; + schedule = IterationInterval(3), + prefix = prefix) + run!(simulation) + + @test !isempty(glob("$(prefix)_iteration3*.jld2")) + + # Recreate and restore from latest checkpoint. + model = make_coupled_model(grid, arch) + simulation = Simulation(model, Δt=60, stop_iteration=6) + simulation.output_writers[:checkpointer] = Checkpointer(model; + schedule = IterationInterval(3), + prefix = prefix) + + set!(simulation; checkpoint=:latest) + @test simulation.model.clock.iteration == 3 + + set!(simulation; iteration=3) + @test simulation.model.clock.iteration == 3 + + run!(simulation) + + T = Array(interior(model.ocean.model.tracers.T)) + S = Array(interior(model.ocean.model.tracers.S)) + u = Array(interior(model.ocean.model.velocities.u)) + v = Array(interior(model.ocean.model.velocities.v)) + h = Array(interior(model.sea_ice.model.ice_thickness)) + ui = Array(interior(model.sea_ice.model.velocities.u)) + vi = Array(interior(model.sea_ice.model.velocities.v)) + + # Match serial checkpointer tolerances. + @test T ≈ ref_T rtol=1e-13 + @test S ≈ ref_S rtol=1e-13 + @test h ≈ ref_h rtol=1e-13 + @test u ≈ ref_u rtol=1e-10 + @test v ≈ ref_v rtol=1e-10 + @test ui ≈ ref_ui rtol=1e-10 + @test vi ≈ ref_vi rtol=1e-10 + @test model.clock.time == ref_time + @test model.clock.iteration == ref_iteration + + rm.(glob("$(prefix)_iteration*.jld2"), force=true) + end +end + MPI.Finalize() From 454afc83475bac0ca31e4c222b4fc9605469a7a9 Mon Sep 17 00:00:00 2001 From: Taimoor Sohail Date: Tue, 21 Apr 2026 14:47:02 +1000 Subject: [PATCH 2/8] Update CI.yml to run distributed tests --- .github/workflows/ci.yml | 46 ++-- test/runtests.jl | 14 +- test/test_distributed_utils.jl | 412 +++++++++++++++++++-------------- 3 files changed, 269 insertions(+), 203 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 6620ffa0b..602ed5f21 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -237,30 +237,27 @@ jobs: ##### Distributed MPI tests (GitHub-hosted runners) ##### - # distributed_tests: - # name: Distributed MPI tests (4 ranks) - # runs-on: ubuntu-latest - # timeout-minutes: 60 - # env: - # OPENBLAS_NUM_THREADS: 1 - # JULIA_NUM_PRECOMPILE_TASKS: 8 - # steps: - # - uses: actions/checkout@v6 - # - uses: julia-actions/setup-julia@v2 - # with: - # version: '1.12.5' - # - name: Instantiate and precompile - # shell: julia --project --color=yes {0} - # run: | - # using Pkg - # Pkg.instantiate(verbose=true) - # Pkg.precompile(strict=true) - # - name: Install mpiexecjl - # run: julia --project --color=yes -e 'using MPI; MPI.install_mpiexecjl(destdir=".")' - # - name: Run distributed tests - # env: - # TEST_GROUP: distributed - # run: ./mpiexecjl -np 4 julia --project --color=yes -e 'using Pkg; Pkg.test()' + distributed_tests: + name: Distributed MPI tests (4 ranks) + runs-on: ubuntu-latest + timeout-minutes: 60 + env: + OPENBLAS_NUM_THREADS: 1 + JULIA_NUM_PRECOMPILE_TASKS: 8 + TEST_GROUP: distributed_mpi + steps: + - uses: actions/checkout@v6 + - uses: julia-actions/setup-julia@v2 + with: + version: "1.12.5" + arch: default + - uses: julia-actions/cache@v3 + with: + cache-scratchspaces: false + - uses: julia-actions/julia-buildpkg@v1 + - uses: julia-actions/julia-runtest@v1 + with: + test_args: "--verbose" ##### ##### The final test (a final check) @@ -273,6 +270,7 @@ jobs: - gpu_tests - cds_downloading - reactant + - distributed_tests runs-on: ubuntu-latest if: ${{ success() }} steps: diff --git a/test/runtests.jl b/test/runtests.jl index 39e6985b0..ef5067494 100644 --- a/test/runtests.jl +++ b/test/runtests.jl @@ -7,6 +7,8 @@ using Scratch using NumericalEarth.DataWrangling: download_dataset using ParallelTestRunner: find_tests, parse_args, filter_tests!, runtests +group = get(ENV, "TEST_GROUP", "all") + # Start with autodiscovered tests testsuite = find_tests(@__DIR__) @@ -109,8 +111,10 @@ function __init__() end # Initialize and download required datasets -__init__() - -runtests(NumericalEarth, args; testsuite) - -delete_inpainted_files(@get_scratch!(".")) +if group == "distributed_mpi" + include("test_distributed_utils.jl") +else + __init__() + runtests(NumericalEarth, args; testsuite) + delete_inpainted_files(@get_scratch!(".")) +end diff --git a/test/test_distributed_utils.jl b/test/test_distributed_utils.jl index 0d7221475..f1382b66e 100644 --- a/test/test_distributed_utils.jl +++ b/test/test_distributed_utils.jl @@ -2,207 +2,271 @@ include("runtests_setup.jl") using Glob using MPI -MPI.Init() -using CFTime -using Dates -using NCDatasets -using NumericalEarth.DataWrangling: metadata_path -using Oceananigans.DistributedComputations -using Oceananigans.DistributedComputations: reconstruct_global_grid -using Oceananigans.OutputWriters: Checkpointer +function run_mpi_script(filename, script; nranks=4) + script_path = joinpath(@__DIR__, filename) + write(script_path, script) -# We start by building a fake bathymetry on rank 0 and save it to file -rm("./trivial_bathymetry.nc", force=true) - -res = 0.5 # degrees -λ = -180+res/2:res:180-res/2 -φ = 0:res:50 + try + run(`$(mpiexec()) -n $(nranks) $(Base.julia_cmd()) -O0 $(script_path)`) + finally + rm(script_path, force=true) + end -Nλ = length(λ) -Nφ = length(φ) + return nothing +end -@root begin - ds = NCDataset("./trivial_bathymetry.nc", "c") +function distributed_ecco_download_script() + return raw""" + include("runtests_setup.jl") - # Define the dimension "lon" and "lat" with the size Nλ and Nφ respectively - defDim(ds, "lon", Nλ) - defDim(ds, "lat", Nφ) - defVar(ds, "lat", Float32, ("lat", )) - defVar(ds, "lon", Float32, ("lon", )) + using MPI + MPI.Init() - # Define the variables z - z = defVar(ds, "z", Float32, ("lon", "lat")) + using CFTime + using Dates + using NumericalEarth.DataWrangling: metadata_path + using Oceananigans.DistributedComputations - # Generate some example data - data = [Float32(-i) for i = 1:Nλ, j = 1:Nφ] + @testset "Distributed ECCO download" begin + dates = DateTimeProlepticGregorian(1992, 1, 1) : Month(1) : DateTimeProlepticGregorian(1994, 4, 1) + metadata = Metadata(:u_velocity; dataset=ECCO4Monthly(), dates) + download_dataset(metadata) - # write a the complete data set - ds["lon"][:] = λ - ds["lat"][:] = φ - z[:, :] = data + @root for metadatum in metadata + @test isfile(metadata_path(metadatum)) + end + end - close(ds) + MPI.Barrier(MPI.COMM_WORLD) + MPI.Finalize() + """ end -struct TrivalBathymetry end +function distributed_bathymetry_interpolation_script() + return raw""" + include("runtests_setup.jl") -import NumericalEarth.DataWrangling: download_dataset, z_interfaces, longitude_interfaces, latitude_interfaces, metadata_filename + using MPI + MPI.Init() -download_dataset(::Metadatum{<:TrivalBathymetry}) = nothing -Base.size(::TrivalBathymetry) = (Nλ, Nφ, 1) -Base.size(::TrivalBathymetry, variable) = (Nλ, Nφ, 1) -z_interfaces(::TrivalBathymetry) = (0, 1) -longitude_interfaces(::TrivalBathymetry) = (-180, 180) -latitude_interfaces(::TrivalBathymetry) = (0, 50) -metadata_filename(::TrivalBathymetry, name, date, bounding_box) = "trivial_bathymetry.nc" + using NCDatasets + using Oceananigans.DistributedComputations -@testset "Distributed ECCO download" begin - dates = DateTimeProlepticGregorian(1992, 1, 1) : Month(1) : DateTimeProlepticGregorian(1994, 4, 1) - metadata = Metadata(:u_velocity; dataset=ECCO4Monthly(), dates) - download_dataset(metadata) + bathymetry_path = joinpath(@__DIR__, "trivial_bathymetry.nc") - @root for metadatum in metadata - @test isfile(metadata_path(metadatum)) - end -end + # We start by building a fake bathymetry on rank 0 and save it to file. + @root begin + rm(bathymetry_path, force=true) -@testset "Distributed Bathymetry interpolation" begin - TrivialBathymetry_metadata = Metadata(:z, TrivalBathymetry(), nothing, nothing, ".") - - global_grid = LatitudeLongitudeGrid(CPU(); - size = (40, 40, 1), - longitude = (0, 100), - latitude = (0, 20), - z = (0, 1)) - - interpolation_passes = 4 - global_height = regrid_bathymetry(global_grid, TrivialBathymetry_metadata; - interpolation_passes) - - arch_x = Distributed(CPU(), partition=Partition(4, 1)) - arch_y = Distributed(CPU(), partition=Partition(1, 4)) - arch_xy = Distributed(CPU(), partition=Partition(2, 2)) - - for arch in (arch_x, arch_y, arch_xy) - local_grid = LatitudeLongitudeGrid(arch; - size = (40, 40, 1), - longitude = (0, 100), - latitude = (0, 20), - z = (0, 1)) - - local_height = regrid_bathymetry(local_grid, TrivialBathymetry_metadata; - interpolation_passes) - - Nx, Ny, _ = size(local_grid) - rx, ry, _ = arch.local_index - irange = (rx - 1) * Nx + 1 : rx * Nx - jrange = (ry - 1) * Ny + 1 : ry * Ny - - begin - @test interior(global_height, irange, jrange, 1) == interior(local_height, :, :, 1) - end + res = 0.5 # degrees + λ = -180+res/2:res:180-res/2 + φ = 0:res:50 + + Nλ = length(λ) + Nφ = length(φ) + + ds = NCDataset(bathymetry_path, "c") + + # Define the dimensions "lon" and "lat". + defDim(ds, "lon", Nλ) + defDim(ds, "lat", Nφ) + defVar(ds, "lat", Float32, ("lat", )) + defVar(ds, "lon", Float32, ("lon", )) + + # Define variable z. + z = defVar(ds, "z", Float32, ("lon", "lat")) + + # Generate some example data. + data = [Float32(-i) for i = 1:Nλ, j = 1:Nφ] + + ds["lon"][:] = λ + ds["lat"][:] = φ + z[:, :] = data + + close(ds) end -end -@testset "Distributed EarthSystemModel checkpointing" begin - mpi_ranks = MPI.Comm_size(MPI.COMM_WORLD) + MPI.Barrier(MPI.COMM_WORLD) + + struct TrivalBathymetry end + + import NumericalEarth.DataWrangling: download_dataset, z_interfaces, longitude_interfaces, latitude_interfaces, metadata_filename + + download_dataset(::Metadatum{<:TrivalBathymetry}) = nothing + Base.size(::TrivalBathymetry) = (720, 101, 1) + Base.size(::TrivalBathymetry, variable) = (720, 101, 1) + z_interfaces(::TrivalBathymetry) = (0, 1) + longitude_interfaces(::TrivalBathymetry) = (-180, 180) + latitude_interfaces(::TrivalBathymetry) = (0, 50) + metadata_filename(::TrivalBathymetry, name, date, bounding_box) = "trivial_bathymetry.nc" + + @testset "Distributed Bathymetry interpolation" begin + TrivialBathymetry_metadata = Metadata(:z, TrivalBathymetry(), nothing, nothing, @__DIR__) - if mpi_ranks == 1 - @warn "Skipping distributed checkpointing test because only one MPI rank is active." - else - # Use an x-only partition that matches the active MPI communicator. - arch = Distributed(CPU(), partition=Partition(mpi_ranks, 1)) + global_grid = LatitudeLongitudeGrid(CPU(); + size = (40, 40, 1), + longitude = (0, 100), + latitude = (0, 20), + z = (0, 1)) - # Choose a size divisible by mpi_ranks for robust domain decomposition. - grid = LatitudeLongitudeGrid(arch; - size = (12 * mpi_ranks, 24, 4), - z = (-100, 0), - latitude = (-80, 80), - longitude = (0, 360), - halo = (6, 6, 6)) + interpolation_passes = 4 + global_height = regrid_bathymetry(global_grid, TrivialBathymetry_metadata; + interpolation_passes) - function make_coupled_model(grid, arch) - @inline hi(λ, φ) = φ > 70 || φ < -70 + arch_x = Distributed(CPU(), partition=Partition(4, 1)) + arch_y = Distributed(CPU(), partition=Partition(1, 4)) + arch_xy = Distributed(CPU(), partition=Partition(2, 2)) - ocean = ocean_simulation(grid, closure=nothing) - set!(ocean.model, T=20, S=35, u=0.01, v=-0.005) + for arch in (arch_x, arch_y, arch_xy) + local_grid = LatitudeLongitudeGrid(arch; + size = (40, 40, 1), + longitude = (0, 100), + latitude = (0, 20), + z = (0, 1)) - sea_ice = sea_ice_simulation(grid, ocean) - set!(sea_ice.model, h=hi, ℵ=hi) + local_height = regrid_bathymetry(local_grid, TrivialBathymetry_metadata; + interpolation_passes) - backend = JRA55NetCDFBackend(4) - atmosphere = JRA55PrescribedAtmosphere(arch; backend) + Nx, Ny, _ = size(local_grid) + rx, ry, _ = arch.local_index + irange = (rx - 1) * Nx + 1 : rx * Nx + jrange = (ry - 1) * Ny + 1 : ry * Ny - return OceanSeaIceModel(ocean, sea_ice; atmosphere) + @test interior(global_height, irange, jrange, 1) == interior(local_height, :, :, 1) end + end - # Reference run: run to 3, then continue to 6. - model = make_coupled_model(grid, arch) - simulation = Simulation(model, Δt=60, stop_iteration=3) - run!(simulation) - - simulation = Simulation(model, Δt=60, stop_iteration=6) - run!(simulation) - - ref_T = Array(interior(model.ocean.model.tracers.T)) - ref_S = Array(interior(model.ocean.model.tracers.S)) - ref_u = Array(interior(model.ocean.model.velocities.u)) - ref_v = Array(interior(model.ocean.model.velocities.v)) - ref_h = Array(interior(model.sea_ice.model.ice_thickness)) - ref_ui = Array(interior(model.sea_ice.model.velocities.u)) - ref_vi = Array(interior(model.sea_ice.model.velocities.v)) - ref_time = model.clock.time - ref_iteration = model.clock.iteration - - # Checkpointed run: run to 3 and checkpoint. - model = make_coupled_model(grid, arch) - simulation = Simulation(model, Δt=60, stop_iteration=3) - - prefix = "distributed_osm_checkpointer_test_rank$(MPI.Comm_rank(MPI.COMM_WORLD))" - simulation.output_writers[:checkpointer] = Checkpointer(simulation.model; - schedule = IterationInterval(3), - prefix = prefix) - run!(simulation) - - @test !isempty(glob("$(prefix)_iteration3*.jld2")) - - # Recreate and restore from latest checkpoint. - model = make_coupled_model(grid, arch) - simulation = Simulation(model, Δt=60, stop_iteration=6) - simulation.output_writers[:checkpointer] = Checkpointer(model; - schedule = IterationInterval(3), - prefix = prefix) - - set!(simulation; checkpoint=:latest) - @test simulation.model.clock.iteration == 3 - - set!(simulation; iteration=3) - @test simulation.model.clock.iteration == 3 - - run!(simulation) - - T = Array(interior(model.ocean.model.tracers.T)) - S = Array(interior(model.ocean.model.tracers.S)) - u = Array(interior(model.ocean.model.velocities.u)) - v = Array(interior(model.ocean.model.velocities.v)) - h = Array(interior(model.sea_ice.model.ice_thickness)) - ui = Array(interior(model.sea_ice.model.velocities.u)) - vi = Array(interior(model.sea_ice.model.velocities.v)) - - # Match serial checkpointer tolerances. - @test T ≈ ref_T rtol=1e-13 - @test S ≈ ref_S rtol=1e-13 - @test h ≈ ref_h rtol=1e-13 - @test u ≈ ref_u rtol=1e-10 - @test v ≈ ref_v rtol=1e-10 - @test ui ≈ ref_ui rtol=1e-10 - @test vi ≈ ref_vi rtol=1e-10 - @test model.clock.time == ref_time - @test model.clock.iteration == ref_iteration - - rm.(glob("$(prefix)_iteration*.jld2"), force=true) + MPI.Barrier(MPI.COMM_WORLD) + @root rm(bathymetry_path, force=true) + MPI.Barrier(MPI.COMM_WORLD) + MPI.Finalize() + """ +end + +function distributed_earth_system_checkpointing_script() + return raw""" + include("runtests_setup.jl") + + using Glob + using MPI + MPI.Init() + + using Oceananigans.OutputWriters: Checkpointer + + @testset "Distributed EarthSystemModel checkpointing" begin + mpi_ranks = MPI.Comm_size(MPI.COMM_WORLD) + + if mpi_ranks == 1 + @warn "Skipping distributed checkpointing test because only one MPI rank is active." + else + # Use an x-only partition that matches the active MPI communicator. + arch = Distributed(CPU(), partition=Partition(mpi_ranks, 1)) + + # Choose a size divisible by mpi_ranks for robust domain decomposition. + grid = LatitudeLongitudeGrid(arch; + size = (12 * mpi_ranks, 24, 4), + z = (-100, 0), + latitude = (-80, 80), + longitude = (0, 360), + halo = (6, 6, 6)) + + function make_coupled_model(grid, arch) + @inline hi(λ, φ) = φ > 70 || φ < -70 + + ocean = ocean_simulation(grid, closure=nothing) + set!(ocean.model, T=20, S=35, u=0.01, v=-0.005) + + sea_ice = sea_ice_simulation(grid, ocean) + set!(sea_ice.model, h=hi, ℵ=hi) + + backend = JRA55NetCDFBackend(4) + atmosphere = JRA55PrescribedAtmosphere(arch; backend) + + return OceanSeaIceModel(ocean, sea_ice; atmosphere) + end + + # Reference run: run to 3, then continue to 6. + model = make_coupled_model(grid, arch) + simulation = Simulation(model, Δt=60, stop_iteration=3) + run!(simulation) + + simulation = Simulation(model, Δt=60, stop_iteration=6) + run!(simulation) + + ref_T = Array(interior(model.ocean.model.tracers.T)) + ref_S = Array(interior(model.ocean.model.tracers.S)) + ref_u = Array(interior(model.ocean.model.velocities.u)) + ref_v = Array(interior(model.ocean.model.velocities.v)) + ref_h = Array(interior(model.sea_ice.model.ice_thickness)) + ref_ui = Array(interior(model.sea_ice.model.velocities.u)) + ref_vi = Array(interior(model.sea_ice.model.velocities.v)) + ref_time = model.clock.time + ref_iteration = model.clock.iteration + + # Checkpointed run: run to 3 and checkpoint. + model = make_coupled_model(grid, arch) + simulation = Simulation(model, Δt=60, stop_iteration=3) + + prefix = "distributed_osm_checkpointer_test_rank$(MPI.Comm_rank(MPI.COMM_WORLD))" + simulation.output_writers[:checkpointer] = Checkpointer(simulation.model; + schedule = IterationInterval(3), + prefix = prefix) + run!(simulation) + + @test !isempty(glob("$(prefix)_iteration3*.jld2")) + + # Recreate and restore from latest checkpoint. + model = make_coupled_model(grid, arch) + simulation = Simulation(model, Δt=60, stop_iteration=6) + simulation.output_writers[:checkpointer] = Checkpointer(model; + schedule = IterationInterval(3), + prefix = prefix) + + set!(simulation; checkpoint=:latest) + @test simulation.model.clock.iteration == 3 + + set!(simulation; iteration=3) + @test simulation.model.clock.iteration == 3 + + run!(simulation) + + T = Array(interior(model.ocean.model.tracers.T)) + S = Array(interior(model.ocean.model.tracers.S)) + u = Array(interior(model.ocean.model.velocities.u)) + v = Array(interior(model.ocean.model.velocities.v)) + h = Array(interior(model.sea_ice.model.ice_thickness)) + ui = Array(interior(model.sea_ice.model.velocities.u)) + vi = Array(interior(model.sea_ice.model.velocities.v)) + + # Match serial checkpointer tolerances. + @test T ≈ ref_T rtol=1e-13 + @test S ≈ ref_S rtol=1e-13 + @test h ≈ ref_h rtol=1e-13 + @test u ≈ ref_u rtol=1e-10 + @test v ≈ ref_v rtol=1e-10 + @test ui ≈ ref_ui rtol=1e-10 + @test vi ≈ ref_vi rtol=1e-10 + @test model.clock.time == ref_time + @test model.clock.iteration == ref_iteration + + rm.(glob("$(prefix)_iteration*.jld2"), force=true) + end end + + MPI.Barrier(MPI.COMM_WORLD) + MPI.Finalize() + """ end -MPI.Finalize() +@testset "Distributed ECCO download" begin + run_mpi_script("distributed_ecco_download.jl", distributed_ecco_download_script()) +end + +@testset "Distributed Bathymetry interpolation" begin + run_mpi_script("distributed_bathymetry_interpolation.jl", distributed_bathymetry_interpolation_script()) +end + +@testset "Distributed EarthSystemModel checkpointing" begin + run_mpi_script("distributed_earth_system_checkpointing.jl", distributed_earth_system_checkpointing_script()) +end From 4d166b4b3a3ce3fc68acd31611776105d07efcaf Mon Sep 17 00:00:00 2001 From: Taimoor Sohail Date: Tue, 21 Apr 2026 17:30:22 +1000 Subject: [PATCH 3/8] Add modules to test ECCO downloading in distributed --- test/test_distributed_utils.jl | 2 ++ 1 file changed, 2 insertions(+) diff --git a/test/test_distributed_utils.jl b/test/test_distributed_utils.jl index f1382b66e..eca614222 100644 --- a/test/test_distributed_utils.jl +++ b/test/test_distributed_utils.jl @@ -26,6 +26,8 @@ function distributed_ecco_download_script() using CFTime using Dates using NumericalEarth.DataWrangling: metadata_path + using NumericalEarth.ECCO + using NumericalEarth.ECCO: download_dataset using Oceananigans.DistributedComputations @testset "Distributed ECCO download" begin From 93aa067a505ab3c2948f9a762ca2e49545982f88 Mon Sep 17 00:00:00 2001 From: Taimoor Sohail Date: Tue, 21 Apr 2026 17:52:35 +1000 Subject: [PATCH 4/8] Update test/runtests.jl Co-authored-by: Simone Silvestri --- test/runtests.jl | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/test/runtests.jl b/test/runtests.jl index ef5067494..d7c69f51a 100644 --- a/test/runtests.jl +++ b/test/runtests.jl @@ -111,10 +111,6 @@ function __init__() end # Initialize and download required datasets -if group == "distributed_mpi" - include("test_distributed_utils.jl") -else - __init__() - runtests(NumericalEarth, args; testsuite) - delete_inpainted_files(@get_scratch!(".")) -end +__init__() +runtests(NumericalEarth, args; testsuite) +delete_inpainted_files(@get_scratch!(".")) From ffe0aec22b41997eecc81018242c68eaaa677e8b Mon Sep 17 00:00:00 2001 From: Simone Silvestri Date: Thu, 23 Apr 2026 09:41:38 +0200 Subject: [PATCH 5/8] run MPI tests --- .github/workflows/ci.yml | 38 ++++++++++++++++++++++++++------------ 1 file changed, 26 insertions(+), 12 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 602ed5f21..71ba844e7 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -238,26 +238,40 @@ jobs: ##### distributed_tests: - name: Distributed MPI tests (4 ranks) - runs-on: ubuntu-latest - timeout-minutes: 60 - env: - OPENBLAS_NUM_THREADS: 1 - JULIA_NUM_PRECOMPILE_TASKS: 8 - TEST_GROUP: distributed_mpi + name: Distributed (CPU) - Julia ${{ matrix.version }} - ${{ matrix.os }} - ${{ matrix.arch }} - ${{ github.event_name }} + runs-on: ${{ matrix.os }} + timeout-minutes: 80 + strategy: + fail-fast: false + matrix: + version: + - "1.12.5" + os: + - ubuntu-latest + arch: + - default steps: - uses: actions/checkout@v6 - uses: julia-actions/setup-julia@v2 with: - version: "1.12.5" - arch: default + version: ${{ matrix.version }} + arch: ${{ matrix.arch }} - uses: julia-actions/cache@v3 with: cache-scratchspaces: false - - uses: julia-actions/julia-buildpkg@v1 - - uses: julia-actions/julia-runtest@v1 + - name: Run tests + run: | + julia --project --color=yes --check-bounds=yes -e ' + using Pkg; + Pkg.test(; coverage=true, + julia_args=["--check-bounds=yes", "--compiled-modules=yes", "-O0"], + test_args=["--verbose", "test_distributed_utils"]) + ' + - uses: julia-actions/julia-processcoverage@v1 + - uses: codecov/codecov-action@v5 with: - test_args: "--verbose" + files: lcov.info + token: ${{ secrets.CODECOV_TOKEN }} ##### ##### The final test (a final check) From cfeefdc95547a6b46e6f3c586a9d7e3ccd8796b2 Mon Sep 17 00:00:00 2001 From: Simone Silvestri Date: Thu, 23 Apr 2026 14:25:26 +0200 Subject: [PATCH 6/8] actually run the mpi tests --- test/runtests.jl | 3 --- 1 file changed, 3 deletions(-) diff --git a/test/runtests.jl b/test/runtests.jl index d7c69f51a..ec9898d8a 100644 --- a/test/runtests.jl +++ b/test/runtests.jl @@ -7,8 +7,6 @@ using Scratch using NumericalEarth.DataWrangling: download_dataset using ParallelTestRunner: find_tests, parse_args, filter_tests!, runtests -group = get(ENV, "TEST_GROUP", "all") - # Start with autodiscovered tests testsuite = find_tests(@__DIR__) @@ -18,7 +16,6 @@ args = parse_args(ARGS) # download_utils and runtests_setup are not tests! delete!(testsuite, "runtests_setup") delete!(testsuite, "download_utils") -delete!(testsuite, "test_distributed_utils") gpu_test = parse(Bool, get(ENV, "GPU_TEST", "false")) From 088cf26d53776a89c5498b3a27f35fe5b4d95879 Mon Sep 17 00:00:00 2001 From: Simone Silvestri Date: Thu, 23 Apr 2026 15:05:53 +0200 Subject: [PATCH 7/8] try reducing the bounds --- test/test_distributed_utils.jl | 184 ++++++++++++++++----------------- 1 file changed, 90 insertions(+), 94 deletions(-) diff --git a/test/test_distributed_utils.jl b/test/test_distributed_utils.jl index eca614222..89c9d81f2 100644 --- a/test/test_distributed_utils.jl +++ b/test/test_distributed_utils.jl @@ -159,101 +159,97 @@ function distributed_earth_system_checkpointing_script() @testset "Distributed EarthSystemModel checkpointing" begin mpi_ranks = MPI.Comm_size(MPI.COMM_WORLD) - if mpi_ranks == 1 - @warn "Skipping distributed checkpointing test because only one MPI rank is active." - else - # Use an x-only partition that matches the active MPI communicator. - arch = Distributed(CPU(), partition=Partition(mpi_ranks, 1)) - - # Choose a size divisible by mpi_ranks for robust domain decomposition. - grid = LatitudeLongitudeGrid(arch; - size = (12 * mpi_ranks, 24, 4), - z = (-100, 0), - latitude = (-80, 80), - longitude = (0, 360), - halo = (6, 6, 6)) - - function make_coupled_model(grid, arch) - @inline hi(λ, φ) = φ > 70 || φ < -70 - - ocean = ocean_simulation(grid, closure=nothing) - set!(ocean.model, T=20, S=35, u=0.01, v=-0.005) - - sea_ice = sea_ice_simulation(grid, ocean) - set!(sea_ice.model, h=hi, ℵ=hi) - - backend = JRA55NetCDFBackend(4) - atmosphere = JRA55PrescribedAtmosphere(arch; backend) - - return OceanSeaIceModel(ocean, sea_ice; atmosphere) - end - - # Reference run: run to 3, then continue to 6. - model = make_coupled_model(grid, arch) - simulation = Simulation(model, Δt=60, stop_iteration=3) - run!(simulation) - - simulation = Simulation(model, Δt=60, stop_iteration=6) - run!(simulation) - - ref_T = Array(interior(model.ocean.model.tracers.T)) - ref_S = Array(interior(model.ocean.model.tracers.S)) - ref_u = Array(interior(model.ocean.model.velocities.u)) - ref_v = Array(interior(model.ocean.model.velocities.v)) - ref_h = Array(interior(model.sea_ice.model.ice_thickness)) - ref_ui = Array(interior(model.sea_ice.model.velocities.u)) - ref_vi = Array(interior(model.sea_ice.model.velocities.v)) - ref_time = model.clock.time - ref_iteration = model.clock.iteration - - # Checkpointed run: run to 3 and checkpoint. - model = make_coupled_model(grid, arch) - simulation = Simulation(model, Δt=60, stop_iteration=3) - - prefix = "distributed_osm_checkpointer_test_rank$(MPI.Comm_rank(MPI.COMM_WORLD))" - simulation.output_writers[:checkpointer] = Checkpointer(simulation.model; - schedule = IterationInterval(3), - prefix = prefix) - run!(simulation) - - @test !isempty(glob("$(prefix)_iteration3*.jld2")) - - # Recreate and restore from latest checkpoint. - model = make_coupled_model(grid, arch) - simulation = Simulation(model, Δt=60, stop_iteration=6) - simulation.output_writers[:checkpointer] = Checkpointer(model; - schedule = IterationInterval(3), - prefix = prefix) - - set!(simulation; checkpoint=:latest) - @test simulation.model.clock.iteration == 3 - - set!(simulation; iteration=3) - @test simulation.model.clock.iteration == 3 - - run!(simulation) - - T = Array(interior(model.ocean.model.tracers.T)) - S = Array(interior(model.ocean.model.tracers.S)) - u = Array(interior(model.ocean.model.velocities.u)) - v = Array(interior(model.ocean.model.velocities.v)) - h = Array(interior(model.sea_ice.model.ice_thickness)) - ui = Array(interior(model.sea_ice.model.velocities.u)) - vi = Array(interior(model.sea_ice.model.velocities.v)) - - # Match serial checkpointer tolerances. - @test T ≈ ref_T rtol=1e-13 - @test S ≈ ref_S rtol=1e-13 - @test h ≈ ref_h rtol=1e-13 - @test u ≈ ref_u rtol=1e-10 - @test v ≈ ref_v rtol=1e-10 - @test ui ≈ ref_ui rtol=1e-10 - @test vi ≈ ref_vi rtol=1e-10 - @test model.clock.time == ref_time - @test model.clock.iteration == ref_iteration - - rm.(glob("$(prefix)_iteration*.jld2"), force=true) + # Use an x-only partition that matches the active MPI communicator. + arch = Distributed(CPU(), partition=Partition(mpi_ranks, 1)) + + # Choose a size divisible by mpi_ranks for robust domain decomposition. + grid = LatitudeLongitudeGrid(arch; + size = (12 * mpi_ranks, 24, 4), + z = (-100, 0), + latitude = (-10, 10), + longitude = (0, 24), + halo = (6, 6, 6)) + + function make_coupled_model(grid, arch) + @inline hi(λ, φ) = φ > 70 || φ < -70 + + ocean = ocean_simulation(grid, free_surface=SplitExplicitFreeSurface(grid; substeps=30), closure=nothing) + set!(ocean.model, T=20, S=35, u=0.01, v=-0.005) + + sea_ice = sea_ice_simulation(grid, ocean) + set!(sea_ice.model, h=hi, ℵ=hi) + + backend = JRA55NetCDFBackend(4) + atmosphere = JRA55PrescribedAtmosphere(arch; backend) + + return OceanSeaIceModel(ocean, sea_ice; atmosphere) end + + # Reference run: run to 3, then continue to 6. + model = make_coupled_model(grid, arch) + simulation = Simulation(model, Δt=60, stop_iteration=3) + run!(simulation) + + simulation = Simulation(model, Δt=60, stop_iteration=6) + run!(simulation) + + ref_T = Array(interior(model.ocean.model.tracers.T)) + ref_S = Array(interior(model.ocean.model.tracers.S)) + ref_u = Array(interior(model.ocean.model.velocities.u)) + ref_v = Array(interior(model.ocean.model.velocities.v)) + ref_h = Array(interior(model.sea_ice.model.ice_thickness)) + ref_ui = Array(interior(model.sea_ice.model.velocities.u)) + ref_vi = Array(interior(model.sea_ice.model.velocities.v)) + ref_time = model.clock.time + ref_iteration = model.clock.iteration + + # Checkpointed run: run to 3 and checkpoint. + model = make_coupled_model(grid, arch) + simulation = Simulation(model, Δt=60, stop_iteration=3) + + prefix = "distributed_osm_checkpointer_test_rank$(MPI.Comm_rank(MPI.COMM_WORLD))" + simulation.output_writers[:checkpointer] = Checkpointer(simulation.model; + schedule = IterationInterval(3), + prefix = prefix) + run!(simulation) + + @test !isempty(glob("$(prefix)_iteration3*.jld2")) + + # Recreate and restore from latest checkpoint. + model = make_coupled_model(grid, arch) + simulation = Simulation(model, Δt=60, stop_iteration=6) + simulation.output_writers[:checkpointer] = Checkpointer(model; + schedule = IterationInterval(3), + prefix = prefix) + + set!(simulation; checkpoint=:latest) + @test simulation.model.clock.iteration == 3 + + set!(simulation; iteration=3) + @test simulation.model.clock.iteration == 3 + + run!(simulation) + + T = Array(interior(model.ocean.model.tracers.T)) + S = Array(interior(model.ocean.model.tracers.S)) + u = Array(interior(model.ocean.model.velocities.u)) + v = Array(interior(model.ocean.model.velocities.v)) + h = Array(interior(model.sea_ice.model.ice_thickness)) + ui = Array(interior(model.sea_ice.model.velocities.u)) + vi = Array(interior(model.sea_ice.model.velocities.v)) + + # Match serial checkpointer tolerances. + @test T ≈ ref_T rtol=1e-13 + @test S ≈ ref_S rtol=1e-13 + @test h ≈ ref_h rtol=1e-13 + @test u ≈ ref_u rtol=1e-10 + @test v ≈ ref_v rtol=1e-10 + @test ui ≈ ref_ui rtol=1e-10 + @test vi ≈ ref_vi rtol=1e-10 + @test model.clock.time == ref_time + @test model.clock.iteration == ref_iteration + + rm.(glob("$(prefix)_iteration*.jld2"), force=true) end MPI.Barrier(MPI.COMM_WORLD) From 9547d0d843a2833bc2affee49c4c52d379729c1c Mon Sep 17 00:00:00 2001 From: Simone Silvestri Date: Thu, 23 Apr 2026 16:28:51 +0200 Subject: [PATCH 8/8] correct bathymetry tests --- test/test_distributed_utils.jl | 1 + 1 file changed, 1 insertion(+) diff --git a/test/test_distributed_utils.jl b/test/test_distributed_utils.jl index 89c9d81f2..896d11c03 100644 --- a/test/test_distributed_utils.jl +++ b/test/test_distributed_utils.jl @@ -102,6 +102,7 @@ function distributed_bathymetry_interpolation_script() longitude_interfaces(::TrivalBathymetry) = (-180, 180) latitude_interfaces(::TrivalBathymetry) = (0, 50) metadata_filename(::TrivalBathymetry, name, date, bounding_box) = "trivial_bathymetry.nc" + metadata_path(::Metadatum{<:TrivalBathymetry}) = bathymetry_path @testset "Distributed Bathymetry interpolation" begin TrivialBathymetry_metadata = Metadata(:z, TrivalBathymetry(), nothing, nothing, @__DIR__)