1#include "mupdf/fitz.h"
2#include "curl_stream.h"
3
4#include <assert.h>
5#include <string.h>
6#include <ctype.h>
7
8#include <curl/curl.h>
9
10#ifdef _WIN32
11#include <windows.h>
12#else
13#include <pthread.h>
14#endif
15
#undef DEBUG_BLOCK_FETCHING

/*
 * Debug tracing for block fetching. Remove the #undef above (or define
 * DEBUG_BLOCK_FETCHING after it) to enable it. DEBUG_MESSAGE takes a
 * parenthesised printf-style argument list, e.g.
 *	DEBUG_MESSAGE(("offset=%d\n", n));
 * When disabled it expands to an empty statement.
 */
#ifdef DEBUG_BLOCK_FETCHING
#ifdef _WIN32
/* <stdarg.h> is the ISO header for va_list and the two-argument va_start;
 * the legacy <varargs.h> has an incompatible one-argument va_start. */
#include <stdarg.h>

/* Format a message (truncated to 255 characters) and send it to the
 * debugger output. */
static void
output(const char *fmt, ...)
{
	va_list args;
	char text[256];

	va_start(args, fmt);
	vsnprintf(text, sizeof(text), fmt, args);
	va_end(args);

	/* text is a narrow string: call the ANSI entry point explicitly, since
	 * OutputDebugString maps to the wide-char version under UNICODE. */
	OutputDebugStringA(text);
}
#else
#define output printf
#endif

#define DEBUG_MESSAGE(A) do { output A; } while(0)
#else
#define DEBUG_MESSAGE(A) do { } while(0)
#endif
41
/* The file is fetched in blocks of 2^BLOCK_SHIFT bytes (256 KiB). */
#define BLOCK_SHIFT 18
#define BLOCK_SIZE (1 << BLOCK_SHIFT)

/* Non-zero when block `num` is marked as fetched in the bitmap `map`:
 * block i is bit (i % 8) of byte (i / 8). */
#define HAVE_BLOCK(map, num) (((((map)[(num) >> 3]) >> ((num) & 7)) & 1) != 0)
46
/*
 * Per-stream state, shared between the caller's thread (cs_next, cs_seek,
 * cs_close) and the background fetcher thread (fetch_chunk and the curl
 * callbacks).
 */
