1 | #pragma once |
2 | |
3 | #include <Core/BackgroundSchedulePool.h> |
4 | #include <Storages/IStorage.h> |
5 | #include <Storages/Kafka/Buffer_fwd.h> |
6 | |
7 | #include <Poco/Semaphore.h> |
8 | #include <ext/shared_ptr_helper.h> |
9 | |
10 | #include <mutex> |
11 | #include <atomic> |
12 | |
/// Forward declaration only: within this header cppkafka::Configuration is used
/// solely by reference (see StorageKafka::updateConfiguration), so the complete
/// type is not required here.
namespace cppkafka
{
class Configuration;
}
19 | |
20 | namespace DB |
21 | { |
22 | |
23 | /** Implements a Kafka queue table engine that can be used as a persistent queue / buffer, |
24 | * or as a basic building block for creating pipelines with a continuous insertion / ETL. |
25 | */ |
26 | class StorageKafka : public ext::shared_ptr_helper<StorageKafka>, public IStorage |
27 | { |
28 | friend struct ext::shared_ptr_helper<StorageKafka>; |
29 | public: |
30 | std::string getName() const override { return "Kafka" ; } |
31 | std::string getTableName() const override { return table_name; } |
32 | std::string getDatabaseName() const override { return database_name; } |
33 | |
34 | bool supportsSettings() const override { return true; } |
35 | bool noPushingToViews() const override { return true; } |
36 | |
37 | void startup() override; |
38 | void shutdown() override; |
39 | |
40 | BlockInputStreams read( |
41 | const Names & column_names, |
42 | const SelectQueryInfo & query_info, |
43 | const Context & context, |
44 | QueryProcessingStage::Enum processed_stage, |
45 | size_t max_block_size, |
46 | unsigned num_streams) override; |
47 | |
48 | BlockOutputStreamPtr write( |
49 | const ASTPtr & query, |
50 | const Context & context) override; |
51 | |
52 | void rename(const String & /* new_path_to_db */, const String & new_database_name, const String & new_table_name, TableStructureWriteLockHolder &) override; |
53 | |
54 | void updateDependencies() override; |
55 | |
56 | void pushReadBuffer(ConsumerBufferPtr buf); |
57 | ConsumerBufferPtr popReadBuffer(); |
58 | ConsumerBufferPtr popReadBuffer(std::chrono::milliseconds timeout); |
59 | |
60 | ProducerBufferPtr createWriteBuffer(); |
61 | |
62 | const auto & getTopics() const { return topics; } |
63 | const auto & getFormatName() const { return format_name; } |
64 | const auto & getSchemaName() const { return schema_name; } |
65 | const auto & skipBroken() const { return skip_broken; } |
66 | |
67 | protected: |
68 | StorageKafka( |
69 | const std::string & table_name_, |
70 | const std::string & database_name_, |
71 | Context & context_, |
72 | const ColumnsDescription & columns_, |
73 | const String & brokers_, const String & group_, const Names & topics_, |
74 | const String & format_name_, char row_delimiter_, const String & schema_name_, |
75 | size_t num_consumers_, UInt64 max_block_size_, size_t skip_broken, |
76 | bool intermediate_commit_); |
77 | |
78 | private: |
79 | // Configuration and state |
80 | String table_name; |
81 | String database_name; |
82 | Context global_context; |
83 | Context kafka_context; |
84 | Names topics; |
85 | const String brokers; |
86 | const String group; |
87 | const String format_name; |
88 | char row_delimiter; /// optional row delimiter for generating char delimited stream in order to make various input stream parsers happy. |
89 | const String schema_name; |
90 | size_t num_consumers; /// total number of consumers |
91 | UInt64 max_block_size; /// maximum block size for insertion into this table |
92 | |
93 | /// Can differ from num_consumers in case of exception in startup() (or if startup() hasn't been called). |
94 | /// In this case we still need to be able to shutdown() properly. |
95 | size_t num_created_consumers = 0; /// number of actually created consumers. |
96 | |
97 | Poco::Logger * log; |
98 | |
99 | // Consumer list |
100 | Poco::Semaphore semaphore; |
101 | std::mutex mutex; |
102 | std::vector<ConsumerBufferPtr> buffers; /// available buffers for Kafka consumers |
103 | |
104 | size_t skip_broken; |
105 | |
106 | bool intermediate_commit; |
107 | |
108 | // Stream thread |
109 | BackgroundSchedulePool::TaskHolder task; |
110 | std::atomic<bool> stream_cancelled{false}; |
111 | |
112 | ConsumerBufferPtr createReadBuffer(); |
113 | |
114 | // Update Kafka configuration with values from CH user configuration. |
115 | void updateConfiguration(cppkafka::Configuration & conf); |
116 | |
117 | void threadFunc(); |
118 | bool streamToViews(); |
119 | bool checkDependencies(const String & database_name, const String & table_name); |
120 | }; |
121 | |
122 | } |
123 | |