1 | #ifndef SIMDJSON_INLINE_DOCUMENT_STREAM_H |
2 | #define SIMDJSON_INLINE_DOCUMENT_STREAM_H |
3 | |
4 | #include "simdjson/dom/document_stream.h" |
5 | #include <algorithm> |
6 | #include <limits> |
7 | #include <stdexcept> |
8 | namespace simdjson { |
9 | namespace dom { |
10 | |
11 | #ifdef SIMDJSON_THREADS_ENABLED |
12 | inline void stage1_worker::finish() { |
13 | // After calling "run" someone would call finish() to wait |
14 | // for the end of the processing. |
15 | // This function will wait until either the thread has done |
16 | // the processing or, else, the destructor has been called. |
17 | std::unique_lock<std::mutex> lock(locking_mutex); |
18 | cond_var.wait(lock&: lock, p: [this]{return has_work == false;}); |
19 | } |
20 | |
21 | inline stage1_worker::~stage1_worker() { |
22 | // The thread may never outlive the stage1_worker instance |
23 | // and will always be stopped/joined before the stage1_worker |
24 | // instance is gone. |
25 | stop_thread(); |
26 | } |
27 | |
28 | inline void stage1_worker::start_thread() { |
29 | std::unique_lock<std::mutex> lock(locking_mutex); |
30 | if(thread.joinable()) { |
31 | return; // This should never happen but we never want to create more than one thread. |
32 | } |
33 | thread = std::thread([this]{ |
34 | while(true) { |
35 | std::unique_lock<std::mutex> thread_lock(locking_mutex); |
36 | // We wait for either "run" or "stop_thread" to be called. |
37 | cond_var.wait(lock&: thread_lock, p: [this]{return has_work || !can_work;}); |
38 | // If, for some reason, the stop_thread() method was called (i.e., the |
39 | // destructor of stage1_worker is called, then we want to immediately destroy |
40 | // the thread (and not do any more processing). |
41 | if(!can_work) { |
42 | break; |
43 | } |
44 | this->owner->stage1_thread_error = this->owner->run_stage1(p&: *this->stage1_thread_parser, |
45 | batch_start: this->_next_batch_start); |
46 | this->has_work = false; |
47 | // The condition variable call should be moved after thread_lock.unlock() for performance |
48 | // reasons but thread sanitizers may report it as a data race if we do. |
49 | // See https://stackoverflow.com/questions/35775501/c-should-condition-variable-be-notified-under-lock |
50 | cond_var.notify_one(); // will notify "finish" |
51 | thread_lock.unlock(); |
52 | } |
53 | } |
54 | ); |
55 | } |
56 | |
57 | |
58 | inline void stage1_worker::stop_thread() { |
59 | std::unique_lock<std::mutex> lock(locking_mutex); |
60 | // We have to make sure that all locks can be released. |
61 | can_work = false; |
62 | has_work = false; |
63 | cond_var.notify_all(); |
64 | lock.unlock(); |
65 | if(thread.joinable()) { |
66 | thread.join(); |
67 | } |
68 | } |
69 | |
70 | inline void stage1_worker::run(document_stream * ds, dom::parser * stage1, size_t next_batch_start) { |
71 | std::unique_lock<std::mutex> lock(locking_mutex); |
72 | owner = ds; |
73 | _next_batch_start = next_batch_start; |
74 | stage1_thread_parser = stage1; |
75 | has_work = true; |
76 | // The condition variable call should be moved after thread_lock.unlock() for performance |
77 | // reasons but thread sanitizers may report it as a data race if we do. |
78 | // See https://stackoverflow.com/questions/35775501/c-should-condition-variable-be-notified-under-lock |
79 | cond_var.notify_one(); // will notify the thread lock that we have work |
80 | lock.unlock(); |
81 | } |
82 | #endif |
83 | |
84 | simdjson_inline document_stream::document_stream( |
85 | dom::parser &_parser, |
86 | const uint8_t *_buf, |
87 | size_t _len, |
88 | size_t _batch_size |
89 | ) noexcept |
90 | : parser{&_parser}, |
91 | buf{_buf}, |
92 | len{_len}, |
93 | batch_size{_batch_size <= MINIMAL_BATCH_SIZE ? MINIMAL_BATCH_SIZE : _batch_size}, |
94 | error{SUCCESS} |
95 | #ifdef SIMDJSON_THREADS_ENABLED |
96 | , use_thread(_parser.threaded) // we need to make a copy because _parser.threaded can change |
97 | #endif |
98 | { |
99 | #ifdef SIMDJSON_THREADS_ENABLED |
100 | if(worker.get() == nullptr) { |
101 | error = MEMALLOC; |
102 | } |
103 | #endif |
104 | } |
105 | |
106 | simdjson_inline document_stream::document_stream() noexcept |
107 | : parser{nullptr}, |
108 | buf{nullptr}, |
109 | len{0}, |
110 | batch_size{0}, |
111 | error{UNINITIALIZED} |
112 | #ifdef SIMDJSON_THREADS_ENABLED |
113 | , use_thread(false) |
114 | #endif |
115 | { |
116 | } |
117 | |
118 | simdjson_inline document_stream::~document_stream() noexcept { |
119 | #ifdef SIMDJSON_THREADS_ENABLED |
120 | worker.reset(); |
121 | #endif |
122 | } |
123 | |
124 | simdjson_inline document_stream::iterator::iterator() noexcept |
125 | : stream{nullptr}, finished{true} { |
126 | } |
127 | |
128 | simdjson_inline document_stream::iterator document_stream::begin() noexcept { |
129 | start(); |
130 | // If there are no documents, we're finished. |
131 | return iterator(this, error == EMPTY); |
132 | } |
133 | |
134 | simdjson_inline document_stream::iterator document_stream::end() noexcept { |
135 | return iterator(this, true); |
136 | } |
137 | |
138 | simdjson_inline document_stream::iterator::iterator(document_stream* _stream, bool is_end) noexcept |
139 | : stream{_stream}, finished{is_end} { |
140 | } |
141 | |
142 | simdjson_inline document_stream::iterator::reference document_stream::iterator::operator*() noexcept { |
143 | // Note that in case of error, we do not yet mark |
144 | // the iterator as "finished": this detection is done |
145 | // in the operator++ function since it is possible |
146 | // to call operator++ repeatedly while omitting |
147 | // calls to operator*. |
148 | if (stream->error) { return stream->error; } |
149 | return stream->parser->doc.root(); |
150 | } |
151 | |
152 | simdjson_inline document_stream::iterator& document_stream::iterator::operator++() noexcept { |
153 | // If there is an error, then we want the iterator |
154 | // to be finished, no matter what. (E.g., we do not |
155 | // keep generating documents with errors, or go beyond |
156 | // a document with errors.) |
157 | // |
158 | // Users do not have to call "operator*()" when they use operator++, |
159 | // so we need to end the stream in the operator++ function. |
160 | // |
161 | // Note that setting finished = true is essential otherwise |
162 | // we would enter an infinite loop. |
163 | if (stream->error) { finished = true; } |
164 | // Note that stream->error() is guarded against error conditions |
165 | // (it will immediately return if stream->error casts to false). |
166 | // In effect, this next function does nothing when (stream->error) |
167 | // is true (hence the risk of an infinite loop). |
168 | stream->next(); |
169 | // If that was the last document, we're finished. |
170 | // It is the only type of error we do not want to appear |
171 | // in operator*. |
172 | if (stream->error == EMPTY) { finished = true; } |
173 | // If we had any other kind of error (not EMPTY) then we want |
174 | // to pass it along to the operator* and we cannot mark the result |
175 | // as "finished" just yet. |
176 | return *this; |
177 | } |
178 | |
179 | simdjson_inline bool document_stream::iterator::operator!=(const document_stream::iterator &other) const noexcept { |
180 | return finished != other.finished; |
181 | } |
182 | |
183 | inline void document_stream::start() noexcept { |
184 | if (error) { return; } |
185 | error = parser->ensure_capacity(desired_capacity: batch_size); |
186 | if (error) { return; } |
187 | // Always run the first stage 1 parse immediately |
188 | batch_start = 0; |
189 | error = run_stage1(p&: *parser, batch_start); |
190 | while(error == EMPTY) { |
191 | // In exceptional cases, we may start with an empty block |
192 | batch_start = next_batch_start(); |
193 | if (batch_start >= len) { return; } |
194 | error = run_stage1(p&: *parser, batch_start); |
195 | } |
196 | if (error) { return; } |
197 | #ifdef SIMDJSON_THREADS_ENABLED |
198 | if (use_thread && next_batch_start() < len) { |
199 | // Kick off the first thread if needed |
200 | error = stage1_thread_parser.ensure_capacity(desired_capacity: batch_size); |
201 | if (error) { return; } |
202 | worker->start_thread(); |
203 | start_stage1_thread(); |
204 | if (error) { return; } |
205 | } |
206 | #endif // SIMDJSON_THREADS_ENABLED |
207 | next(); |
208 | } |
209 | |
210 | simdjson_inline size_t document_stream::iterator::current_index() const noexcept { |
211 | return stream->doc_index; |
212 | } |
213 | |
214 | simdjson_inline std::string_view document_stream::iterator::source() const noexcept { |
215 | const char* start = reinterpret_cast<const char*>(stream->buf) + current_index(); |
216 | bool object_or_array = ((*start == '[') || (*start == '{')); |
217 | if(object_or_array) { |
218 | size_t next_doc_index = stream->batch_start + stream->parser->implementation->structural_indexes[stream->parser->implementation->next_structural_index - 1]; |
219 | return std::string_view(start, next_doc_index - current_index() + 1); |
220 | } else { |
221 | size_t next_doc_index = stream->batch_start + stream->parser->implementation->structural_indexes[stream->parser->implementation->next_structural_index]; |
222 | return std::string_view(reinterpret_cast<const char*>(stream->buf) + current_index(), next_doc_index - current_index() - 1); |
223 | } |
224 | } |
225 | |
226 | |
227 | inline void document_stream::next() noexcept { |
228 | // We always exit at once, once in an error condition. |
229 | if (error) { return; } |
230 | |
231 | // Load the next document from the batch |
232 | doc_index = batch_start + parser->implementation->structural_indexes[parser->implementation->next_structural_index]; |
233 | error = parser->implementation->stage2_next(doc&: parser->doc); |
234 | // If that was the last document in the batch, load another batch (if available) |
235 | while (error == EMPTY) { |
236 | batch_start = next_batch_start(); |
237 | if (batch_start >= len) { break; } |
238 | |
239 | #ifdef SIMDJSON_THREADS_ENABLED |
240 | if(use_thread) { |
241 | load_from_stage1_thread(); |
242 | } else { |
243 | error = run_stage1(p&: *parser, batch_start); |
244 | } |
245 | #else |
246 | error = run_stage1(*parser, batch_start); |
247 | #endif |
248 | if (error) { continue; } // If the error was EMPTY, we may want to load another batch. |
249 | // Run stage 2 on the first document in the batch |
250 | doc_index = batch_start + parser->implementation->structural_indexes[parser->implementation->next_structural_index]; |
251 | error = parser->implementation->stage2_next(doc&: parser->doc); |
252 | } |
253 | } |
254 | inline size_t document_stream::size_in_bytes() const noexcept { |
255 | return len; |
256 | } |
257 | |
258 | inline size_t document_stream::truncated_bytes() const noexcept { |
259 | if(error == CAPACITY) { return len - batch_start; } |
260 | return parser->implementation->structural_indexes[parser->implementation->n_structural_indexes] - parser->implementation->structural_indexes[parser->implementation->n_structural_indexes + 1]; |
261 | } |
262 | |
263 | inline size_t document_stream::next_batch_start() const noexcept { |
264 | return batch_start + parser->implementation->structural_indexes[parser->implementation->n_structural_indexes]; |
265 | } |
266 | |
267 | inline error_code document_stream::run_stage1(dom::parser &p, size_t _batch_start) noexcept { |
268 | size_t remaining = len - _batch_start; |
269 | if (remaining <= batch_size) { |
270 | return p.implementation->stage1(buf: &buf[_batch_start], len: remaining, streaming: stage1_mode::streaming_final); |
271 | } else { |
272 | return p.implementation->stage1(buf: &buf[_batch_start], len: batch_size, streaming: stage1_mode::streaming_partial); |
273 | } |
274 | } |
275 | |
276 | #ifdef SIMDJSON_THREADS_ENABLED |
277 | |
278 | inline void document_stream::load_from_stage1_thread() noexcept { |
279 | worker->finish(); |
280 | // Swap to the parser that was loaded up in the thread. Make sure the parser has |
281 | // enough memory to swap to, as well. |
282 | std::swap(a&: *parser, b&: stage1_thread_parser); |
283 | error = stage1_thread_error; |
284 | if (error) { return; } |
285 | |
286 | // If there's anything left, start the stage 1 thread! |
287 | if (next_batch_start() < len) { |
288 | start_stage1_thread(); |
289 | } |
290 | } |
291 | |
292 | inline void document_stream::start_stage1_thread() noexcept { |
293 | // we call the thread on a lambda that will update |
294 | // this->stage1_thread_error |
295 | // there is only one thread that may write to this value |
296 | // TODO this is NOT exception-safe. |
297 | this->stage1_thread_error = UNINITIALIZED; // In case something goes wrong, make sure it's an error |
298 | size_t _next_batch_start = this->next_batch_start(); |
299 | |
300 | worker->run(ds: this, stage1: & this->stage1_thread_parser, next_batch_start: _next_batch_start); |
301 | } |
302 | |
303 | #endif // SIMDJSON_THREADS_ENABLED |
304 | |
305 | } // namespace dom |
306 | |
307 | simdjson_inline simdjson_result<dom::document_stream>::simdjson_result() noexcept |
308 | : simdjson_result_base() { |
309 | } |
310 | simdjson_inline simdjson_result<dom::document_stream>::simdjson_result(error_code error) noexcept |
311 | : simdjson_result_base(error) { |
312 | } |
313 | simdjson_inline simdjson_result<dom::document_stream>::simdjson_result(dom::document_stream &&value) noexcept |
314 | : simdjson_result_base(std::forward<dom::document_stream>(t&: value)) { |
315 | } |
316 | |
317 | #if SIMDJSON_EXCEPTIONS |
318 | simdjson_inline dom::document_stream::iterator simdjson_result<dom::document_stream>::begin() noexcept(false) { |
319 | if (error()) { throw simdjson_error(error()); } |
320 | return first.begin(); |
321 | } |
322 | simdjson_inline dom::document_stream::iterator simdjson_result<dom::document_stream>::end() noexcept(false) { |
323 | if (error()) { throw simdjson_error(error()); } |
324 | return first.end(); |
325 | } |
326 | #else // SIMDJSON_EXCEPTIONS |
327 | #ifndef SIMDJSON_DISABLE_DEPRECATED_API |
328 | simdjson_inline dom::document_stream::iterator simdjson_result<dom::document_stream>::begin() noexcept { |
329 | first.error = error(); |
330 | return first.begin(); |
331 | } |
332 | simdjson_inline dom::document_stream::iterator simdjson_result<dom::document_stream>::end() noexcept { |
333 | first.error = error(); |
334 | return first.end(); |
335 | } |
336 | #endif // SIMDJSON_DISABLE_DEPRECATED_API |
337 | #endif // SIMDJSON_EXCEPTIONS |
338 | |
339 | } // namespace simdjson |
340 | #endif // SIMDJSON_INLINE_DOCUMENT_STREAM_H |
341 | |