#include <Storages/Kafka/StorageKafka.h>

#include <DataStreams/IBlockInputStream.h>
#include <DataStreams/LimitBlockInputStream.h>
#include <DataStreams/UnionBlockInputStream.h>
#include <DataStreams/copyData.h>
#include <DataTypes/DataTypeDateTime.h>
#include <DataTypes/DataTypeNullable.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypeString.h>
#include <Interpreters/InterpreterInsertQuery.h>
#include <Interpreters/evaluateConstantExpression.h>
#include <Parsers/ASTCreateQuery.h>
#include <Parsers/ASTExpressionList.h>
#include <Parsers/ASTInsertQuery.h>
#include <Parsers/ASTLiteral.h>
#include <Storages/Kafka/KafkaSettings.h>
#include <Storages/Kafka/KafkaBlockInputStream.h>
#include <Storages/Kafka/KafkaBlockOutputStream.h>
#include <Storages/Kafka/ReadBufferFromKafkaConsumer.h>
#include <Storages/Kafka/WriteBufferToKafkaProducer.h>
#include <Storages/StorageFactory.h>
#include <Storages/StorageMaterializedView.h>
#include <boost/algorithm/string/replace.hpp>
#include <boost/algorithm/string/split.hpp>
#include <boost/algorithm/string/trim.hpp>
#include <Poco/Util/AbstractConfiguration.h>
#include <Common/Exception.h>
#include <Common/Macros.h>
#include <Common/config_version.h>
#include <Common/setThreadName.h>
#include <Common/typeid_cast.h>
#include <common/logger_useful.h>
#include <Common/quoteString.h>

#include <librdkafka/rdkafka.h> // for rd_kafka_wait_destroyed()

namespace DB
{

namespace ErrorCodes
{
    extern const int INCORRECT_DATA;
    extern const int UNKNOWN_EXCEPTION;
    extern const int CANNOT_READ_FROM_ISTREAM;
    extern const int INVALID_CONFIG_PARAMETER;
    extern const int LOGICAL_ERROR;
    extern const int BAD_ARGUMENTS;
    extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
    extern const int NOT_IMPLEMENTED;
    extern const int UNSUPPORTED_METHOD;
    extern const int UNKNOWN_SETTING;
    extern const int READONLY_SETTING;
}

namespace
{
    const auto RESCHEDULE_MS = 500;
    const auto CLEANUP_TIMEOUT_MS = 3000;

    /// Configuration prefix
    const String CONFIG_PREFIX = "kafka";

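    /// Copy settings from a section of the server config (e.g. <kafka> or <kafka_TOPIC>) into a cppkafka configuration.
    /// Underscores in the config keys are converted back to dots, e.g. <auto_offset_reset> becomes "auto.offset.reset".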
    void loadFromConfig(cppkafka::Configuration & conf, const Poco::Util::AbstractConfiguration & config, const std::string & path)
    {
        Poco::Util::AbstractConfiguration::Keys keys;
        std::vector<char> errstr(512);

        config.keys(path, keys);

        for (const auto & key : keys)
        {
            const String key_path = path + "." + key;
            const String key_name = boost::replace_all_copy(key, "_", ".");
            conf.set(key_name, config.getString(key_path));
        }
    }
}

StorageKafka::StorageKafka(
    const std::string & table_name_,
    const std::string & database_name_,
    Context & context_,
    const ColumnsDescription & columns_,
    const String & brokers_,
    const String & group_,
    const Names & topics_,
    const String & format_name_,
    char row_delimiter_,
    const String & schema_name_,
    size_t num_consumers_,
    UInt64 max_block_size_,
    size_t skip_broken_,
    bool intermediate_commit_)
    : IStorage(
        ColumnsDescription({{"_topic", std::make_shared<DataTypeString>()},
                            {"_key", std::make_shared<DataTypeString>()},
                            {"_offset", std::make_shared<DataTypeUInt64>()},
                            {"_partition", std::make_shared<DataTypeUInt64>()},
                            {"_timestamp", std::make_shared<DataTypeNullable>(std::make_shared<DataTypeDateTime>())}}, true))
    , table_name(table_name_)
    , database_name(database_name_)
    , global_context(context_.getGlobalContext())
    , kafka_context(Context(global_context))
    , topics(global_context.getMacros()->expand(topics_))
    , brokers(global_context.getMacros()->expand(brokers_))
    , group(global_context.getMacros()->expand(group_))
    , format_name(global_context.getMacros()->expand(format_name_))
    , row_delimiter(row_delimiter_)
    , schema_name(global_context.getMacros()->expand(schema_name_))
    , num_consumers(num_consumers_)
    , max_block_size(max_block_size_)
    , log(&Logger::get("StorageKafka (" + table_name_ + ")"))
    , semaphore(0, num_consumers_)
    , skip_broken(skip_broken_)
    , intermediate_commit(intermediate_commit_)
{
    kafka_context.makeQueryContext();

    setColumns(columns_);
    task = global_context.getSchedulePool().createTask(log->name(), [this]{ threadFunc(); });
    task->deactivate();
}


BlockInputStreams StorageKafka::read(
    const Names & column_names,
    const SelectQueryInfo & /* query_info */,
    const Context & context,
    QueryProcessingStage::Enum /* processed_stage */,
    size_t /* max_block_size */,
    unsigned /* num_streams */)
{
    if (num_created_consumers == 0)
        return BlockInputStreams();

    /// Always use all consumers at once, otherwise SELECT may not read messages from all partitions.
    BlockInputStreams streams;
    streams.reserve(num_created_consumers);

    // Claim as many consumers as requested, but don't block
    for (size_t i = 0; i < num_created_consumers; ++i)
    {
        /// Use block size of 1, otherwise LIMIT won't work properly as it will buffer excess messages in the last block
        /// TODO: probably that leads to awful performance.
        /// FIXME: seems that doesn't help with extra reading and committing unprocessed messages.
        streams.emplace_back(std::make_shared<KafkaBlockInputStream>(*this, context, column_names, 1));
    }

    LOG_DEBUG(log, "Starting reading " << streams.size() << " streams");
    return streams;
}


BlockOutputStreamPtr StorageKafka::write(const ASTPtr &, const Context & context)
{
    if (topics.size() > 1)
        throw Exception("Can't write to Kafka table with multiple topics!", ErrorCodes::NOT_IMPLEMENTED);
    return std::make_shared<KafkaBlockOutputStream>(*this, context);
}


void StorageKafka::startup()
{
    for (size_t i = 0; i < num_consumers; ++i)
    {
        try
        {
            pushReadBuffer(createReadBuffer());
            ++num_created_consumers;
        }
        catch (const cppkafka::Exception &)
        {
            tryLogCurrentException(log);
        }
    }

    // Start the reader thread
    task->activateAndSchedule();
}


void StorageKafka::shutdown()
{
    // Interrupt streaming thread
    stream_cancelled = true;

    // Close all consumers
    for (size_t i = 0; i < num_created_consumers; ++i)
    {
        auto buffer = popReadBuffer();
        // FIXME: not sure if we really close consumers here, and if we really need to close them here.
    }

    LOG_TRACE(log, "Waiting for cleanup");
    rd_kafka_wait_destroyed(CLEANUP_TIMEOUT_MS);

    task->deactivate();
}


void StorageKafka::rename(const String & /* new_path_to_db */, const String & new_database_name, const String & new_table_name, TableStructureWriteLockHolder &)
{
    table_name = new_table_name;
    database_name = new_database_name;
}


void StorageKafka::updateDependencies()
{
    task->activateAndSchedule();
}


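/// The consumer buffers form a small pool guarded by the mutex; the semaphore counts how many
/// buffers are currently available, so popReadBuffer() can wait (or time out) for a free one.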
void StorageKafka::pushReadBuffer(ConsumerBufferPtr buffer)
{
    std::lock_guard lock(mutex);
    buffers.push_back(buffer);
    semaphore.set();
}


ConsumerBufferPtr StorageKafka::popReadBuffer()
{
    return popReadBuffer(std::chrono::milliseconds::zero());
}


ConsumerBufferPtr StorageKafka::popReadBuffer(std::chrono::milliseconds timeout)
{
    // Wait for the first free buffer
    if (timeout == std::chrono::milliseconds::zero())
        semaphore.wait();
    else
    {
        if (!semaphore.tryWait(timeout.count()))
            return nullptr;
    }

    // Take the first available buffer from the list
    std::lock_guard lock(mutex);
    auto buffer = buffers.back();
    buffers.pop_back();
    return buffer;
}


ProducerBufferPtr StorageKafka::createWriteBuffer()
{
    cppkafka::Configuration conf;
    conf.set("metadata.broker.list", brokers);
    conf.set("group.id", group);
    conf.set("client.id", VERSION_FULL);
    // TODO: fill required settings
    updateConfiguration(conf);

    auto producer = std::make_shared<cppkafka::Producer>(conf);
    const Settings & settings = global_context.getSettingsRef();
    size_t poll_timeout = settings.stream_poll_timeout_ms.totalMilliseconds();

    return std::make_shared<WriteBufferToKafkaProducer>(
        producer, topics[0], row_delimiter ? std::optional<char>{row_delimiter} : std::optional<char>(), 1, 1024, std::chrono::milliseconds(poll_timeout));
}


ConsumerBufferPtr StorageKafka::createReadBuffer()
{
    cppkafka::Configuration conf;
    conf.set("metadata.broker.list", brokers);
    conf.set("group.id", group);
    conf.set("client.id", VERSION_FULL);
    conf.set("auto.offset.reset", "smallest"); // If no offset is stored for this group, read all messages from the start
    conf.set("enable.auto.commit", "false"); // We manually commit offsets after a stream successfully finished
    conf.set("enable.auto.offset.store", "false"); // Do not store offsets automatically - so that they can be committed all at once after processing
    conf.set("enable.partition.eof", "false"); // Ignore EOF messages
    updateConfiguration(conf);

    // Create a consumer and subscribe to topics
    auto consumer = std::make_shared<cppkafka::Consumer>(conf);

    // Limit the number of batched messages to allow early cancellations
    const Settings & settings = global_context.getSettingsRef();
    size_t batch_size = max_block_size;
    if (!batch_size)
        batch_size = settings.max_block_size.value;
    size_t poll_timeout = settings.stream_poll_timeout_ms.totalMilliseconds();

    /// NOTE: we pass |stream_cancelled| by reference here, so the buffers should not outlive the storage.
    return std::make_shared<ReadBufferFromKafkaConsumer>(consumer, log, batch_size, poll_timeout, intermediate_commit, stream_cancelled);
}


void StorageKafka::updateConfiguration(cppkafka::Configuration & conf)
{
    // Update consumer configuration from the configuration
    const auto & config = global_context.getConfigRef();
    if (config.has(CONFIG_PREFIX))
        loadFromConfig(conf, config, CONFIG_PREFIX);

    // Update consumer topic-specific configuration (<kafka_TOPIC> sections of the server config)
    for (const auto & topic : topics)
    {
        const auto topic_config_key = CONFIG_PREFIX + "_" + topic;
        if (config.has(topic_config_key))
            loadFromConfig(conf, config, topic_config_key);
    }
}

bool StorageKafka::checkDependencies(const String & current_database_name, const String & current_table_name)
{
    // Check if all dependencies are attached
    auto dependencies = global_context.getDependencies(current_database_name, current_table_name);
    if (dependencies.size() == 0)
        return true;

    // Check whether all dependencies are ready
    for (const auto & db_tab : dependencies)
    {
        auto table = global_context.tryGetTable(db_tab.first, db_tab.second);
        if (!table)
            return false;

        // If it is a materialized view, check its target table
        auto * materialized_view = dynamic_cast<StorageMaterializedView *>(table.get());
        if (materialized_view && !materialized_view->tryGetTargetTable())
            return false;

        // Check all its dependencies
        if (!checkDependencies(db_tab.first, db_tab.second))
            return false;
    }

    return true;
}

void StorageKafka::threadFunc()
{
    try
    {
        // Check if at least one direct dependency is attached
        auto dependencies = global_context.getDependencies(database_name, table_name);

        // Keep streaming as long as there are attached views and streaming is not cancelled
        while (!stream_cancelled && num_created_consumers > 0 && dependencies.size() > 0)
        {
            if (!checkDependencies(database_name, table_name))
                break;

            LOG_DEBUG(log, "Started streaming to " << dependencies.size() << " attached views");

            // Reschedule if not limited
            if (!streamToViews())
                break;
        }
    }
    catch (...)
    {
        tryLogCurrentException(__PRETTY_FUNCTION__);
    }

    // Wait for attached views
    if (!stream_cancelled)
        task->scheduleAfter(RESCHEDULE_MS);
}


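/// Streams data from all consumers into the attached views via a single INSERT.
/// Returns true if the time limit (stream_flush_interval_ms) was reached, i.e. there is likely
/// more data pending and the caller should keep streaming without rescheduling.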
bool StorageKafka::streamToViews()
{
    auto table = global_context.getTable(database_name, table_name);
    if (!table)
        throw Exception("Engine table " + backQuote(database_name) + "." + backQuote(table_name) + " doesn't exist.", ErrorCodes::LOGICAL_ERROR);

    // Create an INSERT query for streaming data
    auto insert = std::make_shared<ASTInsertQuery>();
    insert->database = database_name;
    insert->table = table_name;

    const Settings & settings = global_context.getSettingsRef();
    size_t block_size = max_block_size;
    if (block_size == 0)
        block_size = settings.max_block_size;

    // Only insert into dependent views and expect that input blocks contain virtual columns
    InterpreterInsertQuery interpreter(insert, kafka_context, false, true, true);
    auto block_io = interpreter.execute();

    // Create a stream for each consumer and join them in a union stream
    BlockInputStreams streams;
    streams.reserve(num_created_consumers);
    for (size_t i = 0; i < num_created_consumers; ++i)
    {
        auto stream
            = std::make_shared<KafkaBlockInputStream>(*this, kafka_context, block_io.out->getHeader().getNames(), block_size, false);
        streams.emplace_back(stream);

        // Limit read batch to maximum block size to allow DDL
        IBlockInputStream::LocalLimits limits;
        limits.speed_limits.max_execution_time = settings.stream_flush_interval_ms;
        limits.timeout_overflow_mode = OverflowMode::BREAK;
        stream->setLimits(limits);
    }

    // Join multiple streams if necessary
    BlockInputStreamPtr in;
    if (streams.size() > 1)
        in = std::make_shared<UnionBlockInputStream>(streams, nullptr, streams.size());
    else
        in = streams[0];

    std::atomic<bool> stub = {false};
    copyData(*in, *block_io.out, &stub);
    for (auto & stream : streams)
        stream->as<KafkaBlockInputStream>()->commit();

    // Check whether the limits were applied during query execution
    bool limits_applied = false;
    const BlockStreamProfileInfo & info = in->getProfileInfo();
    limits_applied = info.hasAppliedLimit();

    return limits_applied;
}

void registerStorageKafka(StorageFactory & factory)
{
    factory.registerStorage("Kafka", [](const StorageFactory::Arguments & args)
    {
        ASTs & engine_args = args.engine_args;
        size_t args_count = engine_args.size();
        bool has_settings = args.storage_def->settings;

        KafkaSettings kafka_settings;
        if (has_settings)
        {
            kafka_settings.loadFromQuery(*args.storage_def);
        }

        /** The engine arguments are the following:
          * - Kafka broker list
          * - List of topics
          * - Group ID (may be a constant expression with a string result)
          * - Message format (string)
          * - Row delimiter
          * - Schema (optional, if the format supports it)
          * - Number of consumers
          * - Max block size for background consumption
          * - Number of unreadable messages to skip (at least)
          * - Whether to do intermediate commits after a batch has been consumed and handled
          */
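        // Example DDL using the SETTINGS form (broker, topic and group values are illustrative):
        //   CREATE TABLE queue (...) ENGINE = Kafka
        //   SETTINGS kafka_broker_list = 'localhost:9092', kafka_topic_list = 'topic1,topic2',
        //            kafka_group_name = 'group1', kafka_format = 'JSONEachRow';
        // The same parameters may instead be passed positionally:
        //   ENGINE = Kafka('localhost:9092', 'topic1,topic2', 'group1', 'JSONEachRow')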

        // Check arguments and settings
        #define CHECK_KAFKA_STORAGE_ARGUMENT(ARG_NUM, PAR_NAME) \
            /* One of the four required arguments is not specified */ \
            if (args_count < ARG_NUM && ARG_NUM <= 4 && \
                !kafka_settings.PAR_NAME.changed) \
            { \
                throw Exception( \
                    "Required parameter '" #PAR_NAME "' " \
                    "for storage Kafka not specified", \
                    ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH); \
            } \
            /* The same argument is given in two places */ \
            if (has_settings && \
                kafka_settings.PAR_NAME.changed && \
                args_count >= ARG_NUM) \
            { \
                throw Exception( \
                    "The argument №" #ARG_NUM " of storage Kafka " \
                    "and the parameter '" #PAR_NAME "' " \
                    "in SETTINGS cannot be specified at the same time", \
                    ErrorCodes::BAD_ARGUMENTS); \
            }

        CHECK_KAFKA_STORAGE_ARGUMENT(1, kafka_broker_list)
        CHECK_KAFKA_STORAGE_ARGUMENT(2, kafka_topic_list)
        CHECK_KAFKA_STORAGE_ARGUMENT(3, kafka_group_name)
        CHECK_KAFKA_STORAGE_ARGUMENT(4, kafka_format)
        CHECK_KAFKA_STORAGE_ARGUMENT(5, kafka_row_delimiter)
        CHECK_KAFKA_STORAGE_ARGUMENT(6, kafka_schema)
        CHECK_KAFKA_STORAGE_ARGUMENT(7, kafka_num_consumers)
        CHECK_KAFKA_STORAGE_ARGUMENT(8, kafka_max_block_size)
        CHECK_KAFKA_STORAGE_ARGUMENT(9, kafka_skip_broken_messages)
        CHECK_KAFKA_STORAGE_ARGUMENT(10, kafka_commit_every_batch)

        #undef CHECK_KAFKA_STORAGE_ARGUMENT

        // Get and check broker list
        String brokers = kafka_settings.kafka_broker_list;
        if (args_count >= 1)
        {
            const auto * ast = engine_args[0]->as<ASTLiteral>();
            if (ast && ast->value.getType() == Field::Types::String)
            {
                brokers = safeGet<String>(ast->value);
            }
            else
            {
                throw Exception(String("Kafka broker list must be a string"), ErrorCodes::BAD_ARGUMENTS);
            }
        }

        // Get and check topic list
        String topic_list = kafka_settings.kafka_topic_list.value;
        if (args_count >= 2)
        {
            engine_args[1] = evaluateConstantExpressionAsLiteral(engine_args[1], args.local_context);
            topic_list = engine_args[1]->as<ASTLiteral &>().value.safeGet<String>();
        }

        Names topics;
        boost::split(topics, topic_list, [](char c){ return c == ','; });
        for (String & topic : topics)
        {
            boost::trim(topic);
        }

        // Get and check group name
        String group = kafka_settings.kafka_group_name.value;
        if (args_count >= 3)
        {
            engine_args[2] = evaluateConstantExpressionOrIdentifierAsLiteral(engine_args[2], args.local_context);
            group = engine_args[2]->as<ASTLiteral &>().value.safeGet<String>();
        }

        // Get and check message format name
        String format = kafka_settings.kafka_format.value;
        if (args_count >= 4)
        {
            engine_args[3] = evaluateConstantExpressionOrIdentifierAsLiteral(engine_args[3], args.local_context);

            const auto * ast = engine_args[3]->as<ASTLiteral>();
            if (ast && ast->value.getType() == Field::Types::String)
            {
                format = safeGet<String>(ast->value);
            }
            else
            {
                throw Exception("Format must be a string", ErrorCodes::BAD_ARGUMENTS);
            }
        }

        // Parse row delimiter (optional)
        char row_delimiter = kafka_settings.kafka_row_delimiter;
        if (args_count >= 5)
        {
            engine_args[4] = evaluateConstantExpressionOrIdentifierAsLiteral(engine_args[4], args.local_context);

            const auto * ast = engine_args[4]->as<ASTLiteral>();
            String arg;
            if (ast && ast->value.getType() == Field::Types::String)
            {
                arg = safeGet<String>(ast->value);
            }
            else
            {
                throw Exception("Row delimiter must be a char", ErrorCodes::BAD_ARGUMENTS);
            }
            if (arg.size() > 1)
            {
                throw Exception("Row delimiter must be a char", ErrorCodes::BAD_ARGUMENTS);
            }
            else if (arg.size() == 0)
            {
                row_delimiter = '\0';
            }
            else
            {
                row_delimiter = arg[0];
            }
        }

        // Parse format schema if supported (optional)
        String schema = kafka_settings.kafka_schema.value;
        if (args_count >= 6)
        {
            engine_args[5] = evaluateConstantExpressionOrIdentifierAsLiteral(engine_args[5], args.local_context);

            const auto * ast = engine_args[5]->as<ASTLiteral>();
            if (ast && ast->value.getType() == Field::Types::String)
            {
                schema = safeGet<String>(ast->value);
            }
            else
            {
                throw Exception("Format schema must be a string", ErrorCodes::BAD_ARGUMENTS);
            }
        }

        // Parse number of consumers (optional)
        UInt64 num_consumers = kafka_settings.kafka_num_consumers;
        if (args_count >= 7)
        {
            const auto * ast = engine_args[6]->as<ASTLiteral>();
            if (ast && ast->value.getType() == Field::Types::UInt64)
            {
                num_consumers = safeGet<UInt64>(ast->value);
            }
            else
            {
                throw Exception("Number of consumers must be a positive integer", ErrorCodes::BAD_ARGUMENTS);
            }
        }

        // Parse max block size (optional)
        UInt64 max_block_size = static_cast<size_t>(kafka_settings.kafka_max_block_size);
        if (args_count >= 8)
        {
            const auto * ast = engine_args[7]->as<ASTLiteral>();
            if (ast && ast->value.getType() == Field::Types::UInt64)
            {
                max_block_size = static_cast<size_t>(safeGet<UInt64>(ast->value));
            }
            else
            {
                // TODO: no check if the integer is really positive
                throw Exception("Maximum block size must be a positive integer", ErrorCodes::BAD_ARGUMENTS);
            }
        }

        size_t skip_broken = static_cast<size_t>(kafka_settings.kafka_skip_broken_messages);
        if (args_count >= 9)
        {
            const auto * ast = engine_args[8]->as<ASTLiteral>();
            if (ast && ast->value.getType() == Field::Types::UInt64)
            {
                skip_broken = static_cast<size_t>(safeGet<UInt64>(ast->value));
            }
            else
            {
                throw Exception("Number of broken messages to skip must be a non-negative integer", ErrorCodes::BAD_ARGUMENTS);
            }
        }

        bool intermediate_commit = static_cast<bool>(kafka_settings.kafka_commit_every_batch);
        if (args_count >= 10)
        {
            const auto * ast = engine_args[9]->as<ASTLiteral>();
            if (ast && ast->value.getType() == Field::Types::UInt64)
            {
                intermediate_commit = static_cast<bool>(safeGet<UInt64>(ast->value));
            }
            else
            {
                throw Exception("Flag for committing every batch must be 0 or 1", ErrorCodes::BAD_ARGUMENTS);
            }
        }

        return StorageKafka::create(
            args.table_name, args.database_name, args.context, args.columns,
            brokers, group, topics, format, row_delimiter, schema, num_consumers, max_block_size, skip_broken, intermediate_commit);
    });
}


}