1#pragma once
2
3#include <Core/NamesAndTypes.h>
4#include <Storages/MergeTree/RangesInDataPart.h>
5#include <Storages/MergeTree/MergeTreeBlockReadUtils.h>
6#include <Storages/SelectQueryInfo.h>
7#include <mutex>
8
9
10namespace DB
11{
12
13using MergeTreeReadTaskPtr = std::unique_ptr<MergeTreeReadTask>;
14
15/** Provides read tasks for MergeTreeThreadSelectBlockInputStream`s in fine-grained batches, allowing for more
16 * uniform distribution of work amongst multiple threads. All parts and their ranges are divided into `threads`
17 * workloads with at most `sum_marks / threads` marks. Then, threads are performing reads from these workloads
18 * in "sequential" manner, requesting work in small batches. As soon as some thread has exhausted
19 * it's workload, it either is signaled that no more work is available (`do_not_steal_tasks == false`) or
20 * continues taking small batches from other threads' workloads (`do_not_steal_tasks == true`).
21 */
22class MergeTreeReadPool : private boost::noncopyable
23{
24public:
25 /** Pull could dynamically lower (backoff) number of threads, if read operation are too slow.
26 * Settings for that backoff.
27 */
28 struct BackoffSettings
29 {
30 /// Pay attention only to reads, that took at least this amount of time. If set to 0 - means backoff is disabled.
31 size_t min_read_latency_ms = 1000;
32 /// Count events, when read throughput is less than specified bytes per second.
33 size_t max_throughput = 1048576;
34 /// Do not pay attention to event, if not enough time passed since previous event.
35 size_t min_interval_between_events_ms = 1000;
36 /// Number of events to do backoff - to lower number of threads in pool.
37 size_t min_events = 2;
38
39 /// Constants above is just an example.
40 BackoffSettings(const Settings & settings)
41 : min_read_latency_ms(settings.read_backoff_min_latency_ms.totalMilliseconds()),
42 max_throughput(settings.read_backoff_max_throughput),
43 min_interval_between_events_ms(settings.read_backoff_min_interval_between_events_ms.totalMilliseconds()),
44 min_events(settings.read_backoff_min_events)
45 {
46 }
47
48 BackoffSettings() : min_read_latency_ms(0) {}
49 };
50
51 BackoffSettings backoff_settings;
52
53private:
54 /** State to track numbers of slow reads.
55 */
56 struct BackoffState
57 {
58 size_t current_threads;
59 Stopwatch time_since_prev_event {CLOCK_MONOTONIC_COARSE};
60 size_t num_events = 0;
61
62 BackoffState(size_t threads) : current_threads(threads) {}
63 };
64
65 BackoffState backoff_state;
66
67public:
68 MergeTreeReadPool(
69 const size_t threads_, const size_t sum_marks_, const size_t min_marks_for_concurrent_read_,
70 RangesInDataParts parts_, const MergeTreeData & data_, const PrewhereInfoPtr & prewhere_info_,
71 const bool check_columns_, const Names & column_names_,
72 const BackoffSettings & backoff_settings_, size_t preferred_block_size_bytes_,
73 const bool do_not_steal_tasks_ = false);
74
75 MergeTreeReadTaskPtr getTask(const size_t min_marks_to_read, const size_t thread, const Names & ordered_names);
76
77 /** Each worker could call this method and pass information about read performance.
78 * If read performance is too low, pool could decide to lower number of threads: do not assign more tasks to several threads.
79 * This allows to overcome excessive load to disk subsystem, when reads are not from page cache.
80 */
81 void profileFeedback(const ReadBufferFromFileBase::ProfileInfo info);
82
83 /// This method tells which mark ranges we have to read if we start from @from mark range
84 MarkRanges getRestMarks(const MergeTreeDataPart & part, const MarkRange & from) const;
85
86 Block getHeader() const;
87
88private:
89 std::vector<size_t> fillPerPartInfo(
90 RangesInDataParts & parts, const bool check_columns);
91
92 void fillPerThreadInfo(
93 const size_t threads, const size_t sum_marks, std::vector<size_t> per_part_sum_marks,
94 RangesInDataParts & parts, const size_t min_marks_for_concurrent_read);
95
96 std::vector<std::shared_lock<std::shared_mutex>> per_part_columns_lock;
97 const MergeTreeData & data;
98 Names column_names;
99 bool do_not_steal_tasks;
100 bool predict_block_size_bytes;
101 std::vector<NameSet> per_part_column_name_set;
102 std::vector<NamesAndTypesList> per_part_columns;
103 std::vector<NamesAndTypesList> per_part_pre_columns;
104 std::vector<char> per_part_should_reorder;
105 std::vector<MergeTreeBlockSizePredictorPtr> per_part_size_predictor;
106 PrewhereInfoPtr prewhere_info;
107
108 struct Part
109 {
110 MergeTreeData::DataPartPtr data_part;
111 size_t part_index_in_query;
112 };
113
114 std::vector<Part> parts_with_idx;
115
116 struct ThreadTask
117 {
118 struct PartIndexAndRange
119 {
120 size_t part_idx;
121 MarkRanges ranges;
122 };
123
124 std::vector<PartIndexAndRange> parts_and_ranges;
125 std::vector<size_t> sum_marks_in_parts;
126 };
127
128 std::vector<ThreadTask> threads_tasks;
129
130 std::set<size_t> remaining_thread_tasks;
131
132 RangesInDataParts parts_ranges;
133
134 mutable std::mutex mutex;
135
136 Logger * log = &Logger::get("MergeTreeReadPool");
137};
138
139using MergeTreeReadPoolPtr = std::shared_ptr<MergeTreeReadPool>;
140
141}
142