1 | /******************************************************************** |
2 | * Copyright (c) 2013 - 2014, Pivotal Inc. |
3 | * All rights reserved. |
4 | * |
5 | * Author: Zhanwei Wang |
6 | ********************************************************************/ |
7 | /******************************************************************** |
8 | * 2014 - |
9 | * open source under Apache License Version 2.0 |
10 | ********************************************************************/ |
11 | /** |
12 | * Licensed to the Apache Software Foundation (ASF) under one |
13 | * or more contributor license agreements. See the NOTICE file |
14 | * distributed with this work for additional information |
15 | * regarding copyright ownership. The ASF licenses this file |
16 | * to you under the Apache License, Version 2.0 (the |
17 | * "License"); you may not use this file except in compliance |
18 | * with the License. You may obtain a copy of the License at |
19 | * |
20 | * http://www.apache.org/licenses/LICENSE-2.0 |
21 | * |
22 | * Unless required by applicable law or agreed to in writing, software |
23 | * distributed under the License is distributed on an "AS IS" BASIS, |
24 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
25 | * See the License for the specific language governing permissions and |
26 | * limitations under the License. |
27 | */ |
28 | #include "Exception.h" |
29 | #include "ExceptionInternal.h" |
30 | #include "Logger.h" |
31 | #include "NamenodeImpl.h" |
32 | #include "NamenodeProxy.h" |
33 | #include "StringUtil.h" |
34 | |
35 | #include <string> |
36 | |
37 | #include <sys/fcntl.h> |
38 | #include <sys/stat.h> |
39 | #include <sys/types.h> |
40 | #include <sys/file.h> |
41 | |
42 | namespace Hdfs { |
43 | namespace Internal { |
44 | |
45 | static uint32_t GetInitNamenodeIndex(const std::string id) { |
46 | std::string path = "/tmp/" ; |
47 | path += id; |
48 | int fd; |
49 | uint32_t index = 0; |
50 | /* |
51 | * try create the file |
52 | */ |
53 | fd = open(path.c_str(), O_WRONLY | O_CREAT | O_EXCL, 0666); |
54 | |
55 | if (fd < 0) { |
56 | if (errno == EEXIST) { |
57 | /* |
58 | * the file already exist, try to open it |
59 | */ |
60 | fd = open(path.c_str(), O_RDONLY); |
61 | } else { |
62 | /* |
63 | * failed to create, do not care why |
64 | */ |
65 | return 0; |
66 | } |
67 | } else { |
68 | if (0 != flock(fd, LOCK_EX)) { |
69 | /* |
70 | * failed to lock |
71 | */ |
72 | close(fd); |
73 | return index; |
74 | } |
75 | |
76 | /* |
77 | * created file, initialize it with 0 |
78 | */ |
79 | if (write(fd, &index, sizeof(index)) < 0) { |
80 | LOG(WARNING, |
81 | "NamenodeProxy: Failed to write current Namenode index into " |
82 | "cache file." ); |
83 | /* |
84 | * ignore the failure. |
85 | */ |
86 | } |
87 | |
88 | flock(fd, LOCK_UN); |
89 | close(fd); |
90 | return index; |
91 | } |
92 | |
93 | /* |
94 | * the file exist, read it. |
95 | */ |
96 | if (fd >= 0) { |
97 | if (0 != flock(fd, LOCK_SH)) { |
98 | /* |
99 | * failed to lock |
100 | */ |
101 | close(fd); |
102 | return index; |
103 | } |
104 | |
105 | if (sizeof(index) != read(fd, &index, sizeof(index))) { |
106 | /* |
107 | * failed to read, do not care why |
108 | */ |
109 | index = 0; |
110 | } |
111 | |
112 | flock(fd, LOCK_UN); |
113 | close(fd); |
114 | } |
115 | |
116 | return index; |
117 | } |
118 | |
119 | static void SetInitNamenodeIndex(const std::string & id, uint32_t index) { |
120 | std::string path = "/tmp/" ; |
121 | path += id; |
122 | int fd; |
123 | /* |
124 | * try open the file for write |
125 | */ |
126 | fd = open(path.c_str(), O_WRONLY); |
127 | |
128 | if (fd > 0) { |
129 | if (0 != flock(fd, LOCK_EX)) { |
130 | /* |
131 | * failed to lock |
132 | */ |
133 | close(fd); |
134 | return; |
135 | } |
136 | |
137 | if (write(fd, &index, sizeof(index)) < 0) { |
138 | LOG(WARNING, |
139 | "NamenodeProxy: Failed to write current Namenode index into " |
140 | "cache file." ); |
141 | /* |
142 | * ignore the failure. |
143 | */ |
144 | } |
145 | flock(fd, LOCK_UN); |
146 | close(fd); |
147 | } |
148 | } |
149 | |
150 | NamenodeProxy::NamenodeProxy(const std::vector<NamenodeInfo> & namenodeInfos, const std::string & tokenService, |
151 | const SessionConfig & c, const RpcAuth & a) : |
152 | clusterid(tokenService), currentNamenode(0) { |
153 | if (namenodeInfos.size() == 1) { |
154 | enableNamenodeHA = false; |
155 | maxNamenodeHARetry = 0; |
156 | } else { |
157 | enableNamenodeHA = true; |
158 | maxNamenodeHARetry = c.getRpcMaxHaRetry(); |
159 | } |
160 | |
161 | for (size_t i = 0; i < namenodeInfos.size(); ++i) { |
162 | std::vector<std::string> nninfo = StringSplit(namenodeInfos[i].getRpcAddr(), ":" ); |
163 | |
164 | if (nninfo.size() != 2) { |
165 | THROW(InvalidParameter, "Cannot create namenode proxy, %s does not contain host or port" , |
166 | namenodeInfos[i].getRpcAddr().c_str()); |
167 | } |
168 | |
169 | namenodes.push_back( |
170 | shared_ptr<Namenode>( |
171 | new NamenodeImpl(nninfo[0].c_str(), nninfo[1].c_str(), clusterid, c, a))); |
172 | } |
173 | |
174 | if (enableNamenodeHA) { |
175 | currentNamenode = GetInitNamenodeIndex(clusterid) % namenodeInfos.size(); |
176 | } |
177 | } |
178 | |
179 | NamenodeProxy::~NamenodeProxy() { |
180 | } |
181 | |
182 | shared_ptr<Namenode> NamenodeProxy::getActiveNamenode(uint32_t & oldValue) { |
183 | lock_guard<mutex> lock(mut); |
184 | |
185 | if (namenodes.empty()) { |
186 | THROW(HdfsFileSystemClosed, "NamenodeProxy is closed." ); |
187 | } |
188 | |
189 | oldValue = currentNamenode; |
190 | return namenodes[currentNamenode % namenodes.size()]; |
191 | } |
192 | |
193 | void NamenodeProxy::failoverToNextNamenode(uint32_t oldValue) { |
194 | lock_guard<mutex> lock(mut); |
195 | |
196 | if (oldValue != currentNamenode) { |
197 | //already failover in another thread. |
198 | return; |
199 | } |
200 | |
201 | ++currentNamenode; |
202 | currentNamenode = currentNamenode % namenodes.size(); |
203 | SetInitNamenodeIndex(clusterid, currentNamenode); |
204 | } |
205 | |
206 | static void HandleHdfsFailoverException(const HdfsFailoverException & e) { |
207 | try { |
208 | Hdfs::rethrow_if_nested(e); |
209 | } catch (...) { |
210 | NESTED_THROW(Hdfs::HdfsRpcException, "%s" , e.what()); |
211 | } |
212 | |
213 | //should not reach here |
214 | abort(); |
215 | } |
216 | |
/*
 * NAMENODE_HA_RETRY_BEGIN() / NAMENODE_HA_RETRY_END() wrap one Namenode RPC
 * written between them against the local `namenode` (the proxy returned by
 * getActiveNamenode()). They must be used as a pair inside a NamenodeProxy
 * member function.
 *
 * The pair forms a loop:
 * - If the RPC completes, or `return`s, the loop is left.
 * - On NameNodeStandbyException or HdfsFailoverException, the loop calls
 *   failoverToNextNamenode(), logs a warning, and retries against the newly
 *   selected Namenode.
 * - Without HA, or once the retry budget is used up, a standby error is
 *   rethrown as is. A failover error goes through HandleHdfsFailoverException.
 *
 * NOTE(review): the test `count++ > maxNamenodeHARetry` allows
 * maxNamenodeHARetry + 1 failovers (maxNamenodeHARetry + 2 attempts) before
 * giving up. Confirm that this off-by-one relative to the name is intended.
 *
 * The loop-local names avoid double underscores, which are reserved for the
 * implementation.
 */
#define NAMENODE_HA_RETRY_BEGIN() \
    do { \
        int nnHaRetryCount_ = 0; \
        do { \
            uint32_t nnHaOldValue_ = 0; \
            shared_ptr<Namenode> namenode = getActiveNamenode(nnHaOldValue_); \
            try { \
                (void)0

#define NAMENODE_HA_RETRY_END() \
                break; \
            } catch (const NameNodeStandbyException &) { \
                if (!enableNamenodeHA || nnHaRetryCount_++ > maxNamenodeHARetry) { \
                    throw; \
                } \
            } catch (const HdfsFailoverException & e) { \
                if (!enableNamenodeHA || nnHaRetryCount_++ > maxNamenodeHARetry) { \
                    HandleHdfsFailoverException(e); \
                } \
            } \
            failoverToNextNamenode(nnHaOldValue_); \
            LOG(WARNING, "NamenodeProxy: Failover to another Namenode."); \
        } while (true); \
    } while (0)
241 | |
242 | void NamenodeProxy::getBlockLocations(const std::string & src, int64_t offset, |
243 | int64_t length, LocatedBlocks & lbs) { |
244 | NAMENODE_HA_RETRY_BEGIN(); |
245 | namenode->getBlockLocations(src, offset, length, lbs); |
246 | NAMENODE_HA_RETRY_END(); |
247 | } |
248 | |
249 | void NamenodeProxy::create(const std::string & src, const Permission & masked, |
250 | const std::string & clientName, int flag, bool createParent, |
251 | short replication, int64_t blockSize) { |
252 | NAMENODE_HA_RETRY_BEGIN(); |
253 | namenode->create(src, masked, clientName, flag, createParent, replication, blockSize); |
254 | NAMENODE_HA_RETRY_END(); |
255 | } |
256 | |
257 | std::pair<shared_ptr<LocatedBlock>, shared_ptr<FileStatus> > |
258 | NamenodeProxy::append(const std::string& src, const std::string& clientName) { |
259 | NAMENODE_HA_RETRY_BEGIN(); |
260 | return namenode->append(src, clientName); |
261 | NAMENODE_HA_RETRY_END(); |
262 | assert(!"should not reach here" ); |
263 | return std::pair<shared_ptr<LocatedBlock>, shared_ptr<FileStatus> >(); |
264 | } |
265 | |
266 | bool NamenodeProxy::setReplication(const std::string & src, short replication) { |
267 | NAMENODE_HA_RETRY_BEGIN(); |
268 | return namenode->setReplication(src, replication); |
269 | NAMENODE_HA_RETRY_END(); |
270 | assert(!"should not reach here" ); |
271 | return false; |
272 | } |
273 | |
274 | void NamenodeProxy::setPermission(const std::string & src, |
275 | const Permission & permission) { |
276 | NAMENODE_HA_RETRY_BEGIN(); |
277 | namenode->setPermission(src, permission); |
278 | NAMENODE_HA_RETRY_END(); |
279 | } |
280 | |
281 | void NamenodeProxy::setOwner(const std::string & src, |
282 | const std::string & username, const std::string & groupname) { |
283 | NAMENODE_HA_RETRY_BEGIN(); |
284 | namenode->setOwner(src, username, groupname); |
285 | NAMENODE_HA_RETRY_END(); |
286 | } |
287 | |
288 | void NamenodeProxy::abandonBlock(const ExtendedBlock & b, |
289 | const std::string & src, const std::string & holder) { |
290 | NAMENODE_HA_RETRY_BEGIN(); |
291 | namenode->abandonBlock(b, src, holder); |
292 | NAMENODE_HA_RETRY_END(); |
293 | } |
294 | |
295 | shared_ptr<LocatedBlock> NamenodeProxy::addBlock(const std::string & src, |
296 | const std::string & clientName, const ExtendedBlock * previous, |
297 | const std::vector<DatanodeInfo> & excludeNodes) { |
298 | NAMENODE_HA_RETRY_BEGIN(); |
299 | return namenode->addBlock(src, clientName, previous, excludeNodes); |
300 | NAMENODE_HA_RETRY_END(); |
301 | assert(!"should not reach here" ); |
302 | return shared_ptr<LocatedBlock>(); |
303 | } |
304 | |
305 | shared_ptr<LocatedBlock> NamenodeProxy::getAdditionalDatanode( |
306 | const std::string & src, const ExtendedBlock & blk, |
307 | const std::vector<DatanodeInfo> & existings, |
308 | const std::vector<std::string> & storageIDs, |
309 | const std::vector<DatanodeInfo> & excludes, int numAdditionalNodes, |
310 | const std::string & clientName) { |
311 | NAMENODE_HA_RETRY_BEGIN(); |
312 | return namenode->getAdditionalDatanode(src, blk, existings, |
313 | storageIDs, excludes, numAdditionalNodes, clientName); |
314 | NAMENODE_HA_RETRY_END(); |
315 | assert(!"should not reach here" ); |
316 | return shared_ptr<LocatedBlock>(); |
317 | } |
318 | |
319 | bool NamenodeProxy::complete(const std::string & src, |
320 | const std::string & clientName, const ExtendedBlock * last) { |
321 | NAMENODE_HA_RETRY_BEGIN(); |
322 | return namenode->complete(src, clientName, last); |
323 | NAMENODE_HA_RETRY_END(); |
324 | assert(!"should not reach here" ); |
325 | return false; |
326 | } |
327 | |
328 | /*void NamenodeProxy::reportBadBlocks(const std::vector<LocatedBlock> & blocks) { |
329 | NAMENODE_HA_RETRY_BEGIN(); |
330 | namenode->reportBadBlocks(blocks); |
331 | NAMENODE_HA_RETRY_END(); |
332 | }*/ |
333 | |
334 | bool NamenodeProxy::rename(const std::string & src, const std::string & dst) { |
335 | NAMENODE_HA_RETRY_BEGIN(); |
336 | return namenode->rename(src, dst); |
337 | NAMENODE_HA_RETRY_END(); |
338 | assert(!"should not reach here" ); |
339 | return false; |
340 | } |
341 | |
342 | /* |
343 | void NamenodeProxy::concat(const std::string & trg, |
344 | const std::vector<std::string> & srcs) { |
345 | NAMENODE_HA_RETRY_BEGIN(); |
346 | namenode->concat(trg, srcs); |
347 | NAMENODE_HA_RETRY_END(); |
348 | } |
349 | */ |
350 | |
351 | bool NamenodeProxy::truncate(const std::string & src, int64_t size, |
352 | const std::string & clientName) { |
353 | NAMENODE_HA_RETRY_BEGIN(); |
354 | return namenode->truncate(src, size, clientName); |
355 | NAMENODE_HA_RETRY_END(); |
356 | } |
357 | |
358 | void NamenodeProxy::getLease(const std::string & src, |
359 | const std::string & clientName) { |
360 | NAMENODE_HA_RETRY_BEGIN(); |
361 | namenode->getLease(src, clientName); |
362 | NAMENODE_HA_RETRY_END(); |
363 | } |
364 | |
365 | void NamenodeProxy::releaseLease(const std::string & src, |
366 | const std::string & clientName) { |
367 | NAMENODE_HA_RETRY_BEGIN(); |
368 | namenode->releaseLease(src, clientName); |
369 | NAMENODE_HA_RETRY_END(); |
370 | } |
371 | |
372 | bool NamenodeProxy::deleteFile(const std::string & src, bool recursive) { |
373 | NAMENODE_HA_RETRY_BEGIN(); |
374 | return namenode->deleteFile(src, recursive); |
375 | NAMENODE_HA_RETRY_END(); |
376 | assert(!"should not reach here" ); |
377 | return false; |
378 | } |
379 | |
380 | bool NamenodeProxy::mkdirs(const std::string & src, const Permission & masked, |
381 | bool createParent) { |
382 | NAMENODE_HA_RETRY_BEGIN(); |
383 | return namenode->mkdirs(src, masked, createParent); |
384 | NAMENODE_HA_RETRY_END(); |
385 | assert(!"should not reach here" ); |
386 | return false; |
387 | } |
388 | |
389 | bool NamenodeProxy::getListing(const std::string & src, |
390 | const std::string & startAfter, bool needLocation, |
391 | std::vector<FileStatus> & dl) { |
392 | NAMENODE_HA_RETRY_BEGIN(); |
393 | return namenode->getListing(src, startAfter, needLocation, dl); |
394 | NAMENODE_HA_RETRY_END(); |
395 | assert(!"should not reach here" ); |
396 | return false; |
397 | } |
398 | |
399 | void NamenodeProxy::renewLease(const std::string & clientName) { |
400 | NAMENODE_HA_RETRY_BEGIN(); |
401 | namenode->renewLease(clientName); |
402 | NAMENODE_HA_RETRY_END(); |
403 | } |
404 | |
405 | /*bool NamenodeProxy::recoverLease(const std::string & src, |
406 | const std::string & clientName) { |
407 | NAMENODE_HA_RETRY_BEGIN(); |
408 | return namenode->recoverLease(src, clientName); |
409 | NAMENODE_HA_RETRY_END(); |
410 | assert(!"should not reach here"); |
411 | return false; |
412 | }*/ |
413 | |
414 | std::vector<int64_t> NamenodeProxy::getFsStats() { |
415 | NAMENODE_HA_RETRY_BEGIN(); |
416 | return namenode->getFsStats(); |
417 | NAMENODE_HA_RETRY_END(); |
418 | assert(!"should not reach here" ); |
419 | return std::vector<int64_t>(); |
420 | } |
421 | |
422 | /*void NamenodeProxy::metaSave(const std::string & filename) { |
423 | NAMENODE_HA_RETRY_BEGIN(); |
424 | namenode->metaSave(filename); |
425 | NAMENODE_HA_RETRY_END(); |
426 | }*/ |
427 | |
428 | FileStatus NamenodeProxy::getFileInfo(const std::string & src) { |
429 | NAMENODE_HA_RETRY_BEGIN(); |
430 | return namenode->getFileInfo(src); |
431 | NAMENODE_HA_RETRY_END(); |
432 | assert(!"should not reach here" ); |
433 | return FileStatus(); |
434 | } |
435 | |
436 | /*FileStatus NamenodeProxy::getFileLinkInfo(const std::string & src) { |
437 | NAMENODE_HA_RETRY_BEGIN(); |
438 | return namenode->getFileLinkInfo(src); |
439 | NAMENODE_HA_RETRY_END(); |
440 | assert(!"should not reach here"); |
441 | return FileStatus(); |
442 | }*/ |
443 | |
444 | /*ContentSummary NamenodeProxy::getContentSummary(const std::string & path) { |
445 | NAMENODE_HA_RETRY_BEGIN(); |
446 | return namenode->getContentSummary(path); |
447 | NAMENODE_HA_RETRY_END(); |
448 | assert(!"should not reach here"); |
449 | return ContentSummary(); |
450 | }*/ |
451 | |
452 | /*void NamenodeProxy::setQuota(const std::string & path, int64_t namespaceQuota, |
453 | int64_t diskspaceQuota) { |
454 | NAMENODE_HA_RETRY_BEGIN(); |
455 | namenode->setQuota(path, namespaceQuota, diskspaceQuota); |
456 | NAMENODE_HA_RETRY_END(); |
457 | }*/ |
458 | |
459 | void NamenodeProxy::fsync(const std::string & src, const std::string & client) { |
460 | NAMENODE_HA_RETRY_BEGIN(); |
461 | namenode->fsync(src, client); |
462 | NAMENODE_HA_RETRY_END(); |
463 | } |
464 | |
465 | void NamenodeProxy::setTimes(const std::string & src, int64_t mtime, |
466 | int64_t atime) { |
467 | NAMENODE_HA_RETRY_BEGIN(); |
468 | namenode->setTimes(src, mtime, atime); |
469 | NAMENODE_HA_RETRY_END(); |
470 | } |
471 | |
472 | /*void NamenodeProxy::createSymlink(const std::string & target, |
473 | const std::string & link, const Permission & dirPerm, |
474 | bool createParent) { |
475 | NAMENODE_HA_RETRY_BEGIN(); |
476 | namenode->createSymlink(target, link, dirPerm, createParent); |
477 | NAMENODE_HA_RETRY_END(); |
478 | }*/ |
479 | |
480 | /*std::string NamenodeProxy::getLinkTarget(const std::string & path) { |
481 | NAMENODE_HA_RETRY_BEGIN(); |
482 | return namenode->getLinkTarget(path); |
483 | NAMENODE_HA_RETRY_END(); |
484 | assert(!"should not reach here"); |
485 | return ""; |
486 | }*/ |
487 | |
488 | shared_ptr<LocatedBlock> NamenodeProxy::updateBlockForPipeline( |
489 | const ExtendedBlock & block, const std::string & clientName) { |
490 | NAMENODE_HA_RETRY_BEGIN(); |
491 | return namenode->updateBlockForPipeline(block, clientName); |
492 | NAMENODE_HA_RETRY_END(); |
493 | assert(!"should not reach here" ); |
494 | return shared_ptr<LocatedBlock>(); |
495 | } |
496 | |
497 | void NamenodeProxy::updatePipeline(const std::string & clientName, |
498 | const ExtendedBlock & oldBlock, const ExtendedBlock & newBlock, |
499 | const std::vector<DatanodeInfo> & newNodes, |
500 | const std::vector<std::string> & storageIDs) { |
501 | NAMENODE_HA_RETRY_BEGIN(); |
502 | namenode->updatePipeline(clientName, oldBlock, newBlock, newNodes, storageIDs); |
503 | NAMENODE_HA_RETRY_END(); |
504 | } |
505 | |
506 | Token NamenodeProxy::getDelegationToken(const std::string & renewer) { |
507 | NAMENODE_HA_RETRY_BEGIN(); |
508 | return namenode->getDelegationToken(renewer); |
509 | NAMENODE_HA_RETRY_END(); |
510 | assert(!"should not reach here" ); |
511 | return Token(); |
512 | } |
513 | |
514 | int64_t NamenodeProxy::renewDelegationToken(const Token & token) { |
515 | NAMENODE_HA_RETRY_BEGIN(); |
516 | return namenode->renewDelegationToken(token); |
517 | NAMENODE_HA_RETRY_END(); |
518 | assert(!"should not reach here" ); |
519 | return 0; |
520 | } |
521 | |
522 | void NamenodeProxy::cancelDelegationToken(const Token & token) { |
523 | NAMENODE_HA_RETRY_BEGIN(); |
524 | namenode->cancelDelegationToken(token); |
525 | NAMENODE_HA_RETRY_END(); |
526 | } |
527 | |
528 | void NamenodeProxy::close() { |
529 | lock_guard<mutex> lock(mut); |
530 | namenodes.clear(); |
531 | } |
532 | |
533 | } |
534 | } |
535 | |