-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtest_data_exploration.py
More file actions
executable file
·161 lines (130 loc) · 5.5 KB
/
test_data_exploration.py
File metadata and controls
executable file
·161 lines (130 loc) · 5.5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
#!/usr/bin/env python3
"""
Data exploration script for PICORI to OMOP ETL
This script explores the Parquet data files to understand the structure and content
"""
import os
import sys
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, countDistinct, min as spark_min, max as spark_max
def create_spark_session():
    """Build and return a local Spark session for data exploration.

    Runs on all local cores and pins the SQL session time zone to UTC so
    timestamp columns render consistently across runs.
    """
    builder = (
        SparkSession.builder
        .appName("PICORI2OMOP-DataExploration")
        .master("local[*]")
        .config("spark.sql.session.timeZone", "UTC")
    )
    return builder.getOrCreate()
def explore_parquet_file(spark, file_path, table_name):
    """Print a structural and content summary of one Parquet file.

    Reports row/column counts, schema, a 5-row sample, and per-column
    null/distinct statistics (plus sample values for string columns).

    Parameters:
        spark: active SparkSession used to read the file.
        file_path: path to the Parquet file on local disk.
        table_name: human-readable label used in the printed report.

    Returns:
        The loaded DataFrame, or None if the file is missing or unreadable.
    """
    print(f"\n{'='*60}")
    print(f"EXPLORING: {table_name}")
    print(f"File: {file_path}")
    print(f"{'='*60}")
    if not os.path.exists(file_path):
        print(f"❌ File not found: {file_path}")
        return None
    try:
        # Read the Parquet file
        df = spark.read.parquet(file_path)
        # Basic info
        row_count = df.count()
        col_count = len(df.columns)
        print(f"📊 Rows: {row_count:,}")
        print(f"📊 Columns: {col_count}")
        print(f"📊 Columns: {', '.join(df.columns)}")
        # Show schema
        print(f"\n📋 Schema:")
        df.printSchema()
        # Show sample data
        print(f"\n📄 Sample data (first 5 rows):")
        df.show(5, truncate=False)
        # Show data types and null counts
        print(f"\n📈 Column Analysis:")
        # Build the name -> dtype map once instead of re-deriving it via a
        # per-column select inside the loop.
        dtype_by_name = dict(df.dtypes)
        for col_name in df.columns:
            null_count = df.filter(col(col_name).isNull()).count()
            distinct_count = df.select(col_name).distinct().count()
            # Bug fix: avoid ZeroDivisionError when the table is empty.
            null_pct = (null_count / row_count * 100) if row_count else 0.0
            print(f" {col_name}:")
            print(f" - Nulls: {null_count:,} ({null_pct:.1f}%)")
            print(f" - Distinct: {distinct_count:,}")
            # Show sample values for string columns
            if dtype_by_name[col_name] == 'string':
                sample_values = (
                    df.filter(col(col_name).isNotNull())
                    .select(col_name)
                    .distinct()
                    .limit(10)
                    .collect()
                )
                if sample_values:
                    values = [str(row[0]) for row in sample_values]
                    print(f" - Sample values: {', '.join(values[:5])}")
                    if len(values) > 5:
                        print(f" ... and {len(values)-5} more")
        return df
    except Exception as e:
        # Best-effort exploration: report the failure and move on so one
        # bad file does not abort the whole run.
        print(f"❌ Error reading {file_path}: {e}")
        return None
def main():
    """Explore every expected PCORnet Parquet table and print a summary.

    Returns 0 on success, 1 when the data directory is missing or an
    unexpected error occurs. The Spark session is always stopped on exit.
    """
    print("🔍 PICORI to OMOP Data Exploration")
    print("="*60)
    # Data directory
    data_dir = "/home/asadr/datasets/stroke_data"
    if not os.path.exists(data_dir):
        print(f"❌ Data directory not found: {data_dir}")
        print("Please check the path and ensure the Parquet files are available")
        return 1
    # Create Spark session
    spark = create_spark_session()
    try:
        # (filename, report label) pairs for every expected table.
        table_specs = [
            ("demographic.parquet", "DEMOGRAPHIC"),
            ("encounter.parquet", "ENCOUNTER"),
            ("diagnosis.parquet", "DIAGNOSIS"),
            ("condition.parquet", "CONDITION"),
            ("procedures.parquet", "PROCEDURES"),
            ("prescribing.parquet", "PRESCRIBING"),
            ("dispensing.parquet", "DISPENSING"),
            ("lab_result_cm.parquet", "LAB_RESULT_CM"),
            ("vital.parquet", "VITAL"),
            ("obs_clin.parquet", "OBS_CLIN"),
            ("obs_gen.parquet", "OBS_GEN"),
            ("immunization.parquet", "IMMUNIZATION"),
            ("death.parquet", "DEATH"),
            ("death_cause.parquet", "DEATH_CAUSE"),
            ("med_admin.parquet", "MED_ADMIN"),
        ]
        explored_files = []
        for filename, table_name in table_specs:
            frame = explore_parquet_file(
                spark, os.path.join(data_dir, filename), table_name
            )
            if frame is not None:
                explored_files.append((table_name, frame))
        # Summary of everything that loaded successfully.
        print(f"\n{'='*60}")
        print("📊 EXPLORATION SUMMARY")
        print(f"{'='*60}")
        print(f"Total files explored: {len(explored_files)}")
        for table_name, frame in explored_files:
            print(f" {table_name}: {frame.count():,} rows")
        # Cross-table analysis: distinct counts of shared ID columns.
        if len(explored_files) >= 2:
            print(f"\n🔗 Cross-table Analysis:")
            for id_col in ('PATID', 'ENCOUNTERID', 'PROVIDERID'):
                print(f"\n {id_col} analysis:")
                for table_name, frame in explored_files:
                    if id_col in frame.columns:
                        n_distinct = frame.select(id_col).distinct().count()
                        print(f" {table_name}: {n_distinct:,} distinct {id_col}")
        print(f"\n✅ Data exploration completed successfully!")
        print(f"\nNext steps:")
        print(f"1. Review the data structure and content above")
        print(f"2. Set up PostgreSQL database")
        print(f"3. Run the ETL process")
        return 0
    except Exception as e:
        print(f"❌ Error during exploration: {e}")
        return 1
    finally:
        spark.stop()
# Script entry point: propagate main()'s status code (0 = success) to the shell.
if __name__ == "__main__":
    sys.exit(main())