1 | /* |
2 | ** 2011-07-09 |
3 | ** |
4 | ** The author disclaims copyright to this source code. In place of |
5 | ** a legal notice, here is a blessing: |
6 | ** |
7 | ** May you do good and not evil. |
8 | ** May you find forgiveness for yourself and forgive others. |
9 | ** May you share freely, never taking more than you give. |
10 | ** |
11 | ************************************************************************* |
12 | ** This file contains code for the VdbeSorter object, used in concert with |
13 | ** a VdbeCursor to sort large numbers of keys for CREATE INDEX statements |
14 | ** or by SELECT statements with ORDER BY clauses that cannot be satisfied |
15 | ** using indexes and without LIMIT clauses. |
16 | ** |
17 | ** The VdbeSorter object implements a multi-threaded external merge sort |
18 | ** algorithm that is efficient even if the number of elements being sorted |
19 | ** exceeds the available memory. |
20 | ** |
21 | ** Here is the (internal, non-API) interface between this module and the |
22 | ** rest of the SQLite system: |
23 | ** |
24 | ** sqlite3VdbeSorterInit() Create a new VdbeSorter object. |
25 | ** |
26 | ** sqlite3VdbeSorterWrite() Add a single new row to the VdbeSorter |
27 | ** object. The row is a binary blob in the |
28 | ** OP_MakeRecord format that contains both |
29 | ** the ORDER BY key columns and result columns |
30 | ** in the case of a SELECT w/ ORDER BY, or |
31 | ** the complete record for an index entry |
32 | ** in the case of a CREATE INDEX. |
33 | ** |
34 | ** sqlite3VdbeSorterRewind() Sort all content previously added. |
35 | ** Position the read cursor on the |
36 | ** first sorted element. |
37 | ** |
38 | ** sqlite3VdbeSorterNext() Advance the read cursor to the next sorted |
39 | ** element. |
40 | ** |
41 | ** sqlite3VdbeSorterRowkey() Return the complete binary blob for the |
42 | ** row currently under the read cursor. |
43 | ** |
44 | ** sqlite3VdbeSorterCompare() Compare the binary blob for the row |
45 | ** currently under the read cursor against |
46 | ** another binary blob X and report if |
47 | ** X is strictly less than the read cursor. |
48 | ** Used to enforce uniqueness in a |
49 | ** CREATE UNIQUE INDEX statement. |
50 | ** |
51 | ** sqlite3VdbeSorterClose() Close the VdbeSorter object and reclaim |
52 | ** all resources. |
53 | ** |
54 | ** sqlite3VdbeSorterReset() Refurbish the VdbeSorter for reuse. This |
55 | ** is like Close() followed by Init() only |
56 | ** much faster. |
57 | ** |
58 | ** The interfaces above must be called in a particular order. Write() can |
59 | ** only occur in between Init()/Reset() and Rewind(). Next(), Rowkey(), and |
60 | ** Compare() can only occur in between Rewind() and Close()/Reset(). i.e. |
61 | ** |
62 | ** Init() |
63 | ** for each record: Write() |
64 | ** Rewind() |
65 | ** Rowkey()/Compare() |
66 | ** Next() |
67 | ** Close() |
68 | ** |
69 | ** Algorithm: |
70 | ** |
71 | ** Records passed to the sorter via calls to Write() are initially held |
72 | ** unsorted in main memory. Assuming the amount of memory used never exceeds |
73 | ** a threshold, when Rewind() is called the set of records is sorted using |
74 | ** an in-memory merge sort. In this case, no temporary files are required |
75 | ** and subsequent calls to Rowkey(), Next() and Compare() read records |
76 | ** directly from main memory. |
77 | ** |
78 | ** If the amount of space used to store records in main memory exceeds the |
79 | ** threshold, then the set of records currently in memory are sorted and |
80 | ** written to a temporary file in "Packed Memory Array" (PMA) format. |
81 | ** A PMA created at this point is known as a "level-0 PMA". Higher levels |
82 | ** of PMAs may be created by merging existing PMAs together - for example |
83 | ** merging two or more level-0 PMAs together creates a level-1 PMA. |
84 | ** |
85 | ** The threshold for the amount of main memory to use before flushing |
86 | ** records to a PMA is roughly the same as the limit configured for the |
87 | ** page-cache of the main database. Specifically, the threshold is set to |
88 | ** the value returned by "PRAGMA main.page_size" multiplied by |
89 | ** that returned by "PRAGMA main.cache_size", in bytes. |
90 | ** |
91 | ** If the sorter is running in single-threaded mode, then all PMAs generated |
92 | ** are appended to a single temporary file. Or, if the sorter is running in |
93 | ** multi-threaded mode then up to (N+1) temporary files may be opened, where |
94 | ** N is the configured number of worker threads. In this case, instead of |
95 | ** sorting the records and writing the PMA to a temporary file itself, the |
96 | ** calling thread usually launches a worker thread to do so. Except, if |
97 | ** there are already N worker threads running, the main thread does the work |
98 | ** itself. |
99 | ** |
100 | ** The sorter is running in multi-threaded mode if (a) the library was built |
101 | ** with pre-processor symbol SQLITE_MAX_WORKER_THREADS set to a value greater |
102 | ** than zero, and (b) worker threads have been enabled at runtime by calling |
103 | ** "PRAGMA threads=N" with some value of N greater than 0. |
104 | ** |
105 | ** When Rewind() is called, any data remaining in memory is flushed to a |
106 | ** final PMA. So at this point the data is stored in some number of sorted |
107 | ** PMAs within temporary files on disk. |
108 | ** |
109 | ** If there are fewer than SORTER_MAX_MERGE_COUNT PMAs in total and the |
110 | ** sorter is running in single-threaded mode, then these PMAs are merged |
111 | ** incrementally as keys are retrieved from the sorter by the VDBE. The |
112 | ** MergeEngine object, described in further detail below, performs this |
113 | ** merge. |
114 | ** |
115 | ** Or, if running in multi-threaded mode, then a background thread is |
116 | ** launched to merge the existing PMAs. Once the background thread has |
117 | ** merged T bytes of data into a single sorted PMA, the main thread |
118 | ** begins reading keys from that PMA while the background thread proceeds |
119 | ** with merging the next T bytes of data. And so on. |
120 | ** |
121 | ** Parameter T is set to half the value of the memory threshold used |
122 | ** by Write() above to determine when to create a new PMA. |
123 | ** |
124 | ** If there are more than SORTER_MAX_MERGE_COUNT PMAs in total when |
125 | ** Rewind() is called, then a hierarchy of incremental-merges is used. |
126 | ** First, T bytes of data from the first SORTER_MAX_MERGE_COUNT PMAs on |
127 | ** disk are merged together. Then T bytes of data from the second set, and |
128 | ** so on, such that no operation ever merges more than SORTER_MAX_MERGE_COUNT |
129 | ** PMAs at a time. This is done to improve locality. |
130 | ** |
131 | ** If running in multi-threaded mode and there are more than |
132 | ** SORTER_MAX_MERGE_COUNT PMAs on disk when Rewind() is called, then more |
133 | ** than one background thread may be created. Specifically, there may be |
134 | ** one background thread for each temporary file on disk, and one background |
135 | ** thread to merge the output of each of the others to a single PMA for |
136 | ** the main thread to read from. |
137 | */ |
138 | #include "sqliteInt.h" |
139 | #include "vdbeInt.h" |
140 | |
/*
** Change the "#if 0" below to "#if 1" to define SQLITE_DEBUG_SORTER_THREADS.
** With that symbol defined, this module writes diagnostic messages to
** stderr that help when studying how the multi-threaded sorter performs.
*/
#if 0
# define SQLITE_DEBUG_SORTER_THREADS 1
#endif

