| 1 | #include <Storages/MergeTree/MergeTreeReaderStream.h> |
| 2 | #include <Poco/File.h> |
| 3 | |
| 4 | |
| 5 | namespace DB |
| 6 | { |
| 7 | |
namespace ErrorCodes
{
    /// Error codes referenced by this file (declared extern here; defined centrally in the project).
    extern const int LOGICAL_ERROR;
    extern const int CORRUPTED_DATA;
    extern const int CANNOT_READ_ALL_DATA;
    extern const int ARGUMENT_OUT_OF_BOUND;
}
| 15 | |
| 16 | |
/** Opens one compressed data file of a part for reading.
  * Before creating the read buffer, two quantities are estimated over all requested mark ranges:
  *  - max_mark_range_bytes: the largest compressed span a single range covers; used as the
  *    buffer size (capped by max_read_buffer_size);
  *  - sum_mark_range_bytes: total compressed bytes to be read; passed to the buffer
  *    (presumably as an estimated read size for the AIO-threshold decision — TODO confirm).
  * Marks are loaded lazily via getMark(), so an empty part (marks_count == 0) or a
  * whole-file read never touches the marks file here.
  */
MergeTreeReaderStream::MergeTreeReaderStream(
    const String & path_prefix_, const String & data_file_extension_, size_t marks_count_,
    const MarkRanges & all_mark_ranges,
    MarkCache * mark_cache_, bool save_marks_in_cache_,
    UncompressedCache * uncompressed_cache,
    size_t file_size, size_t aio_threshold, size_t max_read_buffer_size,
    const MergeTreeIndexGranularityInfo * index_granularity_info_,
    const ReadBufferFromFileBase::ProfileCallback & profile_callback, clockid_t clock_type)
    : path_prefix(path_prefix_), data_file_extension(data_file_extension_), marks_count(marks_count_)
    , mark_cache(mark_cache_), save_marks_in_cache(save_marks_in_cache_)
    , index_granularity_info(index_granularity_info_)
{
    /// Compute the size of the buffer.
    size_t max_mark_range_bytes = 0;
    size_t sum_mark_range_bytes = 0;

    /// Care should be taken to not load marks when the part is empty (marks_count == 0).

    for (const auto & mark_range : all_mark_ranges)
    {
        size_t left_mark = mark_range.begin;
        size_t right_mark = mark_range.end;

        /// NOTE: if we are reading the whole file, then right_mark == marks_count
        /// and we will use max_read_buffer_size for buffer size, thus avoiding the need to load marks.

        /// If the end of range is inside the block, we will need to read it too.
        if (right_mark < marks_count && getMark(right_mark).offset_in_decompressed_block > 0)
        {
            /// Advance right_mark to the first mark that starts a different compressed block
            /// (its offset_in_compressed_file differs from mark_range.end's), so the whole
            /// block containing the range end is accounted for.
            while (right_mark < marks_count
                && getMark(right_mark).offset_in_compressed_file == getMark(mark_range.end).offset_in_compressed_file)
            {
                ++right_mark;
            }
        }

        size_t mark_range_bytes;

        /// If there are no marks after the end of range, just use file size
        /// (the second clause covers the case where the last mark begins the final
        /// compressed block, which then runs to the end of the file).
        if (right_mark >= marks_count
            || (right_mark + 1 == marks_count
                && getMark(right_mark).offset_in_compressed_file == getMark(mark_range.end).offset_in_compressed_file))
        {
            mark_range_bytes = file_size - (left_mark < marks_count ? getMark(left_mark).offset_in_compressed_file : 0);
        }
        else
        {
            mark_range_bytes = getMark(right_mark).offset_in_compressed_file - getMark(left_mark).offset_in_compressed_file;
        }

        max_mark_range_bytes = std::max(max_mark_range_bytes, mark_range_bytes);
        sum_mark_range_bytes += mark_range_bytes;
    }

    /// Avoid empty buffer. May happen while reading dictionary for DataTypeLowCardinality.
    /// For example: part has single dictionary and all marks point to the same position.
    if (max_mark_range_bytes == 0)
        max_mark_range_bytes = max_read_buffer_size;

    size_t buffer_size = std::min(max_read_buffer_size, max_mark_range_bytes);

    /// Initialize the objects that shall be used to perform read operations.
    /// Exactly one of cached_buffer / non_cached_buffer is set; data_buffer aliases it.
    if (uncompressed_cache)
    {
        auto buffer = std::make_unique<CachedCompressedReadBuffer>(
            path_prefix + data_file_extension, uncompressed_cache, sum_mark_range_bytes, aio_threshold, buffer_size);

        if (profile_callback)
            buffer->setProfileCallback(profile_callback, clock_type);

        cached_buffer = std::move(buffer);
        data_buffer = cached_buffer.get();
    }
    else
    {
        auto buffer = std::make_unique<CompressedReadBufferFromFile>(
            path_prefix + data_file_extension, sum_mark_range_bytes, aio_threshold, buffer_size);

        if (profile_callback)
            buffer->setProfileCallback(profile_callback, clock_type);

        non_cached_buffer = std::move(buffer);
        data_buffer = non_cached_buffer.get();
    }
}
| 102 | |
| 103 | |
| 104 | const MarkInCompressedFile & MergeTreeReaderStream::getMark(size_t index) |
| 105 | { |
| 106 | if (!marks) |
| 107 | loadMarks(); |
| 108 | return (*marks)[index]; |
| 109 | } |
| 110 | |
| 111 | |
/** Loads this stream's marks file into `marks`, consulting / populating the mark cache
  * when one is available.
  * Throws CORRUPTED_DATA when the file size does not match marks_count,
  * CANNOT_READ_ALL_DATA when fewer marks than expected could be read,
  * and LOGICAL_ERROR when loading yielded no marks at all.
  */
void MergeTreeReaderStream::loadMarks()
{
    std::string mrk_path = index_granularity_info->getMarksFilePath(path_prefix);

    auto load = [&]() -> MarkCache::MappedPtr
    {
        /// Memory for marks must not be accounted as memory usage for query, because they are stored in shared cache.
        auto temporarily_disable_memory_tracker = getCurrentMemoryTrackerActionLock();

        /// Validate up front that the file holds exactly marks_count fixed-size marks.
        size_t file_size = Poco::File(mrk_path).getSize();
        size_t expected_file_size = index_granularity_info->mark_size_in_bytes * marks_count;
        if (expected_file_size != file_size)
            throw Exception(
                "Bad size of marks file '" + mrk_path + "': " + std::to_string(file_size) + ", must be: " + std::to_string(expected_file_size),
                ErrorCodes::CORRUPTED_DATA);

        auto res = std::make_shared<MarksInCompressedFile>(marks_count);

        if (!index_granularity_info->is_adaptive)
        {
            /// Read directly to marks.
            /// Non-adaptive format: the on-disk layout matches the in-memory mark array,
            /// so the whole file is read straight into res's storage.
            ReadBufferFromFile buffer(mrk_path, file_size, -1, reinterpret_cast<char *>(res->data()));

            if (buffer.eof() || buffer.buffer().size() != file_size)
                throw Exception("Cannot read all marks from file " + mrk_path, ErrorCodes::CANNOT_READ_ALL_DATA);
        }
        else
        {
            /// Adaptive format: each on-disk mark carries a third size_t field that is not
            /// kept in memory (presumably rows-per-granule — TODO confirm), so marks are
            /// read field by field and the extra field skipped.
            /// The size check above guarantees `i` cannot exceed marks_count here.
            ReadBufferFromFile buffer(mrk_path, file_size, -1);
            size_t i = 0;
            while (!buffer.eof())
            {
                readIntBinary((*res)[i].offset_in_compressed_file, buffer);
                readIntBinary((*res)[i].offset_in_decompressed_block, buffer);
                buffer.seek(sizeof(size_t), SEEK_CUR); /// Skip the extra per-mark field.
                ++i;
            }
            if (i * index_granularity_info->mark_size_in_bytes != file_size)
                throw Exception("Cannot read all marks from file " + mrk_path, ErrorCodes::CANNOT_READ_ALL_DATA);
        }
        /// Guard the loaded marks against accidental modification (they may be shared via the cache).
        res->protect();
        return res;
    };

    if (mark_cache)
    {
        auto key = mark_cache->hash(mrk_path);
        if (save_marks_in_cache)
        {
            marks = mark_cache->getOrSet(key, load);
        }
        else
        {
            /// Reuse a cached copy if present, but do not insert a fresh load into the cache.
            marks = mark_cache->get(key);
            if (!marks)
                marks = load();
        }
    }
    else
        marks = load();

    if (!marks)
        throw Exception("Failed to load marks: " + mrk_path, ErrorCodes::LOGICAL_ERROR);
}
| 176 | |
| 177 | |
| 178 | void MergeTreeReaderStream::seekToMark(size_t index) |
| 179 | { |
| 180 | MarkInCompressedFile mark = getMark(index); |
| 181 | |
| 182 | try |
| 183 | { |
| 184 | if (cached_buffer) |
| 185 | cached_buffer->seek(mark.offset_in_compressed_file, mark.offset_in_decompressed_block); |
| 186 | if (non_cached_buffer) |
| 187 | non_cached_buffer->seek(mark.offset_in_compressed_file, mark.offset_in_decompressed_block); |
| 188 | } |
| 189 | catch (Exception & e) |
| 190 | { |
| 191 | /// Better diagnostics. |
| 192 | if (e.code() == ErrorCodes::ARGUMENT_OUT_OF_BOUND) |
| 193 | e.addMessage("(while seeking to mark " + toString(index) |
| 194 | + " of column " + path_prefix + "; offsets are: " |
| 195 | + toString(mark.offset_in_compressed_file) + " " |
| 196 | + toString(mark.offset_in_decompressed_block) + ")" ); |
| 197 | |
| 198 | throw; |
| 199 | } |
| 200 | } |
| 201 | |
| 202 | |
| 203 | void MergeTreeReaderStream::seekToStart() |
| 204 | { |
| 205 | try |
| 206 | { |
| 207 | if (cached_buffer) |
| 208 | cached_buffer->seek(0, 0); |
| 209 | if (non_cached_buffer) |
| 210 | non_cached_buffer->seek(0, 0); |
| 211 | } |
| 212 | catch (Exception & e) |
| 213 | { |
| 214 | /// Better diagnostics. |
| 215 | if (e.code() == ErrorCodes::ARGUMENT_OUT_OF_BOUND) |
| 216 | e.addMessage("(while seeking to start of column " + path_prefix + ")" ); |
| 217 | |
| 218 | throw; |
| 219 | } |
| 220 | } |
| 221 | |
| 222 | } |
| 223 | |