1 | /* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */ |
2 | // vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4: |
3 | #ident "$Id$" |
4 | /*====== |
5 | This file is part of PerconaFT. |
6 | |
7 | |
8 | Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved. |
9 | |
10 | PerconaFT is free software: you can redistribute it and/or modify |
11 | it under the terms of the GNU General Public License, version 2, |
12 | as published by the Free Software Foundation. |
13 | |
14 | PerconaFT is distributed in the hope that it will be useful, |
15 | but WITHOUT ANY WARRANTY; without even the implied warranty of |
16 | MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
17 | GNU General Public License for more details. |
18 | |
19 | You should have received a copy of the GNU General Public License |
20 | along with PerconaFT. If not, see <http://www.gnu.org/licenses/>. |
21 | |
22 | ---------------------------------------- |
23 | |
24 | PerconaFT is free software: you can redistribute it and/or modify |
25 | it under the terms of the GNU Affero General Public License, version 3, |
26 | as published by the Free Software Foundation. |
27 | |
28 | PerconaFT is distributed in the hope that it will be useful, |
29 | but WITHOUT ANY WARRANTY; without even the implied warranty of |
30 | MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
31 | GNU Affero General Public License for more details. |
32 | |
33 | You should have received a copy of the GNU Affero General Public License |
34 | along with PerconaFT. If not, see <http://www.gnu.org/licenses/>. |
35 | ======= */ |
36 | |
37 | #ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved." |
38 | |
39 | #include "portability/toku_atomic.h" |
40 | |
41 | #include "ft/cachetable/cachetable.h" |
42 | #include "ft/ft.h" |
43 | #include "ft/ft-internal.h" |
44 | #include "ft/node.h" |
45 | #include "ft/logger/log-internal.h" |
46 | #include "ft/txn/rollback.h" |
47 | #include "ft/serialize/block_allocator.h" |
48 | #include "ft/serialize/block_table.h" |
49 | #include "ft/serialize/compress.h" |
50 | #include "ft/serialize/ft_node-serialize.h" |
51 | #include "ft/serialize/sub_block.h" |
52 | #include "util/sort.h" |
53 | #include "util/threadpool.h" |
54 | #include "util/status.h" |
55 | #include "util/scoped_malloc.h" |
56 | |
// Process-wide status for ft layout-version upgrades; lazily initialized by status_init().
static FT_UPGRADE_STATUS_S ft_upgrade_status;
58 | |
59 | #define STATUS_INIT(k,c,t,l,inc) TOKUFT_STATUS_INIT(ft_upgrade_status, k, c, t, "ft upgrade: " l, inc) |
60 | |
// One-time initialization of the ft-upgrade status rows (keyname/type/legend).
static void
status_init(void)
{
    // Note, this function initializes the keyname, type, and legend fields.
    // Value fields are initialized to zero by compiler.
    STATUS_INIT(FT_UPGRADE_FOOTPRINT, nullptr, UINT64, "footprint", TOKU_ENGINE_STATUS);
    ft_upgrade_status.initialized = true;
}
69 | #undef STATUS_INIT |
70 | |
71 | #define UPGRADE_STATUS_VALUE(x) ft_upgrade_status.status[x].value.num |
72 | |
// Copy the current upgrade status into *s, refreshing the footprint counter
// from the logger first. Initializes the status table on first use.
void
toku_ft_upgrade_get_status(FT_UPGRADE_STATUS s) {
    if (!ft_upgrade_status.initialized) {
        status_init();
    }
    UPGRADE_STATUS_VALUE(FT_UPGRADE_FOOTPRINT) = toku_log_upgrade_get_footprint();
    *s = ft_upgrade_status;
}
81 | |
static int num_cores = 0; // cache the number of cores for the parallelization
static struct toku_thread_pool *ft_pool = NULL; // shared worker pool for parallel serialize+compress
// When true, node serialization fans partitions out over ft_pool (see toku_serialize_ftnode_to).
bool toku_serialize_in_parallel;
85 | |
// Accessor for the core count cached at layer init time.
int get_num_cores(void) {
    return num_cores;
}
89 | |
// Accessor for the shared serialization thread pool created at layer init time.
struct toku_thread_pool *get_ft_pool(void) {
    return ft_pool;
}
93 | |
// Enable/disable parallel node serialization. Uses an unsafe (racy but
// benign) store since the flag is only a performance hint.
void toku_serialize_set_parallel(bool in_parallel) {
    toku_unsafe_set(&toku_serialize_in_parallel, in_parallel);
}
97 | |
// Initialize the serialization layer: cache the active processor count and
// create the shared thread pool; parallel serialization starts disabled.
void toku_ft_serialize_layer_init(void) {
    num_cores = toku_os_get_number_active_processors();
    int r = toku_thread_pool_create(&ft_pool, num_cores);
    lazy_assert_zero(r);
    toku_serialize_in_parallel = false;
}
104 | |
// Tear down the serialization layer's thread pool (joins worker threads).
void toku_ft_serialize_layer_destroy(void) {
    toku_thread_pool_destroy(&ft_pool);
}
108 | |
// Granularity (16 MiB) for growing data files; shrinking uses 2x for hysteresis.
enum { FILE_CHANGE_INCREMENT = (16 << 20) };
110 | |
// Round a up to the next multiple of b (b must be nonzero).
static inline uint64_t
alignup64(uint64_t a, uint64_t b) {
    uint64_t quotient = (a + b - 1) / b;
    return quotient * b;
}
115 | |
116 | // safe_file_size_lock must be held. |
117 | void |
118 | toku_maybe_truncate_file (int fd, uint64_t size_used, uint64_t expected_size, uint64_t *new_sizep) |
119 | // Effect: If file size >= SIZE+32MiB, reduce file size. |
120 | // (32 instead of 16.. hysteresis). |
121 | // Return 0 on success, otherwise an error number. |
122 | { |
123 | int64_t file_size; |
124 | { |
125 | int r = toku_os_get_file_size(fd, &file_size); |
126 | lazy_assert_zero(r); |
127 | invariant(file_size >= 0); |
128 | } |
129 | invariant(expected_size == (uint64_t)file_size); |
130 | // If file space is overallocated by at least 32M |
131 | if ((uint64_t)file_size >= size_used + (2*FILE_CHANGE_INCREMENT)) { |
132 | toku_off_t new_size = alignup64(size_used, (2*FILE_CHANGE_INCREMENT)); //Truncate to new size_used. |
133 | invariant(new_size < file_size); |
134 | invariant(new_size >= 0); |
135 | int r = ftruncate(fd, new_size); |
136 | lazy_assert_zero(r); |
137 | *new_sizep = new_size; |
138 | } |
139 | else { |
140 | *new_sizep = file_size; |
141 | } |
142 | return; |
143 | } |
144 | |
// Return the smaller of two signed 64-bit values.
static int64_t
min64(int64_t a, int64_t b) {
    return (b < a) ? b : a;
}
150 | |
void
toku_maybe_preallocate_in_file (int fd, int64_t size, int64_t expected_size, int64_t *new_size)
// Effect: make the file bigger by either doubling it or growing by 16MiB whichever is less, until it is at least size
// Return 0 on success, otherwise an error number.
{
    int64_t file_size = 0;
    //TODO(yoni): Allow variable stripe_width (perhaps from ft) for larger raids
    const uint64_t stripe_width = 4096;
    {
        int r = toku_os_get_file_size(fd, &file_size);
        if (r != 0) { // debug #2463
            int the_errno = get_maybe_error_errno();
            fprintf(stderr, "%s:%d fd=%d size=%" PRIu64 " r=%d errno=%d\n", __FUNCTION__, __LINE__, fd, size, r, the_errno); fflush(stderr);
        }
        lazy_assert_zero(r);
    }
    invariant(file_size >= 0);
    invariant(expected_size == file_size);
    // We want to double the size of the file, or add 16MiB, whichever is less.
    // We emulate calling this function repeatedly until it satisfies the request.
    int64_t to_write = 0;
    if (file_size == 0) {
        // Prevent infinite loop by starting with stripe_width as a base case.
        to_write = stripe_width;
    }
    while (file_size + to_write < size) {
        // Grow by min(current size, 16MiB), rounded up to the stripe width.
        to_write += alignup64(min64(file_size + to_write, FILE_CHANGE_INCREMENT), stripe_width);
    }
    if (to_write > 0) {
        // Write zeros with a 512-byte-aligned buffer so O_DIRECT pwrite works.
        assert(to_write%512==0);
        toku::scoped_malloc_aligned wbuf_aligned(to_write, 512);
        char *wbuf = reinterpret_cast<char *>(wbuf_aligned.get());
        memset(wbuf, 0, to_write);
        toku_off_t start_write = alignup64(file_size, stripe_width);
        invariant(start_write >= file_size);
        toku_os_full_pwrite(fd, wbuf, to_write, start_write);
        *new_size = start_write + to_write;
    }
    else {
        *new_size = file_size;
    }
}
193 | |
194 | // Don't include the sub_block header |
195 | // Overhead calculated in same order fields are written to wbuf |
// Fixed per-node header overhead, not including the sub_block header.
// Overhead calculated in same order fields are written to wbuf.
// NOTE: the enumerator name had been dropped from this declaration (a syntax
// error); restored to node_header_overhead, the conventional name for this
// constant in the serialization code.
enum {
    node_header_overhead = (8+ // magic "tokunode" or "tokuleaf" or "tokuroll"
                            4+ // layout_version
                            4+ // layout_version_original
                            4), // build_id
};
202 | |
203 | // uncompressed header offsets |
// Byte offsets of fields within the uncompressed node header on disk.
enum {
    uncompressed_magic_offset = 0,   // 8-byte magic string
    uncompressed_version_offset = 8, // 4-byte layout_version
};
208 | |
209 | static uint32_t |
210 | (FTNODE node) { |
211 | uint32_t retval = 0; |
212 | retval += 8; // magic |
213 | retval += sizeof(node->layout_version); |
214 | retval += sizeof(node->layout_version_original); |
215 | retval += 4; // BUILD_ID |
216 | retval += 4; // n_children |
217 | retval += node->n_children*8; // encode start offset and length of each partition |
218 | retval += 4; // checksum |
219 | return retval; |
220 | } |
221 | |
222 | static void |
223 | (FTNODE node, FTNODE_DISK_DATA ndd, struct wbuf *wbuf) { |
224 | if (node->height == 0) |
225 | wbuf_nocrc_literal_bytes(wbuf, "tokuleaf" , 8); |
226 | else |
227 | wbuf_nocrc_literal_bytes(wbuf, "tokunode" , 8); |
228 | paranoid_invariant(node->layout_version == FT_LAYOUT_VERSION); |
229 | wbuf_nocrc_int(wbuf, node->layout_version); |
230 | wbuf_nocrc_int(wbuf, node->layout_version_original); |
231 | wbuf_nocrc_uint(wbuf, BUILD_ID); |
232 | wbuf_nocrc_int (wbuf, node->n_children); |
233 | for (int i=0; i<node->n_children; i++) { |
234 | assert(BP_SIZE(ndd,i)>0); |
235 | wbuf_nocrc_int(wbuf, BP_START(ndd, i)); // save the beginning of the partition |
236 | wbuf_nocrc_int(wbuf, BP_SIZE (ndd, i)); // and the size |
237 | } |
238 | // checksum the header |
239 | uint32_t end_to_end_checksum = toku_x1764_memory(wbuf->buf, wbuf_get_woffset(wbuf)); |
240 | wbuf_nocrc_int(wbuf, end_to_end_checksum); |
241 | invariant(wbuf->ndone == wbuf->size); |
242 | } |
243 | |
// Returns the uncompressed serialized size of partition i of node.
// For internal nodes this is the message buffer plus the three offset trees;
// for leaves it is the basement node's key/value data.
// Requires: partition i is PT_AVAIL (fully in memory).
static uint32_t
serialize_ftnode_partition_size (FTNODE node, int i)
{
    uint32_t result = 0;
    paranoid_invariant(node->bp[i].state == PT_AVAIL);
    result++; // Byte that states what the partition is
    if (node->height > 0) {
        NONLEAF_CHILDINFO bnc = BNC(node, i);
        // number of messages (4 bytes) plus size of the buffer
        result += (4 + toku_bnc_nbytesinbuf(bnc));
        // number of offsets (4 bytes) plus an array of 4 byte offsets, for each message tree
        result += (4 + (4 * bnc->fresh_message_tree.size()));
        result += (4 + (4 * bnc->stale_message_tree.size()));
        result += (4 + (4 * bnc->broadcast_list.size()));
    }
    else {
        result += 4 + bn_data::HEADER_LENGTH; // n_entries in buffer table + basement header
        result += BLB_NBYTESINDATA(node, i);
    }
    result += 4; // checksum
    return result;
}
266 | |
// Type tag written as the first byte of each serialized partition:
// leaf basement data vs. internal-node message buffer.
#define FTNODE_PARTITION_DMT_LEAVES 0xaa
#define FTNODE_PARTITION_MSG_BUFFER 0xbb
269 | |
// Iteration callback: asserts the message at offset is marked fresh in msg_buffer.
// Used only by bnc_verify_message_trees (paranoid builds).
UU() static int
assert_fresh(const int32_t &offset, const uint32_t UU(idx), message_buffer *const msg_buffer) {
    bool is_fresh = msg_buffer->get_freshness(offset);
    assert(is_fresh);
    return 0;
}
276 | |
// Iteration callback: asserts the message at offset is NOT marked fresh in
// msg_buffer. Used only by bnc_verify_message_trees (paranoid builds).
UU() static int
assert_stale(const int32_t &offset, const uint32_t UU(idx), message_buffer *const msg_buffer) {
    bool is_fresh = msg_buffer->get_freshness(offset);
    assert(!is_fresh);
    return 0;
}
283 | |
// Debug-only sanity check: every offset in the fresh tree must be fresh and
// every offset in the stale tree must be stale. No-op unless TOKU_DEBUG_PARANOID.
static void bnc_verify_message_trees(NONLEAF_CHILDINFO UU(bnc)) {
#ifdef TOKU_DEBUG_PARANOID
    bnc->fresh_message_tree.iterate<message_buffer, assert_fresh>(&bnc->msg_buffer);
    bnc->stale_message_tree.iterate<message_buffer, assert_stale>(&bnc->msg_buffer);
#endif
}
290 | |
// Iteration callback: append one message-tree offset to the write buffer.
static int
wbuf_write_offset(const int32_t &offset, const uint32_t UU(idx), struct wbuf *const wb) {
    wbuf_nocrc_int(wb, offset);
    return 0;
}
296 | |
// Serialize an internal node's child buffer into wb: a one-byte type tag,
// the message buffer itself, then the fresh/stale/broadcast offset arrays
// (each preceded by its count).
static void serialize_child_buffer(NONLEAF_CHILDINFO bnc, struct wbuf *wb) {
    unsigned char ch = FTNODE_PARTITION_MSG_BUFFER;
    wbuf_nocrc_char(wb, ch);

    // serialize the message buffer
    bnc->msg_buffer.serialize_to_wbuf(wb);

    // serialize the message trees (num entries, offsets array):
    // first, verify their contents are consistent with the message buffer
    bnc_verify_message_trees(bnc);

    // fresh
    wbuf_nocrc_int(wb, bnc->fresh_message_tree.size());
    bnc->fresh_message_tree.iterate<struct wbuf, wbuf_write_offset>(wb);

    // stale
    wbuf_nocrc_int(wb, bnc->stale_message_tree.size());
    bnc->stale_message_tree.iterate<struct wbuf, wbuf_write_offset>(wb);

    // broadcast
    wbuf_nocrc_int(wb, bnc->broadcast_list.size());
    bnc->broadcast_list.iterate<struct wbuf, wbuf_write_offset>(wb);
}
320 | |
321 | // |
322 | // Serialize the i'th partition of node into sb |
323 | // For leaf nodes, this would be the i'th basement node |
324 | // For internal nodes, this would be the i'th internal node |
325 | // |
// Serialize the i'th partition of node into sb->uncompressed_ptr and append
// an x1764 checksum. For leaves this is the i'th basement node; for internal
// nodes, the i'th child buffer. The buffer must already be allocated and
// exactly serialize_ftnode_partition_size(node, i) bytes.
static void
serialize_ftnode_partition(FTNODE node, int i, struct sub_block *sb) {
    // Caller should have allocated memory.
    invariant_notnull(sb->uncompressed_ptr);
    invariant(sb->uncompressed_size > 0);
    paranoid_invariant(sb->uncompressed_size == serialize_ftnode_partition_size(node, i));

    //
    // Now put the data into sb->uncompressed_ptr
    //
    struct wbuf wb;
    wbuf_init(&wb, sb->uncompressed_ptr, sb->uncompressed_size);
    if (node->height > 0) {
        // TODO: (Zardosht) possibly exit early if there are no messages
        serialize_child_buffer(BNC(node, i), &wb);
    }
    else {
        unsigned char ch = FTNODE_PARTITION_DMT_LEAVES;
        bn_data* bd = BLB_DATA(node, i);

        wbuf_nocrc_char(&wb, ch);
        wbuf_nocrc_uint(&wb, bd->num_klpairs());

        bd->serialize_to_wbuf(&wb);
    }
    // Checksum everything written so far, then append it.
    uint32_t end_to_end_checksum = toku_x1764_memory(sb->uncompressed_ptr, wbuf_get_woffset(&wb));
    wbuf_nocrc_int(&wb, end_to_end_checksum);
    invariant(wb.ndone == wb.size);
    invariant(sb->uncompressed_size==wb.ndone);
}
356 | |
357 | // |
358 | // Takes the data in sb->uncompressed_ptr, and compresses it |
359 | // into a newly allocated buffer sb->compressed_ptr |
360 | // |
361 | static void |
362 | compress_ftnode_sub_block(struct sub_block *sb, enum toku_compression_method method) { |
363 | invariant(sb->compressed_ptr != nullptr); |
364 | invariant(sb->compressed_size_bound > 0); |
365 | paranoid_invariant(sb->compressed_size_bound == toku_compress_bound(method, sb->uncompressed_size)); |
366 | |
367 | // |
368 | // This probably seems a bit complicated. Here is what is going on. |
369 | // In PerconaFT 5.0, sub_blocks were compressed and the compressed data |
370 | // was checksummed. The checksum did NOT include the size of the compressed data |
371 | // and the size of the uncompressed data. The fields of sub_block only reference the |
372 | // compressed data, and it is the responsibility of the user of the sub_block |
373 | // to write the length |
374 | // |
375 | // For Dr. No, we want the checksum to also include the size of the compressed data, and the |
376 | // size of the decompressed data, because this data |
377 | // may be read off of disk alone, so it must be verifiable alone. |
378 | // |
379 | // So, we pass in a buffer to compress_nocrc_sub_block that starts 8 bytes after the beginning |
380 | // of sb->compressed_ptr, so we have space to put in the sizes, and then run the checksum. |
381 | // |
382 | sb->compressed_size = compress_nocrc_sub_block( |
383 | sb, |
384 | (char *)sb->compressed_ptr + 8, |
385 | sb->compressed_size_bound, |
386 | method |
387 | ); |
388 | |
389 | uint32_t* = (uint32_t *)(sb->compressed_ptr); |
390 | // store the compressed and uncompressed size at the beginning |
391 | extra[0] = toku_htod32(sb->compressed_size); |
392 | extra[1] = toku_htod32(sb->uncompressed_size); |
393 | // now checksum the entire thing |
394 | sb->compressed_size += 8; // now add the eight bytes that we saved for the sizes |
395 | sb->xsum = toku_x1764_memory(sb->compressed_ptr,sb->compressed_size); |
396 | |
397 | // |
398 | // This is the end result for Dr. No and forward. For ftnodes, sb->compressed_ptr contains |
399 | // two integers at the beginning, the size and uncompressed size, and then the compressed |
400 | // data. sb->xsum contains the checksum of this entire thing. |
401 | // |
402 | // In PerconaFT 5.0, sb->compressed_ptr only contained the compressed data, sb->xsum |
403 | // checksummed only the compressed data, and the checksumming of the sizes were not |
404 | // done here. |
405 | // |
406 | } |
407 | |
408 | // |
409 | // Returns the size needed to serialize the ftnode info |
410 | // Does not include header information that is common with rollback logs |
411 | // such as the magic, layout_version, and build_id |
412 | // Includes only node specific info such as pivot information, n_children, and so on |
413 | // |
//
// Returns the size needed to serialize the ftnode info
// Does not include header information that is common with rollback logs
// such as the magic, layout_version, and build_id
// Includes only node specific info such as pivot information, n_children, and so on
//
static uint32_t
serialize_ftnode_info_size(FTNODE node)
{
    uint32_t retval = 0;
    retval += 8; // max_msn_applied_to_node_on_disk
    retval += 4; // nodesize
    retval += 4; // flags
    retval += 4; // height;
    retval += 8; // oldest_referenced_xid_known
    retval += node->pivotkeys.serialized_size();
    retval += (node->n_children-1)*4; // encode length of each pivot
    if (node->height > 0) {
        retval += node->n_children*8; // child blocknum's
    }
    retval += 4; // checksum
    return retval;
}
431 | |
// Serialize the node-specific info (msn, flags, height, pivots, child
// blocknums) into sb->uncompressed_ptr and append an x1764 checksum.
// The buffer must be pre-allocated to exactly serialize_ftnode_info_size(node).
static void serialize_ftnode_info(FTNODE node, SUB_BLOCK sb) {
    // Memory must have been allocated by our caller.
    invariant(sb->uncompressed_size > 0);
    invariant_notnull(sb->uncompressed_ptr);
    paranoid_invariant(sb->uncompressed_size == serialize_ftnode_info_size(node));

    struct wbuf wb;
    wbuf_init(&wb, sb->uncompressed_ptr, sb->uncompressed_size);

    wbuf_MSN(&wb, node->max_msn_applied_to_node_on_disk);
    wbuf_nocrc_uint(&wb, 0); // write a dummy value for where node->nodesize used to be
    wbuf_nocrc_uint(&wb, node->flags);
    wbuf_nocrc_int (&wb, node->height);
    wbuf_TXNID(&wb, node->oldest_referenced_xid_known);
    node->pivotkeys.serialize_to_wbuf(&wb);

    // child blocks, only for internal nodes
    if (node->height > 0) {
        for (int i = 0; i < node->n_children; i++) {
            wbuf_nocrc_BLOCKNUM(&wb, BP_BLOCKNUM(node,i));
        }
    }

    // checksum everything written above and append it
    uint32_t end_to_end_checksum = toku_x1764_memory(sb->uncompressed_ptr, wbuf_get_woffset(&wb));
    wbuf_nocrc_int(&wb, end_to_end_checksum);
    invariant(wb.ndone == wb.size);
    invariant(sb->uncompressed_size==wb.ndone);
}
460 | |
461 | // This is the size of the uncompressed data, not including the compression headers |
// This is the size of the uncompressed data, not including the compression headers
// (header + node info + every partition). Requires the whole node in memory.
unsigned int
toku_serialize_ftnode_size (FTNODE node) {
    unsigned int result = 0;
    //
    // As of now, this seems to be called if and only if the entire node is supposed
    // to be in memory, so we will assert it.
    //
    toku_ftnode_assert_fully_in_memory(node);
    result += serialize_node_header_size(node);
    result += serialize_ftnode_info_size(node);
    for (int i = 0; i < node->n_children; i++) {
        result += serialize_ftnode_partition_size(node,i);
    }
    return result;
}
477 | |
// Accumulators for time spent serializing vs. compressing, reported to
// toku_ft_status_update_serialize_times.
struct serialize_times {
    tokutime_t serialize_time;
    tokutime_t compress_time;
};
482 | |
// Serialize then compress one partition of node into sb, accumulating the
// elapsed time of each phase into *st.
static void
serialize_and_compress_partition(FTNODE node,
                                 int childnum,
                                 enum toku_compression_method compression_method,
                                 SUB_BLOCK sb,
                                 struct serialize_times *st)
{
    // serialize, compress, update status
    tokutime_t t0 = toku_time_now();
    serialize_ftnode_partition(node, childnum, sb);
    tokutime_t t1 = toku_time_now();
    compress_ftnode_sub_block(sb, compression_method);
    tokutime_t t2 = toku_time_now();

