File compactdb_sr.c
File List > cubrid > src > storage > compactdb_sr.c
Go to the documentation of this file
/*
* Copyright 2008 Search Solution Corporation
* Copyright 2016 CUBRID Corporation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
#ident "$Id$"
#include "config.h"
#include <stdio.h>
#include <stdlib.h>
#include <sys/types.h>
#include <sys/stat.h>
#if defined(SOLARIS)
#include <netdb.h>
#endif /* SOLARIS */
#include <assert.h>
#include "btree.h" // for SINGLE_ROW_UPDATE
#include "thread_compat.hpp"
#include "heap_file.h"
#include "dbtype.h"
#include "boot_sr.h"
#include "locator_sr.h"
#include "set_object.h"
#include "xserver_interface.h"
#include "server_interface.h"
// XXX: SHOULD BE THE LAST INCLUDE HEADER
#include "memory_wrapper.hpp"
/*
 * Compaction ownership state. Both variables are read and written by
 * boot_compact_start (), boot_compact_stop () and boot_can_compact () while
 * holding CSECT_COMPACTDB_ONE_INSTANCE.
 */
/* true between a successful boot_compact_start () and boot_compact_stop () */
static bool compact_started = false;
/* transaction index recorded by boot_compact_start (); -1 when none is recorded */
static int last_tran_index = -1;
/* forward declarations of the file-local helpers defined below */
static bool is_class (OID * obj_oid, OID * class_oid);
static int process_set (THREAD_ENTRY * thread_p, DB_SET * set);
static int process_value (THREAD_ENTRY * thread_p, DB_VALUE * value);
static int process_object (THREAD_ENTRY * thread_p, HEAP_SCANCACHE * upd_scancache, HEAP_CACHE_ATTRINFO * attr_info,
OID * oid);
static int desc_disk_to_attr_info (THREAD_ENTRY * thread_p, OID * oid, RECDES * recdes,
HEAP_CACHE_ATTRINFO * attr_info);
static int process_class (THREAD_ENTRY * thread_p, OID * class_oid, HFID * hfid, int max_space_to_process,
int *instance_lock_timeout, int *space_to_process, OID * last_processed_oid,
int *total_objects, int *failed_objects, int *modified_objects, int *big_objects);
/*
 * is_class () - tell whether an object is a class object
 *
 * return : true when class_oid equals the root class OID, false otherwise
 * obj_oid(in) - the oid of the object (not examined by this function)
 * class_oid(in) - the class oid of obj_oid
 *
 */
static bool
is_class (OID * obj_oid, OID * class_oid)
{
  /* the decision depends only on the object's class being the root class */
  return OID_EQ (class_oid, oid_Root_class_oid) ? true : false;
}
/*
 * process_value () - inspect one value and reset object references for which
 *                    no visible version is found
 *
 * return : 0 when the value is left untouched,
 *          > 0 when the value (or elements of a collection value) was changed,
 *          the error code set by ASSERT_ERROR_AND_SET () or returned by
 *          process_set () on failure
 * thread_p(in) - thread entry
 * value(in,out) - the processed value
 *
 */
static int
process_value (THREAD_ENTRY * thread_p, DB_VALUE * value)
{
  int status = 0;
  SCAN_CODE fetch_result;

  switch (DB_VALUE_TYPE (value))
    {
    case DB_TYPE_POINTER:
    case DB_TYPE_SET:
    case DB_TYPE_MULTISET:
    case DB_TYPE_SEQUENCE:
      /* collection-like values are handled element by element */
      status = process_set (thread_p, db_get_set (value));
      break;

    case DB_TYPE_OID:
      {
        HEAP_SCANCACHE ref_scan;
        OID target_class_oid;
        OID *target_oid = db_get_oid (value);

        if (OID_ISNULL (target_oid))
          {
            /* nothing is referenced */
            break;
          }

        /* look the referenced object up under the current MVCC snapshot */
        heap_scancache_quick_start (&ref_scan);
        ref_scan.mvcc_snapshot = logtb_get_mvcc_snapshot (thread_p);
        fetch_result =
          heap_get_visible_version (thread_p, target_oid, &target_class_oid, NULL, &ref_scan, PEEK, NULL_CHN);
        heap_scancache_end (thread_p, &ref_scan);

        if (fetch_result == S_ERROR)
          {
            ASSERT_ERROR_AND_SET (status);
          }
        else if (fetch_result != S_SUCCESS)
          {
            /* no visible version: reset the reference and report the change */
            OID_SET_NULL (target_oid);
            status = 1;
          }
        else if (!is_class (target_oid, &target_class_oid))
          {
#if defined(CUBRID_DEBUG)
            printf (msgcat_message (MSGCAT_CATALOG_UTILS, MSGCAT_UTIL_SET_COMPACTDB, COMPACTDB_MSG_REFOID),
                    target_oid->volid, target_oid->pageid, target_oid->slotid, target_class_oid.volid,
                    target_class_oid.pageid, target_class_oid.slotid);
#endif
          }
        break;
      }

    default:
      break;
    }

  return status;
}
/*
 * process_set - process every element of one set
 * return: sum of the positive results of process_value () over the elements
 *         (0 when nothing changed), or the first non-positive, non-NO_ERROR
 *         result returned for an element
 * thread_p(in): thread entry
 * set(in): the set to process
 */
