From f3bdb3318614c6f4fd4c287df7fcfce8ebbf611e Mon Sep 17 00:00:00 2001 From: Tom Richards Date: Mon, 9 Mar 2026 10:36:46 +0000 Subject: [PATCH] WIP UI view of TODO list (in progress, waiting and 'exceeded max attempts') --- backend/app/AppComponents.scala | 2 + backend/app/controllers/api/Workers.scala | 29 ++++ backend/app/services/manifest/Manifest.scala | 12 +- .../app/services/manifest/Neo4jManifest.scala | 79 ++++++++++- backend/conf/routes | 2 + frontend/src/js/App.js | 2 + .../js/components/Settings/SettingsSidebar.js | 9 +- frontend/src/js/components/Settings/ToDo.tsx | 131 ++++++++++++++++++ 8 files changed, 260 insertions(+), 6 deletions(-) create mode 100644 backend/app/controllers/api/Workers.scala create mode 100644 frontend/src/js/components/Settings/ToDo.tsx diff --git a/backend/app/AppComponents.scala b/backend/app/AppComponents.scala index 8e175719..d70b0a6b 100644 --- a/backend/app/AppComponents.scala +++ b/backend/app/AppComponents.scala @@ -211,6 +211,7 @@ class AppComponents(context: Context, config: Config) val workspacesController = new Workspaces(authControllerComponents, annotations, esResources, manifest, users, blobStorage, previewStorage, postgresClient, remoteIngestStore, remoteIngestStorage, config.remoteIngest, snsClient) val commentsController = new Comments(authControllerComponents, manifest, esResources, annotations) val usersController = new Users(authControllerComponents, userProvider) + val workersController = new Workers(authControllerComponents, manifest) val pagesController = new PagesController(authControllerComponents, manifest, esResources, pages2, annotations, previewStorage) val ingestionController = new Ingestion(authControllerComponents, ingestStorage) val ingestionEventsController = new IngestionEvents(authControllerComponents, postgresClient, users ) @@ -277,6 +278,7 @@ class AppComponents(context: Context, config: Config) workspacesController, previewController, usersController, + workersController, videoVerifierController, authController, appController, diff --git a/backend/app/controllers/api/Workers.scala b/backend/app/controllers/api/Workers.scala new file mode 100644 index 00000000..9effe184 --- /dev/null +++ b/backend/app/controllers/api/Workers.scala @@ -0,0 +1,29 @@ +package controllers.api + +import model.user.UserPermission.CanPerformAdminOperations +import play.api.libs.json.{Json, OWrites} +import services.manifest.Manifest +import services.manifest.Manifest.{Failure, ToDo, ToDoItem, ToDoResponse} +import utils.Logging +import utils.attempt.Attempt +import utils.controller.{AuthApiController, AuthControllerComponents} + +class Workers( + override val controllerComponents: AuthControllerComponents, + manifest: Manifest, +) extends AuthApiController with Logging { + + private implicit val writesFailure: OWrites[Failure] = Json.writes[Failure] + private implicit val writesToDoItem: OWrites[ToDoItem] = Json.writes[ToDoItem] + private implicit val writesToDo: OWrites[ToDo] = Json.writes[ToDo] + private implicit val writesToDoResponse: OWrites[ToDoResponse] = Json.writes[ToDoResponse] + + def getToDo() = ApiAction.attempt { req => + checkPermission(CanPerformAdminOperations, req) { + Attempt.fromEither( + manifest.getToDo().map(Json.toJson(_)).map(Ok(_)) + ) + } + } + +} diff --git a/backend/app/services/manifest/Manifest.scala b/backend/app/services/manifest/Manifest.scala index 2b34b2db..581cebe2 100644 --- a/backend/app/services/manifest/Manifest.scala +++ b/backend/app/services/manifest/Manifest.scala @@ -6,9 +6,9 @@ import model._ import model.annotations.WorkspaceMetadata import model.frontend.email.EmailNeighbours import model.frontend.{BasicResource, ExtractionFailures, ResourcesForExtractionFailure} -import model.ingestion.{IngestionFile, RemoteIngest, WorkspaceItemContext, WorkspaceItemUploadContext} +import model.ingestion.{IngestionFile, WorkspaceItemContext, WorkspaceItemUploadContext} import model.manifest._ -import services.manifest.Manifest.WorkCounts +import services.manifest.Manifest.{ToDo, ToDoResponse, WorkCounts} import utils.attempt.{Attempt, Failure} import java.nio.file.Path @@ -30,9 +30,17 @@ object Manifest { case class InsertEmail(email: Email, parent: Uri) extends Insertion case class WorkCounts(inProgress: Int, outstanding: Int) + + case class ToDo(total: Long, items: List[ToDoItem]) + case class Failure(at: Long, stackTrace: String) + case class ToDoItem(extractor: String, ingestion: String, attempts: Int, priority: Int, size: Long, blobUri: String, lockedBy: Option[String], failures: List[Failure]) + case class ToDoResponse(inProgress: ToDo, waiting: ToDo, failed: ToDo) } trait WorkerManifest { + + def getToDo(): Either[Failure, ToDoResponse] + def fetchWork(workerName: String, workerCount:Int, workerIndex:Int, maxBatchSize: Int, maxCost: Int): Either[Failure, List[WorkItem]] def releaseLocks(workerName: String): Either[Failure, Unit] diff --git a/backend/app/services/manifest/Neo4jManifest.scala b/backend/app/services/manifest/Neo4jManifest.scala index a099c9ea..1de4f12e 100644 --- a/backend/app/services/manifest/Neo4jManifest.scala +++ b/backend/app/services/manifest/Neo4jManifest.scala @@ -16,7 +16,7 @@ import org.joda.time.DateTime import org.neo4j.driver.v1.Values.parameters import org.neo4j.driver.v1.{Driver, StatementResult, StatementRunner, Value} import services.Neo4jQueryLoggingConfig -import services.manifest.Manifest.WorkCounts +import services.manifest.Manifest.{ToDo, ToDoItem, ToDoResponse, WorkCounts} import utils._ import utils.attempt.{Attempt, Failure, IllegalStateFailure, NotFoundFailure} @@ -451,6 +451,83 @@ class Neo4jManifest(driver: Driver, executionContext: ExecutionContext, queryLog Right(()) } + override def getToDo(): Either[Failure, ToDoResponse] = transaction { tx => + // TODO DRY out the common parts with fetchWork + val r = tx.run( + s""" + |MATCH (extractor: Extractor)-[todo: TODO]->(blob: Blob:Resource) + |OPTIONAL MATCH (blob)<-[failure:EXTRACTION_FAILURE]-(extractor) + |WITH extractor, todo, blob, collect(failure) as failures + |ORDER BY coalesce(todo.priority, extractor.priority) DESC + |WITH COLLECT({ + | extractor: extractor.name, + | ingestion: todo.ingestion, + | attempts: todo.attempts, + | priority: todo.priority, + | size: blob.size, + | blobUri: blob.uri, + | lockedBy: todo.lockedBy, + | failures: failures + |}) as allItems + |RETURN reduce(acc = {inProgress: {total: 0, items:[]}, waiting: {total: 0, items:[]}, failed: {total: 0, items:[]}}, item in allItems | + | CASE + | WHEN item.lockedBy IS NOT NULL + | THEN { + | inProgress: {total: acc.inProgress.total + 1, items: acc.inProgress.items + item}, + | waiting: acc.waiting, + | failed: acc.failed + | } + | WHEN item.attempts < {maxExtractionAttempts} + | THEN { + | inProgress: acc.inProgress, + | waiting: {total: acc.waiting.total + 1, items: ( + | CASE WHEN size(acc.waiting.items) < {limit} THEN acc.waiting.items + item ELSE acc.waiting.items END + | )}, + | failed: acc.failed + | } + | ELSE + | { + | inProgress: acc.inProgress, + | waiting: acc.waiting, + | failed: {total: acc.failed.total + 1, items: ( + | CASE WHEN size(acc.failed.items) < {limit} THEN acc.failed.items + item ELSE acc.failed.items END + | )} + | } + | END + |) as result + |""".stripMargin, + parameters( + "limit", 25, + "maxExtractionAttempts", Int.box(maxExtractionAttempts) + ), + ).single() + + def extractToDo(r: Value): ToDo = ToDo( + total = r.get("total").asLong(), + items = r.get("items").asList[ToDoItem](v => ToDoItem( + extractor = v.get("extractor").asString(), + ingestion = v.get("ingestion").asString(), + attempts = v.get("attempts").asInt(), + priority = v.get("priority").asInt(), + size = v.get("size").asLong(), + blobUri = v.get("blobUri").asString(), + lockedBy = if(v.get("lockedBy").isNull) None else Some(v.get("lockedBy").asString()), //TODO also get the lockedAt timestamp + failures = v.get("failures").asList[Manifest.Failure](f => Manifest.Failure( + at = f.get("at").asLong(), + stackTrace = f.get("stackTrace").asString() + )).asScala.toList + )).asScala.toList + ) + + Right( + ToDoResponse( + inProgress = extractToDo(r.get("result").get("inProgress")), + waiting = extractToDo(r.get("result").get("waiting")), + failed = extractToDo(r.get("result").get("failed")) + ) + ) + } + override def fetchWork(workerName: String, workerCount: Int, workerIndex: Int, maxBatchSize: Int, maxCost: Int): Either[Failure, List[WorkItem]] = transaction { tx => val summary = tx.run( s""" diff --git a/backend/conf/routes b/backend/conf/routes index 05fb7811..64edbd61 100644 --- a/backend/conf/routes +++ b/backend/conf/routes @@ -97,6 +97,8 @@ PUT /api/users/:username/register cont GET /api/currentUser/permissions controllers.api.Users.getMyPermissions() +GET /api/todo controllers.api.Workers.getToDo() + POST /api/video-verifier/fetch controllers.api.VideoVerifier.fetchVideosToVerify() + NOCSRF diff --git a/frontend/src/js/App.js b/frontend/src/js/App.js index 56e5a79f..f51c54f7 100644 --- a/frontend/src/js/App.js +++ b/frontend/src/js/App.js @@ -44,6 +44,7 @@ import MyUploads from "./components/IngestionEvents/MyUploads"; import AllIngestionEvents from "./components/IngestionEvents/AllIngestionEvents"; import { CaptureFromUrl } from "./components/Uploads/CaptureFromUrl"; import { VideoVerifier } from "./components/VideoVerifier"; +import {ToDo} from "./components/Settings/ToDo"; class App extends React.Component { static propTypes = { @@ -108,6 +109,7 @@ class App extends React.Component { +
Upload Calendar
+ +
TODO
+
); }; render() { - const canManageUsers = this.props.myPermissions.includes( + const canPerformAdminOperations = this.props.myPermissions.includes( "CanPerformAdminOperations", ); @@ -95,7 +98,7 @@ class SettingsSidebar extends React.Component {
Feature Switches
- {canManageUsers ? this.renderAdminSettingsLinks() : false} + {canPerformAdminOperations ? this.renderAdminSettingsLinks() : false}
Logs
@@ -105,7 +108,7 @@ class SettingsSidebar extends React.Component { >
My Uploads
- {canManageUsers ? this.renderAdminLogsLinks() : false} + {canPerformAdminOperations ? this.renderAdminLogsLinks() : false}
); diff --git a/frontend/src/js/components/Settings/ToDo.tsx b/frontend/src/js/components/Settings/ToDo.tsx new file mode 100644 index 00000000..8843d1d1 --- /dev/null +++ b/frontend/src/js/components/Settings/ToDo.tsx @@ -0,0 +1,131 @@ +import {useEffect, useState} from "react"; +import authFetch from "../../util/auth/authFetch"; +import {EuiBasicTable, EuiButton, EuiPopover, EuiToolTip} from "@elastic/eui"; + +interface Failure { + at: number, + stackTrace: string +} + +interface ToDoItem { + extractor: string; + ingestion: string; + attempts: number; + priority: number; + size: number; + blobUri: string; + lockedBy: string | undefined; + failures: Failure[]; +} + +interface ToDo { + total: number; + items: ToDoItem[]; +} + +interface ToDoDisplayProps { + title: string, + todo: ToDo, +} +const ToDoDisplay = ({title, todo}: ToDoDisplayProps) => <> +

{title}

+
Showing {todo.items.length} of {todo.total} items
+ { /* TODO some grouping by worker, possibly serverside */ } + { /* TODO make priority changeable */ } + { /* TODO consider some sort of search */ } + { /* TODO highlight same blob being locked by multiple workers */ } + + + {blobUri.substring(0, 4)}...{blobUri.substring(blobUri.length - 4)} + , + }, + { + name: "Extractor", + field: "extractor", + }, + { + name: "Ingestion", + field: "ingestion", + style: { + whiteSpace: "nowrap", + } + }, + { + name: "Attempts", + field: "attempts", + }, + { + name: "Priority", + field: "priority", + }, + { + name: "Size (bytes)", + field: "size", + render: (size: number, item) => size.toLocaleString(), + align: "right", + }, + { + name: Previous Failures?
(hover for stack trace)
, + field: "failures", + render: (failures: Failure[], item) => + + {failures.map(({at, stackTrace}) => ( +
  • + {new Date(at).toLocaleString()} +
    + {stackTrace.split("\n")[0].substring(0, 70)}... +
  • + ))} + }> + {failures.length} failure{failures.length !== 1 && "s"} +
    + } + ]} /> + + +export const ToDo = () => { + + const [isRefreshing, setIsRefreshing] = useState(false); + const [data, setData] = useState<{ + inProgress: ToDo, + waiting: ToDo, + failed: ToDo + } | null>(null); + + const refresh = () => { + setIsRefreshing(true); + authFetch("/api/todo").then(res => res.json()).then(res => { + setData(res); + setIsRefreshing(false); + }); + } + useEffect(refresh, []); + + return ( +
    + + {isRefreshing ? "Refreshing" : "Refresh"} + + {data ? (<> + + + + ) : (
    Loading...
    )} +
    + ); +}