1 | /******************************************************************** |
2 | * Copyright (c) 2013 - 2014, Pivotal Inc. |
3 | * All rights reserved. |
4 | * |
5 | * Author: Zhanwei Wang |
6 | ********************************************************************/ |
7 | /******************************************************************** |
8 | * 2014 - |
9 | * open source under Apache License Version 2.0 |
10 | ********************************************************************/ |
11 | /** |
12 | * Licensed to the Apache Software Foundation (ASF) under one |
13 | * or more contributor license agreements. See the NOTICE file |
14 | * distributed with this work for additional information |
15 | * regarding copyright ownership. The ASF licenses this file |
16 | * to you under the Apache License, Version 2.0 (the |
17 | * "License"); you may not use this file except in compliance |
18 | * with the License. You may obtain a copy of the License at |
19 | * |
20 | * http://www.apache.org/licenses/LICENSE-2.0 |
21 | * |
22 | * Unless required by applicable law or agreed to in writing, software |
23 | * distributed under the License is distributed on an "AS IS" BASIS, |
24 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
25 | * See the License for the specific language governing permissions and |
26 | * limitations under the License. |
27 | */ |
28 | #ifndef _HDFS_LIBHDFS3_SERVER_READSHORTCIRCUITINFO_H_ |
29 | #define _HDFS_LIBHDFS3_SERVER_READSHORTCIRCUITINFO_H_ |
30 | |
31 | #include "FileWrapper.h" |
32 | #include "Hash.h" |
33 | #include "LruMap.h" |
34 | #include "Memory.h" |
35 | #include "network/Socket.h" |
36 | #include "rpc/RpcAuth.h" |
37 | #include "server/BlockLocalPathInfo.h" |
38 | #include "server/DatanodeInfo.h" |
39 | #include "server/ExtendedBlock.h" |
40 | #include "SessionConfig.h" |
41 | #include "Thread.h" |
42 | #include "Token.h" |
43 | |
44 | namespace Hdfs { |
45 | namespace Internal { |
46 | |
47 | struct ReadShortCircuitInfoKey { |
48 | ReadShortCircuitInfoKey(uint32_t dnPort, int64_t blockId, |
49 | const std::string& bpid) |
50 | : dnPort(dnPort), blockId(blockId), bpid(bpid) {} |
51 | |
52 | size_t hash_value() const { |
53 | size_t values[] = {Int32Hasher(dnPort), Int64Hasher(blockId), |
54 | StringHasher(bpid)}; |
55 | return CombineHasher(values, sizeof(values) / sizeof(values[0])); |
56 | } |
57 | |
58 | bool operator==(const ReadShortCircuitInfoKey& other) const { |
59 | return dnPort == other.dnPort && blockId == other.blockId && |
60 | bpid == other.bpid; |
61 | } |
62 | |
63 | uint32_t dnPort; |
64 | int64_t blockId; |
65 | std::string bpid; |
66 | }; |
67 | |
/**
 * Plain holder for two integer file descriptors, `metafd` and `datafd`.
 *
 * NOTE(review): the destructor is user-declared and defined outside this
 * header while the copy operations are left implicit. If the destructor
 * releases the descriptors, copying a holder would release them twice -
 * confirm the type is only ever handled through shared_ptr.
 */
struct ReadShortCircuitFDHolder {
  int metafd;  // presumably the descriptor of the block's metadata file
  int datafd;  // presumably the descriptor of the block's data file

  /** Both descriptors start out as -1. */
  ReadShortCircuitFDHolder() : metafd(-1), datafd(-1) {}

  /** Declared here, defined out of line. */
  ~ReadShortCircuitFDHolder();
};
76 | |
77 | class ReadShortCircuitInfo { |
78 | public: |
79 | ReadShortCircuitInfo(const ReadShortCircuitInfoKey& key, bool legacy) |
80 | : legacy(legacy), |
81 | valid(true), |
82 | blockId(key.blockId), |
83 | bpid(key.bpid), |
84 | dnPort(key.dnPort) {} |
85 | |
86 | ~ReadShortCircuitInfo(); |
87 | |
88 | const shared_ptr<FileWrapper>& getDataFile() const { return dataFile; } |
89 | |
90 | void setDataFile(shared_ptr<FileWrapper> dataFile) { |
91 | this->dataFile = dataFile; |
92 | } |
93 | |
94 | const shared_ptr<FileWrapper>& getMetaFile() const { return metaFile; } |
95 | |
96 | void setMetaFile(shared_ptr<FileWrapper> metaFile) { |
97 | this->metaFile = metaFile; |
98 | } |
99 | |
100 | bool isValid() const { return valid; } |
101 | |
102 | void setValid(bool valid) { this->valid = valid; } |
103 | |
104 | int64_t getBlockId() const { return blockId; } |
105 | |
106 | void setBlockId(int64_t blockId) { this->blockId = blockId; } |
107 | |
108 | const std::string& getBpid() const { return bpid; } |
109 | |
110 | void setBpid(const std::string& bpid) { this->bpid = bpid; } |
111 | |
112 | uint32_t getDnPort() const { return dnPort; } |
113 | |
114 | void setDnPort(uint32_t dnPort) { this->dnPort = dnPort; } |
115 | |
116 | ReadShortCircuitInfoKey getKey() const { |
117 | return ReadShortCircuitInfoKey(dnPort, blockId, bpid); |
118 | } |
119 | |
120 | bool isLegacy() const { return legacy; } |
121 | |
122 | void setLegacy(bool legacy) { this->legacy = legacy; } |
123 | |
124 | const shared_ptr<ReadShortCircuitFDHolder>& getFdHolder() const { |
125 | return fdHolder; |
126 | } |
127 | |
128 | void setFdHolder(const shared_ptr<ReadShortCircuitFDHolder>& fdHolder) { |
129 | this->fdHolder = fdHolder; |
130 | } |
131 | |
132 | const std::string formatBlockInfo() const { |
133 | ExtendedBlock block; |
134 | block.setBlockId(blockId); |
135 | block.setPoolId(bpid); |
136 | return block.toString(); |
137 | } |
138 | |
139 | private: |
140 | bool legacy; |
141 | bool valid; |
142 | shared_ptr<FileWrapper> dataFile; |
143 | shared_ptr<FileWrapper> metaFile; |
144 | shared_ptr<ReadShortCircuitFDHolder> fdHolder; |
145 | int64_t blockId; |
146 | std::string bpid; |
147 | uint32_t dnPort; |
148 | }; |
149 | |
/**
 * LruMap keyed by ReadShortCircuitInfoKey whose values are shared handles to
 * ReadShortCircuitFDHolder objects.
 */
typedef LruMap<ReadShortCircuitInfoKey, shared_ptr<ReadShortCircuitFDHolder> >
    ReadShortCircuitFDCacheType;
/**
 * LruMap keyed by ReadShortCircuitInfoKey whose values are
 * BlockLocalPathInfo objects stored by value.
 */
typedef LruMap<ReadShortCircuitInfoKey, BlockLocalPathInfo>
    BlockLocalPathInfoCacheType;
154 | |
/**
 * Builder for ReadShortCircuitInfo objects.
 *
 * Each instance stores its own copy of a DatanodeInfo, an RpcAuth and a
 * SessionConfig; the two caches are static and therefore shared by every
 * instance. All member functions are only declared in this header - their
 * behaviour lives in the implementation file, so the notes below describe
 * signatures, not implementation details.
 */
class ReadShortCircuitInfoBuilder {
 public:
  /**
   * @param dnInfo datanode description (stored in the member `dnInfo`).
   * @param auth   RPC authentication data (stored in the member `auth`).
   * @param conf   session configuration (stored in the member `conf`).
   */
  ReadShortCircuitInfoBuilder(const DatanodeInfo& dnInfo, const RpcAuth& auth,
                              const SessionConfig& conf);
  /**
   * Obtain a ReadShortCircuitInfo for `block`.
   * Presumably served from one of the static caches when possible and
   * created otherwise - confirm against the definition.
   *
   * NOTE(review): `token` is taken by const value here, whereas the private
   * helpers take `const Token&`; every call copies the token. Changing it
   * requires updating the out-of-line definition as well.
   */
  shared_ptr<ReadShortCircuitInfo> fetchOrCreate(const ExtendedBlock& block,
                                                 const Token token);
  /**
   * Static counterpart of fetchOrCreate taking the info object by const
   * reference. Presumably hands its resources back to the caches - confirm
   * against the definition.
   */
  static void release(const ReadShortCircuitInfo& info);

 private:
  /** Returns the BlockLocalPathInfo for `block`, using `token`. */
  BlockLocalPathInfo getBlockLocalPathInfo(const ExtendedBlock& block,
                                           const Token& token);
  /** Presumably drops the cached BlockLocalPathInfo for `block`. */
  void invalidBlockLocalPathInfo(const ExtendedBlock& block);
  /** Overload that builds the info from a BlockLocalPathInfo. */
  shared_ptr<ReadShortCircuitInfo> createReadShortCircuitInfo(
      const ReadShortCircuitInfoKey& key, const BlockLocalPathInfo& info);
  /** Overload that builds the info from a block and its token. */
  shared_ptr<ReadShortCircuitInfo> createReadShortCircuitInfo(
      const ReadShortCircuitInfoKey& key, const ExtendedBlock& block,
      const Token& token);
  /** Overload that builds the info from an existing fd holder. */
  shared_ptr<ReadShortCircuitInfo> createReadShortCircuitInfo(
      const ReadShortCircuitInfoKey& key,
      const shared_ptr<ReadShortCircuitFDHolder>& fds);
  /** Returns a domain socket address string derived from `port`. */
  std::string buildDomainSocketAddress(uint32_t port);
  /** Returns a shared Socket for the domain socket address `addr`. */
  shared_ptr<Socket> getDomainSocketConnection(const std::string& addr);
  /** Returns an fd holder obtained over `sock` for `block`. */
  shared_ptr<ReadShortCircuitFDHolder> receiveReadShortCircuitFDs(
      Socket& sock, const ExtendedBlock& block);

 private:
  DatanodeInfo dnInfo;  // per-instance copy of the constructor argument
  RpcAuth auth;         // per-instance copy of the constructor argument
  SessionConfig conf;   // per-instance copy of the constructor argument
  // Presumably the highest short-circuit protocol version this client
  // requests - confirm where it is used.
  static const int MaxReadShortCircuitVersion = 1;
  // Class-wide caches; only declared here, so they must be defined in
  // exactly one translation unit.
  static ReadShortCircuitFDCacheType ReadShortCircuitFDCache;
  static BlockLocalPathInfoCacheType BlockLocalPathInfoCache;
};
188 | } |
189 | } |
190 | |
// Invoked at global scope, after both namespaces are closed, with the fully
// qualified key type. HDFS_HASH_DEFINE is not defined in this header;
// presumably it comes from Hash.h and makes the key usable with the
// project's hash containers via ReadShortCircuitInfoKey::hash_value() -
// confirm against the macro definition.
HDFS_HASH_DEFINE(::Hdfs::Internal::ReadShortCircuitInfoKey);
192 | |
193 | #endif /* _HDFS_LIBHDFS3_SERVER_READSHORTCIRCUITINFO_H_ */ |
194 | |