| 1 | /* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */ |
| 2 | // vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4: |
| 3 | #ident "$Id$" |
| 4 | /*====== |
| 5 | This file is part of PerconaFT. |
| 6 | |
| 7 | |
| 8 | Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved. |
| 9 | |
| 10 | PerconaFT is free software: you can redistribute it and/or modify |
| 11 | it under the terms of the GNU General Public License, version 2, |
| 12 | as published by the Free Software Foundation. |
| 13 | |
| 14 | PerconaFT is distributed in the hope that it will be useful, |
| 15 | but WITHOUT ANY WARRANTY; without even the implied warranty of |
| 16 | MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
| 17 | GNU General Public License for more details. |
| 18 | |
| 19 | You should have received a copy of the GNU General Public License |
| 20 | along with PerconaFT. If not, see <http://www.gnu.org/licenses/>. |
| 21 | |
| 22 | ---------------------------------------- |
| 23 | |
| 24 | PerconaFT is free software: you can redistribute it and/or modify |
| 25 | it under the terms of the GNU Affero General Public License, version 3, |
| 26 | as published by the Free Software Foundation. |
| 27 | |
| 28 | PerconaFT is distributed in the hope that it will be useful, |
| 29 | but WITHOUT ANY WARRANTY; without even the implied warranty of |
| 30 | MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
| 31 | GNU Affero General Public License for more details. |
| 32 | |
| 33 | You should have received a copy of the GNU Affero General Public License |
| 34 | along with PerconaFT. If not, see <http://www.gnu.org/licenses/>. |
| 35 | ======= */ |
| 36 | |
| 37 | #ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved." |
| 38 | |
| 39 | #pragma once |
| 40 | |
| 41 | #include <db.h> |
| 42 | |
| 43 | #include "portability/toku_pthread.h" |
| 44 | |
| 45 | #include "loader/dbufio.h" |
| 46 | #include "loader/loader.h" |
| 47 | #include "util/queue.h" |
| 48 | |
// Compile-time tuning constants for the loader.
// NOTE(review): the first enumerator had lost its name in this copy (the line
// read "= 2,", which does not compile).  EXTRACTOR_QUEUE_DEPTH is the restored
// name -- confirm against the code that sizes the extractor queue.
enum {
    EXTRACTOR_QUEUE_DEPTH      = 2,
    FILE_BUFFER_SIZE           = 1 << 24,
    MIN_ROWSET_MEMORY          = 1 << 23,
    MIN_MERGE_FANIN            = 2,
    FRACTAL_WRITER_QUEUE_DEPTH = 3,
    FRACTAL_WRITER_ROWSETS     = FRACTAL_WRITER_QUEUE_DEPTH + 2,
    DBUFIO_DEPTH               = 2,
    TARGET_MERGE_BUF_SIZE      = 1 << 24,  // we'd like the merge buffer to be this big.
    MIN_MERGE_BUF_SIZE         = 1 << 20,  // always use at least this much
    MAX_UNCOMPRESSED_BUF       = MIN_MERGE_BUF_SIZE
};
| 61 | |
| 62 | /* These functions are exported to allow the tests to compile. */ |
| 63 | |
| 64 | /* These structures maintain a collection of all the open temporary files used by the loader. */ |
| 65 | struct file_info { |
| 66 | bool is_open; |
| 67 | bool is_extant; // if true, the file must be unlinked. |
| 68 | char *fname; |
| 69 | TOKU_FILE *file; |
| 70 | uint64_t n_rows; // how many rows were written into that file |
| 71 | size_t buffer_size; |
| 72 | void *buffer; |
| 73 | }; |
| 74 | struct file_infos { |
| 75 | int n_files; |
| 76 | int n_files_limit; |
| 77 | struct file_info *file_infos; |
| 78 | int n_files_open, n_files_extant; |
| 79 | toku_mutex_t lock; // must protect this data structure because current activity performs a REALLOC(fi->file_infos). |
| 80 | }; |
// FIDX wraps an integer file index; an idx of -1 is the null value.
typedef struct fidx {
    int idx;
} FIDX;

// The null FIDX.  Marked unused so including files that never touch it do not warn.
static const FIDX FIDX_NULL __attribute__((__unused__)) = { -1 };

static int fidx_is_null(const FIDX f) __attribute__((__unused__));

