1 | #include <DataStreams/SquashingTransform.h> |
2 | |
3 | |
4 | namespace DB |
5 | { |
6 | |
7 | SquashingTransform::SquashingTransform(size_t min_block_size_rows_, size_t min_block_size_bytes_, bool reserve_memory_) |
8 | : min_block_size_rows(min_block_size_rows_) |
9 | , min_block_size_bytes(min_block_size_bytes_) |
10 | , reserve_memory(reserve_memory_) |
11 | { |
12 | } |
13 | |
14 | |
15 | SquashingTransform::Result SquashingTransform::add(MutableColumns && columns) |
16 | { |
17 | /// End of input stream. |
18 | if (columns.empty()) |
19 | return Result(std::move(accumulated_columns)); |
20 | |
21 | /// Just read block is already enough. |
22 | if (isEnoughSize(columns)) |
23 | { |
24 | /// If no accumulated data, return just read block. |
25 | if (accumulated_columns.empty()) |
26 | return Result(std::move(columns)); |
27 | |
28 | /// Return accumulated data (maybe it has small size) and place new block to accumulated data. |
29 | columns.swap(accumulated_columns); |
30 | return Result(std::move(columns)); |
31 | } |
32 | |
33 | /// Accumulated block is already enough. |
34 | if (!accumulated_columns.empty() && isEnoughSize(accumulated_columns)) |
35 | { |
36 | /// Return accumulated data and place new block to accumulated data. |
37 | columns.swap(accumulated_columns); |
38 | return Result(std::move(columns)); |
39 | } |
40 | |
41 | append(std::move(columns)); |
42 | |
43 | if (isEnoughSize(accumulated_columns)) |
44 | { |
45 | MutableColumns res; |
46 | res.swap(accumulated_columns); |
47 | return Result(std::move(res)); |
48 | } |
49 | |
50 | /// Squashed block is not ready. |
51 | return false; |
52 | } |
53 | |
54 | |
55 | void SquashingTransform::append(MutableColumns && columns) |
56 | { |
57 | if (accumulated_columns.empty()) |
58 | { |
59 | accumulated_columns = std::move(columns); |
60 | return; |
61 | } |
62 | |
63 | for (size_t i = 0, size = columns.size(); i < size; ++i) |
64 | { |
65 | auto & column = accumulated_columns[i]; |
66 | if (reserve_memory) |
67 | column->reserve(min_block_size_bytes); |
68 | column->insertRangeFrom(*columns[i], 0, columns[i]->size()); |
69 | } |
70 | } |
71 | |
72 | |
73 | bool SquashingTransform::isEnoughSize(const MutableColumns & columns) |
74 | { |
75 | size_t rows = 0; |
76 | size_t bytes = 0; |
77 | |
78 | for (const auto & column : columns) |
79 | { |
80 | if (!rows) |
81 | rows = column->size(); |
82 | else if (rows != column->size()) |
83 | throw Exception("Sizes of columns doesn't match" , ErrorCodes::SIZES_OF_COLUMNS_DOESNT_MATCH); |
84 | |
85 | bytes += column->byteSize(); |
86 | } |
87 | |
88 | return isEnoughSize(rows, bytes); |
89 | } |
90 | |
91 | |
92 | bool SquashingTransform::isEnoughSize(size_t rows, size_t bytes) const |
93 | { |
94 | return (!min_block_size_rows && !min_block_size_bytes) |
95 | || (min_block_size_rows && rows >= min_block_size_rows) |
96 | || (min_block_size_bytes && bytes >= min_block_size_bytes); |
97 | } |
98 | |
99 | } |
100 | |