1/* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2// vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
3#ident "$Id$"
4/*======
5This file is part of PerconaFT.
6
7
8Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved.
9
10 PerconaFT is free software: you can redistribute it and/or modify
11 it under the terms of the GNU General Public License, version 2,
12 as published by the Free Software Foundation.
13
14 PerconaFT is distributed in the hope that it will be useful,
15 but WITHOUT ANY WARRANTY; without even the implied warranty of
16 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 GNU General Public License for more details.
18
19 You should have received a copy of the GNU General Public License
20 along with PerconaFT. If not, see <http://www.gnu.org/licenses/>.
21
22----------------------------------------
23
24 PerconaFT is free software: you can redistribute it and/or modify
25 it under the terms of the GNU Affero General Public License, version 3,
26 as published by the Free Software Foundation.
27
28 PerconaFT is distributed in the hope that it will be useful,
29 but WITHOUT ANY WARRANTY; without even the implied warranty of
30 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
31 GNU Affero General Public License for more details.
32
33 You should have received a copy of the GNU Affero General Public License
34 along with PerconaFT. If not, see <http://www.gnu.org/licenses/>.
35======= */
36
37#ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved."
38
39#pragma once
40
41#include <db.h>
42
43#include "portability/toku_pthread.h"
44
45#include "loader/dbufio.h"
46#include "loader/loader.h"
47#include "util/queue.h"
48
// Tuning constants for the bulk loader, grouped by what they size.
enum {
    // Pipeline depths.
    EXTRACTOR_QUEUE_DEPTH      = 2,
    FRACTAL_WRITER_QUEUE_DEPTH = 3,
    FRACTAL_WRITER_ROWSETS     = FRACTAL_WRITER_QUEUE_DEPTH + 2,
    DBUFIO_DEPTH               = 2,

    // Buffer / memory sizes (presumably in bytes).
    FILE_BUFFER_SIZE      = 16 * 1024 * 1024,  // 2^24
    MIN_ROWSET_MEMORY     = 8 * 1024 * 1024,   // 2^23
    TARGET_MERGE_BUF_SIZE = 16 * 1024 * 1024,  // we'd like the merge buffer to be this big.
    MIN_MERGE_BUF_SIZE    = 1024 * 1024,       // always use at least this much
    MAX_UNCOMPRESSED_BUF  = MIN_MERGE_BUF_SIZE,

    // Smallest number of inputs a merge step combines.
    MIN_MERGE_FANIN = 2
};
61
62/* These functions are exported to allow the tests to compile. */
63
/* These structures maintain a collection of all the open temporary files used by the loader. */

// One temporary file known to the loader. Entries are addressed by FIDX
// (an index into file_infos::file_infos) rather than by TOKU_FILE*.
struct file_info {
    bool is_open;       // NOTE(review): presumably true while `file` is an open stream — confirm in ft_loader_fi_close/ft_loader_fi_reopen.
    bool is_extant; // if true, the file must be unlinked.
    char *fname;        // path of the temporary file (assumed heap-owned by this entry — confirm).
    TOKU_FILE *file;
    uint64_t n_rows; // how many rows were written into that file
    size_t buffer_size; // size of `buffer`.
    void *buffer;       // presumably an I/O buffer attached to `file` — TODO confirm.
};

// Growable table of file_info entries; an FIDX's idx is a position in file_infos[].
struct file_infos {
    int n_files;        // entries in use (assumed from the name).
    int n_files_limit;  // allocated capacity of file_infos[] (assumed from the name).
    struct file_info *file_infos;
    int n_files_open, n_files_extant; // presumably counts of entries with is_open / is_extant set — confirm.
    toku_mutex_t lock; // must protect this data structure because current activity performs a REALLOC(fi->file_infos).
};
// A file index: a position in the loader's file_infos table, used in place of a raw TOKU_FILE*.
typedef struct fidx {
    int idx;
} FIDX;

// Sentinel meaning "no file"; test for it with fidx_is_null().
static const FIDX FIDX_NULL __attribute__((__unused__)) = { -1 };

