This repository was archived by the owner on Dec 20, 2019. It is now read-only.
4 changes: 2 additions & 2 deletions src/archive_bolt/backends/core.clj
@@ -29,5 +29,5 @@
(throw (Exception. (str "Backend not found for " backend))))

(defmethod filter-from-backend :s3
[_ conf location & [pred-fn]]
(s3/filter-from-backend conf location pred-fn))
[_ conf location & [opts]]
(s3/filter-from-backend conf location opts))
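A quick sketch of the revised calling convention (conf and only-json-keys below are hypothetical stand-ins): the positional pred-fn argument becomes a single options map, so further options such as :marker can be threaded through without adding arities.

(require '[archive-bolt.backends.core :as backends])

;; Hypothetical config and filter, for illustration only.
(def conf {"ARCHIVE_READ_S3_BUCKET" "example-bucket"})
(defn only-json-keys [ks] (filter #(.endsWith ^String % ".json") ks))

;; Before: (backends/filter-from-backend :s3 conf "reports/2019/" only-json-keys)
;; After: options are carried in one map.
(backends/filter-from-backend :s3 conf "reports/2019/" {:filter-fn only-json-keys})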
60 changes: 33 additions & 27 deletions src/archive_bolt/backends/s3.clj
@@ -1,5 +1,5 @@
(ns archive-bolt.backends.s3
(:require [backtype.storm.log :as storm]
(:require [backtype.storm.log :as storm]
[amazonica.aws.s3 :as s3]
[archive-bolt.utils :refer [get-or-throw]]
[cheshire.core :as json]
@@ -23,14 +23,14 @@
(str "s3://" bucket "/" location)))

(defn safe-put
"Attempt to PUT the file to s3 returns full s3 path when successful or
"Attempt to PUT the file to s3 returns full s3 path when successful or
nil if unsuccessful. Retries on failure up to max-retries times."
[creds bucket location content
& {:keys [retry-count max-retries wait-time]
:or {retry-count 0, max-retries 10 wait-time 1000}}]
;; Store the content and return the location
(try
(put-object creds bucket location content)
(put-object creds bucket location content)
(catch Exception e
(do (Thread/sleep wait-time)
(storm/log-error e " Failed to store in s3. "
@@ -68,7 +68,7 @@
:value (try (-> (s3/get-object credentials
:bucket-name bucket-name
:key key)
:input-stream
:input-stream
slurp
(json/parse-string true))
(catch Exception e
@@ -79,26 +79,32 @@
(defn filter-from-backend
"Search s3 for keys at the given location. Take the list of keys and look
them up. If the results are paginated, recur until all results are returned.
Results are paginated by 1,000 keys as per the S3 API docs. Results keys are
filtered by filter-fn. Returns a collection of results."
[conf location & [filter-fn accum marker]]
(let [creds (mk-credentials conf)
;; For backwards compatibility look for the old key as fallback
bucket-name (or (get conf "ARCHIVE_READ_S3_BUCKET")
(get conf "S3_BUCKET")
(throw (Exception. "Missing config field ARCHIVE_READ_S3_BUCKET")))
;; Search s3 for all keys at the location
search-results (s3/list-objects creds
:bucket-name bucket-name
:prefix location
:marker marker)
;; Grab the keys and optionally filter them
keys ((or filter-fn identity) (get-keys-from-results search-results))
values (pmap #(lookup-key creds bucket-name location %) keys)
result (concat accum values)]
;; If there is a next marker then we should recur
(if-let [next-marker (:next-marker search-results)]
;; NOTE optional args must be in a vector when using recur
(do (storm/log-message "Paging archive results at " location)
(recur conf location [filter-fn result next-marker]))
result)))
Results are paginated by ARCHIVE_READ_S3_MAX_KEYS keys as per the S3
API docs. Result keys are filtered by filter-fn. Returns a lazy seq of results."
([conf location]
(filter-from-backend conf location {}))
([conf location {:keys [filter-fn marker] :as opts}]
(lazy-seq
(let [creds (mk-credentials conf)
;; For backwards compatibility look for the old key as fallback
bucket-name (or (get conf "ARCHIVE_READ_S3_BUCKET")
(get conf "S3_BUCKET")
(throw (Exception. "Missing config field ARCHIVE_READ_S3_BUCKET")))
max-keys (if-let [n-keys (get conf "ARCHIVE_READ_S3_MAX_KEYS")]
(Integer/parseInt n-keys)
1000)
;; Search s3 for all keys at the location
search-results (s3/list-objects creds
:bucket-name bucket-name
:prefix location
:marker marker
:max-keys max-keys)
;; Grab the keys and optionally filter them
ks ((or filter-fn identity) (get-keys-from-results search-results))
values (pmap #(lookup-key creds bucket-name location %) ks)]
;; If there is a next marker then we should recur
(if-let [next-marker (:next-marker search-results)]
(do (storm/log-message "Paging archive results at " location)
(concat values
(filter-from-backend conf location (assoc opts :marker next-marker))))
values)))))
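A hedged sketch of consuming the now-lazy result seq (the conf values below are hypothetical, and the credential keys read by mk-credentials are omitted): later S3 pages are only listed as the consumer walks past the earlier ones.

(require '[archive-bolt.backends.s3 :as s3-backend])

;; Hypothetical config values for illustration only.
(def conf {"ARCHIVE_READ_S3_BUCKET"   "example-bucket"
           "ARCHIVE_READ_S3_MAX_KEYS" "500"})

;; Realizing only the first five entries lists just the leading page(s) of keys;
;; pmap may look up a handful of keys ahead of what is actually consumed.
(take 5 (s3-backend/filter-from-backend
          conf
          "archive/2019/"
          {:filter-fn (fn [ks] (filter #(.endsWith ^String % ".json") ks))}))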
22 changes: 17 additions & 5 deletions src/archive_bolt/storm.clj
@@ -1,8 +1,8 @@
(ns archive-bolt.storm
(:require [backtype.storm.clojure :refer [defbolt bolt emit-bolt! ack! fail!]]
[backtype.storm.log :refer [log-debug log-warn]]
[backtype.storm.log :refer [log-debug log-message log-warn]]
[archive-bolt.backends.core :refer [store filter-from-backend]]
[archive-bolt.fields :as fields])
[archive-bolt.fields :as fields])
(:gen-class))


@@ -32,11 +32,11 @@
[conf collector tuple & [filter-fn]]
(let [{:keys [meta backend location]} tuple
filter-fn (or filter-fn identity)
results (filter-from-backend backend conf location filter-fn)]
results (filter-from-backend backend conf location {:filter-fn filter-fn})]
(if (seq results)
(emit-bolt! collector [meta results] :anchor tuple)
(log-debug (format "No results returned from %s backend at %s"
backend location)))
(log-message (format "No results returned from %s backend at %s"
backend location)))
(ack! collector tuple)))

(defbolt archive-read fields/archive-read-output-fields
@@ -51,3 +51,15 @@
{:prepare true :params [filter-fn]}
[conf context collector]
(bolt (execute [tuple] (-archive-read conf collector tuple (resolve filter-fn)))))

(defbolt archive-read-chunked fields/archive-read-output-fields
{:prepare true :params [chunk-n]}
[conf context collector]
(bolt
(execute
[tuple]
(let [{:keys [meta backend location]} tuple
results (filter-from-backend backend conf location)]
(doseq [r (partition-all chunk-n results)]
(emit-bolt! collector [meta r] :anchor tuple))
(ack! collector tuple)))))
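A sketch of wiring the new chunked read bolt into a topology, mirroring the test topology added below. The spout here is a minimal, hypothetical stand-in that keeps asking for the same archive location; each bolt output tuple carries a chunk of at most 50 results.

(require '[backtype.storm.clojure :refer [defspout spout emit-spout!
                                          topology spout-spec bolt-spec]]
         '[archive-bolt.storm :refer [archive-read-chunked]])

;; Hypothetical spout emitting [meta backend location] tuples; a real
;; topology would drive this from a queue or upstream component.
(defspout reader-spout ["meta" "backend" "location"]
  [conf context collector]
  (spout
   (nextTuple []
     (Thread/sleep 1000)
     (emit-spout! collector [{} "s3" "archive/2019/"]))
   (ack [id])))

(def read-chunked-topology
  (topology
   {"reader"  (spout-spec reader-spout)}
   {"chunker" (bolt-spec {"reader" :shuffle}
                         (archive-read-chunked 50))}))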
30 changes: 16 additions & 14 deletions test/archive_bolt/backends/s3_test.clj
@@ -52,11 +52,11 @@

(deftest test-filter-from-backend
(with-s3-key
#(is (= (backend/filter-from-backend :s3 (get-test-conf) test-location)
[{:meta {:location test-location
:full-path test-key
:file-name test-file-name}
:value test-content}]))))
#(is (= [{:meta {:location test-location
:full-path test-key
:file-name test-file-name}
:value test-content}]
(backend/filter-from-backend :s3 (get-test-conf) test-location)))))

(deftest test-filter-from-backend-no-results)
#(is (= (backend/filter-from-backend :s3 (get-test-conf) test-location)
@@ -67,21 +67,23 @@
{:object-summaries [{:key "foo"} {:key "foo"}]
:next-marker nil})
s3-backend/lookup-key (fn [_ _ _ k] {k k})]
(is (= (backend/filter-from-backend :s3
(is (= [{"foo" "foo"}]
(backend/filter-from-backend :s3
(get-test-conf)
test-location
(fn [coll] (set coll)))
[{"foo" "foo"}]))))
{:filter-fn (fn [coll] (set coll))})))))

(defn mock-list-objects
[_ & {:keys [bucket-name prefix marker]}]
(if (= marker 123)
{:object-summaries [{:key "foo"}] :next-marker nil}
{:object-summaries [{:key "bar"}] :next-marker 123}))
(if-not marker
{:object-summaries [{:key "foo"}] :next-marker 123}
(condp = marker
123 {:object-summaries [{:key "bar"}] :next-marker 456}
456 {:object-summaries [{:key "baz"}] :next-marker nil})))

(deftest test-filter-from-backend-pagination
"Test paging through results of searching s3 without actually hitting s3"
(with-redefs [s3/list-objects mock-list-objects
s3-backend/lookup-key (fn [_ _ _ k] {k k})]
(is (= (backend/filter-from-backend :s3 (get-test-conf) test-location)
[{"bar" "bar"} {"foo" "foo"}]))))
s3-backend/lookup-key (fn [_ _ _ k] k)]
(is (= ["foo" "bar" "baz"]
(backend/filter-from-backend :s3 (get-test-conf) test-location)))))
44 changes: 36 additions & 8 deletions test/archive_bolt/storm_test.clj
@@ -1,9 +1,12 @@
(ns archive-bolt.storm-test
(:require [clojure.test :refer :all]
(:require [clojure.test :refer :all]
[backtype.storm.clojure :refer :all]
[backtype.storm.testing :refer :all]
[amazonica.aws.s3 :refer [get-object delete-object]]
[archive-bolt.storm :refer [archive archive-read archive-read-filtered]]
[archive-bolt.storm :refer [archive
archive-read
archive-read-filtered
archive-read-chunked]]
[archive-bolt.fields :refer [archive-input-fields
archive-read-input-fields
archive-output-fields]]
@@ -42,6 +45,13 @@
{"1" (spout-spec mock-read-spout)}
{"2" (bolt-spec {"1" :shuffle} (archive-read-filtered `test-filter-fn))}))

(defn mk-test-read-chunked-topology
"Returns a Storm topology for testing the archive bolt"
[]
(topology
{"1" (spout-spec mock-read-spout)}
{"2" (bolt-spec {"1" :shuffle} (archive-read-chunked 1))}))

(deftest test-archive-write-bolt
"Test the topology on a local cluster"
(with-simulated-time-local-cluster [cluster]
@@ -58,10 +68,10 @@
expected (get-object test-creds test-bucket-name test-location)
expected-content (-> expected :input-stream slurp)]
;; Clean up
(delete-object test-creds test-bucket-name test-location)
(delete-object test-creds test-bucket-name test-location)
;; Verify that the side effect of writing to s3 worked
(is (= test-content-str expected-content))
;; Check the output of the bolt matches expected tuple output
;; Check the output of the bolt matches expected tuple output
;; Order is not guaranteed so we are using the built in storm
;; equality function ms= rather than =
(is (ms= [[{} (str "s3://" test-bucket-name "/" test-location)]]
@@ -74,7 +84,6 @@
;; This becomes the input to the archive bolt
mock-sources {"1" [[{} "s3" test-location]]}
topo (mk-test-read-topology)
bucket-name "dev.shareablee.com"
results (complete-topology cluster
topo
:storm-conf test-conf
@@ -83,7 +92,7 @@
:full-path test-key
:file-name test-file-name}
:value test-content}]]
;; Check the output of the bolt matches expected tuple output
;; Check the output of the bolt matches expected tuple output
;; Order is not guaranteed so we are using the built in storm
;; equality function ms= rather than =
(is (ms= [[{} expected]]
@@ -95,7 +104,6 @@
(let [test-conf (get-test-conf)
mock-sources {"1" [[{} "s3" test-location]]}
topo (mk-test-read-filtered-topology)
bucket-name (get test-conf "S3_BUCKET")
results (complete-topology cluster
topo
:storm-conf test-conf
@@ -104,7 +112,27 @@
:full-path test-key
:file-name test-file-name}
:value test-content}]]
;; Check the output of the bolt matches expected tuple output
;; Check the output of the bolt matches expected tuple output
;; Order is not guaranteed so we are using the built in storm
;; equality function ms= rather than =
(is (ms= [[{} expected]]
(read-tuples results "2")))))))

(deftest test-archive-read-chunked-bolt
(with-s3-key
#(with-simulated-time-local-cluster [cluster]
(let [test-conf (get-test-conf)
mock-sources {"1" [[{} "s3" test-location]]}
topo (mk-test-read-chunked-topology)
results (complete-topology cluster
topo
:storm-conf test-conf
:mock-sources mock-sources)
expected [{:meta {:location test-location
:full-path test-key
:file-name test-file-name}
:value test-content}]]
;; Check the output of the bolt matches expected tuple output
;; Order is not guaranteed so we are using the built in storm
;; equality function ms= rather than =
(is (ms= [[{} expected]]