    st->serialize_time += t1 - t0;
    st->compress_time += t2 - t1;
}
500 | |
// Build a compressed copy of one in-memory partition into sb, without the
// 8-byte size prefix or checksum (used for in-memory compressed partitions,
// not for writing to disk). On return sb->uncompressed_ptr is NULL: the
// uncompressed scratch buffer is scope-owned and freed here.
void
toku_create_compressed_partition_from_available(
    FTNODE node,
    int childnum,
    enum toku_compression_method compression_method,
    SUB_BLOCK sb
    )
{
    tokutime_t t0 = toku_time_now();

    // serialize
    sb->uncompressed_size = serialize_ftnode_partition_size(node, childnum);
    toku::scoped_malloc uncompressed_buf(sb->uncompressed_size);
    sb->uncompressed_ptr = uncompressed_buf.get();
    serialize_ftnode_partition(node, childnum, sb);

    tokutime_t t1 = toku_time_now();

    // compress. no need to pad with extra bytes for sizes/xsum - we're not storing them
    set_compressed_size_bound(sb, compression_method);
    sb->compressed_ptr = toku_xmalloc(sb->compressed_size_bound);
    sb->compressed_size = compress_nocrc_sub_block(
        sb,
        sb->compressed_ptr,
        sb->compressed_size_bound,
        compression_method
        );
    // uncompressed_buf goes out of scope below; don't leave a dangling pointer
    sb->uncompressed_ptr = NULL;

    tokutime_t t2 = toku_time_now();

    toku_ft_status_update_serialize_times(node, t1 - t0, t2 - t1);
}
534 | |
// Serialize+compress all npartitions of node on the calling thread, one at a
// time, into sb[0..npartitions-1], accumulating times into *st.
static void
serialize_and_compress_serially(FTNODE node,
                                int npartitions,
                                enum toku_compression_method compression_method,
                                struct sub_block sb[],
                                struct serialize_times *st) {
    for (int i = 0; i < npartitions; i++) {
        serialize_and_compress_partition(node, i, compression_method, &sb[i], st);
    }
}
545 | |
// One work-queue item: serialize+compress partition i of node into sb[i].
// Each item carries its own serialize_times so workers accumulate without
// contention; totals are summed by the dispatcher afterwards.
struct serialize_compress_work {
    struct work base;   // intrusive workset linkage (must be first)
    FTNODE node;
    int i;              // partition index this item covers
    enum toku_compression_method compression_method;
    struct sub_block *sb; // shared array; this item writes only sb[i]
    struct serialize_times st;
};
554 | |
// Thread-pool worker: drain work items from the workset until it is empty,
// then drop this worker's reference on the set.
static void *
serialize_and_compress_worker(void *arg) {
    struct workset *ws = (struct workset *) arg;
    while (1) {
        struct serialize_compress_work *w = (struct serialize_compress_work *) workset_get(ws);
        if (w == NULL)
            break;
        int i = w->i;
        serialize_and_compress_partition(w->node, i, w->compression_method, &w->sb[i], &w->st);
    }
    workset_release_ref(ws);
    return arg;
}
568 | |
// Serialize+compress all partitions using the shared thread pool. The calling
// thread also acts as a worker (hence T = min(cores, npartitions) - 1 extra
// pool threads). Per-item times are summed into *st at the end.
static void
serialize_and_compress_in_parallel(FTNODE node,
                                   int npartitions,
                                   enum toku_compression_method compression_method,
                                   struct sub_block sb[],
                                   struct serialize_times *st) {
    if (npartitions == 1) {
        // No benefit to fanning out a single partition.
        serialize_and_compress_partition(node, 0, compression_method, &sb[0], st);
    } else {
        int T = num_cores;
        if (T > npartitions)
            T = npartitions;
        if (T > 0)
            T = T - 1; // this thread participates, so request one fewer pool thread
        struct workset ws;
        ZERO_STRUCT(ws);
        workset_init(&ws);
        struct serialize_compress_work work[npartitions];
        workset_lock(&ws);
        for (int i = 0; i < npartitions; i++) {
            work[i] = (struct serialize_compress_work) { .base = {{NULL, NULL}},
                                                         .node = node,
                                                         .i = i,
                                                         .compression_method = compression_method,
                                                         .sb = sb,
                                                         .st = { .serialize_time = 0, .compress_time = 0} };
            workset_put_locked(&ws, &work[i].base);
        }
        workset_unlock(&ws);
        toku_thread_pool_run(ft_pool, 0, &T, serialize_and_compress_worker, &ws);
        workset_add_ref(&ws, T);
        serialize_and_compress_worker(&ws);
        workset_join(&ws);
        workset_destroy(&ws);

        // gather up the statistics from each thread's work item
        for (int i = 0; i < npartitions; i++) {
            st->serialize_time += work[i].st.serialize_time;
            st->compress_time += work[i].st.compress_time;
        }
    }
}
611 | |
// Serialize then compress the node-info sub-block into sb, accumulating the
// elapsed time of each phase into *st.
static void
serialize_and_compress_sb_node_info(FTNODE node, struct sub_block *sb,
        enum toku_compression_method compression_method, struct serialize_times *st) {
    // serialize, compress, update serialize times.
    tokutime_t t0 = toku_time_now();
    serialize_ftnode_info(node, sb);
    tokutime_t t1 = toku_time_now();
    compress_ftnode_sub_block(sb, compression_method);
    tokutime_t t2 = toku_time_now();

    st->serialize_time += t1 - t0;
    st->compress_time += t2 - t1;
}
625 | |
int toku_serialize_ftnode_to_memory(FTNODE node,
                                    FTNODE_DISK_DATA* ndd,
                                    unsigned int basementnodesize,
                                    enum toku_compression_method compression_method,
                                    bool do_rebalancing,
                                    bool in_parallel, // for loader is true, for toku_ftnode_flush_callback, is false
                            /*out*/ size_t *n_bytes_to_write,
                            /*out*/ size_t *n_uncompressed_bytes,
                            /*out*/ char  **bytes_to_write)
