Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 24 additions & 13 deletions Framework/Core/include/Framework/AnalysisHelpers.h
Original file line number Diff line number Diff line change
Expand Up @@ -358,14 +358,14 @@ template <TableRef R>
constexpr auto tableRef2InputSpec()
{
std::vector<framework::ConfigParamSpec> metadata;
auto m = getInputMetadata<typename o2::aod::MetadataTrait<o2::aod::Hash<R.desc_hash>>::metadata>();
metadata.insert(metadata.end(), m.begin(), m.end());
auto ccdbMetadata = getCCDBMetadata<typename o2::aod::MetadataTrait<o2::aod::Hash<R.desc_hash>>::metadata>();
metadata.insert(metadata.end(), ccdbMetadata.begin(), ccdbMetadata.end());
auto p = getExpressionMetadata<typename o2::aod::MetadataTrait<o2::aod::Hash<R.desc_hash>>::metadata>();
metadata.insert(metadata.end(), p.begin(), p.end());
auto idx = getIndexMetadata<typename o2::aod::MetadataTrait<o2::aod::Hash<R.desc_hash>>::metadata>();
metadata.insert(metadata.end(), idx.begin(), idx.end());
auto sources = getInputMetadata<typename o2::aod::MetadataTrait<o2::aod::Hash<R.desc_hash>>::metadata>();
metadata.insert(metadata.end(), sources.begin(), sources.end());
auto ccdbURLs = getCCDBMetadata<typename o2::aod::MetadataTrait<o2::aod::Hash<R.desc_hash>>::metadata>();
metadata.insert(metadata.end(), ccdbURLs.begin(), ccdbURLs.end());
auto expressions = getExpressionMetadata<typename o2::aod::MetadataTrait<o2::aod::Hash<R.desc_hash>>::metadata>();
metadata.insert(metadata.end(), expressions.begin(), expressions.end());
auto indices = getIndexMetadata<typename o2::aod::MetadataTrait<o2::aod::Hash<R.desc_hash>>::metadata>();
metadata.insert(metadata.end(), indices.begin(), indices.end());
if constexpr (!soa::with_ccdb_urls<typename o2::aod::MetadataTrait<o2::aod::Hash<R.desc_hash>>::metadata>) {
metadata.emplace_back(framework::ConfigParamSpec{"schema", framework::VariantType::String, framework::serializeSchema(o2::aod::MetadataTrait<o2::aod::Hash<R.desc_hash>>::metadata::getSchema()), {"\"\""}});
}
Expand All @@ -382,11 +382,22 @@ constexpr auto tableRef2InputSpec()
template <TableRef R>
constexpr auto tableRef2OutputSpec()
{
std::vector<framework::ConfigParamSpec> metadata;
using md = typename o2::aod::MetadataTrait<o2::aod::Hash<R.desc_hash>>::metadata;
if constexpr (soa::with_ccdb_urls<md>) {
metadata.emplace_back("ccdb:", framework::VariantType::Bool, true, framework::ConfigParamSpec::HelpString{"\"\""});
} else if constexpr (soa::with_expression_pack<md>) {
metadata.emplace_back("projectors", framework::VariantType::Bool, true, framework::ConfigParamSpec::HelpString{"\"\""});
} else if constexpr (soa::with_index_pack<md>) {
metadata.emplace_back("index-records", framework::VariantType::Bool, true, framework::ConfigParamSpec::HelpString{"\"\""});
}
return framework::OutputSpec{
framework::OutputLabel{o2::aod::label<R>()},
o2::aod::origin<R>(),
o2::aod::description(o2::aod::signature<R>()),
R.version};
R.version,
framework::Lifetime::Timeframe,
metadata};
}

template <TableRef R>
Expand Down Expand Up @@ -504,14 +515,14 @@ struct OutputForTable {
using table_t = decltype(typeWithRef<T>());
using metadata = aod::MetadataTrait<o2::aod::Hash<table_t::ref.desc_hash>>::metadata;

static OutputSpec const spec()
static constexpr auto spec()
{
return OutputSpec{OutputLabel{aod::label<table_t::ref>()}, o2::aod::origin<table_t::ref>(), o2::aod::description(o2::aod::signature<table_t::ref>()), table_t::ref.version};
return soa::tableRef2OutputSpec<table_t::ref>();
}

static OutputRef ref()
static constexpr auto ref()
{
return OutputRef{aod::label<table_t::ref>(), table_t::ref.version};
return soa::tableRef2OutputRef<table_t::ref>();
}
};

