/*
 * migrate.h
 *
 * Copyright (C) 2008-2014 Aerospike, Inc.
 *
 * Portions may be licensed to Aerospike, Inc. under one or more contributor
 * license agreements.
 *
 * This program is free software: you can redistribute it and/or modify it under
 * the terms of the GNU Affero General Public License as published by the Free
 * Software Foundation, either version 3 of the License, or (at your option) any
 * later version.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
 * FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more
 * details.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see http://www.gnu.org/licenses/
 */
22
23#pragma once
24
25//==========================================================
26// Includes.
27//
28
29#include <stdbool.h>
30#include <stdint.h>
31
32#include "citrusleaf/cf_atomic.h"
33#include "citrusleaf/cf_digest.h"
34#include "citrusleaf/cf_queue.h"
35
36#include "msg.h"
37#include "node.h"
38#include "rchash.h"
39#include "shash.h"
40
41#include "fabric/hb.h"
42#include "fabric/partition.h"
43#include "fabric/partition_balance.h"
44
45
//==========================================================
// Forward declarations.
//

// Incomplete types - this header refers to them only through pointers, so a
// declaration of the tag is all that is needed here.
struct as_index_s;
struct as_index_ref_s;
struct as_namespace_s;
struct as_remote_record_s;
struct meta_in_q_s;
struct meta_out_q_s;
struct pb_task_s;
57
58
//==========================================================
// Typedefs & constants.
//

// For receiver-side migration flow-control.
// TODO - move to namespace? Go even lower than 4?
#define AS_MIGRATE_DEFAULT_MAX_NUM_INCOMING 4
#define AS_MIGRATE_LIMIT_MAX_NUM_INCOMING 256

// Maximum permissible number of migrate xmit threads.
#define MAX_NUM_MIGRATE_XMIT_THREADS 100

// Single-bit uint32_t flags - distinct bits, so they can be OR-ed together.
// NOTE(review): presumably what emigration's tx_flags member holds - confirm.
#define TX_FLAGS_NONE ((uint32_t) 0x0)
#define TX_FLAGS_ACTING_MASTER ((uint32_t) 0x1)
#define TX_FLAGS_LEAD ((uint32_t) 0x2)
#define TX_FLAGS_CONTINGENT ((uint32_t) 0x4)
75
76
//==========================================================
// Public API.
//

// Parameterless functions are declared with (void): before C23 an empty
// parameter list is not a prototype and leaves call arguments unchecked.
void as_migrate_init(void);

// 'task' is read-only to the callee.
void as_migrate_emigrate(const struct pb_task_s *task);

// NOTE(review): presumably bounded by MAX_NUM_MIGRATE_XMIT_THREADS - confirm
// where the limit is enforced.
void as_migrate_set_num_xmit_threads(uint32_t n_threads);

void as_migrate_dump(bool verbose);
85
86
//==========================================================
// Private API - for enterprise separation only.
//

