diff --git a/src/archive_bolt/backends/core.clj b/src/archive_bolt/backends/core.clj index d41ebc8..b8ca587 100644 --- a/src/archive_bolt/backends/core.clj +++ b/src/archive_bolt/backends/core.clj @@ -29,5 +29,5 @@ (throw (Exception. (str "Backend not found for " backend)))) (defmethod filter-from-backend :s3 - [_ conf location & [pred-fn]] - (s3/filter-from-backend conf location pred-fn)) + [_ conf location & [opts]] + (s3/filter-from-backend conf location opts)) diff --git a/src/archive_bolt/backends/s3.clj b/src/archive_bolt/backends/s3.clj index d01278a..9a5f327 100644 --- a/src/archive_bolt/backends/s3.clj +++ b/src/archive_bolt/backends/s3.clj @@ -1,5 +1,5 @@ (ns archive-bolt.backends.s3 - (:require [backtype.storm.log :as storm] + (:require [backtype.storm.log :as storm] [amazonica.aws.s3 :as s3] [archive-bolt.utils :refer [get-or-throw]] [cheshire.core :as json] @@ -23,14 +23,14 @@ (str "s3://" bucket "/" location))) (defn safe-put - "Attempt to PUT the file to s3 returns full s3 path when successful or + "Attempt to PUT the file to s3 returns full s3 path when successful or nil if unsuccessful. Retries on failure up to max-retries times." [creds bucket location content & {:keys [retry-count max-retries wait-time] :or {retry-count 0, max-retries 10 wait-time 1000}}] ;; Store the content and return the location (try - (put-object creds bucket location content) + (put-object creds bucket location content) (catch Exception e (do (Thread/sleep wait-time) (storm/log-error e " Failed to store in s3. " @@ -68,7 +68,7 @@ :value (try (-> (s3/get-object credentials :bucket-name bucket-name :key key) - :input-stream + :input-stream slurp (json/parse-string true)) (catch Exception e @@ -79,26 +79,32 @@ (defn filter-from-backend "Search s3 for keys at the given location. Take the list of keys and look them up. If the results are paginated, recur until all results are returned. - Results are paginated by 1,000 keys as per the S3 API docs. 
Results keys are - filtered by filter-fn. Returns a collection of results." - [conf location & [filter-fn accum marker]] - (let [creds (mk-credentials conf) - ;; For backwards compatibility look for the old key as fallback - bucket-name (or (get conf "ARCHIVE_READ_S3_BUCKET") - (get conf "S3_BUCKET") - (throw (Exception. "Missing config field ARCHIVE_READ_S3_BUCKET"))) - ;; Search s3 for all keys at the location - search-results (s3/list-objects creds - :bucket-name bucket-name - :prefix location - :marker marker) - ;; Grab the keys and optionally filter them - keys ((or filter-fn identity) (get-keys-from-results search-results)) - values (pmap #(lookup-key creds bucket-name location %) keys) - result (concat accum values)] - ;; If there is a next marker then we should recur - (if-let [next-marker (:next-marker search-results)] - ;; NOTE optional args must be in a vector when using recur - (do (storm/log-message "Paging archive results at " location) - (recur conf location [filter-fn result next-marker])) - result))) + Results are paginated by ARCHIVE_READ_S3_MAX_KEYS keys as per the S3 + API docs. Results keys are filtered by filter-fn. Returns a lazy seq of results." + ([conf location] + (filter-from-backend conf location {})) + ([conf location {:keys [filter-fn marker] :as opts}] + (lazy-seq + (let [creds (mk-credentials conf) + ;; For backwards compatibility look for the old key as fallback + bucket-name (or (get conf "ARCHIVE_READ_S3_BUCKET") + (get conf "S3_BUCKET") + (throw (Exception. "Missing config field ARCHIVE_READ_S3_BUCKET"))) + max-keys (if-let [n-keys (get conf "ARCHIVE_READ_S3_MAX_KEYS")] + (Integer/parseInt 
(str n-keys)) + 1000) + ;; Search s3 for all keys at the location + search-results (s3/list-objects creds + :bucket-name bucket-name + :prefix location + :marker marker + :max-keys max-keys) + ;; Grab the keys and optionally filter them + ks ((or filter-fn identity) (get-keys-from-results search-results)) + values (pmap #(lookup-key creds bucket-name location %) ks)] + ;; If there is a next marker then we should recur + (if-let [next-marker (:next-marker search-results)] + (do (storm/log-message "Paging archive results at " location) + (concat values + (filter-from-backend conf location (assoc opts :marker next-marker)))) + values))))) diff --git a/src/archive_bolt/storm.clj b/src/archive_bolt/storm.clj index 2f47417..e70ee99 100644 --- a/src/archive_bolt/storm.clj +++ b/src/archive_bolt/storm.clj @@ -1,8 +1,8 @@ (ns archive-bolt.storm (:require [backtype.storm.clojure :refer [defbolt bolt emit-bolt! ack! fail!]] - [backtype.storm.log :refer [log-debug log-warn]] + [backtype.storm.log :refer [log-debug log-message log-warn]] [archive-bolt.backends.core :refer [store filter-from-backend]] - [archive-bolt.fields :as fields]) + [archive-bolt.fields :as fields]) (:gen-class)) @@ -32,11 +32,11 @@ [conf collector tuple & [filter-fn]] (let [{:keys [meta backend location]} tuple filter-fn (or filter-fn identity) - results (filter-from-backend backend conf location filter-fn)] + results (filter-from-backend backend conf location {:filter-fn filter-fn})] (if (seq results) (emit-bolt! collector [meta results] :anchor tuple) - (log-debug (format "No results returned from %s backend at %s" - backend location))) + (log-message (format "No results returned from %s backend at %s" - backend location))) (ack! 
collector tuple))) (defbolt archive-read fields/archive-read-output-fields @@ -51,3 +51,15 @@ {:prepare true :params [filter-fn]} [conf context collector] (bolt (execute [tuple] (-archive-read conf collector tuple (resolve filter-fn))))) + +(defbolt archive-read-chunked fields/archive-read-output-fields + {:prepare true :params [chunk-n]} + [conf context collector] + (bolt + (execute + [tuple] + (let [{:keys [meta backend location]} tuple + results (filter-from-backend backend conf location)] + (doseq [r (partition-all chunk-n results)] + (emit-bolt! collector [meta r] :anchor tuple)) + (ack! collector tuple))))) diff --git a/test/archive_bolt/backends/s3_test.clj b/test/archive_bolt/backends/s3_test.clj index b397c79..9beec50 100644 --- a/test/archive_bolt/backends/s3_test.clj +++ b/test/archive_bolt/backends/s3_test.clj @@ -52,11 +52,11 @@ (deftest test-filter-from-backend (with-s3-key - #(is (= (backend/filter-from-backend :s3 (get-test-conf) test-location) - [{:meta {:location test-location - :full-path test-key - :file-name test-file-name} - :value test-content}])))) + #(is (= [{:meta {:location test-location + :full-path test-key + :file-name test-file-name} + :value test-content}] + (backend/filter-from-backend :s3 (get-test-conf) test-location))))) (deftest test-filter-from-backend-no-results) #(is (= (backend/filter-from-backend :s3 (get-test-conf) test-location) @@ -67,21 +67,23 @@ {:object-summaries [{:key "foo"} {:key "foo"}] :next-marker nil}) s3-backend/lookup-key (fn [_ _ _ k] {k k})] - (is (= (backend/filter-from-backend :s3 + (is (= [{"foo" "foo"}] + (backend/filter-from-backend :s3 (get-test-conf) test-location - (fn [coll] (set coll))) - [{"foo" "foo"}])))) + {:filter-fn (fn [coll] (set coll))}))))) (defn mock-list-objects [_ & {:keys [bucket-name prefix marker]}] - (if (= marker 123) - {:object-summaries [{:key "foo"}] :next-marker nil} - {:object-summaries [{:key "bar"}] :next-marker 123})) + (if-not marker + {:object-summaries [{:key 
"foo"}] :next-marker 123} + (condp = marker + 123 {:object-summaries [{:key "bar"}] :next-marker 456} + 456 {:object-summaries [{:key "baz"}] :next-marker nil}))) (deftest test-filter-from-backend-pagination "Test paging through results of searching s3 without actually hitting s3" (with-redefs [s3/list-objects mock-list-objects - s3-backend/lookup-key (fn [_ _ _ k] {k k})] - (is (= (backend/filter-from-backend :s3 (get-test-conf) test-location) - [{"bar" "bar"} {"foo" "foo"}])))) + s3-backend/lookup-key (fn [_ _ _ k] k)] + (is (= ["foo" "bar" "baz"] + (backend/filter-from-backend :s3 (get-test-conf) test-location))))) diff --git a/test/archive_bolt/storm_test.clj b/test/archive_bolt/storm_test.clj index 09954a0..cfbc8c5 100644 --- a/test/archive_bolt/storm_test.clj +++ b/test/archive_bolt/storm_test.clj @@ -1,9 +1,12 @@ (ns archive-bolt.storm-test - (:require [clojure.test :refer :all] + (:require [clojure.test :refer :all] [backtype.storm.clojure :refer :all] [backtype.storm.testing :refer :all] [amazonica.aws.s3 :refer [get-object delete-object]] - [archive-bolt.storm :refer [archive archive-read archive-read-filtered]] + [archive-bolt.storm :refer [archive + archive-read + archive-read-filtered + archive-read-chunked]] [archive-bolt.fields :refer [archive-input-fields archive-read-input-fields archive-output-fields]] @@ -42,6 +45,13 @@ {"1" (spout-spec mock-read-spout)} {"2" (bolt-spec {"1" :shuffle} (archive-read-filtered `test-filter-fn))})) +(defn mk-test-read-chunked-topology + "Returns a Storm topology for testing the archive bolt" + [] + (topology + {"1" (spout-spec mock-read-spout)} + {"2" (bolt-spec {"1" :shuffle} (archive-read-chunked 1))})) + (deftest test-archive-write-bolt "Test the topology on a local cluster" (with-simulated-time-local-cluster [cluster] @@ -58,10 +68,10 @@ expected (get-object test-creds test-bucket-name test-location) expected-content (-> expected :input-stream slurp)] ;; Clean up - (delete-object test-creds test-bucket-name 
test-location) + (delete-object test-creds test-bucket-name test-location) ;; Verify that the side effect of writing to s3 worked (is (= test-content-str expected-content)) - ;; Check the output of the bolt matches expected tuple output + ;; Check the output of the bolt matches expected tuple output ;; Order is not guaranteed so we are using the built in storm ;; equality function ms= rather than = (is (ms= [[{} (str "s3://" test-bucket-name "/" test-location)]] @@ -74,7 +84,6 @@ ;; This becomes the input to the archive bolt mock-sources {"1" [[{} "s3" test-location]]} topo (mk-test-read-topology) - bucket-name "dev.shareablee.com" results (complete-topology cluster topo :storm-conf test-conf @@ -83,7 +92,7 @@ :full-path test-key :file-name test-file-name} :value test-content}]] - ;; Check the output of the bolt matches expected tuple output + ;; Check the output of the bolt matches expected tuple output ;; Order is not guaranteed so we are using the built in storm ;; equality function ms= rather than = (is (ms= [[{} expected]] @@ -95,7 +104,6 @@ (let [test-conf (get-test-conf) mock-sources {"1" [[{} "s3" test-location]]} topo (mk-test-read-filtered-topology) - bucket-name (get test-conf "S3_BUCKET") results (complete-topology cluster topo :storm-conf test-conf @@ -104,7 +112,27 @@ :full-path test-key :file-name test-file-name} :value test-content}]] - ;; Check the output of the bolt matches expected tuple output + ;; Check the output of the bolt matches expected tuple output + ;; Order is not guaranteed so we are using the built in storm + ;; equality function ms= rather than = + (is (ms= [[{} expected]] + (read-tuples results "2"))))))) + +(deftest test-archive-read-chunked-bolt + (with-s3-key + #(with-simulated-time-local-cluster [cluster] + (let [test-conf (get-test-conf) + mock-sources {"1" [[{} "s3" test-location]]} + topo (mk-test-read-chunked-topology) + results (complete-topology cluster + topo + :storm-conf test-conf + :mock-sources mock-sources) + 
expected [{:meta {:location test-location + :full-path test-key + :file-name test-file-name} + :value test-content}]] + ;; Check the output of the bolt matches expected tuple output ;; Order is not guaranteed so we are using the built in storm ;; equality function ms= rather than = (is (ms= [[{} expected]]