// Effect: Writes out each child to a separate malloc'd buffer, then compresses
//   all of them, and writes the uncompressed header, to bytes_to_write,
//   which is malloc'd.
//   The caller owns *bytes_to_write and *ndd and must free them.
//
//   The resulting buffer is guaranteed to be 512-byte aligned and the total length is a multiple of 512 (so we pad with zeros at the end if needed).
//   512-byte padding is for O_DIRECT to work.
{
    toku_ftnode_assert_fully_in_memory(node);

    if (do_rebalancing && node->height == 0) {
        toku_ftnode_leaf_rebalance(node, basementnodesize);
    }
    const int npartitions = node->n_children;

    // Each partition represents a compressed sub block
    // For internal nodes, a sub block is a message buffer
    // For leaf nodes, a sub block is a basement node
    toku::scoped_calloc sb_buf(sizeof(struct sub_block) * npartitions);
    struct sub_block *sb = reinterpret_cast<struct sub_block *>(sb_buf.get());
    XREALLOC_N(npartitions, *ndd);

    //
    // First, let's serialize and compress the individual sub blocks
    //

    // determine how large our serialization and compression buffers need to be.
    size_t serialize_buf_size = 0, compression_buf_size = 0;
    for (int i = 0; i < node->n_children; i++) {
        sb[i].uncompressed_size = serialize_ftnode_partition_size(node, i);
        sb[i].compressed_size_bound = toku_compress_bound(compression_method, sb[i].uncompressed_size);
        serialize_buf_size += sb[i].uncompressed_size;
        compression_buf_size += sb[i].compressed_size_bound + 8; // add 8 extra bytes, 4 for compressed size, 4 for decompressed size
    }

    // give each sub block a base pointer to enough buffer space for serialization and compression
    // (single allocations carved into per-partition slices)
    toku::scoped_malloc serialize_buf(serialize_buf_size);
    toku::scoped_malloc compression_buf(compression_buf_size);
    for (size_t i = 0, uncompressed_offset = 0, compressed_offset = 0; i < (size_t) node->n_children; i++) {
        sb[i].uncompressed_ptr = reinterpret_cast<char *>(serialize_buf.get()) + uncompressed_offset;
        sb[i].compressed_ptr = reinterpret_cast<char *>(compression_buf.get()) + compressed_offset;
        uncompressed_offset += sb[i].uncompressed_size;
        compressed_offset += sb[i].compressed_size_bound + 8; // add 8 extra bytes, 4 for compressed size, 4 for decompressed size
        invariant(uncompressed_offset <= serialize_buf_size);
        invariant(compressed_offset <= compression_buf_size);
    }

    // do the actual serialization now that we have buffer space
    struct serialize_times st = { 0, 0 };
    if (in_parallel) {
        serialize_and_compress_in_parallel(node, npartitions, compression_method, sb, &st);
    } else {
        serialize_and_compress_serially(node, npartitions, compression_method, sb, &st);
    }

    //
    // Now lets create a sub-block that has the common node information,
    // This does NOT include the header
    //

    // determine how large our serialization and copmression buffers need to be
    struct sub_block sb_node_info;
    sub_block_init(&sb_node_info);
    size_t sb_node_info_uncompressed_size = serialize_ftnode_info_size(node);
    size_t sb_node_info_compressed_size_bound = toku_compress_bound(compression_method, sb_node_info_uncompressed_size);
    toku::scoped_malloc sb_node_info_uncompressed_buf(sb_node_info_uncompressed_size);
    toku::scoped_malloc sb_node_info_compressed_buf(sb_node_info_compressed_size_bound + 8); // add 8 extra bytes, 4 for compressed size, 4 for decompressed size
    sb_node_info.uncompressed_size = sb_node_info_uncompressed_size;
    sb_node_info.uncompressed_ptr = sb_node_info_uncompressed_buf.get();
    sb_node_info.compressed_size_bound = sb_node_info_compressed_size_bound;
    sb_node_info.compressed_ptr = sb_node_info_compressed_buf.get();

    // do the actual serialization now that we have buffer space
    serialize_and_compress_sb_node_info(node, &sb_node_info, compression_method, &st);

    //
    // At this point, we have compressed each of our pieces into individual sub_blocks,
    // we can put the header and all the subblocks into a single buffer and return it.
    //

    // update the serialize times, ignore the header for simplicity. we captured all
    // of the partitions' serialize times so that's probably good enough.
    toku_ft_status_update_serialize_times(node, st.serialize_time, st.compress_time);

    // The total size of the node is:
    // size of header + disk size of the n+1 sub_block's created above
    uint32_t total_node_size = (serialize_node_header_size(node) // uncompressed header
                                 + sb_node_info.compressed_size   // compressed nodeinfo (without its checksum)
                                 + 4);                            // nodeinfo's checksum
    uint32_t total_uncompressed_size = (serialize_node_header_size(node) // uncompressed header
                                 + sb_node_info.uncompressed_size   // uncompressed nodeinfo (without its checksum)
                                 + 4);                            // nodeinfo's checksum
    // store the BP_SIZESs
    // (record each partition's disk offset/length so the header can locate them)
    for (int i = 0; i < node->n_children; i++) {
        uint32_t len         = sb[i].compressed_size + 4; // data and checksum
        BP_SIZE (*ndd,i) = len;
        BP_START(*ndd,i) = total_node_size;
        total_node_size += sb[i].compressed_size + 4;
        total_uncompressed_size += sb[i].uncompressed_size + 4;
    }

    // now create the final serialized node
    uint32_t total_buffer_size = roundup_to_multiple(512, total_node_size); // make the buffer be 512 bytes.
    char *XMALLOC_N_ALIGNED(512, total_buffer_size, data);
    char *curr_ptr = data;

    // write the header
    struct wbuf wb;
    wbuf_init(&wb, curr_ptr, serialize_node_header_size(node));
    serialize_node_header(node, *ndd, &wb);
    assert(wb.ndone == wb.size);
    curr_ptr += serialize_node_header_size(node);

    // now write sb_node_info
    memcpy(curr_ptr, sb_node_info.compressed_ptr, sb_node_info.compressed_size);
    curr_ptr += sb_node_info.compressed_size;
    // write the checksum
    *(uint32_t *)curr_ptr = toku_htod32(sb_node_info.xsum);
    curr_ptr += sizeof(sb_node_info.xsum);

    // then each partition's compressed bytes followed by its checksum
    for (int i = 0; i < npartitions; i++) {
        memcpy(curr_ptr, sb[i].compressed_ptr, sb[i].compressed_size);
        curr_ptr += sb[i].compressed_size;
        // write the checksum
        *(uint32_t *)curr_ptr = toku_htod32(sb[i].xsum);
        curr_ptr += sizeof(sb[i].xsum);
    }
    // Zero the rest of the buffer (pad to the 512-byte boundary for O_DIRECT)
    memset(data + total_node_size, 0, total_buffer_size - total_node_size);

    assert((uint32_t) (curr_ptr - data) == total_node_size);
    *bytes_to_write = data;
    *n_bytes_to_write = total_buffer_size;
    *n_uncompressed_bytes = total_uncompressed_size;

    invariant(*n_bytes_to_write % 512 == 0);
    invariant(reinterpret_cast<unsigned long long>(*bytes_to_write) % 512 == 0);
    return 0;
}
773 | |
// Serialize and compress `node`, allocate space for it in the block table,
// and write it out to `fd` at the chosen offset.
// Side effects: dirties the ft (block table realloc), clears the node's dirty
// bit, and resets per-basement logical-row deltas for in-memory leaf
// partitions. Returns 0 on success or the error from serialization.
int toku_serialize_ftnode_to(int fd,
                             BLOCKNUM blocknum,
                             FTNODE node,
                             FTNODE_DISK_DATA *ndd,
                             bool do_rebalancing,
                             FT ft,
                             bool for_checkpoint) {
    size_t n_to_write;
    size_t n_uncompressed_bytes;
    char *compressed_buf = nullptr;

    // because toku_serialize_ftnode_to is only called for
    // in toku_ftnode_flush_callback, we pass false
    // for in_parallel. The reasoning is that when we write
    // nodes to disk via toku_ftnode_flush_callback, we
    // assume that it is being done on a non-critical
    // background thread (probably for checkpointing), and therefore
    // should not hog CPU,
    //
    // Should the above facts change, we may want to revisit
    // passing false for in_parallel here
    //
    // alternatively, we could have made in_parallel a parameter
    // for toku_serialize_ftnode_to, but instead we did this.
    int r = toku_serialize_ftnode_to_memory(
        node,
        ndd,
        ft->h->basementnodesize,
        ft->h->compression_method,
        do_rebalancing,
        toku_unsafe_fetch(&toku_serialize_in_parallel),
        &n_to_write,
        &n_uncompressed_bytes,
        &compressed_buf);
    if (r != 0) {
        return r;
    }

    // If the node has never been written, then write the whole buffer,
    // including the zeros
    invariant(blocknum.b >= 0);
    DISKOFF offset;

    // Dirties the ft
    ft->blocktable.realloc_on_disk(
        blocknum, n_to_write, &offset, ft, fd, for_checkpoint);

    // Time only the raw pwrite so the flush-reason stats reflect I/O cost.
    tokutime_t t0 = toku_time_now();
    toku_os_full_pwrite(fd, compressed_buf, n_to_write, offset);
    tokutime_t t1 = toku_time_now();

    tokutime_t io_time = t1 - t0;
    toku_ft_status_update_flush_reason(
        node, n_uncompressed_bytes, n_to_write, io_time, for_checkpoint);

    toku_free(compressed_buf);
    node->dirty = 0;  // See #1957. Must set the node to be clean after
                      // serializing it so that it doesn't get written again on
                      // the next checkpoint or eviction.
    if (node->height == 0) {
        // Leaf node: the logical-row deltas of in-memory basements were folded
        // into the serialized image, so zero them out.
        for (int i = 0; i < node->n_children; i++) {
            if (BP_STATE(node, i) == PT_AVAIL) {
                BLB_LRD(node, i) = 0;
            }
        }
    }
    return 0;
}
842 | |
843 | static void |
844 | sort_and_steal_offset_arrays(NONLEAF_CHILDINFO bnc, |
845 | const toku::comparator &cmp, |
846 | int32_t **fresh_offsets, int32_t nfresh, |
847 | int32_t **stale_offsets, int32_t nstale, |
848 | int32_t **broadcast_offsets, int32_t nbroadcast) { |
849 | // We always have fresh / broadcast offsets (even if they are empty) |
850 | // but we may not have stale offsets, in the case of v13 upgrade. |
851 | invariant(fresh_offsets != nullptr); |
852 | invariant(broadcast_offsets != nullptr); |
853 | invariant(cmp.valid()); |
854 | |
855 | typedef toku::sort<int32_t, const struct toku_msg_buffer_key_msn_cmp_extra, toku_msg_buffer_key_msn_cmp> msn_sort; |
856 | |
857 | const int32_t n_in_this_buffer = nfresh + nstale + nbroadcast; |
858 | struct toku_msg_buffer_key_msn_cmp_extra (cmp, &bnc->msg_buffer); |
859 | msn_sort::mergesort_r(*fresh_offsets, nfresh, extra); |
860 | bnc->fresh_message_tree.destroy(); |
861 | bnc->fresh_message_tree.create_steal_sorted_array(fresh_offsets, nfresh, n_in_this_buffer); |
862 | if (stale_offsets) { |
863 | msn_sort::mergesort_r(*stale_offsets, nstale, extra); |
864 | bnc->stale_message_tree.destroy(); |
865 | bnc->stale_message_tree.create_steal_sorted_array(stale_offsets, nstale, n_in_this_buffer); |
866 | } |
867 | bnc->broadcast_list.destroy(); |
868 | bnc->broadcast_list.create_steal_sorted_array(broadcast_offsets, nbroadcast, n_in_this_buffer); |
869 | } |
870 | |
// Deserialize a nonleaf partition's message buffer from a layout-version-13
// node, rebuilding the fresh/broadcast message trees (v13 has no stale tree).
// Returns the highest MSN found in this buffer so the caller can fix up the
// node's max MSN after upgrade.
static MSN
deserialize_child_buffer_v13(FT ft, NONLEAF_CHILDINFO bnc, struct rbuf *rb) {
    // We skip 'stale' offsets for upgraded nodes.
    int32_t nfresh = 0, nbroadcast = 0;
    int32_t *fresh_offsets = nullptr, *broadcast_offsets = nullptr;

    // Only sort buffers if we have a valid comparison function. In certain scenarios,
    // like deserialize_ft_versioned() or tokuftdump, we'll need to deserialize ftnodes
    // for simple inspection and don't actually require that the message buffers are
    // properly sorted. This is very ugly, but correct.
    const bool sort = ft->cmp.valid();

    // Passing nullptr for an offsets argument tells the msg_buffer not to
    // collect offsets for that tree at all.
    MSN highest_msn_in_this_buffer =
        bnc->msg_buffer.deserialize_from_rbuf_v13(rb, &ft->h->highest_unused_msn_for_upgrade,
                                                  sort ? &fresh_offsets : nullptr, &nfresh,
                                                  sort ? &broadcast_offsets : nullptr, &nbroadcast);

    if (sort) {
        // Ownership of the offset arrays is transferred to the bnc's trees.
        sort_and_steal_offset_arrays(bnc, ft->cmp,
                                     &fresh_offsets, nfresh,
                                     nullptr, 0,  // no stale offsets
                                     &broadcast_offsets, nbroadcast);
    }

    return highest_msn_in_this_buffer;
}
897 | |
898 | static void |
899 | deserialize_child_buffer_v26(NONLEAF_CHILDINFO bnc, struct rbuf *rb, const toku::comparator &cmp) { |
900 | int32_t nfresh = 0, nstale = 0, nbroadcast = 0; |
901 | int32_t *fresh_offsets, *stale_offsets, *broadcast_offsets; |
902 | |
903 | // Only sort buffers if we have a valid comparison function. In certain scenarios, |
904 | // like deserialie_ft_versioned() or tokuftdump, we'll need to deserialize ftnodes |
905 | // for simple inspection and don't actually require that the message buffers are |
906 | // properly sorted. This is very ugly, but correct. |
907 | const bool sort = cmp.valid(); |
908 | |
909 | // read in the message buffer |
910 | bnc->msg_buffer.deserialize_from_rbuf(rb, |
911 | sort ? &fresh_offsets : nullptr, &nfresh, |
912 | sort ? &stale_offsets : nullptr, &nstale, |
913 | sort ? &broadcast_offsets : nullptr, &nbroadcast); |
914 | |
915 | if (sort) { |
916 | sort_and_steal_offset_arrays(bnc, cmp, |
917 | &fresh_offsets, nfresh, |
918 | &stale_offsets, nstale, |
919 | &broadcast_offsets, nbroadcast); |
920 | } |
921 | } |
922 | |
// Deserialize a nonleaf partition's message buffer from a current-layout node
// (> version 26): the fresh/stale/broadcast offset arrays were serialized to
// disk already sorted, so we read them back verbatim and hand them to the
// trees without re-sorting.
static void
deserialize_child_buffer(NONLEAF_CHILDINFO bnc, struct rbuf *rb) {
    // read in the message buffer
    bnc->msg_buffer.deserialize_from_rbuf(rb,
                                          nullptr, nullptr, // fresh_offsets, nfresh,
                                          nullptr, nullptr, // stale_offsets, nstale,
                                          nullptr, nullptr); // broadcast_offsets, nbroadcast

    // read in each message tree (fresh, stale, broadcast)
    int32_t nfresh = rbuf_int(rb);
    int32_t *XMALLOC_N(nfresh, fresh_offsets);
    for (int i = 0; i < nfresh; i++) {
        fresh_offsets[i] = rbuf_int(rb);
    }

    int32_t nstale = rbuf_int(rb);
    int32_t *XMALLOC_N(nstale, stale_offsets);
    for (int i = 0; i < nstale; i++) {
        stale_offsets[i] = rbuf_int(rb);
    }

    int32_t nbroadcast = rbuf_int(rb);
    int32_t *XMALLOC_N(nbroadcast, broadcast_offsets);
    for (int i = 0; i < nbroadcast; i++) {
        broadcast_offsets[i] = rbuf_int(rb);
    }

    // build OMTs out of each offset array; create_steal_sorted_array adopts
    // the malloc'd arrays, so no frees happen here.
    bnc->fresh_message_tree.destroy();
    bnc->fresh_message_tree.create_steal_sorted_array(&fresh_offsets, nfresh, nfresh);
    bnc->stale_message_tree.destroy();
    bnc->stale_message_tree.create_steal_sorted_array(&stale_offsets, nstale, nstale);
    bnc->broadcast_list.destroy();
    bnc->broadcast_list.create_steal_sorted_array(&broadcast_offsets, nbroadcast, nbroadcast);
}
958 | |
// Hex-dump `size` bytes starting at `vp` to stderr, 64 bytes per line, each
// line prefixed with its address. Used for post-mortem diagnosis of corrupt
// blocks. No locking around this for now.
void
dump_bad_block(unsigned char *vp, uint64_t size) {
    const uint64_t linesize = 64;

    // Emit every complete 64-byte line.
    for (uint64_t line = size / linesize; line > 0; line--) {
        fprintf(stderr, "%p: " , vp);
        for (uint64_t j = 0; j < linesize; j++) {
            fprintf(stderr, "%2.2X" , vp[j]);
        }
        fprintf(stderr, "\n" );
        vp += linesize;
    }

    // Emit the trailing partial line, if any.
    size %= linesize;
    for (uint64_t i = 0; i < size; i++) {
        if ((i % linesize) == 0)
            fprintf(stderr, "%p: " , vp + i);
        fprintf(stderr, "%2.2X" , vp[i]);
        if (((i + 1) % linesize) == 0)
            fprintf(stderr, "\n" );
    }
    fprintf(stderr, "\n" );
}
984 | |
985 | //////////////////////////////////////////////////////////////////// |
986 | //////////////////////////////////////////////////////////////////// |
987 | //////////////////////////////////////////////////////////////////// |
988 | //////////////////////////////////////////////////////////////////// |
989 | //////////////////////////////////////////////////////////////////// |
990 | //////////////////////////////////////////////////////////////////// |
991 | //////////////////////////////////////////////////////////////////// |
992 | //////////////////////////////////////////////////////////////////// |
993 | |
994 | BASEMENTNODE toku_create_empty_bn(void) { |
995 | BASEMENTNODE bn = toku_create_empty_bn_no_buffer(); |
996 | bn->data_buffer.initialize_empty(); |
997 | return bn; |
998 | } |
999 | |
1000 | BASEMENTNODE toku_clone_bn(BASEMENTNODE orig_bn) { |
1001 | BASEMENTNODE bn = toku_create_empty_bn_no_buffer(); |
1002 | bn->max_msn_applied = orig_bn->max_msn_applied; |
1003 | bn->seqinsert = orig_bn->seqinsert; |
1004 | bn->stale_ancestor_messages_applied = orig_bn->stale_ancestor_messages_applied; |
1005 | bn->stat64_delta = orig_bn->stat64_delta; |
1006 | bn->logical_rows_delta = orig_bn->logical_rows_delta; |
1007 | bn->data_buffer.clone(&orig_bn->data_buffer); |
1008 | return bn; |
1009 | } |
1010 | |
1011 | BASEMENTNODE toku_create_empty_bn_no_buffer(void) { |
1012 | BASEMENTNODE XMALLOC(bn); |
1013 | bn->max_msn_applied.msn = 0; |
1014 | bn->seqinsert = 0; |
1015 | bn->stale_ancestor_messages_applied = false; |
1016 | bn->stat64_delta = ZEROSTATS; |
1017 | bn->logical_rows_delta = 0; |
1018 | bn->data_buffer.init_zero(); |
1019 | return bn; |
1020 | } |
1021 | |
1022 | NONLEAF_CHILDINFO toku_create_empty_nl(void) { |
1023 | NONLEAF_CHILDINFO XMALLOC(cn); |
1024 | cn->msg_buffer.create(); |
1025 | cn->fresh_message_tree.create_no_array(); |
1026 | cn->stale_message_tree.create_no_array(); |
1027 | cn->broadcast_list.create_no_array(); |
1028 | memset(cn->flow, 0, sizeof cn->flow); |
1029 | return cn; |
1030 | } |
1031 | |
1032 | // must clone the OMTs, since we serialize them along with the message buffer |
1033 | NONLEAF_CHILDINFO toku_clone_nl(NONLEAF_CHILDINFO orig_childinfo) { |
1034 | NONLEAF_CHILDINFO XMALLOC(cn); |
1035 | cn->msg_buffer.clone(&orig_childinfo->msg_buffer); |
1036 | cn->fresh_message_tree.create_no_array(); |
1037 | cn->fresh_message_tree.clone(orig_childinfo->fresh_message_tree); |
1038 | cn->stale_message_tree.create_no_array(); |
1039 | cn->stale_message_tree.clone(orig_childinfo->stale_message_tree); |
1040 | cn->broadcast_list.create_no_array(); |
1041 | cn->broadcast_list.clone(orig_childinfo->broadcast_list); |
1042 | memset(cn->flow, 0, sizeof cn->flow); |
1043 | return cn; |
1044 | } |
1045 | |
// Free a basement node and everything it owns.
void destroy_basement_node (BASEMENTNODE bn)
{
    // release the leafentry buffer before freeing the node struct itself
    bn->data_buffer.destroy();
    toku_free(bn);
}
1051 | |
// Free a nonleaf childinfo and everything it owns: the message buffer and
// all three message trees must be destroyed before the struct is freed.
void destroy_nonleaf_childinfo (NONLEAF_CHILDINFO nl)
{
    nl->msg_buffer.destroy();
    nl->fresh_message_tree.destroy();
    nl->stale_message_tree.destroy();
    nl->broadcast_list.destroy();
    toku_free(nl);
}
1060 | |
// Read the entire on-disk block for `blocknum` into a freshly allocated,
// 512-byte-aligned buffer and initialize `rb` over it. The rbuf's logical
// size is the block's true size; the allocation is rounded up to a 512-byte
// multiple for direct-I/O alignment. The caller owns the buffer in rb->buf.
void read_block_from_fd_into_rbuf(
    int fd,
    BLOCKNUM blocknum,
    FT ft,
    struct rbuf *rb
    )
{
    // get the file offset and block size for the block
    DISKOFF offset, size;
    ft->blocktable.translate_blocknum_to_offset_size(blocknum, &offset, &size);
    DISKOFF size_aligned = roundup_to_multiple(512, size);
    uint8_t *XMALLOC_N_ALIGNED(512, size_aligned, raw_block);
    rbuf_init(rb, raw_block, size);
    // read the block; a read shorter than `size` means the file is corrupt
    // or truncated, so the asserts below treat it as fatal.
    ssize_t rlen = toku_os_pread(fd, raw_block, size_aligned, offset);
    assert((DISKOFF)rlen >= size);
    assert((DISKOFF)rlen <= size_aligned);
}
1079 | |
// BUGFIX: the constant's identifier was missing. This is the heuristic cap on
// how many bytes to read when we only want a node's header (it is referenced
// below when computing the initial read size).
static const int read_header_heuristic_max = 32*1024;
1081 | |
#ifndef MIN
// NOTE: classic function-like macro; it evaluates its arguments more than
// once, so do not pass expressions with side effects.
#define MIN(a,b) (((a)>(b)) ? (b) : (a))
#endif
1085 | |
1086 | // Effect: If the header part of the node is small enough, then read it into the rbuf. The rbuf will be allocated to be big enough in any case. |
1087 | static void (int fd, BLOCKNUM blocknum, |
1088 | FT ft, struct rbuf *rb, |
1089 | ftnode_fetch_extra *bfe) { |
1090 | DISKOFF offset, size; |
1091 | ft->blocktable.translate_blocknum_to_offset_size(blocknum, &offset, &size); |
1092 | DISKOFF read_size = roundup_to_multiple(512, MIN(read_header_heuristic_max, size)); |
1093 | uint8_t *XMALLOC_N_ALIGNED(512, roundup_to_multiple(512, size), raw_block); |
1094 | rbuf_init(rb, raw_block, read_size); |
1095 | |
1096 | // read the block |
1097 | tokutime_t t0 = toku_time_now(); |
1098 | ssize_t rlen = toku_os_pread(fd, raw_block, read_size, offset); |
1099 | tokutime_t t1 = toku_time_now(); |
1100 | |
1101 | assert(rlen >= 0); |
1102 | rbuf_init(rb, raw_block, rlen); |
1103 | |
1104 | bfe->bytes_read = rlen; |
1105 | bfe->io_time = t1 - t0; |
1106 | toku_ft_status_update_pivot_fetch_reason(bfe); |
1107 | } |
1108 | |
1109 | // |
1110 | // read the compressed partition into the sub_block, |
1111 | // validate the checksum of the compressed data |
1112 | // |
1113 | int |
1114 | read_compressed_sub_block(struct rbuf *rb, struct sub_block *sb) |
1115 | { |
1116 | int r = 0; |
1117 | sb->compressed_size = rbuf_int(rb); |
1118 | sb->uncompressed_size = rbuf_int(rb); |
1119 | const void **cp = (const void **) &sb->compressed_ptr; |
1120 | rbuf_literal_bytes(rb, cp, sb->compressed_size); |
1121 | sb->xsum = rbuf_int(rb); |
1122 | // let's check the checksum |
1123 | uint32_t actual_xsum = toku_x1764_memory((char *)sb->compressed_ptr-8, 8+sb->compressed_size); |
1124 | if (sb->xsum != actual_xsum) { |
1125 | r = TOKUDB_BAD_CHECKSUM; |
1126 | } |
1127 | return r; |
1128 | } |
1129 | |
1130 | static int |
1131 | read_and_decompress_sub_block(struct rbuf *rb, struct sub_block *sb) |
1132 | { |
1133 | int r = 0; |
1134 | r = read_compressed_sub_block(rb, sb); |
1135 | if (r != 0) { |
1136 | goto exit; |
1137 | } |
1138 | |
1139 | just_decompress_sub_block(sb); |
1140 | exit: |
1141 | return r; |
1142 | } |
1143 | |
// Allocates space for the sub-block and de-compresses the data from
// the supplied compressed pointer..
// On return, sb->uncompressed_ptr holds sb->uncompressed_size bytes owned by
// the caller.
void
just_decompress_sub_block(struct sub_block *sb)
{
    // <CER> TODO: Add assert that the subblock was read in.
    sb->uncompressed_ptr = toku_xmalloc(sb->uncompressed_size);

    toku_decompress(
        (Bytef *) sb->uncompressed_ptr,
        sb->uncompressed_size,
        (Bytef *) sb->compressed_ptr,
        sb->compressed_size
        );
}
1159 | |
// verify the checksum
// The last 4 bytes of the uncompressed sub block are an x1764 checksum of the
// preceding bytes. Returns 0 on match, TOKUDB_BAD_CHECKSUM (after logging and
// hex-dumping the block) on mismatch. `fname`/`blocknum` are for diagnostics.
int verify_ftnode_sub_block(struct sub_block *sb,
                            const char *fname,
                            BLOCKNUM blocknum) {
    int r = 0;
    // first verify the checksum
    uint32_t data_size = sb->uncompressed_size - 4;  // checksum is 4 bytes at end
    uint32_t stored_xsum = toku_dtoh32(*((uint32_t *)((char *)sb->uncompressed_ptr + data_size)));
    uint32_t actual_xsum = toku_x1764_memory(sb->uncompressed_ptr, data_size);
    if (stored_xsum != actual_xsum) {
        fprintf(
            stderr,
            "%s:%d:verify_ftnode_sub_block - "
            "file[%s], blocknum[%ld], stored_xsum[%u] != actual_xsum[%u]\n" ,
            __FILE__,
            __LINE__,
            fname ? fname : "unknown" ,
            blocknum.b,
            stored_xsum,
            actual_xsum);
        dump_bad_block((Bytef *) sb->uncompressed_ptr, sb->uncompressed_size);
        r = TOKUDB_BAD_CHECKSUM;
    }
    return r;
}
1185 | |
// This function deserializes the data stored by serialize_ftnode_info
// into `node`: max MSN, flags, height, oldest referenced xid (layout >= 22),
// pivot keys, and (for internal nodes) child blocknums. Returns 0 on success
// or TOKUDB_BAD_CHECKSUM; aborts if the verified data is the wrong length.
static int deserialize_ftnode_info(struct sub_block *sb, FTNODE node) {

    // sb_node_info->uncompressed_ptr stores the serialized node information
    // this function puts that information into node

    // first verify the checksum
    int r = 0;
    const char *fname = toku_ftnode_get_cachefile_fname_in_env(node);
    r = verify_ftnode_sub_block(sb, fname, node->blocknum);
    if (r != 0) {
        fprintf(
            stderr,
            "%s:%d:deserialize_ftnode_info - "
            "file[%s], blocknum[%ld], verify_ftnode_sub_block failed with %d\n" ,
            __FILE__,
            __LINE__,
            fname ? fname : "unknown" ,
            node->blocknum.b,
            r);
        dump_bad_block(static_cast<unsigned char *>(sb->uncompressed_ptr),
                       sb->uncompressed_size);
        goto exit;
    }

    uint32_t data_size;
    data_size = sb->uncompressed_size - 4;  // checksum is 4 bytes at end

    // now with the data verified, we can read the information into the node
    struct rbuf rb;
    rbuf_init(&rb, (unsigned char *) sb->uncompressed_ptr, data_size);

    node->max_msn_applied_to_node_on_disk = rbuf_MSN(&rb);
    // skipped 4-byte field written by serialize_ftnode_info — presumably a
    // legacy/unused value; TODO(review): confirm against the serializer.
    (void)rbuf_int(&rb);
    node->flags = rbuf_int(&rb);
    node->height = rbuf_int(&rb);
    if (node->layout_version_read_from_disk < FT_LAYOUT_VERSION_19) {
        (void) rbuf_int(&rb); // optimized_for_upgrade
    }
    if (node->layout_version_read_from_disk >= FT_LAYOUT_VERSION_22) {
        rbuf_TXNID(&rb, &node->oldest_referenced_xid_known);
    }

    // now create the basement nodes or childinfos, depending on whether this is a
    // leaf node or internal node
    // now the subtree_estimates

    // n_children is now in the header, nd the allocatio of the node->bp is in deserialize_ftnode_from_rbuf.

    // now the pivots: n children have n-1 pivot keys between them
    if (node->n_children > 1) {
        node->pivotkeys.deserialize_from_rbuf(&rb, node->n_children - 1);
    } else {
        node->pivotkeys.create_empty();
    }

    // if this is an internal node, unpack the block nums, and fill in necessary fields
    // of childinfo
    if (node->height > 0) {
        for (int i = 0; i < node->n_children; i++) {
            BP_BLOCKNUM(node,i) = rbuf_blocknum(&rb);
            BP_WORKDONE(node, i) = 0;
        }
    }

    // make sure that all the data was read; leftover or missing bytes mean
    // the serialized form and this reader disagree, which is unrecoverable.
    if (data_size != rb.ndone) {
        fprintf(
            stderr,
            "%s:%d:deserialize_ftnode_info - "
            "file[%s], blocknum[%ld], data_size[%d] != rb.ndone[%d]\n" ,
            __FILE__,
            __LINE__,
            fname ? fname : "unknown" ,
            node->blocknum.b,
            data_size,
            rb.ndone);
        dump_bad_block(rb.buf, rb.size);
        abort();
    }
exit:
    return r;
}
1269 | |
1270 | static void |
1271 | setup_available_ftnode_partition(FTNODE node, int i) { |
1272 | if (node->height == 0) { |
1273 | set_BLB(node, i, toku_create_empty_bn()); |
1274 | BLB_MAX_MSN_APPLIED(node,i) = node->max_msn_applied_to_node_on_disk; |
1275 | } |
1276 | else { |
1277 | set_BNC(node, i, toku_create_empty_nl()); |
1278 | } |
1279 | } |
1280 | |
1281 | // Assign the child_to_read member of the bfe from the given ftnode |
1282 | // that has been brought into memory. |
1283 | static void |
1284 | (FTNODE node, ftnode_fetch_extra *bfe) |
1285 | { |
1286 | if (bfe->type == ftnode_fetch_subset && bfe->search != NULL) { |
1287 | // we do not take into account prefetching yet |
1288 | // as of now, if we need a subset, the only thing |
1289 | // we can possibly require is a single basement node |
1290 | // we find out what basement node the query cares about |
1291 | // and check if it is available |
1292 | bfe->child_to_read = toku_ft_search_which_child( |
1293 | bfe->ft->cmp, |
1294 | node, |
1295 | bfe->search |
1296 | ); |
1297 | } else if (bfe->type == ftnode_fetch_keymatch) { |
1298 | // we do not take into account prefetching yet |
1299 | // as of now, if we need a subset, the only thing |
1300 | // we can possibly require is a single basement node |
1301 | // we find out what basement node the query cares about |
1302 | // and check if it is available |
1303 | if (node->height == 0) { |
1304 | int left_child = bfe->leftmost_child_wanted(node); |
1305 | int right_child = bfe->rightmost_child_wanted(node); |
1306 | if (left_child == right_child) { |
1307 | bfe->child_to_read = left_child; |
1308 | } |
1309 | } |
1310 | } |
1311 | } |
1312 | |
1313 | // Using the search parameters in the bfe, this function will |
1314 | // initialize all of the given ftnode's partitions. |
1315 | static void |
1316 | (FTNODE node, |
1317 | ftnode_fetch_extra *bfe, |
1318 | bool data_in_memory) |
1319 | { |
1320 | // Leftmost and Rightmost Child bounds. |
1321 | int lc, rc; |
1322 | if (bfe->type == ftnode_fetch_subset || bfe->type == ftnode_fetch_prefetch) { |
1323 | lc = bfe->leftmost_child_wanted(node); |
1324 | rc = bfe->rightmost_child_wanted(node); |
1325 | } else { |
1326 | lc = -1; |
1327 | rc = -1; |
1328 | } |
1329 | |
1330 | // |
1331 | // setup memory needed for the node |
1332 | // |
1333 | //printf("node height %d, blocknum %" PRId64 ", type %d lc %d rc %d\n", node->height, node->blocknum.b, bfe->type, lc, rc); |
1334 | for (int i = 0; i < node->n_children; i++) { |
1335 | BP_INIT_UNTOUCHED_CLOCK(node,i); |
1336 | if (data_in_memory) { |
1337 | BP_STATE(node, i) = ((bfe->wants_child_available(i) || (lc <= i && i <= rc)) |
1338 | ? PT_AVAIL : PT_COMPRESSED); |
1339 | } else { |
1340 | BP_STATE(node, i) = PT_ON_DISK; |
1341 | } |
1342 | BP_WORKDONE(node,i) = 0; |
1343 | |
1344 | switch (BP_STATE(node,i)) { |
1345 | case PT_AVAIL: |
1346 | setup_available_ftnode_partition(node, i); |
1347 | BP_TOUCH_CLOCK(node,i); |
1348 | break; |
1349 | case PT_COMPRESSED: |
1350 | set_BSB(node, i, sub_block_creat()); |
1351 | break; |
1352 | case PT_ON_DISK: |
1353 | set_BNULL(node, i); |
1354 | break; |
1355 | case PT_INVALID: |
1356 | abort(); |
1357 | } |
1358 | } |
1359 | } |
1360 | |
1361 | static void (FTNODE node, ftnode_fetch_extra *bfe, bool data_in_memory) |
1362 | // Effect: Used when reading a ftnode into main memory, this sets up the partitions. |
1363 | // We set bfe->child_to_read as well as the BP_STATE and the data pointers (e.g., with set_BSB or set_BNULL or other set_ operations). |
1364 | // Arguments: Node: the node to set up. |
1365 | // bfe: Describes the key range needed. |
1366 | // data_in_memory: true if we have all the data (in which case we set the BP_STATE to be either PT_AVAIL or PT_COMPRESSED depending on the bfe. |
1367 | // false if we don't have the partitions in main memory (in which case we set the state to PT_ON_DISK. |
1368 | { |
1369 | // Set bfe->child_to_read. |
1370 | update_bfe_using_ftnode(node, bfe); |
1371 | |
1372 | // Setup the partitions. |
1373 | setup_partitions_using_bfe(node, bfe, data_in_memory); |
1374 | } |
1375 | |
/* deserialize the partition from the sub-block's uncompressed buffer
 * and destroy the uncompressed buffer
 *
 * Verifies the sub block checksum, then fills in the node's partition
 * `childnum`: a message buffer (+ trees) for internal nodes, a basement node
 * data buffer for leaves. `cmp` is only needed to rebuild sorted trees for
 * layout versions <= 26. Returns 0 or TOKUDB_BAD_CHECKSUM; length or tag
 * mismatches in verified data are fatal (assert after logging).
 */
