1#include "mupdf/fitz.h"
2
3#include <errno.h>
4#include <stdio.h>
5#include <string.h>
6#include <assert.h>
7
8#include "curl_stream.h"
9
10#ifdef _WIN32
11#include <windows.h>
12#else
13#include <sys/time.h>
14#include <unistd.h>
15#include <pthread.h>
16#endif
17
18/* File stream - progressive reading to simulate http download */
19
20typedef struct prog_state
21{
22 FILE *file;
23 int64_t length;
24 int64_t available; /* guarded by lock below */
25 int kbps;
26 int64_t start_time; /* in milliseconds since epoch */
27 unsigned char buffer[4096];
28 void (*on_data)(void*,int);
29 void *on_data_arg;
30
31 /* We assume either Windows threads or pthreads here. */
32#ifdef _WIN32
33 void *thread;
34 DWORD thread_id;
35 HANDLE mutex;
36#else
37 pthread_t thread;
38 pthread_mutex_t mutex;
39#endif
40} prog_state;
41
42static int64_t get_current_time(void)
43{
44#ifdef _WIN32
45 return (int)GetTickCount();
46#else
47 struct timeval now;
48 gettimeofday(&now, NULL);
49 return (int64_t)now.tv_sec*1000 + now.tv_usec/1000;
50#endif
51}
52
53#ifdef _WIN32
54static int locked;
55
56static void
57lock(prog_state *state)
58{
59 WaitForSingleObject(state->mutex, INFINITE);
60 assert(locked == 0);
61 locked = 1;
62}
63
64static void
65unlock(prog_state *state)
66{
67 assert(locked == 1);
68 locked = 0;
69 ReleaseMutex(state->mutex);
70}
71#else
72static void
73lock(prog_state *state)
74{
75 pthread_mutex_lock(&state->mutex);
76}
77
78static void
79unlock(prog_state *state)
80{
81 pthread_mutex_unlock(&state->mutex);
82}
83#endif
84
85static int next_prog(fz_context *ctx, fz_stream *stm, size_t in_len)
86{
87 prog_state *ps = (prog_state *)stm->state;
88 int64_t len = in_len;
89 int64_t n;
90
91 if (len > sizeof(ps->buffer))
92 len = sizeof(ps->buffer);
93
94 /* Simulate not all data available yet. */
95 lock(ps);
96 if (ps->available < ps->length)
97 {
98 if (stm->pos + len > ps->available)
99 {
100 len = ps->available - stm->pos;
101 if (len <= 0)
102 {
103 unlock(ps);
104 fz_throw(ctx, FZ_ERROR_TRYLATER, "Not enough data yet");
105 }
106 }
107 }
108 unlock(ps);
109
110 n = fread(ps->buffer, 1, len, ps->file);
111 stm->rp = ps->buffer;
112 stm->wp = ps->buffer + n;
113 stm->pos += (int64_t)n;
114
115 if (n < len)
116 {
117 if (ferror(ps->file))
118 fz_throw(ctx, FZ_ERROR_GENERIC, "prog read error: %s", strerror(errno));
119 if (n == 0)
120 return EOF;
121 }
122 return *stm->rp++;
123}
124
125static void seek_prog(fz_context *ctx, fz_stream *stm, int64_t offset, int whence)
126{
127 prog_state *ps = (prog_state *)stm->state;
128
129 if (whence == SEEK_END)
130 {
131 whence = SEEK_SET;
132 offset += ps->length;
133 }
134 else if (whence == SEEK_CUR)
135 {
136 whence = SEEK_SET;
137 offset += stm->pos;
138 }
139
140 if (fseek(ps->file, offset, whence) != 0)
141 fz_throw(ctx, FZ_ERROR_GENERIC, "cannot seek: %s", strerror(errno));
142 stm->pos = offset;
143 stm->wp = stm->rp;
144}
145
146static void close_prog(fz_context *ctx, void *state)
147{
148 prog_state *ps = (prog_state *)state;
149 int n = fclose(ps->file);
150 if (n < 0)
151 fz_warn(ctx, "cannot fclose: %s", strerror(errno));
152 fz_free(ctx, state);
153}
154
155static void fetcher_thread(prog_state *ps)
156{
157 int complete = 0;
158
159 ps->start_time = get_current_time();
160
161 while (!complete)
162 {
163 /* Wait a while. */
164#ifdef _WIN32
165 Sleep(200);
166#else
167 usleep(200000);
168#endif
169
170 lock(ps);
171
172 /* Simulate more data having arrived. */
173 if (ps->available < ps->length)
174 {
175 int64_t av = (get_current_time() - ps->start_time) * ps->kbps;
176 if (av > ps->length)
177 av = ps->length;
178 ps->available = av;
179 }
180 else
181 {
182 complete = 1;
183 }
184
185 unlock(ps);
186
187 /* Ping callback with new data. */
188 if (ps->on_data)
189 ps->on_data(ps->on_data_arg, complete);
190 }
191}
192
193#ifdef _WIN32
194static DWORD WINAPI win_thread(void *lparam)
195{
196 fetcher_thread((prog_state *)lparam);
197 return 0;
198}
199#else
200static void *pthread_thread(void *arg)
201{
202 fetcher_thread((prog_state *)arg);
203 return NULL;
204}
205#endif
206
207static fz_stream *
208fz_open_file_ptr_progressive(fz_context *ctx, FILE *file, int kbps, void (*on_data)(void*,int), void *opaque)
209{
210 fz_stream *stm;
211 prog_state *state;
212
213 state = fz_malloc_struct(ctx, prog_state);
214 state->file = file;
215 state->kbps = kbps;
216 state->available = 0;
217
218 state->on_data = on_data;
219 state->on_data_arg = opaque;
220
221 fseek(state->file, 0, SEEK_END);
222 state->length = ftell(state->file);
223 fseek(state->file, 0, SEEK_SET);
224
225#ifdef _WIN32
226 state->mutex = CreateMutex(NULL, FALSE, NULL);
227 if (state->mutex == NULL)
228 fz_throw(ctx, FZ_ERROR_GENERIC, "mutex creation failed");
229
230 state->thread = CreateThread(NULL, 0, win_thread, state, 0, &state->thread_id);
231 if (state->thread == NULL)
232 fz_throw(ctx, FZ_ERROR_GENERIC, "thread creation failed");
233#else
234 if (pthread_mutex_init(&state->mutex, NULL))
235 fz_throw(ctx, FZ_ERROR_GENERIC, "mutex creation failed");
236
237 if (pthread_create(&state->thread, NULL, pthread_thread, state))
238 fz_throw(ctx, FZ_ERROR_GENERIC, "thread creation failed");
239#endif
240
241 stm = fz_new_stream(ctx, state, next_prog, close_prog);
242 stm->progressive = 1;
243 stm->seek = seek_prog;
244
245 return stm;
246}
247
248fz_stream *
249fz_open_file_progressive(fz_context *ctx, const char *name, int kbps, void (*on_data)(void*,int), void *opaque)
250{
251 FILE *f;
252#ifdef _WIN32
253 f = fz_fopen_utf8(name, "rb");
254#else
255 f = fopen(name, "rb");
256#endif
257 if (f == NULL)
258 fz_throw(ctx, FZ_ERROR_GENERIC, "cannot open %s", name);
259 return fz_open_file_ptr_progressive(ctx, f, kbps, on_data, opaque);
260}
261