123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830 |
- #!/usr/bin/env python3
- """
- OCTv2 (Oreo Cookie Thrower v2) - Raspberry Pi Server v2
- - ESP32 serial communication for motor control
- - Automatic mouth detection and targeting
- - Stepper motor control for rotation and elevation
- """
- import socket
- import json
- import threading
- import time
- import logging
- import serial
- import cv2
- import numpy as np
- from datetime import datetime
- from typing import Dict, Any, Optional, Tuple, List
- import io
- import os
- import math
- # Camera imports
- try:
- from picamera2 import Picamera2
- CAMERA_AVAILABLE = True
- except ImportError:
- print("Camera not available - running in simulation mode")
- CAMERA_AVAILABLE = False
- # Configure logging
- logging.basicConfig(
- level=logging.INFO,
- format='%(asctime)s - %(levelname)s - %(message)s'
- )
- logger = logging.getLogger(__name__)
- class ESP32Controller:
- """Handle serial communication with ESP32 for motor control"""
- def __init__(self, port='/dev/ttyUSB0', baudrate=115200):
- self.port = port
- self.baudrate = baudrate
- self.serial_conn = None
- self.connect()
- def connect(self):
- """Connect to ESP32 via serial"""
- try:
- self.serial_conn = serial.Serial(self.port, self.baudrate, timeout=1)
- time.sleep(2) # Wait for ESP32 to initialize
- logger.info(f"Connected to ESP32 on {self.port}")
- return True
- except Exception as e:
- logger.error(f"Failed to connect to ESP32: {e}")
- return False
- def send_command(self, command: str) -> str:
- """Send command to ESP32 and get response"""
- if not self.serial_conn:
- logger.error("ESP32 not connected")
- return "ERROR: Not connected"
- try:
- self.serial_conn.write(f"{command}\n".encode())
- response = self.serial_conn.readline().decode().strip()
- logger.debug(f"ESP32 Command: {command} -> Response: {response}")
- return response
- except Exception as e:
- logger.error(f"ESP32 communication error: {e}")
- return "ERROR: Communication failed"
- def home_motors(self) -> bool:
- """Home both rotation and elevation motors"""
- response = self.send_command("HOME")
- return response == "OK"
- def move_to_position(self, rotation_degrees: float, elevation_degrees: float) -> bool:
- """Move to absolute position (rotation: -90 to +90, elevation: 0 to 60)"""
- cmd = f"MOVE {rotation_degrees:.1f} {elevation_degrees:.1f}"
- response = self.send_command(cmd)
- return response == "OK"
- def move_relative(self, delta_rotation: float, delta_elevation: float) -> bool:
- """Move relative to current position"""
- cmd = f"REL {delta_rotation:.1f} {delta_elevation:.1f}"
- response = self.send_command(cmd)
- return response == "OK"
- def fire_oreo(self) -> bool:
- """Trigger the firing mechanism"""
- response = self.send_command("FIRE")
- return response == "OK"
- def get_position(self) -> Tuple[float, float]:
- """Get current position (rotation, elevation)"""
- response = self.send_command("POS")
- try:
- parts = response.split()
- if len(parts) == 2:
- return float(parts[0]), float(parts[1])
- except:
- pass
- return 0.0, 0.0
- class MouthDetector:
- """Detect open mouths in camera feed for automatic targeting"""
- def __init__(self):
- # Load OpenCV face cascade classifier
- self.face_cascade = cv2.CascadeClassifier(cv2.data.haarcascades + 'haarcascade_frontalface_default.xml')
- # For actual open mouth detection, we'll use facial landmarks
- try:
- import dlib
- # Download shape predictor: http://dlib.net/files/shape_predictor_68_face_landmarks.dat.bz2
- self.predictor = dlib.shape_predictor("shape_predictor_68_face_landmarks.dat")
- self.detector = dlib.get_frontal_face_detector()
- self.use_dlib = True
- logger.info("Using dlib for precise mouth detection")
- except ImportError:
- logger.warning("dlib not available - using basic mouth area detection")
- self.use_dlib = False
- # Camera parameters for targeting calculations
- self.camera_width = 640
- self.camera_height = 480
- self.center_x = self.camera_width // 2
- self.center_y = self.camera_height // 2
- # Camera-follows-aim targeting parameters
- self.target_deadzone_pixels = 30 # Don't adjust if mouth is within this radius of center
- self.max_adjustment_degrees = 10 # Maximum single adjustment per iteration
- # Distance estimation parameters (based on average human face size)
- self.average_face_width_cm = 16.0 # Average human face width
- self.camera_focal_length_mm = 3.04 # Pi Camera focal length
- self.sensor_width_mm = 3.68 # Pi Camera sensor width
- # Aiming offsets (configurable for mechanical compensation)
- self.rotation_offset_degrees = 0.0 # Adjust if camera/launcher not perfectly aligned
- self.elevation_offset_degrees = 0.0 # Adjust for gravity compensation
- self.distance_elevation_factor = 0.5 # Elevation adjustment based on distance
- logger.info("Mouth detector initialized")
- def detect_open_mouths(self, frame) -> List[Tuple[int, int, int, int, float]]:
- """
- Detect open mouths in frame
- Returns list of (x, y, w, h, confidence) for each detected mouth
- """
- gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
- mouths = []
- if self.use_dlib:
- # Use dlib for precise facial landmark detection
- faces = self.detector(gray)
- for face in faces:
- landmarks = self.predictor(gray, face)
- # Get mouth landmarks (points 48-67 in 68-point model)
- mouth_points = []
- for i in range(48, 68):
- point = landmarks.part(i)
- mouth_points.append((point.x, point.y))
- # Advanced mouth analysis for WIDE-OPEN detection only
- mouth_state, confidence = self._analyze_mouth_state(landmarks)
- # Only target WIDE_OPEN mouths (not speaking or smiling)
- if mouth_state == "WIDE_OPEN":
- # Calculate bounding box around mouth
- mouth_xs = [p[0] for p in mouth_points]
- mouth_ys = [p[1] for p in mouth_points]
- mx = min(mouth_xs) - 10
- my = min(mouth_ys) - 10
- mw = max(mouth_xs) - min(mouth_xs) + 20
- mh = max(mouth_ys) - min(mouth_ys) + 20
- mouths.append((mx, my, mw, mh, confidence))
- else:
- # Fallback to basic face detection + mouth area estimation
- faces = self.face_cascade.detectMultiScale(gray, 1.3, 5)
- for (fx, fy, fw, fh) in faces:
- # Estimate mouth area in lower third of face
- mouth_x = fx + int(fw * 0.25)
- mouth_y = fy + int(fh * 0.6)
- mouth_w = int(fw * 0.5)
- mouth_h = int(fh * 0.3)
- # Use intensity variance as proxy for open mouth
- mouth_roi = gray[mouth_y:mouth_y+mouth_h, mouth_x:mouth_x+mouth_w]
- if mouth_roi.size > 0:
- variance = np.var(mouth_roi)
- # Higher variance might indicate teeth/tongue visibility
- confidence = min(1.0, variance / 1000.0) # Adjust threshold
- if confidence > 0.3: # Minimum confidence threshold
- mouths.append((mouth_x, mouth_y, mouth_w, mouth_h, confidence))
- # Sort by confidence (highest first)
- mouths.sort(key=lambda x: x[4], reverse=True)
- return mouths
- def _analyze_mouth_state(self, landmarks) -> Tuple[str, float]:
- """
- Analyze mouth landmarks to determine state and confidence
- Returns: (state, confidence) where state is CLOSED, SPEAKING, SMILING, or WIDE_OPEN
- """
- # Key mouth landmark points
- # Outer lip: 48-54 (top), 54-60 (bottom)
- # Inner lip: 60-64 (top), 64-68 (bottom)
- # Vertical measurements (mouth opening)
- outer_top = landmarks.part(51) # Top of upper lip (center)
- outer_bottom = landmarks.part(57) # Bottom of lower lip (center)
- inner_top = landmarks.part(62) # Top of inner lip
- inner_bottom = landmarks.part(66) # Bottom of inner lip
- # Horizontal measurements (mouth width)
- left_corner = landmarks.part(48) # Left mouth corner
- right_corner = landmarks.part(54) # Right mouth corner
- # Calculate dimensions
- outer_height = abs(outer_top.y - outer_bottom.y)
- inner_height = abs(inner_top.y - inner_bottom.y)
- mouth_width = abs(right_corner.x - left_corner.x)
- # Calculate ratios
- outer_aspect_ratio = outer_height / mouth_width if mouth_width > 0 else 0
- inner_aspect_ratio = inner_height / mouth_width if mouth_width > 0 else 0
- # Calculate lip separation (distance between inner and outer lip)
- lip_thickness_top = abs(outer_top.y - inner_top.y)
- lip_thickness_bottom = abs(outer_bottom.y - inner_bottom.y)
- avg_lip_thickness = (lip_thickness_top + lip_thickness_bottom) / 2
- # Determine mouth state based on multiple criteria
- # WIDE_OPEN: Large inner opening + significant lip separation
- if (inner_aspect_ratio > 0.6 and
- outer_aspect_ratio > 0.4 and
- avg_lip_thickness > 8): # Pixels of lip separation
- confidence = min(1.0, inner_aspect_ratio * 1.5)
- return "WIDE_OPEN", confidence
- # SPEAKING: Moderate opening but less lip separation
- elif (inner_aspect_ratio > 0.3 and
- outer_aspect_ratio > 0.2 and
- avg_lip_thickness > 3):
- confidence = min(1.0, inner_aspect_ratio * 0.8)
- return "SPEAKING", confidence
- # SMILING: Wide mouth but minimal vertical opening
- elif (mouth_width > 40 and # Wider than normal
- outer_aspect_ratio < 0.25 and
- inner_aspect_ratio < 0.2):
- # Check if corners are raised (smile detection)
- mouth_center_y = (outer_top.y + outer_bottom.y) / 2
- corner_raise = mouth_center_y - ((left_corner.y + right_corner.y) / 2)
- if corner_raise > 3: # Corners raised above center
- return "SMILING", 0.3
- # Default: CLOSED
- return "CLOSED", 0.1
- def estimate_distance(self, face_width_pixels: int) -> float:
- """
- Estimate distance to face based on face width in pixels
- Returns distance in centimeters
- """
- if face_width_pixels <= 0:
- return 100.0 # Default fallback distance
- # Distance = (real_face_width * focal_length * image_width) / (face_width_pixels * sensor_width)
- distance_cm = (
- self.average_face_width_cm *
- self.camera_focal_length_mm *
- self.camera_width
- ) / (face_width_pixels * self.sensor_width_mm)
- # Clamp to reasonable range (50cm to 500cm)
- return max(50.0, min(500.0, distance_cm))
- def calculate_centering_adjustment(self, mouth_x: int, mouth_y: int, face_width: int) -> Tuple[float, float, float]:
- """
- Calculate motor adjustments to center the mouth in camera view
- Returns: (rotation_adjustment, elevation_adjustment, estimated_distance)
- """
- # Calculate offset from center
- dx = mouth_x - self.center_x
- dy = mouth_y - self.center_y
- distance_from_center = math.sqrt(dx*dx + dy*dy)
- # Estimate distance for context
- estimated_distance = self.estimate_distance(face_width)
- # If mouth is already centered (within deadzone), no adjustment needed
- if distance_from_center < self.target_deadzone_pixels:
- return 0.0, 0.0, estimated_distance
- # Calculate adjustment angles based on pixel offset
- # Larger faces (closer) need smaller adjustments, smaller faces (farther) need larger adjustments
- distance_factor = max(0.5, min(2.0, 100.0 / estimated_distance)) # Scale adjustments by distance
- # Convert pixel offset to approximate angle adjustment
- # This is empirical - you'll need to tune these values for your setup
- pixels_per_degree_rotation = 15 * distance_factor # Adjust based on your camera/motor setup
- pixels_per_degree_elevation = 12 * distance_factor # Adjust based on your camera/motor setup
- rotation_adjustment = dx / pixels_per_degree_rotation
- elevation_adjustment = -dy / pixels_per_degree_elevation # Negative because Y increases downward
- # Apply configured offsets
- rotation_adjustment += self.rotation_offset_degrees
- elevation_adjustment += self.elevation_offset_degrees
- # Add distance-based elevation compensation (closer targets need higher elevation)
- distance_elevation_compensation = (200.0 - estimated_distance) * self.distance_elevation_factor / 100.0
- elevation_adjustment += distance_elevation_compensation
- # Clamp to maximum adjustment per iteration
- rotation_adjustment = max(-self.max_adjustment_degrees,
- min(self.max_adjustment_degrees, rotation_adjustment))
- elevation_adjustment = max(-self.max_adjustment_degrees,
- min(self.max_adjustment_degrees, elevation_adjustment))
- return rotation_adjustment, elevation_adjustment, estimated_distance
- class OCTv2Server:
- def __init__(self, host='0.0.0.0', port=8080):
- self.host = host
- self.port = port
- self.running = False
- self.clients = []
- # Hardware components
- self.esp32 = ESP32Controller()
- self.mouth_detector = MouthDetector()
- # Hardware state
- self.current_rotation = 0.0 # -90 to +90 degrees
- self.current_elevation = 30.0 # 0 to 60 degrees
- self.is_auto_mode = False
- self.is_homed = False
- self.auto_fire_enabled = True
- # Camera state
- self.camera = None
- self.streaming_clients = []
- self.stream_thread = None
- # Auto mode state
- self.auto_mode_thread = None
- self.last_target_time = 0
- self.target_cooldown = 2.0 # Seconds between automatic shots
- self.setup_camera()
- def setup_camera(self):
- """Initialize camera"""
- if not CAMERA_AVAILABLE:
- logger.info("Camera not available - simulating camera")
- return
- try:
- self.camera = Picamera2()
- config = self.camera.create_preview_configuration(
- main={"size": (640, 480)},
- lores={"size": (320, 240)},
- display="lores"
- )
- self.camera.configure(config)
- self.camera.start()
- logger.info("Camera initialized successfully")
- except Exception as e:
- logger.error(f"Camera setup failed: {e}")
- def start_server(self):
- """Start the TCP server"""
- self.running = True
- server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
- try:
- server_socket.bind((self.host, self.port))
- server_socket.listen(5)
- logger.info(f"OCTv2 Server v2 listening on {self.host}:{self.port}")
- # Start auto mode thread
- self.auto_mode_thread = threading.Thread(target=self.auto_mode_worker)
- self.auto_mode_thread.daemon = True
- self.auto_mode_thread.start()
- while self.running:
- try:
- client_socket, address = server_socket.accept()
- logger.info(f"Client connected from {address}")
- client_thread = threading.Thread(
- target=self.handle_client,
- args=(client_socket, address)
- )
- client_thread.daemon = True
- client_thread.start()
- except Exception as e:
- if self.running:
- logger.error(f"Error accepting client: {e}")
- except Exception as e:
- logger.error(f"Server error: {e}")
- finally:
- server_socket.close()
- self.cleanup()
- def handle_client(self, client_socket, address):
- """Handle individual client connections"""
- self.clients.append(client_socket)
- try:
- while self.running:
- data = client_socket.recv(1024)
- if not data:
- break
- try:
- command = json.loads(data.decode('utf-8'))
- logger.info(f"Received command: {command}")
- response = self.process_command(command, client_socket)
- if response:
- client_socket.send(json.dumps(response).encode('utf-8'))
- except json.JSONDecodeError:
- logger.error("Invalid JSON received")
- except Exception as e:
- logger.error(f"Error processing command: {e}")
- except Exception as e:
- logger.error(f"Client {address} error: {e}")
- finally:
- if client_socket in self.clients:
- self.clients.remove(client_socket)
- if client_socket in self.streaming_clients:
- self.streaming_clients.remove(client_socket)
- client_socket.close()
- logger.info(f"Client {address} disconnected")
- def process_command(self, command: Dict[str, Any], client_socket) -> Optional[Dict[str, Any]]:
- """Process commands from iOS app"""
- action = command.get('action')
- logger.info(f"Processing action: {action}")
- if action == 'aim_left':
- return self.aim_left()
- elif action == 'aim_right':
- return self.aim_right()
- elif action == 'fire':
- angle = command.get('angle', self.current_elevation)
- return self.fire_oreo(angle)
- elif action == 'home':
- return self.home_device()
- elif action == 'set_mode':
- mode = command.get('mode', 'manual')
- return self.set_mode(mode)
- elif action == 'capture_photo':
- return self.capture_photo(client_socket)
- elif action == 'start_video_stream':
- return self.start_video_stream(client_socket)
- elif action == 'stop_video_stream':
- return self.stop_video_stream(client_socket)
- elif action == 'status':
- return self.get_status()
- else:
- return {'error': f'Unknown action: {action}'}
- def aim_left(self) -> Dict[str, Any]:
- """Move aim left by small increment"""
- new_rotation = max(-90, self.current_rotation - 5)
- if self.esp32.move_to_position(new_rotation, self.current_elevation):
- self.current_rotation = new_rotation
- return {'status': 'success', 'rotation': self.current_rotation}
- return {'status': 'error', 'message': 'Failed to move left'}
- def aim_right(self) -> Dict[str, Any]:
- """Move aim right by small increment"""
- new_rotation = min(90, self.current_rotation + 5)
- if self.esp32.move_to_position(new_rotation, self.current_elevation):
- self.current_rotation = new_rotation
- return {'status': 'success', 'rotation': self.current_rotation}
- return {'status': 'error', 'message': 'Failed to move right'}
- def fire_oreo(self, elevation: float = None) -> Dict[str, Any]:
- """Fire an Oreo at the specified elevation"""
- if elevation is not None:
- # Move to target elevation first
- self.current_elevation = max(0, min(60, elevation))
- if not self.esp32.move_to_position(self.current_rotation, self.current_elevation):
- return {'status': 'error', 'message': 'Failed to move to position'}
- time.sleep(0.5) # Wait for positioning
- logger.info(f"FIRING OREO at rotation={self.current_rotation}°, elevation={self.current_elevation}°!")
- if self.esp32.fire_oreo():
- return {
- 'status': 'success',
- 'message': f'Oreo fired at R:{self.current_rotation}° E:{self.current_elevation}°',
- 'rotation': self.current_rotation,
- 'elevation': self.current_elevation
- }
- else:
- return {'status': 'error', 'message': 'Fire mechanism failed'}
- def home_device(self) -> Dict[str, Any]:
- """Home the device to its reference position"""
- logger.info("Homing device...")
- if self.esp32.home_motors():
- self.current_rotation = 0.0
- self.current_elevation = 0.0
- self.is_homed = True
- return {
- 'status': 'success',
- 'message': 'Device homed successfully',
- 'rotation': 0,
- 'elevation': 0
- }
- else:
- return {'status': 'error', 'message': 'Homing failed'}
- def set_mode(self, mode: str) -> Dict[str, Any]:
- """Set operating mode (auto/manual)"""
- self.is_auto_mode = (mode.lower() == 'auto')
- logger.info(f"Mode set to: {mode}")
- if self.is_auto_mode:
- logger.info("🎯 AUTOMATIC MODE ENABLED - Seeking open mouths!")
- else:
- logger.info("🎮 Manual mode enabled")
- return {
- 'status': 'success',
- 'mode': mode,
- 'auto_mode': self.is_auto_mode
- }
- def auto_mode_worker(self):
- """Worker thread for automatic mouth detection and firing"""
- while self.running:
- try:
- if self.is_auto_mode and CAMERA_AVAILABLE and self.camera:
- # Capture frame for analysis
- frame = self.camera.capture_array()
- # Detect open mouths
- mouths = self.mouth_detector.detect_open_mouths(frame)
- if mouths and self.auto_fire_enabled:
- current_time = time.time()
- if current_time - self.last_target_time > self.target_cooldown:
- # Target the most confident mouth
- best_mouth = mouths[0]
- mx, my, mw, mh, confidence = best_mouth
- # Calculate mouth center and face width (estimate from mouth width)
- mouth_center_x = mx + mw // 2
- mouth_center_y = my + mh // 2
- estimated_face_width = int(mw * 2.5) # Face is roughly 2.5x wider than mouth
- # Calculate centering adjustment
- rotation_adj, elevation_adj, distance = self.mouth_detector.calculate_centering_adjustment(
- mouth_center_x, mouth_center_y, estimated_face_width
- )
- logger.info(f"🎯 AUTO TARGET: Mouth detected (confidence {confidence:.2f}, distance ~{distance:.0f}cm)")
- # Only adjust if mouth is not already centered
- if abs(rotation_adj) > 0.1 or abs(elevation_adj) > 0.1:
- # Calculate new position with adjustments
- new_rotation = max(-90, min(90, self.current_rotation + rotation_adj))
- new_elevation = max(0, min(60, self.current_elevation + elevation_adj))
- logger.info(f"🎯 CENTERING: Adjusting R:{rotation_adj:+.1f}° E:{elevation_adj:+.1f}° -> R:{new_rotation:.1f}° E:{new_elevation:.1f}°")
- # Move to center the target
- if self.esp32.move_to_position(new_rotation, new_elevation):
- self.current_rotation = new_rotation
- self.current_elevation = new_elevation
- time.sleep(0.5) # Wait for positioning
- else:
- logger.info("🎯 TARGET CENTERED: Mouth already in optimal position")
- # Check if mouth is now well-centered for firing
- dx = mouth_center_x - self.mouth_detector.center_x
- dy = mouth_center_y - self.mouth_detector.center_y
- center_distance = math.sqrt(dx*dx + dy*dy)
- if center_distance < self.mouth_detector.target_deadzone_pixels * 1.5: # Allow some tolerance
- # Fire!
- logger.info(f"🔥 AUTO FIRE: Launching Oreo at centered target! (offset: {center_distance:.0f}px)")
- self.esp32.fire_oreo()
- self.last_target_time = current_time
- else:
- logger.info(f"🎯 TARGET NOT CENTERED: Waiting for better positioning (offset: {center_distance:.0f}px)")
- time.sleep(0.1) # Check at ~10fps
- except Exception as e:
- logger.error(f"Auto mode error: {e}")
- time.sleep(1)
- def capture_photo(self, client_socket) -> Dict[str, Any]:
- """Capture a high-resolution photo"""
- if not CAMERA_AVAILABLE:
- logger.info("SIMULATED: Photo captured")
- return {'status': 'success', 'message': 'Photo captured (simulated)'}
- try:
- timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
- filename = f"octv2_photo_{timestamp}.jpg"
- self.camera.capture_file(filename)
- # Send photo back to app
- with open(filename, 'rb') as f:
- photo_data = f.read()
- client_socket.send(photo_data)
- return {
- 'status': 'success',
- 'filename': filename,
- 'message': 'Photo captured and saved'
- }
- except Exception as e:
- logger.error(f"Photo capture failed: {e}")
- return {'status': 'error', 'message': str(e)}
- def start_video_stream(self, client_socket) -> Dict[str, Any]:
- """Start video streaming to client"""
- if client_socket not in self.streaming_clients:
- self.streaming_clients.append(client_socket)
- if not self.stream_thread or not self.stream_thread.is_alive():
- self.stream_thread = threading.Thread(target=self.video_stream_worker)
- self.stream_thread.daemon = True
- self.stream_thread.start()
- return {'status': 'success', 'message': 'Video stream started'}
- def stop_video_stream(self, client_socket) -> Dict[str, Any]:
- """Stop video streaming to client"""
- if client_socket in self.streaming_clients:
- self.streaming_clients.remove(client_socket)
- return {'status': 'success', 'message': 'Video stream stopped'}
- def video_stream_worker(self):
- """Worker thread for video streaming with mouth detection overlay"""
- if not CAMERA_AVAILABLE:
- logger.info("SIMULATED: Video streaming started")
- return
- try:
- while self.streaming_clients:
- # Capture frame
- frame = self.camera.capture_array()
- # Add mouth detection overlay in auto mode
- if self.is_auto_mode:
- mouths = self.mouth_detector.detect_open_mouths(frame)
- self._add_mouth_detection_overlay(frame, mouths)
- # Draw targeting crosshair and deadzone
- center_x, center_y = frame.shape[1] // 2, frame.shape[0] // 2
- deadzone = self.mouth_detector.target_deadzone_pixels
- # Main crosshair
- cv2.line(frame, (center_x-20, center_y), (center_x+20, center_y), (255, 0, 0), 2)
- cv2.line(frame, (center_x, center_y-20), (center_x, center_y+20), (255, 0, 0), 2)
- # Deadzone circle
- cv2.circle(frame, (center_x, center_y), deadzone, (255, 0, 0), 2)
- cv2.putText(frame, 'TARGET ZONE', (center_x-50, center_y+deadzone+20),
- cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 0, 0), 2)
- # Convert to JPEG
- _, jpeg = cv2.imencode('.jpg', frame)
- frame_data = jpeg.tobytes()
- # Send to all streaming clients
- for client in self.streaming_clients[:]:
- try:
- client.send(frame_data)
- except Exception as e:
- logger.error(f"Failed to send frame to client: {e}")
- self.streaming_clients.remove(client)
- time.sleep(0.1) # ~10 FPS
- except Exception as e:
- logger.error(f"Video streaming error: {e}")
- def get_status(self) -> Dict[str, Any]:
- """Return current device status"""
- # Get actual position from ESP32
- actual_rotation, actual_elevation = self.esp32.get_position()
- return {
- 'status': 'success',
- 'rotation': actual_rotation,
- 'elevation': actual_elevation,
- 'auto_mode': self.is_auto_mode,
- 'homed': self.is_homed,
- 'streaming_clients': len(self.streaming_clients),
- 'total_clients': len(self.clients),
- 'esp32_connected': self.esp32.serial_conn is not None
- }
- def cleanup(self):
- """Clean up resources"""
- self.running = False
- if self.camera and CAMERA_AVAILABLE:
- self.camera.stop()
- if self.esp32.serial_conn:
- self.esp32.serial_conn.close()
- logger.info("Server shutdown complete")
- def _add_mouth_detection_overlay(self, frame, mouths):
- """Add visual overlay showing mouth detection results"""
- if not self.mouth_detector.use_dlib:
- # Simple overlay for basic detection
- for mx, my, mw, mh, confidence in mouths:
- color = (0, 255, 0) if confidence > 0.5 else (0, 255, 255)
- cv2.rectangle(frame, (mx, my), (mx + mw, my + mh), color, 2)
- cv2.putText(frame, f'{confidence:.2f}', (mx, my-10),
- cv2.FONT_HERSHEY_SIMPLEX, 0.6, color, 2)
- return
- # Advanced overlay showing all mouth states
- gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
- faces = self.mouth_detector.detector(gray)
- for face in faces:
- landmarks = self.mouth_detector.predictor(gray, face)
- mouth_state, confidence = self.mouth_detector._analyze_mouth_state(landmarks)
- # Get face bounding box
- x1, y1, x2, y2 = face.left(), face.top(), face.right(), face.bottom()
- # Color coding for different states
- state_colors = {
- "WIDE_OPEN": (0, 255, 0), # Bright green - TARGET!
- "SPEAKING": (0, 165, 255), # Orange
- "SMILING": (255, 255, 0), # Cyan
- "CLOSED": (128, 128, 128) # Gray
- }
- color = state_colors.get(mouth_state, (128, 128, 128))
- # Draw face rectangle
- if mouth_state == "WIDE_OPEN":
- # Thick border for targets
- cv2.rectangle(frame, (x1, y1), (x2, y2), color, 4)
- # Add target indicator
- cv2.putText(frame, "🎯 TARGET!", (x1, y1-40),
- cv2.FONT_HERSHEY_SIMPLEX, 0.8, color, 2)
- else:
- # Thin border for non-targets
- cv2.rectangle(frame, (x1, y1), (x2, y2), color, 2)
- # Show mouth state, confidence, and distance
- cv2.putText(frame, f'{mouth_state}', (x1, y1-20),
- cv2.FONT_HERSHEY_SIMPLEX, 0.6, color, 2)
- cv2.putText(frame, f'{confidence:.2f}', (x1, y2+20),
- cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 2)
- # Add distance estimation for WIDE_OPEN mouths
- if mouth_state == "WIDE_OPEN":
- face_width = x2 - x1
- distance = self.mouth_detector.estimate_distance(face_width)
- cv2.putText(frame, f'~{distance:.0f}cm', (x1, y2+40),
- cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 2)
- # Draw mouth landmarks for WIDE_OPEN targets
- if mouth_state == "WIDE_OPEN":
- # Draw mouth contour
- mouth_points = []
- for i in range(48, 68):
- point = landmarks.part(i)
- mouth_points.append((point.x, point.y))
- if i in [48, 54, 62, 66]: # Key points
- cv2.circle(frame, (point.x, point.y), 3, (0, 255, 0), -1)
- # Draw mouth outline
- mouth_points = np.array(mouth_points, dtype=np.int32)
- cv2.polylines(frame, [mouth_points], True, (0, 255, 0), 2)
- # Add detection stats
- total_faces = len(faces)
- wide_open_count = len([m for m in mouths if len(m) > 4]) # mouths from detect_open_mouths only contain WIDE_OPEN
- cv2.putText(frame, f'Faces: {total_faces}', (10, 30),
- cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 255), 2)
- cv2.putText(frame, f'Targets: {wide_open_count}', (10, 60),
- cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 0), 2)
- def main():
- """Main entry point"""
- print("🍪 OCTv2 (Oreo Cookie Thrower v2) Server v2 Starting...")
- print("🤖 Features: ESP32 Control + Automatic Mouth Detection")
- server = OCTv2Server()
- try:
- server.start_server()
- except KeyboardInterrupt:
- print("\n🛑 Shutting down OCTv2 server...")
- server.cleanup()
- if __name__ == "__main__":
- main()
|