Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
86 changes: 53 additions & 33 deletions dataset/attempts.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,45 +5,59 @@
from dataset.lookup import determine_student_id

def attempts_handler(bucket_key, context, excluded_indices):
    """
    Extract attempt rows from one S3-hosted xAPI event file.

    Entry point for attempts extraction, used as a Spark map task. Any
    exception — a failed download or a malformed record — is swallowed so a
    single bad key cannot abort the whole job.

    Args:
        bucket_key: ``(bucket_name, key)`` tuple identifying the S3 object.
        context: job configuration dict; this function reads ``sub_types``,
            ``project_id``, ``page_ids``, ``ignored_student_ids`` (and
            ``debug`` indirectly via ``debug_log``).
        excluded_indices: column indices removed by ``prune_fields``.

    Returns:
        A list of pruned attempt rows; ``[]`` when the object cannot be
        fetched or processed at all.
    """
    try:
        # Use the key to read in the file contents, split on line endings
        bucket_name, key = bucket_key

        # Credentials come from the default boto3 environment/role.
        s3_client = boto3.client('s3')

        response = s3_client.get_object(Bucket=bucket_name, Key=key)

        # Read the contents of the file; one JSON event per line
        content = response['Body'].read().decode('utf-8')

        values = []

        subtypes = context["sub_types"]

        for line in content.splitlines():
            # Skip blank lines so json.loads is not fed empty input
            if not line.strip():
                continue
            try:
                # parse one line of json
                j = json.loads(line)

                student_id = j["actor"]["account"]["name"]
                # None acts as a wildcard for both project and page filters
                project_matches = context["project_id"] is None or context["project_id"] == j["context"]["extensions"]["http://oli.cmu.edu/extensions/project_id"]
                page_matches = context["page_ids"] is None or j["context"]["extensions"]["http://oli.cmu.edu/extensions/page_id"] in context["page_ids"]

                if student_id not in context["ignored_student_ids"] and project_matches and page_matches:
                    # "part_attempt_evaluted" / "page_attempt_evaluted" keep a
                    # historical misspelling that is accepted alongside the
                    # correct spelling for backward compatibility.
                    if (("part_attempt_evaluated" in subtypes) or ("part_attempt_evaluted" in subtypes)) and j["object"]["definition"]["type"] == "http://adlnet.gov/expapi/activities/question":
                        o = from_part_attempt(j, context)
                        o = prune_fields(o, excluded_indices)
                        values.append(o)
                    elif "activity_attempt_evaluated" in subtypes and j["object"]["definition"]["type"] == "http://oli.cmu.edu/extensions/activity_attempt":
                        o = from_activity_attempt(j, context)
                        o = prune_fields(o, excluded_indices)
                        values.append(o)
                    elif (("page_attempt_evaluated" in subtypes) or ("page_attempt_evaluted" in subtypes)) and j["object"]["definition"]["type"] == "http://oli.cmu.edu/extensions/page_attempt":
                        o = from_page_attempt(j, context)
                        o = prune_fields(o, excluded_indices)
                        values.append(o)
            except Exception as exc:
                # Deliberate best-effort: log (when debug is on) and move on
                debug_log(context, f"attempts_handler: skipping malformed line in {key}: {exc}")
                continue

        return values

    except Exception as exc:
        # Deliberate best-effort: never let one bad S3 object kill the job
        debug_log(context, f"attempts_handler: failed to process {bucket_key}: {exc}")
        return []


def from_part_attempt(value, context):
Expand Down Expand Up @@ -123,3 +137,9 @@ def from_page_attempt(value, context):
None, #feedback
None
]


def debug_log(context, message):
    """Emit *message* with a ``DEBUG:`` prefix when debugging is enabled.

    A no-op unless the optional ``debug`` flag in *context* is truthy;
    a missing key counts as disabled.
    """
    if not context.get("debug", False):
        return
    print(f"DEBUG: {message}")
25 changes: 17 additions & 8 deletions dataset/dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,18 @@ def generate_datashop(context):
section_ids = context["section_ids"]

# Retrieve matching keys from S3 inventory
debug_log(context, "Listing keys from inventory")
keys = list_keys_from_inventory(section_ids, "attempt_evaluated", source_bucket, inventory_bucket)

debug_log(context, f"Found {len(keys)} keys from inventory")
number_of_chunks = calculate_number_of_chunks(len(keys), chunk_size)

print(f"Context: {context}")
print(f"Number of keys: {len(keys)}")
print(f"Number of chunks: {number_of_chunks}")
debug_log(context, f"Calculated number of chunks: {number_of_chunks}")

# Retrieve the datashop lookup context
lookup = retrieve_lookup(s3_client, context)

debug_log(context, "Retrieved lookup data")

context['lookup'] = lookup


Expand Down Expand Up @@ -157,15 +160,16 @@ def generate_dataset(section_ids, action, context):
columns = prune_fields(columns, excluded_indices)

