1 | // This is an open source non-commercial project. Dear PVS-Studio, please check |
2 | // it. PVS-Studio Static Code Analyzer for C, C++ and C#: http://www.viva64.com |
3 | |
4 | #include <stdbool.h> |
5 | #include <string.h> |
6 | #include <inttypes.h> |
7 | |
8 | #include <uv.h> |
9 | #include <msgpack.h> |
10 | |
11 | #include "nvim/api/private/helpers.h" |
12 | #include "nvim/api/vim.h" |
13 | #include "nvim/api/ui.h" |
14 | #include "nvim/channel.h" |
15 | #include "nvim/msgpack_rpc/channel.h" |
16 | #include "nvim/event/loop.h" |
17 | #include "nvim/event/libuv_process.h" |
18 | #include "nvim/event/rstream.h" |
19 | #include "nvim/event/wstream.h" |
20 | #include "nvim/event/socket.h" |
21 | #include "nvim/msgpack_rpc/helpers.h" |
22 | #include "nvim/vim.h" |
23 | #include "nvim/main.h" |
24 | #include "nvim/ascii.h" |
25 | #include "nvim/memory.h" |
26 | #include "nvim/eval.h" |
27 | #include "nvim/os_unix.h" |
28 | #include "nvim/message.h" |
29 | #include "nvim/map.h" |
30 | #include "nvim/log.h" |
31 | #include "nvim/misc1.h" |
32 | #include "nvim/lib/kvec.h" |
33 | #include "nvim/os/input.h" |
34 | #include "nvim/ui.h" |
35 | |
36 | #if MIN_LOG_LEVEL > DEBUG_LOG_LEVEL |
37 | #define log_client_msg(...) |
38 | #define log_server_msg(...) |
39 | #endif |
40 | |
41 | static PMap(cstr_t) *event_strings = NULL; |
42 | static msgpack_sbuffer out_buffer; |
43 | |
44 | #ifdef INCLUDE_GENERATED_DECLARATIONS |
45 | # include "msgpack_rpc/channel.c.generated.h" |
46 | #endif |
47 | |
48 | void rpc_init(void) |
49 | { |
50 | ch_before_blocking_events = multiqueue_new_child(main_loop.events); |
51 | event_strings = pmap_new(cstr_t)(); |
52 | msgpack_sbuffer_init(&out_buffer); |
53 | } |
54 | |
55 | |
56 | void rpc_start(Channel *channel) |
57 | { |
58 | channel_incref(channel); |
59 | channel->is_rpc = true; |
60 | RpcState *rpc = &channel->rpc; |
61 | rpc->closed = false; |
62 | rpc->unpacker = msgpack_unpacker_new(MSGPACK_UNPACKER_INIT_BUFFER_SIZE); |
63 | rpc->subscribed_events = pmap_new(cstr_t)(); |
64 | rpc->next_request_id = 1; |
65 | rpc->info = (Dictionary)ARRAY_DICT_INIT; |
66 | kv_init(rpc->call_stack); |
67 | |
68 | if (channel->streamtype != kChannelStreamInternal) { |
69 | Stream *out = channel_outstream(channel); |
70 | #if MIN_LOG_LEVEL <= DEBUG_LOG_LEVEL |
71 | Stream *in = channel_instream(channel); |
72 | DLOG("rpc ch %" PRIu64 " in-stream=%p out-stream=%p" , channel->id, |
73 | (void *)in, (void *)out); |
74 | #endif |
75 | |
76 | rstream_start(out, receive_msgpack, channel); |
77 | } |
78 | } |
79 | |
80 | |
81 | static Channel *find_rpc_channel(uint64_t id) |
82 | { |
83 | Channel *chan = find_channel(id); |
84 | if (!chan || !chan->is_rpc || chan->rpc.closed) { |
85 | return NULL; |
86 | } |
87 | return chan; |
88 | } |
89 | |
90 | /// Publishes an event to a channel. |
91 | /// |
92 | /// @param id Channel id. 0 means "broadcast to all subscribed channels" |
93 | /// @param name Event name (application-defined) |
94 | /// @param args Array of event arguments |
95 | /// @return True if the event was sent successfully, false otherwise. |
96 | bool rpc_send_event(uint64_t id, const char *name, Array args) |
97 | { |
98 | Channel *channel = NULL; |
99 | |
100 | if (id && (!(channel = find_rpc_channel(id)))) { |
101 | api_free_array(args); |
102 | return false; |
103 | } |
104 | |
105 | if (channel) { |
106 | send_event(channel, name, args); |
107 | } else { |
108 | broadcast_event(name, args); |
109 | } |
110 | |
111 | return true; |
112 | } |
113 | |
114 | /// Sends a method call to a channel |
115 | /// |
116 | /// @param id The channel id |
117 | /// @param method_name The method name, an arbitrary string |
118 | /// @param args Array with method arguments |
119 | /// @param[out] error True if the return value is an error |
120 | /// @return Whatever the remote method returned |
121 | Object rpc_send_call(uint64_t id, |
122 | const char *method_name, |
123 | Array args, |
124 | Error *err) |
125 | { |
126 | Channel *channel = NULL; |
127 | |
128 | if (!(channel = find_rpc_channel(id))) { |
129 | api_set_error(err, kErrorTypeException, "Invalid channel: %" PRIu64, id); |
130 | api_free_array(args); |
131 | return NIL; |
132 | } |
133 | |
134 | channel_incref(channel); |
135 | RpcState *rpc = &channel->rpc; |
136 | uint32_t request_id = rpc->next_request_id++; |
137 | // Send the msgpack-rpc request |
138 | send_request(channel, request_id, method_name, args); |
139 | |
140 | // Push the frame |
141 | ChannelCallFrame frame = { request_id, false, false, NIL }; |
142 | kv_push(rpc->call_stack, &frame); |
143 | LOOP_PROCESS_EVENTS_UNTIL(&main_loop, channel->events, -1, frame.returned); |
144 | (void)kv_pop(rpc->call_stack); |
145 | |
146 | if (frame.errored) { |
147 | if (frame.result.type == kObjectTypeString) { |
148 | api_set_error(err, kErrorTypeException, "%s" , |
149 | frame.result.data.string.data); |
150 | } else if (frame.result.type == kObjectTypeArray) { |
151 | // Should be an error in the form [type, message] |
152 | Array array = frame.result.data.array; |
153 | if (array.size == 2 && array.items[0].type == kObjectTypeInteger |
154 | && (array.items[0].data.integer == kErrorTypeException |
155 | || array.items[0].data.integer == kErrorTypeValidation) |
156 | && array.items[1].type == kObjectTypeString) { |
157 | api_set_error(err, (ErrorType)array.items[0].data.integer, "%s" , |
158 | array.items[1].data.string.data); |
159 | } else { |
160 | api_set_error(err, kErrorTypeException, "%s" , "unknown error" ); |
161 | } |
162 | } else { |
163 | api_set_error(err, kErrorTypeException, "%s" , "unknown error" ); |
164 | } |
165 | |
166 | api_free_object(frame.result); |
167 | } |
168 | |
169 | channel_decref(channel); |
170 | |
171 | return frame.errored ? NIL : frame.result; |
172 | } |
173 | |
174 | /// Subscribes to event broadcasts |
175 | /// |
176 | /// @param id The channel id |
177 | /// @param event The event type string |
178 | void rpc_subscribe(uint64_t id, char *event) |
179 | { |
180 | Channel *channel; |
181 | |
182 | if (!(channel = find_rpc_channel(id))) { |
183 | abort(); |
184 | } |
185 | |
186 | char *event_string = pmap_get(cstr_t)(event_strings, event); |
187 | |
188 | if (!event_string) { |
189 | event_string = xstrdup(event); |
190 | pmap_put(cstr_t)(event_strings, event_string, event_string); |
191 | } |
192 | |
193 | pmap_put(cstr_t)(channel->rpc.subscribed_events, event_string, event_string); |
194 | } |
195 | |
196 | /// Unsubscribes to event broadcasts |
197 | /// |
198 | /// @param id The channel id |
199 | /// @param event The event type string |
200 | void rpc_unsubscribe(uint64_t id, char *event) |
201 | { |
202 | Channel *channel; |
203 | |
204 | if (!(channel = find_rpc_channel(id))) { |
205 | abort(); |
206 | } |
207 | |
208 | unsubscribe(channel, event); |
209 | } |
210 | |
211 | static void receive_msgpack(Stream *stream, RBuffer *rbuf, size_t c, |
212 | void *data, bool eof) |
213 | { |
214 | Channel *channel = data; |
215 | channel_incref(channel); |
216 | |
217 | if (eof) { |
218 | channel_close(channel->id, kChannelPartRpc, NULL); |
219 | char buf[256]; |
220 | snprintf(buf, sizeof(buf), "ch %" PRIu64 " was closed by the client" , |
221 | channel->id); |
222 | call_set_error(channel, buf, WARN_LOG_LEVEL); |
223 | goto end; |
224 | } |
225 | |
226 | size_t count = rbuffer_size(rbuf); |
227 | DLOG("ch %" PRIu64 ": parsing %zu bytes from msgpack Stream: %p" , |
228 | channel->id, count, (void *)stream); |
229 | |
230 | // Feed the unpacker with data |
231 | msgpack_unpacker_reserve_buffer(channel->rpc.unpacker, count); |
232 | rbuffer_read(rbuf, msgpack_unpacker_buffer(channel->rpc.unpacker), count); |
233 | msgpack_unpacker_buffer_consumed(channel->rpc.unpacker, count); |
234 | |
235 | parse_msgpack(channel); |
236 | |
237 | end: |
238 | channel_decref(channel); |
239 | } |
240 | |
241 | static void parse_msgpack(Channel *channel) |
242 | { |
243 | msgpack_unpacked unpacked; |
244 | msgpack_unpacked_init(&unpacked); |
245 | msgpack_unpack_return result; |
246 | |
247 | // Deserialize everything we can. |
248 | while ((result = msgpack_unpacker_next(channel->rpc.unpacker, &unpacked)) == |
249 | MSGPACK_UNPACK_SUCCESS) { |
250 | bool is_response = is_rpc_response(&unpacked.data); |
251 | log_client_msg(channel->id, !is_response, unpacked.data); |
252 | |
253 | if (is_response) { |
254 | if (is_valid_rpc_response(&unpacked.data, channel)) { |
255 | complete_call(&unpacked.data, channel); |
256 | } else { |
257 | char buf[256]; |
258 | snprintf(buf, sizeof(buf), |
259 | "ch %" PRIu64 " returned a response with an unknown request " |
260 | "id. Ensure the client is properly synchronized" , |
261 | channel->id); |
262 | call_set_error(channel, buf, ERROR_LOG_LEVEL); |
263 | } |
264 | msgpack_unpacked_destroy(&unpacked); |
265 | // Bail out from this event loop iteration |
266 | return; |
267 | } |
268 | |
269 | handle_request(channel, &unpacked.data); |
270 | } |
271 | |
272 | if (result == MSGPACK_UNPACK_NOMEM_ERROR) { |
273 | mch_errmsg(e_outofmem); |
274 | mch_errmsg("\n" ); |
275 | channel_decref(channel); |
276 | preserve_exit(); |
277 | } |
278 | |
279 | if (result == MSGPACK_UNPACK_PARSE_ERROR) { |
280 | // See src/msgpack/unpack_template.h in msgpack source tree for |
281 | // causes for this error(search for 'goto _failed') |
282 | // |
283 | // A not so uncommon cause for this might be deserializing objects with |
284 | // a high nesting level: msgpack will break when its internal parse stack |
285 | // size exceeds MSGPACK_EMBED_STACK_SIZE (defined as 32 by default) |
286 | send_error(channel, kMessageTypeRequest, 0, |
287 | "Invalid msgpack payload. " |
288 | "This error can also happen when deserializing " |
289 | "an object with high level of nesting" ); |
290 | } |
291 | } |
292 | |
293 | /// Handles requests and notifications received on the channel. |
294 | static void handle_request(Channel *channel, msgpack_object *request) |
295 | FUNC_ATTR_NONNULL_ALL |
296 | { |
297 | uint32_t request_id; |
298 | Error error = ERROR_INIT; |
299 | MessageType type = msgpack_rpc_validate(&request_id, request, &error); |
300 | |
301 | if (ERROR_SET(&error)) { |
302 | // Validation failed, send response with error |
303 | if (channel_write(channel, |
304 | serialize_response(channel->id, |
305 | type, |
306 | request_id, |
307 | &error, |
308 | NIL, |
309 | &out_buffer))) { |
310 | char buf[256]; |
311 | snprintf(buf, sizeof(buf), |
312 | "ch %" PRIu64 " sent an invalid message, closed." , |
313 | channel->id); |
314 | call_set_error(channel, buf, ERROR_LOG_LEVEL); |
315 | } |
316 | api_clear_error(&error); |
317 | return; |
318 | } |
319 | assert(type == kMessageTypeRequest || type == kMessageTypeNotification); |
320 | |
321 | MsgpackRpcRequestHandler handler; |
322 | msgpack_object *method = msgpack_rpc_method(request); |
323 | handler = msgpack_rpc_get_handler_for(method->via.bin.ptr, |
324 | method->via.bin.size, |
325 | &error); |
326 | |
327 | // check method arguments |
328 | Array args = ARRAY_DICT_INIT; |
329 | if (!ERROR_SET(&error) |
330 | && !msgpack_rpc_to_array(msgpack_rpc_args(request), &args)) { |
331 | api_set_error(&error, kErrorTypeException, "Invalid method arguments" ); |
332 | } |
333 | |
334 | if (ERROR_SET(&error)) { |
335 | send_error(channel, type, request_id, error.msg); |
336 | api_clear_error(&error); |
337 | api_free_array(args); |
338 | return; |
339 | } |
340 | |
341 | RequestEvent *evdata = xmalloc(sizeof(RequestEvent)); |
342 | evdata->type = type; |
343 | evdata->channel = channel; |
344 | evdata->handler = handler; |
345 | evdata->args = args; |
346 | evdata->request_id = request_id; |
347 | channel_incref(channel); |
348 | if (handler.fast) { |
349 | bool is_get_mode = handler.fn == handle_nvim_get_mode; |
350 | |
351 | if (is_get_mode && !input_blocking()) { |
352 | // Defer the event to a special queue used by os/input.c. #6247 |
353 | multiqueue_put(ch_before_blocking_events, request_event, 1, evdata); |
354 | } else { |
355 | // Invoke immediately. |
356 | request_event((void **)&evdata); |
357 | } |
358 | } else { |
359 | bool is_resize = handler.fn == handle_nvim_ui_try_resize; |
360 | if (is_resize) { |
361 | Event ev = event_create_oneshot(event_create(request_event, 1, evdata), |
362 | 2); |
363 | multiqueue_put_event(channel->events, ev); |
364 | multiqueue_put_event(resize_events, ev); |
365 | } else { |
366 | multiqueue_put(channel->events, request_event, 1, evdata); |
367 | DLOG("RPC: scheduled %.*s" , method->via.bin.size, method->via.bin.ptr); |
368 | } |
369 | } |
370 | } |
371 | |
372 | |
373 | /// Handles a message, depending on the type: |
374 | /// - Request: invokes method and writes the response (or error). |
375 | /// - Notification: invokes method (emits `nvim_error_event` on error). |
376 | static void request_event(void **argv) |
377 | { |
378 | RequestEvent *e = argv[0]; |
379 | Channel *channel = e->channel; |
380 | MsgpackRpcRequestHandler handler = e->handler; |
381 | Error error = ERROR_INIT; |
382 | Object result = handler.fn(channel->id, e->args, &error); |
383 | if (e->type == kMessageTypeRequest || ERROR_SET(&error)) { |
384 | // Send the response. |
385 | msgpack_packer response; |
386 | msgpack_packer_init(&response, &out_buffer, msgpack_sbuffer_write); |
387 | channel_write(channel, serialize_response(channel->id, |
388 | e->type, |
389 | e->request_id, |
390 | &error, |
391 | result, |
392 | &out_buffer)); |
393 | } else { |
394 | api_free_object(result); |
395 | } |
396 | api_free_array(e->args); |
397 | channel_decref(channel); |
398 | xfree(e); |
399 | api_clear_error(&error); |
400 | } |
401 | |
402 | static bool channel_write(Channel *channel, WBuffer *buffer) |
403 | { |
404 | bool success; |
405 | |
406 | if (channel->rpc.closed) { |
407 | wstream_release_wbuffer(buffer); |
408 | return false; |
409 | } |
410 | |
411 | if (channel->streamtype == kChannelStreamInternal) { |
412 | channel_incref(channel); |
413 | CREATE_EVENT(channel->events, internal_read_event, 2, channel, buffer); |
414 | success = true; |
415 | } else { |
416 | Stream *in = channel_instream(channel); |
417 | success = wstream_write(in, buffer); |
418 | } |
419 | |
420 | |
421 | if (!success) { |
422 | // If the write failed for any reason, close the channel |
423 | char buf[256]; |
424 | snprintf(buf, |
425 | sizeof(buf), |
426 | "ch %" PRIu64 ": stream write failed. " |
427 | "RPC canceled; closing channel" , |
428 | channel->id); |
429 | call_set_error(channel, buf, ERROR_LOG_LEVEL); |
430 | } |
431 | |
432 | return success; |
433 | } |
434 | |
435 | static void internal_read_event(void **argv) |
436 | { |
437 | Channel *channel = argv[0]; |
438 | WBuffer *buffer = argv[1]; |
439 | |
440 | msgpack_unpacker_reserve_buffer(channel->rpc.unpacker, buffer->size); |
441 | memcpy(msgpack_unpacker_buffer(channel->rpc.unpacker), |
442 | buffer->data, buffer->size); |
443 | msgpack_unpacker_buffer_consumed(channel->rpc.unpacker, buffer->size); |
444 | |
445 | parse_msgpack(channel); |
446 | |
447 | channel_decref(channel); |
448 | wstream_release_wbuffer(buffer); |
449 | } |
450 | |
451 | static void send_error(Channel *chan, MessageType type, uint32_t id, char *err) |
452 | { |
453 | Error e = ERROR_INIT; |
454 | api_set_error(&e, kErrorTypeException, "%s" , err); |
455 | channel_write(chan, serialize_response(chan->id, |
456 | type, |
457 | id, |
458 | &e, |
459 | NIL, |
460 | &out_buffer)); |
461 | api_clear_error(&e); |
462 | } |
463 | |
464 | static void send_request(Channel *channel, |
465 | uint32_t id, |
466 | const char *name, |
467 | Array args) |
468 | { |
469 | const String method = cstr_as_string((char *)name); |
470 | channel_write(channel, serialize_request(channel->id, |
471 | id, |
472 | method, |
473 | args, |
474 | &out_buffer, |
475 | 1)); |
476 | } |
477 | |
478 | static void send_event(Channel *channel, |
479 | const char *name, |
480 | Array args) |
481 | { |
482 | const String method = cstr_as_string((char *)name); |
483 | channel_write(channel, serialize_request(channel->id, |
484 | 0, |
485 | method, |
486 | args, |
487 | &out_buffer, |
488 | 1)); |
489 | } |
490 | |
491 | static void broadcast_event(const char *name, Array args) |
492 | { |
493 | kvec_t(Channel *) subscribed = KV_INITIAL_VALUE; |
494 | Channel *channel; |
495 | |
496 | map_foreach_value(channels, channel, { |
497 | if (channel->is_rpc |
498 | && pmap_has(cstr_t)(channel->rpc.subscribed_events, name)) { |
499 | kv_push(subscribed, channel); |
500 | } |
501 | }); |
502 | |
503 | if (!kv_size(subscribed)) { |
504 | api_free_array(args); |
505 | goto end; |
506 | } |
507 | |
508 | const String method = cstr_as_string((char *)name); |
509 | WBuffer *buffer = serialize_request(0, |
510 | 0, |
511 | method, |
512 | args, |
513 | &out_buffer, |
514 | kv_size(subscribed)); |
515 | |
516 | for (size_t i = 0; i < kv_size(subscribed); i++) { |
517 | Channel *c = kv_A(subscribed, i); |
518 | channel_write(c, buffer); |
519 | } |
520 | |
521 | end: |
522 | kv_destroy(subscribed); |
523 | } |
524 | |
525 | static void unsubscribe(Channel *channel, char *event) |
526 | { |
527 | char *event_string = pmap_get(cstr_t)(event_strings, event); |
528 | if (!event_string) { |
529 | WLOG("RPC: ch %" PRIu64 ": tried to unsubscribe unknown event '%s'" , |
530 | channel->id, event); |
531 | return; |
532 | } |
533 | pmap_del(cstr_t)(channel->rpc.subscribed_events, event_string); |
534 | |
535 | map_foreach_value(channels, channel, { |
536 | if (channel->is_rpc |
537 | && pmap_has(cstr_t)(channel->rpc.subscribed_events, event_string)) { |
538 | return; |
539 | } |
540 | }); |
541 | |
542 | // Since the string is no longer used by other channels, release it's memory |
543 | pmap_del(cstr_t)(event_strings, event_string); |
544 | xfree(event_string); |
545 | } |
546 | |
547 | |
548 | /// Mark rpc state as closed, and release its reference to the channel. |
549 | /// Don't call this directly, call channel_close(id, kChannelPartRpc, &error) |
550 | void rpc_close(Channel *channel) |
551 | { |
552 | if (channel->rpc.closed) { |
553 | return; |
554 | } |
555 | |
556 | channel->rpc.closed = true; |
557 | channel_decref(channel); |
558 | |
559 | if (channel->streamtype == kChannelStreamStdio) { |
560 | multiqueue_put(main_loop.fast_events, exit_event, 0); |
561 | } |
562 | } |
563 | |
564 | static void exit_event(void **argv) |
565 | { |
566 | if (!exiting) { |
567 | mch_exit(0); |
568 | } |
569 | } |
570 | |
571 | void rpc_free(Channel *channel) |
572 | { |
573 | remote_ui_disconnect(channel->id); |
574 | msgpack_unpacker_free(channel->rpc.unpacker); |
575 | |
576 | // Unsubscribe from all events |
577 | char *event_string; |
578 | map_foreach_value(channel->rpc.subscribed_events, event_string, { |
579 | unsubscribe(channel, event_string); |
580 | }); |
581 | |
582 | pmap_free(cstr_t)(channel->rpc.subscribed_events); |
583 | kv_destroy(channel->rpc.call_stack); |
584 | api_free_dictionary(channel->rpc.info); |
585 | } |
586 | |
587 | static bool is_rpc_response(msgpack_object *obj) |
588 | { |
589 | return obj->type == MSGPACK_OBJECT_ARRAY |
590 | && obj->via.array.size == 4 |
591 | && obj->via.array.ptr[0].type == MSGPACK_OBJECT_POSITIVE_INTEGER |
592 | && obj->via.array.ptr[0].via.u64 == 1 |
593 | && obj->via.array.ptr[1].type == MSGPACK_OBJECT_POSITIVE_INTEGER; |
594 | } |
595 | |
596 | static bool is_valid_rpc_response(msgpack_object *obj, Channel *channel) |
597 | { |
598 | uint32_t response_id = (uint32_t)obj->via.array.ptr[1].via.u64; |
599 | if (kv_size(channel->rpc.call_stack) == 0) { |
600 | return false; |
601 | } |
602 | |
603 | // Must be equal to the frame at the stack's bottom |
604 | ChannelCallFrame *frame = kv_last(channel->rpc.call_stack); |
605 | return response_id == frame->request_id; |
606 | } |
607 | |
608 | static void complete_call(msgpack_object *obj, Channel *channel) |
609 | { |
610 | ChannelCallFrame *frame = kv_last(channel->rpc.call_stack); |
611 | frame->returned = true; |
612 | frame->errored = obj->via.array.ptr[2].type != MSGPACK_OBJECT_NIL; |
613 | |
614 | if (frame->errored) { |
615 | msgpack_rpc_to_object(&obj->via.array.ptr[2], &frame->result); |
616 | } else { |
617 | msgpack_rpc_to_object(&obj->via.array.ptr[3], &frame->result); |
618 | } |
619 | } |
620 | |
621 | static void call_set_error(Channel *channel, char *msg, int loglevel) |
622 | { |
623 | LOG(loglevel, "RPC: %s" , msg); |
624 | for (size_t i = 0; i < kv_size(channel->rpc.call_stack); i++) { |
625 | ChannelCallFrame *frame = kv_A(channel->rpc.call_stack, i); |
626 | frame->returned = true; |
627 | frame->errored = true; |
628 | api_free_object(frame->result); |
629 | frame->result = STRING_OBJ(cstr_to_string(msg)); |
630 | } |
631 | |
632 | channel_close(channel->id, kChannelPartRpc, NULL); |
633 | } |
634 | |
635 | static WBuffer *serialize_request(uint64_t channel_id, |
636 | uint32_t request_id, |
637 | const String method, |
638 | Array args, |
639 | msgpack_sbuffer *sbuffer, |
640 | size_t refcount) |
641 | { |
642 | msgpack_packer pac; |
643 | msgpack_packer_init(&pac, sbuffer, msgpack_sbuffer_write); |
644 | msgpack_rpc_serialize_request(request_id, method, args, &pac); |
645 | log_server_msg(channel_id, sbuffer); |
646 | WBuffer *rv = wstream_new_buffer(xmemdup(sbuffer->data, sbuffer->size), |
647 | sbuffer->size, |
648 | refcount, |
649 | xfree); |
650 | msgpack_sbuffer_clear(sbuffer); |
651 | api_free_array(args); |
652 | return rv; |
653 | } |
654 | |
655 | static WBuffer *serialize_response(uint64_t channel_id, |
656 | MessageType type, |
657 | uint32_t response_id, |
658 | Error *err, |
659 | Object arg, |
660 | msgpack_sbuffer *sbuffer) |
661 | { |
662 | msgpack_packer pac; |
663 | msgpack_packer_init(&pac, sbuffer, msgpack_sbuffer_write); |
664 | if (ERROR_SET(err) && type == kMessageTypeNotification) { |
665 | Array args = ARRAY_DICT_INIT; |
666 | ADD(args, INTEGER_OBJ(err->type)); |
667 | ADD(args, STRING_OBJ(cstr_to_string(err->msg))); |
668 | msgpack_rpc_serialize_request(0, cstr_as_string("nvim_error_event" ), |
669 | args, &pac); |
670 | api_free_array(args); |
671 | } else { |
672 | msgpack_rpc_serialize_response(response_id, err, arg, &pac); |
673 | } |
674 | log_server_msg(channel_id, sbuffer); |
675 | WBuffer *rv = wstream_new_buffer(xmemdup(sbuffer->data, sbuffer->size), |
676 | sbuffer->size, |
677 | 1, // responses only go though 1 channel |
678 | xfree); |
679 | msgpack_sbuffer_clear(sbuffer); |
680 | api_free_object(arg); |
681 | return rv; |
682 | } |
683 | |
684 | void rpc_set_client_info(uint64_t id, Dictionary info) |
685 | { |
686 | Channel *chan = find_rpc_channel(id); |
687 | if (!chan) { |
688 | abort(); |
689 | } |
690 | |
691 | api_free_dictionary(chan->rpc.info); |
692 | chan->rpc.info = info; |
693 | channel_info_changed(chan, false); |
694 | } |
695 | |
696 | Dictionary rpc_client_info(Channel *chan) |
697 | { |
698 | return copy_dictionary(chan->rpc.info); |
699 | } |
700 | |
701 | const char *rpc_client_name(Channel *chan) |
702 | { |
703 | if (!chan->is_rpc) { |
704 | return NULL; |
705 | } |
706 | Dictionary info = chan->rpc.info; |
707 | for (size_t i = 0; i < info.size; i++) { |
708 | if (strequal("name" , info.items[i].key.data) |
709 | && info.items[i].value.type == kObjectTypeString) { |
710 | return info.items[i].value.data.string.data; |
711 | } |
712 | } |
713 | |
714 | return NULL; |
715 | } |
716 | |
717 | #if MIN_LOG_LEVEL <= DEBUG_LOG_LEVEL |
718 | #define REQ "[request] " |
719 | #define RES "[response] " |
720 | #define NOT "[notify] " |
721 | #define ERR "[error] " |
722 | |
723 | // Arrays cannot have negative indices, so this offset is added to the |
724 | // (possibly negative) MSGPACK_UNPACK_* values before indexing with them. |
725 | #define MUR_OFF 2 |
726 | |
727 | static const char *const msgpack_error_messages[] = { |
728 | [MSGPACK_UNPACK_EXTRA_BYTES + MUR_OFF] = "extra bytes found" , |
729 | [MSGPACK_UNPACK_CONTINUE + MUR_OFF] = "incomplete string" , |
730 | [MSGPACK_UNPACK_PARSE_ERROR + MUR_OFF] = "parse error" , |
731 | [MSGPACK_UNPACK_NOMEM_ERROR + MUR_OFF] = "not enough memory" , |
732 | }; |
733 | |
734 | static void log_server_msg(uint64_t channel_id, |
735 | msgpack_sbuffer *packed) |
736 | { |
737 | msgpack_unpacked unpacked; |
738 | msgpack_unpacked_init(&unpacked); |
739 | DLOGN("RPC ->ch %" PRIu64 ": " , channel_id); |
740 | const msgpack_unpack_return result = |
741 | msgpack_unpack_next(&unpacked, packed->data, packed->size, NULL); |
742 | switch (result) { |
743 | case MSGPACK_UNPACK_SUCCESS: { |
744 | uint64_t type = unpacked.data.via.array.ptr[0].via.u64; |
745 | log_lock(); |
746 | FILE *f = open_log_file(); |
747 | fprintf(f, type ? (type == 1 ? RES : NOT) : REQ); |
748 | log_msg_close(f, unpacked.data); |
749 | msgpack_unpacked_destroy(&unpacked); |
750 | break; |
751 | } |
752 | case MSGPACK_UNPACK_EXTRA_BYTES: |
753 | case MSGPACK_UNPACK_CONTINUE: |
754 | case MSGPACK_UNPACK_PARSE_ERROR: |
755 | case MSGPACK_UNPACK_NOMEM_ERROR: { |
756 | log_lock(); |
757 | FILE *f = open_log_file(); |
758 | fprintf(f, ERR); |
759 | log_msg_close(f, (msgpack_object) { |
760 | .type = MSGPACK_OBJECT_STR, |
761 | .via.str = { |
762 | .ptr = (char *)msgpack_error_messages[result + MUR_OFF], |
763 | .size = (uint32_t)strlen( |
764 | msgpack_error_messages[result + MUR_OFF]), |
765 | }, |
766 | }); |
767 | break; |
768 | } |
769 | } |
770 | } |
771 | |
772 | static void log_client_msg(uint64_t channel_id, |
773 | bool is_request, |
774 | msgpack_object msg) |
775 | { |
776 | DLOGN("RPC <-ch %" PRIu64 ": " , channel_id); |
777 | log_lock(); |
778 | FILE *f = open_log_file(); |
779 | fprintf(f, is_request ? REQ : RES); |
780 | log_msg_close(f, msg); |
781 | } |
782 | |
783 | static void log_msg_close(FILE *f, msgpack_object msg) |
784 | { |
785 | msgpack_object_print(f, msg); |
786 | fputc('\n', f); |
787 | fflush(f); |
788 | fclose(f); |
789 | log_unlock(); |
790 | } |
791 | #endif |
792 | |