
/*
 * Copyright 2008 Search Solution Corporation
 * Copyright 2016 CUBRID Corporation
 *
 *  Licensed under the Apache License, Version 2.0 (the "License");
 *  you may not use this file except in compliance with the License.
 *  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 *  Unless required by applicable law or agreed to in writing, software
 *  distributed under the License is distributed on an "AS IS" BASIS,
 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *  See the License for the specific language governing permissions and
 *  limitations under the License.
 *
 */

/*
 * extendible_hash.c - Extendible hash manager
 */

#ident "$Id$"

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <stddef.h>
#include <math.h>
#if defined(sun) || defined(HPUX)
#include <sys/types.h>
#include <netinet/in.h>
#endif
#if defined(_AIX)
#include <net/nh.h>
#endif

#include "chartype.h"
#include "storage_common.h"
#include "memory_alloc.h"
#include "object_representation.h"
#include "error_manager.h"
#include "xserver_interface.h"
#include "log_manager.h"
#include "extendible_hash.h"
#include "page_buffer.h"
#include "lock_manager.h"
#include "slotted_page.h"
#include "file_manager.h"
#include "overflow_file.h"
#include "memory_hash.h"    /* For hash functions */
#include "tz_support.h"
#include "db_date.h"
#include "thread_compat.hpp"
// XXX: SHOULD BE THE LAST INCLUDE HEADER
#include "memory_wrapper.hpp"

#ifdef EHASH_DEBUG
#define EHASH_BALANCE_FACTOR     4  /* Threshold rate of no. of directory pointers over no. of bucket pages. If
                     * this threshold is exceeded, a warning is issued in the debugging mode */
#endif /* EHASH_DEBUG */

  /*
   * NOTICE: The constants EHASH_OVERFLOW_RATE and EHASH_UNDERFLOW_RATE must
   * be less than 1. The following values are very appropriate. Avoid changing
   * them unless absolutely necessary.
   */

#define EHASH_OVERFLOW_RATE     0.9 /* UPPER THRESHOLD for a merge operation. */

  /*
   * After a bucket merge operation, up to what percent of the sibling bucket
   * space (i.e. DB_PAGESIZE) can be full. If it is found that during a merge
   * the sibling bucket will become too full the merge is delayed to avoid an
   * immediate split on the sibling bucket. (Exception: if the bucket becomes
   * completely empty, the merge operation is performed no matter how full the
   * sibling bucket is). That is we try to avoid a yoyo behaviour (split,
   * merge).
   */

#define EHASH_UNDERFLOW_RATE     0.4    /* LOWER THRESHOLD for a merge operation. */

  /*
   * After a deletion operation, if the remaining records occupy less than
   * this rate of bucket space (i.e. DB_PAGESIZE) then the bucket is tried to
   * be merged with its sibling bucket.
   */

/* Conversion of these rates into absolute values */
#define EHASH_OVERFLOW_THRESHOLD   (EHASH_OVERFLOW_RATE * DB_PAGESIZE)
#define EHASH_UNDERFLOW_THRESHOLD  (EHASH_UNDERFLOW_RATE * DB_PAGESIZE)

/* Number of bits pseudo key consists of */
#define EHASH_HASH_KEY_BITS           (sizeof(EHASH_HASH_KEY) * 8)

/* Number of bits the short data type consists of */
#define EHASH_SHORT_BITS  (sizeof(short) * 8)

typedef unsigned int EHASH_HASH_KEY;    /* Pseudo_key type */

/* Extendible hashing directory header */
typedef struct ehash_dir_header EHASH_DIR_HEADER;
struct ehash_dir_header
{
  /* Fields should be ordered according to their sizes */
  VFID bucket_file;     /* bucket file identifier */
  VFID overflow_file;       /* overflow (buckets) file identifier */

  /* Each one keeps the number of buckets having a local depth equal to the index value of counter. Used for noticing
   * directory shrink condition */
  int local_depth_count[EHASH_HASH_KEY_BITS + 1];

  DB_TYPE key_type;     /* type of the keys */
  short depth;          /* global depth of the directory */
  char alignment;       /* alignment value used on slots of bucket pages */
};

/* Directory elements : Pointer to the bucket */
typedef struct ehash_dir_record EHASH_DIR_RECORD;
struct ehash_dir_record
{
  VPID bucket_vpid;     /* bucket pointer */
};

/* Each bucket has a header area to store its local depth */
typedef struct ehash_bucket_header EHASH_BUCKET_HEADER;
struct ehash_bucket_header
{
  char local_depth;     /* The local depth of the bucket */
};

typedef enum
{
  EHASH_SUCCESSFUL_COMPLETION,
  EHASH_BUCKET_FULL,        /* Bucket full condition; used in insertion */
  EHASH_BUCKET_UNDERFLOW,   /* Bucket underflow condition; used in deletion */
  EHASH_BUCKET_EMPTY,       /* Bucket empty condition; used in deletion */
  EHASH_FULL_SIBLING_BUCKET,
  EHASH_NO_SIBLING_BUCKET,
  EHASH_ERROR_OCCURRED
} EHASH_RESULT;

/* Recovery structures */
typedef struct ehash_repetition EHASH_REPETITION;
struct ehash_repetition
{
  /* The "vpid" is repeated "count" times */
  VPID vpid;
  int count;
};

/* Definitions about long strings */
#define EHASH_LONG_STRING_PREFIX_SIZE   10

/* Directory header size is aligned to size of integer */
#define EHASH_DIR_HEADER_SIZE \
  ((ssize_t) (((sizeof(EHASH_DIR_HEADER) + sizeof(int) - 1 ) / sizeof(int) ) * sizeof(int)))

/* Maximum size of a string key; 16 is for two slot indices */
#define EHASH_MAX_STRING_SIZE \
  (DB_PAGESIZE - (SSIZEOF(EHASH_BUCKET_HEADER) + 16))

/* Number of pointers the first page of the directory contains */
#define EHASH_NUM_FIRST_PAGES \
  ((DB_PAGESIZE - EHASH_DIR_HEADER_SIZE) / SSIZEOF (EHASH_DIR_RECORD))

/* Offset of the last pointer in the first directory page */
#define EHASH_LAST_OFFSET_IN_FIRST_PAGE \
  (EHASH_DIR_HEADER_SIZE + (EHASH_NUM_FIRST_PAGES - 1) * sizeof(EHASH_DIR_RECORD))

/* Number of pointers for each directory page (other than the first one)  */
#define EHASH_NUM_NON_FIRST_PAGES \
  (DB_PAGESIZE / sizeof(EHASH_DIR_RECORD))

/* Offset of the last pointer in the other directory pages */
#define EHASH_LAST_OFFSET_IN_NON_FIRST_PAGE \
  ((EHASH_NUM_NON_FIRST_PAGES - 1) * sizeof(EHASH_DIR_RECORD))

/*
 * GETBITS
 *
 *   value: value from which bits are extracted; its type should be
 *          "unsigned int"
 *   pos: first bit position (left adjusted) of the field
 *   n: length of the field to be extracted
 *
 * Note: Returns the n-bit field of key that begins at left adjusted
 * position pos. The type of key is supposed to be unsigned
 * so that when it is right-shifted vacated bits will be filled
 * with zeros, not sign bits.
 */
/* TODO: ~0: M2 64-bit */
#define GETBITS(value, pos, n) \
  ( ((value) >> ( EHASH_HASH_KEY_BITS - (pos) - (n) + 1)) & (~(~0UL << (n))) )
  /* Plus 1 since bits are numbered from 1 to 32 */

/*
 * FIND_OFFSET
 *
 *   hash_key: pseudo key to map to a directory pointer
 *   depth: depth of directory: tells how many bits to use
 *
 * Note: This macro maps a hash_key to an entry in a directory of one
 * area. It extracts first "depth" bits from the left side of
 *       "hash_key" and returns this value.
 *
 */

#define FIND_OFFSET(hash_key, depth) (GETBITS((hash_key), 1, (depth)))

/*
 * GETBIT
 *   return: int
 *   word: computer word from which the bit is extracted.
 *         Its type should be "unsigned int".
 *   pos: bit position (left adjusted) of word to return
 *
 *
 * Note: Returns the value of the bit (in "pos" position) of a "word".
 *
 */

#define GETBIT(word, pos) (GETBITS((word), (pos), 1))


/*
 * SETBIT
 *
 *   word: computer word whose bit is set
 *   pos: bit position (left adjusted) of word to be set to
 *
 * Note: Returns a value identical to "word" except the "pos" bit
 * is set to one.
 */

#define SETBIT(word,  pos) ( (word) | (1 << (EHASH_HASH_KEY_BITS - (pos))) )

/*
 * CLEARBIT
 *
 *   word: computer word whose bit is to be cleared
 *         Nth_most_significant_bit: bit position of word to be set to
 *   key_side: which side of the word should be used
 *
 * Note: Returns a value identical to "word" except the "pos" bit
 * is cleared (zeroed).
 */

#define CLEARBIT(word, pos) ( (word) & ~(1 << (EHASH_HASH_KEY_BITS - (pos))) )

#if defined (ENABLE_UNUSED_FUNCTION)
static int ehash_ansi_sql_strncmp (const char *s, const char *t, int max);
#endif
static char *ehash_allocate_recdes (RECDES * recdes, int size, short type);
static void ehash_free_recdes (RECDES * recdes);
#if defined (ENABLE_UNUSED_FUNCTION)
static int ehash_compare_overflow (THREAD_ENTRY * thread_p, const VPID * ovf_vpid, char *key, int *comp_result);
static char *ehash_compose_overflow (THREAD_ENTRY * thread_p, RECDES * recdes);
#endif
static int ehash_initialize_bucket_new_page (THREAD_ENTRY * thread_p, PAGE_PTR page_p, void *alignment_depth);
static int ehash_initialize_dir_new_page (THREAD_ENTRY * thread_p, PAGE_PTR page_p, void *args);
static short ehash_get_key_size (DB_TYPE key_type);
static EHID *ehash_create_helper (THREAD_ENTRY * thread_p, EHID * ehid, DB_TYPE key_type, int exp_num_entries,
                  OID * class_oid, int attr_id, bool istmp);
static PAGE_PTR ehash_fix_old_page (THREAD_ENTRY * thread_p, const VFID * vfid, const VPID * vpid,
                    PGBUF_LATCH_MODE mode);
static PAGE_PTR ehash_fix_ehid_page (THREAD_ENTRY * thread_p, EHID * ehid, PGBUF_LATCH_MODE latch_mode);
static PAGE_PTR ehash_fix_nth_page (THREAD_ENTRY * thread_p, const VFID * vfid, int offset, PGBUF_LATCH_MODE mode);
static void *ehash_insert_helper (THREAD_ENTRY * thread_p, EHID * ehid, void *key, OID * value_ptr, int lock_type,
                  VPID * existing_ovf_vpid);
static EHASH_RESULT ehash_insert_to_bucket (THREAD_ENTRY * thread_p, EHID * ehid, VFID * ovf_file, bool is_temp,
                        PAGE_PTR buc_pgptr, DB_TYPE key_type, void *key_ptr, OID * value_ptr,
                        VPID * existing_ovf_vpid);
static int ehash_compose_record (DB_TYPE key_type, void *key_ptr, OID * value_ptr, RECDES * recdes);
static bool ehash_locate_slot (THREAD_ENTRY * thread_p, PAGE_PTR page, DB_TYPE key_type, void *key,
                   PGSLOTID * position);
static PAGE_PTR ehash_split_bucket (THREAD_ENTRY * thread_p, EHASH_DIR_HEADER * dir_header, PAGE_PTR buc_pgptr,
                    void *key, int *old_local_depth, int *new_local_depth, VPID * sib_vpid,
                    bool is_temp);
static int ehash_expand_directory (THREAD_ENTRY * thread_p, EHID * ehid, int new_depth, bool is_temp);
static int ehash_connect_bucket (THREAD_ENTRY * thread_p, EHID * ehid, int local_depth, EHASH_HASH_KEY hash_key,
                 VPID * buc_vpid, bool is_temp);
static char ehash_find_depth (THREAD_ENTRY * thread_p, EHID * ehid, int location, VPID * vpid, VPID * sib_vpid);
static void ehash_merge (THREAD_ENTRY * thread_p, EHID * ehid, void *key, bool is_temp);
static void ehash_shrink_directory (THREAD_ENTRY * thread_p, EHID * ehid, int new_depth, bool is_temp);
static EHASH_HASH_KEY ehash_hash (void *orig_key, DB_TYPE key_type);
static int ehash_find_bucket_vpid (THREAD_ENTRY * thread_p, EHID * ehid, EHASH_DIR_HEADER * dir_header, int location,
                   PGBUF_LATCH_MODE latch, VPID * out_vpid);
static PAGE_PTR ehash_find_bucket_vpid_with_hash (THREAD_ENTRY * thread_p, EHID * ehid, void *key,
                          PGBUF_LATCH_MODE root_latch, PGBUF_LATCH_MODE bucket_latch,
                          VPID * out_vpid, EHASH_HASH_KEY * out_hash_key, int *out_location);
#if defined (ENABLE_UNUSED_FUNCTION)
static int ehash_create_overflow_file (THREAD_ENTRY * thread_p, EHID * ehid, PAGE_PTR dir_Rpgptr,
                       EHASH_DIR_HEADER * dir_header, FILE_TYPE file_type);
#endif
static char *ehash_read_oid_from_record (char *rec_p, OID * oid_p);
static char *ehash_write_oid_to_record (char *rec_p, OID * oid_p);
static char *ehash_read_ehid_from_record (char *rec_p, EHID * ehid_p);
static char *ehash_write_ehid_to_record (char *rec_p, EHID * ehid_p);
static int ehash_rv_delete (THREAD_ENTRY * thread_p, EHID * ehid, void *key);
static void ehash_adjust_local_depth (THREAD_ENTRY * thread_p, EHID * ehid, PAGE_PTR dir_Rpgptr,
                      EHASH_DIR_HEADER * dir_header, int depth, int delta, bool is_temp);
static int ehash_apply_each (THREAD_ENTRY * thread_p, EHID * ehid, RECDES * recdes, DB_TYPE key_type, char *bucrec_ptr,
                 OID * assoc_value, int *out_apply_error, int (*apply_function) (THREAD_ENTRY * thread_p,
                                                 void *key, void *data,
                                                 void *args), void *args);
static int ehash_merge_permanent (THREAD_ENTRY * thread_p, EHID * ehid, PAGE_PTR dir_Rpgptr,
                  EHASH_DIR_HEADER * dir_header, PAGE_PTR buc_pgptr, PAGE_PTR sib_pgptr,
                  VPID * buc_vpid, VPID * sib_vpid, int num_recs, int loc, PGSLOTID first_slotid,
                  int *out_new_local_depth, bool is_temp);
static void ehash_shrink_directory_if_need (THREAD_ENTRY * thread_p, EHID * ehid, EHASH_DIR_HEADER * dir_header,
                        bool is_temp);
static EHASH_RESULT ehash_check_merge_possible (THREAD_ENTRY * thread_p, EHID * ehid, EHASH_DIR_HEADER * dir_header,
                        VPID * buc_vpid, PAGE_PTR buc_pgptr, int location, int lock_type,
                        int *old_local_depth, VPID * sib_vpid, PAGE_PTR * out_sib_pgptr,
                        PGSLOTID * out_first_slotid, int *out_num_recs, int *out_loc);
static int ehash_distribute_records_into_two_bucket (THREAD_ENTRY * thread_p, EHASH_DIR_HEADER * dir_header,
                             PAGE_PTR buc_pgptr, EHASH_BUCKET_HEADER * buc_header, int num_recs,
                             PGSLOTID first_slotid, PAGE_PTR sib_pgptr);
static int ehash_find_first_bit_position (THREAD_ENTRY * thread_p, EHASH_DIR_HEADER * dir_header, PAGE_PTR buc_pgptr,
                      EHASH_BUCKET_HEADER * buc_header, void *key, int num_recs,
                      PGSLOTID first_slotid, int *old_local_depth, int *new_local_depth);
static int ehash_get_pseudo_key (THREAD_ENTRY * thread_p, RECDES * recdes, DB_TYPE key_type,
                 EHASH_HASH_KEY * out_hash_key);
static bool ehash_binary_search_bucket (THREAD_ENTRY * thread_p, PAGE_PTR buc_pgptr, PGSLOTID num_record,
                    DB_TYPE key_type, void *key, PGSLOTID * position);
static int ehash_compare_key (THREAD_ENTRY * thread_p, char *bucrec_ptr, DB_TYPE key_type, void *key, INT16 rec_type,
                  int *out_compare_result);
static int ehash_write_key_to_record (RECDES * recdes, DB_TYPE key_type, void *key_ptr, short key_size, OID * value_ptr,
                      bool long_str);
static EHASH_RESULT ehash_insert_bucket_after_extend_if_need (THREAD_ENTRY * thread_p, EHID * ehid, PAGE_PTR dir_Rpgptr,
                                  EHASH_DIR_HEADER * dir_header, VPID * buc_vpid, void *key,
                                  EHASH_HASH_KEY hash_key, int lock_type,
                                  bool is_temp, OID * value_ptr, VPID * existing_ovf_vpid);
static PAGE_PTR ehash_extend_bucket (THREAD_ENTRY * thread_p, EHID * ehid, PAGE_PTR dir_Rpgptr,
                     EHASH_DIR_HEADER * dir_header, PAGE_PTR buc_pgptr, void *key,
                     EHASH_HASH_KEY hash_key, int *new_bit, VPID * buc_vpid, bool is_temp);
static int ehash_insert_to_bucket_after_create (THREAD_ENTRY * thread_p, EHID * ehid, PAGE_PTR dir_Rpgptr,
                        EHASH_DIR_HEADER * dir_header, VPID * buc_vpid, int location,
                        EHASH_HASH_KEY hash_key, bool is_temp, void *key,
                        OID * value_ptr, VPID * existing_ovf_vpid);

static EHASH_HASH_KEY ehash_hash_string_type (char *key, char *orig_key);
static EHASH_HASH_KEY ehash_hash_eight_bytes_type (char *key);
#if defined (ENABLE_UNUSED_FUNCTION)
static EHASH_HASH_KEY ehash_hash_four_bytes_type (char *key);
static EHASH_HASH_KEY ehash_hash_two_bytes_type (char *key);
#endif

/* For debugging purposes only */
#if defined(EHINSERTION_ORDER)
static void eh_dump_key (DB_TYPE key_type, void *key, OID * value_ptr);
#endif /* EHINSERTION_ORDER */
static void ehash_dump_bucket (THREAD_ENTRY * thread_p, PAGE_PTR buc_pgptr, DB_TYPE key_type);

/*
 * ehash_dir_locate()
 *
 *   page_no_var(in): The nth directory page containing the pointer
 *   offset_var(in): The offset into the directory just like if the directory
 *              where contained in one contiguous area.
 *              SET by this macro according to the page_no_var where the
 *              offset is located in this page.
 *
 * Note: This macro finds the location of a directory pointer
 * (i.e. the number of directory page containing this pointer and
 * the offset value to reach the pointer within that page).
 * The pointer number is given to macro with the
 * "offset_var" parameter.
 * Since this operation is performed very frequently, it is
 * coded as a macro. So, it must be used very carefully. Two
 * variables should be passed as parameters. The variable passed
 * to "offset_var" should have the number of pointer prior to
 * this macro call.
 * In the implementation of this macro, division operation
 * is performed twice; once for the remainder and once for the
 * quotient. This is preferred to the method of successive
 * subtractions since built-in "/" and "%" operators take
 * constant time, whereas the latter method has linear
 * characteristic (i.e., it would take longer for bigger
 * directory sizes).
 */
static void
ehash_dir_locate (int *out_page_no_p, int *out_offset_p)
{
  int page_no, offset;

  offset = *out_offset_p;

  if (offset < EHASH_NUM_FIRST_PAGES)
    {
      /* in the first page */
      offset = offset * sizeof (EHASH_DIR_RECORD) + EHASH_DIR_HEADER_SIZE;
      page_no = 0;      /* at least one page */
    }
  else
    {
      /* not in the first page */
      offset -= EHASH_NUM_FIRST_PAGES;
      page_no = offset / EHASH_NUM_NON_FIRST_PAGES + 1;
      offset = (offset % EHASH_NUM_NON_FIRST_PAGES) * sizeof (EHASH_DIR_RECORD);
    }

  *out_page_no_p = page_no;
  *out_offset_p = offset;
}

#if defined (ENABLE_UNUSED_FUNCTION)
/*
 * Overflow page handling functions
 */

/*
 * ehash_ansi_sql_strncmp () -
 *   return:
 *   s(in):
 *   t(in):
 *   max(in):
 */
static int
ehash_ansi_sql_strncmp (const char *s, const char *t, int max)
{
  int i;

  for (i = 0; (*s == *t) && (i < max); s++, t++, i++)
    {
      if (*s == '\0')
    {
      return 0;
    }
    }

  if (i == max)
    {
      return 0;
    }

  if (*s == '\0')
    {
      while (*t != '\0')
    {           /* consume space-char */
      if (*t++ != ' ')
        {
          return -1;
        }
    }
      return 0;
    }
  else if (*t == '\0')
    {
      while (*s != '\0')
    {           /* consume space-char */
      if (*s++ != ' ')
        {
          return 1;
        }
    }
      return 0;
    }


  return (*(unsigned const char *) s < *(unsigned const char *) t) ? -1 : +1;

}
#endif

/*
 * ehash_allocate_recdes () -
 *   return: char *, or NULL
 *   recdes(out) : Record descriptor
 *   size(in)    : Request size
 *   type(in)    : Request type
 */
static char *
ehash_allocate_recdes (RECDES * recdes_p, int size, short type)
{
  recdes_p->area_size = recdes_p->length = size;
  recdes_p->type = type;
  recdes_p->data = NULL;

  if (recdes_p->area_size > 0)
    {
      recdes_p->data = (char *) malloc (recdes_p->area_size);
      if (recdes_p->data == NULL)
    {
      er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_OUT_OF_VIRTUAL_MEMORY, 1, (size_t) recdes_p->area_size);
    }
    }

  return recdes_p->data;
}

/*
 * ehash_free_recdes () -
 *   return:
 *   recdes(in): Record descriptor
 */
static void
ehash_free_recdes (RECDES * recdes_p)
{
  if (recdes_p->data)
    {
      free_and_init (recdes_p->data);
    }
  recdes_p->area_size = recdes_p->length = 0;

}

#if defined (ENABLE_UNUSED_FUNCTION)
/*
 * ehash_compare_overflow () - get/retrieve the content of a multipage object from overflow
 *   return: NO_ERROR, or ER_FAILED
 *   ovf_vpid(in): Overflow address
 *   key(in):
 *   comp_result(out):
 *
 * Note: The content of a multipage object associated with the given
 * overflow address(oid) is placed into the area pointed to by
 * the record descriptor. If the content of the object does not
 * fit in such an area (i.e., recdes->area_size), an error is
 * returned and a hint of its length is returned as a negative
 * value in recdes->length. The length of the retrieved object is
 * set in the the record descriptor (i.e., recdes->length).
 */
static int
ehash_compare_overflow (THREAD_ENTRY * thread_p, const VPID * ovf_vpid_p, char *key_p, int *out_comp_result)
{
  RECDES ovf_recdes;
  int size;
  int er_code;

  size = overflow_get_length (thread_p, ovf_vpid_p);
  if (size == -1)
    {
      return ER_FAILED;
    }

  if (ehash_allocate_recdes (&ovf_recdes, size, REC_HOME) == NULL)
    {
      return ER_FAILED;
    }

  er_code = NO_ERROR;

  if (overflow_get (thread_p, ovf_vpid_p, &ovf_recdes, NULL) != S_SUCCESS)
    {
      goto exit_on_error;
    }

  *out_comp_result = ansisql_strcmp (key_p, ovf_recdes.data);

end:
  ehash_free_recdes (&ovf_recdes);
  return er_code;

exit_on_error:
  er_code = ER_FAILED;
  goto end;
}

/*
 * ehash_compose_overflow () - get/retrieve the content of a multipage object from
 *                    overflow
 *   return:
 *   recdes(in): Record descriptor
 *
 * Note: The content of a multipage object associated with the given
 * overflow address(oid) is placed into the area pointed to by
 * the record descriptor. If the content of the object does not
 * fit in such an area (i.e., recdes->area_size), an error is
 * returned and a hint of its length is returned as a negative
 * value in recdes->length. The length of the retrieved object is
 * set in the the record descriptor (i.e., recdes->length).
 */
static char *
ehash_compose_overflow (THREAD_ENTRY * thread_p, RECDES * recdes_p)
{
  VPID *ovf_vpid_p;
  RECDES ovf_recdes;
  int size;
  char *bucket_p;

  /* Skip the OID field and point to ovf_vpid field */
  ovf_vpid_p = (VPID *) (recdes_p->data + sizeof (OID));

  /* Allocate the space for the whole key */
  size = overflow_get_length (thread_p, ovf_vpid_p);
  if (size == -1)
    {
      return NULL;
    }
  size += EHASH_LONG_STRING_PREFIX_SIZE;

  if (ehash_allocate_recdes (&ovf_recdes, size, REC_HOME) == NULL)
    {
      return NULL;
    }

  /* Copy the prefix key portion first */

  /* Skip the OID & ovf_vpid fields and point to the prefix key */
  bucket_p = recdes_p->data + sizeof (OID) + sizeof (VPID);

  memcpy (ovf_recdes.data, bucket_p, EHASH_LONG_STRING_PREFIX_SIZE);
  ovf_recdes.data += EHASH_LONG_STRING_PREFIX_SIZE;
  ovf_recdes.area_size -= EHASH_LONG_STRING_PREFIX_SIZE;

  /* Copy to the overflow portion of the key */
  if (overflow_get (thread_p, ovf_vpid_p, &ovf_recdes, NULL) != S_SUCCESS)
    {
      ehash_free_recdes (&ovf_recdes);
      return NULL;
    }

  return ovf_recdes.data;   /* key */
}
#endif

/*
 * ehash_initialize_bucket_new_page () - Initialize a newly allocated page
 *
 * return    : Error code.
 * thread_p (in) : Thread entry
 * page_p (in)   : New ehash bucket page
 * args (in)     : Alignment, depth and is_temp. Used to initialize and log the page.
 */
static int
ehash_initialize_bucket_new_page (THREAD_ENTRY * thread_p, PAGE_PTR page_p, void *args)
{
  char alignment;
  char depth;
  bool is_temp;
  int offset = 0;
  EHASH_BUCKET_HEADER bucket_header;
  RECDES bucket_recdes;
  PGSLOTID slot_id;
  int success = SP_SUCCESS;

  int error_code = NO_ERROR;

  alignment = *(char *) args;
  offset += sizeof (alignment);

  depth = *((char *) args + offset);
  offset += sizeof (depth);

  is_temp = *(bool *) ((char *) args + offset);

  /*
   * fetch and initialize the new page. The parameter UNANCHORED_KEEP_
   * SEQUENCE indicates that the order of records will be preserved
   * during insertions and deletions.
   */

  pgbuf_set_page_ptype (thread_p, page_p, PAGE_EHASH);

  /*
   * Initialize the bucket to contain variable-length records
   * on ordered slots.
   */
  spage_initialize (thread_p, page_p, UNANCHORED_KEEP_SEQUENCE, alignment, DONT_SAFEGUARD_RVSPACE);

  /* Initialize the bucket header */
  bucket_header.local_depth = depth;

  /* Set the record descriptor to the Bucket header */
  bucket_recdes.data = (char *) &bucket_header;
  bucket_recdes.area_size = bucket_recdes.length = sizeof (EHASH_BUCKET_HEADER);
  bucket_recdes.type = REC_HOME;

  /*
   * Insert the bucket header into the first slot (slot # 0)
   * on the bucket page
   */
  success = spage_insert (thread_p, page_p, &bucket_recdes, &slot_id);
  if (success != SP_SUCCESS)
    {
      /*
       * Slotted page module refuses to insert a short size record to an
       * empty page. This should never happen.
       */
      if (success != SP_ERROR)
    {
      er_set (ER_FATAL_ERROR_SEVERITY, ARG_FILE_LINE, ER_GENERIC_ERROR, 0);
      error_code = ER_FAILED;
    }
      else
    {
      ASSERT_ERROR_AND_SET (error_code);
    }
      return error_code;
    }

  if (!is_temp)
    {
      log_append_undoredo_data2 (thread_p, RVEH_INIT_BUCKET, NULL, page_p, -1, 0, 2, NULL, args);
    }
  pgbuf_set_dirty (thread_p, page_p, DONT_FREE);

  return NO_ERROR;
}

