|
|
@@ -1,13 +1,14 @@
|
|
|
+import tempfile
|
|
|
import grpc
|
|
|
+import sys
|
|
|
from concurrent import futures
|
|
|
-import time
|
|
|
import os
|
|
|
from deepface import DeepFace
|
|
|
-import pandas as pd
|
|
|
|
|
|
-import face_recognition_pb2
|
|
|
-import face_recognition_pb2_grpc
|
|
|
+sys.path.insert(0, os.path.join(os.path.dirname(__file__), "proto"))
|
|
|
|
|
|
+from proto import face_recognition_pb2
|
|
|
+from proto import face_recognition_pb2_grpc
|
|
|
|
|
|
EMPLOYEE_DB_PATH = "data/employees"
|
|
|
|
|
|
@@ -15,24 +16,22 @@ EMPLOYEE_DB_PATH = "data/employees"
|
|
|
class FaceRecognitionServicer(face_recognition_pb2_grpc.FaceRecognitionServiceServicer):
|
|
|
|
|
|
def Recognize(self, request, context):
|
|
|
- print("[INFO] Received request...")
|
|
|
-
|
|
|
- # Save temp image
|
|
|
- temp_path = "temp_upload.jpg"
|
|
|
- with open(temp_path, "wb") as f:
|
|
|
- f.write(request.image)
|
|
|
-
|
|
|
+ print("[INFO] Received recognition request...")
|
|
|
+ temp_file = None
|
|
|
try:
|
|
|
- print("[INFO] Running DeepFace search...")
|
|
|
+ # Save incoming image to a temporary file
|
|
|
+ with tempfile.NamedTemporaryFile(suffix=".jpg", delete=False) as tmp:
|
|
|
+ temp_file = tmp.name
|
|
|
+ tmp.write(request.image)
|
|
|
|
|
|
+ print("[INFO] Running DeepFace search...")
|
|
|
result = DeepFace.find(
|
|
|
- img_path=temp_path,
|
|
|
+ img_path=temp_file,
|
|
|
db_path=EMPLOYEE_DB_PATH,
|
|
|
+ detector_backend="opencv",
|
|
|
enforce_detection=False
|
|
|
)
|
|
|
|
|
|
- # result is a list of pandas DataFrames for each model;
|
|
|
- # we only use first result
|
|
|
if isinstance(result, list):
|
|
|
result = result[0]
|
|
|
|
|
|
@@ -40,35 +39,90 @@ class FaceRecognitionServicer(face_recognition_pb2_grpc.FaceRecognitionServiceSe
|
|
|
print("[INFO] No match found.")
|
|
|
return face_recognition_pb2.FaceResponse(
|
|
|
name="Unknown",
|
|
|
- confidence=0.0
|
|
|
+ confidence=0.0,
|
|
|
+ image=b""
|
|
|
)
|
|
|
|
|
|
- # Pick best match
|
|
|
best_row = result.iloc[0]
|
|
|
matched_image_path = best_row["identity"]
|
|
|
name = os.path.splitext(os.path.basename(matched_image_path))[0]
|
|
|
|
|
|
- # DeepFace gives "distance" values, convert roughly to confidence
|
|
|
+ # Convert distance to rough confidence
|
|
|
distance = best_row.get("VGG-Face_cosine", 0.3)
|
|
|
confidence = float(max(0.0, 1.0 - distance))
|
|
|
|
|
|
- print(f"[INFO] Match: {name}, confidence: {confidence:.2f}")
|
|
|
+ # Read matched image bytes
|
|
|
+ with open(matched_image_path, "rb") as f:
|
|
|
+ matched_image_bytes = f.read()
|
|
|
|
|
|
+ print(f"[INFO] Match: {name}, confidence: {confidence:.2f}")
|
|
|
return face_recognition_pb2.FaceResponse(
|
|
|
name=name,
|
|
|
- confidence=confidence
|
|
|
+ confidence=confidence,
|
|
|
+ image=matched_image_bytes
|
|
|
)
|
|
|
|
|
|
except Exception as e:
|
|
|
print("[ERROR] DeepFace failed:", e)
|
|
|
return face_recognition_pb2.FaceResponse(
|
|
|
name="Error",
|
|
|
- confidence=0.0
|
|
|
+ confidence=0.0,
|
|
|
+ image=b""
|
|
|
)
|
|
|
|
|
|
finally:
|
|
|
- if os.path.exists(temp_path):
|
|
|
- os.remove(temp_path)
|
|
|
+ if temp_file and os.path.exists(temp_file):
|
|
|
+ try:
|
|
|
+ os.remove(temp_file)
|
|
|
+ except Exception as cleanup_error:
|
|
|
+ print(f"[WARN] Could not delete temp file {temp_file}: {cleanup_error}")
|
|
|
+
|
|
|
+ def GetAllEmployees(self, request, context):
|
|
|
+ try:
|
|
|
+ employee_files = [
|
|
|
+ f for f in os.listdir(EMPLOYEE_DB_PATH)
|
|
|
+ if f.lower().endswith((".jpg", ".png", ".jpeg"))
|
|
|
+ ]
|
|
|
+
|
|
|
+ employees = []
|
|
|
+ for file in employee_files:
|
|
|
+ path = os.path.join(EMPLOYEE_DB_PATH, file)
|
|
|
+ name = os.path.splitext(file)[0]
|
|
|
+
|
|
|
+ with open(path, "rb") as f:
|
|
|
+ image_bytes = f.read()
|
|
|
+
|
|
|
+ employees.append(face_recognition_pb2.Employee(
|
|
|
+ name=name,
|
|
|
+ image=image_bytes
|
|
|
+ ))
|
|
|
+
|
|
|
+ return face_recognition_pb2.EmployeeListResponse(
|
|
|
+ employees=employees
|
|
|
+ )
|
|
|
+ except Exception as e:
|
|
|
+ print("[ERROR] Failed to list employees:", e)
|
|
|
+ return face_recognition_pb2.EmployeeListResponse(employees=[])
|
|
|
+
|
|
|
+ def DeleteEmployee(self, request, context):
|
|
|
+ try:
|
|
|
+ found = False
|
|
|
+ for file in os.listdir(EMPLOYEE_DB_PATH):
|
|
|
+ if os.path.splitext(file)[0] == request.name:
|
|
|
+ os.remove(os.path.join(EMPLOYEE_DB_PATH, file))
|
|
|
+ found = True
|
|
|
+ break
|
|
|
+ message = "Deleted successfully" if found else "Employee not found"
|
|
|
+ return face_recognition_pb2.DeleteEmployeeResponse(
|
|
|
+ success=found,
|
|
|
+ message=message
|
|
|
+ )
|
|
|
+ except Exception as e:
|
|
|
+ print("[ERROR] Failed to delete employee:", e)
|
|
|
+ return face_recognition_pb2.DeleteEmployeeResponse(
|
|
|
+ success=False,
|
|
|
+ message=str(e)
|
|
|
+ )
|
|
|
|
|
|
|
|
|
def serve():
|
|
|
@@ -84,4 +138,5 @@ def serve():
|
|
|
|
|
|
|
|
|
if __name__ == "__main__":
|
|
|
+ os.makedirs(EMPLOYEE_DB_PATH, exist_ok=True)
|
|
|
serve()
|