1 | /* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */ |
2 | // vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4: |
3 | #ident "$Id$" |
4 | /*====== |
5 | This file is part of PerconaFT. |
6 | |
7 | |
8 | Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved. |
9 | |
10 | PerconaFT is free software: you can redistribute it and/or modify |
11 | it under the terms of the GNU General Public License, version 2, |
12 | as published by the Free Software Foundation. |
13 | |
14 | PerconaFT is distributed in the hope that it will be useful, |
15 | but WITHOUT ANY WARRANTY; without even the implied warranty of |
16 | MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
17 | GNU General Public License for more details. |
18 | |
19 | You should have received a copy of the GNU General Public License |
20 | along with PerconaFT. If not, see <http://www.gnu.org/licenses/>. |
21 | |
22 | ---------------------------------------- |
23 | |
24 | PerconaFT is free software: you can redistribute it and/or modify |
25 | it under the terms of the GNU Affero General Public License, version 3, |
26 | as published by the Free Software Foundation. |
27 | |
28 | PerconaFT is distributed in the hope that it will be useful, |
29 | but WITHOUT ANY WARRANTY; without even the implied warranty of |
30 | MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
31 | GNU Affero General Public License for more details. |
32 | |
33 | You should have received a copy of the GNU Affero General Public License |
34 | along with PerconaFT. If not, see <http://www.gnu.org/licenses/>. |
35 | ======= */ |
36 | |
37 | #ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved." |
38 | |
39 | #pragma once |
40 | |
41 | #include <db.h> |
42 | |
43 | #include "portability/toku_pthread.h" |
44 | |
45 | #include "loader/dbufio.h" |
46 | #include "loader/loader.h" |
47 | #include "util/queue.h" |
48 | |
/* Sizing/tuning constants used by the loader.
 * The first enumerator had lost its name (a bare "= 2," is a syntax error);
 * EXTRACTOR_QUEUE_DEPTH is the name the loader sources use for it. */
enum {
    EXTRACTOR_QUEUE_DEPTH = 2,          // NOTE(review): presumably the capacity of primary_rowset_queue (main thread -> extractor thread) -- confirm
    FILE_BUFFER_SIZE = 1<<24,           // NOTE(review): presumably the per-temp-file buffer size -- confirm
    MIN_ROWSET_MEMORY = 1<<23,
    MIN_MERGE_FANIN = 2,
    FRACTAL_WRITER_QUEUE_DEPTH = 3,
    FRACTAL_WRITER_ROWSETS = FRACTAL_WRITER_QUEUE_DEPTH + 2, // NOTE(review): presumably queue depth plus rowsets in flight at either end -- confirm
    DBUFIO_DEPTH = 2,
    TARGET_MERGE_BUF_SIZE = 1<<24,      // we'd like the merge buffer to be this big.
    MIN_MERGE_BUF_SIZE = 1<<20,         // always use at least this much
    MAX_UNCOMPRESSED_BUF = MIN_MERGE_BUF_SIZE
};
61 | |
62 | /* These functions are exported to allow the tests to compile. */ |
63 | |
/* These structures track the temporary files created by the loader: files that
 * are open, and files that are closed but still exist on disk (is_extant). */
/* One temporary file tracked by the loader. */
struct file_info {
    bool is_open;       // NOTE(review): presumably true while `file` is an open stream -- confirm
    bool is_extant;     // if true, the file must be unlinked.
    char *fname;        // name of the temporary file
    TOKU_FILE *file;
    uint64_t n_rows;    // how many rows were written into that file
    size_t buffer_size; // size of `buffer` in bytes (presumably)
    void *buffer;       // NOTE(review): presumably an I/O buffer associated with `file` -- confirm in loader.cc
};
/* The loader's table of temporary files; an FIDX is an index into file_infos. */
struct file_infos {
    int n_files;                        // presumably the number of entries of file_infos in use
    int n_files_limit;                  // presumably the allocated capacity of file_infos (it is grown with REALLOC)
    struct file_info *file_infos;
    int n_files_open, n_files_extant;   // presumably counts of entries with is_open / is_extant set -- confirm
    toku_mutex_t lock; // must protect this data structure because current activity performs a REALLOC(fi->file_infos).
};
/* Handle for a temporary file: an index into struct file_infos.
 * The loader passes these around instead of raw TOKU_FILE pointers. */
