diff --git a/pipit/readers/core_reader.py b/pipit/readers/core_reader.py index db2dbaf..3f82644 100644 --- a/pipit/readers/core_reader.py +++ b/pipit/readers/core_reader.py @@ -1,6 +1,7 @@ from typing import List, Dict import pandas +import numpy as np from pipit.trace import Trace @@ -102,9 +103,9 @@ def finalize(self): "Name": "category", "Event Type": "category", "Process": "category", - "_matching_event": "int32", - "_parent": "int32", - "_matching_timestamp": "int32", + "_matching_event": "Int32", + "_parent": "Int32", + "_matching_timestamp": "Int32", } ) return trace_df @@ -118,7 +119,7 @@ def __update_parent_child_relationships( """ if len(stack) == 0: # root event - event["_parent"] = -1 + event["_parent"] = np.nan else: parent_event = event_list[stack[-1]] event["_parent"] = parent_event["unique_id"] diff --git a/pipit/readers/projections_reader.py b/pipit/readers/projections_reader.py index 0a2044c..09f5593 100644 --- a/pipit/readers/projections_reader.py +++ b/pipit/readers/projections_reader.py @@ -5,10 +5,12 @@ import os import gzip -import pipit.trace + import pandas as pd import multiprocessing as mp +from pipit.readers.core_reader import CoreTraceReader, concat_trace_data + class ProjectionsConstants: """ @@ -83,8 +85,6 @@ class ProjectionsConstants: class STSReader: def __init__(self, file_location): - self.sts_file = open(file_location, "r") # self.chares = {} - # In 'self.entries', each entry stores (entry_name: str, chare_id: int) self.entries = {} @@ -94,7 +94,7 @@ def __init__(self, file_location): # Stores user stat names: {user_event_id: user stat name} self.user_stats = {} - self.read_sts_file() + self.read_sts_file(file_location) # to get name of entry print > def get_entry_name(self, entry_id): @@ -132,93 +132,94 @@ def get_num_perf_counts(self): def get_event_name(self, event_id): return self.user_events[event_id] - def read_sts_file(self): - for line in self.sts_file: - line_arr = line.split() - - # Note: I'm disregarding TOTAL_STATS 
and TOTAL_EVENTS, because - # projections reader disregards them - - # Note: currently not reading/storing VERSION, MACHINE, SMPMODE, - # COMMANDLINE, CHARMVERSION, USERNAME, HOSTNAME - - # create chares array - # In 'self.chares', each entry stores (chare_name: str, dimension: int) - if line_arr[0] == "TOTAL_CHARES": - total_chares = int(line_arr[1]) - self.chares = [None] * total_chares - - elif line_arr[0] == "TOTAL_EPS": - self.num_eps = int(line_arr[1]) - - # get num processors - elif line_arr[0] == "PROCESSORS": - self.num_pes = int(line_arr[1]) - - # create message array - elif line_arr[0] == "TOTAL_MSGS": - total_messages = int(line_arr[1]) - self.message_table = [None] * total_messages - elif line_arr[0] == "TIMESTAMP": - self.timestamp_string = line_arr[1] - - # Add to self.chares - elif line_arr[0] == "CHARE": - id = int(line_arr[1]) - name = " ".join(line_arr[2:-1])[1:-1] - dimensions = int(line_arr[-1]) - self.chares[id] = (name, dimensions) - - # add to self.entries - elif line_arr[0] == "ENTRY": - # Need to concat entry_name - while not line_arr[3].endswith('"'): - line_arr[3] = line_arr[3] + " " + line_arr[4] - del line_arr[4] - - id = int(line_arr[2]) - entry_name = line_arr[3][1 : len(line_arr[3]) - 1] - chare_id = int(line_arr[4]) - self.entries[id] = (entry_name, chare_id) - - # Add to message_table - # Need clarification on this, as message_table is never referenced in - # projections - elif line_arr[0] == "MESSAGE": - id = int(line_arr[1]) - message_size = int(line_arr[2]) - self.message_table[id] = message_size - - # Read/store event - elif line_arr[0] == "EVENT": - id = int(line_arr[1]) - event_name = "" - # rest of line is the event name - for i in range(2, len(line_arr)): - event_name = event_name + line_arr[i] + " " - self.user_events[id] = event_name - - # Read/store user stat - elif line_arr[0] == "STAT": - id = int(line_arr[1]) - event_name = "" - # rest of line is the stat - for i in range(2, len(line_arr)): - event_name = event_name 
+ line_arr[i] + " " - self.user_stats[id] = event_name - - # create papi array - elif line_arr[0] == "TOTAL_PAPI_EVENTS": - num_papi_events = int(line_arr[1]) - self.papi_event_names = [None] * num_papi_events - - # Unsure of what these are for - elif line_arr[0] == "PAPI_EVENT": - id = int(line_arr[1]) - papi_event = line_arr[2] - self.papi_event_names[id] = papi_event - - self.sts_file.close() + def read_sts_file(self, file_path): + with open(file_path, "r") as sts_file: + for line in sts_file: + line_arr = line.split() + + # Note: I'm disregarding TOTAL_STATS and TOTAL_EVENTS, because + # projections reader disregards them + + # Note: currently not reading/storing VERSION, MACHINE, SMPMODE, + # COMMANDLINE, CHARMVERSION, USERNAME, HOSTNAME + + # create chares array + # In 'self.chares', each entry stores (chare_name: str, dimension: int) + if line_arr[0] == "TOTAL_CHARES": + total_chares = int(line_arr[1]) + self.chares = [None] * total_chares + + elif line_arr[0] == "TOTAL_EPS": + self.num_eps = int(line_arr[1]) + + # get num processors + elif line_arr[0] == "PROCESSORS": + self.num_pes = int(line_arr[1]) + + # create message array + elif line_arr[0] == "TOTAL_MSGS": + total_messages = int(line_arr[1]) + self.message_table = [None] * total_messages + elif line_arr[0] == "TIMESTAMP": + self.timestamp_string = line_arr[1] + + # Add to self.chares + elif line_arr[0] == "CHARE": + id = int(line_arr[1]) + name = " ".join(line_arr[2:-1])[1:-1] + dimensions = int(line_arr[-1]) + self.chares[id] = (name, dimensions) + # print(int(line_arr[1]), line_arr[2][1:len(line_arr[2]) - 1]) + + # add to self.entries + elif line_arr[0] == "ENTRY": + # Need to concat entry_name + while not line_arr[3].endswith('"'): + line_arr[3] = line_arr[3] + " " + line_arr[4] + del line_arr[4] + + id = int(line_arr[2]) + entry_name = line_arr[3][1 : len(line_arr[3]) - 1] + chare_id = int(line_arr[4]) + # name = self.chares[chare_id][0] + '::' + entry_name + self.entries[id] = (entry_name,
chare_id) + + # Add to message_table + # Need clarification on this, as message_table is never referenced in + # projections + elif line_arr[0] == "MESSAGE": + id = int(line_arr[1]) + message_size = int(line_arr[2]) + self.message_table[id] = message_size + + # Read/store event + elif line_arr[0] == "EVENT": + id = int(line_arr[1]) + event_name = "" + # rest of line is the event name + for i in range(2, len(line_arr)): + event_name = event_name + line_arr[i] + " " + self.user_events[id] = event_name + + # Read/store user stat + elif line_arr[0] == "STAT": + id = int(line_arr[1]) + event_name = "" + # rest of line is the stat + for i in range(2, len(line_arr)): + event_name = event_name + line_arr[i] + " " + self.user_stats[id] = event_name + + # create papi array + elif line_arr[0] == "TOTAL_PAPI_EVENTS": + num_papi_events = int(line_arr[1]) + self.papi_event_names = [None] * num_papi_events + + # Unsure of what these are for + elif line_arr[0] == "PAPI_EVENT": + id = int(line_arr[1]) + papi_event = line_arr[2] + self.papi_event_names[id] = papi_event class ProjectionsReader: @@ -245,7 +246,8 @@ def __init__( if not hasattr(self, "executable_location"): raise ValueError("Invalid directory for projections - no sts files found.") - self.num_pes = STSReader(self.executable_location + ".sts").num_pes + self.sts_reader = STSReader(self.executable_location + ".sts") + self.num_pes = self.sts_reader.num_pes # make sure all the log files exist for i in range(self.num_pes): @@ -271,17 +273,6 @@ def __init__( self.create_cct = create_cct - # Returns an empty dict, used for reading log file into dataframe - @staticmethod - def _create_empty_dict() -> dict: - return { - "Name": [], - "Event Type": [], - "Timestamp (ns)": [], - "Process": [], - "Attributes": [], - } - def read(self): if self.num_pes < 1: return None @@ -292,46 +283,28 @@ def read(self): pool_size, pool = self.num_processes, mp.Pool(self.num_processes) # Read each log file and store as list of dataframes - 
dataframes_list = pool.map( + data_list = pool.map( self._read_log_file, [(rank, pool_size) for rank in range(pool_size)] ) pool.close() # Concatenate the dataframes list into dataframe containing entire trace - trace_df = pd.concat(dataframes_list, ignore_index=True) - trace_df.sort_values( - by="Timestamp (ns)", axis=0, ascending=True, inplace=True, ignore_index=True - ) - - # categorical for memory savings - trace_df = trace_df.astype( - { - "Name": "category", - "Event Type": "category", - "Process": "category", - } - ) - - # re-order columns - trace_df = trace_df[ - ["Timestamp (ns)", "Event Type", "Name", "Process", "Attributes"] - ] - trace = pipit.trace.Trace(None, trace_df) - if self.create_cct: - trace.create_cct() - - return trace + return concat_trace_data(data_list) def _read_log_file(self, rank_size) -> pd.DataFrame: + # has information needed in sts file - sts_reader = STSReader(self.executable_location + ".sts") + sts_reader = self.sts_reader rank, size = rank_size[0], rank_size[1] per_process = int(self.num_pes // size) remainder = int(self.num_pes % size) + # Start Core Reader + core_reader = CoreTraceReader(rank, size) + if rank < remainder: begin_int = rank * (per_process + 1) end_int = (rank + 1) * (per_process + 1) @@ -339,10 +312,7 @@ def _read_log_file(self, rank_size) -> pd.DataFrame: begin_int = (rank * per_process) + remainder end_int = ((rank + 1) * per_process) + remainder - dfs = [] for pe_num in range(begin_int, end_int, 1): - # create an empty dict to append to - data = self._create_empty_dict() # opening the log file we need to read log_file = gzip.open( @@ -363,7 +333,7 @@ def _read_log_file(self, rank_size) -> pd.DataFrame: details = {"From PE": pe} - _add_to_trace_dict(data, "Idle", "Enter", time, pe_num, details) + _add_to_trace(core_reader, "Idle", "Enter", time, pe_num, details) elif int(line_arr[0]) == ProjectionsConstants.END_IDLE: time = int(line_arr[1]) * 1000 @@ -371,7 +341,7 @@ def _read_log_file(self, rank_size) -> 
pd.DataFrame: details = {"From PE": pe} - _add_to_trace_dict(data, "Idle", "Leave", time, pe_num, details) + _add_to_trace(core_reader, "Idle", "Leave", time, pe_num, details) # Pack message to be sent elif int(line_arr[0]) == ProjectionsConstants.BEGIN_PACK: @@ -380,7 +350,7 @@ def _read_log_file(self, rank_size) -> pd.DataFrame: details = {"From PE": pe} - _add_to_trace_dict(data, "Pack", "Enter", time, pe_num, details) + _add_to_trace(core_reader, "Pack", "Enter", time, pe_num, details) elif int(line_arr[0]) == ProjectionsConstants.END_PACK: time = int(line_arr[1]) * 1000 @@ -388,7 +358,7 @@ def _read_log_file(self, rank_size) -> pd.DataFrame: details = {"From PE": pe} - _add_to_trace_dict(data, "Pack", "Leave", time, pe_num, details) + _add_to_trace(core_reader, "Pack", "Leave", time, pe_num, details) # Unpacking a received message elif int(line_arr[0]) == ProjectionsConstants.BEGIN_UNPACK: @@ -397,7 +367,7 @@ def _read_log_file(self, rank_size) -> pd.DataFrame: details = {"From PE": pe} - _add_to_trace_dict(data, "Unpack", "Enter", time, pe_num, details) + _add_to_trace(core_reader, "Unpack", "Enter", time, pe_num, details) elif int(line_arr[0]) == ProjectionsConstants.END_UNPACK: time = int(line_arr[1]) * 1000 @@ -405,14 +375,14 @@ def _read_log_file(self, rank_size) -> pd.DataFrame: details = {"From PE": pe} - _add_to_trace_dict(data, "Unpack", "Leave", time, pe_num, details) + _add_to_trace(core_reader, "Unpack", "Leave", time, pe_num, details) elif int(line_arr[0]) == ProjectionsConstants.USER_SUPPLIED: user_supplied = line_arr[1] details = {"User Supplied": user_supplied} - _add_to_trace_dict( - data, "User Supplied", "Instant", -1, pe_num, details + _add_to_trace( + core_reader, "User Supplied", "Instant", -1, pe_num, details ) elif int(line_arr[0]) == ProjectionsConstants.USER_SUPPLIED_NOTE: @@ -423,8 +393,13 @@ def _read_log_file(self, rank_size) -> pd.DataFrame: details = {"Note": note} - _add_to_trace_dict( - data, "User Supplied Note", "Instant", 
time, pe_num, details + _add_to_trace( + core_reader, + "User Supplied Note", + "Instant", + time, + pe_num, + details, ) # Not sure if this should be instant or enter/leave @@ -446,8 +421,8 @@ def _read_log_file(self, rank_size) -> pd.DataFrame: "Note": note, } - _add_to_trace_dict( - data, + _add_to_trace( + core_reader, "User Supplied Bracketed Note", "Enter", time, @@ -455,8 +430,8 @@ def _read_log_file(self, rank_size) -> pd.DataFrame: details, ) - _add_to_trace_dict( - data, + _add_to_trace( + core_reader, "User Supplied Bracketed Note", "Leave", end_time, @@ -471,8 +446,8 @@ def _read_log_file(self, rank_size) -> pd.DataFrame: details = {"Memory Usage": memory_usage} - _add_to_trace_dict( - data, "Memory Usage", "Instant", time, pe_num, details + _add_to_trace( + core_reader, "Memory Usage", "Instant", time, pe_num, details ) # New chare create message being sent @@ -494,8 +469,8 @@ def _read_log_file(self, rank_size) -> pd.DataFrame: "Send Time": send_time, } - _add_to_trace_dict( - data, + _add_to_trace( + core_reader, sts_reader.get_entry_name(entry), "Instant", time, @@ -526,8 +501,8 @@ def _read_log_file(self, rank_size) -> pd.DataFrame: "Destinatopn PEs": dest_procs, } - _add_to_trace_dict( - data, + _add_to_trace( + core_reader, sts_reader.get_entry_name(entry), "Instant", time, @@ -567,8 +542,8 @@ def _read_log_file(self, rank_size) -> pd.DataFrame: "perf counts list": perf_counts, } - _add_to_trace_dict( - data, + _add_to_trace( + core_reader, sts_reader.get_entry_name(entry), "Enter", time, @@ -599,8 +574,8 @@ def _read_log_file(self, rank_size) -> pd.DataFrame: "perf counts list": perf_counts, } - _add_to_trace_dict( - data, + _add_to_trace( + core_reader, sts_reader.get_entry_name(entry), "Leave", time, @@ -612,12 +587,12 @@ def _read_log_file(self, rank_size) -> pd.DataFrame: elif int(line_arr[0]) == ProjectionsConstants.BEGIN_TRACE: time = int(line_arr[1]) * 1000 - _add_to_trace_dict(data, "Trace", "Enter", time, pe_num, None) + 
_add_to_trace(core_reader, "Trace", "Enter", time, pe_num, None) elif int(line_arr[0]) == ProjectionsConstants.END_TRACE: time = int(line_arr[1]) * 1000 - _add_to_trace_dict(data, "Trace", "Leave", time, pe_num, None) + _add_to_trace(core_reader, "Trace", "Leave", time, pe_num, None) # Message Receive ? elif int(line_arr[0]) == ProjectionsConstants.MESSAGE_RECV: @@ -634,8 +609,8 @@ def _read_log_file(self, rank_size) -> pd.DataFrame: "Message Length": message_length, } - _add_to_trace_dict( - data, "Message Receive", "Instant", time, pe_num, details + _add_to_trace( + core_reader, "Message Receive", "Instant", time, pe_num, details ) # queueing creation ? @@ -647,7 +622,9 @@ def _read_log_file(self, rank_size) -> pd.DataFrame: details = {"From PE": pe, "Message Type": mtype, "Event ID": event} - _add_to_trace_dict(data, "Enque", "Instant", time, pe_num, details) + _add_to_trace( + core_reader, "Enque", "Instant", time, pe_num, details + ) elif int(line_arr[0]) == ProjectionsConstants.DEQUEUE: mtype = int(line_arr[1]) @@ -657,7 +634,9 @@ def _read_log_file(self, rank_size) -> pd.DataFrame: details = {"From PE": pe, "Message Type": mtype, "Event ID": event} - _add_to_trace_dict(data, "Deque", "Instant", time, pe_num, details) + _add_to_trace( + core_reader, "Deque", "Instant", time, pe_num, details + ) # Interrupt from different chare ? 
elif int(line_arr[0]) == ProjectionsConstants.BEGIN_INTERRUPT: @@ -667,8 +646,8 @@ def _read_log_file(self, rank_size) -> pd.DataFrame: details = {"From PE": pe, "Event ID": event} - _add_to_trace_dict( - data, "Interrupt", "Enter", time, pe_num, details + _add_to_trace( + core_reader, "Interrupt", "Enter", time, pe_num, details ) elif int(line_arr[0]) == ProjectionsConstants.END_INTERRUPT: @@ -678,20 +657,24 @@ def _read_log_file(self, rank_size) -> pd.DataFrame: details = {"From PE": pe, "Event ID": event} - _add_to_trace_dict( - data, "Interrupt", "Leave", time, pe_num, details + _add_to_trace( + core_reader, "Interrupt", "Leave", time, pe_num, details ) # Very start of the program - encapsulates every other event elif int(line_arr[0]) == ProjectionsConstants.BEGIN_COMPUTATION: time = int(line_arr[1]) * 1000 - _add_to_trace_dict(data, "Computation", "Enter", time, pe_num, None) + _add_to_trace( + core_reader, "Computation", "Enter", time, pe_num, None + ) elif int(line_arr[0]) == ProjectionsConstants.END_COMPUTATION: time = int(line_arr[1]) * 1000 - _add_to_trace_dict(data, "Computation", "Leave", time, pe_num, None) + _add_to_trace( + core_reader, "Computation", "Leave", time, pe_num, None + ) # User event (in code) elif int(line_arr[0]) == ProjectionsConstants.USER_EVENT: @@ -708,8 +691,8 @@ def _read_log_file(self, rank_size) -> pd.DataFrame: "Event Type": "User Event", } - _add_to_trace_dict( - data, user_event_name, "Instant", time, pe_num, details + _add_to_trace( + core_reader, user_event_name, "Instant", time, pe_num, details ) elif int(line_arr[0]) == ProjectionsConstants.USER_EVENT_PAIR: @@ -728,8 +711,8 @@ def _read_log_file(self, rank_size) -> pd.DataFrame: "Event Type": "User Event Pair", } - _add_to_trace_dict( - data, user_event_name, "Instant", time, pe_num, details + _add_to_trace( + core_reader, user_event_name, "Instant", time, pe_num, details ) elif int(line_arr[0]) == ProjectionsConstants.BEGIN_USER_EVENT_PAIR: @@ -746,8 +729,8 @@ def 
_read_log_file(self, rank_size) -> pd.DataFrame: "User Event Name": sts_reader.get_user_event(user_event_id), } - _add_to_trace_dict( - data, "User Event Pair", "Enter", time, pe_num, details + _add_to_trace( + core_reader, "User Event Pair", "Enter", time, pe_num, details ) elif int(line_arr[0]) == ProjectionsConstants.END_USER_EVENT_PAIR: @@ -764,9 +747,7 @@ def _read_log_file(self, rank_size) -> pd.DataFrame: "User Event Name": sts_reader.get_user_event(user_event_id), } - _add_to_trace_dict( - "User Event Pair", "Leave", time, pe_num, details - ) + _add_to_trace(core_reader, "User Event Pair", "Leave", time, pe_num, details) # User stat (in code) elif int(line_arr[0]) == ProjectionsConstants.USER_STAT: @@ -785,24 +766,24 @@ def _read_log_file(self, rank_size) -> pd.DataFrame: "Event Type": "User Stat", } - _add_to_trace_dict( - data, user_stat_name, "Instant", time, pe_num, details + _add_to_trace( + core_reader, user_stat_name, "Instant", time, pe_num, details ) - # Making sure that the log file ends with END_COMPUTATION - if len(data["Name"]) > 0 and data["Name"][-1] != "Computation": - time = data["Timestamp (ns)"][-1] * 1000 - _add_to_trace_dict(data, "Computation", "Leave", time, pe_num, None) - log_file.close() - dfs.append(pd.DataFrame(data)) - return pd.concat(dfs) + log_file.close() + return core_reader.finalize() -def _add_to_trace_dict(data, name, evt_type, time, process, attributes): - data["Name"].append(name) - data["Event Type"].append(evt_type) - data["Timestamp (ns)"].append(time) - data["Process"].append(process) - data["Attributes"].append(attributes) +def _add_to_trace( + core_reader: CoreTraceReader, name, evt_type, time, process, attributes +): + new_event = { + "Name": name, + "Event Type": evt_type, + "Timestamp (ns)": time, + "Process": process, + "Attributes": attributes, + } + core_reader.add_event(new_event)