1 | #pragma once |
---|---|
2 | #include <Processors/Formats/IOutputFormat.h> |
3 | #include <Common/ConcurrentBoundedQueue.h> |
4 | #include <DataStreams/BlockStreamProfileInfo.h> |
5 | #include <IO/WriteBuffer.h> |
6 | |
7 | namespace DB |
8 | { |
9 | |
10 | class LazyOutputFormat : public IOutputFormat |
11 | { |
12 | |
13 | public: |
14 | explicit LazyOutputFormat(const Block & header) |
15 | : IOutputFormat(header, out), queue(2), finished_processing(false) {} |
16 | |
17 | String getName() const override { return "LazyOutputFormat"; } |
18 | |
19 | Block getBlock(UInt64 milliseconds = 0); |
20 | Block getTotals(); |
21 | Block getExtremes(); |
22 | |
23 | bool isFinished() { return finished_processing; } |
24 | |
25 | BlockStreamProfileInfo & getProfileInfo() { return info; } |
26 | |
27 | void setRowsBeforeLimit(size_t rows_before_limit) override; |
28 | |
29 | void finish() { finished_processing = true; } |
30 | void clearQueue() { queue.clear(); } |
31 | |
32 | protected: |
33 | void consume(Chunk chunk) override |
34 | { |
35 | if (!finished_processing) |
36 | queue.emplace(std::move(chunk)); |
37 | } |
38 | |
39 | void consumeTotals(Chunk chunk) override { totals = std::move(chunk); } |
40 | void consumeExtremes(Chunk chunk) override { extremes = std::move(chunk); } |
41 | |
42 | void finalize() override |
43 | { |
44 | finished_processing = true; |
45 | |
46 | /// In case we are waiting for result. |
47 | queue.emplace(Chunk()); |
48 | } |
49 | |
50 | private: |
51 | |
52 | ConcurrentBoundedQueue<Chunk> queue; |
53 | Chunk totals; |
54 | Chunk extremes; |
55 | |
56 | /// Is not used. |
57 | static WriteBuffer out; |
58 | |
59 | BlockStreamProfileInfo info; |
60 | |
61 | std::atomic<bool> finished_processing; |
62 | }; |
63 | |
64 | } |
65 |