-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathagent_repl.py
More file actions
581 lines (487 loc) · 20.7 KB
/
agent_repl.py
File metadata and controls
581 lines (487 loc) · 20.7 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
import re
import subprocess
import sys
import os
from fetch_and_calculate import fetch_all_bookings_for_user, fetch_and_explain_booking
class TravelBookingAgent:
    """Turn natural-language booking commands into Python snippets and run them.

    ``parse_natural_language`` matches the input against the regexes in
    ``self.commands``.  ``process_command`` converts the matched intent into
    a snippet of Python source and hands it to one of three executors:
    in-process (``execute_in_repl``), a child interpreter
    (``execute_system_repl``) or an interactive child session
    (``execute_interactive_repl``).  Every snippet reports its outcome by
    assigning to a variable named ``result``.

    Example commands:
        "show booking 1" / "explain booking 2"    one booking, full breakdown
        "who owns booking 1"                      owner of a booking
        "list all bookings"                       every booking with user/route
        "show all bookings"                       per-user analysis of all bookings
        "show me all bookings under nikitha"      one user's bookings (exact name)
        "total price for user nikitha"            route-wise summary + grand total
        "show bookings 1, 2, 3"                   sum of specific booking IDs
        "help"                                    usage text
    """

    def __init__(self):
        """Register the intent -> regex table used by ``parse_natural_language``.

        The dict is scanned in insertion order and the first match wins, so
        the catch-all ``provide_all_bookings`` pattern (it matches any
        "show bookings ..." text) is placed after the more specific
        ``my_bookings`` and ``multiple_bookings`` patterns; otherwise it
        would shadow them.
        """
        self.commands = {
            'booking_by_id': r'(?:show|explain|calculate|get)\s+(?:booking|price)\s+(?:for\s+)?(?:id\s+)?(\d+)',
            'booking_by_user': r'(?:show|get|find)\s+(?:me\s+)?(?:all\s+)?bookings?\s+(?:for|under|of)\s+(?:user\s+)?["\']?(\w+)["\']?',
            'my_bookings': r'(?:show|get|find)\s+(?:all\s+)?(?:my)\s+bookings?',
            'user_total': r'(?:total|sum)\s+(?:price|cost)\s+(?:for|of|under)\s+(?:user\s+)?["\']?(\w+)["\']?',
            'multiple_bookings': r'(?:show|explain)\s+bookings?\s+(\d+(?:\s*,\s*\d+)*)',
            'booking_owner': r'(?:who|which\s+user|owner)\s+(?:owns|has|booked)\s+booking\s+(\d+)',
            'provide_all_bookings': r'(?:provide|show)\s+(?:all\s+)?(?:my\s+)?(?:bookings?|all\s+bookings?)',
            'all_bookings': r'(?:show|list)\s+(?:all\s+)?(?:bookings?|system\s+bookings?)',
            'help': r'(?:help|what\s+can\s+you\s+do|commands)',
        }

    def parse_natural_language(self, user_input):
        """Return ``(intent, groups)`` for the first pattern matching ``user_input``.

        Matching is case-insensitive, but captured groups come from the
        original text so usernames keep their case.  Returns ``(None, None)``
        when the input is empty or nothing matches.
        """
        if not user_input:
            return None, None
        text = user_input.strip()
        for intent, pattern in self.commands.items():
            match = re.search(pattern, text, re.IGNORECASE)
            if match:
                return intent, match.groups()
        return None, None

    @staticmethod
    def _ensure_result_assignment(code):
        """Return ``code`` in statement form that assigns ``result``.

        A bare expression (anything that compiles in ``eval`` mode) becomes
        ``result = <expression>``; statement code is returned unchanged and
        is expected to assign ``result`` itself.
        """
        snippet = code.strip()
        try:
            compile(snippet, '<agent>', 'eval')
        except SyntaxError:
            return code
        return f"result = {snippet}"

    def execute_in_repl(self, code_to_execute):
        """Run a snippet in this process and return its ``result`` variable.

        Returns ``None`` when the snippet raises (the error is printed) or
        never assigns ``result``.
        """
        try:
            # One dict serves as both globals and locals.  With two separate
            # dicts exec() runs the code like a class body, and names such as
            # fetch_and_explain_booking are then invisible inside
            # comprehensions in the snippet.
            namespace = {
                '__builtins__': __builtins__,
                'fetch_all_bookings_for_user': fetch_all_bookings_for_user,
                'fetch_and_explain_booking': fetch_and_explain_booking,
            }
            # SECURITY: exec() of generated source.  The only user-derived
            # text interpolated into snippets is captured by \w+ / \d+
            # groups; do not pass externally supplied code to this method.
            exec(self._ensure_result_assignment(code_to_execute), namespace)
            return namespace.get('result')
        except Exception as e:
            print(f"β Error: {e}")
            return None

    def execute_system_repl(self, code_to_execute):
        """Run a snippet in a child Python interpreter.

        The child's stdout is echoed.  Returns the text printed after the
        ``SYSTEM REPL RESULT:`` marker, ``"Execution completed"`` when the
        marker is absent, or ``None`` if launching the child fails.
        """
        try:
            print(f"π€ Agent executing in SYSTEM REPL: {code_to_execute}")
            print("=" * 60)
            # Snippets are statements (often multi-line), so they are
            # embedded as-is rather than as the right-hand side of
            # "result = ...".
            statements = self._ensure_result_assignment(code_to_execute)
            full_code = f"""
import sys
import os
sys.path.append(os.getcwd())
from fetch_and_calculate import fetch_all_bookings_for_user, fetch_and_explain_booking
# Execute the agent command
{statements}
print("\\n" + "="*50)
print(f"π₯ SYSTEM REPL RESULT: {{result}}")
print("="*50)
"""
            process = subprocess.run(
                [sys.executable, '-c', full_code],
                capture_output=True,
                text=True,
                cwd=os.getcwd(),
            )
            print(process.stdout)
            if process.stderr:
                print(f"π¨ REPL Errors: {process.stderr}")
            # Extract result from output
            marker = 'SYSTEM REPL RESULT: '
            for line in process.stdout.split('\n'):
                if marker in line:
                    result = line.split(marker, 1)[1]
                    print(f"β SYSTEM REPL execution completed. Final result: {result}")
                    return result
            print("β SYSTEM REPL execution completed")
            return "Execution completed"
        except Exception as e:
            print(f"β Error in SYSTEM REPL: {e}")
            return None

    def execute_interactive_repl(self, code_to_execute):
        """Run a snippet in a child interpreter, then leave the user in a REPL.

        Writes a temporary ``agent_repl_session.py`` in the current
        directory, runs it with ``python -i`` and removes it afterwards.
        Returns a completion message, or ``None`` on failure.
        """
        try:
            print(f"π€ Agent launching INTERACTIVE REPL for: {code_to_execute}")
            print("=" * 60)
            statements = self._ensure_result_assignment(code_to_execute)
            # The command is echoed through its repr so multi-line code or
            # code containing quotes cannot break the generated script.
            script_content = f"""
# Auto-generated by Travel Booking Agent
import sys
import os
sys.path.append(os.getcwd())
from fetch_and_calculate import fetch_all_bookings_for_user, fetch_and_explain_booking
print("π€ Agent REPL Environment Loaded!")
print("Available functions:")
print("- fetch_all_bookings_for_user(username)")
print("- fetch_and_explain_booking(booking_id)")
print("\\nExecuting agent command: " + {code_to_execute!r})
print("=" * 50)
# Execute the agent command
{statements}
print("=" * 50)
print(f"π― Agent execution result: {{result}}")
print("\\nType exit() to leave REPL")
# Drop into interactive mode
import code
code.interact(local=locals())
"""
            script_path = 'agent_repl_session.py'
            try:
                # Python reads source files as UTF-8, so write it that way.
                with open(script_path, 'w', encoding='utf-8') as f:
                    f.write(script_content)
                # Argument list instead of a shell string: survives spaces
                # in the interpreter path.
                subprocess.run([sys.executable, '-i', script_path])
            finally:
                if os.path.exists(script_path):
                    os.remove(script_path)
            return "Interactive REPL session completed"
        except Exception as e:
            print(f"β Error in INTERACTIVE REPL: {e}")
            return None

    @staticmethod
    def _booking_by_user_code(username):
        """Snippet: explain every booking of the user named exactly ``username``.

        Falls back to suggesting case-insensitive matches.  ``username`` is
        interpolated into source text and must be a plain word (the command
        regex captures ``\\w+``).
        """
        return f'''
import sqlite3
conn = sqlite3.connect("travel.db")
c = conn.cursor()
# Find users with EXACT username match (case sensitive)
c.execute("SELECT id, username FROM users WHERE username = ?", ("{username}",))
exact_users = c.fetchall()
if not exact_users:
    print(f"No users found with EXACT name '{username}' (case sensitive)")
    # Offer case-insensitive search as suggestion
    c.execute("SELECT id, username FROM users WHERE lower(username) = lower(?)", ("{username}",))
    similar_users = c.fetchall()
    if similar_users:
        similar_names = [user[1] for user in similar_users]
        print(f"οΏ½ Did you mean one of these similar users? {{similar_names}}")
    result = 0.0
else:
    user_id, user_name = exact_users[0]
    print(f"\\nοΏ½ Found EXACT user: {{user_name}} (ID: {{user_id}})")
    print("π SHOWING ALL BOOKINGS:")
    print("="*60)
    c.execute("SELECT id FROM bookings WHERE user_id=?", (user_id,))
    user_bookings = [row[0] for row in c.fetchall()]
    if not user_bookings:
        print(f"No bookings found for user '{{user_name}}'")
        result = 0.0
    else:
        from collections import defaultdict
        route_totals = defaultdict(float)
        user_total = 0.0
        for bid in sorted(user_bookings):
            price = fetch_and_explain_booking(bid)
            if price:
                # Get route info for this booking
                c.execute("""
                    SELECT r.origin, r.destination
                    FROM bookings b
                    JOIN routes r ON b.route_id = r.id
                    WHERE b.id = ?
                """, (bid,))
                route_info = c.fetchone()
                if route_info:
                    route_key = f"{{route_info[0]}} -> {{route_info[1]}}"
                    route_totals[route_key] += price
                user_total += price
        # Show summary for this exact user
        print("\\n" + "="*50)
        print(f"π ROUTE-WISE SUMMARY FOR USER '{{user_name}}':")
        print("="*50)
        for route, total in route_totals.items():
            print(f"Route {{route}}: {{total}}")
        print("-"*30)
        print(f"π― TOTAL FOR {{user_name}}: {{user_total}}")
        print("="*50)
        result = user_total
conn.close()
'''

    @staticmethod
    def _user_total_code(username):
        """Snippet: route-wise summary and grand total for ``username``.

        Users are matched case-insensitively and all matches are combined.
        Output of ``fetch_and_explain_booking`` is suppressed so only the
        summary is printed.  ``username`` must be a plain word.
        """
        return f'''
import sqlite3
conn = sqlite3.connect("travel.db")
c = conn.cursor()
# Find all users with same name (case insensitive) - DYNAMIC for any username
c.execute("SELECT id, username FROM users WHERE lower(username) = lower(?)", ("{username}",))
matching_users = c.fetchall()
if not matching_users:
    print(f"No users found with name '{username}' (case insensitive)")
    result = 0.0
else:
    if len(matching_users) > 1:
        user_names = [user[1] for user in matching_users]
        print(f"\\nπ CALCULATING TOTAL FOR ALL '{username.upper()}' USERS: {{user_names}}")
    else:
        user_name = matching_users[0][1]
        print(f"\\nπ CALCULATING TOTAL FOR USER '{{user_name}}'...")
    all_booking_ids = []
    for user_id, user_name in matching_users:
        c.execute("SELECT id FROM bookings WHERE user_id=?", (user_id,))
        user_bookings = [row[0] for row in c.fetchall()]
        all_booking_ids.extend(user_bookings)
    if not all_booking_ids:
        print(f"No bookings found for user(s) with name '{username}'")
        result = 0.0
    else:
        from collections import defaultdict
        route_totals = defaultdict(float)
        grand_total = 0.0
        for bid in all_booking_ids:
            # Get price by calling fetch_and_explain_booking but suppress its output
            import io
            import sys
            old_stdout = sys.stdout
            sys.stdout = buffer = io.StringIO()
            try:
                price = fetch_and_explain_booking(bid)
            finally:
                sys.stdout = old_stdout
            if price:
                # Get route info for this booking
                c.execute("""
                    SELECT r.origin, r.destination
                    FROM bookings b
                    JOIN routes r ON b.route_id = r.id
                    WHERE b.id = ?
                """, (bid,))
                route_info = c.fetchone()
                if route_info:
                    route_key = f"{{route_info[0]}} -> {{route_info[1]}}"
                    route_totals[route_key] += price
                grand_total += price
        print("\\n" + "="*50)
        if len(matching_users) > 1:
            print(f"π ROUTE-WISE SUMMARY FOR ALL '{username.upper()}' USERS:")
        else:
            print("π ROUTE-WISE SUMMARY:")
        print("="*50)
        for route, total in route_totals.items():
            print(f"Route {{route}}: {{total}}")
        print("-"*30)
        if len(matching_users) > 1:
            print(f"π― GRAND TOTAL FOR ALL '{username.upper()}': {{grand_total}}")
        else:
            print(f"π― GRAND TOTAL: {{grand_total}}")
        print("="*50)
        result = grand_total
conn.close()
result
'''

    @staticmethod
    def _booking_owner_code(booking_id):
        """Snippet: report which user owns ``booking_id`` (a digit string)."""
        # booking_id is substituted here, at generation time: the generated
        # code has no variable of that name, so the "not found" messages
        # must carry the literal number.
        return f'''
import sqlite3
conn = sqlite3.connect("travel.db")
c = conn.cursor()
c.execute("""
    SELECT u.username, b.id
    FROM bookings b
    JOIN users u ON b.user_id = u.id
    WHERE b.id = ?
""", ({booking_id},))
result_row = c.fetchone()
if result_row:
    username, bid = result_row
    print(f"π Booking ID {{bid}} belongs to user: {{username}}")
    result = f"Booking {{bid}} β User: {{username}}"
else:
    print(f"β Booking ID {booking_id} not found")
    result = f"Booking {booking_id} not found"
conn.close()
result
'''

    @staticmethod
    def _all_bookings_code():
        """Snippet: list every booking with its user and route."""
        return f'''
import sqlite3
conn = sqlite3.connect("travel.db")
c = conn.cursor()
c.execute("""
    SELECT b.id, u.username, r.origin, r.destination
    FROM bookings b
    JOIN users u ON b.user_id = u.id
    JOIN routes r ON b.route_id = r.id
    ORDER BY b.id
""")
all_bookings = c.fetchall()
print("\\nπ ALL BOOKINGS IN SYSTEM:")
print("="*60)
for bid, username, origin, dest in all_bookings:
    print(f"Booking {{bid:2}} β User: {{username:10}} β Route: {{origin}} -> {{dest}}")
print("="*60)
print(f"Total bookings in system: {{len(all_bookings)}}")
conn.close()
result = f"Found {{len(all_bookings)}} total bookings"
result
'''

    @staticmethod
    def _provide_all_bookings_code():
        """Snippet: explain every booking of every user, with per-user and grand totals."""
        return '''
import sqlite3
conn = sqlite3.connect("travel.db")
c = conn.cursor()
# Get all users
c.execute("SELECT DISTINCT username FROM users ORDER BY username")
all_users = [row[0] for row in c.fetchall()]
if not all_users:
    print("No users found in the system")
    result = 0.0
else:
    print("π ALL BOOKINGS IN SYSTEM - COMPLETE ANALYSIS:")
    print("="*70)
    total_system_price = 0.0
    total_bookings = 0
    users_with_bookings = 0
    for username in all_users:
        c.execute("SELECT id FROM users WHERE username=?", (username,))
        user_row = c.fetchone()
        if not user_row:
            continue
        user_id = user_row[0]
        c.execute("SELECT id FROM bookings WHERE user_id=?", (user_id,))
        user_bookings = [row[0] for row in c.fetchall()]
        if user_bookings:
            users_with_bookings += 1
            print(f"\\nπ€ USER: {username.upper()} ({len(user_bookings)} bookings)")
            print("-" * 50)
            user_total = 0.0
            for bid in user_bookings:
                price = fetch_and_explain_booking(bid)
                if price:
                    user_total += price
                    total_system_price += price
                    total_bookings += 1
                print("-" * 30)
            print(f"\\nπ° TOTAL FOR {username.upper()}: ${user_total:.2f}")
            print("=" * 50)
    print(f"\\nπ― GRAND TOTAL - ALL USERS: ${total_system_price:.2f}")
    print(f"π TOTAL BOOKINGS: {total_bookings}")
    print(f"π₯ USERS WITH BOOKINGS: {users_with_bookings}")
    result = total_system_price
conn.close()
result
'''

    @staticmethod
    def _my_bookings_code():
        """Snippet: "my bookings" handler.

        No current user is known here, so the snippet prints instructions to
        name the user explicitly; the ``else`` branch only runs if
        ``current_user`` is ever populated.
        """
        return '''
import sqlite3
# Check if we can determine a current user (this will work when called from web interface)
current_user = None
# For CLI usage, we need the user to specify
if not current_user:
    print("π To view your bookings, you have two options:")
    print("1. Use the web interface (login automatically provides your username)")
    print("2. In CLI, use: 'show bookings for [your_username]'")
    print("")
    print("Examples:")
    print("β’ 'show bookings for nikitha'")
    print("β’ 'show bookings for john'")
    print("β’ 'all bookings for [username]'")
    result = "Please specify username for booking queries"
else:
    # This would execute if called with user context (from web interface)
    result = fetch_all_bookings_for_user(current_user)
result
'''

    def _build_code(self, intent, params):
        """Return the snippet for ``intent``, or ``None`` if it has none."""
        if intent == 'booking_by_id':
            return f"result = fetch_and_explain_booking({params[0]})"
        if intent == 'booking_by_user':
            return self._booking_by_user_code(params[0])
        if intent == 'user_total':
            return self._user_total_code(params[0])
        if intent == 'booking_owner':
            return self._booking_owner_code(params[0])
        if intent == 'all_bookings':
            return self._all_bookings_code()
        if intent == 'provide_all_bookings':
            return self._provide_all_bookings_code()
        if intent == 'my_bookings':
            return self._my_bookings_code()
        if intent == 'multiple_bookings':
            booking_ids = [
                int(part.strip())
                for part in params[0].split(',')
                if part.strip().isdigit()
            ]
            # Assign to ``result`` so the executors can return the sum.
            return f"result = sum([fetch_and_explain_booking(bid) or 0 for bid in {booking_ids}])"
        return None

    def process_command(self, user_input, repl_mode='local'):
        """Parse ``user_input``, generate its snippet and execute it.

        Args:
            user_input: the natural-language command.
            repl_mode: ``'system'`` (child interpreter), ``'interactive'``
                (child interpreter plus REPL) or anything else for
                in-process execution.

        Returns:
            The executor's return value, the help text for the ``help``
            intent, or an error message string when the command is not
            understood or has no snippet.
        """
        intent, params = self.parse_natural_language(user_input)
        if intent is None:
            return "β I don't understand that command. Type 'help' for available commands."
        if intent == 'help':
            return self.show_help()
        code_to_execute = self._build_code(intent, params)
        if not code_to_execute:
            return "β Could not generate executable code for that command."
        if repl_mode == 'system':
            return self.execute_system_repl(code_to_execute)
        if repl_mode == 'interactive':
            return self.execute_interactive_repl(code_to_execute)
        return self.execute_in_repl(code_to_execute)

    def show_help(self):
        """Return the multi-line usage text."""
        help_lines = (
            '',
            'π€ Travel Booking Agent - Simple Commands',
            'π SINGLE BOOKING:',
            'β’ "show booking 1" - Get detailed breakdown for booking ID 1',
            'β’ "explain booking 2" - Show price calculation for booking ID 2',
            'π ALL USER BOOKINGS:',
            'β’ "show me all bookings under nikitha" - All bookings with full details',
            'β’ "find bookings of john" - All bookings with full details',
            'οΏ½ USER TOTALS (Summary Only):',
            'β’ "total price for user nikitha" - Route-wise summary + grand total',
            'π’ MULTIPLE SPECIFIC BOOKINGS:',
            'β’ "show bookings 1, 2, 3" - Sum of specific booking IDs',
            'π OTHER:',
            'β’ "help" - Show this help',
            'β’ "quit" - Exit',
            'π‘ Use "show booking X" for single, "total price" for summary!',
            '',
        )
        return "\n".join(help_lines)

    def chat_loop(self):
        """Read commands from stdin until the user quits.

        Ends on ``quit``/``exit``/``bye``, Ctrl-C, or end of input.  Blank
        lines are skipped.  Short scalar results are echoed; longer output
        is assumed to have been printed by the snippet itself.
        """
        print("π€ Travel Booking Agent Started!")
        print("=" * 40)
        print("Type booking commands or 'help' for examples.")
        print("Type 'quit' to exit.")
        print("=" * 40)
        while True:
            try:
                user_input = input("\n㪠You: ").strip()
                if user_input.lower() in ('quit', 'exit', 'bye'):
                    print("π Goodbye!")
                    break
                if not user_input:
                    continue
                # Process the command (always use local REPL for simplicity)
                result = self.process_command(user_input, 'local')
                # Only show result if it's a simple response (not for complex outputs)
                if result and isinstance(result, (str, int, float)) and len(str(result)) < 100:
                    print(f"\nπ€ Result: {result}")
            except (KeyboardInterrupt, EOFError):
                # EOFError: stdin closed.  Under the generic handler below
                # it would be reported and retried forever.
                print("\nπ Goodbye!")
                break
            except Exception as e:
                print(f"β Error: {e}")
if __name__ == "__main__":
    # Script entry point: banner, usage text, then the interactive loop.
    print("π Travel Booking Agent", "=" * 30, sep="\n")
    agent = TravelBookingAgent()
    usage = agent.show_help()
    print(usage)
    agent.chat_loop()