Skip to content

HPCToolkit reader using core reader #144

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 23 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
173 changes: 173 additions & 0 deletions pipit/readers/core_reader.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
from typing import List, Dict

import pandas
import numpy
from pipit.trace import Trace


class CoreTraceReader:
    """
    Helper Object to read traces from different sources and convert them into a common
    format.

    Events are accumulated per (process, thread) via add_event(), which also
    maintains an Enter/Leave call stack so each event is linked to its parent
    and to its matching Enter/Leave partner. finalize() flattens everything
    into a single pandas DataFrame.
    """

    def __init__(self, start: int = 0, stride: int = 1) -> None:
        """
        Should be called by each process to create an empty trace per process in the
        reader. Creates the following data structures to represent an empty trace:
        - events: Dict[int, Dict[int, List[Dict]]]
        - stacks: Dict[int, Dict[int, List[int]]]

        Arguments:
        start -- first unique id this reader will hand out (lets multiple
                 readers generate disjoint id ranges)
        stride -- increment between consecutive unique ids
        """
        # keep stride for how much unique id should be incremented
        self.stride = stride

        # seed one stride below `start` so the first __get_unique_id() call
        # returns exactly `start`
        self.unique_id = start - self.stride

        # events are indexed by process number, then thread number;
        # stores a list of event dicts in arrival order
        self.events: Dict[int, Dict[int, List[Dict]]] = {}

        # stacks are indexed by process number, then thread number;
        # stores indices (into the event list) of currently-open Enter events
        self.stacks: Dict[int, Dict[int, List[int]]] = {}

    def add_event(self, event: Dict) -> None:
        """
        Should be called to add each event to the trace. Will update the event lists
        and stacks accordingly.

        The event dict must contain "Name" and "Event Type" ("Enter", "Leave",
        or "Instant"); "Process"/"Thread" default to 0 when absent.
        """
        # Default missing Process/Thread to 0 AND write them back into the
        # event: finalize() unconditionally casts the "Process" column, which
        # would raise KeyError if no event carried that key.
        process = event.setdefault("Process", 0)
        thread = event.setdefault("Thread", 0)

        # assign a unique id to the event
        event["unique_id"] = self.__get_unique_id()

        # get (or lazily create) the event list for this process/thread
        event_list = self.events.setdefault(process, {}).setdefault(thread, [])

        # get (or lazily create) the call stack for this process/thread
        stack: List[int] = self.stacks.setdefault(process, {}).setdefault(thread, [])

        event_type = event["Event Type"]
        # Enter events open a new frame: record parent and push onto the stack.
        if event_type == "Enter":
            self.__update_parent_child_relationships(event, stack, event_list, False)
        # Instant events get a parent but never open a frame.
        elif event_type == "Instant":
            self.__update_parent_child_relationships(event, stack, event_list, True)
        # Leave events close the most recent matching Enter and pop the stack.
        elif event_type == "Leave":
            self.__update_match_event(event, stack, event_list)

        # Finally add the event to the event list
        event_list.append(event)

    def finalize(self) -> "pandas.DataFrame":
        """
        Converts the events data structure into a pandas dataframe and returns it.

        Missing linkage values (unmatched events, roots, Instant events) are
        filled with -1.
        """
        all_events = []
        for per_process in self.events.values():
            for per_thread in per_process.values():
                all_events.extend(per_thread)

        # create a dataframe
        trace_df = pandas.DataFrame(all_events)

        # no events were ever added -- return the empty frame rather than
        # crashing on the astype() below
        if trace_df.empty:
            return trace_df

        # Ensure the linkage columns exist even if no event ever set them
        # (e.g. a trace consisting only of Instant events), then fill gaps.
        # Assign the result instead of calling fillna(inplace=True) on a
        # column slice, which is deprecated under pandas copy-on-write.
        for col in ("_matching_event", "_parent", "_matching_timestamp"):
            if col in trace_df.columns:
                trace_df[col] = trace_df[col].fillna(-1)
            else:
                trace_df[col] = -1

        # categorical for memory savings; linkage columns are int64 because
        # nanosecond timestamps (and ids in large traces) overflow int32,
        # whose range tops out around 2.1 seconds of nanoseconds
        trace_df = trace_df.astype(
            {
                "Name": "category",
                "Event Type": "category",
                "Process": "category",
                "_matching_event": "int64",
                "_parent": "int64",
                "_matching_timestamp": "int64",
            }
        )
        return trace_df

    def __update_parent_child_relationships(
        self, event: Dict, stack: List[int], event_list: List[Dict], is_instant: bool
    ) -> None:
        """
        This method can be thought of as the update upon an "Enter" event. It adds to
        the stack and CCT.
        """
        if len(stack) == 0:
            # root event
            event["_parent"] = -1
        else:
            parent_event = event_list[stack[-1]]
            event["_parent"] = parent_event["unique_id"]

        # Instant events never open a frame, so only Enter events are pushed.
        if not is_instant:
            # the event is appended to event_list right after this call, so
            # len(event_list) is its future index
            stack.append(len(event_list))

    def __update_match_event(
        self, leave_event: Dict, stack: List[int], event_list: List[Dict]
    ) -> None:
        """
        This method can be thought of as the update upon a "Leave" event. It pops from
        the stack and updates the event list. We should look into using this function
        to add artificial "Leave" events for unmatched "Enter" events.

        NOTE: Enter events popped while searching for the matching name are
        discarded unmatched (their linkage fields stay unset and become -1 in
        finalize()).
        """
        while len(stack) > 0:

            # popping matched events from the stack
            enter_event = event_list[stack.pop()]

            if enter_event["Name"] == leave_event["Name"]:
                # matching event found

                # update matching event ids
                leave_event["_matching_event"] = enter_event["unique_id"]
                enter_event["_matching_event"] = leave_event["unique_id"]

                # update matching timestamps
                leave_event["_matching_timestamp"] = enter_event["Timestamp (ns)"]
                enter_event["_matching_timestamp"] = leave_event["Timestamp (ns)"]

                break

    def __get_unique_id(self) -> int:
        # advance by stride so ids from readers with different (start, stride)
        # never collide
        self.unique_id += self.stride
        return self.unique_id


