1/*
2 * rw_request.h
3 *
4 * Copyright (C) 2016 Aerospike, Inc.
5 *
6 * Portions may be licensed to Aerospike, Inc. under one or more contributor
7 * license agreements.
8 *
9 * This program is free software: you can redistribute it and/or modify it under
10 * the terms of the GNU Affero General Public License as published by the Free
11 * Software Foundation, either version 3 of the License, or (at your option) any
12 * later version.
13 *
14 * This program is distributed in the hope that it will be useful, but WITHOUT
15 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
16 * FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more
17 * details.
18 *
19 * You should have received a copy of the GNU Affero General Public License
20 * along with this program. If not, see http://www.gnu.org/licenses/
21 */
22
23#pragma once
24
25//==========================================================
26// Includes.
27//
28
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

#include "citrusleaf/alloc.h"
#include "citrusleaf/cf_atomic.h"
#include "citrusleaf/cf_byte_order.h"
#include "citrusleaf/cf_digest.h"

#include "cf_mutex.h"
#include "dynbuf.h"
#include "msg.h"
#include "node.h"

#include "base/proto.h"
#include "base/transaction.h"
#include "fabric/hb.h"
#include "fabric/partition.h"
47
48
49//==========================================================
50// Forward declarations.
51//
52
// Incomplete types - this header only uses them via pointers.

// Pointed to by members of rw_request's 'from' union.
struct as_batch_shared_s;
struct as_file_handle_s;

// Parameter type of the wait-queue push functions.
struct as_transaction_s;

// Type of rw_request's msgp member.
struct cl_msg_s;

// Pointed to by members of rw_request's 'from' union.
struct iops_origin_s;
struct iudf_origin_s;

// Needed by the callback typedefs, which precede the struct definition.
struct rw_request_s;

// Element type of rw_request's wait queue head/tail pointers.
struct rw_wait_ele_s;
61
62
63//==========================================================
64// Typedefs & constants.
65//
66
// Callback types for rw_request's dup_res_cb, repl_write_cb, repl_ping_cb and
// timeout_cb members. Each is handed a pointer to an rw_request.
//
// NOTE(review): only dup_res_done_cb returns a value - what the bool means is
// decided by its callers, not visible in this header.
typedef bool (*dup_res_done_cb)(struct rw_request_s* rw);
typedef void (*repl_write_done_cb)(struct rw_request_s* rw);
typedef void (*repl_ping_done_cb)(struct rw_request_s* rw);
typedef void (*timeout_done_cb)(struct rw_request_s* rw);
71
72typedef struct rw_request_s {
73
74 //------------------------------------------------------
75 // Matches as_transaction.
76 //
77
78 struct cl_msg_s* msgp;
79 uint32_t msg_fields;
80
81 uint8_t origin;
82 uint8_t from_flags;
83
84 union {
85 void* any;
86 struct as_file_handle_s* proto_fd_h;
87 cf_node proxy_node;
88 struct iudf_origin_s* iudf_orig;
89 struct iops_origin_s* iops_orig;
90 struct as_batch_shared_s* batch_shared;
91 } from;
92
93 union {
94 uint32_t any;
95 uint32_t batch_index;
96 uint32_t proxy_tid;
97 } from_data;
98
99 cf_digest keyd;
100
101 uint64_t start_time;
102 uint64_t benchmark_time;
103
104 as_partition_reservation rsv;
105
106 uint64_t end_time;
107 uint8_t result_code;
108 uint8_t flags;
109 uint16_t generation;
110 uint32_t void_time;
111 uint64_t last_update_time;
112
113 //
114 // End of as_transaction look-alike.
115 //------------------------------------------------------
116
117 cf_mutex lock;
118
119 struct rw_wait_ele_s* wait_queue_head;
120 struct rw_wait_ele_s* wait_queue_tail;
121 uint32_t wait_queue_depth;
122
123 bool is_set_up; // TODO - redundant with timeout_cb
124
125 // Store pickled data, for use in replica write.
126 bool is_old_pickle; // TODO - old pickle - remove in "six months"
127 uint8_t* pickle;
128 size_t pickle_sz;
129 const char* set_name; // points directly into vmap - never free it
130 uint32_t set_name_len;
131 uint8_t* key;
132 uint32_t key_size;
133
134 // Store ops' responses here.
135 cf_dyn_buf response_db;
136
137 // Manage responses for duplicate resolution and replica write requests, or
138 // alternatively, timeouts.
139 uint32_t tid;
140 bool dup_res_complete;
141 bool repl_write_complete;
142 bool repl_ping_complete;
143 dup_res_done_cb dup_res_cb;
144 repl_write_done_cb repl_write_cb;
145 repl_ping_done_cb repl_ping_cb;
146 timeout_done_cb timeout_cb;
147
148 // Message being sent to dest_nodes. May be duplicate resolution or replica
149 // write request. Message is kept in case it needs to be retransmitted.
150 msg* dest_msg;
151
152 uint64_t xmit_ms; // time of next retransmit
153 uint32_t retry_interval_ms; // interval to add for next retransmit
154
155 // Destination info for duplicate resolution and replica write requests.
156 uint32_t n_dest_nodes;
157 cf_node dest_nodes[AS_CLUSTER_SZ];
158 bool dest_complete[AS_CLUSTER_SZ];
159
160 // Duplicate resolution response messages from nodes with duplicates.
161 msg* best_dup_msg;
162 // TODO - could store best dup node-id - worth it?
163 uint8_t best_dup_result_code;
164 uint16_t best_dup_gen;
165 uint64_t best_dup_lut;
166
167 bool tie_was_replicated; // enterprise only
168
169 // Node health related stat, to track replication latency.
170 uint64_t repl_start_us;
171
172} rw_request;
173
174
175//==========================================================
176// Public API.
177//
178
179rw_request* rw_request_create();
180void rw_request_destroy(rw_request* rw);
181void rw_request_wait_q_push(rw_request* rw, struct as_transaction_s* tr);
182void rw_request_wait_q_push_head(rw_request* rw, struct as_transaction_s* tr);
183
184static inline void
185rw_request_hdestroy(void* pv)
186{
187 rw_request_destroy((rw_request*)pv);
188}
189
190static inline void
191rw_request_release(rw_request* rw)
192{
193 if (cf_rc_release(rw) == 0) {
194 rw_request_destroy(rw);
195 cf_rc_free(rw);
196 }
197}
198
199static inline bool
200rw_request_is_batch_sub(const rw_request* rw)
201{
202 return (rw->from_flags & FROM_FLAG_BATCH_SUB) != 0;
203}
204
205// See as_transaction_trid().
206static inline uint64_t
207rw_request_trid(const rw_request* rw)
208{
209 if ((rw->msg_fields & AS_MSG_FIELD_BIT_TRID) == 0) {
210 return 0;
211 }
212
213 as_msg_field *f = as_msg_field_get(&rw->msgp->msg, AS_MSG_FIELD_TYPE_TRID);
214
215 return cf_swap_from_be64(*(uint64_t*)f->data);
216}
217