1 | #include <Storages/StorageJoin.h> |
2 | #include <Storages/StorageFactory.h> |
3 | #include <Interpreters/Join.h> |
4 | #include <Parsers/ASTCreateQuery.h> |
5 | #include <Parsers/ASTSetQuery.h> |
6 | #include <Parsers/ASTIdentifier.h> |
7 | #include <Core/ColumnNumbers.h> |
8 | #include <DataStreams/IBlockInputStream.h> |
9 | #include <DataTypes/NestedUtils.h> |
10 | #include <Interpreters/joinDispatch.h> |
11 | #include <Interpreters/AnalyzedJoin.h> |
12 | #include <Common/assert_cast.h> |
13 | #include <Common/quoteString.h> |
14 | |
15 | #include <Poco/String.h> /// toLower |
16 | #include <Poco/File.h> |
17 | |
18 | |
19 | namespace DB |
20 | { |
21 | |
namespace ErrorCodes
{
    extern const int UNSUPPORTED_JOIN_KEYS;
    extern const int NO_SUCH_COLUMN_IN_TABLE;
    extern const int INCOMPATIBLE_TYPE_OF_JOIN;
    extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
    extern const int BAD_ARGUMENTS;
    /// Thrown by JoinBlockInputStream in this file. Declared here so every error code the
    /// file uses is visible locally rather than (presumably) through a transitive include.
    extern const int LOGICAL_ERROR;
    extern const int NOT_IMPLEMENTED;
}
30 | |
31 | StorageJoin::StorageJoin( |
32 | const String & relative_path_, |
33 | const String & database_name_, |
34 | const String & table_name_, |
35 | const Names & key_names_, |
36 | bool use_nulls_, |
37 | SizeLimits limits_, |
38 | ASTTableJoin::Kind kind_, |
39 | ASTTableJoin::Strictness strictness_, |
40 | const ColumnsDescription & columns_, |
41 | const ConstraintsDescription & constraints_, |
42 | bool overwrite, |
43 | const Context & context_) |
44 | : StorageSetOrJoinBase{relative_path_, database_name_, table_name_, columns_, constraints_, context_} |
45 | , key_names(key_names_) |
46 | , use_nulls(use_nulls_) |
47 | , limits(limits_) |
48 | , kind(kind_) |
49 | , strictness(strictness_) |
50 | { |
51 | for (const auto & key : key_names) |
52 | if (!getColumns().hasPhysical(key)) |
53 | throw Exception{"Key column (" + key + ") does not exist in table declaration." , ErrorCodes::NO_SUCH_COLUMN_IN_TABLE}; |
54 | |
55 | table_join = std::make_shared<AnalyzedJoin>(limits, use_nulls, kind, strictness, key_names); |
56 | join = std::make_shared<Join>(table_join, getSampleBlock().sortColumns(), overwrite); |
57 | restore(); |
58 | } |
59 | |
60 | |
61 | void StorageJoin::truncate(const ASTPtr &, const Context &, TableStructureWriteLockHolder &) |
62 | { |
63 | Poco::File(path).remove(true); |
64 | Poco::File(path).createDirectories(); |
65 | Poco::File(path + "tmp/" ).createDirectories(); |
66 | |
67 | increment = 0; |
68 | join = std::make_shared<Join>(table_join, getSampleBlock().sortColumns()); |
69 | } |
70 | |
71 | |
72 | HashJoinPtr StorageJoin::getJoin(std::shared_ptr<AnalyzedJoin> analyzed_join) const |
73 | { |
74 | if (kind != analyzed_join->kind() || strictness != analyzed_join->strictness()) |
75 | throw Exception("Table " + backQuote(table_name) + " has incompatible type of JOIN." , ErrorCodes::INCOMPATIBLE_TYPE_OF_JOIN); |
76 | |
77 | if ((analyzed_join->forceNullableRight() && !use_nulls) || |
78 | (!analyzed_join->forceNullableRight() && isLeftOrFull(analyzed_join->kind()) && use_nulls)) |
79 | throw Exception("Table " + backQuote(table_name) + " needs the same join_use_nulls setting as present in LEFT or FULL JOIN." , |
80 | ErrorCodes::INCOMPATIBLE_TYPE_OF_JOIN); |
81 | |
82 | /// TODO: check key columns |
83 | |
84 | /// Some HACK to remove wrong names qualifiers: table.column -> column. |
85 | analyzed_join->setRightKeys(key_names); |
86 | |
87 | HashJoinPtr join_clone = std::make_shared<Join>(analyzed_join, getSampleBlock().sortColumns()); |
88 | join_clone->reuseJoinedData(*join); |
89 | return join_clone; |
90 | } |
91 | |
92 | |
93 | void StorageJoin::insertBlock(const Block & block) { join->addJoinedBlock(block); } |
94 | size_t StorageJoin::getSize() const { return join->getTotalRowCount(); } |
95 | |
96 | |
97 | void registerStorageJoin(StorageFactory & factory) |
98 | { |
99 | factory.registerStorage("Join" , [](const StorageFactory::Arguments & args) |
100 | { |
101 | /// Join(ANY, LEFT, k1, k2, ...) |
102 | |
103 | ASTs & engine_args = args.engine_args; |
104 | |
105 | auto & settings = args.context.getSettingsRef(); |
106 | |
107 | auto join_use_nulls = settings.join_use_nulls; |
108 | auto max_rows_in_join = settings.max_rows_in_join; |
109 | auto max_bytes_in_join = settings.max_bytes_in_join; |
110 | auto join_overflow_mode = settings.join_overflow_mode; |
111 | auto join_any_take_last_row = settings.join_any_take_last_row; |
112 | auto old_any_join = settings.any_join_distinct_right_table_keys; |
113 | |
114 | if (args.storage_def && args.storage_def->settings) |
115 | { |
116 | for (const auto & setting : args.storage_def->settings->changes) |
117 | { |
118 | if (setting.name == "join_use_nulls" ) |
119 | join_use_nulls.set(setting.value); |
120 | else if (setting.name == "max_rows_in_join" ) |
121 | max_rows_in_join.set(setting.value); |
122 | else if (setting.name == "max_bytes_in_join" ) |
123 | max_bytes_in_join.set(setting.value); |
124 | else if (setting.name == "join_overflow_mode" ) |
125 | join_overflow_mode.set(setting.value); |
126 | else if (setting.name == "join_any_take_last_row" ) |
127 | join_any_take_last_row.set(setting.value); |
128 | else if (setting.name == "any_join_distinct_right_table_keys" ) |
129 | old_any_join.set(setting.value); |
130 | else |
131 | throw Exception( |
132 | "Unknown setting " + setting.name + " for storage " + args.engine_name, |
133 | ErrorCodes::BAD_ARGUMENTS); |
134 | } |
135 | } |
136 | |
137 | if (engine_args.size() < 3) |
138 | throw Exception( |
139 | "Storage Join requires at least 3 parameters: Join(ANY|ALL|SEMI|ANTI, LEFT|INNER|RIGHT, keys...)." , |
140 | ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH); |
141 | |
142 | ASTTableJoin::Strictness strictness = ASTTableJoin::Strictness::Unspecified; |
143 | ASTTableJoin::Kind kind = ASTTableJoin::Kind::Comma; |
144 | |
145 | if (auto opt_strictness_id = tryGetIdentifierName(engine_args[0])) |
146 | { |
147 | const String strictness_str = Poco::toLower(*opt_strictness_id); |
148 | |
149 | if (strictness_str == "any" ) |
150 | { |
151 | if (old_any_join) |
152 | strictness = ASTTableJoin::Strictness::RightAny; |
153 | else |
154 | strictness = ASTTableJoin::Strictness::Any; |
155 | } |
156 | else if (strictness_str == "all" ) |
157 | strictness = ASTTableJoin::Strictness::All; |
158 | else if (strictness_str == "semi" ) |
159 | strictness = ASTTableJoin::Strictness::Semi; |
160 | else if (strictness_str == "anti" ) |
161 | strictness = ASTTableJoin::Strictness::Anti; |
162 | } |
163 | |
164 | if (strictness == ASTTableJoin::Strictness::Unspecified) |
165 | throw Exception("First parameter of storage Join must be ANY or ALL or SEMI or ANTI (without quotes)." , |
166 | ErrorCodes::BAD_ARGUMENTS); |
167 | |
168 | if (auto opt_kind_id = tryGetIdentifierName(engine_args[1])) |
169 | { |
170 | const String kind_str = Poco::toLower(*opt_kind_id); |
171 | |
172 | if (kind_str == "left" ) |
173 | kind = ASTTableJoin::Kind::Left; |
174 | else if (kind_str == "inner" ) |
175 | kind = ASTTableJoin::Kind::Inner; |
176 | else if (kind_str == "right" ) |
177 | kind = ASTTableJoin::Kind::Right; |
178 | else if (kind_str == "full" ) |
179 | { |
180 | if (strictness == ASTTableJoin::Strictness::Any) |
181 | strictness = ASTTableJoin::Strictness::RightAny; |
182 | kind = ASTTableJoin::Kind::Full; |
183 | } |
184 | } |
185 | |
186 | if (kind == ASTTableJoin::Kind::Comma) |
187 | throw Exception("Second parameter of storage Join must be LEFT or INNER or RIGHT or FULL (without quotes)." , |
188 | ErrorCodes::BAD_ARGUMENTS); |
189 | |
190 | Names key_names; |
191 | key_names.reserve(engine_args.size() - 2); |
192 | for (size_t i = 2, size = engine_args.size(); i < size; ++i) |
193 | { |
194 | auto opt_key = tryGetIdentifierName(engine_args[i]); |
195 | if (!opt_key) |
196 | throw Exception("Parameter â„–" + toString(i + 1) + " of storage Join don't look like column name." , ErrorCodes::BAD_ARGUMENTS); |
197 | |
198 | key_names.push_back(*opt_key); |
199 | } |
200 | |
201 | return StorageJoin::create( |
202 | args.relative_data_path, |
203 | args.database_name, |
204 | args.table_name, |
205 | key_names, |
206 | join_use_nulls, |
207 | SizeLimits{max_rows_in_join, max_bytes_in_join, join_overflow_mode}, |
208 | kind, |
209 | strictness, |
210 | args.columns, |
211 | args.constraints, |
212 | join_any_take_last_row, |
213 | args.context); |
214 | }); |
215 | } |
216 | |
217 | template <typename T> |
218 | static const char * rawData(T & t) |
219 | { |
220 | return reinterpret_cast<const char *>(&t); |
221 | } |
222 | template <typename T> |
223 | static size_t rawSize(T &) |
224 | { |
225 | return sizeof(T); |
226 | } |
227 | template <> |
228 | const char * rawData(const StringRef & t) |
229 | { |
230 | return t.data; |
231 | } |
232 | template <> |
233 | size_t rawSize(const StringRef & t) |
234 | { |
235 | return t.size; |
236 | } |
237 | |
238 | class JoinBlockInputStream : public IBlockInputStream |
239 | { |
240 | public: |
241 | JoinBlockInputStream(const Join & parent_, UInt64 max_block_size_, Block && sample_block_) |
242 | : parent(parent_), lock(parent.data->rwlock), max_block_size(max_block_size_), sample_block(std::move(sample_block_)) |
243 | { |
244 | columns.resize(sample_block.columns()); |
245 | column_indices.resize(sample_block.columns()); |
246 | column_with_null.resize(sample_block.columns()); |
247 | for (size_t i = 0; i < sample_block.columns(); ++i) |
248 | { |
249 | auto & [_, type, name] = sample_block.getByPosition(i); |
250 | if (parent.right_table_keys.has(name)) |
251 | { |
252 | key_pos = i; |
253 | column_with_null[i] = parent.right_table_keys.getByName(name).type->isNullable(); |
254 | } |
255 | else |
256 | { |
257 | auto pos = parent.sample_block_with_columns_to_add.getPositionByName(name); |
258 | column_indices[i] = pos; |
259 | column_with_null[i] = !parent.sample_block_with_columns_to_add.getByPosition(pos).type->equals(*type); |
260 | } |
261 | } |
262 | } |
263 | |
264 | String getName() const override { return "Join" ; } |
265 | |
266 | Block () const override { return sample_block; } |
267 | |
268 | |
269 | protected: |
270 | Block readImpl() override |
271 | { |
272 | if (parent.data->blocks.empty()) |
273 | return Block(); |
274 | |
275 | Block block; |
276 | if (!joinDispatch(parent.kind, parent.strictness, parent.data->maps, |
277 | [&](auto kind, auto strictness, auto & map) { block = createBlock<kind, strictness>(map); })) |
278 | throw Exception("Logical error: unknown JOIN strictness" , ErrorCodes::LOGICAL_ERROR); |
279 | return block; |
280 | } |
281 | |
282 | private: |
283 | const Join & parent; |
284 | std::shared_lock<std::shared_mutex> lock; |
285 | UInt64 max_block_size; |
286 | Block sample_block; |
287 | |
288 | ColumnNumbers column_indices; |
289 | std::vector<bool> column_with_null; |
290 | std::optional<size_t> key_pos; |
291 | MutableColumns columns; |
292 | |
293 | std::unique_ptr<void, std::function<void(void *)>> position; /// type erasure |
294 | |
295 | |
296 | template <ASTTableJoin::Kind KIND, ASTTableJoin::Strictness STRICTNESS, typename Maps> |
297 | Block createBlock(const Maps & maps) |
298 | { |
299 | for (size_t i = 0; i < sample_block.columns(); ++i) |
300 | { |
301 | const auto & src_col = sample_block.safeGetByPosition(i); |
302 | columns[i] = src_col.type->createColumn(); |
303 | if (column_with_null[i]) |
304 | { |
305 | if (key_pos == i) |
306 | { |
307 | // unwrap null key column |
308 | ColumnNullable & nullable_col = assert_cast<ColumnNullable &>(*columns[i]); |
309 | columns[i] = nullable_col.getNestedColumnPtr()->assumeMutable(); |
310 | } |
311 | else |
312 | // wrap non key column with null |
313 | columns[i] = makeNullable(std::move(columns[i]))->assumeMutable(); |
314 | } |
315 | } |
316 | |
317 | size_t rows_added = 0; |
318 | |
319 | switch (parent.data->type) |
320 | { |
321 | #define M(TYPE) \ |
322 | case Join::Type::TYPE: \ |
323 | rows_added = fillColumns<KIND, STRICTNESS>(*maps.TYPE); \ |
324 | break; |
325 | APPLY_FOR_JOIN_VARIANTS_LIMITED(M) |
326 | #undef M |
327 | |
328 | default: |
329 | throw Exception("Unsupported JOIN keys in StorageJoin. Type: " + toString(static_cast<UInt32>(parent.data->type)), |
330 | ErrorCodes::UNSUPPORTED_JOIN_KEYS); |
331 | } |
332 | |
333 | if (!rows_added) |
334 | return {}; |
335 | |
336 | Block res = sample_block.cloneEmpty(); |
337 | for (size_t i = 0; i < columns.size(); ++i) |
338 | if (column_with_null[i]) |
339 | { |
340 | if (key_pos == i) |
341 | res.getByPosition(i).column = makeNullable(std::move(columns[i])); |
342 | else |
343 | { |
344 | const ColumnNullable & nullable_col = assert_cast<const ColumnNullable &>(*columns[i]); |
345 | res.getByPosition(i).column = nullable_col.getNestedColumnPtr(); |
346 | } |
347 | } |
348 | else |
349 | res.getByPosition(i).column = std::move(columns[i]); |
350 | |
351 | return res; |
352 | } |
353 | |
354 | template <ASTTableJoin::Kind KIND, ASTTableJoin::Strictness STRICTNESS, typename Map> |
355 | size_t fillColumns(const Map & map) |
356 | { |
357 | size_t rows_added = 0; |
358 | |
359 | if (!position) |
360 | position = decltype(position)( |
361 | static_cast<void *>(new typename Map::const_iterator(map.begin())), |
362 | [](void * ptr) { delete reinterpret_cast<typename Map::const_iterator *>(ptr); }); |
363 | |
364 | auto & it = *reinterpret_cast<typename Map::const_iterator *>(position.get()); |
365 | auto end = map.end(); |
366 | |
367 | for (; it != end; ++it) |
368 | { |
369 | if constexpr (STRICTNESS == ASTTableJoin::Strictness::RightAny) |
370 | { |
371 | fillOne<Map>(columns, column_indices, it, key_pos, rows_added); |
372 | } |
373 | else if constexpr (STRICTNESS == ASTTableJoin::Strictness::All) |
374 | { |
375 | fillAll<Map>(columns, column_indices, it, key_pos, rows_added); |
376 | } |
377 | else if constexpr (STRICTNESS == ASTTableJoin::Strictness::Any) |
378 | { |
379 | if constexpr (KIND == ASTTableJoin::Kind::Left || KIND == ASTTableJoin::Kind::Inner) |
380 | fillOne<Map>(columns, column_indices, it, key_pos, rows_added); |
381 | else if constexpr (KIND == ASTTableJoin::Kind::Right) |
382 | fillAll<Map>(columns, column_indices, it, key_pos, rows_added); |
383 | } |
384 | else if constexpr (STRICTNESS == ASTTableJoin::Strictness::Semi) |
385 | { |
386 | if constexpr (KIND == ASTTableJoin::Kind::Left) |
387 | fillOne<Map>(columns, column_indices, it, key_pos, rows_added); |
388 | else if constexpr (KIND == ASTTableJoin::Kind::Right) |
389 | fillAll<Map>(columns, column_indices, it, key_pos, rows_added); |
390 | } |
391 | else if constexpr (STRICTNESS == ASTTableJoin::Strictness::Anti) |
392 | { |
393 | if constexpr (KIND == ASTTableJoin::Kind::Left) |
394 | fillOne<Map>(columns, column_indices, it, key_pos, rows_added); |
395 | else if constexpr (KIND == ASTTableJoin::Kind::Right) |
396 | fillAll<Map>(columns, column_indices, it, key_pos, rows_added); |
397 | } |
398 | else |
399 | throw Exception("This JOIN is not implemented yet" , ErrorCodes::NOT_IMPLEMENTED); |
400 | |
401 | if (rows_added >= max_block_size) |
402 | { |
403 | ++it; |
404 | break; |
405 | } |
406 | } |
407 | |
408 | return rows_added; |
409 | } |
410 | |
411 | template <typename Map> |
412 | static void fillOne(MutableColumns & columns, const ColumnNumbers & column_indices, typename Map::const_iterator & it, |
413 | const std::optional<size_t> & key_pos, size_t & rows_added) |
414 | { |
415 | for (size_t j = 0; j < columns.size(); ++j) |
416 | if (j == key_pos) |
417 | columns[j]->insertData(rawData(it->getKey()), rawSize(it->getKey())); |
418 | else |
419 | columns[j]->insertFrom(*it->getMapped().block->getByPosition(column_indices[j]).column.get(), it->getMapped().row_num); |
420 | ++rows_added; |
421 | } |
422 | |
423 | template <typename Map> |
424 | static void fillAll(MutableColumns & columns, const ColumnNumbers & column_indices, typename Map::const_iterator & it, |
425 | const std::optional<size_t> & key_pos, size_t & rows_added) |
426 | { |
427 | for (auto ref_it = it->getMapped().begin(); ref_it.ok(); ++ref_it) |
428 | { |
429 | for (size_t j = 0; j < columns.size(); ++j) |
430 | if (j == key_pos) |
431 | columns[j]->insertData(rawData(it->getKey()), rawSize(it->getKey())); |
432 | else |
433 | columns[j]->insertFrom(*ref_it->block->getByPosition(column_indices[j]).column.get(), ref_it->row_num); |
434 | ++rows_added; |
435 | } |
436 | } |
437 | }; |
438 | |
439 | |
440 | // TODO: multiple stream read and index read |
441 | BlockInputStreams StorageJoin::read( |
442 | const Names & column_names, |
443 | const SelectQueryInfo & /*query_info*/, |
444 | const Context & /*context*/, |
445 | QueryProcessingStage::Enum /*processed_stage*/, |
446 | size_t max_block_size, |
447 | unsigned /*num_streams*/) |
448 | { |
449 | check(column_names); |
450 | return {std::make_shared<JoinBlockInputStream>(*join, max_block_size, getSampleBlockForColumns(column_names))}; |
451 | } |
452 | |
453 | } |
454 | |