1/* -*- c-basic-offset: 2 -*- */
2/* Copyright(C) 2009-2012 Brazil
3
4 This library is free software; you can redistribute it and/or
5 modify it under the terms of the GNU Lesser General Public
6 License version 2.1 as published by the Free Software Foundation.
7
8 This library is distributed in the hope that it will be useful,
9 but WITHOUT ANY WARRANTY; without even the implied warranty of
10 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
11 Lesser General Public License for more details.
12
13 You should have received a copy of the GNU Lesser General Public
14 License along with this library; if not, write to the Free Software
15 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
16*/
17
18#include "grn.h"
19
20#include <stdio.h>
21#include <string.h>
22#include "grn_ctx_impl.h"
23
24#ifdef WIN32
25# include <ws2tcpip.h>
26#else
27# ifdef HAVE_SYS_SOCKET_H
28# include <sys/socket.h>
29# endif /* HAVE_SYS_SOCKET_H */
30# include <netinet/in.h>
31# include <netinet/tcp.h>
32# ifdef HAVE_SIGNAL_H
33# include <signal.h>
34# endif /* HAVE_SIGNAL_H */
35# include <sys/uio.h>
36#endif /* WIN32 */
37
38#include "grn_ctx.h"
39#include "grn_com.h"
40
/* Fall back to AF_INET when the platform headers do not define PF_INET. */
#ifndef PF_INET
#define PF_INET AF_INET
#endif /* PF_INET */

/* Option level for TCP socket options: prefer IPPROTO_TCP, else the literal 6. */
#ifndef SOL_TCP
# ifdef IPPROTO_TCP
# define SOL_TCP IPPROTO_TCP
# else
# define SOL_TCP 6
# endif /* IPPROTO_TCP */
#endif /* SOL_TCP */

/* Unless USE_MSG_MORE is set, force MSG_MORE to 0 so OR-ing it into flags is a no-op. */
#ifndef USE_MSG_MORE
# ifdef MSG_MORE
# undef MSG_MORE
# endif
# define MSG_MORE 0
#endif /* USE_MSG_MORE */


/*
 * Unless USE_MSG_NOSIGNAL is set, force MSG_NOSIGNAL to 0 as well;
 * grn_com_init() ignores SIGPIPE instead in that configuration.
 */
#ifndef USE_MSG_NOSIGNAL
# ifdef MSG_NOSIGNAL
# undef MSG_NOSIGNAL
# endif
# define MSG_NOSIGNAL 0
#endif /* USE_MSG_NOSIGNAL */
67/******* grn_com_queue ********/
68
69grn_rc
70grn_com_queue_enque(grn_ctx *ctx, grn_com_queue *q, grn_com_queue_entry *e)
71{
72 CRITICAL_SECTION_ENTER(q->cs);
73 e->next = NULL;
74 *q->tail = e;
75 q->tail = &e->next;
76 CRITICAL_SECTION_LEAVE(q->cs);
77 /*
78 uint8_t i = q->last + 1;
79 e->next = NULL;
80 if (q->first == i || q->next) {
81 CRITICAL_SECTION_ENTER(q->cs);
82 if (q->first == i || q->next) {
83 *q->tail = e;
84 q->tail = &e->next;
85 } else {
86 q->bins[q->last] = e;
87 q->last = i;
88 }
89 CRITICAL_SECTION_LEAVE(q->cs);
90 } else {
91 q->bins[q->last] = e;
92 q->last = i;
93 }
94 */
95 return GRN_SUCCESS;
96}
97
98grn_com_queue_entry *
99grn_com_queue_deque(grn_ctx *ctx, grn_com_queue *q)
100{
101 grn_com_queue_entry *e = NULL;
102
103 CRITICAL_SECTION_ENTER(q->cs);
104 if (q->next) {
105 e = q->next;
106 if (!(q->next = e->next)) { q->tail = &q->next; }
107 }
108 CRITICAL_SECTION_LEAVE(q->cs);
109
110 /*
111 if (q->first == q->last) {
112 if (q->next) {
113 CRITICAL_SECTION_ENTER(q->cs);
114 e = q->next;
115 if (!(q->next = e->next)) { q->tail = &q->next; }
116 CRITICAL_SECTION_LEAVE(q->cs);
117 }
118 } else {
119 e = q->bins[q->first++];
120 }
121 */
122 return e;
123}
124
125/******* grn_msg ********/
126
127grn_obj *
128grn_msg_open(grn_ctx *ctx, grn_com *com, grn_com_queue *old)
129{
130 grn_msg *msg = NULL;
131 if (old && (msg = (grn_msg *)grn_com_queue_deque(ctx, old))) {
132 if (msg->ctx != ctx) {
133 ERR(GRN_INVALID_ARGUMENT, "ctx unmatch");
134 return NULL;
135 }
136 GRN_BULK_REWIND(&msg->qe.obj);
137 } else if ((msg = GRN_MALLOCN(grn_msg, 1))) {
138 GRN_OBJ_INIT(&msg->qe.obj, GRN_MSG, 0, GRN_DB_TEXT);
139 msg->qe.obj.header.impl_flags |= GRN_OBJ_ALLOCATED;
140 msg->ctx = ctx;
141 }
142 msg->qe.next = NULL;
143 msg->u.peer = com;
144 msg->old = old;
145 memset(&msg->header, 0, sizeof(grn_com_header));
146 return (grn_obj *)msg;
147}
148
149grn_obj *
150grn_msg_open_for_reply(grn_ctx *ctx, grn_obj *query, grn_com_queue *old)
151{
152 grn_msg *req = (grn_msg *)query, *msg = NULL;
153 if (req && (msg = (grn_msg *)grn_msg_open(ctx, req->u.peer, old))) {
154 msg->edge_id = req->edge_id;
155 msg->header.proto = req->header.proto == GRN_COM_PROTO_MBREQ
156 ? GRN_COM_PROTO_MBRES : req->header.proto;
157 }
158 return (grn_obj *)msg;
159}
160
161grn_rc
162grn_msg_close(grn_ctx *ctx, grn_obj *obj)
163{
164 grn_msg *msg = (grn_msg *)obj;
165 if (ctx == msg->ctx) { return grn_obj_close(ctx, obj); }
166 return grn_com_queue_enque(ctx, msg->old, (grn_com_queue_entry *)msg);
167}
168
169grn_rc
170grn_msg_set_property(grn_ctx *ctx, grn_obj *obj,
171 uint16_t status, uint32_t key_size, uint8_t extra_size)
172{
173 grn_com_header *header = &((grn_msg *)obj)->header;
174 header->status = htons(status);
175 header->keylen = htons(key_size);
176 header->level = extra_size;
177 return GRN_SUCCESS;
178}
179
/*
 * Send `msg` to its peer.
 *
 * If the peer's pending queue (peer->new_) is empty, the message is sent
 * right away according to header->proto. Unless that send reports
 * GRN_OPERATION_WOULD_BLOCK, the message is recycled onto m->old and the
 * send result is returned.
 *
 * Otherwise (pending queue not empty, or the send would block) the message
 * is appended to peer->new_ under peer->ev->mutex and peer->ev->cond is
 * signalled.
 *
 * GRN_COM_PROTO_MBREQ returns GRN_FUNCTION_NOT_IMPLEMENTED and an unknown
 * protocol returns GRN_INVALID_ARGUMENT; in both cases the message is left
 * untouched.
 */
