CUBRID Engine  latest
external_sort.c
Go to the documentation of this file.
1 /*
2  * Copyright 2008 Search Solution Corporation
3  * Copyright 2016 CUBRID Corporation
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 /*
20  * external_sort.c - External sorting module
21  */
22 
23 #ident "$Id$"
24 
25 #include "config.h"
26 
27 #include <stdio.h>
28 #include <string.h>
29 #include <stddef.h>
30 #include <stdlib.h>
31 #include <math.h>
32 
33 #include "error_manager.h"
34 #include "system_parameter.h"
35 #include "memory_alloc.h"
36 #include "external_sort.h"
37 #include "file_manager.h"
38 #include "page_buffer.h"
39 #include "log_manager.h"
40 #include "disk_manager.h"
41 #include "slotted_page.h"
42 #include "overflow_file.h"
43 #include "boot_sr.h"
44 #if defined(ENABLE_SYSTEMTAP)
45 #include "probes.h"
46 #endif /* ENABLE_SYSTEMTAP */
47 #if defined(SERVER_MODE)
48 #include "connection_error.h"
49 #endif /* SERVER_MODE */
50 #include "server_support.h"
51 #include "thread_entry_task.hpp"
52 #include "thread_manager.hpp" // for thread_get_thread_entry_info and thread_sleep
53 
54 #include <functional>
55 
56 /* Estimate on number of pages in the multipage temporary file */
57 #define SORT_MULTIPAGE_FILE_SIZE_ESTIMATE 20
58 
59 /* Upper limit on the half of the total number of the temporary files.
60  * The exact upper limit on total number of temp files is twice this number.
61  * (i.e., this number specifies the upper limit on the number of total input
62  * or total output files at each stage of the merging process.
63  */
64 #define SORT_MAX_HALF_FILES 4
65 
66 /* Lower limit on the half of the total number of the temporary files.
67  * The exact lower limit on total number of temp files is twice this number.
68  * (i.e., this number specifies the lower limit on the number of total input
69  * or total output files at each stage of the merging process.
70  */
71 #define SORT_MIN_HALF_FILES 2
72 
73 /* Initial size of the dynamic array that keeps the file contents list */
74 #define SORT_INITIAL_DYN_ARRAY_SIZE 30
75 
76 /* Expansion Ratio of the dynamic array that keeps the file contents list */
77 #define SORT_EXPAND_DYN_ARRAY_RATIO 1.5
78 
79 #define SORT_MAXREC_LENGTH \
80  ((ssize_t)(DB_PAGESIZE - sizeof(SLOTTED_PAGE_HEADER) - sizeof(SLOT)))
81 
82 #define SORT_SWAP_PTR(a,b) { char **temp; temp = a; a = b; b = temp; }
83 
84 #define SORT_CHECK_DUPLICATE(a, b) \
85  do { \
86  if (cmp == 0) { \
87  if (option == SORT_DUP) \
88  sort_append(a, b); \
89  *(a) = NULL; \
90  dup_num++; \
91  } \
92  } while (0)
93 
96 { /* node of the file_contents linked list */
97  int *num_pages; /* Dynamic array whose elements keep the sizes of the runs contained in the file in
98  * terms of number of slotted pages it occupies */
99  int num_slots; /* Total number of elements the array has */
100  int first_run; /* The index of the array element keeping the size of the first run of the file. */
101  int last_run; /* The index of the array element keeping the size of the last run of the file. */
102 };
103 
104 typedef struct vol_info VOL_INFO;
105 struct vol_info
106 { /* volume information */
107  INT16 volid; /* Volume identifier */
108  int file_cnt; /* Current number of files in the volume */
109 };
110 
111 typedef struct vol_list VOL_LIST;
112 struct vol_list
113 { /* temporary volume identifier list */
114  INT16 volid; /* main sorting disk volume */
115  int vol_ent_cnt; /* number of array entries */
116  int vol_cnt; /* number of temporary volumes */
117  VOL_INFO *vol_info; /* array of volume information */
118 };
119 
120 /* Parallel eXecution and communition node */
121 typedef struct px_tree_node PX_TREE_NODE;
123 {
124  int px_id; /* node ID */
125 #if defined(SERVER_MODE)
126  int px_status; /* node status; access through px_mtx */
127 #endif /* SERVER_MODE */
128 
129  int px_height; /* tournament tree: node level */
130  int px_myself; /* tournament tree: node ID */
131 
133 
134  void *px_arg; /* operation info */
135 
136  char **px_buff;
137  char **px_vector;
139 
140  char **px_result; /* output */
141  long px_result_size; /* output */
142 };
143 
144 typedef struct sort_param SORT_PARAM;
146 {
147  VFID temp[2 * SORT_MAX_HALF_FILES]; /* Temporary file identifiers */
148  VFID multipage_file; /* Temporary file for multi page sorting records */
149  FILE_CONTENTS file_contents[2 * SORT_MAX_HALF_FILES]; /* Contents of each temporary file */
150 
151  bool tde_encrypted; /* whether related temp files are encrypted (TDE) or not */
152 
153  VOL_LIST vol_list; /* Temporary volume information list */
154  char *internal_memory; /* Internal_memory used for internal sorting phase and as input/output buffers for temp
155  * files during merging phase */
156  int tot_runs; /* Total number of runs */
157  int tot_buffers; /* Size of internal memory used in terms of number of buffers it occupies */
158  int tot_tempfiles; /* Total number of temporary files */
159  int half_files; /* Half number of temporary files */
160  int in_half; /* Which half of temp files is for input */
161 
162  void *out_arg; /* Arguments to pass to the output function */
163 
164  /* Comparison function to use in the internal sorting and the merging phases */
166  void *cmp_arg;
168 
169  /* output function to apply on temporary records */
171  void *put_arg;
172 
173  /* Estimated number of pages in each temp file (used in initialization) */
175 
176  /* Details about the "limit" clause */
177  int limit;
178 
179  /* support parallelism */
180 #if defined(SERVER_MODE)
181  pthread_mutex_t px_mtx; /* px_node status mutex */
182 #endif
183  int px_height_max; /* px_node tournament tree max level */
184  int px_array_size; /* px_node array size */
185  PX_TREE_NODE *px_array; /* px_node array */
186 };
187 
190 {
191  struct sort_rec_list *next; /* next sorted record item */
192  int rec_pos; /* record position */
193  bool is_duplicated; /* duplicated sort_key record flag */
194 }; /* Sort record list */
195 
198 {
199  INT16 nslots; /* Number of allocated slots for the page */
200  INT16 nrecs; /* Number of records on page */
201  INT16 anchor_flag; /* Valid ANCHORED, ANCHORED_DONT_REUSE_SLOTS UNANCHORED_ANY_SEQUENCE,
202  * UNANCHORED_KEEP_SEQUENCE */
203  INT16 alignment; /* Alignment for records. */
204  INT16 waste_align; /* Number of bytes waste because of alignment */
205  INT16 tfree; /* Total free space on page */
206  INT16 cfree; /* Contiguous free space on page */
207  INT16 foffset; /* Byte offset from the beginning of the page to the first free byte area on the page. */
208 };
209 
210 typedef struct slot SLOT;
211 struct slot
212 {
213  INT16 roffset; /* Byte Offset from the beginning of the page to the beginning of the record */
214  INT16 rlength; /* Length of record */
215  INT16 rtype; /* Record type described by slot. */
216 };
217 
218 typedef struct run_struct RUN;
220 {
221  long start;
222  long stop;
223 };
224 
225 typedef struct srun SRUN;
226 struct srun
227 {
228  char low_high; /* location info LOW('L') : otherbase HIGH('H') : base */
229  unsigned short tree_depth; /* depth of this node : leaf is 1 */
230  long start;
231  long stop;
232 };
233 
234 typedef struct sort_stack SORT_STACK;
236 {
237  int top;
239 };
240 
241 typedef void FIND_RUN_FN (char **, long *, SORT_STACK *, long, SORT_CMP_FUNC *, void *);
242 typedef void MERGE_RUN_FN (char **, char **, SORT_STACK *, SORT_CMP_FUNC *, void *);
243 
244 #if !defined(NDEBUG)
245 static int sort_validate (char **vector, long size, SORT_CMP_FUNC * compare, void *comp_arg);
246 #endif
247 static PX_TREE_NODE *px_sort_assign (THREAD_ENTRY * thread_p, SORT_PARAM * sort_param, int px_id, char **px_buff,
248  char **px_vector, long px_vector_size, int px_height, int px_myself);
249 static int px_sort_myself (THREAD_ENTRY * thread_p, PX_TREE_NODE * px_node);
250 #if defined(SERVER_MODE)
251 static int px_sort_communicate (PX_TREE_NODE * px_node);
252 #endif
253 
254 static int sort_inphase_sort (THREAD_ENTRY * thread_p, SORT_PARAM * sort_param, SORT_GET_FUNC * get_next,
255  void *arguments, unsigned int *total_numrecs);
257 static int sort_exphase_merge (THREAD_ENTRY * thread_p, SORT_PARAM * sort_param);
260 static int sort_add_new_file (THREAD_ENTRY * thread_p, VFID * vfid, int file_pg_cnt_est, bool force_alloc,
261  bool tde_encrypted);
262 
263 static int sort_write_area (THREAD_ENTRY * thread_p, VFID * vfid, int first_page, INT32 num_pages, char *area_start);
264 static int sort_read_area (THREAD_ENTRY * thread_p, VFID * vfid, int first_page, INT32 num_pages, char *area_start);
265 
266 static int sort_get_num_half_tmpfiles (int tot_buffers, int input_pages);
269 static int sort_find_inbuf_size (int tot_buffers, int in_sections);
270 static char *sort_retrieve_longrec (THREAD_ENTRY * thread_p, RECDES * address, RECDES * memory);
271 static char **sort_run_sort (THREAD_ENTRY * thread_p, SORT_PARAM * sort_param, char **base, long limit,
272  long sort_numrecs, char **otherbase, long *srun_limit);
275 static void sort_run_flip (char **start, char **stop);
276 static void sort_run_find (char **source, long *top, SORT_STACK * st_p, long limit, SORT_CMP_FUNC * compare,
277  void *comp_arg, SORT_DUP_OPTION option);
278 static void sort_run_merge (char **low, char **high, SORT_STACK * st_p, SORT_CMP_FUNC * compare, void *comp_arg,
280 static int sort_run_flush (THREAD_ENTRY * thread_p, SORT_PARAM * sort_param, int out_curfile, int *cur_page,
281  char *output_buffer, char **index_area, int numrecs, int rec_type);
282 
284 #if defined(CUBRID_DEBUG)
285 static void sort_print_file_contents (const FILE_CONTENTS * file_contents);
286 #endif /* CUBRID_DEBUG */
287 
288 static void sort_spage_initialize (PAGE_PTR pgptr, INT16 slots_type, INT16 alignment);
289 
290 static INT16 sort_spage_get_numrecs (PAGE_PTR pgptr);
291 static INT16 sort_spage_insert (PAGE_PTR pgptr, RECDES * recdes);
292 static SCAN_CODE sort_spage_get_record (PAGE_PTR pgptr, INT16 slotid, RECDES * recdes, bool peek_p);
293 static int sort_spage_offsetcmp (const void *s1, const void *s2);
294 static int sort_spage_compact (PAGE_PTR pgptr);
295 static INT16 sort_spage_find_free (PAGE_PTR pgptr, SLOT ** sptr, INT16 length, INT16 type, INT16 * space);
296 #if defined(CUBRID_DEBUG)
297 static INT16 sort_spage_dump_sptr (SLOT * sptr, INT16 nslots, INT16 alignment);
298 static void sort_spage_dump_hdr (SLOTTED_PAGE_HEADER * sphdr);
299 static void sort_spage_dump (PAGE_PTR pgptr, int rec_p);
300 #endif /* CUBRID_DEBUG */
301 
302 static void sort_append (const void *pk0, const void *pk1);
303 
304 /*
305  * sort_spage_initialize () - Initialize a slotted page
306  * return: void
307  * pgptr(in): Pointer to slotted page
308  * slots_type(in): Flag which indicates the type of slots
309  * alignment(in): Type of alignment
310  *
311  * Note: A slotted page must be initialized before records are inserted on the
312  * page. The alignment indicates the valid offset where the records
313  * should be stored. This is a requirment for peeking records on pages
314  * according to their alignmnet restrictions.
315  */
316 static void
317 sort_spage_initialize (PAGE_PTR pgptr, INT16 slots_type, INT16 alignment)
318 {
319  SLOTTED_PAGE_HEADER *sphdr;
320 
321  sphdr = (SLOTTED_PAGE_HEADER *) pgptr;
322 
323  sphdr->nslots = 0;
324  sphdr->nrecs = 0;
325 
326 #if defined(CUBRID_DEBUG)
327  if (!spage_is_valid_anchor_type (slots_type))
328  {
329  (void) fprintf (stderr,
330  "sort_spage_initialize: **INTERFACE SYSTEM ERROR BAD value for slots_type = %d.\n"
331  " UNANCHORED_KEEP_SEQUENCE was assumed **\n", slots_type);
332  slots_type = UNANCHORED_KEEP_SEQUENCE;
333  }
334  if (!(alignment == CHAR_ALIGNMENT || alignment == SHORT_ALIGNMENT || alignment == INT_ALIGNMENT
335  || alignment == LONG_ALIGNMENT || alignment == FLOAT_ALIGNMENT || alignment == DOUBLE_ALIGNMENT))
336  {
337  (void) fprintf (stderr,
338  "sort_spage_initialize: **INTERFACE SYSTEM ERROR BAD value = %d"
339  " for alignment. %d was assumed\n. Alignment must be"
340  " either SIZEOF char, short, int, long, float, or double", alignment, MAX_ALIGNMENT);
341  alignment = MAX_ALIGNMENT;
342  }
343 #endif /* CUBRID_DEBUG */
344 
345  sphdr->anchor_flag = slots_type;
346  sphdr->tfree = DB_PAGESIZE - sizeof (SLOTTED_PAGE_HEADER);
347  sphdr->cfree = sphdr->tfree;
348  sphdr->foffset = sizeof (SLOTTED_PAGE_HEADER);
349  sphdr->alignment = alignment;
350  sphdr->waste_align = DB_WASTED_ALIGN (sphdr->foffset, alignment);
351 
352  if (sphdr->waste_align != 0)
353  {
354  sphdr->foffset += sphdr->waste_align;
355  sphdr->cfree -= sphdr->waste_align;
356  sphdr->tfree -= sphdr->waste_align;
357  }
358 }
359 
360 /*
361  * sort_spage_get_numrecs () - Return the total number of records on the slotted page
362  * return:
363  * pgptr(in): Pointer to slotted page
364  */
365 static INT16
367 {
368  SLOTTED_PAGE_HEADER *sphdr;
369 
370  sphdr = (SLOTTED_PAGE_HEADER *) pgptr;
371 
372  return sphdr->nrecs;
373 }
374 
375 /*
376  * sort_spage_offsetcmp () - Compare the location (offset) of slots
377  * return: sp1 - sp2
378  * sp1(in): slot 1
379  * sp2(in): slot 2
380  */
381 static int
382 sort_spage_offsetcmp (const void *sp1, const void *sp2)
383 {
384  SLOT *s1, *s2;
385  INT16 l1, l2;
386 
387  s1 = *(SLOT **) sp1;
388  s2 = *(SLOT **) sp2;
389 
390  l1 = s1->roffset;
391  l2 = s2->roffset;
392 
393  return (l1 < l2) ? -1 : (l1 == l2) ? 0 : 1;
394 }
395 
396 /*
397  * sort_spage_compact () - Compact an slotted page
398  * return: NO_ERROR
399  * pgptr(in): Pointer to slotted page
400  *
401  * Note: Only the records are compacted, the slots are not compacted.
402  */
403 static int
405 {
406  int i, j;
407  SLOTTED_PAGE_HEADER *sphdr;
408  SLOT *sptr;
409  SLOT **sortptr;
410  INT16 to_offset;
411  int ret = NO_ERROR;
412 
413  sphdr = (SLOTTED_PAGE_HEADER *) pgptr;
414 
415  sortptr = (SLOT **) calloc ((unsigned) sphdr->nrecs, sizeof (SLOT *));
416  if (sortptr == NULL)
417  {
420  }
421 
422  /* Populate the array to sort and then sort it */
423  sptr = (SLOT *) ((char *) pgptr + DB_PAGESIZE - sizeof (SLOT));
424  for (j = 0, i = 0; i < sphdr->nslots; sptr--, i++)
425  {
426  if (sptr->roffset != NULL_OFFSET)
427  {
428  sortptr[j++] = sptr;
429  }
430  }
431 
432  qsort ((void *) sortptr, (size_t) sphdr->nrecs, (size_t) sizeof (SLOT *), sort_spage_offsetcmp);
433 
434  to_offset = sizeof (SLOTTED_PAGE_HEADER);
435  for (i = 0; i < sphdr->nrecs; i++)
436  {
437  /* Make sure that the offset is aligned */
438  to_offset = DB_ALIGN (to_offset, sphdr->alignment);
439  if (to_offset == sortptr[i]->roffset)
440  {
441  /* Record slot is already in place */
442  to_offset += sortptr[i]->rlength;
443  }
444  else
445  {
446  /* Move the record */
447  memmove ((char *) pgptr + to_offset, (char *) pgptr + sortptr[i]->roffset, sortptr[i]->rlength);
448  sortptr[i]->roffset = to_offset;
449  to_offset += sortptr[i]->rlength;
450  }
451  }
452 
453  /* Make sure that the next inserted record will be aligned */
454  to_offset = DB_ALIGN (to_offset, sphdr->alignment);
455 
456  sphdr->cfree = sphdr->tfree = (DB_PAGESIZE - to_offset - sphdr->nslots * sizeof (SLOT));
457  sphdr->foffset = to_offset;
458  free_and_init (sortptr);
459 
460  /* The page is set dirty somewhere else */
461 
462  return ret;
463 }
464 
465 /*
466  * sort_spage_find_free () - Find a free area/slot where a record of the given length
467  * can be inserted onto the given slotted page
468  * return: A slot identifier or NULL_SLOTID
469  * pgptr(in): Pointer to slotted page
470  * sptr(out): Pointer to slotted page array pointer
471  * length(in): Length of area/record
472  * type(in): Type of record to be inserted
473  * space(out): Space used/defined
474  *
475  * Note: If there is not enough space on the page, an error condition is
476  * indicated and NULLSLOTID is returned.
477  */
478 static INT16
479 sort_spage_find_free (PAGE_PTR pgptr, SLOT ** sptr, INT16 length, INT16 type, INT16 * space)
480 {
481  SLOTTED_PAGE_HEADER *sphdr;
482  INT16 slotid;
483  INT16 waste;
484 
485  sphdr = (SLOTTED_PAGE_HEADER *) pgptr;
486 
487  /* Calculate the wasting space that this record will introduce. We need to take in consideration the wasting space
488  * when there is space saved */
489  waste = DB_WASTED_ALIGN (length, sphdr->alignment);
490  *space = length + waste;
491 
492  /* Quickly check for available space. We may need to check again if a slot is created (instead of reused) */
493  if (*space > sphdr->tfree)
494  {
495  *sptr = NULL;
496  *space = 0;
497  return NULL_SLOTID;
498  }
499 
500  /* Find a free slot. Try to reuse an unused slotid, instead of allocating a new one */
501 
502  *sptr = (SLOT *) ((char *) pgptr + DB_PAGESIZE - sizeof (SLOT));
503 
504  if (sphdr->nslots == sphdr->nrecs)
505  {
506  slotid = sphdr->nslots;
507  *sptr -= slotid;
508  }
509  else
510  {
511  for (slotid = 0; slotid < sphdr->nslots; (*sptr)--, slotid++)
512  {
513  if ((*sptr)->rtype == REC_HOME || (*sptr)->rtype == REC_BIGONE)
514  {
515  ; /* go ahead */
516  }
517  else
518  { /* impossible case */
519  assert (false);
520  break;
521  }
522  }
523  }
524 
525  /* Make sure that there is enough space for the record and the slot */
526 
527  assert (slotid <= sphdr->nslots);
528 
529  if (slotid >= sphdr->nslots)
530  {
531  *space += sizeof (SLOT);
532  /* We are allocating a new slotid. Check for available space again */
533  if (*space > sphdr->tfree)
534  {
535  *sptr = NULL;
536  *space = 0;
537  return NULL_SLOTID;
538  }
539  else if (*space > sphdr->cfree && sort_spage_compact (pgptr) != NO_ERROR)
540  {
541  *sptr = NULL;
542  *space = 0;
543  return NULL_SLOTID;
544  }
545  /* Adjust the number of slots */
546  sphdr->nslots++;
547  }
548  else
549  {
550  /* We already know that there is total space available since the slot is reused and the space was check above */
551  if (*space > sphdr->cfree && sort_spage_compact (pgptr) != NO_ERROR)
552  {
553  *sptr = NULL;
554  *space = 0;
555  return NULL_SLOTID;
556  }
557  }
558 
559  /* Now separate an empty area for the record */
560  (*sptr)->roffset = sphdr->foffset;
561  (*sptr)->rlength = length;
562  (*sptr)->rtype = type;
563 
564  /* Adjust the header */
565  sphdr->nrecs++;
566  sphdr->tfree -= *space;
567  sphdr->cfree -= *space;
568  sphdr->foffset += length + waste;
569  sphdr->waste_align += waste;
570 
571  /* The page is set dirty somewhere else */
572 
573  return slotid;
574 }
575 
576 /*
577  * sort_spage_insert () - Insert a record onto the given slotted page
578  * return: A slot identifier
579  * pgptr(in): Pointer to slotted page
580  * recdes(in): Pointer to a record descriptor
581  *
582  * Note: If the record does not fit on the page, an error condition is
583  * indicated and NULL_SLOTID is returned.
584  */
585 static INT16
587 {
588  SLOTTED_PAGE_HEADER *sphdr;
589  SLOT *sptr;
590  INT16 slotid;
591  INT16 used_space;
592 
593  if (recdes->length > SORT_MAXREC_LENGTH)
594  {
595  return NULL_SLOTID;
596  }
597 
598  assert (recdes->type == REC_HOME || recdes->type == REC_BIGONE);
599 
600  slotid = sort_spage_find_free (pgptr, &sptr, recdes->length, recdes->type, &used_space);
601  if (slotid != NULL_SLOTID)
602  {
603  /* Find the free slot and insert the record */
604  memcpy (((char *) pgptr + sptr->roffset), recdes->data, recdes->length);
605 
606  /* Indicate that we are spending our savings */
607  sphdr = (SLOTTED_PAGE_HEADER *) pgptr;
608  }
609 
610  return slotid;
611 }
612 
613 /*
614  * sort_spage_get_record () - Get specific record
615  * return: S_SUCCESS, S_DOESNT_FIT, S_DOESNT_EXIST
616  * pgptr(in): Pointer to slotted page
617  * slotid(in): Slot identifier of current record
618  * recdes(in): Pointer to a record descriptor
619  * peek_p(in): Indicates whether the record is going to be copied or peeked
620  *
621  * Note: When ispeeking is PEEK, the desired available record is peeked onto
622  * the page. The address of the record descriptor is set to the portion
623  * of the buffer where the record is stored. Peeking a record should be
624  * executed with caution since the slotted module may decide to move
625  * the record around. In general, no other operation should be executed
626  * on the page until the peeking of the record is done. The page should
627  * be fixed and locked to avoid any funny behavior. RECORD should NEVER
628  * be MODIFIED DIRECTLY. Only reads should be performed, otherwise
629  * header information and other records may be corrupted.
630  *
631  * When ispeeking is COPY, the desired available record is
632  * read onto the area pointed by the record descriptor. If the record
633  * does not fit in such an area, the length of the record is returned
634  * as a negative value in recdes->length and an error is indicated in the
635  * return value.
636  */
637 static SCAN_CODE
638 sort_spage_get_record (PAGE_PTR pgptr, INT16 slotid, RECDES * recdes, bool peek_p)
639 {
640  SLOTTED_PAGE_HEADER *sphdr;
641  SLOT *sptr;
642 
643  sphdr = (SLOTTED_PAGE_HEADER *) pgptr;
644 
645  sptr = (SLOT *) ((char *) pgptr + DB_PAGESIZE - sizeof (SLOT));
646 
647  sptr -= slotid;
648 
649  if (slotid < 0 || slotid >= sphdr->nslots || sptr->roffset == NULL_OFFSET)
650  {
651  recdes->length = 0;
652  return S_DOESNT_EXIST;
653  }
654 
655  /*
656  * If peeking, the address of the data in the descriptor is set to the
657  * address of the record in the buffer. Otherwise, the record is copied
658  * onto the area specified by the descriptor
659  */
660  if (peek_p == PEEK)
661  {
662  recdes->area_size = -1;
663  recdes->data = (char *) pgptr + sptr->roffset;
664  }
665  else
666  {
667  /* copy the record */
668 
669  if (sptr->rlength > recdes->area_size)
670  {
671  /*
672  * DOES NOT FIT
673  * Give a hint to the user of the needed length. Hint is given as a
674  * negative value
675  */
676  recdes->length = -sptr->rlength;
677  return S_DOESNT_FIT;
678  }
679 
680  memcpy (recdes->data, (char *) pgptr + sptr->roffset, sptr->rlength);
681  }
682 
683  recdes->length = sptr->rlength;
684  recdes->type = sptr->rtype;
685 
686  return S_SUCCESS;
687 }
688 
689 #if defined(CUBRID_DEBUG)
690 /*
691  * sort_spage_dump_sptr () - Dump the slotted page array
692  * return: Total length of records
693  * sptr(in): Pointer to slotted page pointer array
694  * nslots(in): Number of slots
695  * alignment(in): Alignment for records
696  *
697  * Note: The content of the record is not dumped by this function.
698  * This function is used for debugging purposes.
699  */
700 static INT16
701 sort_spage_dump_sptr (SLOT * sptr, INT16 nslots, INT16 alignment)
702 {
703  int i;
704  INT16 waste;
705  INT16 total_length_records = 0;
706 
707  for (i = 0; i < nslots; sptr--, i++)
708  {
709  assert (sptr->rtype == REC_HOME || sptr->rtype == REC_BIGONE);
710  (void) fprintf (stdout, "\nSlot-id = %2d, offset = %4d, type = %s", i, sptr->roffset,
711  ((sptr->rtype == REC_HOME) ? "HOME" : (sptr->rtype ==
712  REC_BIGONE) ? "BIGONE" : "UNKNOWN-BY-CURRENT-MODULE"));
713 
714  if (sptr->roffset != NULL_OFFSET)
715  {
716  total_length_records += sptr->rlength;
717  waste = DB_WASTED_ALIGN (sptr->rlength, alignment);
718  (void) fprintf (stdout, ", length = %4d, waste = %d", sptr->rlength, waste);
719  }
720  (void) fprintf (stdout, "\n");
721  }
722 
723  return total_length_records;
724 }
725 
726 /*
727  * sort_spage_dump_hdr () - Dump an slotted page header
728  * return: void
729  * sphdr(in): Pointer to header of slotted page
730  *
731  * Note: This function is used for debugging purposes.
732  */
733 static void
734 sort_spage_dump_hdr (SLOTTED_PAGE_HEADER * sphdr)
735 {
736  (void) fprintf (stdout, "NUM SLOTS = %d, NUM RECS = %d, TYPE OF SLOTS = %s,\n", sphdr->nslots, sphdr->nrecs,
738 
739  (void) fprintf (stdout, "ALIGNMENT-TO = %s, WASTED AREA FOR ALIGNMENT = %d,\n",
740  spage_alignment_string (sphdr->alignment), spage_waste_align);
741 
742  (void) fprintf (stdout, "TOTAL FREE AREA = %d, CONTIGUOUS FREE AREA = %d, FREE SPACE OFFSET = %d,\n", sphdr->tfree,
743  sphdr->cfree, sphdr->foffset);
744 }
745 
746 /*
747  * sort_spage_dump () - Dump an slotted page
748  * return: void
749  * pgptr(in): Pointer to slotted page
750  * rec_p(in): If true, records are printed in ascii format, otherwise, the
751  * records are not printed
752  *
753  * Note: The records are printed only when the value of rec_p is true.
754  * This function is used for debugging purposes.
755  */
756 static void
757 sort_spage_dump (PAGE_PTR pgptr, int rec_p)
758 {
759  int i, j;
760  SLOTTED_PAGE_HEADER *sphdr;
761  SLOT *sptr;
762  char *rec;
763  int used_length = 0;
764 
765  sphdr = (SLOTTED_PAGE_HEADER *) pgptr;
766 
767  sort_spage_dump_hdr (sphdr);
768 
769  sptr = (SLOT *) ((char *) pgptr + DB_PAGESIZE - sizeof (SLOT));
770 
771  used_length = (sizeof (SLOTTED_PAGE_HEADER) + sphdr->waste_align + sizeof (SLOT) * sphdr->nslots);
772  used_length += sort_spage_dump_sptr (sptr, sphdr->nslots, sphdr->alignment);
773 
774  if (rec_p)
775  {
776  (void) fprintf (stdout, "\nRecords in ascii follow ...\n");
777  for (i = 0; i < sphdr->nslots; sptr--, i++)
778  {
779  if (sptr->roffset != NULL_OFFSET)
780  {
781  (void) fprintf (stdout, "\nSlot-id = %2d\n", i);
782  rec = (char *) pgptr + sptr->roffset;
783 
784  for (j = 0; j < sptr->rlength; j++)
785  {
786  (void) fputc (*rec++, stdout);
787  }
788 
789  (void) fprintf (stdout, "\n");
790  }
791  else
792  {
793  (void) fprintf (stdout, "\nSlot-id = %2d has been deleted\n", i);
794  }
795  }
796  }
797 
798  if (used_length + sphdr->tfree > DB_PAGESIZE)
799  {
800  (void) fprintf (stdout,
801  "sort_spage_dump: Inconsistent page \n (Used_space + tfree > DB_PAGESIZE\n (%d + %d) > %d \n "
802  " %d > %d\n", used_length, sphdr->tfree, DB_PAGESIZE, used_length + sphdr->tfree, DB_PAGESIZE);
803  }
804 
805  if ((sphdr->cfree + sphdr->foffset + sizeof (SLOT) * sphdr->nslots) > DB_PAGESIZE)
806  {
807  (void) fprintf (stdout,
808  "sort_spage_dump: Inconsistent page\n (cfree + foffset + SIZEOF(SLOT) * nslots) > "
809  " DB_PAGESIZE\n (%d + %d + (%d * %d)) > %d\n %d > %d\n", sphdr->cfree, sphdr->foffset,
810  sizeof (SLOT), sphdr->nslots, DB_PAGESIZE,
811  (sphdr->cfree + sphdr->foffset + sizeof (SLOT) * sphdr->nslots), DB_PAGESIZE);
812  }
813 
814  if (sphdr->cfree <= -(sphdr->alignment - 1))
815  {
816  (void) fprintf (stdout, "sort_spage_dump: Cfree %d is inconsistent in page. Cannot be < -%d\n", sphdr->cfree,
817  sphdr->alignment);
818  }
819 }
820 #endif /* CUBRID_DEBUG */
821 
822 /* Sorting module functions */
823 
824 /*
825  * sort_run_flip () - Flip a run in place
826  * return: void
827  * start(in):
828  * stop(in):
829  *
830  * Note: Odd runs will have middle pointer undisturbed.
831  */
832 static void
833 sort_run_flip (char **start, char **stop)
834 {
835  char *temp;
836 
837  while (start < stop)
838  {
839  temp = *start;
840  *start = *stop;
841  *stop = temp;
842  start++;
843  stop--;
844  }
845 
846  return;
847 }
848 
849 /*
850  * sort_append () -
851  * return: void
852  * pk0(in):
853  * pk1(in):
854  */
855 static void
856 sort_append (const void *pk0, const void *pk1)
857 {
858  SORT_REC *node, *list;
859 
860  node = *(SORT_REC **) pk0;
861  list = *(SORT_REC **) pk1;
862 
863  while (list->next)
864  {
865  list = list->next;
866  }
867  list->next = node;
868 
869  return;
870 }
871 
872 /*
873  * sort_run_find () - Finds the longest ascending or descending run it can
874  * return:
875  * source(in):
876  * top(in):
877  * st_p(in):
878  * limit(in):
879  * compare(in):
880  * comp_arg(in):
881  * option(in):
882  *
883  * Note: Flip descending run, and assign RUN start and stop
884  */
885 static void
886 sort_run_find (char **source, long *top, SORT_STACK * st_p, long limit, SORT_CMP_FUNC * compare, void *comp_arg,
888 {
889  char **start;
890  char **stop;
891  char **next_stop;
892  char **limit_p;
893  SRUN *srun_p;
894 
895  char **dup;
896  char **non_dup;
897  int dup_num;
898  int cmp;
899  bool increasing_order;
900 
901  /* init new SRUN */
902  st_p->top++;
903  srun_p = &(st_p->srun[st_p->top]);
904  srun_p->tree_depth = 1;
905  srun_p->low_high = 'H';
906  srun_p->start = *top;
907 
908  if (*top >= (limit - 1))
909  {
910  /* degenerate run length 1. Must go ahead and compare with length 2, because we may need to flip them */
911  srun_p->stop = limit - 1;
912  *top = limit;
913 
914  return;
915  }
916 
917  start = &source[*top];
918  stop = start + 1;
919  next_stop = stop + 1;
920  limit_p = &source[limit];
921 
922  dup_num = 0;
923 
924  /* have a non-trivial run of length 2 or more */
925  cmp = (*compare) (start, stop, comp_arg);
926  if (cmp > 0)
927  {
928  increasing_order = false; /* mark as non-increasing order run */
929 
930  while (next_stop < limit_p && ((cmp = (*compare) (stop, next_stop, comp_arg)) >= 0))
931  {
932  /* mark duplicate as NULL */
933  SORT_CHECK_DUPLICATE (stop, next_stop); /* increase dup_num */
934 
935  stop = next_stop;
936  next_stop = next_stop + 1;
937  }
938  }
939  else
940  {
941  increasing_order = true; /* mark as increasing order run */
942 
943  /* mark duplicate as NULL */
944  SORT_CHECK_DUPLICATE (start, stop); /* increase dup_num */
945 
946  /* build increasing order run */
947  while (next_stop < limit_p && ((cmp = (*compare) (stop, next_stop, comp_arg)) <= 0))
948  {
949  /* mark duplicate as NULL */
950  SORT_CHECK_DUPLICATE (stop, next_stop); /* increase dup_num */
951 
952  stop = next_stop;
953  next_stop = next_stop + 1;
954  }
955  }
956 
957  /* eliminate duplicates; right-shift slots */
958  if (dup_num)
959  {
960  dup = stop - 1;
961  for (non_dup = dup - 1; non_dup >= start; dup--, non_dup--)
962  {
963  /* find duplicated value slot */
964  if (*dup == NULL)
965  {
966  /* find previous non-duplicated value slot */
967  for (; non_dup >= start; non_dup--)
968  {
969  /* move non-duplicated value slot to duplicated value slot */
970  if (*non_dup != NULL)
971  {
972  *dup = *non_dup;
973  *non_dup = NULL;
974  break;
975  }
976  }
977  }
978  }
979  }
980 
981  /* change non-increasing order run to increasing order run */
982  if (increasing_order != true)
983  {
984  sort_run_flip (start + dup_num, stop);
985  }
986 
987  *top += CAST_BUFLEN (stop - start); /* advance to last visited */
988  srun_p->start += dup_num;
989  srun_p->stop = *top;
990 
991  (*top)++; /* advance to next unvisited element */
992 
993  return;
994 }
995 
996 /*
997  * sort_run_merge () - Merges two runs from source to dest, updateing dest_top
998  * return:
999  * low(in):
1000  * high(in):
1001  * st_p(in):
1002  * compare(in):
1003  * comp_arg(in):
1004  * option(in):
1005  */
1006 static void
1007 sort_run_merge (char **low, char **high, SORT_STACK * st_p, SORT_CMP_FUNC * compare, void *comp_arg,
1009 {
1010  char dest_low_high;
1011  char **left_start, **right_start;
1012  char **left_stop, **right_stop;
1013  char **dest_ptr;
1014  SRUN *left_srun_p, *right_srun_p;
1015  int cmp;
1016  int dup_num;
1017 
1018  do
1019  {
1020  /* STEP 1: initialize */
1021  left_srun_p = &(st_p->srun[st_p->top - 1]);
1022  right_srun_p = &(st_p->srun[st_p->top]);
1023 
1024  left_srun_p->tree_depth++;
1025 
1026  if (left_srun_p->low_high == 'L')
1027  {
1028  left_start = &low[left_srun_p->start];
1029  left_stop = &low[left_srun_p->stop];
1030  }
1031  else
1032  {
1033  left_start = &high[left_srun_p->start];
1034  left_stop = &high[left_srun_p->stop];
1035  }
1036 
1037  if (right_srun_p->low_high == 'L')
1038  {
1039  right_start = &low[right_srun_p->start];
1040  right_stop = &low[right_srun_p->stop];
1041  }
1042  else
1043  {
1044  right_start = &high[right_srun_p->start];
1045  right_stop = &high[right_srun_p->stop];
1046  }
1047 
1048  dup_num = 0;
1049 
1050  /* STEP 2: check CON conditions srun follows ascending order. if (left_max < right_min) do FORWARD-CON. we use
1051  * '<' instead of '<=' */
1052  cmp = (*compare) (left_stop, right_start, comp_arg);
1053  if (cmp < 0)
1054  {
1055  /* con == TRUE */
1056  dest_low_high = right_srun_p->low_high;
1057 
1058  if (left_srun_p->low_high == right_srun_p->low_high && left_srun_p->stop + 1 == right_srun_p->start)
1059  {
1060  ;
1061  }
1062  else
1063  {
1064  /* move LEFT to RIGHT's current PART */
1065  if (right_srun_p->low_high == 'L')
1066  {
1067  dest_ptr = &low[right_srun_p->start - 1];
1068  }
1069  else
1070  {
1071  dest_ptr = &high[right_srun_p->start - 1];
1072  }
1073 
1074  while (left_stop >= left_start)
1075  {
1076  /* copy LEFT */
1077  *dest_ptr-- = *left_stop--;
1078  }
1079  }
1080  }
1081  else
1082  {
1083  /* con == FALSE do the actual merge; right-shift merge slots */
1084  if (right_srun_p->low_high == 'L')
1085  {
1086  dest_low_high = 'H';
1087  dest_ptr = &high[right_srun_p->stop];
1088  }
1089  else
1090  {
1091  dest_low_high = 'L';
1092  dest_ptr = &low[right_srun_p->stop];
1093  }
1094 
1095  while (left_stop >= left_start && right_stop >= right_start)
1096  {
1097  cmp = (*compare) (left_stop, right_stop, comp_arg);
1098  if (cmp == 0)
1099  {
1100  /* eliminate duplicate */
1101  if (option == SORT_DUP)
1102  {
1103  sort_append (left_stop, right_stop);
1104  }
1105  dup_num++;
1106 
1107  *dest_ptr-- = *right_stop--;
1108  left_stop--;
1109  }
1110  else if (cmp > 0)
1111  {
1112  *dest_ptr-- = *left_stop--;
1113  }
1114  else
1115  {
1116  *dest_ptr-- = *right_stop--;
1117  }
1118  }
1119 
1120  while (left_stop >= left_start)
1121  {
1122  /* copy the rest of LEFT */
1123  *dest_ptr-- = *left_stop--;
1124  }
1125  while (right_stop >= right_start)
1126  {
1127  /* copy the rest of RIGHT */
1128  *dest_ptr-- = *right_stop--;
1129  }
1130  }
1131 
1132  /* STEP 3: reconfig SORT_STACK */
1133  st_p->top--;
1134  left_srun_p->low_high = dest_low_high;
1135  left_srun_p->start = right_srun_p->start - (left_srun_p->stop - left_srun_p->start + 1) + dup_num;
1136  left_srun_p->stop = right_srun_p->stop;
1137 
1138  }
1139  while ((st_p->top >= 1) /* may need to merge */
1140  && (st_p->srun[st_p->top - 1].tree_depth == st_p->srun[st_p->top].tree_depth));
1141 
1142  return;
1143 }
1144 
1145 /*
1146  * sort_run_sort () - An implementation of a run-sort algorithm
1147  * return: pointer to the sorted area
1148  * thread_p(in):
1149  * sort_param(in): sort parameters
1150  * base(in): pointer to the element at the base of the table
1151  * limit(in): numrecs of before current sort
1152  * sort_numrecs(in): numrecs of after privious sort
1153  * otherbase(in): pointer to alternate area suffecient to store base-limit
1154  * srun_limit(in): numrecs of after current sort
1155  *
1156  * Note: This sorts files by successive merging of runs.
1157  *
1158  * This has the advantage of being liner on sorted or reversed data,
1159  * and being order N log base k N, where k is the average length of a
1160  * run. Note that k must be at least 2, so the worst case is N log2 N.
1161  *
1162  * Overall, since long runs are common, this beats the pants off
1163  * quick-sort.
1164  *
1165  * This could be sped up a bit by looking for N runs, and sorting these
1166  * into lists of concatennatable runs.
1167  */
1168 static char **
1169 sort_run_sort (THREAD_ENTRY * thread_p, SORT_PARAM * sort_param, char **base, long limit, long sort_numrecs,
1170  char **otherbase, long *srun_limit)
1171 {
1172  SORT_CMP_FUNC *compare;
1173  void *comp_arg;
1175  char **src, **dest, **result;
1176  SORT_STACK sr_stack, *st_p;
1177  long src_top = 0;
1178  int cnt;
1179 
1180  assert_release (limit == *srun_limit);
1181 
1182  /* exclude already sorted items */
1183  limit -= sort_numrecs;
1184 
1185  if (limit == 0 || (limit == 1 && sort_numrecs == 0))
1186  {
1187  return base;
1188  }
1189 
1190  /* init */
1191  compare = sort_param->cmp_fn;
1192  comp_arg = sort_param->cmp_arg;
1193  option = sort_param->option;
1194 
1195  src = base;
1196  dest = otherbase;
1197  result = NULL;
1198 
1199  st_p = &sr_stack;
1200  st_p->top = -1;
1201 
1202  cnt = (int) (log10 (ceil ((double) limit / 2.0)) / log10 (2.0)) + 2;
1203  if (sort_numrecs)
1204  {
1205  /* reserve space for already found srun */
1206  cnt += 1;
1207  }
1208 
1209  st_p->srun = (SRUN *) db_private_alloc (NULL, cnt * sizeof (SRUN));
1210  if (st_p->srun == NULL)
1211  {
1213  return NULL;
1214  }
1215 
1216  do
1217  {
1218  sort_run_find (src, &src_top, st_p, limit, compare, comp_arg, option);
1219  if (src_top < limit)
1220  {
1221  sort_run_find (src, &src_top, st_p, limit, compare, comp_arg, option);
1222  }
1223 
1224  while ((st_p->top >= 1) /* may need to merge */
1225  && ((src_top >= limit) /* case 1: final merge stage */
1226  || ((src_top < limit) /* case 2: non-final merge stage */
1227  && (st_p->srun[st_p->top - 1].tree_depth == st_p->srun[st_p->top].tree_depth))))
1228  {
1229  sort_run_merge (dest, src, st_p, compare, comp_arg, option);
1230  }
1231  }
1232  while (src_top < limit);
1233 
1234  if (sort_numrecs > 0)
1235  {
1236  /* finally, merge with already found srun */
1237  SRUN *srun_p;
1238 
1239  srun_p = &(st_p->srun[++(st_p->top)]);
1240  srun_p->tree_depth = 1;
1241  srun_p->low_high = 'H';
1242  srun_p->start = limit;
1243  srun_p->stop = limit + sort_numrecs - 1;
1244 
1245  sort_run_merge (dest, src, st_p, compare, comp_arg, option);
1246  }
1247 
1248 #if defined(CUBRID_DEBUG)
1249  if (limit != src_top)
1250  {
1251  printf ("Inconsistent sort test d), %ld %ld\n", limit, src_top);
1252  }
1253 #endif /* CUBRID_DEBUG */
1254 
1255  /* include already sorted items */
1256  limit += sort_numrecs;
1257 
1258  /* save limit of non-duplicated value slot */
1259  *srun_limit = limit - st_p->srun[0].start;
1260 
1261  /* move base pointer */
1262  result = base + st_p->srun[0].start;
1263 
1264  if (st_p->srun[0].low_high == 'L')
1265  {
1266  if (option == SORT_ELIM_DUP)
1267  {
1268  memcpy (result, otherbase + st_p->srun[0].start, (*srun_limit) * sizeof (char *));
1269  }
1270  else
1271  {
1272  result = otherbase + st_p->srun[0].start;
1273  }
1274  }
1275 
1276  assert (result != NULL);
1277 
1278 #if defined(CUBRID_DEBUG)
1279  src_top = 0;
1280  src = result;
1281  sort_run_find (src, &src_top, st_p, *srun_limit, compare, comp_arg, option);
1282  if (st_p->srun[st_p->top].stop != *srun_limit - 1)
1283  {
1284  printf ("Inconsistent sort, %ld %ld %ld\n", st_p->srun[st_p->top].start, st_p->srun[st_p->top].stop, *srun_limit);
1285  result = NULL;
1286  }
1287 #endif /* CUBRID_DEBUG */
1288 
1290 
1291 #if !defined(NDEBUG)
1292  if (sort_validate (result, *srun_limit, compare, comp_arg) != NO_ERROR)
1293  {
1294  result = NULL;
1295  }
1296 #endif
1297 
1298  return result;
1299 }
1300 
1301 /*
1302  * sort_listfile () - Perform sorting
1303  * return:
1304  * volid(in): volume to keep the temporary files
1305  * est_inp_pg_cnt(in): estimated number of input pages, or -1
1306  * get_fn(in): user-supplied function: provides the next sort item (or,
1307  * temporary record) every time it is called. This function
1308  * should put the next sort item to the area pointed by the
1309  * record descriptor "temp_recdes" and should return SORT_SUCCESS
1310  * to acknowledge that everything is fine. However, if there is
1311  * a problem it should return a corresponding code.
1312  * For example, if the sort item does not fit into the
1313  * "temp_recdes" area it should return SORT_REC_DOESNT_FIT; and
1314  * if there are no more sort items then it should return
1315  * SORT_NOMORE_RECS. In case of an error, it should return
1316  * SORT_ERROR_OCCURRED,in addition to setting the corresponding
1317  * error code. (Note that in this case, the sorting process
1318  * will be aborted.)
1319  * get_arg(in): arguments to the get_fn function
1320  * put_fn(in): user-supplied function: provides the output operation to be
1321  * applied on the sorted items. This function should process
1322  * (in any way it chooses so) the sort item passed via the
1323  * record descriptor "temp_recdes" and return NO_ERROR to be
1324  * called again with the next item on the sorted order. If it
1325  * returns any error code then the sorting process is aborted.
1326  * put_arg(in): arguments to the put_fn function
1327  * cmp_fn(in): user-supplied function for comparing records to be sorted.
1328  * This function is expected to follow the strcmp() protocol
1329  * for return results: -1 means the first arg precedes the
1330  * second, 1 means the second precedes the first, and 0 means
1331  * neither precedes the other.
1332  * cmp_arg(in): arguments to the cmp_fn function
1333  * option(in):
1334  * limit(in): optional arg, can represent the limit clause. If we only want
1335  * the top K elements of a processed list, it makes sense to use
1336  * special optimizations instead of sorting the entire list and
1337  * returning the first K elements.
1338  * Parameter: -1 if not to be used, > 0 if it should be taken
1339  * into account. Zero is a reserved value.
1340  * includes_tde_class(in): whether tde-configured class data is included or not,
1341  * it determines whehter internal temp files are
1342  * encrypted or not.
1343  */
1344 int
1345 sort_listfile (THREAD_ENTRY * thread_p, INT16 volid, int est_inp_pg_cnt, SORT_GET_FUNC * get_fn, void *get_arg,
1346  SORT_PUT_FUNC * put_fn, void *put_arg, SORT_CMP_FUNC * cmp_fn, void *cmp_arg, SORT_DUP_OPTION option,
1347  int limit, bool includes_tde_class)
1348 {
1349  int error = NO_ERROR;
1351  bool prm_enable_sort_parallel = false; /* TODO - PRM_SORT_PARALLEL_SORT */
1352  INT32 input_pages;
1353  int i;
1354  int file_pg_cnt_est;
1355  unsigned int total_numrecs = 0;
1356 #if defined(SERVER_MODE)
1357  int num_cpus;
1358  int rv;
1359 #endif /* SERVER_MODE */
1360 
1361  thread_set_sort_stats_active (thread_p, true);
1362 
1363 #if defined(ENABLE_SYSTEMTAP)
1364  CUBRID_SORT_START ();
1365 #endif /* ENABLE_SYSTEMTAP */
1366 
1367  sort_param = (SORT_PARAM *) malloc (sizeof (SORT_PARAM));
1368  if (sort_param == NULL)
1369  {
1370  error = ER_OUT_OF_VIRTUAL_MEMORY;
1371  er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, error, 1, sizeof (SORT_PARAM));
1372  return error;
1373  }
1374 
1375 #if defined(SERVER_MODE)
1376  rv = pthread_mutex_init (&(sort_param->px_mtx), NULL);
1377  if (rv != 0)
1378  {
1379  error = ER_CSS_PTHREAD_MUTEX_INIT;
1380  er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, error, 0);
1381 
1382  free_and_init (sort_param);
1383 
1384  return error;
1385  }
1386 #endif /* SERVER_MODE */
1387 
1388  sort_param->cmp_fn = cmp_fn;
1389  sort_param->cmp_arg = cmp_arg;
1390  sort_param->option = option;
1391 
1392  sort_param->put_fn = put_fn;
1393  sort_param->put_arg = put_arg;
1394 
1395  sort_param->limit = limit;
1396  sort_param->tot_tempfiles = 0;
1397 
1398  /* initialize memory allocable fields */
1399  for (i = 0; i < 2 * SORT_MAX_HALF_FILES; i++)
1400  {
1401  sort_param->temp[i].volid = NULL_VOLID;
1402  sort_param->file_contents[i].num_pages = NULL;
1403  }
1404  sort_param->internal_memory = NULL;
1405  sort_param->px_height_max = sort_param->px_array_size = 0;
1406  sort_param->px_array = NULL;
1407 
1408  /* initialize temp. overflow file. Real value will be assigned in sort_inphase_sort function, if long size sorting
1409  * records are encountered. */
1410  sort_param->multipage_file.volid = NULL_VOLID;
1411  sort_param->multipage_file.fileid = NULL_FILEID;
1412 
1413  /* NOTE: This volume list will not be used any more. */
1414  /* initialize temporary volume list */
1415  sort_param->vol_list.volid = volid;
1416  sort_param->vol_list.vol_ent_cnt = 0;
1417  sort_param->vol_list.vol_cnt = 0;
1418  sort_param->vol_list.vol_info = NULL;
1419 
1420  if (est_inp_pg_cnt > 0)
1421  {
1422  /* 10% of overhead and fragmentation */
1423  input_pages = est_inp_pg_cnt + MAX ((int) (est_inp_pg_cnt * 0.1), 2);
1424  }
1425  else
1426  {
1427  input_pages = prm_get_integer_value (PRM_ID_SR_NBUFFERS);
1428  }
1429 
1430  /* The size of a sort buffer is limited to PRM_SR_NBUFFERS. */
1431  sort_param->tot_buffers = MIN (prm_get_integer_value (PRM_ID_SR_NBUFFERS), input_pages);
1432  sort_param->tot_buffers = MAX (4, sort_param->tot_buffers);
1433 
1434  sort_param->internal_memory = (char *) malloc ((size_t) sort_param->tot_buffers * (size_t) DB_PAGESIZE);
1435  if (sort_param->internal_memory == NULL)
1436  {
1437  sort_param->tot_buffers = 4;
1438 
1439  sort_param->internal_memory = (char *) malloc (sort_param->tot_buffers * DB_PAGESIZE);
1440  if (sort_param->internal_memory == NULL)
1441  {
1442  error = ER_OUT_OF_VIRTUAL_MEMORY;
1443  er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, error, 1, (size_t) (sort_param->tot_buffers * DB_PAGESIZE));
1444  goto cleanup;
1445  }
1446  }
1447 
1448  sort_param->half_files = sort_get_num_half_tmpfiles (sort_param->tot_buffers, input_pages);
1449  sort_param->tot_tempfiles = sort_param->half_files << 1;
1450  sort_param->in_half = 0;
1451 
1452  for (i = 0; i < sort_param->tot_tempfiles; i++)
1453  {
1454  /* Initilize temporary file identifier; real value will be set in "sort_add_new_file () */
1455  sort_param->temp[i].volid = NULL_VOLID;
1456 
1457  /* Initilize file contents list */
1458  sort_param->file_contents[i].num_pages =
1459  (int *) db_private_alloc (thread_p, SORT_INITIAL_DYN_ARRAY_SIZE * sizeof (int));
1460  if (sort_param->file_contents[i].num_pages == NULL)
1461  {
1462  sort_param->tot_tempfiles = i;
1463  error = ER_OUT_OF_VIRTUAL_MEMORY;
1464  goto cleanup;
1465  }
1466 
1468  sort_param->file_contents[i].first_run = -1;
1469  sort_param->file_contents[i].last_run = -1;
1470  }
1471 
1472  /* Create only input temporary files make file and temporary volume page count estimates */
1473  sort_param->tmp_file_pgs = CEIL_PTVDIV (input_pages, sort_param->half_files);
1474  sort_param->tmp_file_pgs = MAX (1, sort_param->tmp_file_pgs);
1475 
1476  sort_param->tde_encrypted = includes_tde_class;
1477 
1478  sort_param->px_height_max = 0; /* init */
1479  sort_param->px_array_size = 1; /* init */
1480 
1481 #if 1 /* TODO - currently, disable parallelism */
1482  prm_enable_sort_parallel = false;
1483 #endif
1484 
1485 #if !defined(NDEBUG)
1486  er_log_debug (ARG_FILE_LINE, "TDE: sort_listfile(): tde_encrypted = %d\n", sort_param->tde_encrypted);
1487 #endif /* !NDEBUG */
1488 
1489 #if defined(SERVER_MODE)
1490  if (prm_enable_sort_parallel == true)
1491  {
1492  /* TODO - calc n, 2^^n get the number of CPUs TODO - fileio_os_sysconf really get #cores instead of #CPUs -
1493  * NEED MORE CONSIDERATION */
1494  num_cpus = fileio_os_sysconf ();
1495 
1496  sort_param->px_height_max = (int) sqrt ((double) num_cpus); /* n */
1497  sort_param->px_array_size = num_cpus; /* 2^^n */
1498 
1499  assert_release (sort_param->px_array_size == pow ((double) 2, (double) sort_param->px_height_max));
1500  }
1501 #endif /* SERVER_MODE */
1502 
1503  sort_param->px_array = (PX_TREE_NODE *) malloc (sort_param->px_array_size * sizeof (PX_TREE_NODE));
1504  if (sort_param->px_array == NULL)
1505  {
1506  error = ER_OUT_OF_VIRTUAL_MEMORY;
1507  er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, error, 1, (sort_param->px_array_size * sizeof (PX_TREE_NODE)));
1508  goto cleanup;
1509  }
1510 
1511  /*
1512  * Don't allocate any temp files yet, since we may not need them.
1513  * We'll allocate them on the fly as the need arises.
1514  *
1515  * However, indicate to file/disk manager of the approximate temporary
1516  * space that is going to be needed.
1517  */
1518 
1519  error = sort_inphase_sort (thread_p, sort_param, get_fn, get_arg, &total_numrecs);
1520  if (error != NO_ERROR)
1521  {
1522  goto cleanup;
1523  }
1524 
1525  if (sort_param->tot_runs > 1)
1526  {
1527  /* Create output temporary files make file and temporary volume page count estimates */
1528  file_pg_cnt_est = sort_get_avg_numpages_of_nonempty_tmpfile (sort_param);
1529  file_pg_cnt_est = MAX (1, file_pg_cnt_est);
1530 
1531  for (i = sort_param->half_files; i < sort_param->tot_tempfiles; i++)
1532  {
1533  error =
1534  sort_add_new_file (thread_p, &(sort_param->temp[i]), file_pg_cnt_est, true, sort_param->tde_encrypted);
1535  if (error != NO_ERROR)
1536  {
1537  goto cleanup;
1538  }
1539  }
1540 
1541  if (sort_param->option == SORT_ELIM_DUP)
1542  {
1543  error = sort_exphase_merge_elim_dup (thread_p, sort_param);
1544  }
1545  else
1546  {
1547  /* SORT_DUP */
1548  error = sort_exphase_merge (thread_p, sort_param);
1549  }
1550  } /* if (sort_param->tot_runs > 1) */
1551 
1552 cleanup:
1553 
1554 #if defined(ENABLE_SYSTEMTAP)
1555  CUBRID_SORT_END (total_numrecs, error);
1556 #endif /* ENABLE_SYSTEMTAP */
1557 
1558  sort_return_used_resources (thread_p, sort_param);
1559  sort_param = NULL;
1560  thread_set_sort_stats_active (thread_p, false);
1561 
1562  return error;
1563 }
1564 
1565 #if !defined(NDEBUG)
1566 /*
1567  * sort_validate() -
1568  * return:
1569  * vector(in):
1570  * size(in):
1571  * compare(in):
1572  * comp_arg(in):
1573  *
1574  * NOTE: debugging function
1575  */
1576 static int
1577 sort_validate (char **vector, long size, SORT_CMP_FUNC * compare, void *comp_arg)
1578 {
1579  long k;
1580  int cmp;
1581 
1582  assert (vector != NULL);
1583  assert (size >= 1);
1584  assert (compare != NULL);
1585 
1586 #if 1 /* TODO - for QAF, do not delete me */
1587  return NO_ERROR;
1588 #endif
1589 
1590  for (k = 0; k < size - 1; k++)
1591  {
1592  cmp = (*compare) (&(vector[k]), &(vector[k + 1]), comp_arg);
1593  if (cmp != DB_LT)
1594  {
1595  assert (false);
1596  return ER_FAILED;
1597  }
1598  }
1599 
1600  return NO_ERROR;
1601 }
1602 #endif
1603 
1604 /*
1605  * px_sort_assign() -
1606  * return:
1607  * thread_p(in):
1608  * sort_param(in): sort parameters
1609  * px_id(in):
1610  * px_buff(in):
1611  * px_vector(in):
1612  * px_vector_size(in):
1613  * px_height(in):
1614  * px_myself(in):
1615  *
1616  * NOTE: support parallelism
1617  */
1618 static PX_TREE_NODE *
1619 px_sort_assign (THREAD_ENTRY * thread_p, SORT_PARAM * sort_param, int px_id, char **px_buff, char **px_vector,
1620  long px_vector_size, int px_height, int px_myself)
1621 {
1622  PX_TREE_NODE *px_node;
1623 #if defined(SERVER_MODE)
1624  int rv = NO_ERROR;
1625 #endif
1626 
1627  assert (sort_param != NULL);
1628 
1629  if (px_height < 0 || px_height > sort_param->px_height_max)
1630  {
1631  assert_release (false);
1632  return NULL;
1633  }
1634 
1635  if (px_id < 0 || px_id >= sort_param->px_array_size)
1636  {
1637  assert_release (false);
1638  return NULL;
1639  }
1640 
1641  if (px_myself < 0 || px_myself > px_id)
1642  {
1643  assert_release (false);
1644  return NULL;
1645  }
1646 
1647  px_node = &(sort_param->px_array[px_id]);
1648 
1649 #if defined(SERVER_MODE)
1650  rv = pthread_mutex_lock (&(sort_param->px_mtx));
1651  assert (rv == NO_ERROR);
1652 
1653 #if !defined(NDEBUG)
1654  if (px_node->px_status != 0)
1655  {
1656  assert (false);
1657  pthread_mutex_unlock (&(sort_param->px_mtx));
1658  return NULL;
1659  }
1660 #endif
1661 
1662  px_node->px_status = 0;
1663 
1664  pthread_mutex_unlock (&(sort_param->px_mtx));
1665 #else /* SERVER_MODE */
1666  assert (sort_param->px_height_max == 0);
1667  assert (sort_param->px_array_size == 1);
1668 
1669  assert (px_id == 0);
1670  assert (px_height == 0);
1671  assert (px_myself == 0);
1672 #endif /* SERVER_MODE */
1673 
1674  /* set node info */
1675 
1676  px_node->px_id = px_id;
1677 
1678  px_node->px_height = px_height;
1679  px_node->px_myself = px_myself;
1680 
1681  px_node->px_tran_index = LOG_FIND_THREAD_TRAN_INDEX (thread_p);
1682 
1683  /* set operation info */
1684 
1685  px_node->px_arg = (void *) sort_param;
1686 
1687  px_node->px_buff = px_buff;
1688  px_node->px_vector = px_vector;
1689  px_node->px_vector_size = px_vector_size;
1690 
1691  px_node->px_result = px_node->px_vector; /* init */
1692  px_node->px_result_size = px_node->px_vector_size; /* init */
1693 
1694  return px_node;
1695 }
1696 
1697 #if defined(SERVER_MODE)
1698 // *INDENT-OFF*
1699 static void
1700 px_sort_myself_execute (cubthread::entry &thread_ref, PX_TREE_NODE * px_node)
1701 {
1702  (void) px_sort_myself (&thread_ref, px_node);
1703 }
1704 
1705 /*
1706  * px_sort_communicate() -
1707  * return:
1708  * thread_p(in):
1709  * px_node(in):
1710  *
1711  * NOTE: support parallelism
1712  */
1713 static int
1714 px_sort_communicate (PX_TREE_NODE * px_node)
1715 {
1717 
1718  assert_release (px_node != NULL);
1719  assert_release (px_node->px_arg != NULL);
1720 
1721  sort_param = (SORT_PARAM *) (px_node->px_arg);
1722  assert_release (px_node->px_height <= sort_param->px_height_max);
1723  assert_release (px_node->px_id < sort_param->px_array_size);
1724  assert_release (px_node->px_vector_size > 1);
1725 
1727  new cubthread::entry_callable_task (std::bind (px_sort_myself_execute, std::placeholders::_1, px_node));
1729 
1730  return NO_ERROR;
1731 }
1732 // *INDENT-ON*
1733 #endif /* SERVER_MODE */
1734 
1735 /*
1736  * px_sort_myself() -
1737  * return:
1738  * thread_p(in):
1739  * px_node(in):
1740  *
1741  * NOTE: support parallelism
1742  *
1743  * Partitioned merge logic
1744  *
1745  * The working core: each internal node recurses on this function
1746  * both for its left side and its right side, as nodes one closer to
1747  * the leaf level. It then merges the results into the vector
1748  *
1749  * Leaf level nodes just sort the vector.
1750  */
1751 static int
1753 {
1754 #define SORT_PARTITION_RUN_SIZE_MIN (ONE_M)
1755 
1756  int ret = NO_ERROR;
1757  bool old_check_interrupt;
1758 
1759 #if defined(SERVER_MODE)
1760  int parent;
1761  int child_right = 0;
1762  int child_height;
1763 
1764  int cmp;
1765 
1766  int rv = NO_ERROR;
1767 #endif /* SERVER_MODE */
1768 
1770 
1771  char **buff;
1772  char **vector;
1773  long vector_size;
1774 
1775  char **result = NULL;
1776  long result_size = -1;
1777 
1778  assert_release (px_node != NULL);
1779  assert_release (px_node->px_id >= 0);
1780  assert_release (px_node->px_arg != NULL);
1781 
1782  if (thread_p == NULL)
1783  {
1784  thread_p = thread_get_thread_entry_info ();
1785  }
1786 
1787  sort_param = (SORT_PARAM *) (px_node->px_arg);
1788 
1789 #if defined(SERVER_MODE)
1790  if (px_node->px_id > 0)
1791  {
1792  /* is new childs */
1793  thread_p->tran_index = px_node->px_tran_index;
1794  pthread_mutex_unlock (&thread_p->tran_index_lock);
1795  }
1796 
1797 #if !defined(NDEBUG)
1798  rv = pthread_mutex_lock (&(sort_param->px_mtx));
1799  assert (rv == NO_ERROR);
1800 
1801  assert (px_node->px_status == 0);
1802 
1803  pthread_mutex_unlock (&(sort_param->px_mtx));
1804 #endif
1805 #endif /* SERVER_MODE */
1806 
1807  old_check_interrupt = logtb_set_check_interrupt (thread_p, false);
1808 
1809  buff = px_node->px_buff;
1810  vector = px_node->px_vector;
1811  vector_size = px_node->px_vector_size;
1812 
1813  assert_release (px_node->px_result == vector);
1814  assert_release (px_node->px_result_size == vector_size);
1815 
1816  result = px_node->px_result;
1817  result_size = px_node->px_result_size;
1818 
1819  assert_release (vector_size > 0);
1820 
1821 #if defined(SERVER_MODE)
1822  parent = px_node->px_id & ~(1 << px_node->px_height);
1823  child_height = px_node->px_height - 1;
1824  if (child_height >= 0)
1825  {
1826  child_right = px_node->px_myself | (1 << child_height);
1827  assert_release (child_right > 0);
1828  }
1829 
1830  if (vector_size <= 1)
1831  {
1832  assert_release (px_node->px_result == vector);
1833  assert_release (px_node->px_result_size == vector_size);
1834 
1835  result = px_node->px_result = vector;
1836  result_size = px_node->px_result_size = vector_size;
1837 
1838  /* mark as finished */
1839 
1840  rv = pthread_mutex_lock (&(sort_param->px_mtx));
1841  assert (rv == NO_ERROR);
1842 
1843  assert_release (px_node->px_status == 0);
1844  px_node->px_status = 1; /* done */
1845 
1846  pthread_mutex_unlock (&(sort_param->px_mtx));
1847 
1848  goto exit_on_end;
1849  }
1850 
1851  if (px_node->px_height > 0 && vector_size > SORT_PARTITION_RUN_SIZE_MIN)
1852  {
1853  long left_vector_size, right_vector_size;
1854  char **left_vector, **right_vector;
1855  PX_TREE_NODE *left_px_node, *right_px_node;
1856  int i, j, k; /* Used in the merge logic */
1857 
1858  assert_release (child_right > 0);
1859 
1860  left_vector_size = vector_size / 2;
1861  right_vector_size = vector_size - left_vector_size;
1862 
1863  assert_release (vector_size == left_vector_size + right_vector_size);
1864 
1865  /* do new child first */
1866  right_vector = vector + left_vector_size;
1867  right_px_node = px_sort_assign (thread_p, sort_param, px_node->px_id + child_right, buff + left_vector_size,
1868  right_vector, right_vector_size, child_height, 0 /* px_myself: set as root */ );
1869  if (right_px_node == NULL)
1870  {
1871  goto exit_on_error;
1872  }
1873 
1874  if (right_vector_size > 1)
1875  {
1876  /* launch new worker */
1877  if (px_sort_communicate (right_px_node) != NO_ERROR)
1878  {
1879  goto exit_on_error;
1880  }
1881  }
1882  else
1883  {
1884  /* mark as finished */
1885 
1886  rv = pthread_mutex_lock (&(sort_param->px_mtx));
1887  assert (rv == NO_ERROR);
1888 
1889  assert_release (right_px_node->px_status == 0);
1890  right_px_node->px_status = 1; /* done */
1891 
1892  pthread_mutex_unlock (&(sort_param->px_mtx));
1893  }
1894 
1895  left_vector = vector;
1896  left_px_node =
1897  px_sort_assign (thread_p, sort_param, px_node->px_id, buff, left_vector, left_vector_size, child_height,
1898  px_node->px_myself);
1899  if (left_px_node == NULL)
1900  {
1901  goto exit_on_error;
1902  }
1903 
1904  assert_release (px_node == left_px_node);
1905 #if !defined(NDEBUG)
1906  rv = pthread_mutex_lock (&(sort_param->px_mtx));
1907  assert (rv == NO_ERROR);
1908 
1909  assert (px_node->px_status == 0);
1910 
1911  pthread_mutex_unlock (&(sort_param->px_mtx));
1912 #endif
1913 
1914  if (left_vector_size > 1)
1915  {
1916  if (px_sort_myself (thread_p, left_px_node) != NO_ERROR)
1917  {
1918  goto exit_on_error;
1919  }
1920  }
1921 
1922  /* wait for right-child finished */
1923  do
1924  {
1925  rv = pthread_mutex_lock (&(sort_param->px_mtx));
1926  assert (rv == NO_ERROR);
1927 
1928  if (right_px_node->px_status != 0)
1929  {
1930  assert (right_px_node->px_status == 1);
1931  pthread_mutex_unlock (&(sort_param->px_mtx));
1932  break;
1933  }
1934 
1935  pthread_mutex_unlock (&(sort_param->px_mtx));
1936 
1937 #if 1 /* TODO */
1938  thread_sleep (10); /* 10 msec */
1939 #endif
1940  }
1941  while (1);
1942 
1943  assert_release (px_node == left_px_node);
1944 #if !defined(NDEBUG)
1945  rv = pthread_mutex_lock (&(sort_param->px_mtx));
1946  assert (rv == NO_ERROR);
1947 
1948  assert (right_px_node->px_status == 1);
1949  assert (px_node->px_status == 0);
1950 
1951  pthread_mutex_unlock (&(sort_param->px_mtx));
1952 #endif
1953 
1954  right_vector = right_px_node->px_result;
1955  right_vector_size = right_px_node->px_result_size;
1956  if (right_vector == NULL || right_vector_size < 0)
1957  {
1958  goto exit_on_error;
1959  }
1960 
1961  left_vector = left_px_node->px_result;
1962  left_vector_size = left_px_node->px_result_size;
1963  if (left_vector == NULL || left_vector_size < 0)
1964  {
1965  goto exit_on_error;
1966  }
1967 
1968  assert_release (vector_size >= left_vector_size + right_vector_size);
1969 
1970  result_size = px_node->px_result_size = left_vector_size + right_vector_size;
1971  if (left_vector < vector)
1972  {
1973  assert_release (left_vector + left_vector_size <= vector);
1974  result = px_node->px_result = vector;
1975  }
1976  else
1977  {
1978  result = px_node->px_result = buff;
1979  }
1980 
1981  /* Merge the two sub-results back into upper result */
1982 
1983  i = j = k = 0;
1984 
1985 #if 1
1986  /* STEP 2: check CON conditions if (left_max < right_min) do FORWARD-CON. we use '<' instead of '<=' */
1987  cmp = (*(sort_param->cmp_fn)) (&(left_vector[left_vector_size - 1]), &(right_vector[0]), sort_param->cmp_arg);
1988 
1989  if (cmp == DB_LT || cmp == DB_EQ || cmp == DB_GT)
1990  {
1991  ; /* ok */
1992  }
1993  else
1994  {
1995  assert_release (cmp == DB_LT || cmp == DB_EQ || cmp == DB_GT);
1996  goto exit_on_error;
1997  }
1998 
1999  if (cmp == DB_LT)
2000  {
2001  while (i < left_vector_size)
2002  {
2003  result[k++] = left_vector[i++];
2004  }
2005  while (j < right_vector_size)
2006  {
2007  result[k++] = right_vector[j++];
2008  }
2009  }
2010  else
2011  {
2012  /* STEP 3: check CON conditions if (right_max < left_min) do BACKWARD-CON. we use '<' instead of '<=' */
2013  cmp =
2014  (*(sort_param->cmp_fn)) (&(right_vector[right_vector_size - 1]), &(left_vector[0]), sort_param->cmp_arg);
2015 
2016  if (cmp == DB_LT || cmp == DB_EQ || cmp == DB_GT)
2017  {
2018  ; /* ok */
2019  }
2020  else
2021  {
2022  assert_release (cmp == DB_LT || cmp == DB_EQ || cmp == DB_GT);
2023  goto exit_on_error;
2024  }
2025 
2026  if (cmp == DB_LT)
2027  {
2028  while (j < right_vector_size)
2029  {
2030  result[k++] = right_vector[j++];
2031  }
2032  while (i < left_vector_size)
2033  {
2034  result[k++] = left_vector[i++];
2035  }
2036  }
2037  else
2038  {
2039 #endif
2040  /* STEP 4: do the actual merge */
2041  while (i < left_vector_size && j < right_vector_size)
2042  {
2043  cmp = (*(sort_param->cmp_fn)) (&(left_vector[i]), &(right_vector[j]), sort_param->cmp_arg);
2044 
2045  if (cmp == DB_LT || cmp == DB_EQ || cmp == DB_GT)
2046  {
2047  ; /* ok */
2048  }
2049  else
2050  {
2051  assert_release (cmp == DB_LT || cmp == DB_EQ || cmp == DB_GT);
2052  goto exit_on_error;
2053  }
2054 
2055  if (cmp == DB_EQ)
2056  {
2057  if (sort_param->option == SORT_DUP) /* allow duplicate */
2058  {
2059  sort_append (&(left_vector[i]), &(right_vector[j]));
2060  }
2061 
2062  result[k++] = right_vector[j++]; /* temp */
2063  i++; /* skip left-side dup */
2064  }
2065  else if (cmp == DB_GT)
2066  {
2067  result[k++] = right_vector[j++];
2068  }
2069  else
2070  {
2071  assert_release (cmp == DB_LT);
2072  result[k++] = left_vector[i++];
2073  }
2074  }
2075  while (i < left_vector_size)
2076  {
2077  result[k++] = left_vector[i++];
2078  }
2079  while (j < right_vector_size)
2080  {
2081  result[k++] = right_vector[j++];
2082  }
2083 #if 1
2084  } /* else */
2085  } /* else */
2086 #endif
2087 
2088  assert_release (result_size >= k);
2089 
2090  result_size = px_node->px_result_size = k;
2091 
2092 #if !defined(NDEBUG)
2093  if (sort_validate (result, result_size, sort_param->cmp_fn, sort_param->cmp_arg) != NO_ERROR)
2094  {
2095  goto exit_on_error;
2096  }
2097 #endif
2098  }
2099  else
2100  {
2101  result = px_node->px_result = sort_run_sort (thread_p, sort_param, vector, vector_size, 0 /* dummy */ ,
2102  buff, &(px_node->px_result_size));
2103  result_size = px_node->px_result_size;
2104  }
2105 
2106 #else /* SERVER_MODE */
2107 
2108  result = px_node->px_result = sort_run_sort (thread_p, sort_param, vector, vector_size, 0 /* dummy */ ,
2109  buff, &(px_node->px_result_size));
2110  result_size = px_node->px_result_size;
2111 
2112 #endif /* SERVER_MODE */
2113 
2114  if (result == NULL || result_size < 0)
2115  {
2116  assert_release (false);
2117  goto exit_on_error;
2118  }
2119 
2120 exit_on_end:
2121 
2122  assert_release (result == px_node->px_result);
2123  assert_release (result_size == px_node->px_result_size);
2124 
2125 #if defined(SERVER_MODE)
2126  if (parent != px_node->px_id)
2127  {
2128  /* mark as finished */
2129 
2130  rv = pthread_mutex_lock (&(sort_param->px_mtx));
2131  assert (rv == NO_ERROR);
2132 
2133  assert_release (px_node->px_status == 0);
2134  px_node->px_status = 1; /* done */
2135 
2136  pthread_mutex_unlock (&(sort_param->px_mtx));
2137  }
2138 #endif /* SERVER_MODE */
2139 
2140  (void) logtb_set_check_interrupt (thread_p, old_check_interrupt);
2141 
2142  return ret;
2143 
2144 exit_on_error:
2145 
2146  result = px_node->px_result = NULL;
2147  result_size = px_node->px_result_size = -1;
2148 
2149  ret = (ret == NO_ERROR && (ret = er_errid ()) == NO_ERROR) ? ER_FAILED : ret;
2150 
2151  goto exit_on_end;
2152 }
2153 
2154 /*
2155  * sort_inphase_sort () - Internal sorting phase
2156  * return:
2157  * sort_param(in): sort parameters
2158  * get_fn(in): user-supplied function: provides the temporary record for
2159  * the given input record
2160  * get_arg(in): arguments for get_fn
2161  * total_numrecs(out): records sorted
2162  *
2163  */
2164 static int
2165 sort_inphase_sort (THREAD_ENTRY * thread_p, SORT_PARAM * sort_param, SORT_GET_FUNC * get_fn, void *get_arg,
2166  unsigned int *total_numrecs)
2167 {
2168  /* Variables for the input file */
2169  SORT_STATUS status;
2170 
2171  /* Variables for the current output file */
2172  int out_curfile;
2173  char *output_buffer;
2174  int cur_page[SORT_MAX_HALF_FILES];
2175 
2176  /* Variables for the internal memory */
2177  RECDES temp_recdes;
2178  RECDES long_recdes; /* Record desc. for reading in long sorting records */
2179  char *item_ptr; /* Pointer to the first free location of the temp. records region of internal memory */
2180  long numrecs; /* Number of records kept in the internal memory */
2181  long sort_numrecs; /* Number of sort records kept in the internal memory */
2182  bool once_flushed = false;
2183  long saved_numrecs;
2184  char **saved_index_area;
2185  char **index_area; /* Part of internal memory keeping the addresses of records */
2186  char **index_buff; /* buffer area to sort indexes. */
2187  int i;
2188  int error = NO_ERROR;
2189 
2190  PX_TREE_NODE *px_node;
2191 #if defined (SERVER_MODE)
2192  int rv = NO_ERROR;
2193 #endif /* SERVER_MODE */
2194 
2195  assert (sort_param->half_files <= SORT_MAX_HALF_FILES);
2196 
2197  assert (sort_param->px_height_max >= 0);
2198  assert (sort_param->px_array_size >= 1);
2199 
2200  /* Initialize the current pages of all temp files to 0 */
2201  for (i = 0; i < sort_param->half_files; i++)
2202  {
2203  cur_page[i] = 0;
2204  }
2205 
2206  sort_param->tot_runs = 0;
2207  out_curfile = sort_param->in_half;
2208 
2209  output_buffer = sort_param->internal_memory + (sort_param->tot_buffers - 1) * DB_PAGESIZE;
2210 
2211  numrecs = 0;
2212  sort_numrecs = 0;
2213  saved_numrecs = 0;
2214  *total_numrecs = 0;
2215  saved_index_area = NULL;
2216  item_ptr = sort_param->internal_memory + SORT_RECORD_LENGTH_SIZE;
2217  index_area = (char **) (output_buffer - sizeof (char *));
2218  index_buff = index_area - 1;
2219  temp_recdes.area_size = SORT_MAXREC_LENGTH;
2220  temp_recdes.length = 0;
2221 
2222  long_recdes.area_size = 0;
2223  long_recdes.data = NULL;
2224 
2225  for (;;)
2226  {
2227  if ((char *) index_buff < item_ptr)
2228  {
2229  /* Internal memory is already full */
2230  status = SORT_REC_DOESNT_FIT;
2231  }
2232  else
2233  {
2234  /* Internal memory is not full; try to get the next item */
2235  temp_recdes.data = item_ptr;
2236  if (((int) ((char *) index_buff - item_ptr)) < SORT_MAXREC_LENGTH)
2237  {
2238  temp_recdes.area_size = (int) ((char *) index_buff - item_ptr) - (4 * sizeof (char *));
2239  }
2240 
2241  if (temp_recdes.area_size <= SSIZEOF (SORT_REC))
2242  {
2243  /* internal memory is not enough */
2244  status = SORT_REC_DOESNT_FIT;
2245  }
2246  else
2247  {
2248  status = (*get_fn) (thread_p, &temp_recdes, get_arg);
2249  /* There are no more input records; So, break the loop */
2250  if (status == SORT_NOMORE_RECS)
2251  {
2252  break;
2253  }
2254  }
2255  }
2256 
2257  switch (status)
2258  {
2259  case SORT_ERROR_OCCURRED:
2260  error = er_errid ();
2261  assert (error != NO_ERROR);
2262  goto exit_on_error;
2263 
2264  case SORT_REC_DOESNT_FIT:
2265  if (numrecs > 0)
2266  {
2267  /* Perform internal sorting and flush the run */
2268 
2269  index_area++;
2270 
2271  if (sort_numrecs == 0)
2272  {
2273  assert (sort_param->px_height_max >= 0);
2274  assert (sort_param->px_array_size >= 1);
2275 #if defined(SERVER_MODE)
2276  rv = pthread_mutex_lock (&(sort_param->px_mtx));
2277  assert (rv == NO_ERROR);
2278 
2279  for (i = 0; i < sort_param->px_array_size; i++)
2280  {
2281  sort_param->px_array[i].px_status = 0; /* init */
2282  }
2283 
2284  pthread_mutex_unlock (&(sort_param->px_mtx));
2285 #endif /* SERVER_MODE */
2286 
2287  px_node = px_sort_assign (thread_p, sort_param, 0, index_buff, index_area, numrecs,
2288  sort_param->px_height_max, 0 /* px_myself: set as root */ );
2289  if (px_node == NULL)
2290  {
2291  error = ER_FAILED;
2292  goto exit_on_error;
2293  }
2294 
2295  error = px_sort_myself (thread_p, px_node);
2296  if (error != NO_ERROR)
2297  {
2298  goto exit_on_error;
2299  }
2300 
2301  index_area = px_node->px_result;
2302  numrecs = px_node->px_result_size;
2303  *total_numrecs += numrecs;
2304  }
2305  else
2306  {
2307  index_area =
2308  sort_run_sort (thread_p, sort_param, index_area, numrecs, sort_numrecs, index_buff, &numrecs);
2309  *total_numrecs += numrecs;
2310  }
2311 
2312  if (index_area == NULL || numrecs < 0)
2313  {
2314  error = ER_FAILED;
2315  goto exit_on_error;
2316  }
2317 
2318  if (sort_param->option == SORT_ELIM_DUP)
2319  {
2320  /* reset item_ptr */
2321  item_ptr = index_area[0];
2322  for (i = 1; i < numrecs; i++)
2323  {
2324  if (index_area[i] > item_ptr)
2325  {
2326  item_ptr = index_area[i];
2327  }
2328  }
2329 
2331  }
2332 
2333  if (sort_param->option == SORT_ELIM_DUP
2334  && ((item_ptr - sort_param->internal_memory) + numrecs * SSIZEOF (SLOT) < DB_PAGESIZE)
2335  && temp_recdes.length <= SORT_MAXREC_LENGTH)
2336  {
2337  /* still, remaining key area enough; do not flush, go on internal sorting */
2338  index_buff = index_area - numrecs;
2339  index_area--;
2340  index_buff--; /* decrement once for pointer, */
2341  index_buff--; /* once for pointer buffer */
2342 
2343  /* must keep track because index_buff is used */
2344  /* to detect when sort buffer is full */
2345  temp_recdes.area_size = SORT_MAXREC_LENGTH;
2346 
2347  assert ((char *) index_buff >= item_ptr);
2348  }
2349  else
2350  {
2351  error =
2352  sort_run_flush (thread_p, sort_param, out_curfile, cur_page, output_buffer, index_area, numrecs,
2353  REC_HOME);
2354  if (error != NO_ERROR)
2355  {
2356  goto exit_on_error;
2357  }
2358 
2359  /* Prepare for the next internal sorting run */
2360 
2361  if (sort_param->tot_runs == 1)
2362  {
2363  once_flushed = true;
2364  saved_numrecs = numrecs;
2365  saved_index_area = index_area;
2366  }
2367 
2368  numrecs = 0;
2369  item_ptr = sort_param->internal_memory + SORT_RECORD_LENGTH_SIZE;
2370  index_area = (char **) (output_buffer - sizeof (char *));
2371  index_buff = index_area - 1;
2372  temp_recdes.area_size = SORT_MAXREC_LENGTH;
2373 
2374  /* Switch to the next Temp file */
2375  if (++out_curfile >= sort_param->half_files)
2376  {
2377  out_curfile = sort_param->in_half;
2378  }
2379  }
2380 
2381  /* save sorted record number at this stage */
2382  sort_numrecs = numrecs;
2383  }
2384 
2385  /* Check if the record would fit into a single slotted page. If not, take special action for this record. */
2386  if (temp_recdes.length > SORT_MAXREC_LENGTH)
2387  {
2388  /* TAKE CARE OF LONG RECORD as a separate RUN */
2389 
2390  if (long_recdes.area_size < temp_recdes.length)
2391  {
2392  /* read in the long record to a dynamic memory area */
2393  if (long_recdes.data)
2394  {
2395  free_and_init (long_recdes.data);
2396  }
2397 
2398  long_recdes.area_size = temp_recdes.length;
2399  long_recdes.data = (char *) malloc (long_recdes.area_size);
2400 
2401  if (long_recdes.data == NULL)
2402  {
2403  error = ER_OUT_OF_VIRTUAL_MEMORY;
2404  er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, error, 1, (size_t) long_recdes.area_size);
2405  goto exit_on_error;
2406  }
2407  }
2408 
2409  /* Obtain the long record */
2410  status = (*get_fn) (thread_p, &long_recdes, get_arg);
2411 
2412  if (status != SORT_SUCCESS)
2413  {
2414  /* Obtaining the long record has failed */
2415  if (status == SORT_REC_DOESNT_FIT || status == SORT_NOMORE_RECS)
2416  {
2417  /* This should never happen */
2418  error = ER_GENERIC_ERROR;
2420  }
2421  else
2422  {
2423  ASSERT_ERROR ();
2424  error = er_errid ();
2425  }
2426 
2427  assert (error != NO_ERROR);
2428 
2429  goto exit_on_error;
2430  }
2431 
2432  /* Put the record to the multipage area & put the pointer to internal memory area */
2433 
2434  /* If necessary create the multipage_file */
2435  if (sort_param->multipage_file.volid == NULL_VOLID)
2436  {
2437  TDE_ALGORITHM tde_algo = TDE_ALGORITHM_NONE;
2438  /* Create the multipage file */
2439  sort_param->multipage_file.volid = sort_param->temp[0].volid;
2440 
2441  error = file_create_temp (thread_p, 1, &sort_param->multipage_file);
2442  if (error != NO_ERROR)
2443  {
2444  ASSERT_ERROR ();
2445  goto exit_on_error;
2446  }
2447  if (sort_param->tde_encrypted)
2448  {
2450  }
2451  if (file_apply_tde_algorithm (thread_p, &sort_param->multipage_file, tde_algo) != NO_ERROR)
2452  {
2453  file_temp_retire (thread_p, &sort_param->multipage_file);
2454  ASSERT_ERROR ();
2455  goto exit_on_error;
2456  }
2457  }
2458 
2459  /* Create a multipage record for this long record : insert to multipage_file and put the pointer as the
2460  * first record in this run */
2461  if (overflow_insert (thread_p, &sort_param->multipage_file, (VPID *) item_ptr, &long_recdes, FILE_TEMP)
2462  != NO_ERROR)
2463  {
2464  ASSERT_ERROR_AND_SET (error);
2465  goto exit_on_error;
2466  }
2467 
2468  /* Update the pointers */
2469  SORT_RECORD_LENGTH (item_ptr) = sizeof (VPID);
2470  *index_area = item_ptr;
2471  numrecs++;
2472 
2473  error =
2474  sort_run_flush (thread_p, sort_param, out_curfile, cur_page, output_buffer, index_area, numrecs,
2475  REC_BIGONE);
2476  if (error != NO_ERROR)
2477  {
2478  goto exit_on_error;
2479  }
2480 
2481  /* Prepare for the next internal sorting run */
2482  numrecs = 0;
2483  item_ptr = sort_param->internal_memory + SORT_RECORD_LENGTH_SIZE;
2484  index_area = (char **) (output_buffer - sizeof (char *));
2485  index_buff = index_area - 1;
2486  temp_recdes.area_size = SORT_MAXREC_LENGTH;
2487 
2488  /* Switch to the next Temp file */
2489  if (++out_curfile >= sort_param->half_files)
2490  {
2491  out_curfile = sort_param->in_half;
2492  }
2493 
2494  /* save sorted record number at this stage */
2495  sort_numrecs = numrecs;
2496  }
2497  break;
2498 
2499  case SORT_SUCCESS:
2500  /* Proceed the pointers */
2501  SORT_RECORD_LENGTH (item_ptr) = temp_recdes.length;
2502  *index_area = item_ptr;
2503  numrecs++;
2504 
2505  index_area--;
2506  index_buff--; /* decrease once for pointer, once for pointer buffer */
2507  index_buff--; /* must keep track because index_buff is used to detect when sort buffer is full */
2508 
2509  item_ptr += DB_ALIGN (temp_recdes.length, MAX_ALIGNMENT) + SORT_RECORD_LENGTH_SIZE;
2510  break;
2511 
2512  default:
2513  /* This should never happen */
2514  error = ER_GENERIC_ERROR;
2516  goto exit_on_error;
2517  }
2518  }
2519 
2520  if (numrecs > 0)
2521  {
2522  /* The input file has finished; process whatever is left over in the internal memory */
2523 
2524  index_area++;
2525 
2526  if (sort_numrecs == 0)
2527  {
2528  assert (sort_param->px_height_max >= 0);
2529  assert (sort_param->px_array_size >= 1);
2530 #if defined(SERVER_MODE)
2531  rv = pthread_mutex_lock (&(sort_param->px_mtx));
2532  assert (rv == NO_ERROR);
2533 
2534  for (i = 0; i < sort_param->px_array_size; i++)
2535  {
2536  sort_param->px_array[i].px_status = 0; /* init */
2537  }
2538 
2539  pthread_mutex_unlock (&(sort_param->px_mtx));
2540 #endif /* SERVER_MODE */
2541 
2542  px_node = px_sort_assign (thread_p, sort_param, 0, index_buff, index_area, numrecs, sort_param->px_height_max,
2543  0 /* px_myself: set as root */ );
2544  if (px_node == NULL)
2545  {
2546  error = ER_FAILED;
2547  goto exit_on_error;
2548  }
2549 
2550  if (px_sort_myself (thread_p, px_node) != NO_ERROR)
2551  {
2552  error = ER_FAILED;
2553  goto exit_on_error;
2554  }
2555 
2556  index_area = px_node->px_result;
2557  numrecs = px_node->px_result_size;
2558  *total_numrecs += numrecs;
2559  }
2560  else
2561  {
2562  index_area = sort_run_sort (thread_p, sort_param, index_area, numrecs, sort_numrecs, index_buff, &numrecs);
2563  *total_numrecs += numrecs;
2564  }
2565 
2566  if (index_area == NULL || numrecs < 0)
2567  {
2568  error = ER_FAILED;
2569  goto exit_on_error;
2570  }
2571 
2572  if (sort_param->tot_runs > 0)
2573  {
2574  /* There has been other runs produced already */
2575 
2576  error =
2577  sort_run_flush (thread_p, sort_param, out_curfile, cur_page, output_buffer, index_area, numrecs, REC_HOME);
2578  if (error != NO_ERROR)
2579  {
2580  goto exit_on_error;
2581  }
2582  }
2583  else
2584  {
2585  /* No run has been produced yet There is no need for the merging phase; directly output the sorted temp
2586  * records. */
2587 
2588  for (i = 0; i < numrecs; i++)
2589  {
2590  /* Obtain the output record for this temporary record */
2591  temp_recdes.data = index_area[i];
2592  temp_recdes.length = SORT_RECORD_LENGTH (index_area[i]);
2593 
2594  error = (*sort_param->put_fn) (thread_p, &temp_recdes, sort_param->put_arg);
2595  if (error != NO_ERROR)
2596  {
2597  if (error == SORT_PUT_STOP)
2598  {
2599  error = NO_ERROR;
2600  }
2601  goto exit_on_error;
2602  }
2603  }
2604  }
2605  }
2606  else if (sort_param->tot_runs == 1)
2607  {
2608  if (once_flushed)
2609  {
2610  for (i = 0; i < saved_numrecs; i++)
2611  {
2612  /* Obtain the output record for this temporary record */
2613  temp_recdes.data = saved_index_area[i];
2614  temp_recdes.length = SORT_RECORD_LENGTH (saved_index_area[i]);
2615 
2616  error = (*sort_param->put_fn) (thread_p, &temp_recdes, sort_param->put_arg);
2617  if (error != NO_ERROR)
2618  {
2619  if (error == SORT_PUT_STOP)
2620  {
2621  error = NO_ERROR;
2622  }
2623  goto exit_on_error;
2624  }
2625  }
2626  }
2627  else
2628  {
2629  /*
2630  * The only way to get here is if we had exactly one record to
2631  * sort, and that record required overflow pages. In that case we
2632  * have done a ridiculous amount of work, but there doesn't seem to
2633  * be an easy way to restructure the existing sort_inphase_sort code to
2634  * cope with the situation. Just go get the record and be happy...
2635  */
2636  error = NO_ERROR;
2637  temp_recdes.data = NULL;
2638  long_recdes.area_size = 0;
2639  if (long_recdes.data)
2640  {
2641  free_and_init (long_recdes.data);
2642  }
2643 
2644  if (sort_read_area (thread_p, &sort_param->temp[0], 0, 1, output_buffer) != NO_ERROR
2645  || sort_spage_get_record (output_buffer, 0, &temp_recdes, PEEK) != S_SUCCESS
2646  || sort_retrieve_longrec (thread_p, &temp_recdes, &long_recdes) == NULL
2647  || (*sort_param->put_fn) (thread_p, &long_recdes, sort_param->put_arg) != NO_ERROR)
2648  {
2649  ASSERT_ERROR ();
2650  error = er_errid ();
2651  goto exit_on_error;
2652  }
2653  }
2654  }
2655 
2656 exit_on_error:
2657 
2658  if (long_recdes.data)
2659  {
2660  free_and_init (long_recdes.data);
2661  }
2662 
2663  return error;
2664 }
2665 
2666 /*
2667  * sort_run_flush () - Flush run
2668  * return:
2669  * sort_param(in): sort parameters
2670  * out_file(in): index of output file to flush the run
2671  * cur_page(in): current page of each temp file (used to determine
2672  * where, within the file, the run should be flushed)
2673  * output_buffer(in): output buffer to use for flushing the records
2674  * index_area(in): index area keeping ordered pointers to the records
2675  * numrecs(in): number of records the run includes
2676  * rec_type(in): type of records; Assume that all the records of this
2677  * run has the same type. This may need to be changed
2678  * to allow individual records have different types.
2679  *
2680  * Note: This function flushes a run to the specified output file. The records
2681  * of the run are loaded to the output buffer in the order imposed by
2682  * the index area (i.e., on the order of pointers to these records).
2683  * This buffer is written to the successive pages of the specified file
2684  * when it is full.
2685  */
2686 static int
2687 sort_run_flush (THREAD_ENTRY * thread_p, SORT_PARAM * sort_param, int out_file, int *cur_page, char *output_buffer,
2688  char **index_area, int numrecs, int rec_type)
2689 {
2690  int error = NO_ERROR;
2691  int run_size;
2692  RECDES out_recdes;
2693  int i;
2694  SORT_REC *key, *next;
2695  int flushed_items = 0;
2696  int should_continue = true;
2697 
2698  /* Make sure the the temp file indexed by out_file has been created; if not, create it now. */
2699  if (sort_param->temp[out_file].volid == NULL_VOLID)
2700  {
2701  error =
2702  sort_add_new_file (thread_p, &sort_param->temp[out_file], sort_param->tmp_file_pgs, false,
2703  sort_param->tde_encrypted);
2704  if (error != NO_ERROR)
2705  {
2706  return error;
2707  }
2708  }
2709 
2710  /* Store the record type; used for REC_BIGONE record types */
2711  out_recdes.type = rec_type;
2712 
2713  run_size = 0;
2715 
2716  /* Insert each record to the output buffer and flush the buffer when it is full */
2717  for (i = 0; i < numrecs && should_continue; i++)
2718  {
2719  /* Traverse next link */
2720  for (key = (SORT_REC *) index_area[i]; key; key = next)
2721  {
2722  /* If we've already flushed the required number, stop. */
2723  if (sort_param->limit > 0 && flushed_items >= sort_param->limit)
2724  {
2725  should_continue = false;
2726  break;
2727  }
2728 
2729  /* cut-off and save duplicate sort_key value link */
2730  if (rec_type == REC_HOME)
2731  {
2732  next = key->next;
2733  }
2734  else
2735  {
2736  /* REC_BIGONE */
2737  next = NULL;
2738  }
2739 
2740  out_recdes.data = (char *) key;
2741  out_recdes.length = SORT_RECORD_LENGTH ((char *) key);
2742 
2743  if (sort_spage_insert (output_buffer, &out_recdes) == NULL_SLOTID)
2744  {
2745  /* Output buffer is full */
2746  error = sort_write_area (thread_p, &sort_param->temp[out_file], cur_page[out_file], 1, output_buffer);
2747  if (error != NO_ERROR)
2748  {
2749  return error;
2750  }
2751 
2752  cur_page[out_file]++;
2753  run_size++;
2755 
2756  if (sort_spage_insert (output_buffer, &out_recdes) == NULL_SLOTID)
2757  {
2758  /* Slotted page module refuses to insert a short size record to an empty page. This should never
2759  * happen. */
2760  error = ER_GENERIC_ERROR;
2762  return error;
2763  }
2764  }
2765  flushed_items++;
2766  }
2767  }
2768 
2769  if (sort_spage_get_numrecs (output_buffer))
2770  {
2771  /* Flush the partially full output page */
2772  error = sort_write_area (thread_p, &sort_param->temp[out_file], cur_page[out_file], 1, output_buffer);
2773  if (error != NO_ERROR)
2774  {
2775  return error;
2776  }
2777 
2778  /* Update sort parameters */
2779  cur_page[out_file]++;
2780  run_size++;
2781  }
2782 
2783  /* Record the insertion of the new pages of the file to global parameters */
2784  error = sort_run_add_new (&sort_param->file_contents[out_file], run_size);
2785  if (error != NO_ERROR)
2786  {
2787  return error;
2788  }
2789 
2790  sort_param->tot_runs++;
2791 
2792  return NO_ERROR;
2793 }
2794 
2795 /*
2796  * sort_retrieve_longrec () -
2797  * return:
2798  * address(in):
2799  * memory(in):
2800  */
2801 static char *
2802 sort_retrieve_longrec (THREAD_ENTRY * thread_p, RECDES * address, RECDES * memory)
2803 {
2804  int needed_area_size;
2805 
2806  /* Find the required area for the long record */
2807  needed_area_size = overflow_get_length (thread_p, (VPID *) address->data);
2808  if (needed_area_size == -1)
2809  {
2810  return NULL;
2811  }
2812 
2813  /* If necessary allocate dynamic area for the long record */
2814  if (needed_area_size > memory->area_size)
2815  {
2816  /* There is already a small area; free it. */
2817  if (memory->data != NULL)
2818  {
2819  free_and_init (memory->data);
2820  }
2821 
2822  /* Allocate dynamic area for this long record */
2823  memory->area_size = needed_area_size;
2824  memory->data = (char *) malloc (memory->area_size);
2825  if (memory->data == NULL)
2826  {
2827  return NULL;
2828  }
2829  }
2830 
2831  /* Retrieve the long record */
2832  if (overflow_get (thread_p, (VPID *) address->data, memory, NULL) != S_SUCCESS)
2833  {
2834  return NULL;
2835  }
2836 
2837  return memory->data;
2838 }
2839 
2840 /*
2841  * sort_exphase_merge_elim_dup () -
2842  * return:
2843  * sort_param(in):
2844  */
2845 static int
2847 {
2848  /* Variables for input files */
2849  int act_infiles; /* How many of input files are active */
2850  int pre_act_infiles; /* Number of active input files in the previous iteration */
2851  int in_sectsize; /* Size of section allocated to each active input file (in terms of number of buffers
2852  * it contains) */
2853  int read_pages; /* Number of pages read in to fill the input buffer */
2854  int in_act_bufno[SORT_MAX_HALF_FILES]; /* Active buffer in the input section */
2855  int in_last_buf[SORT_MAX_HALF_FILES]; /* Last full buffer of the input section */
2856  int act_slot[SORT_MAX_HALF_FILES]; /* Active slot of the active buffer of input section */
2857  int last_slot[SORT_MAX_HALF_FILES]; /* Last slot of the active buffer of the input section */
2858 
2859  char *in_sectaddr[SORT_MAX_HALF_FILES]; /* Beginning address of each input section */
2860  char *in_cur_bufaddr[SORT_MAX_HALF_FILES]; /* Address of the current buffer in each input section */
2861 
2862  /* Variables for output file */
2863  int out_half; /* Which half of temp files is for output */
2864  int cur_outfile; /* Index for output file recieving new run */
2865  int out_sectsize; /* Size of the output section (in terms of number of buffer it contains) */
2866  int out_act_bufno; /* Active buffer in the output section */
2867  int out_runsize; /* Total pages output for the run being produced */
2868  char *out_sectaddr; /* Beginning address of the output section */
2869  char *out_cur_bufaddr; /* Address of the current buffer in the output section */
2870 
2871  /* Smallest element pointers (one for each active input file) pointing to the active temp records. If the input file
2872  * becomes inactive (all input is exhausted), its smallest element pointer is set to NULL */
2873  RECDES smallest_elem_ptr[SORT_MAX_HALF_FILES];
2874  RECDES long_recdes[SORT_MAX_HALF_FILES];
2875 
2876  int cur_page[2 * SORT_MAX_HALF_FILES]; /* Current page of each temp file */
2877  int num_runs; /* Number of output runs to be produced in this stage of the merging phase; */
2878  int big_index;
2879  int error;
2880  int i, j;
2881  int temp;
2882  int min;
2883  int len;
2884  bool very_last_run = false;
2885  int act;
2886  int cp_pages;
2887  int cmp;
2888 
2889  SORT_CMP_FUNC *compare;
2890  void *compare_arg;
2891 
2892  SORT_REC_LIST sr_list[SORT_MAX_HALF_FILES], *min_p, *s, *p;
2893  int tmp_var;
2894  RECDES last_elem_ptr; /* last element pointer in one page of input section */
2895  RECDES last_long_recdes;
2896 
2897  /* >0 : must find minimum record <0 : last element in the current input section is minimum record. no need to find
2898  * min record =0 : last element in the current input section is duplicated min record. no need to find min record
2899  * except last element */
2900  int last_elem_cmp;
2901 
2902  char **data1, **data2;
2903  SORT_REC *sort_rec;
2904  int first_run;
2905 
2906  error = NO_ERROR;
2907 
2908  compare = sort_param->cmp_fn;
2909  compare_arg = sort_param->cmp_arg;
2910 
2911  for (i = 0; i < SORT_MAX_HALF_FILES; i++)
2912  {
2913  in_act_bufno[i] = 0;
2914  in_last_buf[i] = 0;
2915  act_slot[i] = 0;
2916  last_slot[i] = 0;
2917  in_sectaddr[i] = NULL;
2918  in_cur_bufaddr[i] = NULL;
2919 
2920  smallest_elem_ptr[i].data = NULL;
2921  smallest_elem_ptr[i].area_size = 0;
2922 
2923  long_recdes[i].data = NULL;
2924  long_recdes[i].area_size = 0;
2925  }
2926 
2927  last_elem_ptr.data = NULL;
2928  last_elem_ptr.area_size = 0;
2929 
2930  last_long_recdes.data = NULL;
2931  last_long_recdes.area_size = 0;
2932 
2933  for (i = 0; i < (int) DIM (cur_page); i++)
2934  {
2935  cur_page[i] = 0;
2936  }
2937 
2938  if (sort_param->in_half == 0)
2939  {
2940  out_half = sort_param->half_files;
2941  }
2942  else
2943  {
2944  out_half = 0;
2945  }
2946 
2947  /* OUTER LOOP */
2948 
2949  /* While there are more than one input files with different runs to merge */
2950  while ((act_infiles = sort_get_numpages_of_active_infiles (sort_param)) > 1)
2951  {
2952  /* Check if output files has enough pages; if not allocate new pages */
2953  error = sort_checkalloc_numpages_of_outfiles (thread_p, sort_param);
2954  if (error != NO_ERROR)
2955  {
2956  ASSERT_ERROR ();
2957  goto bailout;
2958  }
2959 
2960  /* Initialize the current pages of all temp files to 0 */
2961  for (i = 0; i < sort_param->tot_tempfiles; i++)
2962  {
2963  cur_page[i] = 0;
2964  }
2965 
2966  /* Distribute the internal memory to the input and output sections */
2967  in_sectsize = sort_find_inbuf_size (sort_param->tot_buffers, act_infiles);
2968  out_sectsize = sort_param->tot_buffers - in_sectsize * act_infiles;
2969 
2970  /* Set the address of each input section */
2971  for (i = 0; i < act_infiles; i++)
2972  {
2973  in_sectaddr[i] = sort_param->internal_memory + (i * in_sectsize * DB_PAGESIZE);
2974  }
2975 
2976  /* Set the address of output section */
2977  out_sectaddr = sort_param->internal_memory + (act_infiles * in_sectsize * DB_PAGESIZE);
2978 
2979  cur_outfile = out_half;
2980 
2981  /* Find how many runs will be produced in this iteration */
2982  num_runs = 0;
2983  for (i = sort_param->in_half; i < sort_param->in_half + act_infiles; i++)
2984  {
2985  len = sort_get_num_file_contents (&sort_param->file_contents[i]);
2986  if (len > num_runs)
2987  {
2988  num_runs = len;
2989  }
2990  }
2991 
2992  if (num_runs == 1)
2993  {
2994  very_last_run = true;
2995  }
2996 
2997  /* PRODUCE RUNS */
2998 
2999  for (j = num_runs; j > 0; j--)
3000  {
3001  if (!very_last_run && (j == 1))
3002  {
3003  /* Last iteration of the outer loop ; some of the input files might have become empty. */
3004 
3005  pre_act_infiles = act_infiles;
3006  act_infiles = sort_get_numpages_of_active_infiles (sort_param);
3007  if (act_infiles != pre_act_infiles)
3008  {
3009  /* Some of the active input files became inactive */
3010 
3011  if (act_infiles == 1)
3012  {
3013  /*
3014  * There is only one active input file (i.e. there is
3015  * only one input run to produce the output run). So,
3016  * there is no need to perform the merging actions. All
3017  * needed is to copy this input run to the current output
3018  * file.
3019  */
3020 
3021  act = -1;
3022 
3023  /* Find which input file contains this last input run */
3024  for (i = sort_param->in_half; i < (sort_param->in_half + pre_act_infiles); i++)
3025  {
3026  if (sort_param->file_contents[i].first_run != -1)
3027  {
3028  act = i;
3029  break;
3030  }
3031  }
3032 
3033  if (act == -1)
3034  {
3035  goto bailout;
3036  }
3037 
3038  first_run = sort_param->file_contents[act].first_run;
3039  cp_pages = sort_param->file_contents[act].num_pages[first_run];
3040 
3041  error = sort_run_add_new (&sort_param->file_contents[cur_outfile], cp_pages);
3042  if (error != NO_ERROR)
3043  {
3044  goto bailout;
3045  }
3046 
3047  sort_run_remove_first (&sort_param->file_contents[act]);
3048 
3049  /* Use the whole internal_memory area as both the input and output buffer areas. */
3050  while (cp_pages > 0)
3051  {
3052  if (cp_pages > sort_param->tot_buffers)
3053  {
3054  read_pages = sort_param->tot_buffers;
3055  }
3056  else
3057  {
3058  read_pages = cp_pages;
3059  }
3060 
3061  error =
3062  sort_read_area (thread_p, &sort_param->temp[act], cur_page[act], read_pages,
3063  sort_param->internal_memory);
3064  if (error != NO_ERROR)
3065  {
3066  goto bailout;
3067  }
3068 
3069  cur_page[act] += read_pages;
3070  error =
3071  sort_write_area (thread_p, &sort_param->temp[cur_outfile], cur_page[cur_outfile],
3072  read_pages, sort_param->internal_memory);
3073  if (error != NO_ERROR)
3074  {
3075  goto bailout;
3076  }
3077 
3078  cur_page[cur_outfile] += read_pages;
3079  cp_pages -= read_pages;
3080  }
3081 
3082  /* Skip the remaining operations of the PRODUCE RUNS loop */
3083  continue;
3084  }
3085  else
3086  {
3087  /* There are more than one active input files; redistribute buffers */
3088  in_sectsize = sort_find_inbuf_size (sort_param->tot_buffers, act_infiles);
3089  out_sectsize = sort_param->tot_buffers - in_sectsize * act_infiles;
3090 
3091  /* Set the address of each input section */
3092  for (i = 0; i < act_infiles; i++)
3093  {
3094  in_sectaddr[i] = sort_param->internal_memory + (i * in_sectsize * DB_PAGESIZE);
3095  }
3096 
3097  /* Set the address of output section */
3098  out_sectaddr = sort_param->internal_memory + (act_infiles * in_sectsize * DB_PAGESIZE);
3099  }
3100  }
3101  }
3102 
3103  /* PRODUCE A NEW RUN */
3104 
3105  /* INITIALIZE INPUT SECTIONS AND INPUT VARIABLES */
3106  for (i = 0; i < act_infiles; i++)
3107  {
3108  big_index = sort_param->in_half + i;
3109  first_run = sort_param->file_contents[big_index].first_run;
3110  read_pages = sort_param->file_contents[big_index].num_pages[first_run];
3111 
3112  if (in_sectsize < read_pages)
3113  {
3114  read_pages = in_sectsize;
3115  }
3116 
3117  error =
3118  sort_read_area (thread_p, &sort_param->temp[big_index], cur_page[big_index], read_pages,
3119  in_sectaddr[i]);
3120  if (error != NO_ERROR)
3121  {
3122  goto bailout;
3123  }
3124 
3125  /* Increment the current page of this input_file */
3126  cur_page[big_index] += read_pages;
3127 
3128  first_run = sort_param->file_contents[big_index].first_run;
3129  sort_param->file_contents[big_index].num_pages[first_run] -= read_pages;
3130 
3131  /* Initialize input variables */
3132  in_cur_bufaddr[i] = in_sectaddr[i];
3133  in_act_bufno[i] = 0;
3134  in_last_buf[i] = read_pages;
3135  act_slot[i] = 0;
3136  last_slot[i] = sort_spage_get_numrecs (in_cur_bufaddr[i]);
3137 
3138  if (sort_spage_get_record (in_cur_bufaddr[i], act_slot[i], &smallest_elem_ptr[i], PEEK) != S_SUCCESS)
3139  {
3142  goto bailout;
3143  }
3144 
3145  /* If this is a long record retrieve it */
3146  if (smallest_elem_ptr[i].type == REC_BIGONE)
3147  {
3148  if (sort_retrieve_longrec (thread_p, &smallest_elem_ptr[i], &long_recdes[i]) == NULL)
3149  {
3150  ASSERT_ERROR ();
3151  error = er_errid ();
3152  goto bailout;
3153  }
3154  }
3155  }
3156 
3157  /* linkage & init nodes */
3158  for (i = 0, p = sr_list; i < (act_infiles - 1); p = p->next)
3159  {
3160  p->next = (SORT_REC_LIST *) ((char *) p + sizeof (SORT_REC_LIST));
3161  p->rec_pos = i++;
3162  p->is_duplicated = false;
3163  }
3164 
3165  p->next = NULL;
3166  p->rec_pos = i;
3167  p->is_duplicated = false;
3168 
3169  /* sort sr_list */
3170  for (s = sr_list; s; s = s->next)
3171  {
3172  for (p = s->next; p; p = p->next)
3173  {
3174  /* compare s, p */
3175  data1 = ((smallest_elem_ptr[s->rec_pos].type == REC_BIGONE)
3176  ? &(long_recdes[s->rec_pos].data) : &(smallest_elem_ptr[s->rec_pos].data));
3177 
3178  data2 = ((smallest_elem_ptr[p->rec_pos].type == REC_BIGONE)
3179  ? &(long_recdes[p->rec_pos].data) : &(smallest_elem_ptr[p->rec_pos].data));
3180 
3181  cmp = (*compare) (data1, data2, compare_arg);
3182  if (cmp > 0)
3183  {
3184  /* swap s, p's rec_pos */
3185  tmp_var = s->rec_pos;
3186  s->rec_pos = p->rec_pos;
3187  p->rec_pos = tmp_var;
3188  }
3189  }
3190  }
3191 
3192  /* find duplicate */
3193  for (s = sr_list; s && s->next; s = s->next)
3194  {
3195  p = s->next;
3196 
3197  data1 = ((smallest_elem_ptr[s->rec_pos].type == REC_BIGONE)
3198  ? &(long_recdes[s->rec_pos].data) : &(smallest_elem_ptr[s->rec_pos].data));
3199 
3200  data2 = ((smallest_elem_ptr[p->rec_pos].type == REC_BIGONE)
3201  ? &(long_recdes[p->rec_pos].data) : &(smallest_elem_ptr[p->rec_pos].data));
3202 
3203  cmp = (*compare) (data1, data2, compare_arg);
3204  if (cmp == 0)
3205  {
3206  p->is_duplicated = true;
3207  }
3208  }
3209 
3210  /* set min_p to point the minimum record */
3211  min_p = sr_list; /* min_p->rec_pos is the min record */
3212 
3213  /* last element comparison */
3214  last_elem_cmp = 1;
3215  p = min_p->next; /* second smallest element */
3216 
3217  if (p)
3218  {
3219  /* STEP 1: get last_elem */
3220  if (sort_spage_get_record (in_cur_bufaddr[min_p->rec_pos], (last_slot[min_p->rec_pos] - 1),
3221  &last_elem_ptr, PEEK) != S_SUCCESS)
3222  {
3225  goto bailout;
3226  }
3227 
3228  /* if this is a long record, retrieve it */
3229  if (last_elem_ptr.type == REC_BIGONE)
3230  {
3231  if (sort_retrieve_longrec (thread_p, &last_elem_ptr, &last_long_recdes) == NULL)
3232  {
3233  ASSERT_ERROR ();
3234  error = er_errid ();
3235  goto bailout;
3236  }
3237  }
3238 
3239  /* STEP 2: compare last, p */
3240  data1 = ((last_elem_ptr.type == REC_BIGONE) ? &(last_long_recdes.data) : &(last_elem_ptr.data));
3241 
3242  data2 = ((smallest_elem_ptr[p->rec_pos].type == REC_BIGONE)
3243  ? &(long_recdes[p->rec_pos].data) : &(smallest_elem_ptr[p->rec_pos].data));
3244 
3245  last_elem_cmp = (*compare) (data1, data2, compare_arg);
3246  }
3247 
3248  /* INITIALIZE OUTPUT SECTION AND OUTPUT VARIABLES */
3249  out_act_bufno = 0;
3250  out_cur_bufaddr = out_sectaddr;
3251  for (i = 0; i < out_sectsize; i++)
3252  {
3253  /* Initialize each buffer to contain a slotted page */
3255  }
3256 
3257  /* Initialize the size of next run to zero */
3258  out_runsize = 0;
3259 
3260  for (;;)
3261  {
3262  /* OUTPUT A RECORD */
3263 
3264  /* FIND MINIMUM RECORD IN THE INPUT AREA */
3265  min = min_p->rec_pos;
3266 
3267  /* min_p->is_duplicated == 1 then skip duplicated sort_key record */
3268  if (min_p->is_duplicated == false)
3269  {
3270  /* we found first unique sort_key record */
3271 
3272  if (very_last_run)
3273  {
3274  /* OUTPUT THE RECORD */
3275  /* Obtain the output record for this temporary record */
3276  if (smallest_elem_ptr[min].type == REC_BIGONE)
3277  {
3278  error = (*sort_param->put_fn) (thread_p, &long_recdes[min], sort_param->put_arg);
3279  if (error != NO_ERROR)
3280  {
3281  goto bailout;
3282  }
3283  }
3284  else
3285  {
3286  sort_rec = (SORT_REC *) (smallest_elem_ptr[min].data);
3287  /* cut-off link used in Internal Sort */
3288  sort_rec->next = NULL;
3289  error = (*sort_param->put_fn) (thread_p, &smallest_elem_ptr[min], sort_param->put_arg);
3290  if (error != NO_ERROR)
3291  {
3292  goto bailout;
3293  }
3294  }
3295  }
3296  else
3297  {
3298  /* OUTPUT THE MINIMUM RECORD TO THE OUTPUT AREA */
3299 
3300  /* Insert this record to the output area */
3301  if (sort_spage_insert (out_cur_bufaddr, &smallest_elem_ptr[min]) == NULL_SLOTID)
3302  {
3303  /* Current output buffer is full */
3304 
3305  if (++out_act_bufno < out_sectsize)
3306  {
3307  /* There is another buffer in the output section; so insert the new record there */
3308  out_cur_bufaddr += DB_PAGESIZE;
3309 
3310  if (sort_spage_insert (out_cur_bufaddr, &smallest_elem_ptr[min]) == NULL_SLOTID)
3311  {
3312  /*
3313  * Slotted page module refuses to insert a
3314  * short size record (a temporary record that
3315  * was already in a slotted page) to an empty
3316  * page. This should never happen.
3317  */
3319  error = ER_GENERIC_ERROR;
3320  goto bailout;
3321  }
3322  }
3323  else
3324  {
3325  /* Output section is full */
3326 
3327  /* Flush output section */
3328  error =
3329  sort_write_area (thread_p, &sort_param->temp[cur_outfile], cur_page[cur_outfile],
3330  out_sectsize, out_sectaddr);
3331  if (error != NO_ERROR)
3332  {
3333  goto bailout;
3334  }
3335  cur_page[cur_outfile] += out_sectsize;
3336  out_runsize += out_sectsize;
3337 
3338  /* Initialize output section and output variables */
3339  out_act_bufno = 0;
3340  out_cur_bufaddr = out_sectaddr;
3341  for (i = 0; i < out_sectsize; i++)
3342  {
3343  /* Initialize each buffer to contain a slotted page */
3345  MAX_ALIGNMENT);
3346  }
3347 
3348  if (sort_spage_insert (out_cur_bufaddr, &smallest_elem_ptr[min]) == NULL_SLOTID)
3349  {
3350  /*
3351  * Slotted page module refuses to insert a
3352  * short size record (a temporary record that
3353  * was already in a slotted page) to an empty
3354  * page. This should never happen.
3355  */
3357  error = ER_GENERIC_ERROR;
3358  goto bailout;
3359  }
3360  }
3361  }
3362  }
3363  }
3364  else
3365  {
3366  /* skip output the duplicated record to the output area */
3367  ;
3368  }
3369 
3370  /* PROCEED THE smallest_elem_ptr[min] TO NEXT RECORD */
3371  if (++act_slot[min] >= last_slot[min])
3372  {
3373  /* The current input page is finished */
3374 
3375  last_elem_cmp = 1;
3376 
3377  if (++in_act_bufno[min] < in_last_buf[min])
3378  {
3379  /* Switch to the next page in the input buffer */
3380  in_cur_bufaddr[min] = in_sectaddr[min] + in_act_bufno[min] * DB_PAGESIZE;
3381  }
3382  else
3383  { /* The input section is finished */
3384  int frun;
3385 
3386  big_index = sort_param->in_half + min;
3387  frun = sort_param->file_contents[big_index].first_run;
3388 
3389  if (sort_param->file_contents[big_index].num_pages[frun])
3390  {
3391  /* There are still some pages in the current input run */
3392 
3393  in_cur_bufaddr[min] = in_sectaddr[min];
3394 
3395  read_pages = sort_param->file_contents[big_index].num_pages[frun];
3396  if (in_sectsize < read_pages)
3397  {
3398  read_pages = in_sectsize;
3399  }
3400 
3401  in_last_buf[min] = read_pages;
3402 
3403  error = sort_read_area (thread_p, &sort_param->temp[big_index], cur_page[big_index],
3404  read_pages, in_cur_bufaddr[min]);
3405  if (error != NO_ERROR)
3406  {
3407  goto bailout;
3408  }
3409 
3410  /* Increment the current page of this input_file */
3411  cur_page[big_index] += read_pages;
3412 
3413  in_act_bufno[min] = 0;
3414 
3415  frun = sort_param->file_contents[big_index].first_run;
3416  sort_param->file_contents[big_index].num_pages[frun] -= read_pages;
3417  }
3418  else
3419  {
3420  /* Current input run on this input file has finished */
3421  min_p = min_p->next;
3422 
3423  if (min_p == NULL)
3424  {
3425  /* all "smallest_elem_ptr" are NULL; so break */
3426  break;
3427  }
3428  else
3429  {
3430  /* Don't try to get the next record on this input section */
3431  continue;
3432  }
3433  }
3434  }
3435 
3436  act_slot[min] = 0;
3437  last_slot[min] = sort_spage_get_numrecs (in_cur_bufaddr[min]);
3438  }
3439 
3440  if (sort_spage_get_record (in_cur_bufaddr[min], act_slot[min], &smallest_elem_ptr[min], PEEK) !=
3441  S_SUCCESS)
3442  {
3445  goto bailout;
3446  }
3447 
3448  /* If this is a long record retrieve it */
3449  if (smallest_elem_ptr[min].type == REC_BIGONE)
3450  {
3451  if (sort_retrieve_longrec (thread_p, &smallest_elem_ptr[min], &long_recdes[min]) == NULL)
3452  {
3453  ASSERT_ERROR ();
3454  error = er_errid ();
3455  goto bailout;
3456  }
3457  }
3458 
3459  if ((act_slot[min_p->rec_pos] == last_slot[min_p->rec_pos] - 1) && (last_elem_cmp == 0))
3460  {
3461  /* last duplicated element in input section page enters */
3462  min_p->is_duplicated = true;
3463  }
3464  else
3465  {
3466  min_p->is_duplicated = false;
3467  }
3468 
3469  /* find minimum */
3470  if (last_elem_cmp <= 0)
3471  {
3472  /* already found min */
3473  ;
3474  }
3475  else
3476  {
3477  for (s = min_p; s; s = s->next)
3478  {
3479  p = s->next;
3480  if (p == NULL)
3481  {
3482  /* there is only one record */
3483  break;
3484  }
3485 
3486  /* compare s, p */
3487  data1 = ((smallest_elem_ptr[s->rec_pos].type == REC_BIGONE)
3488  ? &(long_recdes[s->rec_pos].data) : &(smallest_elem_ptr[s->rec_pos].data));
3489 
3490  data2 = ((smallest_elem_ptr[p->rec_pos].type == REC_BIGONE)
3491  ? &(long_recdes[p->rec_pos].data) : &(smallest_elem_ptr[p->rec_pos].data));
3492 
3493  cmp = (*compare) (data1, data2, compare_arg);
3494  if (cmp > 0)
3495  {
3496  /* swap s, p's rec_pos */
3497  tmp_var = s->rec_pos;
3498  s->rec_pos = p->rec_pos;
3499  p->rec_pos = tmp_var;
3500 
3501  /* swap s, p's is_duplicated */
3502  tmp_var = (int) s->is_duplicated;
3503  s->is_duplicated = p->is_duplicated;
3504  p->is_duplicated = (bool) tmp_var;
3505  }
3506  else
3507  {
3508  if (cmp == 0)
3509  {
3510  p->is_duplicated = true; /* duplicated */
3511  }
3512 
3513  /* sr_list is completely sorted */
3514  break;
3515  }
3516  }
3517 
3518  /* new input page is entered */
3519  if (act_slot[min_p->rec_pos] == 0)
3520  {
3521  /* last element comparison */
3522  p = min_p->next; /* second smallest element */
3523  if (p)
3524  {
3525  /* STEP 1: get last_elem */
3526  if (sort_spage_get_record (in_cur_bufaddr[min_p->rec_pos], (last_slot[min_p->rec_pos] - 1),
3527  &last_elem_ptr, PEEK) != S_SUCCESS)
3528  {
3531  goto bailout;
3532  }
3533 
3534  /* if this is a long record, retrieve it */
3535  if (last_elem_ptr.type == REC_BIGONE)
3536  {
3537  if (sort_retrieve_longrec (thread_p, &last_elem_ptr, &last_long_recdes) == NULL)
3538  {
3539  ASSERT_ERROR ();
3540  error = er_errid ();
3541  goto bailout;
3542  }
3543  }
3544 
3545  /* STEP 2: compare last, p */
3546  data1 = ((last_elem_ptr.type == REC_BIGONE)
3547  ? &(last_long_recdes.data) : &(last_elem_ptr.data));
3548 
3549  data2 = ((smallest_elem_ptr[p->rec_pos].type == REC_BIGONE)
3550  ? &(long_recdes[p->rec_pos].data) : &(smallest_elem_ptr[p->rec_pos].data));
3551 
3552  last_elem_cmp = (*compare) (data1, data2, compare_arg);
3553  }
3554  }
3555  }
3556  }
3557 
3558  if (!very_last_run)
3559  {
3560  /* Flush whatever is left on the output section */
3561  out_act_bufno++; /* Since 0 refers to the first active buffer */
3562 
3563  error =
3564  sort_write_area (thread_p, &sort_param->temp[cur_outfile], cur_page[cur_outfile], out_act_bufno,
3565  out_sectaddr);
3566  if (error != NO_ERROR)
3567  {
3568  goto bailout;
3569  }
3570 
3571  cur_page[cur_outfile] += out_act_bufno;
3572  out_runsize += out_act_bufno;
3573  }
3574 
3575  /* END UP THIS RUN */
3576 
3577  /* Remove previous first_run nodes of the file_contents lists of the input files */
3578  for (i = sort_param->in_half; i < sort_param->in_half + sort_param->half_files; i++)
3579  {
3580  sort_run_remove_first (&sort_param->file_contents[i]);
3581  }
3582 
3583  /* Add a new node to the file_contents list of the current output file */
3584  error = sort_run_add_new (&sort_param->file_contents[cur_outfile], out_runsize);
3585  if (error != NO_ERROR)
3586  {
3587  goto bailout;
3588  }
3589 
3590  /* Produce a new run */
3591 
3592  /* Switch to the next out file */
3593  if (++cur_outfile >= sort_param->half_files + out_half)
3594  {
3595  cur_outfile = out_half;
3596  }
3597  }
3598 
3599  /* Exchange input and output file indices */
3600  temp = sort_param->in_half;
3601  sort_param->in_half = out_half;
3602  out_half = temp;
3603  }
3604 
3605 bailout:
3606 
3607  for (i = 0; i < sort_param->half_files; i++)
3608  {
3609  if (long_recdes[i].data != NULL)
3610  {
3611  free_and_init (long_recdes[i].data);
3612  }
3613  }
3614 
3615  if (last_long_recdes.data)
3616  {
3617  free_and_init (last_long_recdes.data);
3618  }
3619 
3620  return (error == SORT_PUT_STOP) ? NO_ERROR : error;
3621 }
3622 
3623 /*
3624  * sort_exphase_merge () - Merge phase
3625  * return:
3626  * sort_param(in): sort parameters
3627  *
3628  */
3629 static int
3631 {
3632  /* Variables for input files */
3633  int act_infiles; /* How many of input files are active */
3634  int pre_act_infiles; /* Number of active input files in the previous iteration */
3635  int in_sectsize; /* Size of section allocated to each active input file (in terms of number of buffers
3636  * it contains) */
3637  int read_pages; /* Number of pages read in to fill the input buffer */
3638  int in_act_bufno[SORT_MAX_HALF_FILES]; /* Active buffer in the input section */
3639  int in_last_buf[SORT_MAX_HALF_FILES]; /* Last full buffer of the input section */
3640  int act_slot[SORT_MAX_HALF_FILES]; /* Active slot of the active buffer of input section */
3641  int last_slot[SORT_MAX_HALF_FILES]; /* Last slot of the active buffer of the input section */
3642 
3643  char *in_sectaddr[SORT_MAX_HALF_FILES]; /* Beginning address of each input section */
3644  char *in_cur_bufaddr[SORT_MAX_HALF_FILES]; /* Address of the current buffer in each input section */
3645 
3646  /* Variables for output file */
3647  int out_half; /* Which half of temp files is for output */
3648  int cur_outfile; /* Index for output file recieving new run */
3649  int out_sectsize; /* Size of the output section (in terms of number of buffer it contains) */
3650  int out_act_bufno; /* Active buffer in the output section */
3651  int out_runsize; /* Total pages output for the run being produced */
3652  char *out_sectaddr; /* Beginning address of the output section */
3653  char *out_cur_bufaddr; /* Address of the current buffer in the output section */
3654 
3655  /* Smallest element pointers (one for each active input file) pointing to the active temp records. If the input file
3656  * becomes inactive (all input is exhausted), its smallest element pointer is set to NULL */
3657  RECDES smallest_elem_ptr[SORT_MAX_HALF_FILES];
3658  RECDES long_recdes[SORT_MAX_HALF_FILES];
3659 
3660  int cur_page[2 * SORT_MAX_HALF_FILES]; /* Current page of each temp file */
3661  int num_runs; /* Number of output runs to be produced in this stage of the merging phase; */
3662  int big_index;
3663  int error;
3664  int i, j;
3665  int temp;
3666  int min;
3667  int len;
3668  bool very_last_run = false;
3669  int act;
3670  int cp_pages;
3671 
3672  SORT_CMP_FUNC *compare;
3673  void *compare_arg;
3674 
3675  SORT_REC_LIST sr_list[SORT_MAX_HALF_FILES], *min_p, *s, *p;
3676  int tmp_pos; /* temporary value for rec_pos swapping */
3677  bool do_swap; /* rec_pos swapping indicator */
3678 
3679  RECDES last_elem_ptr; /* last element pointers in one page of input section */
3680  RECDES last_long_recdes;
3681  bool last_elem_is_min; /* false: must find min record true: last element in the current input section is min
3682  * record. no need to find min */
3683  char **data1, **data2;
3684  SORT_REC *sort_rec;
3685  int first_run;
3686  int cmp;
3687 
3688  error = NO_ERROR;
3689 
3690  compare = sort_param->cmp_fn;
3691  compare_arg = sort_param->cmp_arg;
3692 
3693  for (i = 0; i < SORT_MAX_HALF_FILES; i++)
3694  {
3695  in_act_bufno[i] = 0;
3696  in_last_buf[i] = 0;
3697  act_slot[i] = 0;
3698  last_slot[i] = 0;
3699  in_sectaddr[i] = NULL;
3700  in_cur_bufaddr[i] = NULL;
3701 
3702  smallest_elem_ptr[i].data = NULL;
3703  smallest_elem_ptr[i].area_size = 0;
3704 
3705  long_recdes[i].data = NULL;
3706  long_recdes[i].area_size = 0;
3707  }
3708 
3709  last_elem_ptr.data = NULL;
3710  last_elem_ptr.area_size = 0;
3711 
3712  last_long_recdes.data = NULL;
3713  last_long_recdes.area_size = 0;
3714 
3715  for (i = 0; i < (int) DIM (cur_page); i++)
3716  {
3717  cur_page[i] = 0;
3718  }
3719 
3720  if (sort_param->in_half == 0)
3721  {
3722  out_half = sort_param->half_files;
3723  }
3724  else
3725  {
3726  out_half = 0;
3727  }
3728 
3729  /* OUTER LOOP */
3730 
3731  /* While there are more than one input files with different runs to merge */
3732  while ((act_infiles = sort_get_numpages_of_active_infiles (sort_param)) > 1)
3733  {
3734  /* Check if output files has enough pages; if not allocate new pages */
3735  error = sort_checkalloc_numpages_of_outfiles (thread_p, sort_param);
3736  if (error != NO_ERROR)
3737  {
3738  ASSERT_ERROR ();
3739  goto bailout;
3740  }
3741 
3742  /* Initialize the current pages of all temp files to 0 */
3743  for (i = 0; i < sort_param->tot_tempfiles; i++)
3744  {
3745  cur_page[i] = 0;
3746  }
3747 
3748  /* Distribute the internal memory to the input and output sections */
3749  in_sectsize = sort_find_inbuf_size (sort_param->tot_buffers, act_infiles);
3750  out_sectsize = sort_param->tot_buffers - in_sectsize * act_infiles;
3751 
3752  /* Set the address of each input section */
3753  for (i = 0; i < act_infiles; i++)
3754  {
3755  in_sectaddr[i] = sort_param->internal_memory + (i * in_sectsize * DB_PAGESIZE);
3756  }
3757 
3758  /* Set the address of output section */
3759  out_sectaddr = sort_param->internal_memory + (act_infiles * in_sectsize * DB_PAGESIZE);
3760 
3761  cur_outfile = out_half;
3762 
3763  /* Find how many runs will be produced in this iteration */
3764  num_runs = 0;
3765  for (i = sort_param->in_half; i < sort_param->in_half + act_infiles; i++)
3766  {
3767  len = sort_get_num_file_contents (&sort_param->file_contents[i]);
3768  if (len > num_runs)
3769  {
3770  num_runs = len;
3771  }
3772  }
3773 
3774  if (num_runs == 1)
3775  {
3776  very_last_run = true;
3777  }
3778 
3779  /* PRODUCE RUNS */
3780 
3781  for (j = num_runs; j > 0; j--)
3782  {
3783  if (!very_last_run && (j == 1))
3784  {
3785  /* LAST RUN OF THIS ITERATION */
3786 
3787  /* Last iteration of the outer loop ; some of the input files might have become empty. */
3788 
3789  pre_act_infiles = act_infiles;
3790  act_infiles = sort_get_numpages_of_active_infiles (sort_param);
3791  if (act_infiles != pre_act_infiles)
3792  {
3793  /* Some of the active input files became inactive */
3794 
3795  if (act_infiles == 1)
3796  {
3797  /* ONE ACTIVE INFILE */
3798 
3799  /*
3800  * There is only one active input file (i.e. there is
3801  * only one input run to produce the output run). So,
3802  * there is no need to perform the merging actions. All
3803  * needed is to copy this input run to the current output
3804  * file.
3805  */
3806  act = -1;
3807 
3808  /* Find which input file contains this last input run */
3809  for (i = sort_param->in_half; i < (sort_param->in_half + pre_act_infiles); i++)
3810  {
3811  if (sort_param->file_contents[i].first_run != -1)
3812  {
3813  act = i;
3814  break;
3815  }
3816  }
3817 
3818  if (act == -1)
3819  {
3820  goto bailout;
3821  }
3822 
3823  first_run = sort_param->file_contents[act].first_run;
3824  cp_pages = sort_param->file_contents[act].num_pages[first_run];
3825 
3826  error = sort_run_add_new (&sort_param->file_contents[cur_outfile], cp_pages);
3827  if (error != NO_ERROR)
3828  {
3829  goto bailout;
3830  }
3831  sort_run_remove_first (&sort_param->file_contents[act]);
3832 
3833  /* Use the whole internal_memory area as both the input and output buffer areas. */
3834  while (cp_pages > 0)
3835  {
3836  if (cp_pages > sort_param->tot_buffers)
3837  {
3838  read_pages = sort_param->tot_buffers;
3839  }
3840  else
3841  {
3842  read_pages = cp_pages;
3843  }
3844 
3845  error =
3846  sort_read_area (thread_p, &sort_param->temp[act], cur_page[act], read_pages,
3847  sort_param->internal_memory);
3848  if (error != NO_ERROR)
3849  {
3850  goto bailout;
3851  }
3852 
3853  cur_page[act] += read_pages;
3854  error =
3855  sort_write_area (thread_p, &sort_param->temp[cur_outfile], cur_page[cur_outfile],
3856  read_pages, sort_param->internal_memory);
3857  if (error != NO_ERROR)
3858  {
3859  goto bailout;
3860  }
3861 
3862  cur_page[cur_outfile] += read_pages;
3863  cp_pages -= read_pages;
3864  }
3865 
3866  /* Skip the remaining operations of the PRODUCE RUNS loop */
3867  continue;
3868  }
3869  else
3870  {
3871  /* There are more than one active input files; redistribute buffers */
3872  in_sectsize = sort_find_inbuf_size (sort_param->tot_buffers, act_infiles);
3873  out_sectsize = sort_param->tot_buffers - in_sectsize * act_infiles;
3874 
3875  /* Set the address of each input section */
3876  for (i = 0; i < act_infiles; i++)
3877  {
3878  in_sectaddr[i] = sort_param->internal_memory + (i * in_sectsize * DB_PAGESIZE);
3879  }
3880 
3881  /* Set the address of output section */
3882  out_sectaddr = sort_param->internal_memory + (act_infiles * in_sectsize * DB_PAGESIZE);
3883  }
3884  }
3885  }
3886 
3887  /* PRODUCE A NEW RUN */
3888 
3889  /* INITIALIZE INPUT SECTIONS AND INPUT VARIABLES */
3890  for (i = 0; i < act_infiles; i++)
3891  {
3892  big_index = sort_param->in_half + i;
3893  first_run = sort_param->file_contents[big_index].first_run;
3894  read_pages = sort_param->file_contents[big_index].num_pages[first_run];
3895 
3896  if (in_sectsize < read_pages)
3897  {
3898  read_pages = in_sectsize;
3899  }
3900 
3901  error =
3902  sort_read_area (thread_p, &sort_param->temp[big_index], cur_page[big_index], read_pages,
3903  in_sectaddr[i]);
3904  if (error != NO_ERROR)
3905  {
3906  goto bailout;
3907  }
3908 
3909  /* Increment the current page of this input_file */
3910  cur_page[big_index] += read_pages;
3911 
3912  first_run = sort_param->file_contents[big_index].first_run;
3913  sort_param->file_contents[big_index].num_pages[first_run] -= read_pages;
3914 
3915  /* Initialize input variables */
3916  in_cur_bufaddr[i] = in_sectaddr[i];
3917  in_act_bufno[i] = 0;
3918  in_last_buf[i] = read_pages;
3919  act_slot[i] = 0;
3920  last_slot[i] = sort_spage_get_numrecs (in_cur_bufaddr[i]);
3921 
3922  if (sort_spage_get_record (in_cur_bufaddr[i], act_slot[i], &smallest_elem_ptr[i], PEEK) != S_SUCCESS)
3923  {
3926  goto bailout;
3927  }
3928 
3929  /* If this is a long record retrieve it */
3930  if (smallest_elem_ptr[i].type == REC_BIGONE)
3931  {
3932  if (sort_retrieve_longrec (thread_p, &smallest_elem_ptr[i], &long_recdes[i]) == NULL)
3933  {
3934  ASSERT_ERROR ();
3935  error = er_errid ();
3936  goto bailout;
3937  }
3938  }
3939  }
3940 
3941  for (i = 0, p = sr_list; i < (act_infiles - 1); p = p->next)
3942  {
3943  p->next = (SORT_REC_LIST *) ((char *) p + sizeof (SORT_REC_LIST));
3944  p->rec_pos = i++;
3945  }
3946  p->next = NULL;
3947  p->rec_pos = i;
3948 
3949  for (s = sr_list; s; s = s->next)
3950  {
3951  for (p = s->next; p; p = p->next)
3952  {
3953  do_swap = false;
3954 
3955  data1 = ((smallest_elem_ptr[s->rec_pos].type == REC_BIGONE)
3956  ? &(long_recdes[s->rec_pos].data) : &(smallest_elem_ptr[s->rec_pos].data));
3957 
3958  data2 = ((smallest_elem_ptr[p->rec_pos].type == REC_BIGONE)
3959  ? &(long_recdes[p->rec_pos].data) : &(smallest_elem_ptr[p->rec_pos].data));
3960 
3961  cmp = (*compare) (data1, data2, compare_arg);
3962  if (cmp > 0)
3963  {
3964  do_swap = true;
3965  }
3966 
3967  if (do_swap)
3968  {
3969  tmp_pos = s->rec_pos;
3970  s->rec_pos = p->rec_pos;
3971  p->rec_pos = tmp_pos;
3972  }
3973  }
3974  }
3975 
3976  /* set min_p to point minimum record */
3977  min_p = sr_list; /* min_p->rec_pos is min record */
3978 
3979  /* last element comparison */
3980  last_elem_is_min = false;
3981  p = min_p->next; /* second smallest element */
3982 
3983  if (p)
3984  {
3985  /* STEP 1: get last_elem */
3986  if (sort_spage_get_record (in_cur_bufaddr[min_p->rec_pos], (last_slot[min_p->rec_pos] - 1),
3987  &last_elem_ptr, PEEK) != S_SUCCESS)
3988  {
3991  goto bailout;
3992  }
3993 
3994  /* if this is a long record then retrieve it */
3995  if (last_elem_ptr.type == REC_BIGONE)
3996  {
3997  if (sort_retrieve_longrec (thread_p, &last_elem_ptr, &last_long_recdes) == NULL)
3998  {
3999  ASSERT_ERROR ();
4000  error = er_errid ();
4001  goto bailout;
4002  }
4003  }
4004 
4005  /* STEP 2: compare last, p */
4006  data1 = ((last_elem_ptr.type == REC_BIGONE) ? &(last_long_recdes.data) : &(last_elem_ptr.data));
4007 
4008  data2 = ((smallest_elem_ptr[p->rec_pos].type == REC_BIGONE)
4009  ? &(long_recdes[p->rec_pos].data) : &(smallest_elem_ptr[p->rec_pos].data));
4010 
4011  cmp = (*compare) (data1, data2, compare_arg);
4012  if (cmp <= 0)
4013  {
4014  last_elem_is_min = true;
4015  }
4016  }
4017 
4018  /* INITIALIZE OUTPUT SECTION AND OUTPUT VARIABLES */
4019  out_act_bufno = 0;
4020  out_cur_bufaddr = out_sectaddr;
4021  for (i = 0; i < out_sectsize; i++)
4022  {
4023  /* Initialize each buffer to contain a slotted page */
4025  }
4026 
4027  /* Initialize the size of next run to zero */
4028  out_runsize = 0;
4029 
4030  for (;;)
4031  {
4032  /* OUTPUT A RECORD */
4033 
4034  /* FIND MINIMUM RECORD IN THE INPUT AREA */
4035  min = min_p->rec_pos;
4036 
4037  if (very_last_run)
4038  {
4039  /* OUTPUT THE RECORD */
4040  /* Obtain the output record for this temporary record */
4041  if (smallest_elem_ptr[min].type == REC_BIGONE)
4042  {
4043  error = (*sort_param->put_fn) (thread_p, &long_recdes[min], sort_param->put_arg);
4044  if (error != NO_ERROR)
4045  {
4046  goto bailout;
4047  }
4048  }
4049  else
4050  {
4051  sort_rec = (SORT_REC *) (smallest_elem_ptr[min].data);
4052  /* cut-off link used in Internal Sort */
4053  sort_rec->next = NULL;
4054  error = (*sort_param->put_fn) (thread_p, &smallest_elem_ptr[min], sort_param->put_arg);
4055  if (error != NO_ERROR)
4056  {
4057  goto bailout;
4058  }
4059  }
4060  }
4061  else
4062  {
4063  /* OUTPUT THE MINIMUM RECORD TO THE OUTPUT AREA */
4064 
4065  /* Insert this record to the output area */
4066  if (sort_spage_insert (out_cur_bufaddr, &smallest_elem_ptr[min]) == NULL_SLOTID)
4067  {
4068  /* Current output buffer is full */
4069 
4070  if (++out_act_bufno < out_sectsize)
4071  {
4072  /* There is another buffer in the output section; so insert the new record there */
4073  out_cur_bufaddr += DB_PAGESIZE;
4074 
4075  if (sort_spage_insert (out_cur_bufaddr, &smallest_elem_ptr[min]) == NULL_SLOTID)
4076  {
4077  /*
4078  * Slotted page module refuses to insert a short
4079  * size record (a temporary record that was
4080  * already in a slotted page) to an empty page.
4081  * This should never happen.
4082  */
4084  error = ER_GENERIC_ERROR;
4085  goto bailout;
4086  }
4087  }
4088  else
4089  {
4090  /* Output section is full */
4091  /* Flush output section */
4092  error =
4093  sort_write_area (thread_p, &sort_param->temp[cur_outfile], cur_page[cur_outfile],
4094  out_sectsize, out_sectaddr);
4095  if (error != NO_ERROR)
4096  {
4097  goto bailout;
4098  }
4099  cur_page[cur_outfile] += out_sectsize;
4100  out_runsize += out_sectsize;
4101 
4102  /* Initialize output section and output variables */
4103  out_act_bufno = 0;
4104  out_cur_bufaddr = out_sectaddr;
4105  for (i = 0; i < out_sectsize; i++)
4106  {
4107  /* Initialize each buffer to contain a slotted page */
4109  MAX_ALIGNMENT);
4110  }
4111 
4112  if (sort_spage_insert (out_cur_bufaddr, &smallest_elem_ptr[min]) == NULL_SLOTID)
4113  {
4114  /*
4115  * Slotted page module refuses to insert a short
4116  * size record (a temporary record that was
4117  * already in a slotted page) to an empty page.
4118  * This should never happen.
4119  */
4121  error = ER_GENERIC_ERROR;
4122  goto bailout;
4123  }
4124  }
4125  }
4126  }
4127 
4128  /* PROCEED THE smallest_elem_ptr[min] TO NEXT RECORD */
4129  if (++act_slot[min] >= last_slot[min])
4130  {
4131  /* The current input page is finished */
4132 
4133  last_elem_is_min = false;
4134 
4135  if (++in_act_bufno[min] < in_last_buf[min])
4136  {
4137  /* Switch to the next page in the input buffer */
4138  in_cur_bufaddr[min] = in_sectaddr[min] + in_act_bufno[min] * DB_PAGESIZE;
4139  }
4140  else
4141  {
4142  /* The input section is finished */
4143  big_index = sort_param->in_half + min;
4144  first_run = sort_param->file_contents[big_index].first_run;
4145  if (sort_param->file_contents[big_index].num_pages[first_run])
4146  {
4147  /* There are still some pages in the current input run */
4148 
4149  in_cur_bufaddr[min] = in_sectaddr[min];
4150 
4151  read_pages = sort_param->file_contents[big_index].num_pages[first_run];
4152  if (in_sectsize < read_pages)
4153  {
4154  read_pages = in_sectsize;
4155  }
4156 
4157  in_last_buf[min] = read_pages;
4158 
4159  error =
4160  sort_read_area (thread_p, &sort_param->temp[big_index], cur_page[big_index], read_pages,
4161  in_cur_bufaddr[min]);
4162  if (error != NO_ERROR)
4163  {
4164  goto bailout;
4165  }
4166 
4167  /* Increment the current page of this input_file */
4168  cur_page[big_index] += read_pages;
4169 
4170  in_act_bufno[min] = 0;
4171  first_run = sort_param->file_contents[big_index].first_run;
4172  sort_param->file_contents[big_index].num_pages[first_run] -= read_pages;
4173  }
4174  else
4175  {
4176  /* Current input run on this input file has finished */
4177 
4178  /* remove current input run in input section. proceed to next minimum record. */
4179  min_p = min_p->next;
4180 
4181  if (min_p == NULL)
4182  {
4183  /* all "smallest_elem_ptr" are NULL; so break */
4184  break;
4185  }
4186  else
4187  {
4188  /* Don't try to get the next record on this input section */
4189  continue;
4190  }
4191  }
4192  }
4193 
4194  act_slot[min] = 0;
4195  last_slot[min] = sort_spage_get_numrecs (in_cur_bufaddr[min]);
4196  }
4197 
4198  if (sort_spage_get_record (in_cur_bufaddr[min], act_slot[min], &smallest_elem_ptr[min], PEEK) !=
4199  S_SUCCESS)
4200  {
4203  goto bailout;
4204  }
4205 
4206  /* If this is a long record retrieve it */
4207  if (smallest_elem_ptr[min].type == REC_BIGONE)
4208  {
4209  if (sort_retrieve_longrec (thread_p, &smallest_elem_ptr[min], &long_recdes[min]) == NULL)
4210  {
4211  ASSERT_ERROR ();
4212  error = er_errid ();
4213  goto bailout;
4214  }
4215  }
4216 
4217  /* find minimum */
4218  if (last_elem_is_min == true)
4219  {
4220  /* already find min */
4221  ;
4222  }
4223  else
4224  {
4225  for (s = min_p; s; s = s->next)
4226  {
4227  p = s->next;
4228  if (p == NULL)
4229  {
4230  /* there is only one record */
4231  break;
4232  }
4233 
4234  do_swap = false;
4235 
4236  data1 = ((smallest_elem_ptr[s->rec_pos].type == REC_BIGONE)
4237  ? &(long_recdes[s->rec_pos].data) : &(smallest_elem_ptr[s->rec_pos].data));
4238 
4239  data2 = ((smallest_elem_ptr[p->rec_pos].type == REC_BIGONE)
4240  ? &(long_recdes[p->rec_pos].data) : &(smallest_elem_ptr[p->rec_pos].data));
4241 
4242  cmp = (*compare) (data1, data2, compare_arg);
4243  if (cmp > 0)
4244  {
4245  do_swap = true;
4246  }
4247 
4248  if (do_swap)
4249  {
4250  /* swap s, p */
4251  tmp_pos = s->rec_pos;
4252  s->rec_pos = p->rec_pos;
4253  p->rec_pos = tmp_pos;
4254  }
4255  else
4256  {
4257  /* sr_list is completely sorted */
4258  break;
4259  }
4260  }
4261 
4262  /* new input page is entered */
4263  if (act_slot[min_p->rec_pos] == 0)
4264  {
4265  /* last element comparison */
4266  p = min_p->next; /* second smallest element */
4267  if (p)
4268  {
4269  /* STEP 1: get last_elem */
4270  if (sort_spage_get_record (in_cur_bufaddr[min_p->rec_pos], (last_slot[min_p->rec_pos] - 1),
4271  &last_elem_ptr, PEEK) != S_SUCCESS)
4272  {
4275  goto bailout;
4276  }
4277 
4278  /* if this is a long record retrieve it */
4279  if (last_elem_ptr.type == REC_BIGONE)
4280  {
4281  if (sort_retrieve_longrec (thread_p, &last_elem_ptr, &last_long_recdes) == NULL)
4282  {
4283  ASSERT_ERROR ();
4284  error = er_errid ();
4285  goto bailout;
4286  }
4287  }
4288 
4289  /* STEP 2: compare last, p */
4290  data1 =
4291  ((last_elem_ptr.type == REC_BIGONE) ? &(last_long_recdes.data) : &(last_elem_ptr.data));
4292 
4293  data2 = ((smallest_elem_ptr[p->rec_pos].type == REC_BIGONE)
4294  ? &(long_recdes[p->rec_pos].data) : &(smallest_elem_ptr[p->rec_pos].data));
4295 
4296  cmp = (*compare) (data1, data2, compare_arg);
4297  if (cmp <= 0)
4298  {
4299  last_elem_is_min = true;
4300  }
4301  }
4302  }
4303  }
4304  }
4305 
4306  if (!very_last_run)
4307  {
4308  /* Flush whatever is left on the output section */
4309 
4310  out_act_bufno++; /* Since 0 refers to the first active buffer */
4311  error =
4312  sort_write_area (thread_p, &sort_param->temp[cur_outfile], cur_page[cur_outfile], out_act_bufno,
4313  out_sectaddr);
4314  if (error != NO_ERROR)
4315  {
4316  goto bailout;
4317  }
4318  cur_page[cur_outfile] += out_act_bufno;
4319  out_runsize += out_act_bufno;
4320  }
4321 
4322  /* END UP THIS RUN */
4323 
4324  /* Remove previous first_run nodes of the file_contents lists of the input files */
4325  for (i = sort_param->in_half; i < sort_param->in_half + sort_param->half_files; i++)
4326  {
4327  sort_run_remove_first (&sort_param->file_contents[i]);
4328  }
4329 
4330  /* Add a new node to the file_contents list of the current output file */
4331  error = sort_run_add_new (&sort_param->file_contents[cur_outfile], out_runsize);
4332  if (error != NO_ERROR)
4333  {
4334  goto bailout;
4335  }
4336 
4337  /* PRODUCE A NEW RUN */
4338 
4339  /* Switch to the next out file */
4340  if (++cur_outfile >= sort_param->half_files + out_half)
4341  {
4342  cur_outfile = out_half;
4343  }
4344  }
4345 
4346  /* Exchange input and output file indices */
4347  temp = sort_param->in_half;
4348  sort_param->in_half = out_half;
4349  out_half = temp;
4350  }
4351 
4352 bailout:
4353 
4354  for (i = 0; i < sort_param->half_files; i++)
4355  {
4356  if (long_recdes[i].data != NULL)
4357  {
4358  free_and_init (long_recdes[i].data);
4359  }
4360  }
4361 
4362  if (last_long_recdes.data)
4363  {
4364  free_and_init (last_long_recdes.data);
4365  }
4366 
4367  return (error == SORT_PUT_STOP) ? NO_ERROR : error;
4368 }
4369 
4370 /* AUXILIARY FUNCTIONS */
4371 
4372 /*
4373  * sort_get_avg_numpages_of_nonempty_tmpfile () - Return average number of pages
4374  * currently occupied by nonempty
4375  * temporary file
4376  * return:
4377  * sort_param(in): Sort paramater
4378  */
4379 static int
4381 {
4382  int f;
4383  int sum, i;
4384  int nonempty_temp_file_num = 0;
4385 
4386  sum = 0;
4387  for (i = 0; i < sort_param->tot_tempfiles; i++)
4388  {
4389  /* If the list is not empty */
4390  f = sort_param->file_contents[i].first_run;
4391  if (f > -1)
4392  {
4393  nonempty_temp_file_num++;
4394  for (; f <= sort_param->file_contents[i].last_run; f++)
4395  {
4396  sum += sort_param->file_contents[i].num_pages[f];
4397  }
4398  }
4399  }
4400 
4401  return (sum / MAX (1, nonempty_temp_file_num));
4402 }
4403 
4404 /*
4405  * sort_return_used_resources () - Return system resource used for sorting
4406  * return: void
4407  * sort_param(in): Sort paramater
4408  *
4409  * Note: Clear the sort parameter structure by deallocating any allocated
4410  * memory areas and destroying any temporary files and volumes.
4411  */
4412 static void
4414 {
4415  int k;
4416 #if defined(SERVER_MODE)
4417  int rv;
4418 #endif /* SERVER_MODE */
4419 
4420  if (sort_param == NULL)
4421  {
4422  return; /* nop */
4423  }
4424 
4425  if (sort_param->internal_memory)
4426  {
4427  free_and_init (sort_param->internal_memory);
4428  }
4429 
4430  for (k = 0; k < sort_param->tot_tempfiles; k++)
4431  {
4432  if (sort_param->temp[k].volid != NULL_VOLID)
4433  {
4434  (void) file_temp_retire (thread_p, &sort_param->temp[k]);
4435  }
4436  }
4437 
4438  if (sort_param->multipage_file.volid != NULL_VOLID)
4439  {
4440  (void) file_temp_retire (thread_p, &(sort_param->multipage_file));
4441  }
4442 
4443  for (k = 0; k < sort_param->tot_tempfiles; k++)
4444  {
4445  if (sort_param->file_contents[k].num_pages != NULL)
4446  {
4447  db_private_free_and_init (thread_p, sort_param->file_contents[k].num_pages);
4448  }
4449  }
4450 
4451  if (sort_param->px_array)
4452  {
4453  free_and_init (sort_param->px_array);
4454  }
4455  sort_param->px_height_max = sort_param->px_array_size = 0;
4456 
4457 #if defined(SERVER_MODE)
4458  rv = pthread_mutex_destroy (&(sort_param->px_mtx));
4459  if (rv != 0)
4460  {
4462  }
4463 #endif
4464 
4465  free_and_init (sort_param);
4466 }
4467 
4468 /*
4469  * sort_add_new_file () - Create a new temporary file for sorting purposes
4470  * return: NO_ERROR
4471  * vfid(in): Set to the created file identifier
4472  * file_pg_cnt_est(in): Estimated file page count
4473  * force_alloc(in): Allocate file pages now ?
4474  * tde_encrypted(in): whether the file has to be encrypted or not for TDE
4475  */
4476 static int
4477 sort_add_new_file (THREAD_ENTRY * thread_p, VFID * vfid, int file_pg_cnt_est, bool force_alloc, bool tde_encrypted)
4478 {
4479  VPID new_vpid;
4480  TDE_ALGORITHM tde_algo = TDE_ALGORITHM_NONE;
4481  int ret = NO_ERROR;
4482 
4483  /* todo: sort file is a case I missed that seems to use file_find_nthpages. I don't know if it can be optimized to
4484  * work without numerable files, that remains to be seen. */
4485 
4486  ret = file_create_temp_numerable (thread_p, file_pg_cnt_est, vfid);
4487  if (ret != NO_ERROR)
4488  {
4489  ASSERT_ERROR ();
4490  return ret;
4491  }
4492  if (VFID_ISNULL (vfid))
4493  {
4494  assert_release (false);
4495  return ER_FAILED;
4496  }
4497  if (tde_encrypted)
4498  {
4500  }
4501 
4502  ret = file_apply_tde_algorithm (thread_p, vfid, tde_algo);
4503  if (ret != NO_ERROR)
4504  {
4505  ASSERT_ERROR ();
4506  file_temp_retire (thread_p, vfid);
4507  VFID_SET_NULL (vfid);
4508  return ret;
4509  }
4510 
4511  if (force_alloc == false)
4512  {
4513  return NO_ERROR;
4514  }
4515 
4516  /* page allocation force is specified, allocate pages for the file */
4517  /* todo: we don't have multiple page allocation, but allocation should be fast enough */
4518  for (; file_pg_cnt_est > 0; file_pg_cnt_est--)
4519  {
4520  ret = file_alloc (thread_p, vfid, NULL, NULL, &new_vpid, NULL);
4521  if (ret != NO_ERROR)
4522  {
4523  ASSERT_ERROR ();
4524  file_temp_retire (thread_p, vfid);
4525  VFID_SET_NULL (vfid);
4526  return ret;
4527  }
4528  }
4529 
4530  return NO_ERROR;
4531 }
4532 
4533 /*
4534  * sort_write_area () - Write memory area to disk
4535  * return:
4536  * vfid(in): file identifier to write the pages contained in the area
4537  * first_page(in): first page to be written on the file
4538  * num_pages(in): size of the memory area in terms of number of pages it
4539  * accommodates
4540  * area_start(in): beginning address of the area
4541  *
4542  * Note: This function writes the contents of the given memory area to the
4543  * specified file starting from the given page. Before doing so, however,
4544  * it checks the size of the file and, if necessary, allocates new pages.
4545  * If new pages are needed but the disk is full, an error code is
4546  * returned.
4547  */
4548 static int
4549 sort_write_area (THREAD_ENTRY * thread_p, VFID * vfid, int first_page, INT32 num_pages, char *area_start)
4550 {
4551  PAGE_PTR page_ptr = NULL;
4552  VPID vpid;
4553  INT32 page_no;
4554  int i;
4555  int ret = NO_ERROR;
4556  TDE_ALGORITHM tde_algo = TDE_ALGORITHM_NONE;
4557 
4558  ret = file_get_tde_algorithm (thread_p, vfid, PGBUF_UNCONDITIONAL_LATCH, &tde_algo);
4559  if (ret != NO_ERROR)
4560  {
4561  return ret;
4562  }
4563  /* initializations */
4564  page_no = first_page;
4565 
4566  /* Flush pages buffered in the given area to the specified file */
4567 
4568  page_ptr = (PAGE_PTR) area_start;
4569 
4570  for (i = 0; i < num_pages; i++)
4571  {
4572  /* file is automatically expanded if page is not allocated (as long as it is missing only one page) */
4573  ret = file_numerable_find_nth (thread_p, vfid, page_no++, true, NULL, NULL, &vpid);
4574  if (ret != NO_ERROR)
4575  {
4576  ASSERT_ERROR ();
4577  return ret;
4578  }
4579  if (pgbuf_copy_from_area (thread_p, &vpid, 0, DB_PAGESIZE, page_ptr, true, tde_algo) == NULL)
4580  {
4581  ASSERT_ERROR_AND_SET (ret);
4582  return ret;
4583  }
4584 
4585  page_ptr += DB_PAGESIZE;
4586  }
4587 
4588  return NO_ERROR;
4589 }
4590 
4591 /*
4592  * sort_read_area () - Read memory area from disk
4593  * return:
4594  * vfid(in): file identifier to read the pages from
4595  * first_page(in): first page to be read from the file
4596  * num_pages(in): size of the memory area in terms of number of pages it
4597  * accommodates
4598  * area_start(in): beginning address of the area
4599  *
4600  * Note: This function reads in successive pages of the specified file into
4601  * the given memory area until this area becomes full.
4602  */
4603 static int
4604 sort_read_area (THREAD_ENTRY * thread_p, VFID * vfid, int first_page, INT32 num_pages, char *area_start)
4605 {
4606  PAGE_PTR page_ptr = NULL;
4607  VPID vpid;
4608  INT32 page_no;
4609  int i;
4610  int ret = NO_ERROR;
4611 
4612  vpid.volid = vfid->volid;
4613  page_no = first_page;
4614 
4615  /* Flush pages buffered in the given area to the specified file */
4616 
4617  page_ptr = (PAGE_PTR) area_start;
4618 
4619  for (i = 0; i < num_pages; i++)
4620  {
4621  ret = file_numerable_find_nth (thread_p, vfid, page_no++, false, NULL, NULL, &vpid);
4622  if (ret != NO_ERROR)
4623  {
4624  ASSERT_ERROR ();
4625  return ret;
4626  }
4627  if (pgbuf_copy_to_area (thread_p, &vpid, 0, DB_PAGESIZE, page_ptr, true) == NULL)
4628  {
4629  ASSERT_ERROR_AND_SET (ret);
4630  return ret;
4631  }
4632 
4633  page_ptr += DB_PAGESIZE;
4634  }
4635 
4636  return NO_ERROR;
4637 }
4638 
4639 /*
4640  * sort_get_num_half_tmpfiles () - Determines the number of temporary files to be used
4641  * during the sorting process
4642  * return:
4643  * tot_buffers(in): total number of buffers in the buffer pool area
4644  * input_pages(in): size of the input file in terms of number of pages it
4645  * occupies
4646  *
4647  */
4648 static int
4649 sort_get_num_half_tmpfiles (int tot_buffers, int input_pages)
4650 {
4651  int half_files = tot_buffers - 1;
4652  int exp_num_runs;
4653 
4654  /* If there is an estimate on number of input pages */
4655  if (input_pages > 0)
4656  {
4657  /* Conservatively estimate number of runs that will be produced */
4658  exp_num_runs = CEIL_PTVDIV (input_pages, tot_buffers) + 1;
4659 
4660  if (exp_num_runs < half_files)
4661  {
4662  if ((exp_num_runs > (tot_buffers / 2)))
4663  {
4664  half_files = exp_num_runs;
4665  }
4666  else
4667  {
4668  half_files = (tot_buffers / 2);
4669  }
4670  }
4671  }
4672 
4673  /* Precaution against underestimation on exp_num_runs and having too few files to merge them */
4674  if (half_files < SORT_MIN_HALF_FILES)
4675  {
4676  return SORT_MIN_HALF_FILES;
4677  }
4678 
4679  if (half_files < SORT_MAX_HALF_FILES)
4680  {
4681  return half_files;
4682  }
4683  else
4684  {
4685  /* Precaution against saturation (i.e. having too many files) */
4686  return SORT_MAX_HALF_FILES;
4687  }
4688 }
4689 
4690 /*
4691  * sort_checkalloc_numpages_of_outfiles () - Check sizes of output files
4692  * return: error code
4693  * sort_param(in): sort parameters
4694  *
4695  * Note: This function determines how many pages will be needed by each output
4696  * file of the current stage of the merging phase. This is done by going
4697  * over the file_contents lists of the input files and determining how
4698  * many pages they will eventually contribute to each output file.
4699  * (Again, this is an estimate, not an exact number on the size of output
4700  * files.) It then checks whether these output files have that many pages
4701  * already. If some of them need more pages, it allocates new pages.
4702  */
4703 static int
4705 {
4706  int out_file;
4707  int out_half;
4708  int needed_pages[2 * SORT_MAX_HALF_FILES];
4709  int contains;
4710  int alloc_pages;
4711  int i, j;
4712 
4713  int error_code = NO_ERROR;
4714 
4715  for (i = 0; i < (int) DIM (needed_pages); i++)
4716  {
4717  needed_pages[i] = 0;
4718  }
4719 
4720  if (sort_param->in_half == 0)
4721  {
4722  out_half = sort_param->half_files;
4723  }
4724  else
4725  {
4726  out_half = 0;
4727  }
4728 
4729  /* Estimate the sizes of all new runs to be flushed on output files */
4730  for (i = sort_param->in_half; i < sort_param->in_half + sort_param->half_files; i++)
4731  {
4732  out_file = out_half;
4733 
4734  /* If the list is not empty */
4735  j = sort_param->file_contents[i].first_run;
4736  if (j > -1)
4737  {
4738  for (; j <= sort_param->file_contents[i].last_run; j++)
4739  {
4740  needed_pages[out_file] += sort_param->file_contents[i].num_pages[j];
4741 
4742  if (++out_file >= out_half + sort_param->half_files)
4743  {
4744  out_file = out_half;
4745  }
4746  }
4747  }
4748  }
4749 
4750  /* Allocate enough pages to each output file We don't initialize pages during allocation since we do not care the
4751  * state of the pages after a rollback or system crashes. Nothing need to be log on the page. The pages are
4752  * initialized at a later time. */
4753 
4754  /* Files are traversed in reverse order, in order to destroy unnecessary files first. It is expected that returned
4755  * pages will be reused by the next allocation. */
4756  for (i = out_half + sort_param->half_files - 1; i >= out_half; i--)
4757  {
4758  if (needed_pages[i] > 0)
4759  {
4760  assert (!VFID_ISNULL (&sort_param->temp[i]));
4761  error_code = file_get_num_user_pages (thread_p, &sort_param->temp[i], &contains);
4762  if (error_code != NO_ERROR)
4763  {
4764  ASSERT_ERROR ();
4765  return error_code;
4766  }
4767  alloc_pages = (needed_pages[i] - contains);
4768  if (alloc_pages > 0)
4769  {
4770  error_code = file_alloc_multiple (thread_p, &sort_param->temp[i], NULL, NULL, alloc_pages, NULL);
4771  if (error_code != NO_ERROR)
4772  {
4773  ASSERT_ERROR ();
4774  return error_code;
4775  }
4776  }
4777  }
4778  else
4779  {
4780  /* If there is a file not to be used anymore, destroy it in order to reuse spaces. */
4781  if (!VFID_ISNULL (&sort_param->temp[i]))
4782  {
4783  error_code = file_temp_retire (thread_p, &sort_param->temp[i]);
4784  if (error_code != NO_ERROR)
4785  {
4786  ASSERT_ERROR ();
4787  return error_code;
4788  }
4789  VFID_SET_NULL (&sort_param->temp[i]);
4790  }
4791  }
4792  }
4793  return NO_ERROR;
4794 }
4795 
4796 /*
4797  * sort_get_numpages_of_active_infiles () - Find number of active input files
4798  * return:
4799  * sort_param(in): sort parameters
4800  *
4801  * Note: This function determines how many of the input files still
4802  * have input runs (active) to participate in while the merging
4803  * process which produces larger size runs. For this purpose,
4804  * it checks the file_contents list of each input file. Once the
4805  * first file with no remaining input runs (unactive) is found,
4806  * it is concluded that all the remaining input temporary files
4807  * are also inactive (because of balanced distribution of runs to
4808  * the files).
4809  */
4810 static int
4812 {
4813  int i;
4814 
4815  for (i = sort_param->in_half; i < sort_param->in_half + sort_param->half_files; i++)
4816  {
4817  if (sort_param->file_contents[i].first_run == -1)
4818  {
4819  break;
4820  }
4821  }
4822 
4823  return (i - sort_param->in_half);
4824 }
4825 
4826 /*
4827  * sort_find_inbuf_size () - Distribute buffers
4828  * return:
4829  * tot_buffers(in): number of total buffers in the buffer pool area
4830  * in_sections(in): number of input sections into which this buffer pool area
4831  * should be divided into (in other words, the number of
4832  * active input files)
4833  *
4834  * Note: This function distributes the buffers of the buffer pool area
4835  * (i.e., the internal memory) among the active input files and
4836  * the output file. Recall that each active input file and the
4837  * output file will have a section in the buffer pool area.
4838  * This function returns the size of each input section in terms
4839  * of number of buffers it occupies. Naturally, the output
4840  * section will have the remaining buffers.
4841  *
4842  * Note that when the input runs are merged together the
4843  * number of read operations is (roughly) equal to the
4844  * number of write operations. For that reason this function
4845  * reserves roughly half of the buffers for the output section
4846  * and distributes the remaining ones evenly among the input
4847  * sections, as each input run is approximately the same size.
4848  */
4849 static int
4850 sort_find_inbuf_size (int tot_buffers, int in_sections)
4851 {
4852  int in_sectsize;
4853 
4854  /* Allocate half of the total buffers to output buffer area */
4855  in_sectsize = (tot_buffers / (in_sections << 1));
4856  if (in_sectsize != 0)
4857  {
4858  return in_sectsize;
4859  }
4860  else
4861  {
4862  return 1;
4863  }
4864 }
4865 
4866 /*
4867  * sort_run_add_new () - Adds a new node to the end of the given list
4868  * return: NO_ERROR
4869  * file_contents(in): which list to add
4870  * num_pages(in): what value to put for the new run
4871  */
4872 static int
4874 {
4875  int new_total_elements;
4876  int ret = NO_ERROR;
4877 
4878  if (file_contents->first_run == -1)
4879  {
4880  /* This is an empty list */
4881  file_contents->first_run = 0;
4882  file_contents->last_run = 0;
4883  }
4884  else
4885  {
4886  file_contents->last_run++;
4887  }
4888 
4889  /* If there is no room in the dynamic array to keep the next element of the list; expand the dynamic array. */
4890  if (file_contents->last_run >= file_contents->num_slots)
4891  {
4892  new_total_elements = ((int) (((float) file_contents->num_slots * SORT_EXPAND_DYN_ARRAY_RATIO) + 0.5));
4893  file_contents->num_pages =
4894  (int *) db_private_realloc (NULL, file_contents->num_pages, new_total_elements * sizeof (int));
4895  if (file_contents->num_pages == NULL)
4896  {
4897  return ER_FAILED;
4898  }
4899  file_contents->num_slots = new_total_elements;
4900  }
4901 
4902  /* Put the "num_pages" info to the "last_run" slot of the array */
4903  file_contents->num_pages[file_contents->last_run] = num_pages;
4904 
4905  return ret;
4906 }
4907 
4908 /*
4909  * sort_run_remove_first () - Removes the first run of the given file contents list
4910  * return: void
4911  * file_contents(in): which list to remove from
4912  */
4913 static void
4915 {
4916  /* If the list is not empty */
4917  if (file_contents->first_run != -1)
4918  {
4919  /* remove the first element of the list */
4920  if (++file_contents->first_run > file_contents->last_run)
4921  {
4922  /* the list is empty now; indicate so */
4923  file_contents->first_run = -1;
4924  }
4925  }
4926 }
4927 
4928 /*
4929  * sort_get_num_file_contents () - Returns the number of elements kept in the
4930  * given linked list
4931  * return:
4932  * file_contents(in): which list
4933  */
4934 static int
4936 {
4937  /* If the list is not empty */
4938  if (file_contents->first_run != -1)
4939  {
4940  return (file_contents->last_run - file_contents->first_run + 1);
4941  }
4942  else
4943  {
4944  /* empty list */
4945  return (0);
4946  }
4947 }
4948 
4949 #if defined(CUBRID_DEBUG)
4950 /*
4951  * sort_print_file_contents () - Prints the elements of the given file contents list
4952  * return: void
4953  * file_contents(in): which list to print
4954  *
4955  * Note: It is used for debugging purposes.
4956  */
4957 static void
4958 sort_print_file_contents (const FILE_CONTENTS * file_contents)
4959 {
4960  int j;
4961 
4962  /* If the list is not empty */
4963  j = file_contents->first_run;
4964  if (j > -1)
4965  {
4966  fprintf (stdout, "File contents:\n");
4967  for (; j <= file_contents->last_run; j++)
4968  {
4969  fprintf (stdout, " Run with %3d pages\n", file_contents->num_pages[j]);
4970  }
4971  }
4972  else
4973  {
4974  fprintf (stdout, "Empty file:\n");
4975  }
4976 }
4977 #endif /* CUBRID_DEBUG */
char * PAGE_PTR
struct slotted_pheader SLOTTED_PAGE_HEADER
int file_temp_retire(THREAD_ENTRY *thread_p, const VFID *vfid)
VFID temp[2 *SORT_MAX_HALF_FILES]
int file_numerable_find_nth(THREAD_ENTRY *thread_p, const VFID *vfid, int nth, bool auto_alloc, FILE_INIT_PAGE_FUNC f_init, void *f_init_args, VPID *vpid_nth)
cubthread::entry * thread_get_thread_entry_info(void)
#define NO_ERROR
Definition: error_code.h:46
INT16 rtype
FILE_CONTENTS file_contents[2 *SORT_MAX_HALF_FILES]
int area_size
static int sort_exphase_merge_elim_dup(THREAD_ENTRY *thread_p, SORT_PARAM *sort_param)
SORT_DUP_OPTION option
#define SORT_MIN_HALF_FILES
Definition: external_sort.c:71
#define ASSERT_ERROR()
SCAN_CODE
static int sort_checkalloc_numpages_of_outfiles(THREAD_ENTRY *thread_p, SORT_PARAM *sort_param)
#define pthread_mutex_init(a, b)
Definition: area_alloc.c:48
#define FLOAT_ALIGNMENT
Definition: memory_alloc.h:63
VOL_LIST vol_list
long start
int file_alloc_multiple(THREAD_ENTRY *thread_p, const VFID *vfid, FILE_INIT_PAGE_FUNC f_init, void *f_init_args, int npages, VPID *vpids_out)
static void sort_spage_initialize(PAGE_PTR pgptr, INT16 slots_type, INT16 alignment)
#define ER_FAILED
Definition: error_code.h:47
const char * spage_alignment_string(unsigned short alignment)
PX_TREE_NODE * px_array
int SORT_PUT_FUNC(THREAD_ENTRY *thread_p, const RECDES *, void *)
Definition: external_sort.h:60
#define pthread_mutex_unlock(a)
Definition: area_alloc.c:51
static PX_TREE_NODE * px_sort_assign(THREAD_ENTRY *thread_p, SORT_PARAM *sort_param, int px_id, char **px_buff, char **px_vector, long px_vector_size, int px_height, int px_myself)
int file_create_temp_numerable(THREAD_ENTRY *thread_p, int npages, VFID *vfid)
#define ASSERT_ERROR_AND_SET(error_code)
char low_high
void thread_sleep(double millisec)
#define assert_release(e)
Definition: error_manager.h:96
bool thread_set_sort_stats_active(cubthread::entry *thread_p, bool new_flag)
SCAN_CODE overflow_get(THREAD_ENTRY *thread_p, const VPID *ovf_vpid, RECDES *recdes, MVCC_SNAPSHOT *mvcc_snapshot)
#define SHORT_ALIGNMENT
Definition: memory_alloc.h:60
INT16 volid
#define NULL_SLOTID
#define SORT_INITIAL_DYN_ARRAY_SIZE
Definition: external_sort.c:74
char * data
INT16 roffset
int er_errid(void)
static SCAN_CODE sort_spage_get_record(PAGE_PTR pgptr, INT16 slotid, RECDES *recdes, bool peek_p)
#define er_log_debug(...)
const char * spage_anchor_flag_string(const INT16 anchor_type)
void * pgbuf_copy_to_area(THREAD_ENTRY *thread_p, const VPID *vpid, int start_offset, int length, void *area, bool do_fetch)
Definition: page_buffer.c:4067
#define MAX_ALIGNMENT
Definition: memory_alloc.h:70
bool tde_encrypted
void * put_arg
#define VFID_ISNULL(vfid_ptr)
Definition: file_manager.h:72
void THREAD_ENTRY
#define CHAR_ALIGNMENT
Definition: memory_alloc.h:59
static int sort_find_inbuf_size(int tot_buffers, int in_sections)
static int sort_inphase_sort(THREAD_ENTRY *thread_p, SORT_PARAM *sort_param, SORT_GET_FUNC *get_next, void *arguments, unsigned int *total_numrecs)
static INT16 sort_spage_find_free(PAGE_PTR pgptr, SLOT **sptr, INT16 length, INT16 type, INT16 *space)
int sort_listfile(THREAD_ENTRY *thread_p, INT16 volid, int est_inp_pg_cnt, SORT_GET_FUNC *get_fn, void *get_arg, SORT_PUT_FUNC *put_fn, void *put_arg, SORT_CMP_FUNC *cmp_fn, void *cmp_arg, SORT_DUP_OPTION option, int limit, bool includes_tde_class)
static void sort_run_remove_first(FILE_CONTENTS *file_contents)
void er_set(int severity, const char *file_name, const int line_no, int err_id, int num_args,...)
#define ER_SORT_TEMP_PAGE_CORRUPTED
Definition: error_code.h:119
void css_push_external_task(CSS_CONN_ENTRY *conn, cubthread::entry_task *task)
static char ** sort_run_sort(THREAD_ENTRY *thread_p, SORT_PARAM *sort_param, char **base, long limit, long sort_numrecs, char **otherbase, long *srun_limit)
#define assert(x)
VFID multipage_file
unsigned short tree_depth
TDE_ALGORITHM
Definition: tde.h:71
int32_t fileid
Definition: dbtype_def.h:886
int file_get_num_user_pages(THREAD_ENTRY *thread_p, const VFID *vfid, int *n_user_pages_out)
int prm_get_integer_value(PARAM_ID prm_id)
#define ER_GENERIC_ERROR
Definition: error_code.h:49
int file_apply_tde_algorithm(THREAD_ENTRY *thread_p, const VFID *vfid, const TDE_ALGORITHM tde_algo)
#define ER_OUT_OF_VIRTUAL_MEMORY
Definition: error_code.h:50
static int sort_spage_compact(PAGE_PTR pgptr)
char ** px_vector
#define SORT_RECORD_LENGTH(item_p)
Definition: external_sort.h:43
static char * sort_retrieve_longrec(THREAD_ENTRY *thread_p, RECDES *address, RECDES *memory)
int file_get_tde_algorithm(THREAD_ENTRY *thread_p, const VFID *vfid, PGBUF_LATCH_CONDITION fix_head_cond, TDE_ALGORITHM *tde_algo)
static INT16 sort_spage_insert(PAGE_PTR pgptr, RECDES *recdes)
#define min(a, b)
short volid
Definition: dbtype_def.h:880
INT16 rlength
static int rv
Definition: area_alloc.c:52
static void sort_return_used_resources(THREAD_ENTRY *thread_p, SORT_PARAM *sort_param)
static void cleanup(int signo)
Definition: broker.c:717
void * out_arg
void * cmp_arg
#define NULL
Definition: freelistheap.h:34
int file_alloc(THREAD_ENTRY *thread_p, const VFID *vfid, FILE_INIT_PAGE_FUNC f_init, void *f_init_args, VPID *vpid_out, PAGE_PTR *page_out)
#define db_private_free_and_init(thrd, ptr)
Definition: memory_alloc.h:141
static int sort_read_area(THREAD_ENTRY *thread_p, VFID *vfid, int first_page, INT32 num_pages, char *area_start)
static int sort_add_new_file(THREAD_ENTRY *thread_p, VFID *vfid, int file_pg_cnt_est, bool force_alloc, bool tde_encrypted)
struct slot SLOT
#define db_private_alloc(thrd, size)
Definition: memory_alloc.h:227
CSS_CONN_ENTRY * css_get_current_conn_entry(void)
#define NULL_FILEID
struct sort_rec_list SORT_REC_LIST
struct sort_rec_list * next
#define NULL_OFFSET
int file_create_temp(THREAD_ENTRY *thread_p, int npages, VFID *vfid)
SORT_REC * next
Definition: external_sort.h:70
SORT_STATUS
Definition: external_sort.h:45
bool logtb_set_check_interrupt(THREAD_ENTRY *thread_p, bool flag)
#define CEIL_PTVDIV(dividend, divisor)
Definition: memory_alloc.h:50
long stop
char * internal_memory
#define cmp
Definition: mprec.h:351
#define ER_CSS_PTHREAD_MUTEX_DESTROY
Definition: error_code.h:998
VOL_INFO * vol_info
int vol_ent_cnt
#define CAST_BUFLEN
Definition: porting.h:471
SORT_CMP_FUNC * cmp_fn
static void error(const char *msg)
Definition: gencat.c:331
#define LOG_FIND_THREAD_TRAN_INDEX(thrd)
Definition: perf_monitor.h:158
#define SORT_CHECK_DUPLICATE(a, b)
Definition: external_sort.c:84
#define ARG_FILE_LINE
Definition: error_manager.h:44
void FIND_RUN_FN(char **, long *, SORT_STACK *, long, SORT_CMP_FUNC *, void *)
static int sort_get_numpages_of_active_infiles(const SORT_PARAM *sort_param)
void MERGE_RUN_FN(char **, char **, SORT_STACK *, SORT_CMP_FUNC *, void *)
#define SORT_PARTITION_RUN_SIZE_MIN
SORT_DUP_OPTION
Definition: external_sort.h:53
void * pgbuf_copy_from_area(THREAD_ENTRY *thread_p, const VPID *vpid, int start_offset, int length, void *area, bool do_fetch, TDE_ALGORITHM tde_algo)
Definition: page_buffer.c:4192
#define DOUBLE_ALIGNMENT
Definition: memory_alloc.h:64
#define free_and_init(ptr)
Definition: memory_alloc.h:147
static int sort_exphase_merge(THREAD_ENTRY *thread_p, SORT_PARAM *sort_param)
#define DB_ALIGN(offset, align)
Definition: memory_alloc.h:84
static int sort_get_num_half_tmpfiles(int tot_buffers, int input_pages)
#define LONG_ALIGNMENT
Definition: memory_alloc.h:62
#define DB_WASTED_ALIGN(offset, align)
Definition: memory_alloc.h:90
struct vpid VPID
Definition: dbtype_def.h:876
#define DB_PAGESIZE
static int sort_run_flush(THREAD_ENTRY *thread_p, SORT_PARAM *sort_param, int out_curfile, int *cur_page, char *output_buffer, char **index_area, int numrecs, int rec_type)
#define INT_ALIGNMENT
Definition: memory_alloc.h:61
static int sort_validate(char **vector, long size, SORT_CMP_FUNC *compare, void *comp_arg)
static int sort_spage_offsetcmp(const void *s1, const void *s2)
static int sort_get_num_file_contents(FILE_CONTENTS *file_contents)
static void sort_run_find(char **source, long *top, SORT_STACK *st_p, long limit, SORT_CMP_FUNC *compare, void *comp_arg, SORT_DUP_OPTION option)
static INT16 sort_spage_get_numrecs(PAGE_PTR pgptr)
char ** px_result
int i
Definition: dynamic_load.c:954
#define SORT_EXPAND_DYN_ARRAY_RATIO
Definition: external_sort.c:77
SORT_PUT_FUNC * put_fn
static void sort_run_flip(char **start, char **stop)
static int sort_get_avg_numpages_of_nonempty_tmpfile(SORT_PARAM *sort_param)
static int sort_write_area(THREAD_ENTRY *thread_p, VFID *vfid, int first_page, INT32 num_pages, char *area_start)
for(p=libs;*p;p++)
Definition: dynamic_load.c:968
INT16 type
#define SORT_MAXREC_LENGTH
Definition: external_sort.c:79
bool spage_is_valid_anchor_type(const INT16 anchor_type)
Definition: slotted_page.c:364
INT16 volid
#define NULL_VOLID
#define pthread_mutex_lock(a)
Definition: area_alloc.c:50
short volid
Definition: dbtype_def.h:887
#define SORT_PUT_STOP
Definition: external_sort.h:38
static int sort_run_add_new(FILE_CONTENTS *file_contents, int num_pages)
#define ER_CSS_PTHREAD_MUTEX_INIT
Definition: error_code.h:997
char ** px_buff
int overflow_get_length(THREAD_ENTRY *thread_p, const VPID *ovf_vpid)
#define db_private_realloc(thrd, ptr, size)
Definition: memory_alloc.h:231
#define SORT_RECORD_LENGTH_SIZE
Definition: external_sort.h:42
#define PEEK
Definition: file_io.h:74
int SORT_CMP_FUNC(const void *, const void *, void *)
Definition: external_sort.h:61
static void sort_append(const void *pk0, const void *pk1)
int overflow_insert(THREAD_ENTRY *thread_p, const VFID *ovf_vfid, VPID *ovf_vpid, RECDES *recdes, FILE_TYPE file_type)
Definition: overflow_file.c:95
callable_task< entry > entry_callable_task
SORT_STATUS SORT_GET_FUNC(THREAD_ENTRY *thread_p, RECDES *, void *)
Definition: external_sort.h:59
#define VFID_SET_NULL(vfid_ptr)
Definition: file_manager.h:65
const char ** p
Definition: dynamic_load.c:945
static void sort_run_merge(char **low, char **high, SORT_STACK *st_p, SORT_CMP_FUNC *compare, void *comp_arg, SORT_DUP_OPTION option)
#define pthread_mutex_destroy(a)
Definition: area_alloc.c:49
static int px_sort_myself(THREAD_ENTRY *thread_p, PX_TREE_NODE *px_node)
#define SORT_MAX_HALF_FILES
Definition: external_sort.c:64