Skip to content
Open

Nso #92

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,6 @@ __pycache__
!.readthedocs.yaml
*high_score*
chocolate-doom
site
site
*.pio
*.vscode
Binary file added demos/all_on/.main.py.swo
Binary file not shown.
39 changes: 39 additions & 0 deletions demos/all_on/main.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
class AllOn:
"""This demo turns all the segments on"""

demo_time = None

# User input is passed through input_queue
# Game output is passed through output_queue
# Screen updates are done through the screen object
def __init__(self, input_queue, output_queue, screen):
"""
Constructor

Args:
input_queue (Queue): Queue for user input
output_queue (Queue): Queue for game output
screen (Screen): Screen object
"""
# Provide the framerate in frames/seconds and the amount of time of the demo in seconds
self.frame_rate = 50

self.input_queue = input_queue
self.output_queue = output_queue
self.screen = screen
# init demo/game specific variables here

def run(self):
"""Run the demo"""
# Create generator here
for column in range(0, self.screen.x_width):
for row in range(0, self.screen.y_height):
self.screen.draw_pixel(column, row, 0xF, combine=True)

self.screen.push()
while True:
yield

def stop(self):
"""Reset the state of the demo if needed, else leave blank"""
pass
2 changes: 1 addition & 1 deletion demos/netlab_flag/main.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
class NetlabFlag:
"""This demo puts a cool netlab logo along with some checker pattern on the screen"""

demo_time = 15
demo_time = None

# User input is passed through input_queue
# Game output is passed through output_queue
Expand Down
210 changes: 210 additions & 0 deletions demos/prusa/main.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,210 @@
import os

import requests
import yaml


class Prusa:
"""
This is not a game, but a demo that shows the status of a Prusa printer
using the PrusaLink API. It is designed for an 'SSSMini' which is a 1x1 screen.
To use this demo, you need to have a Prusa printer with PrusaLink enabled
and the API key set up in the config.yaml file.
Example config.yaml:
prusalink_ip: "<PRUSA_LINK_IP>"
api_key: "<API_KEY>"
You can also add the service file to your systemd services to run it automatically.
Ensure that your system is connected to the same network as the Prusa printer,
and you have a connection to the screen.
"""

demo_time = None # Number of seconds or None if its game

def __init__(self, input_queue, output_queue, screen):
"""
Constructor

Args:
input_queue (Queue): Queue for user input
output_queue (Queue): Queue for game output
screen (Screen): Screen object
"""
# Provide the framerate in frames/seconds and the amount of time of the demo in seconds
self.frame_rate = 0.5

self.input_queue = input_queue
self.output_queue = output_queue
self.screen = screen

# init demo/game specific variables here
config_path = os.path.join(os.path.dirname(__file__), "config.yaml")
with open(config_path, "r") as f:
config = yaml.safe_load(f)
self.prusalink_ip = config.get("prusalink_ip")
self.api_key = config.get("api_key")
self.URL = f"http://{self.prusalink_ip}/api/v1/job"
self.HEADERS = {
"X-Api-Key": self.api_key,
}
self.response = None
self.data = None
self.state = "N/A"
self.filename = "N/A"
self.progress = 0
self.time_elapsed = "N/A"
self.time_left = "N/A"
self.minutes_left = "N/A"
self.display_filename = "N/A"
self.display_status = "N/A"
self.display_time_elapsed = "0"
self.display_time_left = "0"
self.display_progress = 0

def get_stats(self):
"""
Returns static information.
"""
try:
self.response = requests.get(self.URL, headers=self.HEADERS, timeout=5)
self.response.raise_for_status()

# Check if response has content before parsing JSON
if not self.response.text.strip():
# print("Empty response received")
return self._get_default_stats()

# Check content type
content_type = self.response.headers.get("content-type", "")
if "application/json" not in content_type:
print(f"Unexpected content type: {content_type}")
print(f"Response body: {self.response.text[:200]}") # First 200 chars
return self._get_default_stats()

