1// Copyright 2019 Google LLC
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// https://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15#include "content_stream.h"
16
17#include "dap/any.h"
18#include "dap/session.h"
19
20#include "chan.h"
21#include "json_serializer.h"
22#include "socket.h"
23
24#include <stdarg.h>
25#include <stdio.h>
26#include <atomic>
27#include <deque>
28#include <memory>
29#include <mutex>
30#include <thread>
31#include <unordered_map>
32#include <vector>
33
34#ifdef ENABLE_LOG
35#define Log(message) printf("%s", message);
36#else
37#define Log(message)
38#endif
39
40namespace {
41
42class Impl : public dap::Session {
43 public:
44 void onError(const ErrorHandler& handler) override { handlers.put(handler); }
45
46 void registerHandler(const dap::TypeInfo* typeinfo,
47 const GenericRequestHandler& handler) override {
48 handlers.put(typeinfo, handler);
49 }
50
51 void registerHandler(const dap::TypeInfo* typeinfo,
52 const GenericEventHandler& handler) override {
53 handlers.put(typeinfo, handler);
54 }
55
56 void registerHandler(const dap::TypeInfo* typeinfo,
57 const GenericResponseSentHandler& handler) override {
58 handlers.put(typeinfo, handler);
59 }
60
61 std::function<void()> getPayload() override {
62 auto request = reader.read();
63 Log(request.c_str());
64 if (request.size() > 0) {
65 if (auto payload = processMessage(request)) {
66 return payload;
67 }
68 }
69 return {};
70 }
71
72 void connect(const std::shared_ptr<dap::Reader>& r,
73 const std::shared_ptr<dap::Writer>& w) override {
74 if (isBound.exchange(true)) {
75 handlers.error("Session is already bound!");
76 return;
77 }
78
79 reader = dap::ContentReader(r);
80 writer = dap::ContentWriter(w);
81 }
82
83 void startProcessingMessages() override {
84 recvThread = std::thread([this] {
85 while (reader.isOpen()) {
86 if (auto payload = getPayload()) {
87 inbox.put(std::move(payload));
88 }
89 }
90 });
91
92 dispatchThread = std::thread([this] {
93 while (auto payload = inbox.take()) {
94 payload.value()();
95 }
96 });
97 }
98
99 bool send(const dap::TypeInfo* requestTypeInfo,
100 const dap::TypeInfo* responseTypeInfo,
101 const void* request,
102 const GenericResponseHandler& responseHandler) override {
103 int seq = nextSeq++;
104
105 handlers.put(seq, responseTypeInfo, responseHandler);
106
107 dap::json::Serializer s;
108 if (!s.object([&](dap::FieldSerializer* fs) {
109 return fs->field("seq", dap::integer(seq)) &&
110 fs->field("type", "request") &&
111 fs->field("command", requestTypeInfo->name()) &&
112 fs->field("arguments", [&](dap::Serializer* s) {
113 return requestTypeInfo->serialize(s, request);
114 });
115 })) {
116 return false;
117 }
118 return send(s.dump());
119 }
120
121 bool send(const dap::TypeInfo* typeinfo, const void* event) override {
122 dap::json::Serializer s;
123 if (!s.object([&](dap::FieldSerializer* fs) {
124 return fs->field("seq", dap::integer(nextSeq++)) &&
125 fs->field("type", "event") &&
126 fs->field("event", typeinfo->name()) &&
127 fs->field("body", [&](dap::Serializer* s) {
128 return typeinfo->serialize(s, event);
129 });
130 })) {
131 return false;
132 }
133 return send(s.dump());
134 }
135
136 //mozart: added time:2022/1/10
137 bool send(const std::string& s) override {
138 std::unique_lock<std::mutex> lock(sendMutex);
139 if (!writer.isOpen()) {
140 handlers.error("Send failed as the writer is closed");
141 return false;
142 }
143 Log(s.c_str());
144 return writer.write(s);
145 }
146
147 ~Impl() {
148 inbox.close();
149 reader.close();
150 writer.close();
151 if (recvThread.joinable()) {
152 recvThread.join();
153 }
154 if (dispatchThread.joinable()) {
155 dispatchThread.join();
156 }
157 }
158
159 private:
160 using Payload = std::function<void()>;
161
162 class EventHandlers {
163 public:
164 void put(const ErrorHandler& handler) {
165 std::unique_lock<std::mutex> lock(errorMutex);
166 errorHandler = handler;
167 }
168
169 void error(const char* format, ...) {
170 va_list vararg;
171 va_start(vararg, format);
172 std::unique_lock<std::mutex> lock(errorMutex);
173 errorLocked(format, vararg);
174 va_end(vararg);
175 }
176
177 std::pair<const dap::TypeInfo*, GenericRequestHandler> request(
178 const std::string& name) {
179 std::unique_lock<std::mutex> lock(requestMutex);
180 auto it = requestMap.find(name);
181 return (it != requestMap.end()) ? it->second : decltype(it->second){};
182 }
183
184 void put(const dap::TypeInfo* typeinfo,
185 const GenericRequestHandler& handler) {
186 std::unique_lock<std::mutex> lock(requestMutex);
187 auto added =
188 requestMap
189 .emplace(typeinfo->name(), std::make_pair(typeinfo, handler))
190 .second;
191 if (!added) {
192 errorfLocked("Request handler for '%s' already registered",
193 typeinfo->name().c_str());
194 }
195 }
196
197 std::pair<const dap::TypeInfo*, GenericResponseHandler> response(
198 int64_t seq) {
199 std::unique_lock<std::mutex> lock(responseMutex);
200 auto responseIt = responseMap.find(seq);
201 if (responseIt == responseMap.end()) {
202 errorfLocked("Unknown response with sequence %d", seq);
203 return {};
204 }
205 auto out = std::move(responseIt->second);
206 responseMap.erase(seq);
207 return out;
208 }
209
210 void put(int seq,
211 const dap::TypeInfo* typeinfo,
212 const GenericResponseHandler& handler) {
213 std::unique_lock<std::mutex> lock(responseMutex);
214 auto added =
215 responseMap.emplace(seq, std::make_pair(typeinfo, handler)).second;
216 if (!added) {
217 errorfLocked("Response handler for sequence %d already registered",
218 seq);
219 }
220 }
221
222 std::pair<const dap::TypeInfo*, GenericEventHandler> event(
223 const std::string& name) {
224 std::unique_lock<std::mutex> lock(eventMutex);
225 auto it = eventMap.find(name);
226 return (it != eventMap.end()) ? it->second : decltype(it->second){};
227 }
228
229 void put(const dap::TypeInfo* typeinfo,
230 const GenericEventHandler& handler) {
231 std::unique_lock<std::mutex> lock(eventMutex);
232 auto added =
233 eventMap.emplace(typeinfo->name(), std::make_pair(typeinfo, handler))
234 .second;
235 if (!added) {
236 errorfLocked("Event handler for '%s' already registered",
237 typeinfo->name().c_str());
238 }
239 }
240
241 GenericResponseSentHandler responseSent(const dap::TypeInfo* typeinfo) {
242 std::unique_lock<std::mutex> lock(responseSentMutex);
243 auto it = responseSentMap.find(typeinfo);
244 return (it != responseSentMap.end()) ? it->second
245 : decltype(it->second){};
246 }
247
248 void put(const dap::TypeInfo* typeinfo,
249 const GenericResponseSentHandler& handler) {
250 std::unique_lock<std::mutex> lock(responseSentMutex);
251 auto added = responseSentMap.emplace(typeinfo, handler).second;
252 if (!added) {
253 errorfLocked("Response sent handler for '%s' already registered",
254 typeinfo->name().c_str());
255 }
256 }
257
258 private:
259 void errorfLocked(const char* format, ...) {
260 va_list vararg;
261 va_start(vararg, format);
262 errorLocked(format, vararg);
263 va_end(vararg);
264 }
265
266 void errorLocked(const char* format, va_list args) {
267 char buf[2048];
268 vsnprintf(buf, sizeof(buf), format, args);
269 if (errorHandler) {
270 errorHandler(buf);
271 }
272 }
273
274 std::mutex errorMutex;
275 ErrorHandler errorHandler;
276
277 std::mutex requestMutex;
278 std::unordered_map<std::string,
279 std::pair<const dap::TypeInfo*, GenericRequestHandler>>
280 requestMap;
281
282 std::mutex responseMutex;
283 std::unordered_map<int64_t,
284 std::pair<const dap::TypeInfo*, GenericResponseHandler>>
285 responseMap;
286
287 std::mutex eventMutex;
288 std::unordered_map<std::string,
289 std::pair<const dap::TypeInfo*, GenericEventHandler>>
290 eventMap;
291
292 std::mutex responseSentMutex;
293 std::unordered_map<const dap::TypeInfo*, GenericResponseSentHandler>
294 responseSentMap;
295 }; // EventHandlers
296
297 Payload processMessage(const std::string& str) {
298 auto d = dap::json::Deserializer(str);
299 dap::string type;
300 if (!d.field("type", &type)) {
301 handlers.error("Message missing string 'type' field");
302 return {};
303 }
304
305 dap::integer sequence = 0;
306 if (!d.field("seq", &sequence)) {
307 handlers.error("Message missing number 'seq' field");
308 return {};
309 }
310
311 if (type == "request") {
312 return processRequest(&d, sequence);
313 } else if (type == "event") {
314 return processEvent(&d);
315 } else if (type == "response") {
316 processResponse(&d);
317 return {};
318 } else {
319 handlers.error("Unknown message type '%s'", type.c_str());
320 }
321
322 return {};
323 }
324
325 Payload processRequest(dap::json::Deserializer* d, dap::integer sequence) {
326 dap::string command;
327 if (!d->field("command", &command)) {
328 handlers.error("Request missing string 'command' field");
329 return {};
330 }
331
332 const dap::TypeInfo* typeinfo;
333 GenericRequestHandler handler;
334 std::tie(typeinfo, handler) = handlers.request(command);
335 if (!typeinfo) {
336 handlers.error("No request handler registered for command '%s'",
337 command.c_str());
338 return {};
339 }
340
341 auto data = new uint8_t[typeinfo->size()];
342 typeinfo->construct(data);
343
344 if (!d->field("arguments", [&](dap::Deserializer* d) {
345 return typeinfo->deserialize(d, data);
346 })) {
347 handlers.error("Failed to deserialize request");
348 typeinfo->destruct(data);
349 delete[] data;
350 return {};
351 }
352
353 return [=] {
354 handler(
355 data,
356 [=](const dap::TypeInfo* typeinfo, const void* data) {
357 // onSuccess
358 dap::json::Serializer s;
359 s.object([&](dap::FieldSerializer* fs) {
360 return fs->field("seq", dap::integer(nextSeq++)) &&
361 fs->field("type", "response") &&
362 fs->field("request_seq", sequence) &&
363 fs->field("success", dap::boolean(true)) &&
364 fs->field("command", command) &&
365 fs->field("body", [&](dap::Serializer* s) {
366 return typeinfo->serialize(s, data);
367 });
368 });
369 //printf("%s\n", s.dump().c_str());
370 send(s.dump());
371
372 if (auto handler = handlers.responseSent(typeinfo)) {
373 handler(data, nullptr);
374 }
375 },
376 [=](const dap::TypeInfo* typeinfo, const dap::Error& error) {
377 // onError
378 dap::json::Serializer s;
379 s.object([&](dap::FieldSerializer* fs) {
380 return fs->field("seq", dap::integer(nextSeq++)) &&
381 fs->field("type", "response") &&
382 fs->field("request_seq", sequence) &&
383 fs->field("success", dap::boolean(false)) &&
384 fs->field("command", command) &&
385 fs->field("message", error.message);
386 });
387 Log(s.dump().c_str());
388 send(s.dump());
389
390 if (auto handler = handlers.responseSent(typeinfo)) {
391 handler(nullptr, &error);
392 }
393 });
394 typeinfo->destruct(data);
395 delete[] data;
396 };
397 }
398
399 Payload processEvent(dap::json::Deserializer* d) {
400 dap::string event;
401 if (!d->field("event", &event)) {
402 handlers.error("Event missing string 'event' field");
403 return {};
404 }
405
406 const dap::TypeInfo* typeinfo;
407 GenericEventHandler handler;
408 std::tie(typeinfo, handler) = handlers.event(event);
409 if (!typeinfo) {
410 handlers.error("No event handler registered for event '%s'",
411 event.c_str());
412 return {};
413 }
414
415 auto data = new uint8_t[typeinfo->size()];
416 typeinfo->construct(data);
417
418 // "body" is an optional field for some events, such as "Terminated Event".
419 bool body_ok = true;
420 d->field("body", [&](dap::Deserializer* d) {
421 if (!typeinfo->deserialize(d, data)) {
422 body_ok = false;
423 }
424 return true;
425 });
426
427 if (!body_ok) {
428 handlers.error("Failed to deserialize event '%s' body", event.c_str());
429 typeinfo->destruct(data);
430 delete[] data;
431 return {};
432 }
433
434 return [=] {
435 handler(data);
436 typeinfo->destruct(data);
437 delete[] data;
438 };
439 }
440
441 void processResponse(const dap::Deserializer* d) {
442 dap::integer requestSeq = 0;
443 if (!d->field("request_seq", &requestSeq)) {
444 handlers.error("Response missing int 'request_seq' field");
445 return;
446 }
447
448 const dap::TypeInfo* typeinfo;
449 GenericResponseHandler handler;
450 std::tie(typeinfo, handler) = handlers.response(requestSeq);
451 if (!typeinfo) {
452 handlers.error("Unknown response with sequence %d", requestSeq);
453 return;
454 }
455
456 dap::boolean success = false;
457 if (!d->field("success", &success)) {
458 handlers.error("Response missing int 'success' field");
459 return;
460 }
461
462 if (success) {
463 auto data = std::unique_ptr<uint8_t[]>(new uint8_t[typeinfo->size()]);
464 typeinfo->construct(data.get());
465
466 // "body" field in Response is an optional field.
467 d->field("body", [&](const dap::Deserializer* d) {
468 return typeinfo->deserialize(d, data.get());
469 });
470
471 handler(data.get(), nullptr);
472 typeinfo->destruct(data.get());
473 } else {
474 std::string message;
475 if (!d->field("message", &message)) {
476 handlers.error("Failed to deserialize message");
477 return;
478 }
479 auto error = dap::Error("%s", message.c_str());
480 handler(nullptr, &error);
481 }
482 }
483
484 std::atomic<bool> isBound = {false};
485 dap::ContentReader reader;
486 dap::ContentWriter writer;
487
488 std::atomic<bool> shutdown = {false};
489 EventHandlers handlers;
490 std::thread recvThread;
491 std::thread dispatchThread;
492 dap::Chan<Payload> inbox;
493 std::atomic<uint32_t> nextSeq = {1};
494 std::mutex sendMutex;
495};
496
497} // anonymous namespace
498
499namespace dap {
500
501Error::Error(const std::string& message) : message(message) {}
502
503Error::Error(const char* msg, ...) {
504 char buf[2048];
505 va_list vararg;
506 va_start(vararg, msg);
507 vsnprintf(buf, sizeof(buf), msg, vararg);
508 va_end(vararg);
509 message = buf;
510}
511
512Session::~Session() = default;
513
514std::unique_ptr<Session> Session::create() {
515 return std::unique_ptr<Session>(new Impl());
516}
517
518} // namespace dap
519