1#include "duckdb/execution/operator/helper/physical_vacuum.hpp"
2
3#include "duckdb/planner/operator/logical_get.hpp"
4#include "duckdb/storage/data_table.hpp"
5#include "duckdb/storage/statistics/distinct_statistics.hpp"
6#include "duckdb/catalog/catalog_entry/table_catalog_entry.hpp"
7
8namespace duckdb {
9
10PhysicalVacuum::PhysicalVacuum(unique_ptr<VacuumInfo> info_p, idx_t estimated_cardinality)
11 : PhysicalOperator(PhysicalOperatorType::VACUUM, {LogicalType::BOOLEAN}, estimated_cardinality),
12 info(std::move(info_p)) {
13}
14
15class VacuumLocalSinkState : public LocalSinkState {
16public:
17 explicit VacuumLocalSinkState(VacuumInfo &info) {
18 for (idx_t col_idx = 0; col_idx < info.columns.size(); col_idx++) {
19 column_distinct_stats.push_back(x: make_uniq<DistinctStatistics>());
20 }
21 };
22
23 vector<unique_ptr<DistinctStatistics>> column_distinct_stats;
24};
25
26unique_ptr<LocalSinkState> PhysicalVacuum::GetLocalSinkState(ExecutionContext &context) const {
27 return make_uniq<VacuumLocalSinkState>(args&: *info);
28}
29
30class VacuumGlobalSinkState : public GlobalSinkState {
31public:
32 explicit VacuumGlobalSinkState(VacuumInfo &info) {
33 for (idx_t col_idx = 0; col_idx < info.columns.size(); col_idx++) {
34 column_distinct_stats.push_back(x: make_uniq<DistinctStatistics>());
35 }
36 };
37
38 mutex stats_lock;
39 vector<unique_ptr<DistinctStatistics>> column_distinct_stats;
40};
41
42unique_ptr<GlobalSinkState> PhysicalVacuum::GetGlobalSinkState(ClientContext &context) const {
43 return make_uniq<VacuumGlobalSinkState>(args&: *info);
44}
45
46SinkResultType PhysicalVacuum::Sink(ExecutionContext &context, DataChunk &chunk, OperatorSinkInput &input) const {
47 auto &lstate = input.local_state.Cast<VacuumLocalSinkState>();
48 D_ASSERT(lstate.column_distinct_stats.size() == info->column_id_map.size());
49
50 for (idx_t col_idx = 0; col_idx < chunk.data.size(); col_idx++) {
51 if (!DistinctStatistics::TypeIsSupported(type: chunk.data[col_idx].GetType())) {
52 continue;
53 }
54 lstate.column_distinct_stats[col_idx]->Update(update&: chunk.data[col_idx], count: chunk.size(), sample: false);
55 }
56
57 return SinkResultType::NEED_MORE_INPUT;
58}
59
60void PhysicalVacuum::Combine(ExecutionContext &context, GlobalSinkState &gstate_p, LocalSinkState &lstate_p) const {
61 auto &gstate = gstate_p.Cast<VacuumGlobalSinkState>();
62 auto &lstate = lstate_p.Cast<VacuumLocalSinkState>();
63
64 lock_guard<mutex> lock(gstate.stats_lock);
65 D_ASSERT(gstate.column_distinct_stats.size() == lstate.column_distinct_stats.size());
66 for (idx_t col_idx = 0; col_idx < gstate.column_distinct_stats.size(); col_idx++) {
67 gstate.column_distinct_stats[col_idx]->Merge(other: *lstate.column_distinct_stats[col_idx]);
68 }
69}
70
71SinkFinalizeType PhysicalVacuum::Finalize(Pipeline &pipeline, Event &event, ClientContext &context,
72 GlobalSinkState &gstate) const {
73 auto &sink = gstate.Cast<VacuumGlobalSinkState>();
74
75 auto table = info->table;
76 for (idx_t col_idx = 0; col_idx < sink.column_distinct_stats.size(); col_idx++) {
77 table->GetStorage().SetDistinct(column_id: info->column_id_map.at(k: col_idx),
78 distinct_stats: std::move(sink.column_distinct_stats[col_idx]));
79 }
80
81 return SinkFinalizeType::READY;
82}
83
84SourceResultType PhysicalVacuum::GetData(ExecutionContext &context, DataChunk &chunk,
85 OperatorSourceInput &input) const {
86 // NOP
87 return SourceResultType::FINISHED;
88}
89
90} // namespace duckdb
91