123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388 |
- #!/usr/bin/env python3
- """
- OCTv2 (Oreo Cookie Thrower v2) - Raspberry Pi Server
- Handles commands from the iOS app to control hardware and camera
- """
- import socket
- import json
- import threading
- import time
- import logging
- from datetime import datetime
- from typing import Dict, Any, Optional
- import io
- import os
- # Camera imports (uncomment when on Pi)
- try:
- from picamera2 import Picamera2
- import RPi.GPIO as GPIO
- CAMERA_AVAILABLE = True
- GPIO_AVAILABLE = True
- except ImportError:
- print("Camera/GPIO not available - running in simulation mode")
- CAMERA_AVAILABLE = False
- GPIO_AVAILABLE = False
- # Configure logging
- logging.basicConfig(
- level=logging.INFO,
- format='%(asctime)s - %(levelname)s - %(message)s'
- )
- logger = logging.getLogger(__name__)
- class OCTv2Server:
- def __init__(self, host='0.0.0.0', port=8080):
- self.host = host
- self.port = port
- self.running = False
- self.clients = []
- # Hardware state
- self.current_angle = 30.0
- self.is_auto_mode = True
- self.is_homed = False
- # Camera state
- self.camera = None
- self.streaming_clients = []
- self.stream_thread = None
- # GPIO pins (adjust for your hardware)
- self.SERVO_PIN = 18
- self.STEPPER_PINS = [19, 20, 21, 22] # Example stepper motor pins
- self.FIRE_PIN = 23
- self.setup_hardware()
- self.setup_camera()
- def setup_hardware(self):
- """Initialize GPIO and hardware components"""
- if not GPIO_AVAILABLE:
- logger.info("GPIO not available - simulating hardware")
- return
- try:
- GPIO.setmode(GPIO.BCM)
- GPIO.setup(self.SERVO_PIN, GPIO.OUT)
- GPIO.setup(self.FIRE_PIN, GPIO.OUT)
- # Setup stepper motor pins
- for pin in self.STEPPER_PINS:
- GPIO.setup(pin, GPIO.OUT)
- GPIO.output(pin, False)
- # Initialize servo
- self.servo = GPIO.PWM(self.SERVO_PIN, 50) # 50Hz
- self.servo.start(0)
- logger.info("Hardware initialized successfully")
- except Exception as e:
- logger.error(f"Hardware setup failed: {e}")
- def setup_camera(self):
- """Initialize camera"""
- if not CAMERA_AVAILABLE:
- logger.info("Camera not available - simulating camera")
- return
- try:
- self.camera = Picamera2()
- # Configure camera for streaming and photos
- config = self.camera.create_preview_configuration(
- main={"size": (640, 480)},
- lores={"size": (320, 240)},
- display="lores"
- )
- self.camera.configure(config)
- self.camera.start()
- logger.info("Camera initialized successfully")
- except Exception as e:
- logger.error(f"Camera setup failed: {e}")
- def start_server(self):
- """Start the TCP server"""
- self.running = True
- server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
- try:
- server_socket.bind((self.host, self.port))
- server_socket.listen(5)
- logger.info(f"OCTv2 Server listening on {self.host}:{self.port}")
- while self.running:
- try:
- client_socket, address = server_socket.accept()
- logger.info(f"Client connected from {address}")
- client_thread = threading.Thread(
- target=self.handle_client,
- args=(client_socket, address)
- )
- client_thread.daemon = True
- client_thread.start()
- except Exception as e:
- if self.running:
- logger.error(f"Error accepting client: {e}")
- except Exception as e:
- logger.error(f"Server error: {e}")
- finally:
- server_socket.close()
- self.cleanup()
- def handle_client(self, client_socket, address):
- """Handle individual client connections"""
- self.clients.append(client_socket)
- try:
- while self.running:
- data = client_socket.recv(1024)
- if not data:
- break
- try:
- command = json.loads(data.decode('utf-8'))
- logger.info(f"Received command: {command}")
- response = self.process_command(command, client_socket)
- if response:
- client_socket.send(json.dumps(response).encode('utf-8'))
- except json.JSONDecodeError:
- logger.error("Invalid JSON received")
- except Exception as e:
- logger.error(f"Error processing command: {e}")
- except Exception as e:
- logger.error(f"Client {address} error: {e}")
- finally:
- if client_socket in self.clients:
- self.clients.remove(client_socket)
- if client_socket in self.streaming_clients:
- self.streaming_clients.remove(client_socket)
- client_socket.close()
- logger.info(f"Client {address} disconnected")
- def process_command(self, command: Dict[str, Any], client_socket) -> Optional[Dict[str, Any]]:
- """Process commands from iOS app"""
- action = command.get('action')
- timestamp = command.get('timestamp')
- logger.info(f"Processing action: {action}")
- if action == 'aim_left':
- return self.aim_left()
- elif action == 'aim_right':
- return self.aim_right()
- elif action == 'fire':
- angle = command.get('angle', self.current_angle)
- return self.fire_oreo(angle)
- elif action == 'home':
- return self.home_device()
- elif action == 'set_mode':
- mode = command.get('mode', 'auto')
- return self.set_mode(mode)
- elif action == 'capture_photo':
- return self.capture_photo(client_socket)
- elif action == 'start_video_stream':
- return self.start_video_stream(client_socket)
- elif action == 'stop_video_stream':
- return self.stop_video_stream(client_socket)
- elif action == 'status':
- return self.get_status()
- else:
- return {'error': f'Unknown action: {action}'}
- def aim_left(self) -> Dict[str, Any]:
- """Move aim left by a small increment"""
- if self.current_angle > 0:
- self.current_angle = max(0, self.current_angle - 5)
- self.move_to_angle(self.current_angle)
- return {'status': 'success', 'angle': self.current_angle}
- return {'status': 'error', 'message': 'Already at minimum angle'}
- def aim_right(self) -> Dict[str, Any]:
- """Move aim right by a small increment"""
- if self.current_angle < 60:
- self.current_angle = min(60, self.current_angle + 5)
- self.move_to_angle(self.current_angle)
- return {'status': 'success', 'angle': self.current_angle}
- return {'status': 'error', 'message': 'Already at maximum angle'}
- def fire_oreo(self, angle: float) -> Dict[str, Any]:
- """Fire an Oreo at the specified angle"""
- logger.info(f"FIRING OREO at {angle} degrees!")
- # Move to target angle first
- self.current_angle = angle
- self.move_to_angle(angle)
- time.sleep(0.5) # Wait for positioning
- # Fire mechanism
- if GPIO_AVAILABLE:
- GPIO.output(self.FIRE_PIN, True)
- time.sleep(0.1) # Fire pulse
- GPIO.output(self.FIRE_PIN, False)
- else:
- logger.info("SIMULATED: Fire mechanism activated!")
- return {
- 'status': 'success',
- 'message': f'Oreo fired at {angle}°',
- 'angle': angle
- }
- def move_to_angle(self, angle: float):
- """Move servo to specified angle (0-60 degrees)"""
- if GPIO_AVAILABLE:
- # Convert angle to servo duty cycle
- duty = 2 + (angle / 60) * 10 # 2-12% duty cycle for 0-60°
- self.servo.ChangeDutyCycle(duty)
- time.sleep(0.1)
- self.servo.ChangeDutyCycle(0) # Stop sending signal
- else:
- logger.info(f"SIMULATED: Moving to {angle} degrees")
- def home_device(self) -> Dict[str, Any]:
- """Home the device to its reference position"""
- logger.info("Homing device...")
- # Move to home position (usually 0 degrees)
- self.current_angle = 0
- self.move_to_angle(0)
- self.is_homed = True
- return {
- 'status': 'success',
- 'message': 'Device homed successfully',
- 'angle': 0
- }
- def set_mode(self, mode: str) -> Dict[str, Any]:
- """Set operating mode (auto/manual)"""
- self.is_auto_mode = (mode.lower() == 'auto')
- logger.info(f"Mode set to: {mode}")
- return {
- 'status': 'success',
- 'mode': mode,
- 'auto_mode': self.is_auto_mode
- }
- def capture_photo(self, client_socket) -> Dict[str, Any]:
- """Capture a high-resolution photo"""
- if not CAMERA_AVAILABLE:
- logger.info("SIMULATED: Photo captured")
- return {'status': 'success', 'message': 'Photo captured (simulated)'}
- try:
- # Capture high-res photo
- timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
- filename = f"octv2_photo_{timestamp}.jpg"
- # Save to Pi storage
- self.camera.capture_file(filename)
- # Optionally send photo back to app
- with open(filename, 'rb') as f:
- photo_data = f.read()
- client_socket.send(photo_data)
- return {
- 'status': 'success',
- 'filename': filename,
- 'message': 'Photo captured and saved'
- }
- except Exception as e:
- logger.error(f"Photo capture failed: {e}")
- return {'status': 'error', 'message': str(e)}
- def start_video_stream(self, client_socket) -> Dict[str, Any]:
- """Start video streaming to client"""
- if client_socket not in self.streaming_clients:
- self.streaming_clients.append(client_socket)
- if not self.stream_thread or not self.stream_thread.is_alive():
- self.stream_thread = threading.Thread(target=self.video_stream_worker)
- self.stream_thread.daemon = True
- self.stream_thread.start()
- return {'status': 'success', 'message': 'Video stream started'}
- def stop_video_stream(self, client_socket) -> Dict[str, Any]:
- """Stop video streaming to client"""
- if client_socket in self.streaming_clients:
- self.streaming_clients.remove(client_socket)
- return {'status': 'success', 'message': 'Video stream stopped'}
- def video_stream_worker(self):
- """Worker thread for video streaming"""
- if not CAMERA_AVAILABLE:
- logger.info("SIMULATED: Video streaming started")
- return
- try:
- while self.streaming_clients:
- # Capture frame
- stream = io.BytesIO()
- self.camera.capture_file(stream, format='jpeg')
- frame_data = stream.getvalue()
- # Send to all streaming clients
- for client in self.streaming_clients[:]: # Copy list to avoid modification issues
- try:
- client.send(frame_data)
- except Exception as e:
- logger.error(f"Failed to send frame to client: {e}")
- self.streaming_clients.remove(client)
- time.sleep(0.1) # ~10 FPS
- except Exception as e:
- logger.error(f"Video streaming error: {e}")
- def get_status(self) -> Dict[str, Any]:
- """Return current device status"""
- return {
- 'status': 'success',
- 'angle': self.current_angle,
- 'auto_mode': self.is_auto_mode,
- 'homed': self.is_homed,
- 'streaming_clients': len(self.streaming_clients),
- 'total_clients': len(self.clients)
- }
- def cleanup(self):
- """Clean up resources"""
- self.running = False
- if self.camera and CAMERA_AVAILABLE:
- self.camera.stop()
- if GPIO_AVAILABLE:
- if hasattr(self, 'servo'):
- self.servo.stop()
- GPIO.cleanup()
- logger.info("Server shutdown complete")
- def main():
- """Main entry point"""
- print("🍪 OCTv2 (Oreo Cookie Thrower v2) Server Starting...")
- server = OCTv2Server()
- try:
- server.start_server()
- except KeyboardInterrupt:
- print("\n🛑 Shutting down OCTv2 server...")
- server.cleanup()
- if __name__ == "__main__":
- main()
|