#pragma once

#include <Common/Arena.h>
#include <Columns/IColumn.h>
#include <Interpreters/asof.h>

#include <optional>
#include <variant>
#include <list>
#include <memory>
#include <mutex>
#include <atomic>
#include <algorithm>
#include <vector>
#include <cassert>

namespace DB
{

class Block;

/// Reference to a row in a block.
struct RowRef
{
    const Block * block = nullptr;
    size_t row_num = 0;

    RowRef() = default;
    RowRef(const Block * block_, size_t row_num_) : block(block_), row_num(row_num_) {}
};

/// Singly linked list of references to rows. Used for ALL JOINs (non-unique JOINs).
struct RowRefList : RowRef
{
    /// A chunk of RowRefs; 16 * (MAX_SIZE + 1) bytes in size on typical 64-bit targets
    /// (checked by the static_assert below the struct).
    struct Batch
    {
        static constexpr size_t MAX_SIZE = 7; /// Adequate values are 3, 7, 15, 31.

        size_t size = 0;
        Batch * next;
        RowRef row_refs[MAX_SIZE];

        explicit Batch(Batch * parent)
            : next(parent)
        {}

        bool full() const { return size == MAX_SIZE; }

        /// Returns the batch the row was actually inserted into (a freshly allocated one if this batch is full).
        Batch * insert(RowRef && row_ref, Arena & pool)
        {
            if (full())
            {
                auto batch = pool.alloc<Batch>();
                *batch = Batch(this);
                batch->insert(std::move(row_ref), pool);
                return batch;
            }

            row_refs[size++] = std::move(row_ref);
            return this;
        }
    };
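
    /// A minimal compile-time check of the size estimate above. It assumes a typical
    /// 64-bit target where RowRef is two 8-byte fields; treat it as an illustration
    /// of the layout rather than a portability guarantee.
    static_assert(sizeof(void *) != 8 || sizeof(Batch) == 16 * (Batch::MAX_SIZE + 1),
        "Batch no longer matches the size estimate in the comment above");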

    class ForwardIterator
    {
    public:
        explicit ForwardIterator(const RowRefList * begin)
            : root(begin)
            , first(true)
            , batch(root->next)
            , position(0)
        {}

        const RowRef * operator -> () const
        {
            if (first)
                return root;
            return &batch->row_refs[position];
        }

        void operator ++ ()
        {
            if (first)
            {
                first = false;
                return;
            }

            if (batch)
            {
                ++position;
                if (position >= batch->size)
                {
                    batch = batch->next;
                    position = 0;
                }
            }
        }

        bool ok() const { return first || (batch && position < batch->size); }

    private:
        const RowRefList * root;
        bool first;
        Batch * batch;
        size_t position;
    };

    RowRefList() = default;
    RowRefList(const Block * block_, size_t row_num_) : RowRef(block_, row_num_) {}

    ForwardIterator begin() const { return ForwardIterator(this); }

    /// Insert a row reference into the list. The head element stores the first row;
    /// subsequent rows are appended into Batch chunks allocated from the pool.
    void insert(RowRef && row_ref, Arena & pool)
    {
        if (!next)
        {
            next = pool.alloc<Batch>();
            *next = Batch(nullptr);
        }
        next = next->insert(std::move(row_ref), pool);
    }

private:
    Batch * next = nullptr;
};
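
/// Usage sketch for RowRefList (illustrative only; `pool`, `block1`, `block2`
/// and `consume` are assumptions, not part of this header):
///
///     Arena pool;
///     RowRefList list(block1, 0);             /// the head stores the first row
///     list.insert(RowRef(block2, 5), pool);   /// subsequent rows go into Batch chunks
///     for (auto it = list.begin(); it.ok(); ++it)
///         consume(it->block, it->row_num);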

/**
 * A container to push sortable data into.
 * When looking up values, the container ensures that it is sorted for O(log N) lookup.
 * After calling any of the lookup methods, it is no longer allowed to insert more data,
 * as this would invalidate the references returned by the lookup methods.
 * See the usage sketch after the class definition.
 */
template <typename TEntry, typename TKey>
class SortedLookupVector
{
public:
    using Base = std::vector<TEntry>;

    /// First stage: insertions into the vector. Must not be called after the first lookup.
    template <typename U, typename ... TAllocatorParams>
    void insert(U && x, TAllocatorParams &&... allocator_params)
    {
        assert(!sorted.load(std::memory_order_acquire));
        array.push_back(std::forward<U>(x), std::forward<TAllocatorParams>(allocator_params)...);
    }

    /// Second stage: lookups. The first lookup sorts the array.
    /// Returns nullptr if there is no suitable element.
    const RowRef * upperBound(const TEntry & k, bool ascending)
    {
        sort(ascending);
        auto it = std::upper_bound(array.cbegin(), array.cend(), k, (ascending ? less : greater));
        if (it != array.cend())
            return &(it->row_ref);
        return nullptr;
    }

    const RowRef * lowerBound(const TEntry & k, bool ascending)
    {
        sort(ascending);
        auto it = std::lower_bound(array.cbegin(), array.cend(), k, (ascending ? less : greater));
        if (it != array.cend())
            return &(it->row_ref);
        return nullptr;
    }

private:
    std::atomic<bool> sorted = false;
    Base array;
    mutable std::mutex lock;

    static bool less(const TEntry & a, const TEntry & b)
    {
        return a.asof_value < b.asof_value;
    }

    static bool greater(const TEntry & a, const TEntry & b)
    {
        return a.asof_value > b.asof_value;
    }

    // Double-checked locking with acquire/release atomics is correct in C++11:
    // https://preshing.com/20130930/double-checked-locking-is-fixed-in-cpp11/
    // The first thread that calls one of the lookup methods sorts the data.
    // After the first lookup no more insertions are allowed: the array becomes immutable.
    void sort(bool ascending)
    {
        if (!sorted.load(std::memory_order_acquire))
        {
            std::lock_guard<std::mutex> l(lock);
            if (!sorted.load(std::memory_order_relaxed))
            {
                if (!array.empty())
                    std::sort(array.begin(), array.end(), (ascending ? less : greater));

                sorted.store(true, std::memory_order_release);
            }
        }
    }
};
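
/// Usage sketch for SortedLookupVector (illustrative only; it uses the Entry type
/// defined in AsofRowRefs below, and `block` is an assumption):
///
///     using Entry = AsofRowRefs::Entry<UInt64>;
///     Entry::LookupType lookup;
///     lookup.insert(Entry(10, RowRef(block, 0)));
///     lookup.insert(Entry(20, RowRef(block, 1)));
///     /// The first lookup sorts the data; insertions are forbidden afterwards.
///     const RowRef * found = lookup.upperBound(Entry(15), /* ascending = */ true); /// row with key 20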

class AsofRowRefs
{
public:
    template <typename T>
    struct Entry
    {
        using LookupType = SortedLookupVector<Entry<T>, T>;
        using LookupPtr = std::unique_ptr<LookupType>;

        T asof_value;
        RowRef row_ref;

        explicit Entry(T v) : asof_value(v) {}
        Entry(T v, RowRef rr) : asof_value(v), row_ref(rr) {}
    };

    using Lookups = std::variant<
        Entry<UInt32>::LookupPtr,
        Entry<UInt64>::LookupPtr,
        Entry<Int32>::LookupPtr,
        Entry<Int64>::LookupPtr,
        Entry<Float32>::LookupPtr,
        Entry<Float64>::LookupPtr,
        Entry<Decimal32>::LookupPtr,
        Entry<Decimal64>::LookupPtr,
        Entry<Decimal128>::LookupPtr>;

    enum class Type
    {
        keyu32,
        keyu64,
        keyi32,
        keyi64,
        keyf32,
        keyf64,
        keyDecimal32,
        keyDecimal64,
        keyDecimal128,
    };

    AsofRowRefs() = default;
    explicit AsofRowRefs(Type type);

    /// Returns the key type of the ASOF column and its value size in bytes, or std::nullopt for unsupported types.
    static std::optional<Type> getTypeSize(const IColumn * asof_column, size_t & type_size);

    /// Insertions are synchronized by the rwlock mutex in Join.h.
    void insert(Type type, const IColumn * asof_column, const Block * block, size_t row_num);

    /// Lookups synchronize internally (the first one sorts the data).
    const RowRef * findAsof(Type type, ASOF::Inequality inequality, const IColumn * asof_column, size_t row_num) const;

private:
    /// Lookups can be stored in a HashTable because the variant is memmovable:
    /// std::variant contains the currently active type id (memmovable) together with a union of the alternatives,
    /// and each alternative is a std::unique_ptr, which holds a single pointer (memmovable).
    /// Source: https://github.com/ClickHouse/ClickHouse/issues/4906
    Lookups lookups;
};
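
/// Typical flow (sketch; `asof_column`, `block` and `row_num` are assumed to come
/// from the join machinery, and the inequality enumerator from Interpreters/asof.h):
///
///     size_t type_size = 0;
///     if (auto type = AsofRowRefs::getTypeSize(asof_column, type_size))
///     {
///         AsofRowRefs refs(*type);
///         refs.insert(*type, asof_column, block, row_num);  /// build stage
///         const RowRef * match = refs.findAsof(*type, ASOF::Inequality::LessOrEquals, asof_column, row_num);
///     }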

}