/*
** Fixed upper bound (512 MiB) on the amount of data gathered in memory
** before it is flushed out as a level-0 PMA. The cap exists to keep various
** size computations from overflowing their integer types.
*/
#define SQLITE_MAX_PMASZ (512*1024*1024)
156 | |
/*
** Typedefs for the objects that are private to the sorter implementation.
*/
typedef struct MergeEngine MergeEngine;     /* Combines several PMAs into one */
typedef struct PmaReader PmaReader;         /* Reads a single PMA piecewise */
typedef struct PmaWriter PmaWriter;         /* Writes a single PMA piecewise */
typedef struct SorterRecord SorterRecord;   /* One record awaiting sorting */
typedef struct SortSubtask SortSubtask;     /* One thread's share of a sort */
typedef struct SorterFile SorterFile;       /* Wrapper around a temp file */
typedef struct SorterList SorterList;       /* Records held in memory */
typedef struct IncrMerger IncrMerger;       /* Reads and merges several PMAs */
168 | |
169 | /* |
170 | ** A container for a temp file handle and the current amount of data |
171 | ** stored in the file. |
172 | */ |
173 | struct SorterFile { |
174 | sqlite3_file *pFd; /* File handle */ |
175 | i64 iEof; /* Bytes of data stored in pFd */ |
176 | }; |
177 | |
178 | /* |
179 | ** An in-memory list of objects to be sorted. |
180 | ** |
181 | ** If aMemory==0 then each object is allocated separately and the objects |
182 | ** are connected using SorterRecord.u.pNext. If aMemory!=0 then all objects |
183 | ** are stored in the aMemory[] bulk memory, one right after the other, and |
184 | ** are connected using SorterRecord.u.iNext. |
185 | */ |
186 | struct SorterList { |
187 | SorterRecord *pList; /* Linked list of records */ |
188 | u8 *aMemory; /* If non-NULL, bulk memory to hold pList */ |
189 | int szPMA; /* Size of pList as PMA in bytes */ |
190 | }; |
191 | |
192 | /* |
193 | ** The MergeEngine object is used to combine two or more smaller PMAs into |
194 | ** one big PMA using a merge operation. Separate PMAs all need to be |
195 | ** combined into one big PMA in order to be able to step through the sorted |
196 | ** records in order. |
197 | ** |
198 | ** The aReadr[] array contains a PmaReader object for each of the PMAs being |
199 | ** merged. An aReadr[] object either points to a valid key or else is at EOF. |
200 | ** ("EOF" means "End Of File". When aReadr[] is at EOF there is no more data.) |
201 | ** For the purposes of the paragraphs below, we assume that the array is |
202 | ** actually N elements in size, where N is the smallest power of 2 greater |
203 | ** than or equal to the number of PMAs being merged. The extra aReadr[] elements |
204 | ** are treated as if they are empty (always at EOF). |
205 | ** |
206 | ** The aTree[] array is also N elements in size. The value of N is stored in |
207 | ** the MergeEngine.nTree variable. |
208 | ** |
209 | ** The final (N/2) elements of aTree[] contain the results of comparing |
210 | ** pairs of PMA keys together. Element i contains the result of |
211 | ** comparing aReadr[2*i-N] and aReadr[2*i-N+1]. Whichever key is smaller, the |
212 | ** aTree element is set to the index of it. |
213 | ** |
214 | ** For the purposes of this comparison, EOF is considered greater than any |
215 | ** other key value. If the keys are equal (only possible with two EOF |
216 | ** values), it doesn't matter which index is stored. |
217 | ** |
218 | ** The (N/4) elements of aTree[] that precede the final (N/2) described |
219 | ** above contain the index of the smallest of each block of 4 PmaReaders. |
220 | ** And so on, so that aTree[1] contains the index of the PmaReader that |
221 | ** currently points to the smallest key value. aTree[0] is unused. |
222 | ** |
223 | ** Example: |
224 | ** |
225 | ** aReadr[0] -> Banana |
226 | ** aReadr[1] -> Feijoa |
227 | ** aReadr[2] -> Elderberry |
228 | ** aReadr[3] -> Currant |
229 | ** aReadr[4] -> Grapefruit |
230 | ** aReadr[5] -> Apple |
231 | ** aReadr[6] -> Durian |
232 | ** aReadr[7] -> EOF |
233 | ** |
234 | ** aTree[] = { X, 5, 0, 5, 0, 3, 5, 6 } |
235 | ** |
236 | ** The current element is "Apple" (the value of the key indicated by |
237 | ** PmaReader 5). When the Next() operation is invoked, PmaReader 5 will |
238 | ** be advanced to the next key in its segment. Say the next key is |
239 | ** "Eggplant": |
240 | ** |
241 | ** aReadr[5] -> Eggplant |
242 | ** |
243 | ** The contents of aTree[] are updated first by comparing the new PmaReader |
244 | ** 5 key to the current key of PmaReader 4 (still "Grapefruit"). The PmaReader |
245 | ** 5 value is still smaller, so aTree[6] is set to 5. And so on up the tree. |
246 | ** The value of PmaReader 6 - "Durian" - is now smaller than that of PmaReader |
247 | ** 5, so aTree[3] is set to 6. Key 0 is smaller than key 6 (Banana<Durian), |
248 | ** so the value written into element 1 of the array is 0. As follows: |
249 | ** |
250 | ** aTree[] = { X, 0, 0, 6, 0, 3, 5, 6 } |
251 | ** |
252 | ** In other words, each time we advance to the next sorter element, log2(N) |
253 | ** key comparison operations are required, where N is the number of segments |
254 | ** being merged (rounded up to the next power of 2). |
255 | */ |
256 | struct MergeEngine { |
257 | int nTree; /* Used size of aTree/aReadr (power of 2) */ |
258 | SortSubtask *pTask; /* Used by this thread only */ |
259 | int *aTree; /* Current state of incremental merge */ |
260 | PmaReader *aReadr; /* Array of PmaReaders to merge data from */ |
261 | }; |
262 | |
263 | /* |
264 | ** This object represents a single thread of control in a sort operation. |
265 | ** Exactly VdbeSorter.nTask instances of this object are allocated |
266 | ** as part of each VdbeSorter object. Instances are never allocated any |
267 | ** other way. VdbeSorter.nTask is set to the number of worker threads allowed |
268 | ** (see SQLITE_CONFIG_WORKER_THREADS) plus one (the main thread). Thus for |
269 | ** single-threaded operation, there is exactly one instance of this object |
270 | ** and for multi-threaded operation there are two or more instances. |
271 | ** |
272 | ** Essentially, this structure contains all those fields of the VdbeSorter |
273 | ** structure for which each thread requires a separate instance. For example, |
274 | ** each thread requries its own UnpackedRecord object to unpack records in |
275 | ** as part of comparison operations. |
276 | ** |
277 | ** Before a background thread is launched, variable bDone is set to 0. Then, |
278 | ** right before it exits, the thread itself sets bDone to 1. This is used for |
279 | ** two purposes: |
280 | ** |
281 | ** 1. When flushing the contents of memory to a level-0 PMA on disk, to |
282 | ** attempt to select a SortSubtask for which there is not already an |
283 | ** active background thread (since doing so causes the main thread |
284 | ** to block until it finishes). |
285 | ** |
286 | ** 2. If SQLITE_DEBUG_SORTER_THREADS is defined, to determine if a call |
287 | ** to sqlite3ThreadJoin() is likely to block. Cases that are likely to |
288 | ** block provoke debugging output. |
289 | ** |
290 | ** In both cases, the effects of the main thread seeing (bDone==0) even |
291 | ** after the thread has finished are not dire. So we don't worry about |
292 | ** memory barriers and such here. |
293 | */ |
294 | typedef int (*SorterCompare)(SortSubtask*,int*,const void*,int,const void*,int); |
295 | struct SortSubtask { |
296 | SQLiteThread *pThread; /* Background thread, if any */ |
297 | int bDone; /* Set if thread is finished but not joined */ |
298 | VdbeSorter *pSorter; /* Sorter that owns this sub-task */ |
299 | UnpackedRecord *pUnpacked; /* Space to unpack a record */ |
300 | SorterList list; /* List for thread to write to a PMA */ |
301 | int nPMA; /* Number of PMAs currently in file */ |
302 | SorterCompare xCompare; /* Compare function to use */ |
303 | SorterFile file; /* Temp file for level-0 PMAs */ |
304 | SorterFile file2; /* Space for other PMAs */ |
305 | }; |
306 | |
307 | |
308 | /* |
309 | ** Main sorter structure. A single instance of this is allocated for each |
310 | ** sorter cursor created by the VDBE. |
311 | ** |
312 | ** mxKeysize: |
313 | ** As records are added to the sorter by calls to sqlite3VdbeSorterWrite(), |
314 | ** this variable is updated so as to be set to the size on disk of the |
315 | ** largest record in the sorter. |
316 | */ |
317 | struct VdbeSorter { |
318 | int mnPmaSize; /* Minimum PMA size, in bytes */ |
319 | int mxPmaSize; /* Maximum PMA size, in bytes. 0==no limit */ |
320 | int mxKeysize; /* Largest serialized key seen so far */ |
321 | int pgsz; /* Main database page size */ |
322 | PmaReader *pReader; /* Readr data from here after Rewind() */ |
323 | MergeEngine *pMerger; /* Or here, if bUseThreads==0 */ |
324 | sqlite3 *db; /* Database connection */ |
325 | KeyInfo *pKeyInfo; /* How to compare records */ |
326 | UnpackedRecord *pUnpacked; /* Used by VdbeSorterCompare() */ |
327 | SorterList list; /* List of in-memory records */ |
328 | int iMemory; /* Offset of free space in list.aMemory */ |
329 | int nMemory; /* Size of list.aMemory allocation in bytes */ |
330 | u8 bUsePMA; /* True if one or more PMAs created */ |
331 | u8 bUseThreads; /* True to use background threads */ |
332 | u8 iPrev; /* Previous thread used to flush PMA */ |
333 | u8 nTask; /* Size of aTask[] array */ |
334 | u8 typeMask; |
335 | SortSubtask aTask[1]; /* One or more subtasks */ |
336 | }; |
337 | |
338 | #define SORTER_TYPE_INTEGER 0x01 |
339 | #define SORTER_TYPE_TEXT 0x02 |
340 | |
341 | /* |
342 | ** An instance of the following object is used to read records out of a |
343 | ** PMA, in sorted order. The next key to be read is cached in nKey/aKey. |
344 | ** aKey might point into aMap or into aBuffer. If neither of those locations |
345 | ** contain a contiguous representation of the key, then aAlloc is allocated |
346 | ** and the key is copied into aAlloc and aKey is made to poitn to aAlloc. |
347 | ** |
348 | ** pFd==0 at EOF. |
349 | */ |
350 | struct PmaReader { |
351 | i64 iReadOff; /* Current read offset */ |
352 | i64 iEof; /* 1 byte past EOF for this PmaReader */ |
353 | int nAlloc; /* Bytes of space at aAlloc */ |
354 | int nKey; /* Number of bytes in key */ |
355 | sqlite3_file *pFd; /* File handle we are reading from */ |
356 | u8 *aAlloc; /* Space for aKey if aBuffer and pMap wont work */ |
357 | u8 *aKey; /* Pointer to current key */ |
358 | u8 *aBuffer; /* Current read buffer */ |
359 | int nBuffer; /* Size of read buffer in bytes */ |
360 | u8 *aMap; /* Pointer to mapping of entire file */ |
361 | IncrMerger *pIncr; /* Incremental merger */ |
362 | }; |
363 | |
364 | /* |
365 | ** Normally, a PmaReader object iterates through an existing PMA stored |
366 | ** within a temp file. However, if the PmaReader.pIncr variable points to |
367 | ** an object of the following type, it may be used to iterate/merge through |
368 | ** multiple PMAs simultaneously. |
369 | ** |
370 | ** There are two types of IncrMerger object - single (bUseThread==0) and |
371 | ** multi-threaded (bUseThread==1). |
372 | ** |
373 | ** A multi-threaded IncrMerger object uses two temporary files - aFile[0] |
374 | ** and aFile[1]. Neither file is allowed to grow to more than mxSz bytes in |
375 | ** size. When the IncrMerger is initialized, it reads enough data from |
376 | ** pMerger to populate aFile[0]. It then sets variables within the |
377 | ** corresponding PmaReader object to read from that file and kicks off |
378 | ** a background thread to populate aFile[1] with the next mxSz bytes of |
379 | ** sorted record data from pMerger. |
380 | ** |
381 | ** When the PmaReader reaches the end of aFile[0], it blocks until the |
382 | ** background thread has finished populating aFile[1]. It then exchanges |
383 | ** the contents of the aFile[0] and aFile[1] variables within this structure, |
384 | ** sets the PmaReader fields to read from the new aFile[0] and kicks off |
385 | ** another background thread to populate the new aFile[1]. And so on, until |
386 | ** the contents of pMerger are exhausted. |
387 | ** |
388 | ** A single-threaded IncrMerger does not open any temporary files of its |
389 | ** own. Instead, it has exclusive access to mxSz bytes of space beginning |
390 | ** at offset iStartOff of file pTask->file2. And instead of using a |
391 | ** background thread to prepare data for the PmaReader, with a single |
392 | ** threaded IncrMerger the allocate part of pTask->file2 is "refilled" with |
393 | ** keys from pMerger by the calling thread whenever the PmaReader runs out |
394 | ** of data. |
395 | */ |
396 | struct IncrMerger { |
397 | SortSubtask *pTask; /* Task that owns this merger */ |
398 | MergeEngine *pMerger; /* Merge engine thread reads data from */ |
399 | i64 iStartOff; /* Offset to start writing file at */ |
400 | int mxSz; /* Maximum bytes of data to store */ |
401 | int bEof; /* Set to true when merge is finished */ |
402 | int bUseThread; /* True to use a bg thread for this object */ |
403 | SorterFile aFile[2]; /* aFile[0] for reading, [1] for writing */ |
404 | }; |
405 | |
406 | /* |
407 | ** An instance of this object is used for writing a PMA. |
408 | ** |
409 | ** The PMA is written one record at a time. Each record is of an arbitrary |
410 | ** size. But I/O is more efficient if it occurs in page-sized blocks where |
411 | ** each block is aligned on a page boundary. This object caches writes to |
412 | ** the PMA so that aligned, page-size blocks are written. |
413 | */ |
414 | struct PmaWriter { |
415 | int eFWErr; /* Non-zero if in an error state */ |
416 | u8 *aBuffer; /* Pointer to write buffer */ |
417 | int nBuffer; /* Size of write buffer in bytes */ |
418 | int iBufStart; /* First byte of buffer to write */ |
419 | int iBufEnd; /* Last byte of buffer to write */ |
420 | i64 iWriteOff; /* Offset of start of buffer in file */ |
421 | sqlite3_file *pFd; /* File handle to write to */ |
422 | }; |
423 | |
424 | /* |
425 | ** This object is the header on a single record while that record is being |
426 | ** held in memory and prior to being written out as part of a PMA. |
427 | ** |
428 | ** How the linked list is connected depends on how memory is being managed |
429 | ** by this module. If using a separate allocation for each in-memory record |
430 | ** (VdbeSorter.list.aMemory==0), then the list is always connected using the |
431 | ** SorterRecord.u.pNext pointers. |
432 | ** |
433 | ** Or, if using the single large allocation method (VdbeSorter.list.aMemory!=0), |
434 | ** then while records are being accumulated the list is linked using the |
435 | ** SorterRecord.u.iNext offset. This is because the aMemory[] array may |
436 | ** be sqlite3Realloc()ed while records are being accumulated. Once the VM |
437 | ** has finished passing records to the sorter, or when the in-memory buffer |
438 | ** is full, the list is sorted. As part of the sorting process, it is |
439 | ** converted to use the SorterRecord.u.pNext pointers. See function |
440 | ** vdbeSorterSort() for details. |
441 | */ |
442 | struct SorterRecord { |
443 | int nVal; /* Size of the record in bytes */ |
444 | union { |
445 | SorterRecord *pNext; /* Pointer to next record in list */ |
446 | int iNext; /* Offset within aMemory of next record */ |
447 | } u; |
448 | /* The data for the record immediately follows this header */ |
449 | }; |
450 | |
/*
** SRVAL(p): the address of the record payload that immediately follows
** the SorterRecord header p. It behaves like:
**
**     void *SRVAL(SorterRecord *p) { return (void*)&p[1]; }
*/
#define SRVAL(p) ((void*)&((SorterRecord*)(p))[1])
457 | |
458 | |
459 | /* Maximum number of PMAs that a single MergeEngine can merge */ |
460 | #define SORTER_MAX_MERGE_COUNT 16 |
461 | |
462 | static int vdbeIncrSwap(IncrMerger*); |
463 | static void vdbeIncrFree(IncrMerger *); |
464 | |
465 | /* |
466 | ** Free all memory belonging to the PmaReader object passed as the |
467 | ** argument. All structure fields are set to zero before returning. |
468 | */ |
469 | static void vdbePmaReaderClear(PmaReader *pReadr){ |
470 | sqlite3_free(pReadr->aAlloc); |
471 | sqlite3_free(pReadr->aBuffer); |
472 | if( pReadr->aMap ) sqlite3OsUnfetch(pReadr->pFd, 0, pReadr->aMap); |
473 | vdbeIncrFree(pReadr->pIncr); |
474 | memset(pReadr, 0, sizeof(PmaReader)); |
475 | } |
476 | |
477 | /* |
478 | ** Read the next nByte bytes of data from the PMA p. |
479 | ** If successful, set *ppOut to point to a buffer containing the data |
480 | ** and return SQLITE_OK. Otherwise, if an error occurs, return an SQLite |
481 | ** error code. |
482 | ** |
483 | ** The buffer returned in *ppOut is only valid until the |
484 | ** next call to this function. |
485 | */ |
486 | static int vdbePmaReadBlob( |
487 | PmaReader *p, /* PmaReader from which to take the blob */ |
488 | int nByte, /* Bytes of data to read */ |
489 | u8 **ppOut /* OUT: Pointer to buffer containing data */ |
490 | ){ |
491 | int iBuf; /* Offset within buffer to read from */ |
492 | int nAvail; /* Bytes of data available in buffer */ |
493 | |
494 | if( p->aMap ){ |
495 | *ppOut = &p->aMap[p->iReadOff]; |
496 | p->iReadOff += nByte; |
497 | return SQLITE_OK; |
498 | } |
499 | |
500 | assert( p->aBuffer ); |
501 | |
502 | /* If there is no more data to be read from the buffer, read the next |
503 | ** p->nBuffer bytes of data from the file into it. Or, if there are less |
504 | ** than p->nBuffer bytes remaining in the PMA, read all remaining data. */ |
505 | iBuf = p->iReadOff % p->nBuffer; |
506 | if( iBuf==0 ){ |
507 | int nRead; /* Bytes to read from disk */ |
508 | int rc; /* sqlite3OsRead() return code */ |
509 | |
510 | /* Determine how many bytes of data to read. */ |
511 | if( (p->iEof - p->iReadOff) > (i64)p->nBuffer ){ |
512 | nRead = p->nBuffer; |
513 | }else{ |
514 | nRead = (int)(p->iEof - p->iReadOff); |
515 | } |
516 | assert( nRead>0 ); |
517 | |
518 | /* Readr data from the file. Return early if an error occurs. */ |
519 | rc = sqlite3OsRead(p->pFd, p->aBuffer, nRead, p->iReadOff); |
520 | assert( rc!=SQLITE_IOERR_SHORT_READ ); |
521 | if( rc!=SQLITE_OK ) return rc; |
522 | } |
523 | nAvail = p->nBuffer - iBuf; |
524 | |
525 | if( nByte<=nAvail ){ |
526 | /* The requested data is available in the in-memory buffer. In this |
527 | ** case there is no need to make a copy of the data, just return a |
528 | ** pointer into the buffer to the caller. */ |
529 | *ppOut = &p->aBuffer[iBuf]; |
530 | p->iReadOff += nByte; |
531 | }else{ |
532 | /* The requested data is not all available in the in-memory buffer. |
533 | ** In this case, allocate space at p->aAlloc[] to copy the requested |
534 | ** range into. Then return a copy of pointer p->aAlloc to the caller. */ |
535 | int nRem; /* Bytes remaining to copy */ |
536 | |
537 | /* Extend the p->aAlloc[] allocation if required. */ |
538 | if( p->nAlloc<nByte ){ |
539 | u8 *aNew; |
540 | sqlite3_int64 nNew = MAX(128, 2*(sqlite3_int64)p->nAlloc); |
541 | while( nByte>nNew ) nNew = nNew*2; |
542 | aNew = sqlite3Realloc(p->aAlloc, nNew); |
543 | if( !aNew ) return SQLITE_NOMEM_BKPT; |
544 | p->nAlloc = nNew; |
545 | p->aAlloc = aNew; |
546 | } |
547 | |
548 | /* Copy as much data as is available in the buffer into the start of |
549 | ** p->aAlloc[]. */ |
550 | memcpy(p->aAlloc, &p->aBuffer[iBuf], nAvail); |
551 | p->iReadOff += nAvail; |
552 | nRem = nByte - nAvail; |
553 | |
554 | /* The following loop copies up to p->nBuffer bytes per iteration into |
555 | ** the p->aAlloc[] buffer. */ |
556 | while( nRem>0 ){ |
557 | int rc; /* vdbePmaReadBlob() return code */ |
558 | int nCopy; /* Number of bytes to copy */ |
559 | u8 *aNext; /* Pointer to buffer to copy data from */ |
560 | |
561 | nCopy = nRem; |
562 | if( nRem>p->nBuffer ) nCopy = p->nBuffer; |
563 | rc = vdbePmaReadBlob(p, nCopy, &aNext); |
564 | if( rc!=SQLITE_OK ) return rc; |
565 | assert( aNext!=p->aAlloc ); |
566 | memcpy(&p->aAlloc[nByte - nRem], aNext, nCopy); |
567 | nRem -= nCopy; |
568 | } |
569 | |
570 | *ppOut = p->aAlloc; |
571 | } |
572 | |
573 | return SQLITE_OK; |
574 | } |
575 | |
576 | /* |
577 | ** Read a varint from the stream of data accessed by p. Set *pnOut to |
578 | ** the value read. |
579 | */ |
580 | static int vdbePmaReadVarint(PmaReader *p, u64 *pnOut){ |
581 | int iBuf; |
582 | |
583 | if( p->aMap ){ |
584 | p->iReadOff += sqlite3GetVarint(&p->aMap[p->iReadOff], pnOut); |
585 | }else{ |
586 | iBuf = p->iReadOff % p->nBuffer; |
587 | if( iBuf && (p->nBuffer-iBuf)>=9 ){ |
588 | p->iReadOff += sqlite3GetVarint(&p->aBuffer[iBuf], pnOut); |
589 | }else{ |
590 | u8 aVarint[16], *a; |
591 | int i = 0, rc; |
592 | do{ |
593 | rc = vdbePmaReadBlob(p, 1, &a); |
594 | if( rc ) return rc; |
595 | aVarint[(i++)&0xf] = a[0]; |
596 | }while( (a[0]&0x80)!=0 ); |
597 | sqlite3GetVarint(aVarint, pnOut); |
598 | } |
599 | } |
600 | |
601 | return SQLITE_OK; |
602 | } |
603 | |
604 | /* |
605 | ** Attempt to memory map file pFile. If successful, set *pp to point to the |
606 | ** new mapping and return SQLITE_OK. If the mapping is not attempted |
607 | ** (because the file is too large or the VFS layer is configured not to use |
608 | ** mmap), return SQLITE_OK and set *pp to NULL. |
609 | ** |
610 | ** Or, if an error occurs, return an SQLite error code. The final value of |
611 | ** *pp is undefined in this case. |
612 | */ |
613 | static int vdbeSorterMapFile(SortSubtask *pTask, SorterFile *pFile, u8 **pp){ |
614 | int rc = SQLITE_OK; |
615 | if( pFile->iEof<=(i64)(pTask->pSorter->db->nMaxSorterMmap) ){ |
616 | sqlite3_file *pFd = pFile->pFd; |
617 | if( pFd->pMethods->iVersion>=3 ){ |
618 | rc = sqlite3OsFetch(pFd, 0, (int)pFile->iEof, (void**)pp); |
619 | testcase( rc!=SQLITE_OK ); |
620 | } |
621 | } |
622 | return rc; |
623 | } |
624 | |
625 | /* |
626 | ** Attach PmaReader pReadr to file pFile (if it is not already attached to |
627 | ** that file) and seek it to offset iOff within the file. Return SQLITE_OK |
628 | ** if successful, or an SQLite error code if an error occurs. |
629 | */ |
630 | static int vdbePmaReaderSeek( |
631 | SortSubtask *pTask, /* Task context */ |
632 | PmaReader *pReadr, /* Reader whose cursor is to be moved */ |
633 | SorterFile *pFile, /* Sorter file to read from */ |
634 | i64 iOff /* Offset in pFile */ |
635 | ){ |
636 | int rc = SQLITE_OK; |
637 | |
638 | assert( pReadr->pIncr==0 || pReadr->pIncr->bEof==0 ); |
639 | |
640 | if( sqlite3FaultSim(201) ) return SQLITE_IOERR_READ; |
641 | if( pReadr->aMap ){ |
642 | sqlite3OsUnfetch(pReadr->pFd, 0, pReadr->aMap); |
643 | pReadr->aMap = 0; |
644 | } |
645 | pReadr->iReadOff = iOff; |
646 | pReadr->iEof = pFile->iEof; |
647 | pReadr->pFd = pFile->pFd; |
648 | |
649 | rc = vdbeSorterMapFile(pTask, pFile, &pReadr->aMap); |
650 | if( rc==SQLITE_OK && pReadr->aMap==0 ){ |
651 | int pgsz = pTask->pSorter->pgsz; |
652 | int iBuf = pReadr->iReadOff % pgsz; |
653 | if( pReadr->aBuffer==0 ){ |
654 | pReadr->aBuffer = (u8*)sqlite3Malloc(pgsz); |
655 | if( pReadr->aBuffer==0 ) rc = SQLITE_NOMEM_BKPT; |
656 | pReadr->nBuffer = pgsz; |
657 | } |
658 | if( rc==SQLITE_OK && iBuf ){ |
659 | int nRead = pgsz - iBuf; |
660 | if( (pReadr->iReadOff + nRead) > pReadr->iEof ){ |
661 | nRead = (int)(pReadr->iEof - pReadr->iReadOff); |
662 | } |
663 | rc = sqlite3OsRead( |
664 | pReadr->pFd, &pReadr->aBuffer[iBuf], nRead, pReadr->iReadOff |
665 | ); |
666 | testcase( rc!=SQLITE_OK ); |
667 | } |
668 | } |
669 | |
670 | return rc; |
671 | } |
672 | |
673 | /* |
674 | ** Advance PmaReader pReadr to the next key in its PMA. Return SQLITE_OK if |
675 | ** no error occurs, or an SQLite error code if one does. |
676 | */ |
677 | static int vdbePmaReaderNext(PmaReader *pReadr){ |
678 | int rc = SQLITE_OK; /* Return Code */ |
679 | u64 nRec = 0; /* Size of record in bytes */ |
680 | |
681 | |
682 | if( pReadr->iReadOff>=pReadr->iEof ){ |
683 | IncrMerger *pIncr = pReadr->pIncr; |
684 | int bEof = 1; |
685 | if( pIncr ){ |
686 | rc = vdbeIncrSwap(pIncr); |
687 | if( rc==SQLITE_OK && pIncr->bEof==0 ){ |
688 | rc = vdbePmaReaderSeek( |
689 | pIncr->pTask, pReadr, &pIncr->aFile[0], pIncr->iStartOff |
690 | ); |
691 | bEof = 0; |
692 | } |
693 | } |
694 | |
695 | if( bEof ){ |
696 | /* This is an EOF condition */ |
697 | vdbePmaReaderClear(pReadr); |
698 | testcase( rc!=SQLITE_OK ); |
699 | return rc; |
700 | } |
701 | } |
702 | |
703 | if( rc==SQLITE_OK ){ |
704 | rc = vdbePmaReadVarint(pReadr, &nRec); |
705 | } |
706 | if( rc==SQLITE_OK ){ |
707 | pReadr->nKey = (int)nRec; |
708 | rc = vdbePmaReadBlob(pReadr, (int)nRec, &pReadr->aKey); |
709 | testcase( rc!=SQLITE_OK ); |
710 | } |
711 | |
712 | return rc; |
713 | } |
714 | |
715 | /* |
716 | ** Initialize PmaReader pReadr to scan through the PMA stored in file pFile |
717 | ** starting at offset iStart and ending at offset iEof-1. This function |
718 | ** leaves the PmaReader pointing to the first key in the PMA (or EOF if the |
719 | ** PMA is empty). |
720 | ** |
** The PMA must begin with a varint holding its size in bytes.  That size
** is added to *pnByte, so pnByte must not be NULL.
723 | */ |
724 | static int vdbePmaReaderInit( |
725 | SortSubtask *pTask, /* Task context */ |
726 | SorterFile *pFile, /* Sorter file to read from */ |
727 | i64 iStart, /* Start offset in pFile */ |
728 | PmaReader *pReadr, /* PmaReader to populate */ |
729 | i64 *pnByte /* IN/OUT: Increment this value by PMA size */ |
730 | ){ |
731 | int rc; |
732 | |
733 | assert( pFile->iEof>iStart ); |
734 | assert( pReadr->aAlloc==0 && pReadr->nAlloc==0 ); |
735 | assert( pReadr->aBuffer==0 ); |
736 | assert( pReadr->aMap==0 ); |
737 | |
738 | rc = vdbePmaReaderSeek(pTask, pReadr, pFile, iStart); |
739 | if( rc==SQLITE_OK ){ |
740 | u64 nByte = 0; /* Size of PMA in bytes */ |
741 | rc = vdbePmaReadVarint(pReadr, &nByte); |
742 | pReadr->iEof = pReadr->iReadOff + nByte; |
743 | *pnByte += nByte; |
744 | } |
745 | |
746 | if( rc==SQLITE_OK ){ |
747 | rc = vdbePmaReaderNext(pReadr); |
748 | } |
749 | return rc; |
750 | } |
751 | |
752 | /* |
753 | ** A version of vdbeSorterCompare() that assumes that it has already been |
754 | ** determined that the first field of key1 is equal to the first field of |
755 | ** key2. |
756 | */ |
757 | static int vdbeSorterCompareTail( |
758 | SortSubtask *pTask, /* Subtask context (for pKeyInfo) */ |
759 | int *pbKey2Cached, /* True if pTask->pUnpacked is pKey2 */ |
760 | const void *pKey1, int nKey1, /* Left side of comparison */ |
761 | const void *pKey2, int nKey2 /* Right side of comparison */ |
762 | ){ |
763 | UnpackedRecord *r2 = pTask->pUnpacked; |
764 | if( *pbKey2Cached==0 ){ |
765 | sqlite3VdbeRecordUnpack(pTask->pSorter->pKeyInfo, nKey2, pKey2, r2); |
766 | *pbKey2Cached = 1; |
767 | } |
768 | return sqlite3VdbeRecordCompareWithSkip(nKey1, pKey1, r2, 1); |
769 | } |
770 | |
771 | /* |
772 | ** Compare key1 (buffer pKey1, size nKey1 bytes) with key2 (buffer pKey2, |
** size nKey2 bytes). Use (pTask->pSorter->pKeyInfo) for the collation sequences
774 | ** used by the comparison. Return the result of the comparison. |
775 | ** |
776 | ** If IN/OUT parameter *pbKey2Cached is true when this function is called, |
777 | ** it is assumed that (pTask->pUnpacked) contains the unpacked version |
778 | ** of key2. If it is false, (pTask->pUnpacked) is populated with the unpacked |
779 | ** version of key2 and *pbKey2Cached set to true before returning. |
780 | ** |
** If an OOM error is encountered, (pTask->pUnpacked->errCode) is set
782 | ** to SQLITE_NOMEM. |
783 | */ |
784 | static int vdbeSorterCompare( |
785 | SortSubtask *pTask, /* Subtask context (for pKeyInfo) */ |
786 | int *pbKey2Cached, /* True if pTask->pUnpacked is pKey2 */ |
787 | const void *pKey1, int nKey1, /* Left side of comparison */ |
788 | const void *pKey2, int nKey2 /* Right side of comparison */ |
789 | ){ |
790 | UnpackedRecord *r2 = pTask->pUnpacked; |
791 | if( !*pbKey2Cached ){ |
792 | sqlite3VdbeRecordUnpack(pTask->pSorter->pKeyInfo, nKey2, pKey2, r2); |
793 | *pbKey2Cached = 1; |
794 | } |
795 | return sqlite3VdbeRecordCompare(nKey1, pKey1, r2); |
796 | } |
797 | |
798 | /* |
799 | ** A specially optimized version of vdbeSorterCompare() that assumes that |
800 | ** the first field of each key is a TEXT value and that the collation |
801 | ** sequence to compare them with is BINARY. |
802 | */ |
803 | static int vdbeSorterCompareText( |
804 | SortSubtask *pTask, /* Subtask context (for pKeyInfo) */ |
805 | int *pbKey2Cached, /* True if pTask->pUnpacked is pKey2 */ |
806 | const void *pKey1, int nKey1, /* Left side of comparison */ |
807 | const void *pKey2, int nKey2 /* Right side of comparison */ |
808 | ){ |
809 | const u8 * const p1 = (const u8 * const)pKey1; |
810 | const u8 * const p2 = (const u8 * const)pKey2; |
811 | const u8 * const v1 = &p1[ p1[0] ]; /* Pointer to value 1 */ |
812 | const u8 * const v2 = &p2[ p2[0] ]; /* Pointer to value 2 */ |
813 | |
814 | int n1; |
815 | int n2; |
816 | int res; |
817 | |
818 | getVarint32NR(&p1[1], n1); |
819 | getVarint32NR(&p2[1], n2); |
820 | res = memcmp(v1, v2, (MIN(n1, n2) - 13)/2); |
821 | if( res==0 ){ |
822 | res = n1 - n2; |
823 | } |
824 | |
825 | if( res==0 ){ |
826 | if( pTask->pSorter->pKeyInfo->nKeyField>1 ){ |
827 | res = vdbeSorterCompareTail( |
828 | pTask, pbKey2Cached, pKey1, nKey1, pKey2, nKey2 |
829 | ); |
830 | } |
831 | }else{ |
832 | assert( !(pTask->pSorter->pKeyInfo->aSortFlags[0]&KEYINFO_ORDER_BIGNULL) ); |
833 | if( pTask->pSorter->pKeyInfo->aSortFlags[0] ){ |
834 | res = res * -1; |
835 | } |
836 | } |
837 | |
838 | return res; |
839 | } |
840 | |
841 | /* |
842 | ** A specially optimized version of vdbeSorterCompare() that assumes that |
843 | ** the first field of each key is an INTEGER value. |
844 | */ |
static int vdbeSorterCompareInt(
  SortSubtask *pTask,             /* Subtask context (for pKeyInfo) */
  int *pbKey2Cached,              /* True if pTask->pUnpacked is pKey2 */
  const void *pKey1, int nKey1,   /* Left side of comparison */
  const void *pKey2, int nKey2    /* Right side of comparison */
){
  /* p[0] is used as the offset of the first field's value (the record
  ** header size) and p[1] as that field's serial type.  Both are read as
  ** single bytes. */
  const u8 * const p1 = (const u8 * const)pKey1;
  const u8 * const p2 = (const u8 * const)pKey2;
  const int s1 = p1[1];                 /* Left hand serial type */
  const int s2 = p2[1];                 /* Right hand serial type */
  const u8 * const v1 = &p1[ p1[0] ];   /* Pointer to value 1 */
  const u8 * const v2 = &p2[ p2[0] ];   /* Pointer to value 2 */
  int res;                              /* Return value */

  /* Serial types 1-6 are big-endian integers of 1, 2, 3, 4, 6 and 8 bytes
  ** (see aLen[] below).  Types 8 and 9 are the constants 0 and 1 and have
  ** no value bytes. */
  assert( (s1>0 && s1<7) || s1==8 || s1==9 );
  assert( (s2>0 && s2<7) || s2==8 || s2==9 );

  if( s1==s2 ){
    /* Same serial type, hence the same width, although the signs may
    ** differ.  Compare the big-endian two's-complement bytes.  At the first
    ** difference, if the sign bits differ, the negative value is the
    ** smaller one.  For types 8 and 9 aLen[] is 0, so equal constants
    ** compare equal. */
    static const u8 aLen[] = {0, 1, 2, 3, 4, 6, 8, 0, 0, 0 };
    const u8 n = aLen[s1];
    int i;
    res = 0;
    for(i=0; i<n; i++){
      if( (res = v1[i] - v2[i])!=0 ){
        if( ((v1[0] ^ v2[0]) & 0x80)!=0 ){
          res = v1[0] & 0x80 ? -1 : +1;
        }
        break;
      }
    }
  }else if( s1>7 && s2>7 ){
    /* Two different constants: type 8 (0) sorts before type 9 (1). */
    res = s1 - s2;
  }else{
    /* Different serial types, at most one of them a constant.  First,
    ** provisionally treat the stored integer (when the other side is a
    ** constant) or the wider stored integer as the larger value.  Then, if
    ** that value is negative, it is actually the smaller one, so flip the
    ** result.  This presumes every integer was written in the narrowest
    ** serial type able to hold it (so 0 and 1 always use types 8 and 9).
    ** NOTE(review): confirm against the record encoder. */
    if( s2>7 ){
      res = +1;
    }else if( s1>7 ){
      res = -1;
    }else{
      res = s1 - s2;
    }
    assert( res!=0 );

    if( res>0 ){
      if( *v1 & 0x80 ) res = -1;
    }else{
      if( *v2 & 0x80 ) res = +1;
    }
  }

  if( res==0 ){
    /* The first fields are equal.  Compare the remaining fields, if any. */
    if( pTask->pSorter->pKeyInfo->nKeyField>1 ){
      res = vdbeSorterCompareTail(
          pTask, pbKey2Cached, pKey1, nKey1, pKey2, nKey2
      );
    }
  }else if( pTask->pSorter->pKeyInfo->aSortFlags[0] ){
    /* A non-zero sort flag on the first field reverses the order.  BIGNULL
    ** is excluded by the assert, so the flag is presumably DESC. */
    assert( !(pTask->pSorter->pKeyInfo->aSortFlags[0]&KEYINFO_ORDER_BIGNULL) );
    res = res * -1;
  }

  return res;
}
908 | |
909 | /* |
910 | ** Initialize the temporary index cursor just opened as a sorter cursor. |
911 | ** |
912 | ** Usually, the sorter module uses the value of (pCsr->pKeyInfo->nKeyField) |
913 | ** to determine the number of fields that should be compared from the |
914 | ** records being sorted. However, if the value passed as argument nField |
915 | ** is non-zero and the sorter is able to guarantee a stable sort, nField |
916 | ** is used instead. This is used when sorting records for a CREATE INDEX |
917 | ** statement. In this case, keys are always delivered to the sorter in |
** order of the primary key, which happens to make up the final part
919 | ** of the records being sorted. So if the sort is stable, there is never |
920 | ** any reason to compare PK fields and they can be ignored for a small |
921 | ** performance boost. |
922 | ** |
923 | ** The sorter can guarantee a stable sort when running in single-threaded |
924 | ** mode, but not in multi-threaded mode. |
925 | ** |
926 | ** SQLITE_OK is returned if successful, or an SQLite error code otherwise. |
927 | */ |
int sqlite3VdbeSorterInit(
  sqlite3 *db,                    /* Database connection (for malloc()) */
  int nField,                     /* Number of key fields in each record */
  VdbeCursor *pCsr                /* Cursor that holds the new sorter */
){
  int pgsz;                       /* Page size of main database */
  int i;                          /* Used to iterate through aTask[] */
  VdbeSorter *pSorter;            /* The new sorter */
  KeyInfo *pKeyInfo;              /* Copy of pCsr->pKeyInfo with db==0 */
  int szKeyInfo;                  /* Size of pCsr->pKeyInfo in bytes */
  int sz;                         /* Size of pSorter in bytes */
  int rc = SQLITE_OK;
  /* When worker threads are compiled out, nWorker is the literal 0, so the
  ** code below compiles unchanged.  The macro is #undef'd after the
  ** function. */
#if SQLITE_MAX_WORKER_THREADS==0
# define nWorker 0
#else
  int nWorker;
#endif

  /* Initialize the upper limit on the number of worker threads.  No workers
  ** are used when temp storage is in memory or core mutexes are disabled. */
#if SQLITE_MAX_WORKER_THREADS>0
  if( sqlite3TempInMemory(db) || sqlite3GlobalConfig.bCoreMutex==0 ){
    nWorker = 0;
  }else{
    nWorker = db->aLimit[SQLITE_LIMIT_WORKER_THREADS];
  }
#endif

  /* Do not allow the total number of threads (main thread + all workers)
  ** to exceed the maximum merge count */
#if SQLITE_MAX_WORKER_THREADS>=SORTER_MAX_MERGE_COUNT
  if( nWorker>=SORTER_MAX_MERGE_COUNT ){
    nWorker = SORTER_MAX_MERGE_COUNT-1;
  }
#endif

  assert( pCsr->pKeyInfo );
  assert( !pCsr->isEphemeral );
  assert( pCsr->eCurType==CURTYPE_SORTER );
  /* One zeroed allocation holds the sorter with its nWorker+1 aTask[]
  ** entries (sz bytes), followed by a private copy of the KeyInfo
  ** (szKeyInfo bytes).  The arithmetic suggests VdbeSorter already embeds
  ** one SortSubtask.  NOTE(review): confirm against the struct definition. */
  szKeyInfo = sizeof(KeyInfo) + (pCsr->pKeyInfo->nKeyField-1)*sizeof(CollSeq*);
  sz = sizeof(VdbeSorter) + nWorker * sizeof(SortSubtask);

  pSorter = (VdbeSorter*)sqlite3DbMallocZero(db, sz + szKeyInfo);
  pCsr->uc.pSorter = pSorter;
  if( pSorter==0 ){
    rc = SQLITE_NOMEM_BKPT;
  }else{
    Btree *pBt = db->aDb[0].pBt;
    pSorter->pKeyInfo = pKeyInfo = (KeyInfo*)((u8*)pSorter + sz);
    memcpy(pKeyInfo, pCsr->pKeyInfo, szKeyInfo);
    pKeyInfo->db = 0;
    /* Comparing only the first nField fields is allowed only when the sort
    ** is stable, i.e. single-threaded (see the comment above this
    ** function). */
    if( nField && nWorker==0 ){
      pKeyInfo->nKeyField = nField;
    }
    sqlite3BtreeEnter(pBt);
    pSorter->pgsz = pgsz = sqlite3BtreeGetPageSize(pBt);
    sqlite3BtreeLeave(pBt);
    pSorter->nTask = nWorker + 1;
    pSorter->iPrev = (u8)(nWorker - 1);
    pSorter->bUseThreads = (pSorter->nTask>1);
    pSorter->db = db;
    for(i=0; i<pSorter->nTask; i++){
      SortSubtask *pTask = &pSorter->aTask[i];
      pTask->pSorter = pSorter;
    }

    /* PMA size limits and the in-memory record buffer are set up only when
    ** temp storage is not in memory. */
    if( !sqlite3TempInMemory(db) ){
      i64 mxCache;                /* Cache size in bytes*/
      u32 szPma = sqlite3GlobalConfig.szPma;
      /* Minimum PMA size: szPma pages of the main database. */
      pSorter->mnPmaSize = szPma * pgsz;

      mxCache = db->aDb[0].pSchema->cache_size;
      if( mxCache<0 ){
        /* A negative cache-size value C indicates that the cache is abs(C)
        ** KiB in size.  */
        mxCache = mxCache * -1024;
      }else{
        mxCache = mxCache * pgsz;
      }
      /* Maximum PMA size: the cache size in bytes, capped at
      ** SQLITE_MAX_PMASZ, but never less than the minimum. */
      mxCache = MIN(mxCache, SQLITE_MAX_PMASZ);
      pSorter->mxPmaSize = MAX(pSorter->mnPmaSize, (int)mxCache);

      /* Avoid large memory allocations if the application has requested
      ** SQLITE_CONFIG_SMALL_MALLOC. */
      if( sqlite3GlobalConfig.bSmallMalloc==0 ){
        assert( pSorter->iMemory==0 );
        pSorter->nMemory = pgsz;
        pSorter->list.aMemory = (u8*)sqlite3Malloc(pgsz);
        if( !pSorter->list.aMemory ) rc = SQLITE_NOMEM_BKPT;
      }
    }

    /* Make the specialized INTEGER/TEXT comparators candidates only when
    ** there are fewer than 13 fields, the first field has no collation or
    ** the default one, and it does not use BIGNULL ordering.  The 13-field
    ** limit presumably keeps the record header within the single byte the
    ** specialized comparators read.  vdbeSorterGetCompare() picks a
    ** specialized comparator only when typeMask equals exactly one of these
    ** bits, so the mask is presumably narrowed elsewhere as records are
    ** added. */
    if( pKeyInfo->nAllField<13
     && (pKeyInfo->aColl[0]==0 || pKeyInfo->aColl[0]==db->pDfltColl)
     && (pKeyInfo->aSortFlags[0] & KEYINFO_ORDER_BIGNULL)==0
    ){
      pSorter->typeMask = SORTER_TYPE_INTEGER | SORTER_TYPE_TEXT;
    }
  }

  return rc;
}
#undef nWorker   /* Defined at the top of this function */
1030 | |
1031 | /* |
1032 | ** Free the list of sorted records starting at pRecord. |
1033 | */ |
1034 | static void vdbeSorterRecordFree(sqlite3 *db, SorterRecord *pRecord){ |
1035 | SorterRecord *p; |
1036 | SorterRecord *pNext; |
1037 | for(p=pRecord; p; p=pNext){ |
1038 | pNext = p->u.pNext; |
1039 | sqlite3DbFree(db, p); |
1040 | } |
1041 | } |
1042 | |
1043 | /* |
1044 | ** Free all resources owned by the object indicated by argument pTask. All |
1045 | ** fields of *pTask are zeroed before returning. |
1046 | */ |
static void vdbeSortSubtaskCleanup(sqlite3 *db, SortSubtask *pTask){
  sqlite3DbFree(db, pTask->pUnpacked);
#if SQLITE_MAX_WORKER_THREADS>0
  /* pTask->list.aMemory can only be non-zero if it was handed memory
  ** from the main thread.  That only happens when
  ** SQLITE_MAX_WORKER_THREADS>0.  The records then live inside that buffer,
  ** so freeing the buffer releases them all. */
  if( pTask->list.aMemory ){
    sqlite3_free(pTask->list.aMemory);
  }else
#endif
  /* With worker threads compiled in, the braced block below is the else
  ** branch of the test above.  Without them, it always runs.  Its records
  ** were allocated one by one and are freed one by one. */
  {
    assert( pTask->list.aMemory==0 );
    vdbeSorterRecordFree(0, pTask->list.pList);
  }
  /* Close the task's temporary files, if they were opened, and free their
  ** handles. */
  if( pTask->file.pFd ){
    sqlite3OsCloseFree(pTask->file.pFd);
  }
  if( pTask->file2.pFd ){
    sqlite3OsCloseFree(pTask->file2.pFd);
  }
  /* Zero every field, including pSorter.  A caller that reuses the task
  ** must restore the fields it needs. */
  memset(pTask, 0, sizeof(SortSubtask));
}
1068 | |
#ifdef SQLITE_DEBUG_SORTER_THREADS
/*
** Tracing helpers, compiled only with SQLITE_DEBUG_SORTER_THREADS.  Each one
** writes a line to stderr made of a timestamp from
** sqlite3OsCurrentTimeInt64(), a tag naming the thread or task, and the
** event string.
*/