// Returns 1 if f is the FIDX_NULL sentinel (idx == -1), 0 otherwise.
static __attribute__((__unused__)) int fidx_is_null(const FIDX f) {
    const int null_idx = -1;
    if (f.idx != null_idx) {
        return 0;
    }
    return 1;
}
// Maps file index i to the TOKU_FILE* recorded for it in bl's file table.
TOKU_FILE *toku_bl_fidx2file(FTLOADER bl, FIDX i);

// Opens a new temporary file for the loader and returns its index through *file_idx.
// NOTE(review): the name is presumably derived from bl->temp_file_template, and the
// return is presumably 0 on success / an errno-style code on failure — confirm in the .cc.
int ft_loader_open_temp_file(FTLOADER bl, FIDX *file_idx);
88
/* These data structures are used for manipulating a collection of rows in main memory. */

// One key/value pair inside a rowset. Its bytes live in rowset::data starting at `off`.
// NOTE(review): presumably the key occupies klen bytes followed by vlen value bytes — confirm in add_row.
struct row {
    size_t off; // the offset in the data array.
    int klen,vlen;
};

// An in-memory batch of rows: an array of row descriptors plus one byte buffer holding their contents.
struct rowset {
    uint64_t memory_budget;
    size_t n_rows, n_rows_limit;   // rows[] used / allocated (assumed from the names).
    struct row *rows;
    size_t n_bytes, n_bytes_limit; // data[] used / allocated (assumed from the names).
    char *data;
};

// Initializes *rows with the given memory budget. Presumably returns 0 on success.
int init_rowset (struct rowset *rows, uint64_t memory_budget);
// Releases the storage held by *rows (presumably rows->rows and rows->data — confirm).
void destroy_rowset(struct rowset *rows);
// Appends (key, val) to *rows. NOTE(review): presumably copies the bytes into rows->data — confirm.
int add_row(struct rowset *rows, DBT *key, DBT *val);

// Writes one key/value row to the temporary file `data` (whose stream is presumably the
// TOKU_FILE* argument). The handling of *dataoff and wb is not visible here — confirm in the .cc.
int loader_write_row(DBT *key,
                     DBT *val,
                     FIDX data,
                     TOKU_FILE *,
                     uint64_t *dataoff,
                     struct wbuf *wb,
                     FTLOADER bl);
// Reads the next row from f into key and val.
int loader_read_row(TOKU_FILE *f, DBT *key, DBT *val);
114
// The set of temporary files holding sorted runs that still need to be merged.
struct merge_fileset {
    bool have_sorted_output; // Is there a previous key? (i.e. are sorted_output/prev_key valid — presumably)
    FIDX sorted_output; // this points to one of the data_fidxs. If have_sorted_output is set (the original comment named a nonexistent `output_is_sorted`), this is the file containing sorted data. It's still open
    DBT prev_key; // If present, it's the last key output to the merge fileset.

    int n_temp_files, n_temp_files_limit; // data_fidxs[] used / allocated (assumed from the names).
    FIDX *data_fidxs;
};

// Initializes / tears down a merge_fileset.
void init_merge_fileset (struct merge_fileset *fs);
void destroy_merge_fileset (struct merge_fileset *fs);
126
// Progress-reporting hook: a user-supplied poll function and its opaque argument.
struct poll_callback_s {
    ft_loader_poll_func poll_function;
    void *poll_extra;
};
typedef struct poll_callback_s *ft_loader_poll_callback;

// Initializes / tears down a poll callback.
int ft_loader_init_poll_callback(ft_loader_poll_callback);

void ft_loader_destroy_poll_callback(ft_loader_poll_callback);

// Installs poll_function and poll_extra into the callback.
void ft_loader_set_poll_function(ft_loader_poll_callback, ft_loader_poll_func poll_function, void *poll_extra);

