Skip to content

File query_hash_scan.c

File List > cubrid > src > query > query_hash_scan.c

Go to the documentation of this file

/*
 *
 * Copyright 2016 CUBRID Corporation
 *
 *  Licensed under the Apache License, Version 2.0 (the "License");
 *  you may not use this file except in compliance with the License.
 *  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 *  Unless required by applicable law or agreed to in writing, software
 *  distributed under the License is distributed on an "AS IS" BASIS,
 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *  See the License for the specific language governing permissions and
 *  limitations under the License.
 *
 */

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <stddef.h>
#include <math.h>
#if defined(sun) || defined(HPUX)
#include <sys/types.h>
#include <netinet/in.h>
#endif
#if defined(_AIX)
#include <net/nh.h>
#endif

#include "fetch.h"
#include "memory_alloc.h"
#include "memory_hash.h"
#include "object_domain.h"
#include "object_primitive.h"
#include "object_representation.h"
#include "query_opfunc.h"
#include "string_opfunc.h"
#include "query_hash_scan.h"
#include "db_value_printer.hpp"
#include "dbtype.h"
#include "chartype.h"
#include "storage_common.h"
#include "error_manager.h"
#include "page_buffer.h"
#include "slotted_page.h"
#include "file_manager.h"
#include "db_date.h"
#include "thread_compat.hpp"
#include "oid.h"
// XXX: SHOULD BE THE LAST INCLUDE HEADER
#include "memory_wrapper.hpp"


static bool safe_memcpy (void *data, void *source, int size);
static DB_VALUE_COMPARE_RESULT qdata_hscan_key_compare (HASH_SCAN_KEY * ckey1, HASH_SCAN_KEY * ckey2, int *diff_pos);

/****************************************************************************/
/************************ file hash structure *******************************/
/****************************************************************************/
/* Number of bits pseudo key consists of */
#define FHS_HASH_KEY_BITS           (sizeof (FHS_HASH_KEY) * 8)

/* Number of bits the short data type consists of */
#define FHS_SHORT_BITS  (sizeof (short) * 8)

#define FHS_FLAG_INDIRECT   -1  /* 0xFFFF, record is page id for duplicate key bucket */
#define FHS_FLAG_DUMMY_NUM  0   /* meaningless flag of record in dk bucket */

#define FHS_HEADER_SLOT_ID  0
#define FHS_FIRST_SLOT_ID   1

/* key */
typedef unsigned int FHS_HASH_KEY;
#define FHS_KEY_SIZE (sizeof (FHS_HASH_KEY))
#define FHS_ALIGNMENT ((char) FHS_KEY_SIZE)
#define FHS_MAX_DUP_KEY 100 /* 10% of MAXNUM (PAGE 16K / RECORD 14 bytes) */
                /* TO_DO : adjust it properly using statistical information in optimizer */

#define SET_VPID(dest_vpid, vol_id, page_id)  \
  do \
    { \
      (dest_vpid).volid = vol_id; \
      (dest_vpid).pageid = page_id; \
    } \
  while (0)

/* Directory elements : Pointer to the bucket */
typedef struct fhs_dir_record FHS_DIR_RECORD;
struct fhs_dir_record
{
  VPID bucket_vpid;     /* bucket pointer */
};

/* Each bucket has a header area to store its local depth */
typedef struct fhs_bucket_header FHS_BUCKET_HEADER;
struct fhs_bucket_header
{
  char local_depth;     /* The local depth of the bucket */
};

typedef struct fhs_dk_bucket_header FHS_DK_BUCKET_HEADER;
struct fhs_dk_bucket_header
{
  VPID next_bucket;     /* bucket pointer */
  VPID last_bucket;     /* bucket pointer */
};

typedef enum
{
  FHS_SUCCESSFUL_COMPLETION,
  FHS_BUCKET_FULL,      /* Bucket full condition; used in insertion */
  FHS_ERROR_OCCURRED
} FHS_RESULT;

static void inline fhs_read_tftid_from_record (char *record_p, TFTID * tftid_p);
static void inline fhs_read_key_from_record (char *record_p, int *key);
static void inline fhs_read_flag_from_record (char *record_p, short *flag);
static char inline *fhs_write_tftid_to_record (char *record_p, TFTID * tftid_p);
static char inline *fhs_write_key_to_record (char *record_p, void *key_p);
static char inline *fhs_write_flag_to_record (char *record_p, short flag);
static int fhs_compose_record (THREAD_ENTRY * thread_p, void *key_p, TFTID * value_p, RECDES * recdes_p, short flag);
static PAGE_PTR fhs_fix_ehid_page (THREAD_ENTRY * thread_p, EHID * ehid, PGBUF_LATCH_MODE latch_mode);
static PAGE_PTR fhs_fix_old_page (THREAD_ENTRY * thread_p, const VFID * vfid_p, const VPID * vpid_p,
                  PGBUF_LATCH_MODE latch_mode);
static PAGE_PTR fhs_fix_nth_page (THREAD_ENTRY * thread_p, const VFID * vfid_p, int offset,
                  PGBUF_LATCH_MODE latch_mode);
static void fhs_dir_locate (int *out_page_no_p, int *out_offset_p);
static char *fhs_allocate_recdes (THREAD_ENTRY * thread_p, RECDES * recdes_p, int size, short type);
static void fhs_free_recdes (THREAD_ENTRY * thread_p, RECDES * recdes_p);
static int fhs_initialize_dir_new_page (THREAD_ENTRY * thread_p, PAGE_PTR page_p, void *args);
static int fhs_initialize_bucket_new_page (THREAD_ENTRY * thread_p, PAGE_PTR page_p, void *args);
static int fhs_initialize_dk_bucket_new_page (THREAD_ENTRY * thread_p, PAGE_PTR page_p, void *args);
static bool fhs_binary_search_bucket (THREAD_ENTRY * thread_p, PAGE_PTR bucket_page_p, PGSLOTID num_record,
                      void *key_p, PGSLOTID * out_position_p, bool need_to_backward);
static FHS_HASH_KEY fhs_hash (void *original_key_p);
static FHS_HASH_KEY fhs_hash_four_bytes_type (char *key_p);
static int fhs_compare_key (THREAD_ENTRY * thread_p, char *bucket_record_p, void *key_p,
                INT16 record_type, int *out_compare_result_p);
static bool fhs_locate_slot (THREAD_ENTRY * thread_p, PAGE_PTR bucket_page_p, void *key_p,
                 PGSLOTID * out_position_p, bool need_to_backward);
static int fhs_find_bucket_vpid_with_hash (THREAD_ENTRY * thread_p, FHSID * fhsid_p, void *key_p,
                       PGBUF_LATCH_MODE root_latch, PGBUF_LATCH_MODE bucket_latch,
                       VPID * out_vpid_p, FHS_HASH_KEY * out_hash_key_p, int *out_location_p);
static int fhs_insert_to_bucket_after_create (THREAD_ENTRY * thread_p, FHSID * fhsid_p, VPID * bucket_vpid_p,
                          int location, FHS_HASH_KEY hash_key, void *key_p, TFTID * value_p);
static FHS_RESULT fhs_insert_bucket_after_extend_if_need (THREAD_ENTRY * thread_p, FHSID * fhsid_p,
                              VPID * bucket_vpid_p, void *key_p, FHS_HASH_KEY hash_key,
                              TFTID * value_p);
static char fhs_find_depth (THREAD_ENTRY * thread_p, FHSID * fhsid_p, int location, VPID * bucket_vpid_p,
                VPID * sibling_vpid_p);
static int fhs_connect_bucket (THREAD_ENTRY * thread_p, FHSID * fhsid_p, int local_depth, FHS_HASH_KEY hash_key,
                   VPID * bucket_vpid_p);
static FHS_RESULT fhs_insert_to_bucket (THREAD_ENTRY * thread_p, FHSID * fhsid_p, PAGE_PTR bucket_page_p, void *key_p,
                    TFTID * value_p);
static FHS_RESULT fhs_insert_to_dk_bucket (THREAD_ENTRY * thread_p, FHSID * fhsid_p, VPID * next_bucket, void *key_p,
                       TFTID * value_p);
static PAGE_PTR fhs_extend_bucket (THREAD_ENTRY * thread_p, FHSID * fhsid_p, PAGE_PTR bucket_page_p, void *key_p,
                   FHS_HASH_KEY hash_key, int *out_new_bit_p, VPID * bucket_vpid);
static PAGE_PTR fhs_split_bucket (THREAD_ENTRY * thread_p, FHSID * fhsid_p, PAGE_PTR bucket_page_p, void *key_p,
                  int *out_old_local_depth_p, int *out_new_local_depth_p, VPID * sibling_vpid_p);
static int fhs_expand_directory (THREAD_ENTRY * thread_p, FHSID * fhsid_p, int new_depth);
static int fhs_find_first_bit_position (THREAD_ENTRY * thread_p, FHSID * fhsid_p, PAGE_PTR bucket_page_p,
                    FHS_BUCKET_HEADER * bucket_header_p, void *key_p, int num_recs,
                    PGSLOTID first_slot_id, int *out_old_local_depth_p, int *out_new_local_depth_p);
static int fhs_distribute_records_into_two_bucket (THREAD_ENTRY * thread_p, FHSID * fhsid_p,
                           PAGE_PTR bucket_page_p, FHS_BUCKET_HEADER * bucket_header_p,
                           int num_recs, PGSLOTID first_slot_id, PAGE_PTR sibling_page_p);
static int fhs_get_pseudo_key (THREAD_ENTRY * thread_p, RECDES * recdes_p, FHS_HASH_KEY * out_hash_key_p);

/* Number of pointers the first page of the directory contains */
#define FHS_NUM_FIRST_PAGES \
  ((DB_PAGESIZE) / SSIZEOF (FHS_DIR_RECORD))

/* Offset of the last pointer in the first directory page */
#define FHS_LAST_OFFSET_IN_FIRST_PAGE \
  ((FHS_NUM_FIRST_PAGES - 1) * sizeof(FHS_DIR_RECORD))

/* Number of pointers for each directory page (other than the first one)  */
#define FHS_NUM_NON_FIRST_PAGES \
  (DB_PAGESIZE / sizeof(FHS_DIR_RECORD))

/* Offset of the last pointer in the other directory pages */
#define FHS_LAST_OFFSET_IN_NON_FIRST_PAGE \
  ((FHS_NUM_NON_FIRST_PAGES - 1) * sizeof(FHS_DIR_RECORD))

#define GETBITS(value, pos, n) \
  ( ((value) >> ( FHS_HASH_KEY_BITS - (pos) - (n) + 1)) & (~(~0UL << (n))) )
#define FIND_OFFSET(hash_key, depth) (GETBITS((hash_key), 1, (depth)))
#define GETBIT(word, pos) (GETBITS((word), (pos), 1))
#define SETBIT(word,  pos) ( (word) | (1 << (FHS_HASH_KEY_BITS - (pos))) )
#define CLEARBIT(word, pos) ( (word) & ~(1 << (FHS_HASH_KEY_BITS - (pos))) )

/****************************************************************************/
/************************ file hash structure *******************************/
/****************************************************************************/

/*
 * qdata_alloc_hscan_key () - allocate new hash key
 *   returns: pointer to new structure or NULL on error
 *   thread_p(in): thread
 *   val_cnt(in): size of key
 *   alloc_vals(in): if true will allocate dbvalues
 */
HASH_SCAN_KEY *
qdata_alloc_hscan_key (cubthread::entry * thread_p, int val_cnt, bool alloc_vals)
{
  HASH_SCAN_KEY *key;
  int i;

  key = (HASH_SCAN_KEY *) db_private_alloc (thread_p, sizeof (HASH_SCAN_KEY));
  if (key == NULL)
    {
      er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_OUT_OF_VIRTUAL_MEMORY, 1, sizeof (HASH_SCAN_KEY));
      return NULL;
    }

  key->values = (DB_VALUE **) db_private_alloc (thread_p, sizeof (DB_VALUE *) * val_cnt);
  if (key->values == NULL)
    {
      db_private_free (thread_p, key);
      er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_OUT_OF_VIRTUAL_MEMORY, 1, sizeof (DB_VALUE *) * val_cnt);
      return NULL;
    }

  if (alloc_vals)
    {
      for (i = 0; i < val_cnt; i++)
    {
      key->values[i] = pr_make_value ();
      if (key->values[i] == NULL)
        {
          key->free_values = true;
          qdata_free_hscan_key (thread_p, key, i);
          er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_OUT_OF_VIRTUAL_MEMORY, 1, sizeof (DB_VALUE *));
          return NULL;
        }
    }
    }

  key->val_count = val_cnt;
  key->free_values = alloc_vals;
  return key;
}

/*
 * qdata_free_hscan_key () - free hash key
 *   thread_p(in): thread
 *   key(in): hash key
 */
void
qdata_free_hscan_key (cubthread::entry * thread_p, HASH_SCAN_KEY * key, int val_count)
{
  if (key == NULL)
    {
      return;
    }

  if (key->values != NULL)
    {
      if (key->free_values)
    {
      for (int i = 0; i < val_count; i++)
        {
          if (key->values[i])
        {
          pr_free_value (key->values[i]);
        }
        }
    }

      /* free values array */
      db_private_free (thread_p, key->values);
    }

  /* free structure */
  db_private_free (thread_p, key);
}

/*
 * qdata_hash_scan_key () - compute hash of aggregate key
 *   returns: hash value
 *   key(in): key
 *   ht_size(in): hash table size (in buckets)
 */
