import base64
import io
import json
import os
import tempfile
from datetime import datetime

import numpy as np
import pandas as pd
import plotly.express as px
import plotly.graph_objects as go
import requests
import streamlit as st
from fpdf import FPDF
from PIL import Image
from ultralytics import YOLO
# --- 1. Global Backend Check ---
API_BASE_URL = "http://localhost:8000"


def check_backend() -> bool:
    """Return True if the FastAPI backend answers the health probe.

    Uses the lightweight /get_confidence endpoint as a liveness check; any
    network-level failure (refused connection, timeout, DNS) counts as
    "backend down".
    """
    try:
        res = requests.get(f"{API_BASE_URL}/get_confidence", timeout=2)
        return res.status_code == 200
    except requests.RequestException:
        # Narrow catch: only transport errors mean "offline" — programming
        # errors (e.g. a typo'd URL constant) should still surface loudly.
        return False


backend_active = check_backend()
# LOCAL MODEL LOADING REMOVED (YOLO26 Clean Sweep)
# UI now relies entirely on Backend API for NMS-Free inference.

# BUGFIX: st.set_page_config() must be the FIRST Streamlit command in the
# script, so it is issued before the offline-error widgets below
# (Streamlit raises StreamlitAPIException otherwise).
st.set_page_config(page_title="Palm Oil Ripeness AI (YOLO26)", layout="wide")

if not backend_active:
    st.error("⚠️ Backend API is offline!")
    st.info("Please start the backend server first (e.g., `python main.py`) to unlock AI features.")
    if st.button("🔄 Retry Connection"):
        st.rerun()
    st.stop()  # Stops execution here, effectively disabling the app

# --- 2. Main Page Header (only rendered if backend is active) ---
st.title("🌴 Palm Oil FFB Management System")
st.markdown("### Production-Ready AI Analysis & Archival")
# --- Sidebar ---
st.sidebar.header("Backend Controls")


def update_confidence():
    """Push the slider value to the backend's confidence-threshold endpoint."""
    new_conf = st.session_state.conf_slider
    try:
        requests.post(
            f"{API_BASE_URL}/set_confidence",
            json={"threshold": new_conf},
            timeout=5,  # don't let the UI hang on a stalled backend
        )
        st.toast(f"Threshold updated to {new_conf}")
    except requests.RequestException:
        st.sidebar.error("Failed to update threshold")


# The startup gate already verified the backend is up, but it may have gone
# away between reruns — fall back to the default threshold rather than crash.
try:
    response = requests.get(f"{API_BASE_URL}/get_confidence", timeout=5)
    current_conf = response.json().get("current_confidence", 0.25)
except (requests.RequestException, ValueError):
    current_conf = 0.25

st.sidebar.success("Connected to API")
st.sidebar.info("Engine: YOLO26 NMS-Free (Inference: ~39ms)")

# Slider synchronized with the backend-held threshold.
st.sidebar.slider(
    "Confidence Threshold",
    0.1, 1.0,
    value=float(current_conf),
    key="conf_slider",
    on_change=update_confidence,
)
st.sidebar.markdown("---")
st.sidebar.subheader("Inference Engine")

_ENGINE_LABELS = ["YOLO26 (ONNX - High Speed)", "YOLO26 (PyTorch - Native)"]
engine_choice = st.sidebar.selectbox(
    "Select Model Engine",
    _ENGINE_LABELS,
    index=0,
    help="ONNX is optimized for latency. PyTorch provides native object handling.",
)

# Map the human-readable label onto the API's engine identifier.
model_type = "onnx" if "ONNX" in engine_choice else "pytorch"

if model_type == "onnx":
    st.sidebar.info("ONNX Engine: ~39ms Latency")
else:
    st.sidebar.warning("PyTorch Engine: Higher Memory Usage")
# Uploader on_change callbacks: discard cached results so a changed file
# selection re-triggers a fresh analysis on the next rerun.
def reset_single_results():
    """Clear the cached single-image detection result."""
    st.session_state.last_detection = None


