1#include <DataStreams/SquashingTransform.h>
2
3
4namespace DB
5{
6
7SquashingTransform::SquashingTransform(size_t min_block_size_rows_, size_t min_block_size_bytes_, bool reserve_memory_)
8 : min_block_size_rows(min_block_size_rows_)
9 , min_block_size_bytes(min_block_size_bytes_)
10 , reserve_memory(reserve_memory_)
11{
12}
13
14
15SquashingTransform::Result SquashingTransform::add(MutableColumns && columns)
16{
17 /// End of input stream.
18 if (columns.empty())
19 return Result(std::move(accumulated_columns));
20
21 /// Just read block is already enough.
22 if (isEnoughSize(columns))
23 {
24 /// If no accumulated data, return just read block.
25 if (accumulated_columns.empty())
26 return Result(std::move(columns));
27
28 /// Return accumulated data (maybe it has small size) and place new block to accumulated data.
29 columns.swap(accumulated_columns);
30 return Result(std::move(columns));
31 }
32
33 /// Accumulated block is already enough.
34 if (!accumulated_columns.empty() && isEnoughSize(accumulated_columns))
35 {
36 /// Return accumulated data and place new block to accumulated data.
37 columns.swap(accumulated_columns);
38 return Result(std::move(columns));
39 }
40
41 append(std::move(columns));
42
43 if (isEnoughSize(accumulated_columns))
44 {
45 MutableColumns res;
46 res.swap(accumulated_columns);
47 return Result(std::move(res));
48 }
49
50 /// Squashed block is not ready.
51 return false;
52}
53
54
55void SquashingTransform::append(MutableColumns && columns)
56{
57 if (accumulated_columns.empty())
58 {
59 accumulated_columns = std::move(columns);
60 return;
61 }
62
63 for (size_t i = 0, size = columns.size(); i < size; ++i)
64 {
65 auto & column = accumulated_columns[i];
66 if (reserve_memory)
67 column->reserve(min_block_size_bytes);
68 column->insertRangeFrom(*columns[i], 0, columns[i]->size());
69 }
70}
71
72
73bool SquashingTransform::isEnoughSize(const MutableColumns & columns)
74{
75 size_t rows = 0;
76 size_t bytes = 0;
77
78 for (const auto & column : columns)
79 {
80 if (!rows)
81 rows = column->size();
82 else if (rows != column->size())
83 throw Exception("Sizes of columns doesn't match", ErrorCodes::SIZES_OF_COLUMNS_DOESNT_MATCH);
84
85 bytes += column->byteSize();
86 }
87
88 return isEnoughSize(rows, bytes);
89}
90
91
92bool SquashingTransform::isEnoughSize(size_t rows, size_t bytes) const
93{
94 return (!min_block_size_rows && !min_block_size_bytes)
95 || (min_block_size_rows && rows >= min_block_size_rows)
96 || (min_block_size_bytes && bytes >= min_block_size_bytes);
97}
98
99}
100