1#include <Common/formatReadable.h>
2#include <Common/PODArray.h>
3#include <Common/typeid_cast.h>
4
5#include <IO/ConcatReadBuffer.h>
6#include <IO/WriteBufferFromFile.h>
7#include <IO/WriteBufferFromVector.h>
8#include <IO/LimitReadBuffer.h>
9#include <IO/copyData.h>
10
11#include <DataStreams/BlockIO.h>
12#include <DataStreams/copyData.h>
13#include <DataStreams/IBlockInputStream.h>
14#include <DataStreams/InputStreamFromASTInsertQuery.h>
15#include <DataStreams/CountingBlockOutputStream.h>
16
17#include <Parsers/ASTInsertQuery.h>
18#include <Parsers/ASTShowProcesslistQuery.h>
19#include <Parsers/ASTIdentifier.h>
20#include <Parsers/ASTLiteral.h>
21#include <Parsers/ParserQuery.h>
22#include <Parsers/parseQuery.h>
23#include <Parsers/queryToString.h>
24
25#include <Storages/StorageInput.h>
26
27#include <Access/QuotaContext.h>
28#include <Interpreters/InterpreterFactory.h>
29#include <Interpreters/ProcessList.h>
30#include <Interpreters/QueryLog.h>
31#include <Interpreters/InterpreterSetQuery.h>
32#include <Interpreters/ReplaceQueryParameterVisitor.h>
33#include <Interpreters/executeQuery.h>
34#include <Common/ProfileEvents.h>
35
36#include <Interpreters/DNSCacheUpdater.h>
37#include <Common/SensitiveDataMasker.h>
38
39#include <Processors/Transforms/LimitsCheckingTransform.h>
40#include <Processors/Transforms/MaterializingTransform.h>
41#include <Processors/Formats/IOutputFormat.h>
42#include <Parsers/ASTWatchQuery.h>
43
44namespace ProfileEvents
45{
46 extern const Event QueryMaskingRulesMatch;
47}
48
49namespace DB
50{
51
52namespace ErrorCodes
53{
54 extern const int LOGICAL_ERROR;
55 extern const int QUERY_IS_TOO_LARGE;
56 extern const int INTO_OUTFILE_NOT_ALLOWED;
57 extern const int QUERY_WAS_CANCELLED;
58}
59
60
61static void checkASTSizeLimits(const IAST & ast, const Settings & settings)
62{
63 if (settings.max_ast_depth)
64 ast.checkDepth(settings.max_ast_depth);
65 if (settings.max_ast_elements)
66 ast.checkSize(settings.max_ast_elements);
67}
68
69/// NOTE This is wrong in case of single-line comments and in case of multiline string literals.
70static String joinLines(const String & query)
71{
72 String res = query;
73 std::replace(res.begin(), res.end(), '\n', ' ');
74 return res;
75}
76
77
78static String prepareQueryForLogging(const String & query, Context & context)
79{
80 String res = query;
81
82 // wiping sensitive data before cropping query by log_queries_cut_to_length,
83 // otherwise something like credit card without last digit can go to log
84 if (auto masker = SensitiveDataMasker::getInstance())
85 {
86 auto matches = masker->wipeSensitiveData(res);
87 if (matches > 0)
88 {
89 ProfileEvents::increment(ProfileEvents::QueryMaskingRulesMatch, matches);
90 }
91 }
92
93 res = res.substr(0, context.getSettingsRef().log_queries_cut_to_length);
94
95 return res;
96}
97
98
99/// Log query into text log (not into system table).
100static void logQuery(const String & query, const Context & context, bool internal)
101{
102 if (internal)
103 {
104 LOG_DEBUG(&Logger::get("executeQuery"), "(internal) " << joinLines(query));
105 }
106 else
107 {
108 const auto & current_query_id = context.getClientInfo().current_query_id;
109 const auto & initial_query_id = context.getClientInfo().initial_query_id;
110 const auto & current_user = context.getClientInfo().current_user;
111
112 LOG_DEBUG(&Logger::get("executeQuery"), "(from " << context.getClientInfo().current_address.toString()
113 << (current_user != "default" ? ", user: " + context.getClientInfo().current_user : "")
114 << (!initial_query_id.empty() && current_query_id != initial_query_id ? ", initial_query_id: " + initial_query_id : std::string())
115 << ") "
116 << joinLines(query));
117 }
118}
119
120
121/// Call this inside catch block.
122static void setExceptionStackTrace(QueryLogElement & elem)
123{
124 /// Disable memory tracker for stack trace.
125 /// Because if exception is "Memory limit (for query) exceed", then we probably can't allocate another one string.
126 auto temporarily_disable_memory_tracker = getCurrentMemoryTrackerActionLock();
127
128 try
129 {
130 throw;
131 }
132 catch (const Exception & e)
133 {
134 elem.stack_trace = e.getStackTrace().toString();
135 }
136 catch (...) {}
137}
138
139
140/// Log exception (with query info) into text log (not into system table).
141static void logException(Context & context, QueryLogElement & elem)
142{
143 LOG_ERROR(&Logger::get("executeQuery"), elem.exception
144 << " (from " << context.getClientInfo().current_address.toString() << ")"
145 << " (in query: " << joinLines(elem.query) << ")"
146 << (!elem.stack_trace.empty() ? ", Stack trace (when copying this message, always include the lines below):\n\n" + elem.stack_trace : ""));
147}
148
149
150static void onExceptionBeforeStart(const String & query_for_logging, Context & context, time_t current_time)
151{
152 /// Exception before the query execution.
153 context.getQuota()->used(Quota::ERRORS, 1, /* check_exceeded = */ false);
154
155 const Settings & settings = context.getSettingsRef();
156
157 /// Log the start of query execution into the table if necessary.
158 QueryLogElement elem;
159
160 elem.type = QueryLogElement::EXCEPTION_BEFORE_START;
161
162 elem.event_time = current_time;
163 elem.query_start_time = current_time;
164
165 elem.query = query_for_logging;
166 elem.exception = getCurrentExceptionMessage(false);
167
168 elem.client_info = context.getClientInfo();
169
170 if (settings.calculate_text_stack_trace)
171 setExceptionStackTrace(elem);
172 logException(context, elem);
173
174 /// Update performance counters before logging to query_log
175 CurrentThread::finalizePerformanceCounters();
176
177 if (settings.log_queries)
178 if (auto query_log = context.getQueryLog())
179 query_log->add(elem);
180}
181
182
183static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
184 const char * begin,
185 const char * end,
186 Context & context,
187 bool internal,
188 QueryProcessingStage::Enum stage,
189 bool has_query_tail,
190 ReadBuffer * istr,
191 bool allow_processors)
192{
193 time_t current_time = time(nullptr);
194
195 /// If we already executing query and it requires to execute internal query, than
196 /// don't replace thread context with given (it can be temporary). Otherwise, attach context to thread.
197 if (!internal)
198 {
199 context.makeQueryContext();
200 CurrentThread::attachQueryContext(context);
201 }
202
203 const Settings & settings = context.getSettingsRef();
204
205 ParserQuery parser(end, settings.enable_debug_queries);
206 ASTPtr ast;
207 const char * query_end;
208
209 /// Don't limit the size of internal queries.
210 size_t max_query_size = 0;
211 if (!internal)
212 max_query_size = settings.max_query_size;
213
214 try
215 {
216 /// TODO Parser should fail early when max_query_size limit is reached.
217 ast = parseQuery(parser, begin, end, "", max_query_size);
218
219 auto * insert_query = ast->as<ASTInsertQuery>();
220
221 if (insert_query && insert_query->settings_ast)
222 InterpreterSetQuery(insert_query->settings_ast, context).executeForCurrentContext();
223
224 if (insert_query && insert_query->data)
225 {
226 query_end = insert_query->data;
227 insert_query->has_tail = has_query_tail;
228 }
229 else
230 {
231 query_end = end;
232 }
233 }
234 catch (...)
235 {
236 /// Anyway log the query.
237 String query = String(begin, begin + std::min(end - begin, static_cast<ptrdiff_t>(max_query_size)));
238
239 auto query_for_logging = prepareQueryForLogging(query, context);
240 logQuery(query_for_logging, context, internal);
241
242 if (!internal)
243 onExceptionBeforeStart(query_for_logging, context, current_time);
244
245 throw;
246 }
247
248 /// Copy query into string. It will be written to log and presented in processlist. If an INSERT query, string will not include data to insertion.
249 String query(begin, query_end);
250 BlockIO res;
251 QueryPipeline & pipeline = res.pipeline;
252
253 String query_for_logging = "";
254
255 try
256 {
257 /// Replace ASTQueryParameter with ASTLiteral for prepared statements.
258 if (context.hasQueryParameters())
259 {
260 ReplaceQueryParameterVisitor visitor(context.getQueryParameters());
261 visitor.visit(ast);
262
263 /// Get new query after substitutions.
264 query = serializeAST(*ast);
265 }
266
267 query_for_logging = prepareQueryForLogging(query, context);
268
269 logQuery(query_for_logging, context, internal);
270
271 /// Check the limits.
272 checkASTSizeLimits(*ast, settings);
273
274 /// Put query to process list. But don't put SHOW PROCESSLIST query itself.
275 ProcessList::EntryPtr process_list_entry;
276 if (!internal && !ast->as<ASTShowProcesslistQuery>())
277 {
278 /// processlist also has query masked now, to avoid secrets leaks though SHOW PROCESSLIST by other users.
279 process_list_entry = context.getProcessList().insert(query_for_logging, ast.get(), context);
280 context.setProcessListElement(&process_list_entry->get());
281 }
282
283 /// Load external tables if they were provided
284 context.initializeExternalTablesIfSet();
285
286 auto * insert_query = ast->as<ASTInsertQuery>();
287 if (insert_query && insert_query->select)
288 {
289 /// Prepare Input storage before executing interpreter if we already got a buffer with data.
290 if (istr)
291 {
292 ASTPtr input_function;
293 insert_query->tryFindInputFunction(input_function);
294 if (input_function)
295 {
296 StoragePtr storage = context.executeTableFunction(input_function);
297 auto & input_storage = dynamic_cast<StorageInput &>(*storage);
298 BlockInputStreamPtr input_stream = std::make_shared<InputStreamFromASTInsertQuery>(ast, istr,
299 input_storage.getSampleBlock(), context, input_function);
300 input_storage.setInputStream(input_stream);
301 }
302 }
303 }
304 else
305 /// reset Input callbacks if query is not INSERT SELECT
306 context.resetInputCallbacks();
307
308 auto interpreter = InterpreterFactory::get(ast, context, stage);
309 bool use_processors = settings.experimental_use_processors && allow_processors && interpreter->canExecuteWithProcessors();
310
311 QuotaContextPtr quota;
312 if (!interpreter->ignoreQuota())
313 {
314 quota = context.getQuota();
315 quota->used(Quota::QUERIES, 1);
316 quota->checkExceeded(Quota::ERRORS);
317 }
318
319 IBlockInputStream::LocalLimits limits;
320 if (!interpreter->ignoreLimits())
321 {
322 limits.mode = IBlockInputStream::LIMITS_CURRENT;
323 limits.size_limits = SizeLimits(settings.max_result_rows, settings.max_result_bytes, settings.result_overflow_mode);
324 }
325
326 if (use_processors)
327 pipeline = interpreter->executeWithProcessors();
328 else
329 res = interpreter->execute();
330
331 if (auto * insert_interpreter = typeid_cast<const InterpreterInsertQuery *>(&*interpreter))
332 {
333 /// Save insertion table (not table function). TODO: support remote() table function.
334 auto db_table = insert_interpreter->getDatabaseTable();
335 if (!db_table.second.empty())
336 context.setInsertionTable(std::move(db_table));
337 }
338
339 if (process_list_entry)
340 {
341 /// Query was killed before execution
342 if ((*process_list_entry)->isKilled())
343 throw Exception("Query '" + (*process_list_entry)->getInfo().client_info.current_query_id + "' is killed in pending state",
344 ErrorCodes::QUERY_WAS_CANCELLED);
345 else if (!use_processors)
346 (*process_list_entry)->setQueryStreams(res);
347 }
348
349 /// Hold element of process list till end of query execution.
350 res.process_list_entry = process_list_entry;
351
352 if (use_processors)
353 {
354 /// Limits on the result, the quota on the result, and also callback for progress.
355 /// Limits apply only to the final result.
356 pipeline.setProgressCallback(context.getProgressCallback());
357 pipeline.setProcessListElement(context.getProcessListElement());
358 if (stage == QueryProcessingStage::Complete)
359 {
360 pipeline.resize(1);
361 pipeline.addSimpleTransform([&](const Block & header)
362 {
363 auto transform = std::make_shared<LimitsCheckingTransform>(header, limits);
364 transform->setQuota(quota);
365 return transform;
366 });
367 }
368 }
369 else
370 {
371 /// Limits on the result, the quota on the result, and also callback for progress.
372 /// Limits apply only to the final result.
373 if (res.in)
374 {
375 res.in->setProgressCallback(context.getProgressCallback());
376 res.in->setProcessListElement(context.getProcessListElement());
377 if (stage == QueryProcessingStage::Complete)
378 {
379 if (!interpreter->ignoreQuota())
380 res.in->setQuota(quota);
381 if (!interpreter->ignoreLimits())
382 res.in->setLimits(limits);
383 }
384 }
385
386 if (res.out)
387 {
388 if (auto stream = dynamic_cast<CountingBlockOutputStream *>(res.out.get()))
389 {
390 stream->setProcessListElement(context.getProcessListElement());
391 }
392 }
393 }
394
395 /// Everything related to query log.
396 {
397 QueryLogElement elem;
398
399 elem.type = QueryLogElement::QUERY_START;
400
401 elem.event_time = current_time;
402 elem.query_start_time = current_time;
403
404 elem.query = query_for_logging;
405
406 elem.client_info = context.getClientInfo();
407
408 bool log_queries = settings.log_queries && !internal;
409
410 /// Log into system table start of query execution, if need.
411 if (log_queries)
412 {
413 if (settings.log_query_settings)
414 elem.query_settings = std::make_shared<Settings>(context.getSettingsRef());
415
416 if (auto query_log = context.getQueryLog())
417 query_log->add(elem);
418 }
419
420 /// Also make possible for caller to log successful query finish and exception during execution.
421 auto finish_callback = [elem, &context, log_queries] (IBlockInputStream * stream_in, IBlockOutputStream * stream_out) mutable
422 {
423 QueryStatus * process_list_elem = context.getProcessListElement();
424
425 if (!process_list_elem)
426 return;
427
428 /// Update performance counters before logging to query_log
429 CurrentThread::finalizePerformanceCounters();
430
431 QueryStatusInfo info = process_list_elem->getInfo(true, context.getSettingsRef().log_profile_events);
432
433 double elapsed_seconds = info.elapsed_seconds;
434
435 elem.type = QueryLogElement::QUERY_FINISH;
436
437 elem.event_time = time(nullptr);
438 elem.query_duration_ms = elapsed_seconds * 1000;
439
440 elem.read_rows = info.read_rows;
441 elem.read_bytes = info.read_bytes;
442
443 elem.written_rows = info.written_rows;
444 elem.written_bytes = info.written_bytes;
445
446 auto progress_callback = context.getProgressCallback();
447
448 if (progress_callback)
449 progress_callback(Progress(WriteProgress(info.written_rows, info.written_bytes)));
450
451 elem.memory_usage = info.peak_memory_usage > 0 ? info.peak_memory_usage : 0;
452
453 if (stream_in)
454 {
455 const BlockStreamProfileInfo & stream_in_info = stream_in->getProfileInfo();
456
457 /// NOTE: INSERT SELECT query contains zero metrics
458 elem.result_rows = stream_in_info.rows;
459 elem.result_bytes = stream_in_info.bytes;
460 }
461 else if (stream_out) /// will be used only for ordinary INSERT queries
462 {
463 if (auto counting_stream = dynamic_cast<const CountingBlockOutputStream *>(stream_out))
464 {
465 /// NOTE: Redundancy. The same values could be extracted from process_list_elem->progress_out.query_settings = process_list_elem->progress_in
466 elem.result_rows = counting_stream->getProgress().read_rows;
467 elem.result_bytes = counting_stream->getProgress().read_bytes;
468 }
469 }
470
471 if (elem.read_rows != 0)
472 {
473 LOG_INFO(&Logger::get("executeQuery"), std::fixed << std::setprecision(3)
474 << "Read " << elem.read_rows << " rows, "
475 << formatReadableSizeWithBinarySuffix(elem.read_bytes) << " in " << elapsed_seconds << " sec., "
476 << static_cast<size_t>(elem.read_rows / elapsed_seconds) << " rows/sec., "
477 << formatReadableSizeWithBinarySuffix(elem.read_bytes / elapsed_seconds) << "/sec.");
478 }
479
480 elem.thread_numbers = std::move(info.thread_numbers);
481 elem.os_thread_ids = std::move(info.os_thread_ids);
482 elem.profile_counters = std::move(info.profile_counters);
483
484 if (log_queries)
485 {
486 if (auto query_log = context.getQueryLog())
487 query_log->add(elem);
488 }
489 };
490
491 auto exception_callback = [elem, &context, log_queries] () mutable
492 {
493 context.getQuota()->used(Quota::ERRORS, 1, /* check_exceeded = */ false);
494
495 elem.type = QueryLogElement::EXCEPTION_WHILE_PROCESSING;
496
497 elem.event_time = time(nullptr);
498 elem.query_duration_ms = 1000 * (elem.event_time - elem.query_start_time);
499 elem.exception = getCurrentExceptionMessage(false);
500
501 QueryStatus * process_list_elem = context.getProcessListElement();
502 const Settings & current_settings = context.getSettingsRef();
503
504 /// Update performance counters before logging to query_log
505 CurrentThread::finalizePerformanceCounters();
506
507 if (process_list_elem)
508 {
509 QueryStatusInfo info = process_list_elem->getInfo(true, current_settings.log_profile_events, false);
510
511 elem.query_duration_ms = info.elapsed_seconds * 1000;
512
513 elem.read_rows = info.read_rows;
514 elem.read_bytes = info.read_bytes;
515
516 elem.memory_usage = info.peak_memory_usage > 0 ? info.peak_memory_usage : 0;
517
518 elem.thread_numbers = std::move(info.thread_numbers);
519 elem.os_thread_ids = std::move(info.os_thread_ids);
520 elem.profile_counters = std::move(info.profile_counters);
521 }
522
523 if (current_settings.calculate_text_stack_trace)
524 setExceptionStackTrace(elem);
525 logException(context, elem);
526
527 /// In case of exception we log internal queries also
528 if (log_queries)
529 {
530 if (auto query_log = context.getQueryLog())
531 query_log->add(elem);
532 }
533 };
534
535 res.finish_callback = std::move(finish_callback);
536 res.exception_callback = std::move(exception_callback);
537
538 if (!internal && res.in)
539 {
540 std::stringstream log_str;
541 log_str << "Query pipeline:\n";
542 res.in->dumpTree(log_str);
543 LOG_DEBUG(&Logger::get("executeQuery"), log_str.str());
544 }
545 }
546 }
547 catch (...)
548 {
549 if (!internal)
550 {
551 if (query_for_logging.empty())
552 query_for_logging = prepareQueryForLogging(query, context);
553
554 onExceptionBeforeStart(query_for_logging, context, current_time);
555 }
556
557 throw;
558 }
559
560 return std::make_tuple(ast, res);
561}
562
563
564BlockIO executeQuery(
565 const String & query,
566 Context & context,
567 bool internal,
568 QueryProcessingStage::Enum stage,
569 bool may_have_embedded_data,
570 bool allow_processors)
571{
572 ASTPtr ast;
573 BlockIO streams;
574 std::tie(ast, streams) = executeQueryImpl(query.data(), query.data() + query.size(), context,
575 internal, stage, !may_have_embedded_data, nullptr, allow_processors);
576 if (streams.in)
577 {
578 const auto * ast_query_with_output = dynamic_cast<const ASTQueryWithOutput *>(ast.get());
579 String format_name = ast_query_with_output && (ast_query_with_output->format != nullptr)
580 ? getIdentifierName(ast_query_with_output->format) : context.getDefaultFormat();
581 if (format_name == "Null")
582 streams.null_format = true;
583 }
584 return streams;
585}
586
587
588void executeQuery(
589 ReadBuffer & istr,
590 WriteBuffer & ostr,
591 bool allow_into_outfile,
592 Context & context,
593 std::function<void(const String &)> set_content_type,
594 std::function<void(const String &)> set_query_id)
595{
596 PODArray<char> parse_buf;
597 const char * begin;
598 const char * end;
599
600 /// If 'istr' is empty now, fetch next data into buffer.
601 if (istr.buffer().size() == 0)
602 istr.next();
603
604 size_t max_query_size = context.getSettingsRef().max_query_size;
605
606 bool may_have_tail;
607 if (istr.buffer().end() - istr.position() > static_cast<ssize_t>(max_query_size))
608 {
609 /// If remaining buffer space in 'istr' is enough to parse query up to 'max_query_size' bytes, then parse inplace.
610 begin = istr.position();
611 end = istr.buffer().end();
612 istr.position() += end - begin;
613 /// Actually we don't know will query has additional data or not.
614 /// But we can't check istr.eof(), because begin and end pointers will became invalid
615 may_have_tail = true;
616 }
617 else
618 {
619 /// If not - copy enough data into 'parse_buf'.
620 WriteBufferFromVector<PODArray<char>> out(parse_buf);
621 LimitReadBuffer limit(istr, max_query_size + 1, false);
622 copyData(limit, out);
623 out.finish();
624
625 begin = parse_buf.data();
626 end = begin + parse_buf.size();
627 /// Can check stream for eof, because we have copied data
628 may_have_tail = !istr.eof();
629 }
630
631 ASTPtr ast;
632 BlockIO streams;
633
634 std::tie(ast, streams) = executeQueryImpl(begin, end, context, false, QueryProcessingStage::Complete, may_have_tail, &istr, true);
635
636 auto & pipeline = streams.pipeline;
637
638 try
639 {
640 if (streams.out)
641 {
642 InputStreamFromASTInsertQuery in(ast, &istr, streams.out->getHeader(), context, nullptr);
643 copyData(in, *streams.out);
644 }
645
646 if (streams.in)
647 {
648 /// FIXME: try to prettify this cast using `as<>()`
649 const auto * ast_query_with_output = dynamic_cast<const ASTQueryWithOutput *>(ast.get());
650
651 WriteBuffer * out_buf = &ostr;
652 std::optional<WriteBufferFromFile> out_file_buf;
653 if (ast_query_with_output && ast_query_with_output->out_file)
654 {
655 if (!allow_into_outfile)
656 throw Exception("INTO OUTFILE is not allowed", ErrorCodes::INTO_OUTFILE_NOT_ALLOWED);
657
658 const auto & out_file = ast_query_with_output->out_file->as<ASTLiteral &>().value.safeGet<std::string>();
659 out_file_buf.emplace(out_file, DBMS_DEFAULT_BUFFER_SIZE, O_WRONLY | O_EXCL | O_CREAT);
660 out_buf = &*out_file_buf;
661 }
662
663 String format_name = ast_query_with_output && (ast_query_with_output->format != nullptr)
664 ? getIdentifierName(ast_query_with_output->format)
665 : context.getDefaultFormat();
666
667 if (ast_query_with_output && ast_query_with_output->settings_ast)
668 InterpreterSetQuery(ast_query_with_output->settings_ast, context).executeForCurrentContext();
669
670 BlockOutputStreamPtr out = context.getOutputFormat(format_name, *out_buf, streams.in->getHeader());
671
672 /// Save previous progress callback if any. TODO Do it more conveniently.
673 auto previous_progress_callback = context.getProgressCallback();
674
675 /// NOTE Progress callback takes shared ownership of 'out'.
676 streams.in->setProgressCallback([out, previous_progress_callback] (const Progress & progress)
677 {
678 if (previous_progress_callback)
679 previous_progress_callback(progress);
680 out->onProgress(progress);
681 });
682
683 if (set_content_type)
684 set_content_type(out->getContentType());
685
686 if (set_query_id)
687 set_query_id(context.getClientInfo().current_query_id);
688
689 if (ast->as<ASTWatchQuery>())
690 {
691 /// For Watch query, flush data if block is empty (to send data to client).
692 auto flush_callback = [&out](const Block & block)
693 {
694 if (block.rows() == 0)
695 out->flush();
696 };
697
698 copyData(*streams.in, *out, [](){ return false; }, std::move(flush_callback));
699 }
700 else
701 copyData(*streams.in, *out);
702 }
703
704 if (pipeline.initialized())
705 {
706 const ASTQueryWithOutput * ast_query_with_output = dynamic_cast<const ASTQueryWithOutput *>(ast.get());
707
708 WriteBuffer * out_buf = &ostr;
709 std::optional<WriteBufferFromFile> out_file_buf;
710 if (ast_query_with_output && ast_query_with_output->out_file)
711 {
712 if (!allow_into_outfile)
713 throw Exception("INTO OUTFILE is not allowed", ErrorCodes::INTO_OUTFILE_NOT_ALLOWED);
714
715 const auto & out_file = typeid_cast<const ASTLiteral &>(*ast_query_with_output->out_file).value.safeGet<std::string>();
716 out_file_buf.emplace(out_file, DBMS_DEFAULT_BUFFER_SIZE, O_WRONLY | O_EXCL | O_CREAT);
717 out_buf = &*out_file_buf;
718 }
719
720 String format_name = ast_query_with_output && (ast_query_with_output->format != nullptr)
721 ? getIdentifierName(ast_query_with_output->format)
722 : context.getDefaultFormat();
723
724 if (ast_query_with_output && ast_query_with_output->settings_ast)
725 InterpreterSetQuery(ast_query_with_output->settings_ast, context).executeForCurrentContext();
726
727 pipeline.addSimpleTransform([](const Block & header)
728 {
729 return std::make_shared<MaterializingTransform>(header);
730 });
731
732 auto out = context.getOutputFormatProcessor(format_name, *out_buf, pipeline.getHeader());
733
734 /// Save previous progress callback if any. TODO Do it more conveniently.
735 auto previous_progress_callback = context.getProgressCallback();
736
737 /// NOTE Progress callback takes shared ownership of 'out'.
738 pipeline.setProgressCallback([out, previous_progress_callback] (const Progress & progress)
739 {
740 if (previous_progress_callback)
741 previous_progress_callback(progress);
742 out->onProgress(progress);
743 });
744
745 if (set_content_type)
746 set_content_type(out->getContentType());
747
748 if (set_query_id)
749 set_query_id(context.getClientInfo().current_query_id);
750
751 pipeline.setOutput(std::move(out));
752
753 {
754 auto executor = pipeline.execute();
755 executor->execute(context.getSettingsRef().max_threads);
756 }
757
758 pipeline.finalize();
759 }
760 }
761 catch (...)
762 {
763 streams.onException();
764 throw;
765 }
766
767 streams.onFinish();
768}
769
770}
771