Skip to content
88 changes: 76 additions & 12 deletions dev/user.clj
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@
query
mk-session
clear-ns-vars!]]
[clara.rules.accumulators :as acc]
[clara.rules.engine :as eng]
[clara.tools.inspect :as inspect]
[clojure.core.cache.wrapped :as cache]))

(comment
Expand All @@ -26,24 +29,85 @@
(insert! {:type :thing/result
:value ?value}))

(defrule return-a-thang
[?thang <- :thang/that [{:keys [value]}] (= value ?value)]
[?thing <- :thing/that [{:keys [value]}] (= value ?value)]
[?zulu <- (acc/all) :from [:zulu/that]]
[:test (and (some? ?thing)
(some? ?thang)
(some? ?zulu))]
=>
(insert! {:type :thang/result
:value ?value}))

(defquery query-a-thing
[]
[?output <- :thing/result])

(defrule default-data
(insert-all!
[{:type :thing/foo
:value 1}
{:type :thing/bar
:value 2}
{:type :thing/bar
:value 3}]))
(defquery query-a-thang
[]
[?output <- :thang/result])

(comment
(time
(-> (mk-session 'user :fact-type-fn :type)
(fire-rules)
(query query-a-thing))))
(do
(def facts1
[{:type :thing/foo
:value 1}
{:type :thing/bar
:value 2}
{:type :thang/that
:value 3}
{:type :zulu/that
:value 4}
{:type :zulu/that
:value 5}
{:type :unmatched-it
:value 100}])

(def facts2
[{:type :thing/that
:value 3}])

(def session
(-> (mk-session 'user :fact-type-fn :type)
(insert-all facts1)
(fire-rules)
(insert-all facts2)
(fire-rules)))

(def components
(eng/components session))

(def get-alphas-fn
(-> components :rulebase :get-alphas-fn)))

(-> components :memory keys)

(->> components :memory :alpha-memory
vals
(mapcat vals)
(mapcat identity)
(map :fact))
(->> components :memory :beta-memory
vals
(mapcat vals)
(mapcat identity)
(mapcat :matches)
(map first))
(->> components :memory :accum-memory
vals
(mapcat vals)
(mapcat vals)
(mapcat first))
(-> components :memory :production-memory)

(get-alphas-fn facts1)
(get-alphas-fn facts2)

(query session query-a-thing)
(query session query-a-thang)

(inspect/inspect-facts session))

(def session-cache
(cache/lru-cache-factory {}))
Expand Down
21 changes: 13 additions & 8 deletions src/main/clojure/clara/rules/compiler.clj
Original file line number Diff line number Diff line change
Expand Up @@ -1889,9 +1889,8 @@

(defn create-get-alphas-fn
"Returns a function that given a sequence of facts,
returns a map associating alpha nodes with the facts they accept."
returns a list of tuples with each containing alpha nodes and the facts they accept."
[fact-type-fn ancestors-fn alpha-roots]

(let [;; If a customized fact-type-fn is provided,
;; we must use a specialized grouping function
;; that handles internal control types that may not
Expand Down Expand Up @@ -1941,28 +1940,34 @@
(.add fact-list fact)
fact-list))))]

(fn [facts]
^{:fact-type-fn wrapped-fact-type-fn
:ancestors-fn wrapped-ancestors-fn}
(fn do-get-alphas
[facts]
(let [roots->facts (java.util.LinkedHashMap.)]
(doseq [fact facts
;;; For each fact, find the matching alpha roots based on its type and ancestors.
roots-group (fact-type->roots (wrapped-fact-type-fn fact))]
;;; Update the map of roots to facts
(update-roots->facts! roots->facts roots-group fact))

(let [return-list (hf/mut-list)
(let [matched-alphas (hf/mut-list)
entries (.entrySet roots->facts)
entries-it (.iterator entries)]
;; We iterate over the LinkedHashMap manually to avoid potential issues described at http://dev.clojure.org/jira/browse/CLJ-1738
;; where a Java iterator can return the same entry object repeatedly and mutate it after each next() call. We use mutable lists
;; for performance but wrap them in unmodifiableList to make it clear that the caller is not expected to mutate these lists.
;; Since after this function returns the only reference to the fact lists will be through the unmodifiedList we can depend elsewhere
;; on these lists not changing. Since the only expected workflow with these lists is to loop through them, not add or remove elements,
;; we don't gain much from using a transient (which can be efficiently converted to a persistent data structure) rather than a mutable type.
;; we don't gain much from using a transient (which can be efficiently converted to a persistent data structure)
;; rather than a mutable type.
(loop []
(when (.hasNext entries-it)
(let [^java.util.Map$Entry e (.next entries-it)]
(.add return-list [(-> e ^AlphaRootsWrapper (.getKey) (.wrapped))
(hf/persistent! (.getValue e))])
(.add matched-alphas [(-> e ^AlphaRootsWrapper (.getKey) (.wrapped))
(hf/persistent! (.getValue e))])
(recur))))
(hf/persistent! return-list))))))
(hf/persistent! matched-alphas))))))

