#include <Common/formatReadable.h>
#include <Common/PODArray.h>
#include <Common/typeid_cast.h>

#include <IO/ConcatReadBuffer.h>
#include <IO/WriteBufferFromFile.h>
#include <IO/WriteBufferFromVector.h>
#include <IO/LimitReadBuffer.h>
#include <IO/copyData.h>

#include <DataStreams/BlockIO.h>
#include <DataStreams/copyData.h>
#include <DataStreams/IBlockInputStream.h>
#include <DataStreams/InputStreamFromASTInsertQuery.h>
#include <DataStreams/CountingBlockOutputStream.h>

#include <Parsers/ASTInsertQuery.h>
#include <Parsers/ASTShowProcesslistQuery.h>
#include <Parsers/ASTIdentifier.h>
#include <Parsers/ASTLiteral.h>
#include <Parsers/ParserQuery.h>
#include <Parsers/parseQuery.h>
#include <Parsers/queryToString.h>

#include <Storages/StorageInput.h>

#include <Access/QuotaContext.h>
#include <Interpreters/InterpreterFactory.h>
#include <Interpreters/InterpreterInsertQuery.h>
#include <Interpreters/ProcessList.h>
#include <Interpreters/QueryLog.h>
#include <Interpreters/InterpreterSetQuery.h>
#include <Interpreters/ReplaceQueryParameterVisitor.h>
#include <Interpreters/executeQuery.h>
#include <Common/ProfileEvents.h>

#include <Interpreters/DNSCacheUpdater.h>
#include <Common/SensitiveDataMasker.h>

#include <Processors/Transforms/LimitsCheckingTransform.h>
#include <Processors/Transforms/MaterializingTransform.h>
#include <Processors/Formats/IOutputFormat.h>
#include <Parsers/ASTWatchQuery.h>

namespace ProfileEvents
{
    extern const Event QueryMaskingRulesMatch;
}

namespace DB
{

namespace ErrorCodes
{
    extern const int LOGICAL_ERROR;
    extern const int QUERY_IS_TOO_LARGE;
    extern const int INTO_OUTFILE_NOT_ALLOWED;
    extern const int QUERY_WAS_CANCELLED;
}


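/// Check the query AST against the limits from the settings (max_ast_depth, max_ast_elements).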
static void checkASTSizeLimits(const IAST & ast, const Settings & settings)
{
    if (settings.max_ast_depth)
        ast.checkDepth(settings.max_ast_depth);
    if (settings.max_ast_elements)
        ast.checkSize(settings.max_ast_elements);
}

/// NOTE This is wrong in case of single-line comments and in case of multiline string literals.
static String joinLines(const String & query)
{
    String res = query;
    std::replace(res.begin(), res.end(), '\n', ' ');
    return res;
}


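/// Make a version of the query text that is safe to put into logs and the process list.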
static String prepareQueryForLogging(const String & query, Context & context)
{
    String res = query;

    // Wipe sensitive data before cropping the query to log_queries_cut_to_length,
    // otherwise something like a credit card number with the last digit cut off could end up in the log.
    if (auto masker = SensitiveDataMasker::getInstance())
    {
        auto matches = masker->wipeSensitiveData(res);
        if (matches > 0)
        {
            ProfileEvents::increment(ProfileEvents::QueryMaskingRulesMatch, matches);
        }
    }

    res = res.substr(0, context.getSettingsRef().log_queries_cut_to_length);

    return res;
}


/// Log query into text log (not into system table).
static void logQuery(const String & query, const Context & context, bool internal)
{
    if (internal)
    {
        LOG_DEBUG(&Logger::get("executeQuery"), "(internal) " << joinLines(query));
    }
    else
    {
        const auto & current_query_id = context.getClientInfo().current_query_id;
        const auto & initial_query_id = context.getClientInfo().initial_query_id;
        const auto & current_user = context.getClientInfo().current_user;

        LOG_DEBUG(&Logger::get("executeQuery"), "(from " << context.getClientInfo().current_address.toString()
            << (current_user != "default" ? ", user: " + context.getClientInfo().current_user : "")
            << (!initial_query_id.empty() && current_query_id != initial_query_id ? ", initial_query_id: " + initial_query_id : std::string())
            << ") "
            << joinLines(query));
    }
}


/// Call this inside catch block.
static void setExceptionStackTrace(QueryLogElement & elem)
{
    /// Disable memory tracker for stack trace.
    /// Because if the exception is "Memory limit (for query) exceeded", then we probably can't allocate one more string.
    auto temporarily_disable_memory_tracker = getCurrentMemoryTrackerActionLock();

    try
    {
        throw;
    }
    catch (const Exception & e)
    {
        elem.stack_trace = e.getStackTrace().toString();
    }
    catch (...) {}
}


/// Log exception (with query info) into text log (not into system table).
static void logException(Context & context, QueryLogElement & elem)
{
    LOG_ERROR(&Logger::get("executeQuery"), elem.exception
        << " (from " << context.getClientInfo().current_address.toString() << ")"
        << " (in query: " << joinLines(elem.query) << ")"
        << (!elem.stack_trace.empty() ? ", Stack trace (when copying this message, always include the lines below):\n\n" + elem.stack_trace : ""));
}


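/// Handle an exception thrown before the query started to execute (e.g. a parse error):
/// account the error in the quota and write an EXCEPTION_BEFORE_START record to query_log if query logging is enabled.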
static void onExceptionBeforeStart(const String & query_for_logging, Context & context, time_t current_time)
{
    /// Exception before the query execution.
    context.getQuota()->used(Quota::ERRORS, 1, /* check_exceeded = */ false);

    const Settings & settings = context.getSettingsRef();

    /// Log the start of query execution into the table if necessary.
    QueryLogElement elem;

    elem.type = QueryLogElement::EXCEPTION_BEFORE_START;

    elem.event_time = current_time;
    elem.query_start_time = current_time;

    elem.query = query_for_logging;
    elem.exception = getCurrentExceptionMessage(false);

    elem.client_info = context.getClientInfo();

    if (settings.calculate_text_stack_trace)
        setExceptionStackTrace(elem);
    logException(context, elem);

    /// Update performance counters before logging to query_log
    CurrentThread::finalizePerformanceCounters();

    if (settings.log_queries)
        if (auto query_log = context.getQueryLog())
            query_log->add(elem);
}


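/// Parse and execute a query located in the half-open range [begin, end).
/// Returns the parsed AST together with a BlockIO that holds either the input/output streams or the processors pipeline.
/// For INSERT queries the query text is cut before the inline data: 'has_query_tail' tells whether more data follows
/// the given buffer, and 'istr', if provided, is used to feed the input() table function of INSERT ... SELECT queries.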
static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
    const char * begin,
    const char * end,
    Context & context,
    bool internal,
    QueryProcessingStage::Enum stage,
    bool has_query_tail,
    ReadBuffer * istr,
    bool allow_processors)
{
    time_t current_time = time(nullptr);

    /// If we are already executing a query and it requires an internal query to be executed, then
    /// don't replace the thread context with the given one (it can be temporary). Otherwise, attach the context to the thread.
    if (!internal)
    {
        context.makeQueryContext();
        CurrentThread::attachQueryContext(context);
    }

    const Settings & settings = context.getSettingsRef();

    ParserQuery parser(end, settings.enable_debug_queries);
    ASTPtr ast;
    const char * query_end;

    /// Don't limit the size of internal queries.
    size_t max_query_size = 0;
    if (!internal)
        max_query_size = settings.max_query_size;

    try
    {
        /// TODO Parser should fail early when max_query_size limit is reached.
        ast = parseQuery(parser, begin, end, "", max_query_size);

        auto * insert_query = ast->as<ASTInsertQuery>();

        if (insert_query && insert_query->settings_ast)
            InterpreterSetQuery(insert_query->settings_ast, context).executeForCurrentContext();

        if (insert_query && insert_query->data)
        {
            query_end = insert_query->data;
            insert_query->has_tail = has_query_tail;
        }
        else
        {
            query_end = end;
        }
    }
    catch (...)
    {
        /// Log the query anyway.
        String query = String(begin, begin + std::min(end - begin, static_cast<ptrdiff_t>(max_query_size)));

        auto query_for_logging = prepareQueryForLogging(query, context);
        logQuery(query_for_logging, context, internal);

        if (!internal)
            onExceptionBeforeStart(query_for_logging, context, current_time);

        throw;
    }

    /// Copy the query into a string. It will be written to the log and shown in the process list.
    /// For an INSERT query, the string will not include the data to insert.
    String query(begin, query_end);
    BlockIO res;
    QueryPipeline & pipeline = res.pipeline;

    String query_for_logging = "";

    try
    {
        /// Replace ASTQueryParameter with ASTLiteral for prepared statements.
        if (context.hasQueryParameters())
        {
            ReplaceQueryParameterVisitor visitor(context.getQueryParameters());
            visitor.visit(ast);

            /// Get new query after substitutions.
            query = serializeAST(*ast);
        }

        query_for_logging = prepareQueryForLogging(query, context);

        logQuery(query_for_logging, context, internal);

        /// Check the limits.
        checkASTSizeLimits(*ast, settings);

        /// Put the query into the process list. But don't put the SHOW PROCESSLIST query itself there.
        ProcessList::EntryPtr process_list_entry;
        if (!internal && !ast->as<ASTShowProcesslistQuery>())
        {
            /// The process list also stores the masked query now, to avoid leaking secrets through SHOW PROCESSLIST to other users.
            process_list_entry = context.getProcessList().insert(query_for_logging, ast.get(), context);
            context.setProcessListElement(&process_list_entry->get());
        }

        /// Load external tables if they were provided.
        context.initializeExternalTablesIfSet();

        auto * insert_query = ast->as<ASTInsertQuery>();
        if (insert_query && insert_query->select)
        {
            /// Prepare the Input storage before executing the interpreter if we already have a buffer with data.
            if (istr)
            {
                ASTPtr input_function;
                insert_query->tryFindInputFunction(input_function);
                if (input_function)
                {
                    StoragePtr storage = context.executeTableFunction(input_function);
                    auto & input_storage = dynamic_cast<StorageInput &>(*storage);
                    BlockInputStreamPtr input_stream = std::make_shared<InputStreamFromASTInsertQuery>(ast, istr,
                        input_storage.getSampleBlock(), context, input_function);
                    input_storage.setInputStream(input_stream);
                }
            }
        }
        else
            /// Reset input callbacks if the query is not INSERT SELECT.
            context.resetInputCallbacks();

        auto interpreter = InterpreterFactory::get(ast, context, stage);
        bool use_processors = settings.experimental_use_processors && allow_processors && interpreter->canExecuteWithProcessors();

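        /// Count the query towards the quota and fail early if the error quota has already been exceeded,
        /// unless the interpreter is exempt from quota accounting.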
        QuotaContextPtr quota;
        if (!interpreter->ignoreQuota())
        {
            quota = context.getQuota();
            quota->used(Quota::QUERIES, 1);
            quota->checkExceeded(Quota::ERRORS);
        }

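        /// Limits on the size of the final result (max_result_rows / max_result_bytes / result_overflow_mode),
        /// applied further below to the stream or pipeline that returns the result.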
        IBlockInputStream::LocalLimits limits;
        if (!interpreter->ignoreLimits())
        {
            limits.mode = IBlockInputStream::LIMITS_CURRENT;
            limits.size_limits = SizeLimits(settings.max_result_rows, settings.max_result_bytes, settings.result_overflow_mode);
        }

        if (use_processors)
            pipeline = interpreter->executeWithProcessors();
        else
            res = interpreter->execute();

        if (auto * insert_interpreter = typeid_cast<const InterpreterInsertQuery *>(&*interpreter))
        {
            /// Save insertion table (not table function). TODO: support remote() table function.
            auto db_table = insert_interpreter->getDatabaseTable();
            if (!db_table.second.empty())
                context.setInsertionTable(std::move(db_table));
        }

        if (process_list_entry)
        {
            /// Query was killed before execution.
            if ((*process_list_entry)->isKilled())
                throw Exception("Query '" + (*process_list_entry)->getInfo().client_info.current_query_id + "' is killed in pending state",
                    ErrorCodes::QUERY_WAS_CANCELLED);
            else if (!use_processors)
                (*process_list_entry)->setQueryStreams(res);
        }

        /// Hold element of process list till end of query execution.
        res.process_list_entry = process_list_entry;

        if (use_processors)
        {
            /// Limits on the result, the quota on the result, and also callback for progress.
            /// Limits apply only to the final result.
            pipeline.setProgressCallback(context.getProgressCallback());
            pipeline.setProcessListElement(context.getProcessListElement());
            if (stage == QueryProcessingStage::Complete)
            {
                pipeline.resize(1);
                pipeline.addSimpleTransform([&](const Block & header)
                {
                    auto transform = std::make_shared<LimitsCheckingTransform>(header, limits);
                    transform->setQuota(quota);
                    return transform;
                });
            }
        }
        else
        {
            /// Limits on the result, the quota on the result, and also callback for progress.
            /// Limits apply only to the final result.
            if (res.in)
            {
                res.in->setProgressCallback(context.getProgressCallback());
                res.in->setProcessListElement(context.getProcessListElement());
                if (stage == QueryProcessingStage::Complete)
                {
                    if (!interpreter->ignoreQuota())
                        res.in->setQuota(quota);
                    if (!interpreter->ignoreLimits())
                        res.in->setLimits(limits);
                }
            }

            if (res.out)
            {
                if (auto stream = dynamic_cast<CountingBlockOutputStream *>(res.out.get()))
                {
                    stream->setProcessListElement(context.getProcessListElement());
                }
            }
        }

        /// Everything related to query log.
        {
            QueryLogElement elem;

            elem.type = QueryLogElement::QUERY_START;

            elem.event_time = current_time;
            elem.query_start_time = current_time;

            elem.query = query_for_logging;

            elem.client_info = context.getClientInfo();

            bool log_queries = settings.log_queries && !internal;

            /// Log the start of query execution into the system table, if needed.
            if (log_queries)
            {
                if (settings.log_query_settings)
                    elem.query_settings = std::make_shared<Settings>(context.getSettingsRef());

                if (auto query_log = context.getQueryLog())
                    query_log->add(elem);
            }

            /// Also make it possible for the caller to log a successful query finish and exceptions during execution.
            auto finish_callback = [elem, &context, log_queries] (IBlockInputStream * stream_in, IBlockOutputStream * stream_out) mutable
            {
                QueryStatus * process_list_elem = context.getProcessListElement();

                if (!process_list_elem)
                    return;

                /// Update performance counters before logging to query_log
                CurrentThread::finalizePerformanceCounters();

                QueryStatusInfo info = process_list_elem->getInfo(true, context.getSettingsRef().log_profile_events);

                double elapsed_seconds = info.elapsed_seconds;

                elem.type = QueryLogElement::QUERY_FINISH;

                elem.event_time = time(nullptr);
                elem.query_duration_ms = elapsed_seconds * 1000;

                elem.read_rows = info.read_rows;
                elem.read_bytes = info.read_bytes;

                elem.written_rows = info.written_rows;
                elem.written_bytes = info.written_bytes;

                auto progress_callback = context.getProgressCallback();

                if (progress_callback)
                    progress_callback(Progress(WriteProgress(info.written_rows, info.written_bytes)));

                elem.memory_usage = info.peak_memory_usage > 0 ? info.peak_memory_usage : 0;

                if (stream_in)
                {
                    const BlockStreamProfileInfo & stream_in_info = stream_in->getProfileInfo();

                    /// NOTE: INSERT SELECT query contains zero metrics
                    elem.result_rows = stream_in_info.rows;
                    elem.result_bytes = stream_in_info.bytes;
                }
                else if (stream_out) /// will be used only for ordinary INSERT queries
                {
                    if (auto counting_stream = dynamic_cast<const CountingBlockOutputStream *>(stream_out))
                    {
                        /// NOTE: Redundancy. The same values could be extracted from process_list_elem->progress_out.
                        elem.result_rows = counting_stream->getProgress().read_rows;
                        elem.result_bytes = counting_stream->getProgress().read_bytes;
                    }
                }

                if (elem.read_rows != 0)
                {
                    LOG_INFO(&Logger::get("executeQuery"), std::fixed << std::setprecision(3)
                        << "Read " << elem.read_rows << " rows, "
                        << formatReadableSizeWithBinarySuffix(elem.read_bytes) << " in " << elapsed_seconds << " sec., "
                        << static_cast<size_t>(elem.read_rows / elapsed_seconds) << " rows/sec., "
                        << formatReadableSizeWithBinarySuffix(elem.read_bytes / elapsed_seconds) << "/sec.");
                }

                elem.thread_numbers = std::move(info.thread_numbers);
                elem.os_thread_ids = std::move(info.os_thread_ids);
                elem.profile_counters = std::move(info.profile_counters);

                if (log_queries)
                {
                    if (auto query_log = context.getQueryLog())
                        query_log->add(elem);
                }
            };

            auto exception_callback = [elem, &context, log_queries] () mutable
            {
                context.getQuota()->used(Quota::ERRORS, 1, /* check_exceeded = */ false);

                elem.type = QueryLogElement::EXCEPTION_WHILE_PROCESSING;

                elem.event_time = time(nullptr);
                elem.query_duration_ms = 1000 * (elem.event_time - elem.query_start_time);
                elem.exception = getCurrentExceptionMessage(false);

                QueryStatus * process_list_elem = context.getProcessListElement();
                const Settings & current_settings = context.getSettingsRef();

                /// Update performance counters before logging to query_log
                CurrentThread::finalizePerformanceCounters();

                if (process_list_elem)
                {
                    QueryStatusInfo info = process_list_elem->getInfo(true, current_settings.log_profile_events, false);

                    elem.query_duration_ms = info.elapsed_seconds * 1000;

                    elem.read_rows = info.read_rows;
                    elem.read_bytes = info.read_bytes;

                    elem.memory_usage = info.peak_memory_usage > 0 ? info.peak_memory_usage : 0;

                    elem.thread_numbers = std::move(info.thread_numbers);
                    elem.os_thread_ids = std::move(info.os_thread_ids);
                    elem.profile_counters = std::move(info.profile_counters);
                }

                if (current_settings.calculate_text_stack_trace)
                    setExceptionStackTrace(elem);
                logException(context, elem);

                /// In case of an exception, we also log internal queries.
                if (log_queries)
                {
                    if (auto query_log = context.getQueryLog())
                        query_log->add(elem);
                }
            };

            res.finish_callback = std::move(finish_callback);
            res.exception_callback = std::move(exception_callback);

            if (!internal && res.in)
            {
                std::stringstream log_str;
                log_str << "Query pipeline:\n";
                res.in->dumpTree(log_str);
                LOG_DEBUG(&Logger::get("executeQuery"), log_str.str());
            }
        }
    }
    catch (...)
    {
        if (!internal)
        {
            if (query_for_logging.empty())
                query_for_logging = prepareQueryForLogging(query, context);

            onExceptionBeforeStart(query_for_logging, context, current_time);
        }

        throw;
    }

    return std::make_tuple(ast, res);
}


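/// Execute a query passed as a complete string. 'may_have_embedded_data' tells whether data for an INSERT query
/// may be embedded in the string itself. The caller receives BlockIO and drives the returned streams;
/// the FORMAT clause is only inspected here to detect the 'Null' output format.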
BlockIO executeQuery(
    const String & query,
    Context & context,
    bool internal,
    QueryProcessingStage::Enum stage,
    bool may_have_embedded_data,
    bool allow_processors)
{
    ASTPtr ast;
    BlockIO streams;
    std::tie(ast, streams) = executeQueryImpl(query.data(), query.data() + query.size(), context,
        internal, stage, !may_have_embedded_data, nullptr, allow_processors);
    if (streams.in)
    {
        const auto * ast_query_with_output = dynamic_cast<const ASTQueryWithOutput *>(ast.get());
        String format_name = ast_query_with_output && (ast_query_with_output->format != nullptr)
            ? getIdentifierName(ast_query_with_output->format) : context.getDefaultFormat();
        if (format_name == "Null")
            streams.null_format = true;
    }
    return streams;
}


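/// Execute a query read from 'istr' and write the result to 'ostr' in the requested output format.
/// Handles INTO OUTFILE, a SETTINGS clause attached to the output, progress reporting to the output format,
/// and the content-type / query-id callbacks for the caller.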
void executeQuery(
    ReadBuffer & istr,
    WriteBuffer & ostr,
    bool allow_into_outfile,
    Context & context,
    std::function<void(const String &)> set_content_type,
    std::function<void(const String &)> set_query_id)
{
    PODArray<char> parse_buf;
    const char * begin;
    const char * end;

    /// If 'istr' is empty now, fetch next data into the buffer.
    if (istr.buffer().size() == 0)
        istr.next();

    size_t max_query_size = context.getSettingsRef().max_query_size;

    bool may_have_tail;
    if (istr.buffer().end() - istr.position() > static_cast<ssize_t>(max_query_size))
    {
        /// If the remaining buffer space in 'istr' is enough to parse the query up to 'max_query_size' bytes, then parse in place.
        begin = istr.position();
        end = istr.buffer().end();
        istr.position() += end - begin;
        /// Actually we don't know whether the query has additional data or not.
        /// But we can't check istr.eof(), because the begin and end pointers would become invalid.
        may_have_tail = true;
    }
    else
    {
        /// If not, copy enough data into 'parse_buf'.
        WriteBufferFromVector<PODArray<char>> out(parse_buf);
        LimitReadBuffer limit(istr, max_query_size + 1, false);
        copyData(limit, out);
        out.finish();

        begin = parse_buf.data();
        end = begin + parse_buf.size();
        /// We can check the stream for eof because the data has been copied.
        may_have_tail = !istr.eof();
    }

    ASTPtr ast;
    BlockIO streams;

    std::tie(ast, streams) = executeQueryImpl(begin, end, context, false, QueryProcessingStage::Complete, may_have_tail, &istr, true);

    auto & pipeline = streams.pipeline;

    try
    {
        if (streams.out)
        {
            InputStreamFromASTInsertQuery in(ast, &istr, streams.out->getHeader(), context, nullptr);
            copyData(in, *streams.out);
        }

        if (streams.in)
        {
            /// FIXME: try to prettify this cast using `as<>()`
            const auto * ast_query_with_output = dynamic_cast<const ASTQueryWithOutput *>(ast.get());

            WriteBuffer * out_buf = &ostr;
            std::optional<WriteBufferFromFile> out_file_buf;
            if (ast_query_with_output && ast_query_with_output->out_file)
            {
                if (!allow_into_outfile)
                    throw Exception("INTO OUTFILE is not allowed", ErrorCodes::INTO_OUTFILE_NOT_ALLOWED);

                const auto & out_file = ast_query_with_output->out_file->as<ASTLiteral &>().value.safeGet<std::string>();
                out_file_buf.emplace(out_file, DBMS_DEFAULT_BUFFER_SIZE, O_WRONLY | O_EXCL | O_CREAT);
                out_buf = &*out_file_buf;
            }

            String format_name = ast_query_with_output && (ast_query_with_output->format != nullptr)
                ? getIdentifierName(ast_query_with_output->format)
                : context.getDefaultFormat();

            if (ast_query_with_output && ast_query_with_output->settings_ast)
                InterpreterSetQuery(ast_query_with_output->settings_ast, context).executeForCurrentContext();

            BlockOutputStreamPtr out = context.getOutputFormat(format_name, *out_buf, streams.in->getHeader());

            /// Save previous progress callback if any. TODO Do it more conveniently.
            auto previous_progress_callback = context.getProgressCallback();

            /// NOTE Progress callback takes shared ownership of 'out'.
            streams.in->setProgressCallback([out, previous_progress_callback] (const Progress & progress)
            {
                if (previous_progress_callback)
                    previous_progress_callback(progress);
                out->onProgress(progress);
            });

            if (set_content_type)
                set_content_type(out->getContentType());

            if (set_query_id)
                set_query_id(context.getClientInfo().current_query_id);

            if (ast->as<ASTWatchQuery>())
            {
                /// For a WATCH query, flush data if the block is empty (to send data to the client).
                auto flush_callback = [&out](const Block & block)
                {
                    if (block.rows() == 0)
                        out->flush();
                };

                copyData(*streams.in, *out, [](){ return false; }, std::move(flush_callback));
            }
            else
                copyData(*streams.in, *out);
        }

        if (pipeline.initialized())
        {
            const ASTQueryWithOutput * ast_query_with_output = dynamic_cast<const ASTQueryWithOutput *>(ast.get());

            WriteBuffer * out_buf = &ostr;
            std::optional<WriteBufferFromFile> out_file_buf;
            if (ast_query_with_output && ast_query_with_output->out_file)
            {
                if (!allow_into_outfile)
                    throw Exception("INTO OUTFILE is not allowed", ErrorCodes::INTO_OUTFILE_NOT_ALLOWED);

                const auto & out_file = typeid_cast<const ASTLiteral &>(*ast_query_with_output->out_file).value.safeGet<std::string>();
                out_file_buf.emplace(out_file, DBMS_DEFAULT_BUFFER_SIZE, O_WRONLY | O_EXCL | O_CREAT);
                out_buf = &*out_file_buf;
            }

            String format_name = ast_query_with_output && (ast_query_with_output->format != nullptr)
                ? getIdentifierName(ast_query_with_output->format)
                : context.getDefaultFormat();

            if (ast_query_with_output && ast_query_with_output->settings_ast)
                InterpreterSetQuery(ast_query_with_output->settings_ast, context).executeForCurrentContext();

            pipeline.addSimpleTransform([](const Block & header)
            {
                return std::make_shared<MaterializingTransform>(header);
            });

            auto out = context.getOutputFormatProcessor(format_name, *out_buf, pipeline.getHeader());

            /// Save previous progress callback if any. TODO Do it more conveniently.
            auto previous_progress_callback = context.getProgressCallback();

            /// NOTE Progress callback takes shared ownership of 'out'.
            pipeline.setProgressCallback([out, previous_progress_callback] (const Progress & progress)
            {
                if (previous_progress_callback)
                    previous_progress_callback(progress);
                out->onProgress(progress);
            });

            if (set_content_type)
                set_content_type(out->getContentType());

            if (set_query_id)
                set_query_id(context.getClientInfo().current_query_id);

            pipeline.setOutput(std::move(out));

            {
                auto executor = pipeline.execute();
                executor->execute(context.getSettingsRef().max_threads);
            }

            pipeline.finalize();
        }
    }
    catch (...)
    {
        streams.onException();
        throw;
    }

    streams.onFinish();
}

}