/*
 * ehash_initialize_dir_new_page () - Initialize a new page used to expand the
 *                                    extendible hash directory.
 *
 * return        : Error code
 * thread_p (in) : Thread entry
 * page_p (in)   : New directory page
 * args (in)     : Pointer to a bool: true if the hash file is temporary
 */
static int
ehash_initialize_dir_new_page (THREAD_ENTRY * thread_p, PAGE_PTR page_p, void *args)
{
  bool is_temp = *(bool *) args;

  pgbuf_set_page_ptype (thread_p, page_p, PAGE_EHASH);
  if (!is_temp)
    {
      log_append_undoredo_data2 (thread_p, RVEH_INIT_NEW_DIR_PAGE, NULL, page_p, -1, 0, 0, NULL, NULL);
    }
  pgbuf_set_dirty (thread_p, page_p, DONT_FREE);

  return NO_ERROR;
}

/*
 * ehash_rv_init_dir_new_page_redo () - Redo initialize new page used to
 *                  expand extensible hash table.
 *
 * return    : NO_ERROR.
 * thread_p (in) : Thread entry.
 * rcv (in)  : No data.
 */
int
ehash_rv_init_dir_new_page_redo (THREAD_ENTRY * thread_p, LOG_RCV * rcv)
{
  pgbuf_set_page_ptype (thread_p, rcv->pgptr, PAGE_EHASH);
  pgbuf_set_dirty (thread_p, rcv->pgptr, DONT_FREE);
  return NO_ERROR;
}

#if defined(EHINSERTION_ORDER)
/*
 * eh_dump_key () -
 *   return:
 *   key_type(in):
 *   key(in):
 *   value_ptr(in):
 */
static void
eh_dump_key (DB_TYPE key_type, void *key, OID * value_ptr)
{
  int hour, minute, second, month, day, year;
  double d;

  switch (key_type)
    {
    case DB_TYPE_STRING:
      fprintf (stdout, "key:%s", (char *) key);
      break;

    case DB_TYPE_OBJECT:
      fprintf (stdout, "key:%d|%d|%d", ((OID *) key)->volid, ((OID *) key)->pageid, ((OID *) key)->slotid);
      break;

    case DB_TYPE_DOUBLE:
      fprintf (stdout, "key:%f", *(double *) key);
      break;

    case DB_TYPE_FLOAT:
      fprintf (stdout, "key:%f", *(float *) key);
      break;

    case DB_TYPE_INTEGER:
      fprintf (stdout, "key:%d", *(int *) key);
      break;

    case DB_TYPE_BIGINT:
      fprintf (stdout, "key:%lld", (long long) (*(DB_BIGINT *) key));
      break;

    case DB_TYPE_SHORT:
      fprintf (stdout, "key:%d", *(short *) key);
      break;

    case DB_TYPE_DATE:
      db_date_decode ((DB_DATE *) key, &month, &day, &year);
      fprintf (stdout, "key:%2d/%2d/%4d", month, day, year);
      break;

    case DB_TYPE_TIME:
      db_time_decode ((DB_TIME *) key, &hour, &minute, &second);
      fprintf (stdout, "key:%3d:%3d:%3d", hour, minute, second);
      break;

    case DB_TYPE_TIMESTAMP:
    case DB_TYPE_TIMESTAMPLTZ:
      fprintf (stdout, "key:%d", *(DB_UTIME *) key);
      break;

    case DB_TYPE_TIMESTAMPTZ:
      fprintf (stdout, "key:%d", ((DB_TIMESTAMPTZ *) key)->timestamp, ((DB_TIMESTAMPTZ *) key)->tz_id);
      break;

    case DB_TYPE_DATETIME:
    case DB_TYPE_DATETIMELTZ:
      fprintf (stdout, "key:%d,%d", ((DB_DATETIME *) key)->date, ((DB_DATETIME *) key)->time);
      break;

    case DB_TYPE_DATETIMETZ:
      fprintf (stdout, "key:%d,%d", ((DB_DATETIMETZ *) key)->datetime.date, ((DB_DATETIMETZ *) key)->datetime.time,
           ((DB_DATETIMETZ *) key)->tz_id);
      break;

    case DB_TYPE_MONETARY:
      OR_MOVE_DOUBLE (key, &d);
      fprintf (stdout, "key:%f type %d", d, ((DB_MONETARY *) key)->type);
      break;

    default:
      /* Unspecified key type: Directory header has been corrupted */
      er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_EH_INVALID_KEY_TYPE, 1, key_type);
      return;
    }

  fprintf (stdout, "  OID associated value:%5d,%5d,%5d\n", value_ptr->volid, value_ptr->pageid, value_ptr->slotid);

}
#endif /* EHINSERTION_ORDER */

/*
 * xehash_create () - Create an extendible hashing structure
 *   return: EHID * (NULL in case of error)
 *   ehid(in): identifier for the extendible hashing structure to
 *             create. The volid field should already be set; others
 *             are set by this function.
 *   key_type(in): key type for the extendible hashing structure
 *   exp_num_entries(in): expected number of entries (i.e., <key, oid> pairs).
 *                   This figure is used as a guide to estimate the number of
 *           pages the extendible hashing structure will occupy.
 *           The purpose of this estimate is to increase the locality
 *           of reference on the disk.
 *           If the number of entries is not known, a negative value
 *           should be passed to this parameter.
 *   class_oid(in): OID of the class for which the index is created
 *   attr_id(in): Identifier of the attribute of the class for which the
 *                index is created.
 *   is_tmp(in): true, if the EHT will be based on temporary files.
 *
 * Note: Creates an extendible hashing structure for the particular
 * key type on the disk volume whose identifier is passed in
 * ehid->vfid.volid field. It creates two files on this volume: one
 * for the directory and one for the buckets. It also allocates
 * the first page of each file; the first bucket and the root
 * page of the directory.
 * The directory header area of the root page is
 * initialized by this function. Both the global depth of the
 * directory and the local depth of first bucket are set to 0.
 * The identifier of bucket file is kept in the directory header.
 * The directory file identifier and directory root page
 * identifier are loaded into the remaining fields of "Dir"
 * to be used as the complete identifier of this extendible
 * hashing structure for future reference.
 */
EHID *
xehash_create (THREAD_ENTRY * thread_p, EHID * ehid_p, DB_TYPE key_type, int exp_num_entries, OID * class_oid_p,
           int attr_id, bool is_tmp)
{
  return ehash_create_helper (thread_p, ehid_p, key_type, exp_num_entries, class_oid_p, attr_id, is_tmp);
}

/*
 * ehash_get_key_size () - Return the size of keys in bytes;
 *                      Undeclared if key_type is "DB_TYPE_STRING".
 *   return: byte size of key_type, or -1 for error;
 *   key_type(in):
 */
static short
ehash_get_key_size (DB_TYPE key_type)
{
  /* Map a key type to the byte size used for its keys.  String keys are
   * variable length, so a placeholder size of 1 is returned for them.
   * An unsupported type raises ER_EH_INVALID_KEY_TYPE and yields -1. */
  short size;

  switch (key_type)
    {
    case DB_TYPE_STRING:
      /* Size of each key will vary */
      size = 1;
      break;

    case DB_TYPE_OBJECT:
      size = sizeof (OID);
      break;

#if defined (ENABLE_UNUSED_FUNCTION)
    case DB_TYPE_DOUBLE:
      size = sizeof (double);
      break;

    case DB_TYPE_FLOAT:
      size = sizeof (float);
      break;

    case DB_TYPE_INTEGER:
      size = sizeof (int);
      break;

    case DB_TYPE_BIGINT:
      size = sizeof (DB_BIGINT);
      break;

    case DB_TYPE_SHORT:
      size = sizeof (short);
      break;

    case DB_TYPE_DATE:
      size = sizeof (DB_DATE);
      break;

    case DB_TYPE_TIME:
      size = sizeof (DB_TIME);
      break;

    case DB_TYPE_TIMESTAMP:
      size = sizeof (DB_UTIME);
      break;

    case DB_TYPE_DATETIME:
      size = sizeof (DB_DATETIME);
      break;

    case DB_TYPE_MONETARY:
      size = OR_MONETARY_SIZE;
      break;
#endif
    default:
      /* Unsupported key type for extendible hashing. */
      er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_EH_INVALID_KEY_TYPE, 1, key_type);
      size = -1;
      break;
    }

  return size;
}

/*
 * ehash_create_helper () -
 *   return:
 *   ehid(in):
 *   key_type(in):
 *   exp_num_entries(in):
 *   class_oid(in):
 *   attr_id(in):
 *   is_tmp(in):
 */
static EHID *
ehash_create_helper (THREAD_ENTRY * thread_p, EHID * ehid_p, DB_TYPE key_type, int exp_num_entries, OID * class_oid_p,
		     int attr_id, bool is_tmp)
{
  EHASH_DIR_HEADER *dir_header_p = NULL;
  EHASH_DIR_RECORD *dir_record_p = NULL;
  VFID dir_vfid = VFID_INITIALIZER;
  VPID dir_vpid = VPID_INITIALIZER;
  PAGE_PTR dir_page_p = NULL;
  VFID bucket_vfid = VFID_INITIALIZER;
  VPID bucket_vpid = VPID_INITIALIZER;
  char init_bucket_data[3];
  DKNPAGES exp_bucket_pages;
  DKNPAGES exp_dir_pages;
  short key_size;
  char alignment;
  OID value;
  unsigned int i;
  FILE_EHASH_DES ehdes;
  PAGE_TYPE ptype = PAGE_EHASH;

  /* Only string and OID keys are supported (see ehash_get_key_size). */
  assert (key_type == DB_TYPE_STRING || key_type == DB_TYPE_OBJECT);

  if (ehid_p == NULL)
    {
      return NULL;
    }

  /* create a file descriptor */
  if (class_oid_p != NULL)
    {
      COPY_OID (&ehdes.class_oid, class_oid_p);
    }
  else
    {
      OID_SET_NULL (&ehdes.class_oid);
    }
  ehdes.attr_id = attr_id;

  /* Set the key size */
  key_size = ehash_get_key_size (key_type);
  if (key_size < 0)
    {
      return NULL;
    }

  /* Estimate number of bucket pages */
  if (exp_num_entries < 0)
    {
      /* Assume minimum size */
      exp_bucket_pages = 1;
    }
  else
    {
      if (key_type == DB_TYPE_STRING)
	{
	  /* NOTE(review): 20 is presumably an assumed average string-key
	   * length and 4 the per-entry slot overhead -- confirm. */
	  exp_bucket_pages = exp_num_entries * (20 + sizeof (OID) + 4);
	}
      else
	{
	  exp_bucket_pages = exp_num_entries * (key_size + sizeof (OID) + 4);
	}

      exp_bucket_pages = CEIL_PTVDIV (exp_bucket_pages, DB_PAGESIZE);
    }

  /* Calculate alignment to use on slots of bucket pages */
  if (SSIZEOF (value.pageid) >= key_size)
    {
      alignment = sizeof (value.pageid);
    }
  else
    {
      alignment = (char) key_size;
    }

  /* TODO: M2 64-bit */
  /* May want to remove later on; not portable to 64-bit machines */
  if (alignment > SSIZEOF (int))
    {
      alignment = sizeof (int);
    }

  /* Create the first bucket and initialize its header */

  bucket_vfid.volid = ehid_p->vfid.volid;

  if (file_create_ehash (thread_p, exp_bucket_pages, is_tmp, &ehdes, &bucket_vfid) != NO_ERROR)
    {
      ASSERT_ERROR ();
      return NULL;
    }

  /* Log the initialization of the first bucket page */

  /* Init-data layout: [0] slot alignment, [1] local depth, [2] temp flag. */
  init_bucket_data[0] = alignment;
  init_bucket_data[1] = 0;
  init_bucket_data[2] = is_tmp;

  if (file_alloc (thread_p, &bucket_vfid, ehash_initialize_bucket_new_page, init_bucket_data, &bucket_vpid, NULL)
      != NO_ERROR)
    {
      ASSERT_ERROR ();
      goto exit_on_error;
    }

  /* Estimate number of directory pages */
  if (exp_num_entries < 0)
    {
      exp_dir_pages = 1;
    }
  else
    {
      /* Calculate how many directory pages will be used */
      ehash_dir_locate (&exp_dir_pages, &exp_bucket_pages);
      /* exp_dir_pages is actually an index. the number of pages should be +1 */
      exp_dir_pages++;
    }

  /* Create the directory (allocate the first page) and initialize its header */

  dir_vfid.volid = bucket_vfid.volid;

  /*
   * Create the file and allocate the first page
   *
   * We do not initialize the page during the allocation since the file is
   * new, and the file is going to be removed in the event of a crash.
   */

  if (file_create_ehash_dir (thread_p, exp_dir_pages, is_tmp, &ehdes, &dir_vfid) != NO_ERROR)
    {
      ASSERT_ERROR ();
      goto exit_on_error;
    }
  if (file_alloc (thread_p, &dir_vfid, is_tmp ? file_init_temp_page_type : file_init_page_type, &ptype, &dir_vpid,
		  &dir_page_p) != NO_ERROR)
    {
      ASSERT_ERROR ();
      goto exit_on_error;
    }
  if (dir_page_p == NULL)
    {
      assert_release (false);
      goto exit_on_error;
    }
#if !defined (NDEBUG)
  pgbuf_check_page_ptype (thread_p, dir_page_p, PAGE_EHASH);
#endif /* !NDEBUG */

  dir_header_p = (EHASH_DIR_HEADER *) dir_page_p;

  /* Initialize the directory header */
  dir_header_p->depth = 0;
  dir_header_p->key_type = key_type;
  dir_header_p->alignment = alignment;
  VFID_COPY (&dir_header_p->bucket_file, &bucket_vfid);
  VFID_SET_NULL (&dir_header_p->overflow_file);

  /* Initialize local depth information */
  /* The single initial bucket has local depth 0; there are no buckets at
   * any deeper local depth yet. */
  dir_header_p->local_depth_count[0] = 1;
  for (i = 1; i <= EHASH_HASH_KEY_BITS; i++)
    {
      dir_header_p->local_depth_count[i] = 0;
    }

  /*
   * Check if the key is of fixed size, and if so, store this information
   * in the directory header
   */

  /* The first directory record (right after the header) points to the
   * initial bucket page. */
  dir_record_p = (EHASH_DIR_RECORD *) ((char *) dir_page_p + EHASH_DIR_HEADER_SIZE);
  dir_record_p->bucket_vpid = bucket_vpid;

  /*
   * Don't need UNDO since we are just creating the file. If we abort, the
   * file is removed.
   */

  /* Log the directory root page */
  if (!is_tmp)
    {
      log_append_redo_data2 (thread_p, RVEH_INIT_DIR, &dir_vfid, dir_page_p, 0,
			     EHASH_DIR_HEADER_SIZE + sizeof (EHASH_DIR_RECORD), dir_page_p);
    }

  /* Finishing up; release the pages and return directory file id */
  pgbuf_set_dirty (thread_p, dir_page_p, FREE);

  VFID_COPY (&ehid_p->vfid, &dir_vfid);
  ehid_p->pageid = dir_vpid.pageid;

  return ehid_p;

exit_on_error:

  /* Destroy whichever files were created.  Permanent files are destroyed
   * at postpone so the removal is tied to the transaction outcome. */
  if (!VFID_ISNULL (&bucket_vfid))
    {
      if (is_tmp)
	{
	  if (file_destroy (thread_p, &bucket_vfid, is_tmp) != NO_ERROR)
	    {
	      assert_release (false);
	    }
	}
      else
	{
	  (void) file_postpone_destroy (thread_p, &bucket_vfid);
	}
    }
  if (!VFID_ISNULL (&dir_vfid))
    {
      if (is_tmp)
	{
	  if (file_destroy (thread_p, &dir_vfid, is_tmp) != NO_ERROR)
	    {
	      assert_release (false);
	    }
	}
      else
	{
	  (void) file_postpone_destroy (thread_p, &dir_vfid);
	}
    }
  return NULL;
}

/*
 * ehash_fix_old_page () -
 *   return: specified page pointer, or NULL
 *   vfid_p(in): only for error reporting
 *   vpid_p(in):
 *   latch_mode(in): lock mode
 */
static PAGE_PTR
ehash_fix_old_page (THREAD_ENTRY * thread_p, const VFID * vfid_p, const VPID * vpid_p, PGBUF_LATCH_MODE latch_mode)
{
  PAGE_PTR fixed_page_p;

  fixed_page_p = pgbuf_fix (thread_p, vpid_p, OLD_PAGE, latch_mode, PGBUF_UNCONDITIONAL_LATCH);
  if (fixed_page_p != NULL)
    {
#if !defined (NDEBUG)
      (void) pgbuf_check_page_ptype (thread_p, fixed_page_p, PAGE_EHASH);
#endif /* !NDEBUG */
      return fixed_page_p;
    }

  /* Translate a "bad page id" failure into a more specific extendible-hash
   * error; other errors are left as set by the page buffer. */
  if (er_errid () == ER_PB_BAD_PAGEID)
    {
      er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_EH_UNKNOWN_EXT_HASH, 4, vfid_p->volid, vfid_p->fileid,
	      vpid_p->volid, vpid_p->pageid);
    }

  return NULL;
}

/*
 * ehash_fix_ehid_page () -
 *   return: specified page pointer, or NULL
 *   ehid(in): extendible hashing structure
 *   latch_mode(in): lock mode
 */
static PAGE_PTR
ehash_fix_ehid_page (THREAD_ENTRY * thread_p, EHID * ehid, PGBUF_LATCH_MODE latch_mode)
{
  VPID root_vpid;

  /* The directory root page identifier is carried directly in the EHID. */
  root_vpid.volid = ehid->vfid.volid;
  root_vpid.pageid = ehid->pageid;

  return ehash_fix_old_page (thread_p, &ehid->vfid, &root_vpid, latch_mode);
}

/*
 * ehash_fix_nth_page () -
 *   return: specified page pointer, or NULL
 *   vfid(in):
 *   offset(in):
 *   lock(in): lock mode
 */
static PAGE_PTR
ehash_fix_nth_page (THREAD_ENTRY * thread_p, const VFID * vfid_p, int offset, PGBUF_LATCH_MODE latch_mode)
{
  VPID nth_vpid;

  /* Translate the page index within the file into a page identifier. */
  if (file_numerable_find_nth (thread_p, vfid_p, offset, false, NULL, NULL, &nth_vpid) != NO_ERROR)
    {
      ASSERT_ERROR ();
      return NULL;
    }

  return ehash_fix_old_page (thread_p, vfid_p, &nth_vpid, latch_mode);
}

/*
 * xehash_destroy () - destroy the extensible hash table
 *
 * return        : error code
 * thread_p (in) : thread entry
 * ehid_p (in)   : extensible hash identifier
 *
 * note: only temporary extensible hash tables can be destroyed.
 */
int
xehash_destroy (THREAD_ENTRY * thread_p, EHID * ehid_p)
{
  PAGE_PTR root_page_p;
  EHASH_DIR_HEADER *header_p;

  if (ehid_p == NULL)
    {
      return ER_FAILED;
    }

  root_page_p = ehash_fix_ehid_page (thread_p, ehid_p, PGBUF_LATCH_WRITE);
  if (root_page_p == NULL)
    {
      return ER_FAILED;
    }

  log_sysop_start (thread_p);

  /* The bucket file identifier is stored in the directory header. */
  header_p = (EHASH_DIR_HEADER *) root_page_p;
  if (file_destroy (thread_p, &header_p->bucket_file, true) != NO_ERROR)
    {
      assert_release (false);
    }

  pgbuf_unfix (thread_p, root_page_p);

  /* Now remove the directory file itself. */
  if (file_destroy (thread_p, &ehid_p->vfid, true) != NO_ERROR)
    {
      assert_release (false);
    }

  log_sysop_commit (thread_p);

  return NO_ERROR;
}

/*
 * ehash_find_bucket_vpid () - Find the VPID of the bucket pointed to by the
 *     directory entry at the given location.
 *   return: NO_ERROR, or ER_FAILED
 *   ehid_p(in): identifier of the extendible hashing structure
 *   dir_header_p(in): directory header (root page, kept fixed by the caller)
 *   location(in): offset of the bucket pointer within the directory
 *   latch(in): latch mode for a non-root directory page, if one must be fixed
 *   out_vpid_p(out): bucket page identifier
 *
 * Note: ehash_dir_locate splits "location" into the directory page index
 * (dir_offset) and the offset within that page.
 */
static int
ehash_find_bucket_vpid (THREAD_ENTRY * thread_p, EHID * ehid_p, EHASH_DIR_HEADER * dir_header_p, int location,
			PGBUF_LATCH_MODE latch, VPID * out_vpid_p)
{
  EHASH_DIR_RECORD *dir_record_p;
  PAGE_PTR dir_page_p;
  int dir_offset;

  ehash_dir_locate (&dir_offset, &location);

  if (dir_offset != 0)
    {
      /* The bucket pointer is not in the root (first) page of the directory */
      dir_page_p = ehash_fix_nth_page (thread_p, &ehid_p->vfid, dir_offset, latch);
      if (dir_page_p == NULL)
	{
	  return ER_FAILED;
	}

      /* Find the bucket page containing the key.  Copy the VPID while the
       * directory page is still fixed; dereferencing the record after the
       * unfix would read a page-buffer frame that may have been replaced. */
      dir_record_p = (EHASH_DIR_RECORD *) ((char *) dir_page_p + location);
      *out_vpid_p = dir_record_p->bucket_vpid;
      pgbuf_unfix_and_init (thread_p, dir_page_p);
    }
  else
    {
      /* The pointer lives in the root page, which the caller keeps fixed. */
      dir_record_p = (EHASH_DIR_RECORD *) ((char *) dir_header_p + location);
      *out_vpid_p = dir_record_p->bucket_vpid;
    }

  return NO_ERROR;
}

/*
 * ehash_find_bucket_vpid_with_hash () - Hash the key and locate the bucket
 *     pointer for it in the directory.
 *   return: fixed directory root page on success (caller must unfix), or NULL
 */
static PAGE_PTR
ehash_find_bucket_vpid_with_hash (THREAD_ENTRY * thread_p, EHID * ehid_p, void *key_p, PGBUF_LATCH_MODE root_latch,
				  PGBUF_LATCH_MODE bucket_latch, VPID * out_vpid_p, EHASH_HASH_KEY * out_hash_key_p,
				  int *out_location_p)
{
  PAGE_PTR root_page_p;
  EHASH_DIR_HEADER *header_p;
  EHASH_HASH_KEY pseudo_key;
  int dir_location;

  root_page_p = ehash_fix_ehid_page (thread_p, ehid_p, root_latch);
  if (root_page_p == NULL)
    {
      return NULL;
    }

  header_p = (EHASH_DIR_HEADER *) root_page_p;

  /* Hash the key to a pseudo key, reporting it if the caller asked for it. */
  pseudo_key = ehash_hash (key_p, header_p->key_type);
  if (out_hash_key_p != NULL)
    {
      *out_hash_key_p = pseudo_key;
    }

  /* The first "depth" bits of the pseudo key select the directory entry. */
  dir_location = FIND_OFFSET (pseudo_key, header_p->depth);
  if (out_location_p != NULL)
    {
      *out_location_p = dir_location;
    }

  if (ehash_find_bucket_vpid (thread_p, ehid_p, header_p, dir_location, bucket_latch, out_vpid_p) != NO_ERROR)
    {
      pgbuf_unfix_and_init (thread_p, root_page_p);
      return NULL;
    }

  return root_page_p;
}

/*
 * ehash_search () - Search for the key; return associated value
 *   return: EH_SEARCH
 *   ehid(in): identifier for the extendible hashing structure
 *   key(in): key to search
 *   value_ptr(out): pointer to return the value associated with the key
 *
 * Note: Returns the value associated with the given key, if it is
 * found in the specified extendible hashing structure. If the
 * key is not found an error condition is returned.
 */
EH_SEARCH
ehash_search (THREAD_ENTRY * thread_p, EHID * ehid_p, void *key_p, OID * value_p)
{
  EHASH_DIR_HEADER *header_p;
  PAGE_PTR root_page_p = NULL;
  PAGE_PTR bucket_page_p = NULL;
  VPID bucket_vpid;
  RECDES recdes;
  PGSLOTID slot_id;
  EH_SEARCH status = EH_KEY_NOTFOUND;

  if (ehid_p == NULL || key_p == NULL)
    {
      return EH_KEY_NOTFOUND;
    }

  root_page_p =
    ehash_find_bucket_vpid_with_hash (thread_p, ehid_p, key_p, PGBUF_LATCH_READ, PGBUF_LATCH_READ, &bucket_vpid, NULL,
				      NULL);
  if (root_page_p == NULL)
    {
      return EH_ERROR_OCCURRED;
    }

  header_p = (EHASH_DIR_HEADER *) root_page_p;

  if (bucket_vpid.pageid == NULL_PAGEID)
    {
      /* No bucket is connected at this directory entry yet. */
      status = EH_KEY_NOTFOUND;
      goto end;
    }

  bucket_page_p = ehash_fix_old_page (thread_p, &ehid_p->vfid, &bucket_vpid, PGBUF_LATCH_READ);
  if (bucket_page_p == NULL)
    {
      status = EH_ERROR_OCCURRED;
      goto end;
    }

  if (!ehash_locate_slot (thread_p, bucket_page_p, header_p->key_type, key_p, &slot_id))
    {
      status = EH_KEY_NOTFOUND;
      goto end;
    }

  /* The associated OID sits at the beginning of the bucket record. */
  (void) spage_get_record (thread_p, bucket_page_p, slot_id, &recdes, PEEK);
  (void) ehash_read_oid_from_record (recdes.data, value_p);
  status = EH_KEY_FOUND;

end:
  if (bucket_page_p != NULL)
    {
      pgbuf_unfix_and_init (thread_p, bucket_page_p);
    }
  if (root_page_p != NULL)
    {
      pgbuf_unfix_and_init (thread_p, root_page_p);
    }

  return status;
}

/*
 * ehash_insert () - Insert (key, assoc_value) pair to ext. hashing
 *   return: void * (NULL is returned in case of error)
 *   ehid(in): identifier of the extendible hashing structure
 *   key(in): key value to insert
 *   value_ptr(in): pointer to the associated value to insert
 *
 * Note: Inserts the given (key & assoc_value) pair into the specified
 * extendible hashing structure. If the key already exists then
 * the previous associated value is replaced with the new one.
 * Otherwise, a new record is inserted to the correct bucket.
 *      To perform the insertion operation safely in the concurrent
 * environment, an auxiliary (and private) function
 * "eh_insert_helper", which takes an additional parameter to
 * specify the lock type to acquire on the directory, is used
 * by this function. Hoping that this insertion will not effect
 * the directory pages, it passes a shared lock
 * (i.e, an "S_LOCK") for this parameter.
 */