// Invokes the poll function with `progress` (a fraction in 0.0..1.0, per ft_loader_s::progress)
// and returns its result.
int ft_loader_call_poll_function(ft_loader_poll_callback, float progress);
140
// Error-reporting state: the error code, the db/row it relates to, and a user callback to report it to.
struct error_callback_s {
    int error;                           // recorded error code (presumably 0 while no error has occurred).
    ft_loader_error_func error_callback; // user callback, set via ft_loader_set_error_function.
    void *extra;                         // opaque argument passed to error_callback.
    DB *db;                              // db / row associated with the error, as passed to ft_loader_set_error.
    int which_db;
    DBT key;
    DBT val;
    bool did_callback;                   // NOTE(review): presumably keeps the callback from firing twice — confirm.
    toku_mutex_t mutex;                  // NOTE(review): presumably serializes updates from loader threads — confirm.
};
typedef struct error_callback_s *ft_loader_error_callback;

// Initializes / tears down an error callback.
void ft_loader_init_error_callback(ft_loader_error_callback);

void ft_loader_destroy_error_callback(ft_loader_error_callback);

// Returns the recorded error code.
int ft_loader_get_error(ft_loader_error_callback);

// Installs the user's error function and its opaque argument.
void ft_loader_set_error_function(ft_loader_error_callback, ft_loader_error_func error_function, void *extra);

// Records `error` together with the db and row it concerns.
int ft_loader_set_error(ft_loader_error_callback, int error, DB *db, int which_db, DBT *key, DBT *val);

// Invokes the installed error function for the recorded error.
int ft_loader_call_error_function(ft_loader_error_callback);

// Presumably ft_loader_set_error followed by ft_loader_call_error_function — confirm in the .cc.
int ft_loader_set_error_and_callback(ft_loader_error_callback, int error, DB *db, int which_db, DBT *key, DBT *val);
167
// All state for one bulk load into N destination dictionaries.
// NOTE(review): FTLOADER is presumably a pointer typedef to this struct, declared in loader/loader.h — confirm.
struct ft_loader_s {
    // These two are set in the close function, and used while running close
    struct error_callback_s error_callback;
    struct poll_callback_s poll_callback;

    generate_row_for_put_func generate_row_for_put;
    ft_compare_func *bt_compare_funs; // presumably N of these, one per destination db — confirm.

    DB *src_db;
    int N;   // number of destination dictionaries; sizes the "N of these" arrays below.
    DB **dbs; // N of these
    DESCRIPTOR *descriptors; // N of these.
    TXNID *root_xids_that_created; // N of these.
    const char **new_fnames_in_env; // N of these. The file names that the final data will be written to (relative to env).

    uint64_t *extracted_datasizes; // N of these.

    struct rowset primary_rowset; // the primary rows that have been put, but the secondary rows haven't been generated.
    struct rowset primary_rowset_temp; // the primary rows that are being worked on by the extractor_thread.

    QUEUE primary_rowset_queue; // main thread enqueues rowsets in this queue (in maybe 64MB chunks). The extractor thread removes them, sorts them, and writes to file.
    toku_pthread_t extractor_thread; // the thread that takes primary rowset and does extraction and the first level sort and write to file.
    bool extractor_live; // presumably true while extractor_thread is running (no NULL pthread_t; cf. fractal_threads_live).

    DBT *last_key; // for each rowset, remember the most recently output key. The system may choose not to keep this up-to-date when a rowset is unsorted. These keys are malloced and ulen maintains the size of the malloced block.

    struct rowset *rows; // secondary rows that have been put, but haven't been sorted and written to a file.
    uint64_t n_rows; // how many rows have been put?
    struct merge_fileset *fs; // presumably one merge fileset per destination db — confirm.

    const char *temp_file_template; // template for naming temporary files (assumed from the name).

    CACHETABLE cachetable;
    bool did_reserve_memory;
    bool compress_intermediates; // presumably: compress the temporary files — confirm.
    bool allow_puts;
    uint64_t reserved_memory; // how much memory are we allowed to use?

    /* To make it easier to recover from errors, we don't use TOKU_FILE*,
     * instead we use an index into the file_infos. */
    struct file_infos file_infos;

#define PROGRESS_MAX (1 << 16)
    int progress; // Progress runs from 0 to PROGRESS_MAX. When we call the poll function we convert to a float from 0.0 to 1.0
                  // We use an integer so that we can add to the progress using a fetch-and-add instruction.

