1#include "duckdb/execution/index/art/node48.hpp"
2
3#include "duckdb/execution/index/art/art.hpp"
4#include "duckdb/execution/index/art/node.hpp"
5#include "duckdb/execution/index/art/node16.hpp"
6#include "duckdb/execution/index/art/node256.hpp"
7#include "duckdb/storage/meta_block_reader.hpp"
8#include "duckdb/storage/meta_block_writer.hpp"
9
10namespace duckdb {
11
12Node48 &Node48::New(ART &art, Node &node) {
13
14 node.SetPtr(Node::GetAllocator(art, type: NType::NODE_48).New());
15 node.type = (uint8_t)NType::NODE_48;
16 auto &n48 = Node48::Get(art, ptr: node);
17
18 n48.count = 0;
19 n48.prefix.Initialize();
20
21 for (idx_t i = 0; i < Node::NODE_256_CAPACITY; i++) {
22 n48.child_index[i] = Node::EMPTY_MARKER;
23 }
24
25 // necessary for faster child insertion/deletion
26 for (idx_t i = 0; i < Node::NODE_48_CAPACITY; i++) {
27 n48.children[i].Reset();
28 }
29
30 return n48;
31}
32
33void Node48::Free(ART &art, Node &node) {
34
35 D_ASSERT(node.IsSet());
36 D_ASSERT(!node.IsSwizzled());
37
38 auto &n48 = Node48::Get(art, ptr: node);
39
40 if (!n48.count) {
41 return;
42 }
43
44 // free all children
45 for (idx_t i = 0; i < Node::NODE_256_CAPACITY; i++) {
46 if (n48.child_index[i] != Node::EMPTY_MARKER) {
47 Node::Free(art, node&: n48.children[n48.child_index[i]]);
48 }
49 }
50}
51
52Node48 &Node48::GrowNode16(ART &art, Node &node48, Node &node16) {
53
54 auto &n16 = Node16::Get(art, ptr: node16);
55 auto &n48 = Node48::New(art, node&: node48);
56
57 n48.count = n16.count;
58 n48.prefix.Move(other&: n16.prefix);
59
60 for (idx_t i = 0; i < Node::NODE_256_CAPACITY; i++) {
61 n48.child_index[i] = Node::EMPTY_MARKER;
62 }
63
64 for (idx_t i = 0; i < n16.count; i++) {
65 n48.child_index[n16.key[i]] = i;
66 n48.children[i] = n16.children[i];
67 }
68
69 // necessary for faster child insertion/deletion
70 for (idx_t i = n16.count; i < Node::NODE_48_CAPACITY; i++) {
71 n48.children[i].Reset();
72 }
73
74 n16.count = 0;
75 Node::Free(art, node&: node16);
76 return n48;
77}
78
79Node48 &Node48::ShrinkNode256(ART &art, Node &node48, Node &node256) {
80
81 auto &n48 = Node48::New(art, node&: node48);
82 auto &n256 = Node256::Get(art, ptr: node256);
83
84 n48.count = 0;
85 n48.prefix.Move(other&: n256.prefix);
86
87 for (idx_t i = 0; i < Node::NODE_256_CAPACITY; i++) {
88 D_ASSERT(n48.count <= Node::NODE_48_CAPACITY);
89 if (n256.children[i].IsSet()) {
90 n48.child_index[i] = n48.count;
91 n48.children[n48.count] = n256.children[i];
92 n48.count++;
93 } else {
94 n48.child_index[i] = Node::EMPTY_MARKER;
95 }
96 }
97
98 // necessary for faster child insertion/deletion
99 for (idx_t i = n48.count; i < Node::NODE_48_CAPACITY; i++) {
100 n48.children[i].Reset();
101 }
102
103 n256.count = 0;
104 Node::Free(art, node&: node256);
105 return n48;
106}
107
108void Node48::InitializeMerge(ART &art, const ARTFlags &flags) {
109
110 for (idx_t i = 0; i < Node::NODE_256_CAPACITY; i++) {
111 if (child_index[i] != Node::EMPTY_MARKER) {
112 children[child_index[i]].InitializeMerge(art, flags);
113 }
114 }
115}
116
117void Node48::InsertChild(ART &art, Node &node, const uint8_t byte, const Node child) {
118
119 D_ASSERT(node.IsSet());
120 D_ASSERT(!node.IsSwizzled());
121 auto &n48 = Node48::Get(art, ptr: node);
122
123 // ensure that there is no other child at the same byte
124 D_ASSERT(n48.child_index[byte] == Node::EMPTY_MARKER);
125
126 // insert new child node into node
127 if (n48.count < Node::NODE_48_CAPACITY) {
128 // still space, just insert the child
129 idx_t child_pos = n48.count;
130 if (n48.children[child_pos].IsSet()) {
131 // find an empty position in the node list if the current position is occupied
132 child_pos = 0;
133 while (n48.children[child_pos].IsSet()) {
134 child_pos++;
135 }
136 }
137 n48.children[child_pos] = child;
138 n48.child_index[byte] = child_pos;
139 n48.count++;
140
141 } else {
142 // node is full, grow to Node256
143 auto node48 = node;
144 Node256::GrowNode48(art, node256&: node, node48);
145 Node256::InsertChild(art, node, byte, child);
146 }
147}
148
149void Node48::DeleteChild(ART &art, Node &node, const uint8_t byte) {
150
151 D_ASSERT(node.IsSet());
152 D_ASSERT(!node.IsSwizzled());
153 auto &n48 = Node48::Get(art, ptr: node);
154
155 // free the child and decrease the count
156 Node::Free(art, node&: n48.children[n48.child_index[byte]]);
157 n48.child_index[byte] = Node::EMPTY_MARKER;
158 n48.count--;
159
160 // shrink node to Node16
161 if (n48.count < Node::NODE_48_SHRINK_THRESHOLD) {
162 auto node48 = node;
163 Node16::ShrinkNode48(art, node16&: node, node48);
164 }
165}
166
167optional_ptr<Node> Node48::GetNextChild(uint8_t &byte) {
168
169 for (idx_t i = byte; i < Node::NODE_256_CAPACITY; i++) {
170 if (child_index[i] != Node::EMPTY_MARKER) {
171 byte = i;
172 D_ASSERT(children[child_index[i]].IsSet());
173 return &children[child_index[i]];
174 }
175 }
176 return nullptr;
177}
178
179BlockPointer Node48::Serialize(ART &art, MetaBlockWriter &writer) {
180
181 // recurse into children and retrieve child block pointers
182 vector<BlockPointer> child_block_pointers;
183 for (idx_t i = 0; i < Node::NODE_48_CAPACITY; i++) {
184 child_block_pointers.push_back(x: children[i].Serialize(art, writer));
185 }
186
187 // get pointer and write fields
188 auto block_pointer = writer.GetBlockPointer();
189 writer.Write(element: NType::NODE_48);
190 writer.Write<uint8_t>(element: count);
191 prefix.Serialize(art, writer);
192
193 // write key values
194 for (idx_t i = 0; i < Node::NODE_256_CAPACITY; i++) {
195 writer.Write(element: child_index[i]);
196 }
197
198 // write child block pointers
199 for (auto &child_block_pointer : child_block_pointers) {
200 writer.Write(element: child_block_pointer.block_id);
201 writer.Write(element: child_block_pointer.offset);
202 }
203
204 return block_pointer;
205}
206
207void Node48::Deserialize(ART &art, MetaBlockReader &reader) {
208
209 count = reader.Read<uint8_t>();
210 prefix.Deserialize(art, reader);
211
212 // read key values
213 for (idx_t i = 0; i < Node::NODE_256_CAPACITY; i++) {
214 child_index[i] = reader.Read<uint8_t>();
215 }
216
217 // read child block pointers
218 for (idx_t i = 0; i < Node::NODE_48_CAPACITY; i++) {
219 children[i] = Node(reader);
220 }
221}
222
223void Node48::Vacuum(ART &art, const ARTFlags &flags) {
224
225 for (idx_t i = 0; i < Node::NODE_256_CAPACITY; i++) {
226 if (child_index[i] != Node::EMPTY_MARKER) {
227 Node::Vacuum(art, node&: children[child_index[i]], flags);
228 }
229 }
230}
231
232} // namespace duckdb
233