Scripts for G1 LEDs
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
G1_LEDs/led-udp.py

134 lines
4.2 KiB

#!/usr/bin/env python3
import logging
import socket
import threading
import time
import neopixel
logging.basicConfig(level=logging.DEBUG)
NUMBER_OF_PIXELS = 776
LISTEN_PORTS = [2811, 2812, 2813]
UDP_TIMEOUT = 1
def encode_24bit(white, red, green, blue):
return (white << 24) | (green << 16) | (red << 8) | blue
class NetworkThread(threading.Thread):
def __init__(self, port):
self.port = port
self.frame = [0] * NUMBER_OF_PIXELS
self.last_receive = 0
self.last_address = None
threading.Thread.__init__(self)
def __repr__(self):
if self.last_address:
return '<NetworkThread port={0} client={1[0]}:{1[1]}>'.format(self.port, self.last_address)
else:
return '<NetworkThread port={0}>'.format(self.port, self.last_address)
def is_active(self):
if time.time() - self.last_receive < UDP_TIMEOUT:
return True
def run(self):
while True:
try:
self.loop()
except:
logging.exception('{}'.format(self))
def loop(self):
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.bind(("0.0.0.0", self.port))
while True:
data, address = sock.recvfrom(4000)
if address != self.last_address:
if self.is_active():
#print('!', end='', flush=True)
continue
else:
#logging.info('{} streaming from {}'.format(self, address))
self.last_address = address
if len(data) > 0:
if data[0] == 0x01:
# single colour
r, g, b = data[1:4]
value = encode_24bit(0, r, g, b)
for pixel in range(0, len(self.frame)):
self.frame[pixel] = value
self.last_receive = time.time()
#print('1', end='', flush=True)
elif data[0] == 0x03:
# full frame
pos = 1
pixel = 0
while pos + 2 < len(data):
r, g, b = data[pos:pos+3]
self.frame[pixel] = encode_24bit(0, r, g, b)
pixel = pixel + 1
pos = pos + 3
self.last_receive = time.time()
#print('3', end='', flush=True)
elif data[0] == 0x04:
# partial frame + render
pixel = (data[1] << 8) + data[2]
pos = 3
while pos + 2 < len(data):
r, g, b = data[pos:pos+3]
self.frame[pixel] = encode_24bit(0, r, g, b)
pixel = pixel + 1
pos = pos + 3
self.last_receive = time.time()
#print('4', end='', flush=True)
elif data[0] == 0x05:
# partial frame + render
pixel = (data[1] << 8) + data[2]
pos = 3
while pos + 2 < len(data):
r, g, b = data[pos:pos+3]
self.frame[pixel] = encode_24bit(0, r, g, b)
pixel = pixel + 1
pos = pos + 3
self.last_receive = time.time()
#print('5', end='', flush=True)
threadlist = []
for port in reversed(LISTEN_PORTS):
n = NetworkThread(port)
n.daemon = True
n.start()
threadlist.append(n)
strip = neopixel.Adafruit_NeoPixel(NUMBER_OF_PIXELS, 18, 800000, 5, False, 255)
strip.begin()
current_thread = None
while True:
found = False
for t in threadlist:
if t.is_active():
if current_thread is not t:
current_thread = t
print('thread: {}'.format(current_thread))
for i in range(0, len(t.frame)):
strip.setPixelColor(i, t.frame[i])
strip.show()
found = True
break
if not found and current_thread is not None:
current_thread = None
print('thread: {}'.format(current_thread))