Branch data Line data Source code
1 : : /*
2 : : * This file is part of the MicroPython project, http://micropython.org/
3 : : *
4 : : * The MIT License (MIT)
5 : : *
6 : : * Copyright (c) 2020 Damien P. George
7 : : *
8 : : * Permission is hereby granted, free of charge, to any person obtaining a copy
9 : : * of this software and associated documentation files (the "Software"), to deal
10 : : * in the Software without restriction, including without limitation the rights
11 : : * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
12 : : * copies of the Software, and to permit persons to whom the Software is
13 : : * furnished to do so, subject to the following conditions:
14 : : *
15 : : * The above copyright notice and this permission notice shall be included in
16 : : * all copies or substantial portions of the Software.
17 : : *
18 : : * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
19 : : * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
20 : : * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
21 : : * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
22 : : * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
23 : : * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
24 : : * THE SOFTWARE.
25 : : */
26 : :
27 : : #include "py/runtime.h"
28 : : #include "py/smallint.h"
29 : : #include "py/pairheap.h"
30 : : #include "py/mphal.h"
31 : :
32 : : #if MICROPY_PY_ASYNCIO
33 : :
34 : : // Used when task cannot be guaranteed to be non-NULL.
35 : : #define TASK_PAIRHEAP(task) ((task) ? &(task)->pairheap : NULL)
36 : :
37 : : #define TASK_STATE_RUNNING_NOT_WAITED_ON (mp_const_true)
38 : : #define TASK_STATE_DONE_NOT_WAITED_ON (mp_const_none)
39 : : #define TASK_STATE_DONE_WAS_WAITED_ON (mp_const_false)
40 : :
41 : : #define TASK_IS_DONE(task) ( \
42 : : (task)->state == TASK_STATE_DONE_NOT_WAITED_ON \
43 : : || (task)->state == TASK_STATE_DONE_WAS_WAITED_ON)
44 : :
45 : : typedef struct _mp_obj_task_t {
46 : : mp_pairheap_t pairheap;
47 : : mp_obj_t coro;
48 : : mp_obj_t data;
49 : : mp_obj_t state;
50 : : mp_obj_t ph_key;
51 : : } mp_obj_task_t;
52 : :
53 : : typedef struct _mp_obj_task_queue_t {
54 : : mp_obj_base_t base;
55 : : mp_obj_task_t *heap;
56 : : #if MICROPY_PY_ASYNCIO_TASK_QUEUE_PUSH_CALLBACK
57 : : mp_obj_t push_callback;
58 : : #endif
59 : : } mp_obj_task_queue_t;
60 : :
61 : : static const mp_obj_type_t task_queue_type;
62 : : static const mp_obj_type_t task_type;
63 : :
64 : : static mp_obj_t task_queue_make_new(const mp_obj_type_t *type, size_t n_args, size_t n_kw, const mp_obj_t *args);
65 : :
66 : : /******************************************************************************/
67 : : // Ticks for task ordering in pairing heap
68 : :
69 : 1164 : static mp_obj_t ticks(void) {
70 : 1164 : return MP_OBJ_NEW_SMALL_INT(mp_hal_ticks_ms() & (MICROPY_PY_TIME_TICKS_PERIOD - 1));
71 : : }
72 : :
73 : 884589 : static mp_int_t ticks_diff(mp_obj_t t1_in, mp_obj_t t0_in) {
74 : 884589 : mp_uint_t t0 = MP_OBJ_SMALL_INT_VALUE(t0_in);
75 : 884589 : mp_uint_t t1 = MP_OBJ_SMALL_INT_VALUE(t1_in);
76 : 884589 : mp_int_t diff = ((t1 - t0 + MICROPY_PY_TIME_TICKS_PERIOD / 2) & (MICROPY_PY_TIME_TICKS_PERIOD - 1))
77 : 884589 : - MICROPY_PY_TIME_TICKS_PERIOD / 2;
78 : 884589 : return diff;
79 : : }
80 : :
81 : 884471 : static int task_lt(mp_pairheap_t *n1, mp_pairheap_t *n2) {
82 : 884471 : mp_obj_task_t *t1 = (mp_obj_task_t *)n1;
83 : 884471 : mp_obj_task_t *t2 = (mp_obj_task_t *)n2;
84 : 884471 : return MP_OBJ_SMALL_INT_VALUE(ticks_diff(t1->ph_key, t2->ph_key)) < 0;
85 : : }
86 : :
87 : : /******************************************************************************/
88 : : // TaskQueue class
89 : :
90 : 218 : static mp_obj_t task_queue_make_new(const mp_obj_type_t *type, size_t n_args, size_t n_kw, const mp_obj_t *args) {
91 : 218 : (void)args;
92 : 218 : mp_arg_check_num(n_args, n_kw, 0, MICROPY_PY_ASYNCIO_TASK_QUEUE_PUSH_CALLBACK ? 1 : 0, false);
93 : 218 : mp_obj_task_queue_t *self = mp_obj_malloc(mp_obj_task_queue_t, type);
94 : 218 : self->heap = (mp_obj_task_t *)mp_pairheap_new(task_lt);
95 : : #if MICROPY_PY_ASYNCIO_TASK_QUEUE_PUSH_CALLBACK
96 : : if (n_args == 1) {
97 : : self->push_callback = args[0];
98 : : } else {
99 : : self->push_callback = MP_OBJ_NULL;
100 : : }
101 : : #endif
102 : 218 : return MP_OBJ_FROM_PTR(self);
103 : : }
104 : :
105 : 296148 : static mp_obj_t task_queue_peek(mp_obj_t self_in) {
106 : 296148 : mp_obj_task_queue_t *self = MP_OBJ_TO_PTR(self_in);
107 [ + + ]: 296148 : if (self->heap == NULL) {
108 : : return mp_const_none;
109 : : } else {
110 : 295901 : return MP_OBJ_FROM_PTR(self->heap);
111 : : }
112 : : }
113 : : static MP_DEFINE_CONST_FUN_OBJ_1(task_queue_peek_obj, task_queue_peek);
114 : :
115 : 295922 : static mp_obj_t task_queue_push(size_t n_args, const mp_obj_t *args) {
116 : 295922 : mp_obj_task_queue_t *self = MP_OBJ_TO_PTR(args[0]);
117 : 295922 : mp_obj_task_t *task = MP_OBJ_TO_PTR(args[1]);
118 : 295922 : task->data = mp_const_none;
119 [ + + ]: 295922 : if (n_args == 2) {
120 : 1046 : task->ph_key = ticks();
121 : : } else {
122 [ - + ]: 294876 : assert(mp_obj_is_small_int(args[2]));
123 : 294876 : task->ph_key = args[2];
124 : : }
125 [ + + ]: 295922 : self->heap = (mp_obj_task_t *)mp_pairheap_push(task_lt, TASK_PAIRHEAP(self->heap), TASK_PAIRHEAP(task));
126 : : #if MICROPY_PY_ASYNCIO_TASK_QUEUE_PUSH_CALLBACK
127 : : if (self->push_callback != MP_OBJ_NULL) {
128 : : mp_call_function_1(self->push_callback, MP_OBJ_NEW_SMALL_INT(0));
129 : : }
130 : : #endif
131 : 295922 : return mp_const_none;
132 : : }
133 : : static MP_DEFINE_CONST_FUN_OBJ_VAR_BETWEEN(task_queue_push_obj, 2, 3, task_queue_push);
134 : :
135 : 295816 : static mp_obj_t task_queue_pop(mp_obj_t self_in) {
136 : 295816 : mp_obj_task_queue_t *self = MP_OBJ_TO_PTR(self_in);
137 : 295816 : mp_obj_task_t *head = (mp_obj_task_t *)mp_pairheap_peek(task_lt, &self->heap->pairheap);
138 [ - + ]: 295816 : if (head == NULL) {
139 : 0 : mp_raise_msg(&mp_type_IndexError, MP_ERROR_TEXT("empty heap"));
140 : : }
141 : 295816 : self->heap = (mp_obj_task_t *)mp_pairheap_pop(task_lt, &self->heap->pairheap);
142 : 295816 : return MP_OBJ_FROM_PTR(head);
143 : : }
144 : : static MP_DEFINE_CONST_FUN_OBJ_1(task_queue_pop_obj, task_queue_pop);
145 : :
146 : 80 : static mp_obj_t task_queue_remove(mp_obj_t self_in, mp_obj_t task_in) {
147 : 80 : mp_obj_task_queue_t *self = MP_OBJ_TO_PTR(self_in);
148 : 80 : mp_obj_task_t *task = MP_OBJ_TO_PTR(task_in);
149 : 80 : self->heap = (mp_obj_task_t *)mp_pairheap_delete(task_lt, &self->heap->pairheap, &task->pairheap);
150 : 80 : return mp_const_none;
151 : : }
152 : : static MP_DEFINE_CONST_FUN_OBJ_2(task_queue_remove_obj, task_queue_remove);
153 : :
154 : : static const mp_rom_map_elem_t task_queue_locals_dict_table[] = {
155 : : { MP_ROM_QSTR(MP_QSTR_peek), MP_ROM_PTR(&task_queue_peek_obj) },
156 : : { MP_ROM_QSTR(MP_QSTR_push), MP_ROM_PTR(&task_queue_push_obj) },
157 : : { MP_ROM_QSTR(MP_QSTR_pop), MP_ROM_PTR(&task_queue_pop_obj) },
158 : : { MP_ROM_QSTR(MP_QSTR_remove), MP_ROM_PTR(&task_queue_remove_obj) },
159 : : };
160 : : static MP_DEFINE_CONST_DICT(task_queue_locals_dict, task_queue_locals_dict_table);
161 : :
162 : : static MP_DEFINE_CONST_OBJ_TYPE(
163 : : task_queue_type,
164 : : MP_QSTR_TaskQueue,
165 : : MP_TYPE_FLAG_NONE,
166 : : make_new, task_queue_make_new,
167 : : locals_dict, &task_queue_locals_dict
168 : : );
169 : :
170 : : /******************************************************************************/
171 : : // Task class
172 : :
173 : : // This is the core asyncio context with cur_task, _task_queue and CancelledError.
174 : : mp_obj_t mp_asyncio_context = MP_OBJ_NULL;
175 : :
176 : 460 : static mp_obj_t task_make_new(const mp_obj_type_t *type, size_t n_args, size_t n_kw, const mp_obj_t *args) {
177 : 460 : mp_arg_check_num(n_args, n_kw, 1, 2, false);
178 : 460 : mp_obj_task_t *self = m_new_obj(mp_obj_task_t);
179 : 460 : self->pairheap.base.type = type;
180 [ + - ]: 460 : mp_pairheap_init_node(task_lt, &self->pairheap);
181 : 460 : self->coro = args[0];
182 : 460 : self->data = mp_const_none;
183 : 460 : self->state = TASK_STATE_RUNNING_NOT_WAITED_ON;
184 : 460 : self->ph_key = MP_OBJ_NEW_SMALL_INT(0);
185 [ + - ]: 460 : if (n_args == 2) {
186 : 460 : mp_asyncio_context = args[1];
187 : : }
188 : 460 : return MP_OBJ_FROM_PTR(self);
189 : : }
190 : :
191 : 42 : static mp_obj_t task_done(mp_obj_t self_in) {
192 : 42 : mp_obj_task_t *self = MP_OBJ_TO_PTR(self_in);
193 [ + + ]: 42 : return mp_obj_new_bool(TASK_IS_DONE(self));
194 : : }
195 : : static MP_DEFINE_CONST_FUN_OBJ_1(task_done_obj, task_done);
196 : :
197 : 152 : static mp_obj_t task_cancel(mp_obj_t self_in) {
198 : 152 : mp_obj_task_t *self = MP_OBJ_TO_PTR(self_in);
199 : : // Check if task is already finished.
200 [ + + ]: 152 : if (TASK_IS_DONE(self)) {
201 : : return mp_const_false;
202 : : }
203 : : // Can't cancel self (not supported yet).
204 : 136 : mp_obj_t cur_task = mp_obj_dict_get(mp_asyncio_context, MP_OBJ_NEW_QSTR(MP_QSTR_cur_task));
205 [ + + ]: 136 : if (self_in == cur_task) {
206 : 2 : mp_raise_msg(&mp_type_RuntimeError, MP_ERROR_TEXT("can't cancel self"));
207 : : }
208 : : // If Task waits on another task then forward the cancel to the one it's waiting on.
209 [ + + ]: 160 : while (mp_obj_is_subclass_fast(MP_OBJ_FROM_PTR(mp_obj_get_type(self->data)), MP_OBJ_FROM_PTR(&task_type))) {
210 : 26 : self = MP_OBJ_TO_PTR(self->data);
211 : : }
212 : :
213 : 134 : mp_obj_t _task_queue = mp_obj_dict_get(mp_asyncio_context, MP_OBJ_NEW_QSTR(MP_QSTR__task_queue));
214 : :
215 : : // Reschedule Task as a cancelled task.
216 : 134 : mp_obj_t dest[3];
217 : 134 : mp_load_method_maybe(self->data, MP_QSTR_remove, dest);
218 [ + + ]: 134 : if (dest[0] != MP_OBJ_NULL) {
219 : : // Not on the main running queue, remove the task from the queue it's on.
220 : 16 : dest[2] = MP_OBJ_FROM_PTR(self);
221 : 16 : mp_call_method_n_kw(1, 0, dest);
222 : : // _task_queue.push(self)
223 : 16 : dest[0] = _task_queue;
224 : 16 : dest[1] = MP_OBJ_FROM_PTR(self);
225 : 16 : task_queue_push(2, dest);
226 [ + + ]: 118 : } else if (ticks_diff(self->ph_key, ticks()) > 0) {
227 : : // On the main running queue but scheduled in the future, so bring it forward to now.
228 : : // _task_queue.remove(self)
229 : 70 : task_queue_remove(_task_queue, MP_OBJ_FROM_PTR(self));
230 : : // _task_queue.push(self)
231 : 70 : dest[0] = _task_queue;
232 : 70 : dest[1] = MP_OBJ_FROM_PTR(self);
233 : 70 : task_queue_push(2, dest);
234 : : }
235 : :
236 : 134 : self->data = mp_obj_dict_get(mp_asyncio_context, MP_OBJ_NEW_QSTR(MP_QSTR_CancelledError));
237 : :
238 : 134 : return mp_const_true;
239 : : }
240 : : static MP_DEFINE_CONST_FUN_OBJ_1(task_cancel_obj, task_cancel);
241 : :
242 : 890443 : static void task_attr(mp_obj_t self_in, qstr attr, mp_obj_t *dest) {
243 : 890443 : mp_obj_task_t *self = MP_OBJ_TO_PTR(self_in);
244 [ + + ]: 890443 : if (dest[0] == MP_OBJ_NULL) {
245 : : // Load
246 [ + + ]: 889331 : if (attr == MP_QSTR_coro) {
247 : 295630 : dest[0] = self->coro;
248 [ + + ]: 593701 : } else if (attr == MP_QSTR_data) {
249 : 296332 : dest[0] = self->data;
250 [ + + ]: 297369 : } else if (attr == MP_QSTR_state) {
251 : 1456 : dest[0] = self->state;
252 [ + + ]: 295913 : } else if (attr == MP_QSTR_done) {
253 : 42 : dest[0] = MP_OBJ_FROM_PTR(&task_done_obj);
254 : 42 : dest[1] = self_in;
255 [ + + ]: 295871 : } else if (attr == MP_QSTR_cancel) {
256 : 152 : dest[0] = MP_OBJ_FROM_PTR(&task_cancel_obj);
257 : 152 : dest[1] = self_in;
258 [ + + ]: 295719 : } else if (attr == MP_QSTR_ph_key) {
259 : 295715 : dest[0] = self->ph_key;
260 : : }
261 [ + - ]: 1112 : } else if (dest[1] != MP_OBJ_NULL) {
262 : : // Store
263 [ + + ]: 1112 : if (attr == MP_QSTR_data) {
264 : 706 : self->data = dest[1];
265 : 706 : dest[0] = MP_OBJ_NULL;
266 [ + - ]: 406 : } else if (attr == MP_QSTR_state) {
267 : 406 : self->state = dest[1];
268 : 406 : dest[0] = MP_OBJ_NULL;
269 : : }
270 : : }
271 : 890443 : }
272 : :
273 : 136 : static mp_obj_t task_getiter(mp_obj_t self_in, mp_obj_iter_buf_t *iter_buf) {
274 : 136 : (void)iter_buf;
275 : 136 : mp_obj_task_t *self = MP_OBJ_TO_PTR(self_in);
276 [ + + ]: 136 : if (TASK_IS_DONE(self)) {
277 : : // Signal that the completed-task has been await'ed on.
278 : 26 : self->state = TASK_STATE_DONE_WAS_WAITED_ON;
279 [ + + ]: 110 : } else if (self->state == TASK_STATE_RUNNING_NOT_WAITED_ON) {
280 : : // Allocate the waiting queue.
281 : 108 : self->state = task_queue_make_new(&task_queue_type, 0, 0, NULL);
282 [ + - ]: 2 : } else if (mp_obj_get_type(self->state) != &task_queue_type) {
283 : : // Task has state used for another purpose, so can't also wait on it.
284 : 2 : mp_raise_msg(&mp_type_RuntimeError, MP_ERROR_TEXT("can't wait"));
285 : : }
286 : 134 : return self_in;
287 : : }
288 : :
289 : 236 : static mp_obj_t task_iternext(mp_obj_t self_in) {
290 : 236 : mp_obj_task_t *self = MP_OBJ_TO_PTR(self_in);
291 [ + + ]: 236 : if (TASK_IS_DONE(self)) {
292 : : // Task finished, raise return value to caller so it can continue.
293 : 128 : nlr_raise(self->data);
294 : : } else {
295 : : // Put calling task on waiting queue.
296 : 108 : mp_obj_t cur_task = mp_obj_dict_get(mp_asyncio_context, MP_OBJ_NEW_QSTR(MP_QSTR_cur_task));
297 : 108 : mp_obj_t args[2] = { self->state, cur_task };
298 : 108 : task_queue_push(2, args);
299 : : // Set calling task's data to this task that it waits on, to double-link it.
300 : 108 : ((mp_obj_task_t *)MP_OBJ_TO_PTR(cur_task))->data = self_in;
301 : : }
302 : 108 : return mp_const_none;
303 : : }
304 : :
305 : : static const mp_getiter_iternext_custom_t task_getiter_iternext = {
306 : : .getiter = task_getiter,
307 : : .iternext = task_iternext,
308 : : };
309 : :
310 : : static MP_DEFINE_CONST_OBJ_TYPE(
311 : : task_type,
312 : : MP_QSTR_Task,
313 : : MP_TYPE_FLAG_ITER_IS_CUSTOM,
314 : : make_new, task_make_new,
315 : : attr, task_attr,
316 : : iter, &task_getiter_iternext
317 : : );
318 : :
319 : : /******************************************************************************/
320 : : // C-level asyncio module
321 : :
322 : : static const mp_rom_map_elem_t mp_module_asyncio_globals_table[] = {
323 : : { MP_ROM_QSTR(MP_QSTR___name__), MP_ROM_QSTR(MP_QSTR__asyncio) },
324 : : { MP_ROM_QSTR(MP_QSTR_TaskQueue), MP_ROM_PTR(&task_queue_type) },
325 : : { MP_ROM_QSTR(MP_QSTR_Task), MP_ROM_PTR(&task_type) },
326 : : };
327 : : static MP_DEFINE_CONST_DICT(mp_module_asyncio_globals, mp_module_asyncio_globals_table);
328 : :
329 : : const mp_obj_module_t mp_module_asyncio = {
330 : : .base = { &mp_type_module },
331 : : .globals = (mp_obj_dict_t *)&mp_module_asyncio_globals,
332 : : };
333 : :
334 : : MP_REGISTER_MODULE(MP_QSTR__asyncio, mp_module_asyncio);
335 : :
336 : : #endif // MICROPY_PY_ASYNCIO
|