def reset_batch_results():
    """Clear the cached batch-processing results."""
    st.session_state.last_batch_results = None
# MPOB color map for overlays (global, so every view renders grades
# consistently). Hex values picked for contrast against foliage imagery.
overlay_colors = {
    "Ripe": "#22c55e",         # Industrial Green
    "Underripe": "#fbbf24",    # Industrial Orange
    "Unripe": "#3b82f6",       # Industrial Blue
    "Abnormal": "#dc2626",     # Critical Red
    "Empty_Bunch": "#64748b",  # Waste Gray
    "Overripe": "#7c2d12",     # Dark Brown/Orange
}
def display_interactive_results(image, detections, key=None):
    """Render the image with interactive hover bounding boxes via Plotly.

    Args:
        image: PIL image used as the chart background.
        detections: list of dicts with 'box' [x1, y1, x2, y2], 'class',
            'confidence', 'is_health_alert' and optional 'bunch_id'.
        key: optional Streamlit widget key for the chart.
    """
    img_width, img_height = image.size

    fig = go.Figure()
    # Background: the uploaded photo, stretched to fill the axes.
    fig.add_layout_image(dict(
        source=image,
        x=0, y=img_height,
        sizex=img_width, sizey=img_height,
        sizing="stretch", opacity=1, layer="below",
        xref="x", yref="y",
    ))
    # Hide the axes and lock the plot area to the image dimensions.
    fig.update_xaxes(showgrid=False, range=(0, img_width), zeroline=False, visible=False)
    fig.update_yaxes(showgrid=False, range=(0, img_height), zeroline=False, visible=False, scaleanchor="x")

    for idx, det in enumerate(detections):
        x1, y1, x2, y2 = det['box']
        # Plotly's y-axis points up while PIL's points down, so flip y.
        top = img_height - y1
        bottom = img_height - y2
        box_color = overlay_colors.get(det['class'], "#ffeb3b")
        bunch_id = det.get('bunch_id', idx + 1)
        hover_text = (
            f"<b>ID: #{bunch_id}</b><br>Grade: {det['class']}"
            f"<br>Score: {det['confidence']:.2f}"
            f"<br>Alert: {det['is_health_alert']}"
        )
        # One closed polygon trace per detection acts as the hover target.
        fig.add_trace(go.Scatter(
            x=[x1, x2, x2, x1, x1],
            y=[top, top, bottom, bottom, top],
            fill="toself",
            fillcolor=box_color,
            opacity=0.3,  # semi-transparent until hover
            mode='lines',
            line=dict(color=box_color, width=3),
            name=f"Bunch #{bunch_id}",
            text=hover_text,
            hoverinfo="text",
        ))

    fig.update_layout(width=800, height=600, margin=dict(l=0, r=0, b=0, t=0), showlegend=False)
    st.plotly_chart(fig, use_container_width=True, key=key)