grn_rc
grn_msg_send(grn_ctx *ctx, grn_obj *msg, int flags)
{
  grn_rc rc;
  grn_msg *m = (grn_msg *)msg;
  grn_com *peer = m->u.peer;
  grn_com_header *header = &m->header;
  if (GRN_COM_QUEUE_EMPTYP(&peer->new_)) {
    switch (header->proto) {
    case GRN_COM_PROTO_HTTP :
      {
        /* HTTP: the bulk is sent verbatim, without a grn_com_header. */
        ssize_t ret;
        ret = send(peer->fd, GRN_BULK_HEAD(msg), GRN_BULK_VSIZE(msg), MSG_NOSIGNAL);
        if (ret == -1) { SOERR("send"); }
        if (ctx->rc != GRN_OPERATION_WOULD_BLOCK) {
          grn_com_queue_enque(ctx, m->old, (grn_com_queue_entry *)msg);
          return ctx->rc;
        }
      }
      break;
    case GRN_COM_PROTO_GQTP :
      {
        /* GQTP: rebuild the header from the ctx state before sending. */
        if (flags & GRN_CTX_MORE) { flags |= GRN_CTX_QUIET; }
        if (ctx->stat == GRN_CTX_QUIT) { flags |= GRN_CTX_QUIT; }
        header->qtype = (uint8_t) ctx->impl->output.type;
        header->keylen = 0;
        header->level = 0;
        header->flags = flags;
        header->status = htons((uint16_t)ctx->rc);
        header->opaque = 0;
        header->cas = 0;
        //todo : MSG_DONTWAIT
        rc = grn_com_send(ctx, peer, header,
                          GRN_BULK_HEAD(msg), GRN_BULK_VSIZE(msg), 0);
        if (rc != GRN_OPERATION_WOULD_BLOCK) {
          grn_com_queue_enque(ctx, m->old, (grn_com_queue_entry *)msg);
          return rc;
        }
      }
      break;
    case GRN_COM_PROTO_MBREQ :
      return GRN_FUNCTION_NOT_IMPLEMENTED;
    case GRN_COM_PROTO_MBRES :
      /* MBRES: header is sent as prepared by the caller; GRN_CTX_MORE maps to MSG_MORE. */
      rc = grn_com_send(ctx, peer, header,
                        GRN_BULK_HEAD(msg), GRN_BULK_VSIZE(msg),
                        (flags & GRN_CTX_MORE) ? MSG_MORE :0);
      if (rc != GRN_OPERATION_WOULD_BLOCK) {
        grn_com_queue_enque(ctx, m->old, (grn_com_queue_entry *)msg);
        return rc;
      }
      break;
    default :
      return GRN_INVALID_ARGUMENT;
    }
  }
  /* Deferred path: queue the message on the peer and signal the event's condition. */
  MUTEX_LOCK(peer->ev->mutex);
  rc = grn_com_queue_enque(ctx, &peer->new_, (grn_com_queue_entry *)msg);
  COND_SIGNAL(peer->ev->cond);
  MUTEX_UNLOCK(peer->ev->mutex);
  return rc;
}
241
242/******* grn_com ********/
243
/*
 * Process-wide initialisation of the communication layer.
 *
 * On WIN32 this starts Winsock (version 2.0). Elsewhere, when
 * USE_MSG_NOSIGNAL is not defined, SIGPIPE is set to SIG_IGN.
 * Failures are recorded on the global context grn_gctx, whose rc is
 * returned.
 */
grn_rc
grn_com_init(void)
{
#ifdef WIN32
  WSADATA wd;
  if (WSAStartup(MAKEWORD(2, 0), &wd) != 0) {
    /* SOERR() reports through a variable named ctx. */
    grn_ctx *ctx = &grn_gctx;
    SOERR("WSAStartup");
  }
#else /* WIN32 */
#ifndef USE_MSG_NOSIGNAL
  if (signal(SIGPIPE, SIG_IGN) == SIG_ERR) {
    /* SERR() reports through a variable named ctx. */
    grn_ctx *ctx = &grn_gctx;
    SERR("signal");
  }
#endif /* USE_MSG_NOSIGNAL */
#endif /* WIN32 */
  return grn_gctx.rc;
}
263
/*
 * Process-wide teardown of the communication layer.
 * Calls WSACleanup() on WIN32; does nothing on other platforms.
 */
void
grn_com_fin(void)
{
#ifdef WIN32
  WSACleanup();
#endif /* WIN32 */
}
271
/*
 * Initialise event set `ev`.
 *
 * Creates the fd -> connection hash (key: grn_sock, value size: data_size),
 * the mutex/condition pair and the recv_old recycle queue, then sets up the
 * backend chosen at build time:
 *  - USE_SELECT: nothing more to allocate;
 *  - USE_EPOLL:  event array + epoll descriptor;
 *  - USE_KQUEUE: event array + kqueue descriptor;
 *  - otherwise:  pollfd array.
 *
 * When backend setup fails, the hash is closed and ev->hash / ev->events
 * are set to NULL. Returns ctx->rc.
 */
