1#pragma once
2
3#include <Poco/Event.h>
4
5#include <DataStreams/IBlockInputStream.h>
6#include <Common/CurrentMetrics.h>
7#include <Common/ThreadPool.h>
8
9
/// Declaration of a metric constant whose definition lives in another translation unit.
/// It is not referenced in the visible part of this header; presumably the out-of-line
/// implementation of AsynchronousBlockInputStream uses it — TODO confirm.
namespace CurrentMetrics
{
    extern const Metric QueryThread;
}
14
15namespace DB
16{
17
18/** Executes another BlockInputStream in a separate thread.
19 * This serves two purposes:
20 * 1. Allows you to make the different stages of the query execution pipeline work in parallel.
21 * 2. Allows you not to wait until the data is ready, and periodically check their readiness without blocking.
22 * This is necessary, for example, so that during the waiting period you can check if a packet
23 * has come over the network with a request to interrupt the execution of the query.
24 * It also allows you to execute multiple queries at the same time.
25 */
26class AsynchronousBlockInputStream : public IBlockInputStream
27{
28public:
29 AsynchronousBlockInputStream(const BlockInputStreamPtr & in)
30 {
31 children.push_back(in);
32 }
33
34 String getName() const override { return "Asynchronous"; }
35
36 void waitInnerThread()
37 {
38 if (started)
39 pool.wait();
40 }
41
42 void readPrefix() override
43 {
44 /// Do not call `readPrefix` on the child, so that the corresponding actions are performed in a separate thread.
45 if (!started)
46 {
47 next();
48 started = true;
49 }
50 }
51
52 void readSuffix() override
53 {
54 if (started)
55 {
56 pool.wait();
57 if (exception)
58 std::rethrow_exception(exception);
59 children.back()->readSuffix();
60 started = false;
61 }
62 }
63
64
65 /** Wait for the data to be ready no more than the specified timeout. Start receiving data if necessary.
66 * If the function returned true - the data is ready and you can do `read()`; You can not call the function just at the same moment again.
67 */
68 bool poll(UInt64 milliseconds)
69 {
70 if (!started)
71 {
72 next();
73 started = true;
74 }
75
76 return ready.tryWait(milliseconds);
77 }
78
79
80 Block getHeader() const override { return children.at(0)->getHeader(); }
81
82 void cancel(bool kill) override
83 {
84 IBlockInputStream::cancel(kill);
85
86 /// Wait for some backgroud calculations to be sure,
87 /// that after end of stream nothing is being executing.
88 if (started)
89 pool.wait();
90 }
91
92 ~AsynchronousBlockInputStream() override
93 {
94 if (started)
95 pool.wait();
96 }
97
98protected:
99 ThreadPool pool{1};
100 Poco::Event ready;
101 bool started = false;
102 bool first = true;
103
104 Block block;
105 std::exception_ptr exception;
106
107 Block readImpl() override;
108
109 void next();
110
111 /// Calculations that can be performed in a separate thread
112 void calculate();
113};
114
115}
116
117