1 | #include <DataStreams/AddingDefaultBlockOutputStream.h> |
2 | #include <DataStreams/ConvertingBlockInputStream.h> |
3 | #include <DataStreams/PushingToViewsBlockOutputStream.h> |
4 | #include <DataStreams/SquashingBlockInputStream.h> |
5 | #include <DataTypes/NestedUtils.h> |
6 | #include <Interpreters/InterpreterSelectQuery.h> |
7 | #include <Interpreters/InterpreterInsertQuery.h> |
8 | #include <Parsers/ASTInsertQuery.h> |
9 | #include <Common/CurrentThread.h> |
10 | #include <Common/setThreadName.h> |
11 | #include <Common/getNumberOfPhysicalCPUCores.h> |
12 | #include <Common/ThreadPool.h> |
13 | #include <Storages/MergeTree/ReplicatedMergeTreeBlockOutputStream.h> |
14 | #include <Storages/StorageValues.h> |
15 | #include <Storages/LiveView/StorageLiveView.h> |
16 | |
17 | namespace DB |
18 | { |
19 | |
20 | PushingToViewsBlockOutputStream::PushingToViewsBlockOutputStream( |
21 | const String & database, const String & table, const StoragePtr & storage_, |
22 | const Context & context_, const ASTPtr & query_ptr_, bool no_destination) |
23 | : storage(storage_), context(context_), query_ptr(query_ptr_) |
24 | { |
25 | /** TODO This is a very important line. At any insertion into the table one of streams should own lock. |
26 | * Although now any insertion into the table is done via PushingToViewsBlockOutputStream, |
27 | * but it's clear that here is not the best place for this functionality. |
28 | */ |
29 | addTableLock(storage->lockStructureForShare(true, context.getInitialQueryId())); |
30 | |
31 | /// If the "root" table deduplactes blocks, there are no need to make deduplication for children |
32 | /// Moreover, deduplication for AggregatingMergeTree children could produce false positives due to low size of inserting blocks |
33 | bool disable_deduplication_for_children = !no_destination && storage->supportsDeduplication(); |
34 | |
35 | if (!table.empty()) |
36 | { |
37 | Dependencies dependencies = context.getDependencies(database, table); |
38 | |
39 | /// We need special context for materialized views insertions |
40 | if (!dependencies.empty()) |
41 | { |
42 | views_context = std::make_unique<Context>(context); |
43 | // Do not deduplicate insertions into MV if the main insertion is Ok |
44 | if (disable_deduplication_for_children) |
45 | views_context->getSettingsRef().insert_deduplicate = false; |
46 | } |
47 | |
48 | for (const auto & database_table : dependencies) |
49 | { |
50 | auto dependent_table = context.getTable(database_table.first, database_table.second); |
51 | |
52 | ASTPtr query; |
53 | BlockOutputStreamPtr out; |
54 | |
55 | if (auto * materialized_view = dynamic_cast<const StorageMaterializedView *>(dependent_table.get())) |
56 | { |
57 | StoragePtr inner_table = materialized_view->getTargetTable(); |
58 | query = materialized_view->getInnerQuery(); |
59 | std::unique_ptr<ASTInsertQuery> insert = std::make_unique<ASTInsertQuery>(); |
60 | insert->database = inner_table->getDatabaseName(); |
61 | insert->table = inner_table->getTableName(); |
62 | ASTPtr insert_query_ptr(insert.release()); |
63 | InterpreterInsertQuery interpreter(insert_query_ptr, *views_context); |
64 | BlockIO io = interpreter.execute(); |
65 | out = io.out; |
66 | } |
67 | else if (dynamic_cast<const StorageLiveView *>(dependent_table.get())) |
68 | out = std::make_shared<PushingToViewsBlockOutputStream>( |
69 | database_table.first, database_table.second, dependent_table, *views_context, ASTPtr(), true); |
70 | else |
71 | out = std::make_shared<PushingToViewsBlockOutputStream>( |
72 | database_table.first, database_table.second, dependent_table, *views_context, ASTPtr()); |
73 | |
74 | views.emplace_back(ViewInfo{std::move(query), database_table.first, database_table.second, std::move(out)}); |
75 | } |
76 | } |
77 | |
78 | /* Do not push to destination table if the flag is set */ |
79 | if (!no_destination) |
80 | { |
81 | output = storage->write(query_ptr, context); |
82 | replicated_output = dynamic_cast<ReplicatedMergeTreeBlockOutputStream *>(output.get()); |
83 | } |
84 | } |
85 | |
86 | |
87 | Block PushingToViewsBlockOutputStream::() const |
88 | { |
89 | /// If we don't write directly to the destination |
90 | /// then expect that we're inserting with precalculated virtual columns |
91 | if (output) |
92 | return storage->getSampleBlock(); |
93 | else |
94 | return storage->getSampleBlockWithVirtuals(); |
95 | } |
96 | |
97 | |
98 | void PushingToViewsBlockOutputStream::write(const Block & block) |
99 | { |
100 | /** Throw an exception if the sizes of arrays - elements of nested data structures doesn't match. |
101 | * We have to make this assertion before writing to table, because storage engine may assume that they have equal sizes. |
102 | * NOTE It'd better to do this check in serialization of nested structures (in place when this assumption is required), |
103 | * but currently we don't have methods for serialization of nested structures "as a whole". |
104 | */ |
105 | Nested::validateArraySizes(block); |
106 | |
107 | if (auto * live_view = dynamic_cast<StorageLiveView *>(storage.get())) |
108 | { |
109 | StorageLiveView::writeIntoLiveView(*live_view, block, context); |
110 | } |
111 | else |
112 | { |
113 | if (output) |
114 | /// TODO: to support virtual and alias columns inside MVs, we should return here the inserted block extended |
115 | /// with additional columns directly from storage and pass it to MVs instead of raw block. |
116 | output->write(block); |
117 | } |
118 | |
119 | /// Don't process materialized views if this block is duplicate |
120 | if (replicated_output && replicated_output->lastBlockIsDuplicate()) |
121 | return; |
122 | |
123 | // Insert data into materialized views only after successful insert into main table |
124 | const Settings & settings = context.getSettingsRef(); |
125 | if (settings.parallel_view_processing && views.size() > 1) |
126 | { |
127 | // Push to views concurrently if enabled, and more than one view is attached |
128 | ThreadPool pool(std::min(size_t(settings.max_threads), views.size())); |
129 | for (size_t view_num = 0; view_num < views.size(); ++view_num) |
130 | { |
131 | auto thread_group = CurrentThread::getGroup(); |
132 | pool.scheduleOrThrowOnError([=, this] |
133 | { |
134 | setThreadName("PushingToViews" ); |
135 | if (thread_group) |
136 | CurrentThread::attachToIfDetached(thread_group); |
137 | process(block, view_num); |
138 | }); |
139 | } |
140 | // Wait for concurrent view processing |
141 | pool.wait(); |
142 | } |
143 | else |
144 | { |
145 | // Process sequentially |
146 | for (size_t view_num = 0; view_num < views.size(); ++view_num) |
147 | process(block, view_num); |
148 | } |
149 | } |
150 | |
151 | void PushingToViewsBlockOutputStream::writePrefix() |
152 | { |
153 | if (output) |
154 | output->writePrefix(); |
155 | |
156 | for (auto & view : views) |
157 | { |
158 | try |
159 | { |
160 | view.out->writePrefix(); |
161 | } |
162 | catch (Exception & ex) |
163 | { |
164 | ex.addMessage("while write prefix to view " + view.database + "." + view.table); |
165 | throw; |
166 | } |
167 | } |
168 | } |
169 | |
170 | void PushingToViewsBlockOutputStream::writeSuffix() |
171 | { |
172 | if (output) |
173 | output->writeSuffix(); |
174 | |
175 | for (auto & view : views) |
176 | { |
177 | try |
178 | { |
179 | view.out->writeSuffix(); |
180 | } |
181 | catch (Exception & ex) |
182 | { |
183 | ex.addMessage("while write prefix to view " + view.database + "." + view.table); |
184 | throw; |
185 | } |
186 | } |
187 | } |
188 | |
189 | void PushingToViewsBlockOutputStream::flush() |
190 | { |
191 | if (output) |
192 | output->flush(); |
193 | |
194 | for (auto & view : views) |
195 | view.out->flush(); |
196 | } |
197 | |
198 | void PushingToViewsBlockOutputStream::process(const Block & block, size_t view_num) |
199 | { |
200 | auto & view = views[view_num]; |
201 | |
202 | try |
203 | { |
204 | BlockInputStreamPtr in; |
205 | |
206 | /// We need keep InterpreterSelectQuery, until the processing will be finished, since: |
207 | /// |
208 | /// - We copy Context inside InterpreterSelectQuery to support |
209 | /// modification of context (Settings) for subqueries |
210 | /// - InterpreterSelectQuery lives shorter than query pipeline. |
211 | /// It's used just to build the query pipeline and no longer needed |
212 | /// - ExpressionAnalyzer and then, Functions, that created in InterpreterSelectQuery, |
213 | /// **can** take a reference to Context from InterpreterSelectQuery |
214 | /// (the problem raises only when function uses context from the |
215 | /// execute*() method, like FunctionDictGet do) |
216 | /// - These objects live inside query pipeline (DataStreams) and the reference become dangling. |
217 | std::optional<InterpreterSelectQuery> select; |
218 | |
219 | if (view.query) |
220 | { |
221 | /// We create a table with the same name as original table and the same alias columns, |
222 | /// but it will contain single block (that is INSERT-ed into main table). |
223 | /// InterpreterSelectQuery will do processing of alias columns. |
224 | Context local_context = *views_context; |
225 | local_context.addViewSource( |
226 | StorageValues::create(storage->getDatabaseName(), storage->getTableName(), storage->getColumns(), |
227 | block)); |
228 | select.emplace(view.query, local_context, SelectQueryOptions()); |
229 | in = std::make_shared<MaterializingBlockInputStream>(select->execute().in); |
230 | |
231 | /// Squashing is needed here because the materialized view query can generate a lot of blocks |
232 | /// even when only one block is inserted into the parent table (e.g. if the query is a GROUP BY |
233 | /// and two-level aggregation is triggered). |
234 | in = std::make_shared<SquashingBlockInputStream>( |
235 | in, context.getSettingsRef().min_insert_block_size_rows, context.getSettingsRef().min_insert_block_size_bytes); |
236 | in = std::make_shared<ConvertingBlockInputStream>(context, in, view.out->getHeader(), ConvertingBlockInputStream::MatchColumnsMode::Name); |
237 | } |
238 | else |
239 | in = std::make_shared<OneBlockInputStream>(block); |
240 | |
241 | in->readPrefix(); |
242 | |
243 | while (Block result_block = in->read()) |
244 | { |
245 | Nested::validateArraySizes(result_block); |
246 | view.out->write(result_block); |
247 | } |
248 | |
249 | in->readSuffix(); |
250 | } |
251 | catch (Exception & ex) |
252 | { |
253 | ex.addMessage("while pushing to view " + backQuoteIfNeed(view.database) + "." + backQuoteIfNeed(view.table)); |
254 | throw; |
255 | } |
256 | } |
257 | |
258 | } |
259 | |