static int deserialize_ftnode_partition(
    struct sub_block *sb,
    FTNODE node,
    int childnum,  // which partition to deserialize
    const toku::comparator &cmp) {

    int r = 0;
    const char *fname = toku_ftnode_get_cachefile_fname_in_env(node);
    r = verify_ftnode_sub_block(sb, fname, node->blocknum);
    if (r != 0) {
        fprintf(stderr,
                "%s:%d:deserialize_ftnode_partition - "
                "file[%s], blocknum[%ld], "
                "verify_ftnode_sub_block failed with %d\n" ,
                __FILE__,
                __LINE__,
                fname ? fname : "unknown" ,
                node->blocknum.b,
                r);
        goto exit;
    }
    uint32_t data_size;
    data_size = sb->uncompressed_size - 4;  // checksum is 4 bytes at end

    // now with the data verified, we can read the information into the node
    struct rbuf rb;
    rbuf_init(&rb, (unsigned char *) sb->uncompressed_ptr, data_size);
    // first byte is a tag identifying what kind of partition follows
    unsigned char ch;
    ch = rbuf_char(&rb);

    if (node->height > 0) {
        if (ch != FTNODE_PARTITION_MSG_BUFFER) {
            fprintf(stderr,
                    "%s:%d:deserialize_ftnode_partition - "
                    "file[%s], blocknum[%ld], ch[%d] != "
                    "FTNODE_PARTITION_MSG_BUFFER[%d]\n" ,
                    __FILE__,
                    __LINE__,
                    fname ? fname : "unknown" ,
                    node->blocknum.b,
                    ch,
                    FTNODE_PARTITION_MSG_BUFFER);
            dump_bad_block(rb.buf, rb.size);
            assert(ch == FTNODE_PARTITION_MSG_BUFFER);
        }
        NONLEAF_CHILDINFO bnc = BNC(node, childnum);
        if (node->layout_version_read_from_disk <= FT_LAYOUT_VERSION_26) {
            // Layout version <= 26 did not serialize sorted message trees to disk.
            deserialize_child_buffer_v26(bnc, &rb, cmp);
        } else {
            deserialize_child_buffer(bnc, &rb);
        }
        BP_WORKDONE(node, childnum) = 0;
    } else {
        if (ch != FTNODE_PARTITION_DMT_LEAVES) {
            fprintf(stderr,
                    "%s:%d:deserialize_ftnode_partition - "
                    "file[%s], blocknum[%ld], ch[%d] != "
                    "FTNODE_PARTITION_DMT_LEAVES[%d]\n" ,
                    __FILE__,
                    __LINE__,
                    fname ? fname : "unknown" ,
                    node->blocknum.b,
                    ch,
                    FTNODE_PARTITION_DMT_LEAVES);
            dump_bad_block(rb.buf, rb.size);
            assert(ch == FTNODE_PARTITION_DMT_LEAVES);
        }

        BLB_SEQINSERT(node, childnum) = 0;
        uint32_t num_entries = rbuf_int(&rb);
        // we are now at the first byte of first leafentry
        data_size -= rb.ndone;  // remaining bytes of leafentry data

        BASEMENTNODE bn = BLB(node, childnum);
        bn->data_buffer.deserialize_from_rbuf(
            num_entries, &rb, data_size, node->layout_version_read_from_disk);
    }
    // all bytes of the (checksum-verified) partition must have been consumed
    if (rb.ndone != rb.size) {
        fprintf(stderr,
                "%s:%d:deserialize_ftnode_partition - "
                "file[%s], blocknum[%ld], rb.ndone[%d] != rb.size[%d]\n" ,
                __FILE__,
                __LINE__,
                fname ? fname : "unknown" ,
                node->blocknum.b,
                rb.ndone,
                rb.size);
        dump_bad_block(rb.buf, rb.size);
        assert(rb.ndone == rb.size);
    }

exit:
    return r;
}
1474 | |
// Worker: decompress one partition's sub block and deserialize it into the
// node's partition `child`. Note curr_rbuf/curr_sb are taken BY VALUE so each
// (possibly parallel) worker operates on its own copies. The uncompressed
// buffer allocated during decompression is always freed before returning.
// *decompress_time reports the decompress cost; returns 0 or an error code.
static int decompress_and_deserialize_worker(struct rbuf curr_rbuf,
                                             struct sub_block curr_sb,
                                             FTNODE node,
                                             int child,
                                             const toku::comparator &cmp,
                                             tokutime_t *decompress_time) {
    int r = 0;
    tokutime_t t0 = toku_time_now();
    r = read_and_decompress_sub_block(&curr_rbuf, &curr_sb);
    if (r != 0) {
        const char *fname = toku_ftnode_get_cachefile_fname_in_env(node);
        fprintf(stderr,
                "%s:%d:decompress_and_deserialize_worker - "
                "file[%s], blocknum[%ld], read_and_decompress_sub_block failed "
                "with %d\n" ,
                __FILE__,
                __LINE__,
                fname ? fname : "unknown" ,
                node->blocknum.b,
                r);
        dump_bad_block(curr_rbuf.buf, curr_rbuf.size);
        goto exit;
    }
    *decompress_time = toku_time_now() - t0;
    // at this point, sb->uncompressed_ptr stores the serialized node partition
    r = deserialize_ftnode_partition(&curr_sb, node, child, cmp);
    if (r != 0) {
        const char *fname = toku_ftnode_get_cachefile_fname_in_env(node);
        fprintf(stderr,
                "%s:%d:decompress_and_deserialize_worker - "
                "file[%s], blocknum[%ld], deserialize_ftnode_partition failed "
                "with %d\n" ,
                __FILE__,
                __LINE__,
                fname ? fname : "unknown" ,
                node->blocknum.b,
                r);
        dump_bad_block(curr_rbuf.buf, curr_rbuf.size);
        goto exit;
    }

exit:
    // free(NULL) is fine here when decompression never ran
    toku_free(curr_sb.uncompressed_ptr);
    return r;
}
1520 | |
1521 | static int check_and_copy_compressed_sub_block_worker(struct rbuf curr_rbuf, |
1522 | struct sub_block curr_sb, |
1523 | FTNODE node, |
1524 | int child) { |
1525 | int r = 0; |
1526 | r = read_compressed_sub_block(&curr_rbuf, &curr_sb); |
1527 | if (r != 0) { |
1528 | goto exit; |
1529 | } |
1530 | |
1531 | SUB_BLOCK bp_sb; |
1532 | bp_sb = BSB(node, child); |
1533 | bp_sb->compressed_size = curr_sb.compressed_size; |
1534 | bp_sb->uncompressed_size = curr_sb.uncompressed_size; |
1535 | bp_sb->compressed_ptr = toku_xmalloc(bp_sb->compressed_size); |
1536 | memcpy( |
1537 | bp_sb->compressed_ptr, curr_sb.compressed_ptr, bp_sb->compressed_size); |
1538 | exit: |
1539 | return r; |
1540 | } |
1541 | |
1542 | static FTNODE alloc_ftnode_for_deserialize(uint32_t fullhash, BLOCKNUM blocknum) { |
1543 | // Effect: Allocate an FTNODE and fill in the values that are not read from |
1544 | FTNODE XMALLOC(node); |
1545 | node->fullhash = fullhash; |
1546 | node->blocknum = blocknum; |
1547 | node->dirty = 0; |
1548 | node->oldest_referenced_xid_known = TXNID_NONE; |
1549 | node->bp = nullptr; |
1550 | node->ct_pair = nullptr; |
1551 | return node; |
1552 | } |
1553 | |
1554 | static int ( |
1555 | FTNODE *ftnode, |
1556 | FTNODE_DISK_DATA *ndd, |
1557 | BLOCKNUM blocknum, |
1558 | uint32_t fullhash, |
1559 | ftnode_fetch_extra *bfe, |
1560 | struct rbuf *rb, |
1561 | int fd) |
1562 | // If we have enough information in the rbuf to construct a header, then do so. |
1563 | // Also fetch in the basement node if needed. |
1564 | // Return 0 if it worked. If something goes wrong (including that we are |
1565 | // looking at some old data format that doesn't have partitions) then return |
1566 | // nonzero. |
1567 | { |
1568 | int r = 0; |
1569 | |
1570 | tokutime_t t0, t1; |
1571 | tokutime_t decompress_time = 0; |
1572 | tokutime_t deserialize_time = 0; |
1573 | // we must get the name from bfe and not through |
1574 | // toku_ftnode_get_cachefile_fname_in_env as the node is not set up yet |
1575 | const char* fname = toku_cachefile_fname_in_env(bfe->ft->cf); |
1576 | |
1577 | t0 = toku_time_now(); |
1578 | |
1579 | FTNODE node = alloc_ftnode_for_deserialize(fullhash, blocknum); |
1580 | |
1581 | if (rb->size < 24) { |
1582 | fprintf( |
1583 | stderr, |
1584 | "%s:%d:deserialize_ftnode_header_from_rbuf_if_small_enough - " |
1585 | "file[%s], blocknum[%ld], rb->size[%u] < 24\n" , |
1586 | __FILE__, |
1587 | __LINE__, |
1588 | fname ? fname : "unknown" , |
1589 | blocknum.b, |
1590 | rb->size); |
1591 | dump_bad_block(rb->buf, rb->size); |
1592 | // TODO: What error do we return here? |
1593 | // Does it even matter? |
1594 | r = toku_db_badformat(); |
1595 | goto cleanup; |
1596 | } |
1597 | |
1598 | const void *magic; |
1599 | rbuf_literal_bytes(rb, &magic, 8); |
1600 | if (memcmp(magic, "tokuleaf" , 8) != 0 && |
1601 | memcmp(magic, "tokunode" , 8) != 0) { |
1602 | fprintf( |
1603 | stderr, |
1604 | "%s:%d:deserialize_ftnode_header_from_rbuf_if_small_enough - " |
1605 | "file[%s], blocknum[%ld], unrecognized magic number " |
1606 | "%2.2x %2.2x %2.2x %2.2x %2.2x %2.2x %2.2x %2.2x\n" , |
1607 | __FILE__, |
1608 | __LINE__, |
1609 | fname ? fname : "unknown" , |
1610 | blocknum.b, |
1611 | static_cast<const uint8_t*>(magic)[0], |
1612 | static_cast<const uint8_t*>(magic)[1], |
1613 | static_cast<const uint8_t*>(magic)[2], |
1614 | static_cast<const uint8_t*>(magic)[3], |
1615 | static_cast<const uint8_t*>(magic)[4], |
1616 | static_cast<const uint8_t*>(magic)[5], |
1617 | static_cast<const uint8_t*>(magic)[6], |
1618 | static_cast<const uint8_t*>(magic)[7]); |
1619 | dump_bad_block(rb->buf, rb->size); |
1620 | r = toku_db_badformat(); |
1621 | goto cleanup; |
1622 | } |
1623 | |
1624 | node->layout_version_read_from_disk = rbuf_int(rb); |
1625 | if (node->layout_version_read_from_disk < |
1626 | FT_FIRST_LAYOUT_VERSION_WITH_BASEMENT_NODES) { |
1627 | fprintf( |
1628 | stderr, |
1629 | "%s:%d:deserialize_ftnode_header_from_rbuf_if_small_enough - " |
1630 | "file[%s], blocknum[%ld], node->layout_version_read_from_disk[%d] " |
1631 | "< FT_FIRST_LAYOUT_VERSION_WITH_BASEMENT_NODES[%d]\n" , |
1632 | __FILE__, |
1633 | __LINE__, |
1634 | fname ? fname : "unknown" , |
1635 | blocknum.b, |
1636 | node->layout_version_read_from_disk, |
1637 | FT_FIRST_LAYOUT_VERSION_WITH_BASEMENT_NODES); |
1638 | dump_bad_block(rb->buf, rb->size); |
1639 | // This code path doesn't have to worry about upgrade. |
1640 | r = toku_db_badformat(); |
1641 | goto cleanup; |
1642 | } |
1643 | |
1644 | // If we get here, we know the node is at least |
1645 | // FT_FIRST_LAYOUT_VERSION_WITH_BASEMENT_NODES. We haven't changed |
1646 | // the serialization format since then (this comment is correct as of |
1647 | // version 20, which is Deadshot) so we can go ahead and say the |
1648 | // layout version is current (it will be as soon as we finish |
1649 | // deserializing). |
1650 | // TODO(leif): remove node->layout_version (#5174) |
1651 | node->layout_version = FT_LAYOUT_VERSION; |
1652 | |
1653 | node->layout_version_original = rbuf_int(rb); |
1654 | node->build_id = rbuf_int(rb); |
1655 | node->n_children = rbuf_int(rb); |
1656 | // Guaranteed to be have been able to read up to here. If n_children |
1657 | // is too big, we may have a problem, so check that we won't overflow |
1658 | // while reading the partition locations. |
1659 | unsigned int nhsize; |
1660 | // we can do this because n_children is filled in. |
1661 | nhsize = serialize_node_header_size(node); |
1662 | unsigned int needed_size; |
1663 | // we need 12 more so that we can read the compressed block size information |
1664 | // that follows for the nodeinfo. |
1665 | needed_size = nhsize + 12; |
1666 | if (needed_size > rb->size) { |
1667 | fprintf( |
1668 | stderr, |
1669 | "%s:%d:deserialize_ftnode_header_from_rbuf_if_small_enough - " |
1670 | "file[%s], blocknum[%ld], needed_size[%d] > rb->size[%d]\n" , |
1671 | __FILE__, |
1672 | __LINE__, |
1673 | fname ? fname : "unknown" , |
1674 | blocknum.b, |
1675 | needed_size, |
1676 | rb->size); |
1677 | dump_bad_block(rb->buf, rb->size); |
1678 | r = toku_db_badformat(); |
1679 | goto cleanup; |
1680 | } |
1681 | |
1682 | XMALLOC_N(node->n_children, node->bp); |
1683 | XMALLOC_N(node->n_children, *ndd); |
1684 | // read the partition locations |
1685 | for (int i=0; i<node->n_children; i++) { |
1686 | BP_START(*ndd,i) = rbuf_int(rb); |
1687 | BP_SIZE (*ndd,i) = rbuf_int(rb); |
1688 | } |
1689 | |
1690 | uint32_t checksum; |
1691 | checksum = toku_x1764_memory(rb->buf, rb->ndone); |
1692 | uint32_t stored_checksum; |
1693 | stored_checksum = rbuf_int(rb); |
1694 | if (stored_checksum != checksum) { |
1695 | fprintf( |
1696 | stderr, |
1697 | "%s:%d:deserialize_ftnode_header_from_rbuf_if_small_enough - " |
1698 | "file[%s], blocknum[%ld], stored_checksum[%d] != checksum[%d]\n" , |
1699 | __FILE__, |
1700 | __LINE__, |
1701 | fname ? fname : "unknown" , |
1702 | blocknum.b, |
1703 | stored_checksum, |
1704 | checksum); |
1705 | dump_bad_block(rb->buf, rb->size); |
1706 | r = TOKUDB_BAD_CHECKSUM; |
1707 | goto cleanup; |
1708 | } |
1709 | |
1710 | // Now we want to read the pivot information. |
1711 | struct sub_block sb_node_info; |
1712 | sub_block_init(&sb_node_info); |
1713 | // we'll be able to read these because we checked the size earlier. |
1714 | sb_node_info.compressed_size = rbuf_int(rb); |
1715 | sb_node_info.uncompressed_size = rbuf_int(rb); |
1716 | if (rb->size - rb->ndone < sb_node_info.compressed_size + 8) { |
1717 | fprintf( |
1718 | stderr, |
1719 | "%s:%d:deserialize_ftnode_header_from_rbuf_if_small_enough - " |
1720 | "file[%s], blocknum[%ld], rb->size[%d] - rb->ndone[%d] < " |
1721 | "sb_node_info.compressed_size[%d] + 8\n" , |
1722 | __FILE__, |
1723 | __LINE__, |
1724 | fname ? fname : "unknown" , |
1725 | blocknum.b, |
1726 | rb->size, |
1727 | rb->ndone, |
1728 | sb_node_info.compressed_size); |
1729 | dump_bad_block(rb->buf, rb->size); |
1730 | r = toku_db_badformat(); |
1731 | goto cleanup; |
1732 | } |
1733 | |
1734 | // Finish reading compressed the sub_block |
1735 | const void **cp; |
1736 | cp = (const void **) &sb_node_info.compressed_ptr; |
1737 | rbuf_literal_bytes(rb, cp, sb_node_info.compressed_size); |
1738 | sb_node_info.xsum = rbuf_int(rb); |
1739 | // let's check the checksum |
1740 | uint32_t actual_xsum; |
1741 | actual_xsum = toku_x1764_memory((char *)sb_node_info.compressed_ptr - 8, |
1742 | 8 + sb_node_info.compressed_size); |
1743 | if (sb_node_info.xsum != actual_xsum) { |
1744 | fprintf( |
1745 | stderr, |
1746 | "%s:%d:deserialize_ftnode_header_from_rbuf_if_small_enough - " |
1747 | "file[%s], blocknum[%ld], sb_node_info.xsum[%d] != actual_xsum[%d]\n" , |
1748 | __FILE__, |
1749 | __LINE__, |
1750 | fname ? fname : "unknown" , |
1751 | blocknum.b, |
1752 | sb_node_info.xsum, |
1753 | actual_xsum); |
1754 | dump_bad_block(rb->buf, rb->size); |
1755 | r = TOKUDB_BAD_CHECKSUM; |
1756 | goto cleanup; |
1757 | } |
1758 | |
1759 | // Now decompress the subblock |
1760 | { |
1761 | toku::scoped_malloc sb_node_info_buf(sb_node_info.uncompressed_size); |
1762 | sb_node_info.uncompressed_ptr = sb_node_info_buf.get(); |
1763 | tokutime_t decompress_t0 = toku_time_now(); |
1764 | toku_decompress((Bytef *)sb_node_info.uncompressed_ptr, |
1765 | sb_node_info.uncompressed_size, |
1766 | (Bytef *)sb_node_info.compressed_ptr, |
1767 | sb_node_info.compressed_size); |
1768 | tokutime_t decompress_t1 = toku_time_now(); |
1769 | decompress_time = decompress_t1 - decompress_t0; |
1770 | |
1771 | // at this point sb->uncompressed_ptr stores the serialized node info. |
1772 | r = deserialize_ftnode_info(&sb_node_info, node); |
1773 | if (r != 0) { |
1774 | fprintf( |
1775 | stderr, |
1776 | "%s:%d:deserialize_ftnode_header_from_rbuf_if_small_enough - " |
1777 | "file[%s], blocknum[%ld], deserialize_ftnode_info failed with " |
1778 | "%d\n" , |
1779 | __FILE__, |
1780 | __LINE__, |
1781 | fname ? fname : "unknown" , |
1782 | blocknum.b, |
1783 | r); |
1784 | dump_bad_block( |
1785 | static_cast<unsigned char *>(sb_node_info.uncompressed_ptr), |
1786 | sb_node_info.uncompressed_size); |
1787 | dump_bad_block(rb->buf, rb->size); |
1788 | goto cleanup; |
1789 | } |
1790 | } |
1791 | |
1792 | // Now we have the ftnode_info. We have a bunch more stuff in the |
1793 | // rbuf, so we might be able to store the compressed data for some |
1794 | // objects. |
1795 | // We can proceed to deserialize the individual subblocks. |
1796 | |
1797 | // setup the memory of the partitions |
1798 | // for partitions being decompressed, create either message buffer or basement node |
1799 | // for partitions staying compressed, create sub_block |
1800 | setup_ftnode_partitions(node, bfe, false); |
1801 | |
1802 | // We must capture deserialize and decompression time before |
1803 | // the pf_callback, otherwise we would double-count. |
1804 | t1 = toku_time_now(); |
1805 | deserialize_time = (t1 - t0) - decompress_time; |
1806 | |
1807 | // do partial fetch if necessary |
1808 | if (bfe->type != ftnode_fetch_none) { |
1809 | PAIR_ATTR attr; |
1810 | r = toku_ftnode_pf_callback(node, *ndd, bfe, fd, &attr); |
1811 | if (r != 0) { |
1812 | fprintf( |
1813 | stderr, |
1814 | "%s:%d:deserialize_ftnode_header_from_rbuf_if_small_enough - " |
1815 | "file[%s], blocknum[%ld], toku_ftnode_pf_callback failed with " |
1816 | "%d\n" , |
1817 | __FILE__, |
1818 | __LINE__, |
1819 | fname ? fname : "unknown" , |
1820 | blocknum.b, |
1821 | r); |
1822 | dump_bad_block(rb->buf, rb->size); |
1823 | goto cleanup; |
1824 | } |
1825 | } |
1826 | |
1827 | // handle clock |
1828 | for (int i = 0; i < node->n_children; i++) { |
1829 | if (bfe->wants_child_available(i)) { |
1830 | paranoid_invariant(BP_STATE(node,i) == PT_AVAIL); |
1831 | BP_TOUCH_CLOCK(node,i); |
1832 | } |
1833 | } |
1834 | *ftnode = node; |
1835 | r = 0; |
1836 | |
1837 | cleanup: |
1838 | if (r == 0) { |
1839 | bfe->deserialize_time += deserialize_time; |
1840 | bfe->decompress_time += decompress_time; |
1841 | toku_ft_status_update_deserialize_times(node, deserialize_time, decompress_time); |
1842 | } |
1843 | if (r != 0) { |
1844 | if (node) { |
1845 | toku_free(*ndd); |
1846 | toku_free(node->bp); |
1847 | toku_free(node); |
1848 | } |
1849 | } |
1850 | return r; |
1851 | } |
1852 | |
// This function takes a deserialized version 13 or 14 buffer and
// constructs the associated internal, non-leaf ftnode object. It
// also creates MSN's for older messages created in older versions
// that did not generate MSN's for messages. These new MSN's are
// generated from the root downwards, counting backwards from MIN_MSN
// and persisted in the ft header.
//
// Requires: rb is positioned just past the common node header fields.
// Returns 0 on success, or toku_db_badformat() if the end-to-end
// checksum (present in version >= FT_FIRST_LAYOUT_VERSION_WITH_END_TO_END_CHECKSUM)
// does not match.
// Side effects: fills in the node's children, pivots and message
// buffers, and marks the node dirty because new MSNs were assigned.
static int deserialize_and_upgrade_internal_node(FTNODE node,
                                                 struct rbuf *rb,
                                                 ftnode_fetch_extra *bfe,
                                                 STAT64INFO info) {
    int version = node->layout_version_read_from_disk;

    // Version 13 stored a per-node fingerprint; it is obsolete, skip it.
    if (version == FT_LAST_LAYOUT_VERSION_WITH_FINGERPRINT) {
        (void) rbuf_int(rb); // 10. fingerprint
    }

    node->n_children = rbuf_int(rb); // 11. n_children

    // Sub-tree estimates (one set per child)...
    for (int i = 0; i < node->n_children; ++i) {
        if (version == FT_LAST_LAYOUT_VERSION_WITH_FINGERPRINT) {
            (void) rbuf_int(rb); // 12. fingerprint
        }
        uint64_t nkeys = rbuf_ulonglong(rb); // 13. nkeys
        uint64_t ndata = rbuf_ulonglong(rb); // 14. ndata
        uint64_t dsize = rbuf_ulonglong(rb); // 15. dsize
        (void) rbuf_char(rb);                // 16. exact (char)
        invariant(nkeys == ndata);
        if (info) {
            // info is non-null if we're trying to upgrade old subtree
            // estimates to stat64info
            info->numrows += nkeys;
            info->numbytes += dsize;
        }
    }

    // Pivot keys: an internal node with n children has n-1 pivots.
    node->pivotkeys.deserialize_from_rbuf(rb, node->n_children - 1);

    // Create space for the child node buffers (a.k.a. partitions).
    XMALLOC_N(node->n_children, node->bp);

    // Set the child blocknums.
    for (int i = 0; i < node->n_children; ++i) {
        BP_BLOCKNUM(node, i) = rbuf_blocknum(rb); // 18. blocknums
        BP_WORKDONE(node, i) = 0;
    }

    // Read in the child buffer maps.
    for (int i = 0; i < node->n_children; ++i) {
        // The following fields were previously used by the `sub_block_map'
        // They include:
        // - 4 byte index
        (void) rbuf_int(rb);
        // - 4 byte offset
        (void) rbuf_int(rb);
        // - 4 byte size
        (void) rbuf_int(rb);
    }

    // We need to setup this node's partitions, but we can't call the
    // existing call (setup_ftnode_paritions.) because there are
    // existing optimizations that would prevent us from bringing all
    // of this node's partitions into memory. Instead, We use the
    // existing bfe and node to set the bfe's child_to_search member.
    // Then we create a temporary bfe that needs all the nodes to make
    // sure we properly intitialize our partitions before filling them
    // in from our soon-to-be-upgraded node.
    update_bfe_using_ftnode(node, bfe);
    ftnode_fetch_extra temp_bfe;
    temp_bfe.create_for_full_read(nullptr);
    setup_partitions_using_bfe(node, &temp_bfe, true);

    // Cache the highest MSN generated for the message buffers. This
    // will be set in the ftnode.
    //
    // The way we choose MSNs for upgraded messages is delicate. The
    // field `highest_unused_msn_for_upgrade' in the header is always an
    // MSN that no message has yet. So when we have N messages that need
    // MSNs, we decrement it by N, and then use it and the N-1 MSNs less
    // than it, but we do not use the value we decremented it to.
    //
    // NOTE(review): the MSN assignment itself presumably happens inside
    // deserialize_child_buffer_v13 (verify against its definition); the
    // loop below only records the highest MSN reported by the first
    // buffer that yields a nonzero one.
    MSN highest_msn;
    highest_msn.msn = 0;

    // Deserialize de-compressed buffers.
    for (int i = 0; i < node->n_children; ++i) {
        NONLEAF_CHILDINFO bnc = BNC(node, i);
        MSN highest_msn_in_this_buffer = deserialize_child_buffer_v13(bfe->ft, bnc, rb);
        if (highest_msn.msn == 0) {
            highest_msn.msn = highest_msn_in_this_buffer.msn;
        }
    }

    // Assign the highest msn from our upgrade message buffers
    node->max_msn_applied_to_node_on_disk = highest_msn;
    // Since we assigned MSNs to this node's messages, we need to dirty it.
    node->dirty = 1;

    // Must compute the checksum now (rather than at the end, while we
    // still have the pointer to the buffer).
    if (version >= FT_FIRST_LAYOUT_VERSION_WITH_END_TO_END_CHECKSUM) {
        // The stored checksum occupies the last 4 bytes of the block.
        uint32_t expected_xsum = toku_dtoh32(*(uint32_t*)(rb->buf+rb->size-4)); // 27. checksum
        uint32_t actual_xsum = toku_x1764_memory(rb->buf, rb->size-4);
        if (expected_xsum != actual_xsum) {
            fprintf(stderr, "%s:%d: Bad checksum: expected = %" PRIx32 ", actual= %" PRIx32 "\n" ,
                    __FUNCTION__,
                    __LINE__,
                    expected_xsum,
                    actual_xsum);
            fprintf(stderr,
                    "Checksum failure while reading node in file %s.\n" ,
                    toku_cachefile_fname_in_env(bfe->ft->cf));
            fflush(stderr);
            return toku_db_badformat();
        }
    }

    return 0;
}
1977 | |
// This function takes a deserialized version 13 or 14 buffer and
// constructs the associated leaf ftnode object.
//
// All leaf entries are placed into a single basement node (partition
// 0); version-13 entries are upgraded one at a time via
// toku_le_upgrade_13_14 on the way in.
// Returns 0 on success, toku_db_badformat() on a checksum mismatch,
// or 1 if the buffer was not fully consumed.
static int
deserialize_and_upgrade_leaf_node(FTNODE node,
                                  struct rbuf *rb,
                                  ftnode_fetch_extra *bfe,
                                  STAT64INFO info)
{
    int r = 0;
    int version = node->layout_version_read_from_disk;

    // This is a leaf node, so the offsets in the buffer will be
    // different from the internal node offsets above.
    uint64_t nkeys = rbuf_ulonglong(rb);  // 10. nkeys
    uint64_t ndata = rbuf_ulonglong(rb);  // 11. ndata
    uint64_t dsize = rbuf_ulonglong(rb);  // 12. dsize
    invariant(nkeys == ndata);
    if (info) {
        // info is non-null if we're trying to upgrade old subtree
        // estimates to stat64info
        info->numrows += nkeys;
        info->numbytes += dsize;
    }

    // This is the optimized for upgrade field.
    if (version == FT_LAYOUT_VERSION_14) {
        (void) rbuf_int(rb);  // 13. optimized
    }

    // npartitions - This is really the number of leaf entries in
    // our single basement node. There should only be 1 (ONE)
    // partition, so there shouldn't be any pivot key stored. This
    // means the loop will not iterate. We could remove the loop and
    // assert that the value is indeed 1.
    int npartitions = rbuf_int(rb);  // 14. npartitions
    assert(npartitions == 1);

    // Set number of children to 1, since we will only have one
    // basement node.
    node->n_children = 1;
    XMALLOC_N(node->n_children, node->bp);
    // One partition means no pivot keys at all.
    node->pivotkeys.create_empty();

    // Create one basement node to contain all the leaf entries by
    // setting up the single partition and updating the bfe.
    update_bfe_using_ftnode(node, bfe);
    ftnode_fetch_extra temp_bfe;
    temp_bfe.create_for_full_read(bfe->ft);
    setup_partitions_using_bfe(node, &temp_bfe, true);

    // Deserialize the partition maps, though they are not used in the
    // newer versions of ftnodes.
    for (int i = 0; i < node->n_children; ++i) {
        // The following fields were previously used by the `sub_block_map'
        // They include:
        // - 4 byte index
        (void) rbuf_int(rb);
        // - 4 byte offset
        (void) rbuf_int(rb);
        // - 4 byte size
        (void) rbuf_int(rb);
    }

    // Copy all of the leaf entries into the single basement node.

    // The number of leaf entries in buffer.
    int n_in_buf = rbuf_int(rb);  // 15. # of leaves
    BLB_SEQINSERT(node,0) = 0;
    BASEMENTNODE bn = BLB(node, 0);

    // Read the leaf entries from the buffer, advancing the buffer
    // as we go.
    bool has_end_to_end_checksum = (version >= FT_FIRST_LAYOUT_VERSION_WITH_END_TO_END_CHECKSUM);
    if (version <= FT_LAYOUT_VERSION_13) {
        // Create our mempool.
        // Loop through
        for (int i = 0; i < n_in_buf; ++i) {
            // The old leaf entry is read in place out of the rbuf.
            LEAFENTRY_13 le = reinterpret_cast<LEAFENTRY_13>(&rb->buf[rb->ndone]);
            uint32_t disksize = leafentry_disksize_13(le);
            rb->ndone += disksize;  // 16. leaf entry (13)
            invariant(rb->ndone<=rb->size);
            // Upgrade the entry to the version-14 format; new_le is a
            // freshly allocated copy that we own until it is copied
            // into the basement node below.
            LEAFENTRY new_le;
            size_t new_le_size;
            void* key = NULL;
            uint32_t keylen = 0;
            r = toku_le_upgrade_13_14(le,
                                      &key,
                                      &keylen,
                                      &new_le_size,
                                      &new_le);
            assert_zero(r);
            // Copy the pointer value straight into the OMT
            LEAFENTRY new_le_in_bn = nullptr;
            void *maybe_free;
            bn->data_buffer.get_space_for_insert(
                i,
                key,
                keylen,
                new_le_size,
                &new_le_in_bn,
                &maybe_free
                );
            // get_space_for_insert may hand back a buffer it wants us
            // to dispose of.
            if (maybe_free) {
                toku_free(maybe_free);
            }
            memcpy(new_le_in_bn, new_le, new_le_size);
            toku_free(new_le);
        }
    } else {
        // Version 14: the entries are already in the current leaf
        // format; bulk-deserialize them. Exclude the trailing 4-byte
        // checksum (if any) from the data region.
        uint32_t data_size = rb->size - rb->ndone;
        if (has_end_to_end_checksum) {
            data_size -= sizeof(uint32_t);
        }
        bn->data_buffer.deserialize_from_rbuf(n_in_buf, rb, data_size, node->layout_version_read_from_disk);
    }

    // Whatever this is must be less than the MSNs of every message above
    // it, so it's ok to take it here.
    bn->max_msn_applied = bfe->ft->h->highest_unused_msn_for_upgrade;
    bn->stale_ancestor_messages_applied = false;
    node->max_msn_applied_to_node_on_disk = bn->max_msn_applied;

    // Checksum (end to end) is only on version 14
    if (has_end_to_end_checksum) {
        uint32_t expected_xsum = rbuf_int(rb);  // 17. checksum
        uint32_t actual_xsum = toku_x1764_memory(rb->buf, rb->size - 4);
        if (expected_xsum != actual_xsum) {
            fprintf(stderr, "%s:%d: Bad checksum: expected = %" PRIx32 ", actual= %" PRIx32 "\n" ,
                    __FUNCTION__,
                    __LINE__,
                    expected_xsum,
                    actual_xsum);
            fprintf(stderr,
                    "Checksum failure while reading node in file %s.\n" ,
                    toku_cachefile_fname_in_env(bfe->ft->cf));
            fflush(stderr);
            return toku_db_badformat();
        }
    }

    // We should have read the whole block by this point.
    if (rb->ndone != rb->size) {
        // TODO: Error handling.
        return 1;
    }

    return r;
}
2126 | |
// Forward declaration: reads the block for `blocknum' at
// (offset, size) from `fd', decompresses it into `rb', and reports
// the block's on-disk layout version via `layout_version_p'.
static int read_and_decompress_block_from_fd_into_rbuf(
    int fd,
    BLOCKNUM blocknum,
    DISKOFF offset,
    DISKOFF size,
    FT ft,
    struct rbuf *rb,
    /* out */ int *layout_version_p);
