1#include <port/unistd.h>
2#include <stdlib.h>
3#include <fcntl.h>
4#include <signal.h>
5#include <time.h>
6#include <iostream>
7#include <fstream>
8#include <iomanip>
9#include <random>
10#include <pcg_random.hpp>
11#include <Poco/File.h>
12#include <Poco/Util/Application.h>
13#include <Common/Stopwatch.h>
14#include <Common/ThreadPool.h>
15#include <AggregateFunctions/ReservoirSampler.h>
16#include <AggregateFunctions/registerAggregateFunctions.h>
17#include <boost/program_options.hpp>
18#include <Common/ConcurrentBoundedQueue.h>
19#include <Common/Exception.h>
20#include <Common/randomSeed.h>
21#include <Core/Types.h>
22#include <IO/ReadBufferFromFileDescriptor.h>
23#include <IO/WriteBufferFromFileDescriptor.h>
24#include <IO/WriteBufferFromFile.h>
25#include <IO/ReadHelpers.h>
26#include <IO/WriteHelpers.h>
27#include <IO/Operators.h>
28#include <IO/ConnectionTimeouts.h>
29#include <IO/UseSSL.h>
30#include <DataStreams/RemoteBlockInputStream.h>
31#include <Interpreters/Context.h>
32#include <Client/Connection.h>
33#include <Common/InterruptListener.h>
34#include <Common/Config/configReadClient.h>
35#include <Common/TerminalSize.h>
36#include <Common/StudentTTest.h>
37
38
/** A tool for evaluating ClickHouse performance.
  * The tool emulates a workload with a fixed number of simultaneously executing queries.
  */
