import streamlit as st
import requests
from ultralytics import YOLO
import numpy as np
from PIL import Image
import io
import base64
import pandas as pd
import plotly.express as px
import plotly.graph_objects as go
import json
import os
from datetime import datetime
from fpdf import FPDF
@st.dialog("📘 AI Interpretation Guide")
def show_tech_guide():
st.write("### 🧠 1. The 'Thinking' Phase: The Raw Tensor [1, 300, 6]")
st.write("""
When the AI 'thinks' about an image, it doesn't see 'Ripe' or 'Unripe'. It populates a
fixed-size memory buffer (Tensor) with **300 potential candidates**. Each candidate is
represented by a row of 6 numbers.
""")
st.table({
"Tensor Index": ["0, 1, 2, 3", "4", "5"],
"AI Output": ["Coordinates", "Confidence Score", "Class ID"],
"Programmer's Logic": ["`[x1, y1, x2, y2]`", "`float (0.0 - 1.0)`", "`int (0-5)`"]
})
st.write("#### 🎯 The Coordinate Paradox (Pixels vs. Ratios)")
st.write("""
Depending on the engine, the **Values at Index 0-3** speak different languages.
This is why the raw numbers won't match if you swap engines:
""")
col_a, col_b = st.columns(2)
with col_a:
st.info("**PyTorch Pathway (.pt)**")
st.write("- **Format**: Absolute Pixels")
st.write("- **Logic**: The AI outputs numbers mapped to the photo's resolution (e.g., `245.0`).")
with col_b:
st.success("**ONNX Pathway (.onnx)**")
st.write("- **Format**: Normalized Ratios")
st.write("- **Logic**: The AI outputs percentages (0.0 to 1.0) relative to its internal 640x640 grid (e.g., `0.38`).")
st.write("---")
st.write("### 🎯 2. What is 'Confidence'? (The Probability Filter)")
st.write("""
Confidence is the AI's **mathematical certainty** that an object exists in a specific box.
It is the product of *Objectness* (Is something there?) and *Class Probability* (What is it?).
""")
st.table({
"Confidence Value": ["> 0.90", "0.50 - 0.89", "< 0.25 (Threshold)"],
"Interpretation": ["**Certain**: Clear, unobstructed view.", "**Likely**: Valid, but possibly obscured by fronds.", "**Noise**: Discarded to prevent False Positives."]
})
st.write("---")
st.write("### 🛠️ 3. The Custom Handler (The Translation Layer)")
st.write("""
Because ONNX returns raw ratios, we built a **Manual Scaling Handler**. It maps those
`0.0 - 1.0` values back to your high-resolution photo pixels.
This explains our two key metrics:
- **Inference Speed**: The time the AI spent populating the Raw Tensor.
- **Post-Processing**: The time our code spent 'translating' that Tensor into labels and pixels.
""")
st.write("---")
st.markdown("""
Your detection environment is powered by **YOLO26**, a custom architectural fork designed for zero-latency industrial sorting.
### ⚡ Performance Comparison
| Feature | YOLO26 (ONNX) | YOLO26 (Native) |
| :--- | :--- | :--- |
| **Coordinate System** | Normalized (0.0 - 1.0) | Absolute (Pixels) |
| **Primary Use Case** | Real-time Edge Sorting | High-Resolution Auditing |
| **Post-Processing** | None (NMS-Free) | Standard NMS |
""")
# --- 1. Global Backend Check ---
API_BASE_URL = "http://localhost:8000"
# MPOB Color Map for Overlays (Global for consistency)
overlay_colors = {
'Ripe': '#22c55e', # Industrial Green
'Underripe': '#fbbf24', # Industrial Orange
'Unripe': '#3b82f6', # Industrial Blue
'Abnormal': '#dc2626', # Critical Red
'Empty_Bunch': '#64748b',# Waste Gray
'Overripe': '#7c2d12' # Dark Brown/Orange
}
# Helper to reset results when files change or engine switches
def get_color(class_name):
"""Robust color lookup for consistent across models."""
# Normalize: "Under-ripe" -> "underripe", "Empty Bunch" -> "emptybunch"
norm_name = class_name.lower().replace("-", "").replace("_", "").replace(" ", "")
# Map normalized names to your MPOB standard colors
color_map = {k.lower().replace("_", ""): v for k, v in overlay_colors.items()}
if norm_name in color_map:
return color_map[norm_name]
# Fallback: Generate a consistent unique color for benchmark-only classes
import hashlib
return f"#{hashlib.md5(class_name.encode()).hexdigest()[:6]}"
def reset_single_results():
st.session_state.last_detection = None
def reset_batch_results():
st.session_state.last_batch_results = None
def reset_all_analysis():
"""Global reset for all active analysis views."""
st.session_state.last_detection = None
st.session_state.last_batch_results = None
# Increment uploader keys to 'forget' current files (Clear Canvas)
if "single_uploader_key" not in st.session_state:
st.session_state.single_uploader_key = 0
st.session_state.single_uploader_key += 1
if "batch_uploader_key" not in st.session_state:
st.session_state.batch_uploader_key = 0
st.session_state.batch_uploader_key += 1
def check_backend():
try:
res = requests.get(f"{API_BASE_URL}/get_confidence", timeout=2)
return res.status_code == 200
except:
return False
backend_active = check_backend()
# LOCAL MODEL LOADING REMOVED (YOLO26 Clean Sweep)
# UI now relies entirely on Backend API for NMS-Free inference.
if not backend_active:
st.error("⚠️ Backend API is offline!")
st.info("Please start the backend server first (e.g., `python main.py`) to unlock AI features.")
if st.button("🔄 Retry Connection"):
st.rerun()
st.stop() # Stops execution here, effectively disabling the app
# --- 2. Main Page Config (Only rendered if backend is active) ---
st.set_page_config(page_title="Palm Oil Ripeness AI (YOLO26)", layout="wide")
st.title("🌴 Palm Oil FFB Management System")
st.markdown("### Production-Ready AI Analysis & Archival")
# --- Sidebar ---
st.sidebar.header("Backend Controls")
def update_confidence():
new_conf = st.session_state.conf_slider
try:
requests.post(f"{API_BASE_URL}/set_confidence", json={"threshold": new_conf})
st.toast(f"Threshold updated to {new_conf}")
except:
st.sidebar.error("Failed to update threshold")
# We already know backend is up here
response = requests.get(f"{API_BASE_URL}/get_confidence")
current_conf = response.json().get("current_confidence", 0.25)
st.sidebar.success(f"Connected to API")
st.sidebar.info("Engine: YOLO26 NMS-Free (Inference: ~39ms)")
# Synchronized Slider
st.sidebar.slider(
"Confidence Threshold",
0.1, 1.0,
value=float(current_conf),
key="conf_slider",
on_change=update_confidence
)
st.sidebar.markdown("---")
# Inference Engine
engine_choice = st.sidebar.selectbox(
"Select Model Engine:",
["YOLO26 (ONNX - High Speed)", "YOLO26 (PyTorch - Native)", "Sawit-TBS (Benchmark)"],
index=0,
on_change=reset_all_analysis # Clear canvas on engine switch
)
# Map selection to internal labels
engine_map = {
"YOLO26 (ONNX - High Speed)": "onnx",
"YOLO26 (PyTorch - Native)": "pytorch",
"Sawit-TBS (Benchmark)": "benchmark"
}
st.sidebar.markdown("---")
model_type = engine_map[engine_choice]
if st.sidebar.button("❓ How to read results?", icon="📘", width='stretch'):
show_tech_guide()
# Function definitions moved to top
def display_interactive_results(image, detections, key=None):
"""Renders image with interactive hover-boxes using Plotly."""
img_width, img_height = image.size
fig = go.Figure()
# Add the palm image as the background
fig.add_layout_image(
dict(source=image, x=0, y=img_height, sizex=img_width, sizey=img_height,
sizing="stretch", opacity=1, layer="below", xref="x", yref="y")
)
# Configure axes to match image dimensions
fig.update_xaxes(showgrid=False, range=(0, img_width), zeroline=False, visible=False)
fig.update_yaxes(showgrid=False, range=(0, img_height), zeroline=False, visible=False, scaleanchor="x")
# Add interactive boxes
for i, det in enumerate(detections):
x1, y1, x2, y2 = det['box']
# Plotly y-axis is inverted relative to PIL, so we flip y
y_top, y_bottom = img_height - y1, img_height - y2
color = get_color(det['class'])
is_bench = (st.session_state.get('engine_choice') == "Sawit-TBS (Benchmark)")
# The 'Hover' shape
bunch_id = det.get('bunch_id', i+1)
fig.add_trace(go.Scatter(
x=[x1, x2, x2, x1, x1],
y=[y_top, y_top, y_bottom, y_bottom, y_top],
fill="toself",
fillcolor=color,
opacity=0.5 if is_bench else 0.3, # Stronger highlight for benchmark
mode='lines',
line=dict(color=color, width=5 if is_bench else 3, dash='dot' if is_bench else 'solid'),
name=f"ID: #{bunch_id}", # Unified ID Tag
text=f"ID: #{bunch_id}
Grade: {det['class']}
Score: {det['confidence']:.2f}
Alert: {det['is_health_alert']}",
hoverinfo="text"
))
fig.update_layout(width=800, height=600, margin=dict(l=0, r=0, b=0, t=0), showlegend=False)
st.plotly_chart(fig, width='stretch', key=key)
def annotate_image(image, detections):
"""Draws high-visibility 'Plated Labels' and boxes on the image."""
from PIL import ImageDraw, ImageFont
draw = ImageDraw.Draw(image)
# 1. Dynamic Font Scaling (width // 40 as requested)
font_size = max(20, image.width // 40)
try:
# standard Windows font paths for agent environment
font_path = "C:\\Windows\\Fonts\\arialbd.ttf" # Bold for higher visibility
if not os.path.exists(font_path):
font_path = "C:\\Windows\\Fonts\\arial.ttf"
if os.path.exists(font_path):
font = ImageFont.truetype(font_path, font_size)
else:
font = ImageFont.load_default()
except:
font = ImageFont.load_default()
for det in detections:
box = det['box'] # [x1, y1, x2, y2]
cls = det['class']
conf = det['confidence']
bunch_id = det.get('bunch_id', '?')
color = get_color(cls)
is_bench = (st.session_state.get('engine_choice') == "Sawit-TBS (Benchmark)")
# 2. Draw Heavy-Duty Bounding Box
line_width = max(6 if is_bench else 4, image.width // (80 if is_bench else 150))
draw.rectangle(box, outline=color, width=line_width)
# 3. Draw 'Plated Label' (Background Shaded)
label = f"#{bunch_id} {cls} {conf:.2f}"
try:
# Precise background calculation using textbbox
l, t, r, b = draw.textbbox((box[0], box[1]), label, font=font)
# Shift background up so it doesn't obscure the fruit
bg_rect = [l - 2, t - (b - t) - 10, r + 2, t - 6]
draw.rectangle(bg_rect, fill=color)
# Draw text inside the plate
draw.text((l, t - (b - t) - 8), label, fill="white", font=font)
except:
# Simple fallback
draw.text((box[0], box[1] - font_size), label, fill=color)
return image
def generate_batch_report(data, uploaded_files_map=None):
"""Generates a professional PDF report for batch results with visual evidence."""
from PIL import ImageDraw
pdf = FPDF()
pdf.add_page()
pdf.set_font("Arial", "B", 16)
pdf.cell(190, 10, "Palm Oil FFB Harvest Quality Report", ln=True, align="C")
pdf.set_font("Arial", "", 12)
pdf.cell(190, 10, f"Date: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}", ln=True, align="C")
pdf.ln(10)
# 1. Summary Table
pdf.set_font("Arial", "B", 14)
pdf.cell(190, 10, "1. Batch Summary", ln=True)
pdf.set_font("Arial", "", 12)
summary = data.get('industrial_summary', {})
total_bunches = data.get('total_count', 0)
pdf.cell(95, 10, "Metric", border=1)
pdf.cell(95, 10, "Value", border=1, ln=True)
pdf.cell(95, 10, "Total Bunches Detected", border=1)
pdf.cell(95, 10, str(total_bunches), border=1, ln=True)
for grade, count in summary.items():
if count > 0:
pdf.cell(95, 10, f"Grade: {grade}", border=1)
pdf.cell(95, 10, str(count), border=1, ln=True)
pdf.ln(10)
# 2. Strategic Insights
pdf.set_font("Arial", "B", 14)
pdf.cell(190, 10, "2. Strategic Yield Insights", ln=True)
pdf.set_font("Arial", "", 12)
unripe = summary.get('Unripe', 0)
underripe = summary.get('Underripe', 0)
loss = unripe + underripe
if loss > 0:
pdf.multi_cell(190, 10, f"WARNING: {loss} bunches were harvested before peak ripeness. "
"This directly impacts the Oil Extraction Rate (OER) and results in potential yield loss.")
else:
pdf.multi_cell(190, 10, "EXCELLENT: All detected bunches meet prime ripeness standards. Harvest efficiency is 100%.")
# Critical Alerts
abnormal = summary.get('Abnormal', 0)
empty = summary.get('Empty_Bunch', 0)
if abnormal > 0 or empty > 0:
pdf.ln(5)
pdf.set_text_color(220, 0, 0)
pdf.set_font("Arial", "B", 12)
pdf.cell(190, 10, "CRITICAL HEALTH ALERTS:", ln=True)
pdf.set_font("Arial", "", 12)
if abnormal > 0:
pdf.cell(190, 10, f"- {abnormal} Abnormal Bunches detected (Requires immediate field inspection).", ln=True)
if empty > 0:
pdf.cell(190, 10, f"- {empty} Empty Bunches detected (Waste reduction needed).", ln=True)
pdf.set_text_color(0, 0, 0)
# 3. Visual Evidence Section
if 'detailed_results' in data and uploaded_files_map:
pdf.add_page()
pdf.set_font("Arial", "B", 14)
pdf.cell(190, 10, "3. Visual Batch Evidence (AI Overlay)", ln=True)
pdf.ln(5)
# Group detections by filename
results_by_file = {}
for res in data['detailed_results']:
fname = res['filename']
if fname not in results_by_file:
results_by_file[fname] = []
results_by_file[fname].append(res['detection'])
for fname, detections in results_by_file.items():
if fname in uploaded_files_map:
img_bytes = uploaded_files_map[fname]
img = Image.open(io.BytesIO(img_bytes)).convert("RGB")
draw = ImageDraw.Draw(img)
# Drawing annotated boxes for PDF using high-visibility utility
annotate_image(img, detections)
# Save to temp file for PDF
temp_img_path = f"temp_report_{fname}"
img.save(temp_img_path)
# Check if we need a new page based on image height (rough estimate)
if pdf.get_y() > 200:
pdf.add_page()
pdf.image(temp_img_path, x=10, w=150)
pdf.set_font("Arial", "I", 10)
pdf.cell(190, 10, f"Annotated: {fname}", ln=True)
pdf.ln(5)
os.remove(temp_img_path)
# Footer
pdf.set_y(-15)
pdf.set_font("Arial", "I", 8)
pdf.cell(190, 10, "Generated by Palm Oil AI Desktop PoC - YOLO26 Engine", align="C")
return pdf.output(dest='S')
# --- Tabs ---
tab1, tab2, tab3, tab4 = st.tabs(["Single Analysis", "Batch Processing", "Similarity Search", "History Vault"])
# --- Tab 1: Single Analysis ---
with tab1:
st.subheader("Analyze Single Bunch")
# 1. Initialize Uploader Key
if "single_uploader_key" not in st.session_state:
st.session_state.single_uploader_key = 0
uploaded_file = st.file_uploader(
"Upload a bunch image...",
type=["jpg", "jpeg", "png"],
key=f"single_{st.session_state.single_uploader_key}",
on_change=reset_single_results
)
if uploaded_file:
# State initialization
if "last_detection" not in st.session_state:
st.session_state.last_detection = None
# 1. Auto-Detection Trigger
if uploaded_file and st.session_state.last_detection is None:
with st.spinner(f"Processing with {model_type.upper()} Engine..."):
files = {"file": (uploaded_file.name, uploaded_file.getvalue(), uploaded_file.type)}
payload = {"model_type": model_type}
res = requests.post(f"{API_BASE_URL}/analyze", files=files, data=payload)
if res.status_code == 200:
st.session_state.last_detection = res.json()
st.rerun() # Refresh to show results immediately
else:
st.error(f"Detection Failed: {res.text}")
# 2. Results Layout
if st.session_state.last_detection:
# Redo Button at the top for easy access
if st.button("🔄 Re-analyze Image", width='stretch', type="primary", help="Force a fresh detection (useful if threshold changed)."):
st.session_state.last_detection = None
st.rerun()
data = st.session_state.last_detection
st.divider()
if model_type == "benchmark":
st.info("💡 **Benchmark Mode**: Labels and colors are determined by the external model's architecture. Some labels may not match standard MPOB categories.")
st.write("### 📈 Manager's Dashboard")
m_col1, m_col2, m_col3, m_col4 = st.columns(4)
with m_col1:
st.metric("Total Bunches", data.get('total_count', 0))
with m_col2:
if model_type == "benchmark":
# For benchmark model, show the top detected class instead of 'Healthy'
top_class = "None"
if data.get('industrial_summary'):
top_class = max(data['industrial_summary'], key=data['industrial_summary'].get)
st.metric("Top Detected Class", top_class)
else:
st.metric("Healthy (Ripe)", data['industrial_summary'].get('Ripe', 0))
with m_col3:
# Refined speed label based on engine
speed_label = "Raw Speed (Unlabeled)" if model_type == "onnx" else "Wrapped Speed (Auto-Labeled)"
st.metric("Inference Speed", f"{data.get('inference_ms', 0):.1f} ms", help=speed_label)
with m_col4:
st.metric("Post-Processing", f"{data.get('processing_ms', 0):.1f} ms", help="Labeling/Scaling overhead")
st.divider()
# Side-by-Side View (Technical Trace)
img = Image.open(uploaded_file).convert("RGB")
if st.session_state.get('tech_trace', False):
t_col1, t_col2 = st.columns(2)
with t_col1:
st.subheader("🔢 Raw Output Tensor (The Math)")
st.caption("First 5 rows of the 1x300x6 detection tensor.")
st.json(data.get('raw_array_sample', []))
with t_col2:
st.subheader("🎨 AI Interpretation")
img_annotated = annotate_image(img.copy(), data['detections'])
st.image(img_annotated, width='stretch')
else:
# Regular View
st.write("### 🔍 AI Analytical View")
display_interactive_results(img, data['detections'], key="main_viewer")
col1, col2 = st.columns([1.5, 1]) # Keep original col structure for summary below
with col1:
col_tech_h1, col_tech_h2 = st.columns([1, 1])
with col_tech_h1:
st.write("#### 🛠️ Technical Evidence")
with col_tech_h2:
st.session_state.tech_trace = st.toggle("🔬 Side-by-Side Trace", value=st.session_state.get('tech_trace', False))
with st.expander("Raw Output Tensor (NMS-Free)", expanded=False):
coord_type = "Absolute Pixels" if model_type == "pytorch" else "Normalized Ratios (0.0-1.0)"
st.warning(f"Engine detected: {model_type.upper()} | Coordinate System: {coord_type}")
st.json(data.get('raw_array_sample', []))
with st.container(border=True):
st.write("### 🏷️ Detection Results")
if not data['detections']:
st.warning("No Fresh Fruit Bunches detected.")
else:
for det in data['detections']:
st.info(f"### Bunch #{det['bunch_id']}: {det['class']} ({det['confidence']:.2%})")
st.write("### 📊 Harvest Quality Mix")
# Convert industrial_summary dictionary to a DataFrame for charting
summary_df = pd.DataFrame(
list(data['industrial_summary'].items()),
columns=['Grade', 'Count']
)
# Filter out classes with 0 count for a cleaner chart
summary_df = summary_df[summary_df['Count'] > 0]
if not summary_df.empty:
# Create a Pie Chart to show the proportion of each grade
fig = px.pie(summary_df, values='Count', names='Grade',
color='Grade',
color_discrete_map={
'Ripe': '#22c55e', # Industrial Green
'Underripe': '#fbbf24', # Industrial Orange
'Unripe': '#3b82f6', # Industrial Blue
'Abnormal': '#dc2626', # Critical Red
'Empty_Bunch': '#64748b' # Waste Gray
},
hole=0.4)
fig.update_layout(margin=dict(t=0, b=0, l=0, r=0), height=300)
st.plotly_chart(fig, width='stretch', key="single_pie")
# 💡 Strategic R&D Insight: Harvest Efficiency
st.write("---")
st.write("#### 💡 Strategic R&D Insight")
unripe_count = data['industrial_summary'].get('Unripe', 0)
underripe_count = data['industrial_summary'].get('Underripe', 0)
total_non_prime = unripe_count + underripe_count
st.write(f"🌑 **Unripe (Mentah):** {unripe_count}")
st.write(f"🌗 **Underripe (Kurang Masak):** {underripe_count}")
if total_non_prime > 0:
st.warning(f"🚨 **Potential Yield Loss:** {total_non_prime} bunches harvested too early. This will reduce OER (Oil Extraction Rate).")
else:
st.success("✅ **Harvest Efficiency:** 100% Prime Ripeness detected.")
# High-Priority Health Alert
if data['industrial_summary'].get('Abnormal', 0) > 0:
st.error(f"🚨 CRITICAL: {data['industrial_summary']['Abnormal']} Abnormal Bunches Detected!")
if data['industrial_summary'].get('Empty_Bunch', 0) > 0:
st.warning(f"⚠️ ALERT: {data['industrial_summary']['Empty_Bunch']} Empty Bunches Detected.")
# 3. Cloud Actions (Only if detections found)
st.write("---")
st.write("#### ✨ Cloud Archive")
if st.button("🚀 Save to Atlas (Vectorize)", width='stretch'):
with st.spinner("Archiving..."):
import json
primary_det = data['detections'][0]
payload = {"detection_data": json.dumps(primary_det)}
files_cloud = {"file": (uploaded_file.name, uploaded_file.getvalue(), uploaded_file.type)}
res_cloud = requests.post(f"{API_BASE_URL}/vectorize_and_store", files=files_cloud, data=payload)
if res_cloud.status_code == 200:
res_json = res_cloud.json()
if res_json["status"] == "success":
st.success(f"Archived! ID: `{res_json['record_id'][:8]}...`")
else:
st.error(f"Cloud Error: {res_json['message']}")
else:
st.error("Failed to connect to cloud service")
if st.button("🚩 Flag Misclassification", width='stretch', type="secondary"):
# Save to local feedback folder
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
feedback_id = f"fb_{timestamp}"
img_path = f"feedback/{feedback_id}.jpg"
json_path = f"feedback/{feedback_id}.json"
# Save image
Image.open(uploaded_file).save(img_path)
# Save metadata
feedback_data = {
"original_filename": uploaded_file.name,
"timestamp": timestamp,
"detections": data['detections'],
"threshold_used": data['current_threshold']
}
with open(json_path, "w") as f:
json.dump(feedback_data, f, indent=4)
st.toast("✅ Feedback saved to local vault!", icon="🚩")
if st.button("💾 Local History Vault (Auto-Saved)", width='stretch', type="secondary", disabled=True):
pass
st.caption("✅ This analysis was automatically archived to the local vault.")
# --- Tab 2: Batch Processing ---
with tab2:
st.subheader("Bulk Analysis")
# 1. Initialize Session State
if "batch_uploader_key" not in st.session_state:
st.session_state.batch_uploader_key = 0
if "last_batch_results" not in st.session_state:
st.session_state.last_batch_results = None
# 2. Display Persisted Results (if any)
if st.session_state.last_batch_results:
res_data = st.session_state.last_batch_results
with st.container(border=True):
st.success(f"✅ Successfully processed {res_data['processed_count']} images.")
# Batch Summary Dashboard
st.write("### 📈 Batch Quality Overview")
batch_summary = res_data.get('industrial_summary', {})
if batch_summary:
sum_df = pd.DataFrame(list(batch_summary.items()), columns=['Grade', 'Count'])
sum_df = sum_df[sum_df['Count'] > 0]
b_col1, b_col2 = st.columns([1, 1])
with b_col1:
st.dataframe(sum_df, hide_index=True, width='stretch')
with b_col2:
if not sum_df.empty:
fig_batch = px.bar(sum_df, x='Grade', y='Count', color='Grade',
color_discrete_map={
'Ripe': '#22c55e',
'Underripe': '#fbbf24',
'Unripe': '#3b82f6',
'Abnormal': '#dc2626',
'Empty_Bunch': '#64748b'
})
fig_batch.update_layout(margin=dict(t=0, b=0, l=0, r=0), height=200, showlegend=False)
st.plotly_chart(fig_batch, width='stretch', key="batch_bar")
if batch_summary.get('Abnormal', 0) > 0:
st.error(f"🚨 BATCH CRITICAL: {batch_summary['Abnormal']} Abnormal Bunches found in this batch!")
st.write("Generated Record IDs:")
st.code(res_data['record_ids'])
# --- 4. Batch Evidence Gallery ---
st.write("### 🖼️ Detailed Detection Evidence")
if 'detailed_results' in res_data:
# Group results by filename for gallery
gallery_map = {}
for res in res_data['detailed_results']:
fname = res['filename']
if fname not in gallery_map:
gallery_map[fname] = []
gallery_map[fname].append(res['detection'])
# Show images with overlays using consistent utility
for up_file in uploaded_files:
if up_file.name in gallery_map:
with st.container(border=True):
g_img = Image.open(up_file).convert("RGB")
g_annotated = annotate_image(g_img, gallery_map[up_file.name])
st.image(g_annotated, caption=f"Evidence: {up_file.name}", width='stretch')
# PDF Export Button (Pass images map)
files_map = {f.name: f.getvalue() for f in uploaded_files}
pdf_bytes = generate_batch_report(res_data, files_map)
st.download_button(
label="📄 Download Executive Batch Report (PDF)",
data=pdf_bytes,
file_name=f"PalmOil_BatchReport_{datetime.now().strftime('%Y%m%d_%H%M%S')}.pdf",
mime="application/pdf",
width='stretch'
)
if st.button("Clear Results & Start New Batch", width='stretch'):
st.session_state.last_batch_results = None
st.rerun()
st.divider()
# 3. Uploader UI
col_batch1, col_batch2 = st.columns([4, 1])
with col_batch1:
uploaded_files = st.file_uploader(
"Upload multiple images...",
type=["jpg", "jpeg", "png"],
accept_multiple_files=True,
key=f"batch_{st.session_state.batch_uploader_key}",
on_change=reset_batch_results
)
with col_batch2:
st.write("##") # Alignment
if st.session_state.last_batch_results is None and uploaded_files:
if st.button("🔍 Process Batch", type="primary", width='stretch'):
with st.spinner(f"Analyzing {len(uploaded_files)} images with {model_type.upper()}..."):
files = [("files", (f.name, f.getvalue(), f.type)) for f in uploaded_files]
payload = {"model_type": model_type}
res = requests.post(f"{API_BASE_URL}/process_batch", files=files, data=payload)
if res.status_code == 200:
data = res.json()
if data["status"] == "success":
st.session_state.last_batch_results = data
st.session_state.batch_uploader_key += 1
st.rerun()
elif data["status"] == "partial_success":
st.warning(data["message"])
st.info(f"Successfully detected {data['detections_count']} bunches locally.")
else:
st.error(f"Batch Error: {data['message']}")
else:
st.error(f"Batch Processing Failed: {res.text}")
if st.button("🗑️ Reset Uploader"):
st.session_state.batch_uploader_key += 1
st.session_state.last_batch_results = None
st.rerun()
# --- Tab 3: Similarity Search ---
with tab3:
st.subheader("Hybrid Semantic Search")
st.markdown("Search records by either **Image Similarity** or **Natural Language Query**.")
with st.form("hybrid_search_form"):
col_input1, col_input2 = st.columns(2)
with col_input1:
search_file = st.file_uploader("Option A: Search Image...", type=["jpg", "jpeg", "png"], key="search")
with col_input2:
text_query = st.text_input("Option B: Natural Language Query", placeholder="e.g., 'ripe bunches with dark spots' or 'unripe fruit'")
top_k = st.slider("Results Limit (Top K)", 1, 20, 3)
submit_search = st.form_submit_button("Run Semantic Search")
if submit_search:
if not search_file and not text_query:
st.warning("Please provide either an image or a text query.")
else:
with st.spinner("Searching Vector Index..."):
payload = {"limit": top_k}
# If an image is uploaded, it takes precedence for visual search
if search_file:
files = {"file": (search_file.name, search_file.getvalue(), search_file.type)}
# Pass top_k as part of the data
res = requests.post(f"{API_BASE_URL}/search_hybrid", files=files, data=payload)
# Otherwise, use text query
elif text_query:
payload["text_query"] = text_query
# Send as form-data (data=) to match FastAPI's Form(None)
res = requests.post(f"{API_BASE_URL}/search_hybrid", data=payload)
if res.status_code == 200:
results = res.json().get("results", [])
if not results:
st.warning("No similar records found.")
else:
st.success(f"Found {len(results)} matches.")
for item in results:
with st.container(border=True):
c1, c2 = st.columns([1, 2])
# Fetch the image for this result
rec_id = item["_id"]
img_res = requests.get(f"{API_BASE_URL}/get_image/{rec_id}")
with c1:
if img_res.status_code == 200:
img_b64 = img_res.json().get("image_data")
if img_b64:
st.image(base64.b64decode(img_b64), width=250)
else:
st.write("No image data found.")
else:
st.write("Failed to load image.")
with c2:
st.write(f"**Class:** {item['ripeness_class']}")
st.write(f"**Similarity Score:** {item['score']:.4f}")
st.write(f"**Timestamp:** {item['timestamp']}")
st.write(f"**ID:** `{rec_id}`")
else:
st.error(f"Search failed: {res.text}")
# --- Tab 4: History Vault ---
with tab4:
st.subheader("📜 Local History Vault")
st.caption("Industrial-grade audit log of all past AI harvest scans.")
if "selected_history_id" not in st.session_state:
st.session_state.selected_history_id = None
try:
res = requests.get(f"{API_BASE_URL}/get_history")
if res.status_code == 200:
history_data = res.json().get("history", [])
if not history_data:
st.info("No saved records found in the vault.")
else:
if st.session_state.selected_history_id is None:
# --- 1. ListView Mode (Management Dashboard) ---
st.write("### 📋 Audit Log")
# Prepare searchable dataframe
df_history = pd.DataFrame(history_data)
# Clean up for display
display_df = df_history[['id', 'timestamp', 'engine', 'filename', 'inference_ms']].copy()
display_df.columns = ['ID', 'Date/Time', 'Engine', 'Filename', 'Inference (ms)']
st.dataframe(
display_df,
hide_index=True,
width='stretch',
column_config={
"ID": st.column_config.NumberColumn(width="small"),
"Inference (ms)": st.column_config.NumberColumn(format="%.1f ms")
}
)
# Industrial Selection UI
hist_col1, hist_col2 = st.columns([3, 1])
with hist_col1:
target_id = st.selectbox(
"Select Record for Deep Dive Analysis",
options=df_history['id'].tolist(),
format_func=lambda x: f"Record #{x} - {df_history[df_history['id']==x]['filename'].values[0]}"
)
with hist_col2:
st.write("##") # Alignment
if st.button("🔬 Start Deep Dive", type="primary", width='stretch'):
st.session_state.selected_history_id = target_id
st.rerun()
else:
# --- 2. Detail View Mode (Technical Auditor) ---
record = next((item for item in history_data if item["id"] == st.session_state.selected_history_id), None)
if not record:
st.error("Audit record not found.")
if st.button("Back to List"):
st.session_state.selected_history_id = None
st.rerun()
else:
st.button("⬅️ Back to Audit Log", on_click=lambda: st.session_state.update({"selected_history_id": None}))
st.divider()
st.write(f"## 🔍 Deep Dive: Record #{record['id']}")
engine_val = record.get('engine', 'Unknown')
st.caption(f"Original Filename: `{record['filename']}` | Processed: `{record['timestamp']}` | Engine: `{engine_val.upper()}`")
detections = json.loads(record['detections'])
summary = json.loads(record['summary'])
# Metrics Executive Summary
h_col1, h_col2, h_col3, h_col4 = st.columns(4)
with h_col1:
st.metric("Total Bunches", sum(summary.values()))
with h_col2:
st.metric("Healthy (Ripe)", summary.get('Ripe', 0))
with h_col3:
st.metric("Engine Performance", f"{record.get('inference_ms', 0) or 0:.1f} ms")
with h_col4:
st.metric("Labeling Overhead", f"{record.get('processing_ms', 0) or 0:.1f} ms")
# Re-Annotate Archived Image
if os.path.exists(record['archive_path']):
with open(record['archive_path'], "rb") as f:
hist_img = Image.open(f).convert("RGB")
# Side-by-Side: Interactive vs Static Plate
v_tab1, v_tab2 = st.tabs(["Interactive Plotly View", "Static Annotated Evidence"])
with v_tab1:
display_interactive_results(hist_img, detections, key=f"hist_plotly_{record['id']}")
with v_tab2:
img_plate = annotate_image(hist_img.copy(), detections)
st.image(img_plate, width='stretch', caption="Point-of-Harvest AI Interpretation")
else:
st.warning(f"Technical Error: Archive file missing at `{record['archive_path']}`")
# Technical Evidence Expander (Mathematical Audit)
st.divider()
st.write("### 🛠️ Technical Audit Trail")
with st.expander("🔬 View Raw Mathematical Tensor", expanded=False):
st.info("This is the exact numerical output from the AI engine prior to human-readable transformation.")
raw_data = record.get('raw_tensor')
if raw_data:
try:
st.json(json.loads(raw_data))
except:
st.code(raw_data)
else:
st.warning("No raw tensor trace was archived for this legacy record.")
else:
st.error(f"Vault Connection Failed: {res.text}")
except Exception as e:
st.error(f"Audit System Error: {str(e)}")