| 1 | #include <Storages/MergeTree/MergeTreeReaderStream.h> | 
| 2 | #include <Poco/File.h> | 
| 3 |  | 
| 4 |  | 
| 5 | namespace DB | 
| 6 | { | 
| 7 |  | 
namespace ErrorCodes
{
    /// Error codes referenced by this translation unit; their values are defined elsewhere in the project.
    extern const int LOGICAL_ERROR;
    extern const int CORRUPTED_DATA;
    extern const int CANNOT_READ_ALL_DATA;
    extern const int ARGUMENT_OUT_OF_BOUND;
}
| 15 |  | 
| 16 |  | 
/// Opens a compressed data stream for one column/part, sizing the read buffer
/// from the requested mark ranges and wiring in the uncompressed-block cache
/// and profiling callback when provided.
///
/// Buffer sizing: for each requested mark range we estimate the number of
/// compressed bytes it covers (using mark offsets, or the file size for the
/// trailing range). The read buffer is the largest single-range estimate,
/// capped by max_read_buffer_size; the sum over all ranges is passed to the
/// compressed buffers as an estimated total read size.
MergeTreeReaderStream::MergeTreeReaderStream(
        const String & path_prefix_, const String & data_file_extension_, size_t marks_count_,
        const MarkRanges & all_mark_ranges,
        MarkCache * mark_cache_, bool save_marks_in_cache_,
        UncompressedCache * uncompressed_cache,
        size_t file_size, size_t aio_threshold, size_t max_read_buffer_size,
        const MergeTreeIndexGranularityInfo * index_granularity_info_,
        const ReadBufferFromFileBase::ProfileCallback & profile_callback, clockid_t clock_type)
        : path_prefix(path_prefix_), data_file_extension(data_file_extension_), marks_count(marks_count_)
        , mark_cache(mark_cache_), save_marks_in_cache(save_marks_in_cache_)
        , index_granularity_info(index_granularity_info_)
{
    /// Compute the size of the buffer.
    size_t max_mark_range_bytes = 0;
    size_t sum_mark_range_bytes = 0;

    /// Care should be taken to not load marks when the part is empty (marks_count == 0).
    /// NOTE: getMark() lazily loads the whole marks file on first call, so the
    /// guards below (right_mark < marks_count etc.) also keep empty parts from
    /// touching the marks file at all.

    for (const auto & mark_range : all_mark_ranges)
    {
        size_t left_mark = mark_range.begin;
        size_t right_mark = mark_range.end;

        /// NOTE: if we are reading the whole file, then right_mark == marks_count
        /// and we will use max_read_buffer_size for buffer size, thus avoiding the need to load marks.

        /// If the end of range is inside the block, we will need to read it too.
        if (right_mark < marks_count && getMark(right_mark).offset_in_decompressed_block > 0)
        {
            /// Extend right_mark past every mark that still points into the same
            /// compressed block as the range end, so the byte estimate covers the
            /// whole block that must be decompressed.
            while (right_mark < marks_count
                && getMark(right_mark).offset_in_compressed_file == getMark(mark_range.end).offset_in_compressed_file)
            {
                ++right_mark;
            }
        }

        size_t mark_range_bytes;

        /// If there are no marks after the end of range, just use file size
        if (right_mark >= marks_count
            || (right_mark + 1 == marks_count
                && getMark(right_mark).offset_in_compressed_file == getMark(mark_range.end).offset_in_compressed_file))
        {
            mark_range_bytes = file_size - (left_mark < marks_count ? getMark(left_mark).offset_in_compressed_file : 0);
        }
        else
        {
            mark_range_bytes = getMark(right_mark).offset_in_compressed_file - getMark(left_mark).offset_in_compressed_file;
        }

        max_mark_range_bytes = std::max(max_mark_range_bytes, mark_range_bytes);
        sum_mark_range_bytes += mark_range_bytes;
    }

    /// Avoid empty buffer. May happen while reading dictionary for DataTypeLowCardinality.
    /// For example: part has single dictionary and all marks point to the same position.
    if (max_mark_range_bytes == 0)
        max_mark_range_bytes = max_read_buffer_size;

    size_t buffer_size = std::min(max_read_buffer_size, max_mark_range_bytes);

    /// Initialize the objects that shall be used to perform read operations.
    /// Exactly one of cached_buffer / non_cached_buffer is created; data_buffer
    /// aliases whichever one exists.
    if (uncompressed_cache)
    {
        auto buffer = std::make_unique<CachedCompressedReadBuffer>(
            path_prefix + data_file_extension, uncompressed_cache, sum_mark_range_bytes, aio_threshold, buffer_size);

        if (profile_callback)
            buffer->setProfileCallback(profile_callback, clock_type);

        cached_buffer = std::move(buffer);
        data_buffer = cached_buffer.get();
    }
    else
    {
        auto buffer = std::make_unique<CompressedReadBufferFromFile>(
            path_prefix + data_file_extension, sum_mark_range_bytes, aio_threshold, buffer_size);

        if (profile_callback)
            buffer->setProfileCallback(profile_callback, clock_type);

        non_cached_buffer = std::move(buffer);
        data_buffer = non_cached_buffer.get();
    }
}
| 102 |  | 
| 103 |  | 
| 104 | const MarkInCompressedFile & MergeTreeReaderStream::getMark(size_t index) | 
| 105 | { | 
| 106 |     if (!marks) | 
| 107 |         loadMarks(); | 
| 108 |     return (*marks)[index]; | 
| 109 | } | 
| 110 |  | 
| 111 |  | 
/// Loads the marks file for this stream into `marks`, going through the shared
/// mark cache when one is configured. Throws CORRUPTED_DATA on a size mismatch,
/// CANNOT_READ_ALL_DATA on a short read, and LOGICAL_ERROR if loading failed
/// without throwing earlier.
void MergeTreeReaderStream::loadMarks()
{
    std::string mrk_path = index_granularity_info->getMarksFilePath(path_prefix);

    auto load = [&]() -> MarkCache::MappedPtr
    {
        /// Memory for marks must not be accounted as memory usage for query, because they are stored in shared cache.
        auto temporarily_disable_memory_tracker = getCurrentMemoryTrackerActionLock();

        /// Validate the file size up front: a truncated or oversized marks file
        /// indicates corruption and would otherwise be misparsed below.
        size_t file_size = Poco::File(mrk_path).getSize();
        size_t expected_file_size = index_granularity_info->mark_size_in_bytes * marks_count;
        if (expected_file_size != file_size)
            throw Exception(
                "Bad size of marks file '"  + mrk_path + "': "  + std::to_string(file_size) + ", must be: "  + std::to_string(expected_file_size),
                ErrorCodes::CORRUPTED_DATA);

        auto res = std::make_shared<MarksInCompressedFile>(marks_count);

        if (!index_granularity_info->is_adaptive)
        {
            /// Read directly to marks.
            /// NOTE(review): this relies on the in-memory MarkInCompressedFile layout
            /// matching the on-disk non-adaptive mark format exactly.
            ReadBufferFromFile buffer(mrk_path, file_size, -1, reinterpret_cast<char *>(res->data()));

            if (buffer.eof() || buffer.buffer().size() != file_size)
                throw Exception("Cannot read all marks from file "  + mrk_path, ErrorCodes::CANNOT_READ_ALL_DATA);
        }
        else
        {
            /// Adaptive-granularity marks carry an extra size_t field per mark
            /// (presumably rows per granule — mark_size_in_bytes accounts for it);
            /// it is not needed here, so it is skipped after the two offsets.
            ReadBufferFromFile buffer(mrk_path, file_size, -1);
            size_t i = 0;
            while (!buffer.eof())
            {
                readIntBinary((*res)[i].offset_in_compressed_file, buffer);
                readIntBinary((*res)[i].offset_in_decompressed_block, buffer);
                buffer.seek(sizeof(size_t), SEEK_CUR);
                ++i;
            }
            if (i * index_granularity_info->mark_size_in_bytes != file_size)
                throw Exception("Cannot read all marks from file "  + mrk_path, ErrorCodes::CANNOT_READ_ALL_DATA);
        }
        /// Make the loaded marks read-only to catch accidental modification of
        /// data that may be shared via the mark cache.
        res->protect();
        return res;
    };

    if (mark_cache)
    {
        auto key = mark_cache->hash(mrk_path);
        if (save_marks_in_cache)
        {
            /// Load through the cache so concurrent readers share one copy.
            marks = mark_cache->getOrSet(key, load);
        }
        else
        {
            /// Use a cached copy if present, but do not populate the cache.
            marks = mark_cache->get(key);
            if (!marks)
                marks = load();
        }
    }
    else
        marks = load();

    if (!marks)
        throw Exception("Failed to load marks: "  + mrk_path, ErrorCodes::LOGICAL_ERROR);
}
| 176 |  | 
| 177 |  | 
| 178 | void MergeTreeReaderStream::seekToMark(size_t index) | 
| 179 | { | 
| 180 |     MarkInCompressedFile mark = getMark(index); | 
| 181 |  | 
| 182 |     try | 
| 183 |     { | 
| 184 |         if (cached_buffer) | 
| 185 |             cached_buffer->seek(mark.offset_in_compressed_file, mark.offset_in_decompressed_block); | 
| 186 |         if (non_cached_buffer) | 
| 187 |             non_cached_buffer->seek(mark.offset_in_compressed_file, mark.offset_in_decompressed_block); | 
| 188 |     } | 
| 189 |     catch (Exception & e) | 
| 190 |     { | 
| 191 |         /// Better diagnostics. | 
| 192 |         if (e.code() == ErrorCodes::ARGUMENT_OUT_OF_BOUND) | 
| 193 |             e.addMessage("(while seeking to mark "  + toString(index) | 
| 194 |                          + " of column "  + path_prefix + "; offsets are: "  | 
| 195 |                          + toString(mark.offset_in_compressed_file) + " "  | 
| 196 |                          + toString(mark.offset_in_decompressed_block) + ")" ); | 
| 197 |  | 
| 198 |         throw; | 
| 199 |     } | 
| 200 | } | 
| 201 |  | 
| 202 |  | 
| 203 | void MergeTreeReaderStream::seekToStart() | 
| 204 | { | 
| 205 |     try | 
| 206 |     { | 
| 207 |         if (cached_buffer) | 
| 208 |             cached_buffer->seek(0, 0); | 
| 209 |         if (non_cached_buffer) | 
| 210 |             non_cached_buffer->seek(0, 0); | 
| 211 |     } | 
| 212 |     catch (Exception & e) | 
| 213 |     { | 
| 214 |         /// Better diagnostics. | 
| 215 |         if (e.code() == ErrorCodes::ARGUMENT_OUT_OF_BOUND) | 
| 216 |             e.addMessage("(while seeking to start of column "  + path_prefix + ")" ); | 
| 217 |  | 
| 218 |         throw; | 
| 219 |     } | 
| 220 | } | 
| 221 |  | 
| 222 | } | 
| 223 |  |