1#ifndef SIMDJSON_DOCUMENT_STREAM_H
2#define SIMDJSON_DOCUMENT_STREAM_H
3
4#include "simdjson/common_defs.h"
5#include "simdjson/dom/parser.h"
6#include "simdjson/error.h"
7#ifdef SIMDJSON_THREADS_ENABLED
8#include <thread>
9#include <mutex>
10#include <condition_variable>
11#endif
12
13namespace simdjson {
14namespace dom {
15
16
17#ifdef SIMDJSON_THREADS_ENABLED
18/** @private Custom worker class **/
19struct stage1_worker {
20 stage1_worker() noexcept = default;
21 stage1_worker(const stage1_worker&) = delete;
22 stage1_worker(stage1_worker&&) = delete;
23 stage1_worker operator=(const stage1_worker&) = delete;
24 ~stage1_worker();
25 /**
26 * We only start the thread when it is needed, not at object construction, this may throw.
27 * You should only call this once.
28 **/
29 void start_thread();
30 /**
31 * Start a stage 1 job. You should first call 'run', then 'finish'.
32 * You must call start_thread once before.
33 */
34 void run(document_stream * ds, dom::parser * stage1, size_t next_batch_start);
35 /** Wait for the run to finish (blocking). You should first call 'run', then 'finish'. **/
36 void finish();
37
38private:
39
40 /**
41 * Normally, we would never stop the thread. But we do in the destructor.
42 * This function is only safe assuming that you are not waiting for results. You
43 * should have called run, then finish, and be done.
44 **/
45 void stop_thread();
46
47 std::thread thread{};
48 /** These three variables define the work done by the thread. **/
49 dom::parser * stage1_thread_parser{};
50 size_t _next_batch_start{};
51 document_stream * owner{};
52 /**
53 * We have two state variables. This could be streamlined to one variable in the future but
54 * we use two for clarity.
55 */
56 bool has_work{false};
57 bool can_work{true};
58
59 /**
60 * We lock using a mutex.
61 */
62 std::mutex locking_mutex{};
63 std::condition_variable cond_var{};
64};
65#endif
66
67/**
68 * A forward-only stream of documents.
69 *
70 * Produced by parser::parse_many.
71 *
72 */
73class document_stream {
74public:
75 /**
76 * Construct an uninitialized document_stream.
77 *
78 * ```c++
79 * document_stream docs;
80 * error = parser.parse_many(json).get(docs);
81 * ```
82 */
83 simdjson_inline document_stream() noexcept;
84 /** Move one document_stream to another. */
85 simdjson_inline document_stream(document_stream &&other) noexcept = default;
86 /** Move one document_stream to another. */
87 simdjson_inline document_stream &operator=(document_stream &&other) noexcept = default;
88
89 simdjson_inline ~document_stream() noexcept;
90 /**
91 * Returns the input size in bytes.
92 */
93 inline size_t size_in_bytes() const noexcept;
94 /**
95 * After iterating through the stream, this method
96 * returns the number of bytes that were not parsed at the end
97 * of the stream. If truncated_bytes() differs from zero,
98 * then the input was truncated maybe because incomplete JSON
99 * documents were found at the end of the stream. You
100 * may need to process the bytes in the interval [size_in_bytes()-truncated_bytes(), size_in_bytes()).
101 *
102 * You should only call truncated_bytes() after streaming through all
103 * documents, like so:
104 *
105 * document_stream stream = parser.parse_many(json,window);
106 * for(auto doc : stream) {
107 * // do something with doc
108 * }
109 * size_t truncated = stream.truncated_bytes();
110 *
111 */
112 inline size_t truncated_bytes() const noexcept;
113 /**
114 * An iterator through a forward-only stream of documents.
115 */
116 class iterator {
117 public:
118 using value_type = simdjson_result<element>;
119 using reference = value_type;
120
121 using difference_type = std::ptrdiff_t;
122
123 using iterator_category = std::input_iterator_tag;
124
125 /**
126 * Default constructor.
127 */
128 simdjson_inline iterator() noexcept;
129 /**
130 * Get the current document (or error).
131 */
132 simdjson_inline reference operator*() noexcept;
133 /**
134 * Advance to the next document (prefix).
135 */
136 inline iterator& operator++() noexcept;
137 /**
138 * Check if we're at the end yet.
139 * @param other the end iterator to compare to.
140 */
141 simdjson_inline bool operator!=(const iterator &other) const noexcept;
142 /**
143 * @private
144 *
145 * Gives the current index in the input document in bytes.
146 *
147 * document_stream stream = parser.parse_many(json,window);
148 * for(auto i = stream.begin(); i != stream.end(); ++i) {
149 * auto doc = *i;
150 * size_t index = i.current_index();
151 * }
152 *
153 * This function (current_index()) is experimental and the usage
154 * may change in future versions of simdjson: we find the API somewhat
155 * awkward and we would like to offer something friendlier.
156 */
157 simdjson_inline size_t current_index() const noexcept;
158 /**
159 * @private
160 *
161 * Gives a view of the current document.
162 *
163 * document_stream stream = parser.parse_many(json,window);
164 * for(auto i = stream.begin(); i != stream.end(); ++i) {
165 * auto doc = *i;
166 * std::string_view v = i->source();
167 * }
168 *
169 * The returned string_view instance is simply a map to the (unparsed)
170 * source string: it may thus include white-space characters and all manner
171 * of padding.
172 *
173 * This function (source()) is experimental and the usage
174 * may change in future versions of simdjson: we find the API somewhat
175 * awkward and we would like to offer something friendlier.
176 */
177 simdjson_inline std::string_view source() const noexcept;
178
179 private:
180 simdjson_inline iterator(document_stream *s, bool finished) noexcept;
181 /** The document_stream we're iterating through. */
182 document_stream* stream;
183 /** Whether we're finished or not. */
184 bool finished;
185 friend class document_stream;
186 };
187
188 /**
189 * Start iterating the documents in the stream.
190 */
191 simdjson_inline iterator begin() noexcept;
192 /**
193 * The end of the stream, for iterator comparison purposes.
194 */
195 simdjson_inline iterator end() noexcept;
196
197private:
198
199 document_stream &operator=(const document_stream &) = delete; // Disallow copying
200 document_stream(const document_stream &other) = delete; // Disallow copying
201
202 /**
203 * Construct a document_stream. Does not allocate or parse anything until the iterator is
204 * used.
205 *
206 * @param parser is a reference to the parser instance used to generate this document_stream
207 * @param buf is the raw byte buffer we need to process
208 * @param len is the length of the raw byte buffer in bytes
209 * @param batch_size is the size of the windows (must be strictly greater or equal to the largest JSON document)
210 */
211 simdjson_inline document_stream(
212 dom::parser &parser,
213 const uint8_t *buf,
214 size_t len,
215 size_t batch_size
216 ) noexcept;
217
218 /**
219 * Parse the first document in the buffer. Used by begin(), to handle allocation and
220 * initialization.
221 */
222 inline void start() noexcept;
223
224 /**
225 * Parse the next document found in the buffer previously given to document_stream.
226 *
227 * The content should be a valid JSON document encoded as UTF-8. If there is a
228 * UTF-8 BOM, the caller is responsible for omitting it, UTF-8 BOM are
229 * discouraged.
230 *
231 * You do NOT need to pre-allocate a parser. This function takes care of
232 * pre-allocating a capacity defined by the batch_size defined when creating the
233 * document_stream object.
234 *
235 * The function returns simdjson::EMPTY if there is no more data to be parsed.
236 *
237 * The function returns simdjson::SUCCESS (as integer = 0) in case of success
238 * and indicates that the buffer has successfully been parsed to the end.
239 * Every document it contained has been parsed without error.
240 *
241 * The function returns an error code from simdjson/simdjson.h in case of failure
242 * such as simdjson::CAPACITY, simdjson::MEMALLOC, simdjson::DEPTH_ERROR and so forth;
243 * the simdjson::error_message function converts these error codes into a string).
244 *
245 * You can also check validity by calling parser.is_valid(). The same parser can
246 * and should be reused for the other documents in the buffer.
247 */
248 inline void next() noexcept;
249
250 /**
251 * Pass the next batch through stage 1 and return when finished.
252 * When threads are enabled, this may wait for the stage 1 thread to finish.
253 */
254 inline void load_batch() noexcept;
255
256 /** Get the next document index. */
257 inline size_t next_batch_start() const noexcept;
258
259 /** Pass the next batch through stage 1 with the given parser. */
260 inline error_code run_stage1(dom::parser &p, size_t batch_start) noexcept;
261
262 dom::parser *parser;
263 const uint8_t *buf;
264 size_t len;
265 size_t batch_size;
266 /** The error (or lack thereof) from the current document. */
267 error_code error;
268 size_t batch_start{0};
269 size_t doc_index{};
270#ifdef SIMDJSON_THREADS_ENABLED
271 /** Indicates whether we use threads. Note that this needs to be a constant during the execution of the parsing. */
272 bool use_thread;
273
274 inline void load_from_stage1_thread() noexcept;
275
276 /** Start a thread to run stage 1 on the next batch. */
277 inline void start_stage1_thread() noexcept;
278
279 /** Wait for the stage 1 thread to finish and capture the results. */
280 inline void finish_stage1_thread() noexcept;
281
282 /** The error returned from the stage 1 thread. */
283 error_code stage1_thread_error{UNINITIALIZED};
284 /** The thread used to run stage 1 against the next batch in the background. */
285 friend struct stage1_worker;
286 std::unique_ptr<stage1_worker> worker{new(std::nothrow) stage1_worker()};
287 /**
288 * The parser used to run stage 1 in the background. Will be swapped
289 * with the regular parser when finished.
290 */
291 dom::parser stage1_thread_parser{};
292#endif // SIMDJSON_THREADS_ENABLED
293
294 friend class dom::parser;
295 friend struct simdjson_result<dom::document_stream>;
296 friend struct internal::simdjson_result_base<dom::document_stream>;
297
298}; // class document_stream
299
300} // namespace dom
301
302template<>
303struct simdjson_result<dom::document_stream> : public internal::simdjson_result_base<dom::document_stream> {
304public:
305 simdjson_inline simdjson_result() noexcept; ///< @private
306 simdjson_inline simdjson_result(error_code error) noexcept; ///< @private
307 simdjson_inline simdjson_result(dom::document_stream &&value) noexcept; ///< @private
308
309#if SIMDJSON_EXCEPTIONS
310 simdjson_inline dom::document_stream::iterator begin() noexcept(false);
311 simdjson_inline dom::document_stream::iterator end() noexcept(false);
312#else // SIMDJSON_EXCEPTIONS
313#ifndef SIMDJSON_DISABLE_DEPRECATED_API
314 [[deprecated("parse_many() and load_many() may return errors. Use document_stream stream; error = parser.parse_many().get(doc); instead.")]]
315 simdjson_inline dom::document_stream::iterator begin() noexcept;
316 [[deprecated("parse_many() and load_many() may return errors. Use document_stream stream; error = parser.parse_many().get(doc); instead.")]]
317 simdjson_inline dom::document_stream::iterator end() noexcept;
318#endif // SIMDJSON_DISABLE_DEPRECATED_API
319#endif // SIMDJSON_EXCEPTIONS
320}; // struct simdjson_result<dom::document_stream>
321
322} // namespace simdjson
323
324#endif // SIMDJSON_DOCUMENT_STREAM_H
325