1/* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2// vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
3#ident "$Id$"
4/*======
5This file is part of TokuDB
6
7
8Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved.
9
    TokuDB is free software: you can redistribute it and/or modify
11 it under the terms of the GNU General Public License, version 2,
12 as published by the Free Software Foundation.
13
14 TokuDB is distributed in the hope that it will be useful,
15 but WITHOUT ANY WARRANTY; without even the implied warranty of
16 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 GNU General Public License for more details.
18
19 You should have received a copy of the GNU General Public License
20 along with TokuDB. If not, see <http://www.gnu.org/licenses/>.
21
22======= */
23
24#ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved."
25
// Update operation codes. These codes get stuffed into update messages, so
// they can not change. The operation code is currently stored in a single byte
// in the update message, so only 256 operations are supported. When we need
// more, we can use the last (255) code to indicate that the operation code is
// expanded beyond 1 byte.
enum {
    // add or drop column message, applied by tokudb_hcad_update_fun
    UPDATE_OP_COL_ADD_OR_DROP = 0,

    // expand messages: widen the variable length offsets or a single field
    UPDATE_OP_EXPAND_VARIABLE_OFFSETS = 1,
    UPDATE_OP_EXPAND_INT = 2,
    UPDATE_OP_EXPAND_UINT = 3,
    UPDATE_OP_EXPAND_CHAR = 4,
    UPDATE_OP_EXPAND_BINARY = 5,
    UPDATE_OP_EXPAND_BLOB = 6,

    // update and upsert messages, versions 1 and 2 (layouts described below)
    UPDATE_OP_UPDATE_1 = 10,
    UPDATE_OP_UPSERT_1 = 11,
    UPDATE_OP_UPDATE_2 = 12,
    UPDATE_OP_UPSERT_2 = 13,
};
45
// Field types used in the update messages. These values are what the
// "field type" slot of an update operation (see the message layouts below)
// refers to.
enum {
    UPDATE_TYPE_UNKNOWN = 0,
    UPDATE_TYPE_INT = 1,
    UPDATE_TYPE_UINT = 2,
    UPDATE_TYPE_CHAR = 3,
    UPDATE_TYPE_BINARY = 4,
    UPDATE_TYPE_VARCHAR = 5,
    UPDATE_TYPE_VARBINARY = 6,
    UPDATE_TYPE_TEXT = 7,
    UPDATE_TYPE_BLOB = 8,
};
58
// short alias for the add or drop column operation code
#define UP_COL_ADD_OR_DROP UPDATE_OP_COL_ADD_OR_DROP

// add or drop column sub-operations: the first byte of every column entry
// and of every blob entry in an add or drop column message
#define COL_DROP 0xaa
#define COL_ADD 0xbb

// add or drop column types: the byte in a column entry that says how the
// rest of the entry is encoded
#define COL_FIXED 0xcc
#define COL_VAR 0xdd
#define COL_BLOB 0xee
69
// Number of bytes taken by the fixed-size header of an add or drop column
// message (the "static stuff" itemized below). The expansion is parenthesized
// so that the macro can be used safely inside a larger expression, e.g.
// 2 * STATIC_ROW_MUTATOR_SIZE.
#define STATIC_ROW_MUTATOR_SIZE (1+8+2+8+8+8)

