Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 34 additions & 6 deletions dataset/dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from dataset.utils import parallel_map, prune_fields
from dataset.manifest import build_html_manifest, build_json_manifest
from dataset.event_registry import get_event_config
from dataset.datashop import handle_datashop, process_jsonl_file, process_part_attempts
from dataset.datashop import handle_datashop, process_jsonl_file, process_part_attempts, process_tutor_messages
from dataset.lookup import retrieve_lookup


Expand Down Expand Up @@ -78,7 +78,35 @@ def generate_datashop(context):
results = process_part_attempts(partitioned_part_attempts[key], context)
all_results.extend(results)


tutor_keys = list_keys_from_inventory(section_ids, "tutor_message", source_bucket, inventory_bucket)
tutor_number_of_chunks = calculate_number_of_chunks(len(tutor_keys), chunk_size)

all_tutor_messages = []
for chunk_index, chunk_keys in enumerate(chunkify(tutor_keys, chunk_size)):
try:
# Process keys in parallel to
part_attempts = parallel_map(sc, source_bucket, chunk_keys, process_jsonl_file, context, [])
all_tutor_messages.extend(part_attempts)

except Exception as e:
print(f"Error processing tutor chunk {chunk_index + 1}/{tutor_number_of_chunks}: {e}")

partitioned_part_attempts = {}
for part_attempt in all_tutor_messages:
key = str(part_attempt.get('section_id', '')) + "_" + str(part_attempt.get('user_id', '')) + "_" + str(part_attempt.get('session_id', ''))

if key not in partitioned_part_attempts:
partitioned_part_attempts[key] = []
partitioned_part_attempts[key].append(part_attempt)

for key in partitioned_part_attempts:

results = process_tutor_messages(partitioned_part_attempts[key], context)
all_results.extend(results)

# Calculate total number of chunks based on combined results
total_number_of_chunks = calculate_number_of_chunks(len(all_results), chunk_size)