typedef struct fidx {
    int idx;
} FIDX;

/* Sentinel FIDX that refers to no file. */
static const FIDX FIDX_NULL __attribute__((__unused__)) = {-1};

/* Returns nonzero iff f is the FIDX_NULL sentinel. */
static int fidx_is_null(const FIDX f) __attribute__((__unused__));
static int fidx_is_null(const FIDX f) {
    const int null_idx = FIDX_NULL.idx;
    return f.idx == null_idx;
}
// NOTE(review): presumably returns the TOKU_FILE stored for index i in bl's file_infos -- confirm.
TOKU_FILE *toku_bl_fidx2file(FTLOADER bl, FIDX i);

// NOTE(review): presumably creates a new temporary file for bl and writes its index to *file_idx;
// return value presumably 0 on success, an error code otherwise -- confirm in loader.cc.
int ft_loader_open_temp_file(FTLOADER bl, FIDX *file_idx);
88 | |
/* These data structures are used for manipulating a collection of rows in main memory. */
/* Location of one row inside a rowset's data array. */
struct row {
    size_t off; // the offset in the data array.
    int klen,vlen; // NOTE(review): presumably key and value lengths in bytes -- confirm
};
/* A batch of in-memory rows: row descriptors plus the byte array they point into. */
struct rowset {
    uint64_t memory_budget;
    size_t n_rows, n_rows_limit;    // presumably rows in use / allocated capacity of `rows`
    struct row *rows;
    size_t n_bytes, n_bytes_limit;  // presumably bytes in use / allocated capacity of `data`
    char *data;
};
101 | |
// Rowset lifecycle: initialize with a memory budget, append key/value rows, release.
// NOTE(review): init_rowset/add_row presumably return 0 on success and an error code otherwise -- confirm.
int init_rowset (struct rowset *rows, uint64_t memory_budget);
void destroy_rowset(struct rowset *rows);
int add_row(struct rowset *rows, DBT *key, DBT *val);
105 | |
// Write one key/value row to a temporary data file, and read one back.
// NOTE(review): dataoff presumably tracks the current offset within the data file, and wb is
// presumably an intermediate write buffer -- confirm in loader.cc.
int loader_write_row(DBT *key,
                     DBT *val,
                     FIDX data,
                     TOKU_FILE *,
                     uint64_t *dataoff,
                     struct wbuf *wb,
                     FTLOADER bl);
int loader_read_row(TOKU_FILE *f, DBT *key, DBT *val);
114 | |
/* A set of temporary data files (data_fidxs) that are to be merged. */
struct merge_fileset {
    bool have_sorted_output;  // Is there a previous key?
    FIDX sorted_output;       // this points to one of the data_fidxs. If have_sorted_output then this is the file containing sorted data. It's still open
    DBT prev_key;             // If present (have_sorted_output), the last key output into the merge fileset

