1 | #include <Processors/Transforms/CreatingSetsTransform.h> |
2 | |
3 | #include <DataStreams/BlockStreamProfileInfo.h> |
4 | #include <DataStreams/IBlockInputStream.h> |
5 | #include <DataStreams/IBlockOutputStream.h> |
6 | |
7 | #include <Interpreters/Set.h> |
8 | #include <Interpreters/Join.h> |
9 | #include <Storages/IStorage.h> |
10 | |
11 | #include <iomanip> |
12 | #include <DataStreams/materializeBlock.h> |
13 | |
14 | namespace DB |
15 | { |
16 | |
/// Error codes referenced by this translation unit (defined elsewhere).
/// LOGICAL_ERROR is thrown by startSubquery(), so it must be declared here too.
namespace ErrorCodes
{
    extern const int SET_SIZE_LIMIT_EXCEEDED;
    extern const int LOGICAL_ERROR;
}
21 | |
22 | |
23 | CreatingSetsTransform::CreatingSetsTransform( |
24 | Block , |
25 | const SubqueriesForSets & subqueries_for_sets_, |
26 | const SizeLimits & network_transfer_limits_, |
27 | const Context & context_) |
28 | : IProcessor({}, {std::move(out_header_)}) |
29 | , subqueries_for_sets(subqueries_for_sets_) |
30 | , cur_subquery(subqueries_for_sets.begin()) |
31 | , network_transfer_limits(network_transfer_limits_) |
32 | , context(context_) |
33 | { |
34 | } |
35 | |
36 | IProcessor::Status CreatingSetsTransform::prepare() |
37 | { |
38 | auto & output = outputs.front(); |
39 | |
40 | if (finished) |
41 | { |
42 | output.finish(); |
43 | return Status::Finished; |
44 | } |
45 | |
46 | /// Check can output. |
47 | if (output.isFinished()) |
48 | return Status::Finished; |
49 | |
50 | if (!output.canPush()) |
51 | return Status::PortFull; |
52 | |
53 | return Status::Ready; |
54 | } |
55 | |
56 | void CreatingSetsTransform::startSubquery(SubqueryForSet & subquery) |
57 | { |
58 | LOG_TRACE(log, (subquery.set ? "Creating set. " : "" ) |
59 | << (subquery.join ? "Creating join. " : "" ) |
60 | << (subquery.table ? "Filling temporary table. " : "" )); |
61 | |
62 | elapsed_nanoseconds = 0; |
63 | |
64 | if (subquery.table) |
65 | table_out = subquery.table->write({}, context); |
66 | |
67 | done_with_set = !subquery.set; |
68 | done_with_join = !subquery.join; |
69 | done_with_table = !subquery.table; |
70 | |
71 | if (done_with_set && done_with_join && done_with_table) |
72 | throw Exception("Logical error: nothing to do with subquery" , ErrorCodes::LOGICAL_ERROR); |
73 | |
74 | if (table_out) |
75 | table_out->writePrefix(); |
76 | } |
77 | |
78 | void CreatingSetsTransform::finishSubquery(SubqueryForSet & subquery) |
79 | { |
80 | size_t head_rows = 0; |
81 | const BlockStreamProfileInfo & profile_info = subquery.source->getProfileInfo(); |
82 | |
83 | head_rows = profile_info.rows; |
84 | |
85 | subquery.setTotals(); |
86 | |
87 | if (head_rows != 0) |
88 | { |
89 | std::stringstream msg; |
90 | msg << std::fixed << std::setprecision(3); |
91 | msg << "Created. " ; |
92 | |
93 | if (subquery.set) |
94 | msg << "Set with " << subquery.set->getTotalRowCount() << " entries from " << head_rows << " rows. " ; |
95 | if (subquery.join) |
96 | msg << "Join with " << subquery.join->getTotalRowCount() << " entries from " << head_rows << " rows. " ; |
97 | if (subquery.table) |
98 | msg << "Table with " << head_rows << " rows. " ; |
99 | |
100 | msg << "In " << (static_cast<double>(elapsed_nanoseconds) / 1000000000ULL) << " sec." ; |
101 | LOG_DEBUG(log, msg.rdbuf()); |
102 | } |
103 | else |
104 | { |
105 | LOG_DEBUG(log, "Subquery has empty result." ); |
106 | } |
107 | } |
108 | |
109 | void CreatingSetsTransform::init() |
110 | { |
111 | is_initialized = true; |
112 | |
113 | for (auto & elem : subqueries_for_sets) |
114 | if (elem.second.source && elem.second.set) |
115 | elem.second.set->setHeader(elem.second.source->getHeader()); |
116 | } |
117 | |
/// Performs one step of building sets, joins and temporary tables.
///
/// Each call reads at most one block from the current subquery's source. The block is
/// fed into whichever of the set, the join and the temporary table have not yet reported
/// that they are done. Subqueries without a source are skipped.
///
/// The current subquery is finalized and the iterator advances to the next subquery with
/// a source in two cases: the source returns an empty block, or all targets are done
/// (the source is then cancelled). `finished` is set once no subqueries remain, and
/// prepare() then reports Finished.
void CreatingSetsTransform::work()
{
    if (!is_initialized)
        init();

    /// Time spent in this call; added to elapsed_nanoseconds, which finishSubquery() logs.
    Stopwatch watch;

    /// Skip subqueries that have no source to read from.
    while (cur_subquery != subqueries_for_sets.end() && cur_subquery->second.source == nullptr)
        ++cur_subquery;

    if (cur_subquery == subqueries_for_sets.end())
    {
        finished = true;
        return;
    }

    SubqueryForSet & subquery = cur_subquery->second;

    /// First step for this subquery: open streams and reset per-subquery state.
    if (!started_cur_subquery)
    {
        startSubquery(subquery);
        started_cur_subquery = true;
    }

    /// Finalizes the current subquery (set insertion, table suffix, timing, summary log)
    /// and moves cur_subquery to the next subquery that has a source.
    /// NOTE(review): table_out is a member, not per-subquery; it must refer to this
    /// subquery's stream (or be null) here, otherwise writeSuffix() hits a stale stream.
    auto finishCurrentSubquery = [&]()
    {
        if (subquery.set)
            subquery.set->finishInsert();

        if (table_out)
            table_out->writeSuffix();

        watch.stop();
        elapsed_nanoseconds += watch.elapsedNanoseconds();

        finishSubquery(subquery);

        ++cur_subquery;
        started_cur_subquery = false;

        while (cur_subquery != subqueries_for_sets.end() && cur_subquery->second.source == nullptr)
            ++cur_subquery;

        if (cur_subquery == subqueries_for_sets.end())
            finished = true;
    };

    /// An empty block means the source is exhausted.
    auto block = subquery.source->read();
    if (!block)
    {
        finishCurrentSubquery();
        return;
    }

    /// A false return means the set accepts no more data
    /// (presumably a size limit reached in a non-throwing overflow mode — confirm).
    if (!done_with_set)
    {
        if (!subquery.set->insertFromBlock(block))
            done_with_set = true;
    }

    /// Same contract for the join: false means stop feeding it.
    if (!done_with_join)
    {
        if (!subquery.insertJoinedBlock(block))
            done_with_join = true;
    }

    if (!done_with_table)
    {
        /// Presumably turns constant columns into full columns before storing — confirm.
        block = materializeBlock(block);
        table_out->write(block);

        /// These counters are not reset per subquery in this file, so the limit is
        /// checked against the totals across all tables filled by this transform.
        rows_to_transfer += block.rows();
        bytes_to_transfer += block.bytes();

        /// A false return means the limit was exceeded and filling should stop
        /// (presumably a non-throwing overflow mode; otherwise check() would throw — confirm).
        if (!network_transfer_limits.check(rows_to_transfer, bytes_to_transfer, "IN/JOIN external table",
            ErrorCodes::SET_SIZE_LIMIT_EXCEEDED))
            done_with_table = true;
    }

    /// No target wants more data: stop reading the source early and finalize.
    if (done_with_set && done_with_join && done_with_table)
    {
        subquery.source->cancel(false);
        finishCurrentSubquery();
    }
    else
        elapsed_nanoseconds += watch.elapsedNanoseconds();
}
205 | |
206 | void CreatingSetsTransform::setProgressCallback(const ProgressCallback & callback) |
207 | { |
208 | for (auto & elem : subqueries_for_sets) |
209 | if (elem.second.source) |
210 | elem.second.source->setProgressCallback(callback); |
211 | } |
212 | |
213 | void CreatingSetsTransform::setProcessListElement(QueryStatus * status) |
214 | { |
215 | for (auto & elem : subqueries_for_sets) |
216 | if (elem.second.source) |
217 | elem.second.source->setProcessListElement(status); |
218 | } |
219 | |
220 | } |
221 | |