diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index cdc454c2..8f8629a8 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -11,16 +11,16 @@ jobs: runs-on: ubuntu-latest steps: - - uses: actions/checkout@v2 - - name: Set up Python 3.9 - uses: actions/setup-python@v2 + - uses: actions/checkout@v6 + - name: Set up Python 3.10 + uses: actions/setup-python@v6 with: - python-version: 3.9 + python-version: '3.10' - name: Install dependencies run: | python -m pip install --upgrade pip - pip install flake8 pytest mypy if [ -f requirements.txt ]; then pip install -r requirements.txt; fi + pip install flake8 pytest mypy - name: Lint with flake8 run: | # stop the build if there are Python syntax errors or undefined names @@ -30,19 +30,21 @@ jobs: - name: Lint with mypy run: mypy nimrod/test_suite_generation/ nimrod/test_suites_execution/ nimrod/dynamic_analysis/ nimrod/core nimrod/output_generation nimrod/__main__.py nimrod/smat.py --ignore-missing-imports - name: Setup Java - uses: actions/setup-java@v2 + uses: actions/setup-java@v5 with: - distribution: 'adopt' + distribution: 'temurin' java-version: '8' - name: Setup Maven - uses: stCarolas/setup-maven@v4.1 + uses: stCarolas/setup-maven@v5 + with: + maven-version: 3.8.2 - name: Creating env-config.json run: | - cd /home/runner/work/SMAT/SMAT/nimrod/tests/ - java_path="/opt/hostedtoolcache/Java_Adopt_jdk/$(ls /opt/hostedtoolcache/Java_Adopt_jdk)/x64" - contents="$(jq --arg java_path "$java_path" '.java_home=$java_path | .maven_home = "/opt/hostedtoolcache/maven/3.5.4/x64"' env-config.json)" + repo_name=$(basename $GITHUB_REPOSITORY) + cd /home/runner/work/$repo_name/$repo_name/nimrod/tests/ + contents="$(jq --arg java_path "$JAVA_HOME" --arg maven_path "${MAVEN_HOME:-/opt/hostedtoolcache/maven/3.8.2/x64}" '.java_home=$java_path | .maven_home=$maven_path' env-config.json)" echo "${contents}" > env-config.json - cd /home/runner/work/SMAT/SMAT + cd /home/runner/work/$repo_name/$repo_name - name: Test with pytest run: | - pytest -k 'not test_general_behavior_study_semantic_conflict' + pytest -k 'not test_general_behavior_study_semantic_conflict' --color=yes diff --git a/docker-compose.yml b/docker-compose.yml new file mode 100644 index 00000000..9ed02675 --- /dev/null +++ b/docker-compose.yml @@ -0,0 +1,17 @@ +services: + smat: + build: + context: . + dockerfile: docker/Dockerfile + args: + USER_ID: ${USER_ID} + GROUP_ID: ${GROUP_ID} + image: smat-ubuntu + container_name: smat_container + volumes: + # Mount the current directory to /app in the container + - .:/app + # Mount the dataset directory to /data/dataset in the container (read-only) + - /path/to/your/mergedataset/:/data/dataset:ro + stdin_open: true + tty: true \ No newline at end of file diff --git a/docker/Dockerfile b/docker/Dockerfile new file mode 100644 index 00000000..bcecacd9 --- /dev/null +++ b/docker/Dockerfile @@ -0,0 +1,44 @@ +FROM ubuntu:22.04 + +ENV DEBIAN_FRONTEND=noninteractive + +ARG USER_ID +ARG GROUP_ID + +# 1. Install system dependencies, Python, and Java in a single layer +RUN apt-get update && \ + apt-get install -y --no-install-recommends \ + python3.11 \ + python3-pip \ + python3.11-dev \ + openjdk-8-jdk \ + maven \ + git \ + curl \ + jq \ + build-essential \ + ca-certificates && \ + + groupadd -g ${GROUP_ID} appuser && \ + useradd -m -u ${USER_ID} -g ${GROUP_ID} appuser && \ + + update-alternatives --install /usr/bin/python3 python3 /usr/bin/python3.11 1 && \ + + apt-get clean && rm -rf /var/lib/apt/lists/* + +# 2. 
Set Environment Variables for Java and Maven +ENV JAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64 +ENV MAVEN_HOME=/usr/share/maven +ENV PATH="$MAVEN_HOME/bin:$JAVA_HOME/bin:$PATH" + +# 3. Setup working directory and data mount point +WORKDIR /app +RUN mkdir -p /data/dataset && chown appuser:appuser /data/dataset + +# 4. Install Python dependencies +COPY --chown=appuser:appuser requirements.txt . +RUN python3 -m pip install --no-cache-dir --upgrade pip && \ + python3 -m pip install --no-cache-dir -r requirements.txt ruff mypy pytest + +USER appuser +CMD ["/bin/bash", "-i"] \ No newline at end of file diff --git a/docker/README.md b/docker/README.md new file mode 100644 index 00000000..e530943a --- /dev/null +++ b/docker/README.md @@ -0,0 +1,109 @@ +# Running SMAT with Docker Compose + +This guide explains how to run SMAT in a containerized environment using **Docker Compose**. This ensures a consistent environment with Python 3.11, Java 8, and Maven, regardless of your host operating system. + +## 1. Prerequisites + +Install Docker on your system: + +* **Docker Desktop** (Recommended for Windows and Mac): [Download here](https://docs.docker.com/desktop/) +* **Docker Engine** (For Linux): [Installation Guide](https://docs.docker.com/engine/install/) + +--- + +## 2. Configuration + +Before running the container, you need to configure three files to ensure the paths match the Docker environment. + +### A. Docker Compose (Dataset Path) + +Open `docker-compose.yml` and point the dataset volume to your local path: + +```yaml +volumes: + - .:/app + # Replace the path below with the path to your dataset on your host machine + - /path/to/your/mergedataset/:/data/dataset:ro + +``` + +*Note: The `:ro` flag ensures your dataset is read-only for safety.* + +### B. SMAT Input Config (`input-smat.json`) + +The `input-smat.json` should be in the **root directory** of the project, so that the container can see it. Internally, the scenario jars must point to the `/data/dataset/` path, for example: + +```json +{ + ... + "scenarioJars": { + "base": "/data/dataset/antlr4/69ff2669eec265e25721dbc27cb00f6c381d0b41/...", + ... + }, + ... +} +``` + +### C. Environment Config (`nimrod/tests/env-config.json`) + +Point the `input_smat` path to the location inside the container. If it is on the root folder: + +```json +"input_path": "/app/input-smat.json", +``` + +--- + +## 3. Running the Container + +Navigate to the project root and run the following command according to your OS: + +### Linux & macOS (Terminal) + +The following command passes your user and group IDs to avoid permission issues with generated files: + +```bash +USER_ID=$(id -u) GROUP_ID=$(id -g) docker compose run --rm --build smat +``` + +### Windows (PowerShell) + +In PowerShell, the variables are handled differently: + +```powershell +$env:USER_ID=1000; $env:GROUP_ID=1000; docker compose run --rm --build smat +``` + +*Note: On Windows, the default UID/GID 1000 is usually sufficient for Docker Desktop.* + +--- + +## 4. Usage Inside the Container + +Once the command finishes, you will be inside the Ubuntu shell at `/app`. You can run tests or start an analysis: + +```bash +# Check if the dataset is visible +ls /data/dataset + +# Run SMAT analysis +python3 -m nimrod + +# Run tests +pytest -k 'not test_general_behavior_study_semantic_conflict' +``` + +### Command Breakdown: + +* `run`: Starts a one-off container for interactive use. +* `--rm`: Automatically removes the container upon exit to keep your system clean. 
+* `--build`: Forces a rebuild of the image if you modified the Dockerfile or requirements. +* `smat`: The service name defined in `docker-compose.yml`. + +--- + +## Troubleshooting + +* **Dataset Not Found**: Ensure the path on the left side of the colon in `docker-compose.yml` is an absolute path to your local folder. +* **Permission Denied**: On Linux, double-check that `USER_ID` and `GROUP_ID` match the output of the `id` command on your host terminal. +* **File Changes**: Since we use volumes, any code change made on your host machine will be instantly reflected inside the container. diff --git a/nimrod/__main__.py b/nimrod/__main__.py index 81df5624..d5604f12 100644 --- a/nimrod/__main__.py +++ b/nimrod/__main__.py @@ -1,5 +1,5 @@ import logging -from typing import Dict, List +from typing import Dict, List, Any from nimrod.dynamic_analysis.behavior_change_checker import BehaviorChangeChecker from nimrod.dynamic_analysis.criteria.first_semantic_conflict_criteria import FirstSemanticConflictCriteria from nimrod.dynamic_analysis.criteria.second_semantic_conflict_criteria import SecondSemanticConflictCriteria @@ -16,6 +16,7 @@ from nimrod.test_suite_generation.generators.randoop_test_suite_generator import RandoopTestSuiteGenerator from nimrod.test_suite_generation.generators.evosuite_differential_test_suite_generator import EvosuiteDifferentialTestSuiteGenerator from nimrod.test_suite_generation.generators.evosuite_test_suite_generator import EvosuiteTestSuiteGenerator +from nimrod.test_suite_generation.generators.ollama_test_suite_generator import OllamaTestSuiteGenerator from nimrod.test_suite_generation.generators.project_test_suite_generator import ProjectTestSuiteGenerator from nimrod.test_suites_execution.main import TestSuitesExecution, TestSuiteExecutor from nimrod.tools.bin import MOD_RANDOOP, RANDOOP @@ -24,9 +25,9 @@ from nimrod.input_parsing.input_parser import CsvInputParser, JsonInputParser -def get_test_suite_generators(config: Dict[str, str]) -> List[TestSuiteGenerator]: +def get_test_suite_generators(config: Dict[str, Any]) -> List[TestSuiteGenerator]: config_generators = config.get( - 'test_suite_generators', ['randoop', 'randoop-modified', 'evosuite', 'evosuite-differential', 'project']) + 'test_suite_generators', ['randoop', 'randoop-modified', 'evosuite', 'evosuite-differential', 'ollama', 'project']) generators: List[TestSuiteGenerator] = list() if 'randoop' in config_generators: @@ -38,13 +39,21 @@ def get_test_suite_generators(config: Dict[str, str]) -> List[TestSuiteGenerator generators.append(EvosuiteTestSuiteGenerator(Java())) if 'evosuite-differential' in config_generators: generators.append(EvosuiteDifferentialTestSuiteGenerator(Java())) + if 'ollama' in config_generators: + # Create one generator instance for each configured model + api_params = config.get('api_params', {}) + if api_params: + for model_key, model_config in api_params.items(): + generators.append(OllamaTestSuiteGenerator(Java(), model_key, model_config)) + else: + generators.append(OllamaTestSuiteGenerator(Java())) if 'project' in config_generators: generators.append(ProjectTestSuiteGenerator(Java())) return generators -def get_output_generators(config: Dict[str, str]) -> List[OutputGenerator]: +def get_output_generators(config: Dict[str, Any]) -> List[OutputGenerator]: config_generators = config.get( 'output_generators', ['behavior_changes', 'semantic_conflicts', 'test_suites']) generators: List[OutputGenerator] = list() @@ -60,7 +69,7 @@ def get_output_generators(config: Dict[str, str]) -> 
List[OutputGenerator]: return generators -def parse_scenarios_from_input(config: Dict[str, str]) -> List[MergeScenarioUnderAnalysis]: +def parse_scenarios_from_input(config: Dict[str, Any]) -> List[MergeScenarioUnderAnalysis]: json_input = config.get('input_path', "") csv_input_path = config.get('path_hash_csv', "") @@ -95,7 +104,7 @@ def main(): if scenario.run_analysis: smat.run_tool_for_semmantic_conflict_detection(scenario) else: - logging.info(f"Skipping tool execution for project f{scenario.project_name}") + logging.info(f"Skipping tool execution for project {scenario.project_name}") if __name__ == '__main__': diff --git a/nimrod/core/merge_scenario_under_analysis.py b/nimrod/core/merge_scenario_under_analysis.py index e2cec34a..86fb9a56 100644 --- a/nimrod/core/merge_scenario_under_analysis.py +++ b/nimrod/core/merge_scenario_under_analysis.py @@ -1,8 +1,8 @@ -from typing import List, Dict +from typing import List, Dict, Union class MergeScenarioUnderAnalysis: - def __init__(self, project_name: str, run_analysis: bool, scenario_commits: "ScenarioInformation", targets: "Dict[str, List[str]]", scenario_jars: "ScenarioInformation", jar_type: str): + def __init__(self, project_name: str, run_analysis: bool, scenario_commits: "ScenarioInformation", targets: "Dict[str, Union[List[Dict[str, str]], List[str]]]", scenario_jars: "ScenarioInformation", jar_type: str): self.project_name = project_name self.run_analysis = run_analysis self.scenario_commits = scenario_commits diff --git a/nimrod/output_generation/output_generator.py b/nimrod/output_generation/output_generator.py index b7a53440..b3cf03ce 100644 --- a/nimrod/output_generation/output_generator.py +++ b/nimrod/output_generation/output_generator.py @@ -11,7 +11,8 @@ class OutputGenerator(ABC, Generic[T]): - REPORTS_DIRECTORY = path.join(get_base_output_path(), "reports") + parent_dir = path.dirname(get_base_output_path()) + REPORTS_DIRECTORY = path.join(parent_dir, "reports") def __init__(self, report_name: str) -> None: super().__init__() @@ -27,9 +28,29 @@ def write_report(self, context: OutputGeneratorContext) -> None: file_path = path.join(self.REPORTS_DIRECTORY, self._report_name) logging.info(f"Starting data processing of {self._report_name} report") - data = self._generate_report_data(context) + new_data = self._generate_report_data(context) logging.info(f"Finished data processing of {self._report_name} report") - with open(file_path, "w") as write: - json.dump(data, write) + existing_data = self._load_existing_data(file_path) + + if not isinstance(existing_data, list): + existing_data = [existing_data] if existing_data else [] + existing_data.append(new_data) + + self._write_json(file_path, existing_data) logging.info(f"Finished generation of {self._report_name} report") + + def _load_existing_data(self, file_path: str): + """Loads data stored from previous runs so new data is appended instead of overwriting.""" + if not path.exists(file_path): + return [] + try: + with open(file_path, "r") as read_file: + return json.load(read_file) + except json.JSONDecodeError: + return [] + + def _write_json(self, file_path: str, data) -> None: + """Writes data to a JSON file with indentation for readability.""" + with open(file_path, "w") as write_file: + json.dump(data, write_file, indent=4) diff --git a/nimrod/output_generation/semantic_conflicts_output_generator.py b/nimrod/output_generation/semantic_conflicts_output_generator.py index 12c582c0..73a742fa 100644 --- a/nimrod/output_generation/semantic_conflicts_output_generator.py +++ 
b/nimrod/output_generation/semantic_conflicts_output_generator.py @@ -1,8 +1,9 @@ -from typing import Dict, List, TypedDict +from typing import Dict, List, TypedDict, Union from nimrod.output_generation.output_generator import OutputGenerator, OutputGeneratorContext from nimrod.test_suites_execution.main import TestSuitesExecution from os import path from bs4 import BeautifulSoup +import logging class SemanticConflictsOutput(TypedDict): @@ -12,7 +13,7 @@ class SemanticConflictsOutput(TypedDict): test_case_name: str test_case_results: Dict[str, str] test_suite_path: str - scenario_targets: Dict[str, List[str]] + scenario_targets: Dict[str, Union[List[Dict[str, str]], List[str]]] exercised_targets: Dict[str, List[str]] @@ -26,40 +27,51 @@ def _generate_report_data(self, context: OutputGeneratorContext) -> List[Semanti for semantic_conflict in context.semantic_conflicts: # We need to detect which targets from the input were exercised in this conflict. - coverage_report_root = self._test_suites_execution.execute_test_suite_with_coverage( - test_suite=semantic_conflict.detected_in.test_suite, - target_jar=context.scenario.scenario_jars.merge, - test_cases=[semantic_conflict.detected_in.name] - ) - - exercised_targets = self._extract_exercised_targets_from_coverage_report( - coverage_report_root=coverage_report_root, - targets=context.scenario.targets - ) - - report_data.append({ - "project_name": context.scenario.project_name, - "scenario_commits": context.scenario.scenario_commits.__dict__, - "criteria": semantic_conflict._satisfying_criteria.__class__.__name__, - "test_case_name": semantic_conflict.detected_in.name, - "test_case_results": { - "base": semantic_conflict.detected_in.base, - "left": semantic_conflict.detected_in.left, - "right": semantic_conflict.detected_in.right, - "merge": semantic_conflict.detected_in.merge - }, - "test_suite_path": semantic_conflict.detected_in.test_suite.path, - "scenario_targets": context.scenario.targets, - "exercised_targets": exercised_targets - }) + exercised_targets: Dict[str, List[str]] = dict() + try: + coverage_report_root = self._test_suites_execution.execute_test_suite_with_coverage( + test_suite=semantic_conflict.detected_in.test_suite, + target_jar=context.scenario.scenario_jars.merge, + test_cases=[semantic_conflict.detected_in.name] + ) + + exercised_targets = self._extract_exercised_targets_from_coverage_report( + coverage_report_root=coverage_report_root, + targets=context.scenario.targets + ) + + except Exception as e: + # If we cannot execute the test suite with coverage, we log the error and continue. 
+ logging.error("Error executing test suite with coverage for semantic conflict: %s", e) + + finally: + report_data.append({ + "project_name": context.scenario.project_name, + "scenario_commits": context.scenario.scenario_commits.__dict__, + "criteria": semantic_conflict._satisfying_criteria.__class__.__name__, + "test_case_name": semantic_conflict.detected_in.name, + "test_case_results": { + "base": semantic_conflict.detected_in.base, + "left": semantic_conflict.detected_in.left, + "right": semantic_conflict.detected_in.right, + "merge": semantic_conflict.detected_in.merge + }, + "test_suite_path": semantic_conflict.detected_in.test_suite.path, + "scenario_targets": context.scenario.targets, + "exercised_targets": exercised_targets + }) return report_data - def _extract_exercised_targets_from_coverage_report(self, coverage_report_root: str, targets: Dict[str, List[str]]): + def _extract_exercised_targets_from_coverage_report(self, coverage_report_root: str, targets: Dict[str, Union[List[Dict[str, str]], List[str]]]): exercised_targets: Dict[str, List[str]] = dict() for class_name in targets.keys(): - for method_name in targets[class_name]: + for method_item in targets[class_name]: + if isinstance(method_item, dict): + method_name = method_item.get("method", "") + else: + method_name = method_item if self._was_target_exercised(coverage_report_root, class_name, method_name): exercised_targets[class_name] = exercised_targets.get( class_name, []) + [method_name] @@ -80,7 +92,11 @@ def _was_target_exercised(self, coverage_report_root: str, fqcn: str, method_sig # We itereate in each method row for method_row in method_report_rows: if method_row.get_text().find(method_name) != -1: - if method_row.select_one('td:nth-last-child(2)').get_text() == '0': + tag = method_row.select_one('td:nth-last-child(2)') + if tag is None: + continue + # If the second last column is 0, it means the method was not executed + if tag.get_text() == "0": return True return False diff --git a/nimrod/output_generation/test_suites_output_generator.py b/nimrod/output_generation/test_suites_output_generator.py index 2d14765c..2ebb1ed0 100644 --- a/nimrod/output_generation/test_suites_output_generator.py +++ b/nimrod/output_generation/test_suites_output_generator.py @@ -1,4 +1,4 @@ -from typing import List, TypedDict +from typing import List, TypedDict, Dict, Union from nimrod.dynamic_analysis.behavior_change import BehaviorChange from nimrod.dynamic_analysis.semantic_conflict import SemanticConflict from nimrod.output_generation.output_generator import OutputGenerator, OutputGeneratorContext @@ -7,6 +7,7 @@ class TestSuitesOutput(TypedDict): project_name: str + targets: Dict[str, Union[List[Dict[str, str]], List[str]]] generator_name: str path: str detected_semantic_conflicts: bool @@ -23,6 +24,7 @@ def _generate_report_data(self, context: OutputGeneratorContext) -> List[TestSui for test_suite in context.test_suites: report_data.append({ "project_name": context.scenario.project_name, + "targets": context.scenario.targets, "generator_name": test_suite.generator_name, "path": test_suite.path, "detected_semantic_conflicts": self._has_detected_semantic_conflicts_in_test_suite(test_suite, context.semantic_conflicts), diff --git a/nimrod/report_metrics/coverage/coverage_report.py b/nimrod/report_metrics/coverage/coverage_report.py index 37aad142..07a26273 100644 --- a/nimrod/report_metrics/coverage/coverage_report.py +++ b/nimrod/report_metrics/coverage/coverage_report.py @@ -94,9 +94,6 @@ def get_valid_test_suite(self, 
toolSuites, first_entry, last_entry): return None def retornaDadosParaAnalise(self, evo, path_suite, suite_merge, jacoco, classeTarget, listaPacoteMetodoClasse, targets: "dict[str, list[str]]"): - global tagAClasseTarget - global tagSpanMetodoTarget - print("Classe Target ", classeTarget) listaJar = evo.project_dep.mergeDir.split( diff --git a/nimrod/setup_tools/tools.py b/nimrod/setup_tools/tools.py index e517dbf3..57b3dcf7 100644 --- a/nimrod/setup_tools/tools.py +++ b/nimrod/setup_tools/tools.py @@ -4,4 +4,5 @@ class Tools(Enum): RANDOOP='RANDOOP' RANDOOP_MOD='RANDOOP-MODIFIED' EVOSUITE='EVOSUITE' - DIFF_EVOSUITE='DIFFERENTIAL-EVOSUITE' \ No newline at end of file + DIFF_EVOSUITE='DIFFERENTIAL-EVOSUITE' + OLLAMA='OLLAMA' \ No newline at end of file diff --git a/nimrod/test_suite_generation/generators/evosuite_test_suite_generator.py b/nimrod/test_suite_generation/generators/evosuite_test_suite_generator.py index 39eb9343..28895137 100644 --- a/nimrod/test_suite_generation/generators/evosuite_test_suite_generator.py +++ b/nimrod/test_suite_generation/generators/evosuite_test_suite_generator.py @@ -1,6 +1,6 @@ import logging import os -from typing import List +from typing import List, Dict, Union from nimrod.core.merge_scenario_under_analysis import MergeScenarioUnderAnalysis from nimrod.test_suite_generation.generators.test_suite_generator import \ @@ -69,10 +69,15 @@ def _get_test_suite_class_names(self, test_suite_path: str) -> List[str]: def _compile_test_suite(self, input_jar: str, output_path: str, extra_class_path: List[str] = []) -> str: return super()._compile_test_suite(input_jar, output_path, [EVOSUITE_RUNTIME] + extra_class_path) - def _create_method_list(self, methods: "List[str]"): - rectified_methods = [self._convert_method_signature( - method) for method in methods] - return (":").join(rectified_methods) + def _create_method_list(self, methods: "Union[List[Dict[str, str]], List[str]]"): + rectified_methods = [] + for method_item in methods: + if isinstance(method_item, dict): + method_str = method_item.get("method", "") + else: + method_str = method_item + rectified_methods.append(self._convert_method_signature(method_str)) + return ":".join(rectified_methods) def _convert_method_signature(self, meth_signature: str) -> str: method_return = "" diff --git a/nimrod/test_suite_generation/generators/llm_output_processor.py b/nimrod/test_suite_generation/generators/llm_output_processor.py new file mode 100644 index 00000000..9209ec2e --- /dev/null +++ b/nimrod/test_suite_generation/generators/llm_output_processor.py @@ -0,0 +1,108 @@ +import re +from typing import List, Optional +from abc import ABC, abstractmethod + + +class OutputSanitizationRule(ABC): + """Abstract base class for output sanitization rules.""" + + @abstractmethod + def apply(self, output: str) -> str: + """Apply the sanitization rule to the output.""" + pass + + +class RemoveThinkTagsRule(OutputSanitizationRule): + """Removes content between tags.""" + + def apply(self, output: str) -> str: + return re.sub(r'.*?', '', output, flags=re.DOTALL) + + +class ExtractCodeBlocksRule(OutputSanitizationRule): + """Extracts content from code blocks (``` markers).""" + + def apply(self, output: str) -> str: + matches = re.findall(r'```(?:\w+)?\n?(.*?)```', output, flags=re.DOTALL) + return '\n'.join(matches).strip() + + +class RemoveNumberedLinesRule(OutputSanitizationRule): + """Removes lines starting with 'number. 
' pattern.""" + + def apply(self, output: str) -> str: + return re.sub(r"^\d+\.\s.*$", "", output, flags=re.MULTILINE) + + +class ExtractFromAnnotationsRule(OutputSanitizationRule): + """Keeps only content starting from the first annotation marker.""" + + def __init__(self, markers: Optional[List[str]] = None): + self.markers = markers or ["@Before", "@BeforeClass", "@Test"] + + def apply(self, output: str) -> str: + index = min( + (output.find(marker) for marker in self.markers if marker in output), + default=-1 + ) + return output[index:] if index != -1 else output + + +class LLMOutputProcessor: + """ + Processes and sanitizes LLM outputs using configurable rules. + + This class provides a flexible framework for cleaning LLM outputs + by applying a series of sanitization rules in sequence. + """ + + def __init__(self) -> None: + self._rules: List[OutputSanitizationRule] = [] + self._load_default_rules() + + def _load_default_rules(self) -> None: + """Load the default set of sanitization rules.""" + self._rules = [ + RemoveThinkTagsRule(), + ExtractCodeBlocksRule(), + RemoveNumberedLinesRule(), + ExtractFromAnnotationsRule() + ] + + def add_rule(self, rule: OutputSanitizationRule) -> None: + """Add a custom sanitization rule.""" + self._rules.append(rule) + + def remove_rule(self, rule_type: type) -> None: + """Remove all rules of the specified type.""" + self._rules = [rule for rule in self._rules if not isinstance(rule, rule_type)] + + def clear_rules(self) -> None: + """Remove all sanitization rules.""" + self._rules.clear() + + def process(self, output: str) -> str: + """ + Process the LLM output by applying all sanitization rules in sequence. + + Args: + output: The raw output from the LLM model + + Returns: + The processed and sanitized output + """ + processed_output = output + + for rule in self._rules: + try: + processed_output = rule.apply(processed_output) + except Exception as e: + # Log the error but continue processing with other rules + import logging + logging.warning(f"Error applying rule {rule.__class__.__name__}: {e}") + + return processed_output + + def get_active_rules(self) -> List[str]: + """Get the names of currently active rules.""" + return [rule.__class__.__name__ for rule in self._rules] diff --git a/nimrod/test_suite_generation/generators/ollama_test_suite_generator.py b/nimrod/test_suite_generation/generators/ollama_test_suite_generator.py new file mode 100644 index 00000000..43009207 --- /dev/null +++ b/nimrod/test_suite_generation/generators/ollama_test_suite_generator.py @@ -0,0 +1,600 @@ +import json +import logging +import os +import requests # type: ignore +from typing import List, Dict, Union, Any, Optional +import re + +import tree_sitter_java as tsjava +from tree_sitter import Language, Parser, QueryCursor + +from nimrod.core.merge_scenario_under_analysis import MergeScenarioUnderAnalysis +from nimrod.test_suite_generation.generators.test_suite_generator import TestSuiteGenerator +from nimrod.test_suite_generation.generators.prompt_manager import PromptManager +from nimrod.test_suite_generation.generators.llm_output_processor import LLMOutputProcessor +from nimrod.tests.utils import get_config +from nimrod.utils import load_json, save_json + + +class Api: + + def __init__(self, api_url: str, timeout_seconds: int, temperature: float, seed: int, model: str) -> None: + self.api_url = api_url + self.timeout_seconds = timeout_seconds + self.temperature = temperature + self.seed = seed + self.model = model + self.headers = {"Content-Type": 
"application/json"} + self.payload = { + "model": self.model, + "messages": [], + "stream": False, + "options": { + "temperature": self.temperature, + "num_ctx": 16384, + "seed": self.seed, + }, + } + self.branch: Optional[str] = None + + def set_branch(self, branch: str) -> None: + """Sets the branch to be used in the API requests.""" + self.branch = branch + + def post(self, payload: Dict[str, Any]) -> Dict[str, Any]: + """Sends a POST request to the API and handles the response.""" + try: + response: requests.Response = requests.post( + self.api_url, + headers=self.headers, + json=payload, + timeout=self.timeout_seconds + ) + response.raise_for_status() + logging.debug("Request successful. Status: %s", response.status_code) + return response.json() + except requests.exceptions.Timeout: + logging.error("Request timed out.") + return {"error": "Request timed out"} + except requests.exceptions.RequestException as e: + logging.error(f"Request error: {e}") + return {"error": "Request error"} + except json.JSONDecodeError: + logging.error("JSON decoding error.") + return {"error": "JSON decoding error"} + + def set_payload_messages(self, messages: List[Dict[str, str]]) -> None: + """Sets the messages in the payload.""" + self.payload["messages"] = messages + + def generate_output(self, messages: List[Dict[str, str]]) -> Dict[str, Any]: + """Generates output by sending messages to the API.""" + try: + self.set_payload_messages(messages) + response = self.post(self.payload) + return { + "response": response.get("message", {}).get("content", "Response not found."), + "total_duration": response.get("total_duration", self.timeout_seconds * 1_000_000_000), + } + except Exception as e: + logging.error(f"Error generating output: {e}") + return {"error": "Output generation error", "total_duration": self.timeout_seconds * 1_000_000_000} + + +class OllamaTestSuiteGenerator(TestSuiteGenerator): + + def __init__(self, java_tool, model_key: str = "codellama", model_config: Dict[str, Any] = {}): + super().__init__(java_tool) + self.model_key = model_key + self.model_config = model_config + self.api: Optional[Api] = None + self.prompt_manager = PromptManager() + self.output_processor = LLMOutputProcessor() + + # Loads global configurations + global_config = get_config() + + # Prompt configurations (priority: model_config > global_config > default) + self.prompt_template = ( + self.model_config.get("prompt_template") or + global_config.get("prompt_template") or + "zero_shot" + ) + + logging.info(f"Initialized {self.model_key} with prompt template: {self.prompt_template}") + + def generate_messages_list(self, method_info: Dict[str, str], full_class_name: str, + branch: str, output_path: str) -> Dict[str, List[Dict[str, str]]]: + """ + Generates messages for API requests using the configurable prompt system. + Supports different templates (zero-shot, one-shot) and context combinations. 
+ """ + api = self._get_api_instance() + api.set_branch(branch) # Set the branch + class_name = full_class_name.split('.')[-1] + method_name = method_info.get("method_name", "") + + logging.debug(f"Generating messages for {class_name}.{method_name} using template: {self.prompt_template}") + + # Generates messages using the PromptManager + messages_dict = self.prompt_manager.generate_all_combinations( + method_info=method_info, + class_name=class_name, + branch=branch, + template_name=self.prompt_template + ) + + # Saves generated messages + self.prompt_manager.save_generated_messages( + messages_dict=messages_dict, + output_path=output_path, + class_name=class_name, + method_name=method_name + ) + + logging.info(f"Generated {len(messages_dict)} prompt variations for {class_name}.{method_name}") + + return messages_dict + + def _ensure_api_initialized(self) -> Api: + """Initializes the API if it has not been initialized yet and returns the instance.""" + if self.api is not None: + return self.api + + config = get_config() + api_params = config.get("api_params", {}) + if not api_params: + raise ValueError("The 'api_params' section is missing from the configuration file") + + # Use the specific model configuration for this generator instance + if self.model_config: + model_params = self.model_config + else: + if not api_params.get(self.model_key): + raise ValueError(f"The '{self.model_key}' section is missing from the 'api_params' configuration") + model_params = api_params.get(self.model_key, {}) + + self.api = Api( + api_url=model_params.get("api_url", "http://localhost:11434/api/chat"), + timeout_seconds=model_params.get("timeout_seconds", 60), + temperature=model_params.get("temperature", 0), + seed=model_params.get("seed", 42), + model=model_params.get("model", "codellama:70b") + ) + return self.api + + def _get_api_instance(self) -> Api: + """Returns a valid API instance, initializing it if necessary.""" + return self._ensure_api_initialized() + + def get_generator_tool_name(self) -> str: + self._get_api_instance() + config_suffix = self._generate_config_suffix() + return f"{self.model_key.upper()}{config_suffix}" + + def _generate_config_suffix(self) -> str: + """Generates a suffix with configuration information for folder identification""" + if not self.api: + return "" + + # Prompt format: ZS (zero-shot) or 1S (one-shot) + prompt_code = "ZS" if self.prompt_template == "zero_shot" else "1S" + + # Temperature: T00, T05, T07, etc. (always with 2 digits) + temp_value = int(self.api.temperature * 100) # 0.7 -> 70, 0.05 -> 5, 0 -> 0 + temp_code = f"T{temp_value:02d}" # Formats with 2 digits: T00, T05, T07 + + # Seed: S123, S42, etc. 
+ seed_code = f"S{self.api.seed}" + + return f"_{prompt_code}_{temp_code}_{seed_code}" + + def _get_test_suite_class_paths(self, path: str) -> List[str]: + paths: List[str] = [] + for root, _, files in os.walk(path): + paths.extend(os.path.join(root, file) for file in files if file.endswith(".java")) + return paths + + def _get_test_suite_class_names(self, test_suite_path: str) -> List[str]: + return [os.path.basename(path).replace(".java", "") for path in self._get_test_suite_class_paths(test_suite_path)] + + def save_output(self, test_template: str, output: str, dir: str, output_file_name: str) -> None: + """Saves the output generated by the model to a file, replacing #TEST_METHODS# in the template.""" + # Process and sanitize the LLM output using the configured processor + processed_output = self.output_processor.process(output) + + # Replace the placeholder in the template with the processed content + filled_template = test_template.replace("#TEST_METHODS#", processed_output) + + # Save to file + llm_outputs_dir = os.path.join(dir, "llm_outputs") + output_file_path = os.path.join(llm_outputs_dir, f"{output_file_name}.txt") + + os.makedirs(llm_outputs_dir, exist_ok=True) + with open(output_file_path, "w") as file: + file.write(filled_template) + + def parse_code(self, source_code_path: str) -> tuple: + """Parses the Java source code using the Tree-sitter parser and return the language, source code, and generated AST""" + JAVA_LANGUAGE = Language(tsjava.language()) + parser = Parser(JAVA_LANGUAGE) + with open(source_code_path, 'r') as f: + source_code = f.read() + tree = parser.parse(bytes(source_code, "utf8")) + return JAVA_LANGUAGE, source_code, tree + + def extract_snippet(self, source_code: str, start_byte: int, end_byte: int) -> str: + """Extracts a snippet of code from the source code using the start and end byte offsets""" + return source_code[start_byte:end_byte] + + def extract_class_info(self, source_code_path: str, full_method_name: str, full_class_name: str) -> tuple: + """ + Extracts the class fields, constructor, and body of the method under test from the source code + using the generated AST to query the information + """ + try: + class_name = full_class_name.split('.')[-1] + method_name = full_method_name.split('(')[0] + JAVA_LANGUAGE, source_code, tree = self.parse_code(source_code_path) + + query_text = f""" + (class_declaration + name: (identifier) @class_name + body: (class_body + [ + (field_declaration) @field_declaration + (constructor_declaration + name: (identifier) @constructor_name) @constructor_declaration + (method_declaration + name: (identifier) @method_name) @method_def + (#eq? @method_name "{method_name}") + (#eq? @constructor_name "{class_name}") + ] + ) + (#eq? 
@class_name "{class_name}") + ) + """ + query = JAVA_LANGUAGE.query(query_text) + cursor = QueryCursor(query) + captures_dict = cursor.captures(tree.root_node) + + captures = [] + for capture_name, nodes in captures_dict.items(): + for node in nodes: + captures.append((node, capture_name)) + + if not captures: + raise Exception(f"No captures found for the class '{class_name}' in '{source_code_path}'") + + class_fields: List[str] = [] + class_constructors: List[str] = [] + class_method: str = "" + + for node, capture_name in captures: + captured_text = self.extract_snippet(source_code, node.start_byte, node.end_byte) + + if capture_name == "field_declaration": + class_fields.append(captured_text) + elif capture_name == "constructor_declaration": + class_constructors.append(captured_text) + elif capture_name == "method_def": + class_method = captured_text + + return class_fields, class_constructors, class_method + + except Exception as e: + logging.error("An error occurred while extracting class info for '%s': %s", full_class_name, e) + raise e + + def save_scenario_infos(self, scenario_infos_path: str, class_name: str, methods: Union[List[str], List[Dict[str, str]]], source_code_path: str) -> None: + """Stores relevant scenario information (for each class and method) in a JSON file""" + if os.path.exists(scenario_infos_path): + scenario_infos_dict = load_json(scenario_infos_path) + else: + scenario_infos_dict = {} + + if class_name not in scenario_infos_dict: + scenario_infos_dict[class_name] = [] + + """ + method_item -> + { + "method": "methodname()", + "leftChangesSummary": "Left does...", + "rightChangesSummary": "Right does..." + } + + method_item -> "methodname()" + """ + for method_item in methods: + if not isinstance(method_item, dict): + method = method_item + left_changes_summary = "" + right_changes_summary = "" + else: + method = method_item.get("method", "") + left_changes_summary = method_item.get("leftChangesSummary", "") + right_changes_summary = method_item.get("rightChangesSummary", "") + try: + method = re.sub(r'\|', ',', method) + logging.debug("Saving scenario information for method '%s' in class '%s'", method, class_name) + class_fields, constructor_codes, method_code = self.extract_class_info(source_code_path, method, class_name) + + scenario_infos_dict[class_name].append({ + 'class_fields': class_fields if class_fields else [], + 'constructor_codes': constructor_codes if constructor_codes else [], + 'method_name': method, + 'method_code': method_code if method_code else "", + 'left_changes_summary': left_changes_summary, + 'right_changes_summary': right_changes_summary, + 'test_template': ( + "import org.junit.Test;\n" + "import org.junit.Before;\n" + "import org.junit.BeforeClass;\n" + "import static org.junit.Assert.*;\n\n" + f"public class {class_name.split('.')[-1]}_{method.split('(')[0]}Test {{\n" + "#TEST_METHODS#\n" + "}" + ) + }) + + except Exception as e: + logging.error("Error while saving scenario information for method '%s' in class '%s': %s", method, class_name, e) + + save_json(scenario_infos_path, scenario_infos_dict) + + def save_imports(self, class_name: str, source_code_path: str, imports_path: str) -> None: + """Extracts import statements from the Java source code and stores them in a JSON file""" + JAVA_LANGUAGE, source_code, tree = self.parse_code(source_code_path) + + query_text = """ + (import_declaration) @import + (package_declaration) @package + """ + query = JAVA_LANGUAGE.query(query_text) + cursor = QueryCursor(query) + captures_dict = 
cursor.captures(tree.root_node) + + captures = [] + for capture_name, nodes in captures_dict.items(): + for node in nodes: + captures.append((node, capture_name)) + + if os.path.exists(imports_path): + imports_dict = load_json(imports_path) + else: + imports_dict = {} + + class_imports = imports_dict.setdefault(class_name, []) + + for node, capture_name in captures: + start_byte, end_byte = node.start_byte, node.end_byte + captured_text = source_code[start_byte:end_byte].strip() + + if capture_name == "import": + class_imports.append(f'{captured_text}\n') + elif capture_name == "package": + package_name = captured_text.split()[1].rstrip(';') + class_imports.append(f'import {package_name}.*;\n') + + save_json(imports_path, imports_dict) + + def extract_individual_tests(self, output_path: str, test_template: str, class_name: str, imports: List[str], i: int, j: int, prompt_key: str, branch: str) -> None: + """Extracts individual tests from the generated test suite and saves them to separate files""" + llm_outputs_path = os.path.join(output_path, "llm_outputs") + counter = 0 + + def classify_annotations(captures: List[tuple], source_code: str) -> tuple: + """Classifies annotations in the captured snippets as 'before' or 'test' blocks""" + before_block: List[Dict[str, str]] = [] + test_block: List[Dict[str, str]] = [] + + for node, _ in captures: + captured_text = self.extract_snippet(source_code, node.start_byte, node.end_byte) + if "@Test" in captured_text: + test_block.append({"snippet": captured_text}) + elif any(annotation in captured_text for annotation in ["@Before", "@BeforeClass"]): + before_block.append({"snippet": captured_text}) + + return before_block, test_block + + # Format: {i}{j}_{branch}_{class_name.split('.')[-1]}_{prompt_key}.txt + pattern = rf"^{i}{j}_(left|right)_{re.escape(class_name.split('.')[-1])}_{prompt_key}\.txt$" + for file in os.listdir(llm_outputs_path): + # Avoid processing the wrong files (from different classes) + if re.match(pattern, file): + logging.debug("Processing file: %s", file) + source_code_path = os.path.join(llm_outputs_path, file) + JAVA_LANGUAGE, source_code, tree = self.parse_code(source_code_path) + + # Tree-sitter query to find method definitions with annotations + query_text = """ + (method_declaration + (modifiers [ + (annotation) + (marker_annotation) + ] + )) @method_def + """ + query = JAVA_LANGUAGE.query(query_text) + cursor = QueryCursor(query) + captures_dict = cursor.captures(tree.root_node) + + captures = [] + for capture_name, nodes in captures_dict.items(): + for node in nodes: + captures.append((node, capture_name)) + + before_block, test_block = classify_annotations(captures, source_code) + for test in test_block: + method_name = f"{class_name.split('.')[-1]}Test_{branch}_{prompt_key}_{j}_{i}_{counter}" + output_file_path = os.path.join(output_path, f"{method_name}.java") + + new_template = test_template.split("public class")[0] + f"public class {method_name} {{\n" + full_template = "".join(imports) + new_template + + if before_block: + full_template += "".join(before['snippet'] for before in before_block) + + snippet = test['snippet'] + public_index = snippet.find('public') + if public_index != -1: + test_method_name = snippet[public_index:].split('(')[0].split()[-1] + new_snippet = snippet.replace(test_method_name, f"test{i}{counter}") + + with open(output_file_path, "w") as f: + f.write(full_template + new_snippet + "\n}") + + counter += 1 + + def find_source_code_paths(self, input_jar: str, jar_type: str, class_name: str, 
project_name: str) -> Dict[str, str]: + """Finds the source code files for the given class name in the specified JAR path""" + input_jar = input_jar.split(":")[0] + if not os.path.exists(input_jar): + logging.error("The provided jar path '%s' does not exist", input_jar) + raise FileNotFoundError(f"The provided path '{input_jar}' does not exist") + + path_parts = input_jar.split(os.sep) + if jar_type != "transformed" and jar_type != "original": + raise ValueError("The provided path does not contain the expected jar type (transformed/original)") + + type_index = path_parts.index(jar_type) + base_path = os.path.join("/", *path_parts[:type_index], "source") + + if not os.path.exists(base_path): + raise FileNotFoundError(f"The base path '{base_path}' does not exist") + + java_files = {key: "" for key in ["base", "left", "right", "merge"]} + + for root, _, files in os.walk(base_path): + java_candidates = [file for file in files if file.endswith(".java")] + + # Prioritize files within the class-named folder + if os.path.basename(root) == class_name: + for file in java_candidates: + file_key = file.replace(".java", "") + if file_key in java_files: + java_files[file_key] = os.path.join(root, file) + + # If any file is still missing, try to fill it in + for file in java_candidates: + file_key = file.replace(".java", "") + if file_key in java_files and not java_files[file_key]: + java_files[file_key] = os.path.join(root, file) + + missing_files = [key for key, path in java_files.items() if not path] + if missing_files: + raise FileNotFoundError(f"The following source code files were not found: {', '.join(missing_files)}") + + return java_files + + def fetch_source_code_branch(self, input_jar: str, jar_type: str, class_name: str, project_name: str) -> tuple: + """Retrieves the source code path and branch for the given JAR path""" + source_code_paths = self.find_source_code_paths(input_jar, jar_type, class_name.split('.')[-1], project_name) + branches = ["base", "left", "right", "merge"] + + branch = next((b for b in branches if b in input_jar), None) + + if branch: + source_code_path = source_code_paths.get(branch) + if source_code_path: + return source_code_path, branch, source_code_paths + + available_branches = ", ".join(branches) + raise ValueError(f"No corresponding branch found in '{input_jar}'. 
Available branches: {available_branches}") + + def record_output_duration(self, time_duration_path: str, output_path: str, class_name: str, + output_file_name: str, total_duration: int, project_name: str) -> None: + """Records the duration of output generation for the given class and output file""" + logging.debug("Recording duration for output '%s' in class '%s'", output_file_name, class_name) + os.makedirs(os.path.dirname(time_duration_path), exist_ok=True) + + if not os.path.exists(time_duration_path): + with open(time_duration_path, "w") as file: + json.dump({}, file) + + time_duration_dict = load_json(time_duration_path) + + project_data = time_duration_dict.setdefault(project_name, {}) + class_data = project_data.setdefault(class_name, {"total_duration": 0, "outputs": {}}) + + key_name = output_path.split(os.sep)[-1] + '_' + output_file_name + + total_duration_seconds = total_duration / 1_000_000_000 + + duration_rounded = round(total_duration_seconds, 2) + class_data["outputs"][key_name] = duration_rounded + class_data["total_duration"] = round(class_data["total_duration"] + duration_rounded, 2) + + try: + save_json(time_duration_path, time_duration_dict) + except Exception as e: + logging.error("Error while recording duration for output '%s' in class '%s': %s", output_file_name, class_name, e) + raise + + def _execute_tool_for_tests_generation(self, input_jar: str, output_path: str, scenario: MergeScenarioUnderAnalysis, use_determinism: bool) -> None: + self._get_api_instance() + + # Define paths for storing scenario information (for prompt generation), + # importing data (to be extracted from source code), and recording time duration (for each output) + scenario_infos_path = os.path.join(output_path, "scenario_infos.json") + imports_path = os.path.join(output_path, "imports.json") + # Save time duration data in the 'reports' folder, located next to the 'projects' folder + # Generates config suffix for the duration file + config_suffix = self._generate_config_suffix().replace("_", "") if hasattr(self, '_generate_config_suffix') else "" + duration_filename = f"{self.model_key}{config_suffix}_time_duration.json" + + time_duration_path = os.path.join( + os.path.dirname( + os.path.dirname( + os.path.dirname(output_path))), "reports", duration_filename) + + project_name = scenario.project_name + targets = scenario.targets + jar_type = scenario.jar_type + + # Fetch the source code paths for each class and save the associated scenario information and import data + for class_name, methods in targets.items(): + source_code_path, branch, source_paths = self.fetch_source_code_branch(input_jar, jar_type, class_name, project_name) + self.save_scenario_infos(scenario_infos_path, class_name, methods, source_code_path) + self.save_imports(class_name, source_code_path, imports_path) + + # Load scenario information and import data into dictionaries + scenario_infos_dict = load_json(scenario_infos_path) + imports_dict = load_json(imports_path) + + # Generate tests for each method in every class and save the results + for class_name, scenario_infos_list in scenario_infos_dict.items(): + logging.debug("Generating tests for target methods in class '%s'", class_name) + for i, method_info in enumerate(scenario_infos_list): + messages_list = self.generate_messages_list(method_info, class_name, branch, output_path) + test_template = method_info.get("test_template", "") + self._process_prompts(messages_list=messages_list, test_template=test_template, output_path=output_path, + branch=branch, 
class_name=class_name, imports=imports_dict.get(class_name, []), + i=i, time_duration_path=time_duration_path, project_name=project_name) + + def _process_prompts(self, messages_list: Dict[str, List[Dict[str, str]]], test_template: str, output_path: str, branch: str, + class_name: str, imports: List[str], i: int, time_duration_path: str, project_name: str, + num_outputs: int = 1) -> None: + for j in range(num_outputs): + for prompt_key, messages in messages_list.items(): + output_file_name = f"{i}{j}_{branch}_{class_name.split('.')[-1]}_{prompt_key}" + self._process_single_prompt(messages, test_template, output_path, branch, class_name, imports, i, j, time_duration_path, project_name, output_file_name, prompt_key) + + def _process_single_prompt(self, messages: List[Dict[str, str]], test_template: str, output_path: str, branch: str, + class_name: str, imports: List[str], i: int, j: int, time_duration_path: str, + project_name: str, output_file_name: str, prompt_key: str) -> None: + api = self._get_api_instance() + total_duration = api.timeout_seconds * 1_000_000_000 # Initialize with timeout value in nanoseconds + try: + logging.debug("Processing output %d%d for prompt key '%s' in branch \"%s\"", i, j, prompt_key, branch) + output = api.generate_output(messages) + response = output.get("response", "Response not found.") + total_duration = int(output.get("total_duration", api.timeout_seconds * 1_000_000_000)) + self.save_output(test_template, response, output_path, output_file_name) + except Exception as e: + logging.error("Error while processing output %d%d for prompt key '%s' in branch \"%s\": %s", i, j, prompt_key, branch, e) + finally: + self.record_output_duration(time_duration_path, output_path, class_name, output_file_name, total_duration, project_name) + + self.extract_individual_tests(output_path, test_template, class_name, imports, i, j, prompt_key, branch) diff --git a/nimrod/test_suite_generation/generators/prompt_manager.py b/nimrod/test_suite_generation/generators/prompt_manager.py new file mode 100644 index 00000000..f0913bb6 --- /dev/null +++ b/nimrod/test_suite_generation/generators/prompt_manager.py @@ -0,0 +1,158 @@ +#!/usr/bin/env python3 + +import json +import os +from typing import Dict, List, Any, Optional + + +class PromptManager: + def __init__(self, config_path: Optional[str] = None): + if config_path is None: + current_dir = os.path.dirname(os.path.abspath(__file__)) + config_path = os.path.join(current_dir, "prompt_templates.json") + self.config_path = config_path + self.config = self._load_config() + + def _load_config(self) -> Dict[str, Any]: + try: + with open(self.config_path, 'r', encoding='utf-8') as f: + return json.load(f) + except (FileNotFoundError, json.JSONDecodeError, IOError): + return {"prompt_templates": {"zero_shot": {}}} + + def _format_template(self, template: Dict[str, str], **kwargs) -> Dict[str, str]: + formatted = template.copy() + if "content" in formatted: + formatted["content"] = formatted["content"].format(**kwargs) + return formatted + + def _prepare_context_data(self, method_info: Dict[str, Any]) -> Dict[str, str]: + class_fields = method_info.get("class_fields", []) + class_fields_str = "\n".join(class_fields) if class_fields else "" + + constructors = method_info.get("constructor_codes", []) + constructors_str = "\n".join(constructors) if constructors else "" + + return { + "class_fields": class_fields_str, + "constructors": constructors_str, + "method_code": method_info.get("method_code", ""), + "left_changes_summary": 
method_info.get("left_changes_summary", ""), + "right_changes_summary": method_info.get("right_changes_summary", "") + } + + def generate_messages_for_template(self, template_name: str, method_info: Dict[str, Any], + class_name: str, branch: str, context_keys: Optional[List[str]] = None) -> List[Dict[str, str]]: + template_config = self.config.get("prompt_templates", {}).get(template_name, {}) + + context_data = self._prepare_context_data(method_info) + context_data.update({"class_name": class_name, "branch": branch, "method_name": method_info.get("method_name", "")}) + + messages = [] + + if template_name == "zero_shot": + if "system_message" in template_config: + messages.append(self._format_template(template_config["system_message"], **context_data)) + + if "user_init_message" in template_config: + messages.append(self._format_template(template_config["user_init_message"], **context_data)) + + if context_keys: + context_templates = template_config.get("context_templates", {}) + for key in context_keys: + if key in context_templates: + messages.append(self._format_template(context_templates[key], **context_data)) + + if "method_context_message" in template_config: + messages.append(self._format_template(template_config["method_context_message"], **context_data)) + + elif template_name == "one_shot": + if "system_message" in template_config: + messages.append(self._format_template(template_config["system_message"], **context_data)) + + if "user_init_message" in template_config: + messages.append(self._format_template(template_config["user_init_message"], **context_data)) + + if context_keys: + example_context = template_config.get("example_context", {}) + for key in context_keys: + if key in example_context: + messages.append(example_context[key]) + + if "example_method_message" in template_config: + messages.append(template_config["example_method_message"]) + + if "example_response" in template_config: + messages.append(template_config["example_response"]) + + if "user_init_message" in template_config: + messages.append(self._format_template(template_config["user_init_message"], **context_data)) + + if context_keys: + context_templates = template_config.get("context_templates", {}) + for key in context_keys: + if key in context_templates: + messages.append(self._format_template(context_templates[key], **context_data)) + + if "method_context_message" in template_config: + messages.append(self._format_template(template_config["method_context_message"], **context_data)) + + return messages + + def generate_all_combinations(self, method_info: Dict[str, Any], class_name: str, + branch: str, template_name: str = "zero_shot") -> Dict[str, List[Dict[str, str]]]: + """Generates the 8 specific combinations for zero-shot or one-shot""" + if template_name == "zero_shot": + combinations = [ + [], # prompt1 + ["changes_summary"], # prompt2 + ["class_fields"], # prompt3 + ["constructors"], # prompt4 + ["changes_summary", "class_fields"], # prompt5 + ["changes_summary", "constructors"], # prompt6 + ["class_fields", "constructors"], # prompt7 + ["changes_summary", "class_fields", "constructors"] # prompt8 + ] + elif template_name == "one_shot": + combinations = [ + [], # prompt1: sem contexto + ["changes_summary"], # prompt2 + ["class_fields"], # prompt3 + ["constructors"], # prompt4 + ["changes_summary", "class_fields"], # prompt5 + ["changes_summary", "constructors"], # prompt6 + ["class_fields", "constructors"], # prompt7 + ["changes_summary", "class_fields", "constructors"] # prompt8 + ] + else: + 
return {} + + messages_dict = {} + for i, context_keys in enumerate(combinations, 1): + messages_dict[f"prompt{i}"] = self.generate_messages_for_template( + template_name, method_info, class_name, branch, context_keys + ) + + return messages_dict + + def save_generated_messages(self, messages_dict: Dict[str, List[Dict[str, str]]], + output_path: str, class_name: str, method_name: str) -> None: + output_file_path = os.path.join(output_path, "generated_messages.json") + + try: + if os.path.exists(output_file_path): + with open(output_file_path, "r", encoding='utf-8') as file: + existing_data = json.load(file) + else: + existing_data = {} + except (FileNotFoundError, json.JSONDecodeError, IOError): + existing_data = {} + + if class_name not in existing_data: + existing_data[class_name] = {} + + existing_data[class_name][method_name] = messages_dict + + os.makedirs(output_path, exist_ok=True) + with open(output_file_path, "w", encoding='utf-8') as file: + json.dump(existing_data, file, indent=4, ensure_ascii=False) \ No newline at end of file diff --git a/nimrod/test_suite_generation/generators/prompt_templates.json b/nimrod/test_suite_generation/generators/prompt_templates.json new file mode 100644 index 00000000..d895e03d --- /dev/null +++ b/nimrod/test_suite_generation/generators/prompt_templates.json @@ -0,0 +1,82 @@ +{ + "prompt_templates": { + "zero_shot": { + "system_message": { + "role": "system", + "content": "You are a senior Java developer with expertise in JUnit testing.\nYour task is to provide JUnit tests for the given method in the class under test, considering the changes introduced in the left and right branches.\nYou have to answer with the test code only, inside code blocks (```).\nThe tests should start with @Test." + }, + "user_init_message": { + "role": "user", + "content": "Here is the context of the method under test in the class {class_name} on the {branch} branch:" + }, + "context_templates": { + "changes_summary": { + "role": "user", + "content": "Left {left_changes_summary}.\nRight {right_changes_summary}" + }, + "class_fields": { + "role": "user", + "content": "Class fields:\n{class_fields}" + }, + "constructors": { + "role": "user", + "content": "Constructors:\n{constructors}" + } + }, + "method_context_message": { + "role": "user", + "content": "Target Method Under Test:\n{method_code}\n\nNow generate JUnit tests for the method under test, considering the given context. Remember to create meaningful assertions.\nWrite all tests inside code blocks (```), and start each test with @Test." + } + }, + "one_shot": { + "system_message": { + "role": "system", + "content": "You are a senior Java developer with expertise in JUnit testing.Your only task is to generate JUnit test methods based on the provided Java class details, using Java syntax.Follow these guidelines:1. Provide only the JUnit test code, written in Java syntax, and nothing else.2. Fully implement the test methods, including the actual test logic (assertEquals, assertTrue, etc.), starting with @Test.3. Exclude any setup/teardown code or content outside the test method itself.4. You have to answer with the test code only, inside code blocks (```).5. Use comment blocks /* */ or // for extra text, such as comments, titles, explanations, or any additional details within the code.6. Ensure that the generated output is completely functional as code, compiles successfully, and runs without errors." 
+ }, + "user_init_message": { + "role": "user", + "content": "Below, you will find additional information regarding the Class Under Test:" + }, + "example_context": { + "changes_summary": { + "role": "user", + "content": "Left wanted to extend the method cleanText() to also remove duplicated whitespace in the text by adding the method call normalizeWhitespace().\nRight wanted to clean the text by removing consecutive duplicated words." + }, + "class_fields": { + "role": "user", + "content": "Class fields:\npublic String text;\n" + }, + "constructors": { + "role": "user", + "content": "Constructors:\npublic DFPBaseSample(String text) {\n this.text = text;\n }\n" + } + }, + "example_method_message": { + "role": "user", + "content": "Target Method Under Test:\npublic void cleanText() {\n DFPBaseSample inst = new DFPBaseSample(text);\n inst.normalizeWhiteSpace();\n inst.removeComments();\n this.text = inst.text;\n }\n\nTests for the method 'cleanText()' in the class 'DFPBaseSample':" + }, + "example_response": { + "role": "assistant", + "content": "```@Test\npublic void test00() {\n DFPBaseSample sample = new DFPBaseSample(\"This is a sample text\");\n sample.cleanText();\n assertEquals(\"This is a sample text\", sample.getText());\n}\n\n@Test\npublic void test01() {\n DFPBaseSample sample1 = new DFPBaseSample(\"Hello World\");\n DFPBaseSample sample2 = sample1;\n sample1.cleanText();\n sample2.normalizeWhiteSpace();\n sample2.removeComments();\n assertEquals(sample1.getText(), sample2.getText());\n}```" + }, + "context_templates": { + "changes_summary": { + "role": "user", + "content": "{left_changes_summary}\n{right_changes_summary}" + }, + "class_fields": { + "role": "user", + "content": "Class fields:\n{class_fields}" + }, + "constructors": { + "role": "user", + "content": "Constructors:\n{constructors}" + } + }, + "method_context_message": { + "role": "user", + "content": "Target Method Under Test:\n{method_code}\n\nTests for the method '{method_name}' in the class '{class_name}':" + } + } + } +} \ No newline at end of file diff --git a/nimrod/test_suite_generation/generators/randoop_test_suite_generator.py b/nimrod/test_suite_generation/generators/randoop_test_suite_generator.py index 775d14b1..0bd84855 100644 --- a/nimrod/test_suite_generation/generators/randoop_test_suite_generator.py +++ b/nimrod/test_suite_generation/generators/randoop_test_suite_generator.py @@ -1,5 +1,5 @@ import os -from typing import Dict, List +from typing import Dict, List, Union from nimrod.core.merge_scenario_under_analysis import MergeScenarioUnderAnalysis from nimrod.test_suite_generation.generators.test_suite_generator import \ @@ -38,7 +38,7 @@ def _execute_tool_for_tests_generation(self, input_jar: str, output_path: str, s self._java.exec_java(output_path, self._java.get_env(), 3000, *tuple(params)) - def _generate_target_classes_file(self, output_path: str, targets: "Dict[str, List[str]]"): + def _generate_target_classes_file(self, output_path: str, targets: "Dict[str, Union[List[Dict[str, str]], List[str]]]"): filename = os.path.join(output_path, self.TARGET_CLASS_LIST_FILENAME) with open(filename, 'w') as f: @@ -48,12 +48,16 @@ def _generate_target_classes_file(self, output_path: str, targets: "Dict[str, Li return filename - def _generate_target_methods_file(self, output_path: str, targets: "Dict[str, List[str]]"): + def _generate_target_methods_file(self, output_path: str, targets: "Dict[str, Union[List[Dict[str, str]], List[str]]]"): filename = os.path.join(output_path, 
self.TARGET_METHODS_LIST_FILENAME) with open(filename, 'w') as f: for fqcn, methods in targets.items(): - for method in methods: + for method_item in methods: + if not isinstance(method_item, dict): + method = method_item + else: + method = method_item.get("method", "") method_signature = fqcn + "." + method f.write(method_signature) diff --git a/nimrod/test_suite_generation/generators/test_suite_generator.py b/nimrod/test_suite_generation/generators/test_suite_generator.py index 6a9f5ab1..d71c9fa2 100644 --- a/nimrod/test_suite_generation/generators/test_suite_generator.py +++ b/nimrod/test_suite_generation/generators/test_suite_generator.py @@ -4,6 +4,7 @@ from time import time from typing import List from subprocess import CalledProcessError +import json from nimrod.tests.utils import get_config from nimrod.core.merge_scenario_under_analysis import MergeScenarioUnderAnalysis @@ -66,14 +67,39 @@ def _get_test_suite_class_paths(self, test_suite_path: str) -> List[str]: def _get_test_suite_class_names(self, test_suite_path: str) -> List[str]: pass + def _update_compilation_results(self, test_suite_path: str, java_file: str, output: str) -> None: + """Updates the compilation results file with the output of the compilation of a test suite class.""" + reports_dir = path.join(path.dirname(get_base_output_path()), "reports") + COMPILATION_LOG_FILE = path.join(reports_dir, "compilation_results.json") + + makedirs(reports_dir, exist_ok=True) + + if path.exists(COMPILATION_LOG_FILE): + with open(COMPILATION_LOG_FILE, "r", encoding="utf-8") as f: + try: + compilation_results = json.load(f) + except json.JSONDecodeError: + compilation_results = {} + else: + compilation_results = {} + + test_suite_entry = compilation_results.setdefault(test_suite_path, {"compilation_output": {}}) + safe_output = output.strip() if output.strip() else "" + test_suite_entry["compilation_output"][java_file] = safe_output + + with open(COMPILATION_LOG_FILE, "w", encoding="utf-8") as f: + json.dump(compilation_results, f, indent=4) + def _compile_test_suite(self, input_jar: str, test_suite_path: str, extra_class_path: List[str] = []) -> str: compiled_classes_path = path.join(test_suite_path, 'classes') class_path = generate_classpath([input_jar, test_suite_path, compiled_classes_path, JUNIT, HAMCREST] + extra_class_path) - for java_file in self._get_test_suite_class_paths(test_suite_path): + output = "" try: self._java.exec_javac(java_file, test_suite_path, None, None, '-classpath', class_path, '-d', compiled_classes_path) - except CalledProcessError: + except CalledProcessError as e: + output = (e.stdout or b'').decode("utf-8", errors="ignore") + (e.stderr or b'').decode("utf-8", errors="ignore") logging.error("Error while compiling %s", java_file) + self._update_compilation_results(test_suite_path, java_file, output) return class_path diff --git a/nimrod/test_suites_execution/test_suite_executor.py b/nimrod/test_suites_execution/test_suite_executor.py index 18bb67e6..c1e40466 100644 --- a/nimrod/test_suites_execution/test_suite_executor.py +++ b/nimrod/test_suites_execution/test_suite_executor.py @@ -1,8 +1,9 @@ import logging import re import subprocess -from os import path -from typing import Dict, List +import json +from os import path, makedirs +from typing import Dict, List, Optional from nimrod.test_suite_generation.test_suite import TestSuite from nimrod.test_suites_execution.test_case_result import TestCaseResult from nimrod.tests.utils import get_base_output_path @@ -11,13 +12,17 @@ from nimrod.tools.jacoco 
import Jacoco from nimrod.utils import generate_classpath +reports_dir = path.join(path.dirname(get_base_output_path()), "reports") +makedirs(reports_dir, exist_ok=True) +EXECUTION_LOG_FILE = path.join(reports_dir, "execution_results.json") + def is_failed_caused_by_compilation_problem(test_case_name: str, failed_test_message: str) -> bool: my_regex = re.escape(test_case_name) + r"[0-9A-Za-z0-9_\(\.\)\n \:]+(NoSuchMethodError|NoSuchFieldError|NoSuchClassError|NoClassDefFoundError|NoSuchAttributeError|tried to access method)" - return re.search(my_regex, failed_test_message) != None + return re.search(my_regex, failed_test_message) is not None def is_failed_caused_by_error(test_case_name: str, failed_test_message: str) -> bool: my_regex = re.escape(test_case_name) + r"[0-9A-Za-z0-9_(.)]RegressionTest[0-9A-Za-z0-9_(.)\n]+Exception" - return re.search(my_regex, failed_test_message) != None + return re.search(my_regex, failed_test_message) is not None def get_result_for_test_case(failed_test: str, output: str) -> TestCaseResult: if is_failed_caused_by_compilation_problem(failed_test, output): @@ -34,10 +39,43 @@ def __init__(self, java: Java, jacoco: Jacoco) -> None: def execute_test_suite(self, test_suite: TestSuite, jar: str, number_of_executions: int = 3) -> Dict[str, TestCaseResult]: results: Dict[str, TestCaseResult] = dict() + # Load existing log if it exists + try: + with open(EXECUTION_LOG_FILE, "r") as log_file: + execution_log = json.load(log_file) + except (FileNotFoundError, json.JSONDecodeError): + execution_log = {} + for test_class in test_suite.test_classes_names: + logging.debug("Test class: %s", test_class) + if test_suite.generator_name == "EVOSUITE": + class_file_path = path.join(test_suite.path, f"classes/{test_class.replace('.', '/')}.class") + + else: + class_file_path = path.join(test_suite.path, f"classes/{test_class}.class") + + if not path.exists(class_file_path): + logging.warning("Class file %s does not exist; skipping execution", class_file_path) + continue + + if test_class not in execution_log: + execution_log[test_class] = [] + + # Check if the current test_suite.path is already in the log + test_suite_entry = next((entry for entry in execution_log[test_class] if test_suite.path in entry), None) + if not test_suite_entry: + test_suite_entry = {test_suite.path: {"jar": {}}} + execution_log[test_class].append(test_suite_entry) + + # Ensure the JAR is tracked under the current test_suite.path + if jar not in test_suite_entry[test_suite.path]["jar"]: + test_suite_entry[test_suite.path]["jar"][jar] = [] + + # Append execution results for the current JAR for i in range(0, number_of_executions): - logging.debug("Starting execution %d of %s from suite %s", i + 1, test_class, test_suite.path) + logging.info("Starting execution %d of %s from suite %s", i + 1, test_class, test_suite.path) response = self._execute_junit(test_suite, jar, test_class) + logging.debug("RESULTS: %s", response) for test_case, test_case_result in response.items(): test_fqname = f"{test_class}#{test_case}" if results.get(test_fqname) and results.get(test_fqname) != test_case_result: @@ -45,6 +83,14 @@ def execute_test_suite(self, test_suite: TestSuite, jar: str, number_of_executio elif not results.get(test_fqname): results[test_fqname] = test_case_result + test_suite_entry[test_suite.path]["jar"][jar].append({ + "execution_number": i + 1, + "result": {test_case: str(test_case_result) for test_case, test_case_result in response.items()} + }) + + with open(EXECUTION_LOG_FILE, "w") as log_file: + 
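+            # Persist the accumulated execution results to reports/execution_results.json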
json.dump(execution_log, log_file, indent=4) + return results def _execute_junit(self, test_suite: TestSuite, target_jar: str, test_class: str, extra_params: List[str] = []) -> Dict[str, TestCaseResult]: @@ -62,20 +108,33 @@ def _execute_junit(self, test_suite: TestSuite, target_jar: str, test_class: str command = self._java.exec_java(test_suite.path, self._java.get_env(), TIMEOUT, *params) output = command.decode('unicode_escape') + # Special handling for LLM-generated test classes + if test_suite.generator_name not in ["RANDOOP", "EVOSUITE", "RANDOOP_MODIFIED", "EVOSUITE_DIFFERENTIAL", "PROJECT_TEST"]: + parts = test_class.replace(".java", "").split("_") + if len(parts) >= 2: + test_class_num = parts[-2] + parts[-1] + else: + logging.warning(f"Unexpected test_class format: '{test_class}'. Unable to extract test_class_num, defaulting to '0'.") + test_class_num = "0" + return self._parse_test_results_from_output(output, test_class_num) return self._parse_test_results_from_output(output) except subprocess.CalledProcessError as error: output = error.output.decode('unicode_escape') return self._parse_test_results_from_output(output) - def _parse_test_results_from_output(self, output: str) -> Dict[str, TestCaseResult]: + def _parse_test_results_from_output(self, output: str, test_class_num: Optional[str] = None) -> Dict[str, TestCaseResult]: results: Dict[str, TestCaseResult] = dict() success_match = re.search(r'OK \((?P\d+) tests?\)', output) if success_match: - number_of_tests = int(success_match.group('number_of_tests')) - for i in range(0, number_of_tests): - test_case_name = 'test{number:0{width}d}'.format(width=len(str(number_of_tests)), number=i) + if test_class_num: + test_case_name = f'test{test_class_num}' results[test_case_name] = TestCaseResult.PASS + else: + number_of_tests = int(success_match.group('number_of_tests')) + for i in range(0, number_of_tests): + test_case_name = 'test{number:0{width}d}'.format(width=len(str(number_of_tests)), number=i) + results[test_case_name] = TestCaseResult.PASS else: failed_tests = re.findall(r'(?Ptest\d+)\([A-Za-z0-9_.]+\)', output) for failed_test in failed_tests: @@ -88,8 +147,14 @@ def _parse_test_results_from_output(self, output: str) -> Dict[str, TestCaseResu if results: for i in range(0, test_run_count): test_case_name = 'test{number:0{width}d}'.format(width=len(str(test_run_count)), number=i) - if not results.get(test_case_name): + if not results.get(test_case_name) and test_run_count > 1: results[test_case_name] = TestCaseResult.PASS + if not results: + if test_class_num: + test_case_name = f'test{test_class_num}' + else: + test_case_name = 'test0' + results[test_case_name] = TestCaseResult.NOT_EXECUTABLE return results def execute_test_suite_with_coverage(self, test_suite: TestSuite, target_jar: str, test_cases: List[str]) -> str: @@ -127,5 +192,5 @@ def _execute_junit_5(self, test_suite: TestSuite, target_jar: str, test_targets: ['org.junit.platform.console.ConsoleLauncher'] + test_targets return self._java.exec_java(test_suite.path, self._java.get_env(), TIMEOUT, *params) - except subprocess.CalledProcessError as error: + except subprocess.CalledProcessError: return None diff --git a/nimrod/tests/env-config.json b/nimrod/tests/env-config.json index 7578cf3b..9934a146 100644 --- a/nimrod/tests/env-config.json +++ b/nimrod/tests/env-config.json @@ -7,5 +7,17 @@ "input_path": "", "tests_dst": "", "path_output_csv": "", - "logger_level": "DEBUG" + "logger_level": "DEBUG", + "test_suite_generators":["ollama", "evosuite", "randoop"], + 
"test_suite_generation_search_time_available":"45", + "prompt_template": "zero_shot", + "api_params": { + "codellama-70b": { + "model": "codellama:70b", + "api_url": "http://ip/api/chat", + "temperature": 0.7, + "seed": 42, + "timeout_seconds": 300 + } + } } diff --git a/nimrod/tests/example/pom.xml b/nimrod/tests/example/pom.xml index 56e5109d..47dc02f7 100644 --- a/nimrod/tests/example/pom.xml +++ b/nimrod/tests/example/pom.xml @@ -14,7 +14,7 @@ junit junit - 4.12 + 4.13.1 test diff --git a/nimrod/tests/test_utils.py b/nimrod/tests/test_utils.py index 4c89c24f..40449164 100644 --- a/nimrod/tests/test_utils.py +++ b/nimrod/tests/test_utils.py @@ -22,7 +22,7 @@ def setUp(self): self.java = Java(self.java_home) self.maven = Maven(self.java, self.maven_home) - self.maven.compile(calculator_project_dir(), 10) + self.maven.compile(calculator_project_dir(), 20) def test_get_files(self): classes = get_files(calculator_target_dir()) diff --git a/nimrod/tests/tools/test_maven.py b/nimrod/tests/tools/test_maven.py index 6e31155c..6ded82db 100644 --- a/nimrod/tests/tools/test_maven.py +++ b/nimrod/tests/tools/test_maven.py @@ -81,8 +81,8 @@ def test_extract_results(self): '/a/b/c/target/classes\n[INFO] 0asdjhaskdjf Compiling') results = Maven.extract_results(output) - self.assertEquals(6, results.source_files) - self.assertEquals('/a/b/c/target/classes', results.classes_dir) + self.assertEqual(6, results.source_files) + self.assertEqual('/a/b/c/target/classes', results.classes_dir) @staticmethod def _clear_environment(): diff --git a/nimrod/tests/tools/test_mujava.py b/nimrod/tests/tools/test_mujava.py index 81b54bc0..3829662b 100644 --- a/nimrod/tests/tools/test_mujava.py +++ b/nimrod/tests/tools/test_mujava.py @@ -42,7 +42,7 @@ def test_read_log_without_log_dir(self): mujava = MuJava(self.java, calculator_mutants_dir()) mutants = mujava.read_log() - self.assertEquals(3, len(mutants)) + self.assertEqual(3, len(mutants)) def test_not_found_log(self): mujava = MuJava(self.java, calculator_mutants_dir()) diff --git a/nimrod/tests/utils.py b/nimrod/tests/utils.py index 0f199c3b..d3726a3b 100644 --- a/nimrod/tests/utils.py +++ b/nimrod/tests/utils.py @@ -2,18 +2,14 @@ import os import json import shutil -from typing import Dict +from typing import Dict, Any PATH = os.path.dirname(os.path.abspath(__file__)) -def get_config() -> "Dict[str, str]": - config: "Dict[str, str]" = dict() - - with open(os.path.join(PATH, os.sep.join(['env-config.json'])), 'r') as j: - config = json.loads(j.read()) - - return config +def get_config() -> Dict[str, Any]: + with open(os.path.join(PATH, "env-config.json"), 'r') as j: + return json.load(j) def calculator_project_dir(): @@ -76,14 +72,30 @@ def calculator_sum_aor_1(): def setup_logging(): config = get_config() - config_level = config.get('logger_level') - level = logging._nameToLevel[config_level] if config_level else logging.INFO - logging.basicConfig( - level=level, - format='%(asctime)s.%(msecs)03d %(levelname)s %(module)s - %(funcName)s: %(message)s', + config_level = config.get('logger_level', 'INFO').upper() + level = logging._nameToLevel.get(config_level, logging.INFO) + + logger = logging.getLogger() + logger.setLevel(level) + + if logger.hasHandlers(): + logger.handlers.clear() + + formatter = logging.Formatter( + '[%(asctime)s] %(levelname)s %(filename)s:%(lineno)d: %(message)s', datefmt='%Y-%m-%d %H:%M:%S' ) + file_handler = logging.FileHandler('logfile.log', mode='a') + file_handler.setFormatter(formatter) + + stream_handler = logging.StreamHandler() + 
stream_handler.setFormatter(formatter) + + logger.addHandler(file_handler) + logger.addHandler(stream_handler) def get_base_output_path() -> str: - return os.getcwd().replace("/nimrod/proj", "/")+'/output-test-dest/' if os.getcwd().__contains__("/nimrod/proj") else os.getcwd() + "/output-test-dest/" + current_dir = os.getcwd() + base_dir = current_dir.replace("/nimrod/proj", "") if "/nimrod/proj" in current_dir else current_dir + return os.path.join(base_dir, "output-test-dest", "projects") diff --git a/nimrod/tools/evosuite.py b/nimrod/tools/evosuite.py index 7bed4412..0ca96e6a 100644 --- a/nimrod/tools/evosuite.py +++ b/nimrod/tools/evosuite.py @@ -97,7 +97,7 @@ def generate_differential(self, mutant_classpath, make_dir=True): def get_format_evosuite_method_name(self): method_name = "" try: - pattern = re.compile("\.[a-zA-Z0-9\-\_]*\([\s\S]*") + pattern = re.compile(r"\.[a-zA-Z0-9\-\_]*\([\s\S]*") result = pattern.search(self.sut_method) method_name = result.group(0)[1:] except Exception as e: diff --git a/nimrod/tools/jacoco.py b/nimrod/tools/jacoco.py index f85cd64b..d0017e33 100644 --- a/nimrod/tools/jacoco.py +++ b/nimrod/tools/jacoco.py @@ -48,7 +48,7 @@ def dealingWithDuplicatedFilesOnJars(self, jarFile, message_error): def parseDuplicatedFile(self, message_error): "Exception in thread \"main\" java.util.zip.ZipException: duplicate entry: META-INF/LICENSE.txt" - x = re.search("duplicate entry\: .*", message_error, re.IGNORECASE) + x = re.search(r"duplicate entry: .*", message_error, re.IGNORECASE) if x: fileName = str(message_error.split("duplicate entry: ")[1]).split("\n")[0] if ("$" in fileName): diff --git a/nimrod/tools/junit.py b/nimrod/tools/junit.py index 0683ed5d..bb7a8385 100644 --- a/nimrod/tools/junit.py +++ b/nimrod/tools/junit.py @@ -131,9 +131,9 @@ def _extract_test_id(output): list_failed_tests = [] list_failed_tests = re.findall(r'test[0-9]+\([A-Za-z0-9_.]+\)', output) - number_executed_tests = int(re.findall('Tests run: \d+', output)[0].split("Tests run: ")[-1]) + number_executed_tests = int(re.findall(r'Tests run: \d+', output)[0].split("Tests run: ")[-1]) for test in list_failed_tests: - i = re.findall('\d+', test) + i = re.findall(r'\d+', test) test_case = re.findall(r'.+?(?=\()', test)[0] file = str(re.findall(r'\(.+?(?=\))', test)[0]).split(".")[-1] #re.findall(r'\(.+?(?=\))', test)[0][1:].to_s.split(".")[-1] diff --git a/nimrod/tools/ollama.py b/nimrod/tools/ollama.py new file mode 100644 index 00000000..939ef04e --- /dev/null +++ b/nimrod/tools/ollama.py @@ -0,0 +1,22 @@ +import os + +from nimrod.tools.suite_generator import SuiteGenerator +from nimrod.utils import get_class_files + + +class Ollama(SuiteGenerator): + + def _get_tool_name(self): + return "ollama" + + def _test_classes(self): + classes = [] + + for class_file in sorted(get_class_files(self.suite_classes_dir)): + filename, _ = os.path.splitext(class_file) + classes.append(filename.replace(os.sep, '.')) + + return classes + + def _get_suite_dir(self): + return os.path.join(self.suite_dir, 'ollama-tests') diff --git a/nimrod/utils.py b/nimrod/utils.py index 6280f380..b6cf35c1 100644 --- a/nimrod/utils.py +++ b/nimrod/utils.py @@ -1,4 +1,5 @@ import os +import json def get_class_files(path): @@ -32,3 +33,23 @@ def package_to_dir(package): def dir_to_package(directory): return directory.replace(os.sep, '.') + + +def load_json(file_path): + """Loads a JSON file and return its content as a dictionary""" + with open(file_path, "r") as file: + try: + content = json.load(file) + except 
json.JSONDecodeError: + content = {} + return content + + +def save_json(file_path, content): + """Saves a dictionary as a JSON file""" + try: + os.makedirs(os.path.dirname(file_path), exist_ok=True) + with open(file_path, "w") as file: + json.dump(content, file, indent=4) + except (OSError, IOError) as e: + print(f"Error saving JSON to {file_path}: {e}") diff --git a/pytest.ini b/pytest.ini new file mode 100644 index 00000000..200ef1ee --- /dev/null +++ b/pytest.ini @@ -0,0 +1,6 @@ +[tool:pytest] +python_classes = *Test *Tests +python_functions = test_* +addopts = --tb=short +filterwarnings = + ignore::pytest.PytestCollectionWarning \ No newline at end of file diff --git a/requirements.txt b/requirements.txt index 91bbd0f5..14a71772 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1 +1,6 @@ -BeautifulSoup4 +beautifulsoup4 +setuptools +tree_sitter==0.25.1 +tree_sitter_java==0.23.5 +requests +types-requests \ No newline at end of file diff --git a/setup.py b/setup.py index 64337572..c0854b14 100644 --- a/setup.py +++ b/setup.py @@ -24,8 +24,8 @@ def readme(): install_requires=[ 'argparse==1.4.0', 'beautifulsoup4==4.6.0', - 'pygithub==1.43.7', - 'gitpython==2.1.11' + 'pygithub==2.5.0', + 'gitpython==3.1.41' ], test_suite='nose.collector', tests_require=[