#!/usr/bin/env python3
"""
OCTv2 (Oreo Cookie Thrower v2) - Raspberry Pi Server
Handles commands from the iOS app to control hardware and camera
"""

import io
import json
import logging
import os
import socket
import threading
import time
from datetime import datetime
from typing import Any, Dict, Iterator, Optional

# Hardware imports are optional: when a library is missing the matching
# subsystem runs in simulation mode. Camera and GPIO are probed separately so
# that a missing camera stack does not disable the servo/fire hardware.
try:
    from picamera2 import Picamera2
    CAMERA_AVAILABLE = True
except ImportError:
    print("Camera not available - running in simulation mode")
    CAMERA_AVAILABLE = False

try:
    import RPi.GPIO as GPIO
    GPIO_AVAILABLE = True
except (ImportError, RuntimeError):
    # RuntimeError is included because RPi.GPIO is reported to raise it when
    # imported on non-Pi hardware.
    print("GPIO not available - running in simulation mode")
    GPIO_AVAILABLE = False

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)


class OCTv2Server:
    """TCP server that turns JSON commands into servo, fire and camera actions.

    Every connected client is served by its own daemon thread. Commands are
    JSON objects with an ``action`` key; each command gets one JSON response.

    NOTE: photo bytes and video frames are written to the same socket as the
    JSON responses without any framing, so the client has to know how to
    separate them. That wire format is kept as-is for compatibility.
    """

    # Mechanical aiming range and step size, in degrees.
    MIN_ANGLE = 0
    MAX_ANGLE = 60
    AIM_STEP = 5

    # Timing constants, in seconds.
    FIRE_SETTLE_SECONDS = 0.5    # wait for the servo before firing
    FIRE_PULSE_SECONDS = 0.1     # how long the fire pin is held high
    SERVO_PULSE_SECONDS = 0.1    # how long the PWM signal is applied
    STREAM_FRAME_INTERVAL = 0.1  # ~10 FPS
    ACCEPT_TIMEOUT_SECONDS = 1.0

    RECV_BUFFER_SIZE = 1024

    def __init__(self, host='0.0.0.0', port=8080):
        """Store the listen address and initialise hardware and camera.

        Args:
            host: Interface to bind to.
            port: TCP port to listen on.
        """
        self.host = host
        self.port = port
        self.running = False
        self.clients = []

        # Hardware state
        self.current_angle = 30.0
        self.is_auto_mode = True
        self.is_homed = False
        self.servo = None  # set by setup_hardware() when GPIO init succeeds

        # Camera state
        self.camera = None
        self.streaming_clients = []
        self.stream_thread = None

        # Guards self.clients, self.streaming_clients and self.stream_thread,
        # which are touched by client threads and the streaming thread.
        self._clients_lock = threading.Lock()
        self._cleaned_up = False

        # GPIO pins (adjust for your hardware)
        self.SERVO_PIN = 18
        self.STEPPER_PINS = [19, 20, 21, 22]  # Example stepper motor pins
        self.FIRE_PIN = 23

        self.setup_hardware()
        self.setup_camera()

    def setup_hardware(self):
        """Initialize GPIO and hardware components.

        Failures are logged, not raised; ``self.servo`` stays ``None`` so
        later movement requests fail with a clear error.
        """
        if not GPIO_AVAILABLE:
            logger.info("GPIO not available - simulating hardware")
            return

        try:
            GPIO.setmode(GPIO.BCM)
            GPIO.setup(self.SERVO_PIN, GPIO.OUT)
            GPIO.setup(self.FIRE_PIN, GPIO.OUT)

            # Setup stepper motor pins
            for pin in self.STEPPER_PINS:
                GPIO.setup(pin, GPIO.OUT)
                GPIO.output(pin, False)

            # Initialize servo
            servo = GPIO.PWM(self.SERVO_PIN, 50)  # 50Hz
            servo.start(0)
            self.servo = servo

            logger.info("Hardware initialized successfully")
        except Exception as e:
            self.servo = None
            logger.error(f"Hardware setup failed: {e}")

    def setup_camera(self):
        """Initialize camera.

        Failures are logged, not raised; ``self.camera`` is reset to ``None``
        so a half-configured camera object is never used.
        """
        if not CAMERA_AVAILABLE:
            logger.info("Camera not available - simulating camera")
            return

        try:
            camera = Picamera2()

            # Configure camera for streaming and photos
            config = camera.create_preview_configuration(
                main={"size": (640, 480)},
                lores={"size": (320, 240)},
                display="lores"
            )
            camera.configure(config)
            camera.start()
            self.camera = camera

            logger.info("Camera initialized successfully")
        except Exception as e:
            self.camera = None
            logger.error(f"Camera setup failed: {e}")

    def start_server(self):
        """Start the TCP server and block until it stops.

        Each accepted connection is handed to a daemon thread running
        handle_client(). cleanup() is always called on the way out.
        """
        self.running = True
        server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        # A timeout lets the loop notice self.running turning False instead
        # of blocking in accept() forever.
        server_socket.settimeout(self.ACCEPT_TIMEOUT_SECONDS)

        try:
            server_socket.bind((self.host, self.port))
            server_socket.listen(5)
            logger.info(f"OCTv2 Server listening on {self.host}:{self.port}")

            while self.running:
                try:
                    client_socket, address = server_socket.accept()
                except socket.timeout:
                    continue
                except OSError as e:
                    if self.running:
                        logger.error(f"Error accepting client: {e}")
                    continue

                # Client sockets must block; do not inherit the accept timeout.
                client_socket.settimeout(None)
                logger.info(f"Client connected from {address}")

                client_thread = threading.Thread(
                    target=self.handle_client,
                    args=(client_socket, address)
                )
                client_thread.daemon = True
                client_thread.start()

        except Exception as e:
            logger.error(f"Server error: {e}")
        finally:
            server_socket.close()
            self.cleanup()

    @staticmethod
    def _iter_commands(text: str) -> Iterator[Any]:
        """Yield each JSON document found back-to-back in *text*.

        TCP may deliver several commands in one read; decoding them one at a
        time keeps all of them instead of failing with "Extra data".

        Raises:
            json.JSONDecodeError: at the first piece that is not valid JSON
                (documents before it have already been yielded).
        """
        decoder = json.JSONDecoder()
        remaining = text.lstrip()
        while remaining:
            document, consumed = decoder.raw_decode(remaining)
            yield document
            remaining = remaining[consumed:].lstrip()

    def handle_client(self, client_socket, address):
        """Handle individual client connections.

        Reads commands until the peer disconnects or the server stops, and
        sends one JSON response per command. The socket is always
        deregistered and closed on exit.
        """
        with self._clients_lock:
            self.clients.append(client_socket)

        try:
            while self.running:
                data = client_socket.recv(self.RECV_BUFFER_SIZE)
                if not data:
                    break

                try:
                    for command in self._iter_commands(data.decode('utf-8')):
                        logger.info(f"Received command: {command}")
                        try:
                            response = self.process_command(
                                command, client_socket
                            )
                        except Exception as e:
                            logger.error(f"Error processing command: {e}")
                            response = {'status': 'error', 'message': str(e)}

                        if response:
                            client_socket.sendall(
                                json.dumps(response).encode('utf-8')
                            )
                except (json.JSONDecodeError, UnicodeDecodeError):
                    logger.error("Invalid JSON received")
                    # Tell the client, so it is not left waiting for a reply.
                    client_socket.sendall(
                        json.dumps(
                            {'error': 'Invalid JSON received'}
                        ).encode('utf-8')
                    )

        except Exception as e:
            logger.error(f"Client {address} error: {e}")
        finally:
            with self._clients_lock:
                if client_socket in self.clients:
                    self.clients.remove(client_socket)
                if client_socket in self.streaming_clients:
                    self.streaming_clients.remove(client_socket)
            client_socket.close()
            logger.info(f"Client {address} disconnected")

    def process_command(
        self, command: Dict[str, Any], client_socket
    ) -> Optional[Dict[str, Any]]:
        """Process commands from iOS app.

        Args:
            command: Decoded JSON command; must be an object with an
                ``action`` key.
            client_socket: Socket of the requesting client, needed by the
                photo and streaming actions.

        Returns:
            The response dictionary to send back to the client.
        """
        if not isinstance(command, dict):
            return {'error': 'Command must be a JSON object'}

        action = command.get('action')
        logger.info(f"Processing action: {action}")

        if action == 'aim_left':
            return self.aim_left()
        elif action == 'aim_right':
            return self.aim_right()
        elif action == 'fire':
            angle = command.get('angle', self.current_angle)
            return self.fire_oreo(angle)
        elif action == 'home':
            return self.home_device()
        elif action == 'set_mode':
            mode = command.get('mode', 'auto')
            return self.set_mode(mode)
        elif action == 'capture_photo':
            return self.capture_photo(client_socket)
        elif action == 'start_video_stream':
            return self.start_video_stream(client_socket)
        elif action == 'stop_video_stream':
            return self.stop_video_stream(client_socket)
        elif action == 'status':
            return self.get_status()
        else:
            return {'error': f'Unknown action: {action}'}

    def aim_left(self) -> Dict[str, Any]:
        """Move aim left by a small increment, clamped at MIN_ANGLE."""
        if self.current_angle > self.MIN_ANGLE:
            target = max(self.MIN_ANGLE, self.current_angle - self.AIM_STEP)
            # Update the stored angle only after the move succeeded.
            self.move_to_angle(target)
            self.current_angle = target
            return {'status': 'success', 'angle': self.current_angle}
        return {'status': 'error', 'message': 'Already at minimum angle'}

    def aim_right(self) -> Dict[str, Any]:
        """Move aim right by a small increment, clamped at MAX_ANGLE."""
        if self.current_angle < self.MAX_ANGLE:
            target = min(self.MAX_ANGLE, self.current_angle + self.AIM_STEP)
            # Update the stored angle only after the move succeeded.
            self.move_to_angle(target)
            self.current_angle = target
            return {'status': 'success', 'angle': self.current_angle}
        return {'status': 'error', 'message': 'Already at maximum angle'}

    def fire_oreo(self, angle: float) -> Dict[str, Any]:
        """Fire an Oreo at the specified angle.

        Args:
            angle: Target angle in degrees; must be a number within
                MIN_ANGLE..MAX_ANGLE.

        Returns:
            A success response, or an error response (without moving or
            firing) when the angle is not a number or is out of range.
        """
        # The angle comes straight from the network: validate it before it
        # reaches the servo duty-cycle computation. bool is excluded because
        # it is an int subclass.
        if isinstance(angle, bool) or not isinstance(angle, (int, float)):
            return {'status': 'error', 'message': 'Angle must be a number'}
        if not self.MIN_ANGLE <= angle <= self.MAX_ANGLE:
            return {
                'status': 'error',
                'message': (
                    f'Angle must be between {self.MIN_ANGLE} '
                    f'and {self.MAX_ANGLE} degrees'
                )
            }

        logger.info(f"FIRING OREO at {angle} degrees!")

        # Move to target angle first
        self.move_to_angle(angle)
        self.current_angle = angle
        time.sleep(self.FIRE_SETTLE_SECONDS)  # Wait for positioning

        # Fire mechanism
        if GPIO_AVAILABLE:
            GPIO.output(self.FIRE_PIN, True)
            try:
                time.sleep(self.FIRE_PULSE_SECONDS)  # Fire pulse
            finally:
                # Never leave the fire pin energised.
                GPIO.output(self.FIRE_PIN, False)
        else:
            logger.info("SIMULATED: Fire mechanism activated!")

        return {
            'status': 'success',
            'message': f'Oreo fired at {angle}°',
            'angle': angle
        }

    def move_to_angle(self, angle: float):
        """Move servo to specified angle (0-60 degrees).

        Raises:
            RuntimeError: if GPIO is available but the servo was never
                initialised (hardware setup failed).
        """
        if GPIO_AVAILABLE:
            if self.servo is None:
                raise RuntimeError("Servo not initialized")
            # Convert angle to servo duty cycle
            duty = 2 + (angle / self.MAX_ANGLE) * 10  # 2-12% for 0-60°
            self.servo.ChangeDutyCycle(duty)
            time.sleep(self.SERVO_PULSE_SECONDS)
            self.servo.ChangeDutyCycle(0)  # Stop sending signal
        else:
            logger.info(f"SIMULATED: Moving to {angle} degrees")

    def home_device(self) -> Dict[str, Any]:
        """Home the device to its reference position (0 degrees)."""
        logger.info("Homing device...")

        # Move to home position; record state only once the move succeeded.
        self.move_to_angle(0)
        self.current_angle = 0
        self.is_homed = True

        return {
            'status': 'success',
            'message': 'Device homed successfully',
            'angle': 0
        }

    def set_mode(self, mode: str) -> Dict[str, Any]:
        """Set operating mode (auto/manual).

        Any string other than ``auto`` (case-insensitive) selects manual
        mode. Non-string values are rejected with an error response.
        """
        if not isinstance(mode, str):
            return {'status': 'error', 'message': 'Mode must be a string'}

        self.is_auto_mode = (mode.lower() == 'auto')
        logger.info(f"Mode set to: {mode}")

        return {
            'status': 'success',
            'mode': mode,
            'auto_mode': self.is_auto_mode
        }

    def capture_photo(self, client_socket) -> Dict[str, Any]:
        """Capture a photo, save it locally and send its bytes to the client.

        The file is written to the current working directory. The raw JPEG
        bytes are sent on *client_socket* before the JSON response.
        """
        if not CAMERA_AVAILABLE:
            logger.info("SIMULATED: Photo captured")
            return {'status': 'success', 'message': 'Photo captured (simulated)'}

        if self.camera is None:
            logger.error("Photo capture failed: camera not initialized")
            return {'status': 'error', 'message': 'Camera not initialized'}

        try:
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            filename = f"octv2_photo_{timestamp}.jpg"

            # Save to Pi storage
            self.camera.capture_file(filename)

            # Optionally send photo back to app
            with open(filename, 'rb') as f:
                photo_data = f.read()

            # sendall: a photo is far larger than one send() may accept.
            client_socket.sendall(photo_data)

            return {
                'status': 'success',
                'filename': filename,
                'message': 'Photo captured and saved'
            }

        except Exception as e:
            logger.error(f"Photo capture failed: {e}")
            return {'status': 'error', 'message': str(e)}

    def start_video_stream(self, client_socket) -> Dict[str, Any]:
        """Start video streaming to client.

        Registers the client and starts the shared streaming thread if it is
        not already running.
        """
        with self._clients_lock:
            if client_socket not in self.streaming_clients:
                self.streaming_clients.append(client_socket)

            if not self.stream_thread or not self.stream_thread.is_alive():
                self.stream_thread = threading.Thread(
                    target=self.video_stream_worker
                )
                self.stream_thread.daemon = True
                self.stream_thread.start()

        return {'status': 'success', 'message': 'Video stream started'}

    def stop_video_stream(self, client_socket) -> Dict[str, Any]:
        """Stop video streaming to client."""
        with self._clients_lock:
            if client_socket in self.streaming_clients:
                self.streaming_clients.remove(client_socket)

        return {'status': 'success', 'message': 'Video stream stopped'}

    def video_stream_worker(self):
        """Worker thread for video streaming.

        Captures JPEG frames and sends each to every streaming client until
        no clients remain or the server stops. Clients whose socket fails are
        dropped from the streaming list.
        """
        if not CAMERA_AVAILABLE:
            logger.info("SIMULATED: Video streaming started")
            return

        if self.camera is None:
            logger.error("Video streaming error: camera not initialized")
            return

        try:
            while True:
                with self._clients_lock:
                    targets = list(self.streaming_clients)
                    if not targets or not self.running:
                        # Clear the handle under the lock so that a
                        # concurrent start_video_stream() starts a new worker
                        # instead of relying on this exiting one.
                        self.stream_thread = None
                        return

                # Capture frame
                stream = io.BytesIO()
                self.camera.capture_file(stream, format='jpeg')
                frame_data = stream.getvalue()

                # Send to all streaming clients
                for client in targets:
                    try:
                        client.sendall(frame_data)
                    except OSError as e:
                        logger.error(f"Failed to send frame to client: {e}")
                        with self._clients_lock:
                            # The client thread may already have removed it.
                            if client in self.streaming_clients:
                                self.streaming_clients.remove(client)

                time.sleep(self.STREAM_FRAME_INTERVAL)

        except Exception as e:
            logger.error(f"Video streaming error: {e}")

    def get_status(self) -> Dict[str, Any]:
        """Return current device status"""
        return {
            'status': 'success',
            'angle': self.current_angle,
            'auto_mode': self.is_auto_mode,
            'homed': self.is_homed,
            'streaming_clients': len(self.streaming_clients),
            'total_clients': len(self.clients)
        }

    def cleanup(self):
        """Clean up resources.

        Safe to call more than once: start_server() calls it on exit and
        main() calls it again after Ctrl-C; only the first call does work.
        """
        self.running = False
        if self._cleaned_up:
            return
        self._cleaned_up = True

        # Wake client threads blocked in recv() so they close their sockets.
        with self._clients_lock:
            open_clients = list(self.clients)
        for client in open_clients:
            try:
                client.shutdown(socket.SHUT_RDWR)
            except OSError:
                pass  # already closed or never connected; nothing to do

        if self.camera and CAMERA_AVAILABLE:
            try:
                self.camera.stop()
            except Exception as e:
                # Keep going so that GPIO is still released.
                logger.error(f"Camera shutdown failed: {e}")

        if GPIO_AVAILABLE:
            try:
                if self.servo is not None:
                    self.servo.stop()
                GPIO.cleanup()
            except Exception as e:
                logger.error(f"GPIO shutdown failed: {e}")

        logger.info("Server shutdown complete")


def main():
    """Main entry point"""
    print("🍪 OCTv2 (Oreo Cookie Thrower v2) Server Starting...")

    server = OCTv2Server()

    try:
        server.start_server()
    except KeyboardInterrupt:
        print("\n🛑 Shutting down OCTv2 server...")
        server.cleanup()


if __name__ == "__main__":
    main()