/********************************************************************
 * Copyright (c) 2013 - 2014, Pivotal Inc.
 * All rights reserved.
 *
 * Author: Zhanwei Wang
 ********************************************************************/
/********************************************************************
 * 2014 -
 * open source under Apache License Version 2.0
 ********************************************************************/
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
#include "DateTime.h"
#include "Pipeline.h"
#include "Logger.h"
#include "Exception.h"
#include "ExceptionInternal.h"
#include "OutputStreamInter.h"
#include "FileSystemInter.h"
#include "DataTransferProtocolSender.h"
#include "datatransfer.pb.h"

#include <assert.h>
#include <inttypes.h>
#include <locale>
#include <netdb.h>
#include <sstream>
#include <stdlib.h>
#include <string.h>

namespace Hdfs {
namespace Internal {

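/*
 * Build a write pipeline for the given file, either appending to the last
 * block of an existing file or preparing to write a new block. On success
 * the pipeline is left in the DATA_STREAMING stage.
 */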
PipelineImpl::PipelineImpl(bool append, const char * path, const SessionConfig & conf,
                           shared_ptr<FileSystemInter> filesystem, int checksumType, int chunkSize,
                           int replication, int64_t bytesSent, PacketPool & packetPool,
                           shared_ptr<LocatedBlock> lastBlock) :
    checksumType(checksumType), chunkSize(chunkSize), errorIndex(-1), replication(replication),
    bytesAcked(bytesSent), bytesSent(bytesSent), packetPool(packetPool), filesystem(filesystem),
    lastBlock(lastBlock), path(path) {
    canAddDatanode = conf.canAddDatanode();
    blockWriteRetry = conf.getBlockWriteRetry();
    connectTimeout = conf.getOutputConnTimeout();
    readTimeout = conf.getOutputReadTimeout();
    writeTimeout = conf.getOutputWriteTimeout();
    clientName = filesystem->getClientName();

    if (append) {
        assert(lastBlock);
        LOG(DEBUG2, "create pipeline for file %s to append to %s at position %" PRId64,
            path, lastBlock->toString().c_str(), lastBlock->getNumBytes());
        stage = PIPELINE_SETUP_APPEND;
        nodes = lastBlock->getLocations();
        storageIDs = lastBlock->getStorageIDs();
        buildForAppendOrRecovery(false);
        stage = DATA_STREAMING;
    } else {
        LOG(DEBUG2, "create pipeline for file %s to write to a new block", path);
        stage = PIPELINE_SETUP_CREATE;
        buildForNewBlock();
        stage = DATA_STREAMING;
    }
}

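/*
 * Locate the datanode that the namenode just added to the pipeline: it is
 * the one node in the current list that does not appear in the original
 * list. Returns its index, or throws if no such node is found.
 */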
int PipelineImpl::findNewDatanode(const std::vector<DatanodeInfo> & original) {
    if (nodes.size() != original.size() + 1) {
        THROW(HdfsIOException, "Failed to acquire a datanode for block %s from namenode.",
              lastBlock->toString().c_str());
    }

    for (size_t i = 0; i < nodes.size(); i++) {
        size_t j = 0;

        for (; j < original.size() && !(nodes[i] == original[j]); j++)
            ;

        if (j == original.size()) {
            return static_cast<int>(i);
        }
    }

    THROW(HdfsIOException, "Cannot add new datanode for block %s.", lastBlock->toString().c_str());
}

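/*
 * Ask datanode "src" to copy the replica of "blk" to the "targets"
 * datanodes, then read the operation response and verify that the
 * transfer succeeded.
 */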
void PipelineImpl::transfer(const ExtendedBlock & blk, const DatanodeInfo & src,
                            const std::vector<DatanodeInfo> & targets, const Token & token) {
    shared_ptr<Socket> so(new TcpSocketImpl);
    shared_ptr<BufferedSocketReader> in(new BufferedSocketReaderImpl(*so));
    so->connect(src.getIpAddr().c_str(), src.getXferPort(), connectTimeout);
    DataTransferProtocolSender sender(*so, writeTimeout, src.formatAddress());
    sender.transferBlock(blk, token, clientName.c_str(), targets);
    int size = in->readVarint32(readTimeout);
    std::vector<char> buf(size);
    in->readFully(&buf[0], size, readTimeout);
    BlockOpResponseProto resp;

    if (!resp.ParseFromArray(&buf[0], size)) {
        THROW(HdfsIOException, "cannot parse datanode response from %s for block %s.",
              src.formatAddress().c_str(), lastBlock->toString().c_str());
    }

    if (Status::DT_PROTO_SUCCESS != resp.status()) {
        THROW(HdfsIOException, "Failed to transfer block to a new datanode %s for block %s.",
              targets[0].formatAddress().c_str(),
              lastBlock->toString().c_str());
    }
}

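/*
 * Ask the namenode for an additional datanode (excluding "excludedNodes")
 * and copy an existing replica to it so the pipeline regains its expected
 * width. Returns true on success, false on a recoverable failure.
 */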
bool PipelineImpl::addDatanodeToPipeline(const std::vector<DatanodeInfo> & excludedNodes) {
    try {
        /*
         * Get a new datanode from the namenode.
         */
        std::vector<DatanodeInfo> original = nodes;
        shared_ptr<LocatedBlock> lb;
        lb = filesystem->getAdditionalDatanode(path, *lastBlock, nodes, storageIDs,
                                               excludedNodes, 1);
        nodes = lb->getLocations();
        storageIDs = lb->getStorageIDs();

        /*
         * The namenode failed to add a new datanode to the pipeline.
         */
        if (original.size() == nodes.size()) {
            LOG(LOG_ERROR,
                "Failed to add new datanode into pipeline for block: %s file %s.",
                lastBlock->toString().c_str(), path.c_str());
        } else {
            /*
             * Find the newly added datanode.
             */
            int d = findNewDatanode(original);
            /*
             * Point errorIndex at the new node in case the block transfer fails.
             */
            errorIndex = d;
            /*
             * Transfer a replica to the new datanode.
             */
            DatanodeInfo & src = d == 0 ? nodes[1] : nodes[d - 1];
            std::vector<DatanodeInfo> targets;
            targets.push_back(nodes[d]);
            LOG(INFO, "Replicate block %s from %s to %s for file %s.", lastBlock->toString().c_str(),
                src.formatAddress().c_str(), targets[0].formatAddress().c_str(), path.c_str());
            transfer(*lastBlock, src, targets, lb->getToken());
            errorIndex = -1;
            return true;
        }
    } catch (const HdfsCanceled & e) {
        throw;
    } catch (const HdfsFileSystemClosed & e) {
        throw;
    } catch (const SafeModeException & e) {
        throw;
    } catch (const HdfsException & e) {
        std::string buffer;
        LOG(LOG_ERROR,
            "Failed to add a new datanode into pipeline for block: %s file %s.\n%s",
            lastBlock->toString().c_str(), path.c_str(), GetExceptionDetail(e, buffer));
    }

    return false;
}

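/*
 * Warn if the pipeline now contains fewer datanodes than the expected
 * replication factor.
 */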
void PipelineImpl::checkPipelineWithReplicas() {
    if (static_cast<int>(nodes.size()) < replication) {
        std::stringstream ss;
        ss.imbue(std::locale::classic());
        int size = static_cast<int>(nodes.size());

        for (int i = 0; i < size - 1; ++i) {
            ss << nodes[i].formatAddress() << ", ";
        }

        if (nodes.empty()) {
            ss << "Empty";
        } else {
            ss << nodes.back().formatAddress();
        }

        LOG(WARNING,
            "The number of nodes in the pipeline is %d [%s], less than the expected replication factor %d for block %s file %s",
            static_cast<int>(nodes.size()), ss.str().c_str(), replication,
            lastBlock->toString().c_str(), path.c_str());
    }
}

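/*
 * (Re)build the pipeline for an append or a recovery: drop the datanode
 * reported bad (if any), optionally add a replacement node, bump the
 * block's generation stamp, reconnect, and finally commit the new pipeline
 * to the namenode via the non-idempotent updatePipeline RPC.
 */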
void PipelineImpl::buildForAppendOrRecovery(bool recovery) {
    int64_t gs = 0;
    int retry = blockWriteRetry;
    exception_ptr lastException;
    std::vector<DatanodeInfo> excludedNodes;
    shared_ptr<LocatedBlock> lb;
    std::string buffer;

    do {
        /*
         * Remove the bad datanode from the list of datanodes.
         * If errorIndex was not set (i.e., for appends), do not remove
         * any datanodes.
         */
        if (errorIndex >= 0) {
            assert(lastBlock);
            LOG(LOG_ERROR, "Pipeline: node %s is invalid and removed from pipeline when %s block %s for file %s, stage = %s.",
                nodes[errorIndex].formatAddress().c_str(),
                (recovery ? "recovering" : "appending to"), lastBlock->toString().c_str(),
                path.c_str(), StageToString(stage));
            excludedNodes.push_back(nodes[errorIndex]);
            nodes.erase(nodes.begin() + errorIndex);

            if (!storageIDs.empty()) {
                storageIDs.erase(storageIDs.begin() + errorIndex);
            }

            if (nodes.empty()) {
                THROW(HdfsIOException,
                      "Build pipeline to %s block %s failed: all datanodes are bad.",
                      (recovery ? "recover" : "append to"), lastBlock->toString().c_str());
            }

            errorIndex = -1;
        }

        try {
            gs = 0;

            /*
             * Check whether the number of datanodes in the pipeline satisfies
             * the replication requirement; add a new datanode if it does not.
             */
            if (stage != PIPELINE_SETUP_CREATE && stage != PIPELINE_CLOSE
                    && static_cast<int>(nodes.size()) < replication && canAddDatanode) {
                if (!addDatanodeToPipeline(excludedNodes)) {
                    THROW(HdfsIOException,
                          "Failed to add new datanode into pipeline for block: %s file %s, "
                          "set \"output.replace-datanode-on-failure\" to \"false\" to disable this feature.",
                          lastBlock->toString().c_str(), path.c_str());
                }
            }

            if (errorIndex >= 0) {
                continue;
            }

            checkPipelineWithReplicas();
            /*
             * Update the generation stamp and access token.
             */
            lb = filesystem->updateBlockForPipeline(*lastBlock);
            gs = lb->getGenerationStamp();
            /*
             * Try to build the pipeline.
             */
            createBlockOutputStream(lb->getToken(), gs, recovery);
            /*
             * Everything is ok, reset errorIndex.
             */
            errorIndex = -1;
            lastException = exception_ptr();
            break; // break on success
        } catch (const HdfsInvalidBlockToken & e) {
            lastException = current_exception();
            recovery = true;
            LOG(LOG_ERROR,
                "Pipeline: Failed to build pipeline for block %s file %s, new generation stamp is %" PRId64 ",\n%s",
                lastBlock->toString().c_str(), path.c_str(), gs, GetExceptionDetail(e, buffer));
            LOG(INFO, "Trying to recover the pipeline for block %s file %s.",
                lastBlock->toString().c_str(), path.c_str());
        } catch (const HdfsTimeoutException & e) {
            lastException = current_exception();
            recovery = true;
            LOG(LOG_ERROR,
                "Pipeline: Failed to build pipeline for block %s file %s, new generation stamp is %" PRId64 ",\n%s",
                lastBlock->toString().c_str(), path.c_str(), gs, GetExceptionDetail(e, buffer));
            LOG(INFO, "Trying to recover the pipeline for block %s file %s.",
                lastBlock->toString().c_str(), path.c_str());
        } catch (const HdfsIOException & e) {
            lastException = current_exception();
            /*
             * Set the recovery flag since we failed to create a pipeline for appending.
             */
            recovery = true;
            LOG(LOG_ERROR,
                "Pipeline: Failed to build pipeline for block %s file %s, new generation stamp is %" PRId64 ",\n%s",
                lastBlock->toString().c_str(), path.c_str(), gs, GetExceptionDetail(e, buffer));
            LOG(INFO, "Trying to recover the pipeline for block %s file %s.",
                lastBlock->toString().c_str(), path.c_str());
        }

        /*
         * We don't know what happened and no datanode was reported as failed;
         * reduce the retry count to avoid an infinite loop. This may be caused
         * by an RPC call throwing HdfsIOException.
         */
        if (errorIndex < 0) {
            --retry;
        }
    } while (retry > 0);

    if (lastException) {
        rethrow_exception(lastException);
    }

    /*
     * Update the pipeline at the namenode; this is a non-idempotent RPC call.
     */
    lb->setPoolId(lastBlock->getPoolId());
    lb->setBlockId(lastBlock->getBlockId());
    lb->setLocations(nodes);
    lb->setStorageIDs(storageIDs);
    lb->setNumBytes(lastBlock->getNumBytes());
    lb->setOffset(lastBlock->getOffset());
    filesystem->updatePipeline(*lastBlock, *lb, nodes, storageIDs);
    lastBlock = lb;
}

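/*
 * Ask the namenode to allocate the next block of the file, retrying with
 * exponential backoff (capped at five seconds) as long as the namenode
 * reports NotReplicatedYetException for the previous block.
 */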
void PipelineImpl::locateNextBlock(
    const std::vector<DatanodeInfo> & excludedNodes) {
    milliseconds sleeptime(100);
    milliseconds fiveSeconds(5000);
    int retry = blockWriteRetry;

    while (true) {
        try {
            lastBlock = filesystem->addBlock(path, lastBlock.get(),
                                             excludedNodes);
            assert(lastBlock);
            return;
        } catch (const NotReplicatedYetException & e) {
            LOG(DEBUG1, "Got NotReplicatedYetException when trying to addBlock for block %s, "
                "already retried %d times, max retry %d times", lastBlock->toString().c_str(),
                blockWriteRetry - retry, blockWriteRetry);

            if (retry--) {
                try {
                    sleep_for(sleeptime);
                } catch (...) {
                }

                sleeptime *= 2;
                sleeptime = sleeptime < fiveSeconds ? sleeptime : fiveSeconds;
            } else {
                throw;
            }
        }
    }
}

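/*
 * Render the excluded datanode list as "[addr1, addr2, ...]" (or "[Empty]")
 * for use in log messages.
 */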
static std::string FormatExcludedNodes(
    const std::vector<DatanodeInfo> & excludedNodes) {
    std::stringstream ss;
    ss.imbue(std::locale::classic());
    ss << "[";
    int size = static_cast<int>(excludedNodes.size());

    for (int i = 0; i < size - 1; ++i) {
        ss << excludedNodes[i].formatAddress() << ", ";
    }

    if (excludedNodes.empty()) {
        ss << "Empty";
    } else {
        ss << excludedNodes.back().formatAddress();
    }

    ss << "]";
    return ss.str();
}

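/*
 * Build the pipeline for a brand-new block: allocate the block from the
 * namenode and connect to the first datanode. On failure, abandon the
 * block, exclude the bad datanode, and retry with a fresh allocation.
 */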
void PipelineImpl::buildForNewBlock() {
    int retryAllocNewBlock = 0, retry = blockWriteRetry;
    LocatedBlock lb;
    std::vector<DatanodeInfo> excludedNodes;
    shared_ptr<LocatedBlock> block = lastBlock;
    std::string buffer;

    do {
        errorIndex = -1;
        lastBlock = block;

        try {
            locateNextBlock(excludedNodes);
            lastBlock->setNumBytes(0);
            nodes = lastBlock->getLocations();
            storageIDs = lastBlock->getStorageIDs();
        } catch (const HdfsRpcException & e) {
            /*
             * Keep a copy of the block name; calling c_str() on the temporary
             * returned by toString() would leave a dangling pointer.
             */
            std::string lastBlockName = lastBlock ? lastBlock->toString() : "Null";
            LOG(LOG_ERROR,
                "Failed to allocate a new empty block for file %s, last block %s, excluded nodes %s.\n%s",
                path.c_str(), lastBlockName.c_str(), FormatExcludedNodes(excludedNodes).c_str(),
                GetExceptionDetail(e, buffer));

            if (retryAllocNewBlock > blockWriteRetry) {
                throw;
            }

            LOG(INFO, "Retry to allocate a new empty block for file %s, last block %s, excluded nodes %s.",
                path.c_str(), lastBlockName.c_str(), FormatExcludedNodes(excludedNodes).c_str());
            ++retryAllocNewBlock;
            continue;
        } catch (const HdfsException & e) {
            std::string lastBlockName = lastBlock ? lastBlock->toString() : "Null";
            LOG(LOG_ERROR,
                "Failed to allocate a new empty block for file %s, last block %s, excluded nodes %s.\n%s",
                path.c_str(), lastBlockName.c_str(), FormatExcludedNodes(excludedNodes).c_str(),
                GetExceptionDetail(e, buffer));
            throw;
        }

        retryAllocNewBlock = 0;
        checkPipelineWithReplicas();

        if (nodes.empty()) {
            THROW(HdfsIOException,
                  "No datanode is available to create a pipeline for block %s file %s.",
                  lastBlock->toString().c_str(), path.c_str());
        }

        try {
            createBlockOutputStream(lastBlock->getToken(), 0, false);
            break; // break on success
        } catch (const HdfsInvalidBlockToken & e) {
            LOG(LOG_ERROR,
                "Failed to setup the pipeline for new block %s file %s.\n%s",
                lastBlock->toString().c_str(), path.c_str(), GetExceptionDetail(e, buffer));
        } catch (const HdfsTimeoutException & e) {
            LOG(LOG_ERROR,
                "Failed to setup the pipeline for new block %s file %s.\n%s",
                lastBlock->toString().c_str(), path.c_str(), GetExceptionDetail(e, buffer));
        } catch (const HdfsIOException & e) {
            LOG(LOG_ERROR,
                "Failed to setup the pipeline for new block %s file %s.\n%s",
                lastBlock->toString().c_str(), path.c_str(), GetExceptionDetail(e, buffer));
        }

        LOG(INFO, "Abandoning block: %s for file %s.", lastBlock->toString().c_str(), path.c_str());

        try {
            filesystem->abandonBlock(*lastBlock, path);
        } catch (const HdfsException & e) {
            LOG(LOG_ERROR,
                "Failed to abandon useless block %s for file %s.\n%s",
                lastBlock->toString().c_str(), path.c_str(), GetExceptionDetail(e, buffer));
            throw;
        }

        if (errorIndex >= 0) {
            LOG(INFO, "Excluding invalid datanode: %s for block %s for file %s",
                nodes[errorIndex].formatAddress().c_str(), lastBlock->toString().c_str(), path.c_str());
            excludedNodes.push_back(nodes[errorIndex]);
        } else {
            /*
             * We don't know what happened and no datanode was reported as failed;
             * reduce the retry count to avoid an infinite loop.
             */
            --retry;
        }
    } while (retry);
}

/*
 * A bad link node must be either empty or an "IP:PORT" pair.
 */
void PipelineImpl::checkBadLinkFormat(const std::string & n) {
    std::string node = n;

    if (node.empty()) {
        return;
    }

    do {
        const char * host = &node[0], *port;
        size_t pos = node.find_last_of(":");

        if (pos == node.npos || pos + 1 == node.length()) {
            break;
        }

        node[pos] = 0;
        port = &node[pos + 1];
        struct addrinfo hints, *addrs;
        memset(&hints, 0, sizeof(hints));
        hints.ai_family = PF_UNSPEC;
        hints.ai_socktype = SOCK_STREAM;
        hints.ai_flags = AI_NUMERICHOST | AI_NUMERICSERV;
        char * end;
        long p = strtol(port, &end, 0);

        if (p >= 65536 || p <= 0 || end != port + strlen(port)) {
            break;
        }

        if (getaddrinfo(host, port, &hints, &addrs)) {
            break;
        }

        freeaddrinfo(addrs);
        return;
    } while (0);

    LOG(FATAL, "Cannot parse the firstBadLink string %s; this is likely a bug or a protocol incompatibility.",
        n.c_str());
    THROW(HdfsException, "Cannot parse the firstBadLink string %s; this is likely a bug or a protocol incompatibility.",
          n.c_str());
}

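/*
 * Connect to the first datanode and send a writeBlock request that
 * describes the whole pipeline. On failure, use the firstBadLink returned
 * by the datanode (if any) to mark the offending node; otherwise blame the
 * first node in the pipeline.
 */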
void PipelineImpl::createBlockOutputStream(const Token & token, int64_t gs, bool recovery) {
    std::string firstBadLink;
    exception_ptr lastError;
    bool needWrapException = true;

    try {
        sock = shared_ptr<Socket>(new TcpSocketImpl);
        reader = shared_ptr<BufferedSocketReader>(new BufferedSocketReaderImpl(*sock));
        sock->connect(nodes[0].getIpAddr().c_str(), nodes[0].getXferPort(),
                      connectTimeout);
        std::vector<DatanodeInfo> targets;

        for (size_t i = 1; i < nodes.size(); ++i) {
            targets.push_back(nodes[i]);
        }

        DataTransferProtocolSender sender(*sock, writeTimeout,
                                          nodes[0].formatAddress());
        sender.writeBlock(*lastBlock, token, clientName.c_str(), targets,
                          (recovery ? (stage | 0x1) : stage), nodes.size(),
                          lastBlock->getNumBytes(), bytesSent, gs, checksumType, chunkSize);
        int size = reader->readVarint32(readTimeout);
        std::vector<char> buf(size);
        reader->readFully(&buf[0], size, readTimeout);
        BlockOpResponseProto resp;

        if (!resp.ParseFromArray(&buf[0], size)) {
            THROW(HdfsIOException, "cannot parse datanode response from %s for block %s.",
                  nodes[0].formatAddress().c_str(), lastBlock->toString().c_str());
        }

        Status pipelineStatus = resp.status();
        firstBadLink = resp.firstbadlink();

        if (Status::DT_PROTO_SUCCESS != pipelineStatus) {
            needWrapException = false;

            if (Status::DT_PROTO_ERROR_ACCESS_TOKEN == pipelineStatus) {
                THROW(HdfsInvalidBlockToken,
                      "Got access token error for connect ack with firstBadLink as %s for block %s",
                      firstBadLink.c_str(), lastBlock->toString().c_str());
            } else {
                THROW(HdfsIOException, "Bad connect ack with firstBadLink as %s for block %s",
                      firstBadLink.c_str(), lastBlock->toString().c_str());
            }
        }

        return;
    } catch (...) {
        errorIndex = 0;
        lastError = current_exception();
    }

    checkBadLinkFormat(firstBadLink);

    if (!firstBadLink.empty()) {
        for (size_t i = 0; i < nodes.size(); ++i) {
            if (nodes[i].getXferAddr() == firstBadLink) {
                errorIndex = static_cast<int>(i);
                break;
            }
        }
    }

    assert(lastError);

    if (!needWrapException) {
        rethrow_exception(lastError);
    }

    try {
        rethrow_exception(lastError);
    } catch (const HdfsException & e) {
        NESTED_THROW(HdfsIOException,
                     "Cannot create block output stream for block %s, "
                     "recovery flag: %s, with last generation stamp %" PRId64 ".",
                     lastBlock->toString().c_str(), (recovery ? "true" : "false"), gs);
    }
}

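/*
 * After a pipeline rebuild, retransmit every packet that has been sent but
 * not yet acknowledged, and advance bytesSent accordingly.
 */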
void PipelineImpl::resend() {
    assert(stage != PIPELINE_CLOSE);

    for (size_t i = 0; i < packets.size(); ++i) {
        ConstPacketBuffer b = packets[i]->getBuffer();
        sock->writeFully(b.getBuffer(), b.getSize(), writeTimeout);
        int64_t tmp = packets[i]->getLastByteOffsetBlock();
        bytesSent = bytesSent > tmp ? bytesSent : tmp;
    }
}

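/*
 * Send one packet downstream. Non-heartbeat packets are queued until they
 * are acknowledged; on an I/O error the pipeline is rebuilt and all
 * unacknowledged packets are resent.
 */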
void PipelineImpl::send(shared_ptr<Packet> packet) {
    ConstPacketBuffer buffer = packet->getBuffer();

    if (!packet->isHeartbeat()) {
        packets.push_back(packet);
    }

    /*
     * Too many packets are pending acknowledgement; wait to avoid consuming
     * too much memory.
     */
    if (static_cast<int>(packets.size()) > packetPool.getMaxSize()) {
        waitForAcks(false);
    }

    bool failover = false;

    do {
        try {
            if (failover) {
                resend();
            } else {
                assert(sock);
                sock->writeFully(buffer.getBuffer(), buffer.getSize(),
                                 writeTimeout);
                int64_t tmp = packet->getLastByteOffsetBlock();
                bytesSent = bytesSent > tmp ? bytesSent : tmp;
            }

            checkResponse(false);
            return;
        } catch (const HdfsIOException & e) {
            if (errorIndex < 0) {
                errorIndex = 0;
            }

            sock.reset();
        }

        buildForAppendOrRecovery(true);
        failover = true;

        if (stage == PIPELINE_CLOSE) {
            assert(packets.size() == 1 && packets[0]->isLastPacketInBlock());
            packets.clear();
            break;
        }
    } while (true);
}

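/*
 * Apply one pipeline ack: on success pop the acknowledged packet and
 * advance bytesAcked; on failure record which datanode reported the error
 * in errorIndex and throw.
 */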
void PipelineImpl::processAck(PipelineAck & ack) {
    assert(!ack.isInvalid());
    int64_t seqno = ack.getSeqno();

    if (HEART_BEAT_SEQNO == seqno) {
        return;
    }

    assert(!packets.empty());
    Packet & packet = *packets[0];

    if (ack.isSuccess()) {
        if (packet.getSeqno() != seqno) {
            THROW(HdfsIOException,
                  "processAck: pipeline ack expecting seqno %" PRId64 " but received %" PRId64 " for block %s.",
                  packet.getSeqno(), seqno, lastBlock->toString().c_str());
        }

        int64_t tmp = packet.getLastByteOffsetBlock();
        bytesAcked = tmp > bytesAcked ? tmp : bytesAcked;
        assert(lastBlock);
        lastBlock->setNumBytes(bytesAcked);

        if (packet.isLastPacketInBlock()) {
            sock.reset();
        }

        packetPool.relesePacket(packets[0]);
        packets.pop_front();
    } else {
        for (int i = ack.getNumOfReplies() - 1; i >= 0; --i) {
            if (Status::DT_PROTO_SUCCESS != ack.getReply(i)) {
                errorIndex = i;
                /*
                 * Handle an expired block token the same way as an HdfsIOException.
                 */
                THROW(HdfsIOException,
                      "processAck: ack report error at node: %s for block %s.",
                      nodes[i].formatAddress().c_str(), lastBlock->toString().c_str());
            }
        }
    }
}

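/*
 * Read a single pipeline ack from the socket, validate it, and hand it to
 * processAck().
 */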
void PipelineImpl::processResponse() {
    PipelineAck ack;
    std::vector<char> buf;
    int size = reader->readVarint32(readTimeout);
    ack.reset();
    buf.resize(size);
    reader->readFully(&buf[0], size, readTimeout);
    ack.readFrom(&buf[0], size);

    if (ack.isInvalid()) {
        THROW(HdfsIOException,
              "processAllAcks: get an invalid DataStreamer packet ack for block %s",
              lastBlock->toString().c_str());
    }

    processAck(ack);
}

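/*
 * Poll the socket for an ack. If "wait" is true, block for up to
 * readTimeout and treat a timeout as an error; otherwise only process an
 * ack that is already available.
 */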
void PipelineImpl::checkResponse(bool wait) {
    int timeout = wait ? readTimeout : 0;
    bool readable = reader->poll(timeout);

    if (readable) {
        processResponse();
    } else if (wait) {
        THROW(HdfsIOException, "Timeout when reading response for block %s, datanode %s did not respond.",
              lastBlock->toString().c_str(),
              nodes[0].formatAddress().c_str());
    }
}

void PipelineImpl::flush() {
    waitForAcks(true);
}

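/*
 * Drain outstanding acks. With force == true, wait until every queued
 * packet is acknowledged; otherwise only wait while the queue exceeds the
 * packet pool limit. Rebuilds the pipeline on I/O errors.
 */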
void PipelineImpl::waitForAcks(bool force) {
    bool failover = false;

    while (!packets.empty()) {
        /*
         * Unless forced, only wait for acks while the queue is full, to avoid
         * consuming too much memory.
         */
        if (!force && static_cast<int>(packets.size()) < packetPool.getMaxSize()) {
            return;
        }

        try {
            if (failover) {
                resend();
            }

            checkResponse(true);
            failover = false;
        } catch (const HdfsIOException & e) {
            if (errorIndex < 0) {
                errorIndex = 0;
            }

            std::string buffer;
            LOG(LOG_ERROR,
                "Failed to flush pipeline on datanode %s for block %s file %s.\n%s",
                nodes[errorIndex].formatAddress().c_str(), lastBlock->toString().c_str(),
                path.c_str(), GetExceptionDetail(e, buffer));
            LOG(INFO, "Rebuild pipeline to flush for block %s file %s.", lastBlock->toString().c_str(), path.c_str());
            sock.reset();
            failover = true;
        }

        if (failover) {
            buildForAppendOrRecovery(true);

            if (stage == PIPELINE_CLOSE) {
                assert(packets.size() == 1 && packets[0]->isLastPacketInBlock());
                packets.clear();
                break;
            }
        }
    }
}

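/*
 * Flush outstanding packets, send the final (last-in-block) packet, wait
 * for its ack, and return the finalized block with its acknowledged length.
 */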
shared_ptr<LocatedBlock> PipelineImpl::close(shared_ptr<Packet> lastPacket) {
    waitForAcks(true);
    lastPacket->setLastPacketInBlock(true);
    stage = PIPELINE_CLOSE;
    send(lastPacket);
    waitForAcks(true);
    sock.reset();
    lastBlock->setNumBytes(bytesAcked);
    LOG(DEBUG2, "close pipeline for file %s, block %s with length %" PRId64,
        path.c_str(), lastBlock->toString().c_str(),
        lastBlock->getNumBytes());
    return lastBlock;
}

}
}