static int
process_set (THREAD_ENTRY * thread_p, DB_SET * set)
{
  SET_ITERATOR *iter = set_iterate (set);
  DB_VALUE *elem;
  int outcome = 0;
  int rc;

  for (elem = set_iterator_value (iter); elem != NULL; elem = set_iterator_value (iter))
    {
      rc = process_value (thread_p, elem);
      if (rc > 0)
        {
          outcome += rc;
        }
      else if (rc != NO_ERROR)
        {
          /* stop at the first failure and hand its code back */
          outcome = rc;
          break;
        }
      set_iterator_next (iter);
    }

  set_iterator_free (iter);
  return outcome;
}
/*
 * process_object - process an object
 * return: 1 - object has changed
 *         0 - object has not changed (also when the object does not exist,
 *             does not satisfy the snapshot, or the update is rejected with
 *             ER_MVCC_NOT_SATISFIED_REEVALUATION)
 *         -1 - object failed
 * thread_p(in): thread entry
 * upd_scancache(in): scan cache used to lock, read and update the object
 * attr_info(in): attribute values of the object; values changed by
 *                process_value () are marked HEAP_WRITTEN_ATTRVALUE
 * oid(in): the oid of the object to process
 */
static int
process_object (THREAD_ENTRY * thread_p, HEAP_SCANCACHE * upd_scancache, HEAP_CACHE_ATTRINFO * attr_info, OID * oid)
{
  int i, result = 0;
  HEAP_ATTRVALUE *value = NULL;
  int force_count = 0, updated_n_attrs_id = 0;
  int *atts_id = NULL;
  int error_code;
  SCAN_CODE scan_code;
  RECDES copy_recdes;

  if (upd_scancache == NULL || attr_info == NULL || oid == NULL)
    {
      return -1;
    }

  copy_recdes.data = NULL;

  /* get object with X_LOCK */
  scan_code = locator_lock_and_get_object (thread_p, oid, &upd_scancache->node.class_oid, &copy_recdes, upd_scancache,
                                           X_LOCK, COPY, NULL_CHN, LOG_WARNING_IF_DELETED);
  if (scan_code != S_SUCCESS)
    {
      if (er_errid () == ER_HEAP_UNKNOWN_OBJECT)
        {
          assert (scan_code == S_DOESNT_EXIST || scan_code == S_SNAPSHOT_NOT_SATISFIED);
          er_clear ();
        }

      if (scan_code == S_DOESNT_EXIST || scan_code == S_SNAPSHOT_NOT_SATISFIED)
        {
          /* the object is gone or not visible: nothing to do, not a failure */
          return 0;
        }

      return -1;
    }

  /* at most one entry per attribute value can be recorded */
  atts_id = (int *) db_private_alloc (thread_p, attr_info->num_values * sizeof (int));
  if (atts_id == NULL)
    {
      return -1;
    }

  for (i = 0, value = attr_info->values; i < attr_info->num_values; i++, value++)
    {
      error_code = process_value (thread_p, &value->dbvalue);
      if (error_code > 0)
        {
          /* remember which attributes have to be written back */
          value->state = HEAP_WRITTEN_ATTRVALUE;
          atts_id[updated_n_attrs_id] = value->attrid;
          updated_n_attrs_id++;
        }
      else if (error_code != NO_ERROR)
        {
          /*
           * Report the documented failure value instead of the raw error
           * code, so that callers comparing against -1 count this object
           * as failed.
           */
          result = -1;
          goto cleanup;
        }
    }

  /* rewrite the object when a value changed or it was read with a representation other than the last one */
  if ((updated_n_attrs_id > 0)
      || (attr_info->read_classrepr != NULL && attr_info->last_classrepr != NULL
          && attr_info->read_classrepr->id != attr_info->last_classrepr->id))
    {
      /* oid already locked at locator_lock_and_get_object */
      error_code =
        locator_attribute_info_force (thread_p, &upd_scancache->node.hfid, oid, attr_info, atts_id, updated_n_attrs_id,
                                      LC_FLUSH_UPDATE, SINGLE_ROW_UPDATE, upd_scancache, &force_count, false,
                                      REPL_INFO_TYPE_RBR_NORMAL, DB_NOT_PARTITIONED_CLASS, NULL, NULL, NULL,
                                      UPDATE_INPLACE_NONE, &copy_recdes, false);
      if (error_code == NO_ERROR)
        {
          result = 1;
        }
      else if (error_code == ER_MVCC_NOT_SATISFIED_REEVALUATION)
        {
          result = 0;
        }
      else
        {
          result = -1;
        }
    }

cleanup:
  if (atts_id)
    {
      db_private_free (thread_p, atts_id);
      atts_id = NULL;
    }

  return result;
}
/*
 * desc_disk_to_attr_info - load the attribute values of a record into a
 *                          HEAP_CACHE_ATTRINFO structure
 * return: NO_ERROR on success, ER_FAILED when an argument is NULL or when
 *         clearing or reading the values fails
 * thread_p(in): thread entry
 * oid(in): the oid of the object
 * recdes(in): RECDES structure to convert
 * attr_info(out): the HEAP_CACHE_ATTRINFO structure
 */
