1 | /******************************************************************** |
2 | * Copyright (c) 2013 - 2014, Pivotal Inc. |
3 | * All rights reserved. |
4 | * |
5 | * Author: Zhanwei Wang |
6 | ********************************************************************/ |
7 | /******************************************************************** |
8 | * 2014 - |
9 | * open source under Apache License Version 2.0 |
10 | ********************************************************************/ |
11 | /** |
12 | * Licensed to the Apache Software Foundation (ASF) under one |
13 | * or more contributor license agreements. See the NOTICE file |
14 | * distributed with this work for additional information |
15 | * regarding copyright ownership. The ASF licenses this file |
16 | * to you under the Apache License, Version 2.0 (the |
17 | * "License"); you may not use this file except in compliance |
18 | * with the License. You may obtain a copy of the License at |
19 | * |
20 | * http://www.apache.org/licenses/LICENSE-2.0 |
21 | * |
22 | * Unless required by applicable law or agreed to in writing, software |
23 | * distributed under the License is distributed on an "AS IS" BASIS, |
24 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
25 | * See the License for the specific language governing permissions and |
26 | * limitations under the License. |
27 | */ |
28 | #include "Exception.h" |
29 | #include "ExceptionInternal.h" |
30 | #include "Function.h" |
31 | #include "SessionConfig.h" |
32 | |
33 | #include <sstream> |
34 | |
/**
 * Number of elements in the statically sized array A.
 * A must be an actual array, not a pointer. The argument is parenthesized
 * before it is indexed so that any array-valued expression expands correctly.
 */
