Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
130 changes: 74 additions & 56 deletions services/data/postgres_async_db.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ async def _init(self, db_conf: DBConfiguration, create_triggers=DB_TRIGGER_CREAT

break # Break the retry loop
except Exception as e:
self.logger.exception("Exception occured")
self.logger.exception("Exception occurred")
if retries - i <= 1:
raise e
time.sleep(connection_retry_wait_time_seconds)
Expand Down Expand Up @@ -296,13 +296,12 @@ async def create_record(self, record_dict):
return aiopg_exception_handling(error)

async def update_row(self, filter_dict={}, update_dict={}):
query_params = {}
# generate where clause
filters = []
for col_name, col_val in filter_dict.items():
v = str(col_val).strip("'")
if not v.isnumeric():
v = "'" + v + "'"
filters.append(col_name + "=" + str(v))
query_params['_filter_%s' % col_name] = col_val
filters.append('%s = %%(_filter_%s)s' % (col_name, col_name))

seperator = " and "
where_clause = ""
Expand All @@ -311,11 +310,12 @@ async def update_row(self, filter_dict={}, update_dict={}):

sets = []
for col_name, col_val in update_dict.items():
sets.append(col_name + " = " + str(col_val))
query_params['_set_%s' % col_name] = col_val
sets.append('%s = %%(_filter_%s)s' % (col_name, col_name))

set_seperator = ", "
set_clause = ""
if bool(filter_dict):
if bool(sets):
set_clause = set_seperator.join(sets)
update_sql = """
UPDATE {0} SET {1} WHERE {2};
Expand All @@ -326,7 +326,7 @@ async def update_row(self, filter_dict={}, update_dict={}):
cursor_factory=psycopg2.extras.DictCursor
)
) as cur:
await cur.execute(update_sql)
await cur.execute(update_sql, query_params)
if cur.rowcount < 1:
return DBResponse(response_code=404,
body={"msg": "could not find row"})
Expand All @@ -338,7 +338,7 @@ async def update_row(self, filter_dict={}, update_dict={}):
cur.close()
return DBResponse(response_code=200, body=body)
except (Exception, psycopg2.DatabaseError) as error:
self.db.logger.exception("Exception occured")
self.db.logger.exception("Exception occurred")
return aiopg_exception_handling(error)


Expand Down Expand Up @@ -466,6 +466,10 @@ class AsyncFlowTablePostgres(AsyncPostgresTable):
)
_row_type = FlowRow

@staticmethod
def get_filter_dict(flow_id: str):
return {"flow_id": flow_id}

async def add_flow(self, flow: FlowRow):
dict = {
"flow_id": flow.flow_id,
Expand All @@ -476,7 +480,7 @@ async def add_flow(self, flow: FlowRow):
return await self.create_record(dict)

async def get_flow(self, flow_id: str):
filter_dict = {"flow_id": flow_id}
filter_dict = self.get_filter_dict(flow_id)
return await self.get_records(filter_dict=filter_dict, fetch_single=True)

async def get_all_flows(self):
Expand Down Expand Up @@ -523,9 +527,13 @@ async def add_run(self, run: RunRow):
}
return await self.create_record(dict)

async def get_run(self, flow_id: str, run_id: str, expanded: bool = False):
@staticmethod
def get_filter_dict(flow_id: str, run_id: str):
key, value = translate_run_key(run_id)
filter_dict = {"flow_id": flow_id, key: str(value)}
return {"flow_id": flow_id, key: str(value)}

async def get_run(self, flow_id: str, run_id: str, expanded: bool = False):
filter_dict = self.get_filter_dict(flow_id, run_id)
return await self.get_records(filter_dict=filter_dict,
fetch_single=True, expanded=expanded)

Expand All @@ -534,9 +542,7 @@ async def get_all_runs(self, flow_id: str):
return await self.get_records(filter_dict=filter_dict)

async def update_heartbeat(self, flow_id: str, run_id: str):
run_key, run_value = translate_run_key(run_id)
filter_dict = {"flow_id": flow_id,
run_key: str(run_value)}
filter_dict = self.get_filter_dict(flow_id, run_id)
set_dict = {
"last_heartbeat_ts": int(datetime.datetime.utcnow().timestamp())
}
Expand Down Expand Up @@ -589,19 +595,23 @@ async def add_step(self, step_object: StepRow):
}
return await self.create_record(dict)

@staticmethod
def get_filter_dict(flow_id: str, run_id: str, step_name: str):
run_id_key, run_id_value = translate_run_key(run_id)
return {
"flow_id": flow_id,
run_id_key: run_id_value,
"step_name": step_name,
}

async def get_steps(self, flow_id: str, run_id: str):
run_id_key, run_id_value = translate_run_key(run_id)
filter_dict = {"flow_id": flow_id,
run_id_key: run_id_value}
return await self.get_records(filter_dict=filter_dict)

async def get_step(self, flow_id: str, run_id: str, step_name: str):
run_id_key, run_id_value = translate_run_key(run_id)
filter_dict = {
"flow_id": flow_id,
run_id_key: run_id_value,
"step_name": step_name,
}
filter_dict = self.get_filter_dict(flow_id, run_id, step_name)
return await self.get_records(filter_dict=filter_dict, fetch_single=True)


Expand Down Expand Up @@ -651,36 +661,35 @@ async def add_task(self, task: TaskRow):
}
return await self.create_record(dict)

async def get_tasks(self, flow_id: str, run_id: str, step_name: str):
@staticmethod
def get_filter_dict(flow_id: str, run_id: str, step_name: str, task_id: str):
run_id_key, run_id_value = translate_run_key(run_id)
filter_dict = {
task_id_key, task_id_value = translate_task_key(task_id)
return {
"flow_id": flow_id,
run_id_key: run_id_value,
"step_name": step_name,
task_id_key: task_id_value,
}
return await self.get_records(filter_dict=filter_dict)

async def get_task(self, flow_id: str, run_id: str, step_name: str,
task_id: str, expanded: bool = False):
async def get_tasks(self, flow_id: str, run_id: str, step_name: str):
run_id_key, run_id_value = translate_run_key(run_id)
task_id_key, task_id_value = translate_task_key(task_id)
filter_dict = {
"flow_id": flow_id,
run_id_key: run_id_value,
"step_name": step_name,
task_id_key: task_id_value,
}
return await self.get_records(filter_dict=filter_dict)

async def get_task(self, flow_id: str, run_id: str, step_name: str,
task_id: str, expanded: bool = False):
filter_dict = self.get_filter_dict(flow_id, run_id, step_name, task_id)
return await self.get_records(filter_dict=filter_dict,
fetch_single=True, expanded=expanded)

async def update_heartbeat(self, flow_id: str, run_id: str, step_name: str,
task_id: str):
run_key, run_value = translate_run_key(run_id)
task_key, task_value = translate_task_key(task_id)
filter_dict = {"flow_id": flow_id,
run_key: str(run_value),
"step_name": step_name,
task_key: str(task_value)}
filter_dict = self.get_filter_dict(flow_id, run_id, step_name, task_id)
set_dict = {
"last_heartbeat_ts": int(datetime.datetime.utcnow().timestamp())
}
Expand Down Expand Up @@ -757,23 +766,27 @@ async def add_metadata(
}
return await self.create_record(dict)

@staticmethod
def get_filter_dict(flow_id: str, run_id: str, step_name: str, task_id: str):
run_id_key, run_id_value = translate_run_key(run_id)
task_id_key, task_id_value = translate_task_key(task_id)
return {
"flow_id": flow_id,
run_id_key: run_id_value,
"step_name": step_name,
task_id_key: task_id_value,
}

async def get_metadata_in_runs(self, flow_id: str, run_id: str):
run_id_key, run_id_value = translate_run_key(run_id)
filter_dict = {"flow_id": flow_id,
run_id_key: run_id_value}
return await self.get_records(filter_dict=filter_dict)

async def get_metadata(
self, flow_id: str, run_id: int, step_name: str, task_id: str
self, flow_id: str, run_id: str, step_name: str, task_id: str
):
run_id_key, run_id_value = translate_run_key(run_id)
task_id_key, task_id_value = translate_task_key(task_id)
filter_dict = {
"flow_id": flow_id,
run_id_key: run_id_value,
"step_name": step_name,
task_id_key: task_id_value,
}
filter_dict = self.get_filter_dict(flow_id, run_id, step_name, task_id)
return await self.get_records(filter_dict=filter_dict)


Expand Down Expand Up @@ -856,7 +869,20 @@ async def add_artifact(
}
return await self.create_record(dict)

async def get_artifacts_in_runs(self, flow_id: str, run_id: int):
@staticmethod
def get_filter_dict(
flow_id: str, run_id: str, step_name: str, task_id: str, name: str):
run_id_key, run_id_value = translate_run_key(run_id)
task_id_key, task_id_value = translate_task_key(task_id)
return {
"flow_id": flow_id,
run_id_key: run_id_value,
"step_name": step_name,
task_id_key: task_id_value,
'"name"': name,
}

async def get_artifacts_in_runs(self, flow_id: str, run_id: str):
run_id_key, run_id_value = translate_run_key(run_id)
filter_dict = {
"flow_id": flow_id,
Expand All @@ -865,7 +891,7 @@ async def get_artifacts_in_runs(self, flow_id: str, run_id: int):
return await self.get_records(filter_dict=filter_dict,
ordering=self.ordering)

async def get_artifact_in_steps(self, flow_id: str, run_id: int, step_name: str):
async def get_artifact_in_steps(self, flow_id: str, run_id: str, step_name: str):
run_id_key, run_id_value = translate_run_key(run_id)
filter_dict = {
"flow_id": flow_id,
Expand All @@ -876,7 +902,7 @@ async def get_artifact_in_steps(self, flow_id: str, run_id: int, step_name: str)
ordering=self.ordering)

async def get_artifact_in_task(
self, flow_id: str, run_id: int, step_name: str, task_id: int
self, flow_id: str, run_id: str, step_name: str, task_id: str
):
run_id_key, run_id_value = translate_run_key(run_id)
task_id_key, task_id_value = translate_task_key(task_id)
Expand All @@ -890,16 +916,8 @@ async def get_artifact_in_task(
ordering=self.ordering)

async def get_artifact(
self, flow_id: str, run_id: int, step_name: str, task_id: int, name: str
self, flow_id: str, run_id: str, step_name: str, task_id: str, name: str
):
run_id_key, run_id_value = translate_run_key(run_id)
task_id_key, task_id_value = translate_task_key(task_id)
filter_dict = {
"flow_id": flow_id,
run_id_key: run_id_value,
"step_name": step_name,
task_id_key: task_id_value,
'"name"': name,
}
filter_dict = self.get_filter_dict(flow_id, run_id, step_name, task_id, name)
return await self.get_records(filter_dict=filter_dict,
fetch_single=True, ordering=self.ordering)
Loading