-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathparse_minimum_fields.py
More file actions
287 lines (246 loc) · 10.2 KB
/
parse_minimum_fields.py
File metadata and controls
287 lines (246 loc) · 10.2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
#!/usr/bin/env python3
"""
Script to parse xlsx file and extract entity dictionaries.
Reads column A until 'END_OF_FILE', creates dictionaries for each entity
with column A as keys and column D as values.
"""
import argparse
import datetime
import os
import re
import sys
from pathlib import Path

import pandas as pd

from obp_dynamic_api import create_dynamic_entity_from_parsed
def _has_green_checkmark(cell_value):
"""
Check if a cell contains a green checkmark.
Args:
cell_value (str): The cell value to check
Returns:
bool: True if cell contains a green checkmark
"""
if not cell_value:
return False
# Common representations of green checkmarks
checkmarks = ['✓', '✔', '✅', '☑', '√', 'YES', 'Y', 'TRUE', '1']
cell_upper = cell_value.upper().strip()
return any(mark in cell_upper for mark in checkmarks)
# Field names may only contain A-Z, a-z, 0-9, underscore and hyphen.
_FIELD_NAME_INVALID_CHARS = re.compile(r'[^A-Za-z0-9_\-]')
_FIELD_NAME_UNDERSCORE_RUN = re.compile(r'_+')

# 0-based column positions used by the matrix (A=0, B=1, ...).
_COL_NAME = 0         # A: "entity:<name>", a field name, or END_OF_FILE
_COL_CHECK_B = 1      # B: checkmark -> field added under its plain name
_COL_CHECK_C = 2      # C: checkmark -> field added with " (optional)" suffix
_COL_VALUE = 3        # D: field value
_COL_DESCRIPTION = 6  # G: entity / field description
_COL_EXAMPLE = 7      # H: field example


def _cell_text(row, position):
    """Return the cell at *position* as stripped text, or "" if absent or NaN."""
    if len(row) > position and pd.notna(row.iloc[position]):
        return str(row.iloc[position]).strip()
    return ""


def _example_text(row, position):
    """Return the example cell as text: dates as YYYY-MM-DD, outer quotes removed."""
    if len(row) <= position:
        return ""
    raw = row.iloc[position]
    if pd.isna(raw):
        return ""

    if isinstance(raw, (pd.Timestamp, datetime.datetime, datetime.date)):
        try:
            text = raw.strftime("%Y-%m-%d")
        except Exception:
            # Best effort: fall back to the default string form rather than
            # aborting the whole parse because of one unformattable date.
            text = str(raw).strip()
    else:
        text = str(raw).strip()

    # Strip one pair of matching surrounding quotes ("..." or '...').
    if len(text) >= 2 and text[0] == text[-1] and text[0] in ('"', "'"):
        text = text[1:-1].strip()
    return text


def _sanitize_field_name(raw_name):
    """Replace disallowed characters with "_", collapse "_" runs, trim "_" at the ends."""
    replaced = _FIELD_NAME_INVALID_CHARS.sub('_', raw_name)
    return _FIELD_NAME_UNDERSCORE_RUN.sub('_', replaced).strip('_')