    int progress_callback_result; // initially zero, if any call to the poll function callback returns nonzero, we save the result here (and don't call the poll callback function again).

    LSN load_lsn; //LSN of the fsynced 'load' log entry. Write this LSN (as checkpoint_lsn) in ft headers made by this loader.
    TXNID load_root_xid; //(Root) transaction that performed the load.

    QUEUE *fractal_queues; // an array of work queues, one for each secondary index.
    toku_pthread_t *fractal_threads;
    bool *fractal_threads_live; // an array of bools indicating that fractal_threads[i] is a live thread. (There is no NULL for a pthread_t, so we have to maintain this separately).

    unsigned fractal_workers; // number of fractal tree writer threads

    toku_mutex_t mutex;
    bool mutex_init; // presumably records whether `mutex` was initialized (see ft_loader_lock_init/destroy) — confirm.
};
228
// Set the number of rows in the loader (ft_loader_s::n_rows). Used for test.
void toku_ft_loader_set_n_rows(FTLOADER bl, uint64_t n_rows);

// Get the number of rows in the loader (ft_loader_s::n_rows). Used for test.
uint64_t toku_ft_loader_get_n_rows(FTLOADER bl);
234
235// The data passed into a fractal_thread via pthread_create.
236struct fractal_thread_args {
237 FTLOADER bl;
238 const DESCRIPTOR descriptor;
239 int fd; // write the ft into fd.
240 int progress_allocation;
241 QUEUE q;
242 uint64_t total_disksize_estimate;
243 int errno_result; // the final result.
244 int which_db;
245 uint32_t target_nodesize;
246 uint32_t target_basementnodesize;
247 enum toku_compression_method target_compression_method;
248 uint32_t target_fanout;
249};
250
251void toku_ft_loader_set_n_rows(FTLOADER bl, uint64_t n_rows);
252uint64_t toku_ft_loader_get_n_rows(FTLOADER bl);
253
// Merges the row arrays a[0..an) and b[0..bn) (presumably each already sorted) into
// dest[0..an+bn), comparing keys of db `which_db` with the given compare function.
int merge_row_arrays_base (struct row dest[/*an+bn*/], struct row a[/*an*/], int an, struct row b[/*bn*/], int bn,
                           int which_db, DB *dest_db, ft_compare_func,
                           FTLOADER,
                           struct rowset *);

// Merges the temporary files in fs, delivering the output to the QUEUE (assumed from the signature).
int merge_files (struct merge_fileset *fs, FTLOADER bl, int which_db, DB *dest_db, ft_compare_func, int progress_allocation, QUEUE);

// Sorts `rows` and writes them to a new temporary file in fs. Note: takes the rowset by value,
// unlike ft_loader_sort_and_write_rows below, which takes a pointer.
int sort_and_write_rows (struct rowset rows, struct merge_fileset *fs, FTLOADER bl, int which_db, DB *dest_db, ft_compare_func);

// Sorts rows[0..n) in place by key (the mergesort presumably uses the rowset's data for key bytes).
int mergesort_row_array (struct row rows[/*n*/], int n, int which_db, DB *dest_db, ft_compare_func, FTLOADER, struct rowset *);

// NOTE(review): stale commented-out prototype; ft_loader_write_file_to_dbfile below has the same signature.
//int write_file_to_dbfile (int outfile, FIDX infile, FTLOADER bl, const DESCRIPTOR descriptor, int progress_allocation);
// Merges the n_sources files srcs_fidxs[] (read through the dbufio fileset bfs). Presumably
// writes to q when to_q is true and to dest_data otherwise — confirm in the .cc.
int toku_merge_some_files_using_dbufio (const bool to_q, FIDX dest_data, QUEUE q, int n_sources, DBUFIO_FILESET bfs, FIDX srcs_fidxs[/*n_sources*/], FTLOADER bl, int which_db, DB *dest_db, ft_compare_func compare, int progress_allocation);