def concat_trace_data(data_list):
    """
    Concatenates the data from multiple trace readers into a single trace reader.

    Arguments:
    data_list -- list of pandas DataFrames as produced by
                 CoreTraceReader.finalize(), each containing a "unique_id"
                 column and a "Timestamp (ns)" column

    Returns a Trace wrapping the combined, time-sorted DataFrame indexed by
    unique_id.
    """
    trace_data = pandas.concat(data_list, ignore_index=True)
    # set index to unique_id
    trace_data.set_index("unique_id", inplace=True)
    # Sort chronologically while KEEPING the unique_id index. The previous
    # version passed ignore_index=True, which silently replaced the index we
    # just built with a 0..n-1 RangeIndex.
    trace_data.sort_values(by="Timestamp (ns)", axis=0, ascending=True, inplace=True)
    return Trace(None, trace_data, None)
108 changes: 53 additions & 55 deletions pipit/readers/hpctoolkit_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import pandas as pd
import pipit.trace
from pipit.graph import Graph, Node
from pipit.readers.core_reader import CoreTraceReader


class MetaReader:
Expand Down Expand Up @@ -1034,6 +1035,8 @@ def __init__(
self.signed = False
self.encoding = "ASCII"

self.core_reader = CoreTraceReader(0, 1)

# The trace.db header consists of the common .db header and n sections.
# We're going to do a little set up work, so that's easy to change if
# any revisions change the orders.
Expand Down Expand Up @@ -1150,20 +1153,6 @@ def __read_trace_headers_section(
self.file.read(8), byteorder=self.byte_order, signed=self.signed
)

self.data = {
"Timestamp (ns)": [],
"Event Type": [],
"Name": [],
"Thread": [],
"Process": [],
"Core": [],
"Host": [],
"Node": [],
"Source File Name": [],
"Source File Line Number": [],
"Calling Context ID": [],
}

for i in range(num_trace_headers):
header_pointer = trace_headers_pointer + (i * trace_header_size)
self.__read_single_trace_header(header_pointer)
Expand Down Expand Up @@ -1247,24 +1236,25 @@ def __read_single_trace_header(self, header_pointer: int) -> None:
self.meta_reader.get_information_from_context_id(curr_ctx_id)
)

self.data["Name"].append(str(context_information["function"]))
event = {}
event["Name"] = str(context_information["function"])
if context_information["loop_type"]:
# HPCViewer only puts loops in CCT, but not trace view, so
# we use a special Loop Enter/Leave event type
self.data["Event Type"].append("Loop Leave")
event["Event Type"] = "Loop Leave"
else:
self.data["Event Type"].append("Leave")
self.data["Timestamp (ns)"].append(timestamp)
self.data["Process"].append(hit["RANK"])
self.data["Thread"].append(hit["THREAD"])
self.data["Host"].append(hit["NODE"])
self.data["Core"].append(hit["CORE"])
self.data["Node"].append(last_node)
self.data["Source File Name"].append(context_information["file"])
self.data["Source File Line Number"].append(
context_information["line"]
)
self.data["Calling Context ID"].append(curr_ctx_id)
event["Event Type"] = "Leave"
event["Timestamp (ns)"] = timestamp
event["Process"] = hit["RANK"]
event["Thread"] = hit["THREAD"]
event["Host"] = hit["NODE"]
event["Core"] = hit["CORE"]
event["Node"] = last_node
event["Source File Name"] = context_information["file"]
event["Source File Line Number"] = context_information["line"]
event["Calling Context ID"] = curr_ctx_id

self.core_reader.add_event(event)

last_node = last_node.parent
# Now we want to add all the new "enter" events after
Expand All @@ -1282,24 +1272,26 @@ def __read_single_trace_header(self, header_pointer: int) -> None:
self.meta_reader.get_information_from_context_id(curr_ctx_id)
)

self.data["Name"].append(str(context_information["function"]))
event = {}

event["Name"] = str(context_information["function"])
if context_information["loop_type"]:
# HPCViewer only puts loops in CCT, but not trace view, so
# we use a special Loop Enter/Leave event type
self.data["Event Type"].append("Loop Enter")
event["Event Type"] = "Loop Enter"
else:
self.data["Event Type"].append("Enter")
self.data["Timestamp (ns)"].append(timestamp)
self.data["Process"].append(hit["RANK"])
self.data["Thread"].append(hit["THREAD"])
self.data["Host"].append(hit["NODE"])
self.data["Core"].append(hit["CORE"])
self.data["Node"].append(entry_node)
self.data["Source File Name"].append(context_information["file"])
self.data["Source File Line Number"].append(
context_information["line"]
)
self.data["Calling Context ID"].append(curr_ctx_id)
event["Event Type"] = "Enter"
event["Timestamp (ns)"] = timestamp
event["Process"] = hit["RANK"]
event["Thread"] = hit["THREAD"]
event["Host"] = hit["NODE"]
event["Core"] = hit["CORE"]
event["Node"] = entry_node
event["Source File Name"] = context_information["file"]
event["Source File Line Number"] = context_information["line"]
event["Calling Context ID"] = curr_ctx_id

self.core_reader.add_event(event)

last_node = current_node
last_id = context_id
Expand All @@ -1320,20 +1312,25 @@ def __read_single_trace_header(self, header_pointer: int) -> None:
curr_ctx_id
)

self.data["Name"].append(str(context_information["function"]))
event = {}

event["Name"] = str(context_information["function"])
if context_information["loop_type"]:
self.data["Event Type"].append("Loop Leave")
event["Event Type"]= "Loop Leave"
else:
self.data["Event Type"].append("Leave")
self.data["Timestamp (ns)"].append(timestamp)
self.data["Process"].append(hit["RANK"])
self.data["Thread"].append(hit["THREAD"])
self.data["Host"].append(hit["NODE"])
self.data["Core"].append(hit["CORE"])
self.data["Node"].append(last_node)
self.data["Source File Name"].append(context_information["file"])
self.data["Source File Line Number"].append(context_information["line"])
self.data["Calling Context ID"].append(curr_ctx_id)
event["Event Type"] = "Leave"
event["Timestamp (ns)"]= timestamp
event["Process"] = hit["RANK"]
event["Thread"] = hit["THREAD"]
event["Host"] = hit["NODE"]
event["Core"] = hit["CORE"]
event["Node"] = last_node
event["Source File Name"] = context_information["file"]
event["Source File Line Number"] = context_information["line"]
event["Calling Context ID"] = curr_ctx_id

self.core_reader.add_event(event)

last_node = last_node.parent


Expand All @@ -1346,7 +1343,8 @@ def __init__(self, directory: str) -> None:
)

def read(self) -> pipit.trace.Trace:
trace_df = pd.DataFrame(self.trace_reader.data)
trace_df = self.trace_reader.core_reader.finalize()

# Need to sort df by timestamp then index
# (since many events occur at the same timestamp)

Expand Down
Loading
Loading