def annotate_image(image, detections):
    """Draw high-visibility boxes and background-shaded labels in place.

    Args:
        image: PIL RGB image; mutated in place and also returned.
        detections: list of dicts with 'box' [x1, y1, x2, y2], 'class',
            'confidence' and optional 'bunch_id'.

    Returns:
        The same image object with annotations drawn on it.
    """
    from PIL import ImageDraw, ImageFont

    draw = ImageDraw.Draw(image)

    # Dynamic font size based on image resolution.
    font_size = max(20, image.width // 40)
    # load_default() ignores font_size, so prefer a scalable TrueType font.
    # Try the original Windows path first, then a common Linux fallback,
    # so annotations stay readable off-Windows too.
    font = ImageFont.load_default()
    for font_path in ("C:\\Windows\\Fonts\\arial.ttf",
                      "/usr/share/fonts/truetype/dejavu/DejaVuSans.ttf"):
        if os.path.exists(font_path):
            try:
                font = ImageFont.truetype(font_path, font_size)
                break
            except OSError:
                # Font file present but unreadable/corrupt — try the next one.
                continue

    for det in detections:
        box = det['box']  # [x1, y1, x2, y2]
        cls = det['class']
        conf = det['confidence']
        bunch_id = det.get('bunch_id', '?')
        color = overlay_colors.get(cls, '#ffffff')

        # 1. Bold bounding box, line width scaled to image resolution.
        draw.rectangle(box, outline=color, width=max(4, image.width // 200))

        # 2. Label with a solid background rectangle for contrast.
        label = f"#{bunch_id} {cls} {conf:.2f}"
        try:
            # textbbox gives precise coordinates for the background rectangle.
            l, t, r, b = draw.textbbox((box[0], box[1] - font_size - 10), label, font=font)
            draw.rectangle([l - 5, t - 5, r + 5, b + 5], fill=color)
            draw.text((l, t), label, fill="white", font=font)
        except (AttributeError, ValueError):
            # Fallback for Pillow builds without textbbox support.
            draw.text((box[0], box[1] - 25), label, fill=color)

    return image
def _report_summary_section(pdf, data):
    """Write the '1. Batch Summary' metric table."""
    pdf.set_font("Arial", "B", 14)
    pdf.cell(190, 10, "1. Batch Summary", ln=True)
    pdf.set_font("Arial", "", 12)

    summary = data.get('industrial_summary', {})
    total_bunches = data.get('total_count', 0)

    pdf.cell(95, 10, "Metric", border=1)
    pdf.cell(95, 10, "Value", border=1, ln=True)
    pdf.cell(95, 10, "Total Bunches Detected", border=1)
    pdf.cell(95, 10, str(total_bunches), border=1, ln=True)
    for grade, count in summary.items():
        if count > 0:  # skip empty grades to keep the table compact
            pdf.cell(95, 10, f"Grade: {grade}", border=1)
            pdf.cell(95, 10, str(count), border=1, ln=True)
    pdf.ln(10)


def _report_insights_section(pdf, summary):
    """Write the '2. Strategic Yield Insights' narrative and health alerts."""
    pdf.set_font("Arial", "B", 14)
    pdf.cell(190, 10, "2. Strategic Yield Insights", ln=True)
    pdf.set_font("Arial", "", 12)

    loss = summary.get('Unripe', 0) + summary.get('Underripe', 0)
    if loss > 0:
        pdf.multi_cell(190, 10, f"WARNING: {loss} bunches were harvested before peak ripeness. "
                       "This directly impacts the Oil Extraction Rate (OER) and results in potential yield loss.")
    else:
        pdf.multi_cell(190, 10, "EXCELLENT: All detected bunches meet prime ripeness standards. Harvest efficiency is 100%.")

    abnormal = summary.get('Abnormal', 0)
    empty = summary.get('Empty_Bunch', 0)
    if abnormal > 0 or empty > 0:
        pdf.ln(5)
        pdf.set_text_color(220, 0, 0)  # red for critical alerts
        pdf.set_font("Arial", "B", 12)
        pdf.cell(190, 10, "CRITICAL HEALTH ALERTS:", ln=True)
        pdf.set_font("Arial", "", 12)
        if abnormal > 0:
            pdf.cell(190, 10, f"- {abnormal} Abnormal Bunches detected (Requires immediate field inspection).", ln=True)
        if empty > 0:
            pdf.cell(190, 10, f"- {empty} Empty Bunches detected (Waste reduction needed).", ln=True)
        pdf.set_text_color(0, 0, 0)  # restore default text color


def _report_evidence_section(pdf, data, uploaded_files_map):
    """Embed AI-annotated evidence images, one block per source file."""
    pdf.add_page()
    pdf.set_font("Arial", "B", 14)
    pdf.cell(190, 10, "3. Visual Batch Evidence (AI Overlay)", ln=True)
    pdf.ln(5)

    # Group detections by source filename.
    results_by_file = {}
    for res in data['detailed_results']:
        results_by_file.setdefault(res['filename'], []).append(res['detection'])

    for fname, detections in results_by_file.items():
        if fname not in uploaded_files_map:
            continue
        img = Image.open(io.BytesIO(uploaded_files_map[fname])).convert("RGB")
        annotate_image(img, detections)

        # BUGFIX: fpdf reads images from disk — use a real temp file so an
        # arbitrary user-supplied filename can never escape into the working
        # directory, and guarantee cleanup even if embedding fails.
        tmp = tempfile.NamedTemporaryFile(suffix=".png", delete=False)
        tmp_path = tmp.name
        tmp.close()  # close before PIL writes (required on Windows)
        try:
            img.save(tmp_path)
            # Rough page-break heuristic based on the current cursor position.
            if pdf.get_y() > 200:
                pdf.add_page()
            pdf.image(tmp_path, x=10, w=150)
        finally:
            os.remove(tmp_path)

        pdf.set_font("Arial", "I", 10)
        pdf.cell(190, 10, f"Annotated: {fname}", ln=True)
        pdf.ln(5)


def generate_batch_report(data, uploaded_files_map=None):
    """Generate a professional PDF report for batch results with visual evidence.

    Args:
        data: batch response dict containing 'industrial_summary',
            'total_count' and optionally 'detailed_results' (records with
            'filename' and 'detection' keys).
        uploaded_files_map: optional {filename: raw image bytes} used to
            embed annotated evidence images.

    Returns:
        The PDF document serialized as bytes (fpdf 'S' destination output).
    """
    pdf = FPDF()
    pdf.add_page()
    pdf.set_font("Arial", "B", 16)
    pdf.cell(190, 10, "Palm Oil FFB Harvest Quality Report", ln=True, align="C")
    pdf.set_font("Arial", "", 12)
    pdf.cell(190, 10, f"Date: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}", ln=True, align="C")
    pdf.ln(10)

    summary = data.get('industrial_summary', {})
    _report_summary_section(pdf, data)
    _report_insights_section(pdf, summary)

    if 'detailed_results' in data and uploaded_files_map:
        _report_evidence_section(pdf, data, uploaded_files_map)

    # Footer
    pdf.set_y(-15)
    pdf.set_font("Arial", "I", 8)
    pdf.cell(190, 10, "Generated by Palm Oil AI Desktop PoC - YOLO26 Engine", align="C")

    return pdf.output(dest='S')
# --- Tabs ---
tab1, tab2, tab3, tab4 = st.tabs(["Single Analysis", "Batch Processing", "Similarity Search", "History Vault"])

# --- Tab 1: Single Analysis ---
with tab1:
    st.subheader("Analyze Single Bunch")
    uploaded_file = st.file_uploader(
        "Upload a bunch image...",
        type=["jpg", "jpeg", "png"],
        key="single",
        on_change=reset_single_results
    )

    if uploaded_file:
        # State initialization
        if "last_detection" not in st.session_state:
            st.session_state.last_detection = None

        # 1. Auto-detection trigger: runs once per new upload (the on_change
        #    callback resets last_detection to None when the file changes).
        if st.session_state.last_detection is None:
            with st.spinner(f"Processing with {model_type.upper()} Engine..."):
                files = {"file": (uploaded_file.name, uploaded_file.getvalue(), uploaded_file.type)}
                payload = {"model_type": model_type}
                res = requests.post(f"{API_BASE_URL}/analyze", files=files, data=payload, timeout=120)
                if res.status_code == 200:
                    st.session_state.last_detection = res.json()
                    st.rerun()  # Refresh to show results immediately
                else:
                    st.error(f"Detection Failed: {res.text}")

        # 2. Results layout
        if st.session_state.last_detection:
            st.divider()

            # Primary annotated view
            st.write("### 🔍 AI Analytical View")
            data = st.session_state.last_detection
            img = Image.open(uploaded_file).convert("RGB")
            display_interactive_results(img, data['detections'], key="main_viewer")

            # Visual legend
            st.write("#### 🎨 Ripeness Legend")
            l_cols = st.columns(len(overlay_colors))
            for i, (grade, color) in enumerate(overlay_colors.items()):
                with l_cols[i]:
                    st.markdown(f'<div style="background-color:{color}; padding:10px; border-radius:5px; text-align:center; color:white; font-weight:bold;">{grade}</div>', unsafe_allow_html=True)

            st.divider()
            st.write("### 📈 Manager's Dashboard")
            m_col1, m_col2, m_col3 = st.columns(3)
            with m_col1:
                st.metric("Total Bunches", data.get('total_count', 0))
            with m_col2:
                st.metric("Healthy (Ripe)", data['industrial_summary'].get('Ripe', 0))
            with m_col3:
                abnormal = data['industrial_summary'].get('Abnormal', 0)
                st.metric("Abnormal Alerts", abnormal, delta=-abnormal, delta_color="inverse")

            col1, col2 = st.columns([1.5, 1])  # Keep original col structure for summary below

            with col2:
                with st.container(border=True):
                    st.write("### 🏷️ Detection Results")
                    if not data['detections']:
                        st.warning("No Fresh Fruit Bunches detected.")
                    else:
                        for det in data['detections']:
                            st.info(f"### Bunch #{det['bunch_id']}: {det['class']} ({det['confidence']:.2%})")

                    st.write("### 📊 Harvest Quality Mix")
                    # Convert the industrial_summary dict into a DataFrame for charting.
                    summary_df = pd.DataFrame(
                        list(data['industrial_summary'].items()),
                        columns=['Grade', 'Count']
                    )
                    # Drop zero-count classes for a cleaner chart.
                    summary_df = summary_df[summary_df['Count'] > 0]
                    if not summary_df.empty:
                        # Donut chart showing the proportion of each grade.
                        fig = px.pie(summary_df, values='Count', names='Grade',
                                     color='Grade',
                                     color_discrete_map={
                                         'Ripe': '#22c55e',        # Industrial Green
                                         'Underripe': '#fbbf24',   # Industrial Orange
                                         'Unripe': '#3b82f6',      # Industrial Blue
                                         'Abnormal': '#dc2626',    # Critical Red
                                         'Empty_Bunch': '#64748b'  # Waste Gray
                                     },
                                     hole=0.4)
                        fig.update_layout(margin=dict(t=0, b=0, l=0, r=0), height=300)
                        st.plotly_chart(fig, width='stretch', key="single_pie")

                    # 💡 Strategic R&D Insight: Harvest Efficiency
                    st.write("---")
                    st.write("#### 💡 Strategic R&D Insight")
                    unripe_count = data['industrial_summary'].get('Unripe', 0)
                    underripe_count = data['industrial_summary'].get('Underripe', 0)
                    total_non_prime = unripe_count + underripe_count

                    st.write(f"🌑 **Unripe (Mentah):** {unripe_count}")
                    st.write(f"🌗 **Underripe (Kurang Masak):** {underripe_count}")

                    if total_non_prime > 0:
                        st.warning(f"🚨 **Potential Yield Loss:** {total_non_prime} bunches harvested too early. This will reduce OER (Oil Extraction Rate).")
                    else:
                        st.success("✅ **Harvest Efficiency:** 100% Prime Ripeness detected.")

                    # High-priority health alerts
                    if data['industrial_summary'].get('Abnormal', 0) > 0:
                        st.error(f"🚨 CRITICAL: {data['industrial_summary']['Abnormal']} Abnormal Bunches Detected!")
                    if data['industrial_summary'].get('Empty_Bunch', 0) > 0:
                        st.warning(f"⚠️ ALERT: {data['industrial_summary']['Empty_Bunch']} Empty Bunches Detected.")

                    # 3. Cloud actions
                    st.write("---")
                    st.write("#### ✨ Cloud Archive")
                    if st.button("🚀 Save to Atlas (Vectorize)", width='stretch'):
                        # BUGFIX: guard against an empty detection list before
                        # indexing detections[0].
                        if not data['detections']:
                            st.warning("No detections to archive.")
                        else:
                            with st.spinner("Archiving..."):
                                primary_det = data['detections'][0]
                                payload = {"detection_data": json.dumps(primary_det)}
                                files_cloud = {"file": (uploaded_file.name, uploaded_file.getvalue(), uploaded_file.type)}

                                res_cloud = requests.post(f"{API_BASE_URL}/vectorize_and_store", files=files_cloud, data=payload, timeout=120)

                                if res_cloud.status_code == 200:
                                    res_json = res_cloud.json()
                                    if res_json["status"] == "success":
                                        st.success(f"Archived! ID: `{res_json['record_id'][:8]}...`")
                                    else:
                                        st.error(f"Cloud Error: {res_json['message']}")
                                else:
                                    st.error("Failed to connect to cloud service")

                    if st.button("🚩 Flag Misclassification", width='stretch', type="secondary"):
                        # Save to local feedback folder.
                        # BUGFIX: create the folder on first use instead of crashing.
                        os.makedirs("feedback", exist_ok=True)
                        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
                        feedback_id = f"fb_{timestamp}"
                        img_path = f"feedback/{feedback_id}.jpg"
                        json_path = f"feedback/{feedback_id}.json"

                        # Save image. BUGFIX: convert to RGB so RGBA PNG
                        # uploads can be written as JPEG.
                        Image.open(uploaded_file).convert("RGB").save(img_path)

                        # Save metadata
                        feedback_data = {
                            "original_filename": uploaded_file.name,
                            "timestamp": timestamp,
                            "detections": data['detections'],
                            "threshold_used": data['current_threshold']
                        }
                        with open(json_path, "w") as f:
                            json.dump(feedback_data, f, indent=4)

                        st.toast("✅ Feedback saved to local vault!", icon="🚩")

                    if st.button("💾 Local History Vault (Auto-Saved)", width='stretch', type="secondary", disabled=True):
                        pass
                    st.caption("✅ This analysis was automatically archived to the local vault.")
# --- Tab 2: Batch Processing ---
with tab2:
    st.subheader("Bulk Analysis")

    # 1. Initialize session state
    if "batch_uploader_key" not in st.session_state:
        st.session_state.batch_uploader_key = 0
    if "last_batch_results" not in st.session_state:
        st.session_state.last_batch_results = None
    if "batch_files_map" not in st.session_state:
        # {filename: raw bytes} captured at processing time. BUGFIX: the
        # results view below used to read the `uploaded_files` widget, which
        # is defined later in the script AND is reset (key bump) after a
        # successful batch — so on the rerun that displays results it was
        # an undefined/empty reference.
        st.session_state.batch_files_map = {}

    # 2. Display persisted results (if any)
    if st.session_state.last_batch_results:
        res_data = st.session_state.last_batch_results
        files_map = st.session_state.batch_files_map
        with st.container(border=True):
            st.success(f"✅ Successfully processed {res_data['processed_count']} images.")

            # Batch summary dashboard
            st.write("### 📈 Batch Quality Overview")
            batch_summary = res_data.get('industrial_summary', {})
            if batch_summary:
                sum_df = pd.DataFrame(list(batch_summary.items()), columns=['Grade', 'Count'])
                sum_df = sum_df[sum_df['Count'] > 0]

                b_col1, b_col2 = st.columns([1, 1])
                with b_col1:
                    st.dataframe(sum_df, hide_index=True, width='stretch')
                with b_col2:
                    if not sum_df.empty:
                        fig_batch = px.bar(sum_df, x='Grade', y='Count', color='Grade',
                                           color_discrete_map={
                                               'Ripe': '#22c55e',
                                               'Underripe': '#fbbf24',
                                               'Unripe': '#3b82f6',
                                               'Abnormal': '#dc2626',
                                               'Empty_Bunch': '#64748b'
                                           })
                        fig_batch.update_layout(margin=dict(t=0, b=0, l=0, r=0), height=200, showlegend=False)
                        st.plotly_chart(fig_batch, width='stretch', key="batch_bar")

                if batch_summary.get('Abnormal', 0) > 0:
                    st.error(f"🚨 BATCH CRITICAL: {batch_summary['Abnormal']} Abnormal Bunches found in this batch!")

            st.write("Generated Record IDs:")
            st.code(res_data['record_ids'])

            # Batch evidence gallery
            st.write("### 🖼️ Detailed Detection Evidence")
            if 'detailed_results' in res_data:
                # Group detections by filename for the gallery.
                gallery_map = {}
                for res in res_data['detailed_results']:
                    gallery_map.setdefault(res['filename'], []).append(res['detection'])

                # Render annotated overlays from the bytes persisted at
                # processing time (not the now-reset uploader widget).
                for fname, dets in gallery_map.items():
                    if fname in files_map:
                        with st.container(border=True):
                            g_img = Image.open(io.BytesIO(files_map[fname])).convert("RGB")
                            g_annotated = annotate_image(g_img, dets)
                            st.image(g_annotated, caption=f"Evidence: {fname}", use_container_width=True)

            # PDF export (pass the persisted images map)
            pdf_bytes = generate_batch_report(res_data, files_map)
            st.download_button(
                label="📄 Download Executive Batch Report (PDF)",
                data=pdf_bytes,
                file_name=f"PalmOil_BatchReport_{datetime.now().strftime('%Y%m%d_%H%M%S')}.pdf",
                mime="application/pdf",
                width='stretch'
            )

            if st.button("Clear Results & Start New Batch", width='stretch'):
                st.session_state.last_batch_results = None
                st.session_state.batch_files_map = {}
                st.rerun()
        st.divider()

    # 3. Uploader UI
    col_batch1, col_batch2 = st.columns([4, 1])
    with col_batch1:
        uploaded_files = st.file_uploader(
            "Upload multiple images...",
            type=["jpg", "jpeg", "png"],
            accept_multiple_files=True,
            key=f"batch_{st.session_state.batch_uploader_key}",
            on_change=reset_batch_results
        )

    with col_batch2:
        st.write("##")  # vertical alignment shim
        if st.session_state.last_batch_results is None and uploaded_files:
            if st.button("🔍 Process Batch", type="primary", width='stretch'):
                with st.spinner(f"Analyzing {len(uploaded_files)} images with {model_type.upper()}..."):
                    files = [("files", (f.name, f.getvalue(), f.type)) for f in uploaded_files]
                    payload = {"model_type": model_type}
                    res = requests.post(f"{API_BASE_URL}/process_batch", files=files, data=payload, timeout=600)

                    if res.status_code == 200:
                        data = res.json()
                        if data["status"] == "success":
                            # Keep the raw bytes so the results view and PDF
                            # export still work after the uploader resets.
                            st.session_state.batch_files_map = {f.name: f.getvalue() for f in uploaded_files}
                            st.session_state.last_batch_results = data
                            st.session_state.batch_uploader_key += 1
                            st.rerun()
                        elif data["status"] == "partial_success":
                            st.warning(data["message"])
                            st.info(f"Successfully detected {data['detections_count']} bunches locally.")
                        else:
                            st.error(f"Batch Error: {data['message']}")
                    else:
                        st.error(f"Batch Processing Failed: {res.text}")

        if st.button("🗑️ Reset Uploader"):
            st.session_state.batch_uploader_key += 1
            st.session_state.last_batch_results = None
            st.rerun()
# --- Tab 3: Similarity Search ---
with tab3:
    st.subheader("Hybrid Semantic Search")
    st.markdown("Search records by either **Image Similarity** or **Natural Language Query**.")

    with st.form("hybrid_search_form"):
        input_col_a, input_col_b = st.columns(2)
        with input_col_a:
            search_file = st.file_uploader("Option A: Search Image...", type=["jpg", "jpeg", "png"], key="search")
        with input_col_b:
            text_query = st.text_input("Option B: Natural Language Query", placeholder="e.g., 'ripe bunches with dark spots' or 'unripe fruit'")
        top_k = st.slider("Results Limit (Top K)", 1, 20, 3)
        submit_search = st.form_submit_button("Run Semantic Search")

    if submit_search:
        if not search_file and not text_query:
            st.warning("Please provide either an image or a text query.")
        else:
            with st.spinner("Searching Vector Index..."):
                payload = {"limit": top_k}
                if search_file:
                    # An uploaded image takes precedence for visual search.
                    query_files = {"file": (search_file.name, search_file.getvalue(), search_file.type)}
                    res = requests.post(f"{API_BASE_URL}/search_hybrid", files=query_files, data=payload)
                else:
                    # Text-only search; sent as form-data to match FastAPI's Form(None).
                    payload["text_query"] = text_query
                    res = requests.post(f"{API_BASE_URL}/search_hybrid", data=payload)

                if res.status_code != 200:
                    st.error(f"Search failed: {res.text}")
                else:
                    matches = res.json().get("results", [])
                    if not matches:
                        st.warning("No similar records found.")
                    else:
                        st.success(f"Found {len(matches)} matches.")
                        for item in matches:
                            with st.container(border=True):
                                c1, c2 = st.columns([1, 2])
                                rec_id = item["_id"]
                                # Fetch the stored image for this match.
                                img_res = requests.get(f"{API_BASE_URL}/get_image/{rec_id}")

                                with c1:
                                    if img_res.status_code != 200:
                                        st.write("Failed to load image.")
                                    else:
                                        img_b64 = img_res.json().get("image_data")
                                        if img_b64:
                                            st.image(base64.b64decode(img_b64), width=250)
                                        else:
                                            st.write("No image data found.")
                                with c2:
                                    st.write(f"**Class:** {item['ripeness_class']}")
                                    st.write(f"**Similarity Score:** {item['score']:.4f}")
                                    st.write(f"**Timestamp:** {item['timestamp']}")
                                    st.write(f"**ID:** `{rec_id}`")
# --- Tab 4: History Vault ---
with tab4:
    st.subheader("📜 Local History Vault")
    try:
        res = requests.get(f"{API_BASE_URL}/get_history")
        if res.status_code != 200:
            st.error(f"Failed to fetch history: {res.text}")
        else:
            history_data = res.json().get("history", [])
            if not history_data:
                st.info("No saved records found.")
            else:
                # Compact selection table; the 'id' column drives the selectbox.
                df_history = pd.DataFrame(history_data)[['id', 'filename', 'timestamp']]
                selected_id = st.selectbox("Select a record to review:", df_history['id'])

                if selected_id:
                    record = next(item for item in history_data if item["id"] == selected_id)
                    detections = json.loads(record['detections'])

                    if not os.path.exists(record['archive_path']):
                        st.error(f"Archive file not found: {record['archive_path']}")
                    else:
                        # Interactive hover view over the archived image.
                        with open(record['archive_path'], "rb") as fh:
                            hist_img = Image.open(fh).convert("RGB")
                        display_interactive_results(hist_img, detections, key=f"hist_{record['id']}")

                        st.write("### 📈 Archived Summary")
                        summary = json.loads(record['summary'])
                        s_col1, s_col2, s_col3 = st.columns(3)
                        with s_col1:
                            st.metric("Total Bunches", sum(summary.values()))
                        with s_col2:
                            st.metric("Healthy (Ripe)", summary.get('Ripe', 0))
                        with s_col3:
                            st.metric("Abnormal Alerts", summary.get('Abnormal', 0))
    except Exception as e:
        st.error(f"Error loading history: {str(e)}")