|
145 | 145 | "Converts a vector-structured `query` to a corresponding SQL query which will |
146 | 146 | return nodes matching the `query`." |
147 | 147 | ([query entity version options] |
148 | | - (query->sql query entity version options {})) |
| 148 | + (query->sql query entity version options nil)) |
149 | 149 | ([query entity version options context] |
150 | 150 | {:pre [((some-fn nil? sequential?) query)] |
151 | 151 | :post [(map? %) |
|
163 | 163 | (events/legacy-query->sql false version query options) |
164 | 164 |
|
165 | 165 | :else (regular-query->sql query entity options))] |
166 | | - (when-let [log-id (:log-id context)] |
| 166 | + (when (:log-queries context) |
167 | 167 | (let [{[sql & params] :results-query} result] |
168 | 168 | (log/infof "PDBQuery:%s:%s" |
169 | | - log-id (-> (sorted-map :origin (:origin options) |
170 | | - :sql sql |
171 | | - :params (vec params)) |
172 | | - json/generate-string)))) |
| 169 | + (:query-id context) |
| 170 | + (-> (sorted-map :origin (:origin options) |
| 171 | + :sql sql |
| 172 | + :params (vec params)) |
| 173 | + json/generate-string)))) |
173 | 174 | result))) |
174 | 175 |
|
175 | 176 | (defn get-munge-fn |
|
194 | 195 | :add-agent-report-filter Boolean |
195 | 196 | (s/optional-key :warn-experimental) Boolean |
196 | 197 | (s/optional-key :pretty-print) (s/maybe Boolean) |
197 | | - (s/optional-key :log-queries) Boolean}) |
| 198 | + (s/optional-key :log-queries) Boolean |
| 199 | + (s/optional-key :query-id) s/Str}) |
198 | 200 |
|
199 | 201 | (pls/defn-validated stream-query-result |
200 | 202 | "Given a query, and database connection, return a Ring response with the query |
|
207 | 209 | options |
208 | 210 | context :- query-context-schema |
209 | 211 | row-fn] |
210 | | - (let [{:keys [scf-read-db url-prefix warn-experimental pretty-print log-queries] |
211 | | - :or {warn-experimental true}} context |
212 | | - log-id (when log-queries (str (java.util.UUID/randomUUID))) |
213 | | - {:keys [remaining-query entity]} (eng/parse-query-context version query warn-experimental) |
214 | | - munge-fn (get-munge-fn entity version options url-prefix)] |
| 212 | + ;; For now, generate the ids here; perhaps later, higher up |
| 213 | + (assert (not (:query-id context))) |
| 214 | + (let [query-id (str (java.util.UUID/randomUUID)) |
| 215 | + context (assoc context :query-id query-id)] |
| 216 | + (let [{:keys [scf-read-db url-prefix warn-experimental pretty-print log-queries] |
| 217 | + :or {warn-experimental true}} context |
| 218 | + {:keys [remaining-query entity]} (eng/parse-query-context version query warn-experimental) |
| 219 | + munge-fn (get-munge-fn entity version options url-prefix)] |
215 | 220 |
|
216 | | - (when log-queries |
217 | | - ;; Log origin and AST of incoming query |
218 | | - (log/infof "PDBQuery:%s:%s" |
219 | | - log-id (-> (sorted-map :origin (:origin options) :ast query) |
220 | | - json/generate-string))) |
| 221 | + (when log-queries |
| 222 | + ;; Log origin and AST of incoming query |
| 223 | + (log/infof "PDBQuery:%s:%s" |
| 224 | + query-id (-> (sorted-map :origin (:origin options) :ast query) |
| 225 | + json/generate-string))) |
221 | 226 |
|
222 | | - (let [f #(let [{:keys [results-query]} |
223 | | - (query->sql remaining-query entity version |
224 | | - options {:log-id log-id})] |
225 | | - (jdbc/call-with-array-converted-query-rows results-query |
226 | | - (comp row-fn munge-fn)))] |
227 | | - (if use-preferred-streaming-method? |
228 | | - (jdbc/with-db-connection scf-read-db (jdbc/with-db-transaction [] (f))) |
229 | | - (jdbc/with-transacted-connection scf-read-db (f))))))) |
| 227 | + (let [f #(let [{:keys [results-query]} |
| 228 | + (query->sql remaining-query entity version |
| 229 | + options |
| 230 | + (select-keys context [:log-queries :query-id]))] |
| 231 | + (jdbc/call-with-array-converted-query-rows results-query |
| 232 | + (comp row-fn munge-fn)))] |
| 233 | + (if use-preferred-streaming-method? |
| 234 | + (jdbc/with-db-connection scf-read-db (jdbc/with-db-transaction [] (f))) |
| 235 | + (jdbc/with-transacted-connection scf-read-db (f)))))))) |
230 | 236 |
|
231 | 237 | ;; Do we still need this, i.e. do we need the pass-through, and the |
232 | 238 | ;; strict selectivity in the caller below? |
|
298 | 304 | if an exception happens before then. An exception thrown by the |
299 | 305 | future after that point will produce an exception from the next call |
300 | 306 | to the InputStream read or close methods." |
301 | | - [db query entity version query-options log-id munge-fn pretty-print] |
| 307 | + [db query entity version query-options munge-fn |
| 308 | + {:keys [log-queries pretty-print query-id] :as context}] |
302 | 309 | ;; Client disconnects present as generic IOExceptions from the |
303 | 310 | ;; output writer (via stream-json), and we just log them at debug |
304 | 311 | ;; level. For now, after the first row, there's nothing we can do |
|
323 | 330 | (jdbc/with-db-connection db |
324 | 331 | (jdbc/with-db-transaction [] |
325 | 332 | (let [{:keys [results-query count-query]} |
326 | | - (query->sql query entity version query-options |
327 | | - {:log-id log-id}) |
| 333 | + (query->sql query entity version query-options context) |
328 | 334 | st (when count-query |
329 | 335 | {:count (jdbc/get-result-count count-query)}) |
330 | 336 | stream-row (fn [row] |
|
361 | 367 |
|
362 | 368 | (defn preferred-produce-streaming-body |
363 | 369 | [version query-map context] |
364 | | - (let [{:keys [scf-read-db url-prefix warn-experimental pretty-print log-queries] |
| 370 | + (let [{:keys [scf-read-db url-prefix warn-experimental pretty-print |
| 371 | + log-queries query-id] |
365 | 372 | :or {warn-experimental true}} context |
366 | | - log-id (when log-queries (str (java.util.UUID/randomUUID))) |
367 | 373 | query-config (select-keys context [:node-purge-ttl :add-agent-report-filter]) |
368 | 374 | {:keys [query remaining-query entity query-options]} |
369 | 375 | (user-query->engine-query version query-map query-config warn-experimental)] |
|
372 | 378 | ;; Log origin and AST of incoming query |
373 | 379 | (let [{:keys [origin query]} query-map] |
374 | 380 | (log/infof "PDBQuery:%s:%s" |
375 | | - log-id (-> (sorted-map :origin origin :ast query) |
376 | | - json/generate-string)))) |
| 381 | + query-id (-> (sorted-map :origin origin :ast query) |
| 382 | + json/generate-string)))) |
377 | 383 |
|
378 | 384 | (try |
379 | 385 | (let [munge-fn (get-munge-fn entity version query-options url-prefix) |
| 386 | + stream-ctx (select-keys context [:log-queries :pretty-print :query-id]) |
380 | 387 | {:keys [status stream]} (body-stream scf-read-db |
381 | 388 | (coerce-from-json remaining-query) |
382 | 389 | entity version query-options |
383 | | - log-id munge-fn pretty-print)] |
| 390 | + munge-fn |
| 391 | + stream-ctx)] |
384 | 392 | (let [{:keys [count error]} @status] |
385 | 393 | (when error |
386 | 394 | (throw error)) |
387 | 395 | (cond-> (http/json-response* stream) |
388 | 396 | count (http/add-headers {:count count})))) |
389 | 397 | (catch JsonParseException ex |
390 | | - (log/error ex (trs "Unparsable query: {0} {1} {2}" log-id query query-options)) |
| 398 | + (log/error ex (trs "Unparsable query: {0} {1} {2}" query-id query query-options)) |
391 | 399 | (http/error-response ex)) |
392 | 400 | (catch IllegalArgumentException ex ;; thrown by (at least) munge-fn |
393 | | - (log/error ex (trs "Invalid query: {0} {1} {2}" log-id query query-options)) |
| 401 | + (log/error ex (trs "Invalid query: {0} {1} {2}" query-id query query-options)) |
394 | 402 | (http/error-response ex)) |
395 | 403 | (catch PSQLException ex |
396 | 404 | (when-not (= (.getSQLState ex) (jdbc/sql-state :invalid-regular-expression)) |
397 | 405 | (throw ex)) |
398 | 406 | (do |
399 | | - (log/debug ex (trs "Invalid query regex: {0} {1} {2}" log-id query query-options)) |
| 407 | + (log/debug ex (trs "Invalid query regex: {0} {1} {2}" query-id query query-options)) |
400 | 408 | (http/error-response ex)))))) |
401 | 409 |
|
402 | 410 | ;; for testing via with-redefs |
403 | 411 | (def munge-fn-hook identity) |
404 | 412 |
|
405 | 413 | (defn- deprecated-produce-streaming-body |
406 | 414 | [version query-map context] |
407 | | - (let [{:keys [scf-read-db url-prefix warn-experimental pretty-print log-queries] |
| 415 | + (let [{:keys [scf-read-db url-prefix warn-experimental pretty-print |
| 416 | + log-queries query-id] |
408 | 417 | :or {warn-experimental true}} context |
409 | | - log-id (when log-queries (str (java.util.UUID/randomUUID))) |
410 | 418 | query-config (select-keys context [:node-purge-ttl :add-agent-report-filter]) |
411 | 419 | {:keys [query remaining-query entity query-options]} |
412 | 420 | (user-query->engine-query version query-map query-config warn-experimental)] |
|
415 | 423 | ;; Log origin and AST of incoming query |
416 | 424 | (let [{:keys [origin query]} query-map] |
417 | 425 | (log/infof "PDBQuery:%s:%s" |
418 | | - log-id (-> (sorted-map :origin origin :ast query) |
419 | | - json/generate-string)))) |
| 426 | + query-id (-> (sorted-map :origin origin :ast query) |
| 427 | + json/generate-string)))) |
420 | 428 |
|
421 | 429 | (try |
422 | 430 | (jdbc/with-transacted-connection scf-read-db |
423 | 431 | (let [munge-fn (get-munge-fn entity version query-options url-prefix) |
424 | | - {:keys [results-query count-query]} (-> remaining-query |
425 | | - coerce-from-json |
426 | | - (query->sql entity |
427 | | - version |
428 | | - query-options |
429 | | - {:log-id log-id})) |
| 432 | + {:keys [results-query count-query]} |
| 433 | + (-> remaining-query |
| 434 | + coerce-from-json |
| 435 | + (query->sql entity version query-options |
| 436 | + (select-keys context [:log-queries :query-id]))) |
430 | 437 | query-error (promise) |
431 | 438 | resp (http/streamed-response |
432 | 439 | buffer |
|
474 | 481 | [version :- s/Keyword |
475 | 482 | query-map |
476 | 483 | context :- query-context-schema] |
477 | | - (if use-preferred-streaming-method? |
478 | | - (preferred-produce-streaming-body version query-map context) |
479 | | - (deprecated-produce-streaming-body version query-map context))) |
| 484 | + ;; For now, generate the ids here; perhaps later, higher up |
| 485 | + (assert (not (:query-id context))) |
| 486 | + (let [context (assoc context :query-id (str (java.util.UUID/randomUUID)))] |
| 487 | + (if use-preferred-streaming-method? |
| 488 | + (preferred-produce-streaming-body version query-map context) |
| 489 | + (deprecated-produce-streaming-body version query-map context)))) |
480 | 490 |
|
481 | 491 | (pls/defn-validated object-exists? :- s/Bool |
482 | 492 | "Returns true if an object exists." |
|
0 commit comments