grn_rc
grn_com_event_init(grn_ctx *ctx, grn_com_event *ev, int max_nevents, int data_size)
{
  ev->max_nevents = max_nevents;
  if ((ev->hash = grn_hash_create(ctx, NULL, sizeof(grn_sock), data_size, 0))) {
    MUTEX_INIT(ev->mutex);
    COND_INIT(ev->cond);
    GRN_COM_QUEUE_INIT(&ev->recv_old);
    ev->msg_handler = NULL;
    memset(&(ev->curr_edge_id), 0, sizeof(grn_com_addr));
    ev->acceptor = NULL;
    ev->opaque = NULL;
#ifndef USE_SELECT
# ifdef USE_EPOLL
    if ((ev->events = GRN_MALLOC(sizeof(struct epoll_event) * max_nevents))) {
      if ((ev->epfd = epoll_create(max_nevents)) != -1) {
        goto exit;
      } else {
        SERR("epoll_create");
      }
      GRN_FREE(ev->events);
    }
# else /* USE_EPOLL */
# ifdef USE_KQUEUE
    if ((ev->events = GRN_MALLOC(sizeof(struct kevent) * max_nevents))) {
      if ((ev->kqfd = kqueue()) != -1) {
        goto exit;
      } else {
        SERR("kqueue");
      }
      GRN_FREE(ev->events);
    }
# else /* USE_KQUEUE */
    if ((ev->events = GRN_MALLOC(sizeof(struct pollfd) * max_nevents))) {
      goto exit;
    }
# endif /* USE_KQUEUE*/
# endif /* USE_EPOLL */
    /* Reached only when backend setup failed: undo the hash creation. */
    grn_hash_close(ctx, ev->hash);
    ev->hash = NULL;
    ev->events = NULL;
#else /* USE_SELECT */
    goto exit;
#endif /* USE_SELECT */
  }
exit :
  return ctx->rc;
}
320
321grn_rc
322grn_com_event_fin(grn_ctx *ctx, grn_com_event *ev)
323{
324 grn_obj *msg;
325 while ((msg = (grn_obj *)grn_com_queue_deque(ctx, &ev->recv_old))) {
326 grn_msg_close(ctx, msg);
327 }
328 if (ev->hash) { grn_hash_close(ctx, ev->hash); }
329#ifndef USE_SELECT
330 if (ev->events) { GRN_FREE(ev->events); }
331# ifdef USE_EPOLL
332 grn_close(ev->epfd);
333# endif /* USE_EPOLL */
334# ifdef USE_KQUEUE
335 grn_close(ev->kqfd);
336# endif /* USE_KQUEUE*/
337#endif /* USE_SELECT */
338 return GRN_SUCCESS;
339}
340
341grn_rc
342grn_com_event_add(grn_ctx *ctx, grn_com_event *ev, grn_sock fd, int events, grn_com **com)
343{
344 grn_com *c;
345 /* todo : expand events */
346 if (!ev || *ev->hash->n_entries == ev->max_nevents) {
347 if (ev) { GRN_LOG(ctx, GRN_LOG_ERROR, "too many connections (%d)", ev->max_nevents); }
348 return GRN_INVALID_ARGUMENT;
349 }
350#ifdef USE_EPOLL
351 {
352 struct epoll_event e;
353 memset(&e, 0, sizeof(struct epoll_event));
354 e.data.fd = (fd);
355 e.events = (uint32_t) events;
356 if (epoll_ctl(ev->epfd, EPOLL_CTL_ADD, (fd), &e) == -1) {
357 SERR("epoll_ctl");
358 return ctx->rc;
359 }
360 }
361#endif /* USE_EPOLL*/
362#ifdef USE_KQUEUE
363 {
364 struct kevent e;
365 /* todo: udata should have fd */
366 EV_SET(&e, (fd), events, EV_ADD, 0, 0, NULL);
367 if (kevent(ev->kqfd, &e, 1, NULL, 0, NULL) == -1) {
368 SERR("kevent");
369 return ctx->rc;
370 }
371 }
372#endif /* USE_KQUEUE */
373 {
374 if (grn_hash_add(ctx, ev->hash, &fd, sizeof(grn_sock), (void **)&c, NULL)) {
375 c->ev = ev;
376 c->fd = fd;
377 c->events = events;
378 if (com) { *com = c; }
379 }
380 }
381 return ctx->rc;
382}
383
384grn_rc
385grn_com_event_mod(grn_ctx *ctx, grn_com_event *ev, grn_sock fd, int events, grn_com **com)
386{
387 grn_com *c;
388 if (!ev) { return GRN_INVALID_ARGUMENT; }
389 if (grn_hash_get(ctx, ev->hash, &fd, sizeof(grn_sock), (void **)&c)) {
390 if (c->fd != fd) {
391 GRN_LOG(ctx, GRN_LOG_ERROR,
392 "grn_com_event_mod fd unmatch "
393 "%" GRN_FMT_SOCKET " != %" GRN_FMT_SOCKET,
394 c->fd, fd);
395 return GRN_OBJECT_CORRUPT;
396 }
397 if (com) { *com = c; }
398 if (c->events != events) {
399#ifdef USE_EPOLL
400 struct epoll_event e;
401 memset(&e, 0, sizeof(struct epoll_event));
402 e.data.fd = (fd);
403 e.events = (uint32_t) events;
404 if (epoll_ctl(ev->epfd, EPOLL_CTL_MOD, (fd), &e) == -1) {
405 SERR("epoll_ctl");
406 return ctx->rc;
407 }
408#endif /* USE_EPOLL*/
409#ifdef USE_KQUEUE
410 // experimental
411 struct kevent e[2];
412 EV_SET(&e[0], (fd), GRN_COM_POLLIN|GRN_COM_POLLOUT, EV_DELETE, 0, 0, NULL);
413 EV_SET(&e[1], (fd), events, EV_ADD, 0, 0, NULL);
414 if (kevent(ev->kqfd, e, 2, NULL, 0, NULL) == -1) {
415 SERR("kevent");
416 return ctx->rc;
417 }
418#endif /* USE_KQUEUE */
419 c->events = events;
420 }
421 return GRN_SUCCESS;
422 }
423 return GRN_INVALID_ARGUMENT;
424}
425
426grn_rc
427grn_com_event_del(grn_ctx *ctx, grn_com_event *ev, grn_sock fd)
428{
429 if (!ev) { return GRN_INVALID_ARGUMENT; }
430 {
431 grn_com *c;
432 grn_id id = grn_hash_get(ctx, ev->hash, &fd, sizeof(grn_sock), (void **)&c);
433 if (id) {
434#ifdef USE_EPOLL
435 if (!c->closed) {
436 struct epoll_event e;
437 memset(&e, 0, sizeof(struct epoll_event));
438 e.data.fd = fd;
439 e.events = c->events;
440 if (epoll_ctl(ev->epfd, EPOLL_CTL_DEL, fd, &e) == -1) {
441 SERR("epoll_ctl");
442 return ctx->rc;
443 }
444 }
445#endif /* USE_EPOLL*/
446#ifdef USE_KQUEUE
447 struct kevent e;
448 EV_SET(&e, (fd), c->events, EV_DELETE, 0, 0, NULL);
449 if (kevent(ev->kqfd, &e, 1, NULL, 0, NULL) == -1) {
450 SERR("kevent");
451 return ctx->rc;
452 }
453#endif /* USE_KQUEUE */
454 return grn_hash_delete_by_id(ctx, ev->hash, id, NULL);
455 } else {
456 GRN_LOG(ctx, GRN_LOG_ERROR,
457 "%04x| fd(%" GRN_FMT_SOCKET ") not found in ev(%p)",
458 grn_getpid(), fd, ev);
459 return GRN_INVALID_ARGUMENT;
460 }
461 }
462}
463
464#define LISTEN_BACKLOG 0x1000
465
466grn_rc
467grn_com_event_start_accept(grn_ctx *ctx, grn_com_event *ev)
468{
469 grn_com *com = ev->acceptor;
470
471 if (com->accepting) {return ctx->rc;}
472
473 GRN_API_ENTER;
474 if (!grn_com_event_mod(ctx, ev, com->fd, GRN_COM_POLLIN, NULL)) {
475 if (listen(com->fd, LISTEN_BACKLOG) == 0) {
476 com->accepting = GRN_TRUE;
477 } else {
478 SOERR("listen - start accept");
479 }
480 }
481 GRN_API_RETURN(ctx->rc);
482}
483
484grn_rc
485grn_com_event_stop_accept(grn_ctx *ctx, grn_com_event *ev)
486{
487 grn_com *com = ev->acceptor;
488
489 if (!com->accepting) {return ctx->rc;}
490
491 GRN_API_ENTER;
492 if (!grn_com_event_mod(ctx, ev, com->fd, 0, NULL)) {
493 if (listen(com->fd, 0) == 0) {
494 com->accepting = GRN_FALSE;
495 } else {
496 SOERR("listen - disable accept");
497 }
498 }
499 GRN_API_RETURN(ctx->rc);
500}
501
502static void
503grn_com_receiver(grn_ctx *ctx, grn_com *com)
504{
505 grn_com_event *ev = com->ev;
506 ERRCLR(ctx);
507 if (ev->acceptor == com) {
508 grn_com *ncs;
509 grn_sock fd = accept(com->fd, NULL, NULL);
510 if (fd == -1) {
511 if (errno == EMFILE) {
512 grn_com_event_stop_accept(ctx, ev);
513 } else {
514 SOERR("accept");
515 }
516 return;
517 }
518 if (grn_com_event_add(ctx, ev, fd, GRN_COM_POLLIN, (grn_com **)&ncs)) {
519 grn_sock_close(fd);
520 return;
521 }
522 ncs->has_sid = 0;
523 ncs->closed = 0;
524 ncs->opaque = NULL;
525 GRN_COM_QUEUE_INIT(&ncs->new_);
526 // GRN_LOG(ctx, GRN_LOG_NOTICE, "accepted (%d)", fd);
527 return;
528 } else {
529 grn_msg *msg = (grn_msg *)grn_msg_open(ctx, com, &ev->recv_old);
530 grn_com_recv(ctx, msg->u.peer, &msg->header, (grn_obj *)msg);
531 if (msg->u.peer /* is_edge_request(msg)*/) {
532 grn_memcpy(&msg->edge_id, &ev->curr_edge_id, sizeof(grn_com_addr));
533 if (!com->has_sid) {
534 com->has_sid = 1;
535 com->sid = ev->curr_edge_id.sid++;
536 }
537 msg->edge_id.sid = com->sid;
538 }
539 msg->acceptor = ev->acceptor;
540 ev->msg_handler(ctx, (grn_obj *)msg);
541 }
542}
543
544grn_rc
545grn_com_event_poll(grn_ctx *ctx, grn_com_event *ev, int timeout)
546{
547 int nevents;
548 grn_com *com;
549#ifdef USE_SELECT
550 uint32_t dummy;
551 grn_sock *pfd;
552 int nfds = 0;
553 fd_set rfds;
554 fd_set wfds;
555 struct timeval tv;
556 if (timeout >= 0) {
557 tv.tv_sec = timeout / 1000;
558 tv.tv_usec = (timeout % 1000) * 1000;
559 }
560 FD_ZERO(&rfds);
561 FD_ZERO(&wfds);
562 ctx->errlvl = GRN_OK;
563 ctx->rc = GRN_SUCCESS;
564 {
565 grn_hash_cursor *cursor;
566 cursor = grn_hash_cursor_open(ctx, ev->hash, NULL, 0, NULL, 0, 0, -1, 0);
567 if (cursor) {
568 grn_id id;
569 while ((id = grn_hash_cursor_next(ctx, cursor))) {
570 grn_hash_cursor_get_key_value(ctx,
571 cursor,
572 (void **)(&pfd),
573 &dummy,
574 (void **)(&com));
575 if (com->events & GRN_COM_POLLIN) { FD_SET(*pfd, &rfds); }
576 if (com->events & GRN_COM_POLLOUT) { FD_SET(*pfd, &wfds); }
577# ifndef WIN32
578 if (*pfd > nfds) { nfds = *pfd; }
579# endif /* WIN32 */
580 }
581 grn_hash_cursor_close(ctx, cursor);
582 }
583 }
584 nevents = select(nfds + 1, &rfds, &wfds, NULL, (timeout >= 0) ? &tv : NULL);
585 if (nevents < 0) {
586 SOERR("select");
587 if (ctx->rc == GRN_INTERRUPTED_FUNCTION_CALL) { ERRCLR(ctx); }
588 return ctx->rc;
589 }
590 if (timeout < 0 && !nevents) { GRN_LOG(ctx, GRN_LOG_NOTICE, "select returns 0 events"); }
591 GRN_HASH_EACH(ctx, ev->hash, eh, &pfd, &dummy, &com, {
592 if (FD_ISSET(*pfd, &rfds)) { grn_com_receiver(ctx, com); }
593 });
594#else /* USE_SELECT */
595# ifdef USE_EPOLL
596 struct epoll_event *ep;
597 ctx->errlvl = GRN_OK;
598 ctx->rc = GRN_SUCCESS;
599 nevents = epoll_wait(ev->epfd, ev->events, ev->max_nevents, timeout);
600 if (nevents < 0) {
601 SERR("epoll_wait");
602 }
603# else /* USE_EPOLL */
604# ifdef USE_KQUEUE
605 struct kevent *ep;
606 struct timespec tv;
607 if (timeout >= 0) {
608 tv.tv_sec = timeout / 1000;
609 tv.tv_nsec = (timeout % 1000) * 1000;
610 }
611 nevents = kevent(ev->kqfd, NULL, 0, ev->events, ev->max_nevents, &tv);
612 if (nevents < 0) {
613 SERR("kevent");
614 }
615# else /* USE_KQUEUE */
616 uint32_t dummy;
617 int nfd = 0, *pfd;
618 struct pollfd *ep = ev->events;
619 ctx->errlvl = GRN_OK;
620 ctx->rc = GRN_SUCCESS;
621 GRN_HASH_EACH(ctx, ev->hash, eh, &pfd, &dummy, &com, {
622 ep->fd = *pfd;
623 // ep->events =(short) com->events;
624 ep->events = POLLIN;
625 ep->revents = 0;
626 ep++;
627 nfd++;
628 });
629 nevents = poll(ev->events, nfd, timeout);
630 if (nevents < 0) {
631 SERR("poll");
632 }
633# endif /* USE_KQUEUE */
634# endif /* USE_EPOLL */
635 if (ctx->rc != GRN_SUCCESS) {
636 if (ctx->rc == GRN_INTERRUPTED_FUNCTION_CALL) {
637 ERRCLR(ctx);
638 }
639 return ctx->rc;
640 }
641 if (timeout < 0 && !nevents) { GRN_LOG(ctx, GRN_LOG_NOTICE, "poll returns 0 events"); }
642 for (ep = ev->events; nevents; ep++) {
643 int efd;
644# ifdef USE_EPOLL
645 efd = ep->data.fd;
646 nevents--;
647 // todo : com = ep->data.ptr;
648 if (!grn_hash_get(ctx, ev->hash, &efd, sizeof(grn_sock), (void *)&com)) {
649 struct epoll_event e;
650 GRN_LOG(ctx, GRN_LOG_ERROR, "fd(%d) not found in ev->hash", efd);
651 memset(&e, 0, sizeof(struct epoll_event));
652 e.data.fd = efd;
653 e.events = ep->events;
654 if (epoll_ctl(ev->epfd, EPOLL_CTL_DEL, efd, &e) == -1) { SERR("epoll_ctl"); }
655 if (grn_sock_close(efd) == -1) { SOERR("close"); }
656 continue;
657 }
658 if (ep->events & GRN_COM_POLLIN) { grn_com_receiver(ctx, com); }
659# else /* USE_EPOLL */
660# ifdef USE_KQUEUE
661 efd = ep->ident;
662 nevents--;
663 // todo : com = ep->udata;
664 if (!grn_hash_get(ctx, ev->hash, &efd, sizeof(grn_sock), (void *)&com)) {
665 struct kevent e;
666 GRN_LOG(ctx, GRN_LOG_ERROR, "fd(%d) not found in ev->set", efd);
667 EV_SET(&e, efd, ep->filter, EV_DELETE, 0, 0, NULL);
668 if (kevent(ev->kqfd, &e, 1, NULL, 0, NULL) == -1) { SERR("kevent"); }
669 if (grn_sock_close(efd) == -1) { SOERR("close"); }
670 continue;
671 }
672 if (ep->filter == GRN_COM_POLLIN) { grn_com_receiver(ctx, com); }
673# else
674 efd = ep->fd;
675 if (!(ep->events & ep->revents)) { continue; }
676 nevents--;
677 if (!grn_hash_get(ctx, ev->hash, &efd, sizeof(grn_sock), (void *)&com)) {
678 GRN_LOG(ctx, GRN_LOG_ERROR, "fd(%d) not found in ev->hash", efd);
679 if (grn_sock_close(efd) == -1) { SOERR("close"); }
680 continue;
681 }
682 if (ep->revents & GRN_COM_POLLIN) { grn_com_receiver(ctx, com); }
683# endif /* USE_KQUEUE */
684# endif /* USE_EPOLL */
685 }
686#endif /* USE_SELECT */
687 /* todo :
688 while (!(msg = (grn_com_msg *)grn_com_queue_deque(&recv_old))) {
689 grn_msg_close(ctx, msg);
690 }
691 */
692 return GRN_SUCCESS;
693}
694
695grn_rc
696grn_com_send_http(grn_ctx *ctx, grn_com *cs, const char *path, uint32_t path_len, int flags)
697{
698 ssize_t ret;
699 grn_obj buf;
700 GRN_TEXT_INIT(&buf, 0);
701 GRN_TEXT_PUTS(ctx, &buf, "GET ");
702 grn_bulk_write(ctx, &buf, path, path_len);
703 GRN_TEXT_PUTS(ctx, &buf, " HTTP/1.0\r\n\r\n");
704 // todo : refine
705 if ((ret = send(cs->fd, GRN_BULK_HEAD(&buf), GRN_BULK_VSIZE(&buf), MSG_NOSIGNAL|flags)) == -1) {
706 SOERR("send");
707 }
708 if (ret != GRN_BULK_VSIZE(&buf)) {
709 GRN_LOG(ctx, GRN_LOG_NOTICE, "send %d != %d", (int)ret, (int)GRN_BULK_VSIZE(&buf));
710 }
711 grn_obj_close(ctx, &buf);
712 return ctx->rc;
713}
714
715grn_rc
716grn_com_send(grn_ctx *ctx, grn_com *cs,
717 grn_com_header *header, const char *body, uint32_t size, int flags)
718{
719 grn_rc rc = GRN_SUCCESS;
720 size_t whole_size = sizeof(grn_com_header) + size;
721 ssize_t ret;
722 header->size = htonl(size);
723 GRN_LOG(ctx, GRN_LOG_INFO, "send (%d,%x,%d,%02x,%02x,%04x)", size, header->flags, header->proto, header->qtype, header->level, header->status);
724
725 if (size) {
726#ifdef WIN32
727 WSABUF wsabufs[2];
728 DWORD n_sent;
729 wsabufs[0].buf = (char *)header;
730 wsabufs[0].len = sizeof(grn_com_header);
731 wsabufs[1].buf = (char *)body;
732 wsabufs[1].len = size;
733 if (WSASend(cs->fd, wsabufs, 2, &n_sent, 0, NULL, NULL) == SOCKET_ERROR) {
734 SOERR("WSASend");
735 }
736 ret = n_sent;
737#else /* WIN32 */
738 struct iovec msg_iov[2];
739 struct msghdr msg;
740 memset(&msg, 0, sizeof(struct msghdr));
741 msg.msg_name = NULL;
742 msg.msg_namelen = 0;
743 msg.msg_iov = msg_iov;
744 msg.msg_iovlen = 2;
745 msg_iov[0].iov_base = (char*) header;
746 msg_iov[0].iov_len = sizeof(grn_com_header);
747 msg_iov[1].iov_base = (char *)body;
748 msg_iov[1].iov_len = size;
749 if ((ret = sendmsg(cs->fd, &msg, MSG_NOSIGNAL|flags)) == -1) {
750 SOERR("sendmsg");
751 rc = ctx->rc;
752 }
753#endif /* WIN32 */
754 } else {
755 if ((ret = send(cs->fd, (const void *)header, whole_size, MSG_NOSIGNAL|flags)) == -1) {
756 SOERR("send");
757 rc = ctx->rc;
758 }
759 }
760 if (ret != whole_size) {
761 GRN_LOG(ctx, GRN_LOG_ERROR,
762 "sendmsg(%" GRN_FMT_SOCKET "): %" GRN_FMT_LLD " < %" GRN_FMT_LLU,
763 cs->fd, (long long int)ret, (unsigned long long int)whole_size);
764 rc = ctx->rc;
765 }
766 return rc;
767}
768
/* Maximum number of zero-byte reads tolerated by the receive loops. */
#define RETRY_MAX 10

