1 | #include "duckdb/main/capi/capi_internal.hpp" |
---|---|
2 | #include "duckdb/parallel/task_scheduler.hpp" |
3 | |
4 | using duckdb::DatabaseData; |
5 | |
6 | struct CAPITaskState { |
7 | CAPITaskState(duckdb::DatabaseInstance &db) |
8 | : db(db), marker(duckdb::make_uniq<duckdb::atomic<bool>>(args: true)), execute_count(0) { |
9 | } |
10 | |
11 | duckdb::DatabaseInstance &db; |
12 | duckdb::unique_ptr<duckdb::atomic<bool>> marker; |
13 | duckdb::atomic<idx_t> execute_count; |
14 | }; |
15 | |
16 | void duckdb_execute_tasks(duckdb_database database, idx_t max_tasks) { |
17 | if (!database) { |
18 | return; |
19 | } |
20 | auto wrapper = (DatabaseData *)database; |
21 | auto &scheduler = duckdb::TaskScheduler::GetScheduler(db&: *wrapper->database->instance); |
22 | scheduler.ExecuteTasks(max_tasks); |
23 | } |
24 | |
25 | duckdb_task_state duckdb_create_task_state(duckdb_database database) { |
26 | if (!database) { |
27 | return nullptr; |
28 | } |
29 | auto wrapper = (DatabaseData *)database; |
30 | auto state = new CAPITaskState(*wrapper->database->instance); |
31 | return state; |
32 | } |
33 | |
34 | void duckdb_execute_tasks_state(duckdb_task_state state_p) { |
35 | if (!state_p) { |
36 | return; |
37 | } |
38 | auto state = (CAPITaskState *)state_p; |
39 | auto &scheduler = duckdb::TaskScheduler::GetScheduler(db&: state->db); |
40 | state->execute_count++; |
41 | scheduler.ExecuteForever(marker: state->marker.get()); |
42 | } |
43 | |
44 | idx_t duckdb_execute_n_tasks_state(duckdb_task_state state_p, idx_t max_tasks) { |
45 | if (!state_p) { |
46 | return 0; |
47 | } |
48 | auto state = (CAPITaskState *)state_p; |
49 | auto &scheduler = duckdb::TaskScheduler::GetScheduler(db&: state->db); |
50 | return scheduler.ExecuteTasks(marker: state->marker.get(), max_tasks); |
51 | } |
52 | |
53 | void duckdb_finish_execution(duckdb_task_state state_p) { |
54 | if (!state_p) { |
55 | return; |
56 | } |
57 | auto state = (CAPITaskState *)state_p; |
58 | *state->marker = false; |
59 | if (state->execute_count > 0) { |
60 | // signal to the threads to wake up |
61 | auto &scheduler = duckdb::TaskScheduler::GetScheduler(db&: state->db); |
62 | scheduler.Signal(n: state->execute_count); |
63 | } |
64 | } |
65 | |
66 | bool duckdb_task_state_is_finished(duckdb_task_state state_p) { |
67 | if (!state_p) { |
68 | return false; |
69 | } |
70 | auto state = (CAPITaskState *)state_p; |
71 | return !(*state->marker); |
72 | } |
73 | |
74 | void duckdb_destroy_task_state(duckdb_task_state state_p) { |
75 | if (!state_p) { |
76 | return; |
77 | } |
78 | auto state = (CAPITaskState *)state_p; |
79 | delete state; |
80 | } |
81 | |
82 | bool duckdb_execution_is_finished(duckdb_connection con) { |
83 | if (!con) { |
84 | return false; |
85 | } |
86 | duckdb::Connection *conn = (duckdb::Connection *)con; |
87 | return conn->context->ExecutionIsFinished(); |
88 | } |
89 |