Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions backend/app/AppComponents.scala
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,7 @@ class AppComponents(context: Context, config: Config)
val workspacesController = new Workspaces(authControllerComponents, annotations, esResources, manifest, users, blobStorage, previewStorage, postgresClient, remoteIngestStore, remoteIngestStorage, config.remoteIngest, snsClient)
val commentsController = new Comments(authControllerComponents, manifest, esResources, annotations)
val usersController = new Users(authControllerComponents, userProvider)
val workersController = new Workers(authControllerComponents, manifest)
val pagesController = new PagesController(authControllerComponents, manifest, esResources, pages2, annotations, previewStorage)
val ingestionController = new Ingestion(authControllerComponents, ingestStorage)
val ingestionEventsController = new IngestionEvents(authControllerComponents, postgresClient, users )
Expand Down Expand Up @@ -277,6 +278,7 @@ class AppComponents(context: Context, config: Config)
workspacesController,
previewController,
usersController,
workersController,
videoVerifierController,
authController,
appController,
Expand Down
29 changes: 29 additions & 0 deletions backend/app/controllers/api/Workers.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package controllers.api

import model.user.UserPermission.CanPerformAdminOperations
import play.api.libs.json.{Json, OWrites}
import services.manifest.Manifest
import services.manifest.Manifest.{Failure, ToDo, ToDoItem, ToDoResponse}
import utils.Logging
import utils.attempt.Attempt
import utils.controller.{AuthApiController, AuthControllerComponents}

class Workers(
override val controllerComponents: AuthControllerComponents,
manifest: Manifest,
) extends AuthApiController with Logging {

private implicit val writesFailure: OWrites[Failure] = Json.writes[Failure]
private implicit val writesToDoItem: OWrites[ToDoItem] = Json.writes[ToDoItem]
private implicit val writesToDo: OWrites[ToDo] = Json.writes[ToDo]
private implicit val writesToDoResponse: OWrites[ToDoResponse] = Json.writes[ToDoResponse]

def getToDo() = ApiAction.attempt { req =>
checkPermission(CanPerformAdminOperations, req) {
Attempt.fromEither(
manifest.getToDo().map(Json.toJson(_)).map(Ok(_))
)
}
}

}
12 changes: 10 additions & 2 deletions backend/app/services/manifest/Manifest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@ import model._
import model.annotations.WorkspaceMetadata
import model.frontend.email.EmailNeighbours
import model.frontend.{BasicResource, ExtractionFailures, ResourcesForExtractionFailure}
import model.ingestion.{IngestionFile, RemoteIngest, WorkspaceItemContext, WorkspaceItemUploadContext}
import model.ingestion.{IngestionFile, WorkspaceItemContext, WorkspaceItemUploadContext}
import model.manifest._
import services.manifest.Manifest.WorkCounts
import services.manifest.Manifest.{ToDo, ToDoResponse, WorkCounts}
import utils.attempt.{Attempt, Failure}

import java.nio.file.Path
Expand All @@ -30,9 +30,17 @@ object Manifest {
case class InsertEmail(email: Email, parent: Uri) extends Insertion

case class WorkCounts(inProgress: Int, outstanding: Int)

case class ToDo(total: Long, items: List[ToDoItem])
case class Failure(at: Long, stackTrace: String)
case class ToDoItem(extractor: String, ingestion: String, attempts: Int, priority: Int, size: Long, blobUri: String, lockedBy: Option[String], failures: List[Failure])
case class ToDoResponse(inProgress: ToDo, waiting: ToDo, failed: ToDo)
}

