1/*
2Copyright (c) 2012, Broadcom Europe Ltd
3All rights reserved.
4
5Redistribution and use in source and binary forms, with or without
6modification, are permitted provided that the following conditions are met:
7 * Redistributions of source code must retain the above copyright
8 notice, this list of conditions and the following disclaimer.
9 * Redistributions in binary form must reproduce the above copyright
10 notice, this list of conditions and the following disclaimer in the
11 documentation and/or other materials provided with the distribution.
12 * Neither the name of the copyright holder nor the
13 names of its contributors may be used to endorse or promote products
14 derived from this software without specific prior written permission.
15
16THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
17ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
19DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY
20DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
21(INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
22LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
23ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
24(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
25SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26*/
27
28#include <stdlib.h>
29#include <string.h>
30#include <stdio.h>
31
32#include "containers/containers.h"
33#include "containers/core/containers_io.h"
34#include "containers/core/containers_common.h"
35#include "containers/core/containers_utils.h"
36#include "containers/core/containers_uri.h"
37
/* Sizing limits for the container I/O caches defined in this file. */
#define MAX_NUM_CACHED_AREAS 16 /* Capacity of the cached_areas[] array */
#define MAX_NUM_MEMORY_AREAS 4 /* Default number of memory areas requested for asynchronous writes */
#define NUM_TMP_MEMORY_AREAS 2 /* Number of memory areas requested when writing a ".tmp" file */
#define MEM_CACHE_READ_MAX_SIZE (32*1024) /* Main cache size in read mode. Needs to be a power of 2 */
#define MEM_CACHE_WRITE_MAX_SIZE (128*1024) /* Main cache size in write mode. Needs to be a power of 2 */
#define MEM_CACHE_TMP_MAX_SIZE (32*1024) /* Main cache size when writing a ".tmp" file. Needs to be a power of 2 */
#define MEM_CACHE_ALIGNMENT (1*1024) /* Stream offset alignment used when flushing. Needs to be a power of 2 */
#define MEM_CACHE_AREA_READ_MAX_SIZE (4*1024*1024) /* Largest cached area for non-seekable or slow-seeking i/o. Needs to be a power of 2 */
46
/** State of one cache: either the main read/write cache or one of the
 * explicitly cached areas created by vc_container_io_cache(). */
typedef struct VC_CONTAINER_IO_PRIVATE_CACHE_T
{
   int64_t start;     /**< Offset to the start of the cached area in the stream */
   int64_t end;       /**< Offset to the end of the cached area in the stream (exclusive when matched by vc_container_io_seek) */

   int64_t offset;    /**< Offset of the currently cached data in the stream */
   size_t size;       /**< Number of valid bytes currently held at buffer */
   bool dirty;        /**< Whether the cache is dirty and needs to be written back */

   size_t position;   /**< Current position in the cache, relative to buffer */

   uint8_t *buffer;     /**< Pointer to the start of the valid cache area (may be shifted past mem for alignment) */
   uint8_t *buffer_end; /**< Pointer to the end of the cache */

   unsigned int mem_max_size; /**< Maximum size of the memory cache */
   unsigned int mem_size;     /**< Size of the memory cache */
   uint8_t *mem;              /**< Pointer to the memory cache */

   VC_CONTAINER_IO_T *io;     /**< I/O context whose pf_read/pf_write/pf_seek back this cache */

} VC_CONTAINER_IO_PRIVATE_CACHE_T;
68
/** Private data attached to every VC_CONTAINER_IO_T created by this file. */
typedef struct VC_CONTAINER_IO_PRIVATE_T
{
   VC_CONTAINER_IO_PRIVATE_CACHE_T *cache; /**< Current cache, or null when reads/writes go straight to the i/o */

   unsigned int caches_num;                /**< Non-zero once the main cache memory has been allocated */
   VC_CONTAINER_IO_PRIVATE_CACHE_T caches; /**< The main cache */

   unsigned int cached_areas_num;          /**< Number of entries in use in cached_areas */
   VC_CONTAINER_IO_PRIVATE_CACHE_T cached_areas[MAX_NUM_CACHED_AREAS]; /**< Areas added by vc_container_io_cache() */

   int64_t actual_offset;                  /**< Stream offset tracked across pf_read/pf_write/pf_seek calls on the i/o */

   struct VC_CONTAINER_IO_ASYNC_T *async_io; /**< Asynchronous writer, or null when writes are synchronous */

} VC_CONTAINER_IO_PRIVATE_T;
84
85/*****************************************************************************/
/* Open functions of the individual i/o modules. They are not defined in this
 * part of the file; vc_container_io_open_core() tries them one after another
 * until one reports success. */
VC_CONTAINER_STATUS_T vc_container_io_file_open( VC_CONTAINER_IO_T *p_ctx, const char *uri,
   VC_CONTAINER_IO_MODE_T mode );
VC_CONTAINER_STATUS_T vc_container_io_null_open( VC_CONTAINER_IO_T *p_ctx, const char *uri,
   VC_CONTAINER_IO_MODE_T mode );
VC_CONTAINER_STATUS_T vc_container_io_net_open( VC_CONTAINER_IO_T *p_ctx, const char *uri,
   VC_CONTAINER_IO_MODE_T mode );
VC_CONTAINER_STATUS_T vc_container_io_pktfile_open( VC_CONTAINER_IO_T *p_ctx, const char *uri,
   VC_CONTAINER_IO_MODE_T mode );
/* Only tried when ENABLE_CONTAINER_IO_HTTP is defined */
VC_CONTAINER_STATUS_T vc_container_io_http_open( VC_CONTAINER_IO_T *p_ctx, const char *uri,
   VC_CONTAINER_IO_MODE_T mode );
/* Seek implementation installed when the opened i/o cannot seek */
static VC_CONTAINER_STATUS_T io_seek_not_seekable(VC_CONTAINER_IO_T *p_ctx, int64_t offset);

/* Cache helpers, defined further down in this file */
static size_t vc_container_io_cache_read( VC_CONTAINER_IO_T *p_ctx,
   VC_CONTAINER_IO_PRIVATE_CACHE_T *cache, uint8_t *data, size_t size );
static int32_t vc_container_io_cache_write( VC_CONTAINER_IO_T *p_ctx,
   VC_CONTAINER_IO_PRIVATE_CACHE_T *cache, const uint8_t *data, size_t size );
