1 | #include "duckdb/execution/operator/join/physical_delim_join.hpp" |
2 | |
3 | #include "duckdb/common/vector_operations/vector_operations.hpp" |
4 | #include "duckdb/execution/operator/scan/physical_chunk_scan.hpp" |
5 | |
6 | using namespace duckdb; |
7 | using namespace std; |
8 | |
//! Operator state for PhysicalDelimJoin. In addition to the base state (which
//! tracks the left child), it holds the state of the wrapped join operator,
//! which is lazily created on the first GetChunkInternal call.
class PhysicalDelimJoinState : public PhysicalOperatorState {
public:
	PhysicalDelimJoinState(PhysicalOperator *left) : PhysicalOperatorState(left) {
	}

	//! State of the underlying join; null until the LHS has been fully
	//! materialized and the duplicate-eliminated data has been produced
	unique_ptr<PhysicalOperatorState> join_state;
};
16 | |
17 | PhysicalDelimJoin::PhysicalDelimJoin(LogicalOperator &op, unique_ptr<PhysicalOperator> original_join, |
18 | vector<PhysicalOperator *> delim_scans) |
19 | : PhysicalOperator(PhysicalOperatorType::DELIM_JOIN, op.types), join(move(original_join)) { |
20 | assert(delim_scans.size() > 0); |
21 | assert(join->children.size() == 2); |
22 | // for any duplicate eliminated scans in the RHS, point them to the duplicate eliminated chunk that we create here |
23 | for (auto op : delim_scans) { |
24 | assert(op->type == PhysicalOperatorType::DELIM_SCAN); |
25 | auto scan = (PhysicalChunkScan *)op; |
26 | scan->collection = &delim_data; |
27 | } |
28 | // now for the original join |
29 | // we take its left child, this is the side that we will duplicate eliminate |
30 | children.push_back(move(join->children[0])); |
31 | // we replace it with a PhysicalChunkCollectionScan, that scans the ChunkCollection that we keep cached |
32 | auto cached_chunk_scan = make_unique<PhysicalChunkScan>(children[0]->GetTypes(), PhysicalOperatorType::CHUNK_SCAN); |
33 | cached_chunk_scan->collection = &lhs_data; |
34 | join->children[0] = move(cached_chunk_scan); |
35 | } |
36 | |
//! Produce the next output chunk of the delim join.
//! On the first call: (1) fully materialize the left child into lhs_data,
//! (2) drain the DISTINCT aggregate into delim_data so the RHS delim scans can
//! read the duplicate-eliminated values, (3) initialize the wrapped join.
//! Every call (including the first) then simply pulls from the wrapped join,
//! whose LHS was rewired in the constructor to scan lhs_data.
void PhysicalDelimJoin::GetChunkInternal(ClientContext &context, DataChunk &chunk, PhysicalOperatorState *state_) {
	auto state = reinterpret_cast<PhysicalDelimJoinState *>(state_);
	assert(distinct);
	// join_state doubles as the "initialized" flag: null means first run
	if (!state->join_state) {
		// first run: fully materialize the LHS
		ChunkCollection &big_data = lhs_data;
		do {
			children[0]->GetChunk(context, state->child_chunk, state->child_state.get());
			big_data.Append(state->child_chunk);
		} while (state->child_chunk.size() != 0);
		// now create the duplicate eliminated chunk by pulling from the DISTINCT aggregate
		// NOTE(review): the distinct aggregate presumably consumes the data appended to
		// lhs_data above — confirm against how `distinct` is wired up elsewhere
		DataChunk delim_chunk;
		distinct->InitializeChunk(delim_chunk);
		auto distinct_state = distinct->GetOperatorState();
		do {
			delim_chunk.Reset();
			distinct->GetChunkInternal(context, delim_chunk, distinct_state.get());
			delim_data.Append(delim_chunk);
		} while (delim_chunk.size() != 0);
		// create the state of the underlying join
		state->join_state = join->GetOperatorState();
	}
	// now pull from the RHS from the underlying join
	join->GetChunk(context, chunk, state->join_state.get());
}
62 | |
63 | unique_ptr<PhysicalOperatorState> PhysicalDelimJoin::GetOperatorState() { |
64 | return make_unique<PhysicalDelimJoinState>(children[0].get()); |
65 | } |
66 | |
67 | string PhysicalDelimJoin::() const { |
68 | return join->ExtraRenderInformation(); |
69 | } |
70 | |