1 | /*************************************************************************** |
2 | * _ _ ____ _ |
3 | * Project ___| | | | _ \| | |
4 | * / __| | | | |_) | | |
5 | * | (__| |_| | _ <| |___ |
6 | * \___|\___/|_| \_\_____| |
7 | * |
8 | * Copyright (C) 1998 - 2021, Daniel Stenberg, <daniel@haxx.se>, et al. |
9 | * |
10 | * This software is licensed as described in the file COPYING, which |
11 | * you should have received as part of this distribution. The terms |
12 | * are also available at https://curl.se/docs/copyright.html. |
13 | * |
14 | * You may opt to use, copy, modify, merge, publish, distribute and/or sell |
15 | * copies of the Software, and permit persons to whom the Software is |
16 | * furnished to do so, under the terms of the COPYING file. |
17 | * |
18 | * This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY |
19 | * KIND, either express or implied. |
20 | * |
21 | ***************************************************************************/ |
22 | #include "server_setup.h" |
23 | #include <stdlib.h> |
24 | #include <string.h> |
25 | #include "util.h" |
26 | |
27 | /* Function |
28 | * |
29 | * Accepts a TCP connection on a custom port (IPv4 or IPv6). Speaks MQTT. |
30 | * |
 * Read commands from FILE (set with --config). The commands control how the
 * server acts and are reset to defaults on each client TCP connect.
33 | * |
34 | * Config file keywords: |
35 | * |
36 | * TODO |
37 | */ |
38 | |
39 | /* based on sockfilt.c */ |
40 | |
41 | #ifdef HAVE_SIGNAL_H |
42 | #include <signal.h> |
43 | #endif |
44 | #ifdef HAVE_NETINET_IN_H |
45 | #include <netinet/in.h> |
46 | #endif |
47 | #ifdef HAVE_NETINET_IN6_H |
48 | #include <netinet/in6.h> |
49 | #endif |
50 | #ifdef HAVE_ARPA_INET_H |
51 | #include <arpa/inet.h> |
52 | #endif |
53 | #ifdef HAVE_NETDB_H |
54 | #include <netdb.h> |
55 | #endif |
56 | |
57 | #define ENABLE_CURLX_PRINTF |
58 | /* make the curlx header define all printf() functions to use the curlx_* |
59 | versions instead */ |
60 | #include "curlx.h" /* from the private lib dir */ |
61 | #include "getpart.h" |
62 | #include "inet_pton.h" |
63 | #include "util.h" |
64 | #include "server_sockaddr.h" |
65 | #include "warnless.h" |
66 | |
67 | /* include memdebug.h last */ |
68 | #include "memdebug.h" |
69 | |
#ifdef USE_WINSOCK
/* For Winsock builds, replace whatever definitions of these errno names are
   in effect with the plain <errno.h> numbers. */
#undef  EINTR
#define EINTR    4 /* errno.h value */
#undef  EAGAIN
#define EAGAIN  11 /* errno.h value */
#undef  ENOMEM
#define ENOMEM  12 /* errno.h value */
#undef  EINVAL
#define EINVAL  22 /* errno.h value */
#endif

#define DEFAULT_PORT 1883 /* MQTT default port */

/* log file used unless --logfile is given (can be overridden at build time) */
#ifndef DEFAULT_LOGFILE
#define DEFAULT_LOGFILE "log/mqttd.log"
#endif

/* config file used unless --config is given (can be overridden at build
   time) */
#ifndef DEFAULT_CONFIG
#define DEFAULT_CONFIG "mqttd.config"
#endif

/* Values of the first byte of a packet, as sent by or compared against in
   this server */
#define MQTT_MSG_CONNECT    0x10
#define MQTT_MSG_CONNACK    0x20
#define MQTT_MSG_PUBLISH    0x30
#define MQTT_MSG_PUBACK     0x40
#define MQTT_MSG_SUBSCRIBE  0x82
#define MQTT_MSG_SUBACK     0x90
#define MQTT_MSG_DISCONNECT 0xe0

#define MQTT_CONNACK_LEN 4
#define MQTT_SUBACK_LEN 5
#define MQTT_CLIENTID_LEN 12 /* "curl0123abcd" */
/* NOTE(review): this definition had lost its macro name ("#define 5"), which
   does not compile. MQTT_HEADER_LEN is assumed to be the intended name;
   nothing in this file refers to it - confirm. */
#define MQTT_HEADER_LEN 5    /* max 5 bytes */
103 | |
/* Settings that the config file can change. They are put back to their
   defaults and read again for every accepted client. */
struct configurable {
  /* initial version byte in the request must match this */
  unsigned char version;
  /* send the PUBLISH packet ahead of the SUBACK packet */
  bool publish_before_suback;
  /* send two bytes less than the complete PUBLISH packet */
  bool short_publish;
  /* put an illegal Remaining Length in the PUBLISH packet */
  bool excessive_remaining;
  /* value stored in the last byte of the CONNACK packet */
  unsigned char error_connack;
  /* number of the test whose reply data gets published */
  int testnum;
};
113 | |
114 | #define REQUEST_DUMP "log/server.input" |
115 | #define CONFIG_VERSION 5 |
116 | |
117 | static struct configurable config; |
118 | |
119 | const char *serverlogfile = DEFAULT_LOGFILE; |
120 | static const char *configfile = DEFAULT_CONFIG; |
121 | |
122 | #ifdef ENABLE_IPV6 |
123 | static bool use_ipv6 = FALSE; |
124 | #endif |
125 | static const char *ipv_inuse = "IPv4" ; |
126 | static unsigned short port = DEFAULT_PORT; |
127 | |
128 | static void resetdefaults(void) |
129 | { |
130 | logmsg("Reset to defaults" ); |
131 | config.version = CONFIG_VERSION; |
132 | config.publish_before_suback = FALSE; |
133 | config.short_publish = FALSE; |
134 | config.excessive_remaining = FALSE; |
135 | config.error_connack = 0; |
136 | config.testnum = 0; |
137 | } |
138 | |
/* Parse 'value' as a base-10 number and return its lowest eight bits. Text
   that does not start with a number gives zero. */
