From f4bb17b45be1d514de4cd542649056b22bb600f8 Mon Sep 17 00:00:00 2001
From: Raphael Gachuhi
Date: Thu, 2 Oct 2025 15:13:33 -0400
Subject: [PATCH 1/2] add tutor log messages to datashop export

---
 dataset/dataset.py  |  29 ++++++++++-
 dataset/datashop.py | 117 ++++++++++++++++++++++++++++++++++++++++----
 2 files changed, 134 insertions(+), 12 deletions(-)

diff --git a/dataset/dataset.py b/dataset/dataset.py
index 75f56f8..12bf896 100644
--- a/dataset/dataset.py
+++ b/dataset/dataset.py
@@ -11,7 +11,7 @@ from dataset.utils import parallel_map, prune_fields
 from dataset.manifest import build_html_manifest, build_json_manifest
 from dataset.event_registry import get_event_config
 
-from dataset.datashop import handle_datashop, process_jsonl_file, process_part_attempts
+from dataset.datashop import handle_datashop, process_jsonl_file, process_part_attempts, process_tutor_messages
 from dataset.lookup import retrieve_lookup
 
 
@@ -78,7 +78,32 @@ def generate_datashop(context):
 
         results = process_part_attempts(partitioned_part_attempts[key], context)
         all_results.extend(results)
-    
+    tutor_keys = list_keys_from_inventory(section_ids, "tutor_message", source_bucket, inventory_bucket)
+    number_of_chunks = calculate_number_of_chunks(len(tutor_keys), chunk_size)
+
+    all_tutor_messages = []
+    for chunk_index, chunk_keys in enumerate(chunkify(tutor_keys, chunk_size)):
+        try:
+            # Process keys in parallel to
+            part_attempts = parallel_map(sc, source_bucket, chunk_keys, process_jsonl_file, context, [])
+            all_tutor_messages.extend(part_attempts)
+
+        except Exception as e:
+            print(f"Error processing chunk {chunk_index + 1}/{number_of_chunks}: {e}")
+
+    partitioned_part_attempts = {}
+    for part_attempt in all_tutor_messages:
+        key = str(part_attempt.get('section_id', '')) + "_" + str(part_attempt.get('user_id', '')) + "_" + str(part_attempt.get('session_id', ''))
+
+        if key not in partitioned_part_attempts:
+            partitioned_part_attempts[key] = []
+        partitioned_part_attempts[key].append(part_attempt)
+
+    for key in partitioned_part_attempts:
+
+        results = process_tutor_messages(partitioned_part_attempts[key], context)
+        all_results.extend(results)
+
     # Every XML chunk should have the directive and the
     # outermost tutor_related_message_sequence element
     chunk_prefix = """
diff --git a/dataset/datashop.py b/dataset/datashop.py
index 3788228..e9e35b7 100644
--- a/dataset/datashop.py
+++ b/dataset/datashop.py
@@ -35,20 +35,101 @@ def handle_datashop(bucket_key, context, excluded_indices):
     global_context["last_good_context_message_id"] = None
 
     for line in content.splitlines():
-        # parse one line of json
-        j = json.loads(line)
-
-        student_id = j["actor"]["account"]["name"]
-        project_matches = context["project_id"] is None or context["project_id"] == j["context"]["extensions"]["http://oli.cmu.edu/extensions/project_id"]
-
-        if student_id not in context["ignored_student_ids"] and project_matches:
-            if j["object"]["definition"]["type"] == "http://adlnet.gov/expapi/activities/question":
-                o = to_xml_message(j, lookup)
-                values.append(o)
+        if not line.strip():
+            continue
+
+        try:
+            # Parse JSON line
+            j = json.loads(line)
+
+            student_id = j["actor"]["account"]["name"]
+            project_matches = (context.get("project_id") is None or
+                               context.get("project_id") == j["context"]["extensions"]["http://oli.cmu.edu/extensions/project_id"])
+
+            if student_id not in context.get("ignored_student_ids", []) and project_matches:
+                # Process different types of messages
+                obj_type = j["object"]["definition"]["type"]
+
+                if obj_type == "http://adlnet.gov/expapi/activities/question":
+                    # Handle attempt_evaluated messages
+                    o = to_xml_message(j, lookup)
+                    values.append(o)
+
+                elif obj_type == "http://oli.cmu.edu/extensions/tutor_message":
+                    # Handle tutor_message messages
+                    o = process_tutor_message(j, lookup)
+                    if o:
+                        values.append(o)
+
+        except json.JSONDecodeError as e:
+            print(f"Error parsing JSON line: {e}")
+            continue
+        except Exception as e:
+            print(f"Error processing line: {e}")
+            continue
 
     return values
 
 
+def process_tutor_message(j, lookup):
+    """
+    Process tutor message from local XAPI data.
+    Extracts the XML content and replaces the meta element with a generated one.
+    """
+    try:
+        # Extract the message content from the XAPI JSON
+        message_content = j["result"]["message"]
+
+        message_root = ET.fromstring(message_content)
+
+        # Create context for generating new meta element
+        faux_full_context = {'lookup': lookup, 'anonymize': lookup.get('anonymize', False)}
+        user_id = determine_student_id(faux_full_context, j)
+
+        context = {
+            'user_id': user_id,
+            'session_id': f"{user_id} {j['timestamp'].replace('T', ' ').replace('Z', '')}",
+            'time': j['timestamp'],
+            'time_zone': 'GMT'
+        }
+
+        updated_xml = ""
+        for child in message_root:
+            # Find and remove existing meta element
+            existing_meta = child.find('meta')
+            if existing_meta is not None:
+                child.remove(existing_meta)
+
+            # Create new meta element using our generator
+            new_meta_xml = meta_xml(context)
+            new_meta_element = ET.fromstring(new_meta_xml)
+
+            # Insert the new meta element at the beginning
+            child.insert(0, new_meta_element)
+
+            updated_xml += ET.tostring(child, encoding='unicode')
+        # Clean up any escaped entities
+        cleaned_message = unescape_numeric_entities(updated_xml)
+
+        # Add proper indentation for readability
+        cleaned_message = " " + cleaned_message.replace("\n", "\n ")
+
+        return cleaned_message
+
+    except Exception as e:
+        print(f"Error processing tutor message: {e}")
+        import traceback
+        traceback.print_exc()
+        return None
+
+
+def meta_xml(context):
+    """Create meta XML element."""
+    return f'''<meta>
+        <user_id>{sanitize_element_text(context.get("user_id", "Unknown"))}</user_id>
+        <session_id>{sanitize_element_text(context.get("session_id", "Unknown"))}</session_id>
+        <time>{context.get("time", "")}</time>
+        <time_zone>{sanitize_element_text(context.get("time_zone", "GMT"))}</time_zone>
+    </meta>'''
 
 
 def process_part_attempts(part_attempts, context):
@@ -65,6 +146,22 @@ def process_part_attempts(part_attempts, context):
 
     return values
 
+def process_tutor_messages(part_attempts, context):
+
+    values = []
+
+    lookup = context['lookup']
+    lookup['anonymize'] = context['anonymize']
+
+    global_context["last_good_context_message_id"] = None
+
+    for part_attempt in part_attempts:
+        o = process_tutor_message(part_attempt, lookup)
+        values.append(o)
+
+    return values
+
+
 def process_jsonl_file(bucket_key, context, excluded_indices):
 
     bucket_name, key = bucket_key

From 4793d8fa18e3a4e4b500d0cf59b0235a1ed49a5b Mon Sep 17 00:00:00 2001
From: Raphael Gachuhi
Date: Fri, 3 Oct 2025 09:31:21 -0400
Subject: [PATCH 2/2] address PR review

---
 dataset/dataset.py | 15 +++++++++------
 1 file changed, 9 insertions(+), 6 deletions(-)

diff --git a/dataset/dataset.py b/dataset/dataset.py
index 12bf896..550977c 100644
--- a/dataset/dataset.py
+++ b/dataset/dataset.py
@@ -79,7 +79,7 @@ def generate_datashop(context):
         all_results.extend(results)
 
     tutor_keys = list_keys_from_inventory(section_ids, "tutor_message", source_bucket, inventory_bucket)
-    number_of_chunks = calculate_number_of_chunks(len(tutor_keys), chunk_size)
+    tutor_number_of_chunks = calculate_number_of_chunks(len(tutor_keys), chunk_size)
 
     all_tutor_messages = []
     for chunk_index, chunk_keys in enumerate(chunkify(tutor_keys, chunk_size)):
@@ -89,7 +89,7 @@ def generate_datashop(context):
             all_tutor_messages.extend(part_attempts)
 
         except Exception as e:
-            print(f"Error processing chunk {chunk_index + 1}/{number_of_chunks}: {e}")
+            print(f"Error processing tutor chunk {chunk_index + 1}/{tutor_number_of_chunks}: {e}")
 
     partitioned_part_attempts = {}
     for part_attempt in all_tutor_messages:
@@ -104,6 +104,9 @@ def generate_datashop(context):
         results = process_tutor_messages(partitioned_part_attempts[key], context)
         all_results.extend(results)
 
+    # Calculate total number of chunks based on combined results
+    total_number_of_chunks = calculate_number_of_chunks(len(all_results), chunk_size)
+
     # Every XML chunk should have the directive and the
     # outermost tutor_related_message_sequence element
     chunk_prefix = """
@@ -118,19 +121,19 @@
             # Save the collected results as an XML chunk to S3
             save_xml_chunk(chunk_data, s3_client, target_prefix, chunk_index, results_bucket_name=context["results_bucket_name"])
 
-            print(f"Successfully processed chunk {chunk_index + 1}/{number_of_chunks}")
+            print(f"Successfully processed chunk {chunk_index + 1}/{total_number_of_chunks}")
 
         except Exception as e:
-            print(f"Error processing chunk {chunk_index + 1}/{number_of_chunks}: {e}")
+            print(f"Error processing chunk {chunk_index + 1}/{total_number_of_chunks}: {e}")
 
     # Build and save JSON and HTML manifests, resetting the lookup so we don't preserve it
     context['lookup'] = {}
-    build_manifests(s3_client, context, number_of_chunks, "xml")
+    build_manifests(s3_client, context, total_number_of_chunks, "xml")
 
     # Stop Spark context
     sc.stop()
 
-    return number_of_chunks
+    return total_number_of_chunks
 
 
 def generate_dataset(section_ids, action, context):
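
Note (not part of the patches above): the core step in process_tutor_message is dropping each message's existing <meta> child and inserting a freshly generated one before re-serializing. Below is a minimal, self-contained sketch of that step using only xml.etree.ElementTree; the sample payload, the element names inside <meta>, and the helper name replace_meta are illustrative assumptions here, not the repository's actual API.

    import xml.etree.ElementTree as ET

    # Hypothetical example of one message element as it might appear in an
    # xAPI statement's result["message"]; real payloads come from the JSONL logs.
    SAMPLE = """<tool_message context_message_id="c1">
      <meta><user_id>stale</user_id></meta>
      <semantic_event name="ATTEMPT"/>
    </tool_message>"""

    def replace_meta(message_xml, user_id, session_id, timestamp):
        """Remove any existing <meta> child and insert a regenerated one at position 0."""
        root = ET.fromstring(message_xml)
        existing = root.find("meta")
        if existing is not None:
            root.remove(existing)
        # Assumed element names, mirroring the keys the patch builds into its context dict.
        new_meta = ET.fromstring(
            f"<meta><user_id>{user_id}</user_id>"
            f"<session_id>{session_id}</session_id>"
            f"<time>{timestamp}</time>"
            f"<time_zone>GMT</time_zone></meta>"
        )
        root.insert(0, new_meta)
        return ET.tostring(root, encoding="unicode")

    print(replace_meta(SAMPLE, "student_1", "student_1 2025-10-02 15:13:33", "2025-10-02 15:13:33"))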