#include <algorithm>
#include <limits>
#include <stdexcept>
namespace simdjson {
namespace SIMDJSON_IMPLEMENTATION {
namespace ondemand {

#ifdef SIMDJSON_THREADS_ENABLED

inline void stage1_worker::finish() {
  // After calling "run" someone would call finish() to wait
  // for the end of the processing.
  // This function will wait until either the thread has done
  // the processing or, else, the destructor has been called.
  std::unique_lock<std::mutex> lock(locking_mutex);
  cond_var.wait(lock, [this]{return has_work == false;});
}

inline stage1_worker::~stage1_worker() {
  // The thread must never outlive the stage1_worker instance
  // and will always be stopped/joined before the stage1_worker
  // instance is gone.
  stop_thread();
}

inline void stage1_worker::start_thread() {
  std::unique_lock<std::mutex> lock(locking_mutex);
  if(thread.joinable()) {
    return; // This should never happen but we never want to create more than one thread.
  }
  thread = std::thread([this]{
      while(true) {
        std::unique_lock<std::mutex> thread_lock(locking_mutex);
        // We wait for either "run" or "stop_thread" to be called.
        cond_var.wait(thread_lock, [this]{return has_work || !can_work;});
        // If, for some reason, the stop_thread() method was called (i.e., the
        // destructor of stage1_worker was called), then we want to immediately destroy
        // the thread (and not do any more processing).
        if(!can_work) {
          break;
        }
        this->owner->stage1_thread_error = this->owner->run_stage1(*this->stage1_thread_parser,
              this->_next_batch_start);
        this->has_work = false;
        // The condition variable call should be moved after thread_lock.unlock() for performance
        // reasons but thread sanitizers may report it as a data race if we do.
        // See https://stackoverflow.com/questions/35775501/c-should-condition-variable-be-notified-under-lock
        cond_var.notify_one(); // will notify "finish"
        thread_lock.unlock();
      }
    }
  );
}
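
// Summary of the worker's state machine (descriptive note, not authoritative): the loop above
// sleeps on cond_var until either has_work becomes true (a batch was handed over via run()) or
// can_work becomes false (stop_thread() was called, typically from the destructor). After running
// stage 1 on the handed-over parser, it clears has_work and notifies finish(), which is waiting
// for exactly that condition.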


inline void stage1_worker::stop_thread() {
  std::unique_lock<std::mutex> lock(locking_mutex);
  // We have to make sure that all locks can be released.
  can_work = false;
  has_work = false;
  cond_var.notify_all();
  lock.unlock();
  if(thread.joinable()) {
    thread.join();
  }
}

inline void stage1_worker::run(document_stream * ds, parser * stage1, size_t next_batch_start) {
  std::unique_lock<std::mutex> lock(locking_mutex);
  owner = ds;
  _next_batch_start = next_batch_start;
  stage1_thread_parser = stage1;
  has_work = true;
  // The condition variable call should be moved after lock.unlock() for performance
  // reasons but thread sanitizers may report it as a data race if we do.
  // See https://stackoverflow.com/questions/35775501/c-should-condition-variable-be-notified-under-lock
  cond_var.notify_one(); // will notify the worker thread that we have work
  lock.unlock();
}
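
// Illustrative sketch (not part of the library) of the intended handshake between the owning
// document_stream and this worker, assuming a single owner thread:
//
//   worker->start_thread();                                          // spawn the worker (idempotent)
//   worker->run(this, &this->stage1_thread_parser, next_batch_start()); // hand off the next window
//   /* ... owner keeps iterating documents of the current window ... */
//   worker->finish();                                                // block until has_work is false
//   std::swap(stage1_thread_parser, *parser);                        // adopt the freshly indexed window
//
// This mirrors what start_stage1_thread() and load_from_stage1_thread() do further down.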

#endif // SIMDJSON_THREADS_ENABLED

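// Note on construction (descriptive, based on the code below): the requested batch size is
// clamped up to MINIMAL_BATCH_SIZE, the parser's "threaded" setting is copied at construction
// time, and, when threading is compiled in, a failed worker allocation is surfaced as a
// MEMALLOC error on the stream itself.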
simdjson_inline document_stream::document_stream(
  ondemand::parser &_parser,
  const uint8_t *_buf,
  size_t _len,
  size_t _batch_size
) noexcept
  : parser{&_parser},
    buf{_buf},
    len{_len},
    batch_size{_batch_size <= MINIMAL_BATCH_SIZE ? MINIMAL_BATCH_SIZE : _batch_size},
    error{SUCCESS}
    #ifdef SIMDJSON_THREADS_ENABLED
    , use_thread(_parser.threaded) // we need to make a copy because _parser.threaded can change
    #endif
{
#ifdef SIMDJSON_THREADS_ENABLED
  if(worker.get() == nullptr) {
    error = MEMALLOC;
  }
#endif
}

simdjson_inline document_stream::document_stream() noexcept
  : parser{nullptr},
    buf{nullptr},
    len{0},
    batch_size{0},
    error{UNINITIALIZED}
    #ifdef SIMDJSON_THREADS_ENABLED
    , use_thread(false)
    #endif
{
}

simdjson_inline document_stream::~document_stream() noexcept
{
  #ifdef SIMDJSON_THREADS_ENABLED
  worker.reset();
  #endif
}

inline size_t document_stream::size_in_bytes() const noexcept {
  return len;
}

inline size_t document_stream::truncated_bytes() const noexcept {
  if(error == CAPACITY) { return len - batch_start; }
  return parser->implementation->structural_indexes[parser->implementation->n_structural_indexes] - parser->implementation->structural_indexes[parser->implementation->n_structural_indexes + 1];
}

simdjson_inline document_stream::iterator::iterator() noexcept
  : stream{nullptr}, finished{true} {
}

simdjson_inline document_stream::iterator::iterator(document_stream* _stream, bool is_end) noexcept
  : stream{_stream}, finished{is_end} {
}

simdjson_inline simdjson_result<ondemand::document_reference> document_stream::iterator::operator*() noexcept {
  //if(stream->error) { return stream->error; }
  return simdjson_result<ondemand::document_reference>(stream->doc, stream->error);
}

simdjson_inline document_stream::iterator& document_stream::iterator::operator++() noexcept {
  // If there is an error, then we want the iterator
  // to be finished, no matter what. (E.g., we do not
  // keep generating documents with errors, or go beyond
  // a document with errors.)
  //
  // Users do not have to call "operator*()" when they use operator++,
  // so we need to end the stream in the operator++ function.
  //
  // Note that setting finished = true is essential otherwise
  // we would enter an infinite loop.
  if (stream->error) { finished = true; }
  // Note that stream->next() is guarded against error conditions
  // (it will immediately return if stream->error casts to true).
  // In effect, the next() call below does nothing when (stream->error)
  // is true (hence the risk of an infinite loop if we did not set finished above).
  stream->next();
  // If that was the last document, we're finished.
  // It is the only type of error we do not want to appear
  // in operator*.
  if (stream->error == EMPTY) { finished = true; }
  // If we had any other kind of error (not EMPTY) then we want
  // to pass it along to the operator* and we cannot mark the result
  // as "finished" just yet.
  return *this;
}

simdjson_inline bool document_stream::iterator::operator!=(const document_stream::iterator &other) const noexcept {
  return finished != other.finished;
}

simdjson_inline document_stream::iterator document_stream::begin() noexcept {
  start();
  // If there are no documents, we're finished.
  return iterator(this, error == EMPTY);
}

simdjson_inline document_stream::iterator document_stream::end() noexcept {
  return iterator(this, true);
}
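
// Illustrative usage sketch (not part of this file): a document_stream is normally obtained
// from ondemand::parser::iterate_many() and consumed with the begin()/end() pair above,
// roughly as follows. Error handling details may differ in your code.
//
//   ondemand::parser parser;
//   auto json = R"( {"a":1} {"a":2} {"a":3} )"_padded;
//   ondemand::document_stream stream;
//   auto error = parser.iterate_many(json).get(stream);
//   if (error) { /* handle the error */ }
//   for (auto doc : stream) {   // operator++ advances, operator* yields the current document
//     int64_t value;
//     if (doc["a"].get(value)) { /* per-document error, surfaced by operator* */ }
//   }
//
// Iteration stops once next() reports EMPTY (no more documents) or right after a document
// with an error has been surfaced.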

inline void document_stream::start() noexcept {
  if (error) { return; }
  error = parser->allocate(batch_size);
  if (error) { return; }
  // Always run the first stage 1 parse immediately
  batch_start = 0;
  error = run_stage1(*parser, batch_start);
  while(error == EMPTY) {
    // In exceptional cases, we may start with an empty block
    batch_start = next_batch_start();
    if (batch_start >= len) { return; }
    error = run_stage1(*parser, batch_start);
  }
  if (error) { return; }
  doc_index = batch_start;
  doc = document(json_iterator(&buf[batch_start], parser));
  doc.iter._streaming = true;

  #ifdef SIMDJSON_THREADS_ENABLED
  if (use_thread && next_batch_start() < len) {
    // Kick off the first thread on next batch if needed
    error = stage1_thread_parser.allocate(batch_size);
    if (error) { return; }
    worker->start_thread();
    start_stage1_thread();
    if (error) { return; }
  }
  #endif // SIMDJSON_THREADS_ENABLED
}

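// Descriptive note on next(), based on the code below: it advances to the next document in the
// current window and, when the window's structural indexes are exhausted, loads the following
// window (from the worker thread when threading is enabled, otherwise by running stage 1
// inline) and re-anchors the json_iterator at the start of that window.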
inline void document_stream::next() noexcept {
  // We always exit at once, once in an error condition.
  if (error) { return; }
  next_document();
  if (error) { return; }
  auto cur_struct_index = doc.iter._root - parser->implementation->structural_indexes.get();
  doc_index = batch_start + parser->implementation->structural_indexes[cur_struct_index];

  // Check if at end of structural indexes (i.e. at end of batch)
  if(cur_struct_index >= static_cast<int64_t>(parser->implementation->n_structural_indexes)) {
    error = EMPTY;
    // Load another batch (if available)
    while (error == EMPTY) {
      batch_start = next_batch_start();
      if (batch_start >= len) { break; }
      #ifdef SIMDJSON_THREADS_ENABLED
      if(use_thread) {
        load_from_stage1_thread();
      } else {
        error = run_stage1(*parser, batch_start);
      }
      #else
      error = run_stage1(*parser, batch_start);
      #endif
      /**
       * Whenever we move to another window, we need to update all pointers to make
       * it appear as if the input buffer started at the beginning of the window.
       *
       * Take this input:
       *
       * {"z":5} {"1":1,"2":2,"4":4} [7, 10, 9] [15, 11, 12, 13] [154, 110, 112, 1311]
       *
       * Say you process the following window...
       *
       * '{"z":5} {"1":1,"2":2,"4":4} [7, 10, 9]'
       *
       * When you do so, the json_iterator has a pointer at the beginning of the memory region
       * (pointing at the beginning of '{"z"...').
       *
       * When you move to the window that starts at...
       *
       * '[7, 10, 9] [15, 11, 12, 13] ...'
       *
       * then it is not sufficient to just run stage 1. You also need to re-anchor the
       * json_iterator so that it believes we are starting at '[7, 10, 9]...'.
       *
       * Under the DOM front-end, this gets done automatically because the parser owns
       * the pointer to the data, and when you call stage1 and then stage2 on the same
       * parser, then stage2 will run on the pointer acquired by stage1.
       *
       * That is, stage1 calls "this->buf = _buf" so the parser remembers the buffer that
       * we used. But json_iterator has no callback when stage1 is called on the parser.
       * In fact, I think that the parser is unaware of json_iterator.
       *
       * So we need to re-anchor the json_iterator after each call to stage 1 so that
       * all of the pointers are in sync.
       */
      doc.iter = json_iterator(&buf[batch_start], parser);
      doc.iter._streaming = true;
      /**
       * End of resync.
       */

      if (error) { continue; } // If the error was EMPTY, we may want to load another batch.
      doc_index = batch_start;
    }
  }
}

inline void document_stream::next_document() noexcept {
  // Go to next place where depth=0 (document depth)
  error = doc.iter.skip_child(0);
  if (error) { return; }
  // Always set depth=1 at the start of document
  doc.iter._depth = 1;
  // Resets the string buffer at the beginning, thus invalidating the strings.
  doc.iter._string_buf_loc = parser->string_buf.get();
  doc.iter._root = doc.iter.position();
}

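// Descriptive note, lightly hedged: next_batch_start() relies on stage 1's streaming
// bookkeeping, which appears to store at structural_indexes[n_structural_indexes] the number
// of bytes of the current window that were consumed (up to the end of the last complete
// document), so the next window starts right after them.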
inline size_t document_stream::next_batch_start() const noexcept {
  return batch_start + parser->implementation->structural_indexes[parser->implementation->n_structural_indexes];
}

inline error_code document_stream::run_stage1(ondemand::parser &p, size_t _batch_start) noexcept {
  // This code only updates the structural index in the parser, it does not update any json_iterator
  // instance.
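  // Note on the two streaming modes (descriptive, to the best of my understanding):
  // streaming_partial tells stage 1 that more input follows this window, so it may stop its
  // structural indexes at the last complete document and report how much of the window it
  // consumed; streaming_final marks the last window, where everything that remains must be
  // indexed.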
  size_t remaining = len - _batch_start;
  if (remaining <= batch_size) {
    return p.implementation->stage1(&buf[_batch_start], remaining, stage1_mode::streaming_final);
  } else {
    return p.implementation->stage1(&buf[_batch_start], batch_size, stage1_mode::streaming_partial);
  }
}

simdjson_inline size_t document_stream::iterator::current_index() const noexcept {
  return stream->doc_index;
}

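// Descriptive note on source(), based on the code below: it returns a std::string_view over the
// raw input bytes of the current document, by walking the structural indexes from the document's
// root and tracking brace/bracket depth until the document closes. For a scalar document, it
// returns the bytes up to (but excluding) the start of the next document. For example, given the
// input '{"a":1} [1,2]', source() on the first document would cover the bytes of '{"a":1}'.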
simdjson_inline std::string_view document_stream::iterator::source() const noexcept {
  auto depth = stream->doc.iter.depth();
  auto cur_struct_index = stream->doc.iter._root - stream->parser->implementation->structural_indexes.get();

  // If at root, process the first token to determine if scalar value
  if (stream->doc.iter.at_root()) {
    switch (stream->buf[stream->batch_start + stream->parser->implementation->structural_indexes[cur_struct_index]]) {
      case '{': case '[':   // Depth=1 already at start of document
        break;
      case '}': case ']':
        depth--;
        break;
      default:    // Scalar value document
        // TODO: Remove any trailing whitespaces
        // This returns a string spanning from start of value to the beginning of the next document (excluded)
        return std::string_view(reinterpret_cast<const char*>(stream->buf) + current_index(), stream->parser->implementation->structural_indexes[++cur_struct_index] - current_index() - 1);
    }
    cur_struct_index++;
  }

  while (cur_struct_index <= static_cast<int64_t>(stream->parser->implementation->n_structural_indexes)) {
    switch (stream->buf[stream->batch_start + stream->parser->implementation->structural_indexes[cur_struct_index]]) {
      case '{': case '[':
        depth++;
        break;
      case '}': case ']':
        depth--;
        break;
    }
    if (depth == 0) { break; }
    cur_struct_index++;
  }

  return std::string_view(reinterpret_cast<const char*>(stream->buf) + current_index(), stream->parser->implementation->structural_indexes[cur_struct_index] - current_index() + stream->batch_start + 1);
}

inline error_code document_stream::iterator::error() const noexcept {
  return stream->error;
}

#ifdef SIMDJSON_THREADS_ENABLED

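// Descriptive note on the threaded design, based on the code in this block: the stream keeps two
// parsers, the main one (*parser) and stage1_thread_parser. While the owner iterates the documents
// of the current window with the main parser, the worker runs stage 1 on the next window into
// stage1_thread_parser. When the owner needs that window, it waits for the worker (finish()),
// swaps the two parsers, and immediately hands the following window to the worker.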
inline void document_stream::load_from_stage1_thread() noexcept {
  worker->finish();
  // Swap to the parser that was loaded up in the thread. Make sure the parser has
  // enough memory to swap to, as well.
  std::swap(stage1_thread_parser, *parser);
  error = stage1_thread_error;
  if (error) { return; }

  // If there's anything left, start the stage 1 thread!
  if (next_batch_start() < len) {
    start_stage1_thread();
  }
}

inline void document_stream::start_stage1_thread() noexcept {
  // we call the thread on a lambda that will update
  // this->stage1_thread_error
  // there is only one thread that may write to this value
  // TODO this is NOT exception-safe.
  this->stage1_thread_error = UNINITIALIZED; // In case something goes wrong, make sure it's an error
  size_t _next_batch_start = this->next_batch_start();

  worker->run(this, &this->stage1_thread_parser, _next_batch_start);
}
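
// A note on synchronization (descriptive, based on the worker code above): between run() and
// finish(), only the worker thread touches stage1_thread_error and stage1_thread_parser; the
// owner reads them only after finish() returns, so the mutex/condition-variable handshake in
// the worker is what publishes these values back to the owner.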

#endif // SIMDJSON_THREADS_ENABLED

} // namespace ondemand
} // namespace SIMDJSON_IMPLEMENTATION
} // namespace simdjson

namespace simdjson {

simdjson_inline simdjson_result<SIMDJSON_IMPLEMENTATION::ondemand::document_stream>::simdjson_result(
  error_code error
) noexcept :
    implementation_simdjson_result_base<SIMDJSON_IMPLEMENTATION::ondemand::document_stream>(error)
{
}
simdjson_inline simdjson_result<SIMDJSON_IMPLEMENTATION::ondemand::document_stream>::simdjson_result(
  SIMDJSON_IMPLEMENTATION::ondemand::document_stream &&value
) noexcept :
    implementation_simdjson_result_base<SIMDJSON_IMPLEMENTATION::ondemand::document_stream>(
      std::forward<SIMDJSON_IMPLEMENTATION::ondemand::document_stream>(value)
    )
{
}

}
