1#include <Processors/Formats/LazyOutputFormat.h>
2#include <Processors/Transforms/AggregatingTransform.h>
3
4
5namespace DB
6{
7
8WriteBuffer LazyOutputFormat::out(nullptr, 0);
9
10Block LazyOutputFormat::getBlock(UInt64 milliseconds)
11{
12 if (finished_processing)
13 {
14 if (queue.size() == 0)
15 return {};
16 }
17
18 Chunk chunk;
19 if (!queue.tryPop(chunk, milliseconds))
20 return {};
21
22 if (!chunk)
23 return {};
24
25 auto block = getPort(PortKind::Main).getHeader().cloneWithColumns(chunk.detachColumns());
26 info.update(block);
27
28 if (auto chunk_info = chunk.getChunkInfo())
29 {
30 if (auto * agg_info = typeid_cast<const AggregatedChunkInfo *>(chunk_info.get()))
31 {
32 block.info.bucket_num = agg_info->bucket_num;
33 block.info.is_overflows = agg_info->is_overflows;
34 }
35 }
36
37 return block;
38}
39
40Block LazyOutputFormat::getTotals()
41{
42 if (!totals)
43 return {};
44
45 return getPort(PortKind::Totals).getHeader().cloneWithColumns(totals.detachColumns());
46}
47
48Block LazyOutputFormat::getExtremes()
49{
50 if (!extremes)
51 return {};
52
53 return getPort(PortKind::Extremes).getHeader().cloneWithColumns(extremes.detachColumns());
54}
55
56void LazyOutputFormat::setRowsBeforeLimit(size_t rows_before_limit)
57{
58 info.setRowsBeforeLimit(rows_before_limit);
59}
60
61}
62