Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 46 additions & 29 deletions bq/utilities.py → bq/bq_utilities.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,52 @@
import pytz
import argparse
import json5


from datetime import datetime, timedelta, timezone
import requests
import yaml


def get_data_from_comet(path, branch="current"):
    """Fetch a YAML metadata file from the idc-comet GitHub repo and parse it.

    Args:
        path: Path of the file within the idc-comet repository.
        branch: Branch (or git ref) to fetch from; defaults to "current".

    Returns:
        The "programs" entry of the parsed YAML document.

    Exits the process with status 1 if the file cannot be retrieved.
    """
    file_url = f"https://raw.githubusercontent.com/ImagingDataCommons/idc-comet/{branch}/{path}"
    headers = {
        "Authorization": f"token {settings.GITHUB_TOKEN}",
        "Accept": "application/vnd.github.v3+json"
    }
    response = requests.get(file_url, headers=headers)
    if response.status_code == 200:
        # safe_load: the content arrives over the network, so avoid the full
        # Loader, which can construct arbitrary Python objects.  yaml accepts
        # a str directly, so no StringIO wrapper is needed.
        metadata = yaml.safe_load(response.text)
        return metadata["programs"]
    else:
        print(f"Failed to retrieve file. Status code: {response.status_code}")
        exit(1)

# Create a table from a data frame. The table will be deleted after the time limit expires
def create_temp_table_from_df(client, table_id, schema, df, expire_in_minutes=10):
    """Create (or refresh) a BigQuery table from *df* that auto-expires.

    The table's expiration is set to *expire_in_minutes* from now; BigQuery
    deletes the table automatically once that time passes.

    Args:
        client: bigquery.Client used for all API calls.
        table_id: Fully qualified "project.dataset.table" identifier.
        schema: List of bigquery.SchemaField describing the table columns.
        df: pandas DataFrame whose rows are loaded into the table.
        expire_in_minutes: Minutes until the table expires (default 10).

    Exits the process with status 1 if the table cannot be created.
    """
    temp_table = bigquery.Table(table_id)
    temp_table.expires = datetime.now(timezone.utc) + timedelta(minutes=expire_in_minutes)

    try:
        # exists_ok: re-running simply refreshes the expiration/metadata.
        client.create_table(temp_table, exists_ok=True)
    except Exception as e:
        print(f"Error setting table metadata: {e}")
        exit(1)

    load_config = bigquery.LoadJobConfig(
        schema=schema,
        write_disposition="WRITE_TRUNCATE"
    )
    load_job = client.load_table_from_dataframe(df, table_id, job_config=load_config)
    load_job.result()  # Block until the load completes


# Read the file at the file path into a dataframe. The file is assumed to be JSON formatted
def read_json_to_dataframe(file_path):
with open(file_path) as f:
definitions = json5.load(f)
Expand Down Expand Up @@ -74,33 +118,6 @@ def json_file_to_bq(args, file_path, lifetime=None):

return

# # Initialize the BigQuery client
# client = bigquery.Client()
#
# # Define the BigQuery table reference
# table_ref = f'{args.project}.{args.bq_dataset_id}.{args.table_id}'
#
# # Create the BigQuery table if it doesn't exist
# try:
# client.get_table(table_ref)
# except:
# table = bigquery.Table(table_ref)
# client.create_table(table)
#
# # Write the DataFrame data to BigQuery
# job_config = bigquery.LoadJobConfig(write_disposition='WRITE_TRUNCATE')
# job = client.load_table_from_dataframe(df, table_ref, job_config=job_config)
# job.result()
#
# if lifetime:
# table = client.get_table(table_ref) # Get the table object
# expiration_time = datetime.now(pytz.utc) + timedelta(minutes=lifetime)
# table.expires = expiration_time
# client.update_table(table, ["expires"])
#
# print('Data imported successfully!')



if __name__ == '__main__':
parser = argparse.ArgumentParser()
Expand Down
31 changes: 27 additions & 4 deletions bq/generate_tables_and_views/all_joined.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@
import settings
from google.cloud import bigquery
from utilities.bq_helpers import create_BQ_dataset
from utilities.tcia_helpers import get_tcia_collection_manager_data
import pandas as pd
from bq.bq_utilities import create_temp_table_from_df

# Flatten the version/collection/... hierarchy
# Note that we no longer include license here as the license can change over time.
Expand Down Expand Up @@ -119,7 +122,16 @@ def create_all_flattened(client):


