1 | /* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */ |
2 | // vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4: |
3 | #ident "$Id$" |
4 | /*====== |
5 | This file is part of TokuDB |
6 | |
7 | |
8 | Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved. |
9 | |
TokuDB is free software: you can redistribute it and/or modify
11 | it under the terms of the GNU General Public License, version 2, |
12 | as published by the Free Software Foundation. |
13 | |
14 | TokuDB is distributed in the hope that it will be useful, |
15 | but WITHOUT ANY WARRANTY; without even the implied warranty of |
16 | MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
17 | GNU General Public License for more details. |
18 | |
19 | You should have received a copy of the GNU General Public License |
20 | along with TokuDB. If not, see <http://www.gnu.org/licenses/>. |
21 | |
22 | ======= */ |
23 | |
24 | #ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved." |
25 | |
26 | #if defined(TOKU_INCLUDE_UPSERT) |
27 | |
28 | // Point updates and upserts |
29 | |
30 | // Restrictions: |
31 | // No triggers |
32 | // Statement or mixed replication |
33 | // Primary key must be defined |
34 | // Simple and compound primary key |
35 | // Int, char and varchar primary key types |
36 | // No updates on fields that are part of any key |
37 | // No clustering keys |
38 | // Integer and char field updates |
39 | // Update expressions: |
40 | // x = constant |
41 | // x = x + constant |
42 | // x = x - constant |
43 | // x = if (x=0,0,x-1) |
44 | // x = x + values(x) |
45 | // Session variable disables slow updates and slow upserts |
46 | |
47 | // Future features: |
48 | // Support more primary key types |
49 | // Force statement logging for fast updates |
50 | // Support clustering keys using broadcast updates |
51 | // Support primary key ranges using multicast messages |
52 | // Support more complicated update expressions |
53 | // Replace field_offset |
54 | |
55 | // Debug function to dump an Item |
56 | static void dump_item(Item* item) { |
57 | fprintf(stderr, "%u" , item->type()); |
58 | switch (item->type()) { |
59 | case Item::FUNC_ITEM: { |
60 | Item_func* func = static_cast<Item_func*>(item); |
61 | uint n = func->argument_count(); |
62 | Item** arguments = func->arguments(); |
63 | fprintf( |
64 | stderr, |
65 | ":func=%u,%s,%u(" , |
66 | func->functype(), |
67 | func->func_name(), |
68 | n); |
69 | for (uint i = 0; i < n ; i++) { |
70 | dump_item(arguments[i]); |
71 | if (i < n-1) |
72 | fprintf(stderr,"," ); |
73 | } |
74 | fprintf(stderr, ")" ); |
75 | break; |
76 | } |
77 | case Item::INT_ITEM: { |
78 | Item_int* int_item = static_cast<Item_int*>(item); |
79 | fprintf(stderr, ":int=%lld" , int_item->val_int()); |
80 | break; |
81 | } |
82 | case Item::STRING_ITEM: { |
83 | Item_string* str_item = static_cast<Item_string*>(item); |
84 | fprintf(stderr, ":str=%s" , str_item->val_str(NULL)->c_ptr()); |
85 | break; |
86 | } |
87 | case Item::FIELD_ITEM: { |
88 | Item_field* field_item = static_cast<Item_field*>(item); |
89 | fprintf( |
90 | stderr, |
91 | ":field=%s.%s.%s" , |
92 | field_item->db_name, |
93 | field_item->table_name, |
94 | field_item->field_name.str); |
95 | break; |
96 | } |
97 | case Item::COND_ITEM: { |
98 | Item_cond* cond_item = static_cast<Item_cond*>(item); |
99 | fprintf(stderr, ":cond=%s(\n" , cond_item->func_name()); |
100 | List_iterator<Item> li(*cond_item->argument_list()); |
101 | Item* list_item; |
102 | while ((list_item = li++)) { |
103 | dump_item(list_item); |
104 | fprintf(stderr, "\n" ); |
105 | } |
106 | fprintf(stderr, ")\n" ); |
107 | break; |
108 | } |
109 | case Item::INSERT_VALUE_ITEM: { |
110 | Item_insert_value* value_item = static_cast<Item_insert_value*>(item); |
111 | fprintf(stderr, ":insert_value" ); |
112 | dump_item(value_item->arg); |
113 | break; |
114 | } |
115 | default: |
116 | fprintf(stderr, ":unsupported\n" ); |
117 | break; |
118 | } |
119 | } |
120 | |
121 | // Debug function to dump an Item list |
122 | static void dump_item_list(const char* h, List<Item> &l) { |
123 | fprintf(stderr, "%s elements=%u\n" , h, l.elements); |
124 | List_iterator<Item> li(l); |
125 | Item* item; |
126 | while ((item = li++) != NULL) { |
127 | dump_item(item); |
128 | fprintf(stderr, "\n" ); |
129 | } |
130 | } |
131 | |
132 | // Find a Field by its Item name |
133 | static Field* find_field_by_name(TABLE* table, Item* item) { |
134 | if (item->type() != Item::FIELD_ITEM) |
135 | return NULL; |
136 | Item_field* field_item = static_cast<Item_field*>(item); |
137 | #if 0 |
138 | if (strcmp(table->s->db.str, field_item->db_name) != 0 || |
139 | strcmp(table->s->table_name.str, field_item->table_name) != 0) |
140 | return NULL; |
141 | Field *found_field = NULL; |
142 | for (uint i = 0; i < table->s->fields; i++) { |
143 | Field *test_field = table->s->field[i]; |
144 | if (strcmp(field_item->field_name.str, test_field->field_name.str) == 0) { |
145 | found_field = test_field; |
146 | break; |
147 | } |
148 | } |
149 | return found_field; |
150 | #else |
151 | // item->field may be a shortcut instead of the above table lookup |
152 | return field_item->field; |
153 | #endif |
154 | } |
155 | |
156 | // Return the starting offset in the value for a particular index (selected by idx) of a |
157 | // particular field (selected by expand_field_num). |
158 | // This only works for fixed length fields |
159 | static uint32_t fixed_field_offset( |
160 | uint32_t null_bytes, |
161 | KEY_AND_COL_INFO* kc_info, |
162 | uint idx, |
163 | uint expand_field_num) { |
164 | |
165 | uint32_t offset = null_bytes; |
166 | for (uint i = 0; i < expand_field_num; i++) { |
167 | if (bitmap_is_set(&kc_info->key_filters[idx], i)) |
168 | continue; |
169 | offset += kc_info->field_lengths[i]; |
170 | } |
171 | return offset; |
172 | } |
173 | |
174 | static uint32_t var_field_index( |
175 | TABLE* table, |
176 | KEY_AND_COL_INFO* kc_info, |
177 | uint idx, |
178 | uint field_num) { |
179 | |
180 | assert_always(field_num < table->s->fields); |
181 | uint v_index = 0; |
182 | for (uint i = 0; i < table->s->fields; i++) { |
183 | if (bitmap_is_set(&kc_info->key_filters[idx], i)) |
184 | continue; |
185 | if (kc_info->length_bytes[i]) { |
186 | if (i == field_num) |
187 | break; |
188 | v_index++; |
189 | } |
190 | } |
191 | return v_index; |
192 | } |
193 | |
194 | static uint32_t blob_field_index( |
195 | TABLE* table, |
196 | KEY_AND_COL_INFO* kc_info, |
197 | uint idx, |
198 | uint field_num) { |
199 | |
200 | assert_always(field_num < table->s->fields); |
201 | uint b_index; |
202 | for (b_index = 0; b_index < kc_info->num_blobs; b_index++) { |
203 | if (kc_info->blob_fields[b_index] == field_num) |
204 | break; |
205 | } |
206 | assert_always(b_index < kc_info->num_blobs); |
207 | return b_index; |
208 | } |
209 | |
210 | // Determine if an update operation can be offloaded to the storage engine. |
211 | // The update operation consists of a list of update expressions |
212 | // (fields[i] = values[i]), and a list of where conditions (conds). |
213 | // The function returns 0 if the update is handled in the storage engine. |
214 | // Otherwise, an error is returned. |
215 | int ha_tokudb::fast_update( |
216 | THD* thd, |
217 | List<Item>& update_fields, |
218 | List<Item>& update_values, |
219 | Item* conds) { |
220 | |
221 | TOKUDB_HANDLER_DBUG_ENTER("" ); |
222 | int error = 0; |
223 | |
224 | if (TOKUDB_UNLIKELY(TOKUDB_DEBUG_FLAGS(TOKUDB_DEBUG_UPSERT))) { |
225 | dump_item_list("fields" , update_fields); |
226 | dump_item_list("values" , update_values); |
227 | if (conds) { |
228 | fprintf(stderr, "conds\n" ); dump_item(conds); fprintf(stderr, "\n" ); |
229 | } |
230 | } |
231 | |
232 | if (update_fields.elements < 1 || |
233 | update_fields.elements != update_values.elements) { |
234 | error = ENOTSUP; // something is fishy with the parameters |
235 | goto return_error; |
236 | } |
237 | |
238 | if (!check_fast_update(thd, update_fields, update_values, conds)) { |
239 | error = ENOTSUP; |
240 | goto check_error; |
241 | } |
242 | |
243 | error = send_update_message( |
244 | update_fields, |
245 | update_values, |
246 | conds, |
247 | transaction); |
248 | if (error != 0) { |
249 | goto check_error; |
250 | } |
251 | |
252 | check_error: |
253 | if (error != 0) { |
254 | if (tokudb::sysvars::disable_slow_update(thd) != 0) |
255 | error = HA_ERR_UNSUPPORTED; |
256 | if (error != ENOTSUP) |
257 | print_error(error, MYF(0)); |
258 | } |
259 | |
260 | return_error: |
261 | TOKUDB_HANDLER_DBUG_RETURN(error); |
262 | } |
263 | |
264 | // Return true if an expression is a simple int expression or a simple function |
265 | // of +- int expression. |
266 | static bool check_int_result(Item* item) { |
267 | Item::Type t = item->type(); |
268 | if (t == Item::INT_ITEM) { |
269 | return true; |
270 | } else if (t == Item::FUNC_ITEM) { |
271 | Item_func* item_func = static_cast<Item_func*>(item); |
272 | if (strcmp(item_func->func_name(), "+" ) != 0 && |
273 | strcmp(item_func->func_name(), "-" ) != 0) |
274 | return false; |
275 | if (item_func->argument_count() != 1) |
276 | return false; |
277 | Item** arguments = item_func->arguments(); |
278 | if (arguments[0]->type() != Item::INT_ITEM) |
279 | return false; |
280 | return true; |
281 | } else |
282 | return false; |
283 | } |
284 | |
285 | // check that an item is an insert value item with the same field name |
286 | static bool check_insert_value(Item* item, const char* field_name) { |
287 | if (item->type() != Item::INSERT_VALUE_ITEM) |
288 | return false; |
289 | Item_insert_value* value_item = static_cast<Item_insert_value*>(item); |
290 | if (value_item->arg->type() != Item::FIELD_ITEM) |
291 | return false; |
292 | Item_field* arg = static_cast<Item_field*>(value_item->arg); |
293 | if (strcmp(field_name, arg->field_name.str) != 0) |
294 | return false; |
295 | return true; |
296 | } |
297 | |
298 | // Return true if an expression looks like field_name op constant. |
299 | static bool check_x_op_constant( |
300 | const char* field_name, |
301 | Item* item, |
302 | const char* op, |
303 | Item** item_constant, |
304 | bool allow_insert_value) { |
305 | |
306 | if (item->type() != Item::FUNC_ITEM) |
307 | return false; |
308 | Item_func* item_func = static_cast<Item_func*>(item); |
309 | if (strcmp(item_func->func_name(), op) != 0) |
310 | return false; |
311 | Item** arguments = item_func->arguments(); |
312 | uint n = item_func->argument_count(); |
313 | if (n != 2) |
314 | return false; |
315 | if (arguments[0]->type() != Item::FIELD_ITEM) |
316 | return false; |
317 | Item_field* arg0 = static_cast<Item_field*>(arguments[0]); |
318 | if (strcmp(field_name, arg0->field_name.str) != 0) |
319 | return false; |
320 | if (!check_int_result(arguments[1])) |
321 | if (!(allow_insert_value && |
322 | check_insert_value(arguments[1], field_name))) |
323 | return false; |
324 | *item_constant = arguments[1]; |
325 | return true; |
326 | } |
327 | |
328 | // Return true if an expression looks like field_name = constant |
329 | static bool check_x_equal_0(const char *field_name, Item *item) { |
330 | Item* item_constant; |
331 | if (!check_x_op_constant(field_name, item, "=" , &item_constant, false)) |
332 | return false; |
333 | if (item_constant->type() != Item::INT_ITEM || |
334 | item_constant->val_int() != 0) |
335 | return false; |
336 | return true; |
337 | } |
338 | |
339 | // Return true if an expression looks like fieldname - 1 |
340 | static bool check_x_minus_1(const char* field_name, Item* item) { |
341 | Item* item_constant; |
342 | if (!check_x_op_constant(field_name, item, "-" , &item_constant, false)) |
343 | return false; |
344 | if (item_constant->type() != Item::INT_ITEM || |
345 | item_constant->val_int() != 1) |
346 | return false; |
347 | return true; |
348 | } |
349 | |
350 | // Return true if an expression looks like if(fieldname=0, 0, fieldname-1) and |
351 | // the field named by fieldname is an unsigned int. |
352 | static bool check_decr_floor_expression(Field* lhs_field, Item* item) { |
353 | if (item->type() != Item::FUNC_ITEM) |
354 | return false; |
355 | Item_func* item_func = static_cast<Item_func*>(item); |
356 | if (strcmp(item_func->func_name(), "if" ) != 0) |
357 | return false; |
358 | Item** arguments = item_func->arguments(); |
359 | uint n = item_func->argument_count(); |
360 | if (n != 3) |
361 | return false; |
362 | if (!check_x_equal_0(lhs_field->field_name.str, arguments[0])) |
363 | return false; |
364 | if (arguments[1]->type() != Item::INT_ITEM || arguments[1]->val_int() != 0) |
365 | return false; |
366 | if (!check_x_minus_1(lhs_field->field_name.str, arguments[2])) |
367 | return false; |
368 | if (!(lhs_field->flags & UNSIGNED_FLAG)) |
369 | return false; |
370 | return true; |
371 | } |
372 | |
373 | // Check if lhs = rhs expression is simple. Return true if it is. |
374 | static bool check_update_expression( |
375 | Item* lhs_item, |
376 | Item* rhs_item, |
377 | TABLE* table, |
378 | bool allow_insert_value) { |
379 | |
380 | Field* lhs_field = find_field_by_name(table, lhs_item); |
381 | if (lhs_field == NULL) |
382 | return false; |
383 | if (!lhs_field->part_of_key.is_clear_all()) |
384 | return false; |
385 | enum_field_types lhs_type = lhs_field->type(); |
386 | Item::Type rhs_type = rhs_item->type(); |
387 | switch (lhs_type) { |
388 | case MYSQL_TYPE_TINY: |
389 | case MYSQL_TYPE_SHORT: |
390 | case MYSQL_TYPE_INT24: |
391 | case MYSQL_TYPE_LONG: |
392 | case MYSQL_TYPE_LONGLONG: |
393 | if (check_int_result(rhs_item)) |
394 | return true; |
395 | Item* item_constant; |
396 | if (check_x_op_constant( |
397 | lhs_field->field_name.str, |
398 | rhs_item, |
399 | "+" , |
400 | &item_constant, |
401 | allow_insert_value)) |
402 | return true; |
403 | if (check_x_op_constant( |
404 | lhs_field->field_name.str, |
405 | rhs_item, |
406 | "-" , |
407 | &item_constant, |
408 | allow_insert_value)) |
409 | return true; |
410 | if (check_decr_floor_expression(lhs_field, rhs_item)) |
411 | return true; |
412 | break; |
413 | case MYSQL_TYPE_STRING: |
414 | if (rhs_type == Item::INT_ITEM || rhs_type == Item::STRING_ITEM) |
415 | return true; |
416 | break; |
417 | case MYSQL_TYPE_VARCHAR: |
418 | case MYSQL_TYPE_BLOB: |
419 | if (rhs_type == Item::STRING_ITEM) |
420 | return true; |
421 | break; |
422 | default: |
423 | break; |
424 | } |
425 | return false; |
426 | } |
427 | |
428 | // Check that all update expressions are simple. Return true if they are. |
429 | static bool check_all_update_expressions( |
430 | List<Item>& fields, |
431 | List<Item>& values, |
432 | TABLE* table, |
433 | bool allow_insert_value) { |
434 | |
435 | List_iterator<Item> lhs_i(fields); |
436 | List_iterator<Item> rhs_i(values); |
437 | while (1) { |
438 | Item* lhs_item = lhs_i++; |
439 | if (lhs_item == NULL) |
440 | break; |
441 | Item* rhs_item = rhs_i++; |
442 | assert_always(rhs_item != NULL); |
443 | if (!check_update_expression( |
444 | lhs_item, |
445 | rhs_item, |
446 | table, |
447 | allow_insert_value)) |
448 | return false; |
449 | } |
450 | return true; |
451 | } |
452 | |
453 | static bool full_field_in_key(TABLE* table, Field* field) { |
454 | assert_always(table->s->primary_key < table->s->keys); |
455 | KEY* key = &table->s->key_info[table->s->primary_key]; |
456 | for (uint i = 0; i < key->user_defined_key_parts; i++) { |
457 | KEY_PART_INFO* key_part = &key->key_part[i]; |
458 | if (strcmp(field->field_name.str, key_part->field->field_name.str) == 0) { |
459 | return key_part->length == field->field_length; |
460 | } |
461 | } |
462 | return false; |
463 | } |
464 | |
465 | // Check that an expression looks like fieldname = constant, fieldname is part |
466 | // of the primary key, and the named field is an int, char or varchar type. |
467 | // Return true if it does. |
468 | static bool check_pk_field_equal_constant( |
469 | Item* item, |
470 | TABLE* table, |
471 | MY_BITMAP* pk_fields) { |
472 | |
473 | if (item->type() != Item::FUNC_ITEM) |
474 | return false; |
475 | Item_func* func = static_cast<Item_func*>(item); |
476 | if (strcmp(func->func_name(), "=" ) != 0) |
477 | return false; |
478 | uint n = func->argument_count(); |
479 | if (n != 2) |
480 | return false; |
481 | Item** arguments = func->arguments(); |
482 | Field* field = find_field_by_name(table, arguments[0]); |
483 | if (field == NULL) |
484 | return false; |
485 | if (!bitmap_test_and_clear(pk_fields, field->field_index)) |
486 | return false; |
487 | switch (field->type()) { |
488 | case MYSQL_TYPE_TINY: |
489 | case MYSQL_TYPE_SHORT: |
490 | case MYSQL_TYPE_INT24: |
491 | case MYSQL_TYPE_LONG: |
492 | case MYSQL_TYPE_LONGLONG: |
493 | return arguments[1]->type() == Item::INT_ITEM || |
494 | arguments[1]->type() == Item::STRING_ITEM; |
495 | case MYSQL_TYPE_STRING: |
496 | case MYSQL_TYPE_VARCHAR: |
497 | return full_field_in_key(table, field) && |
498 | (arguments[1]->type() == Item::INT_ITEM || |
499 | arguments[1]->type() == Item::STRING_ITEM); |
500 | default: |
501 | return false; |
502 | } |
503 | } |
504 | |
505 | // Check that the where condition covers all of the primary key components |
506 | // with fieldname = constant expressions. Return true if it does. |
507 | static bool check_point_update(Item* conds, TABLE* table) { |
508 | bool result = false; |
509 | |
510 | if (conds == NULL) |
511 | return false; // no where condition on the update |
512 | |
513 | if (table->s->primary_key >= table->s->keys) |
514 | return false; // no primary key defined |
515 | |
516 | // use a bitmap of the primary key fields to keep track of those fields |
517 | // that are covered by the where conditions |
518 | MY_BITMAP pk_fields; |
519 | if (bitmap_init(&pk_fields, NULL, table->s->fields, FALSE)) // 1 -> failure |
520 | return false; |
521 | KEY *key = &table->s->key_info[table->s->primary_key]; |
522 | for (uint i = 0; i < key->user_defined_key_parts; i++) |
523 | bitmap_set_bit(&pk_fields, key->key_part[i].field->field_index); |
524 | |
525 | switch (conds->type()) { |
526 | case Item::FUNC_ITEM: |
527 | result = check_pk_field_equal_constant(conds, table, &pk_fields); |
528 | break; |
529 | case Item::COND_ITEM: { |
530 | Item_cond* cond_item = static_cast<Item_cond*>(conds); |
531 | if (strcmp(cond_item->func_name(), "and" ) != 0) |
532 | break; |
533 | List_iterator<Item> li(*cond_item->argument_list()); |
534 | Item* list_item; |
535 | result = true; |
536 | while (result == true && (list_item = li++)) { |
537 | result = check_pk_field_equal_constant( |
538 | list_item, |
539 | table, |
540 | &pk_fields); |
541 | } |
542 | break; |
543 | } |
544 | default: |
545 | break; |
546 | } |
547 | |
548 | if (!bitmap_is_clear_all(&pk_fields)) |
549 | result = false; |
550 | bitmap_free(&pk_fields); |
551 | return result; |
552 | } |
553 | |
554 | // Return true if there are any clustering keys (except the primary). |
555 | // Precompute this when the table is opened. |
556 | static bool clustering_keys_exist(TABLE *table) { |
557 | for (uint keynr = 0; keynr < table->s->keys; keynr++) { |
558 | if (keynr != table->s->primary_key && |
559 | key_is_clustering(&table->s->key_info[keynr])) |
560 | return true; |
561 | } |
562 | return false; |
563 | } |
564 | |
565 | static bool is_strict_mode(THD* thd) { |
566 | #if 50600 <= MYSQL_VERSION_ID && MYSQL_VERSION_ID <= 50699 |
567 | return thd->is_strict_mode(); |
568 | #else |
569 | return tokudb_test(thd->variables.sql_mode & (MODE_STRICT_TRANS_TABLES | MODE_STRICT_ALL_TABLES)); |
570 | #endif |
571 | } |
572 | |
573 | // Check if an update operation can be handled by this storage engine. |
574 | // Return true if it can. |
575 | bool ha_tokudb::check_fast_update( |
576 | THD* thd, |
577 | List<Item>& fields, |
578 | List<Item>& values, |
579 | Item* conds) { |
580 | |
581 | if (!transaction) |
582 | return false; |
583 | |
584 | // avoid strict mode arithmetic overflow issues |
585 | if (is_strict_mode(thd)) |
586 | return false; |
587 | |
588 | // no triggers |
589 | if (table->triggers) |
590 | return false; |
591 | |
592 | // no binlog |
593 | if (mysql_bin_log.is_open() && |
594 | !(thd->variables.binlog_format == BINLOG_FORMAT_STMT || |
595 | thd->variables.binlog_format == BINLOG_FORMAT_MIXED)) |
596 | return false; |
597 | |
598 | // no clustering keys (need to broadcast an increment into the clustering |
599 | // keys since we are selecting with the primary key) |
600 | if (clustering_keys_exist(table)) |
601 | return false; |
602 | |
603 | if (!check_all_update_expressions(fields, values, table, false)) |
604 | return false; |
605 | |
606 | if (!check_point_update(conds, table)) |
607 | return false; |
608 | |
609 | return true; |
610 | } |
611 | |
612 | static void marshall_varchar_descriptor( |
613 | tokudb::buffer& b, |
614 | TABLE* table, |
615 | KEY_AND_COL_INFO* kc_info, |
616 | uint key_num) { |
617 | |
618 | b.append_ui<uint32_t>('v'); |
619 | b.append_ui<uint32_t>( |
620 | table->s->null_bytes + kc_info->mcp_info[key_num].fixed_field_size); |
621 | uint32_t var_offset_bytes = kc_info->mcp_info[key_num].len_of_offsets; |
622 | b.append_ui<uint32_t>(var_offset_bytes); |
623 | b.append_ui<uint32_t>( |
624 | var_offset_bytes == 0 ? 0 : kc_info->num_offset_bytes); |
625 | } |
626 | |
627 | static void marshall_blobs_descriptor( |
628 | tokudb::buffer& b, |
629 | TABLE* table, |
630 | KEY_AND_COL_INFO* kc_info) { |
631 | |
632 | b.append_ui<uint32_t>('b'); |
633 | uint32_t n = kc_info->num_blobs; |
634 | b.append_ui<uint32_t>(n); |
635 | for (uint i = 0; i < n; i++) { |
636 | uint blob_field_index = kc_info->blob_fields[i]; |
637 | assert_always(blob_field_index < table->s->fields); |
638 | uint8_t blob_field_length = |
639 | table->s->field[blob_field_index]->row_pack_length(); |
640 | b.append(&blob_field_length, sizeof blob_field_length); |
641 | } |
642 | } |
643 | |
644 | static inline uint32_t get_null_bit_position(uint32_t null_bit); |
645 | |
646 | // evaluate the int value of an item |
647 | static longlong item_val_int(Item* item) { |
648 | Item::Type t = item->type(); |
649 | if (t == Item::INSERT_VALUE_ITEM) { |
650 | Item_insert_value* value_item = static_cast<Item_insert_value*>(item); |
651 | return value_item->arg->val_int(); |
652 | } else |
653 | return item->val_int(); |
654 | } |
655 | |
656 | // Marshall update operations to a buffer. |
657 | static void marshall_update( |
658 | tokudb::buffer& b, |
659 | Item* lhs_item, |
660 | Item* rhs_item, |
661 | TABLE* table, |
662 | TOKUDB_SHARE* share) { |
663 | |
664 | // figure out the update operation type (again) |
665 | Field* lhs_field = find_field_by_name(table, lhs_item); |
666 | assert_always(lhs_field); // we found it before, so this should work |
667 | |
668 | // compute the update info |
669 | uint32_t field_type; |
670 | uint32_t field_null_num = 0; |
671 | if (lhs_field->real_maybe_null()) { |
672 | uint32_t field_num = lhs_field->field_index; |
673 | field_null_num = |
674 | ((field_num/8)*8 + get_null_bit_position(lhs_field->null_bit)) + 1; |
675 | } |
676 | uint32_t offset; |
677 | void* v_ptr = NULL; |
678 | uint32_t v_length; |
679 | uint32_t update_operation; |
680 | longlong v_ll; |
681 | String v_str; |
682 | |
683 | switch (lhs_field->type()) { |
684 | case MYSQL_TYPE_TINY: |
685 | case MYSQL_TYPE_SHORT: |
686 | case MYSQL_TYPE_INT24: |
687 | case MYSQL_TYPE_LONG: |
688 | case MYSQL_TYPE_LONGLONG: { |
689 | Field_num* lhs_num = static_cast<Field_num*>(lhs_field); |
690 | field_type = lhs_num->unsigned_flag ? UPDATE_TYPE_UINT : UPDATE_TYPE_INT; |
691 | offset = |
692 | fixed_field_offset( |
693 | table->s->null_bytes, |
694 | &share->kc_info, |
695 | table->s->primary_key, |
696 | lhs_field->field_index); |
697 | switch (rhs_item->type()) { |
698 | case Item::INT_ITEM: { |
699 | update_operation = '='; |
700 | v_ll = rhs_item->val_int(); |
701 | v_length = lhs_field->pack_length(); |
702 | v_ptr = &v_ll; |
703 | break; |
704 | } |
705 | case Item::FUNC_ITEM: { |
706 | Item_func* rhs_func = static_cast<Item_func*>(rhs_item); |
707 | Item** arguments = rhs_func->arguments(); |
708 | // we only support one if function for now, and it is a |
709 | // decrement with floor. |
710 | if (strcmp(rhs_func->func_name(), "if" ) == 0) { |
711 | update_operation = '-'; |
712 | v_ll = 1; |
713 | } else if (rhs_func->argument_count() == 1) { |
714 | update_operation = '='; |
715 | v_ll = rhs_func->val_int(); |
716 | } else { |
717 | update_operation = rhs_func->func_name()[0]; |
718 | v_ll = item_val_int(arguments[1]); |
719 | } |
720 | v_length = lhs_field->pack_length(); |
721 | v_ptr = &v_ll; |
722 | break; |
723 | } |
724 | default: |
725 | assert_unreachable(); |
726 | } |
727 | break; |
728 | } |
729 | case MYSQL_TYPE_STRING: { |
730 | update_operation = '='; |
731 | field_type = |
732 | lhs_field->binary() ? UPDATE_TYPE_BINARY : UPDATE_TYPE_CHAR; |
733 | offset = |
734 | fixed_field_offset( |
735 | table->s->null_bytes, |
736 | &share->kc_info, |
737 | table->s->primary_key, |
738 | lhs_field->field_index); |
739 | v_str = *rhs_item->val_str(&v_str); |
740 | v_length = v_str.length(); |
741 | if (v_length >= lhs_field->pack_length()) { |
742 | v_length = lhs_field->pack_length(); |
743 | v_str.length(v_length); // truncate |
744 | } else { |
745 | v_length = lhs_field->pack_length(); |
746 | uchar pad_char = |
747 | lhs_field->binary() ? 0 : lhs_field->charset()->pad_char; |
748 | v_str.fill(lhs_field->pack_length(), pad_char); // pad |
749 | } |
750 | v_ptr = v_str.c_ptr(); |
751 | break; |
752 | } |
753 | case MYSQL_TYPE_VARCHAR: { |
754 | update_operation = '='; |
755 | field_type = |
756 | lhs_field->binary() ? UPDATE_TYPE_VARBINARY : UPDATE_TYPE_VARCHAR; |
757 | offset = |
758 | var_field_index( |
759 | table, |
760 | &share->kc_info, |
761 | table->s->primary_key, |
762 | lhs_field->field_index); |
763 | v_str = *rhs_item->val_str(&v_str); |
764 | v_length = v_str.length(); |
765 | if (v_length >= lhs_field->row_pack_length()) { |
766 | v_length = lhs_field->row_pack_length(); |
767 | v_str.length(v_length); // truncate |
768 | } |
769 | v_ptr = v_str.c_ptr(); |
770 | break; |
771 | } |
772 | case MYSQL_TYPE_BLOB: { |
773 | update_operation = '='; |
774 | field_type = lhs_field->binary() ? UPDATE_TYPE_BLOB : UPDATE_TYPE_TEXT; |
775 | offset = |
776 | blob_field_index( |
777 | table, |
778 | &share->kc_info, |
779 | table->s->primary_key, |
780 | lhs_field->field_index); |
781 | v_str = *rhs_item->val_str(&v_str); |
782 | v_length = v_str.length(); |
783 | if (v_length >= lhs_field->max_data_length()) { |
784 | v_length = lhs_field->max_data_length(); |
785 | v_str.length(v_length); // truncate |
786 | } |
787 | v_ptr = v_str.c_ptr(); |
788 | break; |
789 | } |
790 | default: |
791 | assert_unreachable(); |
792 | } |
793 | |
794 | // marshall the update fields into the buffer |
795 | b.append_ui<uint32_t>(update_operation); |
796 | b.append_ui<uint32_t>(field_type); |
797 | b.append_ui<uint32_t>(field_null_num); |
798 | b.append_ui<uint32_t>(offset); |
799 | b.append_ui<uint32_t>(v_length); |
800 | b.append(v_ptr, v_length); |
801 | } |
802 | |
803 | // Save an item's value into the appropriate field. Return 0 if successful. |
804 | static int save_in_field(Item* item, TABLE* table) { |
805 | assert_always(item->type() == Item::FUNC_ITEM); |
806 | Item_func *func = static_cast<Item_func*>(item); |
807 | assert_always(strcmp(func->func_name(), "=" ) == 0); |
808 | uint n = func->argument_count(); |
809 | assert_always(n == 2); |
810 | Item **arguments = func->arguments(); |
811 | assert_always(arguments[0]->type() == Item::FIELD_ITEM); |
812 | Item_field *field_item = static_cast<Item_field*>(arguments[0]); |
813 | my_bitmap_map *old_map = dbug_tmp_use_all_columns(table, table->write_set); |
814 | int error = arguments[1]->save_in_field(field_item->field, 0); |
815 | dbug_tmp_restore_column_map(table->write_set, old_map); |
816 | return error; |
817 | } |
818 | |
819 | static void count_update_types( |
820 | Field* lhs_field, |
821 | uint* num_varchars, |
822 | uint* num_blobs) { |
823 | |
824 | switch (lhs_field->type()) { |
825 | case MYSQL_TYPE_VARCHAR: |
826 | *num_varchars += 1; |
827 | break; |
828 | case MYSQL_TYPE_BLOB: |
829 | *num_blobs += 1; |
830 | break; |
831 | default: |
832 | break; |
833 | } |
834 | } |
835 | |
836 | // Generate an update message for an update operation and send it into the |
837 | // primary tree. Return 0 if successful. |
838 | int ha_tokudb::send_update_message( |
839 | List<Item>& update_fields, |
840 | List<Item>& update_values, |
841 | Item* conds, |
842 | DB_TXN* txn) { |
843 | |
844 | int error; |
845 | |
846 | // Save the primary key from the where conditions |
847 | Item::Type t = conds->type(); |
848 | if (t == Item::FUNC_ITEM) { |
849 | error = save_in_field(conds, table); |
850 | } else if (t == Item::COND_ITEM) { |
851 | Item_cond* cond_item = static_cast<Item_cond*>(conds); |
852 | List_iterator<Item> li(*cond_item->argument_list()); |
853 | Item* list_item; |
854 | error = 0; |
855 | while (error == 0 && (list_item = li++)) { |
856 | error = save_in_field(list_item, table); |
857 | } |
858 | } else { |
859 | assert_unreachable(); |
860 | } |
861 | if (error) |
862 | return error; |
863 | |
864 | // put the primary key into key_buff and wrap it with key_dbt |
865 | DBT key_dbt; |
866 | bool has_null; |
867 | create_dbt_key_from_table( |
868 | &key_dbt, |
869 | primary_key, |
870 | key_buff, |
871 | table->record[0], |
872 | &has_null); |
873 | |
874 | // construct the update message |
875 | tokudb::buffer update_message; |
876 | |
877 | uint8_t op = UPDATE_OP_UPDATE_2; |
878 | update_message.append(&op, sizeof op); |
879 | |
880 | uint32_t num_updates = update_fields.elements; |
881 | uint num_varchars = 0, num_blobs = 0; |
882 | if (1) { |
883 | List_iterator<Item> lhs_i(update_fields); |
884 | Item* lhs_item; |
885 | while ((lhs_item = lhs_i++)) { |
886 | if (lhs_item == NULL) |
887 | break; |
888 | Field* lhs_field = find_field_by_name(table, lhs_item); |
889 | assert_always(lhs_field); // we found it before, so this should work |
890 | count_update_types(lhs_field, &num_varchars, &num_blobs); |
891 | } |
892 | if (num_varchars > 0 || num_blobs > 0) |
893 | num_updates++; |
894 | if (num_blobs > 0) |
895 | num_updates++; |
896 | } |
897 | |
898 | // append the updates |
899 | update_message.append_ui<uint32_t>(num_updates); |
900 | |
901 | if (num_varchars > 0 || num_blobs > 0) |
902 | marshall_varchar_descriptor( |
903 | update_message, |
904 | table, |
905 | &share->kc_info, |
906 | table->s->primary_key); |
907 | if (num_blobs > 0) |
908 | marshall_blobs_descriptor(update_message, table, &share->kc_info); |
909 | |
910 | List_iterator<Item> lhs_i(update_fields); |
911 | List_iterator<Item> rhs_i(update_values); |
912 | while (error == 0) { |
913 | Item* lhs_item = lhs_i++; |
914 | if (lhs_item == NULL) |
915 | break; |
916 | Item* rhs_item = rhs_i++; |
917 | assert_always(rhs_item != NULL); |
918 | marshall_update(update_message, lhs_item, rhs_item, table, share); |
919 | } |
920 | |
921 | rwlock_t_lock_read(share->_num_DBs_lock); |
922 | |
923 | // hot index in progress |
924 | if (share->num_DBs > table->s->keys + tokudb_test(hidden_primary_key)) { |
925 | error = ENOTSUP; // run on the slow path |
926 | } else { |
927 | // send the update message |
928 | DBT update_dbt; memset(&update_dbt, 0, sizeof update_dbt); |
929 | update_dbt.data = update_message.data(); |
930 | update_dbt.size = update_message.size(); |
931 | error = |
932 | share->key_file[primary_key]->update( |
933 | share->key_file[primary_key], |
934 | txn, |
935 | &key_dbt, |
936 | &update_dbt, |
937 | 0); |
938 | } |
939 | |
940 | share->_num_DBs_lock.unlock(); |
941 | |
942 | return error; |
943 | } |
944 | |
945 | // Determine if an upsert operation can be offloaded to the storage engine. |
946 | // An upsert consists of a row and a list of update expressions |
947 | // (update_fields[i] = update_values[i]). |
948 | // The function returns 0 if the upsert is handled in the storage engine. |
949 | // Otherwise, an error code is returned. |
950 | int ha_tokudb::upsert( |
951 | THD* thd, |
952 | List<Item>& update_fields, |
953 | List<Item>& update_values) { |
954 | |
955 | TOKUDB_HANDLER_DBUG_ENTER("" ); |
956 | |
957 | int error = 0; |
958 | |
959 | if (TOKUDB_UNLIKELY(TOKUDB_DEBUG_FLAGS(TOKUDB_DEBUG_UPSERT))) { |
960 | fprintf(stderr, "upsert\n" ); |
961 | dump_item_list("update_fields" , update_fields); |
962 | dump_item_list("update_values" , update_values); |
963 | } |
964 | |
965 | // not an upsert or something is fishy with the parameters |
966 | if (update_fields.elements < 1 || |
967 | update_fields.elements != update_values.elements) { |
968 | error = ENOTSUP; |
969 | goto return_error; |
970 | } |
971 | |
972 | if (!check_upsert(thd, update_fields, update_values)) { |
973 | error = ENOTSUP; |
974 | goto check_error; |
975 | } |
976 | |
977 | error = send_upsert_message(thd, update_fields, update_values, transaction); |
978 | if (error != 0) { |
979 | goto check_error; |
980 | } |
981 | |
982 | check_error: |
983 | if (error != 0) { |
984 | if (tokudb::sysvars::disable_slow_upsert(thd) != 0) |
985 | error = HA_ERR_UNSUPPORTED; |
986 | if (error != ENOTSUP) |
987 | print_error(error, MYF(0)); |
988 | } |
989 | |
990 | return_error: |
991 | TOKUDB_HANDLER_DBUG_RETURN(error); |
992 | } |
993 | |
994 | // Check if an upsert can be handled by this storage engine. |
995 | // Return true if it can. |
996 | bool ha_tokudb::check_upsert( |
997 | THD* thd, |
998 | List<Item>& update_fields, |
999 | List<Item>& update_values) { |
1000 | |
1001 | if (!transaction) |
1002 | return false; |
1003 | |
1004 | // avoid strict mode arithmetic overflow issues |
1005 | if (is_strict_mode(thd)) |
1006 | return false; |
1007 | |
1008 | // no triggers |
1009 | if (table->triggers) |
1010 | return false; |
1011 | |
1012 | // primary key must exist |
1013 | if (table->s->primary_key >= table->s->keys) |
1014 | return false; |
1015 | |
1016 | // no secondary keys |
1017 | if (table->s->keys > 1) |
1018 | return false; |
1019 | |
1020 | // no binlog |
1021 | if (mysql_bin_log.is_open() && |
1022 | !(thd->variables.binlog_format == BINLOG_FORMAT_STMT || |
1023 | thd->variables.binlog_format == BINLOG_FORMAT_MIXED)) |
1024 | return false; |
1025 | |
1026 | if (!check_all_update_expressions( |
1027 | update_fields, |
1028 | update_values, |
1029 | table, |
1030 | true)) |
1031 | return false; |
1032 | |
1033 | return true; |
1034 | } |
1035 | |
1036 | // Generate an upsert message and send it into the primary tree. |
1037 | // Return 0 if successful. |
1038 | int ha_tokudb::send_upsert_message( |
1039 | THD* thd, |
1040 | List<Item>& update_fields, |
1041 | List<Item>& update_values, |
1042 | DB_TXN* txn) { |
1043 | int error = 0; |
1044 | |
1045 | // generate primary key |
1046 | DBT key_dbt; |
1047 | bool has_null; |
1048 | create_dbt_key_from_table( |
1049 | &key_dbt, |
1050 | primary_key, |
1051 | primary_key_buff, |
1052 | table->record[0], |
1053 | &has_null); |
1054 | |
1055 | // generate packed row |
1056 | DBT row; |
1057 | error = pack_row(&row, (const uchar *) table->record[0], primary_key); |
1058 | if (error) |
1059 | return error; |
1060 | |
1061 | tokudb::buffer update_message; |
1062 | |
1063 | // append the operation |
1064 | uint8_t op = UPDATE_OP_UPSERT_2; |
1065 | update_message.append(&op, sizeof op); |
1066 | |
1067 | // append the row |
1068 | update_message.append_ui<uint32_t>(row.size); |
1069 | update_message.append(row.data, row.size); |
1070 | |
1071 | uint32_t num_updates = update_fields.elements; |
1072 | uint num_varchars = 0, num_blobs = 0; |
1073 | if (1) { |
1074 | List_iterator<Item> lhs_i(update_fields); |
1075 | Item* lhs_item; |
1076 | while ((lhs_item = lhs_i++)) { |
1077 | if (lhs_item == NULL) |
1078 | break; |
1079 | Field* lhs_field = find_field_by_name(table, lhs_item); |
1080 | assert_always(lhs_field); // we found it before, so this should work |
1081 | count_update_types(lhs_field, &num_varchars, &num_blobs); |
1082 | } |
1083 | if (num_varchars > 0 || num_blobs > 0) |
1084 | num_updates++; |
1085 | if (num_blobs > 0) |
1086 | num_updates++; |
1087 | } |
1088 | |
1089 | // append the updates |
1090 | update_message.append_ui<uint32_t>(num_updates); |
1091 | |
1092 | if (num_varchars > 0 || num_blobs > 0) |
1093 | marshall_varchar_descriptor( |
1094 | update_message, |
1095 | table, &share->kc_info, |
1096 | table->s->primary_key); |
1097 | if (num_blobs > 0) |
1098 | marshall_blobs_descriptor(update_message, table, &share->kc_info); |
1099 | |
1100 | List_iterator<Item> lhs_i(update_fields); |
1101 | List_iterator<Item> rhs_i(update_values); |
1102 | while (1) { |
1103 | Item* lhs_item = lhs_i++; |
1104 | if (lhs_item == NULL) |
1105 | break; |
1106 | Item* rhs_item = rhs_i++; |
1107 | assert_always(rhs_item != NULL); |
1108 | marshall_update(update_message, lhs_item, rhs_item, table, share); |
1109 | } |
1110 | |
1111 | rwlock_t_lock_read(share->_num_DBs_lock); |
1112 | |
1113 | // hot index in progress |
1114 | if (share->num_DBs > table->s->keys + tokudb_test(hidden_primary_key)) { |
1115 | error = ENOTSUP; // run on the slow path |
1116 | } else { |
1117 | // send the upsert message |
1118 | DBT update_dbt; memset(&update_dbt, 0, sizeof update_dbt); |
1119 | update_dbt.data = update_message.data(); |
1120 | update_dbt.size = update_message.size(); |
1121 | error = |
1122 | share->key_file[primary_key]->update( |
1123 | share->key_file[primary_key], |
1124 | txn, |
1125 | &key_dbt, |
1126 | &update_dbt, |
1127 | 0); |
1128 | } |
1129 | |
1130 | share->_num_DBs_lock.unlock(); |
1131 | |
1132 | return error; |
1133 | } |
1134 | |
1135 | #endif |
1136 | |