Skip to content

Commit c02ff65

Browse files
committed
[ntuple] add tutorial to show efficient multi-stream reading
1 parent dce742a commit c02ff65

File tree

1 file changed

+152
-0
lines changed

1 file changed

+152
-0
lines changed
Lines changed: 152 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,152 @@
1+
/// \file
2+
/// \ingroup tutorial_ntuple
3+
/// \notebook
4+
/// Example of efficient multi-threaded reading when multiple threads share a single reader.
5+
///
6+
/// \macro_image
7+
/// \macro_code
8+
///
9+
/// \date October 2025
10+
/// \author The ROOT Team
11+
12+
#include <ROOT/RNTupleModel.hxx>
13+
#include <ROOT/RNTupleReader.hxx>
14+
#include <ROOT/RNTupleWriter.hxx>
15+
16+
#include <TCanvas.h>
17+
#include <TGraph.h>
18+
#include <TRandom3.h>
19+
#include <TStyle.h>
20+
21+
#include <array>
22+
#include <atomic>
23+
#include <chrono>
24+
#include <mutex>
25+
#include <thread>
26+
#include <vector>
27+
28+
using namespace std::chrono_literals;
29+
30+
// Where to store the ntuple of this example
31+
constexpr char const *kNTupleFileName = "ntpl017_shared_reader.root";
32+
33+
struct Point {
34+
float fX;
35+
float fY;
36+
};
37+
38+
void Write()
39+
{
40+
auto model = ROOT::RNTupleModel::Create();
41+
auto ptrPoint = model->MakeField<Point>("point");
42+
43+
auto writer = ROOT::RNTupleWriter::Recreate(std::move(model), "ntpl", kNTupleFileName);
44+
45+
for (int i = 0; i < 10000; ++i) {
46+
if (i % 1000 == 0)
47+
writer->CommitCluster();
48+
49+
auto prng = std::make_unique<TRandom3>();
50+
prng->SetSeed();
51+
52+
ptrPoint->fX = prng->Rndm(1);
53+
ptrPoint->fY = prng->Rndm(1);
54+
writer->Fill();
55+
}
56+
}
57+
58+
template <bool InformedT>
59+
void ProcessEntries(ROOT::RNTupleReader *reader, const std::chrono::microseconds &usPerEvent,
60+
std::vector<int> *countLoadedClusters)
61+
{
62+
static std::mutex gLock;
63+
64+
static std::atomic<int> gThreadId;
65+
const auto threadId = ++gThreadId;
66+
67+
static std::atomic<int> gNEntriesDone;
68+
69+
const auto N = reader->GetNEntries();
70+
71+
auto token = reader->CreateActiveEntryToken();
72+
for (int i = threadId; i < N; i += 2) {
73+
{
74+
std::lock_guard<std::mutex> guard(gLock);
75+
if constexpr (InformedT)
76+
token.SetEntryNumber(i);
77+
reader->LoadEntry(i);
78+
}
79+
80+
std::this_thread::sleep_for(usPerEvent);
81+
82+
countLoadedClusters->at(++gNEntriesDone) =
83+
reader->GetMetrics().GetCounter("RNTupleReader.RPageSourceFile.nClusterLoaded")->GetValueAsInt();
84+
}
85+
}
86+
87+
void ReadNaive()
88+
{
89+
auto reader = ROOT::RNTupleReader::Open("ntpl", kNTupleFileName);
90+
reader->EnableMetrics();
91+
92+
const auto N = reader->GetNEntries();
93+
std::vector<int> countLoadedClusters(N);
94+
95+
std::array<std::thread, 2> threads;
96+
threads[0] = std::thread(ProcessEntries<false>, reader.get(), 100us, &countLoadedClusters);
97+
threads[1] = std::thread(ProcessEntries<false>, reader.get(), 200us, &countLoadedClusters);
98+
for (auto &t : threads) {
99+
t.join();
100+
}
101+
102+
gStyle->SetOptStat(0);
103+
104+
TCanvas *canvas = new TCanvas("", "Shared Reader Example", 200, 10, 1500, 500);
105+
//canvas->Divide(2, 1);
106+
107+
//
108+
// canvas->cd(1);
109+
110+
auto graph = new TGraph();
111+
for (unsigned int i = 0; i < N; ++i) {
112+
graph->SetPoint(i, i, countLoadedClusters[i]);
113+
}
114+
graph->Draw("ALP");
115+
}
116+
117+
void ReadInformed()
118+
{
119+
auto reader = ROOT::RNTupleReader::Open("ntpl", kNTupleFileName);
120+
reader->EnableMetrics();
121+
122+
const auto N = reader->GetNEntries();
123+
std::vector<int> countLoadedClusters(N);
124+
125+
std::array<std::thread, 2> threads;
126+
threads[0] = std::thread(ProcessEntries<true>, reader.get(), 100us, &countLoadedClusters);
127+
threads[1] = std::thread(ProcessEntries<true>, reader.get(), 200us, &countLoadedClusters);
128+
for (auto &t : threads) {
129+
t.join();
130+
}
131+
132+
gStyle->SetOptStat(0);
133+
134+
TCanvas *canvas = new TCanvas("", "Shared Reader Example", 200, 10, 1500, 500);
135+
//canvas->Divide(2, 1);
136+
137+
//
138+
// canvas->cd(1);
139+
140+
auto graph = new TGraph();
141+
for (unsigned int i = 0; i < N; ++i) {
142+
graph->SetPoint(i, i, countLoadedClusters[i]);
143+
}
144+
graph->Draw("ALP");
145+
}
146+
147+
void ntpl017_shared_reader()
148+
{
149+
Write();
150+
ReadNaive();
151+
ReadInformed();
152+
}

0 commit comments

Comments
 (0)