1 | /* |
2 | * rw_request.h |
3 | * |
4 | * Copyright (C) 2016 Aerospike, Inc. |
5 | * |
6 | * Portions may be licensed to Aerospike, Inc. under one or more contributor |
7 | * license agreements. |
8 | * |
9 | * This program is free software: you can redistribute it and/or modify it under |
10 | * the terms of the GNU Affero General Public License as published by the Free |
11 | * Software Foundation, either version 3 of the License, or (at your option) any |
12 | * later version. |
13 | * |
14 | * This program is distributed in the hope that it will be useful, but WITHOUT |
15 | * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS |
16 | * FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more |
17 | * details. |
18 | * |
19 | * You should have received a copy of the GNU Affero General Public License |
20 | * along with this program. If not, see http://www.gnu.org/licenses/ |
21 | */ |
22 | |
23 | #pragma once |
24 | |
25 | //========================================================== |
26 | // Includes. |
27 | // |
28 | |
29 | #include <stdbool.h> |
30 | #include <stddef.h> |
31 | #include <stdint.h> |
32 | |
33 | #include "citrusleaf/alloc.h" |
34 | #include "citrusleaf/cf_atomic.h" |
35 | #include "citrusleaf/cf_byte_order.h" |
36 | #include "citrusleaf/cf_digest.h" |
37 | |
38 | #include "cf_mutex.h" |
39 | #include "dynbuf.h" |
40 | #include "msg.h" |
41 | #include "node.h" |
42 | |
43 | #include "base/proto.h" |
44 | #include "base/transaction.h" |
45 | #include "fabric/hb.h" |
46 | #include "fabric/partition.h" |
47 | |
48 | |
49 | //========================================================== |
50 | // Forward declarations. |
51 | // |
52 | |
// Let the struct types below be named (as pointers) ahead of, or without,
// their full definitions. rw_request_s is defined further down in this
// header; the others are presumably defined in other headers.

// This module's own types.
struct rw_request_s;
struct rw_wait_ele_s;

// Transaction and wire-message types.
struct as_transaction_s;
struct cl_msg_s;

