1/*
2 Copyright (c) 2004, 2014, Oracle and/or its affiliates
3 Copyright (c) 2010, 2014, SkySQL Ab.
4
5 This program is free software; you can redistribute it and/or
6 modify it under the terms of the GNU General Public License
7 as published by the Free Software Foundation; version 2 of
8 the License.
9
10 This program is distributed in the hope that it will be useful,
11 but WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 GNU General Public License for more details.
14
15 You should have received a copy of the GNU General Public License
16 along with this program; if not, write to the Free Software
17 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
18*/
19
20#ifdef USE_PRAGMA_IMPLEMENTATION
21#pragma implementation // gcc: Class implementation
22#endif
23
24#include <my_global.h>
25#include "sql_class.h" // SSV
26#include "sql_table.h" // build_table_filename
27#include <myisam.h> // T_EXTEND
28
29#include "ha_archive.h"
30#include "discover.h"
31#include <my_dir.h>
32
33#include <mysql/plugin.h>
34
/*
  First, if you want to understand storage engines you should look at
  ha_example.cc and ha_example.h.

  This example was written as a test case for a customer who needed
  a storage engine without indexes that could compress data very well.
  So, welcome to a completely compressed storage engine. This storage
  engine only does inserts. No replace, deletes, or updates. All reads are
  complete table scans. Compression is done through a combination of packing
  and making use of the zlib library.

  We keep a file pointer open for each instance of ha_archive for each read
  but for writes we keep one open file handle just for that. We flush it
  only if we have a read occur. azio handles compressing lots of records
  at once much better than doing lots of little records between writes.
  It is possible to not lock on writes, but this would then mean we couldn't
  handle bulk inserts as well (that is, if someone was trying to read at
  the same time, since we would want to flush).

  A "meta" file is kept alongside the data file. This file serves two purposes.
  The first purpose is to track the number of rows in the table. The second
  purpose is to determine if the table was closed properly or not. When the
  meta file is first opened it is marked as dirty. It is opened when the table
  itself is opened for writing. When the table is closed the new count for rows
  is written to the meta file and the file is marked as clean. If the meta file
  is opened and it is marked as dirty, it is assumed that a crash occurred. At
  this point an error occurs and the user is told to rebuild the file.
  A rebuild scans the rows and rewrites the meta file. If corruption is found
  in the data file then the meta file is not repaired.

  At some point a recovery method for such a drastic case needs to be devised.

  Locks are row level, and you will get a consistent read.

  For performance, as far as table scans go it is quite fast. I don't have
  good numbers but locally it has outperformed both InnoDB and MyISAM. For
  InnoDB the question will be whether the table can fit into the buffer
  pool. For MyISAM it is a question of how much the file system caches the
  MyISAM file. With enough free memory MyISAM is faster. It is only when the
  OS doesn't have enough memory to cache the entire table that archive turns
  out to be any faster.

  Examples between MyISAM (packed) and Archive.

  Table with 76695844 identical rows:
  29680807 a_archive.ARZ
  920350317 a.MYD


  Table with 8991478 rows (all of Slashdot's comments):
  1922964506 comment_archive.ARZ
  2944970297 comment_text.MYD


  TODO:
   Allow users to set compression level.
   Allow adjustable block size.
   Implement versioning, should be easy.
   Allow for errors, find a way to mark bad rows.
   Add an optional feature so that rows can be flushed at an interval (which
   will cause less compression but may speed up ordered searches).
   Checkpoint the meta file to allow for faster rebuilds.
   Option to allow for dirty reads; this would lower the sync calls, which
   would make inserts a lot faster, but would mean highly arbitrary reads.

  -Brian

  Archive file format versions:
  <5.1.5 - v.1
  5.1.5-5.1.15 - v.2
  >5.1.15 - v.3
*/
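
/*
  A reader's sketch of the main call flow in this file (an orientation aid,
  not an exhaustive contract): on INSERT the server calls write_row(), which
  packs the record with pack_row() and appends it to the shared azio writer
  via real_write_row(). On a scan the server calls rnd_init(), which rewinds
  the stream with read_data_header(), and then repeatedly
  rnd_next() -> get_row() -> unpack_row() (or the legacy get_row_version2()
  for pre-v3 files).
*/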
107
108/* The file extension */
109#define ARZ ".ARZ" // The data file
110#define ARN ".ARN" // Files used during an optimize call
111#define ARM ".ARM" // Meta file (deprecated)
112
113/* 5.0 compatibility */
114#define META_V1_OFFSET_CHECK_HEADER 0
115#define META_V1_OFFSET_VERSION 1
116#define META_V1_OFFSET_ROWS_RECORDED 2
117#define META_V1_OFFSET_CHECK_POINT 10
118#define META_V1_OFFSET_CRASHED 18
119#define META_V1_LENGTH 19
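
/*
  For reference, the version 1 meta file is a fixed 19-byte block laid out as
  follows (derived from the offsets above and read/write_v1_metafile()):

    offset 0   1 byte   check header (ARCHIVE_CHECK_HEADER)
    offset 1   1 byte   format version (always 1)
    offset 2   8 bytes  rows recorded
    offset 10  8 bytes  check point (unused, written as 0)
    offset 18  1 byte   crashed flag
*/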
120
121/*
122 uchar + uchar
123*/
124#define DATA_BUFFER_SIZE 2 // Size of the data used in the data file
125#define ARCHIVE_CHECK_HEADER 254 // The number we use to determine corruption
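
/*
  In version 1 and 2 data files the stream starts with a two-byte header:
  byte 0 is the check value (ARCHIVE_CHECK_HEADER) and byte 1 is the format
  version. Version 3 files keep this information in the azio stream header
  instead, which is why read_data_header() returns early for them.
*/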
126
127#ifdef HAVE_PSI_INTERFACE
128extern "C" PSI_file_key arch_key_file_data;
129#endif
130
/* Static declarations for handlerton */
132static handler *archive_create_handler(handlerton *hton,
133 TABLE_SHARE *table,
134 MEM_ROOT *mem_root);
135int archive_discover(handlerton *hton, THD* thd, TABLE_SHARE *share);
136
137/*
138 Number of rows that will force a bulk insert.
139*/
140#define ARCHIVE_MIN_ROWS_TO_USE_BULK_INSERT 2
141
142/*
143 Size of header used for row
144*/
145#define ARCHIVE_ROW_HEADER_SIZE 4
146
147static handler *archive_create_handler(handlerton *hton,
148 TABLE_SHARE *table,
149 MEM_ROOT *mem_root)
150{
151 return new (mem_root) ha_archive(hton, table);
152}
153
154#ifdef HAVE_PSI_INTERFACE
155PSI_mutex_key az_key_mutex_Archive_share_mutex;
156
157static PSI_mutex_info all_archive_mutexes[]=
158{
159 { &az_key_mutex_Archive_share_mutex, "Archive_share::mutex", 0}
160};
161
162PSI_file_key arch_key_file_metadata, arch_key_file_data;
163static PSI_file_info all_archive_files[]=
164{
165 { &arch_key_file_metadata, "metadata", 0},
166 { &arch_key_file_data, "data", 0}
167};
168
169static void init_archive_psi_keys(void)
170{
171 const char* category= "archive";
172 int count;
173
174 if (!PSI_server)
175 return;
176
177 count= array_elements(all_archive_mutexes);
178 mysql_mutex_register(category, all_archive_mutexes, count);
179
180 count= array_elements(all_archive_files);
181 mysql_file_register(category, all_archive_files, count);
182}
183
184#endif /* HAVE_PSI_INTERFACE */
185
186/*
187 Initialize the archive handler.
188
189 SYNOPSIS
190 archive_db_init()
191 void *
192
193 RETURN
194 FALSE OK
195 TRUE Error
196*/
197
198/*
199 We just implement one additional file extension.
200 ARM is here just to properly drop 5.0 tables.
201*/
202static const char *ha_archive_exts[] = {
203 ARZ,
204 ARM,
205 NullS
206};
207
208int archive_db_init(void *p)
209{
210 DBUG_ENTER("archive_db_init");
211 handlerton *archive_hton;
212
213#ifdef HAVE_PSI_INTERFACE
214 init_archive_psi_keys();
215#endif
216
217 archive_hton= (handlerton *)p;
218 archive_hton->state= SHOW_OPTION_YES;
219 archive_hton->db_type= DB_TYPE_ARCHIVE_DB;
220 archive_hton->create= archive_create_handler;
221 archive_hton->flags= HTON_NO_FLAGS;
222 archive_hton->discover_table= archive_discover;
223 archive_hton->tablefile_extensions= ha_archive_exts;
224
225 DBUG_RETURN(0);
226}
227
228
229Archive_share::Archive_share()
230{
231 crashed= false;
232 in_optimize= false;
233 archive_write_open= false;
234 dirty= false;
235 DBUG_PRINT("ha_archive", ("Archive_share: %p",
236 this));
237 thr_lock_init(&lock);
238 /*
239 We will use this lock for rows.
240 */
241 mysql_mutex_init(az_key_mutex_Archive_share_mutex,
242 &mutex, MY_MUTEX_INIT_FAST);
243}
244
245
246ha_archive::ha_archive(handlerton *hton, TABLE_SHARE *table_arg)
247 :handler(hton, table_arg), delayed_insert(0), bulk_insert(0)
248{
249 /* Set our original buffer from pre-allocated memory */
250 buffer.set((char *)byte_buffer, IO_SIZE, system_charset_info);
251
252 /* The size of the offset value we will use for position() */
253 ref_length= sizeof(my_off_t);
254 archive_reader_open= FALSE;
255}
256
257int archive_discover(handlerton *hton, THD* thd, TABLE_SHARE *share)
258{
259 DBUG_ENTER("archive_discover");
260 DBUG_PRINT("archive_discover", ("db: '%s' name: '%s'", share->db.str,
261 share->table_name.str));
262 azio_stream frm_stream;
263 char az_file[FN_REFLEN];
264 uchar *frm_ptr;
265 MY_STAT file_stat;
266
267 strxmov(az_file, share->normalized_path.str, ARZ, NullS);
268
269 if (!(mysql_file_stat(/* arch_key_file_data */ 0, az_file, &file_stat, MYF(0))))
270 DBUG_RETURN(HA_ERR_NO_SUCH_TABLE);
271
272 if (!(azopen(&frm_stream, az_file, O_RDONLY|O_BINARY)))
273 {
274 if (errno == EROFS || errno == EACCES)
275 DBUG_RETURN(my_errno= errno);
276 DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);
277 }
278
279 if (frm_stream.frm_length == 0)
280 DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);
281
282 frm_ptr= (uchar *)my_malloc(sizeof(char) * frm_stream.frm_length,
283 MYF(MY_THREAD_SPECIFIC | MY_WME));
284 if (!frm_ptr)
285 DBUG_RETURN(HA_ERR_OUT_OF_MEM);
286
287 if (azread_frm(&frm_stream, frm_ptr))
288 goto ret;
289
290 azclose(&frm_stream);
291
292 my_errno= share->init_from_binary_frm_image(thd, 1,
293 frm_ptr, frm_stream.frm_length);
294ret:
295 my_free(frm_ptr);
296 DBUG_RETURN(my_errno);
297}
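
/*
  Note for readers: discovery works because ha_archive::create() embeds the
  table's frm image in the ARZ file with azwrite_frm(). archive_discover()
  reads that image back with azread_frm() and hands it to
  init_from_binary_frm_image(), so the table can be opened even when the
  server has no definition for it yet.
*/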
298
299/**
300 @brief Read version 1 meta file (5.0 compatibility routine).
301
302 @return Completion status
303 @retval 0 Success
304 @retval !0 Failure
305*/
306
307int Archive_share::read_v1_metafile()
308{
309 char file_name[FN_REFLEN];
310 uchar buf[META_V1_LENGTH];
311 File fd;
312 DBUG_ENTER("Archive_share::read_v1_metafile");
313
314 fn_format(file_name, data_file_name, "", ARM, MY_REPLACE_EXT);
315 if ((fd= mysql_file_open(arch_key_file_metadata, file_name, O_RDONLY, MYF(0))) == -1)
316 DBUG_RETURN(-1);
317
318 if (mysql_file_read(fd, buf, sizeof(buf), MYF(0)) != sizeof(buf))
319 {
320 mysql_file_close(fd, MYF(0));
321 DBUG_RETURN(-1);
322 }
323
324 rows_recorded= uint8korr(buf + META_V1_OFFSET_ROWS_RECORDED);
325 crashed= buf[META_V1_OFFSET_CRASHED];
326 mysql_file_close(fd, MYF(0));
327 DBUG_RETURN(0);
328}
329
330
331/**
332 @brief Write version 1 meta file (5.0 compatibility routine).
333
334 @return Completion status
335 @retval 0 Success
336 @retval !0 Failure
337*/
338
339int Archive_share::write_v1_metafile()
340{
341 char file_name[FN_REFLEN];
342 uchar buf[META_V1_LENGTH];
343 File fd;
344 DBUG_ENTER("Archive_share::write_v1_metafile");
345
346 buf[META_V1_OFFSET_CHECK_HEADER]= ARCHIVE_CHECK_HEADER;
347 buf[META_V1_OFFSET_VERSION]= 1;
348 int8store(buf + META_V1_OFFSET_ROWS_RECORDED, rows_recorded);
349 int8store(buf + META_V1_OFFSET_CHECK_POINT, (ulonglong) 0);
350 buf[META_V1_OFFSET_CRASHED]= crashed;
351
352 fn_format(file_name, data_file_name, "", ARM, MY_REPLACE_EXT);
353 if ((fd= mysql_file_open(arch_key_file_metadata, file_name, O_WRONLY, MYF(0))) == -1)
354 DBUG_RETURN(-1);
355
356 if (mysql_file_write(fd, buf, sizeof(buf), MYF(0)) != sizeof(buf))
357 {
358 mysql_file_close(fd, MYF(0));
359 DBUG_RETURN(-1);
360 }
361
362 mysql_file_close(fd, MYF(0));
363 DBUG_RETURN(0);
364}
365
366/**
367 @brief Pack version 1 row (5.0 compatibility routine).
368
369 @param[in] record the record to pack
370
371 @return Length of packed row
372*/
373
374unsigned int ha_archive::pack_row_v1(uchar *record)
375{
376 uint *blob, *end;
377 uchar *pos;
378 DBUG_ENTER("pack_row_v1");
379 memcpy(record_buffer->buffer, record, table->s->reclength);
380
  /*
    The end of VARCHAR fields is filled with garbage, so here
    we explicitly zero out the ends of the VARCHAR fields.
  */
385
386 for (Field** field= table->field; (*field) ; field++)
387 {
388 Field *fld= *field;
389 if (fld->type() == MYSQL_TYPE_VARCHAR)
390 {
391 if (!(fld->is_real_null(record - table->record[0])))
392 {
393 ptrdiff_t start= (fld->ptr - table->record[0]);
394 Field_varstring *const field_var= (Field_varstring *)fld;
395 uint offset= field_var->data_length() + field_var->length_size();
396 memset(record_buffer->buffer + start + offset, 0,
397 fld->field_length - offset + 1);
398 }
399 }
400 }
401 pos= record_buffer->buffer + table->s->reclength;
402 for (blob= table->s->blob_field, end= blob + table->s->blob_fields;
403 blob != end; blob++)
404 {
405 uint32 length= ((Field_blob *) table->field[*blob])->get_length();
406 if (length)
407 {
408 uchar *data_ptr= ((Field_blob *) table->field[*blob])->get_ptr();
409 memcpy(pos, data_ptr, length);
410 pos+= length;
411 }
412 }
413 DBUG_RETURN((int)(pos - record_buffer->buffer));
414}
415
/*
  This method reads the header of a data file and returns 0 on success,
  or an error code otherwise.
*/
419int ha_archive::read_data_header(azio_stream *file_to_read)
420{
421 int error;
422 unsigned long ret;
423 uchar data_buffer[DATA_BUFFER_SIZE];
424 DBUG_ENTER("ha_archive::read_data_header");
425
426 if (azrewind(file_to_read) == -1)
427 DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);
428
429 if (file_to_read->version >= 3)
430 DBUG_RETURN(0);
  /* Everything below this is legacy handling for version 2 and earlier */
432
433 DBUG_PRINT("ha_archive", ("Reading legacy data header"));
434
435 ret= azread(file_to_read, data_buffer, DATA_BUFFER_SIZE, &error);
436
437 if (ret != DATA_BUFFER_SIZE)
438 {
439 DBUG_PRINT("ha_archive", ("Reading, expected %d got %lu",
440 DATA_BUFFER_SIZE, ret));
441 DBUG_RETURN(1);
442 }
443
444 if (error)
445 {
446 DBUG_PRINT("ha_archive", ("Compression error (%d)", error));
447 DBUG_RETURN(1);
448 }
449
450 DBUG_PRINT("ha_archive", ("Check %u", data_buffer[0]));
451 DBUG_PRINT("ha_archive", ("Version %u", data_buffer[1]));
452
453 if ((data_buffer[0] != (uchar)ARCHIVE_CHECK_HEADER) &&
454 (data_buffer[1] == 1 || data_buffer[1] == 2))
455 DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);
456
457 DBUG_RETURN(0);
458}
459
460
/*
  We create the shared structure that we will use for the open table.
  No matter what, we try to get or create a share. This is so that a repair
  table operation can occur.

  See ha_example.cc for a longer description.
*/
468Archive_share *ha_archive::get_share(const char *table_name, int *rc)
469{
470 Archive_share *tmp_share;
471
472 DBUG_ENTER("ha_archive::get_share");
473
474 lock_shared_ha_data();
475 if (!(tmp_share= static_cast<Archive_share*>(get_ha_share_ptr())))
476 {
477 azio_stream archive_tmp;
478
479 tmp_share= new Archive_share;
480
481 if (!tmp_share)
482 {
483 *rc= HA_ERR_OUT_OF_MEM;
484 goto err;
485 }
486 DBUG_PRINT("ha_archive", ("new Archive_share: %p",
487 tmp_share));
488
489 fn_format(tmp_share->data_file_name, table_name, "",
490 ARZ, MY_REPLACE_EXT | MY_UNPACK_FILENAME);
491 strmov(tmp_share->table_name, table_name);
492 DBUG_PRINT("ha_archive", ("Data File %s",
493 tmp_share->data_file_name));
494
    /*
      We read the meta data, but do not mark the file dirty. Since we are
      not doing a write, we won't open it for anything but reading (open it
      for write and we would generate null compression writes).
    */
501 if (!(azopen(&archive_tmp, tmp_share->data_file_name, O_RDONLY|O_BINARY)))
502 {
503 delete tmp_share;
504 *rc= my_errno ? my_errno : HA_ERR_CRASHED;
505 tmp_share= NULL;
506 goto err;
507 }
508 stats.auto_increment_value= archive_tmp.auto_increment + 1;
509 tmp_share->rows_recorded= (ha_rows)archive_tmp.rows;
510 tmp_share->crashed= archive_tmp.dirty;
511 share= tmp_share;
512 if (archive_tmp.version == 1)
513 share->read_v1_metafile();
514 else if (frm_compare(&archive_tmp))
515 *rc= HA_ERR_TABLE_DEF_CHANGED;
516
517 azclose(&archive_tmp);
518
519 set_ha_share_ptr(static_cast<Handler_share*>(tmp_share));
520 }
521 if (tmp_share->crashed)
522 *rc= HA_ERR_CRASHED_ON_USAGE;
523err:
524 unlock_shared_ha_data();
525
526 DBUG_ASSERT(tmp_share || *rc);
527
528 DBUG_RETURN(tmp_share);
529}
530
531
532int Archive_share::init_archive_writer()
533{
534 DBUG_ENTER("Archive_share::init_archive_writer");
  /*
    It is expensive to open and close the data files, and since you can't
    have a gzip file that can be both read and written, we keep a writer
    open that is shared among all open tables.
  */
540 if (!(azopen(&archive_write, data_file_name,
541 O_RDWR|O_BINARY)))
542 {
543 DBUG_PRINT("ha_archive", ("Could not open archive write file"));
544 crashed= true;
545 DBUG_RETURN(1);
546 }
547 archive_write_open= true;
548
549 DBUG_RETURN(0);
550}
551
552
553void Archive_share::close_archive_writer()
554{
555 mysql_mutex_assert_owner(&mutex);
556 if (archive_write_open)
557 {
558 if (archive_write.version == 1)
559 (void) write_v1_metafile();
560 azclose(&archive_write);
561 archive_write_open= false;
562 dirty= false;
563 }
564}
565
566
567/*
568 No locks are required because it is associated with just one handler instance
569*/
570int ha_archive::init_archive_reader()
571{
572 DBUG_ENTER("ha_archive::init_archive_reader");
  /*
    It is expensive to open and close the data files, and since you can't
    have a gzip file that can be both read and written, we keep a writer
    open that is shared among all open tables, but have one reader open for
    each handler instance.
  */
579 if (!archive_reader_open)
580 {
581 if (!(azopen(&archive, share->data_file_name, O_RDONLY|O_BINARY)))
582 {
583 DBUG_PRINT("ha_archive", ("Could not open archive read file"));
584 share->crashed= TRUE;
585 DBUG_RETURN(1);
586 }
587 archive_reader_open= TRUE;
588 }
589
590 DBUG_RETURN(0);
591}
592
593
/*
  When opening a file we:
  Create/get our shared structure.
  Init our lock.
  Open the file we will read from.
*/
600int ha_archive::open(const char *name, int mode, uint open_options)
601{
602 int rc= 0;
603 DBUG_ENTER("ha_archive::open");
604
605 DBUG_PRINT("ha_archive", ("archive table was opened for crash: %s",
606 (open_options & HA_OPEN_FOR_REPAIR) ? "yes" : "no"));
607 share= get_share(name, &rc);
608 if (!share)
609 DBUG_RETURN(rc);
610
611 /* Allow open on crashed table in repair mode only. */
612 switch (rc)
613 {
614 case 0:
615 break;
616 case HA_ERR_TABLE_DEF_CHANGED:
617 case HA_ERR_CRASHED_ON_USAGE:
618 if (open_options & HA_OPEN_FOR_REPAIR)
619 {
620 rc= 0;
621 break;
622 }
623 /* fall through */
624 default:
625 DBUG_RETURN(rc);
626 }
627
628 DBUG_ASSERT(share);
629
630 record_buffer= create_record_buffer(table->s->reclength +
631 ARCHIVE_ROW_HEADER_SIZE);
632
633 if (!record_buffer)
634 DBUG_RETURN(HA_ERR_OUT_OF_MEM);
635
636 thr_lock_data_init(&share->lock, &lock, NULL);
637
638 DBUG_PRINT("ha_archive", ("archive table was crashed %s",
639 rc == HA_ERR_CRASHED_ON_USAGE ? "yes" : "no"));
640 if (rc == HA_ERR_CRASHED_ON_USAGE && open_options & HA_OPEN_FOR_REPAIR)
641 {
642 DBUG_RETURN(0);
643 }
644
645 DBUG_RETURN(rc);
646}
647
648
/*
  Closes the file.

  SYNOPSIS
    close();

  IMPLEMENTATION:

  We first close this storage engine's file handle to the archive and
  then remove our reference count to the table (and possibly free it
  as well).

  RETURN
    0  ok
    1  Error
*/
665
666int ha_archive::close(void)
667{
668 int rc= 0;
669 DBUG_ENTER("ha_archive::close");
670
671 destroy_record_buffer(record_buffer);
672
673 /* First close stream */
674 if (archive_reader_open)
675 {
676 if (azclose(&archive))
677 rc= 1;
678 }
679
680 DBUG_RETURN(rc);
681}
682
683
684/**
685 Copy a frm blob between streams.
686
687 @param src The source stream.
688 @param dst The destination stream.
689
690 @return Zero on success, non-zero otherwise.
691*/
692
693int ha_archive::frm_copy(azio_stream *src, azio_stream *dst)
694{
695 int rc= 0;
696 uchar *frm_ptr;
697
698 if (!src->frm_length)
699 {
700 size_t frm_len;
701 if (!table_share->read_frm_image((const uchar**) &frm_ptr, &frm_len))
702 {
703 azwrite_frm(dst, frm_ptr, frm_len);
704 table_share->free_frm_image(frm_ptr);
705 }
706 return 0;
707 }
708
709 if (!(frm_ptr= (uchar *) my_malloc(src->frm_length,
710 MYF(MY_THREAD_SPECIFIC | MY_WME))))
711 return HA_ERR_OUT_OF_MEM;
712
713 /* Write file offset is set to the end of the file. */
714 if (azread_frm(src, frm_ptr) ||
715 azwrite_frm(dst, frm_ptr, src->frm_length))
716 rc= my_errno ? my_errno : HA_ERR_INTERNAL_ERROR;
717
718 my_free(frm_ptr);
719
720 return rc;
721}
722
723
724/**
725 Compare frm blob with the on-disk frm file
726
727 @param s The azio stream.
728
729 @return Zero if equal, non-zero otherwise.
730*/
731
732int ha_archive::frm_compare(azio_stream *s)
733{
734 if (!s->frmver_length)
735 return 0; // Old pre-10.0 archive table. Never rediscover.
736
737 LEX_CUSTRING *ver= &table->s->tabledef_version;
738 return ver->length != s->frmver_length ||
739 memcmp(ver->str, s->frmver, ver->length);
740}
741
742
743/*
744 We create our data file here. The format is pretty simple.
745 You can read about the format of the data file above.
746 Unlike other storage engines we do not "pack" our data. Since we
747 are about to do a general compression, packing would just be a waste of
748 CPU time. If the table has blobs they are written after the row in the order
749 of creation.
750*/
751
752int ha_archive::create(const char *name, TABLE *table_arg,
753 HA_CREATE_INFO *create_info)
754{
755 char name_buff[FN_REFLEN];
756 char linkname[FN_REFLEN];
757 int error;
758 azio_stream create_stream; /* Archive file we are working with */
759 const uchar *frm_ptr;
760 size_t frm_len;
761
762 DBUG_ENTER("ha_archive::create");
763
764 stats.auto_increment_value= create_info->auto_increment_value;
765
766 for (uint key= 0; key < table_arg->s->keys; key++)
767 {
768 KEY *pos= table_arg->key_info+key;
769 KEY_PART_INFO *key_part= pos->key_part;
770 KEY_PART_INFO *key_part_end= key_part + pos->user_defined_key_parts;
771
772 for (; key_part != key_part_end; key_part++)
773 {
774 Field *field= key_part->field;
775
776 if (!(field->flags & AUTO_INCREMENT_FLAG))
777 {
778 error= HA_WRONG_CREATE_OPTION;
779 DBUG_PRINT("ha_archive", ("Index error in creating archive table"));
780 goto error;
781 }
782 }
783 }
784
785 /*
786 We reuse name_buff since it is available.
787 */
788#ifdef HAVE_READLINK
789 if (my_use_symdir &&
790 create_info->data_file_name &&
791 create_info->data_file_name[0] != '#')
792 {
793 DBUG_PRINT("ha_archive", ("archive will create stream file %s",
794 create_info->data_file_name));
795
796 fn_format(name_buff, create_info->data_file_name, "", ARZ,
797 MY_REPLACE_EXT | MY_UNPACK_FILENAME);
798 fn_format(linkname, name, "", ARZ,
799 MY_REPLACE_EXT | MY_UNPACK_FILENAME);
800 }
801 else
802#endif /* HAVE_READLINK */
803 {
804 if (create_info->data_file_name)
805 my_error(WARN_OPTION_IGNORED, MYF(ME_JUST_WARNING), "DATA DIRECTORY");
806
807 fn_format(name_buff, name, "", ARZ,
808 MY_REPLACE_EXT | MY_UNPACK_FILENAME);
809 linkname[0]= 0;
810 }
811
812 /* Archive engine never uses INDEX DIRECTORY. */
813 if (create_info->index_file_name)
814 my_error(WARN_OPTION_IGNORED, MYF(ME_JUST_WARNING), "INDEX DIRECTORY");
815
816 /*
817 There is a chance that the file was "discovered". In this case
818 just use whatever file is there.
819 */
820 my_errno= 0;
821 if (!(azopen(&create_stream, name_buff, O_CREAT|O_RDWR|O_BINARY)))
822 {
823 error= errno;
824 goto error2;
825 }
826
827 if (linkname[0])
828 my_symlink(name_buff, linkname, MYF(0));
829
830 /*
831 Here is where we open up the frm and pass it to archive to store
832 */
833 if (!table_arg->s->read_frm_image(&frm_ptr, &frm_len))
834 {
835 azwrite_frm(&create_stream, frm_ptr, frm_len);
836 table_arg->s->free_frm_image(frm_ptr);
837 }
838
839 if (create_info->comment.str)
840 azwrite_comment(&create_stream, create_info->comment.str,
841 create_info->comment.length);
842
843 /*
844 Yes you need to do this, because the starting value
845 for the autoincrement may not be zero.
846 */
847 create_stream.auto_increment= stats.auto_increment_value ?
848 stats.auto_increment_value - 1 : 0;
849 if (azclose(&create_stream))
850 {
851 error= errno;
852 goto error2;
853 }
854
855 DBUG_PRINT("ha_archive", ("Creating File %s", name_buff));
856 DBUG_PRINT("ha_archive", ("Creating Link %s", linkname));
857
858
859 DBUG_RETURN(0);
860
861error2:
862 delete_table(name);
863error:
864 /* Return error number, if we got one */
865 DBUG_RETURN(error ? error : -1);
866}
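
/*
  Note on the key check in create() above: Archive accepts at most an index
  whose every key part is an AUTO_INCREMENT column; any other index
  definition is rejected with HA_WRONG_CREATE_OPTION. For example
  (illustrative only), a table declared with "a INT AUTO_INCREMENT, KEY(a)"
  is accepted, while a key over a plain VARCHAR column is not.
*/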
867
868/*
869 This is where the actual row is written out.
870*/
871int ha_archive::real_write_row(uchar *buf, azio_stream *writer)
872{
873 my_off_t written;
874 unsigned int r_pack_length;
875 DBUG_ENTER("ha_archive::real_write_row");
876
877 /* We pack the row for writing */
878 r_pack_length= pack_row(buf, writer);
879
880 written= azwrite(writer, record_buffer->buffer, r_pack_length);
881 if (written != r_pack_length)
882 {
883 DBUG_PRINT("ha_archive", ("Wrote %d bytes expected %d",
884 (uint32) written,
885 (uint32)r_pack_length));
886 DBUG_RETURN(-1);
887 }
888
889 if (!delayed_insert || !bulk_insert)
890 share->dirty= TRUE;
891
892 DBUG_RETURN(0);
893}
894
895
896/*
897 Calculate max length needed for row. This includes
898 the bytes required for the length in the header.
899*/
900
901uint32 ha_archive::max_row_length(const uchar *buf)
902{
903 uint32 length= (uint32)(table->s->reclength + table->s->fields*2);
904 length+= ARCHIVE_ROW_HEADER_SIZE;
905
906 uint *ptr, *end;
907 for (ptr= table->s->blob_field, end=ptr + table->s->blob_fields ;
908 ptr != end ;
909 ptr++)
910 {
911 if (!table->field[*ptr]->is_null())
912 length += 2 + ((Field_blob*)table->field[*ptr])->get_length();
913 }
914
915 return length;
916}
917
918
919unsigned int ha_archive::pack_row(uchar *record, azio_stream *writer)
920{
921 uchar *ptr;
922
923 DBUG_ENTER("ha_archive::pack_row");
924
925
926 if (fix_rec_buff(max_row_length(record)))
927 DBUG_RETURN(HA_ERR_OUT_OF_MEM); /* purecov: inspected */
928
929 if (writer->version == 1)
930 DBUG_RETURN(pack_row_v1(record));
931
932 /* Copy null bits */
933 memcpy(record_buffer->buffer+ARCHIVE_ROW_HEADER_SIZE,
934 record, table->s->null_bytes);
935 ptr= record_buffer->buffer + table->s->null_bytes + ARCHIVE_ROW_HEADER_SIZE;
936
937 for (Field **field=table->field ; *field ; field++)
938 {
939 if (!((*field)->is_null()))
940 ptr= (*field)->pack(ptr, record + (*field)->offset(record));
941 }
942
943 int4store(record_buffer->buffer, (int)(ptr - record_buffer->buffer -
944 ARCHIVE_ROW_HEADER_SIZE));
945 DBUG_PRINT("ha_archive",("Pack row length %u", (unsigned int)
946 (ptr - record_buffer->buffer -
947 ARCHIVE_ROW_HEADER_SIZE)));
948
949 DBUG_RETURN((unsigned int) (ptr - record_buffer->buffer));
950}
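
/*
  For reference, a version 3 packed row as produced by pack_row() and read
  back by unpack_row() is laid out as follows (a sketch, not a formal spec):

    ARCHIVE_ROW_HEADER_SIZE (4) bytes   packed row length, excluding this
                                        header (stored with int4store)
    table->s->null_bytes bytes          copy of the record's null bitmap
    remainder                           every non-NULL field written with
                                        Field::pack() in field order (for
                                        blobs this includes the blob data)
*/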
951
952
/*
  See pack_row() and unpack_row() for an explanation of the row format.
  Here we just write out the row.

  Wondering about start_bulk_insert()? It is implemented further down; the
  only saving it gives us is that we can skip setting dirty to true for
  each row.
*/
962int ha_archive::write_row(uchar *buf)
963{
964 int rc;
965 uchar *read_buf= NULL;
966 ulonglong temp_auto;
967 uchar *record= table->record[0];
968 DBUG_ENTER("ha_archive::write_row");
969
970 if (share->crashed)
971 DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);
972
973 mysql_mutex_lock(&share->mutex);
974
975 if (!share->archive_write_open && share->init_archive_writer())
976 {
977 rc= errno;
978 goto error;
979 }
980
981 if (table->next_number_field && record == table->record[0])
982 {
983 KEY *mkey= &table->s->key_info[0]; // We only support one key right now
984 update_auto_increment();
985 temp_auto= table->next_number_field->val_int();
986
    /*
      We don't support decrementing auto_increment. It makes the performance
      just cry.
    */
991 if (temp_auto <= share->archive_write.auto_increment &&
992 mkey->flags & HA_NOSAME)
993 {
994 rc= HA_ERR_FOUND_DUPP_KEY;
995 goto error;
996 }
997#ifdef DEAD_CODE
998 /*
999 Bad news, this will cause a search for the unique value which is very
1000 expensive since we will have to do a table scan which will lock up
1001 all other writers during this period. This could perhaps be optimized
1002 in the future.
1003 */
1004 {
1005 /*
1006 First we create a buffer that we can use for reading rows, and can pass
1007 to get_row().
1008 */
1009 if (!(read_buf= (uchar*) my_malloc(table->s->reclength,
1010 MYF(MY_THREAD_SPECIFIC | MY_WME))))
1011 {
1012 rc= HA_ERR_OUT_OF_MEM;
1013 goto error;
1014 }
1015 /*
1016 All of the buffer must be written out or we won't see all of the
1017 data
1018 */
1019 azflush(&(share->archive_write), Z_SYNC_FLUSH);
1020 /*
1021 Set the position of the local read thread to the beginning position.
1022 */
1023 if (read_data_header(&archive))
1024 {
1025 rc= HA_ERR_CRASHED_ON_USAGE;
1026 goto error;
1027 }
1028
1029 Field *mfield= table->next_number_field;
1030
1031 while (!(get_row(&archive, read_buf)))
1032 {
1033 if (!memcmp(read_buf + mfield->offset(record),
1034 table->next_number_field->ptr,
1035 mfield->max_display_length()))
1036 {
1037 rc= HA_ERR_FOUND_DUPP_KEY;
1038 goto error;
1039 }
1040 }
1041 }
1042#endif
1043 else
1044 {
1045 if (temp_auto > share->archive_write.auto_increment)
1046 stats.auto_increment_value=
1047 (share->archive_write.auto_increment= temp_auto) + 1;
1048 }
1049 }
1050
1051 /*
1052 Notice that the global auto_increment has been increased.
1053 In case of a failed row write, we will never try to reuse the value.
1054 */
1055 share->rows_recorded++;
1056 rc= real_write_row(buf, &(share->archive_write));
1057error:
1058 mysql_mutex_unlock(&share->mutex);
1059 my_free(read_buf);
1060 DBUG_RETURN(rc);
1061}
1062
1063
1064void ha_archive::get_auto_increment(ulonglong offset, ulonglong increment,
1065 ulonglong nb_desired_values,
1066 ulonglong *first_value,
1067 ulonglong *nb_reserved_values)
1068{
1069 *nb_reserved_values= ULONGLONG_MAX;
1070 *first_value= share->archive_write.auto_increment + 1;
1071}
1072
1073/* Initialized at each key walk (called multiple times unlike rnd_init()) */
1074int ha_archive::index_init(uint keynr, bool sorted)
1075{
1076 DBUG_ENTER("ha_archive::index_init");
1077 active_index= keynr;
1078 DBUG_RETURN(0);
1079}
1080
1081
/*
  No indexes, so if we get a request for an index search (since we tell
  the optimizer that we have unique indexes), we scan.
*/
1086int ha_archive::index_read(uchar *buf, const uchar *key,
1087 uint key_len, enum ha_rkey_function find_flag)
1088{
1089 int rc;
1090 DBUG_ENTER("ha_archive::index_read");
1091 rc= index_read_idx(buf, active_index, key, key_len, find_flag);
1092 DBUG_RETURN(rc);
1093}
1094
1095
1096int ha_archive::index_read_idx(uchar *buf, uint index, const uchar *key,
1097 uint key_len, enum ha_rkey_function find_flag)
1098{
1099 int rc;
1100 bool found= 0;
1101 KEY *mkey= &table->s->key_info[index];
1102 current_k_offset= mkey->key_part->offset;
1103 current_key= key;
1104 current_key_len= key_len;
1105
1106
1107 DBUG_ENTER("ha_archive::index_read_idx");
1108
1109 rc= rnd_init(TRUE);
1110
1111 if (rc)
1112 goto error;
1113
1114 while (!(get_row(&archive, buf)))
1115 {
1116 if (!memcmp(current_key, buf + current_k_offset, current_key_len))
1117 {
1118 found= 1;
1119 break;
1120 }
1121 }
1122
1123 if (found)
1124 {
1125 /* notify handler that a record has been found */
1126 table->status= 0;
1127 DBUG_RETURN(0);
1128 }
1129
1130error:
1131 DBUG_RETURN(rc ? rc : HA_ERR_END_OF_FILE);
1132}
1133
1134
1135int ha_archive::index_next(uchar * buf)
1136{
1137 bool found= 0;
1138 int rc;
1139
1140 DBUG_ENTER("ha_archive::index_next");
1141
1142 while (!(get_row(&archive, buf)))
1143 {
1144 if (!memcmp(current_key, buf+current_k_offset, current_key_len))
1145 {
1146 found= 1;
1147 break;
1148 }
1149 }
1150
1151 rc= found ? 0 : HA_ERR_END_OF_FILE;
1152 DBUG_RETURN(rc);
1153}
1154
1155/*
1156 All calls that need to scan the table start with this method. If we are told
1157 that it is a table scan we rewind the file to the beginning, otherwise
1158 we assume the position will be set.
1159*/
1160
1161int ha_archive::rnd_init(bool scan)
1162{
1163 DBUG_ENTER("ha_archive::rnd_init");
1164
1165 if (share->crashed)
1166 DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);
1167
1168 if (init_archive_reader())
1169 DBUG_RETURN(errno);
1170
1171 /* We rewind the file so that we can read from the beginning if scan */
1172 if (scan)
1173 {
1174 scan_rows= stats.records;
1175 DBUG_PRINT("info", ("archive will retrieve %llu rows",
1176 (unsigned long long) scan_rows));
1177
1178 if (read_data_header(&archive))
1179 DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);
1180 }
1181
1182 DBUG_RETURN(0);
1183}
1184
1185
1186/*
1187 This is the method that is used to read a row. It assumes that the row is
1188 positioned where you want it.
1189*/
1190int ha_archive::get_row(azio_stream *file_to_read, uchar *buf)
1191{
1192 int rc;
1193 DBUG_ENTER("ha_archive::get_row");
1194 DBUG_PRINT("ha_archive", ("Picking version for get_row() %d -> %d",
1195 (uchar)file_to_read->version,
1196 ARCHIVE_VERSION));
1197 if (file_to_read->version == ARCHIVE_VERSION)
1198 rc= get_row_version3(file_to_read, buf);
1199 else
1200 rc= get_row_version2(file_to_read, buf);
1201
1202 DBUG_PRINT("ha_archive", ("Return %d\n", rc));
1203
1204 DBUG_RETURN(rc);
1205}
1206
1207/* Reallocate buffer if needed */
1208bool ha_archive::fix_rec_buff(unsigned int length)
1209{
1210 DBUG_ENTER("ha_archive::fix_rec_buff");
1211 DBUG_PRINT("ha_archive", ("Fixing %u for %u",
1212 length, record_buffer->length));
1213 DBUG_ASSERT(record_buffer->buffer);
1214
1215 if (length > record_buffer->length)
1216 {
1217 uchar *newptr;
1218 if (!(newptr=(uchar*) my_realloc((uchar*) record_buffer->buffer,
1219 length,
1220 MYF(MY_ALLOW_ZERO_PTR))))
1221 DBUG_RETURN(1);
1222 record_buffer->buffer= newptr;
1223 record_buffer->length= length;
1224 }
1225
1226 DBUG_ASSERT(length <= record_buffer->length);
1227
1228 DBUG_RETURN(0);
1229}
1230
1231int ha_archive::unpack_row(azio_stream *file_to_read, uchar *record)
1232{
1233 DBUG_ENTER("ha_archive::unpack_row");
1234
1235 unsigned int read;
1236 int error;
1237 uchar size_buffer[ARCHIVE_ROW_HEADER_SIZE];
1238 unsigned int row_len;
1239
1240 /* First we grab the length stored */
1241 read= azread(file_to_read, size_buffer, ARCHIVE_ROW_HEADER_SIZE, &error);
1242
1243 if (error == Z_STREAM_ERROR || (read && read < ARCHIVE_ROW_HEADER_SIZE))
1244 DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);
1245
1246 /* If we read nothing we are at the end of the file */
1247 if (read == 0 || read != ARCHIVE_ROW_HEADER_SIZE)
1248 DBUG_RETURN(HA_ERR_END_OF_FILE);
1249
1250 row_len= uint4korr(size_buffer);
1251 DBUG_PRINT("ha_archive",("Unpack row length %u -> %u", row_len,
1252 (unsigned int)table->s->reclength));
1253
1254 if (fix_rec_buff(row_len))
1255 {
1256 DBUG_RETURN(HA_ERR_OUT_OF_MEM);
1257 }
1258 DBUG_ASSERT(row_len <= record_buffer->length);
1259
1260 read= azread(file_to_read, record_buffer->buffer, row_len, &error);
1261
1262 if (read != row_len || error)
1263 {
1264 DBUG_RETURN(error ? HA_ERR_CRASHED_ON_USAGE : HA_ERR_WRONG_IN_RECORD);
1265 }
1266
1267 /* Copy null bits */
1268 const uchar *ptr= record_buffer->buffer, *end= ptr+ row_len;
1269 memcpy(record, ptr, table->s->null_bytes);
1270 ptr+= table->s->null_bytes;
1271 if (ptr > end)
1272 DBUG_RETURN(HA_ERR_WRONG_IN_RECORD);
1273 for (Field **field=table->field ; *field ; field++)
1274 {
1275 if (!((*field)->is_null_in_record(record)))
1276 {
1277 if (!(ptr= (*field)->unpack(record + (*field)->offset(table->record[0]),
1278 ptr, end)))
1279 DBUG_RETURN(HA_ERR_WRONG_IN_RECORD);
1280 }
1281 }
1282 if (ptr != end)
1283 DBUG_RETURN(HA_ERR_WRONG_IN_RECORD);
1284 DBUG_RETURN(0);
1285}
1286
1287
1288int ha_archive::get_row_version3(azio_stream *file_to_read, uchar *buf)
1289{
1290 DBUG_ENTER("ha_archive::get_row_version3");
1291
1292 int returnable= unpack_row(file_to_read, buf);
1293
1294 DBUG_RETURN(returnable);
1295}
1296
1297
1298int ha_archive::get_row_version2(azio_stream *file_to_read, uchar *buf)
1299{
1300 unsigned int read;
1301 int error;
1302 uint *ptr, *end;
1303 char *last;
1304 size_t total_blob_length= 0;
1305 MY_BITMAP *read_set= table->read_set;
1306 DBUG_ENTER("ha_archive::get_row_version2");
1307
1308 read= azread(file_to_read, (voidp)buf, table->s->reclength, &error);
1309
1310 /* If we read nothing we are at the end of the file */
1311 if (read == 0)
1312 DBUG_RETURN(HA_ERR_END_OF_FILE);
1313
1314 if (read != table->s->reclength)
1315 {
1316 DBUG_PRINT("ha_archive::get_row_version2", ("Read %u bytes expected %u",
1317 read,
1318 (unsigned int)table->s->reclength));
1319 DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);
1320 }
1321
1322 if (error == Z_STREAM_ERROR || error == Z_DATA_ERROR )
1323 DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);
1324
1325 /*
1326 If the record is the wrong size, the file is probably damaged, unless
1327 we are dealing with a delayed insert or a bulk insert.
1328 */
1329 if ((ulong) read != table->s->reclength)
1330 DBUG_RETURN(HA_ERR_END_OF_FILE);
1331
1332 /* Calculate blob length, we use this for our buffer */
1333 for (ptr= table->s->blob_field, end=ptr + table->s->blob_fields ;
1334 ptr != end ;
1335 ptr++)
1336 {
1337 if (bitmap_is_set(read_set,
1338 (((Field_blob*) table->field[*ptr])->field_index)))
1339 total_blob_length += ((Field_blob*) table->field[*ptr])->get_length();
1340 }
1341
1342 /* Adjust our row buffer if we need be */
1343 buffer.alloc(total_blob_length);
1344 last= (char *)buffer.ptr();
1345
1346 /* Loop through our blobs and read them */
1347 for (ptr= table->s->blob_field, end=ptr + table->s->blob_fields ;
1348 ptr != end ;
1349 ptr++)
1350 {
1351 size_t size= ((Field_blob*) table->field[*ptr])->get_length();
1352 if (size)
1353 {
1354 if (bitmap_is_set(read_set,
1355 ((Field_blob*) table->field[*ptr])->field_index))
1356 {
1357 read= azread(file_to_read, last, size, &error);
1358
1359 if (error)
1360 DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);
1361
1362 if ((size_t) read != size)
1363 DBUG_RETURN(HA_ERR_END_OF_FILE);
1364 ((Field_blob*) table->field[*ptr])->set_ptr(read, (uchar*) last);
1365 last += size;
1366 }
1367 else
1368 {
1369 (void)azseek(file_to_read, size, SEEK_CUR);
1370 }
1371 }
1372 }
1373 DBUG_RETURN(0);
1374}
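
/*
  Reader's note on the legacy (pre-v3) row format handled above: each row is
  the fixed table->s->reclength record image followed by the contents of any
  non-empty blobs in field order, which matches what pack_row_v1() writes.
*/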
1375
1376
/*
  Called for each row of a scan (for example while gathering rows for
  ORDER BY). The position is either the next sequential row or wherever a
  preceding call to ha_archive::rnd_pos() placed us.
*/
1381
1382int ha_archive::rnd_next(uchar *buf)
1383{
1384 int rc;
1385 DBUG_ENTER("ha_archive::rnd_next");
1386
1387 if (share->crashed)
1388 DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);
1389
1390 if (!scan_rows)
1391 {
1392 rc= HA_ERR_END_OF_FILE;
1393 goto end;
1394 }
1395 scan_rows--;
1396
1397 current_position= aztell(&archive);
1398 rc= get_row(&archive, buf);
1399
1400end:
1401 DBUG_RETURN(rc);
1402}
1403
1404
1405/*
1406 Thanks to the table flag HA_REC_NOT_IN_SEQ this will be called after
1407 each call to ha_archive::rnd_next() if an ordering of the rows is
1408 needed.
1409*/
1410
1411void ha_archive::position(const uchar *record)
1412{
1413 DBUG_ENTER("ha_archive::position");
1414 my_store_ptr(ref, ref_length, current_position);
1415 DBUG_VOID_RETURN;
1416}
1417
1418
1419/*
1420 This is called after a table scan for each row if the results of the
1421 scan need to be ordered. It will take *pos and use it to move the
1422 cursor in the file so that the next row that is called is the
1423 correctly ordered row.
1424*/
1425
1426int ha_archive::rnd_pos(uchar * buf, uchar *pos)
1427{
1428 int rc;
1429 DBUG_ENTER("ha_archive::rnd_pos");
1430 current_position= (my_off_t)my_get_ptr(pos, ref_length);
1431 if (azseek(&archive, current_position, SEEK_SET) == (my_off_t)(-1L))
1432 {
1433 rc= HA_ERR_CRASHED_ON_USAGE;
1434 goto end;
1435 }
1436 rc= get_row(&archive, buf);
1437end:
1438 DBUG_RETURN(rc);
1439}
1440
1441
1442/**
1443 @brief Check for upgrade
1444
1445 @param[in] check_opt check options
1446
1447 @return Completion status
1448 @retval HA_ADMIN_OK No upgrade required
1449 @retval HA_ADMIN_CORRUPT Cannot read meta-data
1450 @retval HA_ADMIN_NEEDS_UPGRADE Upgrade required
1451*/
1452
1453int ha_archive::check_for_upgrade(HA_CHECK_OPT *check_opt)
1454{
1455 DBUG_ENTER("ha_archive::check_for_upgrade");
1456 if (init_archive_reader())
1457 DBUG_RETURN(HA_ADMIN_CORRUPT);
1458 if (archive.version < ARCHIVE_VERSION)
1459 DBUG_RETURN(HA_ADMIN_NEEDS_UPGRADE);
1460 DBUG_RETURN(HA_ADMIN_OK);
1461}
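
/*
  A note on the check above: any data file older than ARCHIVE_VERSION (3) is
  reported as needing an upgrade; a rebuild (for example through repair() or
  optimize() below, which rewrite all rows through a fresh azio writer)
  brings the file up to the current format.
*/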
1462
1463
1464/*
1465 This method repairs the meta file. It does this by walking the datafile and
1466 rewriting the meta file. If EXTENDED repair is requested, we attempt to
1467 recover as much data as possible.
1468*/
1469int ha_archive::repair(THD* thd, HA_CHECK_OPT* check_opt)
1470{
1471 DBUG_ENTER("ha_archive::repair");
1472 int rc= optimize(thd, check_opt);
1473
1474 if (rc)
1475 DBUG_RETURN(HA_ADMIN_CORRUPT);
1476
1477 share->crashed= FALSE;
1478 DBUG_RETURN(0);
1479}
1480
1481/*
1482 The table can become fragmented if data was inserted, read, and then
1483 inserted again. What we do is open up the file and recompress it completely.
1484*/
1485int ha_archive::optimize(THD* thd, HA_CHECK_OPT* check_opt)
1486{
1487 int rc= 0;
1488 azio_stream writer;
1489 char writer_filename[FN_REFLEN];
1490 DBUG_ENTER("ha_archive::optimize");
1491
1492 mysql_mutex_lock(&share->mutex);
1493
1494 if (init_archive_reader())
1495 {
1496 mysql_mutex_unlock(&share->mutex);
1497 DBUG_RETURN(errno);
1498 }
1499
1500 // now we close both our writer and our reader for the rename
1501 if (share->archive_write_open)
1502 {
1503 azclose(&(share->archive_write));
1504 share->archive_write_open= FALSE;
1505 }
1506
1507 /* Lets create a file to contain the new data */
1508 fn_format(writer_filename, share->table_name, "", ARN,
1509 MY_REPLACE_EXT | MY_UNPACK_FILENAME);
1510
1511 if (!(azopen(&writer, writer_filename, O_CREAT|O_RDWR|O_BINARY)))
1512 {
1513 mysql_mutex_unlock(&share->mutex);
1514 DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);
1515 }
1516
1517 /*
1518 Transfer the embedded FRM so that the file can be discoverable.
1519 Write file offset is set to the end of the file.
1520 */
1521 if ((rc= frm_copy(&archive, &writer)))
1522 goto error;
1523
  /*
    An extended rebuild is a lot more effort. We open up each row and
    re-record it. Any dead rows are removed (i.e. rows that may have been
    partially recorded).

    As of Archive format 3 this is the only type of rebuild performed;
    before that it was only done on T_EXTEND.
  */
1531 if (1)
1532 {
1533 DBUG_PRINT("ha_archive", ("archive extended rebuild"));
1534
1535 /*
1536 Now we will rewind the archive file so that we are positioned at the
1537 start of the file.
1538 */
1539 rc= read_data_header(&archive);
1540
    /*
      If the rewind succeeded, we now fetch each row and insert it into the
      new archive file.
    */
1545 if (!rc)
1546 {
1547 share->rows_recorded= 0;
1548 stats.auto_increment_value= 1;
1549 share->archive_write.auto_increment= 0;
1550 my_bitmap_map *org_bitmap= tmp_use_all_columns(table, table->read_set);
1551
1552 while (!(rc= get_row(&archive, table->record[0])))
1553 {
1554 real_write_row(table->record[0], &writer);
1555 /*
1556 Long term it should be possible to optimize this so that
1557 it is not called on each row.
1558 */
1559 if (table->found_next_number_field)
1560 {
1561 Field *field= table->found_next_number_field;
1562 ulonglong auto_value=
1563 (ulonglong) field->val_int(table->record[0] +
1564 field->offset(table->record[0]));
1565 if (share->archive_write.auto_increment < auto_value)
1566 stats.auto_increment_value=
1567 (share->archive_write.auto_increment= auto_value) + 1;
1568 }
1569 }
1570
1571 tmp_restore_column_map(table->read_set, org_bitmap);
1572 share->rows_recorded= (ha_rows)writer.rows;
1573 }
1574
1575 DBUG_PRINT("info", ("recovered %llu archive rows",
1576 (unsigned long long)share->rows_recorded));
1577
1578 DBUG_PRINT("ha_archive", ("recovered %llu archive rows",
1579 (unsigned long long)share->rows_recorded));
1580
    /*
      If REPAIR ... EXTENDED is requested, try to recover as much data
      from the data file as possible. In this case if we fail to read a
      record, we assume EOF. This allows massive data loss, but we can
      hardly do more with a broken zlib stream. And this is the only way
      to restore at least what is still recoverable.
    */
1588 if (rc && rc != HA_ERR_END_OF_FILE && !(check_opt->flags & T_EXTEND))
1589 goto error;
1590 }
1591
1592 azclose(&writer);
1593 share->dirty= FALSE;
1594
1595 azclose(&archive);
1596
1597 // make the file we just wrote be our data file
1598 rc= my_rename(writer_filename, share->data_file_name, MYF(0));
1599
1600
1601 mysql_mutex_unlock(&share->mutex);
1602 DBUG_RETURN(rc);
1603error:
1604 DBUG_PRINT("ha_archive", ("Failed to recover, error was %d", rc));
1605 azclose(&writer);
1606 mysql_mutex_unlock(&share->mutex);
1607
1608 DBUG_RETURN(rc);
1609}
1610
/*
  Below is an example of how to set up row level locking.
*/
1614THR_LOCK_DATA **ha_archive::store_lock(THD *thd,
1615 THR_LOCK_DATA **to,
1616 enum thr_lock_type lock_type)
1617{
1618 if (lock_type == TL_WRITE_DELAYED)
1619 delayed_insert= TRUE;
1620 else
1621 delayed_insert= FALSE;
1622
1623 if (lock_type != TL_IGNORE && lock.type == TL_UNLOCK)
1624 {
    /*
      Here is where we get into the guts of a row level lock.
      If TL_UNLOCK is set and we are not doing a LOCK TABLE, DELAYED LOCK
      or DISCARD/IMPORT TABLESPACE, then allow multiple writers.
    */
1631
1632 if ((lock_type >= TL_WRITE_CONCURRENT_INSERT &&
1633 lock_type <= TL_WRITE) && delayed_insert == FALSE &&
1634 !thd_in_lock_tables(thd)
1635 && !thd_tablespace_op(thd))
1636 lock_type = TL_WRITE_ALLOW_WRITE;
1637
1638 /*
1639 In queries of type INSERT INTO t1 SELECT ... FROM t2 ...
1640 MySQL would use the lock TL_READ_NO_INSERT on t2, and that
1641 would conflict with TL_WRITE_ALLOW_WRITE, blocking all inserts
1642 to t2. Convert the lock to a normal read lock to allow
1643 concurrent inserts to t2.
1644 */
1645
1646 if (lock_type == TL_READ_NO_INSERT && !thd_in_lock_tables(thd))
1647 lock_type = TL_READ;
1648
1649 lock.type=lock_type;
1650 }
1651
1652 *to++= &lock;
1653
1654 return to;
1655}
1656
1657void ha_archive::update_create_info(HA_CREATE_INFO *create_info)
1658{
1659 char tmp_real_path[FN_REFLEN];
1660 DBUG_ENTER("ha_archive::update_create_info");
1661
1662 ha_archive::info(HA_STATUS_AUTO);
1663 if (!(create_info->used_fields & HA_CREATE_USED_AUTO))
1664 {
1665 create_info->auto_increment_value= stats.auto_increment_value;
1666 }
1667
1668 if (!(my_readlink(tmp_real_path, share->data_file_name, MYF(0))))
1669 create_info->data_file_name= thd_strdup(ha_thd(), tmp_real_path);
1670
1671 DBUG_VOID_RETURN;
1672}
1673
1674
1675/*
1676 Hints for optimizer, see ha_tina for more information
1677*/
1678int ha_archive::info(uint flag)
1679{
1680 DBUG_ENTER("ha_archive::info");
1681
1682 mysql_mutex_lock(&share->mutex);
1683 if (share->dirty)
1684 {
1685 DBUG_PRINT("ha_archive", ("archive flushing out rows for scan"));
1686 DBUG_ASSERT(share->archive_write_open);
1687 azflush(&(share->archive_write), Z_SYNC_FLUSH);
1688 share->dirty= FALSE;
1689 }
1690
  /*
    This should be an accurate number now, though bulk and delayed inserts
    can cause it to lag temporarily.
  */
1695 stats.records= share->rows_recorded;
1696 mysql_mutex_unlock(&share->mutex);
1697
1698 stats.deleted= 0;
1699
1700 DBUG_PRINT("ha_archive", ("Stats rows is %d\n", (int)stats.records));
1701 /* Costs quite a bit more to get all information */
1702 if (flag & (HA_STATUS_TIME | HA_STATUS_CONST | HA_STATUS_VARIABLE))
1703 {
1704 MY_STAT file_stat; // Stat information for the data file
1705
1706 (void) mysql_file_stat(/* arch_key_file_data */ 0, share->data_file_name, &file_stat, MYF(MY_WME));
1707
1708 if (flag & HA_STATUS_TIME)
1709 stats.update_time= (ulong) file_stat.st_mtime;
1710 if (flag & HA_STATUS_CONST)
1711 {
1712 stats.max_data_file_length= MAX_FILE_SIZE;
1713 stats.create_time= (ulong) file_stat.st_ctime;
1714 }
1715 if (flag & HA_STATUS_VARIABLE)
1716 {
1717 stats.delete_length= 0;
1718 stats.data_file_length= file_stat.st_size;
1719 stats.index_file_length=0;
1720 stats.mean_rec_length= stats.records ?
1721 ulong(stats.data_file_length / stats.records) : table->s->reclength;
1722 }
1723 }
1724
1725 if (flag & HA_STATUS_AUTO)
1726 {
1727 if (init_archive_reader())
1728 DBUG_RETURN(errno);
1729
1730 mysql_mutex_lock(&share->mutex);
1731 azflush(&archive, Z_SYNC_FLUSH);
1732 mysql_mutex_unlock(&share->mutex);
1733 stats.auto_increment_value= archive.auto_increment + 1;
1734 }
1735
1736 DBUG_RETURN(0);
1737}
1738
1739
/*
  This method tells us that a bulk insert operation is about to occur. We set
  a flag which will keep write_row from saying that its data is dirty. This in
  turn will keep selects from causing a sync to occur.
  Basically, yet another optimization to keep compression working well.
*/
1746void ha_archive::start_bulk_insert(ha_rows rows, uint flags)
1747{
1748 DBUG_ENTER("ha_archive::start_bulk_insert");
1749 if (!rows || rows >= ARCHIVE_MIN_ROWS_TO_USE_BULK_INSERT)
1750 bulk_insert= TRUE;
1751 DBUG_VOID_RETURN;
1752}
1753
1754
/*
  The other side of start_bulk_insert() is end_bulk_insert(). Here we turn
  off the bulk insert flag and set the share dirty so that the next select
  will call sync for us.
*/
1759int ha_archive::end_bulk_insert()
1760{
1761 DBUG_ENTER("ha_archive::end_bulk_insert");
1762 bulk_insert= FALSE;
1763 mysql_mutex_lock(&share->mutex);
1764 if (share->archive_write_open)
1765 share->dirty= true;
1766 mysql_mutex_unlock(&share->mutex);
1767 DBUG_RETURN(0);
1768}
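
/*
  A minimal sketch of the sequence driven from the server side for a
  multi-row load (assuming the usual handler API flow): start_bulk_insert(n)
  is called once with the estimated row count, write_row() once per row, and
  end_bulk_insert() once at the end, which marks the share dirty so that the
  next scan flushes the azio writer.
*/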
1769
1770/*
1771 We cancel a truncate command. The only way to delete an archive table is to drop it.
1772 This is done for security reasons. In a later version we will enable this by
1773 allowing the user to select a different row format.
1774*/
1775int ha_archive::truncate()
1776{
1777 DBUG_ENTER("ha_archive::truncate");
1778 DBUG_RETURN(HA_ERR_WRONG_COMMAND);
1779}
1780
1781/*
1782 We just return state if asked.
1783*/
1784bool ha_archive::is_crashed() const
1785{
1786 DBUG_ENTER("ha_archive::is_crashed");
1787 DBUG_RETURN(share->crashed);
1788}
1789
1790/*
1791 Simple scan of the tables to make sure everything is ok.
1792*/
1793
1794int ha_archive::check(THD* thd, HA_CHECK_OPT* check_opt)
1795{
1796 int rc= 0;
1797 const char *old_proc_info;
1798 ha_rows count;
1799 DBUG_ENTER("ha_archive::check");
1800
1801 old_proc_info= thd_proc_info(thd, "Checking table");
1802 mysql_mutex_lock(&share->mutex);
1803 count= share->rows_recorded;
1804 /* Flush any waiting data */
1805 if (share->archive_write_open)
1806 azflush(&(share->archive_write), Z_SYNC_FLUSH);
1807 mysql_mutex_unlock(&share->mutex);
1808
1809 if (init_archive_reader())
1810 DBUG_RETURN(HA_ADMIN_CORRUPT);
1811 /*
1812 Now we will rewind the archive file so that we are positioned at the
1813 start of the file.
1814 */
1815 read_data_header(&archive);
1816 for (ha_rows cur_count= count; cur_count; cur_count--)
1817 {
1818 if ((rc= get_row(&archive, table->record[0])))
1819 goto error;
1820 }
1821 /*
1822 Now read records that may have been inserted concurrently.
1823 Acquire share->mutex so tail of the table is not modified by
1824 concurrent writers.
1825 */
1826 mysql_mutex_lock(&share->mutex);
1827 count= share->rows_recorded - count;
1828 if (share->archive_write_open)
1829 azflush(&(share->archive_write), Z_SYNC_FLUSH);
1830 while (!(rc= get_row(&archive, table->record[0])))
1831 count--;
1832 mysql_mutex_unlock(&share->mutex);
1833
1834 if ((rc && rc != HA_ERR_END_OF_FILE) || count)
1835 goto error;
1836
1837 thd_proc_info(thd, old_proc_info);
1838 DBUG_RETURN(HA_ADMIN_OK);
1839
1840error:
1841 thd_proc_info(thd, old_proc_info);
1842 share->crashed= FALSE;
1843 DBUG_RETURN(HA_ADMIN_CORRUPT);
1844}
1845
1846/*
1847 Check and repair the table if needed.
1848*/
1849bool ha_archive::check_and_repair(THD *thd)
1850{
1851 HA_CHECK_OPT check_opt;
1852 DBUG_ENTER("ha_archive::check_and_repair");
1853
1854 check_opt.init();
1855
1856 DBUG_RETURN(repair(thd, &check_opt));
1857}
1858
1859archive_record_buffer *ha_archive::create_record_buffer(unsigned int length)
1860{
1861 DBUG_ENTER("ha_archive::create_record_buffer");
1862 archive_record_buffer *r;
1863 if (!(r=
1864 (archive_record_buffer*) my_malloc(sizeof(archive_record_buffer),
1865 MYF(MY_WME))))
1866 {
1867 DBUG_RETURN(NULL); /* purecov: inspected */
1868 }
1869 r->length= (int)length;
1870
1871 if (!(r->buffer= (uchar*) my_malloc(r->length,
1872 MYF(MY_WME))))
1873 {
1874 my_free(r);
1875 DBUG_RETURN(NULL); /* purecov: inspected */
1876 }
1877
1878 DBUG_RETURN(r);
1879}
1880
1881void ha_archive::destroy_record_buffer(archive_record_buffer *r)
1882{
1883 DBUG_ENTER("ha_archive::destroy_record_buffer");
1884 my_free(r->buffer);
1885 my_free(r);
1886 DBUG_VOID_RETURN;
1887}
1888
/*
  In archive *any* ALTER should cause the table to be rebuilt;
  no ALTER can be frm-only, because after any change to the frm file
  archive must update the frm image in the ARZ file. This cannot be done
  in place; it requires the ARZ file to be recreated from scratch.
*/
1896bool ha_archive::check_if_incompatible_data(HA_CREATE_INFO *info_arg,
1897 uint table_changes)
1898{
1899 return COMPATIBLE_DATA_NO;
1900}
1901
1902
1903struct st_mysql_storage_engine archive_storage_engine=
1904{ MYSQL_HANDLERTON_INTERFACE_VERSION };
1905
1906maria_declare_plugin(archive)
1907{
1908 MYSQL_STORAGE_ENGINE_PLUGIN,
1909 &archive_storage_engine,
1910 "ARCHIVE",
1911 "Brian Aker, MySQL AB",
1912 "Archive storage engine",
1913 PLUGIN_LICENSE_GPL,
1914 archive_db_init, /* Plugin Init */
1915 NULL, /* Plugin Deinit */
1916 0x0300 /* 3.0 */,
1917 NULL, /* status variables */
1918 NULL, /* system variables */
1919 "1.0", /* string version */
1920 MariaDB_PLUGIN_MATURITY_STABLE /* maturity */
1921}
1922maria_declare_plugin_end;
1923
1924