1#include <Parsers/ASTInsertQuery.h>
2#include <Interpreters/Context.h>
3#include <Interpreters/InterpreterSetQuery.h>
4#include <IO/ConcatReadBuffer.h>
5#include <IO/ReadBufferFromMemory.h>
6#include <DataStreams/BlockIO.h>
7#include <DataStreams/InputStreamFromASTInsertQuery.h>
8#include <DataStreams/AddingDefaultsBlockInputStream.h>
9#include <Storages/ColumnsDescription.h>
10#include <Storages/IStorage.h>
11
12
13namespace DB
14{
15
16namespace ErrorCodes
17{
18 extern const int LOGICAL_ERROR;
19 extern const int INVALID_USAGE_OF_INPUT;
20}
21
22
23InputStreamFromASTInsertQuery::InputStreamFromASTInsertQuery(
24 const ASTPtr & ast, ReadBuffer * input_buffer_tail_part, const Block & header, const Context & context, const ASTPtr & input_function)
25{
26 const auto * ast_insert_query = ast->as<ASTInsertQuery>();
27
28 if (!ast_insert_query)
29 throw Exception("Logical error: query requires data to insert, but it is not INSERT query", ErrorCodes::LOGICAL_ERROR);
30
31 String format = ast_insert_query->format;
32 if (format.empty())
33 {
34 if (input_function)
35 throw Exception("FORMAT must be specified for function input()", ErrorCodes::INVALID_USAGE_OF_INPUT);
36 format = "Values";
37 }
38
39 /// Data could be in parsed (ast_insert_query.data) and in not parsed yet (input_buffer_tail_part) part of query.
40
41 input_buffer_ast_part = std::make_unique<ReadBufferFromMemory>(
42 ast_insert_query->data, ast_insert_query->data ? ast_insert_query->end - ast_insert_query->data : 0);
43
44 ConcatReadBuffer::ReadBuffers buffers;
45 if (ast_insert_query->data)
46 buffers.push_back(input_buffer_ast_part.get());
47
48 if (input_buffer_tail_part)
49 buffers.push_back(input_buffer_tail_part);
50
51 /** NOTE Must not read from 'input_buffer_tail_part' before read all between 'ast_insert_query.data' and 'ast_insert_query.end'.
52 * - because 'query.data' could refer to memory piece, used as buffer for 'input_buffer_tail_part'.
53 */
54
55 input_buffer_contacenated = std::make_unique<ConcatReadBuffer>(buffers);
56
57 res_stream = context.getInputFormat(format, *input_buffer_contacenated, header, context.getSettings().max_insert_block_size);
58
59 if (context.getSettingsRef().input_format_defaults_for_omitted_fields && !ast_insert_query->table.empty() && !input_function)
60 {
61 StoragePtr storage = context.getTable(ast_insert_query->database, ast_insert_query->table);
62 auto column_defaults = storage->getColumns().getDefaults();
63 if (!column_defaults.empty())
64 res_stream = std::make_shared<AddingDefaultsBlockInputStream>(res_stream, column_defaults, context);
65 }
66}
67
68}
69