"""Palm Oil Ripeness Service (DDD).

FastAPI application exposing:
- single-image and batch FFB (fresh fruit bunch) detection over three
  engines (ONNX, PyTorch YOLO26, benchmark YOLOv8-Sawit),
- cloud vectorization/archival and hybrid semantic search (MongoDB Atlas
  + Vertex AI, optional — degrades gracefully when offline),
- a local SQLite "history vault" with on-disk image archive.
"""

import io
import json
import os
import shutil
import sqlite3
import time
import uuid
from contextlib import closing
from datetime import datetime
from typing import List, Optional

import numpy as np
import onnxruntime as ort
import pandas as pd
from dotenv import load_dotenv
from fastapi import BackgroundTasks, Body, FastAPI, File, Form, UploadFile
from PIL import Image
from ultralytics import YOLO

from src.infrastructure.vision_service import VertexVisionService
from src.infrastructure.repository import MongoPalmOilRepository
from src.application.analyze_bunch import AnalyzeBunchUseCase, AnalyzeBatchUseCase, SearchSimilarUseCase

DB_PATH = "palm_history.db"
ARCHIVE_DIR = "history_archive"
os.makedirs(ARCHIVE_DIR, exist_ok=True)


def init_local_db():
    """Create the local SQLite history table (idempotent) and run the
    one-off migration that adds the 'engine' column to pre-existing DBs.
    """
    print(f"Initializing Local DB at {DB_PATH}...")
    with closing(sqlite3.connect(DB_PATH)) as conn:
        cursor = conn.cursor()
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS history (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                filename TEXT,
                archive_path TEXT,
                detections TEXT,
                summary TEXT,
                engine TEXT,
                inference_ms REAL,
                processing_ms REAL,
                raw_tensor TEXT,
                timestamp DATETIME DEFAULT CURRENT_TIMESTAMP
            )
        ''')
        # Migration: Add engine column if it doesn't exist (for existing DBs).
        # On fresh DBs the column is already in CREATE TABLE, so the ALTER
        # fails with OperationalError and is deliberately ignored.
        try:
            cursor.execute("ALTER TABLE history ADD COLUMN engine TEXT")
            print("Migrated History table: Added 'engine' column.")
        except sqlite3.OperationalError:
            pass  # Column already exists
        conn.commit()
    print("Local DB Initialized.")


init_local_db()

# Load environment variables
load_dotenv()

# Set Google Cloud credentials globally
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = "gemini-embedding-service-key.json"

app = FastAPI(title="Palm Oil Ripeness Service (DDD)")


class ModelManager:
    """Owns the three inference engines and their class-name maps.

    - ONNX Runtime session built from ``onnx_path`` (default engine),
    - PyTorch YOLO model from ``pt_path`` (engine "pytorch"),
    - benchmark YOLOv8-Sawit model from ``benchmark_path``
      (engine "yolov8_sawit").
    """

    def __init__(self, onnx_path: str, pt_path: str, benchmark_path: str = 'sawit_tbs.pt'):
        self.onnx_session = ort.InferenceSession(onnx_path)
        self.onnx_input_name = self.onnx_session.get_inputs()[0].name
        self.pt_model = YOLO(pt_path)
        self.class_names = self.pt_model.names
        self.benchmark_model = YOLO(benchmark_path)
        self.benchmark_class_names = self.benchmark_model.names

    def preprocess_onnx(self, img: Image.Image):
        """Convert a PIL image to the (1, 3, 640, 640) float32 tensor the
        ONNX model expects. Returns (tensor, original_width, original_height)
        so detections can be scaled back to source-image pixels."""
        img = img.convert("RGB")
        orig_w, orig_h = img.size
        img_resized = img.resize((640, 640))
        img_array = np.array(img_resized) / 255.0
        img_array = img_array.transpose(2, 0, 1)  # HWC -> CHW
        img_array = img_array.reshape(1, 3, 640, 640).astype(np.float32)
        return img_array, orig_w, orig_h

    def run_onnx_inference(self, img: Image.Image, conf_threshold: float):
        """Run the ONNX engine and return (detections, raw_sample, inference_ms).

        ONNX output shape: [batch, num_boxes, 6] where the 6 columns are
        x1, y1, x2, y2, conf, cls.
        NOTE(review): coordinates are treated as normalized (0.0-1.0), as
        YOLOv8 endpoints often produce — confirm against the export config.
        """
        img_array, orig_w, orig_h = self.preprocess_onnx(img)
        start_inf = time.perf_counter()
        outputs = self.onnx_session.run(None, {self.onnx_input_name: img_array})
        inference_ms = (time.perf_counter() - start_inf) * 1000

        detections_batch = outputs[0]
        detections = []
        valid_count = 0
        for i in range(detections_batch.shape[1]):
            det = detections_batch[0, i]
            conf = float(det[4])
            if conf >= conf_threshold:
                valid_count += 1
                # Coordinate scaling: normalized (0.0-1.0) -> absolute pixels
                x1, y1, x2, y2 = det[:4]
                class_id = int(det[5])
                class_name = self.class_names.get(class_id, "Unknown")
                detections.append({
                    "bunch_id": valid_count,
                    "class": class_name,
                    "confidence": round(conf, 2),
                    "is_health_alert": class_name in ["Abnormal", "Empty_Bunch"],
                    "box": [float(x1 * orig_w), float(y1 * orig_h),
                            float(x2 * orig_w), float(y2 * orig_h)]
                })
        # Capture a raw tensor sample (first 5 detections) for technical evidence
        raw_sample = detections_batch[0, :5].tolist()
        return detections, raw_sample, inference_ms

    def run_pytorch_inference(self, img: Image.Image, conf_threshold: float, engine_type: str = "pytorch"):
        """Run a PyTorch YOLO engine and return (detections, raw_snippet, inference_ms).

        ``engine_type == "pytorch"`` selects the YOLO26 model; any other
        value selects the benchmark YOLOv8-Sawit model (third engine).
        """
        start_inf = time.perf_counter()
        model = self.pt_model if engine_type == "pytorch" else self.benchmark_model
        names = self.class_names if engine_type == "pytorch" else self.benchmark_class_names
        results = model(img, conf=conf_threshold, verbose=False)
        inference_ms = (time.perf_counter() - start_inf) * 1000

        detections = []
        for i, box in enumerate(results[0].boxes):
            class_id = int(box.cls)
            class_name = names.get(class_id, "Unknown")
            detections.append({
                "bunch_id": i + 1,
                "class": class_name,
                "confidence": round(float(box.conf), 2),
                "is_health_alert": class_name in ["Abnormal", "Empty_Bunch"],
                "box": box.xyxy.tolist()[0]
            })
        # Extract snippet from results (simulating raw output)
        raw_snippet = results[0].boxes.data[:5].tolist() if len(results[0].boxes) > 0 else []
        return detections, raw_snippet, inference_ms


model_manager = ModelManager(onnx_path='best.onnx', pt_path='best.pt')

# Global state for the confidence threshold
current_conf = 0.25

# Initialize DDD Components
vision_service = VertexVisionService(
    project_id=os.getenv("PROJECT_ID", "your-project-id"),
    location=os.getenv("LOCATION", "us-central1")
)
repo = MongoPalmOilRepository(
    uri=os.getenv("MONGO_URI"),
    db_name=os.getenv("DB_NAME", "palm_oil_db"),
    collection_name=os.getenv("COLLECTION_NAME", "ffb_records")
)

# Cloud archival/search endpoints check this flag and degrade gracefully
# instead of failing when Atlas is unreachable.
db_connected = False
try:
    print("Connecting to MongoDB Atlas...")
    repo.ensure_indexes()
    db_connected = True
    print("MongoDB Atlas Connected.")
except Exception as e:
    print(f"Warning: Could not connect to MongoDB Atlas (Timeout). Cloud archival will be disabled. Details: {e}")

analyze_use_case = AnalyzeBunchUseCase(vision_service, repo)
analyze_batch_use_case = AnalyzeBatchUseCase(vision_service, repo)
search_use_case = SearchSimilarUseCase(vision_service, repo)


def _tally_summary(detections, names):
    """Count detections per class name, seeded with 0 for every known class.

    Uses .get so an 'Unknown' class (id missing from the names map) is
    counted instead of raising KeyError.
    """
    summary = {name: 0 for name in names.values()}
    for det in detections:
        summary[det['class']] = summary.get(det['class'], 0) + 1
    return summary


def _run_selected_engine(img: Image.Image, model_type: str):
    """Dispatch to the engine named by ``model_type`` ("pytorch",
    "yolov8_sawit", anything else -> ONNX). Returns the engine's
    (detections, raw_sample, inference_ms) triple."""
    if model_type == "pytorch":
        return model_manager.run_pytorch_inference(img, current_conf, "pytorch")
    if model_type == "yolov8_sawit":
        return model_manager.run_pytorch_inference(img, current_conf, "yolov8_sawit")
    return model_manager.run_onnx_inference(img, current_conf)


def _active_names(model_type: str):
    """Class-name map matching the engine selected by ``model_type``."""
    return model_manager.class_names if model_type != "yolov8_sawit" else model_manager.benchmark_class_names


@app.get("/get_confidence")
async def get_confidence():
    """Returns the current confidence threshold used by the model."""
    return {
        "status": "success",
        "current_confidence": current_conf,
        "model_version": "best.pt"
    }


@app.post("/set_confidence")
async def set_confidence(threshold: float = Body(..., embed=True)):
    """Updates the confidence threshold globally."""
    global current_conf
    if 0.0 <= threshold <= 1.0:
        current_conf = threshold
        return {"status": "success", "new_confidence": current_conf}
    else:
        return {"status": "error", "message": "Threshold must be between 0.0 and 1.0"}


@app.post("/analyze")
async def analyze_with_health_metrics(file: UploadFile = File(...), model_type: str = Form("onnx")):
    """Industry-grade analysis with health metrics and summary."""
    image_bytes = await file.read()
    img = Image.open(io.BytesIO(image_bytes))

    start_total = time.perf_counter()
    # Select Inference Engine
    detections, raw_sample, inference_ms = _run_selected_engine(img, model_type)
    total_ms = (time.perf_counter() - start_total) * 1000
    # Time spent outside model inference (decode, post-processing, ...)
    processing_ms = total_ms - inference_ms

    summary = _tally_summary(detections, _active_names(model_type))

    # AUTO-ARCHIVE to Local History Vault
    unique_id = uuid.uuid4().hex
    archive_filename = f"{unique_id}_{file.filename}"
    archive_path = os.path.join(ARCHIVE_DIR, archive_filename)
    # Save image copy
    with open(archive_path, "wb") as buffer:
        buffer.write(image_bytes)

    # Save to SQLite; closing() guarantees the connection is released even
    # if the INSERT raises.
    with closing(sqlite3.connect(DB_PATH)) as conn:
        conn.execute(
            "INSERT INTO history (filename, archive_path, detections, summary, engine, inference_ms, processing_ms, raw_tensor) VALUES (?, ?, ?, ?, ?, ?, ?, ?)",
            (file.filename, archive_path, json.dumps(detections), json.dumps(summary),
             model_type, inference_ms, processing_ms, json.dumps(raw_sample)))
        conn.commit()

    return {
        "status": "success",
        "current_threshold": current_conf,
        "total_count": len(detections),
        "industrial_summary": summary,
        "detections": detections,
        "inference_ms": inference_ms,
        "processing_ms": processing_ms,
        "raw_array_sample": raw_sample,
        "archive_id": unique_id
    }


@app.post("/vectorize_and_store")
async def vectorize_and_store(file: UploadFile = File(...), detection_data: str = Form(...)):
    """Cloud-dependent. Requires active billing."""
    if not db_connected:
        return {"status": "error", "message": "Cloud Archival is currently unavailable (Database Offline)."}

    try:
        primary_detection = json.loads(detection_data)
    except Exception:
        return {"status": "error", "message": "Invalid detection_data format"}

    unique_id = uuid.uuid4().hex
    temp_path = f"temp_vec_{unique_id}_{file.filename}"
    with open(temp_path, "wb") as buffer:
        shutil.copyfileobj(file.file, buffer)

    try:
        record_id = analyze_use_case.execute(temp_path, primary_detection)
        return {
            "status": "success",
            "record_id": record_id,
            "message": "FFB vectorized and archived successfully"
        }
    except RuntimeError as e:
        return {"status": "error", "message": str(e)}
    except Exception as e:
        return {"status": "error", "message": f"An unexpected error occurred: {str(e)}"}
    finally:
        # Temp file must never outlive the request.
        if os.path.exists(temp_path):
            os.remove(temp_path)


@app.post("/process_batch")
async def process_batch(
    files: List[UploadFile] = File(...),
    model_type: str = Form("onnx"),
    metadata: str = Form("{}")  # JSON string from Frontend
):
    """Process a batch of images into a self-describing bundle directory
    (raw images + manifest.json contract) and return an immediate summary."""
    batch_id = f"BATCH_{uuid.uuid4().hex[:8].upper()}"
    output_dir = os.path.join("batch_outputs", batch_id)
    os.makedirs(os.path.join(output_dir, "raw"), exist_ok=True)

    start_time = datetime.now()
    try:
        meta_dict = json.loads(metadata)
    except json.JSONDecodeError:
        # Malformed frontend metadata must not 500 the whole batch.
        return {"status": "error", "message": "Invalid metadata format"}

    batch_records = []
    for file in files:
        unique_id = uuid.uuid4().hex[:6]
        filename = f"{unique_id}_{file.filename}"
        save_path = os.path.join(output_dir, "raw", filename)

        # 1. Save Raw Image to Bundle
        image_bytes = await file.read()
        with open(save_path, "wb") as f:
            f.write(image_bytes)

        # 2. Run Inference (same selection logic as /analyze)
        img = Image.open(io.BytesIO(image_bytes))
        detections, raw_sample, inf_ms = _run_selected_engine(img, model_type)

        # 3. Normalize Coordinates for the Contract:
        #    downstream processes shouldn't care about the input resolution.
        w, h = img.size
        normalized_dets = []
        for d in detections:
            x1, y1, x2, y2 = d['box']
            normalized_dets.append({
                **d,
                "norm_box": [x1 / w, y1 / h, x2 / w, y2 / h]
            })

        batch_records.append({
            "image_id": unique_id,
            "filename": filename,
            "detections": normalized_dets,
            "inference_ms": inf_ms,
            "raw_tensor": raw_sample  # Added for technical evidence/contract
        })

    end_time = datetime.now()
    duration = (end_time - start_time).total_seconds()

    # 4. Generate the Summary (For Manifest and immediate UI feedback)
    total_summary = {name: 0 for name in _active_names(model_type).values()}
    for record in batch_records:
        for det in record['detections']:
            # .get guard: 'Unknown' detection classes must not KeyError
            total_summary[det['class']] = total_summary.get(det['class'], 0) + 1

    # 5. Generate the Manifest (The Contract)
    performance_metrics = {
        "start_time": start_time.isoformat(),
        "end_time": end_time.isoformat(),
        "duration_seconds": round(duration, 2)
    }
    manifest = {
        "job_id": batch_id,
        "timestamp": end_time.isoformat(),
        "source_context": meta_dict,
        "engine": {
            "name": "YOLO26",
            "type": model_type,
            "threshold": current_conf
        },
        "performance": performance_metrics,       # Added performance metrics
        "industrial_summary": total_summary,      # Added for subscribers
        "inventory": batch_records
    }
    with open(os.path.join(output_dir, "manifest.json"), "w") as f:
        json.dump(manifest, f, indent=4)

    # Note: Maintaining compatibility with the frontend's expectation of
    # 'industrial_summary' and 'processed_count' for immediate UI feedback.
    return {
        "status": "success",
        "batch_id": batch_id,
        "bundle_path": output_dir,
        "processed_count": len(files),
        "total_count": sum(total_summary.values()),
        "industrial_summary": total_summary,
        "performance": performance_metrics,
        "record_ids": [r['image_id'] for r in batch_records],  # Backward compatibility
        "manifest_preview": manifest,
        "detailed_results": [
            {"filename": r['filename'], "detection": d}
            for r in batch_records for d in r['detections']
        ]  # Backward compatibility
    }


@app.post("/search_hybrid")
async def search_hybrid(
    file: Optional[UploadFile] = File(None),
    text_query: Optional[str] = Form(None),
    limit: int = Form(3)
):
    """Hybrid Search: Supports Visual Similarity and Natural Language Search."""
    if not db_connected:
        return {"status": "error", "message": "Semantic Search is currently unavailable (Database Offline)."}

    temp_path = None
    try:
        if file:
            # Visual search: spill the upload to disk for the use case.
            unique_id = uuid.uuid4().hex
            temp_path = f"temp_search_{unique_id}_{file.filename}"
            with open(temp_path, "wb") as buffer:
                shutil.copyfileobj(file.file, buffer)
            results = search_use_case.execute(image_path=temp_path, limit=limit)
        elif text_query:
            results = search_use_case.execute(text_query=text_query, limit=limit)
        else:
            return {"status": "error", "message": "No search input provided"}
        return {"status": "success", "results": results}
    except RuntimeError as e:
        return {"status": "error", "message": f"Search unavailable: {str(e)}"}
    finally:
        if temp_path and os.path.exists(temp_path):
            os.remove(temp_path)


@app.get("/get_image/{record_id}")
async def get_image(record_id: str):
    """Retrieve the Base64 image data for a specific record."""
    record = repo.get_by_id(record_id)
    if not record:
        return {"status": "error", "message": "Record not found"}
    return {
        "status": "success",
        "image_data": record.get("image_data")
    }


@app.post("/save_to_history")
async def save_to_history(file: UploadFile = File(...), detections: str = Form(...), summary: str = Form(...)):
    """Manually archive an upload plus pre-serialized detection JSON into
    the local vault. Timing fields are zeroed (no inference runs here)."""
    unique_id = uuid.uuid4().hex
    filename = f"{unique_id}_{file.filename}"
    archive_path = os.path.join(ARCHIVE_DIR, filename)
    with open(archive_path, "wb") as buffer:
        shutil.copyfileobj(file.file, buffer)

    with closing(sqlite3.connect(DB_PATH)) as conn:
        conn.execute(
            "INSERT INTO history (filename, archive_path, detections, summary, inference_ms, processing_ms, raw_tensor) VALUES (?, ?, ?, ?, ?, ?, ?)",
            (file.filename, archive_path, detections, summary, 0.0, 0.0, ""))
        conn.commit()
    return {"status": "success", "message": "Saved to local vault"}


@app.get("/get_history")
async def get_history():
    """Return every history row, newest first, as plain dicts."""
    with closing(sqlite3.connect(DB_PATH)) as conn:
        conn.row_factory = sqlite3.Row
        cursor = conn.execute("SELECT * FROM history ORDER BY timestamp DESC")
        rows = [dict(row) for row in cursor.fetchall()]
    return {"status": "success", "history": rows}


@app.get("/get_model_info")
async def get_model_info(model_type: str = "onnx"):
    """Returns metadata and capabilities for the specified model engine."""
    if model_type in ["onnx", "pytorch"]:
        classes = list(model_manager.class_names.values())
        description = "Standard YOLO26 Industrial Model."
    elif model_type == "yolov8_sawit":
        classes = list(model_manager.benchmark_class_names.values())
        description = "YOLOv8-Sawit (Benchmark) - External Architecture."
    else:
        return {"status": "error", "message": "Unknown model type"}
    return {
        "status": "success",
        "model_type": model_type,
        "description": description,
        "detections_categories": classes
    }


if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)