1
2// vim:sw=2:ai
3
4/*
5 * Copyright (C) 2010-2011 DeNA Co.,Ltd.. All rights reserved.
6 * Copyright (C) 2011-2017 Kentoku SHIBA
7 * See COPYRIGHT.txt for details.
8 */
9
10#include <my_global.h>
11#include "mysql_version.h"
12#include "hs_compat.h"
13#if MYSQL_VERSION_ID < 50500
14#include "mysql_priv.h"
15#include <mysql/plugin.h>
16#else
17#include "sql_priv.h"
18#include "probes_mysql.h"
19#include "sql_class.h"
20#endif
21
22#include "hstcpcli.hpp"
23#include "auto_file.hpp"
24#include "string_util.hpp"
25#include "auto_addrinfo.hpp"
26#include "escape.hpp"
27#include "util.hpp"
28
/*
 * TODO (marker carried over from the original, no detail given).
 *
 * MSG_NOSIGNAL is handed to send() in hstcpcli::request_send(). When the
 * headers do not provide it and the platform is neither Linux nor FreeBSD,
 * fall back to 0 so the flag has no effect.
 */
#if !defined(MSG_NOSIGNAL) && !defined(__linux__) && !defined(__FreeBSD__)
#define MSG_NOSIGNAL 0
#endif

