1 | #include "duckdb/parallel/interrupt.hpp" |
---|---|
2 | #include "duckdb/execution/executor.hpp" |
3 | #include "duckdb/main/client_context.hpp" |
4 | #include "duckdb/common/atomic.hpp" |
5 | #include "duckdb/common/mutex.hpp" |
6 | #include <condition_variable> |
7 | |
8 | namespace duckdb { |
9 | |
10 | InterruptState::InterruptState() : mode(InterruptMode::NO_INTERRUPTS) { |
11 | } |
12 | InterruptState::InterruptState(weak_ptr<Task> task) : mode(InterruptMode::TASK), current_task(std::move(task)) { |
13 | } |
14 | InterruptState::InterruptState(weak_ptr<InterruptDoneSignalState> signal_state_p) |
15 | : mode(InterruptMode::BLOCKING), signal_state(std::move(signal_state_p)) { |
16 | } |
17 | |
18 | void InterruptState::Callback() const { |
19 | if (mode == InterruptMode::TASK) { |
20 | auto task = current_task.lock(); |
21 | |
22 | if (!task) { |
23 | return; |
24 | } |
25 | |
26 | task->Reschedule(); |
27 | } else if (mode == InterruptMode::BLOCKING) { |
28 | auto signal_state_l = signal_state.lock(); |
29 | |
30 | if (!signal_state_l) { |
31 | return; |
32 | } |
33 | |
34 | // Signal the caller, who is currently blocked |
35 | signal_state_l->Signal(); |
36 | } else { |
37 | throw InternalException("Callback made on InterruptState without valid interrupt mode specified"); |
38 | } |
39 | } |
40 | |
41 | void InterruptDoneSignalState::Signal() { |
42 | { |
43 | unique_lock<mutex> lck {lock}; |
44 | done = true; |
45 | } |
46 | cv.notify_all(); |
47 | } |
48 | |
49 | void InterruptDoneSignalState::Await() { |
50 | std::unique_lock<std::mutex> lck(lock); |
51 | cv.wait(lock&: lck, p: [&]() { return done; }); |
52 | |
53 | // Reset after signal received |
54 | done = false; |
55 | } |
56 | |
57 | } // namespace duckdb |
58 |