1#include <Processors/Transforms/CreatingSetsTransform.h>
2
3#include <DataStreams/BlockStreamProfileInfo.h>
4#include <DataStreams/IBlockInputStream.h>
5#include <DataStreams/IBlockOutputStream.h>
6
7#include <Interpreters/Set.h>
8#include <Interpreters/Join.h>
9#include <Storages/IStorage.h>
10
11#include <iomanip>
12#include <DataStreams/materializeBlock.h>
13
14namespace DB
15{
16
/// Error codes used by this file. They are defined elsewhere in the project and only
/// declared here. Every code this file throws with must be listed, so that compilation
/// does not depend on some transitively included header happening to declare it.
namespace ErrorCodes
{
    extern const int SET_SIZE_LIMIT_EXCEEDED;
    extern const int LOGICAL_ERROR;    /// thrown by startSubquery() when a subquery has nothing to build
}
21
22
/// Creates a processor with no input ports and a single output port whose header is `out_header_`.
///
/// @param out_header_              header of the (only) output port.
/// @param subqueries_for_sets_     subqueries whose results are materialized into sets, joins
///                                 and/or temporary tables; the transform keeps its own copy
///                                 and iterates it starting from the first entry.
/// @param network_transfer_limits_ limits checked against the rows/bytes written into temporary tables.
/// @param context_                 query context, passed to IStorage::write() when filling tables.
CreatingSetsTransform::CreatingSetsTransform(
    Block out_header_,
    const SubqueriesForSets & subqueries_for_sets_,
    const SizeLimits & network_transfer_limits_,
    const Context & context_)
    : IProcessor({}, {std::move(out_header_)})
    , subqueries_for_sets(subqueries_for_sets_)
    /// NOTE: initialized from the subqueries_for_sets member, so this relies on subqueries_for_sets
    /// being declared before cur_subquery in the class definition. Keep that order.
    , cur_subquery(subqueries_for_sets.begin())
    , network_transfer_limits(network_transfer_limits_)
    , context(context_)
{
}
35
36IProcessor::Status CreatingSetsTransform::prepare()
37{
38 auto & output = outputs.front();
39
40 if (finished)
41 {
42 output.finish();
43 return Status::Finished;
44 }
45
46 /// Check can output.
47 if (output.isFinished())
48 return Status::Finished;
49
50 if (!output.canPush())
51 return Status::PortFull;
52
53 return Status::Ready;
54}
55
56void CreatingSetsTransform::startSubquery(SubqueryForSet & subquery)
57{
58 LOG_TRACE(log, (subquery.set ? "Creating set. " : "")
59 << (subquery.join ? "Creating join. " : "")
60 << (subquery.table ? "Filling temporary table. " : ""));
61
62 elapsed_nanoseconds = 0;
63
64 if (subquery.table)
65 table_out = subquery.table->write({}, context);
66
67 done_with_set = !subquery.set;
68 done_with_join = !subquery.join;
69 done_with_table = !subquery.table;
70
71 if (done_with_set && done_with_join && done_with_table)
72 throw Exception("Logical error: nothing to do with subquery", ErrorCodes::LOGICAL_ERROR);
73
74 if (table_out)
75 table_out->writePrefix();
76}
77
78void CreatingSetsTransform::finishSubquery(SubqueryForSet & subquery)
79{
80 size_t head_rows = 0;
81 const BlockStreamProfileInfo & profile_info = subquery.source->getProfileInfo();
82
83 head_rows = profile_info.rows;
84
85 subquery.setTotals();
86
87 if (head_rows != 0)
88 {
89 std::stringstream msg;
90 msg << std::fixed << std::setprecision(3);
91 msg << "Created. ";
92
93 if (subquery.set)
94 msg << "Set with " << subquery.set->getTotalRowCount() << " entries from " << head_rows << " rows. ";
95 if (subquery.join)
96 msg << "Join with " << subquery.join->getTotalRowCount() << " entries from " << head_rows << " rows. ";
97 if (subquery.table)
98 msg << "Table with " << head_rows << " rows. ";
99
100 msg << "In " << (static_cast<double>(elapsed_nanoseconds) / 1000000000ULL) << " sec.";
101 LOG_DEBUG(log, msg.rdbuf());
102 }
103 else
104 {
105 LOG_DEBUG(log, "Subquery has empty result.");
106 }
107}
108
109void CreatingSetsTransform::init()
110{
111 is_initialized = true;
112
113 for (auto & elem : subqueries_for_sets)
114 if (elem.second.source && elem.second.set)
115 elem.second.set->setHeader(elem.second.source->getHeader());
116}
117
118void CreatingSetsTransform::work()
119{
120 if (!is_initialized)
121 init();
122
123 Stopwatch watch;
124
125 while (cur_subquery != subqueries_for_sets.end() && cur_subquery->second.source == nullptr)
126 ++cur_subquery;
127
128 if (cur_subquery == subqueries_for_sets.end())
129 {
130 finished = true;
131 return;
132 }
133
134 SubqueryForSet & subquery = cur_subquery->second;
135
136 if (!started_cur_subquery)
137 {
138 startSubquery(subquery);
139 started_cur_subquery = true;
140 }
141
142 auto finishCurrentSubquery = [&]()
143 {
144 if (subquery.set)
145 subquery.set->finishInsert();
146
147 if (table_out)
148 table_out->writeSuffix();
149
150 watch.stop();
151 elapsed_nanoseconds += watch.elapsedNanoseconds();
152
153 finishSubquery(subquery);
154
155 ++cur_subquery;
156 started_cur_subquery = false;
157
158 while (cur_subquery != subqueries_for_sets.end() && cur_subquery->second.source == nullptr)
159 ++cur_subquery;
160
161 if (cur_subquery == subqueries_for_sets.end())
162 finished = true;
163 };
164
165 auto block = subquery.source->read();
166 if (!block)
167 {
168 finishCurrentSubquery();
169 return;
170 }
171
172 if (!done_with_set)
173 {
174 if (!subquery.set->insertFromBlock(block))
175 done_with_set = true;
176 }
177
178 if (!done_with_join)
179 {
180 if (!subquery.insertJoinedBlock(block))
181 done_with_join = true;
182 }
183
184 if (!done_with_table)
185 {
186 block = materializeBlock(block);
187 table_out->write(block);
188
189 rows_to_transfer += block.rows();
190 bytes_to_transfer += block.bytes();
191
192 if (!network_transfer_limits.check(rows_to_transfer, bytes_to_transfer, "IN/JOIN external table",
193 ErrorCodes::SET_SIZE_LIMIT_EXCEEDED))
194 done_with_table = true;
195 }
196
197 if (done_with_set && done_with_join && done_with_table)
198 {
199 subquery.source->cancel(false);
200 finishCurrentSubquery();
201 }
202 else
203 elapsed_nanoseconds += watch.elapsedNanoseconds();
204}
205
206void CreatingSetsTransform::setProgressCallback(const ProgressCallback & callback)
207{
208 for (auto & elem : subqueries_for_sets)
209 if (elem.second.source)
210 elem.second.source->setProgressCallback(callback);
211}
212
213void CreatingSetsTransform::setProcessListElement(QueryStatus * status)
214{
215 for (auto & elem : subqueries_for_sets)
216 if (elem.second.source)
217 elem.second.source->setProcessListElement(status);
218}
219
220}
221