1#include "duckdb/execution/operator/join/physical_delim_join.hpp"
2
3#include "duckdb/common/vector_operations/vector_operations.hpp"
4#include "duckdb/execution/operator/scan/physical_chunk_scan.hpp"
5
6using namespace duckdb;
7using namespace std;
8
9class PhysicalDelimJoinState : public PhysicalOperatorState {
10public:
11 PhysicalDelimJoinState(PhysicalOperator *left) : PhysicalOperatorState(left) {
12 }
13
14 unique_ptr<PhysicalOperatorState> join_state;
15};
16
17PhysicalDelimJoin::PhysicalDelimJoin(LogicalOperator &op, unique_ptr<PhysicalOperator> original_join,
18 vector<PhysicalOperator *> delim_scans)
19 : PhysicalOperator(PhysicalOperatorType::DELIM_JOIN, op.types), join(move(original_join)) {
20 assert(delim_scans.size() > 0);
21 assert(join->children.size() == 2);
22 // for any duplicate eliminated scans in the RHS, point them to the duplicate eliminated chunk that we create here
23 for (auto op : delim_scans) {
24 assert(op->type == PhysicalOperatorType::DELIM_SCAN);
25 auto scan = (PhysicalChunkScan *)op;
26 scan->collection = &delim_data;
27 }
28 // now for the original join
29 // we take its left child, this is the side that we will duplicate eliminate
30 children.push_back(move(join->children[0]));
31 // we replace it with a PhysicalChunkCollectionScan, that scans the ChunkCollection that we keep cached
32 auto cached_chunk_scan = make_unique<PhysicalChunkScan>(children[0]->GetTypes(), PhysicalOperatorType::CHUNK_SCAN);
33 cached_chunk_scan->collection = &lhs_data;
34 join->children[0] = move(cached_chunk_scan);
35}
36
37void PhysicalDelimJoin::GetChunkInternal(ClientContext &context, DataChunk &chunk, PhysicalOperatorState *state_) {
38 auto state = reinterpret_cast<PhysicalDelimJoinState *>(state_);
39 assert(distinct);
40 if (!state->join_state) {
41 // first run: fully materialize the LHS
42 ChunkCollection &big_data = lhs_data;
43 do {
44 children[0]->GetChunk(context, state->child_chunk, state->child_state.get());
45 big_data.Append(state->child_chunk);
46 } while (state->child_chunk.size() != 0);
47 // now create the duplicate eliminated chunk by pulling from the DISTINCT aggregate
48 DataChunk delim_chunk;
49 distinct->InitializeChunk(delim_chunk);
50 auto distinct_state = distinct->GetOperatorState();
51 do {
52 delim_chunk.Reset();
53 distinct->GetChunkInternal(context, delim_chunk, distinct_state.get());
54 delim_data.Append(delim_chunk);
55 } while (delim_chunk.size() != 0);
56 // create the state of the underlying join
57 state->join_state = join->GetOperatorState();
58 }
59 // now pull from the RHS from the underlying join
60 join->GetChunk(context, chunk, state->join_state.get());
61}
62
63unique_ptr<PhysicalOperatorState> PhysicalDelimJoin::GetOperatorState() {
64 return make_unique<PhysicalDelimJoinState>(children[0].get());
65}
66
67string PhysicalDelimJoin::ExtraRenderInformation() const {
68 return join->ExtraRenderInformation();
69}
70