1#include <Poco/Net/NetException.h>
2
3#include <IO/WriteBufferFromPocoSocket.h>
4
5#include <Common/Exception.h>
6#include <Common/NetException.h>
7#include <Common/Stopwatch.h>
8
9
10namespace ProfileEvents
11{
12 extern const Event NetworkSendElapsedMicroseconds;
13}
14
15
16namespace DB
17{
18
19namespace ErrorCodes
20{
21 extern const int NETWORK_ERROR;
22 extern const int SOCKET_TIMEOUT;
23 extern const int CANNOT_WRITE_TO_SOCKET;
24}
25
26
27void WriteBufferFromPocoSocket::nextImpl()
28{
29 if (!offset())
30 return;
31
32 Stopwatch watch;
33
34 size_t bytes_written = 0;
35 while (bytes_written < offset())
36 {
37 ssize_t res = 0;
38
39 /// Add more details to exceptions.
40 try
41 {
42 res = socket.impl()->sendBytes(working_buffer.begin() + bytes_written, offset() - bytes_written);
43 }
44 catch (const Poco::Net::NetException & e)
45 {
46 throw NetException(e.displayText() + ", while writing to socket (" + peer_address.toString() + ")", ErrorCodes::NETWORK_ERROR);
47 }
48 catch (const Poco::TimeoutException &)
49 {
50 throw NetException("Timeout exceeded while writing to socket (" + peer_address.toString() + ")", ErrorCodes::SOCKET_TIMEOUT);
51 }
52 catch (const Poco::IOException & e)
53 {
54 throw NetException(e.displayText() + ", while writing to socket (" + peer_address.toString() + ")", ErrorCodes::NETWORK_ERROR);
55 }
56
57 if (res < 0)
58 throw NetException("Cannot write to socket (" + peer_address.toString() + ")", ErrorCodes::CANNOT_WRITE_TO_SOCKET);
59
60 bytes_written += res;
61 }
62
63 ProfileEvents::increment(ProfileEvents::NetworkSendElapsedMicroseconds, watch.elapsedMicroseconds());
64}
65
66WriteBufferFromPocoSocket::WriteBufferFromPocoSocket(Poco::Net::Socket & socket_, size_t buf_size)
67 : BufferWithOwnMemory<WriteBuffer>(buf_size), socket(socket_), peer_address(socket.peerAddress())
68{
69}
70
71WriteBufferFromPocoSocket::~WriteBufferFromPocoSocket()
72{
73 try
74 {
75 next();
76 }
77 catch (...)
78 {
79 tryLogCurrentException(__PRETTY_FUNCTION__);
80 }
81}
82
83}
84