1/* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2// vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
3#ident "$Id$"
4/*======
5This file is part of PerconaFT.
6
7
8Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved.
9
10 PerconaFT is free software: you can redistribute it and/or modify
11 it under the terms of the GNU General Public License, version 2,
12 as published by the Free Software Foundation.
13
14 PerconaFT is distributed in the hope that it will be useful,
15 but WITHOUT ANY WARRANTY; without even the implied warranty of
16 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 GNU General Public License for more details.
18
19 You should have received a copy of the GNU General Public License
20 along with PerconaFT. If not, see <http://www.gnu.org/licenses/>.
21
22----------------------------------------
23
24 PerconaFT is free software: you can redistribute it and/or modify
25 it under the terms of the GNU Affero General Public License, version 3,
26 as published by the Free Software Foundation.
27
28 PerconaFT is distributed in the hope that it will be useful,
29 but WITHOUT ANY WARRANTY; without even the implied warranty of
30 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
31 GNU Affero General Public License for more details.
32
33 You should have received a copy of the GNU Affero General Public License
34 along with PerconaFT. If not, see <http://www.gnu.org/licenses/>.
35======= */
36
37#ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved."
38
39#include <my_global.h>
40#include <toku_portability.h>
41
42#include <arpa/inet.h>
43
44#include <stdio.h>
45#include <memory.h>
46#include <errno.h>
47#include <toku_assert.h>
48#include <string.h>
49#include <fcntl.h>
50
51#include "ft/ft.h"
52#include "ft/ft-internal.h"
53#include "ft/leafentry.h"
54#include "ft/loader/loader-internal.h"
55#include "ft/loader/pqueue.h"
56#include "ft/loader/dbufio.h"
57#include "ft/logger/log-internal.h"
58#include "ft/node.h"
59#include "ft/serialize/block_table.h"
60#include "ft/serialize/ft-serialize.h"
61#include "ft/serialize/ft_node-serialize.h"
62#include "ft/serialize/sub_block.h"
63
64#include "util/x1764.h"
65
// Performance-schema instrumentation keys for the loader's mutexes, threads
// and files. They are dereferenced at their points of use (e.g.
// *loader_bl_mutex_key in ft_loader_lock_init), so they must point at valid
// keys before those call sites run.
// NOTE(review): the code that assigns these pointers is not in this chunk — confirm where it happens.
toku_instr_key *loader_bl_mutex_key;       // FTLOADER mutex (ft_loader_lock_init)
toku_instr_key *loader_fi_lock_mutex_key;  // file_infos lock (ft_loader_init_file_infos)
toku_instr_key *loader_out_mutex_key;

toku_instr_key *extractor_thread_key;      // extractor thread (toku_ft_loader_open)
toku_instr_key *fractal_thread_key;

toku_instr_key *tokudb_file_tmp_key;       // newly created temp files (ft_loader_open_temp_file)
toku_instr_key *tokudb_file_load_key;      // reopened temp files (ft_loader_fi_reopen)
75
// Scaling knobs for the loader's memory budgets.
// 1024 is the right size_factor for production.
// Different values for these sizes may be used for testing
// (see toku_ft_loader_set_size_factor); a size_factor of 1 makes init_rowset
// and memory_per_rowset_during_extract fall back to small fixed budgets.
static uint32_t size_factor = 1024;
// Node size used by memory_avail_during_merge when reserving per-worker
// node buffers; shrunk for tests by toku_ft_loader_set_size_factor.
static uint32_t default_loader_nodesize = FT_DEFAULT_NODE_SIZE;
// NOTE(review): not referenced in this chunk of the file.
static uint32_t default_loader_basementnodesize = FT_DEFAULT_BASEMENT_NODE_SIZE;
81
82void
83toku_ft_loader_set_size_factor(uint32_t factor) {
84// For test purposes only
85 size_factor = factor;
86 default_loader_nodesize = (size_factor==1) ? (1<<15) : FT_DEFAULT_NODE_SIZE;
87}
88
89uint64_t
90toku_ft_loader_get_rowset_budget_for_testing (void)
91// For test purposes only. In production, the rowset size is determined by negotiation with the cachetable for some memory. (See #2613).
92{
93 return 16ULL*size_factor*1024ULL;
94}
95
96void ft_loader_lock_init(FTLOADER bl) {
97 invariant(!bl->mutex_init);
98 toku_mutex_init(*loader_bl_mutex_key, &bl->mutex, nullptr);
99 bl->mutex_init = true;
100}
101
102void ft_loader_lock_destroy(FTLOADER bl) {
103 if (bl->mutex_init) {
104 toku_mutex_destroy(&bl->mutex);
105 bl->mutex_init = false;
106 }
107}
108
109static void ft_loader_lock(FTLOADER bl) {
110 invariant(bl->mutex_init);
111 toku_mutex_lock(&bl->mutex);
112}
113
114static void ft_loader_unlock(FTLOADER bl) {
115 invariant(bl->mutex_init);
116 toku_mutex_unlock(&bl->mutex);
117}
118
119static int add_big_buffer(struct file_info *file) {
120 int result = 0;
121 bool newbuffer = false;
122 if (file->buffer == NULL) {
123 file->buffer = toku_malloc(file->buffer_size);
124 if (file->buffer == NULL)
125 result = get_error_errno();
126 else
127 newbuffer = true;
128 }
129 if (result == 0) {
130 int r = setvbuf(file->file->file,
131 static_cast<char *>(file->buffer),
132 _IOFBF,
133 file->buffer_size);
134 if (r != 0) {
135 result = get_error_errno();
136 if (newbuffer) {
137 toku_free(file->buffer);
138 file->buffer = NULL;
139 }
140 }
141 }
142 return result;
143}
144
145static void cleanup_big_buffer(struct file_info *file) {
146 if (file->buffer) {
147 toku_free(file->buffer);
148 file->buffer = NULL;
149 }
150}
151
152int ft_loader_init_file_infos(struct file_infos *fi) {
153 int result = 0;
154 toku_mutex_init(*loader_fi_lock_mutex_key, &fi->lock, nullptr);
155 fi->n_files = 0;
156 fi->n_files_limit = 1;
157 fi->n_files_open = 0;
158 fi->n_files_extant = 0;
159 MALLOC_N(fi->n_files_limit, fi->file_infos);
160 if (fi->file_infos == NULL)
161 result = get_error_errno();
162 return result;
163}
164
165void ft_loader_fi_destroy (struct file_infos *fi, bool is_error)
166// Effect: Free the resources in the fi.
167// If is_error then we close and unlink all the temp files.
168// If !is_error then requires that all the temp files have been closed and destroyed
169// No error codes are returned. If anything goes wrong with closing and unlinking then it's only in an is_error case, so we don't care.
170{
171 if (fi->file_infos == NULL) {
172 // ft_loader_init_file_infos guarantees this isn't null, so if it is, we know it hasn't been inited yet and we don't need to destroy it.
173 return;
174 }
175 toku_mutex_destroy(&fi->lock);
176 if (!is_error) {
177 invariant(fi->n_files_open==0);
178 invariant(fi->n_files_extant==0);
179 }
180 for (int i=0; i<fi->n_files; i++) {
181 if (fi->file_infos[i].is_open) {
182 invariant(is_error);
183 toku_os_fclose(fi->file_infos[i].file); // don't check for errors, since we are in an error case.
184 }
185 if (fi->file_infos[i].is_extant) {
186 invariant(is_error);
187 unlink(fi->file_infos[i].fname);
188 toku_free(fi->file_infos[i].fname);
189 }
190 cleanup_big_buffer(&fi->file_infos[i]);
191 }
192 toku_free(fi->file_infos);
193 fi->n_files=0;
194 fi->n_files_limit=0;
195 fi->file_infos = NULL;
196}
197
198static int open_file_add(struct file_infos *fi,
199 TOKU_FILE *file,
200 char *fname,
201 /* out */ FIDX *idx) {
202 int result = 0;
203 toku_mutex_lock(&fi->lock);
204 if (fi->n_files >= fi->n_files_limit) {
205 fi->n_files_limit *=2;
206 XREALLOC_N(fi->n_files_limit, fi->file_infos);
207 }
208 invariant(fi->n_files < fi->n_files_limit);
209 fi->file_infos[fi->n_files].is_open = true;
210 fi->file_infos[fi->n_files].is_extant = true;
211 fi->file_infos[fi->n_files].fname = fname;
212 fi->file_infos[fi->n_files].file = file;
213 fi->file_infos[fi->n_files].n_rows = 0;
214 fi->file_infos[fi->n_files].buffer_size = FILE_BUFFER_SIZE;
215 fi->file_infos[fi->n_files].buffer = NULL;
216 result = add_big_buffer(&fi->file_infos[fi->n_files]);
217 if (result == 0) {
218 idx->idx = fi->n_files;
219 fi->n_files++;
220 fi->n_files_extant++;
221 fi->n_files_open++;
222 }
223 toku_mutex_unlock(&fi->lock);
224 return result;
225}
226
227int ft_loader_fi_reopen (struct file_infos *fi, FIDX idx, const char *mode) {
228 int result = 0;
229 toku_mutex_lock(&fi->lock);
230 int i = idx.idx;
231 invariant(i >= 0 && i < fi->n_files);
232 invariant(!fi->file_infos[i].is_open);
233 invariant(fi->file_infos[i].is_extant);
234 fi->file_infos[i].file =
235 toku_os_fopen(fi->file_infos[i].fname, mode, *tokudb_file_load_key);
236 if (fi->file_infos[i].file == NULL) {
237 result = get_error_errno();
238 } else {
239 fi->file_infos[i].is_open = true;
240 // No longer need the big buffer for reopened files. Don't allocate the space, we need it elsewhere.
241 //add_big_buffer(&fi->file_infos[i]);
242 fi->n_files_open++;
243 }
244 toku_mutex_unlock(&fi->lock);
245 return result;
246}
247
248int ft_loader_fi_close (struct file_infos *fi, FIDX idx, bool require_open)
249{
250 int result = 0;
251 toku_mutex_lock(&fi->lock);
252 invariant(idx.idx >=0 && idx.idx < fi->n_files);
253 if (fi->file_infos[idx.idx].is_open) {
254 invariant(fi->n_files_open>0); // loader-cleanup-test failure
255 fi->n_files_open--;
256 fi->file_infos[idx.idx].is_open = false;
257 int r = toku_os_fclose(fi->file_infos[idx.idx].file);
258 if (r)
259 result = get_error_errno();
260 cleanup_big_buffer(&fi->file_infos[idx.idx]);
261 } else if (require_open)
262 result = EINVAL;
263 toku_mutex_unlock(&fi->lock);
264 return result;
265}
266
267int ft_loader_fi_unlink (struct file_infos *fi, FIDX idx) {
268 int result = 0;
269 toku_mutex_lock(&fi->lock);
270 int id = idx.idx;
271 invariant(id >=0 && id < fi->n_files);
272 if (fi->file_infos[id].is_extant) { // must still exist
273 invariant(fi->n_files_extant>0);
274 fi->n_files_extant--;
275 invariant(!fi->file_infos[id].is_open); // must be closed before we unlink
276 fi->file_infos[id].is_extant = false;
277 int r = unlink(fi->file_infos[id].fname);
278 if (r != 0)
279 result = get_error_errno();
280 toku_free(fi->file_infos[id].fname);
281 fi->file_infos[id].fname = NULL;
282 } else
283 result = EINVAL;
284 toku_mutex_unlock(&fi->lock);
285 return result;
286}
287
288int
289ft_loader_fi_close_all(struct file_infos *fi) {
290 int rval = 0;
291 for (int i = 0; i < fi->n_files; i++) {
292 int r;
293 FIDX idx = { i };
294 r = ft_loader_fi_close(fi, idx, false); // ignore files that are already closed
295 if (rval == 0 && r)
296 rval = r; // capture first error
297 }
298 return rval;
299}
300
301int ft_loader_open_temp_file (FTLOADER bl, FIDX *file_idx)
302/* Effect: Open a temporary file in read-write mode. Save enough information to close and delete the file later.
303 * Return value: 0 on success, an error number otherwise.
304 * On error, *file_idx and *fnamep will be unmodified.
305 * The open file will be saved in bl->file_infos so that even if errors happen we can free them all.
306 */
307{
308 int result = 0;
309 if (result) // debug hack
310 return result;
311 TOKU_FILE *f = NULL;
312 int fd = -1;
313 char *fname = toku_strdup(bl->temp_file_template);
314 if (fname == NULL)
315 result = get_error_errno();
316 else {
317 fd = mkstemp(fname);
318 if (fd < 0) {
319 result = get_error_errno();
320 } else {
321 f = toku_os_fdopen(fd, "r+", fname, *tokudb_file_tmp_key);
322 if (f->file == nullptr)
323 result = get_error_errno();
324 else
325 result = open_file_add(&bl->file_infos, f, fname, file_idx);
326 }
327 }
328 if (result != 0) {
329 if (fd >= 0) {
330 toku_os_close(fd);
331 unlink(fname);
332 }
333 if (f != NULL)
334 toku_os_fclose(f); // don't check for error because we're already in an error case
335 if (fname != NULL)
336 toku_free(fname);
337 }
338 return result;
339}
340
341void toku_ft_loader_internal_destroy(FTLOADER bl, bool is_error) {
342 ft_loader_lock_destroy(bl);
343
344 // These frees rely on the fact that if you free a NULL pointer then nothing bad happens.
345 toku_free(bl->dbs);
346 toku_free(bl->descriptors);
347 toku_free(bl->root_xids_that_created);
348 if (bl->new_fnames_in_env) {
349 for (int i = 0; i < bl->N; i++)
350 toku_free((char*)bl->new_fnames_in_env[i]);
351 toku_free(bl->new_fnames_in_env);
352 }
353 toku_free(bl->extracted_datasizes);
354 toku_free(bl->bt_compare_funs);
355 toku_free((char*)bl->temp_file_template);
356 ft_loader_fi_destroy(&bl->file_infos, is_error);
357
358 for (int i = 0; i < bl->N; i++)
359 destroy_rowset(&bl->rows[i]);
360 toku_free(bl->rows);
361
362 for (int i = 0; i < bl->N; i++)
363 destroy_merge_fileset(&bl->fs[i]);
364 toku_free(bl->fs);
365
366 if (bl->last_key) {
367 for (int i=0; i < bl->N; i++) {
368 toku_free(bl->last_key[i].data);
369 }
370 toku_free(bl->last_key);
371 bl->last_key = NULL;
372 }
373
374 destroy_rowset(&bl->primary_rowset);
375 if (bl->primary_rowset_queue) {
376 toku_queue_destroy(bl->primary_rowset_queue);
377 bl->primary_rowset_queue = nullptr;
378 }
379
380 for (int i=0; i<bl->N; i++) {
381 if ( bl->fractal_queues ) {
382 invariant(bl->fractal_queues[i]==NULL);
383 }
384 }
385 toku_free(bl->fractal_threads);
386 toku_free(bl->fractal_queues);
387 toku_free(bl->fractal_threads_live);
388
389 if (bl->did_reserve_memory) {
390 invariant(bl->cachetable);
391 toku_cachetable_release_reserved_memory(bl->cachetable, bl->reserved_memory);
392 }
393
394 ft_loader_destroy_error_callback(&bl->error_callback);
395 ft_loader_destroy_poll_callback(&bl->poll_callback);
396
397 //printf("Progress=%d/%d\n", bl->progress, PROGRESS_MAX);
398
399 toku_free(bl);
400}
401
// Forward declaration of the extractor thread body. It is static, so it is
// defined elsewhere in this file. toku_ft_loader_open starts it.
static void *extractor_thread (void*);

// Larger of A and B. NOTE: one argument is evaluated twice, so only pass
// expressions without side effects.
#define MAX(a,b) (((a)<(b)) ? (b) : (a))
405
406static uint64_t memory_per_rowset_during_extract (FTLOADER bl)
407// Return how much memory can be allocated for each rowset.
408{
409 if (size_factor==1) {
410 return 16*1024;
411 } else {
412 // There is a primary rowset being maintained by the foreground thread.
413 // There could be two more in the queue.
414 // There is one rowset for each index (bl->N) being filled in.
415 // Later we may have sort_and_write operations spawning in parallel, and will need to account for that.
416 int n_copies = (1 // primary rowset
417 +EXTRACTOR_QUEUE_DEPTH // the number of primaries in the queue
418 +bl->N // the N rowsets being constructed by the extractor thread.
419 +bl->N // the N sort buffers
420 +1 // Give the extractor thread one more so that it can have temporary space for sorting. This is overkill.
421 );
422 int64_t extra_reserved_memory = bl->N * FILE_BUFFER_SIZE; // for each index we are writing to a file at any given time.
423 int64_t tentative_rowset_size = ((int64_t)(bl->reserved_memory - extra_reserved_memory))/(n_copies);
424 return MAX(tentative_rowset_size, (int64_t)MIN_ROWSET_MEMORY);
425 }
426}
427
428static unsigned ft_loader_get_fractal_workers_count(FTLOADER bl) {
429 unsigned w = 0;
430 while (1) {
431 ft_loader_lock(bl);
432 w = bl->fractal_workers;
433 ft_loader_unlock(bl);
434 if (w != 0)
435 break;
436 toku_pthread_yield(); // maybe use a cond var instead
437 }
438 return w;
439}
440
441static void ft_loader_set_fractal_workers_count(FTLOADER bl) {
442 ft_loader_lock(bl);
443 if (bl->fractal_workers == 0)
444 bl->fractal_workers = 1;
445 ft_loader_unlock(bl);
446}
447
448// To compute a merge, we have a certain amount of memory to work with.
449// We perform only one fanin at a time.
450// If the fanout is F then we are using
451// F merges. Each merge uses
452// DBUFIO_DEPTH buffers for double buffering. Each buffer is of size at least MERGE_BUF_SIZE
453// so the memory is
454// F*MERGE_BUF_SIZE*DBUFIO_DEPTH storage.
455// We use some additional space to buffer the outputs.
456// That's FILE_BUFFER_SIZE for writing to a merge file if we are writing to a mergefile.
457// And we have FRACTAL_WRITER_ROWSETS*MERGE_BUF_SIZE per queue
// And if we are doing a fractal, each worker could have a fractal tree that it's working on.
459//
460// DBUFIO_DEPTH*F*MERGE_BUF_SIZE + FRACTAL_WRITER_ROWSETS*MERGE_BUF_SIZE + WORKERS*NODESIZE*2 <= RESERVED_MEMORY
461
462static int64_t memory_avail_during_merge(FTLOADER bl, bool is_fractal_node) {
463 // avail memory = reserved memory - WORKERS*NODESIZE*2 for the last merge stage only
464 int64_t avail_memory = bl->reserved_memory;
465 if (is_fractal_node) {
466 // reserve space for the fractal writer thread buffers
467 avail_memory -= (int64_t)ft_loader_get_fractal_workers_count(bl) * (int64_t)default_loader_nodesize * 2; // compressed and uncompressed buffers
468 }
469 return avail_memory;
470}
471
472static int merge_fanin (FTLOADER bl, bool is_fractal_node) {
473 // return number of temp files to read in this pass
474 int64_t memory_avail = memory_avail_during_merge(bl, is_fractal_node);
475 int64_t nbuffers = memory_avail / (int64_t)TARGET_MERGE_BUF_SIZE;
476 if (is_fractal_node)
477 nbuffers -= FRACTAL_WRITER_ROWSETS;
478 return MAX(nbuffers / (int64_t)DBUFIO_DEPTH, (int)MIN_MERGE_FANIN);
479}
480
481static uint64_t memory_per_rowset_during_merge (FTLOADER bl, int merge_factor, bool is_fractal_node // if it is being sent to a q
482 ) {
483 int64_t memory_avail = memory_avail_during_merge(bl, is_fractal_node);
484 int64_t nbuffers = DBUFIO_DEPTH * merge_factor;
485 if (is_fractal_node)
486 nbuffers += FRACTAL_WRITER_ROWSETS;
487 return MAX(memory_avail / nbuffers, (int64_t)MIN_MERGE_BUF_SIZE);
488}
489
int toku_ft_loader_internal_init (/* out */ FTLOADER *blp,
                                  CACHETABLE cachetable,
                                  generate_row_for_put_func g,
                                  DB *src_db,
                                  int N, FT_HANDLE fts[/*N*/], DB* dbs[/*N*/],
                                  const char *new_fnames_in_env[/*N*/],
                                  ft_compare_func bt_compare_functions[/*N*/],
                                  const char *temp_file_template,
                                  LSN load_lsn,
                                  TOKUTXN txn,
                                  bool reserve_memory,
                                  uint64_t reserve_memory_size,
                                  bool compress_intermediates,
                                  bool allow_puts)
