1#include "duckdb/parallel/interrupt.hpp"
2#include "duckdb/execution/executor.hpp"
3#include "duckdb/main/client_context.hpp"
4#include "duckdb/common/atomic.hpp"
5#include "duckdb/common/mutex.hpp"
6#include <condition_variable>
7
8namespace duckdb {
9
10InterruptState::InterruptState() : mode(InterruptMode::NO_INTERRUPTS) {
11}
12InterruptState::InterruptState(weak_ptr<Task> task) : mode(InterruptMode::TASK), current_task(std::move(task)) {
13}
14InterruptState::InterruptState(weak_ptr<InterruptDoneSignalState> signal_state_p)
15 : mode(InterruptMode::BLOCKING), signal_state(std::move(signal_state_p)) {
16}
17
18void InterruptState::Callback() const {
19 if (mode == InterruptMode::TASK) {
20 auto task = current_task.lock();
21
22 if (!task) {
23 return;
24 }
25
26 task->Reschedule();
27 } else if (mode == InterruptMode::BLOCKING) {
28 auto signal_state_l = signal_state.lock();
29
30 if (!signal_state_l) {
31 return;
32 }
33
34 // Signal the caller, who is currently blocked
35 signal_state_l->Signal();
36 } else {
37 throw InternalException("Callback made on InterruptState without valid interrupt mode specified");
38 }
39}
40
41void InterruptDoneSignalState::Signal() {
42 {
43 unique_lock<mutex> lck {lock};
44 done = true;
45 }
46 cv.notify_all();
47}
48
49void InterruptDoneSignalState::Await() {
50 std::unique_lock<std::mutex> lck(lock);
51 cv.wait(lock&: lck, p: [&]() { return done; });
52
53 // Reset after signal received
54 done = false;
55}
56
57} // namespace duckdb
58