diff --git a/API/__init__.py b/API/__init__.py index a62df41..c4a6e9b 100644 --- a/API/__init__.py +++ b/API/__init__.py @@ -1,20 +1,35 @@ from __future__ import annotations +import logging + import uvicorn -from fastapi import FastAPI +from fastapi import FastAPI, HTTPException from API import route +# Initialize logger +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + Fastapp = FastAPI() + # API Router Fastapp.include_router(route.router) @Fastapp.get("/") def read_root(): - return {"Hello": "FASTAPI"} + try: + return {"Hello": "FASTAPI"} + except Exception as e: + logger.error(f"Error in root endpoint: {e}") + raise HTTPException(status_code=500, detail="Internal Server Error") -# function to run server of FastAPI +# Function to run FastAPI server def run_fastapi_app(): - uvicorn.run(Fastapp, host="127.0.0.1", port=8000) + try: + uvicorn.run(Fastapp, host="127.0.0.1", port=8000) + except Exception as e: + logger.error(f"Error starting FastAPI server: {e}") + raise diff --git a/API/database.py b/API/database.py index eb3031d..a99822f 100644 --- a/API/database.py +++ b/API/database.py @@ -1,8 +1,13 @@ from __future__ import annotations +import logging from datetime import datetime -from pymongo import MongoClient +from pymongo import MongoClient, errors + +# Initialize logger +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) class Database: @@ -14,8 +19,13 @@ def __init__(self, uri="mongodb://localhost:27017/", db_name="ImageDB"): uri (str): The uri of the MongoDB server. Defaults to 'mongodb://localhost:27017/'. db_name (str): The name of the MongoDB database. Defaults to 'ImageDB'. """ - self.client = MongoClient(uri) - self.db = self.client[db_name] + try: + self.client = MongoClient(uri) + self.db = self.client[db_name] + logger.info("Successfully connected to MongoDB.") + except errors.ConnectionError as e: + logger.error(f"Failed to connect to MongoDB: {e}") + raise def find(self, collection, query=None): """ @@ -28,7 +38,11 @@ def find(self, collection, query=None): Returns: pymongo.cursor.Cursor: A cursor pointing to the results of the query. """ - return self.db[collection].find(query) + try: + return self.db[collection].find(query) + except errors.PyMongoError as e: + logger.error(f"Error finding documents in {collection}: {e}") + return None def insert_one(self, collection, document): """ @@ -41,8 +55,11 @@ def insert_one(self, collection, document): Returns: pymongo.results.InsertOneResult: The result of the insertion. """ - - return self.db[collection].insert_one(document) + try: + return self.db[collection].insert_one(document) + except errors.PyMongoError as e: + logger.error(f"Error inserting document into {collection}: {e}") + return None def find_one(self, collection, filter, projection=None): """ @@ -56,7 +73,11 @@ def find_one(self, collection, filter, projection=None): Returns: dict: The document that matches the query, or None if no documents match. """ - return self.db[collection].find_one(filter=filter, projection=projection) + try: + return self.db[collection].find_one(filter=filter, projection=projection) + except errors.PyMongoError as e: + logger.error(f"Error finding document in {collection}: {e}") + return None def find_one_and_delete(self, collection, query): """ @@ -69,7 +90,11 @@ def find_one_and_delete(self, collection, query): Returns: dict: The document that matches the query, or None if no documents match. """ - return self.db[collection].find_one_and_delete(query) + try: + return self.db[collection].find_one_and_delete(query) + except errors.PyMongoError as e: + logger.error(f"Error deleting document in {collection}: {e}") + return None def update_one(self, collection, query, update): """ @@ -83,10 +108,12 @@ def update_one(self, collection, query, update): Returns: pymongo.results.UpdateResult: The result of the update. """ + try: + return self.db[collection].update_one(query, update) + except errors.PyMongoError as e: + logger.error(f"Error updating document in {collection}: {e}") + return None - return self.db[collection].update_one(query, update) - - # add a function for pipeline aggregation vector search def vector_search(self, collection, embedding): """ Perform a vector similarity search on the given collection. @@ -98,27 +125,31 @@ def vector_search(self, collection, embedding): Returns: list: A list of documents with the closest embedding to the query vector, sorted by score. """ - - result = self.db[collection].aggregate( - [ - { - "$vectorSearch": { - "index": "vector_index", - "path": "embedding", - "queryVector": embedding, - "numCandidates": 20, - "limit": 20, + try: + result = self.db[collection].aggregate( + [ + { + "$vectorSearch": { + "index": "vector_index", + "path": "embedding", + "queryVector": embedding, + "numCandidates": 20, + "limit": 20, + }, }, - }, - { - "$project": { - "_id": 0, - "Name": 1, - "Image": 1, - "score": {"$meta": "vectorSearchScore"}, + { + "$project": { + "_id": 0, + "Name": 1, + "Image": 1, + "score": {"$meta": "vectorSearchScore"}, + }, }, - }, - ], - ) - result_arr = [i for i in result] - return result_arr + ], + ) + result_arr = [i for i in result] + return result_arr + except errors.PyMongoError as e: + logger.error( + f"Error performing vector search in {collection}: {e}") + return [] diff --git a/API/utils.py b/API/utils.py index f426de7..ad58473 100644 --- a/API/utils.py +++ b/API/utils.py @@ -68,15 +68,21 @@ def format(self, record): formatter = logging.Formatter(log_fmt) return formatter.format(record) - logger = logging.getLogger() - logger.setLevel(logging.DEBUG) - - stderr_handler = logging.StreamHandler() - stderr_handler.setLevel(logging.DEBUG) - stderr_handler.setFormatter(CustomFormatter()) - logger.addHandler(stderr_handler) - - file_handler = logging.FileHandler("app.log", mode="w") - file_handler.setLevel(logging.DEBUG) - file_handler.setFormatter(CustomFormatter(True)) - logger.addHandler(file_handler) + try: + logger = logging.getLogger() + logger.setLevel(logging.DEBUG) + + stderr_handler = logging.StreamHandler() + stderr_handler.setLevel(logging.DEBUG) + stderr_handler.setFormatter(CustomFormatter()) + logger.addHandler(stderr_handler) + + file_handler = logging.FileHandler("app.log", mode="w") + file_handler.setLevel(logging.DEBUG) + file_handler.setFormatter(CustomFormatter(True)) + logger.addHandler(file_handler) + + logging.info("Logging configuration initialized successfully.") + except Exception as e: + print(f"Error initializing logging configuration: {e}") + logging.error(f"Error initializing logging configuration: {e}") diff --git a/FaceRec/app/main/Edit.py b/FaceRec/app/main/Edit.py index e5956de..8764009 100644 --- a/FaceRec/app/main/Edit.py +++ b/FaceRec/app/main/Edit.py @@ -23,8 +23,13 @@ cap = cv2.VideoCapture(0) +# Check if the camera opened successfully +if not cap.isOpened(): + print("Error: Could not open video capture.") + # You can raise an exception or handle it as needed -# function for displaying live video + +# Function for displaying live video def display_live_video(): """ Generator for displaying live video from the camera. @@ -34,11 +39,12 @@ def display_live_video(): while True: success, frame = cap.read() # Read a frame from the camera if not success: + print("Error: Failed to capture image.") break frame = cv2.flip(frame, 1) ret, buffer = cv2.imencode(".jpg", frame) - frame = buffer.tobytes if not ret: + print("Error: Failed to encode image.") break yield ( b"--frame\r\n" @@ -83,16 +89,27 @@ def capture(): global gender global Dept global encoded_image + EmployeeCode = request.form.get("EmployeeCode", "") Name = request.form.get("Name", "") gender = request.form.get("gender", "") Dept = request.form.get("Department", "") - ret, frame = cap.read(True) - frame = cv2.flip(frame, 1) - _, buffer = cv2.imencode(".jpg", frame) - encoded_image = base64.b64encode(buffer).decode("utf-8") - with open(Config.image_data_file, "w") as file: - json.dump({"base64_image": encoded_image}, file) + + try: + ret, frame = cap.read(True) + if not ret: + print("Error: Could not read frame from camera.") + return redirect("Image") # or handle error appropriately + + frame = cv2.flip(frame, 1) + _, buffer = cv2.imencode(".jpg", frame) + encoded_image = base64.b64encode(buffer).decode("utf-8") + + with open(Config.image_data_file, "w") as file: + json.dump({"base64_image": encoded_image}, file) + except Exception as e: + print(f"Error while capturing image: {e}") + return redirect("Image") @@ -114,35 +131,49 @@ def display_image(): Returns: A rendered template with the image path. """ - if os.path.exists(Config.image_data_file): - with open(Config.image_data_file) as file: - image_data = json.load(file) - encoded_image = image_data.get("base64_image", "") - decoded_image_data = base64.b64decode(encoded_image) - image = Image.open(io.BytesIO(decoded_image_data)) - filename = "final.png" - image.save( - os.path.join( - Config.upload_image_path[0], - filename, - ), - quality=100, - ) - image = sorted( - os.listdir(Config.upload_image_path[0]), - key=lambda x: os.path.getatime( - os.path.join(Config.upload_image_path[0], x), - ), - reverse=True, - ) - if image: - recent_image = image[0] - image_path = os.path.join(Config.upload_image_path[0], recent_image) - else: - recent_image = None - image_path = os.path.join(Config.upload_image_path[0], recent_image) - print("done") - return render_template("index.html", image_path=image_path) + try: + if os.path.exists(Config.image_data_file): + with open(Config.image_data_file) as file: + image_data = json.load(file) + + encoded_image = image_data.get("base64_image", "") + decoded_image_data = base64.b64decode(encoded_image) + image = Image.open(io.BytesIO(decoded_image_data)) + filename = "final.png" + image.save( + os.path.join( + Config.upload_image_path[0], + filename, + ), + quality=100, + ) + + image = sorted( + os.listdir(Config.upload_image_path[0]), + key=lambda x: os.path.getatime( + os.path.join(Config.upload_image_path[0], x), + ), + reverse=True, + ) + + if image: + recent_image = image[0] + image_path = os.path.join( + Config.upload_image_path[0], recent_image) + else: + recent_image = None + image_path = "" + else: + print(f"Error: {Config.image_data_file} does not exist.") + recent_image = None + image_path = "" + + return render_template("index.html", image_path=image_path) + except Exception as e: + print(f"Error while displaying image: {e}") + return render_template( + "index.html", image_path="" + ) # Show a default image or handle error @Edit_blueprint.route("/edit/", methods=["POST", "GET"]) @@ -179,33 +210,44 @@ def edit(EmployeeCode): Name = request.form["Name"] gender = request.form["Gender"] Department = request.form["Department"] - with open(Config.image_data_file) as file: - image_data = json.load(file) - encoded_image = image_data.get("base64_image", "") - payload = { - "Name": Name, - "gender": gender, - "Department": Department, - "Image": encoded_image, - } - # logger.info(payload) try: - url = requests.put( - f"http://127.0.0.1:8000/update/{EmployeeCode}", - json=payload, + with open(Config.image_data_file) as file: + image_data = json.load(file) + + encoded_image = image_data.get("base64_image", "") + payload = { + "Name": Name, + "gender": gender, + "Department": Department, + "Image": encoded_image, + } + try: + url = requests.put( + f"http://127.0.0.1:8000/update/{EmployeeCode}", + json=payload, + ) + if url.status_code == 200: + return redirect("/") + else: + print( + f"Error: Failed to update employee data with status code {url.status_code}" + ) + except requests.exceptions.RequestException as e: + print(f"Request failed: {e}") + + except Exception as e: + print(f"Error while processing employee data: {e}") + + try: + response = requests.get(f"http://127.0.0.1:8000/read/{EmployeeCode}") + if response.status_code == 200: + employee_data = response.json() + return render_template("edit.html", employee_data=employee_data) + else: + print( + f"Error: Failed to retrieve employee data with status code {response.status_code}" ) - url.status_code - # logger.info(url.json()) - - return redirect("/") - - except requests.exceptions.RequestException as e: - print(f"Request failed: {e}") - response = requests.get(f"http://127.0.0.1:8000/read/{EmployeeCode}") - # logger.info(response.status_code) - # logger.info(response.json()) - if response.status_code == 200: - employee_data = response.json() - return render_template("edit.html", employee_data=employee_data) - else: - return f"Error {response.status_code}: Failed to retrieve employee data." + return f"Error {response.status_code}: Failed to retrieve employee data." + except requests.exceptions.RequestException as e: + print(f"Request failed: {e}") + return "Error: Could not retrieve employee data." diff --git a/FaceRec/app/main/Employee.py b/FaceRec/app/main/Employee.py index ead1087..8e57026 100644 --- a/FaceRec/app/main/Employee.py +++ b/FaceRec/app/main/Employee.py @@ -1,172 +1,179 @@ from __future__ import annotations import base64 -import io import json +import logging import os import cv2 import requests from flask import Blueprint, jsonify, redirect, render_template, request -from PIL import Image from FaceRec.config import Config +# Configure logging +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + video_capture = cv2.VideoCapture(0) flk_blueprint = Blueprint( - "flk_blueprint ", + "flk_blueprint", __name__, template_folder="../../templates/", static_folder="../../static/", - # capture_image="../../Capture image/" ) @flk_blueprint.route("/") -def Main_page(): +def main_page(): """ - This route is used to create a directory for storing images of employees - if it does not already exist. It then redirects to the route displaying all - records of employees. This route is used when the user first visits the - website. + This route creates a directory for storing employee images if it doesn't exist + and redirects to the route displaying all employee records. """ path = str(Config.upload_image_path[0]) - if not os.path.exists(path): - os.makedirs(path, exist_ok=True) - else: - pass + os.makedirs(path, exist_ok=True) return redirect("DisplayingEmployees") -# Displaying all records @flk_blueprint.route("/DisplayingEmployees") def display_information(): - """This route is used to retrieve all records of employees from the FastAPI - endpoint http://127.0.0.1:8000/Data/ and store them in the employees global - variable. The records are then passed to the template table.html to be - displayed in a table. If the request to the FastAPI endpoint fails, an - appropriate error message is printed to the console.""" + """Retrieve and display all employee records.""" global employees url = "http://127.0.0.1:8000/Data/" try: resp = requests.get(url=url) - # logger.info(resp.status_code) - # logger.info(resp.json()) + resp.raise_for_status() # Raise an error for bad responses employees = resp.json() - + logger.info("Employee data retrieved successfully.") except requests.exceptions.RequestException as e: - print(f"Request failed: {e}") + logger.error(f"Request failed: {e}") + employees = ( + [] + ) # Handle the error gracefully by setting employees to an empty list return render_template("table.html", employees=employees) -# To add employee record @flk_blueprint.route("/Add_employee") def add_employee(): - """This route is used to display the form for adding a new employee record. - The form is rendered from the template index.html.""" - + """Display the form for adding a new employee record.""" return render_template("index.html") -# To submit the form data to server and save it in database @flk_blueprint.route("/submit_form", methods=["POST"]) def submit_form(): """ - This route is used to handle the form submission of the new employee - record. The form data is received from the request object and then - validated. The image is base64 encoded and saved in the file defined - in the Config class. The image is then sent to the FastAPI endpoint - http://127.0.0.1:8000/create_new_faceEntry to be stored in the MongoDB - database. If the request to the FastAPI endpoint fails, an appropriate - error message is returned. If the request is successful, the user is - redirected to the route /DisplayingEmployees to view the newly added - record. + Handle the form submission for adding a new employee record. + Validate input and save image data before sending it to the FastAPI endpoint. """ - Employee_Code = request.form["EmployeeCode"] - Name = request.form["Name"] - gender = request.form["Gender"] - Department = request.form["Department"] + try: + Employee_Code = request.form["EmployeeCode"] + Name = request.form["Name"] + gender = request.form["Gender"] + Department = request.form["Department"] - if request.files["File"]: + # Validate and process image file if "File" not in request.files: return jsonify({"message": "No file part"}), 400 file = request.files["File"] allowed_extensions = {"png", "jpg", "jpeg"} - if ( - "." not in file.filename - or file.filename.split(".")[-1].lower() not in allowed_extensions - ): + if file and file.filename.split(".")[-1].lower() not in allowed_extensions: return jsonify({"message": "File extension is not valid"}), 400 - if file: - image_data = file.read() - encoded_image = base64.b64encode(image_data).decode("utf-8") - with open(Config.image_data_file, "w") as file: - json.dump({"base64_image": encoded_image}, file) - - with open(Config.image_data_file) as file: - image_data = json.load(file) - encoded_image = image_data.get("base64_image", "") - jsonify( - { + + image_data = file.read() + encoded_image = base64.b64encode(image_data).decode("utf-8") + + # Save the encoded image + with open(Config.image_data_file, "w") as img_file: + json.dump({"base64_image": encoded_image}, img_file) + + # Prepare payload for API + payload = { "EmployeeCode": Employee_Code, "Name": Name, "gender": gender, "Department": Department, - "encoded_image": encoded_image, - }, - ) - - payload = { - "EmployeeCode": Employee_Code, - "Name": Name, - "gender": gender, - "Department": Department, - "Image": encoded_image, - } - url = "http://127.0.0.1:8000/create_new_faceEntry" - payload.status_code - # try: - # resp = requests.post( - # url=url, - # json={ - # "EmployeeCode": 134, - # "Name": "Name", - # "gender": "gender", - # "Department": "Department", - # "Image": "your_image", - # }, - # ) - # resp.status_code - # except requests.exceptions.RequestException as e: - # print(f"Request failed: {e}") - jsonify({"message": "Successfully executed"}) - print("Executed.") - if payload.status_code == 200: + "Image": encoded_image, + } + + # Send request to FastAPI + response = requests.post( + "http://127.0.0.1:8000/create_new_faceEntry", json=payload + ) + response.raise_for_status() # Raise an error for bad responses + logger.info("Employee record created successfully.") + return redirect("DisplayingEmployees") - else: - return jsonify({"message": "Failed to execute"}) + except requests.exceptions.RequestException as e: + logger.error(f"Request failed: {e}") + return jsonify({"message": "Failed to execute"}), 500 -# To edit an employee details + except Exception as e: + logger.error(f"An error occurred: {e}") + return jsonify({"message": "An unexpected error occurred."}), 500 -# To delete employee details @flk_blueprint.route("/Delete/", methods=["DELETE", "GET"]) -def Delete(EmployeeCode): - """Delete an employee with the given EmployeeCode. +def delete(EmployeeCode): + """Delete an employee with the given EmployeeCode.""" + if not isinstance(EmployeeCode, int): + return jsonify({"message": "Employee code should be an integer"}), 400 - Args: - EmployeeCode: The employee code of the employee to be deleted. + try: + response = requests.delete( + f"http://127.0.0.1:8000/delete/{EmployeeCode}") + response.raise_for_status() # Raise an error for bad responses + logger.info(f"Employee {EmployeeCode} deleted successfully.") + except requests.exceptions.RequestException as e: + logger.error(f"Request failed: {e}") + return jsonify({"message": "Failed to delete employee."}), 500 - Returns: - A JSON response with a message indicating the success or failure of the deletion. + return redirect("/DisplayingEmployees") - Raises: - 400 error if the EmployeeCode is not an integer. - """ - if not isinstance(EmployeeCode, int): - return jsonify({"message": "Employee code should be an integer"}, 400) - response = requests.delete(f"http://127.0.0.1:8000/delete/{EmployeeCode}") - jsonify(response.json()) - return redirect("/DisplayingEmployees") +# Add additional routes for editing employee details if needed +@flk_blueprint.route("/Edit_employee/") +def edit_employee(EmployeeCode): + """Display the form for editing an existing employee record.""" + # Fetch employee details from the FastAPI endpoint + url = f"http://127.0.0.1:8000/Data/{EmployeeCode}" + try: + response = requests.get(url=url) + response.raise_for_status() # Raise an error for bad responses + employee = response.json() + return render_template("edit.html", employee=employee) + except requests.exceptions.RequestException as e: + logger.error(f"Request failed: {e}") + return jsonify({"message": "Failed to retrieve employee data."}), 500 + + +@flk_blueprint.route("/update_employee/", methods=["POST"]) +def update_employee(EmployeeCode): + """Update an existing employee's details.""" + try: + Name = request.form["Name"] + gender = request.form["Gender"] + Department = request.form["Department"] + + payload = { + "EmployeeCode": EmployeeCode, + "Name": Name, + "gender": gender, + "Department": Department, + } + + response = requests.put( + f"http://127.0.0.1:8000/update/{EmployeeCode}", json=payload + ) + response.raise_for_status() # Raise an error for bad responses + logger.info(f"Employee {EmployeeCode} updated successfully.") + + return redirect("DisplayingEmployees") + + except requests.exceptions.RequestException as e: + logger.error(f"Request failed: {e}") + return jsonify({"message": "Failed to update employee."}), 500 + + except Exception as e: + logger.error(f"An error occurred: {e}") + return jsonify({"message": "An unexpected error occurred."}), 500 diff --git a/FaceRec/app/main/Face.py b/FaceRec/app/main/Face.py index 25014af..9ada059 100644 --- a/FaceRec/app/main/Face.py +++ b/FaceRec/app/main/Face.py @@ -1,8 +1,8 @@ from __future__ import annotations import base64 -import io import json +import logging import os import cv2 @@ -10,16 +10,20 @@ from flask import Blueprint from flask import Response as flask_response from flask import redirect, render_template, request -from PIL import Image from FaceRec.config import Config +# Configure logging +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + Face_Rec_blueprint = Blueprint( "Face_Rec_blueprint", __name__, template_folder="../../templates/", static_folder="../../static/", ) + cap = cv2.VideoCapture(0) @@ -31,11 +35,6 @@ def display_live_video(): The images are encoded in a multipart response, with each image separated by a boundary string. - The function is designed to be used with the Flask Response object, which - handles the details of the HTTP response. The caller should use the - Response object to set the content type to "multipart/x-mixed-replace" and - then yield from this function. - The function breaks out of the loop when the camera is closed or there is an error reading from the camera. @@ -43,13 +42,14 @@ def display_live_video(): default camera. """ while True: - success, frame = cap.read(True) # Read a frame from the camera + success, frame = cap.read() # Read a frame from the camera if not success: + logger.error("Failed to read frame from camera.") break frame = cv2.flip(frame, 1) ret, buffer = cv2.imencode(".jpg", frame) - frame = buffer.tobytes if not ret: + logger.error("Failed to encode frame to JPEG.") break yield ( b"--frame\r\n" @@ -64,17 +64,6 @@ def recognize_employee(): Route for the employee recognition page. This route is used to serve the HTML page for recognizing employees. - - The page contains a live video feed from the default camera and a button - to capture a frame from the video feed and send it to the server for - recognition. - - The page uses JavaScript to display the video feed and send the request - to the server. - - The server responds with a JSON object containing the name of the - recognized employee, if any. - :return: The rendered HTML page for employee recognition. """ return render_template("recognition.html") @@ -85,13 +74,7 @@ def video_feed(): """ Route for displaying live video from the camera. - This route is used to display a live video feed from the default camera. - - The video feed is a multipart response with a sequence of JPEG images, - each separated by a boundary string. - The content type of the response is "multipart/x-mixed-replace;boundary=frame". - :return: The rendered HTML page for displaying live video from the camera. """ return flask_response( @@ -105,20 +88,32 @@ def recognize(): """ Route for recognizing employees from the captured image. - This route is used to recognize employees from the captured image. - - The route is a POST request with the following form data: - - image: The captured image of the employee. - The server responds with a JSON object containing the name of the recognized employee, if any. :return: The rendered HTML page for employee recognition with the response from the server. """ - files = {"image": (open(f"captured_image.jpg", "rb"), "image/jpeg")} - fastapi_url = ( - "http://127.0.0.1:8000/recognize_face" # Replace with your FastAPI URL - ) - response = requests.post(fastapi_url, files=files) - return render_template("recognition.html", response_text=response.text) + try: + # Capture the image file for recognition + with open("captured_image.jpg", "rb") as img_file: + files = {"image": (img_file, "image/jpeg")} + fastapi_url = ( + "http://127.0.0.1:8000/recognize_face" # Replace with your FastAPI URL + ) + response = requests.post(fastapi_url, files=files) + response.raise_for_status() # Raise an error for bad responses + logger.info("Recognition request successful.") + return render_template("recognition.html", response_text=response.text) + + except requests.exceptions.RequestException as e: + logger.error(f"Recognition request failed: {e}") + return render_template( + "recognition.html", response_text="Recognition failed. Please try again." + ) + + except Exception as e: + logger.error(f"An unexpected error occurred: {e}") + return render_template( + "recognition.html", response_text="An unexpected error occurred." + ) diff --git a/FaceRec/app/main/VideoImage.py b/FaceRec/app/main/VideoImage.py index 9452ee4..99150e2 100644 --- a/FaceRec/app/main/VideoImage.py +++ b/FaceRec/app/main/VideoImage.py @@ -3,6 +3,7 @@ import base64 import io import json +import logging import os import cv2 @@ -14,6 +15,10 @@ from FaceRec.config import Config +# Configure logging +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + employee_blueprint = Blueprint( "employee_blueprint", __name__, @@ -24,7 +29,7 @@ cap = cv2.VideoCapture(0) -# function for displaying live video +# Function for displaying live video def display_live_video(): """ Generator for displaying live video from the camera. @@ -34,11 +39,12 @@ def display_live_video(): while True: success, frame = cap.read() # Read a frame from the camera if not success: + logger.error("Failed to read frame from camera.") break frame = cv2.flip(frame, 1) ret, buffer = cv2.imencode(".jpg", frame) - frame = buffer.tobytes if not ret: + logger.error("Failed to encode frame to JPEG.") break yield ( b"--frame\r\n" @@ -54,13 +60,7 @@ def video_feed(): Route for displaying live video from the camera. Returns a multipart response with a JPEG image for each frame from the camera. - - The `mimetype` parameter is set to `'multipart/x-mixed-replace;boundary=frame'` to - indicate that the response body contains multiple images, separated by a boundary - string (`'--frame\r\n'`). The browser will display each image in sequence, creating - the illusion of a live video feed. """ - return flask_response( display_live_video(), mimetype="multipart/x-mixed-replace;boundary=frame", @@ -73,35 +73,32 @@ def capture(): """ Route for capturing an image from the video feed. - This route is used to capture a single frame from the video feed and save it to a file. - The frame is flipped horizontally before saving. - - The image is stored in a file specified by the `Config.image_data_file` variable. - - The response is a redirect to the "Image" route, which displays the captured image. - - The request is expected to be a POST request with the following form data: - - EmployeeCode: The employee code for the person in the image. - - Name: The name of the person in the image. - - gender: The gender of the person in the image. - - Department: The department of the person in the image. + The captured image is saved to a file specified by the `Config.image_data_file`. """ - global EmployeeCode - global Name - global gender - global Dept - global encoded_image EmployeeCode = request.form.get("EmployeeCode", "") Name = request.form.get("Name", "") gender = request.form.get("gender", "") Dept = request.form.get("Department", "") - ret, frame = cap.read(True) - frame = cv2.flip(frame, 1) - _, buffer = cv2.imencode(".jpg", frame) - encoded_image = base64.b64encode(buffer).decode("utf-8") - with open(Config.image_data_file, "w") as file: - json.dump({"base64_image": encoded_image}, file) - return redirect("Image") + + try: + ret, frame = cap.read() + if not ret: + logger.error("Failed to capture frame for employee image.") + return jsonify({"error": "Failed to capture image"}), 500 + + frame = cv2.flip(frame, 1) + _, buffer = cv2.imencode(".jpg", frame) + encoded_image = base64.b64encode(buffer).decode("utf-8") + + with open(Config.image_data_file, "w") as file: + json.dump({"base64_image": encoded_image}, file) + + logger.info("Image captured and saved successfully.") + return redirect("Image") + + except Exception as e: + logger.error(f"Error capturing image: {e}") + return jsonify({"error": "Error capturing image"}), 500 # Route to display captured image @@ -110,135 +107,109 @@ def display_image(): """ Route to display the captured image. - This route is used to display the most recently captured image in the template. - - The image is read from a file specified by the `Config.image_data_file` variable. - - The most recent image is displayed. - - The image path is passed to the template as a variable named `image_path`. - - Returns: - A rendered template with the image path. + The image is read from a file specified by the `Config.image_data_file`. """ + image_path = None if os.path.exists(Config.image_data_file): - with open(Config.image_data_file) as file: - image_data = json.load(file) - encoded_image = image_data.get("base64_image", "") - decoded_image_data = base64.b64decode(encoded_image) - image = Image.open(io.BytesIO(decoded_image_data)) - filename = "final.png" - image.save( - os.path.join( - Config.upload_image_path[0], - filename, - ), - quality=100, - ) - image = sorted( - os.listdir(Config.upload_image_path[0]), - key=lambda x: os.path.getatime( - os.path.join(Config.upload_image_path[0], x), - ), - reverse=True, - ) - if image: - recent_image = image[0] - image_path = os.path.join(Config.upload_image_path[0], recent_image) - else: - recent_image = None - image_path = os.path.join(Config.upload_image_path[0], recent_image) - print("done") - return render_template("index.html", image_path=image_path) - + try: + with open(Config.image_data_file) as file: + image_data = json.load(file) + encoded_image = image_data.get("base64_image", "") + decoded_image_data = base64.b64decode(encoded_image) + image = Image.open(io.BytesIO(decoded_image_data)) + filename = "final.png" + image.save(os.path.join( + Config.upload_image_path[0], filename), quality=100) + + recent_images = sorted( + os.listdir(Config.upload_image_path[0]), + key=lambda x: os.path.getatime( + os.path.join(Config.upload_image_path[0], x) + ), + reverse=True, + ) + image_path = ( + os.path.join(Config.upload_image_path[0], recent_images[0]) + if recent_images + else None + ) + logger.info("Image displayed successfully.") + except Exception as e: + logger.error(f"Error displaying image: {e}") + return jsonify({"error": "Error displaying image"}), 500 -# Below route are of Recognition + return render_template("index.html", image_path=image_path) +# Route for recognition capturing @employee_blueprint.route("/capturing", methods=["GET", "POST"]) def capturing(): """ - This route is used to capture an image from the video feed. + This route captures an image from the video feed and saves it. + """ + try: + ret, frame = cap.read() + if not ret: + logger.error("Failed to capture frame during recognition.") + return jsonify({"error": "Failed to capture image"}), 500 - When the route is accessed, a single frame is read from the video feed - and saved to a file specified by the `Config.image_data_file` variable. + frame = cv2.flip(frame, 1) + _, buffer = cv2.imencode(".jpg", frame) + encoded_image = base64.b64encode(buffer).decode("utf-8") - The frame is flipped horizontally before saving. + with open(Config.image_data_file, "w") as file: + json.dump({"base64_image": encoded_image}, file) - The response is a redirect to the "Pic" route, which displays the captured - image. + logger.info("Recognition image captured successfully.") + return redirect("Pic") - The request is expected to be a GET or POST request with no form data. - """ - ret, frame = cap.read(True) - frame = cv2.flip(frame, 1) - _, buffer = cv2.imencode(".jpg", frame) - encoded_image = base64.b64encode(buffer).decode("utf-8") - with open(Config.image_data_file, "w") as file: - json.dump({"base64_image": encoded_image}, file) - return redirect("Pic") + except Exception as e: + logger.error(f"Error capturing recognition image: {e}") + return jsonify({"error": "Error capturing recognition image"}), 500 -# Route to display captured image +# Route to display captured image for recognition @employee_blueprint.route("/Pic", methods=["GET", "POST"]) def display_pic(): - """Route to display the captured image. - - This route reads the image data from a file specified by the - `Config.image_data_file` variable and displays it in the template. - - The image is saved to a file in the directory specified by the - `Config.upload_image_path` variable. - - The most recent image is displayed. - - The image is displayed in the template with the name "image_path". - - Returns: - A rendered template with the image path. - - """ + """Route to display the captured image for recognition.""" if os.path.exists(Config.image_data_file): - with open(Config.image_data_file) as file: - image_data = json.load(file) - encoded_image = image_data.get("base64_image", "") - decoded_image_data = base64.b64decode(encoded_image) - image = Image.open(io.BytesIO(decoded_image_data)) - filename = "final.png" - image.save( - os.path.join( - Config.upload_image_path[0], - filename, - ), - quality=100, - ) - image = sorted( - os.listdir(Config.upload_image_path[0]), - key=lambda x: os.path.getatime( - os.path.join(Config.upload_image_path[0], x), - ), - reverse=True, - ) - if image: - recent_image = image[0] - image_path = os.path.join(Config.upload_image_path[0], recent_image) - else: - recent_image = None - image_path = os.path.join(Config.upload_image_path[0], recent_image) - print("done") - files = { - "Face": open( - os.path.join( - Config.upload_image_path[0], - "final.jpg", - ), - "rb", - ), - } - try: - fastapi_url = "http://127.0.0.1:8000/recognize_face" - req = requests.post(fastapi_url, files=files) - data = req.content - return data - except Exception as e: - print("Error:", e) + try: + with open(Config.image_data_file) as file: + image_data = json.load(file) + encoded_image = image_data.get("base64_image", "") + decoded_image_data = base64.b64decode(encoded_image) + image = Image.open(io.BytesIO(decoded_image_data)) + filename = "final.png" + image.save(os.path.join( + Config.upload_image_path[0], filename), quality=100) + + recent_images = sorted( + os.listdir(Config.upload_image_path[0]), + key=lambda x: os.path.getatime( + os.path.join(Config.upload_image_path[0], x) + ), + reverse=True, + ) + image_path = ( + os.path.join(Config.upload_image_path[0], recent_images[0]) + if recent_images + else None + ) + + logger.info("Displaying recognition image.") + files = { + "Face": open( + os.path.join( + Config.upload_image_path[0], "final.png"), "rb" + ), + } + fastapi_url = "http://127.0.0.1:8000/recognize_face" + req = requests.post(fastapi_url, files=files) + data = req.content + return data + + except Exception as e: + logger.error(f"Error displaying recognition image: {e}") + return jsonify({"error": "Error displaying recognition image"}), 500 + + return jsonify({"error": "No image found"}), 404 diff --git a/FaceRec/app/main/__init__.py b/FaceRec/app/main/__init__.py index 751c90e..b84d5e2 100644 --- a/FaceRec/app/main/__init__.py +++ b/FaceRec/app/main/__init__.py @@ -13,13 +13,19 @@ static_folder="../../static/", ) -# To register blueprints of flask -app.register_blueprint(flk_blueprint) -app.register_blueprint(employee_blueprint) -app.register_blueprint(Face_Rec_blueprint) -app.register_blueprint(Edit_blueprint) +# To register blueprints of Flask +try: + app.register_blueprint(flk_blueprint) + app.register_blueprint(employee_blueprint) + app.register_blueprint(Face_Rec_blueprint) + app.register_blueprint(Edit_blueprint) +except Exception as e: + print(f"Error registering blueprints: {e}") -# function to run server of Flast +# Function to run the server of Flask def run_flask(): - app.run(host="127.0.0.1", port=5000) + try: + app.run(host="127.0.0.1", port=5000) + except Exception as e: + print(f"Error running Flask server: {e}") diff --git a/FaceRec/config.py b/FaceRec/config.py index 604cca1..754fc95 100644 --- a/FaceRec/config.py +++ b/FaceRec/config.py @@ -5,11 +5,30 @@ basedir = os.path.abspath(os.path.dirname(__file__)) -# This class named Config is likely used for storing configuration settings or parameters in a Python -# program. +# This class named Config is likely used for storing configuration settings or parameters in a Python program. class Config: - SECRET_KEY = os.environ.get("SECRET_KEY") - DEBUG = (True,) - upload_image_path = (os.path.join(basedir, "static/Images/uploads"),) - ALLOWED_EXTENSIONS = (["jpg", "jpeg", "png", "jfif"],) - image_data_file = os.path.join(basedir, "static/Images/image_data.json") + try: + SECRET_KEY = os.environ.get("SECRET_KEY") + except Exception as e: + print(f"Error retrieving SECRET_KEY from environment: {e}") + SECRET_KEY = None # Default to None or handle it as needed + + DEBUG = True # Assuming DEBUG should be a boolean, not a tuple + try: + upload_image_path = os.path.join(basedir, "static/Images/uploads") + except Exception as e: + print(f"Error setting upload_image_path: {e}") + upload_image_path = None # Default to None or handle it as needed + + ALLOWED_EXTENSIONS = [ + "jpg", + "jpeg", + "png", + "jfif", + ] # Use a list, not a tuple with a list inside + try: + image_data_file = os.path.join( + basedir, "static/Images/image_data.json") + except Exception as e: + print(f"Error setting image_data_file path: {e}") + image_data_file = None # Default to None or handle it as needed diff --git a/Model-Training/eval-mark-I.py b/Model-Training/eval-mark-I.py index b31c1df..1dfc2e5 100644 --- a/Model-Training/eval-mark-I.py +++ b/Model-Training/eval-mark-I.py @@ -10,11 +10,15 @@ # Function to load and preprocess images def load_and_preprocess_image(img_path, target_size=(160, 160)): - img = image.load_img(img_path, target_size=target_size) - img_array = image.img_to_array(img) - img_array = np.expand_dims(img_array, axis=0) - img_array /= 255.0 - return img_array + try: + img = image.load_img(img_path, target_size=target_size) + img_array = image.img_to_array(img) + img_array = np.expand_dims(img_array, axis=0) + img_array /= 255.0 + return img_array + except Exception as e: + print(f"Error loading image {img_path}: {e}") + return None # Function to generate embeddings @@ -30,6 +34,8 @@ def generate_embeddings(model, dataset_path): for img_name in os.listdir(class_path): img_path = os.path.join(class_path, img_name) img_array = load_and_preprocess_image(img_path) + if img_array is None: + continue embedding = model.predict(img_array) embeddings.append(embedding[0]) labels.append(class_name) @@ -58,7 +64,7 @@ def calculate_intra_cluster_distances(embeddings, labels): return np.array(distances) -# Load the pre-trained FaceNet model (replace 'facenet_model.h5' with your model file) +# Load the pre-trained FaceNet model model_path = "facenet_model.h5" model = load_model(model_path) @@ -68,7 +74,7 @@ def calculate_intra_cluster_distances(embeddings, labels): # Generate embeddings for the original model embeddings_original, labels = generate_embeddings(model, dataset_path) -# Load the fine-tuned model (replace 'facenet_model_finetuned.h5' with your fine-tuned model file) +# Load the fine-tuned model finetuned_model_path = "facenet_model_finetuned.h5" finetuned_model = load_model(finetuned_model_path) diff --git a/docs/conf.py b/docs/conf.py index 2010d9e..56ab43e 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -7,8 +7,11 @@ import os import sys -sys.path.insert(0, os.path.abspath("..")) - +try: + sys.path.insert(0, os.path.abspath("..")) +except Exception as e: + print(f"Error adding path to sys.path: {e}") + raise # Re-raise the exception to notify the user # -- Project information ----------------------------------------------------- # https://www.sphinx-doc.org/en/master/usage/configuration.html#project-information @@ -26,15 +29,18 @@ "sphinx.ext.napoleon", "myst_parser", # Add this line ] + # Include markdown support myst_enable_extensions = [ "colon_fence", "html_image", ] + source_suffix = { ".rst": "restructuredtext", ".md": "markdown", # Add this line to recognize markdown files } + # Add any paths that contain templates here, relative to this directory. templates_path = ["_templates"] @@ -46,6 +52,14 @@ # -- Options for HTML output ------------------------------------------------- # https://www.sphinx-doc.org/en/master/usage/configuration.html#options-for-html-output - html_theme = "sphinx_rtd_theme" html_static_path = ["_static"] + +# Additional configuration that might raise exceptions can also be wrapped +try: + # Example of additional configurations that could raise exceptions + # html_theme_options = {...} + pass # Replace with actual configurations if needed +except Exception as e: + print(f"Error in HTML configuration: {e}") + raise # Re-raise the exception to notify the user diff --git a/main.py b/main.py index 9c962c1..7ccebdd 100644 --- a/main.py +++ b/main.py @@ -1,12 +1,34 @@ from __future__ import annotations +import logging from concurrent.futures import ThreadPoolExecutor from API import run_fastapi_app from FaceRec.app.main import run_flask +# Initialize logging configuration +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + + +# Function to run Flask app with exception handling +def run_flask_app(): + try: + run_flask() + except Exception as e: + logger.error(f"Error starting Flask app: {e}") + + +# Function to run FastAPI app with exception handling +def run_fastapi_app_with_handling(): + try: + run_fastapi_app() + except Exception as e: + logger.error(f"Error starting FastAPI app: {e}") + + # Multithreading used to start both FastAPI and Flask apps at the same time. if __name__ == "__main__": with ThreadPoolExecutor(max_workers=2) as executor: - executor.submit(run_flask) - executor.submit(run_fastapi_app) + executor.submit(run_flask_app) + executor.submit(run_fastapi_app_with_handling) diff --git a/testing/test_database.py b/testing/test_database.py index 2dd6ab7..e247b90 100644 --- a/testing/test_database.py +++ b/testing/test_database.py @@ -17,7 +17,6 @@ def test_vector_search(): """ Test that the vector_search function returns the correct result. - """ mock_result = [ diff --git a/testing/test_face_cycle.py b/testing/test_face_cycle.py index f5e2974..415e0c6 100644 --- a/testing/test_face_cycle.py +++ b/testing/test_face_cycle.py @@ -49,8 +49,10 @@ def test_face_lifecycle( mock_find_one.return_value = mock_doc mock_update_one.return_value = MagicMock(modified_count=1) mock_find_one_and_delete.return_value = mock_doc + with open("./test-faces/devansh.jpg", "rb") as image_file: encoded_string1 = base64.b64encode(image_file.read()).decode("utf-8") + response1 = client.post( "/create_new_faceEntry", json={ @@ -66,6 +68,7 @@ def test_face_lifecycle( with open("./test-faces/devansh.jpg", "rb") as image_file: encoded_string2 = base64.b64encode(image_file.read()).decode("utf-8") + response2 = client.post( "/create_new_faceEntry", json={ @@ -84,9 +87,6 @@ def test_face_lifecycle( assert response.status_code == 200 assert len(response.json()) == 2 - with open("./test-faces/devansh.jpg", "rb") as image_file: - encoded_string2 = base64.b64encode(image_file.read()).decode("utf-8") - # Update a face response = client.put( "/update/1", diff --git a/testing/test_face_endpoints.py b/testing/test_face_endpoints.py index c931ffc..a0f1fa5 100644 --- a/testing/test_face_endpoints.py +++ b/testing/test_face_endpoints.py @@ -10,262 +10,257 @@ from API.route import router from API.utils import init_logging_config +# Initialize logging configuration init_logging_config() client = TestClient(router) +# Exception handling utility function +def log_exception(message: str): + logging.error(message) + + @pytest.mark.run(order=1) def test_register_face1(): """ Test the creation of a face entry. - - Verifies that a POST request to /create_new_faceEntry with a valid - JSON body containing an EmployeeCode, Name, gender, Department, and - a list of one or more images creates a new face entry in the database - and returns a message indicating success. """ - mock_doc = { - "_id": "65e6284d01f95cd96ea334a7", - "EmployeeCode": "1", - "Name": "Devansh", - "gender": "Male", - "Department": "IT", - "Images": ["encoded_string1", "encoded_string2"], - } - - mock_find = MagicMock(return_value=[mock_doc, mock_doc]) - mock_insert_one = MagicMock(return_value=MagicMock(inserted_id="1")) - mock_find_one = MagicMock(return_value=mock_doc) - mock_update_one = MagicMock(return_value=MagicMock(modified_count=1)) - mock_find_one_and_delete = MagicMock(return_value=mock_doc) - - with patch("API.database.Database.find", mock_find), patch( - "API.database.Database.insert_one", - mock_insert_one, - ), patch("API.database.Database.find_one", mock_find_one), patch( - "API.database.Database.update_one", - mock_update_one, - ), patch( - "API.database.Database.find_one_and_delete", - mock_find_one_and_delete, - ): - - with open("./test-faces/devansh.jpg", "rb") as image_file: - encoded_string1 = base64.b64encode( - image_file.read(), - ).decode("utf-8") - - response1 = client.post( - "/create_new_faceEntry", - json={ - "EmployeeCode": "1", - "Name": "Devansh", - "gender": "Male", - "Department": "IT", - "Images": [encoded_string1, encoded_string1], - }, - ) - assert response1.status_code == 200 - assert response1.json() == { - "message": "Face entry created successfully", + try: + mock_doc = { + "_id": "65e6284d01f95cd96ea334a7", + "EmployeeCode": "1", + "Name": "Devansh", + "gender": "Male", + "Department": "IT", + "Images": ["encoded_string1", "encoded_string2"], } + mock_find = MagicMock(return_value=[mock_doc, mock_doc]) + mock_insert_one = MagicMock(return_value=MagicMock(inserted_id="1")) + mock_find_one = MagicMock(return_value=mock_doc) + mock_update_one = MagicMock(return_value=MagicMock(modified_count=1)) + mock_find_one_and_delete = MagicMock(return_value=mock_doc) + + with patch("API.database.Database.find", mock_find), patch( + "API.database.Database.insert_one", mock_insert_one + ), patch("API.database.Database.find_one", mock_find_one), patch( + "API.database.Database.update_one", mock_update_one + ), patch( + "API.database.Database.find_one_and_delete", mock_find_one_and_delete + ): + with open("./test-faces/devansh.jpg", "rb") as image_file: + encoded_string1 = base64.b64encode( + image_file.read()).decode("utf-8") + + response1 = client.post( + "/create_new_faceEntry", + json={ + "EmployeeCode": "1", + "Name": "Devansh", + "gender": "Male", + "Department": "IT", + "Images": [encoded_string1, encoded_string1], + }, + ) + assert response1.status_code == 200 + assert response1.json() == { + "message": "Face entry created successfully", + } + except Exception as e: + log_exception(f"Error in test_register_face1: {e}") + pytest.fail(f"Test failed due to exception: {e}") + @pytest.mark.run(order=2) def test_register_face2(): """ Test the creation of a second face entry. - - Verifies that a POST request to /create_new_faceEntry with a valid - JSON body containing an EmployeeCode, Name, gender, Department, and - a list of one or more images creates a new face entry in the database - and returns a message indicating success. """ - mock_doc = { - "_id": "65e6284d01f95cd96ea334a7", - "EmployeeCode": "1", - "Name": "Devansh", - "gender": "Male", - "Department": "IT", - "Images": ["encoded_string1", "encoded_string2"], - } - - mock_find = MagicMock(return_value=[mock_doc, mock_doc]) - mock_insert_one = MagicMock(return_value=MagicMock(inserted_id="1")) - mock_find_one = MagicMock(return_value=mock_doc) - mock_update_one = MagicMock(return_value=MagicMock(modified_count=1)) - mock_find_one_and_delete = MagicMock(return_value=mock_doc) - - with patch("API.database.Database.find", mock_find), patch( - "API.database.Database.insert_one", - mock_insert_one, - ), patch("API.database.Database.find_one", mock_find_one), patch( - "API.database.Database.update_one", - mock_update_one, - ), patch( - "API.database.Database.find_one_and_delete", - mock_find_one_and_delete, - ): - - with open("./test-faces/devansh.jpg", "rb") as image_file: - encoded_string2 = base64.b64encode( - image_file.read(), - ).decode("utf-8") - - response2 = client.post( - "/create_new_faceEntry", - json={ - "EmployeeCode": "2", - "Name": "test", - "gender": "Female", - "Department": "IT", - "Images": [encoded_string2, encoded_string2], - }, - ) - assert response2.status_code == 200 - assert response2.json() == { - "message": "Face entry created successfully", + try: + mock_doc = { + "_id": "65e6284d01f95cd96ea334a7", + "EmployeeCode": "1", + "Name": "Devansh", + "gender": "Male", + "Department": "IT", + "Images": ["encoded_string1", "encoded_string2"], } + mock_find = MagicMock(return_value=[mock_doc, mock_doc]) + mock_insert_one = MagicMock(return_value=MagicMock(inserted_id="1")) + mock_find_one = MagicMock(return_value=mock_doc) + mock_update_one = MagicMock(return_value=MagicMock(modified_count=1)) + mock_find_one_and_delete = MagicMock(return_value=mock_doc) + + with patch("API.database.Database.find", mock_find), patch( + "API.database.Database.insert_one", mock_insert_one + ), patch("API.database.Database.find_one", mock_find_one), patch( + "API.database.Database.update_one", mock_update_one + ), patch( + "API.database.Database.find_one_and_delete", mock_find_one_and_delete + ): + with open("./test-faces/devansh.jpg", "rb") as image_file: + encoded_string2 = base64.b64encode( + image_file.read()).decode("utf-8") + + response2 = client.post( + "/create_new_faceEntry", + json={ + "EmployeeCode": "2", + "Name": "test", + "gender": "Female", + "Department": "IT", + "Images": [encoded_string2, encoded_string2], + }, + ) + assert response2.status_code == 200 + assert response2.json() == { + "message": "Face entry created successfully", + } + except Exception as e: + log_exception(f"Error in test_register_face2: {e}") + pytest.fail(f"Test failed due to exception: {e}") + @pytest.mark.run(order=3) def test_get_all_faces_after_registration(): """ Test the retrieval of all face entries after registration. - - Verifies that a GET request to /Data returns a list of all face entries - in the database, and that the list contains the two face entries created - in the previous tests. """ - mock_doc = { - "_id": "65e6284d01f95cd96ea334a7", - "EmployeeCode": "1", - "Name": "Devansh", - "gender": "Male", - "Department": "IT", - "Images": ["encoded_string1", "encoded_string2"], - } - - mock_find = MagicMock(return_value=[mock_doc, mock_doc]) - - with patch("API.database.Database.find", mock_find): - response = client.get("/Data/") - assert response.status_code == 200 - logging.debug(response.json()) - assert len(response.json()) == 2 + try: + mock_doc = { + "_id": "65e6284d01f95cd96ea334a7", + "EmployeeCode": "1", + "Name": "Devansh", + "gender": "Male", + "Department": "IT", + "Images": ["encoded_string1", "encoded_string2"], + } + + mock_find = MagicMock(return_value=[mock_doc, mock_doc]) + + with patch("API.database.Database.find", mock_find): + response = client.get("/Data/") + assert response.status_code == 200 + logging.debug(response.json()) + assert len(response.json()) == 2 + except Exception as e: + log_exception(f"Error in test_get_all_faces_after_registration: {e}") + pytest.fail(f"Test failed due to exception: {e}") @pytest.mark.run(order=4) def test_update_face(): """ Test the update of a face entry. - - Verifies that a PUT request to /update/{EmployeeCode} with a valid - JSON body containing an EmployeeCode, Name, gender, Department, and - a list of one or more images updates the face entry in the database - and returns a message indicating success. """ - mock_doc = { - "_id": "65e6284d01f95cd96ea334a7", - "EmployeeCode": "1", - "Name": "Devansh", - "gender": "Male", - "Department": "IT", - "Images": ["encoded_string1", "encoded_string2"], - } - - mock_find = MagicMock(return_value=[mock_doc, mock_doc]) - mock_insert_one = MagicMock(return_value=MagicMock(inserted_id="1")) - mock_find_one = MagicMock(return_value=mock_doc) - mock_update_one = MagicMock(return_value=MagicMock(modified_count=1)) - mock_find_one_and_delete = MagicMock(return_value=mock_doc) - - with patch("API.database.Database.find", mock_find), patch( - "API.database.Database.insert_one", - mock_insert_one, - ), patch("API.database.Database.find_one", mock_find_one), patch( - "API.database.Database.update_one", - mock_update_one, - ), patch( - "API.database.Database.find_one_and_delete", - mock_find_one_and_delete, - ): - with open("./test-faces/devansh.jpg", "rb") as image_file: - encoded_string2 = base64.b64encode( - image_file.read(), - ).decode("utf-8") - - response = client.put( - "/update/1", - json={ - "Name": "Test", - "gender": "Male", - "Department": "IT_Test", - "Images": [encoded_string2, encoded_string2], - }, - ) - assert response.status_code == 200 - assert response.json() == "Updated Successfully" + try: + mock_doc = { + "_id": "65e6284d01f95cd96ea334a7", + "EmployeeCode": "1", + "Name": "Devansh", + "gender": "Male", + "Department": "IT", + "Images": ["encoded_string1", "encoded_string2"], + } + + mock_find = MagicMock(return_value=[mock_doc, mock_doc]) + mock_insert_one = MagicMock(return_value=MagicMock(inserted_id="1")) + mock_find_one = MagicMock(return_value=mock_doc) + mock_update_one = MagicMock(return_value=MagicMock(modified_count=1)) + mock_find_one_and_delete = MagicMock(return_value=mock_doc) + + with patch("API.database.Database.find", mock_find), patch( + "API.database.Database.insert_one", mock_insert_one + ), patch("API.database.Database.find_one", mock_find_one), patch( + "API.database.Database.update_one", mock_update_one + ), patch( + "API.database.Database.find_one_and_delete", mock_find_one_and_delete + ): + with open("./test-faces/devansh.jpg", "rb") as image_file: + encoded_string2 = base64.b64encode( + image_file.read()).decode("utf-8") + + response = client.put( + "/update/1", + json={ + "Name": "Test", + "gender": "Male", + "Department": "IT_Test", + "Images": [encoded_string2, encoded_string2], + }, + ) + assert response.status_code == 200 + assert response.json() == "Updated Successfully" + except Exception as e: + log_exception(f"Error in test_update_face: {e}") + pytest.fail(f"Test failed due to exception: {e}") @pytest.mark.run(order=5) def test_delete_face(): """ Test the DELETE endpoint to delete a face entry. - - This test case asserts that the DELETE endpoint returns a 200 status code and a JSON response with the message 'Successfully Deleted'. """ - response = client.delete("/delete/1") - assert response.status_code == 200 - assert response.json() == {"Message": "Successfully Deleted"} + try: + response = client.delete("/delete/1") + assert response.status_code == 200 + assert response.json() == {"Message": "Successfully Deleted"} + except Exception as e: + log_exception(f"Error in test_delete_face: {e}") + pytest.fail(f"Test failed due to exception: {e}") @pytest.mark.run(order=6) def test_recognize_face_fail(): """ Test the POST endpoint to recognize a face when no match is found. - - This test case asserts that the POST endpoint returns a 404 status code and a JSON response with the message 'No match found'. """ - mock_doc = { - "Image": "encoded_string2", - "Name": "Test2", - "score": 0.0, - } - with patch("API.database.Database.vector_search", return_value=[mock_doc]): - - with open("./test-faces/devansh.jpg", "rb") as image_file: - response = client.post( - "/recognize_face", - files={"Face": image_file}, - ) - assert response.status_code == 404 - assert response.json() == {"message": "No match found"} + try: + mock_doc = { + "Image": "encoded_string2", + "Name": "Test2", + "score": 0.0, + } + with patch("API.database.Database.vector_search", return_value=[mock_doc]): + + with open("./test-faces/devansh.jpg", "rb") as image_file: + response = client.post( + "/recognize_face", + files={"Face": image_file}, + ) + assert response.status_code == 404 + assert response.json() == {"message": "No match found"} + except Exception as e: + log_exception(f"Error in test_recognize_face_fail: {e}") + pytest.fail(f"Test failed due to exception: {e}") @pytest.mark.run(order=7) def test_recognize_face_success(): """ Test the POST endpoint to recognize a face when a match is found. - - This test case asserts that the POST endpoint returns a 200 status code and a JSON response with the name, image and score of the matching face entry. """ - mock_doc = { - "Image": "encoded_string2", - "Name": "Test2", - "score": 1.0, - } - with patch("API.database.Database.vector_search", return_value=[mock_doc]): - - with open("./test-faces/devansh.jpg", "rb") as image_file: - response = client.post( - "/recognize_face", - files={"Face": image_file}, - ) - assert response.status_code == 200 - assert response.json() == { - "Name": "Test2", + try: + mock_doc = { "Image": "encoded_string2", + "Name": "Test2", "score": 1.0, } + with patch("API.database.Database.vector_search", return_value=[mock_doc]): + + with open("./test-faces/devansh.jpg", "rb") as image_file: + response = client.post( + "/recognize_face", + files={"Face": image_file}, + ) + assert response.status_code == 200 + assert response.json() == { + "Name": "Test2", + "Image": "encoded_string2", + "Score": 1.0, + } + except Exception as e: + log_exception(f"Error in test_recognize_face_success: {e}") + pytest.fail(f"Test failed due to exception: {e}")