#include "duckdb/execution/reservoir_sample.hpp"

#include <limits>

#include "duckdb/common/pair.hpp"
3
4namespace duckdb {
5
6ReservoirSample::ReservoirSample(Allocator &allocator, idx_t sample_count, int64_t seed)
7 : BlockingSample(seed), sample_count(sample_count), reservoir(allocator) {
8}
9
10void ReservoirSample::AddToReservoir(DataChunk &input) {
11 if (sample_count == 0) {
12 return;
13 }
14 // Input: A population V of n weighted items
15 // Output: A reservoir R with a size m
16 // 1: The first m items of V are inserted into R
17 // first we need to check if the reservoir already has "m" elements
18 if (reservoir.Count() < sample_count) {
19 if (FillReservoir(input) == 0) {
20 // entire chunk was consumed by reservoir
21 return;
22 }
23 }
24 // find the position of next_index relative to current_count
25 idx_t remaining = input.size();
26 idx_t base_offset = 0;
27 while (true) {
28 idx_t offset = base_reservoir_sample.next_index - base_reservoir_sample.current_count;
29 if (offset >= remaining) {
30 // not in this chunk! increment current count and go to the next chunk
31 base_reservoir_sample.current_count += remaining;
32 return;
33 }
34 // in this chunk! replace the element
35 ReplaceElement(input, index_in_chunk: base_offset + offset);
36 // shift the chunk forward
37 remaining -= offset;
38 base_offset += offset;
39 }
40}
41
42unique_ptr<DataChunk> ReservoirSample::GetChunk() {
43 return reservoir.Fetch();
44}
45
46void ReservoirSample::ReplaceElement(DataChunk &input, idx_t index_in_chunk) {
47 // replace the entry in the reservoir
48 // 8. The item in R with the minimum key is replaced by item vi
49 for (idx_t col_idx = 0; col_idx < input.ColumnCount(); col_idx++) {
50 reservoir.SetValue(column: col_idx, index: base_reservoir_sample.min_entry, value: input.GetValue(col_idx, index: index_in_chunk));
51 }
52 base_reservoir_sample.ReplaceElement();
53}
54
55idx_t ReservoirSample::FillReservoir(DataChunk &input) {
56 idx_t chunk_count = input.size();
57 input.Flatten();
58
59 // we have not: append to the reservoir
60 idx_t required_count;
61 if (reservoir.Count() + chunk_count >= sample_count) {
62 // have to limit the count of the chunk
63 required_count = sample_count - reservoir.Count();
64 } else {
65 // we copy the entire chunk
66 required_count = chunk_count;
67 }
68 // instead of copying we just change the pointer in the current chunk
69 input.SetCardinality(required_count);
70 reservoir.Append(new_chunk&: input);
71
72 base_reservoir_sample.InitializeReservoir(cur_size: reservoir.Count(), sample_size: sample_count);
73
74 // check if there are still elements remaining
75 // this happens if we are on a boundary
76 // for example, input.size() is 1024, but our sample size is 10
77 if (required_count == chunk_count) {
78 // we are done here
79 return 0;
80 }
81 // we still need to process a part of the chunk
82 // create a selection vector of the remaining elements
83 SelectionVector sel(STANDARD_VECTOR_SIZE);
84 for (idx_t i = required_count; i < chunk_count; i++) {
85 sel.set_index(idx: i - required_count, loc: i);
86 }
87 // slice the input vector and continue
88 input.Slice(sel_vector: sel, count: chunk_count - required_count);
89 return input.size();
90}
91
92ReservoirSamplePercentage::ReservoirSamplePercentage(Allocator &allocator, double percentage, int64_t seed)
93 : BlockingSample(seed), allocator(allocator), sample_percentage(percentage / 100.0), current_count(0),
94 is_finalized(false) {
95 reservoir_sample_size = idx_t(sample_percentage * RESERVOIR_THRESHOLD);
96 current_sample = make_uniq<ReservoirSample>(args&: allocator, args&: reservoir_sample_size, args: random.NextRandomInteger());
97}
98
99void ReservoirSamplePercentage::AddToReservoir(DataChunk &input) {
100 if (current_count + input.size() > RESERVOIR_THRESHOLD) {
101 // we don't have enough space in our current reservoir
102 // first check what we still need to append to the current sample
103 idx_t append_to_current_sample_count = RESERVOIR_THRESHOLD - current_count;
104 idx_t append_to_next_sample = input.size() - append_to_current_sample_count;
105 if (append_to_current_sample_count > 0) {
106 // we have elements remaining, first add them to the current sample
107 if (append_to_next_sample > 0) {
108 // we need to also add to the next sample
109 DataChunk new_chunk;
110 new_chunk.Initialize(allocator, types: input.GetTypes());
111 SelectionVector sel(append_to_current_sample_count);
112 for (idx_t r = 0; r < append_to_current_sample_count; r++) {
113 sel.set_index(idx: r, loc: r);
114 }
115 new_chunk.Slice(sel_vector: sel, count: append_to_current_sample_count);
116 new_chunk.Flatten();
117
118 current_sample->AddToReservoir(input&: new_chunk);
119 } else {
120 input.Flatten();
121
122 input.SetCardinality(append_to_current_sample_count);
123 current_sample->AddToReservoir(input);
124 }
125 }
126 if (append_to_next_sample > 0) {
127 // slice the input for the remainder
128 SelectionVector sel(STANDARD_VECTOR_SIZE);
129 for (idx_t i = 0; i < append_to_next_sample; i++) {
130 sel.set_index(idx: i, loc: append_to_current_sample_count + i);
131 }
132 input.Slice(sel_vector: sel, count: append_to_next_sample);
133 }
134 // now our first sample is filled: append it to the set of finished samples
135 finished_samples.push_back(x: std::move(current_sample));
136
137 // allocate a new sample, and potentially add the remainder of the current input to that sample
138 current_sample = make_uniq<ReservoirSample>(args&: allocator, args&: reservoir_sample_size, args: random.NextRandomInteger());
139 if (append_to_next_sample > 0) {
140 current_sample->AddToReservoir(input);
141 }
142 current_count = append_to_next_sample;
143 } else {
144 // we can just append to the current sample
145 current_count += input.size();
146 current_sample->AddToReservoir(input);
147 }
148}
149
150unique_ptr<DataChunk> ReservoirSamplePercentage::GetChunk() {
151 if (!is_finalized) {
152 Finalize();
153 }
154 while (!finished_samples.empty()) {
155 auto &front = finished_samples.front();
156 auto chunk = front->GetChunk();
157 if (chunk && chunk->size() > 0) {
158 return chunk;
159 }
160 // move to the next sample
161 finished_samples.erase(position: finished_samples.begin());
162 }
163 return nullptr;
164}
165
166void ReservoirSamplePercentage::Finalize() {
167 // need to finalize the current sample, if any
168 if (current_count > 0) {
169 // create a new sample
170 auto new_sample_size = idx_t(round(x: sample_percentage * current_count));
171 auto new_sample = make_uniq<ReservoirSample>(args&: allocator, args&: new_sample_size, args: random.NextRandomInteger());
172 while (true) {
173 auto chunk = current_sample->GetChunk();
174 if (!chunk || chunk->size() == 0) {
175 break;
176 }
177 new_sample->AddToReservoir(input&: *chunk);
178 }
179 finished_samples.push_back(x: std::move(new_sample));
180 }
181 is_finalized = true;
182}
183
184BaseReservoirSampling::BaseReservoirSampling(int64_t seed) : random(seed) {
185 next_index = 0;
186 min_threshold = 0;
187 min_entry = 0;
188 current_count = 0;
189}
190
191BaseReservoirSampling::BaseReservoirSampling() : BaseReservoirSampling(-1) {
192}
193
194void BaseReservoirSampling::InitializeReservoir(idx_t cur_size, idx_t sample_size) {
195 //! 1: The first m items of V are inserted into R
196 //! first we need to check if the reservoir already has "m" elements
197 if (cur_size == sample_size) {
198 //! 2. For each item vi ∈ R: Calculate a key ki = random(0, 1)
199 //! we then define the threshold to enter the reservoir T_w as the minimum key of R
200 //! we use a priority queue to extract the minimum key in O(1) time
201 for (idx_t i = 0; i < sample_size; i++) {
202 double k_i = random.NextRandom();
203 reservoir_weights.emplace(args: -k_i, args&: i);
204 }
205 SetNextEntry();
206 }
207}
208
209void BaseReservoirSampling::SetNextEntry() {
210 //! 4. Let r = random(0, 1) and Xw = log(r) / log(T_w)
211 auto &min_key = reservoir_weights.top();
212 double t_w = -min_key.first;
213 double r = random.NextRandom();
214 double x_w = log(x: r) / log(x: t_w);
215 //! 5. From the current item vc skip items until item vi , such that:
216 //! 6. wc +wc+1 +···+wi−1 < Xw <= wc +wc+1 +···+wi−1 +wi
217 //! since all our weights are 1 (uniform sampling), we can just determine the amount of elements to skip
218 min_threshold = t_w;
219 min_entry = min_key.second;
220 next_index = MaxValue<idx_t>(a: 1, b: idx_t(round(x: x_w)));
221 current_count = 0;
222}
223
224void BaseReservoirSampling::ReplaceElement() {
225 //! replace the entry in the reservoir
226 //! pop the minimum entry
227 reservoir_weights.pop();
228 //! now update the reservoir
229 //! 8. Let tw = Tw i , r2 = random(tw,1) and vi’s key: ki = (r2)1/wi
230 //! 9. The new threshold Tw is the new minimum key of R
231 //! we generate a random number between (min_threshold, 1)
232 double r2 = random.NextRandom(min: min_threshold, max: 1);
233 //! now we insert the new weight into the reservoir
234 reservoir_weights.emplace(args: -r2, args&: min_entry);
235 //! we update the min entry with the new min entry in the reservoir
236 SetNextEntry();
237}
238
239} // namespace duckdb
240