Skip to content

Commit 87467fc

Browse files
salamehsameerageokolliasSameera SalamehSameera Salamehvilterp
authored
Call query_async and poll for state in run_query.py (#78)
* Return higher-level structures for txn v2 results * run_query to call query_async and poll for state * add get_transaction_results_and_problems * nits * nits * nits * changes after code review * bring names in line with other SDKs * no need for show.results(multipart) * show.results for query is physical * file renamed * adding execute.py * query rename to exec_v1 Co-authored-by: George Kollias <286415+geokollias@users.noreply.github.com> Co-authored-by: Sameera Salameh <sameera@sameeras-mbp.relational.ai.beta.tailscale.net> Co-authored-by: Sameera Salameh <sameera@sameeras-mbp.mynet> Co-authored-by: Pete Vilter <pete.vilter@gmail.com>
1 parent 6339abe commit 87467fc

File tree

4 files changed

+71
-7
lines changed

4 files changed

+71
-7
lines changed

examples/execute.py

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
# Copyright 2022 RelationalAI, Inc.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License
14+
15+
from argparse import ArgumentParser
16+
from urllib.request import HTTPError
17+
from railib import api, config, show
18+
19+
20+
def run(database: str, engine: str, command: str, readonly: bool, profile: str):
21+
cfg = config.read(profile=profile)
22+
ctx = api.Context(**cfg)
23+
rsp = api.exec(ctx, database, engine, command, readonly=readonly)
24+
show.results(rsp, "wire")
25+
26+
27+
if __name__ == "__main__":
28+
p = ArgumentParser()
29+
p.add_argument("database", type=str, help="database name")
30+
p.add_argument("engine", type=str, help="engine name")
31+
p.add_argument("command", type=str, help="rel source string")
32+
p.add_argument("--readonly", action="store_true", default=False,
33+
help="readonly query (default: false)")
34+
p.add_argument("-p", "--profile", type=str, default="default",
35+
help="profile name")
36+
args = p.parse_args()
37+
try:
38+
run(args.database, args.engine, args.command, args.readonly,
39+
args.profile)
40+
except HTTPError as e:
41+
show.http_error(e)
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
def run(database: str, engine: str, command: str, readonly: bool, profile: str):
2222
cfg = config.read(profile=profile)
2323
ctx = api.Context(**cfg)
24-
rsp = api.query_async(ctx, database, engine, command, readonly=readonly)
24+
rsp = api.exec_async(ctx, database, engine, command, readonly=readonly)
2525
print(json.dumps(rsp, indent=2))
2626

2727

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
def run(database: str, engine: str, command: str, readonly: bool, profile: str):
2121
cfg = config.read(profile=profile)
2222
ctx = api.Context(**cfg)
23-
rsp = api.query(ctx, database, engine, command, readonly=readonly)
23+
rsp = api.exec_v1(ctx, database, engine, command, readonly=readonly)
2424
show.results(rsp)
2525

2626

railib/api.py

Lines changed: 28 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import io
1818
import json
1919
import pyarrow as pa
20+
import time
2021
from enum import Enum, unique
2122
from typing import List, Union
2223
from . import rest
@@ -655,7 +656,7 @@ def load_csv(ctx: Context, database: str, engine: str, relation: str,
655656
command = _gen_syntax_config(syntax)
656657
command += ("def config:data = data\n"
657658
"def insert:%s = load_csv[config]" % relation)
658-
return query(ctx, database, engine, command, inputs=inputs, readonly=False)
659+
return exec_v1(ctx, database, engine, command, inputs=inputs, readonly=False)
659660

660661

661662
def load_json(ctx: Context, database: str, engine: str, relation: str,
@@ -669,16 +670,38 @@ def load_json(ctx: Context, database: str, engine: str, relation: str,
669670
inputs = {'data': data}
670671
command = ("def config:data = data\n"
671672
"def insert:%s = load_json[config]" % relation)
672-
return query(ctx, database, engine, command, inputs=inputs, readonly=False)
673+
return exec_v1(ctx, database, engine, command, inputs=inputs, readonly=False)
673674

674-
675-
def query(ctx: Context, database: str, engine: str, command: str,
675+
def exec_v1(ctx: Context, database: str, engine: str, command: str,
676676
inputs: dict = None, readonly: bool = True) -> dict:
677677
tx = Transaction(database, engine, readonly=readonly)
678678
return tx.run(ctx, _query_action(command, inputs=inputs))
679679

680+
# Answers if the given transaction state is a terminal state.
681+
def is_txn_term_state(state: str) -> bool:
682+
return state == "COMPLETED" or state == "ABORTED"
683+
684+
def exec(ctx: Context, database: str, engine: str, command: str,
685+
inputs: dict = None, readonly: bool = True) -> list:
686+
async_result = exec_async(ctx, database, engine, command, readonly=readonly)
687+
if isinstance(async_result, list): # in case of if short-path, return results directly, no need to poll for state
688+
return async_result
689+
690+
rsp = []
691+
while True:
692+
time.sleep(3)
693+
txn = get_transaction(ctx, async_result["id"])
694+
if is_txn_term_state(txn["state"]):
695+
rsp.append(txn)
696+
rsp.append(get_transaction_metadata(ctx, txn["id"]))
697+
rsp.append(get_transaction_problems(ctx, txn["id"]))
698+
rsp.append(get_transaction_results(ctx, txn["id"]))
699+
break
700+
701+
return rsp
702+
680703

681-
def query_async(ctx: Context, database: str, engine: str, command: str,
704+
def exec_async(ctx: Context, database: str, engine: str, command: str,
682705
readonly: bool = True, inputs: dict = None) -> Union[dict, list]:
683706
tx = TransactionAsync(database, engine, readonly=readonly)
684707
return tx.run(ctx, command, inputs=inputs)

0 commit comments

Comments
 (0)