1// This is an open source non-commercial project. Dear PVS-Studio, please check
2// it. PVS-Studio Static Code Analyzer for C, C++ and C#: http://www.viva64.com
3
4#include <stdbool.h>
5#include <string.h>
6#include <inttypes.h>
7
8#include <uv.h>
9#include <msgpack.h>
10
11#include "nvim/api/private/helpers.h"
12#include "nvim/api/vim.h"
13#include "nvim/api/ui.h"
14#include "nvim/channel.h"
15#include "nvim/msgpack_rpc/channel.h"
16#include "nvim/event/loop.h"
17#include "nvim/event/libuv_process.h"
18#include "nvim/event/rstream.h"
19#include "nvim/event/wstream.h"
20#include "nvim/event/socket.h"
21#include "nvim/msgpack_rpc/helpers.h"
22#include "nvim/vim.h"
23#include "nvim/main.h"
24#include "nvim/ascii.h"
25#include "nvim/memory.h"
26#include "nvim/eval.h"
27#include "nvim/os_unix.h"
28#include "nvim/message.h"
29#include "nvim/map.h"
30#include "nvim/log.h"
31#include "nvim/misc1.h"
32#include "nvim/lib/kvec.h"
33#include "nvim/os/input.h"
34#include "nvim/ui.h"
35
// When debug logging is compiled out (MIN_LOG_LEVEL is above DEBUG_LOG_LEVEL)
// the RPC message loggers expand to nothing, so call sites need no #if guards.
#if MIN_LOG_LEVEL > DEBUG_LOG_LEVEL
#define log_client_msg(...)
#define log_server_msg(...)
#endif
40
// Interned event names: maps an event name to the single heap-allocated copy
// that the per-channel `subscribed_events` maps store as key and value.
static PMap(cstr_t) *event_strings = NULL;
// Scratch buffer shared by all serialize_*() calls in this file; it is cleared
// again once the serialized bytes have been copied out.
static msgpack_sbuffer out_buffer;
43
44#ifdef INCLUDE_GENERATED_DECLARATIONS
45# include "msgpack_rpc/channel.c.generated.h"
46#endif
47
48void rpc_init(void)
49{
50 ch_before_blocking_events = multiqueue_new_child(main_loop.events);
51 event_strings = pmap_new(cstr_t)();
52 msgpack_sbuffer_init(&out_buffer);
53}
54
55
56void rpc_start(Channel *channel)
57{
58 channel_incref(channel);
59 channel->is_rpc = true;
60 RpcState *rpc = &channel->rpc;
61 rpc->closed = false;
62 rpc->unpacker = msgpack_unpacker_new(MSGPACK_UNPACKER_INIT_BUFFER_SIZE);
63 rpc->subscribed_events = pmap_new(cstr_t)();
64 rpc->next_request_id = 1;
65 rpc->info = (Dictionary)ARRAY_DICT_INIT;
66 kv_init(rpc->call_stack);
67
68 if (channel->streamtype != kChannelStreamInternal) {
69 Stream *out = channel_outstream(channel);
70#if MIN_LOG_LEVEL <= DEBUG_LOG_LEVEL
71 Stream *in = channel_instream(channel);
72 DLOG("rpc ch %" PRIu64 " in-stream=%p out-stream=%p", channel->id,
73 (void *)in, (void *)out);
74#endif
75
76 rstream_start(out, receive_msgpack, channel);
77 }
78}
79
80
81static Channel *find_rpc_channel(uint64_t id)
82{
83 Channel *chan = find_channel(id);
84 if (!chan || !chan->is_rpc || chan->rpc.closed) {
85 return NULL;
86 }
87 return chan;
88}
89
90/// Publishes an event to a channel.
91///
92/// @param id Channel id. 0 means "broadcast to all subscribed channels"
93/// @param name Event name (application-defined)
94/// @param args Array of event arguments
95/// @return True if the event was sent successfully, false otherwise.
96bool rpc_send_event(uint64_t id, const char *name, Array args)
97{
98 Channel *channel = NULL;
99
100 if (id && (!(channel = find_rpc_channel(id)))) {
101 api_free_array(args);
102 return false;
103 }
104
105 if (channel) {
106 send_event(channel, name, args);
107 } else {
108 broadcast_event(name, args);
109 }
110
111 return true;
112}
113
114/// Sends a method call to a channel
115///
116/// @param id The channel id
117/// @param method_name The method name, an arbitrary string
118/// @param args Array with method arguments
119/// @param[out] error True if the return value is an error
120/// @return Whatever the remote method returned
121Object rpc_send_call(uint64_t id,
122 const char *method_name,
123 Array args,
124 Error *err)
125{
126 Channel *channel = NULL;
127
128 if (!(channel = find_rpc_channel(id))) {
129 api_set_error(err, kErrorTypeException, "Invalid channel: %" PRIu64, id);
130 api_free_array(args);
131 return NIL;
132 }
133
134 channel_incref(channel);
135 RpcState *rpc = &channel->rpc;
136 uint32_t request_id = rpc->next_request_id++;
137 // Send the msgpack-rpc request
138 send_request(channel, request_id, method_name, args);
139
140 // Push the frame
141 ChannelCallFrame frame = { request_id, false, false, NIL };
142 kv_push(rpc->call_stack, &frame);
143 LOOP_PROCESS_EVENTS_UNTIL(&main_loop, channel->events, -1, frame.returned);
144 (void)kv_pop(rpc->call_stack);
145
146 if (frame.errored) {
147 if (frame.result.type == kObjectTypeString) {
148 api_set_error(err, kErrorTypeException, "%s",
149 frame.result.data.string.data);
150 } else if (frame.result.type == kObjectTypeArray) {
151 // Should be an error in the form [type, message]
152 Array array = frame.result.data.array;
153 if (array.size == 2 && array.items[0].type == kObjectTypeInteger
154 && (array.items[0].data.integer == kErrorTypeException
155 || array.items[0].data.integer == kErrorTypeValidation)
156 && array.items[1].type == kObjectTypeString) {
157 api_set_error(err, (ErrorType)array.items[0].data.integer, "%s",
158 array.items[1].data.string.data);
159 } else {
160 api_set_error(err, kErrorTypeException, "%s", "unknown error");
161 }
162 } else {
163 api_set_error(err, kErrorTypeException, "%s", "unknown error");
164 }
165
166 api_free_object(frame.result);
167 }
168
169 channel_decref(channel);
170
171 return frame.errored ? NIL : frame.result;
172}
173
174/// Subscribes to event broadcasts
175///
176/// @param id The channel id
177/// @param event The event type string
178void rpc_subscribe(uint64_t id, char *event)
179{
180 Channel *channel;
181
182 if (!(channel = find_rpc_channel(id))) {
183 abort();
184 }
185
186 char *event_string = pmap_get(cstr_t)(event_strings, event);
187
188 if (!event_string) {
189 event_string = xstrdup(event);
190 pmap_put(cstr_t)(event_strings, event_string, event_string);
191 }
192
193 pmap_put(cstr_t)(channel->rpc.subscribed_events, event_string, event_string);
194}
195
196/// Unsubscribes to event broadcasts
197///
198/// @param id The channel id
199/// @param event The event type string
200void rpc_unsubscribe(uint64_t id, char *event)
201{
202 Channel *channel;
203
204 if (!(channel = find_rpc_channel(id))) {
205 abort();
206 }
207
208 unsubscribe(channel, event);
209}
210
211static void receive_msgpack(Stream *stream, RBuffer *rbuf, size_t c,
212 void *data, bool eof)
213{
214 Channel *channel = data;
215 channel_incref(channel);
216
217 if (eof) {
218 channel_close(channel->id, kChannelPartRpc, NULL);
219 char buf[256];
220 snprintf(buf, sizeof(buf), "ch %" PRIu64 " was closed by the client",
221 channel->id);
222 call_set_error(channel, buf, WARN_LOG_LEVEL);
223 goto end;
224 }
225
226 size_t count = rbuffer_size(rbuf);
227 DLOG("ch %" PRIu64 ": parsing %zu bytes from msgpack Stream: %p",
228 channel->id, count, (void *)stream);
229
230 // Feed the unpacker with data
231 msgpack_unpacker_reserve_buffer(channel->rpc.unpacker, count);
232 rbuffer_read(rbuf, msgpack_unpacker_buffer(channel->rpc.unpacker), count);
233 msgpack_unpacker_buffer_consumed(channel->rpc.unpacker, count);
234
235 parse_msgpack(channel);
236
237end:
238 channel_decref(channel);
239}
240
241static void parse_msgpack(Channel *channel)
242{
243 msgpack_unpacked unpacked;
244 msgpack_unpacked_init(&unpacked);
245 msgpack_unpack_return result;
246
247 // Deserialize everything we can.
248 while ((result = msgpack_unpacker_next(channel->rpc.unpacker, &unpacked)) ==
249 MSGPACK_UNPACK_SUCCESS) {
250 bool is_response = is_rpc_response(&unpacked.data);
251 log_client_msg(channel->id, !is_response, unpacked.data);
252
253 if (is_response) {
254 if (is_valid_rpc_response(&unpacked.data, channel)) {
255 complete_call(&unpacked.data, channel);
256 } else {
257 char buf[256];
258 snprintf(buf, sizeof(buf),
259 "ch %" PRIu64 " returned a response with an unknown request "
260 "id. Ensure the client is properly synchronized",
261 channel->id);
262 call_set_error(channel, buf, ERROR_LOG_LEVEL);
263 }
264 msgpack_unpacked_destroy(&unpacked);
265 // Bail out from this event loop iteration
266 return;
267 }
268
269 handle_request(channel, &unpacked.data);
270 }
271
272 if (result == MSGPACK_UNPACK_NOMEM_ERROR) {
273 mch_errmsg(e_outofmem);
274 mch_errmsg("\n");
275 channel_decref(channel);
276 preserve_exit();
277 }
278
279 if (result == MSGPACK_UNPACK_PARSE_ERROR) {
280 // See src/msgpack/unpack_template.h in msgpack source tree for
281 // causes for this error(search for 'goto _failed')
282 //
283 // A not so uncommon cause for this might be deserializing objects with
284 // a high nesting level: msgpack will break when its internal parse stack
285 // size exceeds MSGPACK_EMBED_STACK_SIZE (defined as 32 by default)
286 send_error(channel, kMessageTypeRequest, 0,
287 "Invalid msgpack payload. "
288 "This error can also happen when deserializing "
289 "an object with high level of nesting");
290 }
291}
292
293/// Handles requests and notifications received on the channel.
294static void handle_request(Channel *channel, msgpack_object *request)
295 FUNC_ATTR_NONNULL_ALL
296{
297 uint32_t request_id;
298 Error error = ERROR_INIT;
299 MessageType type = msgpack_rpc_validate(&request_id, request, &error);
300
301 if (ERROR_SET(&error)) {
302 // Validation failed, send response with error
303 if (channel_write(channel,
304 serialize_response(channel->id,
305 type,
306 request_id,
307 &error,
308 NIL,
309 &out_buffer))) {
310 char buf[256];
311 snprintf(buf, sizeof(buf),
312 "ch %" PRIu64 " sent an invalid message, closed.",
313 channel->id);
314 call_set_error(channel, buf, ERROR_LOG_LEVEL);
315 }
316 api_clear_error(&error);
317 return;
318 }
319 assert(type == kMessageTypeRequest || type == kMessageTypeNotification);
320
321 MsgpackRpcRequestHandler handler;
322 msgpack_object *method = msgpack_rpc_method(request);
323 handler = msgpack_rpc_get_handler_for(method->via.bin.ptr,
324 method->via.bin.size,
325 &error);
326
327 // check method arguments
328 Array args = ARRAY_DICT_INIT;
329 if (!ERROR_SET(&error)
330 && !msgpack_rpc_to_array(msgpack_rpc_args(request), &args)) {
331 api_set_error(&error, kErrorTypeException, "Invalid method arguments");
332 }
333
334 if (ERROR_SET(&error)) {
335 send_error(channel, type, request_id, error.msg);
336 api_clear_error(&error);
337 api_free_array(args);
338 return;
339 }
340
341 RequestEvent *evdata = xmalloc(sizeof(RequestEvent));
342 evdata->type = type;
343 evdata->channel = channel;
344 evdata->handler = handler;
345 evdata->args = args;
346 evdata->request_id = request_id;
347 channel_incref(channel);
348 if (handler.fast) {
349 bool is_get_mode = handler.fn == handle_nvim_get_mode;
350
351 if (is_get_mode && !input_blocking()) {
352 // Defer the event to a special queue used by os/input.c. #6247
353 multiqueue_put(ch_before_blocking_events, request_event, 1, evdata);
354 } else {
355 // Invoke immediately.
356 request_event((void **)&evdata);
357 }
358 } else {
359 bool is_resize = handler.fn == handle_nvim_ui_try_resize;
360 if (is_resize) {
361 Event ev = event_create_oneshot(event_create(request_event, 1, evdata),
362 2);
363 multiqueue_put_event(channel->events, ev);
364 multiqueue_put_event(resize_events, ev);
365 } else {
366 multiqueue_put(channel->events, request_event, 1, evdata);
367 DLOG("RPC: scheduled %.*s", method->via.bin.size, method->via.bin.ptr);
368 }
369 }
370}
371
372
373/// Handles a message, depending on the type:
374/// - Request: invokes method and writes the response (or error).
375/// - Notification: invokes method (emits `nvim_error_event` on error).
376static void request_event(void **argv)
377{
378 RequestEvent *e = argv[0];
379 Channel *channel = e->channel;
380 MsgpackRpcRequestHandler handler = e->handler;
381 Error error = ERROR_INIT;
382 Object result = handler.fn(channel->id, e->args, &error);
383 if (e->type == kMessageTypeRequest || ERROR_SET(&error)) {
384 // Send the response.
385 msgpack_packer response;
386 msgpack_packer_init(&response, &out_buffer, msgpack_sbuffer_write);
387 channel_write(channel, serialize_response(channel->id,
388 e->type,
389 e->request_id,
390 &error,
391 result,
392 &out_buffer));
393 } else {
394 api_free_object(result);
395 }
396 api_free_array(e->args);
397 channel_decref(channel);
398 xfree(e);
399 api_clear_error(&error);
400}
401
402static bool channel_write(Channel *channel, WBuffer *buffer)
403{
404 bool success;
405
406 if (channel->rpc.closed) {
407 wstream_release_wbuffer(buffer);
408 return false;
409 }
410
411 if (channel->streamtype == kChannelStreamInternal) {
412 channel_incref(channel);
413 CREATE_EVENT(channel->events, internal_read_event, 2, channel, buffer);
414 success = true;
415 } else {
416 Stream *in = channel_instream(channel);
417 success = wstream_write(in, buffer);
418 }
419
420
421 if (!success) {
422 // If the write failed for any reason, close the channel
423 char buf[256];
424 snprintf(buf,
425 sizeof(buf),
426 "ch %" PRIu64 ": stream write failed. "
427 "RPC canceled; closing channel",
428 channel->id);
429 call_set_error(channel, buf, ERROR_LOG_LEVEL);
430 }
431
432 return success;
433}
434
435static void internal_read_event(void **argv)
436{
437 Channel *channel = argv[0];
438 WBuffer *buffer = argv[1];
439
440 msgpack_unpacker_reserve_buffer(channel->rpc.unpacker, buffer->size);
441 memcpy(msgpack_unpacker_buffer(channel->rpc.unpacker),
442 buffer->data, buffer->size);
443 msgpack_unpacker_buffer_consumed(channel->rpc.unpacker, buffer->size);
444
445 parse_msgpack(channel);
446
447 channel_decref(channel);
448 wstream_release_wbuffer(buffer);
449}
450
451static void send_error(Channel *chan, MessageType type, uint32_t id, char *err)
452{
453 Error e = ERROR_INIT;
454 api_set_error(&e, kErrorTypeException, "%s", err);
455 channel_write(chan, serialize_response(chan->id,
456 type,
457 id,
458 &e,
459 NIL,
460 &out_buffer));
461 api_clear_error(&e);
462}
463
464static void send_request(Channel *channel,
465 uint32_t id,
466 const char *name,
467 Array args)
468{
469 const String method = cstr_as_string((char *)name);
470 channel_write(channel, serialize_request(channel->id,
471 id,
472 method,
473 args,
474 &out_buffer,
475 1));
476}
477
478static void send_event(Channel *channel,
479 const char *name,
480 Array args)
481{
482 const String method = cstr_as_string((char *)name);
483 channel_write(channel, serialize_request(channel->id,
484 0,
485 method,
486 args,
487 &out_buffer,
488 1));
489}
490
491static void broadcast_event(const char *name, Array args)
492{
493 kvec_t(Channel *) subscribed = KV_INITIAL_VALUE;
494 Channel *channel;
495
496 map_foreach_value(channels, channel, {
497 if (channel->is_rpc
498 && pmap_has(cstr_t)(channel->rpc.subscribed_events, name)) {
499 kv_push(subscribed, channel);
500 }
501 });
502
503 if (!kv_size(subscribed)) {
504 api_free_array(args);
505 goto end;
506 }
507
508 const String method = cstr_as_string((char *)name);
509 WBuffer *buffer = serialize_request(0,
510 0,
511 method,
512 args,
513 &out_buffer,
514 kv_size(subscribed));
515
516 for (size_t i = 0; i < kv_size(subscribed); i++) {
517 Channel *c = kv_A(subscribed, i);
518 channel_write(c, buffer);
519 }
520
521end:
522 kv_destroy(subscribed);
523}
524
525static void unsubscribe(Channel *channel, char *event)
526{
527 char *event_string = pmap_get(cstr_t)(event_strings, event);
528 if (!event_string) {
529 WLOG("RPC: ch %" PRIu64 ": tried to unsubscribe unknown event '%s'",
530 channel->id, event);
531 return;
532 }
533 pmap_del(cstr_t)(channel->rpc.subscribed_events, event_string);
534
535 map_foreach_value(channels, channel, {
536 if (channel->is_rpc
537 && pmap_has(cstr_t)(channel->rpc.subscribed_events, event_string)) {
538 return;
539 }
540 });
541
542 // Since the string is no longer used by other channels, release it's memory
543 pmap_del(cstr_t)(event_strings, event_string);
544 xfree(event_string);
545}
546
547
548/// Mark rpc state as closed, and release its reference to the channel.
549/// Don't call this directly, call channel_close(id, kChannelPartRpc, &error)
550void rpc_close(Channel *channel)
551{
552 if (channel->rpc.closed) {
553 return;
554 }
555
556 channel->rpc.closed = true;
557 channel_decref(channel);
558
559 if (channel->streamtype == kChannelStreamStdio) {
560 multiqueue_put(main_loop.fast_events, exit_event, 0);
561 }
562}
563
564static void exit_event(void **argv)
565{
566 if (!exiting) {
567 mch_exit(0);
568 }
569}
570
571void rpc_free(Channel *channel)
572{
573 remote_ui_disconnect(channel->id);
574 msgpack_unpacker_free(channel->rpc.unpacker);
575
576 // Unsubscribe from all events
577 char *event_string;
578 map_foreach_value(channel->rpc.subscribed_events, event_string, {
579 unsubscribe(channel, event_string);
580 });
581
582 pmap_free(cstr_t)(channel->rpc.subscribed_events);
583 kv_destroy(channel->rpc.call_stack);
584 api_free_dictionary(channel->rpc.info);
585}
586
587static bool is_rpc_response(msgpack_object *obj)
588{
589 return obj->type == MSGPACK_OBJECT_ARRAY
590 && obj->via.array.size == 4
591 && obj->via.array.ptr[0].type == MSGPACK_OBJECT_POSITIVE_INTEGER
592 && obj->via.array.ptr[0].via.u64 == 1
593 && obj->via.array.ptr[1].type == MSGPACK_OBJECT_POSITIVE_INTEGER;
594}
595
596static bool is_valid_rpc_response(msgpack_object *obj, Channel *channel)
597{
598 uint32_t response_id = (uint32_t)obj->via.array.ptr[1].via.u64;
599 if (kv_size(channel->rpc.call_stack) == 0) {
600 return false;
601 }
602
603 // Must be equal to the frame at the stack's bottom
604 ChannelCallFrame *frame = kv_last(channel->rpc.call_stack);
605 return response_id == frame->request_id;
606}
607
608static void complete_call(msgpack_object *obj, Channel *channel)
609{
610 ChannelCallFrame *frame = kv_last(channel->rpc.call_stack);
611 frame->returned = true;
612 frame->errored = obj->via.array.ptr[2].type != MSGPACK_OBJECT_NIL;
613
614 if (frame->errored) {
615 msgpack_rpc_to_object(&obj->via.array.ptr[2], &frame->result);
616 } else {
617 msgpack_rpc_to_object(&obj->via.array.ptr[3], &frame->result);
618 }
619}
620
621static void call_set_error(Channel *channel, char *msg, int loglevel)
622{
623 LOG(loglevel, "RPC: %s", msg);
624 for (size_t i = 0; i < kv_size(channel->rpc.call_stack); i++) {
625 ChannelCallFrame *frame = kv_A(channel->rpc.call_stack, i);
626 frame->returned = true;
627 frame->errored = true;
628 api_free_object(frame->result);
629 frame->result = STRING_OBJ(cstr_to_string(msg));
630 }
631
632 channel_close(channel->id, kChannelPartRpc, NULL);
633}
634
635static WBuffer *serialize_request(uint64_t channel_id,
636 uint32_t request_id,
637 const String method,
638 Array args,
639 msgpack_sbuffer *sbuffer,
640 size_t refcount)
641{
642 msgpack_packer pac;
643 msgpack_packer_init(&pac, sbuffer, msgpack_sbuffer_write);
644 msgpack_rpc_serialize_request(request_id, method, args, &pac);
645 log_server_msg(channel_id, sbuffer);
646 WBuffer *rv = wstream_new_buffer(xmemdup(sbuffer->data, sbuffer->size),
647 sbuffer->size,
648 refcount,
649 xfree);
650 msgpack_sbuffer_clear(sbuffer);
651 api_free_array(args);
652 return rv;
653}
654
655static WBuffer *serialize_response(uint64_t channel_id,
656 MessageType type,
657 uint32_t response_id,
658 Error *err,
659 Object arg,
660 msgpack_sbuffer *sbuffer)
661{
662 msgpack_packer pac;
663 msgpack_packer_init(&pac, sbuffer, msgpack_sbuffer_write);
664 if (ERROR_SET(err) && type == kMessageTypeNotification) {
665 Array args = ARRAY_DICT_INIT;
666 ADD(args, INTEGER_OBJ(err->type));
667 ADD(args, STRING_OBJ(cstr_to_string(err->msg)));
668 msgpack_rpc_serialize_request(0, cstr_as_string("nvim_error_event"),
669 args, &pac);
670 api_free_array(args);
671 } else {
672 msgpack_rpc_serialize_response(response_id, err, arg, &pac);
673 }
674 log_server_msg(channel_id, sbuffer);
675 WBuffer *rv = wstream_new_buffer(xmemdup(sbuffer->data, sbuffer->size),
676 sbuffer->size,
677 1, // responses only go though 1 channel
678 xfree);
679 msgpack_sbuffer_clear(sbuffer);
680 api_free_object(arg);
681 return rv;
682}
683
684void rpc_set_client_info(uint64_t id, Dictionary info)
685{
686 Channel *chan = find_rpc_channel(id);
687 if (!chan) {
688 abort();
689 }
690
691 api_free_dictionary(chan->rpc.info);
692 chan->rpc.info = info;
693 channel_info_changed(chan, false);
694}
695
696Dictionary rpc_client_info(Channel *chan)
697{
698 return copy_dictionary(chan->rpc.info);
699}
700
701const char *rpc_client_name(Channel *chan)
702{
703 if (!chan->is_rpc) {
704 return NULL;
705 }
706 Dictionary info = chan->rpc.info;
707 for (size_t i = 0; i < info.size; i++) {
708 if (strequal("name", info.items[i].key.data)
709 && info.items[i].value.type == kObjectTypeString) {
710 return info.items[i].value.data.string.data;
711 }
712 }
713
714 return NULL;
715}
716
717#if MIN_LOG_LEVEL <= DEBUG_LOG_LEVEL
// Tags written in front of each logged RPC message.
#define REQ "[request]  "
#define RES "[response] "
#define NOT "[notify]   "
#define ERR "[error]    "