/*
 * Search [p, e) for the first "\r\n\r\n" sequence.
 *
 * Returns a pointer just past the sequence, or NULL when it is absent.
 * The scan inspects the last byte of a 4-byte window first and skips
 * ahead as far as that byte allows.
 */
static const char *
scan_delimiter(const char *p, const char *e)
{
  const char *cursor = p;

  while (e - cursor >= 4) {
    int step;
    switch (cursor[3]) {
    case '\n':
      if (cursor[2] != '\r') {
        /* "?\n" cannot be part of the delimiter unless preceded by '\r'. */
        step = 4;
      } else if (cursor[1] == '\n' && cursor[0] == '\r') {
        return cursor + 4;
      } else {
        /* Window ends in "\r\n": a match may start at that "\r\n". */
        step = 2;
      }
      break;
    case '\r':
      /* A trailing '\r' may be the 1st or 3rd delimiter byte. */
      step = 1;
      break;
    default:
      step = 4;
      break;
    }
    cursor += step;
  }
  return NULL;
}
785
/* Chunk size reserved in the bulk for each recv() of a text request. */
#define BUFSIZE 4096

/*
 * Receive a text (HTTP-style) request whose first `ret` bytes were already
 * read into `header`.
 *
 * Those bytes are copied into `buf`, then more data is read in BUFSIZE
 * chunks until scan_delimiter() finds "\r\n\r\n". On completion
 * header->qtype is set to the first byte of the request, header->proto to
 * GRN_COM_PROTO_HTTP and header->size to the number of bytes buffered.
 *
 * Would-block and interrupted reads are retried; more than RETRY_MAX
 * zero-byte reads abandons the request without setting those header
 * fields. Returns ctx->rc.
 */
