1#include "simdjson/error.h"
2#ifdef SIMDJSON_THREADS_ENABLED
3#include <thread>
4#include <mutex>
5#include <condition_variable>
6#endif
7
8namespace simdjson {
9namespace SIMDJSON_IMPLEMENTATION {
10namespace ondemand {
11
// Forward declarations of the On Demand types that this header refers to
// by pointer, reference, or friendship before their full definitions.
class parser;
class json_iterator;
class document;
// stage1_worker (declared below, when threads are enabled) names
// document_stream in its interface before the class itself is defined.
class document_stream;
15
16#ifdef SIMDJSON_THREADS_ENABLED
17/** @private Custom worker class **/
18struct stage1_worker {
19 stage1_worker() noexcept = default;
20 stage1_worker(const stage1_worker&) = delete;
21 stage1_worker(stage1_worker&&) = delete;
22 stage1_worker operator=(const stage1_worker&) = delete;
23 ~stage1_worker();
24 /**
25 * We only start the thread when it is needed, not at object construction, this may throw.
26 * You should only call this once.
27 **/
28 void start_thread();
29 /**
30 * Start a stage 1 job. You should first call 'run', then 'finish'.
31 * You must call start_thread once before.
32 */
33 void run(document_stream * ds, parser * stage1, size_t next_batch_start);
34 /** Wait for the run to finish (blocking). You should first call 'run', then 'finish'. **/
35 void finish();
36
37private:
38
39 /**
40 * Normally, we would never stop the thread. But we do in the destructor.
41 * This function is only safe assuming that you are not waiting for results. You
42 * should have called run, then finish, and be done.
43 **/
44 void stop_thread();
45
46 std::thread thread{};
47 /** These three variables define the work done by the thread. **/
48 ondemand::parser * stage1_thread_parser{};
49 size_t _next_batch_start{};
50 document_stream * owner{};
51 /**
52 * We have two state variables. This could be streamlined to one variable in the future but
53 * we use two for clarity.
54 */
55 bool has_work{false};
56 bool can_work{true};
57
58 /**
59 * We lock using a mutex.
60 */
61 std::mutex locking_mutex{};
62 std::condition_variable cond_var{};
63
64 friend class document_stream;
65};
66#endif // SIMDJSON_THREADS_ENABLED
67
68/**
69 * A forward-only stream of documents.
70 *
71 * Produced by parser::iterate_many.
72 *
73 */
74class document_stream {
75public:
76 /**
77 * Construct an uninitialized document_stream.
78 *
79 * ```c++
80 * document_stream docs;
81 * auto error = parser.iterate_many(json).get(docs);
82 * ```
83 */
84 simdjson_inline document_stream() noexcept;
85 /** Move one document_stream to another. */
86 simdjson_inline document_stream(document_stream &&other) noexcept = default;
87 /** Move one document_stream to another. */
88 simdjson_inline document_stream &operator=(document_stream &&other) noexcept = default;
89
90 simdjson_inline ~document_stream() noexcept;
91
92 /**
93 * Returns the input size in bytes.
94 */
95 inline size_t size_in_bytes() const noexcept;
96
97 /**
98 * After iterating through the stream, this method
99 * returns the number of bytes that were not parsed at the end
100 * of the stream. If truncated_bytes() differs from zero,
101 * then the input was truncated maybe because incomplete JSON
102 * documents were found at the end of the stream. You
103 * may need to process the bytes in the interval [size_in_bytes()-truncated_bytes(), size_in_bytes()).
104 *
105 * You should only call truncated_bytes() after streaming through all
106 * documents, like so:
107 *
108 * document_stream stream = parser.iterate_many(json,window);
109 * for(auto & doc : stream) {
110 * // do something with doc
111 * }
112 * size_t truncated = stream.truncated_bytes();
113 *
114 */
115 inline size_t truncated_bytes() const noexcept;
116
117 class iterator {
118 public:
119 using value_type = simdjson_result<document>;
120 using reference = value_type;
121
122 using difference_type = std::ptrdiff_t;
123
124 using iterator_category = std::input_iterator_tag;
125
126 /**
127 * Default constructor.
128 */
129 simdjson_inline iterator() noexcept;
130 /**
131 * Get the current document (or error).
132 */
133 simdjson_inline simdjson_result<ondemand::document_reference> operator*() noexcept;
134 /**
135 * Advance to the next document (prefix).
136 */
137 inline iterator& operator++() noexcept;
138 /**
139 * Check if we're at the end yet.
140 * @param other the end iterator to compare to.
141 */
142 simdjson_inline bool operator!=(const iterator &other) const noexcept;
143 /**
144 * @private
145 *
146 * Gives the current index in the input document in bytes.
147 *
148 * document_stream stream = parser.parse_many(json,window);
149 * for(auto i = stream.begin(); i != stream.end(); ++i) {
150 * auto doc = *i;
151 * size_t index = i.current_index();
152 * }
153 *
154 * This function (current_index()) is experimental and the usage
155 * may change in future versions of simdjson: we find the API somewhat
156 * awkward and we would like to offer something friendlier.
157 */
158 simdjson_inline size_t current_index() const noexcept;
159
160 /**
161 * @private
162 *
163 * Gives a view of the current document at the current position.
164 *
165 * document_stream stream = parser.iterate_many(json,window);
166 * for(auto i = stream.begin(); i != stream.end(); ++i) {
167 * std::string_view v = i.source();
168 * }
169 *
170 * The returned string_view instance is simply a map to the (unparsed)
171 * source string: it may thus include white-space characters and all manner
172 * of padding.
173 *
174 * This function (source()) is experimental and the usage
175 * may change in future versions of simdjson: we find the API somewhat
176 * awkward and we would like to offer something friendlier.
177 *
178 */
179 simdjson_inline std::string_view source() const noexcept;
180
181 /**
182 * Returns error of the stream (if any).
183 */
184 inline error_code error() const noexcept;
185
186 private:
187 simdjson_inline iterator(document_stream *s, bool finished) noexcept;
188 /** The document_stream we're iterating through. */
189 document_stream* stream;
190 /** Whether we're finished or not. */
191 bool finished;
192
193 friend class document;
194 friend class document_stream;
195 friend class json_iterator;
196 };
197
198 /**
199 * Start iterating the documents in the stream.
200 */
201 simdjson_inline iterator begin() noexcept;
202 /**
203 * The end of the stream, for iterator comparison purposes.
204 */
205 simdjson_inline iterator end() noexcept;
206
207private:
208
209 document_stream &operator=(const document_stream &) = delete; // Disallow copying
210 document_stream(const document_stream &other) = delete; // Disallow copying
211
212 /**
213 * Construct a document_stream. Does not allocate or parse anything until the iterator is
214 * used.
215 *
216 * @param parser is a reference to the parser instance used to generate this document_stream
217 * @param buf is the raw byte buffer we need to process
218 * @param len is the length of the raw byte buffer in bytes
219 * @param batch_size is the size of the windows (must be strictly greater or equal to the largest JSON document)
220 */
221 simdjson_inline document_stream(
222 ondemand::parser &parser,
223 const uint8_t *buf,
224 size_t len,
225 size_t batch_size
226 ) noexcept;
227
228 /**
229 * Parse the first document in the buffer. Used by begin(), to handle allocation and
230 * initialization.
231 */
232 inline void start() noexcept;
233
234 /**
235 * Parse the next document found in the buffer previously given to document_stream.
236 *
237 * The content should be a valid JSON document encoded as UTF-8. If there is a
238 * UTF-8 BOM, the caller is responsible for omitting it, UTF-8 BOM are
239 * discouraged.
240 *
241 * You do NOT need to pre-allocate a parser. This function takes care of
242 * pre-allocating a capacity defined by the batch_size defined when creating the
243 * document_stream object.
244 *
245 * The function returns simdjson::EMPTY if there is no more data to be parsed.
246 *
247 * The function returns simdjson::SUCCESS (as integer = 0) in case of success
248 * and indicates that the buffer has successfully been parsed to the end.
249 * Every document it contained has been parsed without error.
250 *
251 * The function returns an error code from simdjson/simdjson.h in case of failure
252 * such as simdjson::CAPACITY, simdjson::MEMALLOC, simdjson::DEPTH_ERROR and so forth;
253 * the simdjson::error_message function converts these error codes into a string).
254 *
255 * You can also check validity by calling parser.is_valid(). The same parser can
256 * and should be reused for the other documents in the buffer.
257 */
258 inline void next() noexcept;
259
260 /** Move the json_iterator of the document to the location of the next document in the stream. */
261 inline void next_document() noexcept;
262
263 /** Get the next document index. */
264 inline size_t next_batch_start() const noexcept;
265
266 /** Pass the next batch through stage 1 with the given parser. */
267 inline error_code run_stage1(ondemand::parser &p, size_t batch_start) noexcept;
268
269 // Fields
270 ondemand::parser *parser;
271 const uint8_t *buf;
272 size_t len;
273 size_t batch_size;
274 /**
275 * We are going to use just one document instance. The document owns
276 * the json_iterator. It implies that we only ever pass a reference
277 * to the document to the users.
278 */
279 document doc{};
280 /** The error (or lack thereof) from the current document. */
281 error_code error;
282 size_t batch_start{0};
283 size_t doc_index{};
284
285 #ifdef SIMDJSON_THREADS_ENABLED
286 /** Indicates whether we use threads. Note that this needs to be a constant during the execution of the parsing. */
287 bool use_thread;
288
289 inline void load_from_stage1_thread() noexcept;
290
291 /** Start a thread to run stage 1 on the next batch. */
292 inline void start_stage1_thread() noexcept;
293
294 /** Wait for the stage 1 thread to finish and capture the results. */
295 inline void finish_stage1_thread() noexcept;
296
297 /** The error returned from the stage 1 thread. */
298 error_code stage1_thread_error{UNINITIALIZED};
299 /** The thread used to run stage 1 against the next batch in the background. */
300 std::unique_ptr<stage1_worker> worker{new(std::nothrow) stage1_worker()};
301 /**
302 * The parser used to run stage 1 in the background. Will be swapped
303 * with the regular parser when finished.
304 */
305 ondemand::parser stage1_thread_parser{};
306
307 friend struct stage1_worker;
308 #endif // SIMDJSON_THREADS_ENABLED
309
310 friend class parser;
311 friend class document;
312 friend class json_iterator;
313 friend struct simdjson_result<ondemand::document_stream>;
314 friend struct internal::simdjson_result_base<ondemand::document_stream>;
315}; // document_stream
316
317} // namespace ondemand
318} // namespace SIMDJSON_IMPLEMENTATION
319} // namespace simdjson
320
321namespace simdjson {
322template<>
323struct simdjson_result<SIMDJSON_IMPLEMENTATION::ondemand::document_stream> : public SIMDJSON_IMPLEMENTATION::implementation_simdjson_result_base<SIMDJSON_IMPLEMENTATION::ondemand::document_stream> {
324public:
325 simdjson_inline simdjson_result(SIMDJSON_IMPLEMENTATION::ondemand::document_stream &&value) noexcept; ///< @private
326 simdjson_inline simdjson_result(error_code error) noexcept; ///< @private
327 simdjson_inline simdjson_result() noexcept = default;
328};
329
330} // namespace simdjson
331