1#include <Processors/Sources/SourceFromInputStream.h>
2#include <Processors/Transforms/AggregatingTransform.h>
3#include <DataTypes/DataTypeAggregateFunction.h>
4#include <DataStreams/RemoteBlockInputStream.h>
5
6namespace DB
7{
8
9SourceFromInputStream::SourceFromInputStream(BlockInputStreamPtr stream_, bool force_add_aggregating_info_)
10 : ISourceWithProgress(stream_->getHeader())
11 , force_add_aggregating_info(force_add_aggregating_info_)
12 , stream(std::move(stream_))
13{
14 auto & sample = getPort().getHeader();
15 for (auto & type : sample.getDataTypes())
16 if (typeid_cast<const DataTypeAggregateFunction *>(type.get()))
17 has_aggregate_functions = true;
18}
19
20void SourceFromInputStream::addTotalsPort()
21{
22 if (has_totals_port)
23 throw Exception("Totals port was already added for SourceFromInputStream.", ErrorCodes::LOGICAL_ERROR);
24
25 outputs.emplace_back(outputs.front().getHeader(), this);
26 has_totals_port = true;
27}
28
29IProcessor::Status SourceFromInputStream::prepare()
30{
31 auto status = ISource::prepare();
32
33 if (status == Status::Finished)
34 {
35 is_generating_finished = true;
36
37 /// Read postfix and get totals if needed.
38 if (!is_stream_finished)
39 return Status::Ready;
40
41 if (has_totals_port)
42 {
43 auto & totals_out = outputs.back();
44
45 if (totals_out.isFinished())
46 return Status::Finished;
47
48 if (has_totals)
49 {
50 if (!totals_out.canPush())
51 return Status::PortFull;
52
53 totals_out.push(std::move(totals));
54 has_totals = false;
55 }
56
57 totals_out.finish();
58 }
59 }
60
61 return status;
62}
63
64void SourceFromInputStream::work()
65{
66 if (!is_generating_finished)
67 {
68 try
69 {
70 ISource::work();
71 }
72 catch (...)
73 {
74 /// Won't read suffix in case of exception.
75 is_stream_finished = true;
76 throw;
77 }
78
79 return;
80 }
81
82 if (is_stream_finished)
83 return;
84
85 /// Don't cancel for RemoteBlockInputStream (otherwise readSuffix can stack)
86 if (!typeid_cast<const RemoteBlockInputStream *>(stream.get()))
87 stream->cancel(false);
88
89 stream->readSuffix();
90
91 if (auto totals_block = stream->getTotals())
92 {
93 totals.setColumns(totals_block.getColumns(), 1);
94 has_totals = true;
95 }
96
97 is_stream_finished = true;
98}
99
100Chunk SourceFromInputStream::generate()
101{
102 if (is_stream_finished)
103 return {};
104
105 if (!is_stream_started)
106 {
107 stream->readPrefix();
108 is_stream_started = true;
109 }
110
111 auto block = stream->read();
112 if (!block)
113 {
114 stream->readSuffix();
115
116 if (auto totals_block = stream->getTotals())
117 {
118 if (totals_block.rows() == 1) /// Sometimes we can get empty totals. Skip it.
119 {
120 totals.setColumns(totals_block.getColumns(), 1);
121 has_totals = true;
122 }
123 }
124
125 is_stream_finished = true;
126 return {};
127 }
128
129#ifndef NDEBUG
130 assertBlocksHaveEqualStructure(getPort().getHeader(), block, "SourceFromInputStream");
131#endif
132
133 UInt64 num_rows = block.rows();
134 Chunk chunk(block.getColumns(), num_rows);
135
136 if (force_add_aggregating_info || has_aggregate_functions)
137 {
138 auto info = std::make_shared<AggregatedChunkInfo>();
139 info->bucket_num = block.info.bucket_num;
140 info->is_overflows = block.info.is_overflows;
141 chunk.setChunkInfo(std::move(info));
142 }
143
144 return chunk;
145}
146
147}
148