From cb55f9e31c0494fda0866ad3dd28d1007ca4c824 Mon Sep 17 00:00:00 2001 From: Alexander Movsesyan Date: Fri, 27 Sep 2024 00:20:14 -0400 Subject: [PATCH 01/20] initial draft of base reader --- pipit/readers/base_reader.py | 126 +++++++++++++++++++++++++++++++++++ 1 file changed, 126 insertions(+) create mode 100644 pipit/readers/base_reader.py diff --git a/pipit/readers/base_reader.py b/pipit/readers/base_reader.py new file mode 100644 index 00000000..caefad5a --- /dev/null +++ b/pipit/readers/base_reader.py @@ -0,0 +1,126 @@ +from abc import ABC, abstractmethod +from typing import List, Dict +from ..graph import Graph, Node +import numpy + + +class BaseTraceReader(ABC): + + + # The following methods should be called by each reader class + def create_empty_trace(self, num_processes: int, create_cct: bool) -> None: + # keep track if we want to create a CCT + self.create_cct = create_cct + + # keep track of a unique id for each event + self.unique_id = -1 + + # events are indexed by process number, then thread number + self.events: List[Dict[List[Dict]]] = [{}] * num_processes + + # stacks are indexed by process number, then thread number + self.stacks: List[Dict[List[int]]] = [{}] * num_processes + + self.ccts: List[Dict[Graph]] = [{}] * num_processes + + + + def add_event(self, event: Dict) -> None: + + # get process number -- if not present, set to 0 + if "process" in event: + process = event["process"] + else: + process = 0 + + # get process number -- if not present, set to 0 + if "thread" in event: + thread = event["thread"] + else: + thread = 0 + + # assign a unique id to the event + event["id"] = self.__get_unique_id() + + + # get event list + if thread not in self.events[process]: + self.events[process][thread] = [] + event_list: List[Dict] = self.events[process][thread] + + # get stack + if thread not in self.stacks[process]: + self.stacks[process][thread] = [] + stack: List[int] = self.stacks[process][thread] + + # if the event is an enter event, add the event to the stack and CCT + if event["Event Type"] == "Enter": + cct = None + # if we are creating a CCT, get the correct CCT + if self.create_cct: + if thread not in self.ccts[process]: + self.ccts[process][thread] = Graph() + cct = self.ccts[process][thread] + self.__update_cct_and_parent_child_relationships(event, self.stacks[process][thread], event_list, cct) + elif event["Event Type"] == "Leave": + self.__update_match_event(event, self.stacks[process][thread], event_list) + + + def finalize(self) -> None: + pass + + # Helper methods + + # This method can be thought of the update upon an "Enter" event + # It adds to the stack and CCT + def __update_cct_and_parent_child_relationships(self, event: Dict, stack: List[int], event_list: List[Dict], cct: Graph) -> None: + if len(stack) == 0: + # root event + event["parent"] = numpy.nan + if self.create_cct: + new_graph_node = Node(event["id"], None) + cct.add_root(new_graph_node) + event["Node"] = new_graph_node + else: + parent_event = event_list[stack[-1]] + event["parent"] = parent_event["id"] + if self.create_cct: + new_graph_node = Node(event["id"], parent_event["Node"]) + parent_event["Node"].add_child(new_graph_node) + event["Node"] = new_graph_node + + # update stack and event list + stack.append(len(event_list) - 1) + # event_list.append(event) + + + # def __update_cct(self, event: Dict, process: int, thread: int) -> None: + # pass + + # This method can be thought of the update upon a "Leave" event + # It pops from the stack and updates the event list + # We should look into 
using this function to add artificial "Leave" events for unmatched "Enter" events + def __update_match_event(self, leave_event: Dict, stack: List[int], event_list: List[Dict]) -> None: + + while len(stack) > 0: + enter_event = event_list[stack[-1]] + + if enter_event["Name"] == leave_event["Name"]: + # matching event found + + # update matching event ids + leave_event["_matching_event"] = enter_event["id"] + enter_event["_matching_event"] = leave_event["id"] + + # popping matched events from the stack + stack.pop() + break + else: + # popping unmatched events from the stack + stack.pop() + + # event_list.append(leave_event) + + def __get_unique_id(self) -> int: + self.unique_id += 1 + return self.unique_id \ No newline at end of file From 8d7d9e2667c839a91694acee77433b92fa130a0d Mon Sep 17 00:00:00 2001 From: Alexander Movsesyan Date: Mon, 30 Sep 2024 13:39:36 -0400 Subject: [PATCH 02/20] updates to base reader --- pipit/readers/base_reader.py | 31 +++++++++++++++++++++---------- 1 file changed, 21 insertions(+), 10 deletions(-) diff --git a/pipit/readers/base_reader.py b/pipit/readers/base_reader.py index caefad5a..419ea572 100644 --- a/pipit/readers/base_reader.py +++ b/pipit/readers/base_reader.py @@ -1,5 +1,8 @@ from abc import ABC, abstractmethod from typing import List, Dict + +import pandas as pd + from ..graph import Graph, Node import numpy @@ -16,12 +19,12 @@ def create_empty_trace(self, num_processes: int, create_cct: bool) -> None: self.unique_id = -1 # events are indexed by process number, then thread number - self.events: List[Dict[List[Dict]]] = [{}] * num_processes + self.events: List[Dict[int, List[Dict]]] = [{}] * num_processes # stacks are indexed by process number, then thread number - self.stacks: List[Dict[List[int]]] = [{}] * num_processes + self.stacks: List[Dict[int, List[int]]] = [{}] * num_processes - self.ccts: List[Dict[Graph]] = [{}] * num_processes + self.ccts: List[Dict[int, Graph]] = [{}] * num_processes @@ -67,6 +70,12 @@ def add_event(self, event: Dict) -> None: def finalize(self) -> None: + # first step put everything in one list + all_events = [] + for process in self.events: + for thread in process: + all_events.extend(process[thread]) + self.dataframe = pd.DataFrame(all_events) pass # Helper methods @@ -77,17 +86,19 @@ def __update_cct_and_parent_child_relationships(self, event: Dict, stack: List[i if len(stack) == 0: # root event event["parent"] = numpy.nan - if self.create_cct: - new_graph_node = Node(event["id"], None) - cct.add_root(new_graph_node) - event["Node"] = new_graph_node + # if self.create_cct: + # new_graph_node = Node(event["id"], None) + # cct.add_root(new_graph_node) + # event["Node"] = new_graph_node else: parent_event = event_list[stack[-1]] event["parent"] = parent_event["id"] if self.create_cct: - new_graph_node = Node(event["id"], parent_event["Node"]) - parent_event["Node"].add_child(new_graph_node) - event["Node"] = new_graph_node + parent_graph_node = parent_event["Node"] + # if + # new_graph_node = Node(event["id"], parent_event["Node"]) + # parent_event["Node"].add_child(new_graph_node) + # event["Node"] = new_graph_node # update stack and event list stack.append(len(event_list) - 1) From ceae939419f8f86c1019fe477121d93a7413a1e3 Mon Sep 17 00:00:00 2001 From: Alexander Movsesyan Date: Thu, 3 Oct 2024 13:13:56 -0400 Subject: [PATCH 03/20] base reader working without cct creation --- pipit/readers/base_reader.py | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/pipit/readers/base_reader.py 
b/pipit/readers/base_reader.py index 419ea572..a33e1e29 100644 --- a/pipit/readers/base_reader.py +++ b/pipit/readers/base_reader.py @@ -3,12 +3,17 @@ import pandas as pd +from .. import Trace from ..graph import Graph, Node import numpy class BaseTraceReader(ABC): + @abstractmethod + def read(self) -> Trace: + pass + # The following methods should be called by each reader class def create_empty_trace(self, num_processes: int, create_cct: bool) -> None: @@ -68,6 +73,8 @@ def add_event(self, event: Dict) -> None: elif event["Event Type"] == "Leave": self.__update_match_event(event, self.stacks[process][thread], event_list) + event_list.append(event) + def finalize(self) -> None: # first step put everything in one list @@ -75,8 +82,9 @@ def finalize(self) -> None: for process in self.events: for thread in process: all_events.extend(process[thread]) - self.dataframe = pd.DataFrame(all_events) - pass + # create a dataframe + self.events_dataframe = pd.DataFrame(all_events) + self.trace = Trace(None, self.events_dataframe, None) # Helper methods @@ -101,7 +109,7 @@ def __update_cct_and_parent_child_relationships(self, event: Dict, stack: List[i # event["Node"] = new_graph_node # update stack and event list - stack.append(len(event_list) - 1) + stack.append(len(event_list)) # event_list.append(event) From 8f7880af4e3389f7ee438cff8374c4fb898bf210 Mon Sep 17 00:00:00 2001 From: Alexander Movsesyan Date: Sun, 13 Oct 2024 18:36:02 -0400 Subject: [PATCH 04/20] Base Reader for single threaded reading --- pipit/readers/base_reader.py | 145 ++++++++++++++++++++--------------- 1 file changed, 83 insertions(+), 62 deletions(-) diff --git a/pipit/readers/base_reader.py b/pipit/readers/base_reader.py index a33e1e29..89057ab3 100644 --- a/pipit/readers/base_reader.py +++ b/pipit/readers/base_reader.py @@ -1,6 +1,7 @@ from abc import ABC, abstractmethod from typing import List, Dict +import pandas import pandas as pd from .. 
import Trace @@ -16,105 +17,128 @@ def read(self) -> Trace: # The following methods should be called by each reader class - def create_empty_trace(self, num_processes: int, create_cct: bool) -> None: - # keep track if we want to create a CCT - self.create_cct = create_cct - + def create_empty_trace(self, num_processes: int) -> None: # keep track of a unique id for each event self.unique_id = -1 # events are indexed by process number, then thread number - self.events: List[Dict[int, List[Dict]]] = [{}] * num_processes + # stores a list of events + self.events: List[Dict[int, List[Dict]]] = [] + for i in range(num_processes): + self.events.append({}) # stacks are indexed by process number, then thread number + # stores indices of events in the event list self.stacks: List[Dict[int, List[int]]] = [{}] * num_processes - self.ccts: List[Dict[int, Graph]] = [{}] * num_processes - def add_event(self, event: Dict) -> None: # get process number -- if not present, set to 0 - if "process" in event: - process = event["process"] + if "Process" in event: + process = event["Process"] else: + print("something is wrong") process = 0 - # get process number -- if not present, set to 0 - if "thread" in event: - thread = event["thread"] + # get thread number -- if not present, set to 0 + if "Thread" in event: + print("something is wrong") + thread = event["Thread"] else: thread = 0 + # event["Thread"] = 0 # assign a unique id to the event - event["id"] = self.__get_unique_id() + event["unique_id"] = self.__get_unique_id() + process_events = self.events[process] + process_stacks = self.stacks[process] + # get event list - if thread not in self.events[process]: - self.events[process][thread] = [] - event_list: List[Dict] = self.events[process][thread] + if thread not in process_events: + process_events[thread] = [] + event_list = process_events[thread] # get stack - if thread not in self.stacks[process]: - self.stacks[process][thread] = [] - stack: List[int] = self.stacks[process][thread] + if thread not in process_stacks: + process_stacks[thread] = [] + stack: List[int] = process_stacks[thread] - # if the event is an enter event, add the event to the stack and CCT + # if the event is an enter event, add the event to the stack and update the parent-child relationships if event["Event Type"] == "Enter": - cct = None - # if we are creating a CCT, get the correct CCT - if self.create_cct: - if thread not in self.ccts[process]: - self.ccts[process][thread] = Graph() - cct = self.ccts[process][thread] - self.__update_cct_and_parent_child_relationships(event, self.stacks[process][thread], event_list, cct) + self.__update_parent_child_relationships(event, stack, event_list) + # if the event is a leave event, update the matching event and pop from the stack elif event["Event Type"] == "Leave": - self.__update_match_event(event, self.stacks[process][thread], event_list) + self.__update_match_event(event, stack, event_list) event_list.append(event) + x = 0 - - def finalize(self) -> None: + def finalize_process(self, process: int) -> pd.DataFrame: # first step put everything in one list + # all_events = [] + # for process in self.events: + # for thread in process: + # all_events.extend(process[thread]) + + # convert 3d list of events to 1d list all_events = [] - for process in self.events: - for thread in process: - all_events.extend(process[thread]) + for thread_id in self.events[process]: + all_events.extend(self.events[process][thread_id]) + # df = pd.DataFrame(self.events[proc_id][thread_id]) + # just_for_break = 0 + + 
# for i in range(len(self.events[proc_id][thread_id])): + # all_events.append(self.events[proc_id][thread_id][i]) + # pass + # print(self.events[i][j][k]['Process']) + + # print('all_events has length: ' + str(len(all_events))) + # for i in range(len(self.events)): + # print(f'self.events[{i}] has length', len(self.events[i].keys())) + # # print(type(self.events[i])) + # # print (self.events[i].keys()) + # for j in self.events[i]: + # # print(j) + # # print (j in self.events[i]) + # # print(self.events[i][j]) + # print(f'self.events[{i}][{j}] has length', len(self.events[i][j])) + + # df_list = [] + # for process in self.events: + # for thread in process: + # df_list.append(pd.DataFrame(process[thread])) + # all_events = pd.concat(df_list) + # create a dataframe - self.events_dataframe = pd.DataFrame(all_events) - self.trace = Trace(None, self.events_dataframe, None) + df = pd.DataFrame(all_events) + # print(df.head()) + # print(self.events_dataframe["unique_id"].value_counts()) + return df + # self.events_dataframe = pandas.DataFrame(all_events) + # print number of events per id + # print(self.events_dataframe.sort_values(by=["unique_id"])) + # self.trace = Trace(None, self.events_dataframe, None) # Helper methods # This method can be thought of the update upon an "Enter" event # It adds to the stack and CCT - def __update_cct_and_parent_child_relationships(self, event: Dict, stack: List[int], event_list: List[Dict], cct: Graph) -> None: + def __update_parent_child_relationships(self, event: Dict, stack: List[int], event_list: List[Dict]) -> None: if len(stack) == 0: # root event event["parent"] = numpy.nan - # if self.create_cct: - # new_graph_node = Node(event["id"], None) - # cct.add_root(new_graph_node) - # event["Node"] = new_graph_node else: parent_event = event_list[stack[-1]] - event["parent"] = parent_event["id"] - if self.create_cct: - parent_graph_node = parent_event["Node"] - # if - # new_graph_node = Node(event["id"], parent_event["Node"]) - # parent_event["Node"].add_child(new_graph_node) - # event["Node"] = new_graph_node - - # update stack and event list + event["parent"] = parent_event["unique_id"] + + + # update stack stack.append(len(event_list)) - # event_list.append(event) - - # def __update_cct(self, event: Dict, process: int, thread: int) -> None: - # pass # This method can be thought of the update upon a "Leave" event # It pops from the stack and updates the event list @@ -122,23 +146,20 @@ def __update_cct_and_parent_child_relationships(self, event: Dict, stack: List[i def __update_match_event(self, leave_event: Dict, stack: List[int], event_list: List[Dict]) -> None: while len(stack) > 0: - enter_event = event_list[stack[-1]] + + # popping matched events from the stack + enter_event = event_list[stack.pop(-1)] + if enter_event["Name"] == leave_event["Name"]: # matching event found # update matching event ids - leave_event["_matching_event"] = enter_event["id"] - enter_event["_matching_event"] = leave_event["id"] + leave_event["_matching_event"] = enter_event["unique_id"] + enter_event["_matching_event"] = leave_event["unique_id"] - # popping matched events from the stack - stack.pop() break - else: - # popping unmatched events from the stack - stack.pop() - # event_list.append(leave_event) def __get_unique_id(self) -> int: self.unique_id += 1 From 65e433232c7f7c205c2cca0472f5ace8ac94425a Mon Sep 17 00:00:00 2001 From: Alexander Movsesyan Date: Sun, 13 Oct 2024 19:47:02 -0400 Subject: [PATCH 05/20] changes to make core reader more parallel-friendly --- 
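Note for reviewers (kept below the --- so it stays out of the commit message): a
minimal sketch of how a format-specific reader is expected to drive this helper on
a single process. The event dicts are made up for illustration; add_event itself
only requires the "Event Type" key (plus "Name" for Enter/Leave matching), defaults
"Process"/"Thread" to 0 when absent, and carries any other keys such as
"Timestamp (ns)" straight through to the DataFrame produced by finalize().

    from pipit.readers.core_reader import CoreTraceReader

    reader = CoreTraceReader()
    for raw_event in [
        # illustrative events; a real reader parses these out of a trace file
        {"Name": "main", "Event Type": "Enter", "Timestamp (ns)": 0,
         "Process": 0, "Thread": 0},
        {"Name": "MPI_Init", "Event Type": "Enter", "Timestamp (ns)": 5,
         "Process": 0, "Thread": 0},
        {"Name": "MPI_Init", "Event Type": "Leave", "Timestamp (ns)": 20,
         "Process": 0, "Thread": 0},
        {"Name": "main", "Event Type": "Leave", "Timestamp (ns)": 30,
         "Process": 0, "Thread": 0},
    ]:
        reader.add_event(raw_event)  # assigns a unique_id, matches Enter/Leave

    df = reader.finalize()  # one pandas DataFrame per reader

Each "Leave" is matched to the nearest unmatched "Enter" with the same "Name" on
that process/thread stack, so the resulting rows come back with "unique_id",
"parent", and "_matching_event" columns in addition to the fields above.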
pipit/readers/base_reader.py | 166 ----------------------------------- pipit/readers/core_reader.py | 124 ++++++++++++++++++++++++++ 2 files changed, 124 insertions(+), 166 deletions(-) delete mode 100644 pipit/readers/base_reader.py create mode 100644 pipit/readers/core_reader.py diff --git a/pipit/readers/base_reader.py b/pipit/readers/base_reader.py deleted file mode 100644 index 89057ab3..00000000 --- a/pipit/readers/base_reader.py +++ /dev/null @@ -1,166 +0,0 @@ -from abc import ABC, abstractmethod -from typing import List, Dict - -import pandas -import pandas as pd - -from .. import Trace -from ..graph import Graph, Node -import numpy - - -class BaseTraceReader(ABC): - - @abstractmethod - def read(self) -> Trace: - pass - - - # The following methods should be called by each reader class - def create_empty_trace(self, num_processes: int) -> None: - # keep track of a unique id for each event - self.unique_id = -1 - - # events are indexed by process number, then thread number - # stores a list of events - self.events: List[Dict[int, List[Dict]]] = [] - for i in range(num_processes): - self.events.append({}) - - # stacks are indexed by process number, then thread number - # stores indices of events in the event list - self.stacks: List[Dict[int, List[int]]] = [{}] * num_processes - - - - def add_event(self, event: Dict) -> None: - - # get process number -- if not present, set to 0 - if "Process" in event: - process = event["Process"] - else: - print("something is wrong") - process = 0 - - # get thread number -- if not present, set to 0 - if "Thread" in event: - print("something is wrong") - thread = event["Thread"] - else: - thread = 0 - # event["Thread"] = 0 - - # assign a unique id to the event - event["unique_id"] = self.__get_unique_id() - - - process_events = self.events[process] - process_stacks = self.stacks[process] - - # get event list - if thread not in process_events: - process_events[thread] = [] - event_list = process_events[thread] - - # get stack - if thread not in process_stacks: - process_stacks[thread] = [] - stack: List[int] = process_stacks[thread] - - # if the event is an enter event, add the event to the stack and update the parent-child relationships - if event["Event Type"] == "Enter": - self.__update_parent_child_relationships(event, stack, event_list) - # if the event is a leave event, update the matching event and pop from the stack - elif event["Event Type"] == "Leave": - self.__update_match_event(event, stack, event_list) - - event_list.append(event) - x = 0 - - def finalize_process(self, process: int) -> pd.DataFrame: - # first step put everything in one list - # all_events = [] - # for process in self.events: - # for thread in process: - # all_events.extend(process[thread]) - - # convert 3d list of events to 1d list - all_events = [] - for thread_id in self.events[process]: - all_events.extend(self.events[process][thread_id]) - # df = pd.DataFrame(self.events[proc_id][thread_id]) - # just_for_break = 0 - - # for i in range(len(self.events[proc_id][thread_id])): - # all_events.append(self.events[proc_id][thread_id][i]) - # pass - # print(self.events[i][j][k]['Process']) - - # print('all_events has length: ' + str(len(all_events))) - # for i in range(len(self.events)): - # print(f'self.events[{i}] has length', len(self.events[i].keys())) - # # print(type(self.events[i])) - # # print (self.events[i].keys()) - # for j in self.events[i]: - # # print(j) - # # print (j in self.events[i]) - # # print(self.events[i][j]) - # print(f'self.events[{i}][{j}] has length', 
len(self.events[i][j])) - - # df_list = [] - # for process in self.events: - # for thread in process: - # df_list.append(pd.DataFrame(process[thread])) - # all_events = pd.concat(df_list) - - # create a dataframe - df = pd.DataFrame(all_events) - # print(df.head()) - # print(self.events_dataframe["unique_id"].value_counts()) - return df - # self.events_dataframe = pandas.DataFrame(all_events) - # print number of events per id - # print(self.events_dataframe.sort_values(by=["unique_id"])) - # self.trace = Trace(None, self.events_dataframe, None) - - # Helper methods - - # This method can be thought of the update upon an "Enter" event - # It adds to the stack and CCT - def __update_parent_child_relationships(self, event: Dict, stack: List[int], event_list: List[Dict]) -> None: - if len(stack) == 0: - # root event - event["parent"] = numpy.nan - else: - parent_event = event_list[stack[-1]] - event["parent"] = parent_event["unique_id"] - - - # update stack - stack.append(len(event_list)) - - - # This method can be thought of the update upon a "Leave" event - # It pops from the stack and updates the event list - # We should look into using this function to add artificial "Leave" events for unmatched "Enter" events - def __update_match_event(self, leave_event: Dict, stack: List[int], event_list: List[Dict]) -> None: - - while len(stack) > 0: - - # popping matched events from the stack - enter_event = event_list[stack.pop(-1)] - - - if enter_event["Name"] == leave_event["Name"]: - # matching event found - - # update matching event ids - leave_event["_matching_event"] = enter_event["unique_id"] - enter_event["_matching_event"] = leave_event["unique_id"] - - break - - - def __get_unique_id(self) -> int: - self.unique_id += 1 - return self.unique_id \ No newline at end of file diff --git a/pipit/readers/core_reader.py b/pipit/readers/core_reader.py new file mode 100644 index 00000000..9c08fcb5 --- /dev/null +++ b/pipit/readers/core_reader.py @@ -0,0 +1,124 @@ +from abc import ABC, abstractmethod +from typing import List, Dict + +import pandas +import numpy + + +class CoreTraceReader: + """ + Helper Object to read traces from different sources and convert them into a common format + """ + + def __init__(self): + """ + Should be called by each process to create an empty trace per process in the reader. Creates the following + data structures to represent an empty trace: + - events: Dict[int, Dict[int, List[Dict]]] + - stacks: Dict[int, Dict[int, List[int]]] + """ + # keep track of a unique id for each event + self.unique_id = -1 + + # events are indexed by process number, then thread number + # stores a list of events + self.events: Dict[int, Dict[int, List[Dict]]] = {} + + # stacks are indexed by process number, then thread number + # stores indices of events in the event list + self.stacks: Dict[int, Dict[int, List[int]]] = {} + + def add_event(self, event: Dict) -> None: + """ + Should be called to add each event to the trace. Will update the event lists and stacks accordingly. 
+ """ + # get process number -- if not present, set to 0 + if "Process" in event: + process = event["Process"] + else: + process = 0 + + # get thread number -- if not present, set to 0 + if "Thread" in event: + thread = event["Thread"] + else: + thread = 0 + # event["Thread"] = 0 + + # assign a unique id to the event + event["unique_id"] = self.__get_unique_id() + + # get event list + if process not in self.events: + self.events[process] = {} + if thread not in self.events[process]: + self.events[process][thread] = [] + event_list = self.events[process][thread] + + # get stack + if process not in self.stacks: + self.stacks[process] = {} + if thread not in self.stacks[process]: + self.stacks[process][thread] = [] + stack: List[int] = self.stacks[process][thread] + + # if the event is an enter event, add the event to the stack and update the parent-child relationships + if event["Event Type"] == "Enter": + self.__update_parent_child_relationships(event, stack, event_list) + # if the event is a leave event, update the matching event and pop from the stack + elif event["Event Type"] == "Leave": + self.__update_match_event(event, stack, event_list) + + # Finally add the event to the event list + event_list.append(event) + + def finalize(self): + """ + Converts the events data structure into a pandas dataframe and returns it + """ + all_events = [] + for process in self.events: + for thread in self.events[process]: + all_events.extend(self.events[process][thread]) + + # create a dataframe + events_dataframe = pandas.DataFrame(all_events) + return events_dataframe + + def __update_parent_child_relationships(self, event: Dict, stack: List[int], event_list: List[Dict]) -> None: + """ + This method can be thought of the update upon an "Enter" event. It adds to the stack and CCT + """ + if len(stack) == 0: + # root event + event["parent"] = numpy.nan + else: + parent_event = event_list[stack[-1]] + event["parent"] = parent_event["unique_id"] + + # update stack + stack.append(len(event_list)) + + def __update_match_event(self, leave_event: Dict, stack: List[int], event_list: List[Dict]) -> None: + """ + This method can be thought of the update upon a "Leave" event. It pops from the stack and updates the event list. 
+ We should look into using this function to add artificial "Leave" events for unmatched "Enter" events + """ + + while len(stack) > 0: + + # popping matched events from the stack + enter_event = event_list[stack.pop(-1)] + + if enter_event["Name"] == leave_event["Name"]: + # matching event found + + # update matching event ids + leave_event["_matching_event"] = enter_event["unique_id"] + enter_event["_matching_event"] = leave_event["unique_id"] + + break + + def __get_unique_id(self) -> int: + self.unique_id += 1 + return self.unique_id From b959feed321a35e736cbfc06902dca8c4625cd10 Mon Sep 17 00:00:00 2001 From: Alexander Movsesyan Date: Sun, 13 Oct 2024 21:46:50 -0400 Subject: [PATCH 06/20] updates to have strided unique_id, take care of instant events, and to concat trace dfs --- pipit/readers/core_reader.py | 59 ++++++++++++++++++++++++++++-------- 1 file changed, 47 insertions(+), 12 deletions(-) diff --git a/pipit/readers/core_reader.py b/pipit/readers/core_reader.py index 9c08fcb5..8fd8aa73 100644 --- a/pipit/readers/core_reader.py +++ b/pipit/readers/core_reader.py @@ -3,6 +3,7 @@ import pandas import numpy +from pipit.trace import Trace class CoreTraceReader: @@ -10,15 +11,18 @@ class CoreTraceReader: Helper Object to read traces from different sources and convert them into a common format """ - def __init__(self): + def __init__(self, start: int = 0, stride: int = 1): """ Should be called by each process to create an empty trace per process in the reader. Creates the following data structures to represent an empty trace: - events: Dict[int, Dict[int, List[Dict]]] - stacks: Dict[int, Dict[int, List[int]]] """ + # keep stride for how much unique id should be incremented + self.stride = stride + # keep track of a unique id for each event - self.unique_id = -1 + self.unique_id = start - self.stride # events are indexed by process number, then thread number # stores a list of events @@ -64,7 +68,9 @@ def add_event(self, event: Dict) -> None: # if the event is an enter event, add the event to the stack and update the parent-child relationships if event["Event Type"] == "Enter": - self.__update_parent_child_relationships(event, stack, event_list) + self.__update_parent_child_relationships(event, stack, event_list, False) + elif event["Event Type"] == "Instant": + self.__update_parent_child_relationships(event, stack, event_list, True) # if the event is a leave event, update the matching event and pop from the stack elif event["Event Type"] == "Leave": self.__update_match_event(event, stack, event_list) @@ -82,22 +88,35 @@ def finalize(self): all_events.extend(self.events[process][thread]) # create a dataframe - events_dataframe = pandas.DataFrame(all_events) - return events_dataframe - - def __update_parent_child_relationships(self, event: Dict, stack: List[int], event_list: List[Dict]) -> None: + trace_df = pandas.DataFrame(all_events) + + # categorical for memory savings + trace_df = trace_df.astype( + { + "Name": "category", + "Event Type": "category", + "Process": "category", + "_matching_event": "Int32", + "_parent": "Int32", + "_matching_timestamp": "Int32", + } + ) + return trace_df + + def __update_parent_child_relationships(self, event: Dict, stack: List[int], event_list: List[Dict],is_instant: bool) -> None: """ This method can be thought of the update upon an "Enter" event. 
It adds to the stack and CCT
         """
         if len(stack) == 0:
             # root event
-            event["parent"] = numpy.nan
+            event["_parent"] = numpy.nan
         else:
             parent_event = event_list[stack[-1]]
-            event["parent"] = parent_event["unique_id"]
+            event["_parent"] = parent_event["unique_id"]
 
         # update stack
-        stack.append(len(event_list))
+        if not is_instant:
+            stack.append(len(event_list))
 
     def __update_match_event(self, leave_event: Dict, stack: List[int], event_list:
         """
         This method can be thought of the update upon a "Leave" event. It pops from the stack and updates the event list.
         We should look into using this function to add artificial "Leave" events for unmatched "Enter" events
         """
 
         while len(stack) > 0:
 
             # popping matched events from the stack
-            enter_event = event_list[stack.pop(-1)]
+            enter_event = event_list[stack.pop()]
 
             if enter_event["Name"] == leave_event["Name"]:
                 # matching event found
 
                 # update matching event ids
                 leave_event["_matching_event"] = enter_event["unique_id"]
                 enter_event["_matching_event"] = leave_event["unique_id"]
 
+                # update matching timestamps
+                leave_event["_matching_timestamp"] = enter_event["Timestamp (ns)"]
+                enter_event["_matching_timestamp"] = leave_event["Timestamp (ns)"]
+
                 break
 
     def __get_unique_id(self) -> int:
-        self.unique_id += 1
+        self.unique_id += self.stride
         return self.unique_id
+
+def concat_trace_data(data_list):
+    """
+    Concatenates the data from multiple trace readers into a single Trace object
+    """
+    trace_data = pandas.concat(data_list, ignore_index=True)
+    # set index to unique_id
+    trace_data.set_index("unique_id", inplace=True)
+    trace_data.sort_values(
+        by="Timestamp (ns)", axis=0, ascending=True, inplace=True, ignore_index=True
+    )
+    return Trace(None, trace_data, None)

From 1fd2bc92e5c4456ef600720da0c8b97e111da0c4 Mon Sep 17 00:00:00 2001
From: Alexander Movsesyan
Date: Sun, 13 Oct 2024 21:58:47 -0400
Subject: [PATCH 07/20] style updates

---
 pipit/readers/core_reader.py | 38 +++++++++++++++++++++++-------------
 1 file changed, 24 insertions(+), 14 deletions(-)

diff --git a/pipit/readers/core_reader.py b/pipit/readers/core_reader.py
index 8fd8aa73..29a62fee 100644
--- a/pipit/readers/core_reader.py
+++ b/pipit/readers/core_reader.py
@@ -1,4 +1,3 @@
-from abc import ABC, abstractmethod
 from typing import List, Dict
 
 import pandas
@@ -8,15 +7,16 @@
 class CoreTraceReader:
     """
-    Helper Object to read traces from different sources and convert them into a common format
+    Helper Object to read traces from different sources and convert them into a common
+    format
     """
 
     def __init__(self, start: int = 0, stride: int = 1):
         """
-        Should be called by each process to create an empty trace per process in the reader. Creates the following
-        data structures to represent an empty trace:
-            - events: Dict[int, Dict[int, List[Dict]]]
-            - stacks: Dict[int, Dict[int, List[int]]]
+        Should be called by each process to create an empty trace per process in the
+        reader. Creates the following data structures to represent an empty trace:
+          - events: Dict[int, Dict[int, List[Dict]]]
+          - stacks: Dict[int, Dict[int, List[int]]]
         """
         # keep stride for how much unique id should be incremented
         self.stride = stride
@@ -34,7 +34,8 @@ def __init__(self, start: int = 0, stride: int = 1):
 
     def add_event(self, event: Dict) -> None:
         """
-        Should be called to add each event to the trace. Will update the event lists and stacks accordingly.
+        Should be called to add each event to the trace. Will update the event lists and
+        stacks accordingly. 
""" # get process number -- if not present, set to 0 if "Process" in event: @@ -66,12 +67,14 @@ def add_event(self, event: Dict) -> None: self.stacks[process][thread] = [] stack: List[int] = self.stacks[process][thread] - # if the event is an enter event, add the event to the stack and update the parent-child relationships + # if the event is an enter event, add the event to the stack and update the + # parent-child relationships if event["Event Type"] == "Enter": self.__update_parent_child_relationships(event, stack, event_list, False) elif event["Event Type"] == "Instant": self.__update_parent_child_relationships(event, stack, event_list, True) - # if the event is a leave event, update the matching event and pop from the stack + # if the event is a leave event, update the matching event and pop from the + # stack elif event["Event Type"] == "Leave": self.__update_match_event(event, stack, event_list) @@ -103,9 +106,12 @@ def finalize(self): ) return trace_df - def __update_parent_child_relationships(self, event: Dict, stack: List[int], event_list: List[Dict],is_instant: bool) -> None: + def __update_parent_child_relationships( + self, event: Dict, stack: List[int], event_list: List[Dict], is_instant: bool + ) -> None: """ - This method can be thought of the update upon an "Enter" event. It adds to the stack and CCT + This method can be thought of the update upon an "Enter" event. It adds to the + stack and CCT """ if len(stack) == 0: # root event @@ -118,10 +124,13 @@ def __update_parent_child_relationships(self, event: Dict, stack: List[int], eve if not is_instant: stack.append(len(event_list)) - def __update_match_event(self, leave_event: Dict, stack: List[int], event_list: List[Dict]) -> None: + def __update_match_event( + self, leave_event: Dict, stack: List[int], event_list: List[Dict] + ) -> None: """ - This method can be thought of the update upon a "Leave" event. It pops from the stack and updates the event list. - We should look into using this function to add artificial "Leave" events for unmatched "Enter" events + This method can be thought of the update upon a "Leave" event. It pops from the + stack and updates the event list. 
We should look into using this function to add
+        artificial "Leave" events for unmatched "Enter" events
         """
 
         while len(stack) > 0:
 
             # popping matched events from the stack
             enter_event = event_list[stack.pop()]
 
             if enter_event["Name"] == leave_event["Name"]:
                 # matching event found
 
                 # update matching event ids
                 leave_event["_matching_event"] = enter_event["unique_id"]
                 enter_event["_matching_event"] = leave_event["unique_id"]
 
@@ -146,6 +155,7 @@ def __update_match_event(
         self.unique_id += self.stride
         return self.unique_id
 
+
 def concat_trace_data(data_list):
     """
     Concatenates the data from multiple trace readers into a single Trace object
     """

From 0aa4669e5d3d717c1dbbc1595fd6e74d2c4a20de Mon Sep 17 00:00:00 2001
From: zoreoo <122127038+zoreoo@users.noreply.github.com>
Date: Sun, 3 Nov 2024 23:19:33 -0500
Subject: [PATCH 08/20] Implement otf2 reader with core reader

---
 pipit/readers/otf2_reader.py | 85 ++++++++++++++++++------------------
 1 file changed, 42 insertions(+), 43 deletions(-)

diff --git a/pipit/readers/otf2_reader.py b/pipit/readers/otf2_reader.py
index b685c3a7..9aa33795 100644
--- a/pipit/readers/otf2_reader.py
+++ b/pipit/readers/otf2_reader.py
@@ -8,6 +8,7 @@
 import pandas as pd
 import multiprocessing as mp
 import pipit.trace
+from pipit.readers.core_reader import CoreTraceReader, concat_trace_data
 
 
 class OTF2Reader:
@@ -162,6 +163,9 @@ def events_reader(self, rank_size):
         locations = list(trace.definitions._locations)
         num_locations = len(locations)
 
+        # start core reader
+        core_reader = CoreTraceReader(rank, size)
+
         # base number of locations read by each process
         per_process = int(num_locations // size)
 
@@ -190,11 +194,6 @@ def events_reader(self, rank_size):
         # select the locations to read based on above calculations
         loc_events = list(trace.events(locations[begin_int:end_int]).__iter__())
 
-        # columns of the DataFrame
-        timestamps, event_types, event_attributes, names = [], [], [], []
-
-        # note: the below lists are for storing logical ids
-        process_ids, thread_ids = [], []
 
         """
         Relevant Documentation for Metrics:
@@ -227,7 +226,7 @@ def events_reader(self, rank_size):
         )
 
         # maps each metric to a list of its values
-        metrics_dict = {metric_name: [] for metric_name in metric_names}
+        metrics_dict = {metric_name: None for metric_name in metric_names}
 
         # used to keep track of time that the
         # most recent metrics that were read at
@@ -239,6 +238,10 @@ def events_reader(self, rank_size):
             # location could be thread, process, etc
             loc, event = loc_event[0], loc_event[1]
 
+            timestamp, event_t, event_attribute, name = None, None, None, None
+
+            process_id, thread_id = None, None
+
             # To Do:
             # Support for GPU events has to be
             # added and unified across readers. 
@@ -257,11 +260,12 @@ def events_reader(self, rank_size): # append the values for the metrics # to their appropriate lists for i in range(len(metrics)): - metrics_dict[metrics[i]].append(metric_values[i]) + metrics_dict[metrics[i]] = metric_values[i] # store the metrics and their timestamp prev_metric_time = event.time else: + new_event = {} # MetricClass metric events are synchronous # and coupled with an enter or leave event that # has the same timestamp @@ -269,7 +273,15 @@ def events_reader(self, rank_size): # if the event is not paired with any metric, then # add placeholders for all the metric lists for metric in metric_names: - metrics_dict[metric].append(float("nan")) + metrics_dict[metric] = float("nan") + else: + for metric, metric_value in metrics_dict.items(): + # only add columns of metrics which are populated with + # some values (sometimes a metric could be defined but not + # appear in the trace itself) + if not np.isnan(metric_value): + new_event[metric] = metric_value + # reset this as a metric event was not read prev_metric_time = -1 @@ -280,28 +292,25 @@ def events_reader(self, rank_size): """ process_id = loc.group._ref - process_ids.append(process_id) # subtract the minimum location number of a process # from the location number to get threads numbered # 0 to (num_threads per process - 1) for each process. - thread_ids.append( - loc._ref - self.process_threads_map[process_id] - ) + thread_id = loc._ref - self.process_threads_map[process_id] # type of event - enter, leave, or other types event_type = str(type(event))[20:-2] if event_type == "Enter" or event_type == "Leave": - event_types.append(event_type) + event_t = event_type else: - event_types.append("Instant") + event_t = "Instant" if event_type in ["Enter", "Leave"]: - names.append(event.region.name) + name = event.region.name else: - names.append(event_type) + name = event_type - timestamps.append(event.time) + timestamp = event.time # only add attributes for non-leave rows so that # there aren't duplicate attributes for a single event @@ -319,34 +328,28 @@ def events_reader(self, rank_size): attributes_dict[self.field_to_val(key)] = ( self.handle_data(value) ) - event_attributes.append(attributes_dict) + event_attribute = attributes_dict else: # nan attributes for leave rows # attributes column is of object dtype - event_attributes.append(None) + event_attribute = None + + columns = { + "Name": name, + "Event Type": event_t, + "Timestamp (ns)": timestamp, + "Thread": thread_id, + "Process": process_id, + "Attributes": event_attribute, + } + + new_event.update(columns) + core_reader.add_event(new_event) trace.close() # close event files - # returns dataframe with all events and their fields - trace_df = pd.DataFrame( - { - "Timestamp (ns)": timestamps, - "Event Type": event_types, - "Name": names, - "Thread": thread_ids, - "Process": process_ids, - "Attributes": event_attributes, - } - ) - - for metric, metric_values in metrics_dict.items(): - # only add columns of metrics which are populated with - # some values (sometimes a metric could be defined but not - # appear in the trace itself) - if not np.isnan(metric_values).all(): - trace_df[metric] = metric_values - return trace_df + return core_reader.finalize() def read_definitions(self, trace): """ @@ -452,7 +455,7 @@ def read_events(self): pool.close() # merges the dataframe into one events dataframe - events_dataframe = pd.concat(events_dataframes) + events_dataframe = concat_trace_data(events_dataframes) del events_dataframes # accessing the clock 
properties of the trace using the definitions @@ -470,10 +473,6 @@ def read_events(self): events_dataframe["Timestamp (ns)"] -= offset events_dataframe["Timestamp (ns)"] *= (10**9) / resolution - # ensures the DataFrame is in order of increasing timestamp - events_dataframe.sort_values( - by="Timestamp (ns)", axis=0, ascending=True, inplace=True, ignore_index=True - ) # convert these to ints # (sometimes they get converted to floats From 4501825b26196e0fd3560d528806d954e7be6402 Mon Sep 17 00:00:00 2001 From: zoreoo <122127038+zoreoo@users.noreply.github.com> Date: Sun, 3 Nov 2024 23:27:45 -0500 Subject: [PATCH 09/20] Fix metrics for non-paired events --- pipit/readers/otf2_reader.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pipit/readers/otf2_reader.py b/pipit/readers/otf2_reader.py index 9aa33795..590daae2 100644 --- a/pipit/readers/otf2_reader.py +++ b/pipit/readers/otf2_reader.py @@ -273,7 +273,7 @@ def events_reader(self, rank_size): # if the event is not paired with any metric, then # add placeholders for all the metric lists for metric in metric_names: - metrics_dict[metric] = float("nan") + new_event[metric] = float("nan") else: for metric, metric_value in metrics_dict.items(): # only add columns of metrics which are populated with From e615e016f4c406a0b245c70ddee09f8e0ecd8867 Mon Sep 17 00:00:00 2001 From: zoreoo <122127038+zoreoo@users.noreply.github.com> Date: Sun, 3 Nov 2024 23:36:53 -0500 Subject: [PATCH 10/20] Fix another thing with metrics --- pipit/readers/otf2_reader.py | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/pipit/readers/otf2_reader.py b/pipit/readers/otf2_reader.py index 590daae2..4c25899c 100644 --- a/pipit/readers/otf2_reader.py +++ b/pipit/readers/otf2_reader.py @@ -226,7 +226,7 @@ def events_reader(self, rank_size): ) # maps each metric to a list of its values - metrics_dict = {metric_name: None for metric_name in metric_names} + metrics_dict = {metric_name: float("nan") for metric_name in metric_names} # used to keep track of time that the # most recent metrics that were read at @@ -276,11 +276,7 @@ def events_reader(self, rank_size): new_event[metric] = float("nan") else: for metric, metric_value in metrics_dict.items(): - # only add columns of metrics which are populated with - # some values (sometimes a metric could be defined but not - # appear in the trace itself) - if not np.isnan(metric_value): - new_event[metric] = metric_value + new_event[metric] = metric_value # reset this as a metric event was not read From 5a0157ec57139ff562bdf458ae12d0d1edb277ec Mon Sep 17 00:00:00 2001 From: zoreoo <122127038+zoreoo@users.noreply.github.com> Date: Sun, 3 Nov 2024 23:49:33 -0500 Subject: [PATCH 11/20] Ensure metrics_dict is reset --- pipit/readers/otf2_reader.py | 1 + 1 file changed, 1 insertion(+) diff --git a/pipit/readers/otf2_reader.py b/pipit/readers/otf2_reader.py index 4c25899c..dd781ee2 100644 --- a/pipit/readers/otf2_reader.py +++ b/pipit/readers/otf2_reader.py @@ -281,6 +281,7 @@ def events_reader(self, rank_size): # reset this as a metric event was not read prev_metric_time = -1 + metrics_dict = {metric_name: float("nan") for metric_name in metric_names} """ Below is code to read the primary information about the From 02a00e5930c368be21b0730a5748b4e6adc234e6 Mon Sep 17 00:00:00 2001 From: Alexander Movsesyan Date: Thu, 7 Nov 2024 21:46:13 -0500 Subject: [PATCH 12/20] minor update --- pipit/readers/otf2_reader.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git 
a/pipit/readers/otf2_reader.py b/pipit/readers/otf2_reader.py index dd781ee2..b63e0613 100644 --- a/pipit/readers/otf2_reader.py +++ b/pipit/readers/otf2_reader.py @@ -452,8 +452,8 @@ def read_events(self): pool.close() # merges the dataframe into one events dataframe - events_dataframe = concat_trace_data(events_dataframes) - del events_dataframes + trace = concat_trace_data(events_dataframes) + events_dataframe = trace.events # accessing the clock properties of the trace using the definitions clock_properties = self.definitions.loc[ From 27e95f8140df2f8b9e2296eca4011409cee265c2 Mon Sep 17 00:00:00 2001 From: Alexander Movsesyan Date: Thu, 7 Nov 2024 21:47:08 -0500 Subject: [PATCH 13/20] updated non-existing int values to -1 --- pipit/readers/core_reader.py | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/pipit/readers/core_reader.py b/pipit/readers/core_reader.py index 29a62fee..d31b7b54 100644 --- a/pipit/readers/core_reader.py +++ b/pipit/readers/core_reader.py @@ -93,15 +93,19 @@ def finalize(self): # create a dataframe trace_df = pandas.DataFrame(all_events) + trace_df["_matching_event"].fillna(-1, inplace=True) + trace_df["_parent"].fillna(-1, inplace=True) + trace_df["_matching_timestamp"].fillna(-1, inplace=True) + # categorical for memory savings trace_df = trace_df.astype( { "Name": "category", "Event Type": "category", "Process": "category", - "_matching_event": "Int32", - "_parent": "Int32", - "_matching_timestamp": "Int32", + "_matching_event": "int32", + "_parent": "int32", + "_matching_timestamp": "int32", } ) return trace_df @@ -115,7 +119,7 @@ def __update_parent_child_relationships( """ if len(stack) == 0: # root event - event["_parent"] = numpy.nan + event["_parent"] = -1 else: parent_event = event_list[stack[-1]] event["_parent"] = parent_event["unique_id"] From 466584e47d2dfc9276771f69b1b2766c496fe1f5 Mon Sep 17 00:00:00 2001 From: zoreoo <122127038+zoreoo@users.noreply.github.com> Date: Tue, 3 Dec 2024 15:43:37 -0500 Subject: [PATCH 14/20] Initial implementation of HPCToolkit reader --- pipit/readers/hpctoolkit_reader.py | 108 ++++++++++++++--------------- 1 file changed, 53 insertions(+), 55 deletions(-) diff --git a/pipit/readers/hpctoolkit_reader.py b/pipit/readers/hpctoolkit_reader.py index a6cda337..c871fa26 100644 --- a/pipit/readers/hpctoolkit_reader.py +++ b/pipit/readers/hpctoolkit_reader.py @@ -7,6 +7,7 @@ import pandas as pd import pipit.trace from pipit.graph import Graph, Node +from pipit.readers.core_reader import CoreTraceReader class MetaReader: @@ -1034,6 +1035,8 @@ def __init__( self.signed = False self.encoding = "ASCII" + self.core_reader = CoreTraceReader(0, 1) + # The trace.db header consists of the common .db header and n sections. # We're going to do a little set up work, so that's easy to change if # any revisions change the orders. 
@@ -1150,20 +1153,6 @@ def __read_trace_headers_section( self.file.read(8), byteorder=self.byte_order, signed=self.signed ) - self.data = { - "Timestamp (ns)": [], - "Event Type": [], - "Name": [], - "Thread": [], - "Process": [], - "Core": [], - "Host": [], - "Node": [], - "Source File Name": [], - "Source File Line Number": [], - "Calling Context ID": [], - } - for i in range(num_trace_headers): header_pointer = trace_headers_pointer + (i * trace_header_size) self.__read_single_trace_header(header_pointer) @@ -1247,24 +1236,25 @@ def __read_single_trace_header(self, header_pointer: int) -> None: self.meta_reader.get_information_from_context_id(curr_ctx_id) ) - self.data["Name"].append(str(context_information["function"])) + event = {} + event["Name"] = str(context_information["function"]) if context_information["loop_type"]: # HPCViewer only puts loops in CCT, but not trace view, so # we use a special Loop Enter/Leave event type - self.data["Event Type"].append("Loop Leave") + event["Event Type"] = "Loop Leave" else: - self.data["Event Type"].append("Leave") - self.data["Timestamp (ns)"].append(timestamp) - self.data["Process"].append(hit["RANK"]) - self.data["Thread"].append(hit["THREAD"]) - self.data["Host"].append(hit["NODE"]) - self.data["Core"].append(hit["CORE"]) - self.data["Node"].append(last_node) - self.data["Source File Name"].append(context_information["file"]) - self.data["Source File Line Number"].append( - context_information["line"] - ) - self.data["Calling Context ID"].append(curr_ctx_id) + event["Event Type"] = "Leave" + event["Timestamp (ns)"] = timestamp + event["Process"] = hit["RANK"] + event["Thread"] = hit["THREAD"] + event["Host"] = hit["NODE"] + event["Core"] = hit["CORE"] + event["Node"] = last_node + event["Source File Name"] = context_information["file"] + event["Source File Line Number"] = context_information["line"] + event["Calling Context ID"] = curr_ctx_id + + self.core_reader.add_event(event) last_node = last_node.parent # Now we want to add all the new "enter" events after @@ -1282,24 +1272,26 @@ def __read_single_trace_header(self, header_pointer: int) -> None: self.meta_reader.get_information_from_context_id(curr_ctx_id) ) - self.data["Name"].append(str(context_information["function"])) + event = {} + + event["Name"] = str(context_information["function"]) if context_information["loop_type"]: # HPCViewer only puts loops in CCT, but not trace view, so # we use a special Loop Enter/Leave event type - self.data["Event Type"].append("Loop Enter") + event["Event Type"] = "Loop Enter" else: - self.data["Event Type"].append("Enter") - self.data["Timestamp (ns)"].append(timestamp) - self.data["Process"].append(hit["RANK"]) - self.data["Thread"].append(hit["THREAD"]) - self.data["Host"].append(hit["NODE"]) - self.data["Core"].append(hit["CORE"]) - self.data["Node"].append(entry_node) - self.data["Source File Name"].append(context_information["file"]) - self.data["Source File Line Number"].append( - context_information["line"] - ) - self.data["Calling Context ID"].append(curr_ctx_id) + event["Event Type"] = "Enter" + event["Timestamp (ns)"] = timestamp + event["Process"] = hit["RANK"] + event["Thread"] = hit["THREAD"] + event["Host"] = hit["NODE"] + event["Core"] = hit["CORE"] + event["Node"] = entry_node + event["Source File Name"] = context_information["file"] + event["Source File Line Number"] = context_information["line"] + event["Calling Context ID"] = curr_ctx_id + + self.core_reader.add_event(event) last_node = current_node last_id = context_id @@ 
-1320,20 +1312,25 @@ def __read_single_trace_header(self, header_pointer: int) -> None:
                 curr_ctx_id
             )
 
-            self.data["Name"].append(str(context_information["function"]))
+            event = {}
+
+            event["Name"] = str(context_information["function"])
             if context_information["loop_type"]:
-                self.data["Event Type"].append("Loop Leave")
+                event["Event Type"] = "Loop Leave"
             else:
-                self.data["Event Type"].append("Leave")
+                event["Event Type"] = "Leave"
-            self.data["Timestamp (ns)"].append(timestamp)
+            event["Timestamp (ns)"] = timestamp
-            self.data["Process"].append(hit["RANK"])
+            event["Process"] = hit["RANK"]
-            self.data["Thread"].append(hit["THREAD"])
+            event["Thread"] = hit["THREAD"]
-            self.data["Host"].append(hit["NODE"])
+            event["Host"] = hit["NODE"]
-            self.data["Core"].append(hit["CORE"])
+            event["Core"] = hit["CORE"]
-            self.data["Node"].append(last_node)
+            event["Node"] = last_node
-            self.data["Source File Name"].append(context_information["file"])
+            event["Source File Name"] = context_information["file"]
-            self.data["Source File Line Number"].append(context_information["line"])
+            event["Source File Line Number"] = context_information["line"]
-            self.data["Calling Context ID"].append(curr_ctx_id)
+            event["Calling Context ID"] = curr_ctx_id
+
+            self.core_reader.add_event(event)
+
             last_node = last_node.parent
 
@@ -1346,7 +1343,8 @@ def __init__(self, directory: str) -> None:
         )
 
     def read(self) -> pipit.trace.Trace:
-        trace_df = pd.DataFrame(self.trace_reader.data)
+        trace_df = self.trace_reader.core_reader.finalize()
+
 
         # Need to sort df by timestamp then index
         # (since many events occur at the same timestamp)

From 947dbf164f8fc364b03306fc973f421352de4248 Mon Sep 17 00:00:00 2001
From: zoreoo <122127038+zoreoo@users.noreply.github.com>
Date: Tue, 3 Dec 2024 15:47:01 -0500
Subject: [PATCH 15/20] Update comments

---
 pipit/readers/otf2_reader.py | 5 ++---
 1 file changed, 2 insertions(+), 3 deletions(-)

diff --git a/pipit/readers/otf2_reader.py b/pipit/readers/otf2_reader.py
index b63e0613..c4ae6391 100644
--- a/pipit/readers/otf2_reader.py
+++ b/pipit/readers/otf2_reader.py
@@ -257,8 +257,7 @@ def events_reader(self, rank_size):
                             )
                             metric_values = event.values
 
-                            # append the values for the metrics
-                            # to their appropriate lists
+                            # Set the values for the metrics
                             for i in range(len(metrics)):
                                 metrics_dict[metrics[i]] = metric_values[i]
 
@@ -271,7 +270,7 @@ def events_reader(self, rank_size):
                             # has the same timestamp
                             if event.time != prev_metric_time:
                                 # if the event is not paired with any metric, then
-                                # add placeholders for all the metric lists
+                                # add placeholder
                                 for metric in metric_names:
                                     new_event[metric] = float("nan")
 

From 050d75aa93008d76cedd1b3b7ddfa2047a11fdb Mon Sep 17 00:00:00 2001
From: zoreoo <122127038+zoreoo@users.noreply.github.com>
Date: Tue, 3 Dec 2024 16:03:25 -0500
Subject: [PATCH 16/20] Revert "updated non-existing int values to -1"

This reverts commit 27e95f8140df2f8b9e2296eca4011409cee265c2. 
--- pipit/readers/core_reader.py | 12 ++++-------- 1 file changed, 4 insertions(+), 8 deletions(-) diff --git a/pipit/readers/core_reader.py b/pipit/readers/core_reader.py index d31b7b54..29a62fee 100644 --- a/pipit/readers/core_reader.py +++ b/pipit/readers/core_reader.py @@ -93,19 +93,15 @@ def finalize(self): # create a dataframe trace_df = pandas.DataFrame(all_events) - trace_df["_matching_event"].fillna(-1, inplace=True) - trace_df["_parent"].fillna(-1, inplace=True) - trace_df["_matching_timestamp"].fillna(-1, inplace=True) - # categorical for memory savings trace_df = trace_df.astype( { "Name": "category", "Event Type": "category", "Process": "category", - "_matching_event": "int32", - "_parent": "int32", - "_matching_timestamp": "int32", + "_matching_event": "Int32", + "_parent": "Int32", + "_matching_timestamp": "Int32", } ) return trace_df @@ -119,7 +115,7 @@ def __update_parent_child_relationships( """ if len(stack) == 0: # root event - event["_parent"] = -1 + event["_parent"] = numpy.nan else: parent_event = event_list[stack[-1]] event["_parent"] = parent_event["unique_id"] From 3f220bbcba79108eb22583a0369d4b5754f1d222 Mon Sep 17 00:00:00 2001 From: zoreoo <122127038+zoreoo@users.noreply.github.com> Date: Tue, 3 Dec 2024 16:05:07 -0500 Subject: [PATCH 17/20] Reapply "updated non-existing int values to -1" This reverts commit 050d75aa93008d76dcedd1b3b7ddfa2047a11fdb. --- pipit/readers/core_reader.py | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/pipit/readers/core_reader.py b/pipit/readers/core_reader.py index 29a62fee..d31b7b54 100644 --- a/pipit/readers/core_reader.py +++ b/pipit/readers/core_reader.py @@ -93,15 +93,19 @@ def finalize(self): # create a dataframe trace_df = pandas.DataFrame(all_events) + trace_df["_matching_event"].fillna(-1, inplace=True) + trace_df["_parent"].fillna(-1, inplace=True) + trace_df["_matching_timestamp"].fillna(-1, inplace=True) + # categorical for memory savings trace_df = trace_df.astype( { "Name": "category", "Event Type": "category", "Process": "category", - "_matching_event": "Int32", - "_parent": "Int32", - "_matching_timestamp": "Int32", + "_matching_event": "int32", + "_parent": "int32", + "_matching_timestamp": "int32", } ) return trace_df @@ -115,7 +119,7 @@ def __update_parent_child_relationships( """ if len(stack) == 0: # root event - event["_parent"] = numpy.nan + event["_parent"] = -1 else: parent_event = event_list[stack[-1]] event["_parent"] = parent_event["unique_id"] From a3cf10676a88bda182c47618be55455dcfc0629c Mon Sep 17 00:00:00 2001 From: zoreoo <122127038+zoreoo@users.noreply.github.com> Date: Tue, 3 Dec 2024 15:47:01 -0500 Subject: [PATCH 18/20] Update comments --- pipit/readers/otf2_reader.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/pipit/readers/otf2_reader.py b/pipit/readers/otf2_reader.py index b685c3a7..75d6b5dc 100644 --- a/pipit/readers/otf2_reader.py +++ b/pipit/readers/otf2_reader.py @@ -254,8 +254,7 @@ def events_reader(self, rank_size): ) metric_values = event.values - # append the values for the metrics - # to their appropriate lists + # Set the values for the metrics for i in range(len(metrics)): metrics_dict[metrics[i]].append(metric_values[i]) @@ -267,7 +266,7 @@ def events_reader(self, rank_size): # has the same timestamp if event.time != prev_metric_time: # if the event is not paired with any metric, then - # add placeholders for all the metric lists + # add placeholder for metric in metric_names: metrics_dict[metric].append(float("nan")) From 
e058bec868d5b7557e8692e02960e69e5e820b38 Mon Sep 17 00:00:00 2001 From: zoreoo <122127038+zoreoo@users.noreply.github.com> Date: Tue, 3 Dec 2024 15:43:37 -0500 Subject: [PATCH 19/20] Initial implementation of HPCToolkit reader --- pipit/readers/hpctoolkit_reader.py | 108 ++++++++++++++--------------- 1 file changed, 53 insertions(+), 55 deletions(-) diff --git a/pipit/readers/hpctoolkit_reader.py b/pipit/readers/hpctoolkit_reader.py index a6cda337..c871fa26 100644 --- a/pipit/readers/hpctoolkit_reader.py +++ b/pipit/readers/hpctoolkit_reader.py @@ -7,6 +7,7 @@ import pandas as pd import pipit.trace from pipit.graph import Graph, Node +from pipit.readers.core_reader import CoreTraceReader class MetaReader: @@ -1034,6 +1035,8 @@ def __init__( self.signed = False self.encoding = "ASCII" + self.core_reader = CoreTraceReader(0, 1) + # The trace.db header consists of the common .db header and n sections. # We're going to do a little set up work, so that's easy to change if # any revisions change the orders. @@ -1150,20 +1153,6 @@ def __read_trace_headers_section( self.file.read(8), byteorder=self.byte_order, signed=self.signed ) - self.data = { - "Timestamp (ns)": [], - "Event Type": [], - "Name": [], - "Thread": [], - "Process": [], - "Core": [], - "Host": [], - "Node": [], - "Source File Name": [], - "Source File Line Number": [], - "Calling Context ID": [], - } - for i in range(num_trace_headers): header_pointer = trace_headers_pointer + (i * trace_header_size) self.__read_single_trace_header(header_pointer) @@ -1247,24 +1236,25 @@ def __read_single_trace_header(self, header_pointer: int) -> None: self.meta_reader.get_information_from_context_id(curr_ctx_id) ) - self.data["Name"].append(str(context_information["function"])) + event = {} + event["Name"] = str(context_information["function"]) if context_information["loop_type"]: # HPCViewer only puts loops in CCT, but not trace view, so # we use a special Loop Enter/Leave event type - self.data["Event Type"].append("Loop Leave") + event["Event Type"] = "Loop Leave" else: - self.data["Event Type"].append("Leave") - self.data["Timestamp (ns)"].append(timestamp) - self.data["Process"].append(hit["RANK"]) - self.data["Thread"].append(hit["THREAD"]) - self.data["Host"].append(hit["NODE"]) - self.data["Core"].append(hit["CORE"]) - self.data["Node"].append(last_node) - self.data["Source File Name"].append(context_information["file"]) - self.data["Source File Line Number"].append( - context_information["line"] - ) - self.data["Calling Context ID"].append(curr_ctx_id) + event["Event Type"] = "Leave" + event["Timestamp (ns)"] = timestamp + event["Process"] = hit["RANK"] + event["Thread"] = hit["THREAD"] + event["Host"] = hit["NODE"] + event["Core"] = hit["CORE"] + event["Node"] = last_node + event["Source File Name"] = context_information["file"] + event["Source File Line Number"] = context_information["line"] + event["Calling Context ID"] = curr_ctx_id + + self.core_reader.add_event(event) last_node = last_node.parent # Now we want to add all the new "enter" events after @@ -1282,24 +1272,26 @@ def __read_single_trace_header(self, header_pointer: int) -> None: self.meta_reader.get_information_from_context_id(curr_ctx_id) ) - self.data["Name"].append(str(context_information["function"])) + event = {} + + event["Name"] = str(context_information["function"]) if context_information["loop_type"]: # HPCViewer only puts loops in CCT, but not trace view, so # we use a special Loop Enter/Leave event type - self.data["Event Type"].append("Loop Enter") + 
event["Event Type"] = "Loop Enter" else: - self.data["Event Type"].append("Enter") - self.data["Timestamp (ns)"].append(timestamp) - self.data["Process"].append(hit["RANK"]) - self.data["Thread"].append(hit["THREAD"]) - self.data["Host"].append(hit["NODE"]) - self.data["Core"].append(hit["CORE"]) - self.data["Node"].append(entry_node) - self.data["Source File Name"].append(context_information["file"]) - self.data["Source File Line Number"].append( - context_information["line"] - ) - self.data["Calling Context ID"].append(curr_ctx_id) + event["Event Type"] = "Enter" + event["Timestamp (ns)"] = timestamp + event["Process"] = hit["RANK"] + event["Thread"] = hit["THREAD"] + event["Host"] = hit["NODE"] + event["Core"] = hit["CORE"] + event["Node"] = entry_node + event["Source File Name"] = context_information["file"] + event["Source File Line Number"] = context_information["line"] + event["Calling Context ID"] = curr_ctx_id + + self.core_reader.add_event(event) last_node = current_node last_id = context_id @@ -1320,20 +1312,25 @@ def __read_single_trace_header(self, header_pointer: int) -> None: curr_ctx_id ) - self.data["Name"].append(str(context_information["function"])) + event = {} + + event["Name"] = str(context_information["function"]) if context_information["loop_type"]: - self.data["Event Type"].append("Loop Leave") + event["Event Type"]= "Loop Leave" else: - self.data["Event Type"].append("Leave") - self.data["Timestamp (ns)"].append(timestamp) - self.data["Process"].append(hit["RANK"]) - self.data["Thread"].append(hit["THREAD"]) - self.data["Host"].append(hit["NODE"]) - self.data["Core"].append(hit["CORE"]) - self.data["Node"].append(last_node) - self.data["Source File Name"].append(context_information["file"]) - self.data["Source File Line Number"].append(context_information["line"]) - self.data["Calling Context ID"].append(curr_ctx_id) + event["Event Type"] = "Leave" + event["Timestamp (ns)"]= timestamp + event["Process"] = hit["RANK"] + event["Thread"] = hit["THREAD"] + event["Host"] = hit["NODE"] + event["Core"] = hit["CORE"] + event["Node"] = last_node + event["Source File Name"] = context_information["file"] + event["Source File Line Number"] = context_information["line"] + event["Calling Context ID"] = curr_ctx_id + + self.core_reader.add_event(event) + last_node = last_node.parent @@ -1346,7 +1343,8 @@ def __init__(self, directory: str) -> None: ) def read(self) -> pipit.trace.Trace: - trace_df = pd.DataFrame(self.trace_reader.data) + trace_df = self.trace_reader.core_reader.finalize() + # Need to sort df by timestamp then index # (since many events occur at the same timestamp) From f729fb17d2abc29023af3b86f52a757ec9547bc1 Mon Sep 17 00:00:00 2001 From: zoreoo <122127038+zoreoo@users.noreply.github.com> Date: Tue, 3 Dec 2024 16:05:07 -0500 Subject: [PATCH 20/20] Reapply "updated non-existing int values to -1" This reverts commit 050d75aa93008d76dcedd1b3b7ddfa2047a11fdb. 
--- pipit/readers/core_reader.py | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/pipit/readers/core_reader.py b/pipit/readers/core_reader.py index 29a62fee..d31b7b54 100644 --- a/pipit/readers/core_reader.py +++ b/pipit/readers/core_reader.py @@ -93,15 +93,19 @@ def finalize(self): # create a dataframe trace_df = pandas.DataFrame(all_events) + trace_df["_matching_event"].fillna(-1, inplace=True) + trace_df["_parent"].fillna(-1, inplace=True) + trace_df["_matching_timestamp"].fillna(-1, inplace=True) + # categorical for memory savings trace_df = trace_df.astype( { "Name": "category", "Event Type": "category", "Process": "category", - "_matching_event": "Int32", - "_parent": "Int32", - "_matching_timestamp": "Int32", + "_matching_event": "int32", + "_parent": "int32", + "_matching_timestamp": "int32", } ) return trace_df @@ -115,7 +119,7 @@ def __update_parent_child_relationships( """ if len(stack) == 0: # root event - event["_parent"] = numpy.nan + event["_parent"] = -1 else: parent_event = event_list[stack[-1]] event["_parent"] = parent_event["unique_id"]
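
Series note: with the full stack of patches applied, each reading process owns a
CoreTraceReader parameterized with strided ids, and the per-process DataFrames
are combined with concat_trace_data, which returns a pipit.trace.Trace rather
than another reader. A minimal end-to-end sketch, with a made-up
parse_rank_events standing in for a real format-specific parser:

    import multiprocessing as mp

    from pipit.readers.core_reader import CoreTraceReader, concat_trace_data

    def parse_rank_events(rank):
        # stand-in for format-specific parsing; events are illustrative
        return [
            {"Name": "main", "Event Type": "Enter", "Timestamp (ns)": 0,
             "Process": rank, "Thread": 0},
            {"Name": "main", "Event Type": "Leave", "Timestamp (ns)": 100,
             "Process": rank, "Thread": 0},
        ]

    def read_rank(args):
        rank, size = args
        # strided ids keep unique_id globally unique with no coordination:
        # rank 0 gets 0, size, 2*size, ...; rank 1 gets 1, size + 1, ...
        reader = CoreTraceReader(start=rank, stride=size)
        for event in parse_rank_events(rank):
            reader.add_event(event)
        return reader.finalize()

    if __name__ == "__main__":
        size = 4
        with mp.Pool(size) as pool:
            dataframes = pool.map(read_rank, [(r, size) for r in range(size)])
        trace = concat_trace_data(dataframes)

This mirrors how the OTF2 reader drives the core reader after patch 08: one
CoreTraceReader(rank, size) per worker, then concat_trace_data over the results
gathered from the pool.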