void *
ehash_insert (THREAD_ENTRY * thread_p, EHID * ehid_p, void *key_p, OID * value_p)
{
  /* Guard against missing arguments before delegating. */
  if (ehid_p == NULL || key_p == NULL)
    {
      return NULL;
    }

  /* Optimistically request a shared lock on the directory; the helper
   * retries with an exclusive lock when the insertion turns out to need a
   * structure modification. */
  return ehash_insert_helper (thread_p, ehid_p, key_p, value_p, S_LOCK, NULL);
}

/*
 * ehash_insert_to_bucket_after_create () - Allocate a brand new bucket page
 *     for the given directory location, connect it to the directory, and
 *     insert the (key, value) pair into it.
 *   return: NO_ERROR, or ER_FAILED
 *   ehid(in): identifier of the extendible hashing structure
 *   dir_root_page_p(in): directory root page (kept fixed by the caller)
 *   dir_header_p(in): directory header
 *   bucket_vpid_p(out): identifier of the newly allocated bucket page
 *   location(in): offset of the bucket pointer in the directory
 *   hash_key(in): pseudo key of the key being inserted
 *   is_temp(in): is the extendible hash temporary?
 *   key_p(in): key to insert
 *   value_p(in): associated value to insert
 *   existing_ovf_vpid_p(in):
 *
 * Note: Bucket allocation, directory connection, and local-depth accounting
 * are grouped in one system operation; the actual record insertion happens
 * after the sysop commits.
 */
static int
ehash_insert_to_bucket_after_create (THREAD_ENTRY * thread_p, EHID * ehid_p, PAGE_PTR dir_root_page_p,
				     EHASH_DIR_HEADER * dir_header_p, VPID * bucket_vpid_p, int location,
				     EHASH_HASH_KEY hash_key, bool is_temp, void *key_p, OID * value_p,
				     VPID * existing_ovf_vpid_p)
{
  PAGE_PTR bucket_page_p;
  EHASH_BUCKET_HEADER bucket_header;
  char found_depth;
  char init_bucket_data[3];
  VPID null_vpid = { NULL_VOLID, NULL_PAGEID };
  EHASH_RESULT ins_result;

  int error_code = NO_ERROR;

  /* A returned depth of 0 is treated as a failure of ehash_find_depth. */
  found_depth = ehash_find_depth (thread_p, ehid_p, location, &null_vpid, &null_vpid);
  if (found_depth == 0)
    {
      return ER_FAILED;
    }

  /* Init-data layout: [0] slot alignment, [1] local depth, [2] temp flag. */
  init_bucket_data[0] = dir_header_p->alignment;
  init_bucket_data[1] = dir_header_p->depth - found_depth;
  init_bucket_data[2] = is_temp;
  bucket_header.local_depth = init_bucket_data[1];

  /* Page allocation, directory connection and depth accounting are atomic. */
  log_sysop_start (thread_p);

  error_code = file_alloc (thread_p, &dir_header_p->bucket_file, ehash_initialize_bucket_new_page, init_bucket_data,
			   bucket_vpid_p, &bucket_page_p);
  if (error_code != NO_ERROR)
    {
      ASSERT_ERROR ();
      log_sysop_abort (thread_p);
      return ER_FAILED;
    }
  if (bucket_page_p == NULL)
    {
      assert_release (false);
      log_sysop_abort (thread_p);
      return ER_FAILED;
    }
#if !defined (NDEBUG)
  (void) pgbuf_check_page_ptype (thread_p, bucket_page_p, PAGE_EHASH);
#endif /* !NDEBUG */

  /* Point the relevant directory entries at the new bucket. */
  if (ehash_connect_bucket (thread_p, ehid_p, bucket_header.local_depth, hash_key, bucket_vpid_p, is_temp) != NO_ERROR)
    {
      pgbuf_unfix_and_init (thread_p, bucket_page_p);
      log_sysop_abort (thread_p);
      return ER_FAILED;
    }

  /* One more bucket now exists at this local depth. */
  ehash_adjust_local_depth (thread_p, ehid_p, dir_root_page_p, dir_header_p, (int) bucket_header.local_depth, 1,
			    is_temp);

  log_sysop_commit (thread_p);

  ins_result =
    ehash_insert_to_bucket (thread_p, ehid_p, &dir_header_p->overflow_file, is_temp, bucket_page_p,
			    dir_header_p->key_type, key_p, value_p, existing_ovf_vpid_p);

  if (ins_result != EHASH_SUCCESSFUL_COMPLETION)
    {
      /* Slotted page module refuses to insert a short size record to an almost empty page. This should never happen. */
      er_set (ER_FATAL_ERROR_SEVERITY, ARG_FILE_LINE, ER_GENERIC_ERROR, 0);
      pgbuf_unfix_and_init (thread_p, bucket_page_p);
      return ER_FAILED;
    }

  pgbuf_unfix_and_init (thread_p, bucket_page_p);
  return NO_ERROR;
}

/*
 * ehash_extend_bucket () - Split the given bucket and hook both resulting
 *     buckets into the directory, expanding the directory first when the
 *     new local depth exceeds the global depth.
 *   return: fixed sibling bucket page on success (caller must unfix), or NULL
 *   ehid(in): identifier of the extendible hashing structure
 *   dir_root_page_p(in): directory root page (kept fixed by the caller)
 *   dir_header_p(in): directory header
 *   bucket_page_p(in): the full bucket being split
 *   key_p(in): key being inserted (drives the split)
 *   hash_key(in): pseudo key of the key being inserted
 *   out_new_bit_p(out): bit of hash_key at the new local depth; tells the
 *                       caller whether the key goes to the sibling bucket
 *   bucket_vpid(in): identifier of the original bucket page
 *   is_temp(in): is the extendible hash temporary?
 *
 * Note: For permanent hashes the whole sequence is wrapped in a system
 * operation that is aborted on any failure.
 */
static PAGE_PTR
ehash_extend_bucket (THREAD_ENTRY * thread_p, EHID * ehid_p, PAGE_PTR dir_root_page_p, EHASH_DIR_HEADER * dir_header_p,
		     PAGE_PTR bucket_page_p, void *key_p, EHASH_HASH_KEY hash_key, int *out_new_bit_p,
		     VPID * bucket_vpid, bool is_temp)
{
  VPID sibling_vpid;
  PAGE_PTR sibling_page_p = NULL;
  VPID null_vpid = { NULL_VOLID, NULL_PAGEID };
  int old_local_depth;
  int new_local_depth;

  if (!is_temp)
    {
      log_sysop_start (thread_p);
    }

  sibling_page_p =
    ehash_split_bucket (thread_p, dir_header_p, bucket_page_p, key_p, &old_local_depth, &new_local_depth,
			&sibling_vpid, is_temp);
  if (sibling_page_p == NULL)
    {
      if (!is_temp)
	{
	  log_sysop_abort (thread_p);
	}
      return NULL;
    }

  /* Save the bit position of hash_key to be used later in deciding whether to insert the new key to original bucket or
   * to the new sibling bucket. */
  *out_new_bit_p = GETBIT (hash_key, new_local_depth);

  /* One bucket left the old local depth; two now exist at the new one. */
  ehash_adjust_local_depth (thread_p, ehid_p, dir_root_page_p, dir_header_p, old_local_depth, -1, is_temp);
  ehash_adjust_local_depth (thread_p, ehid_p, dir_root_page_p, dir_header_p, new_local_depth, 2, is_temp);

  /* Check directory expansion condition */
  if (new_local_depth > dir_header_p->depth)
    {
      if (ehash_expand_directory (thread_p, ehid_p, new_local_depth, is_temp) != NO_ERROR)
	{
	  pgbuf_unfix_and_init (thread_p, sibling_page_p);
	  if (!is_temp)
	    {
	      log_sysop_abort (thread_p);
	    }
	  return NULL;
	}
    }

  /* Connect the buckets */
  if ((new_local_depth - old_local_depth) > 1)
    {
      /* First, set all of them to NULL_PAGEID */
      if (ehash_connect_bucket (thread_p, ehid_p, old_local_depth, hash_key, &null_vpid, is_temp) != NO_ERROR)
	{
	  pgbuf_unfix_and_init (thread_p, sibling_page_p);
	  if (!is_temp)
	    {
	      log_sysop_abort (thread_p);
	    }
	  return NULL;
	}

      /* Then, connect the Bucket page */
      hash_key = CLEARBIT (hash_key, new_local_depth);
      if (ehash_connect_bucket (thread_p, ehid_p, new_local_depth, hash_key, bucket_vpid, is_temp) != NO_ERROR)
	{
	  pgbuf_unfix_and_init (thread_p, sibling_page_p);
	  if (!is_temp)
	    {
	      log_sysop_abort (thread_p);
	    }
	  return NULL;
	}
    }

  /* Finally, connect the Sibling bucket page */
  hash_key = SETBIT (hash_key, new_local_depth);
  if (ehash_connect_bucket (thread_p, ehid_p, new_local_depth, hash_key, &sibling_vpid, is_temp) != NO_ERROR)
    {
      pgbuf_unfix_and_init (thread_p, sibling_page_p);
      if (!is_temp)
	{
	  log_sysop_abort (thread_p);
	}
      return NULL;
    }

  if (!is_temp)
    {
      log_sysop_commit (thread_p);
    }
  return sibling_page_p;
}

/*
 * ehash_insert_bucket_after_extend_if_need () - Insert the (key, value) pair
 *     into the existing bucket, splitting the bucket first when it is full
 *     and the caller holds an exclusive lock.
 *   return: EHASH_RESULT; EHASH_BUCKET_FULL is returned when the bucket is
 *           full but only a shared lock is held, so the caller can retry
 *           with an exclusive lock.
 *   ehid(in): identifier of the extendible hashing structure
 *   dir_root_page_p(in): directory root page (kept fixed by the caller)
 *   dir_header_p(in): directory header
 *   bucket_vpid_p(in): identifier of the bucket page to insert into
 *   key_p(in): key to insert
 *   hash_key(in): pseudo key of the key
 *   lock_type(in): S_LOCK or X_LOCK held on the directory
 *   is_temp(in): is the extendible hash temporary?
 *   value_p(in): associated value to insert
 *   existing_ovf_vpid_p(in):
 */
static EHASH_RESULT
ehash_insert_bucket_after_extend_if_need (THREAD_ENTRY * thread_p, EHID * ehid_p, PAGE_PTR dir_root_page_p,
					  EHASH_DIR_HEADER * dir_header_p, VPID * bucket_vpid_p, void *key_p,
					  EHASH_HASH_KEY hash_key, int lock_type, bool is_temp, OID * value_p,
					  VPID * existing_ovf_vpid_p)
{
  PAGE_PTR bucket_page_p;
  PAGE_PTR sibling_page_p = NULL;
  PAGE_PTR target_bucket_page_p;
  int new_bit;
  EHASH_RESULT result;

  /* We need to put a X_LOCK on bucket page */
  bucket_page_p = ehash_fix_old_page (thread_p, &ehid_p->vfid, bucket_vpid_p, PGBUF_LATCH_WRITE);
  if (bucket_page_p == NULL)
    {
      return EHASH_ERROR_OCCURRED;
    }

  result =
    ehash_insert_to_bucket (thread_p, ehid_p, &dir_header_p->overflow_file, is_temp, bucket_page_p,
			    dir_header_p->key_type, key_p, value_p, existing_ovf_vpid_p);
  if (result == EHASH_BUCKET_FULL)
    {
      if (lock_type == S_LOCK)
	{
	  /* A split may touch directory pages; the caller must upgrade to
	   * X_LOCK and retry. */
	  pgbuf_unfix_and_init (thread_p, bucket_page_p);
	  return EHASH_BUCKET_FULL;
	}

      sibling_page_p =
	ehash_extend_bucket (thread_p, ehid_p, dir_root_page_p, dir_header_p, bucket_page_p, key_p, hash_key, &new_bit,
			     bucket_vpid_p, is_temp);
      if (sibling_page_p == NULL)
	{
	  pgbuf_unfix_and_init (thread_p, bucket_page_p);
	  return EHASH_ERROR_OCCURRED;
	}

      /*
       * Try to insert the new key & assoc_value pair into one of the buckets.
       * The result of this attempt will determine if a recursive call is
       * needed to insert the new key.
       */

      if (new_bit)
	{
	  target_bucket_page_p = sibling_page_p;
	}
      else
	{
	  target_bucket_page_p = bucket_page_p;
	}

      result =
	ehash_insert_to_bucket (thread_p, ehid_p, &dir_header_p->overflow_file, is_temp, target_bucket_page_p,
				dir_header_p->key_type, key_p, value_p, existing_ovf_vpid_p);
      pgbuf_unfix_and_init (thread_p, sibling_page_p);
    }

  pgbuf_unfix_and_init (thread_p, bucket_page_p);
  return result;
}

/*
 * ehash_insert_helper () - Perform insertion
 *   return: void * (NULL is returned in case of error)
 *   ehid(in): identifier for the extendible hashing structure
 *   key(in): key value
 *   value_ptr(in): associated value to insert
 *   lock_type(in): type of lock to acquire for accessing directory pages
 *   existing_ovf_vpid(in):
 *
 */
static void *
ehash_insert_helper (THREAD_ENTRY * thread_p, EHID * ehid_p, void *key_p, OID * value_p, int lock_type,
		     VPID * existing_ovf_vpid_p)
{
  EHASH_DIR_HEADER *dir_header_p;
  PAGE_PTR dir_root_page_p;
  VPID bucket_vpid;

  EHASH_HASH_KEY hash_key;
  int location;
  EHASH_RESULT result;
  bool is_temp = false;

  /* Whether the hash is temporary decides the logging behavior below. */
  if (file_is_temp (thread_p, &ehid_p->vfid, &is_temp) != NO_ERROR)
    {
      ASSERT_ERROR ();
      return NULL;
    }

  /* Latch the directory root for read under S_LOCK, for write under X_LOCK. */
  dir_root_page_p =
    ehash_find_bucket_vpid_with_hash (thread_p, ehid_p, key_p,
				      (lock_type == S_LOCK ? PGBUF_LATCH_READ : PGBUF_LATCH_WRITE), PGBUF_LATCH_READ,
				      &bucket_vpid, &hash_key, &location);
  if (dir_root_page_p == NULL)
    {
      return NULL;
    }

  dir_header_p = (EHASH_DIR_HEADER *) dir_root_page_p;

#if defined(EHINSERTION_ORDER)
  fprintf (stdout, "Ex Hash %d|%d|%d Insert:", ehid_p->vfid.volid, ehid_p->vfid.fileid, ehid_p->pageid);
  eh_dump_key (dir_header_p->key_type, key_p, value_p);
#endif /* EHINSERTION_ORDER */

  if (dir_header_p->key_type == DB_TYPE_STRING)
    {
      /* max length of class name is 255 */
      assert (strlen ((char *) key_p) < 256);

#if defined (ENABLE_UNUSED_FUNCTION)
      /* Check if string is too long and no overflow file has been created */
      if ((strlen ((char *) key_p) + 1) > EHASH_MAX_STRING_SIZE && VFID_ISNULL (&dir_header_p->overflow_file))
	{
	  if (lock_type == S_LOCK)
	    {
	      pgbuf_unfix_and_init (thread_p, dir_root_page_p);
	      return ehash_insert_helper (thread_p, ehid_p, key_p, value_p, X_LOCK, existing_ovf_vpid_p);
	    }
	  else
	    {
	      if (ehash_create_overflow_file (thread_p, ehid_p, dir_root_page_p, dir_header_p, file_type) != NO_ERROR)
		{
		  pgbuf_unfix_and_init (thread_p, dir_root_page_p);
		  return NULL;
		}
	    }
	}
#endif
    }

  if (VPID_ISNULL (&bucket_vpid))
    {
      /* No bucket is connected at this directory entry yet; one must be
       * created, which requires an exclusive lock. */
      if (lock_type == S_LOCK)
	{
	  /* release lock and call itself to obtain X_LOCK */
	  pgbuf_unfix_and_init (thread_p, dir_root_page_p);
	  return ehash_insert_helper (thread_p, ehid_p, key_p, value_p, X_LOCK, existing_ovf_vpid_p);
	}
      else
	{
	  if (ehash_insert_to_bucket_after_create
	      (thread_p, ehid_p, dir_root_page_p, dir_header_p, &bucket_vpid, location, hash_key, is_temp, key_p,
	       value_p, existing_ovf_vpid_p) != NO_ERROR)
	    {
	      pgbuf_unfix_and_init (thread_p, dir_root_page_p);
	      return NULL;
	    }
	}
    }
  else
    {
      result =
	ehash_insert_bucket_after_extend_if_need (thread_p, ehid_p, dir_root_page_p, dir_header_p, &bucket_vpid, key_p,
						  hash_key, lock_type, is_temp, value_p, existing_ovf_vpid_p);
      if (result == EHASH_ERROR_OCCURRED)
	{
	  pgbuf_unfix_and_init (thread_p, dir_root_page_p);
	  return NULL;
	}
      else if (result == EHASH_BUCKET_FULL)
	{
	  /* Bucket is full under a shared lock; retry exclusively so the
	   * bucket can be split. */
	  pgbuf_unfix_and_init (thread_p, dir_root_page_p);
	  return ehash_insert_helper (thread_p, ehid_p, key_p, value_p, X_LOCK, existing_ovf_vpid_p);
	}
    }

  pgbuf_unfix_and_init (thread_p, dir_root_page_p);
  return (key_p);
}

/*
 * ehash_insert_to_bucket () - Insert (key, value) to bucket
 *   return: EHASH_RESULT
 *   ehid(in): identifier for the extendible hashing structure
 *   overflow_file(in): Overflow file for extendible hash
 *   is_temp(in): is extensible hash temporary?
 *   buc_pgptr(in): bucket page to insert the key
 *   key_type(in): type of the key
 *   key_ptr(in): Pointer to the key
 *   value_ptr(in): Pointer to the associated value
 *   existing_ovf_vpid(in):
 *
 * Note: This function is used to insert a (key & assoc_value) pair
 * onto the given bucket. If the KEY already EXISTS in the bucket
 * the NEW ASSOCIATED VALUE REPLACES THE OLD ONE. Otherwise a
 * new entry is added to the bucket and the total number of
 * entries of this extendible hashing structure is incremented
 * by one. In the latter case, if the insertion is not possible
 * for some reason (e.g., the bucket does not have enough space
 * for the new record, etc.) an appropriate error code is returned.
 */
static EHASH_RESULT
ehash_insert_to_bucket (THREAD_ENTRY * thread_p, EHID * ehid_p, VFID * ovf_file_p, bool is_temp,
			PAGE_PTR bucket_page_p, DB_TYPE key_type, void *key_p, OID * value_p,
			VPID * existing_ovf_vpid_p)
{
  char *bucket_record_p;
  RECDES bucket_recdes;
  RECDES old_bucket_recdes;
  RECDES log_recdes;
#if defined (ENABLE_UNUSED_FUNCTION)
  RECDES ovf_recdes;
  VPID ovf_vpid;
  char *record_p;
#endif
  char *log_record_p;

  PGSLOTID slot_no;
  int success;
  bool is_replaced_oid = false;

  /* Check if insertion is possible, or not */
  if (ehash_locate_slot (thread_p, bucket_page_p, key_type, key_p, &slot_no) == true)
    {
      /*
       * Key already exists. So, replace the associated value
       * MIGHT BE CHANGED to allow multiple values
       */
      (void) spage_get_record (thread_p, bucket_page_p, slot_no, &old_bucket_recdes, PEEK);
      bucket_record_p = (char *) old_bucket_recdes.data;

      /*
       * Store the original OID associated with the key before it is replaced
       * with the new OID value.
       */
      is_replaced_oid = true;

      if (ehash_allocate_recdes (&bucket_recdes, old_bucket_recdes.length, old_bucket_recdes.type) == NULL)
	{
	  return EHASH_ERROR_OCCURRED;
	}

      /* Keep a copy of the whole old record (for undo logging), then
       * overwrite the OID in place. */
      memcpy (bucket_recdes.data, bucket_record_p, bucket_recdes.length);
      (void) ehash_write_oid_to_record (bucket_record_p, value_p);
    }
  else
    {
      /*
       * Key does not exist in the bucket, so create a record for it and
       * insert it into the bucket;
       */

      if (ehash_compose_record (key_type, key_p, value_p, &bucket_recdes) != NO_ERROR)
	{
	  return EHASH_ERROR_OCCURRED;
	}

#if defined (ENABLE_UNUSED_FUNCTION)
      /*
       * If this is a long string produce the prefix key record and see if it
       * is going to fit into this page. If it fits, produce the overflow pages.
       * If it does not return failure with SP_BUCKET_FULL value.
       */
      if (bucket_recdes.type == REC_BIGONE)
	{
	  if (bucket_recdes.length > spage_max_space_for_new_record (thread_p, bucket_page_p))
	    {
	      ehash_free_recdes (&bucket_recdes);
	      return EHASH_BUCKET_FULL;
	    }

	  ovf_recdes.data = (char *) key_p + EHASH_LONG_STRING_PREFIX_SIZE;
	  ovf_recdes.area_size = strlen ((char *) key_p) + 1;
	  ovf_recdes.length = ovf_recdes.area_size;

	  /* Update the prefix key record; put overflow page id */
	  record_p = bucket_recdes.data;
	  record_p += sizeof (OID);	/* Skip the associated OID */

	  if (existing_ovf_vpid_p != NULL)
	    /*
	     * Overflow pages already exists for this key (i.e., we are
	     * undoing a deletion of a long string
	     * **** TODO: M2 Is this right ? ****
	     */
	    *(VPID *) record_p = *existing_ovf_vpid_p;
	  else
	    {
	      /* Create the overflow pages */
	      if (overflow_insert (thread_p, ovf_file_p, &ovf_vpid, &ovf_recdes, NULL) == NULL)
		{
		  /*
		   * overflow pages creation failed; do not insert the prefix
		   * key record to the bucket; return with error
		   */
		  ehash_free_recdes (&bucket_recdes);
		  return EHASH_ERROR_OCCURRED;
		}
	      *(VPID *) record_p = ovf_vpid;
	    }
	}
#endif

      /* Try to put the record to the slotted page */
      success = spage_insert_at (thread_p, bucket_page_p, slot_no, &bucket_recdes);
      if (success != SP_SUCCESS)
	{
	  /* Problem the record was not inserted to the page */
	  ehash_free_recdes (&bucket_recdes);

#if defined (ENABLE_UNUSED_FUNCTION)
	  if (bucket_recdes.type == REC_BIGONE && existing_ovf_vpid_p == NULL)
	    {
	      /* if overflow pages has just been created delete them */
	      (void) overflow_delete (thread_p, ovf_file_p, &ovf_vpid);
	    }
#endif

	  if (success == SP_DOESNT_FIT)
	    {
	      /* There is not enough space on the slotted page for the new record */
	      return EHASH_BUCKET_FULL;
	    }
	  else
	    {
	      er_set (ER_FATAL_ERROR_SEVERITY, ARG_FILE_LINE, ER_GENERIC_ERROR, 0);
	      return EHASH_ERROR_OCCURRED;
	    }
	}
    }

  /***********************************
      Log this insertion operation
   ***********************************/

  /* Add the "ehid" to the record for no-page operation logging */
  if (ehash_allocate_recdes (&log_recdes, bucket_recdes.length + sizeof (EHID), REC_HOME) == NULL)
    {
      /* Free the bucket record copy before bailing out; returning here
       * without freeing it would leak the buffer allocated above. */
      ehash_free_recdes (&bucket_recdes);
      return EHASH_ERROR_OCCURRED;
    }

  log_record_p = log_recdes.data;

  /* First insert the Ext. Hashing identifier */
  log_record_p = ehash_write_ehid_to_record (log_record_p, ehid_p);

  /* Copy (the assoc-value, key) pair from the bucket record */
  memcpy (log_record_p, bucket_recdes.data, bucket_recdes.length);

  if (!is_temp)
    {
      if (is_replaced_oid)
	{
	  /*
	   * This insertion has actully replaced the original oid. The undo
	   * logging should cause this original oid to be restored in the record.
	   * The undo recovery function "eh_rvundo_delete" with the original oid
	   * as parameter will do this.
	   */
	  log_append_undo_data2 (thread_p, RVEH_DELETE, &ehid_p->vfid, NULL, bucket_recdes.type, log_recdes.length,
				 log_recdes.data);
	}
      else
	{
	  /* insertion */
	  log_append_undo_data2 (thread_p, RVEH_INSERT, &ehid_p->vfid, NULL, bucket_recdes.type, log_recdes.length,
				 log_recdes.data);
	}
    }

  if (is_replaced_oid)
    {
      /*
       * This insertion has actually replaced the original oid. The redo logging
       * should cause this new oid to be written at its current physical
       * location.
       */
      if (!is_temp)
	{
	  log_append_redo_data2 (thread_p, RVEH_REPLACE, &ehid_p->vfid, bucket_page_p,
				 (int) (old_bucket_recdes.data - bucket_page_p), sizeof (OID), old_bucket_recdes.data);
	}
    }
  else
    {
      /* Store the rec_type as "short" to avoid alignment problems */
      log_recdes.area_size = log_recdes.length = bucket_recdes.length + sizeof (short);

      /*
       * If undo logging was done the log_recdes.data should have enough space
       * for the redo logging since the redo log record is shorter than
       * the undo log record. Otherwise, allocate space for redo log record.
       */
      if (log_recdes.data == NULL)
	{
	  if (ehash_allocate_recdes (&log_recdes, log_recdes.length, REC_HOME) == NULL)
	    {
	      /* Same leak hazard as above: release the bucket record copy. */
	      ehash_free_recdes (&bucket_recdes);
	      return EHASH_ERROR_OCCURRED;
	    }
	}
      log_record_p = log_recdes.data;

      /* First insert the key_type identifier */

      *(short *) log_record_p = bucket_recdes.type;
      log_record_p += sizeof (short);

      /* Copy (the assoc-value, key) pair from the bucket record */
      memcpy (log_record_p, bucket_recdes.data, bucket_recdes.length);

      if (!is_temp)
	{
	  log_append_redo_data2 (thread_p, RVEH_INSERT, &ehid_p->vfid, bucket_page_p, slot_no, log_recdes.length,
				 log_recdes.data);
	}
    }

  if (bucket_recdes.data)
    {
      ehash_free_recdes (&bucket_recdes);
    }

  if (log_recdes.data)
    {
      ehash_free_recdes (&log_recdes);
    }

  pgbuf_set_dirty (thread_p, bucket_page_p, DONT_FREE);
  return EHASH_SUCCESSFUL_COMPLETION;
}

/*
 * ehash_write_key_to_record () - Write an (assoc OID, key) pair into a record body
 *   return: NO_ERROR, or ER_EH_CORRUPTED if key_type is not supported
 *   recdes_p(in/out): record descriptor whose data area receives the pair;
 *                     freed on error
 *   key_type(in): type of the key
 *   key_p(in): pointer to the key value
 *   key_size(in): size of the key in bytes (strings: includes the '\0')
 *   value_p(in): associated OID value, written at the start of the record
 *   is_long_str(in): true if the key is a long string stored via overflow pages
 */
