1#pragma once
2
3#include <variant>
4#include <optional>
5#include <shared_mutex>
6#include <deque>
7
8#include <Parsers/ASTTablesInSelectQuery.h>
9
10#include <Interpreters/IJoin.h>
11#include <Interpreters/AggregationCommon.h>
12#include <Interpreters/RowRefs.h>
13
14#include <Common/Arena.h>
15#include <Common/ColumnsHashing.h>
16#include <Common/HashTable/HashMap.h>
17#include <Common/HashTable/FixedHashMap.h>
18
19#include <Columns/ColumnString.h>
20#include <Columns/ColumnFixedString.h>
21
22#include <DataStreams/SizeLimits.h>
23#include <DataStreams/IBlockStream_fwd.h>
24
25
26namespace DB
27{
28
29class AnalyzedJoin;
30
31namespace JoinStuff
32{
33
34/// Base class with optional flag attached that's needed to implement RIGHT and FULL JOINs.
35template <typename T, bool with_used>
36struct WithFlags;
37
38template <typename T>
39struct WithFlags<T, true> : T
40{
41 using Base = T;
42 using T::T;
43
44 mutable std::atomic<bool> used {};
45 void setUsed() const { used.store(true, std::memory_order_relaxed); } /// Could be set simultaneously from different threads.
46 bool getUsed() const { return used; }
47
48 bool setUsedOnce() const
49 {
50 /// fast check to prevent heavy CAS with seq_cst order
51 if (used.load(std::memory_order_relaxed))
52 return false;
53
54 bool expected = false;
55 return used.compare_exchange_strong(expected, true);
56 }
57};
58
59template <typename T>
60struct WithFlags<T, false> : T
61{
62 using Base = T;
63 using T::T;
64
65 void setUsed() const {}
66 bool getUsed() const { return true; }
67 bool setUsedOnce() const { return true; }
68};
69
70using MappedOne = WithFlags<RowRef, false>;
71using MappedAll = WithFlags<RowRefList, false>;
72using MappedOneFlagged = WithFlags<RowRef, true>;
73using MappedAllFlagged = WithFlags<RowRefList, true>;
74using MappedAsof = WithFlags<AsofRowRefs, false>;
75
76}
77
78/** Data structure for implementation of JOIN.
79 * It is just a hash table: keys -> rows of joined ("right") table.
80 * Additionally, CROSS JOIN is supported: instead of hash table, it use just set of blocks without keys.
81 *
82 * JOIN-s could be of these types:
83 * - ALL × LEFT/INNER/RIGHT/FULL
84 * - ANY × LEFT/INNER/RIGHT
85 * - SEMI/ANTI x LEFT/RIGHT
86 * - ASOF x LEFT/INNER
87 * - CROSS
88 *
89 * ALL means usual JOIN, when rows are multiplied by number of matching rows from the "right" table.
90 * ANY uses one line per unique key from right talbe. For LEFT JOIN it would be any row (with needed joined key) from the right table,
91 * for RIGHT JOIN it would be any row from the left table and for INNER one it would be any row from right and any row from left.
92 * SEMI JOIN filter left table by keys that are present in right table for LEFT JOIN, and filter right table by keys from left table
93 * for RIGHT JOIN. In other words SEMI JOIN returns only rows which joining keys present in another table.
94 * ANTI JOIN is the same as SEMI JOIN but returns rows with joining keys that are NOT present in another table.
95 * SEMI/ANTI JOINs allow to get values from both tables. For filter table it gets any row with joining same key. For ANTI JOIN it returns
96 * defaults other table columns.
97 * ASOF JOIN is not-equi join. For one key column it finds nearest value to join according to join inequality.
98 * It's expected that ANY|SEMI LEFT JOIN is more efficient that ALL one.
99 *
100 * If INNER is specified - leave only rows that have matching rows from "right" table.
101 * If LEFT is specified - in case when there is no matching row in "right" table, fill it with default values instead.
102 * If RIGHT is specified - first process as INNER, but track what rows from the right table was joined,
103 * and at the end, add rows from right table that was not joined and substitute default values for columns of left table.
104 * If FULL is specified - first process as LEFT, but track what rows from the right table was joined,
105 * and at the end, add rows from right table that was not joined and substitute default values for columns of left table.
106 *
107 * Thus, LEFT and RIGHT JOINs are not symmetric in terms of implementation.
108 *
109 * All JOINs (except CROSS) are done by equality condition on keys (equijoin).
110 * Non-equality and other conditions are not supported.
111 *
112 * Implementation:
113 *
114 * 1. Build hash table in memory from "right" table.
115 * This hash table is in form of keys -> row in case of ANY or keys -> [rows...] in case of ALL.
116 * This is done in insertFromBlock method.
117 *
118 * 2. Process "left" table and join corresponding rows from "right" table by lookups in the map.
119 * This is done in joinBlock methods.
120 *
121 * In case of ANY LEFT JOIN - form new columns with found values or default values.
122 * This is the most simple. Number of rows in left table does not change.
123 *
124 * In case of ANY INNER JOIN - form new columns with found values,
125 * and also build a filter - in what rows nothing was found.
126 * Then filter columns of "left" table.
127 *
128 * In case of ALL ... JOIN - form new columns with all found rows,
129 * and also fill 'offsets' array, describing how many times we need to replicate values of "left" table.
130 * Then replicate columns of "left" table.
131 *
132 * How Nullable keys are processed:
133 *
134 * NULLs never join to anything, even to each other.
135 * During building of map, we just skip keys with NULL value of any component.
136 * During joining, we simply treat rows with any NULLs in key as non joined.
137 *
138 * Default values for outer joins (LEFT, RIGHT, FULL):
139 *
140 * Behaviour is controlled by 'join_use_nulls' settings.
141 * If it is false, we substitute (global) default value for the data type, for non-joined rows
142 * (zero, empty string, etc. and NULL for Nullable data types).
143 * If it is true, we always generate Nullable column and substitute NULLs for non-joined rows,
144 * as in standard SQL.
145 */
146class Join : public IJoin
147{
148public:
149 Join(std::shared_ptr<AnalyzedJoin> table_join_, const Block & right_sample_block, bool any_take_last_row_ = false);
150
151 bool empty() { return data->type == Type::EMPTY; }
152
153 /** Add block of data from right hand of JOIN to the map.
154 * Returns false, if some limit was exceeded and you should not insert more data.
155 */
156 bool addJoinedBlock(const Block & block) override;
157
158 /** Join data from the map (that was previously built by calls to addJoinedBlock) to the block with data from "left" table.
159 * Could be called from different threads in parallel.
160 */
161 void joinBlock(Block & block) override;
162
163 /// Infer the return type for joinGet function
164 DataTypePtr joinGetReturnType(const String & column_name) const;
165
166 /// Used by joinGet function that turns StorageJoin into a dictionary
167 void joinGet(Block & block, const String & column_name) const;
168
169 /** Keep "totals" (separate part of dataset, see WITH TOTALS) to use later.
170 */
171 void setTotals(const Block & block) override { totals = block; }
172 bool hasTotals() const override { return totals; }
173
174 void joinTotals(Block & block) const override;
175
176 /** For RIGHT and FULL JOINs.
177 * A stream that will contain default values from left table, joined with rows from right table, that was not joined before.
178 * Use only after all calls to joinBlock was done.
179 * left_sample_block is passed without account of 'use_nulls' setting (columns will be converted to Nullable inside).
180 */
181 BlockInputStreamPtr createStreamWithNonJoinedRows(const Block & result_sample_block, UInt64 max_block_size) const override;
182 bool hasStreamWithNonJoinedRows() const override;
183
184 /// Number of keys in all built JOIN maps.
185 size_t getTotalRowCount() const final;
186 /// Sum size in bytes of all buffers, used for JOIN maps and for all memory pools.
187 size_t getTotalByteCount() const;
188
189 bool alwaysReturnsEmptySet() const final { return isInnerOrRight(getKind()) && data->empty; }
190
191 ASTTableJoin::Kind getKind() const { return kind; }
192 ASTTableJoin::Strictness getStrictness() const { return strictness; }
193 AsofRowRefs::Type getAsofType() const { return *asof_type; }
194 ASOF::Inequality getAsofInequality() const { return asof_inequality; }
195 bool anyTakeLastRow() const { return any_take_last_row; }
196
197 /// Different types of keys for maps.
198 #define APPLY_FOR_JOIN_VARIANTS(M) \
199 M(key8) \
200 M(key16) \
201 M(key32) \
202 M(key64) \
203 M(key_string) \
204 M(key_fixed_string) \
205 M(keys128) \
206 M(keys256) \
207 M(hashed)
208
209
210 /// Used for reading from StorageJoin and applying joinGet function
211 #define APPLY_FOR_JOIN_VARIANTS_LIMITED(M) \
212 M(key8) \
213 M(key16) \
214 M(key32) \
215 M(key64) \
216 M(key_string) \
217 M(key_fixed_string)
218
219 enum class Type
220 {
221 EMPTY,
222 CROSS,
223 #define M(NAME) NAME,
224 APPLY_FOR_JOIN_VARIANTS(M)
225 #undef M
226 };
227
228
229 /** Different data structures, that are used to perform JOIN.
230 */
231 template <typename Mapped>
232 struct MapsTemplate
233 {
234 std::unique_ptr<FixedHashMap<UInt8, Mapped>> key8;
235 std::unique_ptr<FixedHashMap<UInt16, Mapped>> key16;
236 std::unique_ptr<HashMap<UInt32, Mapped, HashCRC32<UInt32>>> key32;
237 std::unique_ptr<HashMap<UInt64, Mapped, HashCRC32<UInt64>>> key64;
238 std::unique_ptr<HashMapWithSavedHash<StringRef, Mapped>> key_string;
239 std::unique_ptr<HashMapWithSavedHash<StringRef, Mapped>> key_fixed_string;
240 std::unique_ptr<HashMap<UInt128, Mapped, UInt128HashCRC32>> keys128;
241 std::unique_ptr<HashMap<UInt256, Mapped, UInt256HashCRC32>> keys256;
242 std::unique_ptr<HashMap<UInt128, Mapped, UInt128TrivialHash>> hashed;
243
244 void create(Type which)
245 {
246 switch (which)
247 {
248 case Type::EMPTY: break;
249 case Type::CROSS: break;
250
251 #define M(NAME) \
252 case Type::NAME: NAME = std::make_unique<typename decltype(NAME)::element_type>(); break;
253 APPLY_FOR_JOIN_VARIANTS(M)
254 #undef M
255 }
256 }
257
258 size_t getTotalRowCount(Type which) const
259 {
260 switch (which)
261 {
262 case Type::EMPTY: return 0;
263 case Type::CROSS: return 0;
264
265 #define M(NAME) \
266 case Type::NAME: return NAME ? NAME->size() : 0;
267 APPLY_FOR_JOIN_VARIANTS(M)
268 #undef M
269 }
270
271 __builtin_unreachable();
272 }
273
274 size_t getTotalByteCountImpl(Type which) const
275 {
276 switch (which)
277 {
278 case Type::EMPTY: return 0;
279 case Type::CROSS: return 0;
280
281 #define M(NAME) \
282 case Type::NAME: return NAME ? NAME->getBufferSizeInBytes() : 0;
283 APPLY_FOR_JOIN_VARIANTS(M)
284 #undef M
285 }
286
287 __builtin_unreachable();
288 }
289 };
290
291 using MapsOne = MapsTemplate<JoinStuff::MappedOne>;
292 using MapsAll = MapsTemplate<JoinStuff::MappedAll>;
293 using MapsOneFlagged = MapsTemplate<JoinStuff::MappedOneFlagged>;
294 using MapsAllFlagged = MapsTemplate<JoinStuff::MappedAllFlagged>;
295 using MapsAsof = MapsTemplate<JoinStuff::MappedAsof>;
296
297 using MapsVariant = std::variant<MapsOne, MapsAll, MapsOneFlagged, MapsAllFlagged, MapsAsof>;
298 using BlockNullmapList = std::deque<std::pair<const Block *, ColumnPtr>>;
299
300 struct RightTableData
301 {
302 /// Protect state for concurrent use in insertFromBlock and joinBlock.
303 /// @note that these methods could be called simultaneously only while use of StorageJoin.
304 mutable std::shared_mutex rwlock;
305
306 Type type = Type::EMPTY;
307 bool empty = true;
308
309 MapsVariant maps;
310 Block sample_block; /// Block as it would appear in the BlockList
311 BlocksList blocks; /// Blocks of "right" table.
312 BlockNullmapList blocks_nullmaps; /// Nullmaps for blocks of "right" table (if needed)
313
314 /// Additional data - strings for string keys and continuation elements of single-linked lists of references to rows.
315 Arena pool;
316 };
317
318 void reuseJoinedData(const Join & join)
319 {
320 data = join.data;
321 }
322
323private:
324 friend class NonJoinedBlockInputStream;
325 friend class JoinBlockInputStream;
326
327 std::shared_ptr<AnalyzedJoin> table_join;
328 ASTTableJoin::Kind kind;
329 ASTTableJoin::Strictness strictness;
330
331 /// Names of key columns in right-side table (in the order they appear in ON/USING clause). @note It could contain duplicates.
332 const Names & key_names_right;
333
334 bool nullable_right_side; /// In case of LEFT and FULL joins, if use_nulls, convert right-side columns to Nullable.
335 bool nullable_left_side; /// In case of RIGHT and FULL joins, if use_nulls, convert left-side columns to Nullable.
336 bool any_take_last_row; /// Overwrite existing values when encountering the same key again
337 std::optional<AsofRowRefs::Type> asof_type;
338 ASOF::Inequality asof_inequality;
339
340 /// Right table data. StorageJoin shares it between many Join objects.
341 std::shared_ptr<RightTableData> data;
342 Sizes key_sizes;
343
344 /// Block with columns from the right-side table except key columns.
345 Block sample_block_with_columns_to_add;
346 /// Block with key columns in the same order they appear in the right-side table (duplicates appear once).
347 Block right_table_keys;
348 /// Block with key columns right-side table keys that are needed in result (would be attached after joined columns).
349 Block required_right_keys;
350 /// Left table column names that are sources for required_right_keys columns
351 std::vector<String> required_right_keys_sources;
352
353 Poco::Logger * log;
354
355 Block totals;
356
357 void init(Type type_);
358
359 /** Set information about structure of right hand of JOIN (joined data).
360 */
361 void setSampleBlock(const Block & block);
362
363 const Block & savedBlockSample() const { return data->sample_block; }
364
365 /// Modify (structure) right block to save it in block list
366 Block structureRightBlock(const Block & stored_block) const;
367 void initRightBlockStructure();
368 void initRequiredRightKeys();
369
370 template <ASTTableJoin::Kind KIND, ASTTableJoin::Strictness STRICTNESS, typename Maps>
371 void joinBlockImpl(
372 Block & block,
373 const Names & key_names_left,
374 const Block & block_with_columns_to_add,
375 const Maps & maps) const;
376
377 void joinBlockImplCross(Block & block) const;
378
379 template <typename Maps>
380 void joinGetImpl(Block & block, const String & column_name, const Maps & maps) const;
381
382 static Type chooseMethod(const ColumnRawPtrs & key_columns, Sizes & key_sizes);
383};
384
385}
386