typedef struct curlstate
{
	/* Context given to fz_open_url; the fetcher thread also allocates with it. */
	fz_context *ctx;
	/* The single curl easy handle, reused for the HEAD request and every range. */
	CURL *easy;

	/* START: The following entries are protected by the lock */
	CURLcode curl_error;	/* last curl_easy_perform failure; cs_next throws it once */
	int data_arrived;	/* set once the first body data has arrived and buffers are sized */
	int complete;		/* fetching has finished */
	int kill_thread;	/* tells the fetcher thread to stop (completion, error or close) */
	int accept_ranges;	/* the HEAD response advertised "Accept-Ranges: bytes" */
	int head;		/* the initial HEAD request has not completed yet */

	/* content buffer */
	size_t content_length; /* 0 => Unknown length */
	unsigned char *buffer;	/* file data, stored at its file offsets */
	size_t buffer_fill;	/* NOTE(review): not referenced anywhere in this file */
	size_t buffer_max;	/* allocated size of buffer */

	/* map of which blocks we have */
	unsigned char *map;	/* one bit per BLOCK_SIZE block; only allocated for ranged fetching */
	size_t map_length;	/* number of blocks (bits) in map */

	/* outstanding curl request info */
	size_t next_fill_start; /* The next file offset we will fetch to */
	size_t current_fill_start; /* The current file offset we are fetching to */
	size_t current_fill_end;   /* last byte offset (inclusive) of the current range request */
	/* END: The above entries are protected by the lock */

	/* Optional callback, invoked on the fetcher thread with 0 after each
	 * request and with 1 when the thread exits. */
	void (*more_data)(void *,int);
	void *more_data_arg;

	/* cs_next copies data here for the fz_stream to consume. */
	unsigned char public_buffer[4096];

	/* We assume either Windows threads or pthreads here. */
#ifdef _WIN32
	void *thread;
	DWORD thread_id;
	HANDLE mutex;
#else
	pthread_t thread;
	pthread_mutex_t mutex;
#endif
} curlstate;
91
92#ifdef _WIN32
93static int locked;
94
95static void
96lock(curlstate *state)
97{
98 WaitForSingleObject(state->mutex, INFINITE);
99 assert(locked == 0);
100 locked = 1;
101}
102
103static void
104unlock(curlstate *state)
105{
106 assert(locked == 1);
107 locked = 0;
108 ReleaseMutex(state->mutex);
109}
110#else
111static void
112lock(curlstate *state)
113{
114 pthread_mutex_lock(&state->mutex);
115}
116
117static void
118unlock(curlstate *state)
119{
120 pthread_mutex_unlock(&state->mutex);
121}
122#endif
123
124static size_t on_curl_header(void *ptr, size_t size, size_t nmemb, void *state_)
125{
126 struct curlstate *state = state_;
127
128 lock(state);
129 if (strncmp(ptr, "Accept-Ranges: bytes", 20) == 0)
130 {
131 DEBUG_MESSAGE(("header arrived with Accept-Ranges!\n"));
132 state->accept_ranges = 1;
133 }
134
135 if (strncmp(ptr, "Content-Length:", 15) == 0)
136 {
137 char *s = ptr;
138 state->content_length = fz_atoi(s + 15);
139 DEBUG_MESSAGE(("header arrived with Content-Length: %d\n", state->content_length));
140 }
141 unlock(state);
142
143 return nmemb * size;
144}
145
146static size_t on_curl_data(void *ptr, size_t size, size_t nmemb, void *state_)
147{
148 struct curlstate *state = state_;
149 size_t old_start;
150
151 size *= nmemb;
152
153 lock(state);
154 if (state->data_arrived == 0)
155 {
156 /* This is the first time data has arrived.
157 * If the header has Accept-Ranges then we can do byte requests.
158 * We know the Content-Length from having processed the header already.
159 */
160 if (state->content_length == 0)
161 {
162 /* What a crap server. Won't tell us how big the file
163 * is. We'll have to expand as data as arrives. */
164 DEBUG_MESSAGE(("have no length!\n"));
165 }
166 else if (state->accept_ranges)
167 {
168 /* We got a range header, and the correct http response
169 * code. We can assume that byte fetches are accepted
170 * and we'll run without progressive mode. */
171 size_t len = state->content_length;
172 state->map_length = (len+BLOCK_SIZE-1)>>BLOCK_SHIFT;
173 state->map = fz_malloc_no_throw(state->ctx, (state->map_length+7)>>3);
174 state->buffer = fz_malloc_no_throw(state->ctx, len);
175 state->buffer_max = len;
176 if (state->map == NULL || state->buffer == NULL)
177 {
178 unlock(state);
179 return 0;
180 }
181 memset(state->map, 0, (state->map_length+7)>>3);
182 DEBUG_MESSAGE(("have range header content_length=%d!\n", state->content_length));
183 }
184 else
185 {
186 /* We know the length, and that we can use ByteRanges -
187 * we can run as a progressive file. */
188 state->buffer = fz_malloc_no_throw(state->ctx, state->content_length);
189 if (state->buffer == NULL)
190 {
191 unlock(state);
192 return 0;
193 }
194 state->buffer_max = state->content_length;
195 }
196
197 state->data_arrived = 1;
198 }
199
200 if (state->content_length == 0)
201 {
202 size_t newsize = (state->current_fill_start + size);
203 if (newsize > state->buffer_max)
204 {
205 /* Expand the buffer */
206 size_t new_max = state->buffer_max * 2;
207 if (new_max == 0)
208 new_max = 4096;
209 fz_try(state->ctx)
210 state->buffer = fz_realloc_array(state->ctx, state->buffer, new_max, unsigned char);
211 fz_catch(state->ctx)
212 {
213 unlock(state);
214 return 0;
215 }
216 state->buffer_max = new_max;
217 }
218 }
219
220 DEBUG_MESSAGE(("data arrived: offset=%ld len=%ld\n", state->current_fill_start, size));
221 /* Although we always trigger fills starting on block boundaries,
222 * code this to allow for curl calling us to copy smaller blocks
223 * as they arrive. */
224 old_start = state->current_fill_start;
225 memcpy(state->buffer + state->current_fill_start, ptr, size);
226 state->current_fill_start += size;
227 /* If we've reached the end, or at least a different block
228 * mark that we've got that block. */
229 if (state->map && (state->current_fill_start == state->content_length ||
230 (((state->current_fill_start ^ old_start) & ~(BLOCK_SIZE-1)) != 0)))
231 {
232 old_start >>= BLOCK_SHIFT;
233 state->map[old_start>>3] |= 1<<(old_start & 7);
234 }
235 unlock(state);
236
237 return size;
238}
239
240static void fetch_chunk(struct curlstate *state)
241{
242 char text[32];
243 size_t block, start, end;
244 CURLcode ret;
245
246 ret = curl_easy_perform(state->easy);
247 if (ret != CURLE_OK) {
248 /* If we get an error, store it, and kill the thread.
249 * The next fetch will return it. */
250 lock(state);
251 state->curl_error = ret;
252 state->kill_thread = 1;
253 unlock(state);
254 return;
255 }
256
257 /* We finished the header, now request the body. */
258 lock(state);
259 if (state->head)
260 {
261 state->head = 0;
262 curl_easy_setopt(state->easy, CURLOPT_NOBODY, 0);
263 curl_easy_setopt(state->easy, CURLOPT_HEADERFUNCTION, NULL);
264 curl_easy_setopt(state->easy, CURLOPT_WRITEHEADER, NULL);
265 if (state->accept_ranges)
266 {
267 fz_snprintf(text, 32, "%d-%d", 0, BLOCK_SIZE-1);
268 curl_easy_setopt(state->easy, CURLOPT_RANGE, text);
269 state->next_fill_start = BLOCK_SIZE;
270 }
271 unlock(state);
272 return;
273 }
274
275 /* We finished the current body. If not accepting ranges, that's the end. */
276 if (!state->accept_ranges)
277 {
278 DEBUG_MESSAGE(("we got it all, in one request.\n"));
279 state->complete = 1;
280 state->kill_thread = 1;
281 unlock(state);
282 return;
283 }
284
285 /* Find the next block to fetch */
286 assert((state->next_fill_start & (BLOCK_SHIFT-1)) == 0);
287 block = state->next_fill_start>>BLOCK_SHIFT;
288 if (state->content_length > 0)
289 {
290 /* Find the next block that we haven't got */
291 size_t map_length = state->map_length;
292 unsigned char *map = state->map;
293 while (block < map_length && HAVE_BLOCK(map, block))
294 ++block;
295 if (block == map_length)
296 {
297 block = 0;
298 while (block < map_length && HAVE_BLOCK(map, block))
299 ++block;
300 if (block == map_length)
301 {
302 /* We've got it all! */
303 DEBUG_MESSAGE(("we got it all block=%d map_length=%d!\n", block, map_length));
304 state->complete = 1;
305 state->kill_thread = 1;
306 unlock(state);
307 return;
308 }
309 }
310 }
311 else
312 {
313 state->complete = 1;
314 state->kill_thread = 1;
315 unlock(state);
316 return;
317 }
318
319 DEBUG_MESSAGE(("block requested was %d, fetching %d\n", state->next_fill_start>>BLOCK_SHIFT, block));
320
321 /* Set up fetch of that block */
322 start = block<<BLOCK_SHIFT;
323 end = start + BLOCK_SIZE-1;
324 state->current_fill_start = start;
325 if (state->content_length > 0 && end >= state->content_length)
326 end = state->content_length-1;
327 state->current_fill_end = end;
328 fz_snprintf(text, 32, "%d-%d", start, end);
329
330 /* Unless anyone changes this in the meantime, the
331 * next block we fetch will follow on from this one. */
332 state->next_fill_start = state->current_fill_start+BLOCK_SIZE;
333 unlock(state);
334
335 /* Request next range! */
336 DEBUG_MESSAGE(("requesting range %s\n", text));
337 curl_easy_setopt(state->easy, CURLOPT_RANGE, text);
338}
339
340static int cs_next(fz_context *ctx, fz_stream *stream, size_t len)
341{
342 struct curlstate *state = stream->state;
343 size_t len_read = 0;
344 int64_t read_point = stream->pos;
345 int block = read_point>>BLOCK_SHIFT;
346 size_t left_over = (-read_point) & (BLOCK_SIZE-1);
347 unsigned char *buf = state->public_buffer;
348
349 assert(len != 0);
350
351 stream->rp = stream->wp = buf;
352 lock(state);
353
354 /* If we got an error from the fetching thread,
355 * throw it here (but just once). */
356 if (state->curl_error)
357 {
358 CURLcode err = state->curl_error;
359 state->curl_error = 0;
360 unlock(state);
361 fz_throw(ctx, FZ_ERROR_GENERIC, "cannot fetch data: %s", curl_easy_strerror(err));
362 }
363
364 if (read_point > state->content_length)
365 {
366 if (state->data_arrived == 0)
367 {
368 unlock(state);
369 fz_throw(ctx, FZ_ERROR_TRYLATER, "read of a block we don't have (A) (offset=%ld)", read_point);
370 }
371 return EOF;
372 }
373
374 if (len > sizeof(state->public_buffer))
375 len = sizeof(state->public_buffer);
376
377 if (state->map == NULL)
378 {
379 /* We are doing a simple linear fetch as we don't know the
380 * content length. */
381 if (read_point + len > state->current_fill_start)
382 {
383 unlock(state);
384 fz_throw(ctx, FZ_ERROR_TRYLATER, "read of a block we don't have (B) (offset=%ld)", read_point);
385 }
386 memcpy(buf, state->buffer + read_point, len);
387 unlock(state);
388 stream->wp = buf + len;
389 stream->pos += len;
390 if (len == 0)
391 return EOF;
392 return *stream->rp++;
393 }
394
395 /* We are reading from a "mapped" file */
396 if (read_point + len > state->content_length)
397 len = state->content_length - read_point;
398 if (left_over > len)
399 left_over = len;
400 if (left_over > 0)
401 {
402 /* We are starting midway through a block */
403 if (!HAVE_BLOCK(state->map, block))
404 {
405 state->next_fill_start = block<<BLOCK_SHIFT;
406 unlock(state);
407 fz_throw(ctx, FZ_ERROR_TRYLATER, "read of a block we don't have (C) (offset=%ld)", read_point);
408 }
409 block++;
410 memcpy(buf, state->buffer + read_point, left_over);
411 buf += left_over;
412 read_point += left_over;
413 len -= left_over;
414 len_read += left_over;
415 }
416
417 /* Copy any complete blocks */
418 while (len > BLOCK_SIZE)
419 {
420 if (!HAVE_BLOCK(state->map, block))
421 {
422 /* We don't have enough data to fulfill the request. */
423 /* Fetch the next block from here. */
424 unlock(state);
425 state->next_fill_start = block<<BLOCK_SHIFT;
426 stream->wp += len_read;
427 stream->pos += len_read;
428 /* If we haven't fetched anything, throw. */
429 if (len_read == 0)
430 fz_throw(ctx, FZ_ERROR_TRYLATER, "read of a block we don't have (D) (offset=%ld)", read_point);
431 /* Otherwise, we got at least one byte, so we can safely return that. */
432 return *stream->rp++;
433 }
434 block++;
435 memcpy(buf, state->buffer + read_point, BLOCK_SIZE);
436 buf += BLOCK_SIZE;
437 read_point += BLOCK_SIZE;
438 len -= BLOCK_SIZE;
439 len_read += BLOCK_SIZE;
440 }
441
442 /* Copy any trailing bytes */
443 if (len > 0)
444 {
445 if (!HAVE_BLOCK(state->map, block))
446 {
447 /* We don't have enough data to fulfill the request. */
448 /* Fetch the next block from here. */
449 unlock(state);
450 state->next_fill_start = block<<BLOCK_SHIFT;
451 stream->wp += len_read;
452 stream->pos += len_read;
453 /* If we haven't fetched anything, throw. */
454 if (len_read == 0)
455 fz_throw(ctx, FZ_ERROR_TRYLATER, "read of a block we don't have (E) (offset=%ld)", read_point);
456 /* Otherwise, we got at least one byte, so we can safely return that. */
457 return *stream->rp++;
458 }
459 memcpy(buf, state->buffer + read_point, len);
460 len_read += len;
461 }
462
463 unlock(state);
464 stream->wp += len_read;
465 stream->pos += len_read;
466 if (len_read == 0)
467 return EOF;
468 return *stream->rp++;
469}
470
471static void cs_close(fz_context *ctx, void *state_)
472{
473 struct curlstate *state = state_;
474
475 lock(state);
476 state->kill_thread = 1;
477 unlock(state);
478
479#ifdef _WIN32
480 WaitForSingleObject(state->thread, INFINITE);
481 CloseHandle(state->thread);
482 CloseHandle(state->mutex);
483#else
484 pthread_join(state->thread, NULL);
485 pthread_mutex_destroy(&state->mutex);
486#endif
487
488 curl_easy_cleanup(state->easy);
489 fz_free(ctx, state->buffer);
490 fz_free(ctx, state->map);
491 fz_free(ctx, state);
492}
493
494static void cs_seek(fz_context *ctx, fz_stream *stm, int64_t offset, int whence)
495{
496 struct curlstate *state = stm->state;
497
498 stm->wp = stm->rp;
499 if (whence == SEEK_END)
500 {
501 size_t clen;
502 int data_arrived;
503 lock(state);
504 data_arrived = state->data_arrived;
505 clen = state->content_length;
506 unlock(state);
507 if (!data_arrived)
508 fz_throw(ctx, FZ_ERROR_TRYLATER, "still awaiting file length");
509 stm->pos = clen + offset;
510 }
511 else if (whence == SEEK_CUR)
512 stm->pos += offset;
513 else
514 stm->pos = offset;
515 if (stm->pos < 0)
516 stm->pos = 0;
517}
518
519static void
520fetcher_thread(curlstate *state)
521{
522 /* Keep fetching chunks on a background thread until
523 * either we have to kill the thread, or the fetch
524 * is complete. */
525 while (1) {
526 int complete;
527 lock(state);
528 complete = state->complete || state->kill_thread;
529 unlock(state);
530 if (complete)
531 break;
532 fetch_chunk(state);
533 if (state->more_data)
534 state->more_data(state->more_data_arg, 0);
535 }
536 if (state->more_data)
537 state->more_data(state->more_data_arg, 1);
538}
539
540#ifdef _WIN32
541static DWORD WINAPI
542win_thread(void *lparam)
543{
544 fetcher_thread((curlstate *)lparam);
545
546 return 0;
547}
548#else
549static void *
550pthread_thread(void *arg)
551{
552 fetcher_thread((curlstate *)arg);
553 return NULL;
554}
555#endif
556
557fz_stream *fz_open_url(fz_context *ctx, const char *url, int kbps, void (*more_data)(void *,int), void *more_data_arg)
558{
559 struct curlstate *state;
560 fz_stream *stm;
561 CURLcode code;
562
563 state = fz_malloc_struct(ctx, struct curlstate);
564 state->ctx = ctx;
565
566 code = curl_global_init(CURL_GLOBAL_ALL);
567 if (code != CURLE_OK)
568 fz_throw(ctx, FZ_ERROR_GENERIC, "curl_global_init failed");
569
570 state->easy = curl_easy_init();
571 if (!state->easy)
572 fz_throw(ctx, FZ_ERROR_GENERIC, "curl_easy_init failed");
573
574 curl_easy_setopt(state->easy, CURLOPT_URL, url);
575 curl_easy_setopt(state->easy, CURLOPT_FOLLOWLOCATION, 1);
576 curl_easy_setopt(state->easy, CURLOPT_MAXREDIRS, 12);
577 curl_easy_setopt(state->easy, CURLOPT_SSL_VERIFYPEER, 0);
578 curl_easy_setopt(state->easy, CURLOPT_SSL_VERIFYHOST, 0);
579 curl_easy_setopt(state->easy, CURLOPT_MAX_RECV_SPEED_LARGE, kbps * 1024);
580 curl_easy_setopt(state->easy, CURLOPT_HEADERFUNCTION, on_curl_header);
581 curl_easy_setopt(state->easy, CURLOPT_WRITEHEADER, state);
582 curl_easy_setopt(state->easy, CURLOPT_WRITEFUNCTION, on_curl_data);
583 curl_easy_setopt(state->easy, CURLOPT_WRITEDATA, state);
584
585 /* Get only the HEAD first. */
586 state->head = 1;
587 curl_easy_setopt(state->easy, CURLOPT_NOBODY, 1);
588
589#ifdef _WIN32
590 state->mutex = CreateMutex(NULL, FALSE, NULL);
591 if (state->mutex == NULL)
592 fz_throw(ctx, FZ_ERROR_GENERIC, "mutex creation failed");
593
594 state->thread = CreateThread(NULL, 0, win_thread, state, 0, &state->thread_id);
595 if (state->thread == NULL)
596 fz_throw(ctx, FZ_ERROR_GENERIC, "thread creation failed");
597#else
598 if (pthread_mutex_init(&state->mutex, NULL))
599 fz_throw(ctx, FZ_ERROR_GENERIC, "mutex creation failed");
600
601 if (pthread_create(&state->thread, NULL, pthread_thread, state))
602 fz_throw(ctx, FZ_ERROR_GENERIC, "thread creation failed");
603#endif
604 state->more_data = more_data;
605 state->more_data_arg = more_data_arg;
606
607 stm = fz_new_stream(ctx, state, cs_next, cs_close);
608 stm->progressive = 1;
609 stm->seek = cs_seek;
610 return stm;
611}
612