// Types held in the rw_request 'from' union.
struct as_batch_shared_s;
struct as_file_handle_s;
struct iops_origin_s;
struct iudf_origin_s;
61 | |
62 | |
63 | //========================================================== |
64 | // Typedefs & constants. |
65 | // |
66 | |
// Completion-callback types stored in rw_request (dup_res_cb, repl_write_cb,
// repl_ping_cb, timeout_cb). Each receives the request it belongs to. Only the
// duplicate-resolution callback returns a value; what its bool means is
// decided by its callers, which are not visible in this header.
typedef bool (*dup_res_done_cb)(struct rw_request_s* request);
typedef void (*repl_write_done_cb)(struct rw_request_s* request);
typedef void (*repl_ping_done_cb)(struct rw_request_s* request);
typedef void (*timeout_done_cb)(struct rw_request_s* request);
71 | |
// State kept for a transaction while it manages duplicate resolution, replica
// write, and replica ping requests to other nodes, or alternatively a timeout.
// It is reference counted: rw_request_release() uses cf_rc_release() /
// cf_rc_free() on it.
typedef struct rw_request_s {

	//------------------------------------------------------
	// Matches as_transaction.
	//
	// NOTE(review): these leading fields must keep the same order and types as
	// the corresponding fields of as_transaction - presumably so code can treat
	// an rw_request like an as_transaction (compare rw_request_trid() with
	// as_transaction_trid()). Do not reorder or retype them independently.
	//

	struct cl_msg_s* msgp;
	uint32_t msg_fields; // bitmask of AS_MSG_FIELD_BIT_* present in msgp

	uint8_t origin;
	uint8_t from_flags; // e.g. FROM_FLAG_BATCH_SUB

	// Origin-specific handle; which member is valid presumably depends on
	// 'origin' - confirm against as_transaction.
	union {
		void* any;
		struct as_file_handle_s* proto_fd_h;
		cf_node proxy_node;
		struct iudf_origin_s* iudf_orig;
		struct iops_origin_s* iops_orig;
		struct as_batch_shared_s* batch_shared;
	} from;

	// Origin-specific extra data paired with 'from'.
	union {
		uint32_t any;
		uint32_t batch_index;
		uint32_t proxy_tid;
	} from_data;

	cf_digest keyd;

	uint64_t start_time;
	uint64_t benchmark_time;

	as_partition_reservation rsv;

	uint64_t end_time;
	uint8_t result_code;
	uint8_t flags;
	uint16_t generation;
	uint32_t void_time;
	uint64_t last_update_time;

	//
	// End of as_transaction look-alike.
	//------------------------------------------------------

	// NOTE(review): presumably guards the mutable state below - confirm which
	// fields require it.
	cf_mutex lock;

	// Queue of rw_wait_ele_s elements, presumably holding transactions added via
	// rw_request_wait_q_push() / rw_request_wait_q_push_head().
	struct rw_wait_ele_s* wait_queue_head;
	struct rw_wait_ele_s* wait_queue_tail;
	uint32_t wait_queue_depth;

	bool is_set_up; // TODO - redundant with timeout_cb

	// Store pickled data, for use in replica write.
	bool is_old_pickle; // TODO - old pickle - remove in "six months"
	uint8_t* pickle;
	size_t pickle_sz;
	const char* set_name; // points directly into vmap - never free it
	uint32_t set_name_len;
	uint8_t* key;
	uint32_t key_size;

	// Store ops' responses here.
	cf_dyn_buf response_db;

	// Manage responses for duplicate resolution and replica write requests, or
	// alternatively, timeouts.
	uint32_t tid; // presumably matches responses to this request - confirm
	bool dup_res_complete;
	bool repl_write_complete;
	bool repl_ping_complete;
	dup_res_done_cb dup_res_cb;
	repl_write_done_cb repl_write_cb;
	repl_ping_done_cb repl_ping_cb;
	timeout_done_cb timeout_cb;

	// Message being sent to dest_nodes. May be duplicate resolution or replica
	// write request. Message is kept in case it needs to be retransmitted.
	msg* dest_msg;

	uint64_t xmit_ms; // time of next retransmit
	uint32_t retry_interval_ms; // interval to add for next retransmit

	// Destination info for duplicate resolution and replica write requests.
	// Presumably only the first n_dest_nodes entries are used, and
	// dest_complete[i] tracks dest_nodes[i] - confirm.
	uint32_t n_dest_nodes;
	cf_node dest_nodes[AS_CLUSTER_SZ];
	bool dest_complete[AS_CLUSTER_SZ];

	// Best duplicate resolution response message received from nodes with
	// duplicates, plus its result code, generation and (presumably)
	// last-update-time.
	msg* best_dup_msg;
	// TODO - could store best dup node-id - worth it?
	uint8_t best_dup_result_code;
	uint16_t best_dup_gen;
	uint64_t best_dup_lut;

	bool tie_was_replicated; // enterprise only

	// Node health related stat, to track replication latency.
	uint64_t repl_start_us;

} rw_request;
173 | |
174 | |
175 | //========================================================== |
176 | // Public API. |
177 | // |
178 | |
179 | rw_request* rw_request_create(); |
180 | void rw_request_destroy(rw_request* rw); |
181 | void rw_request_wait_q_push(rw_request* rw, struct as_transaction_s* tr); |
182 | void rw_request_wait_q_push_head(rw_request* rw, struct as_transaction_s* tr); |
183 | |
184 | static inline void |
185 | rw_request_hdestroy(void* pv) |
186 | { |
187 | rw_request_destroy((rw_request*)pv); |
188 | } |
189 | |
190 | static inline void |
191 | rw_request_release(rw_request* rw) |
192 | { |
193 | if (cf_rc_release(rw) == 0) { |
194 | rw_request_destroy(rw); |
195 | cf_rc_free(rw); |
196 | } |
197 | } |
198 | |
199 | static inline bool |
200 | rw_request_is_batch_sub(const rw_request* rw) |
201 | { |
202 | return (rw->from_flags & FROM_FLAG_BATCH_SUB) != 0; |
203 | } |
204 | |
205 | // See as_transaction_trid(). |
206 | static inline uint64_t |
207 | rw_request_trid(const rw_request* rw) |
208 | { |
209 | if ((rw->msg_fields & AS_MSG_FIELD_BIT_TRID) == 0) { |
210 | return 0; |
211 | } |
212 | |
213 | as_msg_field *f = as_msg_field_get(&rw->msgp->msg, AS_MSG_FIELD_TYPE_TRID); |
214 | |
215 | return cf_swap_from_be64(*(uint64_t*)f->data); |
216 | } |
217 | |