#pragma once

#include <Common/Arena.h>
#include <Columns/IColumn.h>
#include <Interpreters/asof.h>

#include <optional>
#include <variant>
#include <list>
#include <mutex>
#include <algorithm>
#include <atomic>
#include <cassert>
#include <memory>
#include <vector>

namespace DB
{

class Block;

/// Reference to the row in block.
struct RowRef
{
    const Block * block = nullptr;
    size_t row_num = 0;

    RowRef() {}
    RowRef(const Block * block_, size_t row_num_) : block(block_), row_num(row_num_) {}
};

/// Singly linked list of references to rows. Used for ALL JOINs (non-unique JOINs).
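/// Usage sketch (illustrative only; `block` and `process` are hypothetical):
///
///     Arena pool;
///     RowRefList list(block, 0);               /// the head row is stored inline
///     list.insert(RowRef(block, 1), pool);     /// further rows go into arena-allocated batches
///     for (auto it = list.begin(); it.ok(); ++it)
///         process(it->block, it->row_num);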
struct RowRefList : RowRef
{
    /// Portion of RowRefs, 16 * (MAX_SIZE + 1) bytes in size.
    struct Batch
    {
        static constexpr size_t MAX_SIZE = 7; /// Adequate values are 3, 7, 15, 31.

        size_t size = 0;
        Batch * next;
        RowRef row_refs[MAX_SIZE];

        Batch(Batch * parent)
            : next(parent)
        {}

        bool full() const { return size == MAX_SIZE; }

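        /// Appends into this batch while there is room; once full, allocates a new batch in the arena,
        /// chains it in front of this one and returns the new head batch.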
        Batch * insert(RowRef && row_ref, Arena & pool)
        {
            if (full())
            {
                auto batch = pool.alloc<Batch>();
                *batch = Batch(this);
                batch->insert(std::move(row_ref), pool);
                return batch;
            }

            row_refs[size++] = std::move(row_ref);
            return this;
        }
    };

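    /// Iterates over the head RowRef first, then over the rows stored in the arena-allocated batches.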
    class ForwardIterator
    {
    public:
        ForwardIterator(const RowRefList * begin)
            : root(begin)
            , first(true)
            , batch(root->next)
            , position(0)
        {}

        const RowRef * operator -> () const
        {
            if (first)
                return root;
            return &batch->row_refs[position];
        }

        void operator ++ ()
        {
            if (first)
            {
                first = false;
                return;
            }

            if (batch)
            {
                ++position;
                if (position >= batch->size)
                {
                    batch = batch->next;
                    position = 0;
                }
            }
        }

        bool ok() const { return first || (batch && position < batch->size); }

    private:
        const RowRefList * root;
        bool first;
        Batch * batch;
        size_t position;
    };

    RowRefList() {}
    RowRefList(const Block * block_, size_t row_num_) : RowRef(block_, row_num_) {}

    ForwardIterator begin() const { return ForwardIterator(this); }

    /// Insert a new row reference into the list; rows after the head are stored in arena-allocated batches.
    void insert(RowRef && row_ref, Arena & pool)
    {
        if (!next)
        {
            next = pool.alloc<Batch>();
            *next = Batch(nullptr);
        }
        next = next->insert(std::move(row_ref), pool);
    }

private:
    Batch * next = nullptr;
};

/**
 * A container that sortable data is pushed into.
 * On the first lookup the container sorts itself, so that lookups take O(log N).
 * After any lookup method has been called, it is no longer allowed to insert more data, since that would
 * invalidate the references returned by the lookup methods.
 */
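/**
 * Usage sketch (illustrative; Entry refers to AsofRowRefs::Entry declared below, `block` is hypothetical):
 *
 *     SortedLookupVector<AsofRowRefs::Entry<UInt64>, UInt64> lookup;
 *     lookup.insert(AsofRowRefs::Entry<UInt64>(10, RowRef(block, 0)));
 *     lookup.insert(AsofRowRefs::Entry<UInt64>(20, RowRef(block, 1)));
 *     // The first lookup sorts the vector; no further insertions are allowed afterwards.
 *     const RowRef * found = lookup.upperBound(AsofRowRefs::Entry<UInt64>(15), true);  // -> entry with asof_value 20
 */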

template <typename TEntry, typename TKey>
class SortedLookupVector
{
public:
    using Base = std::vector<TEntry>;

    // First stage: insertions into the vector.
    template <typename U>
    void insert(U && x)
    {
        assert(!sorted.load(std::memory_order_acquire));
        array.push_back(std::forward<U>(x));
    }

    const RowRef * upperBound(const TEntry & k, bool ascending)
    {
        sort(ascending);
        auto it = std::upper_bound(array.cbegin(), array.cend(), k, (ascending ? less : greater));
        if (it != array.cend())
            return &(it->row_ref);
        return nullptr;
    }

    const RowRef * lowerBound(const TEntry & k, bool ascending)
    {
        sort(ascending);
        auto it = std::lower_bound(array.cbegin(), array.cend(), k, (ascending ? less : greater));
        if (it != array.cend())
            return &(it->row_ref);
        return nullptr;
    }

private:
    std::atomic<bool> sorted = false;
    Base array;
    mutable std::mutex lock;

    static bool less(const TEntry & a, const TEntry & b)
    {
        return a.asof_value < b.asof_value;
    }

    static bool greater(const TEntry & a, const TEntry & b)
    {
        return a.asof_value > b.asof_value;
    }

    // Double-checked locking with acquire/release atomics is correct in C++11 and later:
    // https://preshing.com/20130930/double-checked-locking-is-fixed-in-cpp11/
    // The first thread that calls one of the lookup methods sorts the data.
    // After the first lookup call no more data may be inserted: the array becomes immutable.
    void sort(bool ascending)
    {
        if (!sorted.load(std::memory_order_acquire))
        {
            std::lock_guard<std::mutex> l(lock);
            if (!sorted.load(std::memory_order_relaxed))
            {
                if (!array.empty())
                    std::sort(array.begin(), array.end(), (ascending ? less : greater));

                sorted.store(true, std::memory_order_release);
            }
        }
    }
};

class AsofRowRefs
{
public:
    template <typename T>
    struct Entry
    {
        using LookupType = SortedLookupVector<Entry<T>, T>;
        using LookupPtr = std::unique_ptr<LookupType>;
        T asof_value;
        RowRef row_ref;

        Entry(T v) : asof_value(v) {}
        Entry(T v, RowRef rr) : asof_value(v), row_ref(rr) {}
    };

    using Lookups = std::variant<
        Entry<UInt32>::LookupPtr,
        Entry<UInt64>::LookupPtr,
        Entry<Int32>::LookupPtr,
        Entry<Int64>::LookupPtr,
        Entry<Float32>::LookupPtr,
        Entry<Float64>::LookupPtr,
        Entry<Decimal32>::LookupPtr,
        Entry<Decimal64>::LookupPtr,
        Entry<Decimal128>::LookupPtr>;

    enum class Type
    {
        keyu32,
        keyu64,
        keyi32,
        keyi64,
        keyf32,
        keyf64,
        keyDecimal32,
        keyDecimal64,
        keyDecimal128,
    };
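    /// Each Type value selects the corresponding alternative of the Lookups variant above.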

    AsofRowRefs() {}
    AsofRowRefs(Type t);

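    /// Determines the ASOF key type of the column and writes the size of its value type into type_size;
    /// returns std::nullopt if the column type is not supported as an ASOF key.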
    static std::optional<Type> getTypeSize(const IColumn * asof_column, size_t & type_size);

    // This will be synchronized by the rwlock mutex in Join.h
    void insert(Type type, const IColumn * asof_column, const Block * block, size_t row_num);

    // This will internally synchronize
    const RowRef * findAsof(Type type, ASOF::Inequality inequality, const IColumn * asof_column, size_t row_num) const;

private:
    // Lookups can be stored in a HashTable because the variant is memmovable:
    // std::variant holds the currently active type index together with a union of the alternatives,
    // and every alternative is a std::unique_ptr, which holds a single pointer and is therefore memmovable.
    // Source: https://github.com/ClickHouse/ClickHouse/issues/4906
    Lookups lookups;
};
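/// Usage sketch (illustrative; `asof_column`, `block`, and `row` are hypothetical, and the inequality
/// enumerator is assumed to come from Interpreters/asof.h):
///
///     size_t type_size = 0;
///     if (auto type = AsofRowRefs::getTypeSize(asof_column, type_size))
///     {
///         AsofRowRefs refs(*type);
///         refs.insert(*type, asof_column, block, 0);
///         const RowRef * match = refs.findAsof(*type, ASOF::Inequality::LessOrEquals, asof_column, row);
///     }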

}