#include <Storages/Kafka/StorageKafka.h>

#include <DataStreams/IBlockInputStream.h>
#include <DataStreams/LimitBlockInputStream.h>
#include <DataStreams/UnionBlockInputStream.h>
#include <DataStreams/copyData.h>
#include <DataTypes/DataTypeDateTime.h>
#include <DataTypes/DataTypeNullable.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypeString.h>
#include <Interpreters/InterpreterInsertQuery.h>
#include <Interpreters/evaluateConstantExpression.h>
#include <Parsers/ASTCreateQuery.h>
#include <Parsers/ASTExpressionList.h>
#include <Parsers/ASTInsertQuery.h>
#include <Parsers/ASTLiteral.h>
#include <Storages/Kafka/KafkaSettings.h>
#include <Storages/Kafka/KafkaBlockInputStream.h>
#include <Storages/Kafka/KafkaBlockOutputStream.h>
#include <Storages/Kafka/WriteBufferToKafkaProducer.h>
#include <Storages/StorageFactory.h>
#include <Storages/StorageMaterializedView.h>
#include <boost/algorithm/string/replace.hpp>
#include <boost/algorithm/string/split.hpp>
#include <boost/algorithm/string/trim.hpp>
#include <Poco/Util/AbstractConfiguration.h>
#include <Common/Exception.h>
#include <Common/Macros.h>
#include <Common/config_version.h>
#include <Common/setThreadName.h>
#include <Common/typeid_cast.h>
#include <common/logger_useful.h>
#include <Common/quoteString.h>


namespace DB
{

namespace ErrorCodes
{
    extern const int INCORRECT_DATA;
    extern const int UNKNOWN_EXCEPTION;
    extern const int CANNOT_READ_FROM_ISTREAM;
    extern const int INVALID_CONFIG_PARAMETER;
    extern const int LOGICAL_ERROR;
    extern const int BAD_ARGUMENTS;
    extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
    extern const int NOT_IMPLEMENTED;
    extern const int UNSUPPORTED_METHOD;
    extern const int UNKNOWN_SETTING;
    extern const int READONLY_SETTING;
}

namespace
{
    const auto RESCHEDULE_MS = 500;
    const auto CLEANUP_TIMEOUT_MS = 3000;

    /// Configuration prefix
    const String CONFIG_PREFIX = "kafka";

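    /// Read every key under the given section of the server config and copy it into the librdkafka
    /// configuration, translating underscores into dots (e.g. <auto_offset_reset> -> auto.offset.reset).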
    void loadFromConfig(cppkafka::Configuration & conf, const Poco::Util::AbstractConfiguration & config, const std::string & path)
    {
        Poco::Util::AbstractConfiguration::Keys keys;
        config.keys(path, keys);

        for (const auto & key : keys)
        {
            const String key_path = path + "." + key;
            const String key_name = boost::replace_all_copy(key, "_", ".");
            conf.set(key_name, config.getString(key_path));
        }
    }
}

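/// In addition to the user-defined columns, the storage provides the virtual columns
/// _topic, _key, _offset, _partition and _timestamp for every consumed message.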
StorageKafka::StorageKafka(
    const std::string & table_name_,
    const std::string & database_name_,
    Context & context_,
    const ColumnsDescription & columns_,
    const String & brokers_,
    const String & group_,
    const Names & topics_,
    const String & format_name_,
    char row_delimiter_,
    const String & schema_name_,
    size_t num_consumers_,
    UInt64 max_block_size_,
    size_t skip_broken_,
    bool intermediate_commit_)
    : IStorage(
        ColumnsDescription({{"_topic", std::make_shared<DataTypeString>()},
                            {"_key", std::make_shared<DataTypeString>()},
                            {"_offset", std::make_shared<DataTypeUInt64>()},
                            {"_partition", std::make_shared<DataTypeUInt64>()},
                            {"_timestamp", std::make_shared<DataTypeNullable>(std::make_shared<DataTypeDateTime>())}}, true))
    , table_name(table_name_)
    , database_name(database_name_)
    , global_context(context_.getGlobalContext())
    , kafka_context(Context(global_context))
    , topics(global_context.getMacros()->expand(topics_))
    , brokers(global_context.getMacros()->expand(brokers_))
    , group(global_context.getMacros()->expand(group_))
    , format_name(global_context.getMacros()->expand(format_name_))
    , row_delimiter(row_delimiter_)
    , schema_name(global_context.getMacros()->expand(schema_name_))
    , num_consumers(num_consumers_)
    , max_block_size(max_block_size_)
    , log(&Logger::get("StorageKafka (" + table_name_ + ")"))
    , semaphore(0, num_consumers_)
    , skip_broken(skip_broken_)
    , intermediate_commit(intermediate_commit_)
{
    kafka_context.makeQueryContext();

    setColumns(columns_);
    task = global_context.getSchedulePool().createTask(log->name(), [this]{ threadFunc(); });
    task->deactivate();
}


BlockInputStreams StorageKafka::read(
    const Names & column_names,
    const SelectQueryInfo & /* query_info */,
    const Context & context,
    QueryProcessingStage::Enum /* processed_stage */,
    size_t /* max_block_size */,
    unsigned /* num_streams */)
{
    if (num_created_consumers == 0)
        return BlockInputStreams();

    /// Always use all consumers at once, otherwise SELECT may not read messages from all partitions.
    BlockInputStreams streams;
    streams.reserve(num_created_consumers);

    // Claim as many consumers as requested, but don't block
    for (size_t i = 0; i < num_created_consumers; ++i)
    {
        /// Use block size of 1, otherwise LIMIT won't work properly as it will buffer excess messages in the last block
        /// TODO: probably that leads to awful performance.
        /// FIXME: seems that doesn't help with extra reading and committing unprocessed messages.
        streams.emplace_back(std::make_shared<KafkaBlockInputStream>(*this, context, column_names, 1));
    }

    LOG_DEBUG(log, "Starting reading " << streams.size() << " streams");
    return streams;
}


BlockOutputStreamPtr StorageKafka::write(const ASTPtr &, const Context & context)
{
    if (topics.size() > 1)
        throw Exception("Can't write to Kafka table with multiple topics!", ErrorCodes::NOT_IMPLEMENTED);
    return std::make_shared<KafkaBlockOutputStream>(*this, context);
}


void StorageKafka::startup()
{
    for (size_t i = 0; i < num_consumers; ++i)
    {
        try
        {
            pushReadBuffer(createReadBuffer());
            ++num_created_consumers;
        }
        catch (const cppkafka::Exception &)
        {
            tryLogCurrentException(log);
        }
    }

    // Start the reader thread
    task->activateAndSchedule();
}


void StorageKafka::shutdown()
{
    // Interrupt streaming thread
    stream_cancelled = true;

    // Close all consumers
    for (size_t i = 0; i < num_created_consumers; ++i)
    {
        auto buffer = popReadBuffer();
        // FIXME: not sure if we really close consumers here, and if we really need to close them here.
    }

    LOG_TRACE(log, "Waiting for cleanup");
    rd_kafka_wait_destroyed(CLEANUP_TIMEOUT_MS);

    task->deactivate();
}


void StorageKafka::rename(const String & /* new_path_to_db */, const String & new_database_name, const String & new_table_name, TableStructureWriteLockHolder &)
{
    table_name = new_table_name;
    database_name = new_database_name;
}


void StorageKafka::updateDependencies()
{
    task->activateAndSchedule();
}


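/// The consumer buffers form a small pool guarded by a mutex; the semaphore counts how many
/// buffers are currently available, so readers can block (or time out) while waiting for one.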
void StorageKafka::pushReadBuffer(ConsumerBufferPtr buffer)
{
    std::lock_guard lock(mutex);
    buffers.push_back(buffer);
    semaphore.set();
}


ConsumerBufferPtr StorageKafka::popReadBuffer()
{
    return popReadBuffer(std::chrono::milliseconds::zero());
}


ConsumerBufferPtr StorageKafka::popReadBuffer(std::chrono::milliseconds timeout)
{
    // Wait for the first free buffer
    if (timeout == std::chrono::milliseconds::zero())
        semaphore.wait();
    else
    {
        if (!semaphore.tryWait(timeout.count()))
            return nullptr;
    }

    // Take the first available buffer from the list
    std::lock_guard lock(mutex);
    auto buffer = buffers.back();
    buffers.pop_back();
    return buffer;
}


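/// Create a producer-backed write buffer. Writing is only supported for single-topic tables
/// (see write() above), so the buffer always publishes to topics[0].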
ProducerBufferPtr StorageKafka::createWriteBuffer()
{
    cppkafka::Configuration conf;
    conf.set("metadata.broker.list", brokers);
    conf.set("group.id", group);
    conf.set("client.id", VERSION_FULL);
    // TODO: fill required settings
    updateConfiguration(conf);

    auto producer = std::make_shared<cppkafka::Producer>(conf);
    const Settings & settings = global_context.getSettingsRef();
    size_t poll_timeout = settings.stream_poll_timeout_ms.totalMilliseconds();

    return std::make_shared<WriteBufferToKafkaProducer>(
        producer, topics[0], row_delimiter ? std::optional<char>{row_delimiter} : std::optional<char>(), 1, 1024, std::chrono::milliseconds(poll_timeout));
}


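/// Create a consumer-backed read buffer. Auto-commit is disabled: offsets are stored and
/// committed explicitly, so a batch is only committed after it has been processed downstream.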
ConsumerBufferPtr StorageKafka::createReadBuffer()
{
    cppkafka::Configuration conf;
    conf.set("metadata.broker.list", brokers);
    conf.set("group.id", group);
    conf.set("client.id", VERSION_FULL);
    conf.set("auto.offset.reset", "smallest");     // If no offset stored for this group, read all messages from the start
    conf.set("enable.auto.commit", "false");       // We manually commit offsets after a stream successfully finished
    conf.set("enable.auto.offset.store", "false"); // Do not store offsets automatically - we store them explicitly so they can be committed all at once
    conf.set("enable.partition.eof", "false");     // Ignore EOF messages
    updateConfiguration(conf);

    // Create a consumer and subscribe to topics
    auto consumer = std::make_shared<cppkafka::Consumer>(conf);

    // Limit the number of batched messages to allow early cancellations
    const Settings & settings = global_context.getSettingsRef();
    size_t batch_size = max_block_size;
    if (!batch_size)
        batch_size = settings.max_block_size.value;
    size_t poll_timeout = settings.stream_poll_timeout_ms.totalMilliseconds();

    /// NOTE: we pass |stream_cancelled| by reference here, so the buffers should not outlive the storage.
    return std::make_shared<ReadBufferFromKafkaConsumer>(consumer, log, batch_size, poll_timeout, intermediate_commit, stream_cancelled);
}


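/// Merge additional settings from the server configuration: first the global <kafka> section,
/// then a per-topic <kafka_TOPICNAME> section for every topic of this table.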
void StorageKafka::updateConfiguration(cppkafka::Configuration & conf)
{
    // Update consumer configuration from the configuration
    const auto & config = global_context.getConfigRef();
    if (config.has(CONFIG_PREFIX))
        loadFromConfig(conf, config, CONFIG_PREFIX);

    // Update consumer topic-specific configuration
    for (const auto & topic : topics)
    {
        const auto topic_config_key = CONFIG_PREFIX + "_" + topic;
        if (config.has(topic_config_key))
            loadFromConfig(conf, config, topic_config_key);
    }
}

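/// Returns true if every view attached to (current_database_name, current_table_name) is itself
/// attached and, for materialized views, has its target table; the check recurses into each
/// dependent view's own dependents.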
bool StorageKafka::checkDependencies(const String & current_database_name, const String & current_table_name)
{
    // Check if all dependencies are attached
    auto dependencies = global_context.getDependencies(current_database_name, current_table_name);
    if (dependencies.empty())
        return true;

    // Check that the dependencies are ready
    for (const auto & db_tab : dependencies)
    {
        auto table = global_context.tryGetTable(db_tab.first, db_tab.second);
        if (!table)
            return false;

        // If it is a materialized view, check its target table
        auto * materialized_view = dynamic_cast<StorageMaterializedView *>(table.get());
        if (materialized_view && !materialized_view->tryGetTargetTable())
            return false;

        // Check all its dependencies
        if (!checkDependencies(db_tab.first, db_tab.second))
            return false;
    }

    return true;
}

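/// Background task: while the table has attached views and streaming has not been cancelled,
/// keep pushing consumed batches into them; otherwise re-check for attached views after RESCHEDULE_MS.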
void StorageKafka::threadFunc()
{
    try
    {
        // Check if at least one direct dependency is attached
        auto dependencies = global_context.getDependencies(database_name, table_name);

        // Keep streaming as long as there are attached views and streaming is not cancelled
        while (!stream_cancelled && num_created_consumers > 0 && !dependencies.empty())
        {
            if (!checkDependencies(database_name, table_name))
                break;

            LOG_DEBUG(log, "Started streaming to " << dependencies.size() << " attached views");

            // If the time limit was not reached, there is no more data right now: stop and reschedule
            if (!streamToViews())
                break;
        }
    }
    catch (...)
    {
        tryLogCurrentException(__PRETTY_FUNCTION__);
    }

    // Wait for attached views
    if (!stream_cancelled)
        task->scheduleAfter(RESCHEDULE_MS);
}


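/// Stream one batch from every consumer into the dependent views through an INSERT into this table.
/// Returns true if the execution-time limit was hit, i.e. there may be more data to stream right away.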
bool StorageKafka::streamToViews()
{
    auto table = global_context.getTable(database_name, table_name);
    if (!table)
        throw Exception("Engine table " + backQuote(database_name) + "." + backQuote(table_name) + " doesn't exist.", ErrorCodes::LOGICAL_ERROR);

    // Create an INSERT query for streaming data
    auto insert = std::make_shared<ASTInsertQuery>();
    insert->database = database_name;
    insert->table = table_name;

    const Settings & settings = global_context.getSettingsRef();
    size_t block_size = max_block_size;
    if (block_size == 0)
        block_size = settings.max_block_size;

    // Only insert into dependent views and expect that input blocks contain virtual columns
    InterpreterInsertQuery interpreter(insert, kafka_context, false, true, true);
    auto block_io = interpreter.execute();

    // Create a stream for each consumer and join them in a union stream
    BlockInputStreams streams;
    streams.reserve(num_created_consumers);
    for (size_t i = 0; i < num_created_consumers; ++i)
    {
        auto stream
            = std::make_shared<KafkaBlockInputStream>(*this, kafka_context, block_io.out->getHeader().getNames(), block_size, false);
        streams.emplace_back(stream);

        // Limit the duration of a single read so the streaming loop stays responsive (e.g. to concurrent DDL)
        IBlockInputStream::LocalLimits limits;
        limits.speed_limits.max_execution_time = settings.stream_flush_interval_ms;
        limits.timeout_overflow_mode = OverflowMode::BREAK;
        stream->setLimits(limits);
    }

    // Join multiple streams if necessary
    BlockInputStreamPtr in;
    if (streams.size() > 1)
        in = std::make_shared<UnionBlockInputStream>(streams, nullptr, streams.size());
    else
        in = streams[0];

    std::atomic<bool> stub = {false};
    copyData(*in, *block_io.out, &stub);
    for (auto & stream : streams)
        stream->as<KafkaBlockInputStream>()->commit();

    // Check whether the time limit was applied during execution; if so, there may be more data to read
    const BlockStreamProfileInfo & info = in->getProfileInfo();
    return info.hasAppliedLimit();
}

void registerStorageKafka(StorageFactory & factory)
{
    factory.registerStorage("Kafka", [](const StorageFactory::Arguments & args)
    {
        ASTs & engine_args = args.engine_args;
        size_t args_count = engine_args.size();
        bool has_settings = args.storage_def->settings;

        KafkaSettings kafka_settings;
        if (has_settings)
        {
            kafka_settings.loadFromQuery(*args.storage_def);
        }

        /** The engine arguments are the following:
          * - Kafka broker list
          * - List of topics
          * - Group ID (may be a constant expression with a string result)
          * - Message format (string)
          * - Row delimiter
          * - Schema (optional, if the format supports it)
          * - Number of consumers
          * - Max block size for background consumption
          * - Number of unreadable messages to skip (at least)
          * - Do intermediate commits when a batch has been consumed and handled
          */
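        /** For illustration only (the table and column names below are hypothetical):
          *
          *   CREATE TABLE queue (timestamp UInt64, level String, message String)
          *   ENGINE = Kafka('localhost:9092', 'topic', 'group1', 'JSONEachRow');
          *
          * or, equivalently, with named settings:
          *
          *   CREATE TABLE queue (timestamp UInt64, level String, message String)
          *   ENGINE = Kafka SETTINGS kafka_broker_list = 'localhost:9092',
          *                           kafka_topic_list = 'topic',
          *                           kafka_group_name = 'group1',
          *                           kafka_format = 'JSONEachRow';
          */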

        // Check arguments and settings
        #define CHECK_KAFKA_STORAGE_ARGUMENT(ARG_NUM, PAR_NAME)            \
            /* One of the four required arguments is not specified */      \
            if (args_count < ARG_NUM && ARG_NUM <= 4 &&                    \
                !kafka_settings.PAR_NAME.changed)                          \
            {                                                              \
                throw Exception(                                           \
                    "Required parameter '" #PAR_NAME "' "                  \
                    "for storage Kafka not specified",                     \
                    ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);         \
            }                                                              \
            /* The same argument is given in two places */                 \
            if (has_settings &&                                            \
                kafka_settings.PAR_NAME.changed &&                         \
                args_count >= ARG_NUM)                                     \
            {                                                              \
                throw Exception(                                           \
                    "The argument №" #ARG_NUM " of storage Kafka "         \
                    "and the parameter '" #PAR_NAME "' "                   \
                    "in SETTINGS cannot be specified at the same time",    \
                    ErrorCodes::BAD_ARGUMENTS);                            \
            }

        CHECK_KAFKA_STORAGE_ARGUMENT(1, kafka_broker_list)
        CHECK_KAFKA_STORAGE_ARGUMENT(2, kafka_topic_list)
        CHECK_KAFKA_STORAGE_ARGUMENT(3, kafka_group_name)
        CHECK_KAFKA_STORAGE_ARGUMENT(4, kafka_format)
        CHECK_KAFKA_STORAGE_ARGUMENT(5, kafka_row_delimiter)
        CHECK_KAFKA_STORAGE_ARGUMENT(6, kafka_schema)
        CHECK_KAFKA_STORAGE_ARGUMENT(7, kafka_num_consumers)
        CHECK_KAFKA_STORAGE_ARGUMENT(8, kafka_max_block_size)
        CHECK_KAFKA_STORAGE_ARGUMENT(9, kafka_skip_broken_messages)
        CHECK_KAFKA_STORAGE_ARGUMENT(10, kafka_commit_every_batch)

        #undef CHECK_KAFKA_STORAGE_ARGUMENT

        // Get and check broker list
        String brokers = kafka_settings.kafka_broker_list;
        if (args_count >= 1)
        {
            const auto * ast = engine_args[0]->as<ASTLiteral>();
            if (ast && ast->value.getType() == Field::Types::String)
            {
                brokers = safeGet<String>(ast->value);
            }
            else
            {
                throw Exception("Kafka broker list must be a string", ErrorCodes::BAD_ARGUMENTS);
            }
        }

        // Get and check topic list
        String topic_list = kafka_settings.kafka_topic_list.value;
        if (args_count >= 2)
        {
            engine_args[1] = evaluateConstantExpressionAsLiteral(engine_args[1], args.local_context);
            topic_list = engine_args[1]->as<ASTLiteral &>().value.safeGet<String>();
        }

        Names topics;
        boost::split(topics, topic_list, [](char c){ return c == ','; });
        for (String & topic : topics)
        {
            boost::trim(topic);
        }

        // Get and check group name
        String group = kafka_settings.kafka_group_name.value;
        if (args_count >= 3)
        {
            engine_args[2] = evaluateConstantExpressionOrIdentifierAsLiteral(engine_args[2], args.local_context);
            group = engine_args[2]->as<ASTLiteral &>().value.safeGet<String>();
        }

        // Get and check message format name
        String format = kafka_settings.kafka_format.value;
        if (args_count >= 4)
        {
            engine_args[3] = evaluateConstantExpressionOrIdentifierAsLiteral(engine_args[3], args.local_context);

            const auto * ast = engine_args[3]->as<ASTLiteral>();
            if (ast && ast->value.getType() == Field::Types::String)
            {
                format = safeGet<String>(ast->value);
            }
            else
            {
                throw Exception("Format must be a string", ErrorCodes::BAD_ARGUMENTS);
            }
        }

        // Parse row delimiter (optional)
        char row_delimiter = kafka_settings.kafka_row_delimiter;
        if (args_count >= 5)
        {
            engine_args[4] = evaluateConstantExpressionOrIdentifierAsLiteral(engine_args[4], args.local_context);

            const auto * ast = engine_args[4]->as<ASTLiteral>();
            String arg;
            if (ast && ast->value.getType() == Field::Types::String)
            {
                arg = safeGet<String>(ast->value);
            }
            else
            {
                throw Exception("Row delimiter must be a char", ErrorCodes::BAD_ARGUMENTS);
            }
            if (arg.size() > 1)
            {
                throw Exception("Row delimiter must be a char", ErrorCodes::BAD_ARGUMENTS);
            }
            else if (arg.empty())
            {
                row_delimiter = '\0';
            }
            else
            {
                row_delimiter = arg[0];
            }
        }

        // Parse format schema if supported (optional)
        String schema = kafka_settings.kafka_schema.value;
        if (args_count >= 6)
        {
            engine_args[5] = evaluateConstantExpressionOrIdentifierAsLiteral(engine_args[5], args.local_context);

            const auto * ast = engine_args[5]->as<ASTLiteral>();
            if (ast && ast->value.getType() == Field::Types::String)
            {
                schema = safeGet<String>(ast->value);
            }
            else
            {
                throw Exception("Format schema must be a string", ErrorCodes::BAD_ARGUMENTS);
            }
        }

        // Parse number of consumers (optional)
        UInt64 num_consumers = kafka_settings.kafka_num_consumers;
        if (args_count >= 7)
        {
            const auto * ast = engine_args[6]->as<ASTLiteral>();
            if (ast && ast->value.getType() == Field::Types::UInt64)
            {
                num_consumers = safeGet<UInt64>(ast->value);
            }
            else
            {
                throw Exception("Number of consumers must be a positive integer", ErrorCodes::BAD_ARGUMENTS);
            }
        }

        // Parse max block size (optional)
        UInt64 max_block_size = static_cast<size_t>(kafka_settings.kafka_max_block_size);
        if (args_count >= 8)
        {
            const auto * ast = engine_args[7]->as<ASTLiteral>();
            if (ast && ast->value.getType() == Field::Types::UInt64)
            {
                max_block_size = static_cast<size_t>(safeGet<UInt64>(ast->value));
            }
            else
            {
                // TODO: no check if the integer is really positive
                throw Exception("Maximum block size must be a positive integer", ErrorCodes::BAD_ARGUMENTS);
            }
        }

        size_t skip_broken = static_cast<size_t>(kafka_settings.kafka_skip_broken_messages);
        if (args_count >= 9)
        {
            const auto * ast = engine_args[8]->as<ASTLiteral>();
            if (ast && ast->value.getType() == Field::Types::UInt64)
            {
                skip_broken = static_cast<size_t>(safeGet<UInt64>(ast->value));
            }
            else
            {
                throw Exception("Number of broken messages to skip must be a non-negative integer", ErrorCodes::BAD_ARGUMENTS);
            }
        }

        bool intermediate_commit = static_cast<bool>(kafka_settings.kafka_commit_every_batch);
        if (args_count >= 10)
        {
            const auto * ast = engine_args[9]->as<ASTLiteral>();
            if (ast && ast->value.getType() == Field::Types::UInt64)
            {
                intermediate_commit = static_cast<bool>(safeGet<UInt64>(ast->value));
            }
            else
            {
                throw Exception("Flag for committing every batch must be 0 or 1", ErrorCodes::BAD_ARGUMENTS);
            }
        }

        return StorageKafka::create(
            args.table_name, args.database_name, args.context, args.columns,
            brokers, group, topics, format, row_delimiter, schema, num_consumers, max_block_size, skip_broken, intermediate_commit);
    });
}


}