// Effect: Allocate and initialize a FTLOADER, but do not create the extractor thread.
// Return value: 0 on success, with the new loader stored in *blp. On failure an
//   error number is returned, *blp is left untouched, and the partially built
//   loader has already been released with toku_ft_loader_internal_destroy(bl, true).
// dbs[i], descriptors[i] and root_xids_that_created[i] are filled only when
//   fts[i] is non-NULL; the other entries stay zeroed from the calloc.
// NOTE(review): failures before bl->rows / bl->fs are allocated call
//   toku_ft_loader_internal_destroy with bl->N already set, so that function
//   must not index into NULL per-index arrays.
{
    FTLOADER CALLOC(bl); // initialized to all zeros (hence CALLOC)
    if (!bl) return get_error_errno();

    bl->generate_row_for_put = g;
    bl->cachetable = cachetable;
    if (reserve_memory && bl->cachetable) {
        bl->did_reserve_memory = true;
        bl->reserved_memory = toku_cachetable_reserve_memory(bl->cachetable, 2.0/3.0, reserve_memory_size); // allocate 2/3 of the unreserved part (which is 3/4 of the memory to start with).
    }
    else {
        bl->did_reserve_memory = false;
        bl->reserved_memory = 512*1024*1024; // if no cache table (or no reservation was requested) use 512MB.
    }
    bl->compress_intermediates = compress_intermediates;
    bl->allow_puts = allow_puts;
    bl->src_db = src_db;
    bl->N = N;
    bl->load_lsn = load_lsn;
    // NOTE(review): parent_id64 is assumed to identify the root transaction,
    // matching the field name load_root_xid — confirm.
    if (txn) {
        bl->load_root_xid = txn->txnid.parent_id64;
    }
    else {
        bl->load_root_xid = TXNID_NONE;
    }

    ft_loader_init_error_callback(&bl->error_callback);
    ft_loader_init_poll_callback(&bl->poll_callback);

    // Allocation helpers for the code below. On failure they capture errno,
    // tear down the partially built loader, and return that errno from this
    // function. (They are not #undef'd here.)
#define MY_CALLOC_N(n,v) CALLOC_N(n,v); if (!v) { int r = get_error_errno(); toku_ft_loader_internal_destroy(bl, true); return r; }
#define SET_TO_MY_STRDUP(lval, s) do { char *v = toku_strdup(s); if (!v) { int r = get_error_errno(); toku_ft_loader_internal_destroy(bl, true); return r; } lval = v; } while (0)

    MY_CALLOC_N(N, bl->root_xids_that_created);
    for (int i=0; i<N; i++) if (fts[i]) bl->root_xids_that_created[i]=fts[i]->ft->h->root_xid_that_created;
    MY_CALLOC_N(N, bl->dbs);
    for (int i=0; i<N; i++) if (fts[i]) bl->dbs[i]=dbs[i];
    MY_CALLOC_N(N, bl->descriptors);
    for (int i=0; i<N; i++) if (fts[i]) bl->descriptors[i]=&fts[i]->ft->descriptor;
    // The caller owns new_fnames_in_env; the loader keeps its own copies.
    MY_CALLOC_N(N, bl->new_fnames_in_env);
    for (int i=0; i<N; i++) SET_TO_MY_STRDUP(bl->new_fnames_in_env[i], new_fnames_in_env[i]);
    MY_CALLOC_N(N, bl->extracted_datasizes); // the calloc_n zeroed everything, which is what we want
    MY_CALLOC_N(N, bl->bt_compare_funs);
    for (int i=0; i<N; i++) bl->bt_compare_funs[i] = bt_compare_functions[i];

    MY_CALLOC_N(N, bl->fractal_queues);
    for (int i=0; i<N; i++) bl->fractal_queues[i]=NULL;
    MY_CALLOC_N(N, bl->fractal_threads);
    MY_CALLOC_N(N, bl->fractal_threads_live);
    for (int i=0; i<N; i++) bl->fractal_threads_live[i] = false;

    {
        int r = ft_loader_init_file_infos(&bl->file_infos);
        if (r!=0) { toku_ft_loader_internal_destroy(bl, true); return r; }
    }

    SET_TO_MY_STRDUP(bl->temp_file_template, temp_file_template);

    bl->n_rows   = 0;
    bl->progress = 0;
    bl->progress_callback_result = 0;

    // Per-index rowsets, merge filesets and last-key buffers.
    MY_CALLOC_N(N, bl->rows);
    MY_CALLOC_N(N, bl->fs);
    MY_CALLOC_N(N, bl->last_key);
    for(int i=0;i<N;i++) {
        {
            int r = init_rowset(&bl->rows[i], memory_per_rowset_during_extract(bl));
            if (r!=0) { toku_ft_loader_internal_destroy(bl, true); return r; }
        }
        init_merge_fileset(&bl->fs[i]);
        bl->last_key[i].flags = DB_DBT_REALLOC; // don't really need this, but it's nice to maintain it.  We use ulen to keep track of the realloced space.
    }

    // The primary rowset, and a queue of depth EXTRACTOR_QUEUE_DEPTH for
    // primary rowsets.
    {
        int r = init_rowset(&bl->primary_rowset, memory_per_rowset_during_extract(bl));
        if (r!=0) { toku_ft_loader_internal_destroy(bl, true); return r; }
    }
    { int r = toku_queue_create(&bl->primary_rowset_queue, EXTRACTOR_QUEUE_DEPTH);
        if (r!=0) { toku_ft_loader_internal_destroy(bl, true); return r; }
    }
    {
        ft_loader_lock_init(bl);
    }

    *blp = bl;

    return 0;
}
593
594int toku_ft_loader_open (FTLOADER *blp, /* out */
595 CACHETABLE cachetable,
596 generate_row_for_put_func g,
597 DB *src_db,
598 int N, FT_HANDLE fts[/*N*/], DB* dbs[/*N*/],
599 const char *new_fnames_in_env[/*N*/],
600 ft_compare_func bt_compare_functions[/*N*/],
601 const char *temp_file_template,
602 LSN load_lsn,
603 TOKUTXN txn,
604 bool reserve_memory,
605 uint64_t reserve_memory_size,
606 bool compress_intermediates,
607 bool allow_puts) {
608// Effect: called by DB_ENV->create_loader to create an ft loader.
609// Arguments:
610// blp Return a ft loader ("bulk loader") here.
611// g The function for generating a row
612// src_db The source database. Needed by g. May be NULL if that's ok with g.
613// N The number of dbs to create.
614// dbs An array of open databases. Used by g. The data will be put in these database.
615// new_fnames The file names (these strings are owned by the caller: we make a copy for our own purposes).
616// temp_file_template A template suitable for mkstemp()
617// reserve_memory Cause the loader to reserve memory for its use from the cache table.
618// compress_intermediates Cause the loader to compress intermediate loader files.
619// allow_puts Prepare the loader for rows to insert. When puts are disabled, the loader does not run the
620// extractor or the fractal tree writer threads.
621// Return value: 0 on success, an error number otherwise.
622 int result = 0;
623 {
624 int r = toku_ft_loader_internal_init(blp, cachetable, g, src_db,
625 N, fts, dbs,
626 new_fnames_in_env,
627 bt_compare_functions,
628 temp_file_template,
629 load_lsn,
630 txn,
631 reserve_memory,
632 reserve_memory_size,
633 compress_intermediates,
634 allow_puts);
635 if (r!=0) result = r;
636 }
637 if (result == 0 && allow_puts) {
638 FTLOADER bl = *blp;
639 int r = toku_pthread_create(*extractor_thread_key,
640 &bl->extractor_thread,
641 nullptr,
642 extractor_thread,
643 static_cast<void *>(bl));
644 if (r == 0) {
645 bl->extractor_live = true;
646 } else {
647 result = r;
648 (void) toku_ft_loader_internal_destroy(bl, true);
649 }
650 }
651 return result;
652}
653
654static void ft_loader_set_panic(FTLOADER bl, int error, bool callback, int which_db, DBT *key, DBT *val) {
655 DB *db = nullptr;
656 if (bl && bl->dbs && which_db >= 0 && which_db < bl->N) {
657 db = bl->dbs[which_db];
658 }
659 int r = ft_loader_set_error(&bl->error_callback, error, db, which_db, key, val);
660 if (r == 0 && callback)
661 ft_loader_call_error_function(&bl->error_callback);
662}
663
664// One of the tests uses this.
665TOKU_FILE *toku_bl_fidx2file(FTLOADER bl, FIDX i) {
666 toku_mutex_lock(&bl->file_infos.lock);
667 invariant(i.idx >= 0 && i.idx < bl->file_infos.n_files);
668 invariant(bl->file_infos.file_infos[i.idx].is_open);
669 TOKU_FILE *result = bl->file_infos.file_infos[i.idx].file;
670 toku_mutex_unlock(&bl->file_infos.lock);
671 return result;
672}
673
// Compress the bytes staged in WB (wb->ndone of them, which must be > 0),
// write them to STREAM as one block, and mark WB empty.
// Block layout (every header field is a uint32 converted with toku_htod32):
//   total_size                          -- byte count of everything after this field
//   n_sub_blocks
//   n_sub_blocks x { compressed_size, uncompressed_size, xsum }
//   the compressed sub-block payloads
// Returns 0 on success, ENOMEM if the output buffer cannot be allocated, or
// the error number from toku_os_fwrite.
// NOTE(review): on the ENOMEM path wb->ndone is not reset. Once compression
// has happened, wb->ndone is reset before the write, even if the write fails.
static int bl_finish_compressed_write(TOKU_FILE *stream, struct wbuf *wb) {
    int r = 0;
    char *compressed_buf = NULL;
    const size_t data_size = wb->ndone;
    invariant(data_size > 0);
    invariant(data_size <= MAX_UNCOMPRESSED_BUF);

    int n_sub_blocks = 0;
    int sub_block_size = 0;

    // Split the staged data into at most max_sub_blocks sub blocks.
    r = choose_sub_block_size(wb->ndone, max_sub_blocks, &sub_block_size, &n_sub_blocks);
    invariant(r==0);
    invariant(0 < n_sub_blocks && n_sub_blocks <= max_sub_blocks);
    invariant(sub_block_size > 0);

    struct sub_block sub_block[max_sub_blocks];
    // set the initial sub block size for all of the sub blocks
    for (int i = 0; i < n_sub_blocks; i++) {
        sub_block_init(&sub_block[i]);
    }
    set_all_sub_block_sizes(data_size, sub_block_size, n_sub_blocks, sub_block);

    // Room for the header plus the bound on the compressed size reported by
    // get_sum_compressed_size_bound.
    size_t compressed_len = get_sum_compressed_size_bound(n_sub_blocks, sub_block, TOKU_DEFAULT_COMPRESSION_METHOD);
    const size_t sub_block_header_len = sub_block_header_size(n_sub_blocks);
    const size_t other_overhead = sizeof(uint32_t); //total_size
    const size_t header_len = sub_block_header_len + other_overhead;
    MALLOC_N(header_len + compressed_len, compressed_buf);
    if (compressed_buf == nullptr) {
        return ENOMEM;
    }

    // compress all of the sub blocks (payload goes right after the header)
    char *uncompressed_ptr = (char*)wb->buf;
    char *compressed_ptr = compressed_buf + header_len;
    compressed_len = compress_all_sub_blocks(n_sub_blocks, sub_block, uncompressed_ptr, compressed_ptr,
                                             get_num_cores(), get_ft_pool(), TOKU_DEFAULT_COMPRESSION_METHOD);

    //total_size does NOT include itself
    uint32_t total_size = compressed_len + sub_block_header_len;
    // serialize the sub block header
    uint32_t *ptr = (uint32_t *)(compressed_buf);
    *ptr++ = toku_htod32(total_size);
    *ptr++ = toku_htod32(n_sub_blocks);
    for (int i=0; i<n_sub_blocks; i++) {
        ptr[0] = toku_htod32(sub_block[i].compressed_size);
        ptr[1] = toku_htod32(sub_block[i].uncompressed_size);
        ptr[2] = toku_htod32(sub_block[i].xsum);
        ptr += 3;
    }
    // Mark as written
    wb->ndone = 0;

    size_t size_to_write = total_size + 4; // Includes writing total_size

    r = toku_os_fwrite(compressed_buf, 1, size_to_write, stream);

    if (compressed_buf) {
        toku_free(compressed_buf);
    }
    return r;
}
735
736static int bl_compressed_write(void *ptr,
737 size_t nbytes,
738 TOKU_FILE *stream,
739 struct wbuf *wb) {
740 invariant(wb->size <= MAX_UNCOMPRESSED_BUF);
741 size_t bytes_left = nbytes;
742 char *buf = (char *)ptr;
743
744 while (bytes_left > 0) {
745 size_t bytes_to_copy = bytes_left;
746 if (wb->ndone + bytes_to_copy > wb->size) {
747 bytes_to_copy = wb->size - wb->ndone;
748 }
749 wbuf_nocrc_literal_bytes(wb, buf, bytes_to_copy);
750 if (wb->ndone == wb->size) {
751 //Compress, write to disk, and empty out wb
752 int r = bl_finish_compressed_write(stream, wb);
753 if (r != 0) {
754 errno = r;
755 return -1;
756 }
757 wb->ndone = 0;
758 }
759 bytes_left -= bytes_to_copy;
760 buf += bytes_to_copy;
761 }
762 return 0;
763}
764
765static int bl_fwrite(void *ptr,
766 size_t size,
767 size_t nmemb,
768 TOKU_FILE *stream,
769 struct wbuf *wb,
770 FTLOADER bl)
771/* Effect: this is a wrapper for fwrite that returns 0 on success, otherwise
772 * returns an error number.
773 * Arguments:
774 * ptr the data to be writen.
775 * size the amount of data to be written.
776 * nmemb the number of units of size to be written.
777 * stream write the data here.
778 * wb where to write uncompressed data (if we're compressing) or ignore if
779 * NULL
780 * bl passed so we can panic the ft_loader if something goes wrong
781 * (recording the error number).
782 * Return value: 0 on success, an error number otherwise.
783 */
784{
785 if (!bl->compress_intermediates || !wb) {
786 return toku_os_fwrite(ptr, size, nmemb, stream);
787 } else {
788 size_t num_bytes = size * nmemb;
789 int r = bl_compressed_write(ptr, num_bytes, stream, wb);
790 if (r != 0) {
791 return r;
792 }
793 }
794 return 0;
795}
796
797static int bl_fread(void *ptr, size_t size, size_t nmemb, TOKU_FILE *stream)
798/* Effect: this is a wrapper for fread that returns 0 on success, otherwise
799 * returns an error number.
800 * Arguments:
801 * ptr read data into here.
802 * size size of data element to be read.
803 * nmemb number of data elements to be read.
804 * stream where to read the data from.
805 * Return value: 0 on success, an error number otherwise.
806 */
807{
808 return toku_os_fread(ptr, size, nmemb, stream);
809}
810
811static int bl_write_dbt(DBT *dbt,
812 TOKU_FILE *datafile,
813 uint64_t *dataoff,
814 struct wbuf *wb,
815 FTLOADER bl) {
816 int r;
817 int dlen = dbt->size;
818 if ((r=bl_fwrite(&dlen, sizeof(dlen), 1, datafile, wb, bl))) return r;
819 if ((r=bl_fwrite(dbt->data, 1, dlen, datafile, wb, bl))) return r;
820 if (dataoff)
821 *dataoff += dlen + sizeof(dlen);
822 return 0;
823}
824
825static int bl_read_dbt(/*in*/ DBT *dbt, TOKU_FILE *stream) {
826 int len;
827 {
828 int r;
829 if ((r = bl_fread(&len, sizeof(len), 1, stream))) return r;
830 invariant(len>=0);
831 }
832 if ((int)dbt->ulen<len) { dbt->ulen=len; dbt->data=toku_xrealloc(dbt->data, len); }
833 {
834 int r;
835 if ((r = bl_fread(dbt->data, 1, len, stream))) return r;
836 }
837 dbt->size = len;
838 return 0;
839}
840
841static int bl_read_dbt_from_dbufio (/*in*/DBT *dbt, DBUFIO_FILESET bfs, int filenum)
842{
843 int result = 0;
844 uint32_t len;
845 {
846 size_t n_read;
847 int r = dbufio_fileset_read(bfs, filenum, &len, sizeof(len), &n_read);
848 if (r!=0) {
849 result = r;
850 } else if (n_read<sizeof(len)) {
851 result = TOKUDB_NO_DATA; // must have run out of data prematurely. This is not EOF, it's a real error.
852 }
853 }
854 if (result==0) {
855 if (dbt->ulen<len) {
856 void * data = toku_realloc(dbt->data, len);
857 if (data==NULL) {
858 result = get_error_errno();
859 } else {
860 dbt->ulen=len;
861 dbt->data=data;
862 }
863 }
864 }
865 if (result==0) {
866 size_t n_read;
867 int r = dbufio_fileset_read(bfs, filenum, dbt->data, len, &n_read);
868 if (r!=0) {
869 result = r;
870 } else if (n_read<len) {
871 result = TOKUDB_NO_DATA; // must have run out of data prematurely. This is not EOF, it's a real error.
872 } else {
873 dbt->size = len;
874 }
875 }
876 return result;
877}
878
879int loader_write_row(DBT *key,
880 DBT *val,
881 FIDX data,
882 TOKU_FILE *dataf,
883 uint64_t *dataoff,
884 struct wbuf *wb,
885 FTLOADER bl)
886/* Effect: Given a key and a val (both DBTs), write them to a file. Increment
887 * *dataoff so that it's up to date.
888 * Arguments:
889 * key, val write these.
890 * data the file to write them to
891 * dataoff a pointer to a counter that keeps track of the amount of data
892 * written so far.
893 * wb a pointer (possibly NULL) to buffer uncompressed output
894 * bl the ft_loader (passed so we can panic if needed).
895 * Return value: 0 on success, an error number otherwise.
896 */
897{
898 //int klen = key->size;
899 //int vlen = val->size;
900 int r;
901 // we have a chance to handle the errors because when we close we can delete all the files.
902 if ((r=bl_write_dbt(key, dataf, dataoff, wb, bl))) return r;
903 if ((r=bl_write_dbt(val, dataf, dataoff, wb, bl))) return r;
904 toku_mutex_lock(&bl->file_infos.lock);
905 bl->file_infos.file_infos[data.idx].n_rows++;
906 toku_mutex_unlock(&bl->file_infos.lock);
907 return 0;
908}
909
910int loader_read_row(TOKU_FILE *f, DBT *key, DBT *val)
911/* Effect: Read a key value pair from a file. The DBTs must have DB_DBT_REALLOC
912 * set.
913 * Arguments:
914 * f where to read it from.
915 * key, val read it into these.
916 * bl passed so we can panic if needed.
917 * Return value: 0 on success, an error number otherwise.
918 * Requires: The DBTs must have DB_DBT_REALLOC
919 */
920{
921 {
922 int r = bl_read_dbt(key, f);
923 if (r!=0) return r;
924 }
925 {
926 int r = bl_read_dbt(val, f);
927 if (r!=0) return r;
928 }
929 return 0;
930}
931
932static int loader_read_row_from_dbufio (DBUFIO_FILESET bfs, int filenum, DBT *key, DBT *val)
933/* Effect: Read a key value pair from a file. The DBTs must have DB_DBT_REALLOC set.
934 * Arguments:
935 * f where to read it from.
936 * key, val read it into these.
937 * bl passed so we can panic if needed.
938 * Return value: 0 on success, an error number otherwise.
939 * Requires: The DBTs must have DB_DBT_REALLOC
940 */
941{
942 {
943 int r = bl_read_dbt_from_dbufio(key, bfs, filenum);
944 if (r!=0) return r;
945 }
946 {
947 int r = bl_read_dbt_from_dbufio(val, bfs, filenum);
948 if (r!=0) return r;
949 }
950 return 0;
951}
952
953
954int init_rowset (struct rowset *rows, uint64_t memory_budget)
955/* Effect: Initialize a collection of rows to be empty. */
956{
957 int result = 0;
958
959 rows->memory_budget = memory_budget;
960
961 rows->rows = NULL;
962 rows->data = NULL;
963
964 rows->n_rows = 0;
965 rows->n_rows_limit = 100;
966 MALLOC_N(rows->n_rows_limit, rows->rows);
967 if (rows->rows == NULL)
968 result = get_error_errno();
969 rows->n_bytes = 0;
970 rows->n_bytes_limit = (size_factor==1) ? 1024*size_factor*16 : memory_budget;
971 //printf("%s:%d n_bytes_limit=%ld (size_factor based limit=%d)\n", __FILE__, __LINE__, rows->n_bytes_limit, 1024*size_factor*16);
972 rows->data = (char *) toku_malloc(rows->n_bytes_limit);
973 if (rows->rows==NULL || rows->data==NULL) {
974 if (result == 0)
975 result = get_error_errno();
976 toku_free(rows->rows);
977 toku_free(rows->data);
978 rows->rows = NULL;
979 rows->data = NULL;
980 }
981 return result;
982}
983
984static void zero_rowset (struct rowset *rows) {
985 memset(rows, 0, sizeof(*rows));
986}
987
988void destroy_rowset (struct rowset *rows) {
989 if ( rows ) {
990 toku_free(rows->data);
991 toku_free(rows->rows);
992 zero_rowset(rows);
993 }
994}
995
996static int row_wont_fit (struct rowset *rows, size_t size)
997/* Effect: Return nonzero if adding a row of size SIZE would be too big (bigger than the buffer limit) */
998{
999 // Account for the memory used by the data and also the row structures.
1000 size_t memory_in_use = (rows->n_rows*sizeof(struct row)
1001 + rows->n_bytes);
1002 return (rows->memory_budget < memory_in_use + size);
1003}
1004
1005int add_row (struct rowset *rows, DBT *key, DBT *val)
1006/* Effect: add a row to a collection. */
1007{
1008 int result = 0;
1009 if (rows->n_rows >= rows->n_rows_limit) {
1010 struct row *old_rows = rows->rows;
1011 size_t old_n_rows_limit = rows->n_rows_limit;
1012 rows->n_rows_limit *= 2;
1013 REALLOC_N(rows->n_rows_limit, rows->rows);
1014 if (rows->rows == NULL) {
1015 result = get_error_errno();
1016 rows->rows = old_rows;
1017 rows->n_rows_limit = old_n_rows_limit;
1018 return result;
1019 }
1020 }
1021 size_t off = rows->n_bytes;
1022 size_t next_off = off + key->size + val->size;
1023
1024 struct row newrow;
1025 memset(&newrow, 0, sizeof newrow); newrow.off = off; newrow.klen = key->size; newrow.vlen = val->size;
1026
1027 rows->rows[rows->n_rows++] = newrow;
1028 if (next_off > rows->n_bytes_limit) {
1029 size_t old_n_bytes_limit = rows->n_bytes_limit;
1030 while (next_off > rows->n_bytes_limit) {
1031 rows->n_bytes_limit = rows->n_bytes_limit*2;
1032 }
1033 invariant(next_off <= rows->n_bytes_limit);
1034 char *old_data = rows->data;
1035 REALLOC_N(rows->n_bytes_limit, rows->data);
1036 if (rows->data == NULL) {
1037 result = get_error_errno();
1038 rows->data = old_data;
1039 rows->n_bytes_limit = old_n_bytes_limit;
1040 return result;
1041 }
1042 }
1043 memcpy(rows->data+off, key->data, key->size);
1044 memcpy(rows->data+off+key->size, val->data, val->size);
1045 rows->n_bytes = next_off;
1046 return result;
1047}
1048
// Forward declaration: extractor_thread (below) calls this before its definition appears later in the file.
static int process_primary_rows (FTLOADER bl, struct rowset *primary_rowset);
1050
1051static int finish_primary_rows_internal (FTLOADER bl)
1052// now we have been asked to finish up.
1053// Be sure to destroy the rowsets.
1054{
1055 int *MALLOC_N(bl->N, ra);
1056 if (ra==NULL) return get_error_errno();
1057
1058 for (int i = 0; i < bl->N; i++) {
1059 //printf("%s:%d extractor finishing index %d with %ld rows\n", __FILE__, __LINE__, i, rows->n_rows);
1060 ra[i] = sort_and_write_rows(bl->rows[i], &(bl->fs[i]), bl, i, bl->dbs[i], bl->bt_compare_funs[i]);
1061 zero_rowset(&bl->rows[i]);
1062 }
1063
1064 // accept any of the error codes (in this case, the last one).
1065 int r = 0;
1066 for (int i = 0; i < bl->N; i++)
1067 if (ra[i] != 0)
1068 r = ra[i];
1069
1070 toku_free(ra);
1071 return r;
1072}
1073
1074static int finish_primary_rows (FTLOADER bl) {
1075 return finish_primary_rows_internal (bl);
1076}
1077
1078static void* extractor_thread (void *blv) {
1079 FTLOADER bl = (FTLOADER)blv;
1080 int r = 0;
1081 while (1) {
1082 void *item = nullptr;
1083 {
1084 int rq = toku_queue_deq(bl->primary_rowset_queue, &item, NULL, NULL);
1085 if (rq==EOF) break;
1086 invariant(rq==0); // other errors are arbitrarily bad.
1087 }
1088 struct rowset *primary_rowset = (struct rowset *)item;
1089
1090 //printf("%s:%d extractor got %ld rows\n", __FILE__, __LINE__, primary_rowset.n_rows);
1091
1092 // Now we have some rows to output
1093 {
1094 r = process_primary_rows(bl, primary_rowset);
1095 if (r)
1096 ft_loader_set_panic(bl, r, false, 0, nullptr, nullptr);
1097 }
1098 }
1099
1100 //printf("%s:%d extractor finishing\n", __FILE__, __LINE__);
1101 if (r == 0) {
1102 r = finish_primary_rows(bl);
1103 if (r)
1104 ft_loader_set_panic(bl, r, false, 0, nullptr, nullptr);
1105 }
1106 toku_instr_delete_current_thread();
1107 return nullptr;
1108}
1109
1110static void enqueue_for_extraction(FTLOADER bl) {
1111 //printf("%s:%d enqueing %ld items\n", __FILE__, __LINE__, bl->primary_rowset.n_rows);
1112 struct rowset *XMALLOC(enqueue_me);
1113 *enqueue_me = bl->primary_rowset;
1114 zero_rowset(&bl->primary_rowset);
1115 int r = toku_queue_enq(bl->primary_rowset_queue, (void*)enqueue_me, 1, NULL);
1116 resource_assert_zero(r);
1117}
1118
1119static int loader_do_put(FTLOADER bl,
1120 DBT *pkey,
1121 DBT *pval)
1122{
1123 int result;
1124 result = add_row(&bl->primary_rowset, pkey, pval);
1125 if (result == 0 && row_wont_fit(&bl->primary_rowset, 0)) {
1126 // queue the rows for further processing by the extractor thread.
1127 //printf("%s:%d please extract %ld\n", __FILE__, __LINE__, bl->primary_rowset.n_rows);
1128 enqueue_for_extraction(bl);
1129 {
1130 int r = init_rowset(&bl->primary_rowset, memory_per_rowset_during_extract(bl));
1131 // bl->primary_rowset will get destroyed by toku_ft_loader_abort
1132 if (r != 0)
1133 result = r;
1134 }
1135 }
1136 return result;
1137}
1138
1139static int
1140finish_extractor (FTLOADER bl) {
1141 //printf("%s:%d now finishing extraction\n", __FILE__, __LINE__);
1142
1143 int rval;
1144
1145 if (bl->primary_rowset.n_rows>0) {
1146 enqueue_for_extraction(bl);
1147 } else {
1148 destroy_rowset(&bl->primary_rowset);
1149 }
1150 //printf("%s:%d please finish extraction\n", __FILE__, __LINE__);
1151 {
1152 int r = toku_queue_eof(bl->primary_rowset_queue);
1153 invariant(r==0);
1154 }
1155 //printf("%s:%d joining\n", __FILE__, __LINE__);
1156 {
1157 void *toku_pthread_retval;
1158 int r = toku_pthread_join(bl->extractor_thread, &toku_pthread_retval);
1159 resource_assert_zero(r);
1160 invariant(toku_pthread_retval == NULL);
1161 bl->extractor_live = false;
1162 }
1163 {
1164 int r = toku_queue_destroy(bl->primary_rowset_queue);
1165 invariant(r==0);
1166 bl->primary_rowset_queue = nullptr;
1167 }
1168
1169 rval = ft_loader_fi_close_all(&bl->file_infos);
1170
1171 //printf("%s:%d joined\n", __FILE__, __LINE__);
1172 return rval;
1173}
1174
1175static const DBT zero_dbt = {0,0,0,0};
1176
1177static DBT make_dbt (void *data, uint32_t size) {
1178 DBT result = zero_dbt;
1179 result.data = data;
1180 result.size = size;
1181 return result;
1182}
1183
// Increments a local variable named `error_count` that must be in scope at the point of use
// (e.g. in process_primary_rows_internal).  Not a self-contained function-like macro.
#define inc_error_count() error_count++
1185
1186static TXNID leafentry_xid(FTLOADER bl, int which_db) {
1187 TXNID le_xid = TXNID_NONE;
1188 if (bl->root_xids_that_created && bl->load_root_xid != bl->root_xids_that_created[which_db])
1189 le_xid = bl->load_root_xid;
1190 return le_xid;
1191}
1192
1193size_t ft_loader_leafentry_size(size_t key_size, size_t val_size, TXNID xid) {
1194 size_t s = 0;
1195 if (xid == TXNID_NONE)
1196 s = LE_CLEAN_MEMSIZE(val_size) + key_size + sizeof(uint32_t);
1197 else
1198 s = LE_MVCC_COMMITTED_MEMSIZE(val_size) + key_size + sizeof(uint32_t);
1199 return s;
1200}
1201
1202static int process_primary_rows_internal (FTLOADER bl, struct rowset *primary_rowset)
1203// process the rows in primary_rowset, and then destroy the rowset.
1204// if FLUSH is true then write all the buffered rows out.
1205// if primary_rowset is NULL then treat it as empty.
1206{
1207 int error_count = 0;
1208 int *XMALLOC_N(bl->N, error_codes);
1209
1210 // If we parallelize the first for loop, dest_keys/dest_vals init&cleanup need to move inside
1211 DBT_ARRAY dest_keys;
1212 DBT_ARRAY dest_vals;
1213 toku_dbt_array_init(&dest_keys, 1);
1214 toku_dbt_array_init(&dest_vals, 1);
1215
1216 for (int i = 0; i < bl->N; i++) {
1217 unsigned int klimit,vlimit; // maximum row sizes.
1218 toku_ft_get_maximum_advised_key_value_lengths(&klimit, &vlimit);
1219
1220 error_codes[i] = 0;
1221 struct rowset *rows = &(bl->rows[i]);
1222 struct merge_fileset *fs = &(bl->fs[i]);
1223 ft_compare_func compare = bl->bt_compare_funs[i];
1224
1225 // Don't parallelize this loop, or we have to lock access to add_row() which would be a lot of overehad.
1226 // Also this way we can reuse the DB_DBT_REALLOC'd values inside dest_keys/dest_vals without a race.
1227 for (size_t prownum=0; prownum<primary_rowset->n_rows; prownum++) {
1228 if (error_count) break;
1229
1230 struct row *prow = &primary_rowset->rows[prownum];
1231 DBT pkey = zero_dbt;
1232 DBT pval = zero_dbt;
1233 pkey.data = primary_rowset->data + prow->off;
1234 pkey.size = prow->klen;
1235 pval.data = primary_rowset->data + prow->off + prow->klen;
1236 pval.size = prow->vlen;
1237
1238
1239 DBT_ARRAY key_array;
1240 DBT_ARRAY val_array;
1241 if (bl->dbs[i] != bl->src_db) {
1242 int r = bl->generate_row_for_put(bl->dbs[i], bl->src_db, &dest_keys, &dest_vals, &pkey, &pval);
1243 if (r != 0) {
1244 error_codes[i] = r;
1245 inc_error_count();
1246 break;
1247 }
1248 paranoid_invariant(dest_keys.size <= dest_keys.capacity);
1249 paranoid_invariant(dest_vals.size <= dest_vals.capacity);
1250 paranoid_invariant(dest_keys.size == dest_vals.size);
1251
1252 key_array = dest_keys;
1253 val_array = dest_vals;
1254 } else {
1255 key_array.size = key_array.capacity = 1;
1256 key_array.dbts = &pkey;
1257
1258 val_array.size = val_array.capacity = 1;
1259 val_array.dbts = &pval;
1260 }
1261 for (uint32_t row = 0; row < key_array.size; row++) {
1262 DBT *dest_key = &key_array.dbts[row];
1263 DBT *dest_val = &val_array.dbts[row];
1264 if (dest_key->size > klimit) {
1265 error_codes[i] = EINVAL;
1266 fprintf(stderr, "Key too big (keysize=%d bytes, limit=%d bytes)\n", dest_key->size, klimit);
1267 inc_error_count();
1268 break;
1269 }
1270 if (dest_val->size > vlimit) {
1271 error_codes[i] = EINVAL;
1272 fprintf(stderr, "Row too big (rowsize=%d bytes, limit=%d bytes)\n", dest_val->size, vlimit);
1273 inc_error_count();
1274 break;
1275 }
1276
1277 bl->extracted_datasizes[i] += ft_loader_leafentry_size(dest_key->size, dest_val->size, leafentry_xid(bl, i));
1278
1279 if (row_wont_fit(rows, dest_key->size + dest_val->size)) {
1280 //printf("%s:%d rows.n_rows=%ld rows.n_bytes=%ld\n", __FILE__, __LINE__, rows->n_rows, rows->n_bytes);
1281 int r = sort_and_write_rows(*rows, fs, bl, i, bl->dbs[i], compare); // cannot spawn this because of the race on rows. If we were to create a new rows, and if sort_and_write_rows were to destroy the rows it is passed, we could spawn it, however.
1282 // If we do spawn this, then we must account for the additional storage in the memory_per_rowset() function.
1283 init_rowset(rows, memory_per_rowset_during_extract(bl)); // we passed the contents of rows to sort_and_write_rows.
1284 if (r != 0) {
1285 error_codes[i] = r;
1286 inc_error_count();
1287 break;
1288 }
1289 }
1290 int r = add_row(rows, dest_key, dest_val);
1291 if (r != 0) {
1292 error_codes[i] = r;
1293 inc_error_count();
1294 break;
1295 }
1296 }
1297 }
1298 }
1299 toku_dbt_array_destroy(&dest_keys);
1300 toku_dbt_array_destroy(&dest_vals);
1301
1302 destroy_rowset(primary_rowset);
1303 toku_free(primary_rowset);
1304 int r = 0;
1305 if (error_count > 0) {
1306 for (int i=0; i<bl->N; i++) {
1307 if (error_codes[i]) {
1308 r = error_codes[i];
1309 ft_loader_set_panic(bl, r, false, i, nullptr, nullptr);
1310 }
1311 }
1312 invariant(r); // found the error
1313 }
1314 toku_free(error_codes);
1315 return r;
1316}
1317
1318static int process_primary_rows (FTLOADER bl, struct rowset *primary_rowset) {
1319 int r = process_primary_rows_internal (bl, primary_rowset);
1320 return r;
1321}
1322
1323int toku_ft_loader_put (FTLOADER bl, DBT *key, DBT *val)
1324/* Effect: Put a key-value pair into the ft loader. Called by DB_LOADER->put().
1325 * Return value: 0 on success, an error number otherwise.
1326 */
1327{
1328 if (!bl->allow_puts || ft_loader_get_error(&bl->error_callback))
1329 return EINVAL; // previous panic
1330 bl->n_rows++;
1331 return loader_do_put(bl, key, val);
1332}
1333
1334void toku_ft_loader_set_n_rows(FTLOADER bl, uint64_t n_rows) {
1335 bl->n_rows = n_rows;
1336}
1337
1338uint64_t toku_ft_loader_get_n_rows(FTLOADER bl) {
1339 return bl->n_rows;
1340}
1341
1342int merge_row_arrays_base (struct row dest[/*an+bn*/], struct row a[/*an*/], int an, struct row b[/*bn*/], int bn,
1343 int which_db, DB *dest_db, ft_compare_func compare,
1344
1345 FTLOADER bl,
1346 struct rowset *rowset)
1347/* Effect: Given two arrays of rows, a and b, merge them using the comparison function, and write them into dest.
1348 * This function is suitable for use in a mergesort.
1349 * If a pair of duplicate keys is ever noticed, then call the error_callback function (if it exists), and return DB_KEYEXIST.
1350 * Arguments:
1351 * dest write the rows here
1352 * a,b the rows being merged
1353 * an,bn the lenth of a and b respectively.
1354 * dest_db We need the dest_db to run the comparison function.
1355 * compare We need the compare function for the dest_db.
1356 */
1357{
1358 while (an>0 && bn>0) {
1359 DBT akey; memset(&akey, 0, sizeof akey); akey.data=rowset->data+a->off; akey.size=a->klen;
1360 DBT bkey; memset(&bkey, 0, sizeof bkey); bkey.data=rowset->data+b->off; bkey.size=b->klen;
1361
1362 int compare_result = compare(dest_db, &akey, &bkey);
1363 if (compare_result==0) {
1364 if (bl->error_callback.error_callback) {
1365 DBT aval; memset(&aval, 0, sizeof aval); aval.data=rowset->data + a->off + a->klen; aval.size = a->vlen;
1366 ft_loader_set_error(&bl->error_callback, DB_KEYEXIST, dest_db, which_db, &akey, &aval);
1367 }
1368 return DB_KEYEXIST;
1369 } else if (compare_result<0) {
1370 // a is smaller
1371 *dest = *a;
1372 dest++; a++; an--;
1373 } else {
1374 *dest = *b;
1375 dest++; b++; bn--;
1376 }
1377 }
1378 while (an>0) {
1379 *dest = *a;
1380 dest++; a++; an--;
1381 }
1382 while (bn>0) {
1383 *dest = *b;
1384 dest++; b++; bn--;
1385 }
1386 return 0;
1387}
1388
1389static int binary_search (int *location,
1390 const DBT *key,
1391 struct row a[/*an*/], int an,
1392 int abefore,
1393 int which_db, DB *dest_db, ft_compare_func compare,
1394 FTLOADER bl,
1395 struct rowset *rowset)
1396// Given a sorted array of rows a, and a dbt key, find the first row in a that is > key.
1397// If no such row exists, then consider the result to be equal to an.
1398// On success store abefore+the index into *location
1399// Return 0 on success.
1400// Return DB_KEYEXIST if we find a row that is equal to key.
1401{
1402 if (an==0) {
1403 *location = abefore;
1404 return 0;
1405 } else {
1406 int a2 = an/2;
1407 DBT akey = make_dbt(rowset->data+a[a2].off, a[a2].klen);
1408 int compare_result = compare(dest_db, key, &akey);
1409 if (compare_result==0) {
1410 if (bl->error_callback.error_callback) {
1411 DBT aval = make_dbt(rowset->data + a[a2].off + a[a2].klen, a[a2].vlen);
1412 ft_loader_set_error(&bl->error_callback, DB_KEYEXIST, dest_db, which_db, &akey, &aval);
1413 }
1414 return DB_KEYEXIST;
1415 } else if (compare_result<0) {
1416 // key is before a2
1417 if (an==1) {
1418 *location = abefore;
1419 return 0;
1420 } else {
1421 return binary_search(location, key,
1422 a, a2,
1423 abefore,
1424 which_db, dest_db, compare, bl, rowset);
1425 }
1426 } else {
1427 // key is after a2
1428 if (an==1) {
1429 *location = abefore + 1;
1430 return 0;
1431 } else {
1432 return binary_search(location, key,
1433 a+a2, an-a2,
1434 abefore+a2,
1435 which_db, dest_db, compare, bl, rowset);
1436 }
1437 }
1438 }
1439}
1440
1441
// Swap two lvalues X and Y of type TYP through a temporary named `tmp`.
// NOTE: expands to a bare brace block (not do { } while (0)), and callers in this file
// invoke it without a trailing semicolon; keep that in mind before changing the expansion.
#define SWAP(typ,x,y) { typ tmp = x; x=y; y=tmp; }
1443
1444static int merge_row_arrays (struct row dest[/*an+bn*/], struct row a[/*an*/], int an, struct row b[/*bn*/], int bn,
1445 int which_db, DB *dest_db, ft_compare_func compare,
1446 FTLOADER bl,
1447 struct rowset *rowset)
1448/* Effect: Given two sorted arrays of rows, a and b, merge them using the comparison function, and write them into dest.
1449 * Arguments:
1450 * dest write the rows here
1451 * a,b the rows being merged
1452 * an,bn the lenth of a and b respectively.
1453 * dest_db We need the dest_db to run the comparison function.
1454 * compare We need the compare function for the dest_db.
1455 */
1456{
1457 if (an + bn < 10000) {
1458 return merge_row_arrays_base(dest, a, an, b, bn, which_db, dest_db, compare, bl, rowset);
1459 }
1460 if (an < bn) {
1461 SWAP(struct row *,a, b)
1462 SWAP(int ,an,bn)
1463 }
1464 // an >= bn
1465 int a2 = an/2;
1466 DBT akey = make_dbt(rowset->data+a[a2].off, a[a2].klen);
1467 int b2 = 0; // initialize to zero so we can add the answer in.
1468 {
1469 int r = binary_search(&b2, &akey, b, bn, 0, which_db, dest_db, compare, bl, rowset);
1470 if (r!=0) return r; // for example if we found a duplicate, called the error_callback, and now we return an error code.
1471 }
1472 int ra, rb;
1473 ra = merge_row_arrays(dest, a, a2, b, b2, which_db, dest_db, compare, bl, rowset);
1474 rb = merge_row_arrays(dest+a2+b2, a+a2, an-a2, b+b2, bn-b2, which_db, dest_db, compare, bl, rowset);
1475 if (ra!=0) return ra;
1476 else return rb;
1477}
1478
1479int mergesort_row_array (struct row rows[/*n*/], int n, int which_db, DB *dest_db, ft_compare_func compare, FTLOADER bl, struct rowset *rowset)
1480/* Sort an array of rows (using mergesort).
1481 * Arguments:
1482 * rows sort this array of rows.
1483 * n the length of the array.
1484 * dest_db used by the comparison function.
1485 * compare the compare function
1486 */
1487{
1488 if (n<=1) return 0; // base case is sorted
1489 int mid = n/2;
1490 int r1, r2;
1491 r1 = mergesort_row_array (rows, mid, which_db, dest_db, compare, bl, rowset);
1492
1493 // Don't spawn this one explicitly
1494 r2 = mergesort_row_array (rows+mid, n-mid, which_db, dest_db, compare, bl, rowset);
1495
1496 if (r1!=0) return r1;
1497 if (r2!=0) return r2;
1498
1499 struct row *MALLOC_N(n, tmp);
1500 if (tmp == NULL) return get_error_errno();
1501 {
1502 int r = merge_row_arrays(tmp, rows, mid, rows+mid, n-mid, which_db, dest_db, compare, bl, rowset);
1503 if (r!=0) {
1504 toku_free(tmp);
1505 return r;
1506 }
1507 }
1508 memcpy(rows, tmp, sizeof(*tmp)*n);
1509 toku_free(tmp);
1510 return 0;
1511}
1512
1513// C function for testing mergesort_row_array
1514int ft_loader_mergesort_row_array (struct row rows[/*n*/], int n, int which_db, DB *dest_db, ft_compare_func compare, FTLOADER bl, struct rowset *rowset) {
1515 return mergesort_row_array (rows, n, which_db, dest_db, compare, bl, rowset);
1516}
1517
1518static int sort_rows (struct rowset *rows, int which_db, DB *dest_db, ft_compare_func compare,
1519 FTLOADER bl)
1520/* Effect: Sort a collection of rows.
1521 * If any duplicates are found, then call the error_callback function and return non zero.
1522 * Otherwise return 0.
1523 * Arguments:
1524 * rowset the */
1525{
1526 return mergesort_row_array(rows->rows, rows->n_rows, which_db, dest_db, compare, bl, rows);
1527}
1528
/* filesets: Maintain a collection of files. Typically these files are each individually sorted, and we will merge them.
 * These files have two parts, one is for the data rows, and the other is a collection of offsets so we can more easily parallelize the manipulation (e.g., by allowing us to find the offset of the ith row quickly).
 * (Note: the code below currently tracks only the data files, in fs->data_fidxs.) */