static VC_CONTAINER_STATUS_T vc_container_io_cache_seek( VC_CONTAINER_IO_T *p_ctx,
   VC_CONTAINER_IO_PRIVATE_CACHE_T *cache, int64_t offset );
static size_t vc_container_io_cache_refill( VC_CONTAINER_IO_T *p_ctx,
   VC_CONTAINER_IO_PRIVATE_CACHE_T *cache );
static size_t vc_container_io_cache_flush( VC_CONTAINER_IO_T *p_ctx,
   VC_CONTAINER_IO_PRIVATE_CACHE_T *cache, int complete );

/* Asynchronous write helpers */
static struct VC_CONTAINER_IO_ASYNC_T *async_io_start( VC_CONTAINER_IO_T *io, int num_areas, VC_CONTAINER_STATUS_T * );
static VC_CONTAINER_STATUS_T async_io_stop( struct VC_CONTAINER_IO_ASYNC_T *ctx );
static int async_io_write( struct VC_CONTAINER_IO_ASYNC_T *ctx, VC_CONTAINER_IO_PRIVATE_CACHE_T *cache );
static VC_CONTAINER_STATUS_T async_io_wait_complete( struct VC_CONTAINER_IO_ASYNC_T *ctx,
   VC_CONTAINER_IO_PRIVATE_CACHE_T *cache, int complete );