// Pointer-taking variant of sort_and_write_rows.
int ft_loader_sort_and_write_rows (struct rowset *rows, struct merge_fileset *fs, FTLOADER bl, int which_db, DB *dest_db, ft_compare_func);
269
// This is probably only for testing.
// Builds a fractal tree for db `which_db` from the rows arriving on q and writes it to fd,
// using the given node size, basement node size, compression method and fanout.
int toku_loader_write_ft_from_q_in_C (FTLOADER bl,
                                      const DESCRIPTOR descriptor,
                                      int fd, // write to here
                                      int progress_allocation,
                                      QUEUE q,
                                      uint64_t total_disksize_estimate,
                                      int which_db,
                                      uint32_t target_nodesize,
                                      uint32_t target_basementnodesize,
                                      enum toku_compression_method target_compression_method,
                                      uint32_t fanout);

// Same signature as mergesort_row_array above (presumably a wrapper for it — confirm).
int ft_loader_mergesort_row_array (struct row rows[/*n*/], int n, int which_db, DB *dest_db, ft_compare_func, FTLOADER, struct rowset *);

// Writes the rows in temporary file `infile` to outfile as a fractal tree (assumed from the name).
int ft_loader_write_file_to_dbfile (int outfile, FIDX infile, FTLOADER bl, const DESCRIPTOR descriptor, int progress_allocation);

// Temporary-file table management (struct file_infos).
int ft_loader_init_file_infos (struct file_infos *fi);
// Tears down fi. NOTE(review): is_error presumably selects error-path cleanup (e.g. closing/unlinking leftovers) — confirm.
void ft_loader_fi_destroy (struct file_infos *fi, bool is_error);
// Closes file idx. NOTE(review): require_open presumably makes closing an already-closed file an error — confirm.
int ft_loader_fi_close (struct file_infos *fi, FIDX idx, bool require_open);
int ft_loader_fi_close_all (struct file_infos *fi);
// Reopens file idx with the given fopen-style mode string (assumed).
int ft_loader_fi_reopen (struct file_infos *fi, FIDX idx, const char *mode);
int ft_loader_fi_unlink (struct file_infos *fi, FIDX idx);
293
// Creates a loader in *blp that reads rows from src_db and loads the N dictionaries dbs[]/ft_hs[],
// writing their final data to new_fnames_in_env[] and comparing keys with bt_compare_functions[].
// NOTE(review): the return is presumably 0 on success / nonzero error — confirm in the .cc.
int toku_ft_loader_internal_init (/* out */ FTLOADER *blp,
                                  CACHETABLE cachetable,
                                  generate_row_for_put_func g,
                                  DB *src_db,
                                  int N, FT_HANDLE ft_hs[/*N*/], DB* dbs[/*N*/],
                                  const char *new_fnames_in_env[/*N*/],
                                  ft_compare_func bt_compare_functions[/*N*/],
                                  const char *temp_file_template,
                                  LSN load_lsn,
                                  TOKUTXN txn,
                                  bool reserve_memory,
                                  uint64_t reserve_memory_size,
                                  bool compress_intermediates,
                                  bool allow_puts);

// Destroys bl. NOTE(review): is_error presumably selects error-path cleanup (cf. ft_loader_fi_destroy) — confirm.
void toku_ft_loader_internal_destroy (FTLOADER bl, bool is_error);

// For test purposes only. (In production, the rowset size is determined by negotiation with the cachetable for some memory. See #2613.)
uint64_t toku_ft_loader_get_rowset_budget_for_testing (void);

// Presumably hands remaining primary rows to the extractor thread and waits for it to finish — confirm.
int toku_ft_loader_finish_extractor(FTLOADER bl);

// Reports the loader's recorded error through *loader_errno (assumed from the signature).
int toku_ft_loader_get_error(FTLOADER bl, int *loader_errno);

// Presumably initialize / destroy bl->mutex (tracked by bl->mutex_init) — confirm.
void ft_loader_lock_init(FTLOADER bl);
void ft_loader_lock_destroy(FTLOADER bl);
// Sets bl->fractal_workers (the number of fractal tree writer threads); its source is not visible here.
void ft_loader_set_fractal_workers_count_from_c(FTLOADER bl);
321