"""Streamlit dashboard for the Palm Oil FFB ripeness backend.

The UI performs no local inference: every analysis, search and history
request is sent to the HTTP backend at ``API_BASE_URL``. This script renders
the responses as interactive overlays, charts, PDF reports and history views.
"""

import base64
import io
import json
import os
import tempfile
from datetime import datetime

import numpy as np  # noqa: F401  (not referenced in this script; import kept)
import pandas as pd
import plotly.express as px
import plotly.graph_objects as go
import requests
import streamlit as st
from fpdf import FPDF
from PIL import Image, ImageDraw, ImageFont
from ultralytics import YOLO  # noqa: F401  (not referenced in this script; import kept)

# --- 1. Global Backend Check ---
API_BASE_URL = "http://localhost:8000"

# Timeouts (seconds) so a stalled backend cannot hang the Streamlit script.
REQUEST_TIMEOUT = 10      # small metadata calls
INFERENCE_TIMEOUT = 300   # uploads / inference / batch calls

DEFAULT_CONFIDENCE = 0.25
FEEDBACK_DIR = "feedback"

# Candidate TrueType fonts for annotate_image; the first existing one is used.
_FONT_CANDIDATES = (
    "C:\\Windows\\Fonts\\arial.ttf",
    "/usr/share/fonts/truetype/dejavu/DejaVuSans.ttf",
    "/Library/Fonts/Arial.ttf",
)


def check_backend():
    """Return True if the backend answers ``/get_confidence`` with HTTP 200."""
    try:
        res = requests.get(f"{API_BASE_URL}/get_confidence", timeout=2)
    except requests.RequestException:
        return False
    return res.status_code == 200


backend_active = check_backend()

# LOCAL MODEL LOADING REMOVED (YOLO26 Clean Sweep)
# UI now relies entirely on Backend API for NMS-Free inference.

if not backend_active:
    st.error("⚠️ Backend API is offline!")
    st.info("Please start the backend server first (e.g., `python main.py`) to unlock AI features.")
    if st.button("🔄 Retry Connection"):
        st.rerun()
    st.stop()  # Stops execution here, effectively disabling the app

# --- 2. Main Page Config (Only rendered if backend is active) ---
st.set_page_config(page_title="Palm Oil Ripeness AI (YOLO26)", layout="wide")
st.title("🌴 Palm Oil FFB Management System")
st.markdown("### Production-Ready AI Analysis & Archival")

# --- Sidebar ---
st.sidebar.header("Backend Controls")


def update_confidence():
    """Slider callback: push the new confidence threshold to the backend."""
    new_conf = st.session_state.conf_slider
    try:
        res = requests.post(
            f"{API_BASE_URL}/set_confidence",
            json={"threshold": new_conf},
            timeout=REQUEST_TIMEOUT,
        )
        # Do not announce success when the backend rejected the update.
        res.raise_for_status()
    except requests.RequestException:
        st.sidebar.error("Failed to update threshold")
    else:
        st.toast(f"Threshold updated to {new_conf}")


# The backend answered a moment ago, but it can still fail between the two
# calls, so fall back to the default rather than crash the page.
try:
    response = requests.get(f"{API_BASE_URL}/get_confidence", timeout=REQUEST_TIMEOUT)
    current_conf = float(response.json().get("current_confidence", DEFAULT_CONFIDENCE))
except (requests.RequestException, ValueError, TypeError, AttributeError):
    current_conf = DEFAULT_CONFIDENCE
# The slider raises if its initial value lies outside [min, max].
current_conf = min(max(current_conf, 0.1), 1.0)

st.sidebar.success("Connected to API")
st.sidebar.info("Engine: YOLO26 NMS-Free (Inference: ~39ms)")

# Synchronized Slider
st.sidebar.slider(
    "Confidence Threshold",
    0.1,
    1.0,
    value=float(current_conf),
    key="conf_slider",
    on_change=update_confidence,
)

st.sidebar.markdown("---")
st.sidebar.subheader("Inference Engine")
engine_choice = st.sidebar.selectbox(
    "Select Model Engine",
    ["YOLO26 (ONNX - High Speed)", "YOLO26 (PyTorch - Native)"],
    index=0,
    help="ONNX is optimized for latency. PyTorch provides native object handling.",
)
model_type = "onnx" if "ONNX" in engine_choice else "pytorch"
if model_type == "pytorch":
    st.sidebar.warning("PyTorch Engine: Higher Memory Usage")
