From 174ab0dcf50f6862e31f42fe1ad21dcf5ecac53c Mon Sep 17 00:00:00 2001 From: Alex Kehayias Date: Wed, 2 Sep 2015 13:27:55 -0400 Subject: [PATCH 1/6] Make filter-from-backend return a lazy seq This is meant to reduce the memory overhead when there are many keys in a given location when trying to use filter-from-backend. Actual calls to s3 are lazy and only perform IO as the sequence is consumed. --- src/archive_bolt/backends/core.clj | 2 +- src/archive_bolt/backends/s3.clj | 64 +++++++++++++++----------- src/archive_bolt/storm.clj | 17 ++++++- test/archive_bolt/backends/s3_test.clj | 27 ++++++----- 4 files changed, 68 insertions(+), 42 deletions(-) diff --git a/src/archive_bolt/backends/core.clj b/src/archive_bolt/backends/core.clj index d41ebc8..1205e1d 100644 --- a/src/archive_bolt/backends/core.clj +++ b/src/archive_bolt/backends/core.clj @@ -30,4 +30,4 @@ (defmethod filter-from-backend :s3 [_ conf location & [pred-fn]] - (s3/filter-from-backend conf location pred-fn)) + (s3/filter-from-backend conf location {:filter-fn pred-fn})) diff --git a/src/archive_bolt/backends/s3.clj b/src/archive_bolt/backends/s3.clj index d01278a..d5becaa 100644 --- a/src/archive_bolt/backends/s3.clj +++ b/src/archive_bolt/backends/s3.clj @@ -1,5 +1,5 @@ (ns archive-bolt.backends.s3 - (:require [backtype.storm.log :as storm] + (:require [backtype.storm.log :as storm] [amazonica.aws.s3 :as s3] [archive-bolt.utils :refer [get-or-throw]] [cheshire.core :as json] @@ -23,14 +23,14 @@ (str "s3://" bucket "/" location))) (defn safe-put - "Attempt to PUT the file to s3 returns full s3 path when successful or + "Attempt to PUT the file to s3 returns full s3 path when successful or nil if unsuccessful. Retries on failure up to max-retries times." 
[creds bucket location content & {:keys [retry-count max-retries wait-time] :or {retry-count 0, max-retries 10 wait-time 1000}}] ;; Store the content and return the location (try - (put-object creds bucket location content) + (put-object creds bucket location content) (catch Exception e (do (Thread/sleep wait-time) (storm/log-error e " Failed to store in s3. " @@ -68,7 +68,7 @@ :value (try (-> (s3/get-object credentials :bucket-name bucket-name :key key) - :input-stream + :input-stream slurp (json/parse-string true)) (catch Exception e @@ -76,29 +76,39 @@ (format "Failed to get bucket: %s, key: %s, error: %s " bucket-name key e))))}) +;; TODO add an option to set the pagination value and lazy concat +;; results together should give enough control to avoid out of memory issues + (defn filter-from-backend "Search s3 for keys at the given location. Take the list of keys and look them up. If the results are paginated, recur until all results are returned. - Results are paginated by 1,000 keys as per the S3 API docs. Results keys are - filtered by filter-fn. Returns a collection of results." - [conf location & [filter-fn accum marker]] - (let [creds (mk-credentials conf) - ;; For backwards compatibility look for the old key as fallback - bucket-name (or (get conf "ARCHIVE_READ_S3_BUCKET") - (get conf "S3_BUCKET") - (throw (Exception. 
"Missing config field ARCHIVE_READ_S3_BUCKET"))) - ;; Search s3 for all keys at the location - search-results (s3/list-objects creds - :bucket-name bucket-name - :prefix location - :marker marker) - ;; Grab the keys and optionally filter them - keys ((or filter-fn identity) (get-keys-from-results search-results)) - values (pmap #(lookup-key creds bucket-name location %) keys) - result (concat accum values)] - ;; If there is a next marker then we should recur - (if-let [next-marker (:next-marker search-results)] - ;; NOTE optional args must be in a vector when using recur - (do (storm/log-message "Paging archive results at " location) - (recur conf location [filter-fn result next-marker])) - result))) + Results are paginated by 1,000 keys as per the S3 API docs. Results keys are + filtered by filter-fn. Returns a lazy seq of results." + ([conf location] + (filter-from-backend conf location nil {})) + ([conf location {:keys [filter-fn marker] :as opts}] + (filter-from-backend conf location nil opts)) + ([conf location accum {:keys [filter-fn marker] :as opts}] + (let [creds (mk-credentials conf) + ;; For backwards compatibility look for the old key as fallback + bucket-name (or (get conf "ARCHIVE_READ_S3_BUCKET") + (get conf "S3_BUCKET") + (throw (Exception. 
"Missing config field ARCHIVE_READ_S3_BUCKET"))) + ;; Search s3 for all keys at the location + search-results (s3/list-objects creds + :bucket-name bucket-name + :prefix location + :marker marker) + ;; Grab the keys and optionally filter them + keys ((or filter-fn identity) (get-keys-from-results search-results)) + values (pmap #(lookup-key creds bucket-name location %) keys)] + ;; If there is a next marker then we should recur + (if-let [next-marker (:next-marker search-results)] + (do (storm/log-message "Paging archive results at " location) + (lazy-cat accum + (filter-from-backend conf + location + (lazy-cat accum values) + {:filter-fn filter-fn + :marker next-marker}))) + (lazy-cat accum values))))) diff --git a/src/archive_bolt/storm.clj b/src/archive_bolt/storm.clj index 2f47417..6f956b6 100644 --- a/src/archive_bolt/storm.clj +++ b/src/archive_bolt/storm.clj @@ -2,7 +2,7 @@ (:require [backtype.storm.clojure :refer [defbolt bolt emit-bolt! ack! fail!]] [backtype.storm.log :refer [log-debug log-warn]] [archive-bolt.backends.core :refer [store filter-from-backend]] - [archive-bolt.fields :as fields]) + [archive-bolt.fields :as fields]) (:gen-class)) @@ -32,7 +32,8 @@ [conf collector tuple & [filter-fn]] (let [{:keys [meta backend location]} tuple filter-fn (or filter-fn identity) - results (filter-from-backend backend conf location filter-fn)] + results (take-while (comp not nil?) + (filter-from-backend backend conf location {:filter-fn filter-fn}))] (if (seq results) (emit-bolt! 
collector [meta results] :anchor tuple) (log-debug (format "No results returned from %s backend at %s" @@ -51,3 +52,15 @@ {:prepare true :params [filter-fn]} [conf context collector] (bolt (execute [tuple] (-archive-read conf collector tuple (resolve filter-fn))))) + +(defbolt archive-read-chunked fields/archive-read-output-fields + {:prepare true :params [chunk-n]} + [conf context collector] + (bolt + (execute + [tuple] + (let [{:keys [meta backend location]} tuple + results (filter-from-backend backend conf location)] + (doseq [r (partition-all chunk-n (take-while (comp not nil?) results))] + (emit-bolt! collector [meta results] :anchor tuple)) + (ack! collector tuple))))) diff --git a/test/archive_bolt/backends/s3_test.clj b/test/archive_bolt/backends/s3_test.clj index b397c79..7892092 100644 --- a/test/archive_bolt/backends/s3_test.clj +++ b/test/archive_bolt/backends/s3_test.clj @@ -52,11 +52,12 @@ (deftest test-filter-from-backend (with-s3-key - #(is (= (backend/filter-from-backend :s3 (get-test-conf) test-location) - [{:meta {:location test-location - :full-path test-key - :file-name test-file-name} - :value test-content}])))) + #(is (= [{:meta {:location test-location + :full-path test-key + :file-name test-file-name} + :value test-content}] + (take-while (comp not nil?) + (backend/filter-from-backend :s3 (get-test-conf) test-location)))))) (deftest test-filter-from-backend-no-results) #(is (= (backend/filter-from-backend :s3 (get-test-conf) test-location) @@ -67,10 +68,11 @@ {:object-summaries [{:key "foo"} {:key "foo"}] :next-marker nil}) s3-backend/lookup-key (fn [_ _ _ k] {k k})] - (is (= (backend/filter-from-backend :s3 - (get-test-conf) - test-location - (fn [coll] (set coll))) + (is (= (take-while (comp not nil?) 
+ (backend/filter-from-backend :s3 + (get-test-conf) + test-location + (fn [coll] (set coll)))) [{"foo" "foo"}])))) (defn mock-list-objects @@ -82,6 +84,7 @@ (deftest test-filter-from-backend-pagination "Test paging through results of searching s3 without actually hitting s3" (with-redefs [s3/list-objects mock-list-objects - s3-backend/lookup-key (fn [_ _ _ k] {k k})] - (is (= (backend/filter-from-backend :s3 (get-test-conf) test-location) - [{"bar" "bar"} {"foo" "foo"}])))) + s3-backend/lookup-key (fn [_ _ _ k] k)] + (is (= ["bar" "foo"] + (take-while (comp not nil?) + (backend/filter-from-backend :s3 (get-test-conf) test-location)))))) From 4ef1a8647dadf4b4a0a72b51aa41242af147bc5a Mon Sep 17 00:00:00 2001 From: Alex Kehayias Date: Wed, 2 Sep 2015 13:40:06 -0400 Subject: [PATCH 2/6] Remove duplicate lazy-cat --- src/archive_bolt/backends/s3.clj | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/src/archive_bolt/backends/s3.clj b/src/archive_bolt/backends/s3.clj index d5becaa..0be0a8e 100644 --- a/src/archive_bolt/backends/s3.clj +++ b/src/archive_bolt/backends/s3.clj @@ -105,10 +105,6 @@ ;; If there is a next marker then we should recur (if-let [next-marker (:next-marker search-results)] (do (storm/log-message "Paging archive results at " location) - (lazy-cat accum - (filter-from-backend conf - location - (lazy-cat accum values) - {:filter-fn filter-fn - :marker next-marker}))) + (recur conf location (lazy-cat accum values) {:filter-fn filter-fn + :marker next-marker})) (lazy-cat accum values))))) From 8d8dfc30b0acad0b08ee78e26dc6cec1cba9e796 Mon Sep 17 00:00:00 2001 From: Alex Kehayias Date: Wed, 2 Sep 2015 13:58:00 -0400 Subject: [PATCH 3/6] Remove accum in filter from backend --- src/archive_bolt/backends/s3.clj | 9 ++++----- test/archive_bolt/backends/s3_test.clj | 10 ++++++---- 2 files changed, 10 insertions(+), 9 deletions(-) diff --git a/src/archive_bolt/backends/s3.clj b/src/archive_bolt/backends/s3.clj index 0be0a8e..92856fc 100644 
--- a/src/archive_bolt/backends/s3.clj +++ b/src/archive_bolt/backends/s3.clj @@ -87,8 +87,6 @@ ([conf location] (filter-from-backend conf location nil {})) ([conf location {:keys [filter-fn marker] :as opts}] - (filter-from-backend conf location nil opts)) - ([conf location accum {:keys [filter-fn marker] :as opts}] (let [creds (mk-credentials conf) ;; For backwards compatibility look for the old key as fallback bucket-name (or (get conf "ARCHIVE_READ_S3_BUCKET") @@ -105,6 +103,7 @@ ;; If there is a next marker then we should recur (if-let [next-marker (:next-marker search-results)] (do (storm/log-message "Paging archive results at " location) - (recur conf location (lazy-cat accum values) {:filter-fn filter-fn - :marker next-marker})) - (lazy-cat accum values))))) + (concat values + (lazy-seq (filter-from-backend conf location {:filter-fn filter-fn + :marker next-marker})))) + values)))) diff --git a/test/archive_bolt/backends/s3_test.clj b/test/archive_bolt/backends/s3_test.clj index 7892092..9704803 100644 --- a/test/archive_bolt/backends/s3_test.clj +++ b/test/archive_bolt/backends/s3_test.clj @@ -77,14 +77,16 @@ (defn mock-list-objects [_ & {:keys [bucket-name prefix marker]}] - (if (= marker 123) - {:object-summaries [{:key "foo"}] :next-marker nil} - {:object-summaries [{:key "bar"}] :next-marker 123})) + (if-not marker + {:object-summaries [{:key "foo"}] :next-marker 123} + (condp = marker + 123 {:object-summaries [{:key "bar"}] :next-marker 456} + 456 {:object-summaries [{:key "baz"}] :next-marker nil}))) (deftest test-filter-from-backend-pagination "Test paging through results of searching s3 without actually hitting s3" (with-redefs [s3/list-objects mock-list-objects s3-backend/lookup-key (fn [_ _ _ k] k)] - (is (= ["bar" "foo"] + (is (= ["foo" "bar" "baz"] (take-while (comp not nil?) 
(backend/filter-from-backend :s3 (get-test-conf) test-location)))))) From 06eb538bbb40090d65323e44a93b7c6cd83a295a Mon Sep 17 00:00:00 2001 From: Alex Kehayias Date: Wed, 2 Sep 2015 14:11:07 -0400 Subject: [PATCH 4/6] Wrap filter from backend in a lazy-seq --- src/archive_bolt/backends/s3.clj | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/src/archive_bolt/backends/s3.clj b/src/archive_bolt/backends/s3.clj index 92856fc..1532238 100644 --- a/src/archive_bolt/backends/s3.clj +++ b/src/archive_bolt/backends/s3.clj @@ -87,7 +87,8 @@ ([conf location] (filter-from-backend conf location nil {})) ([conf location {:keys [filter-fn marker] :as opts}] - (let [creds (mk-credentials conf) + (lazy-seq + (let [creds (mk-credentials conf) ;; For backwards compatibility look for the old key as fallback bucket-name (or (get conf "ARCHIVE_READ_S3_BUCKET") (get conf "S3_BUCKET") @@ -104,6 +105,6 @@ (if-let [next-marker (:next-marker search-results)] (do (storm/log-message "Paging archive results at " location) (concat values - (lazy-seq (filter-from-backend conf location {:filter-fn filter-fn - :marker next-marker})))) - values)))) + (filter-from-backend conf location {:filter-fn filter-fn + :marker next-marker}))) + values))))) From 5eabe055e1c47b896cdb390ad3528a75ffb59a76 Mon Sep 17 00:00:00 2001 From: Alex Kehayias Date: Wed, 2 Sep 2015 14:18:49 -0400 Subject: [PATCH 5/6] Remove the use of take-while when getting filtered results --- src/archive_bolt/storm.clj | 5 ++--- test/archive_bolt/backends/s3_test.clj | 17 +++++++---------- 2 files changed, 9 insertions(+), 13 deletions(-) diff --git a/src/archive_bolt/storm.clj b/src/archive_bolt/storm.clj index 6f956b6..0800bb2 100644 --- a/src/archive_bolt/storm.clj +++ b/src/archive_bolt/storm.clj @@ -32,8 +32,7 @@ [conf collector tuple & [filter-fn]] (let [{:keys [meta backend location]} tuple filter-fn (or filter-fn identity) - results (take-while (comp not nil?) 
- (filter-from-backend backend conf location {:filter-fn filter-fn}))] + results (filter-from-backend backend conf location {:filter-fn filter-fn})] (if (seq results) (emit-bolt! collector [meta results] :anchor tuple) (log-debug (format "No results returned from %s backend at %s" @@ -61,6 +60,6 @@ [tuple] (let [{:keys [meta backend location]} tuple results (filter-from-backend backend conf location)] - (doseq [r (partition-all chunk-n (take-while (comp not nil?) results))] - (emit-bolt! collector [meta results] :anchor tuple)) + (doseq [r (partition-all chunk-n results)] + (emit-bolt! collector [meta r] :anchor tuple)) (ack! collector tuple))))) diff --git a/test/archive_bolt/backends/s3_test.clj b/test/archive_bolt/backends/s3_test.clj index 9704803..27810f0 100644 --- a/test/archive_bolt/backends/s3_test.clj +++ b/test/archive_bolt/backends/s3_test.clj @@ -56,8 +56,7 @@ :full-path test-key :file-name test-file-name} :value test-content}] - (take-while (comp not nil?) - (backend/filter-from-backend :s3 (get-test-conf) test-location)))))) + (backend/filter-from-backend :s3 (get-test-conf) test-location))))) (deftest test-filter-from-backend-no-results) #(is (= (backend/filter-from-backend :s3 (get-test-conf) test-location) @@ -68,12 +67,11 @@ {:object-summaries [{:key "foo"} {:key "foo"}] :next-marker nil}) s3-backend/lookup-key (fn [_ _ _ k] {k k})] - (is (= (take-while (comp not nil?) - (backend/filter-from-backend :s3 - (get-test-conf) - test-location - (fn [coll] (set coll)))) - [{"foo" "foo"}])))) + (is (= [{"foo" "foo"}] + (backend/filter-from-backend :s3 + (get-test-conf) + test-location + (fn [coll] (set coll))))))) (defn mock-list-objects [_ & {:keys [bucket-name prefix marker]}] @@ -88,5 +86,4 @@ (with-redefs [s3/list-objects mock-list-objects s3-backend/lookup-key (fn [_ _ _ k] k)] (is (= ["foo" "bar" "baz"] - (take-while (comp not nil?) 
- (backend/filter-from-backend :s3 (get-test-conf) test-location)))))) + (backend/filter-from-backend :s3 (get-test-conf) test-location))))) From 4c7c4fba6c27a6e6dfb5724087487de16f81405e Mon Sep 17 00:00:00 2001 From: Alex Kehayias Date: Wed, 2 Sep 2015 16:30:41 -0400 Subject: [PATCH 6/6] Fix passing of filter fn to filter-from-backend in storm bolts --- src/archive_bolt/backends/core.clj | 4 +-- src/archive_bolt/backends/s3.clj | 48 +++++++++++++------------- src/archive_bolt/storm.clj | 6 ++-- test/archive_bolt/backends/s3_test.clj | 2 +- test/archive_bolt/storm_test.clj | 44 ++++++++++++++++++----- 5 files changed, 66 insertions(+), 38 deletions(-) diff --git a/src/archive_bolt/backends/core.clj b/src/archive_bolt/backends/core.clj index 1205e1d..b8ca587 100644 --- a/src/archive_bolt/backends/core.clj +++ b/src/archive_bolt/backends/core.clj @@ -29,5 +29,5 @@ (throw (Exception. (str "Backend not found for " backend)))) (defmethod filter-from-backend :s3 - [_ conf location & [pred-fn]] - (s3/filter-from-backend conf location {:filter-fn pred-fn})) + [_ conf location & [opts]] + (s3/filter-from-backend conf location opts)) diff --git a/src/archive_bolt/backends/s3.clj b/src/archive_bolt/backends/s3.clj index 1532238..9a5f327 100644 --- a/src/archive_bolt/backends/s3.clj +++ b/src/archive_bolt/backends/s3.clj @@ -76,35 +76,35 @@ (format "Failed to get bucket: %s, key: %s, error: %s " bucket-name key e))))}) -;; TODO add an option to set the pagination value and lazy concat -;; results together should give enough control to avoid out of memory issues - (defn filter-from-backend "Search s3 for keys at the given location. Take the list of keys and look them up. If the results are paginated, recur until all results are returned. - Results are paginated by 1,000 keys as per the S3 API docs. Results keys are - filtered by filter-fn. Returns a lazy seq of results." + Results are paginated by ARCHIVE_READ_S3_MAX_KEYS keys as per the S3 + API docs. 
Result keys of each page are passed through filter-fn as a collection. Returns a lazy seq of results." ([conf location] - (filter-from-backend conf location nil {})) + (filter-from-backend conf location {})) ([conf location {:keys [filter-fn marker] :as opts}] (lazy-seq (let [creds (mk-credentials conf) - ;; For backwards compatibility look for the old key as fallback - bucket-name (or (get conf "ARCHIVE_READ_S3_BUCKET") - (get conf "S3_BUCKET") - (throw (Exception. "Missing config field ARCHIVE_READ_S3_BUCKET"))) - ;; Search s3 for all keys at the location - search-results (s3/list-objects creds - :bucket-name bucket-name - :prefix location - :marker marker) - ;; Grab the keys and optionally filter them - keys ((or filter-fn identity) (get-keys-from-results search-results)) - values (pmap #(lookup-key creds bucket-name location %) keys)] - ;; If there is a next marker then we should recur - (if-let [next-marker (:next-marker search-results)] - (do (storm/log-message "Paging archive results at " location) - (concat values - (filter-from-backend conf location {:filter-fn filter-fn - :marker next-marker}))) - values))))) + ;; For backwards compatibility look for the old key as fallback + bucket-name (or (get conf "ARCHIVE_READ_S3_BUCKET") + (get conf "S3_BUCKET") + (throw (Exception. "Missing config field ARCHIVE_READ_S3_BUCKET"))) + max-keys (if-let [n-keys (get conf "ARCHIVE_READ_S3_MAX_KEYS")] + (Float. 
n-keys) + 1000) + ;; Search s3 for all keys at the location + search-results (s3/list-objects creds + :bucket-name bucket-name + :prefix location + :marker marker + :max-keys max-keys) + ;; Grab the keys and optionally filter them + ks ((or filter-fn identity) (get-keys-from-results search-results)) + values (pmap #(lookup-key creds bucket-name location %) ks)] + ;; If there is a next marker then we should recur + (if-let [next-marker (:next-marker search-results)] + (do (storm/log-message "Paging archive results at " location) + (concat values + (filter-from-backend conf location (assoc opts :marker next-marker)))) + values))))) diff --git a/src/archive_bolt/storm.clj b/src/archive_bolt/storm.clj index 0800bb2..e70ee99 100644 --- a/src/archive_bolt/storm.clj +++ b/src/archive_bolt/storm.clj @@ -1,6 +1,6 @@ (ns archive-bolt.storm (:require [backtype.storm.clojure :refer [defbolt bolt emit-bolt! ack! fail!]] - [backtype.storm.log :refer [log-debug log-warn]] + [backtype.storm.log :refer [log-debug log-message log-warn]] [archive-bolt.backends.core :refer [store filter-from-backend]] [archive-bolt.fields :as fields]) (:gen-class)) @@ -35,8 +35,8 @@ results (filter-from-backend backend conf location {:filter-fn filter-fn})] (if (seq results) (emit-bolt! collector [meta results] :anchor tuple) - (log-debug (format "No results returned from %s backend at %s" - backend location))) + (log-message (format "No results returned from %s backend at %s" + backend location))) (ack! 
collector tuple))) (defbolt archive-read fields/archive-read-output-fields diff --git a/test/archive_bolt/backends/s3_test.clj b/test/archive_bolt/backends/s3_test.clj index 27810f0..9beec50 100644 --- a/test/archive_bolt/backends/s3_test.clj +++ b/test/archive_bolt/backends/s3_test.clj @@ -71,7 +71,7 @@ (backend/filter-from-backend :s3 (get-test-conf) test-location - (fn [coll] (set coll))))))) + {:filter-fn (fn [coll] (set coll))}))))) (defn mock-list-objects [_ & {:keys [bucket-name prefix marker]}] diff --git a/test/archive_bolt/storm_test.clj b/test/archive_bolt/storm_test.clj index 09954a0..cfbc8c5 100644 --- a/test/archive_bolt/storm_test.clj +++ b/test/archive_bolt/storm_test.clj @@ -1,9 +1,12 @@ (ns archive-bolt.storm-test - (:require [clojure.test :refer :all] + (:require [clojure.test :refer :all] [backtype.storm.clojure :refer :all] [backtype.storm.testing :refer :all] [amazonica.aws.s3 :refer [get-object delete-object]] - [archive-bolt.storm :refer [archive archive-read archive-read-filtered]] + [archive-bolt.storm :refer [archive + archive-read + archive-read-filtered + archive-read-chunked]] [archive-bolt.fields :refer [archive-input-fields archive-read-input-fields archive-output-fields]] @@ -42,6 +45,13 @@ {"1" (spout-spec mock-read-spout)} {"2" (bolt-spec {"1" :shuffle} (archive-read-filtered `test-filter-fn))})) +(defn mk-test-read-chunked-topology + "Returns a Storm topology for testing the archive bolt" + [] + (topology + {"1" (spout-spec mock-read-spout)} + {"2" (bolt-spec {"1" :shuffle} (archive-read-chunked 1))})) + (deftest test-archive-write-bolt "Test the topology on a local cluster" (with-simulated-time-local-cluster [cluster] @@ -58,10 +68,10 @@ expected (get-object test-creds test-bucket-name test-location) expected-content (-> expected :input-stream slurp)] ;; Clean up - (delete-object test-creds test-bucket-name test-location) + (delete-object test-creds test-bucket-name test-location) ;; Verify that the side effect of writing to s3 
worked (is (= test-content-str expected-content)) - ;; Check the output of the bolt matches expected tuple output + ;; Check the output of the bolt matches expected tuple output ;; Order is not guaranteed so we are using the built in storm ;; equality function ms= rather than = (is (ms= [[{} (str "s3://" test-bucket-name "/" test-location)]] @@ -74,7 +84,6 @@ ;; This becomes the input to the archive bolt mock-sources {"1" [[{} "s3" test-location]]} topo (mk-test-read-topology) - bucket-name "dev.shareablee.com" results (complete-topology cluster topo :storm-conf test-conf @@ -83,7 +92,7 @@ :full-path test-key :file-name test-file-name} :value test-content}]] - ;; Check the output of the bolt matches expected tuple output + ;; Check the output of the bolt matches expected tuple output ;; Order is not guaranteed so we are using the built in storm ;; equality function ms= rather than = (is (ms= [[{} expected]] @@ -95,7 +104,6 @@ (let [test-conf (get-test-conf) mock-sources {"1" [[{} "s3" test-location]]} topo (mk-test-read-filtered-topology) - bucket-name (get test-conf "S3_BUCKET") results (complete-topology cluster topo :storm-conf test-conf @@ -104,7 +112,27 @@ :full-path test-key :file-name test-file-name} :value test-content}]] - ;; Check the output of the bolt matches expected tuple output + ;; Check the output of the bolt matches expected tuple output + ;; Order is not guaranteed so we are using the built in storm + ;; equality function ms= rather than = + (is (ms= [[{} expected]] + (read-tuples results "2"))))))) + +(deftest test-archive-read-chunked-bolt + (with-s3-key + #(with-simulated-time-local-cluster [cluster] + (let [test-conf (get-test-conf) + mock-sources {"1" [[{} "s3" test-location]]} + topo (mk-test-read-chunked-topology) + results (complete-topology cluster + topo + :storm-conf test-conf + :mock-sources mock-sources) + expected [{:meta {:location test-location + :full-path test-key + :file-name test-file-name} + :value test-content}]] + ;; Check 
the output of the bolt matches expected tuple output ;; Order is not guaranteed so we are using the built in storm ;; equality function ms= rather than = (is (ms= [[{} expected]]