1/********************************************************************
2 * Copyright (c) 2013 - 2014, Pivotal Inc.
3 * All rights reserved.
4 *
5 * Author: Zhanwei Wang
6 ********************************************************************/
7/********************************************************************
8 * 2014 -
9 * open source under Apache License Version 2.0
10 ********************************************************************/
11/**
12 * Licensed to the Apache Software Foundation (ASF) under one
13 * or more contributor license agreements. See the NOTICE file
14 * distributed with this work for additional information
15 * regarding copyright ownership. The ASF licenses this file
16 * to you under the Apache License, Version 2.0 (the
17 * "License"); you may not use this file except in compliance
18 * with the License. You may obtain a copy of the License at
19 *
20 * http://www.apache.org/licenses/LICENSE-2.0
21 *
22 * Unless required by applicable law or agreed to in writing, software
23 * distributed under the License is distributed on an "AS IS" BASIS,
24 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
25 * See the License for the specific language governing permissions and
26 * limitations under the License.
27 */
28#ifndef _HDFS_LIBHDFS3_NETWORK_BUFFEREDSOCKET_H_
29#define _HDFS_LIBHDFS3_NETWORK_BUFFEREDSOCKET_H_
30
31#include <vector>
32#include <stdint.h>
33#include <cstdlib>
34
35#include "Socket.h"
36
37namespace Hdfs {
38namespace Internal {
39
/**
 * A wrapper around Socket that can read raw bytes, big endian
 * integers and varints from the underlying connection.
 *
 * This is a pure interface; every operation is implemented by a subclass.
 */
class BufferedSocketReader {
public:
    /** Virtual so that implementations can be destroyed through this interface. */
    virtual ~BufferedSocketReader() {}

    /**
     * Read up to s bytes. Data that is already buffered is consumed
     * before the socket is read.
     * The caller is blocked while nothing can be read.
     * @param b The buffer that receives the data.
     * @param s The number of bytes to be read.
     * @return The number of bytes actually read.
     * @throw HdfsNetworkException
     * @throw HdfsEndOfStream
     */
    virtual int32_t read(char * b, int32_t s) = 0;

    /**
     * Read exactly s bytes. Data that is already buffered is consumed
     * before the socket is read.
     * The caller is blocked while not enough data can be read.
     * @param b The buffer that receives the data.
     * @param s The number of bytes to read.
     * @param timeout The timeout interval of this read operation, negative means infinite.
     * @throw HdfsNetworkException
     * @throw HdfsEndOfStream
     * @throw HdfsTimeout
     */
    virtual void readFully(char * b, int32_t s, int timeout) = 0;

    /**
     * Read one 32 bit big endian integer.
     * The caller is blocked while not enough data can be read.
     * @param timeout The timeout interval of this read operation, negative means infinite.
     * @return The decoded 32 bit integer.
     * @throw HdfsNetworkException
     * @throw HdfsEndOfStream
     * @throw HdfsTimeout
     */
    virtual int32_t readBigEndianInt32(int timeout) = 0;

    /**
     * Read one 32 bit integer stored in variable length encoding.
     * The caller is blocked while not enough data can be read.
     * @param timeout The timeout interval of this read operation, negative means infinite.
     * @return The decoded 32 bit integer.
     * @throw HdfsNetworkException
     * @throw HdfsEndOfStream
     * @throw HdfsTimeout
     */
    virtual int32_t readVarint32(int timeout) = 0;

    /**
     * Check whether a read can be served without blocking.
     * @param timeout The timeout interval of this operation, negative means infinite.
     * @return true if the socket can be read without blocking, false on timeout.
     * @throw HdfsNetworkException
     * @throw HdfsTimeout
     */
    virtual bool poll(int timeout) = 0;
};
103
104/**
105 * An implement of BufferedSocketReader.
106 */
107class BufferedSocketReaderImpl: public BufferedSocketReader {
108public:
109 BufferedSocketReaderImpl(Socket& s);
110
111 BufferedSocketReaderImpl(Socket& s, size_t bufferSize);
112
113 int32_t read(char * b, int32_t s);
114
115 void readFully(char * b, int32_t s, int timeout);
116
117 int32_t readBigEndianInt32(int timeout);
118
119 int32_t readVarint32(int timeout);
120
121 bool poll(int timeout);
122
123private:
124 int32_t readVarint32(int timeout, int32_t step);
125
126 //for test
127 BufferedSocketReaderImpl(Socket & s, const std::vector<char> & buffer) :
128 cursor(0), size(buffer.size()), sock(s), buffer(buffer) {
129 }
130
131private:
132 int32_t cursor;
133 int32_t size;
134 Socket & sock;
135 std::vector<char> buffer;
136};
137
138}
139}
140
141#endif /* _HDFS_LIBHDFS3_NETWORK_BUFFEREDSOCKET_H_ */
142