1 | #include "ExecutableDictionarySource.h" |
2 | |
3 | #include <future> |
4 | #include <thread> |
5 | #include <ext/scope_guard.h> |
6 | #include <DataStreams/IBlockOutputStream.h> |
7 | #include <DataStreams/OwningBlockInputStream.h> |
8 | #include <Interpreters/Context.h> |
9 | #include <IO/WriteHelpers.h> |
10 | #include <Common/ShellCommand.h> |
11 | #include <Common/ThreadPool.h> |
12 | #include <common/logger_useful.h> |
13 | #include <common/LocalDateTime.h> |
14 | #include "DictionarySourceFactory.h" |
15 | #include "DictionarySourceHelpers.h" |
16 | #include "DictionaryStructure.h" |
17 | #include "registerDictionaries.h" |
18 | |
19 | |
20 | namespace DB |
21 | { |
22 | static const UInt64 max_block_size = 8192; |
23 | |
24 | namespace ErrorCodes |
25 | { |
26 | extern const int DICTIONARY_ACCESS_DENIED; |
27 | } |
28 | |
29 | namespace |
30 | { |
31 | /// Owns ShellCommand and calls wait for it. |
32 | class ShellCommandOwningBlockInputStream : public OwningBlockInputStream<ShellCommand> |
33 | { |
34 | public: |
35 | ShellCommandOwningBlockInputStream(const BlockInputStreamPtr & impl, std::unique_ptr<ShellCommand> own_) |
36 | : OwningBlockInputStream(std::move(impl), std::move(own_)) |
37 | { |
38 | } |
39 | |
40 | void readSuffix() override |
41 | { |
42 | OwningBlockInputStream<ShellCommand>::readSuffix(); |
43 | own->wait(); |
44 | } |
45 | }; |
46 | |
47 | } |
48 | |
49 | |
50 | ExecutableDictionarySource::ExecutableDictionarySource( |
51 | const DictionaryStructure & dict_struct_, |
52 | const Poco::Util::AbstractConfiguration & config, |
53 | const std::string & config_prefix, |
54 | Block & sample_block_, |
55 | const Context & context_) |
56 | : log(&Logger::get("ExecutableDictionarySource" )) |
57 | , dict_struct{dict_struct_} |
58 | , command{config.getString(config_prefix + ".command" )} |
59 | , update_field{config.getString(config_prefix + ".update_field" , "" )} |
60 | , format{config.getString(config_prefix + ".format" )} |
61 | , sample_block{sample_block_} |
62 | , context(context_) |
63 | { |
64 | } |
65 | |
66 | ExecutableDictionarySource::ExecutableDictionarySource(const ExecutableDictionarySource & other) |
67 | : log(&Logger::get("ExecutableDictionarySource" )) |
68 | , update_time{other.update_time} |
69 | , dict_struct{other.dict_struct} |
70 | , command{other.command} |
71 | , update_field{other.update_field} |
72 | , format{other.format} |
73 | , sample_block{other.sample_block} |
74 | , context(other.context) |
75 | { |
76 | } |
77 | |
78 | BlockInputStreamPtr ExecutableDictionarySource::loadAll() |
79 | { |
80 | LOG_TRACE(log, "loadAll " + toString()); |
81 | auto process = ShellCommand::execute(command); |
82 | auto input_stream = context.getInputFormat(format, process->out, sample_block, max_block_size); |
83 | return std::make_shared<ShellCommandOwningBlockInputStream>(input_stream, std::move(process)); |
84 | } |
85 | |
86 | BlockInputStreamPtr ExecutableDictionarySource::loadUpdatedAll() |
87 | { |
88 | time_t new_update_time = time(nullptr); |
89 | SCOPE_EXIT(update_time = new_update_time); |
90 | |
91 | std::string command_with_update_field = command; |
92 | if (update_time) |
93 | command_with_update_field += " " + update_field + " " + DB::toString(LocalDateTime(update_time - 1)); |
94 | |
95 | LOG_TRACE(log, "loadUpdatedAll " + command_with_update_field); |
96 | auto process = ShellCommand::execute(command_with_update_field); |
97 | auto input_stream = context.getInputFormat(format, process->out, sample_block, max_block_size); |
98 | return std::make_shared<ShellCommandOwningBlockInputStream>(input_stream, std::move(process)); |
99 | } |
100 | |
101 | namespace |
102 | { |
103 | /** A stream, that also runs and waits for background thread |
104 | * (that will feed data into pipe to be read from the other side of the pipe). |
105 | */ |
106 | class BlockInputStreamWithBackgroundThread final : public IBlockInputStream |
107 | { |
108 | public: |
109 | BlockInputStreamWithBackgroundThread( |
110 | const BlockInputStreamPtr & stream_, std::unique_ptr<ShellCommand> && command_, std::packaged_task<void()> && task_) |
111 | : stream{stream_}, command{std::move(command_)}, task(std::move(task_)), thread([this] { |
112 | task(); |
113 | command->in.close(); |
114 | }) |
115 | { |
116 | children.push_back(stream); |
117 | } |
118 | |
119 | ~BlockInputStreamWithBackgroundThread() override |
120 | { |
121 | if (thread.joinable()) |
122 | { |
123 | try |
124 | { |
125 | readSuffix(); |
126 | } |
127 | catch (...) |
128 | { |
129 | tryLogCurrentException(__PRETTY_FUNCTION__); |
130 | } |
131 | } |
132 | } |
133 | |
134 | Block () const override { return stream->getHeader(); } |
135 | |
136 | private: |
137 | Block readImpl() override { return stream->read(); } |
138 | |
139 | void readSuffix() override |
140 | { |
141 | IBlockInputStream::readSuffix(); |
142 | if (!wait_called) |
143 | { |
144 | wait_called = true; |
145 | command->wait(); |
146 | } |
147 | thread.join(); |
148 | /// To rethrow an exception, if any. |
149 | task.get_future().get(); |
150 | } |
151 | |
152 | String getName() const override { return "WithBackgroundThread" ; } |
153 | |
154 | BlockInputStreamPtr stream; |
155 | std::unique_ptr<ShellCommand> command; |
156 | std::packaged_task<void()> task; |
157 | ThreadFromGlobalPool thread; |
158 | bool wait_called = false; |
159 | }; |
160 | |
161 | } |
162 | |
163 | |
164 | BlockInputStreamPtr ExecutableDictionarySource::loadIds(const std::vector<UInt64> & ids) |
165 | { |
166 | LOG_TRACE(log, "loadIds " << toString() << " size = " << ids.size()); |
167 | auto process = ShellCommand::execute(command); |
168 | |
169 | auto output_stream = context.getOutputFormat(format, process->in, sample_block); |
170 | auto input_stream = context.getInputFormat(format, process->out, sample_block, max_block_size); |
171 | |
172 | return std::make_shared<BlockInputStreamWithBackgroundThread>( |
173 | input_stream, std::move(process), std::packaged_task<void()>([output_stream, &ids]() mutable { formatIDs(output_stream, ids); })); |
174 | } |
175 | |
176 | BlockInputStreamPtr ExecutableDictionarySource::loadKeys(const Columns & key_columns, const std::vector<size_t> & requested_rows) |
177 | { |
178 | LOG_TRACE(log, "loadKeys " << toString() << " size = " << requested_rows.size()); |
179 | auto process = ShellCommand::execute(command); |
180 | |
181 | auto output_stream = context.getOutputFormat(format, process->in, sample_block); |
182 | auto input_stream = context.getInputFormat(format, process->out, sample_block, max_block_size); |
183 | |
184 | return std::make_shared<BlockInputStreamWithBackgroundThread>( |
185 | input_stream, std::move(process), std::packaged_task<void()>([output_stream, key_columns, &requested_rows, this]() mutable |
186 | { |
187 | formatKeys(dict_struct, output_stream, key_columns, requested_rows); |
188 | })); |
189 | } |
190 | |
191 | bool ExecutableDictionarySource::isModified() const |
192 | { |
193 | return true; |
194 | } |
195 | |
196 | bool ExecutableDictionarySource::supportsSelectiveLoad() const |
197 | { |
198 | return true; |
199 | } |
200 | |
201 | bool ExecutableDictionarySource::hasUpdateField() const |
202 | { |
203 | if (update_field.empty()) |
204 | return false; |
205 | else |
206 | return true; |
207 | } |
208 | |
209 | DictionarySourcePtr ExecutableDictionarySource::clone() const |
210 | { |
211 | return std::make_unique<ExecutableDictionarySource>(*this); |
212 | } |
213 | |
214 | std::string ExecutableDictionarySource::toString() const |
215 | { |
216 | return "Executable: " + command; |
217 | } |
218 | |
219 | void registerDictionarySourceExecutable(DictionarySourceFactory & factory) |
220 | { |
221 | auto createTableSource = [=](const DictionaryStructure & dict_struct, |
222 | const Poco::Util::AbstractConfiguration & config, |
223 | const std::string & config_prefix, |
224 | Block & sample_block, |
225 | const Context & context, |
226 | bool check_config) -> DictionarySourcePtr |
227 | { |
228 | if (dict_struct.has_expressions) |
229 | throw Exception{"Dictionary source of type `executable` does not support attribute expressions" , ErrorCodes::LOGICAL_ERROR}; |
230 | |
231 | /// Executable dictionaries may execute arbitrary commands. |
232 | /// It's OK for dictionaries created by administrator from xml-file, but |
233 | /// maybe dangerous for dictionaries created from DDL-queries. |
234 | if (check_config) |
235 | throw Exception("Dictionaries with Executable dictionary source is not allowed" , ErrorCodes::DICTIONARY_ACCESS_DENIED); |
236 | |
237 | return std::make_unique<ExecutableDictionarySource>( |
238 | dict_struct, config, config_prefix + ".executable" , |
239 | sample_block, context); |
240 | }; |
241 | factory.registerSource("executable" , createTableSource); |
242 | } |
243 | |
244 | } |
245 | |