Expand Down
10 changes: 5 additions & 5 deletions Framework/Core/include/Framework/AnalysisManagers.h
Original file line number Diff line number Diff line change
Expand Up @@ -183,20 +183,20 @@ bool newDataframeCondition(InputRecord& record, C& conditionGroup)

/// Outputs handling
template <typename T>
bool appendOutput(std::vector<OutputSpec>&, T&, uint32_t)
constexpr bool appendOutput(std::vector<OutputSpec>&, T&, uint32_t)
{
return false;
}

template <is_produces T>
bool appendOutput(std::vector<OutputSpec>& outputs, T&, uint32_t)
constexpr bool appendOutput(std::vector<OutputSpec>& outputs, T&, uint32_t)
{
outputs.emplace_back(OutputForTable<typename T::persistent_table_t>::spec());
outputs.emplace_back(soa::tableRef2OutputSpec<T::persistent_table_t::ref>());
return true;
}

template <is_produces_group T>
bool appendOutput(std::vector<OutputSpec>& outputs, T& producesGroup, uint32_t hash)
constexpr bool appendOutput(std::vector<OutputSpec>& outputs, T& producesGroup, uint32_t hash)
{
homogeneous_apply_refs<true>([&outputs, hash](auto& produces) { return appendOutput(outputs, produces, hash); }, producesGroup);
return true;
Expand Down Expand Up @@ -261,7 +261,7 @@ bool prepareOutput(ProcessingContext&, T&)
template <is_produces T>
bool prepareOutput(ProcessingContext& context, T& produces)
{
produces.resetCursor(std::move(context.outputs().make<TableBuilder>(OutputForTable<typename T::persistent_table_t>::ref())));
produces.resetCursor(std::move(context.outputs().make<TableBuilder>(soa::tableRef2OutputRef<T::persistent_table_t::ref>())));
return true;
}

Expand Down
9 changes: 9 additions & 0 deletions Framework/Core/include/Framework/DanglingEdgesContext.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,15 +33,24 @@ struct OutputObjectInfo {
// been requested and for which we will need to inject
// some source device.
struct DanglingEdgesContext {
// generic AOD tables
std::vector<InputSpec> requestedAODs;
std::vector<OutputSpec> providedAODs;
// extension tables
std::vector<InputSpec> requestedDYNs;
std::vector<OutputSpec> providedDYNs;
// index tables
std::vector<InputSpec> requestedIDXs;
std::vector<OutputSpec> providedIDXs;
// ccdb tables
std::vector<OutputSpec> providedTIMs;
std::vector<InputSpec> requestedTIMs;
// output objects
std::vector<OutputSpec> providedOutputObjHist;
// inputs for the extension spawner
std::vector<InputSpec> spawnerInputs;
// inputs for the index builder
std::vector<InputSpec> builderInputs;

// These are the timestamped tables which are required to
// inject the the CCDB objecs.
Expand Down
23 changes: 23 additions & 0 deletions Framework/Core/include/Framework/DataSpecViews.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,31 @@
#include "Framework/DataSpecUtils.h"
#include <ranges>

namespace o2::framework::checks
{
static auto has_params_with_name(std::string&& name)
{
return [name](ConfigParamSpec const& p) { return p.name.compare(name) == 0; };
}

static auto has_params_with_name_starting(std::string&& name)
{
return [name](ConfigParamSpec const& p) { return p.name.starts_with(name); };
}
} // namespace o2::framework::checks

namespace o2::framework::views
{
static auto filter_with_params_by_name(std::string&& name)
{
return std::views::filter([name = std::move(name)](auto const& spec) mutable { return std::ranges::any_of(spec.metadata, checks::has_params_with_name(std::move(name))); });
}

static auto filter_with_params_by_name_starting(std::string&& name)
{
return std::views::filter([name = std::move(name)](auto const& spec) mutable { return std::ranges::any_of(spec.metadata, checks::has_params_with_name_starting(std::move(name))); });
}

static auto partial_match_filter(auto what)
{
return std::views::filter([what](auto const& t) -> bool { return DataSpecUtils::partialMatch(t, what); });
Expand Down
7 changes: 6 additions & 1 deletion Framework/Core/src/AnalysisSupportHelpers.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -175,9 +175,14 @@ void AnalysisSupportHelpers::addMissingOutputsToBuilder(std::vector<InputSpec> c
// FIXME: until we have a single list of pairs
additionalInputs |
views::partial_match_filter(AODOrigins) |
std::ranges::views::filter([](InputSpec const& input) {
return std::ranges::none_of(input.metadata, [](ConfigParamSpec const& p) { return (p.name.compare("projectors") == 0) || (p.name.compare("index-records") == 0); });
}) |
sinks::update_input_list{requestedAODs}; // update requestedAODs
additionalInputs |
views::partial_match_filter(header::DataOrigin{"DYN"}) |
std::ranges::views::filter([](InputSpec const& input) {
return std::ranges::any_of(input.metadata, [](ConfigParamSpec const& p) { return p.name.compare("projectors") == 0; });
}) |
sinks::update_input_list{requestedDYNs}; // update requestedDYNs
}

Expand Down
86 changes: 51 additions & 35 deletions Framework/Core/src/ArrowSupport.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@ O2_DECLARE_DYNAMIC_LOG(rate_limiting);

namespace o2::framework
{

class EndOfStreamContext;
class ProcessingContext;

Expand Down Expand Up @@ -578,45 +577,80 @@ o2::framework::ServiceSpec ArrowSupport::arrowBackendSpec()
} },
.adjustTopology = [](WorkflowSpecNode& node, ConfigContext const& ctx) {
auto& workflow = node.specs;
auto spawner = std::ranges::find_if(workflow, [](DataProcessorSpec const& spec) { return spec.name.starts_with("internal-dpl-aod-spawner"); });
auto analysisCCDB = std::ranges::find_if(workflow, [](DataProcessorSpec const& spec) { return spec.name.starts_with("internal-dpl-aod-ccdb"); });
auto builder = std::ranges::find_if(workflow, [](DataProcessorSpec const& spec) { return spec.name.starts_with("internal-dpl-aod-index-builder"); });
auto writer = std::ranges::find_if(workflow, [](DataProcessorSpec const& spec) { return spec.name.starts_with("internal-dpl-aod-writer"); });
auto& dec = ctx.services().get<DanglingEdgesContext>();
dec.requestedAODs.clear();
dec.requestedDYNs.clear();
dec.providedDYNs.clear();
dec.providedTIMs.clear();
dec.requestedTIMs.clear();

auto inputSpecLessThan = [](InputSpec const& lhs, InputSpec const& rhs) { return DataSpecUtils::describe(lhs) < DataSpecUtils::describe(rhs); };
auto outputSpecLessThan = [](OutputSpec const& lhs, OutputSpec const& rhs) { return DataSpecUtils::describe(lhs) < DataSpecUtils::describe(rhs); };

auto builder = std::ranges::find_if(workflow, [](DataProcessorSpec const& spec) { return spec.name.starts_with("internal-dpl-aod-index-builder"); });
if (builder != workflow.end()) {
// collect currently requested IDXs
dec.requestedIDXs.clear();
dec.providedIDXs.clear();
for (auto& d : workflow | views::exclude_by_name(builder->name)) {
d.inputs |
views::partial_match_filter(header::DataOrigin{"IDX"}) |
views::filter_with_params_by_name("index-records") |
sinks::update_input_list{dec.requestedIDXs};
d.outputs |
views::filter_with_params_by_name("index-records") |
sinks::update_output_list{dec.providedIDXs};
}
std::ranges::sort(dec.requestedIDXs, inputSpecLessThan);
std::ranges::sort(dec.providedIDXs, outputSpecLessThan);
dec.builderInputs.clear();
dec.requestedIDXs |
views::filter_not_matching(dec.providedIDXs) |
sinks::append_to{dec.builderInputs};
// recreate inputs and outputs
builder->inputs.clear();
builder->outputs.clear();
AnalysisSupportHelpers::addMissingOutputsToBuilder(dec.builderInputs, dec.requestedAODs, dec.requestedDYNs, *builder);
if (!builder->inputs.empty()) {
// load real AlgorithmSpec before deployment
builder->algorithm = PluginManager::loadAlgorithmFromPlugin("O2FrameworkOnDemandTablesSupport", "IndexTableBuilder", ctx);
}
}

auto analysisCCDB = std::ranges::find_if(workflow, [](DataProcessorSpec const& spec) { return spec.name.starts_with("internal-dpl-aod-ccdb"); });
if (analysisCCDB != workflow.end()) {
dec.requestedTIMs.clear();
dec.providedTIMs.clear();
for (auto& d : workflow | views::exclude_by_name(analysisCCDB->name)) {
d.inputs |
views::filter_with_params_by_name_starting("ccdb:") |
sinks::update_input_list{dec.requestedTIMs};
d.outputs |
views::filter_with_params_by_name_starting("ccdb:") |
sinks::append_to{dec.providedTIMs};
}
std::ranges::sort(dec.requestedTIMs, inputSpecLessThan);
std::ranges::sort(dec.providedTIMs, outputSpecLessThan);
// Use ranges::to<std::vector<>> in C++23...
dec.analysisCCDBInputs.clear();
dec.requestedTIMs |
views::filter_not_matching(dec.providedTIMs) |
sinks::append_to{dec.analysisCCDBInputs};

// recreate inputs and outputs
analysisCCDB->outputs.clear();
analysisCCDB->inputs.clear();
AnalysisSupportHelpers::addMissingOutputsToBuilder(dec.analysisCCDBInputs, dec.requestedAODs, dec.requestedDYNs, *analysisCCDB);
// load real AlgorithmSpec before deployment
builder->algorithm = PluginManager::loadAlgorithmFromPlugin("O2FrameworkOnDemandTablesSupport", "IndexTableBuilder", ctx);
AnalysisSupportHelpers::addMissingOutputsToBuilder(dec.requestedIDXs, dec.requestedAODs, dec.requestedDYNs, *builder);
analysisCCDB->algorithm = PluginManager::loadAlgorithmFromPlugin("O2FrameworkCCDBSupport", "AnalysisCCDBFetcherPlugin", ctx);
}

auto spawner = std::ranges::find_if(workflow, [](DataProcessorSpec const& spec) { return spec.name.starts_with("internal-dpl-aod-spawner"); });
if (spawner != workflow.end()) {
dec.providedDYNs.clear();
// collect currently requested DYNs
for (auto& d : workflow | views::exclude_by_name(spawner->name)) {
d.inputs |
views::partial_match_filter(header::DataOrigin{"DYN"}) |
views::filter_with_params_by_name("projectors") |
sinks::update_input_list{dec.requestedDYNs};
d.outputs |
views::partial_match_filter(header::DataOrigin{"DYN"}) |
views::filter_with_params_by_name("projectors") |
sinks::append_to{dec.providedDYNs};
}
std::ranges::sort(dec.requestedDYNs, inputSpecLessThan);
Expand All @@ -628,32 +662,14 @@ o2::framework::ServiceSpec ArrowSupport::arrowBackendSpec()
// recreate inputs and outputs
spawner->outputs.clear();
spawner->inputs.clear();

// load real AlgorithmSpec before deployment
spawner->algorithm = PluginManager::loadAlgorithmFromPlugin("O2FrameworkOnDemandTablesSupport", "ExtendedTableSpawner", ctx);
AnalysisSupportHelpers::addMissingOutputsToSpawner({}, dec.spawnerInputs, dec.requestedAODs, *spawner);
}

if (analysisCCDB != workflow.end()) {
for (auto& d : workflow | views::exclude_by_name(analysisCCDB->name)) {
d.inputs | views::partial_match_filter(header::DataOrigin{"ATIM"}) | sinks::update_input_list{dec.requestedTIMs};
d.outputs | views::partial_match_filter(header::DataOrigin{"ATIM"}) | sinks::append_to{dec.providedTIMs};
if (!spawner->inputs.empty()) {
// load real AlgorithmSpec before deployment
spawner->algorithm = PluginManager::loadAlgorithmFromPlugin("O2FrameworkOnDemandTablesSupport", "ExtendedTableSpawner", ctx);
}
std::ranges::sort(dec.requestedTIMs, inputSpecLessThan);
std::ranges::sort(dec.providedTIMs, outputSpecLessThan);
// Use ranges::to<std::vector<>> in C++23...
dec.analysisCCDBInputs.clear();
dec.requestedTIMs | views::filter_not_matching(dec.providedTIMs) | sinks::append_to{dec.analysisCCDBInputs};

// recreate inputs and outputs
analysisCCDB->outputs.clear();
analysisCCDB->inputs.clear();
// load real AlgorithmSpec before deployment
// FIXME how can I make the lookup depend on DYN tables as well??
analysisCCDB->algorithm = PluginManager::loadAlgorithmFromPlugin("O2FrameworkCCDBSupport", "AnalysisCCDBFetcherPlugin", ctx);
AnalysisSupportHelpers::addMissingOutputsToBuilder(dec.analysisCCDBInputs, dec.requestedAODs, dec.requestedDYNs, *analysisCCDB);
}

auto writer = std::ranges::find_if(workflow, [](DataProcessorSpec const& spec) { return spec.name.starts_with("internal-dpl-aod-writer"); });
if (writer != workflow.end()) {
workflow.erase(writer);
}
Expand Down
Loading
Loading