-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathdata_manager.py
More file actions
88 lines (48 loc) · 2.08 KB
/
data_manager.py
File metadata and controls
88 lines (48 loc) · 2.08 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
"""
Database management module for LG NPS RAG System
Handles CSV import, SQLite operations, and data validation
"""
import sqlite3
import pandas as pd
import json
from pathlib import Path
from typing import Dict, Any, Optional, List
from sqlalchemy import create_engine, text
from loguru import logger
from config import Config
class DatabaseManager:
    """Database access helper for NPS scorecard data.

    Stores the configured database and CSV locations, owns a SQLAlchemy
    engine for running queries, and can list the tables present in the
    SQLite file.
    """

    def __init__(self):
        """Read the data locations from ``Config`` and create the engine."""
        self.db_path = Config.DATABASE_PATH
        self.csv_path = Config.CSV_FILE_PATH
        self.engine = create_engine(Config.get_database_url())

    def execute_query(self, query: str, params: Optional[Dict] = None) -> pd.DataFrame:
        """Execute a SQL query and return the result as a DataFrame.

        Args:
            query: SQL statement; it is wrapped in ``sqlalchemy.text`` before
                being handed to ``pandas.read_sql``.
            params: Optional bind parameters forwarded to ``pandas.read_sql``.
                An empty dict is used when omitted.

        Returns:
            The DataFrame produced by ``pandas.read_sql``.

        Raises:
            Exception: Any error raised while connecting or executing is
                logged and then re-raised unchanged.
        """
        try:
            # The engine connection is a real context manager: it is released
            # when the ``with`` block exits, on success or failure.
            with self.engine.connect() as conn:
                return pd.read_sql(text(query), conn, params=params or {})
        except Exception as e:
            # Re-raise so upstream agents/UI surface the real error instead of
            # something that looks like a successful-but-empty result.
            logger.error(f"Query execution failed: {e}")
            raise

    def get_all_table_names(self) -> List[str]:
        """Return the names of all tables in the SQLite database.

        Returns:
            Table names read from ``sqlite_master``. An empty list is returned
            if the lookup fails for any reason; the failure is logged rather
            than raised (best-effort behaviour).
        """
        try:
            conn = sqlite3.connect(self.db_path)
            try:
                rows = conn.execute(
                    "SELECT name FROM sqlite_master WHERE type='table';"
                ).fetchall()
            finally:
                # ``with sqlite3.connect(...)`` only commits/rolls back the
                # transaction; it does NOT close the connection. Close it
                # explicitly so each call does not leak an open handle.
                conn.close()
            return [row[0] for row in rows]
        except Exception as e:
            logger.error(f"Error getting table names: {e}")
            return []
# Manual smoke test: run this module directly to list the tables that the
# configured database currently contains.
if __name__ == "__main__":
    manager = DatabaseManager()
    table_names = manager.get_all_table_names()
    print(f"Tables in database: {table_names}")
    # NOTE(review): an earlier initialize_database() call used to follow here;
    # no such function is defined in the visible module, so it stays disabled.