/* Debug trace hook; expands to nothing, so DBG(...) statements vanish. */
#define DBG(x)
35
36namespace dena {
37
38hstresult::hstresult()
39{
40 SPD_INIT_DYNAMIC_ARRAY2(&flds, sizeof(string_ref), NULL, 16, 16,
41 MYF(MY_WME));
42}
43
44hstresult::~hstresult()
45{
46 delete_dynamic(&flds);
47}
48
49struct hstcpcli : public hstcpcli_i, private noncopyable {
50 hstcpcli(const socket_args& args);
51 virtual ~hstcpcli();
52 virtual void close();
53 virtual int reconnect();
54 virtual bool stable_point();
55 virtual void request_buf_open_index(size_t pst_id, const char *dbn,
56 const char *tbl, const char *idx, const char *retflds, const char *filflds);
57 virtual void request_buf_auth(const char *secret, const char *typ);
58 virtual void request_buf_exec_generic(size_t pst_id, const string_ref& op,
59 const string_ref *kvs, size_t kvslen, uint32 limit, uint32 skip,
60 const string_ref& mod_op, const string_ref *mvs, size_t mvslen,
61 const hstcpcli_filter *fils, size_t filslen, int invalues_keypart,
62 const string_ref *invalues, size_t invalueslen);
63 virtual size_t request_buf_append(const char *start, const char *finish);
64 virtual void request_reset();
65 virtual int request_send();
66 virtual int response_recv(size_t& num_flds_r);
67 virtual int get_result(hstresult& result);
68 virtual const string_ref *get_next_row();
69 virtual const string_ref *get_next_row_from_result(hstresult& result);
70 virtual void response_buf_remove();
71 virtual int get_error_code();
72 virtual String& get_error();
73 virtual void clear_error();
74 virtual int set_timeout(int send_timeout, int recv_timeout);
75 virtual size_t get_num_req_bufd() { return num_req_bufd; }
76 virtual size_t get_num_req_sent() { return num_req_sent; }
77 virtual size_t get_num_req_rcvd() { return num_req_rcvd; }
78 virtual size_t get_response_end_offset() { return response_end_offset; }
79 virtual const char *get_readbuf_begin() { return readbuf.begin(); }
80 virtual const char *get_readbuf_end() { return readbuf.end(); }
81 virtual const char *get_writebuf_begin() { return writebuf.begin(); }
82 virtual size_t get_writebuf_size() { return writebuf.size(); }
83 virtual void write_error_to_log(const char *func_name, const char *file_name,
84 ulong line_no);
85 private:
86 int read_more();
87 int set_error(int code, const String& str);
88 int set_error(int code, const char *str);
89 private:
90 auto_file fd;
91 socket_args sargs;
92 string_buffer readbuf;
93 string_buffer writebuf;
94 size_t response_end_offset; /* incl newline */
95 size_t cur_row_offset;
96 size_t num_flds;
97 size_t num_req_bufd; /* buffered but not yet sent */
98 size_t num_req_sent; /* sent but not yet received */
99 size_t num_req_rcvd; /* received but not yet removed */
100 int error_code;
101 String error_str;
102 DYNAMIC_ARRAY flds;
103 int errno_buf;
104};
105
106hstcpcli::hstcpcli(const socket_args& args)
107 : sargs(args), response_end_offset(0), cur_row_offset(0), num_flds(0),
108 num_req_bufd(0), num_req_sent(0), num_req_rcvd(0), error_code(0), errno_buf(0)
109{
110 String err;
111 SPD_INIT_DYNAMIC_ARRAY2(&flds, sizeof(string_ref), NULL, 16, 16, MYF(MY_WME));
112 if (socket_connect(fd, sargs, err) != 0) {
113 set_error(-1, err);
114 }
115}
116
117hstcpcli::~hstcpcli()
118{
119 delete_dynamic(&flds);
120}
121
122void
123hstcpcli::close()
124{
125 fd.close();
126 readbuf.clear();
127 writebuf.clear();
128 response_end_offset = 0;
129 cur_row_offset = 0;
130 num_flds = 0;
131 num_req_bufd = 0;
132 num_req_sent = 0;
133 num_req_rcvd = 0;
134}
135
136int
137hstcpcli::reconnect()
138{
139 clear_error();
140 close();
141 String err;
142 if (socket_connect(fd, sargs, err) != 0) {
143 set_error(-1, err);
144 }
145 return error_code;
146}
147
148int
149hstcpcli::set_timeout(int send_timeout, int recv_timeout)
150{
151 String err;
152 sargs.send_timeout = send_timeout;
153 sargs.recv_timeout = recv_timeout;
154 if (socket_set_timeout(fd, sargs, err) != 0) {
155 set_error(-1, err);
156 }
157 return error_code;
158}
159
160bool
161hstcpcli::stable_point()
162{
163 /* returns true if cli can send a new request */
164 return fd.get() >= 0 && num_req_bufd == 0 && num_req_sent == 0 &&
165 num_req_rcvd == 0 && response_end_offset == 0;
166}
167
168int
169hstcpcli::get_error_code()
170{
171 return error_code;
172}
173
174String&
175hstcpcli::get_error()
176{
177 return error_str;
178}
179
180int
181hstcpcli::read_more()
182{
183 const size_t block_size = 4096; // FIXME
184 char *const wp = readbuf.make_space(block_size);
185 int rlen;
186 errno = 0;
187 while ((rlen = read(fd.get(), wp, block_size)) <= 0) {
188 errno_buf = errno;
189 if (rlen < 0) {
190 if (errno == EINTR || errno == EAGAIN)
191 {
192 errno = 0;
193 continue;
194 }
195 error_str = String("read: failed", &my_charset_bin);
196 } else {
197 error_str = String("read: eof", &my_charset_bin);
198 }
199 return rlen;
200 }
201 readbuf.space_wrote(rlen);
202 return rlen;
203}
204
205void
206hstcpcli::clear_error()
207{
208 DBG(fprintf(stderr, "CLEAR_ERROR: %d\n", error_code));
209 error_code = 0;
210 error_str.length(0);
211}
212
213int
214hstcpcli::set_error(int code, const String& str)
215{
216 DBG(fprintf(stderr, "SET_ERROR: %d\n", code));
217 error_code = code;
218 error_str = str;
219 return error_code;
220}
221
222int
223hstcpcli::set_error(int code, const char *str)
224{
225 uint32 str_len = strlen(str);
226 DBG(fprintf(stderr, "SET_ERROR: %d\n", code));
227 error_code = code;
228 error_str.length(0);
229 if (error_str.reserve(str_len + 1))
230 return 0;
231 error_str.q_append(str, str_len);
232 error_str.c_ptr_safe();
233 return error_code;
234}
235
236void
237hstcpcli::request_buf_open_index(size_t pst_id, const char *dbn,
238 const char *tbl, const char *idx, const char *retflds, const char *filflds)
239{
240/*
241 if (num_req_sent > 0 || num_req_rcvd > 0) {
242*/
243 if (num_req_rcvd > 0) {
244 close();
245 set_error(-1, "request_buf_open_index: protocol out of sync");
246 return;
247 }
248 const string_ref dbn_ref(dbn, strlen(dbn));
249 const string_ref tbl_ref(tbl, strlen(tbl));
250 const string_ref idx_ref(idx, strlen(idx));
251 const string_ref rfs_ref(retflds, strlen(retflds));
252 writebuf.append_literal("P\t");
253 append_uint32(writebuf, pst_id); // FIXME size_t ?
254 writebuf.append_literal("\t");
255 writebuf.append(dbn_ref.begin(), dbn_ref.end());
256 writebuf.append_literal("\t");
257 writebuf.append(tbl_ref.begin(), tbl_ref.end());
258 writebuf.append_literal("\t");
259 writebuf.append(idx_ref.begin(), idx_ref.end());
260 writebuf.append_literal("\t");
261 writebuf.append(rfs_ref.begin(), rfs_ref.end());
262 if (filflds != 0) {
263 const string_ref fls_ref(filflds, strlen(filflds));
264 writebuf.append_literal("\t");
265 writebuf.append(fls_ref.begin(), fls_ref.end());
266 }
267 writebuf.append_literal("\n");
268 ++num_req_bufd;
269}
270
271void
272hstcpcli::request_buf_auth(const char *secret, const char *typ)
273{
274/*
275 if (num_req_sent > 0 || num_req_rcvd > 0) {
276*/
277 if (num_req_rcvd > 0) {
278 close();
279 set_error(-1, "request_buf_auth: protocol out of sync");
280 return;
281 }
282 if (typ == 0) {
283 typ = "1";
284 }
285 const string_ref typ_ref(typ, strlen(typ));
286 const string_ref secret_ref(secret, strlen(secret));
287 writebuf.append_literal("A\t");
288 writebuf.append(typ_ref.begin(), typ_ref.end());
289 writebuf.append_literal("\t");
290 writebuf.append(secret_ref.begin(), secret_ref.end());
291 writebuf.append_literal("\n");
292 ++num_req_bufd;
293}
294
295namespace {
296
297void
298append_delim_value(string_buffer& buf, const char *start, const char *finish)
299{
300 if (start == 0) {
301 /* null */
302 const char t[] = "\t\0";
303 buf.append(t, t + 2);
304 } else {
305 /* non-null */
306 buf.append_literal("\t");
307 escape_string(buf, start, finish);
308 }
309}
310
311};
312
313void
314hstcpcli::request_buf_exec_generic(size_t pst_id, const string_ref& op,
315 const string_ref *kvs, size_t kvslen, uint32 limit, uint32 skip,
316 const string_ref& mod_op, const string_ref *mvs, size_t mvslen,
317 const hstcpcli_filter *fils, size_t filslen, int invalues_keypart,
318 const string_ref *invalues, size_t invalueslen)
319{
320/*
321 if (num_req_sent > 0 || num_req_rcvd > 0) {
322*/
323 if (num_req_rcvd > 0) {
324 close();
325 set_error(-1, "request_buf_exec_generic: protocol out of sync");
326 return;
327 }
328 append_uint32(writebuf, pst_id); // FIXME size_t ?
329 writebuf.append_literal("\t");
330 writebuf.append(op.begin(), op.end());
331 writebuf.append_literal("\t");
332 append_uint32(writebuf, kvslen); // FIXME size_t ?
333 for (size_t i = 0; i < kvslen; ++i) {
334 const string_ref& kv = kvs[i];
335 append_delim_value(writebuf, kv.begin(), kv.end());
336 }
337 if (limit != 0 || skip != 0 || invalues_keypart >= 0 ||
338 mod_op.size() != 0 || filslen != 0) {
339 /* has more option */
340 writebuf.append_literal("\t");
341 append_uint32(writebuf, limit); // FIXME size_t ?
342 if (skip != 0 || invalues_keypart >= 0 ||
343 mod_op.size() != 0 || filslen != 0) {
344 writebuf.append_literal("\t");
345 append_uint32(writebuf, skip); // FIXME size_t ?
346 }
347 if (invalues_keypart >= 0) {
348 writebuf.append_literal("\t@\t");
349 append_uint32(writebuf, invalues_keypart);
350 writebuf.append_literal("\t");
351 append_uint32(writebuf, invalueslen);
352 for (size_t i = 0; i < invalueslen; ++i) {
353 const string_ref& s = invalues[i];
354 append_delim_value(writebuf, s.begin(), s.end());
355 }
356 }
357 for (size_t i = 0; i < filslen; ++i) {
358 const hstcpcli_filter& f = fils[i];
359 writebuf.append_literal("\t");
360 writebuf.append(f.filter_type.begin(), f.filter_type.end());
361 writebuf.append_literal("\t");
362 writebuf.append(f.op.begin(), f.op.end());
363 writebuf.append_literal("\t");
364 append_uint32(writebuf, f.ff_offset);
365 append_delim_value(writebuf, f.val.begin(), f.val.end());
366 }
367 if (mod_op.size() != 0) {
368 writebuf.append_literal("\t");
369 writebuf.append(mod_op.begin(), mod_op.end());
370 for (size_t i = 0; i < mvslen; ++i) {
371 const string_ref& mv = mvs[i];
372 append_delim_value(writebuf, mv.begin(), mv.end());
373 }
374 }
375 }
376 writebuf.append_literal("\n");
377 ++num_req_bufd;
378}
379
380size_t
381hstcpcli::request_buf_append(const char *start, const char *finish)
382{
383/*
384 if (num_req_sent > 0 || num_req_rcvd > 0) {
385*/
386 if (num_req_rcvd > 0) {
387 close();
388 set_error(-1, "request_buf_append: protocol out of sync");
389 return 0;
390 }
391 const char *nl = start;
392 size_t num_req = 0;
393 while ((nl = memchr_char(nl, '\n', finish - nl))) {
394 if (nl == finish)
395 break;
396 num_req++;
397 nl++;
398 }
399 num_req++;
400 writebuf.append(start, finish);
401 if (*(finish - 1) != '\n')
402 writebuf.append_literal("\n");
403 num_req_bufd += num_req;
404 return num_req;
405}
406
407void
408hstcpcli::request_reset()
409{
410 if (num_req_bufd) {
411 writebuf.erase_front(writebuf.size());
412 num_req_bufd = 0;
413 }
414}
415
416int
417hstcpcli::request_send()
418{
419 if (error_code < 0) {
420 return error_code;
421 }
422 clear_error();
423 if (fd.get() < 0) {
424 close();
425 return set_error(-1, "write: closed");
426 }
427/*
428 if (num_req_bufd == 0 || num_req_sent > 0 || num_req_rcvd > 0) {
429*/
430 if (num_req_bufd == 0 || num_req_rcvd > 0) {
431 close();
432 return set_error(-1, "request_send: protocol out of sync");
433 }
434 const size_t wrlen = writebuf.size();
435 const ssize_t r = send(fd.get(), writebuf.begin(), wrlen, MSG_NOSIGNAL);
436 if (r <= 0) {
437 close();
438 return set_error(-1, r < 0 ? "write: failed" : "write: eof");
439 }
440 writebuf.erase_front(r);
441 if (static_cast<size_t>(r) != wrlen) {
442 close();
443 return set_error(-1, "write: incomplete");
444 }
445 num_req_sent += num_req_bufd;
446 num_req_bufd = 0;
447 DBG(fprintf(stderr, "REQSEND 0\n"));
448 return 0;
449}
450
451int
452hstcpcli::response_recv(size_t& num_flds_r)
453{
454 if (error_code < 0) {
455 return error_code;
456 }
457 clear_error();
458 if (num_req_bufd > 0 || num_req_sent == 0 || num_req_rcvd > 0 ||
459 response_end_offset != 0) {
460 close();
461 return set_error(-1, "response_recv: protocol out of sync");
462 }
463 cur_row_offset = 0;
464 num_flds_r = num_flds = 0;
465 if (fd.get() < 0) {
466 return set_error(-1, "read: closed");
467 }
468 size_t offset = 0;
469 while (true) {
470 const char *const lbegin = readbuf.begin() + offset;
471 const char *const lend = readbuf.end();
472 if (lbegin < lend)
473 {
474 const char *const nl = memchr_char(lbegin, '\n', lend - lbegin);
475 if (nl != 0) {
476 offset += (nl + 1) - lbegin;
477 break;
478 }
479 offset += lend - lbegin;
480 }
481 if (read_more() <= 0) {
482 close();
483 error_code = -1;
484 return error_code;
485 }
486 }
487 response_end_offset = offset;
488 --num_req_sent;
489 ++num_req_rcvd;
490 char *start = readbuf.begin();
491 char *const finish = start + response_end_offset - 1;
492 const size_t resp_code = read_ui32(start, finish);
493 skip_one(start, finish);
494 num_flds_r = num_flds = read_ui32(start, finish);
495 if (resp_code != 0) {
496 skip_one(start, finish);
497 char *const err_begin = start;
498 read_token(start, finish);
499 char *const err_end = start;
500 String e = String(err_begin, (uint32)(err_end - err_begin), &my_charset_bin);
501 if (!e.length()) {
502 e = String("unknown_error", &my_charset_bin);
503 }
504 return set_error(resp_code, e);
505 }
506 cur_row_offset = start - readbuf.begin();
507 DBG(fprintf(stderr, "[%s] ro=%zu eol=%zu\n",
508 String(readbuf.begin(), readbuf.begin() + response_end_offset)
509 .c_str(),
510 cur_row_offset, response_end_offset));
511 DBG(fprintf(stderr, "RES 0\n"));
512 if (flds.max_element < num_flds)
513 {
514 if (allocate_dynamic(&flds, num_flds))
515 return set_error(-1, "out of memory");
516 }
517 flds.elements = num_flds;
518 return 0;
519}
520
521int
522hstcpcli::get_result(hstresult& result)
523{
524/*
525 readbuf.swap(result.readbuf);
526*/
527 char *const wp = result.readbuf.make_space(response_end_offset);
528 memcpy(wp, readbuf.begin(), response_end_offset);
529 result.readbuf.space_wrote(response_end_offset);
530 result.response_end_offset = response_end_offset;
531 result.num_flds = num_flds;
532 result.cur_row_offset = cur_row_offset;
533 if (result.flds.max_element < num_flds)
534 {
535 if (allocate_dynamic(&result.flds, num_flds))
536 return set_error(-1, "out of memory");
537 }
538 result.flds.elements = num_flds;
539 return 0;
540}
541
542const string_ref *
543hstcpcli::get_next_row()
544{
545 if (num_flds == 0 || flds.elements < num_flds) {
546 DBG(fprintf(stderr, "GNR NF 0\n"));
547 return 0;
548 }
549 char *start = readbuf.begin() + cur_row_offset;
550 char *const finish = readbuf.begin() + response_end_offset - 1;
551 if (start >= finish) { /* start[0] == nl */
552 DBG(fprintf(stderr, "GNR FIN 0 %p %p\n", start, finish));
553 return 0;
554 }
555 for (size_t i = 0; i < num_flds; ++i) {
556 skip_one(start, finish);
557 char *const fld_begin = start;
558 read_token(start, finish);
559 char *const fld_end = start;
560 char *wp = fld_begin;
561 if (is_null_expression(fld_begin, fld_end)) {
562 /* null */
563 ((string_ref *) flds.buffer)[i] = string_ref();
564 } else {
565 unescape_string(wp, fld_begin, fld_end); /* in-place */
566 ((string_ref *) flds.buffer)[i] = string_ref(fld_begin, wp);
567 }
568 }
569 cur_row_offset = start - readbuf.begin();
570 return (string_ref *) flds.buffer;
571}
572
573const string_ref *
574hstcpcli::get_next_row_from_result(hstresult& result)
575{
576 if (result.num_flds == 0 || result.flds.elements < result.num_flds) {
577 DBG(fprintf(stderr, "GNR NF 0\n"));
578 return 0;
579 }
580 char *start = result.readbuf.begin() + result.cur_row_offset;
581 char *const finish = result.readbuf.begin() + result.response_end_offset - 1;
582 if (start >= finish) { /* start[0] == nl */
583 DBG(fprintf(stderr, "GNR FIN 0 %p %p\n", start, finish));
584 return 0;
585 }
586 for (size_t i = 0; i < result.num_flds; ++i) {
587 skip_one(start, finish);
588 char *const fld_begin = start;
589 read_token(start, finish);
590 char *const fld_end = start;
591 char *wp = fld_begin;
592 if (is_null_expression(fld_begin, fld_end)) {
593 /* null */
594 ((string_ref *) result.flds.buffer)[i] = string_ref();
595 } else {
596 unescape_string(wp, fld_begin, fld_end); /* in-place */
597 ((string_ref *) result.flds.buffer)[i] = string_ref(fld_begin, wp);
598 }
599 }
600 result.cur_row_offset = start - result.readbuf.begin();
601 return (string_ref *) result.flds.buffer;
602}
603
604void
605hstcpcli::response_buf_remove()
606{
607 if (response_end_offset == 0) {
608 close();
609 set_error(-1, "response_buf_remove: protocol out of sync");
610 return;
611 }
612 readbuf.erase_front(response_end_offset);
613 response_end_offset = 0;
614 --num_req_rcvd;
615 cur_row_offset = 0;
616 num_flds = 0;
617}
618
619void
620hstcpcli::write_error_to_log(
621 const char *func_name,
622 const char *file_name,
623 ulong line_no
624) {
625 if (errno_buf) {
626 time_t cur_time = (time_t) time((time_t*) 0);
627 struct tm lt;
628 struct tm *l_time = localtime_r(&cur_time, &lt);
629 fprintf(stderr,
630 "%04d%02d%02d %02d:%02d:%02d [ERROR] hstcpcli: [%d][%s]"
631 " [%s][%s][%lu] errno=%d\n",
632 l_time->tm_year + 1900, l_time->tm_mon + 1, l_time->tm_mday,
633 l_time->tm_hour, l_time->tm_min, l_time->tm_sec,
634 error_code, error_str.c_ptr_safe(),
635 func_name, file_name, line_no, errno_buf);
636 }
637}
638
639hstcpcli_ptr
640hstcpcli_i::create(const socket_args& args)
641{
642 return hstcpcli_ptr(new hstcpcli(args));
643}
644
645};
646
647