1531
1532void init_merge_fileset (struct merge_fileset *fs)
1533/* Effect: Initialize a fileset */
1534{
1535 fs->have_sorted_output = false;
1536 fs->sorted_output = FIDX_NULL;
1537 fs->prev_key = zero_dbt;
1538 fs->prev_key.flags = DB_DBT_REALLOC;
1539
1540 fs->n_temp_files = 0;
1541 fs->n_temp_files_limit = 0;
1542 fs->data_fidxs = NULL;
1543}
1544
1545void destroy_merge_fileset (struct merge_fileset *fs)
1546/* Effect: Destroy a fileset. */
1547{
1548 if ( fs ) {
1549 toku_destroy_dbt(&fs->prev_key);
1550 fs->n_temp_files = 0;
1551 fs->n_temp_files_limit = 0;
1552 toku_free(fs->data_fidxs);
1553 fs->data_fidxs = NULL;
1554 }
1555}
1556
1557
1558static int extend_fileset (FTLOADER bl, struct merge_fileset *fs, FIDX*ffile)
1559/* Effect: Add two files (one for data and one for idx) to the fileset.
1560 * Arguments:
1561 * bl the ft_loader (needed to panic if anything goes wrong, and also to get the temp_file_template.
1562 * fs the fileset
1563 * ffile the data file (which will be open)
1564 * fidx the index file (which will be open)
1565 */
1566{
1567 FIDX sfile;
1568 int r;
1569 r = ft_loader_open_temp_file(bl, &sfile); if (r!=0) return r;
1570
1571 if (fs->n_temp_files+1 > fs->n_temp_files_limit) {
1572 fs->n_temp_files_limit = (fs->n_temp_files+1)*2;
1573 XREALLOC_N(fs->n_temp_files_limit, fs->data_fidxs);
1574 }
1575 fs->data_fidxs[fs->n_temp_files] = sfile;
1576 fs->n_temp_files++;
1577
1578 *ffile = sfile;
1579 return 0;
1580}
1581
// RFP maybe this should be buried in the ft_loader struct
// Serializes update_progress().  It guards the bl->progress increment and the poll-callback call.
// Being file-static, a single lock is shared by every loader in the process.
static toku_mutex_t update_progress_lock = TOKU_MUTEX_INITIALIZER;
1584
1585static int update_progress (int N,
1586 FTLOADER bl,
1587 const char *UU(message))
1588{
1589 // Must protect the increment and the call to the poll_function.
1590 toku_mutex_lock(&update_progress_lock);
1591 bl->progress+=N;
1592
1593 int result;
1594 if (bl->progress_callback_result == 0) {
1595 //printf(" %20s: %d ", message, bl->progress);
1596 result = ft_loader_call_poll_function(&bl->poll_callback, (float)bl->progress/(float)PROGRESS_MAX);
1597 if (result!=0) {
1598 bl->progress_callback_result = result;
1599 }
1600 } else {
1601 result = bl->progress_callback_result;
1602 }
1603 toku_mutex_unlock(&update_progress_lock);
1604 return result;
1605}
1606
1607
1608static int write_rowset_to_file (FTLOADER bl, FIDX sfile, const struct rowset rows) {
1609 int r = 0;
1610 // Allocate a buffer if we're compressing intermediates.
1611 char *uncompressed_buffer = nullptr;
1612 if (bl->compress_intermediates) {
1613 MALLOC_N(MAX_UNCOMPRESSED_BUF, uncompressed_buffer);
1614 if (uncompressed_buffer == nullptr) {
1615 return ENOMEM;
1616 }
1617 }
1618 struct wbuf wb;
1619 wbuf_init(&wb, uncompressed_buffer, MAX_UNCOMPRESSED_BUF);
1620
1621 TOKU_FILE *sstream = toku_bl_fidx2file(bl, sfile);
1622 for (size_t i = 0; i < rows.n_rows; i++) {
1623 DBT skey = make_dbt(rows.data + rows.rows[i].off, rows.rows[i].klen);
1624 DBT sval = make_dbt(rows.data + rows.rows[i].off + rows.rows[i].klen,
1625 rows.rows[i].vlen);
1626
1627 uint64_t soffset=0; // don't really need this.
1628 r = loader_write_row(&skey, &sval, sfile, sstream, &soffset, &wb, bl);
1629 if (r != 0) {
1630 goto exit;
1631 }
1632 }
1633
1634 if (bl->compress_intermediates && wb.ndone > 0) {
1635 r = bl_finish_compressed_write(sstream, &wb);
1636 if (r != 0) {
1637 goto exit;
1638 }
1639 }
1640 r = 0;
1641exit:
1642 if (uncompressed_buffer) {
1643 toku_free(uncompressed_buffer);
1644 }
1645 return r;
1646}
1647
1648
int sort_and_write_rows (struct rowset rows, struct merge_fileset *fs, FTLOADER bl, int which_db, DB *dest_db, ft_compare_func compare)
/* Effect: Given a rowset, sort it and write it to a temporary file.
 * Note: The loader maintains for each index the most recently written-to file, as well as the DBT for the last key written into that file.
 *   The rowset is always sorted first.  If its smallest key is strictly greater than that DBT, the rows are appended
 *   to the same file, which reduces the number of temporary files.  Otherwise the current file (if any) is closed
 *   and the rows go into a new temporary file added to FS.
 * Arguments:
 *   rows      the rowset, passed by value.  Its buffers are destroyed before returning (on success and on error), so the
 *             caller's copy is left holding dangling pointers and must be reinitialized or zeroed before reuse.
 *   fs        the fileset into which the sorted data will be added
 *   bl        the ft_loader
 *   which_db  index of the destination DB (passed through to duplicate-key error reporting)
 *   dest_db   the DB, needed for the comparison function.
 *   compare   The comparison function.
 * Returns 0 on success, otherwise an error number (e.g. DB_KEYEXIST if sorting finds a duplicate key).
 * Note: There is no sense in trying to calculate progress by this function since it's done concurrently with the loader->put operation.
 * Note first time called: invariant: fs->have_sorted_output == false
 */
{
    //printf(" sort_and_write use %d progress=%d fin at %d\n", progress_allocation, bl->progress, bl->progress+progress_allocation);

    // TODO: erase the files, and deal with all the cleanup on error paths
    //printf("%s:%d sort_rows n_rows=%ld\n", __FILE__, __LINE__, rows->n_rows);
    //bl_time_t before_sort = bl_time_now();

    int result;
    if (rows.n_rows == 0) {
        // Nothing to write; fs is left untouched.
        result = 0;
    } else {
        result = sort_rows(&rows, which_db, dest_db, compare, bl);

        //bl_time_t after_sort = bl_time_now();

        if (result == 0) {
            // After sorting, rows.rows[0] holds the smallest key and rows.rows[n_rows-1] the largest.
            DBT min_rowset_key = make_dbt(rows.data+rows.rows[0].off, rows.rows[0].klen);
            if (fs->have_sorted_output && compare(dest_db, &fs->prev_key, &min_rowset_key) < 0) {
                // write everything to the same output if the max key in the temp file (prev_key) is < min of the sorted rowset
                result = write_rowset_to_file(bl, fs->sorted_output, rows);
                if (result == 0) {
                    // set the max key in the temp file to the max key in the sorted rowset
                    result = toku_dbt_set(rows.rows[rows.n_rows-1].klen, rows.data + rows.rows[rows.n_rows-1].off, &fs->prev_key, NULL);
                }
            } else {
                // write the sorted rowset into a new temp file
                if (fs->have_sorted_output) {
                    // Close the previous output file; it no longer receives appends.
                    fs->have_sorted_output = false;
                    result = ft_loader_fi_close(&bl->file_infos, fs->sorted_output, true);
                }
                if (result == 0) {
                    FIDX sfile = FIDX_NULL;
                    result = extend_fileset(bl, fs, &sfile);
                    if (result == 0) {
                        result = write_rowset_to_file(bl, sfile, rows);
                        if (result == 0) {
                            fs->have_sorted_output = true; fs->sorted_output = sfile;
                            // set the max key in the temp file to the max key in the sorted rowset
                            result = toku_dbt_set(rows.rows[rows.n_rows-1].klen, rows.data + rows.rows[rows.n_rows-1].off, &fs->prev_key, NULL);
                        }
                    }
                }
                // Note: if result == 0 then invariant fs->have_sorted_output == true
            }
        }
    }

    // Free the buffers of the by-value copy (shared with the caller's rowset).
    destroy_rowset(&rows);

    //bl_time_t after_write = bl_time_now();

    return result;
}
1717
1718// C function for testing sort_and_write_rows
1719int ft_loader_sort_and_write_rows (struct rowset *rows, struct merge_fileset *fs, FTLOADER bl, int which_db, DB *dest_db, ft_compare_func compare) {
1720 return sort_and_write_rows (*rows, fs, bl, which_db, dest_db, compare);
1721}
1722
1723int toku_merge_some_files_using_dbufio(const bool to_q,
1724 FIDX dest_data,
1725 QUEUE q,
1726 int n_sources,
1727 DBUFIO_FILESET bfs,
1728 FIDX srcs_fidxs[/*n_sources*/],
1729 FTLOADER bl,
1730 int which_db,
1731 DB *dest_db,
1732 ft_compare_func compare,
1733 int progress_allocation)
1734/* Effect: Given an array of FILE*'s each containing sorted, merge the data and
1735 * write it to an output. All the files remain open after the merge.
1736 * This merge is performed in one pass, so don't pass too many files in. If
1737 * you need a tree of merges do it elsewhere.
1738 * If TO_Q is true then we write rowsets into queue Q. Otherwise we write
1739 * into dest_data.
1740 * Modifies: May modify the arrays of files (but if modified, it must be a
1741 * permutation so the caller can use that array to close everything.)
1742 * Requires: The number of sources is at least one, and each of the input files
1743 * must have at least one row in it.
1744 * Arguments:
1745 * to_q boolean indicating that output is queue (true) or a file
1746 * (false)
1747 * dest_data where to write the sorted data
1748 * q where to write the sorted data
1749 * n_sources how many source files.
1750 * srcs_data the array of source data files.
1751 * bl the ft_loader.
1752 * dest_db the destination DB (used in the comparison function).
1753 * Return value: 0 on success, otherwise an error number.
1754 * The fidxs are not closed by this function.
1755 */
1756{
1757 int result = 0;
1758
1759 TOKU_FILE *dest_stream = to_q ? nullptr : toku_bl_fidx2file(bl, dest_data);
1760
1761 // printf(" merge_some_files progress=%d fin at %d\n", bl->progress,
1762 // bl->progress+progress_allocation);
1763 DBT keys[n_sources];
1764 DBT vals[n_sources];
1765 uint64_t dataoff[n_sources];
1766 DBT zero = zero_dbt; zero.flags=DB_DBT_REALLOC;
1767
1768 for (int i=0; i<n_sources; i++) {
1769 keys[i] = vals[i] = zero; // fill these all in with zero so we can delete stuff more reliably.
1770 }
1771
1772 pqueue_t *pq = NULL;
1773 pqueue_node_t *MALLOC_N(n_sources, pq_nodes); // freed in cleanup
1774 if (pq_nodes == NULL) { result = get_error_errno(); }
1775
1776 if (result==0) {
1777 int r = pqueue_init(&pq, n_sources, which_db, dest_db, compare, &bl->error_callback);
1778 if (r!=0) result = r;
1779 }
1780
1781 uint64_t n_rows = 0;
1782 if (result==0) {
1783 // load pqueue with first value from each source
1784 for (int i=0; i<n_sources; i++) {
1785 int r = loader_read_row_from_dbufio(bfs, i, &keys[i], &vals[i]);
1786 if (r==EOF) continue; // if the file is empty, don't initialize the pqueue.
1787 if (r!=0) {
1788 result = r;
1789 break;
1790 }
1791
1792 pq_nodes[i].key = &keys[i];
1793 pq_nodes[i].val = &vals[i];
1794 pq_nodes[i].i = i;
1795 r = pqueue_insert(pq, &pq_nodes[i]);
1796 if (r!=0) {
1797 result = r;
1798 // path tested by loader-dup-test5.tdbrun
1799 // printf("%s:%d returning\n", __FILE__, __LINE__);
1800 break;
1801 }
1802
1803 dataoff[i] = 0;
1804 toku_mutex_lock(&bl->file_infos.lock);
1805 n_rows += bl->file_infos.file_infos[srcs_fidxs[i].idx].n_rows;
1806 toku_mutex_unlock(&bl->file_infos.lock);
1807 }
1808 }
1809 uint64_t n_rows_done = 0;
1810
1811 struct rowset *output_rowset = NULL;
1812 if (result==0 && to_q) {
1813 XMALLOC(output_rowset); // freed in cleanup
1814 int r = init_rowset(output_rowset, memory_per_rowset_during_merge(bl, n_sources, to_q));
1815 if (r!=0) result = r;
1816 }
1817
1818 // Allocate a buffer if we're compressing intermediates.
1819 char *uncompressed_buffer = nullptr;
1820 struct wbuf wb;
1821 if (bl->compress_intermediates && !to_q) {
1822 MALLOC_N(MAX_UNCOMPRESSED_BUF, uncompressed_buffer);
1823 if (uncompressed_buffer == nullptr) {
1824 result = ENOMEM;
1825 }
1826 }
1827 wbuf_init(&wb, uncompressed_buffer, MAX_UNCOMPRESSED_BUF);
1828
1829 //printf(" n_rows=%ld\n", n_rows);
1830 while (result==0 && pqueue_size(pq)>0) {
1831 int mini;
1832 {
1833 // get the minimum
1834 pqueue_node_t *node;
1835 int r = pqueue_pop(pq, &node);
1836 if (r!=0) {
1837 result = r;
1838 invariant(0);
1839 break;
1840 }
1841 mini = node->i;
1842 }
1843 if (to_q) {
1844 if (row_wont_fit(output_rowset, keys[mini].size + vals[mini].size)) {
1845 {
1846 int r = toku_queue_enq(q, (void*)output_rowset, 1, NULL);
1847 if (r!=0) {
1848 result = r;
1849 break;
1850 }
1851 }
1852 XMALLOC(output_rowset); // freed in cleanup
1853 {
1854 int r = init_rowset(output_rowset, memory_per_rowset_during_merge(bl, n_sources, to_q));
1855 if (r!=0) {
1856 result = r;
1857 break;
1858 }
1859 }
1860 }
1861 {
1862 int r = add_row(output_rowset, &keys[mini], &vals[mini]);
1863 if (r!=0) {
1864 result = r;
1865 break;
1866 }
1867 }
1868 } else {
1869 // write it to the dest file
1870 int r = loader_write_row(&keys[mini], &vals[mini], dest_data, dest_stream, &dataoff[mini], &wb, bl);
1871 if (r!=0) {
1872 result = r;
1873 break;
1874 }
1875 }
1876
1877 {
1878 // read next row from file that just sourced min value
1879 int r = loader_read_row_from_dbufio(bfs, mini, &keys[mini], &vals[mini]);
1880 if (r!=0) {
1881 if (r==EOF) {
1882 // on feof, queue size permanently smaller
1883 toku_free(keys[mini].data); keys[mini].data = NULL;
1884 toku_free(vals[mini].data); vals[mini].data = NULL;
1885 } else {
1886 fprintf(stderr, "%s:%d r=%d errno=%d bfs=%p mini=%d\n", __FILE__, __LINE__, r, get_maybe_error_errno(), bfs, mini);
1887 dbufio_print(bfs);
1888 result = r;
1889 break;
1890 }
1891 } else {
1892 // insert value into queue (re-populate queue)
1893 pq_nodes[mini].key = &keys[mini];
1894 r = pqueue_insert(pq, &pq_nodes[mini]);
1895 if (r!=0) {
1896 // Note: This error path tested by loader-dup-test1.tdbrun (and by loader-dup-test4)
1897 result = r;
1898 // printf("%s:%d returning\n", __FILE__, __LINE__);
1899 break;
1900 }
1901 }
1902 }
1903
1904 n_rows_done++;
1905 const uint64_t rows_per_report = size_factor*1024;
1906 if (n_rows_done%rows_per_report==0) {
1907 // need to update the progress.
1908 double fraction_of_remaining_we_just_did = (double)rows_per_report / (double)(n_rows - n_rows_done + rows_per_report);
1909 invariant(0<= fraction_of_remaining_we_just_did && fraction_of_remaining_we_just_did<=1);
1910 int progress_just_done = fraction_of_remaining_we_just_did * progress_allocation;
1911 progress_allocation -= progress_just_done;
1912 // ignore the result from update_progress here, we'll call update_progress again below, which will give us the nonzero result.
1913 int r = update_progress(progress_just_done, bl, "in file merge");
1914 if (0) printf("%s:%d Progress=%d\n", __FILE__, __LINE__, r);
1915 }
1916 }
1917 if (result == 0 && uncompressed_buffer != nullptr && wb.ndone > 0) {
1918 result = bl_finish_compressed_write(dest_stream, &wb);
1919 }
1920
1921 if (result==0 && to_q) {
1922 int r = toku_queue_enq(q, (void*)output_rowset, 1, NULL);
1923 if (r!=0)
1924 result = r;
1925 else
1926 output_rowset = NULL;
1927 }
1928
1929 // cleanup
1930 if (uncompressed_buffer) {
1931 toku_free(uncompressed_buffer);
1932 }
1933 for (int i=0; i<n_sources; i++) {
1934 toku_free(keys[i].data); keys[i].data = NULL;
1935 toku_free(vals[i].data); vals[i].data = NULL;
1936 }
1937 if (output_rowset) {
1938 destroy_rowset(output_rowset);
1939 toku_free(output_rowset);
1940 }
1941 if (pq) { pqueue_free(pq); pq=NULL; }
1942 toku_free(pq_nodes);
1943 {
1944 int r = update_progress(progress_allocation, bl, "end of merge_some_files");
1945 //printf("%s:%d Progress=%d\n", __FILE__, __LINE__, r);
1946 if (r!=0 && result==0) result = r;
1947 }
1948 return result;
1949}
1950
1951static int merge_some_files (const bool to_q, FIDX dest_data, QUEUE q, int n_sources, FIDX srcs_fidxs[/*n_sources*/], FTLOADER bl, int which_db, DB *dest_db, ft_compare_func compare, int progress_allocation)
1952{
1953 int result = 0;
1954 DBUFIO_FILESET bfs = NULL;
1955 int *MALLOC_N(n_sources, fds);
1956 if (fds == NULL)
1957 result = get_error_errno();
1958 if (result == 0) {
1959 for (int i = 0; i < n_sources; i++) {
1960 int r = fileno(
1961 toku_bl_fidx2file(bl, srcs_fidxs[i])->file); // we rely on the
1962 // fact that when
1963 // the files are
1964 // closed, the fd
1965 // is also closed.
1966 if (r == -1) {
1967 result = get_error_errno();
1968 break;
1969 }
1970 fds[i] = r;
1971 }
1972 }
1973 if (result==0) {
1974 int r = create_dbufio_fileset(&bfs, n_sources, fds,
1975 memory_per_rowset_during_merge(bl, n_sources, to_q), bl->compress_intermediates);
1976 if (r!=0) { result = r; }
1977 }
1978
1979 if (result==0) {
1980 int r = toku_merge_some_files_using_dbufio (to_q, dest_data, q, n_sources, bfs, srcs_fidxs, bl, which_db, dest_db, compare, progress_allocation);
1981 if (r!=0) { result = r; }
1982 }
1983
1984 if (bfs!=NULL) {
1985 if (result != 0)
1986 (void) panic_dbufio_fileset(bfs, result);
1987 int r = destroy_dbufio_fileset(bfs);
1988 if (r!=0 && result==0) result=r;
1989 bfs = NULL;
1990 }
1991 if (fds!=NULL) {
1992 toku_free(fds);
1993 fds = NULL;
1994 }
1995 return result;
1996}
1997
static int int_min (int a, int b)
// Return the smaller of a and b.
{
    return (b < a) ? b : a;
}
2003
// Number of merge passes with fan-in B needed to reduce N files to at most one
// (0 when N <= 1).  Each pass replaces N with ceil(N/B).
// NOTE(review): assumes B >= 2; with B == 1 the loop would never terminate.
static int n_passes (int N, int B) {
    int passes;
    for (passes = 0; N > 1; passes++) {
        N = (N + B - 1) / B;
    }
    return passes;
}
2012
int merge_files (struct merge_fileset *fs,
                 FTLOADER bl,
                 // These are needed for the comparison function and error callback.
                 int which_db, DB *dest_db, ft_compare_func compare,
                 int progress_allocation,
                 // Write rowsets into this queue.
                 QUEUE output_q
                 )