unsigned int
qdata_hash_scan_key (const void *key, unsigned int ht_size, HASH_METHOD hash_method)
{
  HASH_SCAN_KEY *ckey = (HASH_SCAN_KEY *) key;
  unsigned int hash_val = 0, tmp_hash_val;
  int i;

  /* build hash value */
  for (i = 0; i < ckey->val_count; i++)
    {
      hash_val = ROTL32 (hash_val, 13);
      tmp_hash_val = mht_get_hash_number (ht_size, ckey->values[i]);
      hash_val ^= tmp_hash_val;
      if (hash_val == 0)
    {
      hash_val = tmp_hash_val;
    }
    }

  if (hash_method == HASH_METH_HASH_FILE)
    {
      hash_val = fhs_hash (&hash_val);
    }

  return hash_val;
}

/*
 * qdata_hscan_key_compare () - compare two aggregate keys
 *   returns: comparison result
 *   key1(in): first key
 *   key2(in): second key
 *   diff_pos(out): if not equal, position of difference, otherwise -1
 */
static DB_VALUE_COMPARE_RESULT
qdata_hscan_key_compare (HASH_SCAN_KEY * ckey1, HASH_SCAN_KEY * ckey2, int *diff_pos)
{
  DB_VALUE_COMPARE_RESULT result;
  int i;

  assert (diff_pos);
  *diff_pos = -1;

  if (ckey1 == ckey2)
    {
      /* same pointer, same values */
      return DB_EQ;
    }

  if (ckey1->val_count != ckey2->val_count)
    {
      /* can't compare keys of different sizes; shouldn't get here */
      assert (false);
      return DB_UNK;
    }

  for (i = 0; i < ckey1->val_count; i++)
    {
      result = tp_value_compare (ckey1->values[i], ckey2->values[i], 0, 1);
      if (result != DB_EQ)
    {
      *diff_pos = i;
      return result;
    }
    }

  /* if we got this far, it's equal */
  return DB_EQ;
}

/*
 * qdata_hscan_key_eq () - check equality of two aggregate keys
 *   returns: true if equal, false otherwise
 *   key1(in): first key
 *   key2(in): second key
 */
int
qdata_hscan_key_eq (const void *key1, const void *key2)
{
  int decoy;

  /* compare for equality */
  return (qdata_hscan_key_compare ((HASH_SCAN_KEY *) key1, (HASH_SCAN_KEY *) key2, &decoy) == DB_EQ);
}

/*
 * qdata_build_hscan_key () - build aggregate key structure from reguvar list
 *   returns: NO_ERROR or error code
 *   thread_p(in): thread
 *   key(out): aggregate key
 *   regu_list(in): reguvar list for fetching values
 */
int
qdata_build_hscan_key (THREAD_ENTRY * thread_p, val_descr * vd, REGU_VARIABLE_LIST regu_list, HASH_SCAN_KEY * key)
{
  int rc = NO_ERROR;

  /* build key */
  key->free_values = false; /* references precreated DB_VALUES */
  key->val_count = 0;
  while (regu_list != NULL)
    {
      rc = fetch_peek_dbval (thread_p, &regu_list->value, vd, NULL, NULL, NULL, &key->values[key->val_count]);
      if (rc != NO_ERROR)
    {
      return rc;
    }

      /* next */
      regu_list = regu_list->next;
      key->val_count++;
    }

  /* all ok */
  return NO_ERROR;
}

/*
 * qdata_print_hash_scan_entry () - Print the entry
 *                              Will be used by mht_dump_hls() function
 *   return:
 *   fp(in)     :
 *   key(in)    :
 *   data(in)   :
 *   args(in)   :
 */
int
qdata_print_hash_scan_entry (THREAD_ENTRY * thread_p, FILE * fp, const void *data, const void *type_list, void *args)
{
  HASH_SCAN_VALUE *data_p;
  HASH_METHOD hash_list_scan_type;
  QFILE_TUPLE_VALUE_TYPE_LIST *type_list_p;
  DB_VALUE dbval;
  const PR_TYPE *pr_type_p;
  int i;
  char *tuple_p;
  OR_BUF buf;

  if (data == NULL || type_list == NULL || args == NULL)
    {
      return false;
    }

  data_p = (HASH_SCAN_VALUE *) data;

  hash_list_scan_type = *((HASH_METHOD *) args);
  if (hash_list_scan_type == HASH_METH_NOT_USE)
    {
      return false;
    }

  type_list_p = (QFILE_TUPLE_VALUE_TYPE_LIST *) type_list;
  if (type_list_p->type_cnt <= 0)
    {
      return false;
    }

  db_make_null (&dbval);

  if (fp == NULL)
    {
      fp = stdout;
    }

  fprintf (fp, "LIST_CACHE_ENTRY (%p) - ", data);

  if (hash_list_scan_type == HASH_METH_IN_MEM)
    {
      fprintf (fp, "data_size = [%d], data = { ", QFILE_GET_TUPLE_LENGTH (data_p->tuple));

      tuple_p = (char *) data_p->tuple + QFILE_TUPLE_LENGTH_SIZE;

      for (i = 0; i < type_list_p->type_cnt; i++)
    {
      if (QFILE_GET_TUPLE_VALUE_FLAG (tuple_p) == V_BOUND)
        {
          or_init (&buf, tuple_p + QFILE_TUPLE_VALUE_HEADER_SIZE, QFILE_GET_TUPLE_VALUE_LENGTH (tuple_p));

          pr_type_p = type_list_p->domp[i]->type;
          pr_type_p->data_readval (&buf, &dbval, type_list_p->domp[i], -1, false /* Don't copy */ , NULL, 0);

          db_fprint_value (fp, &dbval);

          if (db_value_need_clear (&dbval))
        {
          pr_clear_value (&dbval);
        }
        }
      else
        {
          fprintf (fp, "VALUE_UNBOUND");
        }

      if (i != type_list_p->type_cnt - 1)
        {
          fprintf (stdout, " , ");
        }

      tuple_p += QFILE_TUPLE_VALUE_HEADER_SIZE + QFILE_GET_TUPLE_VALUE_LENGTH (tuple_p);
    }

      fprintf (fp, " }");
    }
  else if (hash_list_scan_type == HASH_METH_HYBRID)
    {
      fprintf (fp, "pageid = [%d]  volid = [%d]  offset = [%d]", data_p->pos->vpid.pageid,
           data_p->pos->vpid.volid, data_p->pos->offset);
    }
  else if (hash_list_scan_type == HASH_METH_HASH_FILE)
    {
      /* nothing to do */
    }
  else
    {
      /* nothing to do */
    }

  fprintf (fp, "\n");

  return true;
}

/*
 * qdata_copy_hscan_key () - deep copy hash key
 *   returns: pointer to new hash key
 *   thread_p(in): thread
 *   key(in): source key
 */
HASH_SCAN_KEY *
qdata_copy_hscan_key (cubthread::entry * thread_p, HASH_SCAN_KEY * key, REGU_VARIABLE_LIST probe_regu_list,
              val_descr * vd)
{
  HASH_SCAN_KEY *new_key = NULL;
  int i = 0;
  DB_TYPE vtype1, vtype2;
  TP_DOMAIN_STATUS status = DOMAIN_COMPATIBLE;

  if (key)
    {
      /* make a copy */
      new_key = qdata_alloc_hscan_key (thread_p, key->val_count, false);
    }

  if (new_key)
    {
      /* copy values */
      new_key->val_count = key->val_count;
      new_key->free_values = true;
      for (i = 0; i < key->val_count; i++)
    {
      vtype1 = REGU_VARIABLE_GET_TYPE (&probe_regu_list->value);
      vtype2 = DB_VALUE_DOMAIN_TYPE (key->values[i]);

      if (vtype1 != vtype2)
        {
          new_key->values[i] = pr_make_value ();
          if (new_key->values[i] == NULL)
        {
          qdata_free_hscan_key (thread_p, new_key, i);
          er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_OUT_OF_VIRTUAL_MEMORY, 1, sizeof (DB_VALUE *));
          return NULL;
        }

          status = tp_value_coerce (key->values[i], new_key->values[i], probe_regu_list->value.domain);
          if (status != DOMAIN_COMPATIBLE)
        {
          qdata_free_hscan_key (thread_p, new_key, ++i);
          er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_TP_CANT_COERCE, 2, pr_type_name (vtype2),
              pr_type_name (vtype1));
          return NULL;
        }
        }
      else
        {
          new_key->values[i] = pr_copy_value (key->values[i]);
          if (new_key->values[i] == NULL)
        {
          qdata_free_hscan_key (thread_p, new_key, i);
          er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_OUT_OF_VIRTUAL_MEMORY, 1, sizeof (DB_VALUE *));
          return NULL;
        }
        }
      probe_regu_list = probe_regu_list->next;
    }
    }

  return new_key;
}

/*
 * qdata_copy_hscan_key_without_alloc () - deep copy hash key
 *   returns: pointer to new hash key
 *   thread_p(in): thread
 *   key(in): source key
 */
HASH_SCAN_KEY *
qdata_copy_hscan_key_without_alloc (cubthread::entry * thread_p, HASH_SCAN_KEY * key,
                    REGU_VARIABLE_LIST probe_regu_list, HASH_SCAN_KEY * new_key)
{
  DB_TYPE vtype1, vtype2;
  TP_DOMAIN_STATUS status = DOMAIN_COMPATIBLE;

  if (key == NULL)
    {
      return NULL;
    }
  if (new_key)
    {
      /* copy values */
      new_key->val_count = key->val_count;
      for (int i = 0; i < key->val_count; i++)
    {
      vtype1 = REGU_VARIABLE_GET_TYPE (&probe_regu_list->value);
      vtype2 = DB_VALUE_DOMAIN_TYPE (key->values[i]);

      if (vtype1 != vtype2)
        {
          pr_clear_value (new_key->values[i]);
          status = tp_value_coerce (key->values[i], new_key->values[i], probe_regu_list->value.domain);
          if (status != DOMAIN_COMPATIBLE)
        {
          er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_TP_CANT_COERCE, 2, pr_type_name (vtype2),
              pr_type_name (vtype1));
          return NULL;
        }
        }
      else
        {
          pr_clear_value (new_key->values[i]);
          if (pr_clone_value (key->values[i], new_key->values[i]) != NO_ERROR)
        {
          er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_OUT_OF_VIRTUAL_MEMORY, 1, sizeof (DB_VALUE *));
          return NULL;
        }
        }
      probe_regu_list = probe_regu_list->next;
    }
    }

  return new_key;
}

/*
 * qdata_alloc_hscan_value () - allocate new hash value
 *   returns: pointer to new structure or NULL on error
 *   thread_p(in): thread
 */
HASH_SCAN_VALUE *
qdata_alloc_hscan_value (cubthread::entry * thread_p, QFILE_TUPLE tpl)
{
  HASH_SCAN_VALUE *value;
  int tuple_size = QFILE_GET_TUPLE_LENGTH (tpl);

  /* alloc structure */
  value = (HASH_SCAN_VALUE *) db_private_alloc (thread_p, sizeof (HASH_SCAN_VALUE));
  if (value == NULL)
    {
      er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_OUT_OF_VIRTUAL_MEMORY, 1, sizeof (HASH_SCAN_VALUE));
      return NULL;
    }

  value->tuple = (QFILE_TUPLE) db_private_alloc (thread_p, tuple_size);
  if (value->tuple == NULL)
    {
      qdata_free_hscan_value (thread_p, value);
      er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_OUT_OF_VIRTUAL_MEMORY, 1, tuple_size);
      return NULL;
    }
  /* save tuple */
  if (!safe_memcpy (value->tuple, tpl, tuple_size))
    {
      qdata_free_hscan_value (thread_p, value);
      return NULL;
    }
  return value;
}

/*
 * qdata_alloc_hscan_value_OID () - allocate new hash OID value
 *   returns: pointer to new structure or NULL on error
 *   thread_p(in): thread
 */
HASH_SCAN_VALUE *
qdata_alloc_hscan_value_OID (cubthread::entry * thread_p, QFILE_LIST_SCAN_ID * scan_id_p)
{
  HASH_SCAN_VALUE *value;

  /* alloc structure */
  value = (HASH_SCAN_VALUE *) db_private_alloc (thread_p, sizeof (HASH_SCAN_VALUE));
  if (value == NULL)
    {
      er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_OUT_OF_VIRTUAL_MEMORY, 1, sizeof (HASH_SCAN_VALUE));
      return NULL;
    }

  value->pos = (QFILE_TUPLE_SIMPLE_POS *) db_private_alloc (thread_p, sizeof (QFILE_TUPLE_SIMPLE_POS));
  if (value->pos == NULL)
    {
      qdata_free_hscan_value (thread_p, value);
      er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_OUT_OF_VIRTUAL_MEMORY, 1, sizeof (QFILE_TUPLE_SIMPLE_POS));
      return NULL;
    }

  /* save position */
  value->pos->offset = scan_id_p->curr_offset;
  value->pos->vpid = scan_id_p->curr_vpid;

  return value;
}

static bool
safe_memcpy (void *data, void *source, int size)
{
  if (size < 0)
    {
      return false;
    }
  memcpy (data, source, (size_t) size);
  return true;
}

/*
 * qdata_free_hscan_value () - free hash value
 *   thread_p(in): thread
 *   key(in): hash value
 */
void
qdata_free_hscan_value (cubthread::entry * thread_p, HASH_SCAN_VALUE * value)
{
  if (value == NULL)
    {
      return;
    }

  /* free values */
  if (value->data != NULL)
    {
      db_private_free_and_init (thread_p, value->data);
    }
  /* free structure */
  db_private_free_and_init (thread_p, value);
}

/*
 * qdata_free_agg_hentry () - free key-value pair of hash entry
 *   returns: error code or NO_ERROR
 *   key(in): key pointer
 *   data(in): value pointer
 *   args(in): args passed by mht_rem (should be null)
 */
int
qdata_free_hscan_entry (const void *key, void *data, void *args)
{
  /* free key */
  qdata_free_hscan_key ((cubthread::entry *) args, (HASH_SCAN_KEY *) key, key ? ((HASH_SCAN_KEY *) key)->val_count : 0);

  /* free tuple */
  qdata_free_hscan_value ((cubthread::entry *) args, (HASH_SCAN_VALUE *) data);

  /* all ok */
  return NO_ERROR;
}

