| 1 | #include <Storages/StorageInput.h> |
| 2 | #include <Storages/IStorage.h> |
| 3 | |
| 4 | #include <Interpreters/Context.h> |
| 5 | |
| 6 | #include <DataStreams/IBlockInputStream.h> |
| 7 | #include <memory> |
| 8 | |
| 9 | |
| 10 | namespace DB |
| 11 | { |
| 12 | |
| 13 | namespace ErrorCodes |
| 14 | { |
| 15 | extern const int INVALID_USAGE_OF_INPUT; |
| 16 | } |
| 17 | |
| 18 | StorageInput::StorageInput(const String &table_name_, const ColumnsDescription & columns_) |
| 19 | : IStorage(columns_), table_name(table_name_) |
| 20 | { |
| 21 | setColumns(columns_); |
| 22 | } |
| 23 | |
| 24 | |
| 25 | class StorageInputBlockInputStream : public IBlockInputStream |
| 26 | { |
| 27 | public: |
| 28 | StorageInputBlockInputStream(Context & context_, const Block sample_block_) |
| 29 | : context(context_), |
| 30 | sample_block(sample_block_) |
| 31 | { |
| 32 | } |
| 33 | |
| 34 | Block readImpl() override { return context.getInputBlocksReaderCallback()(context); } |
| 35 | void readPrefix() override {} |
| 36 | void readSuffix() override {} |
| 37 | |
| 38 | String getName() const override { return "Input" ; } |
| 39 | |
| 40 | Block () const override { return sample_block; } |
| 41 | |
| 42 | private: |
| 43 | Context & context; |
| 44 | const Block sample_block; |
| 45 | }; |
| 46 | |
| 47 | |
| 48 | void StorageInput::setInputStream(BlockInputStreamPtr input_stream_) |
| 49 | { |
| 50 | input_stream = input_stream_; |
| 51 | } |
| 52 | |
| 53 | |
| 54 | BlockInputStreams StorageInput::read(const Names & /*column_names*/, |
| 55 | const SelectQueryInfo & /*query_info*/, |
| 56 | const Context & context, |
| 57 | QueryProcessingStage::Enum /*processed_stage*/, |
| 58 | size_t /*max_block_size*/, |
| 59 | unsigned /*num_streams*/) |
| 60 | { |
| 61 | Context & query_context = const_cast<Context &>(context).getQueryContext(); |
| 62 | /// It is TCP request if we have callbacks for input(). |
| 63 | if (query_context.getInputBlocksReaderCallback()) |
| 64 | { |
| 65 | /// Send structure to the client. |
| 66 | query_context.initializeInput(shared_from_this()); |
| 67 | input_stream = std::make_shared<StorageInputBlockInputStream>(query_context, getSampleBlock()); |
| 68 | } |
| 69 | |
| 70 | if (!input_stream) |
| 71 | throw Exception("Input stream is not initialized, input() must be used only in INSERT SELECT query" , ErrorCodes::INVALID_USAGE_OF_INPUT); |
| 72 | |
| 73 | return {input_stream}; |
| 74 | } |
| 75 | |
| 76 | } |
| 77 | |