// Returns nonzero exactly when f holds the null index (-1).
static int fidx_is_null(const FIDX f) {
    const int null_idx = -1;
    return f.idx == null_idx;
}
| 85 | TOKU_FILE *toku_bl_fidx2file(FTLOADER bl, FIDX i); |
| 86 | |
| 87 | int ft_loader_open_temp_file(FTLOADER bl, FIDX *file_idx); |
| 88 | |
| 89 | /* These data structures are used for manipulating a collection of rows in main memory. */ |
// Describes one row stored in a rowset's data array.
struct row {
    size_t off;   // the offset in the data array.
    int    klen;
    int    vlen;
};
// An in-memory collection of rows: an array of row descriptors plus the
// byte array they index into, each with a current count and a limit.
struct rowset {
    uint64_t    memory_budget;
    size_t      n_rows;
    size_t      n_rows_limit;
    struct row *rows;
    size_t      n_bytes;
    size_t      n_bytes_limit;
    char       *data;
};
| 101 | |
| 102 | int init_rowset (struct rowset *rows, uint64_t memory_budget); |
| 103 | void destroy_rowset(struct rowset *rows); |
| 104 | int add_row(struct rowset *rows, DBT *key, DBT *val); |
| 105 | |
| 106 | int loader_write_row(DBT *key, |
| 107 | DBT *val, |
| 108 | FIDX data, |
| 109 | TOKU_FILE *, |
| 110 | uint64_t *dataoff, |
| 111 | struct wbuf *wb, |
| 112 | FTLOADER bl); |
| 113 | int loader_read_row(TOKU_FILE *f, DBT *key, DBT *val); |
| 114 | |
| 115 | struct merge_fileset { |
| 116 | bool have_sorted_output; // Is there an previous key? |
| 117 | FIDX sorted_output; // this points to one of the data_fidxs. If output_is_sorted then this is the file containing sorted data. It's still open |
| 118 | DBT prev_key; // What is it? If it's here, its the last output in the merge fileset |
| 119 | |
| 120 | int n_temp_files, n_temp_files_limit; |
| 121 | FIDX *data_fidxs; |
| 122 | }; |
| 123 | |
// Set up / tear down a merge_fileset.
void init_merge_fileset(struct merge_fileset *fs);
void destroy_merge_fileset(struct merge_fileset *fs);
| 126 | |
| 127 | struct poll_callback_s { |
| 128 | ft_loader_poll_func poll_function; |
| 129 | void *; |
| 130 | }; |
| 131 | typedef struct poll_callback_s *ft_loader_poll_callback; |
| 132 | |
| 133 | int ft_loader_init_poll_callback(ft_loader_poll_callback); |
| 134 | |
| 135 | void ft_loader_destroy_poll_callback(ft_loader_poll_callback); |
| 136 | |
| 137 | void ft_loader_set_poll_function(ft_loader_poll_callback, ft_loader_poll_func poll_function, void *); |
| 138 | |
| 139 | int ft_loader_call_poll_function(ft_loader_poll_callback, float progress); |
| 140 | |
| 141 | struct error_callback_s { |
| 142 | int error; |
| 143 | ft_loader_error_func error_callback; |
| 144 | void *; |
| 145 | DB *db; |
| 146 | int which_db; |
| 147 | DBT key; |
| 148 | DBT val; |
| 149 | bool did_callback; |
| 150 | toku_mutex_t mutex; |
| 151 | }; |
| 152 | typedef struct error_callback_s *ft_loader_error_callback; |
| 153 | |
| 154 | void ft_loader_init_error_callback(ft_loader_error_callback); |
| 155 | |
| 156 | void ft_loader_destroy_error_callback(ft_loader_error_callback); |
| 157 | |
| 158 | int ft_loader_get_error(ft_loader_error_callback); |
| 159 | |
| 160 | void ft_loader_set_error_function(ft_loader_error_callback, ft_loader_error_func error_function, void *); |
| 161 | |
| 162 | int ft_loader_set_error(ft_loader_error_callback, int error, DB *db, int which_db, DBT *key, DBT *val); |
| 163 | |
| 164 | int ft_loader_call_error_function(ft_loader_error_callback); |
| 165 | |
| 166 | int ft_loader_set_error_and_callback(ft_loader_error_callback, int error, DB *db, int which_db, DBT *key, DBT *val); |
| 167 | |
| 168 | struct ft_loader_s { |
| 169 | // These two are set in the close function, and used while running close |
| 170 | struct error_callback_s error_callback; |
| 171 | struct poll_callback_s poll_callback; |
| 172 | |
| 173 | generate_row_for_put_func generate_row_for_put; |
| 174 | ft_compare_func *bt_compare_funs; |
| 175 | |
| 176 | DB *src_db; |
| 177 | int N; |
| 178 | DB **dbs; // N of these |
| 179 | DESCRIPTOR *descriptors; // N of these. |
| 180 | TXNID *root_xids_that_created; // N of these. |
| 181 | const char **new_fnames_in_env; // N of these. The file names that the final data will be written to (relative to env). |
| 182 | |
| 183 | uint64_t *; // N of these. |
| 184 | |
| 185 | struct rowset primary_rowset; // the primary rows that have been put, but the secondary rows haven't been generated. |
| 186 | struct rowset primary_rowset_temp; // the primary rows that are being worked on by the extractor_thread. |
| 187 | |
| 188 | QUEUE primary_rowset_queue; // main thread enqueues rowsets in this queue (in maybe 64MB chunks). The extractor thread removes them, sorts them, adn writes to file. |
| 189 | toku_pthread_t ; // the thread that takes primary rowset and does extraction and the first level sort and write to file. |
| 190 | bool ; |
| 191 | |
| 192 | DBT *last_key; // for each rowset, remember the most recently output key. The system may choose not to keep this up-to-date when a rowset is unsorted. These keys are malloced and ulen maintains the size of the malloced block. |
| 193 | |
| 194 | struct rowset *rows; // secondary rows that have been put, but haven't been sorted and written to a file. |
| 195 | uint64_t n_rows; // how many rows have been put? |
| 196 | struct merge_fileset *fs; |
| 197 | |
| 198 | const char *temp_file_template; |
| 199 | |
| 200 | CACHETABLE cachetable; |
| 201 | bool did_reserve_memory; |
| 202 | bool compress_intermediates; |
| 203 | bool allow_puts; |
| 204 | uint64_t reserved_memory; // how much memory are we allowed to use? |
| 205 | |
| 206 | /* To make it easier to recover from errors, we don't use TOKU_FILE*, |
| 207 | * instead we use an index into the file_infos. */ |
| 208 | struct file_infos file_infos; |
| 209 | |
| 210 | #define PROGRESS_MAX (1 << 16) |
| 211 | int progress; // Progress runs from 0 to PROGRESS_MAX. When we call the poll function we convert to a float from 0.0 to 1.0 |
| 212 | // We use an integer so that we can add to the progress using a fetch-and-add instruction. |
| 213 | |
| 214 | int progress_callback_result; // initially zero, if any call to the poll function callback returns nonzero, we save the result here (and don't call the poll callback function again). |
| 215 | |
| 216 | LSN load_lsn; //LSN of the fsynced 'load' log entry. Write this LSN (as checkpoint_lsn) in ft headers made by this loader. |
| 217 | TXNID load_root_xid; //(Root) transaction that performed the load. |
| 218 | |
| 219 | QUEUE *fractal_queues; // an array of work queues, one for each secondary index. |
| 220 | toku_pthread_t *fractal_threads; |
| 221 | bool *fractal_threads_live; // an array of bools indicating that fractal_threads[i] is a live thread. (There is no NULL for a pthread_t, so we have to maintain this separately). |
| 222 | |
| 223 | unsigned fractal_workers; // number of fractal tree writer threads |
| 224 | |
| 225 | toku_mutex_t mutex; |
| 226 | bool mutex_init; |
| 227 | }; |
| 228 | |
| 229 | // Set the number of rows in the loader. Used for test. |
| 230 | void toku_ft_loader_set_n_rows(FTLOADER bl, uint64_t n_rows); |
| 231 | |
| 232 | // Get the number of rows in the loader. Used for test. |
| 233 | uint64_t toku_ft_loader_get_n_rows(FTLOADER bl); |
| 234 | |
| 235 | // The data passed into a fractal_thread via pthread_create. |
| 236 | struct fractal_thread_args { |
| 237 | FTLOADER bl; |
| 238 | const DESCRIPTOR descriptor; |
| 239 | int fd; // write the ft into fd. |
| 240 | int progress_allocation; |
| 241 | QUEUE q; |
| 242 | uint64_t total_disksize_estimate; |
| 243 | int errno_result; // the final result. |
| 244 | int which_db; |
| 245 | uint32_t target_nodesize; |
| 246 | uint32_t target_basementnodesize; |
| 247 | enum toku_compression_method target_compression_method; |
| 248 | uint32_t target_fanout; |
| 249 | }; |
| 250 | |
| 251 | void toku_ft_loader_set_n_rows(FTLOADER bl, uint64_t n_rows); |
| 252 | uint64_t toku_ft_loader_get_n_rows(FTLOADER bl); |
| 253 | |
| 254 | int merge_row_arrays_base (struct row dest[/*an+bn*/], struct row a[/*an*/], int an, struct row b[/*bn*/], int bn, |
| 255 | int which_db, DB *dest_db, ft_compare_func, |
| 256 | FTLOADER, |
| 257 | struct rowset *); |
| 258 | |
| 259 | int merge_files (struct merge_fileset *fs, FTLOADER bl, int which_db, DB *dest_db, ft_compare_func, int progress_allocation, QUEUE); |
| 260 | |
| 261 | int sort_and_write_rows (struct rowset rows, struct merge_fileset *fs, FTLOADER bl, int which_db, DB *dest_db, ft_compare_func); |
| 262 | |
| 263 | int mergesort_row_array (struct row rows[/*n*/], int n, int which_db, DB *dest_db, ft_compare_func, FTLOADER, struct rowset *); |
| 264 | |
| 265 | //int write_file_to_dbfile (int outfile, FIDX infile, FTLOADER bl, const DESCRIPTOR descriptor, int progress_allocation); |
| 266 | int toku_merge_some_files_using_dbufio (const bool to_q, FIDX dest_data, QUEUE q, int n_sources, DBUFIO_FILESET bfs, FIDX srcs_fidxs[/*n_sources*/], FTLOADER bl, int which_db, DB *dest_db, ft_compare_func compare, int progress_allocation); |
| 267 | |
| 268 | int ft_loader_sort_and_write_rows (struct rowset *rows, struct merge_fileset *fs, FTLOADER bl, int which_db, DB *dest_db, ft_compare_func); |
| 269 | |
| 270 | // This is probably only for testing. |
| 271 | int toku_loader_write_ft_from_q_in_C (FTLOADER bl, |
| 272 | const DESCRIPTOR descriptor, |
| 273 | int fd, // write to here |
| 274 | int progress_allocation, |
| 275 | QUEUE q, |
| 276 | uint64_t total_disksize_estimate, |
| 277 | int which_db, |
| 278 | uint32_t target_nodesize, |
| 279 | uint32_t target_basementnodesize, |
| 280 | enum toku_compression_method target_compression_method, |
| 281 | uint32_t fanout); |
| 282 | |
| 283 | int ft_loader_mergesort_row_array (struct row rows[/*n*/], int n, int which_db, DB *dest_db, ft_compare_func, FTLOADER, struct rowset *); |
| 284 | |
| 285 | int ft_loader_write_file_to_dbfile (int outfile, FIDX infile, FTLOADER bl, const DESCRIPTOR descriptor, int progress_allocation); |
| 286 | |
| 287 | int ft_loader_init_file_infos (struct file_infos *fi); |
| 288 | void ft_loader_fi_destroy (struct file_infos *fi, bool is_error); |
| 289 | int ft_loader_fi_close (struct file_infos *fi, FIDX idx, bool require_open); |
| 290 | int ft_loader_fi_close_all (struct file_infos *fi); |
| 291 | int ft_loader_fi_reopen (struct file_infos *fi, FIDX idx, const char *mode); |
| 292 | int ft_loader_fi_unlink (struct file_infos *fi, FIDX idx); |
| 293 | |
| 294 | int toku_ft_loader_internal_init (/* out */ FTLOADER *blp, |
| 295 | CACHETABLE cachetable, |
| 296 | generate_row_for_put_func g, |
| 297 | DB *src_db, |
| 298 | int N, FT_HANDLE ft_hs[/*N*/], DB* dbs[/*N*/], |
| 299 | const char *new_fnames_in_env[/*N*/], |
| 300 | ft_compare_func bt_compare_functions[/*N*/], |
| 301 | const char *temp_file_template, |
| 302 | LSN load_lsn, |
| 303 | TOKUTXN txn, |
| 304 | bool reserve_memory, |
| 305 | uint64_t reserve_memory_size, |
| 306 | bool compress_intermediates, |
| 307 | bool allow_puts); |
| 308 | |
| 309 | void toku_ft_loader_internal_destroy (FTLOADER bl, bool is_error); |
| 310 | |
// For test purposes only.
// (In production, the rowset size is determined by negotiation with the
// cachetable for some memory.  See #2613.)
uint64_t toku_ft_loader_get_rowset_budget_for_testing(void);
| 313 | |
| 314 | int (FTLOADER bl); |
| 315 | |
| 316 | int toku_ft_loader_get_error(FTLOADER bl, int *loader_errno); |
| 317 | |
| 318 | void ft_loader_lock_init(FTLOADER bl); |
| 319 | void ft_loader_lock_destroy(FTLOADER bl); |
| 320 | void ft_loader_set_fractal_workers_count_from_c(FTLOADER bl); |
| 321 | |