1 | #include <DataStreams/RemoteBlockOutputStream.h> |
2 | |
3 | #include <Client/Connection.h> |
4 | #include <common/logger_useful.h> |
5 | |
6 | #include <Common/NetException.h> |
7 | #include <Common/CurrentThread.h> |
8 | #include <Interpreters/InternalTextLogsQueue.h> |
9 | #include <IO/ConnectionTimeouts.h> |
10 | |
11 | |
12 | namespace DB |
13 | { |
14 | |
/// Error codes referenced from this translation unit.
/// They are only declared here (`extern`); the definitions live elsewhere.
namespace ErrorCodes
{
    extern const int LOGICAL_ERROR;
    /// Thrown (via NetException) when the server sends a packet type that is not expected at that point.
    extern const int UNEXPECTED_PACKET_FROM_SERVER;
}
20 | |
21 | |
22 | RemoteBlockOutputStream::RemoteBlockOutputStream(Connection & connection_, |
23 | const ConnectionTimeouts & timeouts, |
24 | const String & query_, |
25 | const Settings * settings_) |
26 | : connection(connection_), query(query_), settings(settings_) |
27 | { |
28 | /** Send query and receive "header", that describe table structure. |
29 | * Header is needed to know, what structure is required for blocks to be passed to 'write' method. |
30 | */ |
31 | connection.sendQuery(timeouts, query, "" , QueryProcessingStage::Complete, settings, nullptr); |
32 | |
33 | while (true) |
34 | { |
35 | Packet packet = connection.receivePacket(); |
36 | |
37 | if (Protocol::Server::Data == packet.type) |
38 | { |
39 | header = packet.block; |
40 | break; |
41 | } |
42 | else if (Protocol::Server::Exception == packet.type) |
43 | { |
44 | packet.exception->rethrow(); |
45 | break; |
46 | } |
47 | else if (Protocol::Server::Log == packet.type) |
48 | { |
49 | /// Pass logs from remote server to client |
50 | if (auto log_queue = CurrentThread::getInternalTextLogsQueue()) |
51 | log_queue->pushBlock(std::move(packet.block)); |
52 | } |
53 | else if (Protocol::Server::TableColumns == packet.type) |
54 | { |
55 | /// Server could attach ColumnsDescription in front of stream for column defaults. There's no need to pass it through cause |
56 | /// client's already got this information for remote table. Ignore. |
57 | } |
58 | else |
59 | throw NetException("Unexpected packet from server (expected Data or Exception, got " |
60 | + String(Protocol::Server::toString(packet.type)) + ")" , ErrorCodes::UNEXPECTED_PACKET_FROM_SERVER); |
61 | } |
62 | } |
63 | |
64 | |
65 | void RemoteBlockOutputStream::write(const Block & block) |
66 | { |
67 | if (header) |
68 | assertBlocksHaveEqualStructure(block, header, "RemoteBlockOutputStream" ); |
69 | |
70 | try |
71 | { |
72 | connection.sendData(block); |
73 | } |
74 | catch (const NetException &) |
75 | { |
76 | /// Try to get more detailed exception from server |
77 | auto packet_type = connection.checkPacket(); |
78 | if (packet_type && *packet_type == Protocol::Server::Exception) |
79 | { |
80 | Packet packet = connection.receivePacket(); |
81 | packet.exception->rethrow(); |
82 | } |
83 | |
84 | throw; |
85 | } |
86 | } |
87 | |
88 | |
89 | void RemoteBlockOutputStream::writePrepared(ReadBuffer & input, size_t size) |
90 | { |
91 | /// We cannot use 'header'. Input must contain block with proper structure. |
92 | connection.sendPreparedData(input, size); |
93 | } |
94 | |
95 | |
96 | void RemoteBlockOutputStream::writeSuffix() |
97 | { |
98 | /// Empty block means end of data. |
99 | connection.sendData(Block()); |
100 | |
101 | /// Wait for EndOfStream or Exception packet, skip Log packets. |
102 | while (true) |
103 | { |
104 | Packet packet = connection.receivePacket(); |
105 | |
106 | if (Protocol::Server::EndOfStream == packet.type) |
107 | break; |
108 | else if (Protocol::Server::Exception == packet.type) |
109 | packet.exception->rethrow(); |
110 | else if (Protocol::Server::Log == packet.type) |
111 | { |
112 | // Do nothing |
113 | } |
114 | else |
115 | throw NetException("Unexpected packet from server (expected EndOfStream or Exception, got " |
116 | + String(Protocol::Server::toString(packet.type)) + ")" , ErrorCodes::UNEXPECTED_PACKET_FROM_SERVER); |
117 | } |
118 | |
119 | finished = true; |
120 | } |
121 | |
122 | RemoteBlockOutputStream::~RemoteBlockOutputStream() |
123 | { |
124 | /// If interrupted in the middle of the loop of communication with the server, then interrupt the connection, |
125 | /// to not leave the connection in unsynchronized state. |
126 | if (!finished) |
127 | { |
128 | try |
129 | { |
130 | connection.disconnect(); |
131 | } |
132 | catch (...) |
133 | { |
134 | tryLogCurrentException(__PRETTY_FUNCTION__); |
135 | } |
136 | } |
137 | } |
138 | |
139 | } |
140 | |