1/* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2// vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
3#ident "$Id$"
4/*======
5This file is part of PerconaFT.
6
7
8Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved.
9
10 PerconaFT is free software: you can redistribute it and/or modify
11 it under the terms of the GNU General Public License, version 2,
12 as published by the Free Software Foundation.
13
14 PerconaFT is distributed in the hope that it will be useful,
15 but WITHOUT ANY WARRANTY; without even the implied warranty of
16 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 GNU General Public License for more details.
18
19 You should have received a copy of the GNU General Public License
20 along with PerconaFT. If not, see <http://www.gnu.org/licenses/>.
21
22----------------------------------------
23
24 PerconaFT is free software: you can redistribute it and/or modify
25 it under the terms of the GNU Affero General Public License, version 3,
26 as published by the Free Software Foundation.
27
28 PerconaFT is distributed in the hope that it will be useful,
29 but WITHOUT ANY WARRANTY; without even the implied warranty of
30 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
31 GNU Affero General Public License for more details.
32
33 You should have received a copy of the GNU Affero General Public License
34 along with PerconaFT. If not, see <http://www.gnu.org/licenses/>.
35======= */
36
37#ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved."
38
39#include <my_global.h>
40#include <errno.h>
41#include <string.h>
42#include <unistd.h>
43
44#include "portability/toku_assert.h"
45#include "portability/memory.h"
46
47#include "ft/ft-internal.h"
48#include "ft/serialize/ft_node-serialize.h"
49#include "loader/dbufio.h"
50#include "loader/loader-internal.h"
51
// Instrumentation keys handed to toku_mutex_init(), toku_cond_init() and
// toku_pthread_create() when a fileset is created.
// NOTE(review): the keys are dereferenced in create_dbufio_fileset(), so
// presumably they are assigned elsewhere before any fileset is created -- confirm.
toku_instr_key *bfs_mutex_key;
toku_instr_key *bfs_cond_key;
toku_instr_key *io_thread_key;
55
// Per-file state for double-buffered reading: the consumer drains buf[0]
// while the i/o thread refills buf[1].
struct dbufio_file {
 // i/o thread owns these
 int fd; // descriptor the data is read from

 // consumers own these
 size_t offset_in_buf; // position of the next unread byte within buf[0]
 toku_off_t offset_in_uncompressed_file; // running total of bytes handed to the consumer

 // need the mutex to modify these
 struct dbufio_file *next; // link in the fileset's queue of files that need I/O
 bool second_buf_ready; // if true, the i/o thread is not touching anything.

 // consumers own [0], i/o thread owns [1], they are swapped by the consumer only when the condition mutex is held and second_buf_ready is true.
 char *buf[2];
 size_t n_in_buf[2]; // number of valid bytes in the corresponding buf[]
 int error_code[2]; // includes errno or eof. [0] is the error code associated with buf[0], [1] is the code for buf[1]

 // Set once a read reports end of file; the consumer then stops
 // re-enqueueing this file for more I/O.
 bool io_done;
};
75
76
/* A dbufio_fileset: a group of N files read through double buffers that are
 * refilled by a single background i/o thread. */
struct dbufio_fileset {
 // The mutex/condition variables protect
 // the singly-linked list of files that need I/O (head/tail in the fileset, and next in each file)
 // in each file:
 // the second_buf_ready boolean (which says the second buffer is full of data).
 // the swapping of the buf[], n_in_buf[], and error_code[] values.
 toku_mutex_t mutex;
 toku_cond_t cond;
 int N; // How many files. This is constant once established.
 int n_not_done; // how many of the files require more I/O? Owned by the i/o thread.
 struct dbufio_file *files; // an array of length N.
 struct dbufio_file *head, *tail; // must have the mutex to fiddle with these.
 size_t bufsize; // the size of every buffer (constant, the same for all buffers).

