1/*
2 * transaction.c
3 *
4 * Copyright (C) 2008-2015 Aerospike, Inc.
5 *
6 * Portions may be licensed to Aerospike, Inc. under one or more contributor
7 * license agreements.
8 *
9 * This program is free software: you can redistribute it and/or modify it under
10 * the terms of the GNU Affero General Public License as published by the Free
11 * Software Foundation, either version 3 of the License, or (at your option) any
12 * later version.
13 *
14 * This program is distributed in the hope that it will be useful, but WITHOUT
15 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
16 * FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more
17 * details.
18 *
19 * You should have received a copy of the GNU Affero General Public License
20 * along with this program. If not, see http://www.gnu.org/licenses/
21 */
22
23/*
24 * Operations on transactions
25 */
26
27#include "base/transaction.h"
28
29#include <stddef.h>
30#include <stdint.h>
31#include <string.h>
32#include <unistd.h>
33
34#include "aerospike/as_atomic.h"
35#include "citrusleaf/alloc.h"
36#include "citrusleaf/cf_atomic.h"
37#include "citrusleaf/cf_clock.h"
38#include "citrusleaf/cf_digest.h"
39
40#include "fault.h"
41#include "socket.h"
42
43#include "base/batch.h"
44#include "base/datamodel.h"
45#include "base/proto.h"
46#include "base/scan.h"
47#include "base/security.h"
48#include "base/service.h"
49#include "base/stats.h"
50#include "fabric/partition.h"
51#include "transaction/proxy.h"
52#include "transaction/rw_request.h"
53#include "transaction/rw_utils.h"
54#include "transaction/udf.h"
55#include "transaction/write.h"
56
57
58void
59as_transaction_init_head(as_transaction *tr, const cf_digest *keyd,
60 cl_msg *msgp)
61{
62 tr->msgp = msgp;
63 tr->msg_fields = 0;
64
65 tr->origin = 0;
66 tr->from_flags = 0;
67
68 tr->from.any = NULL;
69 tr->from_data.any = 0;
70
71 tr->keyd = keyd ? *keyd : cf_digest_zero;
72
73 tr->start_time = 0;
74 tr->benchmark_time = 0;
75}
76
77void
78as_transaction_init_body(as_transaction *tr)
79{
80 AS_PARTITION_RESERVATION_INIT(tr->rsv);
81
82 tr->end_time = 0;
83 tr->result_code = AS_OK;
84 tr->flags = 0;
85 tr->generation = 0;
86 tr->void_time = 0;
87 tr->last_update_time = 0;
88}
89
90void
91as_transaction_copy_head(as_transaction *to, const as_transaction *from)
92{
93 to->msgp = from->msgp;
94 to->msg_fields = from->msg_fields;
95
96 to->origin = from->origin;
97 to->from_flags = from->from_flags;
98
99 to->from.any = from->from.any;
100 to->from_data.any = from->from_data.any;
101
102 to->keyd = from->keyd;
103
104 to->start_time = from->start_time;
105 to->benchmark_time = from->benchmark_time;
106}
107
108void
109as_transaction_init_from_rw(as_transaction *tr, rw_request *rw)
110{
111 as_transaction_init_head_from_rw(tr, rw);
112 // Note - we don't clear rw->msgp, destructor will free it.
113
114 as_partition_reservation_copy(&tr->rsv, &rw->rsv);
115 // Note - destructor will still release the reservation.
116
117 tr->end_time = rw->end_time;
118 tr->result_code = rw->result_code;
119 tr->flags = rw->flags;
120 tr->generation = rw->generation;
121 tr->void_time = rw->void_time;
122 tr->last_update_time = rw->last_update_time;
123}
124
125void
126as_transaction_init_head_from_rw(as_transaction *tr, rw_request *rw)
127{
128 tr->msgp = rw->msgp;
129 tr->msg_fields = rw->msg_fields;
130 tr->origin = rw->origin;
131 tr->from_flags = rw->from_flags;
132 tr->from.any = rw->from.any;
133 tr->from_data.any = rw->from_data.any;
134 tr->keyd = rw->keyd;
135 tr->start_time = rw->start_time;
136 tr->benchmark_time = rw->benchmark_time;
137
138 rw->from.any = NULL;
139 // Note - we don't clear rw->msgp, destructor will free it.
140}
141
142bool
143as_transaction_set_msg_field_flag(as_transaction *tr, uint8_t type)
144{
145 switch (type) {
146 case AS_MSG_FIELD_TYPE_NAMESPACE:
147 tr->msg_fields |= AS_MSG_FIELD_BIT_NAMESPACE;
148 break;
149 case AS_MSG_FIELD_TYPE_SET:
150 tr->msg_fields |= AS_MSG_FIELD_BIT_SET;
151 break;
152 case AS_MSG_FIELD_TYPE_KEY:
153 tr->msg_fields |= AS_MSG_FIELD_BIT_KEY;
154 break;
155 case AS_MSG_FIELD_TYPE_DIGEST_RIPE:
156 tr->msg_fields |= AS_MSG_FIELD_BIT_DIGEST_RIPE;
157 break;
158 case AS_MSG_FIELD_TYPE_DIGEST_RIPE_ARRAY:
159 tr->msg_fields |= AS_MSG_FIELD_BIT_DIGEST_RIPE_ARRAY;
160 break;
161 case AS_MSG_FIELD_TYPE_TRID:
162 tr->msg_fields |= AS_MSG_FIELD_BIT_TRID;
163 break;
164 case AS_MSG_FIELD_TYPE_SCAN_OPTIONS:
165 tr->msg_fields |= AS_MSG_FIELD_BIT_SCAN_OPTIONS;
166 break;
167 case AS_MSG_FIELD_TYPE_SOCKET_TIMEOUT:
168 tr->msg_fields |= AS_MSG_FIELD_BIT_SOCKET_TIMEOUT;
169 break;
170 case AS_MSG_FIELD_TYPE_RECS_PER_SEC:
171 tr->msg_fields |= AS_MSG_FIELD_BIT_RECS_PER_SEC;
172 break;
173 case AS_MSG_FIELD_TYPE_INDEX_NAME:
174 tr->msg_fields |= AS_MSG_FIELD_BIT_INDEX_NAME;
175 break;
176 case AS_MSG_FIELD_TYPE_INDEX_RANGE:
177 tr->msg_fields |= AS_MSG_FIELD_BIT_INDEX_RANGE;
178 break;
179 case AS_MSG_FIELD_TYPE_INDEX_TYPE:
180 tr->msg_fields |= AS_MSG_FIELD_BIT_INDEX_TYPE;
181 break;
182 case AS_MSG_FIELD_TYPE_UDF_FILENAME:
183 tr->msg_fields |= AS_MSG_FIELD_BIT_UDF_FILENAME;
184 break;
185 case AS_MSG_FIELD_TYPE_UDF_FUNCTION:
186 tr->msg_fields |= AS_MSG_FIELD_BIT_UDF_FUNCTION;
187 break;
188 case AS_MSG_FIELD_TYPE_UDF_ARGLIST:
189 tr->msg_fields |= AS_MSG_FIELD_BIT_UDF_ARGLIST;
190 break;
191 case AS_MSG_FIELD_TYPE_UDF_OP:
192 tr->msg_fields |= AS_MSG_FIELD_BIT_UDF_OP;
193 break;
194 case AS_MSG_FIELD_TYPE_QUERY_BINLIST:
195 tr->msg_fields |= AS_MSG_FIELD_BIT_QUERY_BINLIST;
196 break;
197 case AS_MSG_FIELD_TYPE_BATCH: // shouldn't get here - batch parent handles this
198 tr->msg_fields |= AS_MSG_FIELD_BIT_BATCH;
199 break;
200 case AS_MSG_FIELD_TYPE_BATCH_WITH_SET: // shouldn't get here - batch parent handles this
201 tr->msg_fields |= AS_MSG_FIELD_BIT_BATCH_WITH_SET;
202 break;
203 case AS_MSG_FIELD_TYPE_PREDEXP:
204 tr->msg_fields |= AS_MSG_FIELD_BIT_PREDEXP;
205 break;
206 default:
207 return false;
208 }
209
210 return true;
211}
212
213bool
214as_transaction_prepare(as_transaction *tr, bool swap)
215{
216 uint64_t size = tr->msgp->proto.sz;
217
218 if (size < sizeof(as_msg)) {
219 cf_warning(AS_PROTO, "proto body size %lu smaller than as_msg", size);
220 return false;
221 }
222
223 // The proto data is not smaller than an as_msg - safe to swap header.
224 as_msg *m = &tr->msgp->msg;
225
226 if (swap) {
227 as_msg_swap_header(m);
228 }
229
230 uint8_t* p_end = (uint8_t*)m + size;
231 uint8_t* p_read = m->data;
232
233 // Parse and swap fields first.
234 for (uint16_t n = 0; n < m->n_fields; n++) {
235 if (p_read + sizeof(as_msg_field) > p_end) {
236 cf_warning(AS_PROTO, "incomplete as_msg_field");
237 return false;
238 }
239
240 as_msg_field* p_field = (as_msg_field*)p_read;
241
242 if (swap) {
243 as_msg_swap_field(p_field);
244 }
245
246 if (! (p_read = as_msg_field_skip(p_field))) {
247 cf_warning(AS_PROTO, "bad as_msg_field");
248 return false;
249 }
250
251 // Store which message fields are present - prevents lots of re-parsing.
252 if (! as_transaction_set_msg_field_flag(tr, p_field->type)) {
253 cf_debug(AS_PROTO, "skipping as_msg_field type %u", p_field->type);
254 }
255 }
256
257 // Parse and swap bin-ops, if any.
258 for (uint16_t n = 0; n < m->n_ops; n++) {
259 if (p_read + sizeof(as_msg_op) > p_end) {
260 cf_warning(AS_PROTO, "incomplete as_msg");
261 return false;
262 }
263
264 as_msg_op* op = (as_msg_op*)p_read;
265
266 if (swap) {
267 as_msg_swap_op(op);
268 }
269
270 if (! (p_read = as_msg_op_skip(op))) {
271 cf_warning(AS_PROTO, "bad as_msg_op");
272 return false;
273 }
274 }
275
276 if (p_read != p_end) {
277 cf_warning(AS_PROTO, "extra bytes follow fields and bin-ops");
278 return false;
279 }
280
281 return true;
282}
283
284// Initialize an internal UDF transaction (for a UDF scan/query). Uses shared
285// message with namespace but no digest, and no set for now since these
286// transactions won't get security checked, and can't create a record.
287void
288as_transaction_init_iudf(as_transaction *tr, as_namespace *ns, cf_digest *keyd,
289 iudf_origin* iudf_orig)
290{
291 // Note - digest is on transaction head before it's enqueued.
292 as_transaction_init_head(tr, keyd, iudf_orig->msgp);
293
294 as_transaction_set_msg_field_flag(tr, AS_MSG_FIELD_TYPE_NAMESPACE);
295
296 tr->origin = FROM_IUDF;
297 tr->from.iudf_orig = iudf_orig;
298
299 tr->start_time = cf_getns();
300}
301
302// Initialize an internal ops transaction (for an ops scan/query). Uses shared
303// message with namespace but no digest, and no set for now since these
304// transactions won't get security checked, and can't create a record.
305void
306as_transaction_init_iops(as_transaction *tr, as_namespace *ns, cf_digest *keyd,
307 iops_origin* iops_orig)
308{
309 // Note - digest is on transaction head before it's enqueued.
310 as_transaction_init_head(tr, keyd, iops_orig->msgp);
311
312 as_transaction_set_msg_field_flag(tr, AS_MSG_FIELD_TYPE_NAMESPACE);
313
314 tr->origin = FROM_IOPS;
315 tr->from.iops_orig = iops_orig;
316
317 tr->start_time = cf_getns();
318}
319
320void
321as_transaction_demarshal_error(as_transaction* tr, uint32_t error_code)
322{
323 as_msg_send_reply(tr->from.proto_fd_h, error_code, 0, 0, NULL, NULL, 0, NULL, 0);
324 tr->from.proto_fd_h = NULL;
325
326 cf_free(tr->msgp);
327 tr->msgp = NULL;
328
329 cf_atomic64_incr(&g_stats.n_demarshal_error);
330}
331
// Count one transaction-service error for the origin named by 'name'.
//
// Uses 'ns' and 'error_code' from the enclosing scope:
// - if ns is non-NULL, increments ns->n_<name>_tsvc_timeout when error_code is
//   AS_ERR_TIMEOUT, otherwise ns->n_<name>_tsvc_error,
// - if ns is NULL, increments g_stats.n_tsvc_<name>_error.
//
// Wrapped in do { } while (0) so the macro expands to exactly one statement -
// it must be followed by a semicolon, and is then safe as the body of an
// unbraced if/else.
#define UPDATE_ERROR_STATS(name) \
	do { \
		if (ns) { \
			if (error_code == AS_ERR_TIMEOUT) { \
				cf_atomic64_incr(&ns->n_##name##_tsvc_timeout); \
			} \
			else { \
				cf_atomic64_incr(&ns->n_##name##_tsvc_error); \
			} \
		} \
		else { \
			cf_atomic64_incr(&g_stats.n_tsvc_##name##_error); \
		} \
	} while (0)
344
345void
346as_transaction_error(as_transaction* tr, as_namespace* ns, uint32_t error_code)
347{
348 if (error_code == 0) {
349 cf_warning(AS_PROTO, "converting error code 0 to 1 (unknown)");
350 error_code = AS_ERR_UNKNOWN;
351 }
352
353 // The 'from' checks below are unnecessary, only paranoia.
354 switch (tr->origin) {
355 case FROM_CLIENT:
356 if (tr->from.proto_fd_h) {
357 as_msg_send_reply(tr->from.proto_fd_h, error_code, 0, 0, NULL, NULL, 0, NULL, as_transaction_trid(tr));
358 tr->from.proto_fd_h = NULL; // pattern, not needed
359 }
360 UPDATE_ERROR_STATS(client);
361 break;
362 case FROM_PROXY:
363 if (tr->from.proxy_node != 0) {
364 as_proxy_send_response(tr->from.proxy_node, tr->from_data.proxy_tid, error_code, 0, 0, NULL, NULL, 0, NULL, as_transaction_trid(tr));
365 tr->from.proxy_node = 0; // pattern, not needed
366 }
367 if (as_transaction_is_batch_sub(tr)) {
368 UPDATE_ERROR_STATS(from_proxy_batch_sub);
369 }
370 else {
371 UPDATE_ERROR_STATS(from_proxy);
372 }
373 break;
374 case FROM_BATCH:
375 if (tr->from.batch_shared) {
376 as_batch_add_error(tr->from.batch_shared, tr->from_data.batch_index, error_code);
377 tr->from.batch_shared = NULL; // pattern, not needed
378 tr->msgp = NULL; // pattern, not needed
379 }
380 UPDATE_ERROR_STATS(batch_sub);
381 break;
382 case FROM_IUDF:
383 if (tr->from.iudf_orig) {
384 tr->from.iudf_orig->cb(tr->from.iudf_orig->udata, error_code);
385 tr->from.iudf_orig = NULL; // pattern, not needed
386 }
387 UPDATE_ERROR_STATS(udf_sub);
388 break;
389 case FROM_IOPS:
390 if (tr->from.iops_orig) {
391 tr->from.iops_orig->cb(tr->from.iops_orig->udata, error_code);
392 tr->from.iops_orig = NULL; // pattern, not needed
393 }
394 UPDATE_ERROR_STATS(ops_sub);
395 break;
396 case FROM_RE_REPL:
397 if (tr->from.re_repl_orig_cb) {
398 tr->result_code = error_code;
399 tr->from.re_repl_orig_cb(tr);
400 tr->from.re_repl_orig_cb = NULL; // pattern, not needed
401 }
402 // Re-replications take care of stats independently.
403 break;
404 default:
405 cf_crash(AS_PROTO, "unexpected transaction origin %u", tr->origin);
406 break;
407 }
408}
409
410// TODO - temporary, until scan & query can do their own synchronous failure
411// responses. (Here we forfeit namespace info and add to global-scope error.)
412void
413as_multi_rec_transaction_error(as_transaction* tr, uint32_t error_code)
414{
415 if (error_code == 0) {
416 cf_warning(AS_PROTO, "converting error code 0 to 1 (unknown)");
417 error_code = AS_ERR_UNKNOWN;
418 }
419
420 switch (tr->origin) {
421 case FROM_CLIENT:
422 if (tr->from.proto_fd_h) {
423 as_msg_send_reply(tr->from.proto_fd_h, error_code, 0, 0, NULL, NULL, 0, NULL, as_transaction_trid(tr));
424 tr->from.proto_fd_h = NULL; // pattern, not needed
425 }
426 cf_atomic64_incr(&g_stats.n_tsvc_client_error);
427 break;
428 default:
429 cf_crash(AS_PROTO, "unexpected transaction origin %u", tr->origin);
430 break;
431 }
432}
433
434// Helper to release transaction file handles.
435void
436as_release_file_handle(as_file_handle *proto_fd_h)
437{
438 if (cf_rc_release(proto_fd_h) != 0) {
439 return;
440 }
441
442 cf_socket_close(&proto_fd_h->sock);
443 cf_socket_term(&proto_fd_h->sock);
444
445 if (proto_fd_h->proto != NULL) {
446 cf_free(proto_fd_h->proto);
447 }
448
449 if (proto_fd_h->security_filter != NULL) {
450 as_security_filter_destroy(proto_fd_h->security_filter);
451 }
452
453 cf_rc_free(proto_fd_h);
454 cf_atomic64_incr(&g_stats.proto_connections_closed);
455}
456
457void
458as_end_of_transaction(as_file_handle *proto_fd_h, bool force_close)
459{
460 if (force_close) {
461 cf_socket_shutdown(&proto_fd_h->sock);
462 }
463
464 // Rearm first.
465 as_service_rearm(proto_fd_h);
466
467 // Now allow service threads to exit and close the epoll instance.
468 as_decr_uint32(&proto_fd_h->in_transaction);
469}
470
471void
472as_end_of_transaction_ok(as_file_handle *proto_fd_h)
473{
474 as_end_of_transaction(proto_fd_h, false);
475}
476
477void
478as_end_of_transaction_force_close(as_file_handle *proto_fd_h)
479{
480 as_end_of_transaction(proto_fd_h, true);
481}
482