| 1 | /* -*- c-basic-offset: 2 -*- */ | 
| 2 | /* Copyright(C) 2009-2012 Brazil | 
| 3 |  | 
| 4 |   This library is free software; you can redistribute it and/or | 
| 5 |   modify it under the terms of the GNU Lesser General Public | 
| 6 |   License version 2.1 as published by the Free Software Foundation. | 
| 7 |  | 
| 8 |   This library is distributed in the hope that it will be useful, | 
| 9 |   but WITHOUT ANY WARRANTY; without even the implied warranty of | 
| 10 |   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU | 
| 11 |   Lesser General Public License for more details. | 
| 12 |  | 
| 13 |   You should have received a copy of the GNU Lesser General Public | 
| 14 |   License along with this library; if not, write to the Free Software | 
| 15 |   Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA | 
| 16 | */ | 
| 17 |  | 
| 18 | #include "grn.h" | 
| 19 |  | 
| 20 | #include <stdio.h> | 
| 21 | #include <string.h> | 
| 22 | #include "grn_ctx_impl.h" | 
| 23 |  | 
| 24 | #ifdef WIN32 | 
| 25 | # include <ws2tcpip.h> | 
| 26 | #else | 
| 27 | # ifdef HAVE_SYS_SOCKET_H | 
| 28 | #  include <sys/socket.h> | 
| 29 | # endif /* HAVE_SYS_SOCKET_H */ | 
| 30 | # include <netinet/in.h> | 
| 31 | # include <netinet/tcp.h> | 
| 32 | # ifdef HAVE_SIGNAL_H | 
| 33 | #  include <signal.h> | 
| 34 | # endif /* HAVE_SIGNAL_H */ | 
| 35 | # include <sys/uio.h> | 
| 36 | #endif /* WIN32 */ | 
| 37 |  | 
| 38 | #include "grn_ctx.h" | 
| 39 | #include "grn_com.h" | 
| 40 |  | 
| 41 | #ifndef PF_INET | 
| 42 | #define PF_INET AF_INET | 
| 43 | #endif /* PF_INET */ | 
| 44 |  | 
| 45 | #ifndef SOL_TCP | 
| 46 | #  ifdef IPPROTO_TCP | 
| 47 | #    define SOL_TCP IPPROTO_TCP | 
| 48 | #  else | 
| 49 | #    define SOL_TCP 6 | 
| 50 | #  endif /* IPPROTO_TCP */ | 
| 51 | #endif /* SOL_TCP */ | 
| 52 |  | 
| 53 | #ifndef USE_MSG_MORE | 
| 54 | #  ifdef MSG_MORE | 
| 55 | #    undef MSG_MORE | 
| 56 | #  endif | 
| 57 | #  define MSG_MORE     0 | 
| 58 | #endif /* USE_MSG_MORE */ | 
| 59 |  | 
| 60 |  | 
| 61 | #ifndef USE_MSG_NOSIGNAL | 
| 62 | #  ifdef MSG_NOSIGNAL | 
| 63 | #    undef MSG_NOSIGNAL | 
| 64 | #  endif | 
| 65 | #  define MSG_NOSIGNAL 0 | 
| 66 | #endif /* USE_MSG_NOSIGNAL */ | 
| 67 | /******* grn_com_queue ********/ | 
| 68 |  | 
| 69 | grn_rc | 
| 70 | grn_com_queue_enque(grn_ctx *ctx, grn_com_queue *q, grn_com_queue_entry *e) | 
| 71 | { | 
| 72 |   CRITICAL_SECTION_ENTER(q->cs); | 
| 73 |   e->next = NULL; | 
| 74 |   *q->tail = e; | 
| 75 |   q->tail = &e->next; | 
| 76 |   CRITICAL_SECTION_LEAVE(q->cs); | 
| 77 |   /* | 
| 78 |   uint8_t i = q->last + 1; | 
| 79 |   e->next = NULL; | 
| 80 |   if (q->first == i || q->next) { | 
| 81 |     CRITICAL_SECTION_ENTER(q->cs); | 
| 82 |     if (q->first == i || q->next) { | 
| 83 |       *q->tail = e; | 
| 84 |       q->tail = &e->next; | 
| 85 |     } else { | 
| 86 |       q->bins[q->last] = e; | 
| 87 |       q->last = i; | 
| 88 |     } | 
| 89 |     CRITICAL_SECTION_LEAVE(q->cs); | 
| 90 |   } else { | 
| 91 |     q->bins[q->last] = e; | 
| 92 |     q->last = i; | 
| 93 |   } | 
| 94 |   */ | 
| 95 |   return GRN_SUCCESS; | 
| 96 | } | 
| 97 |  | 
| 98 | grn_com_queue_entry * | 
| 99 | grn_com_queue_deque(grn_ctx *ctx, grn_com_queue *q) | 
| 100 | { | 
| 101 |   grn_com_queue_entry *e = NULL; | 
| 102 |  | 
| 103 |   CRITICAL_SECTION_ENTER(q->cs); | 
| 104 |   if (q->next) { | 
| 105 |     e = q->next; | 
| 106 |     if (!(q->next = e->next)) { q->tail = &q->next; } | 
| 107 |   } | 
| 108 |   CRITICAL_SECTION_LEAVE(q->cs); | 
| 109 |  | 
| 110 |   /* | 
| 111 |   if (q->first == q->last) { | 
| 112 |     if (q->next) { | 
| 113 |       CRITICAL_SECTION_ENTER(q->cs); | 
| 114 |       e = q->next; | 
| 115 |       if (!(q->next = e->next)) { q->tail = &q->next; } | 
| 116 |       CRITICAL_SECTION_LEAVE(q->cs); | 
| 117 |     } | 
| 118 |   } else { | 
| 119 |     e = q->bins[q->first++]; | 
| 120 |   } | 
| 121 |   */ | 
| 122 |   return e; | 
| 123 | } | 
| 124 |  | 
| 125 | /******* grn_msg ********/ | 
| 126 |  | 
| 127 | grn_obj * | 
| 128 | grn_msg_open(grn_ctx *ctx, grn_com *com, grn_com_queue *old) | 
| 129 | { | 
| 130 |   grn_msg *msg = NULL; | 
| 131 |   if (old && (msg = (grn_msg *)grn_com_queue_deque(ctx, old))) { | 
| 132 |     if (msg->ctx != ctx) { | 
| 133 |       ERR(GRN_INVALID_ARGUMENT, "ctx unmatch" ); | 
| 134 |       return NULL; | 
| 135 |     } | 
| 136 |     GRN_BULK_REWIND(&msg->qe.obj); | 
| 137 |   } else if ((msg = GRN_MALLOCN(grn_msg, 1))) { | 
| 138 |     GRN_OBJ_INIT(&msg->qe.obj, GRN_MSG, 0, GRN_DB_TEXT); | 
| 139 |     msg->qe.obj.header.impl_flags |= GRN_OBJ_ALLOCATED; | 
| 140 |     msg->ctx = ctx; | 
| 141 |   } | 
| 142 |   msg->qe.next = NULL; | 
| 143 |   msg->u.peer = com; | 
| 144 |   msg->old = old; | 
| 145 |   memset(&msg->header, 0, sizeof(grn_com_header)); | 
| 146 |   return (grn_obj *)msg; | 
| 147 | } | 
| 148 |  | 
| 149 | grn_obj * | 
| 150 | grn_msg_open_for_reply(grn_ctx *ctx, grn_obj *query, grn_com_queue *old) | 
| 151 | { | 
| 152 |   grn_msg *req = (grn_msg *)query, *msg = NULL; | 
| 153 |   if (req && (msg = (grn_msg *)grn_msg_open(ctx, req->u.peer, old))) { | 
| 154 |     msg->edge_id = req->edge_id; | 
| 155 |     msg->header.proto = req->header.proto == GRN_COM_PROTO_MBREQ | 
| 156 |       ? GRN_COM_PROTO_MBRES : req->header.proto; | 
| 157 |   } | 
| 158 |   return (grn_obj *)msg; | 
| 159 | } | 
| 160 |  | 
| 161 | grn_rc | 
| 162 | grn_msg_close(grn_ctx *ctx, grn_obj *obj) | 
| 163 | { | 
| 164 |   grn_msg *msg = (grn_msg *)obj; | 
| 165 |   if (ctx == msg->ctx) { return grn_obj_close(ctx, obj); } | 
| 166 |   return grn_com_queue_enque(ctx, msg->old, (grn_com_queue_entry *)msg); | 
| 167 | } | 
| 168 |  | 
| 169 | grn_rc | 
| 170 | grn_msg_set_property(grn_ctx *ctx, grn_obj *obj, | 
| 171 |                      uint16_t status, uint32_t key_size, uint8_t ) | 
| 172 | { | 
| 173 |   grn_com_header * = &((grn_msg *)obj)->header; | 
| 174 |   header->status = htons(status); | 
| 175 |   header->keylen = htons(key_size); | 
| 176 |   header->level = extra_size; | 
| 177 |   return GRN_SUCCESS; | 
| 178 | } | 
| 179 |  | 
| 180 | grn_rc | 
| 181 | grn_msg_send(grn_ctx *ctx, grn_obj *msg, int flags) | 
| 182 | { | 
| 183 |   grn_rc rc; | 
| 184 |   grn_msg *m = (grn_msg *)msg; | 
| 185 |   grn_com *peer = m->u.peer; | 
| 186 |   grn_com_header * = &m->header; | 
| 187 |   if (GRN_COM_QUEUE_EMPTYP(&peer->new_)) { | 
| 188 |     switch (header->proto) { | 
| 189 |     case GRN_COM_PROTO_HTTP : | 
| 190 |       { | 
| 191 |         ssize_t ret; | 
| 192 |         ret = send(peer->fd, GRN_BULK_HEAD(msg), GRN_BULK_VSIZE(msg), MSG_NOSIGNAL); | 
| 193 |         if (ret == -1) { SOERR("send" ); } | 
| 194 |         if (ctx->rc != GRN_OPERATION_WOULD_BLOCK) { | 
| 195 |           grn_com_queue_enque(ctx, m->old, (grn_com_queue_entry *)msg); | 
| 196 |           return ctx->rc; | 
| 197 |         } | 
| 198 |       } | 
| 199 |       break; | 
| 200 |     case GRN_COM_PROTO_GQTP : | 
| 201 |       { | 
| 202 |         if (flags & GRN_CTX_MORE) { flags |= GRN_CTX_QUIET; } | 
| 203 |         if (ctx->stat == GRN_CTX_QUIT) { flags |= GRN_CTX_QUIT; } | 
| 204 |         header->qtype = (uint8_t) ctx->impl->output.type; | 
| 205 |         header->keylen = 0; | 
| 206 |         header->level = 0; | 
| 207 |         header->flags = flags; | 
| 208 |         header->status = htons((uint16_t)ctx->rc); | 
| 209 |         header->opaque = 0; | 
| 210 |         header->cas = 0; | 
| 211 |         //todo : MSG_DONTWAIT | 
| 212 |         rc = grn_com_send(ctx, peer, header, | 
| 213 |                           GRN_BULK_HEAD(msg), GRN_BULK_VSIZE(msg), 0); | 
| 214 |         if (rc != GRN_OPERATION_WOULD_BLOCK) { | 
| 215 |           grn_com_queue_enque(ctx, m->old, (grn_com_queue_entry *)msg); | 
| 216 |           return rc; | 
| 217 |         } | 
| 218 |       } | 
| 219 |       break; | 
| 220 |     case GRN_COM_PROTO_MBREQ : | 
| 221 |       return GRN_FUNCTION_NOT_IMPLEMENTED; | 
| 222 |     case GRN_COM_PROTO_MBRES : | 
| 223 |       rc = grn_com_send(ctx, peer, header, | 
| 224 |                         GRN_BULK_HEAD(msg), GRN_BULK_VSIZE(msg), | 
| 225 |                         (flags & GRN_CTX_MORE) ? MSG_MORE :0); | 
| 226 |       if (rc != GRN_OPERATION_WOULD_BLOCK) { | 
| 227 |         grn_com_queue_enque(ctx, m->old, (grn_com_queue_entry *)msg); | 
| 228 |         return rc; | 
| 229 |       } | 
| 230 |       break; | 
| 231 |     default : | 
| 232 |       return GRN_INVALID_ARGUMENT; | 
| 233 |     } | 
| 234 |   } | 
| 235 |   MUTEX_LOCK(peer->ev->mutex); | 
| 236 |   rc = grn_com_queue_enque(ctx, &peer->new_, (grn_com_queue_entry *)msg); | 
| 237 |   COND_SIGNAL(peer->ev->cond); | 
| 238 |   MUTEX_UNLOCK(peer->ev->mutex); | 
| 239 |   return rc; | 
| 240 | } | 
| 241 |  | 
| 242 | /******* grn_com ********/ | 
| 243 |  | 
| 244 | grn_rc | 
| 245 | grn_com_init(void) | 
| 246 | { | 
| 247 | #ifdef WIN32 | 
| 248 |   WSADATA wd; | 
| 249 |   if (WSAStartup(MAKEWORD(2, 0), &wd) != 0) { | 
| 250 |     grn_ctx *ctx = &grn_gctx; | 
| 251 |     SOERR("WSAStartup" ); | 
| 252 |   } | 
| 253 | #else /* WIN32 */ | 
| 254 | #ifndef USE_MSG_NOSIGNAL | 
| 255 |   if (signal(SIGPIPE, SIG_IGN) == SIG_ERR) { | 
| 256 |     grn_ctx *ctx = &grn_gctx; | 
| 257 |     SERR("signal" ); | 
| 258 |   } | 
| 259 | #endif /* USE_MSG_NOSIGNAL */ | 
| 260 | #endif /* WIN32 */ | 
| 261 |   return grn_gctx.rc; | 
| 262 | } | 
| 263 |  | 
| 264 | void | 
| 265 | grn_com_fin(void) | 
| 266 | { | 
| 267 | #ifdef WIN32 | 
| 268 |   WSACleanup(); | 
| 269 | #endif /* WIN32 */ | 
| 270 | } | 
| 271 |  | 
| 272 | grn_rc | 
| 273 | grn_com_event_init(grn_ctx *ctx, grn_com_event *ev, int max_nevents, int data_size) | 
| 274 | { | 
| 275 |   ev->max_nevents = max_nevents; | 
| 276 |   if ((ev->hash = grn_hash_create(ctx, NULL, sizeof(grn_sock), data_size, 0))) { | 
| 277 |     MUTEX_INIT(ev->mutex); | 
| 278 |     COND_INIT(ev->cond); | 
| 279 |     GRN_COM_QUEUE_INIT(&ev->recv_old); | 
| 280 |     ev->msg_handler = NULL; | 
| 281 |     memset(&(ev->curr_edge_id), 0, sizeof(grn_com_addr)); | 
| 282 |     ev->acceptor = NULL; | 
| 283 |     ev->opaque = NULL; | 
| 284 | #ifndef USE_SELECT | 
| 285 | # ifdef USE_EPOLL | 
| 286 |     if ((ev->events = GRN_MALLOC(sizeof(struct epoll_event) * max_nevents))) { | 
| 287 |       if ((ev->epfd = epoll_create(max_nevents)) != -1) { | 
| 288 |         goto exit; | 
| 289 |       } else { | 
| 290 |         SERR("epoll_create" ); | 
| 291 |       } | 
| 292 |       GRN_FREE(ev->events); | 
| 293 |     } | 
| 294 | # else /* USE_EPOLL */ | 
| 295 | #  ifdef USE_KQUEUE | 
| 296 |     if ((ev->events = GRN_MALLOC(sizeof(struct kevent) * max_nevents))) { | 
| 297 |       if ((ev->kqfd = kqueue()) != -1) { | 
| 298 |         goto exit; | 
| 299 |       } else { | 
| 300 |         SERR("kqueue" ); | 
| 301 |       } | 
| 302 |       GRN_FREE(ev->events); | 
| 303 |     } | 
| 304 | #  else /* USE_KQUEUE */ | 
| 305 |     if ((ev->events = GRN_MALLOC(sizeof(struct pollfd) * max_nevents))) { | 
| 306 |       goto exit; | 
| 307 |     } | 
| 308 | #  endif /* USE_KQUEUE*/ | 
| 309 | # endif /* USE_EPOLL */ | 
| 310 |     grn_hash_close(ctx, ev->hash); | 
| 311 |     ev->hash = NULL; | 
| 312 |     ev->events = NULL; | 
| 313 | #else /* USE_SELECT */ | 
| 314 |     goto exit; | 
| 315 | #endif /* USE_SELECT */ | 
| 316 |   } | 
| 317 | exit : | 
| 318 |   return ctx->rc; | 
| 319 | } | 
| 320 |  | 
| 321 | grn_rc | 
| 322 | grn_com_event_fin(grn_ctx *ctx, grn_com_event *ev) | 
| 323 | { | 
| 324 |   grn_obj *msg; | 
| 325 |   while ((msg = (grn_obj *)grn_com_queue_deque(ctx, &ev->recv_old))) { | 
| 326 |     grn_msg_close(ctx, msg); | 
| 327 |   } | 
| 328 |   if (ev->hash) { grn_hash_close(ctx, ev->hash); } | 
| 329 | #ifndef USE_SELECT | 
| 330 |   if (ev->events) { GRN_FREE(ev->events); } | 
| 331 | # ifdef USE_EPOLL | 
| 332 |   grn_close(ev->epfd); | 
| 333 | # endif /* USE_EPOLL */ | 
| 334 | # ifdef USE_KQUEUE | 
| 335 |   grn_close(ev->kqfd); | 
| 336 | # endif /* USE_KQUEUE*/ | 
| 337 | #endif /* USE_SELECT */ | 
| 338 |   return GRN_SUCCESS; | 
| 339 | } | 
| 340 |  | 
| 341 | grn_rc | 
| 342 | grn_com_event_add(grn_ctx *ctx, grn_com_event *ev, grn_sock fd, int events, grn_com **com) | 
| 343 | { | 
| 344 |   grn_com *c; | 
| 345 |   /* todo : expand events */ | 
| 346 |   if (!ev || *ev->hash->n_entries == ev->max_nevents) { | 
| 347 |     if (ev) { GRN_LOG(ctx, GRN_LOG_ERROR, "too many connections (%d)" , ev->max_nevents); } | 
| 348 |     return GRN_INVALID_ARGUMENT; | 
| 349 |   } | 
| 350 | #ifdef USE_EPOLL | 
| 351 |   { | 
| 352 |     struct epoll_event e; | 
| 353 |     memset(&e, 0, sizeof(struct epoll_event)); | 
| 354 |     e.data.fd = (fd); | 
| 355 |     e.events = (uint32_t) events; | 
| 356 |     if (epoll_ctl(ev->epfd, EPOLL_CTL_ADD, (fd), &e) == -1) { | 
| 357 |       SERR("epoll_ctl" ); | 
| 358 |       return ctx->rc; | 
| 359 |     } | 
| 360 |   } | 
| 361 | #endif /* USE_EPOLL*/ | 
| 362 | #ifdef USE_KQUEUE | 
| 363 |   { | 
| 364 |     struct kevent e; | 
| 365 |     /* todo: udata should have fd */ | 
| 366 |     EV_SET(&e, (fd), events, EV_ADD, 0, 0, NULL); | 
| 367 |     if (kevent(ev->kqfd, &e, 1, NULL, 0, NULL) == -1) { | 
| 368 |       SERR("kevent" ); | 
| 369 |       return ctx->rc; | 
| 370 |     } | 
| 371 |   } | 
| 372 | #endif /* USE_KQUEUE */ | 
| 373 |   { | 
| 374 |     if (grn_hash_add(ctx, ev->hash, &fd, sizeof(grn_sock), (void **)&c, NULL)) { | 
| 375 |       c->ev = ev; | 
| 376 |       c->fd = fd; | 
| 377 |       c->events = events; | 
| 378 |       if (com) { *com = c; } | 
| 379 |     } | 
| 380 |   } | 
| 381 |   return ctx->rc; | 
| 382 | } | 
| 383 |  | 
| 384 | grn_rc | 
| 385 | grn_com_event_mod(grn_ctx *ctx, grn_com_event *ev, grn_sock fd, int events, grn_com **com) | 
| 386 | { | 
| 387 |   grn_com *c; | 
| 388 |   if (!ev) { return GRN_INVALID_ARGUMENT; } | 
| 389 |   if (grn_hash_get(ctx, ev->hash, &fd, sizeof(grn_sock), (void **)&c)) { | 
| 390 |     if (c->fd != fd) { | 
| 391 |       GRN_LOG(ctx, GRN_LOG_ERROR, | 
| 392 |               "grn_com_event_mod fd unmatch "  | 
| 393 |               "%"  GRN_FMT_SOCKET " != %"  GRN_FMT_SOCKET, | 
| 394 |               c->fd, fd); | 
| 395 |       return GRN_OBJECT_CORRUPT; | 
| 396 |     } | 
| 397 |     if (com) { *com = c; } | 
| 398 |     if (c->events != events) { | 
| 399 | #ifdef USE_EPOLL | 
| 400 |       struct epoll_event e; | 
| 401 |       memset(&e, 0, sizeof(struct epoll_event)); | 
| 402 |       e.data.fd = (fd); | 
| 403 |       e.events = (uint32_t) events; | 
| 404 |       if (epoll_ctl(ev->epfd, EPOLL_CTL_MOD, (fd), &e) == -1) { | 
| 405 |         SERR("epoll_ctl" ); | 
| 406 |         return ctx->rc; | 
| 407 |       } | 
| 408 | #endif /* USE_EPOLL*/ | 
| 409 | #ifdef USE_KQUEUE | 
| 410 |       // experimental | 
| 411 |       struct kevent e[2]; | 
| 412 |       EV_SET(&e[0], (fd), GRN_COM_POLLIN|GRN_COM_POLLOUT, EV_DELETE, 0, 0, NULL); | 
| 413 |       EV_SET(&e[1], (fd), events, EV_ADD, 0, 0, NULL); | 
| 414 |       if (kevent(ev->kqfd, e, 2, NULL, 0, NULL) == -1) { | 
| 415 |         SERR("kevent" ); | 
| 416 |         return ctx->rc; | 
| 417 |       } | 
| 418 | #endif /* USE_KQUEUE */ | 
| 419 |       c->events = events; | 
| 420 |     } | 
| 421 |     return GRN_SUCCESS; | 
| 422 |   } | 
| 423 |   return GRN_INVALID_ARGUMENT; | 
| 424 | } | 
| 425 |  | 
| 426 | grn_rc | 
| 427 | grn_com_event_del(grn_ctx *ctx, grn_com_event *ev, grn_sock fd) | 
| 428 | { | 
| 429 |   if (!ev) { return GRN_INVALID_ARGUMENT; } | 
| 430 |   { | 
| 431 |     grn_com *c; | 
| 432 |     grn_id id = grn_hash_get(ctx, ev->hash, &fd, sizeof(grn_sock), (void **)&c); | 
| 433 |     if (id) { | 
| 434 | #ifdef USE_EPOLL | 
| 435 |       if (!c->closed) { | 
| 436 |         struct epoll_event e; | 
| 437 |         memset(&e, 0, sizeof(struct epoll_event)); | 
| 438 |         e.data.fd = fd; | 
| 439 |         e.events = c->events; | 
| 440 |         if (epoll_ctl(ev->epfd, EPOLL_CTL_DEL, fd, &e) == -1) { | 
| 441 |           SERR("epoll_ctl" ); | 
| 442 |           return ctx->rc; | 
| 443 |         } | 
| 444 |       } | 
| 445 | #endif /* USE_EPOLL*/ | 
| 446 | #ifdef USE_KQUEUE | 
| 447 |       struct kevent e; | 
| 448 |       EV_SET(&e, (fd), c->events, EV_DELETE, 0, 0, NULL); | 
| 449 |       if (kevent(ev->kqfd, &e, 1, NULL, 0, NULL) == -1) { | 
| 450 |         SERR("kevent" ); | 
| 451 |         return ctx->rc; | 
| 452 |       } | 
| 453 | #endif /* USE_KQUEUE */ | 
| 454 |       return grn_hash_delete_by_id(ctx, ev->hash, id, NULL); | 
| 455 |     } else { | 
| 456 |       GRN_LOG(ctx, GRN_LOG_ERROR, | 
| 457 |               "%04x| fd(%"  GRN_FMT_SOCKET ") not found in ev(%p)" , | 
| 458 |               grn_getpid(), fd, ev); | 
| 459 |       return GRN_INVALID_ARGUMENT; | 
| 460 |     } | 
| 461 |   } | 
| 462 | } | 
| 463 |  | 
| 464 | #define LISTEN_BACKLOG 0x1000 | 
| 465 |  | 
| 466 | grn_rc | 
| 467 | grn_com_event_start_accept(grn_ctx *ctx, grn_com_event *ev) | 
| 468 | { | 
| 469 |   grn_com *com = ev->acceptor; | 
| 470 |  | 
| 471 |   if (com->accepting) {return ctx->rc;} | 
| 472 |  | 
| 473 |   GRN_API_ENTER; | 
| 474 |   if (!grn_com_event_mod(ctx, ev, com->fd, GRN_COM_POLLIN, NULL)) { | 
| 475 |     if (listen(com->fd, LISTEN_BACKLOG) == 0) { | 
| 476 |       com->accepting = GRN_TRUE; | 
| 477 |     } else { | 
| 478 |       SOERR("listen - start accept" ); | 
| 479 |     } | 
| 480 |   } | 
| 481 |   GRN_API_RETURN(ctx->rc); | 
| 482 | } | 
| 483 |  | 
| 484 | grn_rc | 
| 485 | grn_com_event_stop_accept(grn_ctx *ctx, grn_com_event *ev) | 
| 486 | { | 
| 487 |   grn_com *com = ev->acceptor; | 
| 488 |  | 
| 489 |   if (!com->accepting) {return ctx->rc;} | 
| 490 |  | 
| 491 |   GRN_API_ENTER; | 
| 492 |   if (!grn_com_event_mod(ctx, ev, com->fd, 0, NULL)) { | 
| 493 |     if (listen(com->fd, 0) == 0) { | 
| 494 |       com->accepting = GRN_FALSE; | 
| 495 |     } else { | 
| 496 |       SOERR("listen - disable accept" ); | 
| 497 |     } | 
| 498 |   } | 
| 499 |   GRN_API_RETURN(ctx->rc); | 
| 500 | } | 
| 501 |  | 
| 502 | static void | 
| 503 | grn_com_receiver(grn_ctx *ctx, grn_com *com) | 
| 504 | { | 
| 505 |   grn_com_event *ev = com->ev; | 
| 506 |   ERRCLR(ctx); | 
| 507 |   if (ev->acceptor == com) { | 
| 508 |     grn_com *ncs; | 
| 509 |     grn_sock fd = accept(com->fd, NULL, NULL); | 
| 510 |     if (fd == -1) { | 
| 511 |       if (errno == EMFILE) { | 
| 512 |         grn_com_event_stop_accept(ctx, ev); | 
| 513 |       } else { | 
| 514 |         SOERR("accept" ); | 
| 515 |       } | 
| 516 |       return; | 
| 517 |     } | 
| 518 |     if (grn_com_event_add(ctx, ev, fd, GRN_COM_POLLIN, (grn_com **)&ncs)) { | 
| 519 |       grn_sock_close(fd); | 
| 520 |       return; | 
| 521 |     } | 
| 522 |     ncs->has_sid = 0; | 
| 523 |     ncs->closed = 0; | 
| 524 |     ncs->opaque = NULL; | 
| 525 |     GRN_COM_QUEUE_INIT(&ncs->new_); | 
| 526 |     // GRN_LOG(ctx, GRN_LOG_NOTICE, "accepted (%d)", fd); | 
| 527 |     return; | 
| 528 |   } else { | 
| 529 |     grn_msg *msg = (grn_msg *)grn_msg_open(ctx, com, &ev->recv_old); | 
| 530 |     grn_com_recv(ctx, msg->u.peer, &msg->header, (grn_obj *)msg); | 
| 531 |     if (msg->u.peer /* is_edge_request(msg)*/) { | 
| 532 |       grn_memcpy(&msg->edge_id, &ev->curr_edge_id, sizeof(grn_com_addr)); | 
| 533 |       if (!com->has_sid) { | 
| 534 |         com->has_sid = 1; | 
| 535 |         com->sid = ev->curr_edge_id.sid++; | 
| 536 |       } | 
| 537 |       msg->edge_id.sid = com->sid; | 
| 538 |     } | 
| 539 |     msg->acceptor = ev->acceptor; | 
| 540 |     ev->msg_handler(ctx, (grn_obj *)msg); | 
| 541 |   } | 
| 542 | } | 
| 543 |  | 
| 544 | grn_rc | 
| 545 | grn_com_event_poll(grn_ctx *ctx, grn_com_event *ev, int timeout) | 
| 546 | { | 
| 547 |   int nevents; | 
| 548 |   grn_com *com; | 
| 549 | #ifdef USE_SELECT | 
| 550 |   uint32_t dummy; | 
| 551 |   grn_sock *pfd; | 
| 552 |   int nfds = 0; | 
| 553 |   fd_set rfds; | 
| 554 |   fd_set wfds; | 
| 555 |   struct timeval tv; | 
| 556 |   if (timeout >= 0) { | 
| 557 |     tv.tv_sec = timeout / 1000; | 
| 558 |     tv.tv_usec = (timeout % 1000) * 1000; | 
| 559 |   } | 
| 560 |   FD_ZERO(&rfds); | 
| 561 |   FD_ZERO(&wfds); | 
| 562 |   ctx->errlvl = GRN_OK; | 
| 563 |   ctx->rc = GRN_SUCCESS; | 
| 564 |   { | 
| 565 |     grn_hash_cursor *cursor; | 
| 566 |     cursor = grn_hash_cursor_open(ctx, ev->hash, NULL, 0, NULL, 0, 0, -1, 0); | 
| 567 |     if (cursor) { | 
| 568 |       grn_id id; | 
| 569 |       while ((id = grn_hash_cursor_next(ctx, cursor))) { | 
| 570 |         grn_hash_cursor_get_key_value(ctx, | 
| 571 |                                       cursor, | 
| 572 |                                       (void **)(&pfd), | 
| 573 |                                       &dummy, | 
| 574 |                                       (void **)(&com)); | 
| 575 |         if (com->events & GRN_COM_POLLIN) { FD_SET(*pfd, &rfds); } | 
| 576 |         if (com->events & GRN_COM_POLLOUT) { FD_SET(*pfd, &wfds); } | 
| 577 | # ifndef WIN32 | 
| 578 |         if (*pfd > nfds) { nfds = *pfd; } | 
| 579 | # endif /* WIN32 */ | 
| 580 |       } | 
| 581 |       grn_hash_cursor_close(ctx, cursor); | 
| 582 |     } | 
| 583 |   } | 
| 584 |   nevents = select(nfds + 1, &rfds, &wfds, NULL, (timeout >= 0) ? &tv : NULL); | 
| 585 |   if (nevents < 0) { | 
| 586 |     SOERR("select" ); | 
| 587 |     if (ctx->rc == GRN_INTERRUPTED_FUNCTION_CALL) { ERRCLR(ctx); } | 
| 588 |     return ctx->rc; | 
| 589 |   } | 
| 590 |   if (timeout < 0 && !nevents) { GRN_LOG(ctx, GRN_LOG_NOTICE, "select returns 0 events" ); } | 
| 591 |   GRN_HASH_EACH(ctx, ev->hash, eh, &pfd, &dummy, &com, { | 
| 592 |     if (FD_ISSET(*pfd, &rfds)) { grn_com_receiver(ctx, com); } | 
| 593 |   }); | 
| 594 | #else /* USE_SELECT */ | 
| 595 | # ifdef USE_EPOLL | 
| 596 |   struct epoll_event *ep; | 
| 597 |   ctx->errlvl = GRN_OK; | 
| 598 |   ctx->rc = GRN_SUCCESS; | 
| 599 |   nevents = epoll_wait(ev->epfd, ev->events, ev->max_nevents, timeout); | 
| 600 |   if (nevents < 0) { | 
| 601 |     SERR("epoll_wait" ); | 
| 602 |   } | 
| 603 | # else /* USE_EPOLL */ | 
| 604 | #  ifdef USE_KQUEUE | 
| 605 |   struct kevent *ep; | 
| 606 |   struct timespec tv; | 
| 607 |   if (timeout >= 0) { | 
| 608 |     tv.tv_sec = timeout / 1000; | 
| 609 |     tv.tv_nsec = (timeout % 1000) * 1000; | 
| 610 |   } | 
| 611 |   nevents = kevent(ev->kqfd, NULL, 0, ev->events, ev->max_nevents, &tv); | 
| 612 |   if (nevents < 0) { | 
| 613 |     SERR("kevent" ); | 
| 614 |   } | 
| 615 | #  else /* USE_KQUEUE */ | 
| 616 |   uint32_t dummy; | 
| 617 |   int nfd = 0, *pfd; | 
| 618 |   struct pollfd *ep = ev->events; | 
| 619 |   ctx->errlvl = GRN_OK; | 
| 620 |   ctx->rc = GRN_SUCCESS; | 
| 621 |   GRN_HASH_EACH(ctx, ev->hash, eh, &pfd, &dummy, &com, { | 
| 622 |     ep->fd = *pfd; | 
| 623 |     //    ep->events =(short) com->events; | 
| 624 |     ep->events = POLLIN; | 
| 625 |     ep->revents = 0; | 
| 626 |     ep++; | 
| 627 |     nfd++; | 
| 628 |   }); | 
| 629 |   nevents = poll(ev->events, nfd, timeout); | 
| 630 |   if (nevents < 0) { | 
| 631 |     SERR("poll" ); | 
| 632 |   } | 
| 633 | #  endif /* USE_KQUEUE */ | 
| 634 | # endif /* USE_EPOLL */ | 
| 635 |   if (ctx->rc != GRN_SUCCESS) { | 
| 636 |     if (ctx->rc == GRN_INTERRUPTED_FUNCTION_CALL) { | 
| 637 |       ERRCLR(ctx); | 
| 638 |     } | 
| 639 |     return ctx->rc; | 
| 640 |   } | 
| 641 |   if (timeout < 0 && !nevents) { GRN_LOG(ctx, GRN_LOG_NOTICE, "poll returns 0 events" ); } | 
| 642 |   for (ep = ev->events; nevents; ep++) { | 
| 643 |     int efd; | 
| 644 | # ifdef USE_EPOLL | 
| 645 |     efd = ep->data.fd; | 
| 646 |     nevents--; | 
| 647 |     // todo : com = ep->data.ptr; | 
| 648 |     if (!grn_hash_get(ctx, ev->hash, &efd, sizeof(grn_sock), (void *)&com)) { | 
| 649 |       struct epoll_event e; | 
| 650 |       GRN_LOG(ctx, GRN_LOG_ERROR, "fd(%d) not found in ev->hash" , efd); | 
| 651 |       memset(&e, 0, sizeof(struct epoll_event)); | 
| 652 |       e.data.fd = efd; | 
| 653 |       e.events = ep->events; | 
| 654 |       if (epoll_ctl(ev->epfd, EPOLL_CTL_DEL, efd, &e) == -1) { SERR("epoll_ctl" ); } | 
| 655 |       if (grn_sock_close(efd) == -1) { SOERR("close" ); } | 
| 656 |       continue; | 
| 657 |     } | 
| 658 |     if (ep->events & GRN_COM_POLLIN) { grn_com_receiver(ctx, com); } | 
| 659 | # else /* USE_EPOLL */ | 
| 660 | #  ifdef USE_KQUEUE | 
| 661 |     efd = ep->ident; | 
| 662 |     nevents--; | 
| 663 |     // todo : com = ep->udata; | 
| 664 |     if (!grn_hash_get(ctx, ev->hash, &efd, sizeof(grn_sock), (void *)&com)) { | 
| 665 |       struct kevent e; | 
| 666 |       GRN_LOG(ctx, GRN_LOG_ERROR, "fd(%d) not found in ev->set" , efd); | 
| 667 |       EV_SET(&e, efd, ep->filter, EV_DELETE, 0, 0, NULL); | 
| 668 |       if (kevent(ev->kqfd, &e, 1, NULL, 0, NULL) == -1) { SERR("kevent" ); } | 
| 669 |       if (grn_sock_close(efd) == -1) { SOERR("close" ); } | 
| 670 |       continue; | 
| 671 |     } | 
| 672 |     if (ep->filter == GRN_COM_POLLIN) { grn_com_receiver(ctx, com); } | 
| 673 | #  else | 
| 674 |     efd = ep->fd; | 
| 675 |     if (!(ep->events & ep->revents)) { continue; } | 
| 676 |     nevents--; | 
| 677 |     if (!grn_hash_get(ctx, ev->hash, &efd, sizeof(grn_sock), (void *)&com)) { | 
| 678 |       GRN_LOG(ctx, GRN_LOG_ERROR, "fd(%d) not found in ev->hash" , efd); | 
| 679 |       if (grn_sock_close(efd) == -1) { SOERR("close" ); } | 
| 680 |       continue; | 
| 681 |     } | 
| 682 |     if (ep->revents & GRN_COM_POLLIN) { grn_com_receiver(ctx, com); } | 
| 683 | #  endif /* USE_KQUEUE */ | 
| 684 | # endif /* USE_EPOLL */ | 
| 685 |   } | 
| 686 | #endif /* USE_SELECT */ | 
| 687 |   /* todo : | 
| 688 |   while (!(msg = (grn_com_msg *)grn_com_queue_deque(&recv_old))) { | 
| 689 |     grn_msg_close(ctx, msg); | 
| 690 |   } | 
| 691 |   */ | 
| 692 |   return GRN_SUCCESS; | 
| 693 | } | 
| 694 |  | 
| 695 | grn_rc | 
| 696 | grn_com_send_http(grn_ctx *ctx, grn_com *cs, const char *path, uint32_t path_len, int flags) | 
| 697 | { | 
| 698 |   ssize_t ret; | 
| 699 |   grn_obj buf; | 
| 700 |   GRN_TEXT_INIT(&buf, 0); | 
| 701 |   GRN_TEXT_PUTS(ctx, &buf, "GET " ); | 
| 702 |   grn_bulk_write(ctx, &buf, path, path_len); | 
| 703 |   GRN_TEXT_PUTS(ctx, &buf, " HTTP/1.0\r\n\r\n" ); | 
| 704 |   // todo : refine | 
| 705 |   if ((ret = send(cs->fd, GRN_BULK_HEAD(&buf), GRN_BULK_VSIZE(&buf), MSG_NOSIGNAL|flags)) == -1) { | 
| 706 |     SOERR("send" ); | 
| 707 |   } | 
| 708 |   if (ret != GRN_BULK_VSIZE(&buf)) { | 
| 709 |     GRN_LOG(ctx, GRN_LOG_NOTICE, "send %d != %d" , (int)ret, (int)GRN_BULK_VSIZE(&buf)); | 
| 710 |   } | 
| 711 |   grn_obj_close(ctx, &buf); | 
| 712 |   return ctx->rc; | 
| 713 | } | 
| 714 |  | 
| 715 | grn_rc | 
| 716 | grn_com_send(grn_ctx *ctx, grn_com *cs, | 
| 717 |              grn_com_header *, const char *body, uint32_t size, int flags) | 
| 718 | { | 
| 719 |   grn_rc rc = GRN_SUCCESS; | 
| 720 |   size_t whole_size = sizeof(grn_com_header) + size; | 
| 721 |   ssize_t ret; | 
| 722 |   header->size = htonl(size); | 
| 723 |   GRN_LOG(ctx, GRN_LOG_INFO, "send (%d,%x,%d,%02x,%02x,%04x)" , size, header->flags, header->proto, header->qtype, header->level, header->status); | 
| 724 |  | 
| 725 |   if (size) { | 
| 726 | #ifdef WIN32 | 
| 727 |     WSABUF wsabufs[2]; | 
| 728 |     DWORD n_sent; | 
| 729 |     wsabufs[0].buf = (char *)header; | 
| 730 |     wsabufs[0].len = sizeof(grn_com_header); | 
| 731 |     wsabufs[1].buf = (char *)body; | 
| 732 |     wsabufs[1].len = size; | 
| 733 |     if (WSASend(cs->fd, wsabufs, 2, &n_sent, 0, NULL, NULL) == SOCKET_ERROR) { | 
| 734 |       SOERR("WSASend" ); | 
| 735 |     } | 
| 736 |     ret = n_sent; | 
| 737 | #else /* WIN32 */ | 
| 738 |     struct iovec msg_iov[2]; | 
| 739 |     struct msghdr msg; | 
| 740 |     memset(&msg, 0, sizeof(struct msghdr)); | 
| 741 |     msg.msg_name = NULL; | 
| 742 |     msg.msg_namelen = 0; | 
| 743 |     msg.msg_iov = msg_iov; | 
| 744 |     msg.msg_iovlen = 2; | 
| 745 |     msg_iov[0].iov_base = (char*) header; | 
| 746 |     msg_iov[0].iov_len = sizeof(grn_com_header); | 
| 747 |     msg_iov[1].iov_base = (char *)body; | 
| 748 |     msg_iov[1].iov_len = size; | 
| 749 |     if ((ret = sendmsg(cs->fd, &msg, MSG_NOSIGNAL|flags)) == -1) { | 
| 750 |       SOERR("sendmsg" ); | 
| 751 |       rc = ctx->rc; | 
| 752 |     } | 
| 753 | #endif /* WIN32 */ | 
| 754 |   } else { | 
| 755 |     if ((ret = send(cs->fd, (const void *)header, whole_size, MSG_NOSIGNAL|flags)) == -1) { | 
| 756 |       SOERR("send" ); | 
| 757 |       rc = ctx->rc; | 
| 758 |     } | 
| 759 |   } | 
| 760 |   if (ret != whole_size) { | 
| 761 |     GRN_LOG(ctx, GRN_LOG_ERROR, | 
| 762 |             "sendmsg(%"  GRN_FMT_SOCKET "): %"  GRN_FMT_LLD " < %"  GRN_FMT_LLU, | 
| 763 |             cs->fd, (long long int)ret, (unsigned long long int)whole_size); | 
| 764 |     rc = ctx->rc; | 
| 765 |   } | 
| 766 |   return rc; | 
| 767 | } | 
| 768 |  | 
| 769 | #define RETRY_MAX 10 | 
| 770 |  | 
| 771 | static const char * | 
| 772 | scan_delimiter(const char *p, const char *e) | 
| 773 | { | 
| 774 |   while (p + 4 <= e) { | 
| 775 |     if (p[3] == '\n') { | 
| 776 |       if (p[2] == '\r') { | 
| 777 |         if (p[1] == '\n') { | 
| 778 |           if (p[0] == '\r') { return p + 4; } else { p += 2; } | 
| 779 |         } else { p += 2; } | 
| 780 |       } else { p += 4; } | 
| 781 |     } else { p += p[3] == '\r' ? 1 : 4; } | 
| 782 |   } | 
| 783 |   return NULL; | 
| 784 | } | 
| 785 |  | 
| 786 | #define BUFSIZE 4096 | 
| 787 |  | 
| 788 | static grn_rc | 
| 789 | grn_com_recv_text(grn_ctx *ctx, grn_com *com, | 
| 790 |                   grn_com_header *, grn_obj *buf, ssize_t ret) | 
| 791 | { | 
| 792 |   const char *p; | 
| 793 |   int retry = 0; | 
| 794 |   grn_bulk_write(ctx, buf, (char *)header, ret); | 
| 795 |   if ((p = scan_delimiter(GRN_BULK_HEAD(buf), GRN_BULK_CURR(buf)))) { | 
| 796 |     header->qtype = *GRN_BULK_HEAD(buf); | 
| 797 |     header->proto = GRN_COM_PROTO_HTTP; | 
| 798 |     header->size = GRN_BULK_VSIZE(buf); | 
| 799 |     goto exit; | 
| 800 |   } | 
| 801 |   for (;;) { | 
| 802 |     if (grn_bulk_reserve(ctx, buf, BUFSIZE)) { return ctx->rc; } | 
| 803 |     if ((ret = recv(com->fd, GRN_BULK_CURR(buf), BUFSIZE, 0)) < 0) { | 
| 804 |       SOERR("recv text" ); | 
| 805 |       if (ctx->rc == GRN_OPERATION_WOULD_BLOCK || | 
| 806 |           ctx->rc == GRN_INTERRUPTED_FUNCTION_CALL) { | 
| 807 |         ERRCLR(ctx); | 
| 808 |         continue; | 
| 809 |       } | 
| 810 |       goto exit; | 
| 811 |     } | 
| 812 |     if (ret) { | 
| 813 |       off_t o = GRN_BULK_VSIZE(buf); | 
| 814 |       p = GRN_BULK_CURR(buf); | 
| 815 |       GRN_BULK_INCR_LEN(buf, ret); | 
| 816 |       if (scan_delimiter(p - (o > 3 ? 3 : o), p + ret)) { | 
| 817 |         break; | 
| 818 |       } | 
| 819 |     } else { | 
| 820 |       if (++retry > RETRY_MAX) { | 
| 821 |         // ERR(GRN_RETRY_MAX, "retry max in recv text"); | 
| 822 |         goto exit; | 
| 823 |       } | 
| 824 |     } | 
| 825 |   } | 
| 826 |   header->qtype = *GRN_BULK_HEAD(buf); | 
| 827 |   header->proto = GRN_COM_PROTO_HTTP; | 
| 828 |   header->size = GRN_BULK_VSIZE(buf); | 
| 829 | exit : | 
| 830 |   if (header->qtype == 'H') { | 
| 831 |     //todo : refine | 
| 832 |     /* | 
| 833 |     GRN_BULK_REWIND(buf); | 
| 834 |     grn_bulk_reserve(ctx, buf, BUFSIZE); | 
| 835 |     if ((ret = recv(com->fd, GRN_BULK_CURR(buf), BUFSIZE, 0)) < 0) { | 
| 836 |       SOERR("recv text body"); | 
| 837 |     } else { | 
| 838 |       GRN_BULK_CURR(buf) += ret; | 
| 839 |     } | 
| 840 |     */ | 
| 841 |   } | 
| 842 |   return ctx->rc; | 
| 843 | } | 
| 844 |  | 
| 845 | grn_rc | 
| 846 | grn_com_recv(grn_ctx *ctx, grn_com *com, grn_com_header *, grn_obj *buf) | 
| 847 | { | 
| 848 |   ssize_t ret; | 
| 849 |   int retry = 0; | 
| 850 |   byte *p = (byte *)header; | 
| 851 |   size_t rest = sizeof(grn_com_header); | 
| 852 |   do { | 
| 853 |     if ((ret = recv(com->fd, p, rest, 0)) < 0) { | 
| 854 |       SOERR("recv size" ); | 
| 855 |       GRN_LOG(ctx, GRN_LOG_ERROR, "recv error (%"  GRN_FMT_SOCKET ")" , com->fd); | 
| 856 |       if (ctx->rc == GRN_OPERATION_WOULD_BLOCK || | 
| 857 |           ctx->rc == GRN_INTERRUPTED_FUNCTION_CALL) { | 
| 858 |         ERRCLR(ctx); | 
| 859 |         continue; | 
| 860 |       } | 
| 861 |       goto exit; | 
| 862 |     } | 
| 863 |     if (ret) { | 
| 864 |       if (header->proto < 0x80) { | 
| 865 |         return grn_com_recv_text(ctx, com, header, buf, ret); | 
| 866 |       } | 
| 867 |       rest -= ret, p += ret; | 
| 868 |     } else { | 
| 869 |       if (++retry > RETRY_MAX) { | 
| 870 |         // ERR(GRN_RETRY_MAX, "retry max in recv header (%d)", com->fd); | 
| 871 |         goto exit; | 
| 872 |       } | 
| 873 |     } | 
| 874 |   } while (rest); | 
| 875 |   GRN_LOG(ctx, GRN_LOG_INFO, | 
| 876 |           "recv (%u,%x,%d,%02x,%02x,%04x)" , | 
| 877 |           (uint32_t)ntohl(header->size), | 
| 878 |           header->flags, | 
| 879 |           header->proto, | 
| 880 |           header->qtype, | 
| 881 |           header->level, | 
| 882 |           header->status); | 
| 883 |   { | 
| 884 |     uint8_t proto = header->proto; | 
| 885 |     size_t value_size = ntohl(header->size); | 
| 886 |     GRN_BULK_REWIND(buf); | 
| 887 |     switch (proto) { | 
| 888 |     case GRN_COM_PROTO_GQTP : | 
| 889 |     case GRN_COM_PROTO_MBREQ : | 
| 890 |       if (GRN_BULK_WSIZE(buf) < value_size) { | 
| 891 |         if (grn_bulk_resize(ctx, buf, value_size)) { | 
| 892 |           goto exit; | 
| 893 |         } | 
| 894 |       } | 
| 895 |       retry = 0; | 
| 896 |       for (rest = value_size; rest;) { | 
| 897 |         if ((ret = recv(com->fd, GRN_BULK_CURR(buf), rest, MSG_WAITALL)) < 0) { | 
| 898 |           SOERR("recv body" ); | 
| 899 |           if (ctx->rc == GRN_OPERATION_WOULD_BLOCK || | 
| 900 |               ctx->rc == GRN_INTERRUPTED_FUNCTION_CALL) { | 
| 901 |             ERRCLR(ctx); | 
| 902 |             continue; | 
| 903 |           } | 
| 904 |           goto exit; | 
| 905 |         } | 
| 906 |         if (ret) { | 
| 907 |           rest -= ret; | 
| 908 |           GRN_BULK_INCR_LEN(buf, ret); | 
| 909 |         } else { | 
| 910 |           if (++retry > RETRY_MAX) { | 
| 911 |             // ERR(GRN_RETRY_MAX, "retry max in recv body"); | 
| 912 |             goto exit; | 
| 913 |           } | 
| 914 |         } | 
| 915 |       } | 
| 916 |       break; | 
| 917 |     default : | 
| 918 |       GRN_LOG(ctx, GRN_LOG_ERROR, "illegal header: %d" , proto); | 
| 919 |       ctx->rc = GRN_INVALID_FORMAT; | 
| 920 |       goto exit; | 
| 921 |     } | 
| 922 |   } | 
| 923 | exit : | 
| 924 |   return ctx->rc; | 
| 925 | } | 
| 926 |  | 
| 927 | grn_com * | 
| 928 | grn_com_copen(grn_ctx *ctx, grn_com_event *ev, const char *dest, int port) | 
| 929 | { | 
| 930 |   grn_sock fd = -1; | 
| 931 |   grn_com *cs = NULL; | 
| 932 |  | 
| 933 |   struct addrinfo hints, *addrinfo_list, *addrinfo_ptr; | 
| 934 |   char port_string[16]; | 
| 935 |   int getaddrinfo_result; | 
| 936 |  | 
| 937 |   memset(&hints, 0, sizeof(hints)); | 
| 938 |   hints.ai_family = AF_UNSPEC; | 
| 939 |   hints.ai_socktype = SOCK_STREAM; | 
| 940 | #ifdef AI_NUMERICSERV | 
| 941 |   hints.ai_flags = AI_NUMERICSERV; | 
| 942 | #endif | 
| 943 |   grn_snprintf(port_string, sizeof(port_string), sizeof(port_string), | 
| 944 |                "%d" , port); | 
| 945 |  | 
| 946 |   getaddrinfo_result = getaddrinfo(dest, port_string, &hints, &addrinfo_list); | 
| 947 |   if (getaddrinfo_result != 0) { | 
| 948 |     switch (getaddrinfo_result) { | 
| 949 | #ifdef EAI_MEMORY | 
| 950 |     case EAI_MEMORY: | 
| 951 |       ERR(GRN_NO_MEMORY_AVAILABLE, "getaddrinfo: <%s:%s>: %s" , | 
| 952 |           dest, port_string, gai_strerror(getaddrinfo_result)); | 
| 953 |       break; | 
| 954 | #endif | 
| 955 | #ifdef EAI_SYSTEM | 
| 956 |     case EAI_SYSTEM: | 
| 957 |       SOERR("getaddrinfo" ); | 
| 958 |       break; | 
| 959 | #endif | 
| 960 |     default: | 
| 961 |       ERR(GRN_INVALID_ARGUMENT, "getaddrinfo: <%s:%s>: %s" , | 
| 962 |           dest, port_string, gai_strerror(getaddrinfo_result)); | 
| 963 |       break; | 
| 964 |     } | 
| 965 |     return NULL; | 
| 966 |   } | 
| 967 |  | 
| 968 |   for (addrinfo_ptr = addrinfo_list; addrinfo_ptr; | 
| 969 |        addrinfo_ptr = addrinfo_ptr->ai_next) { | 
| 970 |     fd = socket(addrinfo_ptr->ai_family, addrinfo_ptr->ai_socktype, | 
| 971 |                 addrinfo_ptr->ai_protocol); | 
| 972 |     if (fd == -1) { | 
| 973 |       SOERR("socket" ); | 
| 974 |       continue; | 
| 975 |     } | 
| 976 | #ifdef TCP_NODELAY | 
| 977 |     { | 
| 978 |       static const int value = 1; | 
| 979 |       if (setsockopt(fd, 6, TCP_NODELAY, | 
| 980 |                      (const char *)&value, sizeof(value)) != 0) { | 
| 981 |         SOERR("setsockopt" ); | 
| 982 |         grn_sock_close(fd); | 
| 983 |         continue; | 
| 984 |       } | 
| 985 |     } | 
| 986 | #endif | 
| 987 |     if (connect(fd, addrinfo_ptr->ai_addr, addrinfo_ptr->ai_addrlen) != 0) { | 
| 988 |       SOERR("connect" ); | 
| 989 |       grn_sock_close(fd); | 
| 990 |       continue; | 
| 991 |     } | 
| 992 |  | 
| 993 |     break; | 
| 994 |   } | 
| 995 |  | 
| 996 |   freeaddrinfo(addrinfo_list); | 
| 997 |  | 
| 998 |   if (!addrinfo_ptr) { | 
| 999 |     return NULL; | 
| 1000 |   } | 
| 1001 |   ctx->errlvl = GRN_OK; | 
| 1002 |   ctx->rc = GRN_SUCCESS; | 
| 1003 |  | 
| 1004 |   if (ev) { | 
| 1005 |     grn_com_event_add(ctx, ev, fd, GRN_COM_POLLIN, &cs); | 
| 1006 |   } else { | 
| 1007 |     cs = GRN_CALLOC(sizeof(grn_com)); | 
| 1008 |     if (cs) { | 
| 1009 |       cs->fd = fd; | 
| 1010 |     } | 
| 1011 |   } | 
| 1012 |   if (!cs) { | 
| 1013 |     grn_sock_close(fd); | 
| 1014 |   } | 
| 1015 |   return cs; | 
| 1016 | } | 
| 1017 |  | 
| 1018 | void | 
| 1019 | grn_com_close_(grn_ctx *ctx, grn_com *com) | 
| 1020 | { | 
| 1021 |   grn_sock fd = com->fd; | 
| 1022 |   if (shutdown(fd, SHUT_RDWR) == -1) { /* SOERR("shutdown"); */ } | 
| 1023 |   if (grn_sock_close(fd) == -1) { | 
| 1024 |     SOERR("close" ); | 
| 1025 |   } else { | 
| 1026 |     com->closed = 1; | 
| 1027 |   } | 
| 1028 | } | 
| 1029 |  | 
| 1030 | grn_rc | 
| 1031 | grn_com_close(grn_ctx *ctx, grn_com *com) | 
| 1032 | { | 
| 1033 |   grn_sock fd = com->fd; | 
| 1034 |   grn_com_event *ev = com->ev; | 
| 1035 |   if (ev) { | 
| 1036 |     grn_com *acceptor = ev->acceptor; | 
| 1037 |     grn_com_event_del(ctx, ev, fd); | 
| 1038 |     if (acceptor) { grn_com_event_start_accept(ctx, ev); } | 
| 1039 |   } | 
| 1040 |   if (!com->closed) { grn_com_close_(ctx, com); } | 
| 1041 |   if (!ev) { GRN_FREE(com); } | 
| 1042 |   return GRN_SUCCESS; | 
| 1043 | } | 
| 1044 |  | 
| 1045 | grn_rc | 
| 1046 | grn_com_sopen(grn_ctx *ctx, grn_com_event *ev, | 
| 1047 |               const char *bind_address, int port, grn_msg_handler *func, | 
| 1048 |               struct hostent *he) | 
| 1049 | { | 
| 1050 |   grn_sock lfd = -1; | 
| 1051 |   grn_com *cs = NULL; | 
| 1052 |   int getaddrinfo_result; | 
| 1053 |   struct addrinfo *bind_address_info = NULL; | 
| 1054 |   struct addrinfo hints; | 
| 1055 |   char port_string[6]; /* ceil(log10(65535)) + 1 ('\0')*/ | 
| 1056 |  | 
| 1057 |   GRN_API_ENTER; | 
| 1058 |   if (!bind_address) { | 
| 1059 |     bind_address = "0.0.0.0" ; | 
| 1060 |   } | 
| 1061 |   grn_snprintf(port_string, sizeof(port_string), sizeof(port_string), | 
| 1062 |                "%d" , port); | 
| 1063 |   memset(&hints, 0, sizeof(struct addrinfo)); | 
| 1064 |   hints.ai_family = PF_UNSPEC; | 
| 1065 |   hints.ai_socktype = SOCK_STREAM; | 
| 1066 | #ifdef AI_NUMERICSERV | 
| 1067 |   hints.ai_flags = AI_NUMERICSERV; | 
| 1068 | #endif | 
| 1069 |   getaddrinfo_result = getaddrinfo(bind_address, port_string, | 
| 1070 |                                    &hints, &bind_address_info); | 
| 1071 |   if (getaddrinfo_result != 0) { | 
| 1072 |     switch (getaddrinfo_result) { | 
| 1073 | #ifdef EAI_MEMORY | 
| 1074 |     case EAI_MEMORY: | 
| 1075 |       ERR(GRN_NO_MEMORY_AVAILABLE, | 
| 1076 |           "getaddrinfo: <%s:%s>: %s" , | 
| 1077 |           bind_address, port_string, gai_strerror(getaddrinfo_result)); | 
| 1078 |       break; | 
| 1079 | #endif | 
| 1080 | #ifdef EAI_SYSTEM | 
| 1081 |     case EAI_SYSTEM: | 
| 1082 |       SOERR("getaddrinfo" ); | 
| 1083 |       break; | 
| 1084 | #endif | 
| 1085 |     default: | 
| 1086 |       ERR(GRN_INVALID_ARGUMENT, | 
| 1087 |           "getaddrinfo: <%s:%s>: %s" , | 
| 1088 |           bind_address, port_string, gai_strerror(getaddrinfo_result)); | 
| 1089 |       break; | 
| 1090 |     } | 
| 1091 |     goto exit; | 
| 1092 |   } | 
| 1093 |   if ((lfd = socket(bind_address_info->ai_family, SOCK_STREAM, 0)) == -1) { | 
| 1094 |     SOERR("socket" ); | 
| 1095 |     goto exit; | 
| 1096 |   } | 
| 1097 |   grn_memcpy(&ev->curr_edge_id.addr, he->h_addr, he->h_length); | 
| 1098 |   ev->curr_edge_id.port = htons(port); | 
| 1099 |   ev->curr_edge_id.sid = 0; | 
| 1100 |   { | 
| 1101 |     int v = 1; | 
| 1102 | #ifdef TCP_NODELAY | 
| 1103 |     if (setsockopt(lfd, SOL_TCP, TCP_NODELAY, (void *) &v, sizeof(int)) == -1) { | 
| 1104 |       SOERR("setsockopt" ); | 
| 1105 |       goto exit; | 
| 1106 |     } | 
| 1107 | #endif | 
| 1108 |     if (setsockopt(lfd, SOL_SOCKET, SO_REUSEADDR, (void *) &v, sizeof(int)) == -1) { | 
| 1109 |       SOERR("setsockopt" ); | 
| 1110 |       goto exit; | 
| 1111 |     } | 
| 1112 |   } | 
| 1113 |   if (bind(lfd, bind_address_info->ai_addr, bind_address_info->ai_addrlen) < 0) { | 
| 1114 |     SOERR("bind" ); | 
| 1115 |     goto exit; | 
| 1116 |   } | 
| 1117 |   if (listen(lfd, LISTEN_BACKLOG) < 0) { | 
| 1118 |     SOERR("listen" ); | 
| 1119 |     goto exit; | 
| 1120 |   } | 
| 1121 |   if (ev) { | 
| 1122 |     if (grn_com_event_add(ctx, ev, lfd, GRN_COM_POLLIN, &cs)) { goto exit; } | 
| 1123 |     ev->acceptor = cs; | 
| 1124 |     ev->msg_handler = func; | 
| 1125 |     cs->has_sid = 0; | 
| 1126 |     cs->closed = 0; | 
| 1127 |     cs->opaque = NULL; | 
| 1128 |     GRN_COM_QUEUE_INIT(&cs->new_); | 
| 1129 |   } else { | 
| 1130 |     if (!(cs = GRN_MALLOC(sizeof(grn_com)))) { goto exit; } | 
| 1131 |     cs->fd = lfd; | 
| 1132 |   } | 
| 1133 |   cs->accepting = GRN_TRUE; | 
| 1134 | exit : | 
| 1135 |   if (!cs && lfd != 1) { grn_sock_close(lfd); } | 
| 1136 |   if (bind_address_info) { freeaddrinfo(bind_address_info); } | 
| 1137 |   GRN_API_RETURN(ctx->rc); | 
| 1138 | } | 
| 1139 |  | 
| 1140 |  | 
| 1141 | grn_hash *grn_edges = NULL; | 
| 1142 | void (*grn_dispatcher)(grn_ctx *ctx, grn_edge *edge); | 
| 1143 |  | 
| 1144 | void | 
| 1145 | grn_edges_init(grn_ctx *ctx, void (*dispatcher)(grn_ctx *ctx, grn_edge *edge)) | 
| 1146 | { | 
| 1147 |   grn_edges = grn_hash_create(ctx, NULL, sizeof(grn_com_addr), sizeof(grn_edge), 0); | 
| 1148 |   grn_dispatcher = dispatcher; | 
| 1149 | } | 
| 1150 |  | 
| 1151 | void | 
| 1152 | grn_edges_fin(grn_ctx *ctx) | 
| 1153 | { | 
| 1154 |   grn_hash_close(ctx, grn_edges); | 
| 1155 | } | 
| 1156 |  | 
| 1157 | grn_edge * | 
| 1158 | grn_edges_add(grn_ctx *ctx, grn_com_addr *addr, int *added) | 
| 1159 | { | 
| 1160 |   if (grn_io_lock(ctx, grn_edges->io, grn_lock_timeout)) { | 
| 1161 |     return NULL; | 
| 1162 |   } else { | 
| 1163 |     grn_edge *edge; | 
| 1164 |     grn_id id = grn_hash_add(ctx, grn_edges, addr, sizeof(grn_com_addr), | 
| 1165 |                              (void **)&edge, added); | 
| 1166 |     grn_io_unlock(grn_edges->io); | 
| 1167 |     if (id) { edge->id = id; } | 
| 1168 |     return edge; | 
| 1169 |   } | 
| 1170 | } | 
| 1171 |  | 
| 1172 | void | 
| 1173 | grn_edges_delete(grn_ctx *ctx, grn_edge *edge) | 
| 1174 | { | 
| 1175 |   if (!grn_io_lock(ctx, grn_edges->io, grn_lock_timeout)) { | 
| 1176 |     grn_hash_delete_by_id(ctx, grn_edges, edge->id, NULL); | 
| 1177 |     grn_io_unlock(grn_edges->io); | 
| 1178 |   } | 
| 1179 | } | 
| 1180 |  | 
| 1181 | grn_edge * | 
| 1182 | grn_edges_add_communicator(grn_ctx *ctx, grn_com_addr *addr) | 
| 1183 | { | 
| 1184 |   int added; | 
| 1185 |   grn_edge *edge = grn_edges_add(ctx, addr, &added); | 
| 1186 |   if (added) { | 
| 1187 |     grn_ctx_init(&edge->ctx, 0); | 
| 1188 |     GRN_COM_QUEUE_INIT(&edge->recv_new); | 
| 1189 |     GRN_COM_QUEUE_INIT(&edge->send_old); | 
| 1190 |     edge->com = NULL; | 
| 1191 |     edge->stat = 0 /*EDGE_IDLE*/; | 
| 1192 |     edge->flags = GRN_EDGE_COMMUNICATOR; | 
| 1193 |   } | 
| 1194 |   return edge; | 
| 1195 | } | 
| 1196 |  | 
| 1197 | void | 
| 1198 | grn_edge_dispatch(grn_ctx *ctx, grn_edge *edge, grn_obj *msg) | 
| 1199 | { | 
| 1200 |   grn_com_queue_enque(ctx, &edge->recv_new, (grn_com_queue_entry *)msg); | 
| 1201 |   grn_dispatcher(ctx, edge); | 
| 1202 | } | 
| 1203 |  |