| 1 | #include <Common/FieldVisitors.h> |
| 2 | #include <Common/assert_cast.h> |
| 3 | #include <DataStreams/VersionedCollapsingSortedBlockInputStream.h> |
| 4 | #include <Columns/ColumnsNumber.h> |
| 5 | |
| 6 | |
| 7 | namespace DB |
| 8 | { |
| 9 | |
/// Error codes referenced from this translation unit; they are only declared here
/// (`extern`), their definitions live elsewhere.
namespace ErrorCodes
{
    extern const int LOGICAL_ERROR;
    /// Used by readImpl() when the sort description requires collations.
    extern const int NOT_IMPLEMENTED;
}
| 15 | |
| 16 | |
| 17 | VersionedCollapsingSortedBlockInputStream::VersionedCollapsingSortedBlockInputStream( |
| 18 | const BlockInputStreams & inputs_, const SortDescription & description_, |
| 19 | const String & sign_column_, size_t max_block_size_, |
| 20 | WriteBuffer * out_row_sources_buf_, bool average_block_sizes_) |
| 21 | : MergingSortedBlockInputStream(inputs_, description_, max_block_size_, 0, out_row_sources_buf_, false, average_block_sizes_) |
| 22 | , max_rows_in_queue(std::min(std::max<size_t>(3, max_block_size_), MAX_ROWS_IN_MULTIVERSION_QUEUE) - 2) |
| 23 | , current_keys(max_rows_in_queue + 1) |
| 24 | { |
| 25 | sign_column_number = header.getPositionByName(sign_column_); |
| 26 | } |
| 27 | |
| 28 | |
| 29 | |
| 30 | inline ALWAYS_INLINE static void writeRowSourcePart(WriteBuffer & buffer, RowSourcePart row_source) |
| 31 | { |
| 32 | if constexpr (sizeof(RowSourcePart) == 1) |
| 33 | buffer.write(*reinterpret_cast<const char *>(&row_source)); |
| 34 | else |
| 35 | buffer.write(reinterpret_cast<const char *>(&row_source), sizeof(RowSourcePart)); |
| 36 | } |
| 37 | |
| 38 | void VersionedCollapsingSortedBlockInputStream::insertGap(size_t gap_size) |
| 39 | { |
| 40 | if (out_row_sources_buf) |
| 41 | { |
| 42 | for (size_t i = 0; i < gap_size; ++i) |
| 43 | { |
| 44 | writeRowSourcePart(*out_row_sources_buf, current_row_sources.front()); |
| 45 | current_row_sources.pop(); |
| 46 | } |
| 47 | } |
| 48 | } |
| 49 | |
| 50 | void VersionedCollapsingSortedBlockInputStream::insertRow(size_t skip_rows, const SharedBlockRowRef & row, MutableColumns & merged_columns) |
| 51 | { |
| 52 | const auto & columns = row.shared_block->all_columns; |
| 53 | for (size_t i = 0; i < num_columns; ++i) |
| 54 | merged_columns[i]->insertFrom(*columns[i], row.row_num); |
| 55 | |
| 56 | insertGap(skip_rows); |
| 57 | |
| 58 | if (out_row_sources_buf) |
| 59 | { |
| 60 | current_row_sources.front().setSkipFlag(false); |
| 61 | writeRowSourcePart(*out_row_sources_buf, current_row_sources.front()); |
| 62 | current_row_sources.pop(); |
| 63 | } |
| 64 | } |
| 65 | |
| 66 | Block VersionedCollapsingSortedBlockInputStream::readImpl() |
| 67 | { |
| 68 | if (finished) |
| 69 | return {}; |
| 70 | |
| 71 | MutableColumns merged_columns; |
| 72 | init(merged_columns); |
| 73 | |
| 74 | if (has_collation) |
| 75 | throw Exception("Logical error: " + getName() + " does not support collations" , ErrorCodes::NOT_IMPLEMENTED); |
| 76 | |
| 77 | if (merged_columns.empty()) |
| 78 | return {}; |
| 79 | |
| 80 | merge(merged_columns, queue_without_collation); |
| 81 | return header.cloneWithColumns(std::move(merged_columns)); |
| 82 | } |
| 83 | |
| 84 | |
| 85 | void VersionedCollapsingSortedBlockInputStream::merge(MutableColumns & merged_columns, std::priority_queue<SortCursor> & queue) |
| 86 | { |
| 87 | MergeStopCondition stop_condition(average_block_sizes, max_block_size); |
| 88 | |
| 89 | auto update_queue = [this, & queue](SortCursor & cursor) |
| 90 | { |
| 91 | queue.pop(); |
| 92 | |
| 93 | if (out_row_sources_buf) |
| 94 | current_row_sources.emplace(cursor->order, true); |
| 95 | |
| 96 | if (!cursor->isLast()) |
| 97 | { |
| 98 | cursor->next(); |
| 99 | queue.push(cursor); |
| 100 | } |
| 101 | else |
| 102 | { |
| 103 | /// We take next block from the corresponding source, if there is one. |
| 104 | fetchNextBlock(cursor, queue); |
| 105 | } |
| 106 | }; |
| 107 | |
| 108 | /// Take rows in correct order and put them into `merged_columns` until the rows no more than `max_block_size` |
| 109 | while (!queue.empty()) |
| 110 | { |
| 111 | SortCursor current = queue.top(); |
| 112 | size_t current_block_granularity = current->rows; |
| 113 | |
| 114 | SharedBlockRowRef next_key; |
| 115 | |
| 116 | Int8 sign = assert_cast<const ColumnInt8 &>(*current->all_columns[sign_column_number]).getData()[current->pos]; |
| 117 | |
| 118 | setPrimaryKeyRef(next_key, current); |
| 119 | |
| 120 | size_t rows_to_merge = 0; |
| 121 | |
| 122 | /// Each branch either updates queue or increases rows_to_merge. |
| 123 | if (current_keys.empty()) |
| 124 | { |
| 125 | sign_in_queue = sign; |
| 126 | current_keys.pushBack(next_key); |
| 127 | update_queue(current); |
| 128 | } |
| 129 | else |
| 130 | { |
| 131 | if (current_keys.back() == next_key) |
| 132 | { |
| 133 | update_queue(current); |
| 134 | |
| 135 | if (sign == sign_in_queue) |
| 136 | current_keys.pushBack(next_key); |
| 137 | else |
| 138 | { |
| 139 | current_keys.popBack(); |
| 140 | current_keys.pushGap(2); |
| 141 | } |
| 142 | } |
| 143 | else |
| 144 | rows_to_merge = current_keys.size(); |
| 145 | } |
| 146 | |
| 147 | if (current_keys.size() > max_rows_in_queue) |
| 148 | rows_to_merge = std::max(rows_to_merge, current_keys.size() - max_rows_in_queue); |
| 149 | |
| 150 | while (rows_to_merge) |
| 151 | { |
| 152 | const auto & row = current_keys.front(); |
| 153 | auto gap = current_keys.frontGap(); |
| 154 | |
| 155 | insertRow(gap, row, merged_columns); |
| 156 | |
| 157 | current_keys.popFront(); |
| 158 | |
| 159 | stop_condition.addRowWithGranularity(current_block_granularity); |
| 160 | --rows_to_merge; |
| 161 | |
| 162 | if (stop_condition.checkStop()) |
| 163 | { |
| 164 | ++blocks_written; |
| 165 | return; |
| 166 | } |
| 167 | } |
| 168 | } |
| 169 | |
| 170 | while (!current_keys.empty()) |
| 171 | { |
| 172 | const auto & row = current_keys.front(); |
| 173 | auto gap = current_keys.frontGap(); |
| 174 | |
| 175 | insertRow(gap, row, merged_columns); |
| 176 | |
| 177 | current_keys.popFront(); |
| 178 | } |
| 179 | |
| 180 | /// Write information about last collapsed rows. |
| 181 | insertGap(current_keys.frontGap()); |
| 182 | |
| 183 | finished = true; |
| 184 | } |
| 185 | |
| 186 | } |
| 187 | |