1#include "duckdb/execution/operator/projection/physical_tableinout_function.hpp"
2
3namespace duckdb {
4
5class TableInOutLocalState : public OperatorState {
6public:
7 TableInOutLocalState() : row_index(0), new_row(true) {
8 }
9
10 unique_ptr<LocalTableFunctionState> local_state;
11 idx_t row_index;
12 bool new_row;
13 DataChunk input_chunk;
14};
15
16class TableInOutGlobalState : public GlobalOperatorState {
17public:
18 TableInOutGlobalState() {
19 }
20
21 unique_ptr<GlobalTableFunctionState> global_state;
22};
23
24PhysicalTableInOutFunction::PhysicalTableInOutFunction(vector<LogicalType> types, TableFunction function_p,
25 unique_ptr<FunctionData> bind_data_p,
26 vector<column_t> column_ids_p, idx_t estimated_cardinality,
27 vector<column_t> project_input_p)
28 : PhysicalOperator(PhysicalOperatorType::INOUT_FUNCTION, std::move(types), estimated_cardinality),
29 function(std::move(function_p)), bind_data(std::move(bind_data_p)), column_ids(std::move(column_ids_p)),
30 projected_input(std::move(project_input_p)) {
31}
32
33unique_ptr<OperatorState> PhysicalTableInOutFunction::GetOperatorState(ExecutionContext &context) const {
34 auto &gstate = op_state->Cast<TableInOutGlobalState>();
35 auto result = make_uniq<TableInOutLocalState>();
36 if (function.init_local) {
37 TableFunctionInitInput input(bind_data.get(), column_ids, vector<idx_t>(), nullptr);
38 result->local_state = function.init_local(context, input, gstate.global_state.get());
39 }
40 if (!projected_input.empty()) {
41 result->input_chunk.Initialize(context&: context.client, types: children[0]->types);
42 }
43 return std::move(result);
44}
45
46unique_ptr<GlobalOperatorState> PhysicalTableInOutFunction::GetGlobalOperatorState(ClientContext &context) const {
47 auto result = make_uniq<TableInOutGlobalState>();
48 if (function.init_global) {
49 TableFunctionInitInput input(bind_data.get(), column_ids, vector<idx_t>(), nullptr);
50 result->global_state = function.init_global(context, input);
51 }
52 return std::move(result);
53}
54
55OperatorResultType PhysicalTableInOutFunction::Execute(ExecutionContext &context, DataChunk &input, DataChunk &chunk,
56 GlobalOperatorState &gstate_p, OperatorState &state_p) const {
57 auto &gstate = gstate_p.Cast<TableInOutGlobalState>();
58 auto &state = state_p.Cast<TableInOutLocalState>();
59 TableFunctionInput data(bind_data.get(), state.local_state.get(), gstate.global_state.get());
60 if (projected_input.empty()) {
61 // straightforward case - no need to project input
62 return function.in_out_function(context, data, input, chunk);
63 }
64 // when project_input is set we execute the input function row-by-row
65 if (state.new_row) {
66 if (state.row_index >= input.size()) {
67 // finished processing this chunk
68 state.new_row = true;
69 state.row_index = 0;
70 return OperatorResultType::NEED_MORE_INPUT;
71 }
72 // we are processing a new row: fetch the data for the current row
73 state.input_chunk.Reset();
74 D_ASSERT(input.ColumnCount() == state.input_chunk.ColumnCount());
75 // set up the input data to the table in-out function
76 for (idx_t col_idx = 0; col_idx < input.ColumnCount(); col_idx++) {
77 ConstantVector::Reference(vector&: state.input_chunk.data[col_idx], source&: input.data[col_idx], position: state.row_index, count: 1);
78 }
79 state.input_chunk.SetCardinality(1);
80 state.row_index++;
81 state.new_row = false;
82 }
83 // set up the output data in "chunk"
84 D_ASSERT(chunk.ColumnCount() > projected_input.size());
85 D_ASSERT(state.row_index > 0);
86 idx_t base_idx = chunk.ColumnCount() - projected_input.size();
87 for (idx_t project_idx = 0; project_idx < projected_input.size(); project_idx++) {
88 auto source_idx = projected_input[project_idx];
89 auto target_idx = base_idx + project_idx;
90 ConstantVector::Reference(vector&: chunk.data[target_idx], source&: input.data[source_idx], position: state.row_index - 1, count: 1);
91 }
92 auto result = function.in_out_function(context, data, state.input_chunk, chunk);
93 if (result == OperatorResultType::FINISHED) {
94 return result;
95 }
96 if (result == OperatorResultType::NEED_MORE_INPUT) {
97 // we finished processing this row: move to the next row
98 state.new_row = true;
99 }
100 return OperatorResultType::HAVE_MORE_OUTPUT;
101}
102
103OperatorFinalizeResultType PhysicalTableInOutFunction::FinalExecute(ExecutionContext &context, DataChunk &chunk,
104 GlobalOperatorState &gstate_p,
105 OperatorState &state_p) const {
106 auto &gstate = gstate_p.Cast<TableInOutGlobalState>();
107 auto &state = state_p.Cast<TableInOutLocalState>();
108 if (!projected_input.empty()) {
109 throw InternalException("FinalExecute not supported for project_input");
110 }
111 TableFunctionInput data(bind_data.get(), state.local_state.get(), gstate.global_state.get());
112 return function.in_out_function_final(context, data, chunk);
113}
114
115} // namespace duckdb
116