// how much space do I need for the mutators?
// static stuff first:
// 1 - operation == UP_COL_ADD_OR_DROP
// 8 - old null, new null
// 2 - old num_offset, new num_offset
// 8 - old fixed_field size, new fixed_field_size
// 8 - old and new length of offsets
// 8 - old and new starting null bit position
// TOTAL: 35
81
82// dynamic stuff:
83// 4 - number of columns
84// for each column:
85// 1 - add or drop
86// 1 - is nullable
87// 4 - if nullable, position
88// 1 - if add, whether default is null or not
89// 1 - if fixed, var, or not
90// for fixed, entire default
91// for var, 4 bytes length, then entire default
92// for blob, nothing
93// So, an upperbound is 4 + num_fields(12) + all default stuff
94
95// static blob stuff:
96// 4 - num blobs
97// 1 byte for each num blobs in old table
98// So, an upperbound is 4 + kc_info->num_blobs
99
100// dynamic blob stuff:
101// for each blob added:
102// 1 - state if we are adding or dropping
103// 4 - blob index
104// if add, 1 len bytes
105// at most, 4 0's
106// So, upperbound is num_blobs(1+4+1+4) = num_columns*10
107
108// The expand varchar offsets message is used to expand the size of an offset
109// from 1 to 2 bytes. Not VLQ coded.
110// uint8 operation = UPDATE_OP_EXPAND_VARIABLE_OFFSETS
111// uint32 number of offsets
112// uint32 starting offset of the variable length field offsets
113
114// Expand the size of a fixed length column message. Not VLQ coded.
115// The field type is encoded in the operation code.
// uint8 operation = UPDATE_OP_EXPAND_INT/UINT
117// uint32 offset offset of the field
118// uint32 old length the old length of the field's value
119// uint32 new length the new length of the field's value
120
121// uint8 operation = UPDATE_OP_EXPAND_CHAR/BINARY
122// uint32 offset offset of the field
123// uint32 old length the old length of the field's value
124// uint32 new length the new length of the field's value
125// uint8 pad char
126
127// Expand blobs message. VLQ coded.
128// uint8 operation = UPDATE_OP_EXPAND_BLOB
129// uint32 start variable offset
130// uint32 variable offset bytes
131// uint32 bytes per offset
132// uint32 num blobs = N
133// uint8 old lengths[N]
134// uint8 new lengths[N]
135
136// Update and Upsert version 1 messages. Not VLQ coded. Not used anymore, but
137// may be in the fractal tree from a previous build.
138//
139// Field descriptor:
140// Operations:
141// update operation 4 == { '=', '+', '-' }
142// x = k
143// x = x + k
144// x = x - k
145// field type 4 see field types above
146// unused 4 unused
// field null num 4 bit 31 is 1 if the field is nullable and the
// remaining bits contain the null bit number
// field offset 4 for fixed fields, this is the offset from
// beginning of the row of the field
151// value:
152// value length 4 == N, length of the value
153// value N value to add or subtract
154//
155// Update_1 message:
156// Operation 1 == UPDATE_OP_UPDATE_1
157// fixed field offset 4 offset of the beginning of the fixed fields
158// var field offset 4 offset of the variable length offsets
159// var_offset_bytes 1 length of offsets (Note: not big enough)
160// bytes_per_offset 4 number of bytes per offset
161// Number of update ops 4 == N
162// Update ops [N]
163//
164// Upsert_1 message:
165// Operation 1 == UPDATE_OP_UPSERT_1
166// Insert row:
167// length 4 == N
168// data N
169// fixed field offset 4 offset of the beginning of the fixed fields
170// var field offset 4 offset of the variable length offsets
171// var_offset_bytes 1 length of offsets (Note: not big enough)
172// bytes_per_offset 4 number of bytes per offset
173// Number of update ops 4 == N
174// Update ops [N]
175
// Update and Upsert version 2 messages. VLQ coded.
177// Update version 2
178// uint8 operation = UPDATE_OP_UPDATE_2
179// uint32 number of update ops = N
180// uint8 update ops [ N ]
181//
182// Upsert version 2
183// uint8 operation = UPDATE_OP_UPSERT_2
184// uint32 insert length = N
185// uint8 insert data [ N ]
186// uint32 number of update ops = M
187// update ops [ M ]
188//
189// Variable fields info
190// uint32 update operation = 'v'
191// uint32 start offset
192// uint32 num varchars
193// uint32 bytes per offset
194//
195// Blobs info
196// uint32 update operation = 'b'
197// uint32 num blobs = N
198// uint8 blob lengths [ N ]
199//
200// Update operation on fixed length fields
201// uint32 update operation = '=', '+', '-'
202// uint32 field type
203// uint32 null num 0 => not nullable, otherwise encoded as field_null_num + 1
204// uint32 offset
205// uint32 value length = N
206// uint8 value [ N ]
207//
208// Update operation on varchar fields
209// uint32 update operation = '='
210// uint32 field type
211// uint32 null num
212// uint32 var index
213// uint32 value length = N
214// uint8 value [ N ]
215//
216// Update operation on blob fields
217// uint32 update operation = '='
218// uint32 field type
219// uint32 null num
220// uint32 blob index
221// uint32 value length = N
222// uint8 value [ N ]
223
224#include "tokudb_buffer.h"
225#include "tokudb_math.h"
226
227//
228// checks whether the bit at index pos in data is set or not
229//
230static inline bool is_overall_null_position_set(uchar* data, uint32_t pos) {
231 uint32_t offset = pos/8;
232 uchar remainder = pos%8;
233 uchar null_bit = 1<<remainder;
234 return ((data[offset] & null_bit) != 0);
235}
236
237//
238// sets the bit at index pos in data to 1 if is_null, 0 otherwise
239//
240static inline void set_overall_null_position(
241 uchar* data,
242 uint32_t pos,
243 bool is_null) {
244
245 uint32_t offset = pos/8;
246 uchar remainder = pos%8;
247 uchar null_bit = 1<<remainder;
248 if (is_null) {
249 data[offset] |= null_bit;
250 }
251 else {
252 data[offset] &= ~null_bit;
253 }
254}
255
256static inline void copy_null_bits(
257 uint32_t start_old_pos,
258 uint32_t start_new_pos,
259 uint32_t num_bits,
260 uchar* old_null_bytes,
261 uchar* new_null_bytes) {
262 for (uint32_t i = 0; i < num_bits; i++) {
263 uint32_t curr_old_pos = i + start_old_pos;
264 uint32_t curr_new_pos = i + start_new_pos;
265 // copy over old null bytes
266 if (is_overall_null_position_set(old_null_bytes,curr_old_pos)) {
267 set_overall_null_position(new_null_bytes,curr_new_pos,true);
268 }
269 else {
270 set_overall_null_position(new_null_bytes,curr_new_pos,false);
271 }
272 }
273}
274
275static inline void copy_var_fields(
276 //index of var fields that we should start writing
277 uint32_t start_old_num_var_field,
278 // number of var fields to copy
279 uint32_t num_var_fields,
280 //static ptr to where offset bytes begin in old row
281 uchar* old_var_field_offset_ptr,
282 //number of offset bytes used in old row
283 uchar old_num_offset_bytes,
284 // where the new var data should be written
285 uchar* start_new_var_field_data_ptr,
286 // where the new var offsets should be written
287 uchar* start_new_var_field_offset_ptr,
288 // pointer to beginning of var fields in new row
289 uchar* new_var_field_data_ptr,
290 // pointer to beginning of var fields in old row
291 uchar* old_var_field_data_ptr,
292 // number of offset bytes used in new row
293 uint32_t new_num_offset_bytes,
294 uint32_t* num_data_bytes_written,
295 uint32_t* num_offset_bytes_written) {
296
297 uchar* curr_new_var_field_data_ptr = start_new_var_field_data_ptr;
298 uchar* curr_new_var_field_offset_ptr = start_new_var_field_offset_ptr;
299 for (uint32_t i = 0; i < num_var_fields; i++) {
300 uint32_t field_len;
301 uint32_t start_read_offset;
302 uint32_t curr_old = i + start_old_num_var_field;
303 uchar* data_to_copy = NULL;
304 // get the length and pointer to data that needs to be copied
305 get_var_field_info(
306 &field_len,
307 &start_read_offset,
308 curr_old,
309 old_var_field_offset_ptr,
310 old_num_offset_bytes);
311 data_to_copy = old_var_field_data_ptr + start_read_offset;
312 // now need to copy field_len bytes starting from data_to_copy
313 curr_new_var_field_data_ptr = write_var_field(
314 curr_new_var_field_offset_ptr,
315 curr_new_var_field_data_ptr,
316 new_var_field_data_ptr,
317 data_to_copy,
318 field_len,
319 new_num_offset_bytes);
320 curr_new_var_field_offset_ptr += new_num_offset_bytes;
321 }
322 *num_data_bytes_written =
323 (uint32_t)(curr_new_var_field_data_ptr - start_new_var_field_data_ptr);
324 *num_offset_bytes_written =
325 (uint32_t)(curr_new_var_field_offset_ptr -
326 start_new_var_field_offset_ptr);
327}
328
329static inline uint32_t copy_toku_blob(
330 uchar* to_ptr,
331 uchar* from_ptr,
332 uint32_t len_bytes,
333 bool skip) {
334
335 uint32_t length = 0;
336 if (!skip) {
337 memcpy(to_ptr, from_ptr, len_bytes);
338 }
339 length = get_blob_field_len(from_ptr,len_bytes);
340 if (!skip) {
341 memcpy(to_ptr + len_bytes, from_ptr + len_bytes, length);
342 }
343 return (length + len_bytes);
344}
345
//
// Applies an add or drop column message (operation UP_COL_ADD_OR_DROP) to one
// row and hands the rebuilt row to set_val.
//
// The row is treated as four consecutive regions: null bytes, fixed length
// fields, variable length field offsets followed by their data, and blobs.
// The message in extra is read front to back: a fixed-size header, a list of
// column entries, and optionally a blob section.
//
// Parameters:
//   db, key    not used by this function
//   old_val    the current row; NULL means a delete was found, in which case
//              nothing is done
//   extra      the encoded message
//   set_val    called once with the new row
//   set_extra  passed through to set_val
//
// Returns 0 on success, or ENOMEM when the buffer for the new row is NULL.
//
static int tokudb_hcad_update_fun(
    DB* db,
    const DBT* key,
    const DBT* old_val,
    const DBT* extra,
    void (*set_val)(const DBT* new_val, void* set_extra),
    void* set_extra) {

    uint32_t max_num_bytes;
    uint32_t num_columns;
    DBT new_val;
    uint32_t num_bytes_left;
    uint32_t num_var_fields_to_copy;
    uint32_t num_data_bytes_written = 0;
    uint32_t num_offset_bytes_written = 0;
    int error;
    memset(&new_val, 0, sizeof(DBT));
    uchar operation;
    uchar* new_val_data = NULL;
    uchar* extra_pos = NULL;
    uchar* extra_pos_start = NULL;
    //
    // info for pointers into rows
    //
    uint32_t old_num_null_bytes;
    uint32_t new_num_null_bytes;
    uchar old_num_offset_bytes;
    uchar new_num_offset_bytes;
    uint32_t old_fixed_field_size;
    uint32_t new_fixed_field_size;
    uint32_t old_len_of_offsets;
    uint32_t new_len_of_offsets;

    uchar* old_fixed_field_ptr = NULL;
    uchar* new_fixed_field_ptr = NULL;
    uint32_t curr_old_fixed_offset;
    uint32_t curr_new_fixed_offset;

    uchar* old_null_bytes = NULL;
    uchar* new_null_bytes = NULL;
    uint32_t curr_old_null_pos;
    uint32_t curr_new_null_pos;
    uint32_t old_null_bits_left;
    uint32_t new_null_bits_left;
    uint32_t overall_null_bits_left;

    uint32_t old_num_var_fields;
    // uint32_t new_num_var_fields;
    uint32_t curr_old_num_var_field;
    uint32_t curr_new_num_var_field;
    uchar* old_var_field_offset_ptr = NULL;
    uchar* new_var_field_offset_ptr = NULL;
    uchar* curr_new_var_field_offset_ptr = NULL;
    uchar* old_var_field_data_ptr = NULL;
    uchar* new_var_field_data_ptr = NULL;
    uchar* curr_new_var_field_data_ptr = NULL;

    uint32_t start_blob_offset;
    uchar* start_blob_ptr;
    uint32_t num_blob_bytes;

    // came across a delete, nothing to update
    if (old_val == NULL) {
        error = 0;
        goto cleanup;
    }

    // extra_pos is the read cursor into the message; extra_pos_start stays
    // put so that the number of bytes consumed can be computed later
    extra_pos_start = (uchar *)extra->data;
    extra_pos = (uchar *)extra->data;

    operation = extra_pos[0];
    extra_pos++;
    assert_always(operation == UP_COL_ADD_OR_DROP);

    //
    // fixed-size header: each quantity is stored as an (old, new) pair
    //

    // number of null bytes
    memcpy(&old_num_null_bytes, extra_pos, sizeof(uint32_t));
    extra_pos += sizeof(uint32_t);
    memcpy(&new_num_null_bytes, extra_pos, sizeof(uint32_t));
    extra_pos += sizeof(uint32_t);

    // width of one variable length offset, one byte each
    old_num_offset_bytes = extra_pos[0];
    extra_pos++;
    new_num_offset_bytes = extra_pos[0];
    extra_pos++;

    // size of the fixed length field region
    memcpy(&old_fixed_field_size, extra_pos, sizeof(uint32_t));
    extra_pos += sizeof(uint32_t);
    memcpy(&new_fixed_field_size, extra_pos, sizeof(uint32_t));
    extra_pos += sizeof(uint32_t);

    // size of the variable length offset array
    memcpy(&old_len_of_offsets, extra_pos, sizeof(uint32_t));
    extra_pos += sizeof(uint32_t);
    memcpy(&new_len_of_offsets, extra_pos, sizeof(uint32_t));
    extra_pos += sizeof(uint32_t);

    // size of the buffer for the new row: the old row plus the entire
    // message plus the new offsets and new fixed fields (intended as an
    // upper bound on the size of the new row)
    max_num_bytes =
        old_val->size + extra->size + new_len_of_offsets + new_fixed_field_size;
    new_val_data = (uchar *)tokudb::memory::malloc(
        max_num_bytes,
        MYF(MY_FAE));
    if (new_val_data == NULL) {
        error = ENOMEM;
        goto cleanup;
    }

    // fixed fields come right after the null bytes in both rows
    old_fixed_field_ptr = (uchar *) old_val->data;
    old_fixed_field_ptr += old_num_null_bytes;
    new_fixed_field_ptr = new_val_data + new_num_null_bytes;
    curr_old_fixed_offset = 0;
    curr_new_fixed_offset = 0;

    // NOTE(review): divides by old_num_offset_bytes; this assumes the message
    // never carries 0 here - confirm against the code that builds the message
    old_num_var_fields = old_len_of_offsets/old_num_offset_bytes;
    // new_num_var_fields = new_len_of_offsets/new_num_offset_bytes;
    // following fields will change as we write the variable data
    old_var_field_offset_ptr = old_fixed_field_ptr + old_fixed_field_size;
    new_var_field_offset_ptr = new_fixed_field_ptr + new_fixed_field_size;
    old_var_field_data_ptr = old_var_field_offset_ptr + old_len_of_offsets;
    new_var_field_data_ptr = new_var_field_offset_ptr + new_len_of_offsets;
    curr_new_var_field_offset_ptr = new_var_field_offset_ptr;
    curr_new_var_field_data_ptr = new_var_field_data_ptr;
    curr_old_num_var_field = 0;
    curr_new_num_var_field = 0;

    // the null bytes are the first bytes of each row
    old_null_bytes = (uchar *)old_val->data;
    new_null_bytes = new_val_data;

    // last header pair: null bit position to start from in each row
    memcpy(&curr_old_null_pos, extra_pos, sizeof(uint32_t));
    extra_pos += sizeof(uint32_t);
    memcpy(&curr_new_null_pos, extra_pos, sizeof(uint32_t));
    extra_pos += sizeof(uint32_t);

    // number of column entries that follow
    memcpy(&num_columns, extra_pos, sizeof(num_columns));
    extra_pos += sizeof(num_columns);

    memset(new_null_bytes, 0, new_num_null_bytes); // shut valgrind up

    //
    // now go through and apply the change into new_val_data
    //
    for (uint32_t i = 0; i < num_columns; i++) {
        // each entry starts with the sub-operation and a nullable flag
        uchar op_type = extra_pos[0];
        bool is_null_default = false;
        extra_pos++;

        assert_always(op_type == COL_DROP || op_type == COL_ADD);
        bool nullable = (extra_pos[0] != 0);
        extra_pos++;
        if (nullable) {
            // a nullable column carries its null bit position; for a drop
            // it is a position in the old row, for an add in the new row
            uint32_t null_bit_position;
            memcpy(&null_bit_position, extra_pos, sizeof(uint32_t));
            extra_pos += sizeof(uint32_t);
            uint32_t num_bits;
            if (op_type == COL_DROP) {
                assert_always(curr_old_null_pos <= null_bit_position);
                num_bits = null_bit_position - curr_old_null_pos;
            } else {
                assert_always(curr_new_null_pos <= null_bit_position);
                num_bits = null_bit_position - curr_new_null_pos;
            }
            // carry over the null bits that sit before this column
            copy_null_bits(
                curr_old_null_pos,
                curr_new_null_pos,
                num_bits,
                old_null_bytes,
                new_null_bytes);
            // update the positions
            curr_new_null_pos += num_bits;
            curr_old_null_pos += num_bits;
            if (op_type == COL_DROP) {
                curr_old_null_pos++; // account for dropped column
            } else {
                // an added nullable column also says whether its default
                // value is NULL
                is_null_default = (extra_pos[0] != 0);
                extra_pos++;
                set_overall_null_position(
                    new_null_bytes,
                    null_bit_position,
                    is_null_default);
                curr_new_null_pos++; //account for added column
            }
        }
        // the column type decides how the rest of the entry is laid out
        uchar col_type = extra_pos[0];
        extra_pos++;
        if (col_type == COL_FIXED) {
            // fixed column: offset within the fixed field region and size
            uint32_t col_offset;
            uint32_t col_size;
            uint32_t num_bytes_to_copy;
            memcpy(&col_offset, extra_pos, sizeof(uint32_t));
            extra_pos += sizeof(uint32_t);
            memcpy(&col_size, extra_pos, sizeof(uint32_t));
            extra_pos += sizeof(uint32_t);

            // copy the fixed bytes that sit before this column
            if (op_type == COL_DROP) {
                num_bytes_to_copy = col_offset - curr_old_fixed_offset;
            } else {
                num_bytes_to_copy = col_offset - curr_new_fixed_offset;
            }
            memcpy(
                new_fixed_field_ptr + curr_new_fixed_offset,
                old_fixed_field_ptr + curr_old_fixed_offset,
                num_bytes_to_copy);
            curr_old_fixed_offset += num_bytes_to_copy;
            curr_new_fixed_offset += num_bytes_to_copy;
            if (op_type == COL_DROP) {
                // move old_fixed_offset val to skip OVER column that is
                // being dropped
                curr_old_fixed_offset += col_size;
            } else {
                if (is_null_default) {
                    // copy zeroes
                    memset(
                        new_fixed_field_ptr + curr_new_fixed_offset,
                        0,
                        col_size);
                } else {
                    // copy data from extra_pos into new row
                    memcpy(
                        new_fixed_field_ptr + curr_new_fixed_offset,
                        extra_pos,
                        col_size);
                    extra_pos += col_size;
                }
                curr_new_fixed_offset += col_size;
            }

        } else if (col_type == COL_VAR) {
            // variable length column: index among the var fields
            uint32_t var_col_index;
            memcpy(&var_col_index, extra_pos, sizeof(uint32_t));
            extra_pos += sizeof(uint32_t);
            // copy the var fields that sit before this column
            if (op_type == COL_DROP) {
                num_var_fields_to_copy = var_col_index - curr_old_num_var_field;
            } else {
                num_var_fields_to_copy = var_col_index - curr_new_num_var_field;
            }
            copy_var_fields(
                curr_old_num_var_field,
                num_var_fields_to_copy,
                old_var_field_offset_ptr,
                old_num_offset_bytes,
                curr_new_var_field_data_ptr,
                curr_new_var_field_offset_ptr,
                // pointer to beginning of var fields in new row
                new_var_field_data_ptr,
                // pointer to beginning of var fields in old row
                old_var_field_data_ptr,
                // number of offset bytes used in new row
                new_num_offset_bytes,
                &num_data_bytes_written,
                &num_offset_bytes_written);
            curr_new_var_field_data_ptr += num_data_bytes_written;
            curr_new_var_field_offset_ptr += num_offset_bytes_written;
            curr_new_num_var_field += num_var_fields_to_copy;
            curr_old_num_var_field += num_var_fields_to_copy;
            if (op_type == COL_DROP) {
                curr_old_num_var_field++; // skip over dropped field
            } else {
                if (is_null_default) {
                    // NULL default: write an empty field
                    curr_new_var_field_data_ptr = write_var_field(
                        curr_new_var_field_offset_ptr,
                        curr_new_var_field_data_ptr,
                        new_var_field_data_ptr,
                        NULL, //copying no data
                        0, //copying 0 bytes
                        new_num_offset_bytes);
                    curr_new_var_field_offset_ptr += new_num_offset_bytes;
                } else {
                    // the default is stored in the message as a 4 byte
                    // length followed by the data
                    uint32_t data_length;
                    memcpy(&data_length, extra_pos, sizeof(data_length));
                    extra_pos += sizeof(data_length);
                    curr_new_var_field_data_ptr = write_var_field(
                        curr_new_var_field_offset_ptr,
                        curr_new_var_field_data_ptr,
                        new_var_field_data_ptr,
                        extra_pos, //copying data from mutator
                        data_length, //copying data_length bytes
                        new_num_offset_bytes);
                    extra_pos += data_length;
                    curr_new_var_field_offset_ptr += new_num_offset_bytes;
                }
                curr_new_num_var_field++; //account for added column
            }
        } else if (col_type == COL_BLOB) {
            // handle blob data later
            continue;
        } else {
            assert_unreachable();
        }
    }
    // finish copying the null stuff: copy as many bits as both rows still
    // have room for
    old_null_bits_left = 8*old_num_null_bytes - curr_old_null_pos;
    new_null_bits_left = 8*new_num_null_bytes - curr_new_null_pos;
    overall_null_bits_left = old_null_bits_left;
    set_if_smaller(overall_null_bits_left, new_null_bits_left);
    copy_null_bits(
        curr_old_null_pos,
        curr_new_null_pos,
        overall_null_bits_left,
        old_null_bytes,
        new_null_bytes);
    // finish copying fixed field stuff
    num_bytes_left = old_fixed_field_size - curr_old_fixed_offset;
    memcpy(
        new_fixed_field_ptr + curr_new_fixed_offset,
        old_fixed_field_ptr + curr_old_fixed_offset,
        num_bytes_left);
    curr_old_fixed_offset += num_bytes_left;
    curr_new_fixed_offset += num_bytes_left;
    // sanity check
    assert_always(curr_new_fixed_offset == new_fixed_field_size);

    // finish copying var field stuff
    num_var_fields_to_copy = old_num_var_fields - curr_old_num_var_field;
    copy_var_fields(
        curr_old_num_var_field,
        num_var_fields_to_copy,
        old_var_field_offset_ptr,
        old_num_offset_bytes,
        curr_new_var_field_data_ptr,
        curr_new_var_field_offset_ptr,
        // pointer to beginning of var fields in new row
        new_var_field_data_ptr,
        // pointer to beginning of var fields in old row
        old_var_field_data_ptr,
        // number of offset bytes used in new row
        new_num_offset_bytes,
        &num_data_bytes_written,
        &num_offset_bytes_written);
    curr_new_var_field_offset_ptr += num_offset_bytes_written;
    curr_new_var_field_data_ptr += num_data_bytes_written;
    // sanity check: the new offset array must be exactly full
    assert_always(curr_new_var_field_offset_ptr == new_var_field_data_ptr);

    // start handling blobs: locate where the blobs begin in the old row
    get_blob_field_info(
        &start_blob_offset,
        old_len_of_offsets,
        old_var_field_data_ptr,
        old_num_offset_bytes);
    start_blob_ptr = old_var_field_data_ptr + start_blob_offset;
    // if nothing else in extra, then there are no blobs to add or drop, so
    // can copy blobs straight
    if ((extra_pos - extra_pos_start) == extra->size) {
        num_blob_bytes = old_val->size - (start_blob_ptr - old_null_bytes);
        memcpy(curr_new_var_field_data_ptr, start_blob_ptr, num_blob_bytes);
        curr_new_var_field_data_ptr += num_blob_bytes;
    } else {
        // else, there is blob information to process
        uchar* len_bytes = NULL;
        uint32_t curr_old_blob = 0;
        uint32_t curr_new_blob = 0;
        uint32_t num_old_blobs = 0;
        uchar* curr_old_blob_ptr = start_blob_ptr;
        // the blob section starts with the number of blobs in the old row,
        // followed by one byte per old blob (used below as the width of
        // that blob's length prefix)
        memcpy(&num_old_blobs, extra_pos, sizeof(num_old_blobs));
        extra_pos += sizeof(num_old_blobs);
        len_bytes = extra_pos;
        extra_pos += num_old_blobs;
        // copy over blob fields one by one; blob entries run to the end of
        // the message
        while ((extra_pos - extra_pos_start) < extra->size) {
            // entry: 1 byte add or drop, 4 byte blob index
            uchar op_type = extra_pos[0];
            extra_pos++;
            uint32_t num_blobs_to_copy = 0;
            uint32_t blob_index;
            memcpy(&blob_index, extra_pos, sizeof(blob_index));
            extra_pos += sizeof(blob_index);
            assert_always (op_type == COL_DROP || op_type == COL_ADD);
            // copy the old blobs that sit before this one
            if (op_type == COL_DROP) {
                num_blobs_to_copy = blob_index - curr_old_blob;
            } else {
                num_blobs_to_copy = blob_index - curr_new_blob;
            }
            for (uint32_t i = 0; i < num_blobs_to_copy; i++) {
                uint32_t num_bytes_written = copy_toku_blob(
                    curr_new_var_field_data_ptr,
                    curr_old_blob_ptr,
                    len_bytes[curr_old_blob + i],
                    false);
                curr_old_blob_ptr += num_bytes_written;
                curr_new_var_field_data_ptr += num_bytes_written;
            }
            curr_old_blob += num_blobs_to_copy;
            curr_new_blob += num_blobs_to_copy;
            if (op_type == COL_DROP) {
                // skip over blob in row
                uint32_t num_bytes = copy_toku_blob(
                    NULL,
                    curr_old_blob_ptr,
                    len_bytes[curr_old_blob],
                    true);
                curr_old_blob++;
                curr_old_blob_ptr += num_bytes;
            } else {
                // copy new data: the entry carries one byte giving the
                // width of the length prefix, then the blob itself
                uint32_t new_len_bytes = extra_pos[0];
                extra_pos++;
                uint32_t num_bytes = copy_toku_blob(
                    curr_new_var_field_data_ptr,
                    extra_pos,
                    new_len_bytes,
                    false);
                curr_new_blob++;
                curr_new_var_field_data_ptr += num_bytes;
                extra_pos += num_bytes;
            }
        }
        // whatever is left of the old row is copied as is
        num_blob_bytes = old_val->size - (curr_old_blob_ptr - old_null_bytes);
        memcpy(curr_new_var_field_data_ptr, curr_old_blob_ptr, num_blob_bytes);
        curr_new_var_field_data_ptr += num_blob_bytes;
    }
    // the data cursor now marks the end of the new row
    new_val.data = new_val_data;
    new_val.size = curr_new_var_field_data_ptr - new_val_data;
    set_val(&new_val, set_extra);

    error = 0;
cleanup:
    // new_val_data is still NULL here when the row was a delete
    tokudb::memory::free(new_val_data);
    return error;
}
761
762// Expand the variable offset array in the old row given the update mesage
763// in the extra.
764static int tokudb_expand_variable_offsets(
765 DB* db,
766 const DBT* key,
767 const DBT* old_val,
768 const DBT* extra,
769 void (*set_val)(const DBT* new_val, void* set_extra),
770 void* set_extra) {
771
772 int error = 0;
773 tokudb::buffer extra_val(extra->data, 0, extra->size);
774
775 // decode the operation
776 uint8_t operation;
777 extra_val.consume(&operation, sizeof operation);
778 assert_always(operation == UPDATE_OP_EXPAND_VARIABLE_OFFSETS);
779
780 // decode number of offsets
781 uint32_t number_of_offsets;
782 extra_val.consume(&number_of_offsets, sizeof number_of_offsets);
783
784 // decode the offset start
785 uint32_t offset_start;
786 extra_val.consume(&offset_start, sizeof offset_start);
787
788 assert_always(extra_val.size() == extra_val.limit());
789
790 DBT new_val; memset(&new_val, 0, sizeof new_val);
791
792 if (old_val != NULL) {
793 assert_always(offset_start + number_of_offsets <= old_val->size);
794
795 // compute the new val from the old val
796 uchar* old_val_ptr = (uchar*)old_val->data;
797
798 // allocate space for the new val's data
799 uchar* new_val_ptr = (uchar*)tokudb::memory::malloc(
800 number_of_offsets + old_val->size,
801 MYF(MY_FAE));
802 if (!new_val_ptr) {
803 error = ENOMEM;
804 goto cleanup;
805 }
806 new_val.data = new_val_ptr;
807
808 // copy up to the start of the varchar offset
809 memcpy(new_val_ptr, old_val_ptr, offset_start);
810 new_val_ptr += offset_start;
811 old_val_ptr += offset_start;
812
813 // expand each offset from 1 to 2 bytes
814 for (uint32_t i = 0; i < number_of_offsets; i++) {
815 uint16_t new_offset = *old_val_ptr;
816 int2store(new_val_ptr, new_offset);
817 new_val_ptr += 2;
818 old_val_ptr += 1;
819 }
820
821 // copy the rest of the row
822 size_t n = old_val->size - (old_val_ptr - (uchar *)old_val->data);
823 memcpy(new_val_ptr, old_val_ptr, n);
824 new_val_ptr += n;
825 old_val_ptr += n;
826 new_val.size = new_val_ptr - (uchar *)new_val.data;
827
828 assert_always(new_val_ptr == (uchar *)new_val.data + new_val.size);
829 assert_always(old_val_ptr == (uchar *)old_val->data + old_val->size);
830
831 // set the new val
832 set_val(&new_val, set_extra);
833 }
834
835 error = 0;
836
837cleanup:
838 tokudb::memory::free(new_val.data);
839 return error;
840}
841
842// Expand an int field in a old row given the expand message in the extra.
843static int tokudb_expand_int_field(
844 DB* db,
845 const DBT* key,
846 const DBT* old_val,
847 const DBT* extra,
848 void (*set_val)(const DBT* new_val, void* set_extra),
849 void* set_extra) {
850
851 int error = 0;
852 tokudb::buffer extra_val(extra->data, 0, extra->size);
853
854 uint8_t operation;
855 extra_val.consume(&operation, sizeof operation);
856 assert_always(
857 operation == UPDATE_OP_EXPAND_INT ||
858 operation == UPDATE_OP_EXPAND_UINT);
859 uint32_t the_offset;
860 extra_val.consume(&the_offset, sizeof the_offset);
861 uint32_t old_length;
862 extra_val.consume(&old_length, sizeof old_length);
863 uint32_t new_length;
864 extra_val.consume(&new_length, sizeof new_length);
865 assert_always(extra_val.size() == extra_val.limit());
866
867 assert_always(new_length >= old_length); // expand only
868
869 DBT new_val; memset(&new_val, 0, sizeof new_val);
870
871 if (old_val != NULL) {
872 // old field within the old val
873 assert_always(the_offset + old_length <= old_val->size);
874
875 // compute the new val from the old val
876 uchar* old_val_ptr = (uchar*)old_val->data;
877
878 // allocate space for the new val's data
879 uchar* new_val_ptr = (uchar*)tokudb::memory::malloc(
880 old_val->size + (new_length - old_length),
881 MYF(MY_FAE));
882 if (!new_val_ptr) {
883 error = ENOMEM;
884 goto cleanup;
885 }
886 new_val.data = new_val_ptr;
887
888 // copy up to the old offset
889 memcpy(new_val_ptr, old_val_ptr, the_offset);
890 new_val_ptr += the_offset;
891 old_val_ptr += the_offset;
892
893 switch (operation) {
894 case UPDATE_OP_EXPAND_INT:
895 // fill the entire new value with ones or zeros depending on the
896 // sign bit the encoding is little endian
897 memset(
898 new_val_ptr,
899 (old_val_ptr[old_length-1] & 0x80) ? 0xff : 0x00,
900 new_length);
901 // overlay the low bytes of the new value with the old value
902 memcpy(new_val_ptr, old_val_ptr, old_length);
903 new_val_ptr += new_length;
904 old_val_ptr += old_length;
905 break;
906 case UPDATE_OP_EXPAND_UINT:
907 // fill the entire new value with zeros
908 memset(new_val_ptr, 0, new_length);
909 // overlay the low bytes of the new value with the old value
910 memcpy(new_val_ptr, old_val_ptr, old_length);
911 new_val_ptr += new_length;
912 old_val_ptr += old_length;
913 break;
914 default:
915 assert_unreachable();
916 }
917
918 // copy the rest
919 size_t n = old_val->size - (old_val_ptr - (uchar *)old_val->data);
920 memcpy(new_val_ptr, old_val_ptr, n);
921 new_val_ptr += n;
922 old_val_ptr += n;
923 new_val.size = new_val_ptr - (uchar *)new_val.data;
924
925 assert_always(new_val_ptr == (uchar *)new_val.data + new_val.size);
926 assert_always(old_val_ptr == (uchar *)old_val->data + old_val->size);
927
928 // set the new val
929 set_val(&new_val, set_extra);
930 }
931
932 error = 0;
933
934cleanup:
935 tokudb::memory::free(new_val.data);
936 return error;
937}
938
939// Expand a char field in a old row given the expand message in the extra.
940static int tokudb_expand_char_field(
941 DB* db,
942 const DBT* key,
943 const DBT* old_val,
944 const DBT* extra,
945 void (*set_val)(const DBT* new_val, void* set_extra),
946 void* set_extra) {
947
948 int error = 0;
949 tokudb::buffer extra_val(extra->data, 0, extra->size);
950
951 uint8_t operation;
952 extra_val.consume(&operation, sizeof operation);
953 assert_always(
954 operation == UPDATE_OP_EXPAND_CHAR ||
955 operation == UPDATE_OP_EXPAND_BINARY);
956 uint32_t the_offset;
957 extra_val.consume(&the_offset, sizeof the_offset);
958 uint32_t old_length;
959 extra_val.consume(&old_length, sizeof old_length);
960 uint32_t new_length;
961 extra_val.consume(&new_length, sizeof new_length);
962 uchar pad_char;
963 extra_val.consume(&pad_char, sizeof pad_char);
964 assert_always(extra_val.size() == extra_val.limit());
965
966 assert_always(new_length >= old_length); // expand only
967
968 DBT new_val; memset(&new_val, 0, sizeof new_val);
969
970 if (old_val != NULL) {
971 // old field within the old val
972 assert_always(the_offset + old_length <= old_val->size);
973
974 // compute the new val from the old val
975 uchar* old_val_ptr = (uchar*)old_val->data;
976
977 // allocate space for the new val's data
978 uchar* new_val_ptr = (uchar*)tokudb::memory::malloc(
979 old_val->size + (new_length - old_length),
980 MYF(MY_FAE));
981 if (!new_val_ptr) {
982 error = ENOMEM;
983 goto cleanup;
984 }
985 new_val.data = new_val_ptr;
986
987 // copy up to the old offset
988 memcpy(new_val_ptr, old_val_ptr, the_offset);
989 new_val_ptr += the_offset;
990 old_val_ptr += the_offset;
991
992 switch (operation) {
993 case UPDATE_OP_EXPAND_CHAR:
994 case UPDATE_OP_EXPAND_BINARY:
995 // fill the entire new value with the pad char
996 memset(new_val_ptr, pad_char, new_length);
997 // overlay the low bytes of the new value with the old value
998 memcpy(new_val_ptr, old_val_ptr, old_length);
999 new_val_ptr += new_length;
1000 old_val_ptr += old_length;
1001 break;
1002 default:
1003 assert_unreachable();
1004 }
1005
1006 // copy the rest
1007 size_t n = old_val->size - (old_val_ptr - (uchar *)old_val->data);
1008 memcpy(new_val_ptr, old_val_ptr, n);
1009 new_val_ptr += n;
1010 old_val_ptr += n;
1011 new_val.size = new_val_ptr - (uchar *)new_val.data;
1012
1013 assert_always(new_val_ptr == (uchar *)new_val.data + new_val.size);
1014 assert_always(old_val_ptr == (uchar *)old_val->data + old_val->size);
1015
1016 // set the new val
1017 set_val(&new_val, set_extra);
1018 }
1019
1020 error = 0;
1021
1022cleanup:
1023 tokudb::memory::free(new_val.data);
1024 return error;
1025}
1026
1027namespace tokudb {
1028
1029class var_fields {
1030public:
1031 inline var_fields() {
1032 }
1033 inline void init_var_fields(
1034 uint32_t var_offset,
1035 uint32_t offset_bytes,
1036 uint32_t bytes_per_offset,
1037 tokudb::buffer* val_buffer) {
1038
1039 assert_always(
1040 bytes_per_offset == 0 ||
1041 bytes_per_offset == 1 ||
1042 bytes_per_offset == 2);
1043 m_var_offset = var_offset;
1044 m_val_offset = m_var_offset + offset_bytes;
1045 m_bytes_per_offset = bytes_per_offset;
1046 if (bytes_per_offset > 0) {
1047 m_num_fields = offset_bytes / bytes_per_offset;
1048 } else {
1049 assert_always(offset_bytes == 0);
1050 m_num_fields = 0;
1051 }
1052 m_val_buffer = val_buffer;
1053 }
1054 uint32_t value_offset(uint32_t var_index);
1055 uint32_t value_length(uint32_t var_index);
1056 void update_offsets(uint32_t var_index, uint32_t old_s, uint32_t new_s);
1057 uint32_t end_offset();
1058 void replace(
1059 uint32_t var_index,
1060 void* new_val_ptr,
1061 uint32_t new_val_length);
1062private:
1063 uint32_t read_offset(uint32_t var_index);
1064 void write_offset(uint32_t var_index, uint32_t v);
1065private:
1066 uint32_t m_var_offset;
1067 uint32_t m_val_offset;
1068 uint32_t m_bytes_per_offset;
1069 uint32_t m_num_fields;
1070 tokudb::buffer* m_val_buffer;
1071};
1072
1073// Return the ith variable length offset
1074uint32_t var_fields::read_offset(uint32_t var_index) {
1075 uint32_t offset = 0;
1076 m_val_buffer->read(
1077 &offset, m_bytes_per_offset, m_var_offset + var_index * m_bytes_per_offset);
1078 return offset;
1079}
1080
1081// Write the ith variable length offset with a new offset.
1082void var_fields::write_offset(uint32_t var_index, uint32_t new_offset) {
1083 m_val_buffer->write(
1084 &new_offset,
1085 m_bytes_per_offset,
1086 m_var_offset + var_index * m_bytes_per_offset);
1087}
1088
1089// Return the offset of the ith variable length field
1090uint32_t var_fields::value_offset(uint32_t var_index) {
1091 assert_always(var_index < m_num_fields);
1092 if (var_index == 0)
1093 return m_val_offset;
1094 else
1095 return m_val_offset + read_offset(var_index-1);
1096}
1097
1098// Return the length of the ith variable length field
1099uint32_t var_fields::value_length(uint32_t var_index) {
1100 assert_always(var_index < m_num_fields);
1101 if (var_index == 0)
1102 return read_offset(0);
1103 else
1104 return read_offset(var_index) - read_offset(var_index-1);
1105}
1106
1107// The length of the ith variable length fields changed.
1108// Update all of the subsequent offsets.
1109void var_fields::update_offsets(
1110 uint32_t var_index,
1111 uint32_t old_s,
1112 uint32_t new_s) {
1113
1114 assert_always(var_index < m_num_fields);
1115 if (old_s == new_s)
1116 return;
1117 for (uint i = var_index; i < m_num_fields; i++) {
1118 uint32_t v = read_offset(i);
1119 if (new_s > old_s)
1120 write_offset(i, v + (new_s - old_s));
1121 else
1122 write_offset(i, v - (old_s - new_s));
1123 }
1124}
1125
1126uint32_t var_fields::end_offset() {
1127 if (m_num_fields == 0)
1128 return m_val_offset;
1129 else
1130 return m_val_offset + read_offset(m_num_fields-1);
1131}
1132
1133void var_fields::replace(
1134 uint32_t var_index,
1135 void* new_val_ptr,
1136 uint32_t new_val_length) {
1137
1138 // replace the new val with the extra val
1139 uint32_t the_offset = value_offset(var_index);
1140 uint32_t old_s = value_length(var_index);
1141 uint32_t new_s = new_val_length;
1142 m_val_buffer->replace(the_offset, old_s, new_val_ptr, new_s);
1143
1144 // update the var offsets
1145 update_offsets(var_index, old_s, new_s);
1146}
1147
1148class blob_fields {
1149public:
1150 blob_fields() {
1151 }
1152 void init_blob_fields(
1153 uint32_t num_blobs,
1154 const uint8_t* blob_lengths,
1155 tokudb::buffer* val_buffer) {
1156 m_num_blobs = num_blobs;
1157 m_blob_lengths = blob_lengths;
1158 m_val_buffer = val_buffer;
1159 }
1160 void start_blobs(uint32_t offset) {
1161 m_blob_offset = offset;
1162 }
1163 void replace(uint32_t blob_index, uint32_t length, void *p);
1164
1165 void expand_length(
1166 uint32_t blob_index,
1167 uint8_t old_length_length,
1168 uint8_t new_length_length);
1169private:
1170 uint32_t read_length(uint32_t offset, size_t size);
1171 void write_length(uint32_t offset, size_t size, uint32_t new_length);
1172 uint32_t blob_offset(uint32_t blob_index);
1173private:
1174 uint32_t m_blob_offset;
1175 uint32_t m_num_blobs;
1176 const uint8_t *m_blob_lengths;
1177 tokudb::buffer *m_val_buffer;
1178};
1179
1180uint32_t blob_fields::read_length(uint32_t offset, size_t blob_length) {
1181 uint32_t length = 0;
1182 m_val_buffer->read(&length, blob_length, offset);
1183 return length;
1184}
1185
1186void blob_fields::write_length(
1187 uint32_t offset,
1188 size_t size,
1189 uint32_t new_length) {
1190 m_val_buffer->write(&new_length, size, offset);
1191}
1192
1193uint32_t blob_fields::blob_offset(uint32_t blob_index) {
1194 assert_always(blob_index < m_num_blobs);
1195 uint32_t offset = m_blob_offset;
1196 for (uint i = 0; i < blob_index; i++) {
1197 uint32_t blob_length = m_blob_lengths[i];
1198 uint32_t length = read_length(offset, blob_length);
1199 offset += blob_length + length;
1200 }
1201 return offset;
1202}
1203
1204void blob_fields::replace(
1205 uint32_t blob_index,
1206 uint32_t new_length,
1207 void* new_value) {
1208
1209 assert_always(blob_index < m_num_blobs);
1210
1211 // compute the ith blob offset
1212 uint32_t offset = blob_offset(blob_index);
1213 uint8_t blob_length = m_blob_lengths[blob_index];
1214
1215 // read the old length
1216 uint32_t old_length = read_length(offset, blob_length);
1217
1218 // replace the data
1219 m_val_buffer->replace(
1220 offset + blob_length,
1221 old_length,
1222 new_value,
1223 new_length);
1224
1225 // write the new length
1226 write_length(offset, blob_length, new_length);
1227}
1228
1229void blob_fields::expand_length(
1230 uint32_t blob_index,
1231 uint8_t old_length_length,
1232 uint8_t new_length_length) {
1233
1234 assert_always(blob_index < m_num_blobs);
1235 assert_always(old_length_length == m_blob_lengths[blob_index]);
1236
1237 // compute the ith blob offset
1238 uint32_t offset = blob_offset(blob_index);
1239
1240 // read the blob length
1241 uint32_t blob_length = read_length(offset, old_length_length);
1242
1243 // expand the length
1244 m_val_buffer->replace(
1245 offset,
1246 old_length_length,
1247 &blob_length,
1248 new_length_length);
1249}
1250
1251class value_map {
1252public:
1253 value_map(tokudb::buffer *val_buffer) : m_val_buffer(val_buffer) {
1254 }
1255
1256 void init_var_fields(
1257 uint32_t var_offset,
1258 uint32_t offset_bytes,
1259 uint32_t bytes_per_offset) {
1260
1261 m_var_fields.init_var_fields(
1262 var_offset,
1263 offset_bytes,
1264 bytes_per_offset,
1265 m_val_buffer);
1266 }
1267
1268 void init_blob_fields(uint32_t num_blobs, const uint8_t *blob_lengths) {
1269 m_blob_fields.init_blob_fields(num_blobs, blob_lengths, m_val_buffer);
1270 }
1271
1272 // Replace the value of a fixed length field
1273 void replace_fixed(
1274 uint32_t the_offset,
1275 uint32_t field_null_num,
1276 void* new_val_ptr,
1277 uint32_t new_val_length) {
1278
1279 m_val_buffer->replace(
1280 the_offset,
1281 new_val_length,
1282 new_val_ptr,
1283 new_val_length);
1284 maybe_clear_null(field_null_num);
1285 }
1286
1287 // Replace the value of a variable length field
1288 void replace_varchar(
1289 uint32_t var_index,
1290 uint32_t field_null_num,
1291 void* new_val_ptr,
1292 uint32_t new_val_length) {
1293
1294 m_var_fields.replace(var_index, new_val_ptr, new_val_length);
1295 maybe_clear_null(field_null_num);
1296 }
1297
1298 // Replace the value of a blob field
1299 void replace_blob(
1300 uint32_t blob_index,
1301 uint32_t field_null_num,
1302 void* new_val_ptr,
1303 uint32_t new_val_length) {
1304
1305 m_blob_fields.start_blobs(m_var_fields.end_offset());
1306 m_blob_fields.replace(blob_index, new_val_length, new_val_ptr);
1307 maybe_clear_null(field_null_num);
1308 }
1309
1310 void expand_blob_lengths(
1311 uint32_t num_blob,
1312 const uint8_t* old_length,
1313 const uint8_t* new_length);
1314
1315 void int_op(
1316 uint32_t operation,
1317 uint32_t the_offset,
1318 uint32_t length,
1319 uint32_t field_null_num,
1320 tokudb::buffer& old_val,
1321 void* extra_val);
1322
1323 void uint_op(
1324 uint32_t operation,
1325 uint32_t the_offset,
1326 uint32_t length,
1327 uint32_t field_null_num,
1328 tokudb::buffer& old_val,
1329 void* extra_val);
1330
1331private:
1332 bool is_null(uint32_t null_num, uchar *null_bytes) {
1333 bool field_is_null = false;
1334 if (null_num) {
1335 if (null_num & (1<<31))
1336 null_num &= ~(1<<31);
1337 else
1338 null_num -= 1;
1339 field_is_null = is_overall_null_position_set(null_bytes, null_num);
1340 }
1341 return field_is_null;
1342 }
1343
1344 void maybe_clear_null(uint32_t null_num) {
1345 if (null_num) {
1346 if (null_num & (1<<31))
1347 null_num &= ~(1<<31);
1348 else
1349 null_num -= 1;
1350 set_overall_null_position(
1351 (uchar*)m_val_buffer->data(),
1352 null_num,
1353 false);
1354 }
1355 }
1356
1357private:
1358 var_fields m_var_fields;
1359 blob_fields m_blob_fields;
1360 tokudb::buffer *m_val_buffer;
1361};
1362
1363// Update an int field: signed newval@offset = old_val@offset OP extra_val
1364void value_map::int_op(
1365 uint32_t operation,
1366 uint32_t the_offset,
1367 uint32_t length,
1368 uint32_t field_null_num,
1369 tokudb::buffer &old_val,
1370 void* extra_val) {
1371
1372 assert_always(the_offset + length <= m_val_buffer->size());
1373 assert_always(the_offset + length <= old_val.size());
1374 assert_always(
1375 length == 1 || length == 2 || length == 3 ||
1376 length == 4 || length == 8);
1377
1378 uchar *old_val_ptr = (uchar *) old_val.data();
1379 bool field_is_null = is_null(field_null_num, old_val_ptr);
1380 int64_t v = 0;
1381 memcpy(&v, old_val_ptr + the_offset, length);
1382 v = tokudb::int_sign_extend(v, 8*length);
1383 int64_t extra_v = 0;
1384 memcpy(&extra_v, extra_val, length);
1385 extra_v = tokudb::int_sign_extend(extra_v, 8*length);
1386 switch (operation) {
1387 case '+':
1388 if (!field_is_null) {
1389 bool over;
1390 v = tokudb::int_add(v, extra_v, 8*length, &over);
1391 if (over) {
1392 if (extra_v > 0)
1393 v = tokudb::int_high_endpoint(8*length);
1394 else
1395 v = tokudb::int_low_endpoint(8*length);
1396 }
1397 m_val_buffer->replace(the_offset, length, &v, length);
1398 }
1399 break;
1400 case '-':
1401 if (!field_is_null) {
1402 bool over;
1403 v = tokudb::int_sub(v, extra_v, 8*length, &over);
1404 if (over) {
1405 if (extra_v > 0)
1406 v = tokudb::int_low_endpoint(8*length);
1407 else
1408 v = tokudb::int_high_endpoint(8*length);
1409 }
1410 m_val_buffer->replace(the_offset, length, &v, length);
1411 }
1412 break;
1413 default:
1414 assert_unreachable();
1415 }
1416}
1417
1418// Update an unsigned field: unsigned newval@offset = old_val@offset OP extra_val
1419void value_map::uint_op(
1420 uint32_t operation,
1421 uint32_t the_offset,
1422 uint32_t length,
1423 uint32_t field_null_num,
1424 tokudb::buffer& old_val,
1425 void* extra_val) {
1426
1427 assert_always(the_offset + length <= m_val_buffer->size());
1428 assert_always(the_offset + length <= old_val.size());
1429 assert_always(
1430 length == 1 || length == 2 || length == 3 ||
1431 length == 4 || length == 8);
1432
1433 uchar *old_val_ptr = (uchar *) old_val.data();
1434 bool field_is_null = is_null(field_null_num, old_val_ptr);
1435 uint64_t v = 0;
1436 memcpy(&v, old_val_ptr + the_offset, length);
1437 uint64_t extra_v = 0;
1438 memcpy(&extra_v, extra_val, length);
1439 switch (operation) {
1440 case '+':
1441 if (!field_is_null) {
1442 bool over;
1443 v = tokudb::uint_add(v, extra_v, 8*length, &over);
1444 if (over) {
1445 v = tokudb::uint_high_endpoint(8*length);
1446 }
1447 m_val_buffer->replace(the_offset, length, &v, length);
1448 }
1449 break;
1450 case '-':
1451 if (!field_is_null) {
1452 bool over;
1453 v = tokudb::uint_sub(v, extra_v, 8*length, &over);
1454 if (over) {
1455 v = tokudb::uint_low_endpoint(8*length);
1456 }
1457 m_val_buffer->replace(the_offset, length, &v, length);
1458 }
1459 break;
1460 default:
1461 assert_unreachable();
1462 }
1463}
1464
1465void value_map::expand_blob_lengths(
1466 uint32_t num_blob,
1467 const uint8_t* old_length,
1468 const uint8_t* new_length) {
1469
1470 uint8_t current_length[num_blob];
1471 memcpy(current_length, old_length, num_blob);
1472 for (uint32_t i = 0; i < num_blob; i++) {
1473 if (new_length[i] > current_length[i]) {
1474 m_blob_fields.init_blob_fields(
1475 num_blob,
1476 current_length,
1477 m_val_buffer);
1478 m_blob_fields.start_blobs(m_var_fields.end_offset());
1479 m_blob_fields.expand_length(i, current_length[i], new_length[i]);
1480 current_length[i] = new_length[i];
1481 }
1482 }
1483}
1484
1485}
1486
1487static uint32_t consume_uint32(tokudb::buffer &b) {
1488 uint32_t n;
1489 size_t s = b.consume_ui<uint32_t>(&n);
1490 assert_always(s > 0);
1491 return n;
1492}
1493
1494static uint8_t *consume_uint8_array(tokudb::buffer &b, uint32_t array_size) {
1495 uint8_t *p = (uint8_t *) b.consume_ptr(array_size);
1496 assert_always(p);
1497 return p;
1498}
1499
1500static int tokudb_expand_blobs(
1501 DB* db,
1502 const DBT* key_dbt,
1503 const DBT* old_val_dbt,
1504 const DBT* extra,
1505 void (*set_val)(const DBT* new_val_dbt, void* set_extra),
1506 void* set_extra) {
1507
1508 tokudb::buffer extra_val(extra->data, 0, extra->size);
1509
1510 uint8_t operation;
1511 extra_val.consume(&operation, sizeof operation);
1512 assert_always(operation == UPDATE_OP_EXPAND_BLOB);
1513
1514 if (old_val_dbt != NULL) {
1515 // new val = old val
1516 tokudb::buffer new_val;
1517 new_val.append(old_val_dbt->data, old_val_dbt->size);
1518
1519 tokudb::value_map vd(&new_val);
1520
1521 // decode variable field info
1522 uint32_t var_field_offset = consume_uint32(extra_val);
1523 uint32_t var_offset_bytes = consume_uint32(extra_val);
1524 uint32_t bytes_per_offset = consume_uint32(extra_val);
1525 vd.init_var_fields(
1526 var_field_offset,
1527 var_offset_bytes,
1528 bytes_per_offset);
1529
1530 // decode blob info
1531 uint32_t num_blob = consume_uint32(extra_val);
1532 const uint8_t* old_blob_length =
1533 consume_uint8_array(extra_val, num_blob);
1534 const uint8_t* new_blob_length =
1535 consume_uint8_array(extra_val, num_blob);
1536 assert_always(extra_val.size() == extra_val.limit());
1537
1538 // expand blob lengths
1539 vd.expand_blob_lengths(num_blob, old_blob_length, new_blob_length);
1540
1541 // set the new val
1542 DBT new_val_dbt; memset(&new_val_dbt, 0, sizeof new_val_dbt);
1543 new_val_dbt.data = new_val.data();
1544 new_val_dbt.size = new_val.size();
1545 set_val(&new_val_dbt, set_extra);
1546 }
1547 return 0;
1548}
1549
1550// Decode and apply a sequence of update operations defined in the extra to
1551// the old value and put the result in the new value.
1552static void apply_1_updates(
1553 tokudb::value_map& vd,
1554 tokudb::buffer& new_val,
1555 tokudb::buffer& old_val,
1556 tokudb::buffer& extra_val) {
1557
1558 uint32_t num_updates;
1559 extra_val.consume(&num_updates, sizeof num_updates);
1560 for ( ; num_updates > 0; num_updates--) {
1561 // get the update operation
1562 uint32_t update_operation;
1563 extra_val.consume(&update_operation, sizeof update_operation);
1564 uint32_t field_type;
1565 extra_val.consume(&field_type, sizeof field_type);
1566 uint32_t unused;
1567 extra_val.consume(&unused, sizeof unused);
1568 uint32_t field_null_num;
1569 extra_val.consume(&field_null_num, sizeof field_null_num);
1570 uint32_t the_offset;
1571 extra_val.consume(&the_offset, sizeof the_offset);
1572 uint32_t extra_val_length;
1573 extra_val.consume(&extra_val_length, sizeof extra_val_length);
1574 void *extra_val_ptr = extra_val.consume_ptr(extra_val_length);
1575
1576 // apply the update
1577 switch (field_type) {
1578 case UPDATE_TYPE_INT:
1579 if (update_operation == '=')
1580 vd.replace_fixed(
1581 the_offset,
1582 field_null_num,
1583 extra_val_ptr,
1584 extra_val_length);
1585 else
1586 vd.int_op(
1587 update_operation,
1588 the_offset,
1589 extra_val_length,
1590 field_null_num,
1591 old_val,
1592 extra_val_ptr);
1593 break;
1594 case UPDATE_TYPE_UINT:
1595 if (update_operation == '=')
1596 vd.replace_fixed(
1597 the_offset,
1598 field_null_num,
1599 extra_val_ptr,
1600 extra_val_length);
1601 else
1602 vd.uint_op(
1603 update_operation,
1604 the_offset,
1605 extra_val_length,
1606 field_null_num,
1607 old_val,
1608 extra_val_ptr);
1609 break;
1610 case UPDATE_TYPE_CHAR:
1611 case UPDATE_TYPE_BINARY:
1612 if (update_operation == '=')
1613 vd.replace_fixed(
1614 the_offset,
1615 field_null_num,
1616 extra_val_ptr,
1617 extra_val_length);
1618 else
1619 assert_unreachable();
1620 break;
1621 default:
1622 assert_unreachable();
1623 break;
1624 }
1625 }
1626 assert_always(extra_val.size() == extra_val.limit());
1627}
1628
1629// Simple update handler. Decode the update message, apply the update operations
1630// to the old value, and set the new value.
1631static int tokudb_update_1_fun(
1632 DB* db,
1633 const DBT* key_dbt,
1634 const DBT* old_val_dbt,
1635 const DBT* extra,
1636 void (*set_val)(const DBT* new_val_dbt, void* set_extra),
1637 void* set_extra) {
1638
1639 tokudb::buffer extra_val(extra->data, 0, extra->size);
1640
1641 uint8_t operation;
1642 extra_val.consume(&operation, sizeof operation);
1643 assert_always(operation == UPDATE_OP_UPDATE_1);
1644
1645 if (old_val_dbt != NULL) {
1646 // get the simple descriptor
1647 uint32_t m_fixed_field_offset;
1648 extra_val.consume(&m_fixed_field_offset, sizeof m_fixed_field_offset);
1649 uint32_t m_var_field_offset;
1650 extra_val.consume(&m_var_field_offset, sizeof m_var_field_offset);
1651 uint32_t m_var_offset_bytes;
1652 extra_val.consume(&m_var_offset_bytes, sizeof m_var_offset_bytes);
1653 uint32_t m_bytes_per_offset;
1654 extra_val.consume(&m_bytes_per_offset, sizeof m_bytes_per_offset);
1655
1656 tokudb::buffer old_val(
1657 old_val_dbt->data,
1658 old_val_dbt->size,
1659 old_val_dbt->size);
1660
1661 // new val = old val
1662 tokudb::buffer new_val;
1663 new_val.append(old_val_dbt->data, old_val_dbt->size);
1664
1665 tokudb::value_map vd(&new_val);
1666 vd.init_var_fields(
1667 m_var_field_offset,
1668 m_var_offset_bytes,
1669 m_bytes_per_offset);
1670
1671 // apply updates to new val
1672 apply_1_updates(vd, new_val, old_val, extra_val);
1673
1674 // set the new val
1675 DBT new_val_dbt; memset(&new_val_dbt, 0, sizeof new_val_dbt);
1676 new_val_dbt.data = new_val.data();
1677 new_val_dbt.size = new_val.size();
1678 set_val(&new_val_dbt, set_extra);
1679 }
1680
1681 return 0;
1682}
1683
1684// Simple upsert handler. Decode the upsert message. If the key does not exist,
1685// then insert a new value from the extra.
1686// Otherwise, apply the update operations to the old value, and then set the
1687// new value.
1688static int tokudb_upsert_1_fun(
1689 DB* db,
1690 const DBT* key_dbt,
1691 const DBT* old_val_dbt,
1692 const DBT* extra,
1693 void (*set_val)(const DBT* new_val_dbt, void* set_extra),
1694 void* set_extra) {
1695
1696 tokudb::buffer extra_val(extra->data, 0, extra->size);
1697
1698 uint8_t operation;
1699 extra_val.consume(&operation, sizeof operation);
1700 assert_always(operation == UPDATE_OP_UPSERT_1);
1701
1702 uint32_t insert_length;
1703 extra_val.consume(&insert_length, sizeof insert_length);
1704 void *insert_row = extra_val.consume_ptr(insert_length);
1705
1706 if (old_val_dbt == NULL) {
1707 // insert a new row
1708 DBT new_val_dbt; memset(&new_val_dbt, 0, sizeof new_val_dbt);
1709 new_val_dbt.size = insert_length;
1710 new_val_dbt.data = insert_row;
1711 set_val(&new_val_dbt, set_extra);
1712 } else {
1713 // decode the simple descriptor
1714 uint32_t m_fixed_field_offset;
1715 extra_val.consume(&m_fixed_field_offset, sizeof m_fixed_field_offset);
1716 uint32_t m_var_field_offset;
1717 extra_val.consume(&m_var_field_offset, sizeof m_var_field_offset);
1718 uint32_t m_var_offset_bytes;
1719 extra_val.consume(&m_var_offset_bytes, sizeof m_var_offset_bytes);
1720 uint32_t m_bytes_per_offset;
1721 extra_val.consume(&m_bytes_per_offset, sizeof m_bytes_per_offset);
1722
1723 tokudb::buffer old_val(
1724 old_val_dbt->data,
1725 old_val_dbt->size,
1726 old_val_dbt->size);
1727
1728 // new val = old val
1729 tokudb::buffer new_val;
1730 new_val.append(old_val_dbt->data, old_val_dbt->size);
1731
1732 tokudb::value_map vd(&new_val);
1733 vd.init_var_fields(
1734 m_var_field_offset,
1735 m_var_offset_bytes,
1736 m_bytes_per_offset);
1737
1738 // apply updates to new val
1739 apply_1_updates(vd, new_val, old_val, extra_val);
1740
1741 // set the new val
1742 DBT new_val_dbt; memset(&new_val_dbt, 0, sizeof new_val_dbt);
1743 new_val_dbt.data = new_val.data();
1744 new_val_dbt.size = new_val.size();
1745 set_val(&new_val_dbt, set_extra);
1746 }
1747
1748 return 0;
1749}
1750
1751// Decode and apply a sequence of update operations defined in the extra to the
1752// old value and put the result in the new value.
1753static void apply_2_updates(
1754 tokudb::value_map& vd,
1755 tokudb::buffer& new_val,
1756 tokudb::buffer& old_val,
1757 tokudb::buffer& extra_val) {
1758
1759 uint32_t num_updates = consume_uint32(extra_val);
1760 for (uint32_t i = 0; i < num_updates; i++) {
1761 uint32_t update_operation = consume_uint32(extra_val);
1762 if (update_operation == 'v') {
1763 uint32_t var_field_offset = consume_uint32(extra_val);
1764 uint32_t var_offset_bytes = consume_uint32(extra_val);
1765 uint32_t bytes_per_offset = consume_uint32(extra_val);
1766 vd.init_var_fields(
1767 var_field_offset,
1768 var_offset_bytes,
1769 bytes_per_offset);
1770 } else if (update_operation == 'b') {
1771 uint32_t num_blobs = consume_uint32(extra_val);
1772 const uint8_t* blob_lengths =
1773 consume_uint8_array(extra_val, num_blobs);
1774 vd.init_blob_fields(num_blobs, blob_lengths);
1775 } else {
1776 uint32_t field_type = consume_uint32(extra_val);
1777 uint32_t field_null_num = consume_uint32(extra_val);
1778 uint32_t the_offset = consume_uint32(extra_val);
1779 uint32_t extra_val_length = consume_uint32(extra_val);
1780 void* extra_val_ptr = extra_val.consume_ptr(extra_val_length);
1781 assert_always(extra_val_ptr);
1782
1783 switch (field_type) {
1784 case UPDATE_TYPE_INT:
1785 if (update_operation == '=')
1786 vd.replace_fixed(
1787 the_offset,
1788 field_null_num,
1789 extra_val_ptr,
1790 extra_val_length);
1791 else
1792 vd.int_op(
1793 update_operation,
1794 the_offset,
1795 extra_val_length,
1796 field_null_num,
1797 old_val,
1798 extra_val_ptr);
1799 break;
1800 case UPDATE_TYPE_UINT:
1801 if (update_operation == '=')
1802 vd.replace_fixed(
1803 the_offset,
1804 field_null_num,
1805 extra_val_ptr,
1806 extra_val_length);
1807 else
1808 vd.uint_op(
1809 update_operation,
1810 the_offset,
1811 extra_val_length,
1812 field_null_num,
1813 old_val,
1814 extra_val_ptr);
1815 break;
1816 case UPDATE_TYPE_CHAR:
1817 case UPDATE_TYPE_BINARY:
1818 if (update_operation == '=')
1819 vd.replace_fixed(
1820 the_offset,
1821 field_null_num,
1822 extra_val_ptr,
1823 extra_val_length);
1824 else
1825 assert_unreachable();
1826 break;
1827 case UPDATE_TYPE_VARBINARY:
1828 case UPDATE_TYPE_VARCHAR:
1829 if (update_operation == '=')
1830 vd.replace_varchar(
1831 the_offset,
1832 field_null_num,
1833 extra_val_ptr,
1834 extra_val_length);
1835 else
1836 assert_unreachable();
1837 break;
1838 case UPDATE_TYPE_TEXT:
1839 case UPDATE_TYPE_BLOB:
1840 if (update_operation == '=')
1841 vd.replace_blob(
1842 the_offset,
1843 field_null_num,
1844 extra_val_ptr,
1845 extra_val_length);
1846 else
1847 assert_unreachable();
1848 break;
1849 default:
1850 assert_unreachable();
1851 }
1852 }
1853 }
1854 assert_always(extra_val.size() == extra_val.limit());
1855}
1856
1857// Simple update handler. Decode the update message, apply the update
1858// operations to the old value, and set the new value.
1859static int tokudb_update_2_fun(
1860 DB* db,
1861 const DBT* key_dbt,
1862 const DBT* old_val_dbt,
1863 const DBT* extra,
1864 void (*set_val)(const DBT* new_val_dbt, void* set_extra),
1865 void* set_extra) {
1866
1867 tokudb::buffer extra_val(extra->data, 0, extra->size);
1868
1869 uint8_t op;
1870 extra_val.consume(&op, sizeof op);
1871 assert_always(op == UPDATE_OP_UPDATE_2);
1872
1873 if (old_val_dbt != NULL) {
1874 tokudb::buffer old_val(
1875 old_val_dbt->data,
1876 old_val_dbt->size,
1877 old_val_dbt->size);
1878
1879 // new val = old val
1880 tokudb::buffer new_val;
1881 new_val.append(old_val_dbt->data, old_val_dbt->size);
1882
1883 tokudb::value_map vd(&new_val);
1884
1885 // apply updates to new val
1886 apply_2_updates(vd, new_val, old_val, extra_val);
1887
1888 // set the new val
1889 DBT new_val_dbt; memset(&new_val_dbt, 0, sizeof new_val_dbt);
1890 new_val_dbt.data = new_val.data();
1891 new_val_dbt.size = new_val.size();
1892 set_val(&new_val_dbt, set_extra);
1893 }
1894
1895 return 0;
1896}
1897
1898// Simple upsert handler. Decode the upsert message. If the key does not exist,
1899// then insert a new value from the extra.
1900// Otherwise, apply the update operations to the old value, and then set the
1901// new value.
1902static int tokudb_upsert_2_fun(
1903 DB* db,
1904 const DBT* key_dbt,
1905 const DBT* old_val_dbt,
1906 const DBT* extra,
1907 void (*set_val)(const DBT* new_val_dbt, void* set_extra),
1908 void* set_extra) {
1909
1910 tokudb::buffer extra_val(extra->data, 0, extra->size);
1911
1912 uint8_t op;
1913 extra_val.consume(&op, sizeof op);
1914 assert_always(op == UPDATE_OP_UPSERT_2);
1915
1916 uint32_t insert_length = consume_uint32(extra_val);
1917 assert_always(insert_length < extra_val.limit());
1918 void* insert_row = extra_val.consume_ptr(insert_length);
1919 assert_always(insert_row);
1920
1921 if (old_val_dbt == NULL) {
1922 // insert a new row
1923 DBT new_val_dbt; memset(&new_val_dbt, 0, sizeof new_val_dbt);
1924 new_val_dbt.size = insert_length;
1925 new_val_dbt.data = insert_row;
1926 set_val(&new_val_dbt, set_extra);
1927 } else {
1928 tokudb::buffer old_val(
1929 old_val_dbt->data,
1930 old_val_dbt->size,
1931 old_val_dbt->size);
1932
1933 // new val = old val
1934 tokudb::buffer new_val;
1935 new_val.append(old_val_dbt->data, old_val_dbt->size);
1936
1937 tokudb::value_map vd(&new_val);
1938
1939 // apply updates to new val
1940 apply_2_updates(vd, new_val, old_val, extra_val);
1941
1942 // set the new val
1943 DBT new_val_dbt; memset(&new_val_dbt, 0, sizeof new_val_dbt);
1944 new_val_dbt.data = new_val.data();
1945 new_val_dbt.size = new_val.size();
1946 set_val(&new_val_dbt, set_extra);
1947 }
1948
1949 return 0;
1950}
1951
1952// This function is the update callback function that is registered with the
1953// YDB environment. It uses the first byte in the update message to identify
1954// the update message type and call the handler for that message.
1955int tokudb_update_fun(
1956 DB* db,
1957 const DBT* key,
1958 const DBT* old_val,
1959 const DBT* extra,
1960 void (*set_val)(const DBT* new_val, void* set_extra),
1961 void* set_extra) {
1962
1963 assert_always(extra->size > 0);
1964 uint8_t* extra_pos = (uchar*)extra->data;
1965 uint8_t operation = extra_pos[0];
1966 int error;
1967 switch (operation) {
1968 case UPDATE_OP_COL_ADD_OR_DROP:
1969 error = tokudb_hcad_update_fun(
1970 db,
1971 key,
1972 old_val,
1973 extra,
1974 set_val,
1975 set_extra);
1976 break;
1977 case UPDATE_OP_EXPAND_VARIABLE_OFFSETS:
1978 error = tokudb_expand_variable_offsets(
1979 db,
1980 key,
1981 old_val,
1982 extra,
1983 set_val,
1984 set_extra);
1985 break;
1986 case UPDATE_OP_EXPAND_INT:
1987 case UPDATE_OP_EXPAND_UINT:
1988 error = tokudb_expand_int_field(
1989 db,
1990 key,
1991 old_val,
1992 extra,
1993 set_val,
1994 set_extra);
1995 break;
1996 case UPDATE_OP_EXPAND_CHAR:
1997 case UPDATE_OP_EXPAND_BINARY:
1998 error = tokudb_expand_char_field(
1999 db,
2000 key,
2001 old_val,
2002 extra,
2003 set_val,
2004 set_extra);
2005 break;
2006 case UPDATE_OP_EXPAND_BLOB:
2007 error = tokudb_expand_blobs(
2008 db,
2009 key,
2010 old_val,
2011 extra,
2012 set_val,
2013 set_extra);
2014 break;
2015 case UPDATE_OP_UPDATE_1:
2016 error = tokudb_update_1_fun(
2017 db,
2018 key,
2019 old_val,
2020 extra,
2021 set_val,
2022 set_extra);
2023 break;
2024 case UPDATE_OP_UPSERT_1:
2025 error = tokudb_upsert_1_fun(
2026 db,
2027 key,
2028 old_val,
2029 extra,
2030 set_val,
2031 set_extra);
2032 break;
2033 case UPDATE_OP_UPDATE_2:
2034 error = tokudb_update_2_fun(
2035 db,
2036 key,
2037 old_val,
2038 extra,
2039 set_val,
2040 set_extra);
2041 break;
2042 case UPDATE_OP_UPSERT_2:
2043 error = tokudb_upsert_2_fun(
2044 db,
2045 key,
2046 old_val,
2047 extra,
2048 set_val,
2049 set_extra);
2050 break;
2051 default:
2052 assert_unreachable();
2053 }
2054 return error;
2055}
2056