1/********************************************************************
2 * Copyright (c) 2013 - 2014, Pivotal Inc.
3 * All rights reserved.
4 *
5 * Author: Zhanwei Wang
6 ********************************************************************/
7/********************************************************************
8 * 2014 -
9 * open source under Apache License Version 2.0
10 ********************************************************************/
11/**
12 * Licensed to the Apache Software Foundation (ASF) under one
13 * or more contributor license agreements. See the NOTICE file
14 * distributed with this work for additional information
15 * regarding copyright ownership. The ASF licenses this file
16 * to you under the Apache License, Version 2.0 (the
17 * "License"); you may not use this file except in compliance
18 * with the License. You may obtain a copy of the License at
19 *
20 * http://www.apache.org/licenses/LICENSE-2.0
21 *
22 * Unless required by applicable law or agreed to in writing, software
23 * distributed under the License is distributed on an "AS IS" BASIS,
24 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
25 * See the License for the specific language governing permissions and
26 * limitations under the License.
27 */
28#include "platform.h"
29
30#include <arpa/inet.h>
31#include <cassert>
32#include <climits>
33#include <cstring>
34#include <errno.h>
35#include <fcntl.h>
36#include <netdb.h>
37#include <netinet/in.h>
38#include <netinet/tcp.h>
39#include <poll.h>
40#include <stdint.h>
41#include <sys/socket.h>
42#include <sys/types.h>
43#include <unistd.h>
44
45#include <sstream>
46
47#include "DateTime.h"
48#include "Exception.h"
49#include "ExceptionInternal.h"
50#include "TcpSocket.h"
51#include "Syscall.h"
52
53namespace Hdfs {
54namespace Internal {
55
56TcpSocketImpl::TcpSocketImpl() :
57 sock(-1), lingerTimeout(-1) {
58}
59
60TcpSocketImpl::~TcpSocketImpl() {
61 close();
62}
63
64int32_t TcpSocketImpl::read(char * buffer, int32_t size) {
65 assert(-1 != sock);
66 assert(NULL != buffer && size > 0);
67 int32_t rc;
68
69 do {
70 rc = HdfsSystem::recv(sock, buffer, size, 0);
71 } while (-1 == rc && EINTR == errno && !CheckOperationCanceled());
72
73 if (-1 == rc) {
74 THROW(HdfsNetworkException, "Read %d bytes failed from %s: %s",
75 size, remoteAddr.c_str(), GetSystemErrorInfo(errno));
76 }
77
78 if (0 == rc) {
79 THROW(HdfsEndOfStream, "Read %d bytes failed from %s: End of the stream", size, remoteAddr.c_str());
80 }
81
82 return rc;
83}
84
85void TcpSocketImpl::readFully(char * buffer, int32_t size, int timeout) {
86 assert(-1 != sock);
87 assert(NULL != buffer && size > 0);
88 int32_t todo = size, rc;
89 int deadline = timeout;
90
91 while (todo > 0) {
92 steady_clock::time_point s = steady_clock::now();
93 CheckOperationCanceled();
94
95 if (poll(true, false, deadline)) {
96 rc = read(buffer + (size - todo), todo);
97 todo -= rc;
98 }
99
100 steady_clock::time_point e = steady_clock::now();
101
102 if (timeout > 0) {
103 deadline -= ToMilliSeconds(s, e);
104 }
105
106 if (todo > 0 && timeout >= 0 && deadline <= 0) {
107 THROW(HdfsTimeoutException, "Read %d bytes timeout from %s", size, remoteAddr.c_str());
108 }
109 }
110}
111
112int32_t TcpSocketImpl::write(const char * buffer, int32_t size) {
113 assert(-1 != sock);
114 assert(NULL != buffer && size > 0);
115 int32_t rc;
116
117 do {
118#ifdef MSG_NOSIGNAL //on linux
119 rc = HdfsSystem::send(sock, buffer, size, MSG_NOSIGNAL);
120#else
121 rc = HdfsSystem::send(sock, buffer, size, 0);
122#endif
123 } while (-1 == rc && EINTR == errno && !CheckOperationCanceled());
124
125 if (-1 == rc) {
126 THROW(HdfsNetworkException, "Write %d bytes failed to %s: %s",
127 size, remoteAddr.c_str(), GetSystemErrorInfo(errno));
128 }
129
130 return rc;
131}
132
133void TcpSocketImpl::writeFully(const char * buffer, int32_t size, int timeout) {
134 assert(-1 != sock);
135 assert(NULL != buffer && size > 0);
136 int32_t todo = size, rc;
137 int deadline = timeout;
138
139 while (todo > 0) {
140 steady_clock::time_point s = steady_clock::now();
141 CheckOperationCanceled();
142
143 if (poll(false, true, deadline)) {
144 rc = write(buffer + (size - todo), todo);
145 todo -= rc;
146 }
147
148 steady_clock::time_point e = steady_clock::now();
149
150 if (timeout > 0) {
151 deadline -= ToMilliSeconds(s, e);
152 }
153
154 if (todo > 0 && timeout >= 0 && deadline <= 0) {
155 THROW(HdfsTimeoutException, "Write %d bytes timeout to %s", size, remoteAddr.c_str());
156 }
157 }
158}
159
160void TcpSocketImpl::connect(const char * host, int port, int timeout) {
161 std::stringstream ss;
162 ss.imbue(std::locale::classic());
163 ss << port;
164 connect(host, ss.str().c_str(), timeout);
165}
166
/**
 * Resolve host:port and connect to the first resolved address that
 * accepts the connection.
 *
 * Every resolved address is tried in order. An HdfsNetworkConnectException
 * or HdfsTimeoutException raised for an address is swallowed when another
 * address is still left to try and re-thrown for the last one. Any other
 * exception aborts the attempt immediately.
 *
 * The resolved address list is owned by a scope guard, so it is released
 * on every exit path, including an exception thrown while the remote
 * address label is being built.
 *
 * @param host Host name or address to resolve.
 * @param port Service name or port number as text.
 * @param timeout Overall time budget in milliseconds shared by all
 *                attempts. A negative value disables the deadline check.
 * @throw HdfsNetworkConnectException if resolution fails or the last
 *        address cannot be connected.
 * @throw HdfsTimeoutException if the time budget is used up.
 */
void TcpSocketImpl::connect(const char * host, const char * port, int timeout) {
    assert(-1 == sock);
    struct addrinfo hints, *addrs, *paddr;
    memset(&hints, 0, sizeof(hints));
    hints.ai_family = PF_UNSPEC;
    hints.ai_socktype = SOCK_STREAM;
    int retval = HdfsSystem::getaddrinfo(host, port, &hints, &addrs);

    if (0 != retval) {
        THROW(HdfsNetworkConnectException, "Failed to resolve address \"%s:%s\" %s",
              host, port, gai_strerror(retval));
    }

    /* frees the resolved list when this function is left, however it is left */
    struct AddrInfoGuard {
        struct addrinfo * list;

        explicit AddrInfoGuard(struct addrinfo * l) : list(l) {
        }

        ~AddrInfoGuard() {
            HdfsSystem::freeaddrinfo(list);
        }
    } guard(addrs);

    int deadline = timeout;
    std::stringstream ss;
    ss.imbue(std::locale::classic());
    ss << "\"" << host << ":" << port << "\"";
    remoteAddr = ss.str();

    for (paddr = addrs; NULL != paddr; paddr = paddr->ai_next) {
        steady_clock::time_point s = steady_clock::now();
        CheckOperationCanceled();

        try {
            connect(paddr, host, port, deadline);
        } catch (HdfsNetworkConnectException &) {
            /* give up only when there is no further address to try */
            if (NULL == paddr->ai_next) {
                throw;
            }
        } catch (HdfsTimeoutException &) {
            if (NULL == paddr->ai_next) {
                throw;
            }
        }

        if (-1 != sock) {
            /* connected */
            return;
        }

        steady_clock::time_point e = steady_clock::now();

        /* only a positive timeout is charged for the time spent on this attempt */
        if (timeout > 0) {
            deadline -= ToMilliSeconds(s, e);
        }

        if (-1 == sock && timeout >= 0 && deadline <= 0) {
            THROW(HdfsTimeoutException, "Connect to \"%s:%s\" timeout", host, port);
        }
    }
}
223
224void TcpSocketImpl::connect(struct addrinfo * paddr, const char * host,
225 const char * port, int timeout) {
226 assert(-1 == sock);
227 sock = HdfsSystem::socket(paddr->ai_family, paddr->ai_socktype,
228 paddr->ai_protocol);
229
230 if (-1 == sock) {
231 THROW(HdfsNetworkException,
232 "Create socket failed when connect to %s: %s",
233 remoteAddr.c_str(), GetSystemErrorInfo(errno));
234 }
235
236 if (lingerTimeout >= 0) {
237 setLingerTimeoutInternal(lingerTimeout);
238 }
239
240#ifdef __linux__
241 /*
242 * on linux some kernel use SO_SNDTIMEO as connect timeout.
243 * It is OK to set a very large value here since the user has its own timeout mechanism.
244 */
245 setSendTimeout(3600000);
246#endif
247
248 try {
249 setBlockMode(false);
250 disableSigPipe();
251 int rc = 0;
252
253 do {
254 rc = HdfsSystem::connect(sock, paddr->ai_addr, paddr->ai_addrlen);
255 } while (rc < 0 && EINTR == errno && !CheckOperationCanceled());
256
257 if (rc < 0) {
258 if (EINPROGRESS != errno && EWOULDBLOCK != errno) {
259 if (ETIMEDOUT == errno) {
260 THROW(HdfsTimeoutException, "Connect to \"%s:%s\" timeout",
261 host, port);
262 } else {
263 THROW(HdfsNetworkConnectException,
264 "Connect to \"%s:%s\" failed: %s",
265 host, port, GetSystemErrorInfo(errno));
266 }
267 }
268
269 if (!poll(false, true, timeout)) {
270 THROW(HdfsTimeoutException, "Connect to \"%s:%s\" timeout", host, port);
271 }
272
273 struct sockaddr peer;
274
275 unsigned int len = sizeof(peer);
276
277 memset(&peer, 0, sizeof(peer));
278
279 if (HdfsSystem::getpeername(sock, &peer, &len)) {
280 /*
281 * connect failed, find out the error info.
282 */
283 char c;
284 rc = HdfsSystem::recv(sock, &c, 1, 0);
285 assert(rc < 0);
286
287 if (ETIMEDOUT == errno) {
288 THROW(HdfsTimeoutException, "Connect to \"%s:%s\" timeout",
289 host, port);
290 }
291
292 THROW(HdfsNetworkConnectException, "Connect to \"%s:%s\" failed: %s",
293 host, port, GetSystemErrorInfo(errno));
294 }
295 }
296
297 setBlockMode(true);
298 } catch (...) {
299 close();
300 throw;
301 }
302}
303
304void TcpSocketImpl::setBlockMode(bool enable) {
305 int flag;
306 flag = HdfsSystem::fcntl(sock, F_GETFL, 0);
307
308 if (-1 == flag) {
309 THROW(HdfsNetworkException, "Get socket flag failed for remote node %s: %s",
310 remoteAddr.c_str(), GetSystemErrorInfo(errno));
311 }
312
313 flag = enable ? (flag & ~O_NONBLOCK) : (flag | O_NONBLOCK);
314
315 if (-1 == HdfsSystem::fcntl(sock, F_SETFL, flag)) {
316 THROW(HdfsNetworkException, "Set socket flag failed for remote node %s: %s",
317 remoteAddr.c_str(), GetSystemErrorInfo(errno));
318 }
319}
320
321void TcpSocketImpl::disableSigPipe() {
322#ifdef SO_NOSIGPIPE /* only available on macos*/
323 int flag = 1;
324
325 if (HdfsSystem::setsockopt(sock, SOL_SOCKET, SO_NOSIGPIPE, (char *) &flag,
326 sizeof(flag))) {
327 THROW(HdfsNetworkException, "Set socket flag failed for remote node %s: %s",
328 remoteAddr.c_str(), GetSystemErrorInfo(errno));
329 }
330
331#endif
332}
333
334bool TcpSocketImpl::poll(bool read, bool write, int timeout) {
335 assert(-1 != sock);
336 int rc;
337 struct pollfd pfd;
338
339 do {
340 memset(&pfd, 0, sizeof(pfd));
341 pfd.fd = sock;
342
343 if (read) {
344 pfd.events |= POLLIN;
345 }
346
347 if (write) {
348 pfd.events |= POLLOUT;
349 }
350
351 rc = HdfsSystem::poll(&pfd, 1, timeout);
352 } while (-1 == rc && EINTR == errno && !CheckOperationCanceled());
353
354 if (-1 == rc) {
355 THROW(HdfsNetworkException, "Poll failed for remote node %s: %s",
356 remoteAddr.c_str(), GetSystemErrorInfo(errno));
357 }
358
359 return 0 != rc;
360}
361
362void TcpSocketImpl::setNoDelay(bool enable) {
363 assert(-1 != sock);
364 int flag = enable ? 1 : 0;
365
366 if (HdfsSystem::setsockopt(sock, IPPROTO_TCP, TCP_NODELAY, (char *) &flag,
367 sizeof(flag))) {
368 THROW(HdfsNetworkException, "Set socket flag failed for remote node %s: %s",
369 remoteAddr.c_str(), GetSystemErrorInfo(errno));
370 }
371}
372
373void TcpSocketImpl::setLingerTimeout(int timeout) {
374 lingerTimeout = timeout;
375}
376
377void TcpSocketImpl::setLingerTimeoutInternal(int timeout) {
378 assert(-1 != sock);
379 struct linger l;
380 l.l_onoff = timeout > 0 ? true : false;
381 l.l_linger = timeout > 0 ? timeout : 0;
382
383 if (HdfsSystem::setsockopt(sock, SOL_SOCKET, SO_LINGER, &l, sizeof(l))) {
384 THROW(HdfsNetworkException, "Set socket flag failed for remote node %s: %s",
385 remoteAddr.c_str(), GetSystemErrorInfo(errno));
386 }
387}
388
389void TcpSocketImpl::setSendTimeout(int timeout) {
390 assert(-1 != sock);
391 struct timeval timeo;
392 timeo.tv_sec = timeout / 1000;
393 timeo.tv_usec = (timeout % 1000) * 1000;
394
395 if (HdfsSystem::setsockopt(sock, SOL_SOCKET, SO_SNDTIMEO, &timeo, sizeof(timeo))) {
396 THROW(HdfsNetworkException, "Set socket flag failed for remote node %s: %s",
397 remoteAddr.c_str(), GetSystemErrorInfo(errno));
398 }
399}
400
401void TcpSocketImpl::close() {
402 if (-1 != sock) {
403 HdfsSystem::shutdown(sock, SHUT_RDWR);
404 HdfsSystem::close(sock);
405 sock = -1;
406 }
407}
408
409}
410}
411