#include "duckdb/execution/operator/join/physical_index_join.hpp"

#include "duckdb/catalog/catalog_entry/duck_table_entry.hpp"
#include "duckdb/common/vector_operations/vector_operations.hpp"
#include "duckdb/execution/expression_executor.hpp"
#include "duckdb/execution/index/art/art.hpp"
#include "duckdb/execution/index/art/art_key.hpp"
#include "duckdb/execution/operator/scan/physical_table_scan.hpp"
#include "duckdb/function/table/table_scan.hpp"
#include "duckdb/parallel/meta_pipeline.hpp"
#include "duckdb/parallel/thread_context.hpp"
#include "duckdb/storage/buffer_manager.hpp"
#include "duckdb/storage/storage_manager.hpp"
#include "duckdb/storage/table/append_state.hpp"
#include "duckdb/storage/table/scan_state.hpp"
#include "duckdb/transaction/duck_transaction.hpp"
17
18namespace duckdb {
19
20class IndexJoinOperatorState : public CachingOperatorState {
21public:
22 IndexJoinOperatorState(ClientContext &context, const PhysicalIndexJoin &op)
23 : probe_executor(context), arena_allocator(BufferAllocator::Get(context)), keys(STANDARD_VECTOR_SIZE) {
24 auto &allocator = Allocator::Get(context);
25 rhs_rows.resize(STANDARD_VECTOR_SIZE);
26 result_sizes.resize(STANDARD_VECTOR_SIZE);
27
28 join_keys.Initialize(allocator, types: op.condition_types);
29 for (auto &cond : op.conditions) {
30 probe_executor.AddExpression(expr: *cond.left);
31 }
32 if (!op.fetch_types.empty()) {
33 rhs_chunk.Initialize(allocator, types: op.fetch_types);
34 }
35 rhs_sel.Initialize(STANDARD_VECTOR_SIZE);
36 }
37
38 bool first_fetch = true;
39 idx_t lhs_idx = 0;
40 idx_t rhs_idx = 0;
41 idx_t result_size = 0;
42 vector<idx_t> result_sizes;
43 DataChunk join_keys;
44 DataChunk rhs_chunk;
45 SelectionVector rhs_sel;
46
47 //! Vector of rows that mush be fetched for every LHS key
48 vector<vector<row_t>> rhs_rows;
49 ExpressionExecutor probe_executor;
50
51 ArenaAllocator arena_allocator;
52 vector<ARTKey> keys;
53 unique_ptr<ColumnFetchState> fetch_state;
54
55public:
56 void Finalize(const PhysicalOperator &op, ExecutionContext &context) override {
57 context.thread.profiler.Flush(phys_op: op, expression_executor&: probe_executor, name: "probe_executor", id: 0);
58 }
59};
60
61PhysicalIndexJoin::PhysicalIndexJoin(LogicalOperator &op, unique_ptr<PhysicalOperator> left,
62 unique_ptr<PhysicalOperator> right, vector<JoinCondition> cond, JoinType join_type,
63 const vector<idx_t> &left_projection_map_p, vector<idx_t> right_projection_map_p,
64 vector<column_t> column_ids_p, Index &index_p, bool lhs_first,
65 idx_t estimated_cardinality)
66 : CachingPhysicalOperator(PhysicalOperatorType::INDEX_JOIN, std::move(op.types), estimated_cardinality),
67 left_projection_map(left_projection_map_p), right_projection_map(std::move(right_projection_map_p)),
68 index(index_p), conditions(std::move(cond)), join_type(join_type), lhs_first(lhs_first) {
69 D_ASSERT(right->type == PhysicalOperatorType::TABLE_SCAN);
70 auto &tbl_scan = right->Cast<PhysicalTableScan>();
71 column_ids = std::move(column_ids_p);
72 children.push_back(x: std::move(left));
73 children.push_back(x: std::move(right));
74 for (auto &condition : conditions) {
75 condition_types.push_back(x: condition.left->return_type);
76 }
77 //! Only add to fetch_ids columns that are not indexed
78 for (auto &index_id : index.column_ids) {
79 index_ids.insert(x: index_id);
80 }
81
82 for (idx_t i = 0; i < column_ids.size(); i++) {
83 auto column_id = column_ids[i];
84 auto it = index_ids.find(x: column_id);
85 if (it == index_ids.end()) {
86 fetch_ids.push_back(x: column_id);
87 if (column_id == COLUMN_IDENTIFIER_ROW_ID) {
88 fetch_types.emplace_back(args: LogicalType::ROW_TYPE);
89 } else {
90 fetch_types.push_back(x: tbl_scan.returned_types[column_id]);
91 }
92 }
93 }
94 if (right_projection_map.empty()) {
95 for (column_t i = 0; i < column_ids.size(); i++) {
96 right_projection_map.push_back(x: i);
97 }
98 }
99 if (left_projection_map.empty()) {
100 for (column_t i = 0; i < children[0]->types.size(); i++) {
101 left_projection_map.push_back(x: i);
102 }
103 }
104}
105
106unique_ptr<OperatorState> PhysicalIndexJoin::GetOperatorState(ExecutionContext &context) const {
107 return make_uniq<IndexJoinOperatorState>(args&: context.client, args: *this);
108}
109
110void PhysicalIndexJoin::Output(ExecutionContext &context, DataChunk &input, DataChunk &chunk,
111 OperatorState &state_p) const {
112 auto &phy_tbl_scan = children[1]->Cast<PhysicalTableScan>();
113 auto &bind_tbl = phy_tbl_scan.bind_data->Cast<TableScanBindData>();
114 auto &transaction = DuckTransaction::Get(context&: context.client, catalog&: bind_tbl.table.catalog);
115 auto &state = state_p.Cast<IndexJoinOperatorState>();
116
117 auto &tbl = bind_tbl.table.GetStorage();
118 idx_t output_sel_idx = 0;
119 vector<row_t> fetch_rows;
120
121 while (output_sel_idx < STANDARD_VECTOR_SIZE && state.lhs_idx < input.size()) {
122 if (state.rhs_idx < state.result_sizes[state.lhs_idx]) {
123 state.rhs_sel.set_index(idx: output_sel_idx++, loc: state.lhs_idx);
124 if (!fetch_types.empty()) {
125 //! We need to collect the rows we want to fetch
126 fetch_rows.push_back(x: state.rhs_rows[state.lhs_idx][state.rhs_idx]);
127 }
128 state.rhs_idx++;
129 } else {
130 //! We are done with the matches from this LHS Key
131 state.rhs_idx = 0;
132 state.lhs_idx++;
133 }
134 }
135 //! Now we fetch the RHS data
136 if (!fetch_types.empty()) {
137 if (fetch_rows.empty()) {
138 return;
139 }
140 state.rhs_chunk.Reset();
141 state.fetch_state = make_uniq<ColumnFetchState>();
142 Vector row_ids(LogicalType::ROW_TYPE, data_ptr_cast(src: &fetch_rows[0]));
143 tbl.Fetch(transaction, result&: state.rhs_chunk, column_ids: fetch_ids, row_ids, fetch_count: output_sel_idx, state&: *state.fetch_state);
144 }
145
146 //! Now we actually produce our result chunk
147 idx_t left_offset = lhs_first ? 0 : right_projection_map.size();
148 idx_t right_offset = lhs_first ? left_projection_map.size() : 0;
149 idx_t rhs_column_idx = 0;
150 for (idx_t i = 0; i < right_projection_map.size(); i++) {
151 auto it = index_ids.find(x: column_ids[right_projection_map[i]]);
152 if (it == index_ids.end()) {
153 chunk.data[right_offset + i].Reference(other&: state.rhs_chunk.data[rhs_column_idx++]);
154 } else {
155 chunk.data[right_offset + i].Slice(other&: state.join_keys.data[0], sel: state.rhs_sel, count: output_sel_idx);
156 }
157 }
158 for (idx_t i = 0; i < left_projection_map.size(); i++) {
159 chunk.data[left_offset + i].Slice(other&: input.data[left_projection_map[i]], sel: state.rhs_sel, count: output_sel_idx);
160 }
161
162 state.result_size = output_sel_idx;
163 chunk.SetCardinality(state.result_size);
164}
165
166void PhysicalIndexJoin::GetRHSMatches(ExecutionContext &context, DataChunk &input, OperatorState &state_p) const {
167
168 auto &state = state_p.Cast<IndexJoinOperatorState>();
169 auto &art = index.Cast<ART>();
170
171 // generate the keys for this chunk
172 state.arena_allocator.Reset();
173 ART::GenerateKeys(allocator&: state.arena_allocator, input&: state.join_keys, keys&: state.keys);
174
175 for (idx_t i = 0; i < input.size(); i++) {
176 state.rhs_rows[i].clear();
177 if (!state.keys[i].Empty()) {
178 if (fetch_types.empty()) {
179 IndexLock lock;
180 index.InitializeLock(state&: lock);
181 art.SearchEqualJoinNoFetch(key&: state.keys[i], result_size&: state.result_sizes[i]);
182 } else {
183 IndexLock lock;
184 index.InitializeLock(state&: lock);
185 art.SearchEqual(key&: state.keys[i], max_count: (idx_t)-1, result_ids&: state.rhs_rows[i]);
186 state.result_sizes[i] = state.rhs_rows[i].size();
187 }
188 } else {
189 //! This is null so no matches
190 state.result_sizes[i] = 0;
191 }
192 }
193 for (idx_t i = input.size(); i < STANDARD_VECTOR_SIZE; i++) {
194 //! No LHS chunk value so result size is empty
195 state.result_sizes[i] = 0;
196 }
197}
198
199OperatorResultType PhysicalIndexJoin::ExecuteInternal(ExecutionContext &context, DataChunk &input, DataChunk &chunk,
200 GlobalOperatorState &gstate, OperatorState &state_p) const {
201 auto &state = state_p.Cast<IndexJoinOperatorState>();
202
203 state.result_size = 0;
204 if (state.first_fetch) {
205 state.probe_executor.Execute(input, result&: state.join_keys);
206
207 //! Fill Matches for the current LHS chunk
208 GetRHSMatches(context, input, state_p);
209 state.first_fetch = false;
210 }
211 //! Check if we need to get a new LHS chunk
212 if (state.lhs_idx >= input.size()) {
213 state.lhs_idx = 0;
214 state.rhs_idx = 0;
215 state.first_fetch = true;
216 // reset the LHS chunk to reset the validity masks
217 state.join_keys.Reset();
218 return OperatorResultType::NEED_MORE_INPUT;
219 }
220 //! Output vectors
221 if (state.lhs_idx < input.size()) {
222 Output(context, input, chunk, state_p);
223 }
224 return OperatorResultType::HAVE_MORE_OUTPUT;
225}
226
227//===--------------------------------------------------------------------===//
228// Pipeline Construction
229//===--------------------------------------------------------------------===//
230void PhysicalIndexJoin::BuildPipelines(Pipeline &current, MetaPipeline &meta_pipeline) {
231 // index join: we only continue into the LHS
232 // the right side is probed by the index join
233 // so we don't need to do anything in the pipeline with this child
234 meta_pipeline.GetState().AddPipelineOperator(pipeline&: current, op&: *this);
235 children[0]->BuildPipelines(current, meta_pipeline);
236}
237
238vector<const_reference<PhysicalOperator>> PhysicalIndexJoin::GetSources() const {
239 return children[0]->GetSources();
240}
241
242} // namespace duckdb
243