Skip to content

File disk_manager.c

File List > cubrid > src > storage > disk_manager.c

Go to the documentation of this file

/*
 * Copyright 2008 Search Solution Corporation
 * Copyright 2016 CUBRID Corporation
 *
 *  Licensed under the Apache License, Version 2.0 (the "License");
 *  you may not use this file except in compliance with the License.
 *  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 *  Unless required by applicable law or agreed to in writing, software
 *  distributed under the License is distributed on an "AS IS" BASIS,
 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *  See the License for the specific language governing permissions and
 *  limitations under the License.
 *
 */

/*
 * disk_manager.c - Disk management module (at server)
 */

#ident "$Id$"

#include "config.h"

#include <stdlib.h>
#include <stddef.h>
#include <string.h>
#include <assert.h>
#include <errno.h>

#if !defined (WINDOWS)
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#endif /* !WINDOWS */

#include "disk_manager.h"

#include "porting.h"
#include "event_log.h"
#include "porting_inline.hpp"
#include "system_parameter.h"
#include "error_manager.h"
#include "language_support.h"
#include "intl_support.h"
#include "xserver_interface.h"
#include "file_io.h"
#include "page_buffer.h"
#include "log_append.hpp"
#include "log_manager.h"
#include "log_lsa.hpp"
#include "log_volids.hpp"
#include "critical_section.h"
#include "boot_sr.h"
#include "tz_support.h"
#include "db_date.h"
#include "bit.h"
#include "fault_injection.h"
#include "vacuum.h"
#include "dbtype.h"
#include "thread_entry_task.hpp"
#include "thread_manager.hpp"
#include "double_write_buffer.hpp"
// XXX: SHOULD BE THE LAST INCLUDE HEADER
#include "memory_wrapper.hpp"

/************************************************************************/
/* Define structures, globals, and macros                               */
/************************************************************************/

/*
 * DISK_VOLUME_HEADER - header stored at the start of the volume header page.
 *
 * disk_format () fixes page DISK_VOLHEADER_PAGE of a new volume and casts the page pointer to this structure.
 *
 * DON'T USE sizeof on this structure to obtain the header size: the size is variable. var_fields is declared with
 * a single byte but actually holds the variable-length strings located through the offset_to_* fields.
 * NOTE(review): disk_vhdr_get_vol_header_size () is presumably the way to get the real size - confirm.
 */
typedef struct disk_volume_header DISK_VOLUME_HEADER;
struct disk_volume_header
{               /* Volume header */
  /* DON'T MOVE THE MAGIC FIELD. IT IS USED BY FILEC */
  char magic[CUBRID_MAGIC_MAX_LENGTH];  /* Magic value for file/magic Unix utility */
  INT16 iopagesize;     /* This was only added for checking purposes. The actual value is stored on the log */
  INT16 volid;          /* Volume identifier */
  INT8 db_charset;      /* charset of database */
  INT8 dummy1;          /* Dummy fields for alignment */
  DB_VOLPURPOSE purpose;    /* Permanent or temporary volume purpose */
  DB_VOLTYPE type;      /* Permanent or temporary volume type */
  DKNPAGES sect_npgs;       /* Size of sector in pages */
  DKNSECTS nsect_total;     /* Total number of sectors */
  DKNSECTS nsect_max;       /* Maximum number of sectors */
  SECTID hint_allocsect;    /* Hint for next sector to be allocated */
  DKNPAGES stab_npages;     /* Size of sector allocation table in pages */
  PAGEID stab_first_page;   /* First page of sector allocation table */
  PAGEID sys_lastpage;      /* Last system page */
  INT32 dummy2;         /* Dummy fields for alignment */
  INT64 db_creation;        /* Database creation time. For safety reasons, this value is set on all volumes and the
                 * log. The value is generated by the log manager */
  INT64 vol_creation;       /* Volume creation time */
  LOG_LSA chkpt_lsa;        /* Lowest log sequence address to start the recovery process of this volume */
  HFID boot_hfid;       /* System Heap file for booting purposes and multi volumes */
  INT32 reserved0;      /* reserved area */
  INT32 reserved1;      /* reserved area */
  INT32 reserved2;      /* reserved area */
  INT32 reserved3;      /* reserved area */
  INT16 next_volid;     /* next volume identifier */
  INT16 offset_to_vol_fullname; /* Offset to vol_fullname */
  INT16 offset_to_next_vol_fullname;    /* Offset to next vol_fullname */
  INT16 offset_to_vol_remarks;  /* Offset to vol_remarks */

  char var_fields[1];       /* Variable length fields addressed by the offsets above. Current ordering is:
                 * 1) vol_fullname, 2) next_vol_fullname, 3) volume remarks.
                 * The available length is DB_PAGESIZE - offset of var_fields */

};

/*
 * Recovery data for volume links: the identifier and full name of the next volume.
 * The name is stored inline; next_vol_fullname is declared with one byte but the record is longer.
 */
typedef struct disk_recv_link_perm_volume DISK_RECV_LINK_PERM_VOLUME;
struct disk_recv_link_perm_volume
{               /* Recovery for links */
  INT16 next_volid;     /* Identifier of the next volume */
  char next_vol_fullname[1];    /* Actually more than one */
};

/*
 * Recovery data for changes of the creation information of a volume (see disk_set_creation ()).
 * The volume name is stored inline; vol_fullname is declared with one byte but the record is longer.
 */
typedef struct disk_recv_change_creation DISK_RECV_CHANGE_CREATION;
struct disk_recv_change_creation
{               /* Recovery for changes */
  INT64 db_creation;        /* Database creation time */
  LOG_LSA chkpt_lsa;        /* Checkpoint log sequence address */
  char vol_fullname[1];     /* Actually more than one */
};

/* Recovery data for volume expand: a page count and the identifier of the volume it applies to. */
typedef struct disk_recv_data_volume_expand DISK_RECV_DATA_VOLUME_EXPAND;
struct disk_recv_data_volume_expand
{
  DKNPAGES npages;      /* Number of pages. NOTE(review): presumably the pages added by the expand - confirm */
  INT16 volid;          /* Volume identifier */
};

/* Context structure for showing a volume header; it only carries the identifier of the volume. */
typedef struct disk_vol_header_context DISK_VOL_HEADER_CONTEXT;
struct disk_vol_header_context
{
  int volume_id;        /* volume id */
};

/*
 * Search request/result pair for a volume existence check.
 * NOTE(review): presumably passed as the void *arg of disk_check_volume_exist () - confirm.
 */
typedef struct disk_check_vol_info DISK_CHECK_VOL_INFO;
struct disk_check_vol_info
{
  VOLID volid;          /* volume id to look for */
  bool exists;          /* whether volid exists */
};

/************************************************************************/
/* Disk cache section                                                   */
/************************************************************************/

/* Per-volume entry of the disk cache; this is the element type of DISK_CACHE.vols. */
typedef struct disk_cache_volinfo DISK_CACHE_VOLINFO;
struct disk_cache_volinfo
{
  DB_VOLPURPOSE purpose;    /* Purpose of the volume */
  DKNSECTS nsect_free;      /* Hint of free sectors on volume */
};

/*
 * Extension bookkeeping, embedded in both DISK_PERM_PURPOSE_INFO and DISK_TEMP_PURPOSE_INFO.
 *
 * NOTE(review): the per-field remarks below are inferred from the field names only; the code that reads and writes
 *               them is not part of this section - confirm before relying on them.
 */
typedef struct disk_extend_info DISK_EXTEND_INFO;
struct disk_extend_info
{
  volatile DKNSECTS nsect_free;     /* presumably: free sectors available for this purpose */
  volatile DKNSECTS nsect_total;    /* presumably: total sectors currently allocated for this purpose */
  volatile DKNSECTS nsect_max;      /* presumably: maximum sectors reachable by expanding existing volumes */
  volatile DKNSECTS nsect_intention;    /* presumably: sectors requested by pending reservations */

  pthread_mutex_t mutex_reserve;    /* reserve mutex; see the ordering note on DISK_CACHE.mutex_extend */
#if !defined (NDEBUG)
  volatile int owner_reserve;       /* debug builds only; presumably identifies the holder of mutex_reserve */
#endif              /* !NDEBUG */

  DKNSECTS nsect_vol_max;       /* presumably: maximum size in sectors of a single volume */
  VOLID volid_extend;           /* presumably: volume currently selected for auto-extension */
  DB_VOLTYPE voltype;           /* volume type this info applies to */
};

/* Disk cache information for permanent purpose (DISK_CACHE.perm_purpose_info); wraps the extension bookkeeping. */
typedef struct disk_perm_info DISK_PERM_PURPOSE_INFO;
struct disk_perm_info
{
  DISK_EXTEND_INFO extend_info;     /* extension bookkeeping for permanent purpose */
};

/*
 * Disk cache information for temporary purpose (DISK_CACHE.temp_purpose_info).
 * NOTE(review): nsect_perm_free/nsect_perm_total presumably account for permanent-type volumes that hold temporary
 *               data (disk_format () allows that combination) - confirm against the code that updates them.
 */
typedef struct disk_temp_info DISK_TEMP_PURPOSE_INFO;
struct disk_temp_info
{
  DISK_EXTEND_INFO extend_info;     /* extension bookkeeping for temporary purpose */
  DKNSECTS nsect_perm_free;
  DKNSECTS nsect_perm_total;
};

/*
 * In-memory disk cache: volume counters, one DISK_CACHE_VOLINFO slot for each of LOG_MAX_DBVOLID + 1 volumes, and
 * the aggregated information for the permanent and temporary purposes. A single instance is reached through the
 * file-level pointer disk_Cache.
 */
typedef struct disk_cache DISK_CACHE;
struct disk_cache
{
  int nvols_perm;       /* number of permanent type volumes */
  int nvols_temp;       /* number of temporary type volumes */
  DISK_CACHE_VOLINFO vols[LOG_MAX_DBVOLID + 1]; /* volume info array */

  DISK_PERM_PURPOSE_INFO perm_purpose_info; /* info for permanent purpose */
  DISK_TEMP_PURPOSE_INFO temp_purpose_info; /* info for temporary purpose */

  pthread_mutex_t mutex_extend; /* note: never get expand mutex while keeping reserve mutexes */
#if !defined (NDEBUG)
  volatile int owner_extend;    /* debug builds only; presumably identifies the holder of mutex_extend */
#endif              /* !NDEBUG */
};

/* The disk cache instance; NULL until it is set up (disk_cache_init () is declared below). */
static DISK_CACHE *disk_Cache = NULL;

/* NOTE(review): the initial value -2 looks like a "not computed yet" marker - confirm where it is assigned. */
static DKNSECTS disk_Temp_max_sects = -2;

/************************************************************************/
/* Disk allocation table section                                        */
/************************************************************************/

/* Disk allocation table is a bitmap that keeps track of reserved sectors. When files are created or extended, they
 * reserve a number of sectors from disk (each sector containing a predefined number of pages). */

/* The default unit used to divide a page of allocation table. Currently it is a UINT64.
 * If we ever want to change the type of unit, this can be modified and should be handled automatically (the bit and
 * unit counts below are derived from DISK_STAB_UNIT_SIZE_OF). However, some other structures may need updating
 * (e.g. DISK_STAB_CURSOR).
 */
typedef UINT64 DISK_STAB_UNIT;
#define DISK_STAB_UNIT_SIZE_OF sizeof (DISK_STAB_UNIT)

/* Disk allocation table cursor. Used to iterate through table bits.
 * A position is described by page, unit within the page and bit within the unit; sectid is the matching sector.
 */
typedef struct disk_stab_cursor DISK_STAB_CURSOR;
struct disk_stab_cursor
{
  const DISK_VOLUME_HEADER *volheader;  /* Volume header */

  PAGEID pageid;        /* Current page ID */
  int offset_to_unit;       /* Offset to current unit in page. */
  int offset_to_bit;        /* Offset to current bit in unit. */

  SECTID sectid;        /* Sector ID */

  PAGE_PTR page;        /* Fixed table page. */
  DISK_STAB_UNIT *unit;     /* Unit pointer in current page. */
};
/* Positional initializer: values follow the field order above, so keep both in sync when fields change. */
#define DISK_STAB_CURSOR_INITIALIZER { NULL, 0, 0, 0, 0, NULL, NULL }

/* Allocation table macros. One bit of the table stands for one sector. */
/* Bit count in a unit */
#define DISK_STAB_UNIT_BIT_COUNT        ((int) (DISK_STAB_UNIT_SIZE_OF * CHAR_BIT))
/* Unit count in a table page */
#define DISK_STAB_PAGE_UNITS_COUNT      ((int) (DB_PAGESIZE / DISK_STAB_UNIT_SIZE_OF))
/* Bit count in a table page */
#define DISK_STAB_PAGE_BIT_COUNT        ((int) (DISK_STAB_UNIT_BIT_COUNT * DISK_STAB_PAGE_UNITS_COUNT))

/* Get page offset for sector ID. Note this is not the real page ID (since table does not start from page 0). */
#define DISK_ALLOCTBL_SECTOR_PAGE_OFFSET(sect) ((sect) / DISK_STAB_PAGE_BIT_COUNT)
/* Get unit offset in page for sector ID. */
#define DISK_ALLOCTBL_SECTOR_UNIT_OFFSET(sect) (((sect) % DISK_STAB_PAGE_BIT_COUNT) / DISK_STAB_UNIT_BIT_COUNT)
/* Get bit offset in unit for sector ID */
#define DISK_ALLOCTBL_SECTOR_BIT_OFFSET(sect) (((sect) % DISK_STAB_PAGE_BIT_COUNT) % DISK_STAB_UNIT_BIT_COUNT)

/*
 * Sector count rounding helpers. Counts are rounded to a multiple of DISK_STAB_UNIT_BIT_COUNT, i.e. to whole
 * allocation table units.
 *
 *   DISK_SECTS_ROUND_UP (nsects)       : smallest multiple of the unit bit count that is >= nsects
 *   DISK_SECTS_ROUND_DOWN (nsects)     : largest multiple of the unit bit count that is <= nsects
 *   DISK_SECTS_ASSERT_ROUNDED (nsects) : assert that nsects is already a multiple of the unit bit count
 *
 * The argument is parenthesized in the expansions so that expression arguments (e.g. a + b) are rounded as a whole.
 * Note that DISK_SECTS_ASSERT_ROUNDED evaluates its argument more than once; do not pass expressions with side
 * effects.
 */
#define DISK_SECTS_ROUND_UP(nsects)  (CEIL_PTVDIV (nsects, DISK_STAB_UNIT_BIT_COUNT) * DISK_STAB_UNIT_BIT_COUNT)
#define DISK_SECTS_ROUND_DOWN(nsects)  (((nsects) / DISK_STAB_UNIT_BIT_COUNT) * DISK_STAB_UNIT_BIT_COUNT)
#define DISK_SECTS_ASSERT_ROUNDED(nsects)  assert ((nsects) == DISK_SECTS_ROUND_DOWN (nsects))

/* Number of sector table pages required to hold one bit for each of nsect_max sectors */
#define DISK_STAB_NPAGES(nsect_max) (CEIL_PTVDIV (nsect_max, DISK_STAB_PAGE_BIT_COUNT))

/* function used by disk_stab_iterate_units. It receives the thread entry, the cursor, a stop flag it may write to,
 * and an opaque argument, and returns an int.
 * NOTE(review): presumably the return value is an error code, *stop = true ends the iteration early and args is the
 *               f_unit_args given to disk_stab_iterate_units - confirm in that function. */
typedef int (*DISK_STAB_UNIT_FUNC) (THREAD_ENTRY * thread_p, DISK_STAB_CURSOR * cursor, bool * stop, void *args);

/************************************************************************/
/* Sector reserve section                                               */
/************************************************************************/

/* A (volume, sector count) pair; element type of DISK_RESERVE_CONTEXT.cache_vol_reserve. */
typedef struct disk_cache_vol_reserve DISK_CACHE_VOL_RESERVE;
struct disk_cache_vol_reserve
{
  VOLID volid;          /* volume identifier */
  DKNSECTS nsect;       /* number of sectors for this volume */
};
/* NOTE(review): no use of this constant is visible in this part of the file - confirm it is still needed. */
#define DISK_PRERESERVE_BUF_DEFAULT 16

/*
 * Context passed between the sector reserve/unreserve functions.
 * NOTE(review): field remarks are inferred from names and from DISK_RESERVE_CONTEXT_MSG; the functions that fill
 *               the context are outside this section - confirm.
 */
typedef struct disk_reserve_context DISK_RESERVE_CONTEXT;
struct disk_reserve_context
{
  int nsect_total;      /* presumably: total number of sectors handled by the request */
  VSID *vsidp;          /* presumably: cursor into the caller's array of sector identifiers */

  DISK_CACHE_VOL_RESERVE cache_vol_reserve[VOLID_MAX];  /* per-volume sector counts */
  int n_cache_vol_reserve;  /* number of used entries in cache_vol_reserve */
  int n_cache_reserve_remaining;    /* presumably: sectors still to be obtained from the cache */

  DKNSECTS nsects_lastvol_remaining;

  DB_VOLPURPOSE purpose;    /* purpose of the request */
};

/************************************************************************/
/* Disk create, extend, destroy section                                 */
/************************************************************************/

/* minimum volume sectors for create/expand... at least one allocation table unit */
#define DISK_MIN_VOLUME_SECTS DISK_STAB_UNIT_BIT_COUNT

/* when allocating remaining disk space, leave out 64MB for safety. The second macro is the same amount expressed
 * in sectors of IO_SECTORSIZE bytes. */
#define DISK_SAFE_OSDISK_FREE_SPACE (64 * 1024 * 1024)
#define DISK_SAFE_OSDISK_FREE_SECTS (DISK_SAFE_OSDISK_FREE_SPACE / IO_SECTORSIZE)

/************************************************************************/
/* Utility section                                                      */
/************************************************************************/

/* logging */
static bool disk_Logging = false;
/*
 * disk_log () - write a debug message through _er_log_debug when disk_Logging is set
 *
 * func (in) : string literal naming the calling function
 * msg (in)  : string literal with the printf-like format of the message
 * ... (in)  : arguments for msg (at least one is required)
 *
 * func and msg are pasted into the format string, so both must be string literals. The expansion is wrapped in
 * do { } while (0) so that it behaves as one statement; an else that follows a disk_log () call cannot attach to the
 * internal test of disk_Logging.
 */
#define disk_log(func, msg, ...) \
  do \
    { \
      if (disk_Logging) \
        { \
          _er_log_debug (ARG_FILE_LINE, "DISK " func " " LOG_THREAD_TRAN_MSG ": \n\t" msg "\n", \
                         LOG_THREAD_TRAN_ARGS (thread_get_thread_entry_info ()), __VA_ARGS__); \
        } \
    } \
  while (0)

/*
 * Format string fragment that describes a volume header. It must be used together with DISK_VOLHEADER_AS_ARGS,
 * which supplies the matching arguments in the same order.
 */
#define DISK_VOLHEADER_MSG \
  "\t\tvolume header: \n" \
  "\t\t\tvolid = %d\n" \
  "\t\t\ttype = %s\n" \
  "\t\t\tpurpose = %s\n" \
  "\t\t\tsectors: total = %d, max = %d\n" \
  "\t\t\thint_allocset = %d\n" \
  "\t\t\tsector allocation table: start = %d, num pages = %d\n" \
  "\t\t\tchkpt_lsa = %lld|%d\n" \
  "\t\t\tnext_volid = %d\n"
/* Arguments for DISK_VOLHEADER_MSG; arg_volh is a pointer to a DISK_VOLUME_HEADER. */
#define DISK_VOLHEADER_AS_ARGS(arg_volh) \
  (arg_volh)->volid, \
  disk_type_to_string ((arg_volh)->type), \
  disk_purpose_to_string ((arg_volh)->purpose), \
  (arg_volh)->nsect_total, (arg_volh)->nsect_max, \
  (arg_volh)->hint_allocsect, \
  (arg_volh)->stab_first_page, (arg_volh)->stab_npages, \
  LSA_AS_ARGS (&(arg_volh)->chkpt_lsa), \
  (arg_volh)->next_volid

/*
 * Format string fragment that describes a reserve context. It must be used together with
 * DISK_RESERVE_CONTEXT_AS_ARGS. Unlike DISK_VOLHEADER_MSG, it does not end with a newline.
 */
#define DISK_RESERVE_CONTEXT_MSG \
  "\t\tcontext: \n" \
  "\t\t\tnsect_total = %d\n" \
  "\t\t\tn_cache_vol_reserve = %d, n_cache_reserve_remaining = %d\n" \
  "\t\t\tpurpose = %s"
/* Arguments for DISK_RESERVE_CONTEXT_MSG; context is a pointer to a DISK_RESERVE_CONTEXT. */
#define DISK_RESERVE_CONTEXT_AS_ARGS(context) \
  (context)->nsect_total, (context)->n_cache_vol_reserve, (context)->n_cache_reserve_remaining, \
  disk_purpose_to_string ((context)->purpose)

/* Number of system pages of a volume with nsect_max sectors: the sector table pages plus one more page.
 * NOTE(review): the extra page is presumably the volume header page - confirm. */
#define DISK_SYS_NPAGE_SIZE(nsect_max) (1 + DISK_STAB_NPAGES (nsect_max))
/* Number of sectors needed to hold the system pages */
#define DISK_SYS_NSECT_SIZE(nsect_max) (CEIL_PTVDIV (DISK_SYS_NPAGE_SIZE (nsect_max), DISK_SECTOR_NPAGES))

/************************************************************************/
/* Declare static functions.                                            */
/************************************************************************/

/* Declared for SERVER_MODE builds only. */
#if defined (SERVER_MODE)
static void disk_log_extend_elapsed (THREAD_ENTRY * thread_p, const char *event, const char *name, DB_VOLTYPE voltype,
                     const char *log);
#endif

/* Accessors for the volume header: variable-length string fields and header size. The setters return an int that
 * disk_format () treats as an error code. */
STATIC_INLINE char *disk_vhdr_get_vol_fullname (const DISK_VOLUME_HEADER * vhdr) __attribute__ ((ALWAYS_INLINE));
STATIC_INLINE char *disk_vhdr_get_next_vol_fullname (const DISK_VOLUME_HEADER * vhdr) __attribute__ ((ALWAYS_INLINE));
STATIC_INLINE char *disk_vhdr_get_vol_remarks (const DISK_VOLUME_HEADER * vhdr) __attribute__ ((ALWAYS_INLINE));
STATIC_INLINE int disk_vhdr_get_vol_header_size (const DISK_VOLUME_HEADER * vhdr) __attribute__ ((ALWAYS_INLINE));
static int disk_vhdr_set_vol_fullname (DISK_VOLUME_HEADER * vhdr, const char *vol_fullname);
static int disk_vhdr_set_next_vol_fullname (DISK_VOLUME_HEADER * vhdr, const char *next_vol_fullname);
static int disk_vhdr_set_vol_remarks (DISK_VOLUME_HEADER * vhdr, const char *vol_remarks);

/* Loading of volumes into the disk cache */
static bool disk_cache_load_all_volumes (THREAD_ENTRY * thread_p);
static bool disk_cache_load_volume (THREAD_ENTRY * thread_p, INT16 volid, void *ignore);

/* String conversions used by the logging macros.
 * NOTE(review): the parameter type is spelled DISK_VOLPURPOSE here while the structures above use DB_VOLPURPOSE;
 *               presumably the two names are aliases - confirm. */
static const char *disk_purpose_to_string (DISK_VOLPURPOSE purpose);
static const char *disk_type_to_string (DB_VOLTYPE voltype);

/* Dump helpers */
static int disk_stab_dump (THREAD_ENTRY * thread_p, FILE * fp, const DISK_VOLUME_HEADER * volheader);

static int disk_dump_volume_system_info (THREAD_ENTRY * thread_p, FILE * fp, INT16 volid);
static bool disk_dump_goodvol_all (THREAD_ENTRY * thread_p, INT16 volid, void *ignore);
static void disk_vhdr_dump (FILE * fp, const DISK_VOLUME_HEADER * vhdr);

/* Volume checks */
STATIC_INLINE void disk_verify_volume_header (THREAD_ENTRY * thread_p, PAGE_PTR pgptr) __attribute__ ((ALWAYS_INLINE));
static bool disk_check_volume_exist (THREAD_ENTRY * thread_p, VOLID volid, void *arg);
static int disk_can_overwrite_data_volume (THREAD_ENTRY * thread_p, const char *vol_label_p, bool * can_overwrite);

/************************************************************************/
/* Disk sector table section                                            */
/************************************************************************/

/* Cursor helpers: positioning, comparison, validation, bit access and page fix/unfix for DISK_STAB_CURSOR. */
STATIC_INLINE void disk_stab_cursor_set_at_sectid (const DISK_VOLUME_HEADER * volheader, SECTID sectid,
                           DISK_STAB_CURSOR * cursor) __attribute__ ((ALWAYS_INLINE));
STATIC_INLINE void disk_stab_cursor_set_at_end (const DISK_VOLUME_HEADER * volheader, DISK_STAB_CURSOR * cursor)
  __attribute__ ((ALWAYS_INLINE));
STATIC_INLINE void disk_stab_cursor_set_at_start (const DISK_VOLUME_HEADER * volheader, DISK_STAB_CURSOR * cursor)
  __attribute__ ((ALWAYS_INLINE));
STATIC_INLINE int disk_stab_cursor_compare (const DISK_STAB_CURSOR * first_cursor,
                        const DISK_STAB_CURSOR * second_cursor) __attribute__ ((ALWAYS_INLINE));
STATIC_INLINE void disk_stab_cursor_check_valid (const DISK_STAB_CURSOR * cursor) __attribute__ ((ALWAYS_INLINE));
STATIC_INLINE bool disk_stab_cursor_is_bit_set (const DISK_STAB_CURSOR * cursor) __attribute__ ((ALWAYS_INLINE));
STATIC_INLINE void disk_stab_cursor_set_bit (DISK_STAB_CURSOR * cursor) __attribute__ ((ALWAYS_INLINE));
STATIC_INLINE void disk_stab_cursor_clear_bit (DISK_STAB_CURSOR * cursor) __attribute__ ((ALWAYS_INLINE));
STATIC_INLINE int disk_stab_cursor_get_bit_index_in_page (const DISK_STAB_CURSOR * cursor)
  __attribute__ ((ALWAYS_INLINE));
STATIC_INLINE SECTID disk_stab_cursor_get_sectid (const DISK_STAB_CURSOR * cursor) __attribute__ ((ALWAYS_INLINE));
STATIC_INLINE int disk_stab_cursor_fix (THREAD_ENTRY * thread_p, DISK_STAB_CURSOR * cursor,
                    PGBUF_LATCH_MODE latch_mode) __attribute__ ((ALWAYS_INLINE));
STATIC_INLINE void disk_stab_cursor_unfix (THREAD_ENTRY * thread_p, DISK_STAB_CURSOR * cursor)
  __attribute__ ((ALWAYS_INLINE));
/* Has the DISK_STAB_UNIT_FUNC signature */
static int disk_stab_unit_reserve (THREAD_ENTRY * thread_p, DISK_STAB_CURSOR * cursor, bool * stop, void *args);

/* Iteration over table units; f_unit is called with f_unit_args. The "_all" variant takes no start/end cursors. */
static int disk_stab_iterate_units (THREAD_ENTRY * thread_p, const DISK_VOLUME_HEADER * volheader,
                    PGBUF_LATCH_MODE mode, DISK_STAB_CURSOR * start, DISK_STAB_CURSOR * end,
                    DISK_STAB_UNIT_FUNC f_unit, void *f_unit_args);
static int disk_stab_iterate_units_all (THREAD_ENTRY * thread_p, const DISK_VOLUME_HEADER * volheader,
                    PGBUF_LATCH_MODE mode, DISK_STAB_UNIT_FUNC f_unit, void *f_unit_args);
/* The following functions have the DISK_STAB_UNIT_FUNC signature */
static int disk_stab_dump_unit (THREAD_ENTRY * thread_p, DISK_STAB_CURSOR * cursor, bool * stop, void *args);
static int disk_stab_count_free (THREAD_ENTRY * thread_p, DISK_STAB_CURSOR * cursor, bool * stop, void *args);
static int disk_stab_has_used (THREAD_ENTRY * thread_p, DISK_STAB_CURSOR * cursor, bool * stop, void *args);
static int disk_stab_set_bits_contiguous (THREAD_ENTRY * thread_p, DISK_STAB_CURSOR * cursor, bool * stop, void *args);

/************************************************************************/
/* Disk cache section                                                   */
/************************************************************************/

/* Volume boot: outputs purpose, type and space information of the volume */
static int disk_volume_boot (THREAD_ENTRY * thread_p, VOLID volid, DB_VOLPURPOSE * purpose_out,
                 DB_VOLTYPE * voltype_out, DISK_VOLUME_SPACE_INFO * space_out);
/* Reserve lock helpers, addressed either by extend info or by purpose */
STATIC_INLINE void disk_cache_lock_reserve (DISK_EXTEND_INFO * expand_info) __attribute__ ((ALWAYS_INLINE));
STATIC_INLINE void disk_cache_unlock_reserve (DISK_EXTEND_INFO * expand_info) __attribute__ ((ALWAYS_INLINE));
STATIC_INLINE void disk_cache_lock_reserve_for_purpose (DB_VOLPURPOSE purpose) __attribute__ ((ALWAYS_INLINE));
STATIC_INLINE void disk_cache_unlock_reserve_for_purpose (DB_VOLPURPOSE purpose) __attribute__ ((ALWAYS_INLINE));
/* Adjusts the cached free sector information of a volume by delta_free */
STATIC_INLINE void disk_cache_update_vol_free (VOLID volid, DKNSECTS delta_free) __attribute__ ((ALWAYS_INLINE));

/************************************************************************/
/* Sector reserve section                                               */
/************************************************************************/

/* Reserving sectors: from the cache, in a volume, and by extending disk space */
static int disk_reserve_sectors_in_volume (THREAD_ENTRY * thread_p, int vol_index, DISK_RESERVE_CONTEXT * context);
static DISK_ISVALID disk_is_sector_reserved (THREAD_ENTRY * thread_p, const DISK_VOLUME_HEADER * volheader,
                         SECTID sectid, bool debug_crash);
static int disk_reserve_from_cache (THREAD_ENTRY * thread_p, DISK_RESERVE_CONTEXT * context, bool * did_extend);
STATIC_INLINE void disk_reserve_from_cache_vols (DB_VOLTYPE type, DISK_RESERVE_CONTEXT * context)
  __attribute__ ((ALWAYS_INLINE));
static int disk_extend (THREAD_ENTRY * thread_p, DISK_EXTEND_INFO * expand_info,
            DISK_RESERVE_CONTEXT * reserve_context);
static int disk_volume_expand (THREAD_ENTRY * thread_p, VOLID volid, DB_VOLTYPE voltype, DKNSECTS nsect_extend,
                   DKNSECTS * nsect_extended_out);
static int disk_add_volume (THREAD_ENTRY * thread_p, DBDEF_VOL_EXT_INFO * extinfo, VOLID * volid_out,
                DKNSECTS * nsects_free_out);
STATIC_INLINE void disk_reserve_from_cache_volume (VOLID volid, DISK_RESERVE_CONTEXT * context)
  __attribute__ ((ALWAYS_INLINE));
STATIC_INLINE void disk_cache_free_reserved (DISK_RESERVE_CONTEXT * context) __attribute__ ((ALWAYS_INLINE));
/* Unreserving sectors */
static int disk_unreserve_ordered_sectors_without_csect (THREAD_ENTRY * thread_p, DB_VOLPURPOSE purpose, int nsects,
                             VSID * vsids);
static int disk_unreserve_sectors_from_volume (THREAD_ENTRY * thread_p, VOLID volid, DISK_RESERVE_CONTEXT * context);
/* Has the DISK_STAB_UNIT_FUNC signature */
static int disk_stab_unit_unreserve (THREAD_ENTRY * thread_p, DISK_STAB_CURSOR * cursor, bool * stop, void *args);
/* Checking that sectors are reserved */
static DISK_ISVALID disk_check_sectors_are_reserved_in_volume (THREAD_ENTRY * thread_p, VOLID volid,
                                   DISK_RESERVE_CONTEXT * context);
/* Has the DISK_STAB_UNIT_FUNC signature */
static int disk_stab_unit_check_reserved (THREAD_ENTRY * thread_p, DISK_STAB_CURSOR * cursor, bool * stop, void *args);

/************************************************************************/
/* Other                                                                */
/************************************************************************/

/* Sector table setup; both are called by disk_format () */
static int disk_stab_init (THREAD_ENTRY * thread_p, DISK_VOLUME_HEADER * volheader);
STATIC_INLINE void disk_volume_header_set_stab (DB_VOLPURPOSE vol_purpose, DISK_VOLUME_HEADER * volheader)
  __attribute__ ((ALWAYS_INLINE));

/* NOTE(review): volid is declared INT16 here and VOLID in the definition; presumably the same type - confirm. */
static int disk_format (THREAD_ENTRY * thread_p, const char *dbname, INT16 volid, DBDEF_VOL_EXT_INFO * ext_info,
            DKNSECTS * nsect_free_out);

/*
 * Fixes the volume header page of volid and outputs the page and the header pointers. Debug builds take two extra
 * parameters, file and line. Call it through the disk_get_volheader macro, which appends ARG_FILE_LINE in debug
 * builds and maps directly to the function when NDEBUG is defined.
 */
STATIC_INLINE int disk_get_volheader_internal (THREAD_ENTRY * thread_p, VOLID volid, PGBUF_LATCH_MODE latch_mode,
                           PAGE_PTR * page_volheader_out, DISK_VOLUME_HEADER ** volheader_out
#if !defined (NDEBUG)
                           , const char *file, int line
#endif              /* !NDEBUG */
  ) __attribute__ ((ALWAYS_INLINE));
#if defined (NDEBUG)
#define disk_get_volheader disk_get_volheader_internal
#else /* !NDEBUG */
#define disk_get_volheader(...) disk_get_volheader_internal (__VA_ARGS__, ARG_FILE_LINE)
#endif /* !NDEBUG */

/* Disk cache life cycle and simple queries by volume identifier */
static int disk_cache_init (void);
static void disk_cache_final (void);
STATIC_INLINE bool disk_is_valid_volid (VOLID volid) __attribute__ ((ALWAYS_INLINE));
STATIC_INLINE DB_VOLPURPOSE disk_get_volpurpose (VOLID volid) __attribute__ ((ALWAYS_INLINE));
STATIC_INLINE DB_VOLTYPE disk_get_voltype (VOLID volid) __attribute__ ((ALWAYS_INLINE));
STATIC_INLINE bool disk_compatible_type_and_purpose (DB_VOLTYPE type, DB_VOLPURPOSE purpose)
  __attribute__ ((ALWAYS_INLINE));
STATIC_INLINE void disk_check_own_reserve_for_purpose (DB_VOLPURPOSE purpose) __attribute__ ((ALWAYS_INLINE));
static DISK_ISVALID disk_check_volume (THREAD_ENTRY * thread_p, INT16 volid, bool repair);

/************************************************************************/
/* End of static functions                                              */
/************************************************************************/

/************************************************************************/
/* Define functions.                                                    */
/************************************************************************/

/************************************************************************/
/* Disk volume manipulation                                             */
/************************************************************************/

/*
 * disk_format () - format new volume
 *
 * return               : error code
 * thread_p (in)        : thread entry
 * dbname (in)          : database name
 * volid (in)           : new volume identifier
 * ext_info (in)        : all extension info (name, comments, type, purpose, total/max sectors, write rate)
 * nsect_free_out (out) : output number of free sectors in new volume; written only when NO_ERROR is returned
 *
 * Note: The order of the steps matters for recovery and must be preserved:
 *         1. Validate the volume name length and the purpose.
 *         2. For permanent type volumes, append the logical undo RVDK_FORMAT. The log is then flushed.
 *         3. Create the volume file with fileio_format.
 *         4. Fix the volume header page as a new page and fill in the header.
 *         5. For permanent type volumes, log RVDK_NEWVOL and the first RVDK_FORMAT redo (offset -1).
 *         6. Initialize the sector table; link permanent type volumes other than LOG_DBFIRST_VOLID to the previous
 *            volume; log the second RVDK_FORMAT redo (offset 0) for permanent type volumes.
 *         7. For temporary purpose, mark the system pages with the temporary lsa; permanent type volumes are also
 *            reset through the io module.
 *         8. Output the free sectors, release the header page and flush the volume.
 *       If an error occurs after the file was created, the pages of the volume are invalidated. Temporary type
 *       volumes are also removed immediately with disk_unformat.
 */