/* Effect: Given a fileset, merge all the files writing all the answers into a queue.
 *   All the files in fs, and any temporary files will be closed and unlinked (and the fileset will be empty)
 * Return value: 0 on success, otherwise an error number.
 *   On error *fs will contain no open files.  All the files (including any temporary files) will be closed and unlinked.
 *    (however the fs will still need to be deallocated.)
 * Strategy: repeated passes over fs.  While more than final_mergelimit files
 *   remain, a pass merges groups of up to earlier_mergelimit files into new
 *   temporary files (collected in next_file_set).  The last pass merges the
 *   remaining files directly into output_q.  Each pass takes the remaining
 *   progress_allocation divided by the number of passes left.  Within a pass,
 *   each group's share is proportional to the number of files it merges.
 *   toku_queue_eof(output_q) is called whether or not an error occurred.
 */
{
    //printf(" merge_files %d files\n", fs->n_temp_files);
    //printf(" merge_files use %d progress=%d fin at %d\n", progress_allocation, bl->progress, bl->progress+progress_allocation);
    // Fan-in limits for the last pass (into output_q) and for the earlier passes.
    // NOTE(review): size_factor==1 forces a fan-in of 4; presumably a small/test configuration -- confirm.
    const int final_mergelimit   = (size_factor == 1) ? 4 : merge_fanin(bl, true); // try for a merge to the leaf level
    const int earlier_mergelimit = (size_factor == 1) ? 4 : merge_fanin(bl, false); // try for a merge at nonleaf.
    // One final pass, plus the number of earlier passes needed to bring
    // ceil(n_temp_files/final_mergelimit) groups down to one.
    int n_passes_left = (fs->n_temp_files<=final_mergelimit)
        ? 1
        : 1+n_passes((fs->n_temp_files+final_mergelimit-1)/final_mergelimit, earlier_mergelimit);
    // printf("%d files, %d on last pass, %d on earlier passes, %d passes\n", fs->n_temp_files, final_mergelimit, earlier_mergelimit, n_passes_left);
    int result = 0;
    while (fs->n_temp_files > 0) {
        // Give this pass an equal share of the progress that is still unallocated.
        int progress_allocation_for_this_pass = progress_allocation/n_passes_left;
        progress_allocation -= progress_allocation_for_this_pass;
        //printf("%s:%d n_passes_left=%d progress_allocation_for_this_pass=%d\n", __FILE__, __LINE__, n_passes_left, progress_allocation_for_this_pass);

        invariant(fs->n_temp_files>0);
        struct merge_fileset next_file_set;
        // If few enough files remain, this is the last pass: merge straight into output_q.
        // Otherwise merge into new temporary files that form next_file_set.
        bool to_queue = (bool)(fs->n_temp_files <= final_mergelimit);
        init_merge_fileset(&next_file_set);
        while (fs->n_temp_files>0) {
            // grab some files and merge them.
            int n_to_merge = int_min(to_queue?final_mergelimit:earlier_mergelimit, fs->n_temp_files);

            // We are about to do n_to_merge/n_temp_files of the remaining for this pass.
            int progress_allocation_for_this_subpass = progress_allocation_for_this_pass * (double)n_to_merge / (double)fs->n_temp_files;
            // printf("%s:%d progress_allocation_for_this_subpass=%d n_temp_files=%d b=%llu\n", __FILE__, __LINE__, progress_allocation_for_this_subpass, fs->n_temp_files, (long long unsigned) memory_per_rowset_during_merge(bl, n_to_merge, to_queue));
            progress_allocation_for_this_pass -= progress_allocation_for_this_subpass;

            //printf("%s:%d merging\n", __FILE__, __LINE__);
            FIDX merged_data = FIDX_NULL;

            // Reopen the last n_to_merge files of fs for reading.  An entry stays
            // FIDX_NULL if its file was not reopened, so the cleanup loop below skips it.
            FIDX *XMALLOC_N(n_to_merge, data_fidxs);
            for (int i=0; i<n_to_merge; i++) {
                data_fidxs[i] = FIDX_NULL;
            }
            for (int i=0; i<n_to_merge; i++) {
                int idx = fs->n_temp_files -1 -i;
                FIDX fidx = fs->data_fidxs[idx];
                result = ft_loader_fi_reopen(&bl->file_infos, fidx, "r");
                if (result) break;
                data_fidxs[i] = fidx;
            }
            // Earlier passes need a fresh temporary output file, added to next_file_set.
            if (result==0 && !to_queue) {
                result = extend_fileset(bl, &next_file_set, &merged_data);
            }

            if (result==0) {
                result = merge_some_files(to_queue, merged_data, output_q, n_to_merge, data_fidxs, bl, which_db, dest_db, compare, progress_allocation_for_this_subpass);
                // if result!=0, fall through
                if (result==0) {
                    /*nothing*/;// this is gratuitous, but we need something to give code coverage tools to help us know that it's important to distinguish between result==0 and result!=0
                }
            }

            //printf("%s:%d merged\n", __FILE__, __LINE__);
            // Close and unlink every source file that was reopened, whether or not
            // the merge succeeded.  The first error is kept.
            for (int i=0; i<n_to_merge; i++) {
                if (!fidx_is_null(data_fidxs[i])) {
                    {
                        int r = ft_loader_fi_close(&bl->file_infos, data_fidxs[i], true);
                        if (r!=0 && result==0) result = r;
                    }
                    {
                        int r = ft_loader_fi_unlink(&bl->file_infos, data_fidxs[i]);
                        if (r!=0 && result==0) result = r;
                    }
                    data_fidxs[i] = FIDX_NULL;
                }
            }

            fs->n_temp_files -= n_to_merge;
            // The merged output belongs to next_file_set.  Only close it here; it is
            // reopened when the next pass merges it.
            if (!to_queue && !fidx_is_null(merged_data)) {
                int r = ft_loader_fi_close(&bl->file_infos, merged_data, true);
                if (r!=0 && result==0) result = r;
            }
            toku_free(data_fidxs);

            if (result!=0) break;
        }

        // The files produced by this pass become the input of the next one.
        destroy_merge_fileset(fs);
        *fs = next_file_set;

        // Update the progress
        n_passes_left--;

        // The last subpass merges all remaining files (ratio 1), so it takes the rest of the pass allocation.
        if (result==0) { invariant(progress_allocation_for_this_pass==0); }

        if (result!=0) break;
    }
    if (result) ft_loader_set_panic(bl, result, true, which_db, nullptr, nullptr);

    // Always mark output_q EOF, even after an error, so its consumer stops dequeuing.
    {
        int r = toku_queue_eof(output_q);
        if (r!=0 && result==0) result = r;
    }
    // It's conceivable that the progress_allocation could be nonzero (for example if bl->N==0)
    {
        int r = update_progress(progress_allocation, bl, "did merge_files");
        if (r!=0 && result==0) result = r;
    }
    return result;
}
2129
// One element of struct subtrees_info: the block number recorded by allocate_node().
struct subtree_info {
    int64_t block;  // block number of a node written by the loader
};
2133
// Growable array of subtree_info records; allocate_node() appends to it.
struct subtrees_info {
    int64_t next_free_block;       // NOTE(review): initialised to 3 by toku_loader_write_ft_from_q; not read in this section
    int64_t n_subtrees;  // was n_blocks -- number of valid entries in subtrees
    int64_t n_subtrees_limit;      // allocated capacity of subtrees, in entries
    struct subtree_info *subtrees; // heap array, released by subtrees_info_destroy()
};
2140
2141static void subtrees_info_init(struct subtrees_info *p) {
2142 p->next_free_block = p->n_subtrees = p->n_subtrees_limit = 0;
2143 p->subtrees = NULL;
2144}
2145
// Free the subtree array and clear the pointer; the counters are left unchanged.
static void subtrees_info_destroy(struct subtrees_info *p) {
    toku_free(p->subtrees);
    p->subtrees = NULL;
}
2150
2151static void allocate_node (struct subtrees_info *sts, int64_t b) {
2152 if (sts->n_subtrees >= sts->n_subtrees_limit) {
2153 sts->n_subtrees_limit *= 2;
2154 XREALLOC_N(sts->n_subtrees_limit, sts->subtrees);
2155 }
2156 sts->subtrees[sts->n_subtrees].block = b;
2157 sts->n_subtrees++;
2158}
2159
// A dbuf always holds a 512-byte aligned buffer, but its length might not be a multiple of 512 bytes.  If you need that, pad it yourself.
struct dbuf {
    unsigned char *buf;  // heap buffer; putbuf_bytes grows it with REALLOC_N_ALIGNED(512, ...)
    int buflen;          // allocated size of buf, in bytes
    int off;             // number of bytes appended so far
    int error;           // errno from the first failed growth (0 if none); later appends are skipped
};
2167
// State for the leaf node currently being filled; created by start_leaf().
struct leaf_buf {
    BLOCKNUM blocknum;             // block number of this leaf
    TXNID xid;                     // xid passed to start_leaf
    uint64_t nkeys, ndata, dsize;  // counters, zeroed by start_leaf
    FTNODE node;                   // in-memory height-0 node being built
    XIDS xids;                     // root xids, or a child of them for xid when xid != TXNID_NONE
    uint64_t off;                  // size accumulated so far (toku_loader_write_ft_from_q compares it with the target node size)
};
2176
// One block-translation entry: the file offset and byte size of a block.
// toku_loader_write_ft_from_q uses negative offsets to mark reserved entries
// that have not been filled in yet.
struct translation {
    int64_t off, size;
};
2180
// Output state for the dictionary file being written.
struct dbout {
    int fd;                   // output file descriptor, -1 when none is attached
    toku_off_t current_off;   // file offset tracked for the next write

