1/*
2 * IXWebSocketTransport.h
3 * Author: Benjamin Sergeant
4 * Copyright (c) 2017-2018 Machine Zone, Inc. All rights reserved.
5 */
6
7#pragma once
8
9//
10// Adapted from https://github.com/dhbaird/easywsclient
11//
12
13#include "IXCancellationRequest.h"
14#include "IXProgressCallback.h"
15#include "IXSocketTLSOptions.h"
16#include "IXWebSocketCloseConstants.h"
17#include "IXWebSocketHandshake.h"
18#include "IXWebSocketHttpHeaders.h"
19#include "IXWebSocketPerMessageDeflate.h"
20#include "IXWebSocketPerMessageDeflateOptions.h"
21#include "IXWebSocketSendInfo.h"
22#include "IXWebSocketSendData.h"
23#include <atomic>
24#include <functional>
25#include <list>
26#include <memory>
27#include <mutex>
28#include <string>
29#include <vector>
30
31namespace ix
32{
33 class Socket;
34
35 enum class SendMessageKind
36 {
37 Text,
38 Binary,
39 Ping
40 };
41
42 class WebSocketTransport
43 {
44 public:
45 enum class ReadyState
46 {
47 CLOSING,
48 CLOSED,
49 CONNECTING,
50 OPEN
51 };
52
53 enum class MessageKind
54 {
55 MSG_TEXT,
56 MSG_BINARY,
57 PING,
58 PONG,
59 FRAGMENT
60 };
61
62 enum class PollResult
63 {
64 Succeeded,
65 AbnormalClose,
66 CannotFlushSendBuffer
67 };
68
69 using OnMessageCallback =
70 std::function<void(const std::string&, size_t, bool, MessageKind)>;
71 using OnCloseCallback = std::function<void(uint16_t, const std::string&, size_t, bool)>;
72
73 WebSocketTransport();
74 ~WebSocketTransport();
75
76 void configure(const WebSocketPerMessageDeflateOptions& perMessageDeflateOptions,
77 const SocketTLSOptions& socketTLSOptions,
78 bool enablePong,
79 int pingIntervalSecs);
80
81 // Client
82 WebSocketInitResult connectToUrl(const std::string& url,
83 const WebSocketHttpHeaders& headers,
84 int timeoutSecs);
85
86 // Server
87 WebSocketInitResult connectToSocket(std::unique_ptr<Socket> socket,
88 int timeoutSecs,
89 bool enablePerMessageDeflate);
90
91 PollResult poll();
92 WebSocketSendInfo sendBinary(const IXWebSocketSendData& message,
93 const OnProgressCallback& onProgressCallback);
94 WebSocketSendInfo sendText(const IXWebSocketSendData& message,
95 const OnProgressCallback& onProgressCallback);
96 WebSocketSendInfo sendPing(const IXWebSocketSendData& message);
97
98 void close(uint16_t code = WebSocketCloseConstants::kNormalClosureCode,
99 const std::string& reason = WebSocketCloseConstants::kNormalClosureMessage,
100 size_t closeWireSize = 0,
101 bool remote = false);
102
103 void closeSocket();
104
105 ReadyState getReadyState() const;
106 void setReadyState(ReadyState readyState);
107 void setOnCloseCallback(const OnCloseCallback& onCloseCallback);
108 void dispatch(PollResult pollResult, const OnMessageCallback& onMessageCallback);
109 size_t bufferedAmount() const;
110
111 // internal
112 WebSocketSendInfo sendHeartBeat();
113
114 private:
115 std::string _url;
116
117 struct wsheader_type
118 {
119 unsigned header_size;
120 bool fin;
121 bool rsv1;
122 bool rsv2;
123 bool rsv3;
124 bool mask;
125 enum opcode_type
126 {
127 CONTINUATION = 0x0,
128 TEXT_FRAME = 0x1,
129 BINARY_FRAME = 0x2,
130 CLOSE = 8,
131 PING = 9,
132 PONG = 0xa,
133 } opcode;
134 int N0;
135 uint64_t N;
136 uint8_t masking_key[4];
137 };
138
139 // Tells whether we should mask the data we send.
140 // client should mask but server should not
141 std::atomic<bool> _useMask;
142
143 // Tells whether we should flush the send buffer before
144 // saying that a send is complete. This is the mode for server code.
145 std::atomic<bool> _blockingSend;
146
147 // Buffer for reading from our socket. That buffer is never resized.
148 std::vector<uint8_t> _readbuf;
149
150 // Contains all messages that were fetched in the last socket read.
151 // This could be a mix of control messages (Close, Ping, etc...) and
152 // data messages. That buffer is resized
153 std::vector<uint8_t> _rxbuf;
154
155 // Contains all messages that are waiting to be sent
156 std::vector<uint8_t> _txbuf;
157 mutable std::mutex _txbufMutex;
158
159 // Hold fragments for multi-fragments messages in a list. We support receiving very large
160 // messages (tested messages up to 700M) and we cannot put them in a single
161 // buffer that is resized, as this operation can be slow when a buffer has its
162 // size increased 2 fold, while appending to a list has a fixed cost.
163 std::list<std::string> _chunks;
164
165 // Record the message kind (will be TEXT or BINARY) for a fragmented
166 // message, present in the first chunk, since the final chunk will be a
167 // CONTINUATION opcode and doesn't tell the full message kind
168 MessageKind _fragmentedMessageKind;
169
170 // Ditto for whether a message is compressed
171 bool _receivedMessageCompressed;
172
173 // Fragments are 32K long
174 static constexpr size_t kChunkSize = 1 << 15;
175
176 // Underlying TCP socket
177 std::unique_ptr<Socket> _socket;
178 std::mutex _socketMutex;
179
180 // Hold the state of the connection (OPEN, CLOSED, etc...)
181 std::atomic<ReadyState> _readyState;
182
183 OnCloseCallback _onCloseCallback;
184 std::string _closeReason;
185 mutable std::mutex _closeReasonMutex;
186 std::atomic<uint16_t> _closeCode;
187 std::atomic<size_t> _closeWireSize;
188 std::atomic<bool> _closeRemote;
189
190 // Data used for Per Message Deflate compression (with zlib)
191 WebSocketPerMessageDeflatePtr _perMessageDeflate;
192 WebSocketPerMessageDeflateOptions _perMessageDeflateOptions;
193 std::atomic<bool> _enablePerMessageDeflate;
194
195 std::string _decompressedMessage;
196 std::string _compressedMessage;
197
198 // Used to control TLS connection behavior
199 SocketTLSOptions _socketTLSOptions;
200
201 // Used to cancel dns lookup + socket connect + http upgrade
202 std::atomic<bool> _requestInitCancellation;
203
204 mutable std::mutex _closingTimePointMutex;
205 std::chrono::time_point<std::chrono::steady_clock> _closingTimePoint;
206 static const int kClosingMaximumWaitingDelayInMs;
207
208 // enable auto response to ping
209 std::atomic<bool> _enablePong;
210 static const bool kDefaultEnablePong;
211
212 // Optional ping and pong timeout
213 int _pingIntervalSecs;
214 std::atomic<bool> _pongReceived;
215
216 static const int kDefaultPingIntervalSecs;
217 static const std::string kPingMessage;
218 std::atomic<uint64_t> _pingCount;
219
220 // We record when ping are being sent so that we can know when to send the next one
221 mutable std::mutex _lastSendPingTimePointMutex;
222 std::chrono::time_point<std::chrono::steady_clock> _lastSendPingTimePoint;
223
224 // If this function returns true, it is time to send a new ping
225 bool pingIntervalExceeded();
226 void initTimePointsAfterConnect();
227
228 // after calling close(), if no CLOSE frame answer is received back from the remote, we
229 // should close the connexion
230 bool closingDelayExceeded();
231
232 void sendCloseFrame(uint16_t code, const std::string& reason);
233
234 void closeSocketAndSwitchToClosedState(uint16_t code,
235 const std::string& reason,
236 size_t closeWireSize,
237 bool remote);
238
239 bool wakeUpFromPoll(uint64_t wakeUpCode);
240
241 bool flushSendBuffer();
242 bool sendOnSocket();
243 bool receiveFromSocket();
244
245 WebSocketSendInfo sendData(wsheader_type::opcode_type type,
246 const IXWebSocketSendData& message,
247 bool compress,
248 const OnProgressCallback& onProgressCallback = nullptr);
249
250 template<class Iterator>
251 bool sendFragment(
252 wsheader_type::opcode_type type, bool fin, Iterator begin, Iterator end, bool compress);
253
254 void emitMessage(MessageKind messageKind,
255 const std::string& message,
256 bool compressedMessage,
257 const OnMessageCallback& onMessageCallback);
258
259 bool isSendBufferEmpty() const;
260
261 template<class Iterator>
262 void appendToSendBuffer(const std::vector<uint8_t>& header,
263 Iterator begin,
264 Iterator end,
265 uint64_t message_size,
266 uint8_t masking_key[4]);
267
268 unsigned getRandomUnsigned();
269 void unmaskReceiveBuffer(const wsheader_type& ws);
270
271 std::string getMergedChunks() const;
272
273 void setCloseReason(const std::string& reason);
274 const std::string& getCloseReason() const;
275 };
276} // namespace ix
277