@@ -1,7 +1,9 @@
 #!/usr/bin/env python3
 """
-OCTv2 (Oreo Cookie Thrower v2) - Raspberry Pi Server
-Handles commands from the iOS app to control hardware and camera
+OCTv2 (Oreo Cookie Thrower v2) - Raspberry Pi Server v2
+- ESP32 serial communication for motor control
+- Automatic mouth detection and targeting
+- Stepper motor control for rotation and elevation
 """

 import socket
@@ -9,21 +11,22 @@ import json
 import threading
 import time
 import logging
+import serial
+import cv2
+import numpy as np
 from datetime import datetime
-from typing import Dict, Any, Optional
+from typing import Dict, Any, Optional, Tuple, List
 import io
 import os
+import math

-# Camera imports (uncomment when on Pi)
+# Camera imports
 try:
     from picamera2 import Picamera2
-    import RPi.GPIO as GPIO
     CAMERA_AVAILABLE = True
-    GPIO_AVAILABLE = True
 except ImportError:
-    print("Camera/GPIO not available - running in simulation mode")
+    print("Camera not available - running in simulation mode")
     CAMERA_AVAILABLE = False
-    GPIO_AVAILABLE = False

 # Configure logging
 logging.basicConfig(
@@ -32,6 +35,301 @@ logging.basicConfig(
 )
 logger = logging.getLogger(__name__)

+class ESP32Controller:
+    """Handle serial communication with ESP32 for motor control"""
+
+    def __init__(self, port='/dev/ttyUSB0', baudrate=115200):
+        self.port = port
+        self.baudrate = baudrate
+        self.serial_conn = None
+        self.connect()
+
+    def connect(self):
+        """Connect to ESP32 via serial"""
+        try:
+            self.serial_conn = serial.Serial(self.port, self.baudrate, timeout=1)
+            time.sleep(2) # Wait for ESP32 to initialize
+            logger.info(f"Connected to ESP32 on {self.port}")
+            return True
+        except Exception as e:
+            logger.error(f"Failed to connect to ESP32: {e}")
+            return False
+
+    def send_command(self, command: str) -> str:
+        """Send command to ESP32 and get response"""
+        if not self.serial_conn:
+            logger.error("ESP32 not connected")
+            return "ERROR: Not connected"
+
+        try:
+            self.serial_conn.write(f"{command}\n".encode())
+            response = self.serial_conn.readline().decode().strip()
+            logger.debug(f"ESP32 Command: {command} -> Response: {response}")
+            return response
+        except Exception as e:
+            logger.error(f"ESP32 communication error: {e}")
+            return "ERROR: Communication failed"
+
+    def home_motors(self) -> bool:
+        """Home both rotation and elevation motors"""
+        response = self.send_command("HOME")
+        return response == "OK"
+
+    def move_to_position(self, rotation_degrees: float, elevation_degrees: float) -> bool:
+        """Move to absolute position (rotation: -90 to +90, elevation: 0 to 60)"""
+        cmd = f"MOVE {rotation_degrees:.1f} {elevation_degrees:.1f}"
+        response = self.send_command(cmd)
+        return response == "OK"
+
+    def move_relative(self, delta_rotation: float, delta_elevation: float) -> bool:
+        """Move relative to current position"""
+        cmd = f"REL {delta_rotation:.1f} {delta_elevation:.1f}"
+        response = self.send_command(cmd)
+        return response == "OK"
+
+    def fire_oreo(self) -> bool:
+        """Trigger the firing mechanism"""
+        response = self.send_command("FIRE")
+        return response == "OK"
+
+    def get_position(self) -> Tuple[float, float]:
+        """Get current position (rotation, elevation)"""
+        response = self.send_command("POS")
+        try:
+            parts = response.split()
+            if len(parts) == 2:
+                return float(parts[0]), float(parts[1])
+        except ValueError:
+            pass
+        return 0.0, 0.0
+
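+# Usage sketch. The ESP32 side is assumed from the commands above: requests are
+# newline-terminated ASCII, replies are a single line of "OK", "ERROR ...", or,
+# for POS, "<rotation> <elevation>".
+#
+#   esp32 = ESP32Controller(port='/dev/ttyUSB0')  # port varies by adapter
+#   esp32.home_motors()                 # "HOME"           -> "OK"
+#   esp32.move_to_position(15.0, 30.0)  # "MOVE 15.0 30.0" -> "OK"
+#   esp32.get_position()                # "POS"            -> e.g. "15.0 30.0"
+#   esp32.fire_oreo()                   # "FIRE"           -> "OK"
+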
+class MouthDetector:
+    """Detect open mouths in camera feed for automatic targeting"""
+
+    def __init__(self):
+        # Load OpenCV face cascade classifier
+        self.face_cascade = cv2.CascadeClassifier(cv2.data.haarcascades + 'haarcascade_frontalface_default.xml')
+
+        # For actual open mouth detection, we'll use facial landmarks
+        try:
+            import dlib
+            # Download shape predictor: http://dlib.net/files/shape_predictor_68_face_landmarks.dat.bz2
+            self.predictor = dlib.shape_predictor("shape_predictor_68_face_landmarks.dat")
+            self.detector = dlib.get_frontal_face_detector()
+            self.use_dlib = True
+            logger.info("Using dlib for precise mouth detection")
+        except ImportError:
+            logger.warning("dlib not available - using basic mouth area detection")
+            self.use_dlib = False
+
+        # Camera parameters for targeting calculations
+        self.camera_width = 640
+        self.camera_height = 480
+        self.center_x = self.camera_width // 2
+        self.center_y = self.camera_height // 2
+
+        # Camera-follows-aim targeting parameters
+        self.target_deadzone_pixels = 30 # Don't adjust if mouth is within this radius of center
+        self.max_adjustment_degrees = 10 # Maximum single adjustment per iteration
+
+        # Distance estimation parameters (based on average human face size)
+        self.average_face_width_cm = 16.0 # Average human face width
+        self.camera_focal_length_mm = 3.04 # Pi Camera focal length
+        self.sensor_width_mm = 3.68 # Pi Camera sensor width
+
+        # Aiming offsets (configurable for mechanical compensation)
+        self.rotation_offset_degrees = 0.0 # Adjust if camera/launcher not perfectly aligned
+        self.elevation_offset_degrees = 0.0 # Adjust for gravity compensation
+        self.distance_elevation_factor = 0.5 # Elevation adjustment based on distance
+
+        logger.info("Mouth detector initialized")
+
+    def detect_open_mouths(self, frame) -> List[Tuple[int, int, int, int, float]]:
+        """
+        Detect open mouths in frame
+        Returns list of (x, y, w, h, confidence) for each detected mouth
+        """
+        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
+        mouths = []
+
+        if self.use_dlib:
+            # Use dlib for precise facial landmark detection
+            faces = self.detector(gray)
+
+            for face in faces:
+                landmarks = self.predictor(gray, face)
+
+                # Get mouth landmarks (points 48-67 in the 68-point model)
+                mouth_points = []
+                for i in range(48, 68):
+                    point = landmarks.part(i)
+                    mouth_points.append((point.x, point.y))
+
+                # Advanced mouth analysis for WIDE_OPEN detection only
+                mouth_state, confidence = self._analyze_mouth_state(landmarks)
+
+                # Only target WIDE_OPEN mouths (not speaking or smiling)
+                if mouth_state == "WIDE_OPEN":
+                    # Calculate bounding box around mouth
+                    mouth_xs = [p[0] for p in mouth_points]
+                    mouth_ys = [p[1] for p in mouth_points]
+                    mx = min(mouth_xs) - 10
+                    my = min(mouth_ys) - 10
+                    mw = max(mouth_xs) - min(mouth_xs) + 20
+                    mh = max(mouth_ys) - min(mouth_ys) + 20
+
+                    mouths.append((mx, my, mw, mh, confidence))
+
+        else:
+            # Fallback to basic face detection + mouth area estimation
+            faces = self.face_cascade.detectMultiScale(gray, 1.3, 5)
+
+            for (fx, fy, fw, fh) in faces:
+                # Estimate mouth region in the lower part of the face (60-90% of face height)
+                mouth_x = fx + int(fw * 0.25)
+                mouth_y = fy + int(fh * 0.6)
+                mouth_w = int(fw * 0.5)
+                mouth_h = int(fh * 0.3)
+
+                # Use intensity variance as a proxy for an open mouth
+                mouth_roi = gray[mouth_y:mouth_y+mouth_h, mouth_x:mouth_x+mouth_w]
+                if mouth_roi.size > 0:
+                    variance = np.var(mouth_roi)
+                    # Higher variance might indicate teeth/tongue visibility
+                    confidence = min(1.0, variance / 1000.0) # Adjust threshold
+
+                    if confidence > 0.3: # Minimum confidence threshold
+                        mouths.append((mouth_x, mouth_y, mouth_w, mouth_h, confidence))
+
+        # Sort by confidence (highest first)
+        mouths.sort(key=lambda x: x[4], reverse=True)
+        return mouths
+
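+    # Example of consuming the result (illustrative; 'detector' stands for a
+    # MouthDetector instance and 'frame' for any BGR camera frame):
+    #   mouths = detector.detect_open_mouths(frame)
+    #   if mouths:
+    #       mx, my, mw, mh, conf = mouths[0]  # best (highest-confidence) mouth
+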
+    def _analyze_mouth_state(self, landmarks) -> Tuple[str, float]:
+        """
+        Analyze mouth landmarks to determine state and confidence
+        Returns: (state, confidence) where state is CLOSED, SPEAKING, SMILING, or WIDE_OPEN
+        """
+        # Key mouth landmark points in the 68-point model:
+        # Outer lip: points 48-59 (corners at 48 and 54)
+        # Inner lip: points 60-67
+
+        # Vertical measurements (mouth opening)
+        outer_top = landmarks.part(51) # Top of upper lip (center)
+        outer_bottom = landmarks.part(57) # Bottom of lower lip (center)
+        inner_top = landmarks.part(62) # Top of inner lip
+        inner_bottom = landmarks.part(66) # Bottom of inner lip
+
+        # Horizontal measurements (mouth width)
+        left_corner = landmarks.part(48) # Left mouth corner
+        right_corner = landmarks.part(54) # Right mouth corner
+
+        # Calculate dimensions
+        outer_height = abs(outer_top.y - outer_bottom.y)
+        inner_height = abs(inner_top.y - inner_bottom.y)
+        mouth_width = abs(right_corner.x - left_corner.x)
+
+        # Calculate ratios
+        outer_aspect_ratio = outer_height / mouth_width if mouth_width > 0 else 0
+        inner_aspect_ratio = inner_height / mouth_width if mouth_width > 0 else 0
+
+        # Lip thickness (vertical distance between outer and inner lip edges)
+        lip_thickness_top = abs(outer_top.y - inner_top.y)
+        lip_thickness_bottom = abs(outer_bottom.y - inner_bottom.y)
+        avg_lip_thickness = (lip_thickness_top + lip_thickness_bottom) / 2
+
+        # Determine mouth state based on multiple criteria
+
+        # WIDE_OPEN: Large inner opening + significant lip separation
+        if (inner_aspect_ratio > 0.6 and
+            outer_aspect_ratio > 0.4 and
+            avg_lip_thickness > 8): # Pixels of lip separation
+            confidence = min(1.0, inner_aspect_ratio * 1.5)
+            return "WIDE_OPEN", confidence
+
+        # SPEAKING: Moderate opening but less lip separation
+        elif (inner_aspect_ratio > 0.3 and
+              outer_aspect_ratio > 0.2 and
+              avg_lip_thickness > 3):
+            confidence = min(1.0, inner_aspect_ratio * 0.8)
+            return "SPEAKING", confidence
+
+        # SMILING: Wide mouth but minimal vertical opening
+        elif (mouth_width > 40 and # Wider than normal
+              outer_aspect_ratio < 0.25 and
+              inner_aspect_ratio < 0.2):
+            # Check if corners are raised (smile detection)
+            mouth_center_y = (outer_top.y + outer_bottom.y) / 2
+            corner_raise = mouth_center_y - ((left_corner.y + right_corner.y) / 2)
+            if corner_raise > 3: # Corners raised above mouth center
+                return "SMILING", 0.3
+
+        # Default: CLOSED
+        return "CLOSED", 0.1
+
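+    # Worked example of the thresholds above (illustrative numbers): with the
+    # mouth corners 60 px apart, a 40 px inner opening gives inner_aspect_ratio
+    # = 40/60 ~ 0.67 > 0.6, so with outer_aspect_ratio > 0.4 and more than 8 px
+    # of lip thickness this is WIDE_OPEN at confidence min(1.0, 0.67 * 1.5) = 1.0;
+    # a 20 px opening (ratio ~ 0.33) would classify as SPEAKING instead.
+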
+    def estimate_distance(self, face_width_pixels: int) -> float:
+        """
+        Estimate distance to face based on face width in pixels
+        Returns distance in centimeters
+        """
+        if face_width_pixels <= 0:
+            return 100.0 # Default fallback distance
+
+        # Distance = (real_face_width * focal_length * image_width) / (face_width_pixels * sensor_width)
+        distance_cm = (
+            self.average_face_width_cm *
+            self.camera_focal_length_mm *
+            self.camera_width
+        ) / (face_width_pixels * self.sensor_width_mm)
+
+        # Clamp to reasonable range (50cm to 500cm)
+        return max(50.0, min(500.0, distance_cm))
+
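+    # Worked example with the defaults above: a face 150 px wide gives
+    # (16.0 * 3.04 * 640) / (150 * 3.68) ~ 56 cm, and a 50 px face gives
+    # ~ 169 cm; both land inside the 50-500 cm clamp.
+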
+    def calculate_centering_adjustment(self, mouth_x: int, mouth_y: int, face_width: int) -> Tuple[float, float, float]:
+        """
+        Calculate motor adjustments to center the mouth in camera view
+        Returns: (rotation_adjustment, elevation_adjustment, estimated_distance)
+        """
+        # Calculate offset from center
+        dx = mouth_x - self.center_x
+        dy = mouth_y - self.center_y
+        distance_from_center = math.sqrt(dx*dx + dy*dy)
+
+        # Estimate distance for context
+        estimated_distance = self.estimate_distance(face_width)
+
+        # If mouth is already centered (within deadzone), no adjustment needed
+        if distance_from_center < self.target_deadzone_pixels:
+            return 0.0, 0.0, estimated_distance
+
+        # Calculate adjustment angles based on pixel offset
+        # Larger faces (closer) need smaller adjustments, smaller faces (farther) need larger adjustments
+        distance_factor = max(0.5, min(2.0, 100.0 / estimated_distance)) # Scale adjustments by distance
+
+        # Convert pixel offset to approximate angle adjustment
+        # This is empirical - you'll need to tune these values for your setup
+        pixels_per_degree_rotation = 15 * distance_factor # Adjust based on your camera/motor setup
+        pixels_per_degree_elevation = 12 * distance_factor # Adjust based on your camera/motor setup
+
+        rotation_adjustment = dx / pixels_per_degree_rotation
+        elevation_adjustment = -dy / pixels_per_degree_elevation # Negative because Y increases downward
+
+        # Apply configured offsets
+        rotation_adjustment += self.rotation_offset_degrees
+        elevation_adjustment += self.elevation_offset_degrees
+
+        # Add distance-based elevation compensation (closer targets need higher elevation)
+        distance_elevation_compensation = (200.0 - estimated_distance) * self.distance_elevation_factor / 100.0
+        elevation_adjustment += distance_elevation_compensation
+
+        # Clamp to maximum adjustment per iteration
+        rotation_adjustment = max(-self.max_adjustment_degrees,
+                                  min(self.max_adjustment_degrees, rotation_adjustment))
+        elevation_adjustment = max(-self.max_adjustment_degrees,
+                                   min(self.max_adjustment_degrees, elevation_adjustment))
+
+        return rotation_adjustment, elevation_adjustment, estimated_distance
+
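+    # Worked example (illustrative): a mouth at (400, 200) in a 640x480 frame
+    # sits dx=+80, dy=-40 from the (320, 240) center, ~89 px off and outside
+    # the 30 px deadzone. At an estimated 100 cm (distance_factor = 1.0) that
+    # yields rotation 80/15 ~ +5.3 deg and elevation 40/12 + (200-100)*0.5/100
+    # ~ +3.8 deg, both inside the +/-10 deg per-iteration clamp.
+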
 class OCTv2Server:
     def __init__(self, host='0.0.0.0', port=8080):
         self.host = host
@@ -39,48 +337,29 @@ class OCTv2Server:
         self.running = False
         self.clients = []

+        # Hardware components
+        self.esp32 = ESP32Controller()
+        self.mouth_detector = MouthDetector()
+
         # Hardware state
-        self.current_angle = 30.0
-        self.is_auto_mode = True
+        self.current_rotation = 0.0 # -90 to +90 degrees
+        self.current_elevation = 30.0 # 0 to 60 degrees
+        self.is_auto_mode = False
         self.is_homed = False
+        self.auto_fire_enabled = True

         # Camera state
         self.camera = None
         self.streaming_clients = []
         self.stream_thread = None

-        # GPIO pins (adjust for your hardware)
-        self.SERVO_PIN = 18
-        self.STEPPER_PINS = [19, 20, 21, 22] # Example stepper motor pins
-        self.FIRE_PIN = 23
+        # Auto mode state
+        self.auto_mode_thread = None
+        self.last_target_time = 0
+        self.target_cooldown = 2.0 # Seconds between automatic shots

-        self.setup_hardware()
         self.setup_camera()

-    def setup_hardware(self):
-        """Initialize GPIO and hardware components"""
-        if not GPIO_AVAILABLE:
-            logger.info("GPIO not available - simulating hardware")
-            return
-
-        try:
-            GPIO.setmode(GPIO.BCM)
-            GPIO.setup(self.SERVO_PIN, GPIO.OUT)
-            GPIO.setup(self.FIRE_PIN, GPIO.OUT)
-
-            # Setup stepper motor pins
-            for pin in self.STEPPER_PINS:
-                GPIO.setup(pin, GPIO.OUT)
-                GPIO.output(pin, False)
-
-            # Initialize servo
-            self.servo = GPIO.PWM(self.SERVO_PIN, 50) # 50Hz
-            self.servo.start(0)
-
-            logger.info("Hardware initialized successfully")
-        except Exception as e:
-            logger.error(f"Hardware setup failed: {e}")
-
     def setup_camera(self):
         """Initialize camera"""
         if not CAMERA_AVAILABLE:
@@ -89,7 +368,6 @@ class OCTv2Server:

         try:
             self.camera = Picamera2()
-            # Configure camera for streaming and photos
             config = self.camera.create_preview_configuration(
                 main={"size": (640, 480)},
                 lores={"size": (320, 240)},
@@ -110,7 +388,12 @@ class OCTv2Server:
         try:
             server_socket.bind((self.host, self.port))
             server_socket.listen(5)
-            logger.info(f"OCTv2 Server listening on {self.host}:{self.port}")
+            logger.info(f"OCTv2 Server v2 listening on {self.host}:{self.port}")
+
+            # Start auto mode thread
+            self.auto_mode_thread = threading.Thread(target=self.auto_mode_worker)
+            self.auto_mode_thread.daemon = True
+            self.auto_mode_thread.start()

             while self.running:
                 try:
@@ -170,7 +453,6 @@ class OCTv2Server:
     def process_command(self, command: Dict[str, Any], client_socket) -> Optional[Dict[str, Any]]:
         """Process commands from iOS app"""
         action = command.get('action')
-        timestamp = command.get('timestamp')

         logger.info(f"Processing action: {action}")

@@ -179,12 +461,12 @@ class OCTv2Server:
         elif action == 'aim_right':
             return self.aim_right()
         elif action == 'fire':
-            angle = command.get('angle', self.current_angle)
+            angle = command.get('angle', self.current_elevation)
             return self.fire_oreo(angle)
         elif action == 'home':
             return self.home_device()
         elif action == 'set_mode':
-            mode = command.get('mode', 'auto')
+            mode = command.get('mode', 'manual')
             return self.set_mode(mode)
         elif action == 'capture_photo':
             return self.capture_photo(client_socket)
@@ -198,81 +480,140 @@ class OCTv2Server:
         return {'error': f'Unknown action: {action}'}
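+
+    # Example round-trip (assumed JSON-over-TCP framing; the decoding side
+    # lives in handle_client, which this diff does not touch):
+    #   request:  {"action": "fire", "angle": 42.0}
+    #   response: {"status": "success", "rotation": 0.0, "elevation": 42.0, ...}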

     def aim_left(self) -> Dict[str, Any]:
-        """Move aim left by a small increment"""
-        if self.current_angle > 0:
-            self.current_angle = max(0, self.current_angle - 5)
-            self.move_to_angle(self.current_angle)
-            return {'status': 'success', 'angle': self.current_angle}
-        return {'status': 'error', 'message': 'Already at minimum angle'}
+        """Move aim left by a small increment"""
+        new_rotation = max(-90, self.current_rotation - 5)
+        if self.esp32.move_to_position(new_rotation, self.current_elevation):
+            self.current_rotation = new_rotation
+            return {'status': 'success', 'rotation': self.current_rotation}
+        return {'status': 'error', 'message': 'Failed to move left'}

     def aim_right(self) -> Dict[str, Any]:
-        """Move aim right by a small increment"""
-        if self.current_angle < 60:
-            self.current_angle = min(60, self.current_angle + 5)
-            self.move_to_angle(self.current_angle)
-            return {'status': 'success', 'angle': self.current_angle}
-        return {'status': 'error', 'message': 'Already at maximum angle'}
-
-    def fire_oreo(self, angle: float) -> Dict[str, Any]:
-        """Fire an Oreo at the specified angle"""
-        logger.info(f"FIRING OREO at {angle} degrees!")
-
-        # Move to target angle first
-        self.current_angle = angle
-        self.move_to_angle(angle)
-        time.sleep(0.5) # Wait for positioning
-
-        # Fire mechanism
-        if GPIO_AVAILABLE:
-            GPIO.output(self.FIRE_PIN, True)
-            time.sleep(0.1) # Fire pulse
-            GPIO.output(self.FIRE_PIN, False)
-        else:
-            logger.info("SIMULATED: Fire mechanism activated!")
-
-        return {
-            'status': 'success',
-            'message': f'Oreo fired at {angle}°',
-            'angle': angle
-        }
-
-    def move_to_angle(self, angle: float):
-        """Move servo to specified angle (0-60 degrees)"""
-        if GPIO_AVAILABLE:
-            # Convert angle to servo duty cycle
-            duty = 2 + (angle / 60) * 10 # 2-12% duty cycle for 0-60°
-            self.servo.ChangeDutyCycle(duty)
-            time.sleep(0.1)
-            self.servo.ChangeDutyCycle(0) # Stop sending signal
+        """Move aim right by a small increment"""
+        new_rotation = min(90, self.current_rotation + 5)
+        if self.esp32.move_to_position(new_rotation, self.current_elevation):
+            self.current_rotation = new_rotation
+            return {'status': 'success', 'rotation': self.current_rotation}
+        return {'status': 'error', 'message': 'Failed to move right'}
+
+    def fire_oreo(self, elevation: Optional[float] = None) -> Dict[str, Any]:
+        """Fire an Oreo at the specified elevation"""
+        if elevation is not None:
+            # Move to target elevation first
+            self.current_elevation = max(0, min(60, elevation))
+            if not self.esp32.move_to_position(self.current_rotation, self.current_elevation):
+                return {'status': 'error', 'message': 'Failed to move to position'}
+            time.sleep(0.5) # Wait for positioning
+
+        logger.info(f"FIRING OREO at rotation={self.current_rotation}°, elevation={self.current_elevation}°!")
+
+        if self.esp32.fire_oreo():
+            return {
+                'status': 'success',
+                'message': f'Oreo fired at R:{self.current_rotation}° E:{self.current_elevation}°',
+                'rotation': self.current_rotation,
+                'elevation': self.current_elevation
+            }
         else:
-            logger.info(f"SIMULATED: Moving to {angle} degrees")
+            return {'status': 'error', 'message': 'Fire mechanism failed'}

     def home_device(self) -> Dict[str, Any]:
         """Home the device to its reference position"""
         logger.info("Homing device...")

-        # Move to home position (usually 0 degrees)
-        self.current_angle = 0
-        self.move_to_angle(0)
-        self.is_homed = True
-
-        return {
-            'status': 'success',
-            'message': 'Device homed successfully',
-            'angle': 0
-        }
+        if self.esp32.home_motors():
+            self.current_rotation = 0.0
+            self.current_elevation = 0.0
+            self.is_homed = True
+            return {
+                'status': 'success',
+                'message': 'Device homed successfully',
+                'rotation': 0,
+                'elevation': 0
+            }
+        else:
+            return {'status': 'error', 'message': 'Homing failed'}

     def set_mode(self, mode: str) -> Dict[str, Any]:
         """Set operating mode (auto/manual)"""
         self.is_auto_mode = (mode.lower() == 'auto')
         logger.info(f"Mode set to: {mode}")

+        if self.is_auto_mode:
+            logger.info("🎯 AUTOMATIC MODE ENABLED - Seeking open mouths!")
+        else:
+            logger.info("🎮 Manual mode enabled")
+
         return {
             'status': 'success',
             'mode': mode,
             'auto_mode': self.is_auto_mode
         }

+    def auto_mode_worker(self):
+        """Worker thread for automatic mouth detection and firing"""
+        while self.running:
+            try:
+                if self.is_auto_mode and CAMERA_AVAILABLE and self.camera:
+                    # Capture frame for analysis
+                    frame = self.camera.capture_array()
+
+                    # Detect open mouths
+                    mouths = self.mouth_detector.detect_open_mouths(frame)
+
+                    if mouths and self.auto_fire_enabled:
+                        current_time = time.time()
+                        if current_time - self.last_target_time > self.target_cooldown:
+                            # Target the most confident mouth
+                            best_mouth = mouths[0]
+                            mx, my, mw, mh, confidence = best_mouth
+
+                            # Calculate mouth center and face width (estimated from mouth width)
+                            mouth_center_x = mx + mw // 2
+                            mouth_center_y = my + mh // 2
+                            estimated_face_width = int(mw * 2.5) # Face is roughly 2.5x wider than mouth
+
+                            # Calculate centering adjustment
+                            rotation_adj, elevation_adj, distance = self.mouth_detector.calculate_centering_adjustment(
+                                mouth_center_x, mouth_center_y, estimated_face_width
+                            )
+
+                            logger.info(f"🎯 AUTO TARGET: Mouth detected (confidence {confidence:.2f}, distance ~{distance:.0f}cm)")
+
+                            # Only adjust if mouth is not already centered
+                            if abs(rotation_adj) > 0.1 or abs(elevation_adj) > 0.1:
+                                # Calculate new position with adjustments
+                                new_rotation = max(-90, min(90, self.current_rotation + rotation_adj))
+                                new_elevation = max(0, min(60, self.current_elevation + elevation_adj))
+
+                                logger.info(f"🎯 CENTERING: Adjusting R:{rotation_adj:+.1f}° E:{elevation_adj:+.1f}° -> R:{new_rotation:.1f}° E:{new_elevation:.1f}°")
+
+                                # Move to center the target
+                                if self.esp32.move_to_position(new_rotation, new_elevation):
+                                    self.current_rotation = new_rotation
+                                    self.current_elevation = new_elevation
+                                    time.sleep(0.5) # Wait for positioning
+                            else:
+                                logger.info("🎯 TARGET CENTERED: Mouth already in optimal position")
+
+                            # Fire only if the last (pre-move) detection was close enough to center
+                            dx = mouth_center_x - self.mouth_detector.center_x
+                            dy = mouth_center_y - self.mouth_detector.center_y
+                            center_distance = math.sqrt(dx*dx + dy*dy)
+
+                            if center_distance < self.mouth_detector.target_deadzone_pixels * 1.5: # Allow some tolerance
+                                # Fire!
+                                logger.info(f"🔥 AUTO FIRE: Launching Oreo at centered target! (offset: {center_distance:.0f}px)")
+                                self.esp32.fire_oreo()
+                                self.last_target_time = current_time
+                            else:
+                                logger.info(f"🎯 TARGET NOT CENTERED: Waiting for better positioning (offset: {center_distance:.0f}px)")
+
+                time.sleep(0.1) # Check at ~10fps
+
+            except Exception as e:
+                logger.error(f"Auto mode error: {e}")
+                time.sleep(1)
+
     def capture_photo(self, client_socket) -> Dict[str, Any]:
         """Capture a high-resolution photo"""
         if not CAMERA_AVAILABLE:
@@ -280,14 +621,12 @@ class OCTv2Server:
             return {'status': 'success', 'message': 'Photo captured (simulated)'}

         try:
-            # Capture high-res photo
             timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
             filename = f"octv2_photo_{timestamp}.jpg"

-            # Save to Pi storage
             self.camera.capture_file(filename)

-            # Optionally send photo back to app
+            # Send photo back to app
             with open(filename, 'rb') as f:
                 photo_data = f.read()
             client_socket.send(photo_data)
@@ -322,7 +661,7 @@ class OCTv2Server:
         return {'status': 'success', 'message': 'Video stream stopped'}

     def video_stream_worker(self):
-        """Worker thread for video streaming"""
+        """Worker thread for video streaming with mouth detection overlay"""
        if not CAMERA_AVAILABLE:
             logger.info("SIMULATED: Video streaming started")
             return
@@ -330,12 +669,32 @@ class OCTv2Server:
         try:
             while self.streaming_clients:
                 # Capture frame
-                stream = io.BytesIO()
-                self.camera.capture_file(stream, format='jpeg')
-                frame_data = stream.getvalue()
+                frame = self.camera.capture_array()
+
+                # Add mouth detection overlay in auto mode
+                if self.is_auto_mode:
+                    mouths = self.mouth_detector.detect_open_mouths(frame)
+                    self._add_mouth_detection_overlay(frame, mouths)
+
+                    # Draw targeting crosshair and deadzone
+                    center_x, center_y = frame.shape[1] // 2, frame.shape[0] // 2
+                    deadzone = self.mouth_detector.target_deadzone_pixels
+
+                    # Main crosshair
+                    cv2.line(frame, (center_x-20, center_y), (center_x+20, center_y), (255, 0, 0), 2)
+                    cv2.line(frame, (center_x, center_y-20), (center_x, center_y+20), (255, 0, 0), 2)
+
+                    # Deadzone circle
+                    cv2.circle(frame, (center_x, center_y), deadzone, (255, 0, 0), 2)
+                    cv2.putText(frame, 'TARGET ZONE', (center_x-50, center_y+deadzone+20),
+                                cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 0, 0), 2)
+
+                # Convert to JPEG
+                _, jpeg = cv2.imencode('.jpg', frame)
+                frame_data = jpeg.tobytes()

                 # Send to all streaming clients
-                for client in self.streaming_clients[:]: # Copy list to avoid modification issues
+                for client in self.streaming_clients[:]: # Iterate over a copy
                     try:
                         client.send(frame_data)
                     except Exception as e:
@@ -349,13 +708,18 @@ class OCTv2Server:

     def get_status(self) -> Dict[str, Any]:
         """Return current device status"""
+        # Get actual position from ESP32
+        actual_rotation, actual_elevation = self.esp32.get_position()
+
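+        # Example payload (illustrative values; keys match the dict below):
+        #   {'status': 'success', 'rotation': 12.5, 'elevation': 30.0,
+        #    'auto_mode': False, 'homed': True, 'streaming_clients': 1,
+        #    'total_clients': 2, 'esp32_connected': True}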
         return {
             'status': 'success',
-            'angle': self.current_angle,
+            'rotation': actual_rotation,
+            'elevation': actual_elevation,
             'auto_mode': self.is_auto_mode,
             'homed': self.is_homed,
             'streaming_clients': len(self.streaming_clients),
-            'total_clients': len(self.clients)
+            'total_clients': len(self.clients),
+            'esp32_connected': self.esp32.serial_conn is not None
         }

     def cleanup(self):
@@ -365,16 +729,94 @@ class OCTv2Server:
         if self.camera and CAMERA_AVAILABLE:
             self.camera.stop()

-        if GPIO_AVAILABLE:
-            if hasattr(self, 'servo'):
-                self.servo.stop()
-            GPIO.cleanup()
+        if self.esp32.serial_conn:
+            self.esp32.serial_conn.close()

         logger.info("Server shutdown complete")

+    def _add_mouth_detection_overlay(self, frame, mouths):
+        """Add visual overlay showing mouth detection results"""
+        if not self.mouth_detector.use_dlib:
+            # Simple overlay for basic detection
+            for mx, my, mw, mh, confidence in mouths:
+                color = (0, 255, 0) if confidence > 0.5 else (0, 255, 255)
+                cv2.rectangle(frame, (mx, my), (mx + mw, my + mh), color, 2)
+                cv2.putText(frame, f'{confidence:.2f}', (mx, my-10),
+                            cv2.FONT_HERSHEY_SIMPLEX, 0.6, color, 2)
+            return
+
+        # Advanced overlay showing all mouth states
+        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
+        faces = self.mouth_detector.detector(gray)
+
+        for face in faces:
+            landmarks = self.mouth_detector.predictor(gray, face)
+            mouth_state, confidence = self.mouth_detector._analyze_mouth_state(landmarks)
+
+            # Get face bounding box
+            x1, y1, x2, y2 = face.left(), face.top(), face.right(), face.bottom()
+
+            # Color coding (BGR) for different states
+            state_colors = {
+                "WIDE_OPEN": (0, 255, 0), # Bright green - TARGET!
+                "SPEAKING": (0, 165, 255), # Orange
+                "SMILING": (255, 255, 0), # Cyan
+                "CLOSED": (128, 128, 128) # Gray
+            }
+
+            color = state_colors.get(mouth_state, (128, 128, 128))
+
+            # Draw face rectangle
+            if mouth_state == "WIDE_OPEN":
+                # Thick border for targets
+                cv2.rectangle(frame, (x1, y1), (x2, y2), color, 4)
+                # Add target indicator (ASCII only: Hershey fonts can't draw emoji)
+                cv2.putText(frame, "TARGET!", (x1, y1-40),
+                            cv2.FONT_HERSHEY_SIMPLEX, 0.8, color, 2)
+            else:
+                # Thin border for non-targets
+                cv2.rectangle(frame, (x1, y1), (x2, y2), color, 2)
+
+            # Show mouth state, confidence, and distance
+            cv2.putText(frame, f'{mouth_state}', (x1, y1-20),
+                        cv2.FONT_HERSHEY_SIMPLEX, 0.6, color, 2)
+            cv2.putText(frame, f'{confidence:.2f}', (x1, y2+20),
+                        cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 2)
+
+            # Add distance estimation for WIDE_OPEN mouths
+            if mouth_state == "WIDE_OPEN":
+                face_width = x2 - x1
+                distance = self.mouth_detector.estimate_distance(face_width)
+                cv2.putText(frame, f'~{distance:.0f}cm', (x1, y2+40),
+                            cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 2)
+
+            # Draw mouth landmarks for WIDE_OPEN targets
+            if mouth_state == "WIDE_OPEN":
+                # Draw mouth contour
+                mouth_points = []
+                for i in range(48, 68):
+                    point = landmarks.part(i)
+                    mouth_points.append((point.x, point.y))
+                    if i in [48, 54, 62, 66]: # Key points
+                        cv2.circle(frame, (point.x, point.y), 3, (0, 255, 0), -1)
+
+                # Draw mouth outline
+                mouth_points = np.array(mouth_points, dtype=np.int32)
+                cv2.polylines(frame, [mouth_points], True, (0, 255, 0), 2)
+
+        # Add detection stats
+        total_faces = len(faces)
+        wide_open_count = len(mouths) # detect_open_mouths only returns WIDE_OPEN mouths
+
+        cv2.putText(frame, f'Faces: {total_faces}', (10, 30),
+                    cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 255), 2)
+        cv2.putText(frame, f'Targets: {wide_open_count}', (10, 60),
+                    cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 0), 2)
+
 def main():
     """Main entry point"""
-    print("🍪 OCTv2 (Oreo Cookie Thrower v2) Server Starting...")
+    print("🍪 OCTv2 (Oreo Cookie Thrower v2) Server v2 Starting...")
+    print("🤖 Features: ESP32 Control + Automatic Mouth Detection")

     server = OCTv2Server()