1#include "duckdb/execution/operator/scan/physical_table_scan.hpp"
2
3#include "duckdb/catalog/catalog_entry/table_catalog_entry.hpp"
4#include "duckdb/common/string_util.hpp"
5#include "duckdb/planner/expression/bound_conjunction_expression.hpp"
6#include "duckdb/transaction/transaction.hpp"
7
8#include <utility>
9
10namespace duckdb {
11
12PhysicalTableScan::PhysicalTableScan(vector<LogicalType> types, TableFunction function_p,
13 unique_ptr<FunctionData> bind_data_p, vector<column_t> column_ids_p,
14 vector<string> names_p, unique_ptr<TableFilterSet> table_filters_p,
15 idx_t estimated_cardinality)
16 : PhysicalOperator(PhysicalOperatorType::TABLE_SCAN, std::move(types), estimated_cardinality),
17 function(std::move(function_p)), bind_data(std::move(bind_data_p)), column_ids(std::move(column_ids_p)),
18 names(std::move(names_p)), table_filters(std::move(table_filters_p)) {
19}
20
21PhysicalTableScan::PhysicalTableScan(vector<LogicalType> types, TableFunction function_p,
22 unique_ptr<FunctionData> bind_data_p, vector<LogicalType> returned_types_p,
23 vector<column_t> column_ids_p, vector<idx_t> projection_ids_p,
24 vector<string> names_p, unique_ptr<TableFilterSet> table_filters_p,
25 idx_t estimated_cardinality)
26 : PhysicalOperator(PhysicalOperatorType::TABLE_SCAN, std::move(types), estimated_cardinality),
27 function(std::move(function_p)), bind_data(std::move(bind_data_p)), returned_types(std::move(returned_types_p)),
28 column_ids(std::move(column_ids_p)), projection_ids(std::move(projection_ids_p)), names(std::move(names_p)),
29 table_filters(std::move(table_filters_p)) {
30}
31
32class TableScanGlobalSourceState : public GlobalSourceState {
33public:
34 TableScanGlobalSourceState(ClientContext &context, const PhysicalTableScan &op) {
35 if (op.function.init_global) {
36 TableFunctionInitInput input(op.bind_data.get(), op.column_ids, op.projection_ids, op.table_filters.get());
37 global_state = op.function.init_global(context, input);
38 if (global_state) {
39 max_threads = global_state->MaxThreads();
40 }
41 } else {
42 max_threads = 1;
43 }
44 }
45
46 idx_t max_threads = 0;
47 unique_ptr<GlobalTableFunctionState> global_state;
48
49 idx_t MaxThreads() override {
50 return max_threads;
51 }
52};
53
54class TableScanLocalSourceState : public LocalSourceState {
55public:
56 TableScanLocalSourceState(ExecutionContext &context, TableScanGlobalSourceState &gstate,
57 const PhysicalTableScan &op) {
58 if (op.function.init_local) {
59 TableFunctionInitInput input(op.bind_data.get(), op.column_ids, op.projection_ids, op.table_filters.get());
60 local_state = op.function.init_local(context, input, gstate.global_state.get());
61 }
62 }
63
64 unique_ptr<LocalTableFunctionState> local_state;
65};
66
67unique_ptr<LocalSourceState> PhysicalTableScan::GetLocalSourceState(ExecutionContext &context,
68 GlobalSourceState &gstate) const {
69 return make_uniq<TableScanLocalSourceState>(args&: context, args&: gstate.Cast<TableScanGlobalSourceState>(), args: *this);
70}
71
72unique_ptr<GlobalSourceState> PhysicalTableScan::GetGlobalSourceState(ClientContext &context) const {
73 return make_uniq<TableScanGlobalSourceState>(args&: context, args: *this);
74}
75
76SourceResultType PhysicalTableScan::GetData(ExecutionContext &context, DataChunk &chunk,
77 OperatorSourceInput &input) const {
78 D_ASSERT(!column_ids.empty());
79 auto &gstate = input.global_state.Cast<TableScanGlobalSourceState>();
80 auto &state = input.local_state.Cast<TableScanLocalSourceState>();
81
82 TableFunctionInput data(bind_data.get(), state.local_state.get(), gstate.global_state.get());
83 function.function(context.client, data, chunk);
84
85 return chunk.size() == 0 ? SourceResultType::FINISHED : SourceResultType::HAVE_MORE_OUTPUT;
86}
87
88double PhysicalTableScan::GetProgress(ClientContext &context, GlobalSourceState &gstate_p) const {
89 auto &gstate = gstate_p.Cast<TableScanGlobalSourceState>();
90 if (function.table_scan_progress) {
91 return function.table_scan_progress(context, bind_data.get(), gstate.global_state.get());
92 }
93 // if table_scan_progress is not implemented we don't support this function yet in the progress bar
94 return -1;
95}
96
97idx_t PhysicalTableScan::GetBatchIndex(ExecutionContext &context, DataChunk &chunk, GlobalSourceState &gstate_p,
98 LocalSourceState &lstate) const {
99 D_ASSERT(SupportsBatchIndex());
100 D_ASSERT(function.get_batch_index);
101 auto &gstate = gstate_p.Cast<TableScanGlobalSourceState>();
102 auto &state = lstate.Cast<TableScanLocalSourceState>();
103 return function.get_batch_index(context.client, bind_data.get(), state.local_state.get(),
104 gstate.global_state.get());
105}
106
107string PhysicalTableScan::GetName() const {
108 return StringUtil::Upper(str: function.name + " " + function.extra_info);
109}
110
111string PhysicalTableScan::ParamsToString() const {
112 string result;
113 if (function.to_string) {
114 result = function.to_string(bind_data.get());
115 result += "\n[INFOSEPARATOR]\n";
116 }
117 if (function.projection_pushdown) {
118 if (function.filter_prune) {
119 for (idx_t i = 0; i < projection_ids.size(); i++) {
120 const auto &column_id = column_ids[projection_ids[i]];
121 if (column_id < names.size()) {
122 if (i > 0) {
123 result += "\n";
124 }
125 result += names[column_id];
126 }
127 }
128 } else {
129 for (idx_t i = 0; i < column_ids.size(); i++) {
130 const auto &column_id = column_ids[i];
131 if (column_id < names.size()) {
132 if (i > 0) {
133 result += "\n";
134 }
135 result += names[column_id];
136 }
137 }
138 }
139 }
140 if (function.filter_pushdown && table_filters) {
141 result += "\n[INFOSEPARATOR]\n";
142 result += "Filters: ";
143 for (auto &f : table_filters->filters) {
144 auto &column_index = f.first;
145 auto &filter = f.second;
146 if (column_index < names.size()) {
147 result += filter->ToString(column_name: names[column_ids[column_index]]);
148 result += "\n";
149 }
150 }
151 }
152 result += "\n[INFOSEPARATOR]\n";
153 result += StringUtil::Format(fmt_str: "EC: %llu", params: estimated_props->GetCardinality<idx_t>());
154 return result;
155}
156
157bool PhysicalTableScan::Equals(const PhysicalOperator &other_p) const {
158 if (type != other_p.type) {
159 return false;
160 }
161 auto &other = other_p.Cast<PhysicalTableScan>();
162 if (function.function != other.function.function) {
163 return false;
164 }
165 if (column_ids != other.column_ids) {
166 return false;
167 }
168 if (!FunctionData::Equals(left: bind_data.get(), right: other.bind_data.get())) {
169 return false;
170 }
171 return true;
172}
173
174} // namespace duckdb
175