Kaynağa Gözat

Initial Commit of Micropython, Seems Functional, Needs testing

Fred Damstra [Titan] 3 yıl önce
işleme
4aba28ecb5
7 değiştirilmiş dosya ile 628 ekleme ve 0 silme
  1. 1 0
      .gitignore
  2. 3 0
      .gitmodules
  3. 5 0
      boot.py
  4. 615 0
      main.py
  5. 1 0
      micropython-tm1637
  6. 2 0
      test.py
  7. 1 0
      tm1637.py

+ 1 - 0
.gitignore

@@ -0,0 +1 @@
+*.bin

+ 3 - 0
.gitmodules

@@ -0,0 +1,3 @@
+[submodule "micropython-tm1637"]
+	path = micropython-tm1637
+	url = https://github.com/mcauser/micropython-tm1637.git

+ 5 - 0
boot.py

@@ -0,0 +1,5 @@
+# This file is executed on every boot (including wake-boot from deepsleep)
+#import esp
+#esp.osdebug(None)
+#import webrepl
+#webrepl.start()

+ 615 - 0
main.py

@@ -0,0 +1,615 @@
+#/ 
+#
+# Note: Requires a build of micropython with ESPNOW support, which I got from
+# https://github.com/glenn20/micropython-espnow-images
+import binascii
+import esp
+import esp32
+import json
+import machine
+import math
+import network
+import random
+import _thread
+import time
+import tm1637
+
+from machine import Pin, PWM
+
+COMM_TIMEOUT = 5 # Prune neighbor if not heard from in 10 seconds
+MIN_DELAY = 2000 # Minimum time for a random start
+MAX_DELAY = 5000 # Maximum time for a random start
+
+PIN_NUMBERS = {
+         "BUZZER": 18,
+         "LED": [ 26, 25, 23, 22, 21 ],
+         "DISPLAY": [ 33, 32 ],
+         "COASTER": 27,
+         "READY": 14,
+         "MODE": 15
+       }
+PINS = {
+         "BUZZER": Pin(PIN_NUMBERS["BUZZER"], Pin.OUT),
+         "LED": [ 
+             Pin(PIN_NUMBERS['LED'][0], Pin.OUT, value=0),
+             Pin(PIN_NUMBERS['LED'][1], Pin.OUT, value=0),
+             Pin(PIN_NUMBERS['LED'][2], Pin.OUT, value=0),
+             Pin(PIN_NUMBERS['LED'][3], Pin.OUT, value=0),
+             Pin(PIN_NUMBERS['LED'][4], Pin.OUT, value=0)
+         ],
+         "COASTER": Pin(PIN_NUMBERS['COASTER'], Pin.IN, Pin.PULL_UP),
+         "READY": Pin(PIN_NUMBERS['READY'], Pin.IN, Pin.PULL_UP),
+         "MODE": Pin(PIN_NUMBERS['MODE'], Pin.IN, Pin.PULL_UP)
+       }
+display = tm1637.TM1637(clk=Pin(PIN_NUMBERS["DISPLAY"][0]), dio=Pin(PIN_NUMBERS["DISPLAY"][1]))
+wlan = network.WLAN(network.STA_IF)
+
+BROADCAST = b'\xff\xff\xff\xff\xff\xff'
+
+msgqueue = []
+neighbors = {}
+neighborlist = []
+abort  = False
+winner = False
+songfinished = False
+
+def initialize():
+    print("Here's what I know about myself:")
+    print('')
+    print(f'\tMAC Address: {wlan.config('mac')}')
+    print('')
+    print(f'\tFrequency:   {machine.freq()}')
+    print(f'\tFlash Size:  {esp.flash_size()}')
+    print(f'\tTemperature: {esp32.raw_temperature()}')
+    print('')
+    print('Current Switch status:')
+    print(f'\tCoaster: { get_switch("COASTER") }')
+    print(f'\t  Ready: { get_switch("READY") }')
+    print(f'\t   Mode: { get_switch("MODE") }')
+    print('')
+    for i in range(5):
+        print(f'\tTesting LED {i}')
+        led(i, True)
+        beep()
+        time.sleep(0.25)
+        led(i, False)
+
+def led(pin, value):
+    PINS["LED"][pin].value(value)
+
+def get_switch(name):
+    return not PINS[name].value()
+
+def am_ready():
+    buttons_ready = get_switch("COASTER") and get_switch("READY")
+    if buttons_ready is False:
+        return False
+    for k, n in neighbors.items():
+        if n['self'] is True:
+            continue
+        if n['neighborlist'] != neighborlist:
+            return False
+    return True
+
+#######################
+# Buzzer Functions
+def tone(freq, delay):
+    beeper = PWM(PINS["BUZZER"], freq=freq, duty=512)
+    time.sleep_ms(delay)
+    beeper.deinit()
+
+def buzz():
+    tone(freq=300, delay=800)
+
+def beep():
+    tone(freq=2000, delay=200)
+
+def test_buzzer():
+    for freq in range(0, 5500, 100):
+        tone(freq=freq, delay=50)
+
+def play_song():
+    global songfinished
+    frequencies = {
+            'c': 262,
+            'd': 294,
+            'e': 330,
+            'f': 349,
+            'g': 392,
+            'a': 440,
+            'b': 494,
+            'C': 523
+        }
+    songs = {
+                'song1': {
+                    'tempo': 118,
+                    'notes': [
+                          ['c', 1],
+                          ['d', 1],
+                          ['f', 1],
+                          ['d', 1],
+                          ['a', 1],
+                          [' ', 1],
+                          ['a', 4],
+                          ['g', 4],
+                          [' ', 2],
+                          ['c', 1],
+                          ['d', 1],
+                          ['f', 1],
+                          ['d', 1],
+                          ['g', 1],
+                          [' ', 1],
+                          ['g', 4],
+                          ['f', 4],
+                          [' ', 2],
+                        ]
+                },
+                'jingle_bells': {
+                    'tempo': 118,
+                    'notes': [
+                          ['e', 1],
+                          ['e', 1],
+                          ['e', 2],
+                          ['e', 1],
+                          ['e', 1],
+                          ['e', 2],
+                          ['e', 1],
+                          ['g', 1],
+                          ['c', 1],
+                          ['d', 1],
+                          ['e', 4],
+                          [' ', 1],
+                          ['f', 1],
+                          ['f', 1],
+                          ['f', 1],
+                          ['f', 1],
+                          ['f', 1],
+                          ['e', 1],
+                          ['e', 1],
+                          ['e', 1],
+                          ['e', 1],
+                          ['d', 1],
+                          ['d', 1],
+                          ['e', 1],
+                          ['d', 2],
+                          ['g', 2]
+                        ]
+                }
+            }
+
+    #s = random.choice(list(songs.keys()))
+    if random.randint(0, 10) == 10: # one in 10 chance, for now
+        s = 'jingle_bells'
+    else:
+        s = 'song1'
+    print(f'Randomly chose song { s }')
+
+    for note in songs[s]['notes']:
+        duration = note[1] * songs[s]['tempo']
+        if note[0] == ' ':
+            #print(f'Rest for { duration }')
+            time.sleep_ms(duration)
+        else:
+            #print(f'Note { note[0] } at { frequencies[note[0]] } for { duration }')
+            tone(frequencies[note[0]], duration)
+        time.sleep_ms(math.floor(songs[s]['tempo'] / 10))
+
+    songfinished = True
+
+
+
+
+
+#int songLength2 = 26;
+#char notes2[] = "eeeeeeegcde fffffeeeeddedg";
+#int beats2[] = { 1, 1, 2, 1, 1, 2, 1, 1, 1, 1, 4, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 2, 2};
+#const int tempo2 = 130;
+
+
+#######################
+# Display  Functions
+def display_counts():
+    nr = count_ready()
+    nc = count_neighbors()
+    digits = [ 0, 0, 0, 0 ]
+
+    if nr > 19 or nc > 19:
+        display.show('OVER')
+        return
+
+    if nr < 10:
+        digits[0] = display.encode_digit(nr)
+    else:
+        digits[0] = display.encode_digit(0x01)
+        digits[1] = display.encode_digit(nr % 10)
+
+    if nc < 10:
+        digits[3] = display.encode_digit(nc)
+    else:
+        digits[2] = display.encode_digit(0x01)
+        digits[3] = display.encode_digit(nc % 10)
+
+    display.write(digits)
+
+
+#######################
+# ESPNow Functions
+def initialize_network():
+    global wlan
+    global espnow
+    global neighbors
+
+    neighbors[mac_to_hex(wlan.config('mac'))] = { 'self': True } # we're our own neighbor
+    neighborlist.append(mac_to_hex(wlan.config('mac')))
+
+    print('Activating WLAN')
+    wlan.active(True)
+
+    print('Activating ESPNow')
+    espnow = esp.espnow.ESPNow()
+    espnow.init()
+    espnow.config(on_recv=espnow_recv_handler)
+    espnow.add_peer(BROADCAST)
+
+
+def espnow_recv_handler(e):
+    # We should add this to a processing queue
+    global msgqueue
+    while(e.poll()):
+        data = e.irecv(0)
+        #neighbor = binascii.hexlify(data[0]).decode('ascii')
+        msg = data[1].decode()
+        msgqueue.append({ 'neighbor': data[0], 'msg': msg })
+        #print(f'Received from { neighbor }: {msg}')
+
+def process_queue():
+    global msgqueue
+    global neighbors
+    global neighborlist
+    global abort
+
+    while(len(msgqueue)>0):
+        item = msgqueue.pop(0)
+        mac = mac_to_hex(item['neighbor'])
+        process_neighbor(mac)
+        try:
+            neighbor_json = json.loads(item["msg"])
+        except:
+            print(f'ERROR: Could not decode message from { mac_to_hex(item["neighbor"]) }: { item["msg"] }')
+
+        msg_type = neighbor_json.get('type', None)
+        if msg_type is None:
+            print(f'ERROR: No message type from { mac_to_hex(item["neighbor"]) }: { item["msg"] }')
+            return False
+        if msg_type == 'GOGOGO':
+            print(f"I'm a peon, and master (or somebody--I don't check) said TIME TO GO!")
+            return True # time to gO!
+        if msg_type == 'ABORT':
+            print(f'Neighbor { mac } disqualified.')
+            abort = True
+            return False
+        if msg_type == 'FINISH':
+            print(f'Neighbor { mac } finished.')
+            neighbors[mac]['finished'] = True
+            neighbors[mac]['time'] = neighbor_json['time']
+            return False
+        if msg_type == 'beacon':
+            neighbors[mac]['neighborlist'] = list(neighbor_json['neighborlist'])
+            neighbors[mac]['mode'] = neighbor_json['mode']
+            # Only neighbors in the same mode as us can be ready
+            if neighbors[mac]['mode'] != get_switch('MODE'): 
+                neighbors[mac]['ready'] = False
+            else:
+                neighbors[mac]['ready'] = neighbor_json['ready']
+
+        doc = json.dumps(neighbor_json)
+        print(f'Received from { mac_to_hex(item["neighbor"]) }        : { doc }')
+    return False # Not time to go
+
+def process_neighbor(mac):
+    if not(mac in neighbors.keys()):
+        print(f'NEW NEIGHBOR: { mac }')
+        neighbors[mac] = { 
+            'self': False,
+            'lastseen': time.time(),
+            'mode': get_switch('MODE'), # Good a guess as any
+            'ready': False
+        }
+        neighborlist.append(mac)
+        neighborlist.sort()
+    neighbors[mac]['lastseen'] = time.time()
+
+def prune_neighbors():
+    for n in list(neighborlist):
+        if neighbors[n]['self'] is True:
+            continue
+        if time.time() - neighbors[n]['lastseen'] > COMM_TIMEOUT:
+            print(f'LOST NEIGHBOR: { n }')
+            neighborlist.remove(n)
+            neighbors.pop(n)
+
+def mac_to_hex(mac):
+    return binascii.hexlify(mac).decode('ascii')
+
+def am_master():
+    # We're the master if we're the first one on the list
+    return neighbors[neighborlist[0]]['self']
+
+last_beacon = time.time()
+def broadcast_beacon():
+    global last_beacon
+    if time.time() - last_beacon < 1:
+        return False
+    last_beacon = time.time()
+    json_beacon = { 
+        'type': 'beacon',
+        'neighborlist': neighborlist,
+        'mode': get_switch("MODE"),
+        'ready': am_ready()
+    }
+    beacon = json.dumps(json_beacon, separators=(',', ':'))
+    #, 'neighbors': neighbors }
+    if(len(beacon)) > 250:
+        display.show('MANY')
+    else:
+        print(f'Sending JSON Broadcast ({len(beacon)} bytes): {beacon}')
+        espnow.send(BROADCAST, beacon, False)
+    return True
+
+
+def count_neighbors():
+    return len(neighborlist)
+
+def count_ready():
+    count = 0
+    for m, i in neighbors.items():
+        if i['self'] and am_ready():
+            count += 1
+        elif i.get('ready', False):
+            count += 1
+    # For testing with only 1:
+    #if count == 1:
+    #    return 2
+    return count
+
+
+##### Sub-Ready Loops
+def call_regularly():
+    broadcast_beacon()
+    time_to_go = process_queue()
+    prune_neighbors()
+
+def get_ready_loop(should_display_counts):
+    print(f'***** Stage 1: Get Ready Loop')
+    time_to_go = False
+    while time_to_go is False: 
+        call_regularly()
+        if should_display_counts or get_switch('READY'):
+            # we only display counts on the first run or once ready
+            # has been pressed. On other runs
+            # we wnat the time and/or disqualification to show
+            should_display_counts = True
+            display_counts()
+        
+        if(get_switch('COASTER')):
+            led(4, 1)
+        else:
+            led(4, 0)
+        
+        # Time to go?
+        if am_master() and count_neighbors() == count_ready():
+            print(f"I'm the master and it's TIME TO GO!")
+            espnow.send(BROADCAST, json.dumps({ 'type': 'GOGOGO' }), False)
+            time_to_go = True
+
+        #time.sleep(1.0)
+
+def sleep_with_coaster_checks(ms):
+    ''' Returns True if there was a disqualifying event '''
+    start = time.ticks_ms()
+    while(time.ticks_diff(time.ticks_ms(), start) < ms):
+        if not get_switch('COASTER'):
+            return True
+    return False
+
+def light_countdown():
+    ''' Countdown the lights, returns True if there was a disqualifying event '''
+    print(f'Mode is Drag Race. Running lights...')
+
+    # Fix for early disqulaification
+    if not get_switch('COASTER'):
+        print('DISQUALIFIED')
+        return True
+
+    for i in range(3):
+        print(f'\t GO LED {i}')
+        led(i, True)
+        beep()
+        if(sleep_with_coaster_checks(500)):
+            print('DISQUALIFIED')
+            return True
+        led(i, False)
+        call_regularly()
+    led(3, 1) # Turn on green
+    led(4, 0) # turn off red
+    return False
+
+
+def random_countdown():
+    delay = random.randint(MIN_DELAY, MAX_DELAY)
+    print(f'Mode is random delay, waiting { delay } ms')
+    start_ticks = time.ticks_ms()
+    led(0, 1)
+    led(1, 1)
+    led(2, 1)
+    led(3, 0)
+    led(4, 1)
+    while( time.ticks_diff(time.ticks_ms(), start_ticks) < delay ):
+        call_regularly()
+        if not get_switch('COASTER'):
+            print('DISQUALIFIED')
+            return True
+    led(0, 0)
+    led(1, 0)
+    led(2, 0)
+    led(3, 1)
+    led(4, 0)
+    return False
+
+def handle_disqualification():
+    global abort
+    espnow.send(BROADCAST, json.dumps({ 'type': 'ABORT' }), False)
+
+    for i in range(5):
+        led(4, 1)
+        display.show('FAUL')
+        buzz()
+        call_regularly()
+        led(4, 0)
+        display.show('    ')
+        time.sleep(0.5)
+        call_regularly()
+    led(4,1)
+
+    abort = True
+
+
+def is_everybody_finished():
+    for mac, state in neighbors.items():
+        if state['self'] is True:
+            continue
+        if state['finished'] is not True:
+            return False
+    return True
+
+
+def did_we_win(time_diff):
+    for mac, state in neighbors.items():
+        if state['self'] is True:
+            continue
+        if state['time'] < time_diff:
+            # somebody else did better
+            return False
+    return True
+
+
+def time_to_go_loop():
+    global abort
+    global winner
+    global songfinished
+
+    display.show(" GO ")
+    beep(); beep(); beep()
+
+    # Reset State of the Game
+    abort = False
+    winner = False
+
+    for mac, state in neighbors.items():
+        state['finished'] = False
+        state['time'] = 0
+
+    time.sleep(1.0)
+    call_regularly()
+
+    if(get_switch('MODE')):
+        disqualified = light_countdown()
+    else:
+        disqualified = random_countdown()
+
+    if disqualified:
+        handle_disqualification()
+        led(0, 1)
+        led(1, 1)
+        led(2, 1)
+        led(3, 0)
+        led(4, 1)
+        return
+    elif abort:
+        # somebody else disqualified
+        led(0, 1)
+        led(1, 0)
+        led(2, 0)
+        led(3, 0)
+        led(4, 1)
+        beep(); beep(); beep();
+        return # Soembody disqualified
+
+    call_regularly()
+
+    # Start go tone:
+    beeper = PWM(PINS["BUZZER"], freq=2000, duty=512)
+    start_time = time.ticks_ms()
+    done = False
+    coaster_lifted = False
+
+    time_diff = 0
+    seconds = 0
+    ms = 0
+
+    while not done:
+        call_regularly()
+        time_diff = time.ticks_diff(time.ticks_ms(), start_time)
+        if time_diff > 1000:
+            beeper.deinit()
+        seconds = int(time_diff / 1000)
+        ms      = int( (time_diff % 1000) / 10 )
+        #print(f' Time is {seconds}.{ms}')
+        display.numbers(seconds, ms)
+
+        # Can't win until coaster's been lifted
+        if not get_switch('COASTER'):
+            coaster_lifted = True
+
+        if coaster_lifted and get_switch('COASTER'):
+            # We've finished, and may or may not have won
+            done = True
+
+    led(3,0) # turn off the green light when finished
+    espnow.send(BROADCAST, json.dumps({ 'type': 'FINISH', 'time': time_diff }), False)
+
+    while not is_everybody_finished():
+        call_regularly()
+
+    # Determine winner
+    # If winner, blink a few times and play a song
+    # If loser, play a sadder song
+    if did_we_win(time_diff):
+        songfinished = False
+        songthread = _thread.start_new_thread(play_song, [])
+        while not songfinished:
+            # thread will change end condition
+            display.show('    ')
+            time.sleep_ms(200)
+            display.numbers(seconds, ms)
+            time.sleep_ms(200)
+            call_regularly()
+    else:
+        buzz()
+
+
+
+
+
+##########################################################################
+##########################################################################
+##########################################################################
+if __name__ == "__main__":
+    initialize_network()
+    initialize()
+
+    should_display_counts = True
+    while True:
+        get_ready_loop(should_display_counts)
+        time_to_go_loop()
+        #test_buzzer()
+        should_display_counts = False
+
+
+#while True:
+#    ###################################################################
+#    # Loop code goes inside the loop here, this is called repeatedly: #
+#    ###################################################################
+#    print(i)
+#    i  = 1
+#    time.sleep(1.0)  # Delay for 1 second.

+ 1 - 0
micropython-tm1637

@@ -0,0 +1 @@
+Subproject commit 0ef0fc04829f6b3d12215a1479d60e2023f3465f

+ 2 - 0
test.py

@@ -0,0 +1,2 @@
+print('Hello world!')
+

+ 1 - 0
tm1637.py

@@ -0,0 +1 @@
+micropython-tm1637/tm1637.py