/*
 * fhs_dump_bucket () - Print the bucket's contents
 *   return:
 *   buc_pgptr(in): bucket page whose contents is going to be dumped
 *
 * Note: A debugging function. Prints out the contents of the given bucket.
 */
static void
fhs_dump_bucket (THREAD_ENTRY * thread_p, PAGE_PTR bucket_page_p)
{
  FHS_BUCKET_HEADER *bucket_header_p;
  FHS_DK_BUCKET_HEADER *dk_bucket_header_p;
  char *bucket_record_p;
  RECDES recdes;
  PGSLOTID slot_id, first_slot_id = -1;
  int key_size;
  TFTID assoc_value;
  int num_records;
  int i, key;
  short flag;

  (void) spage_next_record (bucket_page_p, &first_slot_id, &recdes, PEEK);
  bucket_header_p = (FHS_BUCKET_HEADER *) recdes.data;
  dk_bucket_header_p = (FHS_DK_BUCKET_HEADER *) recdes.data;

  printf ("*************************************************************\n");
  printf ("*  local_depth : %d                                         *\n", bucket_header_p->local_depth);
  printf ("*  next_vpid   : %d                                         *\n", dk_bucket_header_p->next_bucket.pageid);
  printf ("*  no. records : %d                                         *\n",
      spage_number_of_records (bucket_page_p) - 1);
  printf ("*                                                           *\n");
  printf ("*   No        Key         flag             Value            *\n");
  printf ("*  ====   =============  ========   ==================      *\n");

  num_records = spage_number_of_records (bucket_page_p);

  for (slot_id = 1; slot_id < num_records; slot_id++)
    {
      printf ("*   %2d", slot_id);

      spage_get_record (thread_p, bucket_page_p, slot_id, &recdes, PEEK);
      bucket_record_p = (char *) recdes.data;
      fhs_read_tftid_from_record (bucket_record_p, &assoc_value);
      fhs_read_key_from_record (bucket_record_p, &key);
      fhs_read_flag_from_record (bucket_record_p, &flag);

      printf ("      %u  ", key);
      printf ("    %d       ", flag);
      printf ("(%5d,%5d,%5d)     *\n", assoc_value.volid, assoc_value.pageid, assoc_value.offset);
    }

  printf ("*************************************************************\n");
}

/*
 * fhs_dump () - Dump directory & all buckets
 *   return:
 *   fhsid(in): identifier for the extendible hashing structure to dump
 *
 * Note: A debugging function. Dumps the contents of the directory
 * and the buckets of specified ext. hashing structure.
 */
void
fhs_dump (THREAD_ENTRY * thread_p, FHSID * fhsid_p)
{
  FHS_DIR_RECORD *dir_record_p;
  int num_pages;
  int num_ptrs;

  int check_pages;
  int end_offset;
  int i;

  PAGE_PTR dir_page_p;
  PGLENGTH dir_offset;
  int dir_page_no;
  int dir_ptr_no;

  PAGE_PTR bucket_page_p;
  int bucket_page_no;

  if (fhsid_p == NULL)
    {
      return;
    }

  if (file_get_num_user_pages (thread_p, &fhsid_p->ehid.vfid, &num_pages) != NO_ERROR)
    {
      ASSERT_ERROR ();
      return;
    }
  num_pages -= 1;       /* The first page starts with 0 */

  num_ptrs = 1 << fhsid_p->depth;
  end_offset = num_ptrs - 1;    /* Directory first pointer has an offset of 0 */
  fhs_dir_locate (&check_pages, &end_offset);

  if (check_pages != num_pages)
    {
      er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_EH_ROOT_CORRUPTED, 3, fhsid_p->ehid.vfid.volid,
          fhsid_p->ehid.vfid.fileid, fhsid_p->ehid.pageid);
      return;
    }

  printf ("*********************************************************\n");
  printf ("*                      DIRECTORY                        *\n");
  printf ("*                                                       *\n");
  printf ("*    Depth    :  %d                                      *\n", fhsid_p->depth);
  printf ("*    Key type : int                                     *\n");
  printf ("*    Key size :  %ld                                      *\n", sizeof (FHS_HASH_KEY));
  printf ("*                                                       *\n");
  printf ("*                      POINTERS                         *\n");
  printf ("*                                                       *\n");

  /* Print directory */

  dir_offset = 0;
  dir_page_no = 0;
  dir_ptr_no = 0;

  dir_page_p = fhs_fix_ehid_page (thread_p, &fhsid_p->ehid, PGBUF_LATCH_READ);
  if (dir_page_p == NULL)
    {
      return;
    }

  for (i = 0; i < num_ptrs; i++)
    {
      if (DB_PAGESIZE - dir_offset < SSIZEOF (FHS_DIR_RECORD))
    {
      /* We reached the end of the directory page. The next bucket pointer is in the next directory page. */

      /* Release previous page, and unlock it */
      pgbuf_unfix_and_init (thread_p, dir_page_p);

      dir_page_no++;

      /* Get another page */
      dir_page_p = fhs_fix_nth_page (thread_p, &fhsid_p->ehid.vfid, dir_page_no, PGBUF_LATCH_READ);
      if (dir_page_p == NULL)
        {
          return;
        }

      dir_offset = 0;
    }

      /* Print out the next directory record */
      dir_record_p = (FHS_DIR_RECORD *) ((char *) dir_page_p + dir_offset);

      if (VPID_ISNULL (&dir_record_p->bucket_vpid))
    {
      printf ("*    Dir loc :  %d   points to bucket page  id: NULL    *\n", dir_ptr_no);
    }
      else
    {
      printf ("*    Dir loc :  %d   points to vol:%d bucket pgid: %d  *\n", dir_ptr_no,
          dir_record_p->bucket_vpid.volid, dir_record_p->bucket_vpid.pageid);
    }

      dir_ptr_no++;
      dir_offset += sizeof (FHS_DIR_RECORD);
    }

  /* Release last page */
  pgbuf_unfix_and_init (thread_p, dir_page_p);

  printf ("*                                                       *\n");
  printf ("*********************************************************\n");

  /* Print buckets */

  if (file_get_num_user_pages (thread_p, &fhsid_p->bucket_file, &num_pages) != NO_ERROR)
    {
      ASSERT_ERROR ();
      return;
    }
  num_pages -= 1;

  for (bucket_page_no = 0; bucket_page_no <= num_pages; bucket_page_no++)
    {
      bucket_page_p = fhs_fix_nth_page (thread_p, &fhsid_p->bucket_file, bucket_page_no, PGBUF_LATCH_READ);
      if (bucket_page_p == NULL)
    {
      return;
    }

      printf ("\n\n");
      fhs_dump_bucket (thread_p, bucket_page_p);
      pgbuf_unfix_and_init (thread_p, bucket_page_p);
    }

  return;
}

static inline char *
fhs_write_tftid_to_record (char *record_p, TFTID * tftid_p)
{
  memcpy (record_p, tftid_p, sizeof (TFTID));
  record_p += sizeof (TFTID);
  return record_p;
}

static inline char *
fhs_write_key_to_record (char *record_p, void *key_p)
{
  *(int *) record_p = *(int *) key_p;
  record_p += sizeof (int);

  return record_p;
}

static inline char *
fhs_write_flag_to_record (char *record_p, short flag)
{
  *(short *) record_p = flag;
  record_p += sizeof (short);

  return record_p;
}

static inline void
fhs_read_tftid_from_record (char *record_p, TFTID * tftid_p)
{
  memcpy (tftid_p, record_p, sizeof (TFTID));
}

static inline void
fhs_read_key_from_record (char *record_p, int *key)
{
  record_p += sizeof (TFTID);
  *key = *(int *) record_p;
}

static inline void
fhs_read_flag_from_record (char *record_p, short *flag)
{
  record_p += sizeof (TFTID) + sizeof (int);
  *flag = *(short *) record_p;
}

/*
 * fhs_compose_record () -
 *   return: NO_ERROR, or ER_FAILED
 *   key_ptr(in): Pointer to the key
 *   value_ptr(in): Pointer to the associated value
 *   recdes(in): Pointer to the Record descriptor to fill in
 *
 * Note: record : TFTID  | key(int) | flag(short)
 *                8byte    4byte       2byte    = 14byte
 */
static int
fhs_compose_record (THREAD_ENTRY * thread_p, void *key_p, TFTID * value_p, RECDES * recdes_p, short flag)
{
  int record_size;
  char *record_p;

  /* record : TFTID | key(int) | flag(short) */
  record_size = sizeof (TFTID) + sizeof (FHS_HASH_KEY) + sizeof (short);
  if (fhs_allocate_recdes (thread_p, recdes_p, record_size, REC_HOME) == NULL)
    {
      return ER_FAILED;
    }

  recdes_p->type = REC_HOME;

  record_p = recdes_p->data;
  record_p = fhs_write_tftid_to_record (record_p, value_p);
  record_p = fhs_write_key_to_record (record_p, key_p);
  record_p = fhs_write_flag_to_record (record_p, flag);

  return NO_ERROR;
}

/*
 * fhs_fix_ehid_page () -
 *   return: specified page pointer, or NULL
 *   ehid(in): extendible hashing structure
 *   latch_mode(in): lock mode
 */
static PAGE_PTR
fhs_fix_ehid_page (THREAD_ENTRY * thread_p, EHID * ehid, PGBUF_LATCH_MODE latch_mode)
{
  VPID vpid;

  vpid.volid = ehid->vfid.volid;
  vpid.pageid = ehid->pageid;

  return fhs_fix_old_page (thread_p, &(ehid->vfid), &vpid, latch_mode);
}

/*
 * fhs_fix_old_page () -
 *   return: specified page pointer, or NULL
 *   vfid_p(in): only for error reporting
 *   vpid_p(in):
 *   latch_mode(in): lock mode
 */
static PAGE_PTR
fhs_fix_old_page (THREAD_ENTRY * thread_p, const VFID * vfid_p, const VPID * vpid_p, PGBUF_LATCH_MODE latch_mode)
{
  PAGE_PTR page_p;

  page_p = pgbuf_fix (thread_p, vpid_p, OLD_PAGE, latch_mode, PGBUF_UNCONDITIONAL_LATCH);
  if (page_p == NULL)
    {
      if (er_errid () == ER_PB_BAD_PAGEID)
    {
      er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_EH_UNKNOWN_EXT_HASH, 4, vfid_p->volid, vfid_p->fileid,
          vpid_p->volid, vpid_p->pageid);
    }

      return NULL;
    }
#if !defined (NDEBUG)
  (void) pgbuf_check_page_ptype (thread_p, page_p, PAGE_EHASH);
#endif
  return page_p;
}

/*
 * fhs_fix_nth_page () -
 *   return: specified page pointer, or NULL
 *   vfid(in):
 *   offset(in):
 *   lock(in): lock mode
 */
static PAGE_PTR
fhs_fix_nth_page (THREAD_ENTRY * thread_p, const VFID * vfid_p, int offset, PGBUF_LATCH_MODE latch_mode)
{
  VPID vpid;

  if (file_numerable_find_nth (thread_p, vfid_p, offset, false, NULL, NULL, &vpid) != NO_ERROR)
    {
      ASSERT_ERROR ();
      return NULL;
    }

  return fhs_fix_old_page (thread_p, vfid_p, &vpid, latch_mode);
}

/*
 * fhs_dir_locate()
 *
 *   page_no_var(in): The nth directory page containing the pointer
 *   offset_var(in): The offset into the directory just like if the directory
 *              where contained in one contiguous area.
 *              SET by this macro according to the page_no_var where the
 *              offset is located in this page.
 *
 * Note: This macro finds the location of a directory pointer
 * (i.e. the number of directory page containing this pointer and
 * the offset value to reach the pointer within that page).
 * The pointer number is given to macro with the
 * "offset_var" parameter.
 * Since this operation is performed very frequently, it is
 * coded as a macro. So, it must be used very carefully. Two
 * variables should be passed as parameters. The variable passed
 * to "offset_var" should have the number of pointer prior to
 * this macro call.
 * In the implementation of this macro, division operation
 * is performed twice; once for the remainder and once for the
 * quotient. This is preferred to the method of successive
 * subtractions since built-in "/" and "%" operators take
 * constant time, whereas the latter method has linear
 * characteristic (i.e., it would take longer for bigger
 * directory sizes).
 */
static void
fhs_dir_locate (int *out_page_no_p, int *out_offset_p)
{
  int page_no, offset;

  offset = *out_offset_p;

  if (offset < FHS_NUM_FIRST_PAGES)
    {
      /* in the first page */
      offset = offset * sizeof (FHS_DIR_RECORD);
      page_no = 0;      /* at least one page */
    }
  else
    {
      /* not in the first page */
      offset -= FHS_NUM_FIRST_PAGES;
      page_no = offset / FHS_NUM_NON_FIRST_PAGES + 1;
      offset = (offset % FHS_NUM_NON_FIRST_PAGES) * sizeof (FHS_DIR_RECORD);
    }

  *out_page_no_p = page_no;
  *out_offset_p = offset;
}

/*
 * fhs_allocate_recdes () -
 *   return: char *, or NULL
 *   recdes(out) : Record descriptor
 *   size(in)    : Request size
 *   type(in)    : Request type
 */
static char *
fhs_allocate_recdes (THREAD_ENTRY * thread_p, RECDES * recdes_p, int size, short type)
{
  recdes_p->area_size = recdes_p->length = size;
  recdes_p->type = type;
  recdes_p->data = NULL;

  if (recdes_p->area_size > 0)
    {
      recdes_p->data = (char *) db_private_alloc (thread_p, recdes_p->area_size);
      if (recdes_p->data == NULL)
    {
      er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_OUT_OF_VIRTUAL_MEMORY, 1, (size_t) recdes_p->area_size);
    }
    }

  return recdes_p->data;
}

