1/*
2 * read.c
3 *
4 * Copyright (C) 2016 Aerospike, Inc.
5 *
6 * Portions may be licensed to Aerospike, Inc. under one or more contributor
7 * license agreements.
8 *
9 * This program is free software: you can redistribute it and/or modify it under
10 * the terms of the GNU Affero General Public License as published by the Free
11 * Software Foundation, either version 3 of the License, or (at your option) any
12 * later version.
13 *
14 * This program is distributed in the hope that it will be useful, but WITHOUT
15 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
16 * FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more
17 * details.
18 *
19 * You should have received a copy of the GNU Affero General Public License
20 * along with this program. If not, see http://www.gnu.org/licenses/
21 */
22
23//==========================================================
24// Includes.
25//
26
27#include "transaction/read.h"
28
29#include <stdbool.h>
30#include <stddef.h>
31#include <stdint.h>
32
33#include "citrusleaf/alloc.h"
34#include "citrusleaf/cf_atomic.h"
35#include "citrusleaf/cf_clock.h"
36
37#include "cf_mutex.h"
38#include "dynbuf.h"
39#include "fault.h"
40
41#include "base/batch.h"
42#include "base/cfg.h"
43#include "base/datamodel.h"
44#include "base/index.h"
45#include "base/predexp.h"
46#include "base/proto.h"
47#include "base/transaction.h"
48#include "base/transaction_policy.h"
49#include "fabric/partition.h"
50#include "storage/storage.h"
51#include "transaction/duplicate_resolve.h"
52#include "transaction/proxy.h"
53#include "transaction/replica_ping.h"
54#include "transaction/rw_request.h"
55#include "transaction/rw_request_hash.h"
56#include "transaction/rw_utils.h"
57
58
59//==========================================================
60// Forward declarations.
61//
62
63void start_read_dup_res(rw_request* rw, as_transaction* tr);
64void start_repl_ping(rw_request* rw, as_transaction* tr);
65bool read_dup_res_cb(rw_request* rw);
66void repl_ping_after_dup_res(rw_request* rw, as_transaction* tr);
67void repl_ping_cb(rw_request* rw);
68
69void send_read_response(as_transaction* tr, as_msg_op** ops,
70 as_bin** response_bins, uint16_t n_bins, cf_dyn_buf* db);
71void read_timeout_cb(rw_request* rw);
72
73transaction_status read_local(as_transaction* tr);
74void read_local_done(as_transaction* tr, as_index_ref* r_ref, as_storage_rd* rd,
75 int result_code);
76int batch_predexp_filter_meta(const as_transaction* tr, const as_record* r,
77 predexp_eval_t** predexp);
78
79
80//==========================================================
81// Inlines & macros.
82//
83
84static inline bool
85read_must_duplicate_resolve(const as_transaction* tr)
86{
87 return tr->rsv.n_dupl != 0 &&
88 TR_READ_CONSISTENCY_LEVEL(tr) == AS_READ_CONSISTENCY_LEVEL_ALL;
89}
90
91static inline bool
92read_must_ping(const as_transaction *tr)
93{
94 return (tr->flags & AS_TRANSACTION_FLAG_MUST_PING) != 0;
95}
96
97static inline void
98client_read_update_stats(as_namespace* ns, uint8_t result_code)
99{
100 switch (result_code) {
101 case AS_OK:
102 cf_atomic64_incr(&ns->n_client_read_success);
103 break;
104 default:
105 cf_atomic64_incr(&ns->n_client_read_error);
106 break;
107 case AS_ERR_TIMEOUT:
108 cf_atomic64_incr(&ns->n_client_read_timeout);
109 break;
110 case AS_ERR_NOT_FOUND:
111 cf_atomic64_incr(&ns->n_client_read_not_found);
112 break;
113 case AS_ERR_FILTERED_OUT:
114 cf_atomic64_incr(&ns->n_client_read_filtered_out);
115 break;
116 }
117}
118
119static inline void
120from_proxy_read_update_stats(as_namespace* ns, uint8_t result_code)
121{
122 switch (result_code) {
123 case AS_OK:
124 cf_atomic64_incr(&ns->n_from_proxy_read_success);
125 break;
126 default:
127 cf_atomic64_incr(&ns->n_from_proxy_read_error);
128 break;
129 case AS_ERR_TIMEOUT:
130 cf_atomic64_incr(&ns->n_from_proxy_read_timeout);
131 break;
132 case AS_ERR_NOT_FOUND:
133 cf_atomic64_incr(&ns->n_from_proxy_read_not_found);
134 break;
135 case AS_ERR_FILTERED_OUT:
136 cf_atomic64_incr(&ns->n_from_proxy_read_filtered_out);
137 break;
138 }
139}
140
141static inline void
142batch_sub_read_update_stats(as_namespace* ns, uint8_t result_code)
143{
144 switch (result_code) {
145 case AS_OK:
146 cf_atomic64_incr(&ns->n_batch_sub_read_success);
147 break;
148 default:
149 cf_atomic64_incr(&ns->n_batch_sub_read_error);
150 break;
151 case AS_ERR_TIMEOUT:
152 cf_atomic64_incr(&ns->n_batch_sub_read_timeout);
153 break;
154 case AS_ERR_NOT_FOUND:
155 cf_atomic64_incr(&ns->n_batch_sub_read_not_found);
156 break;
157 case AS_ERR_FILTERED_OUT:
158 cf_atomic64_incr(&ns->n_batch_sub_read_filtered_out);
159 break;
160 }
161}
162
163static inline void
164from_proxy_batch_sub_read_update_stats(as_namespace* ns, uint8_t result_code)
165{
166 switch (result_code) {
167 case AS_OK:
168 cf_atomic64_incr(&ns->n_from_proxy_batch_sub_read_success);
169 break;
170 default:
171 cf_atomic64_incr(&ns->n_from_proxy_batch_sub_read_error);
172 break;
173 case AS_ERR_TIMEOUT:
174 cf_atomic64_incr(&ns->n_from_proxy_batch_sub_read_timeout);
175 break;
176 case AS_ERR_NOT_FOUND:
177 cf_atomic64_incr(&ns->n_from_proxy_batch_sub_read_not_found);
178 break;
179 case AS_ERR_FILTERED_OUT:
180 cf_atomic64_incr(&ns->n_from_proxy_batch_sub_read_filtered_out);
181 break;
182 }
183}
184
185
186//==========================================================
187// Public API.
188//
189
transaction_status
as_read_start(as_transaction* tr)
{
	// Entry point for a single-record read transaction. Returns
	// TRANS_IN_PROGRESS when completion is deferred to the network (duplicate
	// resolution or replica pings), otherwise a TRANS_DONE_* status after a
	// response has been sent to the origin.

	BENCHMARK_START(tr, read, FROM_CLIENT);
	BENCHMARK_START(tr, batch_sub, FROM_BATCH);

	// Fail immediately if replica pings would be required but can't be done.
	if (! repl_ping_check(tr)) {
		send_read_response(tr, NULL, NULL, 0, NULL);
		return TRANS_DONE_ERROR;
	}

	transaction_status status;
	bool must_duplicate_resolve = read_must_duplicate_resolve(tr);
	bool must_ping = read_must_ping(tr);

	if (! must_duplicate_resolve && ! must_ping) {
		// No network hops needed, try reading.
		if ((status = read_local(tr)) != TRANS_IN_PROGRESS) {
			return status;
		}
		// else - must try again under hash.
	}
	// else - there are duplicates, and we're configured to resolve them, or
	// we're required to ping replicas.

	// Create rw_request and add to hash.
	rw_request_hkey hkey = { tr->rsv.ns->id, tr->keyd };
	rw_request* rw = rw_request_create(&tr->keyd);

	// If rw_request isn't inserted in hash, transaction is finished.
	if ((status = rw_request_hash_insert(&hkey, rw, tr)) != TRANS_IN_PROGRESS) {
		rw_request_release(rw);

		// TRANS_WAITING - presumably tr was queued behind an existing
		// rw_request for the same key, so no response is sent here.
		if (status != TRANS_WAITING) {
			send_read_response(tr, NULL, NULL, 0, NULL);
		}

		return status;
	}
	// else - rw_request is now in hash, continue...

	if (must_duplicate_resolve) {
		start_read_dup_res(rw, tr);

		// Started duplicate resolution.
		return TRANS_IN_PROGRESS;
	}

	if (must_ping) {
		// Set up the nodes to which we'll ping.
		rw->n_dest_nodes = as_partition_get_other_replicas(tr->rsv.p,
				rw->dest_nodes);

		if (insufficient_replica_destinations(tr->rsv.ns, rw->n_dest_nodes)) {
			rw_request_hash_delete(&hkey, rw);
			tr->result_code = AS_ERR_UNAVAILABLE;
			send_read_response(tr, NULL, NULL, 0, NULL);
			return TRANS_DONE_ERROR;
		}

		start_repl_ping(rw, tr);

		// Started replica ping.
		return TRANS_IN_PROGRESS;
	}

	// Trying again under hash. read_local() must now finish the transaction -
	// TRANS_IN_PROGRESS here would mean an infinite retry.
	status = read_local(tr);
	cf_assert(status != TRANS_IN_PROGRESS, AS_RW, "read in-progress");
	rw_request_hash_delete(&hkey, rw);

	return status;
}
263
264
265//==========================================================
266// Local helpers - transaction flow.
267//
268
void
start_read_dup_res(rw_request* rw, as_transaction* tr)
{
	// Finish initializing rw_request, construct and send dup-res message.

	dup_res_make_message(rw, tr);

	cf_mutex_lock(&rw->lock);

	// Install the dup-res and timeout callbacks and send the messages while
	// holding the rw lock, so an incoming response can't race the setup.
	dup_res_setup_rw(rw, tr, read_dup_res_cb, read_timeout_cb);
	send_rw_messages(rw);

	cf_mutex_unlock(&rw->lock);
}
283
284
void
start_repl_ping(rw_request* rw, as_transaction* tr)
{
	// Finish initializing rw, construct and send repl-ping message.

	repl_ping_make_message(rw, tr);

	cf_mutex_lock(&rw->lock);

	// Install the repl-ping and timeout callbacks and send the messages while
	// holding the rw lock, so an incoming response can't race the setup.
	repl_ping_setup_rw(rw, tr, repl_ping_cb, read_timeout_cb);
	send_rw_messages(rw);

	cf_mutex_unlock(&rw->lock);
}
299
300
bool
read_dup_res_cb(rw_request* rw)
{
	// Called when duplicate resolution completes. Returns true when the
	// transaction is finished (rw_request may clean up), false when rw has
	// been recycled to continue with replica pings.

	BENCHMARK_NEXT_DATA_POINT_FROM(rw, read, FROM_CLIENT, dup_res);
	BENCHMARK_NEXT_DATA_POINT_FROM(rw, batch_sub, FROM_BATCH, dup_res);

	as_transaction tr;
	as_transaction_init_from_rw(&tr, rw);

	// Duplicate resolution itself failed - report the error to the origin.
	if (tr.result_code != AS_OK) {
		send_read_response(&tr, NULL, NULL, 0, NULL);
		return true;
	}

	if (read_must_ping(&tr)) {
		// Set up the nodes to which we'll ping.
		rw->n_dest_nodes = as_partition_get_other_replicas(tr.rsv.p,
				rw->dest_nodes);

		if (insufficient_replica_destinations(tr.rsv.ns, rw->n_dest_nodes)) {
			tr.result_code = AS_ERR_UNAVAILABLE;
			send_read_response(&tr, NULL, NULL, 0, NULL);
			return true;
		}

		repl_ping_after_dup_res(rw, &tr);

		// rw is still live, now doing replica pings - don't clean it up.
		return false;
	}

	// Read the local copy and respond to origin.
	transaction_status status = read_local(&tr);

	cf_assert(status != TRANS_IN_PROGRESS, AS_RW, "read in-progress");

	if (status == TRANS_WAITING) {
		// Note - new tr now owns msgp, make sure rw destructor doesn't free it.
		// Also, rw will release rsv - new tr will get a new one.
		rw->msgp = NULL;
	}

	// Finished transaction - rw_request cleans up reservation and msgp!
	return true;
}
345
346
void
repl_ping_after_dup_res(rw_request* rw, as_transaction* tr)
{
	// Recycle rw_request that was just used for duplicate resolution to now do
	// replica pings. Note - we are under the rw_request lock here!

	repl_ping_make_message(rw, tr);
	// Reset (rather than setup) - rw callbacks/state are being reused.
	repl_ping_reset_rw(rw, tr, repl_ping_cb);
	send_rw_messages(rw);
}
357
358
void
repl_ping_cb(rw_request* rw)
{
	// Called when all replica pings have returned - now do the local read and
	// respond to the origin. rw_request cleans up reservation and msgp after
	// this returns.

	BENCHMARK_NEXT_DATA_POINT_FROM(rw, read, FROM_CLIENT, repl_ping);
	BENCHMARK_NEXT_DATA_POINT_FROM(rw, batch_sub, FROM_BATCH, repl_ping);

	as_transaction tr;
	as_transaction_init_from_rw(&tr, rw);

	// Read the local copy and respond to origin.
	transaction_status status = read_local(&tr);

	cf_assert(status != TRANS_IN_PROGRESS, AS_RW, "read in-progress");

	if (status == TRANS_WAITING) {
		// Note - new tr now owns msgp, make sure rw destructor doesn't free it.
		// Also, rw will release rsv - new tr will get a new one.
		rw->msgp = NULL;
	}
}
379
380
381//==========================================================
382// Local helpers - transaction end.
383//
384
void
send_read_response(as_transaction* tr, as_msg_op** ops, as_bin** response_bins,
		uint16_t n_bins, cf_dyn_buf* db)
{
	// Send the read result to the transaction's origin and update stats.
	// When db has content it is a pre-built response message and is sent
	// as-is; otherwise a response is built from the ops/bins and tr's
	// result-code/generation/void-time fields.

	// Paranoia - shouldn't get here on losing race with timeout.
	if (! tr->from.any) {
		cf_warning(AS_RW, "transaction origin %u has null 'from'", tr->origin);
		return;
	}

	// Note - if tr was setup from rw, rw->from.any has been set null and
	// informs timeout it lost the race.

	switch (tr->origin) {
	case FROM_CLIENT:
		BENCHMARK_NEXT_DATA_POINT(tr, read, local);
		if (db && db->used_sz != 0) {
			as_msg_send_ops_reply(tr->from.proto_fd_h, db);
		}
		else {
			as_msg_send_reply(tr->from.proto_fd_h, tr->result_code,
					tr->generation, tr->void_time, ops, response_bins, n_bins,
					tr->rsv.ns, as_transaction_trid(tr));
		}
		BENCHMARK_NEXT_DATA_POINT(tr, read, response);
		HIST_TRACK_ACTIVATE_INSERT_DATA_POINT(tr, read_hist);
		client_read_update_stats(tr->rsv.ns, tr->result_code);
		break;
	case FROM_PROXY:
		if (db && db->used_sz != 0) {
			as_proxy_send_ops_response(tr->from.proxy_node,
					tr->from_data.proxy_tid, db);
		}
		else {
			as_proxy_send_response(tr->from.proxy_node, tr->from_data.proxy_tid,
					tr->result_code, tr->generation, tr->void_time, ops,
					response_bins, n_bins, tr->rsv.ns, as_transaction_trid(tr));
		}
		// Proxied requests may be batch sub-transactions - separate stats.
		if (as_transaction_is_batch_sub(tr)) {
			from_proxy_batch_sub_read_update_stats(tr->rsv.ns, tr->result_code);
		}
		else {
			from_proxy_read_update_stats(tr->rsv.ns, tr->result_code);
		}
		break;
	case FROM_BATCH:
		BENCHMARK_NEXT_DATA_POINT(tr, batch_sub, read_local);
		// Batch results go straight into the shared batch buffer - db is
		// never used for this origin.
		as_batch_add_result(tr, n_bins, response_bins, ops);
		BENCHMARK_NEXT_DATA_POINT(tr, batch_sub, response);
		batch_sub_read_update_stats(tr->rsv.ns, tr->result_code);
		break;
	default:
		cf_crash(AS_RW, "unexpected transaction origin %u", tr->origin);
		break;
	}

	tr->from.any = NULL; // pattern, not needed
}
443
444
void
read_timeout_cb(rw_request* rw)
{
	// Timeout callback for a hashed rw_request - respond to the origin with
	// AS_ERR_TIMEOUT, unless the dup-res/repl-ping callback won the race and
	// already responded.

	if (! rw->from.any) {
		return; // lost race against dup-res callback
	}

	switch (rw->origin) {
	case FROM_CLIENT:
		as_msg_send_reply(rw->from.proto_fd_h, AS_ERR_TIMEOUT, 0, 0, NULL, NULL,
				0, rw->rsv.ns, rw_request_trid(rw));
		// Timeouts aren't included in histograms.
		client_read_update_stats(rw->rsv.ns, AS_ERR_TIMEOUT);
		break;
	case FROM_PROXY:
		// No reply sent here - presumably the proxying node handles its own
		// timeout. TODO confirm against proxy code.
		if (rw_request_is_batch_sub(rw)) {
			from_proxy_batch_sub_read_update_stats(rw->rsv.ns, AS_ERR_TIMEOUT);
		}
		else {
			from_proxy_read_update_stats(rw->rsv.ns, AS_ERR_TIMEOUT);
		}
		break;
	case FROM_BATCH:
		as_batch_add_error(rw->from.batch_shared, rw->from_data.batch_index,
				AS_ERR_TIMEOUT);
		// Timeouts aren't included in histograms.
		batch_sub_read_update_stats(rw->rsv.ns, AS_ERR_TIMEOUT);
		break;
	default:
		cf_crash(AS_RW, "unexpected transaction origin %u", rw->origin);
		break;
	}

	rw->from.any = NULL; // inform other callback it lost the race
}
480
481
482//==========================================================
483// Local helpers - read local.
484//
485
transaction_status
read_local(as_transaction* tr)
{
	// Read the local copy of the record and respond to the origin. Sends the
	// response itself (directly, or via read_local_done()) except when
	// repl_state_check() says to defer - then no response is sent and
	// TRANS_IN_PROGRESS or TRANS_WAITING is returned.

	as_msg* m = &tr->msgp->msg;
	as_namespace* ns = tr->rsv.ns;

	as_index_ref r_ref;

	if (as_record_get(tr->rsv.tree, &tr->keyd, &r_ref) != 0) {
		read_local_done(tr, NULL, NULL, AS_ERR_NOT_FOUND);
		return TRANS_DONE_ERROR;
	}

	as_record* r = r_ref.r;

	// Make sure the message set name (if it's there) is correct.
	if (! set_name_check(tr, r)) {
		read_local_done(tr, &r_ref, NULL, AS_ERR_PARAMETER);
		return TRANS_DONE_ERROR;
	}

	// Check if it's an expired or truncated record.
	if (as_record_is_doomed(r, ns)) {
		read_local_done(tr, &r_ref, NULL, AS_ERR_NOT_FOUND);
		return TRANS_DONE_ERROR;
	}

	int result = repl_state_check(r, tr);

	if (result != 0) {
		// -3 presumably indicates the partition is unavailable - respond
		// with that error. TODO confirm against repl_state_check().
		if (result == -3) {
			read_local_done(tr, &r_ref, NULL, AS_ERR_UNAVAILABLE);
			return TRANS_DONE_ERROR;
		}

		// No response sent to origin.
		as_record_done(&r_ref, ns);
		return result == 1 ? TRANS_IN_PROGRESS : TRANS_WAITING;
	}

	// Check if it's a tombstone.
	if (! as_record_is_live(r)) {
		read_local_done(tr, &r_ref, NULL, AS_ERR_NOT_FOUND);
		return TRANS_DONE_ERROR;
	}

	// Apply predexp metadata filter if present.

	predexp_eval_t* predexp = NULL; // built here - destroyed below
	predexp_eval_t* batch_predexp = NULL; // shared with batch - never destroyed

	if ((result = tr->origin == FROM_BATCH ?
			batch_predexp_filter_meta(tr, r, &batch_predexp) :
			build_predexp_and_filter_meta(tr, r, &predexp)) != 0) {
		read_local_done(tr, &r_ref, NULL, result);
		return TRANS_DONE_ERROR;
	}

	as_storage_rd rd;

	as_storage_record_open(ns, r, &rd);

	// If configuration permits, allow reads to use page cache.
	rd.read_page_cache = ns->storage_read_page_cache;

	// Apply predexp record bins filter if present. A non-NULL pointer here
	// means the metadata filter above was inconclusive (PREDEXP_UNKNOWN) and
	// bins must be examined.
	if (predexp != NULL || batch_predexp != NULL) {
		if ((result = predexp_read_and_filter_bins(&rd,
				tr->origin == FROM_BATCH ? batch_predexp : predexp)) != 0) {
			predexp_destroy(predexp);
			read_local_done(tr, &r_ref, &rd, result);
			return TRANS_DONE_ERROR;
		}

		predexp_destroy(predexp);
	}

	// Check the key if required.
	// Note - for data-not-in-memory "exists" ops, key check is expensive!
	if (as_transaction_has_key(tr) &&
			as_storage_record_get_key(&rd) && ! check_msg_key(m, &rd)) {
		read_local_done(tr, &r_ref, &rd, AS_ERR_KEY_MISMATCH);
		return TRANS_DONE_ERROR;
	}

	// No bins requested ("exists"-style read) - respond with metadata only.
	if ((m->info1 & AS_MSG_INFO1_GET_NO_BINS) != 0) {
		tr->generation = r->generation;
		tr->void_time = r->void_time;
		tr->last_update_time = r->last_update_time;

		read_local_done(tr, &r_ref, &rd, AS_OK);
		return TRANS_DONE_SUCCESS;
	}

	// Negative results from storage loads are negated error codes.
	if ((result = as_storage_rd_load_n_bins(&rd)) < 0) {
		cf_warning_digest(AS_RW, &tr->keyd, "{%s} read_local: failed as_storage_rd_load_n_bins() ", ns->name);
		read_local_done(tr, &r_ref, &rd, -result);
		return TRANS_DONE_ERROR;
	}

	// Stack space for bins - zero-size for data-in-memory, where bins are
	// presumably referenced in place. TODO confirm storage API contract.
	as_bin stack_bins[ns->storage_data_in_memory ? 0 : rd.n_bins];

	if ((result = as_storage_rd_load_bins(&rd, stack_bins)) < 0) {
		cf_warning_digest(AS_RW, &tr->keyd, "{%s} read_local: failed as_storage_rd_load_bins() ", ns->name);
		read_local_done(tr, &r_ref, &rd, -result);
		return TRANS_DONE_ERROR;
	}

	// Upper bound on the number of ops/bins in the response.
	uint32_t bin_count = (m->info1 & AS_MSG_INFO1_GET_ALL) != 0 ?
			rd.n_bins : m->n_ops;

	as_msg_op* ops[bin_count];
	as_msg_op** p_ops = ops;
	as_bin* response_bins[bin_count];
	uint16_t n_bins = 0;

	// Scratch bins holding results computed by bits/CDT read ops - these must
	// be destroyed before returning.
	as_bin result_bins[bin_count];
	uint32_t n_result_bins = 0;

	if ((m->info1 & AS_MSG_INFO1_GET_ALL) != 0) {
		// Whole-record read - respond with every bin, no per-op pairing.
		p_ops = NULL;
		n_bins = rd.n_bins;
		as_bin_get_all_p(&rd, response_bins);
	}
	else {
		if (m->n_ops == 0) {
			cf_warning_digest(AS_RW, &tr->keyd, "{%s} read_local: bin op(s) expected, none present ", ns->name);
			read_local_done(tr, &r_ref, &rd, AS_ERR_PARAMETER);
			return TRANS_DONE_ERROR;
		}

		// If set, ops with no matching bin (or empty result) still get a
		// NULL entry in the response, keeping ops and results aligned.
		bool respond_all_ops = (m->info2 & AS_MSG_INFO2_RESPOND_ALL_OPS) != 0;

		as_msg_op* op = 0;
		int n = 0;

		while ((op = as_msg_op_iterate(m, op, &n)) != NULL) {
			if (op->op == AS_MSG_OP_READ) {
				// Plain bin read - respond with the stored bin itself.
				as_bin* b = as_bin_get_from_buf(&rd, op->name, op->name_sz);

				if (b || respond_all_ops) {
					ops[n_bins] = op;
					response_bins[n_bins++] = b;
				}
			}
			else if (op->op == AS_MSG_OP_BITS_READ) {
				// Bitwise read op - computes a result into a scratch bin.
				as_bin* b = as_bin_get_from_buf(&rd, op->name, op->name_sz);

				if (b) {
					as_bin* rb = &result_bins[n_result_bins];
					as_bin_set_empty(rb);

					if ((result = as_bin_bits_read_from_client(b, op, rb)) < 0) {
						cf_warning_digest(AS_RW, &tr->keyd, "{%s} read_local: failed as_bin_bits_read_from_client() ", ns->name);
						destroy_stack_bins(result_bins, n_result_bins);
						read_local_done(tr, &r_ref, &rd, -result);
						return TRANS_DONE_ERROR;
					}

					if (as_bin_inuse(rb)) {
						// Got a result - keep the scratch bin for the response.
						n_result_bins++;
						ops[n_bins] = op;
						response_bins[n_bins++] = rb;
					}
					else if (respond_all_ops) {
						ops[n_bins] = op;
						response_bins[n_bins++] = NULL;
					}
				}
				else if (respond_all_ops) {
					ops[n_bins] = op;
					response_bins[n_bins++] = NULL;
				}
			}
			else if (op->op == AS_MSG_OP_CDT_READ) {
				// List/map read op - same pattern as the bits read above.
				as_bin* b = as_bin_get_from_buf(&rd, op->name, op->name_sz);

				if (b) {
					as_bin* rb = &result_bins[n_result_bins];
					as_bin_set_empty(rb);

					if ((result = as_bin_cdt_read_from_client(b, op, rb)) < 0) {
						cf_warning_digest(AS_RW, &tr->keyd, "{%s} read_local: failed as_bin_cdt_read_from_client() ", ns->name);
						destroy_stack_bins(result_bins, n_result_bins);
						read_local_done(tr, &r_ref, &rd, -result);
						return TRANS_DONE_ERROR;
					}

					if (as_bin_inuse(rb)) {
						// Got a result - keep the scratch bin for the response.
						n_result_bins++;
						ops[n_bins] = op;
						response_bins[n_bins++] = rb;
					}
					else if (respond_all_ops) {
						ops[n_bins] = op;
						response_bins[n_bins++] = NULL;
					}
				}
				else if (respond_all_ops) {
					ops[n_bins] = op;
					response_bins[n_bins++] = NULL;
				}
			}
			else {
				cf_warning_digest(AS_RW, &tr->keyd, "{%s} read_local: unexpected bin op %u ", ns->name, op->op);
				destroy_stack_bins(result_bins, n_result_bins);
				read_local_done(tr, &r_ref, &rd, AS_ERR_PARAMETER);
				return TRANS_DONE_ERROR;
			}
		}
	}

	cf_dyn_buf_define_size(db, 16 * 1024);

	if (tr->origin != FROM_BATCH) {
		// Build the response message now, under the record lock - it's sent
		// after the record is released, below.
		db.used_sz = db.alloc_sz;
		db.buf = (uint8_t*)as_msg_make_response_msg(tr->result_code,
				r->generation, r->void_time, p_ops, response_bins, n_bins, ns,
				(cl_msg*)dyn_bufdb, &db.used_sz, as_transaction_trid(tr));

		db.is_stack = db.buf == dyn_bufdb;
		// Note - not bothering to correct alloc_sz if buf was allocated.
	}
	else {
		tr->generation = r->generation;
		tr->void_time = r->void_time;
		tr->last_update_time = r->last_update_time;

		// Since as_batch_add_result() constructs response directly in shared
		// buffer to avoid extra copies, can't use db.
		send_read_response(tr, p_ops, response_bins, n_bins, NULL);
	}

	destroy_stack_bins(result_bins, n_result_bins);
	as_storage_record_close(&rd);
	as_record_done(&r_ref, ns);

	// Now that we're not under the record lock, send the message we just built.
	// (For batch origin db was never written, so used_sz is presumably still
	// 0 and this is skipped - already responded above.)
	if (db.used_sz != 0) {
		send_read_response(tr, NULL, NULL, 0, &db);

		cf_dyn_buf_free(&db);
		// Presumably redundant - send_read_response() already nulled
		// tr->from.any. TODO confirm 'from' union layout.
		tr->from.proto_fd_h = NULL;
	}

	return TRANS_DONE_SUCCESS;
}
733
734
735void
736read_local_done(as_transaction* tr, as_index_ref* r_ref, as_storage_rd* rd,
737 int result_code)
738{
739 if (r_ref) {
740 if (rd) {
741 as_storage_record_close(rd);
742 }
743
744 as_record_done(r_ref, tr->rsv.ns);
745 }
746
747 tr->result_code = (uint8_t)result_code;
748
749 send_read_response(tr, NULL, NULL, 0, NULL);
750}
751
752
753int
754batch_predexp_filter_meta(const as_transaction* tr, const as_record* r,
755 predexp_eval_t** predexp)
756{
757 *predexp = as_batch_get_predexp(tr->from.batch_shared);
758
759 if (*predexp == NULL) {
760 return AS_OK;
761 }
762
763 predexp_args_t predargs = { .ns = tr->rsv.ns, .md = (as_record*)r };
764 predexp_retval_t predrv = predexp_matches_metadata(*predexp, &predargs);
765
766 if (predrv == PREDEXP_UNKNOWN) {
767 return AS_OK; // caller must later check bins using *predexp
768 }
769 // else - caller will not need to apply filter later.
770
771 *predexp = NULL;
772
773 return predrv == PREDEXP_TRUE ? AS_OK : AS_ERR_FILTERED_OUT;
774}
775