static int
desc_disk_to_attr_info (THREAD_ENTRY * thread_p, OID * oid, RECDES * recdes, HEAP_CACHE_ATTRINFO * attr_info)
{
  /* evaluated left to right: argument check, drop old values, read new ones */
  if (oid == NULL || recdes == NULL || attr_info == NULL
      || heap_attrinfo_clear_dbvalues (attr_info) != NO_ERROR
      || heap_attrinfo_read_dbvalues (thread_p, oid, recdes, attr_info) != NO_ERROR)
    {
      return ER_FAILED;
    }

  return NO_ERROR;
}
/*
 * process_class - process the instances of a class
 * return: NO_ERROR, or ER_FAILED on invalid arguments or when fetching fails
 * thread_p(in): thread entry
 * class_oid(in): the class OID
 * hfid(in): the class HFID
 * max_space_to_process(in): maximum space to process
 * instance_lock_timeout(in): the lock timeout for instances, passed through
 *                            to xlocator_lock_and_fetch_all ()
 * space_to_process(in, out): space still available; decreased by the length
 *                            of every processed object and set to 0 when an
 *                            object does not fit in what is left
 * last_processed_oid(in, out): oid to resume from / last processed oid
 * total_objects(in, out): count the processed class objects
 * failed_objects(in, out): count the failed class objects
 * modified_objects(in, out): count the modified class objects
 * big_objects(in, out): count the class objects longer than
 *                       max_space_to_process
 */
