1 | #pragma once |
2 | |
3 | #include <map> |
4 | #include <mutex> |
5 | #include <condition_variable> |
6 | #include <memory> |
7 | #include <chrono> |
8 | #include <Common/CurrentMetrics.h> |
9 | #include <Common/Stopwatch.h> |
10 | |
11 | |
12 | namespace CurrentMetrics |
13 | { |
14 | extern const Metric QueryPreempted; |
15 | } |
16 | |
17 | |
18 | namespace DB |
19 | { |
20 | |
/** Implements query priorities in a very primitive way.
  * Allows freezing query execution if at least one query of higher priority is being executed.
  *
  * Priority value is an integer; a smaller value means a higher priority.
  *
  * Priority 0 is special - queries with that priority are always executed;
  *  they do not depend on other queries and do not affect other queries.
  * Thus 0 means - don't use priorities.
  *
  * NOTE Possibilities for improvement:
  * - implement a limit on the maximum number of running queries with the same priority.
  */
33 | class QueryPriorities |
34 | { |
35 | public: |
36 | using Priority = int; |
37 | |
38 | private: |
39 | friend struct Handle; |
40 | |
41 | using Count = int; |
42 | |
43 | /// Number of currently running queries for each priority. |
44 | using Container = std::map<Priority, Count>; |
45 | |
46 | std::mutex mutex; |
47 | std::condition_variable condvar; |
48 | Container container; |
49 | |
50 | |
51 | /** If there are higher priority queries - sleep until they are finish or timeout happens. |
52 | */ |
53 | template <typename Duration> |
54 | void waitIfNeed(Priority priority, Duration timeout) |
55 | { |
56 | if (0 == priority) |
57 | return; |
58 | |
59 | std::unique_lock lock(mutex); |
60 | |
61 | /// Is there at least one more priority query? |
62 | bool found = false; |
63 | for (const auto & value : container) |
64 | { |
65 | if (value.first >= priority) |
66 | break; |
67 | |
68 | if (value.second > 0) |
69 | { |
70 | found = true; |
71 | break; |
72 | } |
73 | } |
74 | |
75 | if (!found) |
76 | return; |
77 | |
78 | CurrentMetrics::Increment metric_increment{CurrentMetrics::QueryPreempted}; |
79 | |
80 | /// Spurious wakeups are Ok. We allow to wait less than requested. |
81 | condvar.wait_for(lock, timeout); |
82 | } |
83 | |
84 | public: |
85 | struct HandleImpl |
86 | { |
87 | private: |
88 | QueryPriorities & parent; |
89 | QueryPriorities::Container::value_type & value; |
90 | |
91 | public: |
92 | HandleImpl(QueryPriorities & parent_, QueryPriorities::Container::value_type & value_) |
93 | : parent(parent_), value(value_) {} |
94 | |
95 | ~HandleImpl() |
96 | { |
97 | { |
98 | std::lock_guard lock(parent.mutex); |
99 | --value.second; |
100 | } |
101 | parent.condvar.notify_all(); |
102 | } |
103 | |
104 | template <typename Duration> |
105 | void waitIfNeed(Duration timeout) |
106 | { |
107 | parent.waitIfNeed(value.first, timeout); |
108 | } |
109 | }; |
110 | |
111 | using Handle = std::shared_ptr<HandleImpl>; |
112 | |
113 | /** Register query with specified priority. |
114 | * Returns an object that remove record in destructor. |
115 | */ |
116 | Handle insert(Priority priority) |
117 | { |
118 | if (0 == priority) |
119 | return {}; |
120 | |
121 | std::lock_guard lock(mutex); |
122 | auto it = container.emplace(priority, 0).first; |
123 | ++it->second; |
124 | return std::make_shared<HandleImpl>(*this, *it); |
125 | } |
126 | }; |
127 | |
128 | } |
129 | |