| 1 | #include "duckdb/execution/operator/join/physical_delim_join.hpp" |
| 2 | |
| 3 | #include "duckdb/common/vector_operations/vector_operations.hpp" |
| 4 | #include "duckdb/execution/operator/scan/physical_chunk_scan.hpp" |
| 5 | |
| 6 | using namespace duckdb; |
| 7 | using namespace std; |
| 8 | |
| 9 | class PhysicalDelimJoinState : public PhysicalOperatorState { |
| 10 | public: |
| 11 | PhysicalDelimJoinState(PhysicalOperator *left) : PhysicalOperatorState(left) { |
| 12 | } |
| 13 | |
| 14 | unique_ptr<PhysicalOperatorState> join_state; |
| 15 | }; |
| 16 | |
| 17 | PhysicalDelimJoin::PhysicalDelimJoin(LogicalOperator &op, unique_ptr<PhysicalOperator> original_join, |
| 18 | vector<PhysicalOperator *> delim_scans) |
| 19 | : PhysicalOperator(PhysicalOperatorType::DELIM_JOIN, op.types), join(move(original_join)) { |
| 20 | assert(delim_scans.size() > 0); |
| 21 | assert(join->children.size() == 2); |
| 22 | // for any duplicate eliminated scans in the RHS, point them to the duplicate eliminated chunk that we create here |
| 23 | for (auto op : delim_scans) { |
| 24 | assert(op->type == PhysicalOperatorType::DELIM_SCAN); |
| 25 | auto scan = (PhysicalChunkScan *)op; |
| 26 | scan->collection = &delim_data; |
| 27 | } |
| 28 | // now for the original join |
| 29 | // we take its left child, this is the side that we will duplicate eliminate |
| 30 | children.push_back(move(join->children[0])); |
| 31 | // we replace it with a PhysicalChunkCollectionScan, that scans the ChunkCollection that we keep cached |
| 32 | auto cached_chunk_scan = make_unique<PhysicalChunkScan>(children[0]->GetTypes(), PhysicalOperatorType::CHUNK_SCAN); |
| 33 | cached_chunk_scan->collection = &lhs_data; |
| 34 | join->children[0] = move(cached_chunk_scan); |
| 35 | } |
| 36 | |
| 37 | void PhysicalDelimJoin::GetChunkInternal(ClientContext &context, DataChunk &chunk, PhysicalOperatorState *state_) { |
| 38 | auto state = reinterpret_cast<PhysicalDelimJoinState *>(state_); |
| 39 | assert(distinct); |
| 40 | if (!state->join_state) { |
| 41 | // first run: fully materialize the LHS |
| 42 | ChunkCollection &big_data = lhs_data; |
| 43 | do { |
| 44 | children[0]->GetChunk(context, state->child_chunk, state->child_state.get()); |
| 45 | big_data.Append(state->child_chunk); |
| 46 | } while (state->child_chunk.size() != 0); |
| 47 | // now create the duplicate eliminated chunk by pulling from the DISTINCT aggregate |
| 48 | DataChunk delim_chunk; |
| 49 | distinct->InitializeChunk(delim_chunk); |
| 50 | auto distinct_state = distinct->GetOperatorState(); |
| 51 | do { |
| 52 | delim_chunk.Reset(); |
| 53 | distinct->GetChunkInternal(context, delim_chunk, distinct_state.get()); |
| 54 | delim_data.Append(delim_chunk); |
| 55 | } while (delim_chunk.size() != 0); |
| 56 | // create the state of the underlying join |
| 57 | state->join_state = join->GetOperatorState(); |
| 58 | } |
| 59 | // now pull from the RHS from the underlying join |
| 60 | join->GetChunk(context, chunk, state->join_state.get()); |
| 61 | } |
| 62 | |
| 63 | unique_ptr<PhysicalOperatorState> PhysicalDelimJoin::GetOperatorState() { |
| 64 | return make_unique<PhysicalDelimJoinState>(children[0].get()); |
| 65 | } |
| 66 | |
| 67 | string PhysicalDelimJoin::() const { |
| 68 | return join->ExtraRenderInformation(); |
| 69 | } |
| 70 | |