1/********************************************************************
2 * Copyright (c) 2013 - 2014, Pivotal Inc.
3 * All rights reserved.
4 *
5 * Author: Zhanwei Wang
6 ********************************************************************/
7/********************************************************************
8 * 2014 -
9 * open source under Apache License Version 2.0
10 ********************************************************************/
11/**
12 * Licensed to the Apache Software Foundation (ASF) under one
13 * or more contributor license agreements. See the NOTICE file
14 * distributed with this work for additional information
15 * regarding copyright ownership. The ASF licenses this file
16 * to you under the Apache License, Version 2.0 (the
17 * "License"); you may not use this file except in compliance
18 * with the License. You may obtain a copy of the License at
19 *
20 * http://www.apache.org/licenses/LICENSE-2.0
21 *
22 * Unless required by applicable law or agreed to in writing, software
23 * distributed under the License is distributed on an "AS IS" BASIS,
24 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
25 * See the License for the specific language governing permissions and
26 * limitations under the License.
27 */
28#ifndef _HDFS_LIBHDFS3_SERVER_READSHORTCIRCUITINFO_H_
29#define _HDFS_LIBHDFS3_SERVER_READSHORTCIRCUITINFO_H_
30
31#include "FileWrapper.h"
32#include "Hash.h"
33#include "LruMap.h"
34#include "Memory.h"
35#include "network/Socket.h"
36#include "rpc/RpcAuth.h"
37#include "server/BlockLocalPathInfo.h"
38#include "server/DatanodeInfo.h"
39#include "server/ExtendedBlock.h"
40#include "SessionConfig.h"
41#include "Thread.h"
42#include "Token.h"
43
44namespace Hdfs {
45namespace Internal {
46
47struct ReadShortCircuitInfoKey {
48 ReadShortCircuitInfoKey(uint32_t dnPort, int64_t blockId,
49 const std::string& bpid)
50 : dnPort(dnPort), blockId(blockId), bpid(bpid) {}
51
52 size_t hash_value() const {
53 size_t values[] = {Int32Hasher(dnPort), Int64Hasher(blockId),
54 StringHasher(bpid)};
55 return CombineHasher(values, sizeof(values) / sizeof(values[0]));
56 }
57
58 bool operator==(const ReadShortCircuitInfoKey& other) const {
59 return dnPort == other.dnPort && blockId == other.blockId &&
60 bpid == other.bpid;
61 }
62
63 uint32_t dnPort;
64 int64_t blockId;
65 std::string bpid;
66};
67
68struct ReadShortCircuitFDHolder {
69 public:
70 ReadShortCircuitFDHolder() : metafd(-1), datafd(-1) {}
71 ~ReadShortCircuitFDHolder();
72
73 int metafd;
74 int datafd;
75};
76
77class ReadShortCircuitInfo {
78 public:
79 ReadShortCircuitInfo(const ReadShortCircuitInfoKey& key, bool legacy)
80 : legacy(legacy),
81 valid(true),
82 blockId(key.blockId),
83 bpid(key.bpid),
84 dnPort(key.dnPort) {}
85
86 ~ReadShortCircuitInfo();
87
88 const shared_ptr<FileWrapper>& getDataFile() const { return dataFile; }
89
90 void setDataFile(shared_ptr<FileWrapper> dataFile) {
91 this->dataFile = dataFile;
92 }
93
94 const shared_ptr<FileWrapper>& getMetaFile() const { return metaFile; }
95
96 void setMetaFile(shared_ptr<FileWrapper> metaFile) {
97 this->metaFile = metaFile;
98 }
99
100 bool isValid() const { return valid; }
101
102 void setValid(bool valid) { this->valid = valid; }
103
104 int64_t getBlockId() const { return blockId; }
105
106 void setBlockId(int64_t blockId) { this->blockId = blockId; }
107
108 const std::string& getBpid() const { return bpid; }
109
110 void setBpid(const std::string& bpid) { this->bpid = bpid; }
111
112 uint32_t getDnPort() const { return dnPort; }
113
114 void setDnPort(uint32_t dnPort) { this->dnPort = dnPort; }
115
116 ReadShortCircuitInfoKey getKey() const {
117 return ReadShortCircuitInfoKey(dnPort, blockId, bpid);
118 }
119
120 bool isLegacy() const { return legacy; }
121
122 void setLegacy(bool legacy) { this->legacy = legacy; }
123
124 const shared_ptr<ReadShortCircuitFDHolder>& getFdHolder() const {
125 return fdHolder;
126 }
127
128 void setFdHolder(const shared_ptr<ReadShortCircuitFDHolder>& fdHolder) {
129 this->fdHolder = fdHolder;
130 }
131
132 const std::string formatBlockInfo() const {
133 ExtendedBlock block;
134 block.setBlockId(blockId);
135 block.setPoolId(bpid);
136 return block.toString();
137 }
138
139 private:
140 bool legacy;
141 bool valid;
142 shared_ptr<FileWrapper> dataFile;
143 shared_ptr<FileWrapper> metaFile;
144 shared_ptr<ReadShortCircuitFDHolder> fdHolder;
145 int64_t blockId;
146 std::string bpid;
147 uint32_t dnPort;
148};
149
150typedef LruMap<ReadShortCircuitInfoKey, shared_ptr<ReadShortCircuitFDHolder> >
151 ReadShortCircuitFDCacheType;
152typedef LruMap<ReadShortCircuitInfoKey, BlockLocalPathInfo>
153 BlockLocalPathInfoCacheType;
154
155class ReadShortCircuitInfoBuilder {
156 public:
157 ReadShortCircuitInfoBuilder(const DatanodeInfo& dnInfo, const RpcAuth& auth,
158 const SessionConfig& conf);
159 shared_ptr<ReadShortCircuitInfo> fetchOrCreate(const ExtendedBlock& block,
160 const Token token);
161 static void release(const ReadShortCircuitInfo& info);
162
163 private:
164 BlockLocalPathInfo getBlockLocalPathInfo(const ExtendedBlock& block,
165 const Token& token);
166 void invalidBlockLocalPathInfo(const ExtendedBlock& block);
167 shared_ptr<ReadShortCircuitInfo> createReadShortCircuitInfo(
168 const ReadShortCircuitInfoKey& key, const BlockLocalPathInfo& info);
169 shared_ptr<ReadShortCircuitInfo> createReadShortCircuitInfo(
170 const ReadShortCircuitInfoKey& key, const ExtendedBlock& block,
171 const Token& token);
172 shared_ptr<ReadShortCircuitInfo> createReadShortCircuitInfo(
173 const ReadShortCircuitInfoKey& key,
174 const shared_ptr<ReadShortCircuitFDHolder>& fds);
175 std::string buildDomainSocketAddress(uint32_t port);
176 shared_ptr<Socket> getDomainSocketConnection(const std::string& addr);
177 shared_ptr<ReadShortCircuitFDHolder> receiveReadShortCircuitFDs(
178 Socket& sock, const ExtendedBlock& block);
179
180 private:
181 DatanodeInfo dnInfo;
182 RpcAuth auth;
183 SessionConfig conf;
184 static const int MaxReadShortCircuitVersion = 1;
185 static ReadShortCircuitFDCacheType ReadShortCircuitFDCache;
186 static BlockLocalPathInfoCacheType BlockLocalPathInfoCache;
187};
188}
189}
190
191HDFS_HASH_DEFINE(::Hdfs::Internal::ReadShortCircuitInfoKey);
192
193#endif /* _HDFS_LIBHDFS3_SERVER_READSHORTCIRCUITINFO_H_ */
194