1 | /* |
2 | * Copyright 2010-2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. |
3 | * |
4 | * Licensed under the Apache License, Version 2.0 (the "License"). |
5 | * You may not use this file except in compliance with the License. |
6 | * A copy of the License is located at |
7 | * |
8 | * http://aws.amazon.com/apache2.0 |
9 | * |
10 | * or in the "license" file accompanying this file. This file is distributed |
11 | * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either |
12 | * express or implied. See the License for the specific language governing |
13 | * permissions and limitations under the License. |
14 | */ |
15 | |
16 | #include <aws/common/task_scheduler.h> |
17 | |
18 | #include <aws/common/logging.h> |
19 | |
20 | #include <inttypes.h> |
21 | |
22 | static const size_t DEFAULT_QUEUE_SIZE = 7; |
23 | |
24 | void aws_task_init(struct aws_task *task, aws_task_fn *fn, void *arg, const char *type_tag) { |
25 | AWS_ZERO_STRUCT(*task); |
26 | task->fn = fn; |
27 | task->arg = arg; |
28 | task->type_tag = type_tag; |
29 | } |
30 | |
31 | const char *aws_task_status_to_c_str(enum aws_task_status status) { |
32 | switch (status) { |
33 | case AWS_TASK_STATUS_RUN_READY: |
34 | return "<Running>" ; |
35 | |
36 | case AWS_TASK_STATUS_CANCELED: |
37 | return "<Canceled>" ; |
38 | |
39 | default: |
40 | return "<Unknown>" ; |
41 | } |
42 | } |
43 | |
44 | void aws_task_run(struct aws_task *task, enum aws_task_status status) { |
45 | AWS_ASSERT(task->fn); |
46 | AWS_LOGF_DEBUG( |
47 | AWS_LS_COMMON_TASK_SCHEDULER, |
48 | "id=%p: Running %s task with %s status" , |
49 | (void *)task, |
50 | task->type_tag, |
51 | aws_task_status_to_c_str(status)); |
52 | |
53 | task->fn(task, task->arg, status); |
54 | } |
55 | |
56 | static int s_compare_timestamps(const void *a, const void *b) { |
57 | uint64_t a_time = (*(struct aws_task **)a)->timestamp; |
58 | uint64_t b_time = (*(struct aws_task **)b)->timestamp; |
59 | return a_time > b_time; /* min-heap */ |
60 | } |
61 | |
62 | static void s_run_all(struct aws_task_scheduler *scheduler, uint64_t current_time, enum aws_task_status status); |
63 | |
64 | int aws_task_scheduler_init(struct aws_task_scheduler *scheduler, struct aws_allocator *alloc) { |
65 | AWS_ASSERT(alloc); |
66 | |
67 | AWS_ZERO_STRUCT(*scheduler); |
68 | |
69 | if (aws_priority_queue_init_dynamic( |
70 | &scheduler->timed_queue, alloc, DEFAULT_QUEUE_SIZE, sizeof(struct aws_task *), &s_compare_timestamps)) { |
71 | return AWS_OP_ERR; |
72 | }; |
73 | |
74 | scheduler->alloc = alloc; |
75 | aws_linked_list_init(&scheduler->timed_list); |
76 | aws_linked_list_init(&scheduler->asap_list); |
77 | |
78 | AWS_POSTCONDITION(aws_task_scheduler_is_valid(scheduler)); |
79 | return AWS_OP_SUCCESS; |
80 | } |
81 | |
82 | void aws_task_scheduler_clean_up(struct aws_task_scheduler *scheduler) { |
83 | AWS_ASSERT(scheduler); |
84 | |
85 | if (aws_task_scheduler_is_valid(scheduler)) { |
86 | /* Execute all remaining tasks as CANCELED. |
87 | * Do this in a loop so that tasks scheduled by other tasks are executed */ |
88 | while (aws_task_scheduler_has_tasks(scheduler, NULL)) { |
89 | s_run_all(scheduler, UINT64_MAX, AWS_TASK_STATUS_CANCELED); |
90 | } |
91 | } |
92 | |
93 | aws_priority_queue_clean_up(&scheduler->timed_queue); |
94 | AWS_ZERO_STRUCT(*scheduler); |
95 | } |
96 | |
97 | bool aws_task_scheduler_is_valid(const struct aws_task_scheduler *scheduler) { |
98 | return scheduler && scheduler->alloc && aws_priority_queue_is_valid(&scheduler->timed_queue) && |
99 | aws_linked_list_is_valid(&scheduler->asap_list) && aws_linked_list_is_valid(&scheduler->timed_list); |
100 | } |
101 | |
102 | bool aws_task_scheduler_has_tasks(const struct aws_task_scheduler *scheduler, uint64_t *next_task_time) { |
103 | AWS_ASSERT(scheduler); |
104 | |
105 | uint64_t timestamp = UINT64_MAX; |
106 | bool has_tasks = false; |
107 | |
108 | if (!aws_linked_list_empty(&scheduler->asap_list)) { |
109 | timestamp = 0; |
110 | has_tasks = true; |
111 | |
112 | } else { |
113 | /* Check whether timed_list or timed_queue has the earlier task */ |
114 | if (AWS_UNLIKELY(!aws_linked_list_empty(&scheduler->timed_list))) { |
115 | struct aws_linked_list_node *node = aws_linked_list_front(&scheduler->timed_list); |
116 | struct aws_task *task = AWS_CONTAINER_OF(node, struct aws_task, node); |
117 | timestamp = task->timestamp; |
118 | has_tasks = true; |
119 | } |
120 | |
121 | struct aws_task **task_ptrptr = NULL; |
122 | if (aws_priority_queue_top(&scheduler->timed_queue, (void **)&task_ptrptr) == AWS_OP_SUCCESS) { |
123 | if ((*task_ptrptr)->timestamp < timestamp) { |
124 | timestamp = (*task_ptrptr)->timestamp; |
125 | } |
126 | has_tasks = true; |
127 | } |
128 | } |
129 | |
130 | if (next_task_time) { |
131 | *next_task_time = timestamp; |
132 | } |
133 | return has_tasks; |
134 | } |
135 | |
136 | void aws_task_scheduler_schedule_now(struct aws_task_scheduler *scheduler, struct aws_task *task) { |
137 | AWS_ASSERT(scheduler); |
138 | AWS_ASSERT(task); |
139 | AWS_ASSERT(task->fn); |
140 | |
141 | AWS_LOGF_DEBUG( |
142 | AWS_LS_COMMON_TASK_SCHEDULER, |
143 | "id=%p: Scheduling %s task for immediate execution" , |
144 | (void *)task, |
145 | task->type_tag); |
146 | |
147 | task->priority_queue_node.current_index = SIZE_MAX; |
148 | aws_linked_list_node_reset(&task->node); |
149 | task->timestamp = 0; |
150 | |
151 | aws_linked_list_push_back(&scheduler->asap_list, &task->node); |
152 | } |
153 | |
154 | void aws_task_scheduler_schedule_future( |
155 | struct aws_task_scheduler *scheduler, |
156 | struct aws_task *task, |
157 | uint64_t time_to_run) { |
158 | |
159 | AWS_ASSERT(scheduler); |
160 | AWS_ASSERT(task); |
161 | AWS_ASSERT(task->fn); |
162 | |
163 | AWS_LOGF_DEBUG( |
164 | AWS_LS_COMMON_TASK_SCHEDULER, |
165 | "id=%p: Scheduling %s task for future execution at time %" PRIu64, |
166 | (void *)task, |
167 | task->type_tag, |
168 | time_to_run); |
169 | |
170 | task->timestamp = time_to_run; |
171 | |
172 | task->priority_queue_node.current_index = SIZE_MAX; |
173 | aws_linked_list_node_reset(&task->node); |
174 | int err = aws_priority_queue_push_ref(&scheduler->timed_queue, &task, &task->priority_queue_node); |
175 | if (AWS_UNLIKELY(err)) { |
176 | /* In the (very unlikely) case that we can't push into the timed_queue, |
177 | * perform a sorted insertion into timed_list. */ |
178 | struct aws_linked_list_node *node_i; |
179 | for (node_i = aws_linked_list_begin(&scheduler->timed_list); |
180 | node_i != aws_linked_list_end(&scheduler->timed_list); |
181 | node_i = aws_linked_list_next(node_i)) { |
182 | |
183 | struct aws_task *task_i = AWS_CONTAINER_OF(node_i, struct aws_task, node); |
184 | if (task_i->timestamp > time_to_run) { |
185 | break; |
186 | } |
187 | } |
188 | aws_linked_list_insert_before(node_i, &task->node); |
189 | } |
190 | } |
191 | |
192 | void aws_task_scheduler_run_all(struct aws_task_scheduler *scheduler, uint64_t current_time) { |
193 | AWS_ASSERT(scheduler); |
194 | |
195 | s_run_all(scheduler, current_time, AWS_TASK_STATUS_RUN_READY); |
196 | } |
197 | |
198 | static void s_run_all(struct aws_task_scheduler *scheduler, uint64_t current_time, enum aws_task_status status) { |
199 | |
200 | /* Move scheduled tasks to running_list before executing. |
201 | * This gives us the desired behavior that: if executing a task results in another task being scheduled, |
202 | * that new task is not executed until the next time run() is invoked. */ |
203 | struct aws_linked_list running_list; |
204 | aws_linked_list_init(&running_list); |
205 | |
206 | /* First move everything from asap_list */ |
207 | aws_linked_list_swap_contents(&running_list, &scheduler->asap_list); |
208 | |
209 | /* Next move tasks from timed_queue and timed_list, based on whichever's next-task is sooner. |
210 | * It's very unlikely that any tasks are in timed_list, so once it has no more valid tasks, |
211 | * break out of this complex loop in favor of a simpler one. */ |
212 | while (AWS_UNLIKELY(!aws_linked_list_empty(&scheduler->timed_list))) { |
213 | |
214 | struct aws_linked_list_node *timed_list_node = aws_linked_list_begin(&scheduler->timed_list); |
215 | struct aws_task *timed_list_task = AWS_CONTAINER_OF(timed_list_node, struct aws_task, node); |
216 | if (timed_list_task->timestamp > current_time) { |
217 | /* timed_list is out of valid tasks, break out of complex loop */ |
218 | break; |
219 | } |
220 | |
221 | /* Check if timed_queue has a task which is sooner */ |
222 | struct aws_task **timed_queue_task_ptrptr = NULL; |
223 | if (aws_priority_queue_top(&scheduler->timed_queue, (void **)&timed_queue_task_ptrptr) == AWS_OP_SUCCESS) { |
224 | if ((*timed_queue_task_ptrptr)->timestamp <= current_time) { |
225 | if ((*timed_queue_task_ptrptr)->timestamp < timed_list_task->timestamp) { |
226 | /* Take task from timed_queue */ |
227 | struct aws_task *timed_queue_task; |
228 | aws_priority_queue_pop(&scheduler->timed_queue, &timed_queue_task); |
229 | aws_linked_list_push_back(&running_list, &timed_queue_task->node); |
230 | continue; |
231 | } |
232 | } |
233 | } |
234 | |
235 | /* Take task from timed_list */ |
236 | aws_linked_list_pop_front(&scheduler->timed_list); |
237 | aws_linked_list_push_back(&running_list, &timed_list_task->node); |
238 | } |
239 | |
240 | /* Simpler loop that moves remaining valid tasks from timed_queue */ |
241 | struct aws_task **timed_queue_task_ptrptr = NULL; |
242 | while (aws_priority_queue_top(&scheduler->timed_queue, (void **)&timed_queue_task_ptrptr) == AWS_OP_SUCCESS) { |
243 | if ((*timed_queue_task_ptrptr)->timestamp > current_time) { |
244 | break; |
245 | } |
246 | |
247 | struct aws_task *next_timed_task; |
248 | aws_priority_queue_pop(&scheduler->timed_queue, &next_timed_task); |
249 | aws_linked_list_push_back(&running_list, &next_timed_task->node); |
250 | } |
251 | |
252 | /* Run tasks */ |
253 | while (!aws_linked_list_empty(&running_list)) { |
254 | struct aws_linked_list_node *task_node = aws_linked_list_pop_front(&running_list); |
255 | struct aws_task *task = AWS_CONTAINER_OF(task_node, struct aws_task, node); |
256 | aws_task_run(task, status); |
257 | } |
258 | } |
259 | |
260 | void aws_task_scheduler_cancel_task(struct aws_task_scheduler *scheduler, struct aws_task *task) { |
261 | /* attempt the linked lists first since those will be faster access and more likely to occur |
262 | * anyways. |
263 | */ |
264 | if (task->node.next) { |
265 | aws_linked_list_remove(&task->node); |
266 | } else { |
267 | aws_priority_queue_remove(&scheduler->timed_queue, &task, &task->priority_queue_node); |
268 | } |
269 | |
270 | /* |
271 | * No need to log cancellation specially; it will get logged during the run call with the canceled status |
272 | */ |
273 | aws_task_run(task, AWS_TASK_STATUS_CANCELED); |
274 | } |
275 | |