diff --git a/pyproject.toml b/pyproject.toml index da2095840..3595bd16b 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -44,6 +44,8 @@ dependencies = [ "numpy>=1.24; python_version >= '3.11'", "numpy>=1.22", "numpy>=1.22,<2; sys_platform == 'darwin' and 'x86_64' in platform_machine and python_version < '3.13'", # Restrict numpy v2 for macOS x86 since it is not supported anymore since torch v2.3.0 + "optuna>=4.5.0", + "torch-geometric>=2.6.1", "torch>=2.7.1,<2.8.0; sys_platform == 'darwin' and 'x86_64' in platform_machine and python_version < '3.13'", # Restrict torch v2.3.0 for macOS x86 since it is not supported anymore. "typing-extensions>=4.1", # for `assert_never` ] @@ -119,6 +121,8 @@ xfail_strict = true filterwarnings = [ 'error', 'ignore:.*pytorch.*:UserWarning:', + "ignore:.*torch_geometric.*:UserWarning:", + "ignore:.*'type_params' parameter of 'typing\\._eval_type'.*:DeprecationWarning:", 'ignore:.*Values in x.*:RuntimeWarning:', 'ignore:.*The least populated class in y has only 3 members, which is less than n_splits=5.*:UserWarning:', 'ignore:.*divide by zero encountered in det.*:RuntimeWarning:', @@ -164,9 +168,13 @@ implicit_reexport = true # recent versions of `gym` are typed, but stable-baselines3 pins a very old version of gym. # qiskit is not yet marked as typed, but is typed mostly. # the other libraries do not have type stubs. 
-module = ["qiskit.*", "joblib.*", "sklearn.*", "matplotlib.*", "gymnasium.*", "mqt.bench.*", "sb3_contrib.*", "bqskit.*", "qiskit_ibm_runtime.*", "networkx.*", "stable_baselines3.*"] +module = ["qiskit.*", "joblib.*", "sklearn.*", "matplotlib.*", "gymnasium.*", "mqt.bench.*", "sb3_contrib.*", "bqskit.*", "qiskit_ibm_runtime.*", "networkx.*", "stable_baselines3.*", "torch", "torch.*", "torch_geometric", "torch_geometric.*", "optuna.*"] ignore_missing_imports = true +[[tool.mypy.overrides]] +module = ["mqt.predictor.ml.*"] +disallow_subclassing_any = false + [tool.ruff] line-length = 120 extend-include = ["*.ipynb"] @@ -245,6 +253,7 @@ wille = "wille" anc = "anc" aer = "aer" fom = "fom" +TPE = "TPE" [tool.repo-review] ignore = ["GH200"] diff --git a/src/mqt/predictor/_version.py b/src/mqt/predictor/_version.py new file mode 100644 index 000000000..79c219efe --- /dev/null +++ b/src/mqt/predictor/_version.py @@ -0,0 +1,40 @@ +# Copyright (c) 2023 - 2025 Chair for Design Automation, TUM +# Copyright (c) 2025 Munich Quantum Software Company GmbH +# All rights reserved. +# +# SPDX-License-Identifier: MIT +# +# Licensed under the MIT License + +# file generated by setuptools-scm +# don't change, don't track in version control +from __future__ import annotations + +__all__ = [ + "__commit_id__", + "__version__", + "__version_tuple__", + "commit_id", + "version", + "version_tuple", +] + +TYPE_CHECKING = False +if TYPE_CHECKING: + VERSION_TUPLE = tuple[int | str, ...] 
def get_hellinger_model_path(device: Target, gnn: bool = False) -> Path:
    """Return the path of the trained Hellinger distance model file for a device.

    Arguments:
        device: target whose ``description`` is embedded in the file name
        gnn: if True, return the path of the GNN checkpoint (``.pth``),
            otherwise the path of the ``.joblib`` model

    Returns:
        The model file path inside ``ml/training_data/trained_model`` of the ``mqt.predictor`` package.
    """
    model_dir = Path(str(resources.files("mqt.predictor"))) / "ml" / "training_data" / "trained_model"
    if gnn:
        file_name = "trained_hellinger_distance_regressor_gnn_" + device.description + ".pth"
    else:
        file_name = "trained_hellinger_distance_regressor_" + device.description + ".joblib"
    return Path(model_dir / file_name)
class GraphConvolutionSage(nn.Module):
    """Graph encoder made of stacked SAGEConv layers followed by global mean pooling."""

    def __init__(
        self,
        in_feats: int,
        hidden_dim: int,
        num_resnet_layers: int,
        *,
        conv_activation: Callable[..., torch.Tensor] = functional.leaky_relu,
        conv_act_kwargs: dict[str, Any] | None = None,
    ) -> None:
        """Build the SAGEConv stack.

        Args:
            in_feats: dimensionality of the node features
            hidden_dim: output size of every SAGEConv layer
            num_resnet_layers: number of additional SAGEConv layers that are applied with a
                residual connection after the two initial (non-residual) layers
            conv_activation: activation function applied after each graph layer
            conv_act_kwargs: extra keyword arguments passed to conv_activation
        """
        super().__init__()
        self.conv_activation = conv_activation
        self.conv_act_kwargs = conv_act_kwargs or {}

        # The first two layers map in_feats -> hidden_dim -> hidden_dim without a skip connection.
        plain_layers = [SAGEConv(in_feats, hidden_dim), SAGEConv(hidden_dim, hidden_dim)]
        # All remaining layers keep the width, so their output can be added to their input.
        residual_layers = [SAGEConv(hidden_dim, hidden_dim) for _ in range(num_resnet_layers)]
        self.convs = nn.ModuleList([*plain_layers, *residual_layers])

    def forward(self, data: Data) -> torch.Tensor:
        """Encode the input graph(s) and return one pooled feature vector per graph."""
        x = data.x
        for layer_idx, conv in enumerate(self.convs):
            activated = self.conv_activation(conv(x, data.edge_index), **self.conv_act_kwargs)
            if layer_idx < 2:
                # the two leading layers replace the node features
                x = activated
            else:
                # every later layer is used in residual configuration
                x = x + activated

        # average the node features of each graph in the batch
        return global_mean_pool(x, data.batch)
class GNN(nn.Module):
    """SAGEConv graph encoder (GraphConvolutionSage) followed by an MLP head."""

    def __init__(
        self,
        in_feats: int,
        hidden_dim: int,
        num_resnet_layers: int,
        mlp_units: list[int],
        *,
        conv_activation: Callable[..., torch.Tensor] = functional.leaky_relu,
        conv_act_kwargs: dict[str, Any] | None = None,
        mlp_activation: Callable[..., torch.Tensor] = functional.leaky_relu,
        mlp_act_kwargs: dict[str, Any] | None = None,
        classes: list[str] | None = None,
        output_dim: int = 1,
    ) -> None:
        """Create the graph encoder and the MLP head.

        Arguments:
            in_feats: dimension of the input node features
            hidden_dim: number of hidden channels produced by the convolutional part
            num_resnet_layers: number of residual SAGEConv layers
            mlp_units: width of each hidden layer of the final MLP
            conv_activation: activation function applied after each graph layer
            conv_act_kwargs: extra keyword arguments for conv_activation
            mlp_activation: activation function applied after each hidden MLP layer
            mlp_act_kwargs: extra keyword arguments for mlp_activation
            classes: list of class names for classification tasks (only stored on the model)
            output_dim: dimension of the output, default is 1 for regression tasks
        """
        super().__init__()
        # Convolutional part
        self.graph_conv = GraphConvolutionSage(
            in_feats,
            hidden_dim,
            num_resnet_layers,
            conv_activation=conv_activation,
            conv_act_kwargs=conv_act_kwargs,
        )

        # MLP head: hidden_dim -> mlp_units[0] -> ... -> mlp_units[-1] -> output_dim
        self.mlp_activation = mlp_activation
        self.mlp_act_kwargs = mlp_act_kwargs or {}
        self.classes = classes
        widths = [hidden_dim, *mlp_units]
        self.fcs = nn.ModuleList(
            nn.Linear(fan_in, fan_out) for fan_in, fan_out in zip(widths, widths[1:], strict=False)
        )
        self.out = nn.Linear(widths[-1], output_dim)

    def forward(self, data: Data) -> torch.Tensor:
        """Run the graph encoder and the MLP head on the input graph(s).

        Arguments:
            data: The input graph data.
        """
        hidden = self.graph_conv(data)
        for layer in self.fcs:
            hidden = self.mlp_activation(layer(hidden), **self.mlp_act_kwargs)
        # no activation on the output layer
        return self.out(hidden)
def get_path_trained_model_gnn(figure_of_merit: str) -> Path:
    """Return the path of the GNN checkpoint file trained for the given figure of merit."""
    checkpoint_name = "trained_gnn_" + figure_of_merit + ".pth"
    return get_path_training_data() / "trained_model" / checkpoint_name


def get_openqasm3_gates() -> list[str]:
    """Return the names of all quantum gates within the openQASM 3.0 standard header.

    The order of the returned list is fixed: single-qubit gates, two-qubit gates,
    three-qubit gates and finally the OpenQASM 2 compatibility gates.
    """
    single_qubit = ("id", "x", "y", "z", "h", "s", "sdg", "t", "tdg", "sx", "p", "rx", "ry", "rz", "u")
    two_qubit = ("cx", "cy", "cz", "ch", "cp", "crx", "cry", "crz", "cu", "swap")
    three_qubit = ("ccx", "cswap")
    qasm2_compatibility = ("u1", "u2", "u3")
    return [*single_qubit, *two_qubit, *three_qubit, *qasm2_compatibility]
def create_dag(qc: QuantumCircuit) -> tuple[torch.Tensor, torch.Tensor, int]:
    """Create the feature-annotated DAG of a quantum circuit.

    Barriers are removed first; every remaining operation of the circuit becomes one node.

    Arguments:
        qc: the quantum circuit to be converted

    Returns:
        node_vector: one row per operation, consisting of the one-hot encoded gate type,
            three qubit entries ([targets..., controls...], padded with -1 and divided by the
            number of qubits) and three parameters (taken modulo 2*pi, padded with 0)
        edge_index: long tensor of shape [2, E] holding the edges between operation nodes
        number_of_gates: the number of nodes, i.e., of applied operations

    Raises:
        ValueError: if an operation is neither one of ``get_openqasm3_gates()`` nor ``measure``.
    """
    num_qubits = qc.num_qubits
    # Barriers carry no features, so they are dropped before the DAG is built.
    qc = PassManager(RemoveBarriers()).run(qc)
    dag = circuit_to_dag(qc)

    unique_gates = [*get_openqasm3_gates(), "measure"]
    gate2idx = {g: i for i, g in enumerate(unique_gates)}

    def qubit_vector(node: DAGOpNode) -> list[int]:
        """Return the qubit indices as [targets..., controls...], padded with -1 to length 3."""
        qinds = [qc.find_bit(q).index for q in node.qargs]
        # number of control qubits of the operation (0 if the attribute is missing)
        n_ctrl = getattr(node.op, "num_ctrl_qubits", 0)
        # assumes the controls appear first in qargs, followed by the target(s)
        ctrls = qinds[:n_ctrl]
        tgt = qinds[n_ctrl:]
        return tgt + ctrls + [-1] * (3 - len(qinds))

    def param_vector(node: DAGOpNode, dim: int = 3) -> list[float]:
        """Return the first ``dim`` parameters of the operation as floats, padded with zeros."""
        p = [float(val) for val in node.op.params][:dim]
        return p + [0.0] * (dim - len(p))

    nodes = list(dag.op_nodes())
    number_of_gates = len(nodes)

    # preallocate the feature arrays
    onehots = torch.zeros((number_of_gates, len(unique_gates)), dtype=torch.float)
    qubits = torch.full((number_of_gates, 3), -1, dtype=torch.float)
    params = torch.zeros((number_of_gates, 3), dtype=torch.float)

    for i, node in enumerate(nodes):
        # one-hot gate type; unknown gates are rejected
        gate_idx = gate2idx.get(node.op.name)
        if gate_idx is None:
            msg = f"Unknown gate: {node.op.name}"
            raise ValueError(msg)
        onehots[i, gate_idx] = 1.0

        # qubit indices scaled by the circuit width
        qubits[i] = torch.tensor(qubit_vector(node)) / num_qubits
        # up to 3 angle parameters, wrapped into [0, 2*pi)
        params[i] = torch.tensor(param_vector(node), dtype=torch.float) % (2 * np.pi)

    node_vector = torch.cat([onehots, qubits, params], dim=1)

    # Only edges between two operation nodes are kept (edges touching input/output nodes are skipped).
    idx_map = {node: i for i, node in enumerate(nodes)}
    edges = [[idx_map[src], idx_map[dst]] for src, dst, _ in dag.edges() if src in idx_map and dst in idx_map]
    if edges:
        edge_index = torch.tensor(edges, dtype=torch.long).t().contiguous()
    else:
        # torch.tensor([]).t() would have shape [0]; keep the [2, E] layout for graphs without edges.
        edge_index = torch.empty((2, 0), dtype=torch.long)
    return node_vector, edge_index, number_of_gates
def evaluate_classification_model(
    model: nn.Module,
    loader: torch_geometric.loader.DataLoader,
    loss_fn: nn.Module,
    *,
    task: str = "binary",
    device: str | None = None,
    return_arrays: bool = False,
    verbose: bool = False,
) -> tuple[float, dict[str, float], tuple[np.ndarray, np.ndarray] | None]:
    """Evaluate a classifier whose outputs are logits.

    The model is not moved by this function; it is expected to already be on ``device``.

    Arguments:
        model: the model to be evaluated, the model's output must be logits
        loader: iterable of mini-batches; each batch must provide ``to(device)`` and ``y``
        loss_fn: the loss function, called as ``loss_fn(logits, targets)``
        task: kind of classification, either "binary" or "multiclass"
        device: where to run the evaluation (defaults to "cuda" if available, else "cpu")
        return_arrays: whether to return the probabilities and the targets
        verbose: whether to additionally compute the sklearn metrics

    Returns:
        avg_loss: sample-weighted average loss (0.0 for an empty loader)
        metrics: dictionary that always contains "loss"; if ``verbose`` is set it also contains
            accuracy and F1 based metrics for the chosen task
        arrays: ``(probabilities, targets)`` if ``return_arrays`` is set and at least one batch
            was evaluated, otherwise None

    Raises:
        ValueError: if ``task`` is neither "binary" nor "multiclass".
    """
    if task not in {"binary", "multiclass"}:
        msg = f"Unknown task: {task}"
        raise ValueError(msg)

    if device is None:
        device = "cuda" if torch.cuda.is_available() else "cpu"
    torch_device = torch.device(device)

    model.eval()
    total_loss, total = 0.0, 0
    all_logits: list[torch.Tensor] = []
    all_targets: list[torch.Tensor] = []
    need_arrays = return_arrays or verbose

    with torch.no_grad():
        for batch in loader:
            batch_device = batch.to(torch_device)
            logits = model(batch_device)  # [B,1] or [B,K]
            y = batch_device.y
            # unify the target shape/dtype for the loss computation
            if task == "multiclass":
                if y.dim() > 1:
                    y = y.squeeze(-1)
                y_loss = y.long()
            else:
                y_loss = y.float().view(-1, 1)
            bs = y_loss.size(0)

            loss = loss_fn(logits, y_loss)
            total_loss += loss.item() * bs
            total += bs

            if need_arrays:
                all_logits.append(logits.detach().cpu())
                all_targets.append(y.detach().cpu())

    avg_loss = total_loss / max(1, total)
    metrics: dict[str, float] = {"loss": float(avg_loss)}
    if not need_arrays or not all_logits:
        # nothing else was requested, or the loader was empty (torch.cat would fail on an empty list)
        return avg_loss, metrics, None

    logits_all = torch.cat(all_logits, dim=0)
    y_true = torch.cat(all_targets, dim=0).view(-1).numpy().astype(int)  # [N]

    # ---- Convert logits -> probs / preds; the sklearn metrics are only computed if verbose ----
    if task == "binary":
        probs = torch.sigmoid(logits_all).squeeze(-1).numpy()  # [N]
        if verbose:
            preds = (probs >= 0.5).astype(int)
            metrics["accuracy"] = float(accuracy_score(y_true, preds))
            metrics["precision"] = float(precision_score(y_true, preds, zero_division=0))
            metrics["recall"] = float(recall_score(y_true, preds, zero_division=0))
            metrics["f1"] = float(f1_score(y_true, preds, zero_division=0))
            # the ranking metrics are only computed if both classes are present
            if np.unique(y_true).size > 1:
                metrics["roc_auc"] = float(roc_auc_score(y_true, probs))
                metrics["avg_prec"] = float(average_precision_score(y_true, probs))
    else:
        probs = torch.softmax(logits_all, dim=1).numpy()  # [N,K]
        if verbose:
            preds = probs.argmax(axis=1)  # [N]
            metrics["accuracy"] = float(accuracy_score(y_true, preds))
            metrics["f1_macro"] = float(f1_score(y_true, preds, average="macro", zero_division=0))
            metrics["f1_micro"] = float(f1_score(y_true, preds, average="micro", zero_division=0))

    # The arrays are returned whenever they are requested, independently of verbose.
    arrays = (probs, y_true) if return_arrays else None
    return avg_loss, metrics, arrays
def train_classification_model(
    model: nn.Module,
    train_loader: torch_geometric.loader.DataLoader,
    optimizer: torch.optim.Optimizer,
    loss_fn: nn.Module,
    num_epochs: int,
    *,
    task: str = "binary",
    device: str | None = None,
    verbose: bool = True,
    val_loader: torch_geometric.loader.DataLoader | None = None,
    patience: int = 10,
    min_delta: float = 0.0,
    restore_best: bool = True,
    scheduler: torch.optim.lr_scheduler._LRScheduler | None = None,
) -> None:
    """Train a classification model with early stopping.

    The monitored quantity is the validation loss if ``val_loader`` is given and the
    training loss otherwise.

    Arguments:
        model: the model to be trained (moved to ``device``)
        train_loader: training set split into mini-batches
        optimizer: the optimizer used for the parameter updates
        loss_fn: the loss function, called as ``loss_fn(logits, targets)``
        num_epochs: maximum number of training epochs
        task: type of classification, either "binary" or "multiclass"
        device: where to run the training (defaults to "cuda" if available, else "cpu")
        verbose: if True, print the progress of every epoch
        val_loader: optional validation set used for early stopping
        patience: number of consecutive epochs without improvement after which training stops
        min_delta: minimum decrease of the monitored loss that counts as an improvement
        restore_best: whether to load the weights of the best epoch after training
        scheduler: optional learning rate scheduler, stepped once per epoch

    Raises:
        ValueError: if ``task`` is neither "binary" nor "multiclass".
    """
    if task not in {"binary", "multiclass"}:
        msg = f"Unknown task: {task}"
        raise ValueError(msg)

    if device is None:
        device = "cuda" if torch.cuda.is_available() else "cpu"
    torch_device = torch.device(device)
    model.to(torch_device)

    def snapshot_weights() -> dict[str, torch.Tensor]:
        """Return an independent CPU copy of the current model weights."""
        # state_dict() returns references to the live parameters and .cpu() is a no-op for tensors
        # that are already on the CPU, so the copy has to be forced to really freeze the weights.
        return {k: v.detach().to("cpu", copy=True) for k, v in model.state_dict().items()}

    best_state = None
    best_metric = float("inf")
    best_metrics_dict: dict[str, float] = {}
    epochs_no_improve = 0

    for epoch in range(1, num_epochs + 1):
        model.train()
        running_loss, total = 0.0, 0

        for batch in train_loader:
            batch_device = batch.to(torch_device)
            logits = model(batch_device)
            y = batch_device.y
            # unify the target shape/dtype for the loss computation
            if task == "multiclass":
                if y.dim() > 1:
                    y = y.squeeze(-1)
                y_loss = y.long()
            else:
                y_loss = y.float().view(-1, 1)
            bs = y_loss.size(0)

            loss = loss_fn(logits, y_loss)
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            running_loss += loss.item() * bs
            total += bs

        train_loss = running_loss / max(1, total)
        if scheduler is not None:
            scheduler.step()

        if val_loader is not None:
            val_loss, val_metrics, _ = evaluate_classification_model(
                model, val_loader, loss_fn, task=task, device=str(torch_device), verbose=verbose, return_arrays=False
            )

            if (best_metric - val_loss) > min_delta:
                best_metric = val_loss
                best_state = snapshot_weights()
                best_metrics_dict = {"val_" + k: v for k, v in val_metrics.items()}
                best_metrics_dict["train_loss_at_best"] = float(train_loss)
                epochs_no_improve = 0
            else:
                epochs_no_improve += 1

            if verbose:
                metrics_str = " | ".join(f"{k}={v:.6f}" for k, v in val_metrics.items())
                print(
                    f"Epoch {epoch:03d}/{num_epochs} | train_loss={train_loss:.6f} | {metrics_str} | "
                    f"no_improve={epochs_no_improve}/{patience} | metrics={best_metrics_dict}"
                )

            if epochs_no_improve >= patience:
                if verbose:
                    print(f"Early stopping at epoch {epoch} (best val_loss={best_metric:.6f}).")
                break
        else:
            # Without a validation set, early stopping is based on the training loss only.
            if (best_metric - train_loss) > min_delta:
                best_metric = train_loss
                best_state = snapshot_weights()
                epochs_no_improve = 0
            else:
                epochs_no_improve += 1

            if verbose:
                print(
                    f"Epoch {epoch:03d}/{num_epochs} | train_loss={train_loss:.6f} | "
                    f"no_improve={epochs_no_improve}/{patience}"
                )
            if epochs_no_improve >= patience:
                if verbose:
                    print(f"Early stopping on training loss at epoch {epoch} (best train_loss={best_metric:.6f}).")
                break

    if restore_best and best_state is not None:
        model.load_state_dict(best_state)
        model.to(torch_device)
def evaluate_regression_model(
    model: nn.Module,
    loader: torch_geometric.loader.DataLoader,
    loss_fn: nn.Module,
    *,
    device: str | None = None,
    return_arrays: bool = False,
    verbose: bool = False,
) -> tuple[float, dict[str, float], tuple[np.ndarray, np.ndarray] | None]:
    """Evaluate a regression model (the model outputs are the scalar predictions).

    The model is not moved by this function; it is expected to already be on ``device``.

    Arguments:
        model: regression model to be evaluated
        loader: iterable of mini-batches; each batch must provide ``to(device)`` and ``y``
        loss_fn: loss function for evaluation, called as ``loss_fn(predictions, targets)``
        device: device to be used for evaluation (defaults to "cuda" if available, else "cpu")
        return_arrays: whether to return prediction and target arrays
        verbose: whether to print the metrics results

    Returns:
        avg_loss: element-weighted average loss over the loader (0.0 for an empty loader)
        metrics: {"loss": ...} plus {"rmse": ..., "mae": ..., "r2": ...} if at least one
            prediction was made; "r2" is NaN if the targets have zero variance
        arrays: (preds, y_true) if return_arrays=True, else None
    """
    if device is None:
        device = "cuda" if torch.cuda.is_available() else "cpu"
    torch_device = torch.device(device)

    model.eval()
    total_loss, total = 0.0, 0
    all_preds: list[np.ndarray] = []
    all_targets: list[np.ndarray] = []

    with torch.no_grad():
        for batch in loader:
            batch_device = batch.to(torch_device)
            logits = model(batch_device)
            # align the targets with the shape of the predictions
            y = batch_device.y.float().view_as(logits)

            loss = loss_fn(logits, y)
            bs = y.numel()
            total_loss += loss.item() * bs
            total += bs

            # flatten to 1D for the metric computation
            all_preds.append(logits.view(-1).detach().cpu().numpy())
            all_targets.append(y.view(-1).detach().cpu().numpy())

    avg_loss = total_loss / max(1, total)
    preds = np.concatenate(all_preds, axis=0) if all_preds else np.array([])
    y_true = np.concatenate(all_targets, axis=0) if all_targets else np.array([])

    metrics: dict[str, float] = {"loss": float(avg_loss)}
    if preds.size > 0:
        rmse = float(np.sqrt(mean_squared_error(y_true, preds)))
        mae = float(mean_absolute_error(y_true, preds))
        r2 = float(r2_score(y_true, preds)) if np.var(y_true) > 0 else float("nan")
        metrics.update({"rmse": rmse, "mae": mae, "r2": r2})
        if verbose:
            print(f"[Eval] loss={avg_loss:.6f} | rmse={rmse:.6f} | mae={mae:.6f} | r2={metrics['r2']:.6f}")
    elif verbose:
        # no predictions were made, so rmse/mae/r2 do not exist and only the loss can be reported
        print(f"[Eval] loss={avg_loss:.6f}")

    arrays = (preds, y_true) if return_arrays else None
    return avg_loss, metrics, arrays
def train_regression_model(
    model: nn.Module,
    train_loader: torch_geometric.loader.DataLoader,
    optimizer: torch.optim.Optimizer,
    loss_fn: nn.Module,
    num_epochs: int,
    *,
    device: str | None = None,
    verbose: bool = True,
    val_loader: torch_geometric.loader.DataLoader | None = None,
    patience: int = 10,
    min_delta: float = 0.0,
    restore_best: bool = True,
    scheduler: torch.optim.lr_scheduler._LRScheduler | None = None,
) -> None:
    """Train a regression model with early stopping.

    The monitored quantity is the validation loss if ``val_loader`` is given and the
    training loss otherwise. Gradients are clipped to a maximum norm of 1.0.

    Arguments:
        model: regression model to be trained (moved to ``device``)
        train_loader: training set split into mini-batches
        optimizer: optimizer for model training
        loss_fn: loss function for training, called as ``loss_fn(predictions, targets)``
        num_epochs: maximum number of training epochs
        device: device to be used for training (defaults to "cuda" if available, else "cpu")
        verbose: whether to print progress messages
        val_loader: validation set split into mini-batches (optional)
        patience: number of epochs with no improvement after which training will be stopped
        min_delta: minimum change in the monitored quantity to qualify as an improvement
        restore_best: whether to restore the model weights from the best epoch
        scheduler: learning rate scheduler, stepped once per epoch (optional)
    """
    if device is None:
        device = "cuda" if torch.cuda.is_available() else "cpu"
    torch_device = torch.device(device)
    model.to(torch_device)

    best_state = None
    best_metric = float("inf")
    epochs_no_improve = 0

    for epoch in range(1, num_epochs + 1):
        model.train()
        running_loss, total = 0.0, 0

        for batch in train_loader:
            batch_device = batch.to(torch_device)
            preds = model(batch_device)  # [B] or [B,1]
            # align the targets with the shape of the predictions
            y = batch_device.y.float().view_as(preds)

            loss = loss_fn(preds, y)
            optimizer.zero_grad()
            loss.backward()
            torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
            optimizer.step()

            running_loss += loss.item() * y.numel()
            total += y.numel()

        train_loss = running_loss / max(1, total)
        if scheduler is not None:
            scheduler.step()

        if val_loader is not None:
            val_loss, val_metrics, _ = evaluate_regression_model(
                model, val_loader, loss_fn, device=str(torch_device), return_arrays=False, verbose=False
            )

            if (best_metric - val_loss) > min_delta:
                best_metric = val_loss
                # deepcopy, because state_dict() returns references to the live parameters
                best_state = deepcopy(model.state_dict())
                epochs_no_improve = 0
            else:
                epochs_no_improve += 1

            if verbose:
                msg_metrics = " | ".join(f"{k}={v:.6f}" for k, v in val_metrics.items())
                print(
                    f"Epoch {epoch:03d}/{num_epochs} | train_loss={train_loss:.6f} | {msg_metrics} | "
                    f"no_improve={epochs_no_improve}/{patience}"
                )

            if epochs_no_improve >= patience:
                if verbose:
                    print(f"Early stopping at epoch {epoch} (best val_loss={best_metric:.6f}).")
                break
        else:
            # Without a validation set, early stopping is based on the training loss only.
            if (best_metric - train_loss) > min_delta:
                best_metric = train_loss
                best_state = deepcopy(model.state_dict())
                epochs_no_improve = 0
            else:
                epochs_no_improve += 1

            if verbose:
                print(
                    f"Epoch {epoch:03d}/{num_epochs} | train_loss={train_loss:.6f} | "
                    f"no_improve={epochs_no_improve}/{patience}"
                )
            if epochs_no_improve >= patience:
                if verbose:
                    print(f"Early stopping on training loss at epoch {epoch} (best train_loss={best_metric:.6f}).")
                break

    if restore_best and best_state is not None:
        model.load_state_dict(best_state)
evaluate_regression_model( + model, val_loader, loss_fn, device=str(device), return_arrays=False, verbose=False + ) + + improved = (best_metric - val_loss) > min_delta + if improved: + best_metric = val_loss + best_state = deepcopy(model.state_dict()) + best_metrics_dict = {"val_" + k: float(v) for k, v in val_metrics.items()} + best_metrics_dict["train_loss_at_best"] = float(train_loss) + epochs_no_improve = 0 + else: + epochs_no_improve += 1 + + if verbose: + msg_metrics = " | ".join(f"{k}={v:.6f}" for k, v in val_metrics.items()) + print( + f"Epoch {epoch:03d}/{num_epochs} | train_loss={train_loss:.6f} | {msg_metrics} | " + f"no_improve={epochs_no_improve}/{patience}" + ) + + if epochs_no_improve >= patience: + if verbose: + print(f"Early stopping at epoch {epoch} (best val_loss={best_metric:.6f}).") + break + else: + # early stopping opzionale on training loss + improved = (best_metric - train_loss) > min_delta + if improved: + best_metric = train_loss + best_state = deepcopy(model.state_dict()) + best_metrics_dict["train_loss_at_best"] = float(train_loss) + epochs_no_improve = 0 + else: + epochs_no_improve += 1 + if verbose: + print( + f"Epoch {epoch:03d}/{num_epochs} | train_loss={train_loss:.6f} | " + f"no_improve={epochs_no_improve}/{patience}" + ) + if epochs_no_improve >= patience: + if verbose: + print(f"Early stopping on training loss at epoch {epoch} (best train_loss={best_metric:.6f}).") + break + + if restore_best and best_state is not None: + model.load_state_dict(best_state) + + @dataclass class TrainingData: """Dataclass for the training data.""" - X_train: NDArray[np.float64] - y_train: NDArray[np.float64] - X_test: NDArray[np.float64] | None = None - y_test: NDArray[np.float64] | None = None + X_train: NDArray[np.float64] | list[torch_geometric.data.Data] + y_train: NDArray[np.float64] | torch.Tensor + X_test: NDArray[np.float64] | list[torch_geometric.data.Data] | None = None + y_test: NDArray[np.float64] | torch.Tensor | None = None 
indices_train: list[int] | None = None indices_test: list[int] | None = None names_list: list[str] | None = None diff --git a/src/mqt/predictor/ml/predictor.py b/src/mqt/predictor/ml/predictor.py index 3f0ec5497..ee3307c7d 100644 --- a/src/mqt/predictor/ml/predictor.py +++ b/src/mqt/predictor/ml/predictor.py @@ -15,32 +15,52 @@ import zipfile from importlib import resources from pathlib import Path -from typing import TYPE_CHECKING, Any +from typing import TYPE_CHECKING, Any, TypedDict from joblib import dump as joblib_dump +from torch import nn +from torch_geometric.loader import DataLoader +from typing_extensions import Unpack + +from mqt.predictor.ml.gnn import GNN if sys.version_info >= (3, 11) and TYPE_CHECKING: # pragma: no cover from typing import assert_never else: from typing_extensions import assert_never +import gc + import matplotlib.pyplot as plt import numpy as np +import optuna +import torch from joblib import Parallel, delayed, load from mqt.bench.targets import get_device +from optuna.samplers import TPESampler + +# cspell:disable-next-line from qiskit import QuantumCircuit from qiskit.qasm2 import dump from sklearn.ensemble import RandomForestClassifier, RandomForestRegressor -from sklearn.model_selection import GridSearchCV, train_test_split +from sklearn.model_selection import GridSearchCV, KFold, train_test_split +from torch_geometric.data import Data from mqt.predictor.hellinger import get_hellinger_model_path from mqt.predictor.ml.helper import ( TrainingData, + create_dag, create_feature_vector, + evaluate_classification_model, + evaluate_regression_model, + get_openqasm3_gates, get_path_trained_model, + get_path_trained_model_gnn, get_path_training_circuits, get_path_training_circuits_compiled, get_path_training_data, + train_classification_model, + train_regression_model, ) from mqt.predictor.reward import ( crit_depth, @@ -53,15 +73,31 @@ from mqt.predictor.utils import timeout_watcher if TYPE_CHECKING: + import torch_geometric + from 
class TrainGNNKwargs(TypedDict, total=False):
    """Arguments for training the GNN model.

    All keys are optional (``total=False``). ``setup_device_predictor`` accepts them as
    ``**gnn_kwargs`` and forwards them unchanged to ``Predictor.train_gnn_model``.
    """

    # NOTE(review): presumably the number of training epochs -- confirm against train_gnn_model
    num_epochs: int
    # NOTE(review): presumably the number of hyperparameter-search trials -- confirm against train_gnn_model
    num_trials: int
    # NOTE(review): presumably toggles progress output during training -- confirm against train_gnn_model
    verbose: bool
""" - predictor = Predictor( - figure_of_merit=figure_of_merit, - devices=devices, - ) + predictor = Predictor(figure_of_merit=figure_of_merit, devices=devices, gnn=gnn) try: logger.info(f"Start the training for the figure of merit: {figure_of_merit}") # Step 1: Generate compiled circuits for all devices @@ -103,9 +140,14 @@ def setup_device_predictor( path_training_data=path_training_data, ) logger.info(f"Generated training data for {figure_of_merit}") + # Step 3: Train the random forest classifier - predictor.train_random_forest_model() - logger.info(f"Trained random forest classifier for {figure_of_merit}") + if not predictor.gnn: + predictor.train_random_forest_model() + logger.info(f"Trained random forest classifier for {figure_of_merit}") + else: + predictor.train_gnn_model(**gnn_kwargs) + logger.info(f"Trained random GNN for {figure_of_merit}") except FileNotFoundError: logger.exception("File not found during setup.") @@ -129,6 +171,7 @@ def __init__( self, devices: list[Target], figure_of_merit: figure_of_merit = "expected_fidelity", + gnn: bool = False, logger_level: int = logging.INFO, ) -> None: """Initializes the Predictor class. @@ -137,12 +180,13 @@ def __init__( figure_of_merit: The figure of merit to be used for training. devices: The devices to be used for training. logger_level: The level of the logger. Defaults to logging.INFO. 
- + gnn: Decide if using GNN or other models """ logger.setLevel(logger_level) self.figure_of_merit = figure_of_merit self.devices = devices + self.gnn = gnn self.devices.sort( key=lambda x: x.description ) # sorting is necessary to determine the ground truth label later on when generating the training data @@ -280,17 +324,26 @@ def generate_training_data( training_sample, circuit_name, scores = sample if all(score == -1 for score in scores): continue - training_data.append(training_sample) + + if self.gnn: + x, y, edge_idx, n_nodes, target_label = training_sample + gnn_training_sample = Data(x=x, y=y, edge_index=edge_idx, num_nodes=n_nodes, target_label=target_label) + + training_data.append(gnn_training_sample if self.gnn else training_sample) names_list.append(circuit_name) scores_list.append(scores) - with resources.as_file(path_training_data) as path: - data = np.asarray(training_data, dtype=object) - np.save(str(path / ("training_data_" + self.figure_of_merit + ".npy")), data) - data = np.asarray(names_list, dtype=str) - np.save(str(path / ("names_list_" + self.figure_of_merit + ".npy")), data) - data = np.asarray(scores_list, dtype=object) - np.save(str(path / ("scores_list_" + self.figure_of_merit + ".npy")), data) + with resources.as_file(path_training_data) as path: + if self.gnn: + torch.save(training_data, str(path / ("graph_dataset_" + self.figure_of_merit + ".pt"))) + else: + data = np.asarray(training_data, dtype=object) + np.save(str(path / ("training_data_" + self.figure_of_merit + ".npy")), data) + + data = np.asarray(names_list, dtype=str) + np.save(str(path / ("names_list_" + self.figure_of_merit + ".npy")), data) + data = np.asarray(scores_list, dtype=object) + np.save(str(path / ("scores_list_" + self.figure_of_merit + ".npy")), data) def _generate_training_sample( self, @@ -298,7 +351,7 @@ def _generate_training_sample( path_uncompiled_circuit: Path, path_compiled_circuits: Path, logger_level: int = logging.INFO, - ) -> tuple[tuple[list[Any], 
Any], str, list[float]]: + ) -> tuple[tuple[list[float], Any] | tuple[torch.Tensor, torch.Tensor, torch.Tensor, int, str], str, list[float]]: """Handles to create a training sample from a given file. Arguments: @@ -360,11 +413,323 @@ def _generate_training_sample( target_label = max(scores, key=lambda k: scores[k]) qc = QuantumCircuit.from_qasm_file(path_uncompiled_circuit / file) - feature_vec = create_feature_vector(qc) - training_sample = (feature_vec, target_label) + training_sample: TrainingSample + if self.gnn: + x, edge_index, number_of_gates = create_dag(qc) + y = torch.tensor([[dev.description for dev in self.devices].index(target_label)], dtype=torch.float) + training_sample = (x, y, edge_index, number_of_gates, target_label) + else: + feature_vec = create_feature_vector(qc) + training_sample = (feature_vec, target_label) circuit_name = str(file).split(".")[0] return training_sample, circuit_name, scores_list + def objective( + self, + trial: optuna.Trial, + dataset: NDArray[np.float64] | list[torch_geometric.data.Data], + task: str, + in_feats: int, + num_outputs: int, + loss_fn: nn.Module, + k_folds: int, + classes: list[str] | None = None, + batch_size: int = 32, + num_epochs: int = 10, + patience: int = 10, + verbose: bool = False, + device: str | None = None, + ) -> float: + """Objective function for Optuna GNN hyperparameter optimization. + + Arguments: + trial: The Optuna trial object. + dataset: The dataset to use for training and validation. + task: The task to optimize (e.g., "binary", "multiclass", or "regression"). + in_feats: number of input features. + num_outputs: number of output features. + device: device to use for training. + loss_fn: loss function to use. + optimizer: optimizer to use. + k_folds: number of folds for cross-validation. + classes: list of class names (for classification tasks). + batch_size: batch size for training. + num_epochs: number of epochs for training. + patience: patience for early stopping. 
+ verbose: whether to print verbose output during training. + + + Returns: + mean_val: The mean value in validation considering the k-folds. + """ + # Type of device used + if device is None: + device = "cuda" if torch.cuda.is_available() else "cpu" + device_obj = torch.device(device) + + # Hyperparameter spaces + hidden_dim = trial.suggest_categorical("hidden_dim", [32, 64, 128]) + num_resnet_layers = trial.suggest_int("num_resnet_layers", 1, 6) + mlp_depth = trial.suggest_int("mlp_depth", 1, 3) + mlp_choices = [32, 64, 128, 256, 512, 1024] + mlp_units = [trial.suggest_categorical(f"mlp_units_{i}", mlp_choices) for i in range(mlp_depth)] + + # Split into k-folds + kf = KFold(n_splits=k_folds, shuffle=True) + fold_val_best_losses: list[float] = [] + + for _fold_idx, (train_idx, val_idx) in enumerate(kf.split(range(len(dataset)))): + train_subset = [dataset[i] for i in train_idx] + val_subset = [dataset[i] for i in val_idx] + # Transform the data into loaders + train_loader = DataLoader(train_subset, batch_size=batch_size, shuffle=True) + val_loader = DataLoader(val_subset, batch_size=batch_size, shuffle=False) + # Define the GNN + model = GNN( + in_feats=in_feats, + hidden_dim=hidden_dim, + num_resnet_layers=num_resnet_layers, + mlp_units=mlp_units, + output_dim=num_outputs, + classes=classes, + ).to(device_obj) + + optimizer = torch.optim.Adam(model.parameters(), lr=1e-3) + # Based on the task, do a training and evaluation for regression or classification + if task == "regression": + train_regression_model( + model, + train_loader, + optimizer, + loss_fn, + num_epochs=num_epochs, + device=device, + verbose=False, + val_loader=val_loader, + patience=patience, + min_delta=0.0, + restore_best=True, + scheduler=None, + ) + val_loss, val_metrics, _ = evaluate_regression_model( + model, val_loader, loss_fn, device=device, return_arrays=False, verbose=verbose + ) + else: + train_classification_model( + model, + train_loader, + optimizer, + loss_fn, + 
num_epochs=num_epochs, + task=task, + device=device, + verbose=verbose, + val_loader=val_loader, + patience=patience, + min_delta=0.0, + restore_best=True, + scheduler=None, + ) + val_loss, val_metrics, _ = evaluate_classification_model( + model, val_loader, loss_fn, task=task, device=device, return_arrays=False, verbose=verbose + ) + + fold_val_best_losses.append(float(val_loss)) + del train_loader, val_loader, train_subset, val_subset, optimizer, model + if device_obj.type == "cuda": + torch.cuda.empty_cache() + gc.collect() + # Take the mean value + mean_val = float(np.mean(fold_val_best_losses)) + trial.set_user_attr("fold_val_best_losses", fold_val_best_losses) + def _to_serializable(obj): + # detach → cpu → convert scalars to python numbers + if torch.is_tensor(obj): + obj = obj.detach().cpu() + return obj.item() if obj.numel() == 1 else obj.tolist() + if isinstance(obj, dict): + return {k: _to_serializable(v) for k, v in obj.items()} + if isinstance(obj, (list, tuple)): + return [_to_serializable(v) for v in obj] + return obj + + trial.set_user_attr( + "best_hparams", + { + "in_feats": in_feats, + "hidden_dim": hidden_dim, + "num_resnet_layers": num_resnet_layers, + "mlp_units": mlp_units, + "num_outputs": num_outputs, + "val_metrics": _to_serializable(val_metrics), + }, + ) + return mean_val + + def train_gnn_model( + self, + training_data: TrainingData | None = None, + num_epochs: int = 10, + num_trials: int = 2, + patience: int = 10, + verbose: bool = False, + ) -> nn.Module: + """Train the GNN model(s) and return the trained model. + + Arguments: + training_data: The training data to use for training the model. + num_epochs: The number of epochs to train the model. + num_trials: The number of trials to run for hyperparameter optimization. + verbose: Whether to print verbose output during training. + + + Returns: + The trained GNN model. 
+ """ + # Figure out outputs and save path + if self.figure_of_merit == "hellinger_distance": + if len(self.devices) != 1: + msg = "A single device must be provided for Hellinger distance model training." + raise ValueError(msg) + num_outputs = 1 + save_mdl_path = str(get_hellinger_model_path(self.devices[0], gnn=True)) + else: + num_outputs = max(1, len(self.devices)) + save_mdl_path = str(get_path_trained_model_gnn(self.figure_of_merit)) + + # Prepare data + if training_data is None: + training_data = self._get_prepared_training_data() + number_in_features = int(len(get_openqasm3_gates()) + 1 + 3 + 3) + + if self.figure_of_merit == "hellinger_distance": + loss_fn = nn.MSELoss() + task = "regression" + classes = None + else: + if num_outputs == 1: + loss_fn = nn.BCEWithLogitsLoss() + task = "binary" + + else: + loss_fn = nn.CrossEntropyLoss() + task = "multiclass" + classes = [dev.description for dev in self.devices] + sampler_obj = TPESampler(n_startup_trials=10) + study = optuna.create_study(study_name="Best GNN Model", direction="minimize", sampler=sampler_obj) + k_folds = min(len(training_data.y_train), 5) + + def _obj(trial: optuna.Trial) -> float: + return self.objective( + trial=trial, + dataset=training_data.X_train, + task=task, + in_feats=number_in_features, + num_outputs=num_outputs, + loss_fn=loss_fn, + k_folds=k_folds, + classes=classes, + num_epochs=num_epochs, + patience=patience, + verbose=verbose, + ) + + study.optimize(_obj, n_trials=num_trials) + dict_best_hyper = study.best_trial.user_attrs.get("best_hparams") + # Build model (ensure final layer outputs raw logits/no activation) + if self.figure_of_merit != "hellinger_distance": + model = GNN( + in_feats=dict_best_hyper["in_feats"], + hidden_dim=dict_best_hyper["hidden_dim"], + num_resnet_layers=dict_best_hyper["num_resnet_layers"], + mlp_units=dict_best_hyper["mlp_units"], + output_dim=num_outputs, + classes=[dev.description for dev in self.devices], + ) + json_dict = { + "in_feats": 
dict_best_hyper["in_feats"], + "hidden_dim": dict_best_hyper["hidden_dim"], + "num_resnet_layers": dict_best_hyper["num_resnet_layers"], + "mlp_units": dict_best_hyper["mlp_units"], + "output_dim": num_outputs, + "classes": [dev.description for dev in self.devices], + } + else: + model = GNN( + in_feats=dict_best_hyper["in_feats"], + hidden_dim=dict_best_hyper["hidden_dim"], + num_resnet_layers=dict_best_hyper["num_resnet_layers"], + mlp_units=dict_best_hyper["mlp_units"], + output_dim=num_outputs, + ) + + # create a json with the characteristics of the model + json_dict = { + "in_feats": dict_best_hyper["in_feats"], + "hidden_dim": dict_best_hyper["hidden_dim"], + "num_resnet_layers": dict_best_hyper["num_resnet_layers"], + "mlp_units": dict_best_hyper["mlp_units"], + "output_dim": num_outputs, + } + + json_path = Path(save_mdl_path).with_suffix(".json") # works whether save_mdl_path is str or Path + with json_path.open("w", encoding="utf-8") as f: + json.dump(json_dict, f, indent=4) + + # Device handling + device = torch.device("cuda" if torch.cuda.is_available() else "cpu") + model.to(device) + # Optimizer + optimizer = torch.optim.Adam(model.parameters(), lr=1e-3) + x_train, x_val, _y_train, _y_val = train_test_split( + training_data.X_train, training_data.y_train, test_size=0.2, random_state=5 + ) + # Dataloader + train_loader = DataLoader(x_train, batch_size=32, shuffle=True) + + val_loader = DataLoader(x_val, batch_size=32, shuffle=False) + if task == "regression": + train_regression_model( + model, + train_loader, + optimizer, + loss_fn, + num_epochs=num_epochs, + device=device, + verbose=verbose, + val_loader=val_loader, + patience=10, + min_delta=0.0, + restore_best=True, + scheduler=None, + ) + else: + train_classification_model( + model, + train_loader, + optimizer, + loss_fn, + num_epochs=num_epochs, + task=task, + device=device, + verbose=verbose, + val_loader=val_loader, + patience=10, + min_delta=0.0, + restore_best=True, + scheduler=None, + ) + if 
verbose: + test_loader = DataLoader(training_data.X_test, batch_size=32, shuffle=False) + avg_loss_test, dict_results, _ = evaluate_classification_model( + model, test_loader, loss_fn=loss_fn, device=device, verbose=verbose, task=task + ) + print(f"Test loss: {avg_loss_test:.4f}, {dict_results}") + + # Save the model + torch.save(model.state_dict(), save_mdl_path) + return model + def train_random_forest_model( self, training_data: TrainingData | None = None ) -> RandomForestRegressor | RandomForestClassifier: @@ -420,23 +785,29 @@ def _get_prepared_training_data(self) -> TrainingData: """ with resources.as_file(get_path_training_data() / "training_data_aggregated") as path: prefix = f"{self.figure_of_merit}.npy" - file_data = path / f"training_data_{prefix}" file_names = path / f"names_list_{prefix}" file_scores = path / f"scores_list_{prefix}" + file_data = ( + path / f"training_data_{prefix}" if not self.gnn else path / f"graph_dataset_{self.figure_of_merit}.pt" + ) if file_data.is_file() and file_names.is_file() and file_scores.is_file(): - training_data = np.load(file_data, allow_pickle=True) + training_data = ( + np.load(file_data, allow_pickle=True) if not self.gnn else torch.load(file_data, weights_only=False) + ) names_list = list(np.load(file_names, allow_pickle=True)) scores_list = [list(scores) for scores in np.load(file_scores, allow_pickle=True)] else: msg = "Training data not found." 
raise FileNotFoundError(msg) - - x_list, y_list = zip(*training_data, strict=False) - x = np.array(x_list, dtype=np.float64) - y = np.array(y_list, dtype=str) + if not self.gnn: + x_list, y_list = zip(*training_data, strict=False) + x = np.array(x_list, dtype=np.float64) + y = np.array(y_list, dtype=str) + else: + x = training_data + y = np.array([el.target_label for el in training_data]) indices = np.arange(len(y), dtype=np.int64) - x_train, x_test, y_train, y_test, indices_train, indices_test = train_test_split( x, y, indices, test_size=0.3, random_state=5 ) @@ -454,13 +825,14 @@ def _get_prepared_training_data(self) -> TrainingData: def predict_device_for_figure_of_merit( - qc: Path | QuantumCircuit, figure_of_merit: figure_of_merit = "expected_fidelity" + qc: Path | QuantumCircuit, figure_of_merit: figure_of_merit = "expected_fidelity", gnn: bool = False ) -> Target: """Returns the probabilities for all supported quantum devices to be the most suitable one for the given quantum circuit. Arguments: qc: The QuantumCircuit or Path to the respective qasm file. figure_of_merit: The figure of merit to be used for compilation. + gnn: Whether to use a GNN for prediction. Defaults to False. Returns: The probabilities for all supported quantum devices to be the most suitable one for the given quantum circuit. @@ -472,22 +844,49 @@ def predict_device_for_figure_of_merit( if isinstance(qc, Path) and qc.exists(): qc = QuantumCircuit.from_qasm_file(qc) assert isinstance(qc, QuantumCircuit) - - path = get_path_trained_model(figure_of_merit) + path = get_path_trained_model(figure_of_merit) if not gnn else get_path_trained_model_gnn(figure_of_merit) if not path.exists(): error_msg = "The ML model is not trained yet. Please train the model before using it." 
logger.error(error_msg) raise FileNotFoundError(error_msg) - clf = load(path) - - feature_vector = create_feature_vector(qc) - - probabilities = clf.predict_proba([feature_vector])[0] - class_labels = clf.classes_ - # sort all devices with decreasing probabilities - sorted_devices = np.array([ - label for _, label in sorted(zip(probabilities, class_labels, strict=False), reverse=True) - ]) + if not gnn: + clf = load(path) + + feature_vector = create_feature_vector(qc) + + probabilities = clf.predict_proba([feature_vector])[0] + class_labels = clf.classes_ + # sort all devices with decreasing probabilities + sorted_devices = np.array([ + label for _, label in sorted(zip(probabilities, class_labels, strict=False), reverse=True) + ]) + else: + # Open the json file save_mdl_path[:-4] + ".json" + with Path.open(path.with_suffix(".json"), encoding="utf-8") as f: + json_dict = json.load(f) + + gnn_model = GNN( + in_feats=json_dict["in_feats"], + hidden_dim=json_dict["hidden_dim"], + num_resnet_layers=json_dict["num_resnet_layers"], + mlp_units=json_dict["mlp_units"], + output_dim=json_dict["output_dim"], + classes=json_dict["classes"], + ) + gnn_model.load_state_dict(torch.load(path)) + x, edge_index, number_of_gates = create_dag(qc) + feature_vector = Data(x=x, edge_index=edge_index, num_gates=number_of_gates) + gnn_model.eval() + class_labels = gnn_model.classes + with torch.no_grad(): + probabilities = torch.softmax(gnn_model(feature_vector), dim=1) + assert class_labels is not None + if len(class_labels) != len(probabilities): + msg = "probabilities and class_labels must be same length" + raise ValueError(msg) + + pairs = sorted(zip(probabilities.tolist(), class_labels, strict=False), reverse=True) + sorted_devices = np.array([label for _, label in pairs]) for dev_name in sorted_devices: dev = get_device(dev_name) diff --git a/tests/device_selection/test_helper_ml.py b/tests/device_selection/test_helper_ml.py index daeda6825..8b57cd027 100644 --- 
a/tests/device_selection/test_helper_ml.py +++ b/tests/device_selection/test_helper_ml.py @@ -13,6 +13,7 @@ from mqt.bench import BenchmarkLevel, get_benchmark from mqt.predictor.ml.helper import ( + create_dag, create_feature_vector, get_openqasm_gates, get_path_training_circuits, @@ -28,6 +29,13 @@ def test_create_feature_vector() -> None: assert feature_vector is not None +def test_create_dag() -> None: + """Test the creation of a DAG.""" + qc = get_benchmark("dj", BenchmarkLevel.INDEP, 3).decompose() + dag = create_dag(qc) + assert dag is not None + + def test_get_openqasm_gates() -> None: """Test the retrieval of the OpenQASM gates.""" assert get_openqasm_gates() is not None diff --git a/tests/device_selection/test_predictor_ml.py b/tests/device_selection/test_predictor_ml.py index 0b2f1485f..7498307ce 100644 --- a/tests/device_selection/test_predictor_ml.py +++ b/tests/device_selection/test_predictor_ml.py @@ -35,7 +35,10 @@ def path_compiled_circuits() -> Path: return Path("./test_compiled_circuits") -def test_setup_device_predictor_with_prediction(path_uncompiled_circuits: Path, path_compiled_circuits: Path) -> None: +@pytest.mark.parametrize("gnn", [False, True], ids=["rf", "gnn"]) +def test_setup_device_predictor_with_prediction( + path_uncompiled_circuits: Path, path_compiled_circuits: Path, gnn: bool +) -> None: """Test the full training pipeline and prediction using a mock device.""" if not path_uncompiled_circuits.exists(): path_uncompiled_circuits.mkdir() @@ -49,19 +52,22 @@ def test_setup_device_predictor_with_prediction(path_uncompiled_circuits: Path, dump(qc, f) device = get_device("ibm_falcon_127") - success = setup_device_predictor( devices=[device], figure_of_merit="expected_fidelity", path_uncompiled_circuits=path_uncompiled_circuits, path_compiled_circuits=path_compiled_circuits, + gnn=gnn, ) assert success data_path = get_path_training_data() / "training_data_aggregated" - assert (data_path / "training_data_expected_fidelity.npy").exists() - 
assert (data_path / "names_list_expected_fidelity.npy").exists() - assert (data_path / "scores_list_expected_fidelity.npy").exists() + if gnn: + assert (data_path / "graph_dataset_expected_fidelity.pt").exists() + else: + assert (data_path / "training_data_expected_fidelity.npy").exists() + assert (data_path / "names_list_expected_fidelity.npy").exists() + assert (data_path / "scores_list_expected_fidelity.npy").exists() test_qc = get_benchmark("ghz", BenchmarkLevel.ALG, 3) predicted = predict_device_for_figure_of_merit(test_qc, figure_of_merit="expected_fidelity") @@ -86,7 +92,7 @@ def test_remove_files(path_uncompiled_circuits: Path, path_compiled_circuits: Pa data_path = get_path_training_data() / "training_data_aggregated" if data_path.exists(): for file in data_path.iterdir(): - if file.suffix == ".npy": + if file.suffix == ".npy" or file.suffix == ".pt": file.unlink() @@ -100,8 +106,9 @@ def test_predict_device_for_figure_of_merit_no_suitable_device() -> None: predict_device_for_figure_of_merit(qc) -def test_get_prepared_training_data_false_input() -> None: +@pytest.mark.parametrize("gnn", [False, True], ids=["rf", "gnn"]) +def test_get_prepared_training_data_false_input(gnn: bool) -> None: """Test the retrieval of prepared training data.""" - pred = Predictor(devices=[], figure_of_merit="expected_fidelity") + pred = Predictor(devices=[], figure_of_merit="expected_fidelity", gnn=gnn) with pytest.raises(FileNotFoundError, match=re.escape("Training data not found.")): pred._get_prepared_training_data() # noqa: SLF001 diff --git a/tests/hellinger_distance/test_estimated_hellinger_distance.py b/tests/hellinger_distance/test_estimated_hellinger_distance.py index d13151d51..6743efbd8 100644 --- a/tests/hellinger_distance/test_estimated_hellinger_distance.py +++ b/tests/hellinger_distance/test_estimated_hellinger_distance.py @@ -182,7 +182,10 @@ def test_train_random_forest_regressor_and_predict(device: Target) -> None: assert 
np.isclose(trained_model.predict([feature_vector]), distance_label) -def test_train_and_qcompile_with_hellinger_model(source_path: Path, target_path: Path, device: Target) -> None: +@pytest.mark.parametrize("gnn", [False, True], ids=["rf", "gnn"]) +def test_train_and_qcompile_with_hellinger_model( + source_path: Path, target_path: Path, device: Target, gnn: bool +) -> None: """Test the entire predictor toolchain with the Hellinger distance model that was trained in the previous test.""" figure_of_merit = "estimated_hellinger_distance" @@ -202,7 +205,7 @@ def test_train_and_qcompile_with_hellinger_model(source_path: Path, target_path: ) # 2. Setup and train the machine learning model for device selection - ml_predictor = ml_Predictor(devices=[device], figure_of_merit=figure_of_merit) + ml_predictor = ml_Predictor(devices=[device], figure_of_merit=figure_of_merit, gnn=gnn) # Prepare uncompiled circuits if not source_path.exists(): @@ -220,7 +223,10 @@ def test_train_and_qcompile_with_hellinger_model(source_path: Path, target_path: if sys.platform == "win32": with pytest.warns(RuntimeWarning, match=re.escape("Timeout is not supported on Windows.")): ml_predictor.compile_training_circuits( - timeout=600, path_compiled_circuits=target_path, path_uncompiled_circuits=source_path, num_workers=1 + timeout=600, + path_compiled_circuits=target_path, + path_uncompiled_circuits=source_path, + num_workers=1, ) else: ml_predictor.compile_training_circuits( @@ -231,17 +237,21 @@ def test_train_and_qcompile_with_hellinger_model(source_path: Path, target_path: ml_predictor.generate_training_data( path_uncompiled_circuits=source_path, path_compiled_circuits=target_path, num_workers=1 ) - - for file in [ - "training_data_estimated_hellinger_distance.npy", - "names_list_estimated_hellinger_distance.npy", - "scores_list_estimated_hellinger_distance.npy", - ]: - path = get_path_training_data() / "training_data_aggregated" / file - assert path.exists() + if gnn: + assert ( + 
get_path_training_data() / "training_data_aggregated" / "graph_dataset_estimated_hellinger_distance.pt" + ).exists() + else: + for file in [ + "training_data_estimated_hellinger_distance.npy", + "names_list_estimated_hellinger_distance.npy", + "scores_list_estimated_hellinger_distance.npy", + ]: + path = get_path_training_data() / "training_data_aggregated" / file + assert path.exists() # Train the ML model - ml_predictor.train_random_forest_model() + ml_predictor.train_gnn_model() if gnn else ml_predictor.train_random_forest_model() qc = get_benchmark("ghz", BenchmarkLevel.ALG, 3) # Test the prediction @@ -269,10 +279,16 @@ def test_remove_files(source_path: Path, target_path: Path) -> None: if file.suffix == ".npy": file.unlink() + data_path = get_path_training_data() / "training_data_aggregated" + if data_path.exists(): + for file in data_path.iterdir(): + if file.suffix == ".pt": + file.unlink() + model_path = get_path_training_data() / "trained_model" if model_path.exists(): for file in model_path.iterdir(): - if file.suffix == ".joblib": + if file.suffix == ".joblib" or file.suffix == ".pth" or file.suffix == ".json": file.unlink()