1 | |
2 | // vim:sw=2:ai |
3 | |
4 | /* |
5 | * Copyright (C) 2010 DeNA Co.,Ltd.. All rights reserved. |
6 | * See COPYRIGHT.txt for details. |
7 | */ |
8 | |
9 | #include <my_global.h> |
10 | #include <netinet/in.h> |
11 | #include <errno.h> |
12 | #include <poll.h> |
13 | #include <unistd.h> |
14 | #include <stdexcept> |
15 | #include <signal.h> |
16 | #include <list> |
17 | #if __linux__ |
18 | #include <sys/epoll.h> |
19 | #endif |
20 | #ifdef HAVE_ALLOCA_H |
21 | #include <alloca.h> |
22 | #endif |
23 | |
24 | #include "hstcpsvr_worker.hpp" |
25 | #include "string_buffer.hpp" |
26 | #include "auto_ptrcontainer.hpp" |
27 | #include "string_util.hpp" |
28 | #include "escape.hpp" |
29 | |
/* Debug tracing hooks. Each DBG_* macro expands to nothing, so any tracing
 * statement wrapped in one is compiled out entirely. */
#define DBG_FD(x)
#define DBG_TR(x)
#define DBG_EP(x)
#define DBG_MULTI(x)

/* TODO: platforms other than Linux/FreeBSD that lack MSG_NOSIGNAL get a
 * no-op flag value here, so send() to a peer that has gone away may raise
 * SIGPIPE on them unless SIGPIPE is ignored elsewhere. */
