diff --git a/dataset/dataset.py b/dataset/dataset.py
index 75f56f8..550977c 100644
--- a/dataset/dataset.py
+++ b/dataset/dataset.py
@@ -11,7 +11,7 @@
from dataset.utils import parallel_map, prune_fields
from dataset.manifest import build_html_manifest, build_json_manifest
from dataset.event_registry import get_event_config
-from dataset.datashop import handle_datashop, process_jsonl_file, process_part_attempts
+from dataset.datashop import handle_datashop, process_jsonl_file, process_part_attempts, process_tutor_messages
from dataset.lookup import retrieve_lookup
@@ -78,7 +78,35 @@ def generate_datashop(context):
results = process_part_attempts(partitioned_part_attempts[key], context)
all_results.extend(results)
-
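+ # Second pass: collect tutor_message events and convert them to DataShop XML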
+ tutor_keys = list_keys_from_inventory(section_ids, "tutor_message", source_bucket, inventory_bucket)
+ tutor_number_of_chunks = calculate_number_of_chunks(len(tutor_keys), chunk_size)
+
+ all_tutor_messages = []
+ for chunk_index, chunk_keys in enumerate(chunkify(tutor_keys, chunk_size)):
+ try:
+ # Process keys in parallel to extract tutor message events from each file
+ tutor_messages = parallel_map(sc, source_bucket, chunk_keys, process_jsonl_file, context, [])
+ all_tutor_messages.extend(tutor_messages)
+
+ except Exception as e:
+ print(f"Error processing tutor chunk {chunk_index + 1}/{tutor_number_of_chunks}: {e}")
+
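+ # Partition tutor messages by section, user, and session so each
+ # session's messages are processed together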
+ partitioned_tutor_messages = {}
+ for tutor_message in all_tutor_messages:
+ key = f"{tutor_message.get('section_id', '')}_{tutor_message.get('user_id', '')}_{tutor_message.get('session_id', '')}"
+
+ if key not in partitioned_tutor_messages:
+ partitioned_tutor_messages[key] = []
+ partitioned_tutor_messages[key].append(tutor_message)
+
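+ # Convert each session's tutor messages into DataShop XML fragments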
+ for key in partitioned_tutor_messages:
+ results = process_tutor_messages(partitioned_tutor_messages[key], context)
+ all_results.extend(results)
+
+ # Calculate total number of chunks based on combined results
+ total_number_of_chunks = calculate_number_of_chunks(len(all_results), chunk_size)
+
# Every XML chunk should have the directive and the
# outermost tutor_related_message_sequence element
chunk_prefix = """
@@ -93,19 +121,19 @@ def generate_datashop(context):
# Save the collected results as an XML chunk to S3
save_xml_chunk(chunk_data, s3_client, target_prefix, chunk_index, results_bucket_name=context["results_bucket_name"])
- print(f"Successfully processed chunk {chunk_index + 1}/{number_of_chunks}")
+ print(f"Successfully processed chunk {chunk_index + 1}/{total_number_of_chunks}")
except Exception as e:
- print(f"Error processing chunk {chunk_index + 1}/{number_of_chunks}: {e}")
+ print(f"Error processing chunk {chunk_index + 1}/{total_number_of_chunks}: {e}")
# Build and save JSON and HTML manifests, resetting the lookup so we don't preserve it
context['lookup'] = {}
- build_manifests(s3_client, context, number_of_chunks, "xml")
+ build_manifests(s3_client, context, total_number_of_chunks, "xml")
# Stop Spark context
sc.stop()
- return number_of_chunks
+ return total_number_of_chunks
def generate_dataset(section_ids, action, context):
diff --git a/dataset/datashop.py b/dataset/datashop.py
index 3788228..e9e35b7 100644
--- a/dataset/datashop.py
+++ b/dataset/datashop.py
@@ -35,20 +35,101 @@ def handle_datashop(bucket_key, context, excluded_indices):
global_context["last_good_context_message_id"] = None
for line in content.splitlines():
- # parse one line of json
- j = json.loads(line)
-
- student_id = j["actor"]["account"]["name"]
- project_matches = context["project_id"] is None or context["project_id"] == j["context"]["extensions"]["http://oli.cmu.edu/extensions/project_id"]
-
- if student_id not in context["ignored_student_ids"] and project_matches:
- if j["object"]["definition"]["type"] == "http://adlnet.gov/expapi/activities/question":
- o = to_xml_message(j, lookup)
- values.append(o)
+ if not line.strip():
+ continue
+
+ try:
+ # Parse JSON line
+ j = json.loads(line)
+
+ student_id = j["actor"]["account"]["name"]
+ project_matches = (context.get("project_id") is None or
+ context.get("project_id") == j["context"]["extensions"]["http://oli.cmu.edu/extensions/project_id"])
+
+ if student_id not in context.get("ignored_student_ids", []) and project_matches:
+ # Process different types of messages
+ obj_type = j["object"]["definition"]["type"]
+
+ if obj_type == "http://adlnet.gov/expapi/activities/question":
+ # Handle attempt_evaluated messages
+ o = to_xml_message(j, lookup)
+ values.append(o)
+
+ elif obj_type == "http://oli.cmu.edu/extensions/tutor_message":
+ # Handle tutor_message messages
+ o = process_tutor_message(j, lookup)
+ if o:
+ values.append(o)
+
+ except json.JSONDecodeError as e:
+ print(f"Error parsing JSON line: {e}")
+ continue
+ except Exception as e:
+ print(f"Error processing line: {e}")
+ continue
return values
+def process_tutor_message(j, lookup):
+ """
+ Process a tutor message from local xAPI data.
+ Extracts the XML content and replaces the meta element with a generated one.
+ """
+ try:
+ # Extract the message content from the XAPI JSON
+ message_content = j["result"]["message"]
+
+ message_root = ET.fromstring(message_content)
+
+ # Create context for generating new meta element
+ faux_full_context = {'lookup': lookup, 'anonymize': lookup.get('anonymize', False)}
+ user_id = determine_student_id(faux_full_context, j)
+
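+ # Build the context used to generate a fresh DataShop meta element;
+ # the session id combines the user id with the statement timestamp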
+ context = {
+ 'user_id': user_id,
+ 'session_id': f"{user_id} {j['timestamp'].replace('T', ' ').replace('Z', '')}",
+ 'time': j['timestamp'],
+ 'time_zone': 'GMT'
+ }
+
+ updated_xml = ""
+ for child in message_root:
+ # Find and remove existing meta element
+ existing_meta = child.find('meta')
+ if existing_meta is not None:
+ child.remove(existing_meta)
+
+ # Create new meta element using our generator
+ new_meta_xml = meta_xml(context)
+ new_meta_element = ET.fromstring(new_meta_xml)
+
+ # Insert the new meta element at the beginning
+ child.insert(0, new_meta_element)
+
+ updated_xml += ET.tostring(child, encoding='unicode')
+ # Clean up any escaped entities
+ cleaned_message = unescape_numeric_entities(updated_xml)
+
+ # Add proper indentation for readability
+ cleaned_message = " " + cleaned_message.replace("\n", "\n ")
+
+ return cleaned_message
+
+ except Exception as e:
+ print(f"Error processing tutor message: {e}")
+ import traceback
+ traceback.print_exc()
+ return None
+
+def meta_xml(context):
+ """Create the DataShop <meta> XML element for a tutor message."""
+ return f'''<meta>
+ <user_id>{sanitize_element_text(context.get("user_id", "Unknown"))}</user_id>
+ <session_id>{sanitize_element_text(context.get("session_id", "Unknown"))}</session_id>
+ <time>{sanitize_element_text(context.get("time", ""))}</time>
+ <time_zone>{sanitize_element_text(context.get("time_zone", "GMT"))}</time_zone>
+</meta>'''
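+ # For illustration only (hypothetical values): a context built from user_id
+ # "stu_1" and timestamp "2024-01-01T00:00:00Z" would produce a fragment like:
+ # <meta>
+ #   <user_id>stu_1</user_id>
+ #   <session_id>stu_1 2024-01-01 00:00:00</session_id>
+ #   <time>2024-01-01T00:00:00Z</time>
+ #   <time_zone>GMT</time_zone>
+ # </meta>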
def process_part_attempts(part_attempts, context):
@@ -65,6 +146,22 @@ def process_part_attempts(part_attempts, context):
return values
+def process_tutor_messages(tutor_messages, context):
+ """Convert a partition of tutor message events into DataShop XML fragments."""
+ values = []
+
+ lookup = context['lookup']
+ lookup['anonymize'] = context['anonymize']
+
+ global_context["last_good_context_message_id"] = None
+
+ for tutor_message in tutor_messages:
+ o = process_tutor_message(tutor_message, lookup)
+ # process_tutor_message returns None on failure, so skip empty results
+ if o:
+ values.append(o)
+
+ return values
+
+
def process_jsonl_file(bucket_key, context, excluded_indices):
bucket_name, key = bucket_key