static unsigned char byteval(char *value)
{
  return (unsigned char)(strtoul(value, NULL, 10) & 0xffUL);
}
144 | |
145 | static void getconfig(void) |
146 | { |
147 | FILE *fp = fopen(configfile, FOPEN_READTEXT); |
148 | resetdefaults(); |
149 | if(fp) { |
150 | char buffer[512]; |
151 | logmsg("parse config file" ); |
152 | while(fgets(buffer, sizeof(buffer), fp)) { |
153 | char key[32]; |
154 | char value[32]; |
155 | if(2 == sscanf(buffer, "%31s %31s" , key, value)) { |
156 | if(!strcmp(key, "version" )) { |
157 | config.version = byteval(value); |
158 | logmsg("version [%d] set" , config.version); |
159 | } |
160 | else if(!strcmp(key, "PUBLISH-before-SUBACK" )) { |
161 | logmsg("PUBLISH-before-SUBACK set" ); |
162 | config.publish_before_suback = TRUE; |
163 | } |
164 | else if(!strcmp(key, "short-PUBLISH" )) { |
165 | logmsg("short-PUBLISH set" ); |
166 | config.short_publish = TRUE; |
167 | } |
168 | else if(!strcmp(key, "error-CONNACK" )) { |
169 | config.error_connack = byteval(value); |
170 | logmsg("error-CONNACK = %d" , config.error_connack); |
171 | } |
172 | else if(!strcmp(key, "Testnum" )) { |
173 | config.testnum = atoi(value); |
174 | logmsg("testnum = %d" , config.testnum); |
175 | } |
176 | else if(!strcmp(key, "excessive-remaining" )) { |
177 | logmsg("excessive-remaining set" ); |
178 | config.excessive_remaining = TRUE; |
179 | } |
180 | } |
181 | } |
182 | fclose(fp); |
183 | } |
184 | else { |
185 | logmsg("No config file '%s' to read" , configfile); |
186 | } |
187 | } |
188 | |
/*
 * loghex() writes the first 'len' bytes of 'buffer' to the server log as a
 * single quoted string of two-digit lowercase hex values.
 *
 * Nothing is logged when 'len' is zero or negative. Data that does not fit
 * in the local text buffer is cut off after the last byte that fits.
 */
static void loghex(unsigned char *buffer, ssize_t len)
{
  char data[12000];
  ssize_t i;
  char *optr = data;
  size_t left = sizeof(data);

  data[0] = 0;

  /* each byte needs room for two hex digits and the terminating zero, so
     stop while there are still three bytes left instead of writing half a
     hex pair at the very end of the buffer */
  for(i = 0; (i < len) && (left >= 3); i++) {
    msnprintf(optr, left, "%02x", buffer[i]);
    optr += 2;
    left -= 2;
  }
  if(optr != data)
    logmsg("'%s'", data);
}
207 | |
/* Which side a logged packet came from */
typedef enum {
  FROM_CLIENT = 0, /* received from the connected client */
  FROM_SERVER = 1  /* sent by this server */
} mqttdir;
212 | |
213 | static void logprotocol(mqttdir dir, |
214 | const char *prefix, size_t remlen, |
215 | FILE *output, |
216 | unsigned char *buffer, ssize_t len) |
217 | { |
218 | char data[12000] = "" ; |
219 | ssize_t i; |
220 | unsigned char *ptr = buffer; |
221 | char *optr = data; |
222 | int left = sizeof(data); |
223 | |
224 | for(i = 0; i<len && (left >= 0); i++) { |
225 | msnprintf(optr, left, "%02x" , ptr[i]); |
226 | optr += 2; |
227 | left -= 2; |
228 | } |
229 | fprintf(output, "%s %s %zx %s\n" , |
230 | dir == FROM_CLIENT? "client" : "server" , |
231 | prefix, remlen, |
232 | data); |
233 | } |
234 | |
235 | |
236 | /* return 0 on success */ |
237 | static int connack(FILE *dump, curl_socket_t fd) |
238 | { |
239 | unsigned char packet[]={ |
240 | MQTT_MSG_CONNACK, 0x02, |
241 | 0x00, 0x00 |
242 | }; |
243 | ssize_t rc; |
244 | |
245 | packet[3] = config.error_connack; |
246 | |
247 | rc = swrite(fd, (char *)packet, sizeof(packet)); |
248 | if(rc > 0) { |
249 | logmsg("WROTE %d bytes [CONNACK]" , rc); |
250 | loghex(packet, rc); |
251 | logprotocol(FROM_SERVER, "CONNACK" , 2, dump, packet, sizeof(packet)); |
252 | } |
253 | if(rc == sizeof(packet)) { |
254 | return 0; |
255 | } |
256 | return 1; |
257 | } |
258 | |
259 | /* return 0 on success */ |
260 | static int suback(FILE *dump, curl_socket_t fd, unsigned short packetid) |
261 | { |
262 | unsigned char packet[]={ |
263 | MQTT_MSG_SUBACK, 0x03, |
264 | 0, 0, /* filled in below */ |
265 | 0x00 |
266 | }; |
267 | ssize_t rc; |
268 | packet[2] = (unsigned char)(packetid >> 8); |
269 | packet[3] = (unsigned char)(packetid & 0xff); |
270 | |
271 | rc = swrite(fd, (char *)packet, sizeof(packet)); |
272 | if(rc == sizeof(packet)) { |
273 | logmsg("WROTE %d bytes [SUBACK]" , rc); |
274 | loghex(packet, rc); |
275 | logprotocol(FROM_SERVER, "SUBACK" , 3, dump, packet, rc); |
276 | return 0; |
277 | } |
278 | return 1; |
279 | } |
280 | |
#ifdef QOS
/*
 * puback() sends a PUBACK packet for 'packetid' on 'fd'. A completely
 * written packet is logged and a line for it is added to 'dump'.
 *
 * Returns 0 when the whole packet was written, otherwise 1.
 */