/*
 * fhs_free_recdes () -
 *   return:
 *   recdes(in): Record descriptor
 */
static void
fhs_free_recdes (THREAD_ENTRY * thread_p, RECDES * recdes_p)
{
  if (recdes_p->data)
    {
      db_private_free_and_init (thread_p, recdes_p->data);
    }
  recdes_p->area_size = recdes_p->length = 0;

}

/*
 * fhs_create () - Create an extendible hashing structure
 *   return: FHSID * (NULL in case of error)
 *   ehid(in): identifier for the extendible hashing structure to
 *             create. The volid field should already be set; others
 *             are set by this function.
 *   exp_num_entries(in): expected number of entries.
 *                   This figure is used as a guide to estimate the number of
 *           pages the extendible hashing structure will occupy.
 *           The purpose of this estimate is to increase the locality
 *           of reference on the disk.
 *           If the number of entries is not known, a negative value
 *           should be passed to this parameter.
 *
 * Note: Creates an extendible hashing structure for the particular
 * key type on the disk volume whose identifier is passed in
 * ehid->vfid.volid field. It creates two files on this volume: one
 * for the directory and one for the buckets. It also allocates
 * the first page of each file; the first bucket and the root
 * page of the directory.
 * The directory header area of the root page is
 * initialized by this function. Both the global depth of the
 * directory and the local depth of first bucket are set to 0.
 * The identifier of bucket file is kept in the directory header.
 * The directory file identifier and directory root page
 * identifier are loaded into the remaining fields of "Dir"
 * to be used as the complete identifier of this extendible
 * hashing structure for future reference.
 */
FHSID *
fhs_create (THREAD_ENTRY * thread_p, FHSID * fhsid_p, int exp_num_entries)
{
  FHS_DIR_RECORD *dir_record_p = NULL;
  VFID dir_vfid = VFID_INITIALIZER;
  VPID dir_vpid = VPID_INITIALIZER;
  PAGE_PTR dir_page_p = NULL;
  VFID bucket_vfid = VFID_INITIALIZER;
  VPID bucket_vpid = VPID_INITIALIZER;
  char init_bucket_data;
  DKNPAGES exp_bucket_pages;
  DKNPAGES exp_dir_pages;
  short key_size;
  PAGE_TYPE ptype = PAGE_EHASH;

  if (fhsid_p == NULL)
    {
      return NULL;
    }

  /* Set the key size */
  key_size = sizeof (FHS_HASH_KEY);

  /* Estimate number of bucket pages */
  if (exp_num_entries < 0)
    {
      /* Assume minimum size */
      exp_bucket_pages = 1;
    }
  else
    {
      exp_bucket_pages = exp_num_entries * (key_size + sizeof (TFTID) + sizeof (short));
      exp_bucket_pages = CEIL_PTVDIV (exp_bucket_pages, DB_PAGESIZE);
    }

  /* Create the first bucket and initialize its header */
  if (file_create_ehash (thread_p, exp_bucket_pages, true, NULL, &bucket_vfid) != NO_ERROR)
    {
      ASSERT_ERROR ();
      return NULL;
    }

  /* initialization of the first bucket page */
  init_bucket_data = 0;
  if (file_alloc (thread_p, &bucket_vfid, fhs_initialize_bucket_new_page, &init_bucket_data, &bucket_vpid, NULL)
      != NO_ERROR)
    {
      ASSERT_ERROR ();
      goto exit_on_error;
    }

  /* Estimate number of directory pages */
  if (exp_num_entries < 0)
    {
      exp_dir_pages = 1;
    }
  else
    {
      /* Calculate how many directory pages will be used */
      fhs_dir_locate (&exp_dir_pages, &exp_bucket_pages);
      /* exp_dir_pages is actually an index. the number of pages should be +1 */
      exp_dir_pages++;
    }

  /* Create the directory (allocate the first page) and initialize its header */

  dir_vfid.volid = bucket_vfid.volid;

  /*
   * Create the file and allocate the first page
   *
   * We do not initialize the page during the allocation since the file is
   * new, and the file is going to be removed in the event of a crash.
   */

  if (file_create_ehash_dir (thread_p, exp_dir_pages, true, NULL, &dir_vfid) != NO_ERROR)
    {
      ASSERT_ERROR ();
      goto exit_on_error;
    }
  if (file_alloc (thread_p, &dir_vfid, file_init_temp_page_type, &ptype, &dir_vpid, &dir_page_p) != NO_ERROR)
    {
      ASSERT_ERROR ();
      goto exit_on_error;
    }
  if (dir_page_p == NULL)
    {
      assert_release (false);
      goto exit_on_error;
    }
#if !defined (NDEBUG)
  pgbuf_check_page_ptype (thread_p, dir_page_p, PAGE_EHASH);
#endif

  /* store bucket page id */
  dir_record_p = (FHS_DIR_RECORD *) dir_page_p;
  dir_record_p->bucket_vpid = bucket_vpid;

  /* Finishing up; release the pages and return directory file id */
  pgbuf_set_dirty (thread_p, dir_page_p, FREE);

  /* Initialize the file hash scan id */
  fhsid_p->depth = 0;
  fhsid_p->alignment = FHS_ALIGNMENT;
  VFID_COPY (&fhsid_p->bucket_file, &bucket_vfid);
  VFID_COPY (&fhsid_p->ehid.vfid, &dir_vfid);
  fhsid_p->ehid.pageid = dir_vpid.pageid;

  return fhsid_p;

exit_on_error:

  if (!VFID_ISNULL (&bucket_vfid))
    {
      if (file_destroy (thread_p, &bucket_vfid, true) != NO_ERROR)
    {
      assert_release (false);
    }
    }
  if (!VFID_ISNULL (&dir_vfid))
    {
      if (file_destroy (thread_p, &dir_vfid, true) != NO_ERROR)
    {
      assert_release (false);
    }
    }
  return NULL;
}

/*
 * fhs_destroy () - destroy the extensible hash table
 *
 * return        : error code
 * thread_p (in) : thread entry
 * fhsid_p (in)   : extensible hash identifier
 *
 * note: only temporary extensible hash tables can be destroyed.
 */
int
fhs_destroy (THREAD_ENTRY * thread_p, FHSID * fhsid_p)
{
  if (fhsid_p == NULL)
    {
      return ER_FAILED;
    }

#if !defined(NDEBUG) && HASH_LIST_SCAN_DUMP_FILE_HASH
  fhs_dump (thread_p, fhsid_p);
#endif /* !defined(NDEBUG) && HASH_LIST_SCAN_DUMP_FILE_HASH */

  if (file_destroy (thread_p, &(fhsid_p->bucket_file), true) != NO_ERROR)
    {
      assert_release (false);
    }
  if (file_destroy (thread_p, &fhsid_p->ehid.vfid, true) != NO_ERROR)
    {
      assert_release (false);
    }

  return NO_ERROR;
}

/*
 * fhs_initialize_bucket_new_page () - Initialize a newly allocated page
 *
 * return    : Error code.
 * thread_p (in) : Thread entry
 * page_p (in)   : New bucket page
 * args (in)     : depth. Used to initialize and log the page.
 */
static int
fhs_initialize_bucket_new_page (THREAD_ENTRY * thread_p, PAGE_PTR page_p, void *args)
{
  char depth;
  FHS_BUCKET_HEADER bucket_header;
  RECDES bucket_recdes;
  PGSLOTID slot_id;
  int success = SP_SUCCESS;

  int error_code = NO_ERROR;

  depth = *((char *) args);

  /*
   * fetch and initialize the new page. The parameter UNANCHORED_KEEP_
   * SEQUENCE indicates that the order of records will be preserved
   * during insertions and deletions.
   */

  pgbuf_set_page_ptype (thread_p, page_p, PAGE_EHASH);

  /*
   * Initialize the bucket to contain variable-length records
   * on ordered slots.
   */
  spage_initialize (thread_p, page_p, UNANCHORED_KEEP_SEQUENCE, FHS_ALIGNMENT, DONT_SAFEGUARD_RVSPACE);

  /* Initialize the bucket header */
  bucket_header.local_depth = depth;

  /* Set the record descriptor to the Bucket header */
  bucket_recdes.data = (char *) &bucket_header;
  bucket_recdes.area_size = bucket_recdes.length = sizeof (FHS_BUCKET_HEADER);
  bucket_recdes.type = REC_HOME;

  /*
   * Insert the bucket header into the first slot (slot # 0)
   * on the bucket page
   */
  success = spage_insert (thread_p, page_p, &bucket_recdes, &slot_id);
  if (success != SP_SUCCESS)
    {
      /*
       * Slotted page module refuses to insert a short size record to an
       * empty page. This should never happen.
       */
      if (success != SP_ERROR)
    {
      er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_GENERIC_ERROR, 0);
      error_code = ER_FAILED;
    }
      else
    {
      ASSERT_ERROR_AND_SET (error_code);
    }
      return error_code;
    }

  pgbuf_set_dirty (thread_p, page_p, DONT_FREE);

  return NO_ERROR;
}

/*
 * fhs_initialize_dk_bucket_new_page () - Initialize a newly allocated page
 *
 * return    : Error code.
 * thread_p (in) : Thread entry
 * page_p (in)   : New bucket page
 * args (in)     : depth. Used to initialize and log the page.
 */
static int
fhs_initialize_dk_bucket_new_page (THREAD_ENTRY * thread_p, PAGE_PTR page_p, void *args)
{
  FHS_DK_BUCKET_HEADER dk_bucket_header;
  RECDES bucket_recdes;
  PGSLOTID slot_id;
  int success = SP_SUCCESS;

  int error_code = NO_ERROR;
  /*
   * fetch and initialize the new page. The parameter UNANCHORED_KEEP_
   * SEQUENCE indicates that the order of records will be preserved
   * during insertions and deletions.
   */

  pgbuf_set_page_ptype (thread_p, page_p, PAGE_EHASH);

  /*
   * Initialize the bucket to contain variable-length records
   * on ordered slots.
   */
  spage_initialize (thread_p, page_p, UNANCHORED_KEEP_SEQUENCE, FHS_ALIGNMENT, DONT_SAFEGUARD_RVSPACE);

  /* Initialize the bucket header */
  dk_bucket_header.next_bucket = VPID_INITIALIZER;
  dk_bucket_header.last_bucket = VPID_INITIALIZER;

  /* Set the record descriptor to the Bucket header */
  bucket_recdes.data = (char *) &dk_bucket_header;
  bucket_recdes.area_size = bucket_recdes.length = sizeof (FHS_DK_BUCKET_HEADER);
  bucket_recdes.type = REC_HOME;

  /*
   * Insert the bucket header into the first slot (slot # 0)
   * on the bucket page
   */
  success = spage_insert (thread_p, page_p, &bucket_recdes, &slot_id);
  if (success != SP_SUCCESS)
    {
      /*
       * Slotted page module refuses to insert a short size record to an
       * empty page. This should never happen.
       */
      if (success != SP_ERROR)
    {
      er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_GENERIC_ERROR, 0);
      error_code = ER_FAILED;
    }
      else
    {
      ASSERT_ERROR_AND_SET (error_code);
    }
      return error_code;
    }

  pgbuf_set_dirty (thread_p, page_p, DONT_FREE);

  return NO_ERROR;
}

/*
 * fhs_initialize_dir_new_pages () - Initialize new page used to expand extensible hash directory.
 *
 * return       : Error code
 * thread_p (in)    : Thread entry
 * page_p (in)      : New directory page
 * ignore_args (in) : (not used)
 */
static int
fhs_initialize_dir_new_page (THREAD_ENTRY * thread_p, PAGE_PTR page_p, void *args)
{
  pgbuf_set_page_ptype (thread_p, page_p, PAGE_EHASH);
  pgbuf_set_dirty (thread_p, page_p, DONT_FREE);

  return NO_ERROR;
}

/*
 * fhs_search () - Search for the first key of duplicated keys;
 *           return associated value.
 *   return: EH_SEARCH
 *   fhsid(in): identifier for the extendible hashing structure
 *   key(in): key to search
 *   value_ptr(out): pointer to return the value associated with the key
 *
 * Note: Returns the value associated with the given key, if it is
 * found in the specified extendible hashing structure. If the
 * key is not found an error condition is returned.
 */