# Download the additional lookup information file
debug_log(context, "Retrieving lookup data")
context["lookup"] = retrieve_lookup(s3_client, context)

# Retrieve matching keys from S3 inventory
debug_log(context, "Listing keys from inventory")
keys = list_keys_from_inventory(section_ids, action, source_bucket, inventory_bucket)
number_of_chunks = calculate_number_of_chunks(len(keys), chunk_size)

print(f"Context: {context}")
print(f"Number of keys: {len(keys)}")
print(f"Number of chunks: {number_of_chunks}")
debug_log(context, f"Calculated number of chunks: {number_of_chunks}")
debug_log(context, f"Found {len(keys)} keys from inventory")

# Process keys in chunks, serially
for chunk_index, chunk_keys in enumerate(chunkify(keys, chunk_size)):
Expand All @@ -181,10 +185,12 @@ def generate_dataset(section_ids, action, context):
print(f"Error processing chunk {chunk_index + 1}/{number_of_chunks}: {e}")

# Build and save JSON and HTML manifests
debug_log(context, "Building manifests")
context['lookup'] = {}
build_manifests(s3_client, context, number_of_chunks, "csv")

# Stop Spark context
debug_log(context, "Ending job, stopping Spark context")
sc.stop()

return number_of_chunks
Expand Down Expand Up @@ -231,5 +237,8 @@ def build_manifests(s3_client, context, number_of_chunks, extension):
build_html_manifest(s3_client, context, number_of_chunks, extension)
build_json_manifest(s3_client, context, number_of_chunks, extension)


def debug_log(context, message):
    """Emit *message* with a ``DEBUG:`` prefix when debugging is enabled.

    A no-op unless the optional ``debug`` flag in *context* is truthy;
    a missing key counts as disabled.
    """
    if not context.get("debug", False):
        return
    print(f"DEBUG: {message}")

70 changes: 45 additions & 25 deletions dataset/page_viewed.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,33 +5,47 @@
from dataset.lookup import determine_student_id

def page_viewed_handler(bucket_key, context, excluded_indices):
    """
    Extract page-viewed rows from one S3-hosted xAPI event file.

    Entry point for page-viewed extraction, used as a Spark map task. Never
    raises: a failed download or a malformed record is swallowed so the
    Spark job keeps going.

    Args:
        bucket_key: ``(bucket_name, key)`` tuple identifying the S3 object.
        context: job configuration dict; this function reads ``project_id``,
            ``page_ids``, ``ignored_student_ids`` (and ``debug`` indirectly
            via ``debug_log``).
        excluded_indices: column indices removed by ``prune_fields``.

    Returns:
        A list of pruned page-viewed rows; ``[]`` when the object cannot be
        fetched or processed at all.
    """
    try:
        # Use the key to read in the file contents, split on line endings
        bucket_name, key = bucket_key

        # Credentials come from the default boto3 environment/role.
        s3_client = boto3.client('s3')

        response = s3_client.get_object(Bucket=bucket_name, Key=key)

        # Read the contents of the file; one JSON event per line
        content = response['Body'].read().decode('utf-8')

        values = []

        for line in content.splitlines():
            # Skip blank lines so json.loads is not fed empty input
            if not line.strip():
                continue
            try:
                # parse one line of json
                j = json.loads(line)

                student_id = j["actor"]["account"]["name"]
                # None acts as a wildcard for both project and page filters
                project_matches = context["project_id"] is None or context["project_id"] == j["context"]["extensions"]["http://oli.cmu.edu/extensions/project_id"]
                page_matches = context["page_ids"] is None or j["context"]["extensions"]["http://oli.cmu.edu/extensions/page_id"] in context["page_ids"]

                if student_id not in context["ignored_student_ids"] and project_matches and page_matches:
                    o = from_page_viewed(j, context)
                    o = prune_fields(o, excluded_indices)
                    values.append(o)
            except Exception as exc:
                # Deliberate best-effort: log (when debug is on) and move on
                debug_log(context, f"page_viewed_handler: skipping malformed line in {key}: {exc}")
                continue

        return values

    except Exception as exc:
        # Deliberate best-effort: never let one bad S3 object kill the job
        debug_log(context, f"page_viewed_handler: failed to process {bucket_key}: {exc}")
        return []

def from_page_viewed(value, context):
return [
Expand All @@ -45,3 +59,9 @@ def from_page_viewed(value, context):
value["context"]["extensions"]["http://oli.cmu.edu/extensions/page_attempt_guid"],
value["context"]["extensions"]["http://oli.cmu.edu/extensions/page_attempt_number"]
]


def debug_log(context, message):
    """Emit *message* with a ``DEBUG:`` prefix when debugging is enabled.

    A no-op unless the optional ``debug`` flag in *context* is truthy;
    a missing key counts as disabled.
    """
    if not context.get("debug", False):
        return
    print(f"DEBUG: {message}")
Loading