1 | #include "simdjson/error.h" |
2 | #ifdef SIMDJSON_THREADS_ENABLED |
3 | #include <thread> |
4 | #include <mutex> |
5 | #include <condition_variable> |
6 | #endif |
7 | |
8 | namespace simdjson { |
9 | namespace SIMDJSON_IMPLEMENTATION { |
10 | namespace ondemand { |
11 | |
// Forward declarations of On-Demand types referenced (by pointer, reference or
// friendship) in this header before, or without, their full definition.
// document_stream is listed too because stage1_worker (below) refers to it
// before its definition further down in this header.
class parser;
class json_iterator;
class document;
class document_stream;
15 | |
16 | #ifdef SIMDJSON_THREADS_ENABLED |
17 | /** @private Custom worker class **/ |
18 | struct stage1_worker { |
19 | stage1_worker() noexcept = default; |
20 | stage1_worker(const stage1_worker&) = delete; |
21 | stage1_worker(stage1_worker&&) = delete; |
22 | stage1_worker operator=(const stage1_worker&) = delete; |
23 | ~stage1_worker(); |
24 | /** |
25 | * We only start the thread when it is needed, not at object construction, this may throw. |
26 | * You should only call this once. |
27 | **/ |
28 | void start_thread(); |
29 | /** |
30 | * Start a stage 1 job. You should first call 'run', then 'finish'. |
31 | * You must call start_thread once before. |
32 | */ |
33 | void run(document_stream * ds, parser * stage1, size_t next_batch_start); |
34 | /** Wait for the run to finish (blocking). You should first call 'run', then 'finish'. **/ |
35 | void finish(); |
36 | |
37 | private: |
38 | |
39 | /** |
40 | * Normally, we would never stop the thread. But we do in the destructor. |
41 | * This function is only safe assuming that you are not waiting for results. You |
42 | * should have called run, then finish, and be done. |
43 | **/ |
44 | void stop_thread(); |
45 | |
46 | std::thread thread{}; |
47 | /** These three variables define the work done by the thread. **/ |
48 | ondemand::parser * stage1_thread_parser{}; |
49 | size_t _next_batch_start{}; |
50 | document_stream * owner{}; |
51 | /** |
52 | * We have two state variables. This could be streamlined to one variable in the future but |
53 | * we use two for clarity. |
54 | */ |
55 | bool has_work{false}; |
56 | bool can_work{true}; |
57 | |
58 | /** |
59 | * We lock using a mutex. |
60 | */ |
61 | std::mutex locking_mutex{}; |
62 | std::condition_variable cond_var{}; |
63 | |
64 | friend class document_stream; |
65 | }; |
66 | #endif // SIMDJSON_THREADS_ENABLED |
67 | |
68 | /** |
69 | * A forward-only stream of documents. |
70 | * |
71 | * Produced by parser::iterate_many. |
72 | * |
73 | */ |
74 | class document_stream { |
75 | public: |
76 | /** |
77 | * Construct an uninitialized document_stream. |
78 | * |
79 | * ```c++ |
80 | * document_stream docs; |
81 | * auto error = parser.iterate_many(json).get(docs); |
82 | * ``` |
83 | */ |
84 | simdjson_inline document_stream() noexcept; |
85 | /** Move one document_stream to another. */ |
86 | simdjson_inline document_stream(document_stream &&other) noexcept = default; |
87 | /** Move one document_stream to another. */ |
88 | simdjson_inline document_stream &operator=(document_stream &&other) noexcept = default; |
89 | |
90 | simdjson_inline ~document_stream() noexcept; |
91 | |
92 | /** |
93 | * Returns the input size in bytes. |
94 | */ |
95 | inline size_t size_in_bytes() const noexcept; |
96 | |
97 | /** |
98 | * After iterating through the stream, this method |
99 | * returns the number of bytes that were not parsed at the end |
100 | * of the stream. If truncated_bytes() differs from zero, |
101 | * then the input was truncated maybe because incomplete JSON |
102 | * documents were found at the end of the stream. You |
103 | * may need to process the bytes in the interval [size_in_bytes()-truncated_bytes(), size_in_bytes()). |
104 | * |
105 | * You should only call truncated_bytes() after streaming through all |
106 | * documents, like so: |
107 | * |
108 | * document_stream stream = parser.iterate_many(json,window); |
109 | * for(auto & doc : stream) { |
110 | * // do something with doc |
111 | * } |
112 | * size_t truncated = stream.truncated_bytes(); |
113 | * |
114 | */ |
115 | inline size_t truncated_bytes() const noexcept; |
116 | |
117 | class iterator { |
118 | public: |
119 | using value_type = simdjson_result<document>; |
120 | using reference = value_type; |
121 | |
122 | using difference_type = std::ptrdiff_t; |
123 | |
124 | using iterator_category = std::input_iterator_tag; |
125 | |
126 | /** |
127 | * Default constructor. |
128 | */ |
129 | simdjson_inline iterator() noexcept; |
130 | /** |
131 | * Get the current document (or error). |
132 | */ |
133 | simdjson_inline simdjson_result<ondemand::document_reference> operator*() noexcept; |
134 | /** |
135 | * Advance to the next document (prefix). |
136 | */ |
137 | inline iterator& operator++() noexcept; |
138 | /** |
139 | * Check if we're at the end yet. |
140 | * @param other the end iterator to compare to. |
141 | */ |
142 | simdjson_inline bool operator!=(const iterator &other) const noexcept; |
143 | /** |
144 | * @private |
145 | * |
146 | * Gives the current index in the input document in bytes. |
147 | * |
148 | * document_stream stream = parser.parse_many(json,window); |
149 | * for(auto i = stream.begin(); i != stream.end(); ++i) { |
150 | * auto doc = *i; |
151 | * size_t index = i.current_index(); |
152 | * } |
153 | * |
154 | * This function (current_index()) is experimental and the usage |
155 | * may change in future versions of simdjson: we find the API somewhat |
156 | * awkward and we would like to offer something friendlier. |
157 | */ |
158 | simdjson_inline size_t current_index() const noexcept; |
159 | |
160 | /** |
161 | * @private |
162 | * |
163 | * Gives a view of the current document at the current position. |
164 | * |
165 | * document_stream stream = parser.iterate_many(json,window); |
166 | * for(auto i = stream.begin(); i != stream.end(); ++i) { |
167 | * std::string_view v = i.source(); |
168 | * } |
169 | * |
170 | * The returned string_view instance is simply a map to the (unparsed) |
171 | * source string: it may thus include white-space characters and all manner |
172 | * of padding. |
173 | * |
174 | * This function (source()) is experimental and the usage |
175 | * may change in future versions of simdjson: we find the API somewhat |
176 | * awkward and we would like to offer something friendlier. |
177 | * |
178 | */ |
179 | simdjson_inline std::string_view source() const noexcept; |
180 | |
181 | /** |
182 | * Returns error of the stream (if any). |
183 | */ |
184 | inline error_code error() const noexcept; |
185 | |
186 | private: |
187 | simdjson_inline iterator(document_stream *s, bool finished) noexcept; |
188 | /** The document_stream we're iterating through. */ |
189 | document_stream* stream; |
190 | /** Whether we're finished or not. */ |
191 | bool finished; |
192 | |
193 | friend class document; |
194 | friend class document_stream; |
195 | friend class json_iterator; |
196 | }; |
197 | |
198 | /** |
199 | * Start iterating the documents in the stream. |
200 | */ |
201 | simdjson_inline iterator begin() noexcept; |
202 | /** |
203 | * The end of the stream, for iterator comparison purposes. |
204 | */ |
205 | simdjson_inline iterator end() noexcept; |
206 | |
207 | private: |
208 | |
209 | document_stream &operator=(const document_stream &) = delete; // Disallow copying |
210 | document_stream(const document_stream &other) = delete; // Disallow copying |
211 | |
212 | /** |
213 | * Construct a document_stream. Does not allocate or parse anything until the iterator is |
214 | * used. |
215 | * |
216 | * @param parser is a reference to the parser instance used to generate this document_stream |
217 | * @param buf is the raw byte buffer we need to process |
218 | * @param len is the length of the raw byte buffer in bytes |
219 | * @param batch_size is the size of the windows (must be strictly greater or equal to the largest JSON document) |
220 | */ |
221 | simdjson_inline document_stream( |
222 | ondemand::parser &parser, |
223 | const uint8_t *buf, |
224 | size_t len, |
225 | size_t batch_size |
226 | ) noexcept; |
227 | |
228 | /** |
229 | * Parse the first document in the buffer. Used by begin(), to handle allocation and |
230 | * initialization. |
231 | */ |
232 | inline void start() noexcept; |
233 | |
234 | /** |
235 | * Parse the next document found in the buffer previously given to document_stream. |
236 | * |
237 | * The content should be a valid JSON document encoded as UTF-8. If there is a |
238 | * UTF-8 BOM, the caller is responsible for omitting it, UTF-8 BOM are |
239 | * discouraged. |
240 | * |
241 | * You do NOT need to pre-allocate a parser. This function takes care of |
242 | * pre-allocating a capacity defined by the batch_size defined when creating the |
243 | * document_stream object. |
244 | * |
245 | * The function returns simdjson::EMPTY if there is no more data to be parsed. |
246 | * |
247 | * The function returns simdjson::SUCCESS (as integer = 0) in case of success |
248 | * and indicates that the buffer has successfully been parsed to the end. |
249 | * Every document it contained has been parsed without error. |
250 | * |
251 | * The function returns an error code from simdjson/simdjson.h in case of failure |
252 | * such as simdjson::CAPACITY, simdjson::MEMALLOC, simdjson::DEPTH_ERROR and so forth; |
253 | * the simdjson::error_message function converts these error codes into a string). |
254 | * |
255 | * You can also check validity by calling parser.is_valid(). The same parser can |
256 | * and should be reused for the other documents in the buffer. |
257 | */ |
258 | inline void next() noexcept; |
259 | |
260 | /** Move the json_iterator of the document to the location of the next document in the stream. */ |
261 | inline void next_document() noexcept; |
262 | |
263 | /** Get the next document index. */ |
264 | inline size_t next_batch_start() const noexcept; |
265 | |
266 | /** Pass the next batch through stage 1 with the given parser. */ |
267 | inline error_code run_stage1(ondemand::parser &p, size_t batch_start) noexcept; |
268 | |
269 | // Fields |
270 | ondemand::parser *parser; |
271 | const uint8_t *buf; |
272 | size_t len; |
273 | size_t batch_size; |
274 | /** |
275 | * We are going to use just one document instance. The document owns |
276 | * the json_iterator. It implies that we only ever pass a reference |
277 | * to the document to the users. |
278 | */ |
279 | document doc{}; |
280 | /** The error (or lack thereof) from the current document. */ |
281 | error_code error; |
282 | size_t batch_start{0}; |
283 | size_t doc_index{}; |
284 | |
285 | #ifdef SIMDJSON_THREADS_ENABLED |
286 | /** Indicates whether we use threads. Note that this needs to be a constant during the execution of the parsing. */ |
287 | bool use_thread; |
288 | |
289 | inline void load_from_stage1_thread() noexcept; |
290 | |
291 | /** Start a thread to run stage 1 on the next batch. */ |
292 | inline void start_stage1_thread() noexcept; |
293 | |
294 | /** Wait for the stage 1 thread to finish and capture the results. */ |
295 | inline void finish_stage1_thread() noexcept; |
296 | |
297 | /** The error returned from the stage 1 thread. */ |
298 | error_code stage1_thread_error{UNINITIALIZED}; |
299 | /** The thread used to run stage 1 against the next batch in the background. */ |
300 | std::unique_ptr<stage1_worker> worker{new(std::nothrow) stage1_worker()}; |
301 | /** |
302 | * The parser used to run stage 1 in the background. Will be swapped |
303 | * with the regular parser when finished. |
304 | */ |
305 | ondemand::parser stage1_thread_parser{}; |
306 | |
307 | friend struct stage1_worker; |
308 | #endif // SIMDJSON_THREADS_ENABLED |
309 | |
310 | friend class parser; |
311 | friend class document; |
312 | friend class json_iterator; |
313 | friend struct simdjson_result<ondemand::document_stream>; |
314 | friend struct internal::simdjson_result_base<ondemand::document_stream>; |
315 | }; // document_stream |
316 | |
317 | } // namespace ondemand |
318 | } // namespace SIMDJSON_IMPLEMENTATION |
319 | } // namespace simdjson |
320 | |
321 | namespace simdjson { |
322 | template<> |
323 | struct simdjson_result<SIMDJSON_IMPLEMENTATION::ondemand::document_stream> : public SIMDJSON_IMPLEMENTATION::implementation_simdjson_result_base<SIMDJSON_IMPLEMENTATION::ondemand::document_stream> { |
324 | public: |
325 | simdjson_inline simdjson_result(SIMDJSON_IMPLEMENTATION::ondemand::document_stream &&value) noexcept; ///< @private |
326 | simdjson_inline simdjson_result(error_code error) noexcept; ///< @private |
327 | simdjson_inline simdjson_result() noexcept = default; |
328 | }; |
329 | |
330 | } // namespace simdjson |
331 | |