@@ -0,0 +1,830 @@
+#!/usr/bin/env python3
+"""
+OCTv2 (Oreo Cookie Thrower v2) - Raspberry Pi Server v2
+
+- ESP32 serial communication for motor control
+- Automatic mouth detection and targeting
+- Stepper motor control for rotation and elevation
+"""
+
+import socket
+import json
+import threading
+import time
+import logging
+import serial
+import cv2
+import numpy as np
+from datetime import datetime
+from typing import Dict, Any, Optional, Tuple, List
+import io
+import os
+import math
+
+# Camera imports
+try:
+    from picamera2 import Picamera2
+    CAMERA_AVAILABLE = True
+except ImportError:
+    print("Camera not available - running in simulation mode")
+    CAMERA_AVAILABLE = False
+
+# Configure logging
+logging.basicConfig(
+    level=logging.INFO,
+    format='%(asctime)s - %(levelname)s - %(message)s'
+)
+logger = logging.getLogger(__name__)
+
+class ESP32Controller:
+    """Handle serial communication with ESP32 for motor control"""
+
+    def __init__(self, port='/dev/ttyUSB0', baudrate=115200):
+        self.port = port
+        self.baudrate = baudrate
+        self.serial_conn = None
+        self.lock = threading.Lock()  # client threads and the auto-mode worker share the port
+        self.connect()
+
+    def connect(self):
+        """Connect to ESP32 via serial"""
+        try:
+            self.serial_conn = serial.Serial(self.port, self.baudrate, timeout=1)
+            time.sleep(2)  # Wait for ESP32 to initialize
+            logger.info(f"Connected to ESP32 on {self.port}")
+            return True
+        except Exception as e:
+            logger.error(f"Failed to connect to ESP32: {e}")
+            return False
+
+    def send_command(self, command: str) -> str:
+        """Send command to ESP32 and get response"""
+        if not self.serial_conn:
+            logger.error("ESP32 not connected")
+            return "ERROR: Not connected"
+
+        try:
+            # One command/response pair at a time - see self.lock above
+            with self.lock:
+                self.serial_conn.write(f"{command}\n".encode())
+                response = self.serial_conn.readline().decode().strip()
+            logger.debug(f"ESP32 Command: {command} -> Response: {response}")
+            return response
+        except Exception as e:
+            logger.error(f"ESP32 communication error: {e}")
+            return "ERROR: Communication failed"
+
+    def home_motors(self) -> bool:
+        """Home both rotation and elevation motors"""
+        response = self.send_command("HOME")
+        return response == "OK"
+
+    def move_to_position(self, rotation_degrees: float, elevation_degrees: float) -> bool:
+        """Move to absolute position (rotation: -90 to +90, elevation: 0 to 60)"""
+        cmd = f"MOVE {rotation_degrees:.1f} {elevation_degrees:.1f}"
+        response = self.send_command(cmd)
+        return response == "OK"
+
+    def move_relative(self, delta_rotation: float, delta_elevation: float) -> bool:
+        """Move relative to current position"""
+        cmd = f"REL {delta_rotation:.1f} {delta_elevation:.1f}"
+        response = self.send_command(cmd)
+        return response == "OK"
+
+    def fire_oreo(self) -> bool:
+        """Trigger the firing mechanism"""
+        response = self.send_command("FIRE")
+        return response == "OK"
+
+    def get_position(self) -> Tuple[float, float]:
+        """Get current position (rotation, elevation)"""
+        response = self.send_command("POS")
+        try:
+            parts = response.split()
+            if len(parts) == 2:
+                return float(parts[0]), float(parts[1])
+        except ValueError:
+            pass
+        return 0.0, 0.0  # Fallback if the reply can't be parsed
+
+class MouthDetector:
+    """Detect open mouths in camera feed for automatic targeting"""
+
+    def __init__(self):
+        # Load OpenCV face cascade classifier
+        self.face_cascade = cv2.CascadeClassifier(cv2.data.haarcascades + 'haarcascade_frontalface_default.xml')
+
+        # For actual open mouth detection, we'll use facial landmarks
+        try:
+            import dlib
+            # Download shape predictor: http://dlib.net/files/shape_predictor_68_face_landmarks.dat.bz2
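+            # (The .dat file ships bz2-compressed; decompress it, e.g. with
+            # `bunzip2`, into the working directory so the path below resolves.)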
+            self.predictor = dlib.shape_predictor("shape_predictor_68_face_landmarks.dat")
+            self.detector = dlib.get_frontal_face_detector()
+            self.use_dlib = True
+            logger.info("Using dlib for precise mouth detection")
+        except ImportError:
+            logger.warning("dlib not available - using basic mouth area detection")
+            self.use_dlib = False
+
+        # Camera parameters for targeting calculations
+        self.camera_width = 640
+        self.camera_height = 480
+        self.center_x = self.camera_width // 2
+        self.center_y = self.camera_height // 2
+
+        # Camera-follows-aim targeting parameters
+        self.target_deadzone_pixels = 30  # Don't adjust if mouth is within this radius of center
+        self.max_adjustment_degrees = 10  # Maximum single adjustment per iteration
+
+        # Distance estimation parameters (based on average human face size)
+        self.average_face_width_cm = 16.0   # Average human face width
+        self.camera_focal_length_mm = 3.04  # Pi Camera focal length
+        self.sensor_width_mm = 3.68         # Pi Camera sensor width
+
+        # Aiming offsets (configurable for mechanical compensation)
+        self.rotation_offset_degrees = 0.0   # Adjust if camera/launcher not perfectly aligned
+        self.elevation_offset_degrees = 0.0  # Adjust for gravity compensation
+        self.distance_elevation_factor = 0.5  # Elevation adjustment based on distance
+
+        logger.info("Mouth detector initialized")
+
+    def detect_open_mouths(self, frame) -> List[Tuple[int, int, int, int, float]]:
+        """
+        Detect open mouths in frame
+        Returns list of (x, y, w, h, confidence) for each detected mouth
+        """
+        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
+        mouths = []
+
+        if self.use_dlib:
+            # Use dlib for precise facial landmark detection
+            faces = self.detector(gray)
+
+            for face in faces:
+                landmarks = self.predictor(gray, face)
+
+                # Get mouth landmarks (points 48-67 in the 68-point model)
+                mouth_points = []
+                for i in range(48, 68):
+                    point = landmarks.part(i)
+                    mouth_points.append((point.x, point.y))
+
+                # Advanced mouth analysis for WIDE-OPEN detection only
+                mouth_state, confidence = self._analyze_mouth_state(landmarks)
+
+                # Only target WIDE_OPEN mouths (not speaking or smiling)
+                if mouth_state == "WIDE_OPEN":
+                    # Calculate bounding box around mouth
+                    mouth_xs = [p[0] for p in mouth_points]
+                    mouth_ys = [p[1] for p in mouth_points]
+                    mx = min(mouth_xs) - 10
+                    my = min(mouth_ys) - 10
+                    mw = max(mouth_xs) - min(mouth_xs) + 20
+                    mh = max(mouth_ys) - min(mouth_ys) + 20
+
+                    mouths.append((mx, my, mw, mh, confidence))
+
+        else:
+            # Fallback to basic face detection + mouth area estimation
+            faces = self.face_cascade.detectMultiScale(gray, 1.3, 5)
+
+            for (fx, fy, fw, fh) in faces:
+                # Estimate mouth area in lower third of face
+                mouth_x = fx + int(fw * 0.25)
+                mouth_y = fy + int(fh * 0.6)
+                mouth_w = int(fw * 0.5)
+                mouth_h = int(fh * 0.3)
+
+                # Use intensity variance as proxy for open mouth
+                mouth_roi = gray[mouth_y:mouth_y+mouth_h, mouth_x:mouth_x+mouth_w]
+                if mouth_roi.size > 0:
+                    variance = np.var(mouth_roi)
+                    # Higher variance can indicate visible teeth/tongue
+                    confidence = min(1.0, variance / 1000.0)  # Empirical scaling - tune for your lighting
+
+                    if confidence > 0.3:  # Minimum confidence threshold
+                        mouths.append((mouth_x, mouth_y, mouth_w, mouth_h, confidence))
+
+        # Sort by confidence (highest first)
+        mouths.sort(key=lambda x: x[4], reverse=True)
+        return mouths
+
+    def _analyze_mouth_state(self, landmarks) -> Tuple[str, float]:
+        """
+        Analyze mouth landmarks to determine state and confidence
+        Returns: (state, confidence) where state is CLOSED, SPEAKING, SMILING, or WIDE_OPEN
+        """
+        # Key mouth landmark points in the 68-point model:
+        # Outer lip: 48-59 (48-54 across the top, 55-59 back along the bottom)
+        # Inner lip: 60-67 (60-64 across the top, 65-67 back along the bottom)
+
+        # Vertical measurements (mouth opening)
+        outer_top = landmarks.part(51)     # Top of upper lip (center)
+        outer_bottom = landmarks.part(57)  # Bottom of lower lip (center)
+        inner_top = landmarks.part(62)     # Top of inner lip
+        inner_bottom = landmarks.part(66)  # Bottom of inner lip
+
+        # Horizontal measurements (mouth width)
+        left_corner = landmarks.part(48)   # Left mouth corner
+        right_corner = landmarks.part(54)  # Right mouth corner
+
+        # Calculate dimensions
+        outer_height = abs(outer_top.y - outer_bottom.y)
+        inner_height = abs(inner_top.y - inner_bottom.y)
+        mouth_width = abs(right_corner.x - left_corner.x)
+
+        # Calculate aspect ratios (opening height relative to mouth width)
+        outer_aspect_ratio = outer_height / mouth_width if mouth_width > 0 else 0
+        inner_aspect_ratio = inner_height / mouth_width if mouth_width > 0 else 0
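+        # This is essentially the classic mouth-aspect-ratio (MAR) heuristic.
+        # Example: a mouth 60 px wide with a 40 px inner opening has an inner
+        # aspect ratio of 40/60 ≈ 0.67, which clears the WIDE_OPEN gate below.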
+
+        # Calculate lip separation (distance between inner and outer lip)
+        lip_thickness_top = abs(outer_top.y - inner_top.y)
+        lip_thickness_bottom = abs(outer_bottom.y - inner_bottom.y)
+        avg_lip_thickness = (lip_thickness_top + lip_thickness_bottom) / 2
+
+        # Determine mouth state based on multiple criteria
+
+        # WIDE_OPEN: Large inner opening + significant lip separation
+        if (inner_aspect_ratio > 0.6 and
+            outer_aspect_ratio > 0.4 and
+            avg_lip_thickness > 8):  # Pixels of lip separation
+            confidence = min(1.0, inner_aspect_ratio * 1.5)
+            return "WIDE_OPEN", confidence
+
+        # SPEAKING: Moderate opening but less lip separation
+        elif (inner_aspect_ratio > 0.3 and
+              outer_aspect_ratio > 0.2 and
+              avg_lip_thickness > 3):
+            confidence = min(1.0, inner_aspect_ratio * 0.8)
+            return "SPEAKING", confidence
+
+        # SMILING: Wide mouth but minimal vertical opening
+        elif (mouth_width > 40 and  # Wider than normal
+              outer_aspect_ratio < 0.25 and
+              inner_aspect_ratio < 0.2):
+            # Check if corners are raised (smile detection)
+            mouth_center_y = (outer_top.y + outer_bottom.y) / 2
+            corner_raise = mouth_center_y - ((left_corner.y + right_corner.y) / 2)
+            if corner_raise > 3:  # Corners raised above center
+                return "SMILING", 0.3
+
+        # Default: CLOSED
+        return "CLOSED", 0.1
+
+    def estimate_distance(self, face_width_pixels: int) -> float:
+        """
+        Estimate distance to face based on face width in pixels
+        Returns distance in centimeters
+        """
+        if face_width_pixels <= 0:
+            return 100.0  # Default fallback distance
+
+        # Distance = (real_face_width * focal_length * image_width) / (face_width_pixels * sensor_width)
+        distance_cm = (
+            self.average_face_width_cm *
+            self.camera_focal_length_mm *
+            self.camera_width
+        ) / (face_width_pixels * self.sensor_width_mm)
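+        # Worked example of the pinhole relation above: a face 160 px wide
+        # gives (16.0 * 3.04 * 640) / (160 * 3.68) ≈ 53 cm.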
+
+        # Clamp to reasonable range (50cm to 500cm)
+        return max(50.0, min(500.0, distance_cm))
+
+    def calculate_centering_adjustment(self, mouth_x: int, mouth_y: int, face_width: int) -> Tuple[float, float, float]:
+        """
+        Calculate motor adjustments to center the mouth in camera view
+        Returns: (rotation_adjustment, elevation_adjustment, estimated_distance)
+        """
+        # Calculate offset from center
+        dx = mouth_x - self.center_x
+        dy = mouth_y - self.center_y
+        distance_from_center = math.sqrt(dx*dx + dy*dy)
+
+        # Estimate distance for context
+        estimated_distance = self.estimate_distance(face_width)
+
+        # If mouth is already centered (within deadzone), no adjustment needed
+        if distance_from_center < self.target_deadzone_pixels:
+            return 0.0, 0.0, estimated_distance
+
+        # Calculate adjustment angles based on pixel offset.
+        # Larger faces (closer) need smaller adjustments, smaller faces (farther) need larger adjustments
+        distance_factor = max(0.5, min(2.0, 100.0 / estimated_distance))  # Scale adjustments by distance
+
+        # Convert pixel offset to approximate angle adjustment.
+        # These values are empirical - tune them for your camera/motor setup
+        pixels_per_degree_rotation = 15 * distance_factor
+        pixels_per_degree_elevation = 12 * distance_factor
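+        # Example: at ~100 cm (distance_factor = 1.0), a mouth 45 px right of
+        # center maps to 45 / 15 = 3.0° of rotation to the right.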
+
+        rotation_adjustment = dx / pixels_per_degree_rotation
+        elevation_adjustment = -dy / pixels_per_degree_elevation  # Negative because Y increases downward
+
+        # Apply configured offsets
+        rotation_adjustment += self.rotation_offset_degrees
+        elevation_adjustment += self.elevation_offset_degrees
+
+        # Add distance-based elevation compensation (closer targets need higher elevation)
+        distance_elevation_compensation = (200.0 - estimated_distance) * self.distance_elevation_factor / 100.0
+        elevation_adjustment += distance_elevation_compensation
+
+        # Clamp to maximum adjustment per iteration
+        rotation_adjustment = max(-self.max_adjustment_degrees,
+                                  min(self.max_adjustment_degrees, rotation_adjustment))
+        elevation_adjustment = max(-self.max_adjustment_degrees,
+                                   min(self.max_adjustment_degrees, elevation_adjustment))
+
+        return rotation_adjustment, elevation_adjustment, estimated_distance
+
+class OCTv2Server:
+    def __init__(self, host='0.0.0.0', port=8080):
+        self.host = host
+        self.port = port
+        self.running = False
+        self.clients = []
+
+        # Hardware components
+        self.esp32 = ESP32Controller()
+        self.mouth_detector = MouthDetector()
+
+        # Hardware state
+        self.current_rotation = 0.0    # -90 to +90 degrees
+        self.current_elevation = 30.0  # 0 to 60 degrees
+        self.is_auto_mode = False
+        self.is_homed = False
+        self.auto_fire_enabled = True
+
+        # Camera state
+        self.camera = None
+        self.streaming_clients = []
+        self.stream_thread = None
+
+        # Auto mode state
+        self.auto_mode_thread = None
+        self.last_target_time = 0
+        self.target_cooldown = 2.0  # Seconds between automatic shots
+
+        self.setup_camera()
+
+    def setup_camera(self):
+        """Initialize camera"""
+        if not CAMERA_AVAILABLE:
+            logger.info("Camera not available - simulating camera")
+            return
+
+        try:
+            self.camera = Picamera2()
+            config = self.camera.create_preview_configuration(
+                main={"size": (640, 480)},
+                lores={"size": (320, 240)},
+                display="lores"
+            )
+            self.camera.configure(config)
+            self.camera.start()
+            logger.info("Camera initialized successfully")
+        except Exception as e:
+            logger.error(f"Camera setup failed: {e}")
+
+    def start_server(self):
+        """Start the TCP server"""
+        self.running = True
+        server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+        server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
+
+        try:
+            server_socket.bind((self.host, self.port))
+            server_socket.listen(5)
+            logger.info(f"OCTv2 Server v2 listening on {self.host}:{self.port}")
+
+            # Start auto mode thread
+            self.auto_mode_thread = threading.Thread(target=self.auto_mode_worker)
+            self.auto_mode_thread.daemon = True
+            self.auto_mode_thread.start()
+
+            while self.running:
+                try:
+                    client_socket, address = server_socket.accept()
+                    logger.info(f"Client connected from {address}")
+
+                    client_thread = threading.Thread(
+                        target=self.handle_client,
+                        args=(client_socket, address)
+                    )
+                    client_thread.daemon = True
+                    client_thread.start()
+
+                except Exception as e:
+                    if self.running:
+                        logger.error(f"Error accepting client: {e}")
+
+        except Exception as e:
+            logger.error(f"Server error: {e}")
+        finally:
+            server_socket.close()
+            self.cleanup()
+
+    def handle_client(self, client_socket, address):
+        """Handle individual client connections"""
+        self.clients.append(client_socket)
+
+        try:
+            while self.running:
+                data = client_socket.recv(1024)
+                if not data:
+                    break
+
+                try:
+                    command = json.loads(data.decode('utf-8'))
+                    logger.info(f"Received command: {command}")
+
+                    response = self.process_command(command, client_socket)
+                    if response:
+                        # sendall: plain send() may deliver only part of the buffer
+                        client_socket.sendall(json.dumps(response).encode('utf-8'))
+
+                except json.JSONDecodeError:
+                    logger.error("Invalid JSON received")
+                except Exception as e:
+                    logger.error(f"Error processing command: {e}")
+
+        except Exception as e:
+            logger.error(f"Client {address} error: {e}")
+        finally:
+            if client_socket in self.clients:
+                self.clients.remove(client_socket)
+            if client_socket in self.streaming_clients:
+                self.streaming_clients.remove(client_socket)
+            client_socket.close()
+            logger.info(f"Client {address} disconnected")
+
+    def process_command(self, command: Dict[str, Any], client_socket) -> Optional[Dict[str, Any]]:
+        """Process commands from iOS app"""
+        action = command.get('action')
+
+        logger.info(f"Processing action: {action}")
+
+        if action == 'aim_left':
+            return self.aim_left()
+        elif action == 'aim_right':
+            return self.aim_right()
+        elif action == 'fire':
+            angle = command.get('angle', self.current_elevation)
+            return self.fire_oreo(angle)
+        elif action == 'home':
+            return self.home_device()
+        elif action == 'set_mode':
+            mode = command.get('mode', 'manual')
+            return self.set_mode(mode)
+        elif action == 'capture_photo':
+            return self.capture_photo(client_socket)
+        elif action == 'start_video_stream':
+            return self.start_video_stream(client_socket)
+        elif action == 'stop_video_stream':
+            return self.stop_video_stream(client_socket)
+        elif action == 'status':
+            return self.get_status()
+        else:
+            return {'error': f'Unknown action: {action}'}
+
+    def aim_left(self) -> Dict[str, Any]:
+        """Move aim left by small increment"""
+        new_rotation = max(-90, self.current_rotation - 5)
+        if self.esp32.move_to_position(new_rotation, self.current_elevation):
+            self.current_rotation = new_rotation
+            return {'status': 'success', 'rotation': self.current_rotation}
+        return {'status': 'error', 'message': 'Failed to move left'}
+
+    def aim_right(self) -> Dict[str, Any]:
+        """Move aim right by small increment"""
+        new_rotation = min(90, self.current_rotation + 5)
+        if self.esp32.move_to_position(new_rotation, self.current_elevation):
+            self.current_rotation = new_rotation
+            return {'status': 'success', 'rotation': self.current_rotation}
+        return {'status': 'error', 'message': 'Failed to move right'}
+
+    def fire_oreo(self, elevation: Optional[float] = None) -> Dict[str, Any]:
+        """Fire an Oreo at the specified elevation"""
+        if elevation is not None:
+            # Move to target elevation first
+            self.current_elevation = max(0, min(60, elevation))
+            if not self.esp32.move_to_position(self.current_rotation, self.current_elevation):
+                return {'status': 'error', 'message': 'Failed to move to position'}
+            time.sleep(0.5)  # Wait for positioning
+
+        logger.info(f"FIRING OREO at rotation={self.current_rotation}°, elevation={self.current_elevation}°!")
+
+        if self.esp32.fire_oreo():
+            return {
+                'status': 'success',
+                'message': f'Oreo fired at R:{self.current_rotation}° E:{self.current_elevation}°',
+                'rotation': self.current_rotation,
+                'elevation': self.current_elevation
+            }
+        else:
+            return {'status': 'error', 'message': 'Fire mechanism failed'}
+
+    def home_device(self) -> Dict[str, Any]:
+        """Home the device to its reference position"""
+        logger.info("Homing device...")
+
+        if self.esp32.home_motors():
+            self.current_rotation = 0.0
+            self.current_elevation = 0.0
+            self.is_homed = True
+            return {
+                'status': 'success',
+                'message': 'Device homed successfully',
+                'rotation': 0,
+                'elevation': 0
+            }
+        else:
+            return {'status': 'error', 'message': 'Homing failed'}
+
+    def set_mode(self, mode: str) -> Dict[str, Any]:
+        """Set operating mode (auto/manual)"""
+        self.is_auto_mode = (mode.lower() == 'auto')
+        logger.info(f"Mode set to: {mode}")
+
+        if self.is_auto_mode:
+            logger.info("🎯 AUTOMATIC MODE ENABLED - Seeking open mouths!")
+        else:
+            logger.info("🎮 Manual mode enabled")
+
+        return {
+            'status': 'success',
+            'mode': mode,
+            'auto_mode': self.is_auto_mode
+        }
+
+    def auto_mode_worker(self):
+        """Worker thread for automatic mouth detection and firing"""
+        while self.running:
+            try:
+                if self.is_auto_mode and CAMERA_AVAILABLE and self.camera:
+                    # Capture frame for analysis
+                    frame = self.camera.capture_array()
+
+                    # Detect open mouths
+                    mouths = self.mouth_detector.detect_open_mouths(frame)
+
+                    if mouths and self.auto_fire_enabled:
+                        current_time = time.time()
+                        if current_time - self.last_target_time > self.target_cooldown:
+                            # Target the most confident mouth
+                            best_mouth = mouths[0]
+                            mx, my, mw, mh, confidence = best_mouth
+
+                            # Calculate mouth center and face width (estimate from mouth width)
+                            mouth_center_x = mx + mw // 2
+                            mouth_center_y = my + mh // 2
+                            estimated_face_width = int(mw * 2.5)  # Face is roughly 2.5x wider than mouth
+
+                            # Calculate centering adjustment
+                            rotation_adj, elevation_adj, distance = self.mouth_detector.calculate_centering_adjustment(
+                                mouth_center_x, mouth_center_y, estimated_face_width
+                            )
+
+                            logger.info(f"🎯 AUTO TARGET: Mouth detected (confidence {confidence:.2f}, distance ~{distance:.0f}cm)")
+
+                            # Only adjust if mouth is not already centered
+                            if abs(rotation_adj) > 0.1 or abs(elevation_adj) > 0.1:
+                                # Calculate new position with adjustments
+                                new_rotation = max(-90, min(90, self.current_rotation + rotation_adj))
+                                new_elevation = max(0, min(60, self.current_elevation + elevation_adj))
+
+                                logger.info(f"🎯 CENTERING: Adjusting R:{rotation_adj:+.1f}° E:{elevation_adj:+.1f}° -> R:{new_rotation:.1f}° E:{new_elevation:.1f}°")
+
+                                # Move to center the target
+                                if self.esp32.move_to_position(new_rotation, new_elevation):
+                                    self.current_rotation = new_rotation
+                                    self.current_elevation = new_elevation
+                                    time.sleep(0.5)  # Wait for positioning
+                            else:
+                                logger.info("🎯 TARGET CENTERED: Mouth already in optimal position")
+
+                            # Check if mouth is now well-centered for firing
+                            # (note: offsets come from the frame captured before any move above)
+                            dx = mouth_center_x - self.mouth_detector.center_x
+                            dy = mouth_center_y - self.mouth_detector.center_y
+                            center_distance = math.sqrt(dx*dx + dy*dy)
+
+                            if center_distance < self.mouth_detector.target_deadzone_pixels * 1.5:  # Allow some tolerance
+                                # Fire!
+                                logger.info(f"🔥 AUTO FIRE: Launching Oreo at centered target! (offset: {center_distance:.0f}px)")
+                                self.esp32.fire_oreo()
+                                self.last_target_time = current_time
+                            else:
+                                logger.info(f"🎯 TARGET NOT CENTERED: Waiting for better positioning (offset: {center_distance:.0f}px)")
+
+                time.sleep(0.1)  # Check at ~10fps
+
+            except Exception as e:
+                logger.error(f"Auto mode error: {e}")
+                time.sleep(1)
+
+    def capture_photo(self, client_socket) -> Dict[str, Any]:
+        """Capture a high-resolution photo"""
+        if not CAMERA_AVAILABLE:
+            logger.info("SIMULATED: Photo captured")
+            return {'status': 'success', 'message': 'Photo captured (simulated)'}
+
+        try:
+            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
+            filename = f"octv2_photo_{timestamp}.jpg"
+
+            self.camera.capture_file(filename)
+
+            # Send photo back to app
+            with open(filename, 'rb') as f:
+                photo_data = f.read()
+                client_socket.sendall(photo_data)  # sendall: send() may write only part of a large buffer
+
+            return {
+                'status': 'success',
+                'filename': filename,
+                'message': 'Photo captured and saved'
+            }
+
+        except Exception as e:
+            logger.error(f"Photo capture failed: {e}")
+            return {'status': 'error', 'message': str(e)}
+
+    def start_video_stream(self, client_socket) -> Dict[str, Any]:
+        """Start video streaming to client"""
+        if client_socket not in self.streaming_clients:
+            self.streaming_clients.append(client_socket)
+
+        if not self.stream_thread or not self.stream_thread.is_alive():
+            self.stream_thread = threading.Thread(target=self.video_stream_worker)
+            self.stream_thread.daemon = True
+            self.stream_thread.start()
+
+        return {'status': 'success', 'message': 'Video stream started'}
+
+    def stop_video_stream(self, client_socket) -> Dict[str, Any]:
+        """Stop video streaming to client"""
+        if client_socket in self.streaming_clients:
+            self.streaming_clients.remove(client_socket)
+
+        return {'status': 'success', 'message': 'Video stream stopped'}
+
+    def video_stream_worker(self):
+        """Worker thread for video streaming with mouth detection overlay"""
+        if not CAMERA_AVAILABLE:
+            logger.info("SIMULATED: Video streaming started")
+            return
+
+        try:
+            while self.streaming_clients:
+                # Capture frame
+                frame = self.camera.capture_array()
+
+                # Add mouth detection overlay in auto mode
+                if self.is_auto_mode:
+                    mouths = self.mouth_detector.detect_open_mouths(frame)
+                    self._add_mouth_detection_overlay(frame, mouths)
+
+                # Draw targeting crosshair and deadzone
+                center_x, center_y = frame.shape[1] // 2, frame.shape[0] // 2
+                deadzone = self.mouth_detector.target_deadzone_pixels
+
+                # Main crosshair
+                cv2.line(frame, (center_x-20, center_y), (center_x+20, center_y), (255, 0, 0), 2)
+                cv2.line(frame, (center_x, center_y-20), (center_x, center_y+20), (255, 0, 0), 2)
+
+                # Deadzone circle
+                cv2.circle(frame, (center_x, center_y), deadzone, (255, 0, 0), 2)
+                cv2.putText(frame, 'TARGET ZONE', (center_x-50, center_y+deadzone+20),
+                            cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 0, 0), 2)
+
+                # Convert to JPEG
+                _, jpeg = cv2.imencode('.jpg', frame)
+                frame_data = jpeg.tobytes()
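+                # Note (assumption about the client): frames go out as raw,
+                # back-to-back JPEGs with no length framing, so the receiving
+                # app presumably splits the stream on the JPEG SOI/EOI markers
+                # (0xFFD8 ... 0xFFD9).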
+
+                # Send to all streaming clients
+                for client in self.streaming_clients[:]:
+                    try:
+                        client.sendall(frame_data)  # sendall: send() may deliver only part of a frame
+                    except Exception as e:
+                        logger.error(f"Failed to send frame to client: {e}")
+                        self.streaming_clients.remove(client)
+
+                time.sleep(0.1)  # ~10 FPS
+
+        except Exception as e:
+            logger.error(f"Video streaming error: {e}")
+
+    def get_status(self) -> Dict[str, Any]:
+        """Return current device status"""
+        # Get actual position from ESP32
+        actual_rotation, actual_elevation = self.esp32.get_position()
+
+        return {
+            'status': 'success',
+            'rotation': actual_rotation,
+            'elevation': actual_elevation,
+            'auto_mode': self.is_auto_mode,
+            'homed': self.is_homed,
+            'streaming_clients': len(self.streaming_clients),
+            'total_clients': len(self.clients),
+            'esp32_connected': self.esp32.serial_conn is not None
+        }
+
+    def cleanup(self):
+        """Clean up resources"""
+        self.running = False
+
+        if self.camera and CAMERA_AVAILABLE:
+            self.camera.stop()
+
+        if self.esp32.serial_conn:
+            self.esp32.serial_conn.close()
+
+        logger.info("Server shutdown complete")
+
+    def _add_mouth_detection_overlay(self, frame, mouths):
+        """Add visual overlay showing mouth detection results"""
+        if not self.mouth_detector.use_dlib:
+            # Simple overlay for basic detection
+            for mx, my, mw, mh, confidence in mouths:
+                color = (0, 255, 0) if confidence > 0.5 else (0, 255, 255)
+                cv2.rectangle(frame, (mx, my), (mx + mw, my + mh), color, 2)
+                cv2.putText(frame, f'{confidence:.2f}', (mx, my-10),
+                            cv2.FONT_HERSHEY_SIMPLEX, 0.6, color, 2)
+            return
+
+        # Advanced overlay showing all mouth states
+        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
+        faces = self.mouth_detector.detector(gray)
+
+        for face in faces:
+            landmarks = self.mouth_detector.predictor(gray, face)
+            mouth_state, confidence = self.mouth_detector._analyze_mouth_state(landmarks)
+
+            # Get face bounding box
+            x1, y1, x2, y2 = face.left(), face.top(), face.right(), face.bottom()
+
+            # Color coding for different states
+            state_colors = {
+                "WIDE_OPEN": (0, 255, 0),    # Bright green - TARGET!
+                "SPEAKING": (0, 165, 255),   # Orange
+                "SMILING": (255, 255, 0),    # Cyan
+                "CLOSED": (128, 128, 128)    # Gray
+            }
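+            # (OpenCV's channel order is BGR, so (255, 255, 0) renders as cyan
+            # and (0, 165, 255) as orange, despite how the tuples read -
+            # assuming the frame really is BGR.)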
+
+            color = state_colors.get(mouth_state, (128, 128, 128))
+
+            # Draw face rectangle
+            if mouth_state == "WIDE_OPEN":
+                # Thick border for targets
+                cv2.rectangle(frame, (x1, y1), (x2, y2), color, 4)
+                # Add target indicator (plain text - cv2.putText cannot render emoji)
+                cv2.putText(frame, "TARGET!", (x1, y1-40),
+                            cv2.FONT_HERSHEY_SIMPLEX, 0.8, color, 2)
+            else:
+                # Thin border for non-targets
+                cv2.rectangle(frame, (x1, y1), (x2, y2), color, 2)
+
+            # Show mouth state, confidence, and distance
+            cv2.putText(frame, f'{mouth_state}', (x1, y1-20),
+                        cv2.FONT_HERSHEY_SIMPLEX, 0.6, color, 2)
+            cv2.putText(frame, f'{confidence:.2f}', (x1, y2+20),
+                        cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 2)
+
+            # Add distance estimation for WIDE_OPEN mouths
+            if mouth_state == "WIDE_OPEN":
+                face_width = x2 - x1
+                distance = self.mouth_detector.estimate_distance(face_width)
+                cv2.putText(frame, f'~{distance:.0f}cm', (x1, y2+40),
+                            cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 2)
+
+            # Draw mouth landmarks for WIDE_OPEN targets
+            if mouth_state == "WIDE_OPEN":
+                # Draw mouth contour
+                mouth_points = []
+                for i in range(48, 68):
+                    point = landmarks.part(i)
+                    mouth_points.append((point.x, point.y))
+                    if i in [48, 54, 62, 66]:  # Key points
+                        cv2.circle(frame, (point.x, point.y), 3, (0, 255, 0), -1)
+
+                # Draw mouth outline
+                mouth_points = np.array(mouth_points, dtype=np.int32)
+                cv2.polylines(frame, [mouth_points], True, (0, 255, 0), 2)
+
+        # Add detection stats
+        total_faces = len(faces)
+        wide_open_count = len(mouths)  # detect_open_mouths() returns WIDE_OPEN mouths only
+
+        cv2.putText(frame, f'Faces: {total_faces}', (10, 30),
+                    cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 255), 2)
+        cv2.putText(frame, f'Targets: {wide_open_count}', (10, 60),
+                    cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 0), 2)
+
+def main():
+    """Main entry point"""
+    print("🍪 OCTv2 (Oreo Cookie Thrower v2) Server v2 Starting...")
+    print("🤖 Features: ESP32 Control + Automatic Mouth Detection")
+
+    server = OCTv2Server()
+
+    try:
+        server.start_server()
+    except KeyboardInterrupt:
+        print("\n🛑 Shutting down OCTv2 server...")
+        server.cleanup()
+
+if __name__ == "__main__":
+    main()