| 1 | /* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */ | 
| 2 | // vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4: | 
| 3 | #ident "$Id$" | 
| 4 | /*====== | 
| 5 | This file is part of PerconaFT. | 
| 6 |  | 
| 7 |  | 
| 8 | Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved. | 
| 9 |  | 
| 10 |     PerconaFT is free software: you can redistribute it and/or modify | 
| 11 |     it under the terms of the GNU General Public License, version 2, | 
| 12 |     as published by the Free Software Foundation. | 
| 13 |  | 
| 14 |     PerconaFT is distributed in the hope that it will be useful, | 
| 15 |     but WITHOUT ANY WARRANTY; without even the implied warranty of | 
| 16 |     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the | 
| 17 |     GNU General Public License for more details. | 
| 18 |  | 
| 19 |     You should have received a copy of the GNU General Public License | 
| 20 |     along with PerconaFT.  If not, see <http://www.gnu.org/licenses/>. | 
| 21 |  | 
| 22 | ---------------------------------------- | 
| 23 |  | 
| 24 |     PerconaFT is free software: you can redistribute it and/or modify | 
| 25 |     it under the terms of the GNU Affero General Public License, version 3, | 
| 26 |     as published by the Free Software Foundation. | 
| 27 |  | 
| 28 |     PerconaFT is distributed in the hope that it will be useful, | 
| 29 |     but WITHOUT ANY WARRANTY; without even the implied warranty of | 
| 30 |     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the | 
| 31 |     GNU Affero General Public License for more details. | 
| 32 |  | 
| 33 |     You should have received a copy of the GNU Affero General Public License | 
| 34 |     along with PerconaFT.  If not, see <http://www.gnu.org/licenses/>. | 
| 35 | ======= */ | 
| 36 |  | 
| 37 | #ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved." | 
| 38 |  | 
| 39 | #pragma once | 
| 40 |  | 
| 41 | #include <db.h> | 
| 42 |  | 
| 43 | #include "portability/toku_pthread.h" | 
| 44 |  | 
| 45 | #include "loader/dbufio.h" | 
| 46 | #include "loader/loader.h" | 
| 47 | #include "util/queue.h" | 
| 48 |  | 
// Compile-time tuning constants for the loader.
enum {
    // NOTE(review): the name of this first enumerator was missing from the
    // source text (only "= 2," survived), which does not compile.
    // EXTRACTOR_QUEUE_DEPTH is a reconstruction -- confirm against the code
    // that uses it.
    EXTRACTOR_QUEUE_DEPTH = 2,
    FILE_BUFFER_SIZE  = 1<<24,
    MIN_ROWSET_MEMORY = 1<<23,
    MIN_MERGE_FANIN   = 2,
    FRACTAL_WRITER_QUEUE_DEPTH = 3,
    // Always two more than the writer queue depth.
    FRACTAL_WRITER_ROWSETS = FRACTAL_WRITER_QUEUE_DEPTH + 2,
    DBUFIO_DEPTH = 2,
    TARGET_MERGE_BUF_SIZE = 1<<24, // we'd like the merge buffer to be this big.
    MIN_MERGE_BUF_SIZE = 1<<20, // always use at least this much
    // Defined as an alias of MIN_MERGE_BUF_SIZE, so the two move together.
    MAX_UNCOMPRESSED_BUF = MIN_MERGE_BUF_SIZE
};
| 61 |  | 
| 62 | /* These functions are exported to allow the tests to compile. */ | 
| 63 |  | 
| 64 | /* These structures maintain a collection of all the open temporary files used by the loader. */ | 
| 65 | struct file_info { | 
| 66 |     bool is_open; | 
| 67 |     bool is_extant;  // if true, the file must be unlinked. | 
| 68 |     char *fname; | 
| 69 |     TOKU_FILE *file; | 
| 70 |     uint64_t n_rows;  // how many rows were written into that file | 
| 71 |     size_t buffer_size; | 
| 72 |     void *buffer; | 
| 73 | }; | 
| 74 | struct file_infos { | 
| 75 |     int n_files; | 
| 76 |     int n_files_limit; | 
| 77 |     struct file_info *file_infos; | 
| 78 |     int n_files_open, n_files_extant; | 
| 79 |     toku_mutex_t lock; // must protect this data structure because current activity performs a REALLOC(fi->file_infos). | 
| 80 | }; | 
/* A file index, wrapped in a struct so that it is a distinct type rather
 * than a bare int. */
