File external_sort.c¶
File List > cubrid > src > storage > external_sort.c
Go to the documentation of this file
/*
* Copyright 2008 Search Solution Corporation
* Copyright 2016 CUBRID Corporation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
/*
* external_sort.c - External sorting module
*/
#ident "$Id$"
#include "config.h"
#include <stdio.h>
#include <string.h>
#include <stddef.h>
#include <stdlib.h>
#include <math.h>
#include "error_manager.h"
#include "system_parameter.h"
#include "memory_alloc.h"
#include "external_sort.h"
#include "file_manager.h"
#include "page_buffer.h"
#include "log_manager.h"
#include "disk_manager.h"
#include "slotted_page.h"
#include "overflow_file.h"
#include "boot_sr.h"
#if defined(ENABLE_SYSTEMTAP)
#include "probes.h"
#endif /* ENABLE_SYSTEMTAP */
#if defined(SERVER_MODE)
#include "connection_error.h"
#endif /* SERVER_MODE */
#include "server_support.h"
#include "thread_entry_task.hpp"
#include "thread_manager.hpp" // for thread_get_thread_entry_info and thread_sleep
#include "list_file.h"
#include "query_manager.h"
#include "object_representation.h"
#include "px_worker_manager.hpp"
#include "px_callable_task.hpp"
#include "px_parallel.hpp" /* parallel_query::compute_parallel_degree */
#include "px_sort.h"
#include "btree_load.h"
#include "px_ftab_set.hpp"
#include <functional>
// XXX: SHOULD BE THE LAST INCLUDE HEADER
#include "memory_wrapper.hpp"
/* Estimate on number of pages in the multipage temporary file */
#define SORT_MULTIPAGE_FILE_SIZE_ESTIMATE 20
/* Upper limit on the half of the total number of the temporary files.
* The exact upper limit on total number of temp files is twice this number.
* (i.e., this number specifies the upper limit on the number of total input
* or total output files at each stage of the merging process.
*/
#define SORT_MAX_HALF_FILES 4
#define SORT_MAX_TOT_FILES SORT_MAX_HALF_FILES * 2
/* Lower limit on the half of the total number of the temporary files.
* The exact lower limit on total number of temp files is twice this number.
* (i.e., this number specifies the lower limit on the number of total input
* or total output files at each stage of the merging process.
*/
#define SORT_MIN_HALF_FILES 2
/* Initial size of the dynamic array that keeps the file contents list */
#define SORT_INITIAL_DYN_ARRAY_SIZE 30
/* Expansion Ratio of the dynamic array that keeps the file contents list */
#define SORT_EXPAND_DYN_ARRAY_RATIO 1.5
#define SORT_MAXREC_LENGTH \
((ssize_t)(DB_PAGESIZE - sizeof(SLOTTED_PAGE_HEADER) - sizeof(SLOT)))
#define SORT_SWAP_PTR(a,b) { char **temp; temp = a; a = b; b = temp; }
#define SORT_CHECK_DUPLICATE(a, b) \
do { \
if (cmp == 0) { \
if (option == SORT_DUP) \
sort_append(a, b); \
*(a) = NULL; \
dup_num++; \
} \
} while (0)
typedef struct file_contents FILE_CONTENTS;
struct file_contents
{ /* node of the file_contents linked list */
int *num_pages; /* Dynamic array whose elements keep the sizes of the runs contained in the file in
* terms of number of slotted pages it occupies */
int num_slots; /* Total number of elements the array has */
int first_run; /* The index of the array element keeping the size of the first run of the file. */
int last_run; /* The index of the array element keeping the size of the last run of the file. */
int start_index; /* Used when splitting a run in parallel processing. */
};
typedef struct vol_info VOL_INFO;
struct vol_info
{ /* volume information */
INT16 volid; /* Volume identifier */
int file_cnt; /* Current number of files in the volume */
};
typedef struct vol_list VOL_LIST;
struct vol_list
{ /* temporary volume identifier list */
INT16 volid; /* main sorting disk volume */
int vol_ent_cnt; /* number of array entries */
int vol_cnt; /* number of temporary volumes */
VOL_INFO *vol_info; /* array of volume information */
};
typedef struct sort_param SORT_PARAM;
struct sort_param
{
VFID temp[SORT_MAX_TOT_FILES]; /* Temporary file identifiers */
VFID multipage_file; /* Temporary file for multi page sorting records */
FILE_CONTENTS file_contents[SORT_MAX_TOT_FILES]; /* Contents of each temporary file */
bool tde_encrypted; /* whether related temp files are encrypted (TDE) or not */
VOL_LIST vol_list; /* Temporary volume information list */
char *internal_memory; /* Internal_memory used for internal sorting phase and as input/output buffers for temp
* files during merging phase */
int tot_runs; /* Total number of runs */
int tot_buffers; /* Size of internal memory used in terms of number of buffers it occupies */
int tot_tempfiles; /* Total number of temporary files */
int half_files; /* Half number of temporary files */
int in_half; /* Which half of temp files is for input */
void *out_arg; /* Arguments to pass to the output function */
/* Comparison function to use in the internal sorting and the merging phases */
SORT_CMP_FUNC *cmp_fn;
void *cmp_arg;
SORT_DUP_OPTION option;
/* input function to apply on temporary records */
SORT_GET_FUNC *get_fn;
void *get_arg;
/* output function to apply on temporary records */
SORT_PUT_FUNC *put_fn;
void *put_arg;
/* Estimated number of pages in each temp file (used in initialization) */
int tmp_file_pgs;
/* Details about the "limit" clause */
int limit;
/* total number of recordes */
unsigned int total_numrecs;
/* support parallelism */
PX_STATUS px_status;
int px_result_file_idx;
THREAD_ENTRY *px_orig_thread_p;
SORT_PARALLEL_TYPE px_type;
SORT_PARAM *ori_sort_param;
int px_parallel_num;
RESULT_RUN *px_result_run;
parallel_query::worker_manager * px_worker_manager;
ORDERBY_STATS orderby_stats;
cuberr::context * main_error_context;
#if defined(SERVER_MODE)
pthread_mutex_t *px_mtx; /* px_status mutex */
pthread_cond_t *complete_cond; /* complete condition */
#endif
};
typedef struct sort_rec_list SORT_REC_LIST;
struct sort_rec_list
{
struct sort_rec_list *next; /* next sorted record item */
int rec_pos; /* record position */
bool is_duplicated; /* duplicated sort_key record flag */
}; /* Sort record list */
typedef struct slotted_pheader SLOTTED_PAGE_HEADER;
struct slotted_pheader
{
INT16 nslots; /* Number of allocated slots for the page */
INT16 nrecs; /* Number of records on page */
INT16 anchor_flag; /* Valid ANCHORED, ANCHORED_DONT_REUSE_SLOTS UNANCHORED_ANY_SEQUENCE,
* UNANCHORED_KEEP_SEQUENCE */
INT16 alignment; /* Alignment for records. */
INT16 waste_align; /* Number of bytes waste because of alignment */
INT16 tfree; /* Total free space on page */
INT16 cfree; /* Contiguous free space on page */
INT16 foffset; /* Byte offset from the beginning of the page to the first free byte area on the page. */
};
typedef struct slot SLOT;
struct slot
{
INT16 roffset; /* Byte Offset from the beginning of the page to the beginning of the record */
INT16 rlength; /* Length of record */
INT16 rtype; /* Record type described by slot. */
};
typedef struct run_struct RUN;
struct run_struct
{
long start;
long stop;
};
typedef struct srun SRUN;
struct srun
{
char low_high; /* location info LOW('L') : otherbase HIGH('H') : base */
unsigned short tree_depth; /* depth of this node : leaf is 1 */
long start;
long stop;
};
typedef struct sort_stack SORT_STACK;
struct sort_stack
{
int top;
SRUN *srun;
};
typedef void FIND_RUN_FN (char **, long *, SORT_STACK *, long, SORT_CMP_FUNC *, void *);
typedef void MERGE_RUN_FN (char **, char **, SORT_STACK *, SORT_CMP_FUNC *, void *);
#if !defined(NDEBUG)
static int sort_validate (char **vector, long size, SORT_CMP_FUNC * compare, void *comp_arg);
#endif
static int sort_listfile_internal (THREAD_ENTRY * thread_p, SORT_PARAM * sort_param);
static int sort_inphase_sort (THREAD_ENTRY * thread_p, SORT_PARAM * sort_param, SORT_GET_FUNC * get_next,
void *arguments, unsigned int *total_numrecs);
static int sort_exphase_merge_elim_dup (THREAD_ENTRY * thread_p, SORT_PARAM * sort_param);
static int sort_exphase_merge (THREAD_ENTRY * thread_p, SORT_PARAM * sort_param);
static int sort_get_avg_numpages_of_nonempty_tmpfile (SORT_PARAM * sort_param);
static void sort_return_used_resources (THREAD_ENTRY * thread_p, SORT_PARAM * sort_param, PARALLEL_TYPE parallel_type);
static int sort_add_new_file (THREAD_ENTRY * thread_p, VFID * vfid, int file_pg_cnt_est, bool force_alloc,
bool tde_encrypted);
static int sort_write_area (THREAD_ENTRY * thread_p, VFID * vfid, int first_page, INT32 num_pages, char *area_start,
bool tde_encrypted);
static int sort_read_area (THREAD_ENTRY * thread_p, VFID * vfid, int first_page, INT32 num_pages, char *area_start);
static int sort_get_num_half_tmpfiles (int tot_buffers, int input_pages);
static int sort_checkalloc_numpages_of_outfiles (THREAD_ENTRY * thread_p, SORT_PARAM * sort_param);
static int sort_get_numpages_of_active_infiles (const SORT_PARAM * sort_param);
static int sort_find_inbuf_size (int tot_buffers, int in_sections);
static char *sort_retrieve_longrec (THREAD_ENTRY * thread_p, RECDES * address, RECDES * memory);
static char **sort_run_sort (THREAD_ENTRY * thread_p, SORT_PARAM * sort_param, char **base, long limit,
long sort_numrecs, char **otherbase, long *srun_limit);
static int sort_run_add_new (FILE_CONTENTS * file_contents, int num_pages);
static void sort_run_remove_first (FILE_CONTENTS * file_contents);
static void sort_run_flip (char **start, char **stop);
static void sort_run_find (char **source, long *top, SORT_STACK * st_p, long limit, SORT_CMP_FUNC * compare,
void *comp_arg, SORT_DUP_OPTION option);
static void sort_run_merge (char **low, char **high, SORT_STACK * st_p, SORT_CMP_FUNC * compare, void *comp_arg,
SORT_DUP_OPTION option);
static int sort_run_flush (THREAD_ENTRY * thread_p, SORT_PARAM * sort_param, int out_curfile, int *cur_page,
char *output_buffer, char **index_area, int numrecs, int rec_type);
static int sort_get_num_file_contents (FILE_CONTENTS * file_contents);
#if defined(CUBRID_DEBUG)
static void sort_print_file_contents (const FILE_CONTENTS * file_contents);
#endif /* CUBRID_DEBUG */
static void sort_spage_initialize (PAGE_PTR pgptr, INT16 slots_type, INT16 alignment);
static INT16 sort_spage_get_numrecs (PAGE_PTR pgptr);
static INT16 sort_spage_insert (PAGE_PTR pgptr, RECDES * recdes);
static SCAN_CODE sort_spage_get_record (PAGE_PTR pgptr, INT16 slotid, RECDES * recdes, bool peek_p);
static int sort_spage_offsetcmp (const void *s1, const void *s2);
static int sort_spage_compact (PAGE_PTR pgptr);
static INT16 sort_spage_find_free (PAGE_PTR pgptr, SLOT ** sptr, INT16 length, INT16 type, INT16 * space);
#if defined(CUBRID_DEBUG)
static INT16 sort_spage_dump_sptr (SLOT * sptr, INT16 nslots, INT16 alignment);
static void sort_spage_dump_hdr (SLOTTED_PAGE_HEADER * sphdr);
static void sort_spage_dump (PAGE_PTR pgptr, int rec_p);
#endif /* CUBRID_DEBUG */
static void sort_append (const void *pk0, const void *pk1);
static int sort_merge_run_for_parallel_index_leaf_build (THREAD_ENTRY * thread_p, SORT_PARAM * px_sort_param,
SORT_PARAM * sort_param, int parallel_num);
/*
* sort_spage_initialize () - Initialize a slotted page
* return: void
* pgptr(in): Pointer to slotted page
* slots_type(in): Flag which indicates the type of slots
* alignment(in): Type of alignment
*
* Note: A slotted page must be initialized before records are inserted on the
* page. The alignment indicates the valid offset where the records
* should be stored. This is a requirment for peeking records on pages
* according to their alignmnet restrictions.
*/
static void
sort_spage_initialize (PAGE_PTR pgptr, INT16 slots_type, INT16 alignment)
{
SLOTTED_PAGE_HEADER *sphdr;
sphdr = (SLOTTED_PAGE_HEADER *) pgptr;
sphdr->nslots = 0;
sphdr->nrecs = 0;
#if defined(CUBRID_DEBUG)
if (!spage_is_valid_anchor_type (slots_type))
{
(void) fprintf (stderr,
"sort_spage_initialize: **INTERFACE SYSTEM ERROR BAD value for slots_type = %d.\n"
" UNANCHORED_KEEP_SEQUENCE was assumed **\n", slots_type);
slots_type = UNANCHORED_KEEP_SEQUENCE;
}
if (!(alignment == CHAR_ALIGNMENT || alignment == SHORT_ALIGNMENT || alignment == INT_ALIGNMENT
|| alignment == LONG_ALIGNMENT || alignment == FLOAT_ALIGNMENT || alignment == DOUBLE_ALIGNMENT))
{
(void) fprintf (stderr,
"sort_spage_initialize: **INTERFACE SYSTEM ERROR BAD value = %d"
" for alignment. %d was assumed\n. Alignment must be"
" either SIZEOF char, short, int, long, float, or double", alignment, MAX_ALIGNMENT);
alignment = MAX_ALIGNMENT;
}
#endif /* CUBRID_DEBUG */
sphdr->anchor_flag = slots_type;
sphdr->tfree = DB_PAGESIZE - sizeof (SLOTTED_PAGE_HEADER);
sphdr->cfree = sphdr->tfree;
sphdr->foffset = sizeof (SLOTTED_PAGE_HEADER);
sphdr->alignment = alignment;
sphdr->waste_align = DB_WASTED_ALIGN (sphdr->foffset, alignment);
if (sphdr->waste_align != 0)
{
sphdr->foffset += sphdr->waste_align;
sphdr->cfree -= sphdr->waste_align;
sphdr->tfree -= sphdr->waste_align;
}
}
/*
* sort_spage_get_numrecs () - Return the total number of records on the slotted page
* return:
* pgptr(in): Pointer to slotted page
*/
static INT16
sort_spage_get_numrecs (PAGE_PTR pgptr)
{
SLOTTED_PAGE_HEADER *sphdr;
sphdr = (SLOTTED_PAGE_HEADER *) pgptr;
return sphdr->nrecs;
}
/*
* sort_spage_offsetcmp () - Compare the location (offset) of slots
* return: sp1 - sp2
* sp1(in): slot 1
* sp2(in): slot 2
*/
static int
sort_spage_offsetcmp (const void *sp1, const void *sp2)
{
SLOT *s1, *s2;
INT16 l1, l2;
s1 = *(SLOT **) sp1;
s2 = *(SLOT **) sp2;
l1 = s1->roffset;
l2 = s2->roffset;
return (l1 < l2) ? -1 : (l1 == l2) ? 0 : 1;
}
/*
* sort_spage_compact () - Compact an slotted page
* return: NO_ERROR
* pgptr(in): Pointer to slotted page
*
* Note: Only the records are compacted, the slots are not compacted.
*/
static int
sort_spage_compact (PAGE_PTR pgptr)
{
int i, j;
SLOTTED_PAGE_HEADER *sphdr;
SLOT *sptr;
SLOT **sortptr;
INT16 to_offset;
int ret = NO_ERROR;
sphdr = (SLOTTED_PAGE_HEADER *) pgptr;
sortptr = (SLOT **) calloc ((unsigned) sphdr->nrecs, sizeof (SLOT *));
if (sortptr == NULL)
{
er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_OUT_OF_VIRTUAL_MEMORY, 1, sphdr->nrecs * sizeof (SLOT *));
return ER_OUT_OF_VIRTUAL_MEMORY;
}
/* Populate the array to sort and then sort it */
sptr = (SLOT *) ((char *) pgptr + DB_PAGESIZE - sizeof (SLOT));
for (j = 0, i = 0; i < sphdr->nslots; sptr--, i++)
{
if (sptr->roffset != NULL_OFFSET)
{
sortptr[j++] = sptr;
}
}
qsort ((void *) sortptr, (size_t) sphdr->nrecs, (size_t) sizeof (SLOT *), sort_spage_offsetcmp);
to_offset = sizeof (SLOTTED_PAGE_HEADER);
for (i = 0; i < sphdr->nrecs; i++)
{
/* Make sure that the offset is aligned */
to_offset = DB_ALIGN (to_offset, sphdr->alignment);
if (to_offset == sortptr[i]->roffset)
{
/* Record slot is already in place */
to_offset += sortptr[i]->rlength;
}
else
{
/* Move the record */
memmove ((char *) pgptr + to_offset, (char *) pgptr + sortptr[i]->roffset, sortptr[i]->rlength);
sortptr[i]->roffset = to_offset;
to_offset += sortptr[i]->rlength;
}
}
/* Make sure that the next inserted record will be aligned */
to_offset = DB_ALIGN (to_offset, sphdr->alignment);
sphdr->cfree = sphdr->tfree = (DB_PAGESIZE - to_offset - sphdr->nslots * sizeof (SLOT));
sphdr->foffset = to_offset;
free_and_init (sortptr);
/* The page is set dirty somewhere else */
return ret;
}
/*
* sort_spage_find_free () - Find a free area/slot where a record of the given length
* can be inserted onto the given slotted page
* return: A slot identifier or NULL_SLOTID
* pgptr(in): Pointer to slotted page
* sptr(out): Pointer to slotted page array pointer
* length(in): Length of area/record
* type(in): Type of record to be inserted
* space(out): Space used/defined
*
* Note: If there is not enough space on the page, an error condition is
* indicated and NULLSLOTID is returned.
*/
static INT16
sort_spage_find_free (PAGE_PTR pgptr, SLOT ** sptr, INT16 length, INT16 type, INT16 * space)
{
SLOTTED_PAGE_HEADER *sphdr;
INT16 slotid;
INT16 waste;
sphdr = (SLOTTED_PAGE_HEADER *) pgptr;
/* Calculate the wasting space that this record will introduce. We need to take in consideration the wasting space
* when there is space saved */
waste = DB_WASTED_ALIGN (length, sphdr->alignment);
*space = length + waste;
/* Quickly check for available space. We may need to check again if a slot is created (instead of reused) */
if (*space > sphdr->tfree)
{
*sptr = NULL;
*space = 0;
return NULL_SLOTID;
}
/* Find a free slot. Try to reuse an unused slotid, instead of allocating a new one */
*sptr = (SLOT *) ((char *) pgptr + DB_PAGESIZE - sizeof (SLOT));
if (sphdr->nslots == sphdr->nrecs)
{
slotid = sphdr->nslots;
*sptr -= slotid;
}
else
{
for (slotid = 0; slotid < sphdr->nslots; (*sptr)--, slotid++)
{
if ((*sptr)->rtype == REC_HOME || (*sptr)->rtype == REC_BIGONE)
{
; /* go ahead */
}
else
{ /* impossible case */
assert (false);
break;
}
}
}
/* Make sure that there is enough space for the record and the slot */
assert (slotid <= sphdr->nslots);
if (slotid >= sphdr->nslots)
{
*space += sizeof (SLOT);
/* We are allocating a new slotid. Check for available space again */
if (*space > sphdr->tfree)
{
*sptr = NULL;
*space = 0;
return NULL_SLOTID;
}
else if (*space > sphdr->cfree && sort_spage_compact (pgptr) != NO_ERROR)
{
*sptr = NULL;
*space = 0;
return NULL_SLOTID;
}
/* Adjust the number of slots */
sphdr->nslots++;
}
else
{
/* We already know that there is total space available since the slot is reused and the space was check above */
if (*space > sphdr->cfree && sort_spage_compact (pgptr) != NO_ERROR)
{
*sptr = NULL;
*space = 0;
return NULL_SLOTID;
}
}
/* Now separate an empty area for the record */
(*sptr)->roffset = sphdr->foffset;
(*sptr)->rlength = length;
(*sptr)->rtype = type;
/* Adjust the header */
sphdr->nrecs++;
sphdr->tfree -= *space;
sphdr->cfree -= *space;
sphdr->foffset += length + waste;
sphdr->waste_align += waste;
/* The page is set dirty somewhere else */
return slotid;
}
/*
* sort_spage_insert () - Insert a record onto the given slotted page
* return: A slot identifier
* pgptr(in): Pointer to slotted page
* recdes(in): Pointer to a record descriptor
*
* Note: If the record does not fit on the page, an error condition is
* indicated and NULL_SLOTID is returned.
*/
static INT16
sort_spage_insert (PAGE_PTR pgptr, RECDES * recdes)
{
SLOTTED_PAGE_HEADER *sphdr;
SLOT *sptr;
INT16 slotid;
INT16 used_space;
if (recdes->length > SORT_MAXREC_LENGTH)
{
return NULL_SLOTID;
}
assert (recdes->type == REC_HOME || recdes->type == REC_BIGONE);
slotid = sort_spage_find_free (pgptr, &sptr, recdes->length, recdes->type, &used_space);
if (slotid != NULL_SLOTID)
{
/* Find the free slot and insert the record */
memcpy (((char *) pgptr + sptr->roffset), recdes->data, recdes->length);
/* Indicate that we are spending our savings */
sphdr = (SLOTTED_PAGE_HEADER *) pgptr;
}
return slotid;
}
/*
* sort_spage_get_record () - Get specific record
* return: S_SUCCESS, S_DOESNT_FIT, S_DOESNT_EXIST
* pgptr(in): Pointer to slotted page
* slotid(in): Slot identifier of current record
* recdes(in): Pointer to a record descriptor
* peek_p(in): Indicates whether the record is going to be copied or peeked
*
* Note: When ispeeking is PEEK, the desired available record is peeked onto
* the page. The address of the record descriptor is set to the portion
* of the buffer where the record is stored. Peeking a record should be
* executed with caution since the slotted module may decide to move
* the record around. In general, no other operation should be executed
* on the page until the peeking of the record is done. The page should
* be fixed and locked to avoid any funny behavior. RECORD should NEVER
* be MODIFIED DIRECTLY. Only reads should be performed, otherwise
* header information and other records may be corrupted.
*
* When ispeeking is COPY, the desired available record is
* read onto the area pointed by the record descriptor. If the record
* does not fit in such an area, the length of the record is returned
* as a negative value in recdes->length and an error is indicated in the
* return value.
*/
static SCAN_CODE
sort_spage_get_record (PAGE_PTR pgptr, INT16 slotid, RECDES * recdes, bool peek_p)
{
SLOTTED_PAGE_HEADER *sphdr;
SLOT *sptr;
sphdr = (SLOTTED_PAGE_HEADER *) pgptr;
sptr = (SLOT *) ((char *) pgptr + DB_PAGESIZE - sizeof (SLOT));
sptr -= slotid;
if (slotid < 0 || slotid >= sphdr->nslots || sptr->roffset == NULL_OFFSET)
{
recdes->length = 0;
return S_DOESNT_EXIST;
}
/*
* If peeking, the address of the data in the descriptor is set to the
* address of the record in the buffer. Otherwise, the record is copied
* onto the area specified by the descriptor
*/
if (peek_p == PEEK)
{
recdes->area_size = -1;
recdes->data = (char *) pgptr + sptr->roffset;
}
else
{
/* copy the record */
if (sptr->rlength > recdes->area_size)
{
/*
* DOES NOT FIT
* Give a hint to the user of the needed length. Hint is given as a
* negative value
*/
recdes->length = -sptr->rlength;
return S_DOESNT_FIT;
}
memcpy (recdes->data, (char *) pgptr + sptr->roffset, sptr->rlength);
}
recdes->length = sptr->rlength;
recdes->type = sptr->rtype;
return S_SUCCESS;
}
#if defined(CUBRID_DEBUG)
/*
* sort_spage_dump_sptr () - Dump the slotted page array
* return: Total length of records
* sptr(in): Pointer to slotted page pointer array
* nslots(in): Number of slots
* alignment(in): Alignment for records
*
* Note: The content of the record is not dumped by this function.
* This function is used for debugging purposes.
*/
static INT16
sort_spage_dump_sptr (SLOT * sptr, INT16 nslots, INT16 alignment)
{
int i;
INT16 waste;
INT16 total_length_records = 0;
for (i = 0; i < nslots; sptr--, i++)
{
assert (sptr->rtype == REC_HOME || sptr->rtype == REC_BIGONE);
(void) fprintf (stdout, "\nSlot-id = %2d, offset = %4d, type = %s", i, sptr->roffset,
((sptr->rtype == REC_HOME) ? "HOME" : (sptr->rtype ==
REC_BIGONE) ? "BIGONE" : "UNKNOWN-BY-CURRENT-MODULE"));
if (sptr->roffset != NULL_OFFSET)
{
total_length_records += sptr->rlength;
waste = DB_WASTED_ALIGN (sptr->rlength, alignment);
(void) fprintf (stdout, ", length = %4d, waste = %d", sptr->rlength, waste);
}
(void) fprintf (stdout, "\n");
}
return total_length_records;
}
/*
* sort_spage_dump_hdr () - Dump an slotted page header
* return: void
* sphdr(in): Pointer to header of slotted page
*
* Note: This function is used for debugging purposes.
*/
static void
sort_spage_dump_hdr (SLOTTED_PAGE_HEADER * sphdr)
{
(void) fprintf (stdout, "NUM SLOTS = %d, NUM RECS = %d, TYPE OF SLOTS = %s,\n", sphdr->nslots, sphdr->nrecs,
spage_anchor_flag_string (sphdr->anchor_flag));
(void) fprintf (stdout, "ALIGNMENT-TO = %s, WASTED AREA FOR ALIGNMENT = %d,\n",
spage_alignment_string (sphdr->alignment), spage_waste_align);
(void) fprintf (stdout, "TOTAL FREE AREA = %d, CONTIGUOUS FREE AREA = %d, FREE SPACE OFFSET = %d,\n", sphdr->tfree,
sphdr->cfree, sphdr->foffset);
}
/*
* sort_spage_dump () - Dump an slotted page
* return: void
* pgptr(in): Pointer to slotted page
* rec_p(in): If true, records are printed in ascii format, otherwise, the
* records are not printed
*
* Note: The records are printed only when the value of rec_p is true.
* This function is used for debugging purposes.
*/
static void
sort_spage_dump (PAGE_PTR pgptr, int rec_p)
{
int i, j;
SLOTTED_PAGE_HEADER *sphdr;
SLOT *sptr;
char *rec;
int used_length = 0;
sphdr = (SLOTTED_PAGE_HEADER *) pgptr;
sort_spage_dump_hdr (sphdr);
sptr = (SLOT *) ((char *) pgptr + DB_PAGESIZE - sizeof (SLOT));
used_length = (sizeof (SLOTTED_PAGE_HEADER) + sphdr->waste_align + sizeof (SLOT) * sphdr->nslots);
used_length += sort_spage_dump_sptr (sptr, sphdr->nslots, sphdr->alignment);
if (rec_p)
{
(void) fprintf (stdout, "\nRecords in ascii follow ...\n");
for (i = 0; i < sphdr->nslots; sptr--, i++)
{
if (sptr->roffset != NULL_OFFSET)
{
(void) fprintf (stdout, "\nSlot-id = %2d\n", i);
rec = (char *) pgptr + sptr->roffset;
for (j = 0; j < sptr->rlength; j++)
{
(void) fputc (*rec++, stdout);
}
(void) fprintf (stdout, "\n");
}
else
{
(void) fprintf (stdout, "\nSlot-id = %2d has been deleted\n", i);
}
}
}
if (used_length + sphdr->tfree > DB_PAGESIZE)
{
(void) fprintf (stdout,
"sort_spage_dump: Inconsistent page \n (Used_space + tfree > DB_PAGESIZE\n (%d + %d) > %d \n "
" %d > %d\n", used_length, sphdr->tfree, DB_PAGESIZE, used_length + sphdr->tfree, DB_PAGESIZE);
}
if ((sphdr->cfree + sphdr->foffset + sizeof (SLOT) * sphdr->nslots) > DB_PAGESIZE)
{
(void) fprintf (stdout,
"sort_spage_dump: Inconsistent page\n (cfree + foffset + SIZEOF(SLOT) * nslots) > "
" DB_PAGESIZE\n (%d + %d + (%d * %d)) > %d\n %d > %d\n", sphdr->cfree, sphdr->foffset,
sizeof (SLOT), sphdr->nslots, DB_PAGESIZE,
(sphdr->cfree + sphdr->foffset + sizeof (SLOT) * sphdr->nslots), DB_PAGESIZE);
}
if (sphdr->cfree <= -(sphdr->alignment - 1))
{
(void) fprintf (stdout, "sort_spage_dump: Cfree %d is inconsistent in page. Cannot be < -%d\n", sphdr->cfree,
sphdr->alignment);
}
}
#endif /* CUBRID_DEBUG */
/* Sorting module functions */
/*
* sort_run_flip () - Flip a run in place
* return: void
* start(in):
* stop(in):
*
* Note: Odd runs will have middle pointer undisturbed.
*/
static void
sort_run_flip (char **start, char **stop)
{
char *temp;
while (start < stop)
{
temp = *start;
*start = *stop;
*stop = temp;
start++;
stop--;
}
return;
}
/*
* sort_append () -
* return: void
* pk0(in):
* pk1(in):
*/
static void
sort_append (const void *pk0, const void *pk1)
{
SORT_REC *node, *list;
node = *(SORT_REC **) pk0;
list = *(SORT_REC **) pk1;
while (list->next)
{
list = list->next;
}
list->next = node;
return;
}
/*
* sort_run_find () - Finds the longest ascending or descending run it can
* return:
* source(in):
* top(in):
* st_p(in):
* limit(in):
* compare(in):
* comp_arg(in):
* option(in):
*
* Note: Flip descending run, and assign RUN start and stop
*/
static void
sort_run_find (char **source, long *top, SORT_STACK * st_p, long limit, SORT_CMP_FUNC * compare, void *comp_arg,
SORT_DUP_OPTION option)
{
char **start;
char **stop;
char **next_stop;
char **limit_p;
SRUN *srun_p;
char **dup;
char **non_dup;
int dup_num;
int cmp;
bool increasing_order;
/* init new SRUN */
st_p->top++;
srun_p = &(st_p->srun[st_p->top]);
srun_p->tree_depth = 1;
srun_p->low_high = 'H';
srun_p->start = *top;
if (*top >= (limit - 1))
{
/* degenerate run length 1. Must go ahead and compare with length 2, because we may need to flip them */
srun_p->stop = limit - 1;
*top = limit;
return;
}
start = &source[*top];
stop = start + 1;
next_stop = stop + 1;
limit_p = &source[limit];
dup_num = 0;
/* have a non-trivial run of length 2 or more */
cmp = (*compare) (start, stop, comp_arg);
if (cmp > 0)
{
increasing_order = false; /* mark as non-increasing order run */
while (next_stop < limit_p && ((cmp = (*compare) (stop, next_stop, comp_arg)) >= 0))
{
/* mark duplicate as NULL */
SORT_CHECK_DUPLICATE (stop, next_stop); /* increase dup_num */
stop = next_stop;
next_stop = next_stop + 1;
}
}
else
{
increasing_order = true; /* mark as increasing order run */
/* mark duplicate as NULL */
SORT_CHECK_DUPLICATE (start, stop); /* increase dup_num */
/* build increasing order run */
while (next_stop < limit_p && ((cmp = (*compare) (stop, next_stop, comp_arg)) <= 0))
{
/* mark duplicate as NULL */
SORT_CHECK_DUPLICATE (stop, next_stop); /* increase dup_num */
stop = next_stop;
next_stop = next_stop + 1;
}
}
/* eliminate duplicates; right-shift slots */
if (dup_num)
{
dup = stop - 1;
for (non_dup = dup - 1; non_dup >= start; dup--, non_dup--)
{
/* find duplicated value slot */
if (*dup == NULL)
{
/* find previous non-duplicated value slot */
for (; non_dup >= start; non_dup--)
{
/* move non-duplicated value slot to duplicated value slot */
if (*non_dup != NULL)
{
*dup = *non_dup;
*non_dup = NULL;
break;
}
}
}
}
}
/* change non-increasing order run to increasing order run */
if (increasing_order != true)
{
sort_run_flip (start + dup_num, stop);
}
*top += CAST_BUFLEN (stop - start); /* advance to last visited */
srun_p->start += dup_num;
srun_p->stop = *top;
(*top)++; /* advance to next unvisited element */
return;
}
/*
* sort_run_merge () - Merges two runs from source to dest, updateing dest_top
* return:
* low(in):
* high(in):
* st_p(in):
* compare(in):
* comp_arg(in):
* option(in):
*/
static void
sort_run_merge (char **low, char **high, SORT_STACK * st_p, SORT_CMP_FUNC * compare, void *comp_arg,
SORT_DUP_OPTION option)
{
char dest_low_high;
char **left_start, **right_start;
char **left_stop, **right_stop;
char **dest_ptr;
SRUN *left_srun_p, *right_srun_p;
int cmp;
int dup_num;
do
{
/* STEP 1: initialize */
left_srun_p = &(st_p->srun[st_p->top - 1]);
right_srun_p = &(st_p->srun[st_p->top]);
left_srun_p->tree_depth++;
if (left_srun_p->low_high == 'L')
{
left_start = &low[left_srun_p->start];
left_stop = &low[left_srun_p->stop];
}
else
{
left_start = &high[left_srun_p->start];
left_stop = &high[left_srun_p->stop];
}
if (right_srun_p->low_high == 'L')
{
right_start = &low[right_srun_p->start];
right_stop = &low[right_srun_p->stop];
}
else
{
right_start = &high[right_srun_p->start];
right_stop = &high[right_srun_p->stop];
}
dup_num = 0;
/* STEP 2: check CON conditions srun follows ascending order. if (left_max < right_min) do FORWARD-CON. we use
* '<' instead of '<=' */
cmp = (*compare) (left_stop, right_start, comp_arg);
if (cmp < 0)
{
/* con == TRUE */
dest_low_high = right_srun_p->low_high;
if (left_srun_p->low_high == right_srun_p->low_high && left_srun_p->stop + 1 == right_srun_p->start)
{
;
}
else
{
/* move LEFT to RIGHT's current PART */
if (right_srun_p->low_high == 'L')
{
dest_ptr = &low[right_srun_p->start - 1];
}
else
{
dest_ptr = &high[right_srun_p->start - 1];
}
while (left_stop >= left_start)
{
/* copy LEFT */
*dest_ptr-- = *left_stop--;
}
}
}
else
{
/* con == FALSE do the actual merge; right-shift merge slots */
if (right_srun_p->low_high == 'L')
{
dest_low_high = 'H';
dest_ptr = &high[right_srun_p->stop];
}
else
{
dest_low_high = 'L';
dest_ptr = &low[right_srun_p->stop];
}
while (left_stop >= left_start && right_stop >= right_start)
{
cmp = (*compare) (left_stop, right_stop, comp_arg);
if (cmp == 0)
{
/* eliminate duplicate */
if (option == SORT_DUP)
{
sort_append (left_stop, right_stop);
}
dup_num++;
*dest_ptr-- = *right_stop--;
left_stop--;
}
else if (cmp > 0)
{
*dest_ptr-- = *left_stop--;
}
else
{
*dest_ptr-- = *right_stop--;
}
}
while (left_stop >= left_start)
{
/* copy the rest of LEFT */
*dest_ptr-- = *left_stop--;
}
while (right_stop >= right_start)
{
/* copy the rest of RIGHT */
*dest_ptr-- = *right_stop--;
}
}
/* STEP 3: reconfig SORT_STACK */
st_p->top--;
left_srun_p->low_high = dest_low_high;
left_srun_p->start = right_srun_p->start - (left_srun_p->stop - left_srun_p->start + 1) + dup_num;
left_srun_p->stop = right_srun_p->stop;
}
while ((st_p->top >= 1) /* may need to merge */
&& (st_p->srun[st_p->top - 1].tree_depth == st_p->srun[st_p->top].tree_depth));
return;
}
/*
* sort_run_sort () - An implementation of a run-sort algorithm
* return: pointer to the sorted area
* thread_p(in):
* sort_param(in): sort parameters
* base(in): pointer to the element at the base of the table
* limit(in): numrecs of before current sort
* sort_numrecs(in): numrecs of after privious sort
* otherbase(in): pointer to alternate area suffecient to store base-limit
* srun_limit(in): numrecs of after current sort
*
* Note: This sorts files by successive merging of runs.
*
* This has the advantage of being liner on sorted or reversed data,
* and being order N log base k N, where k is the average length of a
* run. Note that k must be at least 2, so the worst case is N log2 N.
*
* Overall, since long runs are common, this beats the pants off
* quick-sort.
*
* This could be sped up a bit by looking for N runs, and sorting these
* into lists of concatennatable runs.
*/
static char **
sort_run_sort (THREAD_ENTRY * thread_p, SORT_PARAM * sort_param, char **base, long limit, long sort_numrecs,
char **otherbase, long *srun_limit)
{
SORT_CMP_FUNC *compare;
void *comp_arg;
SORT_DUP_OPTION option;
char **src, **dest, **result;
SORT_STACK sr_stack, *st_p;
long src_top = 0;
int cnt;
assert_release (limit == *srun_limit);
/* exclude already sorted items */
limit -= sort_numrecs;
if (limit == 0 || (limit == 1 && sort_numrecs == 0))
{
return base;
}
/* init */
compare = sort_param->cmp_fn;
comp_arg = sort_param->cmp_arg;
option = sort_param->option;
src = base;
dest = otherbase;
result = NULL;
st_p = &sr_stack;
st_p->top = -1;
cnt = (int) (log10 (ceil ((double) limit / 2.0)) / log10 (2.0)) + 2;
if (sort_numrecs)
{
/* reserve space for already found srun */
cnt += 1;
}
st_p->srun = (SRUN *) db_private_alloc (NULL, cnt * sizeof (SRUN));
if (st_p->srun == NULL)
{
er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_OUT_OF_VIRTUAL_MEMORY, 1, cnt * sizeof (SRUN));
return NULL;
}
do
{
sort_run_find (src, &src_top, st_p, limit, compare, comp_arg, option);
if (src_top < limit)
{
sort_run_find (src, &src_top, st_p, limit, compare, comp_arg, option);
}
while ((st_p->top >= 1) /* may need to merge */
&& ((src_top >= limit) /* case 1: final merge stage */
|| ((src_top < limit) /* case 2: non-final merge stage */
&& (st_p->srun[st_p->top - 1].tree_depth == st_p->srun[st_p->top].tree_depth))))
{
sort_run_merge (dest, src, st_p, compare, comp_arg, option);
}
}
while (src_top < limit);
if (sort_numrecs > 0)
{
/* finally, merge with already found srun */
SRUN *srun_p;
srun_p = &(st_p->srun[++(st_p->top)]);
srun_p->tree_depth = 1;
srun_p->low_high = 'H';
srun_p->start = limit;
srun_p->stop = limit + sort_numrecs - 1;
sort_run_merge (dest, src, st_p, compare, comp_arg, option);
}
#if defined(CUBRID_DEBUG)
if (limit != src_top)
{
printf ("Inconsistent sort test d), %ld %ld\n", limit, src_top);
}
#endif /* CUBRID_DEBUG */
/* include already sorted items */
limit += sort_numrecs;
/* save limit of non-duplicated value slot */
*srun_limit = limit - st_p->srun[0].start;
/* move base pointer */
result = base + st_p->srun[0].start;
if (st_p->srun[0].low_high == 'L')
{
if (option == SORT_ELIM_DUP)
{
memcpy (result, otherbase + st_p->srun[0].start, (*srun_limit) * sizeof (char *));
}
else
{
result = otherbase + st_p->srun[0].start;
}
}
assert (result != NULL);
#if defined(CUBRID_DEBUG)
src_top = 0;
src = result;
sort_run_find (src, &src_top, st_p, *srun_limit, compare, comp_arg, option);
if (st_p->srun[st_p->top].stop != *srun_limit - 1)
{
printf ("Inconsistent sort, %ld %ld %ld\n", st_p->srun[st_p->top].start, st_p->srun[st_p->top].stop, *srun_limit);
result = NULL;
}
#endif /* CUBRID_DEBUG */
db_private_free_and_init (NULL, st_p->srun);
#if !defined(NDEBUG)
if (sort_validate (result, *srun_limit, compare, comp_arg) != NO_ERROR)
{
result = NULL;
}
#endif
return result;
}
/*
* sort_listfile () - Perform sorting
* return:
* volid(in): volume to keep the temporary files
* est_inp_pg_cnt(in): estimated number of input pages, or -1
* get_fn(in): user-supplied function: provides the next sort item (or,
* temporary record) every time it is called. This function
* should put the next sort item to the area pointed by the
* record descriptor "temp_recdes" and should return SORT_SUCCESS
* to acknowledge that everything is fine. However, if there is
* a problem it should return a corresponding code.
* For example, if the sort item does not fit into the
* "temp_recdes" area it should return SORT_REC_DOESNT_FIT; and
* if there are no more sort items then it should return
* SORT_NOMORE_RECS. In case of an error, it should return
* SORT_ERROR_OCCURRED,in addition to setting the corresponding
* error code. (Note that in this case, the sorting process
* will be aborted.)
* get_arg(in): arguments to the get_fn function
* put_fn(in): user-supplied function: provides the output operation to be
* applied on the sorted items. This function should process
* (in any way it chooses so) the sort item passed via the
* record descriptor "temp_recdes" and return NO_ERROR to be
* called again with the next item on the sorted order. If it
* returns any error code then the sorting process is aborted.
* put_arg(in): arguments to the put_fn function
* cmp_fn(in): user-supplied function for comparing records to be sorted.
* This function is expected to follow the strcmp() protocol
* for return results: -1 means the first arg precedes the
* second, 1 means the second precedes the first, and 0 means
* neither precedes the other.
* cmp_arg(in): arguments to the cmp_fn function
* option(in):
* limit(in): optional arg, can represent the limit clause. If we only want
* the top K elements of a processed list, it makes sense to use
* special optimizations instead of sorting the entire list and
* returning the first K elements.
* Parameter: -1 if not to be used, > 0 if it should be taken
* into account. Zero is a reserved value.
* includes_tde_class(in): whether tde-configured class data is included or not,
* it determines whehter internal temp files are
* encrypted or not.
*/
int
sort_listfile (THREAD_ENTRY * thread_p, INT16 volid, int est_inp_pg_cnt, SORT_GET_FUNC * get_fn, void *get_arg,
SORT_PUT_FUNC * put_fn, void *put_arg, SORT_CMP_FUNC * cmp_fn, void *cmp_arg, SORT_DUP_OPTION option,
int limit, bool includes_tde_class, SORT_PARALLEL_TYPE parallel_type)
{
int error = NO_ERROR;
SORT_PARAM ori_sort_param;
SORT_PARAM *sort_param = &ori_sort_param;
INT32 input_pages;
int i;
/* for parallel sort */
SORT_PARAM *px_sort_param = NULL;
#if defined(SERVER_MODE)
pthread_mutex_t px_mtx; /* px_status mutex */
pthread_cond_t complete_cond; /* complete condition */
#endif
thread_set_sort_stats_active (thread_p, true);
#if defined(ENABLE_SYSTEMTAP)
CUBRID_SORT_START ();
#endif /* ENABLE_SYSTEMTAP */
#if defined(SERVER_MODE)
if (pthread_mutex_init (&px_mtx, NULL) != 0)
{
error = ER_CSS_PTHREAD_MUTEX_INIT;
er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, error, 0);
return error;
}
if (pthread_cond_init (&complete_cond, NULL) != 0)
{
error = ER_CSS_PTHREAD_COND_INIT;
er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, error, 0);
return error;
}
sort_param->px_mtx = &px_mtx;
sort_param->complete_cond = &complete_cond;
#endif /* SERVER_MODE */
sort_param->cmp_fn = cmp_fn;
sort_param->cmp_arg = cmp_arg;
sort_param->option = option;
sort_param->get_fn = get_fn;
sort_param->get_arg = get_arg;
sort_param->put_fn = put_fn;
sort_param->put_arg = put_arg;
sort_param->limit = limit;
sort_param->tot_tempfiles = 0;
/* initialize memory allocable fields */
for (i = 0; i < SORT_MAX_TOT_FILES; i++)
{
sort_param->temp[i].volid = NULL_VOLID;
sort_param->file_contents[i].num_pages = NULL;
}
sort_param->internal_memory = NULL;
/* initialize temp. overflow file. Real value will be assigned in sort_inphase_sort function, if long size sorting
* records are encountered. */
sort_param->multipage_file.volid = NULL_VOLID;
sort_param->multipage_file.fileid = NULL_FILEID;
/* NOTE: This volume list will not be used any more. */
/* initialize temporary volume list */
sort_param->vol_list.volid = volid;
sort_param->vol_list.vol_ent_cnt = 0;
sort_param->vol_list.vol_cnt = 0;
sort_param->vol_list.vol_info = NULL;
if (est_inp_pg_cnt > 0)
{
/* 10% of overhead and fragmentation */
input_pages = est_inp_pg_cnt + MAX ((int) (est_inp_pg_cnt * 0.1), 2);
}
else
{
input_pages = prm_get_integer_value (PRM_ID_SR_NBUFFERS);
}
/* The size of a sort buffer is limited to PRM_SR_NBUFFERS. */
sort_param->tot_buffers = MIN (prm_get_integer_value (PRM_ID_SR_NBUFFERS), input_pages);
sort_param->tot_buffers = MAX (4, sort_param->tot_buffers);
sort_param->internal_memory = (char *) malloc ((size_t) sort_param->tot_buffers * (size_t) DB_PAGESIZE);
if (sort_param->internal_memory == NULL)
{
sort_param->tot_buffers = 4;
sort_param->internal_memory = (char *) malloc (sort_param->tot_buffers * DB_PAGESIZE);
if (sort_param->internal_memory == NULL)
{
error = ER_OUT_OF_VIRTUAL_MEMORY;
er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, error, 1, (size_t) (sort_param->tot_buffers * DB_PAGESIZE));
goto cleanup;
}
}
sort_param->half_files = sort_get_num_half_tmpfiles (sort_param->tot_buffers, input_pages);
sort_param->tot_tempfiles = sort_param->half_files << 1;
sort_param->in_half = 0;
for (i = 0; i < SORT_MAX_TOT_FILES; i++)
{
/* Initilize temporary file identifier; real value will be set in "sort_add_new_file () */
sort_param->temp[i].volid = NULL_VOLID;
/* Initilize file contents list */
sort_param->file_contents[i].num_pages = (int *) malloc (SORT_INITIAL_DYN_ARRAY_SIZE * sizeof (int));
if (sort_param->file_contents[i].num_pages == NULL)
{
sort_param->tot_tempfiles = i;
error = ER_OUT_OF_VIRTUAL_MEMORY;
er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, error, 1, (size_t) (SORT_INITIAL_DYN_ARRAY_SIZE * sizeof (int)));
goto cleanup;
}
sort_param->file_contents[i].num_slots = SORT_INITIAL_DYN_ARRAY_SIZE;
sort_param->file_contents[i].first_run = -1;
sort_param->file_contents[i].last_run = -1;
}
/* Create only input temporary files make file and temporary volume page count estimates */
sort_param->tmp_file_pgs = CEIL_PTVDIV (input_pages, sort_param->half_files);
sort_param->tmp_file_pgs = MAX (1, sort_param->tmp_file_pgs);
sort_param->tde_encrypted = includes_tde_class;
sort_param->px_type = parallel_type;
tde_er_log ("sort_listfile(): tde_encrypted = %d\n", sort_param->tde_encrypted);
#if defined(SERVER_MODE)
/* check the number of parallel process */
sort_param->px_parallel_num = sort_check_parallelism (thread_p, sort_param);
if (sort_param->px_parallel_num <= 1)
{
/* single process */
sort_param->px_parallel_num = 1;
if (sort_param->px_type == SORT_INDEX_LEAF)
{
SORT_ARGS *sort_args_p = (SORT_ARGS *) sort_param->get_arg;
memset (&sort_args_p->hfscan_cache, 0, sizeof (HEAP_SCANCACHE));
memset (&sort_args_p->attr_info, 0, sizeof (HEAP_CACHE_ATTRINFO));
if (bt_load_heap_scancache_start_for_attrinfo (thread_p, sort_args_p, NULL, NULL, true) != NO_ERROR)
{
error = ER_FAILED;
goto cleanup;
}
log_sysop_start (thread_p);
if (btree_create_file (thread_p, &sort_args_p->class_ids[0], sort_args_p->attr_ids[0],
sort_args_p->btid->sys_btid) != NO_ERROR)
{
ASSERT_ERROR ();
log_sysop_abort (thread_p);
bt_load_heap_scancache_end_for_attrinfo (thread_p, sort_args_p, NULL, NULL);
error = ER_FAILED;
goto cleanup;
}
vacuum_log_add_dropped_file (thread_p, &sort_args_p->btid->sys_btid->vfid, NULL,
VACUUM_LOG_ADD_DROPPED_FILE_UNDO);
}
error = sort_listfile_internal (thread_p, sort_param);
if (sort_param->px_type == SORT_INDEX_LEAF)
{
SORT_ARGS *sort_args_p = (SORT_ARGS *) sort_param->get_arg;
bt_load_heap_scancache_end_for_attrinfo (thread_p, sort_args_p, NULL, NULL);
if (error != NO_ERROR)
{
log_sysop_abort (thread_p);
}
}
}
else
{
px_sort_param = (SORT_PARAM *) malloc (sizeof (SORT_PARAM) * sort_param->px_parallel_num);
if (px_sort_param == NULL)
{
error = ER_OUT_OF_VIRTUAL_MEMORY;
er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, error, 1, sizeof (SORT_PARAM));
goto cleanup;
}
/* parallel process */
error = sort_start_parallelism (thread_p, px_sort_param, sort_param);
if (error != NO_ERROR)
{
goto cleanup;
}
/* execute parallel sort */
SORT_EXECUTE_PARALLEL (sort_param->px_parallel_num, px_sort_param, sort_listfile_execute);
/* wait for threads */
SORT_WAIT_PARALLEL (sort_param->px_parallel_num, sort_param, px_sort_param);
if (error != NO_ERROR)
{
goto cleanup;
}
error = sort_end_parallelism (thread_p, px_sort_param, sort_param);
if (error != NO_ERROR)
{
goto cleanup;
}
}
#else
/* single process for stand alone mode */
sort_param->px_parallel_num = 1;
error = sort_listfile_internal (thread_p, sort_param);
#endif
cleanup:
#if defined(ENABLE_SYSTEMTAP)
CUBRID_SORT_END (sort_param->total_numrecs, error);
#endif /* ENABLE_SYSTEMTAP */
#if defined(SERVER_MODE)
pthread_mutex_destroy (&px_mtx);
pthread_cond_destroy (&complete_cond);
#endif
/* free sort_param */
#if defined(SERVER_MODE)
if (sort_param->px_parallel_num > 1)
{
for (int i = 0; i < sort_param->px_parallel_num; i++)
{
sort_return_used_resources (thread_p, &px_sort_param[i], PX_THREAD_IN_PARALLEL);
}
sort_return_used_resources (thread_p, sort_param, PX_MAIN_IN_PARALLEL);
sort_param->px_worker_manager->release_workers ();
free_and_init (px_sort_param);
}
else
#endif
{
sort_return_used_resources (thread_p, sort_param, PX_SINGLE);
}
thread_set_sort_stats_active (thread_p, false);
return error;
}
#if defined(SERVER_MODE)
void
sort_listfile_execute (cubthread::entry & thread_ref, SORT_PARAM * sort_param)
{
TSC_TICKS start_tick, end_tick;
TSCTIMEVAL tv_diff;
THREAD_ENTRY *thread_p = &thread_ref;
PX_STATUS px_status;
PRED_EXPR_WITH_CONTEXT *filter_pred = NULL;
FILTER_INDEX_INFO filter_index_info = { NULL, 0 };
FUNCTION_INDEX_INFO func_index_info;
XASL_UNPACK_INFO *func_unpack_info = NULL;
DB_TYPE single_node_type = DB_TYPE_NULL;
thread_p->push_resource_tracks ();
thread_ref.tran_index = sort_param->px_orig_thread_p->tran_index;
thread_ref.m_px_orig_thread_entry = sort_param->px_orig_thread_p;
thread_ref.conn_entry = sort_param->px_orig_thread_p->conn_entry;
if (sort_param->px_orig_thread_p->on_trace)
{
thread_set_sort_stats_active (thread_p, true);
thread_ref.on_trace = true;
perfmon_initialize_parallel_stats (&thread_ref);
}
if (sort_param->px_type == SORT_ORDER_BY)
{
SORT_INFO *sort_info_p = (SORT_INFO *) sort_param->get_arg;
/* open splitted temp file for read */
if (qfile_open_list_scan (sort_info_p->input_file, sort_info_p->s_id->s_id) != NO_ERROR)
{
px_status = PX_ERR_FAILED;
goto cleanup;
}
if (thread_is_on_trace (sort_param->px_orig_thread_p))
{
tsc_getticks (&start_tick);
}
}
else if (sort_param->px_type == SORT_INDEX_LEAF)
{
SORT_ARGS *sort_args_p = (SORT_ARGS *) sort_param->get_arg;
func_index_info.expr_stream = NULL;
func_index_info.expr_stream_size = 0;
if (sort_args_p->filter != NULL)
{
filter_index_info = *sort_args_p->filter_index_info;
}
if (sort_args_p->func_index_info != NULL)
{
func_index_info = *sort_args_p->func_index_info;
}
sort_args_p->filter = NULL;
sort_args_p->func_index_info = NULL;
if (btree_load_filter_pred_function_info
(thread_p, sort_args_p, &filter_pred, &filter_index_info, &func_index_info, &func_unpack_info,
&single_node_type) != NO_ERROR)
{
px_status = PX_ERR_FAILED;
goto cleanup;
}
sort_args_p->attrinfo_inited = false;
sort_args_p->scancache_inited = false;
memset (&sort_args_p->hfscan_cache, 0, sizeof (HEAP_SCANCACHE));
memset (&sort_args_p->attr_info, 0, sizeof (HEAP_CACHE_ATTRINFO));
if (bt_load_heap_scancache_start_for_attrinfo (thread_p, sort_args_p, NULL, NULL, true) != NO_ERROR)
{
px_status = PX_ERR_FAILED;
goto cleanup;
}
}
else
{
/* Not implemented yet */
px_status = PX_ERR_FAILED;
goto cleanup;
}
if (sort_listfile_internal (&thread_ref, sort_param) != NO_ERROR)
{
px_status = PX_ERR_FAILED;
goto cleanup;
}
else
{
px_status = PX_DONE;
}
cleanup:
if (sort_param->px_type == SORT_ORDER_BY)
{
SORT_INFO *sort_info_p = (SORT_INFO *) sort_param->get_arg;
/* close splitted temp file for read */
qfile_close_scan (&thread_ref, sort_info_p->s_id->s_id);
if (thread_is_on_trace (sort_param->px_orig_thread_p))
{
tsc_getticks (&end_tick);
tsc_elapsed_time_usec (&tv_diff, end_tick, start_tick);
TSC_ADD_TIMEVAL (sort_param->orderby_stats.orderby_time, tv_diff);
sort_param->orderby_stats.orderby_pages += (perfmon_get_from_statistic (thread_p, PSTAT_SORT_NUM_DATA_PAGES));
sort_param->orderby_stats.orderby_ioreads += (perfmon_get_from_statistic (thread_p, PSTAT_SORT_NUM_IO_PAGES));
}
}
else if (sort_param->px_type == SORT_INDEX_LEAF)
{
SORT_ARGS *sort_args_p = (SORT_ARGS *) sort_param->get_arg;
bt_load_heap_scancache_end_for_attrinfo (thread_p, sort_args_p, NULL, NULL);
bt_load_clear_pred_and_unpack (thread_p, sort_args_p, func_unpack_info);
}
else
{
/* Not implemented yet */
px_status = PX_ERR_FAILED;
}
if (sort_param->px_orig_thread_p->on_trace)
{
thread_set_sort_stats_active (thread_p, false);
perfmon_destroy_parallel_stats (&thread_ref);
}
thread_p->pop_resource_tracks ();
/* done */
pthread_mutex_lock (sort_param->px_mtx);
sort_param->px_status = px_status;
if (px_status == PX_ERR_FAILED)
{
sort_param->main_error_context->get_current_error_level ().swap (cuberr::context::get_thread_local_error ());
}
pthread_cond_signal (sort_param->complete_cond);
pthread_mutex_unlock (sort_param->px_mtx);
}
#endif
/*
* sort_listfile_internal () - Perform sorting
* return:
*/
int
sort_listfile_internal (THREAD_ENTRY * thread_p, SORT_PARAM * sort_param)
{
int error = NO_ERROR;
int file_pg_cnt_est;
int i;
/*
* Don't allocate any temp files yet, since we may not need them.
* We'll allocate them on the fly as the need arises.
*
* However, indicate to file/disk manager of the approximate temporary
* space that is going to be needed.
*/
error = sort_inphase_sort (thread_p, sort_param, sort_param->get_fn, sort_param->get_arg, &sort_param->total_numrecs);
if (error != NO_ERROR)
{
return error;
}
if (sort_param->tot_runs > 1)
{
assert (sort_param->tot_runs > 0);
/* Create output temporary files make file and temporary volume page count estimates */
file_pg_cnt_est = sort_get_avg_numpages_of_nonempty_tmpfile (sort_param);
file_pg_cnt_est = MAX (1, file_pg_cnt_est);
for (i = sort_param->half_files; i < sort_param->tot_tempfiles; i++)
{
error =
sort_add_new_file (thread_p, &(sort_param->temp[i]), file_pg_cnt_est, true, sort_param->tde_encrypted);
if (error != NO_ERROR)
{
return error;
}
}
if (sort_param->option == SORT_ELIM_DUP)
{
error = sort_exphase_merge_elim_dup (thread_p, sort_param);
}
else
{
/* SORT_DUP */
error = sort_exphase_merge (thread_p, sort_param);
}
} /* if (sort_param->tot_runs > 1) */
return error;
}
#if !defined(NDEBUG)
/*
* sort_validate() -
* return:
* vector(in):
* size(in):
* compare(in):
* comp_arg(in):
*
* NOTE: debugging function
*/
static int
sort_validate (char **vector, long size, SORT_CMP_FUNC * compare, void *comp_arg)
{
long k;
int cmp;
assert (vector != NULL);
assert (size >= 1);
assert (compare != NULL);
#if 1 /* TODO - for QAF, do not delete me */
return NO_ERROR;
#endif
for (k = 0; k < size - 1; k++)
{
cmp = (*compare) (&(vector[k]), &(vector[k + 1]), comp_arg);
if (cmp != DB_LT)
{
assert (false);
return ER_FAILED;
}
}
return NO_ERROR;
}
#endif
/*
* sort_inphase_sort () - Internal sorting phase
* return:
* sort_param(in): sort parameters
* get_fn(in): user-supplied function: provides the temporary record for
* the given input record
* get_arg(in): arguments for get_fn
* total_numrecs(out): records sorted
*
*/
static int
sort_inphase_sort (THREAD_ENTRY * thread_p, SORT_PARAM * sort_param, SORT_GET_FUNC * get_fn, void *get_arg,
unsigned int *total_numrecs)
{
/* Variables for the input file */
SORT_STATUS status;
/* Variables for the current output file */
int out_curfile;
char *output_buffer;
int cur_page[SORT_MAX_HALF_FILES];
/* Variables for the internal memory */
RECDES temp_recdes;
RECDES long_recdes; /* Record desc. for reading in long sorting records */
char *item_ptr; /* Pointer to the first free location of the temp. records region of internal memory */
long numrecs; /* Number of records kept in the internal memory */
long sort_numrecs; /* Number of sort records kept in the internal memory */
bool once_flushed = false;
long saved_numrecs;
char **saved_index_area;
char **index_area; /* Part of internal memory keeping the addresses of records */
char **index_buff; /* buffer area to sort indexes. */
int i;
int error = NO_ERROR;
#if defined (SERVER_MODE)
int rv = NO_ERROR;
#endif /* SERVER_MODE */
assert (sort_param->half_files <= SORT_MAX_HALF_FILES);
/* Initialize the current pages of all temp files to 0 */
for (i = 0; i < sort_param->half_files; i++)
{
cur_page[i] = 0;
}
sort_param->tot_runs = 0;
out_curfile = sort_param->in_half;
output_buffer = sort_param->internal_memory + ((long) (sort_param->tot_buffers - 1) * DB_PAGESIZE);
assert (output_buffer > sort_param->internal_memory);
numrecs = 0;
sort_numrecs = 0;
saved_numrecs = 0;
*total_numrecs = 0;
saved_index_area = NULL;
item_ptr = sort_param->internal_memory + SORT_RECORD_LENGTH_SIZE;
index_area = (char **) (output_buffer - sizeof (char *));
index_buff = index_area - 1;
temp_recdes.area_size = SORT_MAXREC_LENGTH;
temp_recdes.length = 0;
long_recdes.area_size = 0;
long_recdes.data = NULL;
for (;;)
{
if ((char *) index_buff < item_ptr)
{
/* Internal memory is already full */
status = SORT_REC_DOESNT_FIT;
}
else
{
/* Internal memory is not full; try to get the next item */
temp_recdes.data = item_ptr;
if (((int) ((char *) index_buff - item_ptr)) < SORT_MAXREC_LENGTH)
{
temp_recdes.area_size = (int) ((char *) index_buff - item_ptr) - (4 * sizeof (char *));
}
if (temp_recdes.area_size <= SSIZEOF (SORT_REC))
{
/* internal memory is not enough */
status = SORT_REC_DOESNT_FIT;
}
else
{
status = (*get_fn) (thread_p, &temp_recdes, get_arg);
/* There are no more input records; So, break the loop */
if (status == SORT_NOMORE_RECS)
{
break;
}
}
}
switch (status)
{
case SORT_ERROR_OCCURRED:
error = er_errid ();
assert (error != NO_ERROR);
goto exit_on_error;
case SORT_REC_DOESNT_FIT:
if (numrecs > 0)
{
/* Perform internal sorting and flush the run */
index_area++;
index_area =
sort_run_sort (thread_p, sort_param, index_area, numrecs, sort_numrecs, index_buff, &numrecs);
*total_numrecs += numrecs;
if (index_area == NULL || numrecs < 0)
{
error = ER_FAILED;
goto exit_on_error;
}
if (sort_param->option == SORT_ELIM_DUP)
{
/* reset item_ptr */
item_ptr = index_area[0];
for (i = 1; i < numrecs; i++)
{
if (index_area[i] > item_ptr)
{
item_ptr = index_area[i];
}
}
item_ptr += DB_ALIGN (SORT_RECORD_LENGTH (item_ptr), MAX_ALIGNMENT) + SORT_RECORD_LENGTH_SIZE;
}
if (sort_param->option == SORT_ELIM_DUP
&& ((item_ptr - sort_param->internal_memory) + numrecs * SSIZEOF (SLOT) < DB_PAGESIZE)
&& temp_recdes.length <= SORT_MAXREC_LENGTH)
{
/* still, remaining key area enough; do not flush, go on internal sorting */
index_buff = index_area - numrecs;
index_area--;
index_buff--; /* decrement once for pointer, */
index_buff--; /* once for pointer buffer */
/* must keep track because index_buff is used */
/* to detect when sort buffer is full */
temp_recdes.area_size = SORT_MAXREC_LENGTH;
assert ((char *) index_buff >= item_ptr);
}
else
{
error =
sort_run_flush (thread_p, sort_param, out_curfile, cur_page, output_buffer, index_area, numrecs,
REC_HOME);
if (error != NO_ERROR)
{
goto exit_on_error;
}
/* Prepare for the next internal sorting run */
if (sort_param->tot_runs == 1)
{
once_flushed = true;
saved_numrecs = numrecs;
saved_index_area = index_area;
}
numrecs = 0;
item_ptr = sort_param->internal_memory + SORT_RECORD_LENGTH_SIZE;
index_area = (char **) (output_buffer - sizeof (char *));
index_buff = index_area - 1;
temp_recdes.area_size = SORT_MAXREC_LENGTH;
/* Switch to the next Temp file */
if (++out_curfile >= sort_param->half_files)
{
out_curfile = sort_param->in_half;
}
}
/* save sorted record number at this stage */
sort_numrecs = numrecs;
}
/* Check if the record would fit into a single slotted page. If not, take special action for this record. */
if (temp_recdes.length > SORT_MAXREC_LENGTH)
{
/* TAKE CARE OF LONG RECORD as a separate RUN */
if (long_recdes.area_size < temp_recdes.length)
{
/* read in the long record to a dynamic memory area */
if (long_recdes.data)
{
free_and_init (long_recdes.data);
}
long_recdes.area_size = temp_recdes.length;
long_recdes.data = (char *) malloc (long_recdes.area_size);
if (long_recdes.data == NULL)
{
error = ER_OUT_OF_VIRTUAL_MEMORY;
er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, error, 1, (size_t) long_recdes.area_size);
goto exit_on_error;
}
}
/* Obtain the long record */
status = (*get_fn) (thread_p, &long_recdes, get_arg);
if (status != SORT_SUCCESS)
{
/* Obtaining the long record has failed */
if (status == SORT_REC_DOESNT_FIT || status == SORT_NOMORE_RECS)
{
/* This should never happen */
error = ER_GENERIC_ERROR;
er_set (ER_FATAL_ERROR_SEVERITY, ARG_FILE_LINE, error, 0);
}
else
{
ASSERT_ERROR ();
error = er_errid ();
}
assert (error != NO_ERROR);
goto exit_on_error;
}
/* Put the record to the multipage area & put the pointer to internal memory area */
/* If necessary create the multipage_file */
if (sort_param->multipage_file.volid == NULL_VOLID)
{
TDE_ALGORITHM tde_algo = TDE_ALGORITHM_NONE;
/* Create the multipage file */
sort_param->multipage_file.volid = sort_param->temp[0].volid;
error = file_create_temp (thread_p, 1, &sort_param->multipage_file);
if (error != NO_ERROR)
{
ASSERT_ERROR ();
goto exit_on_error;
}
if (sort_param->tde_encrypted)
{
tde_algo = (TDE_ALGORITHM) prm_get_integer_value (PRM_ID_TDE_DEFAULT_ALGORITHM);
if (file_apply_tde_algorithm (thread_p, &sort_param->multipage_file, tde_algo) != NO_ERROR)
{
file_temp_retire (thread_p, &sort_param->multipage_file);
ASSERT_ERROR ();
goto exit_on_error;
}
}
}
/* Create a multipage record for this long record : insert to multipage_file and put the pointer as the
* first record in this run */
if (overflow_insert (thread_p, &sort_param->multipage_file, (VPID *) item_ptr, &long_recdes, FILE_TEMP)
!= NO_ERROR)
{
ASSERT_ERROR_AND_SET (error);
goto exit_on_error;
}
/* Update the pointers */
SORT_RECORD_LENGTH (item_ptr) = sizeof (VPID);
*index_area = item_ptr;
numrecs++;
error =
sort_run_flush (thread_p, sort_param, out_curfile, cur_page, output_buffer, index_area, numrecs,
REC_BIGONE);
if (error != NO_ERROR)
{
goto exit_on_error;
}
/* Prepare for the next internal sorting run */
numrecs = 0;
item_ptr = sort_param->internal_memory + SORT_RECORD_LENGTH_SIZE;
index_area = (char **) (output_buffer - sizeof (char *));
index_buff = index_area - 1;
temp_recdes.area_size = SORT_MAXREC_LENGTH;
/* Switch to the next Temp file */
if (++out_curfile >= sort_param->half_files)
{
out_curfile = sort_param->in_half;
}
/* save sorted record number at this stage */
sort_numrecs = numrecs;
}
break;
case SORT_SUCCESS:
/* Proceed the pointers */
SORT_RECORD_LENGTH (item_ptr) = temp_recdes.length;
*index_area = item_ptr;
numrecs++;
index_area--;
index_buff--; /* decrease once for pointer, once for pointer buffer */
index_buff--; /* must keep track because index_buff is used to detect when sort buffer is full */
item_ptr += DB_ALIGN (temp_recdes.length, MAX_ALIGNMENT) + SORT_RECORD_LENGTH_SIZE;
break;
default:
/* This should never happen */
error = ER_GENERIC_ERROR;
er_set (ER_FATAL_ERROR_SEVERITY, ARG_FILE_LINE, error, 0);
goto exit_on_error;
}
}
if (numrecs > 0)
{
/* The input file has finished; process whatever is left over in the internal memory */
index_area++;
index_area = sort_run_sort (thread_p, sort_param, index_area, numrecs, sort_numrecs, index_buff, &numrecs);
*total_numrecs += numrecs;
if (index_area == NULL || numrecs < 0)
{
error = ER_FAILED;
goto exit_on_error;
}
if (sort_param->tot_runs > 0 || SORT_IS_PARALLEL (sort_param))
{
/* There has been other runs produced already */
error =
sort_run_flush (thread_p, sort_param, out_curfile, cur_page, output_buffer, index_area, numrecs, REC_HOME);
if (error != NO_ERROR)
{
goto exit_on_error;
}
}
else
{
/* No run has been produced yet There is no need for the merging phase; directly output the sorted temp
* records. */
for (i = 0; i < numrecs; i++)
{
/* Obtain the output record for this temporary record */
temp_recdes.data = index_area[i];
temp_recdes.length = SORT_RECORD_LENGTH (index_area[i]);
error = (*sort_param->put_fn) (thread_p, &temp_recdes, sort_param->put_arg);
if (error != NO_ERROR)
{
if (error == SORT_PUT_STOP)
{
error = NO_ERROR;
}
goto exit_on_error;
}
}
}
}
else if (sort_param->tot_runs == 1 && !SORT_IS_PARALLEL (sort_param))
{
if (once_flushed)
{
for (i = 0; i < saved_numrecs; i++)
{
/* Obtain the output record for this temporary record */
temp_recdes.data = saved_index_area[i];
temp_recdes.length = SORT_RECORD_LENGTH (saved_index_area[i]);
error = (*sort_param->put_fn) (thread_p, &temp_recdes, sort_param->put_arg);
if (error != NO_ERROR)
{
if (error == SORT_PUT_STOP)
{
error = NO_ERROR;
}
goto exit_on_error;
}
}
}
else
{
/*
* The only way to get here is if we had exactly one record to
* sort, and that record required overflow pages. In that case we
* have done a ridiculous amount of work, but there doesn't seem to
* be an easy way to restructure the existing sort_inphase_sort code to
* cope with the situation. Just go get the record and be happy...
*/
error = NO_ERROR;
temp_recdes.data = NULL;
long_recdes.area_size = 0;
if (long_recdes.data)
{
free_and_init (long_recdes.data);
}
if (sort_read_area (thread_p, &sort_param->temp[0], 0, 1, output_buffer) != NO_ERROR
|| sort_spage_get_record (output_buffer, 0, &temp_recdes, PEEK) != S_SUCCESS
|| sort_retrieve_longrec (thread_p, &temp_recdes, &long_recdes) == NULL
|| (*sort_param->put_fn) (thread_p, &long_recdes, sort_param->put_arg) != NO_ERROR)
{
ASSERT_ERROR ();
error = er_errid ();
goto exit_on_error;
}
}
}
exit_on_error:
if (long_recdes.data)
{
free_and_init (long_recdes.data);
}
return error;
}
/*
* sort_run_flush () - Flush run
* return:
* sort_param(in): sort parameters
* out_file(in): index of output file to flush the run
* cur_page(in): current page of each temp file (used to determine
* where, within the file, the run should be flushed)
* output_buffer(in): output buffer to use for flushing the records
* index_area(in): index area keeping ordered pointers to the records
* numrecs(in): number of records the run includes
* rec_type(in): type of records; Assume that all the records of this
* run has the same type. This may need to be changed
* to allow individual records have different types.
*
* Note: This function flushes a run to the specified output file. The records
* of the run are loaded to the output buffer in the order imposed by
* the index area (i.e., on the order of pointers to these records).
* This buffer is written to the successive pages of the specified file
* when it is full.
*/
static int
sort_run_flush (THREAD_ENTRY * thread_p, SORT_PARAM * sort_param, int out_file, int *cur_page, char *output_buffer,
char **index_area, int numrecs, int rec_type)
{
int error = NO_ERROR;
int run_size;
RECDES out_recdes;
int i;
SORT_REC *key, *next;
int flushed_items = 0;
int should_continue = true;
/* Make sure the the temp file indexed by out_file has been created; if not, create it now. */
if (sort_param->temp[out_file].volid == NULL_VOLID)
{
error =
sort_add_new_file (thread_p, &sort_param->temp[out_file], sort_param->tmp_file_pgs, false,
sort_param->tde_encrypted);
if (error != NO_ERROR)
{
return error;
}
}
/* Store the record type; used for REC_BIGONE record types */
out_recdes.type = rec_type;
run_size = 0;
sort_spage_initialize (output_buffer, UNANCHORED_KEEP_SEQUENCE, MAX_ALIGNMENT);
/* Insert each record to the output buffer and flush the buffer when it is full */
for (i = 0; i < numrecs && should_continue; i++)
{
/* Traverse next link */
for (key = (SORT_REC *) index_area[i]; key; key = next)
{
/* If we've already flushed the required number, stop. */
if (sort_param->limit > 0 && flushed_items >= sort_param->limit)
{
should_continue = false;
break;
}
/* cut-off and save duplicate sort_key value link */
if (rec_type == REC_HOME)
{
next = key->next;
}
else
{
/* REC_BIGONE */
next = NULL;
}
out_recdes.data = (char *) key;
out_recdes.length = SORT_RECORD_LENGTH ((char *) key);
if (sort_spage_insert (output_buffer, &out_recdes) == NULL_SLOTID)
{
/* Output buffer is full */
error =
sort_write_area (thread_p, &sort_param->temp[out_file], cur_page[out_file], 1, output_buffer,
sort_param->tde_encrypted);
if (error != NO_ERROR)
{
return error;
}
cur_page[out_file]++;
run_size++;
sort_spage_initialize (output_buffer, UNANCHORED_KEEP_SEQUENCE, MAX_ALIGNMENT);
if (sort_spage_insert (output_buffer, &out_recdes) == NULL_SLOTID)
{
/* Slotted page module refuses to insert a short size record to an empty page. This should never
* happen. */
error = ER_GENERIC_ERROR;
er_set (ER_FATAL_ERROR_SEVERITY, ARG_FILE_LINE, error, 0);
return error;
}
}
flushed_items++;
}
}
if (sort_spage_get_numrecs (output_buffer))
{
/* Flush the partially full output page */
error =
sort_write_area (thread_p, &sort_param->temp[out_file], cur_page[out_file], 1, output_buffer,
sort_param->tde_encrypted);
if (error != NO_ERROR)
{
return error;
}
/* Update sort parameters */
cur_page[out_file]++;
run_size++;
}
/* Record the insertion of the new pages of the file to global parameters */
error = sort_run_add_new (&sort_param->file_contents[out_file], run_size);
if (error != NO_ERROR)
{
return error;
}
sort_param->tot_runs++;
return NO_ERROR;
}
/*
* sort_retrieve_longrec () -
* return:
* address(in):
* memory(in):
*/
static char *
sort_retrieve_longrec (THREAD_ENTRY * thread_p, RECDES * address, RECDES * memory)
{
int needed_area_size;
/* Find the required area for the long record */
needed_area_size = overflow_get_length (thread_p, (VPID *) address->data);
if (needed_area_size == -1)
{
return NULL;
}
/* If necessary allocate dynamic area for the long record */
if (needed_area_size > memory->area_size)
{
/* There is already a small area; free it. */
if (memory->data != NULL)
{
free_and_init (memory->data);
}
/* Allocate dynamic area for this long record */
memory->area_size = needed_area_size;
memory->data = (char *) malloc (memory->area_size);
if (memory->data == NULL)
{
return NULL;
}
}
/* Retrieve the long record */
if (overflow_get (thread_p, (VPID *) address->data, memory, NULL) != S_SUCCESS)
{
return NULL;
}
return memory->data;
}
/*
* sort_exphase_merge_elim_dup () -
* return:
* sort_param(in):
*/
static int
sort_exphase_merge_elim_dup (THREAD_ENTRY * thread_p, SORT_PARAM * sort_param)
{
/* Variables for input files */
int act_infiles; /* How many of input files are active */
int pre_act_infiles; /* Number of active input files in the previous iteration */
int in_sectsize; /* Size of section allocated to each active input file (in terms of number of buffers
* it contains) */
int read_pages; /* Number of pages read in to fill the input buffer */
int in_act_bufno[SORT_MAX_HALF_FILES]; /* Active buffer in the input section */
int in_last_buf[SORT_MAX_HALF_FILES]; /* Last full buffer of the input section */
int act_slot[SORT_MAX_HALF_FILES]; /* Active slot of the active buffer of input section */
int last_slot[SORT_MAX_HALF_FILES]; /* Last slot of the active buffer of the input section */
char *in_sectaddr[SORT_MAX_HALF_FILES]; /* Beginning address of each input section */
char *in_cur_bufaddr[SORT_MAX_HALF_FILES]; /* Address of the current buffer in each input section */
/* Variables for output file */
int out_half; /* Which half of temp files is for output */
int cur_outfile; /* Index for output file recieving new run */
int out_sectsize; /* Size of the output section (in terms of number of buffer it contains) */
int out_act_bufno; /* Active buffer in the output section */
int out_runsize; /* Total pages output for the run being produced */
char *out_sectaddr; /* Beginning address of the output section */
char *out_cur_bufaddr; /* Address of the current buffer in the output section */
/* Smallest element pointers (one for each active input file) pointing to the active temp records. If the input file
* becomes inactive (all input is exhausted), its smallest element pointer is set to NULL */
RECDES smallest_elem_ptr[SORT_MAX_HALF_FILES];
RECDES long_recdes[SORT_MAX_HALF_FILES];
int cur_page[2 * SORT_MAX_HALF_FILES]; /* Current page of each temp file */
int num_runs; /* Number of output runs to be produced in this stage of the merging phase; */
int big_index;
int error;
int i, j;
int temp;
int min;
int len;
bool very_last_run = false;
int act;
int cp_pages;
int cmp;
SORT_CMP_FUNC *compare;
void *compare_arg;
SORT_REC_LIST sr_list[SORT_MAX_HALF_FILES], *min_p, *s, *p;
int tmp_var;
RECDES last_elem_ptr; /* last element pointer in one page of input section */
RECDES last_long_recdes;
/* >0 : must find minimum record <0 : last element in the current input section is minimum record. no need to find
* min record =0 : last element in the current input section is duplicated min record. no need to find min record
* except last element */
int last_elem_cmp;
char **data1, **data2;
SORT_REC *sort_rec;
int first_run;
error = NO_ERROR;
compare = sort_param->cmp_fn;
compare_arg = sort_param->cmp_arg;
for (i = 0; i < SORT_MAX_HALF_FILES; i++)
{
in_act_bufno[i] = 0;
in_last_buf[i] = 0;
act_slot[i] = 0;
last_slot[i] = 0;
in_sectaddr[i] = NULL;
in_cur_bufaddr[i] = NULL;
smallest_elem_ptr[i].data = NULL;
smallest_elem_ptr[i].area_size = 0;
long_recdes[i].data = NULL;
long_recdes[i].area_size = 0;
}
last_elem_ptr.data = NULL;
last_elem_ptr.area_size = 0;
last_long_recdes.data = NULL;
last_long_recdes.area_size = 0;
for (i = 0; i < (int) DIM (cur_page); i++)
{
cur_page[i] = 0;
}
if (sort_param->in_half == 0)
{
out_half = sort_param->half_files;
}
else
{
out_half = 0;
}
/* OUTER LOOP */
/* While there are more than one input files with different runs to merge */
while ((act_infiles = sort_get_numpages_of_active_infiles (sort_param)) > 1)
{
/* Check if output files has enough pages; if not allocate new pages */
error = sort_checkalloc_numpages_of_outfiles (thread_p, sort_param);
if (error != NO_ERROR)
{
ASSERT_ERROR ();
goto bailout;
}
/* Initialize the current pages of all temp files to 0 */
for (i = 0; i < sort_param->tot_tempfiles; i++)
{
cur_page[i] = 0;
}
/* Distribute the internal memory to the input and output sections */
in_sectsize = sort_find_inbuf_size (sort_param->tot_buffers, act_infiles);
out_sectsize = sort_param->tot_buffers - in_sectsize * act_infiles;
/* Set the address of each input section */
for (i = 0; i < act_infiles; i++)
{
in_sectaddr[i] = sort_param->internal_memory + (i * in_sectsize * DB_PAGESIZE);
}
/* Set the address of output section */
out_sectaddr = sort_param->internal_memory + (act_infiles * in_sectsize * DB_PAGESIZE);
cur_outfile = out_half;
/* Find how many runs will be produced in this iteration */
num_runs = 0;
for (i = sort_param->in_half; i < sort_param->in_half + act_infiles; i++)
{
len = sort_get_num_file_contents (&sort_param->file_contents[i]);
if (len > num_runs)
{
num_runs = len;
}
}
if (num_runs == 1)
{
very_last_run = true;
}
/* PRODUCE RUNS */
for (j = num_runs; j > 0; j--)
{
if (!very_last_run && (j == 1))
{
/* Last iteration of the outer loop ; some of the input files might have become empty. */
pre_act_infiles = act_infiles;
act_infiles = sort_get_numpages_of_active_infiles (sort_param);
if (act_infiles != pre_act_infiles)
{
/* Some of the active input files became inactive */
if (act_infiles == 1)
{
/*
* There is only one active input file (i.e. there is
* only one input run to produce the output run). So,
* there is no need to perform the merging actions. All
* needed is to copy this input run to the current output
* file.
*/
act = -1;
/* Find which input file contains this last input run */
for (i = sort_param->in_half; i < (sort_param->in_half + pre_act_infiles); i++)
{
if (sort_param->file_contents[i].first_run != -1)
{
act = i;
break;
}
}
if (act == -1)
{
goto bailout;
}
first_run = sort_param->file_contents[act].first_run;
cp_pages = sort_param->file_contents[act].num_pages[first_run];
error = sort_run_add_new (&sort_param->file_contents[cur_outfile], cp_pages);
if (error != NO_ERROR)
{
goto bailout;
}
sort_run_remove_first (&sort_param->file_contents[act]);
/* Use the whole internal_memory area as both the input and output buffer areas. */
while (cp_pages > 0)
{
if (cp_pages > sort_param->tot_buffers)
{
read_pages = sort_param->tot_buffers;
}
else
{
read_pages = cp_pages;
}
error =
sort_read_area (thread_p, &sort_param->temp[act], cur_page[act], read_pages,
sort_param->internal_memory);
if (error != NO_ERROR)
{
goto bailout;
}
cur_page[act] += read_pages;
error =
sort_write_area (thread_p, &sort_param->temp[cur_outfile], cur_page[cur_outfile],
read_pages, sort_param->internal_memory, sort_param->tde_encrypted);
if (error != NO_ERROR)
{
goto bailout;
}
cur_page[cur_outfile] += read_pages;
cp_pages -= read_pages;
}
/* Skip the remaining operations of the PRODUCE RUNS loop */
continue;
}
else
{
/* There are more than one active input files; redistribute buffers */
in_sectsize = sort_find_inbuf_size (sort_param->tot_buffers, act_infiles);
out_sectsize = sort_param->tot_buffers - in_sectsize * act_infiles;
/* Set the address of each input section */
for (i = 0; i < act_infiles; i++)
{
in_sectaddr[i] = sort_param->internal_memory + (i * in_sectsize * DB_PAGESIZE);
}
/* Set the address of output section */
out_sectaddr = sort_param->internal_memory + (act_infiles * in_sectsize * DB_PAGESIZE);
}
}
}
/* PRODUCE A NEW RUN */
/* INITIALIZE INPUT SECTIONS AND INPUT VARIABLES */
for (i = 0; i < act_infiles; i++)
{
big_index = sort_param->in_half + i;
first_run = sort_param->file_contents[big_index].first_run;
read_pages = sort_param->file_contents[big_index].num_pages[first_run];
if (in_sectsize < read_pages)
{
read_pages = in_sectsize;
}
error =
sort_read_area (thread_p, &sort_param->temp[big_index], cur_page[big_index], read_pages,
in_sectaddr[i]);
if (error != NO_ERROR)
{
goto bailout;
}
/* Increment the current page of this input_file */
cur_page[big_index] += read_pages;
first_run = sort_param->file_contents[big_index].first_run;
sort_param->file_contents[big_index].num_pages[first_run] -= read_pages;
/* Initialize input variables */
in_cur_bufaddr[i] = in_sectaddr[i];
in_act_bufno[i] = 0;
in_last_buf[i] = read_pages;
act_slot[i] = 0;
last_slot[i] = sort_spage_get_numrecs (in_cur_bufaddr[i]);
if (sort_spage_get_record (in_cur_bufaddr[i], act_slot[i], &smallest_elem_ptr[i], PEEK) != S_SUCCESS)
{
er_set (ER_FATAL_ERROR_SEVERITY, ARG_FILE_LINE, ER_SORT_TEMP_PAGE_CORRUPTED, 0);
error = ER_SORT_TEMP_PAGE_CORRUPTED;
goto bailout;
}
/* If this is a long record retrieve it */
if (smallest_elem_ptr[i].type == REC_BIGONE)
{
if (sort_retrieve_longrec (thread_p, &smallest_elem_ptr[i], &long_recdes[i]) == NULL)
{
ASSERT_ERROR ();
error = er_errid ();
goto bailout;
}
}
}
/* linkage & init nodes */
for (i = 0, p = sr_list; i < (act_infiles - 1); p = p->next)
{
p->next = (SORT_REC_LIST *) ((char *) p + sizeof (SORT_REC_LIST));
p->rec_pos = i++;
p->is_duplicated = false;
}
p->next = NULL;
p->rec_pos = i;
p->is_duplicated = false;
/* sort sr_list */
for (s = sr_list; s; s = s->next)
{
for (p = s->next; p; p = p->next)
{
/* compare s, p */
data1 = ((smallest_elem_ptr[s->rec_pos].type == REC_BIGONE)
? &(long_recdes[s->rec_pos].data) : &(smallest_elem_ptr[s->rec_pos].data));
data2 = ((smallest_elem_ptr[p->rec_pos].type == REC_BIGONE)
? &(long_recdes[p->rec_pos].data) : &(smallest_elem_ptr[p->rec_pos].data));
cmp = (*compare) (data1, data2, compare_arg);
if (cmp > 0)
{
/* swap s, p's rec_pos */
tmp_var = s->rec_pos;
s->rec_pos = p->rec_pos;
p->rec_pos = tmp_var;
}
}
}
/* find duplicate */
for (s = sr_list; s && s->next; s = s->next)
{
p = s->next;
data1 = ((smallest_elem_ptr[s->rec_pos].type == REC_BIGONE)
? &(long_recdes[s->rec_pos].data) : &(smallest_elem_ptr[s->rec_pos].data));
data2 = ((smallest_elem_ptr[p->rec_pos].type == REC_BIGONE)
? &(long_recdes[p->rec_pos].data) : &(smallest_elem_ptr[p->rec_pos].data));
cmp = (*compare) (data1, data2, compare_arg);
if (cmp == 0)
{
p->is_duplicated = true;
}
}
/* set min_p to point the minimum record */
min_p = sr_list; /* min_p->rec_pos is the min record */
/* last element comparison */
last_elem_cmp = 1;
p = min_p->next; /* second smallest element */
if (p)
{
/* STEP 1: get last_elem */
if (sort_spage_get_record (in_cur_bufaddr[min_p->rec_pos], (last_slot[min_p->rec_pos] - 1),
&last_elem_ptr, PEEK) != S_SUCCESS)
{
error = ER_SORT_TEMP_PAGE_CORRUPTED;
er_set (ER_FATAL_ERROR_SEVERITY, ARG_FILE_LINE, error, 0);
goto bailout;
}
/* if this is a long record, retrieve it */
if (last_elem_ptr.type == REC_BIGONE)
{
if (sort_retrieve_longrec (thread_p, &last_elem_ptr, &last_long_recdes) == NULL)
{
ASSERT_ERROR ();
error = er_errid ();
goto bailout;
}
}
/* STEP 2: compare last, p */
data1 = ((last_elem_ptr.type == REC_BIGONE) ? &(last_long_recdes.data) : &(last_elem_ptr.data));
data2 = ((smallest_elem_ptr[p->rec_pos].type == REC_BIGONE)
? &(long_recdes[p->rec_pos].data) : &(smallest_elem_ptr[p->rec_pos].data));
last_elem_cmp = (*compare) (data1, data2, compare_arg);
}
/* INITIALIZE OUTPUT SECTION AND OUTPUT VARIABLES */
out_act_bufno = 0;
out_cur_bufaddr = out_sectaddr;
for (i = 0; i < out_sectsize; i++)
{
/* Initialize each buffer to contain a slotted page */
sort_spage_initialize (out_sectaddr + (i * DB_PAGESIZE), UNANCHORED_KEEP_SEQUENCE, MAX_ALIGNMENT);
}
/* Initialize the size of next run to zero */
out_runsize = 0;
/* In parallel sort, put_fn will be performed by the parent thread. save last file index. */
if (very_last_run && SORT_IS_PARALLEL (sort_param))
{
sort_param->px_result_file_idx = cur_outfile;
}
for (;;)
{
/* OUTPUT A RECORD */
/* FIND MINIMUM RECORD IN THE INPUT AREA */
min = min_p->rec_pos;
/* min_p->is_duplicated == 1 then skip duplicated sort_key record */
if (min_p->is_duplicated == false)
{
/* we found first unique sort_key record */
if (very_last_run && !SORT_IS_PARALLEL (sort_param))
{
/* OUTPUT THE RECORD */
/* Obtain the output record for this temporary record */
if (smallest_elem_ptr[min].type == REC_BIGONE)
{
error = (*sort_param->put_fn) (thread_p, &long_recdes[min], sort_param->put_arg);
if (error != NO_ERROR)
{
goto bailout;
}
}
else
{
sort_rec = (SORT_REC *) (smallest_elem_ptr[min].data);
/* cut-off link used in Internal Sort */
sort_rec->next = NULL;
error = (*sort_param->put_fn) (thread_p, &smallest_elem_ptr[min], sort_param->put_arg);
if (error != NO_ERROR)
{
goto bailout;
}
}
}
else
{
/* OUTPUT THE MINIMUM RECORD TO THE OUTPUT AREA */
/* Insert this record to the output area */
if (sort_spage_insert (out_cur_bufaddr, &smallest_elem_ptr[min]) == NULL_SLOTID)
{
/* Current output buffer is full */
if (++out_act_bufno < out_sectsize)
{
/* There is another buffer in the output section; so insert the new record there */
out_cur_bufaddr += DB_PAGESIZE;
if (sort_spage_insert (out_cur_bufaddr, &smallest_elem_ptr[min]) == NULL_SLOTID)
{
/*
* Slotted page module refuses to insert a
* short size record (a temporary record that
* was already in a slotted page) to an empty
* page. This should never happen.
*/
er_set (ER_FATAL_ERROR_SEVERITY, ARG_FILE_LINE, ER_GENERIC_ERROR, 0);
error = ER_GENERIC_ERROR;
goto bailout;
}
}
else
{
/* Output section is full */
/* Flush output section */
error =
sort_write_area (thread_p, &sort_param->temp[cur_outfile], cur_page[cur_outfile],
out_sectsize, out_sectaddr, sort_param->tde_encrypted);
if (error != NO_ERROR)
{
goto bailout;
}
cur_page[cur_outfile] += out_sectsize;
out_runsize += out_sectsize;
/* Initialize output section and output variables */
out_act_bufno = 0;
out_cur_bufaddr = out_sectaddr;
for (i = 0; i < out_sectsize; i++)
{
/* Initialize each buffer to contain a slotted page */
sort_spage_initialize (out_sectaddr + (i * DB_PAGESIZE), UNANCHORED_KEEP_SEQUENCE,
MAX_ALIGNMENT);
}
if (sort_spage_insert (out_cur_bufaddr, &smallest_elem_ptr[min]) == NULL_SLOTID)
{
/*
* Slotted page module refuses to insert a
* short size record (a temporary record that
* was already in a slotted page) to an empty
* page. This should never happen.
*/
er_set (ER_FATAL_ERROR_SEVERITY, ARG_FILE_LINE, ER_GENERIC_ERROR, 0);
error = ER_GENERIC_ERROR;
goto bailout;
}
}
}
}
}
else
{
/* skip output the duplicated record to the output area */
;
}
/* PROCEED THE smallest_elem_ptr[min] TO NEXT RECORD */
if (++act_slot[min] >= last_slot[min])
{
/* The current input page is finished */
last_elem_cmp = 1;
if (++in_act_bufno[min] < in_last_buf[min])
{
/* Switch to the next page in the input buffer */
in_cur_bufaddr[min] = in_sectaddr[min] + in_act_bufno[min] * DB_PAGESIZE;
}
else
{ /* The input section is finished */
int frun;
big_index = sort_param->in_half + min;
frun = sort_param->file_contents[big_index].first_run;
if (sort_param->file_contents[big_index].num_pages[frun])
{
/* There are still some pages in the current input run */
in_cur_bufaddr[min] = in_sectaddr[min];
read_pages = sort_param->file_contents[big_index].num_pages[frun];
if (in_sectsize < read_pages)
{
read_pages = in_sectsize;
}
in_last_buf[min] = read_pages;
error = sort_read_area (thread_p, &sort_param->temp[big_index], cur_page[big_index],
read_pages, in_cur_bufaddr[min]);
if (error != NO_ERROR)
{
goto bailout;
}
/* Increment the current page of this input_file */
cur_page[big_index] += read_pages;
in_act_bufno[min] = 0;
frun = sort_param->file_contents[big_index].first_run;
sort_param->file_contents[big_index].num_pages[frun] -= read_pages;
}
else
{
/* Current input run on this input file has finished */
min_p = min_p->next;
if (min_p == NULL)
{
/* all "smallest_elem_ptr" are NULL; so break */
break;
}
else
{
/* Don't try to get the next record on this input section */
continue;
}
}
}
act_slot[min] = 0;
last_slot[min] = sort_spage_get_numrecs (in_cur_bufaddr[min]);
}
if (sort_spage_get_record (in_cur_bufaddr[min], act_slot[min], &smallest_elem_ptr[min], PEEK) !=
S_SUCCESS)
{
er_set (ER_FATAL_ERROR_SEVERITY, ARG_FILE_LINE, ER_SORT_TEMP_PAGE_CORRUPTED, 0);
error = ER_SORT_TEMP_PAGE_CORRUPTED;
goto bailout;
}
/* If this is a long record retrieve it */
if (smallest_elem_ptr[min].type == REC_BIGONE)
{
if (sort_retrieve_longrec (thread_p, &smallest_elem_ptr[min], &long_recdes[min]) == NULL)
{
ASSERT_ERROR ();
error = er_errid ();
goto bailout;
}
}
if ((act_slot[min_p->rec_pos] == last_slot[min_p->rec_pos] - 1) && (last_elem_cmp == 0))
{
/* last duplicated element in input section page enters */
min_p->is_duplicated = true;
}
else
{
min_p->is_duplicated = false;
}
/* find minimum */
if (last_elem_cmp <= 0)
{
/* already found min */
;
}
else
{
for (s = min_p; s; s = s->next)
{
p = s->next;
if (p == NULL)
{
/* there is only one record */
break;
}
/* compare s, p */
data1 = ((smallest_elem_ptr[s->rec_pos].type == REC_BIGONE)
? &(long_recdes[s->rec_pos].data) : &(smallest_elem_ptr[s->rec_pos].data));
data2 = ((smallest_elem_ptr[p->rec_pos].type == REC_BIGONE)
? &(long_recdes[p->rec_pos].data) : &(smallest_elem_ptr[p->rec_pos].data));
cmp = (*compare) (data1, data2, compare_arg);
if (cmp > 0)
{
/* swap s, p's rec_pos */
tmp_var = s->rec_pos;
s->rec_pos = p->rec_pos;
p->rec_pos = tmp_var;
/* swap s, p's is_duplicated */
tmp_var = (int) s->is_duplicated;
s->is_duplicated = p->is_duplicated;
p->is_duplicated = (bool) tmp_var;
}
else
{
if (cmp == 0)
{
p->is_duplicated = true; /* duplicated */
}
/* sr_list is completely sorted */
break;
}
}
/* new input page is entered */
if (act_slot[min_p->rec_pos] == 0)
{
/* last element comparison */
p = min_p->next; /* second smallest element */
if (p)
{
/* STEP 1: get last_elem */
if (sort_spage_get_record (in_cur_bufaddr[min_p->rec_pos], (last_slot[min_p->rec_pos] - 1),
&last_elem_ptr, PEEK) != S_SUCCESS)
{
error = ER_SORT_TEMP_PAGE_CORRUPTED;
er_set (ER_FATAL_ERROR_SEVERITY, ARG_FILE_LINE, error, 0);
goto bailout;
}
/* if this is a long record, retrieve it */
if (last_elem_ptr.type == REC_BIGONE)
{
if (sort_retrieve_longrec (thread_p, &last_elem_ptr, &last_long_recdes) == NULL)
{
ASSERT_ERROR ();
error = er_errid ();
goto bailout;
}
}
/* STEP 2: compare last, p */
data1 = ((last_elem_ptr.type == REC_BIGONE)
? &(last_long_recdes.data) : &(last_elem_ptr.data));
data2 = ((smallest_elem_ptr[p->rec_pos].type == REC_BIGONE)
? &(long_recdes[p->rec_pos].data) : &(smallest_elem_ptr[p->rec_pos].data));
last_elem_cmp = (*compare) (data1, data2, compare_arg);
}
}
}
}
if (!(very_last_run && !SORT_IS_PARALLEL (sort_param)))
{
/* Flush whatever is left on the output section */
out_act_bufno++; /* Since 0 refers to the first active buffer */
error =
sort_write_area (thread_p, &sort_param->temp[cur_outfile], cur_page[cur_outfile], out_act_bufno,
out_sectaddr, sort_param->tde_encrypted);
if (error != NO_ERROR)
{
goto bailout;
}
cur_page[cur_outfile] += out_act_bufno;
out_runsize += out_act_bufno;
}
/* END UP THIS RUN */
/* Remove previous first_run nodes of the file_contents lists of the input files */
for (i = sort_param->in_half; i < sort_param->in_half + sort_param->half_files; i++)
{
sort_run_remove_first (&sort_param->file_contents[i]);
}
/* Add a new node to the file_contents list of the current output file */
error = sort_run_add_new (&sort_param->file_contents[cur_outfile], out_runsize);
if (error != NO_ERROR)
{
goto bailout;
}
/* Produce a new run */
/* Switch to the next out file */
if (++cur_outfile >= sort_param->half_files + out_half)
{
cur_outfile = out_half;
}
}
/* Exchange input and output file indices */
temp = sort_param->in_half;
sort_param->in_half = out_half;
out_half = temp;
}
bailout:
for (i = 0; i < sort_param->half_files; i++)
{
if (long_recdes[i].data != NULL)
{
free_and_init (long_recdes[i].data);
}
}
if (last_long_recdes.data)
{
free_and_init (last_long_recdes.data);
}
return (error == SORT_PUT_STOP) ? NO_ERROR : error;
}
/*
* sort_put_result_from_tmpfile () - put result from last temp file
* return:
* sort_param(in): sort parameters
*
*/
int
sort_put_result_from_tmpfile (THREAD_ENTRY * thread_p, SORT_PARAM * sort_param, int start_pagenum)
{
int tot_pages;
int current_pages = start_pagenum, read_pages = 0, cur_read_pages = 0;
int slot_num = 0;
char *cur_pgptr;
int result_file_idx = sort_param->px_result_file_idx;
RECDES record = RECDES_INITIALIZER;
RECDES long_record = RECDES_INITIALIZER;
int error = NO_ERROR;
SORT_REC *sort_rec;
int tot_rows = 0;
tot_pages = sort_param->file_contents[result_file_idx].num_pages[0];
while (tot_pages > 0)
{
read_pages = (tot_pages > sort_param->tot_buffers) ? sort_param->tot_buffers : tot_pages;
error =
sort_read_area (thread_p, &sort_param->temp[result_file_idx], current_pages, read_pages,
sort_param->internal_memory);
if (error != NO_ERROR)
{
goto bailout;
}
cur_pgptr = sort_param->internal_memory;
cur_read_pages = read_pages;
while (cur_read_pages > 0)
{
/* read record from sort temp file */
slot_num = sort_spage_get_numrecs (cur_pgptr);
tot_rows += slot_num;
for (int i = 0; i < slot_num; i++)
{
if (sort_spage_get_record (cur_pgptr, i, &record, PEEK) != S_SUCCESS)
{
er_set (ER_FATAL_ERROR_SEVERITY, ARG_FILE_LINE, ER_SORT_TEMP_PAGE_CORRUPTED, 0);
error = ER_SORT_TEMP_PAGE_CORRUPTED;
goto bailout;
}
/* If this is a long record retrieve it */
if (record.type == REC_BIGONE)
{
if (sort_retrieve_longrec (thread_p, &record, &long_record) == NULL)
{
ASSERT_ERROR ();
error = er_errid ();
goto bailout;
}
}
/* write data by put_fn */
if (record.type == REC_BIGONE)
{
error = (*sort_param->put_fn) (thread_p, &long_record, sort_param->put_arg);
if (error != NO_ERROR)
{
goto bailout;
}
}
else
{
sort_rec = (SORT_REC *) (record.data);
/* cut-off link used in Internal Sort */
sort_rec->next = NULL;
error = (*sort_param->put_fn) (thread_p, &record, sort_param->put_arg);
if (error != NO_ERROR)
{
goto bailout;
}
}
}
/* Switch to the next page */
cur_pgptr += DB_PAGESIZE;
cur_read_pages--;
current_pages++;
}
tot_pages -= read_pages;
}
bailout:
if (long_record.data != NULL)
{
free_and_init (long_record.data);
}
return (error == SORT_PUT_STOP) ? NO_ERROR : error;
}
/*
* sort_split_last_run () - split last run for parallel works.
* return:
* px_sort_param(in): px_sort_param
* sort_param(in): sort_param
*
*/
void
sort_split_last_run (THREAD_ENTRY * thread_p, SORT_PARAM * px_sort_param, SORT_PARAM * sort_param, int parallel_num)
{
int result_file_idx, total_pages, pages_per_thread, remaining_pages;
int start_index = 0;
int cp_pages, read_pages;
result_file_idx = sort_param->px_result_file_idx = px_sort_param[0].px_result_file_idx;
total_pages = sort_param->file_contents[result_file_idx].num_pages[0] =
px_sort_param[0].file_contents[result_file_idx].num_pages[0];
sort_param->temp[result_file_idx] = px_sort_param[0].temp[result_file_idx];
pages_per_thread = total_pages / parallel_num;
remaining_pages = total_pages % parallel_num;
for (int i = 0; i < parallel_num; i++)
{
px_sort_param[i].px_result_file_idx = 0;
px_sort_param[i].file_contents[0].num_pages[0] = pages_per_thread + (i < remaining_pages ? 1 : 0);
px_sort_param[i].file_contents[0].first_run = 0;
px_sort_param[i].file_contents[0].last_run = 0;
px_sort_param[i].temp[0] = sort_param->temp[result_file_idx];
start_index += (i == 0) ? 0 : px_sort_param[i - 1].file_contents[0].num_pages[0];
px_sort_param[i].file_contents[0].start_index = start_index;
/* init px_status */
px_sort_param[i].px_status = PX_PROGRESS;
}
}
/*
* sort_exphase_merge () - Merge phase
* return:
* sort_param(in): sort parameters
*
*/
static int
sort_exphase_merge (THREAD_ENTRY * thread_p, SORT_PARAM * sort_param)
{
/* Variables for input files */
int act_infiles; /* How many of input files are active */
int pre_act_infiles; /* Number of active input files in the previous iteration */
int in_sectsize; /* Size of section allocated to each active input file (in terms of number of buffers
* it contains) */
int read_pages; /* Number of pages read in to fill the input buffer */
int in_act_bufno[SORT_MAX_HALF_FILES]; /* Active buffer in the input section */
int in_last_buf[SORT_MAX_HALF_FILES]; /* Last full buffer of the input section */
int act_slot[SORT_MAX_HALF_FILES]; /* Active slot of the active buffer of input section */
int last_slot[SORT_MAX_HALF_FILES]; /* Last slot of the active buffer of the input section */
char *in_sectaddr[SORT_MAX_HALF_FILES]; /* Beginning address of each input section */
char *in_cur_bufaddr[SORT_MAX_HALF_FILES]; /* Address of the current buffer in each input section */
/* Variables for output file */
int out_half; /* Which half of temp files is for output */
int cur_outfile; /* Index for output file recieving new run */
int out_sectsize; /* Size of the output section (in terms of number of buffer it contains) */
int out_act_bufno; /* Active buffer in the output section */
int out_runsize; /* Total pages output for the run being produced */
char *out_sectaddr; /* Beginning address of the output section */
char *out_cur_bufaddr; /* Address of the current buffer in the output section */
/* Smallest element pointers (one for each active input file) pointing to the active temp records. If the input file
* becomes inactive (all input is exhausted), its smallest element pointer is set to NULL */
RECDES smallest_elem_ptr[SORT_MAX_HALF_FILES];
RECDES long_recdes[SORT_MAX_HALF_FILES];
int cur_page[2 * SORT_MAX_HALF_FILES]; /* Current page of each temp file */
int num_runs; /* Number of output runs to be produced in this stage of the merging phase; */
int big_index;
int error;
int i, j;
int temp;
int min;
int len;
bool very_last_run = false;
int act;
int cp_pages;
SORT_CMP_FUNC *compare;
void *compare_arg;
SORT_REC_LIST sr_list[SORT_MAX_HALF_FILES], *min_p, *s, *p;
int tmp_pos; /* temporary value for rec_pos swapping */
bool do_swap; /* rec_pos swapping indicator */
RECDES last_elem_ptr; /* last element pointers in one page of input section */
RECDES last_long_recdes;
bool last_elem_is_min; /* false: must find min record true: last element in the current input section is min
* record. no need to find min */
char **data1, **data2;
SORT_REC *sort_rec;
int first_run;
int cmp;
error = NO_ERROR;
compare = sort_param->cmp_fn;
compare_arg = sort_param->cmp_arg;
for (i = 0; i < SORT_MAX_HALF_FILES; i++)
{
in_act_bufno[i] = 0;
in_last_buf[i] = 0;
act_slot[i] = 0;
last_slot[i] = 0;
in_sectaddr[i] = NULL;
in_cur_bufaddr[i] = NULL;
smallest_elem_ptr[i].data = NULL;
smallest_elem_ptr[i].area_size = 0;
long_recdes[i].data = NULL;
long_recdes[i].area_size = 0;
}
last_elem_ptr.data = NULL;
last_elem_ptr.area_size = 0;
last_long_recdes.data = NULL;
last_long_recdes.area_size = 0;
for (i = 0; i < (int) DIM (cur_page); i++)
{
cur_page[i] = 0;
}
if (sort_param->in_half == 0)
{
out_half = sort_param->half_files;
}
else
{
out_half = 0;
}
/* OUTER LOOP */
/* While there are more than one input files with different runs to merge */
while ((act_infiles = sort_get_numpages_of_active_infiles (sort_param)) > 1)
{
/* Check if output files has enough pages; if not allocate new pages */
error = sort_checkalloc_numpages_of_outfiles (thread_p, sort_param);
if (error != NO_ERROR)
{
ASSERT_ERROR ();
goto bailout;
}
/* Initialize the current pages of all temp files to 0 */
for (i = 0; i < sort_param->tot_tempfiles; i++)
{
cur_page[i] = 0;
}
/* Distribute the internal memory to the input and output sections */
in_sectsize = sort_find_inbuf_size (sort_param->tot_buffers, act_infiles);
out_sectsize = sort_param->tot_buffers - in_sectsize * act_infiles;
/* Set the address of each input section */
for (i = 0; i < act_infiles; i++)
{
in_sectaddr[i] = sort_param->internal_memory + (i * in_sectsize * DB_PAGESIZE);
}
/* Set the address of output section */
out_sectaddr = sort_param->internal_memory + (act_infiles * in_sectsize * DB_PAGESIZE);
cur_outfile = out_half;
/* Find how many runs will be produced in this iteration */
num_runs = 0;
for (i = sort_param->in_half; i < sort_param->in_half + act_infiles; i++)
{
len = sort_get_num_file_contents (&sort_param->file_contents[i]);
if (len > num_runs)
{
num_runs = len;
}
}
if (num_runs == 1)
{
very_last_run = true;
}
/* PRODUCE RUNS */
for (j = num_runs; j > 0; j--)
{
if (!very_last_run && (j == 1))
{
/* LAST RUN OF THIS ITERATION */
/* Last iteration of the outer loop ; some of the input files might have become empty. */
pre_act_infiles = act_infiles;
act_infiles = sort_get_numpages_of_active_infiles (sort_param);
if (act_infiles != pre_act_infiles)
{
/* Some of the active input files became inactive */
if (act_infiles == 1)
{
/* ONE ACTIVE INFILE */
/*
* There is only one active input file (i.e. there is
* only one input run to produce the output run). So,
* there is no need to perform the merging actions. All
* needed is to copy this input run to the current output
* file.
*/
act = -1;
/* Find which input file contains this last input run */
for (i = sort_param->in_half; i < (sort_param->in_half + pre_act_infiles); i++)
{
if (sort_param->file_contents[i].first_run != -1)
{
act = i;
break;
}
}
if (act == -1)
{
goto bailout;
}
first_run = sort_param->file_contents[act].first_run;
cp_pages = sort_param->file_contents[act].num_pages[first_run];
error = sort_run_add_new (&sort_param->file_contents[cur_outfile], cp_pages);
if (error != NO_ERROR)
{
goto bailout;
}
sort_run_remove_first (&sort_param->file_contents[act]);
/* Use the whole internal_memory area as both the input and output buffer areas. */
while (cp_pages > 0)
{
if (cp_pages > sort_param->tot_buffers)
{
read_pages = sort_param->tot_buffers;
}
else
{
read_pages = cp_pages;
}
error =
sort_read_area (thread_p, &sort_param->temp[act], cur_page[act], read_pages,
sort_param->internal_memory);
if (error != NO_ERROR)
{
goto bailout;
}
cur_page[act] += read_pages;
error =
sort_write_area (thread_p, &sort_param->temp[cur_outfile], cur_page[cur_outfile],
read_pages, sort_param->internal_memory, sort_param->tde_encrypted);
if (error != NO_ERROR)
{
goto bailout;
}
cur_page[cur_outfile] += read_pages;
cp_pages -= read_pages;
}
/* Skip the remaining operations of the PRODUCE RUNS loop */
continue;
}
else
{
/* There are more than one active input files; redistribute buffers */
in_sectsize = sort_find_inbuf_size (sort_param->tot_buffers, act_infiles);
out_sectsize = sort_param->tot_buffers - in_sectsize * act_infiles;
/* Set the address of each input section */
for (i = 0; i < act_infiles; i++)
{
in_sectaddr[i] = sort_param->internal_memory + (i * in_sectsize * DB_PAGESIZE);
}
/* Set the address of output section */
out_sectaddr = sort_param->internal_memory + (act_infiles * in_sectsize * DB_PAGESIZE);
}
}
}
/* PRODUCE A NEW RUN */
/* INITIALIZE INPUT SECTIONS AND INPUT VARIABLES */
for (i = 0; i < act_infiles; i++)
{
big_index = sort_param->in_half + i;
first_run = sort_param->file_contents[big_index].first_run;
read_pages = sort_param->file_contents[big_index].num_pages[first_run];
if (in_sectsize < read_pages)
{
read_pages = in_sectsize;
}
error =
sort_read_area (thread_p, &sort_param->temp[big_index], cur_page[big_index], read_pages,
in_sectaddr[i]);
if (error != NO_ERROR)
{
goto bailout;
}
/* Increment the current page of this input_file */
cur_page[big_index] += read_pages;
first_run = sort_param->file_contents[big_index].first_run;
sort_param->file_contents[big_index].num_pages[first_run] -= read_pages;
/* Initialize input variables */
in_cur_bufaddr[i] = in_sectaddr[i];
in_act_bufno[i] = 0;
in_last_buf[i] = read_pages;
act_slot[i] = 0;
last_slot[i] = sort_spage_get_numrecs (in_cur_bufaddr[i]);
if (sort_spage_get_record (in_cur_bufaddr[i], act_slot[i], &smallest_elem_ptr[i], PEEK) != S_SUCCESS)
{
er_set (ER_FATAL_ERROR_SEVERITY, ARG_FILE_LINE, ER_SORT_TEMP_PAGE_CORRUPTED, 0);
error = ER_SORT_TEMP_PAGE_CORRUPTED;
goto bailout;
}
/* If this is a long record retrieve it */
if (smallest_elem_ptr[i].type == REC_BIGONE)
{
if (sort_retrieve_longrec (thread_p, &smallest_elem_ptr[i], &long_recdes[i]) == NULL)
{
ASSERT_ERROR ();
error = er_errid ();
goto bailout;
}
}
}
for (i = 0, p = sr_list; i < (act_infiles - 1); p = p->next)
{
p->next = (SORT_REC_LIST *) ((char *) p + sizeof (SORT_REC_LIST));
p->rec_pos = i++;
}
p->next = NULL;
p->rec_pos = i;
for (s = sr_list; s; s = s->next)
{
for (p = s->next; p; p = p->next)
{
do_swap = false;
data1 = ((smallest_elem_ptr[s->rec_pos].type == REC_BIGONE)
? &(long_recdes[s->rec_pos].data) : &(smallest_elem_ptr[s->rec_pos].data));
data2 = ((smallest_elem_ptr[p->rec_pos].type == REC_BIGONE)
? &(long_recdes[p->rec_pos].data) : &(smallest_elem_ptr[p->rec_pos].data));
cmp = (*compare) (data1, data2, compare_arg);
if (cmp > 0)
{
do_swap = true;
}
if (do_swap)
{
tmp_pos = s->rec_pos;
s->rec_pos = p->rec_pos;
p->rec_pos = tmp_pos;
}
}
}
/* set min_p to point minimum record */
min_p = sr_list; /* min_p->rec_pos is min record */
/* last element comparison */
last_elem_is_min = false;
p = min_p->next; /* second smallest element */
if (p)
{
/* STEP 1: get last_elem */
if (sort_spage_get_record (in_cur_bufaddr[min_p->rec_pos], (last_slot[min_p->rec_pos] - 1),
&last_elem_ptr, PEEK) != S_SUCCESS)
{
error = ER_SORT_TEMP_PAGE_CORRUPTED;
er_set (ER_FATAL_ERROR_SEVERITY, ARG_FILE_LINE, error, 0);
goto bailout;
}
/* if this is a long record then retrieve it */
if (last_elem_ptr.type == REC_BIGONE)
{
if (sort_retrieve_longrec (thread_p, &last_elem_ptr, &last_long_recdes) == NULL)
{
ASSERT_ERROR ();
error = er_errid ();
goto bailout;
}
}
/* STEP 2: compare last, p */
data1 = ((last_elem_ptr.type == REC_BIGONE) ? &(last_long_recdes.data) : &(last_elem_ptr.data));
data2 = ((smallest_elem_ptr[p->rec_pos].type == REC_BIGONE)
? &(long_recdes[p->rec_pos].data) : &(smallest_elem_ptr[p->rec_pos].data));
cmp = (*compare) (data1, data2, compare_arg);
if (cmp <= 0)
{
last_elem_is_min = true;
}
}
/* INITIALIZE OUTPUT SECTION AND OUTPUT VARIABLES */
out_act_bufno = 0;
out_cur_bufaddr = out_sectaddr;
for (i = 0; i < out_sectsize; i++)
{
/* Initialize each buffer to contain a slotted page */
sort_spage_initialize (out_sectaddr + (i * DB_PAGESIZE), UNANCHORED_KEEP_SEQUENCE, MAX_ALIGNMENT);
}
/* Initialize the size of next run to zero */
out_runsize = 0;
/* In parallel sort, put_fn will be performed by the parent thread. save last file index. */
if (very_last_run && SORT_IS_PARALLEL (sort_param))
{
sort_param->px_result_file_idx = cur_outfile;
}
for (;;)
{
/* OUTPUT A RECORD */
/* FIND MINIMUM RECORD IN THE INPUT AREA */
min = min_p->rec_pos;
if (very_last_run && !SORT_IS_PARALLEL (sort_param))
{
/* OUTPUT THE RECORD */
/* Obtain the output record for this temporary record */
if (smallest_elem_ptr[min].type == REC_BIGONE)
{
error = (*sort_param->put_fn) (thread_p, &long_recdes[min], sort_param->put_arg);
if (error != NO_ERROR)
{
goto bailout;
}
}
else
{
sort_rec = (SORT_REC *) (smallest_elem_ptr[min].data);
/* cut-off link used in Internal Sort */
sort_rec->next = NULL;
error = (*sort_param->put_fn) (thread_p, &smallest_elem_ptr[min], sort_param->put_arg);
if (error != NO_ERROR)
{
goto bailout;
}
}
}
else
{
/* OUTPUT THE MINIMUM RECORD TO THE OUTPUT AREA */
/* Insert this record to the output area */
if (sort_spage_insert (out_cur_bufaddr, &smallest_elem_ptr[min]) == NULL_SLOTID)
{
/* Current output buffer is full */
if (++out_act_bufno < out_sectsize)
{
/* There is another buffer in the output section; so insert the new record there */
out_cur_bufaddr += DB_PAGESIZE;
if (sort_spage_insert (out_cur_bufaddr, &smallest_elem_ptr[min]) == NULL_SLOTID)
{
/*
* Slotted page module refuses to insert a short
* size record (a temporary record that was
* already in a slotted page) to an empty page.
* This should never happen.
*/
er_set (ER_FATAL_ERROR_SEVERITY, ARG_FILE_LINE, ER_GENERIC_ERROR, 0);
error = ER_GENERIC_ERROR;
goto bailout;
}
}
else
{
/* Output section is full */
/* Flush output section */
error =
sort_write_area (thread_p, &sort_param->temp[cur_outfile], cur_page[cur_outfile],
out_sectsize, out_sectaddr, sort_param->tde_encrypted);
if (error != NO_ERROR)
{
goto bailout;
}
cur_page[cur_outfile] += out_sectsize;
out_runsize += out_sectsize;
/* Initialize output section and output variables */
out_act_bufno = 0;
out_cur_bufaddr = out_sectaddr;
for (i = 0; i < out_sectsize; i++)
{
/* Initialize each buffer to contain a slotted page */
sort_spage_initialize (out_sectaddr + (i * DB_PAGESIZE), UNANCHORED_KEEP_SEQUENCE,
MAX_ALIGNMENT);
}
if (sort_spage_insert (out_cur_bufaddr, &smallest_elem_ptr[min]) == NULL_SLOTID)
{
/*
* Slotted page module refuses to insert a short
* size record (a temporary record that was
* already in a slotted page) to an empty page.
* This should never happen.
*/
er_set (ER_FATAL_ERROR_SEVERITY, ARG_FILE_LINE, ER_GENERIC_ERROR, 0);
error = ER_GENERIC_ERROR;
goto bailout;
}
}
}
}
/* PROCEED THE smallest_elem_ptr[min] TO NEXT RECORD */
if (++act_slot[min] >= last_slot[min])
{
/* The current input page is finished */
last_elem_is_min = false;
if (++in_act_bufno[min] < in_last_buf[min])
{
/* Switch to the next page in the input buffer */
in_cur_bufaddr[min] = in_sectaddr[min] + in_act_bufno[min] * DB_PAGESIZE;
}
else
{
/* The input section is finished */
big_index = sort_param->in_half + min;
first_run = sort_param->file_contents[big_index].first_run;
if (sort_param->file_contents[big_index].num_pages[first_run])
{
/* There are still some pages in the current input run */
in_cur_bufaddr[min] = in_sectaddr[min];
read_pages = sort_param->file_contents[big_index].num_pages[first_run];
if (in_sectsize < read_pages)
{
read_pages = in_sectsize;
}
in_last_buf[min] = read_pages;
error =
sort_read_area (thread_p, &sort_param->temp[big_index], cur_page[big_index], read_pages,
in_cur_bufaddr[min]);
if (error != NO_ERROR)
{
goto bailout;
}
/* Increment the current page of this input_file */
cur_page[big_index] += read_pages;
in_act_bufno[min] = 0;
first_run = sort_param->file_contents[big_index].first_run;
sort_param->file_contents[big_index].num_pages[first_run] -= read_pages;
}
else
{
/* Current input run on this input file has finished */
/* remove current input run in input section. proceed to next minimum record. */
min_p = min_p->next;
if (min_p == NULL)
{
/* all "smallest_elem_ptr" are NULL; so break */
break;
}
else
{
/* Don't try to get the next record on this input section */
continue;
}
}
}
act_slot[min] = 0;
last_slot[min] = sort_spage_get_numrecs (in_cur_bufaddr[min]);
}
if (sort_spage_get_record (in_cur_bufaddr[min], act_slot[min], &smallest_elem_ptr[min], PEEK) !=
S_SUCCESS)
{
er_set (ER_FATAL_ERROR_SEVERITY, ARG_FILE_LINE, ER_SORT_TEMP_PAGE_CORRUPTED, 0);
error = ER_SORT_TEMP_PAGE_CORRUPTED;
goto bailout;
}
/* If this is a long record retrieve it */
if (smallest_elem_ptr[min].type == REC_BIGONE)
{
if (sort_retrieve_longrec (thread_p, &smallest_elem_ptr[min], &long_recdes[min]) == NULL)
{
ASSERT_ERROR ();
error = er_errid ();
goto bailout;
}
}
/* find minimum */
if (last_elem_is_min == true)
{
/* already find min */
;
}
else
{
for (s = min_p; s; s = s->next)
{
p = s->next;
if (p == NULL)
{
/* there is only one record */
break;
}
do_swap = false;
data1 = ((smallest_elem_ptr[s->rec_pos].type == REC_BIGONE)
? &(long_recdes[s->rec_pos].data) : &(smallest_elem_ptr[s->rec_pos].data));
data2 = ((smallest_elem_ptr[p->rec_pos].type == REC_BIGONE)
? &(long_recdes[p->rec_pos].data) : &(smallest_elem_ptr[p->rec_pos].data));
cmp = (*compare) (data1, data2, compare_arg);
if (cmp > 0)
{
do_swap = true;
}
if (do_swap)
{
/* swap s, p */
tmp_pos = s->rec_pos;
s->rec_pos = p->rec_pos;
p->rec_pos = tmp_pos;
}
else
{
/* sr_list is completely sorted */
break;
}
}
/* new input page is entered */
if (act_slot[min_p->rec_pos] == 0)
{
/* last element comparison */
p = min_p->next; /* second smallest element */
if (p)
{
/* STEP 1: get last_elem */
if (sort_spage_get_record (in_cur_bufaddr[min_p->rec_pos], (last_slot[min_p->rec_pos] - 1),
&last_elem_ptr, PEEK) != S_SUCCESS)
{
error = ER_SORT_TEMP_PAGE_CORRUPTED;
er_set (ER_FATAL_ERROR_SEVERITY, ARG_FILE_LINE, error, 0);
goto bailout;
}
/* if this is a long record retrieve it */
if (last_elem_ptr.type == REC_BIGONE)
{
if (sort_retrieve_longrec (thread_p, &last_elem_ptr, &last_long_recdes) == NULL)
{
ASSERT_ERROR ();
error = er_errid ();
goto bailout;
}
}
/* STEP 2: compare last, p */
data1 =
((last_elem_ptr.type == REC_BIGONE) ? &(last_long_recdes.data) : &(last_elem_ptr.data));
data2 = ((smallest_elem_ptr[p->rec_pos].type == REC_BIGONE)
? &(long_recdes[p->rec_pos].data) : &(smallest_elem_ptr[p->rec_pos].data));
cmp = (*compare) (data1, data2, compare_arg);
if (cmp <= 0)
{
last_elem_is_min = true;
}
}
}
}
}
if (!(very_last_run && !SORT_IS_PARALLEL (sort_param)))
{
/* Flush whatever is left on the output section */
out_act_bufno++; /* Since 0 refers to the first active buffer */
error =
sort_write_area (thread_p, &sort_param->temp[cur_outfile], cur_page[cur_outfile], out_act_bufno,
out_sectaddr, sort_param->tde_encrypted);
if (error != NO_ERROR)
{
goto bailout;
}
cur_page[cur_outfile] += out_act_bufno;
out_runsize += out_act_bufno;
}
/* END UP THIS RUN */
/* Remove previous first_run nodes of the file_contents lists of the input files */
for (i = sort_param->in_half; i < sort_param->in_half + sort_param->half_files; i++)
{
sort_run_remove_first (&sort_param->file_contents[i]);
}
/* Add a new node to the file_contents list of the current output file */
error = sort_run_add_new (&sort_param->file_contents[cur_outfile], out_runsize);
if (error != NO_ERROR)
{
goto bailout;
}
/* PRODUCE A NEW RUN */
/* Switch to the next out file */
if (++cur_outfile >= sort_param->half_files + out_half)
{
cur_outfile = out_half;
}
}
/* Exchange input and output file indices */
temp = sort_param->in_half;
sort_param->in_half = out_half;
out_half = temp;
}
bailout:
for (i = 0; i < sort_param->half_files; i++)
{
if (long_recdes[i].data != NULL)
{
free_and_init (long_recdes[i].data);
}
}
if (last_long_recdes.data)
{
free_and_init (last_long_recdes.data);
}
return (error == SORT_PUT_STOP) ? NO_ERROR : error;
}
/* AUXILIARY FUNCTIONS */
/*
* sort_get_avg_numpages_of_nonempty_tmpfile () - Return average number of pages
* currently occupied by nonempty
* temporary file
* return:
* sort_param(in): Sort paramater
*/
static int
sort_get_avg_numpages_of_nonempty_tmpfile (SORT_PARAM * sort_param)
{
int f;
int sum, i;
int nonempty_temp_file_num = 0;
sum = 0;
for (i = 0; i < sort_param->tot_tempfiles; i++)
{
/* If the list is not empty */
f = sort_param->file_contents[i].first_run;
if (f > -1)
{
nonempty_temp_file_num++;
for (; f <= sort_param->file_contents[i].last_run; f++)
{
sum += sort_param->file_contents[i].num_pages[f];
}
}
}
return (sum / MAX (1, nonempty_temp_file_num));
}
/*
* sort_return_used_resources () - Return system resource used for sorting
* return: void
* sort_param(in): Sort paramater
*
* Note: Clear the sort parameter structure by deallocating any allocated
* memory areas and destroying any temporary files and volumes.
*/
static void
sort_return_used_resources (THREAD_ENTRY * thread_p, SORT_PARAM * sort_param, PARALLEL_TYPE parallel_type)
{
int k;
#if defined(SERVER_MODE)
int rv;
#endif /* SERVER_MODE */
if (sort_param == NULL)
{
return; /* nop */
}
if (sort_param->internal_memory)
{
free_and_init (sort_param->internal_memory);
}
if (parallel_type == PX_SINGLE || parallel_type == PX_MAIN_IN_PARALLEL)
{
for (k = 0; k < sort_param->tot_tempfiles; k++)
{
if (sort_param->temp[k].volid != NULL_VOLID)
{
(void) file_temp_retire (thread_p, &sort_param->temp[k]);
}
}
}
if (sort_param->multipage_file.volid != NULL_VOLID)
{
(void) file_temp_retire (thread_p, &(sort_param->multipage_file));
}
for (k = 0; k < SORT_MAX_TOT_FILES; k++)
{
if (sort_param->file_contents[k].num_pages != NULL)
{
free_and_init (sort_param->file_contents[k].num_pages);
}
}
if (parallel_type == PX_THREAD_IN_PARALLEL)
{
if (sort_param->px_type == SORT_ORDER_BY)
{
if (sort_param->get_arg != NULL)
{
SORT_INFO *sort_info_p = (SORT_INFO *) sort_param->get_arg;
if (sort_info_p->input_file)
{
qfile_free_list_id (sort_info_p->input_file);
}
if (sort_info_p->s_id->s_id != NULL)
{
db_private_free_and_init (thread_p, sort_info_p->s_id->s_id);
}
if (sort_info_p->s_id != NULL)
{
db_private_free_and_init (thread_p, sort_info_p->s_id);
}
db_private_free_and_init (thread_p, sort_param->get_arg);
}
if (sort_param->put_arg != NULL)
{
SORT_INFO *sort_info_p = (SORT_INFO *) sort_param->put_arg;
SORT_INFO *ori_sort_info_p = (SORT_INFO *) sort_param->ori_sort_param->put_arg;
if (sort_info_p->output_file && sort_info_p->output_file != ori_sort_info_p->output_file)
{
qfile_free_list_id (sort_info_p->output_file);
}
if (sort_info_p->s_id->s_id != NULL)
{
db_private_free_and_init (thread_p, sort_info_p->s_id->s_id);
}
if (sort_info_p->s_id != NULL)
{
db_private_free_and_init (thread_p, sort_info_p->s_id);
}
db_private_free_and_init (thread_p, sort_param->put_arg);
}
}
else if (sort_param->px_type == SORT_INDEX_LEAF)
{
if (sort_param->get_arg != NULL)
{
SORT_ARGS *sort_args_p = (SORT_ARGS *) sort_param->get_arg;
if (sort_args_p->ftab_sets != NULL)
{
sort_args_p->ftab_sets->~vector ();
free_and_init (sort_args_p->ftab_sets);
}
free_and_init (sort_param->get_arg);
}
}
else
{
assert (false);
return;
}
}
}
/*
* sort_add_new_file () - Create a new temporary file for sorting purposes
* return: NO_ERROR
* vfid(in): Set to the created file identifier
* file_pg_cnt_est(in): Estimated file page count
* force_alloc(in): Allocate file pages now ?
* tde_encrypted(in): whether the file has to be encrypted or not for TDE
*/
static int
sort_add_new_file (THREAD_ENTRY * thread_p, VFID * vfid, int file_pg_cnt_est, bool force_alloc, bool tde_encrypted)
{
VPID new_vpid;
TDE_ALGORITHM tde_algo = TDE_ALGORITHM_NONE;
int ret = NO_ERROR;
/* todo: sort file is a case I missed that seems to use file_find_nthpages. I don't know if it can be optimized to
* work without numerable files, that remains to be seen. */
ret = file_create_temp_numerable (thread_p, file_pg_cnt_est, vfid);
if (ret != NO_ERROR)
{
ASSERT_ERROR ();
return ret;
}
if (VFID_ISNULL (vfid))
{
assert_release (false);
return ER_FAILED;
}
if (tde_encrypted)
{
tde_algo = (TDE_ALGORITHM) prm_get_integer_value (PRM_ID_TDE_DEFAULT_ALGORITHM);
ret = file_apply_tde_algorithm (thread_p, vfid, tde_algo);
if (ret != NO_ERROR)
{
ASSERT_ERROR ();
file_temp_retire (thread_p, vfid);
VFID_SET_NULL (vfid);
return ret;
}
}
if (force_alloc == false)
{
return NO_ERROR;
}
/* page allocation force is specified, allocate pages for the file */
/* todo: we don't have multiple page allocation, but allocation should be fast enough */
for (; file_pg_cnt_est > 0; file_pg_cnt_est--)
{
ret = file_alloc (thread_p, vfid, NULL, NULL, &new_vpid, NULL);
if (ret != NO_ERROR)
{
ASSERT_ERROR ();
file_temp_retire (thread_p, vfid);
VFID_SET_NULL (vfid);
return ret;
}
}
return NO_ERROR;
}
#if defined(SERVER_MODE)
/*
* sort_copy_sort_param () - copy sort param from src_param to dest_param
* return: NO_ERROR
* dest_param(in):
* src_param(in):
* parallel_num(in):
*/
int
sort_copy_sort_param (THREAD_ENTRY * thread_p, SORT_PARAM * px_sort_param, SORT_PARAM * sort_param, int parallel_num)
{
int error = NO_ERROR;
int i, j;
/* init stats */
memset (&sort_param->orderby_stats, 0, sizeof (sort_param->orderby_stats));
/* copy from origin sort param */
for (i = 0; i < parallel_num; i++)
{
memcpy (&px_sort_param[i], sort_param, sizeof (SORT_PARAM));
}
/* init */
for (i = 0; i < parallel_num; i++)
{
px_sort_param[i].internal_memory = NULL;
for (j = 0; j < SORT_MAX_TOT_FILES; j++)
{
px_sort_param[i].file_contents[j].num_pages = NULL;
}
}
/* For parallel sort, tot_buffers must be at least 8 (bump up if currently < 8) */
if (sort_param->tot_buffers < 8)
{
char *new_internal_memory;
int saved_tot_buffers = sort_param->tot_buffers;
sort_param->tot_buffers = 8;
new_internal_memory =
(char *) realloc (sort_param->internal_memory, (size_t) sort_param->tot_buffers * (size_t) DB_PAGESIZE);
if (new_internal_memory == NULL)
{
sort_param->tot_buffers = saved_tot_buffers;
error = ER_OUT_OF_VIRTUAL_MEMORY;
er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, error, 1, (size_t) (8 * DB_PAGESIZE));
goto clear;
}
sort_param->internal_memory = new_internal_memory;
}
/* alloc new memory */
for (i = 0; i < parallel_num; i++)
{
px_sort_param[i].internal_memory = (char *) malloc ((size_t) sort_param->tot_buffers * (size_t) DB_PAGESIZE);
if (px_sort_param[i].internal_memory == NULL)
{
error = ER_OUT_OF_VIRTUAL_MEMORY;
er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, error, 1, (size_t) (sort_param->tot_buffers * DB_PAGESIZE));
goto clear;
}
px_sort_param[i].tot_buffers = sort_param->tot_buffers; /* match allocated size (may be 8 for parallel) */
for (j = 0; j < SORT_MAX_TOT_FILES; j++)
{
/* Initilize file contents list */
px_sort_param[i].file_contents[j].num_pages = (int *) calloc (SORT_INITIAL_DYN_ARRAY_SIZE, sizeof (int));
if (px_sort_param[i].file_contents[j].num_pages == NULL)
{
error = ER_OUT_OF_VIRTUAL_MEMORY;
er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, error, 1,
(size_t) (SORT_INITIAL_DYN_ARRAY_SIZE * sizeof (int)));
goto clear;
}
px_sort_param[i].file_contents[j].num_slots = SORT_INITIAL_DYN_ARRAY_SIZE;
px_sort_param[i].file_contents[j].first_run = -1;
px_sort_param[i].file_contents[j].last_run = -1;
}
/* init px variable */
px_sort_param[i].px_status = PX_PROGRESS;
px_sort_param[i].px_parallel_num = parallel_num;
px_sort_param[i].px_result_file_idx = 0;
/* Copy the parent's thread_p. */
px_sort_param[i].px_orig_thread_p =
thread_p->m_px_orig_thread_entry ? thread_p->m_px_orig_thread_entry : thread_p;
px_sort_param[i].ori_sort_param = sort_param;
px_sort_param[i].main_error_context = &cuberr::context::get_thread_local_context ();
}
clear:
if (error != NO_ERROR)
{
/* free memory */
for (i = 0; i < parallel_num; i++)
{
if (px_sort_param[i].internal_memory != NULL)
{
free_and_init (px_sort_param[i].internal_memory);
}
for (j = 0; j < SORT_MAX_TOT_FILES; j++)
{
if (px_sort_param[i].file_contents[j].num_pages != NULL)
{
free_and_init (px_sort_param[i].file_contents[j].num_pages);
}
}
}
}
return error;
}
/*
* sort_split_input_temp_file () - split input temp file
* return: NO_ERROR
* px_sort_param(in):
* sort_param(in):
* parallel_num(in):
*/
int
sort_split_input_temp_file (THREAD_ENTRY * thread_p, SORT_PARAM * px_sort_param, SORT_PARAM * sort_param,
int parallel_num)
{
int error = NO_ERROR;
int i = 0, j = 0, splitted_num_page;
bool is_first_vpid = false;
PAGE_PTR page_p;
VPID next_vpid, prev_vpid, first_vpid[SORT_MAX_PARALLEL], last_vpid[SORT_MAX_PARALLEL];
SORT_INFO *sort_info_p, *org_sort_info_p;
assert (parallel_num <= SORT_MAX_PARALLEL);
/* init vpid */
for (i = 0; i < parallel_num; i++)
{
first_vpid[i] = VPID_INITIALIZER;
last_vpid[i] = VPID_INITIALIZER;
}
/* page_cnt contains ovfl_page. If the length of tuple is longer than page, split by the number of tuples. */
sort_info_p = (SORT_INFO *) sort_param->get_arg;
splitted_num_page = sort_info_p->input_file->page_cnt / parallel_num;
splitted_num_page = MIN (splitted_num_page, sort_info_p->input_file->tuple_cnt / parallel_num);
i = 0;
is_first_vpid = false;
prev_vpid = sort_info_p->input_file->first_vpid;
/* TO_DO : dont fix all pages to split. Consider using a numerable temporary file */
while (true)
{
page_p = qmgr_get_old_page (thread_p, &prev_vpid, sort_info_p->input_file->tfile_vfid);
if (page_p == NULL)
{
return ER_FAILED;
}
if (is_first_vpid)
{
QFILE_PUT_PREV_VPID_NULL (page_p);
qmgr_set_dirty_page (thread_p, page_p, DONT_FREE, NULL, sort_info_p->input_file->tfile_vfid);
is_first_vpid = false;
}
QFILE_GET_NEXT_VPID (&next_vpid, page_p);
if (VPID_ISNULL (&next_vpid))
{
qmgr_free_old_page_and_init (thread_p, page_p, sort_info_p->input_file->tfile_vfid);
break;
}
else if (++j >= splitted_num_page)
{
QFILE_PUT_NEXT_VPID_NULL (page_p);
qmgr_set_dirty_page (thread_p, page_p, DONT_FREE, NULL, sort_info_p->input_file->tfile_vfid);
is_first_vpid = true;
first_vpid[i] = next_vpid;
last_vpid[i] = prev_vpid;
i++;
if (i < parallel_num - 1)
{
j = 0;
}
else
{
qmgr_free_old_page_and_init (thread_p, page_p, sort_info_p->input_file->tfile_vfid);
/* init prev page id */
page_p = qmgr_get_old_page (thread_p, &next_vpid, sort_info_p->input_file->tfile_vfid);
if (page_p == NULL)
{
return ER_FAILED;
}
QFILE_PUT_PREV_VPID_NULL (page_p);
qmgr_set_dirty_page (thread_p, page_p, DONT_FREE, NULL, sort_info_p->input_file->tfile_vfid);
qmgr_free_old_page_and_init (thread_p, page_p, sort_info_p->input_file->tfile_vfid);
break;
}
}
prev_vpid = next_vpid;
qmgr_free_old_page_and_init (thread_p, page_p, sort_info_p->input_file->tfile_vfid);
}
/* add splitted file info */
for (i = 0; i < parallel_num; i++)
{
sort_info_p = (SORT_INFO *) px_sort_param[i].get_arg;
org_sort_info_p = (SORT_INFO *) sort_param->get_arg;
sort_info_p->input_file = qfile_clone_list_id (org_sort_info_p->input_file, true, QFILE_SKIP_DEPENDENT);
if (sort_info_p->input_file == NULL)
{
return ER_FAILED;
}
/* tuple_cnt and page_cnt put approximately. */
/* It can be put precisely from the page header, but not have to be precise for later process. */
sort_info_p->input_file->tuple_cnt = org_sort_info_p->input_file->tuple_cnt / parallel_num;
sort_info_p->input_file->page_cnt = splitted_num_page;
sort_info_p->input_file->first_vpid = (i == 0) ? org_sort_info_p->input_file->first_vpid : first_vpid[i - 1];
sort_info_p->input_file->last_vpid =
(i == parallel_num - 1) ? org_sort_info_p->input_file->last_vpid : last_vpid[i];
}
return error;
}
/*
* sort_merge_run_for_parallel () - merge run for parallel
* return: NO_ERROR
* px_sort_param(in):
* sort_param(in):
* parallel_num(in):
*/
int
sort_merge_run_for_parallel (THREAD_ENTRY * thread_p, SORT_PARAM * px_sort_param, SORT_PARAM * sort_param,
int parallel_num)
{
int error = NO_ERROR;
int i = 0, idx = 0, file_pg_cnt_est;
int remaining_run, level, merge_num;
RESULT_RUN result_run[SORT_MAX_PARALLEL];
QFILE_LIST_ID *origin_list_id, *mergeable_list_id;
SORT_INFO *sort_info_p;
if (parallel_num > SORT_MAX_PARALLEL)
{
return ER_FAILED;
}
/* init result_run */
for (i = 0; i < parallel_num; i++)
{
result_run[i].temp_file = px_sort_param[i].temp[px_sort_param[i].px_result_file_idx];
result_run[i].num_pages = px_sort_param[i].file_contents[px_sort_param[i].px_result_file_idx].num_pages[0];
}
remaining_run = parallel_num;
level = 0;
/* merge result run */
while (remaining_run > 1)
{
merge_num = (remaining_run + (SORT_PX_MERGE_FILES - 1)) / SORT_PX_MERGE_FILES;
for (i = 0; i < merge_num; i++)
{
int first_idx = i * pow (SORT_PX_MERGE_FILES, level + 1);
/* init file info */
int half_files = MIN (remaining_run - (first_idx / pow (SORT_PX_MERGE_FILES, level)), SORT_PX_MERGE_FILES);
if (half_files == 1)
{
/* skip last one file */
continue;
}
px_sort_param[i].px_result_file_idx = 0;
px_sort_param[i].half_files = half_files;
px_sort_param[i].tot_tempfiles = half_files * 2;
px_sort_param[i].in_half = 0;
/* copy temp file and file contents */
for (int j = 0; j < px_sort_param[i].half_files; j++)
{
idx = (level == 0) ? (j + first_idx) : ((j * pow (SORT_PX_MERGE_FILES, level)) + first_idx);
px_sort_param[i].temp[j] = result_run[idx].temp_file;
/* copy the number of pages for one run */
px_sort_param[i].file_contents[j].num_pages[0] = result_run[idx].num_pages;
px_sort_param[i].file_contents[j].first_run = 0;
px_sort_param[i].file_contents[j].last_run = 0;
}
for (int j = px_sort_param[i].half_files; j < px_sort_param[i].tot_tempfiles; j++)
{
/* init temp file and contents */
px_sort_param[i].temp[j].volid = NULL_VOLID;
px_sort_param[i].file_contents[j].first_run = -1;
px_sort_param[i].file_contents[j].last_run = -1;
}
px_sort_param[i].px_result_run = &result_run[first_idx];
px_sort_param[i].px_status = PX_PROGRESS;
parallel_query::callable_task * task =
new parallel_query::callable_task (sort_param->px_worker_manager,
std::bind (sort_merge_nruns_parallel, std::placeholders::_1,
&px_sort_param[i]));
sort_param->px_worker_manager->push_task (task);
}
SORT_WAIT_PARALLEL (merge_num, sort_param, px_sort_param);
if (error != NO_ERROR)
{
goto cleanup;
}
remaining_run = merge_num;
level++;
}
/* split last run into px_sort_param */
sort_split_last_run (thread_p, px_sort_param, sort_param, parallel_num);
/* put result from sort location info */
SORT_EXECUTE_PARALLEL (parallel_num, px_sort_param, sort_put_result_for_parallel);
/* wait for threads */
SORT_WAIT_PARALLEL (parallel_num, sort_param, px_sort_param);
if (error != NO_ERROR)
{
goto cleanup;
}
/* merge last output file */
origin_list_id = ((SORT_INFO *) px_sort_param[0].put_arg)->output_file;
/* merge lists */
for (int i = 1; i < parallel_num; i++)
{
sort_info_p = (SORT_INFO *) px_sort_param[i].put_arg;
/* check NULL list id */
if (VPID_ISNULL (&sort_info_p->output_file->first_vpid))
{
/* skip the empty list */
continue;
}
/* If QUERY_CACHE(FILE_QUERY_AREA), Merged list not possible. */
if (origin_list_id->tfile_vfid->temp_file_type == FILE_QUERY_AREA)
{
error = qfile_append_list (thread_p, origin_list_id, sort_info_p->output_file);
if (error != NO_ERROR)
{
goto cleanup;
}
qfile_destroy_list (thread_p, sort_info_p->output_file);
QFILE_FREE_AND_INIT_LIST_ID (sort_info_p->output_file);
}
else
{
error = qfile_connect_list (thread_p, origin_list_id, sort_info_p->output_file);
if (error != NO_ERROR)
{
goto cleanup;
}
sort_info_p->output_file = NULL;
}
}
/* reopen final output file */
error = qfile_reopen_list_as_append_mode (thread_p, origin_list_id);
if (error != NO_ERROR)
{
goto cleanup;
}
cleanup:
/* clear input_file for px */
for (int i = 0; i < parallel_num; i++)
{
sort_info_p = (SORT_INFO *) px_sort_param[i].get_arg;
if (sort_info_p->input_file)
{
qfile_free_list_id (sort_info_p->input_file);
sort_info_p->input_file = NULL;
}
}
/* clear output_file for px */
for (int i = 1; i < parallel_num; i++)
{
/* index 0 is origin output file. it'll be freed in qfile_sort_list_with_func() */
sort_info_p = (SORT_INFO *) px_sort_param[i].put_arg;
if (sort_info_p->output_file)
{
qfile_free_list_id (sort_info_p->output_file);
sort_info_p->output_file = NULL;
}
}
return error;
}
int
sort_merge_run_for_parallel_index_leaf_build (THREAD_ENTRY * thread_p, SORT_PARAM * px_sort_param,
SORT_PARAM * sort_param, int parallel_num)
{
int error = NO_ERROR;
int i = 0, idx = 0;
int remaining_run, level, merge_num;
RESULT_RUN result_run[SORT_MAX_PARALLEL];
SORT_ARGS *sort_args_p;
if (parallel_num > SORT_MAX_PARALLEL)
{
return ER_FAILED;
}
/* init result_run */
for (i = 0; i < parallel_num; i++)
{
result_run[i].temp_file = px_sort_param[i].temp[px_sort_param[i].px_result_file_idx];
result_run[i].num_pages = px_sort_param[i].file_contents[px_sort_param[i].px_result_file_idx].num_pages[0];
}
remaining_run = parallel_num;
level = 0;
/* merge result run */
while (remaining_run > 1)
{
merge_num = (remaining_run + (SORT_PX_MERGE_FILES - 1)) / SORT_PX_MERGE_FILES;
for (i = 0; i < merge_num; i++)
{
int first_idx = i * pow (SORT_PX_MERGE_FILES, level + 1);
/* init file info */
int half_files = MIN (remaining_run - (first_idx / pow (SORT_PX_MERGE_FILES, level)), SORT_PX_MERGE_FILES);
if (half_files == 1)
{
/* skip last one file */
continue;
}
px_sort_param[i].px_result_file_idx = 0;
px_sort_param[i].in_half = 0;
/* copy temp file and file contents, skip empty (0-page) workers */
int valid_j = 0;
for (int j = 0; j < half_files; j++)
{
idx = (level == 0) ? (j + first_idx) : ((j * pow (SORT_PX_MERGE_FILES, level)) + first_idx);
if (result_run[idx].num_pages == 0)
{
continue;
}
px_sort_param[i].temp[valid_j] = result_run[idx].temp_file;
px_sort_param[i].file_contents[valid_j].num_pages[0] = result_run[idx].num_pages;
px_sort_param[i].file_contents[valid_j].first_run = 0;
px_sort_param[i].file_contents[valid_j].last_run = 0;
valid_j++;
}
if (valid_j <= 1)
{
/* 0 or 1 non-empty runs: no merge needed, propagate result directly */
if (valid_j == 1)
{
result_run[first_idx].temp_file = px_sort_param[i].temp[0];
result_run[first_idx].num_pages = px_sort_param[i].file_contents[0].num_pages[0];
}
else
{
result_run[first_idx].num_pages = 0;
}
px_sort_param[i].px_status = PX_DONE;
continue;
}
px_sort_param[i].half_files = valid_j;
px_sort_param[i].tot_tempfiles = valid_j * 2;
for (int j = valid_j; j < px_sort_param[i].tot_tempfiles; j++)
{
/* init temp file and contents */
px_sort_param[i].temp[j].volid = NULL_VOLID;
px_sort_param[i].file_contents[j].first_run = -1;
px_sort_param[i].file_contents[j].last_run = -1;
}
px_sort_param[i].px_result_run = &result_run[first_idx];
px_sort_param[i].px_status = PX_PROGRESS;
parallel_query::callable_task * task =
new parallel_query::callable_task (sort_param->px_worker_manager,
std::bind (sort_merge_nruns_parallel, std::placeholders::_1,
&px_sort_param[i]));
sort_param->px_worker_manager->push_task (task);
}
SORT_WAIT_PARALLEL (merge_num, sort_param, px_sort_param);
if (error != NO_ERROR)
{
return error;
}
remaining_run = merge_num;
level++;
}
sort_args_p = (SORT_ARGS *) sort_param->get_arg;
for (i = 0; i < parallel_num; i++)
{
SORT_ARGS *px_sort_args_p = (SORT_ARGS *) px_sort_param[i].get_arg;
sort_args_p->n_oids += px_sort_args_p->n_oids;
sort_args_p->n_nulls += px_sort_args_p->n_nulls;
}
log_sysop_start (thread_p);
if (btree_create_file
(thread_p, &sort_args_p->class_ids[0], sort_args_p->attr_ids[0], sort_args_p->btid->sys_btid) != NO_ERROR)
{
ASSERT_ERROR ();
log_sysop_abort (thread_p);
return ER_FAILED;
}
/* if loading is aborted or if transaction is aborted, vacuum must be notified before file is destoyed. */
vacuum_log_add_dropped_file (thread_p, &sort_args_p->btid->sys_btid->vfid, NULL, VACUUM_LOG_ADD_DROPPED_FILE_UNDO);
int result_file_idx = px_sort_param[0].px_result_file_idx;
sort_param->px_result_file_idx = result_file_idx;
sort_param->file_contents[result_file_idx].num_pages[0] =
px_sort_param[0].file_contents[result_file_idx].num_pages[0];
sort_param->temp[result_file_idx] = px_sort_param[0].temp[result_file_idx];
if (sort_put_result_from_tmpfile (thread_p, sort_param, 0) != NO_ERROR)
{
log_sysop_abort (thread_p);
error = ER_FAILED;
}
return error;
}
/*
* sort_merge_nruns () - merge n run
* return: NO_ERROR
* px_sort_param(in):
* sort_param(in):
* parallel_num(in):
*/
int
sort_merge_nruns (THREAD_ENTRY * thread_p, SORT_PARAM * sort_param)
{
int error = NO_ERROR;
int i = 0, idx = 0, file_pg_cnt_est;
/* Create output temporary files make file and temporary volume page count estimates */
file_pg_cnt_est = sort_get_avg_numpages_of_nonempty_tmpfile (sort_param);
file_pg_cnt_est = MAX (1, file_pg_cnt_est);
for (i = sort_param->half_files; i < sort_param->tot_tempfiles; i++)
{
error = sort_add_new_file (thread_p, &(sort_param->temp[i]), file_pg_cnt_est, true, sort_param->tde_encrypted);
if (error != NO_ERROR)
{
return error;
}
}
/* Merge the parallel processed results. */
if (sort_param->option == SORT_ELIM_DUP)
{
error = sort_exphase_merge_elim_dup (thread_p, sort_param);
}
else
{
/* SORT_DUP */
error = sort_exphase_merge (thread_p, sort_param);
}
/* save result run */
sort_param->px_result_run->temp_file = sort_param->temp[sort_param->px_result_file_idx];
sort_param->px_result_run->num_pages = sort_param->file_contents[sort_param->px_result_file_idx].num_pages[0];
/* retire temp file */
for (i = 0; i < sort_param->tot_tempfiles; i++)
{
if (sort_param->temp[i].volid != NULL_VOLID && i != sort_param->px_result_file_idx)
{
(void) file_temp_retire (thread_p, &sort_param->temp[i]);
VFID_SET_NULL (&sort_param->temp[i]);
}
}
return error;
}
/*
* sort_check_parallelism () - check the number of parallel processes
* return: parallel_num
* sort_parallel_type(in):
* sort_param(in):
*/
int
sort_check_parallelism (THREAD_ENTRY * thread_p, SORT_PARAM * sort_param)
{
int parallel_num = 1;
if (sort_param->px_type == SORT_ORDER_BY)
{
SORT_INFO *sort_info_p;
/* get scan id of input file */
sort_info_p = (SORT_INFO *) sort_param->get_arg;
parallel_num =
parallel_query::compute_parallel_degree (parallel_query::parallel_type::SORT, sort_info_p->input_file->page_cnt,
sort_info_p->parallelism /* hint */ );
if (parallel_num < 2)
{
/* single process */
return 1;
}
/* check tuple_cnt */
if (sort_info_p->input_file->tuple_cnt <= parallel_num)
{
return 1;
}
/* check worker */
sort_param->px_worker_manager = parallel_query::worker_manager::try_reserve_workers (parallel_num);
if (sort_param->px_worker_manager == NULL)
{
return 1;
}
else
{
return parallel_num;
}
}
else if (sort_param->px_type == SORT_INDEX_LEAF)
{
SORT_ARGS *sort_args_p = (SORT_ARGS *) sort_param->get_arg;
int n_user_pages = 0, n_sects = 0, tmp_pg = 0, tmp_sects = 0, error_code = NO_ERROR;
if (sort_args_p->n_classes > 1)
{
/* not partition, partition has own indexes, this means like this :
* create t1; create t2 under t1; */
return 1;
}
/* get number of pages to sort */
for (int i = 0; i < sort_args_p->n_classes; i++)
{
error_code = file_get_num_user_pages (thread_p, &sort_args_p->hfids[i].vfid, &tmp_pg);
if (error_code != NO_ERROR)
{
return 1;
}
error_code = file_get_num_data_sectors (thread_p, &sort_args_p->hfids[i].vfid, &tmp_sects);
if (error_code != NO_ERROR)
{
return 1;
}
n_user_pages += tmp_pg;
n_sects += tmp_sects;
}
parallel_num = parallel_query::compute_parallel_degree (parallel_query::parallel_type::SORT, n_user_pages,
-1 /* no hint at parallel index build */ );
if (parallel_num < 2)
{
/* single process */
return 1;
}
if (n_sects < parallel_num)
{
/* no sector in some threads */
return 1;
}
/* check worker */
sort_param->px_worker_manager = parallel_query::worker_manager::try_reserve_workers (parallel_num);
if (sort_param->px_worker_manager == NULL)
{
return 1;
}
else
{
return parallel_num;
}
}
else
{
/* Not implemented yet */
return 1;
}
/* single process */
return 1;
}
/*
* sort_put_result_for_parallel () - put result file for parallel
* return:
* thread_p(in):
* sort_param(in):
*/
void
sort_put_result_for_parallel (cubthread::entry & thread_ref, SORT_PARAM * sort_param)
{
THREAD_ENTRY *thread_p = &thread_ref;
SORT_INFO *sort_info_p = NULL, *ori_sort_info_p;
TSC_TICKS start_tick, end_tick;
TSCTIMEVAL tv_diff;
PX_STATUS px_status;
thread_ref.tran_index = sort_param->px_orig_thread_p->tran_index;
thread_ref.m_px_orig_thread_entry = sort_param->px_orig_thread_p;
thread_ref.conn_entry = sort_param->px_orig_thread_p->conn_entry;
thread_p->push_resource_tracks ();
if (thread_is_on_trace (sort_param->px_orig_thread_p))
{
thread_set_sort_stats_active (thread_p, true);
thread_ref.on_trace = true;
perfmon_initialize_parallel_stats (&thread_ref);
tsc_getticks (&start_tick);
}
if (sort_param->px_type == SORT_ORDER_BY)
{
sort_info_p = (SORT_INFO *) sort_param->put_arg;
ori_sort_info_p = (SORT_INFO *) sort_param->ori_sort_param->put_arg;
if (sort_param->file_contents[0].start_index == 0)
{
/* first file : use origin output temp file */
sort_info_p->output_file = ori_sort_info_p->output_file;
}
else
{
sort_info_p->output_file =
qfile_open_list (thread_p, &ori_sort_info_p->input_file->type_list, ori_sort_info_p->sort_list_p,
ori_sort_info_p->input_file->query_id, ori_sort_info_p->flag | QFILE_NOT_USE_MEMBUF, NULL);
}
}
/* write last temp file */
if (sort_put_result_from_tmpfile (thread_p, sort_param, sort_param->file_contents[0].start_index) != NO_ERROR)
{
px_status = PX_ERR_FAILED;
}
else
{
px_status = PX_DONE;
}
if (sort_param->px_type == SORT_ORDER_BY)
{
qfile_close_list (thread_p, sort_info_p->output_file);
if (sort_info_p->output_recdes.data)
{
db_private_free_and_init (NULL, sort_info_p->output_recdes.data);
sort_info_p->output_recdes.data = NULL;
}
}
if (thread_is_on_trace (sort_param->px_orig_thread_p))
{
tsc_getticks (&end_tick);
tsc_elapsed_time_usec (&tv_diff, end_tick, start_tick);
TSC_ADD_TIMEVAL (sort_param->orderby_stats.orderby_time, tv_diff);
sort_param->orderby_stats.orderby_pages += (perfmon_get_from_statistic (thread_p, PSTAT_SORT_NUM_DATA_PAGES));
sort_param->orderby_stats.orderby_ioreads += (perfmon_get_from_statistic (thread_p, PSTAT_SORT_NUM_IO_PAGES));
thread_set_sort_stats_active (thread_p, false);
perfmon_destroy_parallel_stats (&thread_ref);
}
thread_p->pop_resource_tracks ();
/* done */
pthread_mutex_lock (sort_param->px_mtx);
sort_param->px_status = px_status;
if (px_status == PX_ERR_FAILED)
{
sort_param->main_error_context->get_current_error_level ().swap (cuberr::context::get_thread_local_error ());
}
pthread_cond_signal (sort_param->complete_cond);
pthread_mutex_unlock (sort_param->px_mtx);
}
/*
* sort_merge_nruns_parallel () - merge nruns for parallel
* return:
* thread_p(in):
* sort_param(in):
*/
void
sort_merge_nruns_parallel (cubthread::entry & thread_ref, SORT_PARAM * sort_param)
{
THREAD_ENTRY *thread_p = &thread_ref;
TSC_TICKS start_tick, end_tick;
TSCTIMEVAL tv_diff;
PX_STATUS px_status;
thread_ref.tran_index = sort_param->px_orig_thread_p->tran_index;
thread_ref.m_px_orig_thread_entry = sort_param->px_orig_thread_p;
thread_ref.conn_entry = sort_param->px_orig_thread_p->conn_entry;
thread_p->push_resource_tracks ();
if (thread_is_on_trace (sort_param->px_orig_thread_p))
{
thread_set_sort_stats_active (thread_p, true);
thread_ref.on_trace = true;
perfmon_initialize_parallel_stats (&thread_ref);
tsc_getticks (&start_tick);
}
if (sort_merge_nruns (thread_p, sort_param) != NO_ERROR)
{
px_status = PX_ERR_FAILED;
}
else
{
px_status = PX_DONE;
}
if (thread_is_on_trace (sort_param->px_orig_thread_p))
{
tsc_getticks (&end_tick);
tsc_elapsed_time_usec (&tv_diff, end_tick, start_tick);
TSC_ADD_TIMEVAL (sort_param->orderby_stats.orderby_time, tv_diff);
sort_param->orderby_stats.orderby_pages += (perfmon_get_from_statistic (thread_p, PSTAT_SORT_NUM_DATA_PAGES));
sort_param->orderby_stats.orderby_ioreads += (perfmon_get_from_statistic (thread_p, PSTAT_SORT_NUM_IO_PAGES));
thread_set_sort_stats_active (thread_p, false);
perfmon_destroy_parallel_stats (&thread_ref);
}
thread_p->pop_resource_tracks ();
/* done */
pthread_mutex_lock (sort_param->px_mtx);
sort_param->px_status = px_status;
if (sort_param->px_status == PX_ERR_FAILED)
{
sort_param->main_error_context->get_current_error_level ().swap (cuberr::context::get_thread_local_error ());
}
pthread_cond_signal (sort_param->complete_cond);
pthread_mutex_unlock (sort_param->px_mtx);
}
/*
* sort_start_parallelism () - start parallelism
* return: NO_ERROR
* px_sort_param(in):
* sort_param(in):
* parallel_num(in):
* sort_parallel_type(in):
*/
int
sort_start_parallelism (THREAD_ENTRY * thread_p, SORT_PARAM * px_sort_param, SORT_PARAM * sort_param)
{
int error = NO_ERROR;
int parallel_num = sort_param->px_parallel_num;
/* copy sort_param for parallel sort */
error = sort_copy_sort_param (thread_p, px_sort_param, sort_param, parallel_num);
if (error != NO_ERROR)
{
return ER_FAILED;
}
/* case of ORDER BY */
if (sort_param->px_type == SORT_ORDER_BY)
{
QFILE_LIST_SCAN_ID *scan_id_p;
SORT_INFO *sort_info_p;
for (int i = 0; i < parallel_num; i++)
{
/* deep copy for get_arg, put_arg */
error =
sort_copy_sort_info (thread_p, (SORT_INFO **) & px_sort_param[i].get_arg,
(SORT_INFO *) sort_param->get_arg);
if (error != NO_ERROR)
{
return ER_FAILED;
}
error =
sort_copy_sort_info (thread_p, (SORT_INFO **) & px_sort_param[i].put_arg,
(SORT_INFO *) sort_param->put_arg);
if (error != NO_ERROR)
{
return ER_FAILED;
}
}
/* get scan id of input file */
sort_info_p = (SORT_INFO *) sort_param->get_arg;
scan_id_p = sort_info_p->s_id->s_id;
/* close input file for read. it'll be opened in parallel */
qfile_close_scan (thread_p, scan_id_p);
/* split input temp file. */
error = sort_split_input_temp_file (thread_p, px_sort_param, sort_param, parallel_num);
if (error != NO_ERROR)
{
return ER_FAILED;
}
}
else if (sort_param->px_type == SORT_INDEX_LEAF)
{
using ftab_set = parallel_query::ftab_set;
SORT_ARGS *sort_args_p = (SORT_ARGS *) sort_param->get_arg, *px_sort_args_p;
std::vector < ftab_set > ftab_sets (parallel_num);
ftab_set temp;
FILE_FTAB_COLLECTOR collector;
for (int i = 0; i < parallel_num; i++)
{
px_sort_param[i].get_arg = NULL;
}
for (int i = 0; i < parallel_num; i++)
{
px_sort_args_p = (SORT_ARGS *) malloc (sizeof (SORT_ARGS));
if (px_sort_args_p == NULL)
{
er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_OUT_OF_VIRTUAL_MEMORY, 1, sizeof (SORT_ARGS));
return ER_OUT_OF_VIRTUAL_MEMORY;
}
memcpy (px_sort_args_p, sort_param->get_arg, sizeof (SORT_ARGS));
px_sort_param[i].get_arg = px_sort_args_p;
px_sort_param[i].get_fn = &btree_sort_get_next_parallel;
px_sort_args_p->ftab_sets = NULL;
px_sort_args_p->curr_sec = FILE_PARTIAL_SECTOR_INITIALIZER;
px_sort_args_p->curr_pgoffset = 0;
px_sort_args_p->in_recdes =
{
0, 0, 0, NULL};
}
/* split ftab into each parallel sort param */
for (int i = 0; i < sort_args_p->n_classes; i++)
{
error = file_get_all_data_sectors (thread_p, &sort_args_p->hfids[i].vfid, &collector);
if (error != NO_ERROR)
{
return error;
}
temp.convert (&collector);
db_private_free_and_init (thread_p, collector.partsect_ftab);
ftab_sets = temp.split (parallel_num);
for (int j = 0; j < parallel_num; j++)
{
px_sort_args_p = (SORT_ARGS *) px_sort_param[j].get_arg;
if (px_sort_args_p->ftab_sets == NULL)
{
px_sort_args_p->ftab_sets = (std::vector < ftab_set > *)malloc (sizeof (std::vector < ftab_set >));
if (px_sort_args_p->ftab_sets == NULL)
{
er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_OUT_OF_VIRTUAL_MEMORY, 1,
sizeof (std::vector < ftab_set >));
return ER_OUT_OF_VIRTUAL_MEMORY;
}
placement_new (px_sort_args_p->ftab_sets);
}
/* Always push (even if empty) to keep ftab_sets index aligned with cur_class.
* An empty ftab_set causes get_next_vpid() to return S_END immediately. */
px_sort_args_p->ftab_sets->push_back (ftab_sets[j]);
}
}
}
else
{
/* not implemented yet (group by, analytic fuction) */
return ER_FAILED;
}
return error;
}
/*
* sort_end_parallelism () - end parallelism
* return: NO_ERROR
* px_sort_param(in):
* sort_param(in):
* parallel_num(in):
* sort_parallel_type(in):
*/
int
sort_end_parallelism (THREAD_ENTRY * thread_p, SORT_PARAM * px_sort_param, SORT_PARAM * sort_param)
{
int error = NO_ERROR;
SORT_INFO *sort_info_p;
QFILE_LIST_SCAN_ID *scan_id_p;
QFILE_LIST_ID *origin_list_id, *mergeable_list_id;
int parallel_num = sort_param->px_parallel_num;
if (sort_param->px_type == SORT_ORDER_BY)
{
sort_info_p = (SORT_INFO *) sort_param->get_arg;
scan_id_p = sort_info_p->s_id->s_id;
/* open origin temp file for read */
if (qfile_open_list_scan (sort_info_p->input_file, scan_id_p) != NO_ERROR)
{
return ER_FAILED;
}
/* merge parallel result into sort location temp file */
error = sort_merge_run_for_parallel (thread_p, px_sort_param, sort_param, parallel_num);
if (error != NO_ERROR)
{
return ER_FAILED;
}
/* Do not output parallel sort trace info from qfile_sort_list(). */
ORDERBY_STATS *orderby_stats = (ORDERBY_STATS *) ((SORT_INFO *) sort_param->get_arg)->orderby_stats;
if (orderby_stats && thread_is_on_trace (thread_p))
{
orderby_stats->px_min_orderby_time = std::numeric_limits < UINT64 >::max ();
orderby_stats->px_min_orderby_pages = std::numeric_limits < UINT64 >::max ();
orderby_stats->px_min_orderby_ioreads = std::numeric_limits < UINT64 >::max ();
for (int i = 0; i < parallel_num; i++)
{
orderby_stats->px_min_orderby_time =
std::min (orderby_stats->px_min_orderby_time,
(UINT64) (TO_MSEC (px_sort_param[i].orderby_stats.orderby_time)));
orderby_stats->px_max_orderby_time =
std::max (orderby_stats->px_max_orderby_time,
(UINT64) (TO_MSEC (px_sort_param[i].orderby_stats.orderby_time)));
orderby_stats->px_min_orderby_pages =
std::min (orderby_stats->px_min_orderby_pages, px_sort_param[i].orderby_stats.orderby_pages);
orderby_stats->px_max_orderby_pages =
std::max (orderby_stats->px_max_orderby_pages, px_sort_param[i].orderby_stats.orderby_pages);
orderby_stats->px_min_orderby_ioreads =
std::min (orderby_stats->px_min_orderby_ioreads, px_sort_param[i].orderby_stats.orderby_ioreads);
orderby_stats->px_max_orderby_ioreads =
std::max (orderby_stats->px_max_orderby_ioreads, px_sort_param[i].orderby_stats.orderby_ioreads);
}
orderby_stats->parallel_num = parallel_num;
}
}
else if (sort_param->px_type == SORT_INDEX_LEAF)
{
error = sort_merge_run_for_parallel_index_leaf_build (thread_p, px_sort_param, sort_param, parallel_num);
if (error != NO_ERROR)
{
return ER_FAILED;
}
}
else
{
/* not implemented yet (group by, analytic function, create index) */
return ER_FAILED;
}
return error;
}
#endif
/*
* sort_write_area () - Write memory area to disk
* return:
* vfid(in): file identifier to write the pages contained in the area
* first_page(in): first page to be written on the file
* num_pages(in): size of the memory area in terms of number of pages it
* accommodates
* area_start(in): beginning address of the area
*
* Note: This function writes the contents of the given memory area to the
* specified file starting from the given page. Before doing so, however,
* it checks the size of the file and, if necessary, allocates new pages.
* If new pages are needed but the disk is full, an error code is
* returned.
*/
static int
sort_write_area (THREAD_ENTRY * thread_p, VFID * vfid, int first_page, INT32 num_pages, char *area_start,
bool tde_encrypted)
{
PAGE_PTR page_ptr = NULL;
VPID vpid;
INT32 page_no;
int i;
int ret = NO_ERROR;
TDE_ALGORITHM tde_algo = TDE_ALGORITHM_NONE;
if (tde_encrypted)
{
ret = file_get_tde_algorithm (thread_p, vfid, PGBUF_UNCONDITIONAL_LATCH, &tde_algo);
if (ret != NO_ERROR)
{
return ret;
}
}
/* initializations */
page_no = first_page;
/* Flush pages buffered in the given area to the specified file */
page_ptr = (PAGE_PTR) area_start;
for (i = 0; i < num_pages; i++)
{
/* file is automatically expanded if page is not allocated (as long as it is missing only one page) */
ret = file_numerable_find_nth (thread_p, vfid, page_no++, true, NULL, NULL, &vpid);
if (ret != NO_ERROR)
{
ASSERT_ERROR ();
return ret;
}
if (pgbuf_copy_from_area (thread_p, &vpid, 0, DB_PAGESIZE, page_ptr, true, tde_algo) == NULL)
{
ASSERT_ERROR_AND_SET (ret);
return ret;
}
page_ptr += DB_PAGESIZE;
}
return NO_ERROR;
}
/*
* sort_read_area () - Read memory area from disk
* return:
* vfid(in): file identifier to read the pages from
* first_page(in): first page to be read from the file
* num_pages(in): size of the memory area in terms of number of pages it
* accommodates
* area_start(in): beginning address of the area
*
* Note: This function reads in successive pages of the specified file into
* the given memory area until this area becomes full.
*/
static int
sort_read_area (THREAD_ENTRY * thread_p, VFID * vfid, int first_page, INT32 num_pages, char *area_start)
{
PAGE_PTR page_ptr = NULL;
VPID vpid;
INT32 page_no;
int i;
int ret = NO_ERROR;
vpid.volid = vfid->volid;
page_no = first_page;
/* Flush pages buffered in the given area to the specified file */
page_ptr = (PAGE_PTR) area_start;
for (i = 0; i < num_pages; i++)
{
ret = file_numerable_find_nth (thread_p, vfid, page_no++, false, NULL, NULL, &vpid);
if (ret != NO_ERROR)
{
ASSERT_ERROR ();
return ret;
}
if (pgbuf_copy_to_area (thread_p, &vpid, 0, DB_PAGESIZE, page_ptr, true) == NULL)
{
ASSERT_ERROR_AND_SET (ret);
return ret;
}
page_ptr += DB_PAGESIZE;
}
return NO_ERROR;
}
/*
* sort_get_num_half_tmpfiles () - Determines the number of temporary files to be used
* during the sorting process
* return:
* tot_buffers(in): total number of buffers in the buffer pool area
* input_pages(in): size of the input file in terms of number of pages it
* occupies
*
*/
static int
sort_get_num_half_tmpfiles (int tot_buffers, int input_pages)
{
int half_files = tot_buffers - 1;
int exp_num_runs;
/* If there is an estimate on number of input pages */
if (input_pages > 0)
{
/* Conservatively estimate number of runs that will be produced */
exp_num_runs = CEIL_PTVDIV (input_pages, tot_buffers) + 1;
if (exp_num_runs < half_files)
{
if ((exp_num_runs > (tot_buffers / 2)))
{
half_files = exp_num_runs;
}
else
{
half_files = (tot_buffers / 2);
}
}
}
/* Precaution against underestimation on exp_num_runs and having too few files to merge them */
if (half_files < SORT_MIN_HALF_FILES)
{
return SORT_MIN_HALF_FILES;
}
if (half_files < SORT_MAX_HALF_FILES)
{
return half_files;
}
else
{
/* Precaution against saturation (i.e. having too many files) */
return SORT_MAX_HALF_FILES;
}
}
/*
* sort_checkalloc_numpages_of_outfiles () - Check sizes of output files
* return: error code
* sort_param(in): sort parameters
*
* Note: This function determines how many pages will be needed by each output
* file of the current stage of the merging phase. This is done by going
* over the file_contents lists of the input files and determining how
* many pages they will eventually contribute to each output file.
* (Again, this is an estimate, not an exact number on the size of output
* files.) It then checks whether these output files have that many pages
* already. If some of them need more pages, it allocates new pages.
*/
static int
sort_checkalloc_numpages_of_outfiles (THREAD_ENTRY * thread_p, SORT_PARAM * sort_param)
{
int out_file;
int out_half;
int needed_pages[2 * SORT_MAX_HALF_FILES];
int contains;
int alloc_pages;
int i, j;
int error_code = NO_ERROR;
for (i = 0; i < (int) DIM (needed_pages); i++)
{
needed_pages[i] = 0;
}
if (sort_param->in_half == 0)
{
out_half = sort_param->half_files;
}
else
{
out_half = 0;
}
/* Estimate the sizes of all new runs to be flushed on output files */
for (i = sort_param->in_half; i < sort_param->in_half + sort_param->half_files; i++)
{
out_file = out_half;
/* If the list is not empty */
j = sort_param->file_contents[i].first_run;
if (j > -1)
{
for (; j <= sort_param->file_contents[i].last_run; j++)
{
needed_pages[out_file] += sort_param->file_contents[i].num_pages[j];
if (++out_file >= out_half + sort_param->half_files)
{
out_file = out_half;
}
}
}
}
/* Allocate enough pages to each output file We don't initialize pages during allocation since we do not care the
* state of the pages after a rollback or system crashes. Nothing need to be log on the page. The pages are
* initialized at a later time. */
/* Files are traversed in reverse order, in order to destroy unnecessary files first. It is expected that returned
* pages will be reused by the next allocation. */
for (i = out_half + sort_param->half_files - 1; i >= out_half; i--)
{
if (needed_pages[i] > 0)
{
assert (!VFID_ISNULL (&sort_param->temp[i]));
error_code = file_get_num_user_pages (thread_p, &sort_param->temp[i], &contains);
if (error_code != NO_ERROR)
{
ASSERT_ERROR ();
return error_code;
}
alloc_pages = (needed_pages[i] - contains);
if (alloc_pages > 0)
{
error_code = file_alloc_multiple (thread_p, &sort_param->temp[i], NULL, NULL, alloc_pages, NULL);
if (error_code != NO_ERROR)
{
ASSERT_ERROR ();
return error_code;
}
}
}
else
{
/* If there is a file not to be used anymore, destroy it in order to reuse spaces. */
if (!VFID_ISNULL (&sort_param->temp[i]))
{
error_code = file_temp_retire (thread_p, &sort_param->temp[i]);
if (error_code != NO_ERROR)
{
ASSERT_ERROR ();
return error_code;
}
VFID_SET_NULL (&sort_param->temp[i]);
}
}
}
return NO_ERROR;
}
/*
* sort_get_numpages_of_active_infiles () - Find number of active input files
* return:
* sort_param(in): sort parameters
*
* Note: This function determines how many of the input files still
* have input runs (active) to participate in while the merging
* process which produces larger size runs. For this purpose,
* it checks the file_contents list of each input file. Once the
* first file with no remaining input runs (unactive) is found,
* it is concluded that all the remaining input temporary files
* are also inactive (because of balanced distribution of runs to
* the files).
*/
static int
sort_get_numpages_of_active_infiles (const SORT_PARAM * sort_param)
{
int i;
for (i = sort_param->in_half; i < sort_param->in_half + sort_param->half_files; i++)
{
if (sort_param->file_contents[i].first_run == -1)
{
break;
}
}
return (i - sort_param->in_half);
}
/*
* sort_find_inbuf_size () - Distribute buffers
* return:
* tot_buffers(in): number of total buffers in the buffer pool area
* in_sections(in): number of input sections into which this buffer pool area
* should be divided into (in other words, the number of
* active input files)
*
* Note: This function distributes the buffers of the buffer pool area
* (i.e., the internal memory) among the active input files and
* the output file. Recall that each active input file and the
* output file will have a section in the buffer pool area.
* This function returns the size of each input section in terms
* of number of buffers it occupies. Naturally, the output
* section will have the remaining buffers.
*
* Note that when the input runs are merged together the
* number of read operations is (roughly) equal to the
* number of write operations. For that reason this function
* reserves roughly half of the buffers for the output section
* and distributes the remaining ones evenly among the input
* sections, as each input run is approximately the same size.
*/
static int
sort_find_inbuf_size (int tot_buffers, int in_sections)
{
int in_sectsize;
/* Allocate half of the total buffers to output buffer area */
in_sectsize = (tot_buffers / (in_sections << 1));
if (in_sectsize != 0)
{
return in_sectsize;
}
else
{
return 1;
}
}
/*
* sort_run_add_new () - Adds a new node to the end of the given list
* return: NO_ERROR
* file_contents(in): which list to add
* num_pages(in): what value to put for the new run
*/
static int
sort_run_add_new (FILE_CONTENTS * file_contents, int num_pages)
{
int new_total_elements;
int ret = NO_ERROR;
if (file_contents->first_run == -1)
{
/* This is an empty list */
file_contents->first_run = 0;
file_contents->last_run = 0;
}
else
{
file_contents->last_run++;
}
/* If there is no room in the dynamic array to keep the next element of the list; expand the dynamic array. */
if (file_contents->last_run >= file_contents->num_slots)
{
new_total_elements = ((int) (((float) file_contents->num_slots * SORT_EXPAND_DYN_ARRAY_RATIO) + 0.5));
file_contents->num_pages = (int *) realloc (file_contents->num_pages, new_total_elements * sizeof (int));
if (file_contents->num_pages == NULL)
{
return ER_FAILED;
}
file_contents->num_slots = new_total_elements;
}
/* Put the "num_pages" info to the "last_run" slot of the array */
file_contents->num_pages[file_contents->last_run] = num_pages;
return ret;
}
/*
* sort_run_remove_first () - Removes the first run of the given file contents list
* return: void
* file_contents(in): which list to remove from
*/
static void
sort_run_remove_first (FILE_CONTENTS * file_contents)
{
/* If the list is not empty */
if (file_contents->first_run != -1)
{
/* remove the first element of the list */
if (++file_contents->first_run > file_contents->last_run)
{
/* the list is empty now; indicate so */
file_contents->first_run = -1;
}
}
}
/*
* sort_get_num_file_contents () - Returns the number of elements kept in the
* given linked list
* return:
* file_contents(in): which list
*/
static int
sort_get_num_file_contents (FILE_CONTENTS * file_contents)
{
/* If the list is not empty */
if (file_contents->first_run != -1)
{
return (file_contents->last_run - file_contents->first_run + 1);
}
else
{
/* empty list */
return (0);
}
}
#if defined(CUBRID_DEBUG)
/*
* sort_print_file_contents () - Prints the elements of the given file contents list
* return: void
* file_contents(in): which list to print
*
* Note: It is used for debugging purposes.
*/
static void
sort_print_file_contents (const FILE_CONTENTS * file_contents)
{
int j;
/* If the list is not empty */
j = file_contents->first_run;
if (j > -1)
{
fprintf (stdout, "File contents:\n");
for (; j <= file_contents->last_run; j++)
{
fprintf (stdout, " Run with %3d pages\n", file_contents->num_pages[j]);
}
}
else
{
fprintf (stdout, "Empty file:\n");
}
}
#endif /* CUBRID_DEBUG */