static int
ehash_write_key_to_record (RECDES * recdes_p, DB_TYPE key_type, void *key_p, short key_size, OID * value_p,
			   bool is_long_str)
{
  char *out_p;

  /* Long string keys are stored as REC_BIGONE prefix records; everything else fits in REC_HOME. */
  recdes_p->type = is_long_str ? REC_BIGONE : REC_HOME;

  /* Record layout is (associated OID, key value); the OID goes first. */
  out_p = ehash_write_oid_to_record (recdes_p->data, value_p);

  /* TODO: M2 64-bit ??? Is this really needed... Can we just move bytes.. we have the size of the key.. AT least all
   * data types but string_key */
  switch (key_type)
    {
    case DB_TYPE_STRING:
#if defined (ENABLE_UNUSED_FUNCTION)
      if (is_long_str)
	{
	  /* Only the key prefix is stored here; leave room for the overflow
	   * page id, which the caller fills in afterwards. */
	  out_p += sizeof (VPID);
	}
#endif
      /* key_size already accounts for the terminating '\0'. */
      memcpy (out_p, (char *) key_p, key_size);
      break;

    case DB_TYPE_OBJECT:
      memcpy (out_p, key_p, sizeof (OID));
      break;
#if defined (ENABLE_UNUSED_FUNCTION)
    case DB_TYPE_DOUBLE:
      OR_MOVE_DOUBLE (key_p, out_p);
      break;

    case DB_TYPE_FLOAT:
      *(float *) out_p = *(float *) key_p;
      break;

    case DB_TYPE_INTEGER:
      *(int *) out_p = *(int *) key_p;
      break;

    case DB_TYPE_BIGINT:
      *(DB_BIGINT *) out_p = *(DB_BIGINT *) key_p;
      break;

    case DB_TYPE_SHORT:
      *(short *) out_p = *(short *) key_p;
      break;

    case DB_TYPE_DATE:
      *(DB_DATE *) out_p = *(DB_DATE *) key_p;
      break;

    case DB_TYPE_TIME:
      *(DB_TIME *) out_p = *(DB_TIME *) key_p;
      break;

    case DB_TYPE_TIMESTAMP:
      *(DB_UTIME *) out_p = *(DB_UTIME *) key_p;
      break;

    case DB_TYPE_DATETIME:
      *(DB_DATETIME *) out_p = *(DB_DATETIME *) key_p;
      break;

    case DB_TYPE_MONETARY:
      OR_MOVE_DOUBLE (&((DB_MONETARY *) key_p)->amount, out_p);
      break;
#endif
    default:
      /* Unspecified key type: Directory header has been corrupted */
      er_set (ER_FATAL_ERROR_SEVERITY, ARG_FILE_LINE, ER_EH_CORRUPTED, 0);
      ehash_free_recdes (recdes_p);
      return ER_EH_CORRUPTED;
    }

  return NO_ERROR;
}

/*
 * ehash_compose_record () -
 *   return: NO_ERROR, or ER_FAILED
 *   key_type(in): type of the key
 *   key_ptr(in): Pointer to the key
 *   value_ptr(in): Pointer to the associated value
 *   recdes(in): Pointer to the Record descriptor to fill in
 *
 * Note: Builds a record holding the (associated_value, key_value) pair, in
 * that order, and sets the fields of the given record descriptor to
 * describe it.  Such records are used to insert entries into the
 * bucket pages; records additionally prefixed with "ehid" information
 * (not produced here) are used for logging.  The dynamic memory area
 * allocated by this function for the record must be freed by the
 * caller.
 *
 * If the key is of string type and it is > EHASH_MAX_STRING_SIZE, only
 * a prefix of the key is included; the caller is responsible for the
 * overflow part.
 */
static int
ehash_compose_record (DB_TYPE key_type, void *key_p, OID * value_p, RECDES * recdes_p)
{
  bool is_long_str = false;
  short key_size;
  int record_size;

  if (key_type != DB_TYPE_STRING)
    {
      /* Fixed-size key: size is determined by the type alone. */
      key_size = ehash_get_key_size (key_type);
      if (key_size < 0)
	{
	  return ER_FAILED;
	}
      record_size = sizeof (OID) + key_size;
    }
  else
    {
      key_size = (short) strlen ((char *) key_p) + 1;	/* Plus one is for \0 */

      /* max length of class name is 255 */
      assert (key_size <= 256);

#if defined (ENABLE_UNUSED_FUNCTION)
      /* Check if string is too long */
      if (key_size > EHASH_MAX_STRING_SIZE)
	{
	  is_long_str = true;
	  key_size = EHASH_LONG_STRING_PREFIX_SIZE;
	}
#endif

      record_size = sizeof (OID) + key_size;

#if defined (ENABLE_UNUSED_FUNCTION)
      /* If an overflow page is needed, reserve space for its identifier */
      if (is_long_str)
	{
	  record_size += sizeof (VPID);
	}
#endif
    }

  if (ehash_allocate_recdes (recdes_p, record_size, REC_HOME) == NULL)
    {
      return ER_FAILED;
    }

  return ehash_write_key_to_record (recdes_p, key_type, key_p, key_size, value_p, is_long_str);
}

/*
 * ehash_compare_key () - Compare a search key with the key stored in a bucket record
 *   return: NO_ERROR, ER_EH_CORRUPTED (unsupported key type), or ER_FAILED
 *   bucket_record_p(in): pointer to the key portion of a bucket record (the
 *                        caller has already skipped the associated OID)
 *   key_type(in): type of the keys
 *   key_p(in): pointer to the search key value
 *   record_type(in): slotted-page record type; REC_BIGONE marks a long-string
 *                    record whose tail lives on an overflow page
 *   out_compare_result_p(out): strcmp-style result (<0, 0, >0); set only on
 *                              success
 */
static int
ehash_compare_key (THREAD_ENTRY * thread_p, char *bucket_record_p, DB_TYPE key_type, void *key_p, INT16 record_type,
		   int *out_compare_result_p)
{
  int compare_result;
#if defined (ENABLE_UNUSED_FUNCTION)
  VPID *ovf_vpid_p;
  double d1, d2;
  float f1, f2;
  DB_DATETIME *dt1, *dt2;
  DB_BIGINT bi1, bi2;
#endif

  switch (key_type)
    {
    case DB_TYPE_STRING:
#if defined (ENABLE_UNUSED_FUNCTION)
      if (record_type == REC_BIGONE)
	{
	  /* bucket record is a LOG key string */
	  /* The record starts with the overflow page id, followed by the key prefix. */
	  ovf_vpid_p = (VPID *) bucket_record_p;
	  bucket_record_p += sizeof (VPID);

	  compare_result = ehash_ansi_sql_strncmp ((char *) key_p, bucket_record_p, EHASH_LONG_STRING_PREFIX_SIZE);
	  if (!compare_result)
	    {
	      /*
	       * The prefix of the bucket string matches with the given key.
	       * It is very likely that the whole key will match. So, retrive
	       * the chopped portion of the bucket string and compare it with
	       * the chopped portion of the key.
	       */
	      if (ehash_compare_overflow (thread_p, ovf_vpid_p, (char *) key_p + EHASH_LONG_STRING_PREFIX_SIZE,
					  &compare_result) != NO_ERROR)
		{
		  return ER_FAILED;
		}
	    }
	}
      else
#endif
	{
	  compare_result = ansisql_strcmp ((char *) key_p, bucket_record_p);
	}
      break;

    case DB_TYPE_OBJECT:
      compare_result = oid_compare ((OID *) key_p, (OID *) bucket_record_p);
      break;
#if defined (ENABLE_UNUSED_FUNCTION)
    case DB_TYPE_DOUBLE:
      /* OR_MOVE_DOUBLE copies byte-wise, so unaligned record data is safe to read. */
      OR_MOVE_DOUBLE (key_p, &d1);
      OR_MOVE_DOUBLE (bucket_record_p, &d2);

      if (d1 == d2)
	{
	  compare_result = 0;
	}
      else if (d1 > d2)
	{
	  compare_result = 1;
	}
      else
	{
	  compare_result = -1;
	}
      break;

    case DB_TYPE_FLOAT:
      f1 = *((float *) key_p);
      f2 = *((float *) bucket_record_p);

      if (f1 == f2)
	{
	  compare_result = 0;
	}
      else if (f1 > f2)
	{
	  compare_result = 1;
	}
      else
	{
	  compare_result = -1;
	}
      break;

    case DB_TYPE_INTEGER:
      /* NOTE(review): subtraction can overflow for operands of opposite sign;
       * only the sign of the result is used, so this presumes key ranges stay
       * small enough — confirm if this path is ever re-enabled. */
      compare_result = *(int *) key_p - *(int *) bucket_record_p;
      break;

    case DB_TYPE_BIGINT:
      bi1 = *((DB_BIGINT *) key_p);
      bi2 = *((DB_BIGINT *) bucket_record_p);

      if (bi1 == bi2)
	{
	  compare_result = 0;
	}
      else if (bi1 > bi2)
	{
	  compare_result = 1;
	}
      else
	{
	  compare_result = -1;
	}
      break;

    case DB_TYPE_SHORT:
      compare_result = *(short *) key_p - *(short *) bucket_record_p;
      break;

    case DB_TYPE_DATE:
      compare_result = *(DB_DATE *) key_p - *(DB_DATE *) bucket_record_p;
      break;

    case DB_TYPE_TIME:
      compare_result = *(DB_TIME *) key_p - *(DB_TIME *) bucket_record_p;
      break;

    case DB_TYPE_TIMESTAMP:
      compare_result = *(DB_UTIME *) key_p - *(DB_UTIME *) bucket_record_p;
      break;

    case DB_TYPE_DATETIME:
      /* Order by date first, then by time within the same date. */
      dt1 = (DB_DATETIME *) key_p;
      dt2 = (DB_DATETIME *) bucket_record_p;

      if (dt1->date < dt2->date)
	{
	  compare_result = -1;
	}
      else if (dt1->date > dt2->date)
	{
	  compare_result = 1;
	}
      else if (dt1->time < dt2->time)
	{
	  compare_result = -1;
	}
      else if (dt1->time > dt2->time)
	{
	  compare_result = 1;
	}
      else
	{
	  compare_result = 0;
	}
      break;

    case DB_TYPE_MONETARY:
      OR_MOVE_DOUBLE (&((DB_MONETARY *) key_p)->amount, &d1);
      OR_MOVE_DOUBLE (&((DB_MONETARY *) bucket_record_p)->amount, &d2);

      if (d1 == d2)
	{
	  compare_result = 0;
	}
      else if (d1 > d2)
	{
	  compare_result = 1;
	}
      else
	{
	  compare_result = -1;
	}
      break;
#endif
    default:
      /* Unspecified key type: Directory header has been corrupted */
      er_set (ER_FATAL_ERROR_SEVERITY, ARG_FILE_LINE, ER_EH_CORRUPTED, 0);
      return ER_EH_CORRUPTED;
    }

  *out_compare_result_p = compare_result;
  return NO_ERROR;
}

/*
 * ehash_binary_search_bucket () - Binary-search a bucket page for a key
 *   return: true if the key was found (position set to its slot), false
 *           otherwise (position set to the slot where it would be inserted,
 *           or unset on a fetch/compare error)
 *   bucket_page_p(in): bucket page to search; records are sorted by key
 *   num_record(in): number of key records (slot 0, the header, is excluded);
 *                   the caller guarantees num_record >= 1
 *   key_type(in): type of the keys
 *   key_p(in): key value to look for
 *   out_position_p(out): slot id of the key, or of its insertion point
 */
static bool
ehash_binary_search_bucket (THREAD_ENTRY * thread_p, PAGE_PTR bucket_page_p, PGSLOTID num_record, DB_TYPE key_type,
			    void *key_p, PGSLOTID * out_position_p)
{
  RECDES peek_recdes;
  char *stored_key_p;
  PGSLOTID lo, hi, mid;
  int cmp;

  lo = 1;			/* slot 0 is the bucket header */
  hi = num_record;

  do
    {
      mid = (lo + hi) >> 1;

      if (spage_get_record (thread_p, bucket_page_p, mid, &peek_recdes, PEEK) != S_SUCCESS)
	{
	  er_set (ER_FATAL_ERROR_SEVERITY, ARG_FILE_LINE, ER_EH_CORRUPTED, 0);
	  return false;
	}

      /* The key follows the associated OID at the start of the record. */
      stored_key_p = (char *) peek_recdes.data + sizeof (OID);

      if (ehash_compare_key (thread_p, stored_key_p, key_type, key_p, peek_recdes.type, &cmp) != NO_ERROR)
	{
	  return false;
	}

      if (cmp == 0)
	{
	  *out_position_p = mid;
	  return true;
	}
      else if (cmp < 0)
	{
	  hi = mid - 1;
	}
      else
	{
	  lo = mid + 1;
	}
    }
  while (lo <= hi);

  /* Key is NOT in the bucket; report where it would be inserted relative to
   * the last probed slot. */
  *out_position_p = (hi < mid) ? mid : mid + 1;

  return false;
}

/*
 * ehash_locate_slot () - Locate the slot for the key
 *   return: true if the key exists in the bucket, false otherwise
 *   buc_pgptr(in): pointer to the bucket page
 *   key_type(in):
 *   key(in): key value to search
 *   position(out): set to the location of the slot that contains the key if
 *                  it exists, or that would contain the key if it does not.
 *
 * Note: Finds the slot number of a specific key within a bucket page.
 * The search is a binary search on the actual key values (an
 * alternative would be a secondary hash on the pseudo keys).  When
 * the key is absent, position is set to the SLOTID where it could be
 * inserted and false is returned.
 */
static bool
ehash_locate_slot (THREAD_ENTRY * thread_p, PAGE_PTR bucket_page_p, DB_TYPE key_type, void *key_p,
		   PGSLOTID * out_position_p)
{
  RECDES recdes;
  PGSLOTID first_slot_id = -1;
  PGSLOTID num_record;

  /* One slot holds the bucket header, so exclude it from the record count. */
  num_record = spage_number_of_records (bucket_page_p) - 1;

  if (num_record >= 1)
    {
      return ehash_binary_search_bucket (thread_p, bucket_page_p, num_record, key_type, key_p, out_position_p);
    }

  /* The bucket holds nothing but its header record; any key would be
   * inserted immediately after it. */
  if (spage_next_record (bucket_page_p, &first_slot_id, &recdes, PEEK) != S_SUCCESS)
    {
      er_set (ER_FATAL_ERROR_SEVERITY, ARG_FILE_LINE, ER_EH_CORRUPTED, 0);
      *out_position_p = 1;
      return false;
    }

  *out_position_p = first_slot_id + 1;
  return false;
}

/*
 * ehash_get_pseudo_key () - Compute the hash (pseudo) key of a bucket record
 *   return: NO_ERROR, or ER_FAILED
 *   recdes_p(in): bucket record holding an (associated OID, key) pair
 *   key_type(in): type of the stored key
 *   out_hash_key_p(out): set to the hash value of the record's key
 */
static int
ehash_get_pseudo_key (THREAD_ENTRY * thread_p, RECDES * recdes_p, DB_TYPE key_type, EHASH_HASH_KEY * out_hash_key_p)
{
  EHASH_HASH_KEY hash_key;
  char *bucket_record_p;
#if defined (ENABLE_UNUSED_FUNCTION)
  char *whole_key_p;
#endif

#if defined (ENABLE_UNUSED_FUNCTION)
  if (recdes_p->type == REC_BIGONE)
    {
      /*
       * The record contains a long string Compose the whole key and find
       * pseudo key by using the whole key
       */
      whole_key_p = ehash_compose_overflow (thread_p, recdes_p);
      if (whole_key_p == NULL)
	{
	  return ER_FAILED;
	}
      hash_key = ehash_hash (whole_key_p, key_type);
      free_and_init (whole_key_p);
    }
  else
#endif
    {
      /* Skip the associated OID at the start of the record; the key follows it. */
      bucket_record_p = (char *) recdes_p->data;
      bucket_record_p += sizeof (OID);
      hash_key = ehash_hash (bucket_record_p, key_type);
    }

  *out_hash_key_p = hash_key;
  return NO_ERROR;
}

/*
 * ehash_find_first_bit_position () - Find the new local depth for a splitting bucket
 *   return: NO_ERROR, ER_EH_CORRUPTED, or ER_FAILED
 *   dir_header_p(in): directory header (supplies the key type)
 *   bucket_page_p(in): bucket page that is being split
 *   bucket_header_p(in/out): bucket header; its local_depth is set to the new depth
 *   key_p(in): key being inserted (the one that triggered the split)
 *   num_recs(in): number of records in the bucket page (header slot included)
 *   first_slot_id(in): slot id of the bucket header record
 *   out_old_local_depth_p(in): old local depth (already set by the caller)
 *   out_new_local_depth_p(out): set to the new local depth
 *
 * Note: The new local depth is the most significant bit position (counting
 * from 1 at the MSB of the pseudo key) at which the inserted key's
 * pseudo key differs from some record already in the bucket, among the
 * bits below the current local depth.
 */
static int
ehash_find_first_bit_position (THREAD_ENTRY * thread_p, EHASH_DIR_HEADER * dir_header_p, PAGE_PTR bucket_page_p,
			       EHASH_BUCKET_HEADER * bucket_header_p, void *key_p, int num_recs, PGSLOTID first_slot_id,
			       int *out_old_local_depth_p, int *out_new_local_depth_p)
{
  int bit_position;
  unsigned int i;
  unsigned int difference = 0, check_bit = 0;
  EHASH_HASH_KEY first_hash_key, next_hash_key;
  PGSLOTID slot_id;
  RECDES recdes;

  /* Mask for the first bit below the current local depth (bit numbering
   * starts at the most significant bit of the pseudo key). */
  check_bit = 1 << (EHASH_HASH_KEY_BITS - bucket_header_p->local_depth - 1);

  /* Get the first pseudo key */
  first_hash_key = ehash_hash (key_p, dir_header_p->key_type);

  /* Accumulate (OR) the bit differences between the inserted key's pseudo key
   * and each record's pseudo key; stop early once a difference shows up at
   * the first candidate bit. */
  for (slot_id = first_slot_id + 1; slot_id < num_recs; slot_id++)
    {
      if (spage_get_record (thread_p, bucket_page_p, slot_id, &recdes, PEEK) != S_SUCCESS)
	{
	  er_set (ER_FATAL_ERROR_SEVERITY, ARG_FILE_LINE, ER_EH_CORRUPTED, 0);
	  return ER_EH_CORRUPTED;
	}

      if (ehash_get_pseudo_key (thread_p, &recdes, dir_header_p->key_type, &next_hash_key) != NO_ERROR)
	{
	  return ER_FAILED;
	}

      difference = (first_hash_key ^ next_hash_key) | difference;
      if (difference & check_bit)
	{
	  break;
	}
    }

  /* Initilize bit_position to one greater than the old local depth */
  bit_position = *out_old_local_depth_p + 1;

  /* Find out the correct bit_position that the keys differ; scan from the
   * first bit below the old local depth toward the least significant bit. */
  for (i = bucket_header_p->local_depth + 1; i <= EHASH_HASH_KEY_BITS; i++)
    {
      if (difference & check_bit)
	{
	  bit_position = i;
	  break;
	}

      /* Shift the check bit position to right */
      check_bit >>= 1;
    }

  /* Update the bucket header and the variable parameter to new local depth */
  bucket_header_p->local_depth = bit_position;
  *out_new_local_depth_p = bit_position;

  return NO_ERROR;
}

/*
 * ehash_distribute_records_into_two_bucket () - Redistribute records after a split
 *   return: NO_ERROR, or ER_FAILED
 *   dir_header_p(in): directory header (supplies the key type)
 *   bucket_page_p(in): bucket page being split; records moved out are deleted
 *   bucket_header_p(in): bucket header holding the NEW local depth
 *   num_recs(in): number of records in the bucket page (header slot included)
 *   first_slot_id(in): slot id of the bucket header record
 *   sibling_page_p(in): freshly allocated sibling bucket page
 *
 * Note: Each record whose pseudo key has the new local-depth bit set is
 * moved to the sibling page; the rest stay in the original bucket.
 */
static int
ehash_distribute_records_into_two_bucket (THREAD_ENTRY * thread_p, EHASH_DIR_HEADER * dir_header_p,
					  PAGE_PTR bucket_page_p, EHASH_BUCKET_HEADER * bucket_header_p, int num_recs,
					  PGSLOTID first_slot_id, PAGE_PTR sibling_page_p)
{
  PGSLOTID slot_id, sibling_slot_id;
  RECDES recdes;
  EHASH_HASH_KEY hash_key;
  int i, success;

  /* i counts the records examined; slot_id tracks the current slot, which
   * only advances when a record is kept (deleting a slot shifts the
   * following slots down into its place). */
  for (slot_id = i = first_slot_id + 1; i < num_recs; i++)
    {
      if (spage_get_record (thread_p, bucket_page_p, slot_id, &recdes, PEEK) != S_SUCCESS)
	{
	  return ER_FAILED;
	}

      if (ehash_get_pseudo_key (thread_p, &recdes, dir_header_p->key_type, &hash_key) != NO_ERROR)
	{
	  return ER_FAILED;
	}

      /* The bit at the new local depth decides which bucket the record belongs to. */
      if (GETBIT (hash_key, bucket_header_p->local_depth))
	{
	  success = spage_insert (thread_p, sibling_page_p, &recdes, &sibling_slot_id);
	  if (success != SP_SUCCESS)
	    {
#ifdef EHASH_DEBUG
	      er_log_debug (ARG_FILE_LINE,
			    "Unable to move the record from the bucket to empty sibling bucket. Slotted page module"
			    " refuses to insert a short size record to an empty page. This should never happen.");
#endif
	      er_set (ER_FATAL_ERROR_SEVERITY, ARG_FILE_LINE, ER_GENERIC_ERROR, 0);

	      return ER_FAILED;
	    }
	  (void) spage_delete (thread_p, bucket_page_p, slot_id);
	  /*
	   * slotid is not changed since the current slot is deleted and the
	   * next slot will take its slotid.
	   */
	}
      else
	{
	  /* skip the slot */
	  slot_id++;
	}
    }

  return NO_ERROR;
}

/*
 * ehash_split_bucket () - Split a bucket into two
 *   return: PAGE_PTR (Sibling Pageptr), NULL in case of error in split.
 *   dir_header(in): directory header;
 *   buc_pgptr(in): pointer to bucket page to split
 *   key(in): key value to insert after split
 *   old_local_depth(in): old local depth before the split operation; to be set
 *   new_local_depth(in): new local depth after the split operation; to be set
 *   sib_vpid(in): vpid of the sibling bucket; to be set
 *   is_temp(in): true if file is temporary (logging not required)
 *
 * Note: This function splits the given bucket into two buckets.
 * First, the new local depth of the bucket is found. Then a
 * sibling bucket is allocated, and the records are
 * redistributed. The contents of the sibling bucket is logged
 * for recovery purposes. (Note that the contents of the original
 * bucket page that is splitting is logged in the insertion
 * function after this function returns. Thus, this function
 * does not release the buffer that contains this page). Finally,
 * the given (key & assoc_value) pair is tried to be inserted
 * onto the correct bucket. The result of this attempt is
 * returned. (Note that this split may cause only one very
 * small record separated from the others, and the new record
 * may still be too big to fit into the bucket.) In addition to
 * the return value, three parameters are set so that the
 * directory can be updated to reflect this split.
 */
static PAGE_PTR
ehash_split_bucket (THREAD_ENTRY * thread_p, EHASH_DIR_HEADER * dir_header_p, PAGE_PTR bucket_page_p, void *key_p,
		    int *out_old_local_depth_p, int *out_new_local_depth_p, VPID * sibling_vpid_p, bool is_temp)
{
  EHASH_BUCKET_HEADER *bucket_header_p;
  VFID bucket_vfid;
  PAGE_PTR sibling_page_p;
  RECDES recdes;
  PGSLOTID first_slot_id = -1;
  int num_records;
  /* Arguments for the page-initialization callback: alignment, new local
   * depth, and the is_temp flag (see ehash_initialize_bucket_new_page). */
  char init_bucket_data[3];

  int error_code = NO_ERROR;

  if (!is_temp)
    {
      /* Log the contents of the bucket page before the split operation */
      log_append_undo_data2 (thread_p, RVEH_REPLACE, &dir_header_p->bucket_file, bucket_page_p, 0, DB_PAGESIZE,
			     bucket_page_p);
    }

  num_records = spage_number_of_records (bucket_page_p);
  /* Retrieve the bucket header and update it */
  if (spage_next_record (bucket_page_p, &first_slot_id, &recdes, PEEK) != S_SUCCESS)
    {
      er_set (ER_FATAL_ERROR_SEVERITY, ARG_FILE_LINE, ER_EH_CORRUPTED, 0);
      return NULL;
    }

  bucket_header_p = (EHASH_BUCKET_HEADER *) recdes.data;
  *out_old_local_depth_p = bucket_header_p->local_depth;

  /* Determine the new local depth (also updates the bucket header). */
  if (ehash_find_first_bit_position (thread_p, dir_header_p, bucket_page_p, bucket_header_p, key_p, num_records,
				     first_slot_id, out_old_local_depth_p, out_new_local_depth_p) != NO_ERROR)
    {
      return NULL;
    }

  bucket_vfid = dir_header_p->bucket_file;
  init_bucket_data[0] = dir_header_p->alignment;
  init_bucket_data[1] = *out_new_local_depth_p;
  init_bucket_data[2] = is_temp;

  /* Allocate and initialize the sibling bucket page. */
  error_code = file_alloc (thread_p, &bucket_vfid, ehash_initialize_bucket_new_page, init_bucket_data, sibling_vpid_p,
			   &sibling_page_p);
  if (error_code != NO_ERROR)
    {
      ASSERT_ERROR ();
      return NULL;
    }
  if (sibling_page_p == NULL)
    {
      assert_release (false);
      return NULL;
    }

  if (ehash_distribute_records_into_two_bucket (thread_p, dir_header_p, bucket_page_p, bucket_header_p, num_records,
						first_slot_id, sibling_page_p) != NO_ERROR)
    {
      /* Redistribution failed: release and deallocate the sibling page. */
      pgbuf_unfix_and_init (thread_p, sibling_page_p);
      if (file_dealloc (thread_p, &bucket_vfid, sibling_vpid_p, FILE_EXTENDIBLE_HASH) != NO_ERROR)
	{
	  ASSERT_ERROR ();
	}
      VPID_SET_NULL (sibling_vpid_p);
      return NULL;
    }

  if (!is_temp)
    {
      /*
       * TODO: We are logging too much. It is unlikely that the page is full.
       *       we just distribute the keys among two buckets. It may be better
       *       to use crumbs to log different portions of the page.
       */
      log_append_redo_data2 (thread_p, RVEH_REPLACE, &dir_header_p->bucket_file, bucket_page_p, 0, DB_PAGESIZE,
			     bucket_page_p);
      log_append_redo_data2 (thread_p, RVEH_REPLACE, &dir_header_p->bucket_file, sibling_page_p, 0, DB_PAGESIZE,
			     sibling_page_p);
    }

  pgbuf_set_dirty (thread_p, sibling_page_p, DONT_FREE);
  pgbuf_set_dirty (thread_p, bucket_page_p, DONT_FREE);

  /* The caller is responsible for unfixing the returned sibling page. */
  return sibling_page_p;
}

/*
 * ehash_expand_directory () - Expand directory
 *   return: NO_ERROR, or ER_FAILED
 *   ehid(in): identifier for the extendible hashing structure to expand
 *   new_depth(in): requested new depth
 *   is_temp(in): true if file is temporary (logging not required)
 *
 * Note: This function expands the given extendible hashing directory
 * as many times as necessary to attain the requested depth.
 * This is performed with a single pass on the original
 * directory (starting from the last pointer and working through
 * the first one).
 * During this operation, the directory is locked with X lock to
 * prevent others from accessing wrong information. If the
 * original directory is not large enough some new pages are
 * allocated for the new directory. If the disk becomes full
 * during this operation, or any system error occurs, the
 * directory expansion is canceled and an error code is
 * returned. For recovery purposes, the contents of the
 * directory pages are logged during the expansion operation.
 */