else:
    st.sidebar.info("ONNX Engine: ~39ms Latency")


# Helpers to reset results when files change
def reset_single_results():
    """Uploader callback: drop the cached single-image detection."""
    st.session_state.last_detection = None


def reset_batch_results():
    """Uploader callback: drop the cached batch results and their images."""
    st.session_state.last_batch_results = None
    st.session_state.last_batch_files = None


# MPOB Color Map for Overlays (Global for consistency)
overlay_colors = {
    'Ripe': '#22c55e',         # Industrial Green
    'Underripe': '#fbbf24',    # Industrial Orange
    'Unripe': '#3b82f6',       # Industrial Blue
    'Abnormal': '#dc2626',     # Critical Red
    'Empty_Bunch': '#64748b',  # Waste Gray
    'Overripe': '#7c2d12',     # Dark Brown/Orange
}


def _group_detections_by_file(detailed_results):
    """Group ``res['detection']`` entries by ``res['filename']``, keeping order."""
    grouped = {}
    for res in detailed_results:
        grouped.setdefault(res['filename'], []).append(res['detection'])
    return grouped


def display_interactive_results(image, detections, key=None):
    """Render *image* with one hoverable Plotly box per detection.

    Args:
        image: PIL image used as the figure background.
        detections: iterable of dicts providing ``box`` ([x1, y1, x2, y2]),
            ``class``, ``confidence`` and ``is_health_alert``; ``bunch_id``
            is optional and defaults to the 1-based position.
        key: Streamlit element key forwarded to ``st.plotly_chart``.
    """
    img_width, img_height = image.size
    fig = go.Figure()

    # Add the palm image as the background
    fig.add_layout_image(
        dict(
            source=image,
            x=0,
            y=img_height,
            sizex=img_width,
            sizey=img_height,
            sizing="stretch",
            opacity=1,
            layer="below",
            xref="x",
            yref="y",
        )
    )

    # Configure axes to match image dimensions
    fig.update_xaxes(showgrid=False, range=(0, img_width), zeroline=False, visible=False)
    fig.update_yaxes(
        showgrid=False,
        range=(0, img_height),
        zeroline=False,
        visible=False,
        scaleanchor="x",
    )

    # Add interactive boxes
    for i, det in enumerate(detections):
        x1, y1, x2, y2 = det['box']
        # Plotly's y axis grows upwards while image rows grow downwards, so flip y.
        y_top, y_bottom = img_height - y1, img_height - y2
        color = overlay_colors.get(det['class'], "#ffeb3b")
        bunch_id = det.get('bunch_id', i + 1)

        # Plotly hover text uses <br> for line breaks.
        hover_text = (
            f"ID: #{bunch_id}<br>"
            f"Grade: {det['class']}<br>"
            f"Score: {det['confidence']:.2f}<br>"
            f"Alert: {det['is_health_alert']}"
        )
        fig.add_trace(
            go.Scatter(
                x=[x1, x2, x2, x1, x1],
                y=[y_top, y_top, y_bottom, y_bottom, y_top],
                fill="toself",
                fillcolor=color,
                opacity=0.3,  # Semi-transparent overlay
                mode='lines',
                line=dict(color=color, width=3),
                name=f"Bunch #{bunch_id}",
                text=hover_text,
                hoverinfo="text",
            )
        )

    fig.update_layout(
        width=800,
        height=600,
        margin=dict(l=0, r=0, b=0, t=0),
        showlegend=False,
    )
    st.plotly_chart(fig, use_container_width=True, key=key)


def _load_label_font(font_size):
    """Return the first loadable TrueType candidate, else PIL's default font."""
    for font_path in _FONT_CANDIDATES:
        if os.path.exists(font_path):
            try:
                return ImageFont.truetype(font_path, font_size)
            except OSError:
                continue
    return ImageFont.load_default()


