1/* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2// vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
3/*======
4This file is part of PerconaFT.
5
6
7Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved.
8
9 PerconaFT is free software: you can redistribute it and/or modify
10 it under the terms of the GNU General Public License, version 2,
11 as published by the Free Software Foundation.
12
13 PerconaFT is distributed in the hope that it will be useful,
14 but WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 GNU General Public License for more details.
17
18 You should have received a copy of the GNU General Public License
19 along with PerconaFT. If not, see <http://www.gnu.org/licenses/>.
20
21----------------------------------------
22
23 PerconaFT is free software: you can redistribute it and/or modify
24 it under the terms of the GNU Affero General Public License, version 3,
25 as published by the Free Software Foundation.
26
27 PerconaFT is distributed in the hope that it will be useful,
28 but WITHOUT ANY WARRANTY; without even the implied warranty of
29 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
30 GNU Affero General Public License for more details.
31
32 You should have received a copy of the GNU Affero General Public License
33 along with PerconaFT. If not, see <http://www.gnu.org/licenses/>.
34======= */
35
36#ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved."
37
#include "ft/msg_buffer.h"

#include <climits>

#include "util/dbt.h"
40
41void message_buffer::create() {
42 _num_entries = 0;
43 _memory = nullptr;
44 _memory_usable = 0;
45 _memory_size = 0;
46 _memory_used = 0;
47}
48
49void message_buffer::clone(message_buffer *src) {
50 _num_entries = src->_num_entries;
51 _memory_used = src->_memory_used;
52 _memory_size = src->_memory_size;
53 XMALLOC_N(_memory_size, _memory);
54 memcpy(_memory, src->_memory, _memory_size);
55 _memory_usable = toku_malloc_usable_size(_memory);
56}
57
58void message_buffer::destroy() {
59 if (_memory != nullptr) {
60 toku_free(_memory);
61 _memory_usable = 0;
62 }
63}
64
65void message_buffer::deserialize_from_rbuf(struct rbuf *rb,
66 int32_t **fresh_offsets, int32_t *nfresh,
67 int32_t **stale_offsets, int32_t *nstale,
68 int32_t **broadcast_offsets, int32_t *nbroadcast) {
69 // read the number of messages in this buffer
70 int n_in_this_buffer = rbuf_int(rb);
71 if (fresh_offsets != nullptr) {
72 XMALLOC_N(n_in_this_buffer, *fresh_offsets);
73 }
74 if (stale_offsets != nullptr) {
75 XMALLOC_N(n_in_this_buffer, *stale_offsets);
76 }
77 if (broadcast_offsets != nullptr) {
78 XMALLOC_N(n_in_this_buffer, *broadcast_offsets);
79 }
80
81 _resize(rb->size + 64); // rb->size is a good hint for how big the buffer will be
82
83 // deserialize each message individually, noting whether it was fresh
84 // and putting its buffer offset in the appropriate offsets array
85 for (int i = 0; i < n_in_this_buffer; i++) {
86 XIDS xids;
87 bool is_fresh;
88 const ft_msg msg = ft_msg::deserialize_from_rbuf(rb, &xids, &is_fresh);
89
90 int32_t *dest;
91 if (ft_msg_type_applies_once(msg.type())) {
92 if (is_fresh) {
93 dest = fresh_offsets ? *fresh_offsets + (*nfresh)++ : nullptr;
94 } else {
95 dest = stale_offsets ? *stale_offsets + (*nstale)++ : nullptr;
96 }
97 } else {
98 invariant(ft_msg_type_applies_all(msg.type()) || ft_msg_type_does_nothing(msg.type()));
99 dest = broadcast_offsets ? *broadcast_offsets + (*nbroadcast)++ : nullptr;
100 }
101
102 enqueue(msg, is_fresh, dest);
103 toku_xids_destroy(&xids);
104 }
105
106 invariant(_num_entries == n_in_this_buffer);
107}
108
109MSN message_buffer::deserialize_from_rbuf_v13(struct rbuf *rb,
110 MSN *highest_unused_msn_for_upgrade,
111 int32_t **fresh_offsets, int32_t *nfresh,
112 int32_t **broadcast_offsets, int32_t *nbroadcast) {
113 // read the number of messages in this buffer
114 int n_in_this_buffer = rbuf_int(rb);
115 if (fresh_offsets != nullptr) {
116 XMALLOC_N(n_in_this_buffer, *fresh_offsets);
117 }
118 if (broadcast_offsets != nullptr) {
119 XMALLOC_N(n_in_this_buffer, *broadcast_offsets);
120 }
121
122 // Atomically decrement the header's MSN count by the number
123 // of messages in the buffer.
124 MSN highest_msn_in_this_buffer = {
125 .msn = toku_sync_sub_and_fetch(&highest_unused_msn_for_upgrade->msn, n_in_this_buffer)
126 };
127
128 // Create the message buffers from the deserialized buffer.
129 for (int i = 0; i < n_in_this_buffer; i++) {
130 XIDS xids;
131 // There were no stale messages at this version, so call it fresh.
132 const bool is_fresh = true;
133
134 // Increment our MSN, the last message should have the
135 // newest/highest MSN. See above for a full explanation.
136 highest_msn_in_this_buffer.msn++;
137 const ft_msg msg = ft_msg::deserialize_from_rbuf_v13(rb, highest_msn_in_this_buffer, &xids);
138
139 int32_t *dest;
140 if (ft_msg_type_applies_once(msg.type())) {
141 dest = fresh_offsets ? *fresh_offsets + (*nfresh)++ : nullptr;
142 } else {
143 invariant(ft_msg_type_applies_all(msg.type()) || ft_msg_type_does_nothing(msg.type()));
144 dest = broadcast_offsets ? *broadcast_offsets + (*nbroadcast)++ : nullptr;
145 }
146
147 enqueue(msg, is_fresh, dest);
148 toku_xids_destroy(&xids);
149 }
150
151 return highest_msn_in_this_buffer;
152}
153
154void message_buffer::_resize(size_t new_size) {
155 XREALLOC_N(new_size, _memory);
156 _memory_size = new_size;
157 _memory_usable = toku_malloc_usable_size(_memory);
158}
159
// Returns the smallest power of two that is >= n, never less than 4096 (the
// minimum capacity used when growing a message buffer). Asserts if no such
// power of two fits in an int.
static int next_power_of_two(int n) {
    int r = 4096;
    while (r < n) {
        // Check BEFORE doubling. Signed overflow is undefined behavior, so the
        // old "assert(r > 0)" after the multiply could be optimized away and
        // never fire.
        assert(r <= INT_MAX / 2);
        r *= 2;
    }
    return r;
}
168
169struct message_buffer::buffer_entry *message_buffer::get_buffer_entry(int32_t offset) const {
170 return (struct buffer_entry *) (_memory + offset);
171}
172
173void message_buffer::enqueue(const ft_msg &msg, bool is_fresh, int32_t *offset) {
174 int need_space_here = msg_memsize_in_buffer(msg);
175 int need_space_total = _memory_used + need_space_here;
176 if (_memory == nullptr || need_space_total > _memory_size) {
177 // resize the buffer to the next power of 2 greater than the needed space
178 int next_2 = next_power_of_two(need_space_total);
179 _resize(next_2);
180 }
181 uint32_t keylen = msg.kdbt()->size;
182 uint32_t datalen = msg.vdbt()->size;
183 struct buffer_entry *entry = get_buffer_entry(_memory_used);
184 entry->type = (unsigned char) msg.type();
185 entry->msn = msg.msn();
186 toku_xids_cpy(&entry->xids_s, msg.xids());
187 entry->is_fresh = is_fresh;
188 unsigned char *e_key = toku_xids_get_end_of_array(&entry->xids_s);
189 entry->keylen = keylen;
190 memcpy(e_key, msg.kdbt()->data, keylen);
191 entry->vallen = datalen;
192 memcpy(e_key + keylen, msg.vdbt()->data, datalen);
193 if (offset) {
194 *offset = _memory_used;
195 }
196 _num_entries++;
197 _memory_used += need_space_here;
198}
199
200void message_buffer::set_freshness(int32_t offset, bool is_fresh) {
201 struct buffer_entry *entry = get_buffer_entry(offset);
202 entry->is_fresh = is_fresh;
203}
204
205bool message_buffer::get_freshness(int32_t offset) const {
206 struct buffer_entry *entry = get_buffer_entry(offset);
207 return entry->is_fresh;
208}
209
210ft_msg message_buffer::get_message(int32_t offset, DBT *keydbt, DBT *valdbt) const {
211 struct buffer_entry *entry = get_buffer_entry(offset);
212 uint32_t keylen = entry->keylen;
213 uint32_t vallen = entry->vallen;
214 enum ft_msg_type type = (enum ft_msg_type) entry->type;
215 MSN msn = entry->msn;
216 const XIDS xids = (XIDS) &entry->xids_s;
217 const void *key = toku_xids_get_end_of_array(xids);
218 const void *val = (uint8_t *) key + entry->keylen;
219 return ft_msg(toku_fill_dbt(keydbt, key, keylen), toku_fill_dbt(valdbt, val, vallen), type, msn, xids);
220}
221
222void message_buffer::get_message_key_msn(int32_t offset, DBT *key, MSN *msn) const {
223 struct buffer_entry *entry = get_buffer_entry(offset);
224 if (key != nullptr) {
225 toku_fill_dbt(key, toku_xids_get_end_of_array((XIDS) &entry->xids_s), entry->keylen);
226 }
227 if (msn != nullptr) {
228 *msn = entry->msn;
229 }
230}
231
232int message_buffer::num_entries() const {
233 return _num_entries;
234}
235
236size_t message_buffer::buffer_size_in_use() const {
237 return _memory_used;
238}
239
240size_t message_buffer::memory_size_in_use() const {
241 return sizeof(*this) + _memory_used;
242}
243
244size_t message_buffer::memory_footprint() const {
245#ifdef TOKU_DEBUG_PARANOID
246 // Enable this code if you want to verify that the new way of computing
247 // the memory footprint is the same as the old.
248 // It slows the code down by perhaps 10%.
249 assert(_memory_usable == toku_malloc_usable_size(_memory));
250 size_t fp = toku_memory_footprint(_memory, _memory_used);
251 size_t fpg = toku_memory_footprint_given_usable_size(_memory_used, _memory_usable);
252 if (fp != fpg) printf("ptr=%p mu=%ld fp=%ld fpg=%ld\n", _memory, _memory_usable, fp, fpg);
253 assert(fp == fpg);
254#endif // TOKU_DEBUG_PARANOID
255 return sizeof(*this) + toku_memory_footprint_given_usable_size(_memory_used, _memory_usable);
256}
257
258bool message_buffer::equals(message_buffer *other) const {
259 return (_memory_used == other->_memory_used &&
260 memcmp(_memory, other->_memory, _memory_used) == 0);
261}
262
263void message_buffer::serialize_to_wbuf(struct wbuf *wb) const {
264 wbuf_nocrc_int(wb, _num_entries);
265 struct msg_serialize_fn {
266 struct wbuf *wb;
267 msg_serialize_fn(struct wbuf *w) : wb(w) { }
268 int operator()(const ft_msg &msg, bool is_fresh) {
269 msg.serialize_to_wbuf(wb, is_fresh);
270 return 0;
271 }
272 } serialize_fn(wb);
273 iterate(serialize_fn);
274}
275//void static stats(struct wbuf *wb) const {
276// wbuf_nocrc_int(wb, _num_entries);
277// struct msg_serialize_fn {
278// struct wbuf *wb;
279// msg_serialize_fn(struct wbuf *w) : wb(w) { }
280// int operator()(const ft_msg &msg, bool is_fresh) {
281// msg.serialize_to_wbuf(wb, is_fresh);
282// return 0;
283// }
284// } serialize_fn(wb);
285// iterate(serialize_fn);
286//}
287size_t message_buffer::msg_memsize_in_buffer(const ft_msg &msg) {
288 const uint32_t keylen = msg.kdbt()->size;
289 const uint32_t datalen = msg.vdbt()->size;
290 const size_t xidslen = toku_xids_get_size(msg.xids());
291 return sizeof(struct buffer_entry) + keylen + datalen + xidslen - sizeof(XIDS_S);
292}
293