#if !defined(MSG_NOSIGNAL) && !defined(__linux__) && !defined(__FreeBSD__)
#define MSG_NOSIGNAL 0
#endif
39 | |
40 | namespace dena { |
41 | |
42 | struct dbconnstate { |
43 | string_buffer readbuf; |
44 | string_buffer writebuf; |
45 | std::vector<prep_stmt> prep_stmts; |
46 | size_t resp_begin_pos; |
47 | size_t find_nl_pos; |
48 | void reset() { |
49 | readbuf.clear(); |
50 | writebuf.clear(); |
51 | prep_stmts.clear(); |
52 | resp_begin_pos = 0; |
53 | find_nl_pos = 0; |
54 | } |
55 | dbconnstate() : resp_begin_pos(0), find_nl_pos(0) { } |
56 | }; |
57 | |
58 | struct hstcpsvr_conn; |
59 | typedef auto_ptrcontainer< std::list<hstcpsvr_conn *> > hstcpsvr_conns_type; |
60 | |
61 | struct hstcpsvr_conn : public dbcallback_i { |
62 | public: |
63 | auto_file fd; |
64 | sockaddr_storage addr; |
65 | size_socket addr_len; |
66 | dbconnstate cstate; |
67 | std::string err; |
68 | size_t readsize; |
69 | bool nonblocking; |
70 | bool read_finished; |
71 | bool write_finished; |
72 | time_t nb_last_io; |
73 | hstcpsvr_conns_type::iterator conns_iter; |
74 | bool authorized; |
75 | public: |
76 | bool closed() const; |
77 | bool ok_to_close() const; |
78 | void reset(); |
79 | int accept(const hstcpsvr_shared_c& cshared); |
80 | bool write_more(bool *more_r = 0); |
81 | bool read_more(bool *more_r = 0); |
82 | public: |
83 | virtual void dbcb_set_prep_stmt(size_t pst_id, const prep_stmt& v); |
84 | virtual const prep_stmt *dbcb_get_prep_stmt(size_t pst_id) const; |
85 | virtual void dbcb_resp_short(uint32_t code, const char *msg); |
86 | virtual void dbcb_resp_short_num(uint32_t code, uint32_t value); |
87 | virtual void dbcb_resp_short_num64(uint32_t code, uint64_t value); |
88 | virtual void dbcb_resp_begin(size_t num_flds); |
89 | virtual void dbcb_resp_entry(const char *fld, size_t fldlen); |
90 | virtual void dbcb_resp_end(); |
91 | virtual void dbcb_resp_cancel(); |
92 | public: |
93 | hstcpsvr_conn() : addr_len(sizeof(addr)), readsize(4096), |
94 | nonblocking(false), read_finished(false), write_finished(false), |
95 | nb_last_io(0), authorized(false) { } |
96 | }; |
97 | |
98 | bool |
99 | hstcpsvr_conn::closed() const |
100 | { |
101 | return fd.get() < 0; |
102 | } |
103 | |
104 | bool |
105 | hstcpsvr_conn::ok_to_close() const |
106 | { |
107 | return write_finished || (read_finished && cstate.writebuf.size() == 0); |
108 | } |
109 | |
110 | void |
111 | hstcpsvr_conn::reset() |
112 | { |
113 | addr = sockaddr_storage(); |
114 | addr_len = sizeof(addr); |
115 | cstate.reset(); |
116 | fd.reset(); |
117 | read_finished = false; |
118 | write_finished = false; |
119 | } |
120 | |
121 | int |
122 | hstcpsvr_conn::accept(const hstcpsvr_shared_c& cshared) |
123 | { |
124 | reset(); |
125 | return socket_accept(cshared.listen_fd.get(), fd, cshared.sockargs, addr, |
126 | addr_len, err); |
127 | } |
128 | |
129 | bool |
130 | hstcpsvr_conn::write_more(bool *more_r) |
131 | { |
132 | if (write_finished || cstate.writebuf.size() == 0) { |
133 | return false; |
134 | } |
135 | const size_t wlen = cstate.writebuf.size(); |
136 | ssize_t len = send(fd.get(), cstate.writebuf.begin(), wlen, MSG_NOSIGNAL); |
137 | if (len <= 0) { |
138 | if (len == 0 || !nonblocking || errno != EWOULDBLOCK) { |
139 | cstate.writebuf.clear(); |
140 | write_finished = true; |
141 | } |
142 | return false; |
143 | } |
144 | cstate.writebuf.erase_front(len); |
145 | /* FIXME: reallocate memory if too large */ |
146 | if (more_r) { |
147 | *more_r = (static_cast<size_t>(len) == wlen); |
148 | } |
149 | return true; |
150 | } |
151 | |
152 | bool |
153 | hstcpsvr_conn::read_more(bool *more_r) |
154 | { |
155 | if (read_finished) { |
156 | return false; |
157 | } |
158 | const size_t block_size = readsize > 4096 ? readsize : 4096; |
159 | char *wp = cstate.readbuf.make_space(block_size); |
160 | const ssize_t len = read(fd.get(), wp, block_size); |
161 | if (len <= 0) { |
162 | if (len == 0 || !nonblocking || errno != EWOULDBLOCK) { |
163 | read_finished = true; |
164 | } |
165 | return false; |
166 | } |
167 | cstate.readbuf.space_wrote(len); |
168 | if (more_r) { |
169 | *more_r = (static_cast<size_t>(len) == block_size); |
170 | } |
171 | return true; |
172 | } |
173 | |
174 | void |
175 | hstcpsvr_conn::dbcb_set_prep_stmt(size_t pst_id, const prep_stmt& v) |
176 | { |
177 | if (cstate.prep_stmts.size() <= pst_id) { |
178 | cstate.prep_stmts.resize(pst_id + 1); |
179 | } |
180 | cstate.prep_stmts[pst_id] = v; |
181 | } |
182 | |
183 | const prep_stmt * |
184 | hstcpsvr_conn::dbcb_get_prep_stmt(size_t pst_id) const |
185 | { |
186 | if (cstate.prep_stmts.size() <= pst_id) { |
187 | return 0; |
188 | } |
189 | return &cstate.prep_stmts[pst_id]; |
190 | } |
191 | |
192 | void |
193 | hstcpsvr_conn::dbcb_resp_short(uint32_t code, const char *msg) |
194 | { |
195 | write_ui32(cstate.writebuf, code); |
196 | const size_t msglen = strlen(msg); |
197 | if (msglen != 0) { |
198 | cstate.writebuf.append_literal("\t1\t" ); |
199 | cstate.writebuf.append(msg, msg + msglen); |
200 | } else { |
201 | cstate.writebuf.append_literal("\t1" ); |
202 | } |
203 | cstate.writebuf.append_literal("\n" ); |
204 | } |
205 | |
206 | void |
207 | hstcpsvr_conn::dbcb_resp_short_num(uint32_t code, uint32_t value) |
208 | { |
209 | write_ui32(cstate.writebuf, code); |
210 | cstate.writebuf.append_literal("\t1\t" ); |
211 | write_ui32(cstate.writebuf, value); |
212 | cstate.writebuf.append_literal("\n" ); |
213 | } |
214 | |
215 | void |
216 | hstcpsvr_conn::dbcb_resp_short_num64(uint32_t code, uint64_t value) |
217 | { |
218 | write_ui32(cstate.writebuf, code); |
219 | cstate.writebuf.append_literal("\t1\t" ); |
220 | write_ui64(cstate.writebuf, value); |
221 | cstate.writebuf.append_literal("\n" ); |
222 | } |
223 | |
224 | void |
225 | hstcpsvr_conn::dbcb_resp_begin(size_t num_flds) |
226 | { |
227 | cstate.resp_begin_pos = cstate.writebuf.size(); |
228 | cstate.writebuf.append_literal("0\t" ); |
229 | write_ui32(cstate.writebuf, num_flds); |
230 | } |
231 | |
232 | void |
233 | hstcpsvr_conn::dbcb_resp_entry(const char *fld, size_t fldlen) |
234 | { |
235 | if (fld != 0) { |
236 | cstate.writebuf.append_literal("\t" ); |
237 | escape_string(cstate.writebuf, fld, fld + fldlen); |
238 | } else { |
239 | static const char t[] = "\t\0" ; |
240 | cstate.writebuf.append(t, t + 2); |
241 | } |
242 | } |
243 | |
244 | void |
245 | hstcpsvr_conn::dbcb_resp_end() |
246 | { |
247 | cstate.writebuf.append_literal("\n" ); |
248 | cstate.resp_begin_pos = 0; |
249 | } |
250 | |
251 | void |
252 | hstcpsvr_conn::dbcb_resp_cancel() |
253 | { |
254 | cstate.writebuf.resize(cstate.resp_begin_pos); |
255 | cstate.resp_begin_pos = 0; |
256 | } |
257 | |
258 | struct hstcpsvr_worker : public hstcpsvr_worker_i, private noncopyable { |
259 | hstcpsvr_worker(const hstcpsvr_worker_arg& arg); |
260 | virtual void run(); |
261 | private: |
262 | const hstcpsvr_shared_c& cshared; |
263 | volatile hstcpsvr_shared_v& vshared; |
264 | long worker_id; |
265 | dbcontext_ptr dbctx; |
266 | hstcpsvr_conns_type conns; /* conns refs dbctx */ |
267 | time_t last_check_time; |
268 | std::vector<pollfd> pfds; |
269 | #ifdef __linux__ |
270 | std::vector<epoll_event> events_vec; |
271 | auto_file epoll_fd; |
272 | #endif |
273 | bool accept_enabled; |
274 | int accept_balance; |
275 | std::vector<string_ref> invalues_work; |
276 | std::vector<record_filter> filters_work; |
277 | private: |
278 | int run_one_nb(); |
279 | int run_one_ep(); |
280 | void execute_lines(hstcpsvr_conn& conn); |
281 | void execute_line(char *start, char *finish, hstcpsvr_conn& conn); |
282 | void do_open_index(char *start, char *finish, hstcpsvr_conn& conn); |
283 | void do_exec_on_index(char *cmd_begin, char *cmd_end, char *start, |
284 | char *finish, hstcpsvr_conn& conn); |
285 | void do_authorization(char *start, char *finish, hstcpsvr_conn& conn); |
286 | }; |
287 | |
288 | hstcpsvr_worker::hstcpsvr_worker(const hstcpsvr_worker_arg& arg) |
289 | : cshared(*arg.cshared), vshared(*arg.vshared), worker_id(arg.worker_id), |
290 | dbctx(cshared.dbptr->create_context(cshared.for_write_flag)), |
291 | last_check_time(time(0)), accept_enabled(true), accept_balance(0) |
292 | { |
293 | #ifdef __linux__ |
294 | if (cshared.sockargs.use_epoll) { |
295 | epoll_fd.reset(epoll_create(10)); |
296 | if (epoll_fd.get() < 0) { |
297 | fatal_abort("epoll_create" ); |
298 | } |
299 | epoll_event ev; |
300 | memset(&ev, 0, sizeof(ev)); |
301 | ev.events = EPOLLIN; |
302 | ev.data.ptr = 0; |
303 | if (epoll_ctl(epoll_fd.get(), EPOLL_CTL_ADD, cshared.listen_fd.get(), &ev) |
304 | != 0) { |
305 | fatal_abort("epoll_ctl EPOLL_CTL_ADD" ); |
306 | } |
307 | events_vec.resize(10240); |
308 | } |
309 | #endif |
310 | accept_balance = cshared.conf.get_int("accept_balance" , 0); |
311 | } |
312 | |
313 | namespace { |
314 | |
315 | struct thr_init { |
316 | thr_init(const dbcontext_ptr& dc, volatile int& shutdown_flag) : dbctx(dc) { |
317 | dbctx->init_thread(this, shutdown_flag); |
318 | } |
319 | ~thr_init() { |
320 | dbctx->term_thread(); |
321 | } |
322 | const dbcontext_ptr& dbctx; |
323 | }; |
324 | |
325 | }; // namespace |
326 | |
327 | void |
328 | hstcpsvr_worker::run() |
329 | { |
330 | thr_init initobj(dbctx, vshared.shutdown); |
331 | |
332 | #ifdef __linux__ |
333 | if (cshared.sockargs.use_epoll) { |
334 | while (!vshared.shutdown && dbctx->check_alive()) { |
335 | run_one_ep(); |
336 | } |
337 | } else if (cshared.sockargs.nonblocking) { |
338 | while (!vshared.shutdown && dbctx->check_alive()) { |
339 | run_one_nb(); |
340 | } |
341 | } else { |
342 | /* UNUSED */ |
343 | fatal_abort("run_one" ); |
344 | } |
345 | #else |
346 | while (!vshared.shutdown && dbctx->check_alive()) { |
347 | run_one_nb(); |
348 | } |
349 | #endif |
350 | } |
351 | |
352 | int |
353 | hstcpsvr_worker::run_one_nb() |
354 | { |
355 | size_t nfds = 0; |
356 | /* CLIENT SOCKETS */ |
357 | for (hstcpsvr_conns_type::const_iterator i = conns.begin(); |
358 | i != conns.end(); ++i) { |
359 | if (pfds.size() <= nfds) { |
360 | pfds.resize(nfds + 1); |
361 | } |
362 | pollfd& pfd = pfds[nfds++]; |
363 | pfd.fd = (*i)->fd.get(); |
364 | short ev = 0; |
365 | if ((*i)->cstate.writebuf.size() != 0) { |
366 | ev = POLLOUT; |
367 | } else { |
368 | ev = POLLIN; |
369 | } |
370 | pfd.events = pfd.revents = ev; |
371 | } |
372 | /* LISTENER */ |
373 | { |
374 | const size_t cpt = cshared.nb_conn_per_thread; |
375 | const short ev = (cpt > nfds) ? POLLIN : 0; |
376 | if (pfds.size() <= nfds) { |
377 | pfds.resize(nfds + 1); |
378 | } |
379 | pollfd& pfd = pfds[nfds++]; |
380 | pfd.fd = cshared.listen_fd.get(); |
381 | pfd.events = pfd.revents = ev; |
382 | } |
383 | /* POLL */ |
384 | const int npollev = poll(&pfds[0], nfds, 1 * 1000); |
385 | dbctx->set_statistics(conns.size(), npollev); |
386 | const time_t now = time(0); |
387 | size_t j = 0; |
388 | const short mask_in = ~POLLOUT; |
389 | const short mask_out = POLLOUT | POLLERR | POLLHUP | POLLNVAL; |
390 | /* READ */ |
391 | for (hstcpsvr_conns_type::iterator i = conns.begin(); i != conns.end(); |
392 | ++i, ++j) { |
393 | pollfd& pfd = pfds[j]; |
394 | if ((pfd.revents & mask_in) == 0) { |
395 | continue; |
396 | } |
397 | hstcpsvr_conn& conn = **i; |
398 | if (conn.read_more()) { |
399 | if (conn.cstate.readbuf.size() > 0) { |
400 | const char ch = conn.cstate.readbuf.begin()[0]; |
401 | if (ch == 'Q') { |
402 | vshared.shutdown = 1; |
403 | } else if (ch == '/') { |
404 | conn.cstate.readbuf.clear(); |
405 | conn.cstate.find_nl_pos = 0; |
406 | conn.cstate.writebuf.clear(); |
407 | conn.read_finished = true; |
408 | conn.write_finished = true; |
409 | } |
410 | } |
411 | conn.nb_last_io = now; |
412 | } |
413 | } |
414 | /* EXECUTE */ |
415 | j = 0; |
416 | for (hstcpsvr_conns_type::iterator i = conns.begin(); i != conns.end(); |
417 | ++i, ++j) { |
418 | pollfd& pfd = pfds[j]; |
419 | if ((pfd.revents & mask_in) == 0 || (*i)->cstate.readbuf.size() == 0) { |
420 | continue; |
421 | } |
422 | execute_lines(**i); |
423 | } |
424 | /* COMMIT */ |
425 | dbctx->unlock_tables_if(); |
426 | const bool commit_error = dbctx->get_commit_error(); |
427 | dbctx->clear_error(); |
428 | /* WRITE/CLOSE */ |
429 | j = 0; |
430 | for (hstcpsvr_conns_type::iterator i = conns.begin(); i != conns.end(); |
431 | ++j) { |
432 | pollfd& pfd = pfds[j]; |
433 | hstcpsvr_conn& conn = **i; |
434 | hstcpsvr_conns_type::iterator icur = i; |
435 | ++i; |
436 | if (commit_error) { |
437 | conn.reset(); |
438 | continue; |
439 | } |
440 | if ((pfd.revents & (mask_out | mask_in)) != 0) { |
441 | if (conn.write_more()) { |
442 | conn.nb_last_io = now; |
443 | } |
444 | } |
445 | if (cshared.sockargs.timeout != 0 && |
446 | conn.nb_last_io + cshared.sockargs.timeout < now) { |
447 | conn.reset(); |
448 | } |
449 | if (conn.closed() || conn.ok_to_close()) { |
450 | conns.erase_ptr(icur); |
451 | } |
452 | } |
453 | /* ACCEPT */ |
454 | { |
455 | pollfd& pfd = pfds[nfds - 1]; |
456 | if ((pfd.revents & mask_in) != 0) { |
457 | std::auto_ptr<hstcpsvr_conn> c(new hstcpsvr_conn()); |
458 | c->nonblocking = true; |
459 | c->readsize = cshared.readsize; |
460 | c->accept(cshared); |
461 | if (c->fd.get() >= 0) { |
462 | if (fcntl(c->fd.get(), F_SETFL, O_NONBLOCK) != 0) { |
463 | fatal_abort("F_SETFL O_NONBLOCK" ); |
464 | } |
465 | c->nb_last_io = now; |
466 | conns.push_back_ptr(c); |
467 | } else { |
468 | /* errno == 11 (EAGAIN) is not a fatal error. */ |
469 | DENA_VERBOSE(100, fprintf(stderr, |
470 | "accept failed: errno=%d (not fatal)\n" , errno)); |
471 | } |
472 | } |
473 | } |
474 | DENA_VERBOSE(30, fprintf(stderr, "nb: %p nfds=%zu cns=%zu\n" , this, nfds, |
475 | conns.size())); |
476 | if (conns.empty()) { |
477 | dbctx->close_tables_if(); |
478 | } |
479 | dbctx->set_statistics(conns.size(), 0); |
480 | return 0; |
481 | } |
482 | |
483 | #ifdef __linux__ |
484 | int |
485 | hstcpsvr_worker::run_one_ep() |
486 | { |
487 | epoll_event *const events = &events_vec[0]; |
488 | const size_t num_events = events_vec.size(); |
489 | const time_t now = time(0); |
490 | size_t in_count = 0, out_count = 0, accept_count = 0; |
491 | int nfds = epoll_wait(epoll_fd.get(), events, num_events, 1000); |
492 | /* READ/ACCEPT */ |
493 | dbctx->set_statistics(conns.size(), nfds); |
494 | for (int i = 0; i < nfds; ++i) { |
495 | epoll_event& ev = events[i]; |
496 | if ((ev.events & EPOLLIN) == 0) { |
497 | continue; |
498 | } |
499 | hstcpsvr_conn *const conn = static_cast<hstcpsvr_conn *>(ev.data.ptr); |
500 | if (conn == 0) { |
501 | /* listener */ |
502 | ++accept_count; |
503 | DBG_EP(fprintf(stderr, "IN listener\n" )); |
504 | std::auto_ptr<hstcpsvr_conn> c(new hstcpsvr_conn()); |
505 | c->nonblocking = true; |
506 | c->readsize = cshared.readsize; |
507 | c->accept(cshared); |
508 | if (c->fd.get() >= 0) { |
509 | if (fcntl(c->fd.get(), F_SETFL, O_NONBLOCK) != 0) { |
510 | fatal_abort("F_SETFL O_NONBLOCK" ); |
511 | } |
512 | epoll_event cev; |
513 | memset(&cev, 0, sizeof(cev)); |
514 | cev.events = EPOLLIN | EPOLLOUT | EPOLLET; |
515 | cev.data.ptr = c.get(); |
516 | c->nb_last_io = now; |
517 | const int fd = c->fd.get(); |
518 | conns.push_back_ptr(c); |
519 | conns.back()->conns_iter = --conns.end(); |
520 | if (epoll_ctl(epoll_fd.get(), EPOLL_CTL_ADD, fd, &cev) != 0) { |
521 | fatal_abort("epoll_ctl EPOLL_CTL_ADD" ); |
522 | } |
523 | } else { |
524 | DENA_VERBOSE(100, fprintf(stderr, |
525 | "accept failed: errno=%d (not fatal)\n" , errno)); |
526 | } |
527 | } else { |
528 | /* client connection */ |
529 | ++in_count; |
530 | DBG_EP(fprintf(stderr, "IN client\n" )); |
531 | bool more_data = false; |
532 | while (conn->read_more(&more_data)) { |
533 | DBG_EP(fprintf(stderr, "IN client read_more\n" )); |
534 | conn->nb_last_io = now; |
535 | if (!more_data) { |
536 | break; |
537 | } |
538 | } |
539 | } |
540 | } |
541 | /* EXECUTE */ |
542 | for (int i = 0; i < nfds; ++i) { |
543 | epoll_event& ev = events[i]; |
544 | hstcpsvr_conn *const conn = static_cast<hstcpsvr_conn *>(ev.data.ptr); |
545 | if ((ev.events & EPOLLIN) == 0 || conn == 0 || |
546 | conn->cstate.readbuf.size() == 0) { |
547 | continue; |
548 | } |
549 | const char ch = conn->cstate.readbuf.begin()[0]; |
550 | if (ch == 'Q') { |
551 | vshared.shutdown = 1; |
552 | } else if (ch == '/') { |
553 | conn->cstate.readbuf.clear(); |
554 | conn->cstate.find_nl_pos = 0; |
555 | conn->cstate.writebuf.clear(); |
556 | conn->read_finished = true; |
557 | conn->write_finished = true; |
558 | } else { |
559 | execute_lines(*conn); |
560 | } |
561 | } |
562 | /* COMMIT */ |
563 | dbctx->unlock_tables_if(); |
564 | const bool commit_error = dbctx->get_commit_error(); |
565 | dbctx->clear_error(); |
566 | /* WRITE */ |
567 | for (int i = 0; i < nfds; ++i) { |
568 | epoll_event& ev = events[i]; |
569 | hstcpsvr_conn *const conn = static_cast<hstcpsvr_conn *>(ev.data.ptr); |
570 | if (commit_error && conn != 0) { |
571 | conn->reset(); |
572 | continue; |
573 | } |
574 | if ((ev.events & EPOLLOUT) == 0) { |
575 | continue; |
576 | } |
577 | ++out_count; |
578 | if (conn == 0) { |
579 | /* listener */ |
580 | DBG_EP(fprintf(stderr, "OUT listener\n" )); |
581 | } else { |
582 | /* client connection */ |
583 | DBG_EP(fprintf(stderr, "OUT client\n" )); |
584 | bool more_data = false; |
585 | while (conn->write_more(&more_data)) { |
586 | DBG_EP(fprintf(stderr, "OUT client write_more\n" )); |
587 | conn->nb_last_io = now; |
588 | if (!more_data) { |
589 | break; |
590 | } |
591 | } |
592 | } |
593 | } |
594 | /* CLOSE */ |
595 | for (int i = 0; i < nfds; ++i) { |
596 | epoll_event& ev = events[i]; |
597 | hstcpsvr_conn *const conn = static_cast<hstcpsvr_conn *>(ev.data.ptr); |
598 | if (conn != 0 && conn->ok_to_close()) { |
599 | DBG_EP(fprintf(stderr, "CLOSE close\n" )); |
600 | conns.erase_ptr(conn->conns_iter); |
601 | } |
602 | } |
603 | /* TIMEOUT & cleanup */ |
604 | if (last_check_time + 10 < now) { |
605 | for (hstcpsvr_conns_type::iterator i = conns.begin(); |
606 | i != conns.end(); ) { |
607 | hstcpsvr_conns_type::iterator icur = i; |
608 | ++i; |
609 | if (cshared.sockargs.timeout != 0 && |
610 | (*icur)->nb_last_io + cshared.sockargs.timeout < now) { |
611 | conns.erase_ptr((*icur)->conns_iter); |
612 | } |
613 | } |
614 | last_check_time = now; |
615 | DENA_VERBOSE(20, fprintf(stderr, "ep: %p nfds=%d cns=%zu\n" , this, nfds, |
616 | conns.size())); |
617 | } |
618 | DENA_VERBOSE(30, fprintf(stderr, "%p in=%zu out=%zu ac=%zu, cns=%zu\n" , |
619 | this, in_count, out_count, accept_count, conns.size())); |
620 | if (conns.empty()) { |
621 | dbctx->close_tables_if(); |
622 | } |
623 | /* STATISTICS */ |
624 | const size_t num_conns = conns.size(); |
625 | dbctx->set_statistics(num_conns, 0); |
626 | /* ENABLE/DISABLE ACCEPT */ |
627 | if (accept_balance != 0) { |
628 | cshared.thread_num_conns[worker_id] = num_conns; |
629 | size_t total_num_conns = 0; |
630 | for (long i = 0; i < cshared.num_threads; ++i) { |
631 | total_num_conns += cshared.thread_num_conns[i]; |
632 | } |
633 | bool e_acc = false; |
634 | if (num_conns < 10 || |
635 | total_num_conns * 2 > num_conns * cshared.num_threads) { |
636 | e_acc = true; |
637 | } |
638 | epoll_event ev; |
639 | memset(&ev, 0, sizeof(ev)); |
640 | ev.events = EPOLLIN; |
641 | ev.data.ptr = 0; |
642 | if (e_acc == accept_enabled) { |
643 | } else if (e_acc) { |
644 | if (epoll_ctl(epoll_fd.get(), EPOLL_CTL_ADD, cshared.listen_fd.get(), &ev) |
645 | != 0) { |
646 | fatal_abort("epoll_ctl EPOLL_CTL_ADD" ); |
647 | } |
648 | } else { |
649 | if (epoll_ctl(epoll_fd.get(), EPOLL_CTL_DEL, cshared.listen_fd.get(), &ev) |
650 | != 0) { |
651 | fatal_abort("epoll_ctl EPOLL_CTL_ADD" ); |
652 | } |
653 | } |
654 | accept_enabled = e_acc; |
655 | } |
656 | return 0; |
657 | } |
658 | #endif |
659 | |
660 | void |
661 | hstcpsvr_worker::execute_lines(hstcpsvr_conn& conn) |
662 | { |
663 | DBG_MULTI(int cnt = 0); |
664 | dbconnstate& cstate = conn.cstate; |
665 | char *buf_end = cstate.readbuf.end(); |
666 | char *line_begin = cstate.readbuf.begin(); |
667 | char *find_pos = line_begin + cstate.find_nl_pos; |
668 | while (true) { |
669 | char *const nl = memchr_char(find_pos, '\n', buf_end - find_pos); |
670 | if (nl == 0) { |
671 | break; |
672 | } |
673 | char *const lf = (line_begin != nl && nl[-1] == '\r') ? nl - 1 : nl; |
674 | DBG_MULTI(cnt++); |
675 | execute_line(line_begin, lf, conn); |
676 | find_pos = line_begin = nl + 1; |
677 | } |
678 | cstate.readbuf.erase_front(line_begin - cstate.readbuf.begin()); |
679 | cstate.find_nl_pos = cstate.readbuf.size(); |
680 | DBG_MULTI(fprintf(stderr, "cnt=%d\n" , cnt)); |
681 | } |
682 | |
683 | void |
684 | hstcpsvr_worker::execute_line(char *start, char *finish, hstcpsvr_conn& conn) |
685 | { |
686 | /* safe to modify, safe to dereference 'finish' */ |
687 | char *const cmd_begin = start; |
688 | read_token(start, finish); |
689 | char *const cmd_end = start; |
690 | skip_one(start, finish); |
691 | if (cmd_begin == cmd_end) { |
692 | return conn.dbcb_resp_short(2, "cmd" ); |
693 | } |
694 | if (cmd_begin + 1 == cmd_end) { |
695 | if (cmd_begin[0] == 'P') { |
696 | if (cshared.require_auth && !conn.authorized) { |
697 | return conn.dbcb_resp_short(3, "unauth" ); |
698 | } |
699 | return do_open_index(start, finish, conn); |
700 | } |
701 | if (cmd_begin[0] == 'A') { |
702 | return do_authorization(start, finish, conn); |
703 | } |
704 | } |
705 | if (cmd_begin[0] >= '0' && cmd_begin[0] <= '9') { |
706 | if (cshared.require_auth && !conn.authorized) { |
707 | return conn.dbcb_resp_short(3, "unauth" ); |
708 | } |
709 | return do_exec_on_index(cmd_begin, cmd_end, start, finish, conn); |
710 | } |
711 | return conn.dbcb_resp_short(2, "cmd" ); |
712 | } |
713 | |
714 | void |
715 | hstcpsvr_worker::do_open_index(char *start, char *finish, hstcpsvr_conn& conn) |
716 | { |
717 | const size_t pst_id = read_ui32(start, finish); |
718 | skip_one(start, finish); |
719 | /* dbname */ |
720 | char *const dbname_begin = start; |
721 | read_token(start, finish); |
722 | char *const dbname_end = start; |
723 | skip_one(start, finish); |
724 | /* tblname */ |
725 | char *const tblname_begin = start; |
726 | read_token(start, finish); |
727 | char *const tblname_end = start; |
728 | skip_one(start, finish); |
729 | /* idxname */ |
730 | char *const idxname_begin = start; |
731 | read_token(start, finish); |
732 | char *const idxname_end = start; |
733 | skip_one(start, finish); |
734 | /* retfields */ |
735 | char *const retflds_begin = start; |
736 | read_token(start, finish); |
737 | char *const retflds_end = start; |
738 | skip_one(start, finish); |
739 | /* filfields */ |
740 | char *const filflds_begin = start; |
741 | read_token(start, finish); |
742 | char *const filflds_end = start; |
743 | dbname_end[0] = 0; |
744 | tblname_end[0] = 0; |
745 | idxname_end[0] = 0; |
746 | retflds_end[0] = 0; |
747 | filflds_end[0] = 0; |
748 | cmd_open_args args; |
749 | args.pst_id = pst_id; |
750 | args.dbn = dbname_begin; |
751 | args.tbl = tblname_begin; |
752 | args.idx = idxname_begin; |
753 | args.retflds = retflds_begin; |
754 | args.filflds = filflds_begin; |
755 | return dbctx->cmd_open(conn, args); |
756 | } |
757 | |
758 | void |
759 | hstcpsvr_worker::do_exec_on_index(char *cmd_begin, char *cmd_end, char *start, |
760 | char *finish, hstcpsvr_conn& conn) |
761 | { |
762 | cmd_exec_args args; |
763 | const size_t pst_id = read_ui32(cmd_begin, cmd_end); |
764 | if (pst_id >= conn.cstate.prep_stmts.size()) { |
765 | return conn.dbcb_resp_short(2, "stmtnum" ); |
766 | } |
767 | args.pst = &conn.cstate.prep_stmts[pst_id]; |
768 | char *const op_begin = start; |
769 | read_token(start, finish); |
770 | char *const op_end = start; |
771 | args.op = string_ref(op_begin, op_end); |
772 | skip_one(start, finish); |
773 | const uint32_t fldnum = read_ui32(start, finish); |
774 | string_ref *const flds = DENA_ALLOCA_ALLOCATE(string_ref, fldnum); |
775 | auto_alloca_free<string_ref> flds_autofree(flds); |
776 | args.kvals = flds; |
777 | args.kvalslen = fldnum; |
778 | for (size_t i = 0; i < fldnum; ++i) { |
779 | skip_one(start, finish); |
780 | char *const f_begin = start; |
781 | read_token(start, finish); |
782 | char *const f_end = start; |
783 | if (is_null_expression(f_begin, f_end)) { |
784 | /* null */ |
785 | flds[i] = string_ref(); |
786 | } else { |
787 | /* non-null */ |
788 | char *wp = f_begin; |
789 | unescape_string(wp, f_begin, f_end); |
790 | flds[i] = string_ref(f_begin, wp - f_begin); |
791 | } |
792 | } |
793 | skip_one(start, finish); |
794 | args.limit = read_ui32(start, finish); |
795 | skip_one(start, finish); |
796 | args.skip = read_ui32(start, finish); |
797 | if (start == finish) { |
798 | /* simple query */ |
799 | return dbctx->cmd_exec(conn, args); |
800 | } |
801 | /* has more options */ |
802 | skip_one(start, finish); |
803 | /* in-clause */ |
804 | if (start[0] == '@') { |
805 | read_token(start, finish); /* '@' */ |
806 | skip_one(start, finish); |
807 | args.invalues_keypart = read_ui32(start, finish); |
808 | skip_one(start, finish); |
809 | args.invalueslen = read_ui32(start, finish); |
810 | if (args.invalueslen <= 0) { |
811 | return conn.dbcb_resp_short(2, "invalueslen" ); |
812 | } |
813 | if (invalues_work.size() < args.invalueslen) { |
814 | invalues_work.resize(args.invalueslen); |
815 | } |
816 | args.invalues = &invalues_work[0]; |
817 | for (uint32_t i = 0; i < args.invalueslen; ++i) { |
818 | skip_one(start, finish); |
819 | char *const invalue_begin = start; |
820 | read_token(start, finish); |
821 | char *const invalue_end = start; |
822 | char *wp = invalue_begin; |
823 | unescape_string(wp, invalue_begin, invalue_end); |
824 | invalues_work[i] = string_ref(invalue_begin, wp - invalue_begin); |
825 | } |
826 | skip_one(start, finish); |
827 | } |
828 | if (start == finish) { |
829 | /* no more options */ |
830 | return dbctx->cmd_exec(conn, args); |
831 | } |
832 | /* filters */ |
833 | size_t filters_count = 0; |
834 | while (start != finish && (start[0] == 'W' || start[0] == 'F')) { |
835 | char *const filter_type_begin = start; |
836 | read_token(start, finish); |
837 | char *const filter_type_end = start; |
838 | skip_one(start, finish); |
839 | char *const filter_op_begin = start; |
840 | read_token(start, finish); |
841 | char *const filter_op_end = start; |
842 | skip_one(start, finish); |
843 | const uint32_t ff_offset = read_ui32(start, finish); |
844 | skip_one(start, finish); |
845 | char *const filter_val_begin = start; |
846 | read_token(start, finish); |
847 | char *const filter_val_end = start; |
848 | skip_one(start, finish); |
849 | if (filters_work.size() <= filters_count) { |
850 | filters_work.resize(filters_count + 1); |
851 | } |
852 | record_filter& fi = filters_work[filters_count]; |
853 | if (filter_type_end != filter_type_begin + 1) { |
854 | return conn.dbcb_resp_short(2, "filtertype" ); |
855 | } |
856 | fi.filter_type = (filter_type_begin[0] == 'W') |
857 | ? record_filter_type_break : record_filter_type_skip; |
858 | const uint32_t num_filflds = args.pst->get_filter_fields().size(); |
859 | if (ff_offset >= num_filflds) { |
860 | return conn.dbcb_resp_short(2, "filterfld" ); |
861 | } |
862 | fi.op = string_ref(filter_op_begin, filter_op_end); |
863 | fi.ff_offset = ff_offset; |
864 | if (is_null_expression(filter_val_begin, filter_val_end)) { |
865 | /* null */ |
866 | fi.val = string_ref(); |
867 | } else { |
868 | /* non-null */ |
869 | char *wp = filter_val_begin; |
870 | unescape_string(wp, filter_val_begin, filter_val_end); |
871 | fi.val = string_ref(filter_val_begin, wp - filter_val_begin); |
872 | } |
873 | ++filters_count; |
874 | } |
875 | if (filters_count > 0) { |
876 | if (filters_work.size() <= filters_count) { |
877 | filters_work.resize(filters_count + 1); |
878 | } |
879 | filters_work[filters_count].op = string_ref(); /* sentinel */ |
880 | args.filters = &filters_work[0]; |
881 | } else { |
882 | args.filters = 0; |
883 | } |
884 | if (start == finish) { |
885 | /* no modops */ |
886 | return dbctx->cmd_exec(conn, args); |
887 | } |
888 | /* has modops */ |
889 | char *const mod_op_begin = start; |
890 | read_token(start, finish); |
891 | char *const mod_op_end = start; |
892 | args.mod_op = string_ref(mod_op_begin, mod_op_end); |
893 | const size_t num_uvals = args.pst->get_ret_fields().size(); |
894 | string_ref *const uflds = DENA_ALLOCA_ALLOCATE(string_ref, num_uvals); |
895 | auto_alloca_free<string_ref> uflds_autofree(uflds); |
896 | for (size_t i = 0; i < num_uvals; ++i) { |
897 | skip_one(start, finish); |
898 | char *const f_begin = start; |
899 | read_token(start, finish); |
900 | char *const f_end = start; |
901 | if (is_null_expression(f_begin, f_end)) { |
902 | /* null */ |
903 | uflds[i] = string_ref(); |
904 | } else { |
905 | /* non-null */ |
906 | char *wp = f_begin; |
907 | unescape_string(wp, f_begin, f_end); |
908 | uflds[i] = string_ref(f_begin, wp - f_begin); |
909 | } |
910 | } |
911 | args.uvals = uflds; |
912 | return dbctx->cmd_exec(conn, args); |
913 | } |
914 | |
915 | void |
916 | hstcpsvr_worker::do_authorization(char *start, char *finish, |
917 | hstcpsvr_conn& conn) |
918 | { |
919 | /* auth type */ |
920 | char *const authtype_begin = start; |
921 | read_token(start, finish); |
922 | char *const authtype_end = start; |
923 | const size_t authtype_len = authtype_end - authtype_begin; |
924 | skip_one(start, finish); |
925 | /* key */ |
926 | char *const key_begin = start; |
927 | read_token(start, finish); |
928 | char *const key_end = start; |
929 | const size_t key_len = key_end - key_begin; |
930 | authtype_end[0] = 0; |
931 | key_end[0] = 0; |
932 | char *wp = key_begin; |
933 | unescape_string(wp, key_begin, key_end); |
934 | if (authtype_len != 1 || authtype_begin[0] != '1') { |
935 | return conn.dbcb_resp_short(3, "authtype" ); |
936 | } |
937 | if (cshared.plain_secret.size() == key_len && |
938 | memcmp(cshared.plain_secret.data(), key_begin, key_len) == 0) { |
939 | conn.authorized = true; |
940 | } else { |
941 | conn.authorized = false; |
942 | } |
943 | if (!conn.authorized) { |
944 | return conn.dbcb_resp_short(3, "unauth" ); |
945 | } else { |
946 | return conn.dbcb_resp_short(0, "" ); |
947 | } |
948 | } |
949 | |
950 | hstcpsvr_worker_ptr |
951 | hstcpsvr_worker_i::create(const hstcpsvr_worker_arg& arg) |
952 | { |
953 | return hstcpsvr_worker_ptr(new hstcpsvr_worker(arg)); |
954 | } |
955 | |
956 | }; |
957 | |
958 | |