| 1 | /******************************************************************** |
| 2 | * Copyright (c) 2013 - 2014, Pivotal Inc. |
| 3 | * All rights reserved. |
| 4 | * |
| 5 | * Author: Zhanwei Wang |
| 6 | ********************************************************************/ |
| 7 | /******************************************************************** |
| 8 | * 2014 - |
| 9 | * open source under Apache License Version 2.0 |
| 10 | ********************************************************************/ |
| 11 | /** |
| 12 | * Licensed to the Apache Software Foundation (ASF) under one |
| 13 | * or more contributor license agreements. See the NOTICE file |
| 14 | * distributed with this work for additional information |
| 15 | * regarding copyright ownership. The ASF licenses this file |
| 16 | * to you under the Apache License, Version 2.0 (the |
| 17 | * "License"); you may not use this file except in compliance |
| 18 | * with the License. You may obtain a copy of the License at |
| 19 | * |
| 20 | * http://www.apache.org/licenses/LICENSE-2.0 |
| 21 | * |
| 22 | * Unless required by applicable law or agreed to in writing, software |
| 23 | * distributed under the License is distributed on an "AS IS" BASIS, |
| 24 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 25 | * See the License for the specific language governing permissions and |
| 26 | * limitations under the License. |
| 27 | */ |
| 28 | #include "Exception.h" |
| 29 | #include "ExceptionInternal.h" |
| 30 | #include "Function.h" |
| 31 | #include "SessionConfig.h" |
| 32 | |
| 33 | #include <sstream> |
| 34 | |
/*
 * Number of elements in the array A, computed at compile time.
 * Only meaningful for real arrays: a pointer argument would yield
 * sizeof(pointer) / sizeof(element) instead of an element count.
 * The parameter is parenthesized before indexing so that an expression
 * argument is indexed as a whole.
 */
