1 | // Licensed to the Apache Software Foundation (ASF) under one |
2 | // or more contributor license agreements. See the NOTICE file |
3 | // distributed with this work for additional information |
4 | // regarding copyright ownership. The ASF licenses this file |
5 | // to you under the Apache License, Version 2.0 (the |
6 | // "License"); you may not use this file except in compliance |
7 | // with the License. You may obtain a copy of the License at |
8 | // |
9 | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | // |
11 | // Unless required by applicable law or agreed to in writing, |
12 | // software distributed under the License is distributed on an |
13 | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | // KIND, either express or implied. See the License for the |
15 | // specific language governing permissions and limitations |
16 | // under the License. |
17 | |
18 | // Internal metadata serialization matters |
19 | |
20 | #ifndef ARROW_IPC_METADATA_INTERNAL_H |
21 | #define ARROW_IPC_METADATA_INTERNAL_H |
22 | |
23 | #include <cstdint> |
24 | #include <cstring> |
25 | #include <memory> |
26 | #include <string> |
27 | #include <vector> |
28 | |
29 | #include <flatbuffers/flatbuffers.h> |
30 | |
31 | #include "arrow/buffer.h" |
32 | #include "arrow/ipc/Schema_generated.h" |
#include "arrow/ipc/dictionary.h"  // IWYU pragma: keep
34 | #include "arrow/ipc/message.h" |
35 | #include "arrow/memory_pool.h" |
36 | #include "arrow/sparse_tensor.h" |
37 | #include "arrow/status.h" |
38 | |
39 | namespace arrow { |
40 | |
41 | class DataType; |
42 | class Schema; |
43 | class Tensor; |
44 | class SparseTensor; |
45 | |
46 | namespace flatbuf = org::apache::arrow::flatbuf; |
47 | |
48 | namespace io { |
49 | |
50 | class OutputStream; |
51 | |
52 | } // namespace io |
53 | |
54 | namespace ipc { |
55 | |
56 | class DictionaryMemo; |
57 | |
58 | namespace internal { |
59 | |
60 | static constexpr flatbuf::MetadataVersion kCurrentMetadataVersion = |
61 | flatbuf::MetadataVersion_V4; |
62 | |
63 | static constexpr flatbuf::MetadataVersion kMinMetadataVersion = |
64 | flatbuf::MetadataVersion_V4; |
65 | |
66 | MetadataVersion GetMetadataVersion(flatbuf::MetadataVersion version); |
67 | |
/// Magic byte string "ARROW1".
/// NOTE(review): presumably used to mark Arrow IPC files -- confirm at the use
/// sites.
static constexpr const char* kArrowMagicBytes = "ARROW1";
69 | |
/// Plain aggregate describing one field node. Vectors of these are taken as
/// the `nodes` argument of WriteRecordBatchMessage and WriteDictionaryMessage.
struct FieldMetadata {
  /// Length of the field (presumably the number of values -- confirm).
  int64_t length;

  /// Number of nulls in the field.
  int64_t null_count;

  /// Offset associated with the field (semantics not visible in this header).
  int64_t offset;
};
75 | |
/// Plain aggregate locating one buffer. Vectors of these are taken as the
/// `buffers` argument of the record batch, sparse tensor and dictionary
/// message writers declared in this header.
struct BufferMetadata {
  /// Offset of the buffer's first byte, relative to the start of the memory
  /// page
  int64_t offset;

  /// Size of the buffer in bytes (absolute, not relative)
  int64_t length;
};
83 | |
/// Plain aggregate describing one block by its position and sizes. Vectors of
/// these appear as the `dictionaries` and `record_batches` parameters of a
/// declaration further down in this header.
///
/// The member order is part of the aggregate-initialization contract; do not
/// reorder.
struct FileBlock {
  /// Offset of the block (presumably from the start of the file -- confirm).
  int64_t offset;

  /// Length of the block's metadata; note this is 32-bit, unlike the others.
  int32_t metadata_length;

  /// Length of the block's body.
  int64_t body_length;
};
89 | |
// Read interface classes. We do not fully deserialize the flatbuffers so that
// an individual field's metadata can be retrieved from a very large schema
// without deserializing the entire schema.
93 | |
94 | // Retrieve a list of all the dictionary ids and types required by the schema for |
95 | // reconstruction. The presumption is that these will be loaded either from |
96 | // the stream or file (or they may already be somewhere else in memory) |
97 | Status GetDictionaryTypes(const void* opaque_schema, DictionaryTypeMap* id_to_field); |
98 | |
99 | // Construct a complete Schema from the message. May be expensive for very |
100 | // large schemas if you are only interested in a few fields |
101 | Status GetSchema(const void* opaque_schema, const DictionaryMemo& dictionary_memo, |
102 | std::shared_ptr<Schema>* out); |
103 | |
104 | Status GetTensorMetadata(const Buffer& metadata, std::shared_ptr<DataType>* type, |
105 | std::vector<int64_t>* shape, std::vector<int64_t>* strides, |
106 | std::vector<std::string>* dim_names); |
107 | |
108 | // EXPERIMENTAL: Extracting metadata of a sparse tensor from the message |
109 | Status GetSparseTensorMetadata(const Buffer& metadata, std::shared_ptr<DataType>* type, |
110 | std::vector<int64_t>* shape, |
111 | std::vector<std::string>* dim_names, int64_t* length, |
112 | SparseTensorFormat::type* sparse_tensor_format_id); |
113 | |
114 | /// Write a serialized message metadata with a length-prefix and padding to an |
115 | /// 8-byte offset. Does not make assumptions about whether the stream is |
116 | /// aligned already |
117 | /// |
118 | /// <message_size: int32><message: const void*><padding> |
119 | /// |
120 | /// \param[in] message a buffer containing the metadata to write |
121 | /// \param[in] alignment the size multiple of the total message size including |
122 | /// length prefix, metadata, and padding. Usually 8 or 64 |
123 | /// \param[in,out] file the OutputStream to write to |
124 | /// \param[out] message_length the total size of the payload written including |
125 | /// padding |
126 | /// \return Status |
127 | Status WriteMessage(const Buffer& message, int32_t alignment, io::OutputStream* file, |
128 | int32_t* message_length); |
129 | |
130 | // Serialize arrow::Schema as a Flatbuffer |
131 | // |
132 | // \param[in] schema a Schema instance |
133 | // \param[in,out] dictionary_memo class for tracking dictionaries and assigning |
134 | // dictionary ids |
135 | // \param[out] out the serialized arrow::Buffer |
136 | // \return Status outcome |
137 | Status WriteSchemaMessage(const Schema& schema, DictionaryMemo* dictionary_memo, |
138 | std::shared_ptr<Buffer>* out); |
139 | |
140 | Status WriteRecordBatchMessage(const int64_t length, const int64_t body_length, |
141 | const std::vector<FieldMetadata>& nodes, |
142 | const std::vector<BufferMetadata>& buffers, |
143 | std::shared_ptr<Buffer>* out); |
144 | |
145 | Status WriteTensorMessage(const Tensor& tensor, const int64_t buffer_start_offset, |
146 | std::shared_ptr<Buffer>* out); |
147 | |
148 | Status WriteSparseTensorMessage(const SparseTensor& sparse_tensor, int64_t body_length, |
149 | const std::vector<BufferMetadata>& buffers, |
150 | std::shared_ptr<Buffer>* out); |
151 | |
152 | Status (const Schema& schema, const std::vector<FileBlock>& dictionaries, |
153 | const std::vector<FileBlock>& record_batches, |
154 | DictionaryMemo* dictionary_memo, io::OutputStream* out); |
155 | |
156 | Status WriteDictionaryMessage(const int64_t id, const int64_t length, |
157 | const int64_t body_length, |
158 | const std::vector<FieldMetadata>& nodes, |
159 | const std::vector<BufferMetadata>& buffers, |
160 | std::shared_ptr<Buffer>* out); |
161 | |
162 | static inline Status WriteFlatbufferBuilder(flatbuffers::FlatBufferBuilder& fbb, |
163 | std::shared_ptr<Buffer>* out) { |
164 | int32_t size = fbb.GetSize(); |
165 | |
166 | std::shared_ptr<Buffer> result; |
167 | RETURN_NOT_OK(AllocateBuffer(default_memory_pool(), size, &result)); |
168 | |
169 | uint8_t* dst = result->mutable_data(); |
170 | memcpy(dst, fbb.GetBufferPointer(), size); |
171 | *out = result; |
172 | return Status::OK(); |
173 | } |
174 | |
175 | } // namespace internal |
176 | } // namespace ipc |
177 | } // namespace arrow |
178 | |
#endif  // ARROW_IPC_METADATA_INTERNAL_H
180 | |