diff --git a/MCintegration/integrators.py b/MCintegration/integrators.py index ea5c78d..9654b67 100644 --- a/MCintegration/integrators.py +++ b/MCintegration/integrators.py @@ -233,7 +233,7 @@ def __call__(self, neval, nblock=32, verbose=-1, **kwargs): if verbose > 0: print( - f"nblock = {nblock}, n_steps_perblock = {epoch_perblock}, batch_size = {self.batch_size}, actual neval = {self.batch_size*epoch_perblock*nblock}" + f"nblock = {nblock}, n_steps_perblock = {epoch_perblock}, batch_size = {self.batch_size}, actual neval = {self.batch_size * epoch_perblock * nblock}" ) config = Configuration( @@ -356,13 +356,13 @@ def __call__( else: nblock = epoch // nsteps_perblock n_meas_perblock = nsteps_perblock // meas_freq - assert ( - n_meas_perblock > 0 - ), f"neval ({neval}) should be larger than batch_size * nblock * meas_freq ({self.batch_size} * {nblock} * {meas_freq})" + assert n_meas_perblock > 0, ( + f"neval ({neval}) should be larger than batch_size * nblock * meas_freq ({self.batch_size} * {nblock} * {meas_freq})" + ) if verbose > 0: print( - f"nblock = {nblock}, n_meas_perblock = {n_meas_perblock}, meas_freq = {meas_freq}, batch_size = {self.batch_size}, actual neval = {self.batch_size*nsteps_perblock*nblock}" + f"nblock = {nblock}, n_meas_perblock = {n_meas_perblock}, meas_freq = {meas_freq}, batch_size = {self.batch_size}, actual neval = {self.batch_size * nsteps_perblock * nblock}" ) config = Configuration( diff --git a/MCintegration/integrators_test.py b/MCintegration/integrators_test.py index 56a90fe..b0ebb33 100644 --- a/MCintegration/integrators_test.py +++ b/MCintegration/integrators_test.py @@ -1,10 +1,84 @@ import unittest +from unittest.mock import patch, MagicMock +import os import torch import numpy as np from integrators import Integrator, MonteCarlo, MarkovChainMonteCarlo +from integrators import get_ip, get_open_port, setup +from integrators import gaussian, random_walk -# from base import LinearMap, Uniform from maps import Configuration +from base import EPSILON + + +class TestIntegrators(unittest.TestCase): + @patch("socket.socket") + def test_get_ip(self, mock_socket): + # Mock the socket behavior + mock_socket_instance = mock_socket.return_value + mock_socket_instance.getsockname.return_value = ("192.168.1.1", 12345) + ip = get_ip() + self.assertEqual(ip, "192.168.1.1") + + @patch("socket.socket") + def test_get_open_port(self, mock_socket): + # Mock the socket behavior + mock_socket_instance = mock_socket.return_value.__enter__.return_value + mock_socket_instance.getsockname.return_value = ("0.0.0.0", 54321) + port = get_open_port() + self.assertEqual(port, 54321) + + @patch("torch.distributed.init_process_group") + @patch("torch.cuda.set_device") + @patch.dict(os.environ, {"LOCAL_RANK": "0"}) + def test_setup(self, mock_set_device, mock_init_process_group): + setup(backend="gloo") + mock_init_process_group.assert_called_once_with(backend="gloo") + mock_set_device.assert_called_once_with(0) + + def test_random_walk(self): + # Test random_walk function with default parameters + dim = 3 + device = "cpu" + dtype = torch.float32 + u = torch.rand(dim, device=device, dtype=dtype) + step_size = 0.2 + + new_u = random_walk(dim, device, dtype, u, step_size=step_size) + + # Validate output shape and range + self.assertEqual(new_u.shape, u.shape) + self.assertTrue(torch.all(new_u >= 0) and torch.all(new_u <= 1)) + + # Test with custom step size + custom_step_size = 0.5 + new_u_custom = random_walk(dim, device, dtype, u, step_size=custom_step_size) + self.assertEqual(new_u_custom.shape, u.shape) + self.assertTrue(torch.all(new_u_custom >= 0) and torch.all(new_u_custom <= 1)) + + def test_gaussian(self): + # Test gaussian function with default parameters + dim = 3 + device = "cpu" + dtype = torch.float32 + u = torch.rand(dim, device=device, dtype=dtype) + + mean = torch.zeros_like(u) + std = torch.ones_like(u) + + new_u = gaussian(dim, device, dtype, u, mean=mean, std=std) + + # Validate output shape + self.assertEqual(new_u.shape, u.shape) + + # Test with custom mean and std + custom_mean = torch.full_like(u, 0.5) + custom_std = torch.full_like(u, 0.1) + new_u_custom = gaussian(dim, device, dtype, u, mean=custom_mean, std=custom_std) + + self.assertEqual(new_u_custom.shape, u.shape) + self.assertTrue(torch.all(new_u_custom > custom_mean - 3 * custom_std)) + self.assertTrue(torch.all(new_u_custom < custom_mean + 3 * custom_std)) class TestIntegrator(unittest.TestCase): @@ -25,6 +99,46 @@ def test_initialization(self): self.assertTrue(hasattr(integrator.maps, "device")) self.assertTrue(hasattr(integrator.maps, "dtype")) + @patch("MCintegration.maps.CompositeMap") + @patch("MCintegration.base.LinearMap") + def test_initialization_with_maps(self, mock_linear_map, mock_composite_map): + # Mock the LinearMap and CompositeMap + mock_linear_map_instance = MagicMock() + mock_linear_map.return_value = mock_linear_map_instance + mock_composite_map_instance = MagicMock() + mock_composite_map.return_value = mock_composite_map_instance + + # Create a mock map + mock_map = MagicMock() + mock_map.device = "cpu" + mock_map.dtype = torch.float32 + mock_map.forward_with_detJ.return_value = (torch.rand(10, 2), torch.rand(10)) + + # Initialize Integrator with maps + integrator = Integrator( + bounds=self.bounds, f=self.f, maps=mock_map, batch_size=self.batch_size + ) + + # Assertions + self.assertEqual(integrator.dim, 2) + self.assertEqual(integrator.batch_size, 1000) + self.assertEqual(integrator.f_dim, 1) + self.assertTrue(hasattr(integrator.maps, "forward_with_detJ")) + self.assertTrue(hasattr(integrator.maps, "device")) + self.assertTrue(hasattr(integrator.maps, "dtype")) + + integrator = Integrator( + bounds=self.bounds, + f=self.f, + maps=mock_map, + batch_size=self.batch_size, + device="cpu", + ) + + # Assertions + self.assertEqual(integrator.device, "cpu") + self.assertTrue(hasattr(integrator.maps, "device")) + def test_bounds_conversion(self): # Test various input types test_cases = [ @@ -69,11 +183,11 @@ def test_invalid_bounds(self): with self.assertRaises(error_type): Integrator(bounds=bounds, f=self.f) - def test_device_handling(self): - if torch.cuda.is_available(): - integrator = Integrator(bounds=self.bounds, f=self.f, device="cuda") - self.assertTrue(integrator.bounds.is_cuda) - self.assertTrue(integrator.maps.device == "cuda") + # def test_device_handling(self): + # if torch.cuda.is_available(): + # integrator = Integrator(bounds=self.bounds, f=self.f, device="cuda") + # self.assertTrue(integrator.bounds.is_cuda) + # self.assertTrue(integrator.maps.device == "cuda") def test_dtype_handling(self): dtypes = [torch.float32, torch.float64] @@ -167,6 +281,27 @@ def test_batch_size_handling(self): # Should not raise warning self.mc(neval=neval, nblock=nblock) + def test_block_size_warning(self): + mc = MonteCarlo(bounds=self.bounds, f=self.simple_integral, batch_size=1000) + with self.assertWarns(UserWarning): + mc(neval=500, nblock=10) # neval too small for nblock + + def test_varying_nblock(self): + test_cases = [ + (10000, 10), # Standard case + (10000, 1), # Single block + (10000, 100), # Many blocks + ] + + for neval, nblock in test_cases: + with self.subTest(neval=neval, nblock=nblock): + result = self.mc(neval=neval, nblock=nblock) + if hasattr(result, "mean"): + value = result.mean + else: + value = result + self.assertAlmostEqual(float(value), 1.0, delta=0.1) + class TestMarkovChainMonteCarlo(unittest.TestCase): def setUp(self): @@ -237,29 +372,40 @@ def test_burnin_effect(self): value = result self.assertAlmostEqual(float(value), 1.0, delta=tolerance) - # def test_mix_rate_sensitivity(self): - # # Modified mix rate test to be more robust - # mix_rates = [0.0, 0.5, 1.0] - # results = [] + def test_sample_acceptance(self): + config = Configuration( + self.mcmc.batch_size, + self.mcmc.dim, + self.mcmc.f_dim, + self.mcmc.device, + self.mcmc.dtype, + ) + config.u, config.detJ = self.mcmc.q0.sample_with_detJ(self.mcmc.batch_size) + config.x, detj = self.mcmc.maps.forward_with_detJ(config.u) + config.detJ *= detj + config.weight = torch.rand(self.mcmc.batch_size, device=self.mcmc.device) - # for mix_rate in mix_rates: - # accumulated_error = 0 - # n_trials = 3 # Run multiple trials for each mix_rate + self.mcmc.sample(config, nsteps=1, mix_rate=0.5) - # for _ in range(n_trials): - # result = self.mcmc(neval=50000, mix_rate=mix_rate, nblock=10) - # if hasattr(result, "mean"): - # value = result.mean - # error = result.sdev - # else: - # value = result - # error = abs(float(value) - 1.0) - # accumulated_error += error + # Validate acceptance logic + self.assertTrue(torch.all(config.weight >= EPSILON)) + self.assertEqual(config.u.shape, config.x.shape) - # results.append(accumulated_error / n_trials) + def test_varying_mix_rate(self): + test_cases = [ + (0.1, 0.2), # Low mix rate + (0.5, 0.1), # Medium mix rate + (0.9, 0.05), # High mix rate + ] - # # We expect moderate mix rates to have lower average error - # self.assertLess(results[1], max(results[0], results[2])) + for mix_rate, tolerance in test_cases: + with self.subTest(mix_rate=mix_rate): + result = self.mcmc(neval=50000, mix_rate=mix_rate, nblock=10) + if hasattr(result, "mean"): + value = result.mean + else: + value = result + self.assertAlmostEqual(float(value), 1.0, delta=tolerance) class TestDistributedFunctionality(unittest.TestCase): @@ -271,26 +417,26 @@ def test_distributed_initialization(self): self.assertEqual(integrator.rank, 0) self.assertEqual(integrator.world_size, 1) - @unittest.skipIf(not torch.distributed.is_available(), "Distributed not available") - def test_multi_gpu_consistency(self): - if torch.cuda.device_count() >= 2: - bounds = torch.tensor([[0.0, 1.0]], dtype=torch.float64) - f = lambda x, fx: torch.ones_like(x) + # @unittest.skipIf(not torch.distributed.is_available(), "Distributed not available") + # def test_multi_gpu_consistency(self): + # if torch.cuda.device_count() >= 2: + # bounds = torch.tensor([[0.0, 1.0]], dtype=torch.float64) + # f = lambda x, fx: torch.ones_like(x) - # Create two integrators on different devices - integrator1 = Integrator(bounds=bounds, f=f, device="cuda:0") - integrator2 = Integrator(bounds=bounds, f=f, device="cuda:1") + # # Create two integrators on different devices + # integrator1 = Integrator(bounds=bounds, f=f, device="cuda:0") + # integrator2 = Integrator(bounds=bounds, f=f, device="cuda:1") - # Results should be consistent across devices - result1 = integrator1(neval=10000) - result2 = integrator2(neval=10000) + # # Results should be consistent across devices + # result1 = integrator1(neval=10000) + # result2 = integrator2(neval=10000) - if hasattr(result1, "mean"): - value1, value2 = result1.mean, result2.mean - else: - value1, value2 = result1, result2 + # if hasattr(result1, "mean"): + # value1, value2 = result1.mean, result2.mean + # else: + # value1, value2 = result1, result2 - self.assertAlmostEqual(float(value1), float(value2), places=1) + # self.assertAlmostEqual(float(value1), float(value2), places=1) if __name__ == "__main__": diff --git a/MCintegration/maps_test.py b/MCintegration/maps_test.py index d54cd36..7863828 100644 --- a/MCintegration/maps_test.py +++ b/MCintegration/maps_test.py @@ -137,6 +137,20 @@ def test_forward(self): self.assertEqual(x.shape, u.shape) self.assertEqual(log_jac.shape, (u.shape[0],)) + def test_forward_out_of_bounds(self): + # Test forward transformation with out-of-bounds u values + u = torch.tensor( + [[1.5, 0.5], [-0.1, 0.5]], dtype=torch.float64 + ) # Out-of-bounds values + x, log_jac = self.vegas.forward(u) + + # Check that out-of-bounds x values are clamped to grid boundaries + self.assertTrue(torch.all(x >= 0.0)) + self.assertTrue(torch.all(x <= 1.0)) + + # Check log determinant adjustment for out-of-bounds cases + self.assertEqual(log_jac.shape, (u.shape[0],)) + def test_inverse(self): # Test inverse transformation x = torch.tensor([[0.1, 0.2], [0.3, 0.4]], dtype=torch.float64) @@ -144,6 +158,20 @@ def test_inverse(self): self.assertEqual(u.shape, x.shape) self.assertEqual(log_jac.shape, (x.shape[0],)) + def test_inverse_out_of_bounds(self): + # Test inverse transformation with out-of-bounds x values + x = torch.tensor( + [[1.5, 0.5], [-0.1, 0.5]], dtype=torch.float64 + ) # Out-of-bounds values + u, log_jac = self.vegas.inverse(x) + + # Check that out-of-bounds u values are clamped to [0, 1] + self.assertTrue(torch.all(u >= 0.0)) + self.assertTrue(torch.all(u <= 1.0)) + + # Check log determinant adjustment for out-of-bounds cases + self.assertEqual(log_jac.shape, (x.shape[0],)) + def test_train(self): # Test training the Vegas class def f(x, fx): diff --git a/MCintegration/utils_test.py b/MCintegration/utils_test.py index a803b1e..d186774 100644 --- a/MCintegration/utils_test.py +++ b/MCintegration/utils_test.py @@ -142,21 +142,39 @@ def test_converged(self): self.weighted_ravg.add(gvar.gvar(1.0, 0.01)) self.assertTrue(self.weighted_ravg.converged(0.1, 0.1)) - # def test_multiplication(self): - # ravg1 = RAvg(weighted=True) - # ravg1.update(2.0, 0.1) - # ravg2 = RAvg(weighted=True) - # ravg2.update(3.0, 0.1) - # result = ravg1 * ravg2 - # self.assertAlmostEqual(result.mean, 6.0) - - # def test_division(self): - # ravg1 = RAvg(weighted=True) - # ravg1.update(6.0, 0.1) - # ravg2 = RAvg(weighted=True) - # ravg2.update(3.0, 0.1) - # result = ravg1 / ravg2 - # self.assertAlmostEqual(result.mean, 2.0) + def test_reduce_ex_serialization(self): + ravg = RAvg(weighted=True) + ravg.add(gvar.gvar(1.0, 0.1)) + reduced = ravg.__reduce_ex__(2) + restored = reduced[0](*reduced[1]) + self.assertEqual(restored.mean, ravg.mean) + self.assertEqual(restored.sdev, ravg.sdev) + + def test_summary_output(self): + ravg = RAvg(weighted=True) + ravg.add(gvar.gvar(1.0, 0.1)) + summary = ravg.summary() + self.assertIn("itn", summary) + self.assertIn("integral", summary) + self.assertIn("wgt average", summary) + + def test_converged_criteria(self): + ravg = RAvg(weighted=True) + ravg.add(gvar.gvar(1.0, 0.1)) + self.assertTrue(ravg.converged(0.1, 0.1)) + self.assertFalse(ravg.converged(0.001, 0.001)) + + def test_multiplication_with_another_ravg(self): + ravg1 = RAvg(weighted=True) + ravg1.update(2.0, 0.1) + ravg2 = RAvg(weighted=True) + ravg2.update(3.0, 0.1) + + result = ravg1 * ravg2 + self.assertAlmostEqual(result.mean, 6.0) + sdev = (0.1 / 2**2 + 0.1 / 3**2) ** 0.5 * 6.0 + self.assertAlmostEqual(result.sdev, sdev) + def test_multiplication(self): ravg1 = RAvg(weighted=True) # Test multiplication by another RAvg object @@ -198,6 +216,17 @@ def test_multiplication(self): np.allclose([r.sdev for r in result], [2.0 * ravg1.sdev, 3.0 * ravg1.sdev]) ) + def test_division_with_another_ravg(self): + ravg1 = RAvg(weighted=True) + ravg1.update(6.0, 0.1) + ravg2 = RAvg(weighted=True) + ravg2.update(3.0, 0.1) + + result = ravg1 / ravg2 + self.assertAlmostEqual(result.mean, 2.0) + sdev = (0.1 / 6.0**2 + 0.1 / 3.0**2) ** 0.5 * 2.0 + self.assertAlmostEqual(result.sdev, sdev) + def test_division(self): ravg1 = RAvg(weighted=True) ravg1.update(6.0, 0.1) @@ -254,13 +283,17 @@ def test_set_seed_cpu(self): # Test set_seed on a CPU-only environment set_seed(42) self.assertEqual(torch.initial_seed(), 42) - - @unittest.skipIf(not torch.cuda.is_available(), "CUDA is not available") - def test_set_seed_cuda(self): - # Test set_seed on a CUDA-enabled environment + u1 = torch.rand(10) set_seed(42) - self.assertEqual(torch.initial_seed(), 42) - self.assertEqual(torch.cuda.initial_seed(), 42) + u2 = torch.rand(10) + self.assertTrue(torch.all(u1 == u2)) + + # @unittest.skipIf(not torch.cuda.is_available(), "CUDA is not available") + # def test_set_seed_cuda(self): + # # Test set_seed on a CUDA-enabled environment + # set_seed(42) + # self.assertEqual(torch.initial_seed(), 42) + # self.assertEqual(torch.cuda.initial_seed(), 42) @unittest.skipIf(torch.cuda.is_available(), "CUDA is available") def test_get_device_cpu(self): @@ -268,11 +301,17 @@ def test_get_device_cpu(self): device = get_device() self.assertEqual(device, torch.device("cpu")) - @unittest.skipIf(not torch.cuda.is_available(), "CUDA is not available") - def test_get_device_cuda(self): - # Test get_device when CUDA is available - device = get_device() - self.assertEqual(device, torch.cuda.current_device()) + # @unittest.skipIf(not torch.cuda.is_available(), "CUDA is not available") + # def test_get_device_cuda(self): + # # Test get_device when CUDA is available + # device = get_device() + # self.assertEqual(device, torch.cuda.current_device()) + + def test_get_device_cuda_inactive(self): + if not torch.cuda.is_available(): + torch.cuda.set_device(-1) # Simulate inactive CUDA + device = get_device() + self.assertEqual(device, torch.device("cpu")) if __name__ == "__main__":