1/********************************************************************
2 * Copyright (c) 2013 - 2014, Pivotal Inc.
3 * All rights reserved.
4 *
5 * Author: Zhanwei Wang
6 ********************************************************************/
7/********************************************************************
8 * 2014 -
9 * open source under Apache License Version 2.0
10 ********************************************************************/
11/**
12 * Licensed to the Apache Software Foundation (ASF) under one
13 * or more contributor license agreements. See the NOTICE file
14 * distributed with this work for additional information
15 * regarding copyright ownership. The ASF licenses this file
16 * to you under the Apache License, Version 2.0 (the
17 * "License"); you may not use this file except in compliance
18 * with the License. You may obtain a copy of the License at
19 *
20 * http://www.apache.org/licenses/LICENSE-2.0
21 *
22 * Unless required by applicable law or agreed to in writing, software
23 * distributed under the License is distributed on an "AS IS" BASIS,
24 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
25 * See the License for the specific language governing permissions and
26 * limitations under the License.
27 */
28#include "Exception.h"
29#include "ExceptionInternal.h"
30#include "Function.h"
31#include "SessionConfig.h"
32
33#include <sstream>
34
// Number of elements in the built-in array A, computed at compile time.
// A must be a real array (not a pointer). The argument is parenthesized
// before indexing so that an expression argument groups as intended.
#define ARRAYSIZE(A) (sizeof(A) / sizeof((A)[0]))
36
37namespace Hdfs {
38namespace Internal {
39
40template<typename T>
41static void CheckRangeGE(const char * key, T const & value, T const & target) {
42 if (!(value >= target)) {
43 std::stringstream ss;
44 ss.imbue(std::locale::classic());
45 ss << "Invalid configure item: \"" << key << "\", value: " << value
46 << ", expected value should be larger than " << target;
47 THROW(HdfsConfigInvalid, "%s", ss.str().c_str());
48 }
49}
50
51template<typename T>
52static void CheckMultipleOf(const char * key, const T & value, int unit) {
53 if (value <= 0 || value % unit != 0) {
54 THROW(HdfsConfigInvalid, "%s should be larger than 0 and be the multiple of %d.", key, unit);
55 }
56}
57
58SessionConfig::SessionConfig(const Config & conf) {
59 ConfigDefault<bool> boolValues [] = {
60 {
61 &rpcTcpNoDelay, "rpc.client.connect.tcpnodelay", true
62 }, {
63 &readFromLocal, "dfs.client.read.shortcircuit", true
64 }, {
65 &addDatanode, "output.replace-datanode-on-failure", true
66 }, {
67 &notRetryAnotherNode, "input.notretry-another-node", false
68 }, {
69 &useMappedFile, "input.localread.mappedfile", true
70 }, {
71 &legacyLocalBlockReader, "dfs.client.use.legacy.blockreader.local", false
72 }
73 };
74 ConfigDefault<int32_t> i32Values[] = {
75 {
76 &rpcMaxIdleTime, "rpc.client.max.idle", 10 * 1000, bind(CheckRangeGE<int32_t>, _1, _2, 1)
77 }, {
78 &rpcPingTimeout, "rpc.client.ping.interval", 10 * 1000
79 }, {
80 &rpcConnectTimeout, "rpc.client.connect.timeout", 600 * 1000
81 }, {
82 &rpcReadTimeout, "rpc.client.read.timeout", 3600 * 1000
83 }, {
84 &rpcWriteTimeout, "rpc.client.write.timeout", 3600 * 1000
85 }, {
86 &rpcSocketLingerTimeout, "rpc.client.socekt.linger.timeout", -1
87 }, {
88 &rpcMaxRetryOnConnect, "rpc.client.connect.retry", 10, bind(CheckRangeGE<int32_t>, _1, _2, 1)
89 }, {
90 &rpcTimeout, "rpc.client.timeout", 3600 * 1000
91 }, {
92 &defaultReplica, "dfs.default.replica", 3, bind(CheckRangeGE<int32_t>, _1, _2, 1)
93 }, {
94 &inputConnTimeout, "input.connect.timeout", 600 * 1000
95 }, {
96 &inputReadTimeout, "input.read.timeout", 3600 * 1000
97 }, {
98 &inputWriteTimeout, "input.write.timeout", 3600 * 1000
99 }, {
100 &localReadBufferSize, "input.localread.default.buffersize", 1 * 1024 * 1024, bind(CheckRangeGE<int32_t>, _1, _2, 1)
101 }, {
102 &prefetchSize, "dfs.prefetchsize", 10, bind(CheckRangeGE<int32_t>, _1, _2, 1)
103 }, {
104 &maxGetBlockInfoRetry, "input.read.getblockinfo.retry", 3, bind(CheckRangeGE<int32_t>, _1, _2, 1)
105 }, {
106 &maxLocalBlockInfoCacheSize, "input.localread.blockinfo.cachesize", 1000, bind(CheckRangeGE<int32_t>, _1, _2, 1)
107 }, {
108 &maxReadBlockRetry, "input.read.max.retry", 60, bind(CheckRangeGE<int32_t>, _1, _2, 1)
109 }, {
110 &chunkSize, "output.default.chunksize", 512, bind(CheckMultipleOf<int32_t>, _1, _2, 512)
111 }, {
112 &packetSize, "output.default.packetsize", 64 * 1024
113 }, {
114 &blockWriteRetry, "output.default.write.retry", 10, bind(CheckRangeGE<int32_t>, _1, _2, 1)
115 }, {
116 &outputConnTimeout, "output.connect.timeout", 600 * 1000
117 }, {
118 &outputReadTimeout, "output.read.timeout", 3600 * 1000
119 }, {
120 &outputWriteTimeout, "output.write.timeout", 3600 * 1000
121 }, {
122 &closeFileTimeout, "output.close.timeout", 3600 * 1000
123 }, {
124 &packetPoolSize, "output.packetpool.size", 1024
125 }, {
126 &heartBeatInterval, "output.heeartbeat.interval", 10 * 1000
127 }, {
128 &rpcMaxHARetry, "dfs.client.failover.max.attempts", 15, bind(CheckRangeGE<int32_t>, _1, _2, 0)
129 }, {
130 &maxFileDescriptorCacheSize, "dfs.client.read.shortcircuit.streams.cache.size", 256, bind(CheckRangeGE<int32_t>, _1, _2, 0)
131 }, {
132 &socketCacheExpiry, "dfs.client.socketcache.expiryMsec", 3000, bind(CheckRangeGE<int32_t>, _1, _2, 0)
133 }, {
134 &socketCacheCapacity, "dfs.client.socketcache.capacity", 16, bind(CheckRangeGE<int32_t>, _1, _2, 0)
135 }
136 };
137 ConfigDefault<int64_t> i64Values [] = {
138 {
139 &defaultBlockSize, "dfs.default.blocksize", 64 * 1024 * 1024, bind(CheckMultipleOf<int64_t>, _1, _2, 512)
140 }
141 };
142 ConfigDefault<std::string> strValues [] = {
143 {&defaultUri, "dfs.default.uri", "hdfs://localhost:9000" },
144 {&rpcAuthMethod, "hadoop.security.authentication", "simple" },
145 {&kerberosCachePath, "hadoop.security.kerberos.ticket.cache.path", "" },
146 {&logSeverity, "dfs.client.log.severity", "INFO" },
147 {&domainSocketPath, "dfs.domain.socket.path", ""}
148 };
149
150 for (size_t i = 0; i < ARRAYSIZE(boolValues); ++i) {
151 *boolValues[i].variable = conf.getBool(boolValues[i].key,
152 boolValues[i].value);
153
154 if (boolValues[i].check) {
155 boolValues[i].check(boolValues[i].key, *boolValues[i].variable);
156 }
157 }
158
159 for (size_t i = 0; i < ARRAYSIZE(i32Values); ++i) {
160 *i32Values[i].variable = conf.getInt32(i32Values[i].key,
161 i32Values[i].value);
162
163 if (i32Values[i].check) {
164 i32Values[i].check(i32Values[i].key, *i32Values[i].variable);
165 }
166 }
167
168 for (size_t i = 0; i < ARRAYSIZE(i64Values); ++i) {
169 *i64Values[i].variable = conf.getInt64(i64Values[i].key,
170 i64Values[i].value);
171
172 if (i64Values[i].check) {
173 i64Values[i].check(i64Values[i].key, *i64Values[i].variable);
174 }
175 }
176
177 for (size_t i = 0; i < ARRAYSIZE(strValues); ++i) {
178 *strValues[i].variable = conf.getString(strValues[i].key,
179 strValues[i].value);
180
181 if (strValues[i].check) {
182 strValues[i].check(strValues[i].key, *strValues[i].variable);
183 }
184 }
185}
186
187}
188}
189