1/********************************************************************
2 * Copyright (c) 2013 - 2014, Pivotal Inc.
3 * All rights reserved.
4 *
5 * Author: Zhanwei Wang
6 ********************************************************************/
7/********************************************************************
8 * 2014 -
9 * open source under Apache License Version 2.0
10 ********************************************************************/
11/**
12 * Licensed to the Apache Software Foundation (ASF) under one
13 * or more contributor license agreements. See the NOTICE file
14 * distributed with this work for additional information
15 * regarding copyright ownership. The ASF licenses this file
16 * to you under the Apache License, Version 2.0 (the
17 * "License"); you may not use this file except in compliance
18 * with the License. You may obtain a copy of the License at
19 *
20 * http://www.apache.org/licenses/LICENSE-2.0
21 *
22 * Unless required by applicable law or agreed to in writing, software
23 * distributed under the License is distributed on an "AS IS" BASIS,
24 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
25 * See the License for the specific language governing permissions and
26 * limitations under the License.
27 */
28#include "BufferedSocketReader.h"
29#include "DateTime.h"
30#include "Exception.h"
31#include "ExceptionInternal.h"
32
33#include <google/protobuf/io/coded_stream.h>
34
35using namespace google::protobuf::io;
36
37namespace Hdfs {
38namespace Internal {
39
40BufferedSocketReaderImpl::BufferedSocketReaderImpl(Socket & s) :
41 cursor(0), size(0), sock(s), buffer(sizeof(int64_t)) {
42}
43
44BufferedSocketReaderImpl::BufferedSocketReaderImpl(Socket & s, size_t bufferSize) :
45 cursor(0), size(0), sock(s), buffer(bufferSize) {
46}
47
48int32_t BufferedSocketReaderImpl::read(char * b, int32_t s) {
49 assert(s > 0 && NULL != b);
50 int32_t done = s < size - cursor ? s : size - cursor;
51
52 if (done > 0) {
53 memcpy(b, &buffer[cursor], done);
54 cursor += done;
55 return done;
56 } else {
57 assert(size == cursor);
58 size = cursor = 0;
59 return sock.read(b, s);
60 }
61}
62
63void BufferedSocketReaderImpl::readFully(char * b, int32_t s, int timeout) {
64 assert(s > 0 && NULL != b);
65 int32_t done = s < size - cursor ? s : size - cursor;
66 memcpy(b, &buffer[cursor], done);
67 cursor += done;
68
69 if (done < s) {
70 assert(size == cursor);
71 size = cursor = 0;
72 sock.readFully(b + done, s - done, timeout);
73 }
74}
75
76int32_t BufferedSocketReaderImpl::readBigEndianInt32(int timeout) {
77 char buf[sizeof(int32_t)];
78 readFully(buf, sizeof(buf), timeout);
79 return ntohl(*reinterpret_cast<int32_t *>(buf));
80}
81
82int32_t BufferedSocketReaderImpl::readVarint32(int timeout) {
83 if (buffer.size() == 0) {
84 try {
85 buffer.resize(sizeof(int64_t));
86 int32_t retval = readVarint32(timeout, 1);
87 assert(size == cursor);
88 buffer.resize(0);
89 return retval;
90 } catch (...) {
91 assert(size == cursor);
92 buffer.resize(0);
93 throw;
94 }
95 } else {
96 return readVarint32(timeout, buffer.size());
97 }
98}
99
100int32_t BufferedSocketReaderImpl::readVarint32(int timeout, int32_t step) {
101 int32_t value;
102 bool rc = false;
103 int deadline = timeout;
104 memmove(&buffer[0], &buffer[cursor], size - cursor);
105 size -= cursor;
106 cursor = 0;
107
108 while (!rc) {
109 CodedInputStream in(reinterpret_cast<uint8_t *>(&buffer[cursor]),
110 size - cursor);
111 in.PushLimit(size - cursor);
112 rc = in.ReadVarint32(reinterpret_cast<uint32_t *>(&value));
113
114 if (rc) {
115 cursor += size - cursor - in.BytesUntilLimit();
116 return value;
117 }
118
119 steady_clock::time_point s = steady_clock::now();
120 CheckOperationCanceled();
121
122 if (size == static_cast<int32_t>(buffer.size())) {
123 THROW(HdfsNetworkException,
124 "Invalid varint type or buffer is too small, buffer size = %d.",
125 static_cast<int>(buffer.size()));
126 }
127
128 if (sock.poll(true, false, deadline)) {
129 int32_t todo = buffer.size() - size;
130 todo = todo < step ? todo : step;
131 size += sock.read(&buffer[size], todo);
132 }
133
134 steady_clock::time_point e = steady_clock::now();
135
136 if (timeout > 0) {
137 deadline -= ToMilliSeconds(s, e);
138 }
139
140 if (timeout >= 0 && deadline <= 0) {
141 THROW(HdfsTimeoutException, "Read %d bytes timeout", size);
142 }
143 }
144
145 return 0;
146}
147
148bool BufferedSocketReaderImpl::poll(int timeout) {
149 if (cursor < size) {
150 return true;
151 }
152
153 return sock.poll(true, false, timeout);
154}
155
156}
157}
158