1#include "ReadBufferFromHDFS.h"
2
3#if USE_HDFS
#include <IO/HDFSCommon.h>
#include <hdfs/hdfs.h>

#include <algorithm>
#include <limits>
6
7
8namespace DB
9{
10namespace ErrorCodes
11{
12 extern const int NETWORK_ERROR;
13 extern const int CANNOT_OPEN_FILE;
14}
15
16struct ReadBufferFromHDFS::ReadBufferFromHDFSImpl
17{
18 std::string hdfs_uri;
19 hdfsFile fin;
20 HDFSBuilderPtr builder;
21 HDFSFSPtr fs;
22
23 ReadBufferFromHDFSImpl(const std::string & hdfs_name_)
24 : hdfs_uri(hdfs_name_)
25 , builder(createHDFSBuilder(hdfs_uri))
26 , fs(createHDFSFS(builder.get()))
27 {
28 const size_t begin_of_path = hdfs_uri.find('/', hdfs_uri.find("//") + 2);
29 const std::string path = hdfs_uri.substr(begin_of_path);
30 fin = hdfsOpenFile(fs.get(), path.c_str(), O_RDONLY, 0, 0, 0);
31
32 if (fin == nullptr)
33 throw Exception("Unable to open HDFS file: " + path + " error: " + std::string(hdfsGetLastError()),
34 ErrorCodes::CANNOT_OPEN_FILE);
35 }
36
37 int read(char * start, size_t size)
38 {
39 int bytes_read = hdfsRead(fs.get(), fin, start, size);
40 if (bytes_read < 0)
41 throw Exception("Fail to read HDFS file: " + hdfs_uri + " " + std::string(hdfsGetLastError()),
42 ErrorCodes::NETWORK_ERROR);
43 return bytes_read;
44 }
45
46 ~ReadBufferFromHDFSImpl()
47 {
48 hdfsCloseFile(fs.get(), fin);
49 }
50};
51
52ReadBufferFromHDFS::ReadBufferFromHDFS(const std::string & hdfs_name_, size_t buf_size)
53 : BufferWithOwnMemory<ReadBuffer>(buf_size)
54 , impl(std::make_unique<ReadBufferFromHDFSImpl>(hdfs_name_))
55{
56}
57
58
59bool ReadBufferFromHDFS::nextImpl()
60{
61 int bytes_read = impl->read(internal_buffer.begin(), internal_buffer.size());
62
63 if (bytes_read)
64 working_buffer.resize(bytes_read);
65 else
66 return false;
67 return true;
68}
69
70ReadBufferFromHDFS::~ReadBufferFromHDFS()
71{
72}
73
74}
75
76#endif
77