static grn_rc
grn_com_recv_text(grn_ctx *ctx, grn_com *com,
                  grn_com_header *header, grn_obj *buf, ssize_t ret)
{
  const char *p;
  int retry = 0;
  grn_bulk_write(ctx, buf, (char *)header, ret);
  /* The bytes read as the header may already contain the whole request. */
  if ((p = scan_delimiter(GRN_BULK_HEAD(buf), GRN_BULK_CURR(buf)))) {
    header->qtype = *GRN_BULK_HEAD(buf);
    header->proto = GRN_COM_PROTO_HTTP;
    header->size = GRN_BULK_VSIZE(buf);
    goto exit;
  }
  for (;;) {
    if (grn_bulk_reserve(ctx, buf, BUFSIZE)) { return ctx->rc; }
    if ((ret = recv(com->fd, GRN_BULK_CURR(buf), BUFSIZE, 0)) < 0) {
      SOERR("recv text");
      if (ctx->rc == GRN_OPERATION_WOULD_BLOCK ||
          ctx->rc == GRN_INTERRUPTED_FUNCTION_CALL) {
        ERRCLR(ctx);
        continue;
      }
      goto exit;
    }
    if (ret) {
      off_t o = GRN_BULK_VSIZE(buf);
      p = GRN_BULK_CURR(buf);
      GRN_BULK_INCR_LEN(buf, ret);
      /* Back up to 3 bytes so a delimiter split across two reads is found. */
      if (scan_delimiter(p - (o > 3 ? 3 : o), p + ret)) {
        break;
      }
    } else {
      if (++retry > RETRY_MAX) {
        // ERR(GRN_RETRY_MAX, "retry max in recv text");
        goto exit;
      }
    }
  }
  header->qtype = *GRN_BULK_HEAD(buf);
  header->proto = GRN_COM_PROTO_HTTP;
  header->size = GRN_BULK_VSIZE(buf);
exit :
  if (header->qtype == 'H') {
    //todo : refine
    /*
    GRN_BULK_REWIND(buf);
    grn_bulk_reserve(ctx, buf, BUFSIZE);
    if ((ret = recv(com->fd, GRN_BULK_CURR(buf), BUFSIZE, 0)) < 0) {
      SOERR("recv text body");
    } else {
      GRN_BULK_CURR(buf) += ret;
    }
    */
  }
  return ctx->rc;
}
844
/*
 * Receive one message from `com` into `header` and `buf`.
 *
 * The fixed-size grn_com_header is read first. If, after any successful
 * read, header->proto is below 0x80, the input is treated as text and
 * handed to grn_com_recv_text(). Otherwise, for GRN_COM_PROTO_GQTP and
 * GRN_COM_PROTO_MBREQ, ntohl(header->size) bytes of body are read into
 * `buf` (rewound and grown as needed); any other protocol sets
 * GRN_INVALID_FORMAT.
 *
 * Would-block and interrupted reads are retried; more than RETRY_MAX
 * zero-byte reads abandons the receive. Returns ctx->rc.
 */
