Thread pool for consumption of a stream (aka work queues) #131

@hansenpansen

Hello,

thank you very, very much for this incredible project and the effort you put into it.

I am wondering whether I understood the execution model correctly. I am trying to build a work queue, where data is put irregularly onto a stream via manifold.stream/put!. My naïve idea was to manifold.stream/map the (blocking) worker function over the stream, with multiple workers (a manifold.executor/fixed-thread-executor pool assigned via manifold.stream/onto), and consume the results with manifold.stream/consume. But this resulted in serial processing of the input. I found #126, but am unsure whether that is the proper way.
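
To make the goal concrete, here is a minimal sketch of one way the blocking calls might be made to overlap: each call is wrapped in `d/future-with` on a fixed pool, the resulting deferreds pass through a buffer, and `s/realize-each` unwraps them in input order. The names `slow-inc` and `parallel-map` are hypothetical and only serve the illustration, and the assumption that the buffer size roughly bounds the number of in-flight calls is mine. I am not claiming this is the recommended Manifold approach.

```clojure
(require '[manifold.deferred :as d]
         '[manifold.executor :as ex]
         '[manifold.stream :as s])

;; Hypothetical blocking worker; stands in for the real job.
(defn slow-inc [x]
  (Thread/sleep 100)
  (inc x))

(defn parallel-map
  "Sketch: applies the blocking `f` to every message of `src` on `pool`.
  The buffer lets several deferreds exist before the first one is realized,
  so (by assumption) roughly `n` calls can be in flight at once.
  Results come out in input order."
  [pool n f src]
  (->> src
       ;; returns immediately with a deferred; the work runs on `pool`
       (s/map (fn [msg] (d/future-with pool (f msg))))
       ;; holds up to `n` pending deferreds
       (s/buffer n)
       ;; turns the stream of deferreds into a stream of values
       (s/realize-each)))

(def pool (ex/fixed-thread-executor 4))
(def src (s/stream))
(def results (parallel-map pool 4 slow-inc src))

;; Close `src` only after every put has been accepted; closing earlier
;; would make the remaining puts fail.
(d/chain (s/put-all! src (range 8))
         (fn [_] (s/close! src)))

;; Blocks until `results` is closed and drained.
(def out (vec (s/stream->seq results)))
```

Unlike the first-come, first-served forking below, this keeps the results in input order, so one slow message delays everything behind it.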

On Slack, @dm3 was super helpful, patiently went through this with me, and came up with the following proposal:

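;; Namespace aliases used below
(require '[manifold.deferred :as d]
         '[manifold.executor :as ex]
         '[manifold.stream :as s])

(defn fork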
  "Takes a `src` stream and returns `n` forked streams which will receive
  messages from `src` on a first-come, first-served basis.

  Once the `src` stream is exhausted, the forked streams will be closed as
  well.

  Takes a map of options as the third argument:

    * `pool` - the executor where the execution will happen. Uses the
       Manifold `execute-pool` by default.
    * `generator` - function which produces the forked streams. Unbuffered
      `stream` by default."
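  ([src n] (fork src n {}))
  ;; `:or` applies each default separately, so passing only `:pool` or only
  ;; `:generator` no longer leaves the other one nil
  ([src n {:keys [pool generator]
           :or {pool (ex/execute-pool), generator s/stream}}]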
   (let [src' (s/stream)
         dsts (take n (repeatedly generator))
         ^java.util.concurrent.BlockingQueue ready
         (doto (java.util.concurrent.ArrayBlockingQueue. n)
           (.addAll dsts))

         free-up! #(.offer ready %)
         next! #(.take ready)
         send! #(-> (s/put! %1 %2)
                    (d/chain
                      (fn [result]
                        (if result
                          (free-up! %1)
                          (s/close! %1)))))]

     ;; in case anyone else wants to consume `src`
     (s/connect src src')

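     ;; `dst` is the next free forked stream. It must not be named `dsts`,
     ;; or it would shadow the outer seq that is needed to close every fork.
     (d/loop [dst (.take ready)]
             (-> (s/take! src' ::none)
                 (d/chain
                   (fn [result]
                     (if (= result ::none)
                       (doseq [d dsts]
                         (s/close! d))
                       (do (d/future-with pool
                                          (send! dst result))
                           (d/chain (next!)
                                    #(d/recur %))))))))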

     (map s/source-only dsts))))

What is your opinion on implementing concurrent producer/consumer queues with Manifold?
