Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions cmake/modules/RootMacros.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -2369,7 +2369,7 @@ macro(ROOTTEST_COMPILE_MACRO filename)
endif()
set_property(TEST ${COMPILE_MACRO_TEST} PROPERTY ENVIRONMENT ${ROOTTEST_ENVIRONMENT})
if(CMAKE_GENERATOR MATCHES Ninja AND NOT MSVC)
set_property(TEST ${COMPILE_MACRO_TEST} PROPERTY RUN_SERIAL true)
set_property(TEST ${COMPILE_MACRO_TEST} PROPERTY RESOURCE_LOCK NINJA_COMPILATION)
endif()
if (ARG_FIXTURES_SETUP)
set_property(TEST ${COMPILE_MACRO_TEST} PROPERTY
Expand Down Expand Up @@ -2475,7 +2475,7 @@ macro(ROOTTEST_GENERATE_DICTIONARY dictname)

set_property(TEST ${GENERATE_DICTIONARY_TEST} PROPERTY ENVIRONMENT ${ROOTTEST_ENVIRONMENT})
if(CMAKE_GENERATOR MATCHES Ninja AND NOT MSVC)
set_property(TEST ${GENERATE_DICTIONARY_TEST} PROPERTY RUN_SERIAL true)
set_property(TEST ${GENERATE_DICTIONARY_TEST} PROPERTY RESOURCE_LOCK NINJA_COMPILATION)
endif()

if (ARG_FIXTURES_SETUP)
Expand Down Expand Up @@ -2592,7 +2592,7 @@ macro(ROOTTEST_GENERATE_REFLEX_DICTIONARY dictionary)

set_property(TEST ${GENERATE_REFLEX_TEST} PROPERTY ENVIRONMENT ${ROOTTEST_ENVIRONMENT})
if(CMAKE_GENERATOR MATCHES Ninja AND NOT MSVC)
set_property(TEST ${GENERATE_REFLEX_TEST} PROPERTY RUN_SERIAL true)
set_property(TEST ${GENERATE_REFLEX_TEST} PROPERTY RESOURCE_LOCK NINJA_COMPILATION)
endif()

if (ARG_FIXTURES_SETUP)
Expand Down Expand Up @@ -2714,7 +2714,7 @@ macro(ROOTTEST_GENERATE_EXECUTABLE executable)
endif()

if(CMAKE_GENERATOR MATCHES Ninja AND NOT MSVC)
set_property(TEST ${GENERATE_EXECUTABLE_TEST} PROPERTY RUN_SERIAL true)
set_property(TEST ${GENERATE_EXECUTABLE_TEST} PROPERTY RESOURCE_LOCK NINJA_COMPILATION)
endif()

if(MSVC AND NOT CMAKE_GENERATOR MATCHES Ninja)
Expand Down
4 changes: 4 additions & 0 deletions core/clingutils/src/unordered_mapLinkdef.h
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
// clang-format off
#include <unordered_map>
#include <string>

Expand Down Expand Up @@ -28,3 +29,6 @@
#pragma create TClass unordered_map<string,float>;
#pragma create TClass unordered_map<string,double>;
#pragma create TClass unordered_map<string,void*>;

// For snapshot with systematic variations in RDF:
#pragma create TClass unordered_map<string,pair<string,unsigned int>>;
22 changes: 17 additions & 5 deletions tree/dataframe/inc/ROOT/RDF/InterfaceUtils.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,7 @@ struct SnapshotHelperArgs {
ROOT::Detail::RDF::RLoopManager *fOutputLoopManager;
ROOT::Detail::RDF::RLoopManager *fInputLoopManager;
bool fToNTuple;
bool fIncludeVariations;
};

template <typename PrevNodeType>
Expand Down Expand Up @@ -309,12 +310,23 @@ BuildAction(const ColumnNames_t &colNames, const std::shared_ptr<SnapshotHelperA
} else {
if (!ROOT::IsImplicitMTEnabled()) {
// single-thread snapshot
using Helper_t = UntypedSnapshotTTreeHelper;
using Action_t = RActionSnapshot<Helper_t, PrevNodeType>;
actionPtr.reset(new Action_t(Helper_t(filename, dirname, treename, colNames, outputColNames, options,
std::move(isDefine), outputLM, inputLM, colTypeIDs),
colNames, colTypeIDs, prevNode, colRegister));
if (snapHelperArgs->fIncludeVariations) {
using Helper_t = SnapshotHelperWithVariations;
using Action_t = RActionSnapshot<Helper_t, PrevNodeType>;
actionPtr.reset(new Action_t(Helper_t(filename, dirname, treename, colNames, outputColNames, options,
std::move(isDefine), outputLM, inputLM, colTypeIDs),
colNames, colTypeIDs, prevNode, colRegister));
} else {
using Helper_t = UntypedSnapshotTTreeHelper;
using Action_t = RActionSnapshot<Helper_t, PrevNodeType>;
actionPtr.reset(new Action_t(Helper_t(filename, dirname, treename, colNames, outputColNames, options,
std::move(isDefine), outputLM, inputLM, colTypeIDs),
colNames, colTypeIDs, prevNode, colRegister));
}
} else {
if (snapHelperArgs->fIncludeVariations) {
throw std::invalid_argument("Multi-threaded snapshot with variations is not supported yet.");
}
// multi-thread snapshot
using Helper_t = UntypedSnapshotTTreeHelperMT;
using Action_t = RActionSnapshot<Helper_t, PrevNodeType>;
Expand Down
106 changes: 87 additions & 19 deletions tree/dataframe/inc/ROOT/RDF/RActionSnapshot.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,16 @@ std::shared_ptr<GraphNode> AddDefinesToGraph(std::shared_ptr<GraphNode> node, co
std::unordered_map<void *, std::shared_ptr<GraphNode>> &visitedMap);
} // namespace GraphDrawing

class SnapshotHelperWithVariations;

template <typename Helper, typename PrevNode>
class R__CLING_PTRCHECK(off) RActionSnapshot final : public RActionBase {

// Template needed to avoid dependency on ActionHelpers.hxx
Helper fHelper;

/// Pointer to the previous node in this branch of the computation graph
std::shared_ptr<PrevNode> fPrevNode;
std::vector<std::shared_ptr<PrevNode>> fPrevNodes;

/// Column readers per slot and per input column
std::vector<std::vector<RColumnReaderBase *>> fValues;
Expand All @@ -54,8 +56,8 @@ public:
const std::vector<const std::type_info *> &colTypeIDs, std::shared_ptr<PrevNode> pd,
const RColumnRegister &colRegister)
: RActionBase(pd->GetLoopManagerUnchecked(), columns, colRegister, pd->GetVariations()),
fHelper(std::forward<Helper>(h)),
fPrevNode(std::move(pd)),
fHelper(std::move(h)),
fPrevNodes{std::move(pd)},
fValues(GetNSlots()),
fColTypeIDs(colTypeIDs)
{
Expand All @@ -65,6 +67,27 @@ public:
fIsDefine.reserve(nColumns);
for (auto i = 0u; i < nColumns; ++i)
fIsDefine.push_back(colRegister.IsDefineOrAlias(columns[i]));

if (const auto &variations = GetVariations(); !variations.empty()) {
// Get pointers to previous nodes of all systematics
fPrevNodes.reserve(1 + variations.size());
auto nominalFilter = fPrevNodes.front();
if (static_cast<RNodeBase *>(nominalFilter.get()) == fLoopManager) {
// just fill this with the RLoopManager N times
fPrevNodes.resize(1 + variations.size(), nominalFilter);
} else {
// create varied versions of the previous filter node
const auto &prevVariations = nominalFilter->GetVariations();
for (const auto &variation : variations) {
if (IsStrInVec(variation, prevVariations)) {
fPrevNodes.emplace_back(
std::static_pointer_cast<PrevNode>(nominalFilter->GetVariedFilter(variation)));
} else {
fPrevNodes.emplace_back(nominalFilter);
}
}
}
}
}

RActionSnapshot(const RActionSnapshot &) = delete;
Expand All @@ -89,11 +112,33 @@ public:
{
fValues[slot] = GetUntypedColumnReaders(slot, r, RActionBase::GetColRegister(), *fLoopManager,
RActionBase::GetColumnNames(), fColTypeIDs);

if constexpr (std::is_same_v<Helper, SnapshotHelperWithVariations>) {
// In case of systematic variations, append also the varied column readers to the values
// that get passed to the helpers
auto const &variations = GetVariations();
for (unsigned int variationIndex = 0; variationIndex < variations.size(); ++variationIndex) {
auto const &readers =
GetUntypedColumnReaders(slot, r, RActionBase::GetColRegister(), *fLoopManager,
RActionBase::GetColumnNames(), fColTypeIDs, variations[variationIndex]);
for (unsigned int i = 0; i < readers.size(); ++i) {
if (fValues[slot][i] != readers[i]) {
fValues[slot].push_back(readers[i]);
fHelper.RegisterVariedColumn(slot, i, i, 0, "nominal");
fHelper.RegisterVariedColumn(slot, fValues[slot].size() - 1, i, variationIndex + 1,
variations[variationIndex]);
}
}
}
}

fHelper.InitTask(r, slot);
}

void *GetValue(unsigned int slot, std::size_t readerIdx, Long64_t entry)
{
assert(slot < fValues.size());
assert(readerIdx < fValues[slot].size());
if (auto *val = fValues[slot][readerIdx]->template TryGet<void>(entry))
return val;

Expand All @@ -117,12 +162,34 @@ public:

void Run(unsigned int slot, Long64_t entry) final
{
// check if entry passes all filters
if (fPrevNode->CheckFilters(slot, entry))
CallExec(slot, entry);
if constexpr (std::is_same_v<Helper, SnapshotHelperWithVariations>) {
// check if entry passes all filters
std::vector<bool> filterPassed(fPrevNodes.size(), false);
for (unsigned int variation = 0; variation < fPrevNodes.size(); ++variation) {
filterPassed[variation] = fPrevNodes[variation]->CheckFilters(slot, entry);
}

if (std::any_of(filterPassed.begin(), filterPassed.end(), [](bool val) { return val; })) {
// TODO: Don't allocate
std::vector<void *> untypedValues;
auto nReaders = fValues[slot].size();
untypedValues.reserve(nReaders);
for (decltype(nReaders) readerIdx{}; readerIdx < nReaders; readerIdx++)
untypedValues.push_back(GetValue(slot, readerIdx, entry));

fHelper.Exec(slot, untypedValues, filterPassed);
}
} else {
if (fPrevNodes.front()->CheckFilters(slot, entry))
CallExec(slot, entry);
}
}

void TriggerChildrenCount() final { fPrevNode->IncrChildrenCount(); }
void TriggerChildrenCount() final
{
for (auto const &node : fPrevNodes)
node->IncrChildrenCount();
}

/// Clean-up operations to be performed at the end of a task.
void FinalizeSlot(unsigned int slot) final
Expand All @@ -142,29 +209,30 @@ public:
std::shared_ptr<GraphDrawing::GraphNode>
GetGraph(std::unordered_map<void *, std::shared_ptr<GraphDrawing::GraphNode>> &visitedMap) final
{
auto prevNode = fPrevNode->GetGraph(visitedMap);
const auto &prevColumns = prevNode->GetDefinedColumns();

// Action nodes do not need to go through CreateFilterNode: they are never common nodes between multiple branches
const auto nodeType = HasRun() ? GraphDrawing::ENodeType::kUsedAction : GraphDrawing::ENodeType::kAction;
auto thisNode = std::make_shared<GraphDrawing::GraphNode>(fHelper.GetActionName(), visitedMap.size(), nodeType);
visitedMap[(void *)this] = thisNode;

auto upmostNode = AddDefinesToGraph(thisNode, GetColRegister(), prevColumns, visitedMap);
for (auto const &node : fPrevNodes) {
auto prevNode = node->GetGraph(visitedMap);
const auto &prevColumns = prevNode->GetDefinedColumns();
auto upmostNode = AddDefinesToGraph(thisNode, GetColRegister(), prevColumns, visitedMap);

thisNode->AddDefinedColumns(GetColRegister().GenerateColumnNames());
upmostNode->SetPrevNode(prevNode);
thisNode->AddDefinedColumns(GetColRegister().GenerateColumnNames());
upmostNode->SetPrevNode(prevNode);
}
return thisNode;
}

/// This method is invoked to update a partial result during the event loop, right before passing the result to a
/// user-defined callback registered via RResultPtr::RegisterCallback
/// Forwards to the action helpers; will throw since PartialUpdate not supported for most snapshot helpers.
void *PartialUpdate(unsigned int slot) final { return fHelper.CallPartialUpdate(slot); }

/// Will throw, since varied actions are unsupported. Instead, set a flag in RSnapshotOptions.
[[maybe_unused]] std::unique_ptr<RActionBase> MakeVariedAction(std::vector<void *> && /*results*/) final
{
// TODO: Probably we also need an untyped RVariedAction
throw std::runtime_error("RDataFrame::Snapshot: Snapshot with systematic variations is not supported yet.");
throw std::logic_error("RDataFrame::Snapshot: The snapshot action cannot be varied. Instead, switch on "
"variations in RSnapshotOptions.");
}

/**
Expand All @@ -175,8 +243,8 @@ public:
*/
std::unique_ptr<RActionBase> CloneAction(void *newResult) final
{
return std::make_unique<RActionSnapshot>(fHelper.CallMakeNew(newResult), GetColumnNames(), fColTypeIDs, fPrevNode,
GetColRegister());
return std::make_unique<RActionSnapshot>(fHelper.CallMakeNew(newResult), GetColumnNames(), fColTypeIDs,
fPrevNodes.front(), GetColRegister());
}
};

Expand Down
2 changes: 1 addition & 1 deletion tree/dataframe/inc/ROOT/RDF/RDefineReader.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ class RDefinesWithReaders {
// (see BookDefineJit). it is never null.
std::shared_ptr<ROOT::Detail::RDF::RDefineBase> fDefine;
// Column readers per variation (in the map) per slot (in the vector).
std::vector<std::unordered_map<std::string_view, std::unique_ptr<RDefineReader>>> fReadersPerVariation;
std::vector<std::unordered_map<std::string_view, std::shared_ptr<RDefineReader>>> fReadersPerVariation;

// Strings that were already used to represent column names in this RDataFrame instance.
ROOT::Internal::RDF::RStringCache &fCachedColNames;
Expand Down
Loading
Loading