EH_SEARCH
fhs_search (THREAD_ENTRY * thread_p, HASH_LIST_SCAN * hlsid_p, TFTID * value_p)
{
  PAGE_PTR bucket_page_p = NULL;
  PAGE_PTR dk_bucket_page_p = NULL;
  VPID bucket_vpid, dk_bucket_vpid;
  RECDES recdes;
  PGSLOTID slot_id;
  EH_SEARCH result = EH_KEY_NOTFOUND;
  short flag;
  TFTID temp_tftid;

  if (fhs_find_bucket_vpid_with_hash
      (thread_p, hlsid_p->file.hash_table, (void *) &hlsid_p->curr_hash_key, PGBUF_LATCH_READ, PGBUF_LATCH_READ,
       &bucket_vpid, NULL, NULL) != NO_ERROR)
    {
      return EH_ERROR_OCCURRED;
    }
  if (bucket_vpid.pageid == NULL_PAGEID)
    {
      result = EH_KEY_NOTFOUND;
      goto end;
    }
  bucket_page_p = fhs_fix_old_page (thread_p, &hlsid_p->file.hash_table->bucket_file, &bucket_vpid, PGBUF_LATCH_READ);
  if (bucket_page_p == NULL)
    {
      result = EH_ERROR_OCCURRED;
      goto end;
    }
  if (fhs_locate_slot (thread_p, bucket_page_p, (void *) &hlsid_p->curr_hash_key, &slot_id, false) == false)
    {
      result = EH_KEY_NOTFOUND;
      goto end;
    }
  if (spage_get_record (thread_p, bucket_page_p, slot_id, &recdes, PEEK) != S_SUCCESS)
    {
      result = EH_ERROR_OCCURRED;
      goto end;
    }

  fhs_read_flag_from_record (recdes.data, &flag);
  if (flag == FHS_FLAG_INDIRECT)
    {
      /* indirect data. search DK bucket */
      hlsid_p->file.is_dk_bucket = true;
      fhs_read_tftid_from_record (recdes.data, &temp_tftid);
      SET_VPID (dk_bucket_vpid, temp_tftid.volid, temp_tftid.pageid);
      /* fix dk bucket page */
      dk_bucket_page_p =
    fhs_fix_old_page (thread_p, &hlsid_p->file.hash_table->bucket_file, &dk_bucket_vpid, PGBUF_LATCH_READ);
      if (dk_bucket_page_p == NULL)
    {
      result = EH_ERROR_OCCURRED;
      goto end;
    }
      /* get first record */
      if (spage_get_record (thread_p, dk_bucket_page_p, FHS_FIRST_SLOT_ID, &recdes, PEEK) != S_SUCCESS)
    {
      result = EH_ERROR_OCCURRED;
      goto end;
    }
      fhs_read_tftid_from_record (recdes.data, value_p);
      /* save last oid */
      SET_OID (&hlsid_p->file.curr_oid, dk_bucket_vpid.volid, dk_bucket_vpid.pageid, FHS_FIRST_SLOT_ID);
      result = EH_KEY_FOUND;
    }
  else
    {
      /* direct data */
      hlsid_p->file.is_dk_bucket = false;
      fhs_read_tftid_from_record (recdes.data, value_p);
      /* save last oid */
      SET_OID (&hlsid_p->file.curr_oid, bucket_vpid.volid, bucket_vpid.pageid, slot_id);
      result = EH_KEY_FOUND;
    }

end:
  if (bucket_page_p)
    {
      pgbuf_unfix_and_init (thread_p, bucket_page_p);
    }
  if (dk_bucket_page_p)
    {
      pgbuf_unfix_and_init (thread_p, dk_bucket_page_p);
    }

  return result;
}

/*
 * fhs_search_next () - Search for the next key; return associated value
 *   return: EH_SEARCH
 *   hlsid_p(in): identifier for the hash list scan
 *
 * Note: Returns the next value associated with the given key, if it is
 * found in the specified extendible hashing structure.
 */
EH_SEARCH
fhs_search_next (THREAD_ENTRY * thread_p, HASH_LIST_SCAN * hlsid_p, TFTID * value_p)
{
  PAGE_PTR bucket_page_p = NULL;
  PAGE_PTR next_bucket_page_p = NULL;
  VPID bucket_vpid, next_bucket_vpid;
  RECDES recdes;
  EH_SEARCH result = EH_KEY_NOTFOUND;
  char *bucket_key;
  int compare_result;
  PGSLOTID num_record;
  TFTID temp_tftid;

  if (hlsid_p->file.curr_oid.pageid == NULL_PAGEID)
    {
      return EH_KEY_NOTFOUND;
    }

  SET_VPID (bucket_vpid, hlsid_p->file.curr_oid.volid, hlsid_p->file.curr_oid.pageid);
  bucket_page_p = fhs_fix_old_page (thread_p, &hlsid_p->file.hash_table->ehid.vfid, &bucket_vpid, PGBUF_LATCH_READ);
  if (bucket_page_p == NULL)
    {
      result = EH_ERROR_OCCURRED;
      goto end;
    }

  if (hlsid_p->file.is_dk_bucket)
    {
      num_record = spage_number_of_records (bucket_page_p) - 1;
      if (hlsid_p->file.curr_oid.slotid >= num_record)
    {
      /* already last slot id */
      /* search data in next bucket */
      (void) spage_get_record (thread_p, bucket_page_p, FHS_HEADER_SLOT_ID, &recdes, PEEK);
      fhs_read_tftid_from_record (recdes.data, &temp_tftid);
      SET_VPID (next_bucket_vpid, temp_tftid.volid, temp_tftid.pageid);
      if (VPID_ISNULL (&next_bucket_vpid))
        {
          result = EH_KEY_NOTFOUND;
          goto end;
        }
      next_bucket_page_p =
        fhs_fix_old_page (thread_p, &hlsid_p->file.hash_table->ehid.vfid, &next_bucket_vpid, PGBUF_LATCH_READ);
      /* get first record */
      if (spage_get_record (thread_p, next_bucket_page_p, FHS_FIRST_SLOT_ID, &recdes, PEEK) != S_SUCCESS)
        {
          result = EH_ERROR_OCCURRED;
          goto end;
        }
      fhs_read_tftid_from_record (recdes.data, value_p);
      /* save last oid */
      SET_OID (&hlsid_p->file.curr_oid, next_bucket_vpid.volid, next_bucket_vpid.pageid, FHS_FIRST_SLOT_ID);
      result = EH_KEY_FOUND;
      goto end;
    }
      else
    {
      (void) spage_get_record (thread_p, bucket_page_p, ++hlsid_p->file.curr_oid.slotid, &recdes, PEEK);
      fhs_read_tftid_from_record (recdes.data, value_p);
      result = EH_KEY_FOUND;
      goto end;
    }
    }
  else
    {
      if (hlsid_p->file.curr_oid.slotid <= 1)
    {
      /* already last slot id */
      result = EH_KEY_NOTFOUND;
      goto end;
    }
      (void) spage_get_record (thread_p, bucket_page_p, --hlsid_p->file.curr_oid.slotid, &recdes, PEEK);

      /* compare key */
      bucket_key = (char *) recdes.data;
      bucket_key += sizeof (TFTID);
      if (fhs_compare_key (thread_p, bucket_key, (void *) &hlsid_p->curr_hash_key, recdes.type, &compare_result) !=
      NO_ERROR)
    {
      result = EH_ERROR_OCCURRED;
      goto end;
    }
      if (compare_result != 0)
    {
      /* the all keys is already found */
      result = EH_KEY_NOTFOUND;
      goto end;
    }

      fhs_read_tftid_from_record (recdes.data, value_p);
      result = EH_KEY_FOUND;
      goto end;
    }

end:
  if (bucket_page_p)
    {
      pgbuf_unfix_and_init (thread_p, bucket_page_p);
    }
  if (next_bucket_page_p)
    {
      pgbuf_unfix_and_init (thread_p, next_bucket_page_p);
    }
  return result;
}

static int
fhs_find_bucket_vpid_with_hash (THREAD_ENTRY * thread_p, FHSID * fhsid_p, void *key_p, PGBUF_LATCH_MODE root_latch,
                PGBUF_LATCH_MODE bucket_latch, VPID * out_vpid_p, FHS_HASH_KEY * out_hash_key_p,
                int *out_location_p)
{
  FHS_HASH_KEY hash_key;
  int location, dir_offset;
  FHS_DIR_RECORD *dir_record_p;
  PAGE_PTR dir_page_p;

  /* Get the pseudo key */
  hash_key = *((FHS_HASH_KEY *) key_p);
  if (out_hash_key_p)
    {
      *out_hash_key_p = hash_key;
    }

  /* Find the location of bucket pointer in the directory */
  location = FIND_OFFSET (hash_key, fhsid_p->depth);
  if (out_location_p)
    {
      *out_location_p = location;
    }

  /* Find the bucket page containing the key */
  fhs_dir_locate (&dir_offset, &location);
  dir_page_p = fhs_fix_nth_page (thread_p, &fhsid_p->ehid.vfid, dir_offset, bucket_latch);
  if (dir_page_p == NULL)
    {
      return ER_FAILED;
    }
  dir_record_p = (FHS_DIR_RECORD *) ((char *) dir_page_p + location);
  *out_vpid_p = dir_record_p->bucket_vpid;
  pgbuf_unfix_and_init (thread_p, dir_page_p);

  return NO_ERROR;
}

/*
 * fhs_locate_slot () - Locate the slot for the key
 *   return: int
 *   buc_pgptr(in): pointer to the bucket page
 *   key(in): key value to search
 *   position(out): set to the location of the slot that contains the key if
 *                  it exists, or that would contain the key if it does not.
 *
 * Note: This function locates a specific key in a bucket
 * (namely, the slot number in the page). Currently this search
 * is done by using the binary search method on the real key
 * values. An alternative method is to use another hashing
 * mechanism on the pseudo keys. If the key does not exist,
 * position is set to the SLOTID value where it could be
 * inserted and false is returned.
 */
static bool
fhs_locate_slot (THREAD_ENTRY * thread_p, PAGE_PTR bucket_page_p, void *key_p,
         PGSLOTID * out_position_p, bool need_to_backward)
{
  RECDES recdes;
  PGSLOTID num_record;
  PGSLOTID first_slot_id = -1;

  num_record = spage_number_of_records (bucket_page_p) - 1;

  /*
   * If the bucket does not contain any records other than the header,
   * then return immediately.
   */
  if (num_record < 1)
    {
      if (spage_next_record (bucket_page_p, &first_slot_id, &recdes, PEEK) != S_SUCCESS)
    {
      er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_EH_CORRUPTED, 0);
      *out_position_p = 1;
      return false;
    }

      *out_position_p = first_slot_id + 1;
      return false;
    }

  return fhs_binary_search_bucket (thread_p, bucket_page_p, num_record, key_p, out_position_p, need_to_backward);
}

static int
fhs_compare_key (THREAD_ENTRY * thread_p, char *bucket_record_p, void *key_p, INT16 record_type,
         int *out_compare_result_p)
{
  int compare_result;
  unsigned int u1, u2;

  u1 = *((unsigned int *) key_p);
  u2 = *((unsigned int *) bucket_record_p);

  if (u1 == u2)
    {
      compare_result = 0;
    }
  else if (u1 > u2)
    {
      compare_result = 1;
    }
  else
    {
      compare_result = -1;
    }

  *out_compare_result_p = compare_result;
  return NO_ERROR;
}

/*
 * fhs_hash () - Hash function
 *   return: FHS_HASH_KEY
 *   orig_key(in): original key to encode into a pseudo key
 *
 * Note: This function converts the given original key into a pseudo
 * key. Since the original key is presented as a character
 * string, its conversion into a int-compatible type is essential
 * prior to performing any operation on it.
 * This function does not change the value of parameter
 * orig_key, as it might be on a bucket.
 */
static FHS_HASH_KEY
fhs_hash (void *original_key_p)
{
  char *key = (char *) original_key_p;
  FHS_HASH_KEY hash_key = 0;
  hash_key = fhs_hash_four_bytes_type (key);

  return hash_key;
}

static FHS_HASH_KEY
fhs_hash_four_bytes_type (char *key_p)
{
  FHS_HASH_KEY hash_key = 0;
  unsigned int i;
  char Char;

  hash_key = (FHS_HASH_KEY) ntohl (*(unsigned int *) key_p);

  /* Go over the chars of the given pseudo key */
  Char = '\0';
  key_p = (char *) &hash_key;
  for (i = 0; i < sizeof (FHS_HASH_KEY); i++)
    {
      Char += (char) *key_p++;
    }

  /* Change the first byte of the pseudo key to the SUM of all of them */
  memcpy (&hash_key, &Char, sizeof (char));

  return hash_key;
}

static bool
fhs_binary_search_bucket (THREAD_ENTRY * thread_p, PAGE_PTR bucket_page_p, PGSLOTID num_record,
              void *key_p, PGSLOTID * out_position_p, bool need_to_backward)
{
  char *bucket_record_p;
  RECDES recdes;
  PGSLOTID low, high;
  PGSLOTID middle;
  int compare_result;
  short flag;

  low = 1;
  high = num_record;

  do
    {
      middle = (high + low) >> 1;

      if (spage_get_record (thread_p, bucket_page_p, middle, &recdes, PEEK) != S_SUCCESS)
    {
      er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_EH_CORRUPTED, 0);
      return false;
    }
      fhs_read_flag_from_record ((char *) recdes.data, &flag);
      bucket_record_p = (char *) recdes.data;
      bucket_record_p += sizeof (TFTID);

      if (fhs_compare_key (thread_p, bucket_record_p, key_p, recdes.type, &compare_result) != NO_ERROR)
    {
      return false;
    }

      if (compare_result == 0)
    {
      if (need_to_backward)
        {
          /* Find the first slotid */
          while (compare_result == 0)
        {
          if (middle == 1)
            {
              *out_position_p = middle;
              return true;
            }
          --middle;

          if (spage_get_record (thread_p, bucket_page_p, middle, &recdes, PEEK) != S_SUCCESS)
            {
              er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_EH_CORRUPTED, 0);
              return false;
            }

          bucket_record_p = (char *) recdes.data;
          bucket_record_p += sizeof (TFTID);

          if (fhs_compare_key (thread_p, bucket_record_p, key_p, recdes.type, &compare_result) != NO_ERROR)
            {
              return false;
            }
        }
          *out_position_p = middle + 1;
        }
      else
        {
          /* go to last slot using flag */
          if (flag == FHS_FLAG_INDIRECT)
        {
          *out_position_p = middle;
        }
          else if (flag >= 1)
        {
          *out_position_p = middle + flag - 1;
        }
          else
        {
          er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_EH_CORRUPTED, 0);
          return false;
        }
        }
      return true;
    }

      if (compare_result < 0)
    {
      high = middle - 1;
    }
      else
    {
      middle += (flag > 0) ? flag - 1 : 0;  /* go to last slot using flag */
      low = middle + 1;
    }
    }
  while (high >= low);

  if (high < middle)
    {
      *out_position_p = middle;
    }
  else
    {
      /* Key is NOT in bucket; it should be located at the right of middle key. */
      *out_position_p = middle + 1;
    }

  return false;
}

