1#include <IO/HDFSCommon.h>
2#include <Poco/URI.h>
3
4#if USE_HDFS
5#include <Common/Exception.h>
6
7namespace DB
8{
9namespace ErrorCodes
10{
11extern const int BAD_ARGUMENTS;
12extern const int NETWORK_ERROR;
13}
14
15HDFSBuilderPtr createHDFSBuilder(const std::string & uri_str)
16{
17 const Poco::URI uri(uri_str);
18 auto & host = uri.getHost();
19 auto port = uri.getPort();
20 const std::string path = "//";
21 if (host.empty())
22 throw Exception("Illegal HDFS URI: " + uri.toString(), ErrorCodes::BAD_ARGUMENTS);
23
24 HDFSBuilderPtr builder(hdfsNewBuilder());
25 if (builder == nullptr)
26 throw Exception("Unable to create builder to connect to HDFS: " + uri.toString() + " " + std::string(hdfsGetLastError()),
27 ErrorCodes::NETWORK_ERROR);
28 hdfsBuilderConfSetStr(builder.get(), "input.read.timeout", "60000"); // 1 min
29 hdfsBuilderConfSetStr(builder.get(), "input.write.timeout", "60000"); // 1 min
30 hdfsBuilderConfSetStr(builder.get(), "input.connect.timeout", "60000"); // 1 min
31
32 std::string user_info = uri.getUserInfo();
33 if (!user_info.empty() && user_info.front() != ':')
34 {
35 std::string user;
36 size_t delim_pos = user_info.find(":");
37 if (delim_pos != std::string::npos)
38 user = user_info.substr(0, delim_pos);
39 else
40 user = user_info;
41
42 hdfsBuilderSetUserName(builder.get(), user.c_str());
43 }
44 hdfsBuilderSetNameNode(builder.get(), host.c_str());
45 if (port != 0)
46 {
47 hdfsBuilderSetNameNodePort(builder.get(), port);
48 }
49 return builder;
50}
51
52HDFSFSPtr createHDFSFS(hdfsBuilder * builder)
53{
54 HDFSFSPtr fs(hdfsBuilderConnect(builder));
55 if (fs == nullptr)
56 throw Exception("Unable to connect to HDFS: " + std::string(hdfsGetLastError()),
57 ErrorCodes::NETWORK_ERROR);
58
59 return fs;
60}
61}
62#endif
63