static int puback(FILE *dump, curl_socket_t fd, unsigned short packetid)
{
  unsigned char packet[]={
    MQTT_MSG_PUBACK, 0x02, /* Remaining Length: the two packet id bytes */
    0, 0 /* filled in below */
  };
  ssize_t rc;

  /* the packet id goes in with the most significant byte first */
  packet[2] = (unsigned char)(packetid >> 8);
  packet[3] = (unsigned char)(packetid & 0xff);

  rc = swrite(fd, (char *)packet, sizeof(packet));
  if(rc == (ssize_t)sizeof(packet)) {
    /* rc is an ssize_t, which %d does not match */
    logmsg("WROTE %zd bytes [PUBACK]", rc);
    loghex(packet, rc);
    /* logprotocol() also needs the packet name and the Remaining Length */
    logprotocol(FROM_SERVER, "PUBACK", 2, dump, packet, rc);
    return 0;
  }
  logmsg("Failed sending [PUBACK]");
  return 1;
}
#endif
304 | |
305 | /* return 0 on success */ |
306 | static int disconnect(FILE *dump, curl_socket_t fd) |
307 | { |
308 | unsigned char packet[]={ |
309 | MQTT_MSG_DISCONNECT, 0x00, |
310 | }; |
311 | ssize_t rc = swrite(fd, (char *)packet, sizeof(packet)); |
312 | if(rc == sizeof(packet)) { |
313 | logmsg("WROTE %d bytes [DISCONNECT]" , rc); |
314 | loghex(packet, rc); |
315 | logprotocol(FROM_SERVER, "DISCONNECT" , 0, dump, packet, rc); |
316 | return 0; |
317 | } |
318 | logmsg("Failed sending [DISCONNECT]" ); |
319 | return 1; |
320 | } |
321 | |
322 | |
323 | |
/*
 * encode_length() stores 'packetlen' in 'remlength' in the variable length
 * format of the MQTT Remaining Length field: seven bits of the value per
 * byte, least significant group first, with the top bit set in every byte
 * that is followed by another one.
 *
 *   do
 *     encodedByte = X MOD 128
 *     X = X DIV 128
 *     // if there are more data to encode, set the top bit of this byte
 *     if ( X > 0 )
 *       encodedByte = encodedByte OR 128
 *     endif
 *     'output' encodedByte
 *   while ( X > 0 )
 *
 * 'remlength' must have room for four bytes, which is the most the format
 * allows and enough for values up to 268435455.
 *
 * Returns the number of bytes used, or 0 after logging when 'packetlen'
 * does not fit in four bytes.
 */
static int encode_length(size_t packetlen,
                         unsigned char *remlength) /* 4 bytes */
{
  int bytes = 0;

  do {
    unsigned char encode;

    /* give up when a fifth byte would be needed; checking before the store
       rather than after it keeps the valid four byte lengths working */
    if(bytes == 4) {
      logmsg("too large packet!");
      return 0;
    }

    encode = (unsigned char)(packetlen % 0x80);
    packetlen /= 0x80;
    if(packetlen)
      encode |= 0x80;

    remlength[bytes++] = encode;
  } while(packetlen);

  return bytes;
}
368 | |
369 | |
/*
 * decode_length() decodes a variable length number where each byte holds
 * seven bits of the value, least significant group first, and a set top bit
 * says that another byte follows.
 *
 * buf      - the encoded bytes
 * buflen   - number of bytes available in 'buf'
 * lenbytes - if not NULL, receives the number of bytes that were consumed
 *
 * Decoding stops after the first byte with a clear top bit or when 'buflen'
 * bytes have been consumed, whichever happens first. Returns the value
 * decoded that far.
 */
