1 | /* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */ |
2 | // vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4: |
3 | #ident "$Id$" |
4 | /*====== |
5 | This file is part of PerconaFT. |
6 | |
7 | |
8 | Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved. |
9 | |
10 | PerconaFT is free software: you can redistribute it and/or modify |
11 | it under the terms of the GNU General Public License, version 2, |
12 | as published by the Free Software Foundation. |
13 | |
14 | PerconaFT is distributed in the hope that it will be useful, |
15 | but WITHOUT ANY WARRANTY; without even the implied warranty of |
16 | MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
17 | GNU General Public License for more details. |
18 | |
19 | You should have received a copy of the GNU General Public License |
20 | along with PerconaFT. If not, see <http://www.gnu.org/licenses/>. |
21 | |
22 | ---------------------------------------- |
23 | |
24 | PerconaFT is free software: you can redistribute it and/or modify |
25 | it under the terms of the GNU Affero General Public License, version 3, |
26 | as published by the Free Software Foundation. |
27 | |
28 | PerconaFT is distributed in the hope that it will be useful, |
29 | but WITHOUT ANY WARRANTY; without even the implied warranty of |
30 | MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
31 | GNU Affero General Public License for more details. |
32 | |
33 | You should have received a copy of the GNU Affero General Public License |
34 | along with PerconaFT. If not, see <http://www.gnu.org/licenses/>. |
35 | ======= */ |
36 | |
37 | #ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved." |
38 | |
39 | #include <my_global.h> |
40 | #include <errno.h> |
41 | #include <string.h> |
42 | #include <unistd.h> |
43 | |
44 | #include "portability/toku_assert.h" |
45 | #include "portability/memory.h" |
46 | |
47 | #include "ft/ft-internal.h" |
48 | #include "ft/serialize/ft_node-serialize.h" |
49 | #include "loader/dbufio.h" |
50 | #include "loader/loader-internal.h" |
51 | |
52 | toku_instr_key *bfs_mutex_key; |
53 | toku_instr_key *bfs_cond_key; |
54 | toku_instr_key *io_thread_key; |
55 | |
56 | struct dbufio_file { |
57 | // i/o thread owns these |
58 | int fd; |
59 | |
60 | // consumers own these |
61 | size_t offset_in_buf; |
62 | toku_off_t offset_in_uncompressed_file; |
63 | |
64 | // need the mutex to modify these |
65 | struct dbufio_file *next; |
66 | bool second_buf_ready; // if true, the i/o thread is not touching anything. |
67 | |
68 | // consumers own [0], i/o thread owns [1], they are swapped by the consumer only when the condition mutex is held and second_buf_ready is true. |
69 | char *buf[2]; |
70 | size_t n_in_buf[2]; |
71 | int error_code[2]; // includes errno or eof. [0] is the error code associated with buf[0], [1] is the code for buf[1] |
72 | |
73 | bool io_done; |
74 | }; |
75 | |
76 | |
77 | /* A dbufio_fileset */ |
78 | struct dbufio_fileset { |
79 | // The mutex/condition variables protect |
80 | // the singly-linked list of files that need I/O (head/tail in the fileset, and next in each file) |
81 | // in each file: |
82 | // the second_buf_ready boolean (which says the second buffer is full of data). |
83 | // the swapping of the buf[], n_in_buf[], and error_code[] values. |
84 | toku_mutex_t mutex; |
85 | toku_cond_t cond; |
86 | int N; // How many files. This is constant once established. |
87 | int n_not_done; // how many of the files require more I/O? Owned by the i/o thread. |
88 | struct dbufio_file *files; // an array of length N. |
89 | struct dbufio_file *head, *tail; // must have the mutex to fiddle with these. |
90 | size_t bufsize; // the bufsize is the constant (the same for all buffers). |
91 | |
92 | bool panic; |
93 | bool compressed; |
94 | int panic_errno; |
95 | toku_pthread_t iothread; |
96 | }; |
97 | |
98 | |
99 | static void enq (DBUFIO_FILESET bfs, struct dbufio_file *f) { |
100 | if (bfs->tail==NULL) { |
101 | bfs->head = f; |
102 | } else { |
103 | bfs->tail->next = f; |
104 | } |
105 | bfs->tail = f; |
106 | f->next = NULL; |
107 | } |
108 | |
109 | static void panic (DBUFIO_FILESET bfs, int r) { |
110 | if (bfs->panic) return; |
111 | bfs->panic_errno = r; // Don't really care about a race on this variable... Writes to it are atomic, so at least one good panic reason will be stored. |
112 | bfs->panic = true; |
113 | return; |
114 | } |
115 | |
116 | static bool paniced (DBUFIO_FILESET bfs) { |
117 | return bfs->panic; |
118 | } |
119 | |
120 | static ssize_t dbf_read_some_compressed(struct dbufio_file *dbf, char *buf, size_t bufsize) { |
121 | ssize_t ret; |
122 | invariant(bufsize >= MAX_UNCOMPRESSED_BUF); |
123 | unsigned char *raw_block = NULL; |
124 | |
125 | // deserialize the sub block header |
126 | |
127 | // total_size |
128 | // num_sub_blocks |
129 | // compressed_size,uncompressed_size,xsum (repeated num_sub_blocks times) |
130 | ssize_t readcode; |
131 | const uint32_t = sizeof(uint32_t); |
132 | char [header_size]; |
133 | |
134 | readcode = toku_os_read(dbf->fd, &header, header_size); |
135 | if (readcode < 0) { |
136 | ret = -1; |
137 | goto exit; |
138 | } |
139 | if (readcode == 0) { |
140 | ret = 0; |
141 | goto exit; |
142 | } |
143 | if (readcode < (ssize_t) header_size) { |
144 | errno = TOKUDB_NO_DATA; |
145 | ret = -1; |
146 | goto exit; |
147 | } |
148 | uint32_t total_size; |
149 | { |
150 | uint32_t *p = (uint32_t *) &header[0]; |
151 | total_size = toku_dtoh32(p[0]); |
152 | } |
153 | if (total_size == 0 || total_size > (1<<30)) { |
154 | errno = toku_db_badformat(); |
155 | ret = -1; |
156 | goto exit; |
157 | } |
158 | |
159 | //Cannot use XMALLOC |
160 | MALLOC_N(total_size, raw_block); |
161 | if (raw_block == nullptr) { |
162 | errno = ENOMEM; |
163 | ret = -1; |
164 | goto exit; |
165 | } |
166 | readcode = toku_os_read(dbf->fd, raw_block, total_size); |
167 | if (readcode < 0) { |
168 | ret = -1; |
169 | goto exit; |
170 | } |
171 | if (readcode < (ssize_t) total_size) { |
172 | errno = TOKUDB_NO_DATA; |
173 | ret = -1; |
174 | goto exit; |
175 | } |
176 | |
177 | struct sub_block sub_block[max_sub_blocks]; |
178 | uint32_t *; |
179 | sub_block_header = (uint32_t *) &raw_block[0]; |
180 | int32_t n_sub_blocks; |
181 | n_sub_blocks = toku_dtoh32(sub_block_header[0]); |
182 | sub_block_header++; |
183 | size_t ; |
184 | size_subblock_header = sub_block_header_size(n_sub_blocks); |
185 | if (n_sub_blocks == 0 || n_sub_blocks > max_sub_blocks || size_subblock_header > total_size) { |
186 | errno = toku_db_badformat(); |
187 | ret = -1; |
188 | goto exit; |
189 | } |
190 | for (int i = 0; i < n_sub_blocks; i++) { |
191 | sub_block_init(&sub_block[i]); |
192 | sub_block[i].compressed_size = toku_dtoh32(sub_block_header[0]); |
193 | sub_block[i].uncompressed_size = toku_dtoh32(sub_block_header[1]); |
194 | sub_block[i].xsum = toku_dtoh32(sub_block_header[2]); |
195 | sub_block_header += 3; |
196 | } |
197 | |
198 | // verify sub block sizes |
199 | size_t total_compressed_size; |
200 | total_compressed_size = 0; |
201 | for (int i = 0; i < n_sub_blocks; i++) { |
202 | uint32_t compressed_size = sub_block[i].compressed_size; |
203 | if (compressed_size<=0 || compressed_size>(1<<30)) { |
204 | errno = toku_db_badformat(); |
205 | ret = -1; |
206 | goto exit; |
207 | } |
208 | |
209 | uint32_t uncompressed_size = sub_block[i].uncompressed_size; |
210 | if (uncompressed_size<=0 || uncompressed_size>(1<<30)) { |
211 | errno = toku_db_badformat(); |
212 | ret = -1; |
213 | goto exit; |
214 | } |
215 | total_compressed_size += compressed_size; |
216 | } |
217 | if (total_size != total_compressed_size + size_subblock_header) { |
218 | errno = toku_db_badformat(); |
219 | ret = -1; |
220 | goto exit; |
221 | } |
222 | |
223 | // sum up the uncompressed size of the sub blocks |
224 | size_t uncompressed_size; |
225 | uncompressed_size = get_sum_uncompressed_size(n_sub_blocks, sub_block); |
226 | if (uncompressed_size > bufsize || uncompressed_size > MAX_UNCOMPRESSED_BUF) { |
227 | errno = toku_db_badformat(); |
228 | ret = -1; |
229 | goto exit; |
230 | } |
231 | |
232 | unsigned char *uncompressed_data; |
233 | uncompressed_data = (unsigned char *)buf; |
234 | |
235 | // point at the start of the compressed data (past the node header, the sub block header, and the header checksum) |
236 | unsigned char *compressed_data; |
237 | compressed_data = raw_block + size_subblock_header; |
238 | |
239 | // decompress all the compressed sub blocks into the uncompressed buffer |
240 | { |
241 | int r; |
242 | r = decompress_all_sub_blocks(n_sub_blocks, sub_block, compressed_data, uncompressed_data, get_num_cores(), get_ft_pool()); |
243 | if (r != 0) { |
244 | fprintf(stderr, "%s:%d loader failed %d at %p size %" PRIu32"\n" , __FUNCTION__, __LINE__, r, raw_block, total_size); |
245 | dump_bad_block(raw_block, total_size); |
246 | errno = r; |
247 | ret = -1; |
248 | goto exit; |
249 | } |
250 | } |
251 | ret = uncompressed_size; |
252 | exit: |
253 | if (raw_block) { |
254 | toku_free(raw_block); |
255 | } |
256 | return ret; |
257 | } |
258 | |
259 | static ssize_t dbf_read_compressed(struct dbufio_file *dbf, char *buf, size_t bufsize) { |
260 | invariant(bufsize >= MAX_UNCOMPRESSED_BUF); |
261 | size_t count = 0; |
262 | |
263 | while (count + MAX_UNCOMPRESSED_BUF <= bufsize) { |
264 | ssize_t readcode = dbf_read_some_compressed(dbf, buf + count, bufsize - count); |
265 | if (readcode < 0) { |
266 | return readcode; |
267 | } |
268 | count += readcode; |
269 | if (readcode == 0) { |
270 | break; |
271 | } |
272 | } |
273 | return count; |
274 | } |
275 | |
276 | static void* io_thread (void *v) |
277 | // The dbuf_thread does all the asynchronous I/O. |
278 | { |
279 | DBUFIO_FILESET bfs = (DBUFIO_FILESET)v; |
280 | toku_mutex_lock(&bfs->mutex); |
281 | //printf("%s:%d Locked\n", __FILE__, __LINE__); |
282 | while (1) { |
283 | if (paniced(bfs)) { |
284 | toku_mutex_unlock(&bfs->mutex); // ignore any error |
285 | toku_instr_delete_current_thread(); |
286 | return toku_pthread_done(nullptr); |
287 | } |
288 | // printf("n_not_done=%d\n", bfs->n_not_done); |
289 | if (bfs->n_not_done == 0) { |
290 | // all done (meaning we stored EOF (or another error) in |
291 | // error_code[0] for the file. |
292 | // printf("unlocked\n"); |
293 | toku_mutex_unlock(&bfs->mutex); |
294 | toku_instr_delete_current_thread(); |
295 | return toku_pthread_done(nullptr); |
296 | } |
297 | |
298 | struct dbufio_file *dbf = bfs->head; |
299 | if (dbf == NULL) { |
300 | // No I/O needs to be done yet. |
301 | // Wait until something happens that will wake us up. |
302 | toku_cond_wait(&bfs->cond, &bfs->mutex); |
303 | if (paniced(bfs)) { |
304 | toku_mutex_unlock(&bfs->mutex); // ignore any error |
305 | toku_instr_delete_current_thread(); |
306 | return toku_pthread_done(nullptr); |
307 | } |
308 | // Have the lock so go around. |
309 | } else { |
310 | // Some I/O needs to be done. |
311 | // printf("%s:%d Need I/O\n", __FILE__, __LINE__); |
312 | assert(dbf->second_buf_ready == false); |
313 | assert(!dbf->io_done); |
314 | bfs->head = dbf->next; |
315 | if (bfs->head == NULL) |
316 | bfs->tail = NULL; |
317 | |
318 | // Unlock the mutex now that we have ownership of dbf to allow |
319 | // consumers to get the mutex and perform swaps. They won't swap |
320 | // this buffer because second_buf_ready is false. |
321 | toku_mutex_unlock(&bfs->mutex); |
322 | //printf("%s:%d Doing read fd=%d\n", __FILE__, __LINE__, dbf->fd); |
323 | { |
324 | ssize_t readcode; |
325 | if (bfs->compressed) { |
326 | readcode = dbf_read_compressed(dbf, dbf->buf[1], bfs->bufsize); |
327 | } |
328 | else { |
329 | readcode = toku_os_read(dbf->fd, dbf->buf[1], bfs->bufsize); |
330 | } |
331 | //printf("%s:%d readcode=%ld\n", __FILE__, __LINE__, readcode); |
332 | if (readcode==-1) { |
333 | // a real error. Save the real error. |
334 | int the_errno = get_error_errno(); |
335 | fprintf(stderr, "%s:%d dbf=%p fd=%d errno=%d\n" , __FILE__, __LINE__, dbf, dbf->fd, the_errno); |
336 | dbf->error_code[1] = the_errno; |
337 | dbf->n_in_buf[1] = 0; |
338 | } else if (readcode==0) { |
339 | // End of file. Save it. |
340 | dbf->error_code[1] = EOF; |
341 | dbf->n_in_buf[1] = 0; |
342 | dbf->io_done = true; |
343 | |
344 | } else { |
345 | dbf->error_code[1] = 0; |
346 | dbf->n_in_buf[1] = readcode; |
347 | } |
348 | |
349 | //printf("%s:%d locking mutex again=%ld\n", __FILE__, __LINE__, readcode); |
350 | { |
351 | toku_mutex_lock(&bfs->mutex); |
352 | if (paniced(bfs)) { |
353 | toku_mutex_unlock(&bfs->mutex); // ignore any error |
354 | toku_instr_delete_current_thread(); |
355 | return toku_pthread_done(nullptr); |
356 | } |
357 | } |
358 | // Now that we have the mutex, we can decrement n_not_done (if |
359 | // applicable) and set second_buf_ready |
360 | if (readcode<=0) { |
361 | bfs->n_not_done--; |
362 | } |
363 | //printf("%s:%d n_not_done=%d\n", __FILE__, __LINE__, bfs->n_not_done); |
364 | dbf->second_buf_ready = true; |
365 | toku_cond_broadcast(&bfs->cond); |
366 | //printf("%s:%d did broadcast=%d\n", __FILE__, __LINE__, bfs->n_not_done); |
367 | // Still have the lock so go around the loop |
368 | } |
369 | } |
370 | } |
371 | } |
372 | |
373 | int create_dbufio_fileset (DBUFIO_FILESET *bfsp, int N, int fds[/*N*/], size_t bufsize, bool compressed) { |
374 | //printf("%s:%d here\n", __FILE__, __LINE__); |
375 | int result = 0; |
376 | DBUFIO_FILESET CALLOC(bfs); |
377 | if (bfs==0) { result = get_error_errno(); } |
378 | |
379 | bfs->compressed = compressed; |
380 | |
381 | bool mutex_inited = false, cond_inited = false; |
382 | if (result==0) { |
383 | CALLOC_N(N, bfs->files); |
384 | if (bfs->files==NULL) { result = get_error_errno(); } |
385 | else { |
386 | for (int i=0; i<N; i++) { |
387 | bfs->files[i].buf[0] = bfs->files[i].buf[1] = NULL; |
388 | } |
389 | } |
390 | } |
391 | // printf("%s:%d here\n", __FILE__, __LINE__); |
392 | if (result == 0) { |
393 | toku_mutex_init(*bfs_mutex_key, &bfs->mutex, nullptr); |
394 | mutex_inited = true; |
395 | } |
396 | if (result == 0) { |
397 | toku_cond_init(*bfs_cond_key, &bfs->cond, nullptr); |
398 | cond_inited = true; |
399 | } |
400 | if (result == 0) { |
401 | bfs->N = N; |
402 | bfs->n_not_done = N; |
403 | bfs->head = bfs->tail = NULL; |
404 | for (int i = 0; i < N; i++) { |
405 | bfs->files[i].fd = fds[i]; |
406 | bfs->files[i].offset_in_buf = 0; |
407 | bfs->files[i].offset_in_uncompressed_file = 0; |
408 | bfs->files[i].next = NULL; |
409 | bfs->files[i].second_buf_ready = false; |
410 | for (int j = 0; j < 2; j++) { |
411 | if (result == 0) { |
412 | MALLOC_N(bufsize, bfs->files[i].buf[j]); |
413 | if (bfs->files[i].buf[j] == NULL) { |
414 | result = get_error_errno(); |
415 | } |
416 | } |
417 | bfs->files[i].n_in_buf[j] = 0; |
418 | bfs->files[i].error_code[j] = 0; |
419 | } |
420 | bfs->files[i].io_done = false; |
421 | ssize_t r; |
422 | if (bfs->compressed) { |
423 | r = dbf_read_compressed(&bfs->files[i], bfs->files[i].buf[0], bufsize); |
424 | } else { |
425 | r = toku_os_read(bfs->files[i].fd, bfs->files[i].buf[0], bufsize); |
426 | } |
427 | { |
428 | if (r<0) { |
429 | result=get_error_errno(); |
430 | break; |
431 | } else if (r==0) { |
432 | // it's EOF |
433 | bfs->files[i].io_done = true; |
434 | bfs->n_not_done--; |
435 | bfs->files[i].error_code[0] = EOF; |
436 | } else { |
437 | bfs->files[i].n_in_buf[0] = r; |
438 | //printf("%s:%d enq [%d]\n", __FILE__, __LINE__, i); |
439 | enq(bfs, &bfs->files[i]); |
440 | } |
441 | } |
442 | } |
443 | bfs->bufsize = bufsize; |
444 | bfs->panic = false; |
445 | bfs->panic_errno = 0; |
446 | } |
447 | // printf("Creating IO thread\n"); |
448 | if (result == 0) { |
449 | result = toku_pthread_create(*io_thread_key, |
450 | &bfs->iothread, |
451 | nullptr, |
452 | io_thread, |
453 | static_cast<void *>(bfs)); |
454 | } |
455 | if (result == 0) { |
456 | *bfsp = bfs; |
457 | return 0; |
458 | } |
459 | // Now undo everything. |
460 | // If we got here, there is no thread (either result was zero before the |
461 | // thread was created, or else the thread creation itself failed. |
462 | if (bfs) { |
463 | if (bfs->files) { |
464 | // the files were allocated, so we have to free all the bufs. |
465 | for (int i=0; i<N; i++) { |
466 | for (int j=0; j<2; j++) { |
467 | if (bfs->files[i].buf[j]) |
468 | toku_free(bfs->files[i].buf[j]); |
469 | bfs->files[i].buf[j]=NULL; |
470 | } |
471 | } |
472 | toku_free(bfs->files); |
473 | bfs->files=NULL; |
474 | } |
475 | if (cond_inited) { |
476 | toku_cond_destroy(&bfs->cond); // don't check error status |
477 | } |
478 | if (mutex_inited) { |
479 | toku_mutex_destroy(&bfs->mutex); // don't check error status |
480 | } |
481 | toku_free(bfs); |
482 | } |
483 | return result; |
484 | } |
485 | |
486 | int panic_dbufio_fileset(DBUFIO_FILESET bfs, int error) { |
487 | toku_mutex_lock(&bfs->mutex); |
488 | panic(bfs, error); |
489 | toku_cond_broadcast(&bfs->cond); |
490 | toku_mutex_unlock(&bfs->mutex); |
491 | return 0; |
492 | } |
493 | |
494 | int destroy_dbufio_fileset (DBUFIO_FILESET bfs) { |
495 | int result = 0; |
496 | { |
497 | void *retval; |
498 | int r = toku_pthread_join(bfs->iothread, &retval); |
499 | assert(r==0); |
500 | assert(retval==NULL); |
501 | } |
502 | { |
503 | toku_mutex_destroy(&bfs->mutex); |
504 | } |
505 | { |
506 | toku_cond_destroy(&bfs->cond); |
507 | } |
508 | if (bfs->files) { |
509 | for (int i=0; i<bfs->N; i++) { |
510 | for (int j=0; j<2; j++) { |
511 | //printf("%s:%d free([%d][%d]=%p\n", __FILE__, __LINE__, i,j, bfs->files[i].buf[j]); |
512 | toku_free(bfs->files[i].buf[j]); |
513 | } |
514 | } |
515 | toku_free(bfs->files); |
516 | } |
517 | toku_free(bfs); |
518 | return result; |
519 | } |
520 | |
521 | int dbufio_fileset_read (DBUFIO_FILESET bfs, int filenum, void *buf_v, size_t count, size_t *n_read) { |
522 | char *buf = (char*)buf_v; |
523 | struct dbufio_file *dbf = &bfs->files[filenum]; |
524 | if (dbf->error_code[0]!=0) return dbf->error_code[0]; |
525 | if (dbf->offset_in_buf + count <= dbf->n_in_buf[0]) { |
526 | // Enough data is present to do it all now |
527 | memcpy(buf, dbf->buf[0]+dbf->offset_in_buf, count); |
528 | dbf->offset_in_buf += count; |
529 | dbf->offset_in_uncompressed_file += count; |
530 | *n_read = count; |
531 | return 0; |
532 | } else if (dbf->n_in_buf[0] > dbf->offset_in_buf) { |
533 | // There is something in buf[0] |
534 | size_t this_count = dbf->n_in_buf[0]-dbf->offset_in_buf; |
535 | assert(dbf->offset_in_buf + this_count <= bfs->bufsize); |
536 | memcpy(buf, dbf->buf[0]+dbf->offset_in_buf, this_count); |
537 | dbf->offset_in_buf += this_count; |
538 | dbf->offset_in_uncompressed_file += this_count; |
539 | size_t sub_n_read; |
540 | int r = dbufio_fileset_read(bfs, filenum, buf+this_count, count-this_count, &sub_n_read); |
541 | if (r==0) { |
542 | *n_read = this_count + sub_n_read; |
543 | return 0; |
544 | } else { |
545 | // The error code will have been saved. We got some data so return that |
546 | *n_read = this_count; |
547 | return 0; |
548 | } |
549 | } else { |
550 | // There is nothing in buf[0]. So we need to swap buffers |
551 | toku_mutex_lock(&bfs->mutex); |
552 | while (1) { |
553 | if (dbf->second_buf_ready) { |
554 | dbf->n_in_buf[0] = dbf->n_in_buf[1]; |
555 | { |
556 | char *tmp = dbf->buf[0]; |
557 | dbf->buf[0] = dbf->buf[1]; |
558 | dbf->buf[1] = tmp; |
559 | } |
560 | dbf->error_code[0] = dbf->error_code[1]; |
561 | dbf->second_buf_ready = false; |
562 | dbf->offset_in_buf = 0; |
563 | if (!dbf->io_done) { |
564 | // Don't enqueue it if the I/O is all done. |
565 | //printf("%s:%d enq [%ld]\n", __FILE__, __LINE__, dbf-&bfs->files[0]); |
566 | enq(bfs, dbf); |
567 | } |
568 | toku_cond_broadcast(&bfs->cond); |
569 | toku_mutex_unlock(&bfs->mutex); |
570 | if (dbf->error_code[0]==0) { |
571 | assert(dbf->n_in_buf[0]>0); |
572 | return dbufio_fileset_read(bfs, filenum, buf_v, count, n_read); |
573 | } else { |
574 | *n_read = 0; |
575 | return dbf->error_code[0]; |
576 | } |
577 | } else { |
578 | toku_cond_wait(&bfs->cond, &bfs->mutex); |
579 | } |
580 | } |
581 | assert(0); // cannot get here. |
582 | } |
583 | } |
584 | |
585 | void |
586 | dbufio_print(DBUFIO_FILESET bfs) { |
587 | fprintf(stderr, "%s:%d bfs=%p" , __FILE__, __LINE__, bfs); |
588 | if (bfs->panic) |
589 | fprintf(stderr, " panic=%d" , bfs->panic_errno); |
590 | fprintf(stderr, " N=%d %d %" PRIuMAX, bfs->N, bfs->n_not_done, (uintmax_t) bfs->bufsize); |
591 | for (int i = 0; i < bfs->N; i++) { |
592 | struct dbufio_file *dbf = &bfs->files[i]; |
593 | if (dbf->error_code[0] || dbf->error_code[1]) |
594 | fprintf(stderr, " %d=[%d,%d]" , i, dbf->error_code[0], dbf->error_code[1]); |
595 | } |
596 | fprintf(stderr, "\n" ); |
597 | |
598 | } |
599 | |