# Every XML chunk should have the <?xml ?> directive and the
# outermost tutor_related_message_sequence element
chunk_prefix = """<?xml version= \"1.0\" encoding= \"UTF-8\"?>
Expand All @@ -93,19 +121,19 @@ def generate_datashop(context):

# Save the collected results as an XML chunk to S3
save_xml_chunk(chunk_data, s3_client, target_prefix, chunk_index, results_bucket_name=context["results_bucket_name"])
print(f"Successfully processed chunk {chunk_index + 1}/{number_of_chunks}")
print(f"Successfully processed chunk {chunk_index + 1}/{total_number_of_chunks}")

except Exception as e:
print(f"Error processing chunk {chunk_index + 1}/{number_of_chunks}: {e}")
print(f"Error processing chunk {chunk_index + 1}/{total_number_of_chunks}: {e}")

# Build and save JSON and HTML manifests, resetting the lookup so we don't preserve it
context['lookup'] = {}
build_manifests(s3_client, context, number_of_chunks, "xml")
build_manifests(s3_client, context, total_number_of_chunks, "xml")

# Stop Spark context
sc.stop()

return number_of_chunks
return total_number_of_chunks


def generate_dataset(section_ids, action, context):
Expand Down
117 changes: 107 additions & 10 deletions dataset/datashop.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,20 +35,101 @@ def handle_datashop(bucket_key, context, excluded_indices):
global_context["last_good_context_message_id"] = None

for line in content.splitlines():
# parse one line of json
j = json.loads(line)

student_id = j["actor"]["account"]["name"]
project_matches = context["project_id"] is None or context["project_id"] == j["context"]["extensions"]["http://oli.cmu.edu/extensions/project_id"]

if student_id not in context["ignored_student_ids"] and project_matches:
if j["object"]["definition"]["type"] == "http://adlnet.gov/expapi/activities/question":
o = to_xml_message(j, lookup)
values.append(o)
if not line.strip():
continue

try:
# Parse JSON line
j = json.loads(line)

student_id = j["actor"]["account"]["name"]
project_matches = (context.get("project_id") is None or
context.get("project_id") == j["context"]["extensions"]["http://oli.cmu.edu/extensions/project_id"])

if student_id not in context.get("ignored_student_ids", []) and project_matches:
# Process different types of messages
obj_type = j["object"]["definition"]["type"]

if obj_type == "http://adlnet.gov/expapi/activities/question":
# Handle attempt_evaluated messages
o = to_xml_message(j, lookup)
values.append(o)

elif obj_type == "http://oli.cmu.edu/extensions/tutor_message":
# Handle tutor_message messages
o = process_tutor_message(j, lookup)
if o:
values.append(o)

except json.JSONDecodeError as e:
print(f"Error parsing JSON line: {e}")
continue
except Exception as e:
print(f"Error processing line: {e}")
continue

return values

def process_tutor_message(j, lookup):
    """
    Convert one tutor_message XAPI statement into DataShop-ready XML text.

    The statement's result.message field carries raw XML. Each top-level
    child element gets its <meta> child (if any) replaced with one
    generated from the statement's actor and timestamp, and the rewritten
    children are concatenated into a single string.

    Returns the indented XML string, or None if processing fails.
    """
    try:
        root = ET.fromstring(j["result"]["message"])

        # Minimal context so determine_student_id can resolve (and, when
        # configured, anonymize) the acting student.
        student = determine_student_id(
            {'lookup': lookup, 'anonymize': lookup.get('anonymize', False)}, j)

        stamp = j['timestamp']
        meta_context = {
            'user_id': student,
            'session_id': f"{student} {stamp.replace('T', ' ').replace('Z', '')}",
            'time': stamp,
            'time_zone': 'GMT'
        }

        pieces = []
        for element in root:
            # Swap any pre-existing <meta> for a freshly generated one,
            # inserted as the first child.
            stale = element.find('meta')
            if stale is not None:
                element.remove(stale)
            element.insert(0, ET.fromstring(meta_xml(meta_context)))
            pieces.append(ET.tostring(element, encoding='unicode'))

        # Undo numeric entity escaping, then indent every line one level
        # for readability inside the final XML chunk.
        cleaned_message = unescape_numeric_entities("".join(pieces))
        cleaned_message = " " + cleaned_message.replace("\n", "\n ")

        return cleaned_message

    except Exception as e:
        print(f"Error processing tutor message: {e}")
        import traceback
        traceback.print_exc()
        return None

def meta_xml(context):
    """Create meta XML element.

    Renders the DataShop <meta> header block from a context dict.
    Keys read (each with a fallback default): "user_id", "session_id",
    "time" (passed through format_time), and "time_zone" (defaults to
    "GMT"). Every value is run through sanitize_element_text before
    being embedded, so the result is safe to parse as XML.
    """
    return f'''<meta>
<user_id>{sanitize_element_text(context.get("user_id", "Unknown"))}</user_id>
<session_id>{sanitize_element_text(context.get("session_id", "Unknown"))}</session_id>
<time>{sanitize_element_text(format_time(context.get("time")))}</time>
<time_zone>{sanitize_element_text(context.get("time_zone", "GMT"))}</time_zone>
</meta>'''

def process_part_attempts(part_attempts, context):

Expand All @@ -65,6 +146,22 @@ def process_part_attempts(part_attempts, context):

return values

def process_tutor_messages(part_attempts, context):
    """Convert a partition of tutor_message records into DataShop XML strings.

    Mirrors process_part_attempts: pulls the shared lookup out of the job
    context, records the anonymize flag on it (process_tutor_message reads
    the flag off the lookup dict), and resets the cross-record message-id
    state before processing the partition.

    Returns a list of XML strings, one per successfully converted record.
    """
    lookup = context['lookup']
    lookup['anonymize'] = context['anonymize']

    # Reset shared state so message ids from a previous partition don't leak.
    global_context["last_good_context_message_id"] = None

    values = []
    for part_attempt in part_attempts:
        o = process_tutor_message(part_attempt, lookup)
        # process_tutor_message returns None on failure; skip those so Nones
        # don't flow into the aggregated XML results (same guard used in
        # handle_datashop).
        if o is not None:
            values.append(o)

    return values


def process_jsonl_file(bucket_key, context, excluded_indices):
bucket_name, key = bucket_key

Expand Down