static size_t decode_length(unsigned char *buf,
                            size_t buflen, size_t *lenbytes)
{
  size_t value = 0;
  size_t factor = 1;
  size_t used = 0;

  while(used < buflen) {
    unsigned char octet = buf[used++];

    value += (octet & 0x7f) * factor;
    factor *= 0x80;

    if(!(octet & 0x80))
      break; /* top bit clear: this was the last byte */
  }

  if(lenbytes)
    *lenbytes = used;

  return value;
}
389 | |
390 | |
391 | /* return 0 on success */ |
392 | static int publish(FILE *dump, |
393 | curl_socket_t fd, unsigned short packetid, |
394 | char *topic, char *payload, size_t payloadlen) |
395 | { |
396 | size_t topiclen = strlen(topic); |
397 | unsigned char *packet; |
398 | size_t payloadindex; |
399 | ssize_t remaininglength = topiclen + 2 + payloadlen; |
400 | ssize_t packetlen; |
401 | ssize_t sendamount; |
402 | ssize_t rc; |
403 | unsigned char rembuffer[4]; |
404 | int encodedlen; |
405 | |
406 | if(config.excessive_remaining) { |
407 | /* manually set illegal remaining length */ |
408 | rembuffer[0] = 0xff; |
409 | rembuffer[1] = 0xff; |
410 | rembuffer[2] = 0xff; |
411 | rembuffer[3] = 0x80; /* maximum allowed here by spec is 0x7f */ |
412 | encodedlen = 4; |
413 | } |
414 | else |
415 | encodedlen = encode_length(remaininglength, rembuffer); |
416 | |
417 | /* one packet type byte (possibly two more for packetid) */ |
418 | packetlen = remaininglength + encodedlen + 1; |
419 | packet = malloc(packetlen); |
420 | if(!packet) |
421 | return 1; |
422 | |
423 | packet[0] = MQTT_MSG_PUBLISH; /* TODO: set QoS? */ |
424 | memcpy(&packet[1], rembuffer, encodedlen); |
425 | |
426 | (void)packetid; |
427 | /* packet_id if QoS is set */ |
428 | |
429 | packet[1 + encodedlen] = (unsigned char)(topiclen >> 8); |
430 | packet[2 + encodedlen] = (unsigned char)(topiclen & 0xff); |
431 | memcpy(&packet[3 + encodedlen], topic, topiclen); |
432 | |
433 | payloadindex = 3 + topiclen + encodedlen; |
434 | memcpy(&packet[payloadindex], payload, payloadlen); |
435 | |
436 | sendamount = packetlen; |
437 | if(config.short_publish) |
438 | sendamount -= 2; |
439 | |
440 | rc = swrite(fd, (char *)packet, sendamount); |
441 | if(rc > 0) { |
442 | logmsg("WROTE %d bytes [PUBLISH]" , rc); |
443 | loghex(packet, rc); |
444 | logprotocol(FROM_SERVER, "PUBLISH" , remaininglength, dump, packet, rc); |
445 | } |
446 | if(rc == packetlen) |
447 | return 0; |
448 | return 1; |
449 | } |
450 | |
/* longest topic name that fits the topic buffer */
#define MAX_TOPIC_LENGTH 65535
/* size of the buffer for the client id, including the terminating zero */
#define MAX_CLIENT_ID_LENGTH 32