def _collect_entities(df):
    """Walk the rows of *df* and build the entity dictionary described in parse_xlsx_entities."""
    entities = {}
    current_entity = None
    current_description = ""
    current_fields = {}

    for _, row in df.iterrows():
        name_cell = _cell_text(row, _COL_NAME)

        # Stop marker: the entity being built is stored by the save after the loop.
        if name_cell == "END_OF_FILE":
            break

        if name_cell.lower().startswith("entity:"):
            # Store the entity built so far; entities without fields are dropped.
            if current_entity and current_fields:
                entities[current_entity] = {
                    "description": current_description,
                    "fields": current_fields,
                }
            current_entity = name_cell[len("entity:"):].strip()
            current_fields = {}
            current_description = (
                _cell_text(row, _COL_DESCRIPTION) or f"Parsed entity {current_entity}"
            )
            continue

        # Rows before the first entity, blank rows and literal "nan" are ignored.
        if not current_entity or not name_cell or name_cell == "nan":
            continue

        field_name = _sanitize_field_name(name_cell)
        if not field_name:
            # Nothing usable is left after sanitizing (e.g. the cell was "..."),
            # so there is no valid key to store this row under.
            continue

        if _has_green_checkmark(_cell_text(row, _COL_CHECK_B)):
            field_key = field_name
        elif _has_green_checkmark(_cell_text(row, _COL_CHECK_C)):
            field_key = f"{field_name} (optional)"
        else:
            # Neither B nor C is ticked: the row is skipped.
            continue

        entry = {"value": _cell_text(row, _COL_VALUE)}
        example = _example_text(row, _COL_EXAMPLE)
        if example:
            entry["example"] = example
        description = _cell_text(row, _COL_DESCRIPTION)
        if description:
            entry["description"] = description
        current_fields[field_key] = entry

    # Store the last entity (also reached after END_OF_FILE).
    if current_entity and current_fields:
        entities[current_entity] = {
            "description": current_description,
            "fields": current_fields,
        }
    return entities


def parse_xlsx_entities(file_path):
    """
    Parse an xlsx file and extract entity dictionaries.

    Rows are read until column A equals 'END_OF_FILE'. A row whose column A
    starts with 'entity:' (case-insensitive) opens a new entity; the rows after
    it are that entity's fields. A field is kept only when column B or column C
    contains a checkmark; a field ticked only in column C is stored under
    '<name> (optional)'.

    Args:
        file_path (str): Path to the xlsx file

    Returns:
        dict: Maps each entity name to
            {"description": str, "fields": {field_key: entry}}, where entry is
            {"value": str} plus optional "example" and "description" keys.
            Entities without any kept field are omitted. An empty dict is
            returned when the file is missing or any error occurs (the error
            is printed).
    """
    try:
        # NOTE(review): read_excel treats the first sheet row as the header by
        # default, so that row is never iterated below - confirm the sheet
        # really starts with a header row.
        df = pd.read_excel(file_path, engine='openpyxl')
        return _collect_entities(df)
    except FileNotFoundError:
        print(f"Error: File '{file_path}' not found.")
        return {}
    except Exception as e:
        print(f"Error reading file: {e}")
        return {}