trait WorkerManifest {

def getToDo(): Either[Failure, ToDoResponse]

def fetchWork(workerName: String, workerCount:Int, workerIndex:Int, maxBatchSize: Int, maxCost: Int): Either[Failure, List[WorkItem]]

def releaseLocks(workerName: String): Either[Failure, Unit]
Expand Down
79 changes: 78 additions & 1 deletion backend/app/services/manifest/Neo4jManifest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import org.joda.time.DateTime
import org.neo4j.driver.v1.Values.parameters
import org.neo4j.driver.v1.{Driver, StatementResult, StatementRunner, Value}
import services.Neo4jQueryLoggingConfig
import services.manifest.Manifest.WorkCounts
import services.manifest.Manifest.{ToDo, ToDoItem, ToDoResponse, WorkCounts}
import utils._
import utils.attempt.{Attempt, Failure, IllegalStateFailure, NotFoundFailure}

Expand Down Expand Up @@ -451,6 +451,83 @@ class Neo4jManifest(driver: Driver, executionContext: ExecutionContext, queryLog
Right(())
}

override def getToDo(): Either[Failure, ToDoResponse] = transaction { tx =>
// TODO DRY out the common parts with fetchWork
val r = tx.run(
s"""
|MATCH (extractor: Extractor)-[todo: TODO]->(blob: Blob:Resource)
|OPTIONAL MATCH (blob)<-[failure:EXTRACTION_FAILURE]-(extractor)
|WITH extractor, todo, blob, collect(failure) as failures
|ORDER BY coalesce(todo.priority, extractor.priority) DESC
|WITH COLLECT({
| extractor: extractor.name,
| ingestion: todo.ingestion,
| attempts: todo.attempts,
| priority: todo.priority,
| size: blob.size,
| blobUri: blob.uri,
| lockedBy: todo.lockedBy,
| failures: failures
|}) as allItems
|RETURN reduce(acc = {inProgress: {total: 0, items:[]}, waiting: {total: 0, items:[]}, failed: {total: 0, items:[]}}, item in allItems |
| CASE
| WHEN item.lockedBy IS NOT NULL
| THEN {
| inProgress: {total: acc.inProgress.total + 1, items: acc.inProgress.items + item},
| waiting: acc.waiting,
| failed: acc.failed
| }
| WHEN item.attempts < {maxExtractionAttempts}
| THEN {
| inProgress: acc.inProgress,
| waiting: {total: acc.waiting.total + 1, items: (
| CASE WHEN size(acc.waiting.items) < {limit} THEN acc.waiting.items + item ELSE acc.waiting.items END
| )},
| failed: acc.failed
| }
| ELSE
| {
| inProgress: acc.inProgress,
| waiting: acc.waiting,
| failed: {total: acc.failed.total + 1, items: (
| CASE WHEN size(acc.failed.items) < {limit} THEN acc.failed.items + item ELSE acc.failed.items END
| )}
| }
| END
|) as result
|""".stripMargin,
parameters(
"limit", 25,
"maxExtractionAttempts", Int.box(maxExtractionAttempts)
),
).single()

def extractToDo(r: Value): ToDo = ToDo(
total = r.get("total").asLong(),
items = r.get("items").asList[ToDoItem](v => ToDoItem(
extractor = v.get("extractor").asString(),
ingestion = v.get("ingestion").asString(),
attempts = v.get("attempts").asInt(),
priority = v.get("priority").asInt(),
size = v.get("size").asLong(),
blobUri = v.get("blobUri").asString(),
lockedBy = if(v.get("lockedBy").isNull) None else Some(v.get("lockedBy").asString()), //TODO also get the lockedAt timestamp
failures = v.get("failures").asList[Manifest.Failure](f => Manifest.Failure(
at = f.get("at").asLong(),
stackTrace = f.get("stackTrace").asString()
)).asScala.toList
)).asScala.toList
)

Right(
ToDoResponse(
inProgress = extractToDo(r.get("result").get("inProgress")),
waiting = extractToDo(r.get("result").get("waiting")),
failed = extractToDo(r.get("result").get("failed"))
)
)
}

override def fetchWork(workerName: String, workerCount: Int, workerIndex: Int, maxBatchSize: Int, maxCost: Int): Either[Failure, List[WorkItem]] = transaction { tx =>
val summary = tx.run(
s"""
Expand Down
2 changes: 2 additions & 0 deletions backend/conf/routes
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,8 @@ PUT /api/users/:username/register cont

GET /api/currentUser/permissions controllers.api.Users.getMyPermissions()

GET /api/todo controllers.api.Workers.getToDo()

POST /api/video-verifier/fetch controllers.api.VideoVerifier.fetchVideosToVerify()

+ NOCSRF
Expand Down
2 changes: 2 additions & 0 deletions frontend/src/js/App.js
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ import MyUploads from "./components/IngestionEvents/MyUploads";
import AllIngestionEvents from "./components/IngestionEvents/AllIngestionEvents";
import { CaptureFromUrl } from "./components/Uploads/CaptureFromUrl";
import { VideoVerifier } from "./components/VideoVerifier";
import {ToDo} from "./components/Settings/ToDo";

class App extends React.Component {
static propTypes = {
Expand Down Expand Up @@ -108,6 +109,7 @@ class App extends React.Component {
<Route path="/settings/features" component={FeatureSwitches} />
<Route path="/settings/about" component={About} />
<Route path="/settings/uploads" component={WeeklyUploadsFeed} />
<Route path="/settings/todo" component={ToDo} />
<Route
path="/workspaces/:id/:workspaceLocation?"
component={Workspaces}
Expand Down
9 changes: 6 additions & 3 deletions frontend/src/js/components/Settings/SettingsSidebar.js
Original file line number Diff line number Diff line change
Expand Up @@ -51,12 +51,15 @@ class SettingsSidebar extends React.Component {
<SidebarSearchLink className="sidebar__item" to="/settings/uploads">
<div className="sidebar__item__text">Upload Calendar</div>
</SidebarSearchLink>
<SidebarSearchLink className="sidebar__item" to="/settings/todo">
<div className="sidebar__item__text">TODO</div>
</SidebarSearchLink>
</React.Fragment>
);
};

render() {
const canManageUsers = this.props.myPermissions.includes(
const canPerformAdminOperations = this.props.myPermissions.includes(
"CanPerformAdminOperations",
);

Expand Down Expand Up @@ -95,7 +98,7 @@ class SettingsSidebar extends React.Component {
<SidebarSearchLink className="sidebar__item" to="/settings/features">
<div className="sidebar__item__text">Feature Switches</div>
</SidebarSearchLink>
{canManageUsers ? this.renderAdminSettingsLinks() : false}
{canPerformAdminOperations ? this.renderAdminSettingsLinks() : false}
</div>
<div className="sidebar__group">
<div className="sidebar__title">Logs</div>
Expand All @@ -105,7 +108,7 @@ class SettingsSidebar extends React.Component {
>
<div className="sidebar__item__text">My Uploads</div>
</SidebarSearchLink>
{canManageUsers ? this.renderAdminLogsLinks() : false}
{canPerformAdminOperations ? this.renderAdminLogsLinks() : false}
</div>
</div>
);
Expand Down
131 changes: 131 additions & 0 deletions frontend/src/js/components/Settings/ToDo.tsx
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
import {useEffect, useState} from "react";
import authFetch from "../../util/auth/authFetch";
import {EuiBasicTable, EuiButton, EuiPopover, EuiToolTip} from "@elastic/eui";

interface Failure {
at: number,
stackTrace: string
}

interface ToDoItem {
extractor: string;
ingestion: string;
attempts: number;
priority: number;
size: number;
blobUri: string;
lockedBy: string | undefined;
failures: Failure[];
}

interface ToDo {
total: number;
items: ToDoItem[];
}

interface ToDoDisplayProps {
title: string,
todo: ToDo,
}
const ToDoDisplay = ({title, todo}: ToDoDisplayProps) => <>
<h2>{title}</h2>
<div>Showing {todo.items.length} of {todo.total} items</div>
{ /* TODO some grouping by worker, possibly serverside */ }
{ /* TODO make priority changeable */ }
{ /* TODO consider some sort of search */ }
{ /* TODO highlight same blob being locked by multiple workers */ }
<EuiBasicTable tableLayout="auto" items={todo.items} css={{
th: {
position: "sticky",
top: 0,
background: "white",
zIndex: 1,
borderBottom: "1px solid lightgrey",
boxSizing: "border-box"
}
}} columns={[
{
name: "Blob URI",
field: "blobUri",
width: "min-content",
render: (blobUri: string, item) =>
<a href={`/viewer/${blobUri}`} target="_blank" rel="noreferrer">
{blobUri.substring(0, 4)}...{blobUri.substring(blobUri.length - 4)}
</a>,
},
{
name: "Extractor",
field: "extractor",
},
{
name: "Ingestion",
field: "ingestion",
style: {
whiteSpace: "nowrap",
}
},
{
name: "Attempts",
field: "attempts",
},
{
name: "Priority",
field: "priority",
},
{
name: "Size (bytes)",
field: "size",
render: (size: number, item) => size.toLocaleString(),
align: "right",
},
{
name: <span>Previous Failures?<br/>(hover for stack trace)</span>,
field: "failures",
render: (failures: Failure[], item) =>
<EuiToolTip position="left" content={ <ul>
{failures.map(({at, stackTrace}) => (
<li title={stackTrace}>
<strong>{new Date(at).toLocaleString()}</strong>
<br/>
{stackTrace.split("\n")[0].substring(0, 70)}...
</li>
))}
</ul>}>
<span>{failures.length} failure{failures.length !== 1 && "s"}</span>
</EuiToolTip>
}
]} />
</>

export const ToDo = () => {

const [isRefreshing, setIsRefreshing] = useState(false);
const [data, setData] = useState<{
inProgress: ToDo,
waiting: ToDo,
failed: ToDo
} | null>(null);

const refresh = () => {
setIsRefreshing(true);
authFetch("/api/todo").then(res => res.json()).then(res => {
setData(res);
setIsRefreshing(false);
});
}
useEffect(refresh, []);

return (
<div>
<EuiButton isLoading={isRefreshing} disabled={isRefreshing} onClick={refresh} iconType="refresh"
style={{position: "fixed", right: "5px", zIndex: 2, marginTop: "5px"}}>
{isRefreshing ? "Refreshing" : "Refresh"}
</EuiButton>
{data ? (<>
<ToDoDisplay title="In Progress" todo={data.inProgress} />
<ToDoDisplay title="Waiting" todo={data.waiting} />
<ToDoDisplay title="Exceeded Max. Attempts" todo={data.failed} />
</>) : (<div>Loading...</div>)}
</div>
);
}
Loading