1#include <DataStreams/SquashingBlockOutputStream.h>
2
3
4namespace DB
5{
6
7SquashingBlockOutputStream::SquashingBlockOutputStream(BlockOutputStreamPtr dst, Block header_, size_t min_block_size_rows, size_t min_block_size_bytes)
8 : output(std::move(dst)), header(std::move(header_)), transform(min_block_size_rows, min_block_size_bytes)
9{
10}
11
12
13void SquashingBlockOutputStream::write(const Block & block)
14{
15 SquashingTransform::Result result = transform.add(Block(block).mutateColumns());
16 if (result.ready)
17 output->write(header.cloneWithColumns(std::move(result.columns)));
18}
19
20
21void SquashingBlockOutputStream::finalize()
22{
23 if (all_written)
24 return;
25
26 all_written = true;
27
28 SquashingTransform::Result result = transform.add({});
29 if (result.ready && !result.columns.empty())
30 output->write(header.cloneWithColumns(std::move(result.columns)));
31}
32
33
34void SquashingBlockOutputStream::flush()
35{
36 if (!disable_flush)
37 finalize();
38 output->flush();
39}
40
41
42void SquashingBlockOutputStream::writePrefix()
43{
44 output->writePrefix();
45}
46
47
48void SquashingBlockOutputStream::writeSuffix()
49{
50 finalize();
51 output->writeSuffix();
52}
53
54}
55