1#include <DataStreams/RemoteBlockOutputStream.h>
2
3#include <Client/Connection.h>
4#include <common/logger_useful.h>
5
6#include <Common/NetException.h>
7#include <Common/CurrentThread.h>
8#include <Interpreters/InternalTextLogsQueue.h>
9#include <IO/ConnectionTimeouts.h>
10
11
12namespace DB
13{
14
15namespace ErrorCodes
16{
17 extern const int UNEXPECTED_PACKET_FROM_SERVER;
18 extern const int LOGICAL_ERROR;
19}
20
21
22RemoteBlockOutputStream::RemoteBlockOutputStream(Connection & connection_,
23 const ConnectionTimeouts & timeouts,
24 const String & query_,
25 const Settings * settings_)
26 : connection(connection_), query(query_), settings(settings_)
27{
28 /** Send query and receive "header", that describe table structure.
29 * Header is needed to know, what structure is required for blocks to be passed to 'write' method.
30 */
31 connection.sendQuery(timeouts, query, "", QueryProcessingStage::Complete, settings, nullptr);
32
33 while (true)
34 {
35 Packet packet = connection.receivePacket();
36
37 if (Protocol::Server::Data == packet.type)
38 {
39 header = packet.block;
40 break;
41 }
42 else if (Protocol::Server::Exception == packet.type)
43 {
44 packet.exception->rethrow();
45 break;
46 }
47 else if (Protocol::Server::Log == packet.type)
48 {
49 /// Pass logs from remote server to client
50 if (auto log_queue = CurrentThread::getInternalTextLogsQueue())
51 log_queue->pushBlock(std::move(packet.block));
52 }
53 else if (Protocol::Server::TableColumns == packet.type)
54 {
55 /// Server could attach ColumnsDescription in front of stream for column defaults. There's no need to pass it through cause
56 /// client's already got this information for remote table. Ignore.
57 }
58 else
59 throw NetException("Unexpected packet from server (expected Data or Exception, got "
60 + String(Protocol::Server::toString(packet.type)) + ")", ErrorCodes::UNEXPECTED_PACKET_FROM_SERVER);
61 }
62}
63
64
65void RemoteBlockOutputStream::write(const Block & block)
66{
67 if (header)
68 assertBlocksHaveEqualStructure(block, header, "RemoteBlockOutputStream");
69
70 try
71 {
72 connection.sendData(block);
73 }
74 catch (const NetException &)
75 {
76 /// Try to get more detailed exception from server
77 auto packet_type = connection.checkPacket();
78 if (packet_type && *packet_type == Protocol::Server::Exception)
79 {
80 Packet packet = connection.receivePacket();
81 packet.exception->rethrow();
82 }
83
84 throw;
85 }
86}
87
88
89void RemoteBlockOutputStream::writePrepared(ReadBuffer & input, size_t size)
90{
91 /// We cannot use 'header'. Input must contain block with proper structure.
92 connection.sendPreparedData(input, size);
93}
94
95
96void RemoteBlockOutputStream::writeSuffix()
97{
98 /// Empty block means end of data.
99 connection.sendData(Block());
100
101 /// Wait for EndOfStream or Exception packet, skip Log packets.
102 while (true)
103 {
104 Packet packet = connection.receivePacket();
105
106 if (Protocol::Server::EndOfStream == packet.type)
107 break;
108 else if (Protocol::Server::Exception == packet.type)
109 packet.exception->rethrow();
110 else if (Protocol::Server::Log == packet.type)
111 {
112 // Do nothing
113 }
114 else
115 throw NetException("Unexpected packet from server (expected EndOfStream or Exception, got "
116 + String(Protocol::Server::toString(packet.type)) + ")", ErrorCodes::UNEXPECTED_PACKET_FROM_SERVER);
117 }
118
119 finished = true;
120}
121
122RemoteBlockOutputStream::~RemoteBlockOutputStream()
123{
124 /// If interrupted in the middle of the loop of communication with the server, then interrupt the connection,
125 /// to not leave the connection in unsynchronized state.
126 if (!finished)
127 {
128 try
129 {
130 connection.disconnect();
131 }
132 catch (...)
133 {
134 tryLogCurrentException(__PRETTY_FUNCTION__);
135 }
136 }
137}
138
139}
140