1/* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2// vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
3#ident "$Id$"
4/*======
5This file is part of TokuDB
6
7
8Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved.
9
    TokuDB is free software: you can redistribute it and/or modify
11 it under the terms of the GNU General Public License, version 2,
12 as published by the Free Software Foundation.
13
14 TokuDB is distributed in the hope that it will be useful,
15 but WITHOUT ANY WARRANTY; without even the implied warranty of
16 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 GNU General Public License for more details.
18
19 You should have received a copy of the GNU General Public License
20 along with TokuDB. If not, see <http://www.gnu.org/licenses/>.
21
22======= */
23
24#ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved."
25
26#if defined(TOKU_INCLUDE_UPSERT)
27
28// Point updates and upserts
29
30// Restrictions:
31// No triggers
32// Statement or mixed replication
33// Primary key must be defined
34// Simple and compound primary key
35// Int, char and varchar primary key types
36// No updates on fields that are part of any key
37// No clustering keys
38// Integer and char field updates
39// Update expressions:
40// x = constant
41// x = x + constant
42// x = x - constant
43// x = if (x=0,0,x-1)
44// x = x + values(x)
45// Session variable disables slow updates and slow upserts
46
47// Future features:
48// Support more primary key types
49// Force statement logging for fast updates
50// Support clustering keys using broadcast updates
51// Support primary key ranges using multicast messages
52// Support more complicated update expressions
53// Replace field_offset
54
55// Debug function to dump an Item
56static void dump_item(Item* item) {
57 fprintf(stderr, "%u", item->type());
58 switch (item->type()) {
59 case Item::FUNC_ITEM: {
60 Item_func* func = static_cast<Item_func*>(item);
61 uint n = func->argument_count();
62 Item** arguments = func->arguments();
63 fprintf(
64 stderr,
65 ":func=%u,%s,%u(",
66 func->functype(),
67 func->func_name(),
68 n);
69 for (uint i = 0; i < n ; i++) {
70 dump_item(arguments[i]);
71 if (i < n-1)
72 fprintf(stderr,",");
73 }
74 fprintf(stderr, ")");
75 break;
76 }
77 case Item::INT_ITEM: {
78 Item_int* int_item = static_cast<Item_int*>(item);
79 fprintf(stderr, ":int=%lld", int_item->val_int());
80 break;
81 }
82 case Item::STRING_ITEM: {
83 Item_string* str_item = static_cast<Item_string*>(item);
84 fprintf(stderr, ":str=%s", str_item->val_str(NULL)->c_ptr());
85 break;
86 }
87 case Item::FIELD_ITEM: {
88 Item_field* field_item = static_cast<Item_field*>(item);
89 fprintf(
90 stderr,
91 ":field=%s.%s.%s",
92 field_item->db_name,
93 field_item->table_name,
94 field_item->field_name.str);
95 break;
96 }
97 case Item::COND_ITEM: {
98 Item_cond* cond_item = static_cast<Item_cond*>(item);
99 fprintf(stderr, ":cond=%s(\n", cond_item->func_name());
100 List_iterator<Item> li(*cond_item->argument_list());
101 Item* list_item;
102 while ((list_item = li++)) {
103 dump_item(list_item);
104 fprintf(stderr, "\n");
105 }
106 fprintf(stderr, ")\n");
107 break;
108 }
109 case Item::INSERT_VALUE_ITEM: {
110 Item_insert_value* value_item = static_cast<Item_insert_value*>(item);
111 fprintf(stderr, ":insert_value");
112 dump_item(value_item->arg);
113 break;
114 }
115 default:
116 fprintf(stderr, ":unsupported\n");
117 break;
118 }
119}
120
121// Debug function to dump an Item list
122static void dump_item_list(const char* h, List<Item> &l) {
123 fprintf(stderr, "%s elements=%u\n", h, l.elements);
124 List_iterator<Item> li(l);
125 Item* item;
126 while ((item = li++) != NULL) {
127 dump_item(item);
128 fprintf(stderr, "\n");
129 }
130}
131
132// Find a Field by its Item name
133static Field* find_field_by_name(TABLE* table, Item* item) {
134 if (item->type() != Item::FIELD_ITEM)
135 return NULL;
136 Item_field* field_item = static_cast<Item_field*>(item);
137#if 0
138 if (strcmp(table->s->db.str, field_item->db_name) != 0 ||
139 strcmp(table->s->table_name.str, field_item->table_name) != 0)
140 return NULL;
141 Field *found_field = NULL;
142 for (uint i = 0; i < table->s->fields; i++) {
143 Field *test_field = table->s->field[i];
144 if (strcmp(field_item->field_name.str, test_field->field_name.str) == 0) {
145 found_field = test_field;
146 break;
147 }
148 }
149 return found_field;
150#else
151 // item->field may be a shortcut instead of the above table lookup
152 return field_item->field;
153#endif
154}
155
156// Return the starting offset in the value for a particular index (selected by idx) of a
157// particular field (selected by expand_field_num).
158// This only works for fixed length fields
159static uint32_t fixed_field_offset(
160 uint32_t null_bytes,
161 KEY_AND_COL_INFO* kc_info,
162 uint idx,
163 uint expand_field_num) {
164
165 uint32_t offset = null_bytes;
166 for (uint i = 0; i < expand_field_num; i++) {
167 if (bitmap_is_set(&kc_info->key_filters[idx], i))
168 continue;
169 offset += kc_info->field_lengths[i];
170 }
171 return offset;
172}
173
174static uint32_t var_field_index(
175 TABLE* table,
176 KEY_AND_COL_INFO* kc_info,
177 uint idx,
178 uint field_num) {
179
180 assert_always(field_num < table->s->fields);
181 uint v_index = 0;
182 for (uint i = 0; i < table->s->fields; i++) {
183 if (bitmap_is_set(&kc_info->key_filters[idx], i))
184 continue;
185 if (kc_info->length_bytes[i]) {
186 if (i == field_num)
187 break;
188 v_index++;
189 }
190 }
191 return v_index;
192}
193
194static uint32_t blob_field_index(
195 TABLE* table,
196 KEY_AND_COL_INFO* kc_info,
197 uint idx,
198 uint field_num) {
199
200 assert_always(field_num < table->s->fields);
201 uint b_index;
202 for (b_index = 0; b_index < kc_info->num_blobs; b_index++) {
203 if (kc_info->blob_fields[b_index] == field_num)
204 break;
205 }
206 assert_always(b_index < kc_info->num_blobs);
207 return b_index;
208}
209
210// Determine if an update operation can be offloaded to the storage engine.
211// The update operation consists of a list of update expressions
212// (fields[i] = values[i]), and a list of where conditions (conds).
213// The function returns 0 if the update is handled in the storage engine.
214// Otherwise, an error is returned.
215int ha_tokudb::fast_update(
216 THD* thd,
217 List<Item>& update_fields,
218 List<Item>& update_values,
219 Item* conds) {
220
221 TOKUDB_HANDLER_DBUG_ENTER("");
222 int error = 0;
223
224 if (TOKUDB_UNLIKELY(TOKUDB_DEBUG_FLAGS(TOKUDB_DEBUG_UPSERT))) {
225 dump_item_list("fields", update_fields);
226 dump_item_list("values", update_values);
227 if (conds) {
228 fprintf(stderr, "conds\n"); dump_item(conds); fprintf(stderr, "\n");
229 }
230 }
231
232 if (update_fields.elements < 1 ||
233 update_fields.elements != update_values.elements) {
234 error = ENOTSUP; // something is fishy with the parameters
235 goto return_error;
236 }
237
238 if (!check_fast_update(thd, update_fields, update_values, conds)) {
239 error = ENOTSUP;
240 goto check_error;
241 }
242
243 error = send_update_message(
244 update_fields,
245 update_values,
246 conds,
247 transaction);
248 if (error != 0) {
249 goto check_error;
250 }
251
252check_error:
253 if (error != 0) {
254 if (tokudb::sysvars::disable_slow_update(thd) != 0)
255 error = HA_ERR_UNSUPPORTED;
256 if (error != ENOTSUP)
257 print_error(error, MYF(0));
258 }
259
260return_error:
261 TOKUDB_HANDLER_DBUG_RETURN(error);
262}
263
264// Return true if an expression is a simple int expression or a simple function
265// of +- int expression.
266static bool check_int_result(Item* item) {
267 Item::Type t = item->type();
268 if (t == Item::INT_ITEM) {
269 return true;
270 } else if (t == Item::FUNC_ITEM) {
271 Item_func* item_func = static_cast<Item_func*>(item);
272 if (strcmp(item_func->func_name(), "+") != 0 &&
273 strcmp(item_func->func_name(), "-") != 0)
274 return false;
275 if (item_func->argument_count() != 1)
276 return false;
277 Item** arguments = item_func->arguments();
278 if (arguments[0]->type() != Item::INT_ITEM)
279 return false;
280 return true;
281 } else
282 return false;
283}
284
285// check that an item is an insert value item with the same field name
286static bool check_insert_value(Item* item, const char* field_name) {
287 if (item->type() != Item::INSERT_VALUE_ITEM)
288 return false;
289 Item_insert_value* value_item = static_cast<Item_insert_value*>(item);
290 if (value_item->arg->type() != Item::FIELD_ITEM)
291 return false;
292 Item_field* arg = static_cast<Item_field*>(value_item->arg);
293 if (strcmp(field_name, arg->field_name.str) != 0)
294 return false;
295 return true;
296}
297
298// Return true if an expression looks like field_name op constant.
299static bool check_x_op_constant(
300 const char* field_name,
301 Item* item,
302 const char* op,
303 Item** item_constant,
304 bool allow_insert_value) {
305
306 if (item->type() != Item::FUNC_ITEM)
307 return false;
308 Item_func* item_func = static_cast<Item_func*>(item);
309 if (strcmp(item_func->func_name(), op) != 0)
310 return false;
311 Item** arguments = item_func->arguments();
312 uint n = item_func->argument_count();
313 if (n != 2)
314 return false;
315 if (arguments[0]->type() != Item::FIELD_ITEM)
316 return false;
317 Item_field* arg0 = static_cast<Item_field*>(arguments[0]);
318 if (strcmp(field_name, arg0->field_name.str) != 0)
319 return false;
320 if (!check_int_result(arguments[1]))
321 if (!(allow_insert_value &&
322 check_insert_value(arguments[1], field_name)))
323 return false;
324 *item_constant = arguments[1];
325 return true;
326}
327
328// Return true if an expression looks like field_name = constant
329static bool check_x_equal_0(const char *field_name, Item *item) {
330 Item* item_constant;
331 if (!check_x_op_constant(field_name, item, "=", &item_constant, false))
332 return false;
333 if (item_constant->type() != Item::INT_ITEM ||
334 item_constant->val_int() != 0)
335 return false;
336 return true;
337}
338
339// Return true if an expression looks like fieldname - 1
340static bool check_x_minus_1(const char* field_name, Item* item) {
341 Item* item_constant;
342 if (!check_x_op_constant(field_name, item, "-", &item_constant, false))
343 return false;
344 if (item_constant->type() != Item::INT_ITEM ||
345 item_constant->val_int() != 1)
346 return false;
347 return true;
348}
349
350// Return true if an expression looks like if(fieldname=0, 0, fieldname-1) and
351// the field named by fieldname is an unsigned int.
352static bool check_decr_floor_expression(Field* lhs_field, Item* item) {
353 if (item->type() != Item::FUNC_ITEM)
354 return false;
355 Item_func* item_func = static_cast<Item_func*>(item);
356 if (strcmp(item_func->func_name(), "if") != 0)
357 return false;
358 Item** arguments = item_func->arguments();
359 uint n = item_func->argument_count();
360 if (n != 3)
361 return false;
362 if (!check_x_equal_0(lhs_field->field_name.str, arguments[0]))
363 return false;
364 if (arguments[1]->type() != Item::INT_ITEM || arguments[1]->val_int() != 0)
365 return false;
366 if (!check_x_minus_1(lhs_field->field_name.str, arguments[2]))
367 return false;
368 if (!(lhs_field->flags & UNSIGNED_FLAG))
369 return false;
370 return true;
371}
372
373// Check if lhs = rhs expression is simple. Return true if it is.
374static bool check_update_expression(
375 Item* lhs_item,
376 Item* rhs_item,
377 TABLE* table,
378 bool allow_insert_value) {
379
380 Field* lhs_field = find_field_by_name(table, lhs_item);
381 if (lhs_field == NULL)
382 return false;
383 if (!lhs_field->part_of_key.is_clear_all())
384 return false;
385 enum_field_types lhs_type = lhs_field->type();
386 Item::Type rhs_type = rhs_item->type();
387 switch (lhs_type) {
388 case MYSQL_TYPE_TINY:
389 case MYSQL_TYPE_SHORT:
390 case MYSQL_TYPE_INT24:
391 case MYSQL_TYPE_LONG:
392 case MYSQL_TYPE_LONGLONG:
393 if (check_int_result(rhs_item))
394 return true;
395 Item* item_constant;
396 if (check_x_op_constant(
397 lhs_field->field_name.str,
398 rhs_item,
399 "+",
400 &item_constant,
401 allow_insert_value))
402 return true;
403 if (check_x_op_constant(
404 lhs_field->field_name.str,
405 rhs_item,
406 "-",
407 &item_constant,
408 allow_insert_value))
409 return true;
410 if (check_decr_floor_expression(lhs_field, rhs_item))
411 return true;
412 break;
413 case MYSQL_TYPE_STRING:
414 if (rhs_type == Item::INT_ITEM || rhs_type == Item::STRING_ITEM)
415 return true;
416 break;
417 case MYSQL_TYPE_VARCHAR:
418 case MYSQL_TYPE_BLOB:
419 if (rhs_type == Item::STRING_ITEM)
420 return true;
421 break;
422 default:
423 break;
424 }
425 return false;
426}
427
428// Check that all update expressions are simple. Return true if they are.
429static bool check_all_update_expressions(
430 List<Item>& fields,
431 List<Item>& values,
432 TABLE* table,
433 bool allow_insert_value) {
434
435 List_iterator<Item> lhs_i(fields);
436 List_iterator<Item> rhs_i(values);
437 while (1) {
438 Item* lhs_item = lhs_i++;
439 if (lhs_item == NULL)
440 break;
441 Item* rhs_item = rhs_i++;
442 assert_always(rhs_item != NULL);
443 if (!check_update_expression(
444 lhs_item,
445 rhs_item,
446 table,
447 allow_insert_value))
448 return false;
449 }
450 return true;
451}
452
453static bool full_field_in_key(TABLE* table, Field* field) {
454 assert_always(table->s->primary_key < table->s->keys);
455 KEY* key = &table->s->key_info[table->s->primary_key];
456 for (uint i = 0; i < key->user_defined_key_parts; i++) {
457 KEY_PART_INFO* key_part = &key->key_part[i];
458 if (strcmp(field->field_name.str, key_part->field->field_name.str) == 0) {
459 return key_part->length == field->field_length;
460 }
461 }
462 return false;
463}
464
465// Check that an expression looks like fieldname = constant, fieldname is part
466// of the primary key, and the named field is an int, char or varchar type.
467// Return true if it does.
468static bool check_pk_field_equal_constant(
469 Item* item,
470 TABLE* table,
471 MY_BITMAP* pk_fields) {
472
473 if (item->type() != Item::FUNC_ITEM)
474 return false;
475 Item_func* func = static_cast<Item_func*>(item);
476 if (strcmp(func->func_name(), "=") != 0)
477 return false;
478 uint n = func->argument_count();
479 if (n != 2)
480 return false;
481 Item** arguments = func->arguments();
482 Field* field = find_field_by_name(table, arguments[0]);
483 if (field == NULL)
484 return false;
485 if (!bitmap_test_and_clear(pk_fields, field->field_index))
486 return false;
487 switch (field->type()) {
488 case MYSQL_TYPE_TINY:
489 case MYSQL_TYPE_SHORT:
490 case MYSQL_TYPE_INT24:
491 case MYSQL_TYPE_LONG:
492 case MYSQL_TYPE_LONGLONG:
493 return arguments[1]->type() == Item::INT_ITEM ||
494 arguments[1]->type() == Item::STRING_ITEM;
495 case MYSQL_TYPE_STRING:
496 case MYSQL_TYPE_VARCHAR:
497 return full_field_in_key(table, field) &&
498 (arguments[1]->type() == Item::INT_ITEM ||
499 arguments[1]->type() == Item::STRING_ITEM);
500 default:
501 return false;
502 }
503}
504
505// Check that the where condition covers all of the primary key components
506// with fieldname = constant expressions. Return true if it does.
507static bool check_point_update(Item* conds, TABLE* table) {
508 bool result = false;
509
510 if (conds == NULL)
511 return false; // no where condition on the update
512
513 if (table->s->primary_key >= table->s->keys)
514 return false; // no primary key defined
515
516 // use a bitmap of the primary key fields to keep track of those fields
517 // that are covered by the where conditions
518 MY_BITMAP pk_fields;
519 if (bitmap_init(&pk_fields, NULL, table->s->fields, FALSE)) // 1 -> failure
520 return false;
521 KEY *key = &table->s->key_info[table->s->primary_key];
522 for (uint i = 0; i < key->user_defined_key_parts; i++)
523 bitmap_set_bit(&pk_fields, key->key_part[i].field->field_index);
524
525 switch (conds->type()) {
526 case Item::FUNC_ITEM:
527 result = check_pk_field_equal_constant(conds, table, &pk_fields);
528 break;
529 case Item::COND_ITEM: {
530 Item_cond* cond_item = static_cast<Item_cond*>(conds);
531 if (strcmp(cond_item->func_name(), "and") != 0)
532 break;
533 List_iterator<Item> li(*cond_item->argument_list());
534 Item* list_item;
535 result = true;
536 while (result == true && (list_item = li++)) {
537 result = check_pk_field_equal_constant(
538 list_item,
539 table,
540 &pk_fields);
541 }
542 break;
543 }
544 default:
545 break;
546 }
547
548 if (!bitmap_is_clear_all(&pk_fields))
549 result = false;
550 bitmap_free(&pk_fields);
551 return result;
552}
553
554// Return true if there are any clustering keys (except the primary).
555// Precompute this when the table is opened.
556static bool clustering_keys_exist(TABLE *table) {
557 for (uint keynr = 0; keynr < table->s->keys; keynr++) {
558 if (keynr != table->s->primary_key &&
559 key_is_clustering(&table->s->key_info[keynr]))
560 return true;
561 }
562 return false;
563}
564
565static bool is_strict_mode(THD* thd) {
566#if 50600 <= MYSQL_VERSION_ID && MYSQL_VERSION_ID <= 50699
567 return thd->is_strict_mode();
568#else
569 return tokudb_test(thd->variables.sql_mode & (MODE_STRICT_TRANS_TABLES | MODE_STRICT_ALL_TABLES));
570#endif
571}
572
573// Check if an update operation can be handled by this storage engine.
574// Return true if it can.
575bool ha_tokudb::check_fast_update(
576 THD* thd,
577 List<Item>& fields,
578 List<Item>& values,
579 Item* conds) {
580
581 if (!transaction)
582 return false;
583
584 // avoid strict mode arithmetic overflow issues
585 if (is_strict_mode(thd))
586 return false;
587
588 // no triggers
589 if (table->triggers)
590 return false;
591
592 // no binlog
593 if (mysql_bin_log.is_open() &&
594 !(thd->variables.binlog_format == BINLOG_FORMAT_STMT ||
595 thd->variables.binlog_format == BINLOG_FORMAT_MIXED))
596 return false;
597
598 // no clustering keys (need to broadcast an increment into the clustering
599 // keys since we are selecting with the primary key)
600 if (clustering_keys_exist(table))
601 return false;
602
603 if (!check_all_update_expressions(fields, values, table, false))
604 return false;
605
606 if (!check_point_update(conds, table))
607 return false;
608
609 return true;
610}
611
612static void marshall_varchar_descriptor(
613 tokudb::buffer& b,
614 TABLE* table,
615 KEY_AND_COL_INFO* kc_info,
616 uint key_num) {
617
618 b.append_ui<uint32_t>('v');
619 b.append_ui<uint32_t>(
620 table->s->null_bytes + kc_info->mcp_info[key_num].fixed_field_size);
621 uint32_t var_offset_bytes = kc_info->mcp_info[key_num].len_of_offsets;
622 b.append_ui<uint32_t>(var_offset_bytes);
623 b.append_ui<uint32_t>(
624 var_offset_bytes == 0 ? 0 : kc_info->num_offset_bytes);
625}
626
627static void marshall_blobs_descriptor(
628 tokudb::buffer& b,
629 TABLE* table,
630 KEY_AND_COL_INFO* kc_info) {
631
632 b.append_ui<uint32_t>('b');
633 uint32_t n = kc_info->num_blobs;
634 b.append_ui<uint32_t>(n);
635 for (uint i = 0; i < n; i++) {
636 uint blob_field_index = kc_info->blob_fields[i];
637 assert_always(blob_field_index < table->s->fields);
638 uint8_t blob_field_length =
639 table->s->field[blob_field_index]->row_pack_length();
640 b.append(&blob_field_length, sizeof blob_field_length);
641 }
642}
643
644static inline uint32_t get_null_bit_position(uint32_t null_bit);
645
646// evaluate the int value of an item
647static longlong item_val_int(Item* item) {
648 Item::Type t = item->type();
649 if (t == Item::INSERT_VALUE_ITEM) {
650 Item_insert_value* value_item = static_cast<Item_insert_value*>(item);
651 return value_item->arg->val_int();
652 } else
653 return item->val_int();
654}
655
656// Marshall update operations to a buffer.
657static void marshall_update(
658 tokudb::buffer& b,
659 Item* lhs_item,
660 Item* rhs_item,
661 TABLE* table,
662 TOKUDB_SHARE* share) {
663
664 // figure out the update operation type (again)
665 Field* lhs_field = find_field_by_name(table, lhs_item);
666 assert_always(lhs_field); // we found it before, so this should work
667
668 // compute the update info
669 uint32_t field_type;
670 uint32_t field_null_num = 0;
671 if (lhs_field->real_maybe_null()) {
672 uint32_t field_num = lhs_field->field_index;
673 field_null_num =
674 ((field_num/8)*8 + get_null_bit_position(lhs_field->null_bit)) + 1;
675 }
676 uint32_t offset;
677 void* v_ptr = NULL;
678 uint32_t v_length;
679 uint32_t update_operation;
680 longlong v_ll;
681 String v_str;
682
683 switch (lhs_field->type()) {
684 case MYSQL_TYPE_TINY:
685 case MYSQL_TYPE_SHORT:
686 case MYSQL_TYPE_INT24:
687 case MYSQL_TYPE_LONG:
688 case MYSQL_TYPE_LONGLONG: {
689 Field_num* lhs_num = static_cast<Field_num*>(lhs_field);
690 field_type = lhs_num->unsigned_flag ? UPDATE_TYPE_UINT : UPDATE_TYPE_INT;
691 offset =
692 fixed_field_offset(
693 table->s->null_bytes,
694 &share->kc_info,
695 table->s->primary_key,
696 lhs_field->field_index);
697 switch (rhs_item->type()) {
698 case Item::INT_ITEM: {
699 update_operation = '=';
700 v_ll = rhs_item->val_int();
701 v_length = lhs_field->pack_length();
702 v_ptr = &v_ll;
703 break;
704 }
705 case Item::FUNC_ITEM: {
706 Item_func* rhs_func = static_cast<Item_func*>(rhs_item);
707 Item** arguments = rhs_func->arguments();
708 // we only support one if function for now, and it is a
709 // decrement with floor.
710 if (strcmp(rhs_func->func_name(), "if") == 0) {
711 update_operation = '-';
712 v_ll = 1;
713 } else if (rhs_func->argument_count() == 1) {
714 update_operation = '=';
715 v_ll = rhs_func->val_int();
716 } else {
717 update_operation = rhs_func->func_name()[0];
718 v_ll = item_val_int(arguments[1]);
719 }
720 v_length = lhs_field->pack_length();
721 v_ptr = &v_ll;
722 break;
723 }
724 default:
725 assert_unreachable();
726 }
727 break;
728 }
729 case MYSQL_TYPE_STRING: {
730 update_operation = '=';
731 field_type =
732 lhs_field->binary() ? UPDATE_TYPE_BINARY : UPDATE_TYPE_CHAR;
733 offset =
734 fixed_field_offset(
735 table->s->null_bytes,
736 &share->kc_info,
737 table->s->primary_key,
738 lhs_field->field_index);
739 v_str = *rhs_item->val_str(&v_str);
740 v_length = v_str.length();
741 if (v_length >= lhs_field->pack_length()) {
742 v_length = lhs_field->pack_length();
743 v_str.length(v_length); // truncate
744 } else {
745 v_length = lhs_field->pack_length();
746 uchar pad_char =
747 lhs_field->binary() ? 0 : lhs_field->charset()->pad_char;
748 v_str.fill(lhs_field->pack_length(), pad_char); // pad
749 }
750 v_ptr = v_str.c_ptr();
751 break;
752 }
753 case MYSQL_TYPE_VARCHAR: {
754 update_operation = '=';
755 field_type =
756 lhs_field->binary() ? UPDATE_TYPE_VARBINARY : UPDATE_TYPE_VARCHAR;
757 offset =
758 var_field_index(
759 table,
760 &share->kc_info,
761 table->s->primary_key,
762 lhs_field->field_index);
763 v_str = *rhs_item->val_str(&v_str);
764 v_length = v_str.length();
765 if (v_length >= lhs_field->row_pack_length()) {
766 v_length = lhs_field->row_pack_length();
767 v_str.length(v_length); // truncate
768 }
769 v_ptr = v_str.c_ptr();
770 break;
771 }
772 case MYSQL_TYPE_BLOB: {
773 update_operation = '=';
774 field_type = lhs_field->binary() ? UPDATE_TYPE_BLOB : UPDATE_TYPE_TEXT;
775 offset =
776 blob_field_index(
777 table,
778 &share->kc_info,
779 table->s->primary_key,
780 lhs_field->field_index);
781 v_str = *rhs_item->val_str(&v_str);
782 v_length = v_str.length();
783 if (v_length >= lhs_field->max_data_length()) {
784 v_length = lhs_field->max_data_length();
785 v_str.length(v_length); // truncate
786 }
787 v_ptr = v_str.c_ptr();
788 break;
789 }
790 default:
791 assert_unreachable();
792 }
793
794 // marshall the update fields into the buffer
795 b.append_ui<uint32_t>(update_operation);
796 b.append_ui<uint32_t>(field_type);
797 b.append_ui<uint32_t>(field_null_num);
798 b.append_ui<uint32_t>(offset);
799 b.append_ui<uint32_t>(v_length);
800 b.append(v_ptr, v_length);
801}
802
803// Save an item's value into the appropriate field. Return 0 if successful.
804static int save_in_field(Item* item, TABLE* table) {
805 assert_always(item->type() == Item::FUNC_ITEM);
806 Item_func *func = static_cast<Item_func*>(item);
807 assert_always(strcmp(func->func_name(), "=") == 0);
808 uint n = func->argument_count();
809 assert_always(n == 2);
810 Item **arguments = func->arguments();
811 assert_always(arguments[0]->type() == Item::FIELD_ITEM);
812 Item_field *field_item = static_cast<Item_field*>(arguments[0]);
813 my_bitmap_map *old_map = dbug_tmp_use_all_columns(table, table->write_set);
814 int error = arguments[1]->save_in_field(field_item->field, 0);
815 dbug_tmp_restore_column_map(table->write_set, old_map);
816 return error;
817}
818
819static void count_update_types(
820 Field* lhs_field,
821 uint* num_varchars,
822 uint* num_blobs) {
823
824 switch (lhs_field->type()) {
825 case MYSQL_TYPE_VARCHAR:
826 *num_varchars += 1;
827 break;
828 case MYSQL_TYPE_BLOB:
829 *num_blobs += 1;
830 break;
831 default:
832 break;
833 }
834}
835
836// Generate an update message for an update operation and send it into the
837// primary tree. Return 0 if successful.
838int ha_tokudb::send_update_message(
839 List<Item>& update_fields,
840 List<Item>& update_values,
841 Item* conds,
842 DB_TXN* txn) {
843
844 int error;
845
846 // Save the primary key from the where conditions
847 Item::Type t = conds->type();
848 if (t == Item::FUNC_ITEM) {
849 error = save_in_field(conds, table);
850 } else if (t == Item::COND_ITEM) {
851 Item_cond* cond_item = static_cast<Item_cond*>(conds);
852 List_iterator<Item> li(*cond_item->argument_list());
853 Item* list_item;
854 error = 0;
855 while (error == 0 && (list_item = li++)) {
856 error = save_in_field(list_item, table);
857 }
858 } else {
859 assert_unreachable();
860 }
861 if (error)
862 return error;
863
864 // put the primary key into key_buff and wrap it with key_dbt
865 DBT key_dbt;
866 bool has_null;
867 create_dbt_key_from_table(
868 &key_dbt,
869 primary_key,
870 key_buff,
871 table->record[0],
872 &has_null);
873
874 // construct the update message
875 tokudb::buffer update_message;
876
877 uint8_t op = UPDATE_OP_UPDATE_2;
878 update_message.append(&op, sizeof op);
879
880 uint32_t num_updates = update_fields.elements;
881 uint num_varchars = 0, num_blobs = 0;
882 if (1) {
883 List_iterator<Item> lhs_i(update_fields);
884 Item* lhs_item;
885 while ((lhs_item = lhs_i++)) {
886 if (lhs_item == NULL)
887 break;
888 Field* lhs_field = find_field_by_name(table, lhs_item);
889 assert_always(lhs_field); // we found it before, so this should work
890 count_update_types(lhs_field, &num_varchars, &num_blobs);
891 }
892 if (num_varchars > 0 || num_blobs > 0)
893 num_updates++;
894 if (num_blobs > 0)
895 num_updates++;
896 }
897
898 // append the updates
899 update_message.append_ui<uint32_t>(num_updates);
900
901 if (num_varchars > 0 || num_blobs > 0)
902 marshall_varchar_descriptor(
903 update_message,
904 table,
905 &share->kc_info,
906 table->s->primary_key);
907 if (num_blobs > 0)
908 marshall_blobs_descriptor(update_message, table, &share->kc_info);
909
910 List_iterator<Item> lhs_i(update_fields);
911 List_iterator<Item> rhs_i(update_values);
912 while (error == 0) {
913 Item* lhs_item = lhs_i++;
914 if (lhs_item == NULL)
915 break;
916 Item* rhs_item = rhs_i++;
917 assert_always(rhs_item != NULL);
918 marshall_update(update_message, lhs_item, rhs_item, table, share);
919 }
920
921 rwlock_t_lock_read(share->_num_DBs_lock);
922
923 // hot index in progress
924 if (share->num_DBs > table->s->keys + tokudb_test(hidden_primary_key)) {
925 error = ENOTSUP; // run on the slow path
926 } else {
927 // send the update message
928 DBT update_dbt; memset(&update_dbt, 0, sizeof update_dbt);
929 update_dbt.data = update_message.data();
930 update_dbt.size = update_message.size();
931 error =
932 share->key_file[primary_key]->update(
933 share->key_file[primary_key],
934 txn,
935 &key_dbt,
936 &update_dbt,
937 0);
938 }
939
940 share->_num_DBs_lock.unlock();
941
942 return error;
943}
944
945// Determine if an upsert operation can be offloaded to the storage engine.
946// An upsert consists of a row and a list of update expressions
947// (update_fields[i] = update_values[i]).
948// The function returns 0 if the upsert is handled in the storage engine.
949// Otherwise, an error code is returned.
950int ha_tokudb::upsert(
951 THD* thd,
952 List<Item>& update_fields,
953 List<Item>& update_values) {
954
955 TOKUDB_HANDLER_DBUG_ENTER("");
956
957 int error = 0;
958
959 if (TOKUDB_UNLIKELY(TOKUDB_DEBUG_FLAGS(TOKUDB_DEBUG_UPSERT))) {
960 fprintf(stderr, "upsert\n");
961 dump_item_list("update_fields", update_fields);
962 dump_item_list("update_values", update_values);
963 }
964
965 // not an upsert or something is fishy with the parameters
966 if (update_fields.elements < 1 ||
967 update_fields.elements != update_values.elements) {
968 error = ENOTSUP;
969 goto return_error;
970 }
971
972 if (!check_upsert(thd, update_fields, update_values)) {
973 error = ENOTSUP;
974 goto check_error;
975 }
976
977 error = send_upsert_message(thd, update_fields, update_values, transaction);
978 if (error != 0) {
979 goto check_error;
980 }
981
982check_error:
983 if (error != 0) {
984 if (tokudb::sysvars::disable_slow_upsert(thd) != 0)
985 error = HA_ERR_UNSUPPORTED;
986 if (error != ENOTSUP)
987 print_error(error, MYF(0));
988 }
989
990return_error:
991 TOKUDB_HANDLER_DBUG_RETURN(error);
992}
993
994// Check if an upsert can be handled by this storage engine.
995// Return true if it can.
996bool ha_tokudb::check_upsert(
997 THD* thd,
998 List<Item>& update_fields,
999 List<Item>& update_values) {
1000
1001 if (!transaction)
1002 return false;
1003
1004 // avoid strict mode arithmetic overflow issues
1005 if (is_strict_mode(thd))
1006 return false;
1007
1008 // no triggers
1009 if (table->triggers)
1010 return false;
1011
1012 // primary key must exist
1013 if (table->s->primary_key >= table->s->keys)
1014 return false;
1015
1016 // no secondary keys
1017 if (table->s->keys > 1)
1018 return false;
1019
1020 // no binlog
1021 if (mysql_bin_log.is_open() &&
1022 !(thd->variables.binlog_format == BINLOG_FORMAT_STMT ||
1023 thd->variables.binlog_format == BINLOG_FORMAT_MIXED))
1024 return false;
1025
1026 if (!check_all_update_expressions(
1027 update_fields,
1028 update_values,
1029 table,
1030 true))
1031 return false;
1032
1033 return true;
1034}
1035
1036// Generate an upsert message and send it into the primary tree.
1037// Return 0 if successful.
1038int ha_tokudb::send_upsert_message(
1039 THD* thd,
1040 List<Item>& update_fields,
1041 List<Item>& update_values,
1042 DB_TXN* txn) {
1043 int error = 0;
1044
1045 // generate primary key
1046 DBT key_dbt;
1047 bool has_null;
1048 create_dbt_key_from_table(
1049 &key_dbt,
1050 primary_key,
1051 primary_key_buff,
1052 table->record[0],
1053 &has_null);
1054
1055 // generate packed row
1056 DBT row;
1057 error = pack_row(&row, (const uchar *) table->record[0], primary_key);
1058 if (error)
1059 return error;
1060
1061 tokudb::buffer update_message;
1062
1063 // append the operation
1064 uint8_t op = UPDATE_OP_UPSERT_2;
1065 update_message.append(&op, sizeof op);
1066
1067 // append the row
1068 update_message.append_ui<uint32_t>(row.size);
1069 update_message.append(row.data, row.size);
1070
1071 uint32_t num_updates = update_fields.elements;
1072 uint num_varchars = 0, num_blobs = 0;
1073 if (1) {
1074 List_iterator<Item> lhs_i(update_fields);
1075 Item* lhs_item;
1076 while ((lhs_item = lhs_i++)) {
1077 if (lhs_item == NULL)
1078 break;
1079 Field* lhs_field = find_field_by_name(table, lhs_item);
1080 assert_always(lhs_field); // we found it before, so this should work
1081 count_update_types(lhs_field, &num_varchars, &num_blobs);
1082 }
1083 if (num_varchars > 0 || num_blobs > 0)
1084 num_updates++;
1085 if (num_blobs > 0)
1086 num_updates++;
1087 }
1088
1089 // append the updates
1090 update_message.append_ui<uint32_t>(num_updates);
1091
1092 if (num_varchars > 0 || num_blobs > 0)
1093 marshall_varchar_descriptor(
1094 update_message,
1095 table, &share->kc_info,
1096 table->s->primary_key);
1097 if (num_blobs > 0)
1098 marshall_blobs_descriptor(update_message, table, &share->kc_info);
1099
1100 List_iterator<Item> lhs_i(update_fields);
1101 List_iterator<Item> rhs_i(update_values);
1102 while (1) {
1103 Item* lhs_item = lhs_i++;
1104 if (lhs_item == NULL)
1105 break;
1106 Item* rhs_item = rhs_i++;
1107 assert_always(rhs_item != NULL);
1108 marshall_update(update_message, lhs_item, rhs_item, table, share);
1109 }
1110
1111 rwlock_t_lock_read(share->_num_DBs_lock);
1112
1113 // hot index in progress
1114 if (share->num_DBs > table->s->keys + tokudb_test(hidden_primary_key)) {
1115 error = ENOTSUP; // run on the slow path
1116 } else {
1117 // send the upsert message
1118 DBT update_dbt; memset(&update_dbt, 0, sizeof update_dbt);
1119 update_dbt.data = update_message.data();
1120 update_dbt.size = update_message.size();
1121 error =
1122 share->key_file[primary_key]->update(
1123 share->key_file[primary_key],
1124 txn,
1125 &key_dbt,
1126 &update_dbt,
1127 0);
1128 }
1129
1130 share->_num_DBs_lock.unlock();
1131
1132 return error;
1133}
1134
1135#endif
1136