1#pragma once
2
3#include <DataStreams/IBlockInputStream.h>
4
5
6namespace DB
7{
8
9/** Initialize another source on the first `read` call, and then use it.
10 * This is needed, for example, to read from a table that will be populated
11 * after creation of LazyBlockInputStream object, but before the first `read` call.
12 */
13class LazyBlockInputStream : public IBlockInputStream
14{
15public:
16 using Generator = std::function<BlockInputStreamPtr()>;
17
18 LazyBlockInputStream(const Block & header_, Generator generator_)
19 : header(header_), generator(std::move(generator_))
20 {
21 }
22
23 LazyBlockInputStream(const char * name_, const Block & header_, Generator generator_)
24 : name(name_), header(header_), generator(std::move(generator_))
25 {
26 }
27
28 String getName() const override { return name; }
29
30 Block getHeader() const override
31 {
32 return header;
33 }
34
35 /// We call readPrefix lazily. Suppress default behaviour.
36 void readPrefix() override {}
37
38protected:
39 Block readImpl() override
40 {
41 if (!input)
42 {
43 input = generator();
44
45 if (!input)
46 return Block();
47
48 auto * p_input = dynamic_cast<IBlockInputStream *>(input.get());
49
50 if (p_input)
51 {
52 /// They could have been set before, but were not passed into the `input`.
53 if (progress_callback)
54 p_input->setProgressCallback(progress_callback);
55 if (process_list_elem)
56 p_input->setProcessListElement(process_list_elem);
57 }
58
59 input->readPrefix();
60
61 {
62 addChild(input);
63
64 if (isCancelled() && p_input)
65 p_input->cancel(is_killed);
66 }
67 }
68
69 return input->read();
70 }
71
72private:
73 const char * name = "Lazy";
74 Block header;
75 Generator generator;
76
77 BlockInputStreamPtr input;
78};
79
80}
81