1 | #include "duckdb/execution/operator/helper/physical_vacuum.hpp" |
2 | |
3 | #include "duckdb/planner/operator/logical_get.hpp" |
4 | #include "duckdb/storage/data_table.hpp" |
5 | #include "duckdb/storage/statistics/distinct_statistics.hpp" |
6 | #include "duckdb/catalog/catalog_entry/table_catalog_entry.hpp" |
7 | |
8 | namespace duckdb { |
9 | |
10 | PhysicalVacuum::PhysicalVacuum(unique_ptr<VacuumInfo> info_p, idx_t estimated_cardinality) |
11 | : PhysicalOperator(PhysicalOperatorType::VACUUM, {LogicalType::BOOLEAN}, estimated_cardinality), |
12 | info(std::move(info_p)) { |
13 | } |
14 | |
15 | class VacuumLocalSinkState : public LocalSinkState { |
16 | public: |
17 | explicit VacuumLocalSinkState(VacuumInfo &info) { |
18 | for (idx_t col_idx = 0; col_idx < info.columns.size(); col_idx++) { |
19 | column_distinct_stats.push_back(x: make_uniq<DistinctStatistics>()); |
20 | } |
21 | }; |
22 | |
23 | vector<unique_ptr<DistinctStatistics>> column_distinct_stats; |
24 | }; |
25 | |
26 | unique_ptr<LocalSinkState> PhysicalVacuum::GetLocalSinkState(ExecutionContext &context) const { |
27 | return make_uniq<VacuumLocalSinkState>(args&: *info); |
28 | } |
29 | |
30 | class VacuumGlobalSinkState : public GlobalSinkState { |
31 | public: |
32 | explicit VacuumGlobalSinkState(VacuumInfo &info) { |
33 | for (idx_t col_idx = 0; col_idx < info.columns.size(); col_idx++) { |
34 | column_distinct_stats.push_back(x: make_uniq<DistinctStatistics>()); |
35 | } |
36 | }; |
37 | |
38 | mutex stats_lock; |
39 | vector<unique_ptr<DistinctStatistics>> column_distinct_stats; |
40 | }; |
41 | |
42 | unique_ptr<GlobalSinkState> PhysicalVacuum::GetGlobalSinkState(ClientContext &context) const { |
43 | return make_uniq<VacuumGlobalSinkState>(args&: *info); |
44 | } |
45 | |
46 | SinkResultType PhysicalVacuum::Sink(ExecutionContext &context, DataChunk &chunk, OperatorSinkInput &input) const { |
47 | auto &lstate = input.local_state.Cast<VacuumLocalSinkState>(); |
48 | D_ASSERT(lstate.column_distinct_stats.size() == info->column_id_map.size()); |
49 | |
50 | for (idx_t col_idx = 0; col_idx < chunk.data.size(); col_idx++) { |
51 | if (!DistinctStatistics::TypeIsSupported(type: chunk.data[col_idx].GetType())) { |
52 | continue; |
53 | } |
54 | lstate.column_distinct_stats[col_idx]->Update(update&: chunk.data[col_idx], count: chunk.size(), sample: false); |
55 | } |
56 | |
57 | return SinkResultType::NEED_MORE_INPUT; |
58 | } |
59 | |
60 | void PhysicalVacuum::Combine(ExecutionContext &context, GlobalSinkState &gstate_p, LocalSinkState &lstate_p) const { |
61 | auto &gstate = gstate_p.Cast<VacuumGlobalSinkState>(); |
62 | auto &lstate = lstate_p.Cast<VacuumLocalSinkState>(); |
63 | |
64 | lock_guard<mutex> lock(gstate.stats_lock); |
65 | D_ASSERT(gstate.column_distinct_stats.size() == lstate.column_distinct_stats.size()); |
66 | for (idx_t col_idx = 0; col_idx < gstate.column_distinct_stats.size(); col_idx++) { |
67 | gstate.column_distinct_stats[col_idx]->Merge(other: *lstate.column_distinct_stats[col_idx]); |
68 | } |
69 | } |
70 | |
71 | SinkFinalizeType PhysicalVacuum::Finalize(Pipeline &pipeline, Event &event, ClientContext &context, |
72 | GlobalSinkState &gstate) const { |
73 | auto &sink = gstate.Cast<VacuumGlobalSinkState>(); |
74 | |
75 | auto table = info->table; |
76 | for (idx_t col_idx = 0; col_idx < sink.column_distinct_stats.size(); col_idx++) { |
77 | table->GetStorage().SetDistinct(column_id: info->column_id_map.at(k: col_idx), |
78 | distinct_stats: std::move(sink.column_distinct_stats[col_idx])); |
79 | } |
80 | |
81 | return SinkFinalizeType::READY; |
82 | } |
83 | |
84 | SourceResultType PhysicalVacuum::GetData(ExecutionContext &context, DataChunk &chunk, |
85 | OperatorSourceInput &input) const { |
86 | // NOP |
87 | return SourceResultType::FINISHED; |
88 | } |
89 | |
90 | } // namespace duckdb |
91 | |