static int
ehash_expand_directory (THREAD_ENTRY * thread_p, EHID * ehid_p, int new_depth, bool is_temp)
{
  int old_pages;
  int old_ptrs;
  int new_pages;
  int new_ptrs;
  int check_pages;

  PAGE_PTR dir_header_page_p;
  EHASH_DIR_HEADER *dir_header_p;
  int end_offset;
  int new_end_offset;
  int needed_pages;
  int i, j;
  int exp_times;

  PAGE_PTR old_dir_page_p;
  PGLENGTH old_dir_offset;
  int old_dir_nth_page;

  PAGE_PTR new_dir_page_p;
  PGLENGTH new_dir_offset;
  int new_dir_nth_page;

  int error_code = NO_ERROR;

  dir_header_page_p = ehash_fix_ehid_page (thread_p, ehid_p, PGBUF_LATCH_WRITE);
  if (dir_header_page_p == NULL)
    {
      return ER_FAILED;
    }
  dir_header_p = (EHASH_DIR_HEADER *) dir_header_page_p;

  error_code = file_get_num_user_pages (thread_p, &ehid_p->vfid, &old_pages);
  if (error_code != NO_ERROR)
    {
      ASSERT_ERROR ();
      pgbuf_unfix_and_init (thread_p, dir_header_page_p);
      return error_code;
    }
  old_pages -= 1;		/* The first page starts with 0 */
  old_ptrs = 1 << dir_header_p->depth;
  exp_times = 1 << (new_depth - dir_header_p->depth);

  new_ptrs = old_ptrs * exp_times;
  end_offset = old_ptrs - 1;	/* Dir first pointer has an offset of 0 */
  ehash_dir_locate (&check_pages, &end_offset);

#ifdef EHASH_DEBUG
  if (check_pages != old_pages)
    {
      pgbuf_unfix_and_init (thread_p, dir_header_page_p);
      er_set (ER_FATAL_ERROR_SEVERITY, ARG_FILE_LINE, ER_EH_ROOT_CORRUPTED, 3, ehid_p->vfid.volid, ehid_p->vfid.fileid,
	      ehid_p->pageid);
      return ER_FAILED;
    }
#endif

  /* Find how many pages the expanded directory will occupy */
  new_end_offset = new_ptrs - 1;
  ehash_dir_locate (&new_pages, &new_end_offset);
  needed_pages = new_pages - old_pages;
  if (needed_pages > 0)
    {
      error_code = file_alloc_multiple (thread_p, &ehid_p->vfid, ehash_initialize_dir_new_page, &is_temp, needed_pages,
					NULL);
      if (error_code != NO_ERROR)
	{
	  ASSERT_ERROR ();
	  pgbuf_unfix_and_init (thread_p, dir_header_page_p);
	  return error_code;
	}
    }

  /*****************************
     Perform expansion
   ******************************/

  /* The copy proceeds backwards (from the last pointer toward the first) so
   * that source entries are never overwritten before they are read. */

  /* Initialize source variables */
  old_dir_nth_page = old_pages;	/* The last page of the old directory */

  old_dir_page_p = ehash_fix_nth_page (thread_p, &ehid_p->vfid, old_dir_nth_page, PGBUF_LATCH_WRITE);
  if (old_dir_page_p == NULL)
    {
      pgbuf_unfix_and_init (thread_p, dir_header_page_p);
      return ER_FAILED;
    }
  old_dir_offset = end_offset;

  /* Initialize destination variables */
  new_dir_nth_page = new_pages;	/* The last page of the new directory */

  new_dir_page_p = ehash_fix_nth_page (thread_p, &ehid_p->vfid, new_dir_nth_page, PGBUF_LATCH_WRITE);
  if (new_dir_page_p == NULL)
    {
      pgbuf_unfix_and_init (thread_p, dir_header_page_p);
      pgbuf_unfix_and_init (thread_p, old_dir_page_p);
      return ER_FAILED;
    }
  new_dir_offset = new_end_offset;

  if (new_dir_nth_page <= old_pages && !is_temp)
    {
      /*
       * This page is part of old directory.
       * Log the initial content of this original directory page
       */
      log_append_undo_data2 (thread_p, RVEH_REPLACE, &ehid_p->vfid, new_dir_page_p, 0, DB_PAGESIZE, new_dir_page_p);
    }

  /* Copy old directory records to new (expanded) area */
  for (j = old_ptrs; j > 0; j--)
    {
      if (old_dir_offset < 0)
	{
	  /*
	   * We reached the end of the old directory page.
	   * The next bucket pointer is in the previous directory page
	   */
	  pgbuf_unfix_and_init (thread_p, old_dir_page_p);

	  /* get another page */
	  old_dir_nth_page--;

	  old_dir_page_p = ehash_fix_nth_page (thread_p, &ehid_p->vfid, old_dir_nth_page, PGBUF_LATCH_WRITE);
	  if (old_dir_page_p == NULL)
	    {
	      /* Fetch error; so return */
	      pgbuf_unfix_and_init (thread_p, dir_header_page_p);
	      pgbuf_unfix_and_init (thread_p, new_dir_page_p);
	      return ER_FAILED;
	    }

	  /*
	   * Set the olddir_offset to the offset of the last pointer in the
	   * current source directory page.
	   */
	  if (old_dir_nth_page)
	    {
	      /* This is not the first (root) directory page */
	      old_dir_offset = EHASH_LAST_OFFSET_IN_NON_FIRST_PAGE;
	    }
	  else
	    {
	      /* This is the first (root) directory page; it also holds the
	       * directory header, so fewer pointers fit. */
	      old_dir_offset = EHASH_LAST_OFFSET_IN_FIRST_PAGE;
	    }
	}

      /* Copy the next directory record "exp_times" times */
      for (i = 1; i <= exp_times; i++)
	{
	  if (new_dir_offset < 0)
	    {
	      /*
	       * There is not any more entries in the new directory page.
	       * Log this updated directory page, and get next one
	       */
	      if (!is_temp)
		{
		  log_append_redo_data2 (thread_p, RVEH_REPLACE, &ehid_p->vfid, new_dir_page_p, 0, DB_PAGESIZE,
					 new_dir_page_p);
		}
	      pgbuf_set_dirty (thread_p, new_dir_page_p, FREE);

	      /* get another page */
	      new_dir_nth_page--;

	      new_dir_page_p = ehash_fix_nth_page (thread_p, &ehid_p->vfid, new_dir_nth_page, PGBUF_LATCH_WRITE);
	      if (new_dir_page_p == NULL)
		{
		  pgbuf_unfix_and_init (thread_p, dir_header_page_p);
		  pgbuf_unfix_and_init (thread_p, old_dir_page_p);
		  return ER_FAILED;
		}

	      if (new_dir_nth_page <= old_pages && !is_temp)
		{
		  /* Log the initial content of this original directory page */
		  log_append_undo_data2 (thread_p, RVEH_REPLACE, &ehid_p->vfid, new_dir_page_p, 0, DB_PAGESIZE,
					 new_dir_page_p);
		}

	      /* Set the newdir_offset to the offset of the last pointer in the current destination directory page. */
	      if (new_dir_nth_page != 0)
		{
		  /* This is not the first (root) dir page */
		  new_dir_offset = EHASH_LAST_OFFSET_IN_NON_FIRST_PAGE;
		}
	      else
		{
		  /* This is the first (root) dir page */
		  new_dir_offset = EHASH_LAST_OFFSET_IN_FIRST_PAGE;
		}
	    }

	  /* Skip the copy when source and destination coincide (possible for
	   * the pointers that stay on the same page at the same offset). */
	  if ((char *) new_dir_page_p + new_dir_offset != (char *) old_dir_page_p + old_dir_offset)
	    {
	      memcpy ((char *) new_dir_page_p + new_dir_offset, (char *) old_dir_page_p + old_dir_offset,
		      sizeof (EHASH_DIR_RECORD));
	    }

	  /* Advance the destination pointer to new spot */
	  new_dir_offset -= sizeof (EHASH_DIR_RECORD);
	}

      /* Advance the source pointer to new spot */
      old_dir_offset -= sizeof (EHASH_DIR_RECORD);
    }

  /*
   * Update the directory header
   */
  dir_header_p->depth = new_depth;

  if (!is_temp)
    {
      /* Log the first directory page which also contains the directory header */
      log_append_redo_data2 (thread_p, RVEH_REPLACE, &ehid_p->vfid, new_dir_page_p, 0, DB_PAGESIZE, new_dir_page_p);
    }

  /* Release the old and new directory pages */
  pgbuf_unfix_and_init (thread_p, old_dir_page_p);
  pgbuf_set_dirty (thread_p, new_dir_page_p, FREE);

#ifdef EHASH_DEBUG
  {
    DKNPAGES npages_bucket = 0;
    DKNPAGES npages_ehash = 0;
    if (file_get_num_user_pages (thread_p, &dir_header_p->bucket_file, &npages_bucket) == NO_ERROR
	&& file_get_num_user_pages (thread_p, &ehid_p->vfid, &npages_ehash) == NO_ERROR)
      {
	/* Bug fix: this check previously divided by the undeclared variable
	 * "npages" (a compile error under EHASH_DEBUG).  EHASH_BALANCE_FACTOR
	 * is the threshold of directory pointers over bucket pages, so the
	 * divisor is npages_bucket; guard against division by zero. */
	if ((npages_bucket > 0 && new_ptrs / npages_bucket > EHASH_BALANCE_FACTOR) || npages_ehash > npages_bucket)
	  {
	    er_log_debug (ARG_FILE_LINE,
			  "WARNING: Ext. Hash EHID=%d|%d|%d is unbalanced.\n Num_bucket_pages = %d, "
			  "num_dir_pages = %d,\n num_dir_pointers = %d, directory_depth = %d.\n"
			  " You may want to consider another hash function.\n", ehid_p->vfid.volid,
			  ehid_p->vfid.fileid, ehid_p->pageid, npages_bucket, npages_ehash,
			  new_ptrs, dir_header_p->depth);
	  }
      }
  }
#endif /* EHASH_DEBUG */

  /* Release the root page holding the directory header */
  pgbuf_unfix_and_init (thread_p, dir_header_page_p);

  return NO_ERROR;
}

/*
 * ehash_connect_bucket () - Connect bucket to directory
 *   return: int NO_ERROR, or ER_FAILED
 *   ehid(in): identifier for the extendible hashing structure
 *   local_depth(in): local depth of the bucket; determines how many directory
 *                    pointers are set to point to the given bucket identifier
 *   hash_key(in): pseudo key that led to the bucket page; determines which
 *                   pointers to be updated
 *   bucket_vpid(in): Identifier of the bucket page to connect
 *   is_temp(in): true if temporary (logging is not required)
 *
 * Note: This function connects the given bucket to the directory of
 * specified extendible hashing structure. All the directory
 * pointers whose number have the same value as the hash_key
 * for the first "local_depth" bits are updated with "bucket_vpid".
 * Since these pointers are successive to each other, it is
 * known that a contiguous area (possibly over several pages)
 * on the directory will be updated. Thus, this function is
 * implemented in the following way: First, the directory pages
 * to be updated are determined. Then, these pages are retrieved,
 * updated and released one at a time. For recovery purposes, the
 * contents of these pages are logged before and after they are updated.
 */
static int
ehash_connect_bucket (THREAD_ENTRY * thread_p, EHID * ehid_p, int local_depth, EHASH_HASH_KEY hash_key,
	      VPID * bucket_vpid_p, bool is_temp)
{
  EHASH_DIR_HEADER dir_header;
  EHASH_DIR_RECORD *dir_record_p;
  EHASH_REPETITION repetition;
  PAGE_PTR page_p;
  int location;
  int first_page, last_page;
  int first_ptr_offset, last_ptr_offset;
  int start_offset, end_offset;
  int diff;
  int i, j;
  int bef_length;
  unsigned int set_bits;
  unsigned int clear_bits;

  repetition.vpid = *bucket_vpid_p;

  /* Copy the directory header out of the root page so the latch can be
   * released before the (possibly multi-page) update loop below. */
  page_p = ehash_fix_ehid_page (thread_p, ehid_p, PGBUF_LATCH_READ);
  if (page_p == NULL)
    {
      return ER_FAILED;
    }

  memcpy (&dir_header, page_p, EHASH_DIR_HEADER_SIZE);
  pgbuf_unfix_and_init (thread_p, page_p);

  /* First find out how many page entries will be updated in the directory */
  location = GETBITS (hash_key, 1, dir_header.depth);

  diff = dir_header.depth - local_depth;
  if (diff != 0)
    {
      /* There are more than one pointers that need to be updated */
      /* Build a mask of "diff" low-order 1-bits; the pointers to update are
       * exactly those whose index shares the top local_depth bits of
       * "location" (from index location&~mask to location|mask). */
      for (set_bits = 0, i = 0; i < diff; i++)
	{
	  set_bits = 2 * set_bits + 1;
	}
      clear_bits = ~set_bits;

      first_ptr_offset = location & clear_bits;
      last_ptr_offset = location | set_bits;

      /* Translate directory indexes into (page number, in-page offset) pairs */
      ehash_dir_locate (&first_page, &first_ptr_offset);
      ehash_dir_locate (&last_page, &last_ptr_offset);
    }
  else
    {
      /* There is only one pointer that needs to be updated */
      first_ptr_offset = location;
      ehash_dir_locate (&first_page, &first_ptr_offset);
      last_page = first_page;
      last_ptr_offset = first_ptr_offset;
    }

  /* Go over all of these pages */
  for (i = first_page; i <= last_page; i++)
    {
      page_p = ehash_fix_nth_page (thread_p, &ehid_p->vfid, i, PGBUF_LATCH_WRITE);
      if (page_p == NULL)
	{
	  return ER_FAILED;
	}

      /* The first and last pages may be updated only partially; any page in
       * between is rewritten from offset 0 to DB_PAGESIZE. */
      if (i == first_page)
	{
	  start_offset = first_ptr_offset;
	}
      else
	{
	  start_offset = 0;
	}

      if (i == last_page)
	{
	  end_offset = last_ptr_offset + sizeof (EHASH_DIR_RECORD);
	}
      else
	{
	  end_offset = DB_PAGESIZE;
	}

      /* Log this page */
      bef_length = end_offset - start_offset;
      repetition.count = bef_length / sizeof (EHASH_DIR_RECORD);

      if (!is_temp)
	{
	  /* Undo image is the raw bytes being overwritten; redo image is the
	   * compact (vpid, count) repetition record. */
	  log_append_undoredo_data2 (thread_p, RVEH_CONNECT_BUCKET, &ehid_p->vfid, page_p, start_offset, bef_length,
				     sizeof (EHASH_REPETITION), (char *) page_p + start_offset, &repetition);
	}

      /* Update the directory page */
      dir_record_p = (EHASH_DIR_RECORD *) ((char *) page_p + start_offset);
      for (j = 0; j < repetition.count; j++)
	{
	  dir_record_p->bucket_vpid = *bucket_vpid_p;
	  dir_record_p++;
	}
      pgbuf_set_dirty (thread_p, page_p, FREE);
    }

  return NO_ERROR;
}

/*
 * ehash_find_depth () - Find depth
 *   return: char (will return 0 in case of error)
 *   ehid(in): identifier for the extendible hashing structure
 *   location(in): directory entry whose neighboring area should be checked
 *   bucket_vpid(in): vpid of the bucket pointed by the specified directory entry
 *   sib_vpid(in): vpid of the sibling bucket
 *
 * Note: Scans growing neighborhoods of "location" in the directory and
 * returns the largest depth (minus one) for which every inspected
 * pointer refers to the given bucket, its sibling, or is NULL.
 */
static char
ehash_find_depth (THREAD_ENTRY * thread_p, EHID * ehid_p, int location, VPID * bucket_vpid_p, VPID * sibling_vpid_p)
{
  EHASH_DIR_HEADER dir_header;
  EHASH_DIR_RECORD *dir_record_p;
  PAGE_PTR page_p;
  int loc;
  int rel_loc;
  int iterations;
  int page_no;
  int prev_page_no;
  int i;
  unsigned int clear;
  char check_depth = 2;
  bool is_stop = false;
  int check_bit;

  prev_page_no = 0;
  page_p = ehash_fix_ehid_page (thread_p, ehid_p, PGBUF_LATCH_WRITE);
  if (page_p == NULL)
    {
      return 0;
    }
  memcpy (&dir_header, page_p, EHASH_DIR_HEADER_SIZE);

  while ((check_depth <= dir_header.depth) && !is_stop)
    {
      /*
       * Find the base location for this iteration. The base location differs from
       * the original location at the check_depth bit (it has the opposite value
       * for this bit position) and at the remaining least significant bit
       * positions (it has 0 for these bit positions).
       */
      /* Build a mask of (check_depth - 1) low-order 1-bits, then clear those
       * bits of "location". */
      clear = 1;
      for (i = 1; i < check_depth - 1; i++)
	{
	  clear <<= 1;
	  clear += 1;
	}

      clear = ~clear;
      loc = location & clear;

      /* Flip the check_depth-th most significant bit of the hash key range. */
      check_bit = GETBIT (loc, EHASH_HASH_KEY_BITS - check_depth + 1);
      if (check_bit != 0)
	{
	  loc = CLEARBIT (loc, EHASH_HASH_KEY_BITS - check_depth + 1);
	}
      else
	{
	  loc = SETBIT (loc, EHASH_HASH_KEY_BITS - check_depth + 1);
	}

      /* "loc" is the base_location now */

      iterations = 1 << (check_depth - 2);

      for (i = 0; i < iterations; i++)
	{
	  rel_loc = loc | (i << 1);
	  ehash_dir_locate (&page_no, &rel_loc);
	  /* Only re-fix a directory page when the pointer lives on a
	   * different page than the previous iteration's. */
	  if (page_no != prev_page_no)
	    {
	      pgbuf_unfix_and_init (thread_p, page_p);
	      page_p = ehash_fix_nth_page (thread_p, &ehid_p->vfid, page_no, PGBUF_LATCH_WRITE);
	      if (page_p == NULL)
		{
		  /* page_p was already unfixed above; 0 signals error */
		  return 0;
		}
	      prev_page_no = page_no;
	    }
	  dir_record_p = (EHASH_DIR_RECORD *) ((char *) page_p + rel_loc);

	  /* Stop as soon as a pointer refers to some unrelated bucket. */
	  if (!((VPID_ISNULL (&dir_record_p->bucket_vpid)) || VPID_EQ (&dir_record_p->bucket_vpid, bucket_vpid_p)
		|| VPID_EQ (&dir_record_p->bucket_vpid, sibling_vpid_p)))
	    {
	      is_stop = true;
	      break;
	    }
	}

      if (!is_stop)
	{
	  check_depth++;
	}
    }

  pgbuf_unfix_and_init (thread_p, page_p);
  return (check_depth - 1);
}

/*
 * ehash_check_merge_possible () - Check if bucket can merge with its sibling
 *   return: EHASH_SUCCESSFUL_COMPLETION when a merge is possible;
 *           EHASH_NO_SIBLING_BUCKET, EHASH_FULL_SIBLING_BUCKET, or
 *           EHASH_ERROR_OCCURRED otherwise
 *   ehid(in): identifier for the extendible hashing structure
 *   dir_header_p(in): directory header of the structure
 *   bucket_vpid_p(in): identifier of the bucket page being checked
 *   bucket_page_p(in): fixed page pointer of the bucket
 *   location(in): directory index that led to the bucket
 *   lock_type(in): S_LOCK to merely probe (sibling page is unfixed before
 *                  returning); any other value keeps the sibling page fixed
 *                  with a write latch and fills the out parameters
 *   out_old_local_depth_p(out): local depth of the bucket
 *   sibling_vpid_p(out): identifier of the sibling bucket page
 *   out_sibling_page_p(out): fixed sibling page (only when lock_type != S_LOCK)
 *   out_first_slot_id_p(out): slot id of the sibling's header record (only
 *                  when lock_type != S_LOCK)
 *   out_num_records_p(out): number of records in the bucket page (only when
 *                  lock_type != S_LOCK)
 *   out_location_p(out): directory index of the sibling bucket (only when
 *                  lock_type != S_LOCK)
 */
static EHASH_RESULT
ehash_check_merge_possible (THREAD_ENTRY * thread_p, EHID * ehid_p, EHASH_DIR_HEADER * dir_header_p,
			    VPID * bucket_vpid_p, PAGE_PTR bucket_page_p, int location, int lock_type,
			    int *out_old_local_depth_p, VPID * sibling_vpid_p, PAGE_PTR * out_sibling_page_p,
			    PGSLOTID * out_first_slot_id_p, int *out_num_records_p, int *out_location_p)
{
  EHASH_BUCKET_HEADER *bucket_header_p;
  EHASH_BUCKET_HEADER sibling_bucket_header;
  PAGE_PTR sibling_page_p;

  RECDES recdes;
  PGSLOTID first_slot_id = -1;

  int num_records;
  int transfer;
  int already;
  int check_bit;
  int loc = location;

  /* The first record of a bucket page is its header */
  if (spage_next_record (bucket_page_p, &first_slot_id, &recdes, PEEK) != S_SUCCESS)
    {
      er_set (ER_FATAL_ERROR_SEVERITY, ARG_FILE_LINE, ER_EH_CORRUPTED, 0);
      return EHASH_ERROR_OCCURRED;
    }

  bucket_header_p = (EHASH_BUCKET_HEADER *) recdes.data;

  *out_old_local_depth_p = bucket_header_p->local_depth;
  if (bucket_header_p->local_depth == 0)
    {
      /* Special case: there is only one bucket; so do not try to merge */
      return EHASH_NO_SIBLING_BUCKET;
    }

  /* find the location of sibling bucket pointer */
  /* Flipping the bit at the bucket's local depth yields the directory index
   * of the sibling bucket (the merge partner). */
  check_bit = GETBIT (loc, EHASH_HASH_KEY_BITS - dir_header_p->depth + bucket_header_p->local_depth);
  if (check_bit == 0)
    {
      loc = SETBIT (loc, EHASH_HASH_KEY_BITS - dir_header_p->depth + bucket_header_p->local_depth);
    }
  else
    {
      loc = CLEARBIT (loc, EHASH_HASH_KEY_BITS - dir_header_p->depth + bucket_header_p->local_depth);
    }

  /*
   * Now, "loc" is the index of directory entry pointing to the sibling bucket
   * (of course, if the sibling bucket exists). So, find its absolute location.
   */
  if (ehash_find_bucket_vpid (thread_p, ehid_p, dir_header_p, loc, PGBUF_LATCH_READ, sibling_vpid_p) != NO_ERROR)
    {
      return EHASH_ERROR_OCCURRED;
    }

  if (VPID_ISNULL (sibling_vpid_p))
    {
      return EHASH_NO_SIBLING_BUCKET;
    }

  if (VPID_EQ (sibling_vpid_p, bucket_vpid_p))
    {
      /* Special case: there is only one bucket; so do not try to merge */
      return EHASH_NO_SIBLING_BUCKET;
    }

  /* Probing callers (S_LOCK) only need a read latch; merging callers need a
   * write latch since the sibling will be modified. */
  sibling_page_p =
    ehash_fix_old_page (thread_p, &ehid_p->vfid, sibling_vpid_p,
			lock_type == S_LOCK ? PGBUF_LATCH_READ : PGBUF_LATCH_WRITE);

  if (sibling_page_p == NULL)
    {
      return EHASH_ERROR_OCCURRED;
    }

  /* retrieve the bucket header */
  recdes.area_size = sizeof (EHASH_BUCKET_HEADER);
  recdes.data = (char *) &sibling_bucket_header;

  first_slot_id = -1;
  if (spage_next_record (sibling_page_p, &first_slot_id, &recdes, COPY) != S_SUCCESS)
    {
      er_set (ER_FATAL_ERROR_SEVERITY, ARG_FILE_LINE, ER_EH_CORRUPTED, 0);
      pgbuf_unfix_and_init (thread_p, sibling_page_p);

      return EHASH_ERROR_OCCURRED;
    }

  /* Only true siblings (identical local depth) can be merged */
  if (sibling_bucket_header.local_depth != bucket_header_p->local_depth)
    {
      pgbuf_unfix_and_init (thread_p, sibling_page_p);

      return EHASH_NO_SIBLING_BUCKET;
    }

  /* Check free space of sibling */

  num_records = spage_number_of_records (bucket_page_p);

  if (num_records > 1)
    {
      /* Bucket page contains records that needs to be moved to the sibling page */
      transfer =
	(DB_PAGESIZE - (sizeof (EHASH_BUCKET_HEADER) + 2) - spage_max_space_for_new_record (thread_p, bucket_page_p));

      already = (DB_PAGESIZE - spage_max_space_for_new_record (thread_p, sibling_page_p));

      /* Refuse the merge if the combined usage would overflow the sibling */
      if ((already + transfer) > EHASH_OVERFLOW_THRESHOLD)
	{
	  pgbuf_unfix_and_init (thread_p, sibling_page_p);
	  return EHASH_FULL_SIBLING_BUCKET;
	}
    }

  if (lock_type == S_LOCK)
    {
      pgbuf_unfix_and_init (thread_p, sibling_page_p);
    }
  else
    {
      /* Hand the fixed sibling page back to the caller for the actual merge */
      *out_sibling_page_p = sibling_page_p;
      *out_first_slot_id_p = first_slot_id;
      *out_num_records_p = num_records;
      *out_location_p = loc;
    }

  return EHASH_SUCCESSFUL_COMPLETION;
}

/*
 * ehash_delete () - Delete key from ext. hashing
 *   return: void * (NULL is returned in case of error)
 *   ehid(in): identifier for the extendible hashing structure
 *   key(in): Key value to remove
 *
 * Note: Deletes the given key value (together with its assoc_value)
 * from the specified extendible hashing structure. If the key
 * does not exist then it returns an error code.
 * After it removes the entry from the bucket page, it produces
 * an operational log for this deletion operation. Then, it
 * checks the condition of the bucket. If the bucket is empty
 * or underflown, it releases the locks on the extendible
 * hashing structure and calls another function to merge the
 * bucket with its sibling bucket.
 */