    int n_temp_files, n_temp_files_limit; // presumably entries in use / allocated capacity of data_fidxs
    FIDX *data_fidxs;
};
123 | |
// Initialize an empty merge_fileset / release the memory it owns.
// NOTE(review): whether destroy also closes or unlinks the files is not visible here -- confirm.
void init_merge_fileset (struct merge_fileset *fs);
void destroy_merge_fileset (struct merge_fileset *fs);
126 | |
127 | struct poll_callback_s { |
128 | ft_loader_poll_func poll_function; |
129 | void *; |
130 | }; |
131 | typedef struct poll_callback_s *ft_loader_poll_callback; |
132 | |
133 | int ft_loader_init_poll_callback(ft_loader_poll_callback); |
134 | |
135 | void ft_loader_destroy_poll_callback(ft_loader_poll_callback); |
136 | |
137 | void ft_loader_set_poll_function(ft_loader_poll_callback, ft_loader_poll_func poll_function, void *); |
138 | |
139 | int ft_loader_call_poll_function(ft_loader_poll_callback, float progress); |
140 | |
141 | struct error_callback_s { |
142 | int error; |
143 | ft_loader_error_func error_callback; |
144 | void *; |
145 | DB *db; |
146 | int which_db; |
147 | DBT key; |
148 | DBT val; |
149 | bool did_callback; |
150 | toku_mutex_t mutex; |
151 | }; |
152 | typedef struct error_callback_s *ft_loader_error_callback; |
153 | |
154 | void ft_loader_init_error_callback(ft_loader_error_callback); |
155 | |
156 | void ft_loader_destroy_error_callback(ft_loader_error_callback); |
157 | |
158 | int ft_loader_get_error(ft_loader_error_callback); |
159 | |
160 | void ft_loader_set_error_function(ft_loader_error_callback, ft_loader_error_func error_function, void *); |
161 | |
162 | int ft_loader_set_error(ft_loader_error_callback, int error, DB *db, int which_db, DBT *key, DBT *val); |
163 | |
164 | int ft_loader_call_error_function(ft_loader_error_callback); |
165 | |
166 | int ft_loader_set_error_and_callback(ft_loader_error_callback, int error, DB *db, int which_db, DBT *key, DBT *val); |
167 | |
168 | struct ft_loader_s { |
169 | // These two are set in the close function, and used while running close |
170 | struct error_callback_s error_callback; |
171 | struct poll_callback_s poll_callback; |
172 | |
173 | generate_row_for_put_func generate_row_for_put; |
174 | ft_compare_func *bt_compare_funs; |
175 | |
176 | DB *src_db; |
177 | int N; |
178 | DB **dbs; // N of these |
179 | DESCRIPTOR *descriptors; // N of these. |
180 | TXNID *root_xids_that_created; // N of these. |
181 | const char **new_fnames_in_env; // N of these. The file names that the final data will be written to (relative to env). |
182 | |
183 | uint64_t *; // N of these. |
184 | |
185 | struct rowset primary_rowset; // the primary rows that have been put, but the secondary rows haven't been generated. |
186 | struct rowset primary_rowset_temp; // the primary rows that are being worked on by the extractor_thread. |
187 | |
188 | QUEUE primary_rowset_queue; // main thread enqueues rowsets in this queue (in maybe 64MB chunks). The extractor thread removes them, sorts them, adn writes to file. |
189 | toku_pthread_t ; // the thread that takes primary rowset and does extraction and the first level sort and write to file. |
190 | bool ; |
191 | |
192 | DBT *last_key; // for each rowset, remember the most recently output key. The system may choose not to keep this up-to-date when a rowset is unsorted. These keys are malloced and ulen maintains the size of the malloced block. |
193 | |
194 | struct rowset *rows; // secondary rows that have been put, but haven't been sorted and written to a file. |
195 | uint64_t n_rows; // how many rows have been put? |
196 | struct merge_fileset *fs; |
197 | |
198 | const char *temp_file_template; |
199 | |
200 | CACHETABLE cachetable; |
201 | bool did_reserve_memory; |
202 | bool compress_intermediates; |
203 | bool allow_puts; |
204 | uint64_t reserved_memory; // how much memory are we allowed to use? |
205 | |
206 | /* To make it easier to recover from errors, we don't use TOKU_FILE*, |
207 | * instead we use an index into the file_infos. */ |
208 | struct file_infos file_infos; |
209 | |
210 | #define PROGRESS_MAX (1 << 16) |
211 | int progress; // Progress runs from 0 to PROGRESS_MAX. When we call the poll function we convert to a float from 0.0 to 1.0 |
212 | // We use an integer so that we can add to the progress using a fetch-and-add instruction. |
213 | |
214 | int progress_callback_result; // initially zero, if any call to the poll function callback returns nonzero, we save the result here (and don't call the poll callback function again). |
215 | |
216 | LSN load_lsn; //LSN of the fsynced 'load' log entry. Write this LSN (as checkpoint_lsn) in ft headers made by this loader. |
217 | TXNID load_root_xid; //(Root) transaction that performed the load. |
218 | |
219 | QUEUE *fractal_queues; // an array of work queues, one for each secondary index. |
220 | toku_pthread_t *fractal_threads; |
221 | bool *fractal_threads_live; // an array of bools indicating that fractal_threads[i] is a live thread. (There is no NULL for a pthread_t, so we have to maintain this separately). |
222 | |
223 | unsigned fractal_workers; // number of fractal tree writer threads |
224 | |
225 | toku_mutex_t mutex; |
226 | bool mutex_init; |
227 | }; |
228 | |
// Set the number of rows in the loader (presumably bl->n_rows). Used for test.
void toku_ft_loader_set_n_rows(FTLOADER bl, uint64_t n_rows);

