1// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2// This source code is licensed under both the GPLv2 (found in the
3// COPYING file in the root directory) and Apache 2.0 License
4// (found in the LICENSE.Apache file in the root directory).
5//
6// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7// Use of this source code is governed by a BSD-style license that can be
8// found in the LICENSE file. See the AUTHORS file for names of contributors.
9
10#include "table/merging_iterator.h"
11#include <string>
12#include <vector>
13#include "db/dbformat.h"
14#include "db/pinned_iterators_manager.h"
15#include "monitoring/perf_context_imp.h"
16#include "rocksdb/comparator.h"
17#include "rocksdb/iterator.h"
18#include "rocksdb/options.h"
19#include "table/internal_iterator.h"
20#include "table/iter_heap.h"
21#include "table/iterator_wrapper.h"
22#include "util/arena.h"
23#include "util/autovector.h"
24#include "util/heap.h"
25#include "util/stop_watch.h"
26#include "util/sync_point.h"
27
28namespace rocksdb {
29// Without anonymous namespace here, we fail the warning -Wmissing-prototypes
30namespace {
31typedef BinaryHeap<IteratorWrapper*, MaxIteratorComparator> MergerMaxIterHeap;
32typedef BinaryHeap<IteratorWrapper*, MinIteratorComparator> MergerMinIterHeap;
33} // namespace
34
// Size argument handed to the autovector that stores the child iterator
// wrappers of a MergingIterator.
static const size_t kNumIterReserve = 4;
36
37class MergingIterator : public InternalIterator {
38 public:
39 MergingIterator(const InternalKeyComparator* comparator,
40 InternalIterator** children, int n, bool is_arena_mode,
41 bool prefix_seek_mode)
42 : is_arena_mode_(is_arena_mode),
43 comparator_(comparator),
44 current_(nullptr),
45 direction_(kForward),
46 minHeap_(comparator_),
47 prefix_seek_mode_(prefix_seek_mode),
48 pinned_iters_mgr_(nullptr) {
49 children_.resize(n);
50 for (int i = 0; i < n; i++) {
51 children_[i].Set(children[i]);
52 }
53 for (auto& child : children_) {
54 if (child.Valid()) {
55 minHeap_.push(&child);
56 }
57 }
58 current_ = CurrentForward();
59 }
60
61 virtual void AddIterator(InternalIterator* iter) {
62 assert(direction_ == kForward);
63 children_.emplace_back(iter);
64 if (pinned_iters_mgr_) {
65 iter->SetPinnedItersMgr(pinned_iters_mgr_);
66 }
67 auto new_wrapper = children_.back();
68 if (new_wrapper.Valid()) {
69 minHeap_.push(&new_wrapper);
70 current_ = CurrentForward();
71 }
72 }
73
74 virtual ~MergingIterator() {
75 for (auto& child : children_) {
76 child.DeleteIter(is_arena_mode_);
77 }
78 }
79
80 virtual bool Valid() const override { return (current_ != nullptr); }
81
82 virtual void SeekToFirst() override {
83 ClearHeaps();
84 for (auto& child : children_) {
85 child.SeekToFirst();
86 if (child.Valid()) {
87 minHeap_.push(&child);
88 }
89 }
90 direction_ = kForward;
91 current_ = CurrentForward();
92 }
93
94 virtual void SeekToLast() override {
95 ClearHeaps();
96 InitMaxHeap();
97 for (auto& child : children_) {
98 child.SeekToLast();
99 if (child.Valid()) {
100 maxHeap_->push(&child);
101 }
102 }
103 direction_ = kReverse;
104 current_ = CurrentReverse();
105 }
106
107 virtual void Seek(const Slice& target) override {
108 ClearHeaps();
109 for (auto& child : children_) {
110 {
111 PERF_TIMER_GUARD(seek_child_seek_time);
112 child.Seek(target);
113 }
114 PERF_COUNTER_ADD(seek_child_seek_count, 1);
115
116 if (child.Valid()) {
117 PERF_TIMER_GUARD(seek_min_heap_time);
118 minHeap_.push(&child);
119 }
120 }
121 direction_ = kForward;
122 {
123 PERF_TIMER_GUARD(seek_min_heap_time);
124 current_ = CurrentForward();
125 }
126 }
127
128 virtual void SeekForPrev(const Slice& target) override {
129 ClearHeaps();
130 InitMaxHeap();
131
132 for (auto& child : children_) {
133 {
134 PERF_TIMER_GUARD(seek_child_seek_time);
135 child.SeekForPrev(target);
136 }
137 PERF_COUNTER_ADD(seek_child_seek_count, 1);
138
139 if (child.Valid()) {
140 PERF_TIMER_GUARD(seek_max_heap_time);
141 maxHeap_->push(&child);
142 }
143 }
144 direction_ = kReverse;
145 {
146 PERF_TIMER_GUARD(seek_max_heap_time);
147 current_ = CurrentReverse();
148 }
149 }
150
151 virtual void Next() override {
152 assert(Valid());
153
154 // Ensure that all children are positioned after key().
155 // If we are moving in the forward direction, it is already
156 // true for all of the non-current children since current_ is
157 // the smallest child and key() == current_->key().
158 if (direction_ != kForward) {
159 SwitchToForward();
160 // The loop advanced all non-current children to be > key() so current_
161 // should still be strictly the smallest key.
162 assert(current_ == CurrentForward());
163 }
164
165 // For the heap modifications below to be correct, current_ must be the
166 // current top of the heap.
167 assert(current_ == CurrentForward());
168
169 // as the current points to the current record. move the iterator forward.
170 current_->Next();
171 if (current_->Valid()) {
172 // current is still valid after the Next() call above. Call
173 // replace_top() to restore the heap property. When the same child
174 // iterator yields a sequence of keys, this is cheap.
175 minHeap_.replace_top(current_);
176 } else {
177 // current stopped being valid, remove it from the heap.
178 minHeap_.pop();
179 }
180 current_ = CurrentForward();
181 }
182
183 virtual void Prev() override {
184 assert(Valid());
185 // Ensure that all children are positioned before key().
186 // If we are moving in the reverse direction, it is already
187 // true for all of the non-current children since current_ is
188 // the largest child and key() == current_->key().
189 if (direction_ != kReverse) {
190 // Otherwise, retreat the non-current children. We retreat current_
191 // just after the if-block.
192 ClearHeaps();
193 InitMaxHeap();
194 for (auto& child : children_) {
195 if (&child != current_) {
196 if (!prefix_seek_mode_) {
197 child.Seek(key());
198 if (child.Valid()) {
199 // Child is at first entry >= key(). Step back one to be < key()
200 TEST_SYNC_POINT_CALLBACK("MergeIterator::Prev:BeforePrev",
201 &child);
202 child.Prev();
203 } else {
204 // Child has no entries >= key(). Position at last entry.
205 TEST_SYNC_POINT("MergeIterator::Prev:BeforeSeekToLast");
206 child.SeekToLast();
207 }
208 } else {
209 child.SeekForPrev(key());
210 if (child.Valid() && comparator_->Equal(key(), child.key())) {
211 child.Prev();
212 }
213 }
214 }
215 if (child.Valid()) {
216 maxHeap_->push(&child);
217 }
218 }
219 direction_ = kReverse;
220 if (!prefix_seek_mode_) {
221 // Note that we don't do assert(current_ == CurrentReverse()) here
222 // because it is possible to have some keys larger than the seek-key
223 // inserted between Seek() and SeekToLast(), which makes current_ not
224 // equal to CurrentReverse().
225 current_ = CurrentReverse();
226 }
227 // The loop advanced all non-current children to be < key() so current_
228 // should still be strictly the smallest key.
229 assert(current_ == CurrentReverse());
230 }
231
232 // For the heap modifications below to be correct, current_ must be the
233 // current top of the heap.
234 assert(current_ == CurrentReverse());
235
236 current_->Prev();
237 if (current_->Valid()) {
238 // current is still valid after the Prev() call above. Call
239 // replace_top() to restore the heap property. When the same child
240 // iterator yields a sequence of keys, this is cheap.
241 maxHeap_->replace_top(current_);
242 } else {
243 // current stopped being valid, remove it from the heap.
244 maxHeap_->pop();
245 }
246 current_ = CurrentReverse();
247 }
248
249 virtual Slice key() const override {
250 assert(Valid());
251 return current_->key();
252 }
253
254 virtual Slice value() const override {
255 assert(Valid());
256 return current_->value();
257 }
258
259 virtual Status status() const override {
260 Status s;
261 for (auto& child : children_) {
262 s = child.status();
263 if (!s.ok()) {
264 break;
265 }
266 }
267 return s;
268 }
269
270 virtual void SetPinnedItersMgr(
271 PinnedIteratorsManager* pinned_iters_mgr) override {
272 pinned_iters_mgr_ = pinned_iters_mgr;
273 for (auto& child : children_) {
274 child.SetPinnedItersMgr(pinned_iters_mgr);
275 }
276 }
277
278 virtual bool IsKeyPinned() const override {
279 assert(Valid());
280 return pinned_iters_mgr_ && pinned_iters_mgr_->PinningEnabled() &&
281 current_->IsKeyPinned();
282 }
283
284 virtual bool IsValuePinned() const override {
285 assert(Valid());
286 return pinned_iters_mgr_ && pinned_iters_mgr_->PinningEnabled() &&
287 current_->IsValuePinned();
288 }
289
290 private:
291 // Clears heaps for both directions, used when changing direction or seeking
292 void ClearHeaps();
293 // Ensures that maxHeap_ is initialized when starting to go in the reverse
294 // direction
295 void InitMaxHeap();
296
297 bool is_arena_mode_;
298 const InternalKeyComparator* comparator_;
299 autovector<IteratorWrapper, kNumIterReserve> children_;
300
301 // Cached pointer to child iterator with the current key, or nullptr if no
302 // child iterators are valid. This is the top of minHeap_ or maxHeap_
303 // depending on the direction.
304 IteratorWrapper* current_;
305 // Which direction is the iterator moving?
306 enum Direction {
307 kForward,
308 kReverse
309 };
310 Direction direction_;
311 MergerMinIterHeap minHeap_;
312 bool prefix_seek_mode_;
313
314 // Max heap is used for reverse iteration, which is way less common than
315 // forward. Lazily initialize it to save memory.
316 std::unique_ptr<MergerMaxIterHeap> maxHeap_;
317 PinnedIteratorsManager* pinned_iters_mgr_;
318
319 void SwitchToForward();
320
321 IteratorWrapper* CurrentForward() const {
322 assert(direction_ == kForward);
323 return !minHeap_.empty() ? minHeap_.top() : nullptr;
324 }
325
326 IteratorWrapper* CurrentReverse() const {
327 assert(direction_ == kReverse);
328 assert(maxHeap_);
329 return !maxHeap_->empty() ? maxHeap_->top() : nullptr;
330 }
331};
332
333void MergingIterator::SwitchToForward() {
334 // Otherwise, advance the non-current children. We advance current_
335 // just after the if-block.
336 ClearHeaps();
337 for (auto& child : children_) {
338 if (&child != current_) {
339 child.Seek(key());
340 if (child.Valid() && comparator_->Equal(key(), child.key())) {
341 child.Next();
342 }
343 }
344 if (child.Valid()) {
345 minHeap_.push(&child);
346 }
347 }
348 direction_ = kForward;
349}
350
351void MergingIterator::ClearHeaps() {
352 minHeap_.clear();
353 if (maxHeap_) {
354 maxHeap_->clear();
355 }
356}
357
358void MergingIterator::InitMaxHeap() {
359 if (!maxHeap_) {
360 maxHeap_.reset(new MergerMaxIterHeap(comparator_));
361 }
362}
363
364InternalIterator* NewMergingIterator(const InternalKeyComparator* cmp,
365 InternalIterator** list, int n,
366 Arena* arena, bool prefix_seek_mode) {
367 assert(n >= 0);
368 if (n == 0) {
369 return NewEmptyInternalIterator(arena);
370 } else if (n == 1) {
371 return list[0];
372 } else {
373 if (arena == nullptr) {
374 return new MergingIterator(cmp, list, n, false, prefix_seek_mode);
375 } else {
376 auto mem = arena->AllocateAligned(sizeof(MergingIterator));
377 return new (mem) MergingIterator(cmp, list, n, true, prefix_seek_mode);
378 }
379 }
380}
381
382MergeIteratorBuilder::MergeIteratorBuilder(
383 const InternalKeyComparator* comparator, Arena* a, bool prefix_seek_mode)
384 : first_iter(nullptr), use_merging_iter(false), arena(a) {
385 auto mem = arena->AllocateAligned(sizeof(MergingIterator));
386 merge_iter =
387 new (mem) MergingIterator(comparator, nullptr, 0, true, prefix_seek_mode);
388}
389
390MergeIteratorBuilder::~MergeIteratorBuilder() {
391 if (first_iter != nullptr) {
392 first_iter->~InternalIterator();
393 }
394 if (merge_iter != nullptr) {
395 merge_iter->~MergingIterator();
396 }
397}
398
399void MergeIteratorBuilder::AddIterator(InternalIterator* iter) {
400 if (!use_merging_iter && first_iter != nullptr) {
401 merge_iter->AddIterator(first_iter);
402 use_merging_iter = true;
403 first_iter = nullptr;
404 }
405 if (use_merging_iter) {
406 merge_iter->AddIterator(iter);
407 } else {
408 first_iter = iter;
409 }
410}
411
412InternalIterator* MergeIteratorBuilder::Finish() {
413 InternalIterator* ret = nullptr;
414 if (!use_merging_iter) {
415 ret = first_iter;
416 first_iter = nullptr;
417 } else {
418 ret = merge_iter;
419 merge_iter = nullptr;
420 }
421 return ret;
422}
423
424} // namespace rocksdb
425