 bool panic; // set by panic(); once set, the i/o thread exits
 bool compressed; // if true, files are read with dbf_read_compressed() instead of a plain toku_os_read()
 int panic_errno; // the error code recorded by the first panic
 toku_pthread_t iothread; // created by create_dbufio_fileset(), joined by destroy_dbufio_fileset()
};
97
98
99static void enq (DBUFIO_FILESET bfs, struct dbufio_file *f) {
100 if (bfs->tail==NULL) {
101 bfs->head = f;
102 } else {
103 bfs->tail->next = f;
104 }
105 bfs->tail = f;
106 f->next = NULL;
107}
108
109static void panic (DBUFIO_FILESET bfs, int r) {
110 if (bfs->panic) return;
111 bfs->panic_errno = r; // Don't really care about a race on this variable... Writes to it are atomic, so at least one good panic reason will be stored.
112 bfs->panic = true;
113 return;
114}
115
116static bool paniced (DBUFIO_FILESET bfs) {
117 return bfs->panic;
118}
119
120static ssize_t dbf_read_some_compressed(struct dbufio_file *dbf, char *buf, size_t bufsize) {
121 ssize_t ret;
122 invariant(bufsize >= MAX_UNCOMPRESSED_BUF);
123 unsigned char *raw_block = NULL;
124
125 // deserialize the sub block header
126
127 // total_size
128 // num_sub_blocks
129 // compressed_size,uncompressed_size,xsum (repeated num_sub_blocks times)
130 ssize_t readcode;
131 const uint32_t header_size = sizeof(uint32_t);
132 char header[header_size];
133
134 readcode = toku_os_read(dbf->fd, &header, header_size);
135 if (readcode < 0) {
136 ret = -1;
137 goto exit;
138 }
139 if (readcode == 0) {
140 ret = 0;
141 goto exit;
142 }
143 if (readcode < (ssize_t) header_size) {
144 errno = TOKUDB_NO_DATA;
145 ret = -1;
146 goto exit;
147 }
148 uint32_t total_size;
149 {
150 uint32_t *p = (uint32_t *) &header[0];
151 total_size = toku_dtoh32(p[0]);
152 }
153 if (total_size == 0 || total_size > (1<<30)) {
154 errno = toku_db_badformat();
155 ret = -1;
156 goto exit;
157 }
158
159 //Cannot use XMALLOC
160 MALLOC_N(total_size, raw_block);
161 if (raw_block == nullptr) {
162 errno = ENOMEM;
163 ret = -1;
164 goto exit;
165 }
166 readcode = toku_os_read(dbf->fd, raw_block, total_size);
167 if (readcode < 0) {
168 ret = -1;
169 goto exit;
170 }
171 if (readcode < (ssize_t) total_size) {
172 errno = TOKUDB_NO_DATA;
173 ret = -1;
174 goto exit;
175 }
176
177 struct sub_block sub_block[max_sub_blocks];
178 uint32_t *sub_block_header;
179 sub_block_header = (uint32_t *) &raw_block[0];
180 int32_t n_sub_blocks;
181 n_sub_blocks = toku_dtoh32(sub_block_header[0]);
182 sub_block_header++;
183 size_t size_subblock_header;
184 size_subblock_header = sub_block_header_size(n_sub_blocks);
185 if (n_sub_blocks == 0 || n_sub_blocks > max_sub_blocks || size_subblock_header > total_size) {
186 errno = toku_db_badformat();
187 ret = -1;
188 goto exit;
189 }
190 for (int i = 0; i < n_sub_blocks; i++) {
191 sub_block_init(&sub_block[i]);
192 sub_block[i].compressed_size = toku_dtoh32(sub_block_header[0]);
193 sub_block[i].uncompressed_size = toku_dtoh32(sub_block_header[1]);
194 sub_block[i].xsum = toku_dtoh32(sub_block_header[2]);
195 sub_block_header += 3;
196 }
197
198 // verify sub block sizes
199 size_t total_compressed_size;
200 total_compressed_size = 0;
201 for (int i = 0; i < n_sub_blocks; i++) {
202 uint32_t compressed_size = sub_block[i].compressed_size;
203 if (compressed_size<=0 || compressed_size>(1<<30)) {
204 errno = toku_db_badformat();
205 ret = -1;
206 goto exit;
207 }
208
209 uint32_t uncompressed_size = sub_block[i].uncompressed_size;
210 if (uncompressed_size<=0 || uncompressed_size>(1<<30)) {
211 errno = toku_db_badformat();
212 ret = -1;
213 goto exit;
214 }
215 total_compressed_size += compressed_size;
216 }
217 if (total_size != total_compressed_size + size_subblock_header) {
218 errno = toku_db_badformat();
219 ret = -1;
220 goto exit;
221 }
222
223 // sum up the uncompressed size of the sub blocks
224 size_t uncompressed_size;
225 uncompressed_size = get_sum_uncompressed_size(n_sub_blocks, sub_block);
226 if (uncompressed_size > bufsize || uncompressed_size > MAX_UNCOMPRESSED_BUF) {
227 errno = toku_db_badformat();
228 ret = -1;
229 goto exit;
230 }
231
232 unsigned char *uncompressed_data;
233 uncompressed_data = (unsigned char *)buf;
234
235 // point at the start of the compressed data (past the node header, the sub block header, and the header checksum)
236 unsigned char *compressed_data;
237 compressed_data = raw_block + size_subblock_header;
238
239 // decompress all the compressed sub blocks into the uncompressed buffer
240 {
241 int r;
242 r = decompress_all_sub_blocks(n_sub_blocks, sub_block, compressed_data, uncompressed_data, get_num_cores(), get_ft_pool());
243 if (r != 0) {
244 fprintf(stderr, "%s:%d loader failed %d at %p size %" PRIu32"\n", __FUNCTION__, __LINE__, r, raw_block, total_size);
245 dump_bad_block(raw_block, total_size);
246 errno = r;
247 ret = -1;
248 goto exit;
249 }
250 }
251 ret = uncompressed_size;
252exit:
253 if (raw_block) {
254 toku_free(raw_block);
255 }
256 return ret;
257}
258
259static ssize_t dbf_read_compressed(struct dbufio_file *dbf, char *buf, size_t bufsize) {
260 invariant(bufsize >= MAX_UNCOMPRESSED_BUF);
261 size_t count = 0;
262
263 while (count + MAX_UNCOMPRESSED_BUF <= bufsize) {
264 ssize_t readcode = dbf_read_some_compressed(dbf, buf + count, bufsize - count);
265 if (readcode < 0) {
266 return readcode;
267 }
268 count += readcode;
269 if (readcode == 0) {
270 break;
271 }
272 }
273 return count;
274}
275
276static void* io_thread (void *v)
277// The dbuf_thread does all the asynchronous I/O.
278{
279 DBUFIO_FILESET bfs = (DBUFIO_FILESET)v;
280 toku_mutex_lock(&bfs->mutex);
281 //printf("%s:%d Locked\n", __FILE__, __LINE__);
282 while (1) {
283 if (paniced(bfs)) {
284 toku_mutex_unlock(&bfs->mutex); // ignore any error
285 toku_instr_delete_current_thread();
286 return toku_pthread_done(nullptr);
287 }
288 // printf("n_not_done=%d\n", bfs->n_not_done);
289 if (bfs->n_not_done == 0) {
290 // all done (meaning we stored EOF (or another error) in
291 // error_code[0] for the file.
292 // printf("unlocked\n");
293 toku_mutex_unlock(&bfs->mutex);
294 toku_instr_delete_current_thread();
295 return toku_pthread_done(nullptr);
296 }
297
298 struct dbufio_file *dbf = bfs->head;
299 if (dbf == NULL) {
300 // No I/O needs to be done yet.
301 // Wait until something happens that will wake us up.
302 toku_cond_wait(&bfs->cond, &bfs->mutex);
303 if (paniced(bfs)) {
304 toku_mutex_unlock(&bfs->mutex); // ignore any error
305 toku_instr_delete_current_thread();
306 return toku_pthread_done(nullptr);
307 }
308 // Have the lock so go around.
309 } else {
310 // Some I/O needs to be done.
311 // printf("%s:%d Need I/O\n", __FILE__, __LINE__);
312 assert(dbf->second_buf_ready == false);
313 assert(!dbf->io_done);
314 bfs->head = dbf->next;
315 if (bfs->head == NULL)
316 bfs->tail = NULL;
317
318 // Unlock the mutex now that we have ownership of dbf to allow
319 // consumers to get the mutex and perform swaps. They won't swap
320 // this buffer because second_buf_ready is false.
321 toku_mutex_unlock(&bfs->mutex);
322 //printf("%s:%d Doing read fd=%d\n", __FILE__, __LINE__, dbf->fd);
323 {
324 ssize_t readcode;
325 if (bfs->compressed) {
326 readcode = dbf_read_compressed(dbf, dbf->buf[1], bfs->bufsize);
327 }
328 else {
329 readcode = toku_os_read(dbf->fd, dbf->buf[1], bfs->bufsize);
330 }
331 //printf("%s:%d readcode=%ld\n", __FILE__, __LINE__, readcode);
332 if (readcode==-1) {
333 // a real error. Save the real error.
334 int the_errno = get_error_errno();
335 fprintf(stderr, "%s:%d dbf=%p fd=%d errno=%d\n", __FILE__, __LINE__, dbf, dbf->fd, the_errno);
336 dbf->error_code[1] = the_errno;
337 dbf->n_in_buf[1] = 0;
338 } else if (readcode==0) {
339 // End of file. Save it.
340 dbf->error_code[1] = EOF;
341 dbf->n_in_buf[1] = 0;
342 dbf->io_done = true;
343
344 } else {
345 dbf->error_code[1] = 0;
346 dbf->n_in_buf[1] = readcode;
347 }
348
349 //printf("%s:%d locking mutex again=%ld\n", __FILE__, __LINE__, readcode);
350 {
351 toku_mutex_lock(&bfs->mutex);
352 if (paniced(bfs)) {
353 toku_mutex_unlock(&bfs->mutex); // ignore any error
354 toku_instr_delete_current_thread();
355 return toku_pthread_done(nullptr);
356 }
357 }
358 // Now that we have the mutex, we can decrement n_not_done (if
359 // applicable) and set second_buf_ready
360 if (readcode<=0) {
361 bfs->n_not_done--;
362 }
363 //printf("%s:%d n_not_done=%d\n", __FILE__, __LINE__, bfs->n_not_done);
364 dbf->second_buf_ready = true;
365 toku_cond_broadcast(&bfs->cond);
366 //printf("%s:%d did broadcast=%d\n", __FILE__, __LINE__, bfs->n_not_done);
367 // Still have the lock so go around the loop
368 }
369 }
370 }
371}
372
373int create_dbufio_fileset (DBUFIO_FILESET *bfsp, int N, int fds[/*N*/], size_t bufsize, bool compressed) {
374 //printf("%s:%d here\n", __FILE__, __LINE__);
375 int result = 0;
376 DBUFIO_FILESET CALLOC(bfs);
377 if (bfs==0) { result = get_error_errno(); }
378
379 bfs->compressed = compressed;
380
381 bool mutex_inited = false, cond_inited = false;
382 if (result==0) {
383 CALLOC_N(N, bfs->files);
384 if (bfs->files==NULL) { result = get_error_errno(); }
385 else {
386 for (int i=0; i<N; i++) {
387 bfs->files[i].buf[0] = bfs->files[i].buf[1] = NULL;
388 }
389 }
390 }
391 // printf("%s:%d here\n", __FILE__, __LINE__);
392 if (result == 0) {
393 toku_mutex_init(*bfs_mutex_key, &bfs->mutex, nullptr);
394 mutex_inited = true;
395 }
396 if (result == 0) {
397 toku_cond_init(*bfs_cond_key, &bfs->cond, nullptr);
398 cond_inited = true;
399 }
400 if (result == 0) {
401 bfs->N = N;
402 bfs->n_not_done = N;
403 bfs->head = bfs->tail = NULL;
404 for (int i = 0; i < N; i++) {
405 bfs->files[i].fd = fds[i];
406 bfs->files[i].offset_in_buf = 0;
407 bfs->files[i].offset_in_uncompressed_file = 0;
408 bfs->files[i].next = NULL;
409 bfs->files[i].second_buf_ready = false;
410 for (int j = 0; j < 2; j++) {
411 if (result == 0) {
412 MALLOC_N(bufsize, bfs->files[i].buf[j]);
413 if (bfs->files[i].buf[j] == NULL) {
414 result = get_error_errno();
415 }
416 }
417 bfs->files[i].n_in_buf[j] = 0;
418 bfs->files[i].error_code[j] = 0;
419 }
420 bfs->files[i].io_done = false;
421 ssize_t r;
422 if (bfs->compressed) {
423 r = dbf_read_compressed(&bfs->files[i], bfs->files[i].buf[0], bufsize);
424 } else {
425 r = toku_os_read(bfs->files[i].fd, bfs->files[i].buf[0], bufsize);
426 }
427 {
428 if (r<0) {
429 result=get_error_errno();
430 break;
431 } else if (r==0) {
432 // it's EOF
433 bfs->files[i].io_done = true;
434 bfs->n_not_done--;
435 bfs->files[i].error_code[0] = EOF;
436 } else {
437 bfs->files[i].n_in_buf[0] = r;
438 //printf("%s:%d enq [%d]\n", __FILE__, __LINE__, i);
439 enq(bfs, &bfs->files[i]);
440 }
441 }
442 }
443 bfs->bufsize = bufsize;
444 bfs->panic = false;
445 bfs->panic_errno = 0;
446 }
447 // printf("Creating IO thread\n");
448 if (result == 0) {
449 result = toku_pthread_create(*io_thread_key,
450 &bfs->iothread,
451 nullptr,
452 io_thread,
453 static_cast<void *>(bfs));
454 }
455 if (result == 0) {
456 *bfsp = bfs;
457 return 0;
458 }
459 // Now undo everything.
460 // If we got here, there is no thread (either result was zero before the
461 // thread was created, or else the thread creation itself failed.
462 if (bfs) {
463 if (bfs->files) {
464 // the files were allocated, so we have to free all the bufs.
465 for (int i=0; i<N; i++) {
466 for (int j=0; j<2; j++) {
467 if (bfs->files[i].buf[j])
468 toku_free(bfs->files[i].buf[j]);
469 bfs->files[i].buf[j]=NULL;
470 }
471 }
472 toku_free(bfs->files);
473 bfs->files=NULL;
474 }
475 if (cond_inited) {
476 toku_cond_destroy(&bfs->cond); // don't check error status
477 }
478 if (mutex_inited) {
479 toku_mutex_destroy(&bfs->mutex); // don't check error status
480 }
481 toku_free(bfs);
482 }
483 return result;
484}
485
486int panic_dbufio_fileset(DBUFIO_FILESET bfs, int error) {
487 toku_mutex_lock(&bfs->mutex);
488 panic(bfs, error);
489 toku_cond_broadcast(&bfs->cond);
490 toku_mutex_unlock(&bfs->mutex);
491 return 0;
492}
493
494int destroy_dbufio_fileset (DBUFIO_FILESET bfs) {
495 int result = 0;
496 {
497 void *retval;
498 int r = toku_pthread_join(bfs->iothread, &retval);
499 assert(r==0);
500 assert(retval==NULL);
501 }
502 {
503 toku_mutex_destroy(&bfs->mutex);
504 }
505 {
506 toku_cond_destroy(&bfs->cond);
507 }
508 if (bfs->files) {
509 for (int i=0; i<bfs->N; i++) {
510 for (int j=0; j<2; j++) {
511 //printf("%s:%d free([%d][%d]=%p\n", __FILE__, __LINE__, i,j, bfs->files[i].buf[j]);
512 toku_free(bfs->files[i].buf[j]);
513 }
514 }
515 toku_free(bfs->files);
516 }
517 toku_free(bfs);
518 return result;
519}
520
521int dbufio_fileset_read (DBUFIO_FILESET bfs, int filenum, void *buf_v, size_t count, size_t *n_read) {
522 char *buf = (char*)buf_v;
523 struct dbufio_file *dbf = &bfs->files[filenum];
524 if (dbf->error_code[0]!=0) return dbf->error_code[0];
525 if (dbf->offset_in_buf + count <= dbf->n_in_buf[0]) {
526 // Enough data is present to do it all now
527 memcpy(buf, dbf->buf[0]+dbf->offset_in_buf, count);
528 dbf->offset_in_buf += count;
529 dbf->offset_in_uncompressed_file += count;
530 *n_read = count;
531 return 0;
532 } else if (dbf->n_in_buf[0] > dbf->offset_in_buf) {
533 // There is something in buf[0]
534 size_t this_count = dbf->n_in_buf[0]-dbf->offset_in_buf;
535 assert(dbf->offset_in_buf + this_count <= bfs->bufsize);
536 memcpy(buf, dbf->buf[0]+dbf->offset_in_buf, this_count);
537 dbf->offset_in_buf += this_count;
538 dbf->offset_in_uncompressed_file += this_count;
539 size_t sub_n_read;
540 int r = dbufio_fileset_read(bfs, filenum, buf+this_count, count-this_count, &sub_n_read);
541 if (r==0) {
542 *n_read = this_count + sub_n_read;
543 return 0;
544 } else {
545 // The error code will have been saved. We got some data so return that
546 *n_read = this_count;
547 return 0;
548 }
549 } else {
550 // There is nothing in buf[0]. So we need to swap buffers
551 toku_mutex_lock(&bfs->mutex);
552 while (1) {
553 if (dbf->second_buf_ready) {
554 dbf->n_in_buf[0] = dbf->n_in_buf[1];
555 {
556 char *tmp = dbf->buf[0];
557 dbf->buf[0] = dbf->buf[1];
558 dbf->buf[1] = tmp;
559 }
560 dbf->error_code[0] = dbf->error_code[1];
561 dbf->second_buf_ready = false;
562 dbf->offset_in_buf = 0;
563 if (!dbf->io_done) {
564 // Don't enqueue it if the I/O is all done.
565 //printf("%s:%d enq [%ld]\n", __FILE__, __LINE__, dbf-&bfs->files[0]);
566 enq(bfs, dbf);
567 }
568 toku_cond_broadcast(&bfs->cond);
569 toku_mutex_unlock(&bfs->mutex);
570 if (dbf->error_code[0]==0) {
571 assert(dbf->n_in_buf[0]>0);
572 return dbufio_fileset_read(bfs, filenum, buf_v, count, n_read);
573 } else {
574 *n_read = 0;
575 return dbf->error_code[0];
576 }
577 } else {
578 toku_cond_wait(&bfs->cond, &bfs->mutex);
579 }
580 }
581 assert(0); // cannot get here.
582 }
583}
584
585void
586dbufio_print(DBUFIO_FILESET bfs) {
587 fprintf(stderr, "%s:%d bfs=%p", __FILE__, __LINE__, bfs);
588 if (bfs->panic)
589 fprintf(stderr, " panic=%d", bfs->panic_errno);
590 fprintf(stderr, " N=%d %d %" PRIuMAX, bfs->N, bfs->n_not_done, (uintmax_t) bfs->bufsize);
591 for (int i = 0; i < bfs->N; i++) {
592 struct dbufio_file *dbf = &bfs->files[i];
593 if (dbf->error_code[0] || dbf->error_code[1])
594 fprintf(stderr, " %d=[%d,%d]", i, dbf->error_code[0], dbf->error_code[1]);
595 }
596 fprintf(stderr, "\n");
597
598}
599