1/*
2 * transaction.h
3 *
4 * Copyright (C) 2008-2016 Aerospike, Inc.
5 *
6 * Portions may be licensed to Aerospike, Inc. under one or more contributor
7 * license agreements.
8 *
9 * This program is free software: you can redistribute it and/or modify it under
10 * the terms of the GNU Affero General Public License as published by the Free
11 * Software Foundation, either version 3 of the License, or (at your option) any
12 * later version.
13 *
14 * This program is distributed in the hope that it will be useful, but WITHOUT
15 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
16 * FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more
17 * details.
18 *
19 * You should have received a copy of the GNU Affero General Public License
20 * along with this program. If not, see http://www.gnu.org/licenses/
21 */
22
23
24#pragma once
25
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

#include "citrusleaf/alloc.h"
#include "citrusleaf/cf_byte_order.h"
#include "citrusleaf/cf_clock.h"
#include "citrusleaf/cf_digest.h"

#include "msg.h"
#include "node.h"
#include "socket.h"

#include "base/proto.h"
#include "fabric/partition.h"
41
// Forward declaration - the prototypes in this header only take the namespace
// by pointer.
struct as_namespace_s;
43
44
// True when the transaction's origin is FROM_BATCH, FROM_IUDF or FROM_IOPS.
// NOTE(review): the name suggests msgp is shared rather than owned for these
// origins - confirm against callers.
//
// 'trw' is a pointer expression. It is parenthesized so that arguments such as
// &tr or (p + 1) expand correctly. It may be evaluated up to three times, so
// it must be free of side effects.
#define SHARED_MSGP(trw) ( \
	(trw)->origin == FROM_BATCH || \
	(trw)->origin == FROM_IUDF || \
	(trw)->origin == FROM_IOPS)
