1/********************************************************************
2 * Copyright (c) 2013 - 2014, Pivotal Inc.
3 * All rights reserved.
4 *
5 * Author: Zhanwei Wang
6 ********************************************************************/
7/********************************************************************
8 * 2014 -
9 * open source under Apache License Version 2.0
10 ********************************************************************/
11/**
12 * Licensed to the Apache Software Foundation (ASF) under one
13 * or more contributor license agreements. See the NOTICE file
14 * distributed with this work for additional information
15 * regarding copyright ownership. The ASF licenses this file
16 * to you under the Apache License, Version 2.0 (the
17 * "License"); you may not use this file except in compliance
18 * with the License. You may obtain a copy of the License at
19 *
20 * http://www.apache.org/licenses/LICENSE-2.0
21 *
22 * Unless required by applicable law or agreed to in writing, software
23 * distributed under the License is distributed on an "AS IS" BASIS,
24 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
25 * See the License for the specific language governing permissions and
26 * limitations under the License.
27 */
28#include "DateTime.h"
29#include "Exception.h"
30#include "ExceptionInternal.h"
31#include "FileSystemInter.h"
32#include "LeaseRenewer.h"
33#include "Logger.h"
34
35#define DEFAULT_LEASE_RENEW_INTERVAL (60 * 1000)
36
37namespace Hdfs {
38namespace Internal {
39
40once_flag LeaseRenewer::once;
41shared_ptr<LeaseRenewer> LeaseRenewer::renewer;
42
43LeaseRenewer & LeaseRenewer::GetLeaseRenewer() {
44 call_once(once, &LeaseRenewer::CreateSinglten);
45 assert(renewer);
46 return *renewer;
47}
48
49void LeaseRenewer::CreateSinglten() {
50 renewer = shared_ptr < LeaseRenewer > (new LeaseRenewerImpl());
51}
52
53LeaseRenewerImpl::LeaseRenewerImpl() :
54 stop(true), interval(DEFAULT_LEASE_RENEW_INTERVAL) {
55}
56
57LeaseRenewerImpl::~LeaseRenewerImpl() {
58 stop = true;
59 cond.notify_all();
60
61 if (worker.joinable()) {
62 worker.join();
63 }
64}
65
66int LeaseRenewerImpl::getInterval() const {
67 return interval;
68}
69
70void LeaseRenewerImpl::setInterval(int interval) {
71 this->interval = interval;
72}
73
74void LeaseRenewerImpl::StartRenew(shared_ptr<FileSystemInter> filesystem) {
75 lock_guard<mutex> lock(mut);
76 const char * clientName = filesystem->getClientName();
77
78 if (maps.find(clientName) == maps.end()) {
79 maps[clientName] = filesystem;
80 }
81
82 filesystem->registerOpenedOutputStream();
83
84 if (stop && !maps.empty()) {
85 if (worker.joinable()) {
86 worker.join();
87 }
88
89 stop = false;
90 CREATE_THREAD(worker, bind(&LeaseRenewerImpl::renewer, this));
91 }
92}
93
94void LeaseRenewerImpl::StopRenew(shared_ptr<FileSystemInter> filesystem) {
95 lock_guard<mutex> lock(mut);
96 const char * clientName = filesystem->getClientName();
97
98 if (filesystem->unregisterOpenedOutputStream()
99 && maps.find(clientName) != maps.end()) {
100 maps.erase(clientName);
101 }
102}
103
104void LeaseRenewerImpl::renewer() {
105 assert(stop == false);
106
107 while (!stop) {
108 try {
109 unique_lock < mutex > lock(mut);
110 cond.wait_for(lock, milliseconds(interval));
111
112 if (stop || maps.empty()) {
113 break;
114 }
115
116 std::map<std::string, shared_ptr<FileSystemInter> >::iterator s, e, d;
117 e = maps.end();
118
119 for (s = maps.begin(); s != e;) {
120 shared_ptr<FileSystemInter> fs = s->second;
121
122 try {
123 if (!fs->renewLease()) {
124 d = s++;
125 maps.erase(d);
126 } else {
127 ++s;
128 }
129
130 continue;
131 } catch (const HdfsException & e) {
132 std::string buffer;
133 LOG(LOG_ERROR,
134 "Failed to renew lease for filesystem which client name is %s, since:\n%s",
135 fs->getClientName(), GetExceptionDetail(e, buffer));
136 } catch (const std::exception & e) {
137 LOG(LOG_ERROR,
138 "Failed to renew lease for filesystem which client name is %s, since:\n%s",
139 fs->getClientName(), e.what());
140 break;
141 }
142
143 ++s;
144 }
145
146 if (maps.empty()) {
147 break;
148 }
149 } catch (const std::bad_alloc & e) {
150 /*
151 * keep quiet if we run out of memory, since writing log needs memory,
152 * that may cause the process terminated.
153 */
154 break;
155 } catch (const std::exception & e) {
156 LOG(LOG_ERROR,
157 "Lease renewer will exit since unexpected exception: %s",
158 e.what());
159 break;
160 }
161 }
162
163 stop = true;
164}
165
166}
167}
168