1 | /* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */ |
2 | // vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4: |
3 | #ident "$Id$" |
4 | /*====== |
5 | This file is part of TokuDB |
6 | |
7 | |
8 | Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved. |
9 | |
10 | TokuDBis is free software: you can redistribute it and/or modify |
11 | it under the terms of the GNU General Public License, version 2, |
12 | as published by the Free Software Foundation. |
13 | |
14 | TokuDB is distributed in the hope that it will be useful, |
15 | but WITHOUT ANY WARRANTY; without even the implied warranty of |
16 | MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
17 | GNU General Public License for more details. |
18 | |
19 | You should have received a copy of the GNU General Public License |
20 | along with TokuDB. If not, see <http://www.gnu.org/licenses/>. |
21 | |
22 | ======= */ |
23 | |
24 | #ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved." |
25 | |
26 | // Update operation codes. These codes get stuffed into update messages, so they can not change. |
27 | // The operations are currently stored in a single byte in the update message, so only 256 operations |
28 | // are supported. When we need more, we can use the last (255) code to indicate that the operation code |
29 | // is expanded beyond 1 byte. |
30 | enum { |
31 | UPDATE_OP_COL_ADD_OR_DROP = 0, |
32 | |
33 | UPDATE_OP_EXPAND_VARIABLE_OFFSETS = 1, |
34 | UPDATE_OP_EXPAND_INT = 2, |
35 | UPDATE_OP_EXPAND_UINT = 3, |
36 | UPDATE_OP_EXPAND_CHAR = 4, |
37 | UPDATE_OP_EXPAND_BINARY = 5, |
38 | UPDATE_OP_EXPAND_BLOB = 6, |
39 | |
40 | UPDATE_OP_UPDATE_1 = 10, |
41 | UPDATE_OP_UPSERT_1 = 11, |
42 | UPDATE_OP_UPDATE_2 = 12, |
43 | UPDATE_OP_UPSERT_2 = 13, |
44 | }; |
45 | |
46 | // Field types used in the update messages |
47 | enum { |
48 | UPDATE_TYPE_UNKNOWN = 0, |
49 | UPDATE_TYPE_INT = 1, |
50 | UPDATE_TYPE_UINT = 2, |
51 | UPDATE_TYPE_CHAR = 3, |
52 | UPDATE_TYPE_BINARY = 4, |
53 | UPDATE_TYPE_VARCHAR = 5, |
54 | UPDATE_TYPE_VARBINARY = 6, |
55 | UPDATE_TYPE_TEXT = 7, |
56 | UPDATE_TYPE_BLOB = 8, |
57 | }; |
58 | |
59 | #define UP_COL_ADD_OR_DROP UPDATE_OP_COL_ADD_OR_DROP |
60 | |
61 | // add or drop column sub-operations |
62 | #define COL_DROP 0xaa |
63 | #define COL_ADD 0xbb |
64 | |
65 | // add or drop column types |
66 | #define COL_FIXED 0xcc |
67 | #define COL_VAR 0xdd |
68 | #define COL_BLOB 0xee |
69 | |
70 | #define STATIC_ROW_MUTATOR_SIZE 1+8+2+8+8+8 |
71 | |
72 | // how much space do I need for the mutators? |
73 | // static stuff first: |
74 | // operation 1 == UP_COL_ADD_OR_DROP |
75 | // 8 - old null, new null |
76 | // 2 - old num_offset, new num_offset |
77 | // 8 - old fixed_field size, new fixed_field_size |
78 | // 8 - old and new length of offsets |
79 | // 8 - old and new starting null bit position |
80 | // TOTAL: 27 |
81 | |
82 | // dynamic stuff: |
83 | // 4 - number of columns |
84 | // for each column: |
85 | // 1 - add or drop |
86 | // 1 - is nullable |
87 | // 4 - if nullable, position |
88 | // 1 - if add, whether default is null or not |
89 | // 1 - if fixed, var, or not |
90 | // for fixed, entire default |
91 | // for var, 4 bytes length, then entire default |
92 | // for blob, nothing |
93 | // So, an upperbound is 4 + num_fields(12) + all default stuff |
94 | |
95 | // static blob stuff: |
96 | // 4 - num blobs |
97 | // 1 byte for each num blobs in old table |
98 | // So, an upperbound is 4 + kc_info->num_blobs |
99 | |
100 | // dynamic blob stuff: |
101 | // for each blob added: |
102 | // 1 - state if we are adding or dropping |
103 | // 4 - blob index |
104 | // if add, 1 len bytes |
105 | // at most, 4 0's |
106 | // So, upperbound is num_blobs(1+4+1+4) = num_columns*10 |
107 | |
108 | // The expand varchar offsets message is used to expand the size of an offset |
109 | // from 1 to 2 bytes. Not VLQ coded. |
110 | // uint8 operation = UPDATE_OP_EXPAND_VARIABLE_OFFSETS |
111 | // uint32 number of offsets |
112 | // uint32 starting offset of the variable length field offsets |
113 | |
114 | // Expand the size of a fixed length column message. Not VLQ coded. |
115 | // The field type is encoded in the operation code. |
116 | // uint8 operation = UPDATE_OP_EXPAND_INT/UINT/CHAR/BINARY |
117 | // uint32 offset offset of the field |
118 | // uint32 old length the old length of the field's value |
119 | // uint32 new length the new length of the field's value |
120 | |
121 | // uint8 operation = UPDATE_OP_EXPAND_CHAR/BINARY |
122 | // uint32 offset offset of the field |
123 | // uint32 old length the old length of the field's value |
124 | // uint32 new length the new length of the field's value |
125 | // uint8 pad char |
126 | |
127 | // Expand blobs message. VLQ coded. |
128 | // uint8 operation = UPDATE_OP_EXPAND_BLOB |
129 | // uint32 start variable offset |
130 | // uint32 variable offset bytes |
131 | // uint32 bytes per offset |
132 | // uint32 num blobs = N |
133 | // uint8 old lengths[N] |
134 | // uint8 new lengths[N] |
135 | |
136 | // Update and Upsert version 1 messages. Not VLQ coded. Not used anymore, but |
137 | // may be in the fractal tree from a previous build. |
138 | // |
139 | // Field descriptor: |
140 | // Operations: |
141 | // update operation 4 == { '=', '+', '-' } |
142 | // x = k |
143 | // x = x + k |
144 | // x = x - k |
145 | // field type 4 see field types above |
146 | // unused 4 unused |
147 | // field null num 4 bit 31 is 1 if the field is nullible and the |
148 | // remaining bits contain the null bit number |
149 | // field offset 4 for fixed fields, this is the offset from |
150 | // begining of the row of the field |
151 | // value: |
152 | // value length 4 == N, length of the value |
153 | // value N value to add or subtract |
154 | // |
155 | // Update_1 message: |
156 | // Operation 1 == UPDATE_OP_UPDATE_1 |
157 | // fixed field offset 4 offset of the beginning of the fixed fields |
158 | // var field offset 4 offset of the variable length offsets |
159 | // var_offset_bytes 1 length of offsets (Note: not big enough) |
160 | // bytes_per_offset 4 number of bytes per offset |
161 | // Number of update ops 4 == N |
162 | // Update ops [N] |
163 | // |
164 | // Upsert_1 message: |
165 | // Operation 1 == UPDATE_OP_UPSERT_1 |
166 | // Insert row: |
167 | // length 4 == N |
168 | // data N |
169 | // fixed field offset 4 offset of the beginning of the fixed fields |
170 | // var field offset 4 offset of the variable length offsets |
171 | // var_offset_bytes 1 length of offsets (Note: not big enough) |
172 | // bytes_per_offset 4 number of bytes per offset |
173 | // Number of update ops 4 == N |
174 | // Update ops [N] |
175 | |
176 | // Update and Upserver version 2 messages. VLQ coded. |
177 | // Update version 2 |
178 | // uint8 operation = UPDATE_OP_UPDATE_2 |
179 | // uint32 number of update ops = N |
180 | // uint8 update ops [ N ] |
181 | // |
182 | // Upsert version 2 |
183 | // uint8 operation = UPDATE_OP_UPSERT_2 |
184 | // uint32 insert length = N |
185 | // uint8 insert data [ N ] |
186 | // uint32 number of update ops = M |
187 | // update ops [ M ] |
188 | // |
189 | // Variable fields info |
190 | // uint32 update operation = 'v' |
191 | // uint32 start offset |
192 | // uint32 num varchars |
193 | // uint32 bytes per offset |
194 | // |
195 | // Blobs info |
196 | // uint32 update operation = 'b' |
197 | // uint32 num blobs = N |
198 | // uint8 blob lengths [ N ] |
199 | // |
200 | // Update operation on fixed length fields |
201 | // uint32 update operation = '=', '+', '-' |
202 | // uint32 field type |
203 | // uint32 null num 0 => not nullable, otherwise encoded as field_null_num + 1 |
204 | // uint32 offset |
205 | // uint32 value length = N |
206 | // uint8 value [ N ] |
207 | // |
208 | // Update operation on varchar fields |
209 | // uint32 update operation = '=' |
210 | // uint32 field type |
211 | // uint32 null num |
212 | // uint32 var index |
213 | // uint32 value length = N |
214 | // uint8 value [ N ] |
215 | // |
216 | // Update operation on blob fields |
217 | // uint32 update operation = '=' |
218 | // uint32 field type |
219 | // uint32 null num |
220 | // uint32 blob index |
221 | // uint32 value length = N |
222 | // uint8 value [ N ] |
223 | |
224 | #include "tokudb_buffer.h" |
225 | #include "tokudb_math.h" |
226 | |
227 | // |
228 | // checks whether the bit at index pos in data is set or not |
229 | // |
230 | static inline bool is_overall_null_position_set(uchar* data, uint32_t pos) { |
231 | uint32_t offset = pos/8; |
232 | uchar remainder = pos%8; |
233 | uchar null_bit = 1<<remainder; |
234 | return ((data[offset] & null_bit) != 0); |
235 | } |
236 | |
237 | // |
238 | // sets the bit at index pos in data to 1 if is_null, 0 otherwise |
239 | // |
240 | static inline void set_overall_null_position( |
241 | uchar* data, |
242 | uint32_t pos, |
243 | bool is_null) { |
244 | |
245 | uint32_t offset = pos/8; |
246 | uchar remainder = pos%8; |
247 | uchar null_bit = 1<<remainder; |
248 | if (is_null) { |
249 | data[offset] |= null_bit; |
250 | } |
251 | else { |
252 | data[offset] &= ~null_bit; |
253 | } |
254 | } |
255 | |
256 | static inline void copy_null_bits( |
257 | uint32_t start_old_pos, |
258 | uint32_t start_new_pos, |
259 | uint32_t num_bits, |
260 | uchar* old_null_bytes, |
261 | uchar* new_null_bytes) { |
262 | for (uint32_t i = 0; i < num_bits; i++) { |
263 | uint32_t curr_old_pos = i + start_old_pos; |
264 | uint32_t curr_new_pos = i + start_new_pos; |
265 | // copy over old null bytes |
266 | if (is_overall_null_position_set(old_null_bytes,curr_old_pos)) { |
267 | set_overall_null_position(new_null_bytes,curr_new_pos,true); |
268 | } |
269 | else { |
270 | set_overall_null_position(new_null_bytes,curr_new_pos,false); |
271 | } |
272 | } |
273 | } |
274 | |
275 | static inline void copy_var_fields( |
276 | //index of var fields that we should start writing |
277 | uint32_t start_old_num_var_field, |
278 | // number of var fields to copy |
279 | uint32_t num_var_fields, |
280 | //static ptr to where offset bytes begin in old row |
281 | uchar* old_var_field_offset_ptr, |
282 | //number of offset bytes used in old row |
283 | uchar old_num_offset_bytes, |
284 | // where the new var data should be written |
285 | uchar* start_new_var_field_data_ptr, |
286 | // where the new var offsets should be written |
287 | uchar* start_new_var_field_offset_ptr, |
288 | // pointer to beginning of var fields in new row |
289 | uchar* new_var_field_data_ptr, |
290 | // pointer to beginning of var fields in old row |
291 | uchar* old_var_field_data_ptr, |
292 | // number of offset bytes used in new row |
293 | uint32_t new_num_offset_bytes, |
294 | uint32_t* num_data_bytes_written, |
295 | uint32_t* num_offset_bytes_written) { |
296 | |
297 | uchar* curr_new_var_field_data_ptr = start_new_var_field_data_ptr; |
298 | uchar* curr_new_var_field_offset_ptr = start_new_var_field_offset_ptr; |
299 | for (uint32_t i = 0; i < num_var_fields; i++) { |
300 | uint32_t field_len; |
301 | uint32_t start_read_offset; |
302 | uint32_t curr_old = i + start_old_num_var_field; |
303 | uchar* data_to_copy = NULL; |
304 | // get the length and pointer to data that needs to be copied |
305 | get_var_field_info( |
306 | &field_len, |
307 | &start_read_offset, |
308 | curr_old, |
309 | old_var_field_offset_ptr, |
310 | old_num_offset_bytes); |
311 | data_to_copy = old_var_field_data_ptr + start_read_offset; |
312 | // now need to copy field_len bytes starting from data_to_copy |
313 | curr_new_var_field_data_ptr = write_var_field( |
314 | curr_new_var_field_offset_ptr, |
315 | curr_new_var_field_data_ptr, |
316 | new_var_field_data_ptr, |
317 | data_to_copy, |
318 | field_len, |
319 | new_num_offset_bytes); |
320 | curr_new_var_field_offset_ptr += new_num_offset_bytes; |
321 | } |
322 | *num_data_bytes_written = |
323 | (uint32_t)(curr_new_var_field_data_ptr - start_new_var_field_data_ptr); |
324 | *num_offset_bytes_written = |
325 | (uint32_t)(curr_new_var_field_offset_ptr - |
326 | start_new_var_field_offset_ptr); |
327 | } |
328 | |
329 | static inline uint32_t copy_toku_blob( |
330 | uchar* to_ptr, |
331 | uchar* from_ptr, |
332 | uint32_t len_bytes, |
333 | bool skip) { |
334 | |
335 | uint32_t length = 0; |
336 | if (!skip) { |
337 | memcpy(to_ptr, from_ptr, len_bytes); |
338 | } |
339 | length = get_blob_field_len(from_ptr,len_bytes); |
340 | if (!skip) { |
341 | memcpy(to_ptr + len_bytes, from_ptr + len_bytes, length); |
342 | } |
343 | return (length + len_bytes); |
344 | } |
345 | |
346 | static int tokudb_hcad_update_fun( |
347 | DB* db, |
348 | const DBT* key, |
349 | const DBT* old_val, |
350 | const DBT* , |
351 | void (*set_val)(const DBT* new_val, void* ), |
352 | void* ) { |
353 | |
354 | uint32_t max_num_bytes; |
355 | uint32_t num_columns; |
356 | DBT new_val; |
357 | uint32_t num_bytes_left; |
358 | uint32_t num_var_fields_to_copy; |
359 | uint32_t num_data_bytes_written = 0; |
360 | uint32_t num_offset_bytes_written = 0; |
361 | int error; |
362 | memset(&new_val, 0, sizeof(DBT)); |
363 | uchar operation; |
364 | uchar* new_val_data = NULL; |
365 | uchar* = NULL; |
366 | uchar* = NULL; |
367 | // |
368 | // info for pointers into rows |
369 | // |
370 | uint32_t old_num_null_bytes; |
371 | uint32_t new_num_null_bytes; |
372 | uchar old_num_offset_bytes; |
373 | uchar new_num_offset_bytes; |
374 | uint32_t old_fixed_field_size; |
375 | uint32_t new_fixed_field_size; |
376 | uint32_t old_len_of_offsets; |
377 | uint32_t new_len_of_offsets; |
378 | |
379 | uchar* old_fixed_field_ptr = NULL; |
380 | uchar* new_fixed_field_ptr = NULL; |
381 | uint32_t curr_old_fixed_offset; |
382 | uint32_t curr_new_fixed_offset; |
383 | |
384 | uchar* old_null_bytes = NULL; |
385 | uchar* new_null_bytes = NULL; |
386 | uint32_t curr_old_null_pos; |
387 | uint32_t curr_new_null_pos; |
388 | uint32_t old_null_bits_left; |
389 | uint32_t new_null_bits_left; |
390 | uint32_t overall_null_bits_left; |
391 | |
392 | uint32_t old_num_var_fields; |
393 | // uint32_t new_num_var_fields; |
394 | uint32_t curr_old_num_var_field; |
395 | uint32_t curr_new_num_var_field; |
396 | uchar* old_var_field_offset_ptr = NULL; |
397 | uchar* new_var_field_offset_ptr = NULL; |
398 | uchar* curr_new_var_field_offset_ptr = NULL; |
399 | uchar* old_var_field_data_ptr = NULL; |
400 | uchar* new_var_field_data_ptr = NULL; |
401 | uchar* curr_new_var_field_data_ptr = NULL; |
402 | |
403 | uint32_t start_blob_offset; |
404 | uchar* start_blob_ptr; |
405 | uint32_t num_blob_bytes; |
406 | |
407 | // came across a delete, nothing to update |
408 | if (old_val == NULL) { |
409 | error = 0; |
410 | goto cleanup; |
411 | } |
412 | |
413 | extra_pos_start = (uchar *)extra->data; |
414 | extra_pos = (uchar *)extra->data; |
415 | |
416 | operation = extra_pos[0]; |
417 | extra_pos++; |
418 | assert_always(operation == UP_COL_ADD_OR_DROP); |
419 | |
420 | memcpy(&old_num_null_bytes, extra_pos, sizeof(uint32_t)); |
421 | extra_pos += sizeof(uint32_t); |
422 | memcpy(&new_num_null_bytes, extra_pos, sizeof(uint32_t)); |
423 | extra_pos += sizeof(uint32_t); |
424 | |
425 | old_num_offset_bytes = extra_pos[0]; |
426 | extra_pos++; |
427 | new_num_offset_bytes = extra_pos[0]; |
428 | extra_pos++; |
429 | |
430 | memcpy(&old_fixed_field_size, extra_pos, sizeof(uint32_t)); |
431 | extra_pos += sizeof(uint32_t); |
432 | memcpy(&new_fixed_field_size, extra_pos, sizeof(uint32_t)); |
433 | extra_pos += sizeof(uint32_t); |
434 | |
435 | memcpy(&old_len_of_offsets, extra_pos, sizeof(uint32_t)); |
436 | extra_pos += sizeof(uint32_t); |
437 | memcpy(&new_len_of_offsets, extra_pos, sizeof(uint32_t)); |
438 | extra_pos += sizeof(uint32_t); |
439 | |
440 | max_num_bytes = |
441 | old_val->size + extra->size + new_len_of_offsets + new_fixed_field_size; |
442 | new_val_data = (uchar *)tokudb::memory::malloc( |
443 | max_num_bytes, |
444 | MYF(MY_FAE)); |
445 | if (new_val_data == NULL) { |
446 | error = ENOMEM; |
447 | goto cleanup; |
448 | } |
449 | |
450 | old_fixed_field_ptr = (uchar *) old_val->data; |
451 | old_fixed_field_ptr += old_num_null_bytes; |
452 | new_fixed_field_ptr = new_val_data + new_num_null_bytes; |
453 | curr_old_fixed_offset = 0; |
454 | curr_new_fixed_offset = 0; |
455 | |
456 | old_num_var_fields = old_len_of_offsets/old_num_offset_bytes; |
457 | // new_num_var_fields = new_len_of_offsets/new_num_offset_bytes; |
458 | // following fields will change as we write the variable data |
459 | old_var_field_offset_ptr = old_fixed_field_ptr + old_fixed_field_size; |
460 | new_var_field_offset_ptr = new_fixed_field_ptr + new_fixed_field_size; |
461 | old_var_field_data_ptr = old_var_field_offset_ptr + old_len_of_offsets; |
462 | new_var_field_data_ptr = new_var_field_offset_ptr + new_len_of_offsets; |
463 | curr_new_var_field_offset_ptr = new_var_field_offset_ptr; |
464 | curr_new_var_field_data_ptr = new_var_field_data_ptr; |
465 | curr_old_num_var_field = 0; |
466 | curr_new_num_var_field = 0; |
467 | |
468 | old_null_bytes = (uchar *)old_val->data; |
469 | new_null_bytes = new_val_data; |
470 | |
471 | memcpy(&curr_old_null_pos, extra_pos, sizeof(uint32_t)); |
472 | extra_pos += sizeof(uint32_t); |
473 | memcpy(&curr_new_null_pos, extra_pos, sizeof(uint32_t)); |
474 | extra_pos += sizeof(uint32_t); |
475 | |
476 | memcpy(&num_columns, extra_pos, sizeof(num_columns)); |
477 | extra_pos += sizeof(num_columns); |
478 | |
479 | memset(new_null_bytes, 0, new_num_null_bytes); // shut valgrind up |
480 | |
481 | // |
482 | // now go through and apply the change into new_val_data |
483 | // |
484 | for (uint32_t i = 0; i < num_columns; i++) { |
485 | uchar op_type = extra_pos[0]; |
486 | bool is_null_default = false; |
487 | extra_pos++; |
488 | |
489 | assert_always(op_type == COL_DROP || op_type == COL_ADD); |
490 | bool nullable = (extra_pos[0] != 0); |
491 | extra_pos++; |
492 | if (nullable) { |
493 | uint32_t null_bit_position; |
494 | memcpy(&null_bit_position, extra_pos, sizeof(uint32_t)); |
495 | extra_pos += sizeof(uint32_t); |
496 | uint32_t num_bits; |
497 | if (op_type == COL_DROP) { |
498 | assert_always(curr_old_null_pos <= null_bit_position); |
499 | num_bits = null_bit_position - curr_old_null_pos; |
500 | } else { |
501 | assert_always(curr_new_null_pos <= null_bit_position); |
502 | num_bits = null_bit_position - curr_new_null_pos; |
503 | } |
504 | copy_null_bits( |
505 | curr_old_null_pos, |
506 | curr_new_null_pos, |
507 | num_bits, |
508 | old_null_bytes, |
509 | new_null_bytes); |
510 | // update the positions |
511 | curr_new_null_pos += num_bits; |
512 | curr_old_null_pos += num_bits; |
513 | if (op_type == COL_DROP) { |
514 | curr_old_null_pos++; // account for dropped column |
515 | } else { |
516 | is_null_default = (extra_pos[0] != 0); |
517 | extra_pos++; |
518 | set_overall_null_position( |
519 | new_null_bytes, |
520 | null_bit_position, |
521 | is_null_default); |
522 | curr_new_null_pos++; //account for added column |
523 | } |
524 | } |
525 | uchar col_type = extra_pos[0]; |
526 | extra_pos++; |
527 | if (col_type == COL_FIXED) { |
528 | uint32_t col_offset; |
529 | uint32_t col_size; |
530 | uint32_t num_bytes_to_copy; |
531 | memcpy(&col_offset, extra_pos, sizeof(uint32_t)); |
532 | extra_pos += sizeof(uint32_t); |
533 | memcpy(&col_size, extra_pos, sizeof(uint32_t)); |
534 | extra_pos += sizeof(uint32_t); |
535 | |
536 | if (op_type == COL_DROP) { |
537 | num_bytes_to_copy = col_offset - curr_old_fixed_offset; |
538 | } else { |
539 | num_bytes_to_copy = col_offset - curr_new_fixed_offset; |
540 | } |
541 | memcpy( |
542 | new_fixed_field_ptr + curr_new_fixed_offset, |
543 | old_fixed_field_ptr + curr_old_fixed_offset, |
544 | num_bytes_to_copy); |
545 | curr_old_fixed_offset += num_bytes_to_copy; |
546 | curr_new_fixed_offset += num_bytes_to_copy; |
547 | if (op_type == COL_DROP) { |
548 | // move old_fixed_offset val to skip OVER column that is |
549 | // being dropped |
550 | curr_old_fixed_offset += col_size; |
551 | } else { |
552 | if (is_null_default) { |
553 | // copy zeroes |
554 | memset( |
555 | new_fixed_field_ptr + curr_new_fixed_offset, |
556 | 0, |
557 | col_size); |
558 | } else { |
559 | // copy data from extra_pos into new row |
560 | memcpy( |
561 | new_fixed_field_ptr + curr_new_fixed_offset, |
562 | extra_pos, |
563 | col_size); |
564 | extra_pos += col_size; |
565 | } |
566 | curr_new_fixed_offset += col_size; |
567 | } |
568 | |
569 | } else if (col_type == COL_VAR) { |
570 | uint32_t var_col_index; |
571 | memcpy(&var_col_index, extra_pos, sizeof(uint32_t)); |
572 | extra_pos += sizeof(uint32_t); |
573 | if (op_type == COL_DROP) { |
574 | num_var_fields_to_copy = var_col_index - curr_old_num_var_field; |
575 | } else { |
576 | num_var_fields_to_copy = var_col_index - curr_new_num_var_field; |
577 | } |
578 | copy_var_fields( |
579 | curr_old_num_var_field, |
580 | num_var_fields_to_copy, |
581 | old_var_field_offset_ptr, |
582 | old_num_offset_bytes, |
583 | curr_new_var_field_data_ptr, |
584 | curr_new_var_field_offset_ptr, |
585 | // pointer to beginning of var fields in new row |
586 | new_var_field_data_ptr, |
587 | // pointer to beginning of var fields in old row |
588 | old_var_field_data_ptr, |
589 | // number of offset bytes used in new row |
590 | new_num_offset_bytes, |
591 | &num_data_bytes_written, |
592 | &num_offset_bytes_written); |
593 | curr_new_var_field_data_ptr += num_data_bytes_written; |
594 | curr_new_var_field_offset_ptr += num_offset_bytes_written; |
595 | curr_new_num_var_field += num_var_fields_to_copy; |
596 | curr_old_num_var_field += num_var_fields_to_copy; |
597 | if (op_type == COL_DROP) { |
598 | curr_old_num_var_field++; // skip over dropped field |
599 | } else { |
600 | if (is_null_default) { |
601 | curr_new_var_field_data_ptr = write_var_field( |
602 | curr_new_var_field_offset_ptr, |
603 | curr_new_var_field_data_ptr, |
604 | new_var_field_data_ptr, |
605 | NULL, //copying no data |
606 | 0, //copying 0 bytes |
607 | new_num_offset_bytes); |
608 | curr_new_var_field_offset_ptr += new_num_offset_bytes; |
609 | } else { |
610 | uint32_t data_length; |
611 | memcpy(&data_length, extra_pos, sizeof(data_length)); |
612 | extra_pos += sizeof(data_length); |
613 | curr_new_var_field_data_ptr = write_var_field( |
614 | curr_new_var_field_offset_ptr, |
615 | curr_new_var_field_data_ptr, |
616 | new_var_field_data_ptr, |
617 | extra_pos, //copying data from mutator |
618 | data_length, //copying data_length bytes |
619 | new_num_offset_bytes); |
620 | extra_pos += data_length; |
621 | curr_new_var_field_offset_ptr += new_num_offset_bytes; |
622 | } |
623 | curr_new_num_var_field++; //account for added column |
624 | } |
625 | } else if (col_type == COL_BLOB) { |
626 | // handle blob data later |
627 | continue; |
628 | } else { |
629 | assert_unreachable(); |
630 | } |
631 | } |
632 | // finish copying the null stuff |
633 | old_null_bits_left = 8*old_num_null_bytes - curr_old_null_pos; |
634 | new_null_bits_left = 8*new_num_null_bytes - curr_new_null_pos; |
635 | overall_null_bits_left = old_null_bits_left; |
636 | set_if_smaller(overall_null_bits_left, new_null_bits_left); |
637 | copy_null_bits( |
638 | curr_old_null_pos, |
639 | curr_new_null_pos, |
640 | overall_null_bits_left, |
641 | old_null_bytes, |
642 | new_null_bytes); |
643 | // finish copying fixed field stuff |
644 | num_bytes_left = old_fixed_field_size - curr_old_fixed_offset; |
645 | memcpy( |
646 | new_fixed_field_ptr + curr_new_fixed_offset, |
647 | old_fixed_field_ptr + curr_old_fixed_offset, |
648 | num_bytes_left); |
649 | curr_old_fixed_offset += num_bytes_left; |
650 | curr_new_fixed_offset += num_bytes_left; |
651 | // sanity check |
652 | assert_always(curr_new_fixed_offset == new_fixed_field_size); |
653 | |
654 | // finish copying var field stuff |
655 | num_var_fields_to_copy = old_num_var_fields - curr_old_num_var_field; |
656 | copy_var_fields( |
657 | curr_old_num_var_field, |
658 | num_var_fields_to_copy, |
659 | old_var_field_offset_ptr, |
660 | old_num_offset_bytes, |
661 | curr_new_var_field_data_ptr, |
662 | curr_new_var_field_offset_ptr, |
663 | // pointer to beginning of var fields in new row |
664 | new_var_field_data_ptr, |
665 | // pointer to beginning of var fields in old row |
666 | old_var_field_data_ptr, |
667 | // number of offset bytes used in new row |
668 | new_num_offset_bytes, |
669 | &num_data_bytes_written, |
670 | &num_offset_bytes_written); |
671 | curr_new_var_field_offset_ptr += num_offset_bytes_written; |
672 | curr_new_var_field_data_ptr += num_data_bytes_written; |
673 | // sanity check |
674 | assert_always(curr_new_var_field_offset_ptr == new_var_field_data_ptr); |
675 | |
676 | // start handling blobs |
677 | get_blob_field_info( |
678 | &start_blob_offset, |
679 | old_len_of_offsets, |
680 | old_var_field_data_ptr, |
681 | old_num_offset_bytes); |
682 | start_blob_ptr = old_var_field_data_ptr + start_blob_offset; |
683 | // if nothing else in extra, then there are no blobs to add or drop, so |
684 | // can copy blobs straight |
685 | if ((extra_pos - extra_pos_start) == extra->size) { |
686 | num_blob_bytes = old_val->size - (start_blob_ptr - old_null_bytes); |
687 | memcpy(curr_new_var_field_data_ptr, start_blob_ptr, num_blob_bytes); |
688 | curr_new_var_field_data_ptr += num_blob_bytes; |
689 | } else { |
690 | // else, there is blob information to process |
691 | uchar* len_bytes = NULL; |
692 | uint32_t curr_old_blob = 0; |
693 | uint32_t curr_new_blob = 0; |
694 | uint32_t num_old_blobs = 0; |
695 | uchar* curr_old_blob_ptr = start_blob_ptr; |
696 | memcpy(&num_old_blobs, extra_pos, sizeof(num_old_blobs)); |
697 | extra_pos += sizeof(num_old_blobs); |
698 | len_bytes = extra_pos; |
699 | extra_pos += num_old_blobs; |
700 | // copy over blob fields one by one |
701 | while ((extra_pos - extra_pos_start) < extra->size) { |
702 | uchar op_type = extra_pos[0]; |
703 | extra_pos++; |
704 | uint32_t num_blobs_to_copy = 0; |
705 | uint32_t blob_index; |
706 | memcpy(&blob_index, extra_pos, sizeof(blob_index)); |
707 | extra_pos += sizeof(blob_index); |
708 | assert_always (op_type == COL_DROP || op_type == COL_ADD); |
709 | if (op_type == COL_DROP) { |
710 | num_blobs_to_copy = blob_index - curr_old_blob; |
711 | } else { |
712 | num_blobs_to_copy = blob_index - curr_new_blob; |
713 | } |
714 | for (uint32_t i = 0; i < num_blobs_to_copy; i++) { |
715 | uint32_t num_bytes_written = copy_toku_blob( |
716 | curr_new_var_field_data_ptr, |
717 | curr_old_blob_ptr, |
718 | len_bytes[curr_old_blob + i], |
719 | false); |
720 | curr_old_blob_ptr += num_bytes_written; |
721 | curr_new_var_field_data_ptr += num_bytes_written; |
722 | } |
723 | curr_old_blob += num_blobs_to_copy; |
724 | curr_new_blob += num_blobs_to_copy; |
725 | if (op_type == COL_DROP) { |
726 | // skip over blob in row |
727 | uint32_t num_bytes = copy_toku_blob( |
728 | NULL, |
729 | curr_old_blob_ptr, |
730 | len_bytes[curr_old_blob], |
731 | true); |
732 | curr_old_blob++; |
733 | curr_old_blob_ptr += num_bytes; |
734 | } else { |
735 | // copy new data |
736 | uint32_t new_len_bytes = extra_pos[0]; |
737 | extra_pos++; |
738 | uint32_t num_bytes = copy_toku_blob( |
739 | curr_new_var_field_data_ptr, |
740 | extra_pos, |
741 | new_len_bytes, |
742 | false); |
743 | curr_new_blob++; |
744 | curr_new_var_field_data_ptr += num_bytes; |
745 | extra_pos += num_bytes; |
746 | } |
747 | } |
748 | num_blob_bytes = old_val->size - (curr_old_blob_ptr - old_null_bytes); |
749 | memcpy(curr_new_var_field_data_ptr, curr_old_blob_ptr, num_blob_bytes); |
750 | curr_new_var_field_data_ptr += num_blob_bytes; |
751 | } |
752 | new_val.data = new_val_data; |
753 | new_val.size = curr_new_var_field_data_ptr - new_val_data; |
754 | set_val(&new_val, set_extra); |
755 | |
756 | error = 0; |
757 | cleanup: |
758 | tokudb::memory::free(new_val_data); |
759 | return error; |
760 | } |
761 | |
762 | // Expand the variable offset array in the old row given the update mesage |
763 | // in the extra. |
764 | static int tokudb_expand_variable_offsets( |
765 | DB* db, |
766 | const DBT* key, |
767 | const DBT* old_val, |
768 | const DBT* , |
769 | void (*set_val)(const DBT* new_val, void* ), |
770 | void* ) { |
771 | |
772 | int error = 0; |
773 | tokudb::buffer (extra->data, 0, extra->size); |
774 | |
775 | // decode the operation |
776 | uint8_t operation; |
777 | extra_val.consume(&operation, sizeof operation); |
778 | assert_always(operation == UPDATE_OP_EXPAND_VARIABLE_OFFSETS); |
779 | |
780 | // decode number of offsets |
781 | uint32_t number_of_offsets; |
782 | extra_val.consume(&number_of_offsets, sizeof number_of_offsets); |
783 | |
784 | // decode the offset start |
785 | uint32_t offset_start; |
786 | extra_val.consume(&offset_start, sizeof offset_start); |
787 | |
788 | assert_always(extra_val.size() == extra_val.limit()); |
789 | |
790 | DBT new_val; memset(&new_val, 0, sizeof new_val); |
791 | |
792 | if (old_val != NULL) { |
793 | assert_always(offset_start + number_of_offsets <= old_val->size); |
794 | |
795 | // compute the new val from the old val |
796 | uchar* old_val_ptr = (uchar*)old_val->data; |
797 | |
798 | // allocate space for the new val's data |
799 | uchar* new_val_ptr = (uchar*)tokudb::memory::malloc( |
800 | number_of_offsets + old_val->size, |
801 | MYF(MY_FAE)); |
802 | if (!new_val_ptr) { |
803 | error = ENOMEM; |
804 | goto cleanup; |
805 | } |
806 | new_val.data = new_val_ptr; |
807 | |
808 | // copy up to the start of the varchar offset |
809 | memcpy(new_val_ptr, old_val_ptr, offset_start); |
810 | new_val_ptr += offset_start; |
811 | old_val_ptr += offset_start; |
812 | |
813 | // expand each offset from 1 to 2 bytes |
814 | for (uint32_t i = 0; i < number_of_offsets; i++) { |
815 | uint16_t new_offset = *old_val_ptr; |
816 | int2store(new_val_ptr, new_offset); |
817 | new_val_ptr += 2; |
818 | old_val_ptr += 1; |
819 | } |
820 | |
821 | // copy the rest of the row |
822 | size_t n = old_val->size - (old_val_ptr - (uchar *)old_val->data); |
823 | memcpy(new_val_ptr, old_val_ptr, n); |
824 | new_val_ptr += n; |
825 | old_val_ptr += n; |
826 | new_val.size = new_val_ptr - (uchar *)new_val.data; |
827 | |
828 | assert_always(new_val_ptr == (uchar *)new_val.data + new_val.size); |
829 | assert_always(old_val_ptr == (uchar *)old_val->data + old_val->size); |
830 | |
831 | // set the new val |
832 | set_val(&new_val, set_extra); |
833 | } |
834 | |
835 | error = 0; |
836 | |
837 | cleanup: |
838 | tokudb::memory::free(new_val.data); |
839 | return error; |
840 | } |
841 | |
842 | // Expand an int field in a old row given the expand message in the extra. |
843 | static int tokudb_expand_int_field( |
844 | DB* db, |
845 | const DBT* key, |
846 | const DBT* old_val, |
847 | const DBT* , |
848 | void (*set_val)(const DBT* new_val, void* ), |
849 | void* ) { |
850 | |
851 | int error = 0; |
852 | tokudb::buffer (extra->data, 0, extra->size); |
853 | |
854 | uint8_t operation; |
855 | extra_val.consume(&operation, sizeof operation); |
856 | assert_always( |
857 | operation == UPDATE_OP_EXPAND_INT || |
858 | operation == UPDATE_OP_EXPAND_UINT); |
859 | uint32_t the_offset; |
860 | extra_val.consume(&the_offset, sizeof the_offset); |
861 | uint32_t old_length; |
862 | extra_val.consume(&old_length, sizeof old_length); |
863 | uint32_t new_length; |
864 | extra_val.consume(&new_length, sizeof new_length); |
865 | assert_always(extra_val.size() == extra_val.limit()); |
866 | |
867 | assert_always(new_length >= old_length); // expand only |
868 | |
869 | DBT new_val; memset(&new_val, 0, sizeof new_val); |
870 | |
871 | if (old_val != NULL) { |
872 | // old field within the old val |
873 | assert_always(the_offset + old_length <= old_val->size); |
874 | |
875 | // compute the new val from the old val |
876 | uchar* old_val_ptr = (uchar*)old_val->data; |
877 | |
878 | // allocate space for the new val's data |
879 | uchar* new_val_ptr = (uchar*)tokudb::memory::malloc( |
880 | old_val->size + (new_length - old_length), |
881 | MYF(MY_FAE)); |
882 | if (!new_val_ptr) { |
883 | error = ENOMEM; |
884 | goto cleanup; |
885 | } |
886 | new_val.data = new_val_ptr; |
887 | |
888 | // copy up to the old offset |
889 | memcpy(new_val_ptr, old_val_ptr, the_offset); |
890 | new_val_ptr += the_offset; |
891 | old_val_ptr += the_offset; |
892 | |
893 | switch (operation) { |
894 | case UPDATE_OP_EXPAND_INT: |
895 | // fill the entire new value with ones or zeros depending on the |
896 | // sign bit the encoding is little endian |
897 | memset( |
898 | new_val_ptr, |
899 | (old_val_ptr[old_length-1] & 0x80) ? 0xff : 0x00, |
900 | new_length); |
901 | // overlay the low bytes of the new value with the old value |
902 | memcpy(new_val_ptr, old_val_ptr, old_length); |
903 | new_val_ptr += new_length; |
904 | old_val_ptr += old_length; |
905 | break; |
906 | case UPDATE_OP_EXPAND_UINT: |
907 | // fill the entire new value with zeros |
908 | memset(new_val_ptr, 0, new_length); |
909 | // overlay the low bytes of the new value with the old value |
910 | memcpy(new_val_ptr, old_val_ptr, old_length); |
911 | new_val_ptr += new_length; |
912 | old_val_ptr += old_length; |
913 | break; |
914 | default: |
915 | assert_unreachable(); |
916 | } |
917 | |
918 | // copy the rest |
919 | size_t n = old_val->size - (old_val_ptr - (uchar *)old_val->data); |
920 | memcpy(new_val_ptr, old_val_ptr, n); |
921 | new_val_ptr += n; |
922 | old_val_ptr += n; |
923 | new_val.size = new_val_ptr - (uchar *)new_val.data; |
924 | |
925 | assert_always(new_val_ptr == (uchar *)new_val.data + new_val.size); |
926 | assert_always(old_val_ptr == (uchar *)old_val->data + old_val->size); |
927 | |
928 | // set the new val |
929 | set_val(&new_val, set_extra); |
930 | } |
931 | |
932 | error = 0; |
933 | |
934 | cleanup: |
935 | tokudb::memory::free(new_val.data); |
936 | return error; |
937 | } |
938 | |
939 | // Expand a char field in a old row given the expand message in the extra. |
940 | static int tokudb_expand_char_field( |
941 | DB* db, |
942 | const DBT* key, |
943 | const DBT* old_val, |
944 | const DBT* , |
945 | void (*set_val)(const DBT* new_val, void* ), |
946 | void* ) { |
947 | |
948 | int error = 0; |
949 | tokudb::buffer (extra->data, 0, extra->size); |
950 | |
951 | uint8_t operation; |
952 | extra_val.consume(&operation, sizeof operation); |
953 | assert_always( |
954 | operation == UPDATE_OP_EXPAND_CHAR || |
955 | operation == UPDATE_OP_EXPAND_BINARY); |
956 | uint32_t the_offset; |
957 | extra_val.consume(&the_offset, sizeof the_offset); |
958 | uint32_t old_length; |
959 | extra_val.consume(&old_length, sizeof old_length); |
960 | uint32_t new_length; |
961 | extra_val.consume(&new_length, sizeof new_length); |
962 | uchar pad_char; |
963 | extra_val.consume(&pad_char, sizeof pad_char); |
964 | assert_always(extra_val.size() == extra_val.limit()); |
965 | |
966 | assert_always(new_length >= old_length); // expand only |
967 | |
968 | DBT new_val; memset(&new_val, 0, sizeof new_val); |
969 | |
970 | if (old_val != NULL) { |
971 | // old field within the old val |
972 | assert_always(the_offset + old_length <= old_val->size); |
973 | |
974 | // compute the new val from the old val |
975 | uchar* old_val_ptr = (uchar*)old_val->data; |
976 | |
977 | // allocate space for the new val's data |
978 | uchar* new_val_ptr = (uchar*)tokudb::memory::malloc( |
979 | old_val->size + (new_length - old_length), |
980 | MYF(MY_FAE)); |
981 | if (!new_val_ptr) { |
982 | error = ENOMEM; |
983 | goto cleanup; |
984 | } |
985 | new_val.data = new_val_ptr; |
986 | |
987 | // copy up to the old offset |
988 | memcpy(new_val_ptr, old_val_ptr, the_offset); |
989 | new_val_ptr += the_offset; |
990 | old_val_ptr += the_offset; |
991 | |
992 | switch (operation) { |
993 | case UPDATE_OP_EXPAND_CHAR: |
994 | case UPDATE_OP_EXPAND_BINARY: |
995 | // fill the entire new value with the pad char |
996 | memset(new_val_ptr, pad_char, new_length); |
997 | // overlay the low bytes of the new value with the old value |
998 | memcpy(new_val_ptr, old_val_ptr, old_length); |
999 | new_val_ptr += new_length; |
1000 | old_val_ptr += old_length; |
1001 | break; |
1002 | default: |
1003 | assert_unreachable(); |
1004 | } |
1005 | |
1006 | // copy the rest |
1007 | size_t n = old_val->size - (old_val_ptr - (uchar *)old_val->data); |
1008 | memcpy(new_val_ptr, old_val_ptr, n); |
1009 | new_val_ptr += n; |
1010 | old_val_ptr += n; |
1011 | new_val.size = new_val_ptr - (uchar *)new_val.data; |
1012 | |
1013 | assert_always(new_val_ptr == (uchar *)new_val.data + new_val.size); |
1014 | assert_always(old_val_ptr == (uchar *)old_val->data + old_val->size); |
1015 | |
1016 | // set the new val |
1017 | set_val(&new_val, set_extra); |
1018 | } |
1019 | |
1020 | error = 0; |
1021 | |
1022 | cleanup: |
1023 | tokudb::memory::free(new_val.data); |
1024 | return error; |
1025 | } |
1026 | |
1027 | namespace tokudb { |
1028 | |
1029 | class var_fields { |
1030 | public: |
1031 | inline var_fields() { |
1032 | } |
1033 | inline void init_var_fields( |
1034 | uint32_t var_offset, |
1035 | uint32_t offset_bytes, |
1036 | uint32_t bytes_per_offset, |
1037 | tokudb::buffer* val_buffer) { |
1038 | |
1039 | assert_always( |
1040 | bytes_per_offset == 0 || |
1041 | bytes_per_offset == 1 || |
1042 | bytes_per_offset == 2); |
1043 | m_var_offset = var_offset; |
1044 | m_val_offset = m_var_offset + offset_bytes; |
1045 | m_bytes_per_offset = bytes_per_offset; |
1046 | if (bytes_per_offset > 0) { |
1047 | m_num_fields = offset_bytes / bytes_per_offset; |
1048 | } else { |
1049 | assert_always(offset_bytes == 0); |
1050 | m_num_fields = 0; |
1051 | } |
1052 | m_val_buffer = val_buffer; |
1053 | } |
1054 | uint32_t value_offset(uint32_t var_index); |
1055 | uint32_t value_length(uint32_t var_index); |
1056 | void update_offsets(uint32_t var_index, uint32_t old_s, uint32_t new_s); |
1057 | uint32_t end_offset(); |
1058 | void replace( |
1059 | uint32_t var_index, |
1060 | void* new_val_ptr, |
1061 | uint32_t new_val_length); |
1062 | private: |
1063 | uint32_t read_offset(uint32_t var_index); |
1064 | void write_offset(uint32_t var_index, uint32_t v); |
1065 | private: |
1066 | uint32_t m_var_offset; |
1067 | uint32_t m_val_offset; |
1068 | uint32_t m_bytes_per_offset; |
1069 | uint32_t m_num_fields; |
1070 | tokudb::buffer* m_val_buffer; |
1071 | }; |
1072 | |
1073 | // Return the ith variable length offset |
1074 | uint32_t var_fields::read_offset(uint32_t var_index) { |
1075 | uint32_t offset = 0; |
1076 | m_val_buffer->read( |
1077 | &offset, m_bytes_per_offset, m_var_offset + var_index * m_bytes_per_offset); |
1078 | return offset; |
1079 | } |
1080 | |
1081 | // Write the ith variable length offset with a new offset. |
1082 | void var_fields::write_offset(uint32_t var_index, uint32_t new_offset) { |
1083 | m_val_buffer->write( |
1084 | &new_offset, |
1085 | m_bytes_per_offset, |
1086 | m_var_offset + var_index * m_bytes_per_offset); |
1087 | } |
1088 | |
1089 | // Return the offset of the ith variable length field |
1090 | uint32_t var_fields::value_offset(uint32_t var_index) { |
1091 | assert_always(var_index < m_num_fields); |
1092 | if (var_index == 0) |
1093 | return m_val_offset; |
1094 | else |
1095 | return m_val_offset + read_offset(var_index-1); |
1096 | } |
1097 | |
1098 | // Return the length of the ith variable length field |
1099 | uint32_t var_fields::value_length(uint32_t var_index) { |
1100 | assert_always(var_index < m_num_fields); |
1101 | if (var_index == 0) |
1102 | return read_offset(0); |
1103 | else |
1104 | return read_offset(var_index) - read_offset(var_index-1); |
1105 | } |
1106 | |
1107 | // The length of the ith variable length fields changed. |
1108 | // Update all of the subsequent offsets. |
1109 | void var_fields::update_offsets( |
1110 | uint32_t var_index, |
1111 | uint32_t old_s, |
1112 | uint32_t new_s) { |
1113 | |
1114 | assert_always(var_index < m_num_fields); |
1115 | if (old_s == new_s) |
1116 | return; |
1117 | for (uint i = var_index; i < m_num_fields; i++) { |
1118 | uint32_t v = read_offset(i); |
1119 | if (new_s > old_s) |
1120 | write_offset(i, v + (new_s - old_s)); |
1121 | else |
1122 | write_offset(i, v - (old_s - new_s)); |
1123 | } |
1124 | } |
1125 | |
1126 | uint32_t var_fields::end_offset() { |
1127 | if (m_num_fields == 0) |
1128 | return m_val_offset; |
1129 | else |
1130 | return m_val_offset + read_offset(m_num_fields-1); |
1131 | } |
1132 | |
1133 | void var_fields::replace( |
1134 | uint32_t var_index, |
1135 | void* new_val_ptr, |
1136 | uint32_t new_val_length) { |
1137 | |
1138 | // replace the new val with the extra val |
1139 | uint32_t the_offset = value_offset(var_index); |
1140 | uint32_t old_s = value_length(var_index); |
1141 | uint32_t new_s = new_val_length; |
1142 | m_val_buffer->replace(the_offset, old_s, new_val_ptr, new_s); |
1143 | |
1144 | // update the var offsets |
1145 | update_offsets(var_index, old_s, new_s); |
1146 | } |
1147 | |
1148 | class blob_fields { |
1149 | public: |
1150 | blob_fields() { |
1151 | } |
1152 | void init_blob_fields( |
1153 | uint32_t num_blobs, |
1154 | const uint8_t* blob_lengths, |
1155 | tokudb::buffer* val_buffer) { |
1156 | m_num_blobs = num_blobs; |
1157 | m_blob_lengths = blob_lengths; |
1158 | m_val_buffer = val_buffer; |
1159 | } |
1160 | void start_blobs(uint32_t offset) { |
1161 | m_blob_offset = offset; |
1162 | } |
1163 | void replace(uint32_t blob_index, uint32_t length, void *p); |
1164 | |
1165 | void expand_length( |
1166 | uint32_t blob_index, |
1167 | uint8_t old_length_length, |
1168 | uint8_t new_length_length); |
1169 | private: |
1170 | uint32_t read_length(uint32_t offset, size_t size); |
1171 | void write_length(uint32_t offset, size_t size, uint32_t new_length); |
1172 | uint32_t blob_offset(uint32_t blob_index); |
1173 | private: |
1174 | uint32_t m_blob_offset; |
1175 | uint32_t m_num_blobs; |
1176 | const uint8_t *m_blob_lengths; |
1177 | tokudb::buffer *m_val_buffer; |
1178 | }; |
1179 | |
1180 | uint32_t blob_fields::read_length(uint32_t offset, size_t blob_length) { |
1181 | uint32_t length = 0; |
1182 | m_val_buffer->read(&length, blob_length, offset); |
1183 | return length; |
1184 | } |
1185 | |
1186 | void blob_fields::write_length( |
1187 | uint32_t offset, |
1188 | size_t size, |
1189 | uint32_t new_length) { |
1190 | m_val_buffer->write(&new_length, size, offset); |
1191 | } |
1192 | |
1193 | uint32_t blob_fields::blob_offset(uint32_t blob_index) { |
1194 | assert_always(blob_index < m_num_blobs); |
1195 | uint32_t offset = m_blob_offset; |
1196 | for (uint i = 0; i < blob_index; i++) { |
1197 | uint32_t blob_length = m_blob_lengths[i]; |
1198 | uint32_t length = read_length(offset, blob_length); |
1199 | offset += blob_length + length; |
1200 | } |
1201 | return offset; |
1202 | } |
1203 | |
1204 | void blob_fields::replace( |
1205 | uint32_t blob_index, |
1206 | uint32_t new_length, |
1207 | void* new_value) { |
1208 | |
1209 | assert_always(blob_index < m_num_blobs); |
1210 | |
1211 | // compute the ith blob offset |
1212 | uint32_t offset = blob_offset(blob_index); |
1213 | uint8_t blob_length = m_blob_lengths[blob_index]; |
1214 | |
1215 | // read the old length |
1216 | uint32_t old_length = read_length(offset, blob_length); |
1217 | |
1218 | // replace the data |
1219 | m_val_buffer->replace( |
1220 | offset + blob_length, |
1221 | old_length, |
1222 | new_value, |
1223 | new_length); |
1224 | |
1225 | // write the new length |
1226 | write_length(offset, blob_length, new_length); |
1227 | } |
1228 | |
1229 | void blob_fields::expand_length( |
1230 | uint32_t blob_index, |
1231 | uint8_t old_length_length, |
1232 | uint8_t new_length_length) { |
1233 | |
1234 | assert_always(blob_index < m_num_blobs); |
1235 | assert_always(old_length_length == m_blob_lengths[blob_index]); |
1236 | |
1237 | // compute the ith blob offset |
1238 | uint32_t offset = blob_offset(blob_index); |
1239 | |
1240 | // read the blob length |
1241 | uint32_t blob_length = read_length(offset, old_length_length); |
1242 | |
1243 | // expand the length |
1244 | m_val_buffer->replace( |
1245 | offset, |
1246 | old_length_length, |
1247 | &blob_length, |
1248 | new_length_length); |
1249 | } |
1250 | |
1251 | class value_map { |
1252 | public: |
1253 | value_map(tokudb::buffer *val_buffer) : m_val_buffer(val_buffer) { |
1254 | } |
1255 | |
1256 | void init_var_fields( |
1257 | uint32_t var_offset, |
1258 | uint32_t offset_bytes, |
1259 | uint32_t bytes_per_offset) { |
1260 | |
1261 | m_var_fields.init_var_fields( |
1262 | var_offset, |
1263 | offset_bytes, |
1264 | bytes_per_offset, |
1265 | m_val_buffer); |
1266 | } |
1267 | |
1268 | void init_blob_fields(uint32_t num_blobs, const uint8_t *blob_lengths) { |
1269 | m_blob_fields.init_blob_fields(num_blobs, blob_lengths, m_val_buffer); |
1270 | } |
1271 | |
1272 | // Replace the value of a fixed length field |
1273 | void replace_fixed( |
1274 | uint32_t the_offset, |
1275 | uint32_t field_null_num, |
1276 | void* new_val_ptr, |
1277 | uint32_t new_val_length) { |
1278 | |
1279 | m_val_buffer->replace( |
1280 | the_offset, |
1281 | new_val_length, |
1282 | new_val_ptr, |
1283 | new_val_length); |
1284 | maybe_clear_null(field_null_num); |
1285 | } |
1286 | |
1287 | // Replace the value of a variable length field |
1288 | void replace_varchar( |
1289 | uint32_t var_index, |
1290 | uint32_t field_null_num, |
1291 | void* new_val_ptr, |
1292 | uint32_t new_val_length) { |
1293 | |
1294 | m_var_fields.replace(var_index, new_val_ptr, new_val_length); |
1295 | maybe_clear_null(field_null_num); |
1296 | } |
1297 | |
1298 | // Replace the value of a blob field |
1299 | void replace_blob( |
1300 | uint32_t blob_index, |
1301 | uint32_t field_null_num, |
1302 | void* new_val_ptr, |
1303 | uint32_t new_val_length) { |
1304 | |
1305 | m_blob_fields.start_blobs(m_var_fields.end_offset()); |
1306 | m_blob_fields.replace(blob_index, new_val_length, new_val_ptr); |
1307 | maybe_clear_null(field_null_num); |
1308 | } |
1309 | |
1310 | void expand_blob_lengths( |
1311 | uint32_t num_blob, |
1312 | const uint8_t* old_length, |
1313 | const uint8_t* new_length); |
1314 | |
1315 | void int_op( |
1316 | uint32_t operation, |
1317 | uint32_t the_offset, |
1318 | uint32_t length, |
1319 | uint32_t field_null_num, |
1320 | tokudb::buffer& old_val, |
1321 | void* ); |
1322 | |
1323 | void uint_op( |
1324 | uint32_t operation, |
1325 | uint32_t the_offset, |
1326 | uint32_t length, |
1327 | uint32_t field_null_num, |
1328 | tokudb::buffer& old_val, |
1329 | void* ); |
1330 | |
1331 | private: |
1332 | bool is_null(uint32_t null_num, uchar *null_bytes) { |
1333 | bool field_is_null = false; |
1334 | if (null_num) { |
1335 | if (null_num & (1<<31)) |
1336 | null_num &= ~(1<<31); |
1337 | else |
1338 | null_num -= 1; |
1339 | field_is_null = is_overall_null_position_set(null_bytes, null_num); |
1340 | } |
1341 | return field_is_null; |
1342 | } |
1343 | |
1344 | void maybe_clear_null(uint32_t null_num) { |
1345 | if (null_num) { |
1346 | if (null_num & (1<<31)) |
1347 | null_num &= ~(1<<31); |
1348 | else |
1349 | null_num -= 1; |
1350 | set_overall_null_position( |
1351 | (uchar*)m_val_buffer->data(), |
1352 | null_num, |
1353 | false); |
1354 | } |
1355 | } |
1356 | |
1357 | private: |
1358 | var_fields m_var_fields; |
1359 | blob_fields m_blob_fields; |
1360 | tokudb::buffer *m_val_buffer; |
1361 | }; |
1362 | |
1363 | // Update an int field: signed newval@offset = old_val@offset OP extra_val |
1364 | void value_map::int_op( |
1365 | uint32_t operation, |
1366 | uint32_t the_offset, |
1367 | uint32_t length, |
1368 | uint32_t field_null_num, |
1369 | tokudb::buffer &old_val, |
1370 | void* ) { |
1371 | |
1372 | assert_always(the_offset + length <= m_val_buffer->size()); |
1373 | assert_always(the_offset + length <= old_val.size()); |
1374 | assert_always( |
1375 | length == 1 || length == 2 || length == 3 || |
1376 | length == 4 || length == 8); |
1377 | |
1378 | uchar *old_val_ptr = (uchar *) old_val.data(); |
1379 | bool field_is_null = is_null(field_null_num, old_val_ptr); |
1380 | int64_t v = 0; |
1381 | memcpy(&v, old_val_ptr + the_offset, length); |
1382 | v = tokudb::int_sign_extend(v, 8*length); |
1383 | int64_t = 0; |
1384 | memcpy(&extra_v, extra_val, length); |
1385 | extra_v = tokudb::int_sign_extend(extra_v, 8*length); |
1386 | switch (operation) { |
1387 | case '+': |
1388 | if (!field_is_null) { |
1389 | bool over; |
1390 | v = tokudb::int_add(v, extra_v, 8*length, &over); |
1391 | if (over) { |
1392 | if (extra_v > 0) |
1393 | v = tokudb::int_high_endpoint(8*length); |
1394 | else |
1395 | v = tokudb::int_low_endpoint(8*length); |
1396 | } |
1397 | m_val_buffer->replace(the_offset, length, &v, length); |
1398 | } |
1399 | break; |
1400 | case '-': |
1401 | if (!field_is_null) { |
1402 | bool over; |
1403 | v = tokudb::int_sub(v, extra_v, 8*length, &over); |
1404 | if (over) { |
1405 | if (extra_v > 0) |
1406 | v = tokudb::int_low_endpoint(8*length); |
1407 | else |
1408 | v = tokudb::int_high_endpoint(8*length); |
1409 | } |
1410 | m_val_buffer->replace(the_offset, length, &v, length); |
1411 | } |
1412 | break; |
1413 | default: |
1414 | assert_unreachable(); |
1415 | } |
1416 | } |
1417 | |
1418 | // Update an unsigned field: unsigned newval@offset = old_val@offset OP extra_val |
1419 | void value_map::uint_op( |
1420 | uint32_t operation, |
1421 | uint32_t the_offset, |
1422 | uint32_t length, |
1423 | uint32_t field_null_num, |
1424 | tokudb::buffer& old_val, |
1425 | void* ) { |
1426 | |
1427 | assert_always(the_offset + length <= m_val_buffer->size()); |
1428 | assert_always(the_offset + length <= old_val.size()); |
1429 | assert_always( |
1430 | length == 1 || length == 2 || length == 3 || |
1431 | length == 4 || length == 8); |
1432 | |
1433 | uchar *old_val_ptr = (uchar *) old_val.data(); |
1434 | bool field_is_null = is_null(field_null_num, old_val_ptr); |
1435 | uint64_t v = 0; |
1436 | memcpy(&v, old_val_ptr + the_offset, length); |
1437 | uint64_t = 0; |
1438 | memcpy(&extra_v, extra_val, length); |
1439 | switch (operation) { |
1440 | case '+': |
1441 | if (!field_is_null) { |
1442 | bool over; |
1443 | v = tokudb::uint_add(v, extra_v, 8*length, &over); |
1444 | if (over) { |
1445 | v = tokudb::uint_high_endpoint(8*length); |
1446 | } |
1447 | m_val_buffer->replace(the_offset, length, &v, length); |
1448 | } |
1449 | break; |
1450 | case '-': |
1451 | if (!field_is_null) { |
1452 | bool over; |
1453 | v = tokudb::uint_sub(v, extra_v, 8*length, &over); |
1454 | if (over) { |
1455 | v = tokudb::uint_low_endpoint(8*length); |
1456 | } |
1457 | m_val_buffer->replace(the_offset, length, &v, length); |
1458 | } |
1459 | break; |
1460 | default: |
1461 | assert_unreachable(); |
1462 | } |
1463 | } |
1464 | |
1465 | void value_map::expand_blob_lengths( |
1466 | uint32_t num_blob, |
1467 | const uint8_t* old_length, |
1468 | const uint8_t* new_length) { |
1469 | |
1470 | uint8_t current_length[num_blob]; |
1471 | memcpy(current_length, old_length, num_blob); |
1472 | for (uint32_t i = 0; i < num_blob; i++) { |
1473 | if (new_length[i] > current_length[i]) { |
1474 | m_blob_fields.init_blob_fields( |
1475 | num_blob, |
1476 | current_length, |
1477 | m_val_buffer); |
1478 | m_blob_fields.start_blobs(m_var_fields.end_offset()); |
1479 | m_blob_fields.expand_length(i, current_length[i], new_length[i]); |
1480 | current_length[i] = new_length[i]; |
1481 | } |
1482 | } |
1483 | } |
1484 | |
1485 | } |
1486 | |
1487 | static uint32_t consume_uint32(tokudb::buffer &b) { |
1488 | uint32_t n; |
1489 | size_t s = b.consume_ui<uint32_t>(&n); |
1490 | assert_always(s > 0); |
1491 | return n; |
1492 | } |
1493 | |
1494 | static uint8_t *consume_uint8_array(tokudb::buffer &b, uint32_t array_size) { |
1495 | uint8_t *p = (uint8_t *) b.consume_ptr(array_size); |
1496 | assert_always(p); |
1497 | return p; |
1498 | } |
1499 | |
1500 | static int tokudb_expand_blobs( |
1501 | DB* db, |
1502 | const DBT* key_dbt, |
1503 | const DBT* old_val_dbt, |
1504 | const DBT* , |
1505 | void (*set_val)(const DBT* new_val_dbt, void* ), |
1506 | void* ) { |
1507 | |
1508 | tokudb::buffer (extra->data, 0, extra->size); |
1509 | |
1510 | uint8_t operation; |
1511 | extra_val.consume(&operation, sizeof operation); |
1512 | assert_always(operation == UPDATE_OP_EXPAND_BLOB); |
1513 | |
1514 | if (old_val_dbt != NULL) { |
1515 | // new val = old val |
1516 | tokudb::buffer new_val; |
1517 | new_val.append(old_val_dbt->data, old_val_dbt->size); |
1518 | |
1519 | tokudb::value_map vd(&new_val); |
1520 | |
1521 | // decode variable field info |
1522 | uint32_t var_field_offset = consume_uint32(extra_val); |
1523 | uint32_t var_offset_bytes = consume_uint32(extra_val); |
1524 | uint32_t bytes_per_offset = consume_uint32(extra_val); |
1525 | vd.init_var_fields( |
1526 | var_field_offset, |
1527 | var_offset_bytes, |
1528 | bytes_per_offset); |
1529 | |
1530 | // decode blob info |
1531 | uint32_t num_blob = consume_uint32(extra_val); |
1532 | const uint8_t* old_blob_length = |
1533 | consume_uint8_array(extra_val, num_blob); |
1534 | const uint8_t* new_blob_length = |
1535 | consume_uint8_array(extra_val, num_blob); |
1536 | assert_always(extra_val.size() == extra_val.limit()); |
1537 | |
1538 | // expand blob lengths |
1539 | vd.expand_blob_lengths(num_blob, old_blob_length, new_blob_length); |
1540 | |
1541 | // set the new val |
1542 | DBT new_val_dbt; memset(&new_val_dbt, 0, sizeof new_val_dbt); |
1543 | new_val_dbt.data = new_val.data(); |
1544 | new_val_dbt.size = new_val.size(); |
1545 | set_val(&new_val_dbt, set_extra); |
1546 | } |
1547 | return 0; |
1548 | } |
1549 | |
1550 | // Decode and apply a sequence of update operations defined in the extra to |
1551 | // the old value and put the result in the new value. |
1552 | static void apply_1_updates( |
1553 | tokudb::value_map& vd, |
1554 | tokudb::buffer& new_val, |
1555 | tokudb::buffer& old_val, |
1556 | tokudb::buffer& ) { |
1557 | |
1558 | uint32_t num_updates; |
1559 | extra_val.consume(&num_updates, sizeof num_updates); |
1560 | for ( ; num_updates > 0; num_updates--) { |
1561 | // get the update operation |
1562 | uint32_t update_operation; |
1563 | extra_val.consume(&update_operation, sizeof update_operation); |
1564 | uint32_t field_type; |
1565 | extra_val.consume(&field_type, sizeof field_type); |
1566 | uint32_t unused; |
1567 | extra_val.consume(&unused, sizeof unused); |
1568 | uint32_t field_null_num; |
1569 | extra_val.consume(&field_null_num, sizeof field_null_num); |
1570 | uint32_t the_offset; |
1571 | extra_val.consume(&the_offset, sizeof the_offset); |
1572 | uint32_t ; |
1573 | extra_val.consume(&extra_val_length, sizeof extra_val_length); |
1574 | void * = extra_val.consume_ptr(extra_val_length); |
1575 | |
1576 | // apply the update |
1577 | switch (field_type) { |
1578 | case UPDATE_TYPE_INT: |
1579 | if (update_operation == '=') |
1580 | vd.replace_fixed( |
1581 | the_offset, |
1582 | field_null_num, |
1583 | extra_val_ptr, |
1584 | extra_val_length); |
1585 | else |
1586 | vd.int_op( |
1587 | update_operation, |
1588 | the_offset, |
1589 | extra_val_length, |
1590 | field_null_num, |
1591 | old_val, |
1592 | extra_val_ptr); |
1593 | break; |
1594 | case UPDATE_TYPE_UINT: |
1595 | if (update_operation == '=') |
1596 | vd.replace_fixed( |
1597 | the_offset, |
1598 | field_null_num, |
1599 | extra_val_ptr, |
1600 | extra_val_length); |
1601 | else |
1602 | vd.uint_op( |
1603 | update_operation, |
1604 | the_offset, |
1605 | extra_val_length, |
1606 | field_null_num, |
1607 | old_val, |
1608 | extra_val_ptr); |
1609 | break; |
1610 | case UPDATE_TYPE_CHAR: |
1611 | case UPDATE_TYPE_BINARY: |
1612 | if (update_operation == '=') |
1613 | vd.replace_fixed( |
1614 | the_offset, |
1615 | field_null_num, |
1616 | extra_val_ptr, |
1617 | extra_val_length); |
1618 | else |
1619 | assert_unreachable(); |
1620 | break; |
1621 | default: |
1622 | assert_unreachable(); |
1623 | break; |
1624 | } |
1625 | } |
1626 | assert_always(extra_val.size() == extra_val.limit()); |
1627 | } |
1628 | |
1629 | // Simple update handler. Decode the update message, apply the update operations |
1630 | // to the old value, and set the new value. |
1631 | static int tokudb_update_1_fun( |
1632 | DB* db, |
1633 | const DBT* key_dbt, |
1634 | const DBT* old_val_dbt, |
1635 | const DBT* , |
1636 | void (*set_val)(const DBT* new_val_dbt, void* ), |
1637 | void* ) { |
1638 | |
1639 | tokudb::buffer (extra->data, 0, extra->size); |
1640 | |
1641 | uint8_t operation; |
1642 | extra_val.consume(&operation, sizeof operation); |
1643 | assert_always(operation == UPDATE_OP_UPDATE_1); |
1644 | |
1645 | if (old_val_dbt != NULL) { |
1646 | // get the simple descriptor |
1647 | uint32_t m_fixed_field_offset; |
1648 | extra_val.consume(&m_fixed_field_offset, sizeof m_fixed_field_offset); |
1649 | uint32_t m_var_field_offset; |
1650 | extra_val.consume(&m_var_field_offset, sizeof m_var_field_offset); |
1651 | uint32_t m_var_offset_bytes; |
1652 | extra_val.consume(&m_var_offset_bytes, sizeof m_var_offset_bytes); |
1653 | uint32_t m_bytes_per_offset; |
1654 | extra_val.consume(&m_bytes_per_offset, sizeof m_bytes_per_offset); |
1655 | |
1656 | tokudb::buffer old_val( |
1657 | old_val_dbt->data, |
1658 | old_val_dbt->size, |
1659 | old_val_dbt->size); |
1660 | |
1661 | // new val = old val |
1662 | tokudb::buffer new_val; |
1663 | new_val.append(old_val_dbt->data, old_val_dbt->size); |
1664 | |
1665 | tokudb::value_map vd(&new_val); |
1666 | vd.init_var_fields( |
1667 | m_var_field_offset, |
1668 | m_var_offset_bytes, |
1669 | m_bytes_per_offset); |
1670 | |
1671 | // apply updates to new val |
1672 | apply_1_updates(vd, new_val, old_val, extra_val); |
1673 | |
1674 | // set the new val |
1675 | DBT new_val_dbt; memset(&new_val_dbt, 0, sizeof new_val_dbt); |
1676 | new_val_dbt.data = new_val.data(); |
1677 | new_val_dbt.size = new_val.size(); |
1678 | set_val(&new_val_dbt, set_extra); |
1679 | } |
1680 | |
1681 | return 0; |
1682 | } |
1683 | |
1684 | // Simple upsert handler. Decode the upsert message. If the key does not exist, |
1685 | // then insert a new value from the extra. |
1686 | // Otherwise, apply the update operations to the old value, and then set the |
1687 | // new value. |
1688 | static int tokudb_upsert_1_fun( |
1689 | DB* db, |
1690 | const DBT* key_dbt, |
1691 | const DBT* old_val_dbt, |
1692 | const DBT* , |
1693 | void (*set_val)(const DBT* new_val_dbt, void* ), |
1694 | void* ) { |
1695 | |
1696 | tokudb::buffer (extra->data, 0, extra->size); |
1697 | |
1698 | uint8_t operation; |
1699 | extra_val.consume(&operation, sizeof operation); |
1700 | assert_always(operation == UPDATE_OP_UPSERT_1); |
1701 | |
1702 | uint32_t insert_length; |
1703 | extra_val.consume(&insert_length, sizeof insert_length); |
1704 | void *insert_row = extra_val.consume_ptr(insert_length); |
1705 | |
1706 | if (old_val_dbt == NULL) { |
1707 | // insert a new row |
1708 | DBT new_val_dbt; memset(&new_val_dbt, 0, sizeof new_val_dbt); |
1709 | new_val_dbt.size = insert_length; |
1710 | new_val_dbt.data = insert_row; |
1711 | set_val(&new_val_dbt, set_extra); |
1712 | } else { |
1713 | // decode the simple descriptor |
1714 | uint32_t m_fixed_field_offset; |
1715 | extra_val.consume(&m_fixed_field_offset, sizeof m_fixed_field_offset); |
1716 | uint32_t m_var_field_offset; |
1717 | extra_val.consume(&m_var_field_offset, sizeof m_var_field_offset); |
1718 | uint32_t m_var_offset_bytes; |
1719 | extra_val.consume(&m_var_offset_bytes, sizeof m_var_offset_bytes); |
1720 | uint32_t m_bytes_per_offset; |
1721 | extra_val.consume(&m_bytes_per_offset, sizeof m_bytes_per_offset); |
1722 | |
1723 | tokudb::buffer old_val( |
1724 | old_val_dbt->data, |
1725 | old_val_dbt->size, |
1726 | old_val_dbt->size); |
1727 | |
1728 | // new val = old val |
1729 | tokudb::buffer new_val; |
1730 | new_val.append(old_val_dbt->data, old_val_dbt->size); |
1731 | |
1732 | tokudb::value_map vd(&new_val); |
1733 | vd.init_var_fields( |
1734 | m_var_field_offset, |
1735 | m_var_offset_bytes, |
1736 | m_bytes_per_offset); |
1737 | |
1738 | // apply updates to new val |
1739 | apply_1_updates(vd, new_val, old_val, extra_val); |
1740 | |
1741 | // set the new val |
1742 | DBT new_val_dbt; memset(&new_val_dbt, 0, sizeof new_val_dbt); |
1743 | new_val_dbt.data = new_val.data(); |
1744 | new_val_dbt.size = new_val.size(); |
1745 | set_val(&new_val_dbt, set_extra); |
1746 | } |
1747 | |
1748 | return 0; |
1749 | } |
1750 | |
1751 | // Decode and apply a sequence of update operations defined in the extra to the |
1752 | // old value and put the result in the new value. |
1753 | static void apply_2_updates( |
1754 | tokudb::value_map& vd, |
1755 | tokudb::buffer& new_val, |
1756 | tokudb::buffer& old_val, |
1757 | tokudb::buffer& ) { |
1758 | |
1759 | uint32_t num_updates = consume_uint32(extra_val); |
1760 | for (uint32_t i = 0; i < num_updates; i++) { |
1761 | uint32_t update_operation = consume_uint32(extra_val); |
1762 | if (update_operation == 'v') { |
1763 | uint32_t var_field_offset = consume_uint32(extra_val); |
1764 | uint32_t var_offset_bytes = consume_uint32(extra_val); |
1765 | uint32_t bytes_per_offset = consume_uint32(extra_val); |
1766 | vd.init_var_fields( |
1767 | var_field_offset, |
1768 | var_offset_bytes, |
1769 | bytes_per_offset); |
1770 | } else if (update_operation == 'b') { |
1771 | uint32_t num_blobs = consume_uint32(extra_val); |
1772 | const uint8_t* blob_lengths = |
1773 | consume_uint8_array(extra_val, num_blobs); |
1774 | vd.init_blob_fields(num_blobs, blob_lengths); |
1775 | } else { |
1776 | uint32_t field_type = consume_uint32(extra_val); |
1777 | uint32_t field_null_num = consume_uint32(extra_val); |
1778 | uint32_t the_offset = consume_uint32(extra_val); |
1779 | uint32_t = consume_uint32(extra_val); |
1780 | void* = extra_val.consume_ptr(extra_val_length); |
1781 | assert_always(extra_val_ptr); |
1782 | |
1783 | switch (field_type) { |
1784 | case UPDATE_TYPE_INT: |
1785 | if (update_operation == '=') |
1786 | vd.replace_fixed( |
1787 | the_offset, |
1788 | field_null_num, |
1789 | extra_val_ptr, |
1790 | extra_val_length); |
1791 | else |
1792 | vd.int_op( |
1793 | update_operation, |
1794 | the_offset, |
1795 | extra_val_length, |
1796 | field_null_num, |
1797 | old_val, |
1798 | extra_val_ptr); |
1799 | break; |
1800 | case UPDATE_TYPE_UINT: |
1801 | if (update_operation == '=') |
1802 | vd.replace_fixed( |
1803 | the_offset, |
1804 | field_null_num, |
1805 | extra_val_ptr, |
1806 | extra_val_length); |
1807 | else |
1808 | vd.uint_op( |
1809 | update_operation, |
1810 | the_offset, |
1811 | extra_val_length, |
1812 | field_null_num, |
1813 | old_val, |
1814 | extra_val_ptr); |
1815 | break; |
1816 | case UPDATE_TYPE_CHAR: |
1817 | case UPDATE_TYPE_BINARY: |
1818 | if (update_operation == '=') |
1819 | vd.replace_fixed( |
1820 | the_offset, |
1821 | field_null_num, |
1822 | extra_val_ptr, |
1823 | extra_val_length); |
1824 | else |
1825 | assert_unreachable(); |
1826 | break; |
1827 | case UPDATE_TYPE_VARBINARY: |
1828 | case UPDATE_TYPE_VARCHAR: |
1829 | if (update_operation == '=') |
1830 | vd.replace_varchar( |
1831 | the_offset, |
1832 | field_null_num, |
1833 | extra_val_ptr, |
1834 | extra_val_length); |
1835 | else |
1836 | assert_unreachable(); |
1837 | break; |
1838 | case UPDATE_TYPE_TEXT: |
1839 | case UPDATE_TYPE_BLOB: |
1840 | if (update_operation == '=') |
1841 | vd.replace_blob( |
1842 | the_offset, |
1843 | field_null_num, |
1844 | extra_val_ptr, |
1845 | extra_val_length); |
1846 | else |
1847 | assert_unreachable(); |
1848 | break; |
1849 | default: |
1850 | assert_unreachable(); |
1851 | } |
1852 | } |
1853 | } |
1854 | assert_always(extra_val.size() == extra_val.limit()); |
1855 | } |
1856 | |
1857 | // Simple update handler. Decode the update message, apply the update |
1858 | // operations to the old value, and set the new value. |
1859 | static int tokudb_update_2_fun( |
1860 | DB* db, |
1861 | const DBT* key_dbt, |
1862 | const DBT* old_val_dbt, |
1863 | const DBT* , |
1864 | void (*set_val)(const DBT* new_val_dbt, void* ), |
1865 | void* ) { |
1866 | |
1867 | tokudb::buffer (extra->data, 0, extra->size); |
1868 | |
1869 | uint8_t op; |
1870 | extra_val.consume(&op, sizeof op); |
1871 | assert_always(op == UPDATE_OP_UPDATE_2); |
1872 | |
1873 | if (old_val_dbt != NULL) { |
1874 | tokudb::buffer old_val( |
1875 | old_val_dbt->data, |
1876 | old_val_dbt->size, |
1877 | old_val_dbt->size); |
1878 | |
1879 | // new val = old val |
1880 | tokudb::buffer new_val; |
1881 | new_val.append(old_val_dbt->data, old_val_dbt->size); |
1882 | |
1883 | tokudb::value_map vd(&new_val); |
1884 | |
1885 | // apply updates to new val |
1886 | apply_2_updates(vd, new_val, old_val, extra_val); |
1887 | |
1888 | // set the new val |
1889 | DBT new_val_dbt; memset(&new_val_dbt, 0, sizeof new_val_dbt); |
1890 | new_val_dbt.data = new_val.data(); |
1891 | new_val_dbt.size = new_val.size(); |
1892 | set_val(&new_val_dbt, set_extra); |
1893 | } |
1894 | |
1895 | return 0; |
1896 | } |
1897 | |
1898 | // Simple upsert handler. Decode the upsert message. If the key does not exist, |
1899 | // then insert a new value from the extra. |
1900 | // Otherwise, apply the update operations to the old value, and then set the |
1901 | // new value. |
1902 | static int tokudb_upsert_2_fun( |
1903 | DB* db, |
1904 | const DBT* key_dbt, |
1905 | const DBT* old_val_dbt, |
1906 | const DBT* , |
1907 | void (*set_val)(const DBT* new_val_dbt, void* ), |
1908 | void* ) { |
1909 | |
1910 | tokudb::buffer (extra->data, 0, extra->size); |
1911 | |
1912 | uint8_t op; |
1913 | extra_val.consume(&op, sizeof op); |
1914 | assert_always(op == UPDATE_OP_UPSERT_2); |
1915 | |
1916 | uint32_t insert_length = consume_uint32(extra_val); |
1917 | assert_always(insert_length < extra_val.limit()); |
1918 | void* insert_row = extra_val.consume_ptr(insert_length); |
1919 | assert_always(insert_row); |
1920 | |
1921 | if (old_val_dbt == NULL) { |
1922 | // insert a new row |
1923 | DBT new_val_dbt; memset(&new_val_dbt, 0, sizeof new_val_dbt); |
1924 | new_val_dbt.size = insert_length; |
1925 | new_val_dbt.data = insert_row; |
1926 | set_val(&new_val_dbt, set_extra); |
1927 | } else { |
1928 | tokudb::buffer old_val( |
1929 | old_val_dbt->data, |
1930 | old_val_dbt->size, |
1931 | old_val_dbt->size); |
1932 | |
1933 | // new val = old val |
1934 | tokudb::buffer new_val; |
1935 | new_val.append(old_val_dbt->data, old_val_dbt->size); |
1936 | |
1937 | tokudb::value_map vd(&new_val); |
1938 | |
1939 | // apply updates to new val |
1940 | apply_2_updates(vd, new_val, old_val, extra_val); |
1941 | |
1942 | // set the new val |
1943 | DBT new_val_dbt; memset(&new_val_dbt, 0, sizeof new_val_dbt); |
1944 | new_val_dbt.data = new_val.data(); |
1945 | new_val_dbt.size = new_val.size(); |
1946 | set_val(&new_val_dbt, set_extra); |
1947 | } |
1948 | |
1949 | return 0; |
1950 | } |
1951 | |
1952 | // This function is the update callback function that is registered with the |
1953 | // YDB environment. It uses the first byte in the update message to identify |
1954 | // the update message type and call the handler for that message. |
1955 | int tokudb_update_fun( |
1956 | DB* db, |
1957 | const DBT* key, |
1958 | const DBT* old_val, |
1959 | const DBT* , |
1960 | void (*set_val)(const DBT* new_val, void* ), |
1961 | void* ) { |
1962 | |
1963 | assert_always(extra->size > 0); |
1964 | uint8_t* = (uchar*)extra->data; |
1965 | uint8_t operation = extra_pos[0]; |
1966 | int error; |
1967 | switch (operation) { |
1968 | case UPDATE_OP_COL_ADD_OR_DROP: |
1969 | error = tokudb_hcad_update_fun( |
1970 | db, |
1971 | key, |
1972 | old_val, |
1973 | extra, |
1974 | set_val, |
1975 | set_extra); |
1976 | break; |
1977 | case UPDATE_OP_EXPAND_VARIABLE_OFFSETS: |
1978 | error = tokudb_expand_variable_offsets( |
1979 | db, |
1980 | key, |
1981 | old_val, |
1982 | extra, |
1983 | set_val, |
1984 | set_extra); |
1985 | break; |
1986 | case UPDATE_OP_EXPAND_INT: |
1987 | case UPDATE_OP_EXPAND_UINT: |
1988 | error = tokudb_expand_int_field( |
1989 | db, |
1990 | key, |
1991 | old_val, |
1992 | extra, |
1993 | set_val, |
1994 | set_extra); |
1995 | break; |
1996 | case UPDATE_OP_EXPAND_CHAR: |
1997 | case UPDATE_OP_EXPAND_BINARY: |
1998 | error = tokudb_expand_char_field( |
1999 | db, |
2000 | key, |
2001 | old_val, |
2002 | extra, |
2003 | set_val, |
2004 | set_extra); |
2005 | break; |
2006 | case UPDATE_OP_EXPAND_BLOB: |
2007 | error = tokudb_expand_blobs( |
2008 | db, |
2009 | key, |
2010 | old_val, |
2011 | extra, |
2012 | set_val, |
2013 | set_extra); |
2014 | break; |
2015 | case UPDATE_OP_UPDATE_1: |
2016 | error = tokudb_update_1_fun( |
2017 | db, |
2018 | key, |
2019 | old_val, |
2020 | extra, |
2021 | set_val, |
2022 | set_extra); |
2023 | break; |
2024 | case UPDATE_OP_UPSERT_1: |
2025 | error = tokudb_upsert_1_fun( |
2026 | db, |
2027 | key, |
2028 | old_val, |
2029 | extra, |
2030 | set_val, |
2031 | set_extra); |
2032 | break; |
2033 | case UPDATE_OP_UPDATE_2: |
2034 | error = tokudb_update_2_fun( |
2035 | db, |
2036 | key, |
2037 | old_val, |
2038 | extra, |
2039 | set_val, |
2040 | set_extra); |
2041 | break; |
2042 | case UPDATE_OP_UPSERT_2: |
2043 | error = tokudb_upsert_2_fun( |
2044 | db, |
2045 | key, |
2046 | old_val, |
2047 | extra, |
2048 | set_val, |
2049 | set_extra); |
2050 | break; |
2051 | default: |
2052 | assert_unreachable(); |
2053 | } |
2054 | return error; |
2055 | } |
2056 | |