42
43namespace DB
44{
45
/// List of server ports. Benchmark's constructor pairs ports[i] with hosts[i];
/// a missing port falls back to 9000 there.
using Ports = std::vector<UInt16>;
47
/// Error codes referenced in this file.
/// CANNOT_BLOCK_SIGNAL is used by Benchmark::thread; it is declared here explicitly
/// so that this file does not rely on another header happening to declare it.
namespace ErrorCodes
{
    extern const int BAD_ARGUMENTS;
    extern const int CANNOT_BLOCK_SIGNAL;
    extern const int EMPTY_DATA_PASSED;
}
53
54class Benchmark : public Poco::Util::Application
55{
56public:
57 Benchmark(unsigned concurrency_, double delay_, Strings && hosts_, Ports && ports_,
58 bool cumulative_, bool secure_, const String & default_database_,
59 const String & user_, const String & password_, const String & stage,
60 bool randomize_, size_t max_iterations_, double max_time_,
61 const String & json_path_, size_t confidence_, const Settings & settings_)
62 :
63 concurrency(concurrency_), delay(delay_), queue(concurrency), randomize(randomize_),
64 cumulative(cumulative_), max_iterations(max_iterations_), max_time(max_time_),
65 confidence(confidence_), json_path(json_path_), settings(settings_),
66 global_context(Context::createGlobal()), pool(concurrency)
67 {
68 const auto secure = secure_ ? Protocol::Secure::Enable : Protocol::Secure::Disable;
69 size_t connections_cnt = std::max(ports_.size(), hosts_.size());
70
71 connections.reserve(connections_cnt);
72 comparison_info_total.reserve(connections_cnt);
73 comparison_info_per_interval.reserve(connections_cnt);
74
75 for (size_t i = 0; i < connections_cnt; ++i)
76 {
77 UInt16 cur_port = i >= ports_.size() ? 9000 : ports_[i];
78 std::string cur_host = i >= hosts_.size() ? "localhost" : hosts_[i];
79
80 connections.emplace_back(std::make_unique<ConnectionPool>(concurrency, cur_host, cur_port, default_database_, user_, password_, "benchmark", Protocol::Compression::Enable, secure));
81 comparison_info_per_interval.emplace_back(std::make_shared<Stats>());
82 comparison_info_total.emplace_back(std::make_shared<Stats>());
83 }
84
85 global_context.makeGlobalContext();
86
87 std::cerr << std::fixed << std::setprecision(3);
88
89 /// This is needed to receive blocks with columns of AggregateFunction data type
90 /// (example: when using stage = 'with_mergeable_state')
91 registerAggregateFunctions();
92
93 if (stage == "complete")
94 query_processing_stage = QueryProcessingStage::Complete;
95 else if (stage == "fetch_columns")
96 query_processing_stage = QueryProcessingStage::FetchColumns;
97 else if (stage == "with_mergeable_state")
98 query_processing_stage = QueryProcessingStage::WithMergeableState;
99 else
100 throw Exception("Unknown query processing stage: " + stage, ErrorCodes::BAD_ARGUMENTS);
101
102 }
103
104 void initialize(Poco::Util::Application & self [[maybe_unused]])
105 {
106 std::string home_path;
107 const char * home_path_cstr = getenv("HOME");
108 if (home_path_cstr)
109 home_path = home_path_cstr;
110
111 configReadClient(config(), home_path);
112 }
113
114 int main(const std::vector<std::string> &)
115 {
116 if (!json_path.empty() && Poco::File(json_path).exists()) /// Clear file with previous results
117 Poco::File(json_path).remove();
118
119 readQueries();
120 runBenchmark();
121 return 0;
122 }
123
124private:
125 using Entry = ConnectionPool::Entry;
126 using EntryPtr = std::shared_ptr<Entry>;
127 using EntryPtrs = std::vector<EntryPtr>;
128
129 unsigned concurrency;
130 double delay;
131
132 using Query = std::string;
133 using Queries = std::vector<Query>;
134 Queries queries;
135
136 using Queue = ConcurrentBoundedQueue<Query>;
137 Queue queue;
138
139 using ConnectionPoolUniq = std::unique_ptr<ConnectionPool>;
140 using ConnectionPoolUniqs = std::vector<ConnectionPoolUniq>;
141 ConnectionPoolUniqs connections;
142
143 bool randomize;
144 bool cumulative;
145 size_t max_iterations;
146 double max_time;
147 size_t confidence;
148 String json_path;
149 Settings settings;
150 Context global_context;
151 QueryProcessingStage::Enum query_processing_stage;
152
153 /// Don't execute new queries after timelimit or SIGINT or exception
154 std::atomic<bool> shutdown{false};
155
156 std::atomic<size_t> queries_executed{0};
157
158 struct Stats
159 {
160 std::atomic<size_t> queries{0};
161 size_t read_rows = 0;
162 size_t read_bytes = 0;
163 size_t result_rows = 0;
164 size_t result_bytes = 0;
165 double work_time = 0;
166
167 using Sampler = ReservoirSampler<double>;
168 Sampler sampler {1 << 16};
169
170 void add(double seconds, size_t read_rows_inc, size_t read_bytes_inc, size_t result_rows_inc, size_t result_bytes_inc)
171 {
172 ++queries;
173 work_time += seconds;
174 read_rows += read_rows_inc;
175 read_bytes += read_bytes_inc;
176 result_rows += result_rows_inc;
177 result_bytes += result_bytes_inc;
178 sampler.insert(seconds);
179 }
180
181 void clear()
182 {
183 queries = 0;
184 work_time = 0;
185 read_rows = 0;
186 read_bytes = 0;
187 result_rows = 0;
188 result_bytes = 0;
189 sampler.clear();
190 }
191 };
192
193 using MultiStats = std::vector<std::shared_ptr<Stats>>;
194 MultiStats comparison_info_per_interval;
195 MultiStats comparison_info_total;
196 StudentTTest t_test;
197
198 Stopwatch total_watch;
199 Stopwatch delay_watch;
200
201 std::mutex mutex;
202
203 ThreadPool pool;
204
205 void readQueries()
206 {
207 ReadBufferFromFileDescriptor in(STDIN_FILENO);
208
209 while (!in.eof())
210 {
211 std::string query;
212 readText(query, in);
213 assertChar('\n', in);
214
215 if (!query.empty())
216 queries.emplace_back(query);
217 }
218
219 if (queries.empty())
220 throw Exception("Empty list of queries.", ErrorCodes::EMPTY_DATA_PASSED);
221
222 std::cerr << "Loaded " << queries.size() << " queries.\n";
223 }
224
225
226 void printNumberOfQueriesExecuted(size_t num)
227 {
228 std::cerr << "\nQueries executed: " << num;
229 if (queries.size() > 1)
230 std::cerr << " (" << (num * 100.0 / queries.size()) << "%)";
231 std::cerr << ".\n";
232 }
233
234 /// Try push new query and check cancellation conditions
235 bool tryPushQueryInteractively(const String & query, InterruptListener & interrupt_listener)
236 {
237 bool inserted = false;
238
239 while (!inserted)
240 {
241 inserted = queue.tryPush(query, 100);
242
243 if (shutdown)
244 {
245 /// An exception occurred in a worker
246 return false;
247 }
248
249 if (max_time > 0 && total_watch.elapsedSeconds() >= max_time)
250 {
251 std::cout << "Stopping launch of queries. Requested time limit is exhausted.\n";
252 return false;
253 }
254
255 if (interrupt_listener.check())
256 {
257 std::cout << "Stopping launch of queries. SIGINT recieved.\n";
258 return false;
259 }
260
261 if (delay > 0 && delay_watch.elapsedSeconds() > delay)
262 {
263 printNumberOfQueriesExecuted(queries_executed);
264 cumulative ? report(comparison_info_total) : report(comparison_info_per_interval);
265 delay_watch.restart();
266 }
267 }
268
269 return true;
270 }
271
272 void runBenchmark()
273 {
274 pcg64 generator(randomSeed());
275 std::uniform_int_distribution<size_t> distribution(0, queries.size() - 1);
276
277 try
278 {
279 for (size_t i = 0; i < concurrency; ++i)
280 {
281 EntryPtrs connection_entries;
282 connection_entries.reserve(connections.size());
283
284 for (const auto & connection : connections)
285 connection_entries.emplace_back(std::make_shared<Entry>(
286 connection->get(ConnectionTimeouts::getTCPTimeoutsWithoutFailover(settings))));
287
288 pool.scheduleOrThrowOnError(std::bind(&Benchmark::thread, this, connection_entries));
289 }
290 }
291 catch (...)
292 {
293 pool.wait();
294 throw;
295 }
296
297 InterruptListener interrupt_listener;
298 delay_watch.restart();
299
300 /// Push queries into queue
301 for (size_t i = 0; !max_iterations || i < max_iterations; ++i)
302 {
303 size_t query_index = randomize ? distribution(generator) : i % queries.size();
304
305 if (!tryPushQueryInteractively(queries[query_index], interrupt_listener))
306 {
307 shutdown = true;
308 break;
309 }
310 }
311
312 pool.wait();
313 total_watch.stop();
314
315 if (!json_path.empty())
316 reportJSON(comparison_info_total, json_path);
317
318 printNumberOfQueriesExecuted(queries_executed);
319 report(comparison_info_total);
320 }
321
322
323 void thread(EntryPtrs & connection_entries)
324 {
325 Query query;
326
327 /// Randomly choosing connection index
328 pcg64 generator(randomSeed());
329 std::uniform_int_distribution<size_t> distribution(0, connection_entries.size() - 1);
330
331 try
332 {
333 /// In these threads we do not accept INT signal.
334 sigset_t sig_set;
335 if (sigemptyset(&sig_set)
336 || sigaddset(&sig_set, SIGINT)
337 || pthread_sigmask(SIG_BLOCK, &sig_set, nullptr))
338 throwFromErrno("Cannot block signal.", ErrorCodes::CANNOT_BLOCK_SIGNAL);
339
340 while (true)
341 {
342 bool extracted = false;
343
344 while (!extracted)
345 {
346 extracted = queue.tryPop(query, 100);
347
348 if (shutdown || (max_iterations && queries_executed == max_iterations))
349 return;
350 }
351 execute(connection_entries, query, distribution(generator));
352 ++queries_executed;
353 }
354 }
355 catch (...)
356 {
357 shutdown = true;
358 std::cerr << "An error occurred while processing query:\n" << query << "\n";
359 throw;
360 }
361 }
362
363 void execute(EntryPtrs & connection_entries, Query & query, size_t connection_index)
364 {
365 Stopwatch watch;
366 RemoteBlockInputStream stream(
367 *(*connection_entries[connection_index]),
368 query, {}, global_context, &settings, nullptr, Scalars(), Tables(), query_processing_stage);
369
370 Progress progress;
371 stream.setProgressCallback([&progress](const Progress & value) { progress.incrementPiecewiseAtomically(value); });
372
373 stream.readPrefix();
374 while (Block block = stream.read());
375
376 stream.readSuffix();
377
378 const BlockStreamProfileInfo & info = stream.getProfileInfo();
379
380 double seconds = watch.elapsedSeconds();
381
382 std::lock_guard lock(mutex);
383
384 comparison_info_per_interval[connection_index]->add(seconds, progress.read_rows, progress.read_bytes, info.rows, info.bytes);
385 comparison_info_total[connection_index]->add(seconds, progress.read_rows, progress.read_bytes, info.rows, info.bytes);
386 t_test.add(connection_index, seconds);
387 }
388
389 void report(MultiStats & infos)
390 {
391 std::lock_guard lock(mutex);
392
393 std::cerr << "\n";
394 for (size_t i = 0; i < infos.size(); ++i)
395 {
396 const auto & info = infos[i];
397
398 /// Avoid zeros, nans or exceptions
399 if (0 == info->queries)
400 return;
401
402 double seconds = info->work_time / concurrency;
403
404 std::cerr
405 << connections[i]->getDescription() << ", "
406 << "queries " << info->queries << ", "
407 << "QPS: " << (info->queries / seconds) << ", "
408 << "RPS: " << (info->read_rows / seconds) << ", "
409 << "MiB/s: " << (info->read_bytes / seconds / 1048576) << ", "
410 << "result RPS: " << (info->result_rows / seconds) << ", "
411 << "result MiB/s: " << (info->result_bytes / seconds / 1048576) << "."
412 << "\n";
413 }
414 std::cerr << "\n";
415
416 auto print_percentile = [&](double percent)
417 {
418 std::cerr << percent << "%\t\t";
419 for (const auto & info : infos)
420 {
421 std::cerr << info->sampler.quantileInterpolated(percent / 100.0) << " sec." << "\t";
422 }
423 std::cerr << "\n";
424 };
425
426 for (int percent = 0; percent <= 90; percent += 10)
427 print_percentile(percent);
428
429 print_percentile(95);
430 print_percentile(99);
431 print_percentile(99.9);
432 print_percentile(99.99);
433
434 std::cerr << "\n" << t_test.compareAndReport(confidence).second << "\n";
435
436 if (!cumulative)
437 {
438 for (auto & info : infos)
439 info->clear();
440 }
441 }
442
443 void reportJSON(MultiStats & infos, const std::string & filename)
444 {
445 WriteBufferFromFile json_out(filename);
446
447 std::lock_guard lock(mutex);
448
449 auto print_key_value = [&](auto key, auto value, bool with_comma = true)
450 {
451 json_out << double_quote << key << ": " << value << (with_comma ? ",\n" : "\n");
452 };
453
454 auto print_percentile = [&json_out](Stats & info, auto percent, bool with_comma = true)
455 {
456 json_out << "\"" << percent << "\"" << ": " << info.sampler.quantileInterpolated(percent / 100.0) << (with_comma ? ",\n" : "\n");
457 };
458
459 json_out << "{\n";
460
461 for (size_t i = 0; i < infos.size(); ++i)
462 {
463 const auto & info = infos[i];
464
465 json_out << double_quote << connections[i]->getDescription() << ": {\n";
466 json_out << double_quote << "statistics" << ": {\n";
467
468 print_key_value("QPS", info->queries / info->work_time);
469 print_key_value("RPS", info->read_rows / info->work_time);
470 print_key_value("MiBPS", info->read_bytes / info->work_time);
471 print_key_value("RPS_result", info->result_rows / info->work_time);
472 print_key_value("MiBPS_result", info->result_bytes / info->work_time);
473 print_key_value("num_queries", info->queries.load(), false);
474
475 json_out << "},\n";
476 json_out << double_quote << "query_time_percentiles" << ": {\n";
477
478 for (int percent = 0; percent <= 90; percent += 10)
479 print_percentile(*info, percent);
480
481 print_percentile(*info, 95);
482 print_percentile(*info, 99);
483 print_percentile(*info, 99.9);
484 print_percentile(*info, 99.99, false);
485
486 json_out << "}\n";
487 json_out << (i == infos.size() - 1 ? "}\n" : "},\n");
488 }
489
490 json_out << "}\n";
491 }
492
493public:
494
495 ~Benchmark()
496 {
497 shutdown = true;
498 }
499};
500
501}
502
503
/// For compilers other than clang, apply the "-fno-var-tracking-assignments" optimize option
/// to the code that follows.
/// NOTE(review): presumably a workaround for GCC variable tracking on the large
/// option-building function below - confirm.
#ifndef __clang__
#pragma GCC optimize("-fno-var-tracking-assignments")
#endif
/// Silence -Wmissing-declarations: the entry point below is defined without a prototype
/// visible in this file.
#pragma GCC diagnostic ignored "-Wmissing-declarations"
508
509int mainEntryClickHouseBenchmark(int argc, char ** argv)
510{
511 using namespace DB;
512 bool print_stacktrace = true;
513
514 try
515 {
516 using boost::program_options::value;
517
518 boost::program_options::options_description desc = createOptionsDescription("Allowed options", getTerminalWidth());
519 desc.add_options()
520 ("help", "produce help message")
521 ("concurrency,c", value<unsigned>()->default_value(1), "number of parallel queries")
522 ("delay,d", value<double>()->default_value(1), "delay between intermediate reports in seconds (set 0 to disable reports)")
523 ("stage", value<std::string>()->default_value("complete"), "request query processing up to specified stage: complete,fetch_columns,with_mergeable_state")
524 ("iterations,i", value<size_t>()->default_value(0), "amount of queries to be executed")
525 ("timelimit,t", value<double>()->default_value(0.), "stop launch of queries after specified time limit")
526 ("randomize,r", value<bool>()->default_value(false), "randomize order of execution")
527 ("json", value<std::string>()->default_value(""), "write final report to specified file in JSON format")
528 ("host,h", value<Strings>()->multitoken(), "")
529 ("port,p", value<Ports>()->multitoken(), "")
530 ("cumulative", "prints cumulative data instead of data per interval")
531 ("secure,s", "Use TLS connection")
532 ("user", value<std::string>()->default_value("default"), "")
533 ("password", value<std::string>()->default_value(""), "")
534 ("database", value<std::string>()->default_value("default"), "")
535 ("stacktrace", "print stack traces of exceptions")
536 ("confidence", value<size_t>()->default_value(5), "set the level of confidence for T-test [0=80%, 1=90%, 2=95%, 3=98%, 4=99%, 5=99.5%(default)")
537 ;
538
539 Settings settings;
540 settings.addProgramOptions(desc);
541
542 boost::program_options::variables_map options;
543 boost::program_options::store(boost::program_options::parse_command_line(argc, argv, desc), options);
544 boost::program_options::notify(options);
545
546 if (options.count("help"))
547 {
548 std::cout << "Usage: " << argv[0] << " [options] < queries.txt\n";
549 std::cout << desc << "\n";
550 return 1;
551 }
552
553 print_stacktrace = options.count("stacktrace");
554
555 UseSSL use_ssl;
556 Ports ports = options.count("port") ? options["port"].as<Ports>() : Ports({9000});
557 Strings hosts = options.count("host") ? options["host"].as<Strings>() : Strings({"localhost"});
558
559 Benchmark benchmark(
560 options["concurrency"].as<unsigned>(),
561 options["delay"].as<double>(),
562 std::move(hosts),
563 std::move(ports),
564 options.count("cumulative"),
565 options.count("secure"),
566 options["database"].as<std::string>(),
567 options["user"].as<std::string>(),
568 options["password"].as<std::string>(),
569 options["stage"].as<std::string>(),
570 options["randomize"].as<bool>(),
571 options["iterations"].as<size_t>(),
572 options["timelimit"].as<double>(),
573 options["json"].as<std::string>(),
574 options["confidence"].as<size_t>(),
575 settings);
576 return benchmark.run();
577 }
578 catch (...)
579 {
580 std::cerr << getCurrentExceptionMessage(print_stacktrace, true) << std::endl;
581 return getCurrentExceptionCode();
582 }
583}
584