1/********************************************************************
2 * Copyright (c) 2013 - 2014, Pivotal Inc.
3 * All rights reserved.
4 *
5 * Author: Zhanwei Wang
6 ********************************************************************/
7/********************************************************************
8 * 2014 -
9 * open source under Apache License Version 2.0
10 ********************************************************************/
11/**
12 * Licensed to the Apache Software Foundation (ASF) under one
13 * or more contributor license agreements. See the NOTICE file
14 * distributed with this work for additional information
15 * regarding copyright ownership. The ASF licenses this file
16 * to you under the Apache License, Version 2.0 (the
17 * "License"); you may not use this file except in compliance
18 * with the License. You may obtain a copy of the License at
19 *
20 * http://www.apache.org/licenses/LICENSE-2.0
21 *
22 * Unless required by applicable law or agreed to in writing, software
23 * distributed under the License is distributed on an "AS IS" BASIS,
24 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
25 * See the License for the specific language governing permissions and
26 * limitations under the License.
27 */
28#include "Exception.h"
29#include "ExceptionInternal.h"
30#include "Logger.h"
31#include "NamenodeImpl.h"
32#include "NamenodeProxy.h"
33#include "StringUtil.h"
34
35#include <string>
36
37#include <sys/fcntl.h>
38#include <sys/stat.h>
39#include <sys/types.h>
40#include <sys/file.h>
41
42namespace Hdfs {
43namespace Internal {
44
45static uint32_t GetInitNamenodeIndex(const std::string id) {
46 std::string path = "/tmp/";
47 path += id;
48 int fd;
49 uint32_t index = 0;
50 /*
51 * try create the file
52 */
53 fd = open(path.c_str(), O_WRONLY | O_CREAT | O_EXCL, 0666);
54
55 if (fd < 0) {
56 if (errno == EEXIST) {
57 /*
58 * the file already exist, try to open it
59 */
60 fd = open(path.c_str(), O_RDONLY);
61 } else {
62 /*
63 * failed to create, do not care why
64 */
65 return 0;
66 }
67 } else {
68 if (0 != flock(fd, LOCK_EX)) {
69 /*
70 * failed to lock
71 */
72 close(fd);
73 return index;
74 }
75
76 /*
77 * created file, initialize it with 0
78 */
79 if (write(fd, &index, sizeof(index)) < 0) {
80 LOG(WARNING,
81 "NamenodeProxy: Failed to write current Namenode index into "
82 "cache file.");
83 /*
84 * ignore the failure.
85 */
86 }
87
88 flock(fd, LOCK_UN);
89 close(fd);
90 return index;
91 }
92
93 /*
94 * the file exist, read it.
95 */
96 if (fd >= 0) {
97 if (0 != flock(fd, LOCK_SH)) {
98 /*
99 * failed to lock
100 */
101 close(fd);
102 return index;
103 }
104
105 if (sizeof(index) != read(fd, &index, sizeof(index))) {
106 /*
107 * failed to read, do not care why
108 */
109 index = 0;
110 }
111
112 flock(fd, LOCK_UN);
113 close(fd);
114 }
115
116 return index;
117}
118
119static void SetInitNamenodeIndex(const std::string & id, uint32_t index) {
120 std::string path = "/tmp/";
121 path += id;
122 int fd;
123 /*
124 * try open the file for write
125 */
126 fd = open(path.c_str(), O_WRONLY);
127
128 if (fd > 0) {
129 if (0 != flock(fd, LOCK_EX)) {
130 /*
131 * failed to lock
132 */
133 close(fd);
134 return;
135 }
136
137 if (write(fd, &index, sizeof(index)) < 0) {
138 LOG(WARNING,
139 "NamenodeProxy: Failed to write current Namenode index into "
140 "cache file.");
141 /*
142 * ignore the failure.
143 */
144 }
145 flock(fd, LOCK_UN);
146 close(fd);
147 }
148}
149
150NamenodeProxy::NamenodeProxy(const std::vector<NamenodeInfo> & namenodeInfos, const std::string & tokenService,
151 const SessionConfig & c, const RpcAuth & a) :
152 clusterid(tokenService), currentNamenode(0) {
153 if (namenodeInfos.size() == 1) {
154 enableNamenodeHA = false;
155 maxNamenodeHARetry = 0;
156 } else {
157 enableNamenodeHA = true;
158 maxNamenodeHARetry = c.getRpcMaxHaRetry();
159 }
160
161 for (size_t i = 0; i < namenodeInfos.size(); ++i) {
162 std::vector<std::string> nninfo = StringSplit(namenodeInfos[i].getRpcAddr(), ":");
163
164 if (nninfo.size() != 2) {
165 THROW(InvalidParameter, "Cannot create namenode proxy, %s does not contain host or port",
166 namenodeInfos[i].getRpcAddr().c_str());
167 }
168
169 namenodes.push_back(
170 shared_ptr<Namenode>(
171 new NamenodeImpl(nninfo[0].c_str(), nninfo[1].c_str(), clusterid, c, a)));
172 }
173
174 if (enableNamenodeHA) {
175 currentNamenode = GetInitNamenodeIndex(clusterid) % namenodeInfos.size();
176 }
177}
178
179NamenodeProxy::~NamenodeProxy() {
180}
181
182shared_ptr<Namenode> NamenodeProxy::getActiveNamenode(uint32_t & oldValue) {
183 lock_guard<mutex> lock(mut);
184
185 if (namenodes.empty()) {
186 THROW(HdfsFileSystemClosed, "NamenodeProxy is closed.");
187 }
188
189 oldValue = currentNamenode;
190 return namenodes[currentNamenode % namenodes.size()];
191}
192
193void NamenodeProxy::failoverToNextNamenode(uint32_t oldValue) {
194 lock_guard<mutex> lock(mut);
195
196 if (oldValue != currentNamenode) {
197 //already failover in another thread.
198 return;
199 }
200
201 ++currentNamenode;
202 currentNamenode = currentNamenode % namenodes.size();
203 SetInitNamenodeIndex(clusterid, currentNamenode);
204}
205
206static void HandleHdfsFailoverException(const HdfsFailoverException & e) {
207 try {
208 Hdfs::rethrow_if_nested(e);
209 } catch (...) {
210 NESTED_THROW(Hdfs::HdfsRpcException, "%s", e.what());
211 }
212
213 //should not reach here
214 abort();
215}
216
/*
 * NAMENODE_HA_RETRY_BEGIN() / NAMENODE_HA_RETRY_END() bracket one call made
 * through the local variable `namenode` (the active namenode obtained from
 * getActiveNamenode()).
 *
 * - When the call completes normally (or the bracketed code returns), the
 *   loop is left.
 * - On NameNodeStandbyException or HdfsFailoverException, if HA is enabled
 *   and the retry budget is not exhausted, the proxy fails over to the next
 *   namenode and the call is repeated.
 * - Otherwise a NameNodeStandbyException is rethrown unchanged, and an
 *   HdfsFailoverException is converted by HandleHdfsFailoverException().
 *
 * The budget check uses a post-increment compared with "> maxNamenodeHARetry",
 * so up to maxNamenodeHARetry + 1 failovers happen before giving up.
 *
 * The helper locals are named nnHaRetryCount / nnHaOldValue. Names that
 * contain a double underscore are reserved for the implementation.
 * Comments must stay outside the macro bodies: a "//" comment would swallow
 * the line-continuation backslash.
 */
#define NAMENODE_HA_RETRY_BEGIN() \
    do { \
        int nnHaRetryCount = 0; \
        do { \
            uint32_t nnHaOldValue = 0; \
            shared_ptr<Namenode> namenode = getActiveNamenode(nnHaOldValue); \
            try { \
                (void)0

#define NAMENODE_HA_RETRY_END() \
                break; \
            } catch (const NameNodeStandbyException &) { \
                if (!enableNamenodeHA || nnHaRetryCount++ > maxNamenodeHARetry) { \
                    throw; \
                } \
            } catch (const HdfsFailoverException & e) { \
                if (!enableNamenodeHA || nnHaRetryCount++ > maxNamenodeHARetry) { \
                    HandleHdfsFailoverException(e); \
                } \
            } \
            failoverToNextNamenode(nnHaOldValue); \
            LOG(WARNING, "NamenodeProxy: Failover to another Namenode."); \
        } while (true); \
    } while (0)
241
242void NamenodeProxy::getBlockLocations(const std::string & src, int64_t offset,
243 int64_t length, LocatedBlocks & lbs) {
244 NAMENODE_HA_RETRY_BEGIN();
245 namenode->getBlockLocations(src, offset, length, lbs);
246 NAMENODE_HA_RETRY_END();
247}
248
249void NamenodeProxy::create(const std::string & src, const Permission & masked,
250 const std::string & clientName, int flag, bool createParent,
251 short replication, int64_t blockSize) {
252 NAMENODE_HA_RETRY_BEGIN();
253 namenode->create(src, masked, clientName, flag, createParent, replication, blockSize);
254 NAMENODE_HA_RETRY_END();
255}
256
257std::pair<shared_ptr<LocatedBlock>, shared_ptr<FileStatus> >
258NamenodeProxy::append(const std::string& src, const std::string& clientName) {
259 NAMENODE_HA_RETRY_BEGIN();
260 return namenode->append(src, clientName);
261 NAMENODE_HA_RETRY_END();
262 assert(!"should not reach here");
263 return std::pair<shared_ptr<LocatedBlock>, shared_ptr<FileStatus> >();
264}
265
266bool NamenodeProxy::setReplication(const std::string & src, short replication) {
267 NAMENODE_HA_RETRY_BEGIN();
268 return namenode->setReplication(src, replication);
269 NAMENODE_HA_RETRY_END();
270 assert(!"should not reach here");
271 return false;
272}
273
274void NamenodeProxy::setPermission(const std::string & src,
275 const Permission & permission) {
276 NAMENODE_HA_RETRY_BEGIN();
277 namenode->setPermission(src, permission);
278 NAMENODE_HA_RETRY_END();
279}
280
281void NamenodeProxy::setOwner(const std::string & src,
282 const std::string & username, const std::string & groupname) {
283 NAMENODE_HA_RETRY_BEGIN();
284 namenode->setOwner(src, username, groupname);
285 NAMENODE_HA_RETRY_END();
286}
287
288void NamenodeProxy::abandonBlock(const ExtendedBlock & b,
289 const std::string & src, const std::string & holder) {
290 NAMENODE_HA_RETRY_BEGIN();
291 namenode->abandonBlock(b, src, holder);
292 NAMENODE_HA_RETRY_END();
293}
294
295shared_ptr<LocatedBlock> NamenodeProxy::addBlock(const std::string & src,
296 const std::string & clientName, const ExtendedBlock * previous,
297 const std::vector<DatanodeInfo> & excludeNodes) {
298 NAMENODE_HA_RETRY_BEGIN();
299 return namenode->addBlock(src, clientName, previous, excludeNodes);
300 NAMENODE_HA_RETRY_END();
301 assert(!"should not reach here");
302 return shared_ptr<LocatedBlock>();
303}
304
305shared_ptr<LocatedBlock> NamenodeProxy::getAdditionalDatanode(
306 const std::string & src, const ExtendedBlock & blk,
307 const std::vector<DatanodeInfo> & existings,
308 const std::vector<std::string> & storageIDs,
309 const std::vector<DatanodeInfo> & excludes, int numAdditionalNodes,
310 const std::string & clientName) {
311 NAMENODE_HA_RETRY_BEGIN();
312 return namenode->getAdditionalDatanode(src, blk, existings,
313 storageIDs, excludes, numAdditionalNodes, clientName);
314 NAMENODE_HA_RETRY_END();
315 assert(!"should not reach here");
316 return shared_ptr<LocatedBlock>();
317}
318
319bool NamenodeProxy::complete(const std::string & src,
320 const std::string & clientName, const ExtendedBlock * last) {
321 NAMENODE_HA_RETRY_BEGIN();
322 return namenode->complete(src, clientName, last);
323 NAMENODE_HA_RETRY_END();
324 assert(!"should not reach here");
325 return false;
326}
327
328/*void NamenodeProxy::reportBadBlocks(const std::vector<LocatedBlock> & blocks) {
329 NAMENODE_HA_RETRY_BEGIN();
330 namenode->reportBadBlocks(blocks);
331 NAMENODE_HA_RETRY_END();
332}*/
333
334bool NamenodeProxy::rename(const std::string & src, const std::string & dst) {
335 NAMENODE_HA_RETRY_BEGIN();
336 return namenode->rename(src, dst);
337 NAMENODE_HA_RETRY_END();
338 assert(!"should not reach here");
339 return false;
340}
341
342/*
343void NamenodeProxy::concat(const std::string & trg,
344 const std::vector<std::string> & srcs) {
345 NAMENODE_HA_RETRY_BEGIN();
346 namenode->concat(trg, srcs);
347 NAMENODE_HA_RETRY_END();
348}
349*/
350
351bool NamenodeProxy::truncate(const std::string & src, int64_t size,
352 const std::string & clientName) {
353 NAMENODE_HA_RETRY_BEGIN();
354 return namenode->truncate(src, size, clientName);
355 NAMENODE_HA_RETRY_END();
356}
357
358void NamenodeProxy::getLease(const std::string & src,
359 const std::string & clientName) {
360 NAMENODE_HA_RETRY_BEGIN();
361 namenode->getLease(src, clientName);
362 NAMENODE_HA_RETRY_END();
363}
364
365void NamenodeProxy::releaseLease(const std::string & src,
366 const std::string & clientName) {
367 NAMENODE_HA_RETRY_BEGIN();
368 namenode->releaseLease(src, clientName);
369 NAMENODE_HA_RETRY_END();
370}
371
372bool NamenodeProxy::deleteFile(const std::string & src, bool recursive) {
373 NAMENODE_HA_RETRY_BEGIN();
374 return namenode->deleteFile(src, recursive);
375 NAMENODE_HA_RETRY_END();
376 assert(!"should not reach here");
377 return false;
378}
379
380bool NamenodeProxy::mkdirs(const std::string & src, const Permission & masked,
381 bool createParent) {
382 NAMENODE_HA_RETRY_BEGIN();
383 return namenode->mkdirs(src, masked, createParent);
384 NAMENODE_HA_RETRY_END();
385 assert(!"should not reach here");
386 return false;
387}
388
389bool NamenodeProxy::getListing(const std::string & src,
390 const std::string & startAfter, bool needLocation,
391 std::vector<FileStatus> & dl) {
392 NAMENODE_HA_RETRY_BEGIN();
393 return namenode->getListing(src, startAfter, needLocation, dl);
394 NAMENODE_HA_RETRY_END();
395 assert(!"should not reach here");
396 return false;
397}
398
399void NamenodeProxy::renewLease(const std::string & clientName) {
400 NAMENODE_HA_RETRY_BEGIN();
401 namenode->renewLease(clientName);
402 NAMENODE_HA_RETRY_END();
403}
404
405/*bool NamenodeProxy::recoverLease(const std::string & src,
406 const std::string & clientName) {
407 NAMENODE_HA_RETRY_BEGIN();
408 return namenode->recoverLease(src, clientName);
409 NAMENODE_HA_RETRY_END();
410 assert(!"should not reach here");
411 return false;
412}*/
413
414std::vector<int64_t> NamenodeProxy::getFsStats() {
415 NAMENODE_HA_RETRY_BEGIN();
416 return namenode->getFsStats();
417 NAMENODE_HA_RETRY_END();
418 assert(!"should not reach here");
419 return std::vector<int64_t>();
420}
421
422/*void NamenodeProxy::metaSave(const std::string & filename) {
423 NAMENODE_HA_RETRY_BEGIN();
424 namenode->metaSave(filename);
425 NAMENODE_HA_RETRY_END();
426}*/
427
428FileStatus NamenodeProxy::getFileInfo(const std::string & src) {
429 NAMENODE_HA_RETRY_BEGIN();
430 return namenode->getFileInfo(src);
431 NAMENODE_HA_RETRY_END();
432 assert(!"should not reach here");
433 return FileStatus();
434}
435
436/*FileStatus NamenodeProxy::getFileLinkInfo(const std::string & src) {
437 NAMENODE_HA_RETRY_BEGIN();
438 return namenode->getFileLinkInfo(src);
439 NAMENODE_HA_RETRY_END();
440 assert(!"should not reach here");
441 return FileStatus();
442}*/
443
444/*ContentSummary NamenodeProxy::getContentSummary(const std::string & path) {
445 NAMENODE_HA_RETRY_BEGIN();
446 return namenode->getContentSummary(path);
447 NAMENODE_HA_RETRY_END();
448 assert(!"should not reach here");
449 return ContentSummary();
450}*/
451
452/*void NamenodeProxy::setQuota(const std::string & path, int64_t namespaceQuota,
453 int64_t diskspaceQuota) {
454 NAMENODE_HA_RETRY_BEGIN();
455 namenode->setQuota(path, namespaceQuota, diskspaceQuota);
456 NAMENODE_HA_RETRY_END();
457}*/
458
459void NamenodeProxy::fsync(const std::string & src, const std::string & client) {
460 NAMENODE_HA_RETRY_BEGIN();
461 namenode->fsync(src, client);
462 NAMENODE_HA_RETRY_END();
463}
464
465void NamenodeProxy::setTimes(const std::string & src, int64_t mtime,
466 int64_t atime) {
467 NAMENODE_HA_RETRY_BEGIN();
468 namenode->setTimes(src, mtime, atime);
469 NAMENODE_HA_RETRY_END();
470}
471
472/*void NamenodeProxy::createSymlink(const std::string & target,
473 const std::string & link, const Permission & dirPerm,
474 bool createParent) {
475 NAMENODE_HA_RETRY_BEGIN();
476 namenode->createSymlink(target, link, dirPerm, createParent);
477 NAMENODE_HA_RETRY_END();
478}*/
479
480/*std::string NamenodeProxy::getLinkTarget(const std::string & path) {
481 NAMENODE_HA_RETRY_BEGIN();
482 return namenode->getLinkTarget(path);
483 NAMENODE_HA_RETRY_END();
484 assert(!"should not reach here");
485 return "";
486}*/
487
488shared_ptr<LocatedBlock> NamenodeProxy::updateBlockForPipeline(
489 const ExtendedBlock & block, const std::string & clientName) {
490 NAMENODE_HA_RETRY_BEGIN();
491 return namenode->updateBlockForPipeline(block, clientName);
492 NAMENODE_HA_RETRY_END();
493 assert(!"should not reach here");
494 return shared_ptr<LocatedBlock>();
495}
496
497void NamenodeProxy::updatePipeline(const std::string & clientName,
498 const ExtendedBlock & oldBlock, const ExtendedBlock & newBlock,
499 const std::vector<DatanodeInfo> & newNodes,
500 const std::vector<std::string> & storageIDs) {
501 NAMENODE_HA_RETRY_BEGIN();
502 namenode->updatePipeline(clientName, oldBlock, newBlock, newNodes, storageIDs);
503 NAMENODE_HA_RETRY_END();
504}
505
506Token NamenodeProxy::getDelegationToken(const std::string & renewer) {
507 NAMENODE_HA_RETRY_BEGIN();
508 return namenode->getDelegationToken(renewer);
509 NAMENODE_HA_RETRY_END();
510 assert(!"should not reach here");
511 return Token();
512}
513
514int64_t NamenodeProxy::renewDelegationToken(const Token & token) {
515 NAMENODE_HA_RETRY_BEGIN();
516 return namenode->renewDelegationToken(token);
517 NAMENODE_HA_RETRY_END();
518 assert(!"should not reach here");
519 return 0;
520}
521
522void NamenodeProxy::cancelDelegationToken(const Token & token) {
523 NAMENODE_HA_RETRY_BEGIN();
524 namenode->cancelDelegationToken(token);
525 NAMENODE_HA_RETRY_END();
526}
527
528void NamenodeProxy::close() {
529 lock_guard<mutex> lock(mut);
530 namenodes.clear();
531}
532
533}
534}
535