(defn create-ancestors-fn
[{:keys [ancestors-fn
Expand Down
97 changes: 60 additions & 37 deletions src/main/clojure/clara/rules/engine.clj
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,11 @@
;; An activation for the given production and token.
(defrecord Activation [node token])

(defn ->RootElement
"Creates a root element with a root fact and empty bindings."
[fact]
(->Element fact {}))

;; Token with no bindings, used as the root of beta nodes.
(def empty-token (->Token [] {}))

Expand Down Expand Up @@ -231,12 +236,12 @@
This is similar to the function of the pending-updates in the fire-rules* loop."
[get-alphas-fn memory transport listener]
(loop []
(let [retractions (deref *pending-external-retractions*)
;; We have already obtained a direct reference to the facts to be
;; retracted in this iteration of the loop outside the cache. Now reset
;; the cache. The retractions we execute may cause new retractions to be queued
;; up, in which case the loop will execute again.
_ (reset! *pending-external-retractions* [])]
(let [retractions (deref *pending-external-retractions*)]
;; We have already obtained a direct reference to the facts to be
;; retracted in this iteration of the loop outside the cache. Now reset
;; the cache. The retractions we execute may cause new retractions to be queued
;; up, in which case the loop will execute again.
(reset! *pending-external-retractions* [])
(doseq [[alpha-roots fact-group] (get-alphas-fn retractions)
root alpha-roots]
(alpha-retract root fact-group memory transport listener))
Expand All @@ -251,7 +256,7 @@
(throw (ex-info "session pending updates missing:" {:session current-session
:label label})))
(letfn [(flush-all [current-session flushed-items?]
(let [{:keys [rulebase transient-memory transport insertions get-alphas-fn listener]} current-session
(let [{:keys [transient-memory transport get-alphas-fn listener]} current-session
pending-updates (-> current-session :pending-updates uc/get-updates-and-reset!)]

(if (empty? pending-updates)
Expand Down Expand Up @@ -294,7 +299,7 @@
This should only be used for facts explicitly retracted in a RHS.
It should not be used for retractions that occur as part of automatic truth maintenance."
[facts]
(let [{:keys [rulebase transient-memory transport insertions get-alphas-fn listener]} *current-session*
(let [{:keys [transient-memory transport insertions get-alphas-fn listener]} *current-session*
{:keys [node token]} *rule-context*]
;; Update the count so the rule engine will know when we have normalized.
(swap! insertions + (count facts))
Expand All @@ -311,7 +316,7 @@
"Perform the actual fact insertion, optionally making them unconditional. This should only
be called once per rule activation for logical insertions."
[facts unconditional]
(let [{:keys [rulebase transient-memory transport insertions get-alphas-fn listener]} *current-session*
(let [{:keys [transient-memory insertions listener]} *current-session*
{:keys [node token]} *rule-context*]

;; Update the insertion count.
Expand Down Expand Up @@ -1969,10 +1974,14 @@

(case op-type

:insertion
:insert
(do
(l/insert-facts! listener nil nil facts)

(when (seq facts)
(mem/add-elements! memory mem/ROOT_NODE {}
(map ->RootElement facts)))

(binding [*pending-external-retractions* (atom [])]
;; Bind the external retractions cache so that any logical retractions as a result
;; of these insertions can be cached and executed as a batch instead of eagerly realizing
Expand All @@ -1983,10 +1992,14 @@
(alpha-activate root fact-group memory transport listener))
(external-retract-loop get-alphas-fn memory transport listener)))

:retraction
:retract
(do
(l/retract-facts! listener nil nil facts)

(when (seq facts)
(mem/remove-elements! memory mem/ROOT_NODE {}
(map ->RootElement facts)))

(binding [*pending-external-retractions* (atom facts)]
(external-retract-loop get-alphas-fn memory transport listener)))))

Expand All @@ -1995,27 +2008,37 @@
(let [insertions (sequence
(comp (filter (fn [pending-op]
(= (:type pending-op)
:insertion)))
:insert)))
(mapcat :facts))
pending-operations)

retractions (sequence
(comp (filter (fn [pending-op]
(= (:type pending-op)
:retraction)))
:retract)))
(mapcat :facts))
pending-operations)]
;; Insertions should come before retractions so that if we insert and then retract the same
;; fact that is not already in the session the end result will be that the session won't have that fact.
;; If retractions came first then we'd first retract a fact that isn't in the session, which doesn't do anything,
;; and then later we would insert the fact.

(when (seq insertions)
(mem/add-elements! memory mem/ROOT_NODE {}
(map ->RootElement insertions)))

(doseq [[alpha-roots fact-group] (get-alphas-fn insertions)
root alpha-roots]
(alpha-activate root fact-group memory transport listener))

(when (seq retractions)
(mem/remove-elements! memory mem/ROOT_NODE {}
(map ->RootElement retractions)))

(doseq [[alpha-roots fact-group] (get-alphas-fn retractions)
root alpha-roots]
(alpha-retract root fact-group memory transport listener))

(fire-rules-handler session opts))))))

(defn- query*
Expand Down Expand Up @@ -2046,15 +2069,16 @@
ISession
(insert [session facts]

(let [new-pending-operations (conj pending-operations (uc/->PendingUpdate :insertion
;; Preserve the behavior prior to https://github.com/cerner/clara-rules/issues/268
;; , particularly for the Java API, where the caller could freely mutate a
;; collection of facts after passing it to Clara for the constituent
;; facts to be inserted or retracted. If the caller passes a persistent
;; Clojure collection don't do any additional work.
(if (coll? facts)
facts
(into [] facts))))]
(let [new-pending-operations (conj pending-operations
(uc/->PendingUpdate :insert
;; Preserve the behavior prior to https://github.com/cerner/clara-rules/issues/268
;; , particularly for the Java API, where the caller could freely mutate a
;; collection of facts after passing it to Clara for the constituent
;; facts to be inserted or retracted. If the caller passes a persistent
;; Clojure collection don't do any additional work.
(if (coll? facts)
facts
(into [] facts))))]

(->LocalSession rulebase
memory
Expand All @@ -2065,11 +2089,12 @@

(retract [session facts]

(let [new-pending-operations (conj pending-operations (uc/->PendingUpdate :retraction
;; As in insert above defend against facts being a mutable collection.
(if (coll? facts)
facts
(into [] facts))))]
(let [new-pending-operations (conj pending-operations
(uc/->PendingUpdate :retract
;; As in insert above defend against facts being a mutable collection.
(if (coll? facts)
facts
(into [] facts))))]

(->LocalSession rulebase
memory
Expand All @@ -2085,11 +2110,10 @@
(fire-rules [session opts]
(let [transient-memory (mem/to-transient memory)
transient-listener (l/to-transient listener)]
(fire-rules*
rulebase transient-memory transport
transient-listener get-alphas-fn
pending-operations opts
fire-rules!)
(fire-rules* rulebase transient-memory transport
transient-listener get-alphas-fn
pending-operations opts
fire-rules!)
(->LocalSession rulebase
(mem/to-persistent! transient-memory)
transport
Expand All @@ -2103,11 +2127,10 @@
(async
(let [transient-memory (mem/to-transient memory)
transient-listener (l/to-transient listener)]
(<! (fire-rules*
rulebase transient-memory transport
transient-listener get-alphas-fn
pending-operations opts
fire-rules-async!))
(<! (fire-rules* rulebase transient-memory transport
transient-listener get-alphas-fn
pending-operations opts
fire-rules-async!))
(->LocalSession rulebase
(mem/to-persistent! transient-memory)
transport
Expand Down
4 changes: 3 additions & 1 deletion src/main/clojure/clara/rules/memory.clj
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,9 @@
;; Converts the transient memory to persistent form.
(to-persistent! [memory]))

(def ROOT_NODE_ID 0)
(def ROOT_NODE {:id ROOT_NODE_ID})

(defn- coll-empty?
"Returns true if the collection is empty. Does not call seq due to avoid
overhead that may cause for non-persistent collection types, e.g.
Expand Down Expand Up @@ -447,7 +450,6 @@
(declare ->PersistentLocalMemory)

;;; Transient local memory implementation. Typically only persistent memory will be visible externally.

(deftype TransientLocalMemory [rulebase
activation-group-sort-fn
activation-group-fn
Expand Down
Loading