1 | #pragma once |
---|---|
2 | #include <DataStreams/IBlockInputStream.h> |
3 | #include <Processors/Formats/IInputFormat.h> |
4 | |
5 | namespace DB |
6 | { |
7 | |
8 | class IInputFormat; |
9 | using InputFormatPtr = std::shared_ptr<IInputFormat>; |
10 | |
11 | class InputStreamFromInputFormat : public IBlockInputStream |
12 | { |
13 | public: |
14 | explicit InputStreamFromInputFormat(InputFormatPtr input_format_) |
15 | : input_format(std::move(input_format_)) |
16 | , port(input_format->getPort().getHeader(), input_format.get()) |
17 | { |
18 | connect(input_format->getPort(), port); |
19 | port.setNeeded(); |
20 | } |
21 | |
22 | String getName() const override { return input_format->getName(); } |
23 | Block getHeader() const override { return input_format->getPort().getHeader(); } |
24 | |
25 | void cancel(bool kill) override |
26 | { |
27 | input_format->cancel(); |
28 | IBlockInputStream::cancel(kill); |
29 | } |
30 | |
31 | const BlockMissingValues & getMissingValues() const override { return input_format->getMissingValues(); } |
32 | |
33 | protected: |
34 | |
35 | Block readImpl() override |
36 | { |
37 | while (true) |
38 | { |
39 | auto status = input_format->prepare(); |
40 | |
41 | switch (status) |
42 | { |
43 | case IProcessor::Status::Ready: |
44 | input_format->work(); |
45 | break; |
46 | |
47 | case IProcessor::Status::Finished: |
48 | return {}; |
49 | |
50 | case IProcessor::Status::PortFull: |
51 | return input_format->getPort().getHeader().cloneWithColumns(port.pull().detachColumns()); |
52 | |
53 | case IProcessor::Status::NeedData: |
54 | case IProcessor::Status::Async: |
55 | case IProcessor::Status::Wait: |
56 | case IProcessor::Status::ExpandPipeline: |
57 | throw Exception("Source processor returned status "+ IProcessor::statusToName(status), ErrorCodes::LOGICAL_ERROR); |
58 | } |
59 | } |
60 | } |
61 | |
62 | private: |
63 | InputFormatPtr input_format; |
64 | InputPort port; |
65 | }; |
66 | |
67 | } |
68 |