1/*
2 * This file is part of the MicroPython project, http://micropython.org/
3 *
4 * The MIT License (MIT)
5 *
6 * Copyright (c) 2020 Damien P. George
7 *
8 * Permission is hereby granted, free of charge, to any person obtaining a copy
9 * of this software and associated documentation files (the "Software"), to deal
10 * in the Software without restriction, including without limitation the rights
11 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
12 * copies of the Software, and to permit persons to whom the Software is
13 * furnished to do so, subject to the following conditions:
14 *
15 * The above copyright notice and this permission notice shall be included in
16 * all copies or substantial portions of the Software.
17 *
18 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
19 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
20 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
21 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
22 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
23 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
24 * THE SOFTWARE.
25 */
26
27#include "py/runtime.h"
28#include "py/smallint.h"
29#include "py/pairheap.h"
30#include "py/mphal.h"
31
32#if MICROPY_PY_UASYNCIO
33
34typedef struct _mp_obj_task_t {
35 mp_pairheap_t pairheap;
36 mp_obj_t coro;
37 mp_obj_t data;
38 mp_obj_t waiting;
39
40 mp_obj_t ph_key;
41} mp_obj_task_t;
42
43typedef struct _mp_obj_task_queue_t {
44 mp_obj_base_t base;
45 mp_obj_task_t *heap;
46} mp_obj_task_queue_t;
47
48STATIC const mp_obj_type_t task_queue_type;
49STATIC const mp_obj_type_t task_type;
50
51STATIC mp_obj_t task_queue_make_new(const mp_obj_type_t *type, size_t n_args, size_t n_kw, const mp_obj_t *args);
52
53/******************************************************************************/
54// Ticks for task ordering in pairing heap
55
56STATIC mp_obj_t ticks(void) {
57 return MP_OBJ_NEW_SMALL_INT(mp_hal_ticks_ms() & (MICROPY_PY_UTIME_TICKS_PERIOD - 1));
58}
59
60STATIC mp_int_t ticks_diff(mp_obj_t t1_in, mp_obj_t t0_in) {
61 mp_uint_t t0 = MP_OBJ_SMALL_INT_VALUE(t0_in);
62 mp_uint_t t1 = MP_OBJ_SMALL_INT_VALUE(t1_in);
63 mp_int_t diff = ((t1 - t0 + MICROPY_PY_UTIME_TICKS_PERIOD / 2) & (MICROPY_PY_UTIME_TICKS_PERIOD - 1))
64 - MICROPY_PY_UTIME_TICKS_PERIOD / 2;
65 return diff;
66}
67
68STATIC int task_lt(mp_pairheap_t *n1, mp_pairheap_t *n2) {
69 mp_obj_task_t *t1 = (mp_obj_task_t *)n1;
70 mp_obj_task_t *t2 = (mp_obj_task_t *)n2;
71 return MP_OBJ_SMALL_INT_VALUE(ticks_diff(t1->ph_key, t2->ph_key)) < 0;
72}
73
74/******************************************************************************/
75// TaskQueue class
76
77STATIC mp_obj_t task_queue_make_new(const mp_obj_type_t *type, size_t n_args, size_t n_kw, const mp_obj_t *args) {
78 (void)args;
79 mp_arg_check_num(n_args, n_kw, 0, 0, false);
80 mp_obj_task_queue_t *self = m_new_obj(mp_obj_task_queue_t);
81 self->base.type = type;
82 self->heap = (mp_obj_task_t *)mp_pairheap_new(task_lt);
83 return MP_OBJ_FROM_PTR(self);
84}
85
86STATIC mp_obj_t task_queue_peek(mp_obj_t self_in) {
87 mp_obj_task_queue_t *self = MP_OBJ_TO_PTR(self_in);
88 if (self->heap == NULL) {
89 return mp_const_none;
90 } else {
91 return MP_OBJ_FROM_PTR(self->heap);
92 }
93}
94STATIC MP_DEFINE_CONST_FUN_OBJ_1(task_queue_peek_obj, task_queue_peek);
95
96STATIC mp_obj_t task_queue_push_sorted(size_t n_args, const mp_obj_t *args) {
97 mp_obj_task_queue_t *self = MP_OBJ_TO_PTR(args[0]);
98 mp_obj_task_t *task = MP_OBJ_TO_PTR(args[1]);
99 task->data = mp_const_none;
100 if (n_args == 2) {
101 task->ph_key = ticks();
102 } else {
103 assert(mp_obj_is_small_int(args[2]));
104 task->ph_key = args[2];
105 }
106 self->heap = (mp_obj_task_t *)mp_pairheap_push(task_lt, &self->heap->pairheap, &task->pairheap);
107 return mp_const_none;
108}
109STATIC MP_DEFINE_CONST_FUN_OBJ_VAR_BETWEEN(task_queue_push_sorted_obj, 2, 3, task_queue_push_sorted);
110
111STATIC mp_obj_t task_queue_pop_head(mp_obj_t self_in) {
112 mp_obj_task_queue_t *self = MP_OBJ_TO_PTR(self_in);
113 mp_obj_task_t *head = (mp_obj_task_t *)mp_pairheap_peek(task_lt, &self->heap->pairheap);
114 if (head == NULL) {
115 mp_raise_msg(&mp_type_IndexError, MP_ERROR_TEXT("empty heap"));
116 }
117 self->heap = (mp_obj_task_t *)mp_pairheap_pop(task_lt, &self->heap->pairheap);
118 return MP_OBJ_FROM_PTR(head);
119}
120STATIC MP_DEFINE_CONST_FUN_OBJ_1(task_queue_pop_head_obj, task_queue_pop_head);
121
122STATIC mp_obj_t task_queue_remove(mp_obj_t self_in, mp_obj_t task_in) {
123 mp_obj_task_queue_t *self = MP_OBJ_TO_PTR(self_in);
124 mp_obj_task_t *task = MP_OBJ_TO_PTR(task_in);
125 self->heap = (mp_obj_task_t *)mp_pairheap_delete(task_lt, &self->heap->pairheap, &task->pairheap);
126 return mp_const_none;
127}
128STATIC MP_DEFINE_CONST_FUN_OBJ_2(task_queue_remove_obj, task_queue_remove);
129
130STATIC const mp_rom_map_elem_t task_queue_locals_dict_table[] = {
131 { MP_ROM_QSTR(MP_QSTR_peek), MP_ROM_PTR(&task_queue_peek_obj) },
132 { MP_ROM_QSTR(MP_QSTR_push_sorted), MP_ROM_PTR(&task_queue_push_sorted_obj) },
133 { MP_ROM_QSTR(MP_QSTR_push_head), MP_ROM_PTR(&task_queue_push_sorted_obj) },
134 { MP_ROM_QSTR(MP_QSTR_pop_head), MP_ROM_PTR(&task_queue_pop_head_obj) },
135 { MP_ROM_QSTR(MP_QSTR_remove), MP_ROM_PTR(&task_queue_remove_obj) },
136};
137STATIC MP_DEFINE_CONST_DICT(task_queue_locals_dict, task_queue_locals_dict_table);
138
139STATIC const mp_obj_type_t task_queue_type = {
140 { &mp_type_type },
141 .name = MP_QSTR_TaskQueue,
142 .make_new = task_queue_make_new,
143 .locals_dict = (mp_obj_dict_t *)&task_queue_locals_dict,
144};
145
146/******************************************************************************/
147// Task class
148
149// For efficiency, the task object is stored to the coro entry when the task is done.
150#define TASK_IS_DONE(task) ((task)->coro == MP_OBJ_FROM_PTR(task))
151
152// This is the core uasyncio context with cur_task, _task_queue and CancelledError.
153STATIC mp_obj_t uasyncio_context = MP_OBJ_NULL;
154
155STATIC mp_obj_t task_make_new(const mp_obj_type_t *type, size_t n_args, size_t n_kw, const mp_obj_t *args) {
156 mp_arg_check_num(n_args, n_kw, 1, 2, false);
157 mp_obj_task_t *self = m_new_obj(mp_obj_task_t);
158 self->pairheap.base.type = type;
159 mp_pairheap_init_node(task_lt, &self->pairheap);
160 self->coro = args[0];
161 self->data = mp_const_none;
162 self->waiting = mp_const_none;
163 self->ph_key = MP_OBJ_NEW_SMALL_INT(0);
164 if (n_args == 2) {
165 uasyncio_context = args[1];
166 }
167 return MP_OBJ_FROM_PTR(self);
168}
169
170STATIC mp_obj_t task_done(mp_obj_t self_in) {
171 mp_obj_task_t *self = MP_OBJ_TO_PTR(self_in);
172 return mp_obj_new_bool(TASK_IS_DONE(self));
173}
174STATIC MP_DEFINE_CONST_FUN_OBJ_1(task_done_obj, task_done);
175
176STATIC mp_obj_t task_cancel(mp_obj_t self_in) {
177 mp_obj_task_t *self = MP_OBJ_TO_PTR(self_in);
178 // Check if task is already finished.
179 if (TASK_IS_DONE(self)) {
180 return mp_const_false;
181 }
182 // Can't cancel self (not supported yet).
183 mp_obj_t cur_task = mp_obj_dict_get(uasyncio_context, MP_OBJ_NEW_QSTR(MP_QSTR_cur_task));
184 if (self_in == cur_task) {
185 mp_raise_msg(&mp_type_RuntimeError, MP_ERROR_TEXT("can't cancel self"));
186 }
187 // If Task waits on another task then forward the cancel to the one it's waiting on.
188 while (mp_obj_is_subclass_fast(MP_OBJ_FROM_PTR(mp_obj_get_type(self->data)), MP_OBJ_FROM_PTR(&task_type))) {
189 self = MP_OBJ_TO_PTR(self->data);
190 }
191
192 mp_obj_t _task_queue = mp_obj_dict_get(uasyncio_context, MP_OBJ_NEW_QSTR(MP_QSTR__task_queue));
193
194 // Reschedule Task as a cancelled task.
195 mp_obj_t dest[3];
196 mp_load_method_maybe(self->data, MP_QSTR_remove, dest);
197 if (dest[0] != MP_OBJ_NULL) {
198 // Not on the main running queue, remove the task from the queue it's on.
199 dest[2] = MP_OBJ_FROM_PTR(self);
200 mp_call_method_n_kw(1, 0, dest);
201 // _task_queue.push_head(self)
202 dest[0] = _task_queue;
203 dest[1] = MP_OBJ_FROM_PTR(self);
204 task_queue_push_sorted(2, dest);
205 } else if (ticks_diff(self->ph_key, ticks()) > 0) {
206 // On the main running queue but scheduled in the future, so bring it forward to now.
207 // _task_queue.remove(self)
208 task_queue_remove(_task_queue, MP_OBJ_FROM_PTR(self));
209 // _task_queue.push_head(self)
210 dest[0] = _task_queue;
211 dest[1] = MP_OBJ_FROM_PTR(self);
212 task_queue_push_sorted(2, dest);
213 }
214
215 self->data = mp_obj_dict_get(uasyncio_context, MP_OBJ_NEW_QSTR(MP_QSTR_CancelledError));
216
217 return mp_const_true;
218}
219STATIC MP_DEFINE_CONST_FUN_OBJ_1(task_cancel_obj, task_cancel);
220
221STATIC mp_obj_t task_throw(mp_obj_t self_in, mp_obj_t value_in) {
222 // This task raised an exception which was uncaught; handle that now.
223 mp_obj_task_t *self = MP_OBJ_TO_PTR(self_in);
224 // Set the data because it was cleared by the main scheduling loop.
225 self->data = value_in;
226 if (self->waiting == mp_const_none) {
227 // Nothing await'ed on the task so call the exception handler.
228 mp_obj_t _exc_context = mp_obj_dict_get(uasyncio_context, MP_OBJ_NEW_QSTR(MP_QSTR__exc_context));
229 mp_obj_dict_store(_exc_context, MP_OBJ_NEW_QSTR(MP_QSTR_exception), value_in);
230 mp_obj_dict_store(_exc_context, MP_OBJ_NEW_QSTR(MP_QSTR_future), self_in);
231 mp_obj_t Loop = mp_obj_dict_get(uasyncio_context, MP_OBJ_NEW_QSTR(MP_QSTR_Loop));
232 mp_obj_t call_exception_handler = mp_load_attr(Loop, MP_QSTR_call_exception_handler);
233 mp_call_function_1(call_exception_handler, _exc_context);
234 }
235 return mp_const_none;
236}
237STATIC MP_DEFINE_CONST_FUN_OBJ_2(task_throw_obj, task_throw);
238
239STATIC void task_attr(mp_obj_t self_in, qstr attr, mp_obj_t *dest) {
240 mp_obj_task_t *self = MP_OBJ_TO_PTR(self_in);
241 if (dest[0] == MP_OBJ_NULL) {
242 // Load
243 if (attr == MP_QSTR_coro) {
244 dest[0] = self->coro;
245 } else if (attr == MP_QSTR_data) {
246 dest[0] = self->data;
247 } else if (attr == MP_QSTR_waiting) {
248 if (self->waiting != mp_const_none && self->waiting != mp_const_false) {
249 dest[0] = self->waiting;
250 }
251 } else if (attr == MP_QSTR_done) {
252 dest[0] = MP_OBJ_FROM_PTR(&task_done_obj);
253 dest[1] = self_in;
254 } else if (attr == MP_QSTR_cancel) {
255 dest[0] = MP_OBJ_FROM_PTR(&task_cancel_obj);
256 dest[1] = self_in;
257 } else if (attr == MP_QSTR_throw) {
258 dest[0] = MP_OBJ_FROM_PTR(&task_throw_obj);
259 dest[1] = self_in;
260 } else if (attr == MP_QSTR_ph_key) {
261 dest[0] = self->ph_key;
262 }
263 } else if (dest[1] != MP_OBJ_NULL) {
264 // Store
265 if (attr == MP_QSTR_coro) {
266 self->coro = dest[1];
267 dest[0] = MP_OBJ_NULL;
268 } else if (attr == MP_QSTR_data) {
269 self->data = dest[1];
270 dest[0] = MP_OBJ_NULL;
271 } else if (attr == MP_QSTR_waiting) {
272 self->waiting = dest[1];
273 dest[0] = MP_OBJ_NULL;
274 }
275 }
276}
277
278STATIC mp_obj_t task_getiter(mp_obj_t self_in, mp_obj_iter_buf_t *iter_buf) {
279 (void)iter_buf;
280 mp_obj_task_t *self = MP_OBJ_TO_PTR(self_in);
281 if (self->waiting == mp_const_none) {
282 // The is the first access of the "waiting" entry.
283 if (TASK_IS_DONE(self)) {
284 // Signal that the completed-task has been await'ed on.
285 self->waiting = mp_const_false;
286 } else {
287 // Lazily allocate the waiting queue.
288 self->waiting = task_queue_make_new(&task_queue_type, 0, 0, NULL);
289 }
290 }
291 return self_in;
292}
293
294STATIC mp_obj_t task_iternext(mp_obj_t self_in) {
295 mp_obj_task_t *self = MP_OBJ_TO_PTR(self_in);
296 if (TASK_IS_DONE(self)) {
297 // Task finished, raise return value to caller so it can continue.
298 nlr_raise(self->data);
299 } else {
300 // Put calling task on waiting queue.
301 mp_obj_t cur_task = mp_obj_dict_get(uasyncio_context, MP_OBJ_NEW_QSTR(MP_QSTR_cur_task));
302 mp_obj_t args[2] = { self->waiting, cur_task };
303 task_queue_push_sorted(2, args);
304 // Set calling task's data to this task that it waits on, to double-link it.
305 ((mp_obj_task_t *)MP_OBJ_TO_PTR(cur_task))->data = self_in;
306 }
307 return mp_const_none;
308}
309
310STATIC const mp_obj_type_t task_type = {
311 { &mp_type_type },
312 .name = MP_QSTR_Task,
313 .make_new = task_make_new,
314 .attr = task_attr,
315 .getiter = task_getiter,
316 .iternext = task_iternext,
317};
318
319/******************************************************************************/
320// C-level uasyncio module
321
322STATIC const mp_rom_map_elem_t mp_module_uasyncio_globals_table[] = {
323 { MP_ROM_QSTR(MP_QSTR___name__), MP_ROM_QSTR(MP_QSTR__uasyncio) },
324 { MP_ROM_QSTR(MP_QSTR_TaskQueue), MP_ROM_PTR(&task_queue_type) },
325 { MP_ROM_QSTR(MP_QSTR_Task), MP_ROM_PTR(&task_type) },
326};
327STATIC MP_DEFINE_CONST_DICT(mp_module_uasyncio_globals, mp_module_uasyncio_globals_table);
328
329const mp_obj_module_t mp_module_uasyncio = {
330 .base = { &mp_type_module },
331 .globals = (mp_obj_dict_t *)&mp_module_uasyncio_globals,
332};
333
334#endif // MICROPY_PY_UASYNCIO
335