1#include "TestHint.h"
2#include "ConnectionParameters.h"
3
4#include <port/unistd.h>
5#include <stdlib.h>
6#include <fcntl.h>
7#include <signal.h>
8#include <map>
9#include <iostream>
10#include <fstream>
11#include <iomanip>
12#include <unordered_set>
13#include <algorithm>
14#include <optional>
15#include <ext/scope_guard.h>
16#include <boost/program_options.hpp>
17#include <boost/algorithm/string/replace.hpp>
18#include <Poco/String.h>
19#include <Poco/File.h>
20#include <Poco/Util/Application.h>
21#include <common/readline_use.h>
22#include <common/find_symbols.h>
23#include <Common/ClickHouseRevision.h>
24#include <Common/Stopwatch.h>
25#include <Common/Exception.h>
26#include <Common/ShellCommand.h>
27#include <Common/UnicodeBar.h>
28#include <Common/formatReadable.h>
29#include <Common/NetException.h>
30#include <Common/Throttler.h>
31#include <Common/StringUtils/StringUtils.h>
32#include <Common/typeid_cast.h>
33#include <Common/Config/ConfigProcessor.h>
34#include <Common/config_version.h>
35#include <Core/Types.h>
36#include <Core/QueryProcessingStage.h>
37#include <Core/ExternalTable.h>
38#include <IO/ReadBufferFromFileDescriptor.h>
39#include <IO/WriteBufferFromFileDescriptor.h>
40#include <IO/WriteBufferFromFile.h>
41#include <IO/ReadBufferFromMemory.h>
42#include <IO/ReadBufferFromString.h>
43#include <IO/ReadHelpers.h>
44#include <IO/WriteHelpers.h>
45#include <IO/Operators.h>
46#include <IO/UseSSL.h>
47#include <DataStreams/AsynchronousBlockInputStream.h>
48#include <DataStreams/AddingDefaultsBlockInputStream.h>
49#include <DataStreams/InternalTextLogsRowOutputStream.h>
50#include <Parsers/ParserQuery.h>
51#include <Parsers/ASTSetQuery.h>
52#include <Parsers/ASTUseQuery.h>
53#include <Parsers/ASTInsertQuery.h>
54#include <Parsers/ASTSelectWithUnionQuery.h>
55#include <Parsers/ASTQueryWithOutput.h>
56#include <Parsers/ASTLiteral.h>
57#include <Parsers/ASTIdentifier.h>
58#include <Parsers/formatAST.h>
59#include <Parsers/parseQuery.h>
60#include <Interpreters/Context.h>
61#include <Interpreters/InterpreterSetQuery.h>
62#include <Interpreters/ReplaceQueryParameterVisitor.h>
63#include <Client/Connection.h>
64#include <Common/InterruptListener.h>
65#include <Functions/registerFunctions.h>
66#include <AggregateFunctions/registerAggregateFunctions.h>
67#include <Common/Config/configReadClient.h>
68#include <Storages/ColumnsDescription.h>
69#include <common/argsToConfig.h>
70#include <Common/TerminalSize.h>
71
72#if USE_READLINE
73#include "Suggest.h"
74#endif
75
76#ifndef __clang__
77#pragma GCC optimize("-fno-var-tracking-assignments")
78#endif
79
80/// http://en.wikipedia.org/wiki/ANSI_escape_code
81
82/// Similar codes \e[s, \e[u don't work in VT100 and Mosh.
83#define SAVE_CURSOR_POSITION "\033""7"
84#define RESTORE_CURSOR_POSITION "\033""8"
85
86#define CLEAR_TO_END_OF_LINE "\033[K"
87
/// These codes are possibly not supported everywhere.
89#define DISABLE_LINE_WRAPPING "\033[?7l"
90#define ENABLE_LINE_WRAPPING "\033[?7h"
91
92#if USE_READLINE && RL_VERSION_MAJOR >= 7
93
94#define BRACK_PASTE_PREF "\033[200~"
95#define BRACK_PASTE_SUFF "\033[201~"
96
97#define BRACK_PASTE_LAST '~'
98#define BRACK_PASTE_SLEN 6
99
/// This handler bypasses some unused macro/event checks.
101static int clickhouse_rl_bracketed_paste_begin(int /* count */, int /* key */)
102{
103 std::string buf;
104 buf.reserve(128);
105
106 RL_SETSTATE(RL_STATE_MOREINPUT);
107 SCOPE_EXIT(RL_UNSETSTATE(RL_STATE_MOREINPUT));
108 int c;
109 while ((c = rl_read_key()) >= 0)
110 {
111 if (c == '\r')
112 c = '\n';
113 buf.push_back(c);
114 if (buf.size() >= BRACK_PASTE_SLEN && c == BRACK_PASTE_LAST && buf.substr(buf.size() - BRACK_PASTE_SLEN) == BRACK_PASTE_SUFF)
115 {
116 buf.resize(buf.size() - BRACK_PASTE_SLEN);
117 break;
118 }
119 }
120 return static_cast<size_t>(rl_insert_text(buf.c_str())) == buf.size() ? 0 : 1;
121}
122
123#endif
124
125namespace DB
126{
127
/// Error codes referenced in this file. They are only declared here (in alphabetical order);
/// the definitions live elsewhere.
namespace ErrorCodes
{
    extern const int BAD_ARGUMENTS;
    extern const int CANNOT_APPEND_HISTORY;
    extern const int CANNOT_READLINE;
    extern const int CANNOT_READ_HISTORY;
    extern const int CANNOT_SET_SIGNAL_HANDLER;
    extern const int CLIENT_OUTPUT_FORMAT_SPECIFIED;
    extern const int INVALID_USAGE_OF_INPUT;
    extern const int LOGICAL_ERROR;
    extern const int NETWORK_ERROR;
    extern const int NO_DATA_TO_INSERT;
    extern const int SYSTEM_ERROR;
    extern const int UNEXPECTED_PACKET_FROM_SERVER;
    extern const int UNKNOWN_PACKET_FROM_SERVER;
}
144
145
146class Client : public Poco::Util::Application
147{
148public:
149 Client() {}
150
151private:
152 using StringSet = std::unordered_set<String>;
153 StringSet exit_strings
154 {
155 "exit", "quit", "logout",
156 "учше", "йгше", "дщпщге",
157 "exit;", "quit;", "logout;",
158 "учшеж", "йгшеж", "дщпщгеж",
159 "q", "й", "\\q", "\\Q", "\\й", "\\Й", ":q", "Жй"
160 };
161 bool is_interactive = true; /// Use either readline interface or batch mode.
162 bool need_render_progress = true; /// Render query execution progress.
163 bool echo_queries = false; /// Print queries before execution in batch mode.
164 bool ignore_error = false; /// In case of errors, don't print error message, continue to next query. Only applicable for non-interactive mode.
165 bool print_time_to_stderr = false; /// Output execution time to stderr in batch mode.
166 bool stdin_is_not_tty = false; /// stdin is not a terminal.
167
168 uint16_t terminal_width = 0; /// Terminal width is needed to render progress bar.
169
170 std::unique_ptr<Connection> connection; /// Connection to DB.
171 String query_id; /// Current query_id.
172 String query; /// Current query.
173
174 String format; /// Query results output format.
175 bool is_default_format = true; /// false, if format is set in the config or command line.
176 size_t format_max_block_size = 0; /// Max block size for console output.
177 String insert_format; /// Format of INSERT data that is read from stdin in batch mode.
178 size_t insert_format_max_block_size = 0; /// Max block size when reading INSERT data.
179 size_t max_client_network_bandwidth = 0; /// The maximum speed of data exchange over the network for the client in bytes per second.
180
181 bool has_vertical_output_suffix = false; /// Is \G present at the end of the query string?
182
183 Context context = Context::createGlobal();
184
185 /// Buffer that reads from stdin in batch mode.
186 ReadBufferFromFileDescriptor std_in {STDIN_FILENO};
187
188 /// Console output.
189 WriteBufferFromFileDescriptor std_out {STDOUT_FILENO};
190 std::unique_ptr<ShellCommand> pager_cmd;
191 /// The user can specify to redirect query output to a file.
192 std::optional<WriteBufferFromFile> out_file_buf;
193 BlockOutputStreamPtr block_out_stream;
194
195 /// The user could specify special file for server logs (stderr by default)
196 std::unique_ptr<WriteBuffer> out_logs_buf;
197 String server_logs_file;
198 BlockOutputStreamPtr logs_out_stream;
199
200 String home_path;
201
202 String current_profile;
203
204 String prompt_by_server_display_name;
205
206 /// Path to a file containing command history.
207 String history_file;
208
209 /// How many rows have been read or written.
210 size_t processed_rows = 0;
211
212 /// Parsed query. Is used to determine some settings (e.g. format, output file).
213 ASTPtr parsed_query;
214
215 /// The last exception that was received from the server. Is used for the return code in batch mode.
216 std::unique_ptr<Exception> last_exception;
217
218 /// If the last query resulted in exception.
219 bool got_exception = false;
220 int expected_server_error = 0;
221 int expected_client_error = 0;
222 int actual_server_error = 0;
223 int actual_client_error = 0;
224
225 UInt64 server_revision = 0;
226 String server_version;
227 String server_display_name;
228
229 Stopwatch watch;
230
231 /// The server periodically sends information about how much data was read since last time.
232 Progress progress;
233 bool show_progress_bar = false;
234
235 size_t written_progress_chars = 0;
236 bool written_first_block = false;
237
238 /// External tables info.
239 std::list<ExternalTable> external_tables;
240
241 /// Dictionary with query parameters for prepared statements.
242 NameToNameMap query_parameters;
243
244 ConnectionParameters connection_parameters;
245
246 void initialize(Poco::Util::Application & self)
247 {
248 Poco::Util::Application::initialize(self);
249
250 const char * home_path_cstr = getenv("HOME");
251 if (home_path_cstr)
252 home_path = home_path_cstr;
253
254 configReadClient(config(), home_path);
255
256 context.makeGlobalContext();
257 context.setApplicationType(Context::ApplicationType::CLIENT);
258 context.setQueryParameters(query_parameters);
259
260 /// settings and limits could be specified in config file, but passed settings has higher priority
261 for (auto && setting : context.getSettingsRef())
262 {
263 const String & name = setting.getName().toString();
264 if (config().has(name) && !setting.isChanged())
265 setting.setValue(config().getString(name));
266 }
267
268 /// Set path for format schema files
269 if (config().has("format_schema_path"))
270 context.setFormatSchemaPath(Poco::Path(config().getString("format_schema_path")).toString());
271 }
272
273
274 int main(const std::vector<std::string> & /*args*/)
275 {
276 try
277 {
278 return mainImpl();
279 }
280 catch (const Exception & e)
281 {
282 bool print_stack_trace = config().getBool("stacktrace", false);
283
284 std::string text = e.displayText();
285
286 /** If exception is received from server, then stack trace is embedded in message.
287 * If exception is thrown on client, then stack trace is in separate field.
288 */
289
290 auto embedded_stack_trace_pos = text.find("Stack trace");
291 if (std::string::npos != embedded_stack_trace_pos && !print_stack_trace)
292 text.resize(embedded_stack_trace_pos);
293
294 std::cerr << "Code: " << e.code() << ". " << text << std::endl << std::endl;
295
296 /// Don't print the stack trace on the client if it was logged on the server.
297 /// Also don't print the stack trace in case of network errors.
298 if (print_stack_trace
299 && e.code() != ErrorCodes::NETWORK_ERROR
300 && std::string::npos == embedded_stack_trace_pos)
301 {
302 std::cerr << "Stack trace:" << std::endl
303 << e.getStackTrace().toString();
304 }
305
306 /// If exception code isn't zero, we should return non-zero return code anyway.
307 return e.code() ? e.code() : -1;
308 }
309 catch (...)
310 {
311 std::cerr << getCurrentExceptionMessage(false) << std::endl;
312 return getCurrentExceptionCode();
313 }
314 }
315
316 /// Should we celebrate a bit?
317 bool isNewYearMode()
318 {
319 time_t current_time = time(nullptr);
320
321 /// It's bad to be intrusive.
322 if (current_time % 3 != 0)
323 return false;
324
325 LocalDate now(current_time);
326 return (now.month() == 12 && now.day() >= 20)
327 || (now.month() == 1 && now.day() <= 5);
328 }
329
330 int mainImpl()
331 {
332 UseSSL use_ssl;
333
334 registerFunctions();
335 registerAggregateFunctions();
336
337 /// Batch mode is enabled if one of the following is true:
338 /// - -e (--query) command line option is present.
339 /// The value of the option is used as the text of query (or of multiple queries).
340 /// If stdin is not a terminal, INSERT data for the first query is read from it.
341 /// - stdin is not a terminal. In this case queries are read from it.
342 if (stdin_is_not_tty || config().has("query"))
343 is_interactive = false;
344
345 std::cout << std::fixed << std::setprecision(3);
346 std::cerr << std::fixed << std::setprecision(3);
347
348 if (is_interactive)
349 showClientVersion();
350
351 is_default_format = !config().has("vertical") && !config().has("format");
352 if (config().has("vertical"))
353 format = config().getString("format", "Vertical");
354 else
355 format = config().getString("format", is_interactive ? "PrettyCompact" : "TabSeparated");
356
357 format_max_block_size = config().getInt("format_max_block_size", context.getSettingsRef().max_block_size);
358
359 insert_format = "Values";
360
361 /// Setting value from cmd arg overrides one from config
362 if (context.getSettingsRef().max_insert_block_size.changed)
363 insert_format_max_block_size = context.getSettingsRef().max_insert_block_size;
364 else
365 insert_format_max_block_size = config().getInt("insert_format_max_block_size", context.getSettingsRef().max_insert_block_size);
366
367 if (!is_interactive)
368 {
369 need_render_progress = config().getBool("progress", false);
370 echo_queries = config().getBool("echo", false);
371 ignore_error = config().getBool("ignore-error", false);
372 }
373
374 connect();
375
376 /// Initialize DateLUT here to avoid counting time spent here as query execution time.
377 DateLUT::instance();
378 if (!context.getSettingsRef().use_client_time_zone)
379 {
380 const auto & time_zone = connection->getServerTimezone(connection_parameters.timeouts);
381 if (!time_zone.empty())
382 {
383 try
384 {
385 DateLUT::setDefaultTimezone(time_zone);
386 }
387 catch (...)
388 {
389 std::cerr << "Warning: could not switch to server time zone: " << time_zone
390 << ", reason: " << getCurrentExceptionMessage(/* with_stacktrace = */ false) << std::endl
391 << "Proceeding with local time zone."
392 << std::endl << std::endl;
393 }
394 }
395 else
396 {
397 std::cerr << "Warning: could not determine server time zone. "
398 << "Proceeding with local time zone."
399 << std::endl << std::endl;
400 }
401 }
402
403 Strings keys;
404
405 prompt_by_server_display_name = config().getRawString("prompt_by_server_display_name.default", "{display_name} :) ");
406
407 config().keys("prompt_by_server_display_name", keys);
408
409 for (const String & key : keys)
410 {
411 if (key != "default" && server_display_name.find(key) != std::string::npos)
412 {
413 prompt_by_server_display_name = config().getRawString("prompt_by_server_display_name." + key);
414 break;
415 }
416 }
417
418 /// Prompt may contain escape sequences including \e[ or \x1b[ sequences to set terminal color.
419 {
420 String unescaped_prompt_by_server_display_name;
421 ReadBufferFromString in(prompt_by_server_display_name);
422 readEscapedString(unescaped_prompt_by_server_display_name, in);
423 prompt_by_server_display_name = std::move(unescaped_prompt_by_server_display_name);
424 }
425
426 /// Prompt may contain the following substitutions in a form of {name}.
427 std::map<String, String> prompt_substitutions
428 {
429 {"host", connection_parameters.host},
430 {"port", toString(connection_parameters.port)},
431 {"user", connection_parameters.user},
432 {"display_name", server_display_name},
433 };
434
435 /// Quite suboptimal.
436 for (const auto & [key, value]: prompt_substitutions)
437 boost::replace_all(prompt_by_server_display_name, "{" + key + "}", value);
438
439 if (is_interactive)
440 {
441 if (!query_id.empty())
442 throw Exception("query_id could be specified only in non-interactive mode", ErrorCodes::BAD_ARGUMENTS);
443 if (print_time_to_stderr)
444 throw Exception("time option could be specified only in non-interactive mode", ErrorCodes::BAD_ARGUMENTS);
445
446#if USE_READLINE
447 SCOPE_EXIT({ Suggest::instance().finalize(); });
448 if (server_revision >= Suggest::MIN_SERVER_REVISION
449 && !config().getBool("disable_suggestion", false))
450 {
451 /// Load suggestion data from the server.
452 Suggest::instance().load(connection_parameters, config().getInt("suggestion_limit"));
453
454 /// Added '.' to the default list. Because it is used to separate database and table.
455 rl_basic_word_break_characters = " \t\n\r\"\\'`@$><=;|&{(.";
456
457 /// Not append whitespace after single suggestion. Because whitespace after function name is meaningless.
458 rl_completion_append_character = '\0';
459
460 rl_completion_entry_function = Suggest::generator;
461 }
462 else
463 /// Turn tab completion off.
464 rl_bind_key('\t', rl_insert);
465#endif
466 /// Load command history if present.
467 if (config().has("history_file"))
468 history_file = config().getString("history_file");
469 else
470 {
471 auto history_file_from_env = getenv("CLICKHOUSE_HISTORY_FILE");
472 if (history_file_from_env)
473 history_file = history_file_from_env;
474 else if (!home_path.empty())
475 history_file = home_path + "/.clickhouse-client-history";
476 }
477
478 if (!history_file.empty())
479 {
480 if (Poco::File(history_file).exists())
481 {
482#if USE_READLINE
483 int res = read_history(history_file.c_str());
484 if (res)
485 std::cerr << "Cannot read history from file " + history_file + ": "+ errnoToString(ErrorCodes::CANNOT_READ_HISTORY);
486#endif
487 }
488 else /// Create history file.
489 Poco::File(history_file).createFile();
490 }
491
492#if USE_READLINE
493 /// Install Ctrl+C signal handler that will be used in interactive mode.
494
495 if (rl_initialize())
496 throw Exception("Cannot initialize readline", ErrorCodes::CANNOT_READLINE);
497
498#if RL_VERSION_MAJOR >= 7
499 /// Enable bracketed-paste-mode only when multiquery is enabled and multiline is
500 /// disabled, so that we are able to paste and execute multiline queries in a whole
501 /// instead of erroring out, while be less intrusive.
502 if (config().has("multiquery") && !config().has("multiline"))
503 {
504 /// When bracketed paste mode is set, pasted text is bracketed with control sequences so
505 /// that the program can differentiate pasted text from typed-in text. This helps
506 /// clickhouse-client so that without -m flag, one can still paste multiline queries, and
507 /// possibly get better pasting performance. See https://cirw.in/blog/bracketed-paste for
508 /// more details.
509 rl_variable_bind("enable-bracketed-paste", "on");
510
511 /// Use our bracketed paste handler to get better user experience. See comments above.
512 rl_bind_keyseq(BRACK_PASTE_PREF, clickhouse_rl_bracketed_paste_begin);
513 }
514#endif
515
516 auto clear_prompt_or_exit = [](int)
517 {
518 /// This is signal safe.
519 ssize_t res = write(STDOUT_FILENO, "\n", 1);
520
521 /// Allow to quit client while query is in progress by pressing Ctrl+C twice.
522 /// (First press to Ctrl+C will try to cancel query by InterruptListener).
523 if (res == 1 && rl_line_buffer[0] && !RL_ISSTATE(RL_STATE_DONE))
524 {
525 rl_replace_line("", 0);
526 if (rl_forced_update_display())
527 _exit(0);
528 }
529 else
530 {
531 /// A little dirty, but we struggle to find better way to correctly
532 /// force readline to exit after returning from the signal handler.
533 _exit(0);
534 }
535 };
536
537 if (signal(SIGINT, clear_prompt_or_exit) == SIG_ERR)
538 throwFromErrno("Cannot set signal handler.", ErrorCodes::CANNOT_SET_SIGNAL_HANDLER);
539#endif
540
541 loop();
542
543 std::cout << (isNewYearMode() ? "Happy new year." : "Bye.") << std::endl;
544 return 0;
545 }
546 else
547 {
548 /// This is intended for testing purposes.
549 if (config().getBool("always_load_suggestion_data", false))
550 {
551#if USE_READLINE
552 SCOPE_EXIT({ Suggest::instance().finalize(); });
553 Suggest::instance().load(connection_parameters, config().getInt("suggestion_limit"));
554#else
555 throw Exception("Command line suggestions cannot work without readline", ErrorCodes::BAD_ARGUMENTS);
556#endif
557 }
558
559 query_id = config().getString("query_id", "");
560 nonInteractive();
561
562 /// If exception code isn't zero, we should return non-zero return code anyway.
563 if (last_exception)
564 return last_exception->code() != 0 ? last_exception->code() : -1;
565
566 return 0;
567 }
568 }
569
570
571 void connect()
572 {
573 connection_parameters = ConnectionParameters(config());
574
575 if (is_interactive)
576 std::cout << "Connecting to "
577 << (!connection_parameters.default_database.empty() ? "database " + connection_parameters.default_database + " at " : "")
578 << connection_parameters.host << ":" << connection_parameters.port
579 << (!connection_parameters.user.empty() ? " as user " + connection_parameters.user : "")
580 << "." << std::endl;
581
582 connection = std::make_unique<Connection>(
583 connection_parameters.host,
584 connection_parameters.port,
585 connection_parameters.default_database,
586 connection_parameters.user,
587 connection_parameters.password,
588 "client",
589 connection_parameters.compression,
590 connection_parameters.security);
591
592 String server_name;
593 UInt64 server_version_major = 0;
594 UInt64 server_version_minor = 0;
595 UInt64 server_version_patch = 0;
596
597 if (max_client_network_bandwidth)
598 {
599 ThrottlerPtr throttler = std::make_shared<Throttler>(max_client_network_bandwidth, 0, "");
600 connection->setThrottler(throttler);
601 }
602
603 connection->getServerVersion(connection_parameters.timeouts,
604 server_name, server_version_major, server_version_minor, server_version_patch, server_revision);
605
606 server_version = toString(server_version_major) + "." + toString(server_version_minor) + "." + toString(server_version_patch);
607
608 if (
609 server_display_name = connection->getServerDisplayName(connection_parameters.timeouts);
610 server_display_name.length() == 0)
611 {
612 server_display_name = config().getString("host", "localhost");
613 }
614
615 if (is_interactive)
616 {
617 std::cout << "Connected to " << server_name
618 << " server version " << server_version
619 << " revision " << server_revision
620 << "." << std::endl << std::endl;
621
622 if (std::make_tuple(VERSION_MAJOR, VERSION_MINOR, VERSION_PATCH)
623 < std::make_tuple(server_version_major, server_version_minor, server_version_patch))
624 {
625 std::cout << "ClickHouse client version is older than ClickHouse server. "
626 << "It may lack support for new features."
627 << std::endl << std::endl;
628 }
629 }
630 }
631
632
633 /// Check if multi-line query is inserted from the paste buffer.
634 /// Allows delaying the start of query execution until the entirety of query is inserted.
/// Performs a select() on stdin with a zero timeout, so the call does not wait.
/// Returns true only if select() reports exactly one ready descriptor.
static bool hasDataInSTDIN()
{
    timeval no_wait;
    no_wait.tv_sec = 0;
    no_wait.tv_usec = 0;

    fd_set read_set;
    FD_ZERO(&read_set);
    FD_SET(STDIN_FILENO, &read_set);

    /// The first argument is the highest watched descriptor plus one.
    const int ready_count = select(STDIN_FILENO + 1, &read_set, nullptr, nullptr, &no_wait);
    return ready_count == 1;
}
643
644 inline const String prompt() const
645 {
646 return boost::replace_all_copy(prompt_by_server_display_name, "{database}", config().getString("database", "default"));
647 }
648
649 void loop()
650 {
651 String input;
652 String prev_input;
653
654 while (char * line_ = readline(input.empty() ? prompt().c_str() : ":-] "))
655 {
656 String line = line_;
657 free(line_);
658
659 size_t ws = line.size();
660 while (ws > 0 && isWhitespaceASCII(line[ws - 1]))
661 --ws;
662
663 if (ws == 0 || line.empty())
664 continue;
665
666 bool ends_with_semicolon = line[ws - 1] == ';';
667 bool ends_with_backslash = line[ws - 1] == '\\';
668
669 has_vertical_output_suffix = (ws >= 2) && (line[ws - 2] == '\\') && (line[ws - 1] == 'G');
670
671 if (ends_with_backslash)
672 line = line.substr(0, ws - 1);
673
674 input += line;
675
676 if (!ends_with_backslash && (ends_with_semicolon || has_vertical_output_suffix || (!config().has("multiline") && !hasDataInSTDIN())))
677 {
678 // TODO: should we do sensitive data masking on client too? History file can be source of secret leaks.
679 if (input != prev_input)
680 {
681 /// Replace line breaks with spaces to prevent the following problem.
682 /// Every line of multi-line query is saved to history file as a separate line.
683 /// If the user restarts the client then after pressing the "up" button
684 /// every line of the query will be displayed separately.
685 std::string logged_query = input;
686 if (config().has("multiline"))
687 std::replace(logged_query.begin(), logged_query.end(), '\n', ' ');
688 add_history(logged_query.c_str());
689
690#if USE_READLINE && HAVE_READLINE_HISTORY
691 if (!history_file.empty() && append_history(1, history_file.c_str()))
692 std::cerr << "Cannot append history to file " + history_file + ": " + errnoToString(ErrorCodes::CANNOT_APPEND_HISTORY);
693#endif
694
695 prev_input = input;
696 }
697
698 if (has_vertical_output_suffix)
699 input = input.substr(0, input.length() - 2);
700
701 try
702 {
703 if (!process(input))
704 break;
705 }
706 catch (const Exception & e)
707 {
708 actual_client_error = e.code();
709 if (!actual_client_error || actual_client_error != expected_client_error)
710 {
711 std::cerr << std::endl
712 << "Exception on client:" << std::endl
713 << "Code: " << e.code() << ". " << e.displayText() << std::endl;
714
715 if (config().getBool("stacktrace", false))
716 std::cerr << "Stack trace:" << std::endl
717 << e.getStackTrace().toString() << std::endl;
718
719 std::cerr << std::endl;
720
721 }
722
723 /// Client-side exception during query execution can result in the loss of
724 /// sync in the connection protocol.
725 /// So we reconnect and allow to enter the next query.
726 connect();
727 }
728
729 input = "";
730 }
731 else
732 {
733 input += '\n';
734 }
735 }
736 }
737
738
739 void nonInteractive()
740 {
741 String text;
742
743 if (config().has("query"))
744 text = config().getRawString("query"); /// Poco configuration should not process substitutions in form of ${...} inside query.
745 else
746 {
747 /// If 'query' parameter is not set, read a query from stdin.
748 /// The query is read entirely into memory (streaming is disabled).
749 ReadBufferFromFileDescriptor in(STDIN_FILENO);
750 readStringUntilEOF(text, in);
751 }
752
753 process(text);
754 }
755
756
757 bool process(const String & text)
758 {
759 if (exit_strings.end() != exit_strings.find(trim(text, [](char c){ return isWhitespaceASCII(c) || c == ';'; })))
760 return false;
761
762 const bool test_mode = config().has("testmode");
763 if (config().has("multiquery"))
764 {
765 { /// disable logs if expects errors
766 TestHint test_hint(test_mode, text);
767 if (test_hint.clientError() || test_hint.serverError())
768 process("SET send_logs_level = 'none'");
769 }
770
771 /// Several queries separated by ';'.
772 /// INSERT data is ended by the end of line, not ';'.
773
774 const char * begin = text.data();
775 const char * end = begin + text.size();
776
777 while (begin < end)
778 {
779 const char * pos = begin;
780 ASTPtr ast = parseQuery(pos, end, true);
781
782 if (!ast)
783 {
784 if (ignore_error)
785 {
786 Tokens tokens(begin, end);
787 IParser::Pos token_iterator(tokens);
788 while (token_iterator->type != TokenType::Semicolon && token_iterator.isValid())
789 ++token_iterator;
790 begin = token_iterator->end;
791
792 continue;
793 }
794 return true;
795 }
796
797 auto * insert = ast->as<ASTInsertQuery>();
798
799 if (insert && insert->data)
800 {
801 pos = find_first_symbols<'\n'>(insert->data, end);
802 insert->end = pos;
803 }
804
805 String str = text.substr(begin - text.data(), pos - begin);
806
807 begin = pos;
808 while (isWhitespaceASCII(*begin) || *begin == ';')
809 ++begin;
810
811 TestHint test_hint(test_mode, str);
812 expected_client_error = test_hint.clientError();
813 expected_server_error = test_hint.serverError();
814
815 try
816 {
817 auto ast_to_process = ast;
818 if (insert && insert->data)
819 ast_to_process = nullptr;
820
821 if (!processSingleQuery(str, ast_to_process) && !ignore_error)
822 return false;
823 }
824 catch (...)
825 {
826 last_exception = std::make_unique<Exception>(getCurrentExceptionMessage(true), getCurrentExceptionCode());
827 actual_client_error = last_exception->code();
828 if (!ignore_error && (!actual_client_error || actual_client_error != expected_client_error))
829 std::cerr << "Error on processing query: " << str << std::endl << last_exception->message();
830 got_exception = true;
831 }
832
833 if (!test_hint.checkActual(actual_server_error, actual_client_error, got_exception, last_exception))
834 connection->forceConnected(connection_parameters.timeouts);
835
836 if (got_exception && !ignore_error)
837 {
838 if (is_interactive)
839 break;
840 else
841 return false;
842 }
843 }
844
845 return true;
846 }
847 else
848 {
849 return processSingleQuery(text);
850 }
851 }
852
853
854 bool processSingleQuery(const String & line, ASTPtr parsed_query_ = nullptr)
855 {
856 resetOutput();
857 got_exception = false;
858
859 if (echo_queries)
860 {
861 writeString(line, std_out);
862 writeChar('\n', std_out);
863 std_out.next();
864 }
865
866 watch.restart();
867
868 query = line;
869
870 /// Some parts of a query (result output and formatting) are executed client-side.
871 /// Thus we need to parse the query.
872 parsed_query = parsed_query_;
873 if (!parsed_query)
874 {
875 const char * begin = query.data();
876 parsed_query = parseQuery(begin, begin + query.size(), false);
877 }
878
879 if (!parsed_query)
880 return true;
881
882 processed_rows = 0;
883 progress.reset();
884 show_progress_bar = false;
885 written_progress_chars = 0;
886 written_first_block = false;
887
888 {
889 /// Temporarily apply query settings to context.
890 std::optional<Settings> old_settings;
891 SCOPE_EXIT({ if (old_settings) context.setSettings(*old_settings); });
892 auto apply_query_settings = [&](const IAST & settings_ast)
893 {
894 if (!old_settings)
895 old_settings.emplace(context.getSettingsRef());
896 context.applySettingsChanges(settings_ast.as<ASTSetQuery>()->changes);
897 };
898 const auto * insert = parsed_query->as<ASTInsertQuery>();
899 if (insert && insert->settings_ast)
900 apply_query_settings(*insert->settings_ast);
901 /// FIXME: try to prettify this cast using `as<>()`
902 const auto * with_output = dynamic_cast<const ASTQueryWithOutput *>(parsed_query.get());
903 if (with_output && with_output->settings_ast)
904 apply_query_settings(*with_output->settings_ast);
905
906 connection->forceConnected(connection_parameters.timeouts);
907
908 ASTPtr input_function;
909 if (insert && insert->select)
910 insert->tryFindInputFunction(input_function);
911
912 /// INSERT query for which data transfer is needed (not an INSERT SELECT or input()) is processed separately.
913 if (insert && (!insert->select || input_function))
914 {
915 if (input_function && insert->format.empty())
916 throw Exception("FORMAT must be specified for function input()", ErrorCodes::INVALID_USAGE_OF_INPUT);
917 processInsertQuery();
918 }
919 else
920 processOrdinaryQuery();
921 }
922
923 /// Do not change context (current DB, settings) in case of an exception.
924 if (!got_exception)
925 {
926 if (const auto * set_query = parsed_query->as<ASTSetQuery>())
927 {
928 /// Save all changes in settings to avoid losing them if the connection is lost.
929 for (const auto & change : set_query->changes)
930 {
931 if (change.name == "profile")
932 current_profile = change.value.safeGet<String>();
933 else
934 context.applySettingChange(change);
935 }
936 }
937
938 if (const auto * use_query = parsed_query->as<ASTUseQuery>())
939 {
940 const String & new_database = use_query->database;
941 /// If the client initiates the reconnection, it takes the settings from the config.
942 config().setString("database", new_database);
943 /// If the connection initiates the reconnection, it uses its variable.
944 connection->setDefaultDatabase(new_database);
945 }
946 }
947
948 if (is_interactive)
949 {
950 std::cout << std::endl
951 << processed_rows << " rows in set. Elapsed: " << watch.elapsedSeconds() << " sec. ";
952
953 if (progress.read_rows >= 1000)
954 writeFinalProgress();
955
956 std::cout << std::endl << std::endl;
957 }
958 else if (print_time_to_stderr)
959 {
960 std::cerr << watch.elapsedSeconds() << "\n";
961 }
962
963 return true;
964 }
965
966
967 /// Convert external tables to ExternalTableData and send them using the connection.
968 void sendExternalTables()
969 {
970 const auto * select = parsed_query->as<ASTSelectWithUnionQuery>();
971 if (!select && !external_tables.empty())
972 throw Exception("External tables could be sent only with select query", ErrorCodes::BAD_ARGUMENTS);
973
974 std::vector<ExternalTableData> data;
975 for (auto & table : external_tables)
976 data.emplace_back(table.getData(context));
977
978 connection->sendExternalTablesData(data);
979 }
980
981
982 /// Process the query that doesn't require transferring data blocks to the server.
983 void processOrdinaryQuery()
984 {
985 /// We will always rewrite query (even if there are no query_parameters) because it will help to find errors in query formatter.
986 {
987 /// Replace ASTQueryParameter with ASTLiteral for prepared statements.
988 ReplaceQueryParameterVisitor visitor(query_parameters);
989 visitor.visit(parsed_query);
990
991 /// Get new query after substitutions. Note that it cannot be done for INSERT query with embedded data.
992 query = serializeAST(*parsed_query);
993 }
994
995 connection->sendQuery(connection_parameters.timeouts, query, query_id, QueryProcessingStage::Complete, &context.getSettingsRef(), nullptr, true);
996 sendExternalTables();
997 receiveResult();
998 }
999
1000
1001 /// Process the query that requires transferring data blocks to the server.
1002 void processInsertQuery()
1003 {
1004 /// Send part of query without data, because data will be sent separately.
1005 const auto & parsed_insert_query = parsed_query->as<ASTInsertQuery &>();
1006 String query_without_data = parsed_insert_query.data
1007 ? query.substr(0, parsed_insert_query.data - query.data())
1008 : query;
1009
1010 if (!parsed_insert_query.data && (is_interactive || (stdin_is_not_tty && std_in.eof())))
1011 throw Exception("No data to insert", ErrorCodes::NO_DATA_TO_INSERT);
1012
1013 connection->sendQuery(connection_parameters.timeouts, query_without_data, query_id, QueryProcessingStage::Complete, &context.getSettingsRef(), nullptr, true);
1014 sendExternalTables();
1015
1016 /// Receive description of table structure.
1017 Block sample;
1018 ColumnsDescription columns_description;
1019 if (receiveSampleBlock(sample, columns_description))
1020 {
1021 /// If structure was received (thus, server has not thrown an exception),
1022 /// send our data with that structure.
1023 sendData(sample, columns_description);
1024 receiveEndOfQuery();
1025 }
1026 }
1027
1028
1029 ASTPtr parseQuery(const char * & pos, const char * end, bool allow_multi_statements)
1030 {
1031 ParserQuery parser(end, true);
1032 ASTPtr res;
1033
1034 if (is_interactive || ignore_error)
1035 {
1036 String message;
1037 res = tryParseQuery(parser, pos, end, message, true, "", allow_multi_statements, 0);
1038
1039 if (!res)
1040 {
1041 std::cerr << std::endl << message << std::endl << std::endl;
1042 return nullptr;
1043 }
1044 }
1045 else
1046 res = parseQueryAndMovePosition(parser, pos, end, "", allow_multi_statements, 0);
1047
1048 if (is_interactive)
1049 {
1050 std::cout << std::endl;
1051 formatAST(*res, std::cout);
1052 std::cout << std::endl << std::endl;
1053 }
1054
1055 return res;
1056 }
1057
1058
1059 void sendData(Block & sample, const ColumnsDescription & columns_description)
1060 {
1061 /// If INSERT data must be sent.
1062 const auto * parsed_insert_query = parsed_query->as<ASTInsertQuery>();
1063 if (!parsed_insert_query)
1064 return;
1065
1066 if (parsed_insert_query->data)
1067 {
1068 /// Send data contained in the query.
1069 ReadBufferFromMemory data_in(parsed_insert_query->data, parsed_insert_query->end - parsed_insert_query->data);
1070 sendDataFrom(data_in, sample, columns_description);
1071 }
1072 else if (!is_interactive)
1073 {
1074 /// Send data read from stdin.
1075 sendDataFrom(std_in, sample, columns_description);
1076 }
1077 else
1078 throw Exception("No data to insert", ErrorCodes::NO_DATA_TO_INSERT);
1079 }
1080
1081
1082 void sendDataFrom(ReadBuffer & buf, Block & sample, const ColumnsDescription & columns_description)
1083 {
1084 String current_format = insert_format;
1085
1086 /// Data format can be specified in the INSERT query.
1087 if (const auto * insert = parsed_query->as<ASTInsertQuery>())
1088 {
1089 if (!insert->format.empty())
1090 current_format = insert->format;
1091 }
1092
1093 BlockInputStreamPtr block_input = context.getInputFormat(
1094 current_format, buf, sample, insert_format_max_block_size);
1095
1096 const auto & column_defaults = columns_description.getDefaults();
1097 if (!column_defaults.empty())
1098 block_input = std::make_shared<AddingDefaultsBlockInputStream>(block_input, column_defaults, context);
1099
1100 BlockInputStreamPtr async_block_input = std::make_shared<AsynchronousBlockInputStream>(block_input);
1101
1102 async_block_input->readPrefix();
1103
1104 while (true)
1105 {
1106 Block block = async_block_input->read();
1107
1108 /// Check if server send Log packet
1109 receiveLogs();
1110
1111 /// Check if server send Exception packet
1112 auto packet_type = connection->checkPacket();
1113 if (packet_type && *packet_type == Protocol::Server::Exception)
1114 {
1115 /*
1116 * We're exiting with error, so it makes sense to kill the
1117 * input stream without waiting for it to complete.
1118 */
1119 async_block_input->cancel(true);
1120 return;
1121 }
1122
1123 connection->sendData(block);
1124 processed_rows += block.rows();
1125
1126 if (!block)
1127 break;
1128 }
1129
1130 async_block_input->readSuffix();
1131 }
1132
1133
1134 /// Flush all buffers.
1135 void resetOutput()
1136 {
1137 block_out_stream.reset();
1138 logs_out_stream.reset();
1139
1140 if (pager_cmd)
1141 {
1142 pager_cmd->in.close();
1143 pager_cmd->wait();
1144 }
1145 pager_cmd = nullptr;
1146
1147 if (out_file_buf)
1148 {
1149 out_file_buf->next();
1150 out_file_buf.reset();
1151 }
1152
1153 if (out_logs_buf)
1154 {
1155 out_logs_buf->next();
1156 out_logs_buf.reset();
1157 }
1158
1159 std_out.next();
1160 }
1161
1162
1163 /// Receives and processes packets coming from server.
1164 /// Also checks if query execution should be cancelled.
1165 void receiveResult()
1166 {
1167 InterruptListener interrupt_listener;
1168 bool cancelled = false;
1169
1170 // TODO: get the poll_interval from commandline.
1171 const auto receive_timeout = connection_parameters.timeouts.receive_timeout;
1172 constexpr size_t default_poll_interval = 1000000; /// in microseconds
1173 constexpr size_t min_poll_interval = 5000; /// in microseconds
1174 const size_t poll_interval
1175 = std::max(min_poll_interval, std::min<size_t>(receive_timeout.totalMicroseconds(), default_poll_interval));
1176
1177 while (true)
1178 {
1179 Stopwatch receive_watch(CLOCK_MONOTONIC_COARSE);
1180
1181 while (true)
1182 {
1183 /// Has the Ctrl+C been pressed and thus the query should be cancelled?
1184 /// If this is the case, inform the server about it and receive the remaining packets
1185 /// to avoid losing sync.
1186 if (!cancelled)
1187 {
1188 auto cancelQuery = [&] {
1189 connection->sendCancel();
1190 cancelled = true;
1191 if (is_interactive)
1192 std::cout << "Cancelling query." << std::endl;
1193
1194 /// Pressing Ctrl+C twice results in shut down.
1195 interrupt_listener.unblock();
1196 };
1197
1198 if (interrupt_listener.check())
1199 {
1200 cancelQuery();
1201 }
1202 else
1203 {
1204 double elapsed = receive_watch.elapsedSeconds();
1205 if (elapsed > receive_timeout.totalSeconds())
1206 {
1207 std::cout << "Timeout exceeded while receiving data from server."
1208 << " Waited for " << static_cast<size_t>(elapsed) << " seconds,"
1209 << " timeout is " << receive_timeout.totalSeconds() << " seconds." << std::endl;
1210
1211 cancelQuery();
1212 }
1213 }
1214 }
1215
1216 /// Poll for changes after a cancellation check, otherwise it never reached
1217 /// because of progress updates from server.
1218 if (connection->poll(poll_interval))
1219 break;
1220 }
1221
1222 if (!receiveAndProcessPacket())
1223 break;
1224 }
1225
1226 if (cancelled && is_interactive)
1227 std::cout << "Query was cancelled." << std::endl;
1228 }
1229
1230
1231 /// Receive a part of the result, or progress info or an exception and process it.
1232 /// Returns true if one should continue receiving packets.
1233 bool receiveAndProcessPacket()
1234 {
1235 Packet packet = connection->receivePacket();
1236
1237 switch (packet.type)
1238 {
1239 case Protocol::Server::Data:
1240 onData(packet.block);
1241 return true;
1242
1243 case Protocol::Server::Progress:
1244 onProgress(packet.progress);
1245 return true;
1246
1247 case Protocol::Server::ProfileInfo:
1248 onProfileInfo(packet.profile_info);
1249 return true;
1250
1251 case Protocol::Server::Totals:
1252 onTotals(packet.block);
1253 return true;
1254
1255 case Protocol::Server::Extremes:
1256 onExtremes(packet.block);
1257 return true;
1258
1259 case Protocol::Server::Exception:
1260 onException(*packet.exception);
1261 last_exception = std::move(packet.exception);
1262 return false;
1263
1264 case Protocol::Server::Log:
1265 onLogData(packet.block);
1266 return true;
1267
1268 case Protocol::Server::EndOfStream:
1269 onEndOfStream();
1270 return false;
1271
1272 default:
1273 throw Exception("Unknown packet from server", ErrorCodes::UNKNOWN_PACKET_FROM_SERVER);
1274 }
1275 }
1276
1277
1278 /// Receive the block that serves as an example of the structure of table where data will be inserted.
1279 bool receiveSampleBlock(Block & out, ColumnsDescription & columns_description)
1280 {
1281 while (true)
1282 {
1283 Packet packet = connection->receivePacket();
1284
1285 switch (packet.type)
1286 {
1287 case Protocol::Server::Data:
1288 out = packet.block;
1289 return true;
1290
1291 case Protocol::Server::Exception:
1292 onException(*packet.exception);
1293 last_exception = std::move(packet.exception);
1294 return false;
1295
1296 case Protocol::Server::Log:
1297 onLogData(packet.block);
1298 break;
1299
1300 case Protocol::Server::TableColumns:
1301 columns_description = ColumnsDescription::parse(packet.multistring_message[1]);
1302 return receiveSampleBlock(out, columns_description);
1303
1304 default:
1305 throw NetException("Unexpected packet from server (expected Data, Exception or Log, got "
1306 + String(Protocol::Server::toString(packet.type)) + ")", ErrorCodes::UNEXPECTED_PACKET_FROM_SERVER);
1307 }
1308 }
1309 }
1310
1311
1312 /// Process Log packets, exit when receive Exception or EndOfStream
1313 bool receiveEndOfQuery()
1314 {
1315 while (true)
1316 {
1317 Packet packet = connection->receivePacket();
1318
1319 switch (packet.type)
1320 {
1321 case Protocol::Server::EndOfStream:
1322 onEndOfStream();
1323 return true;
1324
1325 case Protocol::Server::Exception:
1326 onException(*packet.exception);
1327 last_exception = std::move(packet.exception);
1328 return false;
1329
1330 case Protocol::Server::Log:
1331 onLogData(packet.block);
1332 break;
1333
1334 default:
1335 throw NetException("Unexpected packet from server (expected Exception, EndOfStream or Log, got "
1336 + String(Protocol::Server::toString(packet.type)) + ")", ErrorCodes::UNEXPECTED_PACKET_FROM_SERVER);
1337 }
1338 }
1339 }
1340
1341 /// Process Log packets, used when inserting data by blocks
1342 void receiveLogs()
1343 {
1344 auto packet_type = connection->checkPacket();
1345
1346 while (packet_type && *packet_type == Protocol::Server::Log)
1347 {
1348 receiveAndProcessPacket();
1349 packet_type = connection->checkPacket();
1350 }
1351 }
1352
1353 void initBlockOutputStream(const Block & block)
1354 {
1355 if (!block_out_stream)
1356 {
1357 WriteBuffer * out_buf = nullptr;
1358 String pager = config().getString("pager", "");
1359 if (!pager.empty())
1360 {
1361 signal(SIGPIPE, SIG_IGN);
1362 pager_cmd = ShellCommand::execute(pager, true);
1363 out_buf = &pager_cmd->in;
1364 }
1365 else
1366 {
1367 out_buf = &std_out;
1368 }
1369
1370 String current_format = format;
1371
1372 /// The query can specify output format or output file.
1373 /// FIXME: try to prettify this cast using `as<>()`
1374 if (const auto * query_with_output = dynamic_cast<const ASTQueryWithOutput *>(parsed_query.get()))
1375 {
1376 if (query_with_output->out_file)
1377 {
1378 const auto & out_file_node = query_with_output->out_file->as<ASTLiteral &>();
1379 const auto & out_file = out_file_node.value.safeGet<std::string>();
1380
1381 out_file_buf.emplace(out_file, DBMS_DEFAULT_BUFFER_SIZE, O_WRONLY | O_EXCL | O_CREAT);
1382 out_buf = &*out_file_buf;
1383
1384 // We are writing to file, so default format is the same as in non-interactive mode.
1385 if (is_interactive && is_default_format)
1386 current_format = "TabSeparated";
1387 }
1388 if (query_with_output->format != nullptr)
1389 {
1390 if (has_vertical_output_suffix)
1391 throw Exception("Output format already specified", ErrorCodes::CLIENT_OUTPUT_FORMAT_SPECIFIED);
1392 const auto & id = query_with_output->format->as<ASTIdentifier &>();
1393 current_format = id.name;
1394 }
1395 }
1396
1397 if (has_vertical_output_suffix)
1398 current_format = "Vertical";
1399
1400 block_out_stream = context.getOutputFormat(current_format, *out_buf, block);
1401 block_out_stream->writePrefix();
1402 }
1403 }
1404
1405
1406 void initLogsOutputStream()
1407 {
1408 if (!logs_out_stream)
1409 {
1410 WriteBuffer * wb = out_logs_buf.get();
1411
1412 if (!out_logs_buf)
1413 {
1414 if (server_logs_file.empty())
1415 {
1416 /// Use stderr by default
1417 out_logs_buf = std::make_unique<WriteBufferFromFileDescriptor>(STDERR_FILENO);
1418 wb = out_logs_buf.get();
1419 }
1420 else if (server_logs_file == "-")
1421 {
1422 /// Use stdout if --server_logs_file=- specified
1423 wb = &std_out;
1424 }
1425 else
1426 {
1427 out_logs_buf = std::make_unique<WriteBufferFromFile>(server_logs_file, DBMS_DEFAULT_BUFFER_SIZE, O_WRONLY | O_APPEND | O_CREAT);
1428 wb = out_logs_buf.get();
1429 }
1430 }
1431
1432 logs_out_stream = std::make_shared<InternalTextLogsRowOutputStream>(*wb);
1433 logs_out_stream->writePrefix();
1434 }
1435 }
1436
1437
1438 void onData(Block & block)
1439 {
1440 if (written_progress_chars)
1441 clearProgress();
1442
1443 if (!block)
1444 return;
1445
1446 processed_rows += block.rows();
1447 initBlockOutputStream(block);
1448
1449 /// The header block containing zero rows was used to initialize block_out_stream, do not output it.
1450 if (block.rows() != 0)
1451 {
1452 block_out_stream->write(block);
1453 written_first_block = true;
1454 }
1455
1456 /// Received data block is immediately displayed to the user.
1457 block_out_stream->flush();
1458
1459 /// Restore progress bar after data block.
1460 writeProgress();
1461 }
1462
1463
1464 void onLogData(Block & block)
1465 {
1466 initLogsOutputStream();
1467 logs_out_stream->write(block);
1468 logs_out_stream->flush();
1469 }
1470
1471
1472 void onTotals(Block & block)
1473 {
1474 initBlockOutputStream(block);
1475 block_out_stream->setTotals(block);
1476 }
1477
1478 void onExtremes(Block & block)
1479 {
1480 initBlockOutputStream(block);
1481 block_out_stream->setExtremes(block);
1482 }
1483
1484
1485 void onProgress(const Progress & value)
1486 {
1487 if (!progress.incrementPiecewiseAtomically(value))
1488 {
1489 // Just a keep-alive update.
1490 return;
1491 }
1492 if (block_out_stream)
1493 block_out_stream->onProgress(value);
1494 writeProgress();
1495 }
1496
1497
1498 void clearProgress()
1499 {
1500 written_progress_chars = 0;
1501 std::cerr << RESTORE_CURSOR_POSITION CLEAR_TO_END_OF_LINE;
1502 }
1503
1504
/// Draw the progress line on stderr: a rotating indicator, the number of rows and bytes read,
/// the processing speed and - when the total number of rows is known - a progress bar with a percentage.
/// Does nothing unless need_render_progress is set.
/// Updates written_progress_chars, which the other methods use to tell whether a progress line is on screen.
void writeProgress()
{
    if (!need_render_progress)
        return;

    /// Output all progress bar commands to stderr at once to avoid flicker.
    WriteBufferFromFileDescriptor message(STDERR_FILENO, 1024);

    /// Number of lines drawn so far; it selects the frame of the rotating indicator.
    /// Function-local static, so the value is kept between calls.
    static size_t increment = 0;
    /// Eight frames: an arrow wrapped into a color escape sequence and the reset sequence.
    /// The last frame uses the shorter "\033[1m" prefix, which matters for the length correction below.
    static const char * indicators[8] =
    {
        "\033[1;30m→\033[0m",
        "\033[1;31m↘\033[0m",
        "\033[1;32m↓\033[0m",
        "\033[1;33m↙\033[0m",
        "\033[1;34m←\033[0m",
        "\033[1;35m↖\033[0m",
        "\033[1;36m↑\033[0m",
        "\033[1m↗\033[0m",
    };

    /// A previous progress line exists: return to the saved position and erase it.
    /// Otherwise remember the current position for later redraws.
    if (written_progress_chars)
        message << RESTORE_CURSOR_POSITION CLEAR_TO_END_OF_LINE;
    else
        message << SAVE_CURSOR_POSITION;

    message << DISABLE_LINE_WRAPPING;

    /// Everything written up to here is control sequences only; it is excluded from the length computed below.
    /// NOTE(review): message.count() is presumably the number of bytes written so far - confirm.
    size_t prefix_size = message.count();

    message << indicators[increment % 8]
        << " Progress: ";

    message
        << formatReadableQuantity(progress.read_rows) << " rows, "
        << formatReadableSizeWithDecimalSuffix(progress.read_bytes);

    /// NOTE(review): the name and the 1e9 factor suggest watch.elapsed() is in nanoseconds - confirm.
    size_t elapsed_ns = watch.elapsed();
    if (elapsed_ns)
        message << " ("
                << formatReadableQuantity(progress.read_rows * 1000000000.0 / elapsed_ns) << " rows/s., "
                << formatReadableSizeWithDecimalSuffix(progress.read_bytes * 1000000000.0 / elapsed_ns) << "/s.) ";
    else
        message << ". ";

    /// Subtract what the indicator adds beyond one visible column: the escape sequences around the arrow
    /// (7 + 4 bytes, or 4 + 4 for the last frame) plus 2 extra bytes of the 3-byte UTF-8 arrow.
    written_progress_chars = message.count() - prefix_size - (increment % 8 == 7 ? 10 : 13); /// Don't count invisible output (escape sequences).

    /// If the approximate number of rows to process is known, we can display a progress bar and percentage.
    if (progress.total_rows_to_read > 0)
    {
        /// More rows may have been read than announced; use the larger value as the total
        /// so that the ratio below never exceeds 1.
        size_t total_rows_corrected = std::max(progress.read_rows, progress.total_rows_to_read);

        /// To avoid flicker, display progress bar only if .5 seconds have passed since query execution start
        /// and the query is less than halfway done.

        if (elapsed_ns > 500000000)
        {
            /// Trigger to start displaying progress bar. If query is mostly done, don't display it.
            /// The flag is only ever set here, never cleared.
            if (progress.read_rows * 2 < total_rows_corrected)
                show_progress_bar = true;

            if (show_progress_bar)
            {
                /// The bar takes the rest of the terminal line, leaving room for the percentage.
                ssize_t width_of_progress_bar = static_cast<ssize_t>(terminal_width) - written_progress_chars - strlen(" 99%");
                if (width_of_progress_bar > 0)
                {
                    std::string bar = UnicodeBar::render(UnicodeBar::getWidth(progress.read_rows, 0, total_rows_corrected, width_of_progress_bar));
                    message << "\033[0;32m" << bar << "\033[0m";
                    /// Pad the bar with spaces up to its full width, so the percentage stays in place.
                    if (width_of_progress_bar > static_cast<ssize_t>(bar.size() / UNICODE_BAR_CHAR_SIZE))
                        message << std::string(width_of_progress_bar - bar.size() / UNICODE_BAR_CHAR_SIZE, ' ');
                }
            }
        }

        /// Underestimate percentage a bit to avoid displaying 100%.
        message << ' ' << (99 * progress.read_rows / total_rows_corrected) << '%';
    }

    message << ENABLE_LINE_WRAPPING;
    ++increment;

    /// Flush the whole line to stderr.
    message.next();
}
1588
1589
1590 void writeFinalProgress()
1591 {
1592 std::cout << "Processed "
1593 << formatReadableQuantity(progress.read_rows) << " rows, "
1594 << formatReadableSizeWithDecimalSuffix(progress.read_bytes);
1595
1596 size_t elapsed_ns = watch.elapsed();
1597 if (elapsed_ns)
1598 std::cout << " ("
1599 << formatReadableQuantity(progress.read_rows * 1000000000.0 / elapsed_ns) << " rows/s., "
1600 << formatReadableSizeWithDecimalSuffix(progress.read_bytes * 1000000000.0 / elapsed_ns) << "/s.) ";
1601 else
1602 std::cout << ". ";
1603 }
1604
1605
1606 void onException(const Exception & e)
1607 {
1608 resetOutput();
1609 got_exception = true;
1610
1611 actual_server_error = e.code();
1612 if (expected_server_error)
1613 {
1614 if (actual_server_error == expected_server_error)
1615 return;
1616 std::cerr << "Expected error code: " << expected_server_error << " but got: " << actual_server_error << "." << std::endl;
1617 }
1618
1619 std::string text = e.displayText();
1620
1621 auto embedded_stack_trace_pos = text.find("Stack trace");
1622 if (std::string::npos != embedded_stack_trace_pos && !config().getBool("stacktrace", false))
1623 text.resize(embedded_stack_trace_pos);
1624
1625 std::cerr << "Received exception from server (version " << server_version << "):" << std::endl
1626 << "Code: " << e.code() << ". " << text << std::endl;
1627 }
1628
1629
1630 void onProfileInfo(const BlockStreamProfileInfo & profile_info)
1631 {
1632 if (profile_info.hasAppliedLimit() && block_out_stream)
1633 block_out_stream->setRowsBeforeLimit(profile_info.getRowsBeforeLimit());
1634 }
1635
1636
1637 void onEndOfStream()
1638 {
1639 if (block_out_stream)
1640 block_out_stream->writeSuffix();
1641
1642 if (logs_out_stream)
1643 logs_out_stream->writeSuffix();
1644
1645 resetOutput();
1646
1647 if (is_interactive && !written_first_block)
1648 std::cout << "Ok." << std::endl;
1649 }
1650
1651 void showClientVersion()
1652 {
1653 std::cout << DBMS_NAME << " client version " << VERSION_STRING << VERSION_OFFICIAL << "." << std::endl;
1654 }
1655
1656public:
1657 void init(int argc, char ** argv)
1658 {
1659 /// Don't parse options with Poco library. We need more sophisticated processing.
1660 stopOptionsProcessing();
1661
1662 /** We allow different groups of arguments:
1663 * - common arguments;
1664 * - arguments for any number of external tables each in form "--external args...",
1665 * where possible args are file, name, format, structure, types;
1666 * - param arguments for prepared statements.
1667 * Split these groups before processing.
1668 */
1669 using Arguments = std::vector<std::string>;
1670
1671 Arguments common_arguments{""}; /// 0th argument is ignored.
1672 std::vector<Arguments> external_tables_arguments;
1673
1674 bool in_external_group = false;
1675 for (int arg_num = 1; arg_num < argc; ++arg_num)
1676 {
1677 const char * arg = argv[arg_num];
1678
1679 if (0 == strcmp(arg, "--external"))
1680 {
1681 in_external_group = true;
1682 external_tables_arguments.emplace_back(Arguments{""});
1683 }
1684 /// Options with value after equal sign.
1685 else if (in_external_group
1686 && (0 == strncmp(arg, "--file=", strlen("--file="))
1687 || 0 == strncmp(arg, "--name=", strlen("--name="))
1688 || 0 == strncmp(arg, "--format=", strlen("--format="))
1689 || 0 == strncmp(arg, "--structure=", strlen("--structure="))
1690 || 0 == strncmp(arg, "--types=", strlen("--types="))))
1691 {
1692 external_tables_arguments.back().emplace_back(arg);
1693 }
1694 /// Options with value after whitespace.
1695 else if (in_external_group
1696 && (0 == strcmp(arg, "--file")
1697 || 0 == strcmp(arg, "--name")
1698 || 0 == strcmp(arg, "--format")
1699 || 0 == strcmp(arg, "--structure")
1700 || 0 == strcmp(arg, "--types")))
1701 {
1702 if (arg_num + 1 < argc)
1703 {
1704 external_tables_arguments.back().emplace_back(arg);
1705 ++arg_num;
1706 arg = argv[arg_num];
1707 external_tables_arguments.back().emplace_back(arg);
1708 }
1709 else
1710 break;
1711 }
1712 else
1713 {
1714 in_external_group = false;
1715
1716 /// Parameter arg after underline.
1717 if (startsWith(arg, "--param_"))
1718 {
1719 const char * param_continuation = arg + strlen("--param_");
1720 const char * equal_pos = strchr(param_continuation, '=');
1721
1722 if (equal_pos == param_continuation)
1723 throw Exception("Parameter name cannot be empty", ErrorCodes::BAD_ARGUMENTS);
1724
1725 if (equal_pos)
1726 {
1727 /// param_name=value
1728 query_parameters.emplace(String(param_continuation, equal_pos), String(equal_pos + 1));
1729 }
1730 else
1731 {
1732 /// param_name value
1733 ++arg_num;
1734 arg = argv[arg_num];
1735 query_parameters.emplace(String(param_continuation), String(arg));
1736 }
1737 }
1738 else
1739 common_arguments.emplace_back(arg);
1740 }
1741 }
1742
1743 stdin_is_not_tty = !isatty(STDIN_FILENO);
1744
1745 if (!stdin_is_not_tty)
1746 terminal_width = getTerminalWidth();
1747
1748 namespace po = boost::program_options;
1749
1750 /// Main commandline options related to client functionality and all parameters from Settings.
1751 po::options_description main_description = createOptionsDescription("Main options", terminal_width);
1752 main_description.add_options()
1753 ("help", "produce help message")
1754 ("config-file,C", po::value<std::string>(), "config-file path")
1755 ("config,c", po::value<std::string>(), "config-file path (another shorthand)")
1756 ("host,h", po::value<std::string>()->default_value("localhost"), "server host")
1757 ("port", po::value<int>()->default_value(9000), "server port")
1758 ("secure,s", "Use TLS connection")
1759 ("user,u", po::value<std::string>()->default_value("default"), "user")
1760 /** If "--password [value]" is used but the value is omitted, the bad argument exception will be thrown.
1761 * implicit_value is used to avoid this exception (to allow user to type just "--password")
1762 * Since currently boost provides no way to check if a value has been set implicitly for an option,
1763 * the "\n" is used to distinguish this case because there is hardly a chance an user would use "\n"
1764 * as the password.
1765 */
1766 ("password", po::value<std::string>()->implicit_value("\n", ""), "password")
1767 ("ask-password", "ask-password")
1768 ("query_id", po::value<std::string>(), "query_id")
1769 ("query,q", po::value<std::string>(), "query")
1770 ("database,d", po::value<std::string>(), "database")
1771 ("pager", po::value<std::string>(), "pager")
1772 ("disable_suggestion,A", "Disable loading suggestion data. Note that suggestion data is loaded asynchronously through a second connection to ClickHouse server. Also it is reasonable to disable suggestion if you want to paste a query with TAB characters. Shorthand option -A is for those who get used to mysql client.")
1773 ("always_load_suggestion_data", "Load suggestion data even if clickhouse-client is run in non-interactive mode. Used for testing.")
1774 ("suggestion_limit", po::value<int>()->default_value(10000),
1775 "Suggestion limit for how many databases, tables and columns to fetch.")
1776 ("multiline,m", "multiline")
1777 ("multiquery,n", "multiquery")
1778 ("format,f", po::value<std::string>(), "default output format")
1779 ("testmode,T", "enable test hints in comments")
1780 ("ignore-error", "do not stop processing in multiquery mode")
1781 ("vertical,E", "vertical output format, same as --format=Vertical or FORMAT Vertical or \\G at end of command")
1782 ("time,t", "print query execution time to stderr in non-interactive mode (for benchmarks)")
1783 ("stacktrace", "print stack traces of exceptions")
1784 ("progress", "print progress even in non-interactive mode")
1785 ("version,V", "print version information and exit")
1786 ("version-clean", "print version in machine-readable format and exit")
1787 ("echo", "in batch mode, print query before execution")
1788 ("max_client_network_bandwidth", po::value<int>(), "the maximum speed of data exchange over the network for the client in bytes per second.")
1789 ("compression", po::value<bool>(), "enable or disable compression")
1790 ("log-level", po::value<std::string>(), "client log level")
1791 ("server_logs_file", po::value<std::string>(), "put server logs into specified file")
1792 ;
1793
1794 context.getSettingsRef().addProgramOptions(main_description);
1795
1796 /// Commandline options related to external tables.
1797 po::options_description external_description = createOptionsDescription("External tables options", terminal_width);
1798 external_description.add_options()
1799 ("file", po::value<std::string>(), "data file or - for stdin")
1800 ("name", po::value<std::string>()->default_value("_data"), "name of the table")
1801 ("format", po::value<std::string>()->default_value("TabSeparated"), "data format")
1802 ("structure", po::value<std::string>(), "structure")
1803 ("types", po::value<std::string>(), "types")
1804 ;
1805
1806 /// Parse main commandline options.
1807 po::parsed_options parsed = po::command_line_parser(common_arguments).options(main_description).run();
1808 po::variables_map options;
1809 po::store(parsed, options);
1810 po::notify(options);
1811
1812 if (options.count("version") || options.count("V"))
1813 {
1814 showClientVersion();
1815 exit(0);
1816 }
1817
1818 if (options.count("version-clean"))
1819 {
1820 std::cout << VERSION_STRING;
1821 exit(0);
1822 }
1823
1824 /// Output of help message.
1825 if (options.count("help")
1826 || (options.count("host") && options["host"].as<std::string>() == "elp")) /// If user writes -help instead of --help.
1827 {
1828 std::cout << main_description << "\n";
1829 std::cout << external_description << "\n";
1830 std::cout << "In addition, --param_name=value can be specified for substitution of parameters for parametrized queries.\n";
1831 exit(0);
1832 }
1833
1834 if (options.count("log-level"))
1835 Poco::Logger::root().setLevel(options["log-level"].as<std::string>());
1836
1837 size_t number_of_external_tables_with_stdin_source = 0;
1838 for (size_t i = 0; i < external_tables_arguments.size(); ++i)
1839 {
1840 /// Parse commandline options related to external tables.
1841 po::parsed_options parsed_tables = po::command_line_parser(external_tables_arguments[i]).options(external_description).run();
1842 po::variables_map external_options;
1843 po::store(parsed_tables, external_options);
1844
1845 try
1846 {
1847 external_tables.emplace_back(external_options);
1848 if (external_tables.back().file == "-")
1849 ++number_of_external_tables_with_stdin_source;
1850 if (number_of_external_tables_with_stdin_source > 1)
1851 throw Exception("Two or more external tables has stdin (-) set as --file field", ErrorCodes::BAD_ARGUMENTS);
1852 }
1853 catch (const Exception & e)
1854 {
1855 std::string text = e.displayText();
1856 std::cerr << "Code: " << e.code() << ". " << text << std::endl;
1857 std::cerr << "Table №" << i << std::endl << std::endl;
1858 exit(e.code());
1859 }
1860 }
1861
1862 /// Copy settings-related program options to config.
1863 /// TODO: Is this code necessary?
1864 for (const auto & setting : context.getSettingsRef())
1865 {
1866 const String name = setting.getName().toString();
1867 if (options.count(name))
1868 config().setString(name, options[name].as<std::string>());
1869 }
1870
1871 if (options.count("config-file") && options.count("config"))
1872 throw Exception("Two or more configuration files referenced in arguments", ErrorCodes::BAD_ARGUMENTS);
1873
1874 /// Save received data into the internal config.
1875 if (options.count("config-file"))
1876 config().setString("config-file", options["config-file"].as<std::string>());
1877 if (options.count("config"))
1878 config().setString("config-file", options["config"].as<std::string>());
1879 if (options.count("host") && !options["host"].defaulted())
1880 config().setString("host", options["host"].as<std::string>());
1881 if (options.count("query_id"))
1882 config().setString("query_id", options["query_id"].as<std::string>());
1883 if (options.count("query"))
1884 config().setString("query", options["query"].as<std::string>());
1885 if (options.count("database"))
1886 config().setString("database", options["database"].as<std::string>());
1887 if (options.count("pager"))
1888 config().setString("pager", options["pager"].as<std::string>());
1889
1890 if (options.count("port") && !options["port"].defaulted())
1891 config().setInt("port", options["port"].as<int>());
1892 if (options.count("secure"))
1893 config().setBool("secure", true);
1894 if (options.count("user") && !options["user"].defaulted())
1895 config().setString("user", options["user"].as<std::string>());
1896 if (options.count("password"))
1897 config().setString("password", options["password"].as<std::string>());
1898 if (options.count("ask-password"))
1899 config().setBool("ask-password", true);
1900 if (options.count("multiline"))
1901 config().setBool("multiline", true);
1902 if (options.count("multiquery"))
1903 config().setBool("multiquery", true);
1904 if (options.count("testmode"))
1905 config().setBool("testmode", true);
1906 if (options.count("ignore-error"))
1907 config().setBool("ignore-error", true);
1908 if (options.count("format"))
1909 config().setString("format", options["format"].as<std::string>());
1910 if (options.count("vertical"))
1911 config().setBool("vertical", true);
1912 if (options.count("stacktrace"))
1913 config().setBool("stacktrace", true);
1914 if (options.count("progress"))
1915 config().setBool("progress", true);
1916 if (options.count("echo"))
1917 config().setBool("echo", true);
1918 if (options.count("time"))
1919 print_time_to_stderr = true;
1920 if (options.count("max_client_network_bandwidth"))
1921 max_client_network_bandwidth = options["max_client_network_bandwidth"].as<int>();
1922 if (options.count("compression"))
1923 config().setBool("compression", options["compression"].as<bool>());
1924 if (options.count("server_logs_file"))
1925 server_logs_file = options["server_logs_file"].as<std::string>();
1926 if (options.count("disable_suggestion"))
1927 config().setBool("disable_suggestion", true);
1928 if (options.count("always_load_suggestion_data"))
1929 {
1930 if (options.count("disable_suggestion"))
1931 throw Exception("Command line parameters disable_suggestion (-A) and always_load_suggestion_data cannot be specified simultaneously",
1932 ErrorCodes::BAD_ARGUMENTS);
1933 config().setBool("always_load_suggestion_data", true);
1934 }
1935 if (options.count("suggestion_limit"))
1936 config().setInt("suggestion_limit", options["suggestion_limit"].as<int>());
1937
1938 argsToConfig(common_arguments, config(), 100);
1939
1940 }
1941};
1942
1943}
1944
1945#pragma GCC diagnostic ignored "-Wunused-function"
1946#pragma GCC diagnostic ignored "-Wmissing-declarations"
1947
1948int mainEntryClickHouseClient(int argc, char ** argv)
1949{
1950 try
1951 {
1952 DB::Client client;
1953 client.init(argc, argv);
1954 return client.run();
1955 }
1956 catch (const boost::program_options::error & e)
1957 {
1958 std::cerr << "Bad arguments: " << e.what() << std::endl;
1959 return 1;
1960 }
1961 catch (...)
1962 {
1963 std::cerr << DB::getCurrentExceptionMessage(true) << std::endl;
1964 return 1;
1965 }
1966}
1967