    int64_t n_translations;        // block numbers handed out so far (see allocate_block)
    int64_t n_translations_limit;  // allocated capacity of translation, in entries
    struct translation *translation;  // block translation table, indexed by block number
    toku_mutex_t mutex;       // taken by allocate_block and by seek_align
    FT ft;
};
2191
2192static inline void dbout_init(struct dbout *out, FT ft) {
2193 out->fd = -1;
2194 out->current_off = 0;
2195 out->n_translations = out->n_translations_limit = 0;
2196 out->translation = NULL;
2197 toku_mutex_init(*loader_out_mutex_key, &out->mutex, nullptr);
2198 out->ft = ft;
2199}
2200
2201static inline void dbout_destroy(struct dbout *out) {
2202 if (out->fd >= 0) {
2203 toku_os_close(out->fd);
2204 out->fd = -1;
2205 }
2206 toku_free(out->translation);
2207 out->translation = NULL;
2208 toku_mutex_destroy(&out->mutex);
2209}
2210
// Acquire out->mutex.
static inline void dbout_lock(struct dbout *out) {
    toku_mutex_lock(&out->mutex);
}
2214
// Release out->mutex.
static inline void dbout_unlock(struct dbout *out) {
    toku_mutex_unlock(&out->mutex);
}
2218
2219static void seek_align_locked(struct dbout *out) {
2220 toku_off_t old_current_off = out->current_off;
2221 int alignment = 4096;
2222 out->current_off += alignment-1;
2223 out->current_off &= ~(alignment-1);
2224 toku_off_t r = lseek(out->fd, out->current_off, SEEK_SET);
2225 invariant(r==out->current_off);
2226 invariant(out->current_off >= old_current_off);
2227 invariant(out->current_off < old_current_off+alignment);
2228 invariant(out->current_off % alignment == 0);
2229}
2230
// Locked wrapper: align out->current_off (and the file position) to 4096 bytes while holding out->mutex.
static void seek_align(struct dbout *out) {
    dbout_lock(out);
    seek_align_locked(out);
    dbout_unlock(out);
}
2236
2237static void dbuf_init (struct dbuf *dbuf) {
2238 dbuf->buf = 0;
2239 dbuf->buflen = 0;
2240 dbuf->off = 0;
2241 dbuf->error = 0;
2242}
2243
// Free the buffer owned by dbuf and clear the pointer; buflen/off/error are left unchanged.
static void dbuf_destroy (struct dbuf *dbuf) {
    toku_free(dbuf->buf); dbuf->buf = NULL;
}
2247
2248static int allocate_block (struct dbout *out, int64_t *ret_block_number)
2249// Return the new block number
2250{
2251 int result = 0;
2252 dbout_lock(out);
2253 int64_t block_number = out->n_translations;
2254 if (block_number >= out->n_translations_limit) {
2255 int64_t old_n_translations_limit = out->n_translations_limit;
2256 struct translation *old_translation = out->translation;
2257 if (out->n_translations_limit==0) {
2258 out->n_translations_limit = 1;
2259 } else {
2260 out->n_translations_limit *= 2;
2261 }
2262 REALLOC_N(out->n_translations_limit, out->translation);
2263 if (out->translation == NULL) {
2264 result = get_error_errno();
2265 invariant(result);
2266 out->n_translations_limit = old_n_translations_limit;
2267 out->translation = old_translation;
2268 goto cleanup;
2269 }
2270 }
2271 out->n_translations++;
2272 *ret_block_number = block_number;
2273cleanup:
2274 dbout_unlock(out);
2275 return result;
2276}
2277
2278static void putbuf_bytes (struct dbuf *dbuf, const void *bytes, int nbytes) {
2279 if (!dbuf->error && dbuf->off + nbytes > dbuf->buflen) {
2280 unsigned char *oldbuf = dbuf->buf;
2281 int oldbuflen = dbuf->buflen;
2282 dbuf->buflen += dbuf->off + nbytes;
2283 dbuf->buflen *= 2;
2284 REALLOC_N_ALIGNED(512, dbuf->buflen, dbuf->buf);
2285 if (dbuf->buf == NULL) {
2286 dbuf->error = get_error_errno();
2287 dbuf->buf = oldbuf;
2288 dbuf->buflen = oldbuflen;
2289 }
2290 }
2291 if (!dbuf->error) {
2292 memcpy(dbuf->buf + dbuf->off, bytes, nbytes);
2293 dbuf->off += nbytes;
2294 }
2295}
2296
// Append the 4 bytes of v to dbuf as a raw memory copy (host byte order).
// NOTE(review): assumes sizeof(int) == 4.
static void putbuf_int32 (struct dbuf *dbuf, int v) {
    putbuf_bytes(dbuf, &v, 4);
}
2300
2301static void putbuf_int64 (struct dbuf *dbuf, long long v) {
2302 putbuf_int32(dbuf, v>>32);
2303 putbuf_int32(dbuf, v&0xFFFFFFFF);
2304}
2305
2306static struct leaf_buf *start_leaf (struct dbout *out, const DESCRIPTOR UU(desc), int64_t lblocknum, TXNID xid, uint32_t UU(target_nodesize)) {
2307 invariant(lblocknum < out->n_translations_limit);
2308
2309 struct leaf_buf *XMALLOC(lbuf);
2310 lbuf->blocknum.b = lblocknum;
2311 lbuf->xid = xid;
2312 lbuf->nkeys = lbuf->ndata = lbuf->dsize = 0;
2313 lbuf->off = 0;
2314
2315 lbuf->xids = toku_xids_get_root_xids();
2316 if (xid != TXNID_NONE) {
2317 XIDS new_xids = NULL;
2318 int r = toku_xids_create_child(lbuf->xids, &new_xids, xid);
2319 assert(r == 0 && new_xids);
2320 toku_xids_destroy(&lbuf->xids);
2321 lbuf->xids = new_xids;
2322 }
2323
2324 FTNODE XMALLOC(node);
2325 toku_initialize_empty_ftnode(node, lbuf->blocknum, 0 /*height*/, 1 /*basement nodes*/, FT_LAYOUT_VERSION, 0);
2326 BP_STATE(node, 0) = PT_AVAIL;
2327 lbuf->node = node;
2328
2329 return lbuf;
2330}
2331
// Forward declarations of static helpers that are defined later in this file
// and used by toku_loader_write_ft_from_q below.
static void finish_leafnode(
    struct dbout* out,
    struct leaf_buf* lbuf,
    int progress_allocation,
    FTLOADER bl,
    uint32_t target_basementnodesize,
    enum toku_compression_method target_compression_method);