// Cannot define array with negative offsets, so this one is needed to be added
// to MSGPACK_UNPACK_* values.
#define MUR_OFF 2

// Descriptions of the non-success msgpack_unpack_return codes, indexed by
// `code + MUR_OFF`.
static const char *const msgpack_error_messages[] = {
  [MSGPACK_UNPACK_EXTRA_BYTES + MUR_OFF] = "extra bytes found",
  [MSGPACK_UNPACK_CONTINUE + MUR_OFF] = "incomplete string",
  [MSGPACK_UNPACK_PARSE_ERROR + MUR_OFF] = "parse error",
  [MSGPACK_UNPACK_NOMEM_ERROR + MUR_OFF] = "not enough memory",
};
733
734static void log_server_msg(uint64_t channel_id,
735 msgpack_sbuffer *packed)
736{
737 msgpack_unpacked unpacked;
738 msgpack_unpacked_init(&unpacked);
739 DLOGN("RPC ->ch %" PRIu64 ": ", channel_id);
740 const msgpack_unpack_return result =
741 msgpack_unpack_next(&unpacked, packed->data, packed->size, NULL);
742 switch (result) {
743 case MSGPACK_UNPACK_SUCCESS: {
744 uint64_t type = unpacked.data.via.array.ptr[0].via.u64;
745 log_lock();
746 FILE *f = open_log_file();
747 fprintf(f, type ? (type == 1 ? RES : NOT) : REQ);
748 log_msg_close(f, unpacked.data);
749 msgpack_unpacked_destroy(&unpacked);
750 break;
751 }
752 case MSGPACK_UNPACK_EXTRA_BYTES:
753 case MSGPACK_UNPACK_CONTINUE:
754 case MSGPACK_UNPACK_PARSE_ERROR:
755 case MSGPACK_UNPACK_NOMEM_ERROR: {
756 log_lock();
757 FILE *f = open_log_file();
758 fprintf(f, ERR);
759 log_msg_close(f, (msgpack_object) {
760 .type = MSGPACK_OBJECT_STR,
761 .via.str = {
762 .ptr = (char *)msgpack_error_messages[result + MUR_OFF],
763 .size = (uint32_t)strlen(
764 msgpack_error_messages[result + MUR_OFF]),
765 },
766 });
767 break;
768 }
769 }
770}
771
772static void log_client_msg(uint64_t channel_id,
773 bool is_request,
774 msgpack_object msg)
775{
776 DLOGN("RPC <-ch %" PRIu64 ": ", channel_id);
777 log_lock();
778 FILE *f = open_log_file();
779 fprintf(f, is_request ? REQ : RES);
780 log_msg_close(f, msg);
781}
782
783static void log_msg_close(FILE *f, msgpack_object msg)
784{
785 msgpack_object_print(f, msg);
786 fputc('\n', f);
787 fflush(f);
788 fclose(f);
789 log_unlock();
790}
791#endif
792