1/********************************************************************
2 * Copyright (c) 2013 - 2014, Pivotal Inc.
3 * All rights reserved.
4 *
5 * Author: Zhanwei Wang
6 ********************************************************************/
7/********************************************************************
8 * 2014 -
9 * open source under Apache License Version 2.0
10 ********************************************************************/
11/**
12 * Licensed to the Apache Software Foundation (ASF) under one
13 * or more contributor license agreements. See the NOTICE file
14 * distributed with this work for additional information
15 * regarding copyright ownership. The ASF licenses this file
16 * to you under the Apache License, Version 2.0 (the
17 * "License"); you may not use this file except in compliance
18 * with the License. You may obtain a copy of the License at
19 *
20 * http://www.apache.org/licenses/LICENSE-2.0
21 *
22 * Unless required by applicable law or agreed to in writing, software
23 * distributed under the License is distributed on an "AS IS" BASIS,
24 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
25 * See the License for the specific language governing permissions and
26 * limitations under the License.
27 */
28#include "BigEndian.h"
29#include "Exception.h"
30#include "ExceptionInternal.h"
31#include "Packet.h"
32#include "PacketHeader.h"
33
34namespace Hdfs {
35namespace Internal {
36
37Packet::Packet() :
38 lastPacketInBlock(false), syncBlock(false), checksumPos(0), checksumSize(0),
39 checksumStart(0), dataPos(0), dataStart(0), headerStart(0), maxChunks(
40 0), numChunks(0), offsetInBlock(0), seqno(HEART_BEAT_SEQNO) {
41 buffer.resize(PacketHeader::GetPkgHeaderSize());
42}
43
44Packet::Packet(int pktSize, int chunksPerPkt, int64_t offsetInBlock,
45 int64_t seqno, int checksumSize) :
46 lastPacketInBlock(false), syncBlock(false), checksumSize(checksumSize), headerStart(0),
47 maxChunks(chunksPerPkt), numChunks(0), offsetInBlock(offsetInBlock), seqno(seqno), buffer(pktSize) {
48 checksumPos = checksumStart = PacketHeader::GetPkgHeaderSize();
49 dataPos = dataStart = checksumStart + chunksPerPkt * checksumSize;
50 assert(dataPos >= 0);
51}
52
53void Packet::reset(int pktSize, int chunksPerPkt, int64_t offsetInBlock,
54 int64_t seqno, int checksumSize) {
55 lastPacketInBlock = false;
56 syncBlock = false;
57 this->checksumSize = checksumSize;
58 headerStart = 0;
59 maxChunks = chunksPerPkt;
60 numChunks = 0;
61 this->offsetInBlock = offsetInBlock;
62 this->seqno = seqno;
63 checksumPos = checksumStart = PacketHeader::GetPkgHeaderSize();
64 dataPos = dataStart = checksumStart + chunksPerPkt * checksumSize;
65
66 if (pktSize > static_cast<int>(buffer.size())) {
67 buffer.resize(pktSize);
68 }
69
70 assert(dataPos >= 0);
71}
72
73void Packet::addChecksum(uint32_t checksum) {
74 if (checksumPos + static_cast<int>(sizeof(uint32_t)) > dataStart) {
75 THROW(HdfsIOException,
76 "Packet: failed to add checksum into packet, checksum is too large");
77 }
78
79 WriteBigEndian32ToArray(checksum, &buffer[checksumPos]);
80 checksumPos += checksumSize;
81}
82
83void Packet::addData(const char * buf, int size) {
84 if (size + dataPos > static_cast<int>(buffer.size())) {
85 THROW(HdfsIOException,
86 "Packet: failed add data to packet, packet size is too small");
87 }
88
89 memcpy(&buffer[dataPos], buf, size);
90 dataPos += size;
91 assert(dataPos >= 0);
92}
93
94void Packet::setSyncFlag(bool sync) {
95 syncBlock = sync;
96}
97
98void Packet::increaseNumChunks() {
99 ++numChunks;
100}
101
102bool Packet::isFull() {
103 return numChunks >= maxChunks;
104}
105
106bool Packet::isHeartbeat() {
107 return HEART_BEAT_SEQNO == seqno;
108}
109
110void Packet::setLastPacketInBlock(bool lastPacket) {
111 lastPacketInBlock = lastPacket;
112}
113
114int Packet::getDataSize() {
115 return dataPos - dataStart;
116}
117
118int64_t Packet::getLastByteOffsetBlock() {
119 assert(offsetInBlock >= 0 && dataPos >= dataStart);
120 assert(dataPos - dataStart <= maxChunks * static_cast<int>(buffer.size()));
121 return offsetInBlock + dataPos - dataStart;
122}
123
124const ConstPacketBuffer Packet::getBuffer() {
125 /*
126 * Once this is called, no more data can be added to the packet.
127 * This is called only when the packet is ready to be sent.
128 */
129 int dataLen = dataPos - dataStart;
130 int checksumLen = checksumPos - checksumStart;
131
132 if (checksumPos != dataStart) {
133 /*
134 * move the checksum to cover the gap.
135 * This can happen for the last packet.
136 */
137 memmove(&buffer[dataStart - checksumLen], &buffer[checksumStart],
138 checksumLen);
139 headerStart = dataStart - checksumPos;
140 checksumStart += dataStart - checksumPos;
141 checksumPos = dataStart;
142 }
143
144 assert(dataPos >= 0);
145 int pktLen = dataLen + checksumLen;
146 PacketHeader header(pktLen + sizeof(int32_t)
147 /* why we add 4 bytes? Because the server will reduce 4 bytes. -_-*/
148 , offsetInBlock, seqno, lastPacketInBlock, dataLen);
149 header.writeInBuffer(&buffer[headerStart],
150 PacketHeader::GetPkgHeaderSize());
151 return ConstPacketBuffer(&buffer[headerStart],
152 PacketHeader::GetPkgHeaderSize() + pktLen);
153}
154
155}
156}
157