1
2// vim:sw=2:ai
3
4/*
5 * Copyright (C) 2010 DeNA Co.,Ltd.. All rights reserved.
6 * See COPYRIGHT.txt for details.
7 */
8
9#include <my_global.h>
10#include <netinet/in.h>
11#include <errno.h>
12#include <poll.h>
13#include <unistd.h>
14#include <stdexcept>
15#include <signal.h>
16#include <list>
17#if __linux__
18#include <sys/epoll.h>
19#endif
20#ifdef HAVE_ALLOCA_H
21#include <alloca.h>
22#endif
23
24#include "hstcpsvr_worker.hpp"
25#include "string_buffer.hpp"
26#include "auto_ptrcontainer.hpp"
27#include "string_util.hpp"
28#include "escape.hpp"
29
/*
 * Debug trace hooks. Each macro currently expands to nothing, so the
 * statement passed as `x` is compiled out. In this file DBG_EP wraps the
 * traces in the epoll loop and DBG_MULTI wraps the per-call line counter in
 * execute_lines().
 */
#define DBG_FD(x)
#define DBG_TR(x)
#define DBG_EP(x)
#define DBG_MULTI(x)

/* TODO */
/*
 * Fallback for platforms other than Linux/FreeBSD whose headers do not define
 * MSG_NOSIGNAL: define it as 0 so the flag argument passed to send() is a
 * no-op there.
 */
