1#include "ExecutableDictionarySource.h"
2
3#include <future>
4#include <thread>
5#include <ext/scope_guard.h>
6#include <DataStreams/IBlockOutputStream.h>
7#include <DataStreams/OwningBlockInputStream.h>
8#include <Interpreters/Context.h>
9#include <IO/WriteHelpers.h>
10#include <Common/ShellCommand.h>
11#include <Common/ThreadPool.h>
12#include <common/logger_useful.h>
13#include <common/LocalDateTime.h>
14#include "DictionarySourceFactory.h"
15#include "DictionarySourceHelpers.h"
16#include "DictionaryStructure.h"
17#include "registerDictionaries.h"
18
19
20namespace DB
21{
22static const UInt64 max_block_size = 8192;
23
namespace ErrorCodes
{
    /// Declarations of every error code referenced in this file.
    /// The definitions are not in this file; these are only `extern` declarations.
    extern const int LOGICAL_ERROR;
    extern const int DICTIONARY_ACCESS_DENIED;
}
28
29namespace
30{
31 /// Owns ShellCommand and calls wait for it.
32 class ShellCommandOwningBlockInputStream : public OwningBlockInputStream<ShellCommand>
33 {
34 public:
35 ShellCommandOwningBlockInputStream(const BlockInputStreamPtr & impl, std::unique_ptr<ShellCommand> own_)
36 : OwningBlockInputStream(std::move(impl), std::move(own_))
37 {
38 }
39
40 void readSuffix() override
41 {
42 OwningBlockInputStream<ShellCommand>::readSuffix();
43 own->wait();
44 }
45 };
46
47}
48
49
50ExecutableDictionarySource::ExecutableDictionarySource(
51 const DictionaryStructure & dict_struct_,
52 const Poco::Util::AbstractConfiguration & config,
53 const std::string & config_prefix,
54 Block & sample_block_,
55 const Context & context_)
56 : log(&Logger::get("ExecutableDictionarySource"))
57 , dict_struct{dict_struct_}
58 , command{config.getString(config_prefix + ".command")}
59 , update_field{config.getString(config_prefix + ".update_field", "")}
60 , format{config.getString(config_prefix + ".format")}
61 , sample_block{sample_block_}
62 , context(context_)
63{
64}
65
66ExecutableDictionarySource::ExecutableDictionarySource(const ExecutableDictionarySource & other)
67 : log(&Logger::get("ExecutableDictionarySource"))
68 , update_time{other.update_time}
69 , dict_struct{other.dict_struct}
70 , command{other.command}
71 , update_field{other.update_field}
72 , format{other.format}
73 , sample_block{other.sample_block}
74 , context(other.context)
75{
76}
77
78BlockInputStreamPtr ExecutableDictionarySource::loadAll()
79{
80 LOG_TRACE(log, "loadAll " + toString());
81 auto process = ShellCommand::execute(command);
82 auto input_stream = context.getInputFormat(format, process->out, sample_block, max_block_size);
83 return std::make_shared<ShellCommandOwningBlockInputStream>(input_stream, std::move(process));
84}
85
86BlockInputStreamPtr ExecutableDictionarySource::loadUpdatedAll()
87{
88 time_t new_update_time = time(nullptr);
89 SCOPE_EXIT(update_time = new_update_time);
90
91 std::string command_with_update_field = command;
92 if (update_time)
93 command_with_update_field += " " + update_field + " " + DB::toString(LocalDateTime(update_time - 1));
94
95 LOG_TRACE(log, "loadUpdatedAll " + command_with_update_field);
96 auto process = ShellCommand::execute(command_with_update_field);
97 auto input_stream = context.getInputFormat(format, process->out, sample_block, max_block_size);
98 return std::make_shared<ShellCommandOwningBlockInputStream>(input_stream, std::move(process));
99}
100
101namespace
102{
103 /** A stream, that also runs and waits for background thread
104 * (that will feed data into pipe to be read from the other side of the pipe).
105 */
106 class BlockInputStreamWithBackgroundThread final : public IBlockInputStream
107 {
108 public:
109 BlockInputStreamWithBackgroundThread(
110 const BlockInputStreamPtr & stream_, std::unique_ptr<ShellCommand> && command_, std::packaged_task<void()> && task_)
111 : stream{stream_}, command{std::move(command_)}, task(std::move(task_)), thread([this] {
112 task();
113 command->in.close();
114 })
115 {
116 children.push_back(stream);
117 }
118
119 ~BlockInputStreamWithBackgroundThread() override
120 {
121 if (thread.joinable())
122 {
123 try
124 {
125 readSuffix();
126 }
127 catch (...)
128 {
129 tryLogCurrentException(__PRETTY_FUNCTION__);
130 }
131 }
132 }
133
134 Block getHeader() const override { return stream->getHeader(); }
135
136 private:
137 Block readImpl() override { return stream->read(); }
138
139 void readSuffix() override
140 {
141 IBlockInputStream::readSuffix();
142 if (!wait_called)
143 {
144 wait_called = true;
145 command->wait();
146 }
147 thread.join();
148 /// To rethrow an exception, if any.
149 task.get_future().get();
150 }
151
152 String getName() const override { return "WithBackgroundThread"; }
153
154 BlockInputStreamPtr stream;
155 std::unique_ptr<ShellCommand> command;
156 std::packaged_task<void()> task;
157 ThreadFromGlobalPool thread;
158 bool wait_called = false;
159 };
160
161}
162
163
164BlockInputStreamPtr ExecutableDictionarySource::loadIds(const std::vector<UInt64> & ids)
165{
166 LOG_TRACE(log, "loadIds " << toString() << " size = " << ids.size());
167 auto process = ShellCommand::execute(command);
168
169 auto output_stream = context.getOutputFormat(format, process->in, sample_block);
170 auto input_stream = context.getInputFormat(format, process->out, sample_block, max_block_size);
171
172 return std::make_shared<BlockInputStreamWithBackgroundThread>(
173 input_stream, std::move(process), std::packaged_task<void()>([output_stream, &ids]() mutable { formatIDs(output_stream, ids); }));
174}
175
176BlockInputStreamPtr ExecutableDictionarySource::loadKeys(const Columns & key_columns, const std::vector<size_t> & requested_rows)
177{
178 LOG_TRACE(log, "loadKeys " << toString() << " size = " << requested_rows.size());
179 auto process = ShellCommand::execute(command);
180
181 auto output_stream = context.getOutputFormat(format, process->in, sample_block);
182 auto input_stream = context.getInputFormat(format, process->out, sample_block, max_block_size);
183
184 return std::make_shared<BlockInputStreamWithBackgroundThread>(
185 input_stream, std::move(process), std::packaged_task<void()>([output_stream, key_columns, &requested_rows, this]() mutable
186 {
187 formatKeys(dict_struct, output_stream, key_columns, requested_rows);
188 }));
189}
190
191bool ExecutableDictionarySource::isModified() const
192{
193 return true;
194}
195
196bool ExecutableDictionarySource::supportsSelectiveLoad() const
197{
198 return true;
199}
200
201bool ExecutableDictionarySource::hasUpdateField() const
202{
203 if (update_field.empty())
204 return false;
205 else
206 return true;
207}
208
209DictionarySourcePtr ExecutableDictionarySource::clone() const
210{
211 return std::make_unique<ExecutableDictionarySource>(*this);
212}
213
214std::string ExecutableDictionarySource::toString() const
215{
216 return "Executable: " + command;
217}
218
219void registerDictionarySourceExecutable(DictionarySourceFactory & factory)
220{
221 auto createTableSource = [=](const DictionaryStructure & dict_struct,
222 const Poco::Util::AbstractConfiguration & config,
223 const std::string & config_prefix,
224 Block & sample_block,
225 const Context & context,
226 bool check_config) -> DictionarySourcePtr
227 {
228 if (dict_struct.has_expressions)
229 throw Exception{"Dictionary source of type `executable` does not support attribute expressions", ErrorCodes::LOGICAL_ERROR};
230
231 /// Executable dictionaries may execute arbitrary commands.
232 /// It's OK for dictionaries created by administrator from xml-file, but
233 /// maybe dangerous for dictionaries created from DDL-queries.
234 if (check_config)
235 throw Exception("Dictionaries with Executable dictionary source is not allowed", ErrorCodes::DICTIONARY_ACCESS_DENIED);
236
237 return std::make_unique<ExecutableDictionarySource>(
238 dict_struct, config, config_prefix + ".executable",
239 sample_block, context);
240 };
241 factory.registerSource("executable", createTableSource);
242}
243
244}
245