diff --git a/dpgen/auto_test/Interstitial.py b/dpgen/auto_test/Interstitial.py index a4b06ae1c..f194041cb 100644 --- a/dpgen/auto_test/Interstitial.py +++ b/dpgen/auto_test/Interstitial.py @@ -183,6 +183,9 @@ def make_confs(self, path_to_work, path_to_equi, refine=False): insert_element_task = os.path.join(path_to_work, "element.out") if os.path.isfile(insert_element_task): os.remove(insert_element_task) + # Always create element.out file, even if it will be empty + with open(insert_element_task, "w") as f: + pass for ii in self.insert_ele: pre_vds = InterstitialGenerator() @@ -467,32 +470,52 @@ def make_confs(self, path_to_work, path_to_equi, refine=False): return task_list def post_process(self, task_list): - if True: - fin1 = open(os.path.join(task_list[0], "..", "element.out")) - for ii in task_list: - conf = os.path.join(ii, "conf.lmp") - inter = os.path.join(ii, "inter.json") - insert_ele = fin1.readline().split()[0] - if os.path.isfile(conf): - with open(conf) as fin2: - conf_line = fin2.read().split("\n") - insert_line = conf_line[-2] - type_map = loadfn(inter)["type_map"] - type_map_list = lammps.element_list(type_map) - if int(insert_line.split()[1]) > len(type_map_list): - type_num = type_map[insert_ele] + 1 - conf_line[2] = str(len(type_map_list)) + " atom types" - conf_line[-2] = ( - "%6.d" % int(insert_line.split()[0]) # noqa: UP031 - + "%7.d" % type_num # noqa: UP031 - + f"{float(insert_line.split()[2]):16.10f}" - + f"{float(insert_line.split()[3]):16.10f}" - + f"{float(insert_line.split()[4]):16.10f}" - ) - with open(conf, "w+") as fout: - for jj in conf_line: - print(jj, file=fout) - fin1.close() + if not task_list: + # No tasks to process + return + + element_out_path = os.path.join(task_list[0], "..", "element.out") + if not os.path.exists(element_out_path): + # element.out doesn't exist, nothing to process + return + + with open(element_out_path) as fin1: + element_lines = fin1.readlines() + + for idx, ii in enumerate(task_list): + conf = os.path.join(ii, "conf.lmp") + inter = os.path.join(ii, "inter.json") + + # Check if we have a corresponding element line + if idx >= len(element_lines): + # No more element entries, skip this task + continue + + element_line = element_lines[idx].strip() + if not element_line: + # Empty line, skip this task + continue + + insert_ele = element_line.split()[0] + if os.path.isfile(conf): + with open(conf) as fin2: + conf_line = fin2.read().split("\n") + insert_line = conf_line[-2] + type_map = loadfn(inter)["type_map"] + type_map_list = lammps.element_list(type_map) + if int(insert_line.split()[1]) > len(type_map_list): + type_num = type_map[insert_ele] + 1 + conf_line[2] = str(len(type_map_list)) + " atom types" + conf_line[-2] = ( + "%6.d" % int(insert_line.split()[0]) # noqa: UP031 + + "%7.d" % type_num # noqa: UP031 + + f"{float(insert_line.split()[2]):16.10f}" + + f"{float(insert_line.split()[3]):16.10f}" + + f"{float(insert_line.split()[4]):16.10f}" + ) + with open(conf, "w+") as fout: + for jj in conf_line: + print(jj, file=fout) def task_type(self): return self.parameter["type"] diff --git a/tests/auto_test/test_interstitial.py b/tests/auto_test/test_interstitial.py index 876502595..d595dbefc 100644 --- a/tests/auto_test/test_interstitial.py +++ b/tests/auto_test/test_interstitial.py @@ -105,3 +105,111 @@ def test_make_confs_bcc(self): center = (inter_site1.coords + inter_site2.coords) / 2 self.assertTrue((center[0] - center[1]) < 1e-4) self.assertTrue((center[1] - center[2]) < 1e-4) + + def test_make_confs_filtered_out(self): + """Test that element.out is created even when all defects are filtered out.""" + # Create a configuration that filters out all defects + _jdata_filtered = { + "structures": ["confs/std-bcc"], + "interaction": { + "type": "vasp", + "incar": "vasp_input/INCAR.rlx", + "potcar_prefix": "vasp_input", + "potcars": {"V": "POTCAR"}, + }, + "properties": [ + { + "type": "interstitial", + "supercell": [1, 1, 1], + "insert_ele": ["V"], + "conf_filters": {"min_dist": 100.0}, # Very high filter - all defects filtered + "bcc_self": False, # Don't add bcc tasks + } + ], + } + + filtered_target_path = "confs/std-bcc/interstitial_filtered" + if not os.path.exists(filtered_target_path): + os.makedirs(filtered_target_path) + + try: + # Copy the test structure + shutil.copy( + os.path.join(self.source_path, "CONTCAR_V_bcc"), + os.path.join(self.equi_path, "CONTCAR"), + ) + + interstitial_filtered = Interstitial(_jdata_filtered["properties"][0]) + task_list = interstitial_filtered.make_confs(filtered_target_path, self.equi_path) + + # Should have no tasks since all are filtered + self.assertEqual(len(task_list), 0) + + # element.out should exist even if empty + element_out_path = os.path.join(filtered_target_path, "element.out") + self.assertTrue(os.path.exists(element_out_path), "element.out should exist even when empty") + + # Test that post_process doesn't crash with empty task list + try: + interstitial_filtered.post_process(task_list) + except Exception as e: + self.fail(f"post_process should handle empty task list gracefully, but got: {e}") + + finally: + # Clean up + if os.path.exists(filtered_target_path): + shutil.rmtree(filtered_target_path) + + def test_make_confs_partial_filtering(self): + """Test that post_process handles mismatched element.out entries gracefully.""" + # Create a configuration that might have some defects filtered + _jdata_partial = { + "structures": ["confs/std-bcc"], + "interaction": { + "type": "vasp", + "incar": "vasp_input/INCAR.rlx", + "potcar_prefix": "vasp_input", + "potcars": {"V": "POTCAR"}, + }, + "properties": [ + { + "type": "interstitial", + "supercell": [1, 1, 1], + "insert_ele": ["V"], + # No conf_filters to ensure some tasks are created + "bcc_self": True, # Add bcc tasks that always create more tasks + } + ], + } + + partial_target_path = "confs/std-bcc/interstitial_partial" + if not os.path.exists(partial_target_path): + os.makedirs(partial_target_path) + + try: + # Copy the test structure + shutil.copy( + os.path.join(self.source_path, "CONTCAR_V_bcc"), + os.path.join(self.equi_path, "CONTCAR"), + ) + + interstitial_partial = Interstitial(_jdata_partial["properties"][0]) + task_list = interstitial_partial.make_confs(partial_target_path, self.equi_path) + + # Should have some tasks (at least the bcc_self ones) + self.assertGreater(len(task_list), 0) + + # element.out should exist + element_out_path = os.path.join(partial_target_path, "element.out") + self.assertTrue(os.path.exists(element_out_path), "element.out should exist") + + # Test that post_process doesn't crash even if element.out and task count don't match + try: + interstitial_partial.post_process(task_list) + except Exception as e: + self.fail(f"post_process should handle mismatched element.out gracefully, but got: {e}") + + finally: + # Clean up + if os.path.exists(partial_target_path): + shutil.rmtree(partial_target_path)