| 1 | #pragma once |
| 2 | |
| 3 | #include <list> |
| 4 | #include <queue> |
| 5 | #include <atomic> |
| 6 | #include <thread> |
| 7 | #include <mutex> |
| 8 | |
| 9 | #include <common/logger_useful.h> |
| 10 | |
| 11 | #include <DataStreams/IBlockStream_fwd.h> |
| 12 | #include <Common/setThreadName.h> |
| 13 | #include <Common/CurrentMetrics.h> |
| 14 | #include <Common/CurrentThread.h> |
| 15 | #include <Common/ThreadPool.h> |
| 16 | |
| 17 | |
/** Allows processing multiple block input streams (sources) in parallel, using the specified number of threads.
  * Reads (pulls) blocks from any available source and passes them to the specified handler.
  *
  * Before any reading, calls the "readPrefix" method of the sources in parallel.
  *
  * (For example, "readPrefix" can prepare connections to remote servers,
  *  and we want this work to be executed in parallel for different sources.)
  *
  * Implemented as follows:
  * - there are multiple input sources to read blocks from;
  * - there are multiple threads that can simultaneously read blocks from different sources;
  * - "available" sources (those not being read by any thread right now) are kept in a queue of sources;
  * - when a thread takes a source to read from, it removes the source from the queue, reads a block from it
  *   and then, unless the source is exhausted, puts it back into the queue of available sources.
  */