// Field ids of a migrate message.
//
// These values go on the wire, so mind backward compatibility if changing.
// Each id is spelled out explicitly so that inserting, removing or reordering
// an entry cannot silently renumber the ones that follow it. NUM_MIG_FIELDS is
// left implicit on purpose - it must stay one past the last id.
typedef enum {
	MIG_FIELD_OP = 0,
	MIG_FIELD_UNUSED_1 = 1,
	MIG_FIELD_EMIG_ID = 2,
	MIG_FIELD_NAMESPACE = 3,
	MIG_FIELD_PARTITION = 4,
	MIG_FIELD_DIGEST = 5, // TODO - old pickle - deprecate in "six months"
	MIG_FIELD_GENERATION = 6, // TODO - old pickle - deprecate in "six months"
	MIG_FIELD_RECORD = 7,
	MIG_FIELD_CLUSTER_KEY = 8,
	MIG_FIELD_UNUSED_9 = 9,
	MIG_FIELD_VOID_TIME = 10, // TODO - old pickle - deprecate in "six months"
	MIG_FIELD_UNUSED_11 = 11,
	MIG_FIELD_UNUSED_12 = 12,
	MIG_FIELD_INFO = 13,
	MIG_FIELD_UNUSED_14 = 14,
	MIG_FIELD_UNUSED_15 = 15,
	MIG_FIELD_UNUSED_16 = 16,
	MIG_FIELD_UNUSED_17 = 17,
	MIG_FIELD_UNUSED_18 = 18,
	MIG_FIELD_LAST_UPDATE_TIME = 19, // TODO - old pickle - deprecate in "six months"
	MIG_FIELD_FEATURES = 20,
	MIG_FIELD_UNUSED_21 = 21,
	MIG_FIELD_META_RECORDS = 22,
	MIG_FIELD_META_SEQUENCE = 23,
	MIG_FIELD_META_SEQUENCE_FINAL = 24,
	MIG_FIELD_PARTITION_SIZE = 25,
	MIG_FIELD_SET_NAME = 26, // TODO - old pickle - deprecate in "six months"
	MIG_FIELD_KEY = 27, // TODO - old pickle - deprecate in "six months"
	MIG_FIELD_UNUSED_28 = 28,
	MIG_FIELD_EMIG_INSERT_ID = 29,

	NUM_MIG_FIELDS
} migrate_msg_fields;
126
// Migrate operation codes.
// NOTE(review): presumably the values carried in MIG_FIELD_OP, in which case
// they are wire values and must not be renumbered - confirm.
#define OPERATION_UNDEF 0
#define OPERATION_OLD_INSERT 1 // TODO - old pickle - deprecate in "six months"
#define OPERATION_INSERT_ACK 2
#define OPERATION_START 3
#define OPERATION_START_ACK_OK 4
#define OPERATION_START_ACK_EAGAIN 5
#define OPERATION_START_ACK_FAIL 6
#define OPERATION_INSERT 7
#define OPERATION_DONE 8
#define OPERATION_DONE_ACK 9
#define OPERATION_UNUSED_10 10 // deprecated
#define OPERATION_MERGE_META 11
#define OPERATION_MERGE_META_ACK 12
#define OPERATION_ALL_DONE 13
#define OPERATION_ALL_DONE_ACK 14
142
// Per-record info bit flags.
// NOTE(review): presumably the bits returned by emigration_pack_info() and
// carried in MIG_FIELD_INFO - confirm.
#define MIG_INFO_UNUSED_1 0x0001
#define MIG_INFO_UNUSED_2 0x0002
#define MIG_INFO_UNREPLICATED 0x0004 // enterprise only
#define MIG_INFO_TOMBSTONE 0x0008 // enterprise only

// Feature bit flags (unsigned, so the top bit is usable without sign issues).
#define MIG_FEATURE_MERGE 0x00000001U
#define MIG_FEATURES_SEEN 0x80000000U // needed for backward compatibility

// Declared here, defined in a source file outside this header.
extern const uint32_t MY_MIG_FEATURES;
151
152typedef struct emigration_s {
153 cf_node dest;
154 uint64_t cluster_key;
155 uint32_t id;
156 pb_task_type type;
157 uint32_t tx_flags;
158 cf_atomic32 state;
159 bool aborted;
160 bool from_replica;
161 uint64_t wait_until_ms;
162
163 cf_atomic32 bytes_emigrating;
164 cf_shash *reinsert_hash;
165 uint64_t insert_id;
166 cf_queue *ctrl_q;
167 struct meta_in_q_s *meta_q;
168
169 as_partition_reservation rsv;
170} emigration;
171
172typedef struct immigration_s {
173 cf_node src;
174 uint64_t cluster_key;
175 uint32_t pid;
176
177 cf_atomic32 done_recv; // flag - 0 if not yet received, atomic counter for receives
178 uint64_t start_recv_ms; // time the first START event was received
179 uint64_t done_recv_ms; // time the first DONE event was received
180
181 uint32_t emig_id;
182 struct meta_out_q_s *meta_q;
183
184 as_migrate_result start_result;
185 uint32_t features;
186 struct as_namespace_s *ns; // for statistics only
187
188 as_partition_reservation rsv;
189} immigration;
190
191typedef struct immigration_hkey_s {
192 cf_node src;
193 uint32_t emig_id;
194} __attribute__((__packed__)) immigration_hkey;
195
196
197// Globals.
198extern cf_rchash *g_emigration_hash;
199extern cf_rchash *g_immigration_hash;
200extern cf_queue g_emigration_q;
201
202
203// Emigration, immigration, & pickled record destructors.
204void emigration_release(emigration *emig);
205void immigration_release(immigration *immig);
206
207// Emigration.
208void emigrate_fill_queue_init();
209void emigrate_queue_push(emigration *emig);
210bool should_emigrate_record(emigration *emig, struct as_index_ref_s *r_ref);
211uint32_t emigration_pack_info(const emigration *emig, const struct as_index_s *r);
212
213// Migrate fabric message handling.
214void emigration_handle_meta_batch_request(cf_node src, msg *m);
215bool immigration_ignore_pickle(const uint8_t *buf, uint32_t info);
216void immigration_init_repl_state(struct as_remote_record_s* rr, uint32_t info);
217void immigration_handle_meta_batch_ack(cf_node src, msg *m);
218
219// Meta sender.
220bool immigration_start_meta_sender(immigration *immig, uint32_t emig_features, uint64_t emig_n_recs);
221