1//===----------------------------------------------------------------------===//
2// DuckDB
3//
4// duckdb/execution/reservoir_sample.hpp
5//
6//
7//===----------------------------------------------------------------------===//
8
9#pragma once
10
11#include "duckdb/common/common.hpp"
12#include "duckdb/common/random_engine.hpp"
13#include "duckdb/common/types/chunk_collection.hpp"
14#include "duckdb/common/queue.hpp"
15
16namespace duckdb {
17
18class BaseReservoirSampling {
19public:
20 explicit BaseReservoirSampling(int64_t seed);
21 BaseReservoirSampling();
22
23 void InitializeReservoir(idx_t cur_size, idx_t sample_size);
24
25 void SetNextEntry();
26
27 void ReplaceElement();
28
29 //! The random generator
30 RandomEngine random;
31 //! Priority queue of [random element, index] for each of the elements in the sample
32 std::priority_queue<std::pair<double, idx_t>> reservoir_weights;
33 //! The next element to sample
34 idx_t next_index;
35 //! The reservoir threshold of the current min entry
36 double min_threshold;
37 //! The reservoir index of the current min entry
38 idx_t min_entry;
39 //! The current count towards next index (i.e. we will replace an entry in next_index - current_count tuples)
40 idx_t current_count;
41};
42
43class BlockingSample {
44public:
45 explicit BlockingSample(int64_t seed) : base_reservoir_sample(seed), random(base_reservoir_sample.random) {
46 }
47 virtual ~BlockingSample() {
48 }
49
50 //! Add a chunk of data to the sample
51 virtual void AddToReservoir(DataChunk &input) = 0;
52
53 //! Fetches a chunk from the sample. Note that this method is destructive and should only be used after the
54 // sample is completely built.
55 virtual unique_ptr<DataChunk> GetChunk() = 0;
56
57protected:
58 //! The reservoir sampling
59 BaseReservoirSampling base_reservoir_sample;
60 RandomEngine &random;
61};
62
63//! The reservoir sample class maintains a streaming sample of fixed size "sample_count"
64class ReservoirSample : public BlockingSample {
65public:
66 ReservoirSample(Allocator &allocator, idx_t sample_count, int64_t seed);
67
68 //! Add a chunk of data to the sample
69 void AddToReservoir(DataChunk &input) override;
70
71 //! Fetches a chunk from the sample. Note that this method is destructive and should only be used after the
72 //! sample is completely built.
73 unique_ptr<DataChunk> GetChunk() override;
74
75private:
76 //! Replace a single element of the input
77 void ReplaceElement(DataChunk &input, idx_t index_in_chunk);
78
79 //! Fills the reservoir up until sample_count entries, returns how many entries are still required
80 idx_t FillReservoir(DataChunk &input);
81
82private:
83 //! The size of the reservoir sample
84 idx_t sample_count;
85 //! The current reservoir
86 ChunkCollection reservoir;
87};
88
89//! The reservoir sample sample_size class maintains a streaming sample of variable size
90class ReservoirSamplePercentage : public BlockingSample {
91 constexpr static idx_t RESERVOIR_THRESHOLD = 100000;
92
93public:
94 ReservoirSamplePercentage(Allocator &allocator, double percentage, int64_t seed);
95
96 //! Add a chunk of data to the sample
97 void AddToReservoir(DataChunk &input) override;
98
99 //! Fetches a chunk from the sample. Note that this method is destructive and should only be used after the
100 //! sample is completely built.
101 unique_ptr<DataChunk> GetChunk() override;
102
103private:
104 void Finalize();
105
106private:
107 Allocator &allocator;
108 //! The sample_size to sample
109 double sample_percentage;
110 //! The fixed sample size of the sub-reservoirs
111 idx_t reservoir_sample_size;
112 //! The current sample
113 unique_ptr<ReservoirSample> current_sample;
114 //! The set of finished samples of the reservoir sample
115 vector<unique_ptr<ReservoirSample>> finished_samples;
116 //! The amount of tuples that have been processed so far
117 idx_t current_count = 0;
118 //! Whether or not the stream is finalized. The stream is automatically finalized on the first call to GetChunk();
119 bool is_finalized;
120};
121
122} // namespace duckdb
123