typedef struct fidx {
    int idx;
} FIDX;

/* The "no file" value: idx == -1. */
static const FIDX FIDX_NULL __attribute__((__unused__)) = {-1};

/* Returns nonzero iff f holds the null index (-1).  Declared separately so
 * the unused attribute can be attached without a warning in translation
 * units that never call it. */
static int fidx_is_null(const FIDX f) __attribute__((__unused__));
static int fidx_is_null(const FIDX f) {
    return -1 == f.idx;
}
| 85 | TOKU_FILE *toku_bl_fidx2file(FTLOADER bl, FIDX i); | 
| 86 |  | 
| 87 | int ft_loader_open_temp_file(FTLOADER bl, FIDX *file_idx); | 
| 88 |  | 
/* In-memory row collections.  A row does not own its bytes; it records
 * where they sit inside a separate data array. */
struct row {
    size_t off;   // the offset in the data array.
    int    klen;
    int    vlen;
};
/* A collection of rows: an array of row descriptors plus a char array
 * (data), each tracked with a current count and a limit. */
struct rowset {
    uint64_t    memory_budget;
    size_t      n_rows;
    size_t      n_rows_limit;
    struct row *rows;
    size_t      n_bytes;
    size_t      n_bytes_limit;
    char       *data;
};
| 101 |  | 
| 102 | int init_rowset (struct rowset *rows, uint64_t memory_budget); | 
| 103 | void destroy_rowset(struct rowset *rows); | 
| 104 | int add_row(struct rowset *rows, DBT *key, DBT *val); | 
| 105 |  | 
| 106 | int loader_write_row(DBT *key, | 
| 107 |                      DBT *val, | 
| 108 |                      FIDX data, | 
| 109 |                      TOKU_FILE *, | 
| 110 |                      uint64_t *dataoff, | 
| 111 |                      struct wbuf *wb, | 
| 112 |                      FTLOADER bl); | 
| 113 | int loader_read_row(TOKU_FILE *f, DBT *key, DBT *val); | 
| 114 |  | 
| 115 | struct merge_fileset { | 
| 116 |     bool have_sorted_output;  // Is there an previous key? | 
| 117 |     FIDX sorted_output;       // this points to one of the data_fidxs.  If output_is_sorted then this is the file containing sorted data.  It's still open | 
| 118 |     DBT  prev_key;            // What is it?  If it's here, its the last output in the merge fileset | 
| 119 |  | 
| 120 |     int n_temp_files, n_temp_files_limit; | 
| 121 |     FIDX *data_fidxs; | 
| 122 | }; | 
| 123 |  | 
// Set up / tear down a merge_fileset.  Defined elsewhere.
void init_merge_fileset(struct merge_fileset *fs);
void destroy_merge_fileset(struct merge_fileset *fs);
| 126 |  | 
| 127 | struct poll_callback_s { | 
| 128 |     ft_loader_poll_func poll_function; | 
| 129 |     void *; | 
| 130 | }; | 
| 131 | typedef struct poll_callback_s *ft_loader_poll_callback; | 
| 132 |  | 
| 133 | int ft_loader_init_poll_callback(ft_loader_poll_callback); | 
| 134 |  | 
| 135 | void ft_loader_destroy_poll_callback(ft_loader_poll_callback); | 
| 136 |  | 
| 137 | void ft_loader_set_poll_function(ft_loader_poll_callback, ft_loader_poll_func poll_function, void *); | 
| 138 |  | 
| 139 | int ft_loader_call_poll_function(ft_loader_poll_callback, float progress); | 
| 140 |  | 
| 141 | struct error_callback_s { | 
| 142 |     int error; | 
| 143 |     ft_loader_error_func error_callback; | 
| 144 |     void *; | 
| 145 |     DB *db; | 
| 146 |     int which_db; | 
| 147 |     DBT key; | 
| 148 |     DBT val; | 
| 149 |     bool did_callback; | 
| 150 |     toku_mutex_t mutex; | 
| 151 | }; | 
| 152 | typedef struct error_callback_s *ft_loader_error_callback; | 
| 153 |  | 
| 154 | void ft_loader_init_error_callback(ft_loader_error_callback); | 
| 155 |  | 
| 156 | void ft_loader_destroy_error_callback(ft_loader_error_callback); | 
| 157 |  | 
| 158 | int ft_loader_get_error(ft_loader_error_callback); | 
| 159 |  | 
| 160 | void ft_loader_set_error_function(ft_loader_error_callback, ft_loader_error_func error_function, void *); | 
| 161 |  | 
| 162 | int ft_loader_set_error(ft_loader_error_callback, int error, DB *db, int which_db, DBT *key, DBT *val); | 
| 163 |  | 
| 164 | int ft_loader_call_error_function(ft_loader_error_callback); | 
| 165 |  | 
| 166 | int ft_loader_set_error_and_callback(ft_loader_error_callback, int error, DB *db, int which_db, DBT *key, DBT *val); | 
| 167 |  | 
| 168 | struct ft_loader_s { | 
| 169 |     // These two are set in the close function, and used while running close | 
| 170 |     struct error_callback_s error_callback; | 
| 171 |     struct poll_callback_s poll_callback; | 
| 172 |  | 
| 173 |     generate_row_for_put_func generate_row_for_put; | 
| 174 |     ft_compare_func *bt_compare_funs; | 
| 175 |  | 
| 176 |     DB *src_db; | 
| 177 |     int N; | 
| 178 |     DB **dbs; // N of these | 
| 179 |     DESCRIPTOR *descriptors; // N of these. | 
| 180 |     TXNID      *root_xids_that_created; // N of these. | 
| 181 |     const char **new_fnames_in_env; // N of these.  The file names that the final data will be written to (relative to env). | 
| 182 |  | 
| 183 |     uint64_t *; // N of these. | 
| 184 |  | 
| 185 |     struct rowset primary_rowset; // the primary rows that have been put, but the secondary rows haven't been generated. | 
| 186 |     struct rowset primary_rowset_temp; // the primary rows that are being worked on by the extractor_thread. | 
| 187 |  | 
| 188 |     QUEUE primary_rowset_queue; // main thread enqueues rowsets in this queue (in maybe 64MB chunks).  The extractor thread removes them, sorts them, adn writes to file. | 
| 189 |     toku_pthread_t     ;     // the thread that takes primary rowset and does extraction and the first level sort and write to file. | 
| 190 |     bool ; | 
| 191 |  | 
| 192 |     DBT  *last_key;         // for each rowset, remember the most recently output key.  The system may choose not to keep this up-to-date when a rowset is unsorted.  These keys are malloced and ulen maintains the size of the malloced block. | 
| 193 |      | 
| 194 |     struct rowset *rows; // secondary rows that have been put, but haven't been sorted and written to a file. | 
| 195 |     uint64_t n_rows; // how many rows have been put? | 
| 196 |     struct merge_fileset *fs; | 
| 197 |  | 
| 198 |     const char *temp_file_template; | 
| 199 |  | 
| 200 |     CACHETABLE cachetable; | 
| 201 |     bool did_reserve_memory; | 
| 202 |     bool compress_intermediates; | 
| 203 |     bool allow_puts; | 
| 204 |     uint64_t reserved_memory;  // how much memory are we allowed to use? | 
| 205 |  | 
| 206 |     /* To make it easier to recover from errors, we don't use TOKU_FILE*, | 
| 207 |      * instead we use an index into the file_infos. */ | 
| 208 |     struct file_infos file_infos; | 
| 209 |  | 
| 210 | #define PROGRESS_MAX (1 << 16) | 
| 211 |     int progress;       // Progress runs from 0 to PROGRESS_MAX.  When we call the poll function we convert to a float from 0.0 to 1.0 | 
| 212 |     // We use an integer so that we can add to the progress using a fetch-and-add instruction. | 
| 213 |  | 
| 214 |     int progress_callback_result; // initially zero, if any call to the poll function callback returns nonzero, we save the result here (and don't call the poll callback function again). | 
| 215 |  | 
| 216 |     LSN load_lsn; //LSN of the fsynced 'load' log entry.  Write this LSN (as checkpoint_lsn) in ft headers made by this loader. | 
| 217 |     TXNID load_root_xid; //(Root) transaction that performed the load. | 
| 218 |  | 
| 219 |     QUEUE *fractal_queues; // an array of work queues, one for each secondary index. | 
| 220 |     toku_pthread_t *fractal_threads; | 
| 221 |     bool *fractal_threads_live; // an array of bools indicating that fractal_threads[i] is a live thread.  (There is no NULL for a pthread_t, so we have to maintain this separately). | 
| 222 |  | 
| 223 |     unsigned fractal_workers; // number of fractal tree writer threads | 
| 224 |  | 
| 225 |     toku_mutex_t mutex; | 
| 226 |     bool mutex_init; | 
| 227 | }; | 
| 228 |  | 
| 229 | // Set the number of rows in the loader.  Used for test. | 
| 230 | void toku_ft_loader_set_n_rows(FTLOADER bl, uint64_t n_rows); | 
| 231 |  | 
| 232 | // Get the number of rows in the loader.  Used for test. | 
| 233 | uint64_t toku_ft_loader_get_n_rows(FTLOADER bl); | 
| 234 |  | 
| 235 | // The data passed into a fractal_thread via pthread_create. | 
| 236 | struct fractal_thread_args { | 
| 237 |     FTLOADER                bl; | 
| 238 |     const DESCRIPTOR descriptor; | 
| 239 |     int                      fd; // write the ft into fd. | 
| 240 |     int                      progress_allocation; | 
| 241 |     QUEUE                    q; | 
| 242 |     uint64_t                 total_disksize_estimate; | 
| 243 |     int                      errno_result; // the final result. | 
| 244 |     int                      which_db; | 
| 245 |     uint32_t                 target_nodesize; | 
| 246 |     uint32_t                 target_basementnodesize; | 
| 247 |     enum toku_compression_method target_compression_method; | 
| 248 |     uint32_t                 target_fanout; | 
| 249 | }; | 
| 250 |  | 
| 251 | void toku_ft_loader_set_n_rows(FTLOADER bl, uint64_t n_rows); | 
| 252 | uint64_t toku_ft_loader_get_n_rows(FTLOADER bl); | 
| 253 |  | 
| 254 | int merge_row_arrays_base (struct row dest[/*an+bn*/], struct row a[/*an*/], int an, struct row b[/*bn*/], int bn, | 
| 255 |                            int which_db, DB *dest_db, ft_compare_func, | 
| 256 | 			   FTLOADER, | 
| 257 |                            struct rowset *); | 
| 258 |  | 
| 259 | int merge_files (struct merge_fileset *fs, FTLOADER bl, int which_db, DB *dest_db, ft_compare_func, int progress_allocation, QUEUE); | 
| 260 |  | 
| 261 | int sort_and_write_rows (struct rowset rows, struct merge_fileset *fs, FTLOADER bl, int which_db, DB *dest_db, ft_compare_func); | 
| 262 |  | 
| 263 | int mergesort_row_array (struct row rows[/*n*/], int n, int which_db, DB *dest_db, ft_compare_func, FTLOADER, struct rowset *); | 
| 264 |  | 
| 265 | //int write_file_to_dbfile (int outfile, FIDX infile, FTLOADER bl, const DESCRIPTOR descriptor, int progress_allocation); | 
| 266 | int toku_merge_some_files_using_dbufio (const bool to_q, FIDX dest_data, QUEUE q, int n_sources, DBUFIO_FILESET bfs, FIDX srcs_fidxs[/*n_sources*/], FTLOADER bl, int which_db, DB *dest_db, ft_compare_func compare, int progress_allocation); | 
| 267 |  | 
| 268 | int ft_loader_sort_and_write_rows (struct rowset *rows, struct merge_fileset *fs, FTLOADER bl, int which_db, DB *dest_db, ft_compare_func); | 
| 269 |  | 
| 270 | // This is probably only for testing. | 
| 271 | int toku_loader_write_ft_from_q_in_C (FTLOADER                 bl, | 
| 272 | 				      const DESCRIPTOR         descriptor, | 
| 273 | 				      int                      fd, // write to here | 
| 274 | 				      int                      progress_allocation, | 
| 275 | 				      QUEUE                    q, | 
| 276 | 				      uint64_t                 total_disksize_estimate, | 
| 277 |                                       int                      which_db, | 
| 278 |                                       uint32_t                 target_nodesize, | 
| 279 |                                       uint32_t                 target_basementnodesize, | 
| 280 |                                       enum toku_compression_method target_compression_method, | 
| 281 |                                       uint32_t                 fanout); | 
| 282 |  | 
| 283 | int ft_loader_mergesort_row_array (struct row rows[/*n*/], int n, int which_db, DB *dest_db, ft_compare_func, FTLOADER, struct rowset *); | 
| 284 |  | 
| 285 | int ft_loader_write_file_to_dbfile (int outfile, FIDX infile, FTLOADER bl, const DESCRIPTOR descriptor, int progress_allocation); | 
| 286 |  | 
| 287 | int ft_loader_init_file_infos (struct file_infos *fi); | 
| 288 | void ft_loader_fi_destroy (struct file_infos *fi, bool is_error); | 
| 289 | int ft_loader_fi_close (struct file_infos *fi, FIDX idx, bool require_open); | 
| 290 | int ft_loader_fi_close_all (struct file_infos *fi); | 
| 291 | int ft_loader_fi_reopen (struct file_infos *fi, FIDX idx, const char *mode); | 
| 292 | int ft_loader_fi_unlink (struct file_infos *fi, FIDX idx); | 
| 293 |  | 
| 294 | int toku_ft_loader_internal_init (/* out */ FTLOADER *blp, | 
| 295 | 				   CACHETABLE cachetable, | 
| 296 | 				   generate_row_for_put_func g, | 
| 297 | 				   DB *src_db, | 
| 298 | 				   int N, FT_HANDLE ft_hs[/*N*/], DB* dbs[/*N*/], | 
| 299 | 				   const char *new_fnames_in_env[/*N*/], | 
| 300 | 				   ft_compare_func bt_compare_functions[/*N*/], | 
| 301 | 				   const char *temp_file_template, | 
| 302 | 				   LSN load_lsn, | 
| 303 |                                    TOKUTXN txn, | 
| 304 |                                    bool reserve_memory, | 
| 305 |                                    uint64_t reserve_memory_size, | 
| 306 |                                    bool compress_intermediates, | 
| 307 |                                    bool allow_puts); | 
| 308 |  | 
| 309 | void toku_ft_loader_internal_destroy (FTLOADER bl, bool is_error); | 
| 310 |  | 
// Test hook only.  In production the rowset size comes from negotiating
// with the cachetable for some memory (see #2613).
uint64_t toku_ft_loader_get_rowset_budget_for_testing(void);
| 313 |  | 
| 314 | int (FTLOADER bl); | 
| 315 |  | 
| 316 | int toku_ft_loader_get_error(FTLOADER bl, int *loader_errno); | 
| 317 |  | 
| 318 | void ft_loader_lock_init(FTLOADER bl); | 
| 319 | void ft_loader_lock_destroy(FTLOADER bl); | 
| 320 | void ft_loader_set_fractal_workers_count_from_c(FTLOADER bl); | 
| 321 |  |