From 4fbf8face2ad698ef6f1d8afb5bbd689bf4df4d5 Mon Sep 17 00:00:00 2001
From: Tristan Nelson
Date: Fri, 4 Nov 2022 16:02:42 -0400
Subject: [PATCH 1/3] Working async jena instance

Add genegraph.database.dataset: a PersistentDataset record wrapping a
Jena TDB2 dataset, plus a background thread that drains a bounded queue
of :replace / :remove commands and applies them inside a single write
transaction.

---
 src/genegraph/database/dataset.clj | 92 ++++++++++++++++++++++++++++++
 1 file changed, 92 insertions(+)
 create mode 100644 src/genegraph/database/dataset.clj

diff --git a/src/genegraph/database/dataset.clj b/src/genegraph/database/dataset.clj
new file mode 100644
index 00000000..9e9cd008
--- /dev/null
+++ b/src/genegraph/database/dataset.clj
@@ -0,0 +1,92 @@
+(ns genegraph.database.dataset
+  "Namespace for handling operations on persistent Jena datasets"
+  (:require [clojure.spec.alpha :as spec]
+            [clojure.core.async :as async])
+  (:import [org.apache.jena.tdb2 TDB2Factory]
+           [org.apache.jena.query Dataset ReadWrite]
+           [java.util.concurrent BlockingQueue ArrayBlockingQueue TimeUnit]
+           [java.util List ArrayList]))
+
+
+(defrecord PersistentDataset [dataset
+                              run-atom
+                              complete-promise
+                              write-queue
+                              write-queue-size
+                              name]
+
+  java.io.Closeable
+  (close [this]
+    (println "closing dataset")
+    (reset! run-atom false)
+    (if (deref complete-promise (* 5 1000) false)
+      (.close dataset)
+      (do (throw (ex-info "Timeout closing dataset."
+                          (select-keys this [:name])))
+          false))))
+
+
+(defn execute
+  "Execute command on dataset"
+  [dataset command]
+  ;; consider raising exception if bad command
+  (case (:command command)
+    :replace (.replaceNamedModel dataset
+                                 (:model-name command)
+                                 (:model command))
+    :remove (.removeNamedModel dataset
+                               (:model-name command))
+    (throw (ex-info "Invalid command" command))))
+
+(defn write-loop
+  "Read commands from queue and execute them on dataset. Pulls multiple records
+  from the queue if possible given buffer limit and availability"
+  [{:keys [write-queue-size write-queue run-atom complete-promise dataset] :as dataset-record}]
+  (let [write-buffer (ArrayList. write-queue-size)]
+    (println "init write loop")
+    (while @run-atom
+      (try
+        (when-let [first-command (.poll write-queue 1000 TimeUnit/MILLISECONDS)]
+          (.begin dataset ReadWrite/WRITE)
+          (println "write lock acquired")
+          (execute dataset first-command)
+          (.drainTo write-queue write-buffer write-queue-size)
+          (run! #(execute dataset %) write-buffer)
+          (.clear write-buffer)
+          (.commit dataset)
+          (.end dataset))
+        (catch Exception e (println e))))
+    (println "complete write loop")
+    (deliver complete-promise true)))
+
+(defn execute-async
+  "execute command list asynchronously"
+  [{:keys [run-atom write-queue] :as dataset} commands]
+  (when @run-atom
+    (run! #(.put write-queue %) commands)))
+
+(defn execute-sync
+  "Execute command list synchronously. Will claim write transaction."
+  [{:keys [dataset]} commands]
+  (.begin dataset ReadWrite/WRITE)
+  (run! #(execute dataset %) commands)
+  (.commit dataset)
+  (.end dataset))
+
+(defn dataset [opts]
+  (let [write-queue-size (or (:write-queue-size opts) 100)
+        persistent-dataset (map->PersistentDataset
+                            (merge
+                             opts
+                             {:dataset (if (:path opts)
+                                         (TDB2Factory/connectDataset (:path opts))
+                                         (TDB2Factory/createDataset))
+                              :run-atom (atom true)
+                              :complete-promise (promise)
+                              :write-queue-size write-queue-size
+                              :write-queue (ArrayBlockingQueue.
+                                            write-queue-size)}))]
+
+    (.start (Thread. #(println "initializing dataset async thread")))
+    (.start (Thread. #(write-loop persistent-dataset)))
+    persistent-dataset))

From b121924ccd7e3c6c55abc0f68ee620e99a9dbee3 Mon Sep 17 00:00:00 2001
From: Tristan Nelson
Date: Mon, 7 Nov 2022 11:01:35 -0500
Subject: [PATCH 2/3] Adding promise functionality

Deliver an optional :committed promise on each command once the batch it
belongs to has been committed. Allow the dataset to be opened from a
text-index assembly file via :assembly-path. Drop the debug printlns and
comment out genegraph.database.instance requires in three namespaces.

---
 src/genegraph/database/dataset.clj           | 38 +++++++++++---------
 src/genegraph/database/query/algebra.clj     |  2 +-
 src/genegraph/source/graphql/gene_dosage.clj |  3 +-
 src/genegraph/suggest/infix_suggester.clj    |  2 +-
 4 files changed, 26 insertions(+), 19 deletions(-)

diff --git a/src/genegraph/database/dataset.clj b/src/genegraph/database/dataset.clj
index 9e9cd008..32d87b78 100644
--- a/src/genegraph/database/dataset.clj
+++ b/src/genegraph/database/dataset.clj
@@ -1,11 +1,13 @@
 (ns genegraph.database.dataset
-  "Namespace for handling operations on persistent Jena datasets"
+  "Namespace for handling operations on persistent Jena datasets.
+  Specifically designed around handling asynchronous writes."
   (:require [clojure.spec.alpha :as spec]
             [clojure.core.async :as async])
   (:import [org.apache.jena.tdb2 TDB2Factory]
            [org.apache.jena.query Dataset ReadWrite]
            [java.util.concurrent BlockingQueue ArrayBlockingQueue TimeUnit]
-           [java.util List ArrayList]))
+           [java.util List ArrayList]
+           [org.apache.jena.query.text TextDatasetFactory]))
 
 
 (defrecord PersistentDataset [dataset
@@ -17,7 +19,6 @@
 
   java.io.Closeable
   (close [this]
-    (println "closing dataset")
     (reset! run-atom false)
     (if (deref complete-promise (* 5 1000) false)
       (.close dataset)
@@ -38,25 +39,29 @@
                                (:model-name command))
     (throw (ex-info "Invalid command" command))))
 
+(defn deliver-commit-promise
+  "Deliver promise when command is committed to the database."
+  [command]
+  (when-let [committed-promise (:committed command)]
+    (deliver committed-promise true)))
+
 (defn write-loop
   "Read commands from queue and execute them on dataset. Pulls multiple records
   from the queue if possible given buffer limit and availability"
   [{:keys [write-queue-size write-queue run-atom complete-promise dataset] :as dataset-record}]
   (let [write-buffer (ArrayList. write-queue-size)]
-    (println "init write loop")
     (while @run-atom
       (try
         (when-let [first-command (.poll write-queue 1000 TimeUnit/MILLISECONDS)]
-          (.begin dataset ReadWrite/WRITE)
-          (println "write lock acquired")
-          (execute dataset first-command)
           (.drainTo write-queue write-buffer write-queue-size)
-          (run! #(execute dataset %) write-buffer)
-          (.clear write-buffer)
-          (.commit dataset)
-          (.end dataset))
+          (let [commands (cons first-command write-buffer)]
+            (.begin dataset ReadWrite/WRITE)
+            (run! #(execute dataset %) commands)
+            (.clear write-buffer)
+            (.commit dataset)
+            (.end dataset)
+            (run! deliver-commit-promise commands)))
         (catch Exception e (println e))))
-    (println "complete write loop")
     (deliver complete-promise true)))
 
 (defn execute-async
@@ -78,15 +83,16 @@
         persistent-dataset (map->PersistentDataset
                             (merge
                              opts
-                             {:dataset (if (:path opts)
+                             {:dataset (cond
+                                         (:assembly-path opts)
+                                         (TextDatasetFactory/create (:assembly-path))
+                                         (:path opts)
                                          (TDB2Factory/connectDataset (:path opts))
-                                         (TDB2Factory/createDataset))
+                                         :else (TDB2Factory/createDataset))
                               :run-atom (atom true)
                               :complete-promise (promise)
                               :write-queue-size write-queue-size
                               :write-queue (ArrayBlockingQueue.
                                             write-queue-size)}))]
-
-    (.start (Thread. #(println "initializing dataset async thread")))
     (.start (Thread. #(write-loop persistent-dataset)))
     persistent-dataset))
diff --git a/src/genegraph/database/query/algebra.clj b/src/genegraph/database/query/algebra.clj
index e898bf65..0f143519 100644
--- a/src/genegraph/database/query/algebra.clj
+++ b/src/genegraph/database/query/algebra.clj
@@ -1,5 +1,5 @@
 (ns genegraph.database.query.algebra
-  (:require [genegraph.database.instance :refer [db]]
+  (:require #_[genegraph.database.instance :refer [db]] ; TODO remove if this works
             [genegraph.database.query.types :as types]
             [genegraph.database.util :as util :refer [tx]]
             [genegraph.database.names :as names :refer
diff --git a/src/genegraph/source/graphql/gene_dosage.clj b/src/genegraph/source/graphql/gene_dosage.clj
index ba7f5ccd..f6b0d331 100644
--- a/src/genegraph/source/graphql/gene_dosage.clj
+++ b/src/genegraph/source/graphql/gene_dosage.clj
@@ -4,7 +4,8 @@
             [genegraph.database.query :as q]
             [genegraph.source.graphql.common.cache :refer [defresolver]]
             [com.walmartlabs.lacinia.schema :refer [tag-with-type]]
-            [genegraph.database.instance :refer [db]])
+            #_[genegraph.database.instance :refer [db]] ; TODO remove if this works
+            )
   (:import [org.apache.jena.query ReadWrite QueryFactory QueryExecutionFactory]
            [org.apache.jena.sparql.util QueryExecUtils]))
 
diff --git a/src/genegraph/suggest/infix_suggester.clj b/src/genegraph/suggest/infix_suggester.clj
index 1b7bc52f..eb5c5c5f 100644
--- a/src/genegraph/suggest/infix_suggester.clj
+++ b/src/genegraph/suggest/infix_suggester.clj
@@ -2,7 +2,7 @@
   (:require [clojure.string :as str]
             [mount.core :as mount :refer [defstate]]
             [genegraph.env :as env]
-            [genegraph.database.instance :refer [db]]
+            #_[genegraph.database.instance :refer [db]] ;; TODO remove if this works
             [genegraph.database.query :as q]
             [genegraph.source.graphql.condition :as condition]
             [genegraph.source.graphql.common.curation :as curation]

From 15150a0314686e9e2a73985d6e944043c3c4b08c Mon Sep 17 00:00:00 2001
From: Tristan Nelson
Date: Thu, 10 Nov 2022 14:07:54 -0500
Subject: [PATCH 3/3] Wiring into the interceptor stack

PersistentDataset now implements org.apache.jena.query.Dataset by
delegating every call to the wrapped dataset, and
genegraph.database.instance/db is constructed through ds/dataset.
Add the add-dataset and add-dataset-commands multimethods together with
their default implementations and interceptors in genegraph.annotate.

Each Dataset method delegates to the method of the same name
(getDefaultModel, calculateRead, calculateWrite). Dataset commands are
accumulated in a vector so they keep the order in which they were added,
and an :unpublish event queues a single :remove command.

---
 src/genegraph/annotate.clj           | 31 +++++++++++++-
 src/genegraph/annotate/dataset.clj   |  3 ++
 src/genegraph/annotate/interface.clj |  4 ++
 src/genegraph/database/dataset.clj   | 65 ++++++++++++++++++++++++++--
 src/genegraph/database/instance.clj  |  6 ++-
 5 files changed, 102 insertions(+), 7 deletions(-)
 create mode 100644 src/genegraph/annotate/dataset.clj
 create mode 100644 src/genegraph/annotate/interface.clj

diff --git a/src/genegraph/annotate.clj b/src/genegraph/annotate.clj
index 94171019..14c43b77 100644
--- a/src/genegraph/annotate.clj
+++ b/src/genegraph/annotate.clj
@@ -1,9 +1,11 @@
 (ns genegraph.annotate
-  (:require [genegraph.annotate.action :as action]
+  (:require [genegraph.annotate.interface :as annotate-interface]
+            [genegraph.annotate.action :as action]
             [genegraph.annotate.replaces :as replaces]
             [genegraph.database.query :as q]
             [genegraph.database.validation :as validate]
             [genegraph.database.util :as util :refer [tx]]
+            [genegraph.database.instance :as instance]
             [genegraph.env :as env]
             [genegraph.interceptor :as intercept :refer [interceptor-enter-def]]
             [genegraph.transform.core :as transform]
@@ -206,3 +208,30 @@
 (def add-jsonld-interceptor
   {:name ::add-jsonld-interceptor
    :enter (fn [event] (xform-types/add-model-jsonld event))})
+
+(defmethod annotate-interface/add-dataset :default [event]
+  (assoc event ::dataset instance/db))
+
+(def add-dataset-interceptor
+  {:name ::add-dataset-interceptor
+   :enter annotate-interface/add-dataset})
+
+(defn add-command [event command]
+  (update event ::dataset-commands (fnil conj []) command))
+
+(defmethod annotate-interface/add-dataset-commands :default [event]
+  (cond-> event
+    (= :publish (::action event)) (add-command
+                                   {:command :replace
+                                    :model-name (::iri event)
+                                    :model (::q/model event)})
+    (= :unpublish (::action event)) (add-command
+                                     {:command :remove
+                                      :model-name (::iri event)})
+    (::replaces event) (add-command
+                        {:command :remove
+                         :model-name (::replaces event)})))
+
+(def add-dataset-commands-interceptor
+  {:name ::add-dataset-commands-interceptor
+   :enter annotate-interface/add-dataset-commands})
diff --git a/src/genegraph/annotate/dataset.clj b/src/genegraph/annotate/dataset.clj
new file mode 100644
index 00000000..69668ada
--- /dev/null
+++ b/src/genegraph/annotate/dataset.clj
@@ -0,0 +1,3 @@
+(ns genegraph.annotate.dataset)
+
+;; Placeholder namespace: nothing is defined here yet.
diff --git a/src/genegraph/annotate/interface.clj b/src/genegraph/annotate/interface.clj
new file mode 100644
index 00000000..0207e026
--- /dev/null
+++ b/src/genegraph/annotate/interface.clj
@@ -0,0 +1,4 @@
+(ns genegraph.annotate.interface)
+
+(defmulti add-dataset :genegraph.transform.core/format)
+(defmulti add-dataset-commands :genegraph.transform.core/format)
diff --git a/src/genegraph/database/dataset.clj b/src/genegraph/database/dataset.clj
index 32d87b78..6c00dd30 100644
--- a/src/genegraph/database/dataset.clj
+++ b/src/genegraph/database/dataset.clj
@@ -4,7 +4,8 @@
   (:require [clojure.spec.alpha :as spec]
             [clojure.core.async :as async])
   (:import [org.apache.jena.tdb2 TDB2Factory]
-           [org.apache.jena.query Dataset ReadWrite]
+           [org.apache.jena.query Dataset ReadWrite TxnType]
+           [org.apache.jena.rdf.model Model Resource]
            [java.util.concurrent BlockingQueue ArrayBlockingQueue TimeUnit]
            [java.util List ArrayList]
            [org.apache.jena.query.text TextDatasetFactory]))
@@ -16,15 +17,71 @@
                               write-queue
                               write-queue-size
                               name]
 
+
-  java.io.Closeable
+  ;; Setting the behavior of close to cleanly terminate all related
+  ;; resources
+  org.apache.jena.query.Dataset
   (close [this]
     (reset! run-atom false)
     (if (deref complete-promise (* 5 1000) false)
       (.close dataset)
       (do (throw (ex-info "Timeout closing dataset."
                           (select-keys this [:name])))
-          false))))
+          false)))
+
+  ;; The rest is just boilerplate to pass through calls to the Dataset
+  ;; interface to the underlying Dataset object. Ideally the need for this
+  ;; should diminish as we're able to do more refactoring, but for now
+  ;; this keeps this implementation consistent with the way datasets
+  ;; have been used up to this point.
+  ;; https://jena.apache.org/documentation/javadoc/arq/org.apache.jena.arq/org/apache/jena/query/Dataset.html
+  ;; note that the inherited Transactional interface is also implemented
+
+  (abort [this] (.abort dataset))
+  (^Dataset addNamedModel [this ^String uri ^Model model] (.addNamedModel dataset uri model))
+  (^Dataset addNamedModel [this ^Resource resource ^Model model] (.addNamedModel dataset resource model))
+  (asDatasetGraph [this] (.asDatasetGraph dataset))
+  (^void begin [this ^ReadWrite tx-type] (.begin dataset tx-type))
+  (^void begin [this] (.begin dataset))
+  (^void begin [this ^TxnType t] (.begin dataset t))
+  (calc [this txn-type action] (.calc dataset txn-type action))
+  (calculate [this supplier] (.calculate dataset supplier))
+  (calculateRead [this supplier] (.calculateRead dataset supplier))
+  (calculateWrite [this supplier] (.calculateWrite dataset supplier))
+  (commit [this] (.commit dataset))
+  (^boolean containsNamedModel [this ^String uri] (.containsNamedModel dataset uri))
+  (^boolean containsNamedModel [this ^Resource resource] (.containsNamedModel dataset resource))
+  (end [this] (.end dataset))
+  (exec [this tx-type action] (.exec dataset tx-type action))
+  (execute [this action] (.execute dataset action))
+  (executeRead [this action] (.executeRead dataset action))
+  (executeWrite [this action] (.executeWrite dataset action))
+  (getContext [this] (.getContext dataset))
+  (getDefaultModel [this] (.getDefaultModel dataset))
+  (getLock [this] (.getLock dataset))
+  (^Model getNamedModel [this ^String name] (.getNamedModel dataset name))
+  (^Model getNamedModel [this ^Resource name] (.getNamedModel dataset name))
+  (getPrefixMapping [this] (.getPrefixMapping dataset))
+  (getUnionModel [this] (.getUnionModel dataset))
+  (isInTransaction [this] (.isInTransaction dataset))
+  (listModelNames [this] (.listModelNames dataset))
+  (listNames [this] (.listNames dataset))
+  (promote [this] (.promote dataset))
+  (promote [this mode] (.promote dataset mode))
+  (^Dataset removeNamedModel [this ^String name]
+   (.removeNamedModel dataset name))
+  (^Dataset removeNamedModel [this ^Resource name]
+   (.removeNamedModel dataset name))
+  (^Dataset replaceNamedModel [this ^String name ^Model model]
+   (.replaceNamedModel dataset name model))
+  (^Dataset replaceNamedModel [this ^Resource name ^Model model]
+   (.replaceNamedModel dataset name model))
+  (setDefaultModel [this model] (.setDefaultModel dataset model))
+  (supportsTransactionAbort [this] (.supportsTransactionAbort dataset))
+  (supportsTransactions [this] (.supportsTransactions dataset))
+  (transactionMode [this] (.transactionMode dataset))
+  (transactionType [this] (.transactionType dataset)))
 
 
 (defn execute
@@ -85,7 +142,7 @@
                              opts
                              {:dataset (cond
                                          (:assembly-path opts)
-                                         (TextDatasetFactory/create (:assembly-path))
+                                         (TextDatasetFactory/create (:assembly-path opts))
                                          (:path opts)
                                          (TDB2Factory/connectDataset (:path opts))
                                          :else (TDB2Factory/createDataset))
diff --git a/src/genegraph/database/instance.clj b/src/genegraph/database/instance.clj
index 16d45419..cae81af6 100644
--- a/src/genegraph/database/instance.clj
+++ b/src/genegraph/database/instance.clj
@@ -4,7 +4,8 @@
             [clojure.java.io :as io]
             [clojure.string :as string]
             [mount.core :as mount :refer [defstate]]
-            [genegraph.env :as env])
+            [genegraph.env :as env]
+            [genegraph.database.dataset :as ds])
   (:import [org.apache.jena.query.text TextDatasetFactory]))
 
 (def assembly-file "genegraph-assembly.ttl")
@@ -27,5 +28,6 @@
     path))
 
 (defstate db
-  :start (TextDatasetFactory/create (get-expanded-assembly-file))
+  :start (ds/dataset {:assembly-path (get-expanded-assembly-file)})
+  #_(TextDatasetFactory/create (get-expanded-assembly-file))
   :stop (.close db))