Skip to content

Commit 80df8ab

Browse files
authored
implement get_transaction_query, to retrieve full query text for the … (#119)
* implement get_transaction_query, to retrieve full query text for the given tx, plus fix some type hints. * small bugfix to get_transaction_query
1 parent eda1b28 commit 80df8ab

File tree

1 file changed

+50
-48
lines changed

1 file changed

+50
-48
lines changed

railib/api.py

Lines changed: 50 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
import io
2222
import logging
2323
from enum import Enum, unique
24-
from typing import List, Union
24+
from typing import Dict, List, Union
2525
from requests_toolbelt import multipart
2626
from . import rest
2727

@@ -228,7 +228,7 @@ def _mkurl(ctx: Context, path: str) -> str:
228228

229229

230230
# Retrieve an individual resource.
231-
def _get_resource(ctx: Context, path: str, key=None, **kwargs) -> dict:
231+
def _get_resource(ctx: Context, path: str, key=None, **kwargs) -> Dict:
232232
url = _mkurl(ctx, path)
233233
rsp = rest.get(ctx, url, **kwargs)
234234
rsp = json.loads(rsp.read())
@@ -413,57 +413,62 @@ def _create_mode(source_database: str, overwrite: bool) -> Mode:
413413
return result
414414

415415

416-
def delete_database(ctx: Context, database: str, **kwargs) -> dict:
416+
def delete_database(ctx: Context, database: str, **kwargs) -> Dict:
417417
data = {"name": database}
418418
url = _mkurl(ctx, PATH_DATABASE)
419419
rsp = rest.delete(ctx, url, data, **kwargs)
420420
return json.loads(rsp.read())
421421

422422

423-
def delete_engine(ctx: Context, engine: str, **kwargs) -> dict:
423+
def delete_engine(ctx: Context, engine: str, **kwargs) -> Dict:
424424
data = {"name": engine}
425425
url = _mkurl(ctx, PATH_ENGINE)
426426
rsp = rest.delete(ctx, url, data, **kwargs)
427427
return json.loads(rsp.read())
428428

429429

430-
def delete_user(ctx: Context, id: str, **kwargs) -> dict:
430+
def delete_user(ctx: Context, id: str, **kwargs) -> Dict:
431431
url = _mkurl(ctx, f"{PATH_USER}/{id}")
432432
rsp = rest.delete(ctx, url, None, **kwargs)
433433
return json.loads(rsp.read())
434434

435435

436-
def disable_user(ctx: Context, userid: str, **kwargs) -> dict:
436+
def disable_user(ctx: Context, userid: str, **kwargs) -> Dict:
437437
return update_user(ctx, userid, status="INACTIVE", **kwargs)
438438

439439

440-
def delete_oauth_client(ctx: Context, id: str, **kwargs) -> dict:
440+
def delete_oauth_client(ctx: Context, id: str, **kwargs) -> Dict:
441441
url = _mkurl(ctx, f"{PATH_OAUTH_CLIENT}/{id}")
442442
rsp = rest.delete(ctx, url, None, **kwargs)
443443
return json.loads(rsp.read())
444444

445445

446-
def enable_user(ctx: Context, userid: str, **kwargs) -> dict:
446+
def enable_user(ctx: Context, userid: str, **kwargs) -> Dict:
447447
return update_user(ctx, userid, status="ACTIVE", **kwargs)
448448

449449

450-
def get_engine(ctx: Context, engine: str, **kwargs) -> dict:
450+
def get_engine(ctx: Context, engine: str, **kwargs) -> Dict:
451451
return _get_resource(ctx, PATH_ENGINE, name=engine, deleted_on="", key="computes", **kwargs)
452452

453453

454-
def get_database(ctx: Context, database: str, **kwargs) -> dict:
454+
def get_database(ctx: Context, database: str, **kwargs) -> Dict:
455455
return _get_resource(ctx, PATH_DATABASE, name=database, key="databases", **kwargs)
456456

457457

458-
def get_oauth_client(ctx: Context, id: str, **kwargs) -> dict:
458+
def get_oauth_client(ctx: Context, id: str, **kwargs) -> Dict:
459459
return _get_resource(ctx, f"{PATH_OAUTH_CLIENT}/{id}", key="client", **kwargs)
460460

461461

462-
def get_transaction(ctx: Context, id: str, **kwargs) -> dict:
462+
def cancel_transaction(ctx: Context, id: str, **kwargs) -> Dict:
463+
rsp = rest.post(ctx, _mkurl(ctx, f"{PATH_TRANSACTIONS}/{id}/cancel"), {}, **kwargs)
464+
return json.loads(rsp.read())
465+
466+
467+
def get_transaction(ctx: Context, id: str, **kwargs) -> Dict:
463468
return _get_resource(ctx, f"{PATH_TRANSACTIONS}/{id}", key="transaction", **kwargs)
464469

465470

466-
def get_transaction_metadata(ctx: Context, id: str, **kwargs) -> list:
471+
def get_transaction_metadata(ctx: Context, id: str, **kwargs) -> List:
467472
headers = {"Accept": "application/x-protobuf"}
468473
url = _mkurl(ctx, f"{PATH_TRANSACTIONS}/{id}/metadata")
469474
rsp = rest.get(ctx, url, headers=headers, **kwargs)
@@ -474,15 +479,11 @@ def get_transaction_metadata(ctx: Context, id: str, **kwargs) -> list:
474479
raise Exception(f"invalid content type for metadata proto: {content_type}")
475480

476481

477-
def list_transactions(ctx: Context, **kwargs) -> list:
478-
return _get_collection(ctx, PATH_TRANSACTIONS, key="transactions", **kwargs)
479-
480-
481-
def get_transaction_problems(ctx: Context, id: str, **kwargs) -> list:
482+
def get_transaction_problems(ctx: Context, id: str, **kwargs) -> List:
482483
return _get_collection(ctx, f"{PATH_TRANSACTIONS}/{id}/problems", **kwargs)
483484

484485

485-
def get_transaction_results(ctx: Context, id: str, **kwargs) -> list:
486+
def get_transaction_results(ctx: Context, id: str, **kwargs) -> List:
486487
url = _mkurl(ctx, f"{PATH_TRANSACTIONS}/{id}/results")
487488
rsp = rest.get(ctx, url, **kwargs)
488489
content_type = rsp.headers.get("content-type", "")
@@ -495,43 +496,46 @@ def get_transaction_results(ctx: Context, id: str, **kwargs) -> list:
495496

496497
# When problems are part of the results relations, this function should be
497498
# deprecated, get_transaction_results should be called instead
498-
499-
500-
def get_transaction_results_and_problems(ctx: Context, id: str, **kwargs) -> list:
499+
def get_transaction_results_and_problems(ctx: Context, id: str, **kwargs) -> List:
501500
rsp = TransactionAsyncResponse()
502501
rsp.problems = get_transaction_problems(ctx, id, **kwargs)
503502
rsp.results = get_transaction_results(ctx, id, **kwargs)
504503
return rsp
505504

506505

507-
def cancel_transaction(ctx: Context, id: str, **kwargs) -> dict:
508-
rsp = rest.post(ctx, _mkurl(ctx, f"{PATH_TRANSACTIONS}/{id}/cancel"), {}, **kwargs)
509-
return json.loads(rsp.read())
506+
def get_transaction_query(ctx: Context, id: str, **kwargs) -> str:
507+
url = _mkurl(ctx, f"{PATH_TRANSACTIONS}/{id}/query")
508+
rsp = rest.get(ctx, url, **kwargs)
509+
return rsp.read().decode("utf-8")
510+
511+
512+
def list_transactions(ctx: Context, **kwargs) -> List:
513+
return _get_collection(ctx, PATH_TRANSACTIONS, key="transactions", **kwargs)
510514

511515

512-
def get_user(ctx: Context, userid: str, **kwargs) -> dict:
516+
def get_user(ctx: Context, userid: str, **kwargs) -> Dict:
513517
return _get_resource(ctx, f"{PATH_USER}/{userid}", name=userid, **kwargs)
514518

515519

516-
def list_engines(ctx: Context, state=None) -> list:
520+
def list_engines(ctx: Context, state=None) -> List:
517521
kwargs = {}
518522
if state is not None:
519523
kwargs["state"] = state
520524
return _get_collection(ctx, PATH_ENGINE, key="computes", **kwargs)
521525

522526

523-
def list_databases(ctx: Context, state=None) -> list:
527+
def list_databases(ctx: Context, state=None) -> List:
524528
kwargs = {}
525529
if state is not None:
526530
kwargs["state"] = state
527531
return _get_collection(ctx, PATH_DATABASE, key="databases", **kwargs)
528532

529533

530-
def list_users(ctx: Context, **kwargs) -> list:
534+
def list_users(ctx: Context, **kwargs) -> List:
531535
return _get_collection(ctx, PATH_USER, key="users", **kwargs)
532536

533537

534-
def list_oauth_clients(ctx: Context, **kwargs) -> list:
538+
def list_oauth_clients(ctx: Context, **kwargs) -> List:
535539
return _get_collection(ctx, PATH_OAUTH_CLIENT, key="clients", **kwargs)
536540

537541

@@ -595,7 +599,7 @@ def data(self):
595599
result["source_dbname"] = self.source_database
596600
return result
597601

598-
def run(self, ctx: Context, *args) -> dict:
602+
def run(self, ctx: Context, *args) -> Dict:
599603
data = self.data
600604
data["actions"] = self._actions(args)
601605
# several of the request params are duplicated in the query
@@ -653,11 +657,11 @@ def run(self, ctx: Context, command: str, language: str, inputs: dict = None, **
653657
raise Exception("invalid response type")
654658

655659

656-
def _delete_model_action(name: str) -> dict:
660+
def _delete_model_action(name: str) -> Dict:
657661
return {"type": "ModifyWorkspaceAction", "delete_source": [name]}
658662

659663

660-
def _install_model_action(name: str, model: str) -> dict:
664+
def _install_model_action(name: str, model: str) -> Dict:
661665
return {"type": "InstallAction", "sources": [_model(name, model)]}
662666

663667

@@ -670,7 +674,7 @@ def _list_edb_action():
670674

671675

672676
# Return rel key correponding to the given name and list of keys.
673-
def _rel_key(name: str, keys: list) -> dict:
677+
def _rel_key(name: str, keys: list) -> Dict:
674678
return {"type": "RelKey", "name": name, "keys": keys, "values": []}
675679

676680

@@ -682,7 +686,7 @@ def _rel_typename(v):
682686

683687

684688
# Return a qeury action input corresponding to the given name, value pair.
685-
def _query_action_input(name: str, value) -> dict:
689+
def _query_action_input(name: str, value) -> Dict:
686690
return {
687691
"columns": [[value]],
688692
"rel_key": _rel_key(name, [_rel_typename(value)]),
@@ -691,7 +695,7 @@ def _query_action_input(name: str, value) -> dict:
691695

692696

693697
# `inputs`: map of parameter name to input value
694-
def _query_action(model: str, inputs: dict = None, outputs: list = None) -> dict:
698+
def _query_action(model: str, inputs: dict = None, outputs: list = None) -> Dict:
695699
inputs = inputs or {}
696700
inputs = [_query_action_input(k, v) for k, v in inputs.items()]
697701
return {
@@ -703,7 +707,7 @@ def _query_action(model: str, inputs: dict = None, outputs: list = None) -> dict
703707
}
704708

705709

706-
def _model(name: str, model: str) -> dict:
710+
def _model(name: str, model: str) -> Dict:
707711
return {
708712
"type": "Source",
709713
"name": name,
@@ -713,7 +717,7 @@ def _model(name: str, model: str) -> dict:
713717

714718

715719
# Returns full list of models.
716-
def _list_models(ctx: Context, database: str, engine: str) -> dict:
720+
def _list_models(ctx: Context, database: str, engine: str) -> Dict:
717721
tx = Transaction(database, engine, mode=Mode.OPEN)
718722
rsp = tx.run(ctx, _list_action())
719723
actions = rsp["actions"]
@@ -723,14 +727,14 @@ def _list_models(ctx: Context, database: str, engine: str) -> dict:
723727
return models
724728

725729

726-
def create_database(ctx: Context, database: str, source: str = None, **kwargs) -> dict:
730+
def create_database(ctx: Context, database: str, source: str = None, **kwargs) -> Dict:
727731
data = {"name": database, "source_name": source}
728732
url = _mkurl(ctx, PATH_DATABASE)
729733
rsp = rest.put(ctx, url, data, **kwargs)
730734
return json.loads(rsp.read())
731735

732736

733-
def delete_model(ctx: Context, database: str, engine: str, model: str) -> dict:
737+
def delete_model(ctx: Context, database: str, engine: str, model: str) -> Dict:
734738
tx = Transaction(database, engine, mode=Mode.OPEN, readonly=False)
735739
actions = [_delete_model_action(model)]
736740
return tx.run(ctx, *actions)
@@ -745,13 +749,13 @@ def get_model(ctx: Context, database: str, engine: str, name: str) -> str:
745749
raise Exception(f"model '{name}' not found")
746750

747751

748-
def install_model(ctx: Context, database: str, engine: str, models: dict) -> dict:
752+
def install_model(ctx: Context, database: str, engine: str, models: dict) -> Dict:
749753
tx = Transaction(database, engine, mode=Mode.OPEN, readonly=False)
750754
actions = [_install_model_action(name, model) for name, model in models.items()]
751755
return tx.run(ctx, *actions)
752756

753757

754-
def list_edbs(ctx: Context, database: str, engine: str) -> list:
758+
def list_edbs(ctx: Context, database: str, engine: str) -> List:
755759
tx = Transaction(database, engine, mode=Mode.OPEN)
756760
rsp = tx.run(ctx, _list_edb_action())
757761
actions = rsp["actions"]
@@ -762,7 +766,7 @@ def list_edbs(ctx: Context, database: str, engine: str) -> list:
762766

763767

764768
# Returns a list of models installed in the given database.
765-
def list_models(ctx: Context, database: str, engine: str) -> list:
769+
def list_models(ctx: Context, database: str, engine: str) -> List:
766770
models = _list_models(ctx, database, engine)
767771
return [model["name"] for model in models]
768772

@@ -832,7 +836,7 @@ def load_csv(
832836
relation: str,
833837
data: str or io.TextIOBase,
834838
syntax: dict = {},
835-
) -> dict:
839+
) -> Dict:
836840
if isinstance(data, str):
837841
pass # ok
838842
elif isinstance(data, io.TextIOBase):
@@ -851,7 +855,7 @@ def load_json(
851855
engine: str,
852856
relation: str,
853857
data: str or io.TextIOBase,
854-
) -> dict:
858+
) -> Dict:
855859
if isinstance(data, str):
856860
pass # ok
857861
elif isinstance(data, io.TextIOBase):
@@ -870,14 +874,12 @@ def exec_v1(
870874
command: str,
871875
inputs: dict = None,
872876
readonly: bool = True,
873-
) -> dict:
877+
) -> Dict:
874878
tx = Transaction(database, engine, readonly=readonly)
875879
return tx.run(ctx, _query_action(command, inputs=inputs))
876880

877881

878882
# Answers if the given transaction state is a terminal state.
879-
880-
881883
def is_txn_term_state(state: str) -> bool:
882884
return state == "COMPLETED" or state == "ABORTED"
883885

0 commit comments

Comments
 (0)