/*
 * fhs_insert () - Insert (key, assoc_value) pair to ext. hashing
 *   return: void * (NULL is returned in case of error)
 *   fhsid(in): identifier of the extendible hashing structure
 *   key(in): key value to insert
 *   value_ptr(in): pointer to the associated value to insert
 *
 * Note: Inserts the given (key & assoc_value) pair into the specified
 * extendible hashing structure. If the key already exists then
 * the previous associated value is replaced with the new one.
 * Otherwise, a new record is inserted to the correct bucket.
 *      To perform the insertion operation safely in the concurrent
 * environment, an auxiliary (and private) function
 */
void *
fhs_insert (THREAD_ENTRY * thread_p, FHSID * fhsid_p, void *key_p, TFTID * value_p)
{
  VPID bucket_vpid;

  FHS_HASH_KEY hash_key;
  int location;
  FHS_RESULT result;

  if (fhs_find_bucket_vpid_with_hash (thread_p, fhsid_p, key_p,
                      PGBUF_LATCH_WRITE, PGBUF_LATCH_READ, &bucket_vpid, &hash_key,
                      &location) != NO_ERROR)
    {
      return NULL;
    }

  if (VPID_ISNULL (&bucket_vpid))
    {
      if (fhs_insert_to_bucket_after_create (thread_p, fhsid_p, &bucket_vpid, location, hash_key, key_p, value_p) !=
      NO_ERROR)
    {
      return NULL;
    }
    }
  else
    {
      result = fhs_insert_bucket_after_extend_if_need (thread_p, fhsid_p, &bucket_vpid, key_p, hash_key, value_p);
      if (result == FHS_ERROR_OCCURRED)
    {
      return NULL;
    }
      else if (result == FHS_BUCKET_FULL)
    {
      return fhs_insert (thread_p, fhsid_p, key_p, value_p);
    }
    }

  return (key_p);
}

static int
fhs_insert_to_bucket_after_create (THREAD_ENTRY * thread_p, FHSID * fhsid_p, VPID * bucket_vpid_p, int location,
                   FHS_HASH_KEY hash_key, void *key_p, TFTID * value_p)
{
  PAGE_PTR bucket_page_p;
  FHS_BUCKET_HEADER bucket_header;
  char found_depth;
  char init_bucket_data;
  VPID null_vpid = VPID_INITIALIZER;
  FHS_RESULT ins_result;

  int error_code = NO_ERROR;

  found_depth = fhs_find_depth (thread_p, fhsid_p, location, &null_vpid, &null_vpid);
  if (found_depth == 0)
    {
      return ER_FAILED;
    }

  init_bucket_data = fhsid_p->depth - found_depth;
  bucket_header.local_depth = init_bucket_data;

  error_code = file_alloc (thread_p, &fhsid_p->bucket_file, fhs_initialize_bucket_new_page, &init_bucket_data,
               bucket_vpid_p, &bucket_page_p);
  if (error_code != NO_ERROR)
    {
      ASSERT_ERROR ();
      return ER_FAILED;
    }
  if (bucket_page_p == NULL)
    {
      assert_release (false);
      return ER_FAILED;
    }
#if !defined (NDEBUG)
  (void) pgbuf_check_page_ptype (thread_p, bucket_page_p, PAGE_EHASH);
#endif

  if (fhs_connect_bucket (thread_p, fhsid_p, bucket_header.local_depth, hash_key, bucket_vpid_p) != NO_ERROR)
    {
      pgbuf_unfix_and_init (thread_p, bucket_page_p);
      return ER_FAILED;
    }

  ins_result = fhs_insert_to_bucket (thread_p, fhsid_p, bucket_page_p, key_p, value_p);

  if (ins_result != FHS_SUCCESSFUL_COMPLETION)
    {
      /* Slotted page module refuses to insert a short size record to an almost empty page. This should never happen. */
      er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_GENERIC_ERROR, 0);
      pgbuf_unfix_and_init (thread_p, bucket_page_p);
      return ER_FAILED;
    }

  pgbuf_unfix_and_init (thread_p, bucket_page_p);
  return NO_ERROR;
}

static FHS_RESULT
fhs_insert_bucket_after_extend_if_need (THREAD_ENTRY * thread_p, FHSID * fhsid_p, VPID * bucket_vpid_p, void *key_p,
                    FHS_HASH_KEY hash_key, TFTID * value_p)
{
  PAGE_PTR bucket_page_p;
  PAGE_PTR sibling_page_p = NULL;
  PAGE_PTR target_bucket_page_p;
  int new_bit;
  FHS_RESULT result;

  /* We need to put a X_LOCK on bucket page */
  bucket_page_p = fhs_fix_old_page (thread_p, &fhsid_p->bucket_file, bucket_vpid_p, PGBUF_LATCH_WRITE);
  if (bucket_page_p == NULL)
    {
      return FHS_ERROR_OCCURRED;
    }

  result = fhs_insert_to_bucket (thread_p, fhsid_p, bucket_page_p, key_p, value_p);
  if (result == FHS_BUCKET_FULL)
    {
      sibling_page_p = fhs_extend_bucket (thread_p, fhsid_p, bucket_page_p, key_p, hash_key, &new_bit, bucket_vpid_p);
      if (sibling_page_p == NULL)
    {
      pgbuf_unfix_and_init (thread_p, bucket_page_p);
      return FHS_ERROR_OCCURRED;
    }

      /*
       * Try to insert the new key & assoc_value pair into one of the buckets.
       * The result of this attempt will determine if a recursive call is
       * needed to insert the new key.
       */

      if (new_bit)
    {
      target_bucket_page_p = sibling_page_p;
    }
      else
    {
      target_bucket_page_p = bucket_page_p;
    }

      result = fhs_insert_to_bucket (thread_p, fhsid_p, target_bucket_page_p, key_p, value_p);
      pgbuf_unfix_and_init (thread_p, sibling_page_p);
    }

  pgbuf_unfix_and_init (thread_p, bucket_page_p);
  return result;
}

/*
 * fhs_find_depth () - Find depth
 *   return: char (will return 0 in case of error)
 *   fhsid(in): identifier for the extendible hashing structure
 *   location(in): directory entry whose neighbooring area should be checked
 *   bucket_vpid(in): vpid of the bucket pointed by the specified directory entry
 *   sib_vpid(in): vpid of the sibling bucket
 *
 */
static char
fhs_find_depth (THREAD_ENTRY * thread_p, FHSID * fhsid_p, int location, VPID * bucket_vpid_p, VPID * sibling_vpid_p)
{
  PAGE_PTR page_p = NULL;
  FHS_DIR_RECORD *dir_record_p;
  int loc;
  int rel_loc;
  int iterations;
  int page_no;
  int prev_page_no = -1;
  int i;
  unsigned int clear;
  char check_depth = 2;
  bool is_stop = false;
  int check_bit;

  while ((check_depth <= fhsid_p->depth) && !is_stop)
    {
      /*
       * Find the base location for this iteration. The base location differs from
       * the original location at the check_depth bit (it has the opposite value
       * for this bit position) and at the remaining least significant bit
       * positions (it has 0 for these bit positions).
       */
      clear = 1;
      for (i = 1; i < check_depth - 1; i++)
    {
      clear <<= 1;
      clear += 1;
    }

      clear = ~clear;
      loc = location & clear;

      check_bit = GETBIT (loc, FHS_HASH_KEY_BITS - check_depth + 1);
      if (check_bit != 0)
    {
      loc = CLEARBIT (loc, FHS_HASH_KEY_BITS - check_depth + 1);
    }
      else
    {
      loc = SETBIT (loc, FHS_HASH_KEY_BITS - check_depth + 1);
    }

      /* "loc" is the base_location now */

      iterations = 1 << (check_depth - 2);

      for (i = 0; i < iterations; i++)
    {
      rel_loc = loc | (i << 1);
      fhs_dir_locate (&page_no, &rel_loc);
      if (prev_page_no == -1)
        {
          /* first time */
          page_p = fhs_fix_nth_page (thread_p, &fhsid_p->ehid.vfid, page_no, PGBUF_LATCH_WRITE);
          if (page_p == NULL)
        {
          return 0;
        }
          prev_page_no = page_no;
        }
      else if (page_no != prev_page_no)
        {
          pgbuf_unfix_and_init (thread_p, page_p);
          page_p = fhs_fix_nth_page (thread_p, &fhsid_p->ehid.vfid, page_no, PGBUF_LATCH_WRITE);
          if (page_p == NULL)
        {
          return 0;
        }
          prev_page_no = page_no;
        }
      dir_record_p = (FHS_DIR_RECORD *) ((char *) page_p + rel_loc);

      if (!((VPID_ISNULL (&dir_record_p->bucket_vpid)) || VPID_EQ (&dir_record_p->bucket_vpid, bucket_vpid_p)
        || VPID_EQ (&dir_record_p->bucket_vpid, sibling_vpid_p)))
        {
          is_stop = true;
          break;
        }
    }

      if (!is_stop)
    {
      check_depth++;
    }
    }

  pgbuf_unfix_and_init (thread_p, page_p);
  return (check_depth - 1);
}

/*
 * fhs_connect_bucket () - Connect bucket to directory
 *   return: int NO_ERROR, or ER_FAILED
 *   fhsid(in): identifier for the extendible hashing structure
 *   local_depth(in): local depth of the bucket; determines how many directory
 *                    pointers are set to point to the given bucket identifier
 *   hash_key(in): pseudo key that led to the bucket page; determines which
 *                   pointers to be updated
 *   bucket_vpid(in): Identifier of the bucket page to connect
 *
 * Note: This function connects the given bucket to the directory of
 * specified extendible hashing structure. All the directory
 * pointers whose number have the same value as the hash_key
 * for the first "local_depth" bits are updated with "bucket_vpid".
 * Since these pointers are successive to each other, it is
 * known that a contagious area (possibly over several pages)
 * on the directory will be updated. Thus, this function is
 * implemented in the following way: First,  the directory pages
 * to be updated are determined. Then, these pages are retrieved,
 * updated and released one at a time. For recovery purposes, the
 * contents of these pages are logged before and after they are updated.
 */
static int
fhs_connect_bucket (THREAD_ENTRY * thread_p, FHSID * fhsid_p, int local_depth, FHS_HASH_KEY hash_key,
            VPID * bucket_vpid_p)
{
  FHS_DIR_RECORD *dir_record_p;
  PAGE_PTR page_p;
  int location;
  int first_page, last_page;
  int first_ptr_offset, last_ptr_offset;
  int start_offset, end_offset;
  int diff;
  int i, j;
  int bef_length, count;
  unsigned int set_bits;
  unsigned int clear_bits;

  /* First find out how many page entries will be updated in the directory */
  location = GETBITS (hash_key, 1, fhsid_p->depth);

  diff = fhsid_p->depth - local_depth;
  if (diff != 0)
    {
      /* There are more than one pointers that need to be updated */
      for (set_bits = 0, i = 0; i < diff; i++)
    {
      set_bits = 2 * set_bits + 1;
    }
      clear_bits = ~set_bits;

      first_ptr_offset = location & clear_bits;
      last_ptr_offset = location | set_bits;

      fhs_dir_locate (&first_page, &first_ptr_offset);
      fhs_dir_locate (&last_page, &last_ptr_offset);
    }
  else
    {
      /* There is only one pointer that needs to be updated */
      first_ptr_offset = location;
      fhs_dir_locate (&first_page, &first_ptr_offset);
      last_page = first_page;
      last_ptr_offset = first_ptr_offset;
    }

  /* Go over all of these pages */
  for (i = first_page; i <= last_page; i++)
    {
      page_p = fhs_fix_nth_page (thread_p, &fhsid_p->ehid.vfid, i, PGBUF_LATCH_WRITE);
      if (page_p == NULL)
    {
      return ER_FAILED;
    }

      if (i == first_page)
    {
      start_offset = first_ptr_offset;
    }
      else
    {
      start_offset = 0;
    }

      if (i == last_page)
    {
      end_offset = last_ptr_offset + sizeof (FHS_DIR_RECORD);
    }
      else
    {
      end_offset = DB_PAGESIZE;
    }

      /* Log this page */
      bef_length = end_offset - start_offset;
      count = bef_length / sizeof (FHS_DIR_RECORD);

      /* Update the directory page */
      dir_record_p = (FHS_DIR_RECORD *) ((char *) page_p + start_offset);
      for (j = 0; j < count; j++)
    {
      dir_record_p->bucket_vpid = *bucket_vpid_p;
      dir_record_p++;
    }
      pgbuf_set_dirty (thread_p, page_p, FREE);
    }

  return NO_ERROR;
}

/*
 * fhs_insert_to_bucket () - Insert (key, value) to bucket
 *   return: FHS_RESULT
 *   fhsid(in): identifier for the extendible hashing structure
 *   buc_pgptr(in): bucket page to insert the key
 *   key_ptr(in): Pointer to the key
 *   value_ptr(in): Pointer to the associated value
 *
 * Note: This function is used to insert a (key & assoc_value) pair
 * onto the given bucket. If the KEY already EXISTS in the bucket
 * the NEW ASSOCIATED VALUE REPLACES THE OLD ONE. Otherwise a
 * new entry is added to the bucket and the total number of
 * entries of this extendible hashing structure is incremented
 * by one. In the latter case, if the insertion is not possible
 * for some reason (e.g., the bucket does not have enough space
 * for the new record, etc.) an appropriate error code is returned.
 */
