| 1 | #include "duckdb/execution/operator/helper/physical_vacuum.hpp" |
| 2 | |
| 3 | #include "duckdb/planner/operator/logical_get.hpp" |
| 4 | #include "duckdb/storage/data_table.hpp" |
| 5 | #include "duckdb/storage/statistics/distinct_statistics.hpp" |
| 6 | #include "duckdb/catalog/catalog_entry/table_catalog_entry.hpp" |
| 7 | |
| 8 | namespace duckdb { |
| 9 | |
| 10 | PhysicalVacuum::PhysicalVacuum(unique_ptr<VacuumInfo> info_p, idx_t estimated_cardinality) |
| 11 | : PhysicalOperator(PhysicalOperatorType::VACUUM, {LogicalType::BOOLEAN}, estimated_cardinality), |
| 12 | info(std::move(info_p)) { |
| 13 | } |
| 14 | |
| 15 | class VacuumLocalSinkState : public LocalSinkState { |
| 16 | public: |
| 17 | explicit VacuumLocalSinkState(VacuumInfo &info) { |
| 18 | for (idx_t col_idx = 0; col_idx < info.columns.size(); col_idx++) { |
| 19 | column_distinct_stats.push_back(x: make_uniq<DistinctStatistics>()); |
| 20 | } |
| 21 | }; |
| 22 | |
| 23 | vector<unique_ptr<DistinctStatistics>> column_distinct_stats; |
| 24 | }; |
| 25 | |
| 26 | unique_ptr<LocalSinkState> PhysicalVacuum::GetLocalSinkState(ExecutionContext &context) const { |
| 27 | return make_uniq<VacuumLocalSinkState>(args&: *info); |
| 28 | } |
| 29 | |
| 30 | class VacuumGlobalSinkState : public GlobalSinkState { |
| 31 | public: |
| 32 | explicit VacuumGlobalSinkState(VacuumInfo &info) { |
| 33 | for (idx_t col_idx = 0; col_idx < info.columns.size(); col_idx++) { |
| 34 | column_distinct_stats.push_back(x: make_uniq<DistinctStatistics>()); |
| 35 | } |
| 36 | }; |
| 37 | |
| 38 | mutex stats_lock; |
| 39 | vector<unique_ptr<DistinctStatistics>> column_distinct_stats; |
| 40 | }; |
| 41 | |
| 42 | unique_ptr<GlobalSinkState> PhysicalVacuum::GetGlobalSinkState(ClientContext &context) const { |
| 43 | return make_uniq<VacuumGlobalSinkState>(args&: *info); |
| 44 | } |
| 45 | |
| 46 | SinkResultType PhysicalVacuum::Sink(ExecutionContext &context, DataChunk &chunk, OperatorSinkInput &input) const { |
| 47 | auto &lstate = input.local_state.Cast<VacuumLocalSinkState>(); |
| 48 | D_ASSERT(lstate.column_distinct_stats.size() == info->column_id_map.size()); |
| 49 | |
| 50 | for (idx_t col_idx = 0; col_idx < chunk.data.size(); col_idx++) { |
| 51 | if (!DistinctStatistics::TypeIsSupported(type: chunk.data[col_idx].GetType())) { |
| 52 | continue; |
| 53 | } |
| 54 | lstate.column_distinct_stats[col_idx]->Update(update&: chunk.data[col_idx], count: chunk.size(), sample: false); |
| 55 | } |
| 56 | |
| 57 | return SinkResultType::NEED_MORE_INPUT; |
| 58 | } |
| 59 | |
| 60 | void PhysicalVacuum::Combine(ExecutionContext &context, GlobalSinkState &gstate_p, LocalSinkState &lstate_p) const { |
| 61 | auto &gstate = gstate_p.Cast<VacuumGlobalSinkState>(); |
| 62 | auto &lstate = lstate_p.Cast<VacuumLocalSinkState>(); |
| 63 | |
| 64 | lock_guard<mutex> lock(gstate.stats_lock); |
| 65 | D_ASSERT(gstate.column_distinct_stats.size() == lstate.column_distinct_stats.size()); |
| 66 | for (idx_t col_idx = 0; col_idx < gstate.column_distinct_stats.size(); col_idx++) { |
| 67 | gstate.column_distinct_stats[col_idx]->Merge(other: *lstate.column_distinct_stats[col_idx]); |
| 68 | } |
| 69 | } |
| 70 | |
| 71 | SinkFinalizeType PhysicalVacuum::Finalize(Pipeline &pipeline, Event &event, ClientContext &context, |
| 72 | GlobalSinkState &gstate) const { |
| 73 | auto &sink = gstate.Cast<VacuumGlobalSinkState>(); |
| 74 | |
| 75 | auto table = info->table; |
| 76 | for (idx_t col_idx = 0; col_idx < sink.column_distinct_stats.size(); col_idx++) { |
| 77 | table->GetStorage().SetDistinct(column_id: info->column_id_map.at(k: col_idx), |
| 78 | distinct_stats: std::move(sink.column_distinct_stats[col_idx])); |
| 79 | } |
| 80 | |
| 81 | return SinkFinalizeType::READY; |
| 82 | } |
| 83 | |
| 84 | SourceResultType PhysicalVacuum::GetData(ExecutionContext &context, DataChunk &chunk, |
| 85 | OperatorSourceInput &input) const { |
| 86 | // NOP |
| 87 | return SourceResultType::FINISHED; |
| 88 | } |
| 89 | |
| 90 | } // namespace duckdb |
| 91 | |