void *
ehash_delete (THREAD_ENTRY * thread_p, EHID * ehid_p, void *key_p)
{
  EHASH_DIR_HEADER *dir_header_p;
  PAGE_PTR dir_root_page_p;
  PAGE_PTR bucket_page_p;
  VPID ovf_vpid VPID_INITIALIZER;
  VPID bucket_vpid;
  VPID sibling_vpid;

  RECDES bucket_recdes;
  RECDES log_recdes_undo;
  RECDES log_recdes_redo;
  char *log_undo_record_p;
  char *log_redo_record_p;

  int location;
  int old_local_depth;

  EHASH_RESULT result;
  PGSLOTID max_free;
  PGSLOTID slot_no;
  bool is_long_str = false;
  bool is_temp = false;

  bool do_merge;

  if (ehid_p == NULL || key_p == NULL)
    {
      return NULL;
    }

  if (file_is_temp (thread_p, &ehid_p->vfid, &is_temp) != NO_ERROR)
    {
      ASSERT_ERROR ();
      return NULL;
    }

  /* Locate the bucket that should contain the key; the directory root stays
   * fixed (read latch) while the bucket page is fixed with a write latch. */
  dir_root_page_p =
    ehash_find_bucket_vpid_with_hash (thread_p, ehid_p, key_p, PGBUF_LATCH_READ, PGBUF_LATCH_WRITE, &bucket_vpid, NULL,
				      &location);
  if (dir_root_page_p == NULL)
    {
      return NULL;
    }

  dir_header_p = (EHASH_DIR_HEADER *) dir_root_page_p;

  if (bucket_vpid.pageid == NULL_PAGEID)
    {
      /* No bucket is connected at this directory entry: the key cannot exist */
      pgbuf_unfix_and_init (thread_p, dir_root_page_p);
      er_set (ER_WARNING_SEVERITY, ARG_FILE_LINE, ER_EH_UNKNOWN_KEY, 0);
      return NULL;
    }
  else
    {
      bucket_page_p = ehash_fix_old_page (thread_p, &ehid_p->vfid, &bucket_vpid, PGBUF_LATCH_WRITE);
      if (bucket_page_p == NULL)
	{
	  pgbuf_unfix_and_init (thread_p, dir_root_page_p);
	  return NULL;
	}

      /* Try to delete key from buc_page */

      /* Check if deletion possible, or not */
      if (ehash_locate_slot (thread_p, bucket_page_p, dir_header_p->key_type, key_p, &slot_no) == false)
	{
	  /* Key does not exist, so return errorcode */
	  pgbuf_unfix_and_init (thread_p, bucket_page_p);
	  pgbuf_unfix_and_init (thread_p, dir_root_page_p);
	  er_set (ER_WARNING_SEVERITY, ARG_FILE_LINE, ER_EH_UNKNOWN_KEY, 0);
	  return NULL;
	}
      else
	{
	  /* Key exists in the bucket */
	  (void) spage_get_record (thread_p, bucket_page_p, slot_no, &bucket_recdes, PEEK);

	  /* Prepare the undo log record */
	  if (ehash_allocate_recdes (&log_recdes_undo, bucket_recdes.length + sizeof (EHID), REC_HOME) == NULL)
	    {
	      pgbuf_unfix_and_init (thread_p, bucket_page_p);
	      pgbuf_unfix_and_init (thread_p, dir_root_page_p);
	      return NULL;
	    }

	  log_undo_record_p = log_recdes_undo.data;

	  /* First insert the Ext. Hashing identifier */
	  log_undo_record_p = ehash_write_ehid_to_record (log_undo_record_p, ehid_p);

	  /* Copy (the assoc-value, key) pair from the bucket record */
	  memcpy (log_undo_record_p, bucket_recdes.data, bucket_recdes.length);

	  /* Prepare the redo log record */
	  if (ehash_allocate_recdes (&log_recdes_redo, bucket_recdes.length + sizeof (short), REC_HOME) == NULL)
	    {
	      /* Release the already-allocated undo record to avoid leaking it
	       * on this error path. */
	      ehash_free_recdes (&log_recdes_undo);
	      pgbuf_unfix_and_init (thread_p, bucket_page_p);
	      pgbuf_unfix_and_init (thread_p, dir_root_page_p);
	      return NULL;
	    }

	  log_redo_record_p = log_recdes_redo.data;

	  /* First insert the key type */
	  *(short *) log_redo_record_p = bucket_recdes.type;
	  log_redo_record_p += sizeof (short);

	  /* Copy (the assoc-value, key) pair from the bucket record */
	  memcpy (log_redo_record_p, bucket_recdes.data, bucket_recdes.length);

#if defined (ENABLE_UNUSED_FUNCTION)
	  /* Check if the record contains a long string */
	  if (bucket_recdes.type == REC_BIGONE)
	    {
	      ovf_vpid = *(VPID *) (bucket_recdes.data + sizeof (OID));
	      is_long_str = true;
	      /* Overflow pages needs to be removed, too. But, that shoud be done after the bucket record deletion has
	       * been logged. This way, we will have the overflow pages available when logical undo-delete recovery
	       * operation is called. Here just save the overflow page(s) id. */
	    }
#endif

	  /* Delete the bucket record from the bucket */
	  (void) spage_delete (thread_p, bucket_page_p, slot_no);
	  pgbuf_set_dirty (thread_p, bucket_page_p, DONT_FREE);

	  /* Check the condition of bucket after the deletion */
	  if (spage_number_of_records (bucket_page_p) <= 1)
	    {
	      /* Bucket became empty after this deletion (only the header
	       * record remains) */
	      result = EHASH_BUCKET_EMPTY;
	    }
	  else
	    {
	      max_free = spage_max_space_for_new_record (thread_p, bucket_page_p);

	      /* Check if deletion had caused the Bucket to underflow, or not */
	      if ((DB_PAGESIZE - max_free) < EHASH_UNDERFLOW_THRESHOLD)
		{
		  /* Yes bucket has underflown */
		  result = EHASH_BUCKET_UNDERFLOW;
		}
	      else
		{
		  /* Bucket is just fine */
		  result = EHASH_SUCCESSFUL_COMPLETION;
		}
	    }

	  /* Log this deletion operation */

	  if (!is_temp)
	    {
	      log_append_undo_data2 (thread_p, RVEH_DELETE, &ehid_p->vfid, NULL, bucket_recdes.type,
				     log_recdes_undo.length, log_recdes_undo.data);

	      log_append_redo_data2 (thread_p, RVEH_DELETE, &ehid_p->vfid, bucket_page_p, slot_no,
				     log_recdes_redo.length, log_recdes_redo.data);
	    }
	  ehash_free_recdes (&log_recdes_undo);
	  ehash_free_recdes (&log_recdes_redo);

	  /* Remove the overflow pages, if any. */
	  if (is_long_str)
	    {
	      (void) overflow_delete (thread_p, &dir_header_p->overflow_file, &ovf_vpid);
	    }

	  /* Check for the merge condition */
	  do_merge = false;	/* init */
	  if (result == EHASH_BUCKET_EMPTY || result == EHASH_BUCKET_UNDERFLOW)
	    {
	      /* Probe only (S_LOCK): the actual merge is performed by
	       * ehash_merge after all latches below are released. */
	      if (ehash_check_merge_possible
		  (thread_p, ehid_p, dir_header_p, &bucket_vpid, bucket_page_p, location, S_LOCK, &old_local_depth,
		   &sibling_vpid, NULL, NULL, NULL, NULL) == EHASH_SUCCESSFUL_COMPLETION)
		{
		  do_merge = true;
		}
	    }

	  /* Release directory and bucket and return */
	  pgbuf_unfix_and_init (thread_p, bucket_page_p);
	  pgbuf_unfix_and_init (thread_p, dir_root_page_p);

	  /* do merge routine */
	  if (do_merge)
	    {
	      ehash_merge (thread_p, ehid_p, key_p, is_temp);
	    }

	  return (key_p);
	}
    }
}

/*
 * ehash_shrink_directory_if_need () - Shrink the directory when it is too deep
 *
 * Scans the local depth counters downward from the global depth to find the
 * deepest local depth that still owns at least one bucket. If the global
 * depth exceeds that value by more than one level, the directory is shrunk
 * so that it is only one level deeper than the deepest used local depth.
 */
static void
ehash_shrink_directory_if_need (THREAD_ENTRY * thread_p, EHID * ehid_p, EHASH_DIR_HEADER * dir_header_p, bool is_temp)
{
  int deepest;

  /* Find the deepest local depth with a non-zero bucket count. */
  for (deepest = dir_header_p->depth; dir_header_p->local_depth_count[deepest] == 0; deepest--)
    {
      ;
    }

  /* Shrink only when the directory is more than one level too deep. */
  if (dir_header_p->depth - deepest > 1)
    {
      ehash_shrink_directory (thread_p, ehid_p, deepest + 1, is_temp);
    }
}

/*
 * ehash_adjust_local_depth () - Adjust a local depth counter on the root page
 *
 * Adds "delta" to the directory header's bucket counter for the given local
 * depth, marks the root page dirty, and (for permanent structures) writes an
 * undo/redo log record so recovery can re-apply or reverse the increment.
 */
static void
ehash_adjust_local_depth (THREAD_ENTRY * thread_p, EHID * ehid_p, PAGE_PTR dir_root_page_p,
			  EHASH_DIR_HEADER * dir_header_p, int depth, int delta, bool is_temp)
{
  int inc_redo;
  int inc_undo;
  int counter_offset;

  /* Apply the delta to the counter of buckets at this local depth. */
  dir_header_p->local_depth_count[depth] += delta;
  pgbuf_set_dirty (thread_p, dir_root_page_p, DONT_FREE);

  if (!is_temp)
    {
      /* Log the counter change; the undo image is the opposite delta. */
      counter_offset = CAST_BUFLEN ((char *) (dir_header_p->local_depth_count + depth) - (char *) dir_root_page_p);
      inc_redo = delta;
      inc_undo = -delta;

      log_append_undoredo_data2 (thread_p, RVEH_INC_COUNTER, &ehid_p->vfid, dir_root_page_p, counter_offset,
				 sizeof (int), sizeof (int), &inc_undo, &inc_redo);
    }
}

/*
 * ehash_merge_permanent () - Move bucket records into the sibling bucket
 *   return: NO_ERROR, or ER_FAILED
 *   ehid(in): identifier for the extendible hashing structure
 *   dir_root_page_p(in): fixed root page of the directory
 *   dir_header_p(in): directory header of the structure
 *   bucket_page_p(in): fixed page of the bucket being emptied
 *   sibling_page_p(in): fixed page of the sibling bucket (write latched)
 *   bucket_vpid_p(in): identifier of the bucket page
 *   sibling_vpid_p(in): identifier of the sibling bucket page
 *   num_records(in): number of records in the bucket page
 *   location(in): directory index that led to the bucket
 *   first_slot_id(in): slot id of the sibling's header record
 *   out_new_local_depth_p(out): new local depth of the merged sibling bucket
 *   is_temp(in): true if temporary (logging is not required)
 *
 * Note: Copies every data record of the bucket into the sibling bucket,
 * then recomputes and updates the sibling's local depth. The whole
 * sibling page is logged before (undo) and after (redo) the change.
 */
static int
ehash_merge_permanent (THREAD_ENTRY * thread_p, EHID * ehid_p, PAGE_PTR dir_root_page_p,
		       EHASH_DIR_HEADER * dir_header_p, PAGE_PTR bucket_page_p, PAGE_PTR sibling_page_p,
		       VPID * bucket_vpid_p, VPID * sibling_vpid_p, int num_records, int location,
		       PGSLOTID first_slot_id, int *out_new_local_depth_p, bool is_temp)
{
  PGSLOTID slot_id;
  RECDES recdes;
  char *bucket_record_p;
  PGSLOTID new_slot_id;
  bool is_record_exist;
  int success;
  EHASH_BUCKET_HEADER sibling_bucket_header;
  char found_depth;

  if (!is_temp)
    {
      /* Undo image: the entire sibling page before any record is inserted */
      log_append_undo_data2 (thread_p, RVEH_REPLACE, &ehid_p->vfid, sibling_page_p, 0, DB_PAGESIZE, sibling_page_p);
    }

  /* Move records of original page into the sibling bucket */
  /* Slot 0 is skipped; presumably it holds the bucket header — consistent
   * with how ehash_check_merge_possible reads the first record as header. */
  for (slot_id = 1; slot_id < num_records; slot_id++)
    {
      spage_get_record (thread_p, bucket_page_p, slot_id, &recdes, PEEK);
      bucket_record_p = (char *) recdes.data;
      /* The key follows the assoc-value (an OID) at the start of the record */
      bucket_record_p += sizeof (OID);

#if defined (ENABLE_UNUSED_FUNCTION)
      if (recdes.type == REC_BIGONE)	/* string type */
	{
	  bucket_record_p = ehash_compose_overflow (thread_p, &recdes);
	  if (bucket_record_p == NULL)
	    {
	      return ER_FAILED;
	    }
	}
#endif

      /* Find the insertion position in the sibling; the key must not already
       * exist there */
      is_record_exist =
	ehash_locate_slot (thread_p, sibling_page_p, dir_header_p->key_type, (void *) bucket_record_p, &new_slot_id);
      if (is_record_exist == false)
	{
	  success = spage_insert_at (thread_p, sibling_page_p, new_slot_id, &recdes);
	  if (success != SP_SUCCESS)
	    {
	      er_set (ER_FATAL_ERROR_SEVERITY, ARG_FILE_LINE, ER_GENERIC_ERROR, 0);
#if defined (ENABLE_UNUSED_FUNCTION)
	      if (recdes.type == REC_BIGONE)
		{
		  free_and_init (bucket_record_p);
		}
#endif
	      return ER_FAILED;
	    }
	}
      else
	{
	  /* Duplicate key across sibling buckets: structure is corrupted */
	  er_set (ER_FATAL_ERROR_SEVERITY, ARG_FILE_LINE, ER_EH_CORRUPTED, 0);
#if defined (ENABLE_UNUSED_FUNCTION)
	  if (recdes.type == REC_BIGONE)
	    {
	      free_and_init (bucket_record_p);
	    }
#endif
	  return ER_FAILED;
	}

#if defined (ENABLE_UNUSED_FUNCTION)
      if (recdes.type == REC_BIGONE)
	{
	  free_and_init (bucket_record_p);
	}
#endif
    }

  /* Determine the new local depth of the merged bucket from the directory */
  found_depth = ehash_find_depth (thread_p, ehid_p, location, bucket_vpid_p, sibling_vpid_p);
  if (found_depth == 0)
    {
      return ER_FAILED;
    }

  /* Update the sibling header */
  sibling_bucket_header.local_depth = dir_header_p->depth - found_depth;
  *out_new_local_depth_p = sibling_bucket_header.local_depth;

  /* Set the record descriptor to the sib_header */
  recdes.data = (char *) &sibling_bucket_header;
  recdes.area_size = sizeof (EHASH_BUCKET_HEADER);
  recdes.length = sizeof (EHASH_BUCKET_HEADER);
  (void) spage_update (thread_p, sibling_page_p, first_slot_id, &recdes);

  pgbuf_set_dirty (thread_p, sibling_page_p, DONT_FREE);

  if (!is_temp)
    {
      /* Redo image: the entire sibling page after the merge */
      log_append_redo_data2 (thread_p, RVEH_REPLACE, &ehid_p->vfid, sibling_page_p, 0, DB_PAGESIZE, sibling_page_p);
    }

  return NO_ERROR;
}

/*
 * ehash_merge () - If possible, merge two buckets
 *   return:
 *   ehid(in): identifier for the extendible hashing structure
 *   key(in): key value that has just been removed; used to locate the bucket.
 *   is_temp(in): true if temporary (logging is not required)
 *
 * Note: This function checks to determine if it is possible (and
 * desirable) to merge the bucket (that contained the given key
 * before the deletion) with its sibling bucket. If so, it starts
 * a permanent system operation and performs the merge. And then,
 * it checks the size of the directory. If the directory is too
 * big for the remaining buckets, it shrinks the directory to
 * a more tolerable size before concluding the system permanent operation.
 */
static void
ehash_merge (THREAD_ENTRY * thread_p, EHID * ehid_p, void *key_p, bool is_temp)
{
  EHASH_DIR_HEADER *dir_header_p;
  PAGE_PTR dir_root_page_p;
  VPID bucket_vpid;
  PAGE_PTR bucket_page_p;
  PAGE_PTR sibling_page_p;
  VPID sibling_vpid;
  int location;
  int old_local_depth, new_local_depth;
  EHASH_HASH_KEY hash_key;
  int bucket_status;
  EHASH_RESULT check_result;
  PGSLOTID max_free;
  VPID null_vpid = { NULL_VOLID, NULL_PAGEID };
  PGSLOTID first_slot_id = -1;
  int num_records, sibling_location;

  /* Re-locate the bucket; this time the directory root is write latched
   * because the merge may update the header and shrink the directory. */
  dir_root_page_p =
    ehash_find_bucket_vpid_with_hash (thread_p, ehid_p, key_p, PGBUF_LATCH_WRITE, PGBUF_LATCH_WRITE, &bucket_vpid,
				      &hash_key, &location);
  if (dir_root_page_p == NULL)
    {
      return;
    }
  dir_header_p = (EHASH_DIR_HEADER *) dir_root_page_p;

  if (bucket_vpid.pageid != NULL_PAGEID)
    {
      bucket_page_p = ehash_fix_old_page (thread_p, &ehid_p->vfid, &bucket_vpid, PGBUF_LATCH_WRITE);
      if (bucket_page_p == NULL)
	{
	  pgbuf_unfix_and_init (thread_p, dir_root_page_p);
	  return;
	}

      /* Check the status of bucket to see if merge is still needed */
      /* (the state may have changed between the caller's probe and now) */
      if (spage_number_of_records (bucket_page_p) <= 1)
	{
	  bucket_status = EHASH_BUCKET_EMPTY;
	}
      else
	{
	  max_free = spage_max_space_for_new_record (thread_p, bucket_page_p);

	  /* Check if the Bucket is underflown, or not */
	  if ((DB_PAGESIZE - max_free) < EHASH_UNDERFLOW_THRESHOLD)
	    {
	      bucket_status = EHASH_BUCKET_UNDERFLOW;
	    }
	  else
	    {
	      /* Bucket no longer underflows; nothing to merge */
	      pgbuf_unfix_and_init (thread_p, bucket_page_p);
	      bucket_status = NO_ERROR;
	    }
	}

      if (bucket_status == EHASH_BUCKET_EMPTY || bucket_status == EHASH_BUCKET_UNDERFLOW)
	{
	  /* X_LOCK: on success the sibling page stays fixed (write latched)
	   * and the out parameters below are filled in */
	  check_result =
	    ehash_check_merge_possible (thread_p, ehid_p, dir_header_p, &bucket_vpid, bucket_page_p, location, X_LOCK,
					&old_local_depth, &sibling_vpid, &sibling_page_p, &first_slot_id, &num_records,
					&sibling_location);

	  switch (check_result)
	    {
	    case EHASH_NO_SIBLING_BUCKET:
	      pgbuf_unfix_and_init (thread_p, bucket_page_p);

	      /* An empty bucket with no sibling can still be removed: its
	       * page is deallocated and its directory pointers are nulled */
	      if ((bucket_status == EHASH_BUCKET_EMPTY) && (old_local_depth != 0))
		{
		  if (!is_temp)
		    {
		      log_sysop_start (thread_p);
		    }

		  if (file_dealloc (thread_p, &dir_header_p->bucket_file, &bucket_vpid, FILE_EXTENDIBLE_HASH)
		      != NO_ERROR)
		    {
		      ASSERT_ERROR ();
		      pgbuf_unfix_and_init (thread_p, dir_root_page_p);
		      if (!is_temp)
			{
			  log_sysop_abort (thread_p);
			}
		      return;
		    }

		  /* Set all pointers to the bucket to NULL */
		  if (ehash_connect_bucket (thread_p, ehid_p, old_local_depth, hash_key, &null_vpid, is_temp) !=
		      NO_ERROR)
		    {
		      pgbuf_unfix_and_init (thread_p, dir_root_page_p);
		      if (!is_temp)
			{
			  log_sysop_abort (thread_p);
			}
		      return;
		    }

		  ehash_adjust_local_depth (thread_p, ehid_p, dir_root_page_p, dir_header_p, old_local_depth, -1,
					    is_temp);
		  ehash_shrink_directory_if_need (thread_p, ehid_p, dir_header_p, is_temp);
		  if (!is_temp)
		    {
		      log_sysop_commit (thread_p);
		    }
		}
	      break;

	    case EHASH_FULL_SIBLING_BUCKET:
	      /* If the sib_bucket does not have room for the remaining records of buc_page, the record has already
	       * been deleted, so just release bucket page */
	      pgbuf_unfix_and_init (thread_p, bucket_page_p);
	      break;

	    case EHASH_SUCCESSFUL_COMPLETION:
	      /* Perform actual merge operation */
	      if (!is_temp)
		{
		  log_sysop_start (thread_p);
		}

	      if (ehash_merge_permanent (thread_p, ehid_p, dir_root_page_p, dir_header_p, bucket_page_p, sibling_page_p,
					 &bucket_vpid, &sibling_vpid, num_records, sibling_location, first_slot_id,
					 &new_local_depth, is_temp) != NO_ERROR)
		{
		  pgbuf_unfix_and_init (thread_p, sibling_page_p);
		  pgbuf_unfix_and_init (thread_p, bucket_page_p);
		  pgbuf_unfix_and_init (thread_p, dir_root_page_p);
		  if (!is_temp)
		    {
		      log_sysop_abort (thread_p);
		    }
		  return;
		}

	      pgbuf_unfix_and_init (thread_p, sibling_page_p);
	      pgbuf_unfix_and_init (thread_p, bucket_page_p);

	      /* The emptied bucket page can now be returned to the file */
	      if (file_dealloc (thread_p, &dir_header_p->bucket_file, &bucket_vpid, FILE_EXTENDIBLE_HASH) != NO_ERROR)
		{
		  ASSERT_ERROR ();
		  pgbuf_unfix_and_init (thread_p, dir_root_page_p);
		  if (!is_temp)
		    {
		      log_sysop_abort (thread_p);
		    }
		  return;
		}

	      /* Two buckets at the old depth are gone; one (merged) bucket
	       * exists at the new depth */
	      ehash_adjust_local_depth (thread_p, ehid_p, dir_root_page_p, dir_header_p, old_local_depth, -2, is_temp);
	      ehash_adjust_local_depth (thread_p, ehid_p, dir_root_page_p, dir_header_p, new_local_depth, 1, is_temp);
	      ehash_shrink_directory_if_need (thread_p, ehid_p, dir_header_p, is_temp);

	      /* Update some directory pointers */
	      if (ehash_connect_bucket (thread_p, ehid_p, new_local_depth, hash_key, &sibling_vpid, is_temp)
		  != NO_ERROR)
		{
		  pgbuf_unfix_and_init (thread_p, dir_root_page_p);
		  if (!is_temp)
		    {
		      log_sysop_abort (thread_p);
		    }
		  return;
		}

	      if (!is_temp)
		{
		  log_sysop_commit (thread_p);
		}
	      break;

	    case EHASH_ERROR_OCCURRED:
	      /* There happened an error in the merge operation; so return */

	      pgbuf_unfix_and_init (thread_p, bucket_page_p);
	      pgbuf_unfix_and_init (thread_p, dir_root_page_p);
	      return;

	    default:
	      /* An undefined merge result was returned; This should never happen */
	      er_set (ER_FATAL_ERROR_SEVERITY, ARG_FILE_LINE, ER_EH_CORRUPTED, 0);
	      pgbuf_unfix_and_init (thread_p, bucket_page_p);
	      pgbuf_unfix_and_init (thread_p, dir_root_page_p);
	      return;
	    }
	}
    }

  /* Release directory and return */
  pgbuf_unfix_and_init (thread_p, dir_root_page_p);
}

/*
 * ehash_shrink_directory () - Shrink directory
 *   return:
 *   ehid(in): identifier for the extendible hashing structure to shrink
 *   new_depth(in): requested new depth of directory
 *   is_temp(in): true if temporary file (logging is not required)
 *
 * Note: This function shrinks the directory of specified extendible
 * hashing structure as many times as necessary to attain
 * the requested depth. During this operation, the directory is
 * locked with X_LOCK to prevent others from accessing wrong
 * information. If the original directory is large enough, some
 * pages are deallocated from the new directory.
 * The remaining portion of the directory is logged and updated
 * before the extra part is deallocated.
 */
static void
ehash_shrink_directory (THREAD_ENTRY * thread_p, EHID * ehid_p, int new_depth, bool is_temp)
{
  int old_pages;
  int old_ptrs;
  int new_pages;
  int new_ptrs;

  EHASH_DIR_HEADER *dir_header_p;
  int end_offset;
  int new_end_offset;
  int check_pages;
  int i;
  int times;

  PAGE_PTR old_dir_page_p;
  int old_dir_offset;
  int old_dir_nth_page;
  int prev_pg_no;

  PAGE_PTR new_dir_page_p;
  PGLENGTH new_dir_offset;
  int new_dir_nth_page;
  int ret = NO_ERROR;

  /* Page 0 of the directory file is the directory header page. */
  dir_header_p = (EHASH_DIR_HEADER *) ehash_fix_nth_page (thread_p, &ehid_p->vfid, 0, PGBUF_LATCH_WRITE);
  if (dir_header_p == NULL)
    {
      return;
    }

  if (dir_header_p->depth < new_depth)
    {
#ifdef EHASH_DEBUG
      er_log_debug (ARG_FILE_LINE, "WARNING in eh_shrink_dir:The directory has a depth of %d , shrink is cancelled ",
		    dir_header_p->depth);
#endif
      pgbuf_unfix (thread_p, (PAGE_PTR) dir_header_p);
      return;
    }

  old_ptrs = 1 << dir_header_p->depth;
  times = 1 << (dir_header_p->depth - new_depth);
  ret = file_get_num_user_pages (thread_p, &ehid_p->vfid, &old_pages);
  if (ret != NO_ERROR)
    {
#ifdef EHASH_DEBUG
      er_log_debug (ARG_FILE_LINE, "WARNING: error reading user page number from file, shrink is cancelled.\n");
#endif
      ASSERT_ERROR ();
      pgbuf_unfix (thread_p, (PAGE_PTR) dir_header_p);
      return;
    }
  old_pages -= 1;		/* The first page starts with 0 */

  new_ptrs = old_ptrs / times;	/* Calculate how many pointers will remain */

  end_offset = old_ptrs - 1;	/* Directory first pointer has an offset of 0 */
  ehash_dir_locate (&check_pages, &end_offset);
#ifdef EHASH_DEBUG
  if (check_pages != old_pages)
    {
      er_set (ER_FATAL_ERROR_SEVERITY, ARG_FILE_LINE, ER_EH_ROOT_CORRUPTED, 3, ehid_p->vfid.volid, ehid_p->vfid.fileid,
	      ehid_p->pageid);
      /* Unfix the header page; this early return used to leak the fix. */
      pgbuf_unfix (thread_p, (PAGE_PTR) dir_header_p);
      return;
    }
#endif

  /* Perform shrink */

  prev_pg_no = 0;
  old_dir_page_p = ehash_fix_nth_page (thread_p, &ehid_p->vfid, prev_pg_no, PGBUF_LATCH_WRITE);
  if (old_dir_page_p == NULL)
    {
      /* Unfix the header page; this early return used to leak the fix. */
      pgbuf_unfix (thread_p, (PAGE_PTR) dir_header_p);
      return;
    }

  new_dir_nth_page = 0;
  new_dir_page_p = (PAGE_PTR) dir_header_p;
  new_dir_offset = EHASH_DIR_HEADER_SIZE;

  if (!is_temp)
    {
      /* Log the before-image of the destination page for UNDO */
      log_append_undo_data2 (thread_p, RVEH_REPLACE, &ehid_p->vfid, new_dir_page_p, 0, DB_PAGESIZE, new_dir_page_p);
    }

  dir_header_p->depth = new_depth;	/* Decrease the directory global depth */

  /* Copy old directory records to new (shrunk) area; every "times"-th old
   * pointer survives since the remaining pointers are duplicates. */
  for (i = 1; i < new_ptrs; i++)
    {
      /* Location 0 will not change */

      /* Locate the next source pointer */
      old_dir_offset = i * times;
      ehash_dir_locate (&old_dir_nth_page, &old_dir_offset);

      if (old_dir_nth_page != prev_pg_no)
	{
	  /* We reached the end of the source page. The next bucket pointer is in the next Dir page */

	  prev_pg_no = old_dir_nth_page;
	  pgbuf_unfix_and_init (thread_p, old_dir_page_p);

	  /* set olddir_pgptr to the new pgptr */
	  old_dir_page_p = ehash_fix_nth_page (thread_p, &ehid_p->vfid, old_dir_nth_page, PGBUF_LATCH_WRITE);
	  if (old_dir_page_p == NULL)
	    {
	      pgbuf_set_dirty (thread_p, new_dir_page_p, FREE);
	      return;
	    }
	}

      /* Advance the destination pointer to new spot */
      new_dir_offset += sizeof (EHASH_DIR_RECORD);

      if ((DB_PAGESIZE - new_dir_offset) < SSIZEOF (EHASH_DIR_RECORD))
	{
	  /* There is no more place in the directory page for new bucket pointers */

	  if (!is_temp)
	    {
	      /* Log this updated directory page */
	      log_append_redo_data2 (thread_p, RVEH_REPLACE, &ehid_p->vfid, new_dir_page_p, 0, DB_PAGESIZE,
				     new_dir_page_p);
	    }

	  pgbuf_set_dirty (thread_p, new_dir_page_p, FREE);

	  /* get another page */
	  new_dir_nth_page++;

	  /* set newdir_pgptr to the new pgptr */
	  new_dir_page_p = ehash_fix_nth_page (thread_p, &ehid_p->vfid, new_dir_nth_page, PGBUF_LATCH_WRITE);
	  if (new_dir_page_p == NULL)
	    {
	      pgbuf_unfix_and_init (thread_p, old_dir_page_p);
	      return;
	    }
	  new_dir_offset = 0;

	  if (!is_temp)
	    {
	      /* Log the initial content of this directory page */
	      log_append_undo_data2 (thread_p, RVEH_REPLACE, &ehid_p->vfid, new_dir_page_p, 0, DB_PAGESIZE,
				     new_dir_page_p);
	    }
	}

      /* Perform the copy operation */
      memcpy ((char *) new_dir_page_p + new_dir_offset, (char *) old_dir_page_p + old_dir_offset,
	      sizeof (EHASH_DIR_RECORD));
    }

  if (!is_temp)
    {
      /* Log this updated directory page */
      log_append_redo_data2 (thread_p, RVEH_REPLACE, &ehid_p->vfid, new_dir_page_p, 0, DB_PAGESIZE, new_dir_page_p);
    }

  /* Release the source and destination pages */
  pgbuf_unfix_and_init (thread_p, old_dir_page_p);
  pgbuf_set_dirty (thread_p, new_dir_page_p, FREE);

  /* remove unwanted part of directory. */
  new_end_offset = new_ptrs - 1;
  ehash_dir_locate (&new_pages, &new_end_offset);
  ret = file_numerable_truncate (thread_p, &ehid_p->vfid, new_pages + 1);
  if (ret != NO_ERROR)
    {
      ASSERT_ERROR ();
    }
}