static FHS_RESULT
fhs_insert_to_bucket (THREAD_ENTRY * thread_p, FHSID * fhsid_p, PAGE_PTR bucket_page_p, void *key_p, TFTID * value_p)
{
  RECDES bucket_recdes;
  RECDES old_bucket_recdes;

  PGSLOTID slot_no, tmp_slot;
  int success;
  short flag;
  VPID dk_bucket_vpid;
  PAGE_PTR dk_bucket_page_p = NULL;
  TFTID tmp_tftid;

  /* Check if insertion is duplicate, or not */
  if (fhs_locate_slot (thread_p, bucket_page_p, key_p, &slot_no, true) == true)
    {
      /* Key already exists. Append new data to the last slot of duplicate keys. */
      (void) spage_get_record (thread_p, bucket_page_p, slot_no, &old_bucket_recdes, PEEK);
      fhs_read_flag_from_record (old_bucket_recdes.data, &flag);

      if (flag == FHS_FLAG_INDIRECT)
    {
      /* the case of inserting record into DK bucket */
      /* get the dk bucket vpid */
      fhs_read_tftid_from_record (old_bucket_recdes.data, &tmp_tftid);
      SET_VPID (dk_bucket_vpid, tmp_tftid.volid, tmp_tftid.pageid);
      /* insert new record into dk_bucket */
      if (fhs_insert_to_dk_bucket (thread_p, fhsid_p, &dk_bucket_vpid, key_p, value_p) != FHS_SUCCESSFUL_COMPLETION)
        {
          /* This should never happen. */
          er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_GENERIC_ERROR, 0);
          goto error;
        }
      return FHS_SUCCESSFUL_COMPLETION;
    }
      else if (flag + 1 >= FHS_MAX_DUP_KEY)
    {
      /* the case of inserting firstly to DK bucket */
      /* make DK bucket page */
      success =
        file_alloc (thread_p, &fhsid_p->bucket_file, fhs_initialize_dk_bucket_new_page, NULL, &dk_bucket_vpid,
            &dk_bucket_page_p);
      if (success != NO_ERROR || dk_bucket_page_p == NULL)
        {
          er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_GENERIC_ERROR, 0);
          goto error;
        }
      /* insert new record into dk_bucket */
      if (fhs_compose_record (thread_p, key_p, value_p, &bucket_recdes, FHS_FLAG_DUMMY_NUM) != NO_ERROR)
        {
          er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_GENERIC_ERROR, 0);
          goto error;
        }
      success = spage_insert (thread_p, dk_bucket_page_p, &bucket_recdes, &tmp_slot);
      if (success != SP_SUCCESS)
        {
          er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_GENERIC_ERROR, 0);
          goto error;
        }
      fhs_free_recdes (thread_p, &bucket_recdes);

      /* move duplicate key from bucket to dk_bucket */
      success = spage_insert_at (thread_p, dk_bucket_page_p, tmp_slot, &old_bucket_recdes);
      if (success != SP_SUCCESS)
        {
          er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_GENERIC_ERROR, 0);
          goto error;
        }
      (void) spage_delete (thread_p, bucket_page_p, slot_no);
      for (int i = flag - 1; i > 0; i--)
        {
          (void) spage_get_record (thread_p, bucket_page_p, slot_no, &old_bucket_recdes, PEEK);
          success = spage_insert_at (thread_p, dk_bucket_page_p, tmp_slot, &old_bucket_recdes);
          if (success != SP_SUCCESS)
        {
          er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_GENERIC_ERROR, 0);
          goto error;
        }
          (void) spage_delete (thread_p, bucket_page_p, slot_no);
          /* slot_no is not changed since the current slot is deleted */
        }
      /* make indirect record for bucket */
      SET_TFTID (tmp_tftid, dk_bucket_vpid.volid, dk_bucket_vpid.pageid, 0);
      if (fhs_compose_record (thread_p, key_p, &tmp_tftid, &bucket_recdes, FHS_FLAG_INDIRECT) != NO_ERROR)
        {
          er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_GENERIC_ERROR, 0);
          goto error;
        }
    }
      else
    {
      /* the case of inserting duplicate key record into bucket */
      if (fhs_compose_record (thread_p, key_p, value_p, &bucket_recdes, ++flag) != NO_ERROR)
        {
          er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_GENERIC_ERROR, 0);
          goto error;
        }
    }
    }
  else
    {
      /* Key does not exist in the bucket */
      if (fhs_compose_record (thread_p, key_p, value_p, &bucket_recdes, 1) != NO_ERROR)
    {
      er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_GENERIC_ERROR, 0);
      goto error;
    }
    }
  /* insert it into the bucket */
  success = spage_insert_at (thread_p, bucket_page_p, slot_no, &bucket_recdes);
  if (success != SP_SUCCESS)
    {
      /* Problem the record was not inserted to the page */
      fhs_free_recdes (thread_p, &bucket_recdes);
      if (success == SP_DOESNT_FIT)
    {
      /* There is not enough space on the slotted page for the new record */
      if (dk_bucket_page_p)
        {
          pgbuf_set_dirty (thread_p, dk_bucket_page_p, FREE);
        }
      return FHS_BUCKET_FULL;
    }
      else
    {
      er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_GENERIC_ERROR, 0);
      goto error;
    }
    }

  if (bucket_recdes.data)
    {
      fhs_free_recdes (thread_p, &bucket_recdes);
    }
  if (dk_bucket_page_p)
    {
      pgbuf_set_dirty (thread_p, dk_bucket_page_p, FREE);
    }

  pgbuf_set_dirty (thread_p, bucket_page_p, DONT_FREE);
  return FHS_SUCCESSFUL_COMPLETION;

error:
  if (bucket_recdes.data)
    {
      fhs_free_recdes (thread_p, &bucket_recdes);
    }
  if (dk_bucket_page_p)
    {
      pgbuf_set_dirty (thread_p, dk_bucket_page_p, FREE);
    }
  return FHS_ERROR_OCCURRED;
}

/*
 * fhs_insert_to_dk_bucket () - Insert (key, value) to dk bucket
 *   return: FHS_RESULT
 *   fhsid(in): identifier for the extendible hashing structure
 *   buc_pgptr(in): bucket page to insert the key
 *   key_ptr(in): Pointer to the key
 *   value_ptr(in): Pointer to the associated value
 *
 * Note:
 * Insert (key, value) to DK bucket
 * if DK bucket is full, create new DK bucket and connect to prior DK bucket
 */
static FHS_RESULT
fhs_insert_to_dk_bucket (THREAD_ENTRY * thread_p, FHSID * fhsid_p, VPID * next_bucket_vpid, void *key_p,
             TFTID * value_p)
{
  RECDES bucket_recdes = RECDES_INITIALIZER, first_bucket_recdes = RECDES_INITIALIZER, last_bucket_recdes =
    RECDES_INITIALIZER;
  PGSLOTID tmp_slot;
  int success;
  FHS_DK_BUCKET_HEADER *first_dk_bucket_header_p = NULL, *last_dk_bucket_header_p = NULL;
  VPID last_bucket_vpid, cur_bucket_vpid;
  PAGE_PTR first_dk_bucket_page_p = NULL;
  PAGE_PTR last_dk_bucket_page_p = NULL;
  PAGE_PTR new_dk_bucket_page_p = NULL;
  bool is_first_bucket = false;

  /* get last DK bucket page. */
  first_dk_bucket_page_p = fhs_fix_old_page (thread_p, &fhsid_p->bucket_file, next_bucket_vpid, PGBUF_LATCH_WRITE);
  if (first_dk_bucket_page_p == NULL)
    {
      er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_GENERIC_ERROR, 0);
      goto error;
    }
  (void) spage_get_record (thread_p, first_dk_bucket_page_p, 0, &first_bucket_recdes, PEEK);
  first_dk_bucket_header_p = (FHS_DK_BUCKET_HEADER *) first_bucket_recdes.data;
  last_bucket_vpid = first_dk_bucket_header_p->last_bucket;

  /* fix last DK bucket page. */
  if (last_bucket_vpid.volid == NULL_VOLID && last_bucket_vpid.pageid == NULL_PAGEID)
    {
      /* first is last */
      last_dk_bucket_page_p = first_dk_bucket_page_p;
      last_dk_bucket_header_p = first_dk_bucket_header_p;
      is_first_bucket = true;
    }
  else
    {
      last_dk_bucket_page_p = fhs_fix_old_page (thread_p, &fhsid_p->bucket_file, &last_bucket_vpid, PGBUF_LATCH_WRITE);
      if (last_dk_bucket_page_p == NULL)
    {
      er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_GENERIC_ERROR, 0);
      goto error;
    }
      (void) spage_get_record (thread_p, last_dk_bucket_page_p, 0, &last_bucket_recdes, PEEK);
      last_dk_bucket_header_p = (FHS_DK_BUCKET_HEADER *) last_bucket_recdes.data;
    }

  /* insert new record into dk_bucket */
  if (fhs_compose_record (thread_p, key_p, value_p, &bucket_recdes, FHS_FLAG_DUMMY_NUM) != NO_ERROR)
    {
      er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_GENERIC_ERROR, 0);
      goto error;
    }
  success = spage_insert (thread_p, last_dk_bucket_page_p, &bucket_recdes, &tmp_slot);
  if (success == SP_DOESNT_FIT)
    {
      /* make new dk_bucket page */
      success =
    file_alloc (thread_p, &fhsid_p->bucket_file, fhs_initialize_dk_bucket_new_page, NULL, &cur_bucket_vpid,
            &new_dk_bucket_page_p);
      if (success != NO_ERROR || new_dk_bucket_page_p == NULL)
    {
      er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_GENERIC_ERROR, 0);
      goto error;
    }
      /* connect with prior page */
      last_dk_bucket_header_p->next_bucket = cur_bucket_vpid;
      pgbuf_set_dirty (thread_p, last_dk_bucket_page_p, DONT_FREE);

      /* set last dk bucket vpid at first dk bucket */
      first_dk_bucket_header_p->last_bucket = cur_bucket_vpid;
      pgbuf_set_dirty (thread_p, first_dk_bucket_page_p, DONT_FREE);

      /* insert record into new dk_bucket */
      success = spage_insert (thread_p, new_dk_bucket_page_p, &bucket_recdes, &tmp_slot);
      if (success != SP_SUCCESS)
    {
      er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_GENERIC_ERROR, 0);
      goto error;
    }
      pgbuf_set_dirty (thread_p, new_dk_bucket_page_p, DONT_FREE);
    }
  else if (success != SP_SUCCESS)
    {
      er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_GENERIC_ERROR, 0);
      goto error;
    }

  if (is_first_bucket)
    {
      last_dk_bucket_page_p = NULL;
    }
  if (first_dk_bucket_page_p)
    {
      pgbuf_unfix_and_init (thread_p, first_dk_bucket_page_p);
    }
  if (last_dk_bucket_page_p)
    {
      pgbuf_unfix_and_init (thread_p, last_dk_bucket_page_p);
    }
  fhs_free_recdes (thread_p, &bucket_recdes);
  if (new_dk_bucket_page_p)
    {
      pgbuf_unfix_and_init (thread_p, new_dk_bucket_page_p);
    }

  return FHS_SUCCESSFUL_COMPLETION;

error:
  if (bucket_recdes.data)
    {
      fhs_free_recdes (thread_p, &bucket_recdes);
    }
  if (is_first_bucket)
    {
      last_dk_bucket_page_p = NULL;
    }
  if (first_dk_bucket_page_p)
    {
      pgbuf_unfix_and_init (thread_p, first_dk_bucket_page_p);
    }
  if (last_dk_bucket_page_p)
    {
      pgbuf_unfix_and_init (thread_p, last_dk_bucket_page_p);
    }
  if (new_dk_bucket_page_p)
    {
      pgbuf_unfix_and_init (thread_p, new_dk_bucket_page_p);
    }
  return FHS_ERROR_OCCURRED;
}

static PAGE_PTR
fhs_extend_bucket (THREAD_ENTRY * thread_p, FHSID * fhsid_p,
           PAGE_PTR bucket_page_p, void *key_p, FHS_HASH_KEY hash_key, int *out_new_bit_p, VPID * bucket_vpid)
{
  VPID sibling_vpid;
  PAGE_PTR sibling_page_p = NULL;
  VPID null_vpid = VPID_INITIALIZER;
  int old_local_depth;
  int new_local_depth;

  sibling_page_p =
    fhs_split_bucket (thread_p, fhsid_p, bucket_page_p, key_p, &old_local_depth, &new_local_depth, &sibling_vpid);
  if (sibling_page_p == NULL)
    {
      return NULL;
    }

  /* Save the bit position of hash_key to be used later in deciding whether to insert the new key to original bucket or
   * to the new sibling bucket. */
  *out_new_bit_p = GETBIT (hash_key, new_local_depth);

  /* Check directory expansion condition */
  if (new_local_depth > fhsid_p->depth)
    {
      if (fhs_expand_directory (thread_p, fhsid_p, new_local_depth) != NO_ERROR)
    {
      pgbuf_unfix_and_init (thread_p, sibling_page_p);
      return NULL;
    }
    }

  /* Connect the buckets */
  if ((new_local_depth - old_local_depth) > 1)
    {
      /* First, set all of them to NULL_PAGEID */
      if (fhs_connect_bucket (thread_p, fhsid_p, old_local_depth, hash_key, &null_vpid) != NO_ERROR)
    {
      pgbuf_unfix_and_init (thread_p, sibling_page_p);
      return NULL;
    }

      /* Then, connect the Bucket page */
      hash_key = CLEARBIT (hash_key, new_local_depth);
      if (fhs_connect_bucket (thread_p, fhsid_p, new_local_depth, hash_key, bucket_vpid) != NO_ERROR)
    {
      pgbuf_unfix_and_init (thread_p, sibling_page_p);
    }
    }

  /* Finally, connect the Sibling bucket page */
  hash_key = SETBIT (hash_key, new_local_depth);
  if (fhs_connect_bucket (thread_p, fhsid_p, new_local_depth, hash_key, &sibling_vpid) != NO_ERROR)
    {
      pgbuf_unfix_and_init (thread_p, sibling_page_p);
      return NULL;
    }

  return sibling_page_p;
}