static void async_io_stats_initialise( struct VC_CONTAINER_IO_ASYNC_T *ctx, int enable );
static void async_io_stats_get( struct VC_CONTAINER_IO_ASYNC_T *ctx, VC_CONTAINER_WRITE_STATS_T *stats );
116
117/*****************************************************************************/
118static VC_CONTAINER_IO_T *vc_container_io_open_core( const char *uri, VC_CONTAINER_IO_MODE_T mode,
119 VC_CONTAINER_IO_CAPABILITIES_T capabilities,
120 bool b_open, VC_CONTAINER_STATUS_T *p_status )
121{
122 VC_CONTAINER_STATUS_T status = VC_CONTAINER_SUCCESS;
123 VC_CONTAINER_IO_T *p_ctx = 0;
124 VC_CONTAINER_IO_PRIVATE_T *private = 0;
125 unsigned int uri_length, caches = 0, cache_max_size, num_areas = MAX_NUM_MEMORY_AREAS;
126
127 /* XXX */
128 uri_length = strlen(uri) + 1;
129
130 /* Allocate our context before trying out the different io modules */
131 p_ctx = malloc( sizeof(*p_ctx) + sizeof(*private) + uri_length);
132 if(!p_ctx) { status = VC_CONTAINER_ERROR_OUT_OF_MEMORY; goto error; }
133 memset(p_ctx, 0, sizeof(*p_ctx) + sizeof(*private) + uri_length );
134 p_ctx->priv = private = (VC_CONTAINER_IO_PRIVATE_T *)&p_ctx[1];
135 p_ctx->uri = (char *)&private[1];
136 memcpy((char *)p_ctx->uri, uri, uri_length);
137 p_ctx->uri_parts = vc_uri_create();
138 if(!p_ctx->uri_parts) { status = VC_CONTAINER_ERROR_OUT_OF_MEMORY; goto error; }
139 vc_uri_parse(p_ctx->uri_parts, uri);
140
141 if (b_open)
142 {
143 /* Open the actual i/o module */
144 status = vc_container_io_null_open(p_ctx, uri, mode);
145 if(status) status = vc_container_io_net_open(p_ctx, uri, mode);
146 if(status) status = vc_container_io_pktfile_open(p_ctx, uri, mode);
147#ifdef ENABLE_CONTAINER_IO_HTTP
148 if(status) status = vc_container_io_http_open(p_ctx, uri, mode);
149#endif
150 if(status) status = vc_container_io_file_open(p_ctx, uri, mode);
151 if(status != VC_CONTAINER_SUCCESS) goto error;
152
153 if(!p_ctx->pf_seek || (p_ctx->capabilities & VC_CONTAINER_IO_CAPS_CANT_SEEK))
154 {
155 p_ctx->capabilities |= VC_CONTAINER_IO_CAPS_CANT_SEEK;
156 p_ctx->pf_seek = io_seek_not_seekable;
157 }
158 }
159 else
160 {
161 /* We're only creating an empty container i/o */
162 p_ctx->capabilities = capabilities;
163 }
164
165 if(p_ctx->capabilities & VC_CONTAINER_IO_CAPS_NO_CACHING)
166 caches = 1;
167
168 if(mode == VC_CONTAINER_IO_MODE_WRITE) cache_max_size = MEM_CACHE_WRITE_MAX_SIZE;
169 else cache_max_size = MEM_CACHE_READ_MAX_SIZE;
170
171 if(mode == VC_CONTAINER_IO_MODE_WRITE &&
172 vc_uri_path_extension(p_ctx->uri_parts) &&
173 !strcasecmp(vc_uri_path_extension(p_ctx->uri_parts), "tmp"))
174 {
175 caches = 1;
176 cache_max_size = MEM_CACHE_TMP_MAX_SIZE;
177 num_areas = NUM_TMP_MEMORY_AREAS;
178 }
179
180 /* Check if the I/O needs caching */
181 if(caches)
182 {
183 VC_CONTAINER_IO_PRIVATE_CACHE_T *cache = &p_ctx->priv->caches;
184 cache->mem_max_size = cache_max_size;
185 cache->mem_size = cache->mem_max_size;
186 cache->io = p_ctx;
187 cache->mem = malloc(p_ctx->priv->caches.mem_size);
188 if(cache->mem)
189 {
190 cache->buffer = cache->mem;
191 cache->buffer_end = cache->mem + cache->mem_size;
192 p_ctx->priv->caches_num = 1;
193 }
194 }
195
196 if(p_ctx->priv->caches_num)
197 p_ctx->priv->cache = &p_ctx->priv->caches;
198
199
200 /* Try to start an asynchronous io if we're in write mode and we've got at least 2 cache memory areas */
201 if(mode == VC_CONTAINER_IO_MODE_WRITE && p_ctx->priv->cache && num_areas >= 2)
202 p_ctx->priv->async_io = async_io_start( p_ctx, num_areas, 0 );
203
204 end:
205 if(p_status) *p_status = status;
206 return p_ctx;
207
208 error:
209 if(p_ctx) vc_uri_release(p_ctx->uri_parts);
210 if(p_ctx) free(p_ctx);
211 p_ctx = 0;
212 goto end;
213}
214
215/*****************************************************************************/
216VC_CONTAINER_IO_T *vc_container_io_open( const char *uri, VC_CONTAINER_IO_MODE_T mode,
217 VC_CONTAINER_STATUS_T *p_status )
218{
219 return vc_container_io_open_core( uri, mode, 0, true, p_status );
220}
221
222/*****************************************************************************/
223VC_CONTAINER_IO_T *vc_container_io_create( const char *uri, VC_CONTAINER_IO_MODE_T mode,
224 VC_CONTAINER_IO_CAPABILITIES_T capabilities,
225 VC_CONTAINER_STATUS_T *p_status )
226{
227 return vc_container_io_open_core( uri, mode, capabilities, false, p_status );
228}
229
230/*****************************************************************************/
231VC_CONTAINER_STATUS_T vc_container_io_close( VC_CONTAINER_IO_T *p_ctx )
232{
233 unsigned int i;
234
235 if(p_ctx)
236 {
237 if(p_ctx->priv)
238 {
239 if(p_ctx->priv->caches_num)
240 {
241 if(p_ctx->priv->caches.dirty)
242 vc_container_io_cache_flush( p_ctx, &p_ctx->priv->caches, 1 );
243 }
244
245 if(p_ctx->priv->async_io)
246 async_io_stop( p_ctx->priv->async_io );
247 else if(p_ctx->priv->caches_num)
248 free(p_ctx->priv->caches.mem);
249
250 for(i = 0; i < p_ctx->priv->cached_areas_num; i++)
251 free(p_ctx->priv->cached_areas[i].mem);
252
253 if(p_ctx->pf_close)
254 p_ctx->pf_close(p_ctx);
255 }
256 vc_uri_release(p_ctx->uri_parts);
257 free(p_ctx);
258 }
259 return VC_CONTAINER_SUCCESS;
260}
261
262/*****************************************************************************/
263size_t vc_container_io_peek(VC_CONTAINER_IO_T *p_ctx, void *buffer, size_t size)
264{
265 size_t ret;
266
267 if(p_ctx->priv->cache)
268 {
269 /* FIXME: do something a bit more clever than this */
270 int64_t offset = p_ctx->offset;
271 ret = vc_container_io_read(p_ctx, buffer, size);
272 vc_container_io_seek(p_ctx, offset);
273 return ret;
274 }
275
276 if (p_ctx->capabilities & VC_CONTAINER_IO_CAPS_CANT_SEEK)
277 return 0;
278
279 ret = p_ctx->pf_read(p_ctx, buffer, size);
280 p_ctx->pf_seek(p_ctx, p_ctx->offset);
281 return ret;
282}
283
284/*****************************************************************************/
285size_t vc_container_io_read(VC_CONTAINER_IO_T *p_ctx, void *buffer, size_t size)
286{
287 size_t ret;
288
289 if(p_ctx->priv->cache)
290 ret = vc_container_io_cache_read( p_ctx, p_ctx->priv->cache, (uint8_t*)buffer, size );
291 else
292 {
293 ret = p_ctx->pf_read(p_ctx, buffer, size);
294 p_ctx->priv->actual_offset += ret;
295 }
296
297 p_ctx->offset += ret;
298 return ret;
299}
300
301/*****************************************************************************/
302size_t vc_container_io_write(VC_CONTAINER_IO_T *p_ctx, const void *buffer, size_t size)
303{
304 int32_t ret;
305
306 if(p_ctx->priv->cache)
307 ret = vc_container_io_cache_write( p_ctx, p_ctx->priv->cache, (const uint8_t*)buffer, size );
308 else
309 {
310 ret = p_ctx->pf_write(p_ctx, buffer, size);
311 p_ctx->priv->actual_offset += ret;
312 }
313
314 p_ctx->offset += ret;
315 return ret < 0 ? 0 : ret;
316}
317
318/*****************************************************************************/
319size_t vc_container_io_skip(VC_CONTAINER_IO_T *p_ctx, size_t size)
320{
321 if(!size) return 0;
322
323 if(size < 8)
324 {
325 uint8_t value[8];
326 return vc_container_io_read(p_ctx, value, size);
327 }
328
329 if(p_ctx->priv->cache)
330 {
331 if(vc_container_io_cache_seek(p_ctx, p_ctx->priv->cache, p_ctx->offset + size)) return 0;
332 p_ctx->offset += size;
333 return size;
334 }
335
336 if(vc_container_io_seek(p_ctx, p_ctx->offset + size)) return 0;
337 return size;
338}
339
340/*****************************************************************************/
341VC_CONTAINER_STATUS_T vc_container_io_seek(VC_CONTAINER_IO_T *p_ctx, int64_t offset)
342{
343 VC_CONTAINER_STATUS_T status;
344 unsigned int i;
345
346 /* Check if the requested position is in one of the cached areas */
347 for(i = 0; i < p_ctx->priv->cached_areas_num; i++)
348 {
349 VC_CONTAINER_IO_PRIVATE_CACHE_T *cache = &p_ctx->priv->cached_areas[i];
350 if(offset >= cache->start && offset < cache->end)
351 {
352 p_ctx->priv->cache = cache;
353 break;
354 }
355 }
356 if(i == p_ctx->priv->cached_areas_num)
357 p_ctx->priv->cache = p_ctx->priv->caches_num ? &p_ctx->priv->caches : 0;
358
359 if(p_ctx->priv->cache)
360 {
361 status = vc_container_io_cache_seek( p_ctx, p_ctx->priv->cache, offset );
362 if(status == VC_CONTAINER_SUCCESS) p_ctx->offset = offset;
363 return status;
364 }
365
366 if(p_ctx->status == VC_CONTAINER_SUCCESS &&
367 offset == p_ctx->offset) return VC_CONTAINER_SUCCESS;
368
369 status = p_ctx->pf_seek(p_ctx, offset);
370 if(status == VC_CONTAINER_SUCCESS) p_ctx->offset = offset;
371 p_ctx->priv->actual_offset = p_ctx->offset;
372 return status;
373}
374
375/*****************************************************************************/
376static VC_CONTAINER_STATUS_T io_seek_not_seekable(VC_CONTAINER_IO_T *p_ctx, int64_t offset)
377{
378 VC_CONTAINER_IO_PRIVATE_T *private = p_ctx->priv;
379
380 vc_container_assert(offset >= private->actual_offset);
381 if(offset == private->actual_offset) return VC_CONTAINER_SUCCESS;
382
383 if(offset < private->actual_offset)
384 {
385 p_ctx->status = VC_CONTAINER_ERROR_EOS;
386 return p_ctx->status;
387 }
388
389 offset -= private->actual_offset;
390 while(offset && !p_ctx->status)
391 {
392 uint8_t value[64];
393 unsigned int ret, size = MIN(offset, 64);
394 ret = p_ctx->pf_read(p_ctx, value, size);
395 if(ret != size) p_ctx->status = VC_CONTAINER_ERROR_EOS;
396 offset -= ret;
397 }
398 return p_ctx->status;
399}
400
401/*****************************************************************************/
402VC_CONTAINER_STATUS_T vc_container_io_control_list(VC_CONTAINER_IO_T *context, VC_CONTAINER_CONTROL_T operation, va_list args)
403{
404 VC_CONTAINER_STATUS_T status = VC_CONTAINER_ERROR_UNSUPPORTED_OPERATION;
405
406 if (context->pf_control)
407 status = context->pf_control(context, operation, args);
408
409 /* Option to add generic I/O control here */
410
411 if(operation == VC_CONTAINER_CONTROL_IO_FLUSH && context->priv->cache)
412 {
413 status = VC_CONTAINER_SUCCESS;
414 (void)vc_container_io_cache_flush( context, context->priv->cache, 1 );
415 }
416
417 if(operation == VC_CONTAINER_CONTROL_SET_IO_PERF_STATS && context->priv->async_io)
418 {
419 status = VC_CONTAINER_SUCCESS;
420 async_io_stats_initialise(context->priv->async_io, va_arg(args, int));
421 }
422
423 if(operation == VC_CONTAINER_CONTROL_GET_IO_PERF_STATS && context->priv->async_io)
424 {
425 status = VC_CONTAINER_SUCCESS;
426 async_io_stats_get(context->priv->async_io, va_arg(args, VC_CONTAINER_WRITE_STATS_T *));
427 }
428
429 return status;
430}
431
432/*****************************************************************************/
433VC_CONTAINER_STATUS_T vc_container_io_control(VC_CONTAINER_IO_T *context, VC_CONTAINER_CONTROL_T operation, ...)
434{
435 VC_CONTAINER_STATUS_T result;
436 va_list args;
437
438 va_start(args, operation);
439 result = vc_container_io_control_list(context, operation, args);
440 va_end(args);
441
442 return result;
443}
444
445/*****************************************************************************/
446size_t vc_container_io_cache(VC_CONTAINER_IO_T *p_ctx, size_t size)
447{
448 VC_CONTAINER_IO_PRIVATE_T *private = p_ctx->priv;
449 VC_CONTAINER_IO_PRIVATE_CACHE_T *cache, *main_cache;
450 VC_CONTAINER_STATUS_T status;
451
452 /* Sanity checking */
453 if(private->cached_areas_num >= MAX_NUM_CACHED_AREAS) return 0;
454
455 cache = &private->cached_areas[private->cached_areas_num];
456 cache->start = p_ctx->offset;
457 cache->end = cache->start + size;
458 cache->offset = p_ctx->offset;
459 cache->position = 0;
460 cache->size = 0;
461 cache->io = p_ctx;
462
463 /* Set the size of the cache area depending on the capabilities of the i/o */
464 if(p_ctx->capabilities & VC_CONTAINER_IO_CAPS_CANT_SEEK)
465 cache->mem_max_size = MEM_CACHE_AREA_READ_MAX_SIZE;
466 else if((p_ctx->capabilities & VC_CONTAINER_IO_CAPS_SEEK_SLOW) &&
467 size <= MEM_CACHE_AREA_READ_MAX_SIZE)
468 cache->mem_max_size = MEM_CACHE_AREA_READ_MAX_SIZE;
469 else
470 cache->mem_max_size = MEM_CACHE_READ_MAX_SIZE;
471
472 cache->mem_size = size;
473 if(cache->mem_size > cache->mem_max_size) cache->mem_size = cache->mem_max_size;
474 cache->mem = malloc(cache->mem_size);
475
476 cache->buffer = cache->mem;
477 cache->buffer_end = cache->mem + cache->mem_size;
478
479 if(!cache->mem) return 0;
480 private->cached_areas_num++;
481
482 /* Copy any data we've got in the current cache into the new cache */
483 main_cache = p_ctx->priv->cache;
484 if(main_cache && main_cache->position < main_cache->size)
485 {
486 cache->size = main_cache->size - main_cache->position;
487 if(cache->size > cache->mem_size) cache->size = cache->mem_size;
488 memcpy(cache->buffer, main_cache->buffer + main_cache->position, cache->size);
489 main_cache->position += cache->size;
490 }
491
492 /* Read the rest of the cache directly from the stream */
493 if(cache->mem_size > cache->size)
494 {
495 size_t ret = cache->io->pf_read(cache->io, cache->buffer + cache->size,
496 cache->mem_size - cache->size);
497 cache->size += ret;
498 cache->io->priv->actual_offset = cache->offset + cache->size;
499 }
500
501 status = vc_container_io_seek(p_ctx, cache->end);
502 if(status != VC_CONTAINER_SUCCESS)
503 return 0;
504
505 if(p_ctx->capabilities & VC_CONTAINER_IO_CAPS_CANT_SEEK)
506 return cache->size;
507 else
508 return size;
509}
510
511/*****************************************************************************/
512static size_t vc_container_io_cache_refill( VC_CONTAINER_IO_T *p_ctx,
513 VC_CONTAINER_IO_PRIVATE_CACHE_T *cache )
514{
515 size_t ret = vc_container_io_cache_flush( p_ctx, cache, 1 );
516
517 if(ret) return 0; /* TODO what should we do there ? */
518
519 if(p_ctx->priv->actual_offset != cache->offset)
520 {
521 if(cache->io->pf_seek(cache->io, cache->offset) != VC_CONTAINER_SUCCESS)
522 return 0;
523 }
524
525 ret = cache->io->pf_read(cache->io, cache->buffer, cache->buffer_end - cache->buffer);
526 cache->size = ret;
527 cache->position = 0;
528 cache->io->priv->actual_offset = cache->offset + ret;
529 return ret;
530}
531
532/*****************************************************************************/
533static size_t vc_container_io_cache_refill_bypass( VC_CONTAINER_IO_T *p_ctx,
534 VC_CONTAINER_IO_PRIVATE_CACHE_T *cache, uint8_t *buffer, size_t size )
535{
536 size_t ret = vc_container_io_cache_flush( p_ctx, cache, 1 );
537
538 if(ret) return 0; /* TODO what should we do there ? */
539
540 if(p_ctx->priv->actual_offset != cache->offset)
541 {
542 if(cache->io->pf_seek(cache->io, cache->offset) != VC_CONTAINER_SUCCESS)
543 return 0;
544 }
545
546 ret = cache->io->pf_read(cache->io, buffer, size);
547 cache->size = cache->position = 0;
548 cache->offset += ret;
549 cache->io->priv->actual_offset = cache->offset;
550 return ret;
551}
552
553/*****************************************************************************/
554static size_t vc_container_io_cache_read( VC_CONTAINER_IO_T *p_ctx,
555 VC_CONTAINER_IO_PRIVATE_CACHE_T *cache, uint8_t *data, size_t size )
556{
557 size_t read = 0, bytes, ret;
558
559 while(size)
560 {
561 bytes = cache->size - cache->position; /* Bytes left in cache */
562
563#if 1 // FIXME Only if stream is seekable
564 /* Try to read directly from the stream if the cache just gets in the way */
565 if(!bytes && size > cache->mem_size)
566 {
567 bytes = cache->mem_size;
568 ret = vc_container_io_cache_refill_bypass( p_ctx, cache, data + read, bytes);
569 read += ret;
570
571 if(ret != bytes) /* We didn't read as many bytes as we had hoped */
572 goto end;
573
574 size -= bytes;
575 continue;
576 }
577#endif
578
579 /* Refill the cache if it is empty */
580 if(!bytes) bytes = vc_container_io_cache_refill( p_ctx, cache );
581 if(!bytes) goto end;
582
583 /* We do have some data in the cache so override the status */
584 p_ctx->status = VC_CONTAINER_SUCCESS;
585
586 /* Read data directly from the cache */
587 if(bytes > size) bytes = size;
588 memcpy(data + read, cache->buffer + cache->position, bytes);
589 cache->position += bytes;
590 read += bytes;
591 size -= bytes;
592 }
593
594 end:
595 vc_container_assert(cache->offset + cache->position == p_ctx->offset + read);
596 return read;
597}
598
599/*****************************************************************************/
600static int32_t vc_container_io_cache_write( VC_CONTAINER_IO_T *p_ctx,
601 VC_CONTAINER_IO_PRIVATE_CACHE_T *cache, const uint8_t *data, size_t size )
602{
603 int32_t written = 0;
604 size_t bytes, ret;
605
606 /* If we do not have a write cache then we need to flush it */
607 if(cache->size && !cache->dirty)
608 {
609 ret = vc_container_io_cache_flush( p_ctx, cache, 1 );
610 if(ret) return -(int32_t)ret;
611 }
612
613 while(size)
614 {
615 bytes = (cache->buffer_end - cache->buffer) - cache->position; /* Space left in cache */
616
617 /* Flush the cache if it is full */
618 if(!bytes)
619 {
620 /* Cache full, flush it */
621 ret = vc_container_io_cache_flush( p_ctx, cache, 0 );
622 if(ret)
623 {
624 written -= ret;
625 return written;
626 }
627 continue;
628 }
629
630 if(bytes > size) bytes = size;
631
632 if(!p_ctx->priv->async_io && bytes == cache->mem_size)
633 {
634 /* Write directly from the buffer */
635 ret = cache->io->pf_write(cache->io, data + written, bytes);
636 cache->offset += ret;
637 cache->io->priv->actual_offset += ret;
638 }
639 else
640 {
641 /* Write in the cache */
642 memcpy(cache->buffer + cache->position, data + written, bytes);
643 cache->position += bytes;
644 cache->dirty = 1;
645 ret = bytes;
646 }
647
648 written += ret;
649 if(ret != bytes) goto end;
650
651 size -= bytes;
652 }
653
654 end:
655 vc_container_assert(cache->offset + (int64_t)cache->position == p_ctx->offset + written);
656 if(cache->position > cache->size) cache->size = cache->position;
657 return written;
658}
659
660/*****************************************************************************/
/**
 * Move @p cache to the absolute stream position @p offset.
 *
 * Three cases are handled, in this order:
 *  1. @p offset lies inside the data currently cached: only the position in
 *     the cache changes, the underlying i/o is not touched.
 *  2. The cache is clean, non-empty, and @p offset lies in the gap between
 *     cache->mem and cache->buffer (the shift applied by the flush): that gap
 *     is read from the i/o so the cache grows backwards to cover the offset.
 *  3. Otherwise the cache is flushed, any asynchronous write is waited for,
 *     the i/o is repositioned and the cache is left empty at @p offset.
 *
 * @return VC_CONTAINER_SUCCESS, or the status of a failing pf_seek.
 */
