|
22 | 22 | [puppetlabs.puppetdb.query-eng.default-reports :as dr] |
23 | 23 | [puppetlabs.puppetdb.scf.storage-utils :as sutils] |
24 | 24 | [puppetlabs.puppetdb.schema :as pls] |
25 | | - [puppetlabs.puppetdb.utils :as utils] |
| 25 | + [puppetlabs.puppetdb.utils :as utils :refer [with-log-mdc]] |
26 | 26 | [puppetlabs.puppetdb.utils.string-formatter :as formatter] |
27 | 27 | [puppetlabs.puppetdb.query-eng.engine :as eng] |
28 | 28 | [ring.util.io :as rio] |
|
212 | 212 | ;; For now, generate the ids here; perhaps later, higher up |
213 | 213 | (assert (not (:query-id context))) |
214 | 214 | (let [query-id (str (java.util.UUID/randomUUID)) |
215 | | - context (assoc context :query-id query-id)] |
216 | | - (let [{:keys [scf-read-db url-prefix warn-experimental pretty-print log-queries] |
217 | | - :or {warn-experimental true}} context |
218 | | - {:keys [remaining-query entity]} (eng/parse-query-context version query warn-experimental) |
219 | | - munge-fn (get-munge-fn entity version options url-prefix)] |
| 215 | + context (assoc context :query-id query-id) |
| 216 | + origin (:origin options)] |
| 217 | + (with-log-mdc ["pdb-query-id" query-id |
| 218 | + "pdb-query-origin" origin] |
| 219 | + (let [{:keys [scf-read-db url-prefix warn-experimental pretty-print log-queries] |
| 220 | + :or {warn-experimental true}} context |
| 221 | + {:keys [remaining-query entity]} (eng/parse-query-context version query warn-experimental) |
| 222 | + munge-fn (get-munge-fn entity version options url-prefix)] |
220 | 223 |
|
221 | | - (when log-queries |
222 | | - ;; Log origin and AST of incoming query |
223 | | - (log/infof "PDBQuery:%s:%s" |
224 | | - query-id (-> (sorted-map :origin (:origin options) :ast query) |
225 | | - json/generate-string))) |
| 224 | + (when log-queries |
| 225 | + ;; Log origin and AST of incoming query |
| 226 | + (log/infof "PDBQuery:%s:%s" |
| 227 | + query-id (-> (sorted-map :origin origin :ast query) |
| 228 | + json/generate-string))) |
226 | 229 |
|
227 | | - (let [f #(let [{:keys [results-query]} |
228 | | - (query->sql remaining-query entity version |
229 | | - options |
230 | | - (select-keys context [:log-queries :query-id]))] |
231 | | - (jdbc/call-with-array-converted-query-rows results-query |
232 | | - (comp row-fn munge-fn)))] |
233 | | - (if use-preferred-streaming-method? |
234 | | - (jdbc/with-db-connection scf-read-db (jdbc/with-db-transaction [] (f))) |
235 | | - (jdbc/with-transacted-connection scf-read-db (f)))))))) |
| 230 | + (let [f #(let [{:keys [results-query]} |
| 231 | + (query->sql remaining-query entity version |
| 232 | + options |
| 233 | + (select-keys context [:log-queries :query-id]))] |
| 234 | + (jdbc/call-with-array-converted-query-rows results-query |
| 235 | + (comp row-fn munge-fn)))] |
| 236 | + (if use-preferred-streaming-method? |
| 237 | + (jdbc/with-db-connection scf-read-db (jdbc/with-db-transaction [] (f))) |
| 238 | + (jdbc/with-transacted-connection scf-read-db (f))))))))) |
236 | 239 |
|
237 | 240 | ;; Do we still need this, i.e. do we need the pass-through, and the |
238 | 241 | ;; strict selectivity in the caller below? |
|
325 | 328 | stream (generated-stream |
326 | 329 | ;; Runs in a future |
327 | 330 | (fn [out] |
328 | | - (with-open! [out (io/writer out :encoding "UTF-8")] |
329 | | - (try |
330 | | - (jdbc/with-db-connection db |
331 | | - (jdbc/with-db-transaction [] |
332 | | - (let [{:keys [results-query count-query]} |
333 | | - (query->sql query entity version query-options context) |
334 | | - st (when count-query |
335 | | - {:count (jdbc/get-result-count count-query)}) |
336 | | - stream-row (fn [row] |
337 | | - (let [r (munge-fn row)] |
338 | | - (when-not (instance? PGobject r) |
339 | | - (first r)) |
340 | | - (when-not (realized? status) |
341 | | - (deliver status st)) |
342 | | - (try |
343 | | - (http/stream-json r out pretty-print) |
344 | | - (catch IOException ex |
345 | | - (log/debug ex (trs "Unable to stream response: {0}" |
346 | | - (.getMessage ex))) |
347 | | - (throw quiet-exit)))))] |
348 | | - (jdbc/call-with-array-converted-query-rows results-query |
349 | | - stream-row) |
350 | | - (when-not (realized? status) |
351 | | - (deliver status st))))) |
352 | | - (catch Exception ex |
353 | | - ;; If it's an exit, we've already handled it. |
354 | | - (when-not (identical? quiet-exit ex) |
| 331 | + (with-log-mdc ["pdb-query-id" query-id |
| 332 | + "pdb-query-origin" (:origin query-options)] |
| 333 | + (with-open! [out (io/writer out :encoding "UTF-8")] |
| 334 | + (try |
| 335 | + (jdbc/with-db-connection db |
| 336 | + (jdbc/with-db-transaction [] |
| 337 | + (let [{:keys [results-query count-query]} |
| 338 | + (query->sql query entity version query-options context) |
| 339 | + st (when count-query |
| 340 | + {:count (jdbc/get-result-count count-query)}) |
| 341 | + stream-row (fn [row] |
| 342 | + (let [r (munge-fn row)] |
| 343 | + (when-not (instance? PGobject r) |
| 344 | + (first r)) |
| 345 | + (when-not (realized? status) |
| 346 | + (deliver status st)) |
| 347 | + (try |
| 348 | + (http/stream-json r out pretty-print) |
| 349 | + (catch IOException ex |
| 350 | + (log/debug ex (trs "Unable to stream response: {0}" |
| 351 | + (.getMessage ex))) |
| 352 | + (throw quiet-exit)))))] |
| 353 | + (jdbc/call-with-array-converted-query-rows results-query |
| 354 | + stream-row) |
| 355 | + (when-not (realized? status) |
| 356 | + (deliver status st))))) |
| 357 | + (catch Exception ex |
| 358 | + ;; If it's an exit, we've already handled it. |
| 359 | + (when-not (identical? quiet-exit ex) |
| 360 | + (if (realized? status) |
| 361 | + (throw ex) |
| 362 | + (deliver status {:error ex})))) |
| 363 | + (catch Throwable ex |
355 | 364 | (if (realized? status) |
356 | | - (throw ex) |
357 | | - (deliver status {:error ex})))) |
358 | | - (catch Throwable ex |
359 | | - (if (realized? status) |
360 | | - (do |
361 | | - (log/error ex (trs "Query streaming failed: {0} {1}" |
362 | | - query query-options)) |
363 | | - (throw ex)) |
364 | | - (deliver status {:error ex})))))))] |
| 365 | + (do |
| 366 | + (log/error ex (trs "Query streaming failed: {0} {1}" |
| 367 | + query query-options)) |
| 368 | + (throw ex)) |
| 369 | + (deliver status {:error ex}))))))))] |
365 | 370 | {:status status |
366 | 371 | :stream stream})) |
367 | 372 |
|
|
417 | 422 | :or {warn-experimental true}} context |
418 | 423 | query-config (select-keys context [:node-purge-ttl :add-agent-report-filter]) |
419 | 424 | {:keys [query remaining-query entity query-options]} |
420 | | - (user-query->engine-query version query-map query-config warn-experimental)] |
| 425 | + (user-query->engine-query version query-map query-config warn-experimental) |
| 426 | + origin (:origin query-map)] |
421 | 427 |
|
422 | 428 | (when log-queries |
423 | 429 | ;; Log origin and AST of incoming query |
424 | | - (let [{:keys [origin query]} query-map] |
| 430 | + (let [query (:query query-map)] |
425 | 431 | (log/infof "PDBQuery:%s:%s" |
426 | 432 | query-id (-> (sorted-map :origin origin :ast query) |
427 | 433 | json/generate-string)))) |
|
437 | 443 | query-error (promise) |
438 | 444 | resp (http/streamed-response |
439 | 445 | buffer |
440 | | - (try (jdbc/with-transacted-connection scf-read-db |
441 | | - (jdbc/call-with-array-converted-query-rows |
442 | | - results-query |
443 | | - (comp #(http/stream-json % buffer pretty-print) |
444 | | - #(do (when-not (instance? PGobject %) |
445 | | - (first %)) |
446 | | - (deliver query-error nil) %) |
447 | | - (munge-fn-hook munge-fn)))) |
448 | | - ;; catch throwable to make sure any trouble is forwarded to the |
449 | | - ;; query-error promise below. If something throws and is not passed |
450 | | - ;; along the deref will block indefinitely. |
451 | | - (catch Throwable e |
452 | | - (deliver query-error e))))] |
| 446 | + (with-log-mdc ["pdb-query-id" query-id |
| 447 | + "pdb-query-origin" origin] |
| 448 | + (try (jdbc/with-transacted-connection scf-read-db |
| 449 | + (jdbc/call-with-array-converted-query-rows |
| 450 | + results-query |
| 451 | + (comp #(http/stream-json % buffer pretty-print) |
| 452 | + #(do (when-not (instance? PGobject %) |
| 453 | + (first %)) |
| 454 | + (deliver query-error nil) %) |
| 455 | + (munge-fn-hook munge-fn)))) |
| 456 | + ;; catch throwable to make sure any trouble is forwarded to the |
| 457 | + ;; query-error promise below. If something throws and is not passed |
| 458 | + ;; along the deref will block indefinitely. |
| 459 | + (catch Throwable e |
| 460 | + (deliver query-error e)))))] |
453 | 461 | (if @query-error |
454 | 462 | (throw @query-error) |
455 | 463 | (cond-> (http/json-response* resp) |
|
483 | 491 | context :- query-context-schema] |
484 | 492 | ;; For now, generate the ids here; perhaps later, higher up |
485 | 493 | (assert (not (:query-id context))) |
486 | | - (let [context (assoc context :query-id (str (java.util.UUID/randomUUID)))] |
487 | | - (if use-preferred-streaming-method? |
488 | | - (preferred-produce-streaming-body version query-map context) |
489 | | - (deprecated-produce-streaming-body version query-map context)))) |
| 494 | + (let [qid (str (java.util.UUID/randomUUID)) |
| 495 | + context (assoc context :query-id qid)] |
| 496 | + (with-log-mdc ["pdb-query-id" qid |
| 497 | + "pdb-query-origin" (:origin query-map)] |
| 498 | + (if use-preferred-streaming-method? |
| 499 | + (preferred-produce-streaming-body version query-map context) |
| 500 | + (deprecated-produce-streaming-body version query-map context))))) |
490 | 501 |
|
491 | 502 | (pls/defn-validated object-exists? :- s/Bool |
492 | 503 | "Returns true if an object exists." |
|
0 commit comments