1#include "duckdb/common/hive_partitioning.hpp"
2
3#include "duckdb/execution/expression_executor.hpp"
4#include "duckdb/optimizer/filter_combiner.hpp"
5#include "duckdb/planner/expression/bound_columnref_expression.hpp"
6#include "duckdb/planner/expression/bound_constant_expression.hpp"
7#include "duckdb/planner/expression/bound_reference_expression.hpp"
8#include "duckdb/planner/expression_iterator.hpp"
9#include "duckdb/planner/table_filter.hpp"
10#include "re2/re2.h"
11
12namespace duckdb {
13
14static unordered_map<column_t, string> GetKnownColumnValues(string &filename,
15 unordered_map<string, column_t> &column_map,
16 duckdb_re2::RE2 &compiled_regex, bool filename_col,
17 bool hive_partition_cols) {
18 unordered_map<column_t, string> result;
19
20 if (filename_col) {
21 auto lookup_column_id = column_map.find(x: "filename");
22 if (lookup_column_id != column_map.end()) {
23 result[lookup_column_id->second] = filename;
24 }
25 }
26
27 if (hive_partition_cols) {
28 auto partitions = HivePartitioning::Parse(filename, regex&: compiled_regex);
29 for (auto &partition : partitions) {
30 auto lookup_column_id = column_map.find(x: partition.first);
31 if (lookup_column_id != column_map.end()) {
32 result[lookup_column_id->second] = partition.second;
33 }
34 }
35 }
36
37 return result;
38}
39
40// Takes an expression and converts a list of known column_refs to constants
41static void ConvertKnownColRefToConstants(unique_ptr<Expression> &expr,
42 unordered_map<column_t, string> &known_column_values, idx_t table_index) {
43 if (expr->type == ExpressionType::BOUND_COLUMN_REF) {
44 auto &bound_colref = expr->Cast<BoundColumnRefExpression>();
45
46 // This bound column ref is for another table
47 if (table_index != bound_colref.binding.table_index) {
48 return;
49 }
50
51 auto lookup = known_column_values.find(x: bound_colref.binding.column_index);
52 if (lookup != known_column_values.end()) {
53 expr = make_uniq<BoundConstantExpression>(args: Value(lookup->second).DefaultCastAs(target_type: bound_colref.return_type));
54 }
55 } else {
56 ExpressionIterator::EnumerateChildren(expression&: *expr, callback: [&](unique_ptr<Expression> &child) {
57 ConvertKnownColRefToConstants(expr&: child, known_column_values, table_index);
58 });
59 }
60}
61
// Regular expression that matches one hive partition (key=value) component inside a file path.
// A match consists of a path separator ('/' or '\'), a key made of one or more characters other than
// '/', '?' and '\', an '=' sign, and a value made of one or more characters other than '/', newline, '?' and '\'.
// The key and the value are exposed as the two capture groups. Examples of paths containing partitions:
// - s3://bucket/var1=value1/bla/bla/var2=value2
// - http(s)://domain(:port)/lala/kasdl/var1=value1/?not-a-var=not-a-value
// - folder/folder/folder/../var1=value1/etc/.//var2=value2
const string HivePartitioning::REGEX_STRING = "[\\/\\\\]([^\\/\\?\\\\]+)=([^\\/\\n\\?\\\\]+)";
67
68std::map<string, string> HivePartitioning::Parse(const string &filename, duckdb_re2::RE2 &regex) {
69 std::map<string, string> result;
70 duckdb_re2::StringPiece input(filename); // Wrap a StringPiece around it
71
72 string var;
73 string value;
74 while (RE2::FindAndConsume(input: &input, re: regex, a: &var, a: &value)) {
75 result.insert(x: std::pair<string, string>(var, value));
76 }
77 return result;
78}
79
80std::map<string, string> HivePartitioning::Parse(const string &filename) {
81 duckdb_re2::RE2 regex(REGEX_STRING);
82 return Parse(filename, regex);
83}
84
85// TODO: this can still be improved by removing the parts of filter expressions that are true for all remaining files.
86// currently, only expressions that cannot be evaluated during pushdown are removed.
87void HivePartitioning::ApplyFiltersToFileList(ClientContext &context, vector<string> &files,
88 vector<unique_ptr<Expression>> &filters,
89 unordered_map<string, column_t> &column_map, idx_t table_index,
90 bool hive_enabled, bool filename_enabled) {
91 vector<string> pruned_files;
92 vector<bool> have_preserved_filter(filters.size(), false);
93 vector<unique_ptr<Expression>> pruned_filters;
94 duckdb_re2::RE2 regex(REGEX_STRING);
95
96 if ((!filename_enabled && !hive_enabled) || filters.empty()) {
97 return;
98 }
99
100 for (idx_t i = 0; i < files.size(); i++) {
101 auto &file = files[i];
102 bool should_prune_file = false;
103 auto known_values = GetKnownColumnValues(filename&: file, column_map, compiled_regex&: regex, filename_col: filename_enabled, hive_partition_cols: hive_enabled);
104
105 FilterCombiner combiner(context);
106
107 for (idx_t j = 0; j < filters.size(); j++) {
108 auto &filter = filters[j];
109 unique_ptr<Expression> filter_copy = filter->Copy();
110 ConvertKnownColRefToConstants(expr&: filter_copy, known_column_values&: known_values, table_index);
111 // Evaluate the filter, if it can be evaluated here, we can not prune this filter
112 Value result_value;
113
114 if (!filter_copy->IsScalar() || !filter_copy->IsFoldable() ||
115 !ExpressionExecutor::TryEvaluateScalar(context, expr: *filter_copy, result&: result_value)) {
116 // can not be evaluated only with the filename/hive columns added, we can not prune this filter
117 if (!have_preserved_filter[j]) {
118 pruned_filters.emplace_back(args: filter->Copy());
119 have_preserved_filter[j] = true;
120 }
121 } else if (!result_value.GetValue<bool>()) {
122 // filter evaluates to false
123 should_prune_file = true;
124 }
125
126 // Use filter combiner to determine that this filter makes
127 if (!should_prune_file && combiner.AddFilter(expr: std::move(filter_copy)) == FilterResult::UNSATISFIABLE) {
128 should_prune_file = true;
129 }
130 }
131
132 if (!should_prune_file) {
133 pruned_files.push_back(x: file);
134 }
135 }
136
137 D_ASSERT(filters.size() >= pruned_filters.size());
138
139 filters = std::move(pruned_filters);
140 files = std::move(pruned_files);
141}
142
143HivePartitionedColumnData::HivePartitionedColumnData(const HivePartitionedColumnData &other)
144 : PartitionedColumnData(other), hashes_v(LogicalType::HASH) {
145 // Synchronize to ensure consistency of shared partition map
146 if (other.global_state) {
147 global_state = other.global_state;
148 unique_lock<mutex> lck(global_state->lock);
149 SynchronizeLocalMap();
150 }
151 InitializeKeys();
152}
153
154void HivePartitionedColumnData::InitializeKeys() {
155 keys.resize(STANDARD_VECTOR_SIZE);
156 for (idx_t i = 0; i < STANDARD_VECTOR_SIZE; i++) {
157 keys[i].values.resize(new_size: group_by_columns.size());
158 }
159}
160
161template <class T>
162static inline Value GetHiveKeyValue(const T &val) {
163 return Value::CreateValue<T>(val);
164}
165
166template <class T>
167static inline Value GetHiveKeyValue(const T &val, const LogicalType &type) {
168 auto result = GetHiveKeyValue(val);
169 result.Reinterpret(type);
170 return result;
171}
172
173static inline Value GetHiveKeyNullValue(const LogicalType &type) {
174 Value result;
175 result.Reinterpret(new_type: type);
176 return result;
177}
178
179template <class T>
180static void TemplatedGetHivePartitionValues(Vector &input, vector<HivePartitionKey> &keys, const idx_t col_idx,
181 const idx_t count) {
182 UnifiedVectorFormat format;
183 input.ToUnifiedFormat(count, data&: format);
184
185 const auto &sel = *format.sel;
186 const auto data = UnifiedVectorFormat::GetData<T>(format);
187 const auto &validity = format.validity;
188
189 const auto &type = input.GetType();
190
191 const auto reinterpret = Value::CreateValue<T>(data[0]).GetTypeMutable() != type;
192 if (reinterpret) {
193 for (idx_t i = 0; i < count; i++) {
194 auto &key = keys[i];
195 const auto idx = sel.get_index(idx: i);
196 if (validity.RowIsValid(row_idx: idx)) {
197 key.values[col_idx] = GetHiveKeyValue(data[idx], type);
198 } else {
199 key.values[col_idx] = GetHiveKeyNullValue(type);
200 }
201 }
202 } else {
203 for (idx_t i = 0; i < count; i++) {
204 auto &key = keys[i];
205 const auto idx = sel.get_index(idx: i);
206 if (validity.RowIsValid(row_idx: idx)) {
207 key.values[col_idx] = GetHiveKeyValue(data[idx]);
208 } else {
209 key.values[col_idx] = GetHiveKeyNullValue(type);
210 }
211 }
212 }
213}
214
215static void GetNestedHivePartitionValues(Vector &input, vector<HivePartitionKey> &keys, const idx_t col_idx,
216 const idx_t count) {
217 for (idx_t i = 0; i < count; i++) {
218 auto &key = keys[i];
219 key.values[col_idx] = input.GetValue(index: i);
220 }
221}
222
223static void GetHivePartitionValuesTypeSwitch(Vector &input, vector<HivePartitionKey> &keys, const idx_t col_idx,
224 const idx_t count) {
225 const auto &type = input.GetType();
226 switch (type.InternalType()) {
227 case PhysicalType::BOOL:
228 TemplatedGetHivePartitionValues<bool>(input, keys, col_idx, count);
229 break;
230 case PhysicalType::INT8:
231 TemplatedGetHivePartitionValues<int8_t>(input, keys, col_idx, count);
232 break;
233 case PhysicalType::INT16:
234 TemplatedGetHivePartitionValues<int16_t>(input, keys, col_idx, count);
235 break;
236 case PhysicalType::INT32:
237 TemplatedGetHivePartitionValues<int32_t>(input, keys, col_idx, count);
238 break;
239 case PhysicalType::INT64:
240 TemplatedGetHivePartitionValues<int64_t>(input, keys, col_idx, count);
241 break;
242 case PhysicalType::INT128:
243 TemplatedGetHivePartitionValues<hugeint_t>(input, keys, col_idx, count);
244 break;
245 case PhysicalType::UINT8:
246 TemplatedGetHivePartitionValues<uint8_t>(input, keys, col_idx, count);
247 break;
248 case PhysicalType::UINT16:
249 TemplatedGetHivePartitionValues<uint16_t>(input, keys, col_idx, count);
250 break;
251 case PhysicalType::UINT32:
252 TemplatedGetHivePartitionValues<uint32_t>(input, keys, col_idx, count);
253 break;
254 case PhysicalType::UINT64:
255 TemplatedGetHivePartitionValues<uint64_t>(input, keys, col_idx, count);
256 break;
257 case PhysicalType::FLOAT:
258 TemplatedGetHivePartitionValues<float>(input, keys, col_idx, count);
259 break;
260 case PhysicalType::DOUBLE:
261 TemplatedGetHivePartitionValues<double>(input, keys, col_idx, count);
262 break;
263 case PhysicalType::INTERVAL:
264 TemplatedGetHivePartitionValues<interval_t>(input, keys, col_idx, count);
265 break;
266 case PhysicalType::VARCHAR:
267 TemplatedGetHivePartitionValues<string_t>(input, keys, col_idx, count);
268 break;
269 case PhysicalType::STRUCT:
270 case PhysicalType::LIST:
271 GetNestedHivePartitionValues(input, keys, col_idx, count);
272 break;
273 default:
274 throw InternalException("Unsupported type for HivePartitionedColumnData::ComputePartitionIndices");
275 }
276}
277
278void HivePartitionedColumnData::ComputePartitionIndices(PartitionedColumnDataAppendState &state, DataChunk &input) {
279 const auto count = input.size();
280
281 input.Hash(column_ids&: group_by_columns, result&: hashes_v);
282 hashes_v.Flatten(count);
283
284 for (idx_t col_idx = 0; col_idx < group_by_columns.size(); col_idx++) {
285 auto &group_by_col = input.data[group_by_columns[col_idx]];
286 GetHivePartitionValuesTypeSwitch(input&: group_by_col, keys, col_idx, count);
287 }
288
289 const auto hashes = FlatVector::GetData<hash_t>(vector&: hashes_v);
290 const auto partition_indices = FlatVector::GetData<idx_t>(vector&: state.partition_indices);
291 for (idx_t i = 0; i < count; i++) {
292 auto &key = keys[i];
293 key.hash = hashes[i];
294 auto lookup = local_partition_map.find(x: key);
295 if (lookup == local_partition_map.end()) {
296 idx_t new_partition_id = RegisterNewPartition(key, state);
297 partition_indices[i] = new_partition_id;
298 } else {
299 partition_indices[i] = lookup->second;
300 }
301 }
302}
303
304std::map<idx_t, const HivePartitionKey *> HivePartitionedColumnData::GetReverseMap() {
305 std::map<idx_t, const HivePartitionKey *> ret;
306 for (const auto &pair : local_partition_map) {
307 ret[pair.second] = &(pair.first);
308 }
309 return ret;
310}
311
312void HivePartitionedColumnData::GrowAllocators() {
313 unique_lock<mutex> lck_gstate(allocators->lock);
314
315 idx_t current_allocator_size = allocators->allocators.size();
316 idx_t required_allocators = local_partition_map.size();
317
318 allocators->allocators.reserve(n: current_allocator_size);
319 for (idx_t i = current_allocator_size; i < required_allocators; i++) {
320 CreateAllocator();
321 }
322
323 D_ASSERT(allocators->allocators.size() == local_partition_map.size());
324}
325
326void HivePartitionedColumnData::GrowAppendState(PartitionedColumnDataAppendState &state) {
327 idx_t current_append_state_size = state.partition_append_states.size();
328 idx_t required_append_state_size = local_partition_map.size();
329
330 for (idx_t i = current_append_state_size; i < required_append_state_size; i++) {
331 state.partition_append_states.emplace_back(args: make_uniq<ColumnDataAppendState>());
332 state.partition_buffers.emplace_back(args: CreatePartitionBuffer());
333 }
334}
335
336void HivePartitionedColumnData::GrowPartitions(PartitionedColumnDataAppendState &state) {
337 idx_t current_partitions = partitions.size();
338 idx_t required_partitions = local_partition_map.size();
339
340 D_ASSERT(allocators->allocators.size() == required_partitions);
341
342 for (idx_t i = current_partitions; i < required_partitions; i++) {
343 partitions.emplace_back(args: CreatePartitionCollection(partition_index: i));
344 partitions[i]->InitializeAppend(state&: *state.partition_append_states[i]);
345 }
346 D_ASSERT(partitions.size() == local_partition_map.size());
347}
348
349void HivePartitionedColumnData::SynchronizeLocalMap() {
350 // Synchronise global map into local, may contain changes from other threads too
351 for (auto it = global_state->partitions.begin() + local_partition_map.size(); it < global_state->partitions.end();
352 it++) {
353 local_partition_map[(*it)->first] = (*it)->second;
354 }
355}
356
357idx_t HivePartitionedColumnData::RegisterNewPartition(HivePartitionKey key, PartitionedColumnDataAppendState &state) {
358 if (global_state) {
359 idx_t partition_id;
360
361 // Synchronize Global state with our local state with the newly discoveren partition
362 {
363 unique_lock<mutex> lck_gstate(global_state->lock);
364
365 // Insert into global map, or return partition if already present
366 auto res =
367 global_state->partition_map.emplace(args: std::make_pair(x: std::move(key), y: global_state->partition_map.size()));
368 auto it = res.first;
369 partition_id = it->second;
370
371 // Add iterator to vector to allow incrementally updating local states from global state
372 global_state->partitions.emplace_back(args&: it);
373 SynchronizeLocalMap();
374 }
375
376 // After synchronizing with the global state, we need to grow the shared allocators to support
377 // the number of partitions, which guarantees that there's always enough allocators available to each thread
378 GrowAllocators();
379
380 // Grow local partition data
381 GrowAppendState(state);
382 GrowPartitions(state);
383
384 return partition_id;
385 } else {
386 return local_partition_map.emplace(args: std::make_pair(x: std::move(key), y: local_partition_map.size())).first->second;
387 }
388}
389
390} // namespace duckdb
391