1 | #include <port/unistd.h> |
2 | #include <stdlib.h> |
3 | #include <fcntl.h> |
4 | #include <signal.h> |
5 | #include <time.h> |
6 | #include <iostream> |
7 | #include <fstream> |
8 | #include <iomanip> |
9 | #include <random> |
10 | #include <pcg_random.hpp> |
11 | #include <Poco/File.h> |
12 | #include <Poco/Util/Application.h> |
13 | #include <Common/Stopwatch.h> |
14 | #include <Common/ThreadPool.h> |
15 | #include <AggregateFunctions/ReservoirSampler.h> |
16 | #include <AggregateFunctions/registerAggregateFunctions.h> |
17 | #include <boost/program_options.hpp> |
18 | #include <Common/ConcurrentBoundedQueue.h> |
19 | #include <Common/Exception.h> |
20 | #include <Common/randomSeed.h> |
21 | #include <Core/Types.h> |
22 | #include <IO/ReadBufferFromFileDescriptor.h> |
23 | #include <IO/WriteBufferFromFileDescriptor.h> |
24 | #include <IO/WriteBufferFromFile.h> |
25 | #include <IO/ReadHelpers.h> |
26 | #include <IO/WriteHelpers.h> |
27 | #include <IO/Operators.h> |
28 | #include <IO/ConnectionTimeouts.h> |
29 | #include <IO/UseSSL.h> |
30 | #include <DataStreams/RemoteBlockInputStream.h> |
31 | #include <Interpreters/Context.h> |
32 | #include <Client/Connection.h> |
33 | #include <Common/InterruptListener.h> |
34 | #include <Common/Config/configReadClient.h> |
35 | #include <Common/TerminalSize.h> |
36 | #include <Common/StudentTTest.h> |
37 | |
38 | |
/** A tool for evaluating ClickHouse performance.
  * The tool emulates a workload with a fixed number of concurrently executing queries.
  */
