1#pragma once
2#include <Processors/Formats/IOutputFormat.h>
3#include <Common/ConcurrentBoundedQueue.h>
4#include <DataStreams/BlockStreamProfileInfo.h>
5#include <IO/WriteBuffer.h>
6
7namespace DB
8{
9
10class LazyOutputFormat : public IOutputFormat
11{
12
13public:
14 explicit LazyOutputFormat(const Block & header)
15 : IOutputFormat(header, out), queue(2), finished_processing(false) {}
16
17 String getName() const override { return "LazyOutputFormat"; }
18
19 Block getBlock(UInt64 milliseconds = 0);
20 Block getTotals();
21 Block getExtremes();
22
23 bool isFinished() { return finished_processing; }
24
25 BlockStreamProfileInfo & getProfileInfo() { return info; }
26
27 void setRowsBeforeLimit(size_t rows_before_limit) override;
28
29 void finish() { finished_processing = true; }
30 void clearQueue() { queue.clear(); }
31
32protected:
33 void consume(Chunk chunk) override
34 {
35 if (!finished_processing)
36 queue.emplace(std::move(chunk));
37 }
38
39 void consumeTotals(Chunk chunk) override { totals = std::move(chunk); }
40 void consumeExtremes(Chunk chunk) override { extremes = std::move(chunk); }
41
42 void finalize() override
43 {
44 finished_processing = true;
45
46 /// In case we are waiting for result.
47 queue.emplace(Chunk());
48 }
49
50private:
51
52 ConcurrentBoundedQueue<Chunk> queue;
53 Chunk totals;
54 Chunk extremes;
55
56 /// Is not used.
57 static WriteBuffer out;
58
59 BlockStreamProfileInfo info;
60
61 std::atomic<bool> finished_processing;
62};
63
64}
65