49
50
51//==========================================================
52// Histogram macros.
53//
54
// Insert a data point, measured from 'start_time', into the global histogram
// g_stats.<name> - but only if the matching g_config.<name>_enabled switch is
// set. Expands to a brace block, not an expression.
#define G_HIST_INSERT_DATA_POINT(name, start_time) \
{ \
	if (g_config.name##_enabled) { \
		histogram_insert_data_point(g_stats.name, (start_time)); \
	} \
}
61
// Unconditionally mark the global histogram as active (g_stats.<name>_active)
// and then insert a data point, measured from 'start_time', into
// g_stats.<name>. Expands to a brace block, not an expression.
#define G_HIST_ACTIVATE_INSERT_DATA_POINT(name, start_time) \
{ \
	g_stats.name##_active = true; \
	histogram_insert_data_point(g_stats.name, (start_time)); \
}
67
// Mark the namespace's tracked histogram <name> as active, then insert a data
// point measured from the transaction's start_time, using
// cf_hist_track_insert_data_point().
//
// 'trw' is a pointer expression. It is parenthesized so that arguments such as
// &tr expand correctly. It is evaluated more than once, so it must be free of
// side effects. Expands to a brace block, not an expression.
#define HIST_TRACK_ACTIVATE_INSERT_DATA_POINT(trw, name) \
{ \
	(trw)->rsv.ns->name##_active = true; \
	cf_hist_track_insert_data_point((trw)->rsv.ns->name, (trw)->start_time); \
}
73
// Mark the namespace's histogram <name> as active, then insert a data point
// measured from the transaction's start_time, using
// histogram_insert_data_point().
//
// 'trw' is a pointer expression. It is parenthesized so that arguments such as
// &tr expand correctly. It is evaluated more than once, so it must be free of
// side effects. Expands to a brace block, not an expression.
#define HIST_ACTIVATE_INSERT_DATA_POINT(trw, name) \
{ \
	(trw)->rsv.ns->name##_active = true; \
	histogram_insert_data_point((trw)->rsv.ns->name, (trw)->start_time); \
}
79
// Begin (or resume) a benchmark sequence for a transaction.
//
// Does nothing unless the namespace's <name>_benchmarks_enabled is set and the
// transaction's origin equals 'orig'. Otherwise:
// - if benchmark_time is still 0, inserts into <name>_start_hist, measured
//   from the transaction's start_time;
// - else inserts into <name>_restart_hist, measured from the previous
//   benchmark_time.
// Either way the value returned by histogram_insert_data_point() becomes the
// new benchmark_time.
//
// 'tr' and 'orig' are parenthesized so that arguments such as &tr or a
// conditional expression expand correctly. 'tr' is evaluated several times, so
// it must be free of side effects. Expands to a brace block, not an
// expression.
#define BENCHMARK_START(tr, name, orig) \
{ \
	if ((tr)->rsv.ns->name##_benchmarks_enabled && (tr)->origin == (orig)) { \
		if ((tr)->benchmark_time == 0) { \
			(tr)->benchmark_time = histogram_insert_data_point((tr)->rsv.ns->name##_start_hist, (tr)->start_time); \
		} \
		else { \
			(tr)->benchmark_time = histogram_insert_data_point((tr)->rsv.ns->name##_restart_hist, (tr)->benchmark_time); \
		} \
	} \
}
91
// Record the next stage of a benchmark sequence that was already started.
//
// Does nothing unless the namespace's <name>_benchmarks_enabled is set and
// benchmark_time is non-zero. Otherwise inserts into <name>_<tok>_hist,
// measured from benchmark_time, and stores the value returned by
// histogram_insert_data_point() back into benchmark_time.
//
// 'trw' is parenthesized so that arguments such as &tr expand correctly. It is
// evaluated several times, so it must be free of side effects. Expands to a
// brace block, not an expression.
#define BENCHMARK_NEXT_DATA_POINT(trw, name, tok) \
{ \
	if ((trw)->rsv.ns->name##_benchmarks_enabled && (trw)->benchmark_time != 0) { \
		(trw)->benchmark_time = histogram_insert_data_point((trw)->rsv.ns->name##_##tok##_hist, (trw)->benchmark_time); \
	} \
}
98
// Same as BENCHMARK_NEXT_DATA_POINT, but additionally requires the
// transaction's origin to equal 'orig'.
//
// Does nothing unless the namespace's <name>_benchmarks_enabled is set, the
// origin matches, and benchmark_time is non-zero. Otherwise inserts into
// <name>_<tok>_hist, measured from benchmark_time, and stores the value
// returned by histogram_insert_data_point() back into benchmark_time.
//
// 'trw' and 'orig' are parenthesized so that arguments such as &tr or a
// conditional expression expand correctly. 'trw' is evaluated several times,
// so it must be free of side effects. Expands to a brace block, not an
// expression.
#define BENCHMARK_NEXT_DATA_POINT_FROM(trw, name, orig, tok) \
{ \
	if ((trw)->rsv.ns->name##_benchmarks_enabled && (trw)->origin == (orig) && (trw)->benchmark_time != 0) { \
		(trw)->benchmark_time = histogram_insert_data_point((trw)->rsv.ns->name##_##tok##_hist, (trw)->benchmark_time); \
	} \
}
105
106
107//==========================================================
108// Client socket information - as_file_handle.
109//
110
111typedef struct as_file_handle_s {
112 char client[64]; // client identifier (currently ip-addr:port)
113 uint64_t last_used; // last nanoseconds we read or wrote
114 cf_socket sock; // our client socket
115 cf_poll poll; // our epoll instance
116 uint32_t in_transaction; // don't reap or transfer during transaction
117 bool move_me; // redistribute to another service thread
118 bool reap_me; // force reaping (overrides in_transaction)
119 bool is_xdr; // XDR client connection
120 as_proto proto_hdr; // space for header when reading it from socket
121 as_proto *proto; // complete request message
122 uint64_t proto_unread; // bytes not yet read from socket
123 void *security_filter;
124} as_file_handle;
125
126// Helpers to release transaction file handles.
127void as_release_file_handle(as_file_handle *proto_fd_h);
128void as_end_of_transaction(as_file_handle *proto_fd_h, bool force_close);
129void as_end_of_transaction_ok(as_file_handle *proto_fd_h);
130void as_end_of_transaction_force_close(as_file_handle *proto_fd_h);
131
132
133//==========================================================
134// Transaction.
135//
136
// Outcome reported back to the transaction service (tsvc), which decides what
// tsvc must clean up.
typedef enum {
	// Response was sent to origin - tsvc frees msgp & reservation.
	TRANS_DONE_ERROR = -1,

	// Response was sent to origin - tsvc frees msgp & reservation.
	TRANS_DONE_SUCCESS = 0,

	// rw_request now owns msgp & reservation - tsvc leaves both alone.
	TRANS_IN_PROGRESS = 1,

	// tsvc leaves msgp alone but frees the reservation.
	TRANS_WAITING = 2
} transaction_status;
143
// Selects which member of the transaction's 'from' union is meaningful.
//
// This is NOT a general transaction type. For example, a batch
// sub-transaction that proxies is FROM_PROXY on the proxyee node, which is why
// a separate FROM_FLAG_BATCH_SUB flag is still needed.
//
typedef enum {
	FROM_UNDEF = 0,

	// External - arrives through service or fabric:
	FROM_CLIENT = 1,
	FROM_PROXY = 2,

	// Internal - generated on the local node:
	FROM_BATCH = 3,
	FROM_IUDF = 4,
	FROM_IOPS = 5,
	FROM_RE_REPL = 6 // enterprise-only
} transaction_origin;
163
164struct as_batch_shared_s;
165struct iudf_origin_s;
166struct iops_origin_s;
167
168typedef struct as_transaction_s {
169
170 //------------------------------------------------------
171 // transaction 'head' - copied onto queue.
172 //
173
174 cl_msg* msgp;
175 uint32_t msg_fields;
176
177 uint8_t origin;
178 uint8_t from_flags;
179
180 // 2 spare bytes.
181
182 union {
183 void* any;
184 as_file_handle* proto_fd_h;
185 cf_node proxy_node;
186 struct as_batch_shared_s* batch_shared;
187 struct iudf_origin_s* iudf_orig;
188 struct iops_origin_s* iops_orig;
189 void (*re_repl_orig_cb) (struct as_transaction_s* tr);
190 } from;
191
192 union {
193 uint32_t any;
194 uint32_t proxy_tid;
195 uint32_t batch_index;
196 } from_data;
197
198 cf_digest keyd; // only batch sub-transactions require this on queue
199
200 uint64_t start_time;
201 uint64_t benchmark_time;
202
203 //<><><><><><><><><><><> 64 bytes <><><><><><><><><><><>
204
205 //------------------------------------------------------
206 // transaction 'body' - NOT copied onto queue.
207 //
208
209 as_partition_reservation rsv;
210
211 uint64_t end_time;
212 uint8_t result_code;
213 uint8_t flags;
214 uint16_t generation;
215 uint32_t void_time;
216 uint64_t last_update_time;
217
218} as_transaction;
219
220#define AS_TRANSACTION_HEAD_SIZE (offsetof(as_transaction, rsv))
221
// Bits of the transaction head's 'from_flags' - set before the head is queued:
#define FROM_FLAG_BATCH_SUB (1 << 0)
#define FROM_FLAG_RESTART (1 << 1)
#define FROM_FLAG_RESTART_STRICT (1 << 2) // enterprise-only
226
// Bits of the transaction body's 'flags' - set after queuing:
#define AS_TRANSACTION_FLAG_SINDEX_TOUCHED (1 << 0)
#define AS_TRANSACTION_FLAG_IS_DELETE (1 << 1)
#define AS_TRANSACTION_FLAG_MUST_PING (1 << 2) // enterprise-only
#define AS_TRANSACTION_FLAG_RSV_PROLE (1 << 3) // enterprise-only
#define AS_TRANSACTION_FLAG_RSV_UNAVAILABLE (1 << 4) // enterprise-only
233
234
235void as_transaction_init_head(as_transaction *tr, const cf_digest *, cl_msg *);
236void as_transaction_init_body(as_transaction *tr);
237
238void as_transaction_copy_head(as_transaction *to, const as_transaction *from);
239
240struct rw_request_s;
241
242void as_transaction_init_from_rw(as_transaction *tr, struct rw_request_s *rw);
243void as_transaction_init_head_from_rw(as_transaction *tr, struct rw_request_s *rw);
244
245bool as_transaction_set_msg_field_flag(as_transaction *tr, uint8_t type);
246bool as_transaction_prepare(as_transaction *tr, bool swap);
247
248static inline bool
249as_transaction_is_restart(const as_transaction *tr)
250{
251 return (tr->from_flags & FROM_FLAG_RESTART) != 0;
252}
253
254static inline bool
255as_transaction_is_batch_sub(const as_transaction *tr)
256{
257 return (tr->from_flags & FROM_FLAG_BATCH_SUB) != 0;
258}
259
260static inline bool
261as_transaction_is_restart_strict(const as_transaction *tr)
262{
263 return (tr->from_flags & FROM_FLAG_RESTART_STRICT) != 0;
264}
265
266static inline bool
267as_transaction_has_set(const as_transaction *tr)
268{
269 return (tr->msg_fields & AS_MSG_FIELD_BIT_SET) != 0;
270}
271
272static inline bool
273as_transaction_has_key(const as_transaction *tr)
274{
275 return (tr->msg_fields & AS_MSG_FIELD_BIT_KEY) != 0;
276}
277
278static inline bool
279as_transaction_has_digest(const as_transaction *tr)
280{
281 return (tr->msg_fields & AS_MSG_FIELD_BIT_DIGEST_RIPE) != 0;
282}
283
284static inline bool
285as_transaction_has_no_key_or_digest(const as_transaction *tr)
286{
287 return (tr->msg_fields & (AS_MSG_FIELD_BIT_KEY | AS_MSG_FIELD_BIT_DIGEST_RIPE)) == 0;
288}
289
290static inline bool
291as_transaction_is_multi_record(const as_transaction *tr)
292{
293 return tr->origin == FROM_CLIENT &&
294 (tr->msg_fields & (AS_MSG_FIELD_BIT_KEY | AS_MSG_FIELD_BIT_DIGEST_RIPE)) == 0;
295}
296
297static inline bool
298as_transaction_is_batch_direct(const as_transaction *tr)
299{
300 // Assumes we're already multi-record.
301 return (tr->msg_fields & AS_MSG_FIELD_BIT_DIGEST_RIPE_ARRAY) != 0;
302}
303
304static inline bool
305as_transaction_is_query(const as_transaction *tr)
306{
307 // Assumes we're already multi-record.
308 return (tr->msg_fields & AS_MSG_FIELD_BIT_INDEX_RANGE) != 0;
309}
310
311static inline bool
312as_transaction_is_udf(const as_transaction *tr)
313{
314 return (tr->msg_fields & AS_MSG_FIELD_BIT_UDF_FILENAME) != 0;
315}
316
317static inline bool
318as_transaction_has_udf_op(const as_transaction *tr)
319{
320 return (tr->msg_fields & AS_MSG_FIELD_BIT_UDF_OP) != 0;
321}
322
323static inline bool
324as_transaction_has_scan_options(const as_transaction *tr)
325{
326 return (tr->msg_fields & AS_MSG_FIELD_BIT_SCAN_OPTIONS) != 0;
327}
328
329static inline bool
330as_transaction_has_socket_timeout(const as_transaction *tr)
331{
332 return (tr->msg_fields & AS_MSG_FIELD_BIT_SOCKET_TIMEOUT) != 0;
333}
334
335static inline bool
336as_transaction_has_recs_per_sec(const as_transaction *tr)
337{
338 return (tr->msg_fields & AS_MSG_FIELD_BIT_RECS_PER_SEC) != 0;
339}
340
341static inline bool
342as_transaction_has_predexp(const as_transaction *tr)
343{
344 return (tr->msg_fields & AS_MSG_FIELD_BIT_PREDEXP) != 0;
345}
346
347// For now it's not worth storing the trid in the as_transaction struct since we
348// only parse it from the msg once per transaction anyway.
349static inline uint64_t
350as_transaction_trid(const as_transaction *tr)
351{
352 if ((tr->msg_fields & AS_MSG_FIELD_BIT_TRID) == 0) {
353 return 0;
354 }
355
356 as_msg_field *f = as_msg_field_get(&tr->msgp->msg, AS_MSG_FIELD_TYPE_TRID);
357
358 return cf_swap_from_be64(*(uint64_t*)f->data);
359}
360
361static inline bool
362as_transaction_is_delete(const as_transaction *tr)
363{
364 return (tr->msgp->msg.info2 & AS_MSG_INFO2_DELETE) != 0;
365}
366
367static inline bool
368as_transaction_is_durable_delete(const as_transaction *tr)
369{
370 return (tr->msgp->msg.info2 & AS_MSG_INFO2_DURABLE_DELETE) != 0;
371}
372
373// TODO - where should this go?
374static inline bool
375as_msg_is_xdr(const as_msg *m)
376{
377 return (m->info1 & AS_MSG_INFO1_XDR) != 0;
378}
379
380static inline bool
381as_transaction_is_xdr(const as_transaction *tr)
382{
383 return (tr->msgp->msg.info1 & AS_MSG_INFO1_XDR) != 0;
384}
385
386static inline bool
387as_transaction_is_linearized_read(const as_transaction *tr)
388{
389 return (tr->msgp->msg.info3 & AS_MSG_INFO3_SC_READ_RELAX) == 0 &&
390 (tr->msgp->msg.info3 & AS_MSG_INFO3_SC_READ_TYPE) != 0;
391}
392
393static inline bool
394as_transaction_is_allow_unavailable_read(const as_transaction *tr)
395{
396 return (tr->msgp->msg.info3 & AS_MSG_INFO3_SC_READ_RELAX) != 0 &&
397 (tr->msgp->msg.info3 & AS_MSG_INFO3_SC_READ_TYPE) != 0;
398}
399
400static inline bool
401as_transaction_is_strict_read(const as_transaction *tr)
402{
403 return (tr->msgp->msg.info3 & AS_MSG_INFO3_SC_READ_RELAX) == 0;
404}
405
406void as_transaction_init_iudf(as_transaction *tr, struct as_namespace_s *ns, cf_digest *keyd, struct iudf_origin_s *iudf_orig);
407void as_transaction_init_iops(as_transaction *tr, struct as_namespace_s *ns, cf_digest *keyd, struct iops_origin_s* iops_orig);
408
409void as_transaction_demarshal_error(as_transaction *tr, uint32_t error_code);
410void as_transaction_error(as_transaction *tr, struct as_namespace_s *ns, uint32_t error_code);
411void as_multi_rec_transaction_error(as_transaction *tr, uint32_t error_code);
412