#!/usr/bin/env python3 """ OCTv2 (Oreo Cookie Thrower v2) - Raspberry Pi Server v2 - ESP32 serial communication for motor control - Automatic mouth detection and targeting - Stepper motor control for rotation and elevation """ import socket import json import threading import time import logging import serial import cv2 import numpy as np from datetime import datetime from typing import Dict, Any, Optional, Tuple, List import io import os import math # Camera imports try: from picamera2 import Picamera2 CAMERA_AVAILABLE = True except ImportError: print("Camera not available - running in simulation mode") CAMERA_AVAILABLE = False # Configure logging logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s' ) logger = logging.getLogger(__name__) class ESP32Controller: """Handle serial communication with ESP32 for motor control""" def __init__(self, port='/dev/ttyUSB0', baudrate=115200): self.port = port self.baudrate = baudrate self.serial_conn = None self.connect() def connect(self): """Connect to ESP32 via serial""" try: self.serial_conn = serial.Serial(self.port, self.baudrate, timeout=1) time.sleep(2) # Wait for ESP32 to initialize logger.info(f"Connected to ESP32 on {self.port}") return True except Exception as e: logger.error(f"Failed to connect to ESP32: {e}") return False def send_command(self, command: str) -> str: """Send command to ESP32 and get response""" if not self.serial_conn: logger.error("ESP32 not connected") return "ERROR: Not connected" try: self.serial_conn.write(f"{command}\n".encode()) response = self.serial_conn.readline().decode().strip() logger.debug(f"ESP32 Command: {command} -> Response: {response}") return response except Exception as e: logger.error(f"ESP32 communication error: {e}") return "ERROR: Communication failed" def home_motors(self) -> bool: """Home both rotation and elevation motors""" response = self.send_command("HOME") return response == "OK" def move_to_position(self, rotation_degrees: float, elevation_degrees: float) -> bool: """Move to absolute position (rotation: -90 to +90, elevation: 0 to 60)""" cmd = f"MOVE {rotation_degrees:.1f} {elevation_degrees:.1f}" response = self.send_command(cmd) return response == "OK" def move_relative(self, delta_rotation: float, delta_elevation: float) -> bool: """Move relative to current position""" cmd = f"REL {delta_rotation:.1f} {delta_elevation:.1f}" response = self.send_command(cmd) return response == "OK" def fire_oreo(self) -> bool: """Trigger the firing mechanism""" response = self.send_command("FIRE") return response == "OK" def get_position(self) -> Tuple[float, float]: """Get current position (rotation, elevation)""" response = self.send_command("POS") try: parts = response.split() if len(parts) == 2: return float(parts[0]), float(parts[1]) except: pass return 0.0, 0.0 class MouthDetector: """Detect open mouths in camera feed for automatic targeting""" def __init__(self): # Load OpenCV face cascade classifier self.face_cascade = cv2.CascadeClassifier(cv2.data.haarcascades + 'haarcascade_frontalface_default.xml') # For actual open mouth detection, we'll use facial landmarks try: import dlib # Download shape predictor: http://dlib.net/files/shape_predictor_68_face_landmarks.dat.bz2 self.predictor = dlib.shape_predictor("shape_predictor_68_face_landmarks.dat") self.detector = dlib.get_frontal_face_detector() self.use_dlib = True logger.info("Using dlib for precise mouth detection") except ImportError: logger.warning("dlib not available - using basic mouth area detection") self.use_dlib = False # Camera parameters for targeting calculations self.camera_width = 640 self.camera_height = 480 self.center_x = self.camera_width // 2 self.center_y = self.camera_height // 2 # Camera-follows-aim targeting parameters self.target_deadzone_pixels = 30 # Don't adjust if mouth is within this radius of center self.max_adjustment_degrees = 10 # Maximum single adjustment per iteration # Distance estimation parameters (based on average human face size) self.average_face_width_cm = 16.0 # Average human face width self.camera_focal_length_mm = 3.04 # Pi Camera focal length self.sensor_width_mm = 3.68 # Pi Camera sensor width # Aiming offsets (configurable for mechanical compensation) self.rotation_offset_degrees = 0.0 # Adjust if camera/launcher not perfectly aligned self.elevation_offset_degrees = 0.0 # Adjust for gravity compensation self.distance_elevation_factor = 0.5 # Elevation adjustment based on distance logger.info("Mouth detector initialized") def detect_open_mouths(self, frame) -> List[Tuple[int, int, int, int, float]]: """ Detect open mouths in frame Returns list of (x, y, w, h, confidence) for each detected mouth """ gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY) mouths = [] if self.use_dlib: # Use dlib for precise facial landmark detection faces = self.detector(gray) for face in faces: landmarks = self.predictor(gray, face) # Get mouth landmarks (points 48-67 in 68-point model) mouth_points = [] for i in range(48, 68): point = landmarks.part(i) mouth_points.append((point.x, point.y)) # Advanced mouth analysis for WIDE-OPEN detection only mouth_state, confidence = self._analyze_mouth_state(landmarks) # Only target WIDE_OPEN mouths (not speaking or smiling) if mouth_state == "WIDE_OPEN": # Calculate bounding box around mouth mouth_xs = [p[0] for p in mouth_points] mouth_ys = [p[1] for p in mouth_points] mx = min(mouth_xs) - 10 my = min(mouth_ys) - 10 mw = max(mouth_xs) - min(mouth_xs) + 20 mh = max(mouth_ys) - min(mouth_ys) + 20 mouths.append((mx, my, mw, mh, confidence)) else: # Fallback to basic face detection + mouth area estimation faces = self.face_cascade.detectMultiScale(gray, 1.3, 5) for (fx, fy, fw, fh) in faces: # Estimate mouth area in lower third of face mouth_x = fx + int(fw * 0.25) mouth_y = fy + int(fh * 0.6) mouth_w = int(fw * 0.5) mouth_h = int(fh * 0.3) # Use intensity variance as proxy for open mouth mouth_roi = gray[mouth_y:mouth_y+mouth_h, mouth_x:mouth_x+mouth_w] if mouth_roi.size > 0: variance = np.var(mouth_roi) # Higher variance might indicate teeth/tongue visibility confidence = min(1.0, variance / 1000.0) # Adjust threshold if confidence > 0.3: # Minimum confidence threshold mouths.append((mouth_x, mouth_y, mouth_w, mouth_h, confidence)) # Sort by confidence (highest first) mouths.sort(key=lambda x: x[4], reverse=True) return mouths def _analyze_mouth_state(self, landmarks) -> Tuple[str, float]: """ Analyze mouth landmarks to determine state and confidence Returns: (state, confidence) where state is CLOSED, SPEAKING, SMILING, or WIDE_OPEN """ # Key mouth landmark points # Outer lip: 48-54 (top), 54-60 (bottom) # Inner lip: 60-64 (top), 64-68 (bottom) # Vertical measurements (mouth opening) outer_top = landmarks.part(51) # Top of upper lip (center) outer_bottom = landmarks.part(57) # Bottom of lower lip (center) inner_top = landmarks.part(62) # Top of inner lip inner_bottom = landmarks.part(66) # Bottom of inner lip # Horizontal measurements (mouth width) left_corner = landmarks.part(48) # Left mouth corner right_corner = landmarks.part(54) # Right mouth corner # Calculate dimensions outer_height = abs(outer_top.y - outer_bottom.y) inner_height = abs(inner_top.y - inner_bottom.y) mouth_width = abs(right_corner.x - left_corner.x) # Calculate ratios outer_aspect_ratio = outer_height / mouth_width if mouth_width > 0 else 0 inner_aspect_ratio = inner_height / mouth_width if mouth_width > 0 else 0 # Calculate lip separation (distance between inner and outer lip) lip_thickness_top = abs(outer_top.y - inner_top.y) lip_thickness_bottom = abs(outer_bottom.y - inner_bottom.y) avg_lip_thickness = (lip_thickness_top + lip_thickness_bottom) / 2 # Determine mouth state based on multiple criteria # WIDE_OPEN: Large inner opening + significant lip separation if (inner_aspect_ratio > 0.6 and outer_aspect_ratio > 0.4 and avg_lip_thickness > 8): # Pixels of lip separation confidence = min(1.0, inner_aspect_ratio * 1.5) return "WIDE_OPEN", confidence # SPEAKING: Moderate opening but less lip separation elif (inner_aspect_ratio > 0.3 and outer_aspect_ratio > 0.2 and avg_lip_thickness > 3): confidence = min(1.0, inner_aspect_ratio * 0.8) return "SPEAKING", confidence # SMILING: Wide mouth but minimal vertical opening elif (mouth_width > 40 and # Wider than normal outer_aspect_ratio < 0.25 and inner_aspect_ratio < 0.2): # Check if corners are raised (smile detection) mouth_center_y = (outer_top.y + outer_bottom.y) / 2 corner_raise = mouth_center_y - ((left_corner.y + right_corner.y) / 2) if corner_raise > 3: # Corners raised above center return "SMILING", 0.3 # Default: CLOSED return "CLOSED", 0.1 def estimate_distance(self, face_width_pixels: int) -> float: """ Estimate distance to face based on face width in pixels Returns distance in centimeters """ if face_width_pixels <= 0: return 100.0 # Default fallback distance # Distance = (real_face_width * focal_length * image_width) / (face_width_pixels * sensor_width) distance_cm = ( self.average_face_width_cm * self.camera_focal_length_mm * self.camera_width ) / (face_width_pixels * self.sensor_width_mm) # Clamp to reasonable range (50cm to 500cm) return max(50.0, min(500.0, distance_cm)) def calculate_centering_adjustment(self, mouth_x: int, mouth_y: int, face_width: int) -> Tuple[float, float, float]: """ Calculate motor adjustments to center the mouth in camera view Returns: (rotation_adjustment, elevation_adjustment, estimated_distance) """ # Calculate offset from center dx = mouth_x - self.center_x dy = mouth_y - self.center_y distance_from_center = math.sqrt(dx*dx + dy*dy) # Estimate distance for context estimated_distance = self.estimate_distance(face_width) # If mouth is already centered (within deadzone), no adjustment needed if distance_from_center < self.target_deadzone_pixels: return 0.0, 0.0, estimated_distance # Calculate adjustment angles based on pixel offset # Larger faces (closer) need smaller adjustments, smaller faces (farther) need larger adjustments distance_factor = max(0.5, min(2.0, 100.0 / estimated_distance)) # Scale adjustments by distance # Convert pixel offset to approximate angle adjustment # This is empirical - you'll need to tune these values for your setup pixels_per_degree_rotation = 15 * distance_factor # Adjust based on your camera/motor setup pixels_per_degree_elevation = 12 * distance_factor # Adjust based on your camera/motor setup rotation_adjustment = dx / pixels_per_degree_rotation elevation_adjustment = -dy / pixels_per_degree_elevation # Negative because Y increases downward # Apply configured offsets rotation_adjustment += self.rotation_offset_degrees elevation_adjustment += self.elevation_offset_degrees # Add distance-based elevation compensation (closer targets need higher elevation) distance_elevation_compensation = (200.0 - estimated_distance) * self.distance_elevation_factor / 100.0 elevation_adjustment += distance_elevation_compensation # Clamp to maximum adjustment per iteration rotation_adjustment = max(-self.max_adjustment_degrees, min(self.max_adjustment_degrees, rotation_adjustment)) elevation_adjustment = max(-self.max_adjustment_degrees, min(self.max_adjustment_degrees, elevation_adjustment)) return rotation_adjustment, elevation_adjustment, estimated_distance class OCTv2Server: def __init__(self, host='0.0.0.0', port=8080): self.host = host self.port = port self.running = False self.clients = [] # Hardware components self.esp32 = ESP32Controller() self.mouth_detector = MouthDetector() # Hardware state self.current_rotation = 0.0 # -90 to +90 degrees self.current_elevation = 30.0 # 0 to 60 degrees self.is_auto_mode = False self.is_homed = False self.auto_fire_enabled = True # Camera state self.camera = None self.streaming_clients = [] self.stream_thread = None # Auto mode state self.auto_mode_thread = None self.last_target_time = 0 self.target_cooldown = 2.0 # Seconds between automatic shots self.setup_camera() def setup_camera(self): """Initialize camera""" if not CAMERA_AVAILABLE: logger.info("Camera not available - simulating camera") return try: self.camera = Picamera2() config = self.camera.create_preview_configuration( main={"size": (640, 480)}, lores={"size": (320, 240)}, display="lores" ) self.camera.configure(config) self.camera.start() logger.info("Camera initialized successfully") except Exception as e: logger.error(f"Camera setup failed: {e}") def start_server(self): """Start the TCP server""" self.running = True server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) try: server_socket.bind((self.host, self.port)) server_socket.listen(5) logger.info(f"OCTv2 Server v2 listening on {self.host}:{self.port}") # Start auto mode thread self.auto_mode_thread = threading.Thread(target=self.auto_mode_worker) self.auto_mode_thread.daemon = True self.auto_mode_thread.start() while self.running: try: client_socket, address = server_socket.accept() logger.info(f"Client connected from {address}") client_thread = threading.Thread( target=self.handle_client, args=(client_socket, address) ) client_thread.daemon = True client_thread.start() except Exception as e: if self.running: logger.error(f"Error accepting client: {e}") except Exception as e: logger.error(f"Server error: {e}") finally: server_socket.close() self.cleanup() def handle_client(self, client_socket, address): """Handle individual client connections""" self.clients.append(client_socket) try: while self.running: data = client_socket.recv(1024) if not data: break try: command = json.loads(data.decode('utf-8')) logger.info(f"Received command: {command}") response = self.process_command(command, client_socket) if response: client_socket.send(json.dumps(response).encode('utf-8')) except json.JSONDecodeError: logger.error("Invalid JSON received") except Exception as e: logger.error(f"Error processing command: {e}") except Exception as e: logger.error(f"Client {address} error: {e}") finally: if client_socket in self.clients: self.clients.remove(client_socket) if client_socket in self.streaming_clients: self.streaming_clients.remove(client_socket) client_socket.close() logger.info(f"Client {address} disconnected") def process_command(self, command: Dict[str, Any], client_socket) -> Optional[Dict[str, Any]]: """Process commands from iOS app""" action = command.get('action') logger.info(f"Processing action: {action}") if action == 'aim_left': return self.aim_left() elif action == 'aim_right': return self.aim_right() elif action == 'fire': angle = command.get('angle', self.current_elevation) return self.fire_oreo(angle) elif action == 'home': return self.home_device() elif action == 'set_mode': mode = command.get('mode', 'manual') return self.set_mode(mode) elif action == 'capture_photo': return self.capture_photo(client_socket) elif action == 'start_video_stream': return self.start_video_stream(client_socket) elif action == 'stop_video_stream': return self.stop_video_stream(client_socket) elif action == 'status': return self.get_status() else: return {'error': f'Unknown action: {action}'} def aim_left(self) -> Dict[str, Any]: """Move aim left by small increment""" new_rotation = max(-90, self.current_rotation - 5) if self.esp32.move_to_position(new_rotation, self.current_elevation): self.current_rotation = new_rotation return {'status': 'success', 'rotation': self.current_rotation} return {'status': 'error', 'message': 'Failed to move left'} def aim_right(self) -> Dict[str, Any]: """Move aim right by small increment""" new_rotation = min(90, self.current_rotation + 5) if self.esp32.move_to_position(new_rotation, self.current_elevation): self.current_rotation = new_rotation return {'status': 'success', 'rotation': self.current_rotation} return {'status': 'error', 'message': 'Failed to move right'} def fire_oreo(self, elevation: float = None) -> Dict[str, Any]: """Fire an Oreo at the specified elevation""" if elevation is not None: # Move to target elevation first self.current_elevation = max(0, min(60, elevation)) if not self.esp32.move_to_position(self.current_rotation, self.current_elevation): return {'status': 'error', 'message': 'Failed to move to position'} time.sleep(0.5) # Wait for positioning logger.info(f"FIRING OREO at rotation={self.current_rotation}°, elevation={self.current_elevation}°!") if self.esp32.fire_oreo(): return { 'status': 'success', 'message': f'Oreo fired at R:{self.current_rotation}° E:{self.current_elevation}°', 'rotation': self.current_rotation, 'elevation': self.current_elevation } else: return {'status': 'error', 'message': 'Fire mechanism failed'} def home_device(self) -> Dict[str, Any]: """Home the device to its reference position""" logger.info("Homing device...") if self.esp32.home_motors(): self.current_rotation = 0.0 self.current_elevation = 0.0 self.is_homed = True return { 'status': 'success', 'message': 'Device homed successfully', 'rotation': 0, 'elevation': 0 } else: return {'status': 'error', 'message': 'Homing failed'} def set_mode(self, mode: str) -> Dict[str, Any]: """Set operating mode (auto/manual)""" self.is_auto_mode = (mode.lower() == 'auto') logger.info(f"Mode set to: {mode}") if self.is_auto_mode: logger.info("🎯 AUTOMATIC MODE ENABLED - Seeking open mouths!") else: logger.info("🎮 Manual mode enabled") return { 'status': 'success', 'mode': mode, 'auto_mode': self.is_auto_mode } def auto_mode_worker(self): """Worker thread for automatic mouth detection and firing""" while self.running: try: if self.is_auto_mode and CAMERA_AVAILABLE and self.camera: # Capture frame for analysis frame = self.camera.capture_array() # Detect open mouths mouths = self.mouth_detector.detect_open_mouths(frame) if mouths and self.auto_fire_enabled: current_time = time.time() if current_time - self.last_target_time > self.target_cooldown: # Target the most confident mouth best_mouth = mouths[0] mx, my, mw, mh, confidence = best_mouth # Calculate mouth center and face width (estimate from mouth width) mouth_center_x = mx + mw // 2 mouth_center_y = my + mh // 2 estimated_face_width = int(mw * 2.5) # Face is roughly 2.5x wider than mouth # Calculate centering adjustment rotation_adj, elevation_adj, distance = self.mouth_detector.calculate_centering_adjustment( mouth_center_x, mouth_center_y, estimated_face_width ) logger.info(f"🎯 AUTO TARGET: Mouth detected (confidence {confidence:.2f}, distance ~{distance:.0f}cm)") # Only adjust if mouth is not already centered if abs(rotation_adj) > 0.1 or abs(elevation_adj) > 0.1: # Calculate new position with adjustments new_rotation = max(-90, min(90, self.current_rotation + rotation_adj)) new_elevation = max(0, min(60, self.current_elevation + elevation_adj)) logger.info(f"🎯 CENTERING: Adjusting R:{rotation_adj:+.1f}° E:{elevation_adj:+.1f}° -> R:{new_rotation:.1f}° E:{new_elevation:.1f}°") # Move to center the target if self.esp32.move_to_position(new_rotation, new_elevation): self.current_rotation = new_rotation self.current_elevation = new_elevation time.sleep(0.5) # Wait for positioning else: logger.info("🎯 TARGET CENTERED: Mouth already in optimal position") # Check if mouth is now well-centered for firing dx = mouth_center_x - self.mouth_detector.center_x dy = mouth_center_y - self.mouth_detector.center_y center_distance = math.sqrt(dx*dx + dy*dy) if center_distance < self.mouth_detector.target_deadzone_pixels * 1.5: # Allow some tolerance # Fire! logger.info(f"🔥 AUTO FIRE: Launching Oreo at centered target! (offset: {center_distance:.0f}px)") self.esp32.fire_oreo() self.last_target_time = current_time else: logger.info(f"🎯 TARGET NOT CENTERED: Waiting for better positioning (offset: {center_distance:.0f}px)") time.sleep(0.1) # Check at ~10fps except Exception as e: logger.error(f"Auto mode error: {e}") time.sleep(1) def capture_photo(self, client_socket) -> Dict[str, Any]: """Capture a high-resolution photo""" if not CAMERA_AVAILABLE: logger.info("SIMULATED: Photo captured") return {'status': 'success', 'message': 'Photo captured (simulated)'} try: timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") filename = f"octv2_photo_{timestamp}.jpg" self.camera.capture_file(filename) # Send photo back to app with open(filename, 'rb') as f: photo_data = f.read() client_socket.send(photo_data) return { 'status': 'success', 'filename': filename, 'message': 'Photo captured and saved' } except Exception as e: logger.error(f"Photo capture failed: {e}") return {'status': 'error', 'message': str(e)} def start_video_stream(self, client_socket) -> Dict[str, Any]: """Start video streaming to client""" if client_socket not in self.streaming_clients: self.streaming_clients.append(client_socket) if not self.stream_thread or not self.stream_thread.is_alive(): self.stream_thread = threading.Thread(target=self.video_stream_worker) self.stream_thread.daemon = True self.stream_thread.start() return {'status': 'success', 'message': 'Video stream started'} def stop_video_stream(self, client_socket) -> Dict[str, Any]: """Stop video streaming to client""" if client_socket in self.streaming_clients: self.streaming_clients.remove(client_socket) return {'status': 'success', 'message': 'Video stream stopped'} def video_stream_worker(self): """Worker thread for video streaming with mouth detection overlay""" if not CAMERA_AVAILABLE: logger.info("SIMULATED: Video streaming started") return try: while self.streaming_clients: # Capture frame frame = self.camera.capture_array() # Add mouth detection overlay in auto mode if self.is_auto_mode: mouths = self.mouth_detector.detect_open_mouths(frame) self._add_mouth_detection_overlay(frame, mouths) # Draw targeting crosshair and deadzone center_x, center_y = frame.shape[1] // 2, frame.shape[0] // 2 deadzone = self.mouth_detector.target_deadzone_pixels # Main crosshair cv2.line(frame, (center_x-20, center_y), (center_x+20, center_y), (255, 0, 0), 2) cv2.line(frame, (center_x, center_y-20), (center_x, center_y+20), (255, 0, 0), 2) # Deadzone circle cv2.circle(frame, (center_x, center_y), deadzone, (255, 0, 0), 2) cv2.putText(frame, 'TARGET ZONE', (center_x-50, center_y+deadzone+20), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 0, 0), 2) # Convert to JPEG _, jpeg = cv2.imencode('.jpg', frame) frame_data = jpeg.tobytes() # Send to all streaming clients for client in self.streaming_clients[:]: try: client.send(frame_data) except Exception as e: logger.error(f"Failed to send frame to client: {e}") self.streaming_clients.remove(client) time.sleep(0.1) # ~10 FPS except Exception as e: logger.error(f"Video streaming error: {e}") def get_status(self) -> Dict[str, Any]: """Return current device status""" # Get actual position from ESP32 actual_rotation, actual_elevation = self.esp32.get_position() return { 'status': 'success', 'rotation': actual_rotation, 'elevation': actual_elevation, 'auto_mode': self.is_auto_mode, 'homed': self.is_homed, 'streaming_clients': len(self.streaming_clients), 'total_clients': len(self.clients), 'esp32_connected': self.esp32.serial_conn is not None } def cleanup(self): """Clean up resources""" self.running = False if self.camera and CAMERA_AVAILABLE: self.camera.stop() if self.esp32.serial_conn: self.esp32.serial_conn.close() logger.info("Server shutdown complete") def _add_mouth_detection_overlay(self, frame, mouths): """Add visual overlay showing mouth detection results""" if not self.mouth_detector.use_dlib: # Simple overlay for basic detection for mx, my, mw, mh, confidence in mouths: color = (0, 255, 0) if confidence > 0.5 else (0, 255, 255) cv2.rectangle(frame, (mx, my), (mx + mw, my + mh), color, 2) cv2.putText(frame, f'{confidence:.2f}', (mx, my-10), cv2.FONT_HERSHEY_SIMPLEX, 0.6, color, 2) return # Advanced overlay showing all mouth states gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY) faces = self.mouth_detector.detector(gray) for face in faces: landmarks = self.mouth_detector.predictor(gray, face) mouth_state, confidence = self.mouth_detector._analyze_mouth_state(landmarks) # Get face bounding box x1, y1, x2, y2 = face.left(), face.top(), face.right(), face.bottom() # Color coding for different states state_colors = { "WIDE_OPEN": (0, 255, 0), # Bright green - TARGET! "SPEAKING": (0, 165, 255), # Orange "SMILING": (255, 255, 0), # Cyan "CLOSED": (128, 128, 128) # Gray } color = state_colors.get(mouth_state, (128, 128, 128)) # Draw face rectangle if mouth_state == "WIDE_OPEN": # Thick border for targets cv2.rectangle(frame, (x1, y1), (x2, y2), color, 4) # Add target indicator cv2.putText(frame, "🎯 TARGET!", (x1, y1-40), cv2.FONT_HERSHEY_SIMPLEX, 0.8, color, 2) else: # Thin border for non-targets cv2.rectangle(frame, (x1, y1), (x2, y2), color, 2) # Show mouth state, confidence, and distance cv2.putText(frame, f'{mouth_state}', (x1, y1-20), cv2.FONT_HERSHEY_SIMPLEX, 0.6, color, 2) cv2.putText(frame, f'{confidence:.2f}', (x1, y2+20), cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 2) # Add distance estimation for WIDE_OPEN mouths if mouth_state == "WIDE_OPEN": face_width = x2 - x1 distance = self.mouth_detector.estimate_distance(face_width) cv2.putText(frame, f'~{distance:.0f}cm', (x1, y2+40), cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 2) # Draw mouth landmarks for WIDE_OPEN targets if mouth_state == "WIDE_OPEN": # Draw mouth contour mouth_points = [] for i in range(48, 68): point = landmarks.part(i) mouth_points.append((point.x, point.y)) if i in [48, 54, 62, 66]: # Key points cv2.circle(frame, (point.x, point.y), 3, (0, 255, 0), -1) # Draw mouth outline mouth_points = np.array(mouth_points, dtype=np.int32) cv2.polylines(frame, [mouth_points], True, (0, 255, 0), 2) # Add detection stats total_faces = len(faces) wide_open_count = len([m for m in mouths if len(m) > 4]) # mouths from detect_open_mouths only contain WIDE_OPEN cv2.putText(frame, f'Faces: {total_faces}', (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 255), 2) cv2.putText(frame, f'Targets: {wide_open_count}', (10, 60), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 0), 2) def main(): """Main entry point""" print("🍪 OCTv2 (Oreo Cookie Thrower v2) Server v2 Starting...") print("🤖 Features: ESP32 Control + Automatic Mouth Detection") server = OCTv2Server() try: server.start_server() except KeyboardInterrupt: print("\n🛑 Shutting down OCTv2 server...") server.cleanup() if __name__ == "__main__": main()