// Get the number of rows in the loader (presumably bl->n_rows). Used for test.
uint64_t toku_ft_loader_get_n_rows(FTLOADER bl);
234 | |
// The data passed into a fractal_thread via pthread_create.
// The fields mirror the parameters of toku_loader_write_ft_from_q_in_C.
struct fractal_thread_args {
    FTLOADER bl;
    const DESCRIPTOR descriptor;
    int fd;                       // write the ft into fd.
    int progress_allocation;      // NOTE(review): presumably this thread's share of PROGRESS_MAX -- confirm
    QUEUE q;                      // presumably the queue of rowsets this thread consumes
    uint64_t total_disksize_estimate;
    int errno_result;             // the final result.
    int which_db;                 // index of the destination DB
    uint32_t target_nodesize;
    uint32_t target_basementnodesize;
    enum toku_compression_method target_compression_method;
    uint32_t target_fanout;
};
250 | |
// NOTE(review): redundant re-declarations of the n_rows test accessors declared earlier
// in this header; harmless, but could be removed.
void toku_ft_loader_set_n_rows(FTLOADER bl, uint64_t n_rows);
uint64_t toku_ft_loader_get_n_rows(FTLOADER bl);
253 | |
// Merge the row arrays a[0..an) and b[0..bn) into dest, which holds an+bn rows.
// NOTE(review): presumably rows are ordered with the given comparison function for which_db -- confirm.
int merge_row_arrays_base (struct row dest[/*an+bn*/], struct row a[/*an*/], int an, struct row b[/*bn*/], int bn,
                           int which_db, DB *dest_db, ft_compare_func,
                           FTLOADER,
                           struct rowset *);

// Merge the files of fs (for destination which_db), writing results to the given QUEUE.
int merge_files (struct merge_fileset *fs, FTLOADER bl, int which_db, DB *dest_db, ft_compare_func, int progress_allocation, QUEUE);

// Sort a rowset and write it to a new temporary file added to fs.
int sort_and_write_rows (struct rowset rows, struct merge_fileset *fs, FTLOADER bl, int which_db, DB *dest_db, ft_compare_func);

// Merge sort rows[0..n).
int mergesort_row_array (struct row rows[/*n*/], int n, int which_db, DB *dest_db, ft_compare_func, FTLOADER, struct rowset *);

// NOTE(review): stale commented-out prototype; ft_loader_write_file_to_dbfile below has this signature.
//int write_file_to_dbfile (int outfile, FIDX infile, FTLOADER bl, const DESCRIPTOR descriptor, int progress_allocation);
// Merge n_sources source files (read through the dbufio fileset bfs) either into queue q
// (to_q true) or into dest_data (presumably when to_q is false).
int toku_merge_some_files_using_dbufio (const bool to_q, FIDX dest_data, QUEUE q, int n_sources, DBUFIO_FILESET bfs, FIDX srcs_fidxs[/*n_sources*/], FTLOADER bl, int which_db, DB *dest_db, ft_compare_func compare, int progress_allocation);

