| 1 | #include <Storages/MergeTree/MergeTreeReaderStream.h> | 
|---|
| 2 | #include <Poco/File.h> | 
|---|
| 3 |  | 
|---|
| 4 |  | 
|---|
| 5 | namespace DB | 
|---|
| 6 | { | 
|---|
| 7 |  | 
|---|
namespace ErrorCodes
{
    /// Error codes referenced by this translation unit; the definitions live in the
    /// central ErrorCodes translation unit elsewhere in the project.
    extern const int LOGICAL_ERROR;
    extern const int CORRUPTED_DATA;
    extern const int CANNOT_READ_ALL_DATA;
    extern const int ARGUMENT_OUT_OF_BOUND;
}
|---|
| 15 |  | 
|---|
| 16 |  | 
|---|
/** Stream for reading a single compressed data file of a MergeTree part.
  * The constructor computes a read-buffer size from the requested mark ranges and
  * creates either a cache-backed or a plain compressed read buffer. Marks themselves
  * are loaded lazily via getMark()/loadMarks(), so an empty part (marks_count == 0)
  * never touches the marks file.
  */
MergeTreeReaderStream::MergeTreeReaderStream(
    const String & path_prefix_, const String & data_file_extension_, size_t marks_count_,
    const MarkRanges & all_mark_ranges,
    MarkCache * mark_cache_, bool save_marks_in_cache_,
    UncompressedCache * uncompressed_cache,
    size_t file_size, size_t aio_threshold, size_t max_read_buffer_size,
    const MergeTreeIndexGranularityInfo * index_granularity_info_,
    const ReadBufferFromFileBase::ProfileCallback & profile_callback, clockid_t clock_type)
    : path_prefix(path_prefix_), data_file_extension(data_file_extension_), marks_count(marks_count_)
    , mark_cache(mark_cache_), save_marks_in_cache(save_marks_in_cache_)
    , index_granularity_info(index_granularity_info_)
{
    /// Compute the size of the buffer.
    size_t max_mark_range_bytes = 0;    /// Largest single range in compressed bytes; bounds the buffer size.
    size_t sum_mark_range_bytes = 0;    /// Total compressed bytes to read; passed to the buffer as an estimate
                                        /// (used together with aio_threshold when constructing it below).

    /// Care should be taken to not load marks when the part is empty (marks_count == 0).

    for (const auto & mark_range : all_mark_ranges)
    {
        size_t left_mark = mark_range.begin;
        size_t right_mark = mark_range.end;

        /// NOTE: if we are reading the whole file, then right_mark == marks_count
        /// and we will use max_read_buffer_size for buffer size, thus avoiding the need to load marks.

        /// If the end of range is inside the block, we will need to read it too.
        if (right_mark < marks_count && getMark(right_mark).offset_in_decompressed_block > 0)
        {
            /// Advance past every mark that points into the same compressed block as the
            /// range end, so the whole block is accounted for in the range size.
            while (right_mark < marks_count
                && getMark(right_mark).offset_in_compressed_file == getMark(mark_range.end).offset_in_compressed_file)
            {
                ++right_mark;
            }
        }

        size_t mark_range_bytes;

        /// If there are no marks after the end of range, just use file size
        if (right_mark >= marks_count
            || (right_mark + 1 == marks_count
                && getMark(right_mark).offset_in_compressed_file == getMark(mark_range.end).offset_in_compressed_file))
        {
            mark_range_bytes = file_size - (left_mark < marks_count ? getMark(left_mark).offset_in_compressed_file : 0);
        }
        else
        {
            mark_range_bytes = getMark(right_mark).offset_in_compressed_file - getMark(left_mark).offset_in_compressed_file;
        }

        max_mark_range_bytes = std::max(max_mark_range_bytes, mark_range_bytes);
        sum_mark_range_bytes += mark_range_bytes;
    }

    /// Avoid empty buffer. May happen while reading dictionary for DataTypeLowCardinality.
    /// For example: part has single dictionary and all marks point to the same position.
    if (max_mark_range_bytes == 0)
        max_mark_range_bytes = max_read_buffer_size;

    size_t buffer_size = std::min(max_read_buffer_size, max_mark_range_bytes);

    /// Initialize the objects that shall be used to perform read operations.
    /// Exactly one of cached_buffer / non_cached_buffer is created; data_buffer
    /// always points at whichever one exists.
    if (uncompressed_cache)
    {
        auto buffer = std::make_unique<CachedCompressedReadBuffer>(
            path_prefix + data_file_extension, uncompressed_cache, sum_mark_range_bytes, aio_threshold, buffer_size);

        if (profile_callback)
            buffer->setProfileCallback(profile_callback, clock_type);

        cached_buffer = std::move(buffer);
        data_buffer = cached_buffer.get();
    }
    else
    {
        auto buffer = std::make_unique<CompressedReadBufferFromFile>(
            path_prefix + data_file_extension, sum_mark_range_bytes, aio_threshold, buffer_size);

        if (profile_callback)
            buffer->setProfileCallback(profile_callback, clock_type);

        non_cached_buffer = std::move(buffer);
        data_buffer = non_cached_buffer.get();
    }
}
|---|
| 102 |  | 
|---|
| 103 |  | 
|---|
| 104 | const MarkInCompressedFile & MergeTreeReaderStream::getMark(size_t index) | 
|---|
| 105 | { | 
|---|
| 106 | if (!marks) | 
|---|
| 107 | loadMarks(); | 
|---|
| 108 | return (*marks)[index]; | 
|---|
| 109 | } | 
|---|
| 110 |  | 
|---|
| 111 |  | 
|---|
/** Load the marks file for this stream into `marks`, going through the mark cache
  * when one is configured.
  *
  * Throws CORRUPTED_DATA if the file size does not match marks_count,
  * CANNOT_READ_ALL_DATA if reading stops short, and LOGICAL_ERROR if no marks
  * could be obtained at all.
  */
void MergeTreeReaderStream::loadMarks()
{
    std::string mrk_path = index_granularity_info->getMarksFilePath(path_prefix);

    auto load = [&]() -> MarkCache::MappedPtr
    {
        /// Memory for marks must not be accounted as memory usage for query, because they are stored in shared cache.
        auto temporarily_disable_memory_tracker = getCurrentMemoryTrackerActionLock();

        /// Validate the on-disk size before reading so a truncated/corrupted file
        /// cannot cause an out-of-bounds write below.
        size_t file_size = Poco::File(mrk_path).getSize();
        size_t expected_file_size = index_granularity_info->mark_size_in_bytes * marks_count;
        if (expected_file_size != file_size)
            throw Exception(
                "Bad size of marks file '"+ mrk_path + "': "+ std::to_string(file_size) + ", must be: "+ std::to_string(expected_file_size),
                ErrorCodes::CORRUPTED_DATA);

        auto res = std::make_shared<MarksInCompressedFile>(marks_count);

        if (!index_granularity_info->is_adaptive)
        {
            /// Read directly to marks: for non-adaptive granularity the on-disk layout
            /// matches MarkInCompressedFile, so the file is read straight into the array.
            ReadBufferFromFile buffer(mrk_path, file_size, -1, reinterpret_cast<char *>(res->data()));

            if (buffer.eof() || buffer.buffer().size() != file_size)
                throw Exception( "Cannot read all marks from file "+ mrk_path, ErrorCodes::CANNOT_READ_ALL_DATA);
        }
        else
        {
            /// Adaptive granularity: each on-disk mark carries an extra size_t field that
            /// this reader does not need (presumably the rows-per-granule count — confirm
            /// against the writer), so parse mark-by-mark and skip it.
            ReadBufferFromFile buffer(mrk_path, file_size, -1);
            size_t i = 0;
            while (!buffer.eof())
            {
                readIntBinary((*res)[i].offset_in_compressed_file, buffer);
                readIntBinary((*res)[i].offset_in_decompressed_block, buffer);
                buffer.seek(sizeof(size_t), SEEK_CUR);  /// skip the extra per-mark field
                ++i;
            }
            if (i * index_granularity_info->mark_size_in_bytes != file_size)
                throw Exception( "Cannot read all marks from file "+ mrk_path, ErrorCodes::CANNOT_READ_ALL_DATA);
        }
        /// NOTE(review): protect() appears to make the marks memory read-only since the
        /// array may be shared via the cache — confirm against MarksInCompressedFile.
        res->protect();
        return res;
    };

    if (mark_cache)
    {
        auto key = mark_cache->hash(mrk_path);
        if (save_marks_in_cache)
        {
            /// Load through the cache: concurrent readers share one copy.
            marks = mark_cache->getOrSet(key, load);
        }
        else
        {
            /// Use a cached copy if present, but do not populate the cache on a miss.
            marks = mark_cache->get(key);
            if (!marks)
                marks = load();
        }
    }
    else
        marks = load();

    if (!marks)
        throw Exception( "Failed to load marks: "+ mrk_path, ErrorCodes::LOGICAL_ERROR);
}
|---|
| 176 |  | 
|---|
| 177 |  | 
|---|
| 178 | void MergeTreeReaderStream::seekToMark(size_t index) | 
|---|
| 179 | { | 
|---|
| 180 | MarkInCompressedFile mark = getMark(index); | 
|---|
| 181 |  | 
|---|
| 182 | try | 
|---|
| 183 | { | 
|---|
| 184 | if (cached_buffer) | 
|---|
| 185 | cached_buffer->seek(mark.offset_in_compressed_file, mark.offset_in_decompressed_block); | 
|---|
| 186 | if (non_cached_buffer) | 
|---|
| 187 | non_cached_buffer->seek(mark.offset_in_compressed_file, mark.offset_in_decompressed_block); | 
|---|
| 188 | } | 
|---|
| 189 | catch (Exception & e) | 
|---|
| 190 | { | 
|---|
| 191 | /// Better diagnostics. | 
|---|
| 192 | if (e.code() == ErrorCodes::ARGUMENT_OUT_OF_BOUND) | 
|---|
| 193 | e.addMessage( "(while seeking to mark "+ toString(index) | 
|---|
| 194 | + " of column "+ path_prefix + "; offsets are: " | 
|---|
| 195 | + toString(mark.offset_in_compressed_file) + " " | 
|---|
| 196 | + toString(mark.offset_in_decompressed_block) + ")"); | 
|---|
| 197 |  | 
|---|
| 198 | throw; | 
|---|
| 199 | } | 
|---|
| 200 | } | 
|---|
| 201 |  | 
|---|
| 202 |  | 
|---|
| 203 | void MergeTreeReaderStream::seekToStart() | 
|---|
| 204 | { | 
|---|
| 205 | try | 
|---|
| 206 | { | 
|---|
| 207 | if (cached_buffer) | 
|---|
| 208 | cached_buffer->seek(0, 0); | 
|---|
| 209 | if (non_cached_buffer) | 
|---|
| 210 | non_cached_buffer->seek(0, 0); | 
|---|
| 211 | } | 
|---|
| 212 | catch (Exception & e) | 
|---|
| 213 | { | 
|---|
| 214 | /// Better diagnostics. | 
|---|
| 215 | if (e.code() == ErrorCodes::ARGUMENT_OUT_OF_BOUND) | 
|---|
| 216 | e.addMessage( "(while seeking to start of column "+ path_prefix + ")"); | 
|---|
| 217 |  | 
|---|
| 218 | throw; | 
|---|
| 219 | } | 
|---|
| 220 | } | 
|---|
| 221 |  | 
|---|
| 222 | } | 
|---|
| 223 |  | 
|---|