1/*
2Copyright (c) 2012, Broadcom Europe Ltd
3All rights reserved.
4
5Redistribution and use in source and binary forms, with or without
6modification, are permitted provided that the following conditions are met:
7 * Redistributions of source code must retain the above copyright
8 notice, this list of conditions and the following disclaimer.
9 * Redistributions in binary form must reproduce the above copyright
10 notice, this list of conditions and the following disclaimer in the
11 documentation and/or other materials provided with the distribution.
12 * Neither the name of the copyright holder nor the
13 names of its contributors may be used to endorse or promote products
14 derived from this software without specific prior written permission.
15
16THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
17ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
19DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY
20DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
21(INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
22LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
23ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
24(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
25SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26*/
27
28#include "vcos.h"
29#include "vcos_msgqueue.h"
30#include <stddef.h>
31#include <string.h>
32#include <stdio.h>
33
34#define MAGIC VCOS_MSGQ_MAGIC
35
/* Keep sizeof(VCOS_MSG_T) a multiple of 8 so that doubles are naturally
 * aligned without problem; the assertion below rejects any other size.
 */
vcos_static_assert((sizeof(VCOS_MSG_T) & 7) == 0);

/* Reply handlers that are referenced before they are defined further
 * down in this file.
 */
static void vcos_msgq_pool_on_reply(VCOS_MSG_WAITER_T *waiter,
                                    VCOS_MSG_T *msg);
static void vcos_msgq_queue_waiter_on_reply(VCOS_MSG_WAITER_T *waiter,
                                            VCOS_MSG_T *msg);
45
/** Simple reply protocol. The client creates a semaphore and waits
 * for it. No queuing of multiple replies is possible, but nothing needs
 * to be set up in advance. Because creating semaphores is very fast on
 * VideoCore there's no need to do anything elaborate to optimize create
 * time - this might need revisiting on other platforms.
 */

typedef struct
{
   /* Generic waiter header. The reply handler receives a pointer to this
    * member and casts it back to the enclosing structure.
    */
   VCOS_MSG_WAITER_T waiter;
   /* Posted by the reply handler; vcos_msg_sendwait() blocks on it. */
   VCOS_SEMAPHORE_T waitsem;
} VCOS_MSG_SIMPLE_WAITER_T;
58
59static void vcos_msgq_simple_waiter_on_reply(VCOS_MSG_WAITER_T *waiter,
60 VCOS_MSG_T *msg)
61{
62 VCOS_MSG_SIMPLE_WAITER_T *self;
63 (void)msg;
64 self = (VCOS_MSG_SIMPLE_WAITER_T*)waiter;
65 vcos_semaphore_post(&self->waitsem);
66}
67
68static VCOS_STATUS_T vcos_msgq_simple_waiter_init(VCOS_MSG_SIMPLE_WAITER_T *waiter)
69{
70 VCOS_STATUS_T status;
71 status = vcos_semaphore_create(&waiter->waitsem, "waiter", 0);
72 waiter->waiter.on_reply = vcos_msgq_simple_waiter_on_reply;
73 return status;
74}
75
76static void vcos_msgq_simple_waiter_deinit(VCOS_MSG_SIMPLE_WAITER_T *waiter)
77{
78 vcos_semaphore_delete(&waiter->waitsem);
79}
80
81/*
82 * Message queues
83 */
84
85static VCOS_STATUS_T vcos_msgq_create_internal(VCOS_MSGQUEUE_T *q, const char *name)
86{
87 VCOS_STATUS_T st;
88
89 memset(q, 0, sizeof(*q));
90
91 q->waiter.on_reply = vcos_msgq_queue_waiter_on_reply;
92 st = vcos_semaphore_create(&q->sem, name, 0);
93 if (st != VCOS_SUCCESS)
94 goto fail_sem;
95
96 st = vcos_mutex_create(&q->lock, name);
97 if (st != VCOS_SUCCESS)
98 goto fail_mtx;
99
100 return st;
101
102fail_mtx:
103 vcos_semaphore_delete(&q->sem);
104fail_sem:
105 return st;
106}
107
108static void vcos_msgq_delete_internal(VCOS_MSGQUEUE_T *q)
109{
110 vcos_semaphore_delete(&q->sem);
111 vcos_mutex_delete(&q->lock);
112}
113
114VCOS_STATUS_T vcos_msgq_create(VCOS_MSGQUEUE_T *q, const char *name)
115{
116 VCOS_STATUS_T st;
117
118 st = vcos_msgq_create_internal(q, name);
119
120 return st;
121}
122
/* Delete a message queue created with vcos_msgq_create(), releasing the
 * queue's semaphore and mutex.
 */