static int
disk_format (THREAD_ENTRY * thread_p, const char *dbname, VOLID volid, DBDEF_VOL_EXT_INFO * ext_info,
             DKNSECTS * nsect_free_out)
{
#if !defined (NDEBUG)
  /* inject faults to test recovery */
#define fault_inject_random_crash() \
  if (vol_purpose == DB_PERMANENT_DATA_PURPOSE) FI_TEST (thread_p, FI_TEST_DISK_MANAGER_VOLUME_ADD, 0)
#else /* RELEASE */
#define fault_inject_random_crash()
#endif /* RELEASE */

  int vdes;                     /* Volume descriptor */
  DISK_VOLUME_HEADER *vhdr;     /* Pointer to volume header */
  VPID vpid;                    /* Volume and page identifiers */
  LOG_DATA_ADDR addr;           /* Address of logging data */
  const char *vol_fullname = ext_info->name;
  DKNSECTS max_npages = DISK_SECTS_NPAGES (ext_info->nsect_max);        /* a page count, despite the type name */
  int kbytes_to_be_written_per_sec = ext_info->max_writesize_in_sec;
  DISK_VOLPURPOSE vol_purpose = ext_info->purpose;
  DKNPAGES extend_npages = DISK_SECTS_NPAGES (ext_info->nsect_total);
  INT16 prev_volid;
  int error_code = NO_ERROR;

  assert ((int) sizeof (DISK_VOLUME_HEADER) <= DB_PAGESIZE);

  addr.vfid = NULL;

  if ((strlen (vol_fullname) + 1 > DB_MAX_PATH_LENGTH)
      || (DB_PAGESIZE < (PGLENGTH) (SSIZEOF (DISK_VOLUME_HEADER) + strlen (vol_fullname) + 1)))
    {
      /* The length goes through variadic arguments; pass it as int, the same way disk_set_creation does.
       * NOTE(review): four values follow an argument count of 3 and the first one is NULL (disk_set_creation does
       *               the same) - confirm against the message definition. */
      er_set (ER_FATAL_ERROR_SEVERITY, ARG_FILE_LINE, ER_BO_FULL_DATABASE_NAME_IS_TOO_LONG, 3, NULL, vol_fullname,
              (int) strlen (vol_fullname) + 1, DB_MAX_PATH_LENGTH);
      return ER_BO_FULL_DATABASE_NAME_IS_TOO_LONG;
    }

  /* make sure that this is a valid purpose */
  if (vol_purpose != DB_PERMANENT_DATA_PURPOSE && vol_purpose != DB_TEMPORARY_DATA_PURPOSE)
    {
      assert (false);
      er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_DISK_UNKNOWN_PURPOSE, 3, vol_purpose, DB_PERMANENT_DATA_PURPOSE,
              DB_TEMPORARY_DATA_PURPOSE);
      return ER_DISK_UNKNOWN_PURPOSE;
    }

  /* safe guard: permanent volumes with temporary data purpose must be maxed */
  assert (vol_purpose != DB_TEMPORARY_DATA_PURPOSE || ext_info->voltype != DB_PERMANENT_VOLTYPE
          || ext_info->nsect_total == ext_info->nsect_max);

  /* undo must be logical since we are going to remove the volume in the case of rollback (really a crash since we are
   * in a top operation) */
  addr.offset = (PGLENGTH) volid;
  addr.pgptr = NULL;
  if (ext_info->voltype == DB_PERMANENT_VOLTYPE)
    {
      log_append_undo_data (thread_p, RVDK_FORMAT, &addr, (int) strlen (vol_fullname) + 1, vol_fullname);
    }
  fault_inject_random_crash ();

  /* this log must be flushed. */
  logpb_force_flush_pages (thread_p);
  fault_inject_random_crash ();

  /* create and initialize the volume. recovery information is initialized in every page. */
  vdes = fileio_format (thread_p, dbname, vol_fullname, volid, extend_npages, vol_purpose == DB_PERMANENT_DATA_PURPOSE,
                        false, false, IO_PAGESIZE, kbytes_to_be_written_per_sec, false);
  if (vdes == NULL_VOLDES)
    {
      ASSERT_ERROR_AND_SET (error_code);
      return error_code;
    }
  /* from now on, if error occurs, we need to go to exit */
  fault_inject_random_crash ();

  /* initialize the volume header and the sector and page allocation tables */
  vpid.volid = volid;
  vpid.pageid = DISK_VOLHEADER_PAGE;

  /* lock the volume header in exclusive mode and then fetch the page. */
  addr.pgptr = pgbuf_fix (thread_p, &vpid, NEW_PAGE, PGBUF_LATCH_WRITE, PGBUF_UNCONDITIONAL_LATCH);
  if (addr.pgptr == NULL)
    {
      ASSERT_ERROR_AND_SET (error_code);
      goto exit;
    }
  (void) pgbuf_set_page_ptype (thread_p, addr.pgptr, PAGE_VOLHEADER);

  /* initialize the header */
  vhdr = (DISK_VOLUME_HEADER *) addr.pgptr;

  strncpy (vhdr->magic, CUBRID_MAGIC_DATABASE_VOLUME, CUBRID_MAGIC_MAX_LENGTH);
  vhdr->iopagesize = IO_PAGESIZE;
  vhdr->volid = volid;
  vhdr->purpose = vol_purpose;
  vhdr->type = ext_info->voltype;
  vhdr->sect_npgs = DISK_SECTOR_NPAGES;
  vhdr->nsect_total = ext_info->nsect_total;
  vhdr->nsect_max = ext_info->nsect_max;
  vhdr->db_charset = lang_charset ();
  vhdr->hint_allocsect = NULL_SECTID;
  vhdr->dummy1 = vhdr->dummy2 = 0;      /* alignment. You may use it. */
  vhdr->reserved0 = vhdr->reserved1 = vhdr->reserved2 = vhdr->reserved3 = 0;    /* for future extension */

  /* set sector table info in volume header */
  disk_volume_header_set_stab (vol_purpose, vhdr);
  if (vhdr->sys_lastpage >= extend_npages)
    {
      /* the system pages do not fit in the pages that were formatted */
      assert (false);
      er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_IO_FORMAT_BAD_NPAGES, 2, vol_fullname, extend_npages);
      error_code = ER_IO_FORMAT_BAD_NPAGES;
      goto exit;
    }

  error_code = log_get_db_start_parameters (&vhdr->db_creation, &vhdr->chkpt_lsa);
  if (error_code != NO_ERROR)
    {
      ASSERT_ERROR ();
      goto exit;
    }

  vhdr->vol_creation = time (NULL);

  /* Initialize the system heap file for booting purposes. This field is reset after the heap file is created by the
   * boot manager */

  vhdr->boot_hfid.vfid.volid = NULL_VOLID;
  vhdr->boot_hfid.vfid.fileid = NULL_PAGEID;
  vhdr->boot_hfid.hpgid = NULL_PAGEID;

  /* Initialize variable length fields */

  vhdr->next_volid = NULL_VOLID;
  vhdr->offset_to_vol_fullname = vhdr->offset_to_next_vol_fullname = vhdr->offset_to_vol_remarks = 0;
  vhdr->var_fields[vhdr->offset_to_vol_fullname] = '\0';
  error_code = disk_vhdr_set_vol_fullname (vhdr, vol_fullname);
  if (error_code != NO_ERROR)
    {
      ASSERT_ERROR ();
      goto exit;
    }

  error_code = disk_vhdr_set_next_vol_fullname (vhdr, NULL);
  if (error_code != NO_ERROR)
    {
      ASSERT_ERROR ();
      goto exit;
    }

  error_code = disk_vhdr_set_vol_remarks (vhdr, ext_info->comments);
  if (error_code != NO_ERROR)
    {
      ASSERT_ERROR ();
      goto exit;
    }

  /* Make sure that in the case of a crash, the volume is created. Otherwise, the recovery will not work */

  if (ext_info->voltype == DB_PERMANENT_VOLTYPE)
    {
      log_append_dboutside_redo (thread_p, RVDK_NEWVOL, disk_vhdr_get_vol_header_size (vhdr), vhdr);

      fault_inject_random_crash ();

      /* Even though the volume header page is not completed at this moment, to write REDO log for the header page is
       * crucial for redo recovery since disk_map_init and disk_set_link will write their redo logs. These functions
       * will access the header page during restart recovery. Another REDO log for RVDK_FORMAT will be written to
       * completely log the header page including the volume link.
       *
       * TODO: I think this log entry and comment are obsolete. In fact disk_stab_init and disk_set_link will not access
       *       this volume header page during recovery. However, I don't like to change this without a thorough
       *       examination, although it does bring a lot of pain. I will just use the addr.offset field to make each
       *       RVDK_FORMAT calls recovery detectable.
       */
      addr.offset = -1;         /* First call is marked with offset -1. */
      log_append_redo_data (thread_p, RVDK_FORMAT, &addr, disk_vhdr_get_vol_header_size (vhdr), vhdr);

      fault_inject_random_crash ();
    }

  /* Now initialize the sector and page allocator tables and link the volume to previous allocated volume */
  prev_volid = fileio_find_previous_perm_volume (thread_p, volid);
  error_code = disk_stab_init (thread_p, vhdr);
  if (error_code != NO_ERROR)
    {
      /* Problems setting the map allocation tables, release the header page, dismount and destroy the volume, and
       * return */
      ASSERT_ERROR ();
      goto exit;
    }
  fault_inject_random_crash ();

  if (ext_info->voltype == DB_PERMANENT_VOLTYPE && volid != LOG_DBFIRST_VOLID)
    {
      error_code = disk_set_link (thread_p, prev_volid, volid, vol_fullname, true, DISK_FLUSH);
      if (error_code != NO_ERROR)
        {
          ASSERT_ERROR ();
          goto exit;
        }
      fault_inject_random_crash ();
    }

  if (ext_info->voltype == DB_PERMANENT_VOLTYPE)
    {
      addr.offset = 0;          /* Header is located at position zero */
      log_append_redo_data (thread_p, RVDK_FORMAT, &addr, disk_vhdr_get_vol_header_size (vhdr), vhdr);

      fault_inject_random_crash ();
    }

  /* if this is a volume with temporary purposes, we do not log any disk driver related changes any longer.
   * indicate that by setting the disk pages to temporary lsa */
  if (vol_purpose == DB_TEMPORARY_DATA_PURPOSE)
    {
      /* todo: understand what this code is supposed to do */
      PAGE_PTR pgptr = NULL;    /* Page pointer */
      bool flushed;

      /* Flush the pages so that the log is forced */
      (void) pgbuf_flush_all (thread_p, volid);

      /* Flush dwb also */
      error_code = dwb_flush_force (thread_p, &flushed);
      if (error_code != NO_ERROR)
        {
          ASSERT_ERROR ();
          goto exit;
        }

      /* TODO: Does it hold??
       * assert (flushed == true);
       */

      /* mark every system page, from the volume header to sys_lastpage. pages that cannot be fixed are skipped. */
      for (vpid.volid = volid, vpid.pageid = DISK_VOLHEADER_PAGE; vpid.pageid <= vhdr->sys_lastpage; vpid.pageid++)
        {
          pgptr = pgbuf_fix (thread_p, &vpid, OLD_PAGE, PGBUF_LATCH_WRITE, PGBUF_UNCONDITIONAL_LATCH);
          if (pgptr != NULL)
            {
              pgbuf_set_lsa_as_temporary (thread_p, pgptr);
              pgbuf_unfix_and_init (thread_p, pgptr);
            }
        }
      if (ext_info->voltype == DB_PERMANENT_VOLTYPE)
        {
          /* Flush all dirty pages and then invalidate them from page buffer pool. So that we can reset the recovery
           * information directly using the io module */

          (void) pgbuf_invalidate_all (thread_p, volid);        /* Flush and invalidate */
          error_code = fileio_reset_volume (thread_p, vdes, vol_fullname, max_npages, &PGBUF_TEMP_LSA);
          if (error_code != NO_ERROR)
            {
              ASSERT_ERROR ();
              /* Problems resetting the pages of the permanent volume for temporary storage purposes... That is, with
               * a tempvol LSA. dismount and destroy the volume, and return */
              goto exit;
            }
        }
    }

  (void) disk_verify_volume_header (thread_p, addr.pgptr);

  /* free sectors: everything after the sector that holds the last system page */
  *nsect_free_out = vhdr->nsect_total - SECTOR_FROM_PAGEID (vhdr->sys_lastpage) - 1;
  /* NOTE(review): the exit path below unfixes addr.pgptr when it is not NULL, so this call presumably resets
   *               addr.pgptr after freeing the page - confirm in the page buffer header. */
  pgbuf_set_dirty_and_free (thread_p, addr.pgptr);

  fault_inject_random_crash ();

  /* Flush all pages that were formatted. This is not needed, but it is done for security reasons to identify the volume
   * in case of a system crash. Note that the identification may not be possible during media crashes */
  (void) pgbuf_flush_all (thread_p, volid);
  (void) dwb_synchronize (thread_p, vdes, vol_fullname);

  fault_inject_random_crash ();

  /* todo: temporary is not logged because code should avoid it. this complicated system that uses page buffer should
   * not be necessary. with the exception of file manager and disk manager, who already manage to skip logging on
   * temporary files, all other changes on temporary pages are really not logged. so why bother? */

exit:

  if (addr.pgptr != NULL)
    {
      pgbuf_unfix (thread_p, addr.pgptr);
    }

  if (error_code != NO_ERROR)
    {
      /* volume is going to be removed, so we should invalidate all its pages in page buffer */
      (void) pgbuf_invalidate_all (thread_p, volid);

      if (ext_info->voltype == DB_TEMPORARY_VOLTYPE)
        {
          /* since temporary volumes are not logged, we cannot rely on rollback. we need to remove volume immediately.
           */
          if (disk_unformat (thread_p, vol_fullname) != NO_ERROR)
            {
              assert (false);
            }
        }
    }

  return error_code;

#undef fault_inject_random_crash
}

/*
 * disk_unformat () - Destroy/unformat the volume with the given name
 *   return: NO_ERROR (always)
 *   thread_p(in): Thread entry
 *   vol_fullname(in): Full name of the volume to unformat
 *
 * Note: When fileio_find_volume_id_with_label resolves the name to a volume identifier, the pages of that volume are
 *       flushed and then invalidated in the page buffer before the volume is handed to fileio_unformat.
 */
int
disk_unformat (THREAD_ENTRY * thread_p, const char *vol_fullname)
{
  const INT16 known_volid = fileio_find_volume_id_with_label (thread_p, vol_fullname);

  if (known_volid != NULL_VOLID)
    {
      /* flush first, then drop the pages of this volume from the page buffer */
      (void) pgbuf_flush_all (thread_p, known_volid);
      (void) pgbuf_invalidate_all (thread_p, known_volid);
    }

  fileio_unformat (thread_p, vol_fullname);

  return NO_ERROR;
}

/*
 * disk_set_creation () - Change the database creation information stored in the header of the given volume
 *   return: NO_ERROR on success, ER_BO_FULL_DATABASE_NAME_IS_TOO_LONG when the new name does not fit, the error of
 *           disk_get_volheader when the header cannot be obtained, ER_FAILED for any other failure
 *   thread_p(in): Thread entry
 *   volid(in): Volume identifier
 *   new_vol_fullname(in): New volume label/name
 *   new_dbcreation(in): New database creation time
 *   new_chkptlsa(in): New checkpoint
 *   logchange(in): Whether or not to log the change
 *   flush(in): What to do with the header page once it is changed (flush, flush and invalidate, or only mark dirty)
 *
 * Note: This function is targeted for the log and recovery manager. It is used when a database is copied or renamed.
 */
int
disk_set_creation (THREAD_ENTRY * thread_p, INT16 volid, const char *new_vol_fullname, const INT64 * new_dbcreation,
                   const LOG_LSA * new_chkptlsa, bool logchange, DISK_FLUSH_TYPE flush)
{
  DISK_VOLUME_HEADER *vhdr = NULL;
  LOG_DATA_ADDR addr = LOG_DATA_ADDR_INITIALIZER;
  int new_name_size = (int) strlen (new_vol_fullname) + 1;      /* including the terminator */
  int error_code = NO_ERROR;

  if (new_name_size > DB_MAX_PATH_LENGTH)
    {
      er_set (ER_FATAL_ERROR_SEVERITY, ARG_FILE_LINE, ER_BO_FULL_DATABASE_NAME_IS_TOO_LONG, 3, NULL, new_vol_fullname,
              new_name_size, DB_MAX_PATH_LENGTH);
      return ER_BO_FULL_DATABASE_NAME_IS_TOO_LONG;
    }

  error_code = disk_get_volheader (thread_p, volid, PGBUF_LATCH_WRITE, &addr.pgptr, &vhdr);
  if (error_code != NO_ERROR)
    {
      ASSERT_ERROR ();
      return error_code;
    }

  if (!logchange)
    {
      /* caller asked for no logging */
      log_skip_logging (thread_p, &addr);
    }
  else
    {
      DISK_RECV_CHANGE_CREATION *undo_rec = NULL;
      DISK_RECV_CHANGE_CREATION *redo_rec = NULL;
      char *old_vol_fullname = disk_vhdr_get_vol_fullname (vhdr);
      int undo_size = sizeof (*undo_rec) + (int) strlen (old_vol_fullname);
      int redo_size = sizeof (*redo_rec) + (int) strlen (new_vol_fullname);

      undo_rec = (DISK_RECV_CHANGE_CREATION *) malloc (undo_size);
      if (undo_rec == NULL)
        {
          er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_OUT_OF_VIRTUAL_MEMORY, 1, undo_size);
          goto error;
        }

      redo_rec = (DISK_RECV_CHANGE_CREATION *) malloc (redo_size);
      if (redo_rec == NULL)
        {
          er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_OUT_OF_VIRTUAL_MEMORY, 1, redo_size);
          free_and_init (undo_rec);
          goto error;
        }

      /* undo image: what the header holds right now */
      memcpy (&undo_rec->db_creation, &vhdr->db_creation, sizeof (vhdr->db_creation));
      memcpy (&undo_rec->chkpt_lsa, &vhdr->chkpt_lsa, sizeof (vhdr->chkpt_lsa));
      (void) strcpy (undo_rec->vol_fullname, old_vol_fullname);

      /* redo image: what the caller wants the header to hold */
      memcpy (&redo_rec->db_creation, new_dbcreation, sizeof (*new_dbcreation));
      memcpy (&redo_rec->chkpt_lsa, new_chkptlsa, sizeof (*new_chkptlsa));
      (void) strcpy (redo_rec->vol_fullname, new_vol_fullname);

      log_append_undoredo_data (thread_p, RVDK_CHANGE_CREATION, &addr, undo_size, redo_size, undo_rec, redo_rec);

      free_and_init (undo_rec);
      free_and_init (redo_rec);
    }

  /*
   * 1. The values of vhdr->db_creation and new_dbcreation differ only when a new database volume is being created
   *    (ex: copydb). In that case the volume creation time is refreshed as well.
   * 2. The value of new_chkptlsa differs from vhdr->chkpt_lsa only when a new database is created
   *    or when the active log header's chkpt_lsa value is changed due to a call to the log_final() function.
   */
  if (vhdr->db_creation != *new_dbcreation)
    {
      memcpy (&vhdr->db_creation, new_dbcreation, sizeof (*new_dbcreation));
      vhdr->vol_creation = time (NULL);
    }

  if (!LSA_EQ (&vhdr->chkpt_lsa, new_chkptlsa))
    {
      memcpy (&vhdr->chkpt_lsa, new_chkptlsa, sizeof (*new_chkptlsa));
    }

  if (disk_vhdr_set_vol_fullname (vhdr, new_vol_fullname) != NO_ERROR)
    {
      goto error;
    }

  (void) disk_verify_volume_header (thread_p, addr.pgptr);

  switch (flush)
    {
    case DISK_FLUSH:
      /* this flush type is not expected here */
      assert (false);
      pgbuf_set_dirty (thread_p, addr.pgptr, DONT_FREE);
      pgbuf_flush (thread_p, addr.pgptr, FREE);
      break;

    case DISK_FLUSH_AND_INVALIDATE:
      pgbuf_set_dirty (thread_p, addr.pgptr, DONT_FREE);
      if (pgbuf_invalidate (thread_p, addr.pgptr) != NO_ERROR)
        {
          return ER_FAILED;
        }
      break;

    default:
      /* DISK_DONT_FLUSH */
      pgbuf_set_dirty (thread_p, addr.pgptr, FREE);
      break;
    }
  addr.pgptr = NULL;

  return NO_ERROR;

error:
  /* the header page is still fixed on every path that jumps here */
  assert (addr.pgptr != NULL);

  (void) disk_verify_volume_header (thread_p, addr.pgptr);

  pgbuf_unfix_and_init (thread_p, addr.pgptr);

  return ER_FAILED;
}

/*
 * disk_set_link () - Link the given permanent volume with the next permanent volume
 *   return: NO_ERROR on success, the error of disk_get_volheader when the header cannot be obtained, ER_FAILED for
 *           any other failure
 *   thread_p(in): Thread entry
 *   volid(in): Volume identifier (the volume whose header is changed)
 *   next_volid(in): Next volume identifier
 *   next_volext_fullname(in): Next volume label/name
 *   logchange(in): Whether or not to log the change
 *   flush(in): DISK_FLUSH_AND_INVALIDATE invalidates the header page after the change; any other value flushes it
 *
 * Note: No logging is intended for exclusive use by the log and recovery manager. It is used when a database
 *   is copied or renamed.
 */
int
disk_set_link (THREAD_ENTRY * thread_p, INT16 volid, INT16 next_volid, const char *next_volext_fullname, bool logchange,
               DISK_FLUSH_TYPE flush)
{
  DISK_VOLUME_HEADER *vhdr;
  LOG_DATA_ADDR addr = LOG_DATA_ADDR_INITIALIZER;
  VPID vpid_volheader;
  int error_code = NO_ERROR;

  addr.vfid = NULL;
  addr.offset = 0;

  /* Get the header of the volume that receives the link */
  error_code = disk_get_volheader (thread_p, volid, PGBUF_LATCH_WRITE, &addr.pgptr, &vhdr);
  if (error_code != NO_ERROR)
    {
      ASSERT_ERROR ();
      return error_code;
    }

  vpid_volheader.volid = volid;
  vpid_volheader.pageid = DISK_VOLHEADER_PAGE;

  if (!logchange)
    {
      /* caller asked for no logging */
      log_skip_logging (thread_p, &addr);
    }
  else
    {
      DISK_RECV_LINK_PERM_VOLUME *undo_rec = NULL;
      DISK_RECV_LINK_PERM_VOLUME *redo_rec = NULL;
      int undo_size;
      int redo_size;

      if (next_volid != NULL_VOLID)
        {
          /* Before updating and logging disk link to a new volume, we need to log a page flush on undo.
           * Recovery must flush disk link changes before removing volume from disk.
           */
          log_append_undo_data2 (thread_p, RVPGBUF_FLUSH_PAGE, NULL, NULL, 0, sizeof (vpid_volheader),
                                 &vpid_volheader);
        }

      undo_size = sizeof (*undo_rec) + (int) strlen (disk_vhdr_get_next_vol_fullname (vhdr));
      undo_rec = (DISK_RECV_LINK_PERM_VOLUME *) malloc (undo_size);
      if (undo_rec == NULL)
        {
          er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_OUT_OF_VIRTUAL_MEMORY, 1, undo_size);
          goto error;
        }

      redo_size = sizeof (*redo_rec) + (int) strlen (next_volext_fullname);
      redo_rec = (DISK_RECV_LINK_PERM_VOLUME *) malloc (redo_size);
      if (redo_rec == NULL)
        {
          er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_OUT_OF_VIRTUAL_MEMORY, 1, redo_size);
          free_and_init (undo_rec);
          goto error;
        }

      /* undo image: current link; redo image: requested link */
      undo_rec->next_volid = vhdr->next_volid;
      (void) strcpy (undo_rec->next_vol_fullname, disk_vhdr_get_next_vol_fullname (vhdr));

      redo_rec->next_volid = next_volid;
      (void) strcpy (redo_rec->next_vol_fullname, next_volext_fullname);

      log_append_undoredo_data (thread_p, RVDK_LINK_PERM_VOLEXT, &addr, undo_size, redo_size, undo_rec, redo_rec);

      free_and_init (undo_rec);
      free_and_init (redo_rec);
    }

  /* Modify the header */
  vhdr->next_volid = next_volid;
  if (disk_vhdr_set_next_vol_fullname (vhdr, next_volext_fullname) != NO_ERROR)
    {
      goto error;
    }

  /* Forcing the log here to be safer, especially in the case of permanent temp volumes. */
  logpb_force_flush_pages (thread_p);

  (void) disk_verify_volume_header (thread_p, addr.pgptr);

  pgbuf_set_dirty (thread_p, addr.pgptr, DONT_FREE);
  if (flush != DISK_FLUSH_AND_INVALIDATE)
    {
      pgbuf_flush (thread_p, addr.pgptr, FREE);
    }
  else if (pgbuf_invalidate (thread_p, addr.pgptr) != NO_ERROR)
    {
      /* pgbuf_invalidate is the call that would have invoked pgbuf_flush */
      return ER_FAILED;
    }
  addr.pgptr = NULL;

  return NO_ERROR;

error:
  /* the header page is still fixed on every path that jumps here */
  assert (addr.pgptr != NULL);

  (void) disk_verify_volume_header (thread_p, addr.pgptr);

  pgbuf_unfix_and_init (thread_p, addr.pgptr);

  return ER_FAILED;
}

/*
 * disk_set_boot_hfid () - Reset system boot heap
 *   return: NO_ERROR, or ER_FAILED when the volume header page cannot be fixed
 *   thread_p(in): Thread entry
 *   volid(in): Permanent volume identifier
 *   hfid(in): System boot heap file
 *
 * Note: The system boot heap file field in the volume header is redefined to point to the given value. This function
 *   is called only during the initialization process.
 */
int
disk_set_boot_hfid (THREAD_ENTRY * thread_p, INT16 volid, const HFID * hfid)
{
  DISK_VOLUME_HEADER *vhdr;
  VPID vpid;
  LOG_DATA_ADDR addr;

  addr.vfid = NULL;
  addr.offset = 0;

  vpid.volid = volid;
  vpid.pageid = DISK_VOLHEADER_PAGE;

  addr.pgptr = pgbuf_fix (thread_p, &vpid, OLD_PAGE, PGBUF_LATCH_WRITE, PGBUF_UNCONDITIONAL_LATCH);
  if (addr.pgptr == NULL)
    {
      return ER_FAILED;
    }

  (void) disk_verify_volume_header (thread_p, addr.pgptr);

  vhdr = (DISK_VOLUME_HEADER *) addr.pgptr;

  /* Undo image is the current boot HFID, redo image is the new one. The redo pointer must be hfid itself: passing the
   * address of the local pointer variable would log pointer bytes (and read sizeof (*hfid) bytes from a pointer-sized
   * object), while disk_rv_undoredo_set_boot_hfid interprets the logged data as an HFID. */
  log_append_undoredo_data (thread_p, RVDK_RESET_BOOT_HFID, &addr, sizeof (vhdr->boot_hfid), sizeof (*hfid),
                            &vhdr->boot_hfid, hfid);
  HFID_COPY (&(vhdr->boot_hfid), hfid);

  (void) disk_verify_volume_header (thread_p, addr.pgptr);

  /* mark the header dirty and release it in one step */
  pgbuf_set_dirty (thread_p, addr.pgptr, FREE);
  addr.pgptr = NULL;

  return NO_ERROR;
}

/*
 * disk_get_boot_hfid () - Find the system boot heap file
 *   return: hfid on success; NULL when the volume header cannot be obtained or the stored boot HFID is null
 *   thread_p(in): Thread entry
 *   volid(in): Permanent volume identifier
 *   hfid(out): System boot heap file (copied from the volume header)
 */
HFID *
disk_get_boot_hfid (THREAD_ENTRY * thread_p, INT16 volid, HFID * hfid)
{
  DISK_VOLUME_HEADER *volheader;
  PAGE_PTR page_volheader = NULL;
  int error_code;

  error_code = disk_get_volheader (thread_p, volid, PGBUF_LATCH_READ, &page_volheader, &volheader);
  if (error_code != NO_ERROR)
    {
      ASSERT_ERROR ();
      return NULL;
    }

  /* copy out while the header page is still fixed */
  HFID_COPY (hfid, &(volheader->boot_hfid));

  (void) disk_verify_volume_header (thread_p, page_volheader);
  pgbuf_unfix_and_init (thread_p, page_volheader);

  return HFID_IS_NULL (hfid) ? NULL : hfid;
}

/*
 * disk_get_link () - Find the name of the next permanent volume extension
 *   return: next_volext_fullname or NULL in case of error
 *   thread_p(in): Thread entry
 *   volid(in): Volume identifier
 *   next_volid(out): Next volume identifier
 *   next_volext_fullname(out): Next volume extension; DB_MAX_PATH_LENGTH bytes are written to this buffer and the
 *                              result is always null-terminated
 *
 * Note: If there is none, next_volext_fullname is set to null string
 */
char *
disk_get_link (THREAD_ENTRY * thread_p, INT16 volid, INT16 * next_volid, char *next_volext_fullname)
{
  DISK_VOLUME_HEADER *vhdr;
  PAGE_PTR pgptr = NULL;

  assert (next_volid != NULL);
  assert (next_volext_fullname != NULL);

  if (disk_get_volheader (thread_p, volid, PGBUF_LATCH_READ, &pgptr, &vhdr) != NO_ERROR)
    {
      ASSERT_ERROR ();
      return NULL;
    }

  *next_volid = vhdr->next_volid;

  /* strncpy writes exactly DB_MAX_PATH_LENGTH bytes, but leaves the destination unterminated when the source is that
   * long or longer. Terminate explicitly, inside the area strncpy has just written. */
  strncpy (next_volext_fullname, disk_vhdr_get_next_vol_fullname (vhdr), DB_MAX_PATH_LENGTH);
  next_volext_fullname[DB_MAX_PATH_LENGTH - 1] = '\0';

  (void) disk_verify_volume_header (thread_p, pgptr);

  pgbuf_unfix_and_init (thread_p, pgptr);

  return next_volext_fullname;
}


/*
 * disk_rv_redo_dboutside_newvol () - Redo the initialization of a disk from the point of view of operating system
 *   return: NO_ERROR (always; the results of formatting and invalidating are ignored)
 *   thread_p(in): Thread entry
 *   rcv(in): Recovery structure; rcv->data is read as a volume header
 */
int
disk_rv_redo_dboutside_newvol (THREAD_ENTRY * thread_p, LOG_RCV * rcv)
{
  DISK_VOLUME_HEADER *logged_header = (DISK_VOLUME_HEADER *) rcv->data;
  char *vol_fullname = disk_vhdr_get_vol_fullname (logged_header);

  if (fileio_find_volume_descriptor_with_label (vol_fullname) != NULL_VOLDES)
    {
      /* a descriptor is already found for this label; there is nothing to redo */
      return NO_ERROR;
    }

  (void) fileio_format (thread_p, NULL, vol_fullname, logged_header->volid,
                        DISK_SECTS_NPAGES (logged_header->nsect_total),
                        logged_header->purpose != DB_TEMPORARY_DATA_PURPOSE, false, false, IO_PAGESIZE, 0, false);
  (void) pgbuf_invalidate_all (thread_p, logged_header->volid);

  return NO_ERROR;
}

/*
 * disk_rv_undo_format () - Undo the initialization of a disk. The disk is uninitialized or removed
 *   return: the result of disk_unformat
 *   thread_p(in): Thread entry
 *   rcv(in): Recovery structure; rcv->offset is read as the volume identifier and rcv->data as the full name of the
 *            volume to unformat
 */
int
disk_rv_undo_format (THREAD_ENTRY * thread_p, LOG_RCV * rcv)
{
  VOLID volid = (VOLID) rcv->offset;
  int ret = NO_ERROR;

  /* we need to undo volume create. this is logged at the start of disk_format, so we may be in one of next situations:
   *   1. not recovery, an error must have occurred during disk_add_volume execution. it was not yet added to cache,
   *      so all this has to do is unformat (the number of volumes will be decremented by disk_add_volume).
   *      condition: LOG_ISRESTARTED () == true
   *   2. recovery, but crash occurred before volume is registered in system. volume is not loaded to cache, so all
   *      this has to do is unformat.
   *      condition: LOG_ISRESTARTED () == false AND volid >= disk_Cache->nvols_perm
   *   3. recovery, and crash occurred after volume was registered in system, but before completing the action. the
   *      volume was loaded in cache during restart. we need to remove it completely from cache. volid is last in cache
   *      in this case.
   *      condition: LOG_ISRESTARTED () == false AND volid == disk_Cache->nvols_perm - 1
   */

  if (!LOG_ISRESTARTED () && volid == disk_Cache->nvols_perm - 1)
    {
      VPID vpid_volheader;
      PAGE_PTR page_volheader = NULL;
      /* total/max stay 0 unless the volume header page can be read below */
      DKNSECTS total = 0, max = 0;
      DKNSECTS free = 0;

      /* case 3. remove volume from disk cache completely. */

      er_stack_push ();

      /* we need page to remove total/max from disk cache. however, it must be allocated. */
      vpid_volheader.volid = volid;
      vpid_volheader.pageid = DISK_VOLHEADER_PAGE;
      ret = pgbuf_fix_if_not_deallocated (thread_p, &vpid_volheader, PGBUF_LATCH_READ, PGBUF_UNCONDITIONAL_LATCH,
                      &page_volheader);
      if (ret != NO_ERROR)
    {
      /* what is this error?? */
      assert_release (false);
    }
      else if (page_volheader == NULL)
    {
      /* volume was not created. fall through. */
      assert (disk_Cache->vols[volid].nsect_free == 0);
    }
      else
    {
      /* get total/max from header */
      DISK_VOLUME_HEADER *volheader = (DISK_VOLUME_HEADER *) page_volheader;
      total = volheader->nsect_total;
      max = volheader->nsect_max;

      pgbuf_unfix (thread_p, page_volheader);
    }

      er_stack_pop ();

      /* volume was added to cache. now remove it */
      /* keep the free count only for the log message at the end of this branch */
      free = disk_Cache->vols[volid].nsect_free;
      disk_cache_lock_reserve_for_purpose (disk_Cache->vols[volid].purpose);
      disk_cache_update_vol_free (volid, -disk_Cache->vols[volid].nsect_free);
      disk_cache_unlock_reserve_for_purpose (disk_Cache->vols[volid].purpose);

      assert (disk_Cache->vols[volid].nsect_free == 0);
      disk_Cache->nvols_perm--;
      disk_Cache->vols[volid].purpose = DISK_UNKNOWN_PURPOSE;

      /* take the sectors read from the header out of the permanent-purpose totals */
      disk_Cache->perm_purpose_info.extend_info.nsect_total -= total;
      disk_Cache->perm_purpose_info.extend_info.nsect_max -= max;

      if (disk_Cache->perm_purpose_info.extend_info.volid_extend == volid)
    {
      /* no longer true */
      disk_Cache->perm_purpose_info.extend_info.volid_extend = NULL_VOLID;
    }

      disk_log ("disk_rv_undo_format", "remove volume %d from cache (free = %d, total = %d, max = %d).",
        volid, free, total, max);
    }
  else
    {
      /* case 1 or 2. disk cache is not updated here. */
      assert (disk_Cache->nvols_perm <= volid || (LOG_ISRESTARTED () && (disk_Cache->nvols_perm - 1 == volid)));
      assert (disk_Cache->vols[volid].nsect_free == 0);
      assert (disk_Cache->perm_purpose_info.extend_info.volid_extend != volid);

      /* make sure purpose is reset */
      disk_Cache->vols[volid].purpose = DISK_UNKNOWN_PURPOSE;

      disk_log ("disk_rv_undo_format", "remove volume %d", volid);
    }

  /* in all cases the volume itself is unformatted; this overwrites any value stored in ret above */
  ret = disk_unformat (thread_p, (char *) rcv->data);
  /* we need to remove from volume info file too... the only way is to recreate it. */
  (void) logpb_recreate_volume_info (thread_p);
  log_append_dboutside_redo (thread_p, RVLOG_OUTSIDE_LOGICAL_REDO_NOOP, 0, NULL);

  return ret;
}

/*
 * disk_rv_redo_format () - Redo the initialization of a disk.
 *   return: NO_ERROR, or the error returned by disk_stab_iterate_units_all
 *   thread_p(in): Thread entry
 *   rcv(in): Recovery structure; rcv->offset == -1 on entry marks the first of the two redo calls. rcv->offset is
 *            reset to 0 before the logged data is copied into the page.
 */
int
disk_rv_redo_format (THREAD_ENTRY * thread_p, LOG_RCV * rcv)
{
  int error_code = NO_ERROR;
  DISK_VOLUME_HEADER *volheader;
  /* must be read before rcv->offset is overwritten below */
  bool is_first_call = rcv->offset == -1;
  DKNSECTS nsect_free = 0;
  DKNSECTS nsect_diff;

  rcv->offset = 0;
  (void) pgbuf_set_page_ptype (thread_p, rcv->pgptr, PAGE_VOLHEADER);
  error_code = log_rv_copy_char (thread_p, rcv);
  assert (error_code == NO_ERROR);

  disk_verify_volume_header (thread_p, rcv->pgptr);
  volheader = (DISK_VOLUME_HEADER *) rcv->pgptr;

  if (is_first_call)
    {
      /* don't update disk cache. if the volume is successfully created, another disk_rv_redo_format call will follow,
       * and we can update disk cache then. */

      disk_log ("disk_rv_redo_format", "first call for volume %d at lsa %lld|%d", volheader->volid,
        PGBUF_PAGE_LSA_AS_ARGS (rcv->pgptr));
      return NO_ERROR;
    }

  /* this is the second call of disk_rv_redo_format. we need to update disk cache too.
   * you may be tempted to think that this is a new volume and that all of it must be empty. oh well, even though you
   * are right most of the time, we can have this beautiful case:
   * 1. volume is successfully created.
   * 2. some sectors are reserved.
   * 3. some (or all) sector table pages are flushed to disk, but not the volume header page.
   * 4. other sectors are reserved.
   * 5. server crashes.
   *
   * while we are here, the sectors reserved in step #2 are already found in sector table pages, so the assumption that
   * whole volume is empty is wrong. we need to count the sectors to make sure we are not wrong. */

  if (disk_Cache->nvols_perm == volheader->volid)
    {
      /* add to disk cache */
      /* the free count starts at 0 and is corrected by the sector count below */
      disk_Cache->nvols_perm++;
      disk_Cache->vols[volheader->volid].purpose = volheader->purpose;
      disk_Cache->vols[volheader->volid].nsect_free = 0;

      disk_Cache->perm_purpose_info.extend_info.nsect_total += volheader->nsect_total;
      disk_Cache->perm_purpose_info.extend_info.nsect_max += volheader->nsect_max;
    }

  /* fix cache... */
  /* count the free sectors from the sector table and compare with what the cache currently holds for this volume */
  error_code = disk_stab_iterate_units_all (thread_p, volheader, PGBUF_LATCH_READ, disk_stab_count_free, &nsect_free);
  if (error_code != NO_ERROR)
    {
      assert_release (false);
      return error_code;
    }
  nsect_diff = nsect_free - disk_Cache->vols[volheader->volid].nsect_free;
  if (nsect_diff != 0)
    {
      disk_cache_lock_reserve_for_purpose (volheader->purpose);
      disk_cache_update_vol_free (volheader->volid, nsect_diff);
      disk_cache_unlock_reserve_for_purpose (volheader->purpose);

      /* there, I fixed it */
    }

  disk_log ("disk_rv_redo_format", "second call for volume %d at lsa %lld|%d, free = %d, total = %d, max = %d",
        volheader->volid, PGBUF_PAGE_LSA_AS_ARGS (rcv->pgptr), nsect_free, volheader->nsect_total,
        volheader->nsect_max);

  return error_code;
}

/*
 * disk_rv_dump_hdr () - Dump recovery header information.
 *   return: void
 *   fp(in): Output stream
 *   length_ignore(in): Length of Recovery Data (not used)
 *   data(in): The data being logged; read as a volume header
 */
