| 1 | #ifndef SIMDJSON_DOCUMENT_STREAM_H |
| 2 | #define SIMDJSON_DOCUMENT_STREAM_H |
| 3 | |
| 4 | #include "simdjson/common_defs.h" |
| 5 | #include "simdjson/dom/parser.h" |
| 6 | #include "simdjson/error.h" |
| 7 | #ifdef SIMDJSON_THREADS_ENABLED |
| 8 | #include <thread> |
| 9 | #include <mutex> |
| 10 | #include <condition_variable> |
| 11 | #endif |
| 12 | |
| 13 | namespace simdjson { |
| 14 | namespace dom { |
| 15 | |
| 16 | |
#ifdef SIMDJSON_THREADS_ENABLED
/**
 * @private Worker that runs stage 1 of the parser on the next batch in a
 * background thread while the owning document_stream consumes the current batch.
 *
 * Not copyable and not movable: it owns a std::thread plus the mutex and
 * condition variable used to hand work to that thread.
 **/
struct stage1_worker {
  stage1_worker() noexcept = default;
  stage1_worker(const stage1_worker&) = delete;
  stage1_worker(stage1_worker&&) = delete;
  // Assignment operators conventionally return a reference; all four copy/move
  // operations are spelled out so the intent (no copy, no move) is explicit.
  stage1_worker& operator=(const stage1_worker&) = delete;
  stage1_worker& operator=(stage1_worker&&) = delete;
  ~stage1_worker();
  /**
   * Start the background thread. The thread is created lazily, when it is first
   * needed, rather than at object construction. This may throw (creating a
   * std::thread can throw std::system_error).
   * You should only call this once.
   **/
  void start_thread();
  /**
   * Start a stage 1 job on the batch beginning at next_batch_start, using the
   * given parser. You should first call 'run', then 'finish'.
   * You must call start_thread once before.
   */
  void run(document_stream * ds, dom::parser * stage1, size_t next_batch_start);
  /** Wait for the run to finish (blocking). You should first call 'run', then 'finish'. **/
  void finish();

private:

  /**
   * Normally, we would never stop the thread. But we do in the destructor.
   * This function is only safe assuming that you are not waiting for results. You
   * should have called run, then finish, and be done.
   **/
  void stop_thread();

  /** The background thread; default-constructed (not running) until start_thread(). */
  std::thread thread{};
  /** These three variables define the work done by the thread. **/
  dom::parser * stage1_thread_parser{};
  size_t _next_batch_start{};
  document_stream * owner{};
  /**
   * We have two state variables. This could be streamlined to one variable in the future but
   * we use two for clarity.
   * NOTE(review): presumably has_work is set by run() and cleared when the job
   * completes, and can_work is cleared by stop_thread() to ask the thread to
   * exit — confirm against the implementation file.
   */
  bool has_work{false};
  bool can_work{true};

  /**
   * Mutex and condition variable used to coordinate with the background thread
   * (presumably guarding the state variables above — confirm in the implementation).
   */
  std::mutex locking_mutex{};
  std::condition_variable cond_var{};
};
#endif
| 66 | |
| 67 | /** |
| 68 | * A forward-only stream of documents. |
| 69 | * |
| 70 | * Produced by parser::parse_many. |
| 71 | * |
| 72 | */ |
| 73 | class document_stream { |
| 74 | public: |
| 75 | /** |
| 76 | * Construct an uninitialized document_stream. |
| 77 | * |
| 78 | * ```c++ |
| 79 | * document_stream docs; |
| 80 | * error = parser.parse_many(json).get(docs); |
| 81 | * ``` |
| 82 | */ |
| 83 | simdjson_inline document_stream() noexcept; |
| 84 | /** Move one document_stream to another. */ |
| 85 | simdjson_inline document_stream(document_stream &&other) noexcept = default; |
| 86 | /** Move one document_stream to another. */ |
| 87 | simdjson_inline document_stream &operator=(document_stream &&other) noexcept = default; |
| 88 | |
| 89 | simdjson_inline ~document_stream() noexcept; |
| 90 | /** |
| 91 | * Returns the input size in bytes. |
| 92 | */ |
| 93 | inline size_t size_in_bytes() const noexcept; |
| 94 | /** |
| 95 | * After iterating through the stream, this method |
| 96 | * returns the number of bytes that were not parsed at the end |
| 97 | * of the stream. If truncated_bytes() differs from zero, |
| 98 | * then the input was truncated maybe because incomplete JSON |
| 99 | * documents were found at the end of the stream. You |
| 100 | * may need to process the bytes in the interval [size_in_bytes()-truncated_bytes(), size_in_bytes()). |
| 101 | * |
| 102 | * You should only call truncated_bytes() after streaming through all |
| 103 | * documents, like so: |
| 104 | * |
| 105 | * document_stream stream = parser.parse_many(json,window); |
| 106 | * for(auto doc : stream) { |
| 107 | * // do something with doc |
| 108 | * } |
| 109 | * size_t truncated = stream.truncated_bytes(); |
| 110 | * |
| 111 | */ |
| 112 | inline size_t truncated_bytes() const noexcept; |
| 113 | /** |
| 114 | * An iterator through a forward-only stream of documents. |
| 115 | */ |
| 116 | class iterator { |
| 117 | public: |
| 118 | using value_type = simdjson_result<element>; |
| 119 | using reference = value_type; |
| 120 | |
| 121 | using difference_type = std::ptrdiff_t; |
| 122 | |
| 123 | using iterator_category = std::input_iterator_tag; |
| 124 | |
| 125 | /** |
| 126 | * Default constructor. |
| 127 | */ |
| 128 | simdjson_inline iterator() noexcept; |
| 129 | /** |
| 130 | * Get the current document (or error). |
| 131 | */ |
| 132 | simdjson_inline reference operator*() noexcept; |
| 133 | /** |
| 134 | * Advance to the next document (prefix). |
| 135 | */ |
| 136 | inline iterator& operator++() noexcept; |
| 137 | /** |
| 138 | * Check if we're at the end yet. |
| 139 | * @param other the end iterator to compare to. |
| 140 | */ |
| 141 | simdjson_inline bool operator!=(const iterator &other) const noexcept; |
| 142 | /** |
| 143 | * @private |
| 144 | * |
| 145 | * Gives the current index in the input document in bytes. |
| 146 | * |
| 147 | * document_stream stream = parser.parse_many(json,window); |
| 148 | * for(auto i = stream.begin(); i != stream.end(); ++i) { |
| 149 | * auto doc = *i; |
| 150 | * size_t index = i.current_index(); |
| 151 | * } |
| 152 | * |
| 153 | * This function (current_index()) is experimental and the usage |
| 154 | * may change in future versions of simdjson: we find the API somewhat |
| 155 | * awkward and we would like to offer something friendlier. |
| 156 | */ |
| 157 | simdjson_inline size_t current_index() const noexcept; |
| 158 | /** |
| 159 | * @private |
| 160 | * |
| 161 | * Gives a view of the current document. |
| 162 | * |
| 163 | * document_stream stream = parser.parse_many(json,window); |
| 164 | * for(auto i = stream.begin(); i != stream.end(); ++i) { |
| 165 | * auto doc = *i; |
| 166 | * std::string_view v = i->source(); |
| 167 | * } |
| 168 | * |
| 169 | * The returned string_view instance is simply a map to the (unparsed) |
| 170 | * source string: it may thus include white-space characters and all manner |
| 171 | * of padding. |
| 172 | * |
| 173 | * This function (source()) is experimental and the usage |
| 174 | * may change in future versions of simdjson: we find the API somewhat |
| 175 | * awkward and we would like to offer something friendlier. |
| 176 | */ |
| 177 | simdjson_inline std::string_view source() const noexcept; |
| 178 | |
| 179 | private: |
| 180 | simdjson_inline iterator(document_stream *s, bool finished) noexcept; |
| 181 | /** The document_stream we're iterating through. */ |
| 182 | document_stream* stream; |
| 183 | /** Whether we're finished or not. */ |
| 184 | bool finished; |
| 185 | friend class document_stream; |
| 186 | }; |
| 187 | |
| 188 | /** |
| 189 | * Start iterating the documents in the stream. |
| 190 | */ |
| 191 | simdjson_inline iterator begin() noexcept; |
| 192 | /** |
| 193 | * The end of the stream, for iterator comparison purposes. |
| 194 | */ |
| 195 | simdjson_inline iterator end() noexcept; |
| 196 | |
| 197 | private: |
| 198 | |
| 199 | document_stream &operator=(const document_stream &) = delete; // Disallow copying |
| 200 | document_stream(const document_stream &other) = delete; // Disallow copying |
| 201 | |
| 202 | /** |
| 203 | * Construct a document_stream. Does not allocate or parse anything until the iterator is |
| 204 | * used. |
| 205 | * |
| 206 | * @param parser is a reference to the parser instance used to generate this document_stream |
| 207 | * @param buf is the raw byte buffer we need to process |
| 208 | * @param len is the length of the raw byte buffer in bytes |
| 209 | * @param batch_size is the size of the windows (must be strictly greater or equal to the largest JSON document) |
| 210 | */ |
| 211 | simdjson_inline document_stream( |
| 212 | dom::parser &parser, |
| 213 | const uint8_t *buf, |
| 214 | size_t len, |
| 215 | size_t batch_size |
| 216 | ) noexcept; |
| 217 | |
| 218 | /** |
| 219 | * Parse the first document in the buffer. Used by begin(), to handle allocation and |
| 220 | * initialization. |
| 221 | */ |
| 222 | inline void start() noexcept; |
| 223 | |
| 224 | /** |
| 225 | * Parse the next document found in the buffer previously given to document_stream. |
| 226 | * |
| 227 | * The content should be a valid JSON document encoded as UTF-8. If there is a |
| 228 | * UTF-8 BOM, the caller is responsible for omitting it, UTF-8 BOM are |
| 229 | * discouraged. |
| 230 | * |
| 231 | * You do NOT need to pre-allocate a parser. This function takes care of |
| 232 | * pre-allocating a capacity defined by the batch_size defined when creating the |
| 233 | * document_stream object. |
| 234 | * |
| 235 | * The function returns simdjson::EMPTY if there is no more data to be parsed. |
| 236 | * |
| 237 | * The function returns simdjson::SUCCESS (as integer = 0) in case of success |
| 238 | * and indicates that the buffer has successfully been parsed to the end. |
| 239 | * Every document it contained has been parsed without error. |
| 240 | * |
| 241 | * The function returns an error code from simdjson/simdjson.h in case of failure |
| 242 | * such as simdjson::CAPACITY, simdjson::MEMALLOC, simdjson::DEPTH_ERROR and so forth; |
| 243 | * the simdjson::error_message function converts these error codes into a string). |
| 244 | * |
| 245 | * You can also check validity by calling parser.is_valid(). The same parser can |
| 246 | * and should be reused for the other documents in the buffer. |
| 247 | */ |
| 248 | inline void next() noexcept; |
| 249 | |
| 250 | /** |
| 251 | * Pass the next batch through stage 1 and return when finished. |
| 252 | * When threads are enabled, this may wait for the stage 1 thread to finish. |
| 253 | */ |
| 254 | inline void load_batch() noexcept; |
| 255 | |
| 256 | /** Get the next document index. */ |
| 257 | inline size_t next_batch_start() const noexcept; |
| 258 | |
| 259 | /** Pass the next batch through stage 1 with the given parser. */ |
| 260 | inline error_code run_stage1(dom::parser &p, size_t batch_start) noexcept; |
| 261 | |
| 262 | dom::parser *parser; |
| 263 | const uint8_t *buf; |
| 264 | size_t len; |
| 265 | size_t batch_size; |
| 266 | /** The error (or lack thereof) from the current document. */ |
| 267 | error_code error; |
| 268 | size_t batch_start{0}; |
| 269 | size_t doc_index{}; |
| 270 | #ifdef SIMDJSON_THREADS_ENABLED |
| 271 | /** Indicates whether we use threads. Note that this needs to be a constant during the execution of the parsing. */ |
| 272 | bool use_thread; |
| 273 | |
| 274 | inline void load_from_stage1_thread() noexcept; |
| 275 | |
| 276 | /** Start a thread to run stage 1 on the next batch. */ |
| 277 | inline void start_stage1_thread() noexcept; |
| 278 | |
| 279 | /** Wait for the stage 1 thread to finish and capture the results. */ |
| 280 | inline void finish_stage1_thread() noexcept; |
| 281 | |
| 282 | /** The error returned from the stage 1 thread. */ |
| 283 | error_code stage1_thread_error{UNINITIALIZED}; |
| 284 | /** The thread used to run stage 1 against the next batch in the background. */ |
| 285 | friend struct stage1_worker; |
| 286 | std::unique_ptr<stage1_worker> worker{new(std::nothrow) stage1_worker()}; |
| 287 | /** |
| 288 | * The parser used to run stage 1 in the background. Will be swapped |
| 289 | * with the regular parser when finished. |
| 290 | */ |
| 291 | dom::parser stage1_thread_parser{}; |
| 292 | #endif // SIMDJSON_THREADS_ENABLED |
| 293 | |
| 294 | friend class dom::parser; |
| 295 | friend struct simdjson_result<dom::document_stream>; |
| 296 | friend struct internal::simdjson_result_base<dom::document_stream>; |
| 297 | |
| 298 | }; // class document_stream |
| 299 | |
| 300 | } // namespace dom |
| 301 | |
| 302 | template<> |
| 303 | struct simdjson_result<dom::document_stream> : public internal::simdjson_result_base<dom::document_stream> { |
| 304 | public: |
| 305 | simdjson_inline simdjson_result() noexcept; ///< @private |
| 306 | simdjson_inline simdjson_result(error_code error) noexcept; ///< @private |
| 307 | simdjson_inline simdjson_result(dom::document_stream &&value) noexcept; ///< @private |
| 308 | |
| 309 | #if SIMDJSON_EXCEPTIONS |
| 310 | simdjson_inline dom::document_stream::iterator begin() noexcept(false); |
| 311 | simdjson_inline dom::document_stream::iterator end() noexcept(false); |
| 312 | #else // SIMDJSON_EXCEPTIONS |
| 313 | #ifndef SIMDJSON_DISABLE_DEPRECATED_API |
| 314 | [[deprecated("parse_many() and load_many() may return errors. Use document_stream stream; error = parser.parse_many().get(doc); instead." )]] |
| 315 | simdjson_inline dom::document_stream::iterator begin() noexcept; |
| 316 | [[deprecated("parse_many() and load_many() may return errors. Use document_stream stream; error = parser.parse_many().get(doc); instead." )]] |
| 317 | simdjson_inline dom::document_stream::iterator end() noexcept; |
| 318 | #endif // SIMDJSON_DISABLE_DEPRECATED_API |
| 319 | #endif // SIMDJSON_EXCEPTIONS |
| 320 | }; // struct simdjson_result<dom::document_stream> |
| 321 | |
| 322 | } // namespace simdjson |
| 323 | |
| 324 | #endif // SIMDJSON_DOCUMENT_STREAM_H |
| 325 | |