Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 53 additions & 15 deletions Framework/Core/test/benchmark_DataRelayer.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,60 @@
#include "Framework/CompletionPolicyHelpers.h"
#include "Framework/DataRelayer.h"
#include "Framework/DataProcessingHeader.h"
#include "Framework/DataProcessingStates.h"
#include "Framework/DataProcessingStats.h"
#include "Framework/DeviceState.h"
#include "Framework/DriverConfig.h"
#include "Framework/ServiceRegistryHelpers.h"
#include "Framework/TimingHelpers.h"
#include <Monitoring/Monitoring.h>
#include <fairmq/TransportFactory.h>
#include <cstring>
#include <vector>
#include <uv.h>

using Monitoring = o2::monitoring::Monitoring;
using namespace o2::framework;
using DataHeader = o2::header::DataHeader;
using Stack = o2::header::Stack;
using RecordAction = o2::framework::DataRelayer::RecordAction;

struct BenchmarkServices {
Monitoring monitoring;
const DriverConfig driverConfig{.batch = false};
DataProcessingStates states{
TimingHelpers::defaultRealtimeBaseConfigurator(0, uv_default_loop()),
TimingHelpers::defaultCPUTimeConfigurator(uv_default_loop())};
DataProcessingStats stats{
TimingHelpers::defaultRealtimeBaseConfigurator(0, uv_default_loop()),
TimingHelpers::defaultCPUTimeConfigurator(uv_default_loop()),
{}};
DeviceState deviceState;
ServiceRegistry registry;

ServiceRegistryRef ref()
{
using MetricSpec = DataProcessingStats::MetricSpec;
int quickUpdateInterval = 1;
std::vector<MetricSpec> specs{
MetricSpec{.name = "malformed_inputs", .metricId = static_cast<short>(ProcessingStatsId::MALFORMED_INPUTS), .minPublishInterval = quickUpdateInterval},
MetricSpec{.name = "dropped_computations", .metricId = static_cast<short>(ProcessingStatsId::DROPPED_COMPUTATIONS), .minPublishInterval = quickUpdateInterval},
MetricSpec{.name = "dropped_incoming_messages", .metricId = static_cast<short>(ProcessingStatsId::DROPPED_INCOMING_MESSAGES), .minPublishInterval = quickUpdateInterval},
MetricSpec{.name = "relayed_messages", .metricId = static_cast<short>(ProcessingStatsId::RELAYED_MESSAGES), .minPublishInterval = quickUpdateInterval}};
for (auto& spec : specs) {
stats.registerMetric(spec);
}

ServiceRegistryRef r{registry};
r.registerService(ServiceRegistryHelpers::handleForService<Monitoring>(&monitoring));
r.registerService(ServiceRegistryHelpers::handleForService<DataProcessingStates>(&states));
r.registerService(ServiceRegistryHelpers::handleForService<DataProcessingStats>(&stats));
r.registerService(ServiceRegistryHelpers::handleForService<DriverConfig const>(&driverConfig));
r.registerService(ServiceRegistryHelpers::handleForService<DeviceState>(&deviceState));
return r;
}
};