/* Trace an event for task pTask.  The tag is the task's index in aTask[]. */
static void vdbeSorterWorkDebug(SortSubtask *pTask, const char *zEvent){
  i64 t;
  int iTask = (pTask - pTask->pSorter->aTask);
  sqlite3OsCurrentTimeInt64(pTask->pSorter->db->pVfs, &t);
  fprintf(stderr, "%lld:%d %s\n" , t, iTask, zEvent);
}
/* Trace an event tagged "X".  The timestamp comes from the default VFS. */
static void vdbeSorterRewindDebug(const char *zEvent){
  i64 t = 0;
  sqlite3_vfs *pVfs = sqlite3_vfs_find(0);
  if( ALWAYS(pVfs) ) sqlite3OsCurrentTimeInt64(pVfs, &t);
  fprintf(stderr, "%lld:X %s\n" , t, zEvent);
}
/* Trace an event tagged "bg<N>", where N is the task's index in aTask[]. */
static void vdbeSorterPopulateDebug(
  SortSubtask *pTask,
  const char *zEvent
){
  i64 t;
  int iTask = (pTask - pTask->pSorter->aTask);
  sqlite3OsCurrentTimeInt64(pTask->pSorter->db->pVfs, &t);
  fprintf(stderr, "%lld:bg%d %s\n" , t, iTask, zEvent);
}
/* Trace an event tagged "main", but only when bBlocked is true. */
static void vdbeSorterBlockDebug(
  SortSubtask *pTask,
  int bBlocked,
  const char *zEvent
){
  if( bBlocked ){
    i64 t;
    sqlite3OsCurrentTimeInt64(pTask->pSorter->db->pVfs, &t);
    fprintf(stderr, "%lld:main %s\n" , t, zEvent);
  }
}
#else
/* Without SQLITE_DEBUG_SORTER_THREADS the helpers expand to nothing, so
** their arguments are never evaluated. */
# define vdbeSorterWorkDebug(x,y)
# define vdbeSorterRewindDebug(y)
# define vdbeSorterPopulateDebug(x,y)
# define vdbeSorterBlockDebug(x,y,z)
#endif
1108 | |
1109 | #if SQLITE_MAX_WORKER_THREADS>0 |
1110 | /* |
** Join thread pTask->pThread, if there is one, and return its result code.
1112 | */ |
static int vdbeSorterJoinThread(SortSubtask *pTask){
  int rc = SQLITE_OK;
  /* If no thread is attached there is nothing to join: report SQLITE_OK. */
  if( pTask->pThread ){
    /* bDone exists only in thread-debugging builds.  In other builds
    ** vdbeSorterBlockDebug() is an empty macro that discards its
    ** arguments, so "!bDone" below is never compiled. */
#ifdef SQLITE_DEBUG_SORTER_THREADS
    int bDone = pTask->bDone;
#endif
    /* Default result used if the join does not store one.  Presumably this
    ** covers a failed sqlite3ThreadJoin(), whose return value is ignored. */
    void *pRet = SQLITE_INT_TO_PTR(SQLITE_ERROR);
    vdbeSorterBlockDebug(pTask, !bDone, "enter" );
    (void)sqlite3ThreadJoin(pTask->pThread, &pRet);
    vdbeSorterBlockDebug(pTask, !bDone, "exit" );
    /* The thread's return pointer carries its SQLite result code. */
    rc = SQLITE_PTR_TO_INT(pRet);
    assert( pTask->bDone==1 );
    /* Clear both fields so the asserts in vdbeSorterCreateThread() pass
    ** when the task is reused. */
    pTask->bDone = 0;
    pTask->pThread = 0;
  }
  return rc;
}
1130 | |
1131 | /* |
1132 | ** Launch a background thread to run xTask(pIn). |
1133 | */ |
1134 | static int vdbeSorterCreateThread( |
1135 | SortSubtask *pTask, /* Thread will use this task object */ |
1136 | void *(*xTask)(void*), /* Routine to run in a separate thread */ |
1137 | void *pIn /* Argument passed into xTask() */ |
1138 | ){ |
1139 | assert( pTask->pThread==0 && pTask->bDone==0 ); |
1140 | return sqlite3ThreadCreate(&pTask->pThread, xTask, pIn); |
1141 | } |
1142 | |
1143 | /* |
1144 | ** Join all outstanding threads launched by SorterWrite() to create |
1145 | ** level-0 PMAs. |
1146 | */ |
1147 | static int vdbeSorterJoinAll(VdbeSorter *pSorter, int rcin){ |
1148 | int rc = rcin; |
1149 | int i; |
1150 | |
1151 | /* This function is always called by the main user thread. |
1152 | ** |
1153 | ** If this function is being called after SorterRewind() has been called, |
1154 | ** it is possible that thread pSorter->aTask[pSorter->nTask-1].pThread |
1155 | ** is currently attempt to join one of the other threads. To avoid a race |
1156 | ** condition where this thread also attempts to join the same object, join |
1157 | ** thread pSorter->aTask[pSorter->nTask-1].pThread first. */ |
1158 | for(i=pSorter->nTask-1; i>=0; i--){ |
1159 | SortSubtask *pTask = &pSorter->aTask[i]; |
1160 | int rc2 = vdbeSorterJoinThread(pTask); |
1161 | if( rc==SQLITE_OK ) rc = rc2; |
1162 | } |
1163 | return rc; |
1164 | } |
1165 | #else |
1166 | # define vdbeSorterJoinAll(x,rcin) (rcin) |
1167 | # define vdbeSorterJoinThread(pTask) SQLITE_OK |
1168 | #endif |
1169 | |
1170 | /* |
1171 | ** Allocate a new MergeEngine object capable of handling up to |
1172 | ** nReader PmaReader inputs. |
1173 | ** |
1174 | ** nReader is automatically rounded up to the next power of two. |
1175 | ** nReader may not exceed SORTER_MAX_MERGE_COUNT even after rounding up. |
1176 | */ |
1177 | static MergeEngine *vdbeMergeEngineNew(int nReader){ |
1178 | int N = 2; /* Smallest power of two >= nReader */ |
1179 | int nByte; /* Total bytes of space to allocate */ |
1180 | MergeEngine *pNew; /* Pointer to allocated object to return */ |
1181 | |
1182 | assert( nReader<=SORTER_MAX_MERGE_COUNT ); |
1183 | |
1184 | while( N<nReader ) N += N; |
1185 | nByte = sizeof(MergeEngine) + N * (sizeof(int) + sizeof(PmaReader)); |
1186 | |
1187 | pNew = sqlite3FaultSim(100) ? 0 : (MergeEngine*)sqlite3MallocZero(nByte); |
1188 | if( pNew ){ |
1189 | pNew->nTree = N; |
1190 | pNew->pTask = 0; |
1191 | pNew->aReadr = (PmaReader*)&pNew[1]; |
1192 | pNew->aTree = (int*)&pNew->aReadr[N]; |
1193 | } |
1194 | return pNew; |
1195 | } |
1196 | |
1197 | /* |
1198 | ** Free the MergeEngine object passed as the only argument. |
1199 | */ |
1200 | static void vdbeMergeEngineFree(MergeEngine *pMerger){ |
1201 | int i; |
1202 | if( pMerger ){ |
1203 | for(i=0; i<pMerger->nTree; i++){ |
1204 | vdbePmaReaderClear(&pMerger->aReadr[i]); |
1205 | } |
1206 | } |
1207 | sqlite3_free(pMerger); |
1208 | } |
1209 | |
1210 | /* |
1211 | ** Free all resources associated with the IncrMerger object indicated by |
1212 | ** the first argument. |
1213 | */ |
1214 | static void vdbeIncrFree(IncrMerger *pIncr){ |
1215 | if( pIncr ){ |
1216 | #if SQLITE_MAX_WORKER_THREADS>0 |
1217 | if( pIncr->bUseThread ){ |
1218 | vdbeSorterJoinThread(pIncr->pTask); |
1219 | if( pIncr->aFile[0].pFd ) sqlite3OsCloseFree(pIncr->aFile[0].pFd); |
1220 | if( pIncr->aFile[1].pFd ) sqlite3OsCloseFree(pIncr->aFile[1].pFd); |
1221 | } |
1222 | #endif |
1223 | vdbeMergeEngineFree(pIncr->pMerger); |
1224 | sqlite3_free(pIncr); |
1225 | } |
1226 | } |
1227 | |
1228 | /* |
1229 | ** Reset a sorting cursor back to its original empty state. |
1230 | */ |
void sqlite3VdbeSorterReset(sqlite3 *db, VdbeSorter *pSorter){
  int i;
  /* Join all background threads before freeing anything they may
  ** presumably still be using.  Their result codes are ignored. */
  (void)vdbeSorterJoinAll(pSorter, SQLITE_OK);
  assert( pSorter->bUseThreads || pSorter->pReader==0 );
#if SQLITE_MAX_WORKER_THREADS>0
  /* pReader can only be set in multi-threaded mode (see the assert above). */
  if( pSorter->pReader ){
    vdbePmaReaderClear(pSorter->pReader);
    sqlite3DbFree(db, pSorter->pReader);
    pSorter->pReader = 0;
  }
#endif
  vdbeMergeEngineFree(pSorter->pMerger);
  pSorter->pMerger = 0;
  /* vdbeSortSubtaskCleanup() zeroes each task, so restore its back-pointer
  ** to the sorter so the task can be reused. */
  for(i=0; i<pSorter->nTask; i++){
    SortSubtask *pTask = &pSorter->aTask[i];
    vdbeSortSubtaskCleanup(db, pTask);
    pTask->pSorter = pSorter;
  }
  /* Records inside the list.aMemory buffer are not freed one by one.  The
  ** buffer itself is kept here and freed by sqlite3VdbeSorterClose(). */
  if( pSorter->list.aMemory==0 ){
    vdbeSorterRecordFree(0, pSorter->list.pList);
  }
  pSorter->list.pList = 0;
  pSorter->list.szPMA = 0;
  pSorter->bUsePMA = 0;
  pSorter->iMemory = 0;
  pSorter->mxKeysize = 0;
  sqlite3DbFree(db, pSorter->pUnpacked);
  pSorter->pUnpacked = 0;
}
1260 | |
1261 | /* |
1262 | ** Free any cursor components allocated by sqlite3VdbeSorterXXX routines. |
1263 | */ |
1264 | void sqlite3VdbeSorterClose(sqlite3 *db, VdbeCursor *pCsr){ |
1265 | VdbeSorter *pSorter; |
1266 | assert( pCsr->eCurType==CURTYPE_SORTER ); |
1267 | pSorter = pCsr->uc.pSorter; |
1268 | if( pSorter ){ |
1269 | sqlite3VdbeSorterReset(db, pSorter); |
1270 | sqlite3_free(pSorter->list.aMemory); |
1271 | sqlite3DbFree(db, pSorter); |
1272 | pCsr->uc.pSorter = 0; |
1273 | } |
1274 | } |
1275 | |
#if SQLITE_MAX_MMAP_SIZE>0
/*
** Try to grow the temporary file open on pFd to nByte bytes and have the
** VFS memory-map it.  The caller guarantees the file is currently no
** larger than nByte bytes.
**
** Nothing is attempted if nByte exceeds db->nMaxSorterMmap or the file's
** I/O methods are older than version 3.  Whether the file is actually
** mapped is up to the VFS.
*/
static void vdbeSorterExtendFile(sqlite3 *db, sqlite3_file *pFd, i64 nByte){
  void *pMap = 0;
  int szChunk = 4*1024;

  if( nByte>(i64)(db->nMaxSorterMmap) ) return;
  if( pFd->pMethods->iVersion<3 ) return;

  sqlite3OsFileControlHint(pFd, SQLITE_FCNTL_CHUNK_SIZE, &szChunk);
  sqlite3OsFileControlHint(pFd, SQLITE_FCNTL_SIZE_HINT, &nByte);
  sqlite3OsFetch(pFd, 0, (int)nByte, &pMap);
  if( pMap ) sqlite3OsUnfetch(pFd, 0, pMap);
}
#else
# define vdbeSorterExtendFile(x,y,z)
#endif
1299 | |
1300 | /* |
1301 | ** Allocate space for a file-handle and open a temporary file. If successful, |
1302 | ** set *ppFd to point to the malloc'd file-handle and return SQLITE_OK. |
1303 | ** Otherwise, set *ppFd to 0 and return an SQLite error code. |
1304 | */ |
1305 | static int vdbeSorterOpenTempFile( |
1306 | sqlite3 *db, /* Database handle doing sort */ |
1307 | i64 nExtend, /* Attempt to extend file to this size */ |
1308 | sqlite3_file **ppFd |
1309 | ){ |
1310 | int rc; |
1311 | if( sqlite3FaultSim(202) ) return SQLITE_IOERR_ACCESS; |
1312 | rc = sqlite3OsOpenMalloc(db->pVfs, 0, ppFd, |
1313 | SQLITE_OPEN_TEMP_JOURNAL | |
1314 | SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE | |
1315 | SQLITE_OPEN_EXCLUSIVE | SQLITE_OPEN_DELETEONCLOSE, &rc |
1316 | ); |
1317 | if( rc==SQLITE_OK ){ |
1318 | i64 max = SQLITE_MAX_MMAP_SIZE; |
1319 | sqlite3OsFileControlHint(*ppFd, SQLITE_FCNTL_MMAP_SIZE, (void*)&max); |
1320 | if( nExtend>0 ){ |
1321 | vdbeSorterExtendFile(db, *ppFd, nExtend); |
1322 | } |
1323 | } |
1324 | return rc; |
1325 | } |
1326 | |
1327 | /* |
1328 | ** If it has not already been allocated, allocate the UnpackedRecord |
1329 | ** structure at pTask->pUnpacked. Return SQLITE_OK if successful (or |
1330 | ** if no allocation was required), or SQLITE_NOMEM otherwise. |
1331 | */ |
1332 | static int vdbeSortAllocUnpacked(SortSubtask *pTask){ |
1333 | if( pTask->pUnpacked==0 ){ |
1334 | pTask->pUnpacked = sqlite3VdbeAllocUnpackedRecord(pTask->pSorter->pKeyInfo); |
1335 | if( pTask->pUnpacked==0 ) return SQLITE_NOMEM_BKPT; |
1336 | pTask->pUnpacked->nField = pTask->pSorter->pKeyInfo->nKeyField; |
1337 | pTask->pUnpacked->errCode = 0; |
1338 | } |
1339 | return SQLITE_OK; |
1340 | } |
1341 | |
1342 | |
1343 | /* |
1344 | ** Merge the two sorted lists p1 and p2 into a single list. |
1345 | */ |
static SorterRecord *vdbeSorterMerge(
  SortSubtask *pTask,             /* Calling thread context */
  SorterRecord *p1,               /* First list to merge */
  SorterRecord *p2                /* Second list to merge */
){
  SorterRecord *pFinal = 0;
  SorterRecord **pp = &pFinal;    /* Link slot for the next output record */
  /* True while pTask->pUnpacked holds p2's key in unpacked form, so the
  ** comparator need not unpack it again.  Cleared whenever p2 advances. */
  int bCached = 0;

  assert( p1!=0 && p2!=0 );
  for(;;){
    int res;
    res = pTask->xCompare(
        pTask, &bCached, SRVAL(p1), p1->nVal, SRVAL(p2), p2->nVal
    );

    /* On a tie take the record from p1, so equal keys keep p1's records
    ** ahead of p2's. */
    if( res<=0 ){
      *pp = p1;
      pp = &p1->u.pNext;
      p1 = p1->u.pNext;
      if( p1==0 ){
        /* p1 is exhausted.  The rest of p2 is already sorted, so append it. */
        *pp = p2;
        break;
      }
    }else{
      *pp = p2;
      pp = &p2->u.pNext;
      p2 = p2->u.pNext;
      bCached = 0;
      if( p2==0 ){
        /* p2 is exhausted.  Append the rest of p1. */
        *pp = p1;
        break;
      }
    }
  }
  return pFinal;
}
1383 | |
1384 | /* |
1385 | ** Return the SorterCompare function to compare values collected by the |
1386 | ** sorter object passed as the only argument. |
1387 | */ |
1388 | static SorterCompare vdbeSorterGetCompare(VdbeSorter *p){ |
1389 | if( p->typeMask==SORTER_TYPE_INTEGER ){ |
1390 | return vdbeSorterCompareInt; |
1391 | }else if( p->typeMask==SORTER_TYPE_TEXT ){ |
1392 | return vdbeSorterCompareText; |
1393 | } |
1394 | return vdbeSorterCompare; |
1395 | } |
1396 | |
1397 | /* |
** Sort the linked list of records headed at pList->pList. Return
1399 | ** SQLITE_OK if successful, or an SQLite error code (i.e. SQLITE_NOMEM) if |
1400 | ** an error occurs. |
1401 | */ |
static int vdbeSorterSort(SortSubtask *pTask, SorterList *pList){
  int i;
  SorterRecord *p;
  int rc;
  /* Bottom-up merge sort.  aSlot[i] is either empty or a sorted list of
  ** exactly 2^i records, like the digits of a binary counter. */
  SorterRecord *aSlot[64];

  /* The comparators use pTask->pUnpacked as scratch space. */
  rc = vdbeSortAllocUnpacked(pTask);
  if( rc!=SQLITE_OK ) return rc;

  p = pList->pList;
  pTask->xCompare = vdbeSorterGetCompare(pTask->pSorter);
  memset(aSlot, 0, sizeof(aSlot));

  while( p ){
    SorterRecord *pNext;
    /* Find the next record before p->u is overwritten below.  When the
    ** records live in the pList->aMemory buffer they are linked by byte
    ** offsets (u.iNext), and the record at offset 0 ends the list.
    ** Otherwise u.pNext is an ordinary pointer. */
    if( pList->aMemory ){
      if( (u8*)p==pList->aMemory ){
        pNext = 0;
      }else{
        assert( p->u.iNext<sqlite3MallocSize(pList->aMemory) );
        pNext = (SorterRecord*)&pList->aMemory[p->u.iNext];
      }
    }else{
      pNext = p->u.pNext;
    }

    /* Add p as a one-record list.  Merge it with each occupied slot,
    ** carrying upward until an empty slot is found. */
    p->u.pNext = 0;
    for(i=0; aSlot[i]; i++){
      p = vdbeSorterMerge(pTask, p, aSlot[i]);
      aSlot[i] = 0;
    }
    aSlot[i] = p;
    p = pNext;
  }

  /* Merge whatever remains in the slots into one sorted list. */
  p = 0;
  for(i=0; i<ArraySize(aSlot); i++){
    if( aSlot[i]==0 ) continue;
    p = p ? vdbeSorterMerge(pTask, p, aSlot[i]) : aSlot[i];
  }
  pList->pList = p;

  /* Comparators have no error return.  An OOM during unpacking is recorded
  ** in pUnpacked->errCode and reported here. */
  assert( pTask->pUnpacked->errCode==SQLITE_OK
       || pTask->pUnpacked->errCode==SQLITE_NOMEM
  );
  return pTask->pUnpacked->errCode;
}
1449 | |
1450 | /* |
1451 | ** Initialize a PMA-writer object. |
1452 | */ |
1453 | static void vdbePmaWriterInit( |
1454 | sqlite3_file *pFd, /* File handle to write to */ |
1455 | PmaWriter *p, /* Object to populate */ |
1456 | int nBuf, /* Buffer size */ |
1457 | i64 iStart /* Offset of pFd to begin writing at */ |
1458 | ){ |
1459 | memset(p, 0, sizeof(PmaWriter)); |
1460 | p->aBuffer = (u8*)sqlite3Malloc(nBuf); |
1461 | if( !p->aBuffer ){ |
1462 | p->eFWErr = SQLITE_NOMEM_BKPT; |
1463 | }else{ |
1464 | p->iBufEnd = p->iBufStart = (iStart % nBuf); |
1465 | p->iWriteOff = iStart - p->iBufStart; |
1466 | p->nBuffer = nBuf; |
1467 | p->pFd = pFd; |
1468 | } |
1469 | } |
1470 | |
1471 | /* |
** Write nData bytes of data to the PMA.  Nothing is returned.  A write
** error is recorded in p->eFWErr, after which further writes do nothing.
1474 | */ |
1475 | static void vdbePmaWriteBlob(PmaWriter *p, u8 *pData, int nData){ |
1476 | int nRem = nData; |
1477 | while( nRem>0 && p->eFWErr==0 ){ |
1478 | int nCopy = nRem; |
1479 | if( nCopy>(p->nBuffer - p->iBufEnd) ){ |
1480 | nCopy = p->nBuffer - p->iBufEnd; |
1481 | } |
1482 | |
1483 | memcpy(&p->aBuffer[p->iBufEnd], &pData[nData-nRem], nCopy); |
1484 | p->iBufEnd += nCopy; |
1485 | if( p->iBufEnd==p->nBuffer ){ |
1486 | p->eFWErr = sqlite3OsWrite(p->pFd, |
1487 | &p->aBuffer[p->iBufStart], p->iBufEnd - p->iBufStart, |
1488 | p->iWriteOff + p->iBufStart |
1489 | ); |
1490 | p->iBufStart = p->iBufEnd = 0; |
1491 | p->iWriteOff += p->nBuffer; |
1492 | } |
1493 | assert( p->iBufEnd<p->nBuffer ); |
1494 | |
1495 | nRem -= nCopy; |
1496 | } |
1497 | } |
1498 | |
1499 | /* |
1500 | ** Flush any buffered data to disk and clean up the PMA-writer object. |
1501 | ** The results of using the PMA-writer after this call are undefined. |
1502 | ** Return SQLITE_OK if flushing the buffered data succeeds or is not |
1503 | ** required. Otherwise, return an SQLite error code. |
1504 | ** |
1505 | ** Before returning, set *piEof to the offset immediately following the |
1506 | ** last byte written to the file. |
1507 | */ |
1508 | static int vdbePmaWriterFinish(PmaWriter *p, i64 *piEof){ |
1509 | int rc; |
1510 | if( p->eFWErr==0 && ALWAYS(p->aBuffer) && p->iBufEnd>p->iBufStart ){ |
1511 | p->eFWErr = sqlite3OsWrite(p->pFd, |
1512 | &p->aBuffer[p->iBufStart], p->iBufEnd - p->iBufStart, |
1513 | p->iWriteOff + p->iBufStart |
1514 | ); |
1515 | } |
1516 | *piEof = (p->iWriteOff + p->iBufEnd); |
1517 | sqlite3_free(p->aBuffer); |
1518 | rc = p->eFWErr; |
1519 | memset(p, 0, sizeof(PmaWriter)); |
1520 | return rc; |
1521 | } |
1522 | |
1523 | /* |
** Write value iVal encoded as a varint to the PMA.  Nothing is returned.
** Any write error is recorded in p->eFWErr.
1526 | */ |
1527 | static void vdbePmaWriteVarint(PmaWriter *p, u64 iVal){ |
1528 | int nByte; |
1529 | u8 aByte[10]; |
1530 | nByte = sqlite3PutVarint(aByte, iVal); |
1531 | vdbePmaWriteBlob(p, aByte, nByte); |
1532 | } |
1533 | |
1534 | /* |
1535 | ** Write the current contents of in-memory linked-list pList to a level-0 |
1536 | ** PMA in the temp file belonging to sub-task pTask. Return SQLITE_OK if |
1537 | ** successful, or an SQLite error code otherwise. |
1538 | ** |
1539 | ** The format of a PMA is: |
1540 | ** |
1541 | ** * A varint. This varint contains the total number of bytes of content |
1542 | ** in the PMA (not including the varint itself). |
1543 | ** |
1544 | ** * One or more records packed end-to-end in order of ascending keys. |
1545 | ** Each record consists of a varint followed by a blob of data (the |
1546 | ** key). The varint is the number of bytes in the blob of data. |
1547 | */ |
static int vdbeSorterListToPMA(SortSubtask *pTask, SorterList *pList){
  sqlite3 *db = pTask->pSorter->db;
  int rc = SQLITE_OK;             /* Return code */
  PmaWriter writer;               /* Object used to write to the file */

#ifdef SQLITE_DEBUG
  /* Set iSz to the expected size of file pTask->file after writing the PMA.
  ** This is used by an assert() statement at the end of this function.  */
  i64 iSz = pList->szPMA + sqlite3VarintLen(pList->szPMA) + pTask->file.iEof;
#endif

  vdbeSorterWorkDebug(pTask, "enter" );
  memset(&writer, 0, sizeof(PmaWriter));
  assert( pList->szPMA>0 );

  /* If the first temporary PMA file has not been opened, open it now. */
  if( pTask->file.pFd==0 ){
    rc = vdbeSorterOpenTempFile(db, 0, &pTask->file.pFd);
    assert( rc!=SQLITE_OK || pTask->file.pFd );
    assert( pTask->file.iEof==0 );
    assert( pTask->nPMA==0 );
  }

  /* Try to get the file memory-mapped.  The extra 9 bytes leave room for
  ** the PMA's leading size varint, which is at most 9 bytes. */
  if( rc==SQLITE_OK ){
    vdbeSorterExtendFile(db, pTask->file.pFd, pTask->file.iEof+pList->szPMA+9);
  }

  /* Sort the list */
  if( rc==SQLITE_OK ){
    rc = vdbeSorterSort(pTask, pList);
  }

  /* Append the PMA at the current end of the file: a varint holding szPMA,
  ** then each record as a varint length followed by its bytes.  Records
  ** allocated one by one (no aMemory buffer) are freed as they are
  ** written. */
  if( rc==SQLITE_OK ){
    SorterRecord *p;
    SorterRecord *pNext = 0;

    vdbePmaWriterInit(pTask->file.pFd, &writer, pTask->pSorter->pgsz,
                      pTask->file.iEof);
    pTask->nPMA++;
    vdbePmaWriteVarint(&writer, pList->szPMA);
    for(p=pList->pList; p; p=pNext){
      pNext = p->u.pNext;
      vdbePmaWriteVarint(&writer, p->nVal);
      vdbePmaWriteBlob(&writer, SRVAL(p), p->nVal);
      if( pList->aMemory==0 ) sqlite3_free(p);
    }
    /* p is NULL here: the whole list has been written out. */
    pList->pList = p;
    /* Flush the writer.  This also moves file.iEof past the new PMA and
    ** returns the first write error, if any. */
    rc = vdbePmaWriterFinish(&writer, &pTask->file.iEof);
  }

  vdbeSorterWorkDebug(pTask, "exit" );
  assert( rc!=SQLITE_OK || pList->pList==0 );
  assert( rc!=SQLITE_OK || pTask->file.iEof==iSz );
  return rc;
}
1604 | |
/*
** Advance the MergeEngine to its next entry.
** Set *pbEof to true if there is no next entry because
** the MergeEngine has reached the end of all its inputs.
**
** Only the PmaReader currently at the head of the tournament tree
** (aTree[1]) is advanced. The tree is then repaired along the single path
** from that PmaReader's leaf up to the root. *pbEof is written only when
** the PmaReader was advanced successfully.
**
** Return SQLITE_OK if successful or an error code if an error occurs. If
** the PmaReader advanced successfully, pTask->pUnpacked->errCode is
** returned, so an error recorded there during a comparison is reported.
*/
static int vdbeMergeEngineStep(
  MergeEngine *pMerger,      /* The merge engine to advance to the next row */
  int *pbEof                 /* Set TRUE at EOF.  Set false for more content */
){
  int rc;
  int iPrev = pMerger->aTree[1];/* Index of PmaReader to advance */
  SortSubtask *pTask = pMerger->pTask;

  /* Advance the current PmaReader */
  rc = vdbePmaReaderNext(&pMerger->aReadr[iPrev]);

  /* Update contents of aTree[] */
  if( rc==SQLITE_OK ){
    int i;                      /* Index of aTree[] to recalculate */
    PmaReader *pReadr1;         /* First PmaReader to compare */
    PmaReader *pReadr2;         /* Second PmaReader to compare */
    int bCached = 0;

    /* Find the first two PmaReaders to compare. The one that was just
    ** advanced (iPrev) and the one next to it in the array.  */
    pReadr1 = &pMerger->aReadr[(iPrev & 0xFFFE)];
    pReadr2 = &pMerger->aReadr[(iPrev | 0x0001)];

    /* Walk from the node directly above the leaf pair containing iPrev up
    ** to the root (aTree[1]), halving the node index at each level. */
    for(i=(pMerger->nTree+iPrev)/2; i>0; i=i/2){
      /* Compare pReadr1 and pReadr2. Store the result in variable iRes.
      ** A PmaReader at EOF (pFd==0) always compares as larger. */
      int iRes;
      if( pReadr1->pFd==0 ){
        iRes = +1;
      }else if( pReadr2->pFd==0 ){
        iRes = -1;
      }else{
        iRes = pTask->xCompare(pTask, &bCached,
            pReadr1->aKey, pReadr1->nKey, pReadr2->aKey, pReadr2->nKey
        );
      }

      /* If pReadr1 contained the smaller value, set aTree[i] to its index.
      ** Then set pReadr2 to the winner of the sibling subtree (aTree[i^1]),
      ** which pReadr1 must be compared against at the next level up.
      **
      ** Alternatively, if pReadr2 contains the smaller of the two values,
      ** set aTree[i] to its index and make pReadr1 the sibling-subtree
      ** winner instead.
      **
      ** bCached is passed through to xCompare(). Presumably it records
      ** whether the second key is already decoded in pTask->pUnpacked (see
      ** the xCompare implementations to confirm). It is cleared whenever
      ** pReadr2 changes, and also whenever pReadr2 wins against a non-EOF
      ** pReadr1. It is therefore carried forward only when pReadr1 was at EOF
      ** and no comparison was made.
      **
      ** If the two values were equal, then the value from the oldest
      ** PMA should be considered smaller. The VdbeSorter.aReadr[] array
      ** is sorted from oldest to newest, so pReadr1 contains older values
      ** than pReadr2 iff (pReadr1<pReadr2).  */
      if( iRes<0 || (iRes==0 && pReadr1<pReadr2) ){
        pMerger->aTree[i] = (int)(pReadr1 - pMerger->aReadr);
        pReadr2 = &pMerger->aReadr[ pMerger->aTree[i ^ 0x0001] ];
        bCached = 0;
      }else{
        if( pReadr1->pFd ) bCached = 0;
        pMerger->aTree[i] = (int)(pReadr2 - pMerger->aReadr);
        pReadr1 = &pMerger->aReadr[ pMerger->aTree[i ^ 0x0001] ];
      }
    }
    *pbEof = (pMerger->aReadr[pMerger->aTree[1]].pFd==0);
  }

  return (rc==SQLITE_OK ? pTask->pUnpacked->errCode : rc);
}
1678 | |
#if SQLITE_MAX_WORKER_THREADS>0
/*
** Entry point for a background thread that writes a level-0 PMA.
**
** pCtx is the SortSubtask whose own list (pTask->list) is to be sorted and
** written to its temp file. The sub-task's bDone flag is raised once the
** work is finished, whether or not it succeeded, and the SQLite result
** code is returned encoded as a pointer by SQLITE_INT_TO_PTR().
*/
static void *vdbeSorterFlushThread(void *pCtx){
  SortSubtask *pSub = (SortSubtask*)pCtx;
  int rcWrite;

  assert( pSub->bDone==0 );
  rcWrite = vdbeSorterListToPMA(pSub, &pSub->list);
  pSub->bDone = 1;
  return SQLITE_INT_TO_PTR(rcWrite);
}
#endif /* SQLITE_MAX_WORKER_THREADS>0 */
1692 | |
/*
** Flush the current contents of VdbeSorter.list to a new PMA, possibly
** using a background thread.
**
** In builds with worker threads, if a background sub-task is chosen, the
** in-memory list is handed over to that sub-task (pTask->list) and
** pSorter->list is reset to empty. If the list was stored in a single
** large allocation (list.aMemory), pSorter->list is given a replacement
** buffer: either the one the sub-task held from its previous flush, or a
** newly allocated buffer of pSorter->nMemory bytes.
**
** Return SQLITE_OK on success or an SQLite error code otherwise. When a
** background thread is launched, the result of writing the PMA is not
** known here; it is presumably collected later when the thread is joined
** via vdbeSorterJoinThread().
*/
static int vdbeSorterFlushPMA(VdbeSorter *pSorter){
#if SQLITE_MAX_WORKER_THREADS==0
  pSorter->bUsePMA = 1;
  return vdbeSorterListToPMA(&pSorter->aTask[0], &pSorter->list);
#else
  int rc = SQLITE_OK;
  int i;
  SortSubtask *pTask = 0;    /* Thread context used to create new PMA */
  int nWorker = (pSorter->nTask-1);

  /* Set the flag to indicate that at least one PMA has been written.
  ** Or will be, anyhow.  */
  pSorter->bUsePMA = 1;

  /* Select a sub-task to sort and flush the current list of in-memory
  ** records to disk. If the sorter is running in multi-threaded mode,
  ** round-robin between the first (pSorter->nTask-1) tasks. Except, if
  ** the background thread from a sub-task's previous turn is still running,
  ** skip it. If the first (pSorter->nTask-1) sub-tasks are all still busy,
  ** fall back to using the final sub-task. The first (pSorter->nTask-1)
  ** sub-tasks are preferred as they use background threads - the final
  ** sub-task uses the main thread. A sub-task whose thread has finished
  ** (bDone) is joined first so that it becomes available again. */
  for(i=0; i<nWorker; i++){
    int iTest = (pSorter->iPrev + i + 1) % nWorker;
    pTask = &pSorter->aTask[iTest];
    if( pTask->bDone ){
      rc = vdbeSorterJoinThread(pTask);
    }
    if( rc!=SQLITE_OK || pTask->pThread==0 ) break;
  }

  if( rc==SQLITE_OK ){
    if( i==nWorker ){
      /* Use the foreground thread for this operation */
      rc = vdbeSorterListToPMA(&pSorter->aTask[nWorker], &pSorter->list);
    }else{
      /* Launch a background thread for this operation */
      u8 *aMem;
      void *pCtx;

      assert( pTask!=0 );
      assert( pTask->pThread==0 && pTask->bDone==0 );
      assert( pTask->list.pList==0 );
      assert( pTask->list.aMemory==0 || pSorter->list.aMemory!=0 );

      /* Hand the whole list (including any aMemory buffer) to pTask and
      ** leave pSorter->list empty. */
      aMem = pTask->list.aMemory;
      pCtx = (void*)pTask;
      pSorter->iPrev = (u8)(pTask - pSorter->aTask);
      pTask->list = pSorter->list;
      pSorter->list.pList = 0;
      pSorter->list.szPMA = 0;

      /* Give pSorter a buffer to keep filling: recycle the one pTask held
      ** from its previous flush, or allocate a new one of the same size. */
      if( aMem ){
        pSorter->list.aMemory = aMem;
        pSorter->nMemory = sqlite3MallocSize(aMem);
      }else if( pSorter->list.aMemory ){
        pSorter->list.aMemory = sqlite3Malloc(pSorter->nMemory);
        if( !pSorter->list.aMemory ) return SQLITE_NOMEM_BKPT;
      }

      rc = vdbeSorterCreateThread(pTask, vdbeSorterFlushThread, pCtx);
    }
  }

  return rc;
#endif /* SQLITE_MAX_WORKER_THREADS!=0 */
}
1763 | |
1764 | /* |
1765 | ** Add a record to the sorter. |
1766 | */ |
1767 | int sqlite3VdbeSorterWrite( |
1768 | const VdbeCursor *pCsr, /* Sorter cursor */ |
1769 | Mem *pVal /* Memory cell containing record */ |
1770 | ){ |
1771 | VdbeSorter *pSorter; |
1772 | int rc = SQLITE_OK; /* Return Code */ |
1773 | SorterRecord *pNew; /* New list element */ |
1774 | int bFlush; /* True to flush contents of memory to PMA */ |
1775 | int nReq; /* Bytes of memory required */ |
1776 | int nPMA; /* Bytes of PMA space required */ |
1777 | int t; /* serial type of first record field */ |
1778 | |
1779 | assert( pCsr->eCurType==CURTYPE_SORTER ); |
1780 | pSorter = pCsr->uc.pSorter; |
1781 | getVarint32NR((const u8*)&pVal->z[1], t); |
1782 | if( t>0 && t<10 && t!=7 ){ |
1783 | pSorter->typeMask &= SORTER_TYPE_INTEGER; |
1784 | }else if( t>10 && (t & 0x01) ){ |
1785 | pSorter->typeMask &= SORTER_TYPE_TEXT; |
1786 | }else{ |
1787 | pSorter->typeMask = 0; |
1788 | } |
1789 | |
1790 | assert( pSorter ); |
1791 | |
1792 | /* Figure out whether or not the current contents of memory should be |
1793 | ** flushed to a PMA before continuing. If so, do so. |
1794 | ** |
1795 | ** If using the single large allocation mode (pSorter->aMemory!=0), then |
1796 | ** flush the contents of memory to a new PMA if (a) at least one value is |
1797 | ** already in memory and (b) the new value will not fit in memory. |
1798 | ** |
1799 | ** Or, if using separate allocations for each record, flush the contents |
1800 | ** of memory to a PMA if either of the following are true: |
1801 | ** |
1802 | ** * The total memory allocated for the in-memory list is greater |
1803 | ** than (page-size * cache-size), or |
1804 | ** |
1805 | ** * The total memory allocated for the in-memory list is greater |
1806 | ** than (page-size * 10) and sqlite3HeapNearlyFull() returns true. |
1807 | */ |
1808 | nReq = pVal->n + sizeof(SorterRecord); |
1809 | nPMA = pVal->n + sqlite3VarintLen(pVal->n); |
1810 | if( pSorter->mxPmaSize ){ |
1811 | if( pSorter->list.aMemory ){ |
1812 | bFlush = pSorter->iMemory && (pSorter->iMemory+nReq) > pSorter->mxPmaSize; |
1813 | }else{ |
1814 | bFlush = ( |
1815 | (pSorter->list.szPMA > pSorter->mxPmaSize) |
1816 | || (pSorter->list.szPMA > pSorter->mnPmaSize && sqlite3HeapNearlyFull()) |
1817 | ); |
1818 | } |
1819 | if( bFlush ){ |
1820 | rc = vdbeSorterFlushPMA(pSorter); |
1821 | pSorter->list.szPMA = 0; |
1822 | pSorter->iMemory = 0; |
1823 | assert( rc!=SQLITE_OK || pSorter->list.pList==0 ); |
1824 | } |
1825 | } |
1826 | |
1827 | pSorter->list.szPMA += nPMA; |
1828 | if( nPMA>pSorter->mxKeysize ){ |
1829 | pSorter->mxKeysize = nPMA; |
1830 | } |
1831 | |
1832 | if( pSorter->list.aMemory ){ |
1833 | int nMin = pSorter->iMemory + nReq; |
1834 | |
1835 | if( nMin>pSorter->nMemory ){ |
1836 | u8 *aNew; |
1837 | sqlite3_int64 nNew = 2 * (sqlite3_int64)pSorter->nMemory; |
1838 | int iListOff = -1; |
1839 | if( pSorter->list.pList ){ |
1840 | iListOff = (u8*)pSorter->list.pList - pSorter->list.aMemory; |
1841 | } |
1842 | while( nNew < nMin ) nNew = nNew*2; |
1843 | if( nNew > pSorter->mxPmaSize ) nNew = pSorter->mxPmaSize; |
1844 | if( nNew < nMin ) nNew = nMin; |
1845 | aNew = sqlite3Realloc(pSorter->list.aMemory, nNew); |
1846 | if( !aNew ) return SQLITE_NOMEM_BKPT; |
1847 | if( iListOff>=0 ){ |
1848 | pSorter->list.pList = (SorterRecord*)&aNew[iListOff]; |
1849 | } |
1850 | pSorter->list.aMemory = aNew; |
1851 | pSorter->nMemory = nNew; |
1852 | } |
1853 | |
1854 | pNew = (SorterRecord*)&pSorter->list.aMemory[pSorter->iMemory]; |
1855 | pSorter->iMemory += ROUND8(nReq); |
1856 | if( pSorter->list.pList ){ |
1857 | pNew->u.iNext = (int)((u8*)(pSorter->list.pList) - pSorter->list.aMemory); |
1858 | } |
1859 | }else{ |
1860 | pNew = (SorterRecord *)sqlite3Malloc(nReq); |
1861 | if( pNew==0 ){ |
1862 | return SQLITE_NOMEM_BKPT; |
1863 | } |
1864 | pNew->u.pNext = pSorter->list.pList; |
1865 | } |
1866 | |
1867 | memcpy(SRVAL(pNew), pVal->z, pVal->n); |
1868 | pNew->nVal = pVal->n; |
1869 | pSorter->list.pList = pNew; |
1870 | |
1871 | return rc; |
1872 | } |
1873 | |
1874 | /* |
1875 | ** Read keys from pIncr->pMerger and populate pIncr->aFile[1]. The format |
1876 | ** of the data stored in aFile[1] is the same as that used by regular PMAs, |
1877 | ** except that the number-of-bytes varint is omitted from the start. |
1878 | */ |
1879 | static int vdbeIncrPopulate(IncrMerger *pIncr){ |
1880 | int rc = SQLITE_OK; |
1881 | int rc2; |
1882 | i64 iStart = pIncr->iStartOff; |
1883 | SorterFile *pOut = &pIncr->aFile[1]; |
1884 | SortSubtask *pTask = pIncr->pTask; |
1885 | MergeEngine *pMerger = pIncr->pMerger; |
1886 | PmaWriter writer; |
1887 | assert( pIncr->bEof==0 ); |
1888 | |
1889 | vdbeSorterPopulateDebug(pTask, "enter" ); |
1890 | |
1891 | vdbePmaWriterInit(pOut->pFd, &writer, pTask->pSorter->pgsz, iStart); |
1892 | while( rc==SQLITE_OK ){ |
1893 | int dummy; |
1894 | PmaReader *pReader = &pMerger->aReadr[ pMerger->aTree[1] ]; |
1895 | int nKey = pReader->nKey; |
1896 | i64 iEof = writer.iWriteOff + writer.iBufEnd; |
1897 | |
1898 | /* Check if the output file is full or if the input has been exhausted. |
1899 | ** In either case exit the loop. */ |
1900 | if( pReader->pFd==0 ) break; |
1901 | if( (iEof + nKey + sqlite3VarintLen(nKey))>(iStart + pIncr->mxSz) ) break; |
1902 | |
1903 | /* Write the next key to the output. */ |
1904 | vdbePmaWriteVarint(&writer, nKey); |
1905 | vdbePmaWriteBlob(&writer, pReader->aKey, nKey); |
1906 | assert( pIncr->pMerger->pTask==pTask ); |
1907 | rc = vdbeMergeEngineStep(pIncr->pMerger, &dummy); |
1908 | } |
1909 | |
1910 | rc2 = vdbePmaWriterFinish(&writer, &pOut->iEof); |
1911 | if( rc==SQLITE_OK ) rc = rc2; |
1912 | vdbeSorterPopulateDebug(pTask, "exit" ); |
1913 | return rc; |
1914 | } |
1915 | |
#if SQLITE_MAX_WORKER_THREADS>0
/*
** Background-thread entry point: fill aFile[1] of the IncrMerger passed
** as pCtx by running vdbeIncrPopulate() on it. The owning task's bDone
** flag is set before the thread finishes, and the result code is
** returned encoded as a pointer.
*/
static void *vdbeIncrPopulateThread(void *pCtx){
  IncrMerger *pMergeIncr = (IncrMerger*)pCtx;
  int rcPopulate = vdbeIncrPopulate(pMergeIncr);
  pMergeIncr->pTask->bDone = 1;
  return SQLITE_INT_TO_PTR(rcPopulate);
}

