1#include <Storages/StorageJoin.h>
2#include <Storages/StorageFactory.h>
3#include <Interpreters/Join.h>
4#include <Parsers/ASTCreateQuery.h>
5#include <Parsers/ASTSetQuery.h>
6#include <Parsers/ASTIdentifier.h>
7#include <Core/ColumnNumbers.h>
8#include <DataStreams/IBlockInputStream.h>
9#include <DataTypes/NestedUtils.h>
10#include <Interpreters/joinDispatch.h>
11#include <Interpreters/AnalyzedJoin.h>
12#include <Common/assert_cast.h>
13#include <Common/quoteString.h>
14
15#include <Poco/String.h> /// toLower
16#include <Poco/File.h>
17
18
19namespace DB
20{
21
namespace ErrorCodes
{
    extern const int UNSUPPORTED_JOIN_KEYS;
    extern const int NO_SUCH_COLUMN_IN_TABLE;
    extern const int INCOMPATIBLE_TYPE_OF_JOIN;
    extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
    extern const int BAD_ARGUMENTS;
    /// Also thrown by JoinBlockInputStream in this file; declared explicitly so the file
    /// does not depend on some included header happening to declare them.
    extern const int LOGICAL_ERROR;
    extern const int NOT_IMPLEMENTED;
}
30
31StorageJoin::StorageJoin(
32 const String & relative_path_,
33 const String & database_name_,
34 const String & table_name_,
35 const Names & key_names_,
36 bool use_nulls_,
37 SizeLimits limits_,
38 ASTTableJoin::Kind kind_,
39 ASTTableJoin::Strictness strictness_,
40 const ColumnsDescription & columns_,
41 const ConstraintsDescription & constraints_,
42 bool overwrite,
43 const Context & context_)
44 : StorageSetOrJoinBase{relative_path_, database_name_, table_name_, columns_, constraints_, context_}
45 , key_names(key_names_)
46 , use_nulls(use_nulls_)
47 , limits(limits_)
48 , kind(kind_)
49 , strictness(strictness_)
50{
51 for (const auto & key : key_names)
52 if (!getColumns().hasPhysical(key))
53 throw Exception{"Key column (" + key + ") does not exist in table declaration.", ErrorCodes::NO_SUCH_COLUMN_IN_TABLE};
54
55 table_join = std::make_shared<AnalyzedJoin>(limits, use_nulls, kind, strictness, key_names);
56 join = std::make_shared<Join>(table_join, getSampleBlock().sortColumns(), overwrite);
57 restore();
58}
59
60
void StorageJoin::truncate(const ASTPtr &, const Context &, TableStructureWriteLockHolder &)
{
    /// Drop everything under the table directory, then recreate it together with its "tmp/" subdirectory.
    Poco::File data_dir(path);
    data_dir.remove(true);
    data_dir.createDirectories();
    Poco::File(path + "tmp/").createDirectories();

    /// Reset the counter and replace the in-memory Join with an empty one.
    increment = 0;
    /// NOTE(review): unlike the constructor, no `overwrite` (join_any_take_last_row) argument is
    /// passed here, so that table setting is lost after TRUNCATE. Fixing it needs the flag stored as a member.
    join = std::make_shared<Join>(table_join, getSampleBlock().sortColumns());
}
70
71
/// Returns a Join usable by a query that joins with this table.
/// The query's JOIN kind and strictness must equal the table's, and its join_use_nulls mode must be compatible.
/// Note: overwrites the right keys of `analyzed_join` with this table's key names.
HashJoinPtr StorageJoin::getJoin(std::shared_ptr<AnalyzedJoin> analyzed_join) const
{
    if (!(kind == analyzed_join->kind() && strictness == analyzed_join->strictness()))
        throw Exception("Table " + backQuote(table_name) + " has incompatible type of JOIN.", ErrorCodes::INCOMPATIBLE_TYPE_OF_JOIN);

    /// If the query forces a nullable right side, the table must have been created with nulls;
    /// otherwise a LEFT/FULL join must not meet a table that was created with nulls.
    const bool query_forces_nulls = analyzed_join->forceNullableRight();
    const bool nulls_mismatch = query_forces_nulls
        ? !use_nulls
        : (isLeftOrFull(analyzed_join->kind()) && use_nulls);

    if (nulls_mismatch)
        throw Exception("Table " + backQuote(table_name) + " needs the same join_use_nulls setting as present in LEFT or FULL JOIN.",
                        ErrorCodes::INCOMPATIBLE_TYPE_OF_JOIN);

    /// TODO: check key columns

    /// Some HACK to remove wrong names qualifiers: table.column -> column.
    analyzed_join->setRightKeys(key_names);

    /// A new Join bound to the query's description, pointed at this table's data via reuseJoinedData().
    HashJoinPtr join_clone = std::make_shared<Join>(analyzed_join, getSampleBlock().sortColumns());
    join_clone->reuseJoinedData(*join);
    return join_clone;
}
91
92
93void StorageJoin::insertBlock(const Block & block) { join->addJoinedBlock(block); }
94size_t StorageJoin::getSize() const { return join->getTotalRowCount(); }
95
96
97void registerStorageJoin(StorageFactory & factory)
98{
99 factory.registerStorage("Join", [](const StorageFactory::Arguments & args)
100 {
101 /// Join(ANY, LEFT, k1, k2, ...)
102
103 ASTs & engine_args = args.engine_args;
104
105 auto & settings = args.context.getSettingsRef();
106
107 auto join_use_nulls = settings.join_use_nulls;
108 auto max_rows_in_join = settings.max_rows_in_join;
109 auto max_bytes_in_join = settings.max_bytes_in_join;
110 auto join_overflow_mode = settings.join_overflow_mode;
111 auto join_any_take_last_row = settings.join_any_take_last_row;
112 auto old_any_join = settings.any_join_distinct_right_table_keys;
113
114 if (args.storage_def && args.storage_def->settings)
115 {
116 for (const auto & setting : args.storage_def->settings->changes)
117 {
118 if (setting.name == "join_use_nulls")
119 join_use_nulls.set(setting.value);
120 else if (setting.name == "max_rows_in_join")
121 max_rows_in_join.set(setting.value);
122 else if (setting.name == "max_bytes_in_join")
123 max_bytes_in_join.set(setting.value);
124 else if (setting.name == "join_overflow_mode")
125 join_overflow_mode.set(setting.value);
126 else if (setting.name == "join_any_take_last_row")
127 join_any_take_last_row.set(setting.value);
128 else if (setting.name == "any_join_distinct_right_table_keys")
129 old_any_join.set(setting.value);
130 else
131 throw Exception(
132 "Unknown setting " + setting.name + " for storage " + args.engine_name,
133 ErrorCodes::BAD_ARGUMENTS);
134 }
135 }
136
137 if (engine_args.size() < 3)
138 throw Exception(
139 "Storage Join requires at least 3 parameters: Join(ANY|ALL|SEMI|ANTI, LEFT|INNER|RIGHT, keys...).",
140 ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
141
142 ASTTableJoin::Strictness strictness = ASTTableJoin::Strictness::Unspecified;
143 ASTTableJoin::Kind kind = ASTTableJoin::Kind::Comma;
144
145 if (auto opt_strictness_id = tryGetIdentifierName(engine_args[0]))
146 {
147 const String strictness_str = Poco::toLower(*opt_strictness_id);
148
149 if (strictness_str == "any")
150 {
151 if (old_any_join)
152 strictness = ASTTableJoin::Strictness::RightAny;
153 else
154 strictness = ASTTableJoin::Strictness::Any;
155 }
156 else if (strictness_str == "all")
157 strictness = ASTTableJoin::Strictness::All;
158 else if (strictness_str == "semi")
159 strictness = ASTTableJoin::Strictness::Semi;
160 else if (strictness_str == "anti")
161 strictness = ASTTableJoin::Strictness::Anti;
162 }
163
164 if (strictness == ASTTableJoin::Strictness::Unspecified)
165 throw Exception("First parameter of storage Join must be ANY or ALL or SEMI or ANTI (without quotes).",
166 ErrorCodes::BAD_ARGUMENTS);
167
168 if (auto opt_kind_id = tryGetIdentifierName(engine_args[1]))
169 {
170 const String kind_str = Poco::toLower(*opt_kind_id);
171
172 if (kind_str == "left")
173 kind = ASTTableJoin::Kind::Left;
174 else if (kind_str == "inner")
175 kind = ASTTableJoin::Kind::Inner;
176 else if (kind_str == "right")
177 kind = ASTTableJoin::Kind::Right;
178 else if (kind_str == "full")
179 {
180 if (strictness == ASTTableJoin::Strictness::Any)
181 strictness = ASTTableJoin::Strictness::RightAny;
182 kind = ASTTableJoin::Kind::Full;
183 }
184 }
185
186 if (kind == ASTTableJoin::Kind::Comma)
187 throw Exception("Second parameter of storage Join must be LEFT or INNER or RIGHT or FULL (without quotes).",
188 ErrorCodes::BAD_ARGUMENTS);
189
190 Names key_names;
191 key_names.reserve(engine_args.size() - 2);
192 for (size_t i = 2, size = engine_args.size(); i < size; ++i)
193 {
194 auto opt_key = tryGetIdentifierName(engine_args[i]);
195 if (!opt_key)
196 throw Exception("Parameter â„–" + toString(i + 1) + " of storage Join don't look like column name.", ErrorCodes::BAD_ARGUMENTS);
197
198 key_names.push_back(*opt_key);
199 }
200
201 return StorageJoin::create(
202 args.relative_data_path,
203 args.database_name,
204 args.table_name,
205 key_names,
206 join_use_nulls,
207 SizeLimits{max_rows_in_join, max_bytes_in_join, join_overflow_mode},
208 kind,
209 strictness,
210 args.columns,
211 args.constraints,
212 join_any_take_last_row,
213 args.context);
214 });
215}
216
217template <typename T>
218static const char * rawData(T & t)
219{
220 return reinterpret_cast<const char *>(&t);
221}
222template <typename T>
223static size_t rawSize(T &)
224{
225 return sizeof(T);
226}
227template <>
228const char * rawData(const StringRef & t)
229{
230 return t.data;
231}
232template <>
233size_t rawSize(const StringRef & t)
234{
235 return t.size;
236}
237
/// Input stream that reads the contents of a StorageJoin back out of its Join object.
///
/// It walks the hash map selected by the parent's kind/strictness (via joinDispatch) and key layout
/// (via Join::Type), producing blocks shaped like `sample_block`. Each block has about `max_block_size` rows;
/// the limit is checked after each key, so a key with many rows can overshoot it.
/// A std::shared_lock on the parent's data rwlock is held for the whole lifetime of the stream.
class JoinBlockInputStream : public IBlockInputStream
{
public:
    /// @param parent_        Join whose data is read; only a reference is kept, so it must outlive the stream.
    /// @param max_block_size_ soft limit on rows per returned block.
    /// @param sample_block_   header of the produced blocks (the requested storage columns).
    JoinBlockInputStream(const Join & parent_, UInt64 max_block_size_, Block && sample_block_)
        : parent(parent_), lock(parent.data->rwlock), max_block_size(max_block_size_), sample_block(std::move(sample_block_))
    {
        columns.resize(sample_block.columns());
        column_indices.resize(sample_block.columns());
        column_with_null.resize(sample_block.columns());
        for (size_t i = 0; i < sample_block.columns(); ++i)
        {
            auto & [_, type, name] = sample_block.getByPosition(i);
            if (parent.right_table_keys.has(name))
            {
                /// Key column: its values come from the hash-map keys.
                /// NOTE(review): only one key position is tracked; if several key columns are requested, the last one wins.
                key_pos = i;
                column_with_null[i] = parent.right_table_keys.getByName(name).type->isNullable();
            }
            else
            {
                /// Non-key column: remember where it lives in the stored blocks. A type mismatch between stored data and
                /// result is treated as a nullability difference by createBlock() (presumably due to join_use_nulls).
                auto pos = parent.sample_block_with_columns_to_add.getPositionByName(name);
                column_indices[i] = pos;
                column_with_null[i] = !parent.sample_block_with_columns_to_add.getByPosition(pos).type->equals(*type);
            }
        }
    }

    String getName() const override { return "Join"; }

    Block getHeader() const override { return sample_block; }


protected:
    /// Returns the next portion of rows, or an empty Block once the data is empty or exhausted.
    Block readImpl() override
    {
        if (parent.data->blocks.empty())
            return Block();

        Block block;
        /// joinDispatch picks the map matching the parent's kind/strictness and instantiates createBlock for it.
        if (!joinDispatch(parent.kind, parent.strictness, parent.data->maps,
            [&](auto kind, auto strictness, auto & map) { block = createBlock<kind, strictness>(map); }))
            throw Exception("Logical error: unknown JOIN strictness", ErrorCodes::LOGICAL_ERROR);
        return block;
    }

private:
    const Join & parent;
    std::shared_lock<std::shared_mutex> lock;   /// shared lock on parent.data->rwlock, released when the stream is destroyed
    UInt64 max_block_size;
    Block sample_block;

    ColumnNumbers column_indices;               /// for non-key result columns: position inside the stored blocks
    std::vector<bool> column_with_null;         /// result column needs Nullable wrapping/unwrapping (see createBlock)
    std::optional<size_t> key_pos;              /// position of the key column in sample_block, if it was requested
    MutableColumns columns;                     /// output columns being filled for the current block

    /// type erasure: heap-allocated `Map::const_iterator` into the hash map. The concrete map type is only known
    /// inside the dispatched templates; storing the iterator here lets reading resume across readImpl() calls.
    std::unique_ptr<void, std::function<void(void *)>> position; /// type erasure


    /// Builds the next output block from `maps`, choosing the map variant by the parent's key type.
    /// Returns an empty Block when no rows were added (the map is exhausted).
    template <ASTTableJoin::Kind KIND, ASTTableJoin::Strictness STRICTNESS, typename Maps>
    Block createBlock(const Maps & maps)
    {
        /// Create fresh output columns. Where column_with_null is set, the column is built in the representation
        /// the data is inserted in, and converted back to the result type after filling (see the end of this function).
        for (size_t i = 0; i < sample_block.columns(); ++i)
        {
            const auto & src_col = sample_block.safeGetByPosition(i);
            columns[i] = src_col.type->createColumn();
            if (column_with_null[i])
            {
                if (key_pos == i)
                {
                    // unwrap null key column: key bytes are inserted into the nested column; the Nullable wrapper is re-added at the end
                    ColumnNullable & nullable_col = assert_cast<ColumnNullable &>(*columns[i]);
                    columns[i] = nullable_col.getNestedColumnPtr()->assumeMutable();
                }
                else
                    // wrap non key column with null so insertFrom() from the (presumably Nullable) stored column matches; unwrapped at the end
                    columns[i] = makeNullable(std::move(columns[i]))->assumeMutable();
            }
        }

        size_t rows_added = 0;

        /// Only the key layouts listed in APPLY_FOR_JOIN_VARIANTS_LIMITED can be read back.
        switch (parent.data->type)
        {
#define M(TYPE) \
            case Join::Type::TYPE: \
                rows_added = fillColumns<KIND, STRICTNESS>(*maps.TYPE); \
                break;
            APPLY_FOR_JOIN_VARIANTS_LIMITED(M)
#undef M

            default:
                throw Exception("Unsupported JOIN keys in StorageJoin. Type: " + toString(static_cast<UInt32>(parent.data->type)),
                                ErrorCodes::UNSUPPORTED_JOIN_KEYS);
        }

        if (!rows_added)
            return {};

        /// Undo the representation changes made above so every column matches sample_block.
        Block res = sample_block.cloneEmpty();
        for (size_t i = 0; i < columns.size(); ++i)
            if (column_with_null[i])
            {
                if (key_pos == i)
                    res.getByPosition(i).column = makeNullable(std::move(columns[i]));
                else
                {
                    const ColumnNullable & nullable_col = assert_cast<const ColumnNullable &>(*columns[i]);
                    res.getByPosition(i).column = nullable_col.getNestedColumnPtr();
                }
            }
            else
                res.getByPosition(i).column = std::move(columns[i]);

        return res;
    }

    /// Appends rows from `map` to `columns`, resuming from the iterator saved in `position`.
    /// Stops right after the key at which rows_added reaches max_block_size. Returns the number of rows appended.
    template <ASTTableJoin::Kind KIND, ASTTableJoin::Strictness STRICTNESS, typename Map>
    size_t fillColumns(const Map & map)
    {
        size_t rows_added = 0;

        /// First call: start at the beginning of the map; the deleter knows the concrete iterator type.
        if (!position)
            position = decltype(position)(
                static_cast<void *>(new typename Map::const_iterator(map.begin())),
                [](void * ptr) { delete reinterpret_cast<typename Map::const_iterator *>(ptr); });

        auto & it = *reinterpret_cast<typename Map::const_iterator *>(position.get());
        auto end = map.end();

        for (; it != end; ++it)
        {
            /// fillOne emits one row per key, fillAll every row stored for the key.
            /// Kind/strictness combinations with no branch below add no rows.
            if constexpr (STRICTNESS == ASTTableJoin::Strictness::RightAny)
            {
                fillOne<Map>(columns, column_indices, it, key_pos, rows_added);
            }
            else if constexpr (STRICTNESS == ASTTableJoin::Strictness::All)
            {
                fillAll<Map>(columns, column_indices, it, key_pos, rows_added);
            }
            else if constexpr (STRICTNESS == ASTTableJoin::Strictness::Any)
            {
                if constexpr (KIND == ASTTableJoin::Kind::Left || KIND == ASTTableJoin::Kind::Inner)
                    fillOne<Map>(columns, column_indices, it, key_pos, rows_added);
                else if constexpr (KIND == ASTTableJoin::Kind::Right)
                    fillAll<Map>(columns, column_indices, it, key_pos, rows_added);
            }
            else if constexpr (STRICTNESS == ASTTableJoin::Strictness::Semi)
            {
                if constexpr (KIND == ASTTableJoin::Kind::Left)
                    fillOne<Map>(columns, column_indices, it, key_pos, rows_added);
                else if constexpr (KIND == ASTTableJoin::Kind::Right)
                    fillAll<Map>(columns, column_indices, it, key_pos, rows_added);
            }
            else if constexpr (STRICTNESS == ASTTableJoin::Strictness::Anti)
            {
                if constexpr (KIND == ASTTableJoin::Kind::Left)
                    fillOne<Map>(columns, column_indices, it, key_pos, rows_added);
                else if constexpr (KIND == ASTTableJoin::Kind::Right)
                    fillAll<Map>(columns, column_indices, it, key_pos, rows_added);
            }
            else
                throw Exception("This JOIN is not implemented yet", ErrorCodes::NOT_IMPLEMENTED);

            /// Advance manually before breaking: `break` skips the loop's ++it, and the next call must resume at the following key.
            if (rows_added >= max_block_size)
            {
                ++it;
                break;
            }
        }

        return rows_added;
    }

    /// Appends a single row for the key at `it`. The key column receives the raw key bytes; other
    /// columns copy the one row referenced by the mapped value (its block and row_num).
    template <typename Map>
    static void fillOne(MutableColumns & columns, const ColumnNumbers & column_indices, typename Map::const_iterator & it,
                        const std::optional<size_t> & key_pos, size_t & rows_added)
    {
        for (size_t j = 0; j < columns.size(); ++j)
            if (j == key_pos)
                columns[j]->insertData(rawData(it->getKey()), rawSize(it->getKey()));
            else
                columns[j]->insertFrom(*it->getMapped().block->getByPosition(column_indices[j]).column.get(), it->getMapped().row_num);
        ++rows_added;
    }

    /// Appends one row per entry of the row list referenced by the mapped value of `it`,
    /// repeating the key bytes in the key column for each of them.
    template <typename Map>
    static void fillAll(MutableColumns & columns, const ColumnNumbers & column_indices, typename Map::const_iterator & it,
                        const std::optional<size_t> & key_pos, size_t & rows_added)
    {
        for (auto ref_it = it->getMapped().begin(); ref_it.ok(); ++ref_it)
        {
            for (size_t j = 0; j < columns.size(); ++j)
                if (j == key_pos)
                    columns[j]->insertData(rawData(it->getKey()), rawSize(it->getKey()));
                else
                    columns[j]->insertFrom(*ref_it->block->getByPosition(column_indices[j]).column.get(), ref_it->row_num);
            ++rows_added;
        }
    }
};
438
439
// TODO: multiple stream read and index read
/// Reads the table contents back from the Join object through a single JoinBlockInputStream.
/// num_streams and the query info are ignored.
BlockInputStreams StorageJoin::read(
    const Names & column_names,
    const SelectQueryInfo & /*query_info*/,
    const Context & /*context*/,
    QueryProcessingStage::Enum /*processed_stage*/,
    size_t max_block_size,
    unsigned /*num_streams*/)
{
    check(column_names);

    auto header = getSampleBlockForColumns(column_names);
    BlockInputStreams streams;
    streams.emplace_back(std::make_shared<JoinBlockInputStream>(*join, max_block_size, std::move(header)));
    return streams;
}
452
453}
454