#include <Storages/StorageInput.h>
#include <Storages/IStorage.h>

#include <Interpreters/Context.h>

#include <DataStreams/IBlockInputStream.h>
#include <memory>
#include <utility>
8
9
10namespace DB
11{
12
13namespace ErrorCodes
14{
15 extern const int INVALID_USAGE_OF_INPUT;
16}
17
18StorageInput::StorageInput(const String &table_name_, const ColumnsDescription & columns_)
19 : IStorage(columns_), table_name(table_name_)
20{
21 setColumns(columns_);
22}
23
24
25class StorageInputBlockInputStream : public IBlockInputStream
26{
27public:
28 StorageInputBlockInputStream(Context & context_, const Block sample_block_)
29 : context(context_),
30 sample_block(sample_block_)
31 {
32 }
33
34 Block readImpl() override { return context.getInputBlocksReaderCallback()(context); }
35 void readPrefix() override {}
36 void readSuffix() override {}
37
38 String getName() const override { return "Input"; }
39
40 Block getHeader() const override { return sample_block; }
41
42private:
43 Context & context;
44 const Block sample_block;
45};
46
47
48void StorageInput::setInputStream(BlockInputStreamPtr input_stream_)
49{
50 input_stream = input_stream_;
51}
52
53
54BlockInputStreams StorageInput::read(const Names & /*column_names*/,
55 const SelectQueryInfo & /*query_info*/,
56 const Context & context,
57 QueryProcessingStage::Enum /*processed_stage*/,
58 size_t /*max_block_size*/,
59 unsigned /*num_streams*/)
60{
61 Context & query_context = const_cast<Context &>(context).getQueryContext();
62 /// It is TCP request if we have callbacks for input().
63 if (query_context.getInputBlocksReaderCallback())
64 {
65 /// Send structure to the client.
66 query_context.initializeInput(shared_from_this());
67 input_stream = std::make_shared<StorageInputBlockInputStream>(query_context, getSampleBlock());
68 }
69
70 if (!input_stream)
71 throw Exception("Input stream is not initialized, input() must be used only in INSERT SELECT query", ErrorCodes::INVALID_USAGE_OF_INPUT);
72
73 return {input_stream};
74}
75
76}
77