1#include "config_core.h"
2#include <Interpreters/Set.h>
3#include <Common/ProfileEvents.h>
4#include <Common/SipHash.h>
5#include <Interpreters/ExpressionActions.h>
6#include <Interpreters/ExpressionJIT.h>
7#include <Interpreters/AnalyzedJoin.h>
8#include <Columns/ColumnsNumber.h>
9#include <Columns/ColumnArray.h>
10#include <Common/typeid_cast.h>
11#include <DataTypes/DataTypeArray.h>
12#include <DataTypes/DataTypesNumber.h>
13#include <Functions/FunctionFactory.h>
14#include <Functions/IFunction.h>
15#include <set>
16#include <optional>
17#include <Columns/ColumnSet.h>
18#include <Functions/FunctionHelpers.h>
19
20
/// Profile event counters defined elsewhere; this file increments them from ExpressionAction::execute.
namespace ProfileEvents
{
    /// Incremented once for every APPLY_FUNCTION action that is executed.
    extern const Event FunctionExecute;
    /// Additionally incremented when the executed action has is_function_compiled set.
    extern const Event CompiledFunctionExecute;
}
26
27namespace DB
28{
29
/// Error codes thrown from this file. The values are defined elsewhere; only declarations live here.
namespace ErrorCodes
{
    /// Used for internal invariant violations ("No arrays to join", "ARRAY_JOIN action expected", ...).
    /// Declared explicitly so this file does not depend on another header happening to declare it.
    extern const int LOGICAL_ERROR;
    extern const int DUPLICATE_COLUMN;
    extern const int UNKNOWN_IDENTIFIER;
    extern const int UNKNOWN_ACTION;
    extern const int NOT_FOUND_COLUMN_IN_BLOCK;
    extern const int SIZES_OF_ARRAYS_DOESNT_MATCH;
    extern const int TOO_MANY_TEMPORARY_COLUMNS;
    extern const int TOO_MANY_TEMPORARY_NON_CONST_COLUMNS;
    extern const int TYPE_MISMATCH;
}
41
/// Name given to the placeholder column that ExpressionActions::finalize copies into the block
/// so that the number of rows is not lost. See the detailed comment near its usage.
static constexpr const char * DUMMY_COLUMN_NAME = "_dummy";
44
45Names ExpressionAction::getNeededColumns() const
46{
47 Names res = argument_names;
48
49 res.insert(res.end(), array_joined_columns.begin(), array_joined_columns.end());
50
51 if (table_join)
52 res.insert(res.end(), table_join->keyNamesLeft().begin(), table_join->keyNamesLeft().end());
53
54 for (const auto & column : projection)
55 res.push_back(column.first);
56
57 if (!source_name.empty())
58 res.push_back(source_name);
59
60 return res;
61}
62
63
64ExpressionAction ExpressionAction::applyFunction(
65 const FunctionOverloadResolverPtr & function_,
66 const std::vector<std::string> & argument_names_,
67 std::string result_name_)
68{
69 if (result_name_ == "")
70 {
71 result_name_ = function_->getName() + "(";
72 for (size_t i = 0 ; i < argument_names_.size(); ++i)
73 {
74 if (i)
75 result_name_ += ", ";
76 result_name_ += argument_names_[i];
77 }
78 result_name_ += ")";
79 }
80
81 ExpressionAction a;
82 a.type = APPLY_FUNCTION;
83 a.result_name = result_name_;
84 a.function_builder = function_;
85 a.argument_names = argument_names_;
86 return a;
87}
88
89ExpressionAction ExpressionAction::addColumn(
90 const ColumnWithTypeAndName & added_column_)
91{
92 ExpressionAction a;
93 a.type = ADD_COLUMN;
94 a.result_name = added_column_.name;
95 a.result_type = added_column_.type;
96 a.added_column = added_column_.column;
97 return a;
98}
99
100ExpressionAction ExpressionAction::removeColumn(const std::string & removed_name)
101{
102 ExpressionAction a;
103 a.type = REMOVE_COLUMN;
104 a.source_name = removed_name;
105 return a;
106}
107
108ExpressionAction ExpressionAction::copyColumn(const std::string & from_name, const std::string & to_name, bool can_replace)
109{
110 ExpressionAction a;
111 a.type = COPY_COLUMN;
112 a.source_name = from_name;
113 a.result_name = to_name;
114 a.can_replace = can_replace;
115 return a;
116}
117
118ExpressionAction ExpressionAction::project(const NamesWithAliases & projected_columns_)
119{
120 ExpressionAction a;
121 a.type = PROJECT;
122 a.projection = projected_columns_;
123 return a;
124}
125
126ExpressionAction ExpressionAction::project(const Names & projected_columns_)
127{
128 ExpressionAction a;
129 a.type = PROJECT;
130 a.projection.resize(projected_columns_.size());
131 for (size_t i = 0; i < projected_columns_.size(); ++i)
132 a.projection[i] = NameWithAlias(projected_columns_[i], "");
133 return a;
134}
135
136ExpressionAction ExpressionAction::addAliases(const NamesWithAliases & aliased_columns_)
137{
138 ExpressionAction a;
139 a.type = ADD_ALIASES;
140 a.projection = aliased_columns_;
141 return a;
142}
143
144ExpressionAction ExpressionAction::arrayJoin(const NameSet & array_joined_columns, bool array_join_is_left, const Context & context)
145{
146 if (array_joined_columns.empty())
147 throw Exception("No arrays to join", ErrorCodes::LOGICAL_ERROR);
148 ExpressionAction a;
149 a.type = ARRAY_JOIN;
150 a.array_joined_columns = array_joined_columns;
151 a.array_join_is_left = array_join_is_left;
152 a.unaligned_array_join = context.getSettingsRef().enable_unaligned_array_join;
153
154 if (a.unaligned_array_join)
155 {
156 a.function_length = FunctionFactory::instance().get("length", context);
157 a.function_greatest = FunctionFactory::instance().get("greatest", context);
158 a.function_arrayResize = FunctionFactory::instance().get("arrayResize", context);
159 }
160 else if (array_join_is_left)
161 a.function_builder = FunctionFactory::instance().get("emptyArrayToSingle", context);
162
163 return a;
164}
165
166ExpressionAction ExpressionAction::ordinaryJoin(std::shared_ptr<AnalyzedJoin> table_join, JoinPtr join)
167{
168 ExpressionAction a;
169 a.type = JOIN;
170 a.table_join = table_join;
171 a.join = join;
172 return a;
173}
174
175
176void ExpressionAction::prepare(Block & sample_block, const Settings & settings, NameSet & names_not_for_constant_folding)
177{
178 // std::cerr << "preparing: " << toString() << std::endl;
179
180 /** Constant expressions should be evaluated, and put the result in sample_block.
181 */
182
183 switch (type)
184 {
185 case APPLY_FUNCTION:
186 {
187 if (sample_block.has(result_name))
188 throw Exception("Column '" + result_name + "' already exists", ErrorCodes::DUPLICATE_COLUMN);
189
190 bool all_const = true;
191 bool all_suitable_for_constant_folding = true;
192
193 ColumnNumbers arguments(argument_names.size());
194 for (size_t i = 0; i < argument_names.size(); ++i)
195 {
196 arguments[i] = sample_block.getPositionByName(argument_names[i]);
197 ColumnPtr col = sample_block.safeGetByPosition(arguments[i]).column;
198 if (!col || !isColumnConst(*col))
199 all_const = false;
200
201 if (names_not_for_constant_folding.count(argument_names[i]))
202 all_suitable_for_constant_folding = false;
203 }
204
205 size_t result_position = sample_block.columns();
206 sample_block.insert({nullptr, result_type, result_name});
207 function = function_base->prepare(sample_block, arguments, result_position);
208 function->createLowCardinalityResultCache(settings.max_threads);
209
210 bool compile_expressions = false;
211#if USE_EMBEDDED_COMPILER
212 compile_expressions = settings.compile_expressions;
213#endif
214 /// If all arguments are constants, and function is suitable to be executed in 'prepare' stage - execute function.
215 /// But if we compile expressions compiled version of this function maybe placed in cache,
216 /// so we don't want to unfold non deterministic functions
217 if (all_const && function_base->isSuitableForConstantFolding() && (!compile_expressions || function_base->isDeterministic()))
218 {
219 function->execute(sample_block, arguments, result_position, sample_block.rows(), true);
220
221 /// If the result is not a constant, just in case, we will consider the result as unknown.
222 ColumnWithTypeAndName & col = sample_block.safeGetByPosition(result_position);
223 if (!isColumnConst(*col.column))
224 {
225 col.column = nullptr;
226 }
227 else
228 {
229 /// All constant (literal) columns in block are added with size 1.
230 /// But if there was no columns in block before executing a function, the result has size 0.
231 /// Change the size to 1.
232
233 if (col.column->empty())
234 col.column = col.column->cloneResized(1);
235
236 if (!all_suitable_for_constant_folding)
237 names_not_for_constant_folding.insert(result_name);
238 }
239 }
240
241 /// Some functions like ignore() or getTypeName() always return constant result even if arguments are not constant.
242 /// We can't do constant folding, but can specify in sample block that function result is constant to avoid
243 /// unnecessary materialization.
244 auto & res = sample_block.getByPosition(result_position);
245 if (!res.column && function_base->isSuitableForConstantFolding())
246 {
247 if (auto col = function_base->getResultIfAlwaysReturnsConstantAndHasArguments(sample_block, arguments))
248 {
249 res.column = std::move(col);
250 names_not_for_constant_folding.insert(result_name);
251 }
252 }
253
254 break;
255 }
256
257 case ARRAY_JOIN:
258 {
259 for (const auto & name : array_joined_columns)
260 {
261 ColumnWithTypeAndName & current = sample_block.getByName(name);
262 const DataTypeArray * array_type = typeid_cast<const DataTypeArray *>(&*current.type);
263 if (!array_type)
264 throw Exception("ARRAY JOIN requires array argument", ErrorCodes::TYPE_MISMATCH);
265 current.type = array_type->getNestedType();
266 current.column = nullptr;
267 }
268
269 break;
270 }
271
272 case JOIN:
273 {
274 table_join->addJoinedColumnsAndCorrectNullability(sample_block);
275 break;
276 }
277
278 case PROJECT:
279 {
280 Block new_block;
281
282 for (size_t i = 0; i < projection.size(); ++i)
283 {
284 const std::string & name = projection[i].first;
285 const std::string & alias = projection[i].second;
286 ColumnWithTypeAndName column = sample_block.getByName(name);
287 if (alias != "")
288 column.name = alias;
289 new_block.insert(std::move(column));
290 }
291
292 sample_block.swap(new_block);
293 break;
294 }
295
296 case ADD_ALIASES:
297 {
298 for (size_t i = 0; i < projection.size(); ++i)
299 {
300 const std::string & name = projection[i].first;
301 const std::string & alias = projection[i].second;
302 const ColumnWithTypeAndName & column = sample_block.getByName(name);
303 if (alias != "" && !sample_block.has(alias))
304 sample_block.insert({column.column, column.type, alias});
305 }
306 break;
307 }
308
309 case REMOVE_COLUMN:
310 {
311 sample_block.erase(source_name);
312 break;
313 }
314
315 case ADD_COLUMN:
316 {
317 if (sample_block.has(result_name))
318 throw Exception("Column '" + result_name + "' already exists", ErrorCodes::DUPLICATE_COLUMN);
319
320 sample_block.insert(ColumnWithTypeAndName(added_column, result_type, result_name));
321 break;
322 }
323
324 case COPY_COLUMN:
325 {
326 const auto & source = sample_block.getByName(source_name);
327 result_type = source.type;
328
329 if (sample_block.has(result_name))
330 {
331 if (can_replace)
332 {
333 auto & result = sample_block.getByName(result_name);
334 result.type = result_type;
335 result.column = source.column;
336 }
337 else
338 throw Exception("Column '" + result_name + "' already exists", ErrorCodes::DUPLICATE_COLUMN);
339 }
340 else
341 sample_block.insert(ColumnWithTypeAndName(source.column, result_type, result_name));
342
343 break;
344 }
345 }
346}
347
348
349void ExpressionAction::execute(Block & block, bool dry_run) const
350{
351 size_t input_rows_count = block.rows();
352
353 if (type == REMOVE_COLUMN || type == COPY_COLUMN)
354 if (!block.has(source_name))
355 throw Exception("Not found column '" + source_name + "'. There are columns: " + block.dumpNames(), ErrorCodes::NOT_FOUND_COLUMN_IN_BLOCK);
356
357 if (type == ADD_COLUMN || (type == COPY_COLUMN && !can_replace) || type == APPLY_FUNCTION)
358 if (block.has(result_name))
359 throw Exception("Column '" + result_name + "' already exists", ErrorCodes::DUPLICATE_COLUMN);
360
361 switch (type)
362 {
363 case APPLY_FUNCTION:
364 {
365 ColumnNumbers arguments(argument_names.size());
366 for (size_t i = 0; i < argument_names.size(); ++i)
367 {
368 if (!block.has(argument_names[i]))
369 throw Exception("Not found column: '" + argument_names[i] + "'", ErrorCodes::NOT_FOUND_COLUMN_IN_BLOCK);
370 arguments[i] = block.getPositionByName(argument_names[i]);
371 }
372
373 size_t num_columns_without_result = block.columns();
374 block.insert({ nullptr, result_type, result_name});
375
376 ProfileEvents::increment(ProfileEvents::FunctionExecute);
377 if (is_function_compiled)
378 ProfileEvents::increment(ProfileEvents::CompiledFunctionExecute);
379 function->execute(block, arguments, num_columns_without_result, input_rows_count, dry_run);
380
381 break;
382 }
383
384 case ARRAY_JOIN:
385 {
386 if (array_joined_columns.empty())
387 throw Exception("No arrays to join", ErrorCodes::LOGICAL_ERROR);
388
389 ColumnPtr any_array_ptr = block.getByName(*array_joined_columns.begin()).column->convertToFullColumnIfConst();
390 const ColumnArray * any_array = typeid_cast<const ColumnArray *>(&*any_array_ptr);
391 if (!any_array)
392 throw Exception("ARRAY JOIN of not array: " + *array_joined_columns.begin(), ErrorCodes::TYPE_MISMATCH);
393
394 /// If LEFT ARRAY JOIN, then we create columns in which empty arrays are replaced by arrays with one element - the default value.
395 std::map<String, ColumnPtr> non_empty_array_columns;
396
397 if (unaligned_array_join)
398 {
399 /// Resize all array joined columns to the longest one, (at least 1 if LEFT ARRAY JOIN), padded with default values.
400 auto rows = block.rows();
401 auto uint64 = std::make_shared<DataTypeUInt64>();
402 ColumnWithTypeAndName column_of_max_length;
403 if (array_join_is_left)
404 column_of_max_length = ColumnWithTypeAndName(uint64->createColumnConst(rows, 1u), uint64, {});
405 else
406 column_of_max_length = ColumnWithTypeAndName(uint64->createColumnConst(rows, 0u), uint64, {});
407
408 for (const auto & name : array_joined_columns)
409 {
410 auto & src_col = block.getByName(name);
411
412 Block tmp_block{src_col, {{}, uint64, {}}};
413 function_length->build({src_col})->execute(tmp_block, {0}, 1, rows);
414
415 Block tmp_block2{
416 column_of_max_length, tmp_block.safeGetByPosition(1), {{}, uint64, {}}};
417 function_greatest->build({column_of_max_length, tmp_block.safeGetByPosition(1)})->execute(tmp_block2, {0, 1}, 2, rows);
418 column_of_max_length = tmp_block2.safeGetByPosition(2);
419 }
420
421 for (const auto & name : array_joined_columns)
422 {
423 auto & src_col = block.getByName(name);
424
425 Block tmp_block{src_col, column_of_max_length, {{}, src_col.type, {}}};
426 function_arrayResize->build({src_col, column_of_max_length})->execute(tmp_block, {0, 1}, 2, rows);
427 src_col.column = tmp_block.safeGetByPosition(2).column;
428 any_array_ptr = src_col.column->convertToFullColumnIfConst();
429 }
430
431 any_array = typeid_cast<const ColumnArray *>(&*any_array_ptr);
432 }
433 else if (array_join_is_left)
434 {
435 for (const auto & name : array_joined_columns)
436 {
437 auto src_col = block.getByName(name);
438
439 Block tmp_block{src_col, {{}, src_col.type, {}}};
440
441 function_builder->build({src_col})->execute(tmp_block, {0}, 1, src_col.column->size(), dry_run);
442 non_empty_array_columns[name] = tmp_block.safeGetByPosition(1).column;
443 }
444
445 any_array_ptr = non_empty_array_columns.begin()->second->convertToFullColumnIfConst();
446 any_array = &typeid_cast<const ColumnArray &>(*any_array_ptr);
447 }
448
449 size_t columns = block.columns();
450 for (size_t i = 0; i < columns; ++i)
451 {
452 ColumnWithTypeAndName & current = block.safeGetByPosition(i);
453
454 if (array_joined_columns.count(current.name))
455 {
456 if (!typeid_cast<const DataTypeArray *>(&*current.type))
457 throw Exception("ARRAY JOIN of not array: " + current.name, ErrorCodes::TYPE_MISMATCH);
458
459 ColumnPtr array_ptr = (array_join_is_left && !unaligned_array_join) ? non_empty_array_columns[current.name] : current.column;
460 array_ptr = array_ptr->convertToFullColumnIfConst();
461
462 const ColumnArray & array = typeid_cast<const ColumnArray &>(*array_ptr);
463 if (!unaligned_array_join && !array.hasEqualOffsets(typeid_cast<const ColumnArray &>(*any_array_ptr)))
464 throw Exception("Sizes of ARRAY-JOIN-ed arrays do not match", ErrorCodes::SIZES_OF_ARRAYS_DOESNT_MATCH);
465
466 current.column = typeid_cast<const ColumnArray &>(*array_ptr).getDataPtr();
467 current.type = typeid_cast<const DataTypeArray &>(*current.type).getNestedType();
468 }
469 else
470 {
471 current.column = current.column->replicate(any_array->getOffsets());
472 }
473 }
474
475 break;
476 }
477
478 case JOIN:
479 {
480 join->joinBlock(block);
481 break;
482 }
483
484 case PROJECT:
485 {
486 Block new_block;
487
488 for (size_t i = 0; i < projection.size(); ++i)
489 {
490 const std::string & name = projection[i].first;
491 const std::string & alias = projection[i].second;
492 ColumnWithTypeAndName column = block.getByName(name);
493 if (alias != "")
494 column.name = alias;
495 new_block.insert(std::move(column));
496 }
497
498 block.swap(new_block);
499
500 break;
501 }
502
503 case ADD_ALIASES:
504 {
505 for (size_t i = 0; i < projection.size(); ++i)
506 {
507 const std::string & name = projection[i].first;
508 const std::string & alias = projection[i].second;
509 const ColumnWithTypeAndName & column = block.getByName(name);
510 if (alias != "" && !block.has(alias))
511 block.insert({column.column, column.type, alias});
512 }
513 break;
514 }
515
516 case REMOVE_COLUMN:
517 block.erase(source_name);
518 break;
519
520 case ADD_COLUMN:
521 block.insert({ added_column->cloneResized(input_rows_count), result_type, result_name });
522 break;
523
524 case COPY_COLUMN:
525 if (can_replace && block.has(result_name))
526 {
527 auto & result = block.getByName(result_name);
528 const auto & source = block.getByName(source_name);
529 result.type = source.type;
530 result.column = source.column;
531 }
532 else
533 {
534 const auto & source_column = block.getByName(source_name);
535 block.insert({source_column.column, source_column.type, result_name});
536 }
537
538 break;
539 }
540}
541
542
543void ExpressionAction::executeOnTotals(Block & block) const
544{
545 if (type != JOIN)
546 execute(block, false);
547 else
548 join->joinTotals(block);
549}
550
551
552std::string ExpressionAction::toString() const
553{
554 std::stringstream ss;
555 switch (type)
556 {
557 case ADD_COLUMN:
558 ss << "ADD " << result_name << " "
559 << (result_type ? result_type->getName() : "(no type)") << " "
560 << (added_column ? added_column->getName() : "(no column)");
561 break;
562
563 case REMOVE_COLUMN:
564 ss << "REMOVE " << source_name;
565 break;
566
567 case COPY_COLUMN:
568 ss << "COPY " << result_name << " = " << source_name;
569 if (can_replace)
570 ss << " (can replace)";
571 break;
572
573 case APPLY_FUNCTION:
574 ss << "FUNCTION " << result_name << " " << (is_function_compiled ? "[compiled] " : "")
575 << (result_type ? result_type->getName() : "(no type)") << " = "
576 << (function_base ? function_base->getName() : "(no function)") << "(";
577 for (size_t i = 0; i < argument_names.size(); ++i)
578 {
579 if (i)
580 ss << ", ";
581 ss << argument_names[i];
582 }
583 ss << ")";
584 break;
585
586 case ARRAY_JOIN:
587 ss << (array_join_is_left ? "LEFT " : "") << "ARRAY JOIN ";
588 for (NameSet::const_iterator it = array_joined_columns.begin(); it != array_joined_columns.end(); ++it)
589 {
590 if (it != array_joined_columns.begin())
591 ss << ", ";
592 ss << *it;
593 }
594 break;
595
596 case JOIN:
597 ss << "JOIN ";
598 for (NamesAndTypesList::const_iterator it = table_join->columnsAddedByJoin().begin();
599 it != table_join->columnsAddedByJoin().end(); ++it)
600 {
601 if (it != table_join->columnsAddedByJoin().begin())
602 ss << ", ";
603 ss << it->name;
604 }
605 break;
606
607 case PROJECT: [[fallthrough]];
608 case ADD_ALIASES:
609 ss << (type == PROJECT ? "PROJECT " : "ADD_ALIASES ");
610 for (size_t i = 0; i < projection.size(); ++i)
611 {
612 if (i)
613 ss << ", ";
614 ss << projection[i].first;
615 if (projection[i].second != "" && projection[i].second != projection[i].first)
616 ss << " AS " << projection[i].second;
617 }
618 break;
619 }
620
621 return ss.str();
622}
623
624void ExpressionActions::checkLimits(Block & block) const
625{
626 if (settings.max_temporary_columns && block.columns() > settings.max_temporary_columns)
627 throw Exception("Too many temporary columns: " + block.dumpNames()
628 + ". Maximum: " + settings.max_temporary_columns.toString(),
629 ErrorCodes::TOO_MANY_TEMPORARY_COLUMNS);
630
631 if (settings.max_temporary_non_const_columns)
632 {
633 size_t non_const_columns = 0;
634 for (size_t i = 0, size = block.columns(); i < size; ++i)
635 if (block.safeGetByPosition(i).column && !isColumnConst(*block.safeGetByPosition(i).column))
636 ++non_const_columns;
637
638 if (non_const_columns > settings.max_temporary_non_const_columns)
639 {
640 std::stringstream list_of_non_const_columns;
641 for (size_t i = 0, size = block.columns(); i < size; ++i)
642 if (block.safeGetByPosition(i).column && !isColumnConst(*block.safeGetByPosition(i).column))
643 list_of_non_const_columns << "\n" << block.safeGetByPosition(i).name;
644
645 throw Exception("Too many temporary non-const columns:" + list_of_non_const_columns.str()
646 + ". Maximum: " + settings.max_temporary_non_const_columns.toString(),
647 ErrorCodes::TOO_MANY_TEMPORARY_NON_CONST_COLUMNS);
648 }
649 }
650}
651
652void ExpressionActions::addInput(const ColumnWithTypeAndName & column)
653{
654 input_columns.emplace_back(column.name, column.type);
655 sample_block.insert(column);
656}
657
658void ExpressionActions::addInput(const NameAndTypePair & column)
659{
660 addInput(ColumnWithTypeAndName(nullptr, column.type, column.name));
661}
662
663void ExpressionActions::add(const ExpressionAction & action, Names & out_new_columns)
664{
665 addImpl(action, out_new_columns);
666}
667
668void ExpressionActions::add(const ExpressionAction & action)
669{
670 Names new_names;
671 addImpl(action, new_names);
672}
673
674void ExpressionActions::addImpl(ExpressionAction action, Names & new_names)
675{
676 if (action.result_name != "")
677 new_names.push_back(action.result_name);
678 new_names.insert(new_names.end(), action.array_joined_columns.begin(), action.array_joined_columns.end());
679
680 /// Compiled functions are custom functions and they don't need building
681 if (action.type == ExpressionAction::APPLY_FUNCTION && !action.is_function_compiled)
682 {
683 if (sample_block.has(action.result_name))
684 throw Exception("Column '" + action.result_name + "' already exists", ErrorCodes::DUPLICATE_COLUMN);
685
686 ColumnsWithTypeAndName arguments(action.argument_names.size());
687 for (size_t i = 0; i < action.argument_names.size(); ++i)
688 {
689 if (!sample_block.has(action.argument_names[i]))
690 throw Exception("Unknown identifier: '" + action.argument_names[i] + "'", ErrorCodes::UNKNOWN_IDENTIFIER);
691 arguments[i] = sample_block.getByName(action.argument_names[i]);
692 }
693
694 action.function_base = action.function_builder->build(arguments);
695 action.result_type = action.function_base->getReturnType();
696 }
697
698 if (action.type == ExpressionAction::ADD_ALIASES)
699 for (const auto & name_with_alias : action.projection)
700 new_names.emplace_back(name_with_alias.second);
701
702 action.prepare(sample_block, settings, names_not_for_constant_folding);
703 actions.push_back(action);
704}
705
706void ExpressionActions::prependProjectInput()
707{
708 actions.insert(actions.begin(), ExpressionAction::project(getRequiredColumns()));
709}
710
711void ExpressionActions::prependArrayJoin(const ExpressionAction & action, const Block & sample_block_before)
712{
713 if (action.type != ExpressionAction::ARRAY_JOIN)
714 throw Exception("ARRAY_JOIN action expected", ErrorCodes::LOGICAL_ERROR);
715
716 NameSet array_join_set(action.array_joined_columns.begin(), action.array_joined_columns.end());
717 for (auto & it : input_columns)
718 {
719 if (array_join_set.count(it.name))
720 {
721 array_join_set.erase(it.name);
722 it.type = std::make_shared<DataTypeArray>(it.type);
723 }
724 }
725 for (const std::string & name : array_join_set)
726 {
727 input_columns.emplace_back(name, sample_block_before.getByName(name).type);
728 actions.insert(actions.begin(), ExpressionAction::removeColumn(name));
729 }
730
731 actions.insert(actions.begin(), action);
732 optimizeArrayJoin();
733}
734
735
736bool ExpressionActions::popUnusedArrayJoin(const Names & required_columns, ExpressionAction & out_action)
737{
738 if (actions.empty() || actions.back().type != ExpressionAction::ARRAY_JOIN)
739 return false;
740 NameSet required_set(required_columns.begin(), required_columns.end());
741 for (const std::string & name : actions.back().array_joined_columns)
742 {
743 if (required_set.count(name))
744 return false;
745 }
746 for (const std::string & name : actions.back().array_joined_columns)
747 {
748 DataTypePtr & type = sample_block.getByName(name).type;
749 type = std::make_shared<DataTypeArray>(type);
750 }
751 out_action = actions.back();
752 actions.pop_back();
753 return true;
754}
755
756void ExpressionActions::execute(Block & block, bool dry_run) const
757{
758 for (const auto & action : actions)
759 {
760 action.execute(block, dry_run);
761 checkLimits(block);
762 }
763}
764
765bool ExpressionActions::hasTotalsInJoin() const
766{
767 for (const auto & action : actions)
768 if (action.table_join && action.join->hasTotals())
769 return true;
770 return false;
771}
772
773void ExpressionActions::executeOnTotals(Block & block) const
774{
775 /// If there is `totals` in the subquery for JOIN, but we do not have totals, then take the block with the default values instead of `totals`.
776 if (!block)
777 {
778 if (hasTotalsInJoin())
779 {
780 for (const auto & name_and_type : input_columns)
781 {
782 auto column = name_and_type.type->createColumn();
783 column->insertDefault();
784 block.insert(ColumnWithTypeAndName(std::move(column), name_and_type.type, name_and_type.name));
785 }
786 }
787 else
788 return; /// There's nothing to JOIN.
789 }
790
791 for (const auto & action : actions)
792 action.executeOnTotals(block);
793}
794
795std::string ExpressionActions::getSmallestColumn(const NamesAndTypesList & columns)
796{
797 std::optional<size_t> min_size;
798 String res;
799
800 for (const auto & column : columns)
801 {
802 /// @todo resolve evil constant
803 size_t size = column.type->haveMaximumSizeOfValue() ? column.type->getMaximumSizeOfValueInMemory() : 100;
804
805 if (!min_size || size < *min_size)
806 {
807 min_size = size;
808 res = column.name;
809 }
810 }
811
812 if (!min_size)
813 throw Exception("No available columns", ErrorCodes::LOGICAL_ERROR);
814
815 return res;
816}
817
818void ExpressionActions::finalize(const Names & output_columns)
819{
820 NameSet final_columns;
821 for (size_t i = 0; i < output_columns.size(); ++i)
822 {
823 const std::string & name = output_columns[i];
824 if (!sample_block.has(name))
825 throw Exception("Unknown column: " + name + ", there are only columns "
826 + sample_block.dumpNames(), ErrorCodes::UNKNOWN_IDENTIFIER);
827 final_columns.insert(name);
828 }
829
830#if USE_EMBEDDED_COMPILER
831 /// This has to be done before removing redundant actions and inserting REMOVE_COLUMNs
832 /// because inlining may change dependency sets.
833 if (settings.compile_expressions)
834 compileFunctions(actions, output_columns, sample_block, compilation_cache, settings.min_count_to_compile_expression);
835#endif
836
837 /// Which columns are needed to perform actions from the current to the last.
838 NameSet needed_columns = final_columns;
839 /// Which columns nobody will touch from the current action to the last.
840 NameSet unmodified_columns;
841
842 {
843 NamesAndTypesList sample_columns = sample_block.getNamesAndTypesList();
844 for (NamesAndTypesList::iterator it = sample_columns.begin(); it != sample_columns.end(); ++it)
845 unmodified_columns.insert(it->name);
846 }
847
848 /// Let's go from the end and maintain set of required columns at this stage.
849 /// We will throw out unnecessary actions, although usually they are absent by construction.
850 for (int i = static_cast<int>(actions.size()) - 1; i >= 0; --i)
851 {
852 ExpressionAction & action = actions[i];
853 Names in = action.getNeededColumns();
854
855 if (action.type == ExpressionAction::PROJECT)
856 {
857 needed_columns = NameSet(in.begin(), in.end());
858 unmodified_columns.clear();
859 }
860 else if (action.type == ExpressionAction::ADD_ALIASES)
861 {
862 needed_columns.insert(in.begin(), in.end());
863 for (auto & name_wit_alias : action.projection)
864 {
865 auto it = unmodified_columns.find(name_wit_alias.second);
866 if (it != unmodified_columns.end())
867 unmodified_columns.erase(it);
868 }
869 }
870 else if (action.type == ExpressionAction::ARRAY_JOIN)
871 {
872 /// Do not ARRAY JOIN columns that are not used anymore.
873 /// Usually, such columns are not used until ARRAY JOIN, and therefore are ejected further in this function.
874 /// We will not remove all the columns so as not to lose the number of rows.
875 for (auto it = action.array_joined_columns.begin(); it != action.array_joined_columns.end();)
876 {
877 bool need = needed_columns.count(*it);
878 if (!need && action.array_joined_columns.size() > 1)
879 {
880 action.array_joined_columns.erase(it++);
881 }
882 else
883 {
884 needed_columns.insert(*it);
885 unmodified_columns.erase(*it);
886
887 /// If no ARRAY JOIN results are used, forcibly leave an arbitrary column at the output,
888 /// so you do not lose the number of rows.
889 if (!need)
890 final_columns.insert(*it);
891
892 ++it;
893 }
894 }
895 }
896 else
897 {
898 std::string out = action.result_name;
899 if (!out.empty())
900 {
901 /// If the result is not used and there are no side effects, throw out the action.
902 if (!needed_columns.count(out) &&
903 (action.type == ExpressionAction::APPLY_FUNCTION
904 || action.type == ExpressionAction::ADD_COLUMN
905 || action.type == ExpressionAction::COPY_COLUMN))
906 {
907 actions.erase(actions.begin() + i);
908
909 if (unmodified_columns.count(out))
910 {
911 sample_block.erase(out);
912 unmodified_columns.erase(out);
913 }
914
915 continue;
916 }
917
918 unmodified_columns.erase(out);
919 needed_columns.erase(out);
920
921 /** If the function is a constant expression, then replace the action by adding a column-constant - result.
922 * That is, we perform constant folding.
923 */
924 if (action.type == ExpressionAction::APPLY_FUNCTION && sample_block.has(out))
925 {
926 auto & result = sample_block.getByName(out);
927 if (result.column && names_not_for_constant_folding.count(result.name) == 0)
928 {
929 action.type = ExpressionAction::ADD_COLUMN;
930 action.result_type = result.type;
931 action.added_column = result.column;
932 action.function_builder = nullptr;
933 action.function_base = nullptr;
934 action.function = nullptr;
935 action.argument_names.clear();
936 in.clear();
937 }
938 }
939 }
940
941 needed_columns.insert(in.begin(), in.end());
942 }
943 }
944
945
946 /// 1) Sometimes we don't need any columns to perform actions and sometimes actions doesn't produce any columns as result.
947 /// But Block class doesn't store any information about structure itself, it uses information from column.
948 /// If we remove all columns from input or output block we will lose information about amount of rows in it.
949 /// To avoid this situation we always leaving one of the columns in required columns (input)
950 /// and output column. We choose that "redundant" column by size with help of getSmallestColumn.
951 ///
952 /// 2) Sometimes we have to read data from different Storages to execute query.
953 /// For example in 'remote' function which requires to read data from local table (for example MergeTree) and
954 /// remote table (doesn't know anything about it).
955 ///
956 /// If we have combination of two previous cases, our heuristic from (1) can choose absolutely different columns,
957 /// so generated streams with these actions will have different headers. To avoid this we addionaly rename our "redundant" column
958 /// to DUMMY_COLUMN_NAME with help of COPY_COLUMN action and consequent remove of original column.
959 /// It doesn't affect any logic, but all streams will have same "redundant" column in header called "_dummy".
960
961 /// Also, it seems like we will always have same type (UInt8) of "redundant" column, but it's not obvious.
962
963 bool dummy_column_copied = false;
964
965
966 /// We will not throw out all the input columns, so as not to lose the number of rows in the block.
967 if (needed_columns.empty() && !input_columns.empty())
968 {
969 auto colname = getSmallestColumn(input_columns);
970 needed_columns.insert(colname);
971 actions.insert(actions.begin(), ExpressionAction::copyColumn(colname, DUMMY_COLUMN_NAME, true));
972 dummy_column_copied = true;
973 }
974
975 /// We will not leave the block empty so as not to lose the number of rows in it.
976 if (final_columns.empty() && !input_columns.empty())
977 {
978 auto colname = getSmallestColumn(input_columns);
979 final_columns.insert(DUMMY_COLUMN_NAME);
980 if (!dummy_column_copied) /// otherwise we already have this column
981 actions.insert(actions.begin(), ExpressionAction::copyColumn(colname, DUMMY_COLUMN_NAME, true));
982 }
983
984 for (NamesAndTypesList::iterator it = input_columns.begin(); it != input_columns.end();)
985 {
986 NamesAndTypesList::iterator it0 = it;
987 ++it;
988 if (!needed_columns.count(it0->name))
989 {
990 if (unmodified_columns.count(it0->name))
991 sample_block.erase(it0->name);
992 input_columns.erase(it0);
993 }
994 }
995
996/* std::cerr << "\n";
997 for (const auto & action : actions)
998 std::cerr << action.toString() << "\n";
999 std::cerr << "\n";*/
1000
1001 /// Deletes unnecessary temporary columns.
1002
1003 /// If the column after performing the function `refcount = 0`, it can be deleted.
1004 std::map<String, int> columns_refcount;
1005
1006 for (const auto & name : final_columns)
1007 ++columns_refcount[name];
1008
1009 for (const auto & action : actions)
1010 {
1011 if (!action.source_name.empty())
1012 ++columns_refcount[action.source_name];
1013
1014 for (const auto & name : action.argument_names)
1015 ++columns_refcount[name];
1016
1017 for (const auto & name_alias : action.projection)
1018 ++columns_refcount[name_alias.first];
1019 }
1020
1021 Actions new_actions;
1022 new_actions.reserve(actions.size());
1023
1024 for (const auto & action : actions)
1025 {
1026 new_actions.push_back(action);
1027
1028 auto process = [&] (const String & name)
1029 {
1030 auto refcount = --columns_refcount[name];
1031 if (refcount <= 0)
1032 {
1033 new_actions.push_back(ExpressionAction::removeColumn(name));
1034 if (sample_block.has(name))
1035 sample_block.erase(name);
1036 }
1037 };
1038
1039 if (!action.source_name.empty())
1040 process(action.source_name);
1041
1042 for (const auto & name : action.argument_names)
1043 process(name);
1044
1045 /// For `projection`, there is no reduction in `refcount`, because the `project` action replaces the names of the columns, in effect, already deleting them under the old names.
1046 }
1047
1048 actions.swap(new_actions);
1049
1050/* std::cerr << "\n";
1051 for (const auto & action : actions)
1052 std::cerr << action.toString() << "\n";
1053 std::cerr << "\n";*/
1054
1055 optimizeArrayJoin();
1056 checkLimits(sample_block);
1057}
1058
1059
1060std::string ExpressionActions::dumpActions() const
1061{
1062 std::stringstream ss;
1063
1064 ss << "input:\n";
1065 for (NamesAndTypesList::const_iterator it = input_columns.begin(); it != input_columns.end(); ++it)
1066 ss << it->name << " " << it->type->getName() << "\n";
1067
1068 ss << "\nactions:\n";
1069 for (size_t i = 0; i < actions.size(); ++i)
1070 ss << actions[i].toString() << '\n';
1071
1072 ss << "\noutput:\n";
1073 NamesAndTypesList output_columns = sample_block.getNamesAndTypesList();
1074 for (NamesAndTypesList::const_iterator it = output_columns.begin(); it != output_columns.end(); ++it)
1075 ss << it->name << " " << it->type->getName() << "\n";
1076
1077 return ss.str();
1078}
1079
1080void ExpressionActions::optimizeArrayJoin()
1081{
1082 const size_t NONE = actions.size();
1083 size_t first_array_join = NONE;
1084
1085 /// Columns that need to be evaluated for arrayJoin.
1086 /// Actions for adding them can not be moved to the left of the arrayJoin.
1087 NameSet array_joined_columns;
1088
1089 /// Columns needed to evaluate arrayJoin or those that depend on it.
1090 /// Actions to delete them can not be moved to the left of the arrayJoin.
1091 NameSet array_join_dependencies;
1092
1093 for (size_t i = 0; i < actions.size(); ++i)
1094 {
1095 /// Do not move the action to the right of the projection (the more that they are not usually there).
1096 if (actions[i].type == ExpressionAction::PROJECT)
1097 break;
1098
1099 bool depends_on_array_join = false;
1100 Names needed;
1101
1102 if (actions[i].type == ExpressionAction::ARRAY_JOIN)
1103 {
1104 depends_on_array_join = true;
1105 needed = actions[i].getNeededColumns();
1106 }
1107 else
1108 {
1109 if (first_array_join == NONE)
1110 continue;
1111
1112 needed = actions[i].getNeededColumns();
1113
1114 for (size_t j = 0; j < needed.size(); ++j)
1115 {
1116 if (array_joined_columns.count(needed[j]))
1117 {
1118 depends_on_array_join = true;
1119 break;
1120 }
1121 }
1122 }
1123
1124 if (depends_on_array_join)
1125 {
1126 if (first_array_join == NONE)
1127 first_array_join = i;
1128
1129 if (actions[i].result_name != "")
1130 array_joined_columns.insert(actions[i].result_name);
1131 array_joined_columns.insert(actions[i].array_joined_columns.begin(), actions[i].array_joined_columns.end());
1132
1133 array_join_dependencies.insert(needed.begin(), needed.end());
1134 }
1135 else
1136 {
1137 bool can_move = false;
1138
1139 if (actions[i].type == ExpressionAction::REMOVE_COLUMN)
1140 {
1141 /// If you delete a column that is not needed for arrayJoin (and those who depend on it), you can delete it before arrayJoin.
1142 can_move = !array_join_dependencies.count(actions[i].source_name);
1143 }
1144 else
1145 {
1146 /// If the action does not delete the columns and does not depend on the result of arrayJoin, you can make it until arrayJoin.
1147 can_move = true;
1148 }
1149
1150 /// Move the current action to the position just before the first arrayJoin.
1151 if (can_move)
1152 {
1153 /// Move the i-th element to the position `first_array_join`.
1154 std::rotate(actions.begin() + first_array_join, actions.begin() + i, actions.begin() + i + 1);
1155 ++first_array_join;
1156 }
1157 }
1158 }
1159}
1160
1161
1162JoinPtr ExpressionActions::getTableJoinAlgo() const
1163{
1164 for (const auto & action : actions)
1165 if (action.join)
1166 return action.join;
1167 return {};
1168}
1169
1170
1171bool ExpressionActions::resultIsAlwaysEmpty() const
1172{
1173 /// Check that has join which returns empty result.
1174
1175 for (auto & action : actions)
1176 {
1177 if (action.type == action.JOIN && action.join && action.join->alwaysReturnsEmptySet())
1178 return true;
1179 }
1180
1181 return false;
1182}
1183
1184
1185bool ExpressionActions::checkColumnIsAlwaysFalse(const String & column_name) const
1186{
1187 /// Check has column in (empty set).
1188 String set_to_check;
1189
1190 for (auto it = actions.rbegin(); it != actions.rend(); ++it)
1191 {
1192 auto & action = *it;
1193 if (action.type == action.APPLY_FUNCTION && action.function_base)
1194 {
1195 auto name = action.function_base->getName();
1196 if ((name == "in" || name == "globalIn")
1197 && action.result_name == column_name
1198 && action.argument_names.size() > 1)
1199 {
1200 set_to_check = action.argument_names[1];
1201 break;
1202 }
1203 }
1204 }
1205
1206 if (!set_to_check.empty())
1207 {
1208 for (auto & action : actions)
1209 {
1210 if (action.type == action.ADD_COLUMN && action.result_name == set_to_check)
1211 {
1212 // Constant ColumnSet cannot be empty, so we only need to check non-constant ones.
1213 if (auto * column_set = checkAndGetColumn<const ColumnSet>(action.added_column.get()))
1214 {
1215 if (column_set->getData()->isCreated() && column_set->getData()->getTotalRowCount() == 0)
1216 return true;
1217 }
1218 }
1219 }
1220 }
1221
1222 return false;
1223}
1224
1225
1226/// It is not important to calculate the hash of individual strings or their concatenation
1227UInt128 ExpressionAction::ActionHash::operator()(const ExpressionAction & action) const
1228{
1229 SipHash hash;
1230 hash.update(action.type);
1231 hash.update(action.is_function_compiled);
1232 switch (action.type)
1233 {
1234 case ADD_COLUMN:
1235 hash.update(action.result_name);
1236 if (action.result_type)
1237 hash.update(action.result_type->getName());
1238 if (action.added_column)
1239 hash.update(action.added_column->getName());
1240 break;
1241 case REMOVE_COLUMN:
1242 hash.update(action.source_name);
1243 break;
1244 case COPY_COLUMN:
1245 hash.update(action.result_name);
1246 hash.update(action.source_name);
1247 break;
1248 case APPLY_FUNCTION:
1249 hash.update(action.result_name);
1250 if (action.result_type)
1251 hash.update(action.result_type->getName());
1252 if (action.function_base)
1253 {
1254 hash.update(action.function_base->getName());
1255 for (const auto & arg_type : action.function_base->getArgumentTypes())
1256 hash.update(arg_type->getName());
1257 }
1258 for (const auto & arg_name : action.argument_names)
1259 hash.update(arg_name);
1260 break;
1261 case ARRAY_JOIN:
1262 hash.update(action.array_join_is_left);
1263 for (const auto & col : action.array_joined_columns)
1264 hash.update(col);
1265 break;
1266 case JOIN:
1267 for (const auto & col : action.table_join->columnsAddedByJoin())
1268 hash.update(col.name);
1269 break;
1270 case PROJECT:
1271 for (const auto & pair_of_strs : action.projection)
1272 {
1273 hash.update(pair_of_strs.first);
1274 hash.update(pair_of_strs.second);
1275 }
1276 break;
1277 case ADD_ALIASES:
1278 break;
1279 }
1280 UInt128 result;
1281 hash.get128(result.low, result.high);
1282 return result;
1283}
1284
1285bool ExpressionAction::operator==(const ExpressionAction & other) const
1286{
1287 if (result_type != other.result_type)
1288 {
1289 if (result_type == nullptr || other.result_type == nullptr)
1290 return false;
1291 else if (!result_type->equals(*other.result_type))
1292 return false;
1293 }
1294
1295 if (function_base != other.function_base)
1296 {
1297 if (function_base == nullptr || other.function_base == nullptr)
1298 return false;
1299 else if (function_base->getName() != other.function_base->getName())
1300 return false;
1301
1302 const auto & my_arg_types = function_base->getArgumentTypes();
1303 const auto & other_arg_types = other.function_base->getArgumentTypes();
1304 if (my_arg_types.size() != other_arg_types.size())
1305 return false;
1306
1307 for (size_t i = 0; i < my_arg_types.size(); ++i)
1308 if (!my_arg_types[i]->equals(*other_arg_types[i]))
1309 return false;
1310 }
1311
1312 if (added_column != other.added_column)
1313 {
1314 if (added_column == nullptr || other.added_column == nullptr)
1315 return false;
1316 else if (added_column->getName() != other.added_column->getName())
1317 return false;
1318 }
1319
1320 return source_name == other.source_name
1321 && result_name == other.result_name
1322 && argument_names == other.argument_names
1323 && array_joined_columns == other.array_joined_columns
1324 && array_join_is_left == other.array_join_is_left
1325 && AnalyzedJoin::sameJoin(table_join.get(), other.table_join.get())
1326 && projection == other.projection
1327 && is_function_compiled == other.is_function_compiled;
1328}
1329
1330void ExpressionActionsChain::addStep()
1331{
1332 if (steps.empty())
1333 throw Exception("Cannot add action to empty ExpressionActionsChain", ErrorCodes::LOGICAL_ERROR);
1334
1335 ColumnsWithTypeAndName columns = steps.back().actions->getSampleBlock().getColumnsWithTypeAndName();
1336 steps.push_back(Step(std::make_shared<ExpressionActions>(columns, context)));
1337}
1338
1339void ExpressionActionsChain::finalize()
1340{
1341 /// Finalize all steps. Right to left to define unnecessary input columns.
1342 for (int i = static_cast<int>(steps.size()) - 1; i >= 0; --i)
1343 {
1344 Names required_output = steps[i].required_output;
1345 std::unordered_map<String, size_t> required_output_indexes;
1346 for (size_t j = 0; j < required_output.size(); ++j)
1347 required_output_indexes[required_output[j]] = j;
1348 auto & can_remove_required_output = steps[i].can_remove_required_output;
1349
1350 if (i + 1 < static_cast<int>(steps.size()))
1351 {
1352 const NameSet & additional_input = steps[i + 1].additional_input;
1353 for (const auto & it : steps[i + 1].actions->getRequiredColumnsWithTypes())
1354 {
1355 if (additional_input.count(it.name) == 0)
1356 {
1357 auto iter = required_output_indexes.find(it.name);
1358 if (iter == required_output_indexes.end())
1359 required_output.push_back(it.name);
1360 else if (!can_remove_required_output.empty())
1361 can_remove_required_output[iter->second] = false;
1362 }
1363 }
1364 }
1365 steps[i].actions->finalize(required_output);
1366 }
1367
1368 /// When possible, move the ARRAY JOIN from earlier steps to later steps.
1369 for (size_t i = 1; i < steps.size(); ++i)
1370 {
1371 ExpressionAction action;
1372 if (steps[i - 1].actions->popUnusedArrayJoin(steps[i - 1].required_output, action))
1373 steps[i].actions->prependArrayJoin(action, steps[i - 1].actions->getSampleBlock());
1374 }
1375
1376 /// Adding the ejection of unnecessary columns to the beginning of each step.
1377 for (size_t i = 1; i < steps.size(); ++i)
1378 {
1379 size_t columns_from_previous = steps[i - 1].actions->getSampleBlock().columns();
1380
1381 /// If unnecessary columns are formed at the output of the previous step, we'll add them to the beginning of this step.
1382 /// Except when we drop all the columns and lose the number of rows in the block.
1383 if (!steps[i].actions->getRequiredColumnsWithTypes().empty()
1384 && columns_from_previous > steps[i].actions->getRequiredColumnsWithTypes().size())
1385 steps[i].actions->prependProjectInput();
1386 }
1387}
1388
1389std::string ExpressionActionsChain::dumpChain()
1390{
1391 std::stringstream ss;
1392
1393 for (size_t i = 0; i < steps.size(); ++i)
1394 {
1395 ss << "step " << i << "\n";
1396 ss << "required output:\n";
1397 for (const std::string & name : steps[i].required_output)
1398 ss << name << "\n";
1399 ss << "\n" << steps[i].actions->dumpActions() << "\n";
1400 }
1401
1402 return ss.str();
1403}
1404
1405}
1406