void
disk_rv_dump_hdr (FILE * fp, int length_ignore, void *data)
{
  disk_vhdr_dump (fp, (DISK_VOLUME_HEADER *) data);
}

/*
 * disk_rv_redo_init_map () - REDO the initialization of map table page.
 *   return: NO_ERROR
 *   thread_p(in): Thread entry
 *   rcv(in): Recovery structure; rcv->data is read as the number of sectors whose bits must be set in this page
 */
int
disk_rv_redo_init_map (THREAD_ENTRY * thread_p, LOG_RCV * rcv)
{
  DISK_STAB_UNIT *unit_cursor;
  DKNSECTS nsects_left;

  (void) pgbuf_set_page_ptype (thread_p, rcv->pgptr, PAGE_VOLBITMAP);

  nsects_left = *(DKNSECTS *) rcv->data;

  /* start from an all-zero page */
  memset (rcv->pgptr, 0, DB_PAGESIZE);

  /* fill whole units first, one DISK_STAB_UNIT at a time */
  unit_cursor = (DISK_STAB_UNIT *) rcv->pgptr;
  while (nsects_left >= DISK_STAB_UNIT_BIT_COUNT)
    {
      *unit_cursor = BIT64_FULL;
      unit_cursor++;
      nsects_left -= DISK_STAB_UNIT_BIT_COUNT;
    }

  /* then the bits of the last, partially used unit */
  if (nsects_left > 0)
    {
      *unit_cursor = bit64_set_trailing_bits (0, nsects_left);
    }

  pgbuf_set_dirty (thread_p, rcv->pgptr, DONT_FREE);
  return NO_ERROR;
}

/*
 * disk_rv_dump_init_map () - Dump redo information to initialize a map table page
 *   return: void
 *   length_ignore(in): Length of Recovery Data
 *   data(in): The data being logged
 */
void
disk_rv_dump_init_map (FILE * fp, int length_ignore, void *data)
{
  fprintf (fp, "Nalloc_bits = %d\n", *(INT32 *) data);
}

/*
 * disk_rv_undoredo_set_creation_time () - Recover the modification of change creation stuff
 *   return: NO_ERROR (always)
 *   thread_p(in): Thread entry
 *   rcv(in): Recovery structure; rcv->pgptr is the volume header page and rcv->data the logged creation information
 */
int
disk_rv_undoredo_set_creation_time (THREAD_ENTRY * thread_p, LOG_RCV * rcv)
{
  DISK_VOLUME_HEADER *volheader = (DISK_VOLUME_HEADER *) rcv->pgptr;
  DISK_RECV_CHANGE_CREATION *logged = (DISK_RECV_CHANGE_CREATION *) rcv->data;

  memcpy (&volheader->db_creation, &logged->db_creation, sizeof (logged->db_creation));
  memcpy (&volheader->chkpt_lsa, &logged->chkpt_lsa, sizeof (logged->chkpt_lsa));

  /* NOTE(review): a failure to set the name is not reported to the caller - confirm this is intended */
  (void) disk_vhdr_set_vol_fullname (volheader, logged->vol_fullname);

  pgbuf_set_dirty (thread_p, rcv->pgptr, DONT_FREE);

  return NO_ERROR;
}

/*
 * disk_rv_dump_set_creation_time () - Dump either redo or undo change creation information
 *   return: void
 *   fp(in): Output stream
 *   length_ignore(in): Length of Recovery Data (not used)
 *   data(in): The data being logged; read as a DISK_RECV_CHANGE_CREATION
 */
void
disk_rv_dump_set_creation_time (FILE * fp, int length_ignore, void *data)
{
  const DISK_RECV_CHANGE_CREATION *logged = (const DISK_RECV_CHANGE_CREATION *) data;
  const long long db_creation = (long long) logged->db_creation;
  const long long chkpt_pageid = (long long) logged->chkpt_lsa.pageid;
  const long long chkpt_offset = (long long) logged->chkpt_lsa.offset;

  fprintf (fp, "Label = %s, Db_creation = %lld, chkpt = %lld|%lld\n", logged->vol_fullname, db_creation, chkpt_pageid,
           chkpt_offset);
}

/*
 * disk_rv_undoredo_link () - Recover the link of a volume extension
 *   return: NO_ERROR (always)
 *   thread_p(in): Thread entry
 *   rcv(in): Recovery structure; rcv->pgptr is the volume header page and rcv->data the logged link information
 */
int
disk_rv_undoredo_link (THREAD_ENTRY * thread_p, LOG_RCV * rcv)
{
  DISK_VOLUME_HEADER *volheader = (DISK_VOLUME_HEADER *) rcv->pgptr;
  DISK_RECV_LINK_PERM_VOLUME *logged = (DISK_RECV_LINK_PERM_VOLUME *) rcv->data;

  volheader->next_volid = logged->next_volid;

  /* NOTE(review): a failure to set the name is not reported to the caller - confirm this is intended */
  (void) disk_vhdr_set_next_vol_fullname (volheader, logged->next_vol_fullname);

  pgbuf_set_dirty (thread_p, rcv->pgptr, DONT_FREE);

  return NO_ERROR;
}

/*
 * disk_rv_dump_link () - Dump either redo or undo link of a volume extension
 *   return: void
 *   fp(in): Output stream
 *   length_ignore(in): Length of Recovery Data (not used)
 *   data(in): The data being logged; read as a DISK_RECV_LINK_PERM_VOLUME
 */
void
disk_rv_dump_link (FILE * fp, int length_ignore, void *data)
{
  const DISK_RECV_LINK_PERM_VOLUME *logged = (const DISK_RECV_LINK_PERM_VOLUME *) data;

  fprintf (fp, "Next_Volid = %d, Next_Volextension = %s\n", logged->next_volid, logged->next_vol_fullname);
}

/*
 * disk_rv_undoredo_set_boot_hfid () - Recover the reset of boot system heap
 *   return: NO_ERROR
 *   thread_p(in): Thread entry
 *   rcv(in): Recovery structure; rcv->pgptr is the volume header page and rcv->data is read as an HFID
 */
int
disk_rv_undoredo_set_boot_hfid (THREAD_ENTRY * thread_p, LOG_RCV * rcv)
{
  const HFID *logged_hfid = (const HFID *) rcv->data;
  HFID *boot_hfid = &((DISK_VOLUME_HEADER *) rcv->pgptr)->boot_hfid;

  /* copy the logged value into the header, field by field */
  boot_hfid->vfid.volid = logged_hfid->vfid.volid;
  boot_hfid->vfid.fileid = logged_hfid->vfid.fileid;
  boot_hfid->hpgid = logged_hfid->hpgid;

  pgbuf_set_dirty (thread_p, rcv->pgptr, DONT_FREE);

  return NO_ERROR;
}

/*
 * disk_rv_dump_set_boot_hfid () - Dump either redo/undo reset of boot system heap
 *   return: void
 *   fp(in): Output stream
 *   length_ignore(in): Length of Recovery Data (not used)
 *   data(in): The data being logged; read as an HFID
 */
void
disk_rv_dump_set_boot_hfid (FILE * fp, int length_ignore, void *data)
{
  const HFID *logged_hfid = (const HFID *) data;

  fprintf (fp, "Heap: Volid = %d, Fileid = %d, Header_pageid = %d\n", logged_hfid->vfid.volid,
           logged_hfid->vfid.fileid, logged_hfid->hpgid);
}

/*
 * disk_rv_redo_volume_expand () - Redo the expansion of a volume
 *
 *   return: the result of fileio_expand_to
 *   thread_p(in): Thread entry
 *   rcv(in): Recovery structure; rcv->data is read as a DISK_RECV_DATA_VOLUME_EXPAND (volume identifier and size in
 *            pages)
 */
int
disk_rv_redo_volume_expand (THREAD_ENTRY * thread_p, LOG_RCV * rcv)
{
  const DISK_RECV_DATA_VOLUME_EXPAND *expand = (const DISK_RECV_DATA_VOLUME_EXPAND *) rcv->data;

  assert (rcv->length == sizeof (*expand));

  return fileio_expand_to (thread_p, expand->volid, expand->npages, DB_PERMANENT_VOLTYPE);
}

/*
 * disk_rv_dump_volume_expand () - Dump the redo information of a volume expansion
 *
 *   return: void
 *   fp(in): Output stream
 *   length_ignore(in): Length of Recovery Data (not used)
 *   data(in): The data being logged; read as a DISK_RECV_DATA_VOLUME_EXPAND
 */
void
disk_rv_dump_volume_expand (FILE * fp, int length_ignore, void *data)
{
  const DISK_RECV_DATA_VOLUME_EXPAND *expand = (const DISK_RECV_DATA_VOLUME_EXPAND *) data;

  fprintf (fp, "Volid = %d, volume new size in pages = %d\n", expand->volid, expand->npages);
}

/*
 * disk_extend () - expand disk storage by extending existing volumes or by adding new volumes.
 *
 * return               : error code
 * thread_p (in)        : thread entry
 * extend_info (in)     : disk extend info
 * reserve_context (in) : reserve context (can be NULL)
 */
static int
disk_extend (THREAD_ENTRY * thread_p, DISK_EXTEND_INFO * extend_info, DISK_RESERVE_CONTEXT * reserve_context)
{
#if defined (SERVER_MODE)
#define DISK_EXTEND_REGISTER() \
  do \
    { \
      tsc_getticks (&end_tick); \
      tsc_elapsed_time_usec (&tv_diff, end_tick, start_tick); \
      TSC_ADD_TIMEVAL (thread_p->event_stats.extend_time, tv_diff); \
      thread_p->event_stats.extend_pages += DISK_SECTS_NPAGES (nsect_extended); \
      if (thread_p->m_px_orig_thread_entry) \
      { \
    THREAD_ENTRY *main_thread_p = thread_get_main_thread (thread_p); \
        main_thread_p->event_stats.extend_pages += DISK_SECTS_NPAGES (nsect_extended); \
      } \
    } \
  while (0)
#define DISK_EXTEND_COLLECT(nsects) \
  do \
    { \
      nsect_extended += (nsects); \
    } \
  while (0)
#endif /* SERVER_MODE */

  DKNSECTS free = extend_info->nsect_free;
  DKNSECTS intention = extend_info->nsect_intention;
  DKNSECTS total = extend_info->nsect_total;
  DKNSECTS max = extend_info->nsect_max;
  DB_VOLTYPE voltype = extend_info->voltype;

  DKNSECTS nsect_extend;
  DKNSECTS target_free;

  DBDEF_VOL_EXT_INFO volext;
  VOLID volid_new = NULL_VOLID;

  DKNSECTS nsect_free_new = 0;

#if defined (SERVER_MODE)
  TSC_TICKS start_tick, end_tick;
  TSCTIMEVAL tv_diff;
  DKNSECTS nsect_extended = 0;
#endif /* SERVER_MODE */

  bool check_interrupt = logtb_get_check_interrupt (thread_p);
  bool continue_check = false;
  int error_code = NO_ERROR;

  /* How this works:
   * There can be only one expand running for permanent volumes and one for temporary volumes. Any expander must first
   * lock expand mutex.
   *
   * We want to avoid concurrent expansions as much as possible, therefore on each expansion we try to allocate more
   * disk space than currently necessary. Moreover, there is an auto-expansion thread that tries to keep a stable level
   * of free space. As long as the worker requirements do not spike, they would never have to do the disk expansion.
   *
   * However, we cannot rule out spikes in disk space requirements. We cannot even rule out concurrent spikes. Intention
   * field saves all sector requests that could not be satisfied with existing free space. Therefore, one expand
   * iteration can serve all expansion needs.
   *
   * This being said, now let's get to how expand works.
   *
   * First we decide how much to expand. we set the target_free to MAX (1% current size, min threshold). Then we subtract
   * the current free space. If the difference is negative, we set it to 0.
   * Then we add the reserve intentions that could not be satisfied by existing disk space.
   *
   * Once we decide how much we want to expand, we first extend the last volume, if it has not already been extended to
   * its maximum capacity. If the last volume is at its maximum capacity and more space is still needed, we start
   * adding new volumes.
   *
   * NOTE: The same algorithm is applied to both permanent and temporary files. More exactly, the expansion is allowed
   *       for permanent volumes used for permanent data purpose or temporary files used for temporary data purpose.
   *       Permanent volumes for temporary data purpose can only be added by user and are never extended.
   */

  assert (disk_Cache->owner_extend == thread_get_entry_index (thread_p));

  /* expand */
  /* what is the desired remaining free after expand? */
  target_free = MAX ((DKNSECTS) (total * 0.01), DISK_MIN_VOLUME_SECTS);
  /* what is the desired expansion? do not expand less than intention. */
  nsect_extend = MAX (target_free - free, 0) + intention;
  if (nsect_extend <= 0)
    {
      /* no expand needed */
      return NO_ERROR;
    }

  disk_log ("disk_extend", "extend %s disk by %d sectors.", disk_type_to_string (extend_info->voltype), nsect_extend);

#if defined (SERVER_MODE)
  tsc_getticks (&start_tick);
#endif

  if (total < max)
    {
      /* first expand last volume to its capacity */
      DKNSECTS to_expand;

      assert (extend_info->volid_extend != NULL_VOLID);

      to_expand = MIN (nsect_extend, max - total);

#if defined (SERVER_MODE)
      disk_log_extend_elapsed (thread_p, "DISK_EXTEND", fileio_get_volume_label (extend_info->volid_extend, PEEK),
                   extend_info->voltype, "volume extension started");
#endif

      log_sysop_start (thread_p);

      (void) logtb_set_check_interrupt (thread_p, false);
      error_code = disk_volume_expand (thread_p, extend_info->volid_extend, voltype, to_expand, &nsect_free_new);
      (void) logtb_set_check_interrupt (thread_p, check_interrupt);
      if (error_code != NO_ERROR)
    {
      ASSERT_ERROR ();
      log_sysop_abort (thread_p);
      return error_code;
    }

      log_sysop_commit (thread_p);
      assert (nsect_free_new >= to_expand);

      if (extend_info->nsect_total == extend_info->nsect_max)
    {
      /* this cannot be extended any longer. make sure it's not going to be used for that. */
      extend_info->volid_extend = NULL_VOLID;
    }

      disk_log ("disk_extend", "expanded volume %d by %d sectors for %s.", extend_info->volid_extend, nsect_free_new,
        disk_type_to_string (extend_info->voltype));

#if defined (SERVER_MODE)
      disk_log_extend_elapsed (thread_p, "DISK_EXTEND", fileio_get_volume_label (extend_info->volid_extend, PEEK),
                   extend_info->voltype, "volume extension completed");
#endif

      /* subtract from what we need to expand */
      nsect_extend -= nsect_free_new;

      /* no one else modifies total, it is protected by expand mutex */
      extend_info->nsect_total += nsect_free_new;

      disk_cache_lock_reserve (extend_info);
      disk_cache_update_vol_free (extend_info->volid_extend, nsect_free_new);

      if (reserve_context != NULL && reserve_context->n_cache_reserve_remaining > 0)
    {
      disk_reserve_from_cache_volume (extend_info->volid_extend, reserve_context);
    }
      disk_cache_unlock_reserve (extend_info);

#if defined (SERVER_MODE)
      DISK_EXTEND_COLLECT (nsect_free_new);
#endif /* SERVER_MODE */

      if (nsect_extend <= 0)
    {
      /* it is enough */
#if defined (SERVER_MODE)
      DISK_EXTEND_REGISTER ();
#endif /* SERVER_MODE */
      return NO_ERROR;
    }
      assert (extend_info->nsect_total == extend_info->nsect_max);
    }
  assert (nsect_extend > 0);

  /* add new volume(s) */
  volext.nsect_max = extend_info->nsect_vol_max;
  volext.comments = "Automatic Volume Extension";
  volext.voltype = voltype;
  volext.purpose = voltype == DB_PERMANENT_VOLTYPE ? DB_PERMANENT_DATA_PURPOSE : DB_TEMPORARY_DATA_PURPOSE;
  volext.overwrite = false;
  volext.max_writesize_in_sec = 0;
  while (nsect_extend > 0)
    {
      volext.path = NULL;
      volext.name = NULL;

      if (check_interrupt)
    {
      if (logtb_is_interrupted (thread_p, true, &continue_check))
        {
          er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_INTERRUPTED, 0);
          return ER_INTERRUPTED;
        }
    }

      /* set size to remaining sectors */
      volext.nsect_total = nsect_extend + DISK_SYS_NSECT_SIZE (volext.nsect_max);
      /* but size cannot exceed max */
      volext.nsect_total = MIN (volext.nsect_max, volext.nsect_total);
      /* and it cannot be lower than a minimum size */
      volext.nsect_total = MAX (volext.nsect_total, DISK_MIN_VOLUME_SECTS);
      /* we always keep rounded number of sectors */
      volext.nsect_total = DISK_SECTS_ROUND_UP (volext.nsect_total);

      /* add new volume */
      (void) logtb_set_check_interrupt (thread_p, false);
      error_code = disk_add_volume (thread_p, &volext, &volid_new, &nsect_free_new);
      (void) logtb_set_check_interrupt (thread_p, check_interrupt);
      if (error_code != NO_ERROR)
    {
      ASSERT_ERROR ();
      return error_code;
    }
      assert (disk_Cache->nvols_perm + disk_Cache->nvols_temp <= LOG_MAX_DBVOLID);

      disk_log ("disk_extend", "added new volume %d with %d free sectors for %s.", volid_new, nsect_free_new,
        disk_type_to_string (extend_info->voltype));

      nsect_extend -= nsect_free_new;

      /* update total and max */
      extend_info->nsect_total += volext.nsect_total;
      extend_info->nsect_max += volext.nsect_max;

      disk_cache_lock_reserve (extend_info);
      /* add new volume */
      disk_Cache->vols[volid_new].nsect_free = nsect_free_new;
      assert (disk_Cache->vols[volid_new].purpose == volext.purpose);
      extend_info->nsect_free += nsect_free_new;

      if (reserve_context && reserve_context->n_cache_reserve_remaining > 0)
    {
      /* reserve ahead */
      disk_reserve_from_cache_volume (volid_new, reserve_context);
    }

      disk_cache_unlock_reserve (extend_info);

      if (extend_info->nsect_total < extend_info->nsect_max)
    {
      /* update volume used for auto extend */
      extend_info->volid_extend = volid_new;
    }

      assert (disk_is_valid_volid (volid_new));

#if defined (SERVER_MODE)
      DISK_EXTEND_COLLECT (volext.nsect_total);
#endif /* SERVER_MODE */
    }

  /* finished expand */

  /* safe guard: if this was called during sector reservation, the expansion should cover all required sectors. */
  assert (reserve_context == NULL || reserve_context->n_cache_reserve_remaining == 0);
#if defined (SERVER_MODE)
  DISK_EXTEND_REGISTER ();
#endif /* SERVER_MODE */
  return NO_ERROR;

#if defined (SERVER_MODE)
#undef DISK_EXTEND_COLLECT
#undef DISK_EXTEND_REGISTER
#endif /* SERVER_MODE */
}

/*
 * disk_volume_expand () - expand disk space for volume
 *
 * return                   : NO_ERROR on success, ER_FAILED if the volume header cannot be fixed, or the error
 *                            returned by fileio_expand_to ()
 * thread_p (in)            : thread entry
 * volid (in)               : volume identifier
 * voltype (in)             : volume type (hint, caller should know)
 * nsect_extend (in)        : desired extension
 * nsect_extended_out (out) : extended size (rounded up desired extension); written only on success
 *
 * NOTE: log records are appended only when the volume header says the volume is permanent (see do_logging).
 */
static int
disk_volume_expand (THREAD_ENTRY * thread_p, VOLID volid, DB_VOLTYPE voltype, DKNSECTS nsect_extend,
            DKNSECTS * nsect_extended_out)
{
  PAGE_PTR page_volheader = NULL;
  DISK_VOLUME_HEADER *volheader = NULL;
  DISK_RECV_DATA_VOLUME_EXPAND log_data;
  int volume_new_npages;
  bool do_logging;
  int error_code = NO_ERROR;
  LOG_LSA prev_lsa = LSA_INITIALIZER;

  assert (nsect_extend > 0);

  /* how it works:
   *
   * to make sure extension is correctly recovered, we need to:
   * 1. use a system operation (we need to undo change in volume header if expand is not completed).
   * 2. undoredo update on volume header.
   * 3. log expansion (unattached redo - is always executed).
   * 4. commit system operation (to cancel the volume header change undo).
   * 5. flush log! without this last step, it is still possible to expand the volume without recovery
   * 6. now it is safe to expand volume.
   *
   */

  /* round up */
  nsect_extend = DISK_SECTS_ROUND_UP (nsect_extend);

  /* fix volume header */
  error_code = disk_get_volheader (thread_p, volid, PGBUF_LATCH_WRITE, &page_volheader, &volheader);
  if (error_code != NO_ERROR)
    {
      /* the original error code is replaced by a generic fatal error; caller only sees ER_FAILED */
      assert_release (false);
      er_set (ER_FATAL_ERROR_SEVERITY, ARG_FILE_LINE, ER_GENERIC_ERROR, 0);
      return ER_FAILED;
    }

  assert (volheader->type == voltype);
  /* the type stored in the header (not the voltype hint) decides whether the change is logged */
  do_logging = (volheader->type == DB_PERMANENT_VOLTYPE);

  /* we need system op here */
  log_sysop_start (thread_p);

  /* update sector total number */
  volheader->nsect_total += nsect_extend;
  disk_verify_volume_header (thread_p, page_volheader);
  if (do_logging)
    {
      /* remember the page LSA from before the append; it is only used by the disk_log message below */
      prev_lsa = *pgbuf_get_lsa (page_volheader);
      /* undo and redo data are both the (rounded) number of added sectors */
      log_append_undoredo_data2 (thread_p, RVDK_VOLHEAD_EXPAND, NULL, page_volheader, NULL_OFFSET,
                 sizeof (nsect_extend), sizeof (nsect_extend), &nsect_extend, &nsect_extend);
    }
  disk_log ("disk_volume_expand", "add %d more sectors to: \n" DISK_VOLHEADER_MSG
        "\t\t\t" PGBUF_PAGE_MODIFY_MSG ("page"), nsect_extend,
        DISK_VOLHEADER_AS_ARGS (volheader), PGBUF_PAGE_MODIFY_ARGS (page_volheader, &prev_lsa));

  /* FI_TEST hooks are placed between every step of the protocol described above */
  FI_TEST (thread_p, FI_TEST_DISK_MANAGER_VOLUME_EXPAND, 0);

  /* volume new size in pages */
  volume_new_npages = DISK_SECTS_NPAGES (volheader->nsect_total);

  if (do_logging)
    {
      /* volume expand must be logged before freeing volume header page. otherwise, this can happen:
       *
       * 1. page is freed.
       * 2. log is flushed including RVDK_VOLHEAD_EXPAND record.
       * 3. system crashes before logging RVDK_EXPAND_VOLUME.
       * 4. after recovery, volume header page says volume is expanded, but it is not.
       */
      log_data.volid = volid;
      log_data.npages = volume_new_npages;

      log_append_dboutside_redo (thread_p, RVDK_EXPAND_VOLUME, sizeof (DISK_RECV_DATA_VOLUME_EXPAND), &log_data);
    }

  /* now it is safe to free volume header */
  pgbuf_set_dirty_and_free (thread_p, page_volheader);

  FI_TEST (thread_p, FI_TEST_DISK_MANAGER_VOLUME_EXPAND, 0);

  /* we can now end system op. */
  log_sysop_commit (thread_p);

  FI_TEST (thread_p, FI_TEST_DISK_MANAGER_VOLUME_EXPAND, 0);

  /* to be sure expansion really happens on recovery too, we must flush log! */
  logpb_force_flush_pages (thread_p);

  FI_TEST (thread_p, FI_TEST_DISK_MANAGER_VOLUME_EXPAND, 0);

  /* expand volume */
  error_code = fileio_expand_to (thread_p, volid, volume_new_npages, voltype);
  if (error_code != NO_ERROR)
    {
      // important note - we just committed volume expansion; we cannot afford any failures here
      // caller won't update cache!!
      // nsect_extended_out is left untouched on this path
      assert (false);
      return error_code;
    }

  FI_TEST (thread_p, FI_TEST_DISK_MANAGER_VOLUME_EXPAND, 0);

  /* report the rounded-up extension, not the value originally requested */
  *nsect_extended_out = nsect_extend;

  /* success */
  /* caller will update cache */
  return NO_ERROR;
}

/*
 * disk_rv_volhead_extend_redo () - volume header redo recovery after volume extension.
 *
 * return        : NO_ERROR, or the error returned while iterating the sector table
 * thread_p (in) : Thread entry
 * rcv (in)      : Recovery data; rcv->pgptr is the volume header page and rcv->data holds the number of added
 *                 sectors (DKNSECTS)
 *
 * NOTE: besides the volume header, the disk cache totals for permanent purpose are updated too.
 */
int
disk_rv_volhead_extend_redo (THREAD_ENTRY * thread_p, LOG_RCV * rcv)
{
  DISK_VOLUME_HEADER *volheader = (DISK_VOLUME_HEADER *) rcv->pgptr;
  DKNSECTS nsect_extend = *(DKNSECTS *) rcv->data;
  DISK_STAB_CURSOR start_cursor;
  DISK_STAB_CURSOR end_cursor;
  int nfree = 0;
  int error_code = NO_ERROR;

  assert (rcv->length == sizeof (nsect_extend));
  assert (volheader->type == DB_PERMANENT_VOLTYPE);
  assert (volheader->purpose == DB_PERMANENT_DATA_PURPOSE);

  /* header is verified both before and after the total is increased */
  disk_verify_volume_header (thread_p, rcv->pgptr);
  volheader->nsect_total += nsect_extend;
  disk_verify_volume_header (thread_p, rcv->pgptr);

  /* we may think that all these are new sectors, free to reserve, but that may not always be true:
   * 1. volume is extended.
   * 2. some of the sectors in the extended sectors are reserved.
   * 3. sector table pages are flushed to disk.
   * 4. other sectors are reserved.
   * 5. server crashes.
   *
   * at this point, sectors reserved in step #2 are already set and they cannot be considered free. */

  /* nsect_total was already increased, so the first added sector is at (nsect_total - nsect_extend) */
  disk_stab_cursor_set_at_sectid (volheader, volheader->nsect_total - nsect_extend, &start_cursor);
  disk_stab_cursor_set_at_end (volheader, &end_cursor);

  /* count the free sectors in the added range only */
  error_code =
    disk_stab_iterate_units (thread_p, volheader, PGBUF_LATCH_READ, &start_cursor, &end_cursor, disk_stab_count_free,
                 &nfree);
  if (error_code != NO_ERROR)
    {
      /* NOTE(review): the header page was already modified above but is not marked dirty on this path */
      assert_release (false);
      return error_code;
    }
  assert (nfree <= nsect_extend);

  /* total is updated outside the reserve lock; only the free count update is done under it */
  disk_Cache->perm_purpose_info.extend_info.nsect_total += nsect_extend;
  disk_cache_lock_reserve_for_purpose (DB_PERMANENT_DATA_PURPOSE);
  disk_cache_update_vol_free (volheader->volid, nfree);
  disk_cache_unlock_reserve_for_purpose (DB_PERMANENT_DATA_PURPOSE);

  disk_log ("disk_rv_volhead_extend_redo", "extended volume %d total sectors %d (out of which %d were free). "
        "volume header lsa %lld|%d", volheader->volid, nsect_extend, nfree, PGBUF_PAGE_LSA_AS_ARGS (rcv->pgptr));

  /* page stays fixed (DONT_FREE) */
  pgbuf_set_dirty (thread_p, rcv->pgptr, DONT_FREE);
  return NO_ERROR;
}

/*
 * disk_rv_volhead_extend_undo () - revert the volume header (and disk cache) changes of an aborted expansion
 *
 * return        : NO_ERROR
 * thread_p (in) : thread entry
 * rcv (in)      : recovery data; rcv->pgptr is the volume header page and rcv->data holds the number of sectors
 *                 that had been added (DKNSECTS)
 */
int
disk_rv_volhead_extend_undo (THREAD_ENTRY * thread_p, LOG_RCV * rcv)
{
  DISK_VOLUME_HEADER *header = (DISK_VOLUME_HEADER *) rcv->pgptr;
  DKNSECTS nsect_removed = *(DKNSECTS *) rcv->data;

  assert (rcv->length == sizeof (nsect_removed));
  assert (header->type == DB_PERMANENT_VOLTYPE);
  assert (header->purpose == DB_PERMANENT_DATA_PURPOSE);

  /* take the sectors back out of the header; verify it on both sides of the change */
  disk_verify_volume_header (thread_p, rcv->pgptr);
  header->nsect_total -= nsect_removed;
  disk_verify_volume_header (thread_p, rcv->pgptr);

  /* the cache was updated by the expansion as well, so revert it here. the original author assumes that the
   * extension is part of a committed sysop, hence none of the removed sectors could have been reserved. */
  disk_Cache->perm_purpose_info.extend_info.nsect_total -= nsect_removed;

  /* free sector counters are changed only while holding the reserve lock */
  disk_cache_lock_reserve_for_purpose (DB_PERMANENT_DATA_PURPOSE);
  disk_cache_update_vol_free (header->volid, -nsect_removed);
  disk_cache_unlock_reserve_for_purpose (DB_PERMANENT_DATA_PURPOSE);

  disk_log ("disk_rv_volhead_extend_undo", "undo volume %d extension - remove %d sectors. volume header lsa %lld|%d\n",
	    header->volid, nsect_removed, PGBUF_PAGE_LSA_AS_ARGS (rcv->pgptr));

  /* mark the header dirty but keep it fixed */
  pgbuf_set_dirty (thread_p, rcv->pgptr, DONT_FREE);

  return NO_ERROR;
}

/*
 * disk_add_volume () - add new volume (permanent or temporary)
 *
 * return                : error code
 * thread_p (in)         : thread entry
 * extinfo (in/out)      : extend info. nsect_max/nsect_total are rounded up, a NULL comments field is replaced with a
 *                         default text and name is overwritten with the full volume name.
 * volid_out (out)       : new volume identifier (written only on success)
 * nsects_free_out (out) : output the number of free sectors in new volume (filled in by disk_format)
 *
 * NOTE: extinfo->name is set to a buffer local to this function (fullname). after the function returns the pointer
 *       is dangling, so callers must not read extinfo->name again without resetting it first.
 */
static int
disk_add_volume (THREAD_ENTRY * thread_p, DBDEF_VOL_EXT_INFO * extinfo, VOLID * volid_out, DKNSECTS * nsects_free_out)
{
  char fullname[PATH_MAX];
  VOLID volid;
  DKNSECTS nsect_part_max;
  int error_code = NO_ERROR;
  bool can_overwrite;

  /* how it works:
   *
   * we need to do several steps to add a new volume:
   * 1. get from boot the full name (path + file name) and volume id.
   * 2. make sure there is enough space on disk to handle a new volume.
   * 3. notify page buffer (it needs to track permanent/temporary volumes).
   * 4. format new volume.
   * 5. update volume info file (if permanent volume).
   * 6. update boot_Db_parm.
   */

  if (disk_Cache->nvols_perm + disk_Cache->nvols_temp >= LOG_MAX_DBVOLID)
    {
      /* oops, too many volumes */
      er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_BO_MAXNUM_VOLS_HAS_BEEN_EXCEEDED, 1, LOG_MAX_DBVOLID);
      return ER_BO_MAXNUM_VOLS_HAS_BEEN_EXCEEDED;
    }

  /* get from boot the volume name and identifier */
  error_code =
    boot_get_new_volume_name_and_id (thread_p, extinfo->voltype, extinfo->path, extinfo->name, fullname, &volid);
  if (error_code != NO_ERROR)
    {
      ASSERT_ERROR ();
      return error_code;
    }

  /* make sure the total and max size are rounded */
  extinfo->nsect_max = DISK_SECTS_ROUND_UP (extinfo->nsect_max);
  extinfo->nsect_total = DISK_SECTS_ROUND_UP (extinfo->nsect_total);

  disk_log ("disk_add_volume", "add new %s volume with purpose %s:\n" "\tname=%s\n" "\tcomments=%s\n" "\tpath=%s\n"
	    "\tfullname = %s\n" "\ttotal sectors = %d\n" "\tmax sectors = %d", disk_type_to_string (extinfo->voltype),
	    disk_purpose_to_string (extinfo->purpose), extinfo->name ? extinfo->name : "(UNKNOWN)",
	    extinfo->comments ? extinfo->comments : "(UNKNOWN)", extinfo->path ? extinfo->path : "(UNKNOWN)",
	    fullname, extinfo->nsect_total, extinfo->nsect_max);

#if defined (SERVER_MODE)
  disk_log_extend_elapsed (thread_p, "DISK_ADD_VOLUME", fullname, extinfo->voltype, "volume creation started");
#endif

#if !defined (WINDOWS)
  {
    DBDEF_VOL_EXT_INFO temp_extinfo = *extinfo;
    char vol_realpath[PATH_MAX];
    char link_path[PATH_MAX];
    char link_fullname[PATH_MAX];
    struct stat stat_buf;

    if (stat (fullname, &stat_buf) == 0	/* file exists */
	&& S_ISCHR (stat_buf.st_mode))	/* is the raw device? */
      {
	/* raw device: the volume is reached through a symbolic link that is created next to the database
	 * (directory and base name are taken from boot_db_full_name) and points to the device. */
	temp_extinfo.path = fileio_get_directory_path (link_path, boot_db_full_name ());
	if (temp_extinfo.path == NULL)
	  {
	    link_path[0] = '\0';
	    temp_extinfo.path = link_path;
	  }
	temp_extinfo.name = fileio_get_base_file_name (boot_db_full_name ());
	fileio_make_volume_ext_name (link_fullname, temp_extinfo.path, temp_extinfo.name, volid);

	/* both buffers are PATH_MAX long, so the copies below cannot overflow */
	if (realpath (fullname, vol_realpath) != NULL)
	  {
	    strcpy (fullname, vol_realpath);
	  }
	/* best effort removal of a stale link; a failure here shows up as a symlink error below */
	(void) unlink (link_fullname);
	if (symlink (fullname, link_fullname) != 0)
	  {
	    er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_BO_CANNOT_CREATE_LINK, 2, fullname, link_fullname);
	    return ER_BO_CANNOT_CREATE_LINK;
	  }
	/* from now on the volume is known by the name of the link */
	strcpy (fullname, link_fullname);

	/* we don't know character special files size */
	nsect_part_max = VOL_MAX_NSECTS (IO_PAGESIZE);
      }
    else
      {
	nsect_part_max = fileio_get_number_of_partition_free_sectors (fullname);
      }
  }
#else /* WINDOWS */
  nsect_part_max = fileio_get_number_of_partition_free_sectors (fullname);
#endif /* WINDOWS */

  /* a negative nsect_part_max skips the check */
  if (nsect_part_max >= 0 && nsect_part_max < extinfo->nsect_max)
    {
      /* not enough space on disk. sizes are reported in pages and in KB (bytes / 1024). */
      er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_IO_FORMAT_OUT_OF_SPACE, 5, fullname,
	      DISK_SECTS_NPAGES (extinfo->nsect_max), DISK_SECTS_SIZE (extinfo->nsect_max) / 1024 /* KB */ ,
	      DISK_SECTS_NPAGES (nsect_part_max), DISK_SECTS_SIZE (nsect_part_max) / 1024 /* KB */ );
      return ER_IO_FORMAT_OUT_OF_SPACE;
    }

  if (extinfo->comments == NULL)
    {
      extinfo->comments = "Volume Extension";
    }
  /* NOTE: fullname is a local buffer; see the note in the function header */
  extinfo->name = fullname;

  if (!extinfo->overwrite && fileio_is_volume_exist (extinfo->name))
    {
      /* the file exists and overwrite was not requested; continue only if the existing file may be overwritten */
      if (disk_can_overwrite_data_volume (thread_p, extinfo->name, &can_overwrite) != NO_ERROR
	  || can_overwrite == false)
	{
	  er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_BO_VOLUME_EXISTS, 1, extinfo->name);
	  return ER_BO_VOLUME_EXISTS;
	}
    }

  /* everything below is part of one system operation; all failures go through the exit label */
  log_sysop_start (thread_p);

  /* with disk_format, we start fixing pages. the volume count and the volume purpose are registered in the disk
   * cache before that (presumably because page fixing relies on them - TODO confirm). */
  if (extinfo->voltype == DB_PERMANENT_VOLTYPE)
    {
      disk_Cache->nvols_perm++;
    }
  else
    {
      disk_Cache->nvols_temp++;
    }
  disk_Cache->vols[volid].purpose = extinfo->purpose;

  error_code = disk_format (thread_p, boot_db_full_name (), volid, extinfo, nsects_free_out);
  if (error_code != NO_ERROR)
    {
      ASSERT_ERROR ();
      goto exit;
    }

  if (extinfo->voltype == DB_PERMANENT_VOLTYPE)
    {
      if (logpb_add_volume (NULL, volid, extinfo->name, DB_PERMANENT_DATA_PURPOSE) == NULL_VOLID)
	{
	  ASSERT_ERROR_AND_SET (error_code);
	  goto exit;
	}
    }

  /* this must be last step */
  error_code = boot_dbparm_save_volume (thread_p, extinfo->voltype, volid);
  if (error_code != NO_ERROR)
    {
      ASSERT_ERROR ();
      if (extinfo->voltype == DB_TEMPORARY_VOLTYPE)
	{
	  /* rollback will not remove volume, we have to do it manually */
	  if (disk_unformat (thread_p, extinfo->name) != NO_ERROR)
	    {
	      assert (false);
	    }
	}
      goto exit;
    }

  assert (error_code == NO_ERROR);
  *volid_out = volid;

exit:
  if (error_code == NO_ERROR)
    {
      log_sysop_commit (thread_p);
    }
  else
    {
      log_sysop_abort (thread_p);

      /* undo incrementing volume count. rollback won't do it. */
      if (extinfo->voltype == DB_TEMPORARY_VOLTYPE)
	{
	  disk_Cache->nvols_temp--;
	}
      else
	{
	  disk_Cache->nvols_perm--;
	}
    }

#if defined (SERVER_MODE)
  disk_log_extend_elapsed (thread_p, "DISK_ADD_VOLUME", extinfo->name ? extinfo->name : NULL, extinfo->voltype,
			   "volume creation completed");
