|
@@ -3,7 +3,7 @@
|
|
|
OCTv2 (Oreo Cookie Thrower v2) - Raspberry Pi Server v2
|
|
|
- ESP32 serial communication for motor control
|
|
|
- Automatic mouth detection and targeting
|
|
|
-- Stepper motor control for rotation and elevation
|
|
|
+- Stepper motor control for rotation and pitch
|
|
|
"""
|
|
|
|
|
|
import socket
|
|
@@ -45,15 +45,35 @@ class ESP32Controller:
|
|
|
self.connect()
|
|
|
|
|
|
def connect(self):
|
|
|
- """Connect to ESP32 via serial"""
|
|
|
- try:
|
|
|
- self.serial_conn = serial.Serial(self.port, self.baudrate, timeout=1)
|
|
|
- time.sleep(2) # Wait for ESP32 to initialize
|
|
|
- logger.info(f"Connected to ESP32 on {self.port}")
|
|
|
- return True
|
|
|
- except Exception as e:
|
|
|
- logger.error(f"Failed to connect to ESP32: {e}")
|
|
|
- return False
|
|
|
+ """Connect to ESP32 via serial with infinite retry"""
|
|
|
+ retry_delay = 2.0
|
|
|
+ attempt = 0
|
|
|
+
|
|
|
+ logger.info("ESP32 connection required - will retry indefinitely...")
|
|
|
+
|
|
|
+ while True:
|
|
|
+ attempt += 1
|
|
|
+ try:
|
|
|
+ self.serial_conn = serial.Serial(self.port, self.baudrate, timeout=1)
|
|
|
+ time.sleep(2) # Wait for ESP32 to initialize
|
|
|
+
|
|
|
+ # Test connection with a simple command
|
|
|
+ test_response = self.send_command("STATUS")
|
|
|
+ if "ERROR" not in test_response:
|
|
|
+ logger.info(f"✅ Connected to ESP32 on {self.port} (attempt {attempt})")
|
|
|
+ return True
|
|
|
+ else:
|
|
|
+ self.serial_conn.close()
|
|
|
+ self.serial_conn = None
|
|
|
+
|
|
|
+ except Exception as e:
|
|
|
+ logger.warning(f"ESP32 connection attempt {attempt} failed: {e}")
|
|
|
+ if self.serial_conn:
|
|
|
+ self.serial_conn.close()
|
|
|
+ self.serial_conn = None
|
|
|
+
|
|
|
+ logger.info(f"Retrying ESP32 connection in {retry_delay} seconds...")
|
|
|
+ time.sleep(retry_delay)
|
|
|
|
|
|
def send_command(self, command: str) -> str:
|
|
|
"""Send command to ESP32 and get response"""
|
|
@@ -71,19 +91,19 @@ class ESP32Controller:
|
|
|
return "ERROR: Communication failed"
|
|
|
|
|
|
def home_motors(self) -> bool:
|
|
|
- """Home both rotation and elevation motors"""
|
|
|
+ """Home both rotation and pitch motors"""
|
|
|
response = self.send_command("HOME")
|
|
|
return response == "OK"
|
|
|
|
|
|
- def move_to_position(self, rotation_degrees: float, elevation_degrees: float) -> bool:
|
|
|
- """Move to absolute position (rotation: -90 to +90, elevation: 0 to 60)"""
|
|
|
- cmd = f"MOVE {rotation_degrees:.1f} {elevation_degrees:.1f}"
|
|
|
+ def move_to_position(self, rotation_degrees: float, pitch_degrees: float) -> bool:
|
|
|
+ """Move to absolute position (rotation: -90 to +90, pitch: 0 to 60)"""
|
|
|
+ cmd = f"MOVE {rotation_degrees:.1f} {pitch_degrees:.1f}"
|
|
|
response = self.send_command(cmd)
|
|
|
return response == "OK"
|
|
|
|
|
|
- def move_relative(self, delta_rotation: float, delta_elevation: float) -> bool:
|
|
|
+ def move_relative(self, delta_rotation: float, delta_pitch: float) -> bool:
|
|
|
"""Move relative to current position"""
|
|
|
- cmd = f"REL {delta_rotation:.1f} {delta_elevation:.1f}"
|
|
|
+ cmd = f"REL {delta_rotation:.1f} {delta_pitch:.1f}"
|
|
|
response = self.send_command(cmd)
|
|
|
return response == "OK"
|
|
|
|
|
@@ -93,7 +113,7 @@ class ESP32Controller:
|
|
|
return response == "OK"
|
|
|
|
|
|
def get_position(self) -> Tuple[float, float]:
|
|
|
- """Get current position (rotation, elevation)"""
|
|
|
+ """Get current position (rotation, pitch)"""
|
|
|
response = self.send_command("POS")
|
|
|
try:
|
|
|
parts = response.split()
|
|
@@ -139,8 +159,8 @@ class MouthDetector:
|
|
|
|
|
|
# Aiming offsets (configurable for mechanical compensation)
|
|
|
self.rotation_offset_degrees = 0.0 # Adjust if camera/launcher not perfectly aligned
|
|
|
- self.elevation_offset_degrees = 0.0 # Adjust for gravity compensation
|
|
|
- self.distance_elevation_factor = 0.5 # Elevation adjustment based on distance
|
|
|
+ self.pitch_offset_degrees = 0.0 # Adjust for gravity compensation
|
|
|
+ self.distance_pitch_factor = 0.5 # Elevation adjustment based on distance
|
|
|
|
|
|
logger.info("Mouth detector initialized")
|
|
|
|
|
@@ -208,6 +228,19 @@ class MouthDetector:
|
|
|
def _analyze_mouth_state(self, landmarks) -> Tuple[str, float]:
|
|
|
"""
|
|
|
Analyze mouth landmarks to determine state and confidence
|
|
|
+
|
|
|
+ Uses 68-point facial landmark model where mouth points are 48-67:
|
|
|
+ - Points 48-54: Outer lip contour (left to right)
|
|
|
+ - Points 55-59: Outer lip contour (right to left)
|
|
|
+ - Points 60-64: Inner lip contour (left to right)
|
|
|
+ - Points 65-67: Inner lip contour (right to left)
|
|
|
+
|
|
|
+ Algorithm:
|
|
|
+ 1. Calculate mouth aspect ratios (height/width) for inner and outer lips
|
|
|
+ 2. Measure lip separation (distance between inner lip points)
|
|
|
+ 3. Classify based on thresholds tuned for WIDE_OPEN detection
|
|
|
+ 4. Only WIDE_OPEN mouths are targeted for firing
|
|
|
+
|
|
|
Returns: (state, confidence) where state is CLOSED, SPEAKING, SMILING, or WIDE_OPEN
|
|
|
"""
|
|
|
# Key mouth landmark points
|
|
@@ -288,7 +321,23 @@ class MouthDetector:
|
|
|
def calculate_centering_adjustment(self, mouth_x: int, mouth_y: int, face_width: int) -> Tuple[float, float, float]:
|
|
|
"""
|
|
|
Calculate motor adjustments to center the mouth in camera view
|
|
|
- Returns: (rotation_adjustment, elevation_adjustment, estimated_distance)
|
|
|
+
|
|
|
+ Camera-follows-aim targeting algorithm:
|
|
|
+ 1. Calculate pixel offset from mouth center to camera center
|
|
|
+ 2. Convert pixels to degrees using calibrated conversion factors
|
|
|
+ 3. Apply distance scaling (closer targets need smaller adjustments)
|
|
|
+ 4. Add mechanical offset compensation
|
|
|
+ 5. Add distance-based pitch compensation for projectile trajectory
|
|
|
+
|
|
|
+ Args:
|
|
|
+ mouth_x, mouth_y: Center coordinates of detected mouth in pixels
|
|
|
+ face_width: Width of face in pixels (for distance estimation)
|
|
|
+
|
|
|
+ Returns:
|
|
|
+ (rotation_adjustment, pitch_adjustment, estimated_distance)
|
|
|
+ - rotation_adjustment: Horizontal angle change in degrees (-90 to +90)
|
|
|
+ - pitch_adjustment: Vertical angle change in degrees (0 to 60)
|
|
|
+ - estimated_distance: Distance to target in cm
|
|
|
"""
|
|
|
# Calculate offset from center
|
|
|
dx = mouth_x - self.center_x
|
|
@@ -309,41 +358,57 @@ class MouthDetector:
|
|
|
# Convert pixel offset to approximate angle adjustment
|
|
|
# This is empirical - you'll need to tune these values for your setup
|
|
|
pixels_per_degree_rotation = 15 * distance_factor # Adjust based on your camera/motor setup
|
|
|
- pixels_per_degree_elevation = 12 * distance_factor # Adjust based on your camera/motor setup
|
|
|
+ pixels_per_degree_pitch = 12 * distance_factor # Adjust based on your camera/motor setup
|
|
|
|
|
|
rotation_adjustment = dx / pixels_per_degree_rotation
|
|
|
- elevation_adjustment = -dy / pixels_per_degree_elevation # Negative because Y increases downward
|
|
|
+ pitch_adjustment = -dy / pixels_per_degree_pitch # Negative because Y increases downward
|
|
|
|
|
|
# Apply configured offsets
|
|
|
rotation_adjustment += self.rotation_offset_degrees
|
|
|
- elevation_adjustment += self.elevation_offset_degrees
|
|
|
+ pitch_adjustment += self.pitch_offset_degrees
|
|
|
|
|
|
- # Add distance-based elevation compensation (closer targets need higher elevation)
|
|
|
- distance_elevation_compensation = (200.0 - estimated_distance) * self.distance_elevation_factor / 100.0
|
|
|
- elevation_adjustment += distance_elevation_compensation
|
|
|
+ # Add distance-based pitch compensation (closer targets need higher pitch)
|
|
|
+ distance_pitch_compensation = (200.0 - estimated_distance) * self.distance_pitch_factor / 100.0
|
|
|
+ pitch_adjustment += distance_pitch_compensation
|
|
|
|
|
|
# Clamp to maximum adjustment per iteration
|
|
|
rotation_adjustment = max(-self.max_adjustment_degrees,
|
|
|
min(self.max_adjustment_degrees, rotation_adjustment))
|
|
|
- elevation_adjustment = max(-self.max_adjustment_degrees,
|
|
|
- min(self.max_adjustment_degrees, elevation_adjustment))
|
|
|
+ pitch_adjustment = max(-self.max_adjustment_degrees,
|
|
|
+ min(self.max_adjustment_degrees, pitch_adjustment))
|
|
|
|
|
|
- return rotation_adjustment, elevation_adjustment, estimated_distance
|
|
|
+ return rotation_adjustment, pitch_adjustment, estimated_distance
|
|
|
|
|
|
class OCTv2Server:
|
|
|
+ """
|
|
|
+ Main OCTv2 server handling iOS app communication and hardware control
|
|
|
+
|
|
|
+ Architecture:
|
|
|
+ - TCP server for iOS app communication (JSON protocol)
|
|
|
+ - ESP32 serial communication for motor control
|
|
|
+ - Computer vision for automatic mouth detection and targeting
|
|
|
+ - Camera streaming with detection overlays
|
|
|
+ """
|
|
|
+
|
|
|
def __init__(self, host='0.0.0.0', port=8080):
|
|
|
self.host = host
|
|
|
self.port = port
|
|
|
self.running = False
|
|
|
self.clients = []
|
|
|
|
|
|
- # Hardware components
|
|
|
+ # Initialize ESP32 controller (required - will block until connected)
|
|
|
+ logger.info("Initializing ESP32 controller...")
|
|
|
self.esp32 = ESP32Controller()
|
|
|
- self.mouth_detector = MouthDetector()
|
|
|
+
|
|
|
+ try:
|
|
|
+ self.mouth_detector = MouthDetector()
|
|
|
+ except Exception as e:
|
|
|
+ logger.error(f"Failed to initialize mouth detector: {e}")
|
|
|
+ self.mouth_detector = None
|
|
|
|
|
|
# Hardware state
|
|
|
self.current_rotation = 0.0 # -90 to +90 degrees
|
|
|
- self.current_elevation = 30.0 # 0 to 60 degrees
|
|
|
+ self.current_pitch = 30.0 # 0 to 60 degrees
|
|
|
self.is_auto_mode = False
|
|
|
self.is_homed = False
|
|
|
self.auto_fire_enabled = True
|
|
@@ -361,23 +426,47 @@ class OCTv2Server:
|
|
|
self.setup_camera()
|
|
|
|
|
|
def setup_camera(self):
|
|
|
- """Initialize camera"""
|
|
|
+ """Initialize camera with error handling and fallback options"""
|
|
|
if not CAMERA_AVAILABLE:
|
|
|
- logger.info("Camera not available - simulating camera")
|
|
|
+ logger.warning("Picamera2 not available - running in simulation mode")
|
|
|
+ logger.info("Install with: sudo apt install python3-picamera2")
|
|
|
return
|
|
|
|
|
|
- try:
|
|
|
- self.camera = Picamera2()
|
|
|
- config = self.camera.create_preview_configuration(
|
|
|
- main={"size": (640, 480)},
|
|
|
- lores={"size": (320, 240)},
|
|
|
- display="lores"
|
|
|
- )
|
|
|
- self.camera.configure(config)
|
|
|
- self.camera.start()
|
|
|
- logger.info("Camera initialized successfully")
|
|
|
- except Exception as e:
|
|
|
- logger.error(f"Camera setup failed: {e}")
|
|
|
+ max_retries = 3
|
|
|
+ for attempt in range(max_retries):
|
|
|
+ try:
|
|
|
+ self.camera = Picamera2()
|
|
|
+
|
|
|
+ # Configure camera with optimal settings for mouth detection
|
|
|
+ config = self.camera.create_preview_configuration(
|
|
|
+ main={"size": (640, 480)}, # Good resolution for detection
|
|
|
+ lores={"size": (320, 240)}, # Lower res for streaming
|
|
|
+ display="lores"
|
|
|
+ )
|
|
|
+ self.camera.configure(config)
|
|
|
+ self.camera.start()
|
|
|
+
|
|
|
+ # Test camera by capturing a frame
|
|
|
+ test_frame = self.camera.capture_array()
|
|
|
+ if test_frame is not None and test_frame.size > 0:
|
|
|
+ logger.info(f"Camera initialized successfully (attempt {attempt + 1})")
|
|
|
+ return True
|
|
|
+
|
|
|
+ except Exception as e:
|
|
|
+ logger.warning(f"Camera initialization attempt {attempt + 1} failed: {e}")
|
|
|
+ if self.camera:
|
|
|
+ try:
|
|
|
+ self.camera.stop()
|
|
|
+ except:
|
|
|
+ pass
|
|
|
+ self.camera = None
|
|
|
+
|
|
|
+ if attempt < max_retries - 1:
|
|
|
+ time.sleep(1)
|
|
|
+
|
|
|
+ logger.error(f"Failed to initialize camera after {max_retries} attempts")
|
|
|
+ logger.info("System will run without camera functionality")
|
|
|
+ return False
|
|
|
|
|
|
def start_server(self):
|
|
|
"""Start the TCP server"""
|
|
@@ -461,7 +550,7 @@ class OCTv2Server:
|
|
|
elif action == 'aim_right':
|
|
|
return self.aim_right()
|
|
|
elif action == 'fire':
|
|
|
- angle = command.get('angle', self.current_elevation)
|
|
|
+ angle = command.get('angle', self.current_pitch)
|
|
|
return self.fire_oreo(angle)
|
|
|
elif action == 'home':
|
|
|
return self.home_device()
|
|
@@ -482,7 +571,7 @@ class OCTv2Server:
|
|
|
def aim_left(self) -> Dict[str, Any]:
|
|
|
"""Move aim left by small increment"""
|
|
|
new_rotation = max(-90, self.current_rotation - 5)
|
|
|
- if self.esp32.move_to_position(new_rotation, self.current_elevation):
|
|
|
+ if self.esp32.move_to_position(new_rotation, self.current_pitch):
|
|
|
self.current_rotation = new_rotation
|
|
|
return {'status': 'success', 'rotation': self.current_rotation}
|
|
|
return {'status': 'error', 'message': 'Failed to move left'}
|
|
@@ -490,28 +579,28 @@ class OCTv2Server:
|
|
|
def aim_right(self) -> Dict[str, Any]:
|
|
|
"""Move aim right by small increment"""
|
|
|
new_rotation = min(90, self.current_rotation + 5)
|
|
|
- if self.esp32.move_to_position(new_rotation, self.current_elevation):
|
|
|
+ if self.esp32.move_to_position(new_rotation, self.current_pitch):
|
|
|
self.current_rotation = new_rotation
|
|
|
return {'status': 'success', 'rotation': self.current_rotation}
|
|
|
return {'status': 'error', 'message': 'Failed to move right'}
|
|
|
|
|
|
- def fire_oreo(self, elevation: float = None) -> Dict[str, Any]:
|
|
|
- """Fire an Oreo at the specified elevation"""
|
|
|
- if elevation is not None:
|
|
|
- # Move to target elevation first
|
|
|
- self.current_elevation = max(0, min(60, elevation))
|
|
|
- if not self.esp32.move_to_position(self.current_rotation, self.current_elevation):
|
|
|
+ def fire_oreo(self, pitch: float = None) -> Dict[str, Any]:
|
|
|
+ """Fire an Oreo at the specified pitch"""
|
|
|
+ if pitch is not None:
|
|
|
+ # Move to target pitch first
|
|
|
+ self.current_pitch = max(0, min(60, pitch))
|
|
|
+ if not self.esp32.move_to_position(self.current_rotation, self.current_pitch):
|
|
|
return {'status': 'error', 'message': 'Failed to move to position'}
|
|
|
time.sleep(0.5) # Wait for positioning
|
|
|
|
|
|
- logger.info(f"FIRING OREO at rotation={self.current_rotation}°, elevation={self.current_elevation}°!")
|
|
|
+ logger.info(f"FIRING OREO at rotation={self.current_rotation}°, pitch={self.current_pitch}°!")
|
|
|
|
|
|
if self.esp32.fire_oreo():
|
|
|
return {
|
|
|
'status': 'success',
|
|
|
- 'message': f'Oreo fired at R:{self.current_rotation}° E:{self.current_elevation}°',
|
|
|
+ 'message': f'Oreo fired at R:{self.current_rotation}° E:{self.current_pitch}°',
|
|
|
'rotation': self.current_rotation,
|
|
|
- 'elevation': self.current_elevation
|
|
|
+ 'pitch': self.current_pitch
|
|
|
}
|
|
|
else:
|
|
|
return {'status': 'error', 'message': 'Fire mechanism failed'}
|
|
@@ -522,13 +611,13 @@ class OCTv2Server:
|
|
|
|
|
|
if self.esp32.home_motors():
|
|
|
self.current_rotation = 0.0
|
|
|
- self.current_elevation = 0.0
|
|
|
+ self.current_pitch = 0.0
|
|
|
self.is_homed = True
|
|
|
return {
|
|
|
'status': 'success',
|
|
|
'message': 'Device homed successfully',
|
|
|
'rotation': 0,
|
|
|
- 'elevation': 0
|
|
|
+ 'pitch': 0
|
|
|
}
|
|
|
else:
|
|
|
return {'status': 'error', 'message': 'Homing failed'}
|
|
@@ -573,24 +662,24 @@ class OCTv2Server:
|
|
|
estimated_face_width = int(mw * 2.5) # Face is roughly 2.5x wider than mouth
|
|
|
|
|
|
# Calculate centering adjustment
|
|
|
- rotation_adj, elevation_adj, distance = self.mouth_detector.calculate_centering_adjustment(
|
|
|
+ rotation_adj, pitch_adj, distance = self.mouth_detector.calculate_centering_adjustment(
|
|
|
mouth_center_x, mouth_center_y, estimated_face_width
|
|
|
)
|
|
|
|
|
|
logger.info(f"🎯 AUTO TARGET: Mouth detected (confidence {confidence:.2f}, distance ~{distance:.0f}cm)")
|
|
|
|
|
|
# Only adjust if mouth is not already centered
|
|
|
- if abs(rotation_adj) > 0.1 or abs(elevation_adj) > 0.1:
|
|
|
+ if abs(rotation_adj) > 0.1 or abs(pitch_adj) > 0.1:
|
|
|
# Calculate new position with adjustments
|
|
|
new_rotation = max(-90, min(90, self.current_rotation + rotation_adj))
|
|
|
- new_elevation = max(0, min(60, self.current_elevation + elevation_adj))
|
|
|
+ new_pitch = max(0, min(60, self.current_pitch + pitch_adj))
|
|
|
|
|
|
- logger.info(f"🎯 CENTERING: Adjusting R:{rotation_adj:+.1f}° E:{elevation_adj:+.1f}° -> R:{new_rotation:.1f}° E:{new_elevation:.1f}°")
|
|
|
+ logger.info(f"🎯 CENTERING: Adjusting R:{rotation_adj:+.1f}° E:{pitch_adj:+.1f}° -> R:{new_rotation:.1f}° E:{new_pitch:.1f}°")
|
|
|
|
|
|
# Move to center the target
|
|
|
- if self.esp32.move_to_position(new_rotation, new_elevation):
|
|
|
+ if self.esp32.move_to_position(new_rotation, new_pitch):
|
|
|
self.current_rotation = new_rotation
|
|
|
- self.current_elevation = new_elevation
|
|
|
+ self.current_pitch = new_pitch
|
|
|
time.sleep(0.5) # Wait for positioning
|
|
|
else:
|
|
|
logger.info("🎯 TARGET CENTERED: Mouth already in optimal position")
|
|
@@ -709,12 +798,12 @@ class OCTv2Server:
|
|
|
def get_status(self) -> Dict[str, Any]:
|
|
|
"""Return current device status"""
|
|
|
# Get actual position from ESP32
|
|
|
- actual_rotation, actual_elevation = self.esp32.get_position()
|
|
|
+ actual_rotation, actual_pitch = self.esp32.get_position()
|
|
|
|
|
|
return {
|
|
|
'status': 'success',
|
|
|
'rotation': actual_rotation,
|
|
|
- 'elevation': actual_elevation,
|
|
|
+ 'pitch': actual_pitch,
|
|
|
'auto_mode': self.is_auto_mode,
|
|
|
'homed': self.is_homed,
|
|
|
'streaming_clients': len(self.streaming_clients),
|