1/*
2 * Copyright 2010-2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License").
5 * You may not use this file except in compliance with the License.
6 * A copy of the License is located at
7 *
8 * http://aws.amazon.com/apache2.0
9 *
10 * or in the "license" file accompanying this file. This file is distributed
11 * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12 * express or implied. See the License for the specific language governing
13 * permissions and limitations under the License.
14 */
15
16#include <aws/common/task_scheduler.h>
17
18#include <aws/common/logging.h>
19
20#include <inttypes.h>
21
/* Size argument handed to aws_priority_queue_init_dynamic() when a scheduler's
 * timed_queue is created (presumably the initial item capacity -- confirm
 * against the priority queue API). */
static const size_t DEFAULT_QUEUE_SIZE = 7;
23
24void aws_task_init(struct aws_task *task, aws_task_fn *fn, void *arg, const char *type_tag) {
25 AWS_ZERO_STRUCT(*task);
26 task->fn = fn;
27 task->arg = arg;
28 task->type_tag = type_tag;
29}
30
31const char *aws_task_status_to_c_str(enum aws_task_status status) {
32 switch (status) {
33 case AWS_TASK_STATUS_RUN_READY:
34 return "<Running>";
35
36 case AWS_TASK_STATUS_CANCELED:
37 return "<Canceled>";
38
39 default:
40 return "<Unknown>";
41 }
42}
43
44void aws_task_run(struct aws_task *task, enum aws_task_status status) {
45 AWS_ASSERT(task->fn);
46 AWS_LOGF_DEBUG(
47 AWS_LS_COMMON_TASK_SCHEDULER,
48 "id=%p: Running %s task with %s status",
49 (void *)task,
50 task->type_tag,
51 aws_task_status_to_c_str(status));
52
53 task->fn(task, task->arg, status);
54}
55
56static int s_compare_timestamps(const void *a, const void *b) {
57 uint64_t a_time = (*(struct aws_task **)a)->timestamp;
58 uint64_t b_time = (*(struct aws_task **)b)->timestamp;
59 return a_time > b_time; /* min-heap */
60}
61
/* Forward declaration: aws_task_scheduler_clean_up() calls s_run_all() before
 * its definition further down in this file. */