static int
process_class (THREAD_ENTRY * thread_p, OID * class_oid, HFID * hfid, int max_space_to_process,
               int *instance_lock_timeout, int *space_to_process, OID * last_processed_oid, int *total_objects,
               int *failed_objects, int *modified_objects, int *big_objects)
{
  int nobjects, nfetched, i, j;
  OID last_oid, prev_oid;
  LOCK null_lock = NULL_LOCK;
  LOCK oid_lock = X_LOCK;
  LC_COPYAREA *fetch_area = NULL;       /* Area where objects are received */
  struct lc_copyarea_manyobjs *mobjs;   /* Describe multiple objects in area */
  struct lc_copyarea_oneobj *obj;       /* Describe one object in area */
  RECDES recdes;
  HEAP_CACHE_ATTRINFO attr_info;
  HEAP_SCANCACHE upd_scancache;
  int ret = NO_ERROR, object_processed;
  MVCC_SNAPSHOT *mvcc_snapshot = NULL;
  int nfailed_instances = 0;

  if (class_oid == NULL || hfid == NULL || space_to_process == NULL || *space_to_process <= 0
      || *space_to_process > max_space_to_process || last_processed_oid == NULL || total_objects == NULL
      || failed_objects == NULL || modified_objects == NULL || big_objects == NULL || *total_objects < 0
      || *failed_objects < 0)
    {
      return ER_FAILED;
    }

  /* a snapshot is requested for every class except the root class */
  if (!OID_IS_ROOTOID (class_oid))
    {
      mvcc_snapshot = logtb_get_mvcc_snapshot (thread_p);
      if (mvcc_snapshot == NULL)
        {
          return ER_FAILED;
        }
    }

  nobjects = 0;
  nfetched = -1;

  ret = heap_scancache_start_modify (thread_p, &upd_scancache, hfid, class_oid, SINGLE_ROW_UPDATE, NULL);
  if (ret != NO_ERROR)
    {
      return ER_FAILED;
    }

  ret = heap_attrinfo_start (thread_p, class_oid, -1, NULL, &attr_info);
  if (ret != NO_ERROR)
    {
      heap_scancache_end_modify (thread_p, &upd_scancache);
      return ER_FAILED;
    }

  COPY_OID (&last_oid, last_processed_oid);
  COPY_OID (&prev_oid, last_processed_oid);

  while (nobjects != nfetched)
    {
      ret =
        xlocator_lock_and_fetch_all (thread_p, hfid, &oid_lock, instance_lock_timeout, class_oid, &null_lock,
                                     &nobjects, &nfetched, &nfailed_instances, &last_oid, &fetch_area, mvcc_snapshot);
      if (ret != NO_ERROR)
        {
          ret = ER_FAILED;
          break;
        }

      /* instances reported as failed by the fetch count as processed and failed */
      (*total_objects) += nfailed_instances;
      (*failed_objects) += nfailed_instances;

      if (fetch_area == NULL)
        {
          /* No more objects */
          break;
        }

      mobjs = LC_MANYOBJS_PTR_IN_COPYAREA (fetch_area);
      obj = LC_START_ONEOBJ_PTR_IN_COPYAREA (mobjs);
      for (i = 0; i < mobjs->num_objs; i++)
        {
          if (obj->length > *space_to_process)
            {
              if (*space_to_process == max_space_to_process)
                {
                  /* does not fit even in the full budget: skip it as a big object */
                  (*total_objects)++;
                  (*big_objects)++;
                  lock_unlock_object (thread_p, &obj->oid, class_oid, oid_lock, true);
                }
              else
                {
                  /*
                   * Budget exhausted: resume later from the previous object
                   * and release this object and all remaining fetched ones.
                   */
                  *space_to_process = 0;
                  COPY_OID (last_processed_oid, &prev_oid);
                  for (j = i; j < mobjs->num_objs; j++)
                    {
                      lock_unlock_object (thread_p, &obj->oid, class_oid, oid_lock, true);
                      obj = LC_NEXT_ONEOBJ_PTR_IN_COPYAREA (obj);
                    }
                  locator_free_copy_area (fetch_area);
                  fetch_area = NULL;
                  goto end;
                }
            }
          else
            {
              *space_to_process -= obj->length;
              (*total_objects)++;
              LC_RECDES_TO_GET_ONEOBJ (fetch_area, obj, &recdes);
              if (desc_disk_to_attr_info (thread_p, &obj->oid, &recdes, &attr_info) == NO_ERROR)
                {
                  object_processed = process_object (thread_p, &upd_scancache, &attr_info, &obj->oid);
                  if (object_processed != 1)
                    {
                      /* not modified: the instance lock is no longer needed */
                      lock_unlock_object (thread_p, &obj->oid, class_oid, oid_lock, true);
                      if (object_processed == -1)
                        {
                          (*failed_objects)++;
                        }
                    }
                  else
                    {
                      (*modified_objects)++;
                    }
                }
              else
                {
                  /*
                   * The object was not touched, so release its lock exactly
                   * like the other non-modified paths do.
                   */
                  lock_unlock_object (thread_p, &obj->oid, class_oid, oid_lock, true);
                  (*failed_objects)++;
                }
            }

          COPY_OID (&prev_oid, &obj->oid);
          obj = LC_NEXT_ONEOBJ_PTR_IN_COPYAREA (obj);
        }

      locator_free_copy_area (fetch_area);
      /* do not keep a pointer to the released area across iterations */
      fetch_area = NULL;
    }

  COPY_OID (last_processed_oid, &last_oid);

end:
  heap_attrinfo_end (thread_p, &attr_info);
  heap_scancache_end_modify (thread_p, &upd_scancache);

  return ret;
}
/*
* boot_compact_db - compact specified classes
* HEAP_CACHE_ATTRINFO structure
* return: error status
* class_oids(in): the classes list
* n_classes(in): the class_oids length
* hfids(in): the hfid list
* space_to_process(in): the space to process
* instance_lock_timeout(in): the lock timeout for instances
* class_lock_timeout(in): the lock timeout for instances
* delete_old_repr(in): whether to delete the old class representation
* last_processed_class_oid(in,out): last processed class oid
* last_processed_oid(in,out): last processed oid
* total_objects(out): count processed objects for each class
* failed_objects(out): count failed objects for each class
* modified_objects(out): count modified objects for each class
* big_objects(out): count big objects for each class
* initial_last_repr_id(in, out): the list of initial last class
* representation
*/
int
boot_compact_db (THREAD_ENTRY * thread_p, OID * class_oids, int n_classes, int space_to_process,
int instance_lock_timeout, int class_lock_timeout, bool delete_old_repr,
OID * last_processed_class_oid, OID * last_processed_oid, int *total_objects, int *failed_objects,
int *modified_objects, int *big_objects, int *initial_last_repr_id)
{
int result = NO_ERROR;
int i, j, start_index = -1;
int max_space_to_process;
int lock_ret;
HFID hfid;
if (boot_can_compact (thread_p) == false)
{
return ER_COMPACTDB_ALREADY_STARTED;
}
if (class_oids == NULL || n_classes <= 0 || space_to_process <= 0 || last_processed_class_oid == NULL
|| last_processed_oid == NULL || total_objects == NULL || failed_objects == NULL || modified_objects == NULL
|| big_objects == NULL || initial_last_repr_id == NULL)
{
return ER_QPROC_INVALID_PARAMETER;
}
for (start_index = 0; start_index < n_classes; start_index++)
{
if (OID_EQ (class_oids + start_index, last_processed_class_oid))
{
break;
}
}
if (start_index == n_classes)
{
return ER_QPROC_INVALID_PARAMETER;
}
for (i = 0; i < n_classes; i++)
{
total_objects[i] = 0;
failed_objects[i] = 0;
modified_objects[i] = 0;
big_objects[i] = 0;
}
max_space_to_process = space_to_process;
for (i = start_index; i < n_classes; i++)
{
lock_ret =
lock_object_wait_msecs (thread_p, class_oids + i, oid_Root_class_oid, IX_LOCK, LK_UNCOND_LOCK,
class_lock_timeout);
if (lock_ret != LK_GRANTED)
{
total_objects[i] = COMPACTDB_LOCKED_CLASS;
OID_SET_NULL (last_processed_oid);
continue;
}
if (heap_get_class_info (thread_p, class_oids + i, &hfid, NULL, NULL) != NO_ERROR)
{
lock_unlock_object (thread_p, class_oids + i, oid_Root_class_oid, IX_LOCK, true);
OID_SET_NULL (last_processed_oid);
total_objects[i] = COMPACTDB_INVALID_CLASS;
continue;
}
if (HFID_IS_NULL (&hfid))
{
lock_unlock_object (thread_p, class_oids + i, oid_Root_class_oid, IX_LOCK, true);
OID_SET_NULL (last_processed_oid);
total_objects[i] = COMPACTDB_INVALID_CLASS;
continue;
}
if (OID_ISNULL (last_processed_oid))
{
initial_last_repr_id[i] = heap_get_class_repr_id (thread_p, class_oids + i);
if (initial_last_repr_id[i] <= 0)
{
lock_unlock_object (thread_p, class_oids + i, oid_Root_class_oid, IX_LOCK, true);
total_objects[i] = COMPACTDB_INVALID_CLASS;
continue;
}
}
if (process_class
(thread_p, class_oids + i, &hfid, max_space_to_process, &instance_lock_timeout, &space_to_process,
last_processed_oid, total_objects + i, failed_objects + i, modified_objects + i,
big_objects + i) != NO_ERROR)
{
OID_SET_NULL (last_processed_oid);
for (j = start_index; j <= i; j++)
{
total_objects[j] = COMPACTDB_UNPROCESSED_CLASS;
failed_objects[j] = 0;
modified_objects[j] = 0;
big_objects[j] = 0;
}
result = ER_FAILED;
break;
}
if (delete_old_repr && OID_ISNULL (last_processed_oid) && failed_objects[i] == 0
&& heap_get_class_repr_id (thread_p, class_oids + i) == initial_last_repr_id[i])
{
lock_ret =
lock_object_wait_msecs (thread_p, class_oids + i, oid_Root_class_oid, X_LOCK, LK_UNCOND_LOCK,
class_lock_timeout);
if (lock_ret == LK_GRANTED)
{
if (catalog_drop_old_representations (thread_p, class_oids + i) != NO_ERROR)
{
for (j = start_index; j <= i; j++)
{
total_objects[j] = COMPACTDB_UNPROCESSED_CLASS;
failed_objects[j] = 0;
modified_objects[j] = 0;
big_objects[j] = 0;
}
result = ER_FAILED;
}
else
{
initial_last_repr_id[i] = COMPACTDB_REPR_DELETED;
}
break;
}
}
if (space_to_process == 0)
{
break;
}
}
if (OID_ISNULL (last_processed_oid))
{
if (i < n_classes - 1)
{
COPY_OID (last_processed_class_oid, class_oids + i + 1);
}
else
{
OID_SET_NULL (last_processed_class_oid);
}
}
else
{
COPY_OID (last_processed_class_oid, class_oids + i);
}
return result;
}
/*
 * boot_heap_compact_pages () - call heap_compact_pages () for the specified
 *                              class when the caller is allowed to compact
 * return: ER_COMPACTDB_ALREADY_STARTED when boot_can_compact () refuses the
 *         caller, otherwise the result of heap_compact_pages ()
 * thread_p(in): thread entry
 * class_oid(in): the class oid
 */
