1#include <Common/config.h>
2
3#if USE_HDFS
4
5#include <IO/WriteBufferFromHDFS.h>
6#include <IO/HDFSCommon.h>
7#include <hdfs/hdfs.h>
8
9
10namespace DB
11{
12
13namespace ErrorCodes
14{
15extern const int NETWORK_ERROR;
16extern const int CANNOT_OPEN_FILE;
17extern const int CANNOT_FSYNC;
18extern const int BAD_ARGUMENTS;
19}
20
21
22struct WriteBufferFromHDFS::WriteBufferFromHDFSImpl
23{
24 std::string hdfs_uri;
25 hdfsFile fout;
26 HDFSBuilderPtr builder;
27 HDFSFSPtr fs;
28
29 WriteBufferFromHDFSImpl(const std::string & hdfs_name_)
30 : hdfs_uri(hdfs_name_)
31 , builder(createHDFSBuilder(hdfs_uri))
32 , fs(createHDFSFS(builder.get()))
33 {
34 const size_t begin_of_path = hdfs_uri.find('/', hdfs_uri.find("//") + 2);
35 const std::string path = hdfs_uri.substr(begin_of_path);
36 if (path.find_first_of("*?{") != std::string::npos)
37 throw Exception("URI '" + hdfs_uri + "' contains globs, so the table is in readonly mode", ErrorCodes::CANNOT_OPEN_FILE);
38
39 if (!hdfsExists(fs.get(), path.c_str()))
40 throw Exception("File: " + path + " is already exists", ErrorCodes::BAD_ARGUMENTS);
41 fout = hdfsOpenFile(fs.get(), path.c_str(), O_WRONLY, 0, 0, 0); /// O_WRONLY meaning create or overwrite i.e., implies O_TRUNCAT here
42
43 if (fout == nullptr)
44 {
45 throw Exception("Unable to open HDFS file: " + path + " error: " + std::string(hdfsGetLastError()),
46 ErrorCodes::CANNOT_OPEN_FILE);
47 }
48
49 }
50
51 ~WriteBufferFromHDFSImpl()
52 {
53 hdfsCloseFile(fs.get(), fout);
54 }
55
56
57 int write(const char * start, size_t size)
58 {
59 int bytes_written = hdfsWrite(fs.get(), fout, start, size);
60 if (bytes_written < 0)
61 throw Exception("Fail to write HDFS file: " + hdfs_uri + " " + std::string(hdfsGetLastError()),
62 ErrorCodes::NETWORK_ERROR);
63 return bytes_written;
64 }
65
66 void sync()
67 {
68 int result = hdfsSync(fs.get(), fout);
69 if (result < 0)
70 throwFromErrno("Cannot HDFS sync" + hdfs_uri + " " + std::string(hdfsGetLastError()),
71 ErrorCodes::CANNOT_FSYNC);
72 }
73};
74
75WriteBufferFromHDFS::WriteBufferFromHDFS(const std::string & hdfs_name_, size_t buf_size)
76 : BufferWithOwnMemory<WriteBuffer>(buf_size)
77 , impl(std::make_unique<WriteBufferFromHDFSImpl>(hdfs_name_))
78{
79}
80
81
82void WriteBufferFromHDFS::nextImpl()
83{
84 if (!offset())
85 return;
86
87 size_t bytes_written = 0;
88
89 while (bytes_written != offset())
90 bytes_written += impl->write(working_buffer.begin() + bytes_written, offset() - bytes_written);
91}
92
93
94void WriteBufferFromHDFS::sync()
95{
96 impl->sync();
97}
98
99WriteBufferFromHDFS::~WriteBufferFromHDFS()
100{
101 try
102 {
103 next();
104 }
105 catch (...)
106 {
107 tryLogCurrentException(__PRETTY_FUNCTION__);
108 }
109}
110
111}
112#endif
113