static VC_CONTAINER_STATUS_T vc_container_io_cache_seek(VC_CONTAINER_IO_T *p_ctx,
   VC_CONTAINER_IO_PRIVATE_CACHE_T *cache, int64_t offset)
{
   VC_CONTAINER_STATUS_T status;
   size_t shift, ret;

   /* Check if the seek position is within our cache */
   if(offset >= cache->offset && offset < cache->offset + (int64_t)cache->size)
   {
      cache->position = offset - cache->offset;
      return VC_CONTAINER_SUCCESS;
   }

   /* Number of bytes at the front of the cache memory not currently in use */
   shift = cache->buffer - cache->mem;
   if(!cache->dirty && shift && cache->size &&
      offset >= cache->offset - (int64_t)shift && offset < cache->offset)
   {
      /* We need to refill the partial bit of the cache that we didn't take care of last time */
      status = cache->io->pf_seek(cache->io, cache->offset - shift);
      if(status != VC_CONTAINER_SUCCESS) return status;
      cache->offset -= shift;
      cache->buffer -= shift;

      ret = cache->io->pf_read(cache->io, cache->buffer, shift);
      vc_container_assert(ret == shift); /* FIXME: a short read is only caught by this assert */
      cache->size += shift;
      cache->position = offset - cache->offset;
      cache->io->priv->actual_offset = cache->offset + ret;
      return VC_CONTAINER_SUCCESS;
   }

   if(cache->dirty) vc_container_io_cache_flush( p_ctx, cache, 1 );
   // FIXME: what if all the data couldn't be flushed ?

   if(p_ctx->priv->async_io) async_io_wait_complete( p_ctx->priv->async_io, cache, 1 );

   status = cache->io->pf_seek(cache->io, offset);
   if(status != VC_CONTAINER_SUCCESS) return status;

   /* The cache is clean at this point, so this call only resets position and
    * size to 0 (and recomputes the buffer shift) before the offset is set */
   vc_container_io_cache_flush( p_ctx, cache, 1 );

   cache->offset = offset;
   cache->io->priv->actual_offset = offset;
   return VC_CONTAINER_SUCCESS;
}
706
707/*****************************************************************************/
/**
 * Write back the contents of @p cache when it is dirty, then empty the cache
 * and advance cache->offset past the data it held.
 *
 * @param p_ctx    I/O context owning the cache.
 * @param cache    Cache to flush.
 * @param complete Passed on to async_io_wait_complete(): non-zero waits for
 *                 every queued area, zero only for one free area.
 * @return cache->position minus the number of bytes written, i.e. 0 when all
 *         the data up to the current position was written or when there was
 *         nothing to write.
 *
 * NOTE(review): a failing pf_seek also returns 0 and leaves the cache dirty
 * and untouched, so callers cannot tell it apart from a successful flush.
 */
