1 | // Copyright 2019 Google LLC |
2 | // |
3 | // Licensed under the Apache License, Version 2.0 (the "License"); |
4 | // you may not use this file except in compliance with the License. |
5 | // You may obtain a copy of the License at |
6 | // |
7 | // https://www.apache.org/licenses/LICENSE-2.0 |
8 | // |
9 | // Unless required by applicable law or agreed to in writing, software |
10 | // distributed under the License is distributed on an "AS IS" BASIS, |
11 | // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
12 | // See the License for the specific language governing permissions and |
13 | // limitations under the License. |
14 | |
15 | #include "content_stream.h" |
16 | |
17 | #include "dap/any.h" |
18 | #include "dap/session.h" |
19 | |
20 | #include "chan.h" |
21 | #include "json_serializer.h" |
22 | #include "socket.h" |
23 | |
24 | #include <stdarg.h> |
25 | #include <stdio.h> |
26 | #include <atomic> |
27 | #include <deque> |
28 | #include <memory> |
29 | #include <mutex> |
30 | #include <thread> |
31 | #include <unordered_map> |
32 | #include <vector> |
33 | |
34 | #ifdef ENABLE_LOG |
35 | #define Log(message) printf("%s", message); |
36 | #else |
37 | #define Log(message) |
38 | #endif |
39 | |
40 | namespace { |
41 | |
42 | class Impl : public dap::Session { |
43 | public: |
44 | void onError(const ErrorHandler& handler) override { handlers.put(handler); } |
45 | |
46 | void registerHandler(const dap::TypeInfo* typeinfo, |
47 | const GenericRequestHandler& handler) override { |
48 | handlers.put(typeinfo, handler); |
49 | } |
50 | |
51 | void registerHandler(const dap::TypeInfo* typeinfo, |
52 | const GenericEventHandler& handler) override { |
53 | handlers.put(typeinfo, handler); |
54 | } |
55 | |
56 | void registerHandler(const dap::TypeInfo* typeinfo, |
57 | const GenericResponseSentHandler& handler) override { |
58 | handlers.put(typeinfo, handler); |
59 | } |
60 | |
61 | std::function<void()> getPayload() override { |
62 | auto request = reader.read(); |
63 | Log(request.c_str()); |
64 | if (request.size() > 0) { |
65 | if (auto payload = processMessage(request)) { |
66 | return payload; |
67 | } |
68 | } |
69 | return {}; |
70 | } |
71 | |
72 | void connect(const std::shared_ptr<dap::Reader>& r, |
73 | const std::shared_ptr<dap::Writer>& w) override { |
74 | if (isBound.exchange(true)) { |
75 | handlers.error("Session is already bound!" ); |
76 | return; |
77 | } |
78 | |
79 | reader = dap::ContentReader(r); |
80 | writer = dap::ContentWriter(w); |
81 | } |
82 | |
83 | void startProcessingMessages() override { |
84 | recvThread = std::thread([this] { |
85 | while (reader.isOpen()) { |
86 | if (auto payload = getPayload()) { |
87 | inbox.put(std::move(payload)); |
88 | } |
89 | } |
90 | }); |
91 | |
92 | dispatchThread = std::thread([this] { |
93 | while (auto payload = inbox.take()) { |
94 | payload.value()(); |
95 | } |
96 | }); |
97 | } |
98 | |
99 | bool send(const dap::TypeInfo* requestTypeInfo, |
100 | const dap::TypeInfo* responseTypeInfo, |
101 | const void* request, |
102 | const GenericResponseHandler& responseHandler) override { |
103 | int seq = nextSeq++; |
104 | |
105 | handlers.put(seq, responseTypeInfo, responseHandler); |
106 | |
107 | dap::json::Serializer s; |
108 | if (!s.object([&](dap::FieldSerializer* fs) { |
109 | return fs->field("seq" , dap::integer(seq)) && |
110 | fs->field("type" , "request" ) && |
111 | fs->field("command" , requestTypeInfo->name()) && |
112 | fs->field("arguments" , [&](dap::Serializer* s) { |
113 | return requestTypeInfo->serialize(s, request); |
114 | }); |
115 | })) { |
116 | return false; |
117 | } |
118 | return send(s.dump()); |
119 | } |
120 | |
121 | bool send(const dap::TypeInfo* typeinfo, const void* event) override { |
122 | dap::json::Serializer s; |
123 | if (!s.object([&](dap::FieldSerializer* fs) { |
124 | return fs->field("seq" , dap::integer(nextSeq++)) && |
125 | fs->field("type" , "event" ) && |
126 | fs->field("event" , typeinfo->name()) && |
127 | fs->field("body" , [&](dap::Serializer* s) { |
128 | return typeinfo->serialize(s, event); |
129 | }); |
130 | })) { |
131 | return false; |
132 | } |
133 | return send(s.dump()); |
134 | } |
135 | |
136 | //mozart: added time:2022/1/10 |
137 | bool send(const std::string& s) override { |
138 | std::unique_lock<std::mutex> lock(sendMutex); |
139 | if (!writer.isOpen()) { |
140 | handlers.error("Send failed as the writer is closed" ); |
141 | return false; |
142 | } |
143 | Log(s.c_str()); |
144 | return writer.write(s); |
145 | } |
146 | |
147 | ~Impl() { |
148 | inbox.close(); |
149 | reader.close(); |
150 | writer.close(); |
151 | if (recvThread.joinable()) { |
152 | recvThread.join(); |
153 | } |
154 | if (dispatchThread.joinable()) { |
155 | dispatchThread.join(); |
156 | } |
157 | } |
158 | |
159 | private: |
160 | using Payload = std::function<void()>; |
161 | |
162 | class EventHandlers { |
163 | public: |
164 | void put(const ErrorHandler& handler) { |
165 | std::unique_lock<std::mutex> lock(errorMutex); |
166 | errorHandler = handler; |
167 | } |
168 | |
169 | void error(const char* format, ...) { |
170 | va_list vararg; |
171 | va_start(vararg, format); |
172 | std::unique_lock<std::mutex> lock(errorMutex); |
173 | errorLocked(format, vararg); |
174 | va_end(vararg); |
175 | } |
176 | |
177 | std::pair<const dap::TypeInfo*, GenericRequestHandler> request( |
178 | const std::string& name) { |
179 | std::unique_lock<std::mutex> lock(requestMutex); |
180 | auto it = requestMap.find(name); |
181 | return (it != requestMap.end()) ? it->second : decltype(it->second){}; |
182 | } |
183 | |
184 | void put(const dap::TypeInfo* typeinfo, |
185 | const GenericRequestHandler& handler) { |
186 | std::unique_lock<std::mutex> lock(requestMutex); |
187 | auto added = |
188 | requestMap |
189 | .emplace(typeinfo->name(), std::make_pair(typeinfo, handler)) |
190 | .second; |
191 | if (!added) { |
192 | errorfLocked("Request handler for '%s' already registered" , |
193 | typeinfo->name().c_str()); |
194 | } |
195 | } |
196 | |
197 | std::pair<const dap::TypeInfo*, GenericResponseHandler> response( |
198 | int64_t seq) { |
199 | std::unique_lock<std::mutex> lock(responseMutex); |
200 | auto responseIt = responseMap.find(seq); |
201 | if (responseIt == responseMap.end()) { |
202 | errorfLocked("Unknown response with sequence %d" , seq); |
203 | return {}; |
204 | } |
205 | auto out = std::move(responseIt->second); |
206 | responseMap.erase(seq); |
207 | return out; |
208 | } |
209 | |
210 | void put(int seq, |
211 | const dap::TypeInfo* typeinfo, |
212 | const GenericResponseHandler& handler) { |
213 | std::unique_lock<std::mutex> lock(responseMutex); |
214 | auto added = |
215 | responseMap.emplace(seq, std::make_pair(typeinfo, handler)).second; |
216 | if (!added) { |
217 | errorfLocked("Response handler for sequence %d already registered" , |
218 | seq); |
219 | } |
220 | } |
221 | |
222 | std::pair<const dap::TypeInfo*, GenericEventHandler> event( |
223 | const std::string& name) { |
224 | std::unique_lock<std::mutex> lock(eventMutex); |
225 | auto it = eventMap.find(name); |
226 | return (it != eventMap.end()) ? it->second : decltype(it->second){}; |
227 | } |
228 | |
229 | void put(const dap::TypeInfo* typeinfo, |
230 | const GenericEventHandler& handler) { |
231 | std::unique_lock<std::mutex> lock(eventMutex); |
232 | auto added = |
233 | eventMap.emplace(typeinfo->name(), std::make_pair(typeinfo, handler)) |
234 | .second; |
235 | if (!added) { |
236 | errorfLocked("Event handler for '%s' already registered" , |
237 | typeinfo->name().c_str()); |
238 | } |
239 | } |
240 | |
241 | GenericResponseSentHandler responseSent(const dap::TypeInfo* typeinfo) { |
242 | std::unique_lock<std::mutex> lock(responseSentMutex); |
243 | auto it = responseSentMap.find(typeinfo); |
244 | return (it != responseSentMap.end()) ? it->second |
245 | : decltype(it->second){}; |
246 | } |
247 | |
248 | void put(const dap::TypeInfo* typeinfo, |
249 | const GenericResponseSentHandler& handler) { |
250 | std::unique_lock<std::mutex> lock(responseSentMutex); |
251 | auto added = responseSentMap.emplace(typeinfo, handler).second; |
252 | if (!added) { |
253 | errorfLocked("Response sent handler for '%s' already registered" , |
254 | typeinfo->name().c_str()); |
255 | } |
256 | } |
257 | |
258 | private: |
259 | void errorfLocked(const char* format, ...) { |
260 | va_list vararg; |
261 | va_start(vararg, format); |
262 | errorLocked(format, vararg); |
263 | va_end(vararg); |
264 | } |
265 | |
266 | void errorLocked(const char* format, va_list args) { |
267 | char buf[2048]; |
268 | vsnprintf(buf, sizeof(buf), format, args); |
269 | if (errorHandler) { |
270 | errorHandler(buf); |
271 | } |
272 | } |
273 | |
274 | std::mutex errorMutex; |
275 | ErrorHandler errorHandler; |
276 | |
277 | std::mutex requestMutex; |
278 | std::unordered_map<std::string, |
279 | std::pair<const dap::TypeInfo*, GenericRequestHandler>> |
280 | requestMap; |
281 | |
282 | std::mutex responseMutex; |
283 | std::unordered_map<int64_t, |
284 | std::pair<const dap::TypeInfo*, GenericResponseHandler>> |
285 | responseMap; |
286 | |
287 | std::mutex eventMutex; |
288 | std::unordered_map<std::string, |
289 | std::pair<const dap::TypeInfo*, GenericEventHandler>> |
290 | eventMap; |
291 | |
292 | std::mutex responseSentMutex; |
293 | std::unordered_map<const dap::TypeInfo*, GenericResponseSentHandler> |
294 | responseSentMap; |
295 | }; // EventHandlers |
296 | |
297 | Payload processMessage(const std::string& str) { |
298 | auto d = dap::json::Deserializer(str); |
299 | dap::string type; |
300 | if (!d.field("type" , &type)) { |
301 | handlers.error("Message missing string 'type' field" ); |
302 | return {}; |
303 | } |
304 | |
305 | dap::integer sequence = 0; |
306 | if (!d.field("seq" , &sequence)) { |
307 | handlers.error("Message missing number 'seq' field" ); |
308 | return {}; |
309 | } |
310 | |
311 | if (type == "request" ) { |
312 | return processRequest(&d, sequence); |
313 | } else if (type == "event" ) { |
314 | return processEvent(&d); |
315 | } else if (type == "response" ) { |
316 | processResponse(&d); |
317 | return {}; |
318 | } else { |
319 | handlers.error("Unknown message type '%s'" , type.c_str()); |
320 | } |
321 | |
322 | return {}; |
323 | } |
324 | |
325 | Payload processRequest(dap::json::Deserializer* d, dap::integer sequence) { |
326 | dap::string command; |
327 | if (!d->field("command" , &command)) { |
328 | handlers.error("Request missing string 'command' field" ); |
329 | return {}; |
330 | } |
331 | |
332 | const dap::TypeInfo* typeinfo; |
333 | GenericRequestHandler handler; |
334 | std::tie(typeinfo, handler) = handlers.request(command); |
335 | if (!typeinfo) { |
336 | handlers.error("No request handler registered for command '%s'" , |
337 | command.c_str()); |
338 | return {}; |
339 | } |
340 | |
341 | auto data = new uint8_t[typeinfo->size()]; |
342 | typeinfo->construct(data); |
343 | |
344 | if (!d->field("arguments" , [&](dap::Deserializer* d) { |
345 | return typeinfo->deserialize(d, data); |
346 | })) { |
347 | handlers.error("Failed to deserialize request" ); |
348 | typeinfo->destruct(data); |
349 | delete[] data; |
350 | return {}; |
351 | } |
352 | |
353 | return [=] { |
354 | handler( |
355 | data, |
356 | [=](const dap::TypeInfo* typeinfo, const void* data) { |
357 | // onSuccess |
358 | dap::json::Serializer s; |
359 | s.object([&](dap::FieldSerializer* fs) { |
360 | return fs->field("seq" , dap::integer(nextSeq++)) && |
361 | fs->field("type" , "response" ) && |
362 | fs->field("request_seq" , sequence) && |
363 | fs->field("success" , dap::boolean(true)) && |
364 | fs->field("command" , command) && |
365 | fs->field("body" , [&](dap::Serializer* s) { |
366 | return typeinfo->serialize(s, data); |
367 | }); |
368 | }); |
369 | //printf("%s\n", s.dump().c_str()); |
370 | send(s.dump()); |
371 | |
372 | if (auto handler = handlers.responseSent(typeinfo)) { |
373 | handler(data, nullptr); |
374 | } |
375 | }, |
376 | [=](const dap::TypeInfo* typeinfo, const dap::Error& error) { |
377 | // onError |
378 | dap::json::Serializer s; |
379 | s.object([&](dap::FieldSerializer* fs) { |
380 | return fs->field("seq" , dap::integer(nextSeq++)) && |
381 | fs->field("type" , "response" ) && |
382 | fs->field("request_seq" , sequence) && |
383 | fs->field("success" , dap::boolean(false)) && |
384 | fs->field("command" , command) && |
385 | fs->field("message" , error.message); |
386 | }); |
387 | Log(s.dump().c_str()); |
388 | send(s.dump()); |
389 | |
390 | if (auto handler = handlers.responseSent(typeinfo)) { |
391 | handler(nullptr, &error); |
392 | } |
393 | }); |
394 | typeinfo->destruct(data); |
395 | delete[] data; |
396 | }; |
397 | } |
398 | |
399 | Payload processEvent(dap::json::Deserializer* d) { |
400 | dap::string event; |
401 | if (!d->field("event" , &event)) { |
402 | handlers.error("Event missing string 'event' field" ); |
403 | return {}; |
404 | } |
405 | |
406 | const dap::TypeInfo* typeinfo; |
407 | GenericEventHandler handler; |
408 | std::tie(typeinfo, handler) = handlers.event(event); |
409 | if (!typeinfo) { |
410 | handlers.error("No event handler registered for event '%s'" , |
411 | event.c_str()); |
412 | return {}; |
413 | } |
414 | |
415 | auto data = new uint8_t[typeinfo->size()]; |
416 | typeinfo->construct(data); |
417 | |
418 | // "body" is an optional field for some events, such as "Terminated Event". |
419 | bool body_ok = true; |
420 | d->field("body" , [&](dap::Deserializer* d) { |
421 | if (!typeinfo->deserialize(d, data)) { |
422 | body_ok = false; |
423 | } |
424 | return true; |
425 | }); |
426 | |
427 | if (!body_ok) { |
428 | handlers.error("Failed to deserialize event '%s' body" , event.c_str()); |
429 | typeinfo->destruct(data); |
430 | delete[] data; |
431 | return {}; |
432 | } |
433 | |
434 | return [=] { |
435 | handler(data); |
436 | typeinfo->destruct(data); |
437 | delete[] data; |
438 | }; |
439 | } |
440 | |
441 | void processResponse(const dap::Deserializer* d) { |
442 | dap::integer requestSeq = 0; |
443 | if (!d->field("request_seq" , &requestSeq)) { |
444 | handlers.error("Response missing int 'request_seq' field" ); |
445 | return; |
446 | } |
447 | |
448 | const dap::TypeInfo* typeinfo; |
449 | GenericResponseHandler handler; |
450 | std::tie(typeinfo, handler) = handlers.response(requestSeq); |
451 | if (!typeinfo) { |
452 | handlers.error("Unknown response with sequence %d" , requestSeq); |
453 | return; |
454 | } |
455 | |
456 | dap::boolean success = false; |
457 | if (!d->field("success" , &success)) { |
458 | handlers.error("Response missing int 'success' field" ); |
459 | return; |
460 | } |
461 | |
462 | if (success) { |
463 | auto data = std::unique_ptr<uint8_t[]>(new uint8_t[typeinfo->size()]); |
464 | typeinfo->construct(data.get()); |
465 | |
466 | // "body" field in Response is an optional field. |
467 | d->field("body" , [&](const dap::Deserializer* d) { |
468 | return typeinfo->deserialize(d, data.get()); |
469 | }); |
470 | |
471 | handler(data.get(), nullptr); |
472 | typeinfo->destruct(data.get()); |
473 | } else { |
474 | std::string message; |
475 | if (!d->field("message" , &message)) { |
476 | handlers.error("Failed to deserialize message" ); |
477 | return; |
478 | } |
479 | auto error = dap::Error("%s" , message.c_str()); |
480 | handler(nullptr, &error); |
481 | } |
482 | } |
483 | |
484 | std::atomic<bool> isBound = {false}; |
485 | dap::ContentReader reader; |
486 | dap::ContentWriter writer; |
487 | |
488 | std::atomic<bool> shutdown = {false}; |
489 | EventHandlers handlers; |
490 | std::thread recvThread; |
491 | std::thread dispatchThread; |
492 | dap::Chan<Payload> inbox; |
493 | std::atomic<uint32_t> nextSeq = {1}; |
494 | std::mutex sendMutex; |
495 | }; |
496 | |
497 | } // anonymous namespace |
498 | |
499 | namespace dap { |
500 | |
501 | Error::Error(const std::string& message) : message(message) {} |
502 | |
503 | Error::Error(const char* msg, ...) { |
504 | char buf[2048]; |
505 | va_list vararg; |
506 | va_start(vararg, msg); |
507 | vsnprintf(buf, sizeof(buf), msg, vararg); |
508 | va_end(vararg); |
509 | message = buf; |
510 | } |
511 | |
512 | Session::~Session() = default; |
513 | |
514 | std::unique_ptr<Session> Session::create() { |
515 | return std::unique_ptr<Session>(new Impl()); |
516 | } |
517 | |
518 | } // namespace dap |
519 | |