void vcos_msgq_delete(VCOS_MSGQUEUE_T *q)
{
   vcos_msgq_delete_internal(q);
}
127
128/* append a message to a message queue */
129static _VCOS_INLINE void msgq_append(VCOS_MSGQUEUE_T *q, VCOS_MSG_T *msg)
130{
131 vcos_mutex_lock(&q->lock);
132 if (q->head == NULL)
133 {
134 q->head = q->tail = msg;
135 }
136 else
137 {
138 q->tail->next = msg;
139 q->tail = msg;
140 }
141 vcos_mutex_unlock(&q->lock);
142}
143
144/*
145 * A waiter for a message queue. Just appends the message to the
146 * queue, waking up the waiting thread.
147 */
148static void vcos_msgq_queue_waiter_on_reply(VCOS_MSG_WAITER_T *waiter,
149 VCOS_MSG_T *msg)
150{
151 VCOS_MSGQUEUE_T *queue = (VCOS_MSGQUEUE_T*)waiter;
152 msgq_append(queue, msg);
153 vcos_semaphore_post(&queue->sem);
154}
155
/* Initialise this library. There is currently nothing to set up, so
 * this always returns VCOS_SUCCESS.
 */

VCOS_STATUS_T vcos_msgq_init(void)
{
   return VCOS_SUCCESS;
}
162
/* Counterpart of vcos_msgq_init(); currently has nothing to tear down. */
void vcos_msgq_deinit(void)
{
}
166
167static _VCOS_INLINE
168void vcos_msg_send_helper(VCOS_MSG_WAITER_T *waiter,
169 VCOS_MSGQUEUE_T *dest,
170 uint32_t code,
171 VCOS_MSG_T *msg)
172{
173 vcos_assert(msg);
174 vcos_assert(dest);
175
176 msg->code = code;
177 if (waiter)
178 msg->waiter = waiter;
179 msg->next = NULL;
180 msg->src_thread = vcos_thread_current();
181
182 msgq_append(dest, msg);
183 vcos_semaphore_post(&dest->sem);
184}
185
186/* wait on a queue for a message */
187VCOS_MSG_T *vcos_msg_wait(VCOS_MSGQUEUE_T *queue)
188{
189 VCOS_MSG_T *msg;
190 vcos_semaphore_wait(&queue->sem);
191 vcos_mutex_lock(&queue->lock);
192
193 msg = queue->head;
194 vcos_assert(msg); /* should always be a message here! */
195
196 queue->head = msg->next;
197 if (queue->head == NULL)
198 queue->tail = NULL;
199
200 vcos_mutex_unlock(&queue->lock);
201 return msg;
202}
203
204/* peek on a queue for a message */
205VCOS_MSG_T *vcos_msg_peek(VCOS_MSGQUEUE_T *queue)
206{
207 VCOS_MSG_T *msg;
208 vcos_mutex_lock(&queue->lock);
209
210 msg = queue->head;
211
212 /* if there's a message, remove it from the queue */
213 if (msg)
214 {
215 queue->head = msg->next;
216 if (queue->head == NULL)
217 queue->tail = NULL;
218
219 /* keep the semaphore count consistent */
220
221 /* coverity[lock_order]
222 * the semaphore must have a non-zero count so cannot block here.
223 */
224 vcos_semaphore_wait(&queue->sem);
225 }
226
227 vcos_mutex_unlock(&queue->lock);
228 return msg;
229}
230
231void vcos_msg_send(VCOS_MSGQUEUE_T *dest, uint32_t code, VCOS_MSG_T *msg)
232{
233 vcos_assert(msg->magic == MAGIC);
234 vcos_msg_send_helper(NULL, dest, code, msg);
235}
236
237/** Send on to the target queue, then wait on a simple waiter for the reply
238 */
239VCOS_STATUS_T vcos_msg_sendwait(VCOS_MSGQUEUE_T *dest, uint32_t code, VCOS_MSG_T *msg)
240{
241 VCOS_STATUS_T st;
242 VCOS_MSG_SIMPLE_WAITER_T waiter;
243
244 vcos_assert(msg->magic == MAGIC);
245
246 /* if this fires, you've set a waiter up but are now about to obliterate it
247 * with the 'wait for a reply' waiter.
248 */
249 vcos_assert(msg->waiter == NULL);
250
251 if ((st=vcos_msgq_simple_waiter_init(&waiter)) != VCOS_SUCCESS)
252 return st;
253
254 vcos_msg_send_helper(&waiter.waiter, dest, code, msg);
255 vcos_semaphore_wait(&waiter.waitsem);
256 vcos_msgq_simple_waiter_deinit(&waiter);
257
258 return VCOS_SUCCESS;
259}
260
261/** Send a reply to a message
262 */
263void vcos_msg_reply(VCOS_MSG_T *msg)
264{
265 vcos_assert(msg->magic == MAGIC);
266 msg->code |= MSG_REPLY_BIT;
267 if (msg->waiter)
268 {
269 msg->waiter->on_reply(msg->waiter, msg);
270 }
271 else
272 {
273 VCOS_ALERT("%s: reply to non-reply message id %d",
274 VCOS_FUNCTION,
275 msg->code);
276 vcos_assert(0);
277 }
278}
279
280void vcos_msg_set_source(VCOS_MSG_T *msg, VCOS_MSGQUEUE_T *queue)
281{
282 vcos_assert(msg);
283 vcos_assert(msg->magic == MAGIC);
284 vcos_assert(queue);
285 msg->waiter = &queue->waiter;
286}
287
288/*
289 * Message pools
290 */
291
292VCOS_STATUS_T vcos_msgq_pool_create(VCOS_MSGQ_POOL_T *pool,
293 size_t count,
294 size_t payload_size,
295 const char *name)
296{
297 VCOS_STATUS_T status;
298 int bp_size = payload_size + sizeof(VCOS_MSG_T);
299 status = vcos_blockpool_create_on_heap(&pool->blockpool,
300 count, bp_size,
301 VCOS_BLOCKPOOL_ALIGN_DEFAULT,
302 0,
303 name);
304 if (status != VCOS_SUCCESS)
305 goto fail_pool;
306
307 status = vcos_semaphore_create(&pool->sem, name, count);
308 if (status != VCOS_SUCCESS)
309 goto fail_sem;
310
311 pool->waiter.on_reply = vcos_msgq_pool_on_reply;
312 pool->magic = MAGIC;
313 return status;
314
315fail_sem:
316 vcos_blockpool_delete(&pool->blockpool);
317fail_pool:
318 return status;
319}
320
321void vcos_msgq_pool_delete(VCOS_MSGQ_POOL_T *pool)
322{
323 vcos_blockpool_delete(&pool->blockpool);
324 vcos_semaphore_delete(&pool->sem);
325}
326
327/** Called when a message from a pool is replied-to. Just returns
328 * the message back to the blockpool.
329 */
330static void vcos_msgq_pool_on_reply(VCOS_MSG_WAITER_T *waiter,
331 VCOS_MSG_T *msg)
332{
333 vcos_unused(waiter);
334 vcos_assert(msg->magic == MAGIC);
335 vcos_msgq_pool_free(msg);
336}
337
338VCOS_MSG_T *vcos_msgq_pool_alloc(VCOS_MSGQ_POOL_T *pool)
339{
340 VCOS_MSG_T *msg;
341 if (vcos_semaphore_trywait(&pool->sem) == VCOS_SUCCESS)
342 {
343 msg = vcos_blockpool_calloc(&pool->blockpool);
344 vcos_assert(msg);
345 msg->magic = MAGIC;
346 msg->waiter = &pool->waiter;
347 msg->pool = pool;
348 }
349 else
350 {
351 msg = NULL;
352 }
353 return msg;
354}
355
356void vcos_msgq_pool_free(VCOS_MSG_T *msg)
357{
358 if (msg)
359 {
360 VCOS_MSGQ_POOL_T *pool;
361 vcos_assert(msg->pool);
362
363 pool = msg->pool;
364 vcos_assert(msg->pool->magic == MAGIC);
365
366 vcos_blockpool_free(msg);
367 vcos_semaphore_post(&pool->sem);
368 }
369}
370
371VCOS_MSG_T *vcos_msgq_pool_wait(VCOS_MSGQ_POOL_T *pool)
372{
373 VCOS_MSG_T *msg;
374 vcos_semaphore_wait(&pool->sem);
375 msg = vcos_blockpool_calloc(&pool->blockpool);
376 vcos_assert(msg);
377 msg->magic = MAGIC;
378 msg->waiter = &pool->waiter;
379 msg->pool = pool;
380 return msg;
381}
382
383void vcos_msg_init(VCOS_MSG_T *msg)
384{
385 msg->magic = MAGIC;
386 msg->next = NULL;
387 msg->waiter = NULL;
388 msg->pool = NULL;
389}
390