1#include <DataStreams/SquashingBlockInputStream.h>
2
3
4namespace DB
5{
6
7SquashingBlockInputStream::SquashingBlockInputStream(
8 const BlockInputStreamPtr & src, size_t min_block_size_rows, size_t min_block_size_bytes, bool reserve_memory)
9 : header(src->getHeader()), transform(min_block_size_rows, min_block_size_bytes, reserve_memory)
10{
11 children.emplace_back(src);
12}
13
14
15Block SquashingBlockInputStream::readImpl()
16{
17 if (all_read)
18 return {};
19
20 while (true)
21 {
22 Block block = children[0]->read();
23 if (!block)
24 all_read = true;
25
26 SquashingTransform::Result result = transform.add(block.mutateColumns());
27 if (result.ready)
28 {
29 if (result.columns.empty())
30 return {};
31 return header.cloneWithColumns(std::move(result.columns));
32 }
33 }
34}
35
36}
37