1/**************************************************************************/
2/* scene_replication_interface.cpp */
3/**************************************************************************/
4/* This file is part of: */
5/* GODOT ENGINE */
6/* https://godotengine.org */
7/**************************************************************************/
8/* Copyright (c) 2014-present Godot Engine contributors (see AUTHORS.md). */
9/* Copyright (c) 2007-2014 Juan Linietsky, Ariel Manzur. */
10/* */
11/* Permission is hereby granted, free of charge, to any person obtaining */
12/* a copy of this software and associated documentation files (the */
13/* "Software"), to deal in the Software without restriction, including */
14/* without limitation the rights to use, copy, modify, merge, publish, */
15/* distribute, sublicense, and/or sell copies of the Software, and to */
16/* permit persons to whom the Software is furnished to do so, subject to */
17/* the following conditions: */
18/* */
19/* The above copyright notice and this permission notice shall be */
20/* included in all copies or substantial portions of the Software. */
21/* */
22/* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, */
23/* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF */
24/* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. */
25/* IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY */
26/* CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, */
27/* TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE */
28/* SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. */
29/**************************************************************************/
30
31#include "scene_replication_interface.h"
32
33#include "scene_multiplayer.h"
34
35#include "core/debugger/engine_debugger.h"
36#include "core/io/marshalls.h"
37#include "scene/main/node.h"
38#include "scene/scene_string_names.h"
39
// Grows `packet_cache` (resolved at the expansion site) so that it holds at least
// `m_amount` bytes. The cache is never shrunk.
// The body is wrapped in `do { } while (0)` so the macro expands to exactly one
// statement: a bare `if` would silently capture a following `else` and would not
// compose with an unbraced `if`/`else` at the call site. The argument is
// parenthesized so compound expressions compare as a whole.
#define MAKE_ROOM(m_amount)                     \
	do {                                        \
		if (packet_cache.size() < (m_amount)) { \
			packet_cache.resize(m_amount);      \
		}                                       \
	} while (0)