self.data = self.response.json()

self.state = self.data.get("state", "N/A")
self.filename = self.data.get("file", {}).get("display_name", "N/A")
self.progress = self.data.get("progress", 0)
self.time_elapsed = self.data.get("time_printing", "N/A")
self.time_left = self.data.get("time_remaining", "N/A")
self.minutes_left = (
self.time_left // 60 if isinstance(self.time_left, int) else "N/A"
)

except requests.RequestException as e:
# print(f"Request failed: {e}")
return self._get_default_stats()
except ValueError as e:
# print(f"JSON parsing failed: {e}")
# print(f"Response status: {self.response.status_code}")
# print(f"Response headers: {dict(self.response.headers)}")
# print(f"Response body: {self.response.text[:200]}") # First 200 chars
return self._get_default_stats()

return {
"state": self.state,
"filename": self.filename,
"progress": self.progress,
"time_elapsed": self.time_elapsed,
"time_left": self.time_left,
"minutes_left": self.minutes_left,
}

def _get_default_stats(self):
"""Return default stats when API call fails"""
return {
"state": "IDLE",
"filename": "N/A",
"progress": 0,
"time_elapsed": "0",
"time_left": "0",
"minutes_left": "0",
}

def run(self):
"""Main loop for the demo"""
# Create generator here
while True:
self.stats = self.get_stats()

# print("=== Printer Status ===")
# print(f"time left: {self.stats['time_left']}")
# print(f"Stats: {self.stats}")
# draw the filename
self.display_status = self.stats["state"]
# print(f"Status: {self.display_status}")
if (
self.stats["state"] == "IDLE"
and int(self.display_time_elapsed[0:8].strip()) > 0
):
self.display_progress = 0
self.display_time_elapsed = "0"
self.display_time_left = "0"
self.display_filename = "N/A"
self.screen.clear()
print("hit")

self.screen.draw_text(
self.screen.x_width // 2 - 8,
self.screen.y_height // 2 - 6,
self.display_status,
push=True,
)

if self.stats["state"] == "IDLE":
yield
continue

self.display_filename = self.stats["filename"][:16].upper()
# print(f"Filename: {self.display_filename}")
self.screen.draw_text(
self.screen.x_width // 2 - 8,
self.screen.y_height // 2 - 4,
self.display_filename,
push=True,
)

# print(f"Time left: {self.stats['minutes_left']} minutes")
self.display_time_elapsed = (
f"{str(self.stats['time_elapsed']):>5} ELAPSED"
)
self.screen.draw_text(
self.screen.x_width // 2 - 8,
self.screen.y_height // 2 - 2,
self.display_time_elapsed,
push=True,
)

# print(f"Time left: {self.stats['minutes_left']} minutes")
self.display_time_left = f"{str(self.stats['time_left']):>5} LEFT{self.stats['minutes_left']/60:>4.1f}"
self.screen.draw_text(
self.screen.x_width // 2 - 8,
self.screen.y_height // 2,
self.display_time_left,
push=True,
)

# Map progress (0-100) to a value between 0 and 16
percentage_complete = int((self.stats["progress"] / 100) * 16)
percentage_complete = max(0, min(percentage_complete, 16))
# print(f"Progress: {self.stats['progress']}%, (mapped to {percentage_complete})")
self.display_progress = f"{int(self.stats['progress']):>5} PROGRESS"
self.screen.draw_text(
self.screen.x_width // 2 - 8,
self.screen.y_height // 2 + 2,
self.display_progress,
push=True,
)
for i in range(percentage_complete):
self.screen.draw_pixel(
self.screen.x_width // 2 - 8 + i,
self.screen.y_height // 2 + 5,
15,
combine=False,
push=True,
)
yield

def stop(self):
"""Reset the state of the demo if needed, else leave blank"""
pass
15 changes: 15 additions & 0 deletions demos/prusa/template.service
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
[Unit]
Description=Prusa Printer Status Demo
After=network.target