int
boot_heap_compact_pages (THREAD_ENTRY * thread_p, OID * class_oid)
{
  if (boot_can_compact (thread_p) != false)
    {
      return heap_compact_pages (thread_p, class_oid);
    }

  return ER_COMPACTDB_ALREADY_STARTED;
}
/*
 * boot_compact_start () - start database compaction
 * return: NO_ERROR when the calling transaction now owns the compaction,
 *         ER_COMPACTDB_ALREADY_STARTED when compaction was started under a
 *         different transaction index, ER_FAILED when the critical section
 *         cannot be entered
 * thread_p(in): thread entry
 */
int
boot_compact_start (THREAD_ENTRY * thread_p)
{
  int status = NO_ERROR;
  int tran_index;

  if (csect_enter (thread_p, CSECT_COMPACTDB_ONE_INSTANCE, INF_WAIT) != NO_ERROR)
    {
      return ER_FAILED;
    }

  tran_index = LOG_FIND_THREAD_TRAN_INDEX (thread_p);
  if (compact_started == true && tran_index != last_tran_index)
    {
      /* started under another transaction index */
      status = ER_COMPACTDB_ALREADY_STARTED;
    }
  else
    {
      /* record the caller as the owner */
      last_tran_index = tran_index;
      compact_started = true;
    }

  csect_exit (thread_p, CSECT_COMPACTDB_ONE_INSTANCE);
  return status;
}
/*
 * boot_compact_stop () - stop database compaction
 * return: NO_ERROR when the compaction state was reset, ER_FAILED when
 *         compaction was started under a different transaction index or
 *         the critical section cannot be entered
 * thread_p(in): thread entry
 */
