1#pragma once
2
#include <Core/NamesAndTypes.h>
#include <Storages/MergeTree/MergeTreeReaderStream.h>
#include <port/clock.h>

#include <cassert>
#include <map>
#include <memory>
#include <string>
6
7
8namespace DB
9{
10
11class IDataType;
12
13/// Reads the data between pairs of marks in the same part. When reading consecutive ranges, avoids unnecessary seeks.
14/// When ranges are almost consecutive, seeks are fast because they are performed inside the buffer.
15/// Avoids loading the marks file if it is not needed (e.g. when reading the whole part).
16class MergeTreeReader : private boost::noncopyable
17{
18public:
19 using ValueSizeMap = std::map<std::string, double>;
20 using DeserializeBinaryBulkStateMap = std::map<std::string, IDataType::DeserializeBinaryBulkStatePtr>;
21
22 MergeTreeReader(String path_, /// Path to the directory containing the part
23 MergeTreeData::DataPartPtr data_part_,
24 NamesAndTypesList columns_,
25 UncompressedCache * uncompressed_cache_,
26 MarkCache * mark_cache_,
27 bool save_marks_in_cache_,
28 const MergeTreeData & storage_,
29 MarkRanges all_mark_ranges_,
30 size_t aio_threshold_,
31 size_t max_read_buffer_size_,
32 ValueSizeMap avg_value_size_hints_ = ValueSizeMap{},
33 const ReadBufferFromFileBase::ProfileCallback & profile_callback_ = ReadBufferFromFileBase::ProfileCallback{},
34 clockid_t clock_type_ = CLOCK_MONOTONIC_COARSE);
35
36 ~MergeTreeReader();
37
38 const ValueSizeMap & getAvgValueSizeHints() const;
39
40 /// Add columns from ordered_names that are not present in the block.
41 /// Missing columns are added in the order specified by ordered_names.
42 /// num_rows is needed in case if all res_columns are nullptr.
43 void fillMissingColumns(Columns & res_columns, bool & should_evaluate_missing_defaults, size_t num_rows);
44 /// Evaluate defaulted columns if necessary.
45 void evaluateMissingDefaults(Block additional_columns, Columns & res_columns);
46
47 const NamesAndTypesList & getColumns() const { return columns; }
48 size_t numColumnsInResult() const { return columns.size(); }
49
50 /// Return the number of rows has been read or zero if there is no columns to read.
51 /// If continue_reading is true, continue reading from last state, otherwise seek to from_mark.
52 /// Fills res_columns in order specified in getColumns() list. If column was not read it will be nullptr.
53 size_t readRows(size_t from_mark, bool continue_reading, size_t max_rows_to_read, Columns & res_columns);
54
55 MergeTreeData::DataPartPtr data_part;
56
57 size_t getFirstMarkToRead() const
58 {
59 return all_mark_ranges.back().begin;
60 }
61private:
62 using FileStreams = std::map<std::string, std::unique_ptr<MergeTreeReaderStream>>;
63
64 /// avg_value_size_hints are used to reduce the number of reallocations when creating columns of variable size.
65 ValueSizeMap avg_value_size_hints;
66 /// Stores states for IDataType::deserializeBinaryBulk
67 DeserializeBinaryBulkStateMap deserialize_binary_bulk_state_map;
68 /// Path to the directory containing the part
69 String path;
70
71 FileStreams streams;
72
73 /// Columns that are read.
74 NamesAndTypesList columns;
75
76 UncompressedCache * uncompressed_cache;
77 MarkCache * mark_cache;
78 /// If save_marks_in_cache is false, then, if marks are not in cache, we will load them but won't save in the cache, to avoid evicting other data.
79 bool save_marks_in_cache;
80
81 const MergeTreeData & storage;
82 MarkRanges all_mark_ranges;
83 size_t aio_threshold;
84 size_t max_read_buffer_size;
85
86 void addStreams(const String & name, const IDataType & type,
87 const ReadBufferFromFileBase::ProfileCallback & profile_callback, clockid_t clock_type);
88
89 void readData(
90 const String & name, const IDataType & type, IColumn & column,
91 size_t from_mark, bool continue_reading, size_t max_rows_to_read,
92 bool read_offsets = true);
93
94
95 friend class MergeTreeRangeReader::DelayedStream;
96};
97
98}
99