[Service]
Type=simple
User=<USER>
WorkingDirectory=<PATH TO SSS>
Environment=PATH=<PATH TO venv/bin>
ExecStart=<PATH TO VENV>/python <PATH TO>/main.py demo -n prusa
Restart=always
RestartSec=10

[Install]
WantedBy=multi-user.target
2 changes: 1 addition & 1 deletion demos/sine/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ def __init__(self, input_queue, output_queue, screen):
screen (Screen): Surface to draw on

"""
self.frame_rate = 10
self.frame_rate = 20

self.input_queue = input_queue
self.output_queue = output_queue
Expand Down
2 changes: 1 addition & 1 deletion demos/video/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ def __init__(self, input_queue, output_queue, screen):
self.path = "./demos/video/resources/pre-processed/"
self.targets = os.listdir(self.path)
self.address = random.randint(0, len(self.targets) - 1)
self.target = self.targets[self.address]
self.target = "Rick.npz"
self.pause = False
self.new_video = False
self.next_frame = False
Expand Down
2 changes: 1 addition & 1 deletion demos/welcome_y/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ def __init__(self, input_queue, output_queue, screen):
screen (Screen): Screen object
"""
# Provide the framerate in frames/seconds and the amount of time of the demo in seconds
self.frame_rate = 10
self.frame_rate = 15

self.input_queue = input_queue
self.output_queue = output_queue
Expand Down
30 changes: 26 additions & 4 deletions display/physical_screen_v2.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,39 @@


class PhysicalScreen:
def __init__(self, brightness=3):
def __init__(self, brightness=5):
self.brightness = brightness
self.num_segs_across = 1
self.num_segs_down = 1
self.num_segs_across = 3
self.num_segs_down = 4

self.addresses = [
# First row
"172.16.0.5",
"172.16.0.4",
"172.16.0.3",
# Second row
"172.16.0.8",
"172.16.0.7",
"172.16.0.6",
# Third row
"172.16.0.11",
"172.16.0.10",
"172.16.0.9",
# Fourth row
"172.16.0.14",
"172.16.0.13",
"172.16.0.12",
]
self._create_display()

def _create_display(self):
# need to have an array of ip addresses if more panels
panel_array = [
[
SevenSegment(ip_address="172.0.0.3", brightness=self.brightness)
SevenSegment(
ip_address=self.addresses[i * self.num_segs_across + j],
brightness=self.brightness,
)
for j in range(self.num_segs_across)
]
for i in range(self.num_segs_down)
Expand Down
9 changes: 7 additions & 2 deletions display/seven_seg_v2.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import socket
import struct

from loguru import logger

from display import symbols as sy # get_char2

MAX72XX_DIGITS = 8
Expand Down Expand Up @@ -28,8 +30,8 @@ class SevenSegment:
def __init__(
self,
baudrate=DEFAULT_BAUDRATE,
ip_address="172.0.0.3",
port=1883,
ip_address="172.16.0.3",
port=2018,
brightness=7,
clear=True,
):
Expand Down Expand Up @@ -58,6 +60,7 @@ def __init__(
self.dataSerialer = struct.Struct("B" * (self.num_digits + 2))

# Setup the display
logger.debug(f"Sending setup commands to {self.addr}")
self.command(MAX72XX_REG_SHUTDOWN, 1) # 1 enables the display
self.command(
MAX72XX_REG_DECODEMODE, 0
Expand Down Expand Up @@ -291,10 +294,12 @@ def text2(self, x, y, txt, horizontal=True, flush=False):

# Write data buffer to panel through socket
def _write_data(self):
logger.debug(f"Writing data {self._buf} to {self.addr}")
self.panel_sock.sendto(self.dataSerialer.pack(*self._buf), self.addr)

# Write command buffer to panel through socket
def _write_command(self):
logger.debug(f"Writing command {self._command_buf} to {self.addr}")
self.panel_sock.sendto(
self.commandSerializer.pack(*self._command_buf), self.addr
)
Expand Down
Loading
Loading