1 | // This is an open source non-commercial project. Dear PVS-Studio, please check |
2 | // it. PVS-Studio Static Code Analyzer for C, C++ and C#: http://www.viva64.com |
3 | |
4 | #include "nvim/api/private/helpers.h" |
5 | #include "nvim/api/ui.h" |
6 | #include "nvim/channel.h" |
7 | #include "nvim/eval.h" |
8 | #include "nvim/eval/encode.h" |
9 | #include "nvim/event/socket.h" |
10 | #include "nvim/fileio.h" |
11 | #include "nvim/msgpack_rpc/channel.h" |
12 | #include "nvim/msgpack_rpc/server.h" |
13 | #include "nvim/os/shell.h" |
14 | #include "nvim/path.h" |
15 | #include "nvim/ascii.h" |
16 | |
/// Set once channel_from_stdio() has created the stdio channel; a second
/// attempt to open it is rejected.
static bool did_stdio = false;
/// Registered channels, keyed by channel id. Created by channel_init().
PMap(uint64_t) *channels = NULL;

/// next free id for a job or rpc channel
/// 1 is reserved for stdio channel
/// 2 is reserved for stderr channel
static uint64_t next_chan_id = CHAN_STDERR+1;
24 | |
25 | #ifdef INCLUDE_GENERATED_DECLARATIONS |
26 | # include "channel.c.generated.h" |
27 | #endif |
28 | |
29 | /// Teardown the module |
30 | void channel_teardown(void) |
31 | { |
32 | if (!channels) { |
33 | return; |
34 | } |
35 | |
36 | Channel *channel; |
37 | |
38 | map_foreach_value(channels, channel, { |
39 | channel_close(channel->id, kChannelPartAll, NULL); |
40 | }); |
41 | } |
42 | |
/// Closes a channel, or a single part of it
///
/// @param id          The channel id
/// @param part        Which part of the channel to close
/// @param[out] error  Receives an error message whenever false is returned.
///                    May be NULL when the caller does not need the message.
/// @return true if successful, false otherwise
bool channel_close(uint64_t id, ChannelPart part, const char **error)
{
  Channel *chan;
  Process *proc;

  // Point `error` at a scratch variable when the caller passed NULL, so the
  // rest of the function can assign through it unconditionally.
  const char *dummy;
  if (!error) {
    error = &dummy;
  }

  if (!(chan = find_channel(id))) {
    if (id < next_chan_id) {
      // allow double close, even though we can't say which parts were valid.
      return true;
    }
    *error = (const char *)e_invchan;
    return false;
  }

  // close_main: the request covers the channel's main stream(s), i.e. `part`
  // is either "rpc" or "all".
  bool close_main = false;
  if (part == kChannelPartRpc || part == kChannelPartAll) {
    close_main = true;
    if (chan->is_rpc) {
      rpc_close(chan);
    } else if (part == kChannelPartRpc) {
      // "rpc" part requested on a channel that is not an rpc channel
      *error = (const char *)e_invstream;
      return false;
    }
  } else if ((part == kChannelPartStdin || part == kChannelPartStdout)
             && chan->is_rpc) {
    // stdin/stdout of an rpc channel cannot be closed individually
    *error = (const char *)e_invstreamrpc;
    return false;
  }

  switch (chan->streamtype) {
    case kChannelStreamSocket:
      // a socket has a single stream; only "rpc"/"all" is accepted
      if (!close_main) {
        *error = (const char *)e_invstream;
        return false;
      }
      stream_may_close(&chan->stream.socket);
      break;

    case kChannelStreamProc:
      proc = (Process *)&chan->stream.proc;
      if (part == kChannelPartStdin || close_main) {
        stream_may_close(&proc->in);
      }
      if (part == kChannelPartStdout || close_main) {
        stream_may_close(&proc->out);
      }
      // stderr is closed for "stderr" and "all", but not for "rpc"
      if (part == kChannelPartStderr || part == kChannelPartAll) {
        stream_may_close(&proc->err);
      }
      if (proc->type == kProcessTypePty && part == kChannelPartAll) {
        pty_process_close_master(&chan->stream.pty);
      }

      break;

    case kChannelStreamStdio:
      if (part == kChannelPartStdin || close_main) {
        stream_may_close(&chan->stream.stdio.in);
      }
      if (part == kChannelPartStdout || close_main) {
        stream_may_close(&chan->stream.stdio.out);
      }
      // the stdio channel has no stderr part
      if (part == kChannelPartStderr) {
        *error = (const char *)e_invstream;
        return false;
      }
      break;

    case kChannelStreamStderr:
      if (part != kChannelPartAll && part != kChannelPartStderr) {
        *error = (const char *)e_invstream;
        return false;
      }
      // only the first close drops the channel reference
      if (!chan->stream.err.closed) {
        chan->stream.err.closed = true;
        // Don't close on exit, in case late error messages
        if (!exiting) {
          fclose(stderr);
        }
        channel_decref(chan);
      }
      break;

    case kChannelStreamInternal:
      // nothing to close here; only "rpc"/"all" is accepted
      if (!close_main) {
        *error = (const char *)e_invstream;
        return false;
      }
      break;

    default:
      abort();
  }

  return true;
}
148 | |
149 | /// Initializes the module |
150 | void channel_init(void) |
151 | { |
152 | channels = pmap_new(uint64_t)(); |
153 | channel_alloc(kChannelStreamStderr); |
154 | rpc_init(); |
155 | } |
156 | |
157 | /// Allocates a channel. |
158 | /// |
159 | /// Channel is allocated with refcount 1, which should be decreased |
160 | /// when the underlying stream closes. |
161 | static Channel *channel_alloc(ChannelStreamType type) |
162 | { |
163 | Channel *chan = xcalloc(1, sizeof(*chan)); |
164 | if (type == kChannelStreamStdio) { |
165 | chan->id = CHAN_STDIO; |
166 | } else if (type == kChannelStreamStderr) { |
167 | chan->id = CHAN_STDERR; |
168 | } else { |
169 | chan->id = next_chan_id++; |
170 | } |
171 | chan->events = multiqueue_new_child(main_loop.events); |
172 | chan->refcount = 1; |
173 | chan->exit_status = -1; |
174 | chan->streamtype = type; |
175 | assert(chan->id <= VARNUMBER_MAX); |
176 | pmap_put(uint64_t)(channels, chan->id, chan); |
177 | return chan; |
178 | } |
179 | |
180 | void channel_create_event(Channel *chan, const char *ext_source) |
181 | { |
182 | #if MIN_LOG_LEVEL <= INFO_LOG_LEVEL |
183 | const char *source; |
184 | |
185 | if (ext_source) { |
186 | // TODO(bfredl): in a future improved traceback solution, |
187 | // external events should be included. |
188 | source = ext_source; |
189 | } else { |
190 | eval_fmt_source_name_line((char *)IObuff, sizeof(IObuff)); |
191 | source = (const char *)IObuff; |
192 | } |
193 | |
194 | assert(chan->id <= VARNUMBER_MAX); |
195 | Dictionary info = channel_info(chan->id); |
196 | typval_T tv = TV_INITIAL_VALUE; |
197 | // TODO(bfredl): do the conversion in one step. Also would be nice |
198 | // to pretty print top level dict in defined order |
199 | (void)object_to_vim(DICTIONARY_OBJ(info), &tv, NULL); |
200 | char *str = encode_tv2json(&tv, NULL); |
201 | ILOG("new channel %" PRIu64 " (%s) : %s" , chan->id, source, str); |
202 | xfree(str); |
203 | api_free_dictionary(info); |
204 | |
205 | #else |
206 | (void)ext_source; |
207 | #endif |
208 | |
209 | channel_info_changed(chan, true); |
210 | } |
211 | |
212 | void channel_incref(Channel *chan) |
213 | { |
214 | chan->refcount++; |
215 | } |
216 | |
217 | void channel_decref(Channel *chan) |
218 | { |
219 | if (!(--chan->refcount)) { |
220 | // delay free, so that libuv is done with the handles |
221 | multiqueue_put(main_loop.events, free_channel_event, 1, chan); |
222 | } |
223 | } |
224 | |
225 | void callback_reader_free(CallbackReader *reader) |
226 | { |
227 | callback_free(&reader->cb); |
228 | ga_clear(&reader->buffer); |
229 | } |
230 | |
231 | void callback_reader_start(CallbackReader *reader, const char *type) |
232 | { |
233 | ga_init(&reader->buffer, sizeof(char *), 32); |
234 | reader->type = type; |
235 | } |
236 | |
237 | static void free_channel_event(void **argv) |
238 | { |
239 | Channel *chan = argv[0]; |
240 | if (chan->is_rpc) { |
241 | rpc_free(chan); |
242 | } |
243 | |
244 | callback_reader_free(&chan->on_data); |
245 | callback_reader_free(&chan->on_stderr); |
246 | callback_free(&chan->on_exit); |
247 | |
248 | pmap_del(uint64_t)(channels, chan->id); |
249 | multiqueue_free(chan->events); |
250 | xfree(chan); |
251 | } |
252 | |
253 | static void channel_destroy_early(Channel *chan) |
254 | { |
255 | if ((chan->id != --next_chan_id)) { |
256 | abort(); |
257 | } |
258 | pmap_del(uint64_t)(channels, chan->id); |
259 | chan->id = 0; |
260 | |
261 | if ((--chan->refcount != 0)) { |
262 | abort(); |
263 | } |
264 | |
265 | // uv will keep a reference to handles until next loop tick, so delay free |
266 | multiqueue_put(main_loop.events, free_channel_event, 1, chan); |
267 | } |
268 | |
269 | |
270 | static void close_cb(Stream *stream, void *data) |
271 | { |
272 | channel_decref(data); |
273 | } |
274 | |
/// Starts a job (child process) and creates a channel for it
///
/// @param argv        Command and arguments; argv[0] is used in the spawn
///                    error message. Freed here when the pty/detach
///                    combination is rejected.
/// @param on_stdout   Reader stored as the channel's on_data
/// @param on_stderr   Reader stored as the channel's on_stderr
/// @param on_exit     Callback stored as the channel's on_exit
/// @param pty         Run the job as a pty process instead of a libuv one
/// @param rpc         Start rpc on the job's in and out streams
/// @param detach      Stored in the process; not allowed together with `pty`
/// @param cwd         Working directory; must be NULL or an executable
///                    directory (asserted)
/// @param pty_width   pty width, applied only when > 0
/// @param pty_height  pty height, applied only when > 0
/// @param term_name   pty terminal name, stored when non-NULL; freed on the
///                    failure paths visible here
/// @param[out] status_out  Channel id on success, 0 when pty and detach are
///                         combined, the process status when spawning fails
/// @return the new channel, or NULL on failure
Channel *channel_job_start(char **argv, CallbackReader on_stdout,
                           CallbackReader on_stderr, Callback on_exit,
                           bool pty, bool rpc, bool detach, const char *cwd,
                           uint16_t pty_width, uint16_t pty_height,
                           char *term_name, varnumber_T *status_out)
{
  assert(cwd == NULL || os_isdir_executable(cwd));

  Channel *chan = channel_alloc(kChannelStreamProc);
  chan->on_data = on_stdout;
  chan->on_stderr = on_stderr;
  chan->on_exit = on_exit;

  if (pty) {
    if (detach) {
      EMSG2(_(e_invarg2), "terminal/pty job cannot be detached" );
      shell_free_argv(argv);
      xfree(term_name);
      channel_destroy_early(chan);
      *status_out = 0;
      return NULL;
    }
    chan->stream.pty = pty_process_init(&main_loop, chan);
    // zero/NULL keeps whatever pty_process_init() set up
    if (pty_width > 0) {
      chan->stream.pty.width = pty_width;
    }
    if (pty_height > 0) {
      chan->stream.pty.height = pty_height;
    }
    if (term_name) {
      chan->stream.pty.term_name = term_name;
    }
  } else {
    chan->stream.uv = libuv_process_init(&main_loop, chan);
  }

  Process *proc = (Process *)&chan->stream.proc;
  proc->argv = argv;
  proc->cb = channel_process_exit_cb;
  proc->events = chan->events;
  proc->detach = detach;
  proc->cwd = cwd;

  // keep a private copy of the command name for the error message below
  char *cmd = xstrdup(proc->argv[0]);
  bool has_out, has_err;
  if (proc->type == kProcessTypePty) {
    // a pty job always has an output stream and never a separate stderr
    has_out = true;
    has_err = false;
  } else {
    has_out = rpc || callback_reader_set(chan->on_data);
    has_err = callback_reader_set(chan->on_stderr);
  }
  // NOTE(review): the literal `true` presumably requests an input stream;
  // confirm against process_spawn().
  int status = process_spawn(proc, true, has_out, has_err);
  if (status) {
    EMSG3(_(e_jobspawn), os_strerror(status), cmd);
    xfree(cmd);
    if (proc->type == kProcessTypePty) {
      xfree(chan->stream.pty.term_name);
    }
    channel_destroy_early(chan);
    // channel_destroy_early() only queues the free, so `proc` (which lives
    // inside `chan`) can still be read here.
    *status_out = proc->status;
    return NULL;
  }
  xfree(cmd);

  wstream_init(&proc->in, 0);
  if (has_out) {
    rstream_init(&proc->out, 0);
  }

  if (rpc) {
    // the rpc takes over the in and out streams
    rpc_start(chan);
  } else {
    if (has_out) {
      callback_reader_start(&chan->on_data, "stdout" );
      rstream_start(&proc->out, on_channel_data, chan);
    }
  }

  if (has_err) {
    callback_reader_start(&chan->on_stderr, "stderr" );
    rstream_init(&proc->err, 0);
    rstream_start(&proc->err, on_job_stderr, chan);
  }

  *status_out = (varnumber_T)chan->id;
  return chan;
}
364 | |
365 | |
366 | uint64_t channel_connect(bool tcp, const char *address, |
367 | bool rpc, CallbackReader on_output, |
368 | int timeout, const char **error) |
369 | { |
370 | Channel *channel; |
371 | |
372 | if (!tcp && rpc) { |
373 | char *path = fix_fname(address); |
374 | bool loopback = server_owns_pipe_address(path); |
375 | xfree(path); |
376 | if (loopback) { |
377 | // Create a loopback channel. This avoids deadlock if nvim connects to |
378 | // its own named pipe. |
379 | channel = channel_alloc(kChannelStreamInternal); |
380 | rpc_start(channel); |
381 | goto end; |
382 | } |
383 | } |
384 | |
385 | channel = channel_alloc(kChannelStreamSocket); |
386 | if (!socket_connect(&main_loop, &channel->stream.socket, |
387 | tcp, address, timeout, error)) { |
388 | channel_destroy_early(channel); |
389 | return 0; |
390 | } |
391 | |
392 | channel->stream.socket.internal_close_cb = close_cb; |
393 | channel->stream.socket.internal_data = channel; |
394 | wstream_init(&channel->stream.socket, 0); |
395 | rstream_init(&channel->stream.socket, 0); |
396 | |
397 | if (rpc) { |
398 | rpc_start(channel); |
399 | } else { |
400 | channel->on_data = on_output; |
401 | callback_reader_start(&channel->on_data, "data" ); |
402 | rstream_start(&channel->stream.socket, on_channel_data, channel); |
403 | } |
404 | |
405 | end: |
406 | channel_create_event(channel, address); |
407 | return channel->id; |
408 | } |
409 | |
410 | /// Creates an RPC channel from a tcp/pipe socket connection |
411 | /// |
412 | /// @param watcher The SocketWatcher ready to accept the connection |
413 | void channel_from_connection(SocketWatcher *watcher) |
414 | { |
415 | Channel *channel = channel_alloc(kChannelStreamSocket); |
416 | socket_watcher_accept(watcher, &channel->stream.socket); |
417 | channel->stream.socket.internal_close_cb = close_cb; |
418 | channel->stream.socket.internal_data = channel; |
419 | wstream_init(&channel->stream.socket, 0); |
420 | rstream_init(&channel->stream.socket, 0); |
421 | rpc_start(channel); |
422 | channel_create_event(channel, watcher->addr); |
423 | } |
424 | |
425 | /// Creates an API channel from stdin/stdout. This is used when embedding |
426 | /// Neovim |
427 | uint64_t channel_from_stdio(bool rpc, CallbackReader on_output, |
428 | const char **error) |
429 | FUNC_ATTR_NONNULL_ALL |
430 | { |
431 | if (!headless_mode && !embedded_mode) { |
432 | *error = _("can only be opened in headless mode" ); |
433 | return 0; |
434 | } |
435 | |
436 | if (did_stdio) { |
437 | *error = _("channel was already open" ); |
438 | return 0; |
439 | } |
440 | did_stdio = true; |
441 | |
442 | Channel *channel = channel_alloc(kChannelStreamStdio); |
443 | |
444 | rstream_init_fd(&main_loop, &channel->stream.stdio.in, 0, 0); |
445 | wstream_init_fd(&main_loop, &channel->stream.stdio.out, 1, 0); |
446 | |
447 | if (rpc) { |
448 | rpc_start(channel); |
449 | } else { |
450 | channel->on_data = on_output; |
451 | callback_reader_start(&channel->on_data, "stdin" ); |
452 | rstream_start(&channel->stream.stdio.in, on_channel_data, channel); |
453 | } |
454 | |
455 | return channel->id; |
456 | } |
457 | |
/// Sends raw data to a channel
///
/// @param id          The channel id
/// @param data        will be consumed: every path either frees it or hands
///                    it to a write buffer that frees it with xfree
/// @param len         Number of bytes in `data`
/// @param[out] error  Receives a message for closed streams and rpc channels.
///                    NOTE(review): an invalid channel id is reported with
///                    EMSG() instead and leaves *error untouched.
/// @return number of bytes accepted for writing, or 0 on failure
size_t channel_send(uint64_t id, char *data, size_t len, const char **error)
{
  Channel *chan = find_channel(id);
  if (!chan) {
    EMSG(_(e_invchan));
    goto err;
  }

  if (chan->streamtype == kChannelStreamStderr) {
    if (chan->stream.err.closed) {
      *error = _("Can't send data to closed stream" );
      goto err;
    }
    // unbuffered write
    // fwrite() is asked for one item of `len` bytes, so `written` is 0 or 1
    size_t written = fwrite(data, len, 1, stderr);
    xfree(data);
    return len * written;
  }


  Stream *in = channel_instream(chan);
  if (in->closed) {
    *error = _("Can't send data to closed stream" );
    goto err;
  }

  if (chan->is_rpc) {
    *error = _("Can't send raw data to rpc channel" );
    goto err;
  }

  // the write buffer takes ownership of `data` (freed with xfree)
  WBuffer *buf = wstream_new_buffer(data, len, 1, xfree);
  return wstream_write(in, buf) ? len : 0;

err:
  xfree(data);
  return 0;
}
497 | |
498 | /// Convert binary byte array to a readfile()-style list |
499 | /// |
500 | /// @param[in] buf Array to convert. |
501 | /// @param[in] len Array length. |
502 | /// |
503 | /// @return [allocated] Converted list. |
504 | static inline list_T *buffer_to_tv_list(const char *const buf, const size_t len) |
505 | FUNC_ATTR_WARN_UNUSED_RESULT FUNC_ATTR_ALWAYS_INLINE |
506 | { |
507 | list_T *const l = tv_list_alloc(kListLenMayKnow); |
508 | // Empty buffer should be represented by [''], encode_list_write() thinks |
509 | // empty list is fine for the case. |
510 | tv_list_append_string(l, "" , 0); |
511 | if (len > 0) { |
512 | encode_list_write(l, buf, len); |
513 | } |
514 | return l; |
515 | } |
516 | |
517 | void on_channel_data(Stream *stream, RBuffer *buf, size_t count, |
518 | void *data, bool eof) |
519 | { |
520 | Channel *chan = data; |
521 | on_channel_output(stream, chan, buf, count, eof, &chan->on_data); |
522 | } |
523 | |
524 | void on_job_stderr(Stream *stream, RBuffer *buf, size_t count, |
525 | void *data, bool eof) |
526 | { |
527 | Channel *chan = data; |
528 | on_channel_output(stream, chan, buf, count, eof, &chan->on_stderr); |
529 | } |
530 | |
/// Handles data (or eof) arriving on one of a channel's streams
///
/// On eof the reader is flagged. Otherwise `count` bytes are fed to the
/// channel's terminal (if any), consumed from the read buffer and, when the
/// reader has a callback set, appended to the reader's buffer. Whenever the
/// reader has a callback set, a channel event is scheduled.
///
/// @param stream  The stream the data arrived on (unused here)
/// @param chan    The channel owning the stream
/// @param buf     Read buffer holding the data
/// @param count   Number of bytes to process
/// @param eof     Whether the stream reached end of file
/// @param reader  Reader that collects the data for this stream
static void on_channel_output(Stream *stream, Channel *chan, RBuffer *buf,
                              size_t count, bool eof, CallbackReader *reader)
{
  // stub variable, to keep reading consistent with the order of events, only
  // consider the count parameter.
  size_t r;
  char *ptr = rbuffer_read_ptr(buf, &r);

  if (eof) {
    reader->eof = true;
  } else {
    if (chan->term) {
      terminal_receive(chan->term, ptr, count);
      terminal_flush_output(chan->term);
    }

    rbuffer_consumed(buf, count);

    // NOTE(review): `ptr` is still read after the bytes were marked as
    // consumed; presumably rbuffer_consumed() leaves the memory intact.
    if (callback_reader_set(*reader)) {
      ga_concat_len(&reader->buffer, ptr, count);
    }
  }

  if (callback_reader_set(*reader)) {
    schedule_channel_event(chan);
  }
}
558 | |
559 | /// schedule the necessary callbacks to be invoked as a deferred event |
560 | static void schedule_channel_event(Channel *chan) |
561 | { |
562 | if (!chan->callback_scheduled) { |
563 | if (!chan->callback_busy) { |
564 | multiqueue_put(chan->events, on_channel_event, 1, chan); |
565 | channel_incref(chan); |
566 | } |
567 | chan->callback_scheduled = true; |
568 | } |
569 | } |
570 | |
571 | static void on_channel_event(void **args) |
572 | { |
573 | Channel *chan = (Channel *)args[0]; |
574 | |
575 | chan->callback_busy = true; |
576 | chan->callback_scheduled = false; |
577 | |
578 | int exit_status = chan->exit_status; |
579 | channel_reader_callbacks(chan, &chan->on_data); |
580 | channel_reader_callbacks(chan, &chan->on_stderr); |
581 | if (exit_status > -1) { |
582 | channel_callback_call(chan, NULL); |
583 | chan->exit_status = -1; |
584 | } |
585 | |
586 | chan->callback_busy = false; |
587 | if (chan->callback_scheduled) { |
588 | // further callback was deferred to avoid recursion. |
589 | multiqueue_put(chan->events, on_channel_event, 1, chan); |
590 | channel_incref(chan); |
591 | } |
592 | |
593 | channel_decref(chan); |
594 | } |
595 | |
596 | void channel_reader_callbacks(Channel *chan, CallbackReader *reader) |
597 | { |
598 | if (reader->buffered) { |
599 | if (reader->eof) { |
600 | if (reader->self) { |
601 | if (tv_dict_find(reader->self, reader->type, -1) == NULL) { |
602 | list_T *data = buffer_to_tv_list(reader->buffer.ga_data, |
603 | (size_t)reader->buffer.ga_len); |
604 | tv_dict_add_list(reader->self, reader->type, strlen(reader->type), |
605 | data); |
606 | } else { |
607 | EMSG3(_(e_streamkey), reader->type, chan->id); |
608 | } |
609 | } else { |
610 | channel_callback_call(chan, reader); |
611 | } |
612 | reader->eof = false; |
613 | } |
614 | } else { |
615 | bool is_eof = reader->eof; |
616 | if (reader->buffer.ga_len > 0) { |
617 | channel_callback_call(chan, reader); |
618 | } |
619 | // if the stream reached eof, invoke extra callback with no data |
620 | if (is_eof) { |
621 | channel_callback_call(chan, reader); |
622 | reader->eof = false; |
623 | } |
624 | } |
625 | } |
626 | |
627 | static void channel_process_exit_cb(Process *proc, int status, void *data) |
628 | { |
629 | Channel *chan = data; |
630 | if (chan->term) { |
631 | char msg[sizeof("\r\n[Process exited ]" ) + NUMBUFLEN]; |
632 | snprintf(msg, sizeof msg, "\r\n[Process exited %d]" , proc->status); |
633 | terminal_close(chan->term, msg); |
634 | } |
635 | |
636 | // If process did not exit, we only closed the handle of a detached process. |
637 | bool exited = (status >= 0); |
638 | if (exited && chan->on_exit.type != kCallbackNone) { |
639 | schedule_channel_event(chan); |
640 | chan->exit_status = status; |
641 | } |
642 | |
643 | channel_decref(chan); |
644 | } |
645 | |
646 | static void channel_callback_call(Channel *chan, CallbackReader *reader) |
647 | { |
648 | Callback *cb; |
649 | typval_T argv[4]; |
650 | |
651 | argv[0].v_type = VAR_NUMBER; |
652 | argv[0].v_lock = VAR_UNLOCKED; |
653 | argv[0].vval.v_number = (varnumber_T)chan->id; |
654 | |
655 | if (reader) { |
656 | argv[1].v_type = VAR_LIST; |
657 | argv[1].v_lock = VAR_UNLOCKED; |
658 | argv[1].vval.v_list = buffer_to_tv_list(reader->buffer.ga_data, |
659 | (size_t)reader->buffer.ga_len); |
660 | tv_list_ref(argv[1].vval.v_list); |
661 | ga_clear(&reader->buffer); |
662 | cb = &reader->cb; |
663 | argv[2].vval.v_string = (char_u *)reader->type; |
664 | } else { |
665 | argv[1].v_type = VAR_NUMBER; |
666 | argv[1].v_lock = VAR_UNLOCKED; |
667 | argv[1].vval.v_number = chan->exit_status; |
668 | cb = &chan->on_exit; |
669 | argv[2].vval.v_string = (char_u *)"exit" ; |
670 | } |
671 | |
672 | argv[2].v_type = VAR_STRING; |
673 | argv[2].v_lock = VAR_UNLOCKED; |
674 | |
675 | typval_T rettv = TV_INITIAL_VALUE; |
676 | callback_call(cb, 3, argv, &rettv); |
677 | tv_clear(&rettv); |
678 | } |
679 | |
680 | |
681 | /// Open terminal for channel |
682 | /// |
683 | /// Channel `chan` is assumed to be an open pty channel, |
684 | /// and curbuf is assumed to be a new, unmodified buffer. |
685 | void channel_terminal_open(Channel *chan) |
686 | { |
687 | TerminalOptions topts; |
688 | topts.data = chan; |
689 | topts.width = chan->stream.pty.width; |
690 | topts.height = chan->stream.pty.height; |
691 | topts.write_cb = term_write; |
692 | topts.resize_cb = term_resize; |
693 | topts.close_cb = term_close; |
694 | curbuf->b_p_channel = (long)chan->id; // 'channel' option |
695 | Terminal *term = terminal_open(topts); |
696 | chan->term = term; |
697 | channel_incref(chan); |
698 | } |
699 | |
700 | static void term_write(char *buf, size_t size, void *data) |
701 | { |
702 | Channel *chan = data; |
703 | if (chan->stream.proc.in.closed) { |
704 | // If the backing stream was closed abruptly, there may be write events |
705 | // ahead of the terminal close event. Just ignore the writes. |
706 | ILOG("write failed: stream is closed" ); |
707 | return; |
708 | } |
709 | WBuffer *wbuf = wstream_new_buffer(xmemdup(buf, size), size, 1, xfree); |
710 | wstream_write(&chan->stream.proc.in, wbuf); |
711 | } |
712 | |
713 | static void term_resize(uint16_t width, uint16_t height, void *data) |
714 | { |
715 | Channel *chan = data; |
716 | pty_process_resize(&chan->stream.pty, width, height); |
717 | } |
718 | |
719 | static inline void term_delayed_free(void **argv) |
720 | { |
721 | Channel *chan = argv[0]; |
722 | if (chan->stream.proc.in.pending_reqs || chan->stream.proc.out.pending_reqs) { |
723 | multiqueue_put(chan->events, term_delayed_free, 1, chan); |
724 | return; |
725 | } |
726 | |
727 | terminal_destroy(chan->term); |
728 | chan->term = NULL; |
729 | channel_decref(chan); |
730 | } |
731 | |
732 | static void term_close(void *data) |
733 | { |
734 | Channel *chan = data; |
735 | process_stop(&chan->stream.proc); |
736 | multiqueue_put(chan->events, term_delayed_free, 1, data); |
737 | } |
738 | |
739 | void channel_info_changed(Channel *chan, bool new) |
740 | { |
741 | event_T event = new ? EVENT_CHANOPEN : EVENT_CHANINFO; |
742 | if (has_event(event)) { |
743 | channel_incref(chan); |
744 | multiqueue_put(main_loop.events, set_info_event, |
745 | 2, chan, event); |
746 | } |
747 | } |
748 | |
749 | static void set_info_event(void **argv) |
750 | { |
751 | Channel *chan = argv[0]; |
752 | event_T event = (event_T)(ptrdiff_t)argv[1]; |
753 | |
754 | dict_T *dict = get_vim_var_dict(VV_EVENT); |
755 | Dictionary info = channel_info(chan->id); |
756 | typval_T retval; |
757 | (void)object_to_vim(DICTIONARY_OBJ(info), &retval, NULL); |
758 | tv_dict_add_dict(dict, S_LEN("info" ), retval.vval.v_dict); |
759 | |
760 | apply_autocmds(event, NULL, NULL, false, curbuf); |
761 | |
762 | tv_dict_clear(dict); |
763 | api_free_dictionary(info); |
764 | channel_decref(chan); |
765 | } |
766 | |
767 | bool channel_job_running(uint64_t id) |
768 | { |
769 | Channel *chan = find_channel(id); |
770 | return (chan |
771 | && chan->streamtype == kChannelStreamProc |
772 | && !process_is_stopped(&chan->stream.proc)); |
773 | } |
774 | |
775 | Dictionary channel_info(uint64_t id) |
776 | { |
777 | Channel *chan = find_channel(id); |
778 | if (!chan) { |
779 | return (Dictionary)ARRAY_DICT_INIT; |
780 | } |
781 | |
782 | Dictionary info = ARRAY_DICT_INIT; |
783 | PUT(info, "id" , INTEGER_OBJ((Integer)chan->id)); |
784 | |
785 | const char *stream_desc, *mode_desc; |
786 | switch (chan->streamtype) { |
787 | case kChannelStreamProc: |
788 | stream_desc = "job" ; |
789 | if (chan->stream.proc.type == kProcessTypePty) { |
790 | const char *name = pty_process_tty_name(&chan->stream.pty); |
791 | PUT(info, "pty" , STRING_OBJ(cstr_to_string(name))); |
792 | } |
793 | break; |
794 | |
795 | case kChannelStreamStdio: |
796 | stream_desc = "stdio" ; |
797 | break; |
798 | |
799 | case kChannelStreamStderr: |
800 | stream_desc = "stderr" ; |
801 | break; |
802 | |
803 | case kChannelStreamInternal: |
804 | PUT(info, "internal" , BOOLEAN_OBJ(true)); |
805 | FALLTHROUGH; |
806 | |
807 | case kChannelStreamSocket: |
808 | stream_desc = "socket" ; |
809 | break; |
810 | |
811 | default: |
812 | abort(); |
813 | } |
814 | PUT(info, "stream" , STRING_OBJ(cstr_to_string(stream_desc))); |
815 | |
816 | if (chan->is_rpc) { |
817 | mode_desc = "rpc" ; |
818 | PUT(info, "client" , DICTIONARY_OBJ(rpc_client_info(chan))); |
819 | } else if (chan->term) { |
820 | mode_desc = "terminal" ; |
821 | PUT(info, "buffer" , BUFFER_OBJ(terminal_buf(chan->term))); |
822 | } else { |
823 | mode_desc = "bytes" ; |
824 | } |
825 | PUT(info, "mode" , STRING_OBJ(cstr_to_string(mode_desc))); |
826 | |
827 | return info; |
828 | } |
829 | |
830 | Array channel_all_info(void) |
831 | { |
832 | Channel *channel; |
833 | Array ret = ARRAY_DICT_INIT; |
834 | map_foreach_value(channels, channel, { |
835 | ADD(ret, DICTIONARY_OBJ(channel_info(channel->id))); |
836 | }); |
837 | return ret; |
838 | } |
839 | |