#endif

  return error_code;
}

/*
 * disk_add_volume_extension () - add new volume extension. this can be called by addvol utility or on create database.
 *
 * return                     : error code
 * thread_p (in)              : thread entry
 * purpose (in)               : permanent or temporary purpose
 * voltype (in)               : DB_TEMPORARY_VOLTYPE for a temporary volume; any other value is handled as
 *                              DB_PERMANENT_VOLTYPE
 * npages (in)                : desired number of pages
 * path (in)                  : path to volume file
 * name (in)                  : name of volume file
 * comments (in)              : comments in volume header
 * max_write_size_in_sec (in) : write speed (to limit IO when database is on line)
 * overwrite (in)             : true to overwrite existing file, false otherwise
 * volid_out (out)            : Output new volume identifier (NULL_VOLID if the function fails)
 */
int
disk_add_volume_extension (THREAD_ENTRY * thread_p, DB_VOLPURPOSE purpose, DB_VOLTYPE voltype, DKNPAGES npages,
			   const char *path, const char *name, const char *comments, int max_write_size_in_sec,
			   bool overwrite, VOLID * volid_out)
{
  DBDEF_VOL_EXT_INFO ext_info;
  VOLID volid_new;
  DKNSECTS nsect_free;
  char buf_realpath[PATH_MAX];
  int error_code = NO_ERROR;

  *volid_out = NULL_VOLID;

  error_code = csect_enter_as_reader (thread_p, CSECT_DISK_CHECK, INF_WAIT);
  if (error_code != NO_ERROR)
    {
      /* the critical section was not entered: nothing to release, and we must not call csect_exit for it */
      ASSERT_ERROR ();
      return error_code;
    }

  /* first we need to block other expansions */
  disk_lock_extend ();

  /* build volume extension info */
  ext_info.purpose = purpose;
  if (path != NULL && realpath (path, buf_realpath) != NULL)
    {
      ext_info.path = buf_realpath;
    }
  else
    {
      /* no path given or it could not be resolved; pass it on as it is */
      ext_info.path = path;
    }
  ext_info.name = name;
  ext_info.comments = comments;
  ext_info.max_writesize_in_sec = max_write_size_in_sec;
  ext_info.overwrite = overwrite;

  /* compute total/max sectors. we always keep a rounded number of sectors. */
  ext_info.nsect_total = disk_sectors_to_extend_npages (npages);
  /* the volume is created at its maximum size (total == max) */
  ext_info.nsect_max = ext_info.nsect_total;
  ext_info.max_npages = npages;	/* this is obsolete. I set it just to see it if a crash occurs. */

  /* check voltype */
  if (voltype == DB_TEMPORARY_VOLTYPE)
    {
      /* check parameter of temp_file_max_size_in_pages */
      if (disk_Cache->temp_purpose_info.extend_info.nsect_total + ext_info.nsect_total > disk_Temp_max_sects)
	{
	  er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_BO_MAXTEMP_SPACE_HAS_BEEN_EXCEEDED, 1,
		  disk_Temp_max_sects * DISK_SECTOR_NPAGES);
	  disk_unlock_extend ();
	  csect_exit (thread_p, CSECT_DISK_CHECK);
	  error_code = ER_BO_MAXTEMP_SPACE_HAS_BEEN_EXCEEDED;
	  return error_code;
	}
      ext_info.voltype = DB_TEMPORARY_VOLTYPE;
    }
  else
    {
      ext_info.voltype = DB_PERMANENT_VOLTYPE;
    }

  /* add volume */
  error_code = disk_add_volume (thread_p, &ext_info, &volid_new, &nsect_free);
  if (error_code != NO_ERROR)
    {
      ASSERT_ERROR ();
      disk_unlock_extend ();
      csect_exit (thread_p, CSECT_DISK_CHECK);
      return error_code;
    }

  assert (ext_info.voltype != DB_PERMANENT_VOLTYPE || volid_new == disk_Cache->nvols_perm - 1);

  /* account the new volume in the cache totals that match its purpose and type. these totals are changed while
   * holding the extend lock taken above. */
  if (ext_info.purpose == DB_PERMANENT_DATA_PURPOSE)
    {
      disk_Cache->perm_purpose_info.extend_info.nsect_total += ext_info.nsect_total;
      disk_Cache->perm_purpose_info.extend_info.nsect_max += ext_info.nsect_max;
    }
  else
    {
      if (ext_info.voltype == DB_PERMANENT_VOLTYPE)
	{
	  /* permanent volume used for temporary purpose */
	  disk_Cache->temp_purpose_info.nsect_perm_total += ext_info.nsect_total;
	}
      else
	{
	  disk_Cache->temp_purpose_info.extend_info.nsect_total += ext_info.nsect_total;
	}
    }

  /* free sector counters are changed only while holding the reserve lock */
  disk_cache_lock_reserve_for_purpose (ext_info.purpose);
  assert (disk_Cache->vols[volid_new].purpose == ext_info.purpose);
  assert (disk_Cache->vols[volid_new].nsect_free == 0);

  disk_cache_update_vol_free (volid_new, nsect_free);
  disk_cache_unlock_reserve_for_purpose (ext_info.purpose);

  /* unblock expand */
  disk_unlock_extend ();
  csect_exit (thread_p, CSECT_DISK_CHECK);

  assert (disk_is_valid_volid (volid_new));

  /* output volume identifier */
  *volid_out = volid_new;

  /* success */
  return NO_ERROR;
}

/*
 * disk_volume_boot () - read the header of a volume at boot and report its purpose, type and space usage
 *
 * return            : error code
 * thread_p (in)     : thread entry
 * volid (in)        : volume identifier
 * purpose_out (out) : output volume purpose
 * voltype_out (out) : output volume type
 * space_out (out)   : output space information; left untouched for volumes of temporary type
 */
static int
disk_volume_boot (THREAD_ENTRY * thread_p, VOLID volid, DB_VOLPURPOSE * purpose_out, DB_VOLTYPE * voltype_out,
		  DISK_VOLUME_SPACE_INFO * space_out)
{
  PAGE_PTR header_page = NULL;
  DISK_VOLUME_HEADER *header;
  int rc = NO_ERROR;

  assert (volid != NULL_VOLID);

  rc = disk_get_volheader (thread_p, volid, PGBUF_LATCH_WRITE, &header_page, &header);
  if (rc != NO_ERROR)
    {
      ASSERT_ERROR ();
      return rc;
    }

  *purpose_out = header->purpose;
  *voltype_out = header->type;

  if (*voltype_out == DB_TEMPORARY_VOLTYPE)
    {
      /* temporary volumes are not loaded; purpose and type are all the caller gets */
      pgbuf_unfix_and_init (thread_p, header_page);
      return NO_ERROR;
    }

  /* total and maximum size come straight from the header */
  space_out->n_max_sects = header->nsect_max;
  space_out->n_total_sects = header->nsect_total;

  if (header->purpose != DB_TEMPORARY_DATA_PURPOSE)
    {
      /* permanent data: free sectors have to be counted in the sector table */
      space_out->n_free_sects = 0;
      rc = disk_stab_iterate_units_all (thread_p, header, PGBUF_LATCH_READ, disk_stab_count_free,
					&space_out->n_free_sects);
      if (rc != NO_ERROR)
	{
	  ASSERT_ERROR ();
	}
    }
  else
    {
      /* permanent volume with temporary purpose: it is reset at boot */
      assert (header->nsect_max == header->nsect_total);

      /* set back sectors used by system */
      rc = disk_stab_init (thread_p, header);
      if (rc != NO_ERROR)
	{
	  ASSERT_ERROR ();
	}
      else
	{
	  /* everything after the sector of the last system page is free */
	  space_out->n_free_sects = space_out->n_total_sects - SECTOR_FROM_PAGEID (header->sys_lastpage) - 1;
	}
    }

  /* common cleanup for success and failure */
  if (header_page != NULL)
    {
      pgbuf_unfix (thread_p, header_page);
    }

  return rc;
}

/*
 * disk_volume_is_empty () - Check whether a disk volume is empty.
 *
 * return        : true if the volume does not exist or is empty (no data),
 *                 false if the volume exists and contains data, or if its header cannot be fixed
 * thread_p (in) : thread entry
 * volid (in)    : volume identifier
 *
 * NOTE: only the sectors that follow the sector of the last system page are inspected.
 */
static bool
disk_volume_is_empty (THREAD_ENTRY * thread_p, VOLID volid)
{
  DISK_VOLUME_HEADER *header = NULL;
  DISK_STAB_CURSOR scan_begin = DISK_STAB_CURSOR_INITIALIZER;
  DISK_STAB_CURSOR scan_end = DISK_STAB_CURSOR_INITIALIZER;
  PAGE_PTR header_page = NULL;
  /* starts as true; stays that way (volume reported as not empty) unless the iteration below changes it */
  bool has_used = true;

  if (!xdisk_is_volume_exist (thread_p, volid))
    {
      /* a missing volume counts as empty */
      return true;
    }

  if (disk_get_volheader (thread_p, volid, PGBUF_LATCH_READ, &header_page, &header) != NO_ERROR)
    {
      /* not expected; the error is dropped and the volume is reported as not empty */
      assert (false);
      er_clear ();

      return false;
    }

  /* scan from the first sector after the system pages up to the end of the sector table */
  disk_stab_cursor_set_at_sectid (header, SECTOR_FROM_PAGEID (header->sys_lastpage) + 1, &scan_begin);
  disk_stab_cursor_set_at_end (header, &scan_end);

  /* the result of the iteration itself is ignored on purpose; only has_used matters */
  (void) disk_stab_iterate_units (thread_p, header, PGBUF_LATCH_READ, &scan_begin, &scan_end, disk_stab_has_used,
				  &has_used);

  if (header_page != NULL)
    {
      pgbuf_unfix (thread_p, header_page);
    }

  return !has_used;
}

/************************************************************************/
/* Disk cache section                                                   */
/************************************************************************/

/*
 * disk_cache_load_volume () - boot one volume and add its space information to the disk cache
 *
 * return        : true if successful, false otherwise
 * thread_p (in) : thread entry
 * volid (in)    : volume identifier
 * ignore (in)   : not used
 *
 * NOTE: passed as callback to fileio_map_mounted by disk_cache_load_all_volumes.
 */
static bool
disk_cache_load_volume (THREAD_ENTRY * thread_p, INT16 volid, void *ignore)
{
  DB_VOLPURPOSE purpose;
  DB_VOLTYPE type;
  DISK_VOLUME_SPACE_INFO space = DISK_VOLUME_SPACE_INFO_INITIALIZER;

  if (disk_volume_boot (thread_p, volid, &purpose, &type, &space) != NO_ERROR)
    {
      ASSERT_ERROR ();
      return false;
    }

  if (type != DB_PERMANENT_VOLTYPE)
    {
      /* don't save temporary volumes... they will be dropped anyway */
      return true;
    }

  /* called during boot, no sync required */
  if (purpose != DB_PERMANENT_DATA_PURPOSE)
    {
      /* permanent volume with temporary purpose: never extensible, counted in the nsect_perm_* fields */
      assert (space.n_total_sects == space.n_max_sects);

      disk_Cache->temp_purpose_info.nsect_perm_free += space.n_free_sects;
      disk_Cache->temp_purpose_info.nsect_perm_total += space.n_total_sects;

      assert (disk_Cache->temp_purpose_info.nsect_perm_free <= disk_Cache->temp_purpose_info.nsect_perm_total);
    }
  else
    {
      /* permanent data volume: add to the permanent purpose aggregates */
      disk_Cache->perm_purpose_info.extend_info.nsect_free += space.n_free_sects;
      disk_Cache->perm_purpose_info.extend_info.nsect_total += space.n_total_sects;
      disk_Cache->perm_purpose_info.extend_info.nsect_max += space.n_max_sects;

      assert (disk_Cache->perm_purpose_info.extend_info.nsect_free
	      <= disk_Cache->perm_purpose_info.extend_info.nsect_total);
      assert (disk_Cache->perm_purpose_info.extend_info.nsect_total
	      <= disk_Cache->perm_purpose_info.extend_info.nsect_max);

      if (space.n_total_sects < space.n_max_sects)
	{
	  /* this volume can still grow; at most one such volume is expected */
	  assert (disk_Cache->perm_purpose_info.extend_info.volid_extend == NULL_VOLID);
	  disk_Cache->perm_purpose_info.extend_info.volid_extend = volid;
	}
    }

  /* per-volume entry */
  disk_Cache->vols[volid].purpose = purpose;
  disk_Cache->vols[volid].nsect_free = space.n_free_sects;

  disk_Cache->nvols_perm++;

  return true;
}

/*
 * disk_cache_init () - initialize disk cache
 *
 * return    : error code
 * void (in) : void
 */
static int
disk_cache_init (void)
{
  int i;

  assert (disk_Cache == NULL);

  disk_Cache = (DISK_CACHE *) malloc (sizeof (DISK_CACHE));
  if (disk_Cache == NULL)
    {
      er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_OUT_OF_VIRTUAL_MEMORY, 1, sizeof (DISK_CACHE));
      return ER_OUT_OF_VIRTUAL_MEMORY;
    }

  disk_Cache->nvols_perm = 0;
  disk_Cache->nvols_temp = 0;

  disk_Cache->perm_purpose_info.extend_info.nsect_vol_max =
    DISK_SECTS_ROUND_UP ((DKNSECTS) (prm_get_bigint_value (PRM_ID_DB_VOLUME_SIZE) / IO_SECTORSIZE));
  disk_Cache->perm_purpose_info.extend_info.nsect_free = 0;
  disk_Cache->perm_purpose_info.extend_info.nsect_total = 0;
  disk_Cache->perm_purpose_info.extend_info.nsect_max = 0;
  disk_Cache->perm_purpose_info.extend_info.nsect_intention = 0;
  disk_Cache->perm_purpose_info.extend_info.voltype = DB_PERMANENT_VOLTYPE;
  disk_Cache->perm_purpose_info.extend_info.volid_extend = NULL_VOLID;
  pthread_mutex_init (&disk_Cache->perm_purpose_info.extend_info.mutex_reserve, NULL);
#if !defined (NDEBUG)
  disk_Cache->perm_purpose_info.extend_info.owner_reserve = -1;
#endif /* !NDEBUG */

  disk_Cache->temp_purpose_info.extend_info.nsect_vol_max =
    DISK_SECTS_ROUND_UP ((DKNSECTS) (prm_get_bigint_value (PRM_ID_DB_VOLUME_SIZE) / IO_SECTORSIZE));
  disk_Cache->temp_purpose_info.nsect_perm_free = 0;
  disk_Cache->temp_purpose_info.nsect_perm_total = 0;
  disk_Cache->temp_purpose_info.extend_info.nsect_free = 0;
  disk_Cache->temp_purpose_info.extend_info.nsect_total = 0;
  disk_Cache->temp_purpose_info.extend_info.nsect_max = 0;
  disk_Cache->temp_purpose_info.extend_info.nsect_intention = 0;
  disk_Cache->temp_purpose_info.extend_info.voltype = DB_TEMPORARY_VOLTYPE;
  disk_Cache->temp_purpose_info.extend_info.volid_extend = NULL_VOLID;
  pthread_mutex_init (&disk_Cache->temp_purpose_info.extend_info.mutex_reserve, NULL);
#if !defined (NDEBUG)
  disk_Cache->temp_purpose_info.extend_info.owner_reserve = -1;
#endif /* !NDEBUG */

  pthread_mutex_init (&disk_Cache->mutex_extend, NULL);
#if !defined (NDEBUG)
  disk_Cache->owner_extend = -1;
#endif /* !NDEBUG */

  for (i = 0; i <= LOG_MAX_DBVOLID; i++)
    {
      disk_Cache->vols[i].purpose = DISK_UNKNOWN_PURPOSE;
      disk_Cache->vols[i].nsect_free = 0;
    }
  return NO_ERROR;
}

/*
 * disk_cache_final () - destroy the disk cache mutexes and free the cache; does nothing if it was never initialized
 */
static void
disk_cache_final (void)
{
  if (disk_Cache != NULL)
    {
      /* none of the locks may be owned at this point (debug only checks) */
      assert (disk_Cache->owner_extend == -1);
      assert (disk_Cache->perm_purpose_info.extend_info.owner_reserve == -1);
      assert (disk_Cache->temp_purpose_info.extend_info.owner_reserve == -1);

      pthread_mutex_destroy (&disk_Cache->perm_purpose_info.extend_info.mutex_reserve);
      pthread_mutex_destroy (&disk_Cache->temp_purpose_info.extend_info.mutex_reserve);
      pthread_mutex_destroy (&disk_Cache->mutex_extend);

      free_and_init (disk_Cache);
    }
}

/*
 * disk_cache_load_all_volumes () - run disk_cache_load_volume on every mounted volume
 *
 * return        : true if successful, false otherwise
 * thread_p (in) : thread entry
 */
static bool
disk_cache_load_all_volumes (THREAD_ENTRY * thread_p)
{
  bool all_loaded;

  /* the cache must have been allocated already */
  assert (disk_Cache != NULL);

  /* fileio_map_mounted applies the callback to the mounted volumes; no extra argument is needed */
  all_loaded = fileio_map_mounted (thread_p, disk_cache_load_volume, NULL);

  return all_loaded;
}

/*
 * disk_cache_free_reserved () - give the sectors reserved in cache by the context back to the cached free counters
 *
 * return       : void
 * context (in) : reserve context
 */
STATIC_INLINE void
disk_cache_free_reserved (DISK_RESERVE_CONTEXT * context)
{
  int vol_idx = 0;

  /* the free counters may be changed only while holding the reserve lock of the purpose */
  disk_cache_lock_reserve_for_purpose (context->purpose);

  while (vol_idx < context->n_cache_vol_reserve)
    {
      disk_cache_update_vol_free (context->cache_vol_reserve[vol_idx].volid,
				  context->cache_vol_reserve[vol_idx].nsect);
      vol_idx++;
    }

  disk_cache_unlock_reserve_for_purpose (context->purpose);
}

/*
 * disk_cache_update_vol_free () - apply a delta to the cached free sectors of a volume and of its aggregate
 *
 * return          : void
 * volid (in)      : volume identifier
 * delta_free (in) : delta free sectors (negative to remove free sectors)
 *
 * NOTE: caller must hold the reserve lock that matches the purpose of the volume.
 */
STATIC_INLINE void
disk_cache_update_vol_free (VOLID volid, DKNSECTS delta_free)
{
  /* per-volume counter first */
  disk_Cache->vols[volid].nsect_free += delta_free;
  assert (disk_Cache->vols[volid].nsect_free >= 0);

  /* must be locked */
  disk_check_own_reserve_for_purpose (disk_Cache->vols[volid].purpose);

  /* then the aggregate that matches the purpose/type combination of the volume */
  if (disk_Cache->vols[volid].purpose == DB_PERMANENT_DATA_PURPOSE)
    {
      /* permanent purpose (always on permanent volumes) */
      assert (disk_get_voltype (volid) == DB_PERMANENT_VOLTYPE);
      disk_Cache->perm_purpose_info.extend_info.nsect_free += delta_free;
      assert (disk_Cache->perm_purpose_info.extend_info.nsect_free >= 0);

      disk_log ("disk_cache_update_vol_free", "updated cached free for volid %d to %d and perm_perm to %d; "
		"delta free = %d", volid, disk_Cache->vols[volid].nsect_free,
		disk_Cache->perm_purpose_info.extend_info.nsect_free, delta_free);
    }
  else if (disk_get_voltype (volid) == DB_PERMANENT_VOLTYPE)
    {
      /* temporary purpose on a permanent volume */
      disk_Cache->temp_purpose_info.nsect_perm_free += delta_free;
      assert (disk_Cache->temp_purpose_info.nsect_perm_free >= 0);

      disk_log ("disk_cache_update_vol_free", "updated cached free for volid %d to %d and perm_temp to %d; "
		"delta free = %d", volid, disk_Cache->vols[volid].nsect_free,
		disk_Cache->temp_purpose_info.nsect_perm_free, delta_free);
    }
  else
    {
      /* temporary purpose on a temporary volume */
      disk_Cache->temp_purpose_info.extend_info.nsect_free += delta_free;
      assert (disk_Cache->temp_purpose_info.extend_info.nsect_free >= 0);

      disk_log ("disk_cache_update_vol_free", "updated cached free for volid %d to %d and temp_temp to %d; "
		"delta free = %d", volid, disk_Cache->vols[volid].nsect_free,
		disk_Cache->temp_purpose_info.extend_info.nsect_free, delta_free);
    }
}

/*
 * disk_lock_extend () - lock volume extension mutex
 *
 * NOTE: debug builds also track the owner of the mutex. a thread that already owns it trips an assert and returns
 *       without locking again; a thread that owns one of the reserve mutexes trips an assert as well.
 */
void
disk_lock_extend (void)
{
#if !defined (NDEBUG)
  int self = thread_get_current_entry_index ();

  assert (self != disk_Cache->perm_purpose_info.extend_info.owner_reserve);
  assert (self != disk_Cache->temp_purpose_info.extend_info.owner_reserve);
  if (self == disk_Cache->owner_extend)
    {
      /* already owner */
      assert (false);
      return;
    }
#endif /* !NDEBUG */

  pthread_mutex_lock (&disk_Cache->mutex_extend);

#if !defined (NDEBUG)
  /* mutex is ours now; nobody else may be registered as owner */
  assert (disk_Cache->owner_extend == -1);
  disk_Cache->owner_extend = self;
#endif /* !NDEBUG */
}

/*
 * disk_unlock_extend () - unlock volume extension mutex
 *
 * NOTE: debug builds assert that the current thread is the registered owner and reset the owner to -1 before the
 *       mutex is released.
 */
void
disk_unlock_extend (void)
{
#if !defined (NDEBUG)
  int self = thread_get_current_entry_index ();

  assert (disk_Cache->owner_extend == self);
  disk_Cache->owner_extend = -1;
#endif /* !NDEBUG */

  pthread_mutex_unlock (&disk_Cache->mutex_extend);
}

/*
 * disk_cache_lock_reserve_for_purpose () - lock the reserve mutex that matches the given purpose
 *
 * return       : void
 * purpose (in) : DB_PERMANENT_DATA_PURPOSE selects the permanent extend info; any other value selects the temporary
 *                extend info
 */
STATIC_INLINE void
disk_cache_lock_reserve_for_purpose (DB_VOLPURPOSE purpose)
{
  DISK_EXTEND_INFO *extend_info = (purpose == DB_PERMANENT_DATA_PURPOSE
                                   ? &disk_Cache->perm_purpose_info.extend_info
                                   : &disk_Cache->temp_purpose_info.extend_info);

  disk_cache_lock_reserve (extend_info);
}

/*
 * disk_cache_unlock_reserve_for_purpose () - unlock the reserve mutex that matches the given purpose
 *
 * return       : void
 * purpose (in) : DB_PERMANENT_DATA_PURPOSE selects the permanent extend info; any other value selects the temporary
 *                extend info
 */
STATIC_INLINE void
disk_cache_unlock_reserve_for_purpose (DB_VOLPURPOSE purpose)
{
  DISK_EXTEND_INFO *extend_info = (purpose == DB_PERMANENT_DATA_PURPOSE
                                   ? &disk_Cache->perm_purpose_info.extend_info
                                   : &disk_Cache->temp_purpose_info.extend_info);

  disk_cache_unlock_reserve (extend_info);
}

/*
 * disk_cache_lock_reserve () - lock reservations (permanent or temporary)
 *
 * return           : void
 * extend_info (in) : extend info (permanent or temporary) whose mutex_reserve is locked
 *
 * NOTE: debug builds keep the owner thread index in extend_info->owner_reserve. Locking again from the owner thread
 *       triggers an assertion and returns without touching the mutex.
 */
STATIC_INLINE void
disk_cache_lock_reserve (DISK_EXTEND_INFO * extend_info)
{
#if !defined (NDEBUG)
  int self = thread_get_current_entry_index ();

  if (extend_info->owner_reserve == self)
    {
      /* already owner */
      assert (false);
      return;
    }
#endif /* !NDEBUG */

  pthread_mutex_lock (&extend_info->mutex_reserve);

#if !defined (NDEBUG)
  assert (extend_info->owner_reserve == -1);
  extend_info->owner_reserve = self;
#endif /* !NDEBUG */
}

/*
 * disk_cache_unlock_reserve () - unlock reservations (permanent or temporary)
 *
 * return           : void
 * extend_info (in) : extend info (permanent or temporary) whose mutex_reserve is unlocked
 *
 * NOTE: debug builds assert that the current thread is the registered owner and reset the owner to -1 before the
 *       mutex is released.
 */
STATIC_INLINE void
disk_cache_unlock_reserve (DISK_EXTEND_INFO * extend_info)
{
#if !defined (NDEBUG)
  int self = thread_get_current_entry_index ();

  assert (extend_info->owner_reserve == self);
  extend_info->owner_reserve = -1;
#endif /* !NDEBUG */

  pthread_mutex_unlock (&extend_info->mutex_reserve);
}

/************************************************************************/
/* Disk volume header section                                           */
/************************************************************************/

/*
 * disk_volume_header_start_scan () -  start scan function for show volume header
 *   return: NO_ERROR, or ER_code (ER_DIAG_VOLID_NOT_EXIST when the requested volume id is out of range or unknown)
 *
 *   thread_p(in): thread entry
 *   type (in): not used here
 *   arg_values(in): arg_values[0] must be an integer value holding the volume id
 *   arg_cnt(in): number of arguments (must not be 0)
 *   ptr(in/out): receives the newly allocated volume header context on success; left untouched on error
 */
extern int
disk_volume_header_start_scan (THREAD_ENTRY * thread_p, int type, DB_VALUE ** arg_values, int arg_cnt, void **ptr)
{
  int err = NO_ERROR;
  DISK_VOL_HEADER_CONTEXT *scan_ctx;

  scan_ctx = (DISK_VOL_HEADER_CONTEXT *) db_private_alloc (thread_p, sizeof (DISK_VOL_HEADER_CONTEXT));
  if (scan_ctx == NULL)
    {
      /* nothing to release yet */
      ASSERT_ERROR_AND_SET (err);
      return err;
    }
  memset (scan_ctx, 0, sizeof (DISK_VOL_HEADER_CONTEXT));

  assert (arg_values != NULL && arg_cnt && arg_values[0] != NULL);
  assert (DB_VALUE_TYPE (arg_values[0]) == DB_TYPE_INTEGER);
  scan_ctx->volume_id = db_get_int (arg_values[0]);

  /* reject ids that are out of range first; existence is checked only for ids that fit INT16 */
  if (scan_ctx->volume_id < 0 || scan_ctx->volume_id > DB_INT16_MAX
      || !xdisk_is_volume_exist (thread_p, (INT16) scan_ctx->volume_id))
    {
      er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_DIAG_VOLID_NOT_EXIST, 1, scan_ctx->volume_id);
      db_private_free (thread_p, scan_ctx);
      return ER_DIAG_VOLID_NOT_EXIST;
    }

  *ptr = scan_ctx;
  return NO_ERROR;
}

/*
 * disk_volume_header_next_scan () -  next scan function for show volume header
 *   return: S_END when cursor >= 1 (a single row is produced per scan), S_SUCCESS when the row was filled,
 *           S_ERROR when the volume header could not be read or a string column could not be copied
 *
 *   thread_p(in): thread entry
 *   cursor(in): scan position; only position 0 produces a row
 *   out_values(out): row columns, filled in the order of the statements below
 *   out_cnt(in): expected number of columns (verified by assertion only)
 *   ptr(in/out): volume header context
 */
SCAN_CODE
disk_volume_header_next_scan (THREAD_ENTRY * thread_p, int cursor, DB_VALUE ** out_values, int out_cnt, void *ptr)
{
  DISK_VOLUME_HEADER *vhdr;
  int error = NO_ERROR, idx = 0;
  PAGE_PTR pgptr = NULL;
  DB_DATETIME vol_creation;
  char buf[256];
  DISK_VOL_HEADER_CONTEXT *ctx = (DISK_VOL_HEADER_CONTEXT *) ptr;

  if (cursor >= 1)
    {
      return S_END;
    }

  /* volume header page stays fixed (read latch) until exit */
  error = disk_get_volheader (thread_p, ctx->volume_id, PGBUF_LATCH_READ, &pgptr, &vhdr);
  if (error != NO_ERROR)
    {
      ASSERT_ERROR ();
      goto exit;
    }

  /* fill each column */
  db_make_int (out_values[idx], ctx->volume_id);
  idx++;

  snprintf (buf, sizeof (buf), "MAGIC SYMBOL = %s at disk location = %lld", vhdr->magic,
        offsetof (FILEIO_PAGE, page) + (long long) offsetof (DISK_VOLUME_HEADER, magic));
  error = db_make_string_copy (out_values[idx], buf);
  idx++;
  if (error != NO_ERROR)
    {
      goto exit;
    }

  db_make_int (out_values[idx], vhdr->iopagesize);
  idx++;

  db_make_string (out_values[idx], disk_purpose_to_string (vhdr->purpose));
  idx++;

  db_make_string (out_values[idx], disk_type_to_string (vhdr->type));
  idx++;

  db_make_int (out_values[idx], vhdr->sect_npgs);
  idx++;

  db_make_int (out_values[idx], vhdr->nsect_total);
  idx++;

  /* free sects are not kept in header; the value is read from the disk cache */
  db_make_int (out_values[idx], disk_Cache->vols[ctx->volume_id].nsect_free);
  idx++;

  db_make_int (out_values[idx], vhdr->nsect_max);
  idx++;

  db_make_int (out_values[idx], vhdr->hint_allocsect);
  idx++;

  db_make_int (out_values[idx], vhdr->stab_npages);
  idx++;

  db_make_int (out_values[idx], vhdr->stab_first_page);
  idx++;

  db_make_int (out_values[idx], vhdr->sys_lastpage);
  idx++;

  db_localdatetime ((time_t *) (&vhdr->vol_creation), &vol_creation);
  db_make_datetime (out_values[idx], &vol_creation);
  idx++;

  db_make_int (out_values[idx], vhdr->db_charset);
  idx++;

  /* buf is reused for each formatted column; the value is copied, so reuse is safe */
  lsa_to_string (buf, sizeof (buf), &vhdr->chkpt_lsa);
  error = db_make_string_copy (out_values[idx], buf);
  idx++;
  if (error != NO_ERROR)
    {
      goto exit;
    }

  error = db_make_string_copy (out_values[idx], hfid_to_string (buf, sizeof (buf), &vhdr->boot_hfid));
  idx++;
  if (error != NO_ERROR)
    {
      goto exit;
    }

  /* variable length strings are located in var_fields at the offsets saved in header */
  error = db_make_string_copy (out_values[idx], (char *) (vhdr->var_fields + vhdr->offset_to_vol_fullname));
  idx++;
  if (error != NO_ERROR)
    {
      goto exit;
    }

  db_make_int (out_values[idx], vhdr->next_volid);
  idx++;

  error = db_make_string_copy (out_values[idx], (char *) (vhdr->var_fields + vhdr->offset_to_next_vol_fullname));
  idx++;
  if (error != NO_ERROR)
    {
      goto exit;
    }

  error = db_make_string_copy (out_values[idx], (char *) (vhdr->var_fields + vhdr->offset_to_vol_remarks));
  idx++;
  if (error != NO_ERROR)
    {
      goto exit;
    }

  /* all columns must have been filled */
  assert (idx == out_cnt);

exit:
  /* release the volume header page on both success and error paths */
  if (pgptr)
    {
      pgbuf_unfix (thread_p, pgptr);
    }
  return error == NO_ERROR ? S_SUCCESS : S_ERROR;
}

/*
 * disk_volume_header_end_scan() -- end scan function of show volume header
 *   return: NO_ERROR (always)
 *
 *   thread_p(in): thread entry
 *   ptr(in/out): address of the volume header context pointer; the context is released here
 */
int
disk_volume_header_end_scan (THREAD_ENTRY * thread_p, void **ptr)
{
  /* NOTE(review): judging by its name, the macro also resets *ptr after freeing - confirm against its definition */
  db_private_free_and_init (thread_p, *ptr);
  return NO_ERROR;
}

/*
 * disk_vhdr_dump () - Dump the volume header structure.
 *   return: void
 *   fp(in): output file
 *   vhdr(in): Pointer to volume header
 *
 * Note: This function is used for debugging purposes. The number of free sectors is not part of the header; it is
 *       read from the disk cache entry of the volume.
 */
static void
disk_vhdr_dump (FILE * fp, const DISK_VOLUME_HEADER * vhdr)
{
  char db_created_text[CTIME_MAX];
  char vol_created_text[CTIME_MAX];
  long long magic_location =
    (long long) (offsetof (FILEIO_PAGE, page) + (long long) offsetof (DISK_VOLUME_HEADER, magic));

  /* convert both creation times up front; the texts are printed near the end */
  ctime_r ((time_t *) & vhdr->db_creation, db_created_text);
  ctime_r ((time_t *) & vhdr->vol_creation, vol_created_text);

  /* identification */
  fprintf (fp, " MAGIC SYMBOL = %s at disk location = %lld\n", vhdr->magic, magic_location);
  fprintf (fp, " io_pagesize = %d,\n", vhdr->iopagesize);
  fprintf (fp, " VID = %d, VOL_FULLNAME = %s\n VOL PURPOSE = %s\n VOL TYPE = %s VOL_REMARKS = %s\n",
           vhdr->volid, disk_vhdr_get_vol_fullname (vhdr), disk_purpose_to_string (vhdr->purpose),
           disk_type_to_string (vhdr->type), disk_vhdr_get_vol_remarks (vhdr));
  fprintf (fp, " NEXT_VID = %d, NEXT_VOL_FULLNAME = %s\n", vhdr->next_volid,
           disk_vhdr_get_next_vol_fullname (vhdr));

  /* sectors and sector table */
  fprintf (fp, " LAST SYSTEM PAGE = %d\n", vhdr->sys_lastpage);
  fprintf (fp, " SECTOR: SIZE IN PAGES = %10d, TOTAL = %10d,", vhdr->sect_npgs, vhdr->nsect_total);
  fprintf (fp, " FREE = %10d, MAX=%10d\n", disk_Cache->vols[vhdr->volid].nsect_free, vhdr->nsect_max);
  fprintf (fp, " %10s HINT_ALLOC = %10d\n", " ", vhdr->hint_allocsect);
  fprintf (fp, " SECTOR TABLE:    SIZE IN PAGES = %10d, FIRST_PAGE = %5d\n", vhdr->stab_npages,
           vhdr->stab_first_page);

  /* creation times, recovery and boot information */
  fprintf (fp, " Database creation time = %s", db_created_text);
  fprintf (fp, " Volume creation time = %s", vol_created_text);
  fprintf (fp, " Lowest Checkpoint for recovery = %lld|%lld\n",
           (long long) vhdr->chkpt_lsa.pageid, (long long) vhdr->chkpt_lsa.offset);
  fprintf (fp, "Boot_hfid: volid %d, fileid %d header_pageid %d\n", vhdr->boot_hfid.vfid.volid,
           vhdr->boot_hfid.vfid.fileid, vhdr->boot_hfid.hpgid);
  fprintf (fp, " db_charset = %d\n", vhdr->db_charset);
}

/*
 * disk_volume_header_set_stab () - knowing the maximum number of sectors, set header fields for sector tables
 *
 * return               : void
 * vol_purpose (in)     : not used by the current implementation
 * volheader (in/out)   : volume header; nsect_max is read, stab_first_page, stab_npages and sys_lastpage are set
 */
STATIC_INLINE void
disk_volume_header_set_stab (DB_VOLPURPOSE vol_purpose, DISK_VOLUME_HEADER * volheader)
{
  /* sector table starts on the page that follows the volume header page */
  volheader->stab_first_page = DISK_VOLHEADER_PAGE + 1;
  /* table is sized for the maximum number of sectors, not for the current total */
  volheader->stab_npages = CEIL_PTVDIV (volheader->nsect_max, DISK_STAB_PAGE_BIT_COUNT);
  /* last system page is the last page of the sector table */
  volheader->sys_lastpage = volheader->stab_first_page + volheader->stab_npages - 1;
}

/*
 * disk_verify_volume_header () - debug-only consistency checks on a volume header page
 *   return: void
 *   thread_p(in): thread entry
 *   pgptr(in): Pointer to volume header page
 *
 * Note: The whole body is compiled out when NDEBUG is defined. All checks are assertions.
 */
STATIC_INLINE void
disk_verify_volume_header (THREAD_ENTRY * thread_p, PAGE_PTR pgptr)
{
#if !defined (NDEBUG)
  DISK_VOLUME_HEADER *vhdr;

  assert (pgptr != NULL);
  (void) pgbuf_check_page_ptype (thread_p, pgptr, PAGE_VOLHEADER);
  vhdr = (DISK_VOLUME_HEADER *) pgptr;

  /* sector counts */
  assert (vhdr->sect_npgs == DISK_SECTOR_NPAGES);
  assert (vhdr->nsect_total > 0);
  assert (vhdr->nsect_total <= vhdr->nsect_max);
  DISK_SECTS_ASSERT_ROUNDED (vhdr->nsect_total);
  DISK_SECTS_ASSERT_ROUNDED (vhdr->nsect_max);
  /* cached free count is checked only when the disk cache exists */
  assert (disk_Cache == NULL || disk_Cache->vols[vhdr->volid].nsect_free <= vhdr->nsect_total);

  /* sector table layout: sized for nsect_max and located right after the header page */
  assert (vhdr->stab_first_page == DISK_VOLHEADER_PAGE + 1);
  assert (vhdr->stab_npages >= CEIL_PTVDIV (vhdr->nsect_total, DISK_STAB_PAGE_BIT_COUNT));
  assert (vhdr->stab_npages == CEIL_PTVDIV (vhdr->nsect_max, DISK_STAB_PAGE_BIT_COUNT));
  assert (vhdr->sys_lastpage == (vhdr->stab_first_page + vhdr->stab_npages - 1));

  /* purpose/type: a temporary type volume cannot have permanent purpose */
  assert (vhdr->purpose == DB_PERMANENT_DATA_PURPOSE || vhdr->purpose == DB_TEMPORARY_DATA_PURPOSE);
  assert (vhdr->type == DB_PERMANENT_VOLTYPE || vhdr->type == DB_TEMPORARY_VOLTYPE);
  assert (vhdr->type != DB_TEMPORARY_VOLTYPE || vhdr->purpose != DB_PERMANENT_DATA_PURPOSE);
#endif /* !NDEBUG */
}