#if !defined(__linux__) && !defined(__FreeBSD__) && !defined(MSG_NOSIGNAL)
#define MSG_NOSIGNAL 0
#endif
39
40namespace dena {
41
42struct dbconnstate {
43 string_buffer readbuf;
44 string_buffer writebuf;
45 std::vector<prep_stmt> prep_stmts;
46 size_t resp_begin_pos;
47 size_t find_nl_pos;
48 void reset() {
49 readbuf.clear();
50 writebuf.clear();
51 prep_stmts.clear();
52 resp_begin_pos = 0;
53 find_nl_pos = 0;
54 }
55 dbconnstate() : resp_begin_pos(0), find_nl_pos(0) { }
56};
57
58struct hstcpsvr_conn;
59typedef auto_ptrcontainer< std::list<hstcpsvr_conn *> > hstcpsvr_conns_type;
60
61struct hstcpsvr_conn : public dbcallback_i {
62 public:
63 auto_file fd;
64 sockaddr_storage addr;
65 size_socket addr_len;
66 dbconnstate cstate;
67 std::string err;
68 size_t readsize;
69 bool nonblocking;
70 bool read_finished;
71 bool write_finished;
72 time_t nb_last_io;
73 hstcpsvr_conns_type::iterator conns_iter;
74 bool authorized;
75 public:
76 bool closed() const;
77 bool ok_to_close() const;
78 void reset();
79 int accept(const hstcpsvr_shared_c& cshared);
80 bool write_more(bool *more_r = 0);
81 bool read_more(bool *more_r = 0);
82 public:
83 virtual void dbcb_set_prep_stmt(size_t pst_id, const prep_stmt& v);
84 virtual const prep_stmt *dbcb_get_prep_stmt(size_t pst_id) const;
85 virtual void dbcb_resp_short(uint32_t code, const char *msg);
86 virtual void dbcb_resp_short_num(uint32_t code, uint32_t value);
87 virtual void dbcb_resp_short_num64(uint32_t code, uint64_t value);
88 virtual void dbcb_resp_begin(size_t num_flds);
89 virtual void dbcb_resp_entry(const char *fld, size_t fldlen);
90 virtual void dbcb_resp_end();
91 virtual void dbcb_resp_cancel();
92 public:
93 hstcpsvr_conn() : addr_len(sizeof(addr)), readsize(4096),
94 nonblocking(false), read_finished(false), write_finished(false),
95 nb_last_io(0), authorized(false) { }
96};
97
98bool
99hstcpsvr_conn::closed() const
100{
101 return fd.get() < 0;
102}
103
104bool
105hstcpsvr_conn::ok_to_close() const
106{
107 return write_finished || (read_finished && cstate.writebuf.size() == 0);
108}
109
110void
111hstcpsvr_conn::reset()
112{
113 addr = sockaddr_storage();
114 addr_len = sizeof(addr);
115 cstate.reset();
116 fd.reset();
117 read_finished = false;
118 write_finished = false;
119}
120
121int
122hstcpsvr_conn::accept(const hstcpsvr_shared_c& cshared)
123{
124 reset();
125 return socket_accept(cshared.listen_fd.get(), fd, cshared.sockargs, addr,
126 addr_len, err);
127}
128
129bool
130hstcpsvr_conn::write_more(bool *more_r)
131{
132 if (write_finished || cstate.writebuf.size() == 0) {
133 return false;
134 }
135 const size_t wlen = cstate.writebuf.size();
136 ssize_t len = send(fd.get(), cstate.writebuf.begin(), wlen, MSG_NOSIGNAL);
137 if (len <= 0) {
138 if (len == 0 || !nonblocking || errno != EWOULDBLOCK) {
139 cstate.writebuf.clear();
140 write_finished = true;
141 }
142 return false;
143 }
144 cstate.writebuf.erase_front(len);
145 /* FIXME: reallocate memory if too large */
146 if (more_r) {
147 *more_r = (static_cast<size_t>(len) == wlen);
148 }
149 return true;
150}
151
152bool
153hstcpsvr_conn::read_more(bool *more_r)
154{
155 if (read_finished) {
156 return false;
157 }
158 const size_t block_size = readsize > 4096 ? readsize : 4096;
159 char *wp = cstate.readbuf.make_space(block_size);
160 const ssize_t len = read(fd.get(), wp, block_size);
161 if (len <= 0) {
162 if (len == 0 || !nonblocking || errno != EWOULDBLOCK) {
163 read_finished = true;
164 }
165 return false;
166 }
167 cstate.readbuf.space_wrote(len);
168 if (more_r) {
169 *more_r = (static_cast<size_t>(len) == block_size);
170 }
171 return true;
172}
173
174void
175hstcpsvr_conn::dbcb_set_prep_stmt(size_t pst_id, const prep_stmt& v)
176{
177 if (cstate.prep_stmts.size() <= pst_id) {
178 cstate.prep_stmts.resize(pst_id + 1);
179 }
180 cstate.prep_stmts[pst_id] = v;
181}
182
183const prep_stmt *
184hstcpsvr_conn::dbcb_get_prep_stmt(size_t pst_id) const
185{
186 if (cstate.prep_stmts.size() <= pst_id) {
187 return 0;
188 }
189 return &cstate.prep_stmts[pst_id];
190}
191
192void
193hstcpsvr_conn::dbcb_resp_short(uint32_t code, const char *msg)
194{
195 write_ui32(cstate.writebuf, code);
196 const size_t msglen = strlen(msg);
197 if (msglen != 0) {
198 cstate.writebuf.append_literal("\t1\t");
199 cstate.writebuf.append(msg, msg + msglen);
200 } else {
201 cstate.writebuf.append_literal("\t1");
202 }
203 cstate.writebuf.append_literal("\n");
204}
205
206void
207hstcpsvr_conn::dbcb_resp_short_num(uint32_t code, uint32_t value)
208{
209 write_ui32(cstate.writebuf, code);
210 cstate.writebuf.append_literal("\t1\t");
211 write_ui32(cstate.writebuf, value);
212 cstate.writebuf.append_literal("\n");
213}
214
215void
216hstcpsvr_conn::dbcb_resp_short_num64(uint32_t code, uint64_t value)
217{
218 write_ui32(cstate.writebuf, code);
219 cstate.writebuf.append_literal("\t1\t");
220 write_ui64(cstate.writebuf, value);
221 cstate.writebuf.append_literal("\n");
222}
223
224void
225hstcpsvr_conn::dbcb_resp_begin(size_t num_flds)
226{
227 cstate.resp_begin_pos = cstate.writebuf.size();
228 cstate.writebuf.append_literal("0\t");
229 write_ui32(cstate.writebuf, num_flds);
230}
231
232void
233hstcpsvr_conn::dbcb_resp_entry(const char *fld, size_t fldlen)
234{
235 if (fld != 0) {
236 cstate.writebuf.append_literal("\t");
237 escape_string(cstate.writebuf, fld, fld + fldlen);
238 } else {
239 static const char t[] = "\t\0";
240 cstate.writebuf.append(t, t + 2);
241 }
242}
243
244void
245hstcpsvr_conn::dbcb_resp_end()
246{
247 cstate.writebuf.append_literal("\n");
248 cstate.resp_begin_pos = 0;
249}
250
251void
252hstcpsvr_conn::dbcb_resp_cancel()
253{
254 cstate.writebuf.resize(cstate.resp_begin_pos);
255 cstate.resp_begin_pos = 0;
256}
257
258struct hstcpsvr_worker : public hstcpsvr_worker_i, private noncopyable {
259 hstcpsvr_worker(const hstcpsvr_worker_arg& arg);
260 virtual void run();
261 private:
262 const hstcpsvr_shared_c& cshared;
263 volatile hstcpsvr_shared_v& vshared;
264 long worker_id;
265 dbcontext_ptr dbctx;
266 hstcpsvr_conns_type conns; /* conns refs dbctx */
267 time_t last_check_time;
268 std::vector<pollfd> pfds;
269 #ifdef __linux__
270 std::vector<epoll_event> events_vec;
271 auto_file epoll_fd;
272 #endif
273 bool accept_enabled;
274 int accept_balance;
275 std::vector<string_ref> invalues_work;
276 std::vector<record_filter> filters_work;
277 private:
278 int run_one_nb();
279 int run_one_ep();
280 void execute_lines(hstcpsvr_conn& conn);
281 void execute_line(char *start, char *finish, hstcpsvr_conn& conn);
282 void do_open_index(char *start, char *finish, hstcpsvr_conn& conn);
283 void do_exec_on_index(char *cmd_begin, char *cmd_end, char *start,
284 char *finish, hstcpsvr_conn& conn);
285 void do_authorization(char *start, char *finish, hstcpsvr_conn& conn);
286};
287
288hstcpsvr_worker::hstcpsvr_worker(const hstcpsvr_worker_arg& arg)
289 : cshared(*arg.cshared), vshared(*arg.vshared), worker_id(arg.worker_id),
290 dbctx(cshared.dbptr->create_context(cshared.for_write_flag)),
291 last_check_time(time(0)), accept_enabled(true), accept_balance(0)
292{
293 #ifdef __linux__
294 if (cshared.sockargs.use_epoll) {
295 epoll_fd.reset(epoll_create(10));
296 if (epoll_fd.get() < 0) {
297 fatal_abort("epoll_create");
298 }
299 epoll_event ev;
300 memset(&ev, 0, sizeof(ev));
301 ev.events = EPOLLIN;
302 ev.data.ptr = 0;
303 if (epoll_ctl(epoll_fd.get(), EPOLL_CTL_ADD, cshared.listen_fd.get(), &ev)
304 != 0) {
305 fatal_abort("epoll_ctl EPOLL_CTL_ADD");
306 }
307 events_vec.resize(10240);
308 }
309 #endif
310 accept_balance = cshared.conf.get_int("accept_balance", 0);
311}
312
313namespace {
314
315struct thr_init {
316 thr_init(const dbcontext_ptr& dc, volatile int& shutdown_flag) : dbctx(dc) {
317 dbctx->init_thread(this, shutdown_flag);
318 }
319 ~thr_init() {
320 dbctx->term_thread();
321 }
322 const dbcontext_ptr& dbctx;
323};
324
325}; // namespace
326
327void
328hstcpsvr_worker::run()
329{
330 thr_init initobj(dbctx, vshared.shutdown);
331
332 #ifdef __linux__
333 if (cshared.sockargs.use_epoll) {
334 while (!vshared.shutdown && dbctx->check_alive()) {
335 run_one_ep();
336 }
337 } else if (cshared.sockargs.nonblocking) {
338 while (!vshared.shutdown && dbctx->check_alive()) {
339 run_one_nb();
340 }
341 } else {
342 /* UNUSED */
343 fatal_abort("run_one");
344 }
345 #else
346 while (!vshared.shutdown && dbctx->check_alive()) {
347 run_one_nb();
348 }
349 #endif
350}
351
/*
 * One iteration of the poll()-based event loop.
 *
 * Phases, in order: build the pollfd array (one entry per connection in list
 * order, then the listener as the last entry), poll for up to one second,
 * read from ready connections, execute the buffered request lines, commit,
 * write responses and drop finished or timed-out connections, and finally
 * accept at most one new client. Always returns 0.
 */
int
hstcpsvr_worker::run_one_nb()
{
  size_t nfds = 0;
  /* CLIENT SOCKETS */
  for (hstcpsvr_conns_type::const_iterator i = conns.begin();
    i != conns.end(); ++i) {
    if (pfds.size() <= nfds) {
      pfds.resize(nfds + 1);
    }
    pollfd& pfd = pfds[nfds++];
    pfd.fd = (*i)->fd.get();
    short ev = 0;
    /* a connection with pending output waits for writability only */
    if ((*i)->cstate.writebuf.size() != 0) {
      ev = POLLOUT;
    } else {
      ev = POLLIN;
    }
    /* revents is pre-seeded with the requested events before poll() runs */
    pfd.events = pfd.revents = ev;
  }
  /* LISTENER */
  {
    const size_t cpt = cshared.nb_conn_per_thread;
    /* only wait for new clients while below the per-thread limit */
    const short ev = (cpt > nfds) ? POLLIN : 0;
    if (pfds.size() <= nfds) {
      pfds.resize(nfds + 1);
    }
    pollfd& pfd = pfds[nfds++];
    pfd.fd = cshared.listen_fd.get();
    pfd.events = pfd.revents = ev;
  }
  /* POLL */
  const int npollev = poll(&pfds[0], nfds, 1 * 1000);
  dbctx->set_statistics(conns.size(), npollev);
  const time_t now = time(0);
  size_t j = 0;
  /* mask_in matches every event bit except POLLOUT */
  const short mask_in = ~POLLOUT;
  const short mask_out = POLLOUT | POLLERR | POLLHUP | POLLNVAL;
  /* READ */
  for (hstcpsvr_conns_type::iterator i = conns.begin(); i != conns.end();
    ++i, ++j) {
    pollfd& pfd = pfds[j];
    if ((pfd.revents & mask_in) == 0) {
      continue;
    }
    hstcpsvr_conn& conn = **i;
    if (conn.read_more()) {
      if (conn.cstate.readbuf.size() > 0) {
        const char ch = conn.cstate.readbuf.begin()[0];
        if (ch == 'Q') {
          /* a buffer starting with 'Q' requests shutdown of the server */
          vshared.shutdown = 1;
        } else if (ch == '/') {
          /* a buffer starting with '/' discards all data and ends the
           * connection */
          conn.cstate.readbuf.clear();
          conn.cstate.find_nl_pos = 0;
          conn.cstate.writebuf.clear();
          conn.read_finished = true;
          conn.write_finished = true;
        }
      }
      conn.nb_last_io = now;
    }
  }
  /* EXECUTE */
  j = 0;
  for (hstcpsvr_conns_type::iterator i = conns.begin(); i != conns.end();
    ++i, ++j) {
    pollfd& pfd = pfds[j];
    if ((pfd.revents & mask_in) == 0 || (*i)->cstate.readbuf.size() == 0) {
      continue;
    }
    execute_lines(**i);
  }
  /* COMMIT */
  dbctx->unlock_tables_if();
  const bool commit_error = dbctx->get_commit_error();
  dbctx->clear_error();
  /* WRITE/CLOSE */
  j = 0;
  for (hstcpsvr_conns_type::iterator i = conns.begin(); i != conns.end();
    ++j) {
    pollfd& pfd = pfds[j];
    hstcpsvr_conn& conn = **i;
    /* advance before a possible erase so the loop iterator stays valid */
    hstcpsvr_conns_type::iterator icur = i;
    ++i;
    if (commit_error) {
      /* reset() releases the socket; the entry itself is erased on a later
       * iteration, once closed() is seen below */
      conn.reset();
      continue;
    }
    if ((pfd.revents & (mask_out | mask_in)) != 0) {
      if (conn.write_more()) {
        conn.nb_last_io = now;
      }
    }
    /* idle timeout (disabled when the configured timeout is 0) */
    if (cshared.sockargs.timeout != 0 &&
      conn.nb_last_io + cshared.sockargs.timeout < now) {
      conn.reset();
    }
    if (conn.closed() || conn.ok_to_close()) {
      conns.erase_ptr(icur);
    }
  }
  /* ACCEPT */
  {
    /* the listener is the last entry built above */
    pollfd& pfd = pfds[nfds - 1];
    if ((pfd.revents & mask_in) != 0) {
      std::auto_ptr<hstcpsvr_conn> c(new hstcpsvr_conn());
      c->nonblocking = true;
      c->readsize = cshared.readsize;
      c->accept(cshared);
      if (c->fd.get() >= 0) {
        if (fcntl(c->fd.get(), F_SETFL, O_NONBLOCK) != 0) {
          fatal_abort("F_SETFL O_NONBLOCK");
        }
        c->nb_last_io = now;
        conns.push_back_ptr(c);
      } else {
        /* errno == 11 (EAGAIN) is not a fatal error. */
        DENA_VERBOSE(100, fprintf(stderr,
          "accept failed: errno=%d (not fatal)\n", errno));
      }
    }
  }
  DENA_VERBOSE(30, fprintf(stderr, "nb: %p nfds=%zu cns=%zu\n", this, nfds,
    conns.size()));
  /* with no clients left, let the database context close its tables */
  if (conns.empty()) {
    dbctx->close_tables_if();
  }
  dbctx->set_statistics(conns.size(), 0);
  return 0;
}
482
483#ifdef __linux__
484int
485hstcpsvr_worker::run_one_ep()
486{
487 epoll_event *const events = &events_vec[0];
488 const size_t num_events = events_vec.size();
489 const time_t now = time(0);
490 size_t in_count = 0, out_count = 0, accept_count = 0;
491 int nfds = epoll_wait(epoll_fd.get(), events, num_events, 1000);
492 /* READ/ACCEPT */
493 dbctx->set_statistics(conns.size(), nfds);
494 for (int i = 0; i < nfds; ++i) {
495 epoll_event& ev = events[i];
496 if ((ev.events & EPOLLIN) == 0) {
497 continue;
498 }
499 hstcpsvr_conn *const conn = static_cast<hstcpsvr_conn *>(ev.data.ptr);
500 if (conn == 0) {
501 /* listener */
502 ++accept_count;
503 DBG_EP(fprintf(stderr, "IN listener\n"));
504 std::auto_ptr<hstcpsvr_conn> c(new hstcpsvr_conn());
505 c->nonblocking = true;
506 c->readsize = cshared.readsize;
507 c->accept(cshared);
508 if (c->fd.get() >= 0) {
509 if (fcntl(c->fd.get(), F_SETFL, O_NONBLOCK) != 0) {
510 fatal_abort("F_SETFL O_NONBLOCK");
511 }
512 epoll_event cev;
513 memset(&cev, 0, sizeof(cev));
514 cev.events = EPOLLIN | EPOLLOUT | EPOLLET;
515 cev.data.ptr = c.get();
516 c->nb_last_io = now;
517 const int fd = c->fd.get();
518 conns.push_back_ptr(c);
519 conns.back()->conns_iter = --conns.end();
520 if (epoll_ctl(epoll_fd.get(), EPOLL_CTL_ADD, fd, &cev) != 0) {
521 fatal_abort("epoll_ctl EPOLL_CTL_ADD");
522 }
523 } else {
524 DENA_VERBOSE(100, fprintf(stderr,
525 "accept failed: errno=%d (not fatal)\n", errno));
526 }
527 } else {
528 /* client connection */
529 ++in_count;
530 DBG_EP(fprintf(stderr, "IN client\n"));
531 bool more_data = false;
532 while (conn->read_more(&more_data)) {
533 DBG_EP(fprintf(stderr, "IN client read_more\n"));
534 conn->nb_last_io = now;
535 if (!more_data) {
536 break;
537 }
538 }
539 }
540 }
541 /* EXECUTE */
542 for (int i = 0; i < nfds; ++i) {
543 epoll_event& ev = events[i];
544 hstcpsvr_conn *const conn = static_cast<hstcpsvr_conn *>(ev.data.ptr);
545 if ((ev.events & EPOLLIN) == 0 || conn == 0 ||
546 conn->cstate.readbuf.size() == 0) {
547 continue;
548 }
549 const char ch = conn->cstate.readbuf.begin()[0];
550 if (ch == 'Q') {
551 vshared.shutdown = 1;
552 } else if (ch == '/') {
553 conn->cstate.readbuf.clear();
554 conn->cstate.find_nl_pos = 0;
555 conn->cstate.writebuf.clear();
556 conn->read_finished = true;
557 conn->write_finished = true;
558 } else {
559 execute_lines(*conn);
560 }
561 }
562 /* COMMIT */
563 dbctx->unlock_tables_if();
564 const bool commit_error = dbctx->get_commit_error();
565 dbctx->clear_error();
566 /* WRITE */
567 for (int i = 0; i < nfds; ++i) {
568 epoll_event& ev = events[i];
569 hstcpsvr_conn *const conn = static_cast<hstcpsvr_conn *>(ev.data.ptr);
570 if (commit_error && conn != 0) {
571 conn->reset();
572 continue;
573 }
574 if ((ev.events & EPOLLOUT) == 0) {
575 continue;
576 }
577 ++out_count;
578 if (conn == 0) {
579 /* listener */
580 DBG_EP(fprintf(stderr, "OUT listener\n"));
581 } else {
582 /* client connection */
583 DBG_EP(fprintf(stderr, "OUT client\n"));
584 bool more_data = false;
585 while (conn->write_more(&more_data)) {
586 DBG_EP(fprintf(stderr, "OUT client write_more\n"));
587 conn->nb_last_io = now;
588 if (!more_data) {
589 break;
590 }
591 }
592 }
593 }
594 /* CLOSE */
595 for (int i = 0; i < nfds; ++i) {
596 epoll_event& ev = events[i];
597 hstcpsvr_conn *const conn = static_cast<hstcpsvr_conn *>(ev.data.ptr);
598 if (conn != 0 && conn->ok_to_close()) {
599 DBG_EP(fprintf(stderr, "CLOSE close\n"));
600 conns.erase_ptr(conn->conns_iter);
601 }
602 }
603 /* TIMEOUT & cleanup */
604 if (last_check_time + 10 < now) {
605 for (hstcpsvr_conns_type::iterator i = conns.begin();
606 i != conns.end(); ) {
607 hstcpsvr_conns_type::iterator icur = i;
608 ++i;
609 if (cshared.sockargs.timeout != 0 &&
610 (*icur)->nb_last_io + cshared.sockargs.timeout < now) {
611 conns.erase_ptr((*icur)->conns_iter);
612 }
613 }
614 last_check_time = now;
615 DENA_VERBOSE(20, fprintf(stderr, "ep: %p nfds=%d cns=%zu\n", this, nfds,
616 conns.size()));
617 }
618 DENA_VERBOSE(30, fprintf(stderr, "%p in=%zu out=%zu ac=%zu, cns=%zu\n",
619 this, in_count, out_count, accept_count, conns.size()));
620 if (conns.empty()) {
621 dbctx->close_tables_if();
622 }
623 /* STATISTICS */
624 const size_t num_conns = conns.size();
625 dbctx->set_statistics(num_conns, 0);
626 /* ENABLE/DISABLE ACCEPT */
627 if (accept_balance != 0) {
628 cshared.thread_num_conns[worker_id] = num_conns;
629 size_t total_num_conns = 0;
630 for (long i = 0; i < cshared.num_threads; ++i) {
631 total_num_conns += cshared.thread_num_conns[i];
632 }
633 bool e_acc = false;
634 if (num_conns < 10 ||
635 total_num_conns * 2 > num_conns * cshared.num_threads) {
636 e_acc = true;
637 }
638 epoll_event ev;
639 memset(&ev, 0, sizeof(ev));
640 ev.events = EPOLLIN;
641 ev.data.ptr = 0;
642 if (e_acc == accept_enabled) {
643 } else if (e_acc) {
644 if (epoll_ctl(epoll_fd.get(), EPOLL_CTL_ADD, cshared.listen_fd.get(), &ev)
645 != 0) {
646 fatal_abort("epoll_ctl EPOLL_CTL_ADD");
647 }
648 } else {
649 if (epoll_ctl(epoll_fd.get(), EPOLL_CTL_DEL, cshared.listen_fd.get(), &ev)
650 != 0) {
651 fatal_abort("epoll_ctl EPOLL_CTL_ADD");
652 }
653 }
654 accept_enabled = e_acc;
655 }
656 return 0;
657}
658#endif
659
660void
661hstcpsvr_worker::execute_lines(hstcpsvr_conn& conn)
662{
663 DBG_MULTI(int cnt = 0);
664 dbconnstate& cstate = conn.cstate;
665 char *buf_end = cstate.readbuf.end();
666 char *line_begin = cstate.readbuf.begin();
667 char *find_pos = line_begin + cstate.find_nl_pos;
668 while (true) {
669 char *const nl = memchr_char(find_pos, '\n', buf_end - find_pos);
670 if (nl == 0) {
671 break;
672 }
673 char *const lf = (line_begin != nl && nl[-1] == '\r') ? nl - 1 : nl;
674 DBG_MULTI(cnt++);
675 execute_line(line_begin, lf, conn);
676 find_pos = line_begin = nl + 1;
677 }
678 cstate.readbuf.erase_front(line_begin - cstate.readbuf.begin());
679 cstate.find_nl_pos = cstate.readbuf.size();
680 DBG_MULTI(fprintf(stderr, "cnt=%d\n", cnt));
681}
682
683void
684hstcpsvr_worker::execute_line(char *start, char *finish, hstcpsvr_conn& conn)
685{
686 /* safe to modify, safe to dereference 'finish' */
687 char *const cmd_begin = start;
688 read_token(start, finish);
689 char *const cmd_end = start;
690 skip_one(start, finish);
691 if (cmd_begin == cmd_end) {
692 return conn.dbcb_resp_short(2, "cmd");
693 }
694 if (cmd_begin + 1 == cmd_end) {
695 if (cmd_begin[0] == 'P') {
696 if (cshared.require_auth && !conn.authorized) {
697 return conn.dbcb_resp_short(3, "unauth");
698 }
699 return do_open_index(start, finish, conn);
700 }
701 if (cmd_begin[0] == 'A') {
702 return do_authorization(start, finish, conn);
703 }
704 }
705 if (cmd_begin[0] >= '0' && cmd_begin[0] <= '9') {
706 if (cshared.require_auth && !conn.authorized) {
707 return conn.dbcb_resp_short(3, "unauth");
708 }
709 return do_exec_on_index(cmd_begin, cmd_end, start, finish, conn);
710 }
711 return conn.dbcb_resp_short(2, "cmd");
712}
713
714void
715hstcpsvr_worker::do_open_index(char *start, char *finish, hstcpsvr_conn& conn)
716{
717 const size_t pst_id = read_ui32(start, finish);
718 skip_one(start, finish);
719 /* dbname */
720 char *const dbname_begin = start;
721 read_token(start, finish);
722 char *const dbname_end = start;
723 skip_one(start, finish);
724 /* tblname */
725 char *const tblname_begin = start;
726 read_token(start, finish);
727 char *const tblname_end = start;
728 skip_one(start, finish);
729 /* idxname */
730 char *const idxname_begin = start;
731 read_token(start, finish);
732 char *const idxname_end = start;
733 skip_one(start, finish);
734 /* retfields */
735 char *const retflds_begin = start;
736 read_token(start, finish);
737 char *const retflds_end = start;
738 skip_one(start, finish);
739 /* filfields */
740 char *const filflds_begin = start;
741 read_token(start, finish);
742 char *const filflds_end = start;
743 dbname_end[0] = 0;
744 tblname_end[0] = 0;
745 idxname_end[0] = 0;
746 retflds_end[0] = 0;
747 filflds_end[0] = 0;
748 cmd_open_args args;
749 args.pst_id = pst_id;
750 args.dbn = dbname_begin;
751 args.tbl = tblname_begin;
752 args.idx = idxname_begin;
753 args.retflds = retflds_begin;
754 args.filflds = filflds_begin;
755 return dbctx->cmd_open(conn, args);
756}
757
/*
 * Handle a command whose first token starts with a digit: execute an
 * operation on a previously opened index.
 *
 * [cmd_begin, cmd_end) is the statement id token; [start, finish) is the
 * rest of the line. The fields are parsed positionally, in this order:
 *   <op> <fldnum> <fld>... <limit> <skip>
 *   [ '@' <keypart> <invalueslen> <invalue>... ]
 *   [ ('W'|'F') <filter_op> <ff_offset> <filter_val> ]...
 *   [ <mod_op> <uval>... ]
 * Values are unescaped in place inside the line buffer, so the string_ref
 * objects stored in 'args' point into that buffer. Malformed requests are
 * answered with code 2 and a short tag; otherwise the parsed arguments are
 * handed to dbctx->cmd_exec().
 */
void
hstcpsvr_worker::do_exec_on_index(char *cmd_begin, char *cmd_end, char *start,
  char *finish, hstcpsvr_conn& conn)
{
  cmd_exec_args args;
  const size_t pst_id = read_ui32(cmd_begin, cmd_end);
  if (pst_id >= conn.cstate.prep_stmts.size()) {
    return conn.dbcb_resp_short(2, "stmtnum");
  }
  args.pst = &conn.cstate.prep_stmts[pst_id];
  char *const op_begin = start;
  read_token(start, finish);
  char *const op_end = start;
  args.op = string_ref(op_begin, op_end);
  skip_one(start, finish);
  /* key values */
  /* NOTE(review): fldnum is taken from the request as-is; confirm that
   * DENA_ALLOCA_ALLOCATE copes with very large counts */
  const uint32_t fldnum = read_ui32(start, finish);
  string_ref *const flds = DENA_ALLOCA_ALLOCATE(string_ref, fldnum);
  auto_alloca_free<string_ref> flds_autofree(flds);
  args.kvals = flds;
  args.kvalslen = fldnum;
  for (size_t i = 0; i < fldnum; ++i) {
    skip_one(start, finish);
    char *const f_begin = start;
    read_token(start, finish);
    char *const f_end = start;
    if (is_null_expression(f_begin, f_end)) {
      /* null */
      flds[i] = string_ref();
    } else {
      /* non-null */
      /* unescape in place; wp ends up just past the decoded bytes */
      char *wp = f_begin;
      unescape_string(wp, f_begin, f_end);
      flds[i] = string_ref(f_begin, wp - f_begin);
    }
  }
  skip_one(start, finish);
  args.limit = read_ui32(start, finish);
  skip_one(start, finish);
  args.skip = read_ui32(start, finish);
  if (start == finish) {
    /* simple query */
    return dbctx->cmd_exec(conn, args);
  }
  /* has more options */
  skip_one(start, finish);
  /* in-clause */
  if (start[0] == '@') {
    read_token(start, finish); /* '@' */
    skip_one(start, finish);
    args.invalues_keypart = read_ui32(start, finish);
    skip_one(start, finish);
    args.invalueslen = read_ui32(start, finish);
    if (args.invalueslen <= 0) {
      return conn.dbcb_resp_short(2, "invalueslen");
    }
    /* invalues_work is a member reused across requests; it only grows */
    if (invalues_work.size() < args.invalueslen) {
      invalues_work.resize(args.invalueslen);
    }
    args.invalues = &invalues_work[0];
    for (uint32_t i = 0; i < args.invalueslen; ++i) {
      skip_one(start, finish);
      char *const invalue_begin = start;
      read_token(start, finish);
      char *const invalue_end = start;
      char *wp = invalue_begin;
      unescape_string(wp, invalue_begin, invalue_end);
      invalues_work[i] = string_ref(invalue_begin, wp - invalue_begin);
    }
    skip_one(start, finish);
  }
  if (start == finish) {
    /* no more options */
    return dbctx->cmd_exec(conn, args);
  }
  /* filters */
  size_t filters_count = 0;
  while (start != finish && (start[0] == 'W' || start[0] == 'F')) {
    char *const filter_type_begin = start;
    read_token(start, finish);
    char *const filter_type_end = start;
    skip_one(start, finish);
    char *const filter_op_begin = start;
    read_token(start, finish);
    char *const filter_op_end = start;
    skip_one(start, finish);
    const uint32_t ff_offset = read_ui32(start, finish);
    skip_one(start, finish);
    char *const filter_val_begin = start;
    read_token(start, finish);
    char *const filter_val_end = start;
    skip_one(start, finish);
    /* filters_work is a member reused across requests; it only grows */
    if (filters_work.size() <= filters_count) {
      filters_work.resize(filters_count + 1);
    }
    record_filter& fi = filters_work[filters_count];
    /* the filter type token must be exactly one character */
    if (filter_type_end != filter_type_begin + 1) {
      return conn.dbcb_resp_short(2, "filtertype");
    }
    fi.filter_type = (filter_type_begin[0] == 'W')
      ? record_filter_type_break : record_filter_type_skip;
    /* ff_offset must index one of the statement's filter fields */
    const uint32_t num_filflds = args.pst->get_filter_fields().size();
    if (ff_offset >= num_filflds) {
      return conn.dbcb_resp_short(2, "filterfld");
    }
    fi.op = string_ref(filter_op_begin, filter_op_end);
    fi.ff_offset = ff_offset;
    if (is_null_expression(filter_val_begin, filter_val_end)) {
      /* null */
      fi.val = string_ref();
    } else {
      /* non-null */
      char *wp = filter_val_begin;
      unescape_string(wp, filter_val_begin, filter_val_end);
      fi.val = string_ref(filter_val_begin, wp - filter_val_begin);
    }
    ++filters_count;
  }
  if (filters_count > 0) {
    /* one extra element with an empty op terminates the filter array */
    if (filters_work.size() <= filters_count) {
      filters_work.resize(filters_count + 1);
    }
    filters_work[filters_count].op = string_ref(); /* sentinel */
    args.filters = &filters_work[0];
  } else {
    args.filters = 0;
  }
  if (start == finish) {
    /* no modops */
    return dbctx->cmd_exec(conn, args);
  }
  /* has modops */
  char *const mod_op_begin = start;
  read_token(start, finish);
  char *const mod_op_end = start;
  args.mod_op = string_ref(mod_op_begin, mod_op_end);
  /* one update value is parsed per return field of the statement */
  const size_t num_uvals = args.pst->get_ret_fields().size();
  string_ref *const uflds = DENA_ALLOCA_ALLOCATE(string_ref, num_uvals);
  auto_alloca_free<string_ref> uflds_autofree(uflds);
  for (size_t i = 0; i < num_uvals; ++i) {
    skip_one(start, finish);
    char *const f_begin = start;
    read_token(start, finish);
    char *const f_end = start;
    if (is_null_expression(f_begin, f_end)) {
      /* null */
      uflds[i] = string_ref();
    } else {
      /* non-null */
      char *wp = f_begin;
      unescape_string(wp, f_begin, f_end);
      uflds[i] = string_ref(f_begin, wp - f_begin);
    }
  }
  args.uvals = uflds;
  return dbctx->cmd_exec(conn, args);
}
914
915void
916hstcpsvr_worker::do_authorization(char *start, char *finish,
917 hstcpsvr_conn& conn)
918{
919 /* auth type */
920 char *const authtype_begin = start;
921 read_token(start, finish);
922 char *const authtype_end = start;
923 const size_t authtype_len = authtype_end - authtype_begin;
924 skip_one(start, finish);
925 /* key */
926 char *const key_begin = start;
927 read_token(start, finish);
928 char *const key_end = start;
929 const size_t key_len = key_end - key_begin;
930 authtype_end[0] = 0;
931 key_end[0] = 0;
932 char *wp = key_begin;
933 unescape_string(wp, key_begin, key_end);
934 if (authtype_len != 1 || authtype_begin[0] != '1') {
935 return conn.dbcb_resp_short(3, "authtype");
936 }
937 if (cshared.plain_secret.size() == key_len &&
938 memcmp(cshared.plain_secret.data(), key_begin, key_len) == 0) {
939 conn.authorized = true;
940 } else {
941 conn.authorized = false;
942 }
943 if (!conn.authorized) {
944 return conn.dbcb_resp_short(3, "unauth");
945 } else {
946 return conn.dbcb_resp_short(0, "");
947 }
948}
949
950hstcpsvr_worker_ptr
951hstcpsvr_worker_i::create(const hstcpsvr_worker_arg& arg)
952{
953 return hstcpsvr_worker_ptr(new hstcpsvr_worker(arg));
954}
955
956};
957
958