/*
** Start a background thread, owned by pIncr->pTask, that runs
** vdbeIncrPopulateThread() to fill aFile[1] of pIncr. Only valid for
** IncrMerger objects that have bUseThread set. Returns the result of
** vdbeSorterCreateThread().
*/
static int vdbeIncrBgPopulate(IncrMerger *pIncr){
  assert( pIncr->bUseThread );
  return vdbeSorterCreateThread(
      pIncr->pTask, vdbeIncrPopulateThread, (void*)pIncr
  );
}
#endif
1937 | |
1938 | /* |
1939 | ** This function is called when the PmaReader corresponding to pIncr has |
1940 | ** finished reading the contents of aFile[0]. Its purpose is to "refill" |
1941 | ** aFile[0] such that the PmaReader should start rereading it from the |
1942 | ** beginning. |
1943 | ** |
1944 | ** For single-threaded objects, this is accomplished by literally reading |
1945 | ** keys from pIncr->pMerger and repopulating aFile[0]. |
1946 | ** |
1947 | ** For multi-threaded objects, all that is required is to wait until the |
1948 | ** background thread is finished (if it is not already) and then swap |
1949 | ** aFile[0] and aFile[1] in place. If the contents of pMerger have not |
1950 | ** been exhausted, this function also launches a new background thread |
1951 | ** to populate the new aFile[1]. |
1952 | ** |
1953 | ** SQLITE_OK is returned on success, or an SQLite error code otherwise. |
1954 | */ |
1955 | static int vdbeIncrSwap(IncrMerger *pIncr){ |
1956 | int rc = SQLITE_OK; |
1957 | |
1958 | #if SQLITE_MAX_WORKER_THREADS>0 |
1959 | if( pIncr->bUseThread ){ |
1960 | rc = vdbeSorterJoinThread(pIncr->pTask); |
1961 | |
1962 | if( rc==SQLITE_OK ){ |
1963 | SorterFile f0 = pIncr->aFile[0]; |
1964 | pIncr->aFile[0] = pIncr->aFile[1]; |
1965 | pIncr->aFile[1] = f0; |
1966 | } |
1967 | |
1968 | if( rc==SQLITE_OK ){ |
1969 | if( pIncr->aFile[0].iEof==pIncr->iStartOff ){ |
1970 | pIncr->bEof = 1; |
1971 | }else{ |
1972 | rc = vdbeIncrBgPopulate(pIncr); |
1973 | } |
1974 | } |
1975 | }else |
1976 | #endif |
1977 | { |
1978 | rc = vdbeIncrPopulate(pIncr); |
1979 | pIncr->aFile[0] = pIncr->aFile[1]; |
1980 | if( pIncr->aFile[0].iEof==pIncr->iStartOff ){ |
1981 | pIncr->bEof = 1; |
1982 | } |
1983 | } |
1984 | |
1985 | return rc; |
1986 | } |
1987 | |
1988 | /* |
1989 | ** Allocate and return a new IncrMerger object to read data from pMerger. |
1990 | ** |
1991 | ** If an OOM condition is encountered, return NULL. In this case free the |
1992 | ** pMerger argument before returning. |
1993 | */ |
1994 | static int vdbeIncrMergerNew( |
1995 | SortSubtask *pTask, /* The thread that will be using the new IncrMerger */ |
1996 | MergeEngine *pMerger, /* The MergeEngine that the IncrMerger will control */ |
1997 | IncrMerger **ppOut /* Write the new IncrMerger here */ |
1998 | ){ |
1999 | int rc = SQLITE_OK; |
2000 | IncrMerger *pIncr = *ppOut = (IncrMerger*) |
2001 | (sqlite3FaultSim(100) ? 0 : sqlite3MallocZero(sizeof(*pIncr))); |
2002 | if( pIncr ){ |
2003 | pIncr->pMerger = pMerger; |
2004 | pIncr->pTask = pTask; |
2005 | pIncr->mxSz = MAX(pTask->pSorter->mxKeysize+9,pTask->pSorter->mxPmaSize/2); |
2006 | pTask->file2.iEof += pIncr->mxSz; |
2007 | }else{ |
2008 | vdbeMergeEngineFree(pMerger); |
2009 | rc = SQLITE_NOMEM_BKPT; |
2010 | } |
2011 | assert( *ppOut!=0 || rc!=SQLITE_OK ); |
2012 | return rc; |
2013 | } |
2014 | |
#if SQLITE_MAX_WORKER_THREADS>0
/*
** Mark pIncr as multi-threaded. A multi-threaded IncrMerger is given temp
** files of its own rather than a region of pTask->file2. This routine
** therefore gives back the mxSz bytes of file2 space that
** vdbeIncrMergerNew() reserved for it.
*/
static void vdbeIncrMergerSetThreads(IncrMerger *pIncr){
  SortSubtask *pOwner = pIncr->pTask;
  pOwner->file2.iEof -= pIncr->mxSz;
  pIncr->bUseThread = 1;
}
#endif /* SQLITE_MAX_WORKER_THREADS>0 */
2024 | |
2025 | |
2026 | |
2027 | /* |
2028 | ** Recompute pMerger->aTree[iOut] by comparing the next keys on the |
2029 | ** two PmaReaders that feed that entry. Neither of the PmaReaders |
2030 | ** are advanced. This routine merely does the comparison. |
2031 | */ |
2032 | static void vdbeMergeEngineCompare( |
2033 | MergeEngine *pMerger, /* Merge engine containing PmaReaders to compare */ |
2034 | int iOut /* Store the result in pMerger->aTree[iOut] */ |
2035 | ){ |
2036 | int i1; |
2037 | int i2; |
2038 | int iRes; |
2039 | PmaReader *p1; |
2040 | PmaReader *p2; |
2041 | |
2042 | assert( iOut<pMerger->nTree && iOut>0 ); |
2043 | |
2044 | if( iOut>=(pMerger->nTree/2) ){ |
2045 | i1 = (iOut - pMerger->nTree/2) * 2; |
2046 | i2 = i1 + 1; |
2047 | }else{ |
2048 | i1 = pMerger->aTree[iOut*2]; |
2049 | i2 = pMerger->aTree[iOut*2+1]; |
2050 | } |
2051 | |
2052 | p1 = &pMerger->aReadr[i1]; |
2053 | p2 = &pMerger->aReadr[i2]; |
2054 | |
2055 | if( p1->pFd==0 ){ |
2056 | iRes = i2; |
2057 | }else if( p2->pFd==0 ){ |
2058 | iRes = i1; |
2059 | }else{ |
2060 | SortSubtask *pTask = pMerger->pTask; |
2061 | int bCached = 0; |
2062 | int res; |
2063 | assert( pTask->pUnpacked!=0 ); /* from vdbeSortSubtaskMain() */ |
2064 | res = pTask->xCompare( |
2065 | pTask, &bCached, p1->aKey, p1->nKey, p2->aKey, p2->nKey |
2066 | ); |
2067 | if( res<=0 ){ |
2068 | iRes = i1; |
2069 | }else{ |
2070 | iRes = i2; |
2071 | } |
2072 | } |
2073 | |
2074 | pMerger->aTree[iOut] = iRes; |
2075 | } |
2076 | |
/*
** Allowed values for the eMode parameter to vdbeMergeEngineInit()
** and vdbePmaReaderIncrMergeInit().
**
** Only INCRINIT_NORMAL is valid in single-threaded builds (when
** SQLITE_MAX_WORKER_THREADS==0). The other values are only used
** when there exists one or more separate worker threads.
**
**   INCRINIT_NORMAL - initialize the whole sub-tree and position the
**                     PmaReader on its first key, all in the current thread.
**   INCRINIT_TASK   - running in a background thread: initialize the
**                     sub-tree and populate aFile[1], but do not call
**                     vdbePmaReaderNext() on the PmaReader itself.
**   INCRINIT_ROOT   - the root of a multi-threaded tree whose child
**                     IncrMergers have already been populated; only
**                     vdbePmaReaderNext() remains to be called on them.
*/
#define INCRINIT_NORMAL 0
#define INCRINIT_TASK 1
#define INCRINIT_ROOT 2