#define ARRAYSIZE(A) (sizeof(A) / sizeof((A)[0]))
| 36 | |
| 37 | namespace Hdfs { |
| 38 | namespace Internal { |
| 39 | |
| 40 | template<typename T> |
| 41 | static void CheckRangeGE(const char * key, T const & value, T const & target) { |
| 42 | if (!(value >= target)) { |
| 43 | std::stringstream ss; |
| 44 | ss.imbue(std::locale::classic()); |
| 45 | ss << "Invalid configure item: \"" << key << "\", value: " << value |
| 46 | << ", expected value should be larger than " << target; |
| 47 | THROW(HdfsConfigInvalid, "%s" , ss.str().c_str()); |
| 48 | } |
| 49 | } |
| 50 | |
| 51 | template<typename T> |
| 52 | static void CheckMultipleOf(const char * key, const T & value, int unit) { |
| 53 | if (value <= 0 || value % unit != 0) { |
| 54 | THROW(HdfsConfigInvalid, "%s should be larger than 0 and be the multiple of %d." , key, unit); |
| 55 | } |
| 56 | } |
| 57 | |
| 58 | SessionConfig::SessionConfig(const Config & conf) { |
| 59 | ConfigDefault<bool> boolValues [] = { |
| 60 | { |
| 61 | &rpcTcpNoDelay, "rpc.client.connect.tcpnodelay" , true |
| 62 | }, { |
| 63 | &readFromLocal, "dfs.client.read.shortcircuit" , true |
| 64 | }, { |
| 65 | &addDatanode, "output.replace-datanode-on-failure" , true |
| 66 | }, { |
| 67 | ¬RetryAnotherNode, "input.notretry-another-node" , false |
| 68 | }, { |
| 69 | &useMappedFile, "input.localread.mappedfile" , true |
| 70 | }, { |
| 71 | &legacyLocalBlockReader, "dfs.client.use.legacy.blockreader.local" , false |
| 72 | } |
| 73 | }; |
| 74 | ConfigDefault<int32_t> i32Values[] = { |
| 75 | { |
| 76 | &rpcMaxIdleTime, "rpc.client.max.idle" , 10 * 1000, bind(CheckRangeGE<int32_t>, _1, _2, 1) |
| 77 | }, { |
| 78 | &rpcPingTimeout, "rpc.client.ping.interval" , 10 * 1000 |
| 79 | }, { |
| 80 | &rpcConnectTimeout, "rpc.client.connect.timeout" , 600 * 1000 |
| 81 | }, { |
| 82 | &rpcReadTimeout, "rpc.client.read.timeout" , 3600 * 1000 |
| 83 | }, { |
| 84 | &rpcWriteTimeout, "rpc.client.write.timeout" , 3600 * 1000 |
| 85 | }, { |
| 86 | &rpcSocketLingerTimeout, "rpc.client.socekt.linger.timeout" , -1 |
| 87 | }, { |
| 88 | &rpcMaxRetryOnConnect, "rpc.client.connect.retry" , 10, bind(CheckRangeGE<int32_t>, _1, _2, 1) |
| 89 | }, { |
| 90 | &rpcTimeout, "rpc.client.timeout" , 3600 * 1000 |
| 91 | }, { |
| 92 | &defaultReplica, "dfs.default.replica" , 3, bind(CheckRangeGE<int32_t>, _1, _2, 1) |
| 93 | }, { |
| 94 | &inputConnTimeout, "input.connect.timeout" , 600 * 1000 |
| 95 | }, { |
| 96 | &inputReadTimeout, "input.read.timeout" , 3600 * 1000 |
| 97 | }, { |
| 98 | &inputWriteTimeout, "input.write.timeout" , 3600 * 1000 |
| 99 | }, { |
| 100 | &localReadBufferSize, "input.localread.default.buffersize" , 1 * 1024 * 1024, bind(CheckRangeGE<int32_t>, _1, _2, 1) |
| 101 | }, { |
| 102 | &prefetchSize, "dfs.prefetchsize" , 10, bind(CheckRangeGE<int32_t>, _1, _2, 1) |
| 103 | }, { |
| 104 | &maxGetBlockInfoRetry, "input.read.getblockinfo.retry" , 3, bind(CheckRangeGE<int32_t>, _1, _2, 1) |
| 105 | }, { |
| 106 | &maxLocalBlockInfoCacheSize, "input.localread.blockinfo.cachesize" , 1000, bind(CheckRangeGE<int32_t>, _1, _2, 1) |
| 107 | }, { |
| 108 | &maxReadBlockRetry, "input.read.max.retry" , 60, bind(CheckRangeGE<int32_t>, _1, _2, 1) |
| 109 | }, { |
| 110 | &chunkSize, "output.default.chunksize" , 512, bind(CheckMultipleOf<int32_t>, _1, _2, 512) |
| 111 | }, { |
| 112 | &packetSize, "output.default.packetsize" , 64 * 1024 |
| 113 | }, { |
| 114 | &blockWriteRetry, "output.default.write.retry" , 10, bind(CheckRangeGE<int32_t>, _1, _2, 1) |
| 115 | }, { |
| 116 | &outputConnTimeout, "output.connect.timeout" , 600 * 1000 |
| 117 | }, { |
| 118 | &outputReadTimeout, "output.read.timeout" , 3600 * 1000 |
| 119 | }, { |
| 120 | &outputWriteTimeout, "output.write.timeout" , 3600 * 1000 |
| 121 | }, { |
| 122 | &closeFileTimeout, "output.close.timeout" , 3600 * 1000 |
| 123 | }, { |
| 124 | &packetPoolSize, "output.packetpool.size" , 1024 |
| 125 | }, { |
| 126 | &heartBeatInterval, "output.heeartbeat.interval" , 10 * 1000 |
| 127 | }, { |
| 128 | &rpcMaxHARetry, "dfs.client.failover.max.attempts" , 15, bind(CheckRangeGE<int32_t>, _1, _2, 0) |
| 129 | }, { |
| 130 | &maxFileDescriptorCacheSize, "dfs.client.read.shortcircuit.streams.cache.size" , 256, bind(CheckRangeGE<int32_t>, _1, _2, 0) |
| 131 | }, { |
| 132 | &socketCacheExpiry, "dfs.client.socketcache.expiryMsec" , 3000, bind(CheckRangeGE<int32_t>, _1, _2, 0) |
| 133 | }, { |
| 134 | &socketCacheCapacity, "dfs.client.socketcache.capacity" , 16, bind(CheckRangeGE<int32_t>, _1, _2, 0) |
| 135 | } |
| 136 | }; |
| 137 | ConfigDefault<int64_t> i64Values [] = { |
| 138 | { |
| 139 | &defaultBlockSize, "dfs.default.blocksize" , 64 * 1024 * 1024, bind(CheckMultipleOf<int64_t>, _1, _2, 512) |
| 140 | } |
| 141 | }; |
| 142 | ConfigDefault<std::string> strValues [] = { |
| 143 | {&defaultUri, "dfs.default.uri" , "hdfs://localhost:9000" }, |
| 144 | {&rpcAuthMethod, "hadoop.security.authentication" , "simple" }, |
| 145 | {&kerberosCachePath, "hadoop.security.kerberos.ticket.cache.path" , "" }, |
| 146 | {&logSeverity, "dfs.client.log.severity" , "INFO" }, |
| 147 | {&domainSocketPath, "dfs.domain.socket.path" , "" } |
| 148 | }; |
| 149 | |
| 150 | for (size_t i = 0; i < ARRAYSIZE(boolValues); ++i) { |
| 151 | *boolValues[i].variable = conf.getBool(boolValues[i].key, |
| 152 | boolValues[i].value); |
| 153 | |
| 154 | if (boolValues[i].check) { |
| 155 | boolValues[i].check(boolValues[i].key, *boolValues[i].variable); |
| 156 | } |
| 157 | } |
| 158 | |
| 159 | for (size_t i = 0; i < ARRAYSIZE(i32Values); ++i) { |
| 160 | *i32Values[i].variable = conf.getInt32(i32Values[i].key, |
| 161 | i32Values[i].value); |
| 162 | |
| 163 | if (i32Values[i].check) { |
| 164 | i32Values[i].check(i32Values[i].key, *i32Values[i].variable); |
| 165 | } |
| 166 | } |
| 167 | |
| 168 | for (size_t i = 0; i < ARRAYSIZE(i64Values); ++i) { |
| 169 | *i64Values[i].variable = conf.getInt64(i64Values[i].key, |
| 170 | i64Values[i].value); |
| 171 | |
| 172 | if (i64Values[i].check) { |
| 173 | i64Values[i].check(i64Values[i].key, *i64Values[i].variable); |
| 174 | } |
| 175 | } |
| 176 | |
| 177 | for (size_t i = 0; i < ARRAYSIZE(strValues); ++i) { |
| 178 | *strValues[i].variable = conf.getString(strValues[i].key, |
| 179 | strValues[i].value); |
| 180 | |
| 181 | if (strValues[i].check) { |
| 182 | strValues[i].check(strValues[i].key, *strValues[i].variable); |
| 183 | } |
| 184 | } |
| 185 | } |
| 186 | |
| 187 | } |
| 188 | } |
| 189 | |