#define ARRAYSIZE(A) (sizeof(A) / sizeof((A)[0]))
36 | |
37 | namespace Hdfs { |
38 | namespace Internal { |
39 | |
40 | template<typename T> |
41 | static void CheckRangeGE(const char * key, T const & value, T const & target) { |
42 | if (!(value >= target)) { |
43 | std::stringstream ss; |
44 | ss.imbue(std::locale::classic()); |
45 | ss << "Invalid configure item: \"" << key << "\", value: " << value |
46 | << ", expected value should be larger than " << target; |
47 | THROW(HdfsConfigInvalid, "%s" , ss.str().c_str()); |
48 | } |
49 | } |
50 | |
51 | template<typename T> |
52 | static void CheckMultipleOf(const char * key, const T & value, int unit) { |
53 | if (value <= 0 || value % unit != 0) { |
54 | THROW(HdfsConfigInvalid, "%s should be larger than 0 and be the multiple of %d." , key, unit); |
55 | } |
56 | } |
57 | |
58 | SessionConfig::SessionConfig(const Config & conf) { |
59 | ConfigDefault<bool> boolValues [] = { |
60 | { |
61 | &rpcTcpNoDelay, "rpc.client.connect.tcpnodelay" , true |
62 | }, { |
63 | &readFromLocal, "dfs.client.read.shortcircuit" , true |
64 | }, { |
65 | &addDatanode, "output.replace-datanode-on-failure" , true |
66 | }, { |
67 | ¬RetryAnotherNode, "input.notretry-another-node" , false |
68 | }, { |
69 | &useMappedFile, "input.localread.mappedfile" , true |
70 | }, { |
71 | &legacyLocalBlockReader, "dfs.client.use.legacy.blockreader.local" , false |
72 | } |
73 | }; |
74 | ConfigDefault<int32_t> i32Values[] = { |
75 | { |
76 | &rpcMaxIdleTime, "rpc.client.max.idle" , 10 * 1000, bind(CheckRangeGE<int32_t>, _1, _2, 1) |
77 | }, { |
78 | &rpcPingTimeout, "rpc.client.ping.interval" , 10 * 1000 |
79 | }, { |
80 | &rpcConnectTimeout, "rpc.client.connect.timeout" , 600 * 1000 |
81 | }, { |
82 | &rpcReadTimeout, "rpc.client.read.timeout" , 3600 * 1000 |
83 | }, { |
84 | &rpcWriteTimeout, "rpc.client.write.timeout" , 3600 * 1000 |
85 | }, { |
86 | &rpcSocketLingerTimeout, "rpc.client.socekt.linger.timeout" , -1 |
87 | }, { |
88 | &rpcMaxRetryOnConnect, "rpc.client.connect.retry" , 10, bind(CheckRangeGE<int32_t>, _1, _2, 1) |
89 | }, { |
90 | &rpcTimeout, "rpc.client.timeout" , 3600 * 1000 |
91 | }, { |
92 | &defaultReplica, "dfs.default.replica" , 3, bind(CheckRangeGE<int32_t>, _1, _2, 1) |
93 | }, { |
94 | &inputConnTimeout, "input.connect.timeout" , 600 * 1000 |
95 | }, { |
96 | &inputReadTimeout, "input.read.timeout" , 3600 * 1000 |
97 | }, { |
98 | &inputWriteTimeout, "input.write.timeout" , 3600 * 1000 |
99 | }, { |
100 | &localReadBufferSize, "input.localread.default.buffersize" , 1 * 1024 * 1024, bind(CheckRangeGE<int32_t>, _1, _2, 1) |
101 | }, { |
102 | &prefetchSize, "dfs.prefetchsize" , 10, bind(CheckRangeGE<int32_t>, _1, _2, 1) |
103 | }, { |
104 | &maxGetBlockInfoRetry, "input.read.getblockinfo.retry" , 3, bind(CheckRangeGE<int32_t>, _1, _2, 1) |
105 | }, { |
106 | &maxLocalBlockInfoCacheSize, "input.localread.blockinfo.cachesize" , 1000, bind(CheckRangeGE<int32_t>, _1, _2, 1) |
107 | }, { |
108 | &maxReadBlockRetry, "input.read.max.retry" , 60, bind(CheckRangeGE<int32_t>, _1, _2, 1) |
109 | }, { |
110 | &chunkSize, "output.default.chunksize" , 512, bind(CheckMultipleOf<int32_t>, _1, _2, 512) |
111 | }, { |
112 | &packetSize, "output.default.packetsize" , 64 * 1024 |
113 | }, { |
114 | &blockWriteRetry, "output.default.write.retry" , 10, bind(CheckRangeGE<int32_t>, _1, _2, 1) |
115 | }, { |
116 | &outputConnTimeout, "output.connect.timeout" , 600 * 1000 |
117 | }, { |
118 | &outputReadTimeout, "output.read.timeout" , 3600 * 1000 |
119 | }, { |
120 | &outputWriteTimeout, "output.write.timeout" , 3600 * 1000 |
121 | }, { |
122 | &closeFileTimeout, "output.close.timeout" , 3600 * 1000 |
123 | }, { |
124 | &packetPoolSize, "output.packetpool.size" , 1024 |
125 | }, { |
126 | &heartBeatInterval, "output.heeartbeat.interval" , 10 * 1000 |
127 | }, { |
128 | &rpcMaxHARetry, "dfs.client.failover.max.attempts" , 15, bind(CheckRangeGE<int32_t>, _1, _2, 0) |
129 | }, { |
130 | &maxFileDescriptorCacheSize, "dfs.client.read.shortcircuit.streams.cache.size" , 256, bind(CheckRangeGE<int32_t>, _1, _2, 0) |
131 | }, { |
132 | &socketCacheExpiry, "dfs.client.socketcache.expiryMsec" , 3000, bind(CheckRangeGE<int32_t>, _1, _2, 0) |
133 | }, { |
134 | &socketCacheCapacity, "dfs.client.socketcache.capacity" , 16, bind(CheckRangeGE<int32_t>, _1, _2, 0) |
135 | } |
136 | }; |
137 | ConfigDefault<int64_t> i64Values [] = { |
138 | { |
139 | &defaultBlockSize, "dfs.default.blocksize" , 64 * 1024 * 1024, bind(CheckMultipleOf<int64_t>, _1, _2, 512) |
140 | } |
141 | }; |
142 | ConfigDefault<std::string> strValues [] = { |
143 | {&defaultUri, "dfs.default.uri" , "hdfs://localhost:9000" }, |
144 | {&rpcAuthMethod, "hadoop.security.authentication" , "simple" }, |
145 | {&kerberosCachePath, "hadoop.security.kerberos.ticket.cache.path" , "" }, |
146 | {&logSeverity, "dfs.client.log.severity" , "INFO" }, |
147 | {&domainSocketPath, "dfs.domain.socket.path" , "" } |
148 | }; |
149 | |
150 | for (size_t i = 0; i < ARRAYSIZE(boolValues); ++i) { |
151 | *boolValues[i].variable = conf.getBool(boolValues[i].key, |
152 | boolValues[i].value); |
153 | |
154 | if (boolValues[i].check) { |
155 | boolValues[i].check(boolValues[i].key, *boolValues[i].variable); |
156 | } |
157 | } |
158 | |
159 | for (size_t i = 0; i < ARRAYSIZE(i32Values); ++i) { |
160 | *i32Values[i].variable = conf.getInt32(i32Values[i].key, |
161 | i32Values[i].value); |
162 | |
163 | if (i32Values[i].check) { |
164 | i32Values[i].check(i32Values[i].key, *i32Values[i].variable); |
165 | } |
166 | } |
167 | |
168 | for (size_t i = 0; i < ARRAYSIZE(i64Values); ++i) { |
169 | *i64Values[i].variable = conf.getInt64(i64Values[i].key, |
170 | i64Values[i].value); |
171 | |
172 | if (i64Values[i].check) { |
173 | i64Values[i].check(i64Values[i].key, *i64Values[i].variable); |
174 | } |
175 | } |
176 | |
177 | for (size_t i = 0; i < ARRAYSIZE(strValues); ++i) { |
178 | *strValues[i].variable = conf.getString(strValues[i].key, |
179 | strValues[i].value); |
180 | |
181 | if (strValues[i].check) { |
182 | strValues[i].check(strValues[i].key, *strValues[i].variable); |
183 | } |
184 | } |
185 | } |
186 | |
187 | } |
188 | } |
189 | |