/*
 * ehash_hash_string_type () - Fold a NUL-terminated string key into a pseudo key
 *   return: EHASH_HASH_KEY pseudo key (0 if the trimmed-copy allocation fails)
 *   key_p(in): string key to hash
 *   original_key_p(in): source buffer used when a trimmed copy is made;
 *                       the caller (ehash_hash) passes the same pointer for
 *                       both arguments
 *
 * NOTE(review): on malloc failure this returns 0, which is indistinguishable
 * from a legitimate pseudo key of 0 -- confirm callers tolerate this.
 */
static EHASH_HASH_KEY
ehash_hash_string_type (char *key_p, char *original_key_p)
{
  EHASH_HASH_KEY hash_key = 0;
  char copy_psekey[12];		/* scratch area; needs sizeof (EHASH_HASH_KEY) + 1 bytes */
  int i;
  int length;
  int times;
  int remaining;
  int Int;
  char Char;
  int byte;
  char *p = NULL;
  char *new_key_p = NULL;	/* non-NULL only when a trimmed copy was allocated */

  length = (int) strlen (key_p);

  if (length > 0)
    {
      /* Eliminate any trailing space characters */
      if (char_isspace (*(char *) (key_p + length - 1)))
    {
      /* scan backwards to the last non-space character */
      for (p = key_p + length - 1; char_isspace (*p) && (p > key_p); p--)
        {
          ;
        }
      length = (int) (p - key_p + 1);
      key_p = new_key_p = (char *) malloc (length + 1); /* + 1 is for \0 */
      if (key_p == NULL)
        {
          return 0;
        }
      /* copying from original_key_p is equivalent to copying from the old
       * key_p here, since both arguments alias the same buffer at the only
       * call site (ehash_hash) */
      memcpy (key_p, original_key_p, length);
      *(char *) (key_p + length) = '\0';
    }

      /* Takes the floor of division */
      times = length / sizeof (EHASH_HASH_KEY);

      /* Apply Folding method: sum the key one word at a time */
      for (i = 1; i <= times; i++)
    {
      memcpy (&Int, key_p, sizeof (int));
      hash_key += Int;  /* Add next word of original key */

      key_p += sizeof (EHASH_HASH_KEY);
    }

      remaining = length - times * sizeof (EHASH_HASH_KEY);

      /* Go over the remaining chars of key */
      for (i = 1; i <= remaining; i++)
    {
      /* shift left to align with previous content */
      Int = (int) *key_p++ << i;
      hash_key += Int;
    }

      if (new_key_p)
    {
      free_and_init (new_key_p);
    }
    }

  /* Copy the hash_key to an aux. area, to be further hashed into individual bytes */
  memcpy (&copy_psekey, &hash_key, sizeof (EHASH_HASH_KEY));
  copy_psekey[sizeof (EHASH_HASH_KEY)] = '\0';

  /* Re-hash the folded word with three independent string hash functions,
   * placing each result into a different byte position of the pseudo key */
  hash_key = 0;
  byte = mht_1strhash (copy_psekey, 509);
  hash_key = hash_key + (byte << (8 * (sizeof (EHASH_HASH_KEY) - 1)));
  byte = mht_2strhash (copy_psekey, 509);
  hash_key = hash_key + (byte << (8 * (sizeof (EHASH_HASH_KEY) - 2)));
  byte = mht_3strhash (copy_psekey, 509);
  hash_key = hash_key + (byte << (8 * (sizeof (EHASH_HASH_KEY) - 3)));

  /* Go over the chars of the given pseudo key */
  Char = '\0';
  key_p = (char *) &copy_psekey;
  for (i = 0; (unsigned int) i < sizeof (EHASH_HASH_KEY); i++)
    {
      Char += (char) *key_p++;
    }

  /* Change the first byte of the pseudo key to the SUM of all of them */
  return hash_key + Char;
}

/*
 * ehash_hash_eight_bytes_type () - Fold an 8-byte key into a pseudo key
 *   return: EHASH_HASH_KEY pseudo key
 *   key_p(in): pointer to the 8-byte key
 */
static EHASH_HASH_KEY
ehash_hash_eight_bytes_type (char *key_p)
{
  EHASH_HASH_KEY pseudo_key = 0;
  char *byte_p;
  char folded_byte;
  int word;
  unsigned int n;

  /* Fold the key into the pseudo key one int-sized word at a time,
   * converting each word to network byte order first. */
  for (n = 0; n < sizeof (double) / sizeof (int); n++)
    {
      memcpy (&word, key_p, sizeof (int));
      pseudo_key += htonl (word);
      key_p += sizeof (int);
    }

  /* XOR together every byte of the resulting pseudo key */
  folded_byte = '\0';
  byte_p = (char *) &pseudo_key;
  for (n = 0; n < sizeof (EHASH_HASH_KEY); n++)
    {
      folded_byte ^= (char) *byte_p++;
    }

  /* Replace the first byte of the pseudo key with the folded byte */
  memcpy (&pseudo_key, &folded_byte, sizeof (char));

  return pseudo_key;
}

#if defined (ENABLE_UNUSED_FUNCTION)
/*
 * ehash_hash_four_bytes_type () - Fold a 4-byte key into a pseudo key
 *   return: EHASH_HASH_KEY pseudo key
 *   key_p(in): pointer to the 4-byte key
 *
 * NOTE(review): dereferencing key_p as unsigned short assumes the key is at
 * least 2-byte aligned -- confirm at call sites.
 */
static EHASH_HASH_KEY
ehash_hash_four_bytes_type (char *key_p)
{
  EHASH_HASH_KEY hash_key = 0;
  unsigned int i;
  unsigned short Short, *Short2;
  char Char;

  Short = ntohs (*(unsigned short *) key_p);    /* Get the left half of the int */
  Short2 = (unsigned short *) key_p;
  Short2++;			/* advance to the right half */

  /* Sum the two halves, then duplicate the sum into both halves of the key */
  Short += ntohs (*Short2);
  hash_key = (EHASH_HASH_KEY) (Short << EHASH_SHORT_BITS);
  hash_key += (EHASH_HASH_KEY) Short;

  /* Go over the chars of the given pseudo key */
  Char = '\0';
  key_p = (char *) &hash_key;
  for (i = 0; i < sizeof (EHASH_HASH_KEY); i++)
    {
      Char += (char) *key_p++;
    }

  /* Change the first byte of the pseudo key to the SUM of all of them */
  memcpy (&hash_key, &Char, sizeof (char));

  return hash_key;
}

/*
 * ehash_hash_two_bytes_type () - Fold a 2-byte key into a pseudo key
 *   return: EHASH_HASH_KEY pseudo key
 *   key_p(in): pointer to the 2-byte key
 *
 * NOTE(review): dereferencing key_p as unsigned short assumes the key is at
 * least 2-byte aligned -- confirm at call sites.
 */
static EHASH_HASH_KEY
ehash_hash_two_bytes_type (char *key_p)
{
  EHASH_HASH_KEY hash_key = 0;
  unsigned int i;
  unsigned short Short;
  char Char;

  /* Duplicate the short key into both halves of the pseudo key */
  Short = ntohs (*(unsigned short *) key_p);
  hash_key = (EHASH_HASH_KEY) (Short << EHASH_SHORT_BITS);
  hash_key += (EHASH_HASH_KEY) Short;

  /* Go over the chars of the given pseudo key */
  Char = '\0';
  key_p = (char *) &hash_key;
  for (i = 0; i < sizeof (EHASH_HASH_KEY); i++)
    {
      Char += (char) *key_p++;
    }

  /* Change the first byte of the pseudo key to the summation of all of them */
  memcpy (&hash_key, &Char, sizeof (char));

  return hash_key;
}
#endif

/*
 * ehash_hash () - Hash function
 *   return: EHASH_HASH_KEY
 *   orig_key(in): original key to encode into a pseudo key
 *   key_type(in): type of the key
 *
 * Note: This function converts the given original key into a pseudo
 * key. Since the original key is presented as a character
 * string, its conversion into a int-compatible type is essential
 * prior to performing any operation on it.
 * Depending on the "key_type", the original key might be
 * folded (for DB_TYPE_STRING, DB_TYPE_OBJECT and DB_TYPE_DOUBLE) or duplicated
 * (for DB_TYPE_SHORT) into a computer word.
 * This function does not change the value of parameter
 * orig_key, as it might be on a bucket.
 */
static EHASH_HASH_KEY
ehash_hash (void *original_key_p, DB_TYPE key_type)
{
  char *key_bytes_p = (char *) original_key_p;
  EHASH_HASH_KEY pseudo_key = 0;

  /* Dispatch on the key type; each helper folds the key bytes into an
   * int-sized pseudo key.  The caller's key buffer is never modified. */
  switch (key_type)
    {
    case DB_TYPE_STRING:
      pseudo_key = ehash_hash_string_type (key_bytes_p, (char *) original_key_p);
      break;

    case DB_TYPE_OBJECT:
      pseudo_key = ehash_hash_eight_bytes_type (key_bytes_p);
      break;

#if defined (ENABLE_UNUSED_FUNCTION)
    case DB_TYPE_BIGINT:
    case DB_TYPE_DOUBLE:
    case DB_TYPE_MONETARY:
      if (key_type == DB_TYPE_MONETARY)
	{
	  /* Only the amount portion of a monetary value is hashed */
	  key_bytes_p = (char *) &(((DB_MONETARY *) original_key_p)->amount);
	}

      pseudo_key = ehash_hash_eight_bytes_type (key_bytes_p);
      break;

    case DB_TYPE_FLOAT:
    case DB_TYPE_DATE:
    case DB_TYPE_TIME:
    case DB_TYPE_TIMESTAMP:
    case DB_TYPE_DATETIME:
    case DB_TYPE_INTEGER:
      pseudo_key = ehash_hash_four_bytes_type (key_bytes_p);
      break;

    case DB_TYPE_SHORT:
      pseudo_key = ehash_hash_two_bytes_type (key_bytes_p);
      break;
#endif

    default:
      /* Unsupported key type: the structure must be corrupted */
      er_set (ER_FATAL_ERROR_SEVERITY, ARG_FILE_LINE, ER_EH_CORRUPTED, 0);
      break;
    }

  return pseudo_key;
}

/*
 * ehash_apply_each () - Extract the key of one bucket record and apply a
 *                       caller-supplied function to (key, assoc-value)
 *   return: NO_ERROR, ER_OUT_OF_VIRTUAL_MEMORY, or ER_EH_ROOT_CORRUPTED
 *   ehid_p(in): identifier of the extendible hashing structure (for error msgs)
 *   recdes_p(in): record descriptor of the bucket record
 *   key_type(in): type of the key stored in the record
 *   bucket_record_p(in): pointer just past the associated OID, i.e. at the key
 *   assoc_value_p(in): associated OID already read from the record
 *   out_apply_error(out): result of the applied function (separate from this
 *                         function's own return code)
 *   apply_function(in): function to call on (key, assoc-value, args)
 *   args(in): opaque argument forwarded to apply_function
 */
static int
ehash_apply_each (THREAD_ENTRY * thread_p, EHID * ehid_p, RECDES * recdes_p, DB_TYPE key_type, char *bucket_record_p,
		  OID * assoc_value_p, int *out_apply_error, int (*apply_function) (THREAD_ENTRY * thread_p, void *key,
										    void *data, void *args), void *args)
{
#if defined (ENABLE_UNUSED_FUNCTION)
  char *long_str;
#endif
  char *str_next_key_p;
  int key_size;
  char next_key[sizeof (DB_MONETARY)];	/* large enough for any fixed-size key */
  void *key_p = &next_key;

  switch (key_type)
    {
    case DB_TYPE_STRING:
#if defined (ENABLE_UNUSED_FUNCTION)
      if (recdes_p->type == REC_BIGONE)
	{
	  /* This is a long string */
	  long_str = ehash_compose_overflow (thread_p, recdes_p);
	  if (long_str == NULL)
	    {
	      *out_apply_error = ER_FAILED;
	      return NO_ERROR;
	    }
	  key_p = long_str;
	}
      else
#endif
	{
	  /* Short string: the key occupies the rest of the record */
	  key_size = recdes_p->length - CAST_BUFLEN (bucket_record_p - recdes_p->data);

	  str_next_key_p = (char *) malloc (key_size);
	  if (str_next_key_p == NULL)
	    {
	      er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_OUT_OF_VIRTUAL_MEMORY, 1, (size_t) key_size);
	      return ER_OUT_OF_VIRTUAL_MEMORY;
	    }

	  /* memcpy replaces the former byte-by-byte copy loop */
	  memcpy (str_next_key_p, bucket_record_p, key_size);

	  key_p = str_next_key_p;
	}

      break;

    case DB_TYPE_OBJECT:
      memcpy (&next_key, bucket_record_p, sizeof (OID));
      break;
#if defined (ENABLE_UNUSED_FUNCTION)
    case DB_TYPE_DOUBLE:
      OR_MOVE_DOUBLE (bucket_record_p, &next_key);
      break;

    case DB_TYPE_FLOAT:
      *((float *) &next_key) = *(float *) bucket_record_p;
      break;

    case DB_TYPE_INTEGER:
      *((int *) &next_key) = *(int *) bucket_record_p;
      break;

    case DB_TYPE_BIGINT:
      *((DB_BIGINT *) (&next_key)) = *(DB_BIGINT *) bucket_record_p;
      break;

    case DB_TYPE_SHORT:
      *((short *) &next_key) = *(short *) bucket_record_p;
      break;

    case DB_TYPE_DATE:
      *((DB_DATE *) (&next_key)) = *(DB_DATE *) bucket_record_p;
      break;

    case DB_TYPE_TIME:
      *((DB_TIME *) (&next_key)) = *(DB_TIME *) bucket_record_p;
      break;

    case DB_TYPE_TIMESTAMP:
      *((DB_UTIME *) (&next_key)) = *(DB_UTIME *) bucket_record_p;
      break;

    case DB_TYPE_DATETIME:
      *((DB_DATETIME *) (&next_key)) = *(DB_DATETIME *) bucket_record_p;
      break;

    case DB_TYPE_MONETARY:
      OR_MOVE_DOUBLE (&((DB_MONETARY *) bucket_record_p)->amount, &((DB_MONETARY *) (&next_key))->amount);
      ((DB_MONETARY *) (&next_key))->type = ((DB_MONETARY *) bucket_record_p)->type;
      break;
#endif
    default:
      /* Unspecified key type: Directory header has been corrupted */
      er_set (ER_FATAL_ERROR_SEVERITY, ARG_FILE_LINE, ER_EH_ROOT_CORRUPTED, 3, ehid_p->vfid.volid, ehid_p->vfid.fileid,
	      ehid_p->pageid);
      return ER_EH_ROOT_CORRUPTED;
    }

  *out_apply_error = (*apply_function) (thread_p, key_p, assoc_value_p, args);

  /* String keys were heap-allocated above; release them */
  if (key_type == DB_TYPE_STRING)
    {
      free_and_init (key_p);
    }

  return NO_ERROR;
}

/*
 * ehash_map () - Apply function to all entries
 *   return: int NO_ERROR, or ER_FAILED
 *   ehid(in): identifier of the extendible hashing structure
 *   fun(in): function to call on key, data, and arguments
 *   args(in):
 *
 */
int
ehash_map (THREAD_ENTRY * thread_p, EHID * ehid_p,
	   int (*apply_function) (THREAD_ENTRY * thread_p, void *key, void *data, void *args), void *args)
{
  EHASH_DIR_HEADER *dir_header_p;
  int num_pages, bucket_page_no, num_records;
  PAGE_PTR dir_page_p, bucket_page_p = NULL;
  char *bucket_record_p;
  RECDES recdes;
  PGSLOTID slot_id, first_slot_id = NULL_SLOTID;
  OID assoc_value;
  int apply_error_code = NO_ERROR;

  if (ehid_p == NULL)
    {
      return ER_FAILED;
    }

  dir_page_p = ehash_fix_ehid_page (thread_p, ehid_p, PGBUF_LATCH_READ);
  if (dir_page_p == NULL)
    {
      return ER_FAILED;
    }

  dir_header_p = (EHASH_DIR_HEADER *) dir_page_p;
  if (file_get_num_user_pages (thread_p, &dir_header_p->bucket_file, &num_pages) != NO_ERROR)
    {
      /* Unfix the directory page before returning; this path used to leak it */
      pgbuf_unfix_and_init (thread_p, dir_page_p);
      return ER_FAILED;
    }

  /* Retrieve each bucket page and apply the given function to its entries */
  for (bucket_page_no = 0; apply_error_code == NO_ERROR && bucket_page_no < num_pages; bucket_page_no++)
    {
      first_slot_id = NULL_SLOTID;
      bucket_page_p = ehash_fix_nth_page (thread_p, &dir_header_p->bucket_file, bucket_page_no, PGBUF_LATCH_READ);
      if (bucket_page_p == NULL)
	{
	  pgbuf_unfix_and_init (thread_p, dir_page_p);
	  return ER_FAILED;
	}

      num_records = spage_number_of_records (bucket_page_p);

      (void) spage_next_record (bucket_page_p, &first_slot_id, &recdes, PEEK);

      /* Skip the first slot since it contains the bucket header */
      for (slot_id = first_slot_id + 1; apply_error_code == NO_ERROR && slot_id < num_records; slot_id++)
	{
	  (void) spage_get_record (thread_p, bucket_page_p, slot_id, &recdes, PEEK);
	  bucket_record_p = (char *) recdes.data;
	  bucket_record_p = ehash_read_oid_from_record (bucket_record_p, &assoc_value);

	  if (ehash_apply_each (thread_p, ehid_p, &recdes, dir_header_p->key_type, bucket_record_p, &assoc_value,
				&apply_error_code, apply_function, args) != NO_ERROR)
	    {
	      pgbuf_unfix_and_init (thread_p, bucket_page_p);
	      pgbuf_unfix_and_init (thread_p, dir_page_p);
	      return ER_FAILED;
	    }
	}
      pgbuf_unfix_and_init (thread_p, bucket_page_p);
    }

  pgbuf_unfix_and_init (thread_p, dir_page_p);

  return apply_error_code;
}

/*
 * Debugging functions
 */

/*
 * ehash_dump () - Dump directory & all buckets
 *   return:
 *   ehid(in): identifier for the extendible hashing structure to dump
 *
 * Note: A debugging function. Dumps the contents of the directory
 * and the buckets of specified ext. hashing structure.
 */
void
ehash_dump (THREAD_ENTRY * thread_p, EHID * ehid_p)
{
  EHASH_DIR_HEADER *dir_header_p;
  EHASH_DIR_RECORD *dir_record_p;
  int num_pages;
  int num_ptrs;

  int check_pages;
  int end_offset;
  int i;

  PAGE_PTR dir_page_p, dir_root_page_p;
  PGLENGTH dir_offset;
  int dir_page_no;
  int dir_ptr_no;

  PAGE_PTR bucket_page_p;
  int bucket_page_no;

  if (ehid_p == NULL)
    {
      return;
    }

  dir_root_page_p = ehash_fix_ehid_page (thread_p, ehid_p, PGBUF_LATCH_READ);
  if (dir_root_page_p == NULL)
    {
      return;
    }

  dir_header_p = (EHASH_DIR_HEADER *) dir_root_page_p;

  if (file_get_num_user_pages (thread_p, &ehid_p->vfid, &num_pages) != NO_ERROR)
    {
      ASSERT_ERROR ();
      /* Unfix the root page before returning; this path used to leak it */
      pgbuf_unfix_and_init (thread_p, dir_root_page_p);
      return;
    }
  num_pages -= 1;		/* The first page starts with 0 */

  num_ptrs = 1 << dir_header_p->depth;
  end_offset = num_ptrs - 1;	/* Directory first pointer has an offset of 0 */
  ehash_dir_locate (&check_pages, &end_offset);

#ifdef EHASH_DEBUG
  if (check_pages != num_pages)
    {
      pgbuf_unfix_and_init (thread_p, dir_root_page_p);
      er_set (ER_FATAL_ERROR_SEVERITY, ARG_FILE_LINE, ER_EH_ROOT_CORRUPTED, 3, ehid_p->vfid.volid, ehid_p->vfid.fileid,
	      ehid_p->pageid);
      return;
    }
#endif


  printf ("*********************************************************\n");
  printf ("*                      DIRECTORY                        *\n");
  printf ("*                                                       *\n");
  printf ("*    Depth    :  %d                                      *\n", dir_header_p->depth);
  printf ("*    Key type : ");

  switch (dir_header_p->key_type)
    {
    case DB_TYPE_STRING:
      printf (" string                                 *\n");
      break;

    case DB_TYPE_OBJECT:
      printf (" OID                                 *\n");
      break;
#if defined (ENABLE_UNUSED_FUNCTION)
    case DB_TYPE_DOUBLE:
      printf (" double                                 *\n");
      break;

    case DB_TYPE_FLOAT:
      printf (" float                                 *\n");
      break;

    case DB_TYPE_INTEGER:
      printf (" int                                    *\n");
      break;

    case DB_TYPE_BIGINT:
      printf (" BIGINT                                 *\n");
      break;

    case DB_TYPE_SHORT:
      printf (" short                                  *\n");
      break;

    case DB_TYPE_DATE:
      printf (" date                                   *\n");
      break;

    case DB_TYPE_TIME:
      printf (" time                                   *\n");
      break;

    case DB_TYPE_TIMESTAMP:
      printf (" utime                                  *\n");
      break;

    case DB_TYPE_DATETIME:
      printf (" datetime                               *\n");
      break;

    case DB_TYPE_MONETARY:
      printf (" monetary                               *\n");
      break;
#endif
    default:
      /* Unspecified key type: Directory header has been corrupted */
      er_set (ER_FATAL_ERROR_SEVERITY, ARG_FILE_LINE, ER_EH_ROOT_CORRUPTED, 3, ehid_p->vfid.volid, ehid_p->vfid.fileid,
	      ehid_p->pageid);
      pgbuf_unfix_and_init (thread_p, dir_root_page_p);
      return;
    }

  printf ("*    Key size :  %d                                      *\n", ehash_get_key_size (dir_header_p->key_type));
  printf ("*                                                       *\n");
  printf ("*               LOCAL DEPTH COUNTERS                    *\n");
  for (i = 0; (unsigned int) i <= EHASH_HASH_KEY_BITS; i++)
    {
      if (dir_header_p->local_depth_count[i] != 0)
	{
	  fprintf (stdout, "*    There are %d ", dir_header_p->local_depth_count[i]);
	  fprintf (stdout, " buckets with local depth %d             *\n", i);
	}
    }

  printf ("*                                                       *\n");
  printf ("*                      POINTERS                         *\n");
  printf ("*                                                       *\n");

  /* Print directory */

  dir_offset = EHASH_DIR_HEADER_SIZE;
  dir_page_no = 0;
  dir_ptr_no = 0;

  dir_page_p = ehash_fix_ehid_page (thread_p, ehid_p, PGBUF_LATCH_READ);
  if (dir_page_p == NULL)
    {
      pgbuf_unfix_and_init (thread_p, dir_root_page_p);
      return;
    }

  for (i = 0; i < num_ptrs; i++)
    {
      if (DB_PAGESIZE - dir_offset < SSIZEOF (EHASH_DIR_RECORD))
	{
	  /* We reached the end of the directory page. The next bucket pointer is in the next directory page. */

	  /* Release previous page, and unlock it */
	  pgbuf_unfix_and_init (thread_p, dir_page_p);

	  dir_page_no++;

	  /* Get another page */
	  dir_page_p = ehash_fix_nth_page (thread_p, &ehid_p->vfid, dir_page_no, PGBUF_LATCH_READ);
	  if (dir_page_p == NULL)
	    {
	      /* Unfix the root page before returning; this path used to leak it */
	      pgbuf_unfix_and_init (thread_p, dir_root_page_p);
	      return;
	    }

	  dir_offset = 0;
	}

      /* Print out the next directory record */
      dir_record_p = (EHASH_DIR_RECORD *) ((char *) dir_page_p + dir_offset);

      if (VPID_ISNULL (&dir_record_p->bucket_vpid))
	{
	  printf ("*    Dir loc :  %d   points to bucket page  id: NULL    *\n", dir_ptr_no);
	}
      else
	{
	  printf ("*    Dir loc :  %d   points to vol:%d bucket pgid: %d  *\n", dir_ptr_no,
		  dir_record_p->bucket_vpid.volid, dir_record_p->bucket_vpid.pageid);
	}

      dir_ptr_no++;
      dir_offset += sizeof (EHASH_DIR_RECORD);
    }

  /* Release last page */
  pgbuf_unfix_and_init (thread_p, dir_page_p);

  printf ("*                                                       *\n");
  printf ("*********************************************************\n");

  /* Print buckets */

  if (file_get_num_user_pages (thread_p, &dir_header_p->bucket_file, &num_pages) != NO_ERROR)
    {
      ASSERT_ERROR ();
      /* Unfix the root page before returning; this path used to leak it */
      pgbuf_unfix_and_init (thread_p, dir_root_page_p);
      return;
    }
  num_pages -= 1;

  for (bucket_page_no = 0; bucket_page_no <= num_pages; bucket_page_no++)
    {
      bucket_page_p = ehash_fix_nth_page (thread_p, &dir_header_p->bucket_file, bucket_page_no, PGBUF_LATCH_READ);
      if (bucket_page_p == NULL)
	{
	  pgbuf_unfix_and_init (thread_p, dir_root_page_p);
	  return;
	}

      printf ("\n\n");
      ehash_dump_bucket (thread_p, bucket_page_p, dir_header_p->key_type);
      pgbuf_unfix_and_init (thread_p, bucket_page_p);
    }

  pgbuf_unfix_and_init (thread_p, dir_root_page_p);
  return;
}

#if defined(ENABLE_UNUSED_FUNCTION)
/*
 * ehash_print_bucket () - Retrieve the bucket and print its contents
 *   return:
 *   ehid(in): identifier for the extendible hashing structure
 *   nth_ptr(in): which pointer of the directory points to the bucket
 *
 * Note: A debugging function. Prints out the contents of the bucket
 * pointed by the "nth_ptr" pointer.
 */
