1#pragma once
2#include <Processors/Sources/SourceWithProgress.h>
3
4namespace DB
5{
6
7class IBlockInputStream;
8using BlockInputStreamPtr = std::shared_ptr<IBlockInputStream>;
9
10/// Wrapper for IBlockInputStream which implements ISourceWithProgress.
11class SourceFromInputStream : public ISourceWithProgress
12{
13public:
14 explicit SourceFromInputStream(BlockInputStreamPtr stream_, bool force_add_aggregating_info_ = false);
15 String getName() const override { return "SourceFromInputStream"; }
16
17 Status prepare() override;
18 void work() override;
19
20 Chunk generate() override;
21
22 IBlockInputStream & getStream() { return *stream; }
23
24 void addTotalsPort();
25
26 /// Implementation for methods from ISourceWithProgress.
27 void setLimits(const LocalLimits & limits_) final { stream->setLimits(limits_); }
28 void setQuota(const std::shared_ptr<QuotaContext> & quota_) final { stream->setQuota(quota_); }
29 void setProcessListElement(QueryStatus * elem) final { stream->setProcessListElement(elem); }
30 void setProgressCallback(const ProgressCallback & callback) final { stream->setProgressCallback(callback); }
31 void addTotalRowsApprox(size_t value) final { stream->addTotalRowsApprox(value); }
32
33private:
34 bool has_aggregate_functions = false;
35 bool force_add_aggregating_info;
36 BlockInputStreamPtr stream;
37
38 Chunk totals;
39 bool has_totals_port = false;
40 bool has_totals = false;
41
42 bool is_generating_finished = false;
43 bool is_stream_finished = false;
44 bool is_stream_started = false;
45};
46
47}
48