static void s_run_all(struct aws_task_scheduler *scheduler, uint64_t current_time, enum aws_task_status status);
63
64int aws_task_scheduler_init(struct aws_task_scheduler *scheduler, struct aws_allocator *alloc) {
65 AWS_ASSERT(alloc);
66
67 AWS_ZERO_STRUCT(*scheduler);
68
69 if (aws_priority_queue_init_dynamic(
70 &scheduler->timed_queue, alloc, DEFAULT_QUEUE_SIZE, sizeof(struct aws_task *), &s_compare_timestamps)) {
71 return AWS_OP_ERR;
72 };
73
74 scheduler->alloc = alloc;
75 aws_linked_list_init(&scheduler->timed_list);
76 aws_linked_list_init(&scheduler->asap_list);
77
78 AWS_POSTCONDITION(aws_task_scheduler_is_valid(scheduler));
79 return AWS_OP_SUCCESS;
80}
81
82void aws_task_scheduler_clean_up(struct aws_task_scheduler *scheduler) {
83 AWS_ASSERT(scheduler);
84
85 if (aws_task_scheduler_is_valid(scheduler)) {
86 /* Execute all remaining tasks as CANCELED.
87 * Do this in a loop so that tasks scheduled by other tasks are executed */
88 while (aws_task_scheduler_has_tasks(scheduler, NULL)) {
89 s_run_all(scheduler, UINT64_MAX, AWS_TASK_STATUS_CANCELED);
90 }
91 }
92
93 aws_priority_queue_clean_up(&scheduler->timed_queue);
94 AWS_ZERO_STRUCT(*scheduler);
95}
96
97bool aws_task_scheduler_is_valid(const struct aws_task_scheduler *scheduler) {
98 return scheduler && scheduler->alloc && aws_priority_queue_is_valid(&scheduler->timed_queue) &&
99 aws_linked_list_is_valid(&scheduler->asap_list) && aws_linked_list_is_valid(&scheduler->timed_list);
100}
101
102bool aws_task_scheduler_has_tasks(const struct aws_task_scheduler *scheduler, uint64_t *next_task_time) {
103 AWS_ASSERT(scheduler);
104
105 uint64_t timestamp = UINT64_MAX;
106 bool has_tasks = false;
107
108 if (!aws_linked_list_empty(&scheduler->asap_list)) {
109 timestamp = 0;
110 has_tasks = true;
111
112 } else {
113 /* Check whether timed_list or timed_queue has the earlier task */
114 if (AWS_UNLIKELY(!aws_linked_list_empty(&scheduler->timed_list))) {
115 struct aws_linked_list_node *node = aws_linked_list_front(&scheduler->timed_list);
116 struct aws_task *task = AWS_CONTAINER_OF(node, struct aws_task, node);
117 timestamp = task->timestamp;
118 has_tasks = true;
119 }
120
121 struct aws_task **task_ptrptr = NULL;
122 if (aws_priority_queue_top(&scheduler->timed_queue, (void **)&task_ptrptr) == AWS_OP_SUCCESS) {
123 if ((*task_ptrptr)->timestamp < timestamp) {
124 timestamp = (*task_ptrptr)->timestamp;
125 }
126 has_tasks = true;
127 }
128 }
129
130 if (next_task_time) {
131 *next_task_time = timestamp;
132 }
133 return has_tasks;
134}
135
136void aws_task_scheduler_schedule_now(struct aws_task_scheduler *scheduler, struct aws_task *task) {
137 AWS_ASSERT(scheduler);
138 AWS_ASSERT(task);
139 AWS_ASSERT(task->fn);
140
141 AWS_LOGF_DEBUG(
142 AWS_LS_COMMON_TASK_SCHEDULER,
143 "id=%p: Scheduling %s task for immediate execution",
144 (void *)task,
145 task->type_tag);
146
147 task->priority_queue_node.current_index = SIZE_MAX;
148 aws_linked_list_node_reset(&task->node);
149 task->timestamp = 0;
150
151 aws_linked_list_push_back(&scheduler->asap_list, &task->node);
152}
153
154void aws_task_scheduler_schedule_future(
155 struct aws_task_scheduler *scheduler,
156 struct aws_task *task,
157 uint64_t time_to_run) {
158
159 AWS_ASSERT(scheduler);
160 AWS_ASSERT(task);
161 AWS_ASSERT(task->fn);
162
163 AWS_LOGF_DEBUG(
164 AWS_LS_COMMON_TASK_SCHEDULER,
165 "id=%p: Scheduling %s task for future execution at time %" PRIu64,
166 (void *)task,
167 task->type_tag,
168 time_to_run);
169
170 task->timestamp = time_to_run;
171
172 task->priority_queue_node.current_index = SIZE_MAX;
173 aws_linked_list_node_reset(&task->node);
174 int err = aws_priority_queue_push_ref(&scheduler->timed_queue, &task, &task->priority_queue_node);
175 if (AWS_UNLIKELY(err)) {
176 /* In the (very unlikely) case that we can't push into the timed_queue,
177 * perform a sorted insertion into timed_list. */
178 struct aws_linked_list_node *node_i;
179 for (node_i = aws_linked_list_begin(&scheduler->timed_list);
180 node_i != aws_linked_list_end(&scheduler->timed_list);
181 node_i = aws_linked_list_next(node_i)) {
182
183 struct aws_task *task_i = AWS_CONTAINER_OF(node_i, struct aws_task, node);
184 if (task_i->timestamp > time_to_run) {
185 break;
186 }
187 }
188 aws_linked_list_insert_before(node_i, &task->node);
189 }
190}
191
/*
 * Run every ASAP task and every timed task whose timestamp is <= current_time,
 * passing AWS_TASK_STATUS_RUN_READY to each task callback.
 *
 * scheduler:    scheduler to run (asserted non-NULL).
 * current_time: cutoff timestamp; timed tasks later than this stay queued.
 */
