-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtask_2.py
More file actions
108 lines (87 loc) · 4.34 KB
/
task_2.py
File metadata and controls
108 lines (87 loc) · 4.34 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
import re
import json
import pandas as pd
import log
from db import move_df_to_db
class DataConverter:
    """Convert raw employee/department JSON exports into DB-ready frames.

    Reads two JSON files (employees, departments), validates that every
    employee's department references resolve against the departments file,
    drops invalid and dummy rows, replaces string/@rid department references
    with generated numeric ids, and exposes the results as ``self.employee``
    and ``self.departments`` DataFrames.
    """

    def __init__(self, path_fo_files):
        """path_fo_files: two-element sequence
        [path_to_employees_json, path_to_departments_json].
        (NOTE(review): parameter name looks like a typo for ``path_to_files``;
        kept for backward compatibility with existing callers.)
        """
        # First pass: read employees with max_level=0 so nested department
        # payloads stay as raw dicts/strings for validation below.
        self.employee = self._rename_columns(self._read_file(path_fo_files[0], level=0))
        self.departments = self._rename_columns(self._read_file(path_fo_files[1]))
        self.dummy = self._get_dummy_employee()
        # Exclude dummy placeholder rows before validating department refs.
        self.employee = self.employee.loc[
            ~self.employee['rid'].isin(self.dummy['rid'].tolist())
        ]
        self.valid_values = self.find_index_of_valid_rows()
        self._report_invalid_rows_rid()
        # Second pass: re-read fully normalized (nested keys flattened into
        # columns), keeping only the rows that passed validation.
        self.employee = self._rename_columns(self._read_file(path_fo_files[0])).loc[self.valid_values]
        self.employee = self._create_unique_id(self.employee.copy(), 'employee_id')
        self.departments = self._create_unique_id(self.departments.copy(), "department_id")
        self.employee = self.replace_strings_to_id()
        self.departments = self.departments.drop(columns=['rid'])

    @staticmethod
    def _rename_columns(df):
        """Normalize column names to lowercase snake_case.

        Keeps only alphabetic runs (with optional underscore + digits, e.g.
        ``department_1``) from each original column name and joins them
        with underscores.
        """
        main_df = df.rename(
            columns={
                col: "_".join(
                    re.findall("[a-zA-Z]+_?[0-9]*", col)
                ).lower()
                for col in df.columns
            }
        )
        return main_df

    @staticmethod
    def _read_file(path_to_file, level=None):
        """Load the ``result`` array from a JSON file into a DataFrame.

        level: passed to ``pd.json_normalize(max_level=...)``; ``0`` keeps
        nested objects as dicts, ``None`` flattens fully.
        Drops the OrientDB-style ``@class`` column when present.
        """
        # Explicit encoding: JSON interchange files are expected to be UTF-8.
        with open(path_to_file, encoding="utf-8") as data_file:
            data = json.load(data_file)['result']
        df = pd.json_normalize(data, max_level=level)
        if '@class' in df.columns:
            return df.drop(columns=['@class'])
        return df

    def _create_mappers(self):
        """Return (rid -> department_id, name -> department_id) dicts."""
        rid_mapper = dict(zip(self.departments.rid, self.departments.department_id))
        name_mapper = dict(zip(self.departments.name, self.departments.department_id))
        return rid_mapper, name_mapper

    @staticmethod
    def _create_unique_id(df, col_name):
        """Add a numeric id column derived from the ``name`` column.

        Ids start at 100 and follow the order in which unique names first
        appear; rows sharing a name share an id.
        """
        original_names = df['name'].unique()
        new_ids = {cid: indx + 100 for indx, cid in enumerate(original_names)}
        df[col_name] = df['name'].map(new_ids)
        return df

    def _get_dummy_employee(self):
        """Return the "DummyEmployee" placeholder rows with NaNs zero-filled."""
        # NOTE(review): .iloc with label indexes from .query() assumes the
        # default RangeIndex, where positions and labels coincide.
        dummy_df = self.employee.iloc[self.employee.query('name == "DummyEmployee"').index].fillna(0)
        return dummy_df

    def find_index_of_valid_rows(self):
        """Return the index of employee rows whose three department references
        all resolve to a known department.

        A reference is valid when it is either a department ``@rid`` string or
        a dict whose values match a department's (name, comment) pair.
        """
        masker = self.employee[['department_1', 'department_2', 'department_3']].applymap(
            # isinstance, not type() ==, per standard Python practice.
            lambda x: tuple(x.values()) if isinstance(x, dict) else x)
        res = self.employee[masker.isin(list(self.departments["rid"].values)) | masker.isin(
            list(map(tuple, self.departments[["name", "comment"]].values.tolist())))]
        # dropna(): a row is valid only if ALL three references matched.
        return res[['department_1', 'department_2', 'department_3']].dropna().index

    def _report_invalid_rows_rid(self):
        """Log the @rid values of employee rows that failed validation."""
        invalid_values = self.employee.drop(self.valid_values)
        log.info(f"Invalid @rid's: {invalid_values.rid.to_list()}")

    def replace_strings_to_id(self):
        """Return a copy of ``self.employee`` with department references
        replaced by numeric department ids.

        Each reference appears either in ``department_N`` (as an @rid) or in
        ``department_N_name`` (as a name), never both, so after mapping the
        two columns are summed to merge them into one id column.
        """
        df = self.employee.copy()
        rid_mapper, name_mapper = self._create_mappers()
        df['department_1'] = df['department_1'].map(rid_mapper)
        df['department_2'] = df['department_2'].map(rid_mapper)
        df['department_3'] = df['department_3'].map(rid_mapper)
        df['department_1_name'] = df['department_1_name'].map(name_mapper)
        df['department_2_name'] = df['department_2_name'].map(name_mapper)
        df['department_3_name'] = df['department_3_name'].map(name_mapper)
        # Unmapped side of each pair becomes 0 so the sum keeps the mapped id.
        df = df.fillna(0)
        df['department_1'] = (df['department_1_name'] + df['department_1']).astype('int32')
        df['department_2'] = (df['department_2_name'] + df['department_2']).astype('int32')
        df['department_3'] = (df['department_3_name'] + df['department_3']).astype('int32')
        # Helper *_name / *_comment columns are no longer needed.
        drop_cols = df.columns.str.contains("_name|_comment")
        df = df.loc[:, ~drop_cols].drop(columns=["rid"])
        return df
if __name__ == "__main__":
    # Script entry point: convert the test fixtures and push both frames
    # into the "second_task" database.
    log.init()
    source_files = [
        "test_tasks/python_task_2_1_employees.json",
        "test_tasks/python_task_2_2_departments.json",
    ]
    converter = DataConverter(source_files)
    move_df_to_db(converter.departments, "second_task", "department")
    move_df_to_db(converter.employee, "second_task", "employee")