grn_rc
grn_com_recv(grn_ctx *ctx, grn_com *com, grn_com_header *header, grn_obj *buf)
{
  ssize_t ret;
  int retry = 0;
  byte *p = (byte *)header;
  size_t rest = sizeof(grn_com_header);
  /* Phase 1: read the header, possibly across several recv() calls. */
  do {
    if ((ret = recv(com->fd, p, rest, 0)) < 0) {
      SOERR("recv size");
      GRN_LOG(ctx, GRN_LOG_ERROR, "recv error (%" GRN_FMT_SOCKET ")", com->fd);
      if (ctx->rc == GRN_OPERATION_WOULD_BLOCK ||
          ctx->rc == GRN_INTERRUPTED_FUNCTION_CALL) {
        ERRCLR(ctx);
        continue;
      }
      goto exit;
    }
    if (ret) {
      /* The first header byte (proto) tells text input from binary input. */
      if (header->proto < 0x80) {
        return grn_com_recv_text(ctx, com, header, buf, ret);
      }
      rest -= ret, p += ret;
    } else {
      if (++retry > RETRY_MAX) {
        // ERR(GRN_RETRY_MAX, "retry max in recv header (%d)", com->fd);
        goto exit;
      }
    }
  } while (rest);
  GRN_LOG(ctx, GRN_LOG_INFO,
          "recv (%u,%x,%d,%02x,%02x,%04x)",
          (uint32_t)ntohl(header->size),
          header->flags,
          header->proto,
          header->qtype,
          header->level,
          header->status);
  {
    /* Phase 2: read the body announced by header->size. */
    uint8_t proto = header->proto;
    size_t value_size = ntohl(header->size);
    GRN_BULK_REWIND(buf);
    switch (proto) {
    case GRN_COM_PROTO_GQTP :
    case GRN_COM_PROTO_MBREQ :
      if (GRN_BULK_WSIZE(buf) < value_size) {
        if (grn_bulk_resize(ctx, buf, value_size)) {
          goto exit;
        }
      }
      retry = 0;
      for (rest = value_size; rest;) {
        if ((ret = recv(com->fd, GRN_BULK_CURR(buf), rest, MSG_WAITALL)) < 0) {
          SOERR("recv body");
          if (ctx->rc == GRN_OPERATION_WOULD_BLOCK ||
              ctx->rc == GRN_INTERRUPTED_FUNCTION_CALL) {
            ERRCLR(ctx);
            continue;
          }
          goto exit;
        }
        if (ret) {
          rest -= ret;
          GRN_BULK_INCR_LEN(buf, ret);
        } else {
          if (++retry > RETRY_MAX) {
            // ERR(GRN_RETRY_MAX, "retry max in recv body");
            goto exit;
          }
        }
      }
      break;
    default :
      GRN_LOG(ctx, GRN_LOG_ERROR, "illegal header: %d", proto);
      ctx->rc = GRN_INVALID_FORMAT;
      goto exit;
    }
  }
exit :
  return ctx->rc;
}
926
927grn_com *
928grn_com_copen(grn_ctx *ctx, grn_com_event *ev, const char *dest, int port)
929{
930 grn_sock fd = -1;
931 grn_com *cs = NULL;
932
933 struct addrinfo hints, *addrinfo_list, *addrinfo_ptr;
934 char port_string[16];
935 int getaddrinfo_result;
936
937 memset(&hints, 0, sizeof(hints));
938 hints.ai_family = AF_UNSPEC;
939 hints.ai_socktype = SOCK_STREAM;
940#ifdef AI_NUMERICSERV
941 hints.ai_flags = AI_NUMERICSERV;
942#endif
943 grn_snprintf(port_string, sizeof(port_string), sizeof(port_string),
944 "%d", port);
945
946 getaddrinfo_result = getaddrinfo(dest, port_string, &hints, &addrinfo_list);
947 if (getaddrinfo_result != 0) {
948 switch (getaddrinfo_result) {
949#ifdef EAI_MEMORY
950 case EAI_MEMORY:
951 ERR(GRN_NO_MEMORY_AVAILABLE, "getaddrinfo: <%s:%s>: %s",
952 dest, port_string, gai_strerror(getaddrinfo_result));
953 break;
954#endif
955#ifdef EAI_SYSTEM
956 case EAI_SYSTEM:
957 SOERR("getaddrinfo");
958 break;
959#endif
960 default:
961 ERR(GRN_INVALID_ARGUMENT, "getaddrinfo: <%s:%s>: %s",
962 dest, port_string, gai_strerror(getaddrinfo_result));
963 break;
964 }
965 return NULL;
966 }
967
968 for (addrinfo_ptr = addrinfo_list; addrinfo_ptr;
969 addrinfo_ptr = addrinfo_ptr->ai_next) {
970 fd = socket(addrinfo_ptr->ai_family, addrinfo_ptr->ai_socktype,
971 addrinfo_ptr->ai_protocol);
972 if (fd == -1) {
973 SOERR("socket");
974 continue;
975 }
976#ifdef TCP_NODELAY
977 {
978 static const int value = 1;
979 if (setsockopt(fd, 6, TCP_NODELAY,
980 (const char *)&value, sizeof(value)) != 0) {
981 SOERR("setsockopt");
982 grn_sock_close(fd);
983 continue;
984 }
985 }
986#endif
987 if (connect(fd, addrinfo_ptr->ai_addr, addrinfo_ptr->ai_addrlen) != 0) {
988 SOERR("connect");
989 grn_sock_close(fd);
990 continue;
991 }
992
993 break;
994 }
995
996 freeaddrinfo(addrinfo_list);
997
998 if (!addrinfo_ptr) {
999 return NULL;
1000 }
1001 ctx->errlvl = GRN_OK;
1002 ctx->rc = GRN_SUCCESS;
1003
1004 if (ev) {
1005 grn_com_event_add(ctx, ev, fd, GRN_COM_POLLIN, &cs);
1006 } else {
1007 cs = GRN_CALLOC(sizeof(grn_com));
1008 if (cs) {
1009 cs->fd = fd;
1010 }
1011 }
1012 if (!cs) {
1013 grn_sock_close(fd);
1014 }
1015 return cs;
1016}
1017
1018void
1019grn_com_close_(grn_ctx *ctx, grn_com *com)
1020{
1021 grn_sock fd = com->fd;
1022 if (shutdown(fd, SHUT_RDWR) == -1) { /* SOERR("shutdown"); */ }
1023 if (grn_sock_close(fd) == -1) {
1024 SOERR("close");
1025 } else {
1026 com->closed = 1;
1027 }
1028}
1029
1030grn_rc
1031grn_com_close(grn_ctx *ctx, grn_com *com)
1032{
1033 grn_sock fd = com->fd;
1034 grn_com_event *ev = com->ev;
1035 if (ev) {
1036 grn_com *acceptor = ev->acceptor;
1037 grn_com_event_del(ctx, ev, fd);
1038 if (acceptor) { grn_com_event_start_accept(ctx, ev); }
1039 }
1040 if (!com->closed) { grn_com_close_(ctx, com); }
1041 if (!ev) { GRN_FREE(com); }
1042 return GRN_SUCCESS;
1043}
1044
1045grn_rc
1046grn_com_sopen(grn_ctx *ctx, grn_com_event *ev,
1047 const char *bind_address, int port, grn_msg_handler *func,
1048 struct hostent *he)
1049{
1050 grn_sock lfd = -1;
1051 grn_com *cs = NULL;
1052 int getaddrinfo_result;
1053 struct addrinfo *bind_address_info = NULL;
1054 struct addrinfo hints;
1055 char port_string[6]; /* ceil(log10(65535)) + 1 ('\0')*/
1056
1057 GRN_API_ENTER;
1058 if (!bind_address) {
1059 bind_address = "0.0.0.0";
1060 }
1061 grn_snprintf(port_string, sizeof(port_string), sizeof(port_string),
1062 "%d", port);
1063 memset(&hints, 0, sizeof(struct addrinfo));
1064 hints.ai_family = PF_UNSPEC;
1065 hints.ai_socktype = SOCK_STREAM;
1066#ifdef AI_NUMERICSERV
1067 hints.ai_flags = AI_NUMERICSERV;
1068#endif
1069 getaddrinfo_result = getaddrinfo(bind_address, port_string,
1070 &hints, &bind_address_info);
1071 if (getaddrinfo_result != 0) {
1072 switch (getaddrinfo_result) {
1073#ifdef EAI_MEMORY
1074 case EAI_MEMORY:
1075 ERR(GRN_NO_MEMORY_AVAILABLE,
1076 "getaddrinfo: <%s:%s>: %s",
1077 bind_address, port_string, gai_strerror(getaddrinfo_result));
1078 break;
1079#endif
1080#ifdef EAI_SYSTEM
1081 case EAI_SYSTEM:
1082 SOERR("getaddrinfo");
1083 break;
1084#endif
1085 default:
1086 ERR(GRN_INVALID_ARGUMENT,
1087 "getaddrinfo: <%s:%s>: %s",
1088 bind_address, port_string, gai_strerror(getaddrinfo_result));
1089 break;
1090 }
1091 goto exit;
1092 }
1093 if ((lfd = socket(bind_address_info->ai_family, SOCK_STREAM, 0)) == -1) {
1094 SOERR("socket");
1095 goto exit;
1096 }
1097 grn_memcpy(&ev->curr_edge_id.addr, he->h_addr, he->h_length);
1098 ev->curr_edge_id.port = htons(port);
1099 ev->curr_edge_id.sid = 0;
1100 {
1101 int v = 1;
1102#ifdef TCP_NODELAY
1103 if (setsockopt(lfd, SOL_TCP, TCP_NODELAY, (void *) &v, sizeof(int)) == -1) {
1104 SOERR("setsockopt");
1105 goto exit;
1106 }
1107#endif
1108 if (setsockopt(lfd, SOL_SOCKET, SO_REUSEADDR, (void *) &v, sizeof(int)) == -1) {
1109 SOERR("setsockopt");
1110 goto exit;
1111 }
1112 }
1113 if (bind(lfd, bind_address_info->ai_addr, bind_address_info->ai_addrlen) < 0) {
1114 SOERR("bind");
1115 goto exit;
1116 }
1117 if (listen(lfd, LISTEN_BACKLOG) < 0) {
1118 SOERR("listen");
1119 goto exit;
1120 }
1121 if (ev) {
1122 if (grn_com_event_add(ctx, ev, lfd, GRN_COM_POLLIN, &cs)) { goto exit; }
1123 ev->acceptor = cs;
1124 ev->msg_handler = func;
1125 cs->has_sid = 0;
1126 cs->closed = 0;
1127 cs->opaque = NULL;
1128 GRN_COM_QUEUE_INIT(&cs->new_);
1129 } else {
1130 if (!(cs = GRN_MALLOC(sizeof(grn_com)))) { goto exit; }
1131 cs->fd = lfd;
1132 }
1133 cs->accepting = GRN_TRUE;
1134exit :
1135 if (!cs && lfd != 1) { grn_sock_close(lfd); }
1136 if (bind_address_info) { freeaddrinfo(bind_address_info); }
1137 GRN_API_RETURN(ctx->rc);
1138}
1139
1140
/* Global edge table; created by grn_edges_init() with grn_com_addr keys and grn_edge values. */
grn_hash *grn_edges = NULL;
/* Callback invoked by grn_edge_dispatch(); set by grn_edges_init(). */
void (*grn_dispatcher)(grn_ctx *ctx, grn_edge *edge);
1143
1144void
1145grn_edges_init(grn_ctx *ctx, void (*dispatcher)(grn_ctx *ctx, grn_edge *edge))
1146{
1147 grn_edges = grn_hash_create(ctx, NULL, sizeof(grn_com_addr), sizeof(grn_edge), 0);
1148 grn_dispatcher = dispatcher;
1149}
1150
/* Close the global edge table created by grn_edges_init(). */
void
grn_edges_fin(grn_ctx *ctx)
{
  grn_hash_close(ctx, grn_edges);
}
1156
1157grn_edge *
1158grn_edges_add(grn_ctx *ctx, grn_com_addr *addr, int *added)
1159{
1160 if (grn_io_lock(ctx, grn_edges->io, grn_lock_timeout)) {
1161 return NULL;
1162 } else {
1163 grn_edge *edge;
1164 grn_id id = grn_hash_add(ctx, grn_edges, addr, sizeof(grn_com_addr),
1165 (void **)&edge, added);
1166 grn_io_unlock(grn_edges->io);
1167 if (id) { edge->id = id; }
1168 return edge;
1169 }
1170}
1171
1172void
1173grn_edges_delete(grn_ctx *ctx, grn_edge *edge)
1174{
1175 if (!grn_io_lock(ctx, grn_edges->io, grn_lock_timeout)) {
1176 grn_hash_delete_by_id(ctx, grn_edges, edge->id, NULL);
1177 grn_io_unlock(grn_edges->io);
1178 }
1179}
1180
1181grn_edge *
1182grn_edges_add_communicator(grn_ctx *ctx, grn_com_addr *addr)
1183{
1184 int added;
1185 grn_edge *edge = grn_edges_add(ctx, addr, &added);
1186 if (added) {
1187 grn_ctx_init(&edge->ctx, 0);
1188 GRN_COM_QUEUE_INIT(&edge->recv_new);
1189 GRN_COM_QUEUE_INIT(&edge->send_old);
1190 edge->com = NULL;
1191 edge->stat = 0 /*EDGE_IDLE*/;
1192 edge->flags = GRN_EDGE_COMMUNICATOR;
1193 }
1194 return edge;
1195}
1196
/*
 * Queue `msg` on the edge's recv_new queue, then invoke the dispatcher
 * registered through grn_edges_init() for that edge.
 */
void
grn_edge_dispatch(grn_ctx *ctx, grn_edge *edge, grn_obj *msg)
{
  grn_com_queue_enque(ctx, &edge->recv_new, (grn_com_queue_entry *)msg);
  grn_dispatcher(ctx, edge);
}
1203