| 1 | #include <Common/formatReadable.h> | 
|---|
| 2 | #include <Common/PODArray.h> | 
|---|
| 3 | #include <Common/typeid_cast.h> | 
|---|
| 4 |  | 
|---|
| 5 | #include <IO/ConcatReadBuffer.h> | 
|---|
| 6 | #include <IO/WriteBufferFromFile.h> | 
|---|
| 7 | #include <IO/WriteBufferFromVector.h> | 
|---|
| 8 | #include <IO/LimitReadBuffer.h> | 
|---|
| 9 | #include <IO/copyData.h> | 
|---|
| 10 |  | 
|---|
| 11 | #include <DataStreams/BlockIO.h> | 
|---|
| 12 | #include <DataStreams/copyData.h> | 
|---|
| 13 | #include <DataStreams/IBlockInputStream.h> | 
|---|
| 14 | #include <DataStreams/InputStreamFromASTInsertQuery.h> | 
|---|
| 15 | #include <DataStreams/CountingBlockOutputStream.h> | 
|---|
| 16 |  | 
|---|
| 17 | #include <Parsers/ASTInsertQuery.h> | 
|---|
| 18 | #include <Parsers/ASTShowProcesslistQuery.h> | 
|---|
| 19 | #include <Parsers/ASTIdentifier.h> | 
|---|
| 20 | #include <Parsers/ASTLiteral.h> | 
|---|
| 21 | #include <Parsers/ParserQuery.h> | 
|---|
| 22 | #include <Parsers/parseQuery.h> | 
|---|
| 23 | #include <Parsers/queryToString.h> | 
|---|
| 24 |  | 
|---|
| 25 | #include <Storages/StorageInput.h> | 
|---|
| 26 |  | 
|---|
| 27 | #include <Access/QuotaContext.h> | 
|---|
| 28 | #include <Interpreters/InterpreterFactory.h> | 
|---|
| 29 | #include <Interpreters/ProcessList.h> | 
|---|
| 30 | #include <Interpreters/QueryLog.h> | 
|---|
| 31 | #include <Interpreters/InterpreterSetQuery.h> | 
|---|
| 32 | #include <Interpreters/ReplaceQueryParameterVisitor.h> | 
|---|
| 33 | #include <Interpreters/executeQuery.h> | 
|---|
| 34 | #include <Common/ProfileEvents.h> | 
|---|
| 35 |  | 
|---|
| 36 | #include <Interpreters/DNSCacheUpdater.h> | 
|---|
| 37 | #include <Common/SensitiveDataMasker.h> | 
|---|
| 38 |  | 
|---|
| 39 | #include <Processors/Transforms/LimitsCheckingTransform.h> | 
|---|
| 40 | #include <Processors/Transforms/MaterializingTransform.h> | 
|---|
| 41 | #include <Processors/Formats/IOutputFormat.h> | 
|---|
| 42 | #include <Parsers/ASTWatchQuery.h> | 
|---|
| 43 |  | 
|---|
| 44 | namespace ProfileEvents | 
|---|
| 45 | { | 
|---|
| 46 | extern const Event QueryMaskingRulesMatch; | 
|---|
| 47 | } | 
|---|
| 48 |  | 
|---|
| 49 | namespace DB | 
|---|
| 50 | { | 
|---|
| 51 |  | 
|---|
/// Error codes referenced in this file (declared here, defined elsewhere).
namespace ErrorCodes
{
    extern const int LOGICAL_ERROR;
    extern const int QUERY_IS_TOO_LARGE;
    extern const int INTO_OUTFILE_NOT_ALLOWED;
    extern const int QUERY_WAS_CANCELLED;
}
|---|
| 59 |  | 
|---|
| 60 |  | 
|---|
| 61 | static void checkASTSizeLimits(const IAST & ast, const Settings & settings) | 
|---|
| 62 | { | 
|---|
| 63 | if (settings.max_ast_depth) | 
|---|
| 64 | ast.checkDepth(settings.max_ast_depth); | 
|---|
| 65 | if (settings.max_ast_elements) | 
|---|
| 66 | ast.checkSize(settings.max_ast_elements); | 
|---|
| 67 | } | 
|---|
| 68 |  | 
|---|
/// Return a copy of `query` in which every '\n' is replaced by a single space,
/// so that the query can be written to the text log as one line.
/// NOTE This is wrong in case of single-line comments and in case of multiline string literals.
static String joinLines(const String & query)
{
    String res = query;
    std::replace(res.begin(), res.end(), '\n', ' ');
    return res;
}
|---|
| 76 |  | 
|---|
| 77 |  | 
|---|
| 78 | static String prepareQueryForLogging(const String & query, Context & context) | 
|---|
| 79 | { | 
|---|
| 80 | String res = query; | 
|---|
| 81 |  | 
|---|
| 82 | // wiping sensitive data before cropping query by log_queries_cut_to_length, | 
|---|
| 83 | // otherwise something like credit card without last digit can go to log | 
|---|
| 84 | if (auto masker = SensitiveDataMasker::getInstance()) | 
|---|
| 85 | { | 
|---|
| 86 | auto matches = masker->wipeSensitiveData(res); | 
|---|
| 87 | if (matches > 0) | 
|---|
| 88 | { | 
|---|
| 89 | ProfileEvents::increment(ProfileEvents::QueryMaskingRulesMatch, matches); | 
|---|
| 90 | } | 
|---|
| 91 | } | 
|---|
| 92 |  | 
|---|
| 93 | res = res.substr(0, context.getSettingsRef().log_queries_cut_to_length); | 
|---|
| 94 |  | 
|---|
| 95 | return res; | 
|---|
| 96 | } | 
|---|
| 97 |  | 
|---|
| 98 |  | 
|---|
| 99 | /// Log query into text log (not into system table). | 
|---|
| 100 | static void logQuery(const String & query, const Context & context, bool internal) | 
|---|
| 101 | { | 
|---|
| 102 | if (internal) | 
|---|
| 103 | { | 
|---|
| 104 | LOG_DEBUG(&Logger::get( "executeQuery"), "(internal) "<< joinLines(query)); | 
|---|
| 105 | } | 
|---|
| 106 | else | 
|---|
| 107 | { | 
|---|
| 108 | const auto & current_query_id = context.getClientInfo().current_query_id; | 
|---|
| 109 | const auto & initial_query_id = context.getClientInfo().initial_query_id; | 
|---|
| 110 | const auto & current_user = context.getClientInfo().current_user; | 
|---|
| 111 |  | 
|---|
| 112 | LOG_DEBUG(&Logger::get( "executeQuery"), "(from "<< context.getClientInfo().current_address.toString() | 
|---|
| 113 | << (current_user != "default"? ", user: "+ context.getClientInfo().current_user : "") | 
|---|
| 114 | << (!initial_query_id.empty() && current_query_id != initial_query_id ? ", initial_query_id: "+ initial_query_id : std::string()) | 
|---|
| 115 | << ") " | 
|---|
| 116 | << joinLines(query)); | 
|---|
| 117 | } | 
|---|
| 118 | } | 
|---|
| 119 |  | 
|---|
| 120 |  | 
|---|
| 121 | /// Call this inside catch block. | 
|---|
| 122 | static void setExceptionStackTrace(QueryLogElement & elem) | 
|---|
| 123 | { | 
|---|
| 124 | /// Disable memory tracker for stack trace. | 
|---|
| 125 | /// Because if exception is "Memory limit (for query) exceed", then we probably can't allocate another one string. | 
|---|
| 126 | auto temporarily_disable_memory_tracker = getCurrentMemoryTrackerActionLock(); | 
|---|
| 127 |  | 
|---|
| 128 | try | 
|---|
| 129 | { | 
|---|
| 130 | throw; | 
|---|
| 131 | } | 
|---|
| 132 | catch (const Exception & e) | 
|---|
| 133 | { | 
|---|
| 134 | elem.stack_trace = e.getStackTrace().toString(); | 
|---|
| 135 | } | 
|---|
| 136 | catch (...) {} | 
|---|
| 137 | } | 
|---|
| 138 |  | 
|---|
| 139 |  | 
|---|
| 140 | /// Log exception (with query info) into text log (not into system table). | 
|---|
| 141 | static void logException(Context & context, QueryLogElement & elem) | 
|---|
| 142 | { | 
|---|
| 143 | LOG_ERROR(&Logger::get( "executeQuery"), elem.exception | 
|---|
| 144 | << " (from "<< context.getClientInfo().current_address.toString() << ")" | 
|---|
| 145 | << " (in query: "<< joinLines(elem.query) << ")" | 
|---|
| 146 | << (!elem.stack_trace.empty() ? ", Stack trace (when copying this message, always include the lines below):\n\n"+ elem.stack_trace : "")); | 
|---|
| 147 | } | 
|---|
| 148 |  | 
|---|
| 149 |  | 
|---|
| 150 | static void onExceptionBeforeStart(const String & query_for_logging, Context & context, time_t current_time) | 
|---|
| 151 | { | 
|---|
| 152 | /// Exception before the query execution. | 
|---|
| 153 | context.getQuota()->used(Quota::ERRORS, 1, /* check_exceeded = */ false); | 
|---|
| 154 |  | 
|---|
| 155 | const Settings & settings = context.getSettingsRef(); | 
|---|
| 156 |  | 
|---|
| 157 | /// Log the start of query execution into the table if necessary. | 
|---|
| 158 | QueryLogElement elem; | 
|---|
| 159 |  | 
|---|
| 160 | elem.type = QueryLogElement::EXCEPTION_BEFORE_START; | 
|---|
| 161 |  | 
|---|
| 162 | elem.event_time = current_time; | 
|---|
| 163 | elem.query_start_time = current_time; | 
|---|
| 164 |  | 
|---|
| 165 | elem.query = query_for_logging; | 
|---|
| 166 | elem.exception = getCurrentExceptionMessage(false); | 
|---|
| 167 |  | 
|---|
| 168 | elem.client_info = context.getClientInfo(); | 
|---|
| 169 |  | 
|---|
| 170 | if (settings.calculate_text_stack_trace) | 
|---|
| 171 | setExceptionStackTrace(elem); | 
|---|
| 172 | logException(context, elem); | 
|---|
| 173 |  | 
|---|
| 174 | /// Update performance counters before logging to query_log | 
|---|
| 175 | CurrentThread::finalizePerformanceCounters(); | 
|---|
| 176 |  | 
|---|
| 177 | if (settings.log_queries) | 
|---|
| 178 | if (auto query_log = context.getQueryLog()) | 
|---|
| 179 | query_log->add(elem); | 
|---|
| 180 | } | 
|---|
| 181 |  | 
|---|
| 182 |  | 
|---|
| 183 | static std::tuple<ASTPtr, BlockIO> executeQueryImpl( | 
|---|
| 184 | const char * begin, | 
|---|
| 185 | const char * end, | 
|---|
| 186 | Context & context, | 
|---|
| 187 | bool internal, | 
|---|
| 188 | QueryProcessingStage::Enum stage, | 
|---|
| 189 | bool has_query_tail, | 
|---|
| 190 | ReadBuffer * istr, | 
|---|
| 191 | bool allow_processors) | 
|---|
| 192 | { | 
|---|
| 193 | time_t current_time = time(nullptr); | 
|---|
| 194 |  | 
|---|
| 195 | /// If we already executing query and it requires to execute internal query, than | 
|---|
| 196 | /// don't replace thread context with given (it can be temporary). Otherwise, attach context to thread. | 
|---|
| 197 | if (!internal) | 
|---|
| 198 | { | 
|---|
| 199 | context.makeQueryContext(); | 
|---|
| 200 | CurrentThread::attachQueryContext(context); | 
|---|
| 201 | } | 
|---|
| 202 |  | 
|---|
| 203 | const Settings & settings = context.getSettingsRef(); | 
|---|
| 204 |  | 
|---|
| 205 | ParserQuery parser(end, settings.enable_debug_queries); | 
|---|
| 206 | ASTPtr ast; | 
|---|
| 207 | const char * query_end; | 
|---|
| 208 |  | 
|---|
| 209 | /// Don't limit the size of internal queries. | 
|---|
| 210 | size_t max_query_size = 0; | 
|---|
| 211 | if (!internal) | 
|---|
| 212 | max_query_size = settings.max_query_size; | 
|---|
| 213 |  | 
|---|
| 214 | try | 
|---|
| 215 | { | 
|---|
| 216 | /// TODO Parser should fail early when max_query_size limit is reached. | 
|---|
| 217 | ast = parseQuery(parser, begin, end, "", max_query_size); | 
|---|
| 218 |  | 
|---|
| 219 | auto * insert_query = ast->as<ASTInsertQuery>(); | 
|---|
| 220 |  | 
|---|
| 221 | if (insert_query && insert_query->settings_ast) | 
|---|
| 222 | InterpreterSetQuery(insert_query->settings_ast, context).executeForCurrentContext(); | 
|---|
| 223 |  | 
|---|
| 224 | if (insert_query && insert_query->data) | 
|---|
| 225 | { | 
|---|
| 226 | query_end = insert_query->data; | 
|---|
| 227 | insert_query->has_tail = has_query_tail; | 
|---|
| 228 | } | 
|---|
| 229 | else | 
|---|
| 230 | { | 
|---|
| 231 | query_end = end; | 
|---|
| 232 | } | 
|---|
| 233 | } | 
|---|
| 234 | catch (...) | 
|---|
| 235 | { | 
|---|
| 236 | /// Anyway log the query. | 
|---|
| 237 | String query = String(begin, begin + std::min(end - begin, static_cast<ptrdiff_t>(max_query_size))); | 
|---|
| 238 |  | 
|---|
| 239 | auto query_for_logging = prepareQueryForLogging(query, context); | 
|---|
| 240 | logQuery(query_for_logging, context, internal); | 
|---|
| 241 |  | 
|---|
| 242 | if (!internal) | 
|---|
| 243 | onExceptionBeforeStart(query_for_logging, context, current_time); | 
|---|
| 244 |  | 
|---|
| 245 | throw; | 
|---|
| 246 | } | 
|---|
| 247 |  | 
|---|
| 248 | /// Copy query into string. It will be written to log and presented in processlist. If an INSERT query, string will not include data to insertion. | 
|---|
| 249 | String query(begin, query_end); | 
|---|
| 250 | BlockIO res; | 
|---|
| 251 | QueryPipeline & pipeline = res.pipeline; | 
|---|
| 252 |  | 
|---|
| 253 | String query_for_logging = ""; | 
|---|
| 254 |  | 
|---|
| 255 | try | 
|---|
| 256 | { | 
|---|
| 257 | /// Replace ASTQueryParameter with ASTLiteral for prepared statements. | 
|---|
| 258 | if (context.hasQueryParameters()) | 
|---|
| 259 | { | 
|---|
| 260 | ReplaceQueryParameterVisitor visitor(context.getQueryParameters()); | 
|---|
| 261 | visitor.visit(ast); | 
|---|
| 262 |  | 
|---|
| 263 | /// Get new query after substitutions. | 
|---|
| 264 | query = serializeAST(*ast); | 
|---|
| 265 | } | 
|---|
| 266 |  | 
|---|
| 267 | query_for_logging = prepareQueryForLogging(query, context); | 
|---|
| 268 |  | 
|---|
| 269 | logQuery(query_for_logging, context, internal); | 
|---|
| 270 |  | 
|---|
| 271 | /// Check the limits. | 
|---|
| 272 | checkASTSizeLimits(*ast, settings); | 
|---|
| 273 |  | 
|---|
| 274 | /// Put query to process list. But don't put SHOW PROCESSLIST query itself. | 
|---|
| 275 | ProcessList::EntryPtr process_list_entry; | 
|---|
| 276 | if (!internal && !ast->as<ASTShowProcesslistQuery>()) | 
|---|
| 277 | { | 
|---|
| 278 | /// processlist also has query masked now, to avoid secrets leaks though SHOW PROCESSLIST by other users. | 
|---|
| 279 | process_list_entry = context.getProcessList().insert(query_for_logging, ast.get(), context); | 
|---|
| 280 | context.setProcessListElement(&process_list_entry->get()); | 
|---|
| 281 | } | 
|---|
| 282 |  | 
|---|
| 283 | /// Load external tables if they were provided | 
|---|
| 284 | context.initializeExternalTablesIfSet(); | 
|---|
| 285 |  | 
|---|
| 286 | auto * insert_query = ast->as<ASTInsertQuery>(); | 
|---|
| 287 | if (insert_query && insert_query->select) | 
|---|
| 288 | { | 
|---|
| 289 | /// Prepare Input storage before executing interpreter if we already got a buffer with data. | 
|---|
| 290 | if (istr) | 
|---|
| 291 | { | 
|---|
| 292 | ASTPtr input_function; | 
|---|
| 293 | insert_query->tryFindInputFunction(input_function); | 
|---|
| 294 | if (input_function) | 
|---|
| 295 | { | 
|---|
| 296 | StoragePtr storage = context.executeTableFunction(input_function); | 
|---|
| 297 | auto & input_storage = dynamic_cast<StorageInput &>(*storage); | 
|---|
| 298 | BlockInputStreamPtr input_stream = std::make_shared<InputStreamFromASTInsertQuery>(ast, istr, | 
|---|
| 299 | input_storage.getSampleBlock(), context, input_function); | 
|---|
| 300 | input_storage.setInputStream(input_stream); | 
|---|
| 301 | } | 
|---|
| 302 | } | 
|---|
| 303 | } | 
|---|
| 304 | else | 
|---|
| 305 | /// reset Input callbacks if query is not INSERT SELECT | 
|---|
| 306 | context.resetInputCallbacks(); | 
|---|
| 307 |  | 
|---|
| 308 | auto interpreter = InterpreterFactory::get(ast, context, stage); | 
|---|
| 309 | bool use_processors = settings.experimental_use_processors && allow_processors && interpreter->canExecuteWithProcessors(); | 
|---|
| 310 |  | 
|---|
| 311 | QuotaContextPtr quota; | 
|---|
| 312 | if (!interpreter->ignoreQuota()) | 
|---|
| 313 | { | 
|---|
| 314 | quota = context.getQuota(); | 
|---|
| 315 | quota->used(Quota::QUERIES, 1); | 
|---|
| 316 | quota->checkExceeded(Quota::ERRORS); | 
|---|
| 317 | } | 
|---|
| 318 |  | 
|---|
| 319 | IBlockInputStream::LocalLimits limits; | 
|---|
| 320 | if (!interpreter->ignoreLimits()) | 
|---|
| 321 | { | 
|---|
| 322 | limits.mode = IBlockInputStream::LIMITS_CURRENT; | 
|---|
| 323 | limits.size_limits = SizeLimits(settings.max_result_rows, settings.max_result_bytes, settings.result_overflow_mode); | 
|---|
| 324 | } | 
|---|
| 325 |  | 
|---|
| 326 | if (use_processors) | 
|---|
| 327 | pipeline = interpreter->executeWithProcessors(); | 
|---|
| 328 | else | 
|---|
| 329 | res = interpreter->execute(); | 
|---|
| 330 |  | 
|---|
| 331 | if (auto * insert_interpreter = typeid_cast<const InterpreterInsertQuery *>(&*interpreter)) | 
|---|
| 332 | { | 
|---|
| 333 | /// Save insertion table (not table function). TODO: support remote() table function. | 
|---|
| 334 | auto db_table = insert_interpreter->getDatabaseTable(); | 
|---|
| 335 | if (!db_table.second.empty()) | 
|---|
| 336 | context.setInsertionTable(std::move(db_table)); | 
|---|
| 337 | } | 
|---|
| 338 |  | 
|---|
| 339 | if (process_list_entry) | 
|---|
| 340 | { | 
|---|
| 341 | /// Query was killed before execution | 
|---|
| 342 | if ((*process_list_entry)->isKilled()) | 
|---|
| 343 | throw Exception( "Query '"+ (*process_list_entry)->getInfo().client_info.current_query_id + "' is killed in pending state", | 
|---|
| 344 | ErrorCodes::QUERY_WAS_CANCELLED); | 
|---|
| 345 | else if (!use_processors) | 
|---|
| 346 | (*process_list_entry)->setQueryStreams(res); | 
|---|
| 347 | } | 
|---|
| 348 |  | 
|---|
| 349 | /// Hold element of process list till end of query execution. | 
|---|
| 350 | res.process_list_entry = process_list_entry; | 
|---|
| 351 |  | 
|---|
| 352 | if (use_processors) | 
|---|
| 353 | { | 
|---|
| 354 | /// Limits on the result, the quota on the result, and also callback for progress. | 
|---|
| 355 | /// Limits apply only to the final result. | 
|---|
| 356 | pipeline.setProgressCallback(context.getProgressCallback()); | 
|---|
| 357 | pipeline.setProcessListElement(context.getProcessListElement()); | 
|---|
| 358 | if (stage == QueryProcessingStage::Complete) | 
|---|
| 359 | { | 
|---|
| 360 | pipeline.resize(1); | 
|---|
| 361 | pipeline.addSimpleTransform([&](const Block & ) | 
|---|
| 362 | { | 
|---|
| 363 | auto transform = std::make_shared<LimitsCheckingTransform>(header, limits); | 
|---|
| 364 | transform->setQuota(quota); | 
|---|
| 365 | return transform; | 
|---|
| 366 | }); | 
|---|
| 367 | } | 
|---|
| 368 | } | 
|---|
| 369 | else | 
|---|
| 370 | { | 
|---|
| 371 | /// Limits on the result, the quota on the result, and also callback for progress. | 
|---|
| 372 | /// Limits apply only to the final result. | 
|---|
| 373 | if (res.in) | 
|---|
| 374 | { | 
|---|
| 375 | res.in->setProgressCallback(context.getProgressCallback()); | 
|---|
| 376 | res.in->setProcessListElement(context.getProcessListElement()); | 
|---|
| 377 | if (stage == QueryProcessingStage::Complete) | 
|---|
| 378 | { | 
|---|
| 379 | if (!interpreter->ignoreQuota()) | 
|---|
| 380 | res.in->setQuota(quota); | 
|---|
| 381 | if (!interpreter->ignoreLimits()) | 
|---|
| 382 | res.in->setLimits(limits); | 
|---|
| 383 | } | 
|---|
| 384 | } | 
|---|
| 385 |  | 
|---|
| 386 | if (res.out) | 
|---|
| 387 | { | 
|---|
| 388 | if (auto stream = dynamic_cast<CountingBlockOutputStream *>(res.out.get())) | 
|---|
| 389 | { | 
|---|
| 390 | stream->setProcessListElement(context.getProcessListElement()); | 
|---|
| 391 | } | 
|---|
| 392 | } | 
|---|
| 393 | } | 
|---|
| 394 |  | 
|---|
| 395 | /// Everything related to query log. | 
|---|
| 396 | { | 
|---|
| 397 | QueryLogElement elem; | 
|---|
| 398 |  | 
|---|
| 399 | elem.type = QueryLogElement::QUERY_START; | 
|---|
| 400 |  | 
|---|
| 401 | elem.event_time = current_time; | 
|---|
| 402 | elem.query_start_time = current_time; | 
|---|
| 403 |  | 
|---|
| 404 | elem.query = query_for_logging; | 
|---|
| 405 |  | 
|---|
| 406 | elem.client_info = context.getClientInfo(); | 
|---|
| 407 |  | 
|---|
| 408 | bool log_queries = settings.log_queries && !internal; | 
|---|
| 409 |  | 
|---|
| 410 | /// Log into system table start of query execution, if need. | 
|---|
| 411 | if (log_queries) | 
|---|
| 412 | { | 
|---|
| 413 | if (settings.log_query_settings) | 
|---|
| 414 | elem.query_settings = std::make_shared<Settings>(context.getSettingsRef()); | 
|---|
| 415 |  | 
|---|
| 416 | if (auto query_log = context.getQueryLog()) | 
|---|
| 417 | query_log->add(elem); | 
|---|
| 418 | } | 
|---|
| 419 |  | 
|---|
| 420 | /// Also make possible for caller to log successful query finish and exception during execution. | 
|---|
| 421 | auto finish_callback = [elem, &context, log_queries] (IBlockInputStream * stream_in, IBlockOutputStream * stream_out) mutable | 
|---|
| 422 | { | 
|---|
| 423 | QueryStatus * process_list_elem = context.getProcessListElement(); | 
|---|
| 424 |  | 
|---|
| 425 | if (!process_list_elem) | 
|---|
| 426 | return; | 
|---|
| 427 |  | 
|---|
| 428 | /// Update performance counters before logging to query_log | 
|---|
| 429 | CurrentThread::finalizePerformanceCounters(); | 
|---|
| 430 |  | 
|---|
| 431 | QueryStatusInfo info = process_list_elem->getInfo(true, context.getSettingsRef().log_profile_events); | 
|---|
| 432 |  | 
|---|
| 433 | double elapsed_seconds = info.elapsed_seconds; | 
|---|
| 434 |  | 
|---|
| 435 | elem.type = QueryLogElement::QUERY_FINISH; | 
|---|
| 436 |  | 
|---|
| 437 | elem.event_time = time(nullptr); | 
|---|
| 438 | elem.query_duration_ms = elapsed_seconds * 1000; | 
|---|
| 439 |  | 
|---|
| 440 | elem.read_rows = info.read_rows; | 
|---|
| 441 | elem.read_bytes = info.read_bytes; | 
|---|
| 442 |  | 
|---|
| 443 | elem.written_rows = info.written_rows; | 
|---|
| 444 | elem.written_bytes = info.written_bytes; | 
|---|
| 445 |  | 
|---|
| 446 | auto progress_callback = context.getProgressCallback(); | 
|---|
| 447 |  | 
|---|
| 448 | if (progress_callback) | 
|---|
| 449 | progress_callback(Progress(WriteProgress(info.written_rows, info.written_bytes))); | 
|---|
| 450 |  | 
|---|
| 451 | elem.memory_usage = info.peak_memory_usage > 0 ? info.peak_memory_usage : 0; | 
|---|
| 452 |  | 
|---|
| 453 | if (stream_in) | 
|---|
| 454 | { | 
|---|
| 455 | const BlockStreamProfileInfo & stream_in_info = stream_in->getProfileInfo(); | 
|---|
| 456 |  | 
|---|
| 457 | /// NOTE: INSERT SELECT query contains zero metrics | 
|---|
| 458 | elem.result_rows = stream_in_info.rows; | 
|---|
| 459 | elem.result_bytes = stream_in_info.bytes; | 
|---|
| 460 | } | 
|---|
| 461 | else if (stream_out) /// will be used only for ordinary INSERT queries | 
|---|
| 462 | { | 
|---|
| 463 | if (auto counting_stream = dynamic_cast<const CountingBlockOutputStream *>(stream_out)) | 
|---|
| 464 | { | 
|---|
| 465 | /// NOTE: Redundancy. The same values could be extracted from process_list_elem->progress_out.query_settings = process_list_elem->progress_in | 
|---|
| 466 | elem.result_rows = counting_stream->getProgress().read_rows; | 
|---|
| 467 | elem.result_bytes = counting_stream->getProgress().read_bytes; | 
|---|
| 468 | } | 
|---|
| 469 | } | 
|---|
| 470 |  | 
|---|
| 471 | if (elem.read_rows != 0) | 
|---|
| 472 | { | 
|---|
| 473 | LOG_INFO(&Logger::get( "executeQuery"), std::fixed << std::setprecision(3) | 
|---|
| 474 | << "Read "<< elem.read_rows << " rows, " | 
|---|
| 475 | << formatReadableSizeWithBinarySuffix(elem.read_bytes) << " in "<< elapsed_seconds << " sec., " | 
|---|
| 476 | << static_cast<size_t>(elem.read_rows / elapsed_seconds) << " rows/sec., " | 
|---|
| 477 | << formatReadableSizeWithBinarySuffix(elem.read_bytes / elapsed_seconds) << "/sec."); | 
|---|
| 478 | } | 
|---|
| 479 |  | 
|---|
| 480 | elem.thread_numbers = std::move(info.thread_numbers); | 
|---|
| 481 | elem.os_thread_ids = std::move(info.os_thread_ids); | 
|---|
| 482 | elem.profile_counters = std::move(info.profile_counters); | 
|---|
| 483 |  | 
|---|
| 484 | if (log_queries) | 
|---|
| 485 | { | 
|---|
| 486 | if (auto query_log = context.getQueryLog()) | 
|---|
| 487 | query_log->add(elem); | 
|---|
| 488 | } | 
|---|
| 489 | }; | 
|---|
| 490 |  | 
|---|
| 491 | auto exception_callback = [elem, &context, log_queries] () mutable | 
|---|
| 492 | { | 
|---|
| 493 | context.getQuota()->used(Quota::ERRORS, 1, /* check_exceeded = */ false); | 
|---|
| 494 |  | 
|---|
| 495 | elem.type = QueryLogElement::EXCEPTION_WHILE_PROCESSING; | 
|---|
| 496 |  | 
|---|
| 497 | elem.event_time = time(nullptr); | 
|---|
| 498 | elem.query_duration_ms = 1000 * (elem.event_time - elem.query_start_time); | 
|---|
| 499 | elem.exception = getCurrentExceptionMessage(false); | 
|---|
| 500 |  | 
|---|
| 501 | QueryStatus * process_list_elem = context.getProcessListElement(); | 
|---|
| 502 | const Settings & current_settings = context.getSettingsRef(); | 
|---|
| 503 |  | 
|---|
| 504 | /// Update performance counters before logging to query_log | 
|---|
| 505 | CurrentThread::finalizePerformanceCounters(); | 
|---|
| 506 |  | 
|---|
| 507 | if (process_list_elem) | 
|---|
| 508 | { | 
|---|
| 509 | QueryStatusInfo info = process_list_elem->getInfo(true, current_settings.log_profile_events, false); | 
|---|
| 510 |  | 
|---|
| 511 | elem.query_duration_ms = info.elapsed_seconds * 1000; | 
|---|
| 512 |  | 
|---|
| 513 | elem.read_rows = info.read_rows; | 
|---|
| 514 | elem.read_bytes = info.read_bytes; | 
|---|
| 515 |  | 
|---|
| 516 | elem.memory_usage = info.peak_memory_usage > 0 ? info.peak_memory_usage : 0; | 
|---|
| 517 |  | 
|---|
| 518 | elem.thread_numbers = std::move(info.thread_numbers); | 
|---|
| 519 | elem.os_thread_ids = std::move(info.os_thread_ids); | 
|---|
| 520 | elem.profile_counters = std::move(info.profile_counters); | 
|---|
| 521 | } | 
|---|
| 522 |  | 
|---|
| 523 | if (current_settings.calculate_text_stack_trace) | 
|---|
| 524 | setExceptionStackTrace(elem); | 
|---|
| 525 | logException(context, elem); | 
|---|
| 526 |  | 
|---|
| 527 | /// In case of exception we log internal queries also | 
|---|
| 528 | if (log_queries) | 
|---|
| 529 | { | 
|---|
| 530 | if (auto query_log = context.getQueryLog()) | 
|---|
| 531 | query_log->add(elem); | 
|---|
| 532 | } | 
|---|
| 533 | }; | 
|---|
| 534 |  | 
|---|
| 535 | res.finish_callback = std::move(finish_callback); | 
|---|
| 536 | res.exception_callback = std::move(exception_callback); | 
|---|
| 537 |  | 
|---|
| 538 | if (!internal && res.in) | 
|---|
| 539 | { | 
|---|
| 540 | std::stringstream log_str; | 
|---|
| 541 | log_str << "Query pipeline:\n"; | 
|---|
| 542 | res.in->dumpTree(log_str); | 
|---|
| 543 | LOG_DEBUG(&Logger::get( "executeQuery"), log_str.str()); | 
|---|
| 544 | } | 
|---|
| 545 | } | 
|---|
| 546 | } | 
|---|
| 547 | catch (...) | 
|---|
| 548 | { | 
|---|
| 549 | if (!internal) | 
|---|
| 550 | { | 
|---|
| 551 | if (query_for_logging.empty()) | 
|---|
| 552 | query_for_logging = prepareQueryForLogging(query, context); | 
|---|
| 553 |  | 
|---|
| 554 | onExceptionBeforeStart(query_for_logging, context, current_time); | 
|---|
| 555 | } | 
|---|
| 556 |  | 
|---|
| 557 | throw; | 
|---|
| 558 | } | 
|---|
| 559 |  | 
|---|
| 560 | return std::make_tuple(ast, res); | 
|---|
| 561 | } | 
|---|
| 562 |  | 
|---|
| 563 |  | 
|---|
| 564 | BlockIO executeQuery( | 
|---|
| 565 | const String & query, | 
|---|
| 566 | Context & context, | 
|---|
| 567 | bool internal, | 
|---|
| 568 | QueryProcessingStage::Enum stage, | 
|---|
| 569 | bool may_have_embedded_data, | 
|---|
| 570 | bool allow_processors) | 
|---|
| 571 | { | 
|---|
| 572 | ASTPtr ast; | 
|---|
| 573 | BlockIO streams; | 
|---|
| 574 | std::tie(ast, streams) = executeQueryImpl(query.data(), query.data() + query.size(), context, | 
|---|
| 575 | internal, stage, !may_have_embedded_data, nullptr, allow_processors); | 
|---|
| 576 | if (streams.in) | 
|---|
| 577 | { | 
|---|
| 578 | const auto * ast_query_with_output = dynamic_cast<const ASTQueryWithOutput *>(ast.get()); | 
|---|
| 579 | String format_name = ast_query_with_output && (ast_query_with_output->format != nullptr) | 
|---|
| 580 | ? getIdentifierName(ast_query_with_output->format) : context.getDefaultFormat(); | 
|---|
| 581 | if (format_name == "Null") | 
|---|
| 582 | streams.null_format = true; | 
|---|
| 583 | } | 
|---|
| 584 | return streams; | 
|---|
| 585 | } | 
|---|
| 586 |  | 
|---|
| 587 |  | 
|---|
| 588 | void executeQuery( | 
|---|
| 589 | ReadBuffer & istr, | 
|---|
| 590 | WriteBuffer & ostr, | 
|---|
| 591 | bool allow_into_outfile, | 
|---|
| 592 | Context & context, | 
|---|
| 593 | std::function<void(const String &)> set_content_type, | 
|---|
| 594 | std::function<void(const String &)> set_query_id) | 
|---|
| 595 | { | 
|---|
| 596 | PODArray<char> parse_buf; | 
|---|
| 597 | const char * begin; | 
|---|
| 598 | const char * end; | 
|---|
| 599 |  | 
|---|
| 600 | /// If 'istr' is empty now, fetch next data into buffer. | 
|---|
| 601 | if (istr.buffer().size() == 0) | 
|---|
| 602 | istr.next(); | 
|---|
| 603 |  | 
|---|
| 604 | size_t max_query_size = context.getSettingsRef().max_query_size; | 
|---|
| 605 |  | 
|---|
| 606 | bool may_have_tail; | 
|---|
| 607 | if (istr.buffer().end() - istr.position() > static_cast<ssize_t>(max_query_size)) | 
|---|
| 608 | { | 
|---|
| 609 | /// If remaining buffer space in 'istr' is enough to parse query up to 'max_query_size' bytes, then parse inplace. | 
|---|
| 610 | begin = istr.position(); | 
|---|
| 611 | end = istr.buffer().end(); | 
|---|
| 612 | istr.position() += end - begin; | 
|---|
| 613 | /// Actually we don't know will query has additional data or not. | 
|---|
| 614 | /// But we can't check istr.eof(), because begin and end pointers will became invalid | 
|---|
| 615 | may_have_tail = true; | 
|---|
| 616 | } | 
|---|
| 617 | else | 
|---|
| 618 | { | 
|---|
| 619 | /// If not - copy enough data into 'parse_buf'. | 
|---|
| 620 | WriteBufferFromVector<PODArray<char>> out(parse_buf); | 
|---|
| 621 | LimitReadBuffer limit(istr, max_query_size + 1, false); | 
|---|
| 622 | copyData(limit, out); | 
|---|
| 623 | out.finish(); | 
|---|
| 624 |  | 
|---|
| 625 | begin = parse_buf.data(); | 
|---|
| 626 | end = begin + parse_buf.size(); | 
|---|
| 627 | /// Can check stream for eof, because we have copied data | 
|---|
| 628 | may_have_tail = !istr.eof(); | 
|---|
| 629 | } | 
|---|
| 630 |  | 
|---|
| 631 | ASTPtr ast; | 
|---|
| 632 | BlockIO streams; | 
|---|
| 633 |  | 
|---|
| 634 | std::tie(ast, streams) = executeQueryImpl(begin, end, context, false, QueryProcessingStage::Complete, may_have_tail, &istr, true); | 
|---|
| 635 |  | 
|---|
| 636 | auto & pipeline = streams.pipeline; | 
|---|
| 637 |  | 
|---|
| 638 | try | 
|---|
| 639 | { | 
|---|
| 640 | if (streams.out) | 
|---|
| 641 | { | 
|---|
| 642 | InputStreamFromASTInsertQuery in(ast, &istr, streams.out->getHeader(), context, nullptr); | 
|---|
| 643 | copyData(in, *streams.out); | 
|---|
| 644 | } | 
|---|
| 645 |  | 
|---|
| 646 | if (streams.in) | 
|---|
| 647 | { | 
|---|
| 648 | /// FIXME: try to prettify this cast using `as<>()` | 
|---|
| 649 | const auto * ast_query_with_output = dynamic_cast<const ASTQueryWithOutput *>(ast.get()); | 
|---|
| 650 |  | 
|---|
| 651 | WriteBuffer * out_buf = &ostr; | 
|---|
| 652 | std::optional<WriteBufferFromFile> out_file_buf; | 
|---|
| 653 | if (ast_query_with_output && ast_query_with_output->out_file) | 
|---|
| 654 | { | 
|---|
| 655 | if (!allow_into_outfile) | 
|---|
| 656 | throw Exception( "INTO OUTFILE is not allowed", ErrorCodes::INTO_OUTFILE_NOT_ALLOWED); | 
|---|
| 657 |  | 
|---|
| 658 | const auto & out_file = ast_query_with_output->out_file->as<ASTLiteral &>().value.safeGet<std::string>(); | 
|---|
| 659 | out_file_buf.emplace(out_file, DBMS_DEFAULT_BUFFER_SIZE, O_WRONLY | O_EXCL | O_CREAT); | 
|---|
| 660 | out_buf = &*out_file_buf; | 
|---|
| 661 | } | 
|---|
| 662 |  | 
|---|
| 663 | String format_name = ast_query_with_output && (ast_query_with_output->format != nullptr) | 
|---|
| 664 | ? getIdentifierName(ast_query_with_output->format) | 
|---|
| 665 | : context.getDefaultFormat(); | 
|---|
| 666 |  | 
|---|
| 667 | if (ast_query_with_output && ast_query_with_output->settings_ast) | 
|---|
| 668 | InterpreterSetQuery(ast_query_with_output->settings_ast, context).executeForCurrentContext(); | 
|---|
| 669 |  | 
|---|
| 670 | BlockOutputStreamPtr out = context.getOutputFormat(format_name, *out_buf, streams.in->getHeader()); | 
|---|
| 671 |  | 
|---|
| 672 | /// Save previous progress callback if any. TODO Do it more conveniently. | 
|---|
| 673 | auto previous_progress_callback = context.getProgressCallback(); | 
|---|
| 674 |  | 
|---|
| 675 | /// NOTE Progress callback takes shared ownership of 'out'. | 
|---|
| 676 | streams.in->setProgressCallback([out, previous_progress_callback] (const Progress & progress) | 
|---|
| 677 | { | 
|---|
| 678 | if (previous_progress_callback) | 
|---|
| 679 | previous_progress_callback(progress); | 
|---|
| 680 | out->onProgress(progress); | 
|---|
| 681 | }); | 
|---|
| 682 |  | 
|---|
| 683 | if (set_content_type) | 
|---|
| 684 | set_content_type(out->getContentType()); | 
|---|
| 685 |  | 
|---|
| 686 | if (set_query_id) | 
|---|
| 687 | set_query_id(context.getClientInfo().current_query_id); | 
|---|
| 688 |  | 
|---|
| 689 | if (ast->as<ASTWatchQuery>()) | 
|---|
| 690 | { | 
|---|
| 691 | /// For Watch query, flush data if block is empty (to send data to client). | 
|---|
| 692 | auto flush_callback = [&out](const Block & block) | 
|---|
| 693 | { | 
|---|
| 694 | if (block.rows() == 0) | 
|---|
| 695 | out->flush(); | 
|---|
| 696 | }; | 
|---|
| 697 |  | 
|---|
| 698 | copyData(*streams.in, *out, [](){ return false; }, std::move(flush_callback)); | 
|---|
| 699 | } | 
|---|
| 700 | else | 
|---|
| 701 | copyData(*streams.in, *out); | 
|---|
| 702 | } | 
|---|
| 703 |  | 
|---|
| 704 | if (pipeline.initialized()) | 
|---|
| 705 | { | 
|---|
| 706 | const ASTQueryWithOutput * ast_query_with_output = dynamic_cast<const ASTQueryWithOutput *>(ast.get()); | 
|---|
| 707 |  | 
|---|
| 708 | WriteBuffer * out_buf = &ostr; | 
|---|
| 709 | std::optional<WriteBufferFromFile> out_file_buf; | 
|---|
| 710 | if (ast_query_with_output && ast_query_with_output->out_file) | 
|---|
| 711 | { | 
|---|
| 712 | if (!allow_into_outfile) | 
|---|
| 713 | throw Exception( "INTO OUTFILE is not allowed", ErrorCodes::INTO_OUTFILE_NOT_ALLOWED); | 
|---|
| 714 |  | 
|---|
| 715 | const auto & out_file = typeid_cast<const ASTLiteral &>(*ast_query_with_output->out_file).value.safeGet<std::string>(); | 
|---|
| 716 | out_file_buf.emplace(out_file, DBMS_DEFAULT_BUFFER_SIZE, O_WRONLY | O_EXCL | O_CREAT); | 
|---|
| 717 | out_buf = &*out_file_buf; | 
|---|
| 718 | } | 
|---|
| 719 |  | 
|---|
| 720 | String format_name = ast_query_with_output && (ast_query_with_output->format != nullptr) | 
|---|
| 721 | ? getIdentifierName(ast_query_with_output->format) | 
|---|
| 722 | : context.getDefaultFormat(); | 
|---|
| 723 |  | 
|---|
| 724 | if (ast_query_with_output && ast_query_with_output->settings_ast) | 
|---|
| 725 | InterpreterSetQuery(ast_query_with_output->settings_ast, context).executeForCurrentContext(); | 
|---|
| 726 |  | 
|---|
| 727 | pipeline.addSimpleTransform([](const Block & ) | 
|---|
| 728 | { | 
|---|
| 729 | return std::make_shared<MaterializingTransform>(header); | 
|---|
| 730 | }); | 
|---|
| 731 |  | 
|---|
| 732 | auto out = context.getOutputFormatProcessor(format_name, *out_buf, pipeline.getHeader()); | 
|---|
| 733 |  | 
|---|
| 734 | /// Save previous progress callback if any. TODO Do it more conveniently. | 
|---|
| 735 | auto previous_progress_callback = context.getProgressCallback(); | 
|---|
| 736 |  | 
|---|
| 737 | /// NOTE Progress callback takes shared ownership of 'out'. | 
|---|
| 738 | pipeline.setProgressCallback([out, previous_progress_callback] (const Progress & progress) | 
|---|
| 739 | { | 
|---|
| 740 | if (previous_progress_callback) | 
|---|
| 741 | previous_progress_callback(progress); | 
|---|
| 742 | out->onProgress(progress); | 
|---|
| 743 | }); | 
|---|
| 744 |  | 
|---|
| 745 | if (set_content_type) | 
|---|
| 746 | set_content_type(out->getContentType()); | 
|---|
| 747 |  | 
|---|
| 748 | if (set_query_id) | 
|---|
| 749 | set_query_id(context.getClientInfo().current_query_id); | 
|---|
| 750 |  | 
|---|
| 751 | pipeline.setOutput(std::move(out)); | 
|---|
| 752 |  | 
|---|
| 753 | { | 
|---|
| 754 | auto executor = pipeline.execute(); | 
|---|
| 755 | executor->execute(context.getSettingsRef().max_threads); | 
|---|
| 756 | } | 
|---|
| 757 |  | 
|---|
| 758 | pipeline.finalize(); | 
|---|
| 759 | } | 
|---|
| 760 | } | 
|---|
| 761 | catch (...) | 
|---|
| 762 | { | 
|---|
| 763 | streams.onException(); | 
|---|
| 764 | throw; | 
|---|
| 765 | } | 
|---|
| 766 |  | 
|---|
| 767 | streams.onFinish(); | 
|---|
| 768 | } | 
|---|
| 769 |  | 
|---|
| 770 | } | 
|---|
| 771 |  | 
|---|