static size_t vc_container_io_cache_flush( VC_CONTAINER_IO_T *p_ctx,
   VC_CONTAINER_IO_PRIVATE_CACHE_T *cache, int complete )
{
   size_t ret = 0, shift;

   /* Writes may have moved the position beyond the recorded size */
   if(cache->position > cache->size) cache->size = cache->position;

   if(cache->dirty && cache->size)
   {
      /* Reposition the i/o when it is not where the cached data belongs */
      if(p_ctx->priv->actual_offset != cache->offset)
      {
         /* Pending asynchronous writes must land before the i/o is moved */
         if(p_ctx->priv->async_io) async_io_wait_complete( p_ctx->priv->async_io, cache, complete );

         if(cache->io->pf_seek(cache->io, cache->offset) != VC_CONTAINER_SUCCESS)
            return 0;
      }

      if(p_ctx->priv->async_io)
      {
         /* Hands the buffer to the writer thread; cache->mem/buffer are
          * switched to another memory area by this call */
         ret = async_io_write( p_ctx->priv->async_io, cache );
         if(async_io_wait_complete( p_ctx->priv->async_io, cache, complete ) != VC_CONTAINER_SUCCESS)
            ret = 0;
      }
      else
         ret = cache->io->pf_write(cache->io, cache->buffer, cache->size);

      cache->io->priv->actual_offset = cache->offset + ret;
      /* From here on ret is the number of bytes that were NOT written */
      ret = cache->position - ret;
   }
   cache->dirty = 0;

   cache->offset += cache->size;
   /* Only caches using their full memory get their buffer start shifted by
    * the low bits of the new offset (offset modulo MEM_CACHE_ALIGNMENT) */
   if(cache->mem_size == cache->mem_max_size)
   {
      shift = cache->offset &(MEM_CACHE_ALIGNMENT-1);
      cache->buffer = cache->mem + shift;
   }

   cache->position = cache->size = 0;
   return ret;
}
749
750/*****************************************************************************
751 * Asynchronous I/O.
752 * This is here to keep the I/O as busy as possible by allowing the writer
753 * to continue its work while the I/O is taking place in the background.
754 *****************************************************************************/
755
756#ifdef ENABLE_CONTAINERS_ASYNC_IO
757#include "vcos.h"
758
/* Scale n by the count c expressed in units of 2^s: yields n unchanged when c
 * is below 2^s, otherwise n divided by (c >> s). Every argument is
 * parenthesized so expression arguments keep their meaning; c and s are
 * evaluated more than once, so they must be free of side effects. */