/*
 * disk_get_volheader_internal () - fix the volume header page and return both the page and the header it stores
 *
 * return                   : error code
 * thread_p (in)            : thread entry
 * volid (in)               : volume id
 * latch_mode (in)          : latch mode for volume header page
 * page_volheader_out (out) : output volume header page (NULL when the page could not be fixed)
 * volheader_out (out)      : output volume header (set only on success)
 * file (in)                : (debug only) caller file
 * line (in)                : (debug only) caller line
 *
 * NOTE: on success the page remains fixed; releasing it is left to the caller.
 */
STATIC_INLINE int
disk_get_volheader_internal (THREAD_ENTRY * thread_p, VOLID volid, PGBUF_LATCH_MODE latch_mode,
                 PAGE_PTR * page_volheader_out, DISK_VOLUME_HEADER ** volheader_out
#if !defined (NDEBUG)
                 , const char *file, int line
#endif              /* !NDEBUG */
  )
{
  VPID header_vpid;
  PAGE_PTR header_page;
  int rc = NO_ERROR;

  header_vpid.pageid = DISK_VOLHEADER_PAGE;
  header_vpid.volid = volid;

  header_page = pgbuf_fix (thread_p, &header_vpid, OLD_PAGE, latch_mode, PGBUF_UNCONDITIONAL_LATCH);
  *page_volheader_out = header_page;
  if (header_page == NULL)
    {
      ASSERT_ERROR_AND_SET (rc);
      return rc;
    }

  /* header is stored at the start of the page */
  disk_verify_volume_header (thread_p, header_page);
  *volheader_out = (DISK_VOLUME_HEADER *) header_page;

  return NO_ERROR;
}

/************************************************************************/
/* Disk allocation table section.                                       */
/************************************************************************/

/*
 * disk_stab_cursor_set_at_sectid () - Position cursor for allocation table at sector ID.
 *
 * return         : Void.
 * volheader (in) : Volume header.
 * sectid (in)    : Sector ID; must be in [0, volheader->nsect_total].
 * cursor (out)   : Allocation table cursor. No page is fixed; page and unit are reset to NULL.
 */
STATIC_INLINE void
disk_stab_cursor_set_at_sectid (const DISK_VOLUME_HEADER * volheader, SECTID sectid, DISK_STAB_CURSOR * cursor)
{
  assert (volheader != NULL);
  assert (cursor != NULL);
  assert (0 <= sectid && sectid <= volheader->nsect_total);

  /* nothing is fixed by positioning */
  cursor->page = NULL;
  cursor->unit = NULL;

  cursor->volheader = volheader;
  cursor->sectid = sectid;

  /* split sector id into bit in unit, unit in page and page in table */
  cursor->offset_to_bit = DISK_ALLOCTBL_SECTOR_BIT_OFFSET (sectid);
  cursor->offset_to_unit = DISK_ALLOCTBL_SECTOR_UNIT_OFFSET (sectid);
  cursor->pageid = volheader->stab_first_page + DISK_ALLOCTBL_SECTOR_PAGE_OFFSET (sectid);
  assert (cursor->pageid < volheader->stab_first_page + volheader->stab_npages);
}

/*
 * disk_stab_cursor_set_at_end () - Position cursor at the end of allocation table.
 *
 * return         : Void.
 * volheader (in) : Volume header.
 * cursor (out)   : Allocation table cursor.
 *
 * NOTE: "end" is the position of sector ID volheader->nsect_total, i.e. one past the last sector of the volume.
 */
STATIC_INLINE void
disk_stab_cursor_set_at_end (const DISK_VOLUME_HEADER * volheader, DISK_STAB_CURSOR * cursor)
{
  assert (volheader != NULL);
  assert (cursor != NULL);

  cursor->volheader = volheader;

  /* total number of sectors is expected to be rounded (see DISK_SECTS_ASSERT_ROUNDED) */
  DISK_SECTS_ASSERT_ROUNDED (volheader->nsect_total);
  disk_stab_cursor_set_at_sectid (volheader, volheader->nsect_total, cursor);
}

/*
 * disk_stab_cursor_set_at_start () - Position cursor at the start of allocation table.
 *
 * return         : Void.
 * volheader (in) : Volume header.
 * cursor (out)   : Allocation table cursor; positioned on sector 0 (first bit of first unit of first table page).
 *                  No page is fixed.
 */
STATIC_INLINE void
disk_stab_cursor_set_at_start (const DISK_VOLUME_HEADER * volheader, DISK_STAB_CURSOR * cursor)
{
  assert (volheader != NULL);
  assert (cursor != NULL);

  /* nothing is fixed by positioning */
  cursor->page = NULL;
  cursor->unit = NULL;

  /* first bit of first unit in first page is sector 0 */
  cursor->volheader = volheader;
  cursor->pageid = volheader->stab_first_page;
  cursor->offset_to_unit = 0;
  cursor->offset_to_bit = 0;
  cursor->sectid = 0;
}

/*
 * disk_stab_cursor_compare () - Compare the positions of two allocation table cursors.
 *
 * return             : -1 if first cursor is positioned before second cursor.
 *                       0 if both cursors have the same position.
 *                       1 if first cursor is positioned after second cursor.
 * first_cursor (in)  : First cursor.
 * second_cursor (in) : Second cursor.
 *
 * NOTE: positions are ordered by page ID, then by unit offset, then by bit offset. sectid is not compared.
 */
STATIC_INLINE int
disk_stab_cursor_compare (const DISK_STAB_CURSOR * first_cursor, const DISK_STAB_CURSOR * second_cursor)
{
  assert (first_cursor != NULL);
  assert (second_cursor != NULL);

  /* the most significant field that differs decides */
  if (first_cursor->pageid != second_cursor->pageid)
    {
      return (first_cursor->pageid < second_cursor->pageid) ? -1 : 1;
    }
  if (first_cursor->offset_to_unit != second_cursor->offset_to_unit)
    {
      return (first_cursor->offset_to_unit < second_cursor->offset_to_unit) ? -1 : 1;
    }
  if (first_cursor->offset_to_bit != second_cursor->offset_to_bit)
    {
      return (first_cursor->offset_to_bit < second_cursor->offset_to_bit) ? -1 : 1;
    }

  return 0;
}

/*
 * disk_stab_cursor_check_valid () - Check allocation table cursor validity.
 *
 * return      : void.
 * cursor (in) : Allocation table cursor.
 *
 * NOTE: This is a debug function. All checks are assertions: position inside the sector table, consistency between
 *       (pageid, offset_to_unit, offset_to_bit) and sectid, offsets within bounds, and unit pointer consistent with
 *       the fixed page.
 */
STATIC_INLINE void
disk_stab_cursor_check_valid (const DISK_STAB_CURSOR * cursor)
{
  assert (cursor != NULL);

  if (cursor->pageid == NULL_PAGEID)
    {
      /* Must be recovery. */
      assert (!LOG_ISRESTARTED ());
    }
  else
    {
      /* Cursor must have valid volheader pointer. */
      assert (cursor->volheader != NULL);

      /* Cursor must have a valid position. */
      assert (cursor->pageid >= cursor->volheader->stab_first_page);
      assert (cursor->pageid < cursor->volheader->stab_first_page + cursor->volheader->stab_npages);
      /* Sector ID must be the bit index of cursor position counted from the start of the table. */
      assert ((cursor->pageid - cursor->volheader->stab_first_page) * DISK_STAB_PAGE_BIT_COUNT
          + cursor->offset_to_unit * DISK_STAB_UNIT_BIT_COUNT + cursor->offset_to_bit == cursor->sectid);
    }

  /* Offsets must be inside page/unit, whether the page ID is known or not. */
  assert (cursor->offset_to_unit >= 0);
  assert (cursor->offset_to_unit < DISK_STAB_PAGE_UNITS_COUNT);
  assert (cursor->offset_to_bit >= 0);
  assert (cursor->offset_to_bit < DISK_STAB_UNIT_BIT_COUNT);

  if (cursor->unit != NULL)
    {
      /* Must have a page fixed */
      assert (cursor->page != NULL);
      /* Unit pointer must match offset_to_unit */
      assert ((int) ((char *) cursor->unit - cursor->page) == (int) (cursor->offset_to_unit * DISK_STAB_UNIT_SIZE_OF));
    }
}

/*
 * disk_stab_cursor_is_bit_set () - Is bit set on current allocation table cursor.
 *
 * return      : True if bit is set, false otherwise.
 * cursor (in) : Allocation table cursor. cursor->unit is dereferenced, so it must point to a unit (see
 *               disk_stab_cursor_fix).
 */
STATIC_INLINE bool
disk_stab_cursor_is_bit_set (const DISK_STAB_CURSOR * cursor)
{
  disk_stab_cursor_check_valid (cursor);

  /* update if unit size is changed (a 64-bit helper is used on the unit) */
  return bit64_is_set (*cursor->unit, cursor->offset_to_bit);
}

/*
 * disk_stab_cursor_set_bit () - Set bit on current allocation table cursor.
 *
 * return          : Void.
 * cursor (in/out) : Allocation table cursor. The unit it points to is modified in place.
 *
 * NOTE: the bit must not be already set (asserted). Logging and marking the page dirty are not done here.
 */
STATIC_INLINE void
disk_stab_cursor_set_bit (DISK_STAB_CURSOR * cursor)
{
  disk_stab_cursor_check_valid (cursor);

  /* setting a bit twice is not expected */
  assert (!disk_stab_cursor_is_bit_set (cursor));

  /* update if unit size is changed (a 64-bit helper is used on the unit) */
  *cursor->unit = bit64_set (*cursor->unit, cursor->offset_to_bit);
}

/*
 * disk_stab_cursor_clear_bit () - Clear bit on current allocation table cursor.
 *
 * return          : Void.
 * cursor (in/out) : Allocation table cursor. The unit it points to is modified in place.
 *
 * NOTE: the bit must be set before it is cleared (asserted); this is the reverse of the precondition checked by
 *       disk_stab_cursor_set_bit. Logging and marking the page dirty are not done here.
 */
STATIC_INLINE void
disk_stab_cursor_clear_bit (DISK_STAB_CURSOR * cursor)
{
  disk_stab_cursor_check_valid (cursor);

  /* clearing a bit that is not set is not expected */
  assert (disk_stab_cursor_is_bit_set (cursor));

  /* update if unit size is changed (a 64-bit helper is used on the unit) */
  *cursor->unit = bit64_clear (*cursor->unit, cursor->offset_to_bit);
}

/*
 * disk_stab_cursor_get_bit_index_in_page () - Get the index of cursor bit in allocation table page.
 *
 * return      : Bit index in page, counted from the first bit of the first unit in page.
 * cursor (in) : Allocation table cursor.
 */
STATIC_INLINE int
disk_stab_cursor_get_bit_index_in_page (const DISK_STAB_CURSOR * cursor)
{
  disk_stab_cursor_check_valid (cursor);

  /* bits in preceding units + bit offset in current unit */
  return cursor->offset_to_unit * DISK_STAB_UNIT_BIT_COUNT + cursor->offset_to_bit;
}

/*
 * disk_stab_cursor_get_sectid () - Get the sector ID of cursor.
 *
 * return      : Sector ID saved in cursor.
 * cursor (in) : Allocation table cursor.
 */
STATIC_INLINE SECTID
disk_stab_cursor_get_sectid (const DISK_STAB_CURSOR * cursor)
{
  disk_stab_cursor_check_valid (cursor);

  return cursor->sectid;
}

/*
 * disk_stab_cursor_fix () - Fix the sector table page the cursor is positioned on.
 *
 * return          : Error code.
 * thread_p (in)   : Thread entry.
 * cursor (in/out) : Allocation table cursor. Must not have a page fixed. On success, page is set to the fixed page
 *                   and unit points to the unit at offset_to_unit in that page. On failure, both are NULL.
 * latch_mode (in) : Page latch mode.
 */
STATIC_INLINE int
disk_stab_cursor_fix (THREAD_ENTRY * thread_p, DISK_STAB_CURSOR * cursor, PGBUF_LATCH_MODE latch_mode)
{
  VPID stab_vpid = VPID_INITIALIZER;
  int rc = NO_ERROR;

  assert (cursor->page == NULL);

  cursor->unit = NULL;

  /* Page is identified by the volume of the header and the page ID of the cursor. */
  stab_vpid.pageid = cursor->pageid;
  stab_vpid.volid = cursor->volheader->volid;

  cursor->page = pgbuf_fix (thread_p, &stab_vpid, OLD_PAGE, latch_mode, PGBUF_UNCONDITIONAL_LATCH);
  if (cursor->page != NULL)
    {
      /* Page is handled as an array of units. */
      cursor->unit = ((DISK_STAB_UNIT *) cursor->page) + cursor->offset_to_unit;
      return NO_ERROR;
    }

  ASSERT_ERROR_AND_SET (rc);
  return rc;
}

/*
 * disk_stab_cursor_unfix () - Unfix page from allocation table cursor.
 *
 * return          : Void.
 * thread_p (in)   : Thread entry.
 * cursor (in/out) : Allocation table cursor. Safe to call when no page is fixed.
 */
STATIC_INLINE void
disk_stab_cursor_unfix (THREAD_ENTRY * thread_p, DISK_STAB_CURSOR * cursor)
{
  if (cursor->page != NULL)
    {
      pgbuf_unfix_and_init (thread_p, cursor->page);
    }
  /* unit pointed inside the page, so it is reset in all cases */
  cursor->unit = NULL;
}

/*
 * disk_stab_unit_reserve () - DISK_STAB_UNIT_FUNC function used to lookup and reserve free sectors
 *
 * return            : NO_ERROR
 * thread_p (in)     : thread entry
 * cursor (in/out)   : disk sector table cursor; the unit is modified and, when the unit is partially used,
 *                     offset_to_bit and sectid are advanced while bits are scanned
 * stop (out)        : output true when all required sectors are reserved
 * args (in/out)     : reserve context (DISK_RESERVE_CONTEXT); nsects_lastvol_remaining is decremented and vsidp is
 *                     advanced for each reserved sector
 */
static int
disk_stab_unit_reserve (THREAD_ENTRY * thread_p, DISK_STAB_CURSOR * cursor, bool * stop, void *args)
{
  DISK_RESERVE_CONTEXT *context;
  DISK_STAB_UNIT log_unit;
  SECTID sectid;

  /* how it works
   * look for unset bits and reserve the required number of sectors.
   * we have two special cases, which can be very usual:
   * 1. full unit - nothing can be reserved, so we early out
   * 2. empty unit - we can consume it all (if we need it all) or just trailing bits.
   * otherwise, we iterate bit by bit and reserve free sectors.
   */

  if (*cursor->unit == BIT64_FULL)
    {
      /* nothing free */
      return NO_ERROR;
    }

  context = (DISK_RESERVE_CONTEXT *) args;
  assert (context->nsects_lastvol_remaining > 0);

  if (*cursor->unit == 0)
    {
      /* empty unit. set all required bits */
      int bits_to_set = MIN (context->nsects_lastvol_remaining, DISK_STAB_UNIT_BIT_COUNT);
      int i;

      if (bits_to_set == DISK_STAB_UNIT_BIT_COUNT)
    {
      /* Consume all unit */
      *cursor->unit = BIT64_FULL;
    }
      else
    {
      /* consume only part of unit */
      *cursor->unit = bit64_set_trailing_bits (*cursor->unit, bits_to_set);
    }
      /* what we log: unit was empty, so the reserved bits are all the bits that are now set */
      log_unit = *cursor->unit;

      /* update reserve context */
      context->nsects_lastvol_remaining -= bits_to_set;

      /* save sector ids: reserved sectors are consecutive, starting with cursor sector */
      for (i = 0, sectid = disk_stab_cursor_get_sectid (cursor); i < bits_to_set; i++, sectid++)
    {
      context->vsidp->volid = cursor->volheader->volid;
      context->vsidp->sectid = sectid;
      context->vsidp++;

      disk_log ("disk_stab_unit_reserve", "reserved sectid %d in volume %d.", sectid, cursor->volheader->volid);
    }
    }
  else
    {
      /* iterate through unit bits. trailing ones are skipped, and sectid is kept in sync with offset_to_bit */
      log_unit = 0;
      for (cursor->offset_to_bit = bit64_count_trailing_ones (*cursor->unit), cursor->sectid += cursor->offset_to_bit;
       cursor->offset_to_bit < DISK_STAB_UNIT_BIT_COUNT && context->nsects_lastvol_remaining > 0;
       cursor->offset_to_bit++, cursor->sectid++)
    {
      if (!disk_stab_cursor_is_bit_set (cursor))
        {
          /* reserve this sector */
          disk_stab_cursor_set_bit (cursor);

          /* update what we log */
          log_unit = bit64_set (log_unit, cursor->offset_to_bit);

          /* update context */
          context->nsects_lastvol_remaining--;

          /* save vsid */
          context->vsidp->volid = cursor->volheader->volid;
          context->vsidp->sectid = cursor->sectid;
          context->vsidp++;

          disk_log ("disk_stab_unit_reserve", "reserved sectid %d in volume %d.", cursor->sectid,
            cursor->volheader->volid);
        }
    }
    }

  /* safe guard: we must have done something, so log_unit cannot be 0 */
  assert (log_unit != 0);
  /* safe guard: all bits in log_unit must be set */
  assert ((log_unit & *cursor->unit) == log_unit);
  if (context->purpose == DB_PERMANENT_DATA_PURPOSE)
    {
      /* log changes: only permanent purpose is logged. same bit mask is used as undo and redo data */
      log_append_undoredo_data2 (thread_p, RVDK_RESERVE_SECTORS, NULL, cursor->page, cursor->offset_to_unit,
                 sizeof (log_unit), sizeof (log_unit), &log_unit, &log_unit);
    }
  /* page was modified */
  pgbuf_set_dirty (thread_p, cursor->page, DONT_FREE);

  if (context->nsects_lastvol_remaining <= 0)
    {
      /* all required sectors are reserved, we can stop now */
      assert (context->nsects_lastvol_remaining == 0);
      *stop = true;
    }
  return NO_ERROR;
}

/*
 * disk_stab_iterate_units () - iterate through units between start and end and call argument function. start and end
 *                              cursor should be aligned.
 *
 * return           : error code
 * thread_p (in)    : thread entry
 * volheader (in)   : volume header
 * mode (in)        : page latch mode
 * start (in)       : start cursor (must be positioned before end)
 * end (in)         : end cursor; the unit at end position is not processed
 * f_unit (in)      : function called on each unit
 * f_unit_args (in) : argument for unit function
 *
 * NOTE: iteration works on a copy of start cursor. One sector table page is fixed at a time and it is unfixed before
 *       moving to next page or returning. Iteration ends early, with the error of f_unit if it fails, or with
 *       NO_ERROR if f_unit outputs stop.
 */
static int
disk_stab_iterate_units (THREAD_ENTRY * thread_p, const DISK_VOLUME_HEADER * volheader, PGBUF_LATCH_MODE mode,
             DISK_STAB_CURSOR * start, DISK_STAB_CURSOR * end, DISK_STAB_UNIT_FUNC f_unit,
             void *f_unit_args)
{
  DISK_STAB_CURSOR cursor;
  DISK_STAB_UNIT *end_unit;
  bool stop = false;
  int error_code = NO_ERROR;

  assert (volheader != NULL);
  assert (start->offset_to_bit >= 0);
  assert (end->offset_to_bit >= 0);
  assert (disk_stab_cursor_compare (start, end) < 0);

  /* iterate through pages. only the first page may start with a non-zero unit offset */
  for (cursor = *start; cursor.pageid <= end->pageid; cursor.pageid++, cursor.offset_to_unit = 0)
    {
      assert (cursor.page == NULL);
      disk_stab_cursor_check_valid (&cursor);

      error_code = disk_stab_cursor_fix (thread_p, &cursor, mode);
      if (error_code != NO_ERROR)
    {
      ASSERT_ERROR ();
      return error_code;
    }

      /* iterate through units */

      /* set end_unit: unit of end cursor if this is its page, or the end of page otherwise */
      end_unit = ((DISK_STAB_UNIT *) cursor.page)
    + (cursor.pageid == end->pageid ? end->offset_to_unit : DISK_STAB_PAGE_UNITS_COUNT);

      /* iterate. f_unit may have advanced offset_to_bit (and sectid with it), so sectid is moved only by the bits
       * that remain until next unit */
      for (; cursor.unit < end_unit;
       cursor.unit++, cursor.offset_to_unit++, cursor.sectid += (DISK_STAB_UNIT_BIT_COUNT - cursor.offset_to_bit),
       cursor.offset_to_bit = 0)
    {
      disk_stab_cursor_check_valid (&cursor);

      /* call unit function */
      error_code = f_unit (thread_p, &cursor, &stop, f_unit_args);
      if (error_code != NO_ERROR)
        {
          ASSERT_ERROR ();
          disk_stab_cursor_unfix (thread_p, &cursor);
          return error_code;
        }
      if (stop)
        {
          /* stop */
          disk_stab_cursor_unfix (thread_p, &cursor);
          return NO_ERROR;
        }
    }

      disk_stab_cursor_unfix (thread_p, &cursor);
    }

  return NO_ERROR;
}

/*
 * disk_stab_iterate_units_all () - iterate through all sector table units and apply function
 *
 * return           : error code
 * thread_p (in)    : thread entry
 * volheader (in)   : volume header
 * mode (in)        : page latch mode
 * f_unit (in)      : function to apply for each unit
 * f_unit_args (in) : argument for unit function
 *
 * NOTE: "all" covers the units of sectors [0, volheader->nsect_total), as given by the start and end cursors.
 */
static int
disk_stab_iterate_units_all (THREAD_ENTRY * thread_p, const DISK_VOLUME_HEADER * volheader, PGBUF_LATCH_MODE mode,
                 DISK_STAB_UNIT_FUNC f_unit, void *f_unit_args)
{
  DISK_STAB_CURSOR start, end;

  disk_stab_cursor_set_at_start (volheader, &start);
  disk_stab_cursor_set_at_end (volheader, &end);

  return disk_stab_iterate_units (thread_p, volheader, mode, &start, &end, f_unit, f_unit_args);
}

/*
 * disk_stab_count_free () - DISK_STAB_UNIT_FUNC to count free sectors
 *
 * return        : NO_ERROR
 * thread_p (in) : thread entry
 * cursor (in)   : disk sector table cursor
 * stop (in)     : not used
 * args (in/out) : pointer to DKNSECTS total of free sectors; the number of unset bits in unit is added to it
 */
static int
disk_stab_count_free (THREAD_ENTRY * thread_p, DISK_STAB_CURSOR * cursor, bool * stop, void *args)
{
  DKNSECTS *free_total = (DKNSECTS *) args;

  /* each unset bit in unit is a free sector */
  *free_total = *free_total + bit64_count_zeros (*cursor->unit);

  return NO_ERROR;
}

/*
 * disk_stab_has_used () - DISK_STAB_UNIT_FUNC to determine whether at least one sector in the unit is in use
 *
 * return          : NO_ERROR
 * thread_p (in)   : thread entry
 * cursor (in/out) : disk sector table cursor; offset_to_bit and sectid are advanced while bits are checked
 * stop (out)      : output true when at least one sector in the unit is in use (not modified otherwise)
 * args (out)      : pointer to bool; output true when at least one sector in the unit is in use, false otherwise
 */
static int
disk_stab_has_used (THREAD_ENTRY * thread_p, DISK_STAB_CURSOR * cursor, bool * stop, void *args)
{
  bool *found_used = (bool *) args;

  /* result is reset for each unit */
  *found_used = false;

  while (cursor->offset_to_bit < DISK_STAB_UNIT_BIT_COUNT)
    {
      if (disk_stab_cursor_is_bit_set (cursor))
        {
          /* first used sector is enough; cursor is left on it */
          *stop = true;
          *found_used = true;
          return NO_ERROR;
        }

      cursor->offset_to_bit++;
      cursor->sectid++;
    }

  return NO_ERROR;
}

/*
 * disk_stab_set_bits_contiguous () - set first bits
 *
 * return          : NO_ERROR
 * thread_p (in)   : thread entry
 * cursor (in/out) : sector table cursor; the unit it points to is overwritten
 * stop (out)      : output true when all required bits are set
 * args (in/out)   : pointer to DKNSECTS remaining number of bits to set; decremented by the bits set in this unit
 *
 * NOTE: the previous content of unit is not preserved. Logging and marking the page dirty are not done here.
 */
static int
disk_stab_set_bits_contiguous (THREAD_ENTRY * thread_p, DISK_STAB_CURSOR * cursor, bool * stop, void *args)
{
  DKNSECTS *nsectsp = (DKNSECTS *) args;

  if (*nsectsp > DISK_STAB_UNIT_BIT_COUNT)
    {
      /* more than one unit is still required: consume all unit and continue with the next one */
      *cursor->unit = BIT64_FULL;
      (*nsectsp) -= DISK_STAB_UNIT_BIT_COUNT;
    }
  else
    {
      /* this is the last unit to set */
      if (*nsectsp == DISK_STAB_UNIT_BIT_COUNT)
        {
          /* exactly one full unit. same as disk_stab_unit_reserve, the full mask is used instead of asking
           * bit64_set_trailing_bits to set every bit of the unit (NOTE(review): its behavior for a count equal to
           * the unit width is not visible here - confirm) */
          *cursor->unit = BIT64_FULL;
        }
      else
        {
          *cursor->unit = bit64_set_trailing_bits (0, *nsectsp);
        }
      *nsectsp = 0;
      *stop = true;
    }

  return NO_ERROR;
}

/*
 * disk_stab_dump_unit () - dump a unit in sector table
 *
 * return          : NO_ERROR
 * thread_p (in)   : thread entry
 * cursor (in/out) : sector table cursor; must be at the first bit of unit. offset_to_bit and sectid are advanced
 *                   over all bits of unit
 * stop (in)       : not used
 * args (in/out)   : output file
 *
 * NOTE: output is a new line that starts with the sector ID of first bit, followed by the bits of unit printed as
 *       0/1 in groups of CHAR_BIT.
 */
static int
disk_stab_dump_unit (THREAD_ENTRY * thread_p, DISK_STAB_CURSOR * cursor, bool * stop, void *args)
{
  FILE *out = (FILE *) args;

  assert (cursor->offset_to_bit == 0);

  fprintf (out, "\n%10d", cursor->sectid);

  while (cursor->offset_to_bit < DISK_STAB_UNIT_BIT_COUNT)
    {
      /* separate groups of bits */
      if ((cursor->offset_to_bit % CHAR_BIT) == 0)
        {
          fprintf (out, " ");
        }

      fprintf (out, "%d", disk_stab_cursor_is_bit_set (cursor) ? 1 : 0);

      cursor->offset_to_bit++;
      cursor->sectid++;
    }

  return NO_ERROR;
}

/*
 * disk_stab_dump () - Dump the content of the sector table
 *   return: NO_ERROR, or the error returned while iterating the sector table units
 *   thread_p(in): thread entry
 *   fp(in): output file
 *   volheader(in): volume header
 *
 * Note: This function is used for debugging purposes. Sector table pages are fixed with read latch.
 */
static int
disk_stab_dump (THREAD_ENTRY * thread_p, FILE * fp, const DISK_VOLUME_HEADER * volheader)
{
  int rc;

  /* title and column header; each unit is then dumped on its own line */
  fprintf (fp, "SECTOR TABLE BITMAP:\n\n");
  fprintf (fp, "           01234567 01234567 01234567 01234567 01234567 01234567 01234567 01234567");

  rc = disk_stab_iterate_units_all (thread_p, volheader, PGBUF_LATCH_READ, disk_stab_dump_unit, fp);
  if (rc == NO_ERROR)
    {
      fprintf (fp, "\n");
      return NO_ERROR;
    }

  ASSERT_ERROR ();
  return rc;
}

/************************************************************************/
/* Sector reserve section                                               */
/************************************************************************/

/*
 * disk_rv_reserve_sectors () - Recovery function that sets reserved bits in one unit of a sector table page.
 *
 * return        : NO_ERROR on success, ER_FAILED if the disk check critical section or the page cannot be obtained.
 * thread_p (in) : Thread entry.
 * rcv (in/out)  : Recovery data. rcv->data holds the bit mask to set, rcv->offset is the index of the unit in the
 *                 page and rcv->pgptr is the sector table page (it may be unfixed and fixed again here).
 */
int
disk_rv_reserve_sectors (THREAD_ENTRY * thread_p, LOG_RCV * rcv)
{
  DISK_STAB_UNIT reserve_mask = *(DISK_STAB_UNIT *) rcv->data;
  DISK_STAB_UNIT *unit_in_page = NULL;
  DB_VOLPURPOSE vol_purpose;
  VOLID vol_id;
  DKNSECTS n_reserved;
  int csect_rc;

  assert (rcv->pgptr != NULL);
  assert (rcv->length == sizeof (reserve_mask));
  assert (rcv->offset >= 0 && rcv->offset < DISK_STAB_PAGE_UNITS_COUNT);

  /* enter CSECT_DISK_CHECK as reader so that we do not conflict with disk check. first attempt uses a zero wait
   * argument. */
  csect_rc = csect_enter_as_reader (thread_p, CSECT_DISK_CHECK, 0);
  if (csect_rc == ETIMEDOUT)
    {
      /* disk check is in progress. release the page, wait for the critical section, then fix the page again
       * before changing it. */
      VPID stab_vpid;

      pgbuf_get_vpid (rcv->pgptr, &stab_vpid);
      pgbuf_unfix_and_init (thread_p, rcv->pgptr);

      if (csect_enter_as_reader (thread_p, CSECT_DISK_CHECK, INF_WAIT) != NO_ERROR)
        {
          assert_release (false);
          return ER_FAILED;
        }

      rcv->pgptr = pgbuf_fix (thread_p, &stab_vpid, OLD_PAGE, PGBUF_LATCH_WRITE, PGBUF_UNCONDITIONAL_LATCH);
      if (rcv->pgptr == NULL)
        {
          /* should not fail */
          assert_release (false);
          csect_exit (thread_p, CSECT_DISK_CHECK);
          return ER_FAILED;
        }
    }
  else if (csect_rc != NO_ERROR)
    {
      assert_release (false);
      return ER_FAILED;
    }
  /* else: critical section entered on the first attempt */

#if !defined (NDEBUG)
  pgbuf_check_page_ptype (thread_p, rcv->pgptr, PAGE_VOLBITMAP);
#endif /* !NDEBUG */

  /* none of the bits may be set already */
  unit_in_page = ((DISK_STAB_UNIT *) rcv->pgptr) + rcv->offset;
  assert (((*unit_in_page) & reserve_mask) == 0);
  *unit_in_page |= reserve_mask;

  pgbuf_set_dirty (thread_p, rcv->pgptr, DONT_FREE);

  /* update cache: the volume has as many fewer free sectors as there are bits in the mask */
  vol_id = pgbuf_get_volume_id (rcv->pgptr);
  vol_purpose = disk_get_volpurpose (vol_id);
  n_reserved = bit64_count_ones (reserve_mask);

  disk_cache_lock_reserve_for_purpose (vol_purpose);
  disk_cache_update_vol_free (vol_id, -n_reserved);
  disk_cache_unlock_reserve_for_purpose (vol_purpose);

  disk_log ("disk_rv_reserve_sectors", "reserved %d sectors in " PGBUF_PAGE_STATE_MSG ("sector table page")
            "\n\tstab_unit = " BIT64_HEXA_PRINT_FORMAT "\n\trv_unit = " BIT64_HEXA_PRINT_FORMAT,
            n_reserved, PGBUF_PAGE_STATE_ARGS (rcv->pgptr), *unit_in_page, reserve_mask);

  csect_exit (thread_p, CSECT_DISK_CHECK);
  return NO_ERROR;
}

/*
 * disk_rv_unreserve_sectors () - Recovery function that clears reserved bits in one unit of a sector table page.
 *
 * return        : NO_ERROR on success, ER_FAILED if the disk check critical section or the page cannot be obtained.
 * thread_p (in) : Thread entry.
 * rcv (in/out)  : Recovery data. rcv->data holds the bit mask to clear, rcv->offset is the index of the unit in the
 *                 page and rcv->pgptr is the sector table page (it may be unfixed and fixed again here).
 */
int
disk_rv_unreserve_sectors (THREAD_ENTRY * thread_p, LOG_RCV * rcv)
{
  DISK_STAB_UNIT unreserve_mask = *(DISK_STAB_UNIT *) rcv->data;
  DISK_STAB_UNIT *unit_in_page = NULL;
  DB_VOLPURPOSE vol_purpose;
  VOLID vol_id;
  DKNSECTS n_unreserved;
  int csect_rc;

  assert (rcv->pgptr != NULL);
  assert (rcv->length == sizeof (unreserve_mask));
  assert (rcv->offset >= 0 && rcv->offset < DISK_STAB_PAGE_UNITS_COUNT);

  /* enter CSECT_DISK_CHECK as reader so that we do not conflict with disk check. first attempt uses a zero wait
   * argument. */
  csect_rc = csect_enter_as_reader (thread_p, CSECT_DISK_CHECK, 0);
  if (csect_rc == ETIMEDOUT)
    {
      /* disk check is in progress. release the page, wait for the critical section, then fix the page again
       * before changing it. */
      VPID stab_vpid;

      pgbuf_get_vpid (rcv->pgptr, &stab_vpid);
      pgbuf_unfix_and_init (thread_p, rcv->pgptr);

      if (csect_enter_as_reader (thread_p, CSECT_DISK_CHECK, INF_WAIT) != NO_ERROR)
        {
          assert_release (false);
          return ER_FAILED;
        }

      rcv->pgptr = pgbuf_fix (thread_p, &stab_vpid, OLD_PAGE, PGBUF_LATCH_WRITE, PGBUF_UNCONDITIONAL_LATCH);
      if (rcv->pgptr == NULL)
        {
          /* should not fail */
          assert_release (false);
          csect_exit (thread_p, CSECT_DISK_CHECK);
          return ER_FAILED;
        }
    }
  else if (csect_rc != NO_ERROR)
    {
      assert_release (false);
      return ER_FAILED;
    }
  /* else: critical section entered on the first attempt */

#if !defined (NDEBUG)
  pgbuf_check_page_ptype (thread_p, rcv->pgptr, PAGE_VOLBITMAP);
#endif /* !NDEBUG */

  /* all of the bits must be set before they are cleared */
  unit_in_page = ((DISK_STAB_UNIT *) rcv->pgptr) + rcv->offset;
  assert (((*unit_in_page) & unreserve_mask) == unreserve_mask);
  *unit_in_page &= ~unreserve_mask;

  pgbuf_set_dirty (thread_p, rcv->pgptr, DONT_FREE);

  /* update cache: the volume gains as many free sectors as there are bits in the mask */
  vol_id = pgbuf_get_volume_id (rcv->pgptr);
  vol_purpose = disk_get_volpurpose (vol_id);
  n_unreserved = bit64_count_ones (unreserve_mask);

  disk_cache_lock_reserve_for_purpose (vol_purpose);
  disk_cache_update_vol_free (vol_id, n_unreserved);
  disk_cache_unlock_reserve_for_purpose (vol_purpose);

  disk_log ("disk_rv_unreserve_sectors", "unreserved %d sectors in " PGBUF_PAGE_STATE_MSG ("sector table page")
            "\n\tstab_unit = " BIT64_HEXA_PRINT_FORMAT "\n\trv_unit = " BIT64_HEXA_PRINT_FORMAT,
            n_unreserved, PGBUF_PAGE_STATE_ARGS (rcv->pgptr), *unit_in_page, unreserve_mask);

  csect_exit (thread_p, CSECT_DISK_CHECK);
  return NO_ERROR;
}

/*
 * disk_reserve_sectors_in_volume () - Reserve in the sector table of one volume the sectors that were already
 *                                     taken from disk cache for it.
 *
 * return           : NO_ERROR, ER_FAILED on inconsistency, or the error code of a failed page/table operation.
 * thread_p (in)    : Thread entry.
 * vol_index (in)   : The index of volume in reserve context (context->cache_vol_reserve).
 * context (in/out) : Reserve context. nsects_lastvol_remaining is loaded with the number of sectors to reserve
 *                    in this volume and is expected to reach 0.
 */
static int
disk_reserve_sectors_in_volume (THREAD_ENTRY * thread_p, int vol_index, DISK_RESERVE_CONTEXT * context)
{
  DISK_STAB_CURSOR start_cursor = DISK_STAB_CURSOR_INITIALIZER;
  DISK_STAB_CURSOR end_cursor = DISK_STAB_CURSOR_INITIALIZER;
  DISK_VOLUME_HEADER *volheader = NULL;
  PAGE_PTR page_volheader = NULL;
  SECTID next_hint;
  VOLID volid;
  int error_code = NO_ERROR;

  volid = context->cache_vol_reserve[vol_index].volid;
  if (volid == NULL_VOLID)
    {
      assert_release (false);
      return ER_FAILED;
    }
  context->nsects_lastvol_remaining = context->cache_vol_reserve[vol_index].nsect;
  assert (context->nsects_lastvol_remaining > 0);

  /* fix volume header */
  error_code = disk_get_volheader (thread_p, volid, PGBUF_LATCH_WRITE, &page_volheader, &volheader);
  if (error_code != NO_ERROR)
    {
      ASSERT_ERROR ();
      return error_code;
    }

  disk_log ("disk_reserve_sectors_in_volume", "reserve %d sectors in volume %d.", context->nsects_lastvol_remaining,
            volid);

  /* reserve all possible sectors. */
  if (volheader->hint_allocsect <= 0 || volheader->hint_allocsect >= volheader->nsect_total)
    {
      /* no usable hint: search the entire sector table */
      disk_stab_cursor_set_at_start (volheader, &start_cursor);
      disk_stab_cursor_set_at_end (volheader, &end_cursor);
      error_code = disk_stab_iterate_units (thread_p, volheader, PGBUF_LATCH_WRITE, &start_cursor, &end_cursor,
                                            disk_stab_unit_reserve, context);
      if (error_code != NO_ERROR)
        {
          ASSERT_ERROR ();
          goto exit;
        }
    }
  else
    {
      /* first pass: from the hinted sector to the end of the table */
      DISK_SECTS_ASSERT_ROUNDED (volheader->hint_allocsect);
      disk_stab_cursor_set_at_sectid (volheader, volheader->hint_allocsect, &start_cursor);
      disk_stab_cursor_set_at_end (volheader, &end_cursor);

      error_code = disk_stab_iterate_units (thread_p, volheader, PGBUF_LATCH_WRITE, &start_cursor, &end_cursor,
                                            disk_stab_unit_reserve, context);
      if (error_code != NO_ERROR)
        {
          ASSERT_ERROR ();
          goto exit;
        }

      if (context->nsects_lastvol_remaining > 0)
        {
          /* second pass: we need to reserve more, so cover the sectors before the hint */
          end_cursor = start_cursor;
          disk_stab_cursor_set_at_start (volheader, &start_cursor);
          error_code = disk_stab_iterate_units (thread_p, volheader, PGBUF_LATCH_WRITE, &start_cursor, &end_cursor,
                                                disk_stab_unit_reserve, context);
          if (error_code != NO_ERROR)
            {
              ASSERT_ERROR ();
              goto exit;
            }
        }
    }

  if (context->nsects_lastvol_remaining != 0)
    {
      /* our logic must be flawed... the sectors are reserved ahead so we should have found enough free sectors */
      assert_release (false);
      error_code = ER_FAILED;
      goto exit;
    }

  /* update hint: the sector following the last one reserved, rounded down. */
  next_hint = context->vsidp[-1].sectid + 1;
  volheader->hint_allocsect = next_hint;
  volheader->hint_allocsect = DISK_SECTS_ROUND_DOWN (volheader->hint_allocsect);
  /* we don't really need to set the page dirty or log the hint change. */

exit:
  if (page_volheader != NULL)
    {
      pgbuf_unfix (thread_p, page_volheader);
    }
  return error_code;
}