2135 | |
2136 | // This function upgrades a version 14 or 13 ftnode to the current |
2137 | // version. NOTE: This code assumes the first field of the rbuf has |
2138 | // already been read from the buffer (namely the layout_version of the |
2139 | // ftnode.) |
2140 | static int deserialize_and_upgrade_ftnode(FTNODE node, |
2141 | FTNODE_DISK_DATA *ndd, |
2142 | BLOCKNUM blocknum, |
2143 | ftnode_fetch_extra *bfe, |
2144 | STAT64INFO info, |
2145 | int fd) { |
2146 | int r = 0; |
2147 | int version; |
2148 | |
2149 | // I. First we need to de-compress the entire node, only then can |
2150 | // we read the different sub-sections. |
2151 | // get the file offset and block size for the block |
2152 | DISKOFF offset, size; |
2153 | bfe->ft->blocktable.translate_blocknum_to_offset_size(blocknum, &offset, &size); |
2154 | |
2155 | struct rbuf rb; |
2156 | r = read_and_decompress_block_from_fd_into_rbuf(fd, |
2157 | blocknum, |
2158 | offset, |
2159 | size, |
2160 | bfe->ft, |
2161 | &rb, |
2162 | &version); |
2163 | if (r != 0) { |
2164 | const char* fname = toku_cachefile_fname_in_env(bfe->ft->cf); |
2165 | fprintf(stderr, |
2166 | "%s:%d:deserialize_and_upgrade_ftnode - " |
2167 | "file[%s], blocknum[%ld], " |
2168 | "read_and_decompress_block_from_fd_into_rbuf failed with %d\n" , |
2169 | __FILE__, |
2170 | __LINE__, |
2171 | fname ? fname : "unknown" , |
2172 | blocknum.b, |
2173 | r); |
2174 | goto exit; |
2175 | } |
2176 | |
2177 | // Re-read the magic field from the previous call, since we are |
2178 | // restarting with a fresh rbuf. |
2179 | { |
2180 | const void *magic; |
2181 | rbuf_literal_bytes(&rb, &magic, 8); // 1. magic |
2182 | } |
2183 | |
2184 | // II. Start reading ftnode fields out of the decompressed buffer. |
2185 | |
2186 | // Copy over old version info. |
2187 | node->layout_version_read_from_disk = rbuf_int(&rb); // 2. layout version |
2188 | version = node->layout_version_read_from_disk; |
2189 | if (version > FT_LAYOUT_VERSION_14) { |
2190 | const char* fname = toku_cachefile_fname_in_env(bfe->ft->cf); |
2191 | fprintf(stderr, |
2192 | "%s:%d:deserialize_and_upgrade_ftnode - " |
2193 | "file[%s], blocknum[%ld], version[%d] > " |
2194 | "FT_LAYOUT_VERSION_14[%d]\n" , |
2195 | __FILE__, |
2196 | __LINE__, |
2197 | fname ? fname : "unknown" , |
2198 | blocknum.b, |
2199 | version, |
2200 | FT_LAYOUT_VERSION_14); |
2201 | dump_bad_block(rb.buf, rb.size); |
2202 | goto exit; |
2203 | } |
2204 | assert(version <= FT_LAYOUT_VERSION_14); |
2205 | // Upgrade the current version number to the current version. |
2206 | node->layout_version = FT_LAYOUT_VERSION; |
2207 | |
2208 | node->layout_version_original = rbuf_int(&rb); // 3. original layout |
2209 | node->build_id = rbuf_int(&rb); // 4. build id |
2210 | |
2211 | // The remaining offsets into the rbuf do not map to the current |
2212 | // version, so we need to fill in the blanks and ignore older |
2213 | // fields. |
2214 | (void)rbuf_int(&rb); // 5. nodesize |
2215 | node->flags = rbuf_int(&rb); // 6. flags |
2216 | node->height = rbuf_int(&rb); // 7. height |
2217 | |
2218 | // If the version is less than 14, there are two extra ints here. |
2219 | // we would need to ignore them if they are there. |
2220 | // These are the 'fingerprints'. |
2221 | if (version == FT_LAYOUT_VERSION_13) { |
2222 | (void) rbuf_int(&rb); // 8. rand4 |
2223 | (void) rbuf_int(&rb); // 9. local |
2224 | } |
2225 | |
2226 | // The next offsets are dependent on whether this is a leaf node |
2227 | // or not. |
2228 | |
2229 | // III. Read in Leaf and Internal Node specific data. |
2230 | |
2231 | // Check height to determine whether this is a leaf node or not. |
2232 | if (node->height > 0) { |
2233 | r = deserialize_and_upgrade_internal_node(node, &rb, bfe, info); |
2234 | } else { |
2235 | r = deserialize_and_upgrade_leaf_node(node, &rb, bfe, info); |
2236 | } |
2237 | |
2238 | XMALLOC_N(node->n_children, *ndd); |
2239 | // Initialize the partition locations to zero, because version 14 |
2240 | // and below have no notion of partitions on disk. |
2241 | for (int i=0; i<node->n_children; i++) { |
2242 | BP_START(*ndd,i) = 0; |
2243 | BP_SIZE (*ndd,i) = 0; |
2244 | } |
2245 | |
2246 | toku_free(rb.buf); |
2247 | exit: |
2248 | return r; |
2249 | } |
2250 | |
2251 | // Effect: deserializes a ftnode that is in rb (with pointer of rb just past the |
2252 | // magic) into a FTNODE. |
static int deserialize_ftnode_from_rbuf(FTNODE *ftnode,
2254 | FTNODE_DISK_DATA *ndd, |
2255 | BLOCKNUM blocknum, |
2256 | uint32_t fullhash, |
2257 | ftnode_fetch_extra *bfe, |
2258 | STAT64INFO info, |
2259 | struct rbuf *rb, |
2260 | int fd) { |
2261 | int r = 0; |
2262 | struct sub_block sb_node_info; |
2263 | |
2264 | tokutime_t t0, t1; |
2265 | tokutime_t decompress_time = 0; |
2266 | tokutime_t deserialize_time = 0; |
2267 | const char* fname = toku_cachefile_fname_in_env(bfe->ft->cf); |
2268 | |
2269 | t0 = toku_time_now(); |
2270 | |
2271 | FTNODE node = alloc_ftnode_for_deserialize(fullhash, blocknum); |
2272 | |
2273 | // now start reading from rbuf |
2274 | // first thing we do is read the header information |
2275 | const void *magic; |
2276 | rbuf_literal_bytes(rb, &magic, 8); |
2277 | if (memcmp(magic, "tokuleaf" , 8) != 0 && |
2278 | memcmp(magic, "tokunode" , 8) != 0) { |
2279 | fprintf(stderr, |
2280 | "%s:%d:deserialize_ftnode_from_rbuf - " |
2281 | "file[%s], blocknum[%ld], unrecognized magic number " |
2282 | "%2.2x %2.2x %2.2x %2.2x %2.2x %2.2x %2.2x %2.2x\n" , |
2283 | __FILE__, |
2284 | __LINE__, |
2285 | fname ? fname : "unknown" , |
2286 | blocknum.b, |
2287 | static_cast<const uint8_t *>(magic)[0], |
2288 | static_cast<const uint8_t *>(magic)[1], |
2289 | static_cast<const uint8_t *>(magic)[2], |
2290 | static_cast<const uint8_t *>(magic)[3], |
2291 | static_cast<const uint8_t *>(magic)[4], |
2292 | static_cast<const uint8_t *>(magic)[5], |
2293 | static_cast<const uint8_t *>(magic)[6], |
2294 | static_cast<const uint8_t *>(magic)[7]); |
2295 | dump_bad_block(rb->buf, rb->size); |
2296 | |
2297 | r = toku_db_badformat(); |
2298 | goto cleanup; |
2299 | } |
2300 | |
2301 | node->layout_version_read_from_disk = rbuf_int(rb); |
2302 | lazy_assert(node->layout_version_read_from_disk >= FT_LAYOUT_MIN_SUPPORTED_VERSION); |
2303 | |
2304 | // Check if we are reading in an older node version. |
2305 | if (node->layout_version_read_from_disk <= FT_LAYOUT_VERSION_14) { |
2306 | int version = node->layout_version_read_from_disk; |
2307 | // Perform the upgrade. |
2308 | r = deserialize_and_upgrade_ftnode(node, ndd, blocknum, bfe, info, fd); |
2309 | if (r != 0) { |
2310 | fprintf(stderr, |
2311 | "%s:%d:deserialize_ftnode_from_rbuf - " |
2312 | "file[%s], blocknum[%ld], deserialize_and_upgrade_ftnode " |
2313 | "failed with %d\n" , |
2314 | __FILE__, |
2315 | __LINE__, |
2316 | fname ? fname : "unknown" , |
2317 | blocknum.b, |
2318 | r); |
2319 | dump_bad_block(rb->buf, rb->size); |
2320 | goto cleanup; |
2321 | } |
2322 | |
2323 | if (version <= FT_LAYOUT_VERSION_13) { |
2324 | // deprecate 'TOKU_DB_VALCMP_BUILTIN'. just remove the flag |
2325 | node->flags &= ~TOKU_DB_VALCMP_BUILTIN_13; |
2326 | } |
2327 | |
2328 | // If everything is ok, just re-assign the ftnode and retrn. |
2329 | *ftnode = node; |
2330 | r = 0; |
2331 | goto cleanup; |
2332 | } |
2333 | |
2334 | // Upgrade versions after 14 to current. This upgrade is trivial, it |
2335 | // removes the optimized for upgrade field, which has already been |
2336 | // removed in the deserialization code (see |
2337 | // deserialize_ftnode_info()). |
2338 | node->layout_version = FT_LAYOUT_VERSION; |
2339 | node->layout_version_original = rbuf_int(rb); |
2340 | node->build_id = rbuf_int(rb); |
2341 | node->n_children = rbuf_int(rb); |
2342 | XMALLOC_N(node->n_children, node->bp); |
2343 | XMALLOC_N(node->n_children, *ndd); |
2344 | // read the partition locations |
2345 | for (int i=0; i<node->n_children; i++) { |
2346 | BP_START(*ndd,i) = rbuf_int(rb); |
2347 | BP_SIZE (*ndd,i) = rbuf_int(rb); |
2348 | } |
2349 | // verify checksum of header stored |
2350 | uint32_t checksum; |
2351 | checksum = toku_x1764_memory(rb->buf, rb->ndone); |
2352 | uint32_t stored_checksum; |
2353 | stored_checksum = rbuf_int(rb); |
2354 | if (stored_checksum != checksum) { |
2355 | fprintf( |
2356 | stderr, |
2357 | "%s:%d:deserialize_ftnode_from_rbuf - " |
2358 | "file[%s], blocknum[%ld], stored_checksum[%d] != checksum[%d]\n" , |
2359 | __FILE__, |
2360 | __LINE__, |
2361 | fname ? fname : "unknown" , |
2362 | blocknum.b, |
2363 | stored_checksum, |
2364 | checksum); |
2365 | dump_bad_block(rb->buf, rb->size); |
2366 | invariant(stored_checksum == checksum); |
2367 | } |
2368 | |
2369 | // now we read and decompress the pivot and child information |
2370 | sub_block_init(&sb_node_info); |
2371 | { |
2372 | tokutime_t sb_decompress_t0 = toku_time_now(); |
2373 | r = read_and_decompress_sub_block(rb, &sb_node_info); |
2374 | tokutime_t sb_decompress_t1 = toku_time_now(); |
2375 | decompress_time += sb_decompress_t1 - sb_decompress_t0; |
2376 | if (r != 0) { |
2377 | fprintf( |
2378 | stderr, |
2379 | "%s:%d:deserialize_ftnode_from_rbuf - " |
2380 | "file[%s], blocknum[%ld], read_and_decompress_sub_block failed " |
2381 | "with %d\n" , |
2382 | __FILE__, |
2383 | __LINE__, |
2384 | fname ? fname : "unknown" , |
2385 | blocknum.b, |
2386 | r); |
2387 | dump_bad_block( |
2388 | static_cast<unsigned char *>(sb_node_info.uncompressed_ptr), |
2389 | sb_node_info.uncompressed_size); |
2390 | dump_bad_block(rb->buf, rb->size); |
2391 | goto cleanup; |
2392 | } |
2393 | } |
2394 | |
2395 | // at this point, sb->uncompressed_ptr stores the serialized node info |
2396 | r = deserialize_ftnode_info(&sb_node_info, node); |
2397 | if (r != 0) { |
2398 | fprintf( |
2399 | stderr, |
2400 | "%s:%d:deserialize_ftnode_from_rbuf - " |
2401 | "file[%s], blocknum[%ld], deserialize_ftnode_info failed with " |
2402 | "%d\n" , |
2403 | __FILE__, |
2404 | __LINE__, |
2405 | fname ? fname : "unknown" , |
2406 | blocknum.b, |
2407 | r); |
2408 | dump_bad_block(rb->buf, rb->size); |
2409 | goto cleanup; |
2410 | } |
2411 | toku_free(sb_node_info.uncompressed_ptr); |
2412 | |
2413 | // now that the node info has been deserialized, we can proceed to |
2414 | // deserialize the individual sub blocks |
2415 | |
2416 | // setup the memory of the partitions |
2417 | // for partitions being decompressed, create either message buffer or |
2418 | // basement node |
2419 | // for partitions staying compressed, create sub_block |
2420 | setup_ftnode_partitions(node, bfe, true); |
2421 | |
2422 | // This loop is parallelizeable, since we don't have a dependency on the |
2423 | // work done so far. |
2424 | for (int i = 0; i < node->n_children; i++) { |
2425 | uint32_t curr_offset = BP_START(*ndd, i); |
2426 | uint32_t curr_size = BP_SIZE(*ndd, i); |
2427 | // the compressed, serialized partitions start at where rb is currently |
2428 | // pointing, which would be rb->buf + rb->ndone |
2429 | // we need to intialize curr_rbuf to point to this place |
2430 | struct rbuf curr_rbuf = {.buf = nullptr, .size = 0, .ndone = 0}; |
2431 | rbuf_init(&curr_rbuf, rb->buf + curr_offset, curr_size); |
2432 | |
2433 | // |
2434 | // now we are at the point where we have: |
2435 | // - read the entire compressed node off of disk, |
2436 | // - decompressed the pivot and offset information, |
2437 | // - have arrived at the individual partitions. |
2438 | // |
2439 | // Based on the information in bfe, we want to decompress a subset of |
2440 | // of the compressed partitions (also possibly none or possibly all) |
2441 | // The partitions that we want to decompress and make available |
2442 | // to the node, we do, the rest we simply copy in compressed |
2443 | // form into the node, and set the state of the partition to |
2444 | // PT_COMPRESSED |
2445 | // |
2446 | |
2447 | struct sub_block curr_sb; |
2448 | sub_block_init(&curr_sb); |
2449 | |
2450 | // curr_rbuf is passed by value to decompress_and_deserialize_worker, |
2451 | // so there's no ugly race condition. |
2452 | // This would be more obvious if curr_rbuf were an array. |
2453 | |
2454 | // deserialize_ftnode_info figures out what the state |
2455 | // should be and sets up the memory so that we are ready to use it |
2456 | |
2457 | switch (BP_STATE(node, i)) { |
2458 | case PT_AVAIL: { |
2459 | // case where we read and decompress the partition |
2460 | tokutime_t partition_decompress_time; |
2461 | r = decompress_and_deserialize_worker( |
2462 | curr_rbuf, |
2463 | curr_sb, |
2464 | node, |
2465 | i, |
2466 | bfe->ft->cmp, |
2467 | &partition_decompress_time); |
2468 | decompress_time += partition_decompress_time; |
2469 | if (r != 0) { |
2470 | fprintf( |
2471 | stderr, |
2472 | "%s:%d:deserialize_ftnode_from_rbuf - " |
2473 | "file[%s], blocknum[%ld], childnum[%d], " |
2474 | "decompress_and_deserialize_worker failed with %d\n" , |
2475 | __FILE__, |
2476 | __LINE__, |
2477 | fname ? fname : "unknown" , |
2478 | blocknum.b, |
2479 | i, |
2480 | r); |
2481 | dump_bad_block(rb->buf, rb->size); |
2482 | goto cleanup; |
2483 | } |
2484 | break; |
2485 | } |
2486 | case PT_COMPRESSED: |
2487 | // case where we leave the partition in the compressed state |
2488 | r = check_and_copy_compressed_sub_block_worker(curr_rbuf, curr_sb, node, i); |
2489 | if (r != 0) { |
2490 | fprintf( |
2491 | stderr, |
2492 | "%s:%d:deserialize_ftnode_from_rbuf - " |
2493 | "file[%s], blocknum[%ld], childnum[%d], " |
2494 | "check_and_copy_compressed_sub_block_worker failed with " |
2495 | "%d\n" , |
2496 | __FILE__, |
2497 | __LINE__, |
2498 | fname ? fname : "unknown" , |
2499 | blocknum.b, |
2500 | i, |
2501 | r); |
2502 | dump_bad_block(rb->buf, rb->size); |
2503 | goto cleanup; |
2504 | } |
2505 | break; |
2506 | case PT_INVALID: // this is really bad |
2507 | case PT_ON_DISK: // it's supposed to be in memory. |
2508 | abort(); |
2509 | } |
2510 | } |
2511 | *ftnode = node; |
2512 | r = 0; |
2513 | |
2514 | cleanup: |
2515 | if (r == 0) { |
2516 | t1 = toku_time_now(); |
2517 | deserialize_time = (t1 - t0) - decompress_time; |
2518 | bfe->deserialize_time += deserialize_time; |
2519 | bfe->decompress_time += decompress_time; |
2520 | toku_ft_status_update_deserialize_times(node, deserialize_time, decompress_time); |
2521 | } |
2522 | if (r != 0) { |
2523 | // NOTE: Right now, callers higher in the stack will assert on |
2524 | // failure, so this is OK for production. However, if we |
2525 | // create tools that use this function to search for errors in |
2526 | // the FT, then we will leak memory. |
2527 | if (node) { |
2528 | toku_free(node); |
2529 | } |
2530 | } |
2531 | return r; |
2532 | } |
2533 | |
2534 | int |
2535 | (FTNODE node, FTNODE_DISK_DATA ndd, int childnum, int fd, ftnode_fetch_extra *bfe) { |
2536 | int r = 0; |
2537 | assert(BP_STATE(node,childnum) == PT_ON_DISK); |
2538 | assert(node->bp[childnum].ptr.tag == BCT_NULL); |
2539 | |
2540 | // |
2541 | // setup the partition |
2542 | // |
2543 | setup_available_ftnode_partition(node, childnum); |
2544 | BP_STATE(node,childnum) = PT_AVAIL; |
2545 | |
2546 | // |
2547 | // read off disk and make available in memory |
2548 | // |
2549 | // get the file offset and block size for the block |
2550 | DISKOFF node_offset, total_node_disk_size; |
2551 | bfe->ft->blocktable.translate_blocknum_to_offset_size(node->blocknum, &node_offset, &total_node_disk_size); |
2552 | |
2553 | uint32_t curr_offset = BP_START(ndd, childnum); |
2554 | uint32_t curr_size = BP_SIZE (ndd, childnum); |
2555 | |
2556 | struct rbuf rb; |
2557 | rbuf_init(&rb, nullptr, 0); |
2558 | |
2559 | uint32_t pad_at_beginning = (node_offset+curr_offset)%512; |
2560 | uint32_t padded_size = roundup_to_multiple(512, pad_at_beginning + curr_size); |
2561 | |
2562 | toku::scoped_malloc_aligned raw_block_buf(padded_size, 512); |
2563 | uint8_t *raw_block = reinterpret_cast<uint8_t *>(raw_block_buf.get()); |
2564 | rbuf_init(&rb, pad_at_beginning+raw_block, curr_size); |
2565 | tokutime_t t0 = toku_time_now(); |
2566 | |
2567 | // read the block |
2568 | assert(0==((unsigned long long)raw_block)%512); // for O_DIRECT |
2569 | assert(0==(padded_size)%512); |
2570 | assert(0==(node_offset+curr_offset-pad_at_beginning)%512); |
2571 | ssize_t rlen = toku_os_pread(fd, raw_block, padded_size, node_offset+curr_offset-pad_at_beginning); |
2572 | assert((DISKOFF)rlen >= pad_at_beginning + curr_size); // we read in at least enough to get what we wanted |
2573 | assert((DISKOFF)rlen <= padded_size); // we didn't read in too much. |
2574 | |
2575 | tokutime_t t1 = toku_time_now(); |
2576 | |
2577 | // read sub block |
2578 | struct sub_block curr_sb; |
2579 | sub_block_init(&curr_sb); |
2580 | r = read_compressed_sub_block(&rb, &curr_sb); |
2581 | if (r != 0) { |
2582 | return r; |
2583 | } |
2584 | invariant(curr_sb.compressed_ptr != NULL); |
2585 | |
2586 | // decompress |
2587 | toku::scoped_malloc uncompressed_buf(curr_sb.uncompressed_size); |
2588 | curr_sb.uncompressed_ptr = uncompressed_buf.get(); |
2589 | toku_decompress((Bytef *) curr_sb.uncompressed_ptr, curr_sb.uncompressed_size, |
2590 | (Bytef *) curr_sb.compressed_ptr, curr_sb.compressed_size); |
2591 | |
2592 | // deserialize |
2593 | tokutime_t t2 = toku_time_now(); |
2594 | |
2595 | r = deserialize_ftnode_partition(&curr_sb, node, childnum, bfe->ft->cmp); |
2596 | |
2597 | tokutime_t t3 = toku_time_now(); |
2598 | |
2599 | // capture stats |
2600 | tokutime_t io_time = t1 - t0; |
2601 | tokutime_t decompress_time = t2 - t1; |
2602 | tokutime_t deserialize_time = t3 - t2; |
2603 | bfe->deserialize_time += deserialize_time; |
2604 | bfe->decompress_time += decompress_time; |
2605 | toku_ft_status_update_deserialize_times(node, deserialize_time, decompress_time); |
2606 | |
2607 | bfe->bytes_read = rlen; |
2608 | bfe->io_time = io_time; |
2609 | |
2610 | return r; |
2611 | } |
2612 | |
2613 | // Take a ftnode partition that is in the compressed state, and make it avail |
2614 | int (FTNODE node, |
2615 | int childnum, |
2616 | ftnode_fetch_extra *bfe) { |
2617 | |
2618 | int r = 0; |
2619 | assert(BP_STATE(node, childnum) == PT_COMPRESSED); |
2620 | SUB_BLOCK curr_sb = BSB(node, childnum); |
2621 | |
2622 | toku::scoped_malloc uncompressed_buf(curr_sb->uncompressed_size); |
2623 | assert(curr_sb->uncompressed_ptr == NULL); |
2624 | curr_sb->uncompressed_ptr = uncompressed_buf.get(); |
2625 | |
2626 | setup_available_ftnode_partition(node, childnum); |
2627 | BP_STATE(node,childnum) = PT_AVAIL; |
2628 | |
2629 | // decompress the sub_block |
2630 | tokutime_t t0 = toku_time_now(); |
2631 | |
2632 | toku_decompress((Bytef *)curr_sb->uncompressed_ptr, |
2633 | curr_sb->uncompressed_size, |
2634 | (Bytef *)curr_sb->compressed_ptr, |
2635 | curr_sb->compressed_size); |
2636 | |
2637 | tokutime_t t1 = toku_time_now(); |
2638 | |
2639 | r = deserialize_ftnode_partition(curr_sb, node, childnum, bfe->ft->cmp); |
2640 | if (r != 0) { |
2641 | const char* fname = toku_cachefile_fname_in_env(bfe->ft->cf); |
2642 | fprintf(stderr, |
2643 | "%s:%d:toku_deserialize_bp_from_compressed - " |
2644 | "file[%s], blocknum[%ld], " |
2645 | "deserialize_ftnode_partition failed with %d\n" , |
2646 | __FILE__, |
2647 | __LINE__, |
2648 | fname ? fname : "unknown" , |
2649 | node->blocknum.b, |
2650 | r); |
2651 | dump_bad_block(static_cast<unsigned char *>(curr_sb->compressed_ptr), |
2652 | curr_sb->compressed_size); |
2653 | dump_bad_block(static_cast<unsigned char *>(curr_sb->uncompressed_ptr), |
2654 | curr_sb->uncompressed_size); |
2655 | } |
2656 | |
2657 | tokutime_t t2 = toku_time_now(); |
2658 | |
2659 | tokutime_t decompress_time = t1 - t0; |
2660 | tokutime_t deserialize_time = t2 - t1; |
2661 | bfe->deserialize_time += deserialize_time; |
2662 | bfe->decompress_time += decompress_time; |
2663 | toku_ft_status_update_deserialize_times(node, deserialize_time, decompress_time); |
2664 | |
2665 | toku_free(curr_sb->compressed_ptr); |
2666 | toku_free(curr_sb); |
2667 | return r; |
2668 | } |
2669 | |
2670 | static int (int fd, |
2671 | BLOCKNUM blocknum, |
2672 | uint32_t fullhash, |
2673 | FTNODE *ftnode, |
2674 | FTNODE_DISK_DATA *ndd, |
2675 | ftnode_fetch_extra *bfe, |
2676 | STAT64INFO info) { |
2677 | struct rbuf rb = RBUF_INITIALIZER; |
2678 | |
2679 | tokutime_t t0 = toku_time_now(); |
2680 | read_block_from_fd_into_rbuf(fd, blocknum, bfe->ft, &rb); |
2681 | tokutime_t t1 = toku_time_now(); |
2682 | |
2683 | // Decompress and deserialize the ftnode. Time statistics |
2684 | // are taken inside this function. |
2685 | int r = deserialize_ftnode_from_rbuf( |
2686 | ftnode, ndd, blocknum, fullhash, bfe, info, &rb, fd); |
2687 | if (r != 0) { |
2688 | const char* fname = toku_cachefile_fname_in_env(bfe->ft->cf); |
2689 | fprintf( |
2690 | stderr, |
2691 | "%s:%d:deserialize_ftnode_from_fd - " |
2692 | "file[%s], blocknum[%ld], deserialize_ftnode_from_rbuf failed with " |
2693 | "%d\n" , |
2694 | __FILE__, |
2695 | __LINE__, |
2696 | fname ? fname : "unknown" , |
2697 | blocknum.b, |
2698 | r); |
2699 | dump_bad_block(rb.buf, rb.size); |
2700 | } |
2701 | |
2702 | bfe->bytes_read = rb.size; |
2703 | bfe->io_time = t1 - t0; |
2704 | toku_free(rb.buf); |
2705 | return r; |
2706 | } |
2707 | |
2708 | // Effect: Read a node in. If possible, read just the header. |
2709 | // Perform version upgrade if necessary. |
2710 | int (int fd, |
2711 | BLOCKNUM blocknum, |
2712 | uint32_t fullhash, |
2713 | FTNODE *ftnode, |
2714 | FTNODE_DISK_DATA *ndd, |
2715 | ftnode_fetch_extra *bfe) { |
2716 | int r = 0; |
2717 | struct rbuf rb = RBUF_INITIALIZER; |
2718 | |
2719 | // each function below takes the appropriate io/decompression/deserialize |
2720 | // statistics |
2721 | |
2722 | if (!bfe->read_all_partitions) { |
2723 | read_ftnode_header_from_fd_into_rbuf_if_small_enough( |
2724 | fd, blocknum, bfe->ft, &rb, bfe); |
2725 | r = deserialize_ftnode_header_from_rbuf_if_small_enough( |
2726 | ftnode, ndd, blocknum, fullhash, bfe, &rb, fd); |
2727 | } else { |
2728 | // force us to do it the old way |
2729 | r = -1; |
2730 | } |
2731 | if (r != 0) { |
2732 | // Something went wrong, go back to doing it the old way. |
2733 | r = deserialize_ftnode_from_fd( |
2734 | fd, blocknum, fullhash, ftnode, ndd, bfe, nullptr); |
2735 | } |
2736 | |
2737 | toku_free(rb.buf); |
2738 | return r; |
2739 | } |
2740 | |
// Intentionally a no-op; kept so callers of this entry point still link.
void
toku_verify_or_set_counts(FTNODE UU(node)) {
}
2744 | |
2745 | int |
2746 | toku_db_badformat(void) { |
2747 | return DB_BADFORMAT; |
2748 | } |
2749 | |
2750 | static size_t |
2751 | serialize_rollback_log_size(ROLLBACK_LOG_NODE log) { |
2752 | size_t size = node_header_overhead //8 "tokuroll", 4 version, 4 version_original, 4 build_id |
2753 | +16 //TXNID_PAIR |
2754 | +8 //sequence |
2755 | +8 //blocknum |
2756 | +8 //previous (blocknum) |
2757 | +8 //resident_bytecount |
2758 | +8 //memarena size |
2759 | +log->rollentry_resident_bytecount; |
2760 | return size; |
2761 | } |
2762 | |
// Serialize `log` into `buf` (of exactly `calculated_size` bytes, as computed
// by serialize_rollback_log_size). The write order below defines the on-disk
// format of a rollback log node; do not reorder. n_sub_blocks/sub_block are
// unused here (sub block bookkeeping happens at compression time).
static void
serialize_rollback_log_node_to_buf(ROLLBACK_LOG_NODE log, char *buf, size_t calculated_size, int UU(n_sub_blocks), struct sub_block UU(sub_block[])) {
    struct wbuf wb;
    wbuf_init(&wb, buf, calculated_size);
    { //Serialize rollback log to local wbuf
        wbuf_nocrc_literal_bytes(&wb, "tokuroll" , 8);
        lazy_assert(log->layout_version == FT_LAYOUT_VERSION);
        wbuf_nocrc_int(&wb, log->layout_version);
        wbuf_nocrc_int(&wb, log->layout_version_original);
        wbuf_nocrc_uint(&wb, BUILD_ID);
        wbuf_nocrc_TXNID_PAIR(&wb, log->txnid);
        wbuf_nocrc_ulonglong(&wb, log->sequence);
        wbuf_nocrc_BLOCKNUM(&wb, log->blocknum);
        wbuf_nocrc_BLOCKNUM(&wb, log->previous);
        wbuf_nocrc_ulonglong(&wb, log->rollentry_resident_bytecount);
        //Write down memarena size needed to restore
        wbuf_nocrc_ulonglong(&wb, log->rollentry_arena.total_size_in_use());

        {
            //Store rollback logs
            // Entries are written newest-to-oldest, following the prev links.
            struct roll_entry *item;
            size_t done_before = wb.ndone;
            for (item = log->newest_logentry; item; item = item->prev) {
                toku_logger_rollback_wbuf_nocrc_write(&wb, item);
            }
            // The entries must account for exactly the byte count we recorded.
            lazy_assert(done_before + log->rollentry_resident_bytecount == wb.ndone);
        }
    }
    // The buffer must be filled completely -- size was computed up front.
    lazy_assert(wb.ndone == wb.size);
    lazy_assert(calculated_size==wb.ndone);
}
2794 | |
2795 | static void |
2796 | serialize_uncompressed_block_to_memory(char * uncompressed_buf, |
2797 | int n_sub_blocks, |
2798 | struct sub_block sub_block[/*n_sub_blocks*/], |
2799 | enum toku_compression_method method, |
2800 | /*out*/ size_t *n_bytes_to_write, |
2801 | /*out*/ char **bytes_to_write) |
2802 | // Guarantees that the malloc'd BYTES_TO_WRITE is 512-byte aligned (so that O_DIRECT will work) |
2803 | { |
2804 | // allocate space for the compressed uncompressed_buf |
2805 | size_t compressed_len = get_sum_compressed_size_bound(n_sub_blocks, sub_block, method); |
2806 | size_t = sub_block_header_size(n_sub_blocks); |
2807 | size_t = node_header_overhead + sub_block_header_len + sizeof (uint32_t); // node + sub_block + checksum |
2808 | char *XMALLOC_N_ALIGNED(512, roundup_to_multiple(512, header_len + compressed_len), compressed_buf); |
2809 | |
2810 | // copy the header |
2811 | memcpy(compressed_buf, uncompressed_buf, node_header_overhead); |
2812 | if (0) printf("First 4 bytes before compressing data are %02x%02x%02x%02x\n" , |
2813 | uncompressed_buf[node_header_overhead], uncompressed_buf[node_header_overhead+1], |
2814 | uncompressed_buf[node_header_overhead+2], uncompressed_buf[node_header_overhead+3]); |
2815 | |
2816 | // compress all of the sub blocks |
2817 | char *uncompressed_ptr = uncompressed_buf + node_header_overhead; |
2818 | char *compressed_ptr = compressed_buf + header_len; |
2819 | compressed_len = compress_all_sub_blocks(n_sub_blocks, sub_block, uncompressed_ptr, compressed_ptr, num_cores, ft_pool, method); |
2820 | |
2821 | //if (0) printf("Block %" PRId64 " Size before compressing %u, after compression %" PRIu64 "\n", blocknum.b, calculated_size-node_header_overhead, (uint64_t) compressed_len); |
2822 | |
2823 | // serialize the sub block header |
2824 | uint32_t *ptr = (uint32_t *)(compressed_buf + node_header_overhead); |
2825 | *ptr++ = toku_htod32(n_sub_blocks); |
2826 | for (int i=0; i<n_sub_blocks; i++) { |
2827 | ptr[0] = toku_htod32(sub_block[i].compressed_size); |
2828 | ptr[1] = toku_htod32(sub_block[i].uncompressed_size); |
2829 | ptr[2] = toku_htod32(sub_block[i].xsum); |
2830 | ptr += 3; |
2831 | } |
2832 | |
2833 | // compute the header checksum and serialize it |
2834 | uint32_t = (char *)ptr - (char *)compressed_buf; |
2835 | uint32_t xsum = toku_x1764_memory(compressed_buf, header_length); |
2836 | *ptr = toku_htod32(xsum); |
2837 | |
2838 | uint32_t padded_len = roundup_to_multiple(512, header_len + compressed_len); |
2839 | // Zero out padding. |
2840 | for (uint32_t i = header_len+compressed_len; i < padded_len; i++) { |
2841 | compressed_buf[i] = 0; |
2842 | } |
2843 | *n_bytes_to_write = padded_len; |
2844 | *bytes_to_write = compressed_buf; |
2845 | } |
2846 | |
2847 | void |
2848 | toku_serialize_rollback_log_to_memory_uncompressed(ROLLBACK_LOG_NODE log, SERIALIZED_ROLLBACK_LOG_NODE serialized) { |
2849 | // get the size of the serialized node |
2850 | size_t calculated_size = serialize_rollback_log_size(log); |
2851 | |
2852 | serialized->len = calculated_size; |
2853 | serialized->n_sub_blocks = 0; |
2854 | // choose sub block parameters |
2855 | int sub_block_size = 0; |
2856 | size_t data_size = calculated_size - node_header_overhead; |
2857 | choose_sub_block_size(data_size, max_sub_blocks, &sub_block_size, &serialized->n_sub_blocks); |
2858 | lazy_assert(0 < serialized->n_sub_blocks && serialized->n_sub_blocks <= max_sub_blocks); |
2859 | lazy_assert(sub_block_size > 0); |
2860 | |
2861 | // set the initial sub block size for all of the sub blocks |
2862 | for (int i = 0; i < serialized->n_sub_blocks; i++) |
2863 | sub_block_init(&serialized->sub_block[i]); |
2864 | set_all_sub_block_sizes(data_size, sub_block_size, serialized->n_sub_blocks, serialized->sub_block); |
2865 | |
2866 | // allocate space for the serialized node |
2867 | XMALLOC_N(calculated_size, serialized->data); |
2868 | // serialize the node into buf |
2869 | serialize_rollback_log_node_to_buf(log, serialized->data, calculated_size, serialized->n_sub_blocks, serialized->sub_block); |
2870 | serialized->blocknum = log->blocknum; |
2871 | } |
2872 | |
// Write a rollback log node to `fd`.
// Exactly one of `log` / `serialized_log` is non-NULL: if `is_serialized`,
// the caller supplies an already-serialized image; otherwise we serialize
// `log` into a local and destroy it before returning.
// The block is compressed, a new on-disk location is allocated for it
// (which dirties `ft`), and the bytes are written with a full pwrite.
int toku_serialize_rollback_log_to(int fd,
                                   ROLLBACK_LOG_NODE log,
                                   SERIALIZED_ROLLBACK_LOG_NODE serialized_log,
                                   bool is_serialized,
                                   FT ft,
                                   bool for_checkpoint) {
    size_t n_to_write;
    char *compressed_buf;
    struct serialized_rollback_log_node serialized_local;

    if (is_serialized) {
        invariant_null(log);
    } else {
        invariant_null(serialized_log);
        serialized_log = &serialized_local;
        toku_serialize_rollback_log_to_memory_uncompressed(log, serialized_log);
    }

    BLOCKNUM blocknum = serialized_log->blocknum;
    invariant(blocknum.b >= 0);

    // Compress and malloc buffer to write
    serialize_uncompressed_block_to_memory(serialized_log->data,
                                           serialized_log->n_sub_blocks,
                                           serialized_log->sub_block,
                                           ft->h->compression_method,
                                           &n_to_write,
                                           &compressed_buf);

    // Dirties the ft
    DISKOFF offset;
    ft->blocktable.realloc_on_disk(
        blocknum, n_to_write, &offset, ft, fd, for_checkpoint);

    toku_os_full_pwrite(fd, compressed_buf, n_to_write, offset);
    toku_free(compressed_buf);
    if (!is_serialized) {
        toku_static_serialized_rollback_log_destroy(&serialized_local);
        log->dirty = 0;  // See #1957.  Must set the node to be clean after
                         // serializing it so that it doesn't get written again
                         // on the next checkpoint or eviction.
    }
    return 0;
}
2917 | |
// Deserialize a rollback log node from `rb` into a freshly malloc'd node,
// returned through *log_p. Consumes rb->buf (freed on success).
// Returns 0 on success, an errno on allocation failure, or DB_BADFORMAT on
// a malformed buffer. The `if (0) { diedN: ... }` statements are the
// file's goto-based cleanup idiom: error paths jump backwards into them to
// unwind in reverse order of acquisition.
static int
deserialize_rollback_log_from_rbuf (BLOCKNUM blocknum, ROLLBACK_LOG_NODE *log_p, struct rbuf *rb) {
    ROLLBACK_LOG_NODE MALLOC(result);
    int r;
    if (result==NULL) {
        r=get_error_errno();
        if (0) { died0: toku_free(result); }
        return r;
    }

    // magic must identify a rollback log node
    const void *magic;
    rbuf_literal_bytes(rb, &magic, 8);
    lazy_assert(!memcmp(magic, "tokuroll" , 8));

    result->layout_version    = rbuf_int(rb);
    // only versions 25..27 and the current layout version are readable here
    lazy_assert((FT_LAYOUT_VERSION_25 <= result->layout_version && result->layout_version <= FT_LAYOUT_VERSION_27) ||
                (result->layout_version == FT_LAYOUT_VERSION));
    result->layout_version_original = rbuf_int(rb);
    result->layout_version_read_from_disk = result->layout_version;
    result->build_id = rbuf_int(rb);
    result->dirty = false;
    //TODO: Maybe add descriptor (or just descriptor version) here eventually?
    //TODO: This is hard.. everything is shared in a single dictionary.
    rbuf_TXNID_PAIR(rb, &result->txnid);
    result->sequence = rbuf_ulonglong(rb);
    result->blocknum = rbuf_blocknum(rb);
    // the stored blocknum must match the block we were asked to read
    if (result->blocknum.b != blocknum.b) {
        r = toku_db_badformat();
        goto died0;
    }
    result->previous = rbuf_blocknum(rb);
    result->rollentry_resident_bytecount = rbuf_ulonglong(rb);

    size_t arena_initial_size = rbuf_ulonglong(rb);
    result->rollentry_arena.create(arena_initial_size);
    if (0) { died1: result->rollentry_arena.destroy(); goto died0; }

    //Load rollback entries
    lazy_assert(rb->size > 4);
    //Start with empty list
    result->oldest_logentry = result->newest_logentry = NULL;
    while (rb->ndone < rb->size) {
        struct roll_entry *item;
        uint32_t rollback_fsize = rbuf_int(rb); //Already read 4.  Rest is 4 smaller
        const void *item_vec;
        rbuf_literal_bytes(rb, &item_vec, rollback_fsize-4);
        unsigned char* item_buf = (unsigned char*)item_vec;
        r = toku_parse_rollback(item_buf, rollback_fsize-4, &item, &result->rollentry_arena);
        if (r!=0) {
            r = toku_db_badformat();
            goto died1;
        }
        //Add to head of list
        // (entries were serialized newest-first, so appending at the "oldest"
        // end rebuilds the original order)
        if (result->oldest_logentry) {
            result->oldest_logentry->prev = item;
            result->oldest_logentry      = item;
            item->prev = NULL;
        }
        else {
            result->oldest_logentry = result->newest_logentry = item;
            item->prev = NULL;
        }
    }

    toku_free(rb->buf);
    rb->buf = NULL;
    *log_p = result;
    return 0;
}
2987 | |
2988 | static int |
2989 | deserialize_rollback_log_from_rbuf_versioned (uint32_t version, BLOCKNUM blocknum, |
2990 | ROLLBACK_LOG_NODE *log, |
2991 | struct rbuf *rb) { |
2992 | int r = 0; |
2993 | ROLLBACK_LOG_NODE rollback_log_node = NULL; |
2994 | invariant((FT_LAYOUT_VERSION_25 <= version && version <= FT_LAYOUT_VERSION_27) || version == FT_LAYOUT_VERSION); |
2995 | r = deserialize_rollback_log_from_rbuf(blocknum, &rollback_log_node, rb); |
2996 | if (r==0) { |
2997 | *log = rollback_log_node; |
2998 | } |
2999 | return r; |
3000 | } |
3001 | |
3002 | int |
3003 | decompress_from_raw_block_into_rbuf(uint8_t *raw_block, size_t raw_block_size, struct rbuf *rb, BLOCKNUM blocknum) { |
3004 | int r = 0; |
3005 | // get the number of compressed sub blocks |
3006 | int n_sub_blocks; |
3007 | n_sub_blocks = toku_dtoh32(*(uint32_t*)(&raw_block[node_header_overhead])); |
3008 | |
3009 | // verify the number of sub blocks |
3010 | invariant(0 <= n_sub_blocks); |
3011 | invariant(n_sub_blocks <= max_sub_blocks); |
3012 | |
3013 | { // verify the header checksum |
3014 | uint32_t = node_header_overhead + sub_block_header_size(n_sub_blocks); |
3015 | invariant(header_length <= raw_block_size); |
3016 | uint32_t xsum = toku_x1764_memory(raw_block, header_length); |
3017 | uint32_t stored_xsum = toku_dtoh32(*(uint32_t *)(raw_block + header_length)); |
3018 | if (xsum != stored_xsum) { |
3019 | r = TOKUDB_BAD_CHECKSUM; |
3020 | } |
3021 | } |
3022 | |
3023 | // deserialize the sub block header |
3024 | struct sub_block sub_block[n_sub_blocks]; |
3025 | uint32_t * = (uint32_t *) &raw_block[node_header_overhead+4]; |
3026 | for (int i = 0; i < n_sub_blocks; i++) { |
3027 | sub_block_init(&sub_block[i]); |
3028 | sub_block[i].compressed_size = toku_dtoh32(sub_block_header[0]); |
3029 | sub_block[i].uncompressed_size = toku_dtoh32(sub_block_header[1]); |
3030 | sub_block[i].xsum = toku_dtoh32(sub_block_header[2]); |
3031 | sub_block_header += 3; |
3032 | } |
3033 | |
3034 | // This predicate needs to be here and instead of where it is set |
3035 | // for the compiler. |
3036 | if (r == TOKUDB_BAD_CHECKSUM) { |
3037 | goto exit; |
3038 | } |
3039 | |
3040 | // verify sub block sizes |
3041 | for (int i = 0; i < n_sub_blocks; i++) { |
3042 | uint32_t compressed_size = sub_block[i].compressed_size; |
3043 | if (compressed_size<=0 || compressed_size>(1<<30)) { |
3044 | r = toku_db_badformat(); |
3045 | goto exit; |
3046 | } |
3047 | |
3048 | uint32_t uncompressed_size = sub_block[i].uncompressed_size; |
3049 | if (0) printf("Block %" PRId64 " Compressed size = %u, uncompressed size=%u\n" , blocknum.b, compressed_size, uncompressed_size); |
3050 | if (uncompressed_size<=0 || uncompressed_size>(1<<30)) { |
3051 | r = toku_db_badformat(); |
3052 | goto exit; |
3053 | } |
3054 | } |
3055 | |
3056 | // sum up the uncompressed size of the sub blocks |
3057 | size_t uncompressed_size; |
3058 | uncompressed_size = get_sum_uncompressed_size(n_sub_blocks, sub_block); |
3059 | |
3060 | // allocate the uncompressed buffer |
3061 | size_t size; |
3062 | size = node_header_overhead + uncompressed_size; |
3063 | unsigned char *buf; |
3064 | XMALLOC_N(size, buf); |
3065 | rbuf_init(rb, buf, size); |
3066 | |
3067 | // copy the uncompressed node header to the uncompressed buffer |
3068 | memcpy(rb->buf, raw_block, node_header_overhead); |
3069 | |
3070 | // point at the start of the compressed data (past the node header, the sub block header, and the header checksum) |
3071 | unsigned char *compressed_data; |
3072 | compressed_data = raw_block + node_header_overhead + sub_block_header_size(n_sub_blocks) + sizeof (uint32_t); |
3073 | |
3074 | // point at the start of the uncompressed data |
3075 | unsigned char *uncompressed_data; |
3076 | uncompressed_data = rb->buf + node_header_overhead; |
3077 | |
3078 | // decompress all the compressed sub blocks into the uncompressed buffer |
3079 | r = decompress_all_sub_blocks(n_sub_blocks, sub_block, compressed_data, uncompressed_data, num_cores, ft_pool); |
3080 | if (r != 0) { |
3081 | fprintf(stderr, "%s:%d block %" PRId64 " failed %d at %p size %zu\n" , __FUNCTION__, __LINE__, blocknum.b, r, raw_block, raw_block_size); |
3082 | dump_bad_block(raw_block, raw_block_size); |
3083 | goto exit; |
3084 | } |
3085 | |
3086 | rb->ndone=0; |
3087 | exit: |
3088 | return r; |
3089 | } |
3090 | |
3091 | static int decompress_from_raw_block_into_rbuf_versioned(uint32_t version, uint8_t *raw_block, size_t raw_block_size, struct rbuf *rb, BLOCKNUM blocknum) { |
3092 | // This function exists solely to accommodate future changes in compression. |
3093 | int r = 0; |
3094 | if ((version == FT_LAYOUT_VERSION_13 || version == FT_LAYOUT_VERSION_14) || |
3095 | (FT_LAYOUT_VERSION_25 <= version && version <= FT_LAYOUT_VERSION_27) || |
3096 | version == FT_LAYOUT_VERSION) { |
3097 | r = decompress_from_raw_block_into_rbuf(raw_block, raw_block_size, rb, blocknum); |
3098 | } else { |
3099 | abort(); |
3100 | } |
3101 | return r; |
3102 | } |
3103 | |
// Read the block at [offset, offset+size) from `fd`, validate its magic and
// layout version, and decompress it into `rb`. On success, the detected
// layout version is returned through *layout_version_p. On failure, rb->buf
// is freed and NULLed. A checksum failure aborts the process.
static int read_and_decompress_block_from_fd_into_rbuf(
    int fd,
    BLOCKNUM blocknum,
    DISKOFF offset,
    DISKOFF size,
    FT ft,
    struct rbuf *rb,
    /* out */ int *layout_version_p) {
    int r = 0;
    if (0) printf("Deserializing Block %" PRId64 "\n" , blocknum.b);

    // read padded to 512-byte multiples so the aligned buffer works with O_DIRECT
    DISKOFF size_aligned = roundup_to_multiple(512, size);
    uint8_t *XMALLOC_N_ALIGNED(512, size_aligned, raw_block);
    {
        // read the (partially compressed) block
        ssize_t rlen = toku_os_pread(fd, raw_block, size_aligned, offset);
        lazy_assert((DISKOFF)rlen >= size);
        lazy_assert((DISKOFF)rlen <= size_aligned);
    }
    // get the layout_version
    int layout_version;
    {
        // accept any of the three node magics; this path serves both ftnodes
        // and rollback log nodes
        uint8_t *magic = raw_block + uncompressed_magic_offset;
        if (memcmp(magic, "tokuleaf" , 8)!=0 &&
            memcmp(magic, "tokunode" , 8)!=0 &&
            memcmp(magic, "tokuroll" , 8)!=0) {
            r = toku_db_badformat();
            goto cleanup;
        }
        uint8_t *version = raw_block + uncompressed_version_offset;
        layout_version = toku_dtoh32(*(uint32_t*)version);
        if (layout_version < FT_LAYOUT_MIN_SUPPORTED_VERSION || layout_version > FT_LAYOUT_VERSION) {
            r = toku_db_badformat();
            goto cleanup;
        }
    }

    r = decompress_from_raw_block_into_rbuf_versioned(layout_version, raw_block, size, rb, blocknum);
    if (r != 0) {
        // We either failed the checksum, or there is a bad format in
        // the buffer.
        if (r == TOKUDB_BAD_CHECKSUM) {
            fprintf(stderr,
                    "Checksum failure while reading raw block in file %s.\n" ,
                    toku_cachefile_fname_in_env(ft->cf));
            abort();
        } else {
            r = toku_db_badformat();
            goto cleanup;
        }
    }

    *layout_version_p = layout_version;
cleanup:
    if (r!=0) {
        if (rb->buf) toku_free(rb->buf);
        rb->buf = NULL;
    }
    if (raw_block) {
        toku_free(raw_block);
    }
    return r;
}
3167 | |
// Read rollback log node from file into struct.
// Perform version upgrade if necessary.
// If the blocknum has zero size on disk it is unused, and an empty rollback
// log node is created instead of reading anything.
int toku_deserialize_rollback_log_from(int fd, BLOCKNUM blocknum, ROLLBACK_LOG_NODE *logp, FT ft) {
    int layout_version = 0;
    int r;

    struct rbuf rb;
    rbuf_init(&rb, nullptr, 0);

    // get the file offset and block size for the block
    DISKOFF offset, size;
    ft->blocktable.translate_blocknum_to_offset_size(blocknum, &offset, &size);

    // if the size is 0, then the blocknum is unused
    if (size == 0) {
        // blocknum is unused, just create an empty one and get out
        ROLLBACK_LOG_NODE XMALLOC(log);
        rollback_empty_log_init(log);
        log->blocknum.b = blocknum.b;
        r = 0;
        *logp = log;
        goto cleanup;
    }

    r = read_and_decompress_block_from_fd_into_rbuf(fd, blocknum, offset, size, ft, &rb, &layout_version);
    if (r!=0) goto cleanup;

    {
        // only rollback log nodes are acceptable here
        uint8_t *magic = rb.buf + uncompressed_magic_offset;
        if (memcmp(magic, "tokuroll" , 8)!=0) {
            r = toku_db_badformat();
            goto cleanup;
        }
    }

    r = deserialize_rollback_log_from_rbuf_versioned(layout_version, blocknum, logp, &rb);

cleanup:
    // rb.buf is NULLed by the deserializer on success; free whatever remains
    if (rb.buf) {
        toku_free(rb.buf);
    }
    return r;
}
3211 | |
3212 | int |
3213 | toku_upgrade_subtree_estimates_to_stat64info(int fd, FT ft) |
3214 | { |
3215 | int r = 0; |
3216 | // 15 was the last version with subtree estimates |
3217 | invariant(ft->layout_version_read_from_disk <= FT_LAYOUT_VERSION_15); |
3218 | |
3219 | FTNODE unused_node = NULL; |
3220 | FTNODE_DISK_DATA unused_ndd = NULL; |
3221 | ftnode_fetch_extra bfe; |
3222 | bfe.create_for_min_read(ft); |
3223 | r = deserialize_ftnode_from_fd(fd, ft->h->root_blocknum, 0, &unused_node, &unused_ndd, |
3224 | &bfe, &ft->h->on_disk_stats); |
3225 | ft->in_memory_stats = ft->h->on_disk_stats; |
3226 | |
3227 | if (unused_node) { |
3228 | toku_ftnode_free(&unused_node); |
3229 | } |
3230 | if (unused_ndd) { |
3231 | toku_free(unused_ndd); |
3232 | } |
3233 | return r; |
3234 | } |
3235 | |
3236 | int |
3237 | (int fd, FT ft) |
3238 | { |
3239 | int r; |
3240 | // 21 was the first version with max_msn_in_ft in the header |
3241 | invariant(ft->layout_version_read_from_disk <= FT_LAYOUT_VERSION_20); |
3242 | |
3243 | FTNODE node; |
3244 | FTNODE_DISK_DATA ndd; |
3245 | ftnode_fetch_extra bfe; |
3246 | bfe.create_for_min_read(ft); |
3247 | r = deserialize_ftnode_from_fd(fd, ft->h->root_blocknum, 0, &node, &ndd, &bfe, nullptr); |
3248 | if (r != 0) { |
3249 | goto exit; |
3250 | } |
3251 | |
3252 | ft->h->max_msn_in_ft = node->max_msn_applied_to_node_on_disk; |
3253 | toku_ftnode_free(&node); |
3254 | toku_free(ndd); |
3255 | exit: |
3256 | return r; |
3257 | } |
3258 | |
3259 | #undef UPGRADE_STATUS_VALUE |
3260 | |