/* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
// vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
#ident "$Id$"
/*======
This file is part of PerconaFT.


Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved.

    PerconaFT is free software: you can redistribute it and/or modify
    it under the terms of the GNU General Public License, version 2,
    as published by the Free Software Foundation.

    PerconaFT is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
    GNU General Public License for more details.

    You should have received a copy of the GNU General Public License
    along with PerconaFT. If not, see <http://www.gnu.org/licenses/>.

----------------------------------------

    PerconaFT is free software: you can redistribute it and/or modify
    it under the terms of the GNU Affero General Public License, version 3,
    as published by the Free Software Foundation.

    PerconaFT is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
    GNU Affero General Public License for more details.

    You should have received a copy of the GNU Affero General Public License
    along with PerconaFT. If not, see <http://www.gnu.org/licenses/>.
======= */

#ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved."

#include <errno.h>
#include <stdio.h>
#include <string.h>
#include <zlib.h>

#include "portability/memory.h"
#include "portability/toku_assert.h"
#include "portability/toku_portability.h"

#include "ft/serialize/compress.h"
#include "ft/serialize/sub_block.h"
#include "ft/serialize/quicklz.h"
#include "util/threadpool.h"
#include "util/x1764.h"

toku_instr_key *workset_lock_mutex_key;
toku_instr_key *ws_worker_wait_key;

SUB_BLOCK sub_block_creat(void) {
    SUB_BLOCK XMALLOC(sb);
    sub_block_init(sb);
    return sb;
}
void sub_block_init(SUB_BLOCK sub_block) {
    sub_block->uncompressed_ptr = 0;
    sub_block->uncompressed_size = 0;

    sub_block->compressed_ptr = 0;
    sub_block->compressed_size_bound = 0;
    sub_block->compressed_size = 0;

    sub_block->xsum = 0;
}

// get the size of the compression header: a 4-byte sub block count followed by
// one stored_sub_block entry per sub block
size_t
sub_block_header_size(int n_sub_blocks) {
    return sizeof (uint32_t) + n_sub_blocks * sizeof (struct stored_sub_block);
}
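
// Worked example (illustrative): assuming struct stored_sub_block holds three
// uint32_t fields (uncompressed_size, compressed_size, xsum) as declared in
// sub_block.h, the header describing 4 sub blocks occupies
// 4 + 4 * 12 = 52 bytes.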

void
set_compressed_size_bound(struct sub_block *se, enum toku_compression_method method) {
    se->compressed_size_bound = toku_compress_bound(method, se->uncompressed_size);
}

// get the sum of the sub block compressed size bounds, setting each sub block's
// compressed_size_bound along the way
size_t
get_sum_compressed_size_bound(int n_sub_blocks, struct sub_block sub_block[], enum toku_compression_method method) {
    size_t compressed_size_bound = 0;
    for (int i = 0; i < n_sub_blocks; i++) {
        sub_block[i].compressed_size_bound = toku_compress_bound(method, sub_block[i].uncompressed_size);
        compressed_size_bound += sub_block[i].compressed_size_bound;
    }
    return compressed_size_bound;
}

// get the sum of the sub block uncompressed sizes
size_t
get_sum_uncompressed_size(int n_sub_blocks, struct sub_block sub_block[]) {
    size_t uncompressed_size = 0;
    for (int i = 0; i < n_sub_blocks; i++)
        uncompressed_size += sub_block[i].uncompressed_size;
    return uncompressed_size;
}

// round a up to the next multiple of b
static inline int
alignup32(int a, int b) {
    return ((a+b-1) / b) * b;
}
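
// For example, alignup32(100, 32) = ((100 + 31) / 32) * 32 = 4 * 32 = 128.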

// Choose n_sub_blocks and sub_block_size such that their product is >= total_size
// and sub_block_size is at least the target_sub_block_size.
int
choose_sub_block_size(int total_size, int n_sub_blocks_limit, int *sub_block_size_ret, int *n_sub_blocks_ret) {
    if (total_size < 0 || n_sub_blocks_limit < 1)
        return EINVAL;

    const int alignment = 32;

    int n_sub_blocks, sub_block_size;
    n_sub_blocks = total_size / target_sub_block_size;
    if (n_sub_blocks <= 1) {
        if (total_size > 0 && n_sub_blocks_limit > 0)
            n_sub_blocks = 1;
        sub_block_size = total_size;
    } else {
        if (n_sub_blocks > n_sub_blocks_limit) // limit the number of sub-blocks
            n_sub_blocks = n_sub_blocks_limit;
        sub_block_size = alignup32(total_size / n_sub_blocks, alignment);
        while (sub_block_size * n_sub_blocks < total_size) // round up the sub-block size until big enough
            sub_block_size += alignment;
    }

    *sub_block_size_ret = sub_block_size;
    *n_sub_blocks_ret = n_sub_blocks;

    return 0;
}
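
// Worked example (illustrative; assumes target_sub_block_size is 512 KiB =
// 524288 bytes as defined in sub_block.h): choose_sub_block_size(3000000, 8, ...)
// computes n_sub_blocks = 3000000 / 524288 = 5, then
// sub_block_size = alignup32(3000000 / 5, 32) = 600000, and since
// 5 * 600000 >= 3000000, the result is 5 sub blocks of 600000 bytes each.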

// Choose the right size of basement nodes. For now, just align up to
// 256k blocks and hope it compresses well enough.
int
choose_basement_node_size(int total_size, int *sub_block_size_ret, int *n_sub_blocks_ret) {
    if (total_size < 0)
        return EINVAL;

    *n_sub_blocks_ret = (total_size + max_basement_node_uncompressed_size - 1) / max_basement_node_uncompressed_size;
    *sub_block_size_ret = max_basement_node_uncompressed_size;

    return 0;
}
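
// For example, with max_basement_node_uncompressed_size = 256 KiB (262144
// bytes, per the comment above), a 1,000,000 byte basement node is split into
// ceil(1000000 / 262144) = 4 sub blocks, each nominally 262144 bytes.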

// Set the uncompressed size of each sub block: the first n_sub_blocks-1 sub
// blocks get sub_block_size bytes each and the last sub block gets the remainder.
void
set_all_sub_block_sizes(int total_size, int sub_block_size, int n_sub_blocks, struct sub_block sub_block[]) {
    int size_left = total_size;
    int i;
    for (i = 0; i < n_sub_blocks-1; i++) {
        sub_block[i].uncompressed_size = sub_block_size;
        size_left -= sub_block_size;
    }
    if (i == 0 || size_left > 0)
        sub_block[i].uncompressed_size = size_left;
}
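
// For example, set_all_sub_block_sizes(1000000, 262144, 4, sb) gives the first
// three sub blocks 262144 bytes each and the last one the remaining
// 1000000 - 3*262144 = 213568 bytes.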

// find the index of the sub block that contains the given uncompressed offset.
// Returns the sub block index, or -1 if the offset is out of range.
int
get_sub_block_index(int n_sub_blocks, struct sub_block sub_block[], size_t offset) {
    size_t start_offset = 0;
    for (int i = 0; i < n_sub_blocks; i++) {
        size_t size = sub_block[i].uncompressed_size;
        if (offset < start_offset + size)
            return i;
        start_offset += size;
    }
    return -1;
}
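
// For example, with uncompressed sizes {262144, 262144, 262144, 213568},
// offset 300000 falls in sub block 1 (262144 <= 300000 < 524288), while an
// offset of 1000000 or more returns -1.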

#include "workset.h"

void
compress_work_init(struct compress_work *w, enum toku_compression_method method, struct sub_block *sub_block) {
    w->method = method;
    w->sub_block = sub_block;
}

//
// Compresses the uncompressed contents of sub_block into sb_compressed_ptr.
// cs_bound is the compressed size bound, i.e. the capacity of sb_compressed_ptr.
// Returns the size of the compressed data.
//
uint32_t
compress_nocrc_sub_block(
    struct sub_block *sub_block,
    void* sb_compressed_ptr,
    uint32_t cs_bound,
    enum toku_compression_method method
    )
{
    // compress it
    Bytef *uncompressed_ptr = (Bytef *) sub_block->uncompressed_ptr;
    Bytef *compressed_ptr = (Bytef *) sb_compressed_ptr;
    uLongf uncompressed_len = sub_block->uncompressed_size;
    uLongf real_compressed_len = cs_bound;
    toku_compress(method,
                  compressed_ptr, &real_compressed_len,
                  uncompressed_ptr, uncompressed_len);
    return real_compressed_len;
}

void
compress_sub_block(struct sub_block *sub_block, enum toku_compression_method method) {
    sub_block->compressed_size = compress_nocrc_sub_block(
        sub_block,
        sub_block->compressed_ptr,
        sub_block->compressed_size_bound,
        method
        );
    // checksum it
    sub_block->xsum = toku_x1764_memory(sub_block->compressed_ptr, sub_block->compressed_size);
}
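
// Typical single sub block usage (an illustrative sketch, not lifted from the
// callers; TOKU_ZLIB_METHOD is one of the methods declared in compress.h):
//
//     struct sub_block sb;
//     sub_block_init(&sb);
//     sb.uncompressed_ptr = data;
//     sb.uncompressed_size = data_size;
//     set_compressed_size_bound(&sb, TOKU_ZLIB_METHOD);
//     sb.compressed_ptr = toku_xmalloc(sb.compressed_size_bound);
//     compress_sub_block(&sb, TOKU_ZLIB_METHOD);
//     // sb.compressed_size and sb.xsum now describe the compressed payload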

// compress sub blocks until there is no more work to do
void *
compress_worker(void *arg) {
    struct workset *ws = (struct workset *) arg;
    while (1) {
        struct compress_work *w = (struct compress_work *) workset_get(ws);
        if (w == NULL)
            break;
        compress_sub_block(w->sub_block, w->method);
    }
    workset_release_ref(ws);
    return arg;
}

// Compress all the sub blocks, using up to num_cores threads (the calling
// thread plus threads from the pool).  The compressed sub blocks are packed
// contiguously at compressed_ptr.  Returns the total compressed length.
size_t
compress_all_sub_blocks(int n_sub_blocks, struct sub_block sub_block[], char *uncompressed_ptr, char *compressed_ptr, int num_cores, struct toku_thread_pool *pool, enum toku_compression_method method) {
    char *compressed_base_ptr = compressed_ptr;
    size_t compressed_len;

    // This is a complex way to write a parallel loop. Cilk would be better.

    if (n_sub_blocks == 1) {
        // single sub-block
        sub_block[0].uncompressed_ptr = uncompressed_ptr;
        sub_block[0].compressed_ptr = compressed_ptr;
        compress_sub_block(&sub_block[0], method);
        compressed_len = sub_block[0].compressed_size;
    } else {
        // multiple sub-blocks
        int T = num_cores; // T = min(num_cores, n_sub_blocks) - 1
        if (T > n_sub_blocks)
            T = n_sub_blocks;
        if (T > 0)
            T = T - 1; // threads in addition to the running thread

        struct workset ws;
        ZERO_STRUCT(ws);
        workset_init(&ws);

        struct compress_work work[n_sub_blocks];
        workset_lock(&ws);
        for (int i = 0; i < n_sub_blocks; i++) {
            sub_block[i].uncompressed_ptr = uncompressed_ptr;
            sub_block[i].compressed_ptr = compressed_ptr;
            compress_work_init(&work[i], method, &sub_block[i]);
            workset_put_locked(&ws, &work[i].base);
            uncompressed_ptr += sub_block[i].uncompressed_size;
            compressed_ptr += sub_block[i].compressed_size_bound;
        }
        workset_unlock(&ws);

        // compress the sub-blocks
        if (0) printf("%s:%d T=%d N=%d\n", __FUNCTION__, __LINE__, T, n_sub_blocks);
        toku_thread_pool_run(pool, 0, &T, compress_worker, &ws);
        workset_add_ref(&ws, T);
        compress_worker(&ws);

        // wait for all of the work to complete
        workset_join(&ws);
        workset_destroy(&ws);

        // pack the compressed sub-blocks together, squeezing out the unused
        // space left by the compressed size bounds
        compressed_ptr = compressed_base_ptr + sub_block[0].compressed_size;
        for (int i = 1; i < n_sub_blocks; i++) {
            memmove(compressed_ptr, sub_block[i].compressed_ptr, sub_block[i].compressed_size);
            compressed_ptr += sub_block[i].compressed_size;
        }

        compressed_len = compressed_ptr - compressed_base_ptr;
    }
    return compressed_len;
}
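
// Illustrative call sequence (a sketch of how a caller such as the node
// serializer can drive this API; max_sub_blocks comes from sub_block.h, and
// total_size, uncompressed_buf, num_cores, pool, and method are the caller's):
//
//     int n_sub_blocks, sub_block_size;
//     choose_sub_block_size(total_size, max_sub_blocks, &sub_block_size, &n_sub_blocks);
//     struct sub_block sub_block[n_sub_blocks];
//     for (int i = 0; i < n_sub_blocks; i++)
//         sub_block_init(&sub_block[i]);
//     set_all_sub_block_sizes(total_size, sub_block_size, n_sub_blocks, sub_block);
//     size_t bound = get_sum_compressed_size_bound(n_sub_blocks, sub_block, method);
//     char *compressed_buf = (char *) toku_xmalloc(bound);
//     size_t compressed_len = compress_all_sub_blocks(n_sub_blocks, sub_block,
//                                                     uncompressed_buf, compressed_buf,
//                                                     num_cores, pool, method);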

// initialize the decompression work
void
decompress_work_init(struct decompress_work *dw,
                     void *compress_ptr, uint32_t compress_size,
                     void *uncompress_ptr, uint32_t uncompress_size,
                     uint32_t xsum) {
    dw->compress_ptr = compress_ptr;
    dw->compress_size = compress_size;
    dw->uncompress_ptr = uncompress_ptr;
    dw->uncompress_size = uncompress_size;
    dw->xsum = xsum;
    dw->error = 0;
}

int verbose_decompress_sub_block = 1;

// decompress one sub block: verify the checksum of the compressed data, then
// decompress it into uncompress_ptr.  Returns 0 on success, or EINVAL if the
// checksum does not match.
int
decompress_sub_block(void *compress_ptr, uint32_t compress_size, void *uncompress_ptr, uint32_t uncompress_size, uint32_t expected_xsum) {
    int result = 0;

    // verify checksum
    uint32_t xsum = toku_x1764_memory(compress_ptr, compress_size);
    if (xsum != expected_xsum) {
        if (verbose_decompress_sub_block) fprintf(stderr, "%s:%d xsum %u expected %u\n", __FUNCTION__, __LINE__, xsum, expected_xsum);
        result = EINVAL;
    } else {
        // decompress
        toku_decompress((Bytef *) uncompress_ptr, uncompress_size, (Bytef *) compress_ptr, compress_size);
    }
    return result;
}

// decompress blocks until there is no more work to do
void *
decompress_worker(void *arg) {
    struct workset *ws = (struct workset *) arg;
    while (1) {
        struct decompress_work *dw = (struct decompress_work *) workset_get(ws);
        if (dw == NULL)
            break;
        dw->error = decompress_sub_block(dw->compress_ptr, dw->compress_size, dw->uncompress_ptr, dw->uncompress_size, dw->xsum);
    }
    workset_release_ref(ws);
    return arg;
}

// Decompress all the sub blocks, using up to num_cores threads (the calling
// thread plus threads from the pool).  Returns 0 on success, or the error from
// the first sub block that failed to decompress.
int
decompress_all_sub_blocks(int n_sub_blocks, struct sub_block sub_block[], unsigned char *compressed_data, unsigned char *uncompressed_data, int num_cores, struct toku_thread_pool *pool) {
    int r;

    if (n_sub_blocks == 1) {
        r = decompress_sub_block(compressed_data, sub_block[0].compressed_size, uncompressed_data, sub_block[0].uncompressed_size, sub_block[0].xsum);
    } else {
        // compute the number of additional threads needed for decompressing this node
        int T = num_cores; // T = min(#cores, #blocks) - 1
        if (T > n_sub_blocks)
            T = n_sub_blocks;
        if (T > 0)
            T = T - 1; // threads in addition to the running thread

        // init the decompression work set
        struct workset ws;
        ZERO_STRUCT(ws);
        workset_init(&ws);

        // initialize the decompression work and add to the work set
        struct decompress_work decompress_work[n_sub_blocks];
        workset_lock(&ws);
        for (int i = 0; i < n_sub_blocks; i++) {
            decompress_work_init(&decompress_work[i], compressed_data, sub_block[i].compressed_size, uncompressed_data, sub_block[i].uncompressed_size, sub_block[i].xsum);
            workset_put_locked(&ws, &decompress_work[i].base);

            uncompressed_data += sub_block[i].uncompressed_size;
            compressed_data += sub_block[i].compressed_size;
        }
        workset_unlock(&ws);

        // decompress the sub-blocks
        if (0) printf("%s:%d Cores=%d Blocks=%d T=%d\n", __FUNCTION__, __LINE__, num_cores, n_sub_blocks, T);
        toku_thread_pool_run(pool, 0, &T, decompress_worker, &ws);
        workset_add_ref(&ws, T);
        decompress_worker(&ws);

        // cleanup
        workset_join(&ws);
        workset_destroy(&ws);

        // return the error from the first sub block that failed, if any
        r = 0;
        for (int i = 0; i < n_sub_blocks; i++) {
            r = decompress_work[i].error;
            if (r != 0)
                break;
        }
    }

    return r;
}
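
// Note on the threading scheme shared by compress_all_sub_blocks and
// decompress_all_sub_blocks above: the calling thread always participates by
// invoking the worker function directly, so only T = min(num_cores,
// n_sub_blocks) - 1 additional pool threads are requested.  toku_thread_pool_run
// is passed &T so the pool can report how many threads it actually dispatched;
// workset_add_ref then accounts for those threads, and workset_join waits until
// every worker (including the calling thread) has dropped its reference via
// workset_release_ref before the workset is destroyed.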