diff --git a/docker-compose.dev.yaml b/docker-compose.dev.yaml index 510004d2..107e99f8 100644 --- a/docker-compose.dev.yaml +++ b/docker-compose.dev.yaml @@ -22,6 +22,7 @@ services: - LOG_LEVEL - ENABLE_USER_REGISTRATION - GITHUB_TOKEN + - MAX_CONCURRENT_RENDERS=${MAX_CONCURRENT_RENDERS:-5} volumes: go_modules: diff --git a/docker-compose.https.yaml b/docker-compose.https.yaml index c067a3b9..d344e938 100644 --- a/docker-compose.https.yaml +++ b/docker-compose.https.yaml @@ -13,6 +13,7 @@ services: - PRODUCTION - ENABLE_USER_REGISTRATION - GITHUB_TOKEN + - MAX_CONCURRENT_RENDERS=${MAX_CONCURRENT_RENDERS:-5} healthcheck: test: ["CMD", "/app/tronbyt-server", "health"] diff --git a/docker-compose.postgres.yaml b/docker-compose.postgres.yaml index 0203b845..fefff7f7 100644 --- a/docker-compose.postgres.yaml +++ b/docker-compose.postgres.yaml @@ -16,6 +16,7 @@ services: - ENABLE_USER_REGISTRATION - SINGLE_USER_AUTO_LOGIN - GITHUB_TOKEN + - MAX_CONCURRENT_RENDERS=${MAX_CONCURRENT_RENDERS:-5} - DB_DSN=host=db user=tronbyt password=tronbyt dbname=tronbyt port=5432 sslmode=disable TimeZone=UTC depends_on: - db diff --git a/docker-compose.redis.yaml b/docker-compose.redis.yaml index 669ed7db..186cbce0 100644 --- a/docker-compose.redis.yaml +++ b/docker-compose.redis.yaml @@ -17,6 +17,7 @@ services: - ENABLE_USER_REGISTRATION - SINGLE_USER_AUTO_LOGIN - GITHUB_TOKEN + - MAX_CONCURRENT_RENDERS=${MAX_CONCURRENT_RENDERS:-5} healthcheck: test: ["CMD", "/app/tronbyt-server", "health"] diff --git a/docker-compose.yaml b/docker-compose.yaml index 787b1629..bfca84d4 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -16,6 +16,7 @@ services: - ENABLE_USER_REGISTRATION - SINGLE_USER_AUTO_LOGIN - GITHUB_TOKEN + - MAX_CONCURRENT_RENDERS=${MAX_CONCURRENT_RENDERS:-5} healthcheck: test: ["CMD", "/app/tronbyt-server", "health"] diff --git a/internal/config/config.go b/internal/config/config.go index 28401f49..14b37468 100644 --- a/internal/config/config.go +++ 
b/internal/config/config.go @@ -28,6 +28,7 @@ type Settings struct { TrustedProxies string `env:"TRONBYT_TRUSTED_PROXIES" envDefault:"*"` LogLevel string `env:"LOG_LEVEL" envDefault:"INFO"` EnableUpdateChecks bool `env:"ENABLE_UPDATE_CHECKS" envDefault:"true"` + MaxConcurrentRenders int `env:"MAX_CONCURRENT_RENDERS" envDefault:"5"` } // TemplateConfig holds configuration values needed in templates. diff --git a/internal/server/handlers_device_api.go b/internal/server/handlers_device_api.go index 985c6cb7..c0ef0b1c 100644 --- a/internal/server/handlers_device_api.go +++ b/internal/server/handlers_device_api.go @@ -15,6 +15,17 @@ import ( func (s *Server) handleNextApp(w http.ResponseWriter, r *http.Request) { id := r.PathValue("id") + renderMetrics.RecordRequest() + + // Acquire per-device semaphore to prevent queue backup from same device. + // If already processing a request for this device, return cached image immediately. + if !s.acquireDeviceSemaphore(id) { + slog.Debug("Device busy, serving cached image", "device", id) + s.serveCachedImageForDevice(w, r, id) + return + } + defer s.releaseDeviceSemaphore(id) + var device *data.Device if d, err := DeviceFromContext(r.Context()); err == nil { device = d @@ -107,9 +118,13 @@ func (s *Server) handleNextApp(w http.ResponseWriter, r *http.Request) { // Send default image if error (or not found) slog.Error("Failed to get next app image", "device", device.ID, "error", err) s.sendDefaultImage(w, r, device) + webpMetrics.RecordWebPServed(0) + webpMetrics.RecordUniqueDevice(device.ID) return } + webpMetrics.RecordUniqueDevice(device.ID) + // For HTTP devices, we assume "Sent" equals "Displaying" (or roughly so). // We update DisplayingApp here so the Preview uses the explicit field instead of fallback. 
if app != nil { @@ -142,6 +157,8 @@ func (s *Server) handleNextApp(w http.ResponseWriter, r *http.Request) { dwell := device.GetEffectiveDwellTime(app) w.Header().Set("Tronbyt-Dwell-Secs", fmt.Sprintf("%d", dwell)) + webpMetrics.RecordWebPServed(len(imgData)) + if _, err := w.Write(imgData); err != nil { slog.Error("Failed to write image data to response", "error", err) // Log error, but can't change HTTP status after writing headers. diff --git a/internal/server/handlers_user.go b/internal/server/handlers_user.go index 860c1bf4..5b763b92 100644 --- a/internal/server/handlers_user.go +++ b/internal/server/handlers_user.go @@ -429,3 +429,24 @@ func (s *Server) handleRefreshSystemRepo(w http.ResponseWriter, r *http.Request) http.Redirect(w, r, "/auth/edit", http.StatusSeeOther) } + +func (s *Server) handleAdminDashboard(w http.ResponseWriter, r *http.Request) { + user := GetUser(r) + if !user.IsAdmin { + http.Error(w, "Forbidden", http.StatusForbidden) + return + } + + var totalDevices, totalUsers int64 + s.DB.Model(&data.Device{}).Count(&totalDevices) + s.DB.Model(&data.User{}).Count(&totalUsers) + + stats := GetStatsSnapshot() + + s.renderTemplate(w, r, "admin_dashboard", TemplateData{ + User: user, + TotalDevices: totalDevices, + TotalUsers: totalUsers, + Stats: stats, + }) +} diff --git a/internal/server/helpers.go b/internal/server/helpers.go index 5dabe641..4d4a2382 100644 --- a/internal/server/helpers.go +++ b/internal/server/helpers.go @@ -86,6 +86,11 @@ type TemplateData struct { AppConfig map[string]any AppMetadata *apps.AppMetadata + // Admin Dashboard + TotalDevices int64 + TotalUsers int64 + Stats StatsSnapshot + // Device Update Extras ColorFilterOptions []ColorFilterOption ShowFullAnimationOptions []ShowFullAnimationOption diff --git a/internal/server/render_metrics.go b/internal/server/render_metrics.go new file mode 100644 index 00000000..ed57cdef --- /dev/null +++ b/internal/server/render_metrics.go @@ -0,0 +1,267 @@ +package server + +import ( + 
// Metrics instrumentation for app rendering and WebP serving.
//
// RenderMetrics and WebPMetrics are safe for concurrent use: scalar counters
// use sync/atomic, and the sliding-window timestamp slices are guarded by a
// mutex. Fixes over the original implementation:
//   - the per-minute timestamp slices were append-only and grew without
//     bound; they are now pruned to the window on every append,
//   - AvgDuration read totalDur without atomics while EndRender wrote it
//     with atomic.AddInt64 (a data race),
//   - the max-duration update used a racy load/compare/store that could
//     lose a larger value; it now uses a CAS loop.

// windowDuration is the size of the sliding window backing the
// per-minute rate metrics (renders/min, requests/min, webps/min, ...).
const windowDuration = 60 * time.Second

// RenderMetrics tracks app-render activity and timing.
type RenderMetrics struct {
	activeCount atomic.Int64 // renders currently in flight
	queuedCount atomic.Int64 // incremented/decremented in lockstep with activeCount
	totalCount  atomic.Int64 // renders completed since process start
	failedCount atomic.Int64 // completed renders that returned an error
	totalDur    int64        // cumulative render time, nanoseconds (always accessed atomically)
	maxDur      atomic.Int64 // longest single render, nanoseconds

	// mu guards the sliding-window timestamp slices below.
	mu              sync.Mutex
	rendersByMinute []int64 // unix timestamps (seconds) of completed renders
	reqsByMinute    []int64 // unix timestamps (seconds) of device requests
}

var renderMetrics RenderMetrics

// WebPMetrics tracks WebP images served to devices.
// NOTE: servedCount and renderCount are reset by LogStats (via Swap(0)), so
// ServedCount/RenderCount report the count since the last stats log tick,
// not a process-lifetime total.
type WebPMetrics struct {
	servedCount atomic.Int64
	renderCount atomic.Int64
	bytesServed atomic.Int64
	uniqueMu    sync.Mutex
	uniqueDevices map[string]int64 // device ID -> unix timestamp last seen

	// mu guards the sliding-window timestamp slice below.
	mu            sync.Mutex
	webpsByMinute []int64 // unix timestamps (seconds) of WebP serves
}

var webpMetrics WebPMetrics

// pruneAndAppend drops timestamps older than the window and appends now.
// Timestamps are appended in nondecreasing order, so stale entries form a
// prefix. Pruning here bounds the slices' memory use.
func pruneAndAppend(events []int64, now int64) []int64 {
	cutoff := now - int64(windowDuration/time.Second)
	i := 0
	for i < len(events) && events[i] < cutoff {
		i++
	}
	events = append(events[:0], events[i:]...)
	return append(events, now)
}

// countSince counts events at or after cutoff. Entries may be up to a full
// window old relative to the caller's clock, so filtering is still required
// even though the slices are pruned on append.
func countSince(events []int64, cutoff int64) int64 {
	var n int64
	for _, t := range events {
		if t >= cutoff {
			n++
		}
	}
	return n
}

// StartRender records that a render has begun.
func (m *RenderMetrics) StartRender() {
	m.activeCount.Add(1)
	m.queuedCount.Add(1)
}

// EndRender records a completed render: its duration and whether it failed.
func (m *RenderMetrics) EndRender(dur time.Duration, failed bool) {
	m.activeCount.Add(-1)
	m.queuedCount.Add(-1)
	m.totalCount.Add(1)
	atomic.AddInt64(&m.totalDur, int64(dur))

	// CAS loop so a concurrent larger value is never overwritten by a
	// smaller one (the original load/compare/store could lose updates).
	for {
		cur := m.maxDur.Load()
		if int64(dur) <= cur || m.maxDur.CompareAndSwap(cur, int64(dur)) {
			break
		}
	}

	if failed {
		m.failedCount.Add(1)
	}

	now := time.Now().Unix()
	m.mu.Lock()
	m.rendersByMinute = pruneAndAppend(m.rendersByMinute, now)
	m.mu.Unlock()
}

// RecordRequest records an incoming device request for the requests/min rate.
func (m *RenderMetrics) RecordRequest() {
	now := time.Now().Unix()
	m.mu.Lock()
	m.reqsByMinute = pruneAndAppend(m.reqsByMinute, now)
	m.mu.Unlock()
}

// ActiveCount returns the number of renders currently in flight.
func (m *RenderMetrics) ActiveCount() int64 {
	return m.activeCount.Load()
}

// AvgDuration returns the mean duration over all completed renders,
// or 0 if none have completed. The two loads are not a single snapshot,
// so the result may be slightly stale under heavy concurrency.
func (m *RenderMetrics) AvgDuration() time.Duration {
	total := m.totalCount.Load()
	if total == 0 {
		return 0
	}
	return time.Duration(atomic.LoadInt64(&m.totalDur) / total)
}

// MaxDuration returns the longest single render observed.
func (m *RenderMetrics) MaxDuration() time.Duration {
	return time.Duration(m.maxDur.Load())
}

// TotalCount returns the number of renders completed since process start.
func (m *RenderMetrics) TotalCount() int64 {
	return m.totalCount.Load()
}

// FailedCount returns the number of completed renders that failed.
func (m *RenderMetrics) FailedCount() int64 {
	return m.failedCount.Load()
}

// QueuedCount returns the queued-render gauge (tracks ActiveCount).
func (m *RenderMetrics) QueuedCount() int64 {
	return m.queuedCount.Load()
}

// RendersPerMin returns the number of renders completed in the last window.
func (m *RenderMetrics) RendersPerMin() int64 {
	cutoff := time.Now().Add(-windowDuration).Unix()
	m.mu.Lock()
	defer m.mu.Unlock()
	return countSince(m.rendersByMinute, cutoff)
}

// ReqsPerMin returns the number of device requests seen in the last window.
func (m *RenderMetrics) ReqsPerMin() int64 {
	cutoff := time.Now().Add(-windowDuration).Unix()
	m.mu.Lock()
	defer m.mu.Unlock()
	return countSince(m.reqsByMinute, cutoff)
}

// RecordWebPServed records one served WebP image of the given size in bytes.
func (w *WebPMetrics) RecordWebPServed(bytes int) {
	w.servedCount.Add(1)
	w.bytesServed.Add(int64(bytes))

	now := time.Now().Unix()
	w.mu.Lock()
	w.webpsByMinute = pruneAndAppend(w.webpsByMinute, now)
	w.mu.Unlock()
}

// RecordRender records one render attributed to WebP production.
func (w *WebPMetrics) RecordRender() {
	w.renderCount.Add(1)
}

// RecordUniqueDevice marks deviceID as active now, for the unique-devices rate.
func (w *WebPMetrics) RecordUniqueDevice(deviceID string) {
	now := time.Now().Unix()
	w.uniqueMu.Lock()
	if w.uniqueDevices == nil {
		w.uniqueDevices = make(map[string]int64)
	}
	w.uniqueDevices[deviceID] = now
	w.uniqueMu.Unlock()
}

// LogStats emits a one-line summary and resets the interval counters
// (servedCount, renderCount). It also evicts uniqueDevices entries older
// than the window so the map does not grow with long-gone devices.
func (w *WebPMetrics) LogStats() {
	served := w.servedCount.Swap(0)
	renders := w.renderCount.Swap(0)

	cutoff := time.Now().Add(-windowDuration).Unix()
	w.uniqueMu.Lock()
	var uniqueDevs int64
	for id, lastSeen := range w.uniqueDevices {
		if lastSeen >= cutoff {
			uniqueDevs++
		} else {
			delete(w.uniqueDevices, id)
		}
	}
	w.uniqueMu.Unlock()
	_ = uniqueDevs // counted for parity with UniqueDevicesPerMin; not logged below

	loadAvg1m := getLoadAverage()
	if served > 0 {
		slog.Info(fmt.Sprintf("Stats ------ : %.1f - %d / %d ", loadAvg1m, served, renders))
	}
}

// getLoadAverage returns the 1-minute load average from /proc/loadavg,
// or 0 on any error (e.g. non-Linux hosts where the file does not exist).
func getLoadAverage() float64 {
	raw, err := os.ReadFile("/proc/loadavg")
	if err != nil {
		return 0
	}
	first, _, _ := strings.Cut(strings.TrimSpace(string(raw)), " ")
	f, err := strconv.ParseFloat(first, 64)
	if err != nil {
		return 0
	}
	return f
}

// ServedCount returns WebPs served since the last LogStats reset.
func (w *WebPMetrics) ServedCount() int64 {
	return w.servedCount.Load()
}

// RenderCount returns renders recorded since the last LogStats reset.
func (w *WebPMetrics) RenderCount() int64 {
	return w.renderCount.Load()
}

// BytesServed returns the total WebP bytes served (never reset).
func (w *WebPMetrics) BytesServed() int64 {
	return w.bytesServed.Load()
}

// WebpsPerMin returns the number of WebPs served in the last window.
func (w *WebPMetrics) WebpsPerMin() int64 {
	cutoff := time.Now().Add(-windowDuration).Unix()
	w.mu.Lock()
	defer w.mu.Unlock()
	return countSince(w.webpsByMinute, cutoff)
}

// UniqueDevicesPerMin returns the number of distinct devices seen in the last window.
func (w *WebPMetrics) UniqueDevicesPerMin() int64 {
	cutoff := time.Now().Add(-windowDuration).Unix()
	w.uniqueMu.Lock()
	defer w.uniqueMu.Unlock()
	var count int64
	for _, lastSeen := range w.uniqueDevices {
		if lastSeen >= cutoff {
			count++
		}
	}
	return count
}

// StatsSnapshot is a point-in-time copy of all metrics for the admin dashboard.
type StatsSnapshot struct {
	ActiveRenders    int64
	QueuedRenders    int64
	TotalRenders     int64
	FailedRenders    int64
	AvgRenderMs      int64
	MaxRenderMs      int64
	RendersPerMin    int64
	ReqsPerMin       int64
	WebpsServed      int64
	WebpsPerMin      int64
	BytesServedMB    float64
	UniqueDevsPerMin int64
}

// GetStatsSnapshot assembles a StatsSnapshot from the package-level metrics.
func GetStatsSnapshot() StatsSnapshot {
	return StatsSnapshot{
		ActiveRenders:    renderMetrics.ActiveCount(),
		QueuedRenders:    renderMetrics.QueuedCount(),
		TotalRenders:     renderMetrics.TotalCount(),
		FailedRenders:    renderMetrics.FailedCount(),
		AvgRenderMs:      renderMetrics.AvgDuration().Milliseconds(),
		MaxRenderMs:      renderMetrics.MaxDuration().Milliseconds(),
		RendersPerMin:    renderMetrics.RendersPerMin(),
		ReqsPerMin:       renderMetrics.ReqsPerMin(),
		WebpsServed:      webpMetrics.ServedCount(),
		WebpsPerMin:      webpMetrics.WebpsPerMin(),
		BytesServedMB:    float64(webpMetrics.BytesServed()) / (1024 * 1024),
		UniqueDevsPerMin: webpMetrics.UniqueDevicesPerMin(),
	}
}
-143,12 +143,41 @@ func (s *Server) possiblyRender(ctx context.Context, app *data.App, device *data now := time.Now() // uinterval is minutes if time.Since(app.LastRender) > time.Duration(app.UInterval)*time.Minute { - slog.Info("Rendering app", "app", appBasename) + // Try to acquire render semaphore without blocking. + // If no slot available, only skip rendering if we have a valid cached image. + // If last render was empty or failed, we must attempt rendering. + hasValidCachedImage := app.LastSuccessfulRender != nil && !app.EmptyLastRender + + if hasValidCachedImage { + select { + case s.RenderSem <- struct{}{}: + defer func() { <-s.RenderSem }() + default: + slog.Debug("Skipping render - no slot available", "app", appBasename) + return true // Use existing cached image + } + } else { + // No valid cached image, must render - use non-blocking but wait briefly + select { + case s.RenderSem <- struct{}{}: + defer func() { <-s.RenderSem }() + case <-ctx.Done(): + slog.Warn("Context cancelled waiting for render slot", "app", appBasename) + return false // Cannot render, skip to next app + } + } + + renderMetrics.StartRender() + webpMetrics.RecordRender() startTime := time.Now() imgBytes, messages, err := s.RenderApp(ctx, device, app, appPath, nil) renderDur := time.Since(startTime) + slog.Info(fmt.Sprintf("Rendered in %dms %s ", renderDur.Milliseconds(), appBasename)) + + renderMetrics.EndRender(renderDur, err != nil) + for _, msg := range messages { slog.Debug("Render message", "app", appBasename, "message", msg) } diff --git a/internal/server/server.go b/internal/server/server.go index 5f567607..f48b25fe 100644 --- a/internal/server/server.go +++ b/internal/server/server.go @@ -20,6 +20,7 @@ import ( "tronbyt-server/internal/apps" "tronbyt-server/internal/config" + "tronbyt-server/internal/data" syncer "tronbyt-server/internal/sync" "tronbyt-server/web" @@ -46,6 +47,10 @@ type Server struct { Upgrader *websocket.Upgrader PromRegistry prometheus.Registerer 
// templateFiles maps a logical template name to its file path,
// relative to web/templates. Grouped by template directory.
var templateFiles = map[string]string{
	// Authentication pages.
	"login":    "auth/login.html",
	"register": "auth/register.html",
	"edit":     "auth/edit.html",

	// Manager / device pages.
	"index":           "manager/index.html",
	"adminindex":      "manager/adminindex.html",
	"admin_dashboard": "manager/admin_dashboard.html",
	"create":          "manager/create.html",
	"addapp":          "manager/addapp.html",
	"configapp":       "manager/configapp.html",
	"uploadapp":       "manager/uploadapp.html",
	"firmware":        "manager/firmware.html",
	"update":          "manager/update.html",
	"device_tv":       "manager/device_tv.html",
}
func (s *Server) routes() { // Web UI s.Router.HandleFunc("GET /", s.RequireLogin(s.handleIndex)) s.Router.HandleFunc("GET /admin", s.RequireLogin(s.handleAdminIndex)) + s.Router.HandleFunc("GET /admin/dashboard", s.RequireLogin(s.handleAdminDashboard)) s.Router.HandleFunc("DELETE /admin/users/{username}", s.RequireLogin(s.handleDeleteUser)) s.Router.HandleFunc("GET /devices/create", s.RequireLogin(s.handleCreateDeviceGet)) @@ -302,6 +320,15 @@ func (s *Server) routes() { s.Router.HandleFunc("GET /debug/pprof/symbol", pprof.Symbol) s.Router.HandleFunc("GET /debug/pprof/trace", pprof.Trace) } + + // Start periodic WebP stats logger (every 10 seconds) + go func() { + ticker := time.NewTicker(10 * time.Second) + defer ticker.Stop() + for range ticker.C { + webpMetrics.LogStats() + } + }() } func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) { @@ -350,3 +377,69 @@ func (s *Server) handleDots(w http.ResponseWriter, r *http.Request) { slog.Error("Failed to write dots SVG", "error", err) } } + +// acquireDeviceSemaphore tries to acquire a slot for processing a device request. +// Returns true if acquired, false if device is already processing a request. +// When false is returned, the caller should serve a cached image. +func (s *Server) acquireDeviceSemaphore(deviceID string) bool { + s.deviceSemaphoresMutex.Lock() + ch, exists := s.deviceSemaphores[deviceID] + if !exists { + ch = make(chan struct{}, 1) + s.deviceSemaphores[deviceID] = ch + } + s.deviceSemaphoresMutex.Unlock() + + select { + case ch <- struct{}{}: + return true + default: + return false + } +} + +// releaseDeviceSemaphore releases the slot for a device after processing. +func (s *Server) releaseDeviceSemaphore(deviceID string) { + s.deviceSemaphoresMutex.Lock() + ch, exists := s.deviceSemaphores[deviceID] + s.deviceSemaphoresMutex.Unlock() + if exists { + <-ch + } +} + +// serveCachedImageForDevice returns a cached image when the device is busy processing another request. 
+// This prevents queue buildup from retry storms. +func (s *Server) serveCachedImageForDevice(w http.ResponseWriter, r *http.Request, deviceID string) { + device, err := gorm.G[data.Device](s.DB).Preload("Apps", nil).Where("id = ?", deviceID).First(r.Context()) + if err != nil { + slog.Error("Failed to fetch device for cached image", "device", deviceID, "error", err) + http.Error(w, "Device not found", http.StatusNotFound) + return + } + + imgData, app, err := s.GetCurrentAppImage(r.Context(), &device) + if err != nil || imgData == nil { + slog.Error("Failed to get cached image", "device", deviceID, "error", err) + http.Error(w, "Image not found", http.StatusNotFound) + return + } + + w.Header().Set("Content-Type", "image/webp") + w.Header().Set("Cache-Control", "public, max-age=0, must-revalidate") + w.Header().Set("X-Cached", "true") + + if app != nil { + w.Header().Set("Tronbyt-App", app.Iname) + } + + brightness := device.GetEffectiveBrightness() + w.Header().Set("Tronbyt-Brightness", fmt.Sprintf("%d", brightness)) + + dwell := device.GetEffectiveDwellTime(app) + w.Header().Set("Tronbyt-Dwell-Secs", fmt.Sprintf("%d", dwell)) + + if _, err := w.Write(imgData); err != nil { + slog.Error("Failed to write cached image", "error", err) + } +} diff --git a/internal/server/websockets.go b/internal/server/websockets.go index f8f0c4d7..4cbf3735 100644 --- a/internal/server/websockets.go +++ b/internal/server/websockets.go @@ -274,6 +274,8 @@ func (s *Server) wsWriteLoop(ctx context.Context, conn *websocket.Conn, initialD if err := conn.WriteMessage(websocket.BinaryMessage, imgData); err != nil { return } + webpMetrics.RecordWebPServed(len(imgData)) + webpMetrics.RecordUniqueDevice(device.ID) if sendImmediate { if err := conn.WriteJSON(map[string]bool{"immediate": true}); err != nil { diff --git a/web/templates/base.html b/web/templates/base.html index 32bf1321..5e27fb7a 100644 --- a/web/templates/base.html +++ b/web/templates/base.html @@ -47,6 +47,9 @@
{{ .TotalUsers }}
+{{ .TotalDevices }}
+{{ .Stats.ReqsPerMin }}
<!-- Admin dashboard statistics rows. Labels are localized via the
     Localizer helper; values come from the StatsSnapshot in .Stats.
     NOTE(review): the "/ Min (avg)" labels report a 60-second sliding
     window count, not an average — confirm wording with the metrics code. -->

<!-- WebP serving statistics -->
<tr>
  <td>{{ t .Localizer "Total WebPs Served" }}</td>
  <td>{{ .Stats.WebpsServed }}</td>
</tr>
<tr>
  <td>{{ t .Localizer "WebPs / Min (avg)" }}</td>
  <td>{{ .Stats.WebpsPerMin }}</td>
</tr>
<tr>
  <td>{{ t .Localizer "MB Served" }}</td>
  <td>{{ printf "%.2f" .Stats.BytesServedMB }} MB</td>
</tr>
<tr>
  <td>{{ t .Localizer "Unique Devices / Min" }}</td>
  <td>{{ .Stats.UniqueDevsPerMin }}</td>
</tr>

<!-- Render pipeline statistics -->
<tr>
  <td>{{ t .Localizer "Active Renders" }}</td>
  <td>{{ .Stats.ActiveRenders }}</td>
</tr>
<tr>
  <td>{{ t .Localizer "Queued Renders" }}</td>
  <td>{{ .Stats.QueuedRenders }}</td>
</tr>
<tr>
  <td>{{ t .Localizer "Total Renders" }}</td>
  <td>{{ .Stats.TotalRenders }}</td>
</tr>
<tr>
  <td>{{ t .Localizer "Failed Renders" }}</td>
  <td>{{ .Stats.FailedRenders }}</td>
</tr>
<tr>
  <td>{{ t .Localizer "Renders / Min (avg)" }}</td>
  <td>{{ .Stats.RendersPerMin }}</td>
</tr>
<tr>
  <td>{{ t .Localizer "Avg Render Time" }}</td>
  <td>{{ .Stats.AvgRenderMs }} ms</td>
</tr>
<tr>
  <td>{{ t .Localizer "Max Render Time" }}</td>
  <td>{{ .Stats.MaxRenderMs }} ms</td>
</tr>