1 | //===----------------------------------------------------------------------===// |
2 | // DuckDB |
3 | // |
4 | // duckdb/execution/reservoir_sample.hpp |
5 | // |
6 | // |
7 | //===----------------------------------------------------------------------===// |
8 | |
9 | #pragma once |
10 | |
11 | #include "duckdb/common/common.hpp" |
12 | #include "duckdb/common/random_engine.hpp" |
13 | #include "duckdb/common/types/chunk_collection.hpp" |
14 | #include "duckdb/common/queue.hpp" |
15 | |
16 | namespace duckdb { |
17 | |
18 | class BaseReservoirSampling { |
19 | public: |
20 | explicit BaseReservoirSampling(int64_t seed); |
21 | BaseReservoirSampling(); |
22 | |
23 | void InitializeReservoir(idx_t cur_size, idx_t sample_size); |
24 | |
25 | void SetNextEntry(); |
26 | |
27 | void ReplaceElement(); |
28 | |
29 | //! The random generator |
30 | RandomEngine random; |
31 | //! Priority queue of [random element, index] for each of the elements in the sample |
32 | std::priority_queue<std::pair<double, idx_t>> reservoir_weights; |
33 | //! The next element to sample |
34 | idx_t next_index; |
35 | //! The reservoir threshold of the current min entry |
36 | double min_threshold; |
37 | //! The reservoir index of the current min entry |
38 | idx_t min_entry; |
39 | //! The current count towards next index (i.e. we will replace an entry in next_index - current_count tuples) |
40 | idx_t current_count; |
41 | }; |
42 | |
43 | class BlockingSample { |
44 | public: |
45 | explicit BlockingSample(int64_t seed) : base_reservoir_sample(seed), random(base_reservoir_sample.random) { |
46 | } |
47 | virtual ~BlockingSample() { |
48 | } |
49 | |
50 | //! Add a chunk of data to the sample |
51 | virtual void AddToReservoir(DataChunk &input) = 0; |
52 | |
53 | //! Fetches a chunk from the sample. Note that this method is destructive and should only be used after the |
54 | // sample is completely built. |
55 | virtual unique_ptr<DataChunk> GetChunk() = 0; |
56 | |
57 | protected: |
58 | //! The reservoir sampling |
59 | BaseReservoirSampling base_reservoir_sample; |
60 | RandomEngine &random; |
61 | }; |
62 | |
63 | //! The reservoir sample class maintains a streaming sample of fixed size "sample_count" |
64 | class ReservoirSample : public BlockingSample { |
65 | public: |
66 | ReservoirSample(Allocator &allocator, idx_t sample_count, int64_t seed); |
67 | |
68 | //! Add a chunk of data to the sample |
69 | void AddToReservoir(DataChunk &input) override; |
70 | |
71 | //! Fetches a chunk from the sample. Note that this method is destructive and should only be used after the |
72 | //! sample is completely built. |
73 | unique_ptr<DataChunk> GetChunk() override; |
74 | |
75 | private: |
76 | //! Replace a single element of the input |
77 | void ReplaceElement(DataChunk &input, idx_t index_in_chunk); |
78 | |
79 | //! Fills the reservoir up until sample_count entries, returns how many entries are still required |
80 | idx_t FillReservoir(DataChunk &input); |
81 | |
82 | private: |
83 | //! The size of the reservoir sample |
84 | idx_t sample_count; |
85 | //! The current reservoir |
86 | ChunkCollection reservoir; |
87 | }; |
88 | |
89 | //! The reservoir sample sample_size class maintains a streaming sample of variable size |
90 | class ReservoirSamplePercentage : public BlockingSample { |
91 | constexpr static idx_t RESERVOIR_THRESHOLD = 100000; |
92 | |
93 | public: |
94 | ReservoirSamplePercentage(Allocator &allocator, double percentage, int64_t seed); |
95 | |
96 | //! Add a chunk of data to the sample |
97 | void AddToReservoir(DataChunk &input) override; |
98 | |
99 | //! Fetches a chunk from the sample. Note that this method is destructive and should only be used after the |
100 | //! sample is completely built. |
101 | unique_ptr<DataChunk> GetChunk() override; |
102 | |
103 | private: |
104 | void Finalize(); |
105 | |
106 | private: |
107 | Allocator &allocator; |
108 | //! The sample_size to sample |
109 | double sample_percentage; |
110 | //! The fixed sample size of the sub-reservoirs |
111 | idx_t reservoir_sample_size; |
112 | //! The current sample |
113 | unique_ptr<ReservoirSample> current_sample; |
114 | //! The set of finished samples of the reservoir sample |
115 | vector<unique_ptr<ReservoirSample>> finished_samples; |
116 | //! The amount of tuples that have been processed so far |
117 | idx_t current_count = 0; |
118 | //! Whether or not the stream is finalized. The stream is automatically finalized on the first call to GetChunk(); |
119 | bool is_finalized; |
120 | }; |
121 | |
122 | } // namespace duckdb |
123 | |