def create_all_sources(client):
table_id = f"{settings.DEV_PROJECT}.{settings.BQ_DEV_INT_DATASET}.all_sources"
# Create a temporary table of names of all TCIA analysis results
tcia_analysis_result_dois= [row['result_doi'].lower() for row in get_tcia_collection_manager_data('analysis-results')]
df = pd.DataFrame(tcia_analysis_result_dois, columns=['source_doi'])
table_id = f"{settings.DEV_PROJECT}.{settings.BQ_DEV_INT_DATASET}.tcia_analysis_result_dois"
schema = [
bigquery.SchemaField("source_doi", "STRING")
]
create_temp_table_from_df(client, table_id, schema, df, 60)


query = f"""
with basics as (
SELECT distinct
Expand All @@ -136,17 +148,28 @@ def create_all_sources(client):
ON af.source_doi = dtc.source_doi
LEFT JOIN `{settings.DEV_PROJECT}.{settings.BQ_DEV_INT_DATASET}.metadata_sunset` ms
ON af.source_doi = ms.source_doi
),
analysis_result_dois as (
SELECT DISTINCT source_doi
FROM `{settings.DEV_PROJECT}.{settings.BQ_DEV_INT_DATASET}.tcia_analysis_result_dois`
UNION ALL
SELECT DISTINCT source_doi
FROM `{settings.DEV_PROJECT}.{settings.BQ_DEV_INT_DATASET}.analysis_results_metadata_idc_source`
)
SELECT
*,
basics.*,
if(analysis_result_dois.source_doi IS NULL, False, True) analysis_result,
if(Type='Open', 'idc-arch-open', if(Type='Cr', 'idc-arch-cr', if(Type='Defaced', 'idc-arch-defaced', if(Type='Redacted','idc-arch-redacted','idc-arch-excluded')))) dev_bucket,
if(Type='Open', 'idc-open-data', if(Type='Cr', 'idc-open-cr', if(Type='Defaced', 'idc-open-idc1', NULL))) pub_gcs_bucket,
if(Type='Open', 'idc-open-data', if(Type='Cr', 'idc-open-data-cr', if(Type='Defaced', 'idc-open-data-two', NULL))) pub_aws_bucket,
FROM basics
-- ORDER by collection_id, source_doi, dev_bucket, pub_gcs_bucket, pub_aws_bucket
ORDER by collection_id, source_doi, pub_gcs_bucket, pub_aws_bucket
LEFT JOIN analysis_result_dois
ON basics.source_doi = analysis_result_dois.source_doi
ORDER by collection_id, basics.source_doi, pub_gcs_bucket, pub_aws_bucket
"""
# Make an API request to create the view.
table_id = f"{settings.DEV_PROJECT}.{settings.BQ_DEV_INT_DATASET}.all_sources"
client.delete_table(table_id, not_found_ok=True)
job_config = bigquery.QueryJobConfig(destination=table_id)
query_job = client.query(query,job_config=job_config)
Expand All @@ -159,7 +182,7 @@ def create_all_joined(client):
view = bigquery.Table(view_id)
view.view_query = f"""
-- SELECT af.*, ac.source, ac.Class, ac.Access, ac.metadata_sunset, ac.dev_bucket, ac.pub_gcs_bucket, ac.pub_aws_bucket
SELECT af.*, ac.source, ac.Type, ac.Access, ac.metadata_sunset, ac.dev_bucket, ac.pub_gcs_bucket, ac.pub_aws_bucket
SELECT af.*, ac.source, ac.Type, ac.Access, ac.metadata_sunset, ac.analysis_result, ac.dev_bucket, ac.pub_gcs_bucket, ac.pub_aws_bucket
FROM `{settings.DEV_PROJECT}.{settings.BQ_DEV_INT_DATASET}.all_flattened` af
JOIN `{settings.DEV_PROJECT}.{settings.BQ_DEV_INT_DATASET}.all_sources` ac
ON af.source_doi = ac.source_doi
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import pandas as pd
from google.cloud import bigquery
import markdownify
from bq.utilities import read_json_to_dataframe, dataframe_to_bq
from bq.bq_utilities import read_json_to_dataframe, dataframe_to_bq


# Get the descriptions of collections that are only sourced from IDC
Expand Down
Loading