void
ehash_print_bucket (THREAD_ENTRY * thread_p, EHID * ehid_p, int nth_ptr)
{
  PAGE_PTR root_page_p;
  EHASH_DIR_HEADER *header_p;
  VPID bucket_vpid;
  PAGE_PTR bucket_page_p;

  root_page_p = ehash_fix_ehid_page (thread_p, ehid_p, PGBUF_LATCH_READ);
  if (root_page_p == NULL)
    {
      return;
    }
  header_p = (EHASH_DIR_HEADER *) root_page_p;

  /* Translate the nth directory pointer into a bucket page id, then dump
   * that bucket.  Any failure simply skips the dump; the root page is
   * released on every path. */
  if (ehash_find_bucket_vpid (thread_p, ehid_p, header_p, nth_ptr, PGBUF_LATCH_WRITE, &bucket_vpid) == NO_ERROR)
    {
      bucket_page_p = ehash_fix_old_page (thread_p, &ehid_p->vfid, &bucket_vpid, PGBUF_LATCH_READ);
      if (bucket_page_p != NULL)
	{
	  ehash_dump_bucket (thread_p, bucket_page_p, header_p->key_type);
	  pgbuf_unfix_and_init (thread_p, bucket_page_p);
	}
    }

  pgbuf_unfix_and_init (thread_p, root_page_p);
  return;
}
#endif

/*
 * ehash_dump_bucket () - Print the bucket's contents
 *   return:
 *   buc_pgptr(in): bucket page whose contents is going to be dumped
 *   key_type(in): type of the key
 *
 * Note: A debugging function. Prints out the contents of the given bucket.
 */
static void
ehash_dump_bucket (THREAD_ENTRY * thread_p, PAGE_PTR bucket_page_p, DB_TYPE key_type)
{
  EHASH_BUCKET_HEADER *bucket_header_p;
  char *bucket_record_p;
  RECDES recdes;
  PGSLOTID slot_id, first_slot_id = -1;
  int key_size;
  OID assoc_value;
  int num_records;
  int i;
#if defined (ENABLE_UNUSED_FUNCTION)
  VPID *ovf_vpid_p;
  int hour, minute, second, millisecond, month, day, year;
  double d;
#endif

  /* The first record of a bucket page is its header */
  (void) spage_next_record (bucket_page_p, &first_slot_id, &recdes, PEEK);
  bucket_header_p = (EHASH_BUCKET_HEADER *) recdes.data;

  printf ("************************************************************\n");
  printf ("*  local_depth : %d                                         *\n", bucket_header_p->local_depth);
  printf ("*  no. records : %d                                         *\n",
      spage_number_of_records (bucket_page_p) - 1);
  printf ("*                                                          *\n");
  printf ("*   No          Key                   Assoc Value          *\n");
  printf ("*  ====   =====================    ==================      *\n");

  num_records = spage_number_of_records (bucket_page_p);

  /* Slot 0 is the bucket header, so entries start at slot 1 */
  for (slot_id = 1; slot_id < num_records; slot_id++)
    {
      printf ("*   %2d", slot_id);

      spage_get_record (thread_p, bucket_page_p, slot_id, &recdes, PEEK);
      bucket_record_p = (char *) recdes.data;
      /* Each record is the associated OID followed by the key bytes */
      bucket_record_p = ehash_read_oid_from_record (bucket_record_p, &assoc_value);

      switch (key_type)
    {

    case DB_TYPE_STRING:
#if defined (ENABLE_UNUSED_FUNCTION)
      if (recdes.type == REC_BIGONE)
        {
          /* Long string: only a prefix is stored in the bucket, the rest
           * lives in overflow pages identified by the leading VPID */
          ovf_vpid_p = (VPID *) bucket_record_p;
          bucket_record_p += sizeof (VPID);

          printf ("    ");
          for (i = 0; i < EHASH_LONG_STRING_PREFIX_SIZE; i++)
        {
          putchar (*(bucket_record_p++));
        }
          printf ("  ovf: %4d | %1d  ", ovf_vpid_p->pageid, ovf_vpid_p->volid);
        }
      else
#endif
        {
          /* pad the key column to a fixed width */
          key_size = recdes.length - CAST_BUFLEN (bucket_record_p - recdes.data);
          printf ("    %s", bucket_record_p);
          for (i = 0; i < (29 - key_size); i++)
        {
          printf (" ");
        }
        }
      break;

    case DB_TYPE_OBJECT:
      printf ("   (%5d,%5d,%5d)          ", ((OID *) bucket_record_p)->volid, ((OID *) bucket_record_p)->pageid,
          ((OID *) bucket_record_p)->slotid);
      break;
#if defined (ENABLE_UNUSED_FUNCTION)
    case DB_TYPE_DOUBLE:
      OR_MOVE_DOUBLE (bucket_record_p, &d);
      printf ("    %20f      ", d);
      break;

    case DB_TYPE_FLOAT:
      printf ("    %20f      ", *(float *) bucket_record_p);
      break;

    case DB_TYPE_INTEGER:
      printf ("    %14d              ", *(int *) bucket_record_p);
      break;

    case DB_TYPE_BIGINT:
      printf ("    %19lld             ", (long long) (*(DB_BIGINT *) bucket_record_p));
      break;

    case DB_TYPE_SHORT:
      printf ("    %14d              ", *(short *) bucket_record_p);
      break;

    case DB_TYPE_DATE:
      db_date_decode ((DB_DATE *) bucket_record_p, &month, &day, &year);
      fprintf (stdout, "    %2d/%2d/%4d              ", month, day, year);
      break;

    case DB_TYPE_TIME:
      db_time_decode ((DB_TIME *) bucket_record_p, &hour, &minute, &second);
      fprintf (stdout, "      %2d:%2d:%2d               ", hour, minute, second);
      break;

    case DB_TYPE_TIMESTAMP:
      {
        DB_DATE tmp_date;
        DB_TIME tmp_time;

        db_timestamp_decode_ses ((DB_UTIME *) bucket_record_p, &tmp_date, &tmp_time);
        db_date_decode (&tmp_date, &month, &day, &year);
        db_time_decode (&tmp_time, &hour, &minute, &second);
        printf ("    %2d:%2d:%2d %2d/%2d/%4d             ", hour, minute, second, month, day, year);
      }
      break;

    case DB_TYPE_DATETIME:
      db_datetime_decode ((DB_DATETIME *) bucket_record_p, &month, &day, &year, &hour, &minute, &second,
                  &millisecond);
      printf ("    %2d:%2d:%2d.%03d %2d/%2d/%4d             ", hour, minute, second, millisecond, month, day, year);
      break;

    case DB_TYPE_MONETARY:
      OR_MOVE_DOUBLE (bucket_record_p, &d);
      printf ("    %14f type %d       ", d, ((DB_MONETARY *) bucket_record_p)->type);
      break;
#endif
    default:
      /* Unspecified key type: Directory header has been corrupted */
      /* NOTE(review): this return skips the closing banner line below */
      er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_EH_INVALID_KEY_TYPE, 1, key_type);
      return;
    }
      printf ("(%5d,%5d,%5d)  *\n", assoc_value.volid, assoc_value.pageid, assoc_value.slotid);

    }

  printf ("*********************************************************\n");
}

/*
 * Recovery functions
 */

/*
 * ehash_rv_init_bucket_redo () - Redo the initilization of a bucket page
 *   return: int
 *   recv(in): Recovery structure
 *
 * Note: Redo the initilization of a bucket page. The data area of the
 * recovery structure contains the alignment value and the
 * local depth of the bucket page.
 */
int
ehash_rv_init_bucket_redo (THREAD_ENTRY * thread_p, LOG_RCV * recv_p)
{
  EHASH_BUCKET_HEADER bucket_header;
  RECDES header_recdes;
  const char *data_p;
  PGSLOTID slot_id;
  char alignment;
  int status;

  /* Recovery data layout: one alignment byte followed by the local-depth byte */
  data_p = recv_p->data;
  alignment = *data_p;
  data_p += sizeof (char);
  bucket_header.local_depth = *data_p;

  pgbuf_set_page_ptype (thread_p, recv_p->pgptr, PAGE_EHASH);

  /* Re-initialize the bucket to hold variable-length records on ordered slots */
  spage_initialize (thread_p, recv_p->pgptr, UNANCHORED_KEEP_SEQUENCE, alignment, DONT_SAFEGUARD_RVSPACE);

  /* Re-insert the bucket header as the first record of the page */
  header_recdes.data = (char *) &bucket_header;
  header_recdes.area_size = header_recdes.length = sizeof (EHASH_BUCKET_HEADER);
  header_recdes.type = REC_HOME;

  status = spage_insert (thread_p, recv_p->pgptr, &header_recdes, &slot_id);
  if (status == SP_SUCCESS)
    {
      return NO_ERROR;
    }

  /* Slotted page module refuses to insert a short size record to an empty page. This should never happen. */
  if (status != SP_ERROR)
    {
      er_set (ER_FATAL_ERROR_SEVERITY, ARG_FILE_LINE, ER_GENERIC_ERROR, 0);
    }
  assert (er_errid () != NO_ERROR);
  return er_errid ();
}

/*
 * ehash_rv_init_dir_redo () - Redo the initilization of a directory
 *   return: int
 *   recv(in): Recovery structure
 */
int
ehash_rv_init_dir_redo (THREAD_ENTRY * thread_p, LOG_RCV * recv_p)
{
  int error_code;

  /* Stamp the page type first, then replay the raw byte image that was
   * logged when the directory page was initialized. */
  pgbuf_set_page_ptype (thread_p, recv_p->pgptr, PAGE_EHASH);
  error_code = log_rv_copy_char (thread_p, recv_p);

  return error_code;
}

/*
 * ehash_rv_insert_redo () - Redo the insertion of an entry to a bucket page
 *   return: int
 *   recv(in): Recovery structure
 *
 * Note: Redo the insertion of a (key, assoc-value) entry to a specific
 * extendible hashing structure. The data area of the recovery
 * structure contains the key type, and the entry to be inserted.
 */
int
ehash_rv_insert_redo (THREAD_ENTRY * thread_p, LOG_RCV * recv_p)
{
  PGSLOTID slot_id;
  RECDES recdes;
  int success;

  /* Recovery data layout: [short record type][record bytes]; the slot id is
   * carried in the log record's offset field. */
  slot_id = recv_p->offset;
  recdes.type = *(short *) (recv_p->data);
  recdes.data = (char *) (recv_p->data) + sizeof (short);
  recdes.area_size = recdes.length = recv_p->length - sizeof (recdes.type);

  success = spage_insert_for_recovery (thread_p, recv_p->pgptr, slot_id, &recdes);
  pgbuf_set_dirty (thread_p, recv_p->pgptr, DONT_FREE);

  if (success != SP_SUCCESS)
    {
      er_set (ER_FATAL_ERROR_SEVERITY, ARG_FILE_LINE, ER_GENERIC_ERROR, 0);
      /* Match the error-return convention of ehash_rv_init_bucket_redo:
       * an error must be set before er_errid () is returned. */
      assert (er_errid () != NO_ERROR);
      return er_errid ();
    }

  return NO_ERROR;
}


/*
 * ehash_rv_insert_undo () - Undo the insertion of an entry to ext. hash
 *   return: int
 *   recv(in): Recovery structure
 *
 * Note: Undo the insertion of a (key, assoc-value) entry to a
 * specific extendible hashing structure. The data area of the
 * recovery structure contains the ext. hashing identifier, and
 * the entry to be deleted. Note that this function uses the
 * recovery delete function (not the original one) in order to
 * avoid possible merge operations.
 */
int
ehash_rv_insert_undo (THREAD_ENTRY * thread_p, LOG_RCV * recv_p)
{
  EHID ehid;
  char *log_data_p = (char *) recv_p->data;
#if defined (ENABLE_UNUSED_FUNCTION)
  short record_type = recv_p->offset;
#endif

  /* The log record holds the EHID, then the associated OID, then the key. */
  log_data_p = ehash_read_ehid_from_record (log_data_p, &ehid);
  log_data_p += sizeof (OID);	/* skip the associated value; only the key matters here */

#if defined (ENABLE_UNUSED_FUNCTION)
  if (record_type == REC_BIGONE)
    {
      /* Long string key. At the moment it seems more logical to compose the whole key here rather than at logging
       * time. But, if the overflow pages cannot be guaranteed to exist when this function is called, then the whole
       * key would need to be logged and this portion of the code becomes unnecessary. */
      RECDES recdes;
      char *long_str;
      int error;

      /* Compose the whole key */
      recdes.data = (char *) recv_p->data + sizeof (EHID);
      recdes.length = recdes.area_size = recv_p->length - sizeof (EHID);
      long_str = ehash_compose_overflow (thread_p, &recdes);
      if (long_str == NULL)
	{
	  assert (er_errid () != NO_ERROR);
	  error = er_errid ();
	  return error;
	}

      error = ehash_rv_delete (thread_p, &ehid, (void *) long_str);
      free_and_init (long_str);
      return error;
    }
  else
#endif
    {
      /* Use the recovery delete path (not the normal one) so no merge occurs */
      return ehash_rv_delete (thread_p, &ehid, (void *) log_data_p);
    }
}

/*
 * ehash_rv_delete_redo () - Redo the deletion of an entry to ext. hash
 *   return: int
 *   recv(in): Recovery structure
 *
 * Note: Redo the deletion of a (key, assoc-value) entry from a
 * specific extendible hashing structure. The data area of the
 * recovery structure contains the key type followed by the
 * entry to be deleted.
 */
/*
 * Redo a deletion: re-check that the slot still holds exactly the logged
 * record (type, length and bytes) and only then delete it. If the slot is
 * empty or holds a different record, there is nothing to redo.
 *
 * Note: the original code chained two multi-line unbraced ifs separated by a
 * block comment — a fragile (goto-fail-prone) shape. Rewritten with a braced
 * guard clause; the behavior is unchanged.
 */
int
ehash_rv_delete_redo (THREAD_ENTRY * thread_p, LOG_RCV * recv_p)
{
  PGSLOTID slot_id;
  RECDES recdes;
  RECDES existing_recdes;

  /* Log data layout: record type (short) followed by the record body;
   * the slot id is carried in the log offset. */
  slot_id = recv_p->offset;
  recdes.type = *(short *) (recv_p->data);
  recdes.data = (char *) (recv_p->data) + sizeof (short);
  recdes.area_size = recdes.length = recv_p->length - sizeof (recdes.type);

  if (spage_get_record (thread_p, recv_p->pgptr, slot_id, &existing_recdes, PEEK) != S_SUCCESS)
    {
      /* No record in the specified slot; nothing to redo. */
      return NO_ERROR;
    }

  /* There is a record in the specified slot. Check if it is the same record. */
  if ((existing_recdes.type == recdes.type) && (existing_recdes.length == recdes.length)
      && (memcmp (existing_recdes.data, recdes.data, recdes.length) == 0))
    {
      /* The record exists in the correct slot in the page. So, delete this slot from the page. */
      (void) spage_delete (thread_p, recv_p->pgptr, slot_id);
      pgbuf_set_dirty (thread_p, recv_p->pgptr, DONT_FREE);
    }

  return NO_ERROR;
}

/*
 * ehash_rv_delete_undo () - Undo the deletion of an entry from ext. hash
 *   return: int
 *   recv(in): Recovery structure
 *
 * Note: Undo the deletion of a (key, assoc-value) entry from a
 * specific extendible hashing structure. The data area of the
 * recovery structure contains the ext. hashing identifier,
 * followed by the entry to be inserted back.
 */
int
ehash_rv_delete_undo (THREAD_ENTRY * thread_p, LOG_RCV * recv_p)
{
  EHID ehid;
  OID oid;
  char *record_p = (char *) recv_p->data;
  int error = NO_ERROR;
#if defined (ENABLE_UNUSED_FUNCTION)
  short record_type = recv_p->offset;
#endif

  /* The logged data is laid out as: EHID, assoc-value OID, key. */
  record_p = ehash_read_ehid_from_record (record_p, &ehid);
  record_p = ehash_read_oid_from_record (record_p, &oid);

  /* Now rec_ptr is pointing to the key value */
#if defined (ENABLE_UNUSED_FUNCTION)
  if (record_type == REC_BIGONE)
    {
      /* This record is for a long string. At the moment it seems more logical to compose the whole key here rather
       * than at the logging time. But, if the overflow pages cannot be guaranteed to exist when this function is
       * called, then the whole key would need to be logged and this portion of the code becomes unnecessary. */
      RECDES recdes;
      VPID *vpid = (VPID *) record_p;
      char *long_str;

      /* Compose the whole key */
      recdes.data = (char *) recv_p->data + sizeof (EHID);
      recdes.length = recdes.area_size = recv_p->length - sizeof (EHID);
      long_str = ehash_compose_overflow (thread_p, &recdes);
      if (long_str == NULL)
    {
      assert (er_errid () != NO_ERROR);
      return er_errid ();
    }

      if (ehash_insert_helper (thread_p, &ehid, (void *) long_str, &oid, S_LOCK, vpid) == NULL)
    {
      assert (er_errid () != NO_ERROR);
      error = er_errid ();
    }

      free_and_init (long_str);
    }
  else
#endif
    {
      /* Undo the deletion by re-inserting the (key, assoc-value) pair. */
      if (ehash_insert_helper (thread_p, &ehid, (void *) record_p, &oid, S_LOCK, NULL) == NULL)
    {
      assert (er_errid () != NO_ERROR);
      error = er_errid ();
    }
    }

  return (error);
}

/*
 * ehash_rv_delete () - Recovery delete function
 *   return: int
 *   ehid(in): identifier for the extendible hashing structure
 *   key(in): Key value to remove
 *
 * Note: This is the recovery version of the "ehash_delete" function. Just
 * like the original function, this one also deletes the given
 * key value (together with its assoc_value) from the specified
 * extendible hashing structure, and returns an error code if the
 * key does not exist. However, unlike "ehash_delete", it does not
 * check the condition of the bucket and invoke "eh_try_to_merge"
 * when possible. Nor does it produce undo log information. (It
 * merely produces a redo log record to act as the compensating log
 * record for this operation. This is to minimize the recovery
 * and transaction abort activities.)
 */
static int
ehash_rv_delete (THREAD_ENTRY * thread_p, EHID * ehid_p, void *key_p)
{
  EHASH_DIR_HEADER *dir_header_p;
  PAGE_PTR dir_root_page_p;
  VPID bucket_vpid;
  PAGE_PTR bucket_page_p;
  PGSLOTID slot_no;

  RECDES bucket_recdes;
  RECDES log_recdes_redo;
  char *log_redo_record_p;
  int error = NO_ERROR;

  /* Hash the key to locate the bucket; this also fixes the directory root
   * page, which must be unfixed on every exit path below. */
  dir_root_page_p =
    ehash_find_bucket_vpid_with_hash (thread_p, ehid_p, key_p, PGBUF_LATCH_READ, PGBUF_LATCH_WRITE, &bucket_vpid, NULL,
				      NULL);
  if (dir_root_page_p == NULL)
    {
      assert (er_errid () != NO_ERROR);
      return er_errid ();
    }
  dir_header_p = (EHASH_DIR_HEADER *) dir_root_page_p;

  if (bucket_vpid.pageid == NULL_PAGEID)
    {
      /* The directory entry points to no bucket: the key cannot exist. */
      pgbuf_unfix_and_init (thread_p, dir_root_page_p);
      er_set (ER_WARNING_SEVERITY, ARG_FILE_LINE, ER_EH_UNKNOWN_KEY, 0);
      return ER_EH_UNKNOWN_KEY;
    }
  else
    {
      bucket_page_p = ehash_fix_old_page (thread_p, &ehid_p->vfid, &bucket_vpid, PGBUF_LATCH_WRITE);
      if (bucket_page_p == NULL)
    {
      pgbuf_unfix_and_init (thread_p, dir_root_page_p);

      assert (er_errid () != NO_ERROR);
      return er_errid ();
    }

      /* Try to delete key from buc_page */

      /* Check if deletion possible, or not */
      if (ehash_locate_slot (thread_p, bucket_page_p, dir_header_p->key_type, key_p, &slot_no) == false)
    {
      /* Key does not exist, so return errorcode */
      pgbuf_unfix_and_init (thread_p, bucket_page_p);
      pgbuf_unfix_and_init (thread_p, dir_root_page_p);
      er_set (ER_WARNING_SEVERITY, ARG_FILE_LINE, ER_EH_UNKNOWN_KEY, 0);
      return ER_EH_UNKNOWN_KEY;
    }
      else
    {
      /* Key exists in the bucket */

      /* Put the redo (compensating) log record */
      (void) spage_get_record (thread_p, bucket_page_p, slot_no, &bucket_recdes, PEEK);

      /* Prepare the redo log record: record type (short) followed by the
       * (assoc-value, key) bytes, mirroring the insert-redo layout. */
      if (ehash_allocate_recdes (&log_recdes_redo, bucket_recdes.length + sizeof (short), REC_HOME) == NULL)
        {
          /*
           * Will not be able to log a compensating log record... continue
           * anyhow...without the log...since we are doing recovery anyhow
           */
          error = ER_OUT_OF_VIRTUAL_MEMORY;
        }
      else
        {
          log_redo_record_p = log_recdes_redo.data;

          /* First insert the key type */
          *(short *) log_redo_record_p = bucket_recdes.type;
          log_redo_record_p += sizeof (short);

          /* Copy (the assoc-value, key) pair from the bucket record */
          memcpy (log_redo_record_p, bucket_recdes.data, bucket_recdes.length);

          /* why is redo logged inside an undo? */
          log_append_redo_data2 (thread_p, RVEH_DELETE, &ehid_p->vfid, bucket_page_p, slot_no,
				 log_recdes_redo.length, log_recdes_redo.data);

          ehash_free_recdes (&log_recdes_redo);
        }

      /* Delete the bucket record from the bucket; note this proceeds even
       * when the compensating log could not be allocated above. */
      (void) spage_delete (thread_p, bucket_page_p, slot_no);
      pgbuf_set_dirty (thread_p, bucket_page_p, DONT_FREE);

      /* Do not remove the overflow pages here */
      /* Since they will be removed by their own undo functions */
    }
    }

  pgbuf_unfix_and_init (thread_p, bucket_page_p);
  pgbuf_unfix_and_init (thread_p, dir_root_page_p);

  return error;
}

/*
 * ehash_rv_increment () - Recovery increment counter
 *   return: int
 *   recv(in): Recovery structure
 *
 * Note: This function increments the value of an (integer) counter
 * during the recovery (or transaction abort) time. The location
 * of the counter and the amount of increment is passed as the
 * first and second element, respectively, on the data area of
 * the recovery structure.
 */
int
ehash_rv_increment (THREAD_ENTRY * thread_p, LOG_RCV * recv_p)
{
  /* The increment amount is the first (and only) item in the log data;
   * the counter lives at the logged offset within the page. */
  int *counter_p = (int *) ((char *) recv_p->pgptr + recv_p->offset);
  int amount = *(int *) recv_p->data;

  *counter_p += amount;

  pgbuf_set_dirty (thread_p, recv_p->pgptr, DONT_FREE);
  return NO_ERROR;
}

/*
 * ehash_rv_connect_bucket_redo () - Recovery connect bucket
 *   return: int
 *   recv(in): Recovery structure
 *
 * Note: This function sets a number of directory pointers (on the
 * same directory page) to the specified bucket pageid. The
 * bucket pageid, and the number of pointers to be updated are
 * passed as the first and the second elements, respectively,
 * of the data area of the recovery structure.
 */
int
ehash_rv_connect_bucket_redo (THREAD_ENTRY * thread_p, LOG_RCV * recv_p)
{
  /* The log data carries the bucket vpid and how many consecutive directory
   * pointers (starting at the logged offset) must point to it. */
  EHASH_REPETITION repetition = *(EHASH_REPETITION *) recv_p->data;
  EHASH_DIR_RECORD *dir_record_p = (EHASH_DIR_RECORD *) ((char *) recv_p->pgptr + recv_p->offset);
  int i;

  for (i = 0; i < repetition.count; i++, dir_record_p++)
    {
      if (VPID_EQ (&(dir_record_p->bucket_vpid), &repetition.vpid))
	{
	  /* Already points at the right bucket; leave the page untouched. */
	  continue;
	}

      dir_record_p->bucket_vpid = repetition.vpid;
      pgbuf_set_dirty (thread_p, recv_p->pgptr, DONT_FREE);
    }

  return NO_ERROR;
}

/*
 * ehash_read_oid_from_record () - Unpack an OID from a record buffer
 *   return: pointer just past the OID within the record
 *   record_p(in): byte stream holding pageid, volid, slotid in that order
 *   oid_p(out): OID to fill
 *
 * Note: record/log buffers are plain byte streams with no alignment
 * guarantee, so the fields are read with memcpy instead of casting the
 * pointer and dereferencing (which is undefined behavior on
 * strict-alignment platforms; CERT EXP36-C).
 */
static char *
ehash_read_oid_from_record (char *record_p, OID * oid_p)
{
  memcpy (&oid_p->pageid, record_p, sizeof (PAGEID));
  record_p += sizeof (PAGEID);

  memcpy (&oid_p->volid, record_p, sizeof (VOLID));
  record_p += sizeof (VOLID);

  memcpy (&oid_p->slotid, record_p, sizeof (PGSLOTID));
  record_p += sizeof (PGSLOTID);

  return record_p;
}

/*
 * ehash_write_oid_to_record () - Pack an OID into a record buffer
 *   return: pointer just past the OID within the record
 *   record_p(out): byte stream to receive pageid, volid, slotid in that order
 *   oid_p(in): OID to serialize
 *
 * Note: the destination is a plain byte stream with no alignment guarantee,
 * so the fields are written with memcpy instead of casting the pointer and
 * storing through it (undefined behavior on strict-alignment platforms;
 * CERT EXP36-C).
 */
static char *
ehash_write_oid_to_record (char *record_p, OID * oid_p)
{
  memcpy (record_p, &oid_p->pageid, sizeof (PAGEID));
  record_p += sizeof (PAGEID);

  memcpy (record_p, &oid_p->volid, sizeof (VOLID));
  record_p += sizeof (VOLID);

  memcpy (record_p, &oid_p->slotid, sizeof (PGSLOTID));
  record_p += sizeof (PGSLOTID);

  return record_p;
}

/*
 * ehash_read_ehid_from_record () - Unpack an EHID from a record buffer
 *   return: pointer just past the EHID within the record
 *   record_p(in): byte stream holding pageid, fileid, volid in that order
 *   ehid_p(out): extendible hash identifier to fill
 *
 * Note: the serialized EHID occupies sizeof (EHID) bytes in total, so the
 * final advance covers the volid plus any trailing struct padding. Fields
 * are read with memcpy because the buffer has no alignment guarantee
 * (CERT EXP36-C).
 */
static char *
ehash_read_ehid_from_record (char *record_p, EHID * ehid_p)
{
  memcpy (&ehid_p->pageid, record_p, sizeof (PAGEID));
  record_p += sizeof (PAGEID);

  memcpy (&ehid_p->vfid.fileid, record_p, sizeof (FILEID));
  record_p += sizeof (FILEID);

  memcpy (&ehid_p->vfid.volid, record_p, sizeof (VOLID));
  record_p += (sizeof (EHID) - sizeof (PAGEID) - sizeof (FILEID));

  return record_p;
}

/*
 * ehash_write_ehid_to_record () - Pack an EHID into a record buffer
 *   return: pointer just past the EHID within the record
 *   record_p(out): byte stream to receive pageid, fileid, volid in that order
 *   ehid_p(in): extendible hash identifier to serialize
 *
 * Note: the serialized EHID occupies sizeof (EHID) bytes in total, so the
 * final advance covers the volid plus any trailing struct padding. Fields
 * are written with memcpy because the buffer has no alignment guarantee
 * (CERT EXP36-C).
 */
static char *
ehash_write_ehid_to_record (char *record_p, EHID * ehid_p)
{
  memcpy (record_p, &ehid_p->pageid, sizeof (PAGEID));
  record_p += sizeof (PAGEID);

  memcpy (record_p, &ehid_p->vfid.fileid, sizeof (FILEID));
  record_p += sizeof (FILEID);

  memcpy (record_p, &ehid_p->vfid.volid, sizeof (VOLID));
  record_p += (sizeof (EHID) - sizeof (PAGEID) - sizeof (FILEID));

  return record_p;
}