| 1 | #include "duckdb/parallel/interrupt.hpp" |
|---|---|
| 2 | #include "duckdb/execution/executor.hpp" |
| 3 | #include "duckdb/main/client_context.hpp" |
| 4 | #include "duckdb/common/atomic.hpp" |
| 5 | #include "duckdb/common/mutex.hpp" |
| 6 | #include <condition_variable> |
| 7 | |
| 8 | namespace duckdb { |
| 9 | |
| 10 | InterruptState::InterruptState() : mode(InterruptMode::NO_INTERRUPTS) { |
| 11 | } |
| 12 | InterruptState::InterruptState(weak_ptr<Task> task) : mode(InterruptMode::TASK), current_task(std::move(task)) { |
| 13 | } |
| 14 | InterruptState::InterruptState(weak_ptr<InterruptDoneSignalState> signal_state_p) |
| 15 | : mode(InterruptMode::BLOCKING), signal_state(std::move(signal_state_p)) { |
| 16 | } |
| 17 | |
| 18 | void InterruptState::Callback() const { |
| 19 | if (mode == InterruptMode::TASK) { |
| 20 | auto task = current_task.lock(); |
| 21 | |
| 22 | if (!task) { |
| 23 | return; |
| 24 | } |
| 25 | |
| 26 | task->Reschedule(); |
| 27 | } else if (mode == InterruptMode::BLOCKING) { |
| 28 | auto signal_state_l = signal_state.lock(); |
| 29 | |
| 30 | if (!signal_state_l) { |
| 31 | return; |
| 32 | } |
| 33 | |
| 34 | // Signal the caller, who is currently blocked |
| 35 | signal_state_l->Signal(); |
| 36 | } else { |
| 37 | throw InternalException("Callback made on InterruptState without valid interrupt mode specified"); |
| 38 | } |
| 39 | } |
| 40 | |
| 41 | void InterruptDoneSignalState::Signal() { |
| 42 | { |
| 43 | unique_lock<mutex> lck {lock}; |
| 44 | done = true; |
| 45 | } |
| 46 | cv.notify_all(); |
| 47 | } |
| 48 | |
| 49 | void InterruptDoneSignalState::Await() { |
| 50 | std::unique_lock<std::mutex> lck(lock); |
| 51 | cv.wait(lock&: lck, p: [&]() { return done; }); |
| 52 | |
| 53 | // Reset after signal received |
| 54 | done = false; |
| 55 | } |
| 56 | |
| 57 | } // namespace duckdb |
| 58 |