#include <Common/formatReadable.h>
#include <Common/PODArray.h>
#include <Common/typeid_cast.h>

#include <IO/ConcatReadBuffer.h>
#include <IO/WriteBufferFromFile.h>
#include <IO/WriteBufferFromVector.h>
#include <IO/LimitReadBuffer.h>
#include <IO/copyData.h>

#include <DataStreams/BlockIO.h>
#include <DataStreams/copyData.h>
#include <DataStreams/IBlockInputStream.h>
#include <DataStreams/InputStreamFromASTInsertQuery.h>
#include <DataStreams/CountingBlockOutputStream.h>

#include <Parsers/ASTInsertQuery.h>
#include <Parsers/ASTShowProcesslistQuery.h>
#include <Parsers/ASTIdentifier.h>
#include <Parsers/ASTLiteral.h>
#include <Parsers/ParserQuery.h>
#include <Parsers/parseQuery.h>
#include <Parsers/queryToString.h>

#include <Storages/StorageInput.h>

#include <Access/QuotaContext.h>
#include <Interpreters/InterpreterFactory.h>
#include <Interpreters/ProcessList.h>
#include <Interpreters/QueryLog.h>
#include <Interpreters/InterpreterSetQuery.h>
#include <Interpreters/ReplaceQueryParameterVisitor.h>
#include <Interpreters/executeQuery.h>
#include <Common/ProfileEvents.h>

#include <Interpreters/DNSCacheUpdater.h>
#include <Common/SensitiveDataMasker.h>

#include <Processors/Transforms/LimitsCheckingTransform.h>
#include <Processors/Transforms/MaterializingTransform.h>
#include <Processors/Formats/IOutputFormat.h>
#include <Parsers/ASTWatchQuery.h>

namespace ProfileEvents
{
    extern const Event QueryMaskingRulesMatch;
}

namespace DB
{

namespace ErrorCodes
{
    extern const int LOGICAL_ERROR;
    extern const int QUERY_IS_TOO_LARGE;
    extern const int INTO_OUTFILE_NOT_ALLOWED;
    extern const int QUERY_WAS_CANCELLED;
}

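/// Check AST depth and total number of elements against the max_ast_depth / max_ast_elements settings.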
static void checkASTSizeLimits(const IAST & ast, const Settings & settings)
{
    if (settings.max_ast_depth)
        ast.checkDepth(settings.max_ast_depth);
    if (settings.max_ast_elements)
        ast.checkSize(settings.max_ast_elements);
}

/// NOTE This is wrong in case of single-line comments and in case of multiline string literals.
static String joinLines(const String & query)
{
    String res = query;
    std::replace(res.begin(), res.end(), '\n', ' ');
    return res;
}

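/// Prepare the query text for logging: wipe sensitive data (if masking rules are configured)
/// and crop the result to log_queries_cut_to_length.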
static String prepareQueryForLogging(const String & query, Context & context)
{
    String res = query;

    /// Wipe sensitive data before cropping the query to log_queries_cut_to_length,
    /// otherwise something like a credit card number without the last digit could end up in the log.
    if (auto masker = SensitiveDataMasker::getInstance())
    {
        auto matches = masker->wipeSensitiveData(res);
        if (matches > 0)
        {
            ProfileEvents::increment(ProfileEvents::QueryMaskingRulesMatch, matches);
        }
    }

    res = res.substr(0, context.getSettingsRef().log_queries_cut_to_length);

    return res;
}


/// Log query into text log (not into system table).
static void logQuery(const String & query, const Context & context, bool internal)
{
    if (internal)
    {
        LOG_DEBUG(&Logger::get("executeQuery"), "(internal) " << joinLines(query));
    }
    else
    {
        const auto & current_query_id = context.getClientInfo().current_query_id;
        const auto & initial_query_id = context.getClientInfo().initial_query_id;
        const auto & current_user = context.getClientInfo().current_user;

        LOG_DEBUG(&Logger::get("executeQuery"), "(from " << context.getClientInfo().current_address.toString()
            << (current_user != "default" ? ", user: " + context.getClientInfo().current_user : "")
            << (!initial_query_id.empty() && current_query_id != initial_query_id ? ", initial_query_id: " + initial_query_id : std::string())
            << ") "
            << joinLines(query));
    }
}


/// Call this inside catch block.
static void setExceptionStackTrace(QueryLogElement & elem)
{
    /// Disable the memory tracker for the stack trace:
    /// if the exception is "Memory limit (for query) exceeded", we probably can't allocate even one more string.
    auto temporarily_disable_memory_tracker = getCurrentMemoryTrackerActionLock();

    try
    {
        throw;
    }
    catch (const Exception & e)
    {
        elem.stack_trace = e.getStackTrace().toString();
    }
    catch (...) {}
}


/// Log exception (with query info) into text log (not into system table).
static void logException(Context & context, QueryLogElement & elem)
{
    LOG_ERROR(&Logger::get("executeQuery"), elem.exception
        << " (from " << context.getClientInfo().current_address.toString() << ")"
        << " (in query: " << joinLines(elem.query) << ")"
        << (!elem.stack_trace.empty() ? ", Stack trace (when copying this message, always include the lines below):\n\n" + elem.stack_trace : ""));
}

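/// Called for exceptions that happen before the query has started executing (e.g. a parse error):
/// account the error in the quota and write an EXCEPTION_BEFORE_START record to the query log.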
static void onExceptionBeforeStart(const String & query_for_logging, Context & context, time_t current_time)
{
    /// Exception before the query execution.
    context.getQuota()->used(Quota::ERRORS, 1, /* check_exceeded = */ false);

    const Settings & settings = context.getSettingsRef();

    /// Log the start of query execution into the table if necessary.
    QueryLogElement elem;

    elem.type = QueryLogElement::EXCEPTION_BEFORE_START;

    elem.event_time = current_time;
    elem.query_start_time = current_time;

    elem.query = query_for_logging;
    elem.exception = getCurrentExceptionMessage(false);

    elem.client_info = context.getClientInfo();

    if (settings.calculate_text_stack_trace)
        setExceptionStackTrace(elem);
    logException(context, elem);

    /// Update performance counters before logging to query_log
    CurrentThread::finalizePerformanceCounters();

    if (settings.log_queries)
        if (auto query_log = context.getQueryLog())
            query_log->add(elem);
}

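/// Parse and execute the query: register it in the process list, run the interpreter,
/// apply result limits and quotas, and set up the query_log callbacks.
/// Returns the AST together with the BlockIO streams (or pipeline) that the caller will drain.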
static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
    const char * begin,
    const char * end,
    Context & context,
    bool internal,
    QueryProcessingStage::Enum stage,
    bool has_query_tail,
    ReadBuffer * istr,
    bool allow_processors)
{
    time_t current_time = time(nullptr);

    /// If we are already executing a query and it needs to run an internal query, don't replace
    /// the thread context with the given one (it can be temporary). Otherwise, attach the context to the thread.
    if (!internal)
    {
        context.makeQueryContext();
        CurrentThread::attachQueryContext(context);
    }

    const Settings & settings = context.getSettingsRef();

    ParserQuery parser(end, settings.enable_debug_queries);
    ASTPtr ast;
    const char * query_end;

    /// Don't limit the size of internal queries.
    size_t max_query_size = 0;
    if (!internal)
        max_query_size = settings.max_query_size;

    try
    {
        /// TODO Parser should fail early when max_query_size limit is reached.
        ast = parseQuery(parser, begin, end, "", max_query_size);

        auto * insert_query = ast->as<ASTInsertQuery>();

        if (insert_query && insert_query->settings_ast)
            InterpreterSetQuery(insert_query->settings_ast, context).executeForCurrentContext();

        if (insert_query && insert_query->data)
        {
            query_end = insert_query->data;
            insert_query->has_tail = has_query_tail;
        }
        else
        {
            query_end = end;
        }
    }
    catch (...)
    {
        /// Log the query anyway.
        String query = String(begin, begin + std::min(end - begin, static_cast<ptrdiff_t>(max_query_size)));

        auto query_for_logging = prepareQueryForLogging(query, context);
        logQuery(query_for_logging, context, internal);

        if (!internal)
            onExceptionBeforeStart(query_for_logging, context, current_time);

        throw;
    }

    /// Copy the query into a string. It will be written to the log and shown in the process list.
    /// For an INSERT query, the string will not include the data to insert.
    String query(begin, query_end);
    BlockIO res;
    QueryPipeline & pipeline = res.pipeline;

    String query_for_logging = "";

    try
    {
        /// Replace ASTQueryParameter with ASTLiteral for prepared statements.
        if (context.hasQueryParameters())
        {
            ReplaceQueryParameterVisitor visitor(context.getQueryParameters());
            visitor.visit(ast);

            /// Get new query after substitutions.
            query = serializeAST(*ast);
        }

        query_for_logging = prepareQueryForLogging(query, context);

        logQuery(query_for_logging, context, internal);

        /// Check the limits.
        checkASTSizeLimits(*ast, settings);

        /// Put the query into the process list. But don't put the SHOW PROCESSLIST query itself there.
        ProcessList::EntryPtr process_list_entry;
        if (!internal && !ast->as<ASTShowProcesslistQuery>())
        {
            /// The process list also stores the masked query, to avoid leaking secrets through SHOW PROCESSLIST to other users.
            process_list_entry = context.getProcessList().insert(query_for_logging, ast.get(), context);
            context.setProcessListElement(&process_list_entry->get());
        }

        /// Load external tables if they were provided
        context.initializeExternalTablesIfSet();

        auto * insert_query = ast->as<ASTInsertQuery>();
        if (insert_query && insert_query->select)
        {
            /// Prepare Input storage before executing interpreter if we already got a buffer with data.
            if (istr)
            {
                ASTPtr input_function;
                insert_query->tryFindInputFunction(input_function);
                if (input_function)
                {
                    StoragePtr storage = context.executeTableFunction(input_function);
                    auto & input_storage = dynamic_cast<StorageInput &>(*storage);
                    BlockInputStreamPtr input_stream = std::make_shared<InputStreamFromASTInsertQuery>(ast, istr,
                        input_storage.getSampleBlock(), context, input_function);
                    input_storage.setInputStream(input_stream);
                }
            }
        }
        else
            /// reset Input callbacks if query is not INSERT SELECT
            context.resetInputCallbacks();

        auto interpreter = InterpreterFactory::get(ast, context, stage);
        bool use_processors = settings.experimental_use_processors && allow_processors && interpreter->canExecuteWithProcessors();

        QuotaContextPtr quota;
        if (!interpreter->ignoreQuota())
        {
            quota = context.getQuota();
            quota->used(Quota::QUERIES, 1);
            quota->checkExceeded(Quota::ERRORS);
        }

        IBlockInputStream::LocalLimits limits;
        if (!interpreter->ignoreLimits())
        {
            limits.mode = IBlockInputStream::LIMITS_CURRENT;
            limits.size_limits = SizeLimits(settings.max_result_rows, settings.max_result_bytes, settings.result_overflow_mode);
        }

        if (use_processors)
            pipeline = interpreter->executeWithProcessors();
        else
            res = interpreter->execute();

        if (auto * insert_interpreter = typeid_cast<const InterpreterInsertQuery *>(&*interpreter))
        {
            /// Save insertion table (not table function). TODO: support remote() table function.
            auto db_table = insert_interpreter->getDatabaseTable();
            if (!db_table.second.empty())
                context.setInsertionTable(std::move(db_table));
        }

        if (process_list_entry)
        {
            /// Query was killed before execution
            if ((*process_list_entry)->isKilled())
                throw Exception("Query '" + (*process_list_entry)->getInfo().client_info.current_query_id + "' is killed in pending state",
                    ErrorCodes::QUERY_WAS_CANCELLED);
            else if (!use_processors)
                (*process_list_entry)->setQueryStreams(res);
        }

        /// Hold element of process list till end of query execution.
        res.process_list_entry = process_list_entry;

        if (use_processors)
        {
            /// Limits on the result, the quota on the result, and also callback for progress.
            /// Limits apply only to the final result.
            pipeline.setProgressCallback(context.getProgressCallback());
            pipeline.setProcessListElement(context.getProcessListElement());
            if (stage == QueryProcessingStage::Complete)
            {
                pipeline.resize(1);
                pipeline.addSimpleTransform([&](const Block & header)
                {
                    auto transform = std::make_shared<LimitsCheckingTransform>(header, limits);
                    transform->setQuota(quota);
                    return transform;
                });
            }
        }
        else
        {
            /// Limits on the result, the quota on the result, and also callback for progress.
            /// Limits apply only to the final result.
            if (res.in)
            {
                res.in->setProgressCallback(context.getProgressCallback());
                res.in->setProcessListElement(context.getProcessListElement());
                if (stage == QueryProcessingStage::Complete)
                {
                    if (!interpreter->ignoreQuota())
                        res.in->setQuota(quota);
                    if (!interpreter->ignoreLimits())
                        res.in->setLimits(limits);
                }
            }

            if (res.out)
            {
                if (auto stream = dynamic_cast<CountingBlockOutputStream *>(res.out.get()))
                {
                    stream->setProcessListElement(context.getProcessListElement());
                }
            }
        }

        /// Everything related to query log.
        {
            QueryLogElement elem;

            elem.type = QueryLogElement::QUERY_START;

            elem.event_time = current_time;
            elem.query_start_time = current_time;

            elem.query = query_for_logging;

            elem.client_info = context.getClientInfo();

            bool log_queries = settings.log_queries && !internal;

            /// Log the start of query execution into the system table, if needed.
            if (log_queries)
            {
                if (settings.log_query_settings)
                    elem.query_settings = std::make_shared<Settings>(context.getSettingsRef());

                if (auto query_log = context.getQueryLog())
                    query_log->add(elem);
            }

            /// Also make it possible for the caller to log successful query finish and exceptions during execution.
            auto finish_callback = [elem, &context, log_queries] (IBlockInputStream * stream_in, IBlockOutputStream * stream_out) mutable
            {
                QueryStatus * process_list_elem = context.getProcessListElement();

                if (!process_list_elem)
                    return;

                /// Update performance counters before logging to query_log
                CurrentThread::finalizePerformanceCounters();

                QueryStatusInfo info = process_list_elem->getInfo(true, context.getSettingsRef().log_profile_events);

                double elapsed_seconds = info.elapsed_seconds;

                elem.type = QueryLogElement::QUERY_FINISH;

                elem.event_time = time(nullptr);
                elem.query_duration_ms = elapsed_seconds * 1000;

                elem.read_rows = info.read_rows;
                elem.read_bytes = info.read_bytes;

                elem.written_rows = info.written_rows;
                elem.written_bytes = info.written_bytes;

                auto progress_callback = context.getProgressCallback();

                if (progress_callback)
                    progress_callback(Progress(WriteProgress(info.written_rows, info.written_bytes)));

                elem.memory_usage = info.peak_memory_usage > 0 ? info.peak_memory_usage : 0;

                if (stream_in)
                {
                    const BlockStreamProfileInfo & stream_in_info = stream_in->getProfileInfo();

                    /// NOTE: INSERT SELECT query contains zero metrics
                    elem.result_rows = stream_in_info.rows;
                    elem.result_bytes = stream_in_info.bytes;
                }
                else if (stream_out) /// will be used only for ordinary INSERT queries
                {
                    if (auto counting_stream = dynamic_cast<const CountingBlockOutputStream *>(stream_out))
                    {
                        /// NOTE: Redundancy. The same values could be extracted from process_list_elem->progress_out.query_settings = process_list_elem->progress_in
                        elem.result_rows = counting_stream->getProgress().read_rows;
                        elem.result_bytes = counting_stream->getProgress().read_bytes;
                    }
                }

                if (elem.read_rows != 0)
                {
                    LOG_INFO(&Logger::get("executeQuery"), std::fixed << std::setprecision(3)
                        << "Read " << elem.read_rows << " rows, "
                        << formatReadableSizeWithBinarySuffix(elem.read_bytes) << " in " << elapsed_seconds << " sec., "
                        << static_cast<size_t>(elem.read_rows / elapsed_seconds) << " rows/sec., "
                        << formatReadableSizeWithBinarySuffix(elem.read_bytes / elapsed_seconds) << "/sec.");
                }

                elem.thread_numbers = std::move(info.thread_numbers);
                elem.os_thread_ids = std::move(info.os_thread_ids);
                elem.profile_counters = std::move(info.profile_counters);

                if (log_queries)
                {
                    if (auto query_log = context.getQueryLog())
                        query_log->add(elem);
                }
            };

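            /// Called if the query fails during execution: log the exception together with the counters accumulated so far.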
            auto exception_callback = [elem, &context, log_queries] () mutable
            {
                context.getQuota()->used(Quota::ERRORS, 1, /* check_exceeded = */ false);

                elem.type = QueryLogElement::EXCEPTION_WHILE_PROCESSING;

                elem.event_time = time(nullptr);
                elem.query_duration_ms = 1000 * (elem.event_time - elem.query_start_time);
                elem.exception = getCurrentExceptionMessage(false);

                QueryStatus * process_list_elem = context.getProcessListElement();
                const Settings & current_settings = context.getSettingsRef();

                /// Update performance counters before logging to query_log
                CurrentThread::finalizePerformanceCounters();

                if (process_list_elem)
                {
                    QueryStatusInfo info = process_list_elem->getInfo(true, current_settings.log_profile_events, false);

                    elem.query_duration_ms = info.elapsed_seconds * 1000;

                    elem.read_rows = info.read_rows;
                    elem.read_bytes = info.read_bytes;

                    elem.memory_usage = info.peak_memory_usage > 0 ? info.peak_memory_usage : 0;

                    elem.thread_numbers = std::move(info.thread_numbers);
                    elem.os_thread_ids = std::move(info.os_thread_ids);
                    elem.profile_counters = std::move(info.profile_counters);
                }

                if (current_settings.calculate_text_stack_trace)
                    setExceptionStackTrace(elem);
                logException(context, elem);

                /// In case of exception, internal queries are logged as well.
                if (log_queries)
                {
                    if (auto query_log = context.getQueryLog())
                        query_log->add(elem);
                }
            };

            res.finish_callback = std::move(finish_callback);
            res.exception_callback = std::move(exception_callback);

            if (!internal && res.in)
            {
                std::stringstream log_str;
                log_str << "Query pipeline:\n";
                res.in->dumpTree(log_str);
                LOG_DEBUG(&Logger::get("executeQuery"), log_str.str());
            }
        }
    }
    catch (...)
    {
        if (!internal)
        {
            if (query_for_logging.empty())
                query_for_logging = prepareQueryForLogging(query, context);

            onExceptionBeforeStart(query_for_logging, context, current_time);
        }

        throw;
    }

    return std::make_tuple(ast, res);
}

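/// Execute a query given entirely as a string; any INSERT data must be embedded in the string itself
/// (there is no separate input stream).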
BlockIO executeQuery(
    const String & query,
    Context & context,
    bool internal,
    QueryProcessingStage::Enum stage,
    bool may_have_embedded_data,
    bool allow_processors)
{
    ASTPtr ast;
    BlockIO streams;
    std::tie(ast, streams) = executeQueryImpl(query.data(), query.data() + query.size(), context,
        internal, stage, !may_have_embedded_data, nullptr, allow_processors);
    if (streams.in)
    {
        const auto * ast_query_with_output = dynamic_cast<const ASTQueryWithOutput *>(ast.get());
        String format_name = ast_query_with_output && (ast_query_with_output->format != nullptr)
            ? getIdentifierName(ast_query_with_output->format) : context.getDefaultFormat();
        if (format_name == "Null")
            streams.null_format = true;
    }
    return streams;
}

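/// Parse the query from 'istr' (possibly followed by INSERT data in the same stream), execute it
/// and write the formatted result to 'ostr'. Used when the query arrives together with its data,
/// e.g. over the HTTP interface.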
void executeQuery(
    ReadBuffer & istr,
    WriteBuffer & ostr,
    bool allow_into_outfile,
    Context & context,
    std::function<void(const String &)> set_content_type,
    std::function<void(const String &)> set_query_id)
{
    PODArray<char> parse_buf;
    const char * begin;
    const char * end;

    /// If 'istr' is empty now, fetch the next data into the buffer.
    if (istr.buffer().size() == 0)
        istr.next();

    size_t max_query_size = context.getSettingsRef().max_query_size;

    bool may_have_tail;
    if (istr.buffer().end() - istr.position() > static_cast<ssize_t>(max_query_size))
    {
        /// If the remaining buffer space in 'istr' is enough to parse the query up to 'max_query_size' bytes, then parse in place.
        begin = istr.position();
        end = istr.buffer().end();
        istr.position() += end - begin;
        /// Actually we don't know whether the query has additional data or not.
        /// But we can't check istr.eof(), because the begin and end pointers would become invalid.
        may_have_tail = true;
    }
    else
    {
        /// If not - copy enough data into 'parse_buf'.
        WriteBufferFromVector<PODArray<char>> out(parse_buf);
        LimitReadBuffer limit(istr, max_query_size + 1, false);
        copyData(limit, out);
        out.finish();

        begin = parse_buf.data();
        end = begin + parse_buf.size();
        /// We can check the stream for eof, because we have copied the data.
        may_have_tail = !istr.eof();
    }

    ASTPtr ast;
    BlockIO streams;

    std::tie(ast, streams) = executeQueryImpl(begin, end, context, false, QueryProcessingStage::Complete, may_have_tail, &istr, true);

    auto & pipeline = streams.pipeline;

    try
    {
        if (streams.out)
        {
            InputStreamFromASTInsertQuery in(ast, &istr, streams.out->getHeader(), context, nullptr);
            copyData(in, *streams.out);
        }

        if (streams.in)
        {
            /// FIXME: try to prettify this cast using `as<>()`
            const auto * ast_query_with_output = dynamic_cast<const ASTQueryWithOutput *>(ast.get());

            WriteBuffer * out_buf = &ostr;
            std::optional<WriteBufferFromFile> out_file_buf;
            if (ast_query_with_output && ast_query_with_output->out_file)
            {
                if (!allow_into_outfile)
                    throw Exception("INTO OUTFILE is not allowed", ErrorCodes::INTO_OUTFILE_NOT_ALLOWED);

                const auto & out_file = ast_query_with_output->out_file->as<ASTLiteral &>().value.safeGet<std::string>();
                out_file_buf.emplace(out_file, DBMS_DEFAULT_BUFFER_SIZE, O_WRONLY | O_EXCL | O_CREAT);
                out_buf = &*out_file_buf;
            }

            String format_name = ast_query_with_output && (ast_query_with_output->format != nullptr)
                ? getIdentifierName(ast_query_with_output->format)
                : context.getDefaultFormat();

            if (ast_query_with_output && ast_query_with_output->settings_ast)
                InterpreterSetQuery(ast_query_with_output->settings_ast, context).executeForCurrentContext();

            BlockOutputStreamPtr out = context.getOutputFormat(format_name, *out_buf, streams.in->getHeader());

            /// Save previous progress callback if any. TODO Do it more conveniently.
            auto previous_progress_callback = context.getProgressCallback();

            /// NOTE Progress callback takes shared ownership of 'out'.
            streams.in->setProgressCallback([out, previous_progress_callback] (const Progress & progress)
            {
                if (previous_progress_callback)
                    previous_progress_callback(progress);
                out->onProgress(progress);
            });

            if (set_content_type)
                set_content_type(out->getContentType());

            if (set_query_id)
                set_query_id(context.getClientInfo().current_query_id);

            if (ast->as<ASTWatchQuery>())
            {
                /// For Watch query, flush data if block is empty (to send data to client).
                auto flush_callback = [&out](const Block & block)
                {
                    if (block.rows() == 0)
                        out->flush();
                };

                copyData(*streams.in, *out, [](){ return false; }, std::move(flush_callback));
            }
            else
                copyData(*streams.in, *out);
        }

        if (pipeline.initialized())
        {
            const ASTQueryWithOutput * ast_query_with_output = dynamic_cast<const ASTQueryWithOutput *>(ast.get());

            WriteBuffer * out_buf = &ostr;
            std::optional<WriteBufferFromFile> out_file_buf;
            if (ast_query_with_output && ast_query_with_output->out_file)
            {
                if (!allow_into_outfile)
                    throw Exception("INTO OUTFILE is not allowed", ErrorCodes::INTO_OUTFILE_NOT_ALLOWED);

                const auto & out_file = typeid_cast<const ASTLiteral &>(*ast_query_with_output->out_file).value.safeGet<std::string>();
                out_file_buf.emplace(out_file, DBMS_DEFAULT_BUFFER_SIZE, O_WRONLY | O_EXCL | O_CREAT);
                out_buf = &*out_file_buf;
            }

            String format_name = ast_query_with_output && (ast_query_with_output->format != nullptr)
                ? getIdentifierName(ast_query_with_output->format)
                : context.getDefaultFormat();

            if (ast_query_with_output && ast_query_with_output->settings_ast)
                InterpreterSetQuery(ast_query_with_output->settings_ast, context).executeForCurrentContext();

            pipeline.addSimpleTransform([](const Block & header)
            {
                return std::make_shared<MaterializingTransform>(header);
            });

            auto out = context.getOutputFormatProcessor(format_name, *out_buf, pipeline.getHeader());

            /// Save previous progress callback if any. TODO Do it more conveniently.
            auto previous_progress_callback = context.getProgressCallback();

            /// NOTE Progress callback takes shared ownership of 'out'.
            pipeline.setProgressCallback([out, previous_progress_callback] (const Progress & progress)
            {
                if (previous_progress_callback)
                    previous_progress_callback(progress);
                out->onProgress(progress);
            });

            if (set_content_type)
                set_content_type(out->getContentType());

            if (set_query_id)
                set_query_id(context.getClientInfo().current_query_id);

            pipeline.setOutput(std::move(out));

            {
                auto executor = pipeline.execute();
                executor->execute(context.getSettingsRef().max_threads);
            }

            pipeline.finalize();
        }
    }
    catch (...)
    {
        streams.onException();
        throw;
    }

    streams.onFinish();
}

}