// a simple benchmark of the contribution of the pure message creation
// this was important when the benchmarks below included the message
// creation inside the benchmark loop, its somewhat obsolete now but
Expand Down Expand Up @@ -54,7 +97,7 @@ BENCHMARK(BM_RelayMessageCreation);
// and the subsequent InputRecord is immediately requested.
static void BM_RelaySingleSlot(benchmark::State& state)
{
Monitoring metrics;
BenchmarkServices services;
InputSpec spec{"clusters", "TPC", "CLUSTERS"};

std::vector<InputRoute> inputs = {
Expand All @@ -64,8 +107,7 @@ static void BM_RelaySingleSlot(benchmark::State& state)
std::vector<InputChannelInfo> infos{1};
TimesliceIndex index{1, infos};
auto policy = CompletionPolicyHelpers::consumeWhenAny();
ServiceRegistry registry;
DataRelayer relayer(policy, inputs, index, {registry}, -1);
DataRelayer relayer(policy, inputs, index, services.ref(), -1);
relayer.setPipelineLength(4);

// Let's create a dummy O2 Message with two headers in the stack:
Expand Down Expand Up @@ -106,7 +148,7 @@ BENCHMARK(BM_RelaySingleSlot);
// This one will simulate a single input.
static void BM_RelayMultipleSlots(benchmark::State& state)
{
Monitoring metrics;
BenchmarkServices services;
InputSpec spec{"clusters", "TPC", "CLUSTERS"};

std::vector<InputRoute> inputs = {
Expand All @@ -117,8 +159,7 @@ static void BM_RelayMultipleSlots(benchmark::State& state)
TimesliceIndex index{1, infos};

auto policy = CompletionPolicyHelpers::consumeWhenAny();
ServiceRegistry registry;
DataRelayer relayer(policy, inputs, index, {registry}, -1);
DataRelayer relayer(policy, inputs, index, services.ref(), -1);
relayer.setPipelineLength(4);

// Let's create a dummy O2 Message with two headers in the stack:
Expand Down Expand Up @@ -163,7 +204,7 @@ BENCHMARK(BM_RelayMultipleSlots);
/// In this case we have a record with two entries
static void BM_RelayMultipleRoutes(benchmark::State& state)
{
Monitoring metrics;
BenchmarkServices services;
InputSpec spec1{"clusters", "TPC", "CLUSTERS"};
InputSpec spec2{"tracks", "TPC", "TRACKS"};

Expand All @@ -176,8 +217,7 @@ static void BM_RelayMultipleRoutes(benchmark::State& state)
TimesliceIndex index{1, infos};

auto policy = CompletionPolicyHelpers::consumeWhenAny();
ServiceRegistry registry;
DataRelayer relayer(policy, inputs, index, {registry}, -1);
DataRelayer relayer(policy, inputs, index, services.ref(), -1);
relayer.setPipelineLength(4);

// Let's create a dummy O2 Message with two headers in the stack:
Expand Down Expand Up @@ -241,7 +281,7 @@ BENCHMARK(BM_RelayMultipleRoutes);
/// In this case we have a record with two entries
static void BM_RelaySplitParts(benchmark::State& state)
{
Monitoring metrics;
BenchmarkServices services;
InputSpec spec1{"clusters", "TPC", "CLUSTERS"};

std::vector<InputRoute> inputs = {
Expand All @@ -253,8 +293,7 @@ static void BM_RelaySplitParts(benchmark::State& state)
TimesliceIndex index{1, infos};

auto policy = CompletionPolicyHelpers::consumeWhenAny();
ServiceRegistry registry;
DataRelayer relayer(policy, inputs, index, {registry}, -1);
DataRelayer relayer(policy, inputs, index, services.ref(), -1);
relayer.setPipelineLength(4);

// Let's create a dummy O2 Message with two headers in the stack:
Expand Down Expand Up @@ -301,7 +340,7 @@ BENCHMARK(BM_RelaySplitParts)->Arg(10)->Arg(100)->Arg(1000);

static void BM_RelayMultiplePayloads(benchmark::State& state)
{
Monitoring metrics;
BenchmarkServices services;
InputSpec spec1{"clusters", "TPC", "CLUSTERS"};

std::vector<InputRoute> inputs = {
Expand All @@ -313,8 +352,7 @@ static void BM_RelayMultiplePayloads(benchmark::State& state)
TimesliceIndex index{1, infos};

auto policy = CompletionPolicyHelpers::consumeWhenAny();
ServiceRegistry registry;
DataRelayer relayer(policy, inputs, index, {registry}, -1);
DataRelayer relayer(policy, inputs, index, services.ref(), -1);
relayer.setPipelineLength(4);

// DataHeader matching the one provided in the input
Expand Down