def annotate_image(image, detections):
    """Draw boxes and shaded labels onto *image* in place and return it.

    Args:
        image: PIL image; it is modified directly (callers rely on this).
        detections: iterable of dicts with ``box``, ``class`` and
            ``confidence``; ``bunch_id`` is optional.

    Returns:
        The same ``image`` object, annotated.
    """
    draw = ImageDraw.Draw(image)

    # Dynamic font size and line width based on image resolution
    font_size = max(20, image.width // 40)
    line_width = max(4, image.width // 200)
    font = _load_label_font(font_size)

    for det in detections:
        box = det['box']  # [x1, y1, x2, y2]
        cls = det['class']
        conf = det['confidence']
        bunch_id = det.get('bunch_id', '?')
        color = overlay_colors.get(cls, '#ffffff')

        # 1. Draw Bold Bounding Box
        draw.rectangle(box, outline=color, width=line_width)

        # 2. Draw Label Background (High Contrast)
        label = f"#{bunch_id} {cls} {conf:.2f}"
        try:
            # textbbox provides precise coordinates for the background rectangle
            l, t, r, b = draw.textbbox((box[0], box[1] - font_size - 10), label, font=font)
            draw.rectangle([l - 5, t - 5, r + 5, b + 5], fill=color)
            draw.text((l, t), label, fill="white", font=font)
        except (AttributeError, ValueError, OSError):
            # Fallback for Pillow builds/fonts where textbbox is unavailable
            draw.text((box[0], box[1] - 25), label, fill=color)

    return image


def _pdf_to_bytes(pdf):
    """Return the rendered PDF as ``bytes`` regardless of FPDF flavour."""
    output = pdf.output(dest='S')
    if isinstance(output, str):
        # Classic PyFPDF returns a latin-1 text string under Python 3.
        return output.encode('latin-1')
    return bytes(output)


def generate_batch_report(data, uploaded_files_map=None):
    """Generate a PDF report for batch results with visual evidence.

    Args:
        data: batch response dict; reads ``industrial_summary``,
            ``total_count`` and, when present, ``detailed_results``.
        uploaded_files_map: optional ``{filename: image bytes}`` used to embed
            annotated images.

    Returns:
        The PDF document as ``bytes``.
    """
    pdf = FPDF()
    pdf.add_page()
    pdf.set_font("Arial", "B", 16)
    pdf.cell(190, 10, "Palm Oil FFB Harvest Quality Report", ln=True, align="C")
    pdf.set_font("Arial", "", 12)
    pdf.cell(190, 10, f"Date: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}", ln=True, align="C")
    pdf.ln(10)

    # 1. Summary Table
    pdf.set_font("Arial", "B", 14)
    pdf.cell(190, 10, "1. Batch Summary", ln=True)
    pdf.set_font("Arial", "", 12)

    summary = data.get('industrial_summary', {})
    total_bunches = data.get('total_count', 0)

    pdf.cell(95, 10, "Metric", border=1)
    pdf.cell(95, 10, "Value", border=1, ln=True)
    pdf.cell(95, 10, "Total Bunches Detected", border=1)
    pdf.cell(95, 10, str(total_bunches), border=1, ln=True)
    for grade, count in summary.items():
        if count > 0:
            pdf.cell(95, 10, f"Grade: {grade}", border=1)
            pdf.cell(95, 10, str(count), border=1, ln=True)
    pdf.ln(10)

    # 2. Strategic Insights
    pdf.set_font("Arial", "B", 14)
    pdf.cell(190, 10, "2. Strategic Yield Insights", ln=True)
    pdf.set_font("Arial", "", 12)

    loss = summary.get('Unripe', 0) + summary.get('Underripe', 0)
    if loss > 0:
        pdf.multi_cell(
            190,
            10,
            f"WARNING: {loss} bunches were harvested before peak ripeness. "
            "This directly impacts the Oil Extraction Rate (OER) and results in potential yield loss.",
        )
    else:
        pdf.multi_cell(
            190,
            10,
            "EXCELLENT: All detected bunches meet prime ripeness standards. Harvest efficiency is 100%.",
        )

    # Critical Alerts
    abnormal = summary.get('Abnormal', 0)
    empty = summary.get('Empty_Bunch', 0)
    if abnormal > 0 or empty > 0:
        pdf.ln(5)
        pdf.set_text_color(220, 0, 0)
        pdf.set_font("Arial", "B", 12)
        pdf.cell(190, 10, "CRITICAL HEALTH ALERTS:", ln=True)
        pdf.set_font("Arial", "", 12)
        if abnormal > 0:
            pdf.cell(
                190,
                10,
                f"- {abnormal} Abnormal Bunches detected (Requires immediate field inspection).",
                ln=True,
            )
        if empty > 0:
            pdf.cell(190, 10, f"- {empty} Empty Bunches detected (Waste reduction needed).", ln=True)
        pdf.set_text_color(0, 0, 0)

    # 3. Visual Evidence Section
    if 'detailed_results' in data and uploaded_files_map:
        pdf.add_page()
        pdf.set_font("Arial", "B", 14)
        pdf.cell(190, 10, "3. Visual Batch Evidence (AI Overlay)", ln=True)
        pdf.ln(5)

        results_by_file = _group_detections_by_file(data['detailed_results'])
        for fname, detections in results_by_file.items():
            if fname not in uploaded_files_map:
                continue

            img = Image.open(io.BytesIO(uploaded_files_map[fname])).convert("RGB")
            # annotate_image draws directly onto img.
            annotate_image(img, detections)

            # FPDF embeds images from a path. Use a private temp file instead
            # of a name derived from the uploaded filename, and always remove it.
            fd, temp_img_path = tempfile.mkstemp(prefix="temp_report_", suffix=".jpg")
            os.close(fd)
            try:
                img.save(temp_img_path, format="JPEG")

                # Check if we need a new page based on image height (rough estimate)
                if pdf.get_y() > 200:
                    pdf.add_page()

                pdf.image(temp_img_path, x=10, w=150)
            finally:
                os.remove(temp_img_path)

            pdf.set_font("Arial", "I", 10)
            pdf.cell(190, 10, f"Annotated: {fname}", ln=True)
            pdf.ln(5)

    # Footer
    pdf.set_y(-15)
    pdf.set_font("Arial", "I", 8)
    pdf.cell(190, 10, "Generated by Palm Oil AI Desktop PoC - YOLO26 Engine", align="C")

    return _pdf_to_bytes(pdf)


# --- Tabs ---
tab1, tab2, tab3, tab4 = st.tabs(
    ["Single Analysis", "Batch Processing", "Similarity Search", "History Vault"]
)

# --- Tab 1: Single Analysis ---
with tab1:
    st.subheader("Analyze Single Bunch")
    uploaded_file = st.file_uploader(
        "Upload a bunch image...",
        type=["jpg", "jpeg", "png"],
        key="single",
        on_change=reset_single_results,
    )

    if uploaded_file:
        # State initialization
        if "last_detection" not in st.session_state:
            st.session_state.last_detection = None

        # 1. Auto-Detection Trigger
        if st.session_state.last_detection is None:
            detection = None
            with st.spinner(f"Processing with {model_type.upper()} Engine..."):
                files = {"file": (uploaded_file.name, uploaded_file.getvalue(), uploaded_file.type)}
                payload = {"model_type": model_type}
                try:
                    res = requests.post(
                        f"{API_BASE_URL}/analyze",
                        files=files,
                        data=payload,
                        timeout=INFERENCE_TIMEOUT,
                    )
                except requests.RequestException as exc:
                    st.error(f"Detection Failed: {exc}")
                else:
                    if res.status_code == 200:
                        detection = res.json()
                    else:
                        st.error(f"Detection Failed: {res.text}")
            if detection is not None:
                st.session_state.last_detection = detection
                st.rerun()  # Refresh to show results immediately

        # 2. Results Layout
        if st.session_state.last_detection:
            st.divider()

            # PRIMARY ANNOTATED VIEW
            st.write("### 🔍 AI Analytical View")
            data = st.session_state.last_detection
            single_summary = data.get('industrial_summary', {})
            single_detections = data.get('detections', [])
            img = Image.open(uploaded_file).convert("RGB")
            display_interactive_results(img, single_detections, key="main_viewer")

            # Visual Legend
            st.write("#### 🎨 Ripeness Legend")
            l_cols = st.columns(len(overlay_colors))
            for legend_col, (grade, color) in zip(l_cols, overlay_colors.items()):
                with legend_col:
                    # NOTE(review): the original HTML of this swatch was lost in
                    # the collapsed source; this markup is a reconstruction —
                    # confirm the intended styling.
                    st.markdown(
                        f'<div style="background-color:{color}; color:white; padding:6px; '
                        f'border-radius:6px; text-align:center; font-weight:bold;">{grade}</div>',
                        unsafe_allow_html=True,
                    )

            st.divider()
            st.write("### 📈 Manager's Dashboard")
            m_col1, m_col2, m_col3, m_col4 = st.columns(4)
            with m_col1:
                st.metric("Total Bunches", data.get('total_count', 0))
            with m_col2:
                st.metric("Healthy (Ripe)", single_summary.get('Ripe', 0))
            with m_col3:
                abnormal = single_summary.get('Abnormal', 0)
                st.metric("Abnormal Alerts", abnormal, delta=-abnormal, delta_color="inverse")
            with m_col4:
                st.metric("Inference Speed", f"{data.get('inference_ms', 0):.1f} ms")

            # Keep original col structure for summary below (col2 is not populated here)
            col1, col2 = st.columns([1.5, 1])

            # NOTE(review): nesting below is reconstructed from a collapsed
            # source; the "Only if detections found" comment suggests the
            # quality mix and cloud actions live in the else-branch — confirm.
            with col1:
                with st.expander("🛠️ Technical Evidence: Raw Output Tensor", expanded=False):
                    st.write("First 5 detections from raw output tensor:")
                    st.json(data.get('raw_array_sample', []))

                with st.container(border=True):
                    st.write("### 🏷️ Detection Results")
                    if not single_detections:
                        st.warning("No Fresh Fruit Bunches detected.")
                    else:
                        for det in single_detections:
                            st.info(
                                f"### Bunch #{det['bunch_id']}: {det['class']} ({det['confidence']:.2%})"
                            )

                        st.write("### 📊 Harvest Quality Mix")
                        # Convert industrial_summary dictionary to a DataFrame for charting
                        summary_df = pd.DataFrame(
                            list(single_summary.items()),
                            columns=['Grade', 'Count'],
                        )
                        # Filter out classes with 0 count for a cleaner chart
                        summary_df = summary_df[summary_df['Count'] > 0]

                        if not summary_df.empty:
                            # Pie chart of grade proportions; colours shared with the overlays
                            fig = px.pie(
                                summary_df,
                                values='Count',
                                names='Grade',
                                color='Grade',
                                color_discrete_map=overlay_colors,
                                hole=0.4,
                            )
                            fig.update_layout(margin=dict(t=0, b=0, l=0, r=0), height=300)
                            st.plotly_chart(fig, width='stretch', key="single_pie")

                        # 💡 Strategic R&D Insight: Harvest Efficiency
                        st.write("---")
                        st.write("#### 💡 Strategic R&D Insight")
                        unripe_count = single_summary.get('Unripe', 0)
                        underripe_count = single_summary.get('Underripe', 0)
                        total_non_prime = unripe_count + underripe_count

                        st.write(f"🌑 **Unripe (Mentah):** {unripe_count}")
                        st.write(f"🌗 **Underripe (Kurang Masak):** {underripe_count}")

                        if total_non_prime > 0:
                            st.warning(
                                f"🚨 **Potential Yield Loss:** {total_non_prime} bunches harvested too early. "
                                "This will reduce OER (Oil Extraction Rate)."
                            )
                        else:
                            st.success("✅ **Harvest Efficiency:** 100% Prime Ripeness detected.")

                        # High-Priority Health Alert
                        if single_summary.get('Abnormal', 0) > 0:
                            st.error(
                                f"🚨 CRITICAL: {single_summary['Abnormal']} Abnormal Bunches Detected!"
                            )
                        if single_summary.get('Empty_Bunch', 0) > 0:
                            st.warning(
                                f"⚠️ ALERT: {single_summary['Empty_Bunch']} Empty Bunches Detected."
                            )

                        # 3. Cloud Actions (Only if detections found)
                        st.write("---")
                        st.write("#### ✨ Cloud Archive")
                        if st.button("🚀 Save to Atlas (Vectorize)", width='stretch'):
                            with st.spinner("Archiving..."):
                                primary_det = single_detections[0]
                                payload = {"detection_data": json.dumps(primary_det)}
                                files_cloud = {
                                    "file": (
                                        uploaded_file.name,
                                        uploaded_file.getvalue(),
                                        uploaded_file.type,
                                    )
                                }
                                try:
                                    res_cloud = requests.post(
                                        f"{API_BASE_URL}/vectorize_and_store",
                                        files=files_cloud,
                                        data=payload,
                                        timeout=INFERENCE_TIMEOUT,
                                    )
                                except requests.RequestException:
                                    st.error("Failed to connect to cloud service")
                                else:
                                    if res_cloud.status_code == 200:
                                        res_json = res_cloud.json()
                                        if res_json["status"] == "success":
                                            st.success(f"Archived! ID: `{res_json['record_id'][:8]}...`")
                                        else:
                                            st.error(f"Cloud Error: {res_json['message']}")
                                    else:
                                        st.error("Failed to connect to cloud service")

                        if st.button("🚩 Flag Misclassification", width='stretch', type="secondary"):
                            # Save to local feedback folder (created on demand)
                            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
                            feedback_id = f"fb_{timestamp}"
                            os.makedirs(FEEDBACK_DIR, exist_ok=True)
                            img_path = os.path.join(FEEDBACK_DIR, f"{feedback_id}.jpg")
                            json_path = os.path.join(FEEDBACK_DIR, f"{feedback_id}.json")

                            # Save image; JPEG cannot store alpha, so force RGB first
                            Image.open(uploaded_file).convert("RGB").save(img_path)

                            # Save metadata
                            feedback_data = {
                                "original_filename": uploaded_file.name,
                                "timestamp": timestamp,
                                "detections": single_detections,
                                "threshold_used": data.get('current_threshold'),
                            }
                            with open(json_path, "w", encoding="utf-8") as f:
                                json.dump(feedback_data, f, indent=4)

                            st.toast("✅ Feedback saved to local vault!", icon="🚩")

                        if st.button(
                            "💾 Local History Vault (Auto-Saved)",
                            width='stretch',
                            type="secondary",
                            disabled=True,
                        ):
                            pass
                        st.caption("✅ This analysis was automatically archived to the local vault.")

# --- Tab 2: Batch Processing ---
with tab2:
    st.subheader("Bulk Analysis")

    # 1. Initialize Session State
    if "batch_uploader_key" not in st.session_state:
        st.session_state.batch_uploader_key = 0
    if "last_batch_results" not in st.session_state:
        st.session_state.last_batch_results = None
    if "last_batch_files" not in st.session_state:
        # {filename: bytes} captured when the batch was processed. The uploader
        # is created further down and is reset after processing, so its files
        # are not available while the persisted results are rendered.
        st.session_state.last_batch_files = None

    # 2. Display Persisted Results (if any)
    if st.session_state.last_batch_results:
        res_data = st.session_state.last_batch_results
        batch_files = st.session_state.last_batch_files or {}
        with st.container(border=True):
            st.success(f"✅ Successfully processed {res_data['processed_count']} images.")

            # Batch Summary Dashboard
            st.write("### 📈 Batch Quality Overview")
            batch_summary = res_data.get('industrial_summary', {})
            if batch_summary:
                sum_df = pd.DataFrame(list(batch_summary.items()), columns=['Grade', 'Count'])
                sum_df = sum_df[sum_df['Count'] > 0]

                b_col1, b_col2 = st.columns([1, 1])
                with b_col1:
                    st.dataframe(sum_df, hide_index=True, width='stretch')
                with b_col2:
                    if not sum_df.empty:
                        fig_batch = px.bar(
                            sum_df,
                            x='Grade',
                            y='Count',
                            color='Grade',
                            color_discrete_map=overlay_colors,
                        )
                        fig_batch.update_layout(
                            margin=dict(t=0, b=0, l=0, r=0),
                            height=200,
                            showlegend=False,
                        )
                        st.plotly_chart(fig_batch, width='stretch', key="batch_bar")

                if batch_summary.get('Abnormal', 0) > 0:
                    st.error(
                        f"🚨 BATCH CRITICAL: {batch_summary['Abnormal']} Abnormal Bunches found in this batch!"
                    )

            st.write("Generated Record IDs:")
            st.code(res_data['record_ids'])

            # --- 4. Batch Evidence Gallery ---
            st.write("### 🖼️ Detailed Detection Evidence")
            if 'detailed_results' in res_data:
                # Group results by filename for gallery
                gallery_map = _group_detections_by_file(res_data['detailed_results'])

                # Show images with overlays using consistent utility
                for fname, img_bytes in batch_files.items():
                    if fname in gallery_map:
                        with st.container(border=True):
                            g_img = Image.open(io.BytesIO(img_bytes)).convert("RGB")
                            g_annotated = annotate_image(g_img, gallery_map[fname])
                            st.image(
                                g_annotated,
                                caption=f"Evidence: {fname}",
                                use_container_width=True,
                            )

            # PDF Export Button (Pass images map)
            pdf_bytes = generate_batch_report(res_data, batch_files)
            st.download_button(
                label="📄 Download Executive Batch Report (PDF)",
                data=pdf_bytes,
                file_name=f"PalmOil_BatchReport_{datetime.now().strftime('%Y%m%d_%H%M%S')}.pdf",
                mime="application/pdf",
                width='stretch',
            )

            if st.button("Clear Results & Start New Batch", width='stretch'):
                st.session_state.last_batch_results = None
                st.session_state.last_batch_files = None
                st.rerun()
        st.divider()

    # 3. Uploader UI
    col_batch1, col_batch2 = st.columns([4, 1])
    with col_batch1:
        uploaded_files = st.file_uploader(
            "Upload multiple images...",
            type=["jpg", "jpeg", "png"],
            accept_multiple_files=True,
            key=f"batch_{st.session_state.batch_uploader_key}",
            on_change=reset_batch_results,
        )

    with col_batch2:
        st.write("##")  # Alignment
        if st.session_state.last_batch_results is None and uploaded_files:
            if st.button("🔍 Process Batch", type="primary", width='stretch'):
                batch_succeeded = False
                with st.spinner(f"Analyzing {len(uploaded_files)} images with {model_type.upper()}..."):
                    files = [("files", (f.name, f.getvalue(), f.type)) for f in uploaded_files]
                    payload = {"model_type": model_type}
                    try:
                        res = requests.post(
                            f"{API_BASE_URL}/process_batch",
                            files=files,
                            data=payload,
                            timeout=INFERENCE_TIMEOUT,
                        )
                    except requests.RequestException as exc:
                        st.error(f"Batch Processing Failed: {exc}")
                    else:
                        if res.status_code == 200:
                            batch_data = res.json()
                            if batch_data["status"] == "success":
                                st.session_state.last_batch_results = batch_data
                                # Keep the image bytes: the uploader is reset below.
                                st.session_state.last_batch_files = {
                                    f.name: f.getvalue() for f in uploaded_files
                                }
                                st.session_state.batch_uploader_key += 1
                                batch_succeeded = True
                            elif batch_data["status"] == "partial_success":
                                st.warning(batch_data["message"])
                                st.info(
                                    f"Successfully detected {batch_data['detections_count']} bunches locally."
                                )
                            else:
                                st.error(f"Batch Error: {batch_data['message']}")
                        else:
                            st.error(f"Batch Processing Failed: {res.text}")
                if batch_succeeded:
                    st.rerun()

        if st.button("🗑️ Reset Uploader"):
            st.session_state.batch_uploader_key += 1
            st.session_state.last_batch_results = None
            st.session_state.last_batch_files = None
            st.rerun()

# --- Tab 3: Similarity Search ---
with tab3:
    st.subheader("Hybrid Semantic Search")
    st.markdown("Search records by either **Image Similarity** or **Natural Language Query**.")

    with st.form("hybrid_search_form"):
        col_input1, col_input2 = st.columns(2)
        with col_input1:
            search_file = st.file_uploader(
                "Option A: Search Image...",
                type=["jpg", "jpeg", "png"],
                key="search",
            )
        with col_input2:
            text_query = st.text_input(
                "Option B: Natural Language Query",
                placeholder="e.g., 'ripe bunches with dark spots' or 'unripe fruit'",
            )
        top_k = st.slider("Results Limit (Top K)", 1, 20, 3)
        submit_search = st.form_submit_button("Run Semantic Search")

    if submit_search:
        if not search_file and not text_query:
            st.warning("Please provide either an image or a text query.")
        else:
            with st.spinner("Searching Vector Index..."):
                payload = {"limit": top_k}
                search_kwargs = {"data": payload, "timeout": INFERENCE_TIMEOUT}
                if search_file:
                    # If an image is uploaded, it takes precedence for visual search
                    search_kwargs["files"] = {
                        "file": (search_file.name, search_file.getvalue(), search_file.type)
                    }
                else:
                    # Otherwise, use the text query, sent as form-data (data=)
                    # to match FastAPI's Form(None)
                    payload["text_query"] = text_query

                try:
                    res = requests.post(f"{API_BASE_URL}/search_hybrid", **search_kwargs)
                except requests.RequestException as exc:
                    res = None
                    st.error(f"Search failed: {exc}")

                if res is not None and res.status_code == 200:
                    results = res.json().get("results", [])
                    if not results:
                        st.warning("No similar records found.")
                    else:
                        st.success(f"Found {len(results)} matches.")
                        for item in results:
                            with st.container(border=True):
                                c1, c2 = st.columns([1, 2])
                                # Fetch the image for this result
                                rec_id = item["_id"]
                                try:
                                    img_res = requests.get(
                                        f"{API_BASE_URL}/get_image/{rec_id}",
                                        timeout=REQUEST_TIMEOUT,
                                    )
                                except requests.RequestException:
                                    img_res = None

                                with c1:
                                    if img_res is not None and img_res.status_code == 200:
                                        img_b64 = img_res.json().get("image_data")
                                        if img_b64:
                                            st.image(base64.b64decode(img_b64), width=250)
                                        else:
                                            st.write("No image data found.")
                                    else:
                                        st.write("Failed to load image.")

                                with c2:
                                    st.write(f"**Class:** {item['ripeness_class']}")
                                    st.write(f"**Similarity Score:** {item['score']:.4f}")
                                    st.write(f"**Timestamp:** {item['timestamp']}")
                                    st.write(f"**ID:** `{rec_id}`")
                elif res is not None:
                    st.error(f"Search failed: {res.text}")

# --- Tab 4: History Vault ---
with tab4:
    st.subheader("📜 Local History Vault")

    if "selected_history_id" not in st.session_state:
        st.session_state.selected_history_id = None

    # Broad catch kept on purpose: this is the tab's top-level boundary and any
    # failure is surfaced to the user. st.rerun() is not affected because
    # Streamlit's control-flow exceptions do not derive from Exception.
    try:
        res = requests.get(f"{API_BASE_URL}/get_history", timeout=REQUEST_TIMEOUT)
        if res.status_code == 200:
            history_data = res.json().get("history", [])
            if not history_data:
                st.info("No saved records found.")
            elif st.session_state.selected_history_id is None:
                # ListView Mode
                st.write("### 📋 Record List")
                df_history = pd.DataFrame(history_data)[['id', 'filename', 'timestamp', 'inference_ms']]
                st.dataframe(df_history, hide_index=True, use_container_width=True)

                id_to_select = st.number_input(
                    "Enter Record ID to view details:",
                    min_value=int(df_history['id'].min()),
                    max_value=int(df_history['id'].max()),
                    step=1,
                )
                if st.button("Deep Dive Analysis", type="primary"):
                    st.session_state.selected_history_id = id_to_select
                    st.rerun()
            else:
                # Detail View Mode
                record = next(
                    (item for item in history_data if item["id"] == st.session_state.selected_history_id),
                    None,
                )
                if not record:
                    st.error("Record not found.")
                    if st.button("Back to List"):
                        st.session_state.selected_history_id = None
                        st.rerun()
                else:
                    if st.button("⬅️ Back to History List"):
                        st.session_state.selected_history_id = None
                        st.rerun()

                    st.divider()
                    st.write(f"## 🔍 Deep Dive: Record #{record['id']} ({record['filename']})")

                    # Both fields are stored as JSON text in the history record.
                    detections = json.loads(record['detections'])
                    summary = json.loads(record['summary'])

                    # Metrics Row
                    h_col1, h_col2, h_col3, h_col4 = st.columns(4)
                    with h_col1:
                        st.metric("Total Bunches", sum(summary.values()))
                    with h_col2:
                        st.metric("Healthy (Ripe)", summary.get('Ripe', 0))
                    with h_col3:
                        st.metric("Abnormal Alerts", summary.get('Abnormal', 0))
                    with h_col4:
                        st.metric("Inference Speed", f"{record.get('inference_ms', 0) or 0:.1f} ms")

                    # Image View
                    if os.path.exists(record['archive_path']):
                        with open(record['archive_path'], "rb") as f:
                            # convert() loads the pixel data before the file closes
                            hist_img = Image.open(f).convert("RGB")
                        display_interactive_results(hist_img, detections, key=f"hist_{record['id']}")
                    else:
                        st.error(f"Archive file not found: {record['archive_path']}")

                    # Technical Evidence Expander
                    with st.expander("🛠️ Technical Evidence: Raw Output Tensor"):
                        raw_data = record.get('raw_array_sample')
                        if raw_data:
                            try:
                                st.json(json.loads(raw_data))
                            except (json.JSONDecodeError, TypeError):
                                # Not valid JSON text: show it verbatim instead
                                st.text(raw_data)
                        else:
                            st.info("No raw tensor data available for this record.")
        else:
            st.error(f"Failed to fetch history: {res.text}")
    except Exception as e:
        st.error(f"Error loading history: {str(e)}")