diff --git a/desdeo/api/models/request_models.py b/desdeo/api/models/request_models.py new file mode 100644 index 000000000..84ee74da1 --- /dev/null +++ b/desdeo/api/models/request_models.py @@ -0,0 +1,12 @@ +"""Requests models.""" +from pydantic import BaseModel + + +class RepresentativeSolutionSetRequest(BaseModel): + """Model of the request to the representative solution set.""" + problem_id: int + name: str + description: str | None = None + solution_data: dict[str, list[float]] + ideal: dict[str, float] + nadir: dict[str, float] diff --git a/desdeo/api/routers/problem.py b/desdeo/api/routers/problem.py index 42772d667..438d575eb 100644 --- a/desdeo/api/routers/problem.py +++ b/desdeo/api/routers/problem.py @@ -5,7 +5,7 @@ from fastapi import APIRouter, Depends, HTTPException, Request, UploadFile, status from fastapi.responses import JSONResponse -from sqlmodel import select +from sqlmodel import Session, select from desdeo.api.models import ( ForestProblemMetaData, @@ -21,6 +21,7 @@ User, UserRole, ) +from desdeo.api.models.request_models import RepresentativeSolutionSetRequest from desdeo.api.routers.user_authentication import get_current_user from desdeo.problem import Problem from desdeo.tools.utils import available_solvers @@ -318,3 +319,162 @@ def select_solver( db_session.refresh(problem_metadata) return JSONResponse(content={"message": "OK"}, status_code=status.HTTP_200_OK) + +@router.post("/add_representative_solution_set") +def add_representative_solution_set( + payload: RepresentativeSolutionSetRequest, + context: Annotated[SessionContext, Depends(get_session_context_without_request)], +): + """Add a new representative solution set as metadata to a problem. + + Args: + payload (RepresentativeSolutionSetRequest): The JSON body containing the + details of the representative solution set (name, description, solution data, ideal, nadir). + context (SessionContext): The session context providing the current user and database session. + + Raises: + HTTPException: If problem not found or unauthorized user. + + Returns: + dict: Confirmation message. + """ + user = context.user + db_session: Session = context.db_session + + # Fetch the problem + problem_db = db_session.get(ProblemDB, payload.problem_id) + if problem_db is None: + raise HTTPException(status_code=404, detail=f"Problem with ID {payload.problem_id} not found.") + + # Check the user + if user.id != problem_db.user_id: + raise HTTPException(status_code=401, detail="Unauthorized user.") + + # Ensure metadata object exists + if problem_db.problem_metadata is None: + problem_metadata = ProblemMetaDataDB(problem_id=problem_db.id, problem=problem_db) + db_session.add(problem_metadata) + db_session.commit() + db_session.refresh(problem_metadata) + else: + problem_metadata = problem_db.problem_metadata + + # Add new representative solution set + repr_metadata = RepresentativeNonDominatedSolutions( + metadata_id=problem_metadata.id, + name=payload.name, + description=payload.description, + solution_data=payload.solution_data, + ideal=payload.ideal, + nadir=payload.nadir, + metadata_instance=problem_metadata, + ) + + db_session.add(repr_metadata) + db_session.commit() + db_session.refresh(repr_metadata) + + # Attach to problem metadata + problem_metadata.representative_nd_metadata.append(repr_metadata) + db_session.add(problem_metadata) + db_session.commit() + db_session.refresh(problem_metadata) + + return {"message": "Representative solution set added successfully."} + +@router.get("/all_representative_solution_sets/{problem_id}") +def get_all_representative_solution_sets( + problem_id: int, + context: Annotated[SessionContext, Depends(get_session_context_without_request)], +): + """Get meta information about all representative solution sets for a given problem. + + Returns only name, description, ideal, and nadir for each set. + """ + db_session: Session = context.db_session + user = context.user + + # Fetch problem + problem_db = db_session.get(ProblemDB, problem_id) + if not problem_db: + raise HTTPException(status_code=404, detail=f"Problem with ID {problem_id} not found.") + + # Check the user + if problem_db.user_id != user.id: + raise HTTPException(status_code=401, detail="Unauthorized user.") + + # Fetch metadata + problem_metadata = problem_db.problem_metadata + if not problem_metadata or not problem_metadata.representative_nd_metadata: + return { + "problem_id": problem_id, + "representative_sets": [] + } + + # Build response + sets_meta = [ + { + "name": rep.name, + "description": rep.description, + "ideal": rep.ideal, + "nadir": rep.nadir + } + for rep in problem_metadata.representative_nd_metadata + ] + + return { + "problem_id": problem_id, + "representative_sets": sets_meta + } + +@router.get("/representative_solution_set/{set_id}") +def get_representative_solution_set( + set_id: int, + context: Annotated[SessionContext, Depends(get_session_context_without_request)], +): + """Fetch full information of a single representative solution set by its ID.""" + db_session: Session = context.db_session + + # Fetch the representative set + repr_set = db_session.get(RepresentativeNonDominatedSolutions, set_id) + if repr_set is None: + raise HTTPException(status_code=404, detail=f"Representative set with ID {set_id} not found.") + + # Check the user + if repr_set.metadata_instance.problem.user_id != context.user.id: + raise HTTPException(status_code=401, detail="Unauthorized user.") + + # Return all fields as a dict + return { + "id": repr_set.id, + "name": repr_set.name, + "description": repr_set.description, + "solution_data": repr_set.solution_data, + "ideal": repr_set.ideal, + "nadir": repr_set.nadir, + } + +@router.delete("/representative_solution_set/{set_id}") +def delete_representative_solution_set( + set_id: int, + context: Annotated[SessionContext, Depends(get_session_context_without_request)], +): + """Delete a representative solution set by its ID.""" + db_session: Session = context.db_session + user = context.user + + # Fetch the set + repr_metadata = db_session.get(RepresentativeNonDominatedSolutions, set_id) + if repr_metadata is None: + raise HTTPException(status_code=404, detail=f"Representative solution set with ID {set_id} not found.") + + # Ensure the user owns the problem this set belongs to + problem_metadata = repr_metadata.metadata_instance + if problem_metadata.problem.user_id != user.id: + raise HTTPException(status_code=401, detail="Unauthorized user.") + + # Delete the set + db_session.delete(repr_metadata) + db_session.commit() + + return {"detail": "Deleted successfully"} diff --git a/desdeo/api/tests/test_problem_metadata.py b/desdeo/api/tests/test_problem_metadata.py new file mode 100644 index 000000000..d11b4ec79 --- /dev/null +++ b/desdeo/api/tests/test_problem_metadata.py @@ -0,0 +1,217 @@ +from fastapi.testclient import TestClient # noqa: D100 +from sqlmodel import select + +from desdeo.api.models import ProblemDB, ProblemMetaDataDB, RepresentativeNonDominatedSolutions +from desdeo.problem.testproblems import dtlz2 + +from .conftest import login + + +def test_add_representative_solution_set(client: TestClient, session_and_user: dict): + """Test that the representative solution set can be added via the endpoint.""" + session = session_and_user["session"] + user = session_and_user["user"] + + access_token = login(client) + + # Create a test problem + problem = ProblemDB.from_problem(dtlz2(5, 3), user=user) + session.add(problem) + session.commit() + session.refresh(problem) + + # Prepare solution set JSON + solution_set_payload = { + "problem_id": problem.id, + "name": "Test solutions", + "description": "Solutions for testing", + "solution_data": { + "x_1": [1.1, 2.2, 3.3], + "x_2": [-1.1, -2.2, -3.3], + "f_1": [0.1, 0.5, 0.9], + "f_2": [-0.1, 0.2, 199.2], + "f_1_min": [], + "f_2_min": [], + }, + "ideal": {"f_1": 0.1, "f_2": -0.1}, + "nadir": {"f_1": 0.9, "f_2": 199.2}, + } + + response = client.post( + "/problem/add_representative_solution_set", + headers={"Authorization": f"Bearer {access_token}"}, + json=solution_set_payload, + ) + + assert response.status_code == 200 + assert response.json()["message"] == "Representative solution set added successfully." + + # Verify DB + statement = select(ProblemMetaDataDB).where(ProblemMetaDataDB.problem_id == problem.id) + metadata = session.exec(statement).first() + assert metadata is not None + assert metadata.problem_id == problem.id + + repr_metadata = metadata.representative_nd_metadata[0] + assert isinstance(repr_metadata, RepresentativeNonDominatedSolutions) + assert repr_metadata.name == solution_set_payload["name"] + assert repr_metadata.description == solution_set_payload["description"] + assert repr_metadata.solution_data == solution_set_payload["solution_data"] + assert repr_metadata.ideal == solution_set_payload["ideal"] + assert repr_metadata.nadir == solution_set_payload["nadir"] + +def test_get_all_representative_solution_sets(client: TestClient, session_and_user: dict): + """Test that all representative solution sets for a problem can be fetched (meta-level).""" + session = session_and_user["session"] + user = session_and_user["user"] + + access_token = login(client) + + # Create a test problem + problem = ProblemDB.from_problem(dtlz2(5, 3), user=user) + session.add(problem) + session.commit() + session.refresh(problem) + + # Add a representative solution set + solution_set = RepresentativeNonDominatedSolutions( + metadata_id=None, + name="Test Set GET", + description="Description GET", + solution_data={"x": [1.0, 2.0], "f": [0.1, 0.2]}, + ideal={"f_1": 0.1}, + nadir={"f_1": 0.2}, + metadata_instance=None + ) + + # Attach problem metadata + problem_metadata = ProblemMetaDataDB(problem_id=problem.id, problem=problem) + session.add(problem_metadata) + session.commit() + session.refresh(problem_metadata) + + # Attach the representative set + solution_set.metadata_id = problem_metadata.id + solution_set.metadata_instance = problem_metadata + session.add(solution_set) + problem_metadata.representative_nd_metadata.append(solution_set) + session.add_all([solution_set, problem_metadata]) + session.commit() + session.refresh(problem_metadata) + + # Call GET endpoint + response = client.get( + f"/problem/all_representative_solution_sets/{problem.id}", + headers={"Authorization": f"Bearer {access_token}"} + ) + + assert response.status_code == 200 + + data = response.json() + assert data["problem_id"] == problem.id + assert len(data["representative_sets"]) == 1 + + repr_meta = data["representative_sets"][0] + assert repr_meta["name"] == "Test Set GET" + assert repr_meta["description"] == "Description GET" + assert repr_meta["ideal"] == {"f_1": 0.1} + assert repr_meta["nadir"] == {"f_1": 0.2} + +def test_get_representative_solution_set(client: TestClient, session_and_user: dict): + """Test that a single representative solution set can be fetched by its ID.""" + session = session_and_user["session"] + user = session_and_user["user"] + + access_token = login(client) + + # Create a test problem + problem = ProblemDB.from_problem(dtlz2(5, 3), user=user) + session.add(problem) + session.commit() + session.refresh(problem) + + # Add a representative solution set + solution_set_payload = { + "problem_id": problem.id, + "name": "Full Test Solution Set", + "description": "Full info for testing", + "solution_data": { + "x_1": [1.1, 2.2, 3.3], + "x_2": [-1.1, -2.2, -3.3], + "f_1": [0.1, 0.5, 0.9], + "f_2": [-0.1, 0.2, 199.2], + "f_1_min": [], + "f_2_min": [], + }, + "ideal": {"f_1": 0.1, "f_2": -0.1}, + "nadir": {"f_1": 0.9, "f_2": 199.2}, + } + + post_response = client.post( + "/problem/add_representative_solution_set", + headers={"Authorization": f"Bearer {access_token}"}, + json=solution_set_payload, + ) + assert post_response.status_code == 200 + + # Fetch the added representative set from DB + repr_metadata = session.exec( + select(RepresentativeNonDominatedSolutions) + .where(RepresentativeNonDominatedSolutions.name == "Full Test Solution Set") + ).first() + assert repr_metadata is not None + + # Call the GET endpoint + get_response = client.get( + f"/problem/representative_solution_set/{repr_metadata.id}", + headers={"Authorization": f"Bearer {access_token}"}, + ) + assert get_response.status_code == 200 + + # Verify returned data + data = get_response.json() + assert data["id"] == repr_metadata.id + assert data["name"] == solution_set_payload["name"] + assert data["description"] == solution_set_payload["description"] + assert data["solution_data"] == solution_set_payload["solution_data"] + assert data["ideal"] == solution_set_payload["ideal"] + assert data["nadir"] == solution_set_payload["nadir"] + +def test_delete_representative_solution_set(client: TestClient, session_and_user: dict): + """Test that a representative solution set can be deleted by its ID.""" + session = session_and_user["session"] + user = session_and_user["user"] + + access_token = login(client) + + # Create test problem + problem = ProblemDB.from_problem(dtlz2(5, 3), user=user) + session.add(problem) + session.commit() + session.refresh(problem) + + # Add a representative solution set + repr_metadata = RepresentativeNonDominatedSolutions( + metadata_id=ProblemMetaDataDB(problem_id=problem.id, problem=problem).id, + metadata_instance=ProblemMetaDataDB(problem_id=problem.id, problem=problem), + name="To be deleted", + description="Test deletion", + solution_data={"x": [1.0], "f": [0.0]}, + ideal={"f": 0.0}, + nadir={"f": 1.0}, + ) + session.add(repr_metadata) + session.commit() + session.refresh(repr_metadata) + + # Call DELETE request + response = client.delete( + f"/problem/representative_solution_set/{repr_metadata.id}", + headers={"Authorization": f"Bearer {access_token}"}, + ) + assert response.status_code == 200 + assert response.json()["detail"] == "Deleted successfully" + + # Verify DB deletion + deleted_set = session.get(RepresentativeNonDominatedSolutions, repr_metadata.id) + assert deleted_set is None