42 | |
43 | namespace DB |
44 | { |
45 | |
46 | using Ports = std::vector<UInt16>; |
47 | |
48 | namespace ErrorCodes |
49 | { |
50 | extern const int BAD_ARGUMENTS; |
51 | extern const int EMPTY_DATA_PASSED; |
52 | } |
53 | |
54 | class Benchmark : public Poco::Util::Application |
55 | { |
56 | public: |
57 | Benchmark(unsigned concurrency_, double delay_, Strings && hosts_, Ports && ports_, |
58 | bool cumulative_, bool secure_, const String & default_database_, |
59 | const String & user_, const String & password_, const String & stage, |
60 | bool randomize_, size_t max_iterations_, double max_time_, |
61 | const String & json_path_, size_t confidence_, const Settings & settings_) |
62 | : |
63 | concurrency(concurrency_), delay(delay_), queue(concurrency), randomize(randomize_), |
64 | cumulative(cumulative_), max_iterations(max_iterations_), max_time(max_time_), |
65 | confidence(confidence_), json_path(json_path_), settings(settings_), |
66 | global_context(Context::createGlobal()), pool(concurrency) |
67 | { |
68 | const auto secure = secure_ ? Protocol::Secure::Enable : Protocol::Secure::Disable; |
69 | size_t connections_cnt = std::max(ports_.size(), hosts_.size()); |
70 | |
71 | connections.reserve(connections_cnt); |
72 | comparison_info_total.reserve(connections_cnt); |
73 | comparison_info_per_interval.reserve(connections_cnt); |
74 | |
75 | for (size_t i = 0; i < connections_cnt; ++i) |
76 | { |
77 | UInt16 cur_port = i >= ports_.size() ? 9000 : ports_[i]; |
78 | std::string cur_host = i >= hosts_.size() ? "localhost" : hosts_[i]; |
79 | |
80 | connections.emplace_back(std::make_unique<ConnectionPool>(concurrency, cur_host, cur_port, default_database_, user_, password_, "benchmark" , Protocol::Compression::Enable, secure)); |
81 | comparison_info_per_interval.emplace_back(std::make_shared<Stats>()); |
82 | comparison_info_total.emplace_back(std::make_shared<Stats>()); |
83 | } |
84 | |
85 | global_context.makeGlobalContext(); |
86 | |
87 | std::cerr << std::fixed << std::setprecision(3); |
88 | |
89 | /// This is needed to receive blocks with columns of AggregateFunction data type |
90 | /// (example: when using stage = 'with_mergeable_state') |
91 | registerAggregateFunctions(); |
92 | |
93 | if (stage == "complete" ) |
94 | query_processing_stage = QueryProcessingStage::Complete; |
95 | else if (stage == "fetch_columns" ) |
96 | query_processing_stage = QueryProcessingStage::FetchColumns; |
97 | else if (stage == "with_mergeable_state" ) |
98 | query_processing_stage = QueryProcessingStage::WithMergeableState; |
99 | else |
100 | throw Exception("Unknown query processing stage: " + stage, ErrorCodes::BAD_ARGUMENTS); |
101 | |
102 | } |
103 | |
104 | void initialize(Poco::Util::Application & self [[maybe_unused]]) |
105 | { |
106 | std::string home_path; |
107 | const char * home_path_cstr = getenv("HOME" ); |
108 | if (home_path_cstr) |
109 | home_path = home_path_cstr; |
110 | |
111 | configReadClient(config(), home_path); |
112 | } |
113 | |
114 | int main(const std::vector<std::string> &) |
115 | { |
116 | if (!json_path.empty() && Poco::File(json_path).exists()) /// Clear file with previous results |
117 | Poco::File(json_path).remove(); |
118 | |
119 | readQueries(); |
120 | runBenchmark(); |
121 | return 0; |
122 | } |
123 | |
124 | private: |
125 | using Entry = ConnectionPool::Entry; |
126 | using EntryPtr = std::shared_ptr<Entry>; |
127 | using EntryPtrs = std::vector<EntryPtr>; |
128 | |
129 | unsigned concurrency; |
130 | double delay; |
131 | |
132 | using Query = std::string; |
133 | using Queries = std::vector<Query>; |
134 | Queries queries; |
135 | |
136 | using Queue = ConcurrentBoundedQueue<Query>; |
137 | Queue queue; |
138 | |
139 | using ConnectionPoolUniq = std::unique_ptr<ConnectionPool>; |
140 | using ConnectionPoolUniqs = std::vector<ConnectionPoolUniq>; |
141 | ConnectionPoolUniqs connections; |
142 | |
143 | bool randomize; |
144 | bool cumulative; |
145 | size_t max_iterations; |
146 | double max_time; |
147 | size_t confidence; |
148 | String json_path; |
149 | Settings settings; |
150 | Context global_context; |
151 | QueryProcessingStage::Enum query_processing_stage; |
152 | |
153 | /// Don't execute new queries after timelimit or SIGINT or exception |
154 | std::atomic<bool> shutdown{false}; |
155 | |
156 | std::atomic<size_t> queries_executed{0}; |
157 | |
158 | struct Stats |
159 | { |
160 | std::atomic<size_t> queries{0}; |
161 | size_t read_rows = 0; |
162 | size_t read_bytes = 0; |
163 | size_t result_rows = 0; |
164 | size_t result_bytes = 0; |
165 | double work_time = 0; |
166 | |
167 | using Sampler = ReservoirSampler<double>; |
168 | Sampler sampler {1 << 16}; |
169 | |
170 | void add(double seconds, size_t read_rows_inc, size_t read_bytes_inc, size_t result_rows_inc, size_t result_bytes_inc) |
171 | { |
172 | ++queries; |
173 | work_time += seconds; |
174 | read_rows += read_rows_inc; |
175 | read_bytes += read_bytes_inc; |
176 | result_rows += result_rows_inc; |
177 | result_bytes += result_bytes_inc; |
178 | sampler.insert(seconds); |
179 | } |
180 | |
181 | void clear() |
182 | { |
183 | queries = 0; |
184 | work_time = 0; |
185 | read_rows = 0; |
186 | read_bytes = 0; |
187 | result_rows = 0; |
188 | result_bytes = 0; |
189 | sampler.clear(); |
190 | } |
191 | }; |
192 | |
193 | using MultiStats = std::vector<std::shared_ptr<Stats>>; |
194 | MultiStats comparison_info_per_interval; |
195 | MultiStats comparison_info_total; |
196 | StudentTTest t_test; |
197 | |
198 | Stopwatch total_watch; |
199 | Stopwatch delay_watch; |
200 | |
201 | std::mutex mutex; |
202 | |
203 | ThreadPool pool; |
204 | |
205 | void readQueries() |
206 | { |
207 | ReadBufferFromFileDescriptor in(STDIN_FILENO); |
208 | |
209 | while (!in.eof()) |
210 | { |
211 | std::string query; |
212 | readText(query, in); |
213 | assertChar('\n', in); |
214 | |
215 | if (!query.empty()) |
216 | queries.emplace_back(query); |
217 | } |
218 | |
219 | if (queries.empty()) |
220 | throw Exception("Empty list of queries." , ErrorCodes::EMPTY_DATA_PASSED); |
221 | |
222 | std::cerr << "Loaded " << queries.size() << " queries.\n" ; |
223 | } |
224 | |
225 | |
226 | void printNumberOfQueriesExecuted(size_t num) |
227 | { |
228 | std::cerr << "\nQueries executed: " << num; |
229 | if (queries.size() > 1) |
230 | std::cerr << " (" << (num * 100.0 / queries.size()) << "%)" ; |
231 | std::cerr << ".\n" ; |
232 | } |
233 | |
234 | /// Try push new query and check cancellation conditions |
235 | bool tryPushQueryInteractively(const String & query, InterruptListener & interrupt_listener) |
236 | { |
237 | bool inserted = false; |
238 | |
239 | while (!inserted) |
240 | { |
241 | inserted = queue.tryPush(query, 100); |
242 | |
243 | if (shutdown) |
244 | { |
245 | /// An exception occurred in a worker |
246 | return false; |
247 | } |
248 | |
249 | if (max_time > 0 && total_watch.elapsedSeconds() >= max_time) |
250 | { |
251 | std::cout << "Stopping launch of queries. Requested time limit is exhausted.\n" ; |
252 | return false; |
253 | } |
254 | |
255 | if (interrupt_listener.check()) |
256 | { |
257 | std::cout << "Stopping launch of queries. SIGINT recieved.\n" ; |
258 | return false; |
259 | } |
260 | |
261 | if (delay > 0 && delay_watch.elapsedSeconds() > delay) |
262 | { |
263 | printNumberOfQueriesExecuted(queries_executed); |
264 | cumulative ? report(comparison_info_total) : report(comparison_info_per_interval); |
265 | delay_watch.restart(); |
266 | } |
267 | } |
268 | |
269 | return true; |
270 | } |
271 | |
272 | void runBenchmark() |
273 | { |
274 | pcg64 generator(randomSeed()); |
275 | std::uniform_int_distribution<size_t> distribution(0, queries.size() - 1); |
276 | |
277 | try |
278 | { |
279 | for (size_t i = 0; i < concurrency; ++i) |
280 | { |
281 | EntryPtrs connection_entries; |
282 | connection_entries.reserve(connections.size()); |
283 | |
284 | for (const auto & connection : connections) |
285 | connection_entries.emplace_back(std::make_shared<Entry>( |
286 | connection->get(ConnectionTimeouts::getTCPTimeoutsWithoutFailover(settings)))); |
287 | |
288 | pool.scheduleOrThrowOnError(std::bind(&Benchmark::thread, this, connection_entries)); |
289 | } |
290 | } |
291 | catch (...) |
292 | { |
293 | pool.wait(); |
294 | throw; |
295 | } |
296 | |
297 | InterruptListener interrupt_listener; |
298 | delay_watch.restart(); |
299 | |
300 | /// Push queries into queue |
301 | for (size_t i = 0; !max_iterations || i < max_iterations; ++i) |
302 | { |
303 | size_t query_index = randomize ? distribution(generator) : i % queries.size(); |
304 | |
305 | if (!tryPushQueryInteractively(queries[query_index], interrupt_listener)) |
306 | { |
307 | shutdown = true; |
308 | break; |
309 | } |
310 | } |
311 | |
312 | pool.wait(); |
313 | total_watch.stop(); |
314 | |
315 | if (!json_path.empty()) |
316 | reportJSON(comparison_info_total, json_path); |
317 | |
318 | printNumberOfQueriesExecuted(queries_executed); |
319 | report(comparison_info_total); |
320 | } |
321 | |
322 | |
323 | void thread(EntryPtrs & connection_entries) |
324 | { |
325 | Query query; |
326 | |
327 | /// Randomly choosing connection index |
328 | pcg64 generator(randomSeed()); |
329 | std::uniform_int_distribution<size_t> distribution(0, connection_entries.size() - 1); |
330 | |
331 | try |
332 | { |
333 | /// In these threads we do not accept INT signal. |
334 | sigset_t sig_set; |
335 | if (sigemptyset(&sig_set) |
336 | || sigaddset(&sig_set, SIGINT) |
337 | || pthread_sigmask(SIG_BLOCK, &sig_set, nullptr)) |
338 | throwFromErrno("Cannot block signal." , ErrorCodes::CANNOT_BLOCK_SIGNAL); |
339 | |
340 | while (true) |
341 | { |
342 | bool = false; |
343 | |
344 | while (!extracted) |
345 | { |
346 | extracted = queue.tryPop(query, 100); |
347 | |
348 | if (shutdown || (max_iterations && queries_executed == max_iterations)) |
349 | return; |
350 | } |
351 | execute(connection_entries, query, distribution(generator)); |
352 | ++queries_executed; |
353 | } |
354 | } |
355 | catch (...) |
356 | { |
357 | shutdown = true; |
358 | std::cerr << "An error occurred while processing query:\n" << query << "\n" ; |
359 | throw; |
360 | } |
361 | } |
362 | |
363 | void execute(EntryPtrs & connection_entries, Query & query, size_t connection_index) |
364 | { |
365 | Stopwatch watch; |
366 | RemoteBlockInputStream stream( |
367 | *(*connection_entries[connection_index]), |
368 | query, {}, global_context, &settings, nullptr, Scalars(), Tables(), query_processing_stage); |
369 | |
370 | Progress progress; |
371 | stream.setProgressCallback([&progress](const Progress & value) { progress.incrementPiecewiseAtomically(value); }); |
372 | |
373 | stream.readPrefix(); |
374 | while (Block block = stream.read()); |
375 | |
376 | stream.readSuffix(); |
377 | |
378 | const BlockStreamProfileInfo & info = stream.getProfileInfo(); |
379 | |
380 | double seconds = watch.elapsedSeconds(); |
381 | |
382 | std::lock_guard lock(mutex); |
383 | |
384 | comparison_info_per_interval[connection_index]->add(seconds, progress.read_rows, progress.read_bytes, info.rows, info.bytes); |
385 | comparison_info_total[connection_index]->add(seconds, progress.read_rows, progress.read_bytes, info.rows, info.bytes); |
386 | t_test.add(connection_index, seconds); |
387 | } |
388 | |
389 | void report(MultiStats & infos) |
390 | { |
391 | std::lock_guard lock(mutex); |
392 | |
393 | std::cerr << "\n" ; |
394 | for (size_t i = 0; i < infos.size(); ++i) |
395 | { |
396 | const auto & info = infos[i]; |
397 | |
398 | /// Avoid zeros, nans or exceptions |
399 | if (0 == info->queries) |
400 | return; |
401 | |
402 | double seconds = info->work_time / concurrency; |
403 | |
404 | std::cerr |
405 | << connections[i]->getDescription() << ", " |
406 | << "queries " << info->queries << ", " |
407 | << "QPS: " << (info->queries / seconds) << ", " |
408 | << "RPS: " << (info->read_rows / seconds) << ", " |
409 | << "MiB/s: " << (info->read_bytes / seconds / 1048576) << ", " |
410 | << "result RPS: " << (info->result_rows / seconds) << ", " |
411 | << "result MiB/s: " << (info->result_bytes / seconds / 1048576) << "." |
412 | << "\n" ; |
413 | } |
414 | std::cerr << "\n" ; |
415 | |
416 | auto print_percentile = [&](double percent) |
417 | { |
418 | std::cerr << percent << "%\t\t" ; |
419 | for (const auto & info : infos) |
420 | { |
421 | std::cerr << info->sampler.quantileInterpolated(percent / 100.0) << " sec." << "\t" ; |
422 | } |
423 | std::cerr << "\n" ; |
424 | }; |
425 | |
426 | for (int percent = 0; percent <= 90; percent += 10) |
427 | print_percentile(percent); |
428 | |
429 | print_percentile(95); |
430 | print_percentile(99); |
431 | print_percentile(99.9); |
432 | print_percentile(99.99); |
433 | |
434 | std::cerr << "\n" << t_test.compareAndReport(confidence).second << "\n" ; |
435 | |
436 | if (!cumulative) |
437 | { |
438 | for (auto & info : infos) |
439 | info->clear(); |
440 | } |
441 | } |
442 | |
443 | void reportJSON(MultiStats & infos, const std::string & filename) |
444 | { |
445 | WriteBufferFromFile json_out(filename); |
446 | |
447 | std::lock_guard lock(mutex); |
448 | |
449 | auto print_key_value = [&](auto key, auto value, bool with_comma = true) |
450 | { |
451 | json_out << double_quote << key << ": " << value << (with_comma ? ",\n" : "\n" ); |
452 | }; |
453 | |
454 | auto print_percentile = [&json_out](Stats & info, auto percent, bool with_comma = true) |
455 | { |
456 | json_out << "\"" << percent << "\"" << ": " << info.sampler.quantileInterpolated(percent / 100.0) << (with_comma ? ",\n" : "\n" ); |
457 | }; |
458 | |
459 | json_out << "{\n" ; |
460 | |
461 | for (size_t i = 0; i < infos.size(); ++i) |
462 | { |
463 | const auto & info = infos[i]; |
464 | |
465 | json_out << double_quote << connections[i]->getDescription() << ": {\n" ; |
466 | json_out << double_quote << "statistics" << ": {\n" ; |
467 | |
468 | print_key_value("QPS" , info->queries / info->work_time); |
469 | print_key_value("RPS" , info->read_rows / info->work_time); |
470 | print_key_value("MiBPS" , info->read_bytes / info->work_time); |
471 | print_key_value("RPS_result" , info->result_rows / info->work_time); |
472 | print_key_value("MiBPS_result" , info->result_bytes / info->work_time); |
473 | print_key_value("num_queries" , info->queries.load(), false); |
474 | |
475 | json_out << "},\n" ; |
476 | json_out << double_quote << "query_time_percentiles" << ": {\n" ; |
477 | |
478 | for (int percent = 0; percent <= 90; percent += 10) |
479 | print_percentile(*info, percent); |
480 | |
481 | print_percentile(*info, 95); |
482 | print_percentile(*info, 99); |
483 | print_percentile(*info, 99.9); |
484 | print_percentile(*info, 99.99, false); |
485 | |
486 | json_out << "}\n" ; |
487 | json_out << (i == infos.size() - 1 ? "}\n" : "},\n" ); |
488 | } |
489 | |
490 | json_out << "}\n" ; |
491 | } |
492 | |
493 | public: |
494 | |
495 | ~Benchmark() |
496 | { |
497 | shutdown = true; |
498 | } |
499 | }; |
500 | |
501 | } |
502 | |
503 | |
504 | #ifndef __clang__ |
505 | #pragma GCC optimize("-fno-var-tracking-assignments") |
506 | #endif |
507 | #pragma GCC diagnostic ignored "-Wmissing-declarations" |
508 | |
509 | int mainEntryClickHouseBenchmark(int argc, char ** argv) |
510 | { |
511 | using namespace DB; |
512 | bool print_stacktrace = true; |
513 | |
514 | try |
515 | { |
516 | using boost::program_options::value; |
517 | |
518 | boost::program_options::options_description desc = createOptionsDescription("Allowed options" , getTerminalWidth()); |
519 | desc.add_options() |
520 | ("help" , "produce help message" ) |
521 | ("concurrency,c" , value<unsigned>()->default_value(1), "number of parallel queries" ) |
522 | ("delay,d" , value<double>()->default_value(1), "delay between intermediate reports in seconds (set 0 to disable reports)" ) |
523 | ("stage" , value<std::string>()->default_value("complete" ), "request query processing up to specified stage: complete,fetch_columns,with_mergeable_state" ) |
524 | ("iterations,i" , value<size_t>()->default_value(0), "amount of queries to be executed" ) |
525 | ("timelimit,t" , value<double>()->default_value(0.), "stop launch of queries after specified time limit" ) |
526 | ("randomize,r" , value<bool>()->default_value(false), "randomize order of execution" ) |
527 | ("json" , value<std::string>()->default_value("" ), "write final report to specified file in JSON format" ) |
528 | ("host,h" , value<Strings>()->multitoken(), "" ) |
529 | ("port,p" , value<Ports>()->multitoken(), "" ) |
530 | ("cumulative" , "prints cumulative data instead of data per interval" ) |
531 | ("secure,s" , "Use TLS connection" ) |
532 | ("user" , value<std::string>()->default_value("default" ), "" ) |
533 | ("password" , value<std::string>()->default_value("" ), "" ) |
534 | ("database" , value<std::string>()->default_value("default" ), "" ) |
535 | ("stacktrace" , "print stack traces of exceptions" ) |
536 | ("confidence" , value<size_t>()->default_value(5), "set the level of confidence for T-test [0=80%, 1=90%, 2=95%, 3=98%, 4=99%, 5=99.5%(default)" ) |
537 | ; |
538 | |
539 | Settings settings; |
540 | settings.addProgramOptions(desc); |
541 | |
542 | boost::program_options::variables_map options; |
543 | boost::program_options::store(boost::program_options::parse_command_line(argc, argv, desc), options); |
544 | boost::program_options::notify(options); |
545 | |
546 | if (options.count("help" )) |
547 | { |
548 | std::cout << "Usage: " << argv[0] << " [options] < queries.txt\n" ; |
549 | std::cout << desc << "\n" ; |
550 | return 1; |
551 | } |
552 | |
553 | print_stacktrace = options.count("stacktrace" ); |
554 | |
555 | UseSSL use_ssl; |
556 | Ports ports = options.count("port" ) ? options["port" ].as<Ports>() : Ports({9000}); |
557 | Strings hosts = options.count("host" ) ? options["host" ].as<Strings>() : Strings({"localhost" }); |
558 | |
559 | Benchmark benchmark( |
560 | options["concurrency" ].as<unsigned>(), |
561 | options["delay" ].as<double>(), |
562 | std::move(hosts), |
563 | std::move(ports), |
564 | options.count("cumulative" ), |
565 | options.count("secure" ), |
566 | options["database" ].as<std::string>(), |
567 | options["user" ].as<std::string>(), |
568 | options["password" ].as<std::string>(), |
569 | options["stage" ].as<std::string>(), |
570 | options["randomize" ].as<bool>(), |
571 | options["iterations" ].as<size_t>(), |
572 | options["timelimit" ].as<double>(), |
573 | options["json" ].as<std::string>(), |
574 | options["confidence" ].as<size_t>(), |
575 | settings); |
576 | return benchmark.run(); |
577 | } |
578 | catch (...) |
579 | { |
580 | std::cerr << getCurrentExceptionMessage(print_stacktrace, true) << std::endl; |
581 | return getCurrentExceptionCode(); |
582 | } |
583 | } |
584 | |