// Pointer-taking variant of sort_and_write_rows.
int ft_loader_sort_and_write_rows (struct rowset *rows, struct merge_fileset *fs, FTLOADER bl, int which_db, DB *dest_db, ft_compare_func);
269 | |
// This is probably only for testing.
// Build the fractal tree for destination which_db from the rows arriving on q, writing it to fd.
int toku_loader_write_ft_from_q_in_C (FTLOADER bl,
                                      const DESCRIPTOR descriptor,
                                      int fd, // write to here
                                      int progress_allocation,
                                      QUEUE q,
                                      uint64_t total_disksize_estimate,
                                      int which_db,
                                      uint32_t target_nodesize,
                                      uint32_t target_basementnodesize,
                                      enum toku_compression_method target_compression_method,
                                      uint32_t fanout);

// Merge sort rows[0..n) (ft_loader_-prefixed counterpart of mergesort_row_array).
int ft_loader_mergesort_row_array (struct row rows[/*n*/], int n, int which_db, DB *dest_db, ft_compare_func, FTLOADER, struct rowset *);

// Write the rows in temporary file infile to the output file descriptor outfile.
int ft_loader_write_file_to_dbfile (int outfile, FIDX infile, FTLOADER bl, const DESCRIPTOR descriptor, int progress_allocation);
286 | |
// Management of the loader's temporary-file table (struct file_infos), addressed by FIDX.
// NOTE(review): is_error presumably selects error-path cleanup, and require_open presumably
// makes closing an already-closed file an error -- confirm in loader.cc.
int ft_loader_init_file_infos (struct file_infos *fi);
void ft_loader_fi_destroy (struct file_infos *fi, bool is_error);
int ft_loader_fi_close (struct file_infos *fi, FIDX idx, bool require_open);
int ft_loader_fi_close_all (struct file_infos *fi);
int ft_loader_fi_reopen (struct file_infos *fi, FIDX idx, const char *mode);
int ft_loader_fi_unlink (struct file_infos *fi, FIDX idx);
293 | |
// Create a loader for N destination DBs and return it through *blp.
// The ft_hs, dbs, new_fnames_in_env and bt_compare_functions arrays each have N entries.
int toku_ft_loader_internal_init (/* out */ FTLOADER *blp,
                                  CACHETABLE cachetable,
                                  generate_row_for_put_func g,
                                  DB *src_db,
                                  int N, FT_HANDLE ft_hs[/*N*/], DB* dbs[/*N*/],
                                  const char *new_fnames_in_env[/*N*/],
                                  ft_compare_func bt_compare_functions[/*N*/],
                                  const char *temp_file_template,
                                  LSN load_lsn,
                                  TOKUTXN txn,
                                  bool reserve_memory,
                                  uint64_t reserve_memory_size,
                                  bool compress_intermediates,
                                  bool allow_puts);

// Release a loader created by toku_ft_loader_internal_init; is_error presumably selects error-path cleanup.
void toku_ft_loader_internal_destroy (FTLOADER bl, bool is_error);
310 | |
// For test purposes only. (In production, the rowset size is determined by negotiation with the cachetable for some memory. See #2613.)
// Returns the memory budget, in bytes presumably, used for a rowset.
uint64_t toku_ft_loader_get_rowset_budget_for_testing (void);
313 | |
314 | int (FTLOADER bl); |
315 | |
// Report bl's recorded error through *loader_errno.
int toku_ft_loader_get_error(FTLOADER bl, int *loader_errno);

// Initialize / destroy bl->mutex (presumably tracked by bl->mutex_init).
void ft_loader_lock_init(FTLOADER bl);
void ft_loader_lock_destroy(FTLOADER bl);
// Set bl->fractal_workers (number of fractal tree writer threads); presumably derived from the environment/CPU count -- confirm.
void ft_loader_set_fractal_workers_count_from_c(FTLOADER bl);
321 | |