#pragma once

#include <Common/Arena.h>
#include <Columns/IColumn.h>
#include <Interpreters/asof.h>

#include <optional>
#include <variant>
#include <list>
#include <mutex>
#include <algorithm>
#include <atomic>
#include <cassert>
#include <memory>
#include <vector>

namespace DB
{

class Block;

/// Reference to a row in a block.
struct RowRef
{
    const Block * block = nullptr;
    size_t row_num = 0;

    RowRef() {}
    RowRef(const Block * block_, size_t row_num_) : block(block_), row_num(row_num_) {}
};

/// Singly linked list of references to rows. Used for ALL JOINs (non-unique JOINs).
struct RowRefList : RowRef
{
    /// Portion of RowRefs, 16 * (MAX_SIZE + 1) bytes in size.
    struct Batch
    {
        static constexpr size_t MAX_SIZE = 7; /// Adequate values are 3, 7, 15, 31 (so that sizeof(Batch) is a power of two).

        size_t size = 0;
        Batch * next;
        RowRef row_refs[MAX_SIZE];

        Batch(Batch * parent)
            : next(parent)
        {}

        bool full() const { return size == MAX_SIZE; }

        Batch * insert(RowRef && row_ref, Arena & pool)
        {
            if (full())
            {
                auto batch = pool.alloc<Batch>();
                *batch = Batch(this);
                batch->insert(std::move(row_ref), pool);
                return batch;
            }

            row_refs[size++] = std::move(row_ref);
            return this;
        }
    };
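
    /** Layout note: the head row is stored inline in RowRefList itself; additional rows go into a chain of
      * Batches allocated from the Arena. The chain head (RowRefList::next) is the most recently allocated Batch
      * and may be partially filled; all older Batches in the chain are full. ForwardIterator therefore yields
      * the inline head row first, then the batches from newest to oldest.
      */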

    class ForwardIterator
    {
    public:
        ForwardIterator(const RowRefList * begin)
            : root(begin)
            , first(true)
            , batch(root->next)
            , position(0)
        {}

        const RowRef * operator -> () const
        {
            if (first)
                return root;
            return &batch->row_refs[position];
        }

        void operator ++ ()
        {
            if (first)
            {
                first = false;
                return;
            }

            if (batch)
            {
                ++position;
                if (position >= batch->size)
                {
                    batch = batch->next;
                    position = 0;
                }
            }
        }

        bool ok() const { return first || (batch && position < batch->size); }

    private:
        const RowRefList * root;
        bool first;
        Batch * batch;
        size_t position;
    };

    RowRefList() {}
    RowRefList(const Block * block_, size_t row_num_) : RowRef(block_, row_num_) {}

    ForwardIterator begin() const { return ForwardIterator(this); }

    /// Insert a new row reference. It is appended to the head Batch; a new Batch is allocated from the pool when the head one is full.
    void insert(RowRef && row_ref, Arena & pool)
    {
        if (!next)
        {
            next = pool.alloc<Batch>();
            *next = Batch(nullptr);
        }
        next = next->insert(std::move(row_ref), pool);
    }

private:
    Batch * next = nullptr;
};
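
/** Usage sketch (illustrative only; `pool`, `block1`, `block2` and `process` are assumed to exist in the calling code):
  *
  *     RowRefList list(&block1, 0);            /// the head row is stored inline
  *     list.insert(RowRef(&block2, 5), pool);  /// further rows go into Arena-allocated batches
  *     for (auto it = list.begin(); it.ok(); ++it)
  *         process(it->block, it->row_num);    /// hypothetical consumer of the referenced rows
  */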

/** A container for sortable data to be pushed into.
  * On the first lookup the container sorts itself, so lookups take O(log(N)).
  * After any lookup method has been called it is no longer allowed to insert more data,
  * as this would invalidate the references that the lookup methods may have returned.
  */
template <typename TEntry, typename TKey>
class SortedLookupVector
{
public:
    using Base = std::vector<TEntry>;

    // First stage: insertions into the vector.
    template <typename U, typename ... TAllocatorParams>
    void insert(U && x, TAllocatorParams &&... allocator_params)
    {
        assert(!sorted.load(std::memory_order_acquire));
        array.push_back(std::forward<U>(x), std::forward<TAllocatorParams>(allocator_params)...);
    }

    const RowRef * upperBound(const TEntry & k, bool ascending)
    {
        sort(ascending);
        auto it = std::upper_bound(array.cbegin(), array.cend(), k, (ascending ? less : greater));
        if (it != array.cend())
            return &(it->row_ref);
        return nullptr;
    }

    const RowRef * lowerBound(const TEntry & k, bool ascending)
    {
        sort(ascending);
        auto it = std::lower_bound(array.cbegin(), array.cend(), k, (ascending ? less : greater));
        if (it != array.cend())
            return &(it->row_ref);
        return nullptr;
    }

private:
    std::atomic<bool> sorted = false;
    Base array;
    mutable std::mutex lock;

    static bool less(const TEntry & a, const TEntry & b)
    {
        return a.asof_value < b.asof_value;
    }

    static bool greater(const TEntry & a, const TEntry & b)
    {
        return a.asof_value > b.asof_value;
    }

    // Double-checked locking is correct with C++11 atomics (acquire/release here):
    // https://preshing.com/20130930/double-checked-locking-is-fixed-in-cpp11/
    // The first thread that calls one of the lookup methods sorts the data.
    // After the first lookup call it is no longer allowed to insert any data: the array becomes immutable.
    void sort(bool ascending)
    {
        if (!sorted.load(std::memory_order_acquire))
        {
            std::lock_guard<std::mutex> l(lock);
            if (!sorted.load(std::memory_order_relaxed))
            {
                if (!array.empty())
                    std::sort(array.begin(), array.end(), (ascending ? less : greater));

                sorted.store(true, std::memory_order_release);
            }
        }
    }
};
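
/** Usage sketch (illustrative only; uses AsofRowRefs::Entry defined below, with `block`, `row_num`, `ts` and `key`
  * assumed to be supplied by the calling code):
  *
  *     SortedLookupVector<AsofRowRefs::Entry<UInt64>, UInt64> lookup;
  *     lookup.insert(AsofRowRefs::Entry<UInt64>(ts, RowRef(&block, row_num)));              /// first stage: fill
  *     bool ascending = true;
  *     const RowRef * found = lookup.upperBound(AsofRowRefs::Entry<UInt64>(key), ascending); /// first lookup triggers the one-time sort
  */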

class AsofRowRefs
{
public:
    template <typename T>
    struct Entry
    {
        using LookupType = SortedLookupVector<Entry<T>, T>;
        using LookupPtr = std::unique_ptr<LookupType>;
        T asof_value;
        RowRef row_ref;

        Entry(T v) : asof_value(v) {}
        Entry(T v, RowRef rr) : asof_value(v), row_ref(rr) {}
    };

    using Lookups = std::variant<
        Entry<UInt32>::LookupPtr,
        Entry<UInt64>::LookupPtr,
        Entry<Int32>::LookupPtr,
        Entry<Int64>::LookupPtr,
        Entry<Float32>::LookupPtr,
        Entry<Float64>::LookupPtr,
        Entry<Decimal32>::LookupPtr,
        Entry<Decimal64>::LookupPtr,
        Entry<Decimal128>::LookupPtr>;

    enum class Type
    {
        keyu32,
        keyu64,
        keyi32,
        keyi64,
        keyf32,
        keyf64,
        keyDecimal32,
        keyDecimal64,
        keyDecimal128,
    };

    AsofRowRefs() {}
    AsofRowRefs(Type t);

    static std::optional<Type> getTypeSize(const IColumn * asof_column, size_t & type_size);

    // This will be synchronized by the rwlock mutex in Join.h
    void insert(Type type, const IColumn * asof_column, const Block * block, size_t row_num);

    // This will internally synchronize (the first lookup sorts the data).
    const RowRef * findAsof(Type type, ASOF::Inequality inequality, const IColumn * asof_column, size_t row_num) const;

private:
    // Lookups can be stored in a HashTable because it is memmovable.
    // A std::variant contains the currently active type id (memmovable), together with a union of the types.
    // The types are all std::unique_ptr, each of which holds a single pointer and is therefore memmovable.
    // Source: https://github.com/ClickHouse/ClickHouse/issues/4906
    Lookups lookups;
};
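
/** Usage sketch (illustrative only; `build_asof_column`, `probe_asof_column`, `block`, `row_num`, `probe_row`
  * and `inequality` are assumed to be supplied by the join code, e.g. from Join.h):
  *
  *     size_t type_size = 0;
  *     auto type = AsofRowRefs::getTypeSize(build_asof_column, type_size);
  *     if (type)
  *     {
  *         AsofRowRefs refs(*type);
  *         refs.insert(*type, build_asof_column, &block, row_num);                               /// build side, externally synchronized
  *         const RowRef * match = refs.findAsof(*type, inequality, probe_asof_column, probe_row); /// probe side
  *     }
  */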

}