1/********************************************************************
2 * Copyright (c) 2013 - 2014, Pivotal Inc.
3 * All rights reserved.
4 *
5 * Author: Zhanwei Wang
6 ********************************************************************/
7/********************************************************************
8 * 2014 -
9 * open source under Apache License Version 2.0
10 ********************************************************************/
11/**
12 * Licensed to the Apache Software Foundation (ASF) under one
13 * or more contributor license agreements. See the NOTICE file
14 * distributed with this work for additional information
15 * regarding copyright ownership. The ASF licenses this file
16 * to you under the Apache License, Version 2.0 (the
17 * "License"); you may not use this file except in compliance
18 * with the License. You may obtain a copy of the License at
19 *
20 * http://www.apache.org/licenses/LICENSE-2.0
21 *
22 * Unless required by applicable law or agreed to in writing, software
23 * distributed under the License is distributed on an "AS IS" BASIS,
24 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
25 * See the License for the specific language governing permissions and
26 * limitations under the License.
27 */
28#include "Atomic.h"
29#include "DateTime.h"
30#include "Exception.h"
31#include "ExceptionInternal.h"
32#include "FileSystemInter.h"
33#include "HWCrc32c.h"
34#include "LeaseRenewer.h"
35#include "Logger.h"
36#include "OutputStream.h"
37#include "OutputStreamImpl.h"
38#include "Packet.h"
39#include "PacketHeader.h"
40#include "SWCrc32c.h"
41
42#include <cassert>
43#include <inttypes.h>
44
45namespace Hdfs {
46namespace Internal {
47
48OutputStreamImpl::OutputStreamImpl() :
49/*heartBeatStop(true),*/ closed(true), isAppend(false), syncBlock(false), checksumSize(0), chunkSize(
50 0), chunksPerPacket(0), closeTimeout(0), heartBeatInterval(0), packetSize(0), position(
51 0), replication(0), blockSize(0), bytesWritten(0), cursor(0), lastFlushed(
52 0), nextSeqNo(0), packets(0) {
53 if (HWCrc32c::available()) {
54 checksum = shared_ptr < Checksum > (new HWCrc32c());
55 } else {
56 checksum = shared_ptr < Checksum > (new SWCrc32c());
57 }
58
59 checksumSize = sizeof(int32_t);
60 lastSend = steady_clock::now();
61#ifdef MOCK
62 stub = NULL;
63#endif
64}
65
66OutputStreamImpl::~OutputStreamImpl() {
67 if (!closed) {
68 try {
69 close();
70 } catch (...) {
71 }
72 }
73}
74
75void OutputStreamImpl::checkStatus() {
76 if (closed) {
77 THROW(HdfsIOException, "OutputStreamImpl: stream is not opened.");
78 }
79
80 lock_guard < mutex > lock(mut);
81
82 if (lastError != exception_ptr()) {
83 rethrow_exception(lastError);
84 }
85}
86
87void OutputStreamImpl::setError(const exception_ptr & error) {
88 try {
89 lock_guard < mutex > lock(mut);
90 lastError = error;
91 } catch (...) {
92 }
93}
94
95/**
96 * To create or append a file.
97 * @param fs hdfs file system.
98 * @param path the file path.
99 * @param flag creation flag, can be Create, Append or Create|Overwrite.
100 * @param permission create a new file with given permission.
101 * @param createParent if the parent does not exist, create it.
102 * @param replication create a file with given number of replication.
103 * @param blockSize create a file with given block size.
104 */
105void OutputStreamImpl::open(shared_ptr<FileSystemInter> fs, const char * path, int flag,
106 const Permission & permission, bool createParent, int replication,
107 int64_t blockSize) {
108 if (NULL == path || 0 == strlen(path) || replication < 0 || blockSize < 0) {
109 THROW(InvalidParameter, "Invalid parameter.");
110 }
111
112 if (!(flag == Create || flag == (Create | SyncBlock) || flag == Overwrite
113 || flag == (Overwrite | SyncBlock) || flag == Append
114 || flag == (Append | SyncBlock) || flag == (Create | Overwrite)
115 || flag == (Create | Overwrite | SyncBlock)
116 || flag == (Create | Append)
117 || flag == (Create | Append | SyncBlock))) {
118 THROW(InvalidParameter, "Invalid flag.");
119 }
120
121 try {
122 openInternal(fs, path, flag, permission, createParent, replication,
123 blockSize);
124 } catch (...) {
125 reset();
126 throw;
127 }
128}
129
130void OutputStreamImpl::computePacketChunkSize() {
131 int chunkSizeWithChecksum = chunkSize + checksumSize;
132 static const int packetHeaderSize = PacketHeader::GetPkgHeaderSize();
133 chunksPerPacket =
134 (packetSize - packetHeaderSize + chunkSizeWithChecksum - 1)
135 / chunkSizeWithChecksum;
136 chunksPerPacket = chunksPerPacket > 1 ? chunksPerPacket : 1;
137 packetSize = chunksPerPacket * chunkSizeWithChecksum + packetHeaderSize;
138 buffer.resize(chunkSize);
139}
140
141void OutputStreamImpl::initAppend() {
142 FileStatus fileInfo;
143 std::pair<shared_ptr<LocatedBlock>, shared_ptr<FileStatus> > lastBlockWithStatus;
144 lastBlockWithStatus = filesystem->append(this->path);
145 lastBlock = lastBlockWithStatus.first;
146
147 if (lastBlockWithStatus.second) {
148 fileInfo = *lastBlockWithStatus.second;
149 } else {
150 fileInfo = filesystem->getFileStatus(this->path.c_str());
151 }
152
153 closed = false;
154
155 try {
156 this->blockSize = fileInfo.getBlockSize();
157 cursor = fileInfo.getLength();
158
159 if (lastBlock) {
160 isAppend = true;
161 bytesWritten = lastBlock->getNumBytes();
162 int64_t usedInLastBlock = fileInfo.getLength() % blockSize;
163 int64_t freeInLastBlock = blockSize - usedInLastBlock;
164
165 if (freeInLastBlock == this->blockSize) {
166 THROW(HdfsIOException,
167 "OutputStreamImpl: the last block for file %s is full.",
168 this->path.c_str());
169 }
170
171 int usedInCksum = cursor % chunkSize;
172 int freeInCksum = chunkSize - usedInCksum;
173
174 if (usedInCksum > 0 && freeInCksum > 0) {
175 /*
176 * if there is space in the last partial chunk, then
177 * setup in such a way that the next packet will have only
178 * one chunk that fills up the partial chunk.
179 */
180 packetSize = 0;
181 chunkSize = freeInCksum;
182 } else {
183 /*
184 * if the remaining space in the block is smaller than
185 * that expected size of of a packet, then create
186 * smaller size packet.
187 */
188 packetSize =
189 packetSize < freeInLastBlock ?
190 packetSize : static_cast<int>(freeInLastBlock);
191 }
192 }
193 } catch (...) {
194 completeFile(false);
195 reset();
196 throw;
197 }
198
199 computePacketChunkSize();
200}
201
202void OutputStreamImpl::openInternal(shared_ptr<FileSystemInter> fs, const char * path,
203 int flag, const Permission & permission, bool createParent,
204 int replication, int64_t blockSize) {
205 filesystem = fs;
206 this->path = fs->getStandardPath(path);
207 this->replication = replication;
208 this->blockSize = blockSize;
209 syncBlock = flag & SyncBlock;
210 conf = shared_ptr < SessionConfig > (new SessionConfig(fs->getConf()));
211 LOG(DEBUG2, "open file %s for %s", this->path.c_str(), (flag & Append ? "append" : "write"));
212 packets.setMaxSize(conf->getPacketPoolSize());
213
214 if (0 == replication) {
215 this->replication = conf->getDefaultReplica();
216 } else {
217 this->replication = replication;
218 }
219
220 if (0 == blockSize) {
221 this->blockSize = conf->getDefaultBlockSize();
222 } else {
223 this->blockSize = blockSize;
224 }
225
226 chunkSize = conf->getDefaultChunkSize();
227 packetSize = conf->getDefaultPacketSize();
228 heartBeatInterval = conf->getHeartBeatInterval();
229 closeTimeout = conf->getCloseFileTimeout();
230
231 if (packetSize < chunkSize) {
232 THROW(InvalidParameter,
233 "OutputStreamImpl: packet size %d is less than the chunk size %d.",
234 packetSize, chunkSize);
235 }
236
237 if (0 != this->blockSize % chunkSize) {
238 THROW(InvalidParameter,
239 "OutputStreamImpl: block size %" PRId64 " is not the multiply of chunk size %d.",
240 this->blockSize, chunkSize);
241 }
242
243 try {
244 if (flag & Append) {
245 initAppend();
246 LeaseRenewer::GetLeaseRenewer().StartRenew(filesystem);
247 return;
248 }
249 } catch (const FileNotFoundException & e) {
250 if (!(flag & Create)) {
251 throw;
252 }
253 }
254
255 assert((flag & Create) || (flag & Overwrite));
256 fs->create(this->path, permission, flag, createParent, this->replication,
257 this->blockSize);
258 closed = false;
259 computePacketChunkSize();
260 LeaseRenewer::GetLeaseRenewer().StartRenew(filesystem);
261}
262
263/**
264 * To append data to file.
265 * @param buf the data used to append.
266 * @param size the data size.
267 */
268void OutputStreamImpl::append(const char * buf, int64_t size) {
269 LOG(DEBUG3, "append file %s size is %" PRId64 ", offset %" PRId64 " next pos %" PRId64, path.c_str(), size, cursor, size + cursor);
270
271 if (NULL == buf || size < 0) {
272 THROW(InvalidParameter, "Invalid parameter.");
273 }
274
275 checkStatus();
276
277 try {
278 appendInternal(buf, size);
279 } catch (...) {
280 setError(current_exception());
281 throw;
282 }
283}
284
285void OutputStreamImpl::appendInternal(const char * buf, int64_t size) {
286 int64_t todo = size;
287
288 while (todo > 0) {
289 int batch = buffer.size() - position;
290 batch = batch < todo ? batch : static_cast<int>(todo);
291
292 /*
293 * bypass buffer.
294 */
295 if (0 == position && todo >= static_cast<int64_t>(buffer.size())) {
296 checksum->update(buf + size - todo, batch);
297 appendChunkToPacket(buf + size - todo, batch);
298 bytesWritten += batch;
299 checksum->reset();
300 } else {
301 checksum->update(buf + size - todo, batch);
302 memcpy(&buffer[position], buf + size - todo, batch);
303 position += batch;
304
305 if (position == static_cast<int>(buffer.size())) {
306 appendChunkToPacket(&buffer[0], buffer.size());
307 bytesWritten += buffer.size();
308 checksum->reset();
309 position = 0;
310 }
311 }
312
313 todo -= batch;
314
315 if (currentPacket
316 && (currentPacket->isFull() || bytesWritten == blockSize)) {
317 sendPacket(currentPacket);
318
319 if (isAppend) {
320 isAppend = false;
321 chunkSize = conf->getDefaultChunkSize();
322 packetSize = conf->getDefaultPacketSize();
323 computePacketChunkSize();
324 }
325
326 if (bytesWritten == blockSize) {
327 closePipeline();
328 }
329 }
330 }
331
332 cursor += size;
333}
334
335void OutputStreamImpl::appendChunkToPacket(const char * buf, int size) {
336 assert(NULL != buf && size > 0);
337
338 if (!currentPacket) {
339 currentPacket = packets.getPacket(packetSize, chunksPerPacket, bytesWritten,
340 nextSeqNo++, checksumSize);
341 }
342
343 currentPacket->addChecksum(checksum->getValue());
344 currentPacket->addData(buf, size);
345 currentPacket->increaseNumChunks();
346}
347
348void OutputStreamImpl::sendPacket(shared_ptr<Packet> packet) {
349 if (!pipeline) {
350 setupPipeline();
351 }
352
353 pipeline->send(currentPacket);
354 currentPacket.reset();
355 lastSend = steady_clock::now();
356}
357
358void OutputStreamImpl::setupPipeline() {
359 assert(currentPacket);
360#ifdef MOCK
361 pipeline = stub->getPipeline();
362#else
363 pipeline = shared_ptr<Pipeline>(new PipelineImpl(isAppend, path.c_str(), *conf, filesystem,
364 CHECKSUM_TYPE_CRC32C, conf->getDefaultChunkSize(), replication,
365 currentPacket->getOffsetInBlock(), packets, lastBlock));
366#endif
367 lastSend = steady_clock::now();
368 /*
369 * start heart beat beat thread
370 */
371 /*if (heartBeatStop) {
372 if (heartBeatSender.joinable()) {
373 heartBeatSender.join();
374 }
375
376 heartBeatStop = false;
377 heartBeatSender = thread(&OutputStreamImpl::heartBeatSenderRoutine, this);
378 }*/
379}
380
381/**
382 * Flush all data in buffer and waiting for ack.
383 * Will block until get all acks.
384 */
385void OutputStreamImpl::flush() {
386 LOG(DEBUG3, "flush file %s at offset %" PRId64, path.c_str(), cursor);
387 checkStatus();
388
389 try {
390 flushInternal(false);
391 } catch (...) {
392 setError(current_exception());
393 throw;
394 }
395}
396
397void OutputStreamImpl::flushInternal(bool needSync) {
398 if (lastFlushed == cursor && !needSync) {
399 return;
400 } else {
401 lastFlushed = cursor;
402 }
403
404 if (position > 0) {
405 appendChunkToPacket(&buffer[0], position);
406 }
407
408 /*
409 * if the pipeline and currentPacket are both NULL,
410 * that means the pipeline has been closed and no more data in buffer/packet.
411 * already synced when closing pipeline.
412 */
413 if (!currentPacket && needSync && pipeline) {
414 currentPacket = packets.getPacket(packetSize, chunksPerPacket, bytesWritten,
415 nextSeqNo++, checksumSize);
416 }
417
418 lock_guard < mutex > lock(mut);
419
420 if (currentPacket) {
421 currentPacket->setSyncFlag(needSync);
422 sendPacket(currentPacket);
423 }
424
425 if (pipeline) {
426 pipeline->flush();
427 }
428}
429
430/**
431 * return the current file length.
432 * @return current file length.
433 */
434int64_t OutputStreamImpl::tell() {
435 checkStatus();
436 return cursor;
437}
438
439/**
440 * @ref OutputStream::sync
441 */
442void OutputStreamImpl::sync() {
443 LOG(DEBUG3, "sync file %s at offset %" PRId64, path.c_str(), cursor);
444 checkStatus();
445
446 try {
447 flushInternal(true);
448 } catch (...) {
449 setError(current_exception());
450 throw;
451 }
452}
453
454void OutputStreamImpl::completeFile(bool throwError) {
455 steady_clock::time_point start = steady_clock::now();
456
457 while (true) {
458 try {
459 bool success;
460 success = filesystem->complete(path, lastBlock.get());
461
462 if (success) {
463 return;
464 }
465 } catch (HdfsIOException & e) {
466 if (throwError) {
467 NESTED_THROW(HdfsIOException,
468 "OutputStreamImpl: failed to complete file %s.",
469 path.c_str());
470 } else {
471 return;
472 }
473 }
474
475 if (closeTimeout > 0) {
476 steady_clock::time_point end = steady_clock::now();
477
478 if (ToMilliSeconds(start, end) >= closeTimeout) {
479 if (throwError) {
480 THROW(HdfsIOException,
481 "OutputStreamImpl: timeout when complete file %s, timeout interval %d ms.",
482 path.c_str(), closeTimeout);
483 } else {
484 return;
485 }
486 }
487 }
488
489 try {
490 sleep_for(milliseconds(400));
491 } catch (...) {
492 }
493 }
494}
495
496/**
497 * close the stream.
498 */
499void OutputStreamImpl::closePipeline() {
500 lock_guard < mutex > lock(mut);
501
502 if (!pipeline) {
503 return;
504 }
505
506 if (currentPacket) {
507 sendPacket(currentPacket);
508 }
509
510 currentPacket = packets.getPacket(packetSize, chunksPerPacket, bytesWritten, nextSeqNo++,
511 checksumSize);
512
513 if (syncBlock) {
514 currentPacket->setSyncFlag(syncBlock);
515 }
516
517 lastBlock = pipeline->close(currentPacket);
518 assert(lastBlock);
519 currentPacket.reset();
520 pipeline.reset();
521 filesystem->fsync(path);
522 bytesWritten = 0;
523}
524
525void OutputStreamImpl::close() {
526 exception_ptr e;
527
528 if (closed) {
529 return;
530 }
531
532 try {
533 //pipeline may be broken
534 if (!lastError) {
535 if (lastFlushed != cursor && position > 0) {
536 appendChunkToPacket(&buffer[0], position);
537 }
538
539 if (lastFlushed != cursor && currentPacket) {
540 sendPacket(currentPacket);
541 }
542
543 closePipeline();
544 /*heartBeatStop = true;
545 condHeartBeatSender.notify_all();
546
547 if (heartBeatSender.joinable()) {
548 heartBeatSender.join();
549 }*/
550 completeFile(true);
551 }
552 } catch (...) {
553 e = current_exception();
554 }
555
556 LeaseRenewer::GetLeaseRenewer().StopRenew(filesystem);
557 LOG(DEBUG3, "close file %s for write with length %" PRId64, path.c_str(), cursor);
558 reset();
559
560 if (e) {
561 rethrow_exception(e);
562 }
563}
564
565void OutputStreamImpl::reset() {
566 blockSize = 0;
567 bytesWritten = 0;
568 checksum->reset();
569 chunkSize = 0;
570 chunksPerPacket = 0;
571 closed = true;
572 closeTimeout = 0;
573 conf.reset();
574 currentPacket.reset();
575 cursor = 0;
576 filesystem.reset();
577 heartBeatInterval = 0;
578 isAppend = false;
579 lastBlock.reset();
580 lastError = exception_ptr();
581 lastFlushed = 0;
582 nextSeqNo = 0;
583 packetSize = 0;
584 path.clear();
585 pipeline.reset();
586 position = 0;
587 replication = 0;
588 syncBlock = false;
589}
590
591std::string OutputStreamImpl::toString() {
592 if (path.empty()) {
593 return std::string("OutputStream for path ") + path;
594 } else {
595 return std::string("OutputStream (not opened)");
596 }
597}
598
599/*void OutputStreamImpl::heartBeatSenderRoutine() {
600 assert(heartBeatStop == false);
601
602 while (!heartBeatStop) {
603 try {
604 unique_lock < mutex > lock(mut);
605 condHeartBeatSender.wait_for(lock, milliseconds(1000));
606
607 try {
608 try {
609 if (pipeline
610 && ToMilliSeconds(lastSend, steady_clock::now())
611 >= heartBeatInterval) {
612 pipeline->send(shared_ptr < Packet > (new Packet()));
613 lastSend = steady_clock::now();
614 }
615 } catch (...) {
616 NESTED_THROW(Hdfs::HdfsIOException, "Failed to send heart beat, path: %s",
617 path.c_str());
618 }
619 } catch (...) {
620 lastError = current_exception();
621 throw;
622 }
623 } catch (const std::bad_alloc & e) {
624
625 * keep quiet if we run out of memory, since writing log need memory,
626 * that may cause the process terminated.
627
628 break;
629 } catch (const Hdfs::HdfsException & e) {
630 LOG(LOG_ERROR, "Heart beat thread exit since %s",
631 GetExceptionDetail(e));
632 } catch (const std::exception & e) {
633 LOG(LOG_ERROR, "Heart beat thread exit since %s",
634 e.what());
635 }
636 }
637
638 heartBeatStop = true;
639}*/
640
641}
642}
643