| 1 | #include "duckdb/execution/operator/join/physical_positional_join.hpp" |
| 2 | |
| 3 | #include "duckdb/common/vector_operations/vector_operations.hpp" |
| 4 | #include "duckdb/execution/operator/join/physical_join.hpp" |
| 5 | |
| 6 | namespace duckdb { |
| 7 | |
| 8 | PhysicalPositionalJoin::PhysicalPositionalJoin(vector<LogicalType> types, unique_ptr<PhysicalOperator> left, |
| 9 | unique_ptr<PhysicalOperator> right, idx_t estimated_cardinality) |
| 10 | : PhysicalOperator(PhysicalOperatorType::POSITIONAL_JOIN, std::move(types), estimated_cardinality) { |
| 11 | children.push_back(x: std::move(left)); |
| 12 | children.push_back(x: std::move(right)); |
| 13 | } |
| 14 | |
//===--------------------------------------------------------------------===//
// Sink
//===--------------------------------------------------------------------===//
| 18 | class PositionalJoinGlobalState : public GlobalSinkState { |
| 19 | public: |
| 20 | explicit PositionalJoinGlobalState(ClientContext &context, const PhysicalPositionalJoin &op) |
| 21 | : rhs(context, op.children[1]->GetTypes()), initialized(false), source_offset(0), exhausted(false) { |
| 22 | rhs.InitializeAppend(state&: append_state); |
| 23 | } |
| 24 | |
| 25 | ColumnDataCollection rhs; |
| 26 | ColumnDataAppendState append_state; |
| 27 | mutex rhs_lock; |
| 28 | |
| 29 | bool initialized; |
| 30 | ColumnDataScanState scan_state; |
| 31 | DataChunk source; |
| 32 | idx_t source_offset; |
| 33 | bool exhausted; |
| 34 | |
| 35 | void InitializeScan(); |
| 36 | idx_t Refill(); |
| 37 | idx_t CopyData(DataChunk &output, const idx_t count, const idx_t col_offset); |
| 38 | void Execute(DataChunk &input, DataChunk &output); |
| 39 | void GetData(DataChunk &output); |
| 40 | }; |
| 41 | |
| 42 | unique_ptr<GlobalSinkState> PhysicalPositionalJoin::GetGlobalSinkState(ClientContext &context) const { |
| 43 | return make_uniq<PositionalJoinGlobalState>(args&: context, args: *this); |
| 44 | } |
| 45 | |
| 46 | SinkResultType PhysicalPositionalJoin::Sink(ExecutionContext &context, DataChunk &chunk, |
| 47 | OperatorSinkInput &input) const { |
| 48 | auto &sink = input.global_state.Cast<PositionalJoinGlobalState>(); |
| 49 | lock_guard<mutex> client_guard(sink.rhs_lock); |
| 50 | sink.rhs.Append(state&: sink.append_state, new_chunk&: chunk); |
| 51 | return SinkResultType::NEED_MORE_INPUT; |
| 52 | } |
| 53 | |
//===--------------------------------------------------------------------===//
// Operator
//===--------------------------------------------------------------------===//
| 57 | void PositionalJoinGlobalState::InitializeScan() { |
| 58 | if (!initialized) { |
| 59 | // not initialized yet: initialize the scan |
| 60 | initialized = true; |
| 61 | rhs.InitializeScanChunk(chunk&: source); |
| 62 | rhs.InitializeScan(state&: scan_state); |
| 63 | } |
| 64 | } |
| 65 | |
| 66 | idx_t PositionalJoinGlobalState::Refill() { |
| 67 | if (source_offset >= source.size()) { |
| 68 | if (!exhausted) { |
| 69 | source.Reset(); |
| 70 | rhs.Scan(state&: scan_state, result&: source); |
| 71 | } |
| 72 | source_offset = 0; |
| 73 | } |
| 74 | |
| 75 | const auto available = source.size() - source_offset; |
| 76 | if (!available) { |
| 77 | if (!exhausted) { |
| 78 | source.Reset(); |
| 79 | for (idx_t i = 0; i < source.ColumnCount(); ++i) { |
| 80 | auto &vec = source.data[i]; |
| 81 | vec.SetVectorType(VectorType::CONSTANT_VECTOR); |
| 82 | ConstantVector::SetNull(vector&: vec, is_null: true); |
| 83 | } |
| 84 | exhausted = true; |
| 85 | } |
| 86 | } |
| 87 | |
| 88 | return available; |
| 89 | } |
| 90 | |
| 91 | idx_t PositionalJoinGlobalState::CopyData(DataChunk &output, const idx_t count, const idx_t col_offset) { |
| 92 | if (!source_offset && (source.size() >= count || exhausted)) { |
| 93 | // Fast track: aligned and has enough data |
| 94 | for (idx_t i = 0; i < source.ColumnCount(); ++i) { |
| 95 | output.data[col_offset + i].Reference(other&: source.data[i]); |
| 96 | } |
| 97 | source_offset += count; |
| 98 | } else { |
| 99 | // Copy data |
| 100 | for (idx_t target_offset = 0; target_offset < count;) { |
| 101 | const auto needed = count - target_offset; |
| 102 | const auto available = exhausted ? needed : (source.size() - source_offset); |
| 103 | const auto copy_size = MinValue(a: needed, b: available); |
| 104 | const auto source_count = source_offset + copy_size; |
| 105 | for (idx_t i = 0; i < source.ColumnCount(); ++i) { |
| 106 | VectorOperations::Copy(source: source.data[i], target&: output.data[col_offset + i], source_count, source_offset, |
| 107 | target_offset); |
| 108 | } |
| 109 | target_offset += copy_size; |
| 110 | source_offset += copy_size; |
| 111 | Refill(); |
| 112 | } |
| 113 | } |
| 114 | |
| 115 | return source.ColumnCount(); |
| 116 | } |
| 117 | |
| 118 | void PositionalJoinGlobalState::Execute(DataChunk &input, DataChunk &output) { |
| 119 | lock_guard<mutex> client_guard(rhs_lock); |
| 120 | |
| 121 | // Reference the input and assume it will be full |
| 122 | const auto col_offset = input.ColumnCount(); |
| 123 | for (idx_t i = 0; i < col_offset; ++i) { |
| 124 | output.data[i].Reference(other&: input.data[i]); |
| 125 | } |
| 126 | |
| 127 | // Copy or reference the RHS columns |
| 128 | const auto count = input.size(); |
| 129 | InitializeScan(); |
| 130 | Refill(); |
| 131 | CopyData(output, count, col_offset); |
| 132 | |
| 133 | output.SetCardinality(count); |
| 134 | } |
| 135 | |
| 136 | OperatorResultType PhysicalPositionalJoin::Execute(ExecutionContext &context, DataChunk &input, DataChunk &chunk, |
| 137 | GlobalOperatorState &gstate, OperatorState &state_p) const { |
| 138 | auto &sink = sink_state->Cast<PositionalJoinGlobalState>(); |
| 139 | sink.Execute(input, output&: chunk); |
| 140 | return OperatorResultType::NEED_MORE_INPUT; |
| 141 | } |
| 142 | |
//===--------------------------------------------------------------------===//
// Source
//===--------------------------------------------------------------------===//
| 146 | void PositionalJoinGlobalState::GetData(DataChunk &output) { |
| 147 | lock_guard<mutex> client_guard(rhs_lock); |
| 148 | |
| 149 | InitializeScan(); |
| 150 | Refill(); |
| 151 | |
| 152 | // LHS exhausted |
| 153 | if (exhausted) { |
| 154 | // RHS exhausted too, so we are done |
| 155 | output.SetCardinality(0); |
| 156 | return; |
| 157 | } |
| 158 | |
| 159 | // LHS is all NULL |
| 160 | const auto col_offset = output.ColumnCount() - source.ColumnCount(); |
| 161 | for (idx_t i = 0; i < col_offset; ++i) { |
| 162 | auto &vec = output.data[i]; |
| 163 | vec.SetVectorType(VectorType::CONSTANT_VECTOR); |
| 164 | ConstantVector::SetNull(vector&: vec, is_null: true); |
| 165 | } |
| 166 | |
| 167 | // RHS still has data, so copy it |
| 168 | const auto count = MinValue<idx_t>(STANDARD_VECTOR_SIZE, b: source.size() - source_offset); |
| 169 | CopyData(output, count, col_offset); |
| 170 | output.SetCardinality(count); |
| 171 | } |
| 172 | |
| 173 | SourceResultType PhysicalPositionalJoin::GetData(ExecutionContext &context, DataChunk &result, |
| 174 | OperatorSourceInput &input) const { |
| 175 | auto &sink = sink_state->Cast<PositionalJoinGlobalState>(); |
| 176 | sink.GetData(output&: result); |
| 177 | |
| 178 | return result.size() == 0 ? SourceResultType::FINISHED : SourceResultType::HAVE_MORE_OUTPUT; |
| 179 | } |
| 180 | |
//===--------------------------------------------------------------------===//
// Pipeline Construction
//===--------------------------------------------------------------------===//
| 184 | void PhysicalPositionalJoin::BuildPipelines(Pipeline ¤t, MetaPipeline &meta_pipeline) { |
| 185 | PhysicalJoin::BuildJoinPipelines(current, meta_pipeline, op&: *this); |
| 186 | } |
| 187 | |
| 188 | vector<const_reference<PhysicalOperator>> PhysicalPositionalJoin::GetSources() const { |
| 189 | auto result = children[0]->GetSources(); |
| 190 | if (IsSource()) { |
| 191 | result.push_back(x: *this); |
| 192 | } |
| 193 | return result; |
| 194 | } |
| 195 | |
| 196 | } // namespace duckdb |
| 197 | |