/********************************************************************
 * Copyright (c) 2013 - 2014, Pivotal Inc.
 * All rights reserved.
 *
 * Author: Zhanwei Wang
 ********************************************************************/
/********************************************************************
 * 2014 -
 * open source under Apache License Version 2.0
 ********************************************************************/
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
#include "DateTime.h"
#include "Pipeline.h"
#include "Logger.h"
#include "Exception.h"
#include "ExceptionInternal.h"
#include "OutputStreamInter.h"
#include "FileSystemInter.h"
#include "DataTransferProtocolSender.h"
#include "datatransfer.pb.h"

#include <inttypes.h>

namespace Hdfs {
namespace Internal {

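/*
 * Build a write pipeline for the target file. For append, reuse the
 * datanodes of the existing last block and run the append/recovery setup;
 * for a new file, ask the namenode for a new block and its datanodes.
 *
 * A minimal usage sketch (assuming an output-stream-side caller that owns
 * the config, filesystem, packet pool, and packets; names here are
 * illustrative only):
 *
 *   PipelineImpl pipeline(false, "/user/foo/bar", conf, fs, checksumType,
 *                         chunkSize, replication, 0, pool,
 *                         shared_ptr<LocatedBlock>());
 *   pipeline.send(packet);                               // stream packets
 *   shared_ptr<LocatedBlock> b = pipeline.close(last);   // finalize block
 */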
PipelineImpl::PipelineImpl(bool append, const char * path, const SessionConfig & conf,
                           shared_ptr<FileSystemInter> filesystem, int checksumType, int chunkSize,
                           int replication, int64_t bytesSent, PacketPool & packetPool, shared_ptr<LocatedBlock> lastBlock) :
    checksumType(checksumType), chunkSize(chunkSize), errorIndex(-1), replication(replication), bytesAcked(
        bytesSent), bytesSent(bytesSent), packetPool(packetPool), filesystem(filesystem), lastBlock(lastBlock), path(
            path) {
    canAddDatanode = conf.canAddDatanode();
    blockWriteRetry = conf.getBlockWriteRetry();
    connectTimeout = conf.getOutputConnTimeout();
    readTimeout = conf.getOutputReadTimeout();
    writeTimeout = conf.getOutputWriteTimeout();
    clientName = filesystem->getClientName();

    if (append) {
        assert(lastBlock);
        LOG(DEBUG2, "create pipeline for file %s to append to %s at position %" PRId64,
            path, lastBlock->toString().c_str(), lastBlock->getNumBytes());
        stage = PIPELINE_SETUP_APPEND;
        nodes = lastBlock->getLocations();
        storageIDs = lastBlock->getStorageIDs();
        buildForAppendOrRecovery(false);
        stage = DATA_STREAMING;
    } else {
        LOG(DEBUG2, "create pipeline for file %s to write to a new block", path);
        stage = PIPELINE_SETUP_CREATE;
        buildForNewBlock();
        stage = DATA_STREAMING;
    }
}

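/*
 * Locate the datanode that was just added to the pipeline: the one entry
 * in the updated node list that does not appear in the original list.
 */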
int PipelineImpl::findNewDatanode(const std::vector<DatanodeInfo> & original) {
    if (nodes.size() != original.size() + 1) {
        THROW(HdfsIOException, "Failed to acquire a datanode for block %s from namenode.",
              lastBlock->toString().c_str());
    }

    for (size_t i = 0; i < nodes.size(); i++) {
        size_t j = 0;

        for (; j < original.size() && !(nodes[i] == original[j]); j++)
            ;

        if (j == original.size()) {
            return i;
        }
    }

    THROW(HdfsIOException, "Cannot add new datanode for block %s.", lastBlock->toString().c_str());
}

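/*
 * Ask the source datanode to copy the block replica to the target
 * datanodes, so that a newly added node can catch up with the pipeline.
 */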
void PipelineImpl::transfer(const ExtendedBlock & blk, const DatanodeInfo & src,
                            const std::vector<DatanodeInfo> & targets, const Token & token) {
    shared_ptr<Socket> so(new TcpSocketImpl);
    shared_ptr<BufferedSocketReader> in(new BufferedSocketReaderImpl(*so));
    so->connect(src.getIpAddr().c_str(), src.getXferPort(), connectTimeout);
    DataTransferProtocolSender sender(*so, writeTimeout, src.formatAddress());
    sender.transferBlock(blk, token, clientName.c_str(), targets);
    int size = in->readVarint32(readTimeout);
    std::vector<char> buf(size);
    in->readFully(&buf[0], size, readTimeout);
    BlockOpResponseProto resp;

    if (!resp.ParseFromArray(&buf[0], size)) {
        THROW(HdfsIOException, "cannot parse datanode response from %s for block %s.",
              src.formatAddress().c_str(), lastBlock->toString().c_str());
    }

    if (Status::DT_PROTO_SUCCESS != resp.status()) {
        THROW(HdfsIOException, "Failed to transfer block to a new datanode %s for block %s.",
              targets[0].formatAddress().c_str(),
              lastBlock->toString().c_str());
    }
}

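/*
 * Replace a failed datanode: ask the namenode for an additional datanode,
 * then replicate the block from a surviving node to the new one.
 * Returns true on success, false if a new node could not be added.
 */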
bool PipelineImpl::addDatanodeToPipeline(const std::vector<DatanodeInfo> & excludedNodes) {
    try {
        /*
         * get a new datanode
         */
        std::vector<DatanodeInfo> original = nodes;
        shared_ptr<LocatedBlock> lb;
        lb = filesystem->getAdditionalDatanode(path, *lastBlock, nodes, storageIDs,
                                               excludedNodes, 1);
        nodes = lb->getLocations();
        storageIDs = lb->getStorageIDs();

        /*
         * failed to add a new datanode into the pipeline.
         */
        if (original.size() == nodes.size()) {
            LOG(LOG_ERROR,
                "Failed to add new datanode into pipeline for block: %s file %s.",
                lastBlock->toString().c_str(), path.c_str());
        } else {
            /*
             * find the new datanode
             */
            int d = findNewDatanode(original);
            /*
             * in case the block transfer fails.
             */
            errorIndex = d;
            /*
             * transfer replica
             */
            DatanodeInfo & src = d == 0 ? nodes[1] : nodes[d - 1];
            std::vector<DatanodeInfo> targets;
            targets.push_back(nodes[d]);
            LOG(INFO, "Replicate block %s from %s to %s for file %s.", lastBlock->toString().c_str(),
                src.formatAddress().c_str(), targets[0].formatAddress().c_str(), path.c_str());
            transfer(*lastBlock, src, targets, lb->getToken());
            errorIndex = -1;
            return true;
        }
    } catch (const HdfsCanceled & e) {
        throw;
    } catch (const HdfsFileSystemClosed & e) {
        throw;
    } catch (const SafeModeException & e) {
        throw;
    } catch (const HdfsException & e) {
        std::string buffer;
        LOG(LOG_ERROR,
            "Failed to add a new datanode into pipeline for block: %s file %s.\n%s",
            lastBlock->toString().c_str(), path.c_str(), GetExceptionDetail(e, buffer));
    }

    return false;
}

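/*
 * Warn when the pipeline has fewer datanodes than the requested
 * replication factor; the write still proceeds with the nodes available.
 */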
void PipelineImpl::checkPipelineWithReplicas() {
    if (static_cast<int>(nodes.size()) < replication) {
        std::stringstream ss;
        ss.imbue(std::locale::classic());
        int size = nodes.size();

        for (int i = 0; i < size - 1; ++i) {
            ss << nodes[i].formatAddress() << ", ";
        }

        if (nodes.empty()) {
            ss << "Empty";
        } else {
            ss << nodes.back().formatAddress();
        }

        LOG(WARNING,
            "The number of nodes in the pipeline is %d [%s], which is less than the expected number of replicas %d for block %s file %s",
            static_cast<int>(nodes.size()), ss.str().c_str(), replication,
            lastBlock->toString().c_str(), path.c_str());
    }
}

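/*
 * (Re)build the pipeline for append or for recovery after a write error:
 * drop the bad datanode, optionally add a replacement, bump the block's
 * generation stamp, re-open the block output stream, and finally commit
 * the new pipeline to the namenode via updatePipeline.
 */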
void PipelineImpl::buildForAppendOrRecovery(bool recovery) {
    int64_t gs = 0;
    int retry = blockWriteRetry;
    exception_ptr lastException;
    std::vector<DatanodeInfo> excludedNodes;
    shared_ptr<LocatedBlock> lb;
    std::string buffer;

    do {
        /*
         * Remove the bad datanode from the list of datanodes.
         * If errorIndex was not set (i.e. for appends), do not remove
         * any datanodes.
         */
        if (errorIndex >= 0) {
            assert(lastBlock);
            LOG(LOG_ERROR, "Pipeline: node %s is invalid and removed from pipeline when %s block %s for file %s, stage = %s.",
                nodes[errorIndex].formatAddress().c_str(),
                (recovery ? "recovering" : "appending to"), lastBlock->toString().c_str(),
                path.c_str(), StageToString(stage));
            excludedNodes.push_back(nodes[errorIndex]);
            nodes.erase(nodes.begin() + errorIndex);

            if (!storageIDs.empty()) {
                storageIDs.erase(storageIDs.begin() + errorIndex);
            }

            if (nodes.empty()) {
                THROW(HdfsIOException,
                      "Failed to build pipeline to %s block %s: all datanodes are bad.",
                      (recovery ? "recover" : "append to"), lastBlock->toString().c_str());
            }

            errorIndex = -1;
        }

        try {
            gs = 0;

            /*
             * Check whether the number of datanodes in the pipeline satisfies
             * the replication requirement; add a new datanode if it does not.
             */
            if (stage != PIPELINE_SETUP_CREATE && stage != PIPELINE_CLOSE
                    && static_cast<int>(nodes.size()) < replication && canAddDatanode) {
                if (!addDatanodeToPipeline(excludedNodes)) {
                    THROW(HdfsIOException,
                          "Failed to add new datanode into pipeline for block: %s file %s, "
                          "set \"output.replace-datanode-on-failure\" to \"false\" to disable this feature.",
                          lastBlock->toString().c_str(), path.c_str());
                }
            }

            if (errorIndex >= 0) {
                continue;
            }

            checkPipelineWithReplicas();
            /*
             * Update generation stamp and access token
             */
            lb = filesystem->updateBlockForPipeline(*lastBlock);
            gs = lb->getGenerationStamp();
            /*
             * Try to build pipeline
             */
            createBlockOutputStream(lb->getToken(), gs, recovery);
            /*
             * everything is ok, reset errorIndex.
             */
            errorIndex = -1;
            lastException = exception_ptr();
            break; //break on success
        } catch (const HdfsInvalidBlockToken & e) {
            lastException = current_exception();
            recovery = true;
            LOG(LOG_ERROR,
                "Pipeline: Failed to build pipeline for block %s file %s, new generation stamp is %" PRId64 ",\n%s",
                lastBlock->toString().c_str(), path.c_str(), gs, GetExceptionDetail(e, buffer));
            LOG(INFO, "Trying to recover the pipeline for block %s file %s.",
                lastBlock->toString().c_str(), path.c_str());
        } catch (const HdfsTimeoutException & e) {
            lastException = current_exception();
            recovery = true;
            LOG(LOG_ERROR,
                "Pipeline: Failed to build pipeline for block %s file %s, new generation stamp is %" PRId64 ",\n%s",
                lastBlock->toString().c_str(), path.c_str(), gs, GetExceptionDetail(e, buffer));
            LOG(INFO, "Trying to recover the pipeline for block %s file %s.",
                lastBlock->toString().c_str(), path.c_str());
        } catch (const HdfsIOException & e) {
            lastException = current_exception();
            /*
             * Set the recovery flag in case we failed to create a pipeline for appending.
             */
            recovery = true;
            LOG(LOG_ERROR,
                "Pipeline: Failed to build pipeline for block %s file %s, new generation stamp is %" PRId64 ",\n%s",
                lastBlock->toString().c_str(), path.c_str(), gs, GetExceptionDetail(e, buffer));
            LOG(INFO, "Trying to recover the pipeline for block %s file %s.", lastBlock->toString().c_str(), path.c_str());
        }

        /*
         * We don't know what happened: no datanode reported a failure, so reduce
         * the retry count to avoid an infinite loop. This can happen when an RPC
         * call throws HdfsIOException.
         */
        if (errorIndex < 0) {
            --retry;
        }
    } while (retry > 0);

    if (lastException) {
        rethrow_exception(lastException);
    }

    /*
     * Update the pipeline at the namenode; this is a non-idempotent RPC call.
     */
    lb->setPoolId(lastBlock->getPoolId());
    lb->setBlockId(lastBlock->getBlockId());
    lb->setLocations(nodes);
    lb->setStorageIDs(storageIDs);
    lb->setNumBytes(lastBlock->getNumBytes());
    lb->setOffset(lastBlock->getOffset());
    filesystem->updatePipeline(*lastBlock, *lb, nodes, storageIDs);
    lastBlock = lb;
}

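/*
 * Ask the namenode for the next block of the file. If the namenode answers
 * with NotReplicatedYetException (the previous block has not yet reached
 * its minimal replication), retry with exponential backoff, starting at
 * 100 milliseconds and capped at 5 seconds.
 */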
void PipelineImpl::locateNextBlock(
    const std::vector<DatanodeInfo> & excludedNodes) {
    milliseconds sleeptime(100);
    milliseconds fiveSeconds(5000);
    int retry = blockWriteRetry;

    while (true) {
        try {
            lastBlock = filesystem->addBlock(path, lastBlock.get(),
                                             excludedNodes);
            assert(lastBlock);
            return;
        } catch (const NotReplicatedYetException & e) {
            LOG(DEBUG1, "Got NotReplicatedYetException when trying to addBlock for block %s, "
                "already retried %d times, max retry %d times", lastBlock->toString().c_str(),
                blockWriteRetry - retry, blockWriteRetry);

            if (retry--) {
                try {
                    sleep_for(sleeptime);
                } catch (...) {
                }

                sleeptime *= 2;
                sleeptime = sleeptime < fiveSeconds ? sleeptime : fiveSeconds;
            } else {
                throw;
            }
        }
    }
}

static std::string FormatExcludedNodes(
    const std::vector<DatanodeInfo> & excludedNodes) {
    std::stringstream ss;
    ss.imbue(std::locale::classic());
    ss << "[";
    int size = excludedNodes.size();

    for (int i = 0; i < size - 1; ++i) {
        ss << excludedNodes[i].formatAddress() << ", ";
    }

    if (excludedNodes.empty()) {
        ss << "Empty";
    } else {
        ss << excludedNodes.back().formatAddress();
    }

    ss << "]";
    return ss.str();
}

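/*
 * Set up the pipeline for a brand new block: allocate the block at the
 * namenode and connect to the first datanode. On failure, abandon the
 * block, exclude the bad datanode, and retry with a fresh allocation.
 */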
void PipelineImpl::buildForNewBlock() {
    int retryAllocNewBlock = 0, retry = blockWriteRetry;
    std::vector<DatanodeInfo> excludedNodes;
    shared_ptr<LocatedBlock> block = lastBlock;
    std::string buffer;

    do {
        errorIndex = -1;
        lastBlock = block;

        try {
            locateNextBlock(excludedNodes);
            lastBlock->setNumBytes(0);
            nodes = lastBlock->getLocations();
            storageIDs = lastBlock->getStorageIDs();
        } catch (const HdfsRpcException & e) {
            /*
             * Keep a copy of the name: lastBlock->toString() returns a temporary,
             * so holding only its c_str() would leave a dangling pointer.
             */
            std::string lastBlockName = lastBlock ? lastBlock->toString() : "Null";
            LOG(LOG_ERROR,
                "Failed to allocate a new empty block for file %s, last block %s, excluded nodes %s.\n%s",
                path.c_str(), lastBlockName.c_str(), FormatExcludedNodes(excludedNodes).c_str(), GetExceptionDetail(e, buffer));

            if (retryAllocNewBlock > blockWriteRetry) {
                throw;
            }

            LOG(INFO, "Retry to allocate a new empty block for file %s, last block %s, excluded nodes %s.",
                path.c_str(), lastBlockName.c_str(), FormatExcludedNodes(excludedNodes).c_str());
            ++retryAllocNewBlock;
            continue;
        } catch (const HdfsException & e) {
            std::string lastBlockName = lastBlock ? lastBlock->toString() : "Null";
            LOG(LOG_ERROR,
                "Failed to allocate a new empty block for file %s, last block %s, excluded nodes %s.\n%s",
                path.c_str(), lastBlockName.c_str(), FormatExcludedNodes(excludedNodes).c_str(), GetExceptionDetail(e, buffer));
            throw;
        }

        retryAllocNewBlock = 0;
        checkPipelineWithReplicas();

        if (nodes.empty()) {
            THROW(HdfsIOException,
                  "No datanode is available to create a pipeline for block %s file %s.",
                  lastBlock->toString().c_str(), path.c_str());
        }

        try {
            createBlockOutputStream(lastBlock->getToken(), 0, false);
            break; //break on success
        } catch (const HdfsInvalidBlockToken & e) {
            LOG(LOG_ERROR,
                "Failed to setup the pipeline for new block %s file %s.\n%s",
                lastBlock->toString().c_str(), path.c_str(), GetExceptionDetail(e, buffer));
        } catch (const HdfsTimeoutException & e) {
            LOG(LOG_ERROR,
                "Failed to setup the pipeline for new block %s file %s.\n%s",
                lastBlock->toString().c_str(), path.c_str(), GetExceptionDetail(e, buffer));
        } catch (const HdfsIOException & e) {
            LOG(LOG_ERROR,
                "Failed to setup the pipeline for new block %s file %s.\n%s",
                lastBlock->toString().c_str(), path.c_str(), GetExceptionDetail(e, buffer));
        }

        LOG(INFO, "Abandoning block: %s for file %s.", lastBlock->toString().c_str(), path.c_str());

        try {
            filesystem->abandonBlock(*lastBlock, path);
        } catch (const HdfsException & e) {
            LOG(LOG_ERROR,
                "Failed to abandon useless block %s for file %s.\n%s",
                lastBlock->toString().c_str(), path.c_str(), GetExceptionDetail(e, buffer));
            throw;
        }

        if (errorIndex >= 0) {
            LOG(INFO, "Excluding invalid datanode: %s for block %s for file %s",
                nodes[errorIndex].formatAddress().c_str(), lastBlock->toString().c_str(), path.c_str());
            excludedNodes.push_back(nodes[errorIndex]);
        } else {
            /*
             * We don't know what happened: no datanode reported a failure, so
             * reduce the retry count to avoid an infinite loop.
             */
            --retry;
        }
    } while (retry);
}

/*
 * A bad link node must be either empty or an "IP:PORT" pair.
 */
void PipelineImpl::checkBadLinkFormat(const std::string & n) {
    std::string node = n;

    if (node.empty()) {
        return;
    }

    do {
        const char * host = &node[0], *port;
        size_t pos = node.find_last_of(":");

        if (pos == node.npos || pos + 1 == node.length()) {
            break;
        }

        node[pos] = 0;
        port = &node[pos + 1];
        struct addrinfo hints, *addrs;
        memset(&hints, 0, sizeof(hints));
        hints.ai_family = PF_UNSPEC;
        hints.ai_socktype = SOCK_STREAM;
        hints.ai_flags = AI_NUMERICHOST | AI_NUMERICSERV;
        char * end;
        long p = strtol(port, &end, 0);

        if (p >= 65536 || p <= 0 || end != port + strlen(port)) {
            break;
        }

        if (getaddrinfo(host, port, &hints, &addrs)) {
            break;
        }

        freeaddrinfo(addrs);
        return;
    } while (0);

    LOG(FATAL, "Cannot parse the firstBadLink string %s; this is probably a bug or a protocol incompatibility.",
        n.c_str());
    THROW(HdfsException, "Cannot parse the firstBadLink string %s; this is probably a bug or a protocol incompatibility.",
          n.c_str());
}

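/*
 * Connect to the first datanode in the pipeline and perform the writeBlock
 * handshake of the data transfer protocol. On failure, map the reported
 * firstBadLink back to an index in the node list so the caller knows which
 * datanode to drop.
 */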
void PipelineImpl::createBlockOutputStream(const Token & token, int64_t gs, bool recovery) {
    std::string firstBadLink;
    exception_ptr lastError;
    bool needWrapException = true;

    try {
        sock = shared_ptr<Socket>(new TcpSocketImpl);
        reader = shared_ptr<BufferedSocketReader>(new BufferedSocketReaderImpl(*sock));
        sock->connect(nodes[0].getIpAddr().c_str(), nodes[0].getXferPort(),
                      connectTimeout);
        std::vector<DatanodeInfo> targets;

        for (size_t i = 1; i < nodes.size(); ++i) {
            targets.push_back(nodes[i]);
        }

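        /*
         * OR-ing the stage with 0x1 selects its recovery variant; this mirrors
         * the recovery bit of BlockConstructionStage in the Java HDFS client.
         */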
        DataTransferProtocolSender sender(*sock, writeTimeout,
                                          nodes[0].formatAddress());
        sender.writeBlock(*lastBlock, token, clientName.c_str(), targets,
                          (recovery ? (stage | 0x1) : stage), nodes.size(),
                          lastBlock->getNumBytes(), bytesSent, gs, checksumType, chunkSize);
        int size = reader->readVarint32(readTimeout);
        std::vector<char> buf(size);
        reader->readFully(&buf[0], size, readTimeout);
        BlockOpResponseProto resp;

        if (!resp.ParseFromArray(&buf[0], size)) {
            THROW(HdfsIOException, "cannot parse datanode response from %s for block %s.",
                  nodes[0].formatAddress().c_str(), lastBlock->toString().c_str());
        }

        Status pipelineStatus = resp.status();
        firstBadLink = resp.firstbadlink();

        if (Status::DT_PROTO_SUCCESS != pipelineStatus) {
            needWrapException = false;

            if (Status::DT_PROTO_ERROR_ACCESS_TOKEN == pipelineStatus) {
                THROW(HdfsInvalidBlockToken,
                      "Got access token error for connect ack with firstBadLink as %s for block %s",
                      firstBadLink.c_str(), lastBlock->toString().c_str());
            } else {
                THROW(HdfsIOException, "Bad connect ack with firstBadLink as %s for block %s",
                      firstBadLink.c_str(), lastBlock->toString().c_str());
            }
        }

        return;
    } catch (...) {
        errorIndex = 0;
        lastError = current_exception();
    }

    checkBadLinkFormat(firstBadLink);

    if (!firstBadLink.empty()) {
        for (size_t i = 0; i < nodes.size(); ++i) {
            if (nodes[i].getXferAddr() == firstBadLink) {
                errorIndex = i;
                break;
            }
        }
    }

    assert(lastError);

    if (!needWrapException) {
        rethrow_exception(lastError);
    }

    try {
        rethrow_exception(lastError);
    } catch (const HdfsException & e) {
        NESTED_THROW(HdfsIOException,
                     "Cannot create block output stream for block %s, "
                     "recovery flag: %s, with last generation stamp %" PRId64 ".",
                     lastBlock->toString().c_str(), (recovery ? "true" : "false"), gs);
    }
}

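/*
 * After a pipeline rebuild, retransmit every packet that has been sent
 * but not yet acknowledged, and restore bytesSent accordingly.
 */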
void PipelineImpl::resend() {
    assert(stage != PIPELINE_CLOSE);

    for (size_t i = 0; i < packets.size(); ++i) {
        ConstPacketBuffer b = packets[i]->getBuffer();
        sock->writeFully(b.getBuffer(), b.getSize(), writeTimeout);
        int64_t tmp = packets[i]->getLastByteOffsetBlock();
        bytesSent = bytesSent > tmp ? bytesSent : tmp;
    }
}

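/*
 * Send one packet downstream. Non-heartbeat packets are queued until they
 * are acknowledged; on an I/O error the pipeline is rebuilt and all
 * unacknowledged packets are resent.
 */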
void PipelineImpl::send(shared_ptr<Packet> packet) {
    ConstPacketBuffer buffer = packet->getBuffer();

    if (!packet->isHeartbeat()) {
        packets.push_back(packet);
    }

    /*
     * Too many packets are pending acknowledgement; wait to avoid
     * consuming too much memory.
     */
    if (static_cast<int>(packets.size()) > packetPool.getMaxSize()) {
        waitForAcks(false);
    }

    bool failover = false;

    do {
        try {
            if (failover) {
                resend();
            } else {
                assert(sock);
                sock->writeFully(buffer.getBuffer(), buffer.getSize(),
                                 writeTimeout);
                int64_t tmp = packet->getLastByteOffsetBlock();
                bytesSent = bytesSent > tmp ? bytesSent : tmp;
            }

            checkResponse(false);
            return;
        } catch (const HdfsIOException & e) {
            if (errorIndex < 0) {
                errorIndex = 0;
            }

            sock.reset();
        }

        buildForAppendOrRecovery(true);
        failover = true;

        if (stage == PIPELINE_CLOSE) {
            assert(packets.size() == 1 && packets[0]->isLastPacketInBlock());
            packets.clear();
            break;
        }
    } while (true);
}

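/*
 * Handle one pipeline ack. A heartbeat ack carries HEART_BEAT_SEQNO and is
 * ignored. A successful ack must match the oldest outstanding packet, which
 * is then released back to the packet pool; a failed ack identifies the
 * first bad datanode via the per-node reply statuses.
 */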
void PipelineImpl::processAck(PipelineAck & ack) {
    assert(!ack.isInvalid());
    int64_t seqno = ack.getSeqno();

    if (HEART_BEAT_SEQNO == seqno) {
        return;
    }

    assert(!packets.empty());
    Packet & packet = *packets[0];

    if (ack.isSuccess()) {
        if (packet.getSeqno() != seqno) {
            THROW(HdfsIOException,
                  "processAck: pipeline ack expecting seqno %" PRId64 " but received %" PRId64 " for block %s.",
                  packet.getSeqno(), seqno, lastBlock->toString().c_str());
        }

        int64_t tmp = packet.getLastByteOffsetBlock();
        bytesAcked = tmp > bytesAcked ? tmp : bytesAcked;
        assert(lastBlock);
        lastBlock->setNumBytes(bytesAcked);

        if (packet.isLastPacketInBlock()) {
            sock.reset();
        }

        packetPool.relesePacket(packets[0]);
        packets.pop_front();
    } else {
        for (int i = ack.getNumOfReplies() - 1; i >= 0; --i) {
            if (Status::DT_PROTO_SUCCESS != ack.getReply(i)) {
                errorIndex = i;
                /*
                 * Handle block token expiry the same way as HdfsIOException.
                 */
                THROW(HdfsIOException,
                      "processAck: ack report error at node: %s for block %s.",
                      nodes[i].formatAddress().c_str(), lastBlock->toString().c_str());
            }
        }
    }
}

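/*
 * Read a single ack from the socket: a varint32 length followed by a
 * serialized pipeline ack message, then hand it to processAck.
 */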
void PipelineImpl::processResponse() {
    PipelineAck ack;
    std::vector<char> buf;
    int size = reader->readVarint32(readTimeout);
    ack.reset();
    buf.resize(size);
    reader->readFully(&buf[0], size, readTimeout);
    ack.readFrom(&buf[0], size);

    if (ack.isInvalid()) {
        THROW(HdfsIOException,
              "processResponse: got an invalid DataStreamer packet ack for block %s",
              lastBlock->toString().c_str());
    }

    processAck(ack);
}

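/*
 * Poll the pipeline for an ack. With wait == true, block for up to
 * readTimeout and treat silence as an error; with wait == false, only
 * consume an ack that is already available.
 */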
void PipelineImpl::checkResponse(bool wait) {
    int timeout = wait ? readTimeout : 0;
    bool readable = reader->poll(timeout);

    if (readable) {
        processResponse();
    } else if (wait) {
        THROW(HdfsIOException, "Timeout when reading response for block %s, datanode %s did not respond.",
              lastBlock->toString().c_str(),
              nodes[0].formatAddress().c_str());
    }
}

void PipelineImpl::flush() {
    waitForAcks(true);
}

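/*
 * Drain outstanding acks. With force == true, wait until every queued
 * packet is acknowledged; otherwise only wait while the queue is at the
 * packet pool's limit. Rebuilds the pipeline and resends on failure.
 */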
void PipelineImpl::waitForAcks(bool force) {
    bool failover = false;

    while (!packets.empty()) {
        /*
         * Just wait for some acks to avoid consuming too much memory.
         */
        if (!force && static_cast<int>(packets.size()) < packetPool.getMaxSize()) {
            return;
        }

        try {
            if (failover) {
                resend();
            }

            checkResponse(true);
            failover = false;
        } catch (const HdfsIOException & e) {
            if (errorIndex < 0) {
                errorIndex = 0;
            }

            std::string buffer;
            LOG(LOG_ERROR,
                "Failed to flush pipeline on datanode %s for block %s file %s.\n%s",
                nodes[errorIndex].formatAddress().c_str(), lastBlock->toString().c_str(),
                path.c_str(), GetExceptionDetail(e, buffer));
            LOG(INFO, "Rebuilding pipeline to flush block %s file %s.", lastBlock->toString().c_str(), path.c_str());
            sock.reset();
            failover = true;
        }

        if (failover) {
            buildForAppendOrRecovery(true);

            if (stage == PIPELINE_CLOSE) {
                assert(packets.size() == 1 && packets[0]->isLastPacketInBlock());
                packets.clear();
                break;
            }
        }
    }
}

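/*
 * Finish the block: flush outstanding packets, mark and send the final
 * packet, wait for its ack, and return the block with its acknowledged
 * length.
 */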
shared_ptr<LocatedBlock> PipelineImpl::close(shared_ptr<Packet> lastPacket) {
    waitForAcks(true);
    lastPacket->setLastPacketInBlock(true);
    stage = PIPELINE_CLOSE;
    send(lastPacket);
    waitForAcks(true);
    sock.reset();
    lastBlock->setNumBytes(bytesAcked);
    LOG(DEBUG2, "close pipeline for file %s, block %s with length %" PRId64,
        path.c_str(), lastBlock->toString().c_str(),
        lastBlock->getNumBytes());
    return lastBlock;
}

}
}