int
boot_compact_stop (THREAD_ENTRY * thread_p)
{
  int status = NO_ERROR;
  int tran_index;

  if (csect_enter (thread_p, CSECT_COMPACTDB_ONE_INSTANCE, INF_WAIT) != NO_ERROR)
    {
      return ER_FAILED;
    }

  tran_index = LOG_FIND_THREAD_TRAN_INDEX (thread_p);
  if (compact_started == true && tran_index != last_tran_index)
    {
      /* started under another transaction index: leave the state alone */
      status = ER_FAILED;
    }
  else
    {
      last_tran_index = -1;
      compact_started = false;
    }

  csect_exit (thread_p, CSECT_COMPACTDB_ONE_INSTANCE);
  return status;
}
/*
 * boot_can_compact () - check if the current transaction can compact the database
 * return: false when compaction was started under a different transaction
 *         index or the critical section cannot be entered, true otherwise
 * thread_p(in): thread entry
 */
bool
boot_can_compact (THREAD_ENTRY * thread_p)
{
  bool allowed = true;
  int tran_index;

  if (csect_enter (thread_p, CSECT_COMPACTDB_ONE_INSTANCE, INF_WAIT) != NO_ERROR)
    {
      return false;
    }

  tran_index = LOG_FIND_THREAD_TRAN_INDEX (thread_p);
  if (compact_started == true && tran_index != last_tran_index)
    {
      allowed = false;
    }

  csect_exit (thread_p, CSECT_COMPACTDB_ONE_INSTANCE);
  return allowed;
}