1 | #include <Storages/StorageInput.h> |
2 | #include <Storages/IStorage.h> |
3 | |
4 | #include <Interpreters/Context.h> |
5 | |
6 | #include <DataStreams/IBlockInputStream.h> |
7 | #include <memory> |
8 | |
9 | |
10 | namespace DB |
11 | { |
12 | |
/// Error codes referenced in this file. The constant is only declared here
/// (`extern`); its definition lives in another translation unit.
namespace ErrorCodes
{
    /// Thrown by StorageInput::read() when no input stream is available.
    extern const int INVALID_USAGE_OF_INPUT;
}
17 | |
18 | StorageInput::StorageInput(const String &table_name_, const ColumnsDescription & columns_) |
19 | : IStorage(columns_), table_name(table_name_) |
20 | { |
21 | setColumns(columns_); |
22 | } |
23 | |
24 | |
25 | class StorageInputBlockInputStream : public IBlockInputStream |
26 | { |
27 | public: |
28 | StorageInputBlockInputStream(Context & context_, const Block sample_block_) |
29 | : context(context_), |
30 | sample_block(sample_block_) |
31 | { |
32 | } |
33 | |
34 | Block readImpl() override { return context.getInputBlocksReaderCallback()(context); } |
35 | void readPrefix() override {} |
36 | void readSuffix() override {} |
37 | |
38 | String getName() const override { return "Input" ; } |
39 | |
40 | Block () const override { return sample_block; } |
41 | |
42 | private: |
43 | Context & context; |
44 | const Block sample_block; |
45 | }; |
46 | |
47 | |
48 | void StorageInput::setInputStream(BlockInputStreamPtr input_stream_) |
49 | { |
50 | input_stream = input_stream_; |
51 | } |
52 | |
53 | |
54 | BlockInputStreams StorageInput::read(const Names & /*column_names*/, |
55 | const SelectQueryInfo & /*query_info*/, |
56 | const Context & context, |
57 | QueryProcessingStage::Enum /*processed_stage*/, |
58 | size_t /*max_block_size*/, |
59 | unsigned /*num_streams*/) |
60 | { |
61 | Context & query_context = const_cast<Context &>(context).getQueryContext(); |
62 | /// It is TCP request if we have callbacks for input(). |
63 | if (query_context.getInputBlocksReaderCallback()) |
64 | { |
65 | /// Send structure to the client. |
66 | query_context.initializeInput(shared_from_this()); |
67 | input_stream = std::make_shared<StorageInputBlockInputStream>(query_context, getSampleBlock()); |
68 | } |
69 | |
70 | if (!input_stream) |
71 | throw Exception("Input stream is not initialized, input() must be used only in INSERT SELECT query" , ErrorCodes::INVALID_USAGE_OF_INPUT); |
72 | |
73 | return {input_stream}; |
74 | } |
75 | |
76 | } |
77 | |