| 1 | #include <DataStreams/AddingDefaultBlockOutputStream.h> |
| 2 | #include <DataStreams/ConvertingBlockInputStream.h> |
| 3 | #include <DataStreams/PushingToViewsBlockOutputStream.h> |
| 4 | #include <DataStreams/SquashingBlockInputStream.h> |
| 5 | #include <DataTypes/NestedUtils.h> |
| 6 | #include <Interpreters/InterpreterSelectQuery.h> |
| 7 | #include <Interpreters/InterpreterInsertQuery.h> |
| 8 | #include <Parsers/ASTInsertQuery.h> |
| 9 | #include <Common/CurrentThread.h> |
| 10 | #include <Common/setThreadName.h> |
| 11 | #include <Common/getNumberOfPhysicalCPUCores.h> |
| 12 | #include <Common/ThreadPool.h> |
| 13 | #include <Storages/MergeTree/ReplicatedMergeTreeBlockOutputStream.h> |
| 14 | #include <Storages/StorageValues.h> |
| 15 | #include <Storages/LiveView/StorageLiveView.h> |
| 16 | |
| 17 | namespace DB |
| 18 | { |
| 19 | |
| 20 | PushingToViewsBlockOutputStream::PushingToViewsBlockOutputStream( |
| 21 | const String & database, const String & table, const StoragePtr & storage_, |
| 22 | const Context & context_, const ASTPtr & query_ptr_, bool no_destination) |
| 23 | : storage(storage_), context(context_), query_ptr(query_ptr_) |
| 24 | { |
| 25 | /** TODO This is a very important line. At any insertion into the table one of streams should own lock. |
| 26 | * Although now any insertion into the table is done via PushingToViewsBlockOutputStream, |
| 27 | * but it's clear that here is not the best place for this functionality. |
| 28 | */ |
| 29 | addTableLock(storage->lockStructureForShare(true, context.getInitialQueryId())); |
| 30 | |
| 31 | /// If the "root" table deduplactes blocks, there are no need to make deduplication for children |
| 32 | /// Moreover, deduplication for AggregatingMergeTree children could produce false positives due to low size of inserting blocks |
| 33 | bool disable_deduplication_for_children = !no_destination && storage->supportsDeduplication(); |
| 34 | |
| 35 | if (!table.empty()) |
| 36 | { |
| 37 | Dependencies dependencies = context.getDependencies(database, table); |
| 38 | |
| 39 | /// We need special context for materialized views insertions |
| 40 | if (!dependencies.empty()) |
| 41 | { |
| 42 | views_context = std::make_unique<Context>(context); |
| 43 | // Do not deduplicate insertions into MV if the main insertion is Ok |
| 44 | if (disable_deduplication_for_children) |
| 45 | views_context->getSettingsRef().insert_deduplicate = false; |
| 46 | } |
| 47 | |
| 48 | for (const auto & database_table : dependencies) |
| 49 | { |
| 50 | auto dependent_table = context.getTable(database_table.first, database_table.second); |
| 51 | |
| 52 | ASTPtr query; |
| 53 | BlockOutputStreamPtr out; |
| 54 | |
| 55 | if (auto * materialized_view = dynamic_cast<const StorageMaterializedView *>(dependent_table.get())) |
| 56 | { |
| 57 | StoragePtr inner_table = materialized_view->getTargetTable(); |
| 58 | query = materialized_view->getInnerQuery(); |
| 59 | std::unique_ptr<ASTInsertQuery> insert = std::make_unique<ASTInsertQuery>(); |
| 60 | insert->database = inner_table->getDatabaseName(); |
| 61 | insert->table = inner_table->getTableName(); |
| 62 | ASTPtr insert_query_ptr(insert.release()); |
| 63 | InterpreterInsertQuery interpreter(insert_query_ptr, *views_context); |
| 64 | BlockIO io = interpreter.execute(); |
| 65 | out = io.out; |
| 66 | } |
| 67 | else if (dynamic_cast<const StorageLiveView *>(dependent_table.get())) |
| 68 | out = std::make_shared<PushingToViewsBlockOutputStream>( |
| 69 | database_table.first, database_table.second, dependent_table, *views_context, ASTPtr(), true); |
| 70 | else |
| 71 | out = std::make_shared<PushingToViewsBlockOutputStream>( |
| 72 | database_table.first, database_table.second, dependent_table, *views_context, ASTPtr()); |
| 73 | |
| 74 | views.emplace_back(ViewInfo{std::move(query), database_table.first, database_table.second, std::move(out)}); |
| 75 | } |
| 76 | } |
| 77 | |
| 78 | /* Do not push to destination table if the flag is set */ |
| 79 | if (!no_destination) |
| 80 | { |
| 81 | output = storage->write(query_ptr, context); |
| 82 | replicated_output = dynamic_cast<ReplicatedMergeTreeBlockOutputStream *>(output.get()); |
| 83 | } |
| 84 | } |
| 85 | |
| 86 | |
| 87 | Block PushingToViewsBlockOutputStream::() const |
| 88 | { |
| 89 | /// If we don't write directly to the destination |
| 90 | /// then expect that we're inserting with precalculated virtual columns |
| 91 | if (output) |
| 92 | return storage->getSampleBlock(); |
| 93 | else |
| 94 | return storage->getSampleBlockWithVirtuals(); |
| 95 | } |
| 96 | |
| 97 | |
| 98 | void PushingToViewsBlockOutputStream::write(const Block & block) |
| 99 | { |
| 100 | /** Throw an exception if the sizes of arrays - elements of nested data structures doesn't match. |
| 101 | * We have to make this assertion before writing to table, because storage engine may assume that they have equal sizes. |
| 102 | * NOTE It'd better to do this check in serialization of nested structures (in place when this assumption is required), |
| 103 | * but currently we don't have methods for serialization of nested structures "as a whole". |
| 104 | */ |
| 105 | Nested::validateArraySizes(block); |
| 106 | |
| 107 | if (auto * live_view = dynamic_cast<StorageLiveView *>(storage.get())) |
| 108 | { |
| 109 | StorageLiveView::writeIntoLiveView(*live_view, block, context); |
| 110 | } |
| 111 | else |
| 112 | { |
| 113 | if (output) |
| 114 | /// TODO: to support virtual and alias columns inside MVs, we should return here the inserted block extended |
| 115 | /// with additional columns directly from storage and pass it to MVs instead of raw block. |
| 116 | output->write(block); |
| 117 | } |
| 118 | |
| 119 | /// Don't process materialized views if this block is duplicate |
| 120 | if (replicated_output && replicated_output->lastBlockIsDuplicate()) |
| 121 | return; |
| 122 | |
| 123 | // Insert data into materialized views only after successful insert into main table |
| 124 | const Settings & settings = context.getSettingsRef(); |
| 125 | if (settings.parallel_view_processing && views.size() > 1) |
| 126 | { |
| 127 | // Push to views concurrently if enabled, and more than one view is attached |
| 128 | ThreadPool pool(std::min(size_t(settings.max_threads), views.size())); |
| 129 | for (size_t view_num = 0; view_num < views.size(); ++view_num) |
| 130 | { |
| 131 | auto thread_group = CurrentThread::getGroup(); |
| 132 | pool.scheduleOrThrowOnError([=, this] |
| 133 | { |
| 134 | setThreadName("PushingToViews" ); |
| 135 | if (thread_group) |
| 136 | CurrentThread::attachToIfDetached(thread_group); |
| 137 | process(block, view_num); |
| 138 | }); |
| 139 | } |
| 140 | // Wait for concurrent view processing |
| 141 | pool.wait(); |
| 142 | } |
| 143 | else |
| 144 | { |
| 145 | // Process sequentially |
| 146 | for (size_t view_num = 0; view_num < views.size(); ++view_num) |
| 147 | process(block, view_num); |
| 148 | } |
| 149 | } |
| 150 | |
| 151 | void PushingToViewsBlockOutputStream::writePrefix() |
| 152 | { |
| 153 | if (output) |
| 154 | output->writePrefix(); |
| 155 | |
| 156 | for (auto & view : views) |
| 157 | { |
| 158 | try |
| 159 | { |
| 160 | view.out->writePrefix(); |
| 161 | } |
| 162 | catch (Exception & ex) |
| 163 | { |
| 164 | ex.addMessage("while write prefix to view " + view.database + "." + view.table); |
| 165 | throw; |
| 166 | } |
| 167 | } |
| 168 | } |
| 169 | |
| 170 | void PushingToViewsBlockOutputStream::writeSuffix() |
| 171 | { |
| 172 | if (output) |
| 173 | output->writeSuffix(); |
| 174 | |
| 175 | for (auto & view : views) |
| 176 | { |
| 177 | try |
| 178 | { |
| 179 | view.out->writeSuffix(); |
| 180 | } |
| 181 | catch (Exception & ex) |
| 182 | { |
| 183 | ex.addMessage("while write prefix to view " + view.database + "." + view.table); |
| 184 | throw; |
| 185 | } |
| 186 | } |
| 187 | } |
| 188 | |
| 189 | void PushingToViewsBlockOutputStream::flush() |
| 190 | { |
| 191 | if (output) |
| 192 | output->flush(); |
| 193 | |
| 194 | for (auto & view : views) |
| 195 | view.out->flush(); |
| 196 | } |
| 197 | |
| 198 | void PushingToViewsBlockOutputStream::process(const Block & block, size_t view_num) |
| 199 | { |
| 200 | auto & view = views[view_num]; |
| 201 | |
| 202 | try |
| 203 | { |
| 204 | BlockInputStreamPtr in; |
| 205 | |
| 206 | /// We need keep InterpreterSelectQuery, until the processing will be finished, since: |
| 207 | /// |
| 208 | /// - We copy Context inside InterpreterSelectQuery to support |
| 209 | /// modification of context (Settings) for subqueries |
| 210 | /// - InterpreterSelectQuery lives shorter than query pipeline. |
| 211 | /// It's used just to build the query pipeline and no longer needed |
| 212 | /// - ExpressionAnalyzer and then, Functions, that created in InterpreterSelectQuery, |
| 213 | /// **can** take a reference to Context from InterpreterSelectQuery |
| 214 | /// (the problem raises only when function uses context from the |
| 215 | /// execute*() method, like FunctionDictGet do) |
| 216 | /// - These objects live inside query pipeline (DataStreams) and the reference become dangling. |
| 217 | std::optional<InterpreterSelectQuery> select; |
| 218 | |
| 219 | if (view.query) |
| 220 | { |
| 221 | /// We create a table with the same name as original table and the same alias columns, |
| 222 | /// but it will contain single block (that is INSERT-ed into main table). |
| 223 | /// InterpreterSelectQuery will do processing of alias columns. |
| 224 | Context local_context = *views_context; |
| 225 | local_context.addViewSource( |
| 226 | StorageValues::create(storage->getDatabaseName(), storage->getTableName(), storage->getColumns(), |
| 227 | block)); |
| 228 | select.emplace(view.query, local_context, SelectQueryOptions()); |
| 229 | in = std::make_shared<MaterializingBlockInputStream>(select->execute().in); |
| 230 | |
| 231 | /// Squashing is needed here because the materialized view query can generate a lot of blocks |
| 232 | /// even when only one block is inserted into the parent table (e.g. if the query is a GROUP BY |
| 233 | /// and two-level aggregation is triggered). |
| 234 | in = std::make_shared<SquashingBlockInputStream>( |
| 235 | in, context.getSettingsRef().min_insert_block_size_rows, context.getSettingsRef().min_insert_block_size_bytes); |
| 236 | in = std::make_shared<ConvertingBlockInputStream>(context, in, view.out->getHeader(), ConvertingBlockInputStream::MatchColumnsMode::Name); |
| 237 | } |
| 238 | else |
| 239 | in = std::make_shared<OneBlockInputStream>(block); |
| 240 | |
| 241 | in->readPrefix(); |
| 242 | |
| 243 | while (Block result_block = in->read()) |
| 244 | { |
| 245 | Nested::validateArraySizes(result_block); |
| 246 | view.out->write(result_block); |
| 247 | } |
| 248 | |
| 249 | in->readSuffix(); |
| 250 | } |
| 251 | catch (Exception & ex) |
| 252 | { |
| 253 | ex.addMessage("while pushing to view " + backQuoteIfNeed(view.database) + "." + backQuoteIfNeed(view.table)); |
| 254 | throw; |
| 255 | } |
| 256 | } |
| 257 | |
| 258 | } |
| 259 | |