1/*
2** 2011-07-09
3**
4** The author disclaims copyright to this source code. In place of
5** a legal notice, here is a blessing:
6**
7** May you do good and not evil.
8** May you find forgiveness for yourself and forgive others.
9** May you share freely, never taking more than you give.
10**
11*************************************************************************
12** This file contains code for the VdbeSorter object, used in concert with
13** a VdbeCursor to sort large numbers of keys for CREATE INDEX statements
14** or by SELECT statements with ORDER BY clauses that cannot be satisfied
15** using indexes and without LIMIT clauses.
16**
17** The VdbeSorter object implements a multi-threaded external merge sort
18** algorithm that is efficient even if the number of elements being sorted
19** exceeds the available memory.
20**
21** Here is the (internal, non-API) interface between this module and the
22** rest of the SQLite system:
23**
24** sqlite3VdbeSorterInit() Create a new VdbeSorter object.
25**
26** sqlite3VdbeSorterWrite() Add a single new row to the VdbeSorter
27** object. The row is a binary blob in the
28** OP_MakeRecord format that contains both
29** the ORDER BY key columns and result columns
30** in the case of a SELECT w/ ORDER BY, or
31** the complete record for an index entry
32** in the case of a CREATE INDEX.
33**
34** sqlite3VdbeSorterRewind() Sort all content previously added.
35** Position the read cursor on the
36** first sorted element.
37**
38** sqlite3VdbeSorterNext() Advance the read cursor to the next sorted
39** element.
40**
41** sqlite3VdbeSorterRowkey() Return the complete binary blob for the
42** row currently under the read cursor.
43**
44** sqlite3VdbeSorterCompare() Compare the binary blob for the row
45** currently under the read cursor against
46** another binary blob X and report if
47** X is strictly less than the read cursor.
48** Used to enforce uniqueness in a
49** CREATE UNIQUE INDEX statement.
50**
51** sqlite3VdbeSorterClose() Close the VdbeSorter object and reclaim
52** all resources.
53**
54** sqlite3VdbeSorterReset() Refurbish the VdbeSorter for reuse. This
55** is like Close() followed by Init() only
56** much faster.
57**
58** The interfaces above must be called in a particular order. Write() can
59** only occur in between Init()/Reset() and Rewind(). Next(), Rowkey(), and
60** Compare() can only occur in between Rewind() and Close()/Reset(). i.e.
61**
62** Init()
63** for each record: Write()
64** Rewind()
65** Rowkey()/Compare()
66** Next()
67** Close()
68**
69** Algorithm:
70**
71** Records passed to the sorter via calls to Write() are initially held
72** unsorted in main memory. Assuming the amount of memory used never exceeds
73** a threshold, when Rewind() is called the set of records is sorted using
74** an in-memory merge sort. In this case, no temporary files are required
75** and subsequent calls to Rowkey(), Next() and Compare() read records
76** directly from main memory.
77**
78** If the amount of space used to store records in main memory exceeds the
79** threshold, then the set of records currently in memory are sorted and
80** written to a temporary file in "Packed Memory Array" (PMA) format.
81** A PMA created at this point is known as a "level-0 PMA". Higher levels
82** of PMAs may be created by merging existing PMAs together - for example
83** merging two or more level-0 PMAs together creates a level-1 PMA.
84**
85** The threshold for the amount of main memory to use before flushing
86** records to a PMA is roughly the same as the limit configured for the
87** page-cache of the main database. Specifically, the threshold is set to
88** the value returned by "PRAGMA main.page_size" multipled by
89** that returned by "PRAGMA main.cache_size", in bytes.
90**
91** If the sorter is running in single-threaded mode, then all PMAs generated
92** are appended to a single temporary file. Or, if the sorter is running in
93** multi-threaded mode then up to (N+1) temporary files may be opened, where
94** N is the configured number of worker threads. In this case, instead of
95** sorting the records and writing the PMA to a temporary file itself, the
96** calling thread usually launches a worker thread to do so. Except, if
97** there are already N worker threads running, the main thread does the work
98** itself.
99**
100** The sorter is running in multi-threaded mode if (a) the library was built
101** with pre-processor symbol SQLITE_MAX_WORKER_THREADS set to a value greater
102** than zero, and (b) worker threads have been enabled at runtime by calling
103** "PRAGMA threads=N" with some value of N greater than 0.
104**
105** When Rewind() is called, any data remaining in memory is flushed to a
106** final PMA. So at this point the data is stored in some number of sorted
107** PMAs within temporary files on disk.
108**
109** If there are fewer than SORTER_MAX_MERGE_COUNT PMAs in total and the
110** sorter is running in single-threaded mode, then these PMAs are merged
111** incrementally as keys are retreived from the sorter by the VDBE. The
112** MergeEngine object, described in further detail below, performs this
113** merge.
114**
115** Or, if running in multi-threaded mode, then a background thread is
116** launched to merge the existing PMAs. Once the background thread has
117** merged T bytes of data into a single sorted PMA, the main thread
118** begins reading keys from that PMA while the background thread proceeds
119** with merging the next T bytes of data. And so on.
120**
121** Parameter T is set to half the value of the memory threshold used
122** by Write() above to determine when to create a new PMA.
123**
124** If there are more than SORTER_MAX_MERGE_COUNT PMAs in total when
125** Rewind() is called, then a hierarchy of incremental-merges is used.
126** First, T bytes of data from the first SORTER_MAX_MERGE_COUNT PMAs on
127** disk are merged together. Then T bytes of data from the second set, and
128** so on, such that no operation ever merges more than SORTER_MAX_MERGE_COUNT
129** PMAs at a time. This done is to improve locality.
130**
131** If running in multi-threaded mode and there are more than
132** SORTER_MAX_MERGE_COUNT PMAs on disk when Rewind() is called, then more
133** than one background thread may be created. Specifically, there may be
134** one background thread for each temporary file on disk, and one background
135** thread to merge the output of each of the others to a single PMA for
136** the main thread to read from.
137*/
138#include "sqliteInt.h"
139#include "vdbeInt.h"
140
141/*
142** If SQLITE_DEBUG_SORTER_THREADS is defined, this module outputs various
143** messages to stderr that may be helpful in understanding the performance
144** characteristics of the sorter in multi-threaded mode.
145*/
146#if 0
147# define SQLITE_DEBUG_SORTER_THREADS 1
148#endif
149
150/*
151** Hard-coded maximum amount of data to accumulate in memory before flushing
152** to a level 0 PMA. The purpose of this limit is to prevent various integer
153** overflows. 512MiB.
154*/
155#define SQLITE_MAX_PMASZ (1<<29)
156
157/*
158** Private objects used by the sorter
159*/
160typedef struct MergeEngine MergeEngine; /* Merge PMAs together */
161typedef struct PmaReader PmaReader; /* Incrementally read one PMA */
162typedef struct PmaWriter PmaWriter; /* Incrementally write one PMA */
163typedef struct SorterRecord SorterRecord; /* A record being sorted */
164typedef struct SortSubtask SortSubtask; /* A sub-task in the sort process */
165typedef struct SorterFile SorterFile; /* Temporary file object wrapper */
166typedef struct SorterList SorterList; /* In-memory list of records */
167typedef struct IncrMerger IncrMerger; /* Read & merge multiple PMAs */
168
169/*
170** A container for a temp file handle and the current amount of data
171** stored in the file.
172*/
173struct SorterFile {
174 sqlite3_file *pFd; /* File handle */
175 i64 iEof; /* Bytes of data stored in pFd */
176};
177
178/*
179** An in-memory list of objects to be sorted.
180**
181** If aMemory==0 then each object is allocated separately and the objects
182** are connected using SorterRecord.u.pNext. If aMemory!=0 then all objects
183** are stored in the aMemory[] bulk memory, one right after the other, and
184** are connected using SorterRecord.u.iNext.
185*/
186struct SorterList {
187 SorterRecord *pList; /* Linked list of records */
188 u8 *aMemory; /* If non-NULL, bulk memory to hold pList */
189 int szPMA; /* Size of pList as PMA in bytes */
190};
191
192/*
193** The MergeEngine object is used to combine two or more smaller PMAs into
194** one big PMA using a merge operation. Separate PMAs all need to be
195** combined into one big PMA in order to be able to step through the sorted
196** records in order.
197**
198** The aReadr[] array contains a PmaReader object for each of the PMAs being
199** merged. An aReadr[] object either points to a valid key or else is at EOF.
200** ("EOF" means "End Of File". When aReadr[] is at EOF there is no more data.)
201** For the purposes of the paragraphs below, we assume that the array is
202** actually N elements in size, where N is the smallest power of 2 greater
203** to or equal to the number of PMAs being merged. The extra aReadr[] elements
204** are treated as if they are empty (always at EOF).
205**
206** The aTree[] array is also N elements in size. The value of N is stored in
207** the MergeEngine.nTree variable.
208**
209** The final (N/2) elements of aTree[] contain the results of comparing
210** pairs of PMA keys together. Element i contains the result of
211** comparing aReadr[2*i-N] and aReadr[2*i-N+1]. Whichever key is smaller, the
212** aTree element is set to the index of it.
213**
214** For the purposes of this comparison, EOF is considered greater than any
215** other key value. If the keys are equal (only possible with two EOF
216** values), it doesn't matter which index is stored.
217**
218** The (N/4) elements of aTree[] that precede the final (N/2) described
219** above contains the index of the smallest of each block of 4 PmaReaders
220** And so on. So that aTree[1] contains the index of the PmaReader that
221** currently points to the smallest key value. aTree[0] is unused.
222**
223** Example:
224**
225** aReadr[0] -> Banana
226** aReadr[1] -> Feijoa
227** aReadr[2] -> Elderberry
228** aReadr[3] -> Currant
229** aReadr[4] -> Grapefruit
230** aReadr[5] -> Apple
231** aReadr[6] -> Durian
232** aReadr[7] -> EOF
233**
234** aTree[] = { X, 5 0, 5 0, 3, 5, 6 }
235**
236** The current element is "Apple" (the value of the key indicated by
237** PmaReader 5). When the Next() operation is invoked, PmaReader 5 will
238** be advanced to the next key in its segment. Say the next key is
239** "Eggplant":
240**
241** aReadr[5] -> Eggplant
242**
243** The contents of aTree[] are updated first by comparing the new PmaReader
244** 5 key to the current key of PmaReader 4 (still "Grapefruit"). The PmaReader
245** 5 value is still smaller, so aTree[6] is set to 5. And so on up the tree.
246** The value of PmaReader 6 - "Durian" - is now smaller than that of PmaReader
247** 5, so aTree[3] is set to 6. Key 0 is smaller than key 6 (Banana<Durian),
248** so the value written into element 1 of the array is 0. As follows:
249**
250** aTree[] = { X, 0 0, 6 0, 3, 5, 6 }
251**
252** In other words, each time we advance to the next sorter element, log2(N)
253** key comparison operations are required, where N is the number of segments
254** being merged (rounded up to the next power of 2).
255*/
256struct MergeEngine {
257 int nTree; /* Used size of aTree/aReadr (power of 2) */
258 SortSubtask *pTask; /* Used by this thread only */
259 int *aTree; /* Current state of incremental merge */
260 PmaReader *aReadr; /* Array of PmaReaders to merge data from */
261};
262
263/*
264** This object represents a single thread of control in a sort operation.
265** Exactly VdbeSorter.nTask instances of this object are allocated
266** as part of each VdbeSorter object. Instances are never allocated any
267** other way. VdbeSorter.nTask is set to the number of worker threads allowed
268** (see SQLITE_CONFIG_WORKER_THREADS) plus one (the main thread). Thus for
269** single-threaded operation, there is exactly one instance of this object
270** and for multi-threaded operation there are two or more instances.
271**
272** Essentially, this structure contains all those fields of the VdbeSorter
273** structure for which each thread requires a separate instance. For example,
274** each thread requries its own UnpackedRecord object to unpack records in
275** as part of comparison operations.
276**
277** Before a background thread is launched, variable bDone is set to 0. Then,
278** right before it exits, the thread itself sets bDone to 1. This is used for
279** two purposes:
280**
281** 1. When flushing the contents of memory to a level-0 PMA on disk, to
282** attempt to select a SortSubtask for which there is not already an
283** active background thread (since doing so causes the main thread
284** to block until it finishes).
285**
286** 2. If SQLITE_DEBUG_SORTER_THREADS is defined, to determine if a call
287** to sqlite3ThreadJoin() is likely to block. Cases that are likely to
288** block provoke debugging output.
289**
290** In both cases, the effects of the main thread seeing (bDone==0) even
291** after the thread has finished are not dire. So we don't worry about
292** memory barriers and such here.
293*/
294typedef int (*SorterCompare)(SortSubtask*,int*,const void*,int,const void*,int);
295struct SortSubtask {
296 SQLiteThread *pThread; /* Background thread, if any */
297 int bDone; /* Set if thread is finished but not joined */
298 VdbeSorter *pSorter; /* Sorter that owns this sub-task */
299 UnpackedRecord *pUnpacked; /* Space to unpack a record */
300 SorterList list; /* List for thread to write to a PMA */
301 int nPMA; /* Number of PMAs currently in file */
302 SorterCompare xCompare; /* Compare function to use */
303 SorterFile file; /* Temp file for level-0 PMAs */
304 SorterFile file2; /* Space for other PMAs */
305};
306
307
308/*
309** Main sorter structure. A single instance of this is allocated for each
310** sorter cursor created by the VDBE.
311**
312** mxKeysize:
313** As records are added to the sorter by calls to sqlite3VdbeSorterWrite(),
314** this variable is updated so as to be set to the size on disk of the
315** largest record in the sorter.
316*/
317struct VdbeSorter {
318 int mnPmaSize; /* Minimum PMA size, in bytes */
319 int mxPmaSize; /* Maximum PMA size, in bytes. 0==no limit */
320 int mxKeysize; /* Largest serialized key seen so far */
321 int pgsz; /* Main database page size */
322 PmaReader *pReader; /* Readr data from here after Rewind() */
323 MergeEngine *pMerger; /* Or here, if bUseThreads==0 */
324 sqlite3 *db; /* Database connection */
325 KeyInfo *pKeyInfo; /* How to compare records */
326 UnpackedRecord *pUnpacked; /* Used by VdbeSorterCompare() */
327 SorterList list; /* List of in-memory records */
328 int iMemory; /* Offset of free space in list.aMemory */
329 int nMemory; /* Size of list.aMemory allocation in bytes */
330 u8 bUsePMA; /* True if one or more PMAs created */
331 u8 bUseThreads; /* True to use background threads */
332 u8 iPrev; /* Previous thread used to flush PMA */
333 u8 nTask; /* Size of aTask[] array */
334 u8 typeMask;
335 SortSubtask aTask[1]; /* One or more subtasks */
336};
337
338#define SORTER_TYPE_INTEGER 0x01
339#define SORTER_TYPE_TEXT 0x02
340
341/*
342** An instance of the following object is used to read records out of a
343** PMA, in sorted order. The next key to be read is cached in nKey/aKey.
344** aKey might point into aMap or into aBuffer. If neither of those locations
345** contain a contiguous representation of the key, then aAlloc is allocated
346** and the key is copied into aAlloc and aKey is made to poitn to aAlloc.
347**
348** pFd==0 at EOF.
349*/
350struct PmaReader {
351 i64 iReadOff; /* Current read offset */
352 i64 iEof; /* 1 byte past EOF for this PmaReader */
353 int nAlloc; /* Bytes of space at aAlloc */
354 int nKey; /* Number of bytes in key */
355 sqlite3_file *pFd; /* File handle we are reading from */
356 u8 *aAlloc; /* Space for aKey if aBuffer and pMap wont work */
357 u8 *aKey; /* Pointer to current key */
358 u8 *aBuffer; /* Current read buffer */
359 int nBuffer; /* Size of read buffer in bytes */
360 u8 *aMap; /* Pointer to mapping of entire file */
361 IncrMerger *pIncr; /* Incremental merger */
362};
363
364/*
365** Normally, a PmaReader object iterates through an existing PMA stored
366** within a temp file. However, if the PmaReader.pIncr variable points to
367** an object of the following type, it may be used to iterate/merge through
368** multiple PMAs simultaneously.
369**
370** There are two types of IncrMerger object - single (bUseThread==0) and
371** multi-threaded (bUseThread==1).
372**
373** A multi-threaded IncrMerger object uses two temporary files - aFile[0]
374** and aFile[1]. Neither file is allowed to grow to more than mxSz bytes in
375** size. When the IncrMerger is initialized, it reads enough data from
376** pMerger to populate aFile[0]. It then sets variables within the
377** corresponding PmaReader object to read from that file and kicks off
378** a background thread to populate aFile[1] with the next mxSz bytes of
379** sorted record data from pMerger.
380**
381** When the PmaReader reaches the end of aFile[0], it blocks until the
382** background thread has finished populating aFile[1]. It then exchanges
383** the contents of the aFile[0] and aFile[1] variables within this structure,
384** sets the PmaReader fields to read from the new aFile[0] and kicks off
385** another background thread to populate the new aFile[1]. And so on, until
386** the contents of pMerger are exhausted.
387**
388** A single-threaded IncrMerger does not open any temporary files of its
389** own. Instead, it has exclusive access to mxSz bytes of space beginning
390** at offset iStartOff of file pTask->file2. And instead of using a
391** background thread to prepare data for the PmaReader, with a single
392** threaded IncrMerger the allocate part of pTask->file2 is "refilled" with
393** keys from pMerger by the calling thread whenever the PmaReader runs out
394** of data.
395*/
396struct IncrMerger {
397 SortSubtask *pTask; /* Task that owns this merger */
398 MergeEngine *pMerger; /* Merge engine thread reads data from */
399 i64 iStartOff; /* Offset to start writing file at */
400 int mxSz; /* Maximum bytes of data to store */
401 int bEof; /* Set to true when merge is finished */
402 int bUseThread; /* True to use a bg thread for this object */
403 SorterFile aFile[2]; /* aFile[0] for reading, [1] for writing */
404};
405
406/*
407** An instance of this object is used for writing a PMA.
408**
409** The PMA is written one record at a time. Each record is of an arbitrary
410** size. But I/O is more efficient if it occurs in page-sized blocks where
411** each block is aligned on a page boundary. This object caches writes to
412** the PMA so that aligned, page-size blocks are written.
413*/
414struct PmaWriter {
415 int eFWErr; /* Non-zero if in an error state */
416 u8 *aBuffer; /* Pointer to write buffer */
417 int nBuffer; /* Size of write buffer in bytes */
418 int iBufStart; /* First byte of buffer to write */
419 int iBufEnd; /* Last byte of buffer to write */
420 i64 iWriteOff; /* Offset of start of buffer in file */
421 sqlite3_file *pFd; /* File handle to write to */
422};
423
424/*
425** This object is the header on a single record while that record is being
426** held in memory and prior to being written out as part of a PMA.
427**
428** How the linked list is connected depends on how memory is being managed
429** by this module. If using a separate allocation for each in-memory record
430** (VdbeSorter.list.aMemory==0), then the list is always connected using the
431** SorterRecord.u.pNext pointers.
432**
433** Or, if using the single large allocation method (VdbeSorter.list.aMemory!=0),
434** then while records are being accumulated the list is linked using the
435** SorterRecord.u.iNext offset. This is because the aMemory[] array may
436** be sqlite3Realloc()ed while records are being accumulated. Once the VM
437** has finished passing records to the sorter, or when the in-memory buffer
438** is full, the list is sorted. As part of the sorting process, it is
439** converted to use the SorterRecord.u.pNext pointers. See function
440** vdbeSorterSort() for details.
441*/
442struct SorterRecord {
443 int nVal; /* Size of the record in bytes */
444 union {
445 SorterRecord *pNext; /* Pointer to next record in list */
446 int iNext; /* Offset within aMemory of next record */
447 } u;
448 /* The data for the record immediately follows this header */
449};
450
451/* Return a pointer to the buffer containing the record data for SorterRecord
452** object p. Should be used as if:
453**
454** void *SRVAL(SorterRecord *p) { return (void*)&p[1]; }
455*/
456#define SRVAL(p) ((void*)((SorterRecord*)(p) + 1))
457
458
459/* Maximum number of PMAs that a single MergeEngine can merge */
460#define SORTER_MAX_MERGE_COUNT 16
461
462static int vdbeIncrSwap(IncrMerger*);
463static void vdbeIncrFree(IncrMerger *);
464
465/*
466** Free all memory belonging to the PmaReader object passed as the
467** argument. All structure fields are set to zero before returning.
468*/
469static void vdbePmaReaderClear(PmaReader *pReadr){
470 sqlite3_free(pReadr->aAlloc);
471 sqlite3_free(pReadr->aBuffer);
472 if( pReadr->aMap ) sqlite3OsUnfetch(pReadr->pFd, 0, pReadr->aMap);
473 vdbeIncrFree(pReadr->pIncr);
474 memset(pReadr, 0, sizeof(PmaReader));
475}
476
477/*
478** Read the next nByte bytes of data from the PMA p.
479** If successful, set *ppOut to point to a buffer containing the data
480** and return SQLITE_OK. Otherwise, if an error occurs, return an SQLite
481** error code.
482**
483** The buffer returned in *ppOut is only valid until the
484** next call to this function.
485*/
486static int vdbePmaReadBlob(
487 PmaReader *p, /* PmaReader from which to take the blob */
488 int nByte, /* Bytes of data to read */
489 u8 **ppOut /* OUT: Pointer to buffer containing data */
490){
491 int iBuf; /* Offset within buffer to read from */
492 int nAvail; /* Bytes of data available in buffer */
493
494 if( p->aMap ){
495 *ppOut = &p->aMap[p->iReadOff];
496 p->iReadOff += nByte;
497 return SQLITE_OK;
498 }
499
500 assert( p->aBuffer );
501
502 /* If there is no more data to be read from the buffer, read the next
503 ** p->nBuffer bytes of data from the file into it. Or, if there are less
504 ** than p->nBuffer bytes remaining in the PMA, read all remaining data. */
505 iBuf = p->iReadOff % p->nBuffer;
506 if( iBuf==0 ){
507 int nRead; /* Bytes to read from disk */
508 int rc; /* sqlite3OsRead() return code */
509
510 /* Determine how many bytes of data to read. */
511 if( (p->iEof - p->iReadOff) > (i64)p->nBuffer ){
512 nRead = p->nBuffer;
513 }else{
514 nRead = (int)(p->iEof - p->iReadOff);
515 }
516 assert( nRead>0 );
517
518 /* Readr data from the file. Return early if an error occurs. */
519 rc = sqlite3OsRead(p->pFd, p->aBuffer, nRead, p->iReadOff);
520 assert( rc!=SQLITE_IOERR_SHORT_READ );
521 if( rc!=SQLITE_OK ) return rc;
522 }
523 nAvail = p->nBuffer - iBuf;
524
525 if( nByte<=nAvail ){
526 /* The requested data is available in the in-memory buffer. In this
527 ** case there is no need to make a copy of the data, just return a
528 ** pointer into the buffer to the caller. */
529 *ppOut = &p->aBuffer[iBuf];
530 p->iReadOff += nByte;
531 }else{
532 /* The requested data is not all available in the in-memory buffer.
533 ** In this case, allocate space at p->aAlloc[] to copy the requested
534 ** range into. Then return a copy of pointer p->aAlloc to the caller. */
535 int nRem; /* Bytes remaining to copy */
536
537 /* Extend the p->aAlloc[] allocation if required. */
538 if( p->nAlloc<nByte ){
539 u8 *aNew;
540 sqlite3_int64 nNew = MAX(128, 2*(sqlite3_int64)p->nAlloc);
541 while( nByte>nNew ) nNew = nNew*2;
542 aNew = sqlite3Realloc(p->aAlloc, nNew);
543 if( !aNew ) return SQLITE_NOMEM_BKPT;
544 p->nAlloc = nNew;
545 p->aAlloc = aNew;
546 }
547
548 /* Copy as much data as is available in the buffer into the start of
549 ** p->aAlloc[]. */
550 memcpy(p->aAlloc, &p->aBuffer[iBuf], nAvail);
551 p->iReadOff += nAvail;
552 nRem = nByte - nAvail;
553
554 /* The following loop copies up to p->nBuffer bytes per iteration into
555 ** the p->aAlloc[] buffer. */
556 while( nRem>0 ){
557 int rc; /* vdbePmaReadBlob() return code */
558 int nCopy; /* Number of bytes to copy */
559 u8 *aNext; /* Pointer to buffer to copy data from */
560
561 nCopy = nRem;
562 if( nRem>p->nBuffer ) nCopy = p->nBuffer;
563 rc = vdbePmaReadBlob(p, nCopy, &aNext);
564 if( rc!=SQLITE_OK ) return rc;
565 assert( aNext!=p->aAlloc );
566 memcpy(&p->aAlloc[nByte - nRem], aNext, nCopy);
567 nRem -= nCopy;
568 }
569
570 *ppOut = p->aAlloc;
571 }
572
573 return SQLITE_OK;
574}
575
576/*
577** Read a varint from the stream of data accessed by p. Set *pnOut to
578** the value read.
579*/
580static int vdbePmaReadVarint(PmaReader *p, u64 *pnOut){
581 int iBuf;
582
583 if( p->aMap ){
584 p->iReadOff += sqlite3GetVarint(&p->aMap[p->iReadOff], pnOut);
585 }else{
586 iBuf = p->iReadOff % p->nBuffer;
587 if( iBuf && (p->nBuffer-iBuf)>=9 ){
588 p->iReadOff += sqlite3GetVarint(&p->aBuffer[iBuf], pnOut);
589 }else{
590 u8 aVarint[16], *a;
591 int i = 0, rc;
592 do{
593 rc = vdbePmaReadBlob(p, 1, &a);
594 if( rc ) return rc;
595 aVarint[(i++)&0xf] = a[0];
596 }while( (a[0]&0x80)!=0 );
597 sqlite3GetVarint(aVarint, pnOut);
598 }
599 }
600
601 return SQLITE_OK;
602}
603
604/*
605** Attempt to memory map file pFile. If successful, set *pp to point to the
606** new mapping and return SQLITE_OK. If the mapping is not attempted
607** (because the file is too large or the VFS layer is configured not to use
608** mmap), return SQLITE_OK and set *pp to NULL.
609**
610** Or, if an error occurs, return an SQLite error code. The final value of
611** *pp is undefined in this case.
612*/
613static int vdbeSorterMapFile(SortSubtask *pTask, SorterFile *pFile, u8 **pp){
614 int rc = SQLITE_OK;
615 if( pFile->iEof<=(i64)(pTask->pSorter->db->nMaxSorterMmap) ){
616 sqlite3_file *pFd = pFile->pFd;
617 if( pFd->pMethods->iVersion>=3 ){
618 rc = sqlite3OsFetch(pFd, 0, (int)pFile->iEof, (void**)pp);
619 testcase( rc!=SQLITE_OK );
620 }
621 }
622 return rc;
623}
624
625/*
626** Attach PmaReader pReadr to file pFile (if it is not already attached to
627** that file) and seek it to offset iOff within the file. Return SQLITE_OK
628** if successful, or an SQLite error code if an error occurs.
629*/
630static int vdbePmaReaderSeek(
631 SortSubtask *pTask, /* Task context */
632 PmaReader *pReadr, /* Reader whose cursor is to be moved */
633 SorterFile *pFile, /* Sorter file to read from */
634 i64 iOff /* Offset in pFile */
635){
636 int rc = SQLITE_OK;
637
638 assert( pReadr->pIncr==0 || pReadr->pIncr->bEof==0 );
639
640 if( sqlite3FaultSim(201) ) return SQLITE_IOERR_READ;
641 if( pReadr->aMap ){
642 sqlite3OsUnfetch(pReadr->pFd, 0, pReadr->aMap);
643 pReadr->aMap = 0;
644 }
645 pReadr->iReadOff = iOff;
646 pReadr->iEof = pFile->iEof;
647 pReadr->pFd = pFile->pFd;
648
649 rc = vdbeSorterMapFile(pTask, pFile, &pReadr->aMap);
650 if( rc==SQLITE_OK && pReadr->aMap==0 ){
651 int pgsz = pTask->pSorter->pgsz;
652 int iBuf = pReadr->iReadOff % pgsz;
653 if( pReadr->aBuffer==0 ){
654 pReadr->aBuffer = (u8*)sqlite3Malloc(pgsz);
655 if( pReadr->aBuffer==0 ) rc = SQLITE_NOMEM_BKPT;
656 pReadr->nBuffer = pgsz;
657 }
658 if( rc==SQLITE_OK && iBuf ){
659 int nRead = pgsz - iBuf;
660 if( (pReadr->iReadOff + nRead) > pReadr->iEof ){
661 nRead = (int)(pReadr->iEof - pReadr->iReadOff);
662 }
663 rc = sqlite3OsRead(
664 pReadr->pFd, &pReadr->aBuffer[iBuf], nRead, pReadr->iReadOff
665 );
666 testcase( rc!=SQLITE_OK );
667 }
668 }
669
670 return rc;
671}
672
673/*
674** Advance PmaReader pReadr to the next key in its PMA. Return SQLITE_OK if
675** no error occurs, or an SQLite error code if one does.
676*/
677static int vdbePmaReaderNext(PmaReader *pReadr){
678 int rc = SQLITE_OK; /* Return Code */
679 u64 nRec = 0; /* Size of record in bytes */
680
681
682 if( pReadr->iReadOff>=pReadr->iEof ){
683 IncrMerger *pIncr = pReadr->pIncr;
684 int bEof = 1;
685 if( pIncr ){
686 rc = vdbeIncrSwap(pIncr);
687 if( rc==SQLITE_OK && pIncr->bEof==0 ){
688 rc = vdbePmaReaderSeek(
689 pIncr->pTask, pReadr, &pIncr->aFile[0], pIncr->iStartOff
690 );
691 bEof = 0;
692 }
693 }
694
695 if( bEof ){
696 /* This is an EOF condition */
697 vdbePmaReaderClear(pReadr);
698 testcase( rc!=SQLITE_OK );
699 return rc;
700 }
701 }
702
703 if( rc==SQLITE_OK ){
704 rc = vdbePmaReadVarint(pReadr, &nRec);
705 }
706 if( rc==SQLITE_OK ){
707 pReadr->nKey = (int)nRec;
708 rc = vdbePmaReadBlob(pReadr, (int)nRec, &pReadr->aKey);
709 testcase( rc!=SQLITE_OK );
710 }
711
712 return rc;
713}
714
715/*
716** Initialize PmaReader pReadr to scan through the PMA stored in file pFile
717** starting at offset iStart and ending at offset iEof-1. This function
718** leaves the PmaReader pointing to the first key in the PMA (or EOF if the
719** PMA is empty).
720**
721** If the pnByte parameter is NULL, then it is assumed that the file
722** contains a single PMA, and that that PMA omits the initial length varint.
723*/
724static int vdbePmaReaderInit(
725 SortSubtask *pTask, /* Task context */
726 SorterFile *pFile, /* Sorter file to read from */
727 i64 iStart, /* Start offset in pFile */
728 PmaReader *pReadr, /* PmaReader to populate */
729 i64 *pnByte /* IN/OUT: Increment this value by PMA size */
730){
731 int rc;
732
733 assert( pFile->iEof>iStart );
734 assert( pReadr->aAlloc==0 && pReadr->nAlloc==0 );
735 assert( pReadr->aBuffer==0 );
736 assert( pReadr->aMap==0 );
737
738 rc = vdbePmaReaderSeek(pTask, pReadr, pFile, iStart);
739 if( rc==SQLITE_OK ){
740 u64 nByte = 0; /* Size of PMA in bytes */
741 rc = vdbePmaReadVarint(pReadr, &nByte);
742 pReadr->iEof = pReadr->iReadOff + nByte;
743 *pnByte += nByte;
744 }
745
746 if( rc==SQLITE_OK ){
747 rc = vdbePmaReaderNext(pReadr);
748 }
749 return rc;
750}
751
752/*
753** A version of vdbeSorterCompare() that assumes that it has already been
754** determined that the first field of key1 is equal to the first field of
755** key2.
756*/
757static int vdbeSorterCompareTail(
758 SortSubtask *pTask, /* Subtask context (for pKeyInfo) */
759 int *pbKey2Cached, /* True if pTask->pUnpacked is pKey2 */
760 const void *pKey1, int nKey1, /* Left side of comparison */
761 const void *pKey2, int nKey2 /* Right side of comparison */
762){
763 UnpackedRecord *r2 = pTask->pUnpacked;
764 if( *pbKey2Cached==0 ){
765 sqlite3VdbeRecordUnpack(pTask->pSorter->pKeyInfo, nKey2, pKey2, r2);
766 *pbKey2Cached = 1;
767 }
768 return sqlite3VdbeRecordCompareWithSkip(nKey1, pKey1, r2, 1);
769}
770
771/*
772** Compare key1 (buffer pKey1, size nKey1 bytes) with key2 (buffer pKey2,
773** size nKey2 bytes). Use (pTask->pKeyInfo) for the collation sequences
774** used by the comparison. Return the result of the comparison.
775**
776** If IN/OUT parameter *pbKey2Cached is true when this function is called,
777** it is assumed that (pTask->pUnpacked) contains the unpacked version
778** of key2. If it is false, (pTask->pUnpacked) is populated with the unpacked
779** version of key2 and *pbKey2Cached set to true before returning.
780**
781** If an OOM error is encountered, (pTask->pUnpacked->error_rc) is set
782** to SQLITE_NOMEM.
783*/
784static int vdbeSorterCompare(
785 SortSubtask *pTask, /* Subtask context (for pKeyInfo) */
786 int *pbKey2Cached, /* True if pTask->pUnpacked is pKey2 */
787 const void *pKey1, int nKey1, /* Left side of comparison */
788 const void *pKey2, int nKey2 /* Right side of comparison */
789){
790 UnpackedRecord *r2 = pTask->pUnpacked;
791 if( !*pbKey2Cached ){
792 sqlite3VdbeRecordUnpack(pTask->pSorter->pKeyInfo, nKey2, pKey2, r2);
793 *pbKey2Cached = 1;
794 }
795 return sqlite3VdbeRecordCompare(nKey1, pKey1, r2);
796}
797
798/*
799** A specially optimized version of vdbeSorterCompare() that assumes that
800** the first field of each key is a TEXT value and that the collation
801** sequence to compare them with is BINARY.
802*/
803static int vdbeSorterCompareText(
804 SortSubtask *pTask, /* Subtask context (for pKeyInfo) */
805 int *pbKey2Cached, /* True if pTask->pUnpacked is pKey2 */
806 const void *pKey1, int nKey1, /* Left side of comparison */
807 const void *pKey2, int nKey2 /* Right side of comparison */
808){
809 const u8 * const p1 = (const u8 * const)pKey1;
810 const u8 * const p2 = (const u8 * const)pKey2;
811 const u8 * const v1 = &p1[ p1[0] ]; /* Pointer to value 1 */
812 const u8 * const v2 = &p2[ p2[0] ]; /* Pointer to value 2 */
813
814 int n1;
815 int n2;
816 int res;
817
818 getVarint32NR(&p1[1], n1);
819 getVarint32NR(&p2[1], n2);
820 res = memcmp(v1, v2, (MIN(n1, n2) - 13)/2);
821 if( res==0 ){
822 res = n1 - n2;
823 }
824
825 if( res==0 ){
826 if( pTask->pSorter->pKeyInfo->nKeyField>1 ){
827 res = vdbeSorterCompareTail(
828 pTask, pbKey2Cached, pKey1, nKey1, pKey2, nKey2
829 );
830 }
831 }else{
832 assert( !(pTask->pSorter->pKeyInfo->aSortFlags[0]&KEYINFO_ORDER_BIGNULL) );
833 if( pTask->pSorter->pKeyInfo->aSortFlags[0] ){
834 res = res * -1;
835 }
836 }
837
838 return res;
839}
840
841/*
842** A specially optimized version of vdbeSorterCompare() that assumes that
843** the first field of each key is an INTEGER value.
844*/
845static int vdbeSorterCompareInt(
846 SortSubtask *pTask, /* Subtask context (for pKeyInfo) */
847 int *pbKey2Cached, /* True if pTask->pUnpacked is pKey2 */
848 const void *pKey1, int nKey1, /* Left side of comparison */
849 const void *pKey2, int nKey2 /* Right side of comparison */
850){
851 const u8 * const p1 = (const u8 * const)pKey1;
852 const u8 * const p2 = (const u8 * const)pKey2;
853 const int s1 = p1[1]; /* Left hand serial type */
854 const int s2 = p2[1]; /* Right hand serial type */
855 const u8 * const v1 = &p1[ p1[0] ]; /* Pointer to value 1 */
856 const u8 * const v2 = &p2[ p2[0] ]; /* Pointer to value 2 */
857 int res; /* Return value */
858
859 assert( (s1>0 && s1<7) || s1==8 || s1==9 );
860 assert( (s2>0 && s2<7) || s2==8 || s2==9 );
861
862 if( s1==s2 ){
863 /* The two values have the same sign. Compare using memcmp(). */
864 static const u8 aLen[] = {0, 1, 2, 3, 4, 6, 8, 0, 0, 0 };
865 const u8 n = aLen[s1];
866 int i;
867 res = 0;
868 for(i=0; i<n; i++){
869 if( (res = v1[i] - v2[i])!=0 ){
870 if( ((v1[0] ^ v2[0]) & 0x80)!=0 ){
871 res = v1[0] & 0x80 ? -1 : +1;
872 }
873 break;
874 }
875 }
876 }else if( s1>7 && s2>7 ){
877 res = s1 - s2;
878 }else{
879 if( s2>7 ){
880 res = +1;
881 }else if( s1>7 ){
882 res = -1;
883 }else{
884 res = s1 - s2;
885 }
886 assert( res!=0 );
887
888 if( res>0 ){
889 if( *v1 & 0x80 ) res = -1;
890 }else{
891 if( *v2 & 0x80 ) res = +1;
892 }
893 }
894
895 if( res==0 ){
896 if( pTask->pSorter->pKeyInfo->nKeyField>1 ){
897 res = vdbeSorterCompareTail(
898 pTask, pbKey2Cached, pKey1, nKey1, pKey2, nKey2
899 );
900 }
901 }else if( pTask->pSorter->pKeyInfo->aSortFlags[0] ){
902 assert( !(pTask->pSorter->pKeyInfo->aSortFlags[0]&KEYINFO_ORDER_BIGNULL) );
903 res = res * -1;
904 }
905
906 return res;
907}
908
909/*
910** Initialize the temporary index cursor just opened as a sorter cursor.
911**
912** Usually, the sorter module uses the value of (pCsr->pKeyInfo->nKeyField)
913** to determine the number of fields that should be compared from the
914** records being sorted. However, if the value passed as argument nField
915** is non-zero and the sorter is able to guarantee a stable sort, nField
916** is used instead. This is used when sorting records for a CREATE INDEX
917** statement. In this case, keys are always delivered to the sorter in
918** order of the primary key, which happens to be make up the final part
919** of the records being sorted. So if the sort is stable, there is never
920** any reason to compare PK fields and they can be ignored for a small
921** performance boost.
922**
923** The sorter can guarantee a stable sort when running in single-threaded
924** mode, but not in multi-threaded mode.
925**
926** SQLITE_OK is returned if successful, or an SQLite error code otherwise.
927*/
928int sqlite3VdbeSorterInit(
929 sqlite3 *db, /* Database connection (for malloc()) */
930 int nField, /* Number of key fields in each record */
931 VdbeCursor *pCsr /* Cursor that holds the new sorter */
932){
933 int pgsz; /* Page size of main database */
934 int i; /* Used to iterate through aTask[] */
935 VdbeSorter *pSorter; /* The new sorter */
936 KeyInfo *pKeyInfo; /* Copy of pCsr->pKeyInfo with db==0 */
937 int szKeyInfo; /* Size of pCsr->pKeyInfo in bytes */
938 int sz; /* Size of pSorter in bytes */
939 int rc = SQLITE_OK;
940#if SQLITE_MAX_WORKER_THREADS==0
941# define nWorker 0
942#else
943 int nWorker;
944#endif
945
946 /* Initialize the upper limit on the number of worker threads */
947#if SQLITE_MAX_WORKER_THREADS>0
948 if( sqlite3TempInMemory(db) || sqlite3GlobalConfig.bCoreMutex==0 ){
949 nWorker = 0;
950 }else{
951 nWorker = db->aLimit[SQLITE_LIMIT_WORKER_THREADS];
952 }
953#endif
954
955 /* Do not allow the total number of threads (main thread + all workers)
956 ** to exceed the maximum merge count */
957#if SQLITE_MAX_WORKER_THREADS>=SORTER_MAX_MERGE_COUNT
958 if( nWorker>=SORTER_MAX_MERGE_COUNT ){
959 nWorker = SORTER_MAX_MERGE_COUNT-1;
960 }
961#endif
962
963 assert( pCsr->pKeyInfo );
964 assert( !pCsr->isEphemeral );
965 assert( pCsr->eCurType==CURTYPE_SORTER );
966 szKeyInfo = sizeof(KeyInfo) + (pCsr->pKeyInfo->nKeyField-1)*sizeof(CollSeq*);
967 sz = sizeof(VdbeSorter) + nWorker * sizeof(SortSubtask);
968
969 pSorter = (VdbeSorter*)sqlite3DbMallocZero(db, sz + szKeyInfo);
970 pCsr->uc.pSorter = pSorter;
971 if( pSorter==0 ){
972 rc = SQLITE_NOMEM_BKPT;
973 }else{
974 Btree *pBt = db->aDb[0].pBt;
975 pSorter->pKeyInfo = pKeyInfo = (KeyInfo*)((u8*)pSorter + sz);
976 memcpy(pKeyInfo, pCsr->pKeyInfo, szKeyInfo);
977 pKeyInfo->db = 0;
978 if( nField && nWorker==0 ){
979 pKeyInfo->nKeyField = nField;
980 }
981 sqlite3BtreeEnter(pBt);
982 pSorter->pgsz = pgsz = sqlite3BtreeGetPageSize(pBt);
983 sqlite3BtreeLeave(pBt);
984 pSorter->nTask = nWorker + 1;
985 pSorter->iPrev = (u8)(nWorker - 1);
986 pSorter->bUseThreads = (pSorter->nTask>1);
987 pSorter->db = db;
988 for(i=0; i<pSorter->nTask; i++){
989 SortSubtask *pTask = &pSorter->aTask[i];
990 pTask->pSorter = pSorter;
991 }
992
993 if( !sqlite3TempInMemory(db) ){
994 i64 mxCache; /* Cache size in bytes*/
995 u32 szPma = sqlite3GlobalConfig.szPma;
996 pSorter->mnPmaSize = szPma * pgsz;
997
998 mxCache = db->aDb[0].pSchema->cache_size;
999 if( mxCache<0 ){
1000 /* A negative cache-size value C indicates that the cache is abs(C)
1001 ** KiB in size. */
1002 mxCache = mxCache * -1024;
1003 }else{
1004 mxCache = mxCache * pgsz;
1005 }
1006 mxCache = MIN(mxCache, SQLITE_MAX_PMASZ);
1007 pSorter->mxPmaSize = MAX(pSorter->mnPmaSize, (int)mxCache);
1008
1009 /* Avoid large memory allocations if the application has requested
1010 ** SQLITE_CONFIG_SMALL_MALLOC. */
1011 if( sqlite3GlobalConfig.bSmallMalloc==0 ){
1012 assert( pSorter->iMemory==0 );
1013 pSorter->nMemory = pgsz;
1014 pSorter->list.aMemory = (u8*)sqlite3Malloc(pgsz);
1015 if( !pSorter->list.aMemory ) rc = SQLITE_NOMEM_BKPT;
1016 }
1017 }
1018
1019 if( pKeyInfo->nAllField<13
1020 && (pKeyInfo->aColl[0]==0 || pKeyInfo->aColl[0]==db->pDfltColl)
1021 && (pKeyInfo->aSortFlags[0] & KEYINFO_ORDER_BIGNULL)==0
1022 ){
1023 pSorter->typeMask = SORTER_TYPE_INTEGER | SORTER_TYPE_TEXT;
1024 }
1025 }
1026
1027 return rc;
1028}
1029#undef nWorker /* Defined at the top of this function */
1030
1031/*
1032** Free the list of sorted records starting at pRecord.
1033*/
1034static void vdbeSorterRecordFree(sqlite3 *db, SorterRecord *pRecord){
1035 SorterRecord *p;
1036 SorterRecord *pNext;
1037 for(p=pRecord; p; p=pNext){
1038 pNext = p->u.pNext;
1039 sqlite3DbFree(db, p);
1040 }
1041}
1042
1043/*
1044** Free all resources owned by the object indicated by argument pTask. All
1045** fields of *pTask are zeroed before returning.
1046*/
1047static void vdbeSortSubtaskCleanup(sqlite3 *db, SortSubtask *pTask){
1048 sqlite3DbFree(db, pTask->pUnpacked);
1049#if SQLITE_MAX_WORKER_THREADS>0
1050 /* pTask->list.aMemory can only be non-zero if it was handed memory
1051 ** from the main thread. That only occurs SQLITE_MAX_WORKER_THREADS>0 */
1052 if( pTask->list.aMemory ){
1053 sqlite3_free(pTask->list.aMemory);
1054 }else
1055#endif
1056 {
1057 assert( pTask->list.aMemory==0 );
1058 vdbeSorterRecordFree(0, pTask->list.pList);
1059 }
1060 if( pTask->file.pFd ){
1061 sqlite3OsCloseFree(pTask->file.pFd);
1062 }
1063 if( pTask->file2.pFd ){
1064 sqlite3OsCloseFree(pTask->file2.pFd);
1065 }
1066 memset(pTask, 0, sizeof(SortSubtask));
1067}
1068
1069#ifdef SQLITE_DEBUG_SORTER_THREADS
1070static void vdbeSorterWorkDebug(SortSubtask *pTask, const char *zEvent){
1071 i64 t;
1072 int iTask = (pTask - pTask->pSorter->aTask);
1073 sqlite3OsCurrentTimeInt64(pTask->pSorter->db->pVfs, &t);
1074 fprintf(stderr, "%lld:%d %s\n", t, iTask, zEvent);
1075}
1076static void vdbeSorterRewindDebug(const char *zEvent){
1077 i64 t = 0;
1078 sqlite3_vfs *pVfs = sqlite3_vfs_find(0);
1079 if( ALWAYS(pVfs) ) sqlite3OsCurrentTimeInt64(pVfs, &t);
1080 fprintf(stderr, "%lld:X %s\n", t, zEvent);
1081}
1082static void vdbeSorterPopulateDebug(
1083 SortSubtask *pTask,
1084 const char *zEvent
1085){
1086 i64 t;
1087 int iTask = (pTask - pTask->pSorter->aTask);
1088 sqlite3OsCurrentTimeInt64(pTask->pSorter->db->pVfs, &t);
1089 fprintf(stderr, "%lld:bg%d %s\n", t, iTask, zEvent);
1090}
1091static void vdbeSorterBlockDebug(
1092 SortSubtask *pTask,
1093 int bBlocked,
1094 const char *zEvent
1095){
1096 if( bBlocked ){
1097 i64 t;
1098 sqlite3OsCurrentTimeInt64(pTask->pSorter->db->pVfs, &t);
1099 fprintf(stderr, "%lld:main %s\n", t, zEvent);
1100 }
1101}
1102#else
1103# define vdbeSorterWorkDebug(x,y)
1104# define vdbeSorterRewindDebug(y)
1105# define vdbeSorterPopulateDebug(x,y)
1106# define vdbeSorterBlockDebug(x,y,z)
1107#endif
1108
1109#if SQLITE_MAX_WORKER_THREADS>0
1110/*
1111** Join thread pTask->thread.
1112*/
1113static int vdbeSorterJoinThread(SortSubtask *pTask){
1114 int rc = SQLITE_OK;
1115 if( pTask->pThread ){
1116#ifdef SQLITE_DEBUG_SORTER_THREADS
1117 int bDone = pTask->bDone;
1118#endif
1119 void *pRet = SQLITE_INT_TO_PTR(SQLITE_ERROR);
1120 vdbeSorterBlockDebug(pTask, !bDone, "enter");
1121 (void)sqlite3ThreadJoin(pTask->pThread, &pRet);
1122 vdbeSorterBlockDebug(pTask, !bDone, "exit");
1123 rc = SQLITE_PTR_TO_INT(pRet);
1124 assert( pTask->bDone==1 );
1125 pTask->bDone = 0;
1126 pTask->pThread = 0;
1127 }
1128 return rc;
1129}
1130
1131/*
1132** Launch a background thread to run xTask(pIn).
1133*/
1134static int vdbeSorterCreateThread(
1135 SortSubtask *pTask, /* Thread will use this task object */
1136 void *(*xTask)(void*), /* Routine to run in a separate thread */
1137 void *pIn /* Argument passed into xTask() */
1138){
1139 assert( pTask->pThread==0 && pTask->bDone==0 );
1140 return sqlite3ThreadCreate(&pTask->pThread, xTask, pIn);
1141}
1142
1143/*
1144** Join all outstanding threads launched by SorterWrite() to create
1145** level-0 PMAs.
1146*/
1147static int vdbeSorterJoinAll(VdbeSorter *pSorter, int rcin){
1148 int rc = rcin;
1149 int i;
1150
1151 /* This function is always called by the main user thread.
1152 **
1153 ** If this function is being called after SorterRewind() has been called,
1154 ** it is possible that thread pSorter->aTask[pSorter->nTask-1].pThread
1155 ** is currently attempt to join one of the other threads. To avoid a race
1156 ** condition where this thread also attempts to join the same object, join
1157 ** thread pSorter->aTask[pSorter->nTask-1].pThread first. */
1158 for(i=pSorter->nTask-1; i>=0; i--){
1159 SortSubtask *pTask = &pSorter->aTask[i];
1160 int rc2 = vdbeSorterJoinThread(pTask);
1161 if( rc==SQLITE_OK ) rc = rc2;
1162 }
1163 return rc;
1164}
1165#else
1166# define vdbeSorterJoinAll(x,rcin) (rcin)
1167# define vdbeSorterJoinThread(pTask) SQLITE_OK
1168#endif
1169
1170/*
1171** Allocate a new MergeEngine object capable of handling up to
1172** nReader PmaReader inputs.
1173**
1174** nReader is automatically rounded up to the next power of two.
1175** nReader may not exceed SORTER_MAX_MERGE_COUNT even after rounding up.
1176*/
1177static MergeEngine *vdbeMergeEngineNew(int nReader){
1178 int N = 2; /* Smallest power of two >= nReader */
1179 int nByte; /* Total bytes of space to allocate */
1180 MergeEngine *pNew; /* Pointer to allocated object to return */
1181
1182 assert( nReader<=SORTER_MAX_MERGE_COUNT );
1183
1184 while( N<nReader ) N += N;
1185 nByte = sizeof(MergeEngine) + N * (sizeof(int) + sizeof(PmaReader));
1186
1187 pNew = sqlite3FaultSim(100) ? 0 : (MergeEngine*)sqlite3MallocZero(nByte);
1188 if( pNew ){
1189 pNew->nTree = N;
1190 pNew->pTask = 0;
1191 pNew->aReadr = (PmaReader*)&pNew[1];
1192 pNew->aTree = (int*)&pNew->aReadr[N];
1193 }
1194 return pNew;
1195}
1196
1197/*
1198** Free the MergeEngine object passed as the only argument.
1199*/
1200static void vdbeMergeEngineFree(MergeEngine *pMerger){
1201 int i;
1202 if( pMerger ){
1203 for(i=0; i<pMerger->nTree; i++){
1204 vdbePmaReaderClear(&pMerger->aReadr[i]);
1205 }
1206 }
1207 sqlite3_free(pMerger);
1208}
1209
1210/*
1211** Free all resources associated with the IncrMerger object indicated by
1212** the first argument.
1213*/
1214static void vdbeIncrFree(IncrMerger *pIncr){
1215 if( pIncr ){
1216#if SQLITE_MAX_WORKER_THREADS>0
1217 if( pIncr->bUseThread ){
1218 vdbeSorterJoinThread(pIncr->pTask);
1219 if( pIncr->aFile[0].pFd ) sqlite3OsCloseFree(pIncr->aFile[0].pFd);
1220 if( pIncr->aFile[1].pFd ) sqlite3OsCloseFree(pIncr->aFile[1].pFd);
1221 }
1222#endif
1223 vdbeMergeEngineFree(pIncr->pMerger);
1224 sqlite3_free(pIncr);
1225 }
1226}
1227
1228/*
1229** Reset a sorting cursor back to its original empty state.
1230*/
1231void sqlite3VdbeSorterReset(sqlite3 *db, VdbeSorter *pSorter){
1232 int i;
1233 (void)vdbeSorterJoinAll(pSorter, SQLITE_OK);
1234 assert( pSorter->bUseThreads || pSorter->pReader==0 );
1235#if SQLITE_MAX_WORKER_THREADS>0
1236 if( pSorter->pReader ){
1237 vdbePmaReaderClear(pSorter->pReader);
1238 sqlite3DbFree(db, pSorter->pReader);
1239 pSorter->pReader = 0;
1240 }
1241#endif
1242 vdbeMergeEngineFree(pSorter->pMerger);
1243 pSorter->pMerger = 0;
1244 for(i=0; i<pSorter->nTask; i++){
1245 SortSubtask *pTask = &pSorter->aTask[i];
1246 vdbeSortSubtaskCleanup(db, pTask);
1247 pTask->pSorter = pSorter;
1248 }
1249 if( pSorter->list.aMemory==0 ){
1250 vdbeSorterRecordFree(0, pSorter->list.pList);
1251 }
1252 pSorter->list.pList = 0;
1253 pSorter->list.szPMA = 0;
1254 pSorter->bUsePMA = 0;
1255 pSorter->iMemory = 0;
1256 pSorter->mxKeysize = 0;
1257 sqlite3DbFree(db, pSorter->pUnpacked);
1258 pSorter->pUnpacked = 0;
1259}
1260
1261/*
1262** Free any cursor components allocated by sqlite3VdbeSorterXXX routines.
1263*/
1264void sqlite3VdbeSorterClose(sqlite3 *db, VdbeCursor *pCsr){
1265 VdbeSorter *pSorter;
1266 assert( pCsr->eCurType==CURTYPE_SORTER );
1267 pSorter = pCsr->uc.pSorter;
1268 if( pSorter ){
1269 sqlite3VdbeSorterReset(db, pSorter);
1270 sqlite3_free(pSorter->list.aMemory);
1271 sqlite3DbFree(db, pSorter);
1272 pCsr->uc.pSorter = 0;
1273 }
1274}
1275
1276#if SQLITE_MAX_MMAP_SIZE>0
1277/*
1278** The first argument is a file-handle open on a temporary file. The file
1279** is guaranteed to be nByte bytes or smaller in size. This function
1280** attempts to extend the file to nByte bytes in size and to ensure that
1281** the VFS has memory mapped it.
1282**
1283** Whether or not the file does end up memory mapped of course depends on
1284** the specific VFS implementation.
1285*/
1286static void vdbeSorterExtendFile(sqlite3 *db, sqlite3_file *pFd, i64 nByte){
1287 if( nByte<=(i64)(db->nMaxSorterMmap) && pFd->pMethods->iVersion>=3 ){
1288 void *p = 0;
1289 int chunksize = 4*1024;
1290 sqlite3OsFileControlHint(pFd, SQLITE_FCNTL_CHUNK_SIZE, &chunksize);
1291 sqlite3OsFileControlHint(pFd, SQLITE_FCNTL_SIZE_HINT, &nByte);
1292 sqlite3OsFetch(pFd, 0, (int)nByte, &p);
1293 if( p ) sqlite3OsUnfetch(pFd, 0, p);
1294 }
1295}
1296#else
1297# define vdbeSorterExtendFile(x,y,z)
1298#endif
1299
1300/*
1301** Allocate space for a file-handle and open a temporary file. If successful,
1302** set *ppFd to point to the malloc'd file-handle and return SQLITE_OK.
1303** Otherwise, set *ppFd to 0 and return an SQLite error code.
1304*/
1305static int vdbeSorterOpenTempFile(
1306 sqlite3 *db, /* Database handle doing sort */
1307 i64 nExtend, /* Attempt to extend file to this size */
1308 sqlite3_file **ppFd
1309){
1310 int rc;
1311 if( sqlite3FaultSim(202) ) return SQLITE_IOERR_ACCESS;
1312 rc = sqlite3OsOpenMalloc(db->pVfs, 0, ppFd,
1313 SQLITE_OPEN_TEMP_JOURNAL |
1314 SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE |
1315 SQLITE_OPEN_EXCLUSIVE | SQLITE_OPEN_DELETEONCLOSE, &rc
1316 );
1317 if( rc==SQLITE_OK ){
1318 i64 max = SQLITE_MAX_MMAP_SIZE;
1319 sqlite3OsFileControlHint(*ppFd, SQLITE_FCNTL_MMAP_SIZE, (void*)&max);
1320 if( nExtend>0 ){
1321 vdbeSorterExtendFile(db, *ppFd, nExtend);
1322 }
1323 }
1324 return rc;
1325}
1326
1327/*
1328** If it has not already been allocated, allocate the UnpackedRecord
1329** structure at pTask->pUnpacked. Return SQLITE_OK if successful (or
1330** if no allocation was required), or SQLITE_NOMEM otherwise.
1331*/
1332static int vdbeSortAllocUnpacked(SortSubtask *pTask){
1333 if( pTask->pUnpacked==0 ){
1334 pTask->pUnpacked = sqlite3VdbeAllocUnpackedRecord(pTask->pSorter->pKeyInfo);
1335 if( pTask->pUnpacked==0 ) return SQLITE_NOMEM_BKPT;
1336 pTask->pUnpacked->nField = pTask->pSorter->pKeyInfo->nKeyField;
1337 pTask->pUnpacked->errCode = 0;
1338 }
1339 return SQLITE_OK;
1340}
1341
1342
1343/*
1344** Merge the two sorted lists p1 and p2 into a single list.
1345*/
1346static SorterRecord *vdbeSorterMerge(
1347 SortSubtask *pTask, /* Calling thread context */
1348 SorterRecord *p1, /* First list to merge */
1349 SorterRecord *p2 /* Second list to merge */
1350){
1351 SorterRecord *pFinal = 0;
1352 SorterRecord **pp = &pFinal;
1353 int bCached = 0;
1354
1355 assert( p1!=0 && p2!=0 );
1356 for(;;){
1357 int res;
1358 res = pTask->xCompare(
1359 pTask, &bCached, SRVAL(p1), p1->nVal, SRVAL(p2), p2->nVal
1360 );
1361
1362 if( res<=0 ){
1363 *pp = p1;
1364 pp = &p1->u.pNext;
1365 p1 = p1->u.pNext;
1366 if( p1==0 ){
1367 *pp = p2;
1368 break;
1369 }
1370 }else{
1371 *pp = p2;
1372 pp = &p2->u.pNext;
1373 p2 = p2->u.pNext;
1374 bCached = 0;
1375 if( p2==0 ){
1376 *pp = p1;
1377 break;
1378 }
1379 }
1380 }
1381 return pFinal;
1382}
1383
1384/*
1385** Return the SorterCompare function to compare values collected by the
1386** sorter object passed as the only argument.
1387*/
1388static SorterCompare vdbeSorterGetCompare(VdbeSorter *p){
1389 if( p->typeMask==SORTER_TYPE_INTEGER ){
1390 return vdbeSorterCompareInt;
1391 }else if( p->typeMask==SORTER_TYPE_TEXT ){
1392 return vdbeSorterCompareText;
1393 }
1394 return vdbeSorterCompare;
1395}
1396
1397/*
1398** Sort the linked list of records headed at pTask->pList. Return
1399** SQLITE_OK if successful, or an SQLite error code (i.e. SQLITE_NOMEM) if
1400** an error occurs.
1401*/
1402static int vdbeSorterSort(SortSubtask *pTask, SorterList *pList){
1403 int i;
1404 SorterRecord *p;
1405 int rc;
1406 SorterRecord *aSlot[64];
1407
1408 rc = vdbeSortAllocUnpacked(pTask);
1409 if( rc!=SQLITE_OK ) return rc;
1410
1411 p = pList->pList;
1412 pTask->xCompare = vdbeSorterGetCompare(pTask->pSorter);
1413 memset(aSlot, 0, sizeof(aSlot));
1414
1415 while( p ){
1416 SorterRecord *pNext;
1417 if( pList->aMemory ){
1418 if( (u8*)p==pList->aMemory ){
1419 pNext = 0;
1420 }else{
1421 assert( p->u.iNext<sqlite3MallocSize(pList->aMemory) );
1422 pNext = (SorterRecord*)&pList->aMemory[p->u.iNext];
1423 }
1424 }else{
1425 pNext = p->u.pNext;
1426 }
1427
1428 p->u.pNext = 0;
1429 for(i=0; aSlot[i]; i++){
1430 p = vdbeSorterMerge(pTask, p, aSlot[i]);
1431 aSlot[i] = 0;
1432 }
1433 aSlot[i] = p;
1434 p = pNext;
1435 }
1436
1437 p = 0;
1438 for(i=0; i<ArraySize(aSlot); i++){
1439 if( aSlot[i]==0 ) continue;
1440 p = p ? vdbeSorterMerge(pTask, p, aSlot[i]) : aSlot[i];
1441 }
1442 pList->pList = p;
1443
1444 assert( pTask->pUnpacked->errCode==SQLITE_OK
1445 || pTask->pUnpacked->errCode==SQLITE_NOMEM
1446 );
1447 return pTask->pUnpacked->errCode;
1448}
1449
1450/*
1451** Initialize a PMA-writer object.
1452*/
1453static void vdbePmaWriterInit(
1454 sqlite3_file *pFd, /* File handle to write to */
1455 PmaWriter *p, /* Object to populate */
1456 int nBuf, /* Buffer size */
1457 i64 iStart /* Offset of pFd to begin writing at */
1458){
1459 memset(p, 0, sizeof(PmaWriter));
1460 p->aBuffer = (u8*)sqlite3Malloc(nBuf);
1461 if( !p->aBuffer ){
1462 p->eFWErr = SQLITE_NOMEM_BKPT;
1463 }else{
1464 p->iBufEnd = p->iBufStart = (iStart % nBuf);
1465 p->iWriteOff = iStart - p->iBufStart;
1466 p->nBuffer = nBuf;
1467 p->pFd = pFd;
1468 }
1469}
1470
1471/*
1472** Write nData bytes of data to the PMA. Return SQLITE_OK
1473** if successful, or an SQLite error code if an error occurs.
1474*/
1475static void vdbePmaWriteBlob(PmaWriter *p, u8 *pData, int nData){
1476 int nRem = nData;
1477 while( nRem>0 && p->eFWErr==0 ){
1478 int nCopy = nRem;
1479 if( nCopy>(p->nBuffer - p->iBufEnd) ){
1480 nCopy = p->nBuffer - p->iBufEnd;
1481 }
1482
1483 memcpy(&p->aBuffer[p->iBufEnd], &pData[nData-nRem], nCopy);
1484 p->iBufEnd += nCopy;
1485 if( p->iBufEnd==p->nBuffer ){
1486 p->eFWErr = sqlite3OsWrite(p->pFd,
1487 &p->aBuffer[p->iBufStart], p->iBufEnd - p->iBufStart,
1488 p->iWriteOff + p->iBufStart
1489 );
1490 p->iBufStart = p->iBufEnd = 0;
1491 p->iWriteOff += p->nBuffer;
1492 }
1493 assert( p->iBufEnd<p->nBuffer );
1494
1495 nRem -= nCopy;
1496 }
1497}
1498
1499/*
1500** Flush any buffered data to disk and clean up the PMA-writer object.
1501** The results of using the PMA-writer after this call are undefined.
1502** Return SQLITE_OK if flushing the buffered data succeeds or is not
1503** required. Otherwise, return an SQLite error code.
1504**
1505** Before returning, set *piEof to the offset immediately following the
1506** last byte written to the file.
1507*/
1508static int vdbePmaWriterFinish(PmaWriter *p, i64 *piEof){
1509 int rc;
1510 if( p->eFWErr==0 && ALWAYS(p->aBuffer) && p->iBufEnd>p->iBufStart ){
1511 p->eFWErr = sqlite3OsWrite(p->pFd,
1512 &p->aBuffer[p->iBufStart], p->iBufEnd - p->iBufStart,
1513 p->iWriteOff + p->iBufStart
1514 );
1515 }
1516 *piEof = (p->iWriteOff + p->iBufEnd);
1517 sqlite3_free(p->aBuffer);
1518 rc = p->eFWErr;
1519 memset(p, 0, sizeof(PmaWriter));
1520 return rc;
1521}
1522
1523/*
1524** Write value iVal encoded as a varint to the PMA. Return
1525** SQLITE_OK if successful, or an SQLite error code if an error occurs.
1526*/
1527static void vdbePmaWriteVarint(PmaWriter *p, u64 iVal){
1528 int nByte;
1529 u8 aByte[10];
1530 nByte = sqlite3PutVarint(aByte, iVal);
1531 vdbePmaWriteBlob(p, aByte, nByte);
1532}
1533
1534/*
1535** Write the current contents of in-memory linked-list pList to a level-0
1536** PMA in the temp file belonging to sub-task pTask. Return SQLITE_OK if
1537** successful, or an SQLite error code otherwise.
1538**
1539** The format of a PMA is:
1540**
1541** * A varint. This varint contains the total number of bytes of content
1542** in the PMA (not including the varint itself).
1543**
1544** * One or more records packed end-to-end in order of ascending keys.
1545** Each record consists of a varint followed by a blob of data (the
1546** key). The varint is the number of bytes in the blob of data.
1547*/
1548static int vdbeSorterListToPMA(SortSubtask *pTask, SorterList *pList){
1549 sqlite3 *db = pTask->pSorter->db;
1550 int rc = SQLITE_OK; /* Return code */
1551 PmaWriter writer; /* Object used to write to the file */
1552
1553#ifdef SQLITE_DEBUG
1554 /* Set iSz to the expected size of file pTask->file after writing the PMA.
1555 ** This is used by an assert() statement at the end of this function. */
1556 i64 iSz = pList->szPMA + sqlite3VarintLen(pList->szPMA) + pTask->file.iEof;
1557#endif
1558
1559 vdbeSorterWorkDebug(pTask, "enter");
1560 memset(&writer, 0, sizeof(PmaWriter));
1561 assert( pList->szPMA>0 );
1562
1563 /* If the first temporary PMA file has not been opened, open it now. */
1564 if( pTask->file.pFd==0 ){
1565 rc = vdbeSorterOpenTempFile(db, 0, &pTask->file.pFd);
1566 assert( rc!=SQLITE_OK || pTask->file.pFd );
1567 assert( pTask->file.iEof==0 );
1568 assert( pTask->nPMA==0 );
1569 }
1570
1571 /* Try to get the file to memory map */
1572 if( rc==SQLITE_OK ){
1573 vdbeSorterExtendFile(db, pTask->file.pFd, pTask->file.iEof+pList->szPMA+9);
1574 }
1575
1576 /* Sort the list */
1577 if( rc==SQLITE_OK ){
1578 rc = vdbeSorterSort(pTask, pList);
1579 }
1580
1581 if( rc==SQLITE_OK ){
1582 SorterRecord *p;
1583 SorterRecord *pNext = 0;
1584
1585 vdbePmaWriterInit(pTask->file.pFd, &writer, pTask->pSorter->pgsz,
1586 pTask->file.iEof);
1587 pTask->nPMA++;
1588 vdbePmaWriteVarint(&writer, pList->szPMA);
1589 for(p=pList->pList; p; p=pNext){
1590 pNext = p->u.pNext;
1591 vdbePmaWriteVarint(&writer, p->nVal);
1592 vdbePmaWriteBlob(&writer, SRVAL(p), p->nVal);
1593 if( pList->aMemory==0 ) sqlite3_free(p);
1594 }
1595 pList->pList = p;
1596 rc = vdbePmaWriterFinish(&writer, &pTask->file.iEof);
1597 }
1598
1599 vdbeSorterWorkDebug(pTask, "exit");
1600 assert( rc!=SQLITE_OK || pList->pList==0 );
1601 assert( rc!=SQLITE_OK || pTask->file.iEof==iSz );
1602 return rc;
1603}
1604
1605/*
1606** Advance the MergeEngine to its next entry.
1607** Set *pbEof to true there is no next entry because
1608** the MergeEngine has reached the end of all its inputs.
1609**
1610** Return SQLITE_OK if successful or an error code if an error occurs.
1611*/
1612static int vdbeMergeEngineStep(
1613 MergeEngine *pMerger, /* The merge engine to advance to the next row */
1614 int *pbEof /* Set TRUE at EOF. Set false for more content */
1615){
1616 int rc;
1617 int iPrev = pMerger->aTree[1];/* Index of PmaReader to advance */
1618 SortSubtask *pTask = pMerger->pTask;
1619
1620 /* Advance the current PmaReader */
1621 rc = vdbePmaReaderNext(&pMerger->aReadr[iPrev]);
1622
1623 /* Update contents of aTree[] */
1624 if( rc==SQLITE_OK ){
1625 int i; /* Index of aTree[] to recalculate */
1626 PmaReader *pReadr1; /* First PmaReader to compare */
1627 PmaReader *pReadr2; /* Second PmaReader to compare */
1628 int bCached = 0;
1629
1630 /* Find the first two PmaReaders to compare. The one that was just
1631 ** advanced (iPrev) and the one next to it in the array. */
1632 pReadr1 = &pMerger->aReadr[(iPrev & 0xFFFE)];
1633 pReadr2 = &pMerger->aReadr[(iPrev | 0x0001)];
1634
1635 for(i=(pMerger->nTree+iPrev)/2; i>0; i=i/2){
1636 /* Compare pReadr1 and pReadr2. Store the result in variable iRes. */
1637 int iRes;
1638 if( pReadr1->pFd==0 ){
1639 iRes = +1;
1640 }else if( pReadr2->pFd==0 ){
1641 iRes = -1;
1642 }else{
1643 iRes = pTask->xCompare(pTask, &bCached,
1644 pReadr1->aKey, pReadr1->nKey, pReadr2->aKey, pReadr2->nKey
1645 );
1646 }
1647
1648 /* If pReadr1 contained the smaller value, set aTree[i] to its index.
1649 ** Then set pReadr2 to the next PmaReader to compare to pReadr1. In this
1650 ** case there is no cache of pReadr2 in pTask->pUnpacked, so set
1651 ** pKey2 to point to the record belonging to pReadr2.
1652 **
1653 ** Alternatively, if pReadr2 contains the smaller of the two values,
1654 ** set aTree[i] to its index and update pReadr1. If vdbeSorterCompare()
1655 ** was actually called above, then pTask->pUnpacked now contains
1656 ** a value equivalent to pReadr2. So set pKey2 to NULL to prevent
1657 ** vdbeSorterCompare() from decoding pReadr2 again.
1658 **
1659 ** If the two values were equal, then the value from the oldest
1660 ** PMA should be considered smaller. The VdbeSorter.aReadr[] array
1661 ** is sorted from oldest to newest, so pReadr1 contains older values
1662 ** than pReadr2 iff (pReadr1<pReadr2). */
1663 if( iRes<0 || (iRes==0 && pReadr1<pReadr2) ){
1664 pMerger->aTree[i] = (int)(pReadr1 - pMerger->aReadr);
1665 pReadr2 = &pMerger->aReadr[ pMerger->aTree[i ^ 0x0001] ];
1666 bCached = 0;
1667 }else{
1668 if( pReadr1->pFd ) bCached = 0;
1669 pMerger->aTree[i] = (int)(pReadr2 - pMerger->aReadr);
1670 pReadr1 = &pMerger->aReadr[ pMerger->aTree[i ^ 0x0001] ];
1671 }
1672 }
1673 *pbEof = (pMerger->aReadr[pMerger->aTree[1]].pFd==0);
1674 }
1675
1676 return (rc==SQLITE_OK ? pTask->pUnpacked->errCode : rc);
1677}
1678
1679#if SQLITE_MAX_WORKER_THREADS>0
1680/*
1681** The main routine for background threads that write level-0 PMAs.
1682*/
1683static void *vdbeSorterFlushThread(void *pCtx){
1684 SortSubtask *pTask = (SortSubtask*)pCtx;
1685 int rc; /* Return code */
1686 assert( pTask->bDone==0 );
1687 rc = vdbeSorterListToPMA(pTask, &pTask->list);
1688 pTask->bDone = 1;
1689 return SQLITE_INT_TO_PTR(rc);
1690}
1691#endif /* SQLITE_MAX_WORKER_THREADS>0 */
1692
1693/*
1694** Flush the current contents of VdbeSorter.list to a new PMA, possibly
1695** using a background thread.
1696*/
1697static int vdbeSorterFlushPMA(VdbeSorter *pSorter){
1698#if SQLITE_MAX_WORKER_THREADS==0
1699 pSorter->bUsePMA = 1;
1700 return vdbeSorterListToPMA(&pSorter->aTask[0], &pSorter->list);
1701#else
1702 int rc = SQLITE_OK;
1703 int i;
1704 SortSubtask *pTask = 0; /* Thread context used to create new PMA */
1705 int nWorker = (pSorter->nTask-1);
1706
1707 /* Set the flag to indicate that at least one PMA has been written.
1708 ** Or will be, anyhow. */
1709 pSorter->bUsePMA = 1;
1710
1711 /* Select a sub-task to sort and flush the current list of in-memory
1712 ** records to disk. If the sorter is running in multi-threaded mode,
1713 ** round-robin between the first (pSorter->nTask-1) tasks. Except, if
1714 ** the background thread from a sub-tasks previous turn is still running,
1715 ** skip it. If the first (pSorter->nTask-1) sub-tasks are all still busy,
1716 ** fall back to using the final sub-task. The first (pSorter->nTask-1)
1717 ** sub-tasks are prefered as they use background threads - the final
1718 ** sub-task uses the main thread. */
1719 for(i=0; i<nWorker; i++){
1720 int iTest = (pSorter->iPrev + i + 1) % nWorker;
1721 pTask = &pSorter->aTask[iTest];
1722 if( pTask->bDone ){
1723 rc = vdbeSorterJoinThread(pTask);
1724 }
1725 if( rc!=SQLITE_OK || pTask->pThread==0 ) break;
1726 }
1727
1728 if( rc==SQLITE_OK ){
1729 if( i==nWorker ){
1730 /* Use the foreground thread for this operation */
1731 rc = vdbeSorterListToPMA(&pSorter->aTask[nWorker], &pSorter->list);
1732 }else{
1733 /* Launch a background thread for this operation */
1734 u8 *aMem;
1735 void *pCtx;
1736
1737 assert( pTask!=0 );
1738 assert( pTask->pThread==0 && pTask->bDone==0 );
1739 assert( pTask->list.pList==0 );
1740 assert( pTask->list.aMemory==0 || pSorter->list.aMemory!=0 );
1741
1742 aMem = pTask->list.aMemory;
1743 pCtx = (void*)pTask;
1744 pSorter->iPrev = (u8)(pTask - pSorter->aTask);
1745 pTask->list = pSorter->list;
1746 pSorter->list.pList = 0;
1747 pSorter->list.szPMA = 0;
1748 if( aMem ){
1749 pSorter->list.aMemory = aMem;
1750 pSorter->nMemory = sqlite3MallocSize(aMem);
1751 }else if( pSorter->list.aMemory ){
1752 pSorter->list.aMemory = sqlite3Malloc(pSorter->nMemory);
1753 if( !pSorter->list.aMemory ) return SQLITE_NOMEM_BKPT;
1754 }
1755
1756 rc = vdbeSorterCreateThread(pTask, vdbeSorterFlushThread, pCtx);
1757 }
1758 }
1759
1760 return rc;
1761#endif /* SQLITE_MAX_WORKER_THREADS!=0 */
1762}
1763
1764/*
1765** Add a record to the sorter.
1766*/
1767int sqlite3VdbeSorterWrite(
1768 const VdbeCursor *pCsr, /* Sorter cursor */
1769 Mem *pVal /* Memory cell containing record */
1770){
1771 VdbeSorter *pSorter;
1772 int rc = SQLITE_OK; /* Return Code */
1773 SorterRecord *pNew; /* New list element */
1774 int bFlush; /* True to flush contents of memory to PMA */
1775 int nReq; /* Bytes of memory required */
1776 int nPMA; /* Bytes of PMA space required */
1777 int t; /* serial type of first record field */
1778
1779 assert( pCsr->eCurType==CURTYPE_SORTER );
1780 pSorter = pCsr->uc.pSorter;
1781 getVarint32NR((const u8*)&pVal->z[1], t);
1782 if( t>0 && t<10 && t!=7 ){
1783 pSorter->typeMask &= SORTER_TYPE_INTEGER;
1784 }else if( t>10 && (t & 0x01) ){
1785 pSorter->typeMask &= SORTER_TYPE_TEXT;
1786 }else{
1787 pSorter->typeMask = 0;
1788 }
1789
1790 assert( pSorter );
1791
1792 /* Figure out whether or not the current contents of memory should be
1793 ** flushed to a PMA before continuing. If so, do so.
1794 **
1795 ** If using the single large allocation mode (pSorter->aMemory!=0), then
1796 ** flush the contents of memory to a new PMA if (a) at least one value is
1797 ** already in memory and (b) the new value will not fit in memory.
1798 **
1799 ** Or, if using separate allocations for each record, flush the contents
1800 ** of memory to a PMA if either of the following are true:
1801 **
1802 ** * The total memory allocated for the in-memory list is greater
1803 ** than (page-size * cache-size), or
1804 **
1805 ** * The total memory allocated for the in-memory list is greater
1806 ** than (page-size * 10) and sqlite3HeapNearlyFull() returns true.
1807 */
1808 nReq = pVal->n + sizeof(SorterRecord);
1809 nPMA = pVal->n + sqlite3VarintLen(pVal->n);
1810 if( pSorter->mxPmaSize ){
1811 if( pSorter->list.aMemory ){
1812 bFlush = pSorter->iMemory && (pSorter->iMemory+nReq) > pSorter->mxPmaSize;
1813 }else{
1814 bFlush = (
1815 (pSorter->list.szPMA > pSorter->mxPmaSize)
1816 || (pSorter->list.szPMA > pSorter->mnPmaSize && sqlite3HeapNearlyFull())
1817 );
1818 }
1819 if( bFlush ){
1820 rc = vdbeSorterFlushPMA(pSorter);
1821 pSorter->list.szPMA = 0;
1822 pSorter->iMemory = 0;
1823 assert( rc!=SQLITE_OK || pSorter->list.pList==0 );
1824 }
1825 }
1826
1827 pSorter->list.szPMA += nPMA;
1828 if( nPMA>pSorter->mxKeysize ){
1829 pSorter->mxKeysize = nPMA;
1830 }
1831
1832 if( pSorter->list.aMemory ){
1833 int nMin = pSorter->iMemory + nReq;
1834
1835 if( nMin>pSorter->nMemory ){
1836 u8 *aNew;
1837 sqlite3_int64 nNew = 2 * (sqlite3_int64)pSorter->nMemory;
1838 int iListOff = -1;
1839 if( pSorter->list.pList ){
1840 iListOff = (u8*)pSorter->list.pList - pSorter->list.aMemory;
1841 }
1842 while( nNew < nMin ) nNew = nNew*2;
1843 if( nNew > pSorter->mxPmaSize ) nNew = pSorter->mxPmaSize;
1844 if( nNew < nMin ) nNew = nMin;
1845 aNew = sqlite3Realloc(pSorter->list.aMemory, nNew);
1846 if( !aNew ) return SQLITE_NOMEM_BKPT;
1847 if( iListOff>=0 ){
1848 pSorter->list.pList = (SorterRecord*)&aNew[iListOff];
1849 }
1850 pSorter->list.aMemory = aNew;
1851 pSorter->nMemory = nNew;
1852 }
1853
1854 pNew = (SorterRecord*)&pSorter->list.aMemory[pSorter->iMemory];
1855 pSorter->iMemory += ROUND8(nReq);
1856 if( pSorter->list.pList ){
1857 pNew->u.iNext = (int)((u8*)(pSorter->list.pList) - pSorter->list.aMemory);
1858 }
1859 }else{
1860 pNew = (SorterRecord *)sqlite3Malloc(nReq);
1861 if( pNew==0 ){
1862 return SQLITE_NOMEM_BKPT;
1863 }
1864 pNew->u.pNext = pSorter->list.pList;
1865 }
1866
1867 memcpy(SRVAL(pNew), pVal->z, pVal->n);
1868 pNew->nVal = pVal->n;
1869 pSorter->list.pList = pNew;
1870
1871 return rc;
1872}
1873
1874/*
1875** Read keys from pIncr->pMerger and populate pIncr->aFile[1]. The format
1876** of the data stored in aFile[1] is the same as that used by regular PMAs,
1877** except that the number-of-bytes varint is omitted from the start.
1878*/
1879static int vdbeIncrPopulate(IncrMerger *pIncr){
1880 int rc = SQLITE_OK;
1881 int rc2;
1882 i64 iStart = pIncr->iStartOff;
1883 SorterFile *pOut = &pIncr->aFile[1];
1884 SortSubtask *pTask = pIncr->pTask;
1885 MergeEngine *pMerger = pIncr->pMerger;
1886 PmaWriter writer;
1887 assert( pIncr->bEof==0 );
1888
1889 vdbeSorterPopulateDebug(pTask, "enter");
1890
1891 vdbePmaWriterInit(pOut->pFd, &writer, pTask->pSorter->pgsz, iStart);
1892 while( rc==SQLITE_OK ){
1893 int dummy;
1894 PmaReader *pReader = &pMerger->aReadr[ pMerger->aTree[1] ];
1895 int nKey = pReader->nKey;
1896 i64 iEof = writer.iWriteOff + writer.iBufEnd;
1897
1898 /* Check if the output file is full or if the input has been exhausted.
1899 ** In either case exit the loop. */
1900 if( pReader->pFd==0 ) break;
1901 if( (iEof + nKey + sqlite3VarintLen(nKey))>(iStart + pIncr->mxSz) ) break;
1902
1903 /* Write the next key to the output. */
1904 vdbePmaWriteVarint(&writer, nKey);
1905 vdbePmaWriteBlob(&writer, pReader->aKey, nKey);
1906 assert( pIncr->pMerger->pTask==pTask );
1907 rc = vdbeMergeEngineStep(pIncr->pMerger, &dummy);
1908 }
1909
1910 rc2 = vdbePmaWriterFinish(&writer, &pOut->iEof);
1911 if( rc==SQLITE_OK ) rc = rc2;
1912 vdbeSorterPopulateDebug(pTask, "exit");
1913 return rc;
1914}
1915
1916#if SQLITE_MAX_WORKER_THREADS>0
1917/*
1918** The main routine for background threads that populate aFile[1] of
1919** multi-threaded IncrMerger objects.
1920*/
1921static void *vdbeIncrPopulateThread(void *pCtx){
1922 IncrMerger *pIncr = (IncrMerger*)pCtx;
1923 void *pRet = SQLITE_INT_TO_PTR( vdbeIncrPopulate(pIncr) );
1924 pIncr->pTask->bDone = 1;
1925 return pRet;
1926}
1927
1928/*
1929** Launch a background thread to populate aFile[1] of pIncr.
1930*/
1931static int vdbeIncrBgPopulate(IncrMerger *pIncr){
1932 void *p = (void*)pIncr;
1933 assert( pIncr->bUseThread );
1934 return vdbeSorterCreateThread(pIncr->pTask, vdbeIncrPopulateThread, p);
1935}
1936#endif
1937
1938/*
1939** This function is called when the PmaReader corresponding to pIncr has
1940** finished reading the contents of aFile[0]. Its purpose is to "refill"
1941** aFile[0] such that the PmaReader should start rereading it from the
1942** beginning.
1943**
1944** For single-threaded objects, this is accomplished by literally reading
1945** keys from pIncr->pMerger and repopulating aFile[0].
1946**
1947** For multi-threaded objects, all that is required is to wait until the
1948** background thread is finished (if it is not already) and then swap
1949** aFile[0] and aFile[1] in place. If the contents of pMerger have not
1950** been exhausted, this function also launches a new background thread
1951** to populate the new aFile[1].
1952**
1953** SQLITE_OK is returned on success, or an SQLite error code otherwise.
1954*/
1955static int vdbeIncrSwap(IncrMerger *pIncr){
1956 int rc = SQLITE_OK;
1957
1958#if SQLITE_MAX_WORKER_THREADS>0
1959 if( pIncr->bUseThread ){
1960 rc = vdbeSorterJoinThread(pIncr->pTask);
1961
1962 if( rc==SQLITE_OK ){
1963 SorterFile f0 = pIncr->aFile[0];
1964 pIncr->aFile[0] = pIncr->aFile[1];
1965 pIncr->aFile[1] = f0;
1966 }
1967
1968 if( rc==SQLITE_OK ){
1969 if( pIncr->aFile[0].iEof==pIncr->iStartOff ){
1970 pIncr->bEof = 1;
1971 }else{
1972 rc = vdbeIncrBgPopulate(pIncr);
1973 }
1974 }
1975 }else
1976#endif
1977 {
1978 rc = vdbeIncrPopulate(pIncr);
1979 pIncr->aFile[0] = pIncr->aFile[1];
1980 if( pIncr->aFile[0].iEof==pIncr->iStartOff ){
1981 pIncr->bEof = 1;
1982 }
1983 }
1984
1985 return rc;
1986}
1987
1988/*
1989** Allocate and return a new IncrMerger object to read data from pMerger.
1990**
1991** If an OOM condition is encountered, return NULL. In this case free the
1992** pMerger argument before returning.
1993*/
1994static int vdbeIncrMergerNew(
1995 SortSubtask *pTask, /* The thread that will be using the new IncrMerger */
1996 MergeEngine *pMerger, /* The MergeEngine that the IncrMerger will control */
1997 IncrMerger **ppOut /* Write the new IncrMerger here */
1998){
1999 int rc = SQLITE_OK;
2000 IncrMerger *pIncr = *ppOut = (IncrMerger*)
2001 (sqlite3FaultSim(100) ? 0 : sqlite3MallocZero(sizeof(*pIncr)));
2002 if( pIncr ){
2003 pIncr->pMerger = pMerger;
2004 pIncr->pTask = pTask;
2005 pIncr->mxSz = MAX(pTask->pSorter->mxKeysize+9,pTask->pSorter->mxPmaSize/2);
2006 pTask->file2.iEof += pIncr->mxSz;
2007 }else{
2008 vdbeMergeEngineFree(pMerger);
2009 rc = SQLITE_NOMEM_BKPT;
2010 }
2011 assert( *ppOut!=0 || rc!=SQLITE_OK );
2012 return rc;
2013}
2014
2015#if SQLITE_MAX_WORKER_THREADS>0
2016/*
2017** Set the "use-threads" flag on object pIncr.
2018*/
2019static void vdbeIncrMergerSetThreads(IncrMerger *pIncr){
2020 pIncr->bUseThread = 1;
2021 pIncr->pTask->file2.iEof -= pIncr->mxSz;
2022}
2023#endif /* SQLITE_MAX_WORKER_THREADS>0 */
2024
2025
2026
2027/*
2028** Recompute pMerger->aTree[iOut] by comparing the next keys on the
2029** two PmaReaders that feed that entry. Neither of the PmaReaders
2030** are advanced. This routine merely does the comparison.
2031*/
2032static void vdbeMergeEngineCompare(
2033 MergeEngine *pMerger, /* Merge engine containing PmaReaders to compare */
2034 int iOut /* Store the result in pMerger->aTree[iOut] */
2035){
2036 int i1;
2037 int i2;
2038 int iRes;
2039 PmaReader *p1;
2040 PmaReader *p2;
2041
2042 assert( iOut<pMerger->nTree && iOut>0 );
2043
2044 if( iOut>=(pMerger->nTree/2) ){
2045 i1 = (iOut - pMerger->nTree/2) * 2;
2046 i2 = i1 + 1;
2047 }else{
2048 i1 = pMerger->aTree[iOut*2];
2049 i2 = pMerger->aTree[iOut*2+1];
2050 }
2051
2052 p1 = &pMerger->aReadr[i1];
2053 p2 = &pMerger->aReadr[i2];
2054
2055 if( p1->pFd==0 ){
2056 iRes = i2;
2057 }else if( p2->pFd==0 ){
2058 iRes = i1;
2059 }else{
2060 SortSubtask *pTask = pMerger->pTask;
2061 int bCached = 0;
2062 int res;
2063 assert( pTask->pUnpacked!=0 ); /* from vdbeSortSubtaskMain() */
2064 res = pTask->xCompare(
2065 pTask, &bCached, p1->aKey, p1->nKey, p2->aKey, p2->nKey
2066 );
2067 if( res<=0 ){
2068 iRes = i1;
2069 }else{
2070 iRes = i2;
2071 }
2072 }
2073
2074 pMerger->aTree[iOut] = iRes;
2075}
2076
2077/*
2078** Allowed values for the eMode parameter to vdbeMergeEngineInit()
2079** and vdbePmaReaderIncrMergeInit().
2080**
2081** Only INCRINIT_NORMAL is valid in single-threaded builds (when
2082** SQLITE_MAX_WORKER_THREADS==0). The other values are only used
2083** when there exists one or more separate worker threads.
2084*/
2085#define INCRINIT_NORMAL 0
2086#define INCRINIT_TASK 1
2087#define INCRINIT_ROOT 2
2088
2089/*
2090** Forward reference required as the vdbeIncrMergeInit() and
2091** vdbePmaReaderIncrInit() routines are called mutually recursively when
2092** building a merge tree.
2093*/
2094static int vdbePmaReaderIncrInit(PmaReader *pReadr, int eMode);
2095
2096/*
2097** Initialize the MergeEngine object passed as the second argument. Once this
2098** function returns, the first key of merged data may be read from the
2099** MergeEngine object in the usual fashion.
2100**
2101** If argument eMode is INCRINIT_ROOT, then it is assumed that any IncrMerge
2102** objects attached to the PmaReader objects that the merger reads from have
2103** already been populated, but that they have not yet populated aFile[0] and
2104** set the PmaReader objects up to read from it. In this case all that is
2105** required is to call vdbePmaReaderNext() on each PmaReader to point it at
2106** its first key.
2107**
2108** Otherwise, if eMode is any value other than INCRINIT_ROOT, then use
2109** vdbePmaReaderIncrMergeInit() to initialize each PmaReader that feeds data
2110** to pMerger.
2111**
2112** SQLITE_OK is returned if successful, or an SQLite error code otherwise.
2113*/
2114static int vdbeMergeEngineInit(
2115 SortSubtask *pTask, /* Thread that will run pMerger */
2116 MergeEngine *pMerger, /* MergeEngine to initialize */
2117 int eMode /* One of the INCRINIT_XXX constants */
2118){
2119 int rc = SQLITE_OK; /* Return code */
2120 int i; /* For looping over PmaReader objects */
2121 int nTree; /* Number of subtrees to merge */
2122
2123 /* Failure to allocate the merge would have been detected prior to
2124 ** invoking this routine */
2125 assert( pMerger!=0 );
2126
2127 /* eMode is always INCRINIT_NORMAL in single-threaded mode */
2128 assert( SQLITE_MAX_WORKER_THREADS>0 || eMode==INCRINIT_NORMAL );
2129
2130 /* Verify that the MergeEngine is assigned to a single thread */
2131 assert( pMerger->pTask==0 );
2132 pMerger->pTask = pTask;
2133
2134 nTree = pMerger->nTree;
2135 for(i=0; i<nTree; i++){
2136 if( SQLITE_MAX_WORKER_THREADS>0 && eMode==INCRINIT_ROOT ){
2137 /* PmaReaders should be normally initialized in order, as if they are
2138 ** reading from the same temp file this makes for more linear file IO.
2139 ** However, in the INCRINIT_ROOT case, if PmaReader aReadr[nTask-1] is
2140 ** in use it will block the vdbePmaReaderNext() call while it uses
2141 ** the main thread to fill its buffer. So calling PmaReaderNext()
2142 ** on this PmaReader before any of the multi-threaded PmaReaders takes
2143 ** better advantage of multi-processor hardware. */
2144 rc = vdbePmaReaderNext(&pMerger->aReadr[nTree-i-1]);
2145 }else{
2146 rc = vdbePmaReaderIncrInit(&pMerger->aReadr[i], INCRINIT_NORMAL);
2147 }
2148 if( rc!=SQLITE_OK ) return rc;
2149 }
2150
2151 for(i=pMerger->nTree-1; i>0; i--){
2152 vdbeMergeEngineCompare(pMerger, i);
2153 }
2154 return pTask->pUnpacked->errCode;
2155}
2156
2157/*
2158** The PmaReader passed as the first argument is guaranteed to be an
2159** incremental-reader (pReadr->pIncr!=0). This function serves to open
2160** and/or initialize the temp file related fields of the IncrMerge
2161** object at (pReadr->pIncr).
2162**
2163** If argument eMode is set to INCRINIT_NORMAL, then all PmaReaders
2164** in the sub-tree headed by pReadr are also initialized. Data is then
2165** loaded into the buffers belonging to pReadr and it is set to point to
2166** the first key in its range.
2167**
2168** If argument eMode is set to INCRINIT_TASK, then pReadr is guaranteed
2169** to be a multi-threaded PmaReader and this function is being called in a
2170** background thread. In this case all PmaReaders in the sub-tree are
2171** initialized as for INCRINIT_NORMAL and the aFile[1] buffer belonging to
2172** pReadr is populated. However, pReadr itself is not set up to point
2173** to its first key. A call to vdbePmaReaderNext() is still required to do
2174** that.
2175**
2176** The reason this function does not call vdbePmaReaderNext() immediately
2177** in the INCRINIT_TASK case is that vdbePmaReaderNext() assumes that it has
2178** to block on thread (pTask->thread) before accessing aFile[1]. But, since
2179** this entire function is being run by thread (pTask->thread), that will
2180** lead to the current background thread attempting to join itself.
2181**
2182** Finally, if argument eMode is set to INCRINIT_ROOT, it may be assumed
2183** that pReadr->pIncr is a multi-threaded IncrMerge objects, and that all
2184** child-trees have already been initialized using IncrInit(INCRINIT_TASK).
2185** In this case vdbePmaReaderNext() is called on all child PmaReaders and
2186** the current PmaReader set to point to the first key in its range.
2187**
2188** SQLITE_OK is returned if successful, or an SQLite error code otherwise.
2189*/
2190static int vdbePmaReaderIncrMergeInit(PmaReader *pReadr, int eMode){
2191 int rc = SQLITE_OK;
2192 IncrMerger *pIncr = pReadr->pIncr;
2193 SortSubtask *pTask = pIncr->pTask;
2194 sqlite3 *db = pTask->pSorter->db;
2195
2196 /* eMode is always INCRINIT_NORMAL in single-threaded mode */
2197 assert( SQLITE_MAX_WORKER_THREADS>0 || eMode==INCRINIT_NORMAL );
2198
2199 rc = vdbeMergeEngineInit(pTask, pIncr->pMerger, eMode);
2200
2201 /* Set up the required files for pIncr. A multi-theaded IncrMerge object
2202 ** requires two temp files to itself, whereas a single-threaded object
2203 ** only requires a region of pTask->file2. */
2204 if( rc==SQLITE_OK ){
2205 int mxSz = pIncr->mxSz;
2206#if SQLITE_MAX_WORKER_THREADS>0
2207 if( pIncr->bUseThread ){
2208 rc = vdbeSorterOpenTempFile(db, mxSz, &pIncr->aFile[0].pFd);
2209 if( rc==SQLITE_OK ){
2210 rc = vdbeSorterOpenTempFile(db, mxSz, &pIncr->aFile[1].pFd);
2211 }
2212 }else
2213#endif
2214 /*if( !pIncr->bUseThread )*/{
2215 if( pTask->file2.pFd==0 ){
2216 assert( pTask->file2.iEof>0 );
2217 rc = vdbeSorterOpenTempFile(db, pTask->file2.iEof, &pTask->file2.pFd);
2218 pTask->file2.iEof = 0;
2219 }
2220 if( rc==SQLITE_OK ){
2221 pIncr->aFile[1].pFd = pTask->file2.pFd;
2222 pIncr->iStartOff = pTask->file2.iEof;
2223 pTask->file2.iEof += mxSz;
2224 }
2225 }
2226 }
2227
2228#if SQLITE_MAX_WORKER_THREADS>0
2229 if( rc==SQLITE_OK && pIncr->bUseThread ){
2230 /* Use the current thread to populate aFile[1], even though this
2231 ** PmaReader is multi-threaded. If this is an INCRINIT_TASK object,
2232 ** then this function is already running in background thread
2233 ** pIncr->pTask->thread.
2234 **
2235 ** If this is the INCRINIT_ROOT object, then it is running in the
2236 ** main VDBE thread. But that is Ok, as that thread cannot return
2237 ** control to the VDBE or proceed with anything useful until the
2238 ** first results are ready from this merger object anyway.
2239 */
2240 assert( eMode==INCRINIT_ROOT || eMode==INCRINIT_TASK );
2241 rc = vdbeIncrPopulate(pIncr);
2242 }
2243#endif
2244
2245 if( rc==SQLITE_OK && (SQLITE_MAX_WORKER_THREADS==0 || eMode!=INCRINIT_TASK) ){
2246 rc = vdbePmaReaderNext(pReadr);
2247 }
2248
2249 return rc;
2250}
2251
2252#if SQLITE_MAX_WORKER_THREADS>0
2253/*
2254** The main routine for vdbePmaReaderIncrMergeInit() operations run in
2255** background threads.
2256*/
2257static void *vdbePmaReaderBgIncrInit(void *pCtx){
2258 PmaReader *pReader = (PmaReader*)pCtx;
2259 void *pRet = SQLITE_INT_TO_PTR(
2260 vdbePmaReaderIncrMergeInit(pReader,INCRINIT_TASK)
2261 );
2262 pReader->pIncr->pTask->bDone = 1;
2263 return pRet;
2264}
2265#endif
2266
2267/*
2268** If the PmaReader passed as the first argument is not an incremental-reader
2269** (if pReadr->pIncr==0), then this function is a no-op. Otherwise, it invokes
2270** the vdbePmaReaderIncrMergeInit() function with the parameters passed to
2271** this routine to initialize the incremental merge.
2272**
2273** If the IncrMerger object is multi-threaded (IncrMerger.bUseThread==1),
2274** then a background thread is launched to call vdbePmaReaderIncrMergeInit().
2275** Or, if the IncrMerger is single threaded, the same function is called
2276** using the current thread.
2277*/
2278static int vdbePmaReaderIncrInit(PmaReader *pReadr, int eMode){
2279 IncrMerger *pIncr = pReadr->pIncr; /* Incremental merger */
2280 int rc = SQLITE_OK; /* Return code */
2281 if( pIncr ){
2282#if SQLITE_MAX_WORKER_THREADS>0
2283 assert( pIncr->bUseThread==0 || eMode==INCRINIT_TASK );
2284 if( pIncr->bUseThread ){
2285 void *pCtx = (void*)pReadr;
2286 rc = vdbeSorterCreateThread(pIncr->pTask, vdbePmaReaderBgIncrInit, pCtx);
2287 }else
2288#endif
2289 {
2290 rc = vdbePmaReaderIncrMergeInit(pReadr, eMode);
2291 }
2292 }
2293 return rc;
2294}
2295
2296/*
2297** Allocate a new MergeEngine object to merge the contents of nPMA level-0
2298** PMAs from pTask->file. If no error occurs, set *ppOut to point to
2299** the new object and return SQLITE_OK. Or, if an error does occur, set *ppOut
2300** to NULL and return an SQLite error code.
2301**
2302** When this function is called, *piOffset is set to the offset of the
2303** first PMA to read from pTask->file. Assuming no error occurs, it is
2304** set to the offset immediately following the last byte of the last
2305** PMA before returning. If an error does occur, then the final value of
2306** *piOffset is undefined.
2307*/
2308static int vdbeMergeEngineLevel0(
2309 SortSubtask *pTask, /* Sorter task to read from */
2310 int nPMA, /* Number of PMAs to read */
2311 i64 *piOffset, /* IN/OUT: Readr offset in pTask->file */
2312 MergeEngine **ppOut /* OUT: New merge-engine */
2313){
2314 MergeEngine *pNew; /* Merge engine to return */
2315 i64 iOff = *piOffset;
2316 int i;
2317 int rc = SQLITE_OK;
2318
2319 *ppOut = pNew = vdbeMergeEngineNew(nPMA);
2320 if( pNew==0 ) rc = SQLITE_NOMEM_BKPT;
2321
2322 for(i=0; i<nPMA && rc==SQLITE_OK; i++){
2323 i64 nDummy = 0;
2324 PmaReader *pReadr = &pNew->aReadr[i];
2325 rc = vdbePmaReaderInit(pTask, &pTask->file, iOff, pReadr, &nDummy);
2326 iOff = pReadr->iEof;
2327 }
2328
2329 if( rc!=SQLITE_OK ){
2330 vdbeMergeEngineFree(pNew);
2331 *ppOut = 0;
2332 }
2333 *piOffset = iOff;
2334 return rc;
2335}
2336
2337/*
2338** Return the depth of a tree comprising nPMA PMAs, assuming a fanout of
2339** SORTER_MAX_MERGE_COUNT. The returned value does not include leaf nodes.
2340**
2341** i.e.
2342**
2343** nPMA<=16 -> TreeDepth() == 0
2344** nPMA<=256 -> TreeDepth() == 1
2345** nPMA<=65536 -> TreeDepth() == 2
2346*/
2347static int vdbeSorterTreeDepth(int nPMA){
2348 int nDepth = 0;
2349 i64 nDiv = SORTER_MAX_MERGE_COUNT;
2350 while( nDiv < (i64)nPMA ){
2351 nDiv = nDiv * SORTER_MAX_MERGE_COUNT;
2352 nDepth++;
2353 }
2354 return nDepth;
2355}
2356
2357/*
2358** pRoot is the root of an incremental merge-tree with depth nDepth (according
2359** to vdbeSorterTreeDepth()). pLeaf is the iSeq'th leaf to be added to the
2360** tree, counting from zero. This function adds pLeaf to the tree.
2361**
2362** If successful, SQLITE_OK is returned. If an error occurs, an SQLite error
2363** code is returned and pLeaf is freed.
2364*/
2365static int vdbeSorterAddToTree(
2366 SortSubtask *pTask, /* Task context */
2367 int nDepth, /* Depth of tree according to TreeDepth() */
2368 int iSeq, /* Sequence number of leaf within tree */
2369 MergeEngine *pRoot, /* Root of tree */
2370 MergeEngine *pLeaf /* Leaf to add to tree */
2371){
2372 int rc = SQLITE_OK;
2373 int nDiv = 1;
2374 int i;
2375 MergeEngine *p = pRoot;
2376 IncrMerger *pIncr;
2377
2378 rc = vdbeIncrMergerNew(pTask, pLeaf, &pIncr);
2379
2380 for(i=1; i<nDepth; i++){
2381 nDiv = nDiv * SORTER_MAX_MERGE_COUNT;
2382 }
2383
2384 for(i=1; i<nDepth && rc==SQLITE_OK; i++){
2385 int iIter = (iSeq / nDiv) % SORTER_MAX_MERGE_COUNT;
2386 PmaReader *pReadr = &p->aReadr[iIter];
2387
2388 if( pReadr->pIncr==0 ){
2389 MergeEngine *pNew = vdbeMergeEngineNew(SORTER_MAX_MERGE_COUNT);
2390 if( pNew==0 ){
2391 rc = SQLITE_NOMEM_BKPT;
2392 }else{
2393 rc = vdbeIncrMergerNew(pTask, pNew, &pReadr->pIncr);
2394 }
2395 }
2396 if( rc==SQLITE_OK ){
2397 p = pReadr->pIncr->pMerger;
2398 nDiv = nDiv / SORTER_MAX_MERGE_COUNT;
2399 }
2400 }
2401
2402 if( rc==SQLITE_OK ){
2403 p->aReadr[iSeq % SORTER_MAX_MERGE_COUNT].pIncr = pIncr;
2404 }else{
2405 vdbeIncrFree(pIncr);
2406 }
2407 return rc;
2408}
2409
2410/*
2411** This function is called as part of a SorterRewind() operation on a sorter
2412** that has already written two or more level-0 PMAs to one or more temp
2413** files. It builds a tree of MergeEngine/IncrMerger/PmaReader objects that
2414** can be used to incrementally merge all PMAs on disk.
2415**
2416** If successful, SQLITE_OK is returned and *ppOut set to point to the
2417** MergeEngine object at the root of the tree before returning. Or, if an
2418** error occurs, an SQLite error code is returned and the final value
2419** of *ppOut is undefined.
2420*/
2421static int vdbeSorterMergeTreeBuild(
2422 VdbeSorter *pSorter, /* The VDBE cursor that implements the sort */
2423 MergeEngine **ppOut /* Write the MergeEngine here */
2424){
2425 MergeEngine *pMain = 0;
2426 int rc = SQLITE_OK;
2427 int iTask;
2428
2429#if SQLITE_MAX_WORKER_THREADS>0
2430 /* If the sorter uses more than one task, then create the top-level
2431 ** MergeEngine here. This MergeEngine will read data from exactly
2432 ** one PmaReader per sub-task. */
2433 assert( pSorter->bUseThreads || pSorter->nTask==1 );
2434 if( pSorter->nTask>1 ){
2435 pMain = vdbeMergeEngineNew(pSorter->nTask);
2436 if( pMain==0 ) rc = SQLITE_NOMEM_BKPT;
2437 }
2438#endif
2439
2440 for(iTask=0; rc==SQLITE_OK && iTask<pSorter->nTask; iTask++){
2441 SortSubtask *pTask = &pSorter->aTask[iTask];
2442 assert( pTask->nPMA>0 || SQLITE_MAX_WORKER_THREADS>0 );
2443 if( SQLITE_MAX_WORKER_THREADS==0 || pTask->nPMA ){
2444 MergeEngine *pRoot = 0; /* Root node of tree for this task */
2445 int nDepth = vdbeSorterTreeDepth(pTask->nPMA);
2446 i64 iReadOff = 0;
2447
2448 if( pTask->nPMA<=SORTER_MAX_MERGE_COUNT ){
2449 rc = vdbeMergeEngineLevel0(pTask, pTask->nPMA, &iReadOff, &pRoot);
2450 }else{
2451 int i;
2452 int iSeq = 0;
2453 pRoot = vdbeMergeEngineNew(SORTER_MAX_MERGE_COUNT);
2454 if( pRoot==0 ) rc = SQLITE_NOMEM_BKPT;
2455 for(i=0; i<pTask->nPMA && rc==SQLITE_OK; i += SORTER_MAX_MERGE_COUNT){
2456 MergeEngine *pMerger = 0; /* New level-0 PMA merger */
2457 int nReader; /* Number of level-0 PMAs to merge */
2458
2459 nReader = MIN(pTask->nPMA - i, SORTER_MAX_MERGE_COUNT);
2460 rc = vdbeMergeEngineLevel0(pTask, nReader, &iReadOff, &pMerger);
2461 if( rc==SQLITE_OK ){
2462 rc = vdbeSorterAddToTree(pTask, nDepth, iSeq++, pRoot, pMerger);
2463 }
2464 }
2465 }
2466
2467 if( rc==SQLITE_OK ){
2468#if SQLITE_MAX_WORKER_THREADS>0
2469 if( pMain!=0 ){
2470 rc = vdbeIncrMergerNew(pTask, pRoot, &pMain->aReadr[iTask].pIncr);
2471 }else
2472#endif
2473 {
2474 assert( pMain==0 );
2475 pMain = pRoot;
2476 }
2477 }else{
2478 vdbeMergeEngineFree(pRoot);
2479 }
2480 }
2481 }
2482
2483 if( rc!=SQLITE_OK ){
2484 vdbeMergeEngineFree(pMain);
2485 pMain = 0;
2486 }
2487 *ppOut = pMain;
2488 return rc;
2489}
2490
2491/*
2492** This function is called as part of an sqlite3VdbeSorterRewind() operation
2493** on a sorter that has written two or more PMAs to temporary files. It sets
2494** up either VdbeSorter.pMerger (for single threaded sorters) or pReader
2495** (for multi-threaded sorters) so that it can be used to iterate through
2496** all records stored in the sorter.
2497**
2498** SQLITE_OK is returned if successful, or an SQLite error code otherwise.
2499*/
2500static int vdbeSorterSetupMerge(VdbeSorter *pSorter){
2501 int rc; /* Return code */
2502 SortSubtask *pTask0 = &pSorter->aTask[0];
2503 MergeEngine *pMain = 0;
2504#if SQLITE_MAX_WORKER_THREADS
2505 sqlite3 *db = pTask0->pSorter->db;
2506 int i;
2507 SorterCompare xCompare = vdbeSorterGetCompare(pSorter);
2508 for(i=0; i<pSorter->nTask; i++){
2509 pSorter->aTask[i].xCompare = xCompare;
2510 }
2511#endif
2512
2513 rc = vdbeSorterMergeTreeBuild(pSorter, &pMain);
2514 if( rc==SQLITE_OK ){
2515#if SQLITE_MAX_WORKER_THREADS
2516 assert( pSorter->bUseThreads==0 || pSorter->nTask>1 );
2517 if( pSorter->bUseThreads ){
2518 int iTask;
2519 PmaReader *pReadr = 0;
2520 SortSubtask *pLast = &pSorter->aTask[pSorter->nTask-1];
2521 rc = vdbeSortAllocUnpacked(pLast);
2522 if( rc==SQLITE_OK ){
2523 pReadr = (PmaReader*)sqlite3DbMallocZero(db, sizeof(PmaReader));
2524 pSorter->pReader = pReadr;
2525 if( pReadr==0 ) rc = SQLITE_NOMEM_BKPT;
2526 }
2527 if( rc==SQLITE_OK ){
2528 rc = vdbeIncrMergerNew(pLast, pMain, &pReadr->pIncr);
2529 if( rc==SQLITE_OK ){
2530 vdbeIncrMergerSetThreads(pReadr->pIncr);
2531 for(iTask=0; iTask<(pSorter->nTask-1); iTask++){
2532 IncrMerger *pIncr;
2533 if( (pIncr = pMain->aReadr[iTask].pIncr) ){
2534 vdbeIncrMergerSetThreads(pIncr);
2535 assert( pIncr->pTask!=pLast );
2536 }
2537 }
2538 for(iTask=0; rc==SQLITE_OK && iTask<pSorter->nTask; iTask++){
2539 /* Check that:
2540 **
2541 ** a) The incremental merge object is configured to use the
2542 ** right task, and
2543 ** b) If it is using task (nTask-1), it is configured to run
2544 ** in single-threaded mode. This is important, as the
2545 ** root merge (INCRINIT_ROOT) will be using the same task
2546 ** object.
2547 */
2548 PmaReader *p = &pMain->aReadr[iTask];
2549 assert( p->pIncr==0 || (
2550 (p->pIncr->pTask==&pSorter->aTask[iTask]) /* a */
2551 && (iTask!=pSorter->nTask-1 || p->pIncr->bUseThread==0) /* b */
2552 ));
2553 rc = vdbePmaReaderIncrInit(p, INCRINIT_TASK);
2554 }
2555 }
2556 pMain = 0;
2557 }
2558 if( rc==SQLITE_OK ){
2559 rc = vdbePmaReaderIncrMergeInit(pReadr, INCRINIT_ROOT);
2560 }
2561 }else
2562#endif
2563 {
2564 rc = vdbeMergeEngineInit(pTask0, pMain, INCRINIT_NORMAL);
2565 pSorter->pMerger = pMain;
2566 pMain = 0;
2567 }
2568 }
2569
2570 if( rc!=SQLITE_OK ){
2571 vdbeMergeEngineFree(pMain);
2572 }
2573 return rc;
2574}
2575
2576
2577/*
2578** Once the sorter has been populated by calls to sqlite3VdbeSorterWrite,
2579** this function is called to prepare for iterating through the records
2580** in sorted order.
2581*/
2582int sqlite3VdbeSorterRewind(const VdbeCursor *pCsr, int *pbEof){
2583 VdbeSorter *pSorter;
2584 int rc = SQLITE_OK; /* Return code */
2585
2586 assert( pCsr->eCurType==CURTYPE_SORTER );
2587 pSorter = pCsr->uc.pSorter;
2588 assert( pSorter );
2589
2590 /* If no data has been written to disk, then do not do so now. Instead,
2591 ** sort the VdbeSorter.pRecord list. The vdbe layer will read data directly
2592 ** from the in-memory list. */
2593 if( pSorter->bUsePMA==0 ){
2594 if( pSorter->list.pList ){
2595 *pbEof = 0;
2596 rc = vdbeSorterSort(&pSorter->aTask[0], &pSorter->list);
2597 }else{
2598 *pbEof = 1;
2599 }
2600 return rc;
2601 }
2602
2603 /* Write the current in-memory list to a PMA. When the VdbeSorterWrite()
2604 ** function flushes the contents of memory to disk, it immediately always
2605 ** creates a new list consisting of a single key immediately afterwards.
2606 ** So the list is never empty at this point. */
2607 assert( pSorter->list.pList );
2608 rc = vdbeSorterFlushPMA(pSorter);
2609
2610 /* Join all threads */
2611 rc = vdbeSorterJoinAll(pSorter, rc);
2612
2613 vdbeSorterRewindDebug("rewind");
2614
2615 /* Assuming no errors have occurred, set up a merger structure to
2616 ** incrementally read and merge all remaining PMAs. */
2617 assert( pSorter->pReader==0 );
2618 if( rc==SQLITE_OK ){
2619 rc = vdbeSorterSetupMerge(pSorter);
2620 *pbEof = 0;
2621 }
2622
2623 vdbeSorterRewindDebug("rewinddone");
2624 return rc;
2625}
2626
2627/*
2628** Advance to the next element in the sorter. Return value:
2629**
2630** SQLITE_OK success
2631** SQLITE_DONE end of data
2632** otherwise some kind of error.
2633*/
2634int sqlite3VdbeSorterNext(sqlite3 *db, const VdbeCursor *pCsr){
2635 VdbeSorter *pSorter;
2636 int rc; /* Return code */
2637
2638 assert( pCsr->eCurType==CURTYPE_SORTER );
2639 pSorter = pCsr->uc.pSorter;
2640 assert( pSorter->bUsePMA || (pSorter->pReader==0 && pSorter->pMerger==0) );
2641 if( pSorter->bUsePMA ){
2642 assert( pSorter->pReader==0 || pSorter->pMerger==0 );
2643 assert( pSorter->bUseThreads==0 || pSorter->pReader );
2644 assert( pSorter->bUseThreads==1 || pSorter->pMerger );
2645#if SQLITE_MAX_WORKER_THREADS>0
2646 if( pSorter->bUseThreads ){
2647 rc = vdbePmaReaderNext(pSorter->pReader);
2648 if( rc==SQLITE_OK && pSorter->pReader->pFd==0 ) rc = SQLITE_DONE;
2649 }else
2650#endif
2651 /*if( !pSorter->bUseThreads )*/ {
2652 int res = 0;
2653 assert( pSorter->pMerger!=0 );
2654 assert( pSorter->pMerger->pTask==(&pSorter->aTask[0]) );
2655 rc = vdbeMergeEngineStep(pSorter->pMerger, &res);
2656 if( rc==SQLITE_OK && res ) rc = SQLITE_DONE;
2657 }
2658 }else{
2659 SorterRecord *pFree = pSorter->list.pList;
2660 pSorter->list.pList = pFree->u.pNext;
2661 pFree->u.pNext = 0;
2662 if( pSorter->list.aMemory==0 ) vdbeSorterRecordFree(db, pFree);
2663 rc = pSorter->list.pList ? SQLITE_OK : SQLITE_DONE;
2664 }
2665 return rc;
2666}
2667
2668/*
2669** Return a pointer to a buffer owned by the sorter that contains the
2670** current key.
2671*/
2672static void *vdbeSorterRowkey(
2673 const VdbeSorter *pSorter, /* Sorter object */
2674 int *pnKey /* OUT: Size of current key in bytes */
2675){
2676 void *pKey;
2677 if( pSorter->bUsePMA ){
2678 PmaReader *pReader;
2679#if SQLITE_MAX_WORKER_THREADS>0
2680 if( pSorter->bUseThreads ){
2681 pReader = pSorter->pReader;
2682 }else
2683#endif
2684 /*if( !pSorter->bUseThreads )*/{
2685 pReader = &pSorter->pMerger->aReadr[pSorter->pMerger->aTree[1]];
2686 }
2687 *pnKey = pReader->nKey;
2688 pKey = pReader->aKey;
2689 }else{
2690 *pnKey = pSorter->list.pList->nVal;
2691 pKey = SRVAL(pSorter->list.pList);
2692 }
2693 return pKey;
2694}
2695
2696/*
2697** Copy the current sorter key into the memory cell pOut.
2698*/
2699int sqlite3VdbeSorterRowkey(const VdbeCursor *pCsr, Mem *pOut){
2700 VdbeSorter *pSorter;
2701 void *pKey; int nKey; /* Sorter key to copy into pOut */
2702
2703 assert( pCsr->eCurType==CURTYPE_SORTER );
2704 pSorter = pCsr->uc.pSorter;
2705 pKey = vdbeSorterRowkey(pSorter, &nKey);
2706 if( sqlite3VdbeMemClearAndResize(pOut, nKey) ){
2707 return SQLITE_NOMEM_BKPT;
2708 }
2709 pOut->n = nKey;
2710 MemSetTypeFlag(pOut, MEM_Blob);
2711 memcpy(pOut->z, pKey, nKey);
2712
2713 return SQLITE_OK;
2714}
2715
2716/*
2717** Compare the key in memory cell pVal with the key that the sorter cursor
2718** passed as the first argument currently points to. For the purposes of
2719** the comparison, ignore the rowid field at the end of each record.
2720**
2721** If the sorter cursor key contains any NULL values, consider it to be
2722** less than pVal. Even if pVal also contains NULL values.
2723**
2724** If an error occurs, return an SQLite error code (i.e. SQLITE_NOMEM).
2725** Otherwise, set *pRes to a negative, zero or positive value if the
2726** key in pVal is smaller than, equal to or larger than the current sorter
2727** key.
2728**
2729** This routine forms the core of the OP_SorterCompare opcode, which in
2730** turn is used to verify uniqueness when constructing a UNIQUE INDEX.
2731*/
2732int sqlite3VdbeSorterCompare(
2733 const VdbeCursor *pCsr, /* Sorter cursor */
2734 Mem *pVal, /* Value to compare to current sorter key */
2735 int nKeyCol, /* Compare this many columns */
2736 int *pRes /* OUT: Result of comparison */
2737){
2738 VdbeSorter *pSorter;
2739 UnpackedRecord *r2;
2740 KeyInfo *pKeyInfo;
2741 int i;
2742 void *pKey; int nKey; /* Sorter key to compare pVal with */
2743
2744 assert( pCsr->eCurType==CURTYPE_SORTER );
2745 pSorter = pCsr->uc.pSorter;
2746 r2 = pSorter->pUnpacked;
2747 pKeyInfo = pCsr->pKeyInfo;
2748 if( r2==0 ){
2749 r2 = pSorter->pUnpacked = sqlite3VdbeAllocUnpackedRecord(pKeyInfo);
2750 if( r2==0 ) return SQLITE_NOMEM_BKPT;
2751 r2->nField = nKeyCol;
2752 }
2753 assert( r2->nField==nKeyCol );
2754
2755 pKey = vdbeSorterRowkey(pSorter, &nKey);
2756 sqlite3VdbeRecordUnpack(pKeyInfo, nKey, pKey, r2);
2757 for(i=0; i<nKeyCol; i++){
2758 if( r2->aMem[i].flags & MEM_Null ){
2759 *pRes = -1;
2760 return SQLITE_OK;
2761 }
2762 }
2763
2764 *pRes = sqlite3VdbeRecordCompare(pVal->n, pVal->z, r2);
2765 return SQLITE_OK;
2766}
2767