1#pragma once
2
3#include <Common/ClickHouseRevision.h>
4#include <DataStreams/IBlockInputStream.h>
5#include <DataStreams/NativeBlockInputStream.h>
6#include <DataStreams/NativeBlockOutputStream.h>
7#include <DataStreams/copyData.h>
8#include <Compression/CompressedReadBuffer.h>
9#include <Compression/CompressedWriteBuffer.h>
10#include <IO/ReadBufferFromFile.h>
11#include <IO/WriteBufferFromFile.h>
12
13namespace DB
14{
15
16/// To read the data that was flushed into the temporary data file.
17struct TemporaryFileStream
18{
19 ReadBufferFromFile file_in;
20 CompressedReadBuffer compressed_in;
21 BlockInputStreamPtr block_in;
22
23 TemporaryFileStream(const std::string & path)
24 : file_in(path)
25 , compressed_in(file_in)
26 , block_in(std::make_shared<NativeBlockInputStream>(compressed_in, ClickHouseRevision::get()))
27 {}
28
29 TemporaryFileStream(const std::string & path, const Block & header_)
30 : file_in(path)
31 , compressed_in(file_in)
32 , block_in(std::make_shared<NativeBlockInputStream>(compressed_in, header_, 0))
33 {}
34
35 /// Flush data from input stream into file for future reading
36 static void write(const std::string & path, const Block & header, IBlockInputStream & input, std::atomic<bool> * is_cancelled = nullptr)
37 {
38 WriteBufferFromFile file_buf(path);
39 CompressedWriteBuffer compressed_buf(file_buf);
40 NativeBlockOutputStream output(compressed_buf, 0, header);
41 copyData(input, output, is_cancelled);
42 }
43};
44
45class TemporaryFileLazyInputStream : public IBlockInputStream
46{
47public:
48 TemporaryFileLazyInputStream(const std::string & path_, const Block & header_)
49 : path(path_)
50 , header(header_)
51 , done(false)
52 {}
53
54 String getName() const override { return "TemporaryFile"; }
55 Block getHeader() const override { return header; }
56 void readSuffix() override {}
57
58protected:
59 Block readImpl() override
60 {
61 if (!done)
62 {
63 done = true;
64 TemporaryFileStream stream(path, header);
65 return stream.block_in->read();
66 }
67 return {};
68 }
69
70private:
71 const std::string path;
72 Block header;
73 bool done;
74};
75
76}
77