| 33 | |
| 34 | namespace CurrentMetrics |
| 35 | { |
| 36 | extern const Metric QueryThread; |
| 37 | } |
| 38 | |
| 39 | namespace DB |
| 40 | { |
| 41 | |
| 42 | /// Example of the handler. |
| 43 | struct ParallelInputsHandler |
| 44 | { |
| 45 | /// Processing the data block. |
| 46 | void onBlock(Block & /*block*/, size_t /*thread_num*/) {} |
| 47 | |
| 48 | /// Called for each thread, when the thread has nothing else to do. |
| 49 | /// Due to the fact that part of the sources has run out, and now there are fewer sources left than streams. |
| 50 | /// Called if the `onException` method does not throw an exception; is called before the `onFinish` method. |
| 51 | void onFinishThread(size_t /*thread_num*/) {} |
| 52 | |
| 53 | /// Blocks are over. Due to the fact that all sources ran out or because of the cancellation of work. |
| 54 | /// This method is always called exactly once, at the end of the work, if the `onException` method does not throw an exception. |
| 55 | void onFinish() {} |
| 56 | |
| 57 | /// Exception handling. It is reasonable to call the ParallelInputsProcessor::cancel method in this method, and also pass the exception to the main thread. |
| 58 | void onException(std::exception_ptr & /*exception*/, size_t /*thread_num*/) {} |
| 59 | }; |
| 60 | |
| 61 | |
| 62 | template <typename Handler> |
| 63 | class ParallelInputsProcessor |
| 64 | { |
| 65 | public: |
| 66 | /** additional_input_at_end - if not nullptr, |
| 67 | * then the blocks from this source will start to be processed only after all other sources are processed. |
| 68 | * This is done in the main thread. |
| 69 | * |
| 70 | * Intended for implementation of FULL and RIGHT JOIN |
| 71 | * - where you must first make JOIN in parallel, while noting which keys are not found, |
| 72 | * and only after the completion of this work, create blocks of keys that are not found. |
| 73 | */ |
| 74 | ParallelInputsProcessor(const BlockInputStreams & inputs_, const BlockInputStreamPtr & additional_input_at_end_, size_t max_threads_, Handler & handler_) |
| 75 | : inputs(inputs_), additional_input_at_end(additional_input_at_end_), max_threads(std::min(inputs_.size(), max_threads_)), handler(handler_) |
| 76 | { |
| 77 | for (size_t i = 0; i < inputs_.size(); ++i) |
| 78 | unprepared_inputs.emplace(inputs_[i], i); |
| 79 | } |
| 80 | |
| 81 | ~ParallelInputsProcessor() |
| 82 | { |
| 83 | try |
| 84 | { |
| 85 | wait(); |
| 86 | } |
| 87 | catch (...) |
| 88 | { |
| 89 | tryLogCurrentException(__PRETTY_FUNCTION__); |
| 90 | } |
| 91 | } |
| 92 | |
| 93 | /// Start background threads, start work. |
| 94 | void process() |
| 95 | { |
| 96 | active_threads = max_threads; |
| 97 | threads.reserve(max_threads); |
| 98 | |
| 99 | try |
| 100 | { |
| 101 | for (size_t i = 0; i < max_threads; ++i) |
| 102 | threads.emplace_back(&ParallelInputsProcessor::thread, this, CurrentThread::getGroup(), i); |
| 103 | } |
| 104 | catch (...) |
| 105 | { |
| 106 | cancel(false); |
| 107 | wait(); |
| 108 | if (active_threads) |
| 109 | { |
| 110 | active_threads = 0; |
| 111 | /// handler.onFinish() is supposed to be called from one of the threads when the number of |
| 112 | /// finished threads reaches max_threads. But since we weren't able to launch all threads, |
| 113 | /// we have to call onFinish() manually here. |
| 114 | handler.onFinish(); |
| 115 | } |
| 116 | throw; |
| 117 | } |
| 118 | } |
| 119 | |
| 120 | /// Ask all sources to stop earlier than they run out. |
| 121 | void cancel(bool kill) |
| 122 | { |
| 123 | finish = true; |
| 124 | |
| 125 | for (auto & input : inputs) |
| 126 | { |
| 127 | try |
| 128 | { |
| 129 | input->cancel(kill); |
| 130 | } |
| 131 | catch (...) |
| 132 | { |
| 133 | /** If you can not ask one or more sources to stop. |
| 134 | * (for example, the connection is broken for distributed query processing) |
| 135 | * - then do not care. |
| 136 | */ |
| 137 | LOG_ERROR(log, "Exception while cancelling " << input->getName()); |
| 138 | } |
| 139 | } |
| 140 | } |
| 141 | |
| 142 | /// Wait until all threads are finished, before the destructor. |
| 143 | void wait() |
| 144 | { |
| 145 | if (joined_threads) |
| 146 | return; |
| 147 | |
| 148 | for (auto & thread : threads) |
| 149 | thread.join(); |
| 150 | |
| 151 | threads.clear(); |
| 152 | joined_threads = true; |
| 153 | } |
| 154 | |
| 155 | size_t getNumActiveThreads() const |
| 156 | { |
| 157 | return active_threads; |
| 158 | } |
| 159 | |
| 160 | private: |
| 161 | /// Single source data |
| 162 | struct InputData |
| 163 | { |
| 164 | BlockInputStreamPtr in; |
| 165 | size_t i = 0; /// The source number (for debugging). |
| 166 | |
| 167 | InputData() {} |
| 168 | InputData(const BlockInputStreamPtr & in_, size_t i_) : in(in_), i(i_) {} |
| 169 | }; |
| 170 | |
| 171 | void publishPayload(Block & block, size_t thread_num) |
| 172 | { |
| 173 | handler.onBlock(block, thread_num); |
| 174 | } |
| 175 | |
| 176 | void thread(ThreadGroupStatusPtr thread_group, size_t thread_num) |
| 177 | { |
| 178 | std::exception_ptr exception; |
| 179 | CurrentMetrics::Increment metric_increment{CurrentMetrics::QueryThread}; |
| 180 | |
| 181 | try |
| 182 | { |
| 183 | setThreadName("ParalInputsProc" ); |
| 184 | if (thread_group) |
| 185 | CurrentThread::attachTo(thread_group); |
| 186 | |
| 187 | while (!finish) |
| 188 | { |
| 189 | InputData unprepared_input; |
| 190 | { |
| 191 | std::lock_guard lock(unprepared_inputs_mutex); |
| 192 | |
| 193 | if (unprepared_inputs.empty()) |
| 194 | break; |
| 195 | |
| 196 | unprepared_input = unprepared_inputs.front(); |
| 197 | unprepared_inputs.pop(); |
| 198 | } |
| 199 | |
| 200 | unprepared_input.in->readPrefix(); |
| 201 | |
| 202 | { |
| 203 | std::lock_guard lock(available_inputs_mutex); |
| 204 | available_inputs.push(unprepared_input); |
| 205 | } |
| 206 | } |
| 207 | |
| 208 | loop(thread_num); |
| 209 | } |
| 210 | catch (...) |
| 211 | { |
| 212 | exception = std::current_exception(); |
| 213 | } |
| 214 | |
| 215 | if (exception) |
| 216 | { |
| 217 | handler.onException(exception, thread_num); |
| 218 | } |
| 219 | |
| 220 | handler.onFinishThread(thread_num); |
| 221 | |
| 222 | /// The last thread on the output indicates that there is no more data. |
| 223 | if (0 == --active_threads) |
| 224 | { |
| 225 | /// And then it processes an additional source, if there is one. |
| 226 | if (additional_input_at_end) |
| 227 | { |
| 228 | try |
| 229 | { |
| 230 | additional_input_at_end->readPrefix(); |
| 231 | while (Block block = additional_input_at_end->read()) |
| 232 | publishPayload(block, thread_num); |
| 233 | } |
| 234 | catch (...) |
| 235 | { |
| 236 | exception = std::current_exception(); |
| 237 | } |
| 238 | |
| 239 | if (exception) |
| 240 | { |
| 241 | handler.onException(exception, thread_num); |
| 242 | } |
| 243 | } |
| 244 | |
| 245 | handler.onFinish(); /// TODO If in `onFinish` or `onFinishThread` there is an exception, then std::terminate is called. |
| 246 | } |
| 247 | } |
| 248 | |
| 249 | void loop(size_t thread_num) |
| 250 | { |
| 251 | while (!finish) /// You may need to stop work earlier than all sources run out. |
| 252 | { |
| 253 | InputData input; |
| 254 | |
| 255 | /// Select the next source. |
| 256 | { |
| 257 | std::lock_guard lock(available_inputs_mutex); |
| 258 | |
| 259 | /// If there are no free sources, then this thread is no longer needed. (But other threads can work with their sources.) |
| 260 | if (available_inputs.empty()) |
| 261 | break; |
| 262 | |
| 263 | input = available_inputs.front(); |
| 264 | |
| 265 | /// We remove the source from the queue of available sources. |
| 266 | available_inputs.pop(); |
| 267 | } |
| 268 | |
| 269 | /// The main work. |
| 270 | Block block = input.in->read(); |
| 271 | |
| 272 | { |
| 273 | if (finish) |
| 274 | break; |
| 275 | |
| 276 | /// If this source is not run out yet, then put the resulting block in the ready queue. |
| 277 | { |
| 278 | std::lock_guard lock(available_inputs_mutex); |
| 279 | |
| 280 | if (block) |
| 281 | { |
| 282 | available_inputs.push(input); |
| 283 | } |
| 284 | else |
| 285 | { |
| 286 | if (available_inputs.empty()) |
| 287 | break; |
| 288 | } |
| 289 | } |
| 290 | |
| 291 | if (finish) |
| 292 | break; |
| 293 | |
| 294 | if (block) |
| 295 | publishPayload(block, thread_num); |
| 296 | } |
| 297 | } |
| 298 | } |
| 299 | |
| 300 | BlockInputStreams inputs; |
| 301 | BlockInputStreamPtr additional_input_at_end; |
| 302 | unsigned max_threads; |
| 303 | |
| 304 | Handler & handler; |
| 305 | |
| 306 | /// Threads. |
| 307 | using ThreadsData = std::vector<ThreadFromGlobalPool>; |
| 308 | ThreadsData threads; |
| 309 | |
| 310 | /** A set of available sources that are not currently processed by any thread. |
| 311 | * Each thread takes one source from this set, takes a block out of the source (at this moment the source does the calculations) |
| 312 | * and (if the source is not run out), puts it back into the set of available sources. |
| 313 | * |
| 314 | * The question arises what is better to use: |
| 315 | * - the queue (just processed source will be processed the next time later than the rest) |
| 316 | * - stack (just processed source will be processed as soon as possible). |
| 317 | * |
| 318 | * The stack is better than the queue when you need to do work on reading one source more consequentially, |
| 319 | * and theoretically, this allows you to achieve more consequent/consistent reads from the disk. |
| 320 | * |
| 321 | * But when using the stack, there is a problem with distributed query processing: |
| 322 | * data is read only from a part of the servers, and on the other servers |
| 323 | * a timeout occurs during send, and the request processing ends with an exception. |
| 324 | * |
| 325 | * Therefore, a queue is used. This can be improved in the future. |
| 326 | */ |
| 327 | using AvailableInputs = std::queue<InputData>; |
| 328 | AvailableInputs available_inputs; |
| 329 | |
| 330 | /** For parallel preparing (readPrefix) child streams. |
| 331 | * First, streams are located here. |
| 332 | * After a stream was prepared, it is moved to "available_inputs" for reading. |
| 333 | */ |
| 334 | using UnpreparedInputs = std::queue<InputData>; |
| 335 | UnpreparedInputs unprepared_inputs; |
| 336 | |
| 337 | /// For operations with available_inputs. |
| 338 | std::mutex available_inputs_mutex; |
| 339 | |
| 340 | /// For operations with unprepared_inputs. |
| 341 | std::mutex unprepared_inputs_mutex; |
| 342 | |
| 343 | /// How many sources ran out. |
| 344 | std::atomic<size_t> active_threads { 0 }; |
| 345 | /// Finish the threads work (before the sources run out). |
| 346 | std::atomic<bool> finish { false }; |
| 347 | /// Wait for the completion of all threads. |
| 348 | std::atomic<bool> joined_threads { false }; |
| 349 | |
| 350 | Logger * log = &Logger::get("ParallelInputsProcessor" ); |
| 351 | }; |
| 352 | |
| 353 | |
| 354 | } |
| 355 | |