#define NUMPC(c,n,s) ((c) < (1<<(s)) ? (n) : ((n) / ((c) >> (s))))
760
761static void stats_initialise(VC_CONTAINER_STATS_T *st, uint32_t shift)
762{
763 memset(st, 0, sizeof(VC_CONTAINER_STATS_T));
764 st->shift = shift;
765}
766
/* Add one sample (count, num) to the statistics block.
 *
 * Records are kept sorted by decreasing numpc, where numpc is
 * NUMPC(count, num, st->shift). A sample whose numpc equals that of an existing
 * record is merged into it; otherwise a new record is inserted. When the
 * insertion fills the extra slot at index VC_CONTAINER_STATS_BINS, the two
 * neighbouring records with the smallest numpc difference are merged so that
 * slot becomes free again. Samples with a zero count are ignored.
 *
 * NOTE(review): st->record[VC_CONTAINER_STATS_BINS] is read and written, so the
 * array is assumed to hold VC_CONTAINER_STATS_BINS + 1 entries - confirm
 * against the declaration of VC_CONTAINER_STATS_T. */
static void stats_add_value(VC_CONTAINER_STATS_T *st, uint32_t count, uint32_t num)
{
   uint32_t numpc;
   int i, j;

   if(count == 0)
      return;

   numpc = NUMPC(count, num, st->shift);
   // insert in the right place: skip used records with a larger numpc
   i=0;
   while(i < VC_CONTAINER_STATS_BINS && st->record[i].count != 0 && st->record[i].numpc > numpc)
      i++;

   if(st->record[i].count != 0 && st->record[i].numpc == numpc)
   {
      // equal numpc, can merge now
      st->record[i].count += count;
      st->record[i].num += num;
   }
   else
   {
      // shift higher records up, into the extra top slot if necessary
      for(j=VC_CONTAINER_STATS_BINS; j>i; j--)
         st->record[j] = st->record[j-1];

      // write record in
      st->record[i].count = count;
      st->record[i].num = num;
      st->record[i].numpc = numpc;

      // if full, join the two closest records
      if(st->record[VC_CONTAINER_STATS_BINS].count)
      {
         uint32_t min_diff = 0;
         j = -1;

         // find closest, based on difference between numpc
         // (records are in decreasing numpc order, so the difference is not negative)
         for(i=0; i<VC_CONTAINER_STATS_BINS; i++)
         {
            uint32_t diff = st->record[i].numpc - st->record[i+1].numpc;
            if(j == -1 || diff < min_diff)
            {
               j = i;
               min_diff = diff;
            }
         }

         // merge these records and recompute the merged numpc
         st->record[j].count += st->record[j+1].count;
         st->record[j].num += st->record[j+1].num;
         st->record[j].numpc = NUMPC(st->record[j].count, st->record[j].num, st->shift);

         // shift down higher records
         while(++j < VC_CONTAINER_STATS_BINS)
            st->record[j] = st->record[j+1];

         // zero the free top record
         st->record[VC_CONTAINER_STATS_BINS].count = 0;
         st->record[VC_CONTAINER_STATS_BINS].num = 0;
         st->record[VC_CONTAINER_STATS_BINS].numpc = 0;
      }
   }
}
831
/** State shared between the writer (async_io_write / async_io_wait_complete)
 * and the background thread (async_io_thread). */
