1#include "AsynchronousBlockInputStream.h"
2#include <Common/setThreadName.h>
3#include <Common/CurrentThread.h>
4
5
6namespace DB
7{
8
9Block AsynchronousBlockInputStream::readImpl()
10{
11 /// If there were no calculations yet, calculate the first block synchronously
12 if (!started)
13 {
14 calculate();
15 started = true;
16 }
17 else /// If the calculations are already in progress - wait for the result
18 pool.wait();
19
20 if (exception)
21 std::rethrow_exception(exception);
22
23 Block res = block;
24 if (!res)
25 return res;
26
27 /// Start the next block calculation
28 block.clear();
29 next();
30
31 return res;
32}
33
34
35void AsynchronousBlockInputStream::next()
36{
37 ready.reset();
38
39 pool.scheduleOrThrowOnError([this, thread_group = CurrentThread::getGroup()]()
40 {
41 CurrentMetrics::Increment metric_increment{CurrentMetrics::QueryThread};
42
43 try
44 {
45 if (first)
46 setThreadName("AsyncBlockInput");
47
48 /// AsynchronousBlockInputStream is used in Client which does not create queries and thread groups
49 if (thread_group)
50 CurrentThread::attachToIfDetached(thread_group);
51 }
52 catch (...)
53 {
54 exception = std::current_exception();
55 ready.set();
56 return;
57 }
58
59 calculate();
60 });
61}
62
63
64void AsynchronousBlockInputStream::calculate()
65{
66 try
67 {
68 if (first)
69 {
70 first = false;
71 children.back()->readPrefix();
72 }
73
74 block = children.back()->read();
75 }
76 catch (...)
77 {
78 exception = std::current_exception();
79 }
80
81 ready.set();
82}
83
84}
85
86