def main():
    """
    Command-line entry point: parse the field matrix and optionally push it to OBP.

    The xlsx file is parsed with parse_xlsx_entities and the result is printed.
    With --create or --update (mutually exclusive) each parsed entity is then
    sent to the OBP management API, after a confirmation prompt unless --yes
    is given. Without either flag the user is offered to save the parsed
    result to a text file.

    The HAS_PERSONAL_ENTITY and HAS_COMMUNITY_ACCESS environment variables are
    read as booleans and forwarded when creating entities.
    """
    parser = argparse.ArgumentParser(description="Parse minimal field matrix and optionally create dynamic entities on OBP")
    parser.add_argument("file", nargs="?", default="min_field_matrix.xlsx", help="Path to the xlsx file")
    parser.add_argument("--create", action="store_true", help="Create parsed entities on OBP (will call management API)")
    parser.add_argument("--update", action="store_true", help="Update existing parsed entities on OBP (will call management API)")
    parser.add_argument("--token", default=None, help="DirectLogin token to use (overrides obp_client.token)")
    parser.add_argument("--host", default=None, help="OBP host to use (overrides obp_client.obp_host)")
    parser.add_argument("--yes", action="store_true", help="If set with --create or --update, skip confirmation prompt")
    args = parser.parse_args()
    file_path = args.file

    # Read access flags from environment (.env or system env)
    def _env_to_bool(val):
        if val is None:
            return False
        if isinstance(val, bool):
            return val
        s = str(val).strip().lower()
        return s in ("1", "true", "yes", "y", "on")

    has_personal = _env_to_bool(os.getenv("HAS_PERSONAL_ENTITY", "false"))
    has_community = _env_to_bool(os.getenv("HAS_COMMUNITY_ACCESS", "false"))

    # Check if file exists
    if not Path(file_path).exists():
        print(f"File '{file_path}' does not exist.")
        return

    print(f"Parsing file: {file_path}")
    entities = parse_xlsx_entities(file_path)
    if not entities:
        print("No entities found or error occurred.")
        return

    # Display results
    print(f"\nFound {len(entities)} entities:")
    print("=" * 50)
    for entity_name, entity_dict in entities.items():
        print(f"\nEntity: {entity_name}")
        print("-" * 30)
        if entity_dict:
            for key, value in entity_dict.items():
                print(f"  {key}: {value}")
        else:
            print("  (No data)")

    # Optionally create or update entities on OBP management API
    if args.create and args.update:
        print("Cannot use --create and --update together. Choose one.")
        return

    if args.create or args.update:
        # Report and confirm the action that was actually requested; the
        # messages used to say "create" even when only --update was given.
        if args.create:
            action = "create"
            print("--create flag provided: will attempt to create parsed entities on OBP")
        else:
            action = "update"
            print("--update flag provided: will attempt to update existing parsed entities on OBP")

        if not args.yes:
            confirm = input(f"Proceed to {action} entities on OBP? Type 'yes' to continue: ")
            if confirm.strip().lower() != "yes":
                print("Aborted by user.")
                return

        if args.update:
            # Imported once here instead of on every loop iteration.
            from obp_dynamic_api import (
                build_entity_definition_from_parsed,
                get_dynamic_entity_id_by_name,
                update_system_dynamic_entity,
            )

        # iterate and call API (create or update)
        for entity_name, entity_wrapper in entities.items():
            print(f"Processing entity: {entity_name} ...")
            try:
                # Accept both the {"description", "fields"} wrapper and a bare fields value.
                entity_description = entity_wrapper.get("description") if isinstance(entity_wrapper, dict) else None
                fields = entity_wrapper.get("fields") if isinstance(entity_wrapper, dict) else entity_wrapper
                if args.create:
                    resp = create_dynamic_entity_from_parsed(
                        entity_name,
                        fields,
                        token=args.token,
                        base_url=args.host,
                        has_personal=has_personal,
                        has_community=has_community,
                        entity_description=entity_description,
                    )
                    print(f"Created: {resp.get('dynamicEntityId', '<no-id>')}")
                else:
                    # find existing dynamicEntityId by name
                    dynamic_id = get_dynamic_entity_id_by_name(entity_name, token=args.token, base_url=args.host)
                    if not dynamic_id:
                        print(f"No existing dynamic entity found for '{entity_name}', skipping update.")
                        continue
                    # build definition and call update
                    entity_def = build_entity_definition_from_parsed(entity_name, fields, entity_description=entity_description)
                    resp = update_system_dynamic_entity(dynamic_id, entity_def, token=args.token, base_url=args.host)
                    print(f"Updated: {resp.get('dynamicEntityId', dynamic_id)}")
            except Exception as e:
                # One failing entity must not stop the remaining ones.
                print(f"Failed to process entity {entity_name}: {e}")
        return

    # If not creating or updating, offer to save to file
    save_option = input("\nSave results to a file? (y/n): ").lower().strip()
    if save_option in ['y', 'yes']:
        output_file = input("Enter output filename (default: entities_output.txt): ").strip()
        if not output_file:
            output_file = "entities_output.txt"
        try:
            with open(output_file, 'w', encoding='utf-8') as f:
                f.write(f"Parsed entities from: {file_path}\n")
                f.write("=" * 50 + "\n\n")
                for entity_name, entity_dict in entities.items():
                    f.write(f"Entity: {entity_name}\n")
                    f.write("-" * 30 + "\n")
                    if entity_dict:
                        for key, value in entity_dict.items():
                            f.write(f"  {key}: {value}\n")
                    else:
                        f.write("  (No data)\n")
                    f.write("\n")
            print(f"Results saved to: {output_file}")
        except Exception as e:
            print(f"Error saving file: {e}")
# Run the command-line interface only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()