1 | /* |
2 | * read.c |
3 | * |
4 | * Copyright (C) 2016 Aerospike, Inc. |
5 | * |
6 | * Portions may be licensed to Aerospike, Inc. under one or more contributor |
7 | * license agreements. |
8 | * |
9 | * This program is free software: you can redistribute it and/or modify it under |
10 | * the terms of the GNU Affero General Public License as published by the Free |
11 | * Software Foundation, either version 3 of the License, or (at your option) any |
12 | * later version. |
13 | * |
14 | * This program is distributed in the hope that it will be useful, but WITHOUT |
15 | * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS |
16 | * FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more |
17 | * details. |
18 | * |
19 | * You should have received a copy of the GNU Affero General Public License |
20 | * along with this program. If not, see http://www.gnu.org/licenses/ |
21 | */ |
22 | |
23 | //========================================================== |
24 | // Includes. |
25 | // |
26 | |
27 | #include "transaction/read.h" |
28 | |
29 | #include <stdbool.h> |
30 | #include <stddef.h> |
31 | #include <stdint.h> |
32 | |
33 | #include "citrusleaf/alloc.h" |
34 | #include "citrusleaf/cf_atomic.h" |
35 | #include "citrusleaf/cf_clock.h" |
36 | |
37 | #include "cf_mutex.h" |
38 | #include "dynbuf.h" |
39 | #include "fault.h" |
40 | |
41 | #include "base/batch.h" |
42 | #include "base/cfg.h" |
43 | #include "base/datamodel.h" |
44 | #include "base/index.h" |
45 | #include "base/predexp.h" |
46 | #include "base/proto.h" |
47 | #include "base/transaction.h" |
48 | #include "base/transaction_policy.h" |
49 | #include "fabric/partition.h" |
50 | #include "storage/storage.h" |
51 | #include "transaction/duplicate_resolve.h" |
52 | #include "transaction/proxy.h" |
53 | #include "transaction/replica_ping.h" |
54 | #include "transaction/rw_request.h" |
55 | #include "transaction/rw_request_hash.h" |
56 | #include "transaction/rw_utils.h" |
57 | |
58 | |
59 | //========================================================== |
60 | // Forward declarations. |
61 | // |
62 | |
63 | void start_read_dup_res(rw_request* rw, as_transaction* tr); |
64 | void start_repl_ping(rw_request* rw, as_transaction* tr); |
65 | bool read_dup_res_cb(rw_request* rw); |
66 | void repl_ping_after_dup_res(rw_request* rw, as_transaction* tr); |
67 | void repl_ping_cb(rw_request* rw); |
68 | |
69 | void send_read_response(as_transaction* tr, as_msg_op** ops, |
70 | as_bin** response_bins, uint16_t n_bins, cf_dyn_buf* db); |
71 | void read_timeout_cb(rw_request* rw); |
72 | |
73 | transaction_status read_local(as_transaction* tr); |
74 | void read_local_done(as_transaction* tr, as_index_ref* r_ref, as_storage_rd* rd, |
75 | int result_code); |
76 | int batch_predexp_filter_meta(const as_transaction* tr, const as_record* r, |
77 | predexp_eval_t** predexp); |
78 | |
79 | |
80 | //========================================================== |
81 | // Inlines & macros. |
82 | // |
83 | |
84 | static inline bool |
85 | read_must_duplicate_resolve(const as_transaction* tr) |
86 | { |
87 | return tr->rsv.n_dupl != 0 && |
88 | TR_READ_CONSISTENCY_LEVEL(tr) == AS_READ_CONSISTENCY_LEVEL_ALL; |
89 | } |
90 | |
91 | static inline bool |
92 | read_must_ping(const as_transaction *tr) |
93 | { |
94 | return (tr->flags & AS_TRANSACTION_FLAG_MUST_PING) != 0; |
95 | } |
96 | |
97 | static inline void |
98 | client_read_update_stats(as_namespace* ns, uint8_t result_code) |
99 | { |
100 | switch (result_code) { |
101 | case AS_OK: |
102 | cf_atomic64_incr(&ns->n_client_read_success); |
103 | break; |
104 | default: |
105 | cf_atomic64_incr(&ns->n_client_read_error); |
106 | break; |
107 | case AS_ERR_TIMEOUT: |
108 | cf_atomic64_incr(&ns->n_client_read_timeout); |
109 | break; |
110 | case AS_ERR_NOT_FOUND: |
111 | cf_atomic64_incr(&ns->n_client_read_not_found); |
112 | break; |
113 | case AS_ERR_FILTERED_OUT: |
114 | cf_atomic64_incr(&ns->n_client_read_filtered_out); |
115 | break; |
116 | } |
117 | } |
118 | |
119 | static inline void |
120 | from_proxy_read_update_stats(as_namespace* ns, uint8_t result_code) |
121 | { |
122 | switch (result_code) { |
123 | case AS_OK: |
124 | cf_atomic64_incr(&ns->n_from_proxy_read_success); |
125 | break; |
126 | default: |
127 | cf_atomic64_incr(&ns->n_from_proxy_read_error); |
128 | break; |
129 | case AS_ERR_TIMEOUT: |
130 | cf_atomic64_incr(&ns->n_from_proxy_read_timeout); |
131 | break; |
132 | case AS_ERR_NOT_FOUND: |
133 | cf_atomic64_incr(&ns->n_from_proxy_read_not_found); |
134 | break; |
135 | case AS_ERR_FILTERED_OUT: |
136 | cf_atomic64_incr(&ns->n_from_proxy_read_filtered_out); |
137 | break; |
138 | } |
139 | } |
140 | |
141 | static inline void |
142 | batch_sub_read_update_stats(as_namespace* ns, uint8_t result_code) |
143 | { |
144 | switch (result_code) { |
145 | case AS_OK: |
146 | cf_atomic64_incr(&ns->n_batch_sub_read_success); |
147 | break; |
148 | default: |
149 | cf_atomic64_incr(&ns->n_batch_sub_read_error); |
150 | break; |
151 | case AS_ERR_TIMEOUT: |
152 | cf_atomic64_incr(&ns->n_batch_sub_read_timeout); |
153 | break; |
154 | case AS_ERR_NOT_FOUND: |
155 | cf_atomic64_incr(&ns->n_batch_sub_read_not_found); |
156 | break; |
157 | case AS_ERR_FILTERED_OUT: |
158 | cf_atomic64_incr(&ns->n_batch_sub_read_filtered_out); |
159 | break; |
160 | } |
161 | } |
162 | |
163 | static inline void |
164 | from_proxy_batch_sub_read_update_stats(as_namespace* ns, uint8_t result_code) |
165 | { |
166 | switch (result_code) { |
167 | case AS_OK: |
168 | cf_atomic64_incr(&ns->n_from_proxy_batch_sub_read_success); |
169 | break; |
170 | default: |
171 | cf_atomic64_incr(&ns->n_from_proxy_batch_sub_read_error); |
172 | break; |
173 | case AS_ERR_TIMEOUT: |
174 | cf_atomic64_incr(&ns->n_from_proxy_batch_sub_read_timeout); |
175 | break; |
176 | case AS_ERR_NOT_FOUND: |
177 | cf_atomic64_incr(&ns->n_from_proxy_batch_sub_read_not_found); |
178 | break; |
179 | case AS_ERR_FILTERED_OUT: |
180 | cf_atomic64_incr(&ns->n_from_proxy_batch_sub_read_filtered_out); |
181 | break; |
182 | } |
183 | } |
184 | |
185 | |
186 | //========================================================== |
187 | // Public API. |
188 | // |
189 | |
// Read transaction entry point. Returns TRANS_DONE_* when the transaction
// finished here (a response is sent on those paths via send_read_response()),
// TRANS_IN_PROGRESS when it continues via dup-res or repl-ping callbacks, or
// TRANS_WAITING (from the hash insert) when it must wait - no response sent.
transaction_status
as_read_start(as_transaction* tr)
{
	BENCHMARK_START(tr, read, FROM_CLIENT);
	BENCHMARK_START(tr, batch_sub, FROM_BATCH);

	// Bail if repl_ping_check() rejects the transaction - respond and finish.
	if (! repl_ping_check(tr)) {
		send_read_response(tr, NULL, NULL, 0, NULL);
		return TRANS_DONE_ERROR;
	}

	transaction_status status;
	bool must_duplicate_resolve = read_must_duplicate_resolve(tr);
	bool must_ping = read_must_ping(tr);

	if (! must_duplicate_resolve && ! must_ping) {
		// No network hops needed, try reading.
		if ((status = read_local(tr)) != TRANS_IN_PROGRESS) {
			return status;
		}
		// else - must try again under hash.
	}
	// else - there are duplicates, and we're configured to resolve them, or
	// we're required to ping replicas.

	// Create rw_request and add to hash.
	rw_request_hkey hkey = { tr->rsv.ns->id, tr->keyd };
	rw_request* rw = rw_request_create(&tr->keyd);

	// If rw_request isn't inserted in hash, transaction is finished.
	if ((status = rw_request_hash_insert(&hkey, rw, tr)) != TRANS_IN_PROGRESS) {
		rw_request_release(rw);

		if (status != TRANS_WAITING) {
			// TRANS_DONE_* - respond to origin now.
			send_read_response(tr, NULL, NULL, 0, NULL);
		}

		return status;
	}
	// else - rw_request is now in hash, continue...

	if (must_duplicate_resolve) {
		start_read_dup_res(rw, tr);

		// Started duplicate resolution.
		return TRANS_IN_PROGRESS;
	}

	if (must_ping) {
		// Set up the nodes to which we'll ping.
		rw->n_dest_nodes = as_partition_get_other_replicas(tr->rsv.p,
				rw->dest_nodes);

		if (insufficient_replica_destinations(tr->rsv.ns, rw->n_dest_nodes)) {
			// Not enough replicas reachable - undo hash insert and fail.
			rw_request_hash_delete(&hkey, rw);
			tr->result_code = AS_ERR_UNAVAILABLE;
			send_read_response(tr, NULL, NULL, 0, NULL);
			return TRANS_DONE_ERROR;
		}

		start_repl_ping(rw, tr);

		// Started replica ping.
		return TRANS_IN_PROGRESS;
	}

	// Trying again under hash.
	status = read_local(tr);
	cf_assert(status != TRANS_IN_PROGRESS, AS_RW, "read in-progress" );
	rw_request_hash_delete(&hkey, rw);

	return status;
}
263 | |
264 | |
265 | //========================================================== |
266 | // Local helpers - transaction flow. |
267 | // |
268 | |
// Kick off duplicate resolution for a read - build the dup-res message, then
// install the dup-res and timeout callbacks and send the messages. Setup and
// send happen under the rw_request lock - NOTE(review): presumably so a
// response callback can't race the setup; confirm against rw_request usage.
void
start_read_dup_res(rw_request* rw, as_transaction* tr)
{
	// Finish initializing rw_request, construct and send dup-res message.

	dup_res_make_message(rw, tr);

	cf_mutex_lock(&rw->lock);

	dup_res_setup_rw(rw, tr, read_dup_res_cb, read_timeout_cb);
	send_rw_messages(rw);

	cf_mutex_unlock(&rw->lock);
}
283 | |
284 | |
// Kick off replica pings for a read - build the repl-ping message, then
// install the repl-ping and timeout callbacks and send the messages. As with
// start_read_dup_res(), setup and send are done under the rw_request lock.
void
start_repl_ping(rw_request* rw, as_transaction* tr)
{
	// Finish initializing rw, construct and send repl-ping message.

	repl_ping_make_message(rw, tr);

	cf_mutex_lock(&rw->lock);

	repl_ping_setup_rw(rw, tr, repl_ping_cb, read_timeout_cb);
	send_rw_messages(rw);

	cf_mutex_unlock(&rw->lock);
}
299 | |
300 | |
// Callback invoked when duplicate resolution completes. Returns true if the
// rw_request is finished (its destructor cleans up reservation and msgp),
// false if the rw_request is recycled to continue as a repl-ping request.
bool
read_dup_res_cb(rw_request* rw)
{
	BENCHMARK_NEXT_DATA_POINT_FROM(rw, read, FROM_CLIENT, dup_res);
	BENCHMARK_NEXT_DATA_POINT_FROM(rw, batch_sub, FROM_BATCH, dup_res);

	// Rebuild a transaction from the hashed rw_request.
	as_transaction tr;
	as_transaction_init_from_rw(&tr, rw);

	// Dup-res left an error in tr.result_code - respond with it and finish.
	if (tr.result_code != AS_OK) {
		send_read_response(&tr, NULL, NULL, 0, NULL);
		return true;
	}

	if (read_must_ping(&tr)) {
		// Set up the nodes to which we'll ping.
		rw->n_dest_nodes = as_partition_get_other_replicas(tr.rsv.p,
				rw->dest_nodes);

		if (insufficient_replica_destinations(tr.rsv.ns, rw->n_dest_nodes)) {
			tr.result_code = AS_ERR_UNAVAILABLE;
			send_read_response(&tr, NULL, NULL, 0, NULL);
			return true;
		}

		repl_ping_after_dup_res(rw, &tr);

		// Not finished - rw_request continues as a repl-ping request.
		return false;
	}

	// Read the local copy and respond to origin.
	transaction_status status = read_local(&tr);

	cf_assert(status != TRANS_IN_PROGRESS, AS_RW, "read in-progress" );

	if (status == TRANS_WAITING) {
		// Note - new tr now owns msgp, make sure rw destructor doesn't free it.
		// Also, rw will release rsv - new tr will get a new one.
		rw->msgp = NULL;
	}

	// Finished transaction - rw_request cleans up reservation and msgp!
	return true;
}
345 | |
346 | |
// Transition a rw_request from duplicate resolution to replica pings -
// rebuild its message, reset its callbacks, and send. Called from
// read_dup_res_cb(), i.e. already under the rw_request lock.
void
repl_ping_after_dup_res(rw_request* rw, as_transaction* tr)
{
	// Recycle rw_request that was just used for duplicate resolution to now do
	// replica pings. Note - we are under the rw_request lock here!

	repl_ping_make_message(rw, tr);
	repl_ping_reset_rw(rw, tr, repl_ping_cb);
	send_rw_messages(rw);
}
357 | |
358 | |
// Callback invoked when replica pings complete - read the local copy and
// respond to the origin. After this returns, the rw_request is finished and
// cleans up its reservation and msgp.
void
repl_ping_cb(rw_request* rw)
{
	BENCHMARK_NEXT_DATA_POINT_FROM(rw, read, FROM_CLIENT, repl_ping);
	BENCHMARK_NEXT_DATA_POINT_FROM(rw, batch_sub, FROM_BATCH, repl_ping);

	// Rebuild a transaction from the hashed rw_request.
	as_transaction tr;
	as_transaction_init_from_rw(&tr, rw);

	// Read the local copy and respond to origin.
	transaction_status status = read_local(&tr);

	cf_assert(status != TRANS_IN_PROGRESS, AS_RW, "read in-progress" );

	if (status == TRANS_WAITING) {
		// Note - new tr now owns msgp, make sure rw destructor doesn't free it.
		// Also, rw will release rsv - new tr will get a new one.
		rw->msgp = NULL;
	}
}
379 | |
380 | |
381 | //========================================================== |
382 | // Local helpers - transaction end. |
383 | // |
384 | |
// Send the read response to the transaction's origin (client, proxy, or
// batch parent) and update the per-origin statistics. If db is non-empty it
// holds a pre-built response message; otherwise the response is built from
// ops/response_bins/n_bins and tr's result fields. Clears tr->from.any so a
// racing timeout callback knows it lost.
void
send_read_response(as_transaction* tr, as_msg_op** ops, as_bin** response_bins,
		uint16_t n_bins, cf_dyn_buf* db)
{
	// Paranoia - shouldn't get here on losing race with timeout.
	if (! tr->from.any) {
		cf_warning(AS_RW, "transaction origin %u has null 'from'" , tr->origin);
		return;
	}

	// Note - if tr was setup from rw, rw->from.any has been set null and
	// informs timeout it lost the race.

	switch (tr->origin) {
	case FROM_CLIENT:
		BENCHMARK_NEXT_DATA_POINT(tr, read, local);
		if (db && db->used_sz != 0) {
			// Pre-built message - send it as-is.
			as_msg_send_ops_reply(tr->from.proto_fd_h, db);
		}
		else {
			as_msg_send_reply(tr->from.proto_fd_h, tr->result_code,
					tr->generation, tr->void_time, ops, response_bins, n_bins,
					tr->rsv.ns, as_transaction_trid(tr));
		}
		BENCHMARK_NEXT_DATA_POINT(tr, read, response);
		HIST_TRACK_ACTIVATE_INSERT_DATA_POINT(tr, read_hist);
		client_read_update_stats(tr->rsv.ns, tr->result_code);
		break;
	case FROM_PROXY:
		if (db && db->used_sz != 0) {
			// Pre-built message - send it as-is.
			as_proxy_send_ops_response(tr->from.proxy_node,
					tr->from_data.proxy_tid, db);
		}
		else {
			as_proxy_send_response(tr->from.proxy_node, tr->from_data.proxy_tid,
					tr->result_code, tr->generation, tr->void_time, ops,
					response_bins, n_bins, tr->rsv.ns, as_transaction_trid(tr));
		}
		// Proxied transactions may be batch sub-transactions - pick the
		// matching stats bucket.
		if (as_transaction_is_batch_sub(tr)) {
			from_proxy_batch_sub_read_update_stats(tr->rsv.ns, tr->result_code);
		}
		else {
			from_proxy_read_update_stats(tr->rsv.ns, tr->result_code);
		}
		break;
	case FROM_BATCH:
		BENCHMARK_NEXT_DATA_POINT(tr, batch_sub, read_local);
		// Batch results are added to the shared batch buffer - db not used.
		as_batch_add_result(tr, n_bins, response_bins, ops);
		BENCHMARK_NEXT_DATA_POINT(tr, batch_sub, response);
		batch_sub_read_update_stats(tr->rsv.ns, tr->result_code);
		break;
	default:
		cf_crash(AS_RW, "unexpected transaction origin %u" , tr->origin);
		break;
	}

	tr->from.any = NULL; // pattern, not needed
}
443 | |
444 | |
// Timeout callback for a hashed read rw_request - respond to the origin with
// AS_ERR_TIMEOUT, unless the dup-res/repl-ping callback already responded
// (signaled by a null rw->from.any). Clears rw->from.any afterward so the
// other callback knows it lost the race.
void
read_timeout_cb(rw_request* rw)
{
	if (! rw->from.any) {
		return; // lost race against dup-res callback
	}

	switch (rw->origin) {
	case FROM_CLIENT:
		as_msg_send_reply(rw->from.proto_fd_h, AS_ERR_TIMEOUT, 0, 0, NULL, NULL,
				0, rw->rsv.ns, rw_request_trid(rw));
		// Timeouts aren't included in histograms.
		client_read_update_stats(rw->rsv.ns, AS_ERR_TIMEOUT);
		break;
	case FROM_PROXY:
		// No reply sent to the proxy node here - NOTE(review): presumably the
		// proxying node times out on its own; only stats are updated.
		if (rw_request_is_batch_sub(rw)) {
			from_proxy_batch_sub_read_update_stats(rw->rsv.ns, AS_ERR_TIMEOUT);
		}
		else {
			from_proxy_read_update_stats(rw->rsv.ns, AS_ERR_TIMEOUT);
		}
		break;
	case FROM_BATCH:
		as_batch_add_error(rw->from.batch_shared, rw->from_data.batch_index,
				AS_ERR_TIMEOUT);
		// Timeouts aren't included in histograms.
		batch_sub_read_update_stats(rw->rsv.ns, AS_ERR_TIMEOUT);
		break;
	default:
		cf_crash(AS_RW, "unexpected transaction origin %u" , rw->origin);
		break;
	}

	rw->from.any = NULL; // inform other callback it lost the race
}
480 | |
481 | |
482 | //========================================================== |
483 | // Local helpers - read local. |
484 | // |
485 | |
// Read the record from the local node and respond to the origin. Returns
// TRANS_DONE_SUCCESS or TRANS_DONE_ERROR when a response was sent (via
// read_local_done() or send_read_response()), or TRANS_IN_PROGRESS /
// TRANS_WAITING (from repl_state_check()) when the transaction must be
// retried - no response is sent in those cases.
transaction_status
read_local(as_transaction* tr)
{
	as_msg* m = &tr->msgp->msg;
	as_namespace* ns = tr->rsv.ns;

	as_index_ref r_ref;

	// Look up the record by digest - holds the record ref until released
	// below via read_local_done() or as_record_done().
	if (as_record_get(tr->rsv.tree, &tr->keyd, &r_ref) != 0) {
		read_local_done(tr, NULL, NULL, AS_ERR_NOT_FOUND);
		return TRANS_DONE_ERROR;
	}

	as_record* r = r_ref.r;

	// Make sure the message set name (if it's there) is correct.
	if (! set_name_check(tr, r)) {
		read_local_done(tr, &r_ref, NULL, AS_ERR_PARAMETER);
		return TRANS_DONE_ERROR;
	}

	// Check if it's an expired or truncated record.
	if (as_record_is_doomed(r, ns)) {
		read_local_done(tr, &r_ref, NULL, AS_ERR_NOT_FOUND);
		return TRANS_DONE_ERROR;
	}

	int result = repl_state_check(r, tr);

	if (result != 0) {
		// -3 means unavailable; 1 means retry (in progress); other negative
		// values mean the transaction must wait.
		if (result == -3) {
			read_local_done(tr, &r_ref, NULL, AS_ERR_UNAVAILABLE);
			return TRANS_DONE_ERROR;
		}

		// No response sent to origin.
		as_record_done(&r_ref, ns);
		return result == 1 ? TRANS_IN_PROGRESS : TRANS_WAITING;
	}

	// Check if it's a tombstone.
	if (! as_record_is_live(r)) {
		read_local_done(tr, &r_ref, NULL, AS_ERR_NOT_FOUND);
		return TRANS_DONE_ERROR;
	}

	// Apply predexp metadata filter if present.

	predexp_eval_t* predexp = NULL;
	predexp_eval_t* batch_predexp = NULL;

	// Batch sub-transactions use the batch parent's predexp (batch_predexp,
	// fetched from batch_shared - see batch_predexp_filter_meta()); others
	// build their own (predexp), which must be destroyed below.
	if ((result = tr->origin == FROM_BATCH ?
			batch_predexp_filter_meta(tr, r, &batch_predexp) :
			build_predexp_and_filter_meta(tr, r, &predexp)) != 0) {
		read_local_done(tr, &r_ref, NULL, result);
		return TRANS_DONE_ERROR;
	}

	as_storage_rd rd;

	as_storage_record_open(ns, r, &rd);

	// If configuration permits, allow reads to use page cache.
	rd.read_page_cache = ns->storage_read_page_cache;

	// Apply predexp record bins filter if present.
	if (predexp != NULL || batch_predexp != NULL) {
		if ((result = predexp_read_and_filter_bins(&rd,
				tr->origin == FROM_BATCH ? batch_predexp : predexp)) != 0) {
			// Only the locally built predexp is destroyed - batch_predexp
			// came from batch_shared and is not freed here.
			predexp_destroy(predexp);
			read_local_done(tr, &r_ref, &rd, result);
			return TRANS_DONE_ERROR;
		}

		predexp_destroy(predexp);
	}

	// Check the key if required.
	// Note - for data-not-in-memory "exists" ops, key check is expensive!
	if (as_transaction_has_key(tr) &&
			as_storage_record_get_key(&rd) && ! check_msg_key(m, &rd)) {
		read_local_done(tr, &r_ref, &rd, AS_ERR_KEY_MISMATCH);
		return TRANS_DONE_ERROR;
	}

	// "Get no bins" (exists) op - respond with record metadata only.
	if ((m->info1 & AS_MSG_INFO1_GET_NO_BINS) != 0) {
		tr->generation = r->generation;
		tr->void_time = r->void_time;
		tr->last_update_time = r->last_update_time;

		read_local_done(tr, &r_ref, &rd, AS_OK);
		return TRANS_DONE_SUCCESS;
	}

	if ((result = as_storage_rd_load_n_bins(&rd)) < 0) {
		cf_warning_digest(AS_RW, &tr->keyd, "{%s} read_local: failed as_storage_rd_load_n_bins() " , ns->name);
		read_local_done(tr, &r_ref, &rd, -result);
		return TRANS_DONE_ERROR;
	}

	// Stack space for bins - only needed when data is not in memory.
	as_bin stack_bins[ns->storage_data_in_memory ? 0 : rd.n_bins];

	if ((result = as_storage_rd_load_bins(&rd, stack_bins)) < 0) {
		cf_warning_digest(AS_RW, &tr->keyd, "{%s} read_local: failed as_storage_rd_load_bins() " , ns->name);
		read_local_done(tr, &r_ref, &rd, -result);
		return TRANS_DONE_ERROR;
	}

	// Upper bound on response size - all bins for get-all, else one per op.
	uint32_t bin_count = (m->info1 & AS_MSG_INFO1_GET_ALL) != 0 ?
			rd.n_bins : m->n_ops;

	// Parallel arrays of message ops and the bins to respond with.
	as_msg_op* ops[bin_count];
	as_msg_op** p_ops = ops;
	as_bin* response_bins[bin_count];
	uint16_t n_bins = 0;

	// Scratch bins holding results of bits/CDT read ops - must be destroyed
	// via destroy_stack_bins() on every exit path past this point.
	as_bin result_bins[bin_count];
	uint32_t n_result_bins = 0;

	if ((m->info1 & AS_MSG_INFO1_GET_ALL) != 0) {
		// Get-all - respond with every bin, no per-op array.
		p_ops = NULL;
		n_bins = rd.n_bins;
		as_bin_get_all_p(&rd, response_bins);
	}
	else {
		if (m->n_ops == 0) {
			cf_warning_digest(AS_RW, &tr->keyd, "{%s} read_local: bin op(s) expected, none present " , ns->name);
			read_local_done(tr, &r_ref, &rd, AS_ERR_PARAMETER);
			return TRANS_DONE_ERROR;
		}

		// If set, missing bins are answered with a null bin instead of being
		// silently dropped from the response.
		bool respond_all_ops = (m->info2 & AS_MSG_INFO2_RESPOND_ALL_OPS) != 0;

		as_msg_op* op = 0;
		int n = 0;

		while ((op = as_msg_op_iterate(m, op, &n)) != NULL) {
			if (op->op == AS_MSG_OP_READ) {
				// Plain read - respond with the stored bin directly.
				as_bin* b = as_bin_get_from_buf(&rd, op->name, op->name_sz);

				if (b || respond_all_ops) {
					ops[n_bins] = op;
					response_bins[n_bins++] = b;
				}
			}
			else if (op->op == AS_MSG_OP_BITS_READ) {
				// Blob-bits read op - computes a result into a scratch bin.
				as_bin* b = as_bin_get_from_buf(&rd, op->name, op->name_sz);

				if (b) {
					as_bin* rb = &result_bins[n_result_bins];
					as_bin_set_empty(rb);

					if ((result = as_bin_bits_read_from_client(b, op, rb)) < 0) {
						cf_warning_digest(AS_RW, &tr->keyd, "{%s} read_local: failed as_bin_bits_read_from_client() " , ns->name);
						destroy_stack_bins(result_bins, n_result_bins);
						read_local_done(tr, &r_ref, &rd, -result);
						return TRANS_DONE_ERROR;
					}

					if (as_bin_inuse(rb)) {
						// Result bin populated - count it for cleanup.
						n_result_bins++;
						ops[n_bins] = op;
						response_bins[n_bins++] = rb;
					}
					else if (respond_all_ops) {
						ops[n_bins] = op;
						response_bins[n_bins++] = NULL;
					}
				}
				else if (respond_all_ops) {
					ops[n_bins] = op;
					response_bins[n_bins++] = NULL;
				}
			}
			else if (op->op == AS_MSG_OP_CDT_READ) {
				// CDT (list/map) read op - computes a result into a scratch
				// bin, same pattern as bits read above.
				as_bin* b = as_bin_get_from_buf(&rd, op->name, op->name_sz);

				if (b) {
					as_bin* rb = &result_bins[n_result_bins];
					as_bin_set_empty(rb);

					if ((result = as_bin_cdt_read_from_client(b, op, rb)) < 0) {
						cf_warning_digest(AS_RW, &tr->keyd, "{%s} read_local: failed as_bin_cdt_read_from_client() " , ns->name);
						destroy_stack_bins(result_bins, n_result_bins);
						read_local_done(tr, &r_ref, &rd, -result);
						return TRANS_DONE_ERROR;
					}

					if (as_bin_inuse(rb)) {
						n_result_bins++;
						ops[n_bins] = op;
						response_bins[n_bins++] = rb;
					}
					else if (respond_all_ops) {
						ops[n_bins] = op;
						response_bins[n_bins++] = NULL;
					}
				}
				else if (respond_all_ops) {
					ops[n_bins] = op;
					response_bins[n_bins++] = NULL;
				}
			}
			else {
				cf_warning_digest(AS_RW, &tr->keyd, "{%s} read_local: unexpected bin op %u " , ns->name, op->op);
				destroy_stack_bins(result_bins, n_result_bins);
				read_local_done(tr, &r_ref, &rd, AS_ERR_PARAMETER);
				return TRANS_DONE_ERROR;
			}
		}
	}

	cf_dyn_buf_define_size(db, 16 * 1024);

	if (tr->origin != FROM_BATCH) {
		// Build the full response message into db while still holding the
		// record - it's sent after the record is released, below.
		db.used_sz = db.alloc_sz;
		db.buf = (uint8_t*)as_msg_make_response_msg(tr->result_code,
				r->generation, r->void_time, p_ops, response_bins, n_bins, ns,
				(cl_msg*)dyn_bufdb, &db.used_sz, as_transaction_trid(tr));

		db.is_stack = db.buf == dyn_bufdb;
		// Note - not bothering to correct alloc_sz if buf was allocated.
	}
	else {
		tr->generation = r->generation;
		tr->void_time = r->void_time;
		tr->last_update_time = r->last_update_time;

		// Since as_batch_add_result() constructs response directly in shared
		// buffer to avoid extra copies, can't use db.
		send_read_response(tr, p_ops, response_bins, n_bins, NULL);
	}

	destroy_stack_bins(result_bins, n_result_bins);
	as_storage_record_close(&rd);
	as_record_done(&r_ref, ns);

	// Now that we're not under the record lock, send the message we just built.
	// (For the batch path db was never filled, so used_sz stays 0 and the
	// response above is not duplicated.)
	if (db.used_sz != 0) {
		send_read_response(tr, NULL, NULL, 0, &db);

		cf_dyn_buf_free(&db);
		tr->from.proto_fd_h = NULL;
	}

	return TRANS_DONE_SUCCESS;
}
733 | |
734 | |
735 | void |
736 | read_local_done(as_transaction* tr, as_index_ref* r_ref, as_storage_rd* rd, |
737 | int result_code) |
738 | { |
739 | if (r_ref) { |
740 | if (rd) { |
741 | as_storage_record_close(rd); |
742 | } |
743 | |
744 | as_record_done(r_ref, tr->rsv.ns); |
745 | } |
746 | |
747 | tr->result_code = (uint8_t)result_code; |
748 | |
749 | send_read_response(tr, NULL, NULL, 0, NULL); |
750 | } |
751 | |
752 | |
753 | int |
754 | batch_predexp_filter_meta(const as_transaction* tr, const as_record* r, |
755 | predexp_eval_t** predexp) |
756 | { |
757 | *predexp = as_batch_get_predexp(tr->from.batch_shared); |
758 | |
759 | if (*predexp == NULL) { |
760 | return AS_OK; |
761 | } |
762 | |
763 | predexp_args_t predargs = { .ns = tr->rsv.ns, .md = (as_record*)r }; |
764 | predexp_retval_t predrv = predexp_matches_metadata(*predexp, &predargs); |
765 | |
766 | if (predrv == PREDEXP_UNKNOWN) { |
767 | return AS_OK; // caller must later check bins using *predexp |
768 | } |
769 | // else - caller will not need to apply filter later. |
770 | |
771 | *predexp = NULL; |
772 | |
773 | return predrv == PREDEXP_TRUE ? AS_OK : AS_ERR_FILTERED_OUT; |
774 | } |
775 | |