From 76680c4a209b91d479fc9878cf6adfd84987b316 Mon Sep 17 00:00:00 2001 From: Grady O'Connell Date: Mon, 31 Mar 2025 20:04:35 -0700 Subject: [PATCH 1/2] led light experiment --- midimech.py | 6 +- requirements.txt | 17 +-- settings.ini.example | 2 +- src/constants.py | 1 + src/core.py | 79 +++++++++++--- src/leds.py | 254 +++++++++++++++++++++++++++++++++++++++++++ src/settings.py | 4 +- 7 files changed, 336 insertions(+), 27 deletions(-) create mode 100644 src/leds.py diff --git a/midimech.py b/midimech.py index 2db8b2d..392f6f4 100755 --- a/midimech.py +++ b/midimech.py @@ -32,19 +32,19 @@ try: import launchpad except ImportError: - error("The project dependencies have changed! Run the requirements setup command again!") + print("The project dependencies have changed! Run the requirements setup command again!") try: import yaml except ImportError: - error("The project dependencies have changed! Run the requirements setup command again!") + print("The project dependencies have changed! Run the requirements setup command again!") # import mido try: import musicpy as mp except ImportError: - error("The project dependencies have changed! Run the requirements setup command again!") + print("The project dependencies have changed! Run the requirements setup command again!") def main(): diff --git a/requirements.txt b/requirements.txt index f364c84..21d4152 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,8 +1,11 @@ -pygame-ce -pygame_gui -pyglm -rtmidi2 -launchpad-py +dataclasses==0.6 +launchpad_py==0.9.1 +mido_fix==1.2.12 musicpy -pyyaml -webcolors +pygame-ce==2.5.2 +pygame_gui==0.6.13 +pyglm==2.8.0 +python-i18n==0.3.9 +PyYAML==6.0.2 +rtmidi2==1.4.0 +webcolors==24.11.1 diff --git a/settings.ini.example b/settings.ini.example index e888190..5f0a417 100644 --- a/settings.ini.example +++ b/settings.ini.example @@ -6,7 +6,7 @@ size=128 ;velocity_curve=1.0 ;lights=1,9,9,2,2,3,3,5,8,8,11,11 ;split_lights=4,7,5,7,5,5,7,5,7,5,7,5 -;colors=red,darkred,orange,goldenrod,yellow,green,darkolivegreen,blue,darkslateblue,indigo,darkorchid,pink +colors=red,darkred,orange,goldenrod,yellow,green,darkolivegreen,blue,darkslateblue,indigo,darkorchid,pink ;split_colors=cyan,blue,blue,blue,blue,blue,blue,blue,blue,blue,blue,blue ;mark_light=1 ;mark_color=red diff --git a/src/constants.py b/src/constants.py index a8c4b66..c3f2069 100644 --- a/src/constants.py +++ b/src/constants.py @@ -13,6 +13,7 @@ GRAY = ivec3(16 * BRIGHTNESS) BORDER_COLOR = ivec3(48) DARK = ivec3(0) +BLACK = ivec3(0) BASE_OFFSET = -4 # linnstrument # CHORD_ANALYZER = get_option(opts,'chord_analyzer',False) EPSILON = 0.0001 diff --git a/src/core.py b/src/core.py index 930d754..8f21853 100644 --- a/src/core.py +++ b/src/core.py @@ -7,6 +7,7 @@ from dataclasses import dataclass from glm import ivec2, vec2, ivec3, vec3 import time +import colorsys from src.util import * from src.constants import * @@ -1419,19 +1420,29 @@ def __init__(self): self.options.octave_separation = get_option(opts, "octave_separation", DEFAULT_OPTIONS.octave_separation) self.options.octave_split = get_option(opts, "octave_split", DEFAULT_OPTIONS.octave_split) + self.options.width = get_option(opts, "width", 0) + self.options.height = get_option(opts, "height", 0) + + self.split_point = None + hardware_split = False - self.options.size = get_option(opts, "size", DEFAULT_OPTIONS.size) - if self.options.size == 128: - self.options.width = 16 - self.split_point = None - elif self.options.size == 200: - self.options.width = 25 - self.split_point = 11 - hardware_split = True - elif self.options.size < 0: # test hardware split - self.options.width = 16 - self.split_point = -self.options.size - hardware_split = True + self.split_point = 11 + if self.options.width<=0 or self.options.height<=0: + self.options.size = get_option(opts, "size", DEFAULT_OPTIONS.size) + if self.options.size == 200: + self.options.width = 25 + self.options.height = 8 + self.split_point = 11 + hardware_split = False + elif self.options.size < 0: # test hardware split + self.options.width = 16 + self.options.height = 8 + self.split_point = -self.options.size + hardware_split = True + else: # if self.options.size == 128: + self.options.width = 16 + self.options.height = 8 + self.split_point = None # Note: The default below is what is determined by size above. # Overriding hardware_split is only useful for 128 user testing 200 behavior @@ -1477,10 +1488,11 @@ def __init__(self): self.scale = vec2(64.0) self.board_w = self.options.width + self.board_h = self.options.height self.board_sz = ivec2(self.board_w, self.board_h) self.screen_w = self.board_w * self.scale.x self.screen_h = self.board_h * self.scale.y + self.menu_sz + self.status_sz - self.button_sz = self.screen_w / self.board_w + self.button_sz = 64 self.screen_sz = ivec2(self.screen_w, self.screen_h) self.lowest_note = None # x,y location of lowest note currently pressed @@ -1823,6 +1835,14 @@ def __init__(self): self.setup_rpn() # self.test() + self.leds = None + # try: + # from src.leds import LEDGridInterface + # self.leds = LEDGridInterface() + # print("LEDs initialized") + # except Exception as e: + # print("LEDs not initialized: ", e) + def midi_mode_rpn(self, on=True): if on: self.rpn(0, 1 if self.is_mpe() else 0) @@ -2357,6 +2377,10 @@ def render(self): sz = self.screen_w / self.board_w y = 0 rad = int(sz // 2 - 8) + T = pygame.time.get_ticks()/1000 + + if self.leds: + self.leds.clear(BLACK) for row in self.board: x = 0 @@ -2376,12 +2400,36 @@ def render(self): # else: # col = self.get_color(x, y) lit_col = ivec3(255, 0, 0) - unlit_col = copy.copy(self.get_color(x, y) or ivec3(0)) + col = self.get_color(x, y) or ivec3(0) + unlit_col = copy.copy(col) black = unlit_col == ivec3(0) inner_col = copy.copy(unlit_col) for i in range(len(unlit_col)): unlit_col[i] = min(255, unlit_col[i] * 1.5) + if self.leds: + if cell: + lit = glm.vec3(1,1,1) + self.leds.put(lit, x, y) + # self.leds.put(red, x*2+1, y*2) + # self.leds.put(red, x*2+1, y*2+1) + # self.leds.put(red, x*2, y*2+1) + else: + colf = glm.vec3(col.x / 255, col.y / 255, col.z / 255) + # # change saturation + # r, g, b = colf.x, colf.y, colf.z + # h, s, v = colorsys.rgb_to_hsv(r, g, b) + # # if v > 0.1: + # # h = (h + T * 0.1) % 1.0 + # # h, s, v = color_grade_hsv((h, s, v)) + # r, g, b = colorsys.hsv_to_rgb(h, s, v) + # colf = glm.vec3(r, g, b) + + self.leds.put(colf, x, y) + # self.leds.put(colf, x*2+1, y*2) + # self.leds.put(colf / 2, x*2+1, y*2+1) + # self.leds.put(colf / 2, x*2, y*2+1) + ry = y + self.menu_sz # real y # pygame.gfxdraw.box(self.screen.surface, [x*sz + b, self.menu_sz + y*sz + b, sz - b, sz - b], unlit_col) rect = [x * sz + b, self.menu_sz + y * sz + b, sz - b, sz - b] @@ -2476,6 +2524,9 @@ def render(self): x += 1 y += 1 + if self.leds: + self.leds.draw() + # if self.gamepad: # pos = self.gamepad.positions() # # gp_pos.y = self.board_h - y - 1 diff --git a/src/leds.py b/src/leds.py new file mode 100644 index 0000000..d9af753 --- /dev/null +++ b/src/leds.py @@ -0,0 +1,254 @@ +import io +import struct +import glm +import time +import sys +import multiprocessing +from queue import Empty, Full + +def led_grid_worker(command_queue, width=24, height=16): + import xled + class LEDGridDisplay: + def __init__(self, width=24, height=16): + self.WIDTH = width + self.HEIGHT = height + self.SIZE = width * height + self.BMP = [None] * self.SIZE + self.SCREEN = [glm.vec3(0, 0, 0) for _ in range(self.SIZE)] + self.ORIENT = [0] * self.SIZE + + # Colors + self.RED = glm.vec3(1, 0, 0) + self.GREEN = glm.vec3(0, 1, 0) + self.BLUE = glm.vec3(0, 0, 1) + self.BLACK = glm.vec3(0, 0, 0) + self.WHITE = glm.vec3(1, 1, 1) + + # Grid configuration + self.GRID_SIZE = 8 + self.GRIDS_WIDE = 3 + self.GRIDS_HIGH = 2 + + # Initialize device + try: + dev = xled.discover.discover() + self.host = dev.ip_address + print(f"Connected to device at {self.host}") + self.ctr = xled.ControlInterface(self.host) + self.ctr.set_mode("rt") + except: + print("No device found, running in simulation mode") + self.ctr = None + + # Block configuration + self.BLOCK_FLAGS = [set() for _ in range(6)] + self.LBLOCK = [0] * 6 + self.PBLOCK = [0] * 6 + self._setup_block_config() + self._setup_orientations() + + def _setup_block_config(self): + # Physical block flags + self.BLOCK_FLAGS[0].add('h') + self.BLOCK_FLAGS[1].add('h') + self.BLOCK_FLAGS[2].add('h') + self.BLOCK_FLAGS[3].add('v') + self.BLOCK_FLAGS[4].add('v') + self.BLOCK_FLAGS[5].add('v') + + # Logical to Physical block mapping + self.LBLOCK[0], self.PBLOCK[3] = 3, 0 + self.LBLOCK[1], self.PBLOCK[4] = 4, 1 + self.LBLOCK[2], self.PBLOCK[5] = 5, 2 + self.LBLOCK[3], self.PBLOCK[2] = 2, 3 + self.LBLOCK[4], self.PBLOCK[1] = 1, 4 + self.LBLOCK[5], self.PBLOCK[0] = 0, 5 + + def _setup_orientations(self): + r = 0 + for lblock in range(6): + pblock = self.PBLOCK[lblock] + H = 'h' in self.BLOCK_FLAGS[pblock] + V = 'v' in self.BLOCK_FLAGS[pblock] + for i in range(64): + x = i % 8 + y = i // 8 + flip_row = (y % 2 == 0) + if H: + flip_row = not flip_row + if V: + flip_row = not flip_row + if flip_row: + self.ORIENT[r] = (pblock * 64) + (x + (8 * (7-y)) if V else x + (8 * y)) + else: + self.ORIENT[r] = (pblock * 64) + ((7-x) + (8 * (7-y)) if V else (7-x) + (8 * y)) + r += 1 + + def encode_frame(self, frame): + for i in range(self.SIZE): + px = frame[i] + r = glm.clamp(int(px[0] * 255), 0, 255) + g = glm.clamp(int(px[1] * 255), 0, 255) + b = glm.clamp(int(px[2] * 255), 0, 255) + self.BMP[i] = struct.pack(">BBB", r, g, b) + frame_io = io.BytesIO() + frame_io.write(b"".join(self.BMP)) + frame_io.seek(0) + return frame_io + + def clear(self, color): + for i in range(self.SIZE): + self.SCREEN[i] = color + + def draw(self): + frame = self.encode_frame(self.SCREEN) + self.ctr.set_rt_frame_rest(frame) + + def transform(self, i): + total_width = self.GRIDS_WIDE * self.GRID_SIZE + global_x = i % total_width + global_y = i // total_width + grid_num_x = global_x // self.GRID_SIZE + grid_num_y = global_y // self.GRID_SIZE + grid_num = grid_num_y * self.GRIDS_WIDE + grid_num_x + local_x = global_x % self.GRID_SIZE + local_y = global_y % self.GRID_SIZE + grid_index = local_y * self.GRID_SIZE + local_x + return grid_index + 64 * grid_num + + def transform_xy(self, x, y): + p = self.transform(x + (y * self.WIDTH)) + return glm.ivec2(p % self.WIDTH, p // self.WIDTH) + + def put(self, color, x, y): + if y < 0 or y >= self.HEIGHT: + return + if x < 0 or x >= self.WIDTH: + return + i = x + (y * self.WIDTH) + ii = self.ORIENT[self.transform(i)] + self.SCREEN[ii] = color + + def test_pattern(self): + # Print coordinate mapping + # for j in range(self.HEIGHT): + # for i in range(self.WIDTH): + # t = self.transform_xy(i, j) + # print(f"{i},{j} {t.x},{t.y}", end=' ') + # print() + + # Test display + for i in range(self.SIZE): + pos = glm.ivec2(i % self.WIDTH, i // self.WIDTH) + self.clear(self.BLACK) + self.put(self.RED, pos.x, pos.y) + self.draw() + # time.sleep(0.001) + + display = LEDGridDisplay(width, height) + + last_draw_time = 0 + DRAW_INTERVAL = 0.25 + dirty = False + + while True: + try: + command, args = command_queue.get(timeout=0.1) + if command == "put": + display.put(*args) + elif command == "clear": + display.clear(*args) + elif command == "draw": + # Ignore explicit draw commands; we'll handle it in the loop + dirty = True + + # Check if it's time to draw + if dirty: + current_time = time.time() + if current_time - last_draw_time >= DRAW_INTERVAL: + display.draw() + last_draw_time = current_time + dirty = False + + except Empty: + # Still check for drawing even if queue is empty + if dirty: + current_time = time.time() + if current_time - last_draw_time >= DRAW_INTERVAL: + display.draw() + last_draw_time = current_time + dirty = False + time.sleep(0.01) # Avoid busy-waiting + except Exception as e: + print(f"Worker: Error processing command: {e}") + # last_draw_time = time.time() + + # Process commands from the queue + while True: + try: + command, args = command_queue.get(timeout=1) + if command == "put": + display.put(*args) + elif command == "clear": + display.clear(*args) + elif command == "draw": + display.draw() + except Empty: + continue + +class LEDGridInterface: + def __init__(self, width=24, height=16): + self.command_queue = multiprocessing.Queue(maxsize=10) + self.process = multiprocessing.Process( + target=led_grid_worker, + args=(self.command_queue, width, height), + daemon=True + ) + self.process.start() + time.sleep(1) + + def put(self, color, x, y): + try: + self.command_queue.put(("put", (color, x, y))) + except Full: + return False + return True + + def clear(self, color): + try: + self.command_queue.put(("clear", (color,))) + except Full: + return False + return True + + def draw(self): + try: + self.command_queue.put(("draw", ())) + except Full: + return False + return True + +# Example usage +if __name__ == "__main__": + # Create the interface + display = LEDGridInterface() + + # Test pattern equivalent + RED = glm.vec3(1, 0, 0) + BLACK = glm.vec3(0, 0, 0) + WHITE = glm.vec3(1, 1, 1) + + for i in range(24 * 16): # SIZE = WIDTH * HEIGHT + pos = glm.ivec2(i % 24, i // 24) + display.clear(BLACK) + display.put(RED, pos.x, pos.y) + display.draw() + time.sleep(0.001) # Small delay to simulate original timing + + display.clear(WHITE) + display.draw() + + # Give some time for the process to handle commands + time.sleep(1) + print("Main process done") + diff --git a/src/settings.py b/src/settings.py index c6eef24..b0538d6 100644 --- a/src/settings.py +++ b/src/settings.py @@ -80,8 +80,8 @@ class Settings: size: int = 128 # Right now these are calculated from size, don't use - width: int = 16 - height: int = 8 + width: int = 0 + height: int = 0 # launchpad viberato method (off, mod, or pitch) vibrato: str = 'mod' From 0c896b1603a3af43cda7d3b35e3dab7353a46c20 Mon Sep 17 00:00:00 2001 From: Grady O'Connell Date: Sat, 12 Apr 2025 15:42:33 -0700 Subject: [PATCH 2/2] LED experiment (to be moved to plugin) --- src/core.py | 21 ++- src/leds.py | 456 +++++++++++++++++++++++++++------------------------- 2 files changed, 252 insertions(+), 225 deletions(-) diff --git a/src/core.py b/src/core.py index 8f21853..45684c3 100644 --- a/src/core.py +++ b/src/core.py @@ -18,6 +18,11 @@ from src.articulation import Articulation # from src.gamepad import Gamepad +LEDS = False +if LEDS: + from src.leds import LEDGridDisplay + print(LEDGridDisplay) + with open(os.devnull, "w") as devnull: # suppress pygame messages (to keep console output clean) stdout = sys.stdout @@ -1836,12 +1841,12 @@ def __init__(self): # self.test() self.leds = None - # try: - # from src.leds import LEDGridInterface - # self.leds = LEDGridInterface() - # print("LEDs initialized") - # except Exception as e: - # print("LEDs not initialized: ", e) + if LEDS: + try: + self.leds = LEDGridDisplay() + print("LEDs initialized") + except Exception as e: + print("LEDs not initialized: ", e) def midi_mode_rpn(self, on=True): if on: @@ -2410,7 +2415,7 @@ def render(self): if self.leds: if cell: lit = glm.vec3(1,1,1) - self.leds.put(lit, x, y) + self.leds.put(lit, x, y, 2) # self.leds.put(red, x*2+1, y*2) # self.leds.put(red, x*2+1, y*2+1) # self.leds.put(red, x*2, y*2+1) @@ -2425,7 +2430,7 @@ def render(self): # r, g, b = colorsys.hsv_to_rgb(h, s, v) # colf = glm.vec3(r, g, b) - self.leds.put(colf, x, y) + self.leds.put(colf, x, y, 2) # self.leds.put(colf, x*2+1, y*2) # self.leds.put(colf / 2, x*2+1, y*2+1) # self.leds.put(colf / 2, x*2, y*2+1) diff --git a/src/leds.py b/src/leds.py index d9af753..569d1ed 100644 --- a/src/leds.py +++ b/src/leds.py @@ -6,232 +6,254 @@ import multiprocessing from queue import Empty, Full -def led_grid_worker(command_queue, width=24, height=16): - import xled - class LEDGridDisplay: - def __init__(self, width=24, height=16): - self.WIDTH = width - self.HEIGHT = height - self.SIZE = width * height - self.BMP = [None] * self.SIZE - self.SCREEN = [glm.vec3(0, 0, 0) for _ in range(self.SIZE)] - self.ORIENT = [0] * self.SIZE - - # Colors - self.RED = glm.vec3(1, 0, 0) - self.GREEN = glm.vec3(0, 1, 0) - self.BLUE = glm.vec3(0, 0, 1) - self.BLACK = glm.vec3(0, 0, 0) - self.WHITE = glm.vec3(1, 1, 1) - - # Grid configuration - self.GRID_SIZE = 8 - self.GRIDS_WIDE = 3 - self.GRIDS_HIGH = 2 - - # Initialize device - try: - dev = xled.discover.discover() - self.host = dev.ip_address - print(f"Connected to device at {self.host}") - self.ctr = xled.ControlInterface(self.host) - self.ctr.set_mode("rt") - except: - print("No device found, running in simulation mode") - self.ctr = None - - # Block configuration - self.BLOCK_FLAGS = [set() for _ in range(6)] - self.LBLOCK = [0] * 6 - self.PBLOCK = [0] * 6 - self._setup_block_config() - self._setup_orientations() - - def _setup_block_config(self): - # Physical block flags - self.BLOCK_FLAGS[0].add('h') - self.BLOCK_FLAGS[1].add('h') - self.BLOCK_FLAGS[2].add('h') - self.BLOCK_FLAGS[3].add('v') - self.BLOCK_FLAGS[4].add('v') - self.BLOCK_FLAGS[5].add('v') - - # Logical to Physical block mapping - self.LBLOCK[0], self.PBLOCK[3] = 3, 0 - self.LBLOCK[1], self.PBLOCK[4] = 4, 1 - self.LBLOCK[2], self.PBLOCK[5] = 5, 2 - self.LBLOCK[3], self.PBLOCK[2] = 2, 3 - self.LBLOCK[4], self.PBLOCK[1] = 1, 4 - self.LBLOCK[5], self.PBLOCK[0] = 0, 5 - - def _setup_orientations(self): - r = 0 - for lblock in range(6): - pblock = self.PBLOCK[lblock] - H = 'h' in self.BLOCK_FLAGS[pblock] - V = 'v' in self.BLOCK_FLAGS[pblock] - for i in range(64): - x = i % 8 - y = i // 8 - flip_row = (y % 2 == 0) - if H: - flip_row = not flip_row - if V: - flip_row = not flip_row - if flip_row: - self.ORIENT[r] = (pblock * 64) + (x + (8 * (7-y)) if V else x + (8 * y)) - else: - self.ORIENT[r] = (pblock * 64) + ((7-x) + (8 * (7-y)) if V else (7-x) + (8 * y)) - r += 1 - - def encode_frame(self, frame): - for i in range(self.SIZE): - px = frame[i] - r = glm.clamp(int(px[0] * 255), 0, 255) - g = glm.clamp(int(px[1] * 255), 0, 255) - b = glm.clamp(int(px[2] * 255), 0, 255) - self.BMP[i] = struct.pack(">BBB", r, g, b) - frame_io = io.BytesIO() - frame_io.write(b"".join(self.BMP)) - frame_io.seek(0) - return frame_io - - def clear(self, color): - for i in range(self.SIZE): - self.SCREEN[i] = color - - def draw(self): - frame = self.encode_frame(self.SCREEN) - self.ctr.set_rt_frame_rest(frame) - - def transform(self, i): - total_width = self.GRIDS_WIDE * self.GRID_SIZE - global_x = i % total_width - global_y = i // total_width - grid_num_x = global_x // self.GRID_SIZE - grid_num_y = global_y // self.GRID_SIZE - grid_num = grid_num_y * self.GRIDS_WIDE + grid_num_x - local_x = global_x % self.GRID_SIZE - local_y = global_y % self.GRID_SIZE - grid_index = local_y * self.GRID_SIZE + local_x - return grid_index + 64 * grid_num - - def transform_xy(self, x, y): - p = self.transform(x + (y * self.WIDTH)) - return glm.ivec2(p % self.WIDTH, p // self.WIDTH) - - def put(self, color, x, y): - if y < 0 or y >= self.HEIGHT: - return - if x < 0 or x >= self.WIDTH: - return - i = x + (y * self.WIDTH) - ii = self.ORIENT[self.transform(i)] - self.SCREEN[ii] = color - - def test_pattern(self): - # Print coordinate mapping - # for j in range(self.HEIGHT): - # for i in range(self.WIDTH): - # t = self.transform_xy(i, j) - # print(f"{i},{j} {t.x},{t.y}", end=' ') - # print() - - # Test display - for i in range(self.SIZE): - pos = glm.ivec2(i % self.WIDTH, i // self.WIDTH) - self.clear(self.BLACK) - self.put(self.RED, pos.x, pos.y) - self.draw() - # time.sleep(0.001) - - display = LEDGridDisplay(width, height) - - last_draw_time = 0 - DRAW_INTERVAL = 0.25 - dirty = False - - while True: +class LEDGridDisplay: + def __init__(self, width=32, height=16): + import xled + self.WIDTH = width + self.HEIGHT = height + self.SIZE = width * height + self.BMP = [None] * self.SIZE + self.SCREEN = [glm.vec3(0, 0, 0) for _ in range(self.SIZE)] + self.ORIENT = [0] * self.SIZE + + # Colors + self.RED = glm.vec3(1, 0, 0) + self.GREEN = glm.vec3(0, 1, 0) + self.BLUE = glm.vec3(0, 0, 1) + self.BLACK = glm.vec3(0, 0, 0) + self.WHITE = glm.vec3(1, 1, 1) + + # Grid configuration + self.GRID_SIZE = 8 # subgrid is always 8 on device + self.GRIDS_WIDE = 4 + self.GRIDS_HIGH = 2 + self.NUM_GRIDS = self.GRIDS_WIDE * self.GRIDS_HIGH + + # Initialize device try: - command, args = command_queue.get(timeout=0.1) - if command == "put": - display.put(*args) - elif command == "clear": - display.clear(*args) - elif command == "draw": - # Ignore explicit draw commands; we'll handle it in the loop - dirty = True - - # Check if it's time to draw - if dirty: - current_time = time.time() - if current_time - last_draw_time >= DRAW_INTERVAL: - display.draw() - last_draw_time = current_time - dirty = False - - except Empty: - # Still check for drawing even if queue is empty - if dirty: - current_time = time.time() - if current_time - last_draw_time >= DRAW_INTERVAL: - display.draw() - last_draw_time = current_time - dirty = False - time.sleep(0.01) # Avoid busy-waiting - except Exception as e: - print(f"Worker: Error processing command: {e}") - # last_draw_time = time.time() - - # Process commands from the queue - while True: - try: - command, args = command_queue.get(timeout=1) - if command == "put": - display.put(*args) - elif command == "clear": - display.clear(*args) - elif command == "draw": - display.draw() - except Empty: - continue - -class LEDGridInterface: - def __init__(self, width=24, height=16): - self.command_queue = multiprocessing.Queue(maxsize=10) - self.process = multiprocessing.Process( - target=led_grid_worker, - args=(self.command_queue, width, height), - daemon=True - ) - self.process.start() - time.sleep(1) - - def put(self, color, x, y): - try: - self.command_queue.put(("put", (color, x, y))) - except Full: - return False - return True + dev = xled.discover.discover() + self.host = dev.ip_address + print(f"Connected to device at {self.host}") + self.ctr = xled.ControlInterface(self.host) + self.ctr.set_mode("rt") + except Exception as ex: + print(ex) + + # Block configuration + self.BLOCK_FLAGS = [None] * self.NUM_GRIDS + for i in range(self.NUM_GRIDS): + self.BLOCK_FLAGS[i] = set() + self.LBLOCK = [0] * self.NUM_GRIDS + self.PBLOCK = [0] * self.NUM_GRIDS + self._setup_block_config() + self._setup_orientations() + + + def _setup_block_config(self): + # # Physical block flags + self.BLOCK_FLAGS[0].add('h') + self.BLOCK_FLAGS[1].add('h') + self.BLOCK_FLAGS[2].add('h') + self.BLOCK_FLAGS[3].add('h') + self.BLOCK_FLAGS[4].add('v') + self.BLOCK_FLAGS[5].add('v') + self.BLOCK_FLAGS[6].add('v') + self.BLOCK_FLAGS[7].add('h') + + # Logical to Physical block mapping + self.PBLOCK[0] = 7 + self.PBLOCK[1] = 6 + self.PBLOCK[2] = 5 + self.PBLOCK[3] = 4 + self.PBLOCK[4] = 0 + self.PBLOCK[5] = 1 + self.PBLOCK[6] = 2 + self.PBLOCK[7] = 3 + + def _setup_orientations(self): + r = 0 + for lblock in range(8): + pblock = self.PBLOCK[lblock] + H = 'h' in self.BLOCK_FLAGS[pblock] + V = 'v' in self.BLOCK_FLAGS[pblock] + for i in range(64): + x = i % 8 + y = i // 8 + flip_row = (y % 2 == 0) + if H: + flip_row = not flip_row + if V: + flip_row = not flip_row + if flip_row: + self.ORIENT[r] = (pblock * 64) + (x + (8 * (7-y)) if V else x + (8 * y)) + else: + self.ORIENT[r] = (pblock * 64) + ((7-x) + (8 * (7-y)) if V else (7-x) + (8 * y)) + r += 1 + + def encode_frame(self, frame): + for i in range(self.SIZE): + px = frame[i] + r = glm.clamp(int(px[0] * 255), 0, 255) + g = glm.clamp(int(px[1] * 255), 0, 255) + b = glm.clamp(int(px[2] * 255), 0, 255) + self.BMP[i] = struct.pack(">BBB", r, g, b) + frame_io = io.BytesIO() + frame_io.write(b"".join(self.BMP)) + frame_io.seek(0) + return frame_io def clear(self, color): - try: - self.command_queue.put(("clear", (color,))) - except Full: - return False - return True + for i in range(self.SIZE): + self.SCREEN[i] = color def draw(self): - try: - self.command_queue.put(("draw", ())) - except Full: - return False - return True + frame = self.encode_frame(self.SCREEN) + self.ctr.set_rt_frame_rest(frame) + + def transform(self, i): + total_width = self.GRIDS_WIDE * self.GRID_SIZE + global_x = i % total_width + global_y = i // total_width + grid_num_x = global_x // self.GRID_SIZE + grid_num_y = global_y // self.GRID_SIZE + grid_num = grid_num_y * self.GRIDS_WIDE + grid_num_x + local_x = global_x % self.GRID_SIZE + local_y = global_y % self.GRID_SIZE + grid_index = local_y * self.GRID_SIZE + local_x + return grid_index + 64 * grid_num + + def transform_xy(self, x, y): + p = self.transform(x + (y * self.WIDTH)) + return glm.ivec2(p % self.WIDTH, p // self.WIDTH) + + # def put(self, color, x, y): + # if y < 0 or y >= self.HEIGHT: + # return + # if x < 0 or x >= self.WIDTH: + # return + # i = x + (y * self.WIDTH) + # ii = self.ORIENT[self.transform(i)] + # self.SCREEN[ii] = color + + def put(self, color, x, y, size=1): + x *= size + y *= size + for yofs in range(size): + for xofs in range(size): + if y < 0 or y >= self.HEIGHT: + continue + if x < 0 or x >= self.WIDTH: + continue + i = x + xofs + (y+yofs) * self.WIDTH + try: + ii = self.ORIENT[self.transform(i)] + except Exception as ex: + continue + self.SCREEN[ii] = color + + def test_pattern(self): + # Print coordinate mapping + # for j in range(self.HEIGHT): + # for i in range(self.WIDTH): + # t = self.transform_xy(i, j) + # print(f"{i},{j} {t.x},{t.y}", end=' ') + # print() + + # Test display + for i in range(self.SIZE): + pos = glm.ivec2(i % self.WIDTH, i // self.WIDTH) + self.clear(self.BLACK) + self.put(self.RED, pos.x, pos.y) + self.draw() + # time.sleep(0.001) + +# display = LEDGridDisplay(width, height) + +# last_draw_time = 0 +# DRAW_INTERVAL = 0.25 +# dirty = False + +# while True: +# try: +# command, args = command_queue.get(timeout=0.1) +# if command == "put": +# display.put(*args) +# elif command == "clear": +# display.clear(*args) +# elif command == "draw": +# # Ignore explicit draw commands; we'll handle it in the loop +# dirty = True + +# # Check if it's time to draw +# if dirty: +# current_time = time.time() +# if current_time - last_draw_time >= DRAW_INTERVAL: +# display.draw() +# last_draw_time = current_time +# dirty = False + +# except Empty: +# # Still check for drawing even if queue is empty +# if dirty: +# current_time = time.time() +# if current_time - last_draw_time >= DRAW_INTERVAL: +# display.draw() +# last_draw_time = current_time +# dirty = False +# time.sleep(0.01) # Avoid busy-waiting +# except Exception as e: +# print(f"Worker: Error processing command: {e}") +# # last_draw_time = time.time() + +# # Process commands from the queue +# while True: +# try: +# command, args = command_queue.get(timeout=1) +# if command == "put": +# display.put(*args) +# elif command == "clear": +# display.clear(*args) +# elif command == "draw": +# display.draw() +# except Empty: +# continue + +# class LEDGridInterface: +# def __init__(self, width=24, height=16): +# self.command_queue = multiprocessing.Queue(maxsize=10) +# self.process = multiprocessing.Process( +# target=led_grid_worker, +# args=(self.command_queue, width, height), +# daemon=True +# ) +# self.process.start() +# time.sleep(1) + +# def put(self, color, x, y): +# try: +# self.command_queue.put(("put", (color, x, y))) +# except Full: +# return False +# return True + +# def clear(self, color): +# try: +# self.command_queue.put(("clear", (color,))) +# except Full: +# return False +# return True + +# def draw(self): +# try: +# self.command_queue.put(("draw", ())) +# except Full: +# return False +# return True # Example usage if __name__ == "__main__": # Create the interface - display = LEDGridInterface() + display = LEDGridDisplay() # Test pattern equivalent RED = glm.vec3(1, 0, 0)