/*
 * disk_is_page_sector_reserved () - Is sector of page reserved? Same check as
 *                                   disk_is_page_sector_reserved_with_debug_crash, with debug_crash turned off.
 *
 * return        : Valid if sector of page is reserved, invalid (or error) otherwise.
 * thread_p (in) : Thread entry
 * volid (in)    : Page volid
 * pageid (in)   : Page pageid
 */
DISK_ISVALID
disk_is_page_sector_reserved (THREAD_ENTRY * thread_p, VOLID volid, PAGEID pageid)
{
  const bool debug_crash = false;

  return disk_is_page_sector_reserved_with_debug_crash (thread_p, volid, pageid, debug_crash);
}

/*
 * disk_is_page_sector_reserved_with_debug_crash () - Is sector of page reserved?
 *
 * return           : DISK_VALID if the page is the volume header, a system page, or belongs to a reserved sector;
 *                    DISK_INVALID if the volume/page id is out of range or the sector is not reserved;
 *                    DISK_ERROR if the volume header (or sector table page) cannot be fixed.
 * thread_p (in)    : Thread entry
 * volid (in)       : Page volid
 * pageid (in)      : Page pageid
 * debug_crash (in) : crash (assert) when invalid in debug mode
 */
DISK_ISVALID
disk_is_page_sector_reserved_with_debug_crash (THREAD_ENTRY * thread_p, VOLID volid, PAGEID pageid, bool debug_crash)
{
  PAGE_PTR page_volheader = NULL;
  DISK_VOLUME_HEADER *volheader;
  DISK_ISVALID isvalid = DISK_VALID;
  SECTID sectid;
  bool old_check_interrupt;
  int old_wait_msecs;

  /* turn off interrupt checking and use infinite wait for the duration of the check; both settings are restored
   * at exit. */
  old_check_interrupt = logtb_set_check_interrupt (thread_p, false);
  old_wait_msecs = xlogtb_reset_wait_msecs (thread_p, LK_INFINITE_WAIT);

  if (fileio_get_volume_descriptor (volid) == NULL_VOLDES || pageid < 0)
    {
      /* invalid */
      assert (!debug_crash);
      isvalid = DISK_INVALID;
      goto exit;
    }

  if (pageid == DISK_VOLHEADER_PAGE)
    {
      /* valid */
      isvalid = DISK_VALID;
      goto exit;
    }

  if (disk_get_volheader (thread_p, volid, PGBUF_LATCH_READ, &page_volheader, &volheader) != NO_ERROR)
    {
      ASSERT_ERROR ();
      isvalid = DISK_ERROR;
      goto exit;
    }

  if (pageid <= volheader->sys_lastpage)
    {
      /* system pages are always considered valid */
      isvalid = DISK_VALID;
      goto exit;
    }
  /* page ids start at 0, so the first id past the volume is equal to the number of pages. using >= (instead of >)
   * rejects that id here rather than looking up sector nsect_total, which is one past the last sector of the
   * volume. */
  if (pageid >= DISK_SECTS_NPAGES (volheader->nsect_total))
    {
      assert (!debug_crash);
      isvalid = DISK_INVALID;
      goto exit;
    }

  sectid = SECTOR_FROM_PAGEID (pageid);
  isvalid = disk_is_sector_reserved (thread_p, volheader, sectid, debug_crash);

exit:
  xlogtb_reset_wait_msecs (thread_p, old_wait_msecs);
  (void) logtb_set_check_interrupt (thread_p, old_check_interrupt);

  if (page_volheader)
    {
      pgbuf_unfix (thread_p, page_volheader);
    }

  return isvalid;
}

/*
 * disk_is_sector_reserved () - Check the sector table bit of a sector.
 *
 * return           : DISK_VALID if the bit of the sector is set, DISK_INVALID if it is not set, DISK_ERROR if the
 *                    sector table page cannot be fixed.
 * thread_p (in)    : Thread entry
 * volheader (in)   : Volume header
 * sectid (in)      : Sector ID
 * debug_crash (in) : crash (assert) when the sector is not reserved, in debug mode
 */
static DISK_ISVALID
disk_is_sector_reserved (THREAD_ENTRY * thread_p, const DISK_VOLUME_HEADER * volheader, SECTID sectid, bool debug_crash)
{
  DISK_STAB_CURSOR cursor;
  bool is_reserved;

  disk_stab_cursor_set_at_sectid (volheader, sectid, &cursor);
  if (disk_stab_cursor_fix (thread_p, &cursor, PGBUF_LATCH_READ) != NO_ERROR)
    {
      ASSERT_ERROR ();
      return DISK_ERROR;
    }

  /* read the bit while the page is fixed; the page is released on both outcomes */
  is_reserved = disk_stab_cursor_is_bit_set (&cursor);
  if (!is_reserved)
    {
      assert (!debug_crash);
    }
  disk_stab_cursor_unfix (thread_p, &cursor);

  return is_reserved ? DISK_VALID : DISK_INVALID;
}

/*
 * disk_reserve_sectors () - Reserve the required number of sectors in all database volumes.
 *
 * return         : NO_ERROR on success, error code otherwise.
 * thread_p (in)      : Thread entry.
 * purpose (in)       : Reservations purpose (DB_PERMANENT_DATA_PURPOSE or DB_TEMPORARY_DATA_PURPOSE).
 * volid_hint (in)    : Hint a volume to be checked first (not referenced in this function body).
 * n_sectors (in)     : Number of sectors to reserve. Must be positive.
 * reserved_sectors (out) : Array of reserved sectors. Must not be NULL; n_sectors entries are written on success.
 *
 * Note: Reservation is done in two steps: sectors are first taken from disk cache (disk_reserve_from_cache), then
 *       they are marked in the sector table of each chosen volume (disk_reserve_sectors_in_volume). On error, the
 *       cache reservations are returned and the nested system operation is aborted. If the error is not one of the
 *       expected ones, disk_check is run and the whole reservation is retried at most once.
 */
int
disk_reserve_sectors (THREAD_ENTRY * thread_p, DB_VOLPURPOSE purpose, VOLID volid_hint, int n_sectors,
              VSID * reserved_sectors)
{
  int iter;
  DISK_RESERVE_CONTEXT context;
  int nreserved;
  bool retried = false;
  bool did_extend = false;
  int error_code = NO_ERROR;

  assert (purpose == DB_PERMANENT_DATA_PURPOSE || purpose == DB_TEMPORARY_DATA_PURPOSE);

  if (n_sectors <= 0 || reserved_sectors == NULL)
    {
      assert_release (false);
      return ER_FAILED;
    }

  if (purpose != DB_TEMPORARY_DATA_PURPOSE && !log_check_system_op_is_started (thread_p))
    {
      /* We really need a system operation. */
      assert (false);
      er_set (ER_FATAL_ERROR_SEVERITY, ARG_FILE_LINE, ER_GENERIC_ERROR, 0);
      return ER_FAILED;
    }

retry:
  disk_log ("disk_reserve_sectors", "reserve %d sectors for %s.", n_sectors, disk_purpose_to_string (purpose));

  /* nested system operation: attached to the outer one on success, aborted on any error path below */
  log_sysop_start (thread_p);

  /* we don't want to conflict with disk check */
  error_code = csect_enter_as_reader (thread_p, CSECT_DISK_CHECK, INF_WAIT);
  if (error_code != NO_ERROR)
    {
      ASSERT_ERROR ();
      log_sysop_abort (thread_p);
      return error_code;
    }

  /* init context */
  /* vsidp starts at the beginning of the output array; (context.vsidp - reserved_sectors) is used below as the
   * number of sectors reserved so far. */
  context.nsect_total = n_sectors;
  context.n_cache_reserve_remaining = n_sectors;
  context.vsidp = reserved_sectors;
  context.n_cache_vol_reserve = 0;
  context.purpose = purpose;

  /* step 1: take the sectors from disk cache (this may extend disk space; did_extend is set in that case) */
  error_code = disk_reserve_from_cache (thread_p, &context, &did_extend);
  if (error_code != NO_ERROR)
    {
      ASSERT_ERROR ();
      goto error;
    }

  /* step 2: mark the sectors in the sector table of each volume chosen in step 1 */
  for (iter = 0; iter < context.n_cache_vol_reserve; iter++)
    {
      error_code = disk_reserve_sectors_in_volume (thread_p, iter, &context);
      if (error_code != NO_ERROR)
    {
      ASSERT_ERROR ();
      goto error;
    }
    }

  /* Should have enough sectors. */
  assert ((context.vsidp - reserved_sectors) == n_sectors);

  csect_exit (thread_p, CSECT_DISK_CHECK);
  /* attach sys op to outer */
  log_sysop_attach_to_outer (thread_p);

#if !defined (NDEBUG)
  if (did_extend)
    {
      /* safe-guard: catch inconsistencies early */
      assert (disk_check (thread_p, false) != DISK_INVALID);
    }
#endif /* !NDEBUG */

  return NO_ERROR;

error:
  /* number of sectors already marked in sector tables before the failure */
  nreserved = (int) (context.vsidp - reserved_sectors);
  if (nreserved > 0)
    {
      int iter_vsid;

      if (purpose == DB_TEMPORARY_DATA_PURPOSE)
    {
      /* nothing was logged. we need to revert any partial allocations we may have made. */
      /* interrupt checking is turned off while reverting and restored afterwards */
      bool save_check_interrupt = logtb_set_check_interrupt (thread_p, false);

      /* the unreserve function requires an ordered list of sectors */
      qsort (reserved_sectors, nreserved, sizeof (VSID), disk_compare_vsids);
      if (disk_unreserve_ordered_sectors_without_csect (thread_p, purpose, nreserved, reserved_sectors) != NO_ERROR)
        {
          assert_release (false);
        }
      (void) logtb_set_check_interrupt (thread_p, save_check_interrupt);
    }

      /* we'll need to remove reservations for the rest of sectors (that were not allocated from disk). but first,
       * let's avoid removing the reservations for the ones we allocated from disk and rollbacked (they have been
       * removed from cache too) */
      for (iter_vsid = 0; iter_vsid < nreserved; iter_vsid++)
    {
      /* search vsid in volumes */
      for (iter = 0; iter < context.n_cache_vol_reserve; iter++)
        {
          if (reserved_sectors[iter_vsid].volid == context.cache_vol_reserve[iter].volid)
        {
          context.cache_vol_reserve[iter].nsect--;
          break;
        }
        }
      /* every reserved sector must belong to one of the volumes in context */
      assert (iter < context.n_cache_vol_reserve);
    }
    }

  /* undo cache reserve */
  disk_cache_free_reserved (&context);

  csect_exit (thread_p, CSECT_DISK_CHECK);

  /* abort any changes */
  log_sysop_abort (thread_p);

  if (error_code == ER_INTERRUPTED  /* interrupted error */
      || error_code == ER_IO_MOUNT_FAIL || error_code == ER_IO_FORMAT_OUT_OF_SPACE || error_code == ER_IO_WRITE
      || error_code == ER_BO_CANNOT_CREATE_VOL /* IO errors */ )
    {
      /* this is expected. */
      return error_code;
    }
  /* this is not. */
  assert_release (false);

  /* let's try to do something about this. */

  if (retried)
    {
      /* already tried... this is not a good sign */
      _er_log_debug (ARG_FILE_LINE, "disk_reserve_sectors retried and failed! error_code = %d \n", error_code);
      return error_code;
    }

  /* disk_check should fix any cache inconsistencies */
  if (disk_check (thread_p, true) == DISK_INVALID)
    {
      /* oh, it was bad */
      /* clear the error and restart the whole reservation from the retry label, once */
      error_code = NO_ERROR;
      er_clear ();
      retried = true;
      goto retry;
    }

  /* it was not disk cache... */
  _er_log_debug (ARG_FILE_LINE, "disk_reserve_sectors failed unexpectedly! error_code = %d \n", error_code);
  return error_code;
}

/*
 * disk_reserve_from_cache () - First step of sector reservation on disk. This searches the cache for free space in
 *              existing volumes. If not enough available sectors were found, volumes will be
 *              expanded/added until all sectors could be reserved.
 *              NOTE: this will modify the disk cache. It will "move" free sectors from disk cache
 *              to reserve context. If any error occurs, the sectors must be returned to disk cache.
 *
 * return           : NO_ERROR on success; ER_FAILED if disk cache is not initialized or sectors are still missing
 *                    after extending; ER_BO_MAXTEMP_SPACE_HAS_BEEN_EXCEEDED if the temporary space limit would be
 *                    exceeded; otherwise the error code returned by disk_extend.
 * thread_p (in)    : Thread entry
 * context (in/out) : Reserve context
 * did_extend (out) : Set to true only when this call went through disk_extend successfully. It is not written on
 *                    any other path, so the caller must initialize it.
 */
static int
disk_reserve_from_cache (THREAD_ENTRY * thread_p, DISK_RESERVE_CONTEXT * context, bool * did_extend)
{
  DISK_EXTEND_INFO *extend_info;
  DKNSECTS save_remaining;
  int error_code = NO_ERROR;

  if (disk_Cache == NULL)
    {
      /* not initialized? */
      assert_release (false);
      return ER_FAILED;
    }

  disk_cache_lock_reserve_for_purpose (context->purpose);
  if (context->purpose == DB_TEMPORARY_DATA_PURPOSE)
    {
      /* if we want to allocate temporary files, we have two options: preallocated permanent volumes (but with the
       * purpose of temporary files) or temporary volumes. try first the permanent volumes */

      extend_info = &disk_Cache->temp_purpose_info.extend_info;

      if (disk_Cache->temp_purpose_info.nsect_perm_free > 0)
    {
      disk_reserve_from_cache_vols (DB_PERMANENT_VOLTYPE, context);
    }
      if (context->n_cache_reserve_remaining <= 0)
    {
      /* found enough sectors */
      assert (context->n_cache_reserve_remaining == 0);
      disk_cache_unlock_reserve_for_purpose (context->purpose);
      return NO_ERROR;
    }

      /* reserve sectors from temporary volumes */
      /* NOTE(review): extend_info already points to temp_purpose_info.extend_info (assigned above); this
       * assignment looks redundant. */
      extend_info = &disk_Cache->temp_purpose_info.extend_info;
      /* sectors in use (total - free) plus what we still need must not go over disk_Temp_max_sects */
      if (extend_info->nsect_total - extend_info->nsect_free + context->n_cache_reserve_remaining > disk_Temp_max_sects)
    {
      /* too much temporary space */
      assert (false);
      er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_BO_MAXTEMP_SPACE_HAS_BEEN_EXCEEDED, 1,
          disk_Temp_max_sects * DISK_SECTOR_NPAGES);
      disk_cache_unlock_reserve_for_purpose (context->purpose);
      return ER_BO_MAXTEMP_SPACE_HAS_BEEN_EXCEEDED;
    }

      /* fall through */
    }
  else
    {
      extend_info = &disk_Cache->perm_purpose_info.extend_info;
      /* fall through */
    }

  assert (context->n_cache_reserve_remaining > 0);
  assert (extend_info->owner_reserve == thread_get_entry_index (thread_p));

  /* NOTE(review): from here on the reserve lock taken with disk_cache_lock_reserve_for_purpose is released with
   * disk_cache_unlock_reserve (extend_info); presumably both act on the same lock - confirm. */
  if (extend_info->nsect_free > context->n_cache_reserve_remaining)
    {
      disk_reserve_from_cache_vols (extend_info->voltype, context);
      if (context->n_cache_reserve_remaining <= 0)
    {
      /* found enough sectors */
      assert (context->n_cache_reserve_remaining == 0);
      disk_cache_unlock_reserve (extend_info);
      return NO_ERROR;
    }
    }

  /* we might have to expand */
  /* first, save our intention in case somebody else will do the expand */
  extend_info->nsect_intention += context->n_cache_reserve_remaining;

  disk_log ("disk_reserve_from_cache", "existing free space could not satisfy reserve. "
        "increment intention by %d to %d for %s.", context->n_cache_reserve_remaining,
        extend_info->nsect_intention, disk_type_to_string (extend_info->voltype));

  /* the reserve lock is released before the extend lock is requested */
  disk_cache_unlock_reserve (extend_info);

  /* now lock expand */
  disk_lock_extend ();

  /* check again free sectors */
  disk_cache_lock_reserve (extend_info);
  if (extend_info->nsect_free > context->n_cache_reserve_remaining)
    {
      /* somebody else expanded? try again to reserve */
      /* also update intention */
      extend_info->nsect_intention -= context->n_cache_reserve_remaining;

      disk_log ("disk_reserve_from_cache", "somebody else extended disk. try reserve from cache again. "
        "also decrement intention by %d to %d for %s.", context->n_cache_reserve_remaining,
        extend_info->nsect_intention, disk_type_to_string (extend_info->voltype));

      disk_reserve_from_cache_vols (extend_info->voltype, context);
      if (context->n_cache_reserve_remaining <= 0)
    {
      assert (context->n_cache_reserve_remaining == 0);
      disk_cache_unlock_reserve (extend_info);
      disk_unlock_extend ();
      return NO_ERROR;
    }

      /* still not enough: register the (possibly smaller) remaining count as intention again */
      extend_info->nsect_intention += context->n_cache_reserve_remaining;

      disk_log ("disk_reserve_from_cache", "could not reserve enough from cache. need to do extend. "
        "increment intention by %d to %d for %s.", context->n_cache_reserve_remaining,
        extend_info->nsect_intention, disk_type_to_string (extend_info->voltype));
    }

  /* ok, we really have to extend the disk space ourselves */
  /* remember how much intention we registered, so that the same amount is removed after the extend */
  save_remaining = context->n_cache_reserve_remaining;

  disk_cache_unlock_reserve (extend_info);

  error_code = disk_extend (thread_p, extend_info, context);

  /* remove intention */
  /* this is done whether disk_extend succeeded or not; error_code is checked after the locks are released */
  disk_cache_lock_reserve (extend_info);
  extend_info->nsect_intention -= save_remaining;
  disk_log ("disk_reserve_from_cache", "extend done. decrement intention by %d to %d for %s. \n",
        save_remaining, extend_info->nsect_intention, disk_type_to_string (extend_info->voltype));
  disk_cache_unlock_reserve (extend_info);

  disk_unlock_extend ();
  if (error_code != NO_ERROR)
    {
      ASSERT_ERROR ();
      return error_code;
    }
  if (context->n_cache_reserve_remaining > 0)
    {
      /* the extend reported success, so no sectors should be left to reserve */
      assert_release (false);
      return ER_FAILED;
    }

  *did_extend = true;

  /* all cache reservations were made */
  return NO_ERROR;
}

/*
 * disk_reserve_from_cache_vols () - reserve sectors in disk cache volumes of the given type
 *
 * return           : Void
 * type (in)        : Permanent/temporary volume type to search
 * context (in/out) : Reserve context; sectors are taken until n_cache_reserve_remaining reaches 0 or all volumes
 *                    of this type have been visited
 */
STATIC_INLINE void
disk_reserve_from_cache_vols (DB_VOLTYPE type, DISK_RESERVE_CONTEXT * context)
{
  VOLID first_volid, stop_volid, step;
  VOLID volid;
  DKNSECTS min_free;

  assert (disk_compatible_type_and_purpose (type, context->purpose));

  if (type != DB_PERMANENT_VOLTYPE)
    {
      /* temporary volumes are visited from LOG_MAX_DBVOLID downwards */
      first_volid = LOG_MAX_DBVOLID;
      stop_volid = LOG_MAX_DBVOLID - disk_Cache->nvols_temp;
      step = -1;

      min_free = MIN (context->nsect_total, disk_Cache->temp_purpose_info.extend_info.nsect_vol_max) / 2;
    }
  else
    {
      /* permanent volumes are visited from 0 upwards */
      first_volid = 0;
      stop_volid = disk_Cache->nvols_perm;
      step = 1;

      min_free = MIN (context->nsect_total, disk_Cache->perm_purpose_info.extend_info.nsect_vol_max) / 2;
    }

  /* make sure we search for at least one sector */
  if (min_free < 1)
    {
      min_free = 1;
    }

  volid = first_volid;
  while (volid != stop_volid && context->n_cache_reserve_remaining > 0)
    {
      /* use only volumes with the right purpose; to avoid unnecessary fragmentation, skip the ones having less
       * than min_free free sectors */
      if (disk_Cache->vols[volid].purpose == context->purpose && disk_Cache->vols[volid].nsect_free >= min_free)
        {
          disk_reserve_from_cache_volume (volid, context);
        }
      volid += step;
    }
}

/*
 * disk_reserve_from_cache_volume () - move free sectors of one volume from disk cache to reserve context
 *
 * return           : void
 * volid (in)       : volume identifier
 * context (in/out) : reserve context; a new cache_vol_reserve entry is appended for this volume
 */
STATIC_INLINE void
disk_reserve_from_cache_volume (VOLID volid, DISK_RESERVE_CONTEXT * context)
{
  DKNSECTS nsects;
  int slot;

  if (context->n_cache_vol_reserve >= LOG_MAX_DBVOLID)
    {
      /* no room left for another volume entry */
      assert_release (false);
      return;
    }
  disk_check_own_reserve_for_purpose (context->purpose);
  assert (context->n_cache_reserve_remaining > 0);
  assert (disk_Cache->vols[volid].nsect_free > 0);

  /* take as much as the volume has free, but no more than what is still needed */
  nsects = context->n_cache_reserve_remaining;
  if (disk_Cache->vols[volid].nsect_free < nsects)
    {
      nsects = disk_Cache->vols[volid].nsect_free;
    }
  disk_cache_update_vol_free (volid, -nsects);

  slot = context->n_cache_vol_reserve;
  context->cache_vol_reserve[slot].volid = volid;
  context->cache_vol_reserve[slot].nsect = nsects;
  context->n_cache_reserve_remaining -= nsects;

  /* the context is logged before the entry count is incremented */
  disk_log ("disk_reserve_from_cache_volume", "reserved %d sectors from volid = %d, \n" DISK_RESERVE_CONTEXT_MSG,
            nsects, volid, DISK_RESERVE_CONTEXT_AS_ARGS (context));

  context->n_cache_vol_reserve++;
  assert (context->n_cache_reserve_remaining >= 0);
}

/*
 * disk_unreserve_ordered_sectors () - un-reserve given list of sectors from disk volumes. the list must be ordered.
 *                                     CSECT_DISK_CHECK is held as reader while the sectors are un-reserved.
 *
 * return        : NO_ERROR, or the error code of entering the critical section / un-reserving the sectors
 * thread_p (in) : thread entry
 * purpose (in)  : the purpose of reserved sectors
 * nsects (in)   : number of sectors
 * vsids (in)    : array of sectors
 */
int
disk_unreserve_ordered_sectors (THREAD_ENTRY * thread_p, DB_VOLPURPOSE purpose, int nsects, VSID * vsids)
{
  int rc;

  rc = csect_enter_as_reader (thread_p, CSECT_DISK_CHECK, INF_WAIT);
  if (rc != NO_ERROR)
    {
      ASSERT_ERROR ();
      return rc;
    }

  rc = disk_unreserve_ordered_sectors_without_csect (thread_p, purpose, nsects, vsids);
  if (rc != NO_ERROR)
    {
      ASSERT_ERROR ();
    }

  /* the critical section is released whether un-reserve succeeded or not */
  csect_exit (thread_p, CSECT_DISK_CHECK);
  return rc;
}

/*
 * disk_unreserve_ordered_sectors_without_csect () - un-reserve given list of sectors from disk volumes. the list must
 *                                                   be ordered. caller must make sure to lock CSECT_DISK_CHECK.
 *
 * return        : NO_ERROR, or the error code of the first volume that failed to un-reserve
 * thread_p (in) : thread entry
 * purpose (in)  : the purpose of reserved sectors
 * nsects (in)   : number of sectors
 * vsids (in)    : array of sectors, ordered by volume id and then by sector id
 */
static int
disk_unreserve_ordered_sectors_without_csect (THREAD_ENTRY * thread_p, DB_VOLPURPOSE purpose, int nsects, VSID * vsids)
{
  DISK_RESERVE_CONTEXT context;
  VOLID run_volid = NULL_VOLID;
  int run_begin = 0;
  int run_end = 0;
  int vol_iter;
  int error_code = NO_ERROR;

  context.purpose = purpose;
  context.vsidp = vsids;
  context.nsect_total = nsects;
  context.n_cache_reserve_remaining = nsects;
  context.n_cache_vol_reserve = 0;

  /* note: vsids are ordered. split the list in runs of sectors belonging to the same volume and record one
   * context entry per run. */
  while (run_begin < nsects)
    {
      assert (run_volid < vsids[run_begin].volid);
      run_volid = vsids[run_begin].volid;

      run_end = run_begin + 1;
      while (run_end < nsects && vsids[run_end].volid == run_volid)
        {
          assert (vsids[run_end].sectid > vsids[run_end - 1].sectid);
          run_end++;
        }

      context.cache_vol_reserve[context.n_cache_vol_reserve].nsect = run_end - run_begin;
      context.cache_vol_reserve[context.n_cache_vol_reserve].volid = run_volid;
      context.n_cache_vol_reserve++;

      run_begin = run_end;
    }
  assert (run_end == nsects);

  disk_log ("disk_unreserve_ordered_sectors", "unreserve sectors.\n" DISK_RESERVE_CONTEXT_MSG,
            DISK_RESERVE_CONTEXT_AS_ARGS (&context));

  /* unreserve volume sectors, one volume at a time */
  for (vol_iter = 0; vol_iter < context.n_cache_vol_reserve; vol_iter++)
    {
      context.nsects_lastvol_remaining = context.cache_vol_reserve[vol_iter].nsect;

      error_code = disk_unreserve_sectors_from_volume (thread_p, context.cache_vol_reserve[vol_iter].volid, &context);
      if (error_code != NO_ERROR)
        {
          ASSERT_ERROR ();
          return error_code;
        }
    }

  return NO_ERROR;
}

/*
 * disk_unreserve_sectors_from_volume () - un-reserve sectors indicated in reserve context from volume's sector table
 *
 * return           : NO_ERROR, or the error code of fixing the volume header / iterating the sector table
 * thread_p (in)    : thread entry
 * volid (in)       : volume identifier
 * context (in/out) : reserve context. context->vsidp points to the first sector to un-reserve and
 *                    context->nsects_lastvol_remaining is the number of sectors to un-reserve in this volume.
 */
static int
disk_unreserve_sectors_from_volume (THREAD_ENTRY * thread_p, VOLID volid, DISK_RESERVE_CONTEXT * context)
{
  DISK_STAB_CURSOR start_cursor, end_cursor;
  DISK_VOLUME_HEADER *volheader = NULL;
  PAGE_PTR page_volheader;
  SECTID first_unit_sectid;
  int error_code = NO_ERROR;

  assert (context != NULL && context->nsects_lastvol_remaining > 0);

  disk_log ("disk_unreserve_sectors_from_volume", "unreserve %d sectors in volume %d.",
            context->nsects_lastvol_remaining, volid);

  error_code = disk_get_volheader (thread_p, volid, PGBUF_LATCH_WRITE, &page_volheader, &volheader);
  if (error_code != NO_ERROR)
    {
      ASSERT_ERROR ();
      return error_code;
    }

  /* un-reserve all given sectors using disk_stab_iterate_units: start at the first sector rounded down and let
   * the callback stop the iteration when it is done. */
  first_unit_sectid = DISK_SECTS_ROUND_DOWN (context->vsidp->sectid);
  disk_stab_cursor_set_at_sectid (volheader, first_unit_sectid, &start_cursor);
  disk_stab_cursor_set_at_end (volheader, &end_cursor);

  error_code = disk_stab_iterate_units (thread_p, volheader, PGBUF_LATCH_WRITE, &start_cursor, &end_cursor,
                                        disk_stab_unit_unreserve, context);
  if (error_code != NO_ERROR)
    {
      ASSERT_ERROR ();
    }

  /* the volume header is released on both success and failure */
  pgbuf_unfix (thread_p, page_volheader);

  return error_code;
}

/*
 * disk_stab_unit_unreserve () - DISK_STAB_UNIT_FUNC used to un-reserve sectors from sector table
 *
 * return           : NO_ERROR
 * thread_p (in)    : thread entry
 * cursor (in)      : sector table cursor, positioned on the unit to process
 * stop (out)       : set to true when all sectors have been un-reserved
 * args (in/out)    : reserve context. vsidp is advanced over the sectors that belong to this unit and
 *                    nsects_lastvol_remaining is decremented accordingly.
 */
static int
disk_stab_unit_unreserve (THREAD_ENTRY * thread_p, DISK_STAB_CURSOR * cursor, bool * stop, void *args)
{
  DISK_RESERVE_CONTEXT *context = (DISK_RESERVE_CONTEXT *) args;
  LOG_DATA_ADDR addr = LOG_DATA_ADDR_INITIALIZER;
  DISK_STAB_UNIT bits_to_clear = 0;
  int n_collected = 0;

  /* collect the bits of all pending sectors that fall in this unit */
  for (; context->nsects_lastvol_remaining > 0
       && context->vsidp->sectid < cursor->sectid + DISK_STAB_UNIT_BIT_COUNT; context->vsidp++, n_collected++)
    {
      bits_to_clear = bit64_set (bits_to_clear, context->vsidp->sectid - cursor->sectid);
      context->nsects_lastvol_remaining--;

      disk_log ("disk_stab_unit_unreserve", "unreserve sectid %d from volume %d.", context->vsidp->sectid,
                context->vsidp->volid);
    }

  /* all bits must be set */
  assert ((bits_to_clear & (*cursor->unit)) == bits_to_clear);
  if (bits_to_clear != 0)
    {
      if (context->purpose != DB_PERMANENT_DATA_PURPOSE)
        {
          /* remove immediately: clear the bits in the page and give the sectors back to disk cache */
          (*cursor->unit) &= ~bits_to_clear;
          pgbuf_set_dirty (thread_p, cursor->page, DONT_FREE);

          assert (context->purpose == DB_TEMPORARY_DATA_PURPOSE);
          assert (n_collected > 0);

          disk_cache_lock_reserve_for_purpose (DB_TEMPORARY_DATA_PURPOSE);
          disk_cache_update_vol_free (cursor->volheader->volid, n_collected);
          disk_cache_unlock_reserve_for_purpose (DB_TEMPORARY_DATA_PURPOSE);
        }
      else
        {
          /* postpone: the unit is not changed here; only a RVDK_UNRESERVE_SECTORS postpone record carrying the
           * bits is appended */
          addr.pgptr = cursor->page;
          addr.offset = cursor->offset_to_unit;
          log_append_postpone (thread_p, RVDK_UNRESERVE_SECTORS, &addr, sizeof (bits_to_clear), &bits_to_clear);
        }
    }

  if (context->nsects_lastvol_remaining <= 0)
    {
      assert (context->nsects_lastvol_remaining == 0);
      *stop = true;
    }
  return NO_ERROR;
}

/*
 * disk_stab_init () - initialize disk sector table
 *
 * return         : error code
 * thread_p (in)  : thread entry
 * volheader (in) : volume header
 *
 * NOTE: every sector table page (stab_first_page .. stab_first_page + stab_npages - 1) is fixed as a new page,
 *       zeroed, and the bits of the sectors that hold system pages are set. For non-temporary volumes a redo
 *       record (RVDK_INITMAP) carrying the number of bits set on the page is appended.
 */
static int
disk_stab_init (THREAD_ENTRY * thread_p, DISK_VOLUME_HEADER * volheader)
{
  /* number of sectors covering system pages: sector of the last system page, plus one */
  DKNSECTS nsects_sys = SECTOR_FROM_PAGEID (volheader->sys_lastpage) + 1;
  /* working copy handed to disk_stab_set_bits_contiguous; presumably decremented for every bit it sets - the code
   * below treats its value after iteration as the number of sectors still to be set. TODO confirm */
  DKNSECTS nsect_copy = 0;
  VPID vpid_stab;
  PAGE_PTR page_stab = NULL;
  DISK_STAB_CURSOR start_cursor;
  DISK_STAB_CURSOR end_cursor;
  int error_code = NO_ERROR;

  assert (nsects_sys < DISK_STAB_PAGE_BIT_COUNT);

  vpid_stab.volid = volheader->volid;
  /* one iteration per sector table page */
  for (vpid_stab.pageid = volheader->stab_first_page;
       vpid_stab.pageid < volheader->stab_first_page + volheader->stab_npages; vpid_stab.pageid++)
    {
      page_stab = pgbuf_fix (thread_p, &vpid_stab, NEW_PAGE, PGBUF_LATCH_WRITE, PGBUF_UNCONDITIONAL_LATCH);
      if (page_stab == NULL)
    {
      ASSERT_ERROR_AND_SET (error_code);
      return error_code;
    }

      pgbuf_set_page_ptype (thread_p, page_stab, PAGE_VOLBITMAP);

      if (volheader->purpose == DB_TEMPORARY_DATA_PURPOSE)
    {
      /* why is this necessary? */
      pgbuf_set_lsa_as_temporary (thread_p, page_stab);
    }

      /* start from an all-zero bitmap */
      memset (page_stab, 0, DB_PAGESIZE);

      if (nsects_sys > 0)
    {
      /* there are still system sectors to mark; restrict the iteration to the sectors covered by this page */
      nsect_copy = nsects_sys;

      disk_stab_cursor_set_at_sectid (volheader,
                      (vpid_stab.pageid - volheader->stab_first_page) * DISK_STAB_PAGE_BIT_COUNT,
                      &start_cursor);
      if (vpid_stab.pageid == volheader->stab_first_page + volheader->stab_npages - 1)
        {
          /* last table page: iterate up to the end of the table */
          disk_stab_cursor_set_at_end (volheader, &end_cursor);
        }
      else
        {
          /* iterate up to the first sector of the next table page */
          disk_stab_cursor_set_at_sectid (volheader,
                          (vpid_stab.pageid + 1 - volheader->stab_first_page)
                          * DISK_STAB_PAGE_BIT_COUNT, &end_cursor);
        }
      error_code = disk_stab_iterate_units (thread_p, volheader, PGBUF_LATCH_WRITE, &start_cursor, &end_cursor,
                        disk_stab_set_bits_contiguous, &nsect_copy);
      if (error_code != NO_ERROR)
        {
          ASSERT_ERROR ();
          pgbuf_unfix (thread_p, page_stab);
          return error_code;
        }
    }

      if (volheader->purpose != DB_TEMPORARY_DATA_PURPOSE)
    {
      /* redo data is the difference between the count before and after the iteration on this page */
      DKNSECTS nsects_set = nsects_sys - nsect_copy;
      log_append_redo_data2 (thread_p, RVDK_INITMAP, NULL, page_stab, NULL_OFFSET, sizeof (nsects_set),
                 &nsects_set);
    }
      if (!LOG_ISRESTARTED ())
    {
      /* page buffer will be invalidated and pages will not be flushed, so flush this page explicitly now. */
      pgbuf_set_dirty (thread_p, page_stab, DONT_FREE);
      pgbuf_flush (thread_p, page_stab, FREE);
      page_stab = NULL;
    }
      else
    {
      pgbuf_set_dirty_and_free (thread_p, page_stab);
    }

      /* carry the remaining count over to the next table page */
      nsects_sys = nsect_copy;
      nsect_copy = 0;
    }

  return NO_ERROR;
}

/*
 * disk_manager_init () - set up disk manager globals and (re)create the disk cache
 *
 * return              : NO_ERROR or error code
 * thread_p (in)       : thread entry
 * load_from_disk (in) : true to also fill the disk cache by loading all volumes
 *
 * NOTE: an already existing disk cache is finalized and rebuilt. If loading volumes fails, the disk manager is
 *       finalized before the error is returned.
 */
int
disk_manager_init (THREAD_ENTRY * thread_p, bool load_from_disk)
{
  int rc = NO_ERROR;

  /* the parameter is expressed in pages; a negative value means no limit on temporary sectors */
  disk_Temp_max_sects = (DKNSECTS) prm_get_integer_value (PRM_ID_BOSR_MAXTMP_PAGES);
  if (disk_Temp_max_sects >= 0)
    {
      disk_Temp_max_sects = disk_Temp_max_sects / DISK_SECTOR_NPAGES;
    }
  else
    {
      disk_Temp_max_sects = SECTID_MAX;	/* infinite */
    }

  disk_Logging = prm_get_bool_value (PRM_ID_DISK_LOGGING);

  if (disk_Cache != NULL)
    {
      disk_log ("disk_manager_init", "%s", "reload disk cache");
      disk_cache_final ();
    }

  rc = disk_cache_init ();
  if (rc != NO_ERROR)
    {
      ASSERT_ERROR ();
      return rc;
    }
  assert (disk_Cache != NULL);

  if (!load_from_disk)
    {
      /* caller only wants an empty cache */
      return NO_ERROR;
    }

  if (disk_cache_load_all_volumes (thread_p))
    {
      return NO_ERROR;
    }

  ASSERT_ERROR_AND_SET (rc);
  disk_manager_final ();
  return rc;
}

