1/*
2 * thr_tsvc.c
3 *
4 * Copyright (C) 2008-2014 Aerospike, Inc.
5 *
6 * Portions may be licensed to Aerospike, Inc. under one or more contributor
7 * license agreements.
8 *
9 * This program is free software: you can redistribute it and/or modify it under
10 * the terms of the GNU Affero General Public License as published by the Free
11 * Software Foundation, either version 3 of the License, or (at your option) any
12 * later version.
13 *
14 * This program is distributed in the hope that it will be useful, but WITHOUT
15 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
16 * FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more
17 * details.
18 *
19 * You should have received a copy of the GNU Affero General Public License
20 * along with this program. If not, see http://www.gnu.org/licenses/
21 */
22
23//==========================================================
24// Includes.
25//
26
27#include "base/thr_tsvc.h"
28
29#include <stdbool.h>
30#include <stddef.h>
31#include <stdint.h>
32
33#include "citrusleaf/alloc.h"
34#include "citrusleaf/cf_atomic.h"
35#include "citrusleaf/cf_clock.h"
36#include "citrusleaf/cf_digest.h"
37
38#include "fault.h"
39#include "node.h"
40
41#include "base/cfg.h"
42#include "base/datamodel.h"
43#include "base/proto.h"
44#include "base/scan.h"
45#include "base/secondary_index.h"
46#include "base/security.h"
47#include "base/stats.h"
48#include "base/transaction.h"
49#include "base/transaction_policy.h"
50#include "base/xdr_serverside.h"
51#include "fabric/partition.h"
52#include "fabric/partition_balance.h"
53#include "storage/storage.h"
54#include "transaction/delete.h"
55#include "transaction/proxy.h"
56#include "transaction/re_replicate.h"
57#include "transaction/read.h"
58#include "transaction/udf.h"
59#include "transaction/write.h"
60
61
62//==========================================================
63// Inlines & macros.
64//
65
66static inline bool
67should_security_check_data_op(const as_transaction *tr)
68{
69 return tr->origin == FROM_CLIENT || tr->origin == FROM_BATCH;
70}
71
72static inline as_sec_perm
73scan_perm(const as_transaction *tr)
74{
75 if (as_transaction_is_udf(tr)) {
76 return PERM_UDF_SCAN;
77 }
78
79 return (tr->msgp->msg.info2 & AS_MSG_INFO2_WRITE) != 0 ?
80 PERM_OPS_SCAN : PERM_SCAN;
81}
82
83static inline as_sec_perm
84query_perm(const as_transaction *tr)
85{
86 if (as_transaction_is_udf(tr)) {
87 return PERM_UDF_QUERY;
88 }
89
90 return (tr->msgp->msg.info2 & AS_MSG_INFO2_WRITE) != 0 ?
91 PERM_OPS_QUERY : PERM_QUERY;
92}
93
94static inline const char*
95write_type_tag(const as_transaction *tr)
96{
97 return as_transaction_is_delete(tr) ? "delete" :
98 (as_transaction_is_udf(tr) ? "udf" : "write");
99}
100
101static inline void
102detail_unique_client_rw(const as_transaction *tr, bool is_write)
103{
104 if (tr->origin == FROM_CLIENT) {
105 cf_detail_digest(AS_RW_CLIENT, &tr->keyd, "{%s} client %s %s ",
106 tr->rsv.ns->name, tr->from.proto_fd_h->client,
107 is_write ? write_type_tag(tr) : "read");
108 }
109}
110
111
112//==========================================================
113// Public API.
114//
115
116// Handle the transaction, including proxy to another node if necessary.
117void
118as_tsvc_process_transaction(as_transaction *tr)
119{
120 if (tr->msgp->proto.type == PROTO_TYPE_INTERNAL_XDR) {
121 as_xdr_read_txn(tr);
122 return;
123 }
124
125 int rv;
126 bool free_msgp = true;
127 cl_msg *msgp = tr->msgp;
128 as_msg *m = &msgp->msg;
129
130 as_transaction_init_body(tr);
131
132 // Check that the socket is authenticated.
133 if (tr->origin == FROM_CLIENT) {
134 uint8_t result = as_security_check(tr->from.proto_fd_h, PERM_NONE);
135
136 if (result != AS_OK) {
137 as_security_log(tr->from.proto_fd_h, result, PERM_NONE, NULL, NULL);
138 as_transaction_error(tr, NULL, (uint32_t)result);
139 goto Cleanup;
140 }
141 }
142
143 // All transactions must have a namespace.
144 as_msg_field *nf = as_msg_field_get(m, AS_MSG_FIELD_TYPE_NAMESPACE);
145
146 if (! nf) {
147 cf_warning(AS_TSVC, "no namespace in protocol request");
148 as_transaction_error(tr, NULL, AS_ERR_NAMESPACE);
149 goto Cleanup;
150 }
151
152 as_namespace *ns = as_namespace_get_bymsgfield(nf);
153
154 if (! ns) {
155 uint32_t ns_sz = as_msg_field_get_value_sz(nf);
156 CF_ZSTR_DEFINE(ns_name, AS_ID_NAMESPACE_SZ, nf->data, ns_sz);
157
158 cf_warning(AS_TSVC, "unknown namespace %s (%u) in protocol request - check configuration file",
159 ns_name, ns_sz);
160
161 as_transaction_error(tr, NULL, AS_ERR_NAMESPACE);
162 goto Cleanup;
163 }
164
165 // Have we finished the very first partition balance?
166 if (! as_partition_balance_is_init_resolved()) {
167 if (tr->origin == FROM_PROXY) {
168 as_proxy_return_to_sender(tr, ns);
169 tr->from.proxy_node = 0; // pattern, not needed
170 }
171 else {
172 cf_debug(AS_TSVC, "rejecting transaction - initial partition balance unresolved");
173 as_transaction_error(tr, NULL, AS_ERR_UNAVAILABLE);
174 // Note that we forfeited namespace info above so scan & query don't
175 // get counted as single-record error.
176 }
177
178 goto Cleanup;
179 }
180
181 //------------------------------------------------------
182 // Multi-record transaction.
183 //
184
185 if (as_transaction_is_multi_record(tr)) {
186 if (m->transaction_ttl != 0) {
187 // Queries may specify transaction_ttl, but don't use
188 // g_config.transaction_max_ns as a default. Assuming specified TTL
189 // is large enough that it's not worth checking for timeout here.
190 tr->end_time = tr->start_time +
191 ((uint64_t)m->transaction_ttl * 1000000);
192 }
193
194 if (as_transaction_is_batch_direct(tr)) {
195 // Old batch - deprecated.
196 as_multi_rec_transaction_error(tr, AS_ERR_UNSUPPORTED_FEATURE);
197 }
198 else if (as_transaction_is_query(tr)) {
199 // Query.
200 cf_atomic64_incr(&ns->query_reqs);
201
202 if (! as_security_check_data_op(tr, ns, query_perm(tr))) {
203 as_multi_rec_transaction_error(tr, tr->result_code);
204 goto Cleanup;
205 }
206
207 if (as_query(tr, ns) != 0) {
208 cf_atomic64_incr(&ns->query_fail);
209 as_multi_rec_transaction_error(tr, tr->result_code);
210 }
211 }
212 else {
213 // Scan.
214 if (! as_security_check_data_op(tr, ns, scan_perm(tr))) {
215 as_multi_rec_transaction_error(tr, tr->result_code);
216 goto Cleanup;
217 }
218
219 if ((rv = as_scan(tr, ns)) != 0) {
220 as_multi_rec_transaction_error(tr, rv);
221 }
222 }
223
224 goto Cleanup;
225 }
226
227 //------------------------------------------------------
228 // Single-record transaction.
229 //
230
231 // Calculate end_time based on message transaction TTL. May be recalculating
232 // for re-queued transactions, but nice if end_time not copied on/off queue.
233 if (m->transaction_ttl != 0) {
234 tr->end_time = tr->start_time +
235 ((uint64_t)m->transaction_ttl * 1000000);
236 }
237 else {
238 // Incorporate g_config.transaction_max_ns if appropriate.
239 // TODO - should g_config.transaction_max_ns = 0 be special?
240 tr->end_time = tr->start_time + g_config.transaction_max_ns;
241 }
242
243 // Did the transaction time out while on the queue?
244 if (cf_getns() > tr->end_time) {
245 cf_debug(AS_TSVC, "transaction timed out in queue");
246 as_transaction_error(tr, ns, AS_ERR_TIMEOUT);
247 goto Cleanup;
248 }
249
250 // All single-record transactions must have a digest, or a key from which
251 // to calculate it.
252 if (as_transaction_has_digest(tr)) {
253 // Modern client - just copy digest into tr.
254
255 as_msg_field *df = as_msg_field_get(m, AS_MSG_FIELD_TYPE_DIGEST_RIPE);
256 uint32_t digest_sz = as_msg_field_get_value_sz(df);
257
258 if (digest_sz != sizeof(cf_digest)) {
259 cf_warning(AS_TSVC, "digest msg field size %u", digest_sz);
260 as_transaction_error(tr, ns, AS_ERR_PARAMETER);
261 goto Cleanup;
262 }
263
264 tr->keyd = *(cf_digest *)df->data;
265 }
266 else if (as_transaction_has_key(tr)) {
267 // Old client - calculate digest from key & set, directly into tr.
268
269 as_msg_field *kf = as_msg_field_get(m, AS_MSG_FIELD_TYPE_KEY);
270 uint32_t key_sz = as_msg_field_get_value_sz(kf);
271
272 as_msg_field *sf = as_transaction_has_set(tr) ?
273 as_msg_field_get(m, AS_MSG_FIELD_TYPE_SET) : NULL;
274 uint32_t set_sz = sf ? as_msg_field_get_value_sz(sf) : 0;
275
276 cf_digest_compute2(sf->data, set_sz, kf->data, key_sz, &tr->keyd);
277 }
278 // else - batch sub-transactions & all internal transactions have neither
279 // digest nor key in the message - digest is already in tr.
280
281 // Process the transaction.
282
283 bool is_write = (m->info2 & AS_MSG_INFO2_WRITE) != 0;
284 bool is_read = (m->info1 & AS_MSG_INFO1_READ) != 0;
285 // Both can be set together, but is_write puts us on the 'write path' -
286 // write reservation, replica writes, etc. Writes quickly get split into
287 // write, delete, or UDF after the reservation.
288
289 uint32_t pid = as_partition_getid(&tr->keyd);
290 cf_node dest;
291
292 if (is_write) {
293 if (should_security_check_data_op(tr) &&
294 ! as_security_check_data_op(tr, ns,
295 PERM_WRITE | (is_read ? PERM_READ : 0))) {
296 as_transaction_error(tr, ns, tr->result_code);
297 goto Cleanup;
298 }
299
300 rv = as_partition_reserve_write(ns, pid, &tr->rsv, &dest);
301 }
302 else if (is_read) {
303 if (should_security_check_data_op(tr) &&
304 ! as_security_check_data_op(tr, ns, PERM_READ)) {
305 as_transaction_error(tr, ns, tr->result_code);
306 goto Cleanup;
307 }
308
309 rv = as_partition_reserve_read_tr(ns, pid, tr, &dest);
310 }
311 else {
312 cf_warning(AS_TSVC, "transaction is neither read nor write - unexpected");
313 as_transaction_error(tr, ns, AS_ERR_PARAMETER);
314 goto Cleanup;
315 }
316
317 if (rv == -2) {
318 // Partition is unavailable.
319 as_transaction_error(tr, ns, AS_ERR_UNAVAILABLE);
320 goto Cleanup;
321 }
322
323 if (dest == 0) {
324 cf_crash(AS_TSVC, "invalid destination while reserving partition");
325 }
326
327 if (rv == 0) {
328 // <><><><><><> Reservation Succeeded <><><><><><>
329
330 if (! as_transaction_is_restart(tr)) {
331 tr->benchmark_time = 0;
332 detail_unique_client_rw(tr, is_write);
333 }
334
335 transaction_status status;
336
337 if (is_write) {
338 if (as_transaction_is_delete(tr)) {
339 status = as_delete_start(tr);
340 }
341 else if (tr->origin == FROM_IUDF || as_transaction_is_udf(tr)) {
342 status = as_udf_start(tr);
343 }
344 else if (tr->origin == FROM_RE_REPL) {
345 status = as_re_replicate_start(tr);
346 }
347 else {
348 status = as_write_start(tr);
349 }
350 }
351 else {
352 status = as_read_start(tr);
353 }
354
355 switch (status) {
356 case TRANS_DONE_ERROR:
357 case TRANS_DONE_SUCCESS:
358 // Done, response already sent - free msg & release reservation.
359 as_partition_release(&tr->rsv);
360 break;
361 case TRANS_IN_PROGRESS:
362 // Don't free msg or release reservation - both owned by rw_request.
363 free_msgp = false;
364 break;
365 case TRANS_WAITING:
366 // Will be re-queued - don't free msg, but release reservation.
367 free_msgp = false;
368 as_partition_release(&tr->rsv);
369 break;
370 default:
371 cf_crash(AS_TSVC, "invalid transaction status %d", status);
372 break;
373 }
374 }
375 else {
376 // <><><><><><> Reservation Failed <><><><><><>
377
378 switch (tr->origin) {
379 case FROM_CLIENT:
380 case FROM_BATCH:
381 as_proxy_divert(dest, tr, ns);
382 // CLIENT: fabric owns msgp, BATCH: it's shared, don't free it.
383 free_msgp = false;
384 break;
385 case FROM_PROXY:
386 as_proxy_return_to_sender(tr, ns);
387 tr->from.proxy_node = 0; // pattern, not needed
388 break;
389 case FROM_IUDF:
390 tr->from.iudf_orig->cb(tr->from.iudf_orig->udata, AS_ERR_UNKNOWN);
391 tr->from.iudf_orig = NULL; // pattern, not needed
392 break;
393 case FROM_IOPS:
394 tr->from.iops_orig->cb(tr->from.iops_orig->udata, AS_ERR_UNKNOWN);
395 tr->from.iops_orig = NULL; // pattern, not needed
396 break;
397 case FROM_RE_REPL:
398 tr->from.re_repl_orig_cb(tr);
399 tr->from.re_repl_orig_cb = NULL; // pattern, not needed
400 break;
401 default:
402 cf_crash(AS_TSVC, "unexpected transaction origin %u", tr->origin);
403 break;
404 }
405 }
406
407Cleanup:
408
409 if (free_msgp && ! SHARED_MSGP(tr)) {
410 cf_free(msgp);
411 }
412} // end process_transaction()
413