/*
** Forward reference required because vdbeMergeEngineInit(),
** vdbePmaReaderIncrInit() and vdbePmaReaderIncrMergeInit() call one
** another recursively when building a merge tree.
*/
static int vdbePmaReaderIncrInit(PmaReader *pReadr, int eMode);
2095 | |
2096 | /* |
2097 | ** Initialize the MergeEngine object passed as the second argument. Once this |
2098 | ** function returns, the first key of merged data may be read from the |
2099 | ** MergeEngine object in the usual fashion. |
2100 | ** |
2101 | ** If argument eMode is INCRINIT_ROOT, then it is assumed that any IncrMerge |
2102 | ** objects attached to the PmaReader objects that the merger reads from have |
2103 | ** already been populated, but that they have not yet populated aFile[0] and |
2104 | ** set the PmaReader objects up to read from it. In this case all that is |
2105 | ** required is to call vdbePmaReaderNext() on each PmaReader to point it at |
2106 | ** its first key. |
2107 | ** |
2108 | ** Otherwise, if eMode is any value other than INCRINIT_ROOT, then use |
2109 | ** vdbePmaReaderIncrMergeInit() to initialize each PmaReader that feeds data |
2110 | ** to pMerger. |
2111 | ** |
2112 | ** SQLITE_OK is returned if successful, or an SQLite error code otherwise. |
2113 | */ |
2114 | static int vdbeMergeEngineInit( |
2115 | SortSubtask *pTask, /* Thread that will run pMerger */ |
2116 | MergeEngine *pMerger, /* MergeEngine to initialize */ |
2117 | int eMode /* One of the INCRINIT_XXX constants */ |
2118 | ){ |
2119 | int rc = SQLITE_OK; /* Return code */ |
2120 | int i; /* For looping over PmaReader objects */ |
2121 | int nTree; /* Number of subtrees to merge */ |
2122 | |
2123 | /* Failure to allocate the merge would have been detected prior to |
2124 | ** invoking this routine */ |
2125 | assert( pMerger!=0 ); |
2126 | |
2127 | /* eMode is always INCRINIT_NORMAL in single-threaded mode */ |
2128 | assert( SQLITE_MAX_WORKER_THREADS>0 || eMode==INCRINIT_NORMAL ); |
2129 | |
2130 | /* Verify that the MergeEngine is assigned to a single thread */ |
2131 | assert( pMerger->pTask==0 ); |
2132 | pMerger->pTask = pTask; |
2133 | |
2134 | nTree = pMerger->nTree; |
2135 | for(i=0; i<nTree; i++){ |
2136 | if( SQLITE_MAX_WORKER_THREADS>0 && eMode==INCRINIT_ROOT ){ |
2137 | /* PmaReaders should be normally initialized in order, as if they are |
2138 | ** reading from the same temp file this makes for more linear file IO. |
2139 | ** However, in the INCRINIT_ROOT case, if PmaReader aReadr[nTask-1] is |
2140 | ** in use it will block the vdbePmaReaderNext() call while it uses |
2141 | ** the main thread to fill its buffer. So calling PmaReaderNext() |
2142 | ** on this PmaReader before any of the multi-threaded PmaReaders takes |
2143 | ** better advantage of multi-processor hardware. */ |
2144 | rc = vdbePmaReaderNext(&pMerger->aReadr[nTree-i-1]); |
2145 | }else{ |
2146 | rc = vdbePmaReaderIncrInit(&pMerger->aReadr[i], INCRINIT_NORMAL); |
2147 | } |
2148 | if( rc!=SQLITE_OK ) return rc; |
2149 | } |
2150 | |
2151 | for(i=pMerger->nTree-1; i>0; i--){ |
2152 | vdbeMergeEngineCompare(pMerger, i); |
2153 | } |
2154 | return pTask->pUnpacked->errCode; |
2155 | } |
2156 | |
/*
** The PmaReader passed as the first argument is guaranteed to be an
** incremental-reader (pReadr->pIncr!=0). This function serves to open
** and/or initialize the temp file related fields of the IncrMerger
** object at (pReadr->pIncr).
**
** If argument eMode is set to INCRINIT_NORMAL, then all PmaReaders
** in the sub-tree headed by pReadr are also initialized. Data is then
** loaded into the buffers belonging to pReadr and it is set to point to
** the first key in its range.
**
** If argument eMode is set to INCRINIT_TASK, then pReadr is guaranteed
** to be a multi-threaded PmaReader and this function is being called in a
** background thread. In this case all PmaReaders in the sub-tree are
** initialized as for INCRINIT_NORMAL and the aFile[1] buffer belonging to
** pReadr is populated. However, pReadr itself is not set up to point
** to its first key. A call to vdbePmaReaderNext() is still required to do
** that.
**
** The reason this function does not call vdbePmaReaderNext() immediately
** in the INCRINIT_TASK case is that vdbePmaReaderNext() assumes that it has
** to block on the task's background thread (pTask->pThread) before
** accessing aFile[1]. But, since this entire function is being run by that
** same thread, doing so would lead to the current background thread
** attempting to join itself.
**
** Finally, if argument eMode is set to INCRINIT_ROOT, it may be assumed
** that pReadr->pIncr is a multi-threaded IncrMerger object, and that all
** child-trees have already been initialized using IncrInit(INCRINIT_TASK).
** In this case vdbePmaReaderNext() is called on all child PmaReaders and
** the current PmaReader set to point to the first key in its range.
**
** SQLITE_OK is returned if successful, or an SQLite error code otherwise.
*/
static int vdbePmaReaderIncrMergeInit(PmaReader *pReadr, int eMode){
  int rc = SQLITE_OK;
  IncrMerger *pIncr = pReadr->pIncr;
  SortSubtask *pTask = pIncr->pTask;
  sqlite3 *db = pTask->pSorter->db;

  /* eMode is always INCRINIT_NORMAL in single-threaded mode */
  assert( SQLITE_MAX_WORKER_THREADS>0 || eMode==INCRINIT_NORMAL );

  rc = vdbeMergeEngineInit(pTask, pIncr->pMerger, eMode);

  /* Set up the required files for pIncr. A multi-threaded IncrMerger object
  ** requires two temp files to itself, whereas a single-threaded object
  ** only requires a region of pTask->file2. */
  if( rc==SQLITE_OK ){
    int mxSz = pIncr->mxSz;
#if SQLITE_MAX_WORKER_THREADS>0
    if( pIncr->bUseThread ){
      rc = vdbeSorterOpenTempFile(db, mxSz, &pIncr->aFile[0].pFd);
      if( rc==SQLITE_OK ){
        rc = vdbeSorterOpenTempFile(db, mxSz, &pIncr->aFile[1].pFd);
      }
    }else
#endif
    /*if( !pIncr->bUseThread )*/{
      /* Before file2 is opened, file2.iEof holds the total space reserved
      ** by all single-threaded IncrMergers; it is used as the size hint for
      ** opening the file and is then reset to 0. After that it is used to
      ** hand out consecutive mxSz-byte regions. */
      if( pTask->file2.pFd==0 ){
        assert( pTask->file2.iEof>0 );
        rc = vdbeSorterOpenTempFile(db, pTask->file2.iEof, &pTask->file2.pFd);
        pTask->file2.iEof = 0;
      }
      if( rc==SQLITE_OK ){
        pIncr->aFile[1].pFd = pTask->file2.pFd;
        pIncr->iStartOff = pTask->file2.iEof;
        pTask->file2.iEof += mxSz;
      }
    }
  }

#if SQLITE_MAX_WORKER_THREADS>0
  if( rc==SQLITE_OK && pIncr->bUseThread ){
    /* Use the current thread to populate aFile[1], even though this
    ** PmaReader is multi-threaded. If this is an INCRINIT_TASK object,
    ** then this function is already running in background thread
    ** pIncr->pTask->pThread.
    **
    ** If this is the INCRINIT_ROOT object, then it is running in the
    ** main VDBE thread. But that is Ok, as that thread cannot return
    ** control to the VDBE or proceed with anything useful until the
    ** first results are ready from this merger object anyway.
    */
    assert( eMode==INCRINIT_ROOT || eMode==INCRINIT_TASK );
    rc = vdbeIncrPopulate(pIncr);
  }
#endif

  if( rc==SQLITE_OK && (SQLITE_MAX_WORKER_THREADS==0 || eMode!=INCRINIT_TASK) ){
    rc = vdbePmaReaderNext(pReadr);
  }

  return rc;
}
2251 | |
#if SQLITE_MAX_WORKER_THREADS>0
/*
** Background-thread entry point that runs
** vdbePmaReaderIncrMergeInit(pCtx, INCRINIT_TASK). The bDone flag of the
** owning task is set before the result code is returned, encoded as a
** pointer.
*/
static void *vdbePmaReaderBgIncrInit(void *pCtx){
  PmaReader *pReadr = (PmaReader*)pCtx;
  int rcInit = vdbePmaReaderIncrMergeInit(pReadr, INCRINIT_TASK);
  pReadr->pIncr->pTask->bDone = 1;
  return SQLITE_INT_TO_PTR(rcInit);
}
#endif
2266 | |
2267 | /* |
2268 | ** If the PmaReader passed as the first argument is not an incremental-reader |
2269 | ** (if pReadr->pIncr==0), then this function is a no-op. Otherwise, it invokes |
2270 | ** the vdbePmaReaderIncrMergeInit() function with the parameters passed to |
2271 | ** this routine to initialize the incremental merge. |
2272 | ** |
2273 | ** If the IncrMerger object is multi-threaded (IncrMerger.bUseThread==1), |
2274 | ** then a background thread is launched to call vdbePmaReaderIncrMergeInit(). |
2275 | ** Or, if the IncrMerger is single threaded, the same function is called |
2276 | ** using the current thread. |
2277 | */ |
2278 | static int vdbePmaReaderIncrInit(PmaReader *pReadr, int eMode){ |
2279 | IncrMerger *pIncr = pReadr->pIncr; /* Incremental merger */ |
2280 | int rc = SQLITE_OK; /* Return code */ |
2281 | if( pIncr ){ |
2282 | #if SQLITE_MAX_WORKER_THREADS>0 |
2283 | assert( pIncr->bUseThread==0 || eMode==INCRINIT_TASK ); |
2284 | if( pIncr->bUseThread ){ |
2285 | void *pCtx = (void*)pReadr; |
2286 | rc = vdbeSorterCreateThread(pIncr->pTask, vdbePmaReaderBgIncrInit, pCtx); |
2287 | }else |
2288 | #endif |
2289 | { |
2290 | rc = vdbePmaReaderIncrMergeInit(pReadr, eMode); |
2291 | } |
2292 | } |
2293 | return rc; |
2294 | } |
2295 | |
2296 | /* |
2297 | ** Allocate a new MergeEngine object to merge the contents of nPMA level-0 |
2298 | ** PMAs from pTask->file. If no error occurs, set *ppOut to point to |
2299 | ** the new object and return SQLITE_OK. Or, if an error does occur, set *ppOut |
2300 | ** to NULL and return an SQLite error code. |
2301 | ** |
2302 | ** When this function is called, *piOffset is set to the offset of the |
2303 | ** first PMA to read from pTask->file. Assuming no error occurs, it is |
2304 | ** set to the offset immediately following the last byte of the last |
2305 | ** PMA before returning. If an error does occur, then the final value of |
2306 | ** *piOffset is undefined. |
2307 | */ |
2308 | static int vdbeMergeEngineLevel0( |
2309 | SortSubtask *pTask, /* Sorter task to read from */ |
2310 | int nPMA, /* Number of PMAs to read */ |
2311 | i64 *piOffset, /* IN/OUT: Readr offset in pTask->file */ |
2312 | MergeEngine **ppOut /* OUT: New merge-engine */ |
2313 | ){ |
2314 | MergeEngine *pNew; /* Merge engine to return */ |
2315 | i64 iOff = *piOffset; |
2316 | int i; |
2317 | int rc = SQLITE_OK; |
2318 | |
2319 | *ppOut = pNew = vdbeMergeEngineNew(nPMA); |
2320 | if( pNew==0 ) rc = SQLITE_NOMEM_BKPT; |
2321 | |
2322 | for(i=0; i<nPMA && rc==SQLITE_OK; i++){ |
2323 | i64 nDummy = 0; |
2324 | PmaReader *pReadr = &pNew->aReadr[i]; |
2325 | rc = vdbePmaReaderInit(pTask, &pTask->file, iOff, pReadr, &nDummy); |
2326 | iOff = pReadr->iEof; |
2327 | } |
2328 | |
2329 | if( rc!=SQLITE_OK ){ |
2330 | vdbeMergeEngineFree(pNew); |
2331 | *ppOut = 0; |
2332 | } |
2333 | *piOffset = iOff; |
2334 | return rc; |
2335 | } |
2336 | |
2337 | /* |
2338 | ** Return the depth of a tree comprising nPMA PMAs, assuming a fanout of |
2339 | ** SORTER_MAX_MERGE_COUNT. The returned value does not include leaf nodes. |
2340 | ** |
2341 | ** i.e. |
2342 | ** |
2343 | ** nPMA<=16 -> TreeDepth() == 0 |
2344 | ** nPMA<=256 -> TreeDepth() == 1 |
2345 | ** nPMA<=65536 -> TreeDepth() == 2 |
2346 | */ |
2347 | static int vdbeSorterTreeDepth(int nPMA){ |
2348 | int nDepth = 0; |
2349 | i64 nDiv = SORTER_MAX_MERGE_COUNT; |
2350 | while( nDiv < (i64)nPMA ){ |
2351 | nDiv = nDiv * SORTER_MAX_MERGE_COUNT; |
2352 | nDepth++; |
2353 | } |
2354 | return nDepth; |
2355 | } |
2356 | |
2357 | /* |
2358 | ** pRoot is the root of an incremental merge-tree with depth nDepth (according |
2359 | ** to vdbeSorterTreeDepth()). pLeaf is the iSeq'th leaf to be added to the |
2360 | ** tree, counting from zero. This function adds pLeaf to the tree. |
2361 | ** |
2362 | ** If successful, SQLITE_OK is returned. If an error occurs, an SQLite error |
2363 | ** code is returned and pLeaf is freed. |
2364 | */ |
2365 | static int vdbeSorterAddToTree( |
2366 | SortSubtask *pTask, /* Task context */ |
2367 | int nDepth, /* Depth of tree according to TreeDepth() */ |
2368 | int iSeq, /* Sequence number of leaf within tree */ |
2369 | MergeEngine *pRoot, /* Root of tree */ |
2370 | MergeEngine *pLeaf /* Leaf to add to tree */ |
2371 | ){ |
2372 | int rc = SQLITE_OK; |
2373 | int nDiv = 1; |
2374 | int i; |
2375 | MergeEngine *p = pRoot; |
2376 | IncrMerger *pIncr; |
2377 | |
2378 | rc = vdbeIncrMergerNew(pTask, pLeaf, &pIncr); |
2379 | |
2380 | for(i=1; i<nDepth; i++){ |
2381 | nDiv = nDiv * SORTER_MAX_MERGE_COUNT; |
2382 | } |
2383 | |
2384 | for(i=1; i<nDepth && rc==SQLITE_OK; i++){ |
2385 | int iIter = (iSeq / nDiv) % SORTER_MAX_MERGE_COUNT; |
2386 | PmaReader *pReadr = &p->aReadr[iIter]; |
2387 | |
2388 | if( pReadr->pIncr==0 ){ |
2389 | MergeEngine *pNew = vdbeMergeEngineNew(SORTER_MAX_MERGE_COUNT); |
2390 | if( pNew==0 ){ |
2391 | rc = SQLITE_NOMEM_BKPT; |
2392 | }else{ |
2393 | rc = vdbeIncrMergerNew(pTask, pNew, &pReadr->pIncr); |
2394 | } |
2395 | } |
2396 | if( rc==SQLITE_OK ){ |
2397 | p = pReadr->pIncr->pMerger; |
2398 | nDiv = nDiv / SORTER_MAX_MERGE_COUNT; |
2399 | } |
2400 | } |
2401 | |
2402 | if( rc==SQLITE_OK ){ |
2403 | p->aReadr[iSeq % SORTER_MAX_MERGE_COUNT].pIncr = pIncr; |
2404 | }else{ |
2405 | vdbeIncrFree(pIncr); |
2406 | } |
2407 | return rc; |
2408 | } |
2409 | |
2410 | /* |
2411 | ** This function is called as part of a SorterRewind() operation on a sorter |
2412 | ** that has already written two or more level-0 PMAs to one or more temp |
2413 | ** files. It builds a tree of MergeEngine/IncrMerger/PmaReader objects that |
2414 | ** can be used to incrementally merge all PMAs on disk. |
2415 | ** |
2416 | ** If successful, SQLITE_OK is returned and *ppOut set to point to the |
2417 | ** MergeEngine object at the root of the tree before returning. Or, if an |
2418 | ** error occurs, an SQLite error code is returned and the final value |
2419 | ** of *ppOut is undefined. |
2420 | */ |
2421 | static int vdbeSorterMergeTreeBuild( |
2422 | VdbeSorter *pSorter, /* The VDBE cursor that implements the sort */ |
2423 | MergeEngine **ppOut /* Write the MergeEngine here */ |
2424 | ){ |
2425 | MergeEngine *pMain = 0; |
2426 | int rc = SQLITE_OK; |
2427 | int iTask; |
2428 | |
2429 | #if SQLITE_MAX_WORKER_THREADS>0 |
2430 | /* If the sorter uses more than one task, then create the top-level |
2431 | ** MergeEngine here. This MergeEngine will read data from exactly |
2432 | ** one PmaReader per sub-task. */ |
2433 | assert( pSorter->bUseThreads || pSorter->nTask==1 ); |
2434 | if( pSorter->nTask>1 ){ |
2435 | pMain = vdbeMergeEngineNew(pSorter->nTask); |
2436 | if( pMain==0 ) rc = SQLITE_NOMEM_BKPT; |
2437 | } |
2438 | #endif |
2439 | |
2440 | for(iTask=0; rc==SQLITE_OK && iTask<pSorter->nTask; iTask++){ |
2441 | SortSubtask *pTask = &pSorter->aTask[iTask]; |
2442 | assert( pTask->nPMA>0 || SQLITE_MAX_WORKER_THREADS>0 ); |
2443 | if( SQLITE_MAX_WORKER_THREADS==0 || pTask->nPMA ){ |
2444 | MergeEngine *pRoot = 0; /* Root node of tree for this task */ |
2445 | int nDepth = vdbeSorterTreeDepth(pTask->nPMA); |
2446 | i64 iReadOff = 0; |
2447 | |
2448 | if( pTask->nPMA<=SORTER_MAX_MERGE_COUNT ){ |
2449 | rc = vdbeMergeEngineLevel0(pTask, pTask->nPMA, &iReadOff, &pRoot); |
2450 | }else{ |
2451 | int i; |
2452 | int iSeq = 0; |
2453 | pRoot = vdbeMergeEngineNew(SORTER_MAX_MERGE_COUNT); |
2454 | if( pRoot==0 ) rc = SQLITE_NOMEM_BKPT; |
2455 | for(i=0; i<pTask->nPMA && rc==SQLITE_OK; i += SORTER_MAX_MERGE_COUNT){ |
2456 | MergeEngine *pMerger = 0; /* New level-0 PMA merger */ |
2457 | int nReader; /* Number of level-0 PMAs to merge */ |
2458 | |
2459 | nReader = MIN(pTask->nPMA - i, SORTER_MAX_MERGE_COUNT); |
2460 | rc = vdbeMergeEngineLevel0(pTask, nReader, &iReadOff, &pMerger); |
2461 | if( rc==SQLITE_OK ){ |
2462 | rc = vdbeSorterAddToTree(pTask, nDepth, iSeq++, pRoot, pMerger); |
2463 | } |
2464 | } |
2465 | } |
2466 | |
2467 | if( rc==SQLITE_OK ){ |
2468 | #if SQLITE_MAX_WORKER_THREADS>0 |
2469 | if( pMain!=0 ){ |
2470 | rc = vdbeIncrMergerNew(pTask, pRoot, &pMain->aReadr[iTask].pIncr); |
2471 | }else |
2472 | #endif |
2473 | { |
2474 | assert( pMain==0 ); |
2475 | pMain = pRoot; |
2476 | } |
2477 | }else{ |
2478 | vdbeMergeEngineFree(pRoot); |
2479 | } |
2480 | } |
2481 | } |
2482 | |
2483 | if( rc!=SQLITE_OK ){ |
2484 | vdbeMergeEngineFree(pMain); |
2485 | pMain = 0; |
2486 | } |
2487 | *ppOut = pMain; |
2488 | return rc; |
2489 | } |
2490 | |
2491 | /* |
2492 | ** This function is called as part of an sqlite3VdbeSorterRewind() operation |
2493 | ** on a sorter that has written two or more PMAs to temporary files. It sets |
2494 | ** up either VdbeSorter.pMerger (for single threaded sorters) or pReader |
2495 | ** (for multi-threaded sorters) so that it can be used to iterate through |
2496 | ** all records stored in the sorter. |
2497 | ** |
2498 | ** SQLITE_OK is returned if successful, or an SQLite error code otherwise. |
2499 | */ |
2500 | static int vdbeSorterSetupMerge(VdbeSorter *pSorter){ |
2501 | int rc; /* Return code */ |
2502 | SortSubtask *pTask0 = &pSorter->aTask[0]; |
2503 | MergeEngine *pMain = 0; |
2504 | #if SQLITE_MAX_WORKER_THREADS |
2505 | sqlite3 *db = pTask0->pSorter->db; |
2506 | int i; |
2507 | SorterCompare xCompare = vdbeSorterGetCompare(pSorter); |
2508 | for(i=0; i<pSorter->nTask; i++){ |
2509 | pSorter->aTask[i].xCompare = xCompare; |
2510 | } |
2511 | #endif |
2512 | |
2513 | rc = vdbeSorterMergeTreeBuild(pSorter, &pMain); |
2514 | if( rc==SQLITE_OK ){ |
2515 | #if SQLITE_MAX_WORKER_THREADS |
2516 | assert( pSorter->bUseThreads==0 || pSorter->nTask>1 ); |
2517 | if( pSorter->bUseThreads ){ |
2518 | int iTask; |
2519 | PmaReader *pReadr = 0; |
2520 | SortSubtask *pLast = &pSorter->aTask[pSorter->nTask-1]; |
2521 | rc = vdbeSortAllocUnpacked(pLast); |
2522 | if( rc==SQLITE_OK ){ |
2523 | pReadr = (PmaReader*)sqlite3DbMallocZero(db, sizeof(PmaReader)); |
2524 | pSorter->pReader = pReadr; |
2525 | if( pReadr==0 ) rc = SQLITE_NOMEM_BKPT; |
2526 | } |
2527 | if( rc==SQLITE_OK ){ |
2528 | rc = vdbeIncrMergerNew(pLast, pMain, &pReadr->pIncr); |
2529 | if( rc==SQLITE_OK ){ |
2530 | vdbeIncrMergerSetThreads(pReadr->pIncr); |
2531 | for(iTask=0; iTask<(pSorter->nTask-1); iTask++){ |
2532 | IncrMerger *pIncr; |
2533 | if( (pIncr = pMain->aReadr[iTask].pIncr) ){ |
2534 | vdbeIncrMergerSetThreads(pIncr); |
2535 | assert( pIncr->pTask!=pLast ); |
2536 | } |
2537 | } |
2538 | for(iTask=0; rc==SQLITE_OK && iTask<pSorter->nTask; iTask++){ |
2539 | /* Check that: |
2540 | ** |
2541 | ** a) The incremental merge object is configured to use the |
2542 | ** right task, and |
2543 | ** b) If it is using task (nTask-1), it is configured to run |
2544 | ** in single-threaded mode. This is important, as the |
2545 | ** root merge (INCRINIT_ROOT) will be using the same task |
2546 | ** object. |
2547 | */ |
2548 | PmaReader *p = &pMain->aReadr[iTask]; |
2549 | assert( p->pIncr==0 || ( |
2550 | (p->pIncr->pTask==&pSorter->aTask[iTask]) /* a */ |
2551 | && (iTask!=pSorter->nTask-1 || p->pIncr->bUseThread==0) /* b */ |
2552 | )); |
2553 | rc = vdbePmaReaderIncrInit(p, INCRINIT_TASK); |
2554 | } |
2555 | } |
2556 | pMain = 0; |
2557 | } |
2558 | if( rc==SQLITE_OK ){ |
2559 | rc = vdbePmaReaderIncrMergeInit(pReadr, INCRINIT_ROOT); |
2560 | } |
2561 | }else |
2562 | #endif |
2563 | { |
2564 | rc = vdbeMergeEngineInit(pTask0, pMain, INCRINIT_NORMAL); |
2565 | pSorter->pMerger = pMain; |
2566 | pMain = 0; |
2567 | } |
2568 | } |
2569 | |
2570 | if( rc!=SQLITE_OK ){ |
2571 | vdbeMergeEngineFree(pMain); |
2572 | } |
2573 | return rc; |
2574 | } |
2575 | |
2576 | |
2577 | /* |
2578 | ** Once the sorter has been populated by calls to sqlite3VdbeSorterWrite, |
2579 | ** this function is called to prepare for iterating through the records |
2580 | ** in sorted order. |
2581 | */ |
2582 | int sqlite3VdbeSorterRewind(const VdbeCursor *pCsr, int *pbEof){ |
2583 | VdbeSorter *pSorter; |
2584 | int rc = SQLITE_OK; /* Return code */ |
2585 | |
2586 | assert( pCsr->eCurType==CURTYPE_SORTER ); |
2587 | pSorter = pCsr->uc.pSorter; |
2588 | assert( pSorter ); |
2589 | |
2590 | /* If no data has been written to disk, then do not do so now. Instead, |
2591 | ** sort the VdbeSorter.pRecord list. The vdbe layer will read data directly |
2592 | ** from the in-memory list. */ |
2593 | if( pSorter->bUsePMA==0 ){ |
2594 | if( pSorter->list.pList ){ |
2595 | *pbEof = 0; |
2596 | rc = vdbeSorterSort(&pSorter->aTask[0], &pSorter->list); |
2597 | }else{ |
2598 | *pbEof = 1; |
2599 | } |
2600 | return rc; |
2601 | } |
2602 | |
2603 | /* Write the current in-memory list to a PMA. When the VdbeSorterWrite() |
2604 | ** function flushes the contents of memory to disk, it immediately always |
2605 | ** creates a new list consisting of a single key immediately afterwards. |
2606 | ** So the list is never empty at this point. */ |
2607 | assert( pSorter->list.pList ); |
2608 | rc = vdbeSorterFlushPMA(pSorter); |
2609 | |
2610 | /* Join all threads */ |
2611 | rc = vdbeSorterJoinAll(pSorter, rc); |
2612 | |
2613 | vdbeSorterRewindDebug("rewind" ); |
2614 | |
2615 | /* Assuming no errors have occurred, set up a merger structure to |
2616 | ** incrementally read and merge all remaining PMAs. */ |
2617 | assert( pSorter->pReader==0 ); |
2618 | if( rc==SQLITE_OK ){ |
2619 | rc = vdbeSorterSetupMerge(pSorter); |
2620 | *pbEof = 0; |
2621 | } |
2622 | |
2623 | vdbeSorterRewindDebug("rewinddone" ); |
2624 | return rc; |
2625 | } |
2626 | |
2627 | /* |
2628 | ** Advance to the next element in the sorter. Return value: |
2629 | ** |
2630 | ** SQLITE_OK success |
2631 | ** SQLITE_DONE end of data |
2632 | ** otherwise some kind of error. |
2633 | */ |
2634 | int sqlite3VdbeSorterNext(sqlite3 *db, const VdbeCursor *pCsr){ |
2635 | VdbeSorter *pSorter; |
2636 | int rc; /* Return code */ |
2637 | |
2638 | assert( pCsr->eCurType==CURTYPE_SORTER ); |
2639 | pSorter = pCsr->uc.pSorter; |
2640 | assert( pSorter->bUsePMA || (pSorter->pReader==0 && pSorter->pMerger==0) ); |
2641 | if( pSorter->bUsePMA ){ |
2642 | assert( pSorter->pReader==0 || pSorter->pMerger==0 ); |
2643 | assert( pSorter->bUseThreads==0 || pSorter->pReader ); |
2644 | assert( pSorter->bUseThreads==1 || pSorter->pMerger ); |
2645 | #if SQLITE_MAX_WORKER_THREADS>0 |
2646 | if( pSorter->bUseThreads ){ |
2647 | rc = vdbePmaReaderNext(pSorter->pReader); |
2648 | if( rc==SQLITE_OK && pSorter->pReader->pFd==0 ) rc = SQLITE_DONE; |
2649 | }else |
2650 | #endif |
2651 | /*if( !pSorter->bUseThreads )*/ { |
2652 | int res = 0; |
2653 | assert( pSorter->pMerger!=0 ); |
2654 | assert( pSorter->pMerger->pTask==(&pSorter->aTask[0]) ); |
2655 | rc = vdbeMergeEngineStep(pSorter->pMerger, &res); |
2656 | if( rc==SQLITE_OK && res ) rc = SQLITE_DONE; |
2657 | } |
2658 | }else{ |
2659 | SorterRecord *pFree = pSorter->list.pList; |
2660 | pSorter->list.pList = pFree->u.pNext; |
2661 | pFree->u.pNext = 0; |
2662 | if( pSorter->list.aMemory==0 ) vdbeSorterRecordFree(db, pFree); |
2663 | rc = pSorter->list.pList ? SQLITE_OK : SQLITE_DONE; |
2664 | } |
2665 | return rc; |
2666 | } |
2667 | |
2668 | /* |
2669 | ** Return a pointer to a buffer owned by the sorter that contains the |
2670 | ** current key. |
2671 | */ |
2672 | static void *vdbeSorterRowkey( |
2673 | const VdbeSorter *pSorter, /* Sorter object */ |
2674 | int *pnKey /* OUT: Size of current key in bytes */ |
2675 | ){ |
2676 | void *pKey; |
2677 | if( pSorter->bUsePMA ){ |
2678 | PmaReader *pReader; |
2679 | #if SQLITE_MAX_WORKER_THREADS>0 |
2680 | if( pSorter->bUseThreads ){ |
2681 | pReader = pSorter->pReader; |
2682 | }else |
2683 | #endif |
2684 | /*if( !pSorter->bUseThreads )*/{ |
2685 | pReader = &pSorter->pMerger->aReadr[pSorter->pMerger->aTree[1]]; |
2686 | } |
2687 | *pnKey = pReader->nKey; |
2688 | pKey = pReader->aKey; |
2689 | }else{ |
2690 | *pnKey = pSorter->list.pList->nVal; |
2691 | pKey = SRVAL(pSorter->list.pList); |
2692 | } |
2693 | return pKey; |
2694 | } |
2695 | |
2696 | /* |
2697 | ** Copy the current sorter key into the memory cell pOut. |
2698 | */ |
2699 | int sqlite3VdbeSorterRowkey(const VdbeCursor *pCsr, Mem *pOut){ |
2700 | VdbeSorter *pSorter; |
2701 | void *pKey; int nKey; /* Sorter key to copy into pOut */ |
2702 | |
2703 | assert( pCsr->eCurType==CURTYPE_SORTER ); |
2704 | pSorter = pCsr->uc.pSorter; |
2705 | pKey = vdbeSorterRowkey(pSorter, &nKey); |
2706 | if( sqlite3VdbeMemClearAndResize(pOut, nKey) ){ |
2707 | return SQLITE_NOMEM_BKPT; |
2708 | } |
2709 | pOut->n = nKey; |
2710 | MemSetTypeFlag(pOut, MEM_Blob); |
2711 | memcpy(pOut->z, pKey, nKey); |
2712 | |
2713 | return SQLITE_OK; |
2714 | } |
2715 | |
2716 | /* |
2717 | ** Compare the key in memory cell pVal with the key that the sorter cursor |
2718 | ** passed as the first argument currently points to. For the purposes of |
2719 | ** the comparison, ignore the rowid field at the end of each record. |
2720 | ** |
2721 | ** If the sorter cursor key contains any NULL values, consider it to be |
2722 | ** less than pVal. Even if pVal also contains NULL values. |
2723 | ** |
2724 | ** If an error occurs, return an SQLite error code (i.e. SQLITE_NOMEM). |
2725 | ** Otherwise, set *pRes to a negative, zero or positive value if the |
2726 | ** key in pVal is smaller than, equal to or larger than the current sorter |
2727 | ** key. |
2728 | ** |
2729 | ** This routine forms the core of the OP_SorterCompare opcode, which in |
2730 | ** turn is used to verify uniqueness when constructing a UNIQUE INDEX. |
2731 | */ |
2732 | int sqlite3VdbeSorterCompare( |
2733 | const VdbeCursor *pCsr, /* Sorter cursor */ |
2734 | Mem *pVal, /* Value to compare to current sorter key */ |
2735 | int nKeyCol, /* Compare this many columns */ |
2736 | int *pRes /* OUT: Result of comparison */ |
2737 | ){ |
2738 | VdbeSorter *pSorter; |
2739 | UnpackedRecord *r2; |
2740 | KeyInfo *pKeyInfo; |
2741 | int i; |
2742 | void *pKey; int nKey; /* Sorter key to compare pVal with */ |
2743 | |
2744 | assert( pCsr->eCurType==CURTYPE_SORTER ); |
2745 | pSorter = pCsr->uc.pSorter; |
2746 | r2 = pSorter->pUnpacked; |
2747 | pKeyInfo = pCsr->pKeyInfo; |
2748 | if( r2==0 ){ |
2749 | r2 = pSorter->pUnpacked = sqlite3VdbeAllocUnpackedRecord(pKeyInfo); |
2750 | if( r2==0 ) return SQLITE_NOMEM_BKPT; |
2751 | r2->nField = nKeyCol; |
2752 | } |
2753 | assert( r2->nField==nKeyCol ); |
2754 | |
2755 | pKey = vdbeSorterRowkey(pSorter, &nKey); |
2756 | sqlite3VdbeRecordUnpack(pKeyInfo, nKey, pKey, r2); |
2757 | for(i=0; i<nKeyCol; i++){ |
2758 | if( r2->aMem[i].flags & MEM_Null ){ |
2759 | *pRes = -1; |
2760 | return SQLITE_OK; |
2761 | } |
2762 | } |
2763 | |
2764 | *pRes = sqlite3VdbeRecordCompare(pVal->n, pVal->z, r2); |
2765 | return SQLITE_OK; |
2766 | } |
2767 | |