/*
 * disk_manager_final () - free disk manager resources
 *
 * NOTE: the only action taken is finalizing the disk cache through disk_cache_final (). disk_manager_init () calls
 *       this function to roll back when loading volumes fails.
 */
void
disk_manager_final (void)
{
  disk_cache_final ();
}

/*
 * disk_format_first_volume () - initialize the disk manager and format the first database volume
 *
 * return           : NO_ERROR or error code
 * thread_p (in)    : thread entry
 * full_dbname (in) : database full name; also used as the volume name
 * dbcomments (in)  : database comments
 * npages (in)      : desired number of pages (converted to sectors by disk_sectors_to_extend_npages)
 *                    todo: replace with number of sectors or disk size
 *
 * NOTE: the disk cache is registered with one permanent volume before formatting; the count is reset to zero if
 *       formatting fails.
 */
int
disk_format_first_volume (THREAD_ENTRY * thread_p, const char *full_dbname, const char *dbcomments, DKNPAGES npages)
{
  DBDEF_VOL_EXT_INFO first_vol;
  DKNSECTS free_sects = 0;
  int rc;

  rc = disk_manager_init (thread_p, false);
  if (rc != NO_ERROR)
    {
      ASSERT_ERROR ();
      return rc;
    }

  /* describe the first volume: permanent type and purpose, total size equal to maximum size */
  first_vol.name = full_dbname;
  first_vol.comments = dbcomments;
  first_vol.nsect_total = disk_sectors_to_extend_npages (npages);
  first_vol.nsect_max = first_vol.nsect_total;
  first_vol.max_writesize_in_sec = 0;
  first_vol.overwrite = false;
  first_vol.purpose = DB_PERMANENT_DATA_PURPOSE;
  first_vol.voltype = DB_PERMANENT_VOLTYPE;

  /* register the volume in cache before formatting */
  disk_Cache->nvols_perm = 1;
  disk_Cache->vols[LOG_DBFIRST_VOLID].purpose = DB_PERMANENT_DATA_PURPOSE;

  rc = disk_format (thread_p, full_dbname, LOG_DBFIRST_VOLID, &first_vol, &free_sects);
  if (rc != NO_ERROR)
    {
      ASSERT_ERROR ();
      disk_Cache->nvols_perm = 0;
      return rc;
    }

  /* publish the space information of the new volume */
  disk_Cache->vols[LOG_DBFIRST_VOLID].nsect_free = free_sects;
  disk_Cache->perm_purpose_info.extend_info.nsect_free = free_sects;
  disk_Cache->perm_purpose_info.extend_info.nsect_total = first_vol.nsect_total;
  disk_Cache->perm_purpose_info.extend_info.nsect_max = first_vol.nsect_max;

  return NO_ERROR;
}

/*
 * disk_check_own_reserve_for_purpose () - assert that the current thread is recorded as owner of the reserve mutex
 *                                         matching the purpose
 *
 * return       : void; the check is an assert only
 * purpose (in) : volume purpose; DB_PERMANENT_DATA_PURPOSE selects the permanent info, anything else selects the
 *                temporary info
 */
STATIC_INLINE void
disk_check_own_reserve_for_purpose (DB_VOLPURPOSE purpose)
{
  assert (((purpose != DB_PERMANENT_DATA_PURPOSE)
           ? disk_Cache->temp_purpose_info.extend_info.owner_reserve
           : disk_Cache->perm_purpose_info.extend_info.owner_reserve) == thread_get_current_entry_index ());
}

/*
 * disk_check_sectors_are_reserved () - check that all given sectors are marked as reserved in their volumes
 *
 * return        : DISK_VALID if every check passed, DISK_INVALID if at least one volume reported DISK_INVALID,
 *                 DISK_ERROR as soon as a volume check reports DISK_ERROR
 * thread_p (in) : thread entry
 * vsids (in)    : VSID array; asserted to be ordered by volume id, then by sector id
 * nsects (in)   : number of entries in vsids
 */
DISK_ISVALID
disk_check_sectors_are_reserved (THREAD_ENTRY * thread_p, VSID * vsids, int nsects)
{
  DISK_RESERVE_CONTEXT context;
  DISK_ISVALID vol_result = DISK_VALID;
  DISK_ISVALID overall = DISK_VALID;
  VOLID volid = NULL_VOLID;
  int run_begin = 0;
  int run_end = 0;
  int i;

  context.nsect_total = nsects;
  context.n_cache_vol_reserve = 0;
  context.vsidp = vsids;
  /* purpose is not relevant */
  context.purpose = DISK_UNKNOWN_PURPOSE;

  /* split the ordered array into runs of sectors that belong to the same volume */
  while (run_begin < nsects)
    {
      assert (volid < vsids[run_begin].volid);
      volid = vsids[run_begin].volid;

      run_end = run_begin + 1;
      while (run_end < nsects && vsids[run_end].volid == volid)
        {
          assert (vsids[run_end].sectid > vsids[run_end - 1].sectid);
          run_end++;
        }

      context.cache_vol_reserve[context.n_cache_vol_reserve].nsect = run_end - run_begin;
      context.cache_vol_reserve[context.n_cache_vol_reserve].volid = volid;
      context.n_cache_vol_reserve++;

      run_begin = run_end;
    }
  assert (run_end == nsects);

  /* check each volume's run against its sector table */
  for (i = 0; i < context.n_cache_vol_reserve; i++)
    {
      context.nsects_lastvol_remaining = context.cache_vol_reserve[i].nsect;

      vol_result = disk_check_sectors_are_reserved_in_volume (thread_p, context.cache_vol_reserve[i].volid, &context);
      if (vol_result == DISK_ERROR)
        {
          ASSERT_ERROR ();
          return vol_result;
        }
      if (vol_result == DISK_INVALID)
        {
          /* remember the failure, but continue checking the other volumes */
          overall = DISK_INVALID;
        }
    }

  return overall;
}

/*
 * disk_check_sectors_are_reserved_in_volume () - check that sectors in reserve context for given volume are reserved
 *
 * return        : DISK_VALID if the iteration returned NO_ERROR, DISK_INVALID if it returned ER_FAILED,
 *                 DISK_ERROR if the volume header could not be obtained or the iteration returned another error
 * thread_p (in) : thread entry
 * volid (in)    : volume identifier
 * context (in)  : reserve context; nsects_lastvol_remaining must be positive on entry
 */
static DISK_ISVALID
disk_check_sectors_are_reserved_in_volume (THREAD_ENTRY * thread_p, VOLID volid, DISK_RESERVE_CONTEXT * context)
{
  PAGE_PTR hdr_page = NULL;
  DISK_VOLUME_HEADER *volheader = NULL;
  SECTID first_unit_sectid;
  DISK_STAB_CURSOR from_cursor, to_cursor;
  int rc;

  assert (context != NULL && context->nsects_lastvol_remaining > 0);

  rc = disk_get_volheader (thread_p, volid, PGBUF_LATCH_READ, &hdr_page, &volheader);
  if (rc != NO_ERROR)
    {
      ASSERT_ERROR ();
      return DISK_ERROR;
    }

  /* iterate from the unit that holds the first sector to check up to the end of the sector table */
  first_unit_sectid = DISK_SECTS_ROUND_DOWN (context->vsidp->sectid);
  disk_stab_cursor_set_at_sectid (volheader, first_unit_sectid, &from_cursor);
  disk_stab_cursor_set_at_end (volheader, &to_cursor);
  rc = disk_stab_iterate_units (thread_p, volheader, PGBUF_LATCH_READ, &from_cursor, &to_cursor,
                                disk_stab_unit_check_reserved, context);

  pgbuf_unfix (thread_p, hdr_page);

  if (rc == NO_ERROR)
    {
      return DISK_VALID;
    }
  /* ER_FAILED is what the unit callback returns when a sector is not reserved */
  return (rc == ER_FAILED) ? DISK_INVALID : DISK_ERROR;
}

/*
 * disk_stab_unit_check_reserved () - DISK_STAB_UNIT_FUNC that checks the sectors requested by the reserve context
 *                                    are marked as reserved in the current unit
 *
 * return        : NO_ERROR if all checked bits are set, ER_FAILED otherwise
 * thread_p (in) : thread entry
 * cursor (in)   : sector table cursor positioned on the unit to check
 * stop (out)    : output true when no sectors are left to check
 * args (in/out) : reserve context (DISK_RESERVE_CONTEXT *); vsidp and nsects_lastvol_remaining are advanced
 */
static int
disk_stab_unit_check_reserved (THREAD_ENTRY * thread_p, DISK_STAB_CURSOR * cursor, bool * stop, void *args)
{
  DISK_RESERVE_CONTEXT *context = (DISK_RESERVE_CONTEXT *) args;
  DISK_STAB_UNIT bits_check_reserved = 0;

  /* collect every sector to check that falls inside this unit */
  while (context->nsects_lastvol_remaining > 0 && context->vsidp->sectid < cursor->sectid + DISK_STAB_UNIT_BIT_COUNT)
    {
      bits_check_reserved = bit64_set (bits_check_reserved, context->vsidp->sectid - cursor->sectid);
      context->nsects_lastvol_remaining--;

      /* log under this function's own name: this callback only checks, it does not un-reserve anything */
      disk_log ("disk_stab_unit_check_reserved", "check sectid %d from volume %d is reserved.",
                context->vsidp->sectid, context->vsidp->volid);

      context->vsidp++;
    }

  /* all bits must be set */
  if ((bits_check_reserved & (*cursor->unit)) != bits_check_reserved)
    {
      /* not all bits are set */
      assert_release (false);
      return ER_FAILED;
    }
  if (context->nsects_lastvol_remaining <= 0)
    {
      assert (context->nsects_lastvol_remaining == 0);
      *stop = true;
    }
  return NO_ERROR;
}

/************************************************************************/
/* Utility section                                                      */
/************************************************************************/

/*
 * disk_purpose_to_string () - describe a volume purpose as text
 *
 * return           : constant string; "Unknown purpose" (after a debug assert) for any purpose other than
 *                    permanent or temporary data
 * vol_purpose (in) : purpose of volume
 */
static const char *
disk_purpose_to_string (DISK_VOLPURPOSE vol_purpose)
{
  if (vol_purpose == DB_PERMANENT_DATA_PURPOSE)
    {
      return "Permanent data purpose";
    }
  if (vol_purpose == DB_TEMPORARY_DATA_PURPOSE)
    {
      return "Temporary data purpose";
    }

  /* no other purpose is expected here */
  assert (false);
  return "Unknown purpose";
}

/*
 * disk_type_to_string () - describe a volume type as text
 *
 * return       : "Permanent Volume" for DB_PERMANENT_VOLTYPE, "Temporary Volume" for any other value
 * voltype (in) : volume type
 */
static const char *
disk_type_to_string (DB_VOLTYPE voltype)
{
  if (voltype == DB_PERMANENT_VOLTYPE)
    {
      return "Permanent Volume";
    }
  return "Temporary Volume";
}

/*
 * disk_vhdr_set_vol_fullname () - store the volume full name in the variable-length area of the volume header
 *
 *   return: NO_ERROR
 *   vhdr(in/out): volume header
 *   vol_fullname(in): new full name (must not be NULL); at most DB_MAX_PATH_LENGTH bytes, terminator included,
 *                     are stored
 *
 * Note: the fields stored after the name (next volume full name and remarks) are shifted when the size of the
 *       name changes, and their offsets are updated. The size used to shift the fields is the same clamped size
 *       that is copied, so a name that is too long is truncated and still null-terminated.
 */
static int
disk_vhdr_set_vol_fullname (DISK_VOLUME_HEADER * vhdr, const char *vol_fullname)
{
  int length_diff;
  int length_to_move;
  int vol_fullname_size;
  int ret = NO_ERROR;

  /* bytes to store, terminator included, clamped to the maximum path length */
  vol_fullname_size = (int) strlen (vol_fullname) + 1;
  if (vol_fullname_size > DB_MAX_PATH_LENGTH)
    {
      vol_fullname_size = DB_MAX_PATH_LENGTH;
    }

  /* bytes from the start of the next volume full name to the end of remarks (terminator included) */
  length_diff = vhdr->offset_to_vol_remarks;
  length_to_move = (length_diff + (int) strlen (vhdr->var_fields + length_diff) + 1
                    - vhdr->offset_to_next_vol_fullname);

  /* Difference in length between new name and old name */
  length_diff = vol_fullname_size - (vhdr->offset_to_next_vol_fullname - vhdr->offset_to_vol_fullname);

  if (length_diff != 0)
    {
      /* We need to either move to right(expand) or left(shrink) the rest of the variable length fields */
      memmove (disk_vhdr_get_next_vol_fullname (vhdr) + length_diff, disk_vhdr_get_next_vol_fullname (vhdr),
               length_to_move);
      vhdr->offset_to_next_vol_fullname += length_diff;
      vhdr->offset_to_vol_remarks += length_diff;
    }

  /* copy the (possibly truncated) characters and always terminate the field */
  (void) memcpy (disk_vhdr_get_vol_fullname (vhdr), vol_fullname, vol_fullname_size - 1);
  disk_vhdr_get_vol_fullname (vhdr)[vol_fullname_size - 1] = '\0';

  return ret;
}

/*
 * disk_vhdr_set_next_vol_fullname () - store the next volume full name in the variable-length area of the volume
 *                                      header
 *
 *   return: NO_ERROR
 *   vhdr(in/out): volume header
 *   next_vol_fullname(in): next volume full name; NULL is stored as an empty string. At most PATH_MAX bytes,
 *                          terminator included, are stored
 *
 * Note: the remarks stored after the name are shifted when the size of the name changes, and their offset is
 *       updated. A name that had to be truncated is still null-terminated.
 */
static int
disk_vhdr_set_next_vol_fullname (DISK_VOLUME_HEADER * vhdr, const char *next_vol_fullname)
{
  int length_diff;
  int length_to_move;
  int ret = NO_ERROR;
  int next_vol_fullname_size;

  if (next_vol_fullname == NULL)
    {
      next_vol_fullname = "";
      next_vol_fullname_size = 1;
    }
  else
    {
      next_vol_fullname_size = (int) strlen (next_vol_fullname) + 1;
      if (next_vol_fullname_size > PATH_MAX)
        {
          next_vol_fullname_size = PATH_MAX;
        }
    }

  /* bytes occupied by the remarks, terminator included */
  length_diff = vhdr->offset_to_vol_remarks;

  length_to_move = (int) strlen (vhdr->var_fields + length_diff) + 1;

  /* Difference in length between new name and old name */
  length_diff = (next_vol_fullname_size - (vhdr->offset_to_vol_remarks - vhdr->offset_to_next_vol_fullname));

  if (length_diff != 0)
    {
      /* We need to either move to right(expand) or left(shrink) the rest of the variable length fields */
      memmove (disk_vhdr_get_vol_remarks (vhdr) + length_diff, disk_vhdr_get_vol_remarks (vhdr), length_to_move);
      vhdr->offset_to_vol_remarks += length_diff;
    }

  (void) memcpy (disk_vhdr_get_next_vol_fullname (vhdr), next_vol_fullname, next_vol_fullname_size);
  /* when the name was clamped, the copied bytes do not include the terminator; force it. For names that fit this
   * rewrites the terminator that was just copied. */
  disk_vhdr_get_next_vol_fullname (vhdr)[next_vol_fullname_size - 1] = '\0';

  return ret;
}

/*
 * disk_vhdr_set_vol_remarks () - store the remarks in the variable-length area of the volume header
 *
 *   return: NO_ERROR
 *   vhdr(in/out): volume header
 *   vol_remarks(in): remarks; NULL is stored as an empty string
 *
 * Note: remarks that do not fit in the room computed below are truncated. The stored string is always
 *       null-terminated and the terminator is never written past that room.
 */
static int
disk_vhdr_set_vol_remarks (DISK_VOLUME_HEADER * vhdr, const char *vol_remarks)
{
  int maxsize;
  int ret = NO_ERROR;

  if (vol_remarks != NULL)
    {
      /* bytes available from the start of the remarks field up to DB_PAGESIZE bytes from the start of the header
       * (assumes the header is placed at the start of a DB_PAGESIZE page - TODO confirm) */
      maxsize = (DB_PAGESIZE - offsetof (DISK_VOLUME_HEADER, var_fields) - vhdr->offset_to_vol_remarks);

      /* the terminator needs one byte too, so a string of exactly maxsize characters does not fit */
      if ((int) strlen (vol_remarks) >= maxsize)
        {
          /* Does not fit.. Truncate the comment */
          char *remarks = disk_vhdr_get_vol_remarks (vhdr);

          (void) strncpy (remarks, vol_remarks, maxsize - 1);
          /* terminate relative to the remarks field, not to the start of var_fields */
          remarks[maxsize - 1] = '\0';
        }
      else
        {
          (void) strcpy (disk_vhdr_get_vol_remarks (vhdr), vol_remarks);
        }
    }
  else
    {
      vhdr->var_fields[vhdr->offset_to_vol_remarks] = '\0';
    }

  return ret;
}

#if defined (SERVER_MODE)
/*
 * disk_log_extend_elapsed () - add an expansion entry (transaction index, volume type, volume name and message)
 *                              to the event log
 *
 * thread_p (in)  : thread entry
 * event (in)     : event name passed to event_log_start
 * name (in)      : volume name; "(UNKNOWN)" is printed when NULL
 * voltype (in)   : volume type
 * log (in)       : message printed on the "event:" line
 *
 * NOTE: nothing is written when event_log_start returns NULL.
 */
static void
disk_log_extend_elapsed (THREAD_ENTRY * thread_p, const char *event, const char *name, DB_VOLTYPE voltype,
                         const char *log)
{
  const int indent = 2;
  const char *voltype_text;
  const char *volname_text;
  FILE *out = event_log_start (thread_p, event);

  if (out == NULL)
    {
      return;
    }

  voltype_text = (voltype == DB_PERMANENT_VOLTYPE) ? "PERMANENT_VOLUME" : "TEMPORARY_VOLUME";
  volname_text = (name != NULL) ? name : "(UNKNOWN)";

  fprintf (out, "%*ctran index: %d\n", indent, ' ', thread_p->tran_index);
  fprintf (out, "%*cvoltype: %s\n", indent, ' ', voltype_text);
  fprintf (out, "%*cvolname: %s\n", indent, ' ', volname_text);
  fprintf (out, "%*cevent: %s\n", indent, ' ', log);

  event_log_end (thread_p);
}
#endif

/*
 * disk_vhdr_get_vol_fullname () - locate the volume full name inside the volume header
 *
 * return    : pointer into var_fields at offset_to_vol_fullname
 * vhdr (in) : volume header
 */
STATIC_INLINE char *
disk_vhdr_get_vol_fullname (const DISK_VOLUME_HEADER * vhdr)
{
  return (char *) &vhdr->var_fields[vhdr->offset_to_vol_fullname];
}

/*
 * disk_vhdr_get_next_vol_fullname () - locate the next volume full name inside the volume header
 *
 * return    : pointer into var_fields at offset_to_next_vol_fullname
 * vhdr (in) : volume header
 */
STATIC_INLINE char *
disk_vhdr_get_next_vol_fullname (const DISK_VOLUME_HEADER * vhdr)
{
  return (char *) &vhdr->var_fields[vhdr->offset_to_next_vol_fullname];
}

/*
 * disk_vhdr_get_vol_remarks () - locate the remarks inside the volume header
 *
 * return    : pointer into var_fields at offset_to_vol_remarks
 * vhdr (in) : volume header
 */
STATIC_INLINE char *
disk_vhdr_get_vol_remarks (const DISK_VOLUME_HEADER * vhdr)
{
  return (char *) &vhdr->var_fields[vhdr->offset_to_vol_remarks];
}

/*
 * disk_vhdr_get_vol_header_size () - compute the size of the volume header including its variable fields
 *
 * return    : number of bytes from the start of the header up to and including the terminator of the remarks
 * vhdr (in) : volume header (must not be NULL)
 */
STATIC_INLINE int
disk_vhdr_get_vol_header_size (const DISK_VOLUME_HEADER * vhdr)
{
  assert (vhdr != NULL);

  /* remarks are the last variable field, so the header ends right after their terminator */
  const char *remarks = disk_vhdr_get_vol_remarks (vhdr);
  const char *header_end = remarks + strlen (remarks) + 1;
  const int volume_header_size = (int) (header_end - (const char *) vhdr);

  assert (volume_header_size >= 0 && volume_header_size <= DB_PAGESIZE);

  return volume_header_size;
}

/*
 * disk_set_checkpoint () - Reset the recovery checkpoint address for this volume
 *   return: NO_ERROR, or the error code of disk_get_volheader
 *   volid(in): Permanent volume identifier
 *   log_chkpt_lsa(in): Recovery checkpoint for volume
 *
 * Note: Only the volume header page, which keeps the checkpoint value, is flushed here; other dirty pages of the
 *   volume are not written out. The function assumes that all volume pages with an lsa smaller than the given one
 *   have already been forced to disk (e.g., by the log and recovery manager).
 *
 *       When a backup of the database is taken, it is important that the volume header page is forced out.
 *       The checkpoint on the volume is used as an indicator to start a media recovery process, so it may be a good
 *       idea to force all dirty unfixed pages.
 */
int
disk_set_checkpoint (THREAD_ENTRY * thread_p, INT16 volid, const LOG_LSA * log_chkpt_lsa)
{
  DISK_VOLUME_HEADER *volheader;
  LOG_DATA_ADDR addr;
  int rc;

  addr.vfid = NULL;
  addr.offset = 0;
  addr.pgptr = NULL;

  rc = disk_get_volheader (thread_p, volid, PGBUF_LATCH_WRITE, &addr.pgptr, &volheader);
  if (rc != NO_ERROR)
    {
      ASSERT_ERROR ();
      return rc;
    }

  /* record the new checkpoint in the header */
  volheader->chkpt_lsa.pageid = log_chkpt_lsa->pageid;
  volheader->chkpt_lsa.offset = log_chkpt_lsa->offset;

  (void) disk_verify_volume_header (thread_p, addr.pgptr);

  /* the change is not logged; the header page is flushed immediately and released by the flush */
  log_skip_logging (thread_p, &addr);
  pgbuf_set_dirty (thread_p, addr.pgptr, DONT_FREE);
  pgbuf_flush (thread_p, addr.pgptr, FREE);
  addr.pgptr = NULL;

  return NO_ERROR;
}

/*
 * disk_get_checkpoint () - Get the recovery checkpoint address of this volume
 *   return: NO_ERROR, or the error code of disk_get_volheader
 *   volid(in): Permanent volume identifier
 *   vol_lsa(out): Volume recovery checkpoint, copied from the volume header
 */
int
disk_get_checkpoint (THREAD_ENTRY * thread_p, INT16 volid, LOG_LSA * vol_lsa)
{
  DISK_VOLUME_HEADER *volheader;
  PAGE_PTR hdr_page = NULL;
  int rc;

  rc = disk_get_volheader (thread_p, volid, PGBUF_LATCH_READ, &hdr_page, &volheader);
  if (rc != NO_ERROR)
    {
      ASSERT_ERROR ();
      return rc;
    }

  /* copy the checkpoint while the header is read-latched */
  vol_lsa->pageid = volheader->chkpt_lsa.pageid;
  vol_lsa->offset = volheader->chkpt_lsa.offset;

  (void) disk_verify_volume_header (thread_p, hdr_page);

  pgbuf_unfix_and_init (thread_p, hdr_page);

  return NO_ERROR;
}

/*
 * disk_get_db_creation () - Get the database creation time according to the volume header
 *   return: NO_ERROR, or the error code of disk_get_volheader
 *   volid(in): Permanent volume identifier
 *   db_creation(out): Database creation time according to the volume
 */
int
disk_get_db_creation (THREAD_ENTRY * thread_p, INT16 volid, INT64 * db_creation)
{
  DISK_VOLUME_HEADER *volheader;
  PAGE_PTR hdr_page = NULL;
  int rc;

  rc = disk_get_volheader (thread_p, volid, PGBUF_LATCH_READ, &hdr_page, &volheader);
  if (rc != NO_ERROR)
    {
      ASSERT_ERROR ();
      return rc;
    }

  /* byte-wise copy of the creation time stored in the header */
  memcpy (db_creation, &volheader->db_creation, sizeof (*db_creation));

  (void) disk_verify_volume_header (thread_p, hdr_page);

  pgbuf_unfix_and_init (thread_p, hdr_page);

  return NO_ERROR;
}

/*
 * xdisk_get_purpose () - Find the main purpose of the given volume
 *   return: volume purpose, or DISK_UNKNOWN_PURPOSE for volume identifiers below LOG_DBFIRST_VOLID and for
 *           identifiers rejected by disk_is_valid_volid
 *   volid(in): volume identifier
 *
 * Note: when the disk cache does not exist yet, DB_PERMANENT_DATA_PURPOSE is returned; only the first volume is
 *       expected to be requested in that state (asserted).
 */
DISK_VOLPURPOSE
xdisk_get_purpose (THREAD_ENTRY * thread_p, INT16 volid)
{
  if (disk_Cache == NULL)
    {
      /* manager not initialized. only allow fetches from first volume. */
      assert (volid == LOG_DBFIRST_VOLID);
      return DB_PERMANENT_DATA_PURPOSE;
    }

  if (volid >= LOG_DBFIRST_VOLID)
    {
      if (disk_is_valid_volid (volid))
        {
          return disk_get_volpurpose (volid);
        }
      /* an identifier in the database volume range that is not valid is not expected */
      assert (false);
    }

  /* system volumes and invalid identifiers */
  return DISK_UNKNOWN_PURPOSE;
}

/*
 * xdisk_get_purpose_and_space_info () - Find the main purpose and space info of the volume
 *
 *   return: NO_ERROR, or the error code of disk_get_volheader
 *   volid(in): volume identifier; must not be NULL_VOLID and must be a valid volume (asserted)
 *   vol_purpose(out): Purpose for the given volume, read from the disk cache. Skipped when NULL
 *   space_info (out): space info of the volume: max/total sectors from the volume header, free sectors from the
 *                     disk cache. Skipped when NULL
 *
 * Note: The free number of sectors should be taken as an approximation by the caller since the page is not kept
 *   latched after the inquiry. That is, someone else can allocate pages. When the volume header cannot be
 *   obtained, vol_purpose is left untouched.
 */
int
xdisk_get_purpose_and_space_info (THREAD_ENTRY * thread_p, VOLID volid, DISK_VOLPURPOSE * vol_purpose,
                                  DISK_VOLUME_SPACE_INFO * space_info)
{
  assert (volid != NULL_VOLID);
  assert (disk_Cache != NULL);
  assert (volid < disk_Cache->nvols_perm || volid > LOG_MAX_DBVOLID - disk_Cache->nvols_temp);

  if (space_info != NULL)
    {
      /* we don't cache total/max sectors */
      PAGE_PTR hdr_page;
      DISK_VOLUME_HEADER *hdr;
      int rc;

      memset (space_info, 0, sizeof (*space_info));

      rc = disk_get_volheader (thread_p, volid, PGBUF_LATCH_READ, &hdr_page, &hdr);
      if (rc != NO_ERROR)
        {
          ASSERT_ERROR ();
          return rc;
        }

      space_info->n_max_sects = hdr->nsect_max;
      space_info->n_total_sects = hdr->nsect_total;

      pgbuf_unfix_and_init (thread_p, hdr_page);

      /* free sectors come from the cache */
      space_info->n_free_sects = disk_Cache->vols[volid].nsect_free;
    }

  if (vol_purpose != NULL)
    {
      *vol_purpose = disk_Cache->vols[volid].purpose;
    }

  return NO_ERROR;
}

/*
 * xdisk_get_purpose_and_sys_lastpage () - Find the main purpose of the given volume and the page id of the last
 *                                         system page used by the volume
 *   return: volid or NULL_VOLID in case of error
 *   volid(in): Permanent volume identifier
 *   vol_purpose(out): Purpose for the given volume, as stored in the volume header
 *   sys_lastpage(out): Pageid of last system page
 */
INT16
xdisk_get_purpose_and_sys_lastpage (THREAD_ENTRY * thread_p, INT16 volid, DISK_VOLPURPOSE * vol_purpose,
                                    INT32 * sys_lastpage)
{
  DISK_VOLUME_HEADER *volheader;
  PAGE_PTR hdr_page = NULL;
  int rc;

  /* both values are read from the header under a read latch */
  rc = disk_get_volheader (thread_p, volid, PGBUF_LATCH_READ, &hdr_page, &volheader);
  if (rc != NO_ERROR)
    {
      ASSERT_ERROR ();
      return NULL_VOLID;
    }

  *vol_purpose = volheader->purpose;
  *sys_lastpage = volheader->sys_lastpage;

  (void) disk_verify_volume_header (thread_p, hdr_page);

  pgbuf_unfix_and_init (thread_p, hdr_page);

  return volid;
}

/*
 * xdisk_get_total_numpages () - Return the number of total pages for the given volume
 *   return: Total number of pages (total sectors of the volume header converted by DISK_SECTS_NPAGES), or -1 if
 *           the volume header could not be obtained
 *   volid(in): Permanent volume identifier
 */
INT32
xdisk_get_total_numpages (THREAD_ENTRY * thread_p, INT16 volid)
{
  /* we don't have this info in cache */
  /* todo: investigate further its usage */
  PAGE_PTR hdr_page;
  DISK_VOLUME_HEADER *hdr;
  DKNPAGES total_pages;
  int rc;

  rc = disk_get_volheader (thread_p, volid, PGBUF_LATCH_READ, &hdr_page, &hdr);
  if (rc != NO_ERROR)
    {
      ASSERT_ERROR ();
      return -1;
    }

  total_pages = DISK_SECTS_NPAGES (hdr->nsect_total);
  pgbuf_unfix (thread_p, hdr_page);

  return total_pages;
}

/*
 * xdisk_get_free_numpages () - Return the number of free pages for the given volume
 *   return: Number of free pages (cached free sectors of the volume converted by DISK_SECTS_NPAGES)
 *   volid(in): Permanent volume identifier
 *
 * Note: The free number of pages should be taken as an approximation by the caller since the value is read from
 *       the disk cache and nothing stays locked after the inquiry. That is, someone else can allocate pages.
 */
INT32
xdisk_get_free_numpages (THREAD_ENTRY * thread_p, INT16 volid)
{
  /* get from cache. */
  /* todo: investigate usage */
  assert (disk_Cache != NULL);
  /* NOTE(review): this check accepts volid == nvols_perm, while disk_is_valid_volid () and
   * xdisk_get_purpose_and_space_info () use a strict '<' - confirm whether '<=' is intended. */
  assert (volid <= disk_Cache->nvols_perm || volid > LOG_MAX_DBVOLID - disk_Cache->nvols_temp);

  return DISK_SECTS_NPAGES (disk_Cache->vols[volid].nsect_free);
}

/*
 * xdisk_is_volume_exist () - check whether a volume exists by mapping disk_check_volume_exist over the mounted
 *                            volumes
 *   return: value of the exists flag after the mapping; false if the callback never set it
 *   volid(in): volume identifier
 */
bool
xdisk_is_volume_exist (THREAD_ENTRY * thread_p, VOLID volid)
{
  DISK_CHECK_VOL_INFO lookup;

  /* assume not found until the callback reports otherwise */
  lookup.exists = false;
  lookup.volid = volid;

  (void) fileio_map_mounted (thread_p, disk_check_volume_exist, &lookup);

  return lookup.exists;
}

/*
 * disk_get_total_numsectors () - Return the number of total sectors for the given volume
 *   return: Total number of sectors stored in the volume header, or -1 if the header could not be obtained
 *   volid(in): Permanent volume identifier
 */
INT32
disk_get_total_numsectors (THREAD_ENTRY * thread_p, INT16 volid)
{
  DISK_VOLUME_HEADER *volheader;
  PAGE_PTR hdr_page = NULL;
  INT32 nsect_total;
  int rc;

  /* todo: will we need this? */

  rc = disk_get_volheader (thread_p, volid, PGBUF_LATCH_READ, &hdr_page, &volheader);
  if (rc != NO_ERROR)
    {
      ASSERT_ERROR ();
      return -1;
    }

  nsect_total = volheader->nsect_total;
  pgbuf_unfix_and_init (thread_p, hdr_page);

  return nsect_total;
}

/*
 * xdisk_get_fullname () - Find the name of the volume and copy it into vol_fullname
 *   return: vol_fullname on success or NULL on failure
 *   volid(in): Permanent volume identifier
 *   vol_fullname(out): Address where the name of the volume is placed.
 *                     The size must be at least DB_MAX_PATH_LENGTH
 *
 * Note: Alternative function fileio_get_volume_label which is much faster and does not copy the name
 *
 *       The output is always null-terminated; a stored name that would fill the whole buffer is truncated to
 *       DB_MAX_PATH_LENGTH - 1 characters. When the volume header cannot be obtained, vol_fullname is set to an
 *       empty string and NULL is returned.
 */
char *
xdisk_get_fullname (THREAD_ENTRY * thread_p, INT16 volid, char *vol_fullname)
{
  DISK_VOLUME_HEADER *vhdr;
  PAGE_PTR hdr_pgptr = NULL;

  if (vol_fullname == NULL)
    {
      er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_OBJ_INVALID_ARGUMENTS, 0);
      return NULL;
    }

  if (disk_get_volheader (thread_p, volid, PGBUF_LATCH_READ, &hdr_pgptr, &vhdr) != NO_ERROR)
    {
      ASSERT_ERROR ();
      *vol_fullname = '\0';
      return NULL;
    }

  strncpy (vol_fullname, disk_vhdr_get_vol_fullname (vhdr), DB_MAX_PATH_LENGTH);
  /* strncpy does not terminate the destination when the source has DB_MAX_PATH_LENGTH or more characters */
  vol_fullname[DB_MAX_PATH_LENGTH - 1] = '\0';

  (void) disk_verify_volume_header (thread_p, hdr_pgptr);

  pgbuf_unfix_and_init (thread_p, hdr_pgptr);

  return vol_fullname;
}

/*
 * xdisk_get_remarks () - Find the remarks attached to the volume creation
 *   return: newly allocated copy of the remarks string, or NULL if the volume header could not be obtained or
 *           the allocation failed
 *   volid(in): Permanent volume identifier
 *
 * Note: The string returned, must be freed by using free_and_init.
 */
char *
xdisk_get_remarks (THREAD_ENTRY * thread_p, INT16 volid)
{
  DISK_VOLUME_HEADER *volheader;
  PAGE_PTR hdr_page = NULL;
  const char *stored_remarks;
  size_t remarks_size;
  char *copy;

  if (disk_get_volheader (thread_p, volid, PGBUF_LATCH_READ, &hdr_page, &volheader) != NO_ERROR)
    {
      ASSERT_ERROR ();
      return NULL;
    }

  /* duplicate the remarks, terminator included, while the header is still latched */
  stored_remarks = disk_vhdr_get_vol_remarks (volheader);
  remarks_size = strlen (stored_remarks) + 1;

  copy = (char *) malloc (remarks_size);
  if (copy != NULL)
    {
      memcpy (copy, stored_remarks, remarks_size);
    }

  pgbuf_unfix_and_init (thread_p, hdr_page);

  return copy;
}

/*
 * disk_get_boot_db_charset () - Find the system boot charset
 *   return: NO_ERROR, or the error code of disk_get_volheader
 *   volid(in): Permanent volume identifier
 *   db_charset(out): System boot charset, as stored in the volume header (must not be NULL)
 */
int
disk_get_boot_db_charset (THREAD_ENTRY * thread_p, INT16 volid, INTL_CODESET * db_charset)
{
  DISK_VOLUME_HEADER *volheader;
  PAGE_PTR hdr_page = NULL;
  int rc;

  assert (db_charset != NULL);

  rc = disk_get_volheader (thread_p, volid, PGBUF_LATCH_READ, &hdr_page, &volheader);
  if (rc != NO_ERROR)
    {
      ASSERT_ERROR ();
      return rc;
    }

  *db_charset = (INTL_CODESET) volheader->db_charset;
  pgbuf_unfix_and_init (thread_p, hdr_page);

  return NO_ERROR;
}

/*
 * disk_dump_volume_system_info () - dump volume system information: the volume header followed by the sector table
 *
 * return        : NO_ERROR, or the error code of disk_get_volheader or disk_stab_dump
 * thread_p (in) : thread entry
 * fp (in)       : output file
 * volid (in)    : volume identifier
 */
static int
disk_dump_volume_system_info (THREAD_ENTRY * thread_p, FILE * fp, INT16 volid)
{
  DISK_VOLUME_HEADER *volheader;
  PAGE_PTR hdr_page = NULL;
  int rc;

  rc = disk_get_volheader (thread_p, volid, PGBUF_LATCH_READ, &hdr_page, &volheader);
  if (rc != NO_ERROR)
    {
      ASSERT_ERROR ();
      return rc;
    }

  disk_vhdr_dump (fp, volheader);

  rc = disk_stab_dump (thread_p, fp, volheader);
  if (rc == NO_ERROR)
    {
      /* blank lines are printed only after a complete dump */
      (void) fprintf (fp, "\n\n");
    }
  else
    {
      ASSERT_ERROR ();
    }

  pgbuf_unfix_and_init (thread_p, hdr_page);

  return rc;
}

/*
 * disk_dump_all () - Dump the system area information of every mounted volume by mapping disk_dump_goodvol_all
 *                    over them
 *   return: NO_ERROR if fileio_map_mounted returns true, ER_FAILED otherwise
 *   fp(in): output file
 */
int
disk_dump_all (THREAD_ENTRY * thread_p, FILE * fp)
{
  if (fileio_map_mounted (thread_p, disk_dump_goodvol_all, fp) == true)
    {
      return NO_ERROR;
    }
  return ER_FAILED;
}

/*
 * disk_dump_goodvol_all () - callback that dumps the system information of the given volume
 *   return: always true; the result of the dump itself is ignored
 *   volid(in): Permanent volume identifier
 *   arg(in): output file pointer (FILE *)
 */
static bool
disk_dump_goodvol_all (THREAD_ENTRY * thread_p, INT16 volid, void *arg)
{
  /* a failed dump of one volume is deliberately not reported */
  (void) disk_dump_volume_system_info (thread_p, (FILE *) arg, volid);

  return true;
}