static int write_nonleaves(
    FTLOADER bl,
    FIDX pivots_fidx,
    struct dbout* out,
    struct subtrees_info* sts,
    const DESCRIPTOR descriptor,
    uint32_t target_nodesize,
    uint32_t target_basementnodesize,
    enum toku_compression_method target_compression_method);

static void add_pair_to_leafnode(
    struct leaf_buf* lbuf,
    unsigned char* key,
    int keylen,
    unsigned char* val,
    int vallen,
    int this_leafentry_size,
    STAT64INFO stats_to_update,
    int64_t* logical_rows_delta);

static int write_translation_table(
    struct dbout* out,
    long long* off_of_translation_p);

static int write_header(
    struct dbout* out,
    long long translation_location_on_disk,
    long long translation_size_on_disk);
2368
2369static void drain_writer_q(QUEUE q) {
2370 void *item;
2371 while (1) {
2372 int r = toku_queue_deq(q, &item, NULL, NULL);
2373 if (r == EOF)
2374 break;
2375 invariant(r == 0);
2376 struct rowset *rowset = (struct rowset *) item;
2377 destroy_rowset(rowset);
2378 toku_free(rowset);
2379 }
2380}
2381
2382static void cleanup_maxkey(DBT *maxkey) {
2383 if (maxkey->flags == DB_DBT_REALLOC) {
2384 toku_free(maxkey->data);
2385 maxkey->data = NULL;
2386 maxkey->flags = 0;
2387 }
2388}
2389
// Release any data maxkey owns (see cleanup_maxkey), then make it a shallow copy of *key.
static void update_maxkey(DBT *maxkey, DBT *key) {
    cleanup_maxkey(maxkey);
    *maxkey = *key;
}
2394
2395static int copy_maxkey(DBT *maxkey) {
2396 DBT newkey;
2397 toku_init_dbt_flags(&newkey, DB_DBT_REALLOC);
2398 int r = toku_dbt_set(maxkey->size, maxkey->data, &newkey, NULL);
2399 if (r == 0)
2400 update_maxkey(maxkey, &newkey);
2401 return r;
2402}
2403
2404static int toku_loader_write_ft_from_q (FTLOADER bl,
2405 const DESCRIPTOR descriptor,
2406 int fd, // write to here
2407 int progress_allocation,
2408 QUEUE q,
2409 uint64_t total_disksize_estimate,
2410 int which_db,
2411 uint32_t target_nodesize,
2412 uint32_t target_basementnodesize,
2413 enum toku_compression_method target_compression_method,
2414 uint32_t target_fanout)
2415// Effect: Consume a sequence of rowsets work from a queue, creating a fractal tree. Closes fd.
2416{
2417 // set the number of fractal tree writer threads so that we can partition memory in the merger
2418 ft_loader_set_fractal_workers_count(bl);
2419
2420 int result = 0;
2421 int r;
2422
2423 // The pivots file will contain all the pivot strings (in the form <size(32bits)> <data>)
2424 // The pivots_fname is the name of the pivots file.
2425 // Note that the pivots file will have one extra pivot in it (the last key in the dictionary) which will not appear in the tree.
2426 int64_t n_pivots=0; // number of pivots in pivots_file
2427 FIDX pivots_file; // the file
2428
2429 r = ft_loader_open_temp_file (bl, &pivots_file);
2430 if (r) {
2431 result = r;
2432 drain_writer_q(q);
2433 r = toku_os_close(fd);
2434 assert_zero(r);
2435 return result;
2436 }
2437 TOKU_FILE *pivots_stream = toku_bl_fidx2file(bl, pivots_file);
2438
2439 TXNID root_xid_that_created = TXNID_NONE;
2440 if (bl->root_xids_that_created)
2441 root_xid_that_created = bl->root_xids_that_created[which_db];
2442
2443 // TODO: (Zardosht/Yoni/Leif), do this code properly
2444 struct ft ft;
2445 toku_ft_init(&ft, (BLOCKNUM){0}, bl->load_lsn, root_xid_that_created, target_nodesize, target_basementnodesize, target_compression_method, target_fanout);
2446
2447 struct dbout out;
2448 ZERO_STRUCT(out);
2449 dbout_init(&out, &ft);
2450 out.fd = fd;
2451 out.current_off = 8192; // leave 8K reserved at beginning
2452 out.n_translations = 3; // 3 translations reserved at the beginning
2453 out.n_translations_limit = 4;
2454 MALLOC_N(out.n_translations_limit, out.translation);
2455 if (out.translation == NULL) {
2456 result = get_error_errno();
2457 dbout_destroy(&out);
2458 drain_writer_q(q);
2459 toku_free(ft.h);
2460 return result;
2461 }
2462
2463 // The blocks_array will contain all the block numbers that correspond to the pivots. Generally there should be one more block than pivot.
2464 struct subtrees_info sts;
2465 subtrees_info_init(&sts);
2466 sts.next_free_block = 3;
2467 sts.n_subtrees = 0;
2468 sts.n_subtrees_limit = 1;
2469 MALLOC_N(sts.n_subtrees_limit, sts.subtrees);
2470 if (sts.subtrees == NULL) {
2471 result = get_error_errno();
2472 subtrees_info_destroy(&sts);
2473 dbout_destroy(&out);
2474 drain_writer_q(q);
2475 toku_free(ft.h);
2476 return result;
2477 }
2478
2479 out.translation[0].off = -2LL; out.translation[0].size = 0; // block 0 is NULL
2480 invariant(1==RESERVED_BLOCKNUM_TRANSLATION);
2481 invariant(2==RESERVED_BLOCKNUM_DESCRIPTOR);
2482 out.translation[1].off = -1; // block 1 is the block translation, filled in later
2483 out.translation[2].off = -1; // block 2 is the descriptor
2484 seek_align(&out);
2485 int64_t lblock = 0; // make gcc --happy
2486 result = allocate_block(&out, &lblock);
2487 invariant(result == 0); // can not fail since translations reserved above
2488
2489 TXNID le_xid = leafentry_xid(bl, which_db);
2490 struct leaf_buf *lbuf = start_leaf(&out, descriptor, lblock, le_xid, target_nodesize);
2491 uint64_t n_rows_remaining = bl->n_rows;
2492 uint64_t old_n_rows_remaining = bl->n_rows;
2493
2494 uint64_t used_estimate = 0; // how much diskspace have we used up?
2495
2496 DBT maxkey = make_dbt(0, 0); // keep track of the max key of the current node
2497
2498 STAT64INFO_S deltas = ZEROSTATS;
2499 // This is just a placeholder and not used in the loader, the real/accurate
2500 // stats will come out of 'deltas' because this loader is not pushing
2501 // messages down into the top of a fractal tree where the logical row count
2502 // is done, it is directly creating leaf entries so it must also take on
2503 // performing the logical row counting on its own
2504 int64_t logical_rows_delta = 0;
2505 while (result == 0) {
2506 void *item;
2507 {
2508 int rr = toku_queue_deq(q, &item, NULL, NULL);
2509 if (rr == EOF) break;
2510 if (rr != 0) {
2511 ft_loader_set_panic(bl, rr, true, which_db, nullptr, nullptr);
2512 break;
2513 }
2514 }
2515 struct rowset *output_rowset = (struct rowset *)item;
2516
2517 for (unsigned int i = 0; i < output_rowset->n_rows; i++) {
2518 DBT key = make_dbt(output_rowset->data+output_rowset->rows[i].off, output_rowset->rows[i].klen);
2519 DBT val = make_dbt(output_rowset->data+output_rowset->rows[i].off + output_rowset->rows[i].klen, output_rowset->rows[i].vlen);
2520
2521 size_t this_leafentry_size = ft_loader_leafentry_size(key.size, val.size, le_xid);
2522
2523 used_estimate += this_leafentry_size;
2524
2525 // Spawn off a node if
2526 // a) there is at least one row in it, and
2527 // b) this item would make the nodesize too big, or
2528 // c) the remaining amount won't fit in the current node and the current node's data is more than the remaining amount
2529 uint64_t remaining_amount = total_disksize_estimate - used_estimate;
2530 uint64_t used_here = lbuf->off + 1000; // leave 1000 for various overheads.
2531 uint64_t target_size = (target_nodesize*7L)/8; // use only 7/8 of the node.
2532 uint64_t used_here_with_next_key = used_here + this_leafentry_size;
2533 if (lbuf->nkeys > 0 &&
2534 ((used_here_with_next_key >= target_size) || (used_here + remaining_amount >= target_size && lbuf->off > remaining_amount))) {
2535
2536 int progress_this_node = progress_allocation * (double)(old_n_rows_remaining - n_rows_remaining)/(double)old_n_rows_remaining;
2537 progress_allocation -= progress_this_node;
2538 old_n_rows_remaining = n_rows_remaining;
2539
2540 allocate_node(&sts, lblock);
2541
2542 n_pivots++;
2543
2544 invariant(maxkey.data != NULL);
2545 if ((r = bl_write_dbt(&maxkey, pivots_stream, NULL, nullptr, bl))) {
2546 ft_loader_set_panic(bl, r, true, which_db, nullptr, nullptr);
2547 if (result == 0) result = r;
2548 break;
2549 }
2550
2551 finish_leafnode(&out, lbuf, progress_this_node, bl, target_basementnodesize, target_compression_method);
2552 lbuf = NULL;
2553
2554 r = allocate_block(&out, &lblock);
2555 if (r != 0) {
2556 ft_loader_set_panic(bl, r, true, which_db, nullptr, nullptr);
2557 if (result == 0) result = r;
2558 break;
2559 }
2560 lbuf = start_leaf(&out, descriptor, lblock, le_xid, target_nodesize);
2561 }
2562
2563 add_pair_to_leafnode(
2564 lbuf,
2565 (unsigned char*)key.data,
2566 key.size,
2567 (unsigned char*)val.data,
2568 val.size,
2569 this_leafentry_size,
2570 &deltas,
2571 &logical_rows_delta);
2572 n_rows_remaining--;
2573
2574 update_maxkey(&maxkey, &key); // set the new maxkey to the current key
2575 }
2576
2577 r = copy_maxkey(&maxkey); // make a copy of maxkey before the rowset is destroyed
2578 if (result == 0)
2579 result = r;
2580 destroy_rowset(output_rowset);
2581 toku_free(output_rowset);
2582
2583 if (result == 0)
2584 result = ft_loader_get_error(&bl->error_callback); // check if an error was posted and terminate this quickly
2585 }
2586
2587 if (deltas.numrows || deltas.numbytes) {
2588 toku_ft_update_stats(&ft.in_memory_stats, deltas);
2589 }
2590
2591 // As noted above, the loader directly creates a tree structure without
2592 // going through the higher level ft API and tus bypasses the logical row
2593 // counting performed at that level. So, we must manually update the logical
2594 // row count with the info we have from the physical delta that comes out of
2595 // add_pair_to_leafnode.
2596 toku_ft_adjust_logical_row_count(&ft, deltas.numrows);
2597
2598 cleanup_maxkey(&maxkey);
2599
2600 if (lbuf) {
2601 allocate_node(&sts, lblock);
2602 {
2603 int p = progress_allocation/2;
2604 finish_leafnode(&out, lbuf, p, bl, target_basementnodesize, target_compression_method);
2605 progress_allocation -= p;
2606 }
2607 }
2608
2609
2610 if (result == 0) {
2611 result = ft_loader_get_error(&bl->error_callback); // if there were any prior errors then exit
2612 }
2613
2614 if (result != 0) goto error;
2615
2616 // We haven't paniced, so the sum should add up.
2617 invariant(used_estimate == total_disksize_estimate);
2618
2619 n_pivots++;
2620
2621 {
2622 DBT key = make_dbt(0,0); // must write an extra DBT into the pivots file.
2623 r = bl_write_dbt(&key, pivots_stream, NULL, nullptr, bl);
2624 if (r) {
2625 result = r; goto error;
2626 }
2627 }
2628
2629 r = write_nonleaves(bl, pivots_file, &out, &sts, descriptor, target_nodesize, target_basementnodesize, target_compression_method);
2630 if (r) {
2631 result = r; goto error;
2632 }
2633
2634 {
2635 invariant(sts.n_subtrees==1);
2636 out.ft->h->root_blocknum = make_blocknum(sts.subtrees[0].block);
2637 toku_free(sts.subtrees); sts.subtrees = NULL;
2638
2639 // write the descriptor
2640 {
2641 seek_align(&out);
2642 invariant(out.n_translations >= RESERVED_BLOCKNUM_DESCRIPTOR);
2643 invariant(out.translation[RESERVED_BLOCKNUM_DESCRIPTOR].off == -1);
2644 out.translation[RESERVED_BLOCKNUM_DESCRIPTOR].off = out.current_off;
2645 size_t desc_size = 4+toku_serialize_descriptor_size(descriptor);
2646 invariant(desc_size>0);
2647 out.translation[RESERVED_BLOCKNUM_DESCRIPTOR].size = desc_size;
2648 struct wbuf wbuf;
2649 char *XMALLOC_N(desc_size, buf);
2650 wbuf_init(&wbuf, buf, desc_size);
2651 toku_serialize_descriptor_contents_to_wbuf(&wbuf, descriptor);
2652 uint32_t checksum = toku_x1764_finish(&wbuf.checksum);
2653 wbuf_int(&wbuf, checksum);
2654 invariant(wbuf.ndone==desc_size);
2655 r = toku_os_write(out.fd, wbuf.buf, wbuf.ndone);
2656 out.current_off += desc_size;
2657 toku_free(buf); // wbuf_destroy
2658 if (r) {
2659 result = r; goto error;
2660 }
2661 }
2662
2663 long long off_of_translation;
2664 r = write_translation_table(&out, &off_of_translation);
2665 if (r) {
2666 result = r; goto error;
2667 }
2668
2669 r = write_header(&out, off_of_translation, (out.n_translations+1)*16+4);
2670 if (r) {
2671 result = r; goto error;
2672 }
2673
2674 r = update_progress(progress_allocation, bl, "wrote tdb file");
2675 if (r) {
2676 result = r; goto error;
2677 }
2678 }
2679
2680 r = fsync(out.fd);
2681 if (r) {
2682 result = get_error_errno(); goto error;
2683 }
2684
2685 // Do we need to pay attention to user_said_stop? Or should the guy at the other end of the queue pay attention and send in an EOF.
2686
2687 error:
2688 {
2689 int rr = toku_os_close(fd);
2690 if (rr)
2691 result = get_error_errno();
2692 }
2693 out.fd = -1;
2694
2695 subtrees_info_destroy(&sts);
2696 dbout_destroy(&out);
2697 drain_writer_q(q);
2698 toku_free(ft.h);
2699
2700 return result;
2701}
2702
2703int toku_loader_write_ft_from_q_in_C (FTLOADER bl,
2704 const DESCRIPTOR descriptor,
2705 int fd, // write to here
2706 int progress_allocation,
2707 QUEUE q,
2708 uint64_t total_disksize_estimate,
2709 int which_db,
2710 uint32_t target_nodesize,
2711 uint32_t target_basementnodesize,
2712 enum toku_compression_method target_compression_method,
2713 uint32_t target_fanout)
2714// This is probably only for testing.
2715{
2716 target_nodesize = target_nodesize == 0 ? default_loader_nodesize : target_nodesize;
2717 target_basementnodesize = target_basementnodesize == 0 ? default_loader_basementnodesize : target_basementnodesize;
2718 return toku_loader_write_ft_from_q (bl, descriptor, fd, progress_allocation, q, total_disksize_estimate, which_db, target_nodesize, target_basementnodesize, target_compression_method, target_fanout);
2719}
2720
2721
2722static void* fractal_thread (void *ftav) {
2723 struct fractal_thread_args *fta = (struct fractal_thread_args *)ftav;
2724 int r = toku_loader_write_ft_from_q(fta->bl,
2725 fta->descriptor,
2726 fta->fd,
2727 fta->progress_allocation,
2728 fta->q,
2729 fta->total_disksize_estimate,
2730 fta->which_db,
2731 fta->target_nodesize,
2732 fta->target_basementnodesize,
2733 fta->target_compression_method,
2734 fta->target_fanout);
2735 fta->errno_result = r;
2736 toku_instr_delete_current_thread();
2737 return toku_pthread_done(nullptr);
2738}
2739
2740static int loader_do_i(FTLOADER bl,
2741 int which_db,
2742 DB *dest_db,
2743 ft_compare_func compare,
2744 const DESCRIPTOR descriptor,
2745 const char *new_fname,
2746 int progress_allocation // how much progress do I need
2747 // to add into bl->progress by
2748 // the end..
2749 )
2750/* Effect: Handle the file creating for one particular DB in the bulk loader. */
2751/* Requires: The data is fully extracted, so we can do merges out of files and
2752 write the ft file. */
2753{
2754 //printf("doing i use %d progress=%d fin at %d\n", progress_allocation, bl->progress, bl->progress+progress_allocation);
2755 struct merge_fileset *fs = &(bl->fs[which_db]);
2756 struct rowset *rows = &(bl->rows[which_db]);
2757 invariant(rows->data==NULL); // the rows should be all cleaned up already
2758
2759 int r = toku_queue_create(&bl->fractal_queues[which_db], FRACTAL_WRITER_QUEUE_DEPTH);
2760 if (r) goto error;
2761
2762 {
2763 mode_t mode = S_IRUSR + S_IWUSR + S_IRGRP + S_IWGRP;
2764 int fd = toku_os_open(new_fname,
2765 O_RDWR | O_CREAT | O_BINARY,
2766 mode,
2767 *tokudb_file_load_key); // #2621
2768 if (fd < 0) {
2769 r = get_error_errno();
2770 goto error;
2771 }
2772
2773 uint32_t target_nodesize, target_basementnodesize, target_fanout;
2774 enum toku_compression_method target_compression_method;
2775 r = dest_db->get_pagesize(dest_db, &target_nodesize);
2776 invariant_zero(r);
2777 r = dest_db->get_readpagesize(dest_db, &target_basementnodesize);
2778 invariant_zero(r);
2779 r = dest_db->get_compression_method(dest_db, &target_compression_method);
2780 invariant_zero(r);
2781 r = dest_db->get_fanout(dest_db, &target_fanout);
2782 invariant_zero(r);
2783
2784 if (bl->allow_puts) {
2785 // a better allocation would be to figure out roughly how many merge passes we'll need.
2786 int allocation_for_merge = (2*progress_allocation)/3;
2787 progress_allocation -= allocation_for_merge;
2788
2789 // This structure must stay live until the join below.
2790 struct fractal_thread_args fta = {bl,
2791 descriptor,
2792 fd,
2793 progress_allocation,
2794 bl->fractal_queues[which_db],
2795 bl->extracted_datasizes[which_db],
2796 0,
2797 which_db,
2798 target_nodesize,
2799 target_basementnodesize,
2800 target_compression_method,
2801 target_fanout};
2802
2803 r = toku_pthread_create(*fractal_thread_key,
2804 bl->fractal_threads + which_db,
2805 nullptr,
2806 fractal_thread,
2807 static_cast<void *>(&fta));
2808 if (r) {
2809 int r2 __attribute__((__unused__)) =
2810 toku_queue_destroy(bl->fractal_queues[which_db]);
2811 // ignore r2, since we already have an error
2812 bl->fractal_queues[which_db] = nullptr;
2813 goto error;
2814 }
2815 invariant(bl->fractal_threads_live[which_db]==false);
2816 bl->fractal_threads_live[which_db] = true;
2817
2818 r = merge_files(fs, bl, which_db, dest_db, compare, allocation_for_merge, bl->fractal_queues[which_db]);
2819
2820 {
2821 void *toku_pthread_retval;
2822 int r2 = toku_pthread_join(bl->fractal_threads[which_db], &toku_pthread_retval);
2823 invariant(fta.bl==bl); // this is a gratuitous assertion to make sure that the fta struct is still live here. A previous bug put that struct into a C block statement.
2824 resource_assert_zero(r2);
2825 invariant(toku_pthread_retval==NULL);
2826 invariant(bl->fractal_threads_live[which_db]);
2827 bl->fractal_threads_live[which_db] = false;
2828 if (r == 0) r = fta.errno_result;
2829 }
2830 } else {
2831 toku_queue_eof(bl->fractal_queues[which_db]);
2832 r = toku_loader_write_ft_from_q(bl, descriptor, fd, progress_allocation,
2833 bl->fractal_queues[which_db], bl->extracted_datasizes[which_db], which_db,
2834 target_nodesize, target_basementnodesize, target_compression_method, target_fanout);
2835 }
2836 }
2837
2838 error: // this is the cleanup code. Even if r==0 (no error) we fall through to here.
2839 if (bl->fractal_queues[which_db]) {
2840 int r2 = toku_queue_destroy(bl->fractal_queues[which_db]);
2841 invariant(r2==0);
2842 bl->fractal_queues[which_db] = nullptr;
2843 }
2844
2845 // if we get here we need to free up the merge_fileset and the rowset, as well as the keys
2846 toku_free(rows->data); rows->data = NULL;
2847 toku_free(rows->rows); rows->rows = NULL;
2848 toku_free(fs->data_fidxs); fs->data_fidxs = NULL;
2849 return r;
2850}
2851
2852static int toku_ft_loader_close_internal (FTLOADER bl)
2853/* Effect: Close the bulk loader.
2854 * Return all the file descriptors in the array fds. */
2855{
2856 int result = 0;
2857 if (bl->N == 0)
2858 result = update_progress(PROGRESS_MAX, bl, "done");
2859 else {
2860 int remaining_progress = PROGRESS_MAX;
2861 for (int i = 0; i < bl->N; i++) {
2862 // Take the unallocated progress and divide it among the unfinished jobs.
2863 // This calculation allocates all of the PROGRESS_MAX bits of progress to some job.
2864 int allocate_here = remaining_progress/(bl->N - i);
2865 remaining_progress -= allocate_here;
2866 char *fname_in_cwd = toku_cachetable_get_fname_in_cwd(bl->cachetable, bl->new_fnames_in_env[i]);
2867 result = loader_do_i(bl, i, bl->dbs[i], bl->bt_compare_funs[i], bl->descriptors[i], fname_in_cwd, allocate_here);
2868 toku_free(fname_in_cwd);
2869 if (result != 0)
2870 goto error;
2871 invariant(0 <= bl->progress && bl->progress <= PROGRESS_MAX);
2872 }
2873 if (result==0) invariant(remaining_progress==0);
2874
2875 // fsync the directory containing the new tokudb files.
2876 char *fname0 = toku_cachetable_get_fname_in_cwd(bl->cachetable, bl->new_fnames_in_env[0]);
2877 int r = toku_fsync_directory(fname0);
2878 toku_free(fname0);
2879 if (r != 0) {
2880 result = r; goto error;
2881 }
2882 }
2883 invariant(bl->file_infos.n_files_open == 0);
2884 invariant(bl->file_infos.n_files_extant == 0);
2885 invariant(bl->progress == PROGRESS_MAX);
2886 error:
2887 toku_ft_loader_internal_destroy(bl, (bool)(result!=0));
2888 return result;
2889}
2890
2891int toku_ft_loader_close (FTLOADER bl,
2892 ft_loader_error_func error_function, void *error_extra,
2893 ft_loader_poll_func poll_function, void *poll_extra
2894 )
2895{
2896 int result = 0;
2897
2898 int r;
2899
2900 //printf("Closing\n");
2901
2902 ft_loader_set_error_function(&bl->error_callback, error_function, error_extra);
2903
2904 ft_loader_set_poll_function(&bl->poll_callback, poll_function, poll_extra);
2905
2906 if (bl->extractor_live) {
2907 r = finish_extractor(bl);
2908 if (r)
2909 result = r;
2910 invariant(!bl->extractor_live);
2911 } else {
2912 r = finish_primary_rows(bl);
2913 if (r)
2914 result = r;
2915 }
2916
2917 // check for an error during extraction
2918 if (result == 0) {
2919 r = ft_loader_call_error_function(&bl->error_callback);
2920 if (r)
2921 result = r;
2922 }
2923
2924 if (result == 0) {
2925 r = toku_ft_loader_close_internal(bl);
2926 if (r && result == 0)
2927 result = r;
2928 } else
2929 toku_ft_loader_internal_destroy(bl, true);
2930
2931 return result;
2932}
2933
2934int toku_ft_loader_finish_extractor(FTLOADER bl) {
2935 int result = 0;
2936 if (bl->extractor_live) {
2937 int r = finish_extractor(bl);
2938 if (r)
2939 result = r;
2940 invariant(!bl->extractor_live);
2941 } else
2942 result = EINVAL;
2943 return result;
2944}
2945
2946int toku_ft_loader_abort(FTLOADER bl, bool is_error)
2947/* Effect : Abort the bulk loader, free ft_loader resources */
2948{
2949 int result = 0;
2950
2951 // cleanup the extractor thread
2952 if (bl->extractor_live) {
2953 int r = finish_extractor(bl);
2954 if (r)
2955 result = r;
2956 invariant(!bl->extractor_live);
2957 }
2958
2959 for (int i = 0; i < bl->N; i++)
2960 invariant(!bl->fractal_threads_live[i]);
2961
2962 toku_ft_loader_internal_destroy(bl, is_error);
2963 return result;
2964}
2965
2966int toku_ft_loader_get_error(FTLOADER bl, int *error) {
2967 *error = ft_loader_get_error(&bl->error_callback);
2968 return 0;
2969}
2970
2971static void add_pair_to_leafnode(
2972 struct leaf_buf* lbuf,
2973 unsigned char* key,
2974 int keylen,
2975 unsigned char* val,
2976 int vallen,
2977 int this_leafentry_size,
2978 STAT64INFO stats_to_update,
2979 int64_t* logical_rows_delta) {
2980
2981 lbuf->nkeys++;
2982 lbuf->ndata++;
2983 lbuf->dsize += keylen + vallen;
2984 lbuf->off += this_leafentry_size;
2985
2986 // append this key val pair to the leafnode
2987 // #3588 TODO just make a clean ule and append it to the omt
2988 // #3588 TODO can do the rebalancing here and avoid a lot of work later
2989 FTNODE leafnode = lbuf->node;
2990 uint32_t idx = BLB_DATA(leafnode, 0)->num_klpairs();
2991 DBT kdbt, vdbt;
2992 ft_msg msg(
2993 toku_fill_dbt(&kdbt, key, keylen),
2994 toku_fill_dbt(&vdbt, val, vallen),
2995 FT_INSERT,
2996 ZERO_MSN,
2997 lbuf->xids);
2998 uint64_t workdone = 0;
2999 // there's no mvcc garbage in a bulk-loaded FT, so there's no need to pass useful gc info
3000 txn_gc_info gc_info(nullptr, TXNID_NONE, TXNID_NONE, true);
3001 toku_ft_bn_apply_msg_once(
3002 BLB(leafnode, 0),
3003 msg,
3004 idx,
3005 keylen,
3006 NULL,
3007 &gc_info,
3008 &workdone,
3009 stats_to_update,
3010 logical_rows_delta);
3011}
3012
3013static int write_literal(struct dbout *out, void*data, size_t len) {
3014 invariant(out->current_off%4096==0);
3015 int result = toku_os_write(out->fd, data, len);
3016 if (result == 0)
3017 out->current_off+=len;
3018 return result;
3019}
3020
3021static void finish_leafnode(
3022 struct dbout* out,
3023 struct leaf_buf* lbuf,
3024 int progress_allocation,
3025 FTLOADER bl,
3026 uint32_t target_basementnodesize,
3027 enum toku_compression_method target_compression_method) {
3028
3029 int result = 0;
3030
3031 // serialize leaf to buffer
3032 size_t serialized_leaf_size = 0;
3033 size_t uncompressed_serialized_leaf_size = 0;
3034 char *serialized_leaf = NULL;
3035 FTNODE_DISK_DATA ndd = NULL;
3036 result = toku_serialize_ftnode_to_memory(
3037 lbuf->node,
3038 &ndd,
3039 target_basementnodesize,
3040 target_compression_method,
3041 true,
3042 true,
3043 &serialized_leaf_size,
3044 &uncompressed_serialized_leaf_size,
3045 &serialized_leaf);
3046
3047 // write it out
3048 if (result == 0) {
3049 dbout_lock(out);
3050 long long off_of_leaf = out->current_off;
3051 result = write_literal(out, serialized_leaf, serialized_leaf_size);
3052 if (result == 0) {
3053 out->translation[lbuf->blocknum.b].off = off_of_leaf;
3054 out->translation[lbuf->blocknum.b].size = serialized_leaf_size;
3055 seek_align_locked(out);
3056 }
3057 dbout_unlock(out);
3058 }
3059
3060 // free the node
3061 if (serialized_leaf) {
3062 toku_free(ndd);
3063 toku_free(serialized_leaf);
3064 }
3065 toku_ftnode_free(&lbuf->node);
3066 toku_xids_destroy(&lbuf->xids);
3067 toku_free(lbuf);
3068
3069 //printf("Nodewrite %d (%.1f%%):", progress_allocation, 100.0*progress_allocation/PROGRESS_MAX);
3070 if (result == 0)
3071 result = update_progress(progress_allocation, bl, "wrote node");
3072
3073 if (result)
3074 ft_loader_set_panic(bl, result, true, 0, nullptr, nullptr);
3075}
3076
3077static int write_translation_table (struct dbout *out, long long *off_of_translation_p) {
3078 seek_align(out);
3079 struct dbuf ttable;
3080 dbuf_init(&ttable);
3081 long long off_of_translation = out->current_off;
3082 long long bt_size_on_disk = out->n_translations * 16 + 20;
3083 putbuf_int64(&ttable, out->n_translations); // number of records
3084 putbuf_int64(&ttable, -1LL); // the linked list
3085 out->translation[1].off = off_of_translation;
3086 out->translation[1].size = bt_size_on_disk;
3087 for (int i=0; i<out->n_translations; i++) {
3088 putbuf_int64(&ttable, out->translation[i].off);
3089 putbuf_int64(&ttable, out->translation[i].size);
3090 }
3091 unsigned int checksum = toku_x1764_memory(ttable.buf, ttable.off);
3092 putbuf_int32(&ttable, checksum);
3093 // pad it to 512 zeros
3094 long long encoded_length = ttable.off;
3095 {
3096 int nbytes_to_add = roundup_to_multiple(512, ttable.off) - encoded_length;
3097 char zeros[nbytes_to_add];
3098 for (int i=0; i<nbytes_to_add; i++) zeros[i]=0;
3099 putbuf_bytes(&ttable, zeros, nbytes_to_add);
3100 }
3101 int result = ttable.error;
3102 if (result == 0) {
3103 invariant(bt_size_on_disk==encoded_length);
3104 result = toku_os_pwrite(out->fd, ttable.buf, ttable.off, off_of_translation);
3105 }
3106 dbuf_destroy(&ttable);
3107 *off_of_translation_p = off_of_translation;
3108 return result;
3109}
3110
3111static int write_header(
3112 struct dbout* out,
3113 long long translation_location_on_disk,
3114 long long translation_size_on_disk) {
3115
3116 int result = 0;
3117 size_t size = toku_serialize_ft_size(out->ft->h);
3118 size_t alloced_size = roundup_to_multiple(512, size);
3119 struct wbuf wbuf;
3120 char *MALLOC_N_ALIGNED(512, alloced_size, buf);
3121 if (buf == NULL) {
3122 result = get_error_errno();
3123 } else {
3124 wbuf_init(&wbuf, buf, size);
3125 out->ft->h->on_disk_stats = out->ft->in_memory_stats;
3126 out->ft->h->on_disk_logical_rows = out->ft->in_memory_logical_rows;
3127 toku_serialize_ft_to_wbuf(&wbuf, out->ft->h, translation_location_on_disk, translation_size_on_disk);
3128 for (size_t i=size; i<alloced_size; i++) buf[i]=0; // initialize all those unused spots to zero
3129 if (wbuf.ndone != size)
3130 result = EINVAL;
3131 else {
3132 assert(wbuf.ndone <= alloced_size);
3133 result = toku_os_pwrite(out->fd, wbuf.buf, alloced_size, 0);
3134 }
3135 toku_free(buf);
3136 }
3137 return result;
3138}
3139
3140static int read_some_pivots (FIDX pivots_file, int n_to_read, FTLOADER bl,
3141 /*out*/ DBT pivots[/*n_to_read*/])
3142// pivots is an array to be filled in. The pivots array is uninitialized.
3143{
3144 for (int i = 0; i < n_to_read; i++)
3145 pivots[i] = zero_dbt;
3146
3147 TOKU_FILE *pivots_stream = toku_bl_fidx2file(bl, pivots_file);
3148
3149 int result = 0;
3150 for (int i = 0; i < n_to_read; i++) {
3151 int r = bl_read_dbt(&pivots[i], pivots_stream);
3152 if (r != 0) {
3153 result = r;
3154 break;
3155 }
3156 }
3157 return result;
3158}
3159
3160static void delete_pivots(DBT pivots[], int n) {
3161 for (int i = 0; i < n; i++)
3162 toku_free(pivots[i].data);
3163 toku_free(pivots);
3164}
3165
static int setup_nonleaf_block (int n_children,
                                struct subtrees_info *subtrees, FIDX pivots_file, int64_t first_child_offset_in_subtrees,
                                struct subtrees_info *next_subtrees, FIDX next_pivots_file,
                                struct dbout *out, FTLOADER bl,
                                /*out*/int64_t *blocknum,
                                /*out*/struct subtree_info **subtrees_info_p,
                                /*out*/DBT **pivots_p)
