1#include "executeQuery.h"
2#include <IO/Progress.h>
3#include <DataStreams/RemoteBlockInputStream.h>
4#include <Core/Block.h>
5#include <Poco/UUIDGenerator.h>
6
7namespace DB
8{
9
10namespace
11{
12
13void checkFulfilledConditionsAndUpdate(
14 const Progress & progress, RemoteBlockInputStream & stream,
15 TestStats & statistics, TestStopConditions & stop_conditions,
16 InterruptListener & interrupt_listener)
17{
18 statistics.add(progress.read_rows, progress.read_bytes);
19
20 stop_conditions.reportRowsRead(statistics.total_rows_read);
21 stop_conditions.reportBytesReadUncompressed(statistics.total_bytes_read);
22 stop_conditions.reportTotalTime(statistics.watch.elapsed() / (1000 * 1000));
23 stop_conditions.reportMinTimeNotChangingFor(statistics.min_time_watch.elapsed() / (1000 * 1000));
24 stop_conditions.reportMaxSpeedNotChangingFor(statistics.max_rows_speed_watch.elapsed() / (1000 * 1000));
25 stop_conditions.reportAverageSpeedNotChangingFor(statistics.avg_rows_speed_watch.elapsed() / (1000 * 1000));
26
27 if (stop_conditions.areFulfilled())
28 {
29 statistics.last_query_was_cancelled = true;
30 stream.cancel(false);
31 }
32
33 if (interrupt_listener.check())
34 {
35 statistics.got_SIGINT = true;
36 statistics.last_query_was_cancelled = true;
37 stream.cancel(false);
38 }
39}
40
41} // anonymous namespace
42
43void executeQuery(
44 Connection & connection,
45 const std::string & query,
46 TestStats & statistics,
47 TestStopConditions & stop_conditions,
48 InterruptListener & interrupt_listener,
49 Context & context,
50 const Settings & settings)
51{
52 static const std::string query_id_prefix
53 = Poco::UUIDGenerator::defaultGenerator().create().toString() + "-";
54 static int next_query_id = 1;
55
56 statistics.watch_per_query.restart();
57 statistics.last_query_was_cancelled = false;
58 statistics.last_query_rows_read = 0;
59 statistics.last_query_bytes_read = 0;
60 statistics.query_id = query_id_prefix + std::to_string(next_query_id++);
61
62 RemoteBlockInputStream stream(connection, query, {}, context, &settings);
63 stream.setQueryId(statistics.query_id);
64
65 stream.setProgressCallback(
66 [&](const Progress & value)
67 {
68 checkFulfilledConditionsAndUpdate(
69 value, stream, statistics,
70 stop_conditions, interrupt_listener);
71 });
72 stream.readPrefix();
73 while (Block block = stream.read());
74 stream.readSuffix();
75
76 if (!statistics.last_query_was_cancelled)
77 statistics.updateQueryInfo();
78
79 statistics.setTotalTime();
80}
81
82}
83