1#pragma once
2#include <DataStreams/IBlockInputStream.h>
3#include <Processors/Formats/IInputFormat.h>
4
5namespace DB
6{
7
8class IInputFormat;
9using InputFormatPtr = std::shared_ptr<IInputFormat>;
10
11class InputStreamFromInputFormat : public IBlockInputStream
12{
13public:
14 explicit InputStreamFromInputFormat(InputFormatPtr input_format_)
15 : input_format(std::move(input_format_))
16 , port(input_format->getPort().getHeader(), input_format.get())
17 {
18 connect(input_format->getPort(), port);
19 port.setNeeded();
20 }
21
22 String getName() const override { return input_format->getName(); }
23 Block getHeader() const override { return input_format->getPort().getHeader(); }
24
25 void cancel(bool kill) override
26 {
27 input_format->cancel();
28 IBlockInputStream::cancel(kill);
29 }
30
31 const BlockMissingValues & getMissingValues() const override { return input_format->getMissingValues(); }
32
33protected:
34
35 Block readImpl() override
36 {
37 while (true)
38 {
39 auto status = input_format->prepare();
40
41 switch (status)
42 {
43 case IProcessor::Status::Ready:
44 input_format->work();
45 break;
46
47 case IProcessor::Status::Finished:
48 return {};
49
50 case IProcessor::Status::PortFull:
51 return input_format->getPort().getHeader().cloneWithColumns(port.pull().detachColumns());
52
53 case IProcessor::Status::NeedData:
54 case IProcessor::Status::Async:
55 case IProcessor::Status::Wait:
56 case IProcessor::Status::ExpandPipeline:
57 throw Exception("Source processor returned status " + IProcessor::statusToName(status), ErrorCodes::LOGICAL_ERROR);
58 }
59 }
60 }
61
62private:
63 InputFormatPtr input_format;
64 InputPort port;
65};
66
67}
68