43
#ifdef DEBUG_ENABLED
// Pushes one `[p_what, p_id, p_size]` sample to the "multiplayer:replication" profiler.
// Does nothing when that profiler is not active.
_FORCE_INLINE_ void SceneReplicationInterface::_profile_node_data(const String &p_what, ObjectID p_id, int p_size) {
	if (!EngineDebugger::is_profiling("multiplayer:replication")) {
		return;
	}
	Array frame_data;
	frame_data.push_back(p_what);
	frame_data.push_back(p_id);
	frame_data.push_back(p_size);
	EngineDebugger::profiler_add_frame_data("multiplayer:replication", frame_data);
}
#endif
55
56SceneReplicationInterface::TrackedNode &SceneReplicationInterface::_track(const ObjectID &p_id) {
57 if (!tracked_nodes.has(p_id)) {
58 tracked_nodes[p_id] = TrackedNode(p_id);
59 Node *node = get_id_as<Node>(p_id);
60 node->connect(SceneStringNames::get_singleton()->tree_exited, callable_mp(this, &SceneReplicationInterface::_untrack).bind(p_id), Node::CONNECT_ONE_SHOT);
61 }
62 return tracked_nodes[p_id];
63}
64
65void SceneReplicationInterface::_untrack(const ObjectID &p_id) {
66 if (!tracked_nodes.has(p_id)) {
67 return;
68 }
69 uint32_t net_id = tracked_nodes[p_id].net_id;
70 uint32_t peer = tracked_nodes[p_id].remote_peer;
71 tracked_nodes.erase(p_id);
72 // If it was spawned by a remote, remove it from the received nodes.
73 if (peer && peers_info.has(peer)) {
74 peers_info[peer].recv_nodes.erase(net_id);
75 }
76 // If we spawned or synced it, we need to remove it from any peer it was sent to.
77 if (net_id || peer == 0) {
78 for (KeyValue<int, PeerInfo> &E : peers_info) {
79 E.value.spawn_nodes.erase(p_id);
80 }
81 }
82}
83
84void SceneReplicationInterface::_free_remotes(const PeerInfo &p_info) {
85 for (const KeyValue<uint32_t, ObjectID> &E : p_info.recv_nodes) {
86 Node *node = tracked_nodes.has(E.value) ? get_id_as<Node>(E.value) : nullptr;
87 ERR_CONTINUE(!node);
88 node->queue_free();
89 }
90}
91
92void SceneReplicationInterface::on_peer_change(int p_id, bool p_connected) {
93 if (p_connected) {
94 peers_info[p_id] = PeerInfo();
95 for (const ObjectID &oid : spawned_nodes) {
96 _update_spawn_visibility(p_id, oid);
97 }
98 for (const ObjectID &oid : sync_nodes) {
99 _update_sync_visibility(p_id, get_id_as<MultiplayerSynchronizer>(oid));
100 }
101 } else {
102 ERR_FAIL_COND(!peers_info.has(p_id));
103 _free_remotes(peers_info[p_id]);
104 peers_info.erase(p_id);
105 }
106}
107
108void SceneReplicationInterface::on_reset() {
109 for (const KeyValue<int, PeerInfo> &E : peers_info) {
110 _free_remotes(E.value);
111 }
112 peers_info.clear();
113 // Tracked nodes are cleared on deletion, here we only reset the ids so they can be later re-assigned.
114 for (KeyValue<ObjectID, TrackedNode> &E : tracked_nodes) {
115 TrackedNode &tobj = E.value;
116 tobj.net_id = 0;
117 tobj.remote_peer = 0;
118 }
119 for (const ObjectID &oid : sync_nodes) {
120 MultiplayerSynchronizer *sync = get_id_as<MultiplayerSynchronizer>(oid);
121 ERR_CONTINUE(!sync);
122 sync->reset();
123 }
124 last_net_id = 0;
125}
126
127void SceneReplicationInterface::on_network_process() {
128 // Prevent endless stalling in case of unforeseen spawn errors.
129 if (spawn_queue.size()) {
130 ERR_PRINT("An error happened during last spawn, this usually means the 'ready' signal was not emitted by the spawned node.");
131 for (const ObjectID &oid : spawn_queue) {
132 Node *node = get_id_as<Node>(oid);
133 ERR_CONTINUE(!node);
134 if (node->is_connected(SceneStringNames::get_singleton()->ready, callable_mp(this, &SceneReplicationInterface::_node_ready))) {
135 node->disconnect(SceneStringNames::get_singleton()->ready, callable_mp(this, &SceneReplicationInterface::_node_ready));
136 }
137 }
138 spawn_queue.clear();
139 }
140
141 // Process syncs.
142 uint64_t usec = OS::get_singleton()->get_ticks_usec();
143 for (KeyValue<int, PeerInfo> &E : peers_info) {
144 const HashSet<ObjectID> to_sync = E.value.sync_nodes;
145 if (to_sync.is_empty()) {
146 continue; // Nothing to sync
147 }
148 uint16_t sync_net_time = ++E.value.last_sent_sync;
149 _send_sync(E.key, to_sync, sync_net_time, usec);
150 _send_delta(E.key, to_sync, usec, E.value.last_watch_usecs);
151 }
152}
153
154Error SceneReplicationInterface::on_spawn(Object *p_obj, Variant p_config) {
155 Node *node = Object::cast_to<Node>(p_obj);
156 ERR_FAIL_COND_V(!node || p_config.get_type() != Variant::OBJECT, ERR_INVALID_PARAMETER);
157 MultiplayerSpawner *spawner = Object::cast_to<MultiplayerSpawner>(p_config.get_validated_object());
158 ERR_FAIL_COND_V(!spawner, ERR_INVALID_PARAMETER);
159 // Track node.
160 const ObjectID oid = node->get_instance_id();
161 TrackedNode &tobj = _track(oid);
162
163 // Spawn state needs to be callected after "ready", but the spawn order follows "enter_tree".
164 ERR_FAIL_COND_V(tobj.spawner != ObjectID(), ERR_ALREADY_IN_USE);
165 tobj.spawner = spawner->get_instance_id();
166 spawn_queue.insert(oid);
167 node->connect(SceneStringNames::get_singleton()->ready, callable_mp(this, &SceneReplicationInterface::_node_ready).bind(oid), Node::CONNECT_ONE_SHOT);
168 return OK;
169}
170
171void SceneReplicationInterface::_node_ready(const ObjectID &p_oid) {
172 ERR_FAIL_COND(!spawn_queue.has(p_oid)); // Bug.
173
174 // If we are a nested spawn, we need to wait until the parent is ready.
175 if (p_oid != *(spawn_queue.begin())) {
176 return;
177 }
178
179 for (const ObjectID &oid : spawn_queue) {
180 ERR_CONTINUE(!tracked_nodes.has(oid));
181
182 TrackedNode &tobj = tracked_nodes[oid];
183 MultiplayerSpawner *spawner = get_id_as<MultiplayerSpawner>(tobj.spawner);
184 ERR_CONTINUE(!spawner);
185
186 spawned_nodes.insert(oid);
187 if (multiplayer->has_multiplayer_peer() && spawner->is_multiplayer_authority()) {
188 if (tobj.net_id == 0) {
189 tobj.net_id = ++last_net_id;
190 }
191 _update_spawn_visibility(0, oid);
192 }
193 }
194 spawn_queue.clear();
195}
196
197Error SceneReplicationInterface::on_despawn(Object *p_obj, Variant p_config) {
198 Node *node = Object::cast_to<Node>(p_obj);
199 ERR_FAIL_COND_V(!node || p_config.get_type() != Variant::OBJECT, ERR_INVALID_PARAMETER);
200 MultiplayerSpawner *spawner = Object::cast_to<MultiplayerSpawner>(p_config.get_validated_object());
201 ERR_FAIL_COND_V(!p_obj || !spawner, ERR_INVALID_PARAMETER);
202 // Forcibly despawn to all peers that knowns me.
203 int len = 0;
204 Error err = _make_despawn_packet(node, len);
205 ERR_FAIL_COND_V(err != OK, ERR_BUG);
206 const ObjectID oid = p_obj->get_instance_id();
207 for (const KeyValue<int, PeerInfo> &E : peers_info) {
208 if (!E.value.spawn_nodes.has(oid)) {
209 continue;
210 }
211 _send_raw(packet_cache.ptr(), len, E.key, true);
212 }
213 // Also remove spawner tracking from the replication state.
214 ERR_FAIL_COND_V(!tracked_nodes.has(oid), ERR_INVALID_PARAMETER);
215 TrackedNode &tobj = _track(oid);
216 ERR_FAIL_COND_V(tobj.spawner != spawner->get_instance_id(), ERR_INVALID_PARAMETER);
217 tobj.spawner = ObjectID();
218 spawned_nodes.erase(oid);
219 for (KeyValue<int, PeerInfo> &E : peers_info) {
220 E.value.spawn_nodes.erase(oid);
221 }
222 return OK;
223}
224
225Error SceneReplicationInterface::on_replication_start(Object *p_obj, Variant p_config) {
226 Node *node = Object::cast_to<Node>(p_obj);
227 ERR_FAIL_COND_V(!node || p_config.get_type() != Variant::OBJECT, ERR_INVALID_PARAMETER);
228 MultiplayerSynchronizer *sync = Object::cast_to<MultiplayerSynchronizer>(p_config.get_validated_object());
229 ERR_FAIL_COND_V(!sync, ERR_INVALID_PARAMETER);
230
231 // Add to synchronizer list.
232 TrackedNode &tobj = _track(p_obj->get_instance_id());
233 const ObjectID sid = sync->get_instance_id();
234 tobj.synchronizers.insert(sid);
235 sync_nodes.insert(sid);
236
237 // Update visibility.
238 sync->connect("visibility_changed", callable_mp(this, &SceneReplicationInterface::_visibility_changed).bind(sync->get_instance_id()));
239 _update_sync_visibility(0, sync);
240
241 if (pending_spawn == p_obj->get_instance_id() && sync->get_multiplayer_authority() == pending_spawn_remote) {
242 // Try to apply synchronizer Net ID
243 ERR_FAIL_COND_V_MSG(pending_sync_net_ids.is_empty(), ERR_INVALID_DATA, vformat("The MultiplayerSynchronizer at path \"%s\" is unable to process the pending spawn since it has no network ID. This might happen when changing the multiplayer authority during the \"_ready\" callback. Make sure to only change the authority of multiplayer synchronizers during \"_enter_tree\" or the \"_spawn_custom\" callback of their multiplayer spawner.", sync->get_path()));
244 ERR_FAIL_COND_V(!peers_info.has(pending_spawn_remote), ERR_INVALID_DATA);
245 uint32_t net_id = pending_sync_net_ids[0];
246 pending_sync_net_ids.pop_front();
247 peers_info[pending_spawn_remote].recv_sync_ids[net_id] = sync->get_instance_id();
248
249 // Try to apply spawn state (before ready).
250 if (pending_buffer_size > 0) {
251 ERR_FAIL_COND_V(!node || sync->get_replication_config().is_null(), ERR_UNCONFIGURED);
252 int consumed = 0;
253 const List<NodePath> props = sync->get_replication_config()->get_spawn_properties();
254 Vector<Variant> vars;
255 vars.resize(props.size());
256 Error err = MultiplayerAPI::decode_and_decompress_variants(vars, pending_buffer, pending_buffer_size, consumed);
257 ERR_FAIL_COND_V(err, err);
258 if (consumed > 0) {
259 pending_buffer += consumed;
260 pending_buffer_size -= consumed;
261 err = MultiplayerSynchronizer::set_state(props, node, vars);
262 ERR_FAIL_COND_V(err, err);
263 }
264 }
265 }
266 return OK;
267}
268
269Error SceneReplicationInterface::on_replication_stop(Object *p_obj, Variant p_config) {
270 Node *node = Object::cast_to<Node>(p_obj);
271 ERR_FAIL_COND_V(!node || p_config.get_type() != Variant::OBJECT, ERR_INVALID_PARAMETER);
272 MultiplayerSynchronizer *sync = Object::cast_to<MultiplayerSynchronizer>(p_config.get_validated_object());
273 ERR_FAIL_COND_V(!sync, ERR_INVALID_PARAMETER);
274 sync->disconnect("visibility_changed", callable_mp(this, &SceneReplicationInterface::_visibility_changed));
275 // Untrack synchronizer.
276 const ObjectID oid = node->get_instance_id();
277 const ObjectID sid = sync->get_instance_id();
278 ERR_FAIL_COND_V(!tracked_nodes.has(oid), ERR_INVALID_PARAMETER);
279 TrackedNode &tobj = _track(oid);
280 tobj.synchronizers.erase(sid);
281 sync_nodes.erase(sid);
282 for (KeyValue<int, PeerInfo> &E : peers_info) {
283 E.value.sync_nodes.erase(sid);
284 E.value.last_watch_usecs.erase(sid);
285 if (sync->get_net_id()) {
286 E.value.recv_sync_ids.erase(sync->get_net_id());
287 }
288 }
289 return OK;
290}
291
292void SceneReplicationInterface::_visibility_changed(int p_peer, ObjectID p_sid) {
293 MultiplayerSynchronizer *sync = get_id_as<MultiplayerSynchronizer>(p_sid);
294 ERR_FAIL_COND(!sync); // Bug.
295 Node *node = sync->get_root_node();
296 ERR_FAIL_COND(!node); // Bug.
297 const ObjectID oid = node->get_instance_id();
298 if (spawned_nodes.has(oid) && p_peer != multiplayer->get_unique_id()) {
299 _update_spawn_visibility(p_peer, oid);
300 }
301 _update_sync_visibility(p_peer, sync);
302}
303
304bool SceneReplicationInterface::is_rpc_visible(const ObjectID &p_oid, int p_peer) const {
305 if (!tracked_nodes.has(p_oid)) {
306 return true; // Untracked nodes are always visible to RPCs.
307 }
308 ERR_FAIL_COND_V(p_peer < 0, false);
309 const TrackedNode &tnode = tracked_nodes[p_oid];
310 if (tnode.synchronizers.is_empty()) {
311 return true; // No synchronizers means no visibility restrictions.
312 }
313 if (tnode.remote_peer && uint32_t(p_peer) == tnode.remote_peer) {
314 return true; // RPCs on spawned nodes are always visible to spawner.
315 } else if (spawned_nodes.has(p_oid)) {
316 // It's a spawned node we control, this can be fast.
317 if (p_peer) {
318 return peers_info.has(p_peer) && peers_info[p_peer].spawn_nodes.has(p_oid);
319 } else {
320 for (const KeyValue<int, PeerInfo> &E : peers_info) {
321 if (!E.value.spawn_nodes.has(p_oid)) {
322 return false; // Not public.
323 }
324 }
325 return true; // All peers have this node.
326 }
327 } else {
328 // Cycle object synchronizers to check visibility.
329 for (const ObjectID &sid : tnode.synchronizers) {
330 MultiplayerSynchronizer *sync = get_id_as<MultiplayerSynchronizer>(sid);
331 ERR_CONTINUE(!sync);
332 // RPC visibility is composed using OR when multiple synchronizers are present.
333 // Note that we don't really care about authority here which may lead to unexpected
334 // results when using multiple synchronizers to control the same node.
335 if (sync->is_visible_to(p_peer)) {
336 return true;
337 }
338 }
339 return false; // Not visible.
340 }
341}
342
343Error SceneReplicationInterface::_update_sync_visibility(int p_peer, MultiplayerSynchronizer *p_sync) {
344 ERR_FAIL_COND_V(!p_sync, ERR_BUG);
345 if (!multiplayer->has_multiplayer_peer() || !p_sync->is_multiplayer_authority() || p_peer == multiplayer->get_unique_id()) {
346 return OK;
347 }
348
349 const ObjectID &sid = p_sync->get_instance_id();
350 bool is_visible = p_sync->is_visible_to(p_peer);
351 if (p_peer == 0) {
352 for (KeyValue<int, PeerInfo> &E : peers_info) {
353 // Might be visible to this specific peer.
354 bool is_visible_to_peer = is_visible || p_sync->is_visible_to(E.key);
355 if (is_visible_to_peer == E.value.sync_nodes.has(sid)) {
356 continue;
357 }
358 if (is_visible_to_peer) {
359 E.value.sync_nodes.insert(sid);
360 } else {
361 E.value.sync_nodes.erase(sid);
362 E.value.last_watch_usecs.erase(sid);
363 }
364 }
365 return OK;
366 } else {
367 ERR_FAIL_COND_V(!peers_info.has(p_peer), ERR_INVALID_PARAMETER);
368 if (is_visible == peers_info[p_peer].sync_nodes.has(sid)) {
369 return OK;
370 }
371 if (is_visible) {
372 peers_info[p_peer].sync_nodes.insert(sid);
373 } else {
374 peers_info[p_peer].sync_nodes.erase(sid);
375 peers_info[p_peer].last_watch_usecs.erase(sid);
376 }
377 return OK;
378 }
379}
380
381Error SceneReplicationInterface::_update_spawn_visibility(int p_peer, const ObjectID &p_oid) {
382 const TrackedNode *tnode = tracked_nodes.getptr(p_oid);
383 ERR_FAIL_COND_V(!tnode, ERR_BUG);
384 MultiplayerSpawner *spawner = get_id_as<MultiplayerSpawner>(tnode->spawner);
385 Node *node = get_id_as<Node>(p_oid);
386 ERR_FAIL_COND_V(!node || !spawner || !spawner->is_multiplayer_authority(), ERR_BUG);
387 ERR_FAIL_COND_V(!tracked_nodes.has(p_oid), ERR_BUG);
388 const HashSet<ObjectID> synchronizers = tracked_nodes[p_oid].synchronizers;
389 bool is_visible = true;
390 for (const ObjectID &sid : synchronizers) {
391 MultiplayerSynchronizer *sync = get_id_as<MultiplayerSynchronizer>(sid);
392 ERR_CONTINUE(!sync);
393 if (!sync->is_multiplayer_authority()) {
394 continue;
395 }
396 // Spawn visibility is composed using OR when multiple synchronizers are present.
397 if (sync->is_visible_to(p_peer)) {
398 is_visible = true;
399 break;
400 }
401 is_visible = false;
402 }
403 // Spawn (and despawn) when needed.
404 HashSet<int> to_spawn;
405 HashSet<int> to_despawn;
406 if (p_peer) {
407 ERR_FAIL_COND_V(!peers_info.has(p_peer), ERR_INVALID_PARAMETER);
408 if (is_visible == peers_info[p_peer].spawn_nodes.has(p_oid)) {
409 return OK;
410 }
411 if (is_visible) {
412 to_spawn.insert(p_peer);
413 } else {
414 to_despawn.insert(p_peer);
415 }
416 } else {
417 // Check visibility for each peers.
418 for (const KeyValue<int, PeerInfo> &E : peers_info) {
419 if (is_visible) {
420 // This is fast, since the the object is visible to everyone, we don't need to check each peer.
421 if (E.value.spawn_nodes.has(p_oid)) {
422 // Already spawned.
423 continue;
424 }
425 to_spawn.insert(E.key);
426 } else {
427 // Need to check visibility for each peer.
428 _update_spawn_visibility(E.key, p_oid);
429 }
430 }
431 }
432 if (to_spawn.size()) {
433 int len = 0;
434 _make_spawn_packet(node, spawner, len);
435 for (int pid : to_spawn) {
436 ERR_CONTINUE(!peers_info.has(pid));
437 int path_id;
438 multiplayer->get_path_cache()->send_object_cache(spawner, pid, path_id);
439 _send_raw(packet_cache.ptr(), len, pid, true);
440 peers_info[pid].spawn_nodes.insert(p_oid);
441 }
442 }
443 if (to_despawn.size()) {
444 int len = 0;
445 _make_despawn_packet(node, len);
446 for (int pid : to_despawn) {
447 ERR_CONTINUE(!peers_info.has(pid));
448 peers_info[pid].spawn_nodes.erase(p_oid);
449 _send_raw(packet_cache.ptr(), len, pid, true);
450 }
451 }
452 return OK;
453}
454
455Error SceneReplicationInterface::_send_raw(const uint8_t *p_buffer, int p_size, int p_peer, bool p_reliable) {
456 ERR_FAIL_COND_V(!p_buffer || p_size < 1, ERR_INVALID_PARAMETER);
457 ERR_FAIL_COND_V(!multiplayer->has_multiplayer_peer(), ERR_UNCONFIGURED);
458
459 Ref<MultiplayerPeer> peer = multiplayer->get_multiplayer_peer();
460 peer->set_transfer_channel(0);
461 peer->set_transfer_mode(p_reliable ? MultiplayerPeer::TRANSFER_MODE_RELIABLE : MultiplayerPeer::TRANSFER_MODE_UNRELIABLE);
462 return multiplayer->send_command(p_peer, p_buffer, p_size);
463}
464
465Error SceneReplicationInterface::_make_spawn_packet(Node *p_node, MultiplayerSpawner *p_spawner, int &r_len) {
466 ERR_FAIL_COND_V(!multiplayer || !p_node || !p_spawner, ERR_BUG);
467
468 const ObjectID oid = p_node->get_instance_id();
469 const TrackedNode *tnode = tracked_nodes.getptr(oid);
470 ERR_FAIL_COND_V(!tnode, ERR_INVALID_PARAMETER);
471
472 uint32_t nid = tnode->net_id;
473 ERR_FAIL_COND_V(!nid, ERR_UNCONFIGURED);
474
475 // Prepare custom arg and scene_id
476 uint8_t scene_id = p_spawner->find_spawnable_scene_index_from_object(oid);
477 bool is_custom = scene_id == MultiplayerSpawner::INVALID_ID;
478 Variant spawn_arg = p_spawner->get_spawn_argument(oid);
479 int spawn_arg_size = 0;
480 if (is_custom) {
481 Error err = MultiplayerAPI::encode_and_compress_variant(spawn_arg, nullptr, spawn_arg_size, false);
482 ERR_FAIL_COND_V(err, err);
483 }
484
485 // Prepare spawn state.
486 List<NodePath> state_props;
487 List<uint32_t> sync_ids;
488 const HashSet<ObjectID> synchronizers = tnode->synchronizers;
489 for (const ObjectID &sid : synchronizers) {
490 MultiplayerSynchronizer *sync = get_id_as<MultiplayerSynchronizer>(sid);
491 if (!sync->is_multiplayer_authority()) {
492 continue;
493 }
494 ERR_CONTINUE(!sync);
495 ERR_FAIL_COND_V(sync->get_replication_config().is_null(), ERR_BUG);
496 for (const NodePath &prop : sync->get_replication_config()->get_spawn_properties()) {
497 state_props.push_back(prop);
498 }
499 // Ensure the synchronizer has an ID.
500 if (sync->get_net_id() == 0) {
501 sync->set_net_id(++last_net_id);
502 }
503 sync_ids.push_back(sync->get_net_id());
504 }
505 int state_size = 0;
506 Vector<Variant> state_vars;
507 Vector<const Variant *> state_varp;
508 if (state_props.size()) {
509 Error err = MultiplayerSynchronizer::get_state(state_props, p_node, state_vars, state_varp);
510 ERR_FAIL_COND_V_MSG(err != OK, err, "Unable to retrieve spawn state.");
511 err = MultiplayerAPI::encode_and_compress_variants(state_varp.ptrw(), state_varp.size(), nullptr, state_size);
512 ERR_FAIL_COND_V_MSG(err != OK, err, "Unable to encode spawn state.");
513 }
514
515 // Encode scene ID, path ID, net ID, node name.
516 int path_id = multiplayer->get_path_cache()->make_object_cache(p_spawner);
517 CharString cname = p_node->get_name().operator String().utf8();
518 int nlen = encode_cstring(cname.get_data(), nullptr);
519 MAKE_ROOM(1 + 1 + 4 + 4 + 4 + 4 * sync_ids.size() + 4 + nlen + (is_custom ? 4 + spawn_arg_size : 0) + state_size);
520 uint8_t *ptr = packet_cache.ptrw();
521 ptr[0] = (uint8_t)SceneMultiplayer::NETWORK_COMMAND_SPAWN;
522 ptr[1] = scene_id;
523 int ofs = 2;
524 ofs += encode_uint32(path_id, &ptr[ofs]);
525 ofs += encode_uint32(nid, &ptr[ofs]);
526 ofs += encode_uint32(sync_ids.size(), &ptr[ofs]);
527 ofs += encode_uint32(nlen, &ptr[ofs]);
528 for (uint32_t snid : sync_ids) {
529 ofs += encode_uint32(snid, &ptr[ofs]);
530 }
531 ofs += encode_cstring(cname.get_data(), &ptr[ofs]);
532 // Write args
533 if (is_custom) {
534 ofs += encode_uint32(spawn_arg_size, &ptr[ofs]);
535 Error err = MultiplayerAPI::encode_and_compress_variant(spawn_arg, &ptr[ofs], spawn_arg_size, false);
536 ERR_FAIL_COND_V(err, err);
537 ofs += spawn_arg_size;
538 }
539 // Write state.
540 if (state_size) {
541 Error err = MultiplayerAPI::encode_and_compress_variants(state_varp.ptrw(), state_varp.size(), &ptr[ofs], state_size);
542 ERR_FAIL_COND_V(err, err);
543 ofs += state_size;
544 }
545 r_len = ofs;
546 return OK;
547}
548
549Error SceneReplicationInterface::_make_despawn_packet(Node *p_node, int &r_len) {
550 const ObjectID oid = p_node->get_instance_id();
551 const TrackedNode *tnode = tracked_nodes.getptr(oid);
552 ERR_FAIL_COND_V(!tnode, ERR_INVALID_PARAMETER);
553 MAKE_ROOM(5);
554 uint8_t *ptr = packet_cache.ptrw();
555 ptr[0] = (uint8_t)SceneMultiplayer::NETWORK_COMMAND_DESPAWN;
556 int ofs = 1;
557 uint32_t nid = tnode->net_id;
558 ofs += encode_uint32(nid, &ptr[ofs]);
559 r_len = ofs;
560 return OK;
561}
562
563Error SceneReplicationInterface::on_spawn_receive(int p_from, const uint8_t *p_buffer, int p_buffer_len) {
564 ERR_FAIL_COND_V_MSG(p_buffer_len < 18, ERR_INVALID_DATA, "Invalid spawn packet received");
565 int ofs = 1; // The spawn/despawn command.
566 uint8_t scene_id = p_buffer[ofs];
567 ofs += 1;
568 uint32_t node_target = decode_uint32(&p_buffer[ofs]);
569 ofs += 4;
570 MultiplayerSpawner *spawner = Object::cast_to<MultiplayerSpawner>(multiplayer->get_path_cache()->get_cached_object(p_from, node_target));
571 ERR_FAIL_COND_V(!spawner, ERR_DOES_NOT_EXIST);
572 ERR_FAIL_COND_V(p_from != spawner->get_multiplayer_authority(), ERR_UNAUTHORIZED);
573
574 uint32_t net_id = decode_uint32(&p_buffer[ofs]);
575 ofs += 4;
576 uint32_t sync_len = decode_uint32(&p_buffer[ofs]);
577 ofs += 4;
578 uint32_t name_len = decode_uint32(&p_buffer[ofs]);
579 ofs += 4;
580 ERR_FAIL_COND_V_MSG(name_len + (sync_len * 4) > uint32_t(p_buffer_len - ofs), ERR_INVALID_DATA, vformat("Invalid spawn packet size: %d, wants: %d", p_buffer_len, ofs + name_len + (sync_len * 4)));
581 List<uint32_t> sync_ids;
582 for (uint32_t i = 0; i < sync_len; i++) {
583 sync_ids.push_back(decode_uint32(&p_buffer[ofs]));
584 ofs += 4;
585 }
586 ERR_FAIL_COND_V_MSG(name_len < 1, ERR_INVALID_DATA, "Zero spawn name size.");
587
588 // We need to make sure no trickery happens here, but we want to allow autogenerated ("@") node names.
589 const String name = String::utf8((const char *)&p_buffer[ofs], name_len);
590 ERR_FAIL_COND_V_MSG(name.validate_node_name() != name, ERR_INVALID_DATA, vformat("Invalid node name received: '%s'. Make sure to add nodes via 'add_child(node, true)' remotely.", name));
591 ofs += name_len;
592
593 // Check that we can spawn.
594 Node *parent = spawner->get_node_or_null(spawner->get_spawn_path());
595 ERR_FAIL_COND_V(!parent, ERR_UNCONFIGURED);
596 ERR_FAIL_COND_V(parent->has_node(name), ERR_INVALID_DATA);
597
598 Node *node = nullptr;
599 if (scene_id == MultiplayerSpawner::INVALID_ID) {
600 // Custom spawn.
601 ERR_FAIL_COND_V(p_buffer_len - ofs < 4, ERR_INVALID_DATA);
602 uint32_t arg_size = decode_uint32(&p_buffer[ofs]);
603 ofs += 4;
604 ERR_FAIL_COND_V(arg_size > uint32_t(p_buffer_len - ofs), ERR_INVALID_DATA);
605 Variant v;
606 Error err = MultiplayerAPI::decode_and_decompress_variant(v, &p_buffer[ofs], arg_size, nullptr, false);
607 ERR_FAIL_COND_V(err != OK, err);
608 ofs += arg_size;
609 node = spawner->instantiate_custom(v);
610 } else {
611 // Scene based spawn.
612 node = spawner->instantiate_scene(scene_id);
613 }
614 ERR_FAIL_COND_V(!node, ERR_UNAUTHORIZED);
615 node->set_name(name);
616
617 // Add and track remote
618 ERR_FAIL_COND_V(!peers_info.has(p_from), ERR_UNAVAILABLE);
619 ERR_FAIL_COND_V(peers_info[p_from].recv_nodes.has(net_id), ERR_ALREADY_IN_USE);
620 ObjectID oid = node->get_instance_id();
621 TrackedNode &tobj = _track(oid);
622 tobj.spawner = spawner->get_instance_id();
623 tobj.net_id = net_id;
624 tobj.remote_peer = p_from;
625 peers_info[p_from].recv_nodes[net_id] = oid;
626
627 // The initial state will be applied during the sync config (i.e. before _ready).
628 pending_spawn = node->get_instance_id();
629 pending_spawn_remote = p_from;
630 pending_buffer_size = p_buffer_len - ofs;
631 pending_buffer = pending_buffer_size > 0 ? &p_buffer[ofs] : nullptr;
632 pending_sync_net_ids = sync_ids;
633
634 parent->add_child(node);
635 spawner->emit_signal(SNAME("spawned"), node);
636
637 pending_spawn = ObjectID();
638 pending_spawn_remote = 0;
639 pending_buffer = nullptr;
640 pending_buffer_size = 0;
641 if (pending_sync_net_ids.size()) {
642 pending_sync_net_ids.clear();
643 ERR_FAIL_V(ERR_INVALID_DATA); // Should have been consumed.
644 }
645 return OK;
646}
647
648Error SceneReplicationInterface::on_despawn_receive(int p_from, const uint8_t *p_buffer, int p_buffer_len) {
649 ERR_FAIL_COND_V_MSG(p_buffer_len < 5, ERR_INVALID_DATA, "Invalid spawn packet received");
650 int ofs = 1; // The spawn/despawn command.
651 uint32_t net_id = decode_uint32(&p_buffer[ofs]);
652 ofs += 4;
653
654 // Untrack remote
655 ERR_FAIL_COND_V(!peers_info.has(p_from), ERR_UNAUTHORIZED);
656 PeerInfo &pinfo = peers_info[p_from];
657 ERR_FAIL_COND_V(!pinfo.recv_nodes.has(net_id), ERR_UNAUTHORIZED);
658 Node *node = get_id_as<Node>(pinfo.recv_nodes[net_id]);
659 ERR_FAIL_COND_V(!node, ERR_BUG);
660 pinfo.recv_nodes.erase(net_id);
661
662 const ObjectID oid = node->get_instance_id();
663 ERR_FAIL_COND_V(!tracked_nodes.has(oid), ERR_BUG);
664 MultiplayerSpawner *spawner = get_id_as<MultiplayerSpawner>(tracked_nodes[oid].spawner);
665 ERR_FAIL_COND_V(!spawner, ERR_DOES_NOT_EXIST);
666 ERR_FAIL_COND_V(p_from != spawner->get_multiplayer_authority(), ERR_UNAUTHORIZED);
667
668 if (node->get_parent() != nullptr) {
669 node->get_parent()->remove_child(node);
670 }
671 node->queue_free();
672 spawner->emit_signal(SNAME("despawned"), node);
673
674 return OK;
675}
676
677bool SceneReplicationInterface::_verify_synchronizer(int p_peer, MultiplayerSynchronizer *p_sync, uint32_t &r_net_id) {
678 r_net_id = p_sync->get_net_id();
679 if (r_net_id == 0 || (r_net_id & 0x80000000)) {
680 int path_id = 0;
681 bool verified = multiplayer->get_path_cache()->send_object_cache(p_sync, p_peer, path_id);
682 ERR_FAIL_COND_V_MSG(path_id < 0, false, "This should never happen!");
683 if (r_net_id == 0) {
684 // First time path based ID.
685 r_net_id = path_id | 0x80000000;
686 p_sync->set_net_id(r_net_id | 0x80000000);
687 }
688 return verified;
689 }
690 return true;
691}
692
693MultiplayerSynchronizer *SceneReplicationInterface::_find_synchronizer(int p_peer, uint32_t p_net_id) {
694 MultiplayerSynchronizer *sync = nullptr;
695 if (p_net_id & 0x80000000) {
696 sync = Object::cast_to<MultiplayerSynchronizer>(multiplayer->get_path_cache()->get_cached_object(p_peer, p_net_id & 0x7FFFFFFF));
697 } else if (peers_info[p_peer].recv_sync_ids.has(p_net_id)) {
698 const ObjectID &sid = peers_info[p_peer].recv_sync_ids[p_net_id];
699 sync = get_id_as<MultiplayerSynchronizer>(sid);
700 }
701 return sync;
702}
703
704void SceneReplicationInterface::_send_delta(int p_peer, const HashSet<ObjectID> p_synchronizers, uint64_t p_usec, const HashMap<ObjectID, uint64_t> p_last_watch_usecs) {
705 MAKE_ROOM(/* header */ 1 + /* element */ 4 + 8 + 4 + delta_mtu);
706 uint8_t *ptr = packet_cache.ptrw();
707 ptr[0] = SceneMultiplayer::NETWORK_COMMAND_SYNC | (1 << SceneMultiplayer::CMD_FLAG_0_SHIFT);
708 int ofs = 1;
709 for (const ObjectID &oid : p_synchronizers) {
710 MultiplayerSynchronizer *sync = get_id_as<MultiplayerSynchronizer>(oid);
711 ERR_CONTINUE(!sync || !sync->get_replication_config().is_valid() || !sync->is_multiplayer_authority());
712 uint32_t net_id;
713 if (!_verify_synchronizer(p_peer, sync, net_id)) {
714 continue;
715 }
716 uint64_t last_usec = p_last_watch_usecs.has(oid) ? p_last_watch_usecs[oid] : 0;
717 uint64_t indexes;
718 List<Variant> delta = sync->get_delta_state(p_usec, last_usec, indexes);
719
720 if (!delta.size()) {
721 continue; // Nothing to update.
722 }
723
724 Vector<const Variant *> varp;
725 varp.resize(delta.size());
726 const Variant **vptr = varp.ptrw();
727 int i = 0;
728 for (const Variant &v : delta) {
729 vptr[i] = &v;
730 i++;
731 }
732 int size;
733 Error err = MultiplayerAPI::encode_and_compress_variants(vptr, varp.size(), nullptr, size);
734 ERR_CONTINUE_MSG(err != OK, "Unable to encode delta state.");
735
736 ERR_CONTINUE_MSG(size > delta_mtu, vformat("Synchronizer delta bigger than MTU will not be sent (%d > %d): %s", size, delta_mtu, sync->get_path()));
737
738 if (ofs + 4 + 8 + 4 + size > delta_mtu) {
739 // Send what we got, and reset write.
740 _send_raw(packet_cache.ptr(), ofs, p_peer, true);
741 ofs = 1;
742 }
743 if (size) {
744 ofs += encode_uint32(sync->get_net_id(), &ptr[ofs]);
745 ofs += encode_uint64(indexes, &ptr[ofs]);
746 ofs += encode_uint32(size, &ptr[ofs]);
747 MultiplayerAPI::encode_and_compress_variants(vptr, varp.size(), &ptr[ofs], size);
748 ofs += size;
749 }
750#ifdef DEBUG_ENABLED
751 _profile_node_data("delta_out", oid, size);
752#endif
753 peers_info[p_peer].last_watch_usecs[oid] = p_usec;
754 }
755 if (ofs > 1) {
756 // Got some left over to send.
757 _send_raw(packet_cache.ptr(), ofs, p_peer, true);
758 }
759}
760
761Error SceneReplicationInterface::on_delta_receive(int p_from, const uint8_t *p_buffer, int p_buffer_len) {
762 int ofs = 1;
763 while (ofs + 4 + 8 + 4 < p_buffer_len) {
764 uint32_t net_id = decode_uint32(&p_buffer[ofs]);
765 ofs += 4;
766 uint64_t indexes = decode_uint64(&p_buffer[ofs]);
767 ofs += 8;
768 uint32_t size = decode_uint32(&p_buffer[ofs]);
769 ofs += 4;
770 ERR_FAIL_COND_V(size > uint32_t(p_buffer_len - ofs), ERR_INVALID_DATA);
771 MultiplayerSynchronizer *sync = _find_synchronizer(p_from, net_id);
772 Node *node = sync ? sync->get_root_node() : nullptr;
773 if (!sync || sync->get_multiplayer_authority() != p_from || !node) {
774 ofs += size;
775 ERR_CONTINUE_MSG(true, "Ignoring delta for non-authority or invalid synchronizer.");
776 }
777 List<NodePath> props = sync->get_delta_properties(indexes);
778 ERR_FAIL_COND_V(props.size() == 0, ERR_INVALID_DATA);
779 Vector<Variant> vars;
780 vars.resize(props.size());
781 int consumed = 0;
782 Error err = MultiplayerAPI::decode_and_decompress_variants(vars, p_buffer + ofs, size, consumed);
783 ERR_FAIL_COND_V(err != OK, err);
784 ERR_FAIL_COND_V(uint32_t(consumed) != size, ERR_INVALID_DATA);
785 err = MultiplayerSynchronizer::set_state(props, node, vars);
786 ERR_FAIL_COND_V(err != OK, err);
787 ofs += size;
788 sync->emit_signal(SNAME("delta_synchronized"));
789#ifdef DEBUG_ENABLED
790 _profile_node_data("delta_in", sync->get_instance_id(), size);
791#endif
792 }
793 return OK;
794}
795
796void SceneReplicationInterface::_send_sync(int p_peer, const HashSet<ObjectID> p_synchronizers, uint16_t p_sync_net_time, uint64_t p_usec) {
797 MAKE_ROOM(/* header */ 3 + /* element */ 4 + 4 + sync_mtu);
798 uint8_t *ptr = packet_cache.ptrw();
799 ptr[0] = SceneMultiplayer::NETWORK_COMMAND_SYNC;
800 int ofs = 1;
801 ofs += encode_uint16(p_sync_net_time, &ptr[1]);
802 // Can only send updates for already notified nodes.
803 // This is a lazy implementation, we could optimize much more here with by grouping by replication config.
804 for (const ObjectID &oid : p_synchronizers) {
805 MultiplayerSynchronizer *sync = get_id_as<MultiplayerSynchronizer>(oid);
806 ERR_CONTINUE(!sync || !sync->get_replication_config().is_valid() || !sync->is_multiplayer_authority());
807 if (!sync->update_outbound_sync_time(p_usec)) {
808 continue; // nothing to sync.
809 }
810
811 Node *node = sync->get_root_node();
812 ERR_CONTINUE(!node);
813 uint32_t net_id = sync->get_net_id();
814 if (!_verify_synchronizer(p_peer, sync, net_id)) {
815 // The path based sync is not yet confirmed, skipping.
816 continue;
817 }
818 int size;
819 Vector<Variant> vars;
820 Vector<const Variant *> varp;
821 const List<NodePath> props = sync->get_replication_config()->get_sync_properties();
822 Error err = MultiplayerSynchronizer::get_state(props, node, vars, varp);
823 ERR_CONTINUE_MSG(err != OK, "Unable to retrieve sync state.");
824 err = MultiplayerAPI::encode_and_compress_variants(varp.ptrw(), varp.size(), nullptr, size);
825 ERR_CONTINUE_MSG(err != OK, "Unable to encode sync state.");
826 // TODO Handle single state above MTU.
827 ERR_CONTINUE_MSG(size > sync_mtu, vformat("Node states bigger than MTU will not be sent (%d > %d): %s", size, sync_mtu, node->get_path()));
828 if (ofs + 4 + 4 + size > sync_mtu) {
829 // Send what we got, and reset write.
830 _send_raw(packet_cache.ptr(), ofs, p_peer, false);
831 ofs = 3;
832 }
833 if (size) {
834 ofs += encode_uint32(sync->get_net_id(), &ptr[ofs]);
835 ofs += encode_uint32(size, &ptr[ofs]);
836 MultiplayerAPI::encode_and_compress_variants(varp.ptrw(), varp.size(), &ptr[ofs], size);
837 ofs += size;
838 }
839#ifdef DEBUG_ENABLED
840 _profile_node_data("sync_out", oid, size);
841#endif
842 }
843 if (ofs > 3) {
844 // Got some left over to send.
845 _send_raw(packet_cache.ptr(), ofs, p_peer, false);
846 }
847}
848
849Error SceneReplicationInterface::on_sync_receive(int p_from, const uint8_t *p_buffer, int p_buffer_len) {
850 ERR_FAIL_COND_V_MSG(p_buffer_len < 11, ERR_INVALID_DATA, "Invalid sync packet received");
851 bool is_delta = (p_buffer[0] & (1 << SceneMultiplayer::CMD_FLAG_0_SHIFT)) != 0;
852 if (is_delta) {
853 return on_delta_receive(p_from, p_buffer, p_buffer_len);
854 }
855 uint16_t time = decode_uint16(&p_buffer[1]);
856 int ofs = 3;
857 while (ofs + 8 < p_buffer_len) {
858 uint32_t net_id = decode_uint32(&p_buffer[ofs]);
859 ofs += 4;
860 uint32_t size = decode_uint32(&p_buffer[ofs]);
861 ofs += 4;
862 ERR_FAIL_COND_V(size > uint32_t(p_buffer_len - ofs), ERR_INVALID_DATA);
863 MultiplayerSynchronizer *sync = _find_synchronizer(p_from, net_id);
864 if (!sync) {
865 // Not received yet.
866 ofs += size;
867 continue;
868 }
869 Node *node = sync->get_root_node();
870 if (sync->get_multiplayer_authority() != p_from || !node) {
871 // Not valid for me.
872 ofs += size;
873 ERR_CONTINUE_MSG(true, "Ignoring sync data from non-authority or for missing node.");
874 }
875 if (!sync->update_inbound_sync_time(time)) {
876 // State is too old.
877 ofs += size;
878 continue;
879 }
880 const List<NodePath> props = sync->get_replication_config()->get_sync_properties();
881 Vector<Variant> vars;
882 vars.resize(props.size());
883 int consumed;
884 Error err = MultiplayerAPI::decode_and_decompress_variants(vars, &p_buffer[ofs], size, consumed);
885 ERR_FAIL_COND_V(err, err);
886 err = MultiplayerSynchronizer::set_state(props, node, vars);
887 ERR_FAIL_COND_V(err, err);
888 ofs += size;
889 sync->emit_signal(SNAME("synchronized"));
890#ifdef DEBUG_ENABLED
891 _profile_node_data("sync_in", sync->get_instance_id(), size);
892#endif
893 }
894 return OK;
895}
896
897void SceneReplicationInterface::set_max_sync_packet_size(int p_size) {
898 ERR_FAIL_COND_MSG(p_size < 128, "Sync maximum packet size must be at least 128 bytes.");
899 sync_mtu = p_size;
900}
901
902int SceneReplicationInterface::get_max_sync_packet_size() const {
903 return sync_mtu;
904}
905
906void SceneReplicationInterface::set_max_delta_packet_size(int p_size) {
907 ERR_FAIL_COND_MSG(p_size < 128, "Sync maximum packet size must be at least 128 bytes.");
908 delta_mtu = p_size;
909}
910
911int SceneReplicationInterface::get_max_delta_packet_size() const {
912 return delta_mtu;
913}
914