1#ifndef SIMDJSON_INLINE_DOCUMENT_STREAM_H
2#define SIMDJSON_INLINE_DOCUMENT_STREAM_H
3
4#include "simdjson/dom/document_stream.h"
5#include <algorithm>
6#include <limits>
7#include <stdexcept>
8namespace simdjson {
9namespace dom {
10
11#ifdef SIMDJSON_THREADS_ENABLED
12inline void stage1_worker::finish() {
13 // After calling "run" someone would call finish() to wait
14 // for the end of the processing.
15 // This function will wait until either the thread has done
16 // the processing or, else, the destructor has been called.
17 std::unique_lock<std::mutex> lock(locking_mutex);
18 cond_var.wait(lock&: lock, p: [this]{return has_work == false;});
19}
20
21inline stage1_worker::~stage1_worker() {
22 // The thread may never outlive the stage1_worker instance
23 // and will always be stopped/joined before the stage1_worker
24 // instance is gone.
25 stop_thread();
26}
27
28inline void stage1_worker::start_thread() {
29 std::unique_lock<std::mutex> lock(locking_mutex);
30 if(thread.joinable()) {
31 return; // This should never happen but we never want to create more than one thread.
32 }
33 thread = std::thread([this]{
34 while(true) {
35 std::unique_lock<std::mutex> thread_lock(locking_mutex);
36 // We wait for either "run" or "stop_thread" to be called.
37 cond_var.wait(lock&: thread_lock, p: [this]{return has_work || !can_work;});
38 // If, for some reason, the stop_thread() method was called (i.e., the
39 // destructor of stage1_worker is called, then we want to immediately destroy
40 // the thread (and not do any more processing).
41 if(!can_work) {
42 break;
43 }
44 this->owner->stage1_thread_error = this->owner->run_stage1(p&: *this->stage1_thread_parser,
45 batch_start: this->_next_batch_start);
46 this->has_work = false;
47 // The condition variable call should be moved after thread_lock.unlock() for performance
48 // reasons but thread sanitizers may report it as a data race if we do.
49 // See https://stackoverflow.com/questions/35775501/c-should-condition-variable-be-notified-under-lock
50 cond_var.notify_one(); // will notify "finish"
51 thread_lock.unlock();
52 }
53 }
54 );
55}
56
57
58inline void stage1_worker::stop_thread() {
59 std::unique_lock<std::mutex> lock(locking_mutex);
60 // We have to make sure that all locks can be released.
61 can_work = false;
62 has_work = false;
63 cond_var.notify_all();
64 lock.unlock();
65 if(thread.joinable()) {
66 thread.join();
67 }
68}
69
70inline void stage1_worker::run(document_stream * ds, dom::parser * stage1, size_t next_batch_start) {
71 std::unique_lock<std::mutex> lock(locking_mutex);
72 owner = ds;
73 _next_batch_start = next_batch_start;
74 stage1_thread_parser = stage1;
75 has_work = true;
76 // The condition variable call should be moved after thread_lock.unlock() for performance
77 // reasons but thread sanitizers may report it as a data race if we do.
78 // See https://stackoverflow.com/questions/35775501/c-should-condition-variable-be-notified-under-lock
79 cond_var.notify_one(); // will notify the thread lock that we have work
80 lock.unlock();
81}
82#endif
83
84simdjson_inline document_stream::document_stream(
85 dom::parser &_parser,
86 const uint8_t *_buf,
87 size_t _len,
88 size_t _batch_size
89) noexcept
90 : parser{&_parser},
91 buf{_buf},
92 len{_len},
93 batch_size{_batch_size <= MINIMAL_BATCH_SIZE ? MINIMAL_BATCH_SIZE : _batch_size},
94 error{SUCCESS}
95#ifdef SIMDJSON_THREADS_ENABLED
96 , use_thread(_parser.threaded) // we need to make a copy because _parser.threaded can change
97#endif
98{
99#ifdef SIMDJSON_THREADS_ENABLED
100 if(worker.get() == nullptr) {
101 error = MEMALLOC;
102 }
103#endif
104}
105
106simdjson_inline document_stream::document_stream() noexcept
107 : parser{nullptr},
108 buf{nullptr},
109 len{0},
110 batch_size{0},
111 error{UNINITIALIZED}
112#ifdef SIMDJSON_THREADS_ENABLED
113 , use_thread(false)
114#endif
115{
116}
117
118simdjson_inline document_stream::~document_stream() noexcept {
119#ifdef SIMDJSON_THREADS_ENABLED
120 worker.reset();
121#endif
122}
123
124simdjson_inline document_stream::iterator::iterator() noexcept
125 : stream{nullptr}, finished{true} {
126}
127
128simdjson_inline document_stream::iterator document_stream::begin() noexcept {
129 start();
130 // If there are no documents, we're finished.
131 return iterator(this, error == EMPTY);
132}
133
134simdjson_inline document_stream::iterator document_stream::end() noexcept {
135 return iterator(this, true);
136}
137
138simdjson_inline document_stream::iterator::iterator(document_stream* _stream, bool is_end) noexcept
139 : stream{_stream}, finished{is_end} {
140}
141
142simdjson_inline document_stream::iterator::reference document_stream::iterator::operator*() noexcept {
143 // Note that in case of error, we do not yet mark
144 // the iterator as "finished": this detection is done
145 // in the operator++ function since it is possible
146 // to call operator++ repeatedly while omitting
147 // calls to operator*.
148 if (stream->error) { return stream->error; }
149 return stream->parser->doc.root();
150}
151
152simdjson_inline document_stream::iterator& document_stream::iterator::operator++() noexcept {
153 // If there is an error, then we want the iterator
154 // to be finished, no matter what. (E.g., we do not
155 // keep generating documents with errors, or go beyond
156 // a document with errors.)
157 //
158 // Users do not have to call "operator*()" when they use operator++,
159 // so we need to end the stream in the operator++ function.
160 //
161 // Note that setting finished = true is essential otherwise
162 // we would enter an infinite loop.
163 if (stream->error) { finished = true; }
164 // Note that stream->error() is guarded against error conditions
165 // (it will immediately return if stream->error casts to false).
166 // In effect, this next function does nothing when (stream->error)
167 // is true (hence the risk of an infinite loop).
168 stream->next();
169 // If that was the last document, we're finished.
170 // It is the only type of error we do not want to appear
171 // in operator*.
172 if (stream->error == EMPTY) { finished = true; }
173 // If we had any other kind of error (not EMPTY) then we want
174 // to pass it along to the operator* and we cannot mark the result
175 // as "finished" just yet.
176 return *this;
177}
178
179simdjson_inline bool document_stream::iterator::operator!=(const document_stream::iterator &other) const noexcept {
180 return finished != other.finished;
181}
182
183inline void document_stream::start() noexcept {
184 if (error) { return; }
185 error = parser->ensure_capacity(desired_capacity: batch_size);
186 if (error) { return; }
187 // Always run the first stage 1 parse immediately
188 batch_start = 0;
189 error = run_stage1(p&: *parser, batch_start);
190 while(error == EMPTY) {
191 // In exceptional cases, we may start with an empty block
192 batch_start = next_batch_start();
193 if (batch_start >= len) { return; }
194 error = run_stage1(p&: *parser, batch_start);
195 }
196 if (error) { return; }
197#ifdef SIMDJSON_THREADS_ENABLED
198 if (use_thread && next_batch_start() < len) {
199 // Kick off the first thread if needed
200 error = stage1_thread_parser.ensure_capacity(desired_capacity: batch_size);
201 if (error) { return; }
202 worker->start_thread();
203 start_stage1_thread();
204 if (error) { return; }
205 }
206#endif // SIMDJSON_THREADS_ENABLED
207 next();
208}
209
210simdjson_inline size_t document_stream::iterator::current_index() const noexcept {
211 return stream->doc_index;
212}
213
214simdjson_inline std::string_view document_stream::iterator::source() const noexcept {
215 const char* start = reinterpret_cast<const char*>(stream->buf) + current_index();
216 bool object_or_array = ((*start == '[') || (*start == '{'));
217 if(object_or_array) {
218 size_t next_doc_index = stream->batch_start + stream->parser->implementation->structural_indexes[stream->parser->implementation->next_structural_index - 1];
219 return std::string_view(start, next_doc_index - current_index() + 1);
220 } else {
221 size_t next_doc_index = stream->batch_start + stream->parser->implementation->structural_indexes[stream->parser->implementation->next_structural_index];
222 return std::string_view(reinterpret_cast<const char*>(stream->buf) + current_index(), next_doc_index - current_index() - 1);
223 }
224}
225
226
227inline void document_stream::next() noexcept {
228 // We always exit at once, once in an error condition.
229 if (error) { return; }
230
231 // Load the next document from the batch
232 doc_index = batch_start + parser->implementation->structural_indexes[parser->implementation->next_structural_index];
233 error = parser->implementation->stage2_next(doc&: parser->doc);
234 // If that was the last document in the batch, load another batch (if available)
235 while (error == EMPTY) {
236 batch_start = next_batch_start();
237 if (batch_start >= len) { break; }
238
239#ifdef SIMDJSON_THREADS_ENABLED
240 if(use_thread) {
241 load_from_stage1_thread();
242 } else {
243 error = run_stage1(p&: *parser, batch_start);
244 }
245#else
246 error = run_stage1(*parser, batch_start);
247#endif
248 if (error) { continue; } // If the error was EMPTY, we may want to load another batch.
249 // Run stage 2 on the first document in the batch
250 doc_index = batch_start + parser->implementation->structural_indexes[parser->implementation->next_structural_index];
251 error = parser->implementation->stage2_next(doc&: parser->doc);
252 }
253}
254inline size_t document_stream::size_in_bytes() const noexcept {
255 return len;
256}
257
258inline size_t document_stream::truncated_bytes() const noexcept {
259 if(error == CAPACITY) { return len - batch_start; }
260 return parser->implementation->structural_indexes[parser->implementation->n_structural_indexes] - parser->implementation->structural_indexes[parser->implementation->n_structural_indexes + 1];
261}
262
263inline size_t document_stream::next_batch_start() const noexcept {
264 return batch_start + parser->implementation->structural_indexes[parser->implementation->n_structural_indexes];
265}
266
267inline error_code document_stream::run_stage1(dom::parser &p, size_t _batch_start) noexcept {
268 size_t remaining = len - _batch_start;
269 if (remaining <= batch_size) {
270 return p.implementation->stage1(buf: &buf[_batch_start], len: remaining, streaming: stage1_mode::streaming_final);
271 } else {
272 return p.implementation->stage1(buf: &buf[_batch_start], len: batch_size, streaming: stage1_mode::streaming_partial);
273 }
274}
275
276#ifdef SIMDJSON_THREADS_ENABLED
277
278inline void document_stream::load_from_stage1_thread() noexcept {
279 worker->finish();
280 // Swap to the parser that was loaded up in the thread. Make sure the parser has
281 // enough memory to swap to, as well.
282 std::swap(a&: *parser, b&: stage1_thread_parser);
283 error = stage1_thread_error;
284 if (error) { return; }
285
286 // If there's anything left, start the stage 1 thread!
287 if (next_batch_start() < len) {
288 start_stage1_thread();
289 }
290}
291
292inline void document_stream::start_stage1_thread() noexcept {
293 // we call the thread on a lambda that will update
294 // this->stage1_thread_error
295 // there is only one thread that may write to this value
296 // TODO this is NOT exception-safe.
297 this->stage1_thread_error = UNINITIALIZED; // In case something goes wrong, make sure it's an error
298 size_t _next_batch_start = this->next_batch_start();
299
300 worker->run(ds: this, stage1: & this->stage1_thread_parser, next_batch_start: _next_batch_start);
301}
302
303#endif // SIMDJSON_THREADS_ENABLED
304
305} // namespace dom
306
307simdjson_inline simdjson_result<dom::document_stream>::simdjson_result() noexcept
308 : simdjson_result_base() {
309}
310simdjson_inline simdjson_result<dom::document_stream>::simdjson_result(error_code error) noexcept
311 : simdjson_result_base(error) {
312}
313simdjson_inline simdjson_result<dom::document_stream>::simdjson_result(dom::document_stream &&value) noexcept
314 : simdjson_result_base(std::forward<dom::document_stream>(t&: value)) {
315}
316
317#if SIMDJSON_EXCEPTIONS
318simdjson_inline dom::document_stream::iterator simdjson_result<dom::document_stream>::begin() noexcept(false) {
319 if (error()) { throw simdjson_error(error()); }
320 return first.begin();
321}
322simdjson_inline dom::document_stream::iterator simdjson_result<dom::document_stream>::end() noexcept(false) {
323 if (error()) { throw simdjson_error(error()); }
324 return first.end();
325}
326#else // SIMDJSON_EXCEPTIONS
327#ifndef SIMDJSON_DISABLE_DEPRECATED_API
328simdjson_inline dom::document_stream::iterator simdjson_result<dom::document_stream>::begin() noexcept {
329 first.error = error();
330 return first.begin();
331}
332simdjson_inline dom::document_stream::iterator simdjson_result<dom::document_stream>::end() noexcept {
333 first.error = error();
334 return first.end();
335}
336#endif // SIMDJSON_DISABLE_DEPRECATED_API
337#endif // SIMDJSON_EXCEPTIONS
338
339} // namespace simdjson
340#endif // SIMDJSON_INLINE_DOCUMENT_STREAM_H
341