void aws_task_scheduler_run_all(struct aws_task_scheduler *scheduler, uint64_t current_time) {
    AWS_ASSERT(scheduler);

    s_run_all(scheduler, current_time, AWS_TASK_STATUS_RUN_READY);
}
197
198static void s_run_all(struct aws_task_scheduler *scheduler, uint64_t current_time, enum aws_task_status status) {
199
200 /* Move scheduled tasks to running_list before executing.
201 * This gives us the desired behavior that: if executing a task results in another task being scheduled,
202 * that new task is not executed until the next time run() is invoked. */
203 struct aws_linked_list running_list;
204 aws_linked_list_init(&running_list);
205
206 /* First move everything from asap_list */
207 aws_linked_list_swap_contents(&running_list, &scheduler->asap_list);
208
209 /* Next move tasks from timed_queue and timed_list, based on whichever's next-task is sooner.
210 * It's very unlikely that any tasks are in timed_list, so once it has no more valid tasks,
211 * break out of this complex loop in favor of a simpler one. */
212 while (AWS_UNLIKELY(!aws_linked_list_empty(&scheduler->timed_list))) {
213
214 struct aws_linked_list_node *timed_list_node = aws_linked_list_begin(&scheduler->timed_list);
215 struct aws_task *timed_list_task = AWS_CONTAINER_OF(timed_list_node, struct aws_task, node);
216 if (timed_list_task->timestamp > current_time) {
217 /* timed_list is out of valid tasks, break out of complex loop */
218 break;
219 }
220
221 /* Check if timed_queue has a task which is sooner */
222 struct aws_task **timed_queue_task_ptrptr = NULL;
223 if (aws_priority_queue_top(&scheduler->timed_queue, (void **)&timed_queue_task_ptrptr) == AWS_OP_SUCCESS) {
224 if ((*timed_queue_task_ptrptr)->timestamp <= current_time) {
225 if ((*timed_queue_task_ptrptr)->timestamp < timed_list_task->timestamp) {
226 /* Take task from timed_queue */
227 struct aws_task *timed_queue_task;
228 aws_priority_queue_pop(&scheduler->timed_queue, &timed_queue_task);
229 aws_linked_list_push_back(&running_list, &timed_queue_task->node);
230 continue;
231 }
232 }
233 }
234
235 /* Take task from timed_list */
236 aws_linked_list_pop_front(&scheduler->timed_list);
237 aws_linked_list_push_back(&running_list, &timed_list_task->node);
238 }
239
240 /* Simpler loop that moves remaining valid tasks from timed_queue */
241 struct aws_task **timed_queue_task_ptrptr = NULL;
242 while (aws_priority_queue_top(&scheduler->timed_queue, (void **)&timed_queue_task_ptrptr) == AWS_OP_SUCCESS) {
243 if ((*timed_queue_task_ptrptr)->timestamp > current_time) {
244 break;
245 }
246
247 struct aws_task *next_timed_task;
248 aws_priority_queue_pop(&scheduler->timed_queue, &next_timed_task);
249 aws_linked_list_push_back(&running_list, &next_timed_task->node);
250 }
251
252 /* Run tasks */
253 while (!aws_linked_list_empty(&running_list)) {
254 struct aws_linked_list_node *task_node = aws_linked_list_pop_front(&running_list);
255 struct aws_task *task = AWS_CONTAINER_OF(task_node, struct aws_task, node);
256 aws_task_run(task, status);
257 }
258}
259
260void aws_task_scheduler_cancel_task(struct aws_task_scheduler *scheduler, struct aws_task *task) {
261 /* attempt the linked lists first since those will be faster access and more likely to occur
262 * anyways.
263 */
264 if (task->node.next) {
265 aws_linked_list_remove(&task->node);
266 } else {
267 aws_priority_queue_remove(&scheduler->timed_queue, &task, &task->priority_queue_node);
268 }
269
270 /*
271 * No need to log cancellation specially; it will get logged during the run call with the canceled status
272 */
273 aws_task_run(task, AWS_TASK_STATUS_CANCELED);
274}
275