1/* -*- c-basic-offset: 2 -*- */
2/*
3 Copyright(C) 2015 Kouhei Sutou <kou@clear-code.com>
4
5 This library is free software; you can redistribute it and/or
6 modify it under the terms of the GNU Lesser General Public
7 License as published by the Free Software Foundation; either
8 version 2.1 of the License, or (at your option) any later version.
9
10 This library is distributed in the hope that it will be useful,
11 but WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 Lesser General Public License for more details.
14
15 You should have received a copy of the GNU Lesser General Public
16 License along with this library; if not, write to the Free Software
17 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
18*/
19
20#include <mrn_mysql.h>
21
22#include <string.h>
23
24#include "mrn_operations.hpp"
25
26// for debug
27#define MRN_CLASS_NAME "mrn::Operations"
28
29#define TABLE_NAME "mroonga_operations"
30#define COLUMN_TYPE_NAME "type"
31#define COLUMN_TABLE_NAME "table"
32#define COLUMN_RECORD_NAME "record"
33
34namespace mrn {
35 Operations::Operations(grn_ctx *ctx)
36 : ctx_(ctx) {
37 MRN_DBUG_ENTER_METHOD();
38
39 GRN_TEXT_INIT(&text_buffer_, GRN_OBJ_DO_SHALLOW_COPY);
40 GRN_UINT32_INIT(&id_buffer_, 0);
41
42 table_ = grn_ctx_get(ctx_, TABLE_NAME, -1);
43 if (!table_) {
44 table_ = grn_table_create(ctx_,
45 TABLE_NAME, strlen(TABLE_NAME),
46 NULL,
47 GRN_OBJ_TABLE_NO_KEY | GRN_OBJ_PERSISTENT,
48 NULL, NULL);
49 columns_.type_ =
50 grn_column_create(ctx_, table_,
51 COLUMN_TYPE_NAME, strlen(COLUMN_TYPE_NAME),
52 NULL,
53 GRN_OBJ_COLUMN_SCALAR | GRN_OBJ_PERSISTENT,
54 grn_ctx_at(ctx_, GRN_DB_SHORT_TEXT));
55 columns_.table_ =
56 grn_column_create(ctx_, table_,
57 COLUMN_TABLE_NAME, strlen(COLUMN_TABLE_NAME),
58 NULL,
59 GRN_OBJ_COLUMN_SCALAR | GRN_OBJ_PERSISTENT,
60 grn_ctx_at(ctx_, GRN_DB_SHORT_TEXT));
61 columns_.record_ =
62 grn_column_create(ctx_, table_,
63 COLUMN_RECORD_NAME, strlen(COLUMN_RECORD_NAME),
64 NULL,
65 GRN_OBJ_COLUMN_SCALAR | GRN_OBJ_PERSISTENT,
66 grn_ctx_at(ctx_, GRN_DB_UINT32));
67 } else {
68 columns_.type_ = grn_ctx_get(ctx_, TABLE_NAME "." COLUMN_TYPE_NAME, -1);
69 columns_.table_ = grn_ctx_get(ctx_, TABLE_NAME "." COLUMN_TABLE_NAME, -1);
70 columns_.record_ = grn_ctx_get(ctx_, TABLE_NAME "." COLUMN_RECORD_NAME, -1);
71 }
72
73 is_enabled_recording_ = true;
74
75 DBUG_VOID_RETURN;
76 }
77
78 Operations::~Operations() {
79 MRN_DBUG_ENTER_METHOD();
80
81 GRN_OBJ_FIN(ctx_, &id_buffer_);
82 GRN_OBJ_FIN(ctx_, &text_buffer_);
83
84 DBUG_VOID_RETURN;
85 }
86
87 bool Operations::is_locked() {
88 MRN_DBUG_ENTER_METHOD();
89
90 if (grn_obj_is_locked(ctx_, table_) > 0)
91 DBUG_RETURN(true);
92
93 if (grn_obj_is_locked(ctx_, columns_.type_) > 0)
94 DBUG_RETURN(true);
95
96 if (grn_obj_is_locked(ctx_, columns_.table_) > 0)
97 DBUG_RETURN(true);
98
99 if (grn_obj_is_locked(ctx_, columns_.record_) > 0)
100 DBUG_RETURN(true);
101
102 DBUG_RETURN(false);
103 }
104
105 grn_id Operations::start(const char *type,
106 const char *table_name, size_t table_name_size) {
107 MRN_DBUG_ENTER_METHOD();
108
109 if (!is_enabled_recording_) {
110 DBUG_RETURN(GRN_ID_NIL);
111 }
112
113 grn_id id = grn_table_add(ctx_, table_, NULL, 0, NULL);
114
115 GRN_TEXT_SETS(ctx_, &text_buffer_, type);
116 grn_obj_set_value(ctx_, columns_.type_, id, &text_buffer_, GRN_OBJ_SET);
117
118 GRN_TEXT_SET(ctx_, &text_buffer_, table_name, table_name_size);
119 grn_obj_set_value(ctx_, columns_.table_, id, &text_buffer_, GRN_OBJ_SET);
120
121 DBUG_RETURN(id);
122 }
123
124 void Operations::record_target(grn_id id, grn_id record_id) {
125 MRN_DBUG_ENTER_METHOD();
126
127 if (!is_enabled_recording_) {
128 DBUG_VOID_RETURN;
129 }
130
131 GRN_UINT32_SET(ctx_, &id_buffer_, record_id);
132 grn_obj_set_value(ctx_, columns_.record_, id, &id_buffer_, GRN_OBJ_SET);
133
134 DBUG_VOID_RETURN;
135 }
136
137 void Operations::finish(grn_id id) {
138 MRN_DBUG_ENTER_METHOD();
139
140 if (!is_enabled_recording_) {
141 DBUG_VOID_RETURN;
142 }
143
144 grn_table_delete_by_id(ctx_, table_, id);
145
146 DBUG_VOID_RETURN;
147 }
148
149 void Operations::enable_recording() {
150 MRN_DBUG_ENTER_METHOD();
151
152 is_enabled_recording_ = true;
153
154 DBUG_VOID_RETURN;
155 }
156
157 void Operations::disable_recording() {
158 MRN_DBUG_ENTER_METHOD();
159
160 is_enabled_recording_ = false;
161
162 DBUG_VOID_RETURN;
163 }
164
165 grn_hash *Operations::collect_processing_table_names() {
166 MRN_DBUG_ENTER_METHOD();
167
168 grn_hash *table_names =
169 grn_hash_create(ctx_, NULL, GRN_TABLE_MAX_KEY_SIZE, 0,
170 GRN_OBJ_TABLE_HASH_KEY | GRN_OBJ_KEY_VAR_SIZE);
171
172 grn_table_cursor *cursor;
173 cursor = grn_table_cursor_open(ctx_, table_, NULL, 0, NULL, 0, 0, -1, 0);
174 if (!cursor) {
175 GRN_LOG(ctx_, GRN_LOG_NOTICE,
176 "[operations] failed to open cursor: %s",
177 ctx_->errbuf);
178 DBUG_RETURN(table_names);
179 }
180
181 grn_id id;
182 while ((id = grn_table_cursor_next(ctx_, cursor))) {
183 GRN_BULK_REWIND(&text_buffer_);
184 grn_obj_get_value(ctx_, columns_.table_, id, &text_buffer_);
185 if (GRN_TEXT_LEN(&text_buffer_) > 0) {
186 grn_hash_add(ctx_, table_names,
187 GRN_TEXT_VALUE(&text_buffer_),
188 GRN_TEXT_LEN(&text_buffer_),
189 NULL,
190 NULL);
191 }
192 }
193 grn_table_cursor_close(ctx_, cursor);
194
195 DBUG_RETURN(table_names);
196 }
197
198 int Operations::repair(const char *table_name, size_t table_name_size) {
199 MRN_DBUG_ENTER_METHOD();
200
201 int error = 0;
202
203 grn_table_cursor *cursor;
204 cursor = grn_table_cursor_open(ctx_, table_, NULL, 0, NULL, 0, 0, -1, 0);
205 if (!cursor) {
206 error = HA_ERR_CRASHED_ON_USAGE;
207 if (ctx_->rc) {
208 my_message(error, ctx_->errbuf, MYF(0));
209 } else {
210 my_message(error,
211 "mroonga: repair: "
212 "failed to open cursor for operations table",
213 MYF(0));
214 }
215 DBUG_RETURN(error);
216 }
217
218 grn_obj *target_table = grn_ctx_get(ctx_, table_name, table_name_size);
219 if (!target_table) {
220 GRN_LOG(ctx_, GRN_LOG_WARNING,
221 "table doesn't exist for auto repair: <%.*s>",
222 static_cast<int>(table_name_size), table_name);
223 }
224
225 grn_id id;
226 while ((id = grn_table_cursor_next(ctx_, cursor))) {
227 GRN_BULK_REWIND(&text_buffer_);
228 grn_obj_get_value(ctx_, columns_.table_, id, &text_buffer_);
229 if (!((static_cast<size_t>(GRN_TEXT_LEN(&text_buffer_)) ==
230 table_name_size) &&
231 memcmp(GRN_TEXT_VALUE(&text_buffer_),
232 table_name,
233 table_name_size) == 0)) {
234 continue;
235 }
236
237 if (!target_table) {
238 grn_rc rc = grn_table_cursor_delete(ctx_, cursor);
239 if (rc != GRN_SUCCESS) {
240 GRN_BULK_REWIND(&text_buffer_);
241 grn_obj_get_value(ctx_, columns_.type_, id, &text_buffer_);
242 GRN_TEXT_PUTC(ctx_, &text_buffer_, '\0');
243 char error_message[MRN_MESSAGE_BUFFER_SIZE];
244 snprintf(error_message, MRN_MESSAGE_BUFFER_SIZE,
245 "mroonga: repair: failed to delete an orphan operation: "
246 "[%u]: <%.*s>[%s]: <%s>(%d)",
247 id,
248 static_cast<int>(table_name_size), table_name,
249 GRN_TEXT_VALUE(&text_buffer_),
250 ctx_->errbuf,
251 rc);
252 my_message(error, error_message, MYF(0));
253 break;
254 }
255 continue;
256 }
257
258 GRN_BULK_REWIND(&id_buffer_);
259 grn_obj_get_value(ctx_, columns_.record_, id, &id_buffer_);
260 grn_id record_id = GRN_UINT32_VALUE(&id_buffer_);
261 if (record_id == GRN_ID_NIL) {
262 grn_rc rc = grn_table_cursor_delete(ctx_, cursor);
263 if (rc != GRN_SUCCESS) {
264 GRN_BULK_REWIND(&text_buffer_);
265 grn_obj_get_value(ctx_, columns_.type_, id, &text_buffer_);
266 GRN_TEXT_PUTC(ctx_, &text_buffer_, '\0');
267 char error_message[MRN_MESSAGE_BUFFER_SIZE];
268 snprintf(error_message, MRN_MESSAGE_BUFFER_SIZE,
269 "mroonga: repair: "
270 "failed to delete an operation that has no related record: "
271 "[%u]: <%.*s>[%s]: <%s>(%d)",
272 id,
273 static_cast<int>(table_name_size), table_name,
274 GRN_TEXT_VALUE(&text_buffer_),
275 ctx_->errbuf,
276 rc);
277 my_message(error, error_message, MYF(0));
278 break;
279 }
280 continue;
281 }
282
283 GRN_BULK_REWIND(&text_buffer_);
284 grn_obj_get_value(ctx_, columns_.type_, id, &text_buffer_);
285 GRN_TEXT_PUTC(ctx_, &text_buffer_, '\0');
286 if (strcmp(GRN_TEXT_VALUE(&text_buffer_), "write") == 0 ||
287 strcmp(GRN_TEXT_VALUE(&text_buffer_), "delete") == 0) {
288 grn_rc rc = grn_table_delete_by_id(ctx_, target_table, record_id);
289 if (rc != GRN_SUCCESS) {
290 error = HA_ERR_CRASHED_ON_USAGE;
291 char error_message[MRN_MESSAGE_BUFFER_SIZE];
292 snprintf(error_message, MRN_MESSAGE_BUFFER_SIZE,
293 "mroonga: repair: failed to delete an incomplete record: "
294 "[%u]: <%.*s>[%u]: <%s>(%d)",
295 id,
296 static_cast<int>(table_name_size), table_name,
297 record_id,
298 ctx_->errbuf,
299 rc);
300 my_message(error, error_message, MYF(0));
301 break;
302 }
303
304 rc = grn_table_cursor_delete(ctx_, cursor);
305 if (rc != GRN_SUCCESS) {
306 error = HA_ERR_CRASHED_ON_USAGE;
307 char error_message[MRN_MESSAGE_BUFFER_SIZE];
308 snprintf(error_message, MRN_MESSAGE_BUFFER_SIZE,
309 "mroonga: repair: failed to delete an incomplete operation: "
310 "[%u]: <%.*s>[%u][%s]: <%s>(%d)",
311 id,
312 static_cast<int>(table_name_size), table_name,
313 record_id,
314 GRN_TEXT_VALUE(&text_buffer_),
315 ctx_->errbuf,
316 rc);
317 my_message(error, error_message, MYF(0));
318 break;
319 }
320 } else if (strcmp(GRN_TEXT_VALUE(&text_buffer_), "update") == 0) {
321 error = HA_ERR_CRASHED_ON_USAGE;
322 my_message(error,
323 "mroonga: repair: can't recover from crash while updating",
324 MYF(0));
325 break;
326 } else {
327 error = HA_ERR_CRASHED_ON_USAGE;
328 char error_message[MRN_MESSAGE_BUFFER_SIZE];
329 snprintf(error_message, MRN_MESSAGE_BUFFER_SIZE,
330 "mroonga: repair: unknown operation type: "
331 "[%u]: <%.*s>[%u]: <%s>",
332 id,
333 static_cast<int>(table_name_size), table_name,
334 record_id,
335 GRN_TEXT_VALUE(&text_buffer_));
336 my_message(error, error_message, MYF(0));
337 break;
338 }
339 }
340 grn_table_cursor_close(ctx_, cursor);
341
342 DBUG_RETURN(error);
343 }
344
345 int Operations::clear(const char *table_name, size_t table_name_size) {
346 MRN_DBUG_ENTER_METHOD();
347
348 int error = 0;
349
350 grn_table_cursor *cursor;
351 cursor = grn_table_cursor_open(ctx_, table_, NULL, 0, NULL, 0, 0, -1, 0);
352 if (!cursor) {
353 error = HA_ERR_CRASHED_ON_USAGE;
354 if (ctx_->rc) {
355 my_message(error, ctx_->errbuf, MYF(0));
356 } else {
357 my_message(error,
358 "mroonga: clear: "
359 "failed to open cursor for operations table",
360 MYF(0));
361 }
362 DBUG_RETURN(error);
363 }
364
365 grn_id id;
366 while ((id = grn_table_cursor_next(ctx_, cursor))) {
367 GRN_BULK_REWIND(&text_buffer_);
368 grn_obj_get_value(ctx_, columns_.table_, id, &text_buffer_);
369 if ((static_cast<size_t>(GRN_TEXT_LEN(&text_buffer_)) ==
370 table_name_size) &&
371 memcmp(GRN_TEXT_VALUE(&text_buffer_),
372 table_name,
373 table_name_size) == 0) {
374 grn_rc rc = grn_table_cursor_delete(ctx_, cursor);
375 if (rc != GRN_SUCCESS) {
376 error = HA_ERR_CRASHED_ON_USAGE;
377 GRN_BULK_REWIND(&id_buffer_);
378 grn_obj_get_value(ctx_, columns_.record_, id, &id_buffer_);
379 GRN_BULK_REWIND(&text_buffer_);
380 grn_obj_get_value(ctx_, columns_.type_, id, &text_buffer_);
381 GRN_TEXT_PUTC(ctx_, &text_buffer_, '\0');
382 char error_message[MRN_MESSAGE_BUFFER_SIZE];
383 snprintf(error_message, MRN_MESSAGE_BUFFER_SIZE,
384 "mroonga: clear: failed to delete an operation: "
385 "[%u]: <%.*s>[%u][%s]: <%s>(%d)",
386 id,
387 static_cast<int>(table_name_size), table_name,
388 GRN_UINT32_VALUE(&id_buffer_),
389 GRN_TEXT_VALUE(&text_buffer_),
390 ctx_->errbuf,
391 rc);
392 my_message(error, error_message, MYF(0));
393 break;
394 }
395 }
396 }
397 grn_table_cursor_close(ctx_, cursor);
398
399 DBUG_RETURN(error);
400 }
401}
402