1#include <Interpreters/Set.h>
2#include <DataStreams/materializeBlock.h>
3#include <DataStreams/IBlockOutputStream.h>
4#include <DataStreams/CreatingSetsBlockInputStream.h>
5#include <Storages/IStorage.h>
6#include <iomanip>
7
8
9namespace DB
10{
11
/// Error codes used in this translation unit. Each code that is thrown or passed along below
/// is declared here explicitly, so the file does not depend on another header declaring it.
namespace ErrorCodes
{
    extern const int LOGICAL_ERROR;             /// Thrown by createOne() when a subquery has nothing to fill.
    extern const int SET_SIZE_LIMIT_EXCEEDED;   /// Passed to the transfer limits check in createOne().
}
16
17
18CreatingSetsBlockInputStream::CreatingSetsBlockInputStream(
19 const BlockInputStreamPtr & input,
20 const SubqueriesForSets & subqueries_for_sets_,
21 const Context & context_)
22 : subqueries_for_sets(subqueries_for_sets_)
23 , context(context_)
24{
25 const Settings & settings = context.getSettingsRef();
26 network_transfer_limits = SizeLimits(
27 settings.max_rows_to_transfer, settings.max_bytes_to_transfer, settings.transfer_overflow_mode);
28
29 for (auto & elem : subqueries_for_sets)
30 {
31 if (elem.second.source)
32 {
33 children.push_back(elem.second.source);
34
35 if (elem.second.set)
36 elem.second.set->setHeader(elem.second.source->getHeader());
37 }
38 }
39
40 children.push_back(input);
41}
42
43
44Block CreatingSetsBlockInputStream::readImpl()
45{
46 Block res;
47
48 createAll();
49
50 if (isCancelledOrThrowIfKilled())
51 return res;
52
53 return children.back()->read();
54}
55
56
/// Prefix hook: delegates to createAll(). That function is guarded by the `created` flag,
/// so the later call from readImpl() does not repeat the work once it has completed.
void CreatingSetsBlockInputStream::readPrefixImpl()
{
 createAll();
}
61
62
63Block CreatingSetsBlockInputStream::getTotals()
64{
65 return children.back()->getTotals();
66}
67
68
69void CreatingSetsBlockInputStream::createAll()
70{
71 if (!created)
72 {
73 for (auto & elem : subqueries_for_sets)
74 {
75 if (elem.second.source) /// There could be prepared in advance Set/Join - no source is specified for them.
76 {
77 if (isCancelledOrThrowIfKilled())
78 return;
79
80 createOne(elem.second);
81 }
82 }
83
84 created = true;
85 }
86}
87
88
89void CreatingSetsBlockInputStream::createOne(SubqueryForSet & subquery)
90{
91 LOG_TRACE(log, (subquery.set ? "Creating set. " : "")
92 << (subquery.join ? "Creating join. " : "")
93 << (subquery.table ? "Filling temporary table. " : ""));
94 Stopwatch watch;
95
96 BlockOutputStreamPtr table_out;
97 if (subquery.table)
98 table_out = subquery.table->write({}, context);
99
100 bool done_with_set = !subquery.set;
101 bool done_with_join = !subquery.join;
102 bool done_with_table = !subquery.table;
103
104 if (done_with_set && done_with_join && done_with_table)
105 throw Exception("Logical error: nothing to do with subquery", ErrorCodes::LOGICAL_ERROR);
106
107 if (table_out)
108 table_out->writePrefix();
109
110 while (Block block = subquery.source->read())
111 {
112 if (isCancelled())
113 {
114 LOG_DEBUG(log, "Query was cancelled during set / join or temporary table creation.");
115 return;
116 }
117
118 if (!done_with_set)
119 {
120 if (!subquery.set->insertFromBlock(block))
121 done_with_set = true;
122 }
123
124 if (!done_with_join)
125 {
126 if (!subquery.insertJoinedBlock(block))
127 done_with_join = true;
128 }
129
130 if (!done_with_table)
131 {
132 block = materializeBlock(block);
133 table_out->write(block);
134
135 rows_to_transfer += block.rows();
136 bytes_to_transfer += block.bytes();
137
138 if (!network_transfer_limits.check(rows_to_transfer, bytes_to_transfer, "IN/JOIN external table", ErrorCodes::SET_SIZE_LIMIT_EXCEEDED))
139 done_with_table = true;
140 }
141
142 if (done_with_set && done_with_join && done_with_table)
143 {
144 subquery.source->cancel(false);
145 break;
146 }
147 }
148
149 if (subquery.set)
150 subquery.set->finishInsert();
151
152 if (table_out)
153 table_out->writeSuffix();
154
155 watch.stop();
156
157 size_t head_rows = 0;
158 const BlockStreamProfileInfo & profile_info = subquery.source->getProfileInfo();
159
160 head_rows = profile_info.rows;
161
162 subquery.setTotals();
163
164 if (head_rows != 0)
165 {
166 std::stringstream msg;
167 msg << std::fixed << std::setprecision(3);
168 msg << "Created. ";
169
170 if (subquery.set)
171 msg << "Set with " << subquery.set->getTotalRowCount() << " entries from " << head_rows << " rows. ";
172 if (subquery.join)
173 msg << "Join with " << subquery.join->getTotalRowCount() << " entries from " << head_rows << " rows. ";
174 if (subquery.table)
175 msg << "Table with " << head_rows << " rows. ";
176
177 msg << "In " << watch.elapsedSeconds() << " sec.";
178 LOG_DEBUG(log, msg.rdbuf());
179 }
180 else
181 {
182 LOG_DEBUG(log, "Subquery has empty result.");
183 }
184}
185
186}
187