| 1 | #include "config_core.h" |
| 2 | #include <Interpreters/Set.h> |
| 3 | #include <Common/ProfileEvents.h> |
| 4 | #include <Common/SipHash.h> |
| 5 | #include <Interpreters/ExpressionActions.h> |
| 6 | #include <Interpreters/ExpressionJIT.h> |
| 7 | #include <Interpreters/AnalyzedJoin.h> |
| 8 | #include <Columns/ColumnsNumber.h> |
| 9 | #include <Columns/ColumnArray.h> |
| 10 | #include <Common/typeid_cast.h> |
| 11 | #include <DataTypes/DataTypeArray.h> |
| 12 | #include <DataTypes/DataTypesNumber.h> |
| 13 | #include <Functions/FunctionFactory.h> |
| 14 | #include <Functions/IFunction.h> |
| 15 | #include <set> |
| 16 | #include <optional> |
| 17 | #include <Columns/ColumnSet.h> |
| 18 | #include <Functions/FunctionHelpers.h> |
| 19 | |
| 20 | |
| 21 | namespace ProfileEvents |
| 22 | { |
| 23 | extern const Event FunctionExecute; |
| 24 | extern const Event CompiledFunctionExecute; |
| 25 | } |
| 26 | |
| 27 | namespace DB |
| 28 | { |
| 29 | |
/// Error codes used in this file. They are only declared here (extern); the definitions live elsewhere.
namespace ErrorCodes
{
    /// Used by the "No arrays to join", "ARRAY_JOIN action expected" and "No available columns" checks.
    /// Declared explicitly so that this file does not depend on another header happening to declare it.
    extern const int LOGICAL_ERROR;
    extern const int DUPLICATE_COLUMN;
    extern const int UNKNOWN_IDENTIFIER;
    extern const int UNKNOWN_ACTION;
    extern const int NOT_FOUND_COLUMN_IN_BLOCK;
    extern const int SIZES_OF_ARRAYS_DOESNT_MATCH;
    extern const int TOO_MANY_TEMPORARY_COLUMNS;
    extern const int TOO_MANY_TEMPORARY_NON_CONST_COLUMNS;
    extern const int TYPE_MISMATCH;
}
| 41 | |
/// Name of a placeholder column; see the comment at the place where it is used.
static constexpr const char * DUMMY_COLUMN_NAME = "_dummy";
| 44 | |
| 45 | Names ExpressionAction::getNeededColumns() const |
| 46 | { |
| 47 | Names res = argument_names; |
| 48 | |
| 49 | res.insert(res.end(), array_joined_columns.begin(), array_joined_columns.end()); |
| 50 | |
| 51 | if (table_join) |
| 52 | res.insert(res.end(), table_join->keyNamesLeft().begin(), table_join->keyNamesLeft().end()); |
| 53 | |
| 54 | for (const auto & column : projection) |
| 55 | res.push_back(column.first); |
| 56 | |
| 57 | if (!source_name.empty()) |
| 58 | res.push_back(source_name); |
| 59 | |
| 60 | return res; |
| 61 | } |
| 62 | |
| 63 | |
| 64 | ExpressionAction ExpressionAction::applyFunction( |
| 65 | const FunctionOverloadResolverPtr & function_, |
| 66 | const std::vector<std::string> & argument_names_, |
| 67 | std::string result_name_) |
| 68 | { |
| 69 | if (result_name_ == "" ) |
| 70 | { |
| 71 | result_name_ = function_->getName() + "(" ; |
| 72 | for (size_t i = 0 ; i < argument_names_.size(); ++i) |
| 73 | { |
| 74 | if (i) |
| 75 | result_name_ += ", " ; |
| 76 | result_name_ += argument_names_[i]; |
| 77 | } |
| 78 | result_name_ += ")" ; |
| 79 | } |
| 80 | |
| 81 | ExpressionAction a; |
| 82 | a.type = APPLY_FUNCTION; |
| 83 | a.result_name = result_name_; |
| 84 | a.function_builder = function_; |
| 85 | a.argument_names = argument_names_; |
| 86 | return a; |
| 87 | } |
| 88 | |
| 89 | ExpressionAction ExpressionAction::addColumn( |
| 90 | const ColumnWithTypeAndName & added_column_) |
| 91 | { |
| 92 | ExpressionAction a; |
| 93 | a.type = ADD_COLUMN; |
| 94 | a.result_name = added_column_.name; |
| 95 | a.result_type = added_column_.type; |
| 96 | a.added_column = added_column_.column; |
| 97 | return a; |
| 98 | } |
| 99 | |
| 100 | ExpressionAction ExpressionAction::removeColumn(const std::string & removed_name) |
| 101 | { |
| 102 | ExpressionAction a; |
| 103 | a.type = REMOVE_COLUMN; |
| 104 | a.source_name = removed_name; |
| 105 | return a; |
| 106 | } |
| 107 | |
| 108 | ExpressionAction ExpressionAction::copyColumn(const std::string & from_name, const std::string & to_name, bool can_replace) |
| 109 | { |
| 110 | ExpressionAction a; |
| 111 | a.type = COPY_COLUMN; |
| 112 | a.source_name = from_name; |
| 113 | a.result_name = to_name; |
| 114 | a.can_replace = can_replace; |
| 115 | return a; |
| 116 | } |
| 117 | |
| 118 | ExpressionAction ExpressionAction::project(const NamesWithAliases & projected_columns_) |
| 119 | { |
| 120 | ExpressionAction a; |
| 121 | a.type = PROJECT; |
| 122 | a.projection = projected_columns_; |
| 123 | return a; |
| 124 | } |
| 125 | |
| 126 | ExpressionAction ExpressionAction::project(const Names & projected_columns_) |
| 127 | { |
| 128 | ExpressionAction a; |
| 129 | a.type = PROJECT; |
| 130 | a.projection.resize(projected_columns_.size()); |
| 131 | for (size_t i = 0; i < projected_columns_.size(); ++i) |
| 132 | a.projection[i] = NameWithAlias(projected_columns_[i], "" ); |
| 133 | return a; |
| 134 | } |
| 135 | |
| 136 | ExpressionAction ExpressionAction::addAliases(const NamesWithAliases & aliased_columns_) |
| 137 | { |
| 138 | ExpressionAction a; |
| 139 | a.type = ADD_ALIASES; |
| 140 | a.projection = aliased_columns_; |
| 141 | return a; |
| 142 | } |
| 143 | |
| 144 | ExpressionAction ExpressionAction::arrayJoin(const NameSet & array_joined_columns, bool array_join_is_left, const Context & context) |
| 145 | { |
| 146 | if (array_joined_columns.empty()) |
| 147 | throw Exception("No arrays to join" , ErrorCodes::LOGICAL_ERROR); |
| 148 | ExpressionAction a; |
| 149 | a.type = ARRAY_JOIN; |
| 150 | a.array_joined_columns = array_joined_columns; |
| 151 | a.array_join_is_left = array_join_is_left; |
| 152 | a.unaligned_array_join = context.getSettingsRef().enable_unaligned_array_join; |
| 153 | |
| 154 | if (a.unaligned_array_join) |
| 155 | { |
| 156 | a.function_length = FunctionFactory::instance().get("length" , context); |
| 157 | a.function_greatest = FunctionFactory::instance().get("greatest" , context); |
| 158 | a.function_arrayResize = FunctionFactory::instance().get("arrayResize" , context); |
| 159 | } |
| 160 | else if (array_join_is_left) |
| 161 | a.function_builder = FunctionFactory::instance().get("emptyArrayToSingle" , context); |
| 162 | |
| 163 | return a; |
| 164 | } |
| 165 | |
| 166 | ExpressionAction ExpressionAction::ordinaryJoin(std::shared_ptr<AnalyzedJoin> table_join, JoinPtr join) |
| 167 | { |
| 168 | ExpressionAction a; |
| 169 | a.type = JOIN; |
| 170 | a.table_join = table_join; |
| 171 | a.join = join; |
| 172 | return a; |
| 173 | } |
| 174 | |
| 175 | |
| 176 | void ExpressionAction::prepare(Block & sample_block, const Settings & settings, NameSet & names_not_for_constant_folding) |
| 177 | { |
| 178 | // std::cerr << "preparing: " << toString() << std::endl; |
| 179 | |
| 180 | /** Constant expressions should be evaluated, and put the result in sample_block. |
| 181 | */ |
| 182 | |
| 183 | switch (type) |
| 184 | { |
| 185 | case APPLY_FUNCTION: |
| 186 | { |
| 187 | if (sample_block.has(result_name)) |
| 188 | throw Exception("Column '" + result_name + "' already exists" , ErrorCodes::DUPLICATE_COLUMN); |
| 189 | |
| 190 | bool all_const = true; |
| 191 | bool all_suitable_for_constant_folding = true; |
| 192 | |
| 193 | ColumnNumbers arguments(argument_names.size()); |
| 194 | for (size_t i = 0; i < argument_names.size(); ++i) |
| 195 | { |
| 196 | arguments[i] = sample_block.getPositionByName(argument_names[i]); |
| 197 | ColumnPtr col = sample_block.safeGetByPosition(arguments[i]).column; |
| 198 | if (!col || !isColumnConst(*col)) |
| 199 | all_const = false; |
| 200 | |
| 201 | if (names_not_for_constant_folding.count(argument_names[i])) |
| 202 | all_suitable_for_constant_folding = false; |
| 203 | } |
| 204 | |
| 205 | size_t result_position = sample_block.columns(); |
| 206 | sample_block.insert({nullptr, result_type, result_name}); |
| 207 | function = function_base->prepare(sample_block, arguments, result_position); |
| 208 | function->createLowCardinalityResultCache(settings.max_threads); |
| 209 | |
| 210 | bool compile_expressions = false; |
| 211 | #if USE_EMBEDDED_COMPILER |
| 212 | compile_expressions = settings.compile_expressions; |
| 213 | #endif |
| 214 | /// If all arguments are constants, and function is suitable to be executed in 'prepare' stage - execute function. |
| 215 | /// But if we compile expressions compiled version of this function maybe placed in cache, |
| 216 | /// so we don't want to unfold non deterministic functions |
| 217 | if (all_const && function_base->isSuitableForConstantFolding() && (!compile_expressions || function_base->isDeterministic())) |
| 218 | { |
| 219 | function->execute(sample_block, arguments, result_position, sample_block.rows(), true); |
| 220 | |
| 221 | /// If the result is not a constant, just in case, we will consider the result as unknown. |
| 222 | ColumnWithTypeAndName & col = sample_block.safeGetByPosition(result_position); |
| 223 | if (!isColumnConst(*col.column)) |
| 224 | { |
| 225 | col.column = nullptr; |
| 226 | } |
| 227 | else |
| 228 | { |
| 229 | /// All constant (literal) columns in block are added with size 1. |
| 230 | /// But if there was no columns in block before executing a function, the result has size 0. |
| 231 | /// Change the size to 1. |
| 232 | |
| 233 | if (col.column->empty()) |
| 234 | col.column = col.column->cloneResized(1); |
| 235 | |
| 236 | if (!all_suitable_for_constant_folding) |
| 237 | names_not_for_constant_folding.insert(result_name); |
| 238 | } |
| 239 | } |
| 240 | |
| 241 | /// Some functions like ignore() or getTypeName() always return constant result even if arguments are not constant. |
| 242 | /// We can't do constant folding, but can specify in sample block that function result is constant to avoid |
| 243 | /// unnecessary materialization. |
| 244 | auto & res = sample_block.getByPosition(result_position); |
| 245 | if (!res.column && function_base->isSuitableForConstantFolding()) |
| 246 | { |
| 247 | if (auto col = function_base->getResultIfAlwaysReturnsConstantAndHasArguments(sample_block, arguments)) |
| 248 | { |
| 249 | res.column = std::move(col); |
| 250 | names_not_for_constant_folding.insert(result_name); |
| 251 | } |
| 252 | } |
| 253 | |
| 254 | break; |
| 255 | } |
| 256 | |
| 257 | case ARRAY_JOIN: |
| 258 | { |
| 259 | for (const auto & name : array_joined_columns) |
| 260 | { |
| 261 | ColumnWithTypeAndName & current = sample_block.getByName(name); |
| 262 | const DataTypeArray * array_type = typeid_cast<const DataTypeArray *>(&*current.type); |
| 263 | if (!array_type) |
| 264 | throw Exception("ARRAY JOIN requires array argument" , ErrorCodes::TYPE_MISMATCH); |
| 265 | current.type = array_type->getNestedType(); |
| 266 | current.column = nullptr; |
| 267 | } |
| 268 | |
| 269 | break; |
| 270 | } |
| 271 | |
| 272 | case JOIN: |
| 273 | { |
| 274 | table_join->addJoinedColumnsAndCorrectNullability(sample_block); |
| 275 | break; |
| 276 | } |
| 277 | |
| 278 | case PROJECT: |
| 279 | { |
| 280 | Block new_block; |
| 281 | |
| 282 | for (size_t i = 0; i < projection.size(); ++i) |
| 283 | { |
| 284 | const std::string & name = projection[i].first; |
| 285 | const std::string & alias = projection[i].second; |
| 286 | ColumnWithTypeAndName column = sample_block.getByName(name); |
| 287 | if (alias != "" ) |
| 288 | column.name = alias; |
| 289 | new_block.insert(std::move(column)); |
| 290 | } |
| 291 | |
| 292 | sample_block.swap(new_block); |
| 293 | break; |
| 294 | } |
| 295 | |
| 296 | case ADD_ALIASES: |
| 297 | { |
| 298 | for (size_t i = 0; i < projection.size(); ++i) |
| 299 | { |
| 300 | const std::string & name = projection[i].first; |
| 301 | const std::string & alias = projection[i].second; |
| 302 | const ColumnWithTypeAndName & column = sample_block.getByName(name); |
| 303 | if (alias != "" && !sample_block.has(alias)) |
| 304 | sample_block.insert({column.column, column.type, alias}); |
| 305 | } |
| 306 | break; |
| 307 | } |
| 308 | |
| 309 | case REMOVE_COLUMN: |
| 310 | { |
| 311 | sample_block.erase(source_name); |
| 312 | break; |
| 313 | } |
| 314 | |
| 315 | case ADD_COLUMN: |
| 316 | { |
| 317 | if (sample_block.has(result_name)) |
| 318 | throw Exception("Column '" + result_name + "' already exists" , ErrorCodes::DUPLICATE_COLUMN); |
| 319 | |
| 320 | sample_block.insert(ColumnWithTypeAndName(added_column, result_type, result_name)); |
| 321 | break; |
| 322 | } |
| 323 | |
| 324 | case COPY_COLUMN: |
| 325 | { |
| 326 | const auto & source = sample_block.getByName(source_name); |
| 327 | result_type = source.type; |
| 328 | |
| 329 | if (sample_block.has(result_name)) |
| 330 | { |
| 331 | if (can_replace) |
| 332 | { |
| 333 | auto & result = sample_block.getByName(result_name); |
| 334 | result.type = result_type; |
| 335 | result.column = source.column; |
| 336 | } |
| 337 | else |
| 338 | throw Exception("Column '" + result_name + "' already exists" , ErrorCodes::DUPLICATE_COLUMN); |
| 339 | } |
| 340 | else |
| 341 | sample_block.insert(ColumnWithTypeAndName(source.column, result_type, result_name)); |
| 342 | |
| 343 | break; |
| 344 | } |
| 345 | } |
| 346 | } |
| 347 | |
| 348 | |
| 349 | void ExpressionAction::execute(Block & block, bool dry_run) const |
| 350 | { |
| 351 | size_t input_rows_count = block.rows(); |
| 352 | |
| 353 | if (type == REMOVE_COLUMN || type == COPY_COLUMN) |
| 354 | if (!block.has(source_name)) |
| 355 | throw Exception("Not found column '" + source_name + "'. There are columns: " + block.dumpNames(), ErrorCodes::NOT_FOUND_COLUMN_IN_BLOCK); |
| 356 | |
| 357 | if (type == ADD_COLUMN || (type == COPY_COLUMN && !can_replace) || type == APPLY_FUNCTION) |
| 358 | if (block.has(result_name)) |
| 359 | throw Exception("Column '" + result_name + "' already exists" , ErrorCodes::DUPLICATE_COLUMN); |
| 360 | |
| 361 | switch (type) |
| 362 | { |
| 363 | case APPLY_FUNCTION: |
| 364 | { |
| 365 | ColumnNumbers arguments(argument_names.size()); |
| 366 | for (size_t i = 0; i < argument_names.size(); ++i) |
| 367 | { |
| 368 | if (!block.has(argument_names[i])) |
| 369 | throw Exception("Not found column: '" + argument_names[i] + "'" , ErrorCodes::NOT_FOUND_COLUMN_IN_BLOCK); |
| 370 | arguments[i] = block.getPositionByName(argument_names[i]); |
| 371 | } |
| 372 | |
| 373 | size_t num_columns_without_result = block.columns(); |
| 374 | block.insert({ nullptr, result_type, result_name}); |
| 375 | |
| 376 | ProfileEvents::increment(ProfileEvents::FunctionExecute); |
| 377 | if (is_function_compiled) |
| 378 | ProfileEvents::increment(ProfileEvents::CompiledFunctionExecute); |
| 379 | function->execute(block, arguments, num_columns_without_result, input_rows_count, dry_run); |
| 380 | |
| 381 | break; |
| 382 | } |
| 383 | |
| 384 | case ARRAY_JOIN: |
| 385 | { |
| 386 | if (array_joined_columns.empty()) |
| 387 | throw Exception("No arrays to join" , ErrorCodes::LOGICAL_ERROR); |
| 388 | |
| 389 | ColumnPtr any_array_ptr = block.getByName(*array_joined_columns.begin()).column->convertToFullColumnIfConst(); |
| 390 | const ColumnArray * any_array = typeid_cast<const ColumnArray *>(&*any_array_ptr); |
| 391 | if (!any_array) |
| 392 | throw Exception("ARRAY JOIN of not array: " + *array_joined_columns.begin(), ErrorCodes::TYPE_MISMATCH); |
| 393 | |
| 394 | /// If LEFT ARRAY JOIN, then we create columns in which empty arrays are replaced by arrays with one element - the default value. |
| 395 | std::map<String, ColumnPtr> non_empty_array_columns; |
| 396 | |
| 397 | if (unaligned_array_join) |
| 398 | { |
| 399 | /// Resize all array joined columns to the longest one, (at least 1 if LEFT ARRAY JOIN), padded with default values. |
| 400 | auto rows = block.rows(); |
| 401 | auto uint64 = std::make_shared<DataTypeUInt64>(); |
| 402 | ColumnWithTypeAndName column_of_max_length; |
| 403 | if (array_join_is_left) |
| 404 | column_of_max_length = ColumnWithTypeAndName(uint64->createColumnConst(rows, 1u), uint64, {}); |
| 405 | else |
| 406 | column_of_max_length = ColumnWithTypeAndName(uint64->createColumnConst(rows, 0u), uint64, {}); |
| 407 | |
| 408 | for (const auto & name : array_joined_columns) |
| 409 | { |
| 410 | auto & src_col = block.getByName(name); |
| 411 | |
| 412 | Block tmp_block{src_col, {{}, uint64, {}}}; |
| 413 | function_length->build({src_col})->execute(tmp_block, {0}, 1, rows); |
| 414 | |
| 415 | Block tmp_block2{ |
| 416 | column_of_max_length, tmp_block.safeGetByPosition(1), {{}, uint64, {}}}; |
| 417 | function_greatest->build({column_of_max_length, tmp_block.safeGetByPosition(1)})->execute(tmp_block2, {0, 1}, 2, rows); |
| 418 | column_of_max_length = tmp_block2.safeGetByPosition(2); |
| 419 | } |
| 420 | |
| 421 | for (const auto & name : array_joined_columns) |
| 422 | { |
| 423 | auto & src_col = block.getByName(name); |
| 424 | |
| 425 | Block tmp_block{src_col, column_of_max_length, {{}, src_col.type, {}}}; |
| 426 | function_arrayResize->build({src_col, column_of_max_length})->execute(tmp_block, {0, 1}, 2, rows); |
| 427 | src_col.column = tmp_block.safeGetByPosition(2).column; |
| 428 | any_array_ptr = src_col.column->convertToFullColumnIfConst(); |
| 429 | } |
| 430 | |
| 431 | any_array = typeid_cast<const ColumnArray *>(&*any_array_ptr); |
| 432 | } |
| 433 | else if (array_join_is_left) |
| 434 | { |
| 435 | for (const auto & name : array_joined_columns) |
| 436 | { |
| 437 | auto src_col = block.getByName(name); |
| 438 | |
| 439 | Block tmp_block{src_col, {{}, src_col.type, {}}}; |
| 440 | |
| 441 | function_builder->build({src_col})->execute(tmp_block, {0}, 1, src_col.column->size(), dry_run); |
| 442 | non_empty_array_columns[name] = tmp_block.safeGetByPosition(1).column; |
| 443 | } |
| 444 | |
| 445 | any_array_ptr = non_empty_array_columns.begin()->second->convertToFullColumnIfConst(); |
| 446 | any_array = &typeid_cast<const ColumnArray &>(*any_array_ptr); |
| 447 | } |
| 448 | |
| 449 | size_t columns = block.columns(); |
| 450 | for (size_t i = 0; i < columns; ++i) |
| 451 | { |
| 452 | ColumnWithTypeAndName & current = block.safeGetByPosition(i); |
| 453 | |
| 454 | if (array_joined_columns.count(current.name)) |
| 455 | { |
| 456 | if (!typeid_cast<const DataTypeArray *>(&*current.type)) |
| 457 | throw Exception("ARRAY JOIN of not array: " + current.name, ErrorCodes::TYPE_MISMATCH); |
| 458 | |
| 459 | ColumnPtr array_ptr = (array_join_is_left && !unaligned_array_join) ? non_empty_array_columns[current.name] : current.column; |
| 460 | array_ptr = array_ptr->convertToFullColumnIfConst(); |
| 461 | |
| 462 | const ColumnArray & array = typeid_cast<const ColumnArray &>(*array_ptr); |
| 463 | if (!unaligned_array_join && !array.hasEqualOffsets(typeid_cast<const ColumnArray &>(*any_array_ptr))) |
| 464 | throw Exception("Sizes of ARRAY-JOIN-ed arrays do not match" , ErrorCodes::SIZES_OF_ARRAYS_DOESNT_MATCH); |
| 465 | |
| 466 | current.column = typeid_cast<const ColumnArray &>(*array_ptr).getDataPtr(); |
| 467 | current.type = typeid_cast<const DataTypeArray &>(*current.type).getNestedType(); |
| 468 | } |
| 469 | else |
| 470 | { |
| 471 | current.column = current.column->replicate(any_array->getOffsets()); |
| 472 | } |
| 473 | } |
| 474 | |
| 475 | break; |
| 476 | } |
| 477 | |
| 478 | case JOIN: |
| 479 | { |
| 480 | join->joinBlock(block); |
| 481 | break; |
| 482 | } |
| 483 | |
| 484 | case PROJECT: |
| 485 | { |
| 486 | Block new_block; |
| 487 | |
| 488 | for (size_t i = 0; i < projection.size(); ++i) |
| 489 | { |
| 490 | const std::string & name = projection[i].first; |
| 491 | const std::string & alias = projection[i].second; |
| 492 | ColumnWithTypeAndName column = block.getByName(name); |
| 493 | if (alias != "" ) |
| 494 | column.name = alias; |
| 495 | new_block.insert(std::move(column)); |
| 496 | } |
| 497 | |
| 498 | block.swap(new_block); |
| 499 | |
| 500 | break; |
| 501 | } |
| 502 | |
| 503 | case ADD_ALIASES: |
| 504 | { |
| 505 | for (size_t i = 0; i < projection.size(); ++i) |
| 506 | { |
| 507 | const std::string & name = projection[i].first; |
| 508 | const std::string & alias = projection[i].second; |
| 509 | const ColumnWithTypeAndName & column = block.getByName(name); |
| 510 | if (alias != "" && !block.has(alias)) |
| 511 | block.insert({column.column, column.type, alias}); |
| 512 | } |
| 513 | break; |
| 514 | } |
| 515 | |
| 516 | case REMOVE_COLUMN: |
| 517 | block.erase(source_name); |
| 518 | break; |
| 519 | |
| 520 | case ADD_COLUMN: |
| 521 | block.insert({ added_column->cloneResized(input_rows_count), result_type, result_name }); |
| 522 | break; |
| 523 | |
| 524 | case COPY_COLUMN: |
| 525 | if (can_replace && block.has(result_name)) |
| 526 | { |
| 527 | auto & result = block.getByName(result_name); |
| 528 | const auto & source = block.getByName(source_name); |
| 529 | result.type = source.type; |
| 530 | result.column = source.column; |
| 531 | } |
| 532 | else |
| 533 | { |
| 534 | const auto & source_column = block.getByName(source_name); |
| 535 | block.insert({source_column.column, source_column.type, result_name}); |
| 536 | } |
| 537 | |
| 538 | break; |
| 539 | } |
| 540 | } |
| 541 | |
| 542 | |
| 543 | void ExpressionAction::executeOnTotals(Block & block) const |
| 544 | { |
| 545 | if (type != JOIN) |
| 546 | execute(block, false); |
| 547 | else |
| 548 | join->joinTotals(block); |
| 549 | } |
| 550 | |
| 551 | |
| 552 | std::string ExpressionAction::toString() const |
| 553 | { |
| 554 | std::stringstream ss; |
| 555 | switch (type) |
| 556 | { |
| 557 | case ADD_COLUMN: |
| 558 | ss << "ADD " << result_name << " " |
| 559 | << (result_type ? result_type->getName() : "(no type)" ) << " " |
| 560 | << (added_column ? added_column->getName() : "(no column)" ); |
| 561 | break; |
| 562 | |
| 563 | case REMOVE_COLUMN: |
| 564 | ss << "REMOVE " << source_name; |
| 565 | break; |
| 566 | |
| 567 | case COPY_COLUMN: |
| 568 | ss << "COPY " << result_name << " = " << source_name; |
| 569 | if (can_replace) |
| 570 | ss << " (can replace)" ; |
| 571 | break; |
| 572 | |
| 573 | case APPLY_FUNCTION: |
| 574 | ss << "FUNCTION " << result_name << " " << (is_function_compiled ? "[compiled] " : "" ) |
| 575 | << (result_type ? result_type->getName() : "(no type)" ) << " = " |
| 576 | << (function_base ? function_base->getName() : "(no function)" ) << "(" ; |
| 577 | for (size_t i = 0; i < argument_names.size(); ++i) |
| 578 | { |
| 579 | if (i) |
| 580 | ss << ", " ; |
| 581 | ss << argument_names[i]; |
| 582 | } |
| 583 | ss << ")" ; |
| 584 | break; |
| 585 | |
| 586 | case ARRAY_JOIN: |
| 587 | ss << (array_join_is_left ? "LEFT " : "" ) << "ARRAY JOIN " ; |
| 588 | for (NameSet::const_iterator it = array_joined_columns.begin(); it != array_joined_columns.end(); ++it) |
| 589 | { |
| 590 | if (it != array_joined_columns.begin()) |
| 591 | ss << ", " ; |
| 592 | ss << *it; |
| 593 | } |
| 594 | break; |
| 595 | |
| 596 | case JOIN: |
| 597 | ss << "JOIN " ; |
| 598 | for (NamesAndTypesList::const_iterator it = table_join->columnsAddedByJoin().begin(); |
| 599 | it != table_join->columnsAddedByJoin().end(); ++it) |
| 600 | { |
| 601 | if (it != table_join->columnsAddedByJoin().begin()) |
| 602 | ss << ", " ; |
| 603 | ss << it->name; |
| 604 | } |
| 605 | break; |
| 606 | |
| 607 | case PROJECT: [[fallthrough]]; |
| 608 | case ADD_ALIASES: |
| 609 | ss << (type == PROJECT ? "PROJECT " : "ADD_ALIASES " ); |
| 610 | for (size_t i = 0; i < projection.size(); ++i) |
| 611 | { |
| 612 | if (i) |
| 613 | ss << ", " ; |
| 614 | ss << projection[i].first; |
| 615 | if (projection[i].second != "" && projection[i].second != projection[i].first) |
| 616 | ss << " AS " << projection[i].second; |
| 617 | } |
| 618 | break; |
| 619 | } |
| 620 | |
| 621 | return ss.str(); |
| 622 | } |
| 623 | |
| 624 | void ExpressionActions::checkLimits(Block & block) const |
| 625 | { |
| 626 | if (settings.max_temporary_columns && block.columns() > settings.max_temporary_columns) |
| 627 | throw Exception("Too many temporary columns: " + block.dumpNames() |
| 628 | + ". Maximum: " + settings.max_temporary_columns.toString(), |
| 629 | ErrorCodes::TOO_MANY_TEMPORARY_COLUMNS); |
| 630 | |
| 631 | if (settings.max_temporary_non_const_columns) |
| 632 | { |
| 633 | size_t non_const_columns = 0; |
| 634 | for (size_t i = 0, size = block.columns(); i < size; ++i) |
| 635 | if (block.safeGetByPosition(i).column && !isColumnConst(*block.safeGetByPosition(i).column)) |
| 636 | ++non_const_columns; |
| 637 | |
| 638 | if (non_const_columns > settings.max_temporary_non_const_columns) |
| 639 | { |
| 640 | std::stringstream list_of_non_const_columns; |
| 641 | for (size_t i = 0, size = block.columns(); i < size; ++i) |
| 642 | if (block.safeGetByPosition(i).column && !isColumnConst(*block.safeGetByPosition(i).column)) |
| 643 | list_of_non_const_columns << "\n" << block.safeGetByPosition(i).name; |
| 644 | |
| 645 | throw Exception("Too many temporary non-const columns:" + list_of_non_const_columns.str() |
| 646 | + ". Maximum: " + settings.max_temporary_non_const_columns.toString(), |
| 647 | ErrorCodes::TOO_MANY_TEMPORARY_NON_CONST_COLUMNS); |
| 648 | } |
| 649 | } |
| 650 | } |
| 651 | |
| 652 | void ExpressionActions::addInput(const ColumnWithTypeAndName & column) |
| 653 | { |
| 654 | input_columns.emplace_back(column.name, column.type); |
| 655 | sample_block.insert(column); |
| 656 | } |
| 657 | |
| 658 | void ExpressionActions::addInput(const NameAndTypePair & column) |
| 659 | { |
| 660 | addInput(ColumnWithTypeAndName(nullptr, column.type, column.name)); |
| 661 | } |
| 662 | |
| 663 | void ExpressionActions::add(const ExpressionAction & action, Names & out_new_columns) |
| 664 | { |
| 665 | addImpl(action, out_new_columns); |
| 666 | } |
| 667 | |
| 668 | void ExpressionActions::add(const ExpressionAction & action) |
| 669 | { |
| 670 | Names new_names; |
| 671 | addImpl(action, new_names); |
| 672 | } |
| 673 | |
| 674 | void ExpressionActions::addImpl(ExpressionAction action, Names & new_names) |
| 675 | { |
| 676 | if (action.result_name != "" ) |
| 677 | new_names.push_back(action.result_name); |
| 678 | new_names.insert(new_names.end(), action.array_joined_columns.begin(), action.array_joined_columns.end()); |
| 679 | |
| 680 | /// Compiled functions are custom functions and they don't need building |
| 681 | if (action.type == ExpressionAction::APPLY_FUNCTION && !action.is_function_compiled) |
| 682 | { |
| 683 | if (sample_block.has(action.result_name)) |
| 684 | throw Exception("Column '" + action.result_name + "' already exists" , ErrorCodes::DUPLICATE_COLUMN); |
| 685 | |
| 686 | ColumnsWithTypeAndName arguments(action.argument_names.size()); |
| 687 | for (size_t i = 0; i < action.argument_names.size(); ++i) |
| 688 | { |
| 689 | if (!sample_block.has(action.argument_names[i])) |
| 690 | throw Exception("Unknown identifier: '" + action.argument_names[i] + "'" , ErrorCodes::UNKNOWN_IDENTIFIER); |
| 691 | arguments[i] = sample_block.getByName(action.argument_names[i]); |
| 692 | } |
| 693 | |
| 694 | action.function_base = action.function_builder->build(arguments); |
| 695 | action.result_type = action.function_base->getReturnType(); |
| 696 | } |
| 697 | |
| 698 | if (action.type == ExpressionAction::ADD_ALIASES) |
| 699 | for (const auto & name_with_alias : action.projection) |
| 700 | new_names.emplace_back(name_with_alias.second); |
| 701 | |
| 702 | action.prepare(sample_block, settings, names_not_for_constant_folding); |
| 703 | actions.push_back(action); |
| 704 | } |
| 705 | |
| 706 | void ExpressionActions::prependProjectInput() |
| 707 | { |
| 708 | actions.insert(actions.begin(), ExpressionAction::project(getRequiredColumns())); |
| 709 | } |
| 710 | |
| 711 | void ExpressionActions::prependArrayJoin(const ExpressionAction & action, const Block & sample_block_before) |
| 712 | { |
| 713 | if (action.type != ExpressionAction::ARRAY_JOIN) |
| 714 | throw Exception("ARRAY_JOIN action expected" , ErrorCodes::LOGICAL_ERROR); |
| 715 | |
| 716 | NameSet array_join_set(action.array_joined_columns.begin(), action.array_joined_columns.end()); |
| 717 | for (auto & it : input_columns) |
| 718 | { |
| 719 | if (array_join_set.count(it.name)) |
| 720 | { |
| 721 | array_join_set.erase(it.name); |
| 722 | it.type = std::make_shared<DataTypeArray>(it.type); |
| 723 | } |
| 724 | } |
| 725 | for (const std::string & name : array_join_set) |
| 726 | { |
| 727 | input_columns.emplace_back(name, sample_block_before.getByName(name).type); |
| 728 | actions.insert(actions.begin(), ExpressionAction::removeColumn(name)); |
| 729 | } |
| 730 | |
| 731 | actions.insert(actions.begin(), action); |
| 732 | optimizeArrayJoin(); |
| 733 | } |
| 734 | |
| 735 | |
| 736 | bool ExpressionActions::popUnusedArrayJoin(const Names & required_columns, ExpressionAction & out_action) |
| 737 | { |
| 738 | if (actions.empty() || actions.back().type != ExpressionAction::ARRAY_JOIN) |
| 739 | return false; |
| 740 | NameSet required_set(required_columns.begin(), required_columns.end()); |
| 741 | for (const std::string & name : actions.back().array_joined_columns) |
| 742 | { |
| 743 | if (required_set.count(name)) |
| 744 | return false; |
| 745 | } |
| 746 | for (const std::string & name : actions.back().array_joined_columns) |
| 747 | { |
| 748 | DataTypePtr & type = sample_block.getByName(name).type; |
| 749 | type = std::make_shared<DataTypeArray>(type); |
| 750 | } |
| 751 | out_action = actions.back(); |
| 752 | actions.pop_back(); |
| 753 | return true; |
| 754 | } |
| 755 | |
| 756 | void ExpressionActions::execute(Block & block, bool dry_run) const |
| 757 | { |
| 758 | for (const auto & action : actions) |
| 759 | { |
| 760 | action.execute(block, dry_run); |
| 761 | checkLimits(block); |
| 762 | } |
| 763 | } |
| 764 | |
| 765 | bool ExpressionActions::hasTotalsInJoin() const |
| 766 | { |
| 767 | for (const auto & action : actions) |
| 768 | if (action.table_join && action.join->hasTotals()) |
| 769 | return true; |
| 770 | return false; |
| 771 | } |
| 772 | |
| 773 | void ExpressionActions::executeOnTotals(Block & block) const |
| 774 | { |
| 775 | /// If there is `totals` in the subquery for JOIN, but we do not have totals, then take the block with the default values instead of `totals`. |
| 776 | if (!block) |
| 777 | { |
| 778 | if (hasTotalsInJoin()) |
| 779 | { |
| 780 | for (const auto & name_and_type : input_columns) |
| 781 | { |
| 782 | auto column = name_and_type.type->createColumn(); |
| 783 | column->insertDefault(); |
| 784 | block.insert(ColumnWithTypeAndName(std::move(column), name_and_type.type, name_and_type.name)); |
| 785 | } |
| 786 | } |
| 787 | else |
| 788 | return; /// There's nothing to JOIN. |
| 789 | } |
| 790 | |
| 791 | for (const auto & action : actions) |
| 792 | action.executeOnTotals(block); |
| 793 | } |
| 794 | |
| 795 | std::string ExpressionActions::getSmallestColumn(const NamesAndTypesList & columns) |
| 796 | { |
| 797 | std::optional<size_t> min_size; |
| 798 | String res; |
| 799 | |
| 800 | for (const auto & column : columns) |
| 801 | { |
| 802 | /// @todo resolve evil constant |
| 803 | size_t size = column.type->haveMaximumSizeOfValue() ? column.type->getMaximumSizeOfValueInMemory() : 100; |
| 804 | |
| 805 | if (!min_size || size < *min_size) |
| 806 | { |
| 807 | min_size = size; |
| 808 | res = column.name; |
| 809 | } |
| 810 | } |
| 811 | |
| 812 | if (!min_size) |
| 813 | throw Exception("No available columns" , ErrorCodes::LOGICAL_ERROR); |
| 814 | |
| 815 | return res; |
| 816 | } |
| 817 | |
| 818 | void ExpressionActions::finalize(const Names & output_columns) |
| 819 | { |
| 820 | NameSet final_columns; |
| 821 | for (size_t i = 0; i < output_columns.size(); ++i) |
| 822 | { |
| 823 | const std::string & name = output_columns[i]; |
| 824 | if (!sample_block.has(name)) |
| 825 | throw Exception("Unknown column: " + name + ", there are only columns " |
| 826 | + sample_block.dumpNames(), ErrorCodes::UNKNOWN_IDENTIFIER); |
| 827 | final_columns.insert(name); |
| 828 | } |
| 829 | |
| 830 | #if USE_EMBEDDED_COMPILER |
| 831 | /// This has to be done before removing redundant actions and inserting REMOVE_COLUMNs |
| 832 | /// because inlining may change dependency sets. |
| 833 | if (settings.compile_expressions) |
| 834 | compileFunctions(actions, output_columns, sample_block, compilation_cache, settings.min_count_to_compile_expression); |
| 835 | #endif |
| 836 | |
| 837 | /// Which columns are needed to perform actions from the current to the last. |
| 838 | NameSet needed_columns = final_columns; |
| 839 | /// Which columns nobody will touch from the current action to the last. |
| 840 | NameSet unmodified_columns; |
| 841 | |
| 842 | { |
| 843 | NamesAndTypesList sample_columns = sample_block.getNamesAndTypesList(); |
| 844 | for (NamesAndTypesList::iterator it = sample_columns.begin(); it != sample_columns.end(); ++it) |
| 845 | unmodified_columns.insert(it->name); |
| 846 | } |
| 847 | |
| 848 | /// Let's go from the end and maintain set of required columns at this stage. |
| 849 | /// We will throw out unnecessary actions, although usually they are absent by construction. |
| 850 | for (int i = static_cast<int>(actions.size()) - 1; i >= 0; --i) |
| 851 | { |
| 852 | ExpressionAction & action = actions[i]; |
| 853 | Names in = action.getNeededColumns(); |
| 854 | |
| 855 | if (action.type == ExpressionAction::PROJECT) |
| 856 | { |
| 857 | needed_columns = NameSet(in.begin(), in.end()); |
| 858 | unmodified_columns.clear(); |
| 859 | } |
| 860 | else if (action.type == ExpressionAction::ADD_ALIASES) |
| 861 | { |
| 862 | needed_columns.insert(in.begin(), in.end()); |
| 863 | for (auto & name_wit_alias : action.projection) |
| 864 | { |
| 865 | auto it = unmodified_columns.find(name_wit_alias.second); |
| 866 | if (it != unmodified_columns.end()) |
| 867 | unmodified_columns.erase(it); |
| 868 | } |
| 869 | } |
| 870 | else if (action.type == ExpressionAction::ARRAY_JOIN) |
| 871 | { |
| 872 | /// Do not ARRAY JOIN columns that are not used anymore. |
| 873 | /// Usually, such columns are not used until ARRAY JOIN, and therefore are ejected further in this function. |
| 874 | /// We will not remove all the columns so as not to lose the number of rows. |
| 875 | for (auto it = action.array_joined_columns.begin(); it != action.array_joined_columns.end();) |
| 876 | { |
| 877 | bool need = needed_columns.count(*it); |
| 878 | if (!need && action.array_joined_columns.size() > 1) |
| 879 | { |
| 880 | action.array_joined_columns.erase(it++); |
| 881 | } |
| 882 | else |
| 883 | { |
| 884 | needed_columns.insert(*it); |
| 885 | unmodified_columns.erase(*it); |
| 886 | |
| 887 | /// If no ARRAY JOIN results are used, forcibly leave an arbitrary column at the output, |
| 888 | /// so you do not lose the number of rows. |
| 889 | if (!need) |
| 890 | final_columns.insert(*it); |
| 891 | |
| 892 | ++it; |
| 893 | } |
| 894 | } |
| 895 | } |
| 896 | else |
| 897 | { |
| 898 | std::string out = action.result_name; |
| 899 | if (!out.empty()) |
| 900 | { |
| 901 | /// If the result is not used and there are no side effects, throw out the action. |
| 902 | if (!needed_columns.count(out) && |
| 903 | (action.type == ExpressionAction::APPLY_FUNCTION |
| 904 | || action.type == ExpressionAction::ADD_COLUMN |
| 905 | || action.type == ExpressionAction::COPY_COLUMN)) |
| 906 | { |
| 907 | actions.erase(actions.begin() + i); |
| 908 | |
| 909 | if (unmodified_columns.count(out)) |
| 910 | { |
| 911 | sample_block.erase(out); |
| 912 | unmodified_columns.erase(out); |
| 913 | } |
| 914 | |
| 915 | continue; |
| 916 | } |
| 917 | |
| 918 | unmodified_columns.erase(out); |
| 919 | needed_columns.erase(out); |
| 920 | |
| 921 | /** If the function is a constant expression, then replace the action by adding a column-constant - result. |
| 922 | * That is, we perform constant folding. |
| 923 | */ |
| 924 | if (action.type == ExpressionAction::APPLY_FUNCTION && sample_block.has(out)) |
| 925 | { |
| 926 | auto & result = sample_block.getByName(out); |
| 927 | if (result.column && names_not_for_constant_folding.count(result.name) == 0) |
| 928 | { |
| 929 | action.type = ExpressionAction::ADD_COLUMN; |
| 930 | action.result_type = result.type; |
| 931 | action.added_column = result.column; |
| 932 | action.function_builder = nullptr; |
| 933 | action.function_base = nullptr; |
| 934 | action.function = nullptr; |
| 935 | action.argument_names.clear(); |
| 936 | in.clear(); |
| 937 | } |
| 938 | } |
| 939 | } |
| 940 | |
| 941 | needed_columns.insert(in.begin(), in.end()); |
| 942 | } |
| 943 | } |
| 944 | |
| 945 | |
| 946 | /// 1) Sometimes we don't need any columns to perform actions and sometimes actions doesn't produce any columns as result. |
| 947 | /// But Block class doesn't store any information about structure itself, it uses information from column. |
| 948 | /// If we remove all columns from input or output block we will lose information about amount of rows in it. |
| 949 | /// To avoid this situation we always leaving one of the columns in required columns (input) |
| 950 | /// and output column. We choose that "redundant" column by size with help of getSmallestColumn. |
| 951 | /// |
| 952 | /// 2) Sometimes we have to read data from different Storages to execute query. |
| 953 | /// For example in 'remote' function which requires to read data from local table (for example MergeTree) and |
| 954 | /// remote table (doesn't know anything about it). |
| 955 | /// |
| 956 | /// If we have combination of two previous cases, our heuristic from (1) can choose absolutely different columns, |
| 957 | /// so generated streams with these actions will have different headers. To avoid this we addionaly rename our "redundant" column |
| 958 | /// to DUMMY_COLUMN_NAME with help of COPY_COLUMN action and consequent remove of original column. |
| 959 | /// It doesn't affect any logic, but all streams will have same "redundant" column in header called "_dummy". |
| 960 | |
| 961 | /// Also, it seems like we will always have same type (UInt8) of "redundant" column, but it's not obvious. |
| 962 | |
| 963 | bool dummy_column_copied = false; |
| 964 | |
| 965 | |
| 966 | /// We will not throw out all the input columns, so as not to lose the number of rows in the block. |
| 967 | if (needed_columns.empty() && !input_columns.empty()) |
| 968 | { |
| 969 | auto colname = getSmallestColumn(input_columns); |
| 970 | needed_columns.insert(colname); |
| 971 | actions.insert(actions.begin(), ExpressionAction::copyColumn(colname, DUMMY_COLUMN_NAME, true)); |
| 972 | dummy_column_copied = true; |
| 973 | } |
| 974 | |
| 975 | /// We will not leave the block empty so as not to lose the number of rows in it. |
| 976 | if (final_columns.empty() && !input_columns.empty()) |
| 977 | { |
| 978 | auto colname = getSmallestColumn(input_columns); |
| 979 | final_columns.insert(DUMMY_COLUMN_NAME); |
| 980 | if (!dummy_column_copied) /// otherwise we already have this column |
| 981 | actions.insert(actions.begin(), ExpressionAction::copyColumn(colname, DUMMY_COLUMN_NAME, true)); |
| 982 | } |
| 983 | |
| 984 | for (NamesAndTypesList::iterator it = input_columns.begin(); it != input_columns.end();) |
| 985 | { |
| 986 | NamesAndTypesList::iterator it0 = it; |
| 987 | ++it; |
| 988 | if (!needed_columns.count(it0->name)) |
| 989 | { |
| 990 | if (unmodified_columns.count(it0->name)) |
| 991 | sample_block.erase(it0->name); |
| 992 | input_columns.erase(it0); |
| 993 | } |
| 994 | } |
| 995 | |
| 996 | /* std::cerr << "\n"; |
| 997 | for (const auto & action : actions) |
| 998 | std::cerr << action.toString() << "\n"; |
| 999 | std::cerr << "\n";*/ |
| 1000 | |
| 1001 | /// Deletes unnecessary temporary columns. |
| 1002 | |
| 1003 | /// If the column after performing the function `refcount = 0`, it can be deleted. |
| 1004 | std::map<String, int> columns_refcount; |
| 1005 | |
| 1006 | for (const auto & name : final_columns) |
| 1007 | ++columns_refcount[name]; |
| 1008 | |
| 1009 | for (const auto & action : actions) |
| 1010 | { |
| 1011 | if (!action.source_name.empty()) |
| 1012 | ++columns_refcount[action.source_name]; |
| 1013 | |
| 1014 | for (const auto & name : action.argument_names) |
| 1015 | ++columns_refcount[name]; |
| 1016 | |
| 1017 | for (const auto & name_alias : action.projection) |
| 1018 | ++columns_refcount[name_alias.first]; |
| 1019 | } |
| 1020 | |
| 1021 | Actions new_actions; |
| 1022 | new_actions.reserve(actions.size()); |
| 1023 | |
| 1024 | for (const auto & action : actions) |
| 1025 | { |
| 1026 | new_actions.push_back(action); |
| 1027 | |
| 1028 | auto process = [&] (const String & name) |
| 1029 | { |
| 1030 | auto refcount = --columns_refcount[name]; |
| 1031 | if (refcount <= 0) |
| 1032 | { |
| 1033 | new_actions.push_back(ExpressionAction::removeColumn(name)); |
| 1034 | if (sample_block.has(name)) |
| 1035 | sample_block.erase(name); |
| 1036 | } |
| 1037 | }; |
| 1038 | |
| 1039 | if (!action.source_name.empty()) |
| 1040 | process(action.source_name); |
| 1041 | |
| 1042 | for (const auto & name : action.argument_names) |
| 1043 | process(name); |
| 1044 | |
| 1045 | /// For `projection`, there is no reduction in `refcount`, because the `project` action replaces the names of the columns, in effect, already deleting them under the old names. |
| 1046 | } |
| 1047 | |
| 1048 | actions.swap(new_actions); |
| 1049 | |
| 1050 | /* std::cerr << "\n"; |
| 1051 | for (const auto & action : actions) |
| 1052 | std::cerr << action.toString() << "\n"; |
| 1053 | std::cerr << "\n";*/ |
| 1054 | |
| 1055 | optimizeArrayJoin(); |
| 1056 | checkLimits(sample_block); |
| 1057 | } |
| 1058 | |
| 1059 | |
| 1060 | std::string ExpressionActions::dumpActions() const |
| 1061 | { |
| 1062 | std::stringstream ss; |
| 1063 | |
| 1064 | ss << "input:\n" ; |
| 1065 | for (NamesAndTypesList::const_iterator it = input_columns.begin(); it != input_columns.end(); ++it) |
| 1066 | ss << it->name << " " << it->type->getName() << "\n" ; |
| 1067 | |
| 1068 | ss << "\nactions:\n" ; |
| 1069 | for (size_t i = 0; i < actions.size(); ++i) |
| 1070 | ss << actions[i].toString() << '\n'; |
| 1071 | |
| 1072 | ss << "\noutput:\n" ; |
| 1073 | NamesAndTypesList output_columns = sample_block.getNamesAndTypesList(); |
| 1074 | for (NamesAndTypesList::const_iterator it = output_columns.begin(); it != output_columns.end(); ++it) |
| 1075 | ss << it->name << " " << it->type->getName() << "\n" ; |
| 1076 | |
| 1077 | return ss.str(); |
| 1078 | } |
| 1079 | |
| 1080 | void ExpressionActions::optimizeArrayJoin() |
| 1081 | { |
| 1082 | const size_t NONE = actions.size(); |
| 1083 | size_t first_array_join = NONE; |
| 1084 | |
| 1085 | /// Columns that need to be evaluated for arrayJoin. |
| 1086 | /// Actions for adding them can not be moved to the left of the arrayJoin. |
| 1087 | NameSet array_joined_columns; |
| 1088 | |
| 1089 | /// Columns needed to evaluate arrayJoin or those that depend on it. |
| 1090 | /// Actions to delete them can not be moved to the left of the arrayJoin. |
| 1091 | NameSet array_join_dependencies; |
| 1092 | |
| 1093 | for (size_t i = 0; i < actions.size(); ++i) |
| 1094 | { |
| 1095 | /// Do not move the action to the right of the projection (the more that they are not usually there). |
| 1096 | if (actions[i].type == ExpressionAction::PROJECT) |
| 1097 | break; |
| 1098 | |
| 1099 | bool depends_on_array_join = false; |
| 1100 | Names needed; |
| 1101 | |
| 1102 | if (actions[i].type == ExpressionAction::ARRAY_JOIN) |
| 1103 | { |
| 1104 | depends_on_array_join = true; |
| 1105 | needed = actions[i].getNeededColumns(); |
| 1106 | } |
| 1107 | else |
| 1108 | { |
| 1109 | if (first_array_join == NONE) |
| 1110 | continue; |
| 1111 | |
| 1112 | needed = actions[i].getNeededColumns(); |
| 1113 | |
| 1114 | for (size_t j = 0; j < needed.size(); ++j) |
| 1115 | { |
| 1116 | if (array_joined_columns.count(needed[j])) |
| 1117 | { |
| 1118 | depends_on_array_join = true; |
| 1119 | break; |
| 1120 | } |
| 1121 | } |
| 1122 | } |
| 1123 | |
| 1124 | if (depends_on_array_join) |
| 1125 | { |
| 1126 | if (first_array_join == NONE) |
| 1127 | first_array_join = i; |
| 1128 | |
| 1129 | if (actions[i].result_name != "" ) |
| 1130 | array_joined_columns.insert(actions[i].result_name); |
| 1131 | array_joined_columns.insert(actions[i].array_joined_columns.begin(), actions[i].array_joined_columns.end()); |
| 1132 | |
| 1133 | array_join_dependencies.insert(needed.begin(), needed.end()); |
| 1134 | } |
| 1135 | else |
| 1136 | { |
| 1137 | bool can_move = false; |
| 1138 | |
| 1139 | if (actions[i].type == ExpressionAction::REMOVE_COLUMN) |
| 1140 | { |
| 1141 | /// If you delete a column that is not needed for arrayJoin (and those who depend on it), you can delete it before arrayJoin. |
| 1142 | can_move = !array_join_dependencies.count(actions[i].source_name); |
| 1143 | } |
| 1144 | else |
| 1145 | { |
| 1146 | /// If the action does not delete the columns and does not depend on the result of arrayJoin, you can make it until arrayJoin. |
| 1147 | can_move = true; |
| 1148 | } |
| 1149 | |
| 1150 | /// Move the current action to the position just before the first arrayJoin. |
| 1151 | if (can_move) |
| 1152 | { |
| 1153 | /// Move the i-th element to the position `first_array_join`. |
| 1154 | std::rotate(actions.begin() + first_array_join, actions.begin() + i, actions.begin() + i + 1); |
| 1155 | ++first_array_join; |
| 1156 | } |
| 1157 | } |
| 1158 | } |
| 1159 | } |
| 1160 | |
| 1161 | |
| 1162 | JoinPtr ExpressionActions::getTableJoinAlgo() const |
| 1163 | { |
| 1164 | for (const auto & action : actions) |
| 1165 | if (action.join) |
| 1166 | return action.join; |
| 1167 | return {}; |
| 1168 | } |
| 1169 | |
| 1170 | |
| 1171 | bool ExpressionActions::resultIsAlwaysEmpty() const |
| 1172 | { |
| 1173 | /// Check that has join which returns empty result. |
| 1174 | |
| 1175 | for (auto & action : actions) |
| 1176 | { |
| 1177 | if (action.type == action.JOIN && action.join && action.join->alwaysReturnsEmptySet()) |
| 1178 | return true; |
| 1179 | } |
| 1180 | |
| 1181 | return false; |
| 1182 | } |
| 1183 | |
| 1184 | |
| 1185 | bool ExpressionActions::checkColumnIsAlwaysFalse(const String & column_name) const |
| 1186 | { |
| 1187 | /// Check has column in (empty set). |
| 1188 | String set_to_check; |
| 1189 | |
| 1190 | for (auto it = actions.rbegin(); it != actions.rend(); ++it) |
| 1191 | { |
| 1192 | auto & action = *it; |
| 1193 | if (action.type == action.APPLY_FUNCTION && action.function_base) |
| 1194 | { |
| 1195 | auto name = action.function_base->getName(); |
| 1196 | if ((name == "in" || name == "globalIn" ) |
| 1197 | && action.result_name == column_name |
| 1198 | && action.argument_names.size() > 1) |
| 1199 | { |
| 1200 | set_to_check = action.argument_names[1]; |
| 1201 | break; |
| 1202 | } |
| 1203 | } |
| 1204 | } |
| 1205 | |
| 1206 | if (!set_to_check.empty()) |
| 1207 | { |
| 1208 | for (auto & action : actions) |
| 1209 | { |
| 1210 | if (action.type == action.ADD_COLUMN && action.result_name == set_to_check) |
| 1211 | { |
| 1212 | // Constant ColumnSet cannot be empty, so we only need to check non-constant ones. |
| 1213 | if (auto * column_set = checkAndGetColumn<const ColumnSet>(action.added_column.get())) |
| 1214 | { |
| 1215 | if (column_set->getData()->isCreated() && column_set->getData()->getTotalRowCount() == 0) |
| 1216 | return true; |
| 1217 | } |
| 1218 | } |
| 1219 | } |
| 1220 | } |
| 1221 | |
| 1222 | return false; |
| 1223 | } |
| 1224 | |
| 1225 | |
| 1226 | /// It is not important to calculate the hash of individual strings or their concatenation |
| 1227 | UInt128 ExpressionAction::ActionHash::operator()(const ExpressionAction & action) const |
| 1228 | { |
| 1229 | SipHash hash; |
| 1230 | hash.update(action.type); |
| 1231 | hash.update(action.is_function_compiled); |
| 1232 | switch (action.type) |
| 1233 | { |
| 1234 | case ADD_COLUMN: |
| 1235 | hash.update(action.result_name); |
| 1236 | if (action.result_type) |
| 1237 | hash.update(action.result_type->getName()); |
| 1238 | if (action.added_column) |
| 1239 | hash.update(action.added_column->getName()); |
| 1240 | break; |
| 1241 | case REMOVE_COLUMN: |
| 1242 | hash.update(action.source_name); |
| 1243 | break; |
| 1244 | case COPY_COLUMN: |
| 1245 | hash.update(action.result_name); |
| 1246 | hash.update(action.source_name); |
| 1247 | break; |
| 1248 | case APPLY_FUNCTION: |
| 1249 | hash.update(action.result_name); |
| 1250 | if (action.result_type) |
| 1251 | hash.update(action.result_type->getName()); |
| 1252 | if (action.function_base) |
| 1253 | { |
| 1254 | hash.update(action.function_base->getName()); |
| 1255 | for (const auto & arg_type : action.function_base->getArgumentTypes()) |
| 1256 | hash.update(arg_type->getName()); |
| 1257 | } |
| 1258 | for (const auto & arg_name : action.argument_names) |
| 1259 | hash.update(arg_name); |
| 1260 | break; |
| 1261 | case ARRAY_JOIN: |
| 1262 | hash.update(action.array_join_is_left); |
| 1263 | for (const auto & col : action.array_joined_columns) |
| 1264 | hash.update(col); |
| 1265 | break; |
| 1266 | case JOIN: |
| 1267 | for (const auto & col : action.table_join->columnsAddedByJoin()) |
| 1268 | hash.update(col.name); |
| 1269 | break; |
| 1270 | case PROJECT: |
| 1271 | for (const auto & pair_of_strs : action.projection) |
| 1272 | { |
| 1273 | hash.update(pair_of_strs.first); |
| 1274 | hash.update(pair_of_strs.second); |
| 1275 | } |
| 1276 | break; |
| 1277 | case ADD_ALIASES: |
| 1278 | break; |
| 1279 | } |
| 1280 | UInt128 result; |
| 1281 | hash.get128(result.low, result.high); |
| 1282 | return result; |
| 1283 | } |
| 1284 | |
| 1285 | bool ExpressionAction::operator==(const ExpressionAction & other) const |
| 1286 | { |
| 1287 | if (result_type != other.result_type) |
| 1288 | { |
| 1289 | if (result_type == nullptr || other.result_type == nullptr) |
| 1290 | return false; |
| 1291 | else if (!result_type->equals(*other.result_type)) |
| 1292 | return false; |
| 1293 | } |
| 1294 | |
| 1295 | if (function_base != other.function_base) |
| 1296 | { |
| 1297 | if (function_base == nullptr || other.function_base == nullptr) |
| 1298 | return false; |
| 1299 | else if (function_base->getName() != other.function_base->getName()) |
| 1300 | return false; |
| 1301 | |
| 1302 | const auto & my_arg_types = function_base->getArgumentTypes(); |
| 1303 | const auto & other_arg_types = other.function_base->getArgumentTypes(); |
| 1304 | if (my_arg_types.size() != other_arg_types.size()) |
| 1305 | return false; |
| 1306 | |
| 1307 | for (size_t i = 0; i < my_arg_types.size(); ++i) |
| 1308 | if (!my_arg_types[i]->equals(*other_arg_types[i])) |
| 1309 | return false; |
| 1310 | } |
| 1311 | |
| 1312 | if (added_column != other.added_column) |
| 1313 | { |
| 1314 | if (added_column == nullptr || other.added_column == nullptr) |
| 1315 | return false; |
| 1316 | else if (added_column->getName() != other.added_column->getName()) |
| 1317 | return false; |
| 1318 | } |
| 1319 | |
| 1320 | return source_name == other.source_name |
| 1321 | && result_name == other.result_name |
| 1322 | && argument_names == other.argument_names |
| 1323 | && array_joined_columns == other.array_joined_columns |
| 1324 | && array_join_is_left == other.array_join_is_left |
| 1325 | && AnalyzedJoin::sameJoin(table_join.get(), other.table_join.get()) |
| 1326 | && projection == other.projection |
| 1327 | && is_function_compiled == other.is_function_compiled; |
| 1328 | } |
| 1329 | |
| 1330 | void ExpressionActionsChain::addStep() |
| 1331 | { |
| 1332 | if (steps.empty()) |
| 1333 | throw Exception("Cannot add action to empty ExpressionActionsChain" , ErrorCodes::LOGICAL_ERROR); |
| 1334 | |
| 1335 | ColumnsWithTypeAndName columns = steps.back().actions->getSampleBlock().getColumnsWithTypeAndName(); |
| 1336 | steps.push_back(Step(std::make_shared<ExpressionActions>(columns, context))); |
| 1337 | } |
| 1338 | |
| 1339 | void ExpressionActionsChain::finalize() |
| 1340 | { |
| 1341 | /// Finalize all steps. Right to left to define unnecessary input columns. |
| 1342 | for (int i = static_cast<int>(steps.size()) - 1; i >= 0; --i) |
| 1343 | { |
| 1344 | Names required_output = steps[i].required_output; |
| 1345 | std::unordered_map<String, size_t> required_output_indexes; |
| 1346 | for (size_t j = 0; j < required_output.size(); ++j) |
| 1347 | required_output_indexes[required_output[j]] = j; |
| 1348 | auto & can_remove_required_output = steps[i].can_remove_required_output; |
| 1349 | |
| 1350 | if (i + 1 < static_cast<int>(steps.size())) |
| 1351 | { |
| 1352 | const NameSet & additional_input = steps[i + 1].additional_input; |
| 1353 | for (const auto & it : steps[i + 1].actions->getRequiredColumnsWithTypes()) |
| 1354 | { |
| 1355 | if (additional_input.count(it.name) == 0) |
| 1356 | { |
| 1357 | auto iter = required_output_indexes.find(it.name); |
| 1358 | if (iter == required_output_indexes.end()) |
| 1359 | required_output.push_back(it.name); |
| 1360 | else if (!can_remove_required_output.empty()) |
| 1361 | can_remove_required_output[iter->second] = false; |
| 1362 | } |
| 1363 | } |
| 1364 | } |
| 1365 | steps[i].actions->finalize(required_output); |
| 1366 | } |
| 1367 | |
| 1368 | /// When possible, move the ARRAY JOIN from earlier steps to later steps. |
| 1369 | for (size_t i = 1; i < steps.size(); ++i) |
| 1370 | { |
| 1371 | ExpressionAction action; |
| 1372 | if (steps[i - 1].actions->popUnusedArrayJoin(steps[i - 1].required_output, action)) |
| 1373 | steps[i].actions->prependArrayJoin(action, steps[i - 1].actions->getSampleBlock()); |
| 1374 | } |
| 1375 | |
| 1376 | /// Adding the ejection of unnecessary columns to the beginning of each step. |
| 1377 | for (size_t i = 1; i < steps.size(); ++i) |
| 1378 | { |
| 1379 | size_t columns_from_previous = steps[i - 1].actions->getSampleBlock().columns(); |
| 1380 | |
| 1381 | /// If unnecessary columns are formed at the output of the previous step, we'll add them to the beginning of this step. |
| 1382 | /// Except when we drop all the columns and lose the number of rows in the block. |
| 1383 | if (!steps[i].actions->getRequiredColumnsWithTypes().empty() |
| 1384 | && columns_from_previous > steps[i].actions->getRequiredColumnsWithTypes().size()) |
| 1385 | steps[i].actions->prependProjectInput(); |
| 1386 | } |
| 1387 | } |
| 1388 | |
| 1389 | std::string ExpressionActionsChain::dumpChain() |
| 1390 | { |
| 1391 | std::stringstream ss; |
| 1392 | |
| 1393 | for (size_t i = 0; i < steps.size(); ++i) |
| 1394 | { |
| 1395 | ss << "step " << i << "\n" ; |
| 1396 | ss << "required output:\n" ; |
| 1397 | for (const std::string & name : steps[i].required_output) |
| 1398 | ss << name << "\n" ; |
| 1399 | ss << "\n" << steps[i].actions->dumpActions() << "\n" ; |
| 1400 | } |
| 1401 | |
| 1402 | return ss.str(); |
| 1403 | } |
| 1404 | |
| 1405 | } |
| 1406 | |