/* holds the zero terminated topic of the most recent SUBSCRIBE */
static char topic[MAX_TOPIC_LENGTH + 1];
455 | |
456 | static int (curl_socket_t fd, |
457 | unsigned char *bytep, |
458 | size_t *remaining_lengthp, |
459 | size_t *remaining_length_bytesp) |
460 | { |
461 | /* get the fixed header */ |
462 | unsigned char buffer[10]; |
463 | |
464 | /* get the first two bytes */ |
465 | ssize_t rc = sread(fd, (char *)buffer, 2); |
466 | int i; |
467 | if(rc < 2) { |
468 | logmsg("READ %d bytes [SHORT!]" , rc); |
469 | return 1; /* fail */ |
470 | } |
471 | logmsg("READ %d bytes" , rc); |
472 | loghex(buffer, rc); |
473 | *bytep = buffer[0]; |
474 | |
475 | /* if the length byte has the top bit set, get the next one too */ |
476 | i = 1; |
477 | while(buffer[i] & 0x80) { |
478 | i++; |
479 | rc = sread(fd, (char *)&buffer[i], 1); |
480 | if(rc != 1) { |
481 | logmsg("Remaining Length broken" ); |
482 | return 1; |
483 | } |
484 | } |
485 | *remaining_lengthp = decode_length(&buffer[1], i, remaining_length_bytesp); |
486 | logmsg("Remaining Length: %ld [%d bytes]" , (long) *remaining_lengthp, |
487 | *remaining_length_bytesp); |
488 | return 0; |
489 | } |
490 | |
491 | static curl_socket_t mqttit(curl_socket_t fd) |
492 | { |
493 | size_t buff_size = 10*1024; |
494 | unsigned char *buffer = NULL; |
495 | ssize_t rc; |
496 | unsigned char byte; |
497 | unsigned short packet_id; |
498 | size_t payload_len; |
499 | size_t client_id_length; |
500 | unsigned int topic_len; |
501 | size_t remaining_length = 0; |
502 | size_t bytes = 0; /* remaining length field size in bytes */ |
503 | char client_id[MAX_CLIENT_ID_LENGTH]; |
504 | long testno; |
505 | FILE *stream = NULL; |
506 | |
507 | |
508 | static const char protocol[7] = { |
509 | 0x00, 0x04, /* protocol length */ |
510 | 'M','Q','T','T', /* protocol name */ |
511 | 0x04 /* protocol level */ |
512 | }; |
513 | FILE *dump = fopen(REQUEST_DUMP, "ab" ); |
514 | if(!dump) |
515 | goto end; |
516 | |
517 | getconfig(); |
518 | |
519 | testno = config.testnum; |
520 | |
521 | if(testno) |
522 | logmsg("Found test number %ld" , testno); |
523 | |
524 | buffer = malloc(buff_size); |
525 | if(!buffer) { |
526 | logmsg("Out of memory, unable to allocate buffer" ); |
527 | goto end; |
528 | } |
529 | |
530 | do { |
531 | unsigned char usr_flag = 0x80; |
532 | unsigned char passwd_flag = 0x40; |
533 | unsigned char conn_flags; |
534 | const size_t client_id_offset = 12; |
535 | size_t start_usr; |
536 | size_t start_passwd; |
537 | |
538 | /* get the fixed header */ |
539 | rc = fixedheader(fd, &byte, &remaining_length, &bytes); |
540 | if(rc) |
541 | break; |
542 | |
543 | if(remaining_length >= buff_size) { |
544 | buff_size = remaining_length; |
545 | buffer = realloc(buffer, buff_size); |
546 | if(!buffer) { |
547 | logmsg("Failed realloc of size %lu" , buff_size); |
548 | goto end; |
549 | } |
550 | } |
551 | |
552 | if(remaining_length) { |
553 | /* reading variable header and payload into buffer */ |
554 | rc = sread(fd, (char *)buffer, remaining_length); |
555 | if(rc > 0) { |
556 | logmsg("READ %d bytes" , rc); |
557 | loghex(buffer, rc); |
558 | } |
559 | } |
560 | |
561 | if(byte == MQTT_MSG_CONNECT) { |
562 | logprotocol(FROM_CLIENT, "CONNECT" , remaining_length, |
563 | dump, buffer, rc); |
564 | |
565 | if(memcmp(protocol, buffer, sizeof(protocol))) { |
566 | logmsg("Protocol preamble mismatch" ); |
567 | goto end; |
568 | } |
569 | /* ignore the connect flag byte and two keepalive bytes */ |
570 | payload_len = (buffer[10] << 8) | buffer[11]; |
571 | /* first part of the payload is the client ID */ |
572 | client_id_length = payload_len; |
573 | |
574 | /* checking if user and password flags were set */ |
575 | conn_flags = buffer[7]; |
576 | |
577 | start_usr = client_id_offset + payload_len; |
578 | if(usr_flag == (unsigned char)(conn_flags & usr_flag)) { |
579 | logmsg("User flag is present in CONN flag" ); |
580 | payload_len += (buffer[start_usr] << 8) | buffer[start_usr + 1]; |
581 | payload_len += 2; /* MSB and LSB for user length */ |
582 | } |
583 | |
584 | start_passwd = client_id_offset + payload_len; |
585 | if(passwd_flag == (char)(conn_flags & passwd_flag)) { |
586 | logmsg("Password flag is present in CONN flags" ); |
587 | payload_len += (buffer[start_passwd] << 8) | buffer[start_passwd + 1]; |
588 | payload_len += 2; /* MSB and LSB for password length */ |
589 | } |
590 | |
591 | /* check the length of the payload */ |
592 | if((ssize_t)payload_len != (rc - 12)) { |
593 | logmsg("Payload length mismatch, expected %x got %x" , |
594 | rc - 12, payload_len); |
595 | goto end; |
596 | } |
597 | /* check the length of the client ID */ |
598 | else if((client_id_length + 1) > MAX_CLIENT_ID_LENGTH) { |
599 | logmsg("Too large client id" ); |
600 | goto end; |
601 | } |
602 | memcpy(client_id, &buffer[12], client_id_length); |
603 | client_id[client_id_length] = 0; |
604 | |
605 | logmsg("MQTT client connect accepted: %s" , client_id); |
606 | |
607 | /* The first packet sent from the Server to the Client MUST be a |
608 | CONNACK Packet */ |
609 | |
610 | if(connack(dump, fd)) { |
611 | logmsg("failed sending CONNACK" ); |
612 | goto end; |
613 | } |
614 | } |
615 | else if(byte == MQTT_MSG_SUBSCRIBE) { |
616 | int error; |
617 | char *data; |
618 | size_t datalen; |
619 | logprotocol(FROM_CLIENT, "SUBSCRIBE" , remaining_length, |
620 | dump, buffer, rc); |
621 | logmsg("Incoming SUBSCRIBE" ); |
622 | |
623 | if(rc < 6) { |
624 | logmsg("Too small SUBSCRIBE" ); |
625 | goto end; |
626 | } |
627 | |
628 | /* two bytes packet id */ |
629 | packet_id = (unsigned short)((buffer[0] << 8) | buffer[1]); |
630 | |
631 | /* two bytes topic length */ |
632 | topic_len = (buffer[2] << 8) | buffer[3]; |
633 | if(topic_len != (remaining_length - 5)) { |
634 | logmsg("Wrong topic length, got %d expected %d" , |
635 | topic_len, remaining_length - 5); |
636 | goto end; |
637 | } |
638 | memcpy(topic, &buffer[4], topic_len); |
639 | topic[topic_len] = 0; |
640 | |
641 | /* there's a QoS byte (two bits) after the topic */ |
642 | |
643 | logmsg("SUBSCRIBE to '%s' [%d]" , topic, packet_id); |
644 | stream = test2fopen(testno); |
645 | error = getpart(&data, &datalen, "reply" , "data" , stream); |
646 | if(!error) { |
647 | if(!config.publish_before_suback) { |
648 | if(suback(dump, fd, packet_id)) { |
649 | logmsg("failed sending SUBACK" ); |
650 | goto end; |
651 | } |
652 | } |
653 | if(publish(dump, fd, packet_id, topic, data, datalen)) { |
654 | logmsg("PUBLISH failed" ); |
655 | goto end; |
656 | } |
657 | if(config.publish_before_suback) { |
658 | if(suback(dump, fd, packet_id)) { |
659 | logmsg("failed sending SUBACK" ); |
660 | goto end; |
661 | } |
662 | } |
663 | } |
664 | else { |
665 | char *def = (char *)"this is random payload yes yes it is" ; |
666 | publish(dump, fd, packet_id, topic, def, strlen(def)); |
667 | } |
668 | disconnect(dump, fd); |
669 | } |
670 | else if((byte & 0xf0) == (MQTT_MSG_PUBLISH & 0xf0)) { |
671 | size_t topiclen; |
672 | |
673 | logmsg("Incoming PUBLISH" ); |
674 | logprotocol(FROM_CLIENT, "PUBLISH" , remaining_length, |
675 | dump, buffer, rc); |
676 | |
677 | topiclen = (buffer[1 + bytes] << 8) | buffer[2 + bytes]; |
678 | logmsg("Got %d bytes topic" , topiclen); |
679 | /* TODO: verify topiclen */ |
680 | |
681 | #ifdef QOS |
682 | /* TODO: handle packetid if there is one. Send puback if QoS > 0 */ |
683 | puback(dump, fd, 0); |
684 | #endif |
685 | /* expect a disconnect here */ |
686 | /* get the request */ |
687 | rc = sread(fd, (char *)&buffer[0], 2); |
688 | |
689 | logmsg("READ %d bytes [DISCONNECT]" , rc); |
690 | loghex(buffer, rc); |
691 | logprotocol(FROM_CLIENT, "DISCONNECT" , 0, dump, buffer, rc); |
692 | goto end; |
693 | } |
694 | else { |
695 | /* not supported (yet) */ |
696 | goto end; |
697 | } |
698 | } while(1); |
699 | |
700 | end: |
701 | if(buffer) |
702 | free(buffer); |
703 | if(dump) |
704 | fclose(dump); |
705 | if(stream) |
706 | fclose(stream); |
707 | return CURL_SOCKET_BAD; |
708 | } |
709 | |
710 | /* |
711 | sockfdp is a pointer to an established stream or CURL_SOCKET_BAD |
712 | |
713 | if sockfd is CURL_SOCKET_BAD, listendfd is a listening socket we must |
714 | accept() |
715 | */ |
716 | static bool incoming(curl_socket_t listenfd) |
717 | { |
718 | fd_set fds_read; |
719 | fd_set fds_write; |
720 | fd_set fds_err; |
721 | int clients = 0; /* connected clients */ |
722 | |
723 | if(got_exit_signal) { |
724 | logmsg("signalled to die, exiting..." ); |
725 | return FALSE; |
726 | } |
727 | |
728 | #ifdef HAVE_GETPPID |
729 | /* As a last resort, quit if socks5 process becomes orphan. */ |
730 | if(getppid() <= 1) { |
731 | logmsg("process becomes orphan, exiting" ); |
732 | return FALSE; |
733 | } |
734 | #endif |
735 | |
736 | do { |
737 | ssize_t rc; |
738 | int error = 0; |
739 | curl_socket_t sockfd = listenfd; |
740 | int maxfd = (int)sockfd; |
741 | |
742 | FD_ZERO(&fds_read); |
743 | FD_ZERO(&fds_write); |
744 | FD_ZERO(&fds_err); |
745 | |
746 | /* there's always a socket to wait for */ |
747 | FD_SET(sockfd, &fds_read); |
748 | |
749 | do { |
750 | /* select() blocking behavior call on blocking descriptors please */ |
751 | rc = select(maxfd + 1, &fds_read, &fds_write, &fds_err, NULL); |
752 | if(got_exit_signal) { |
753 | logmsg("signalled to die, exiting..." ); |
754 | return FALSE; |
755 | } |
756 | } while((rc == -1) && ((error = SOCKERRNO) == EINTR)); |
757 | |
758 | if(rc < 0) { |
759 | logmsg("select() failed with error: (%d) %s" , |
760 | error, strerror(error)); |
761 | return FALSE; |
762 | } |
763 | |
764 | if(FD_ISSET(sockfd, &fds_read)) { |
765 | curl_socket_t newfd = accept(sockfd, NULL, NULL); |
766 | if(CURL_SOCKET_BAD == newfd) { |
767 | error = SOCKERRNO; |
768 | logmsg("accept(%d, NULL, NULL) failed with error: (%d) %s" , |
769 | sockfd, error, strerror(error)); |
770 | } |
771 | else { |
772 | logmsg("====> Client connect, fd %d. Read config from %s" , |
773 | newfd, configfile); |
774 | set_advisor_read_lock(SERVERLOGS_LOCK); |
775 | (void)mqttit(newfd); /* until done */ |
776 | clear_advisor_read_lock(SERVERLOGS_LOCK); |
777 | |
778 | logmsg("====> Client disconnect" ); |
779 | sclose(newfd); |
780 | } |
781 | } |
782 | } while(clients); |
783 | |
784 | return TRUE; |
785 | } |
786 | |
787 | static curl_socket_t sockdaemon(curl_socket_t sock, |
788 | unsigned short *listenport) |
789 | { |
790 | /* passive daemon style */ |
791 | srvr_sockaddr_union_t listener; |
792 | int flag; |
793 | int rc; |
794 | int totdelay = 0; |
795 | int maxretr = 10; |
796 | int delay = 20; |
797 | int attempt = 0; |
798 | int error = 0; |
799 | |
800 | do { |
801 | attempt++; |
802 | flag = 1; |
803 | rc = setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, |
804 | (void *)&flag, sizeof(flag)); |
805 | if(rc) { |
806 | error = SOCKERRNO; |
807 | logmsg("setsockopt(SO_REUSEADDR) failed with error: (%d) %s" , |
808 | error, strerror(error)); |
809 | if(maxretr) { |
810 | rc = wait_ms(delay); |
811 | if(rc) { |
812 | /* should not happen */ |
813 | logmsg("wait_ms() failed with error: %d" , rc); |
814 | sclose(sock); |
815 | return CURL_SOCKET_BAD; |
816 | } |
817 | if(got_exit_signal) { |
818 | logmsg("signalled to die, exiting..." ); |
819 | sclose(sock); |
820 | return CURL_SOCKET_BAD; |
821 | } |
822 | totdelay += delay; |
823 | delay *= 2; /* double the sleep for next attempt */ |
824 | } |
825 | } |
826 | } while(rc && maxretr--); |
827 | |
828 | if(rc) { |
829 | logmsg("setsockopt(SO_REUSEADDR) failed %d times in %d ms. Error: (%d) %s" , |
830 | attempt, totdelay, error, strerror(error)); |
831 | logmsg("Continuing anyway..." ); |
832 | } |
833 | |
834 | /* When the specified listener port is zero, it is actually a |
835 | request to let the system choose a non-zero available port. */ |
836 | |
837 | #ifdef ENABLE_IPV6 |
838 | if(!use_ipv6) { |
839 | #endif |
840 | memset(&listener.sa4, 0, sizeof(listener.sa4)); |
841 | listener.sa4.sin_family = AF_INET; |
842 | listener.sa4.sin_addr.s_addr = INADDR_ANY; |
843 | listener.sa4.sin_port = htons(*listenport); |
844 | rc = bind(sock, &listener.sa, sizeof(listener.sa4)); |
845 | #ifdef ENABLE_IPV6 |
846 | } |
847 | else { |
848 | memset(&listener.sa6, 0, sizeof(listener.sa6)); |
849 | listener.sa6.sin6_family = AF_INET6; |
850 | listener.sa6.sin6_addr = in6addr_any; |
851 | listener.sa6.sin6_port = htons(*listenport); |
852 | rc = bind(sock, &listener.sa, sizeof(listener.sa6)); |
853 | } |
854 | #endif /* ENABLE_IPV6 */ |
855 | if(rc) { |
856 | error = SOCKERRNO; |
857 | logmsg("Error binding socket on port %hu: (%d) %s" , |
858 | *listenport, error, strerror(error)); |
859 | sclose(sock); |
860 | return CURL_SOCKET_BAD; |
861 | } |
862 | |
863 | if(!*listenport) { |
864 | /* The system was supposed to choose a port number, figure out which |
865 | port we actually got and update the listener port value with it. */ |
866 | curl_socklen_t la_size; |
867 | srvr_sockaddr_union_t localaddr; |
868 | #ifdef ENABLE_IPV6 |
869 | if(!use_ipv6) |
870 | #endif |
871 | la_size = sizeof(localaddr.sa4); |
872 | #ifdef ENABLE_IPV6 |
873 | else |
874 | la_size = sizeof(localaddr.sa6); |
875 | #endif |
876 | memset(&localaddr.sa, 0, (size_t)la_size); |
877 | if(getsockname(sock, &localaddr.sa, &la_size) < 0) { |
878 | error = SOCKERRNO; |
879 | logmsg("getsockname() failed with error: (%d) %s" , |
880 | error, strerror(error)); |
881 | sclose(sock); |
882 | return CURL_SOCKET_BAD; |
883 | } |
884 | switch(localaddr.sa.sa_family) { |
885 | case AF_INET: |
886 | *listenport = ntohs(localaddr.sa4.sin_port); |
887 | break; |
888 | #ifdef ENABLE_IPV6 |
889 | case AF_INET6: |
890 | *listenport = ntohs(localaddr.sa6.sin6_port); |
891 | break; |
892 | #endif |
893 | default: |
894 | break; |
895 | } |
896 | if(!*listenport) { |
897 | /* Real failure, listener port shall not be zero beyond this point. */ |
898 | logmsg("Apparently getsockname() succeeded, with listener port zero." ); |
899 | logmsg("A valid reason for this failure is a binary built without" ); |
900 | logmsg("proper network library linkage. This might not be the only" ); |
901 | logmsg("reason, but double check it before anything else." ); |
902 | sclose(sock); |
903 | return CURL_SOCKET_BAD; |
904 | } |
905 | } |
906 | |
907 | /* start accepting connections */ |
908 | rc = listen(sock, 5); |
909 | if(0 != rc) { |
910 | error = SOCKERRNO; |
911 | logmsg("listen(%d, 5) failed with error: (%d) %s" , |
912 | sock, error, strerror(error)); |
913 | sclose(sock); |
914 | return CURL_SOCKET_BAD; |
915 | } |
916 | |
917 | return sock; |
918 | } |
919 | |
920 | |
921 | int main(int argc, char *argv[]) |
922 | { |
923 | curl_socket_t sock = CURL_SOCKET_BAD; |
924 | curl_socket_t msgsock = CURL_SOCKET_BAD; |
925 | int wrotepidfile = 0; |
926 | int wroteportfile = 0; |
927 | const char *pidname = ".mqttd.pid" ; |
928 | const char *portname = ".mqttd.port" ; |
929 | bool juggle_again; |
930 | int error; |
931 | int arg = 1; |
932 | |
933 | while(argc>arg) { |
934 | if(!strcmp("--version" , argv[arg])) { |
935 | printf("mqttd IPv4%s\n" , |
936 | #ifdef ENABLE_IPV6 |
937 | "/IPv6" |
938 | #else |
939 | "" |
940 | #endif |
941 | ); |
942 | return 0; |
943 | } |
944 | else if(!strcmp("--pidfile" , argv[arg])) { |
945 | arg++; |
946 | if(argc>arg) |
947 | pidname = argv[arg++]; |
948 | } |
949 | else if(!strcmp("--portfile" , argv[arg])) { |
950 | arg++; |
951 | if(argc>arg) |
952 | portname = argv[arg++]; |
953 | } |
954 | else if(!strcmp("--config" , argv[arg])) { |
955 | arg++; |
956 | if(argc>arg) |
957 | configfile = argv[arg++]; |
958 | } |
959 | else if(!strcmp("--logfile" , argv[arg])) { |
960 | arg++; |
961 | if(argc>arg) |
962 | serverlogfile = argv[arg++]; |
963 | } |
964 | else if(!strcmp("--ipv6" , argv[arg])) { |
965 | #ifdef ENABLE_IPV6 |
966 | ipv_inuse = "IPv6" ; |
967 | use_ipv6 = TRUE; |
968 | #endif |
969 | arg++; |
970 | } |
971 | else if(!strcmp("--ipv4" , argv[arg])) { |
972 | /* for completeness, we support this option as well */ |
973 | #ifdef ENABLE_IPV6 |
974 | ipv_inuse = "IPv4" ; |
975 | use_ipv6 = FALSE; |
976 | #endif |
977 | arg++; |
978 | } |
979 | else if(!strcmp("--port" , argv[arg])) { |
980 | arg++; |
981 | if(argc>arg) { |
982 | char *endptr; |
983 | unsigned long ulnum = strtoul(argv[arg], &endptr, 10); |
984 | if((endptr != argv[arg] + strlen(argv[arg])) || |
985 | ((ulnum != 0UL) && ((ulnum < 1025UL) || (ulnum > 65535UL)))) { |
986 | fprintf(stderr, "mqttd: invalid --port argument (%s)\n" , |
987 | argv[arg]); |
988 | return 0; |
989 | } |
990 | port = curlx_ultous(ulnum); |
991 | arg++; |
992 | } |
993 | } |
994 | else { |
995 | puts("Usage: mqttd [option]\n" |
996 | " --config [file]\n" |
997 | " --version\n" |
998 | " --logfile [file]\n" |
999 | " --pidfile [file]\n" |
1000 | " --portfile [file]\n" |
1001 | " --ipv4\n" |
1002 | " --ipv6\n" |
1003 | " --port [port]\n" ); |
1004 | return 0; |
1005 | } |
1006 | } |
1007 | |
1008 | #ifdef WIN32 |
1009 | win32_init(); |
1010 | atexit(win32_cleanup); |
1011 | |
1012 | setmode(fileno(stdin), O_BINARY); |
1013 | setmode(fileno(stdout), O_BINARY); |
1014 | setmode(fileno(stderr), O_BINARY); |
1015 | #endif |
1016 | |
1017 | install_signal_handlers(FALSE); |
1018 | |
1019 | #ifdef ENABLE_IPV6 |
1020 | if(!use_ipv6) |
1021 | #endif |
1022 | sock = socket(AF_INET, SOCK_STREAM, 0); |
1023 | #ifdef ENABLE_IPV6 |
1024 | else |
1025 | sock = socket(AF_INET6, SOCK_STREAM, 0); |
1026 | #endif |
1027 | |
1028 | if(CURL_SOCKET_BAD == sock) { |
1029 | error = SOCKERRNO; |
1030 | logmsg("Error creating socket: (%d) %s" , |
1031 | error, strerror(error)); |
1032 | goto mqttd_cleanup; |
1033 | } |
1034 | |
1035 | { |
1036 | /* passive daemon style */ |
1037 | sock = sockdaemon(sock, &port); |
1038 | if(CURL_SOCKET_BAD == sock) { |
1039 | goto mqttd_cleanup; |
1040 | } |
1041 | msgsock = CURL_SOCKET_BAD; /* no stream socket yet */ |
1042 | } |
1043 | |
1044 | logmsg("Running %s version" , ipv_inuse); |
1045 | logmsg("Listening on port %hu" , port); |
1046 | |
1047 | wrotepidfile = write_pidfile(pidname); |
1048 | if(!wrotepidfile) { |
1049 | goto mqttd_cleanup; |
1050 | } |
1051 | |
1052 | wroteportfile = write_portfile(portname, port); |
1053 | if(!wroteportfile) { |
1054 | goto mqttd_cleanup; |
1055 | } |
1056 | |
1057 | do { |
1058 | juggle_again = incoming(sock); |
1059 | } while(juggle_again); |
1060 | |
1061 | mqttd_cleanup: |
1062 | |
1063 | if((msgsock != sock) && (msgsock != CURL_SOCKET_BAD)) |
1064 | sclose(msgsock); |
1065 | |
1066 | if(sock != CURL_SOCKET_BAD) |
1067 | sclose(sock); |
1068 | |
1069 | if(wrotepidfile) |
1070 | unlink(pidname); |
1071 | if(wroteportfile) |
1072 | unlink(portname); |
1073 | |
1074 | restore_signal_handlers(FALSE); |
1075 | |
1076 | if(got_exit_signal) { |
1077 | logmsg("============> mqttd exits with signal (%d)" , exit_signal); |
1078 | /* |
1079 | * To properly set the return status of the process we |
1080 | * must raise the same signal SIGINT or SIGTERM that we |
1081 | * caught and let the old handler take care of it. |
1082 | */ |
1083 | raise(exit_signal); |
1084 | } |
1085 | |
1086 | logmsg("============> mqttd quits" ); |
1087 | return 0; |
1088 | } |
1089 | |