// Do the serial part of setting up a nonleaf block with n_children children.
// Read n_children pivots from pivots_file into a newly allocated array of DBTs (returned in *pivots_p).
// The final pivot is copied into next_pivots_file and then freed; its slot is reset to zero_dbt.
// The caller therefore receives n_children-1 usable pivots in an array of n_children entries.
// Copy n_children subtree_infos, starting at index first_child_offset_in_subtrees of subtrees, into a
// newly allocated array (returned in *subtrees_info_p).
// Allocate a block number with allocate_block and return it in *blocknum.
// Record the blocknum in next_subtrees (allocate_node), so it can be combined with the pivots at the next level of the tree.
// *pivots_p and *subtrees_info_p are only set on success; on failure everything allocated here is freed.
// This code cannot be called in parallel; presumably because it reads/writes the shared pivot files and
// allocates blocks in `out` (NOTE(review): confirm what allocate_block updates).
// The actual creation of the node can be called in parallel after this work is done.
// Returns 0 on success, or the first error from allocation, reading, writing, or block allocation.
{
    //printf("Nonleaf has children :"); for(int i=0; i<n_children; i++) printf(" %ld", subtrees->subtrees[i].block); printf("\n");

    int result = 0;

    DBT *MALLOC_N(n_children, pivots);
    if (pivots == NULL) {
        result = get_error_errno();
    }

    if (result == 0) {
        int r = read_some_pivots(pivots_file, n_children, bl, pivots);
        if (r)
            result = r;
    }

    if (result == 0) {
        // Forward the last pivot to the next level of the tree.
        TOKU_FILE *next_pivots_stream = toku_bl_fidx2file(bl, next_pivots_file);
        int r = bl_write_dbt(
            &pivots[n_children - 1], next_pivots_stream, NULL, nullptr, bl);
        if (r)
            result = r;
    }

    if (result == 0) {
        // The last pivot was written to the next_pivots file, so we free it now instead of returning it.
        // Resetting the slot keeps a later delete_pivots() from freeing it a second time.
        toku_free(pivots[n_children-1].data);
        pivots[n_children-1] = zero_dbt;

        struct subtree_info *XMALLOC_N(n_children, subtrees_array);
        for (int i = 0; i < n_children; i++) {
            int64_t from_blocknum = first_child_offset_in_subtrees + i;
            subtrees_array[i] = subtrees->subtrees[from_blocknum];
        }

        int r = allocate_block(out, blocknum);
        if (r) {
            toku_free(subtrees_array);
            result = r;
        } else {
            allocate_node(next_subtrees, *blocknum);

            // Ownership of both arrays passes to the caller.
            *pivots_p = pivots;
            *subtrees_info_p = subtrees_array;
        }
    }

    if (result != 0) {
        // Every pivot slot was either filled or reset to zero_dbt, so freeing all n_children entries is safe.
        if (pivots) {
            delete_pivots(pivots, n_children); pivots = NULL;
        }
    }

    return result;
}
3237
3238static void write_nonleaf_node (FTLOADER bl, struct dbout *out, int64_t blocknum_of_new_node, int n_children,
3239 DBT *pivots, /* must free this array, as well as the things it points t */
3240 struct subtree_info *subtree_info, int height, const DESCRIPTOR UU(desc), uint32_t UU(target_nodesize), uint32_t target_basementnodesize, enum toku_compression_method target_compression_method)
3241{
3242 //Nodes do not currently touch descriptors
3243 invariant(height > 0);
3244
3245 int result = 0;
3246
3247 FTNODE XMALLOC(node);
3248 toku_initialize_empty_ftnode(node, make_blocknum(blocknum_of_new_node), height, n_children,
3249 FT_LAYOUT_VERSION, 0);
3250 node->pivotkeys.create_from_dbts(pivots, n_children - 1);
3251 assert(node->bp);
3252 for (int i=0; i<n_children; i++) {
3253 BP_BLOCKNUM(node,i) = make_blocknum(subtree_info[i].block);
3254 BP_STATE(node,i) = PT_AVAIL;
3255 }
3256
3257 FTNODE_DISK_DATA ndd = NULL;
3258 if (result == 0) {
3259 size_t n_bytes;
3260 size_t n_uncompressed_bytes;
3261 char *bytes;
3262 int r;
3263 r = toku_serialize_ftnode_to_memory(node, &ndd, target_basementnodesize, target_compression_method, true, true, &n_bytes, &n_uncompressed_bytes, &bytes);
3264 if (r) {
3265 result = r;
3266 } else {
3267 dbout_lock(out);
3268 out->translation[blocknum_of_new_node].off = out->current_off;
3269 out->translation[blocknum_of_new_node].size = n_bytes;
3270 //fprintf(stderr, "Wrote internal node at %ld (%ld bytes)\n", out->current_off, n_bytes);
3271 //for (uint32_t i=0; i<n_bytes; i++) { unsigned char b = bytes[i]; printf("%d:%02x (%d) ('%c')\n", i, b, b, (b>=' ' && b<128) ? b : '*'); }
3272 r = write_literal(out, bytes, n_bytes);
3273 if (r)
3274 result = r;
3275 else
3276 seek_align_locked(out);
3277 dbout_unlock(out);
3278 toku_free(bytes);
3279 }
3280 }
3281
3282 for (int i=0; i<n_children-1; i++) {
3283 toku_free(pivots[i].data);
3284 }
3285 for (int i=0; i<n_children; i++) {
3286 destroy_nonleaf_childinfo(BNC(node,i));
3287 }
3288 toku_free(pivots);
3289 // TODO: Should be using toku_destroy_ftnode_internals, which should be renamed to toku_ftnode_destroy
3290 toku_free(node->bp);
3291 node->pivotkeys.destroy();
3292 toku_free(node);
3293 toku_free(ndd);
3294 toku_free(subtree_info);
3295
3296 if (result != 0)
3297 ft_loader_set_panic(bl, result, true, 0, nullptr, nullptr);
3298}
3299
3300static int write_nonleaves (FTLOADER bl, FIDX pivots_fidx, struct dbout *out, struct subtrees_info *sts, const DESCRIPTOR descriptor, uint32_t target_nodesize, uint32_t target_basementnodesize, enum toku_compression_method target_compression_method) {
3301 int result = 0;
3302 int height = 1;
3303
3304 // Watch out for the case where we saved the last pivot but didn't write any more nodes out.
3305 // The trick is not to look at n_pivots, but to look at blocks.n_blocks
3306 while (sts->n_subtrees > 1) {
3307 // If there is more than one block in blocks, then we must build another level of the tree.
3308
3309 // we need to create a pivots file for the pivots of the next level.
3310 // and a blocks_array
3311 // So for example.
3312 // 1) we grab 16 pivots and 16 blocks.
3313 // 2) We put the 15 pivots and 16 blocks into an non-leaf node.
3314 // 3) We put the 16th pivot into the next pivots file.
3315 {
3316 int r =
3317 fseek(toku_bl_fidx2file(bl, pivots_fidx)->file, 0, SEEK_SET);
3318 if (r != 0) {
3319 return get_error_errno();
3320 }
3321 }
3322
3323 FIDX next_pivots_file;
3324 {
3325 int r = ft_loader_open_temp_file (bl, &next_pivots_file);
3326 if (r != 0) { result = r; break; }
3327 }
3328
3329 struct subtrees_info next_sts;
3330 subtrees_info_init(&next_sts);
3331 next_sts.n_subtrees = 0;
3332 next_sts.n_subtrees_limit = 1;
3333 XMALLOC_N(next_sts.n_subtrees_limit, next_sts.subtrees);
3334
3335 const int n_per_block = 15;
3336 int64_t n_subtrees_used = 0;
3337 while (sts->n_subtrees - n_subtrees_used >= n_per_block*2) {
3338 // grab the first N_PER_BLOCK and build a node.
3339 DBT *pivots;
3340 int64_t blocknum_of_new_node = 0;
3341 struct subtree_info *subtree_info;
3342 int r = setup_nonleaf_block (n_per_block,
3343 sts, pivots_fidx, n_subtrees_used,
3344 &next_sts, next_pivots_file,
3345 out, bl,
3346 &blocknum_of_new_node, &subtree_info, &pivots);
3347 if (r) {
3348 result = r;
3349 break;
3350 } else {
3351 write_nonleaf_node(bl, out, blocknum_of_new_node, n_per_block, pivots, subtree_info, height, descriptor, target_nodesize, target_basementnodesize, target_compression_method); // frees all the data structures that go into making the node.
3352 n_subtrees_used += n_per_block;
3353 }
3354 }
3355
3356 int64_t n_blocks_left = sts->n_subtrees - n_subtrees_used;
3357 if (result == 0) {
3358 // Now we have a one or two blocks at the end to handle.
3359 invariant(n_blocks_left>=2);
3360 if (n_blocks_left > n_per_block) {
3361 // Write half the remaining blocks
3362 int64_t n_first = n_blocks_left/2;
3363 DBT *pivots;
3364 int64_t blocknum_of_new_node;
3365 struct subtree_info *subtree_info;
3366 int r = setup_nonleaf_block(n_first,
3367 sts, pivots_fidx, n_subtrees_used,
3368 &next_sts, next_pivots_file,
3369 out, bl,
3370 &blocknum_of_new_node, &subtree_info, &pivots);
3371 if (r) {
3372 result = r;
3373 } else {
3374 write_nonleaf_node(bl, out, blocknum_of_new_node, n_first, pivots, subtree_info, height, descriptor, target_nodesize, target_basementnodesize, target_compression_method);
3375 n_blocks_left -= n_first;
3376 n_subtrees_used += n_first;
3377 }
3378 }
3379 }
3380 if (result == 0) {
3381 // Write the last block.
3382 DBT *pivots;
3383 int64_t blocknum_of_new_node;
3384 struct subtree_info *subtree_info;
3385 int r = setup_nonleaf_block(n_blocks_left,
3386 sts, pivots_fidx, n_subtrees_used,
3387 &next_sts, next_pivots_file,
3388 out, bl,
3389 &blocknum_of_new_node, &subtree_info, &pivots);
3390 if (r) {
3391 result = r;
3392 } else {
3393 write_nonleaf_node(bl, out, blocknum_of_new_node, n_blocks_left, pivots, subtree_info, height, descriptor, target_nodesize, target_basementnodesize, target_compression_method);
3394 n_subtrees_used += n_blocks_left;
3395 }
3396 }
3397 if (result == 0)
3398 invariant(n_subtrees_used == sts->n_subtrees);
3399
3400
3401 if (result == 0) // pick up write_nonleaf_node errors
3402 result = ft_loader_get_error(&bl->error_callback);
3403
3404 // Now set things up for the next iteration.
3405 int r = ft_loader_fi_close(&bl->file_infos, pivots_fidx, true); if (r != 0 && result == 0) result = r;
3406 r = ft_loader_fi_unlink(&bl->file_infos, pivots_fidx); if (r != 0 && result == 0) result = r;
3407 pivots_fidx = next_pivots_file;
3408 toku_free(sts->subtrees); sts->subtrees = NULL;
3409 *sts = next_sts;
3410 height++;
3411
3412 if (result)
3413 break;
3414 }
3415 { int r = ft_loader_fi_close (&bl->file_infos, pivots_fidx, true); if (r != 0 && result == 0) result = r; }
3416 { int r = ft_loader_fi_unlink(&bl->file_infos, pivots_fidx); if (r != 0 && result == 0) result = r; }
3417 return result;
3418}
3419
3420void ft_loader_set_fractal_workers_count_from_c(FTLOADER bl) {
3421 ft_loader_set_fractal_workers_count (bl);
3422}
3423
3424
3425