1/*
2 * Copyright (c) 2017, Matias Fontanini
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are
7 * met:
8 *
9 * * Redistributions of source code must retain the above copyright
10 * notice, this list of conditions and the following disclaimer.
11 * * Redistributions in binary form must reproduce the above
12 * copyright notice, this list of conditions and the following disclaimer
13 * in the documentation and/or other materials provided with the
14 * distribution.
15 *
16 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
17 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
18 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
19 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
20 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
23 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
24 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
25 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
26 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
27 *
28 */
29
30#include "message.h"
31#include "message_internal.h"
32
33using std::chrono::milliseconds;
34
35namespace cppkafka {
36
37void dummy_deleter(rd_kafka_message_t*) {
38
39}
40
41Message Message::make_non_owning(rd_kafka_message_t* handle) {
42 return Message(handle, NonOwningTag());
43}
44
45Message::Message()
46: handle_(nullptr, nullptr),
47 user_data_(nullptr) {
48
49}
50
51Message::Message(rd_kafka_message_t* handle)
52: Message(HandlePtr(handle, &rd_kafka_message_destroy)) {
53
54}
55
56Message::Message(rd_kafka_message_t* handle, NonOwningTag)
57: Message(HandlePtr(handle, &dummy_deleter)) {
58
59}
60
61Message::Message(HandlePtr handle)
62: handle_(move(handle)),
63 payload_(handle_ ? Buffer((const Buffer::DataType*)handle_->payload, handle_->len) : Buffer()),
64 key_(handle_ ? Buffer((const Buffer::DataType*)handle_->key, handle_->key_len) : Buffer()),
65 user_data_(handle_ ? handle_->_private : nullptr) {
66#if (RD_KAFKA_VERSION >= RD_KAFKA_HEADERS_SUPPORT_VERSION)
67 // get the header list if any
68 if (handle_) {
69 rd_kafka_headers_t* headers_handle;
70 Error error = rd_kafka_message_headers(handle_.get(), &headers_handle);
71 if (!error) {
72 header_list_ = HeaderListType::make_non_owning(headers_handle);
73 }
74 }
75#endif
76}
77
78Message& Message::load_internal() {
79 if (user_data_) {
80 MessageInternal* mi = static_cast<MessageInternal*>(user_data_);
81 user_data_ = mi->get_user_data();
82 internal_ = mi->get_internal();
83 }
84 return *this;
85}
86
87boost::optional<MessageTimestamp> Message::get_timestamp() const {
88 rd_kafka_timestamp_type_t type = RD_KAFKA_TIMESTAMP_NOT_AVAILABLE;
89 int64_t timestamp = rd_kafka_message_timestamp(handle_.get(), &type);
90 if (timestamp == -1 || type == RD_KAFKA_TIMESTAMP_NOT_AVAILABLE) {
91 return {};
92 }
93 return MessageTimestamp(std::chrono::milliseconds(timestamp),
94 static_cast<MessageTimestamp::TimestampType>(type));
95}
96
97} // cppkafka
98