/*
 * disk_is_valid_volid () - is volume identifier valid? (permanent or temporary)
 *
 * return     : true if volume id is valid, false otherwise
 * volid (in) : volume identifier
 *
 * NOTE: disk_Cache is read without being checked for NULL, and no lower bound is applied to volid here.
 */
STATIC_INLINE bool
disk_is_valid_volid (VOLID volid)
{
  if (volid < disk_Cache->nvols_perm)
    {
      /* below the number of permanent volumes */
      return true;
    }

  /* otherwise it must be one of the last nvols_temp identifiers ending at LOG_MAX_DBVOLID */
  return volid > LOG_MAX_DBVOLID - disk_Cache->nvols_temp;
}

/*
 * disk_get_volpurpose () - get the purpose cached in disk_Cache for a volume
 *
 * return     : volume purpose
 * volid (in) : volume identifier
 */
STATIC_INLINE DB_VOLPURPOSE
disk_get_volpurpose (VOLID volid)
{
  assert (disk_Cache != NULL);
  assert (volid >= LOG_DBFIRST_VOLID);
  assert (disk_is_valid_volid (volid));

  return disk_Cache->vols[volid].purpose;
}

/*
 * disk_get_voltype () - get volume type, decided only from the position of volid relative to nvols_perm
 *
 * return     : permanent/temporary volume type
 * volid (in) : volume identifier
 */
STATIC_INLINE DB_VOLTYPE
disk_get_voltype (VOLID volid)
{
  assert (disk_Cache != NULL);
  assert (disk_is_valid_volid (volid));

  if (volid < disk_Cache->nvols_perm)
    {
      return DB_PERMANENT_VOLTYPE;
    }

  return DB_TEMPORARY_VOLTYPE;
}

/*
 * disk_spacedb () - get space info from disk manager
 *
 * return          : error code
 * thread_p (in)   : thread entry
 * spaceall (out)  : output info on all volumes. array with one entry for each of SPACEDB_PERM_PERM_ALL,
 *                   SPACEDB_PERM_TEMP_ALL and SPACEDB_TEMP_TEMP_ALL, plus the SPACEDB_TOTAL_ALL entry that receives
 *                   the sum of the others.
 * spacevols (out) : output info for each volume if not null. *spacevols is allocated here with malloc; if an error
 *                   occurs it is released with free_and_init before returning.
 *
 * NOTE: aggregated values and free pages per volume are read from disk_Cache while extensions are blocked. totals
 *       and names per volume are read afterwards from the volume headers, without blocking extensions.
 */
int
disk_spacedb (THREAD_ENTRY * thread_p, SPACEDB_ALL * spaceall, SPACEDB_ONEVOL ** spacevols)
{
  int i;
  int iter_vol;
  int iter_spacevols;
  int nvols_total = 0;
  bool is_extend_locked = false;
  SPACEDB_ONEVOL *onevol = NULL;

  int error_code = NO_ERROR;

  assert (spaceall != NULL);

  spaceall[SPACEDB_PERM_TEMP_ALL].nvols = 0;

  /* block extensions for the short period we'll be reading volume info. */
  disk_lock_extend ();
  is_extend_locked = true;

  /* get number of volumes. we know the total number of permanent type volumes, we'll have to count how many of them
   * have temporary purpose */
  for (iter_vol = 0; iter_vol < disk_Cache->nvols_perm; iter_vol++)
    {
      if (disk_Cache->vols[iter_vol].purpose == DB_TEMPORARY_DATA_PURPOSE)
        {
          spaceall[SPACEDB_PERM_TEMP_ALL].nvols++;
        }
    }

  spaceall[SPACEDB_PERM_PERM_ALL].nvols = disk_Cache->nvols_perm - spaceall[SPACEDB_PERM_TEMP_ALL].nvols;
  spaceall[SPACEDB_TEMP_TEMP_ALL].nvols = disk_Cache->nvols_temp;
  nvols_total = disk_Cache->nvols_perm + disk_Cache->nvols_temp;

  spaceall[SPACEDB_PERM_PERM_ALL].npage_used =
    DISK_SECTS_NPAGES (disk_Cache->perm_purpose_info.extend_info.nsect_total
                       - disk_Cache->perm_purpose_info.extend_info.nsect_free);
  spaceall[SPACEDB_PERM_PERM_ALL].npage_free = DISK_SECTS_NPAGES (disk_Cache->perm_purpose_info.extend_info.nsect_free);

  spaceall[SPACEDB_PERM_TEMP_ALL].npage_used =
    DISK_SECTS_NPAGES (disk_Cache->temp_purpose_info.nsect_perm_total - disk_Cache->temp_purpose_info.nsect_perm_free);
  spaceall[SPACEDB_PERM_TEMP_ALL].npage_free = DISK_SECTS_NPAGES (disk_Cache->temp_purpose_info.nsect_perm_free);

  spaceall[SPACEDB_TEMP_TEMP_ALL].npage_used =
    DISK_SECTS_NPAGES (disk_Cache->temp_purpose_info.extend_info.nsect_total
                       - disk_Cache->temp_purpose_info.extend_info.nsect_free);
  spaceall[SPACEDB_TEMP_TEMP_ALL].npage_free = DISK_SECTS_NPAGES (disk_Cache->temp_purpose_info.extend_info.nsect_free);

  if (spacevols != NULL)
    {
      /* get info on each volume */
      iter_spacevols = 0;

      *spacevols = (SPACEDB_ONEVOL *) malloc (nvols_total * sizeof (SPACEDB_ONEVOL));
      if (*spacevols == NULL)
        {
          er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_OUT_OF_VIRTUAL_MEMORY, 1, nvols_total * sizeof (SPACEDB_ONEVOL));
          error_code = ER_OUT_OF_VIRTUAL_MEMORY;
          goto exit;
        }

      /* permanent type volumes use identifiers [0, nvols_perm) */
      for (iter_vol = 0; iter_vol < disk_Cache->nvols_perm; iter_vol++)
        {
          onevol = &(*spacevols)[iter_spacevols];
          onevol->volid = iter_vol;
          onevol->type = DB_PERMANENT_VOLTYPE;
          onevol->purpose = disk_Cache->vols[iter_vol].purpose;
          onevol->npage_free = DISK_SECTS_NPAGES (disk_Cache->vols[iter_vol].nsect_free);
          iter_spacevols++;
        }
      /* temporary type volumes use the last nvols_temp identifiers, ending with LOG_MAX_DBVOLID */
      for (iter_vol = LOG_MAX_DBVOLID - disk_Cache->nvols_temp + 1; iter_vol <= LOG_MAX_DBVOLID; iter_vol++)
        {
          onevol = &(*spacevols)[iter_spacevols];
          onevol->volid = iter_vol;
          onevol->type = DB_TEMPORARY_VOLTYPE;
          onevol->purpose = disk_Cache->vols[iter_vol].purpose;
          onevol->npage_free = DISK_SECTS_NPAGES (disk_Cache->vols[iter_vol].nsect_free);
          iter_spacevols++;
        }
      assert (iter_spacevols == nvols_total);
    }

  /* now unlock the extensions */
  disk_unlock_extend ();
  is_extend_locked = false;

  /* complete total values */
  memset (&spaceall[SPACEDB_TOTAL_ALL], 0, sizeof (spaceall[SPACEDB_TOTAL_ALL]));
  for (i = 0; i < SPACEDB_TOTAL_ALL; i++)
    {
      spaceall[SPACEDB_TOTAL_ALL].nvols += spaceall[i].nvols;
      spaceall[SPACEDB_TOTAL_ALL].npage_used += spaceall[i].npage_used;
      spaceall[SPACEDB_TOTAL_ALL].npage_free += spaceall[i].npage_free;
    }

  if (spacevols != NULL)
    {
      PAGE_PTR page_volheader = NULL;
      DISK_VOLUME_HEADER *volheader = NULL;

      /* we still have to read the total number of sectors and names for each volume, which are found in volumes header
       * pages, which need latches, which are slow, therefore we do it here, after unlocking extend. */
      assert (*spacevols != NULL);

      for (iter_spacevols = 0; iter_spacevols < nvols_total; iter_spacevols++)
        {
          onevol = &(*spacevols)[iter_spacevols];

          error_code = disk_get_volheader (thread_p, onevol->volid, PGBUF_LATCH_READ, &page_volheader, &volheader);
          if (error_code != NO_ERROR)
            {
              ASSERT_ERROR ();
              goto exit;
            }
          onevol->npage_used = DISK_SECTS_NPAGES (volheader->nsect_total) - onevol->npage_free;

          /* strncpy does not terminate the destination when the source fills it completely. copy one byte less and
           * terminate explicitly, so a maximum length name cannot be handed back unterminated. */
          strncpy (onevol->name, disk_vhdr_get_vol_fullname (volheader), DB_MAX_PATH_LENGTH - 1);
          onevol->name[DB_MAX_PATH_LENGTH - 1] = '\0';

          pgbuf_unfix_and_init (thread_p, page_volheader);
        }
    }

  /* success */
  assert (error_code == NO_ERROR);

exit:

  if (is_extend_locked)
    {
      disk_unlock_extend ();
    }

  /* errors are only raised while filling spacevols, but do not rely on that to dereference the argument */
  if (error_code != NO_ERROR && spacevols != NULL && *spacevols != NULL)
    {
      /* free spacevols */
      free_and_init (*spacevols);
    }

  return error_code;
}

/*
 * disk_compare_vsids () - Compare two sector identifiers.
 *
 * return      : 1 if first sector is bigger, -1 if first sector is smaller and 0 if sector ids are equal
 * first (in)  : first sector id (pointer to VSID)
 * second (in) : second sector id (pointer to VSID)
 *
 * NOTE: volume identifiers are compared first; sector identifiers only break ties. the result is computed with
 *       comparisons rather than a subtraction, so it is exactly -1, 0 or 1 and cannot overflow.
 */
int
disk_compare_vsids (const void *first, const void *second)
{
  const VSID *first_vsid = (const VSID *) first;
  const VSID *second_vsid = (const VSID *) second;

  if (first_vsid->volid != second_vsid->volid)
    {
      return first_vsid->volid > second_vsid->volid ? 1 : -1;
    }

  if (first_vsid->sectid != second_vsid->sectid)
    {
      return first_vsid->sectid > second_vsid->sectid ? 1 : -1;
    }

  return 0;
}

/************************************************************************/
/* Disk check section                                                   */
/************************************************************************/

/*
 * disk_check_volume_exist () - record in arg whether the visited volume is the one being searched
 *   return: always true
 *
 *   thread_p(in): thread entry (not used)
 *   volid(in): identifier of the visited volume
 *   arg(in/out): DISK_CHECK_VOL_INFO structure; its exists field is set to true when volid equals its volid
 *                field, and is left untouched otherwise
 */
static bool
disk_check_volume_exist (THREAD_ENTRY * thread_p, VOLID volid, void *arg)
{
  DISK_CHECK_VOL_INFO *check_info = (DISK_CHECK_VOL_INFO *) arg;

  if (check_info->volid != volid)
    {
      /* not the searched volume */
      return true;
    }

  check_info->exists = true;
  return true;
}

/*
 * disk_can_overwrite_data_volume() - check whether data volume can be overwritten
 *
 * return : error code (NO_ERROR, or ER_FAILED if the existing file cannot be inspected or read)
 * thread_p(in) : thread entry
 * vol_label_p(in) : volume label
 * can_overwrite(out) : true, if the volume can be overwritten
 *
 * NOTE: the file can be overwritten if it cannot be opened, if it is empty, or if it carries the CUBRID database
 *       volume magic and a db_creation value older than log_Gl.hdr.db_creation. It cannot be overwritten if it is
 *       mounted (checked only when CS_MODE is not defined), too small to hold the inspected header part, or
 *       anything else.
 */
static int
disk_can_overwrite_data_volume (THREAD_ENTRY * thread_p, const char *vol_label_p, bool * can_overwrite)
{
  char data_buf[IO_MAX_PAGE_SIZE + MAX_ALIGNMENT], *data;
  DISK_VOLUME_HEADER *vhdr;
  int vol_fd = NULL_VOLDES;
  off_t read_size;
  size_t page_reserver_area;
  int error_code = NO_ERROR;
  struct stat stat_buffer;

  assert (vol_label_p != NULL && can_overwrite != NULL);

  /* pessimistic default; only the cases below turn it to true */
  *can_overwrite = false;

#if !defined(CS_MODE)
  /* Is volume already mounted ? */
  vol_fd = fileio_find_volume_descriptor_with_label (vol_label_p);
  if (vol_fd != NULL_VOLDES)
    {
      /* Can't overwrite the mounted volume. */
      /* the descriptor was looked up, not opened by this function, so it is not closed here */
      return NO_ERROR;
    }
#endif /* !CS_MODE */

  /* aligned read buffer; the volume header is expected after the reserved area at the start of a FILEIO_PAGE */
  data = PTR_ALIGN (data_buf, MAX_ALIGNMENT);
  page_reserver_area = offsetof (FILEIO_PAGE, page);
  vhdr = (DISK_VOLUME_HEADER *) (data + page_reserver_area);

  /* Check the existence of the file by opening the file */
  vol_fd = fileio_open (vol_label_p, O_RDONLY, 0);
  if (vol_fd == NULL_VOLDES)
    {
      /* Someone deleted the volume. It can be written. */
      /* NOTE(review): any open failure ends up here, not only a missing file - confirm this is intended */
      *can_overwrite = true;
      return NO_ERROR;
    }

  /* Computes the number of bytes to read: the page reserved area plus the volume header up to and including its
   * db_creation field. The assert verifies this span also covers the magic field. */
  read_size = offsetof (DISK_VOLUME_HEADER, db_creation) + sizeof (((DISK_VOLUME_HEADER *) 0)->db_creation);
  assert (read_size > (off_t) (offsetof (DISK_VOLUME_HEADER, magic) + CUBRID_MAGIC_MAX_LENGTH));
  read_size += page_reserver_area;

  if (fstat (vol_fd, &stat_buffer) != 0)
    {
      /* NOTE(review): ER_FAILED is returned without a visible er_set call on this path */
      error_code = ER_FAILED;
      goto end;
    }

  // We will overwrite the file if its size is 0, or (checked further below) if the database creation time stored
  // in the file is older than that of the current database.
  if (stat_buffer.st_size == 0)
    {
      // regard a failure happened during creation.
      *can_overwrite = true;
      goto end;
    }
  else if (stat_buffer.st_size < read_size)
    {
      /* Too small to hold the part of the volume header we inspect. Is not a CUBRID file. It can't be
       * overwritten. */
      goto end;
    }

  /* Read read_size bytes into the aligned buffer; 0 is passed as the position argument of fileio_read. */
  if (fileio_read (thread_p, vol_fd, data, 0, read_size) == NULL)
    {
      error_code = ER_FAILED;
      goto end;
    }

  /* Check whether the existing volume is leaked from previous database and can be overwritten. */
  if (strncmp (vhdr->magic, CUBRID_MAGIC_DATABASE_VOLUME, CUBRID_MAGIC_MAX_LENGTH) == 0
      && log_Gl.hdr.db_creation > vhdr->db_creation)
    {
      /* Old CUBRID file. It can be overwritten. */
      *can_overwrite = true;
    }

end:
  /* close the descriptor opened above on every path that reaches this label */
  if (vol_fd != NULL_VOLDES)
    {
      fileio_close (vol_fd);
    }

  return error_code;
}

/*
 * disk_compatible_type_and_purpose () - is volume purpose compatible to volume type?
 *
 * return       : true if compatible, false otherwise
 * type (in)    : volume type
 * purpose (in) : volume purpose
 *
 * NOTE: the only rejected combination is a non-permanent type with a purpose other than temporary data.
 */
STATIC_INLINE bool
disk_compatible_type_and_purpose (DB_VOLTYPE type, DB_VOLPURPOSE purpose)
{
  if (type == DB_PERMANENT_VOLTYPE)
    {
      /* permanent type accepts any purpose */
      return true;
    }

  /* temporary type with permanent purpose is not compatible */
  return purpose == DB_TEMPORARY_DATA_PURPOSE;
}

/*
 * disk_check_volume () - compare cache and volume and check for inconsistencies
 *
 * return        : DISK_VALID if no inconsistency or if all inconsistencies have been fixed
 *                 DISK_ERROR if expected errors occurred
 *                 DISK_INVALID if cache and/or volume header are inconsistent
 * thread_p (in) : thread entry
 * volid (in)    : volume identifier
 * repair (in)   : true to try fix the inconsistencies
 *
 * NOTE: only the cached purpose and the cached number of free sectors are repaired. The volume header sanity check
 *       and the auto-extend volume check report DISK_INVALID regardless of the repair argument.
 *       CSECT_DISK_CHECK is held for the whole duration of the function.
 */
static DISK_ISVALID
disk_check_volume (THREAD_ENTRY * thread_p, INT16 volid, bool repair)
{
  DISK_ISVALID valid = DISK_VALID;
  DISK_VOLUME_HEADER *volheader;
  PAGE_PTR page_volheader = NULL;
  DKNSECTS nfree = 0;
  int error_code = NO_ERROR;

  /* get check critical section. it will prevent reservations/extensions to interfere with the process */
  error_code = csect_enter (thread_p, CSECT_DISK_CHECK, INF_WAIT);
  if (error_code != NO_ERROR)
    {
      ASSERT_ERROR ();
      return DISK_ERROR;
    }

  /* from here on every exit goes through the exit label, which unfixes the header and leaves the critical section */
  if (disk_get_volheader (thread_p, volid, PGBUF_LATCH_READ, &page_volheader, &volheader) != NO_ERROR)
    {
      ASSERT_ERROR ();
      valid = DISK_ERROR;
      goto exit;
    }

  /* check that purpose matches */
  if (volheader->purpose != disk_Cache->vols[volid].purpose)
    {
      assert (false);
      if (repair)
    {
      /* the volume header is the reference; the cache is overwritten */
      disk_Cache->vols[volid].purpose = volheader->purpose;
    }
      else
    {
      er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_GENERIC_ERROR, 0);
      valid = DISK_INVALID;
      goto exit;
    }
    }

  /* count free sectors */
  error_code = disk_stab_iterate_units_all (thread_p, volheader, PGBUF_LATCH_READ, disk_stab_count_free, &nfree);
  if (error_code != NO_ERROR)
    {
      ASSERT_ERROR ();
      valid = DISK_ERROR;
      goto exit;
    }

  /* check (and maybe fix) cache inconsistencies */
  /* the reserve lock of the volume purpose is held while the cached free count is compared and updated */
  disk_cache_lock_reserve_for_purpose (volheader->purpose);
  if (nfree != disk_Cache->vols[volid].nsect_free)
    {
      /* inconsistent! */
      assert (false);

      if (repair)
    {
      /* apply the difference between the counted value and the cached one */
      DKNSECTS diff = nfree - disk_Cache->vols[volid].nsect_free;
      disk_cache_update_vol_free (volid, diff);
    }
      else
    {
      er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_DISK_INCONSISTENT_NFREE_SECTS, 3,
          fileio_get_volume_label (volid, PEEK), disk_Cache->vols[volid].nsect_free, nfree);
      /* no goto here: the reserve lock must be released first, and the remaining checks still run */
      valid = DISK_INVALID;
    }
    }
  disk_cache_unlock_reserve_for_purpose (volheader->purpose);

  /* the following check also added to the disk_verify_volume_header() macro */
  /* sanity of the header fields: sector size, position and size of the sector table, total/max sector counts and
   * cached free count not exceeding the total */
  if (volheader->sect_npgs != DISK_SECTOR_NPAGES
      || volheader->stab_first_page != DISK_VOLHEADER_PAGE + 1
      || volheader->sys_lastpage != (volheader->stab_first_page + volheader->stab_npages - 1)
      || volheader->nsect_total > volheader->nsect_max
      || volheader->stab_npages < CEIL_PTVDIV (volheader->nsect_max, DISK_STAB_PAGE_BIT_COUNT)
      || volheader->nsect_total < disk_Cache->vols[volid].nsect_free)
    {
      assert (false);
      er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_DISK_INCONSISTENT_VOL_HEADER, 1,
          fileio_get_volume_label (volid, PEEK));
      valid = DISK_INVALID;
    }

  /* permanent volume is un-maxed if and only if it is the auto-extend volume */
  if (volheader->purpose == DB_PERMANENT_DATA_PURPOSE && volheader->nsect_max > volheader->nsect_total
      && disk_Cache->perm_purpose_info.extend_info.volid_extend != volid)
    {
      /* inconsistent! */
      assert_release (false);

      /* how to repair?? */
      valid = DISK_INVALID;
    }

exit:
  /* page_volheader is NULL only when the volume header could not be obtained */
  if (page_volheader != NULL)
    {
      pgbuf_unfix_and_init (thread_p, page_volheader);
    }

  disk_log ("disk_check_volume", "check volume %d is %s", volid, valid == DISK_VALID ? "valid" : "not valid");

  csect_exit (thread_p, CSECT_DISK_CHECK);

  return valid;
}

/*
 * disk_check () - check disk cache is not out of sync
 *
 * return        : DISK_VALID if all was ok or fixed
 *                 DISK_ERROR if expected error occurred
 *                 DISK_INVALID if disk cache is in an invalid state
 * thread_p (in) : thread entry
 * repair (in)   : true to repair invalid states (if possible)
 *
 * NOTE: the check runs in three steps:
 *         1. volume counts reported by the xboot_find_* functions against each other and against disk_Cache;
 *         2. each volume against its cached info (disk_check_volume);
 *         3. free sectors aggregated per purpose against the sum of the cached values of all volumes.
 *       CSECT_DISK_CHECK is held during steps 1 and 3 and released in between; disk_check_volume takes it for each
 *       volume on its own.
 */
DISK_ISVALID
disk_check (THREAD_ENTRY * thread_p, bool repair)
{
  DISK_ISVALID valid = DISK_VALID;
  int nvols_perm;
  int nvols_temp;
  VOLID volid_perm_last;
  VOLID volid_temp_last;
  VOLID volid_iter;
  DKNSECTS perm_free;
  DKNSECTS temp_free;
  int error_code = NO_ERROR;

  error_code = csect_enter (thread_p, CSECT_DISK_CHECK, INF_WAIT);
  if (error_code != NO_ERROR)
    {
      ASSERT_ERROR ();
      return DISK_ERROR;
    }

  /* first step: check cache matches boot_db_parm */
  nvols_perm = xboot_find_number_permanent_volumes (thread_p);
  nvols_temp = xboot_find_number_temp_volumes (thread_p);
  volid_perm_last = xboot_find_last_permanent (thread_p);
  volid_temp_last = xboot_find_last_temp (thread_p);

  /* permanent volume identifiers start at 0, so the last identifier must be the count minus one */
  if (nvols_perm != volid_perm_last + 1)
    {
      /* cannot repair */
      assert_release (false);
      csect_exit (thread_p, CSECT_DISK_CHECK);
      return DISK_INVALID;
    }

  /* temporary volume identifiers count down from LOG_MAX_DBVOLID; only checked when temporary volumes exist */
  if (nvols_temp > 0 && (nvols_temp != (LOG_MAX_DBVOLID - volid_temp_last + 1)))
    {
      /* cannot repair */
      assert_release (false);
      csect_exit (thread_p, CSECT_DISK_CHECK);
      return DISK_INVALID;
    }

  if (nvols_perm != disk_Cache->nvols_perm)
    {
      assert (false);
      if (repair)
    {
      /* the value reported by boot overwrites the cached one */
      disk_Cache->nvols_perm = nvols_perm;
    }
      else
    {
      er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_GENERIC_ERROR, 0);
      csect_exit (thread_p, CSECT_DISK_CHECK);
      return DISK_INVALID;
    }
    }

  if (nvols_temp > 0 && nvols_temp != disk_Cache->nvols_temp)
    {
      assert (false);
      if (repair)
    {
      disk_Cache->nvols_temp = nvols_temp;
    }
      else
    {
      er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_GENERIC_ERROR, 0);
      csect_exit (thread_p, CSECT_DISK_CHECK);
      return DISK_INVALID;
    }
    }

  /* valid has not been modified so far: every failure above returns early, so this always logs "valid" */
  disk_log ("disk_check", "first check step is %s", valid == DISK_VALID ? "valid" : "not valid");

  /* release critical section. we will get it for each volume we check, to avoid blocking all reservations and
   * extensions for a long time. */
  csect_exit (thread_p, CSECT_DISK_CHECK);

  /* second step: check volume cached info is consistent */
  /* permanent type volumes first... */
  for (volid_iter = 0; volid_iter < disk_Cache->nvols_perm; volid_iter++)
    {
      valid = disk_check_volume (thread_p, volid_iter, repair);
      if (valid != DISK_VALID)
    {
      /* disk_check_volume may also return DISK_INVALID, but each of its DISK_INVALID paths hits an assert before
       * returning, so debug builds are only expected to get here with DISK_ERROR */
      assert (valid == DISK_ERROR);
      ASSERT_ERROR ();
      return valid;
    }
    }

  /* ...then temporary type volumes, from LOG_MAX_DBVOLID downwards */
  for (volid_iter = LOG_MAX_DBVOLID; volid_iter > LOG_MAX_DBVOLID - disk_Cache->nvols_temp; volid_iter--)
    {
      valid = disk_check_volume (thread_p, volid_iter, repair);
      if (valid != DISK_VALID)
    {
      assert (valid == DISK_ERROR);
      ASSERT_ERROR ();
      return valid;
    }
    }

  /* third step: check information aggregated for each purpose matches the sum of all volumes */
  error_code = csect_enter (thread_p, CSECT_DISK_CHECK, INF_WAIT);
  if (error_code != NO_ERROR)
    {
      ASSERT_ERROR ();
      return DISK_ERROR;
    }

  /* check permanently stored volumes */
  /* permanent type volumes are split by purpose: permanent data goes to perm_free, everything else to temp_free */
  for (perm_free = 0, temp_free = 0, volid_iter = 0; volid_iter < disk_Cache->nvols_perm; volid_iter++)
    {
      if (disk_Cache->vols[volid_iter].purpose == DB_PERMANENT_DATA_PURPOSE)
    {
      perm_free += disk_Cache->vols[volid_iter].nsect_free;
    }
      else
    {
      temp_free += disk_Cache->vols[volid_iter].nsect_free;
    }
    }

  if (perm_free != disk_Cache->perm_purpose_info.extend_info.nsect_free)
    {
      assert (false);
      if (repair)
    {
      disk_Cache->perm_purpose_info.extend_info.nsect_free = perm_free;
    }
      else
    {
      er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_GENERIC_ERROR, 0);
      csect_exit (thread_p, CSECT_DISK_CHECK);
      return DISK_INVALID;
    }
    }

  /* free sectors of permanent type volumes used for temporary purpose */
  if (temp_free != disk_Cache->temp_purpose_info.nsect_perm_free)
    {
      assert (false);
      if (repair)
    {
      disk_Cache->temp_purpose_info.nsect_perm_free = temp_free;
    }
      else
    {
      er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_GENERIC_ERROR, 0);
      csect_exit (thread_p, CSECT_DISK_CHECK);
      return DISK_INVALID;
    }
    }

  /* check temporarily stored volumes */
  /* temp_free is reset and reused for the temporary type volumes */
  for (temp_free = 0, volid_iter = LOG_MAX_DBVOLID; volid_iter > LOG_MAX_DBVOLID - disk_Cache->nvols_temp; volid_iter--)
    {
      temp_free += disk_Cache->vols[volid_iter].nsect_free;
    }

  if (temp_free != disk_Cache->temp_purpose_info.extend_info.nsect_free)
    {
      assert (false);
      if (repair)
    {
      disk_Cache->temp_purpose_info.extend_info.nsect_free = temp_free;
    }
      else
    {
      er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_GENERIC_ERROR, 0);
      csect_exit (thread_p, CSECT_DISK_CHECK);
      return DISK_INVALID;
    }
    }

  disk_log ("disk_check", "full check is %s", "valid");

  /* all valid or all repaired */
  csect_exit (thread_p, CSECT_DISK_CHECK);

  return DISK_VALID;
}

#if defined (SA_MODE)
/*
 * disk_map_clone_create () - create a clone of all permanent volumes sector table maps
 *
 * return               : error code
 * thread_p (in)        : thread entry
 * disk_map_clone (out) : disk sector table maps clone. array with one entry for each permanent type volume
 *                        (disk_Cache->nvols_perm entries). entries of volumes with temporary purpose are left
 *                        zeroed (no map). on error the clone is released with disk_map_clone_free.
 *
 * NOTE: in each cloned map, the bits of the sectors used by system (up to the sector of sys_lastpage) are cleared.
 */
int
disk_map_clone_create (THREAD_ENTRY * thread_p, DISK_VOLMAP_CLONE ** disk_map_clone)
{
  VOLID iter;

  PAGE_PTR page_volheader = NULL;
  DISK_VOLUME_HEADER *volheader = NULL;

  VPID vpid_stab;
  PAGE_PTR page_stab = NULL;
  DKNSECTS nsects;
  int memsize;
  char *ptr_map = NULL;

  int error_code = NO_ERROR;

  /* calloc, so that entries skipped below (and entries not reached in case of error) have a NULL map */
  *disk_map_clone = (DISK_VOLMAP_CLONE *) calloc (disk_Cache->nvols_perm, sizeof (DISK_VOLMAP_CLONE));
  if (*disk_map_clone == NULL)
    {
      er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_OUT_OF_VIRTUAL_MEMORY, 1,
              disk_Cache->nvols_perm * sizeof (DISK_VOLMAP_CLONE));
      return ER_OUT_OF_VIRTUAL_MEMORY;
    }

  for (iter = 0; iter < disk_Cache->nvols_perm; iter++)
    {
      if (disk_Cache->vols[iter].purpose == DB_TEMPORARY_DATA_PURPOSE)
        {
          /* we don't consider volumes with temporary purpose here */
          continue;
        }

      error_code = disk_get_volheader (thread_p, iter, PGBUF_LATCH_READ, &page_volheader, &volheader);
      if (error_code != NO_ERROR)
        {
          ASSERT_ERROR ();
          goto exit;
        }

      /* one bit for each sector of the volume */
      (*disk_map_clone)[iter].size_map = volheader->nsect_total / CHAR_BIT;
      (*disk_map_clone)[iter].map = (char *) malloc ((*disk_map_clone)[iter].size_map);
      if ((*disk_map_clone)[iter].map == NULL)
        {
          /* the size is passed as size_t, like the other ER_OUT_OF_VIRTUAL_MEMORY reports of this file */
          er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_OUT_OF_VIRTUAL_MEMORY, 1,
                  (size_t) (*disk_map_clone)[iter].size_map);
          error_code = ER_OUT_OF_VIRTUAL_MEMORY;
          goto exit;
        }

      /* copy map, one sector table page at a time. stop as soon as no sectors are left to copy: iterating once more
       * with nsects == 0 would copy nothing, but would fix a page beyond the ones that hold the map. */
      vpid_stab.volid = iter;
      vpid_stab.pageid = volheader->stab_first_page;
      ptr_map = (*disk_map_clone)[iter].map;
      for (nsects = volheader->nsect_total; nsects > 0; nsects -= DISK_STAB_PAGE_BIT_COUNT)
        {
          memsize = MIN (DB_PAGESIZE, nsects / CHAR_BIT);

          page_stab = pgbuf_fix (thread_p, &vpid_stab, OLD_PAGE, PGBUF_LATCH_READ, PGBUF_UNCONDITIONAL_LATCH);
          if (page_stab == NULL)
            {
              ASSERT_ERROR_AND_SET (error_code);
              goto exit;
            }

          memcpy (ptr_map, page_stab, memsize);

          pgbuf_unfix_and_init (thread_p, page_stab);

          ptr_map += memsize;
          vpid_stab.pageid++;
        }
      assert (ptr_map == (*disk_map_clone)[iter].map + (*disk_map_clone)[iter].size_map);
      assert (vpid_stab.pageid <= volheader->stab_first_page + volheader->stab_npages);

      /* clear sectors used by system: first the units that are entirely covered... */
      ptr_map = (*disk_map_clone)[iter].map;
      for (nsects = SECTOR_FROM_PAGEID (volheader->sys_lastpage) + 1; nsects >= DISK_STAB_UNIT_BIT_COUNT;
           nsects -= DISK_STAB_UNIT_BIT_COUNT)
        {
          *(DISK_STAB_UNIT *) ptr_map = 0;
          ptr_map += DISK_STAB_UNIT_SIZE_OF;
        }

      /* ...then the remaining nsects bits of the next unit (clear trailing bits) */
      *(DISK_STAB_UNIT *) ptr_map &= ~bit64_set_trailing_bits (0, nsects);

      pgbuf_unfix_and_init (thread_p, page_volheader);
    }

  /* disk map clone successfully created */
  assert (error_code == NO_ERROR);

exit:
  if (error_code != NO_ERROR)
    {
      disk_map_clone_free (disk_map_clone);
    }
  /* pages are reset to NULL whenever they are unfixed above, so anything not NULL here is still fixed */
  if (page_stab != NULL)
    {
      pgbuf_unfix (thread_p, page_stab);
    }
  if (page_volheader != NULL)
    {
      pgbuf_unfix (thread_p, page_volheader);
    }
  return error_code;
}

/*
 * disk_map_clone_free () - free sector table maps clone
 *
 * return                  : void
 * disk_map_clone (in/out) : sector table maps clone. it is set to NULL.
 *
 * NOTE: calling it with a NULL argument, or with a clone that is already NULL (e.g. freed before), does nothing.
 *       the clone is assumed to have disk_Cache->nvols_perm entries, as allocated by disk_map_clone_create.
 */
void
disk_map_clone_free (DISK_VOLMAP_CLONE ** disk_map_clone)
{
  VOLID iter;

  if (disk_map_clone == NULL || *disk_map_clone == NULL)
    {
      /* nothing to free */
      return;
    }

  for (iter = 0; iter < disk_Cache->nvols_perm; iter++)
    {
      /* map is NULL for entries that were never filled; free accepts NULL */
      free ((*disk_map_clone)[iter].map);
    }
  free_and_init (*disk_map_clone);
}

/*
 * disk_map_clone_clear () - clear the bit for given VSID in sector table map clone
 *
 * return                  : DISK_VALID if all checks pass, DISK_INVALID otherwise
 * vsid (in)               : VSID
 * disk_map_clone (in/out) : sector table maps clone
 *
 * NOTE: DISK_INVALID is returned when the VSID does not fall inside the cloned maps (this includes volumes without
 *       a map, whose size_map is 0) or when its bit is not set in the clone.
 */
DISK_ISVALID
disk_map_clone_clear (VSID * vsid, DISK_VOLMAP_CLONE * disk_map_clone)
{
  int offset_unit;
  int offset_bit;
  DISK_STAB_UNIT *unit;

  /* the clone has one entry for each permanent type volume */
  if (vsid->volid < 0 || vsid->volid >= disk_Cache->nvols_perm)
    {
      /* overflow */
      assert_release (false);
      return DISK_INVALID;
    }

  /* the map holds size_map * CHAR_BIT bits, so valid sector ids are [0, size_map * CHAR_BIT). an id equal to the
   * bit count is already outside of the map. */
  if (vsid->sectid < 0 || vsid->sectid >= disk_map_clone[vsid->volid].size_map * CHAR_BIT)
    {
      /* overflow */
      assert_release (false);
      return DISK_INVALID;
    }

  /* locate the bit only after the VSID was validated */
  offset_unit = vsid->sectid / DISK_STAB_UNIT_BIT_COUNT;
  offset_bit = vsid->sectid % DISK_STAB_UNIT_BIT_COUNT;
  unit = ((DISK_STAB_UNIT *) disk_map_clone[vsid->volid].map) + offset_unit;

  if (!bit64_is_set (*unit, offset_bit))
    {
      /* not reserved */
      assert_release (false);
      return DISK_INVALID;
    }

  *unit = bit64_clear (*unit, offset_bit);

  return DISK_VALID;
}

/*
 * disk_map_clone_check_leaks () - check disk map clone for sector leaks
 *
 * return              : DISK_VALID if no leak, DISK_INVALID otherwise
 * disk_map_clone (in) : disk map clone
 *
 * NOTE: any bit that is still set in one of the cloned maps is reported as a leak.
 */
DISK_ISVALID
disk_map_clone_check_leaks (DISK_VOLMAP_CLONE * disk_map_clone)
{
  VOLID volid;

  for (volid = 0; volid < disk_Cache->nvols_perm; volid++)
    {
      DISK_STAB_UNIT *unit = (DISK_STAB_UNIT *) disk_map_clone[volid].map;
      const char *map_end = disk_map_clone[volid].map + disk_map_clone[volid].size_map;

      /* walk the map one unit at a time; a map of size 0 is skipped right away */
      while ((char *) unit < map_end)
        {
          if (*unit != 0)
            {
              /* leaked sectors */
              assert_release (false);
              return DISK_INVALID;
            }
          unit++;
        }
    }

  return DISK_VALID;
}
#endif /* SA_MODE */

#if !defined (NDEBUG)
/*
 * disk_volheader_check_magic () - debug check of a volume header page: page type and database volume magic
 *
 * return              : void
 * thread_p (in)       : thread entry
 * page_volheader (in) : volume header page
 */
void
disk_volheader_check_magic (THREAD_ENTRY * thread_p, const PAGE_PTR page_volheader)
{
  const DISK_VOLUME_HEADER *header = (const DISK_VOLUME_HEADER *) page_volheader;

  /* the result of the page type check is ignored here */
  (void) pgbuf_check_page_ptype (thread_p, page_volheader, PAGE_VOLHEADER);

  assert (strncmp (header->magic, CUBRID_MAGIC_DATABASE_VOLUME, CUBRID_MAGIC_MAX_LENGTH) == 0);
}
#endif /* !NDEBUG */

/*
 * disk_sectors_to_extend_npages () - compute the rounded number of sectors necessary to extend a number of pages
 *
 * return         : The number of sectors
 * num_pages (in) : required number of pages
 *
 * NOTE: pages are first converted to sectors (DISK_PAGES_TO_SECTS), then the result is rounded up
 *       (DISK_SECTS_ROUND_UP).
 */
int
disk_sectors_to_extend_npages (const int num_pages)
{
  const int nsects_required = DISK_PAGES_TO_SECTS (num_pages);

  return DISK_SECTS_ROUND_UP (nsects_required);
}

/************************************************************************/
/* End of file                                                          */
/************************************************************************/