1/* -*- c-basic-offset: 2 -*- */
2/*
3 Copyright(C) 2009-2016 Brazil
4
5 This library is free software; you can redistribute it and/or
6 modify it under the terms of the GNU Lesser General Public
7 License version 2.1 as published by the Free Software Foundation.
8
9 This library is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12 Lesser General Public License for more details.
13
14 You should have received a copy of the GNU Lesser General Public
15 License along with this library; if not, write to the Free Software
16 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17*/
18
19#pragma once
20
21#include "grn.h"
22#include "grn_str.h"
23#include "grn_hash.h"
24
25#ifdef HAVE_NETDB_H
26#include <netdb.h>
27#endif /* HAVE_NETDB_H */
28
29#ifdef __cplusplus
30extern "C" {
31#endif
32
33/******* grn_com_queue ********/
34
35typedef struct _grn_com_queue grn_com_queue;
36typedef struct _grn_com_queue_entry grn_com_queue_entry;
37
38#define GRN_COM_QUEUE_BINSIZE (0x100)
39
40struct _grn_com_queue_entry {
41 grn_obj obj;
42 struct _grn_com_queue_entry *next;
43};
44
45struct _grn_com_queue {
46 grn_com_queue_entry *bins[GRN_COM_QUEUE_BINSIZE];
47 grn_com_queue_entry *next;
48 grn_com_queue_entry **tail;
49 uint8_t first;
50 uint8_t last;
51 grn_critical_section cs;
52};
53
54#define GRN_COM_QUEUE_INIT(q) do {\
55 (q)->next = NULL;\
56 (q)->tail = &(q)->next;\
57 (q)->first = 0;\
58 (q)->last = 0;\
59 CRITICAL_SECTION_INIT((q)->cs);\
60} while (0)
61
62#define GRN_COM_QUEUE_EMPTYP(q) (((q)->first == (q)->last) && !(q)->next)
63
64GRN_API grn_rc grn_com_queue_enque(grn_ctx *ctx, grn_com_queue *q, grn_com_queue_entry *e);
65GRN_API grn_com_queue_entry *grn_com_queue_deque(grn_ctx *ctx, grn_com_queue *q);
66
67/******* grn_com ********/
68
69#ifdef USE_SELECT
70# ifdef HAVE_SYS_SELECT_H
71# include <sys/select.h>
72# endif /* HAVE_SYS_SELECT_H */
73# define GRN_COM_POLLIN 1
74# define GRN_COM_POLLOUT 2
75#else /* USE_SELECT */
76# ifdef USE_EPOLL
77# include <sys/epoll.h>
78# define GRN_COM_POLLIN EPOLLIN
79# define GRN_COM_POLLOUT EPOLLOUT
80# else /* USE_EPOLL */
81# ifdef USE_KQUEUE
82# include <sys/event.h>
83# define GRN_COM_POLLIN EVFILT_READ
84# define GRN_COM_POLLOUT EVFILT_WRITE
85# else /* USE_KQUEUE */
86# include <poll.h>
87# define GRN_COM_POLLIN POLLIN
88# define GRN_COM_POLLOUT POLLOUT
89# endif /* USE_KQUEUE */
90# endif /* USE_EPOLL */
91#endif /* USE_SELECT */
92
93typedef struct _grn_com grn_com;
94typedef struct _grn_com_event grn_com_event;
95typedef struct _grn_com_addr grn_com_addr;
96typedef void grn_com_callback(grn_ctx *ctx, grn_com_event *, grn_com *);
97typedef void grn_msg_handler(grn_ctx *ctx, grn_obj *msg);
98
99enum {
100 grn_com_ok = 0,
101 grn_com_emem,
102 grn_com_erecv_head,
103 grn_com_erecv_body,
104 grn_com_eproto,
105};
106
107struct _grn_com_addr {
108 uint32_t addr;
109 uint16_t port;
110 uint16_t sid;
111};
112
113struct _grn_com {
114 grn_sock fd;
115 int events;
116 uint16_t sid;
117 uint8_t has_sid;
118 uint8_t closed;
119 grn_com_queue new_;
120 grn_com_event *ev;
121 void *opaque;
122 grn_bool accepting;
123};
124
125struct _grn_com_event {
126 struct _grn_hash *hash;
127 int max_nevents;
128 grn_ctx *ctx;
129 grn_mutex mutex;
130 grn_cond cond;
131 grn_com_queue recv_old;
132 grn_msg_handler *msg_handler;
133 grn_com_addr curr_edge_id;
134 grn_com *acceptor;
135 void *opaque;
136#ifndef USE_SELECT
137#ifdef USE_EPOLL
138 int epfd;
139 struct epoll_event *events;
140#else /* USE_EPOLL */
141#ifdef USE_KQUEUE
142 int kqfd;
143 struct kevent *events;
144#else /* USE_KQUEUE */
145 int dummy; /* dummy */
146 struct pollfd *events;
147#endif /* USE_KQUEUE */
148#endif /* USE_EPOLL */
149#endif /* USE_SELECT */
150};
151
152grn_rc grn_com_init(void);
153void grn_com_fin(void);
154GRN_API grn_rc grn_com_event_init(grn_ctx *ctx, grn_com_event *ev, int max_nevents, int data_size);
155GRN_API grn_rc grn_com_event_fin(grn_ctx *ctx, grn_com_event *ev);
156GRN_API grn_rc grn_com_event_start_accept(grn_ctx *ctx, grn_com_event *ev);
157grn_rc grn_com_event_stop_accept(grn_ctx *ctx, grn_com_event *ev);
158grn_rc grn_com_event_add(grn_ctx *ctx, grn_com_event *ev, grn_sock fd, int events, grn_com **com);
159grn_rc grn_com_event_mod(grn_ctx *ctx, grn_com_event *ev, grn_sock fd, int events, grn_com **com);
160GRN_API grn_rc grn_com_event_del(grn_ctx *ctx, grn_com_event *ev, grn_sock fd);
161GRN_API grn_rc grn_com_event_poll(grn_ctx *ctx, grn_com_event *ev, int timeout);
162grn_rc grn_com_event_each(grn_ctx *ctx, grn_com_event *ev, grn_com_callback *func);
163
164/******* grn_com_gqtp ********/
165
166#define GRN_COM_PROTO_HTTP 0x47
167#define GRN_COM_PROTO_GQTP 0xc7
168#define GRN_COM_PROTO_MBREQ 0x80
169#define GRN_COM_PROTO_MBRES 0x81
170
171typedef struct _grn_com_header grn_com_header;
172
173struct _grn_com_header {
174 uint8_t proto;
175 uint8_t qtype;
176 uint16_t keylen;
177 uint8_t level;
178 uint8_t flags;
179 uint16_t status;
180 uint32_t size;
181 uint32_t opaque;
182 uint64_t cas;
183};
184
185GRN_API grn_com *grn_com_copen(grn_ctx *ctx, grn_com_event *ev, const char *dest, int port);
186GRN_API grn_rc grn_com_sopen(grn_ctx *ctx, grn_com_event *ev,
187 const char *bind_address, int port,
188 grn_msg_handler *func, struct hostent *he);
189
190GRN_API void grn_com_close_(grn_ctx *ctx, grn_com *com);
191GRN_API grn_rc grn_com_close(grn_ctx *ctx, grn_com *com);
192
193GRN_API grn_rc grn_com_send(grn_ctx *ctx, grn_com *cs,
194 grn_com_header *header, const char *body, uint32_t size, int flags);
195grn_rc grn_com_recv(grn_ctx *ctx, grn_com *cs, grn_com_header *header, grn_obj *buf);
196GRN_API grn_rc grn_com_send_http(grn_ctx *ctx, grn_com *cs, const char *path, uint32_t path_len, int flags);
197
198/******* grn_msg ********/
199
200typedef struct _grn_msg grn_msg;
201
202struct _grn_msg {
203 grn_com_queue_entry qe;
204 union {
205 grn_com *peer;
206 grn_sock fd;
207 } u;
208 grn_ctx *ctx;
209 grn_com_queue *old;
210 grn_com_header header;
211 grn_com_addr edge_id;
212 grn_com *acceptor;
213};
214
215GRN_API grn_rc grn_msg_send(grn_ctx *ctx, grn_obj *msg, int flags);
216GRN_API grn_obj *grn_msg_open_for_reply(grn_ctx *ctx, grn_obj *query, grn_com_queue *old);
217GRN_API grn_obj *grn_msg_open(grn_ctx *ctx, grn_com *com, grn_com_queue *old);
218GRN_API grn_rc grn_msg_set_property(grn_ctx *ctx, grn_obj *obj,
219 uint16_t status, uint32_t key_size, uint8_t extra_size);
220GRN_API grn_rc grn_msg_close(grn_ctx *ctx, grn_obj *msg);
221
222/******* grn_edge ********/
223
224#define GRN_EDGE_WORKER 0
225#define GRN_EDGE_COMMUNICATOR 1
226
227typedef struct {
228 grn_com_queue_entry eq;
229 grn_ctx ctx;
230 grn_com_queue recv_new;
231 grn_com_queue send_old;
232 grn_com *com;
233 grn_com_addr *addr;
234 grn_msg *msg;
235 uint8_t stat;
236 uint8_t flags;
237 grn_id id;
238} grn_edge;
239
240GRN_VAR grn_hash *grn_edges;
241GRN_API void grn_edges_init(grn_ctx *ctx, void (*dispatcher)(grn_ctx *ctx, grn_edge *edge));
242GRN_API void grn_edges_fin(grn_ctx *ctx);
243GRN_API grn_edge *grn_edges_add(grn_ctx *ctx, grn_com_addr *addr, int *added);
244grn_edge *grn_edges_add_communicator(grn_ctx *ctx, grn_com_addr *addr);
245GRN_API void grn_edges_delete(grn_ctx *ctx, grn_edge *edge);
246void grn_edge_dispatch(grn_ctx *ctx, grn_edge *edge, grn_obj *msg);
247
248#ifdef __cplusplus
249}
250#endif
251