1 | |
2 | // vim:sw=2:ai |
3 | |
4 | /* |
5 | * Copyright (C) 2010 DeNA Co.,Ltd.. All rights reserved. |
6 | * See COPYRIGHT.txt for details. |
7 | */ |
8 | |
9 | #include <my_global.h> |
10 | #include <stdexcept> |
11 | |
12 | #include "hstcpcli.hpp" |
13 | #include "auto_file.hpp" |
14 | #include "string_util.hpp" |
15 | #include "auto_addrinfo.hpp" |
16 | #include "escape.hpp" |
17 | #include "util.hpp" |
18 | |
19 | /* TODO */ |
20 | #if !defined(__linux__) && !defined(__FreeBSD__) && !defined(MSG_NOSIGNAL) |
21 | #define MSG_NOSIGNAL 0 |
22 | #endif |
23 | |
24 | #define DBG(x) |
25 | |
26 | namespace dena { |
27 | |
28 | struct hstcpcli : public hstcpcli_i, private noncopyable { |
29 | hstcpcli(const socket_args& args); |
30 | virtual void close(); |
31 | virtual int reconnect(); |
32 | virtual bool stable_point(); |
33 | virtual void request_buf_open_index(size_t pst_id, const char *dbn, |
34 | const char *tbl, const char *idx, const char *retflds, const char *filflds); |
35 | virtual void request_buf_auth(const char *secret, const char *typ); |
36 | virtual void request_buf_exec_generic(size_t pst_id, const string_ref& op, |
37 | const string_ref *kvs, size_t kvslen, uint32_t limit, uint32_t skip, |
38 | const string_ref& mod_op, const string_ref *mvs, size_t mvslen, |
39 | const hstcpcli_filter *fils, size_t filslen, int invalues_keypart, |
40 | const string_ref *invalues, size_t invalueslen); |
41 | virtual int request_send(); |
42 | virtual int response_recv(size_t& num_flds_r); |
43 | virtual const string_ref *get_next_row(); |
44 | virtual void response_buf_remove(); |
45 | virtual int get_error_code(); |
46 | virtual std::string get_error(); |
47 | private: |
48 | int read_more(); |
49 | void clear_error(); |
50 | int set_error(int code, const std::string& str); |
51 | private: |
52 | auto_file fd; |
53 | socket_args sargs; |
54 | string_buffer readbuf; |
55 | string_buffer writebuf; |
56 | size_t response_end_offset; /* incl newline */ |
57 | size_t cur_row_offset; |
58 | size_t num_flds; |
59 | size_t num_req_bufd; /* buffered but not yet sent */ |
60 | size_t num_req_sent; /* sent but not yet received */ |
61 | size_t num_req_rcvd; /* received but not yet removed */ |
62 | int error_code; |
63 | std::string error_str; |
64 | std::vector<string_ref> flds; |
65 | }; |
66 | |
67 | hstcpcli::hstcpcli(const socket_args& args) |
68 | : sargs(args), response_end_offset(0), cur_row_offset(0), num_flds(0), |
69 | num_req_bufd(0), num_req_sent(0), num_req_rcvd(0), error_code(0) |
70 | { |
71 | std::string err; |
72 | if (socket_connect(fd, sargs, err) != 0) { |
73 | set_error(-1, err); |
74 | } |
75 | } |
76 | |
77 | void |
78 | hstcpcli::close() |
79 | { |
80 | fd.close(); |
81 | readbuf.clear(); |
82 | writebuf.clear(); |
83 | flds.clear(); |
84 | response_end_offset = 0; |
85 | cur_row_offset = 0; |
86 | num_flds = 0; |
87 | num_req_bufd = 0; |
88 | num_req_sent = 0; |
89 | num_req_rcvd = 0; |
90 | } |
91 | |
92 | int |
93 | hstcpcli::reconnect() |
94 | { |
95 | clear_error(); |
96 | close(); |
97 | std::string err; |
98 | if (socket_connect(fd, sargs, err) != 0) { |
99 | set_error(-1, err); |
100 | } |
101 | return error_code; |
102 | } |
103 | |
104 | bool |
105 | hstcpcli::stable_point() |
106 | { |
107 | /* returns true if cli can send a new request */ |
108 | return fd.get() >= 0 && num_req_bufd == 0 && num_req_sent == 0 && |
109 | num_req_rcvd == 0 && response_end_offset == 0; |
110 | } |
111 | |
112 | int |
113 | hstcpcli::get_error_code() |
114 | { |
115 | return error_code; |
116 | } |
117 | |
118 | std::string |
119 | hstcpcli::get_error() |
120 | { |
121 | return error_str; |
122 | } |
123 | |
124 | int |
125 | hstcpcli::read_more() |
126 | { |
127 | const size_t block_size = 4096; // FIXME |
128 | char *const wp = readbuf.make_space(block_size); |
129 | const ssize_t rlen = read(fd.get(), wp, block_size); |
130 | if (rlen <= 0) { |
131 | if (rlen < 0) { |
132 | error_str = "read: failed" ; |
133 | } else { |
134 | error_str = "read: eof" ; |
135 | } |
136 | return rlen; |
137 | } |
138 | readbuf.space_wrote(rlen); |
139 | return rlen; |
140 | } |
141 | |
142 | void |
143 | hstcpcli::clear_error() |
144 | { |
145 | DBG(fprintf(stderr, "CLEAR_ERROR: %d\n" , error_code)); |
146 | error_code = 0; |
147 | error_str.clear(); |
148 | } |
149 | |
150 | int |
151 | hstcpcli::set_error(int code, const std::string& str) |
152 | { |
153 | DBG(fprintf(stderr, "SET_ERROR: %d\n" , code)); |
154 | error_code = code; |
155 | error_str = str; |
156 | return error_code; |
157 | } |
158 | |
159 | void |
160 | hstcpcli::request_buf_open_index(size_t pst_id, const char *dbn, |
161 | const char *tbl, const char *idx, const char *retflds, const char *filflds) |
162 | { |
163 | if (num_req_sent > 0 || num_req_rcvd > 0) { |
164 | close(); |
165 | set_error(-1, "request_buf_open_index: protocol out of sync" ); |
166 | return; |
167 | } |
168 | const string_ref dbn_ref(dbn, strlen(dbn)); |
169 | const string_ref tbl_ref(tbl, strlen(tbl)); |
170 | const string_ref idx_ref(idx, strlen(idx)); |
171 | const string_ref rfs_ref(retflds, strlen(retflds)); |
172 | writebuf.append_literal("P\t" ); |
173 | append_uint32(writebuf, pst_id); // FIXME size_t ? |
174 | writebuf.append_literal("\t" ); |
175 | writebuf.append(dbn_ref.begin(), dbn_ref.end()); |
176 | writebuf.append_literal("\t" ); |
177 | writebuf.append(tbl_ref.begin(), tbl_ref.end()); |
178 | writebuf.append_literal("\t" ); |
179 | writebuf.append(idx_ref.begin(), idx_ref.end()); |
180 | writebuf.append_literal("\t" ); |
181 | writebuf.append(rfs_ref.begin(), rfs_ref.end()); |
182 | if (filflds != 0) { |
183 | const string_ref fls_ref(filflds, strlen(filflds)); |
184 | writebuf.append_literal("\t" ); |
185 | writebuf.append(fls_ref.begin(), fls_ref.end()); |
186 | } |
187 | writebuf.append_literal("\n" ); |
188 | ++num_req_bufd; |
189 | } |
190 | |
191 | void |
192 | hstcpcli::request_buf_auth(const char *secret, const char *typ) |
193 | { |
194 | if (num_req_sent > 0 || num_req_rcvd > 0) { |
195 | close(); |
196 | set_error(-1, "request_buf_auth: protocol out of sync" ); |
197 | return; |
198 | } |
199 | if (typ == 0) { |
200 | typ = "1" ; |
201 | } |
202 | const string_ref typ_ref(typ, strlen(typ)); |
203 | const string_ref secret_ref(secret, strlen(secret)); |
204 | writebuf.append_literal("A\t" ); |
205 | writebuf.append(typ_ref.begin(), typ_ref.end()); |
206 | writebuf.append_literal("\t" ); |
207 | writebuf.append(secret_ref.begin(), secret_ref.end()); |
208 | writebuf.append_literal("\n" ); |
209 | ++num_req_bufd; |
210 | } |
211 | |
212 | namespace { |
213 | |
214 | void |
215 | append_delim_value(string_buffer& buf, const char *start, const char *finish) |
216 | { |
217 | if (start == 0) { |
218 | /* null */ |
219 | const char t[] = "\t\0" ; |
220 | buf.append(t, t + 2); |
221 | } else { |
222 | /* non-null */ |
223 | buf.append_literal("\t" ); |
224 | escape_string(buf, start, finish); |
225 | } |
226 | } |
227 | |
228 | }; |
229 | |
230 | void |
231 | hstcpcli::request_buf_exec_generic(size_t pst_id, const string_ref& op, |
232 | const string_ref *kvs, size_t kvslen, uint32_t limit, uint32_t skip, |
233 | const string_ref& mod_op, const string_ref *mvs, size_t mvslen, |
234 | const hstcpcli_filter *fils, size_t filslen, int invalues_keypart, |
235 | const string_ref *invalues, size_t invalueslen) |
236 | { |
237 | if (num_req_sent > 0 || num_req_rcvd > 0) { |
238 | close(); |
239 | set_error(-1, "request_buf_exec_generic: protocol out of sync" ); |
240 | return; |
241 | } |
242 | append_uint32(writebuf, pst_id); // FIXME size_t ? |
243 | writebuf.append_literal("\t" ); |
244 | writebuf.append(op.begin(), op.end()); |
245 | writebuf.append_literal("\t" ); |
246 | append_uint32(writebuf, kvslen); // FIXME size_t ? |
247 | for (size_t i = 0; i < kvslen; ++i) { |
248 | const string_ref& kv = kvs[i]; |
249 | append_delim_value(writebuf, kv.begin(), kv.end()); |
250 | } |
251 | if (limit != 0 || skip != 0 || invalues_keypart >= 0 || |
252 | mod_op.size() != 0 || filslen != 0) { |
253 | /* has more option */ |
254 | writebuf.append_literal("\t" ); |
255 | append_uint32(writebuf, limit); // FIXME size_t ? |
256 | if (skip != 0 || invalues_keypart >= 0 || |
257 | mod_op.size() != 0 || filslen != 0) { |
258 | writebuf.append_literal("\t" ); |
259 | append_uint32(writebuf, skip); // FIXME size_t ? |
260 | } |
261 | if (invalues_keypart >= 0) { |
262 | writebuf.append_literal("\t@\t" ); |
263 | append_uint32(writebuf, invalues_keypart); |
264 | writebuf.append_literal("\t" ); |
265 | append_uint32(writebuf, invalueslen); |
266 | for (size_t i = 0; i < invalueslen; ++i) { |
267 | const string_ref& s = invalues[i]; |
268 | append_delim_value(writebuf, s.begin(), s.end()); |
269 | } |
270 | } |
271 | for (size_t i = 0; i < filslen; ++i) { |
272 | const hstcpcli_filter& f = fils[i]; |
273 | writebuf.append_literal("\t" ); |
274 | writebuf.append(f.filter_type.begin(), f.filter_type.end()); |
275 | writebuf.append_literal("\t" ); |
276 | writebuf.append(f.op.begin(), f.op.end()); |
277 | writebuf.append_literal("\t" ); |
278 | append_uint32(writebuf, f.ff_offset); |
279 | append_delim_value(writebuf, f.val.begin(), f.val.end()); |
280 | } |
281 | if (mod_op.size() != 0) { |
282 | writebuf.append_literal("\t" ); |
283 | writebuf.append(mod_op.begin(), mod_op.end()); |
284 | for (size_t i = 0; i < mvslen; ++i) { |
285 | const string_ref& mv = mvs[i]; |
286 | append_delim_value(writebuf, mv.begin(), mv.end()); |
287 | } |
288 | } |
289 | } |
290 | writebuf.append_literal("\n" ); |
291 | ++num_req_bufd; |
292 | } |
293 | |
294 | int |
295 | hstcpcli::request_send() |
296 | { |
297 | if (error_code < 0) { |
298 | return error_code; |
299 | } |
300 | clear_error(); |
301 | if (fd.get() < 0) { |
302 | close(); |
303 | return set_error(-1, "write: closed" ); |
304 | } |
305 | if (num_req_bufd == 0 || num_req_sent > 0 || num_req_rcvd > 0) { |
306 | close(); |
307 | return set_error(-1, "request_send: protocol out of sync" ); |
308 | } |
309 | const size_t wrlen = writebuf.size(); |
310 | const ssize_t r = send(fd.get(), writebuf.begin(), wrlen, MSG_NOSIGNAL); |
311 | if (r <= 0) { |
312 | close(); |
313 | return set_error(-1, r < 0 ? "write: failed" : "write: eof" ); |
314 | } |
315 | writebuf.erase_front(r); |
316 | if (static_cast<size_t>(r) != wrlen) { |
317 | close(); |
318 | return set_error(-1, "write: incomplete" ); |
319 | } |
320 | num_req_sent = num_req_bufd; |
321 | num_req_bufd = 0; |
322 | DBG(fprintf(stderr, "REQSEND 0\n" )); |
323 | return 0; |
324 | } |
325 | |
326 | int |
327 | hstcpcli::response_recv(size_t& num_flds_r) |
328 | { |
329 | if (error_code < 0) { |
330 | return error_code; |
331 | } |
332 | clear_error(); |
333 | if (num_req_bufd > 0 || num_req_sent == 0 || num_req_rcvd > 0 || |
334 | response_end_offset != 0) { |
335 | close(); |
336 | return set_error(-1, "response_recv: protocol out of sync" ); |
337 | } |
338 | cur_row_offset = 0; |
339 | num_flds_r = num_flds = 0; |
340 | if (fd.get() < 0) { |
341 | return set_error(-1, "read: closed" ); |
342 | } |
343 | size_t offset = 0; |
344 | while (true) { |
345 | const char *const lbegin = readbuf.begin() + offset; |
346 | const char *const lend = readbuf.end(); |
347 | const char *const nl = memchr_char(lbegin, '\n', lend - lbegin); |
348 | if (nl != 0) { |
349 | offset = (nl + 1) - readbuf.begin(); |
350 | break; |
351 | } |
352 | if (read_more() <= 0) { |
353 | close(); |
354 | return set_error(-1, "read: eof" ); |
355 | } |
356 | } |
357 | response_end_offset = offset; |
358 | --num_req_sent; |
359 | ++num_req_rcvd; |
360 | char *start = readbuf.begin(); |
361 | char *const finish = start + response_end_offset - 1; |
362 | const size_t resp_code = read_ui32(start, finish); |
363 | skip_one(start, finish); |
364 | num_flds_r = num_flds = read_ui32(start, finish); |
365 | if (resp_code != 0) { |
366 | skip_one(start, finish); |
367 | char *const err_begin = start; |
368 | read_token(start, finish); |
369 | char *const err_end = start; |
370 | std::string e = std::string(err_begin, err_end - err_begin); |
371 | if (e.empty()) { |
372 | e = "unknown_error" ; |
373 | } |
374 | return set_error(resp_code, e); |
375 | } |
376 | cur_row_offset = start - readbuf.begin(); |
377 | DBG(fprintf(stderr, "[%s] ro=%zu eol=%zu\n" , |
378 | std::string(readbuf.begin(), readbuf.begin() + response_end_offset) |
379 | .c_str(), |
380 | cur_row_offset, response_end_offset)); |
381 | DBG(fprintf(stderr, "RES 0\n" )); |
382 | return 0; |
383 | } |
384 | |
385 | const string_ref * |
386 | hstcpcli::get_next_row() |
387 | { |
388 | if (num_flds == 0) { |
389 | DBG(fprintf(stderr, "GNR NF 0\n" )); |
390 | return 0; |
391 | } |
392 | if (flds.size() < num_flds) { |
393 | flds.resize(num_flds); |
394 | } |
395 | char *start = readbuf.begin() + cur_row_offset; |
396 | char *const finish = readbuf.begin() + response_end_offset - 1; |
397 | if (start >= finish) { /* start[0] == nl */ |
398 | DBG(fprintf(stderr, "GNR FIN 0 %p %p\n" , start, finish)); |
399 | return 0; |
400 | } |
401 | for (size_t i = 0; i < num_flds; ++i) { |
402 | skip_one(start, finish); |
403 | char *const fld_begin = start; |
404 | read_token(start, finish); |
405 | char *const fld_end = start; |
406 | char *wp = fld_begin; |
407 | if (is_null_expression(fld_begin, fld_end)) { |
408 | /* null */ |
409 | flds[i] = string_ref(); |
410 | } else { |
411 | unescape_string(wp, fld_begin, fld_end); /* in-place */ |
412 | flds[i] = string_ref(fld_begin, wp); |
413 | } |
414 | } |
415 | cur_row_offset = start - readbuf.begin(); |
416 | return &flds[0]; |
417 | } |
418 | |
419 | void |
420 | hstcpcli::response_buf_remove() |
421 | { |
422 | if (response_end_offset == 0) { |
423 | close(); |
424 | set_error(-1, "response_buf_remove: protocol out of sync" ); |
425 | return; |
426 | } |
427 | readbuf.erase_front(response_end_offset); |
428 | response_end_offset = 0; |
429 | --num_req_rcvd; |
430 | cur_row_offset = 0; |
431 | num_flds = 0; |
432 | flds.clear(); |
433 | } |
434 | |
435 | hstcpcli_ptr |
436 | hstcpcli_i::create(const socket_args& args) |
437 | { |
438 | return hstcpcli_ptr(new hstcpcli(args)); |
439 | } |
440 | |
441 | }; |
442 | |
443 | |