typedef struct VC_CONTAINER_IO_ASYNC_T
{
   VC_CONTAINER_IO_T *io;        /**< I/O whose pf_write the thread calls */
   VCOS_THREAD_T thread;
   VCOS_SEMAPHORE_T spare_sema;  /**< Posted by the thread after each write; waited on to obtain a free area */
   VCOS_SEMAPHORE_T queue_sema;  /**< Posted once per area queued for writing */
   VCOS_EVENT_T wake_event;      /**< Signalled to wake the thread up */
   int quit;                     /**< Checked by the thread after each wake-up; non-zero makes it exit */

   unsigned int num_area;        /**< Number of memory areas in use */
   uint8_t *mem[MAX_NUM_MEMORY_AREAS]; /**< Base address of memory areas */
   uint8_t *buffer[MAX_NUM_MEMORY_AREAS]; /**< When queued for writing, pointer to start of valid cache area */
   size_t size[MAX_NUM_MEMORY_AREAS]; /**< When queued for writing, size of valid area to write */
   unsigned int cur_area;        /**< Index of the area currently being filled by the writer */

   unsigned char stack[3000];    /* presumably the thread's stack - confirm in async_io_start() */
   int error;                    /**< Set to 1 by the thread when a write comes back short */

   int stats_enable;             /**< Non-zero to collect timing statistics */
   VC_CONTAINER_WRITE_STATS_T stats; /**< Write, wait and flush statistics */

} VC_CONTAINER_IO_ASYNC_T;
854
855/*****************************************************************************/
856static void async_io_stats_initialise( struct VC_CONTAINER_IO_ASYNC_T *ctx, int enable )
857{
858 ctx->stats_enable = enable;
859 stats_initialise(&ctx->stats.write, 8);
860 stats_initialise(&ctx->stats.wait, 0);
861 stats_initialise(&ctx->stats.flush, 0);
862}
863
864static void async_io_stats_get( struct VC_CONTAINER_IO_ASYNC_T *ctx, VC_CONTAINER_WRITE_STATS_T *stats )
865{
866 *stats = ctx->stats;
867}
868
869static void *async_io_thread(VOID *argv)
870{
871 VC_CONTAINER_IO_ASYNC_T *ctx = argv;
872 unsigned int write_area = 0;
873
874 while (1)
875 {
876 unsigned long time = 0;
877
878 vcos_event_wait(&ctx->wake_event);
879 if(ctx->quit) break;
880
881 while(vcos_semaphore_trywait(&ctx->queue_sema) == VCOS_SUCCESS)
882 {
883 uint8_t *buffer = ctx->buffer[write_area];
884 size_t size = ctx->size[write_area];
885
886 if(ctx->stats_enable)
887 time = vcos_getmicrosecs();
888
889 if(ctx->io->pf_write(ctx->io, buffer, size) != size)
890 ctx->error = 1;
891
892 if(ctx->stats_enable)
893 stats_add_value(&ctx->stats.write, size, vcos_getmicrosecs() - time);
894
895 /* Signal that the write is done */
896 vcos_semaphore_post(&ctx->spare_sema);
897
898 if(++write_area == ctx->num_area)
899 write_area = 0;
900 }
901 }
902
903 return NULL;
904}
905
906static int async_io_write( VC_CONTAINER_IO_ASYNC_T *ctx, VC_CONTAINER_IO_PRIVATE_CACHE_T *cache )
907{
908 unsigned long time = 0;
909 unsigned int offset;
910
911 if(ctx->stats_enable)
912 time = vcos_getmicrosecs();
913
914 /* post the current area */
915 ctx->buffer[ctx->cur_area] = cache->buffer;
916 ctx->size[ctx->cur_area] = cache->size;
917 vcos_semaphore_post(&ctx->queue_sema);
918 vcos_event_signal(&ctx->wake_event);
919
920 /* now we need to grab another area */
921 vcos_semaphore_wait(&ctx->spare_sema);
922 if(++ctx->cur_area == ctx->num_area)
923 ctx->cur_area = 0;
924
925 if(ctx->stats_enable)
926 stats_add_value(&ctx->stats.wait, 1, vcos_getmicrosecs() - time);
927
928 /* alter cache mem to point to the new cur_area */
929 offset = cache->buffer - cache->mem;
930 cache->mem = ctx->mem[ctx->cur_area];
931 cache->buffer = cache->mem + offset;
932 cache->buffer_end = cache->mem + cache->mem_size;
933
934 return ctx->error ? 0 : cache->size;
935}
936
937static VC_CONTAINER_STATUS_T async_io_wait_complete( struct VC_CONTAINER_IO_ASYNC_T *ctx,
938 VC_CONTAINER_IO_PRIVATE_CACHE_T *cache, int complete )
939{
940 unsigned int time = 0;
941
942 if(ctx->stats_enable)
943 time = vcos_getmicrosecs();
944
945 if(complete)
946 {
947 int num;
948 /* Need to make sure that all memory areas have been written out, so should have num-1 spare */
949 for(num=0; num<ctx->num_area-1; num++)
950 vcos_semaphore_wait(&ctx->spare_sema);
951
952 for(num=0; num<ctx->num_area-1; num++)
953 vcos_semaphore_post(&ctx->spare_sema);
954 }
955 else
956 {
957 /* Need to make sure we can acquire one memory area */
958 vcos_semaphore_wait(&ctx->spare_sema);
959 vcos_semaphore_post(&ctx->spare_sema);
960 }
961
962 if(ctx->stats_enable)
963 stats_add_value(&ctx->stats.flush, 1, vcos_getmicrosecs() - time);
964
965 return ctx->error ? VC_CONTAINER_ERROR_FAILED : VC_CONTAINER_SUCCESS;
966}
967
968static VC_CONTAINER_IO_ASYNC_T *async_io_start( VC_CONTAINER_IO_T *io, int num_areas, VC_CONTAINER_STATUS_T *status )
969{
970 VC_CONTAINER_IO_ASYNC_T *ctx = 0;
971 VCOS_UNSIGNED pri = 0;
972
973 /* Allocate our context */
974 ctx = malloc(sizeof(*ctx));
975 if(!ctx) goto error_spare_sema;
976 memset(ctx, 0, sizeof(*ctx));
977 ctx->io = io;
978
979 ctx->mem[0] = io->priv->cache->mem;
980
981 for(ctx->num_area = 1; ctx->num_area < num_areas; ctx->num_area++)
982 {
983 ctx->mem[ctx->num_area] = malloc(io->priv->cache->mem_size);
984 if(!ctx->mem[ctx->num_area])
985 break;
986 }
987
988 if(ctx->num_area == 1) // no real benefit in asynchronous writes
989 goto error_spare_sema;
990
991 async_io_stats_initialise(ctx, 0);
992
993 if(vcos_semaphore_create(&ctx->spare_sema, "async_spare_sem", ctx->num_area-1) != VCOS_SUCCESS)
994 goto error_spare_sema;
995
996 if(vcos_semaphore_create(&ctx->queue_sema, "async_queue_sem", 0) != VCOS_SUCCESS)
997 goto error_queue_sema;
998
999 if (vcos_event_create(&ctx->wake_event, "async_wake_event") != VCOS_SUCCESS)
1000 goto error_event;
1001
1002 // run this thread at a slightly higher priority than the calling thread - that means that
1003 // we prefer to write to the SD card rather than filling the memory buffer.
1004 pri = vcos_thread_get_priority(vcos_thread_current());
1005 if(vcos_thread_create_classic(&ctx->thread, "async_io", async_io_thread, ctx,
1006 ctx->stack, sizeof(ctx->stack), pri-1, 10, VCOS_START) != VCOS_SUCCESS)
1007 goto error_thread;
1008
1009 if(status) *status = VC_CONTAINER_SUCCESS;
1010 return ctx;
1011
1012 error_thread:
1013 vcos_event_delete(&ctx->wake_event);
1014 error_event:
1015 vcos_semaphore_delete(&ctx->queue_sema);
1016 error_queue_sema:
1017 vcos_semaphore_delete(&ctx->spare_sema);
1018 error_spare_sema:
1019 if(ctx) free(ctx);
1020 if(status) *status = VC_CONTAINER_ERROR_FAILED;
1021 return 0;
1022}
1023
1024static VC_CONTAINER_STATUS_T async_io_stop( VC_CONTAINER_IO_ASYNC_T *ctx )
1025{
1026 /* Block if a write operation is already in progress */
1027 //vcos_semaphore_wait(&ctx->sema);
1028 // XXX block until all done
1029
1030 ctx->quit = 1;
1031 vcos_event_signal(&ctx->wake_event);
1032 vcos_thread_join(&ctx->thread,NULL);
1033 vcos_event_delete(&ctx->wake_event);
1034 vcos_semaphore_delete(&ctx->queue_sema);
1035 vcos_semaphore_delete(&ctx->spare_sema);
1036
1037 while(ctx->num_area > 0)
1038 free(ctx->mem[--ctx->num_area]);
1039
1040 free(ctx);
1041 return VC_CONTAINER_SUCCESS;
1042}
1043#else
1044
1045static struct VC_CONTAINER_IO_ASYNC_T *async_io_start( VC_CONTAINER_IO_T *io, int num_areas, VC_CONTAINER_STATUS_T *status )
1046{
1047 VC_CONTAINER_PARAM_UNUSED(io);
1048 VC_CONTAINER_PARAM_UNUSED(num_areas);
1049 if(status) *status = VC_CONTAINER_ERROR_FAILED;
1050 return 0;
1051}
1052
1053static int async_io_write( struct VC_CONTAINER_IO_ASYNC_T *ctx, VC_CONTAINER_IO_PRIVATE_CACHE_T *cache )
1054{
1055 VC_CONTAINER_PARAM_UNUSED(ctx);
1056 VC_CONTAINER_PARAM_UNUSED(cache);
1057 return 0;
1058}
1059
1060static VC_CONTAINER_STATUS_T async_io_wait_complete( struct VC_CONTAINER_IO_ASYNC_T *ctx,
1061 VC_CONTAINER_IO_PRIVATE_CACHE_T *cache, int complete )
1062{
1063 VC_CONTAINER_PARAM_UNUSED(ctx);
1064 VC_CONTAINER_PARAM_UNUSED(cache);
1065 VC_CONTAINER_PARAM_UNUSED(complete);
1066 return 0;
1067}
1068
1069static VC_CONTAINER_STATUS_T async_io_stop( struct VC_CONTAINER_IO_ASYNC_T *ctx )
1070{
1071 VC_CONTAINER_PARAM_UNUSED(ctx);
1072 return VC_CONTAINER_SUCCESS;
1073}
1074
/** Fallback variant from the #else branch: no statistics are kept, so this does nothing.
 *
 * @param ctx     unused
 * @param enable  unused
 */
static void async_io_stats_initialise( struct VC_CONTAINER_IO_ASYNC_T *ctx, int enable )
{
   VC_CONTAINER_PARAM_UNUSED(enable);
   VC_CONTAINER_PARAM_UNUSED(ctx);
}
1080
1081static void async_io_stats_get( struct VC_CONTAINER_IO_ASYNC_T *ctx, VC_CONTAINER_WRITE_STATS_T *stats )
1082{
1083 VC_CONTAINER_PARAM_UNUSED(ctx);
1084 VC_CONTAINER_PARAM_UNUSED(stats);
1085}
1086
1087
1088#endif
1089