From 3948e71f3f4637bf4a0a0d5064c93775288292a4 Mon Sep 17 00:00:00 2001 From: Darren Siegel Date: Wed, 17 Dec 2025 10:16:40 -0500 Subject: [PATCH 1/2] debug mode --- dataset/attempts.py | 89 ++++++++++++++++++------------ dataset/dataset.py | 25 ++++++--- dataset/page_viewed.py | 73 ++++++++++++++++--------- dataset/video.py | 120 +++++++++++++++++++++++++---------------- job.py | 4 ++ 5 files changed, 198 insertions(+), 113 deletions(-) diff --git a/dataset/attempts.py b/dataset/attempts.py index ca8cfd0..e1a128f 100644 --- a/dataset/attempts.py +++ b/dataset/attempts.py @@ -1,49 +1,65 @@ import json +import logging import boto3 from dataset.utils import encode_array, encode_json, prune_fields from dataset.lookup import determine_student_id def attempts_handler(bucket_key, context, excluded_indices): - # Use the key to read in the file contents, split on line endings - bucket_name, key = bucket_key + """ + Entry point for attempts extraction. Any exception is swallowed so the + Spark job cannot be aborted by a bad record or failed download. + """ + try: + # Use the key to read in the file contents, split on line endings + bucket_name, key = bucket_key - # Create a session using the specified profile - s3_client = boto3.client('s3') - - response = s3_client.get_object(Bucket=bucket_name, Key=key) + # Create a session using the specified profile + s3_client = boto3.client('s3') - # Read the contents of the file - content = response['Body'].read().decode('utf-8') + response = s3_client.get_object(Bucket=bucket_name, Key=key) - values = [] + # Read the contents of the file + content = response['Body'].read().decode('utf-8') - subtypes = context["sub_types"] - - for line in content.splitlines(): - # parse one line of json - j = json.loads(line) + values = [] - student_id = j["actor"]["account"]["name"] - project_matches = context["project_id"] is None or context["project_id"] == j["context"]["extensions"]["http://oli.cmu.edu/extensions/project_id"] - page_matches = context["page_ids"] is None or j["context"]["extensions"]["http://oli.cmu.edu/extensions/page_id"] in context["page_ids"] - - if student_id not in context["ignored_student_ids"] and project_matches and page_matches: - if (("part_attempt_evaluated" in subtypes) or ("part_attempt_evaluted" in subtypes)) and j["object"]["definition"]["type"] == "http://adlnet.gov/expapi/activities/question": - o = from_part_attempt(j, context) - o = prune_fields(o, excluded_indices) - values.append(o) - elif "activity_attempt_evaluated" in subtypes and j["object"]["definition"]["type"] == "http://oli.cmu.edu/extensions/activity_attempt": - o = from_activity_attempt(j, context) - o = prune_fields(o, excluded_indices) - values.append(o) - elif (("page_attempt_evaluated" in subtypes) or ("page_attempt_evaluted" in subtypes)) and j["object"]["definition"]["type"] == "http://oli.cmu.edu/extensions/page_attempt": - o = from_page_attempt(j, context) - o = prune_fields(o, excluded_indices) - values.append(o) + subtypes = context["sub_types"] - - return values + for line in content.splitlines(): + if not line.strip(): + continue + try: + # parse one line of json + j = json.loads(line) + + student_id = j["actor"]["account"]["name"] + project_matches = context["project_id"] is None or context["project_id"] == j["context"]["extensions"]["http://oli.cmu.edu/extensions/project_id"] + page_matches = context["page_ids"] is None or j["context"]["extensions"]["http://oli.cmu.edu/extensions/page_id"] in context["page_ids"] + + if student_id not in context["ignored_student_ids"] and project_matches and page_matches: + if (("part_attempt_evaluated" in subtypes) or ("part_attempt_evaluted" in subtypes)) and j["object"]["definition"]["type"] == "http://adlnet.gov/expapi/activities/question": + o = from_part_attempt(j, context) + o = prune_fields(o, excluded_indices) + values.append(o) + elif "activity_attempt_evaluated" in subtypes and j["object"]["definition"]["type"] == "http://oli.cmu.edu/extensions/activity_attempt": + o = from_activity_attempt(j, context) + o = prune_fields(o, excluded_indices) + values.append(o) + elif (("page_attempt_evaluated" in subtypes) or ("page_attempt_evaluted" in subtypes)) and j["object"]["definition"]["type"] == "http://oli.cmu.edu/extensions/page_attempt": + o = from_page_attempt(j, context) + o = prune_fields(o, excluded_indices) + values.append(o) + except Exception: + logging.exception("attempts_handler: skipping malformed line in %s", key) + continue + + return values + + except Exception: + logging.exception("attempts_handler: failed to process %s", bucket_key) + debug_log(context, f"Failed to process {bucket_key}") + return [] def from_part_attempt(value, context): @@ -123,3 +139,10 @@ def from_page_attempt(value, context): None, #feedback None ] + + +def debug_log(context, message): + """Log a debug message if debugging is enabled in the context.""" + if context.get("debug", False): + print(f"DEBUG: {message}") + diff --git a/dataset/dataset.py b/dataset/dataset.py index 550977c..791bc43 100644 --- a/dataset/dataset.py +++ b/dataset/dataset.py @@ -29,15 +29,18 @@ def generate_datashop(context): section_ids = context["section_ids"] # Retrieve matching keys from S3 inventory + debug_log(context, "Listing keys from inventory") keys = list_keys_from_inventory(section_ids, "attempt_evaluated", source_bucket, inventory_bucket) + + debug_log(context, f"Found {len(keys)} keys from inventory") number_of_chunks = calculate_number_of_chunks(len(keys), chunk_size) - - print(f"Context: {context}") - print(f"Number of keys: {len(keys)}") - print(f"Number of chunks: {number_of_chunks}") + debug_log(context, f"Calculated number of chunks: {number_of_chunks}") # Retrieve the datashop lookup context lookup = retrieve_lookup(s3_client, context) + + debug_log(context, "Retrieved lookup data") + context['lookup'] = lookup @@ -157,15 +160,16 @@ def generate_dataset(section_ids, action, context): columns = prune_fields(columns, excluded_indices) # Download the additional lookup information file + debug_log(context, "Retrieving lookup data") context["lookup"] = retrieve_lookup(s3_client, context) # Retrieve matching keys from S3 inventory + debug_log(context, "Listing keys from inventory") keys = list_keys_from_inventory(section_ids, action, source_bucket, inventory_bucket) number_of_chunks = calculate_number_of_chunks(len(keys), chunk_size) - print(f"Context: {context}") - print(f"Number of keys: {len(keys)}") - print(f"Number of chunks: {number_of_chunks}") + debug_log(context, f"Calculated number of chunks: {number_of_chunks}") + debug_log(context, f"Found {len(keys)} keys from inventory") # Process keys in chunks, serially for chunk_index, chunk_keys in enumerate(chunkify(keys, chunk_size)): @@ -181,10 +185,12 @@ def generate_dataset(section_ids, action, context): print(f"Error processing chunk {chunk_index + 1}/{number_of_chunks}: {e}") # Build and save JSON and HTML manifests + debug_log(context, "Building manifests") context['lookup'] = {} build_manifests(s3_client, context, number_of_chunks, "csv") # Stop Spark context + debug_log(context, "Ending job, stopping Spark context") sc.stop() return number_of_chunks @@ -231,5 +237,8 @@ def build_manifests(s3_client, context, number_of_chunks, extension): build_html_manifest(s3_client, context, number_of_chunks, extension) build_json_manifest(s3_client, context, number_of_chunks, extension) - +def debug_log(context, message): + """Log a debug message if debugging is enabled in the context.""" + if context.get("debug", False): + print(f"DEBUG: {message}") diff --git a/dataset/page_viewed.py b/dataset/page_viewed.py index 25d6bd8..fe93020 100644 --- a/dataset/page_viewed.py +++ b/dataset/page_viewed.py @@ -1,37 +1,53 @@ import json +import logging import boto3 from dataset.utils import prune_fields from dataset.lookup import determine_student_id def page_viewed_handler(bucket_key, context, excluded_indices): - # Use the key to read in the file contents, split on line endings - bucket_name, key = bucket_key - - # Create a session using the specified profile - s3_client = boto3.client('s3') - - response = s3_client.get_object(Bucket=bucket_name, Key=key) - - # Read the contents of the file - content = response['Body'].read().decode('utf-8') - - values = [] - - for line in content.splitlines(): - # parse one line of json - j = json.loads(line) - - student_id = j["actor"]["account"]["name"] - project_matches = context["project_id"] is None or context["project_id"] == j["context"]["extensions"]["http://oli.cmu.edu/extensions/project_id"] - page_matches = context["page_ids"] is None or j["context"]["extensions"]["http://oli.cmu.edu/extensions/page_id"] in context["page_ids"] + """ + Entry point for page viewed extraction. Never raise so Spark job keeps going. + """ + try: + # Use the key to read in the file contents, split on line endings + bucket_name, key = bucket_key + + # Create a session using the specified profile + s3_client = boto3.client('s3') - if student_id not in context["ignored_student_ids"] and project_matches and page_matches: - o = from_page_viewed(j, context) - o = prune_fields(o, excluded_indices) - values.append(o) + response = s3_client.get_object(Bucket=bucket_name, Key=key) + + # Read the contents of the file + content = response['Body'].read().decode('utf-8') + + values = [] + + for line in content.splitlines(): + if not line.strip(): + continue + try: + # parse one line of json + j = json.loads(line) + + student_id = j["actor"]["account"]["name"] + project_matches = context["project_id"] is None or context["project_id"] == j["context"]["extensions"]["http://oli.cmu.edu/extensions/project_id"] + page_matches = context["page_ids"] is None or j["context"]["extensions"]["http://oli.cmu.edu/extensions/page_id"] in context["page_ids"] + + if student_id not in context["ignored_student_ids"] and project_matches and page_matches: + o = from_page_viewed(j, context) + o = prune_fields(o, excluded_indices) + values.append(o) + except Exception: + logging.exception("page_viewed_handler: skipping malformed line in %s", key) + continue - return values + return values + + except Exception: + logging.exception("page_viewed_handler: failed to process %s", bucket_key) + debug_log(context, f"Failed to process {bucket_key}") + return [] def from_page_viewed(value, context): return [ @@ -45,3 +61,10 @@ def from_page_viewed(value, context): value["context"]["extensions"]["http://oli.cmu.edu/extensions/page_attempt_guid"], value["context"]["extensions"]["http://oli.cmu.edu/extensions/page_attempt_number"] ] + + +def debug_log(context, message): + """Log a debug message if debugging is enabled in the context.""" + if context.get("debug", False): + print(f"DEBUG: {message}") + diff --git a/dataset/video.py b/dataset/video.py index 3daa34a..f4dbf6a 100644 --- a/dataset/video.py +++ b/dataset/video.py @@ -1,57 +1,73 @@ import json +import logging import boto3 from dataset.utils import prune_fields from dataset.lookup import determine_student_id + def video_handler(bucket_key, context, excluded_indices): - # Use the key to read in the file contents, split on line endings - bucket_name, key = bucket_key - - # Create a session using the specified profile - s3_client = boto3.client('s3') - - response = s3_client.get_object(Bucket=bucket_name, Key=key) - - # Read the contents of the file - content = response['Body'].read().decode('utf-8') - - values = [] - - subtypes = context["sub_types"] - - for line in content.splitlines(): - # parse one line of json - j = json.loads(line) - - student_id = j["actor"]["account"]["name"] - short_verb = j["verb"]["display"]["en-US"] - - project_matches = context["project_id"] is None or context["project_id"] == j["context"]["extensions"]["http://oli.cmu.edu/extensions/project_id"] - page_matches = context["page_ids"] is None or j["context"]["extensions"]["http://oli.cmu.edu/extensions/resource_id"] in context["page_ids"] - - if student_id not in context["ignored_student_ids"] and project_matches and page_matches: - if short_verb in subtypes: - if short_verb == "played": - o = from_played(j, context) - o = prune_fields(o, excluded_indices) - values.append(o) - elif short_verb == "paused": - o = from_paused(j, context) - o = prune_fields(o, excluded_indices) - values.append(o) - elif short_verb == "seeked": - o = from_seeked(j, context) - o = prune_fields(o, excluded_indices) - values.append(o) - elif short_verb == "completed": - o = from_completed(j, context) - o = prune_fields(o, excluded_indices) - values.append(o) - - - return values - + """ + Entry point for video events extraction. Never raise to keep Spark job alive. + """ + try: + # Use the key to read in the file contents, split on line endings + bucket_name, key = bucket_key + + # Create a session using the specified profile + s3_client = boto3.client('s3') + + response = s3_client.get_object(Bucket=bucket_name, Key=key) + + # Read the contents of the file + content = response['Body'].read().decode('utf-8') + + values = [] + + subtypes = context["sub_types"] + + for line in content.splitlines(): + if not line.strip(): + continue + try: + # parse one line of json + j = json.loads(line) + + student_id = j["actor"]["account"]["name"] + short_verb = j["verb"]["display"]["en-US"] + + project_matches = context["project_id"] is None or context["project_id"] == j["context"]["extensions"]["http://oli.cmu.edu/extensions/project_id"] + page_matches = context["page_ids"] is None or j["context"]["extensions"]["http://oli.cmu.edu/extensions/resource_id"] in context["page_ids"] + + if student_id not in context["ignored_student_ids"] and project_matches and page_matches: + if short_verb in subtypes: + if short_verb == "played": + o = from_played(j, context) + o = prune_fields(o, excluded_indices) + values.append(o) + elif short_verb == "paused": + o = from_paused(j, context) + o = prune_fields(o, excluded_indices) + values.append(o) + elif short_verb == "seeked": + o = from_seeked(j, context) + o = prune_fields(o, excluded_indices) + values.append(o) + elif short_verb == "completed": + o = from_completed(j, context) + o = prune_fields(o, excluded_indices) + values.append(o) + except Exception: + logging.exception("video_handler: skipping malformed line in %s", key) + continue + + return values + + except Exception: + logging.exception("video_handler: failed to process %s", bucket_key) + debug_log(context, f"Failed to process {bucket_key}") + return [] + def from_played(value, context): return [ @@ -74,6 +90,7 @@ def from_played(value, context): None ] + def from_paused(value, context): return [ "paused", @@ -95,6 +112,7 @@ def from_paused(value, context): value["result"]["extensions"]["https://w3id.org/xapi/video/extensions/progress"] ] + def from_seeked(value, context): return [ "seeked", @@ -116,6 +134,7 @@ def from_seeked(value, context): None ] + def from_completed(value, context): return [ "completed", @@ -136,3 +155,10 @@ def from_completed(value, context): value["result"]["extensions"]["https://w3id.org/xapi/video/extensions/played-segments"], value["result"]["extensions"]["https://w3id.org/xapi/video/extensions/progress"] ] + + +def debug_log(context, message): + """Log a debug message if debugging is enabled in the context.""" + if context.get("debug", False): + print(f"DEBUG: {message}") + diff --git a/job.py b/job.py index 8815d9f..e3f3623 100644 --- a/job.py +++ b/job.py @@ -18,6 +18,7 @@ parser.add_argument("--anonymize", required=False, help="Whether to anonymize students") parser.add_argument("--exclude_fields", required=False, help="List of fields to exclude") parser.add_argument("--enforce_project_id", required=False, help="Project id to ensure the data is from this project") + parser.add_argument("--debug", required=False, help="Enables detailed logging for debugging purposes") args = parser.parse_args() @@ -42,6 +43,8 @@ project_id = args.enforce_project_id if args.enforce_project_id else None project_id = guarentee_int(project_id) + debug = args.debug == "true" + context = { "bucket_name": bucket_name, "inventory_bucket_name": inventory_bucket_name, @@ -56,6 +59,7 @@ "exclude_fields": exclude_fields, "project_id": project_id, "anonymize": anonymize, + "debug": debug } action = args.action From 5eda5e052bd20f61904a7d49114e847edafea59b Mon Sep 17 00:00:00 2001 From: Darren Siegel Date: Wed, 17 Dec 2025 10:20:16 -0500 Subject: [PATCH 2/2] updates --- dataset/attempts.py | 11 ++++------- dataset/page_viewed.py | 11 ++++------- dataset/video.py | 11 ++++------- 3 files changed, 12 insertions(+), 21 deletions(-) diff --git a/dataset/attempts.py b/dataset/attempts.py index e1a128f..d627c33 100644 --- a/dataset/attempts.py +++ b/dataset/attempts.py @@ -1,5 +1,4 @@ import json -import logging import boto3 from dataset.utils import encode_array, encode_json, prune_fields @@ -50,15 +49,14 @@ def attempts_handler(bucket_key, context, excluded_indices): o = from_page_attempt(j, context) o = prune_fields(o, excluded_indices) values.append(o) - except Exception: - logging.exception("attempts_handler: skipping malformed line in %s", key) + except Exception as exc: + debug_log(context, f"attempts_handler: skipping malformed line in {key}: {exc}") continue return values - except Exception: - logging.exception("attempts_handler: failed to process %s", bucket_key) - debug_log(context, f"Failed to process {bucket_key}") + except Exception as exc: + debug_log(context, f"attempts_handler: failed to process {bucket_key}: {exc}") return [] @@ -145,4 +143,3 @@ def debug_log(context, message): """Log a debug message if debugging is enabled in the context.""" if context.get("debug", False): print(f"DEBUG: {message}") - diff --git a/dataset/page_viewed.py b/dataset/page_viewed.py index fe93020..daab23a 100644 --- a/dataset/page_viewed.py +++ b/dataset/page_viewed.py @@ -1,5 +1,4 @@ import json -import logging import boto3 from dataset.utils import prune_fields @@ -38,15 +37,14 @@ def page_viewed_handler(bucket_key, context, excluded_indices): o = from_page_viewed(j, context) o = prune_fields(o, excluded_indices) values.append(o) - except Exception: - logging.exception("page_viewed_handler: skipping malformed line in %s", key) + except Exception as exc: + debug_log(context, f"page_viewed_handler: skipping malformed line in {key}: {exc}") continue return values - except Exception: - logging.exception("page_viewed_handler: failed to process %s", bucket_key) - debug_log(context, f"Failed to process {bucket_key}") + except Exception as exc: + debug_log(context, f"page_viewed_handler: failed to process {bucket_key}: {exc}") return [] def from_page_viewed(value, context): @@ -67,4 +65,3 @@ def debug_log(context, message): """Log a debug message if debugging is enabled in the context.""" if context.get("debug", False): print(f"DEBUG: {message}") - diff --git a/dataset/video.py b/dataset/video.py index f4dbf6a..67978b7 100644 --- a/dataset/video.py +++ b/dataset/video.py @@ -1,5 +1,4 @@ import json -import logging import boto3 from dataset.utils import prune_fields @@ -57,15 +56,14 @@ def video_handler(bucket_key, context, excluded_indices): o = from_completed(j, context) o = prune_fields(o, excluded_indices) values.append(o) - except Exception: - logging.exception("video_handler: skipping malformed line in %s", key) + except Exception as exc: + debug_log(context, f"video_handler: skipping malformed line in {key}: {exc}") continue return values - except Exception: - logging.exception("video_handler: failed to process %s", bucket_key) - debug_log(context, f"Failed to process {bucket_key}") + except Exception as exc: + debug_log(context, f"video_handler: failed to process {bucket_key}: {exc}") return [] @@ -161,4 +159,3 @@ def debug_log(context, message): """Log a debug message if debugging is enabled in the context.""" if context.get("debug", False): print(f"DEBUG: {message}") -