/*
 * fhs_split_bucket () - Split a bucket into two
 *   return: PAGE_PTR (Sibling Pageptr), NULL in case of error in split.
 *   buc_pgptr(in): pointer to bucket page to split
 *   key(in): key value to insert after split
 *   old_local_depth(in): old local depth before the split operation; to be set
 *   new_local_depth(in): new local depth after the split operation; to be set
 *   sib_vpid(in): vpid of the sibling bucket; to be set
 *
 * Note: This function splits the given bucket into two buckets.
 * First, the new local depth of the bucket is found. Then a
 * sibling bucket is allocated, and the records are
 * redistributed. The contents of the sibling bucket is logged
 * for recovery purposes. (Note that the contents of the original
 * bucket page that is splitting is logged in the insertion
 * function after this function returns. Thus, this function
 * does not release the buffer that contains this page). Finally,
 * the given (key & assoc_value) pair is tried to be inserted
 * onto the correct bucket. The result of this attempt is
 * returned. (Note that this split may cause only one very
 * small record separated from the others, and the new record
 * may still be too big to fit into the bucket.) In addition to
 * the return value, three parameters are set so that the
 * directory can be updated to reflect this split.
 */
static PAGE_PTR
fhs_split_bucket (THREAD_ENTRY * thread_p, FHSID * fhsid_p, PAGE_PTR bucket_page_p, void *key_p,
          int *out_old_local_depth_p, int *out_new_local_depth_p, VPID * sibling_vpid_p)
{
  FHS_BUCKET_HEADER *bucket_header_p;
  VFID bucket_vfid;
  PAGE_PTR sibling_page_p;
  RECDES recdes;
  PGSLOTID first_slot_id = -1;
  int num_records;
  char init_bucket_data;

  int error_code = NO_ERROR;

  num_records = spage_number_of_records (bucket_page_p);
  /* Retrieve the bucket header and update it */
  if (spage_next_record (bucket_page_p, &first_slot_id, &recdes, PEEK) != S_SUCCESS)
    {
      er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_EH_CORRUPTED, 0);
      return NULL;
    }
  bucket_header_p = (FHS_BUCKET_HEADER *) recdes.data;
  *out_old_local_depth_p = bucket_header_p->local_depth;

  if (fhs_find_first_bit_position (thread_p, fhsid_p, bucket_page_p, bucket_header_p, key_p, num_records,
                   first_slot_id, out_old_local_depth_p, out_new_local_depth_p) != NO_ERROR)
    {
      return NULL;
    }

  bucket_vfid = fhsid_p->bucket_file;
  init_bucket_data = *out_new_local_depth_p;

  error_code = file_alloc (thread_p, &bucket_vfid, fhs_initialize_bucket_new_page, &init_bucket_data, sibling_vpid_p,
               &sibling_page_p);
  if (error_code != NO_ERROR)
    {
      ASSERT_ERROR ();
      return NULL;
    }
  if (sibling_page_p == NULL)
    {
      assert_release (false);
      return NULL;
    }

  if (fhs_distribute_records_into_two_bucket (thread_p, fhsid_p, bucket_page_p, bucket_header_p, num_records,
                          first_slot_id, sibling_page_p) != NO_ERROR)
    {
      pgbuf_unfix_and_init (thread_p, sibling_page_p);
      if (file_dealloc (thread_p, &bucket_vfid, sibling_vpid_p, FILE_EXTENDIBLE_HASH) != NO_ERROR)
    {
      ASSERT_ERROR ();
    }
      VPID_SET_NULL (sibling_vpid_p);
      return NULL;
    }

  pgbuf_set_dirty (thread_p, sibling_page_p, DONT_FREE);
  pgbuf_set_dirty (thread_p, bucket_page_p, DONT_FREE);

  return sibling_page_p;
}

/*
 * fhs_expand_directory () - Expand directory
 *   return: NO_ERROR, or ER_FAILED
 *   fhsid(in): identifier for the extendible hashing structure to expand
 *   new_depth(in): requested new depth
 *
 * Note: This function expands the given extendible hashing directory
 * as many times as necessary to attain the requested depth.
 * This is performed with a single pass on the original
 * directory (starting from the last pointer and working through
 * the first one).
 * If the original directory is not large enough some new pages are
 * allocated for the new directory. If the disk becomes full
 * during this operation, or any system error occurs, the
 * directory expansion is canceled and an error code is
 * returned. For recovery purposes, the contents of the
 * directory pages are logged during the expansion operation.
 */
static int
fhs_expand_directory (THREAD_ENTRY * thread_p, FHSID * fhsid_p, int new_depth)
{
  int old_pages;
  int old_ptrs;
  int new_pages;
  int new_ptrs;
  int check_pages;
  int end_offset;
  int new_end_offset;
  int needed_pages;
  int i, j;
  int exp_times;

  PAGE_PTR old_dir_page_p;
  PGLENGTH old_dir_offset;
  int old_dir_nth_page;

  PAGE_PTR new_dir_page_p;
  PGLENGTH new_dir_offset;
  int new_dir_nth_page;

  int error_code = NO_ERROR;

  error_code = file_get_num_user_pages (thread_p, &fhsid_p->ehid.vfid, &old_pages);
  if (error_code != NO_ERROR)
    {
      ASSERT_ERROR ();
      return error_code;
    }
  old_pages -= 1;       /* The first page starts with 0 */
  old_ptrs = 1 << fhsid_p->depth;
  exp_times = 1 << (new_depth - fhsid_p->depth);

  new_ptrs = old_ptrs * exp_times;
  end_offset = old_ptrs - 1;    /* Dir first pointer has an offset of 0 */
  fhs_dir_locate (&check_pages, &end_offset);

  /* Find how many pages the expanded directory will occupy */
  new_end_offset = new_ptrs - 1;
  fhs_dir_locate (&new_pages, &new_end_offset);
  needed_pages = new_pages - old_pages;
  if (needed_pages > 0)
    {
      error_code =
    file_alloc_multiple (thread_p, &fhsid_p->ehid.vfid, fhs_initialize_dir_new_page, NULL, needed_pages, NULL);
      if (error_code != NO_ERROR)
    {
      ASSERT_ERROR ();
      return error_code;
    }
    }

  /*****************************
     Perform expansion
   ******************************/

  /* Initialize source variables */
  old_dir_nth_page = old_pages; /* The last page of the old directory */

  old_dir_page_p = fhs_fix_nth_page (thread_p, &fhsid_p->ehid.vfid, old_dir_nth_page, PGBUF_LATCH_WRITE);
  if (old_dir_page_p == NULL)
    {
      return ER_FAILED;
    }
  old_dir_offset = end_offset;

  /* Initialize destination variables */
  new_dir_nth_page = new_pages; /* The last page of the new directory */

  new_dir_page_p = fhs_fix_nth_page (thread_p, &fhsid_p->ehid.vfid, new_dir_nth_page, PGBUF_LATCH_WRITE);
  if (new_dir_page_p == NULL)
    {
      pgbuf_unfix_and_init (thread_p, old_dir_page_p);
      return ER_FAILED;
    }
  new_dir_offset = new_end_offset;

  /* Copy old directory records to new (expanded) area */
  for (j = old_ptrs; j > 0; j--)
    {
      if (old_dir_offset < 0)
    {
      /*
       * We reached the end of the old directory page.
       * The next bucket pointer is in the previous directory page
       */
      pgbuf_unfix_and_init (thread_p, old_dir_page_p);

      /* get another page */
      old_dir_nth_page--;

      old_dir_page_p = fhs_fix_nth_page (thread_p, &fhsid_p->ehid.vfid, old_dir_nth_page, PGBUF_LATCH_WRITE);
      if (old_dir_page_p == NULL)
        {
          /* Fetch error; so return */
          pgbuf_unfix_and_init (thread_p, new_dir_page_p);
          return ER_FAILED;
        }

      /*
       * Set the olddir_offset to the offset of the last pointer in the
       * current source directory page.
       */
      if (old_dir_nth_page)
        {
          /* This is not the first (root) directory page */
          old_dir_offset = FHS_LAST_OFFSET_IN_NON_FIRST_PAGE;
        }
      else
        {
          /* This is the first (root) directory page */
          old_dir_offset = FHS_LAST_OFFSET_IN_FIRST_PAGE;
        }
    }

      /* Copy the next directory record "exp_times" times */
      for (i = 1; i <= exp_times; i++)
    {
      if (new_dir_offset < 0)
        {
          /*
           * There is not any more entries in the new directory page.
           * Log this updated directory page, and get next one
           */
          pgbuf_set_dirty (thread_p, new_dir_page_p, FREE);

          /* get another page */
          new_dir_nth_page--;

          new_dir_page_p = fhs_fix_nth_page (thread_p, &fhsid_p->ehid.vfid, new_dir_nth_page, PGBUF_LATCH_WRITE);
          if (new_dir_page_p == NULL)
        {
          pgbuf_unfix_and_init (thread_p, old_dir_page_p);
          return ER_FAILED;
        }

          /* Set the newdir_offset to the offset of the last pointer in the current destination directory page. */
          if (new_dir_nth_page != 0)
        {
          /* This is not the first (root) dir page */
          new_dir_offset = FHS_LAST_OFFSET_IN_NON_FIRST_PAGE;
        }
          else
        {
          /* This is the first (root) dir page */
          new_dir_offset = FHS_LAST_OFFSET_IN_FIRST_PAGE;
        }
        }

      if ((char *) new_dir_page_p + new_dir_offset != (char *) old_dir_page_p + old_dir_offset)
        {
          memcpy ((char *) new_dir_page_p + new_dir_offset, (char *) old_dir_page_p + old_dir_offset,
              sizeof (FHS_DIR_RECORD));
        }

      /* Advance the destination pointer to new spot */
      new_dir_offset -= sizeof (FHS_DIR_RECORD);
    }

      /* Advance the source pointer to new spot */
      old_dir_offset -= sizeof (FHS_DIR_RECORD);
    }

  /*
   * Update the directory header
   */
  fhsid_p->depth = new_depth;

  /* Release the old and new directory pages */
  pgbuf_unfix_and_init (thread_p, old_dir_page_p);
  pgbuf_set_dirty (thread_p, new_dir_page_p, FREE);

  return NO_ERROR;
}

static int
fhs_find_first_bit_position (THREAD_ENTRY * thread_p, FHSID * fhsid_p, PAGE_PTR bucket_page_p,
                 FHS_BUCKET_HEADER * bucket_header_p, void *key_p, int num_recs, PGSLOTID first_slot_id,
                 int *out_old_local_depth_p, int *out_new_local_depth_p)
{
  int bit_position;
  unsigned int i;
  unsigned int difference = 0, check_bit = 0;
  FHS_HASH_KEY first_hash_key, next_hash_key;
  PGSLOTID slot_id;
  RECDES recdes;

  check_bit = 1 << (FHS_HASH_KEY_BITS - bucket_header_p->local_depth - 1);

  /* Get the first pseudo key */
  first_hash_key = *((FHS_HASH_KEY *) key_p);

  /* TO_DO : Is there any way to find the depth more effectively? */
  for (slot_id = first_slot_id + 1; slot_id < num_recs; slot_id++)
    {
      if (spage_get_record (thread_p, bucket_page_p, slot_id, &recdes, PEEK) != S_SUCCESS)
    {
      er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_EH_CORRUPTED, 0);
      return ER_EH_CORRUPTED;
    }

      if (fhs_get_pseudo_key (thread_p, &recdes, &next_hash_key) != NO_ERROR)
    {
      return ER_FAILED;
    }

      difference = (first_hash_key ^ next_hash_key) | difference;
      if (difference & check_bit)
    {
      break;
    }
    }

  /* Initilize bit_position to one greater than the old local depth */
  bit_position = *out_old_local_depth_p + 1;

  /* Find out the correct bit_position that the keys differ */
  for (i = bucket_header_p->local_depth + 1; i <= FHS_HASH_KEY_BITS; i++)
    {
      if (difference & check_bit)
    {
      bit_position = i;
      break;
    }

      /* Shift the check bit position to right */
      check_bit >>= 1;
    }

  /* Update the bucket header and the variable parameter to new local depth */
  bucket_header_p->local_depth = bit_position;
  *out_new_local_depth_p = bit_position;

  return NO_ERROR;
}

static int
fhs_distribute_records_into_two_bucket (THREAD_ENTRY * thread_p, FHSID * fhsid_p,
                    PAGE_PTR bucket_page_p, FHS_BUCKET_HEADER * bucket_header_p, int num_recs,
                    PGSLOTID first_slot_id, PAGE_PTR sibling_page_p)
{
  PGSLOTID slot_id, sibling_slot_id;
  RECDES recdes;
  FHS_HASH_KEY hash_key;
  int i, success;

  for (slot_id = i = first_slot_id + 1; i < num_recs; i++)
    {
      if (spage_get_record (thread_p, bucket_page_p, slot_id, &recdes, PEEK) != S_SUCCESS)
    {
      return ER_FAILED;
    }

      if (fhs_get_pseudo_key (thread_p, &recdes, &hash_key) != NO_ERROR)
    {
      return ER_FAILED;
    }

      if (GETBIT (hash_key, bucket_header_p->local_depth))
    {
      success = spage_insert (thread_p, sibling_page_p, &recdes, &sibling_slot_id);
      if (success != SP_SUCCESS)
        {
          er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_GENERIC_ERROR, 0);

          return ER_FAILED;
        }
      (void) spage_delete (thread_p, bucket_page_p, slot_id);
      /*
       * slotid is not changed since the current slot is deleted and the
       * next slot will take its slotid.
       */
    }
      else
    {
      /* skip the slot */
      slot_id++;
    }
    }

  return NO_ERROR;
}

static int
fhs_get_pseudo_key (THREAD_ENTRY * thread_p, RECDES * recdes_p, FHS_HASH_KEY * out_hash_key_p)
{
  FHS_HASH_KEY hash_key;
  char *bucket_record_p;

  bucket_record_p = (char *) recdes_p->data;
  bucket_record_p += sizeof (TFTID);
  hash_key = *((FHS_HASH_KEY *) bucket_record_p);

  *out_hash_key_p = hash_key;
  return NO_ERROR;
}