[WIP] Add support for RDataFrame info operations #65
base: master
Changes from all commits
7849b47
912c68b
8ad2915
73e066b
0e39b2f
9830f2a
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -33,7 +33,7 @@ def get_action_nodes(self, node_py=None): | |
| # current PyRDF node as the head node | ||
| node_py = self.head_node | ||
| else: | ||
| if node_py.operation.is_action(): | ||
| if node_py.operation.is_action() or node_py.operation.is_info(): | ||
| # Collect all action nodes in order to return them | ||
| return_nodes.append(node_py) | ||
|
|
||
|
|
@@ -96,7 +96,7 @@ def mapper(node_cpp, node_py=None): | |
| # recursive call | ||
|
Owner
Now this part becomes a little fuzzy. Before, we were always returning PyROOT class instances in the form of lazy transformations and booked actions; now we can also return fundamental types like strings and vectors.
||
| parent_node = pyroot_node | ||
|
|
||
| if node_py.operation.is_action(): | ||
| if node_py.operation.is_action() or node_py.operation.is_info(): | ||
|
Owner
Same as above, maybe use
Collaborator
Author
Same answer as above: indeed, this could be refactored to use a single function that returns both nodes and values.
||
| # Collect all action nodes in order to return them | ||
| return_vals.append(pyroot_node) | ||
|
|
||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -77,7 +77,7 @@ def GetValue(self): | |
| from PyRDF import current_backend | ||
| if not self.proxied_node.value: # If event-loop not triggered | ||
| generator = CallableGenerator(self.proxied_node.get_head()) | ||
| current_backend.execute(generator) | ||
| current_backend.execute(generator, trigger_loop=True) | ||
|
|
||
| return self.proxied_node.value | ||
|
|
||
|
|
@@ -123,10 +123,10 @@ def _create_new_op(self, *args, **kwargs): | |
| Handles an operation call to the current node and returns the new node | ||
| built using the operation call. | ||
| """ | ||
| from PyRDF import current_backend | ||
| # Create a new `Operation` object for the | ||
| # incoming operation call | ||
| op = Operation(self.proxied_node._new_op_name, *args, **kwargs) | ||
|
|
||
| # Create a new `Node` object to house the operation | ||
| newNode = Node(operation=op, get_head=self.proxied_node.get_head) | ||
|
|
||
|
|
@@ -136,5 +136,13 @@ def _create_new_op(self, *args, **kwargs): | |
| # Return the appropriate proxy object for the node | ||
| if op.is_action(): | ||
| return ActionProxy(newNode) | ||
| else: | ||
| elif op.is_transformation(): | ||
| return TransformationProxy(newNode) | ||
| else: | ||
| try: | ||
| generator = CallableGenerator(self.proxied_node.get_head()) | ||
| current_backend.execute(generator, trigger_loop=False) | ||
| except TypeError as e: | ||
| self.proxied_node.children.remove(newNode) | ||
| raise e | ||
| return newNode.ResultPtr | ||
|
Owner
In this case
Furthermore, while I get that creating a new node for info operations as well can guarantee that the same event loop is executed only once via the
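The rollback in the `else` branch above (register the new node, try to execute, detach the node again if a `TypeError` is raised) can be sketched in isolation. This is a minimal illustration only: the `Node` class and the `attach_and_run` helper are hypothetical stand-ins, not PyRDF code.

```python
class Node:
    """Hypothetical stand-in for a graph node that tracks its children."""

    def __init__(self):
        self.children = []


def attach_and_run(parent, child, run):
    """Attach `child` to `parent`, call `run(child)`, and undo on TypeError."""
    parent.children.append(child)
    try:
        return run(child)
    except TypeError:
        # Leave the graph as it was before the failed call, then re-raise
        # the active exception unchanged.
        parent.children.remove(child)
        raise


def failing_run(node):
    # Simulates an operation rejected because of wrong arguments.
    raise TypeError("wrong arguments")


parent = Node()
try:
    attach_and_run(parent, Node(), failing_run)
    rolled_back = False
except TypeError:
    rolled_back = parent.children == []

ok_result = attach_and_run(parent, Node(), lambda node: "ok")
```

A bare `raise` inside the `except` block re-raises the exception that is currently being handled, so no `as e` binding is needed.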
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -40,6 +40,10 @@ class Backend(ABC): | |
| 'Foreach', | ||
| 'Reduce', | ||
| 'Aggregate', | ||
| 'GetColumnNames', | ||
| 'GetDefinedColumnNames', | ||
| 'GetColumnType', | ||
| 'GetFilterNames', | ||
| 'Graph' | ||
| ] | ||
|
|
||
|
|
@@ -93,7 +97,7 @@ def check_supported(self, operation_name): | |
| ) | ||
|
|
||
| @abstractmethod | ||
| def execute(self, generator): | ||
| def execute(self, generator, trigger_loop=False): | ||
|
Owner
I like this; now that I see it, I wish it had been added earlier. One counterargument, though: this way we create PyROOT nodes even when no computation is involved, don't we? I am thinking of a situation in which the user first wants to find out the name of a column and then executes some operations. A way to solve this would be to implement some checks in the
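The `trigger_loop` flag added to `execute` can be illustrated with a stripped-down abstract base class. This is only a sketch of the signature change: `RecordingBackend` and its return strings are hypothetical and exist purely to show which path each call takes.

```python
from abc import ABC, abstractmethod


class Backend(ABC):
    """Reduced version of the abstract backend, keeping only `execute`."""

    @abstractmethod
    def execute(self, generator, trigger_loop=False):
        """Build the graph; run the event loop only if `trigger_loop` is True."""


class RecordingBackend(Backend):
    """Hypothetical backend that records how it was called."""

    def __init__(self):
        self.calls = []

    def execute(self, generator, trigger_loop=False):
        self.calls.append(trigger_loop)
        return "loop ran" if trigger_loop else "graph built only"


backend = RecordingBackend()
# Info operation path: the graph is built, but no event loop is requested.
info_result = backend.execute(generator=None)
# GetValue path: the event loop is requested explicitly.
action_result = backend.execute(generator=None, trigger_loop=True)
```

Note that in the diff the default differs between implementations (`False` in the base class and the local backend, `True` in the distributed backend), so passing the flag explicitly at each call site, as the diff does, avoids relying on either default.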
||
| """ | ||
| Subclasses must define how to run the RDataFrame graph on a given | ||
| environment. | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -353,7 +353,7 @@ def _get_friend_info(self, tree): | |
|
|
||
| return FriendInfo(friend_names, friend_file_names) | ||
|
|
||
| def execute(self, generator): | ||
| def execute(self, generator, trigger_loop=True): | ||
|
Owner
Distributed info operations won't work now, will they? Similar to how
||
| """ | ||
| Executes the current RDataFrame graph | ||
| in the given distributed environment. | ||
|
|
@@ -526,7 +526,7 @@ def reducer(values_list1, values_list2): | |
| warnings.warn(msg, UserWarning, stacklevel=2) | ||
| PyRDF.use("local") | ||
| from .. import current_backend | ||
| return current_backend.execute(generator) | ||
| return current_backend.execute(generator, trigger_loop=True) | ||
|
|
||
| # Values produced after Map-Reduce | ||
| values = self.ProcessAndMerge(mapper, reducer) | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -38,7 +38,7 @@ def __init__(self, config={}): | |
| if op not in operations_not_supported] | ||
| self.pyroot_rdf = None | ||
|
|
||
| def execute(self, generator): | ||
| def execute(self, generator, trigger_loop=False): | ||
| """ | ||
| Executes locally the current RDataFrame graph. | ||
|
|
||
|
|
@@ -52,22 +52,21 @@ def execute(self, generator): | |
|
|
||
| # if the RDataFrame has not been created yet or if a new one | ||
| # is created by the user in the same session | ||
| if (not self.pyroot_rdf) or \ | ||
| (self.pyroot_rdf is not generator.head_node): | ||
| if not self.pyroot_rdf or self.pyroot_rdf is not generator.head_node: | ||
| self.pyroot_rdf = ROOT.ROOT.RDataFrame(*generator.head_node.args) | ||
|
|
||
| values = mapper(self.pyroot_rdf) # Execute the mapper function | ||
|
|
||
| # Get the action nodes in the same order as values | ||
| nodes = generator.get_action_nodes() | ||
|
|
||
| values[0].GetValue() # Trigger event-loop | ||
|
|
||
| for i in range(len(values)): | ||
| for node, value in zip(nodes, values): | ||
| # Set the obtained values and | ||
| # 'RResultPtr's of action nodes | ||
| nodes[i].value = values[i].GetValue() | ||
| if trigger_loop and hasattr(value, 'GetValue'): | ||
| # Info actions do not have GetValue | ||
|
Owner
I wouldn't call them "info actions". Info operations do not trigger the event loop, and they have a different scope than action operations.
||
| node.value = value.GetValue() | ||
| # We store the 'RResultPtr's because, | ||
| # those should be in scope while doing | ||
| # a 'GetValue' call on them | ||
| nodes[i].ResultPtr = values[i] | ||
| node.ResultPtr = value | ||
|
Owner
Not all of these outputs are
Owner
Furthermore, for those nodes that actually return
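The concern raised here, that the loop stores plain values and lazy result handles in the same attribute, could be addressed by branching on whether the value exposes `GetValue`. The following is a sketch of one possible variant, not the code in this PR: `LazyResult`, `Node`, and `store_results` are hypothetical stand-ins that run without ROOT.

```python
class LazyResult:
    """Stand-in for a lazily computed result that exposes GetValue()."""

    def __init__(self, compute):
        self._compute = compute

    def GetValue(self):
        return self._compute()


class Node:
    """Stand-in for a graph node with the two attributes used in the diff."""

    def __init__(self):
        self.value = None
        self.ResultPtr = None


def store_results(nodes, values, trigger_loop):
    """Store each value on its node, keeping lazy handles and plain values apart."""
    for node, value in zip(nodes, values):
        if hasattr(value, "GetValue"):
            # Lazy handle: keep it referenced, compute only when asked to.
            node.ResultPtr = value
            if trigger_loop:
                node.value = value.GetValue()
        else:
            # Plain value (e.g. a list of column names): usable immediately.
            node.value = value


action_node, info_node = Node(), Node()
store_results(
    [action_node, info_node],
    [LazyResult(lambda: 3), ["a", "b"]],
    trigger_loop=True,
)
```

With this split, `ResultPtr` only ever holds objects that have `GetValue`, and info results are read from `value` directly.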
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,69 @@ | ||
| import unittest | ||
| import PyRDF | ||
| import ROOT | ||
|
|
||
|
|
||
| class InfoOperationsLocalTest(unittest.TestCase): | ||
| """ | ||
| Check that Info operations return the expected result rather than a proxy. | ||
| """ | ||
|
|
||
| def test_GetColumnNames(self): | ||
| """ | ||
| GetColumnNames returns ROOT string vector without running the event | ||
| loop. | ||
| """ | ||
| rdf = PyRDF.RDataFrame(1) | ||
| d = rdf.Define('a', 'rdfentry_').Define('b', 'a*a') | ||
|
|
||
| column_names = d.GetColumnNames() | ||
| expected_columns = ROOT.std.vector('string')() | ||
| expected_columns.push_back("a") | ||
| expected_columns.push_back("b") | ||
|
|
||
| for column, expected in zip(column_names, expected_columns): | ||
| self.assertEqual(column, expected) | ||
|
|
||
| def test_GetColumnType(self): | ||
| """ | ||
| GetColumnType returns the type of a given column as a string. | ||
| """ | ||
| rdf = PyRDF.RDataFrame(1) | ||
| d = rdf.Define('a', 'rdfentry_').Define('b', 'a*a') | ||
|
|
||
| a_typename = d.GetColumnType('a') | ||
| b_typename = d.GetColumnType('b') | ||
| expected_type = 'ULong64_t' | ||
|
|
||
| self.assertEqual(a_typename, expected_type) | ||
| self.assertEqual(b_typename, expected_type) | ||
|
|
||
| def test_GetDefinedColumnNames(self): | ||
| """ | ||
| GetDefinedColumnNames returns the names of the defined columns. | ||
| """ | ||
| rdf = PyRDF.RDataFrame(1) | ||
| d = rdf.Define('a', 'rdfentry_').Define('b', 'a*a') | ||
|
|
||
| column_names = d.GetDefinedColumnNames() | |
| expected_columns = ROOT.std.vector('string')() | ||
| expected_columns.push_back("a") | ||
| expected_columns.push_back("b") | ||
|
|
||
| for column, expected in zip(column_names, expected_columns): | ||
| self.assertEqual(column, expected) | ||
|
|
||
| def test_GetFilterNames(self): | ||
| """ | ||
| GetFilterNames returns the names of the filters created. | ||
| """ | ||
| rdf = PyRDF.RDataFrame(1) | ||
| filter_name = 'custom_filter' | ||
| d = rdf.Filter('rdfentry_ > 1', filter_name) | ||
|
|
||
| filters = d.GetFilterNames() | ||
| expected_filters = ROOT.std.vector('string')() | ||
| expected_filters.push_back(filter_name) | ||
|
|
||
| for f, expected in zip(filters, expected_filters): | ||
| self.assertEqual(f, expected) |
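One caveat about the `zip`-based assertion loops in these tests: `zip` stops at the shorter of its inputs, so if an info operation returned an empty collection the loop body would never run and the test would pass vacuously. The sketch below uses plain Python lists in place of ROOT string vectors (an assumption, so that it runs without ROOT); `zip_check` and `strict_check` are hypothetical helper names.

```python
def zip_check(returned, expected):
    """Mimics the loops in the tests: compares only the pairs zip produces."""
    for got, want in zip(returned, expected):
        if got != want:
            return False
    return True


def strict_check(returned, expected):
    """Compares the full contents, so a length mismatch is detected."""
    return list(returned) == list(expected)


expected = ["a", "b"]
empty_passes_zip = zip_check([], expected)        # no pairs are compared
empty_passes_strict = strict_check([], expected)  # lengths differ
full_passes_strict = strict_check(["a", "b"], expected)
```

In a `unittest` test case, adding an assertion on the lengths before the loop would close the same gap.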
What would this imply once we add support for instant actions? A condition like this might start to get too long.
Would it be better to write the following instead?
Also, this function is called `get_action_nodes`, so we need to remember to change the name afterwards.
I prefer the explicit expression. For now we do not need three conditions, and we could even include `instant_actions` in `info` (or under a more suitable name) if we need to support them.
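The trade-off discussed in this thread, an explicit `is_action() or is_info()` check versus a single helper, can be sketched as follows. This is a minimal illustration and not PyRDF's actual `Operation` class: the `OpKind` enum, the `KINDS` table, and the `returns_value()` method name are assumptions made for the example.

```python
from enum import Enum


class OpKind(Enum):
    TRANSFORMATION = "transformation"
    ACTION = "action"
    INFO = "info"


# Hypothetical lookup table using a few RDataFrame operation names;
# the table itself is only for illustration.
KINDS = {
    "Define": OpKind.TRANSFORMATION,
    "Filter": OpKind.TRANSFORMATION,
    "Count": OpKind.ACTION,
    "GetColumnNames": OpKind.INFO,
    "GetColumnType": OpKind.INFO,
}


class Operation:
    def __init__(self, name):
        self.name = name
        self.kind = KINDS[name]

    def is_action(self):
        return self.kind is OpKind.ACTION

    def is_info(self):
        return self.kind is OpKind.INFO

    def is_transformation(self):
        return self.kind is OpKind.TRANSFORMATION

    def returns_value(self):
        # Single place to extend if another kind (e.g. instant actions)
        # is added later; call sites stay unchanged.
        return self.kind in (OpKind.ACTION, OpKind.INFO)
```

With a helper like this, both call sites in the diff would test `node_py.operation.returns_value()`, at the cost of hiding which kinds are involved; the explicit expression keeps that visible while only two kinds exist.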