| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178 |
- import sys
- import tempfile
- import grpc
- from concurrent import futures
- import os
- from deepface import DeepFace
- sys.path.insert(0, os.path.join(os.path.dirname(__file__), "proto"))
- from proto import face_recognition_pb2, face_recognition_pb2_grpc
- EMPLOYEE_DB_PATH = "data/employees"
- class FaceRecognitionServicer(face_recognition_pb2_grpc.FaceRecognitionServiceServicer):
- def Recognize(self, request, context):
- print("[INFO] Received recognition request...")
- temp_file = None
- try:
- # Save incoming image to a temporary file
- with tempfile.NamedTemporaryFile(suffix=".jpg", delete=False) as tmp:
- temp_file = tmp.name
- tmp.write(request.image)
- print("[INFO] Running DeepFace search...")
- result = DeepFace.find(
- img_path=temp_file,
- db_path=EMPLOYEE_DB_PATH,
- detector_backend="opencv",
- enforce_detection=False
- )
- if isinstance(result, list):
- result = result[0]
- if result is None or len(result) == 0:
- print("[INFO] No match found.")
- return face_recognition_pb2.FaceResponse(
- name="Unknown",
- confidence=0.0,
- image=b""
- )
- best_row = result.iloc[0]
- matched_image_path = best_row["identity"]
- name = os.path.splitext(os.path.basename(matched_image_path))[0]
- # Convert distance to rough confidence
- distance = best_row.get("VGG-Face_cosine", 0.3)
- confidence = float(max(0.0, 1.0 - distance))
- # Read matched image bytes
- with open(matched_image_path, "rb") as f:
- matched_image_bytes = f.read()
- print(f"[INFO] Match: {name}, confidence: {confidence:.2f}")
- return face_recognition_pb2.FaceResponse(
- name=name,
- confidence=confidence,
- image=matched_image_bytes
- )
- except Exception as e:
- print("[ERROR] DeepFace failed:", e)
- return face_recognition_pb2.FaceResponse(
- name="Error",
- confidence=0.0,
- image=b""
- )
- finally:
- if temp_file and os.path.exists(temp_file):
- try:
- os.remove(temp_file)
- except Exception as cleanup_error:
- print(
- f"[WARN] Could not delete temp file {temp_file}: {cleanup_error}")
- def EnrollFace(self, request, context):
- """
- Handles enrollment of a new employee.
- Saves the uploaded image in EMPLOYEE_DB_PATH with the employee's name.
- """
- try:
- name = request.name
- image_bytes = request.image
- if not name or not image_bytes:
- return face_recognition_pb2.EnrollFaceResponse(
- success=False,
- message="Name or image missing"
- )
- os.makedirs(EMPLOYEE_DB_PATH, exist_ok=True)
- file_path = os.path.join(EMPLOYEE_DB_PATH, f"{name}.jpg")
- with open(file_path, "wb") as f:
- f.write(image_bytes)
- print(f"[INFO] Enrolled employee: {name}, saved at {file_path}")
- return face_recognition_pb2.EnrollFaceResponse(
- success=True,
- message="Enrollment successful"
- )
- except Exception as e:
- print(f"[ERROR] Enrollment failed: {e}")
- return face_recognition_pb2.EnrollFaceResponse(
- success=False,
- message=str(e)
- )
- def GetAllEmployees(self, request, context):
- try:
- employee_files = [
- f for f in os.listdir(EMPLOYEE_DB_PATH)
- if f.lower().endswith((".jpg", ".png", ".jpeg"))
- ]
- employees = []
- for file in employee_files:
- path = os.path.join(EMPLOYEE_DB_PATH, file)
- name = os.path.splitext(file)[0]
- with open(path, "rb") as f:
- image_bytes = f.read()
- employees.append(face_recognition_pb2.Employee(
- name=name,
- image=image_bytes
- ))
- return face_recognition_pb2.EmployeeListResponse(
- employees=employees
- )
- except Exception as e:
- print("[ERROR] Failed to list employees:", e)
- return face_recognition_pb2.EmployeeListResponse(employees=[])
- def DeleteEmployee(self, request, context):
- try:
- found = False
- for file in os.listdir(EMPLOYEE_DB_PATH):
- if os.path.splitext(file)[0] == request.name:
- os.remove(os.path.join(EMPLOYEE_DB_PATH, file))
- found = True
- break
- message = "Deleted successfully" if found else "Employee not found"
- return face_recognition_pb2.DeleteEmployeeResponse(
- success=found,
- message=message
- )
- except Exception as e:
- print("[ERROR] Failed to delete employee:", e)
- return face_recognition_pb2.DeleteEmployeeResponse(
- success=False,
- message=str(e)
- )
- def serve():
- server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
- face_recognition_pb2_grpc.add_FaceRecognitionServiceServicer_to_server(
- FaceRecognitionServicer(), server
- )
- server.add_insecure_port("[::]:50051")
- server.start()
- print("[INFO] gRPC Face Recognition server running on port 50051")
- server.wait_for_termination()
- if __name__ == "__main__":
- os.makedirs(EMPLOYEE_DB_PATH, exist_ok=True)
- serve()
|