File log_tran_table.c¶
File List > cubrid > src > transaction > log_tran_table.c
Go to the documentation of this file
/*
* Copyright 2008 Search Solution Corporation
* Copyright 2016 CUBRID Corporation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
/*
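* log_tran_table.c - transaction table management (definition, expansion and release of transaction descriptors and indices)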
*/
#ident "$Id$"
#if !defined(WINDOWS)
#define __STDC_FORMAT_MACROS
#include <inttypes.h>
#endif
#include "config.h"
#include <stdio.h>
#include <stddef.h>
#include <stdlib.h>
#include <stdarg.h>
#include <string.h>
#include <fcntl.h>
#include <time.h>
#include <limits.h>
#if defined(SOLARIS)
/* for MAXHOSTNAMELEN */
#include <netdb.h>
#endif /* SOLARIS */
#include <sys/stat.h>
#include <assert.h>
#include "dbtran_def.h"
#include "log_impl.h"
#include "log_lsa.hpp"
#include "log_manager.h"
#include "log_system_tran.hpp"
#include "memory_private_allocator.hpp"
#include "object_representation.h"
#include "error_manager.h"
#include "system_parameter.h"
#include "xserver_interface.h"
#include "file_manager.h"
#include "query_manager.h"
#include "query_monitoring.hpp"
#include "partition_sr.h"
#include "btree_load.h"
#include "serial.h"
#include "show_scan.h"
#include "boot_sr.h"
#include "tz_support.h"
#include "db_date.h"
#include "dbtype.h"
#if defined (SERVER_MODE)
#include "server_support.h"
#endif // SERVER_MODE
#include "string_buffer.hpp"
#if defined (SA_MODE)
#include "transaction_cl.h" /* for interrupt */
#endif /* defined (SA_MODE) */
#include "thread_entry.hpp"
#include "thread_manager.hpp"
#include "xasl.h"
#include "xasl_cache.h"
#include "session.h"
#include "pl_session.hpp"
// XXX: SHOULD BE THE LAST INCLUDE HEADER
#include "memory_wrapper.hpp"
/* NOTE(review): presumably the name given to the recursive mutex guarding a descriptor's top operations; it is not used in this part of the file - confirm at its use site */
#define RMUTEX_NAME_TDES_TOPOP "TDES_TOPOP"
/* Shorthands for the two counters kept in the global transaction table (log_Gl.trantable) */
#define NUM_ASSIGNED_TRAN_INDICES log_Gl.trantable.num_assigned_indices
#define NUM_TOTAL_TRAN_INDICES log_Gl.trantable.num_total_indices
/*
* Outside SERVER_MODE the pthread mutex calls are compiled out: init, destroy and unlock expand to nothing, while
* lock and trylock expand to the constant 0.
*/
#if !defined(SERVER_MODE)
#define pthread_mutex_init(a, b)
#define pthread_mutex_destroy(a)
#define pthread_mutex_lock(a) 0
#define pthread_mutex_trylock(a) 0
#define pthread_mutex_unlock(a)
#endif /* not SERVER_MODE */
/* Upper bound on the number of descriptors requested in one logtb_expand_trantable step; larger requests are split */
static const int LOG_MAX_NUM_CONTIGUOUS_TDES = INT_MAX / sizeof (LOG_TDES);
static const float LOG_EXPAND_TRANTABLE_RATIO = 1.25; /* Increase table by 25% */
/* Minimum number of entries added whenever a top system operation stack is grown */
static const int LOG_TOPOPS_STACK_INCREMENT = 3; /* No more than 3 nested top system operations */
/* Credential used by logtb_set_tdes when the caller passes no client credential */
static BOOT_CLIENT_CREDENTIAL log_Client_credential;
static const unsigned int LOGTB_RETRY_SLAM_MAX_TIMES = 10;
/* Forward declarations of the file-local helpers */
static int logtb_expand_trantable (THREAD_ENTRY * thread_p, int num_new_indices);
static int logtb_allocate_tran_index (THREAD_ENTRY * thread_p, TRANID trid, TRAN_STATE state,
const BOOT_CLIENT_CREDENTIAL * client_credential, TRAN_STATE * current_state,
int wait_msecs, TRAN_ISOLATION isolation);
static LOG_ADDR_TDESAREA *logtb_allocate_tdes_area (int num_indices);
static void logtb_initialize_trantable (TRANTABLE * trantable_p);
static int logtb_initialize_system_tdes (THREAD_ENTRY * thread_p);
static void logtb_set_number_of_assigned_tran_indices (int num_trans);
static void logtb_increment_number_of_assigned_tran_indices ();
static void logtb_decrement_number_of_assigned_tran_indices ();
static void logtb_set_number_of_total_tran_indices (int num_total_trans);
static void logtb_set_loose_end_tdes (LOG_TDES * tdes);
static bool logtb_is_interrupted_tdes (THREAD_ENTRY * thread_p, LOG_TDES * tdes, bool clear, bool * continue_checking);
static void logtb_dump_tdes_distribute_transaction (FILE * out_fp, int global_tran_id, LOG_2PC_COORDINATOR * coord);
static void logtb_dump_top_operations (FILE * out_fp, LOG_TOPOPS_STACK * topops_p);
static void logtb_dump_tdes (FILE * out_fp, LOG_TDES * tdes);
static void logtb_set_tdes (THREAD_ENTRY * thread_p, LOG_TDES * tdes, const BOOT_CLIENT_CREDENTIAL * client_credential,
int wait_msecs, TRAN_ISOLATION isolation);
static void logtb_tran_free_update_stats (LOG_TRAN_UPDATE_STATS * log_upd_stats);
static void logtb_tran_clear_update_stats (LOG_TRAN_UPDATE_STATS * log_upd_stats);
static unsigned int logtb_tran_btid_hash_func (const void *key, const unsigned int ht_size);
static int logtb_tran_btid_hash_cmp_func (const void *key1, const void *key2);
static LOG_TRAN_CLASS_COS *logtb_tran_create_class_cos (THREAD_ENTRY * thread_p, const OID * class_oid);
static LOG_TRAN_BTID_UNIQUE_STATS *logtb_tran_create_btid_unique_stats (THREAD_ENTRY * thread_p, const BTID * btid);
static int logtb_tran_update_delta_hash_func (THREAD_ENTRY * thread_p, void *data, void *args);
static int logtb_tran_load_global_stats_func (THREAD_ENTRY * thread_p, void *data, void *args);
static int logtb_tran_reset_cos_func (THREAD_ENTRY * thread_p, void *data, void *args);
static int logtb_create_unique_stats_from_repr (THREAD_ENTRY * thread_p, OID * class_oid);
static GLOBAL_UNIQUE_STATS *logtb_get_global_unique_stats_entry (THREAD_ENTRY * thread_p, BTID * btid,
bool load_at_creation);
static void *logtb_global_unique_stat_alloc (void);
static int logtb_global_unique_stat_free (void *unique_stat);
static int logtb_global_unique_stat_init (void *unique_stat);
static int logtb_global_unique_stat_key_copy (void *src, void *dest);
static void logtb_free_tran_mvcc_info (LOG_TDES * tdes);
static void logtb_assign_subtransaction_mvccid (THREAD_ENTRY * thread_p, MVCC_INFO * curr_mvcc_info, MVCCID mvcc_subid);
static int logtb_check_kill_tran_auth (THREAD_ENTRY * thread_p, int tran_id, bool * has_authorization);
static void logtb_find_thread_entry_mapfunc (THREAD_ENTRY & thread_ref, bool & stop_mapper, int tran_index,
bool except_me, REFPTR (THREAD_ENTRY, found_ptr));
/*
 * logtb_realloc_topops_stack - grow the top system operation stack of a transaction
 *
 * return: the (possibly relocated) stack, or NULL when the reallocation fails
 *
 *   tdes(in/out): transaction descriptor that owns the stack
 *   num_elms(in): number of entries to add; raised to LOG_TOPOPS_STACK_INCREMENT when smaller
 *
 * Note: On failure ER_OUT_OF_VIRTUAL_MEMORY is set and the descriptor is not modified.
 *       The first successful allocation (max == 0) also resets topops.last to -1.
 */
void *
logtb_realloc_topops_stack (LOG_TDES * tdes, int num_elms)
{
  size_t nbytes;
  LOG_TOPOPS_ADDRESSES *grown;

  if (num_elms < LOG_TOPOPS_STACK_INCREMENT)
    {
      num_elms = LOG_TOPOPS_STACK_INCREMENT;
    }

  /* entry count first (int arithmetic, as before), then scaled to bytes */
  nbytes = tdes->topops.max + num_elms;
  nbytes *= sizeof (*tdes->topops.stack);

  grown = (LOG_TOPOPS_ADDRESSES *) realloc (tdes->topops.stack, nbytes);
  if (grown == NULL)
    {
      er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_OUT_OF_VIRTUAL_MEMORY, 1, nbytes);
      return NULL;
    }

  tdes->topops.stack = grown;
  if (tdes->topops.max == 0)
    {
      /* brand new stack: nothing has been pushed yet */
      tdes->topops.last = -1;
    }
  tdes->topops.max += num_elms;

  return tdes->topops.stack;
}
/*
 * logtb_allocate_tdes_area - allocate a contiguous area of transaction descriptors
 *
 * return: the new area, or NULL on allocation failure (ER_OUT_OF_VIRTUAL_MEMORY is set)
 *
 *   num_indices(in): number of descriptors to create
 *
 * Note: The new descriptors are registered in log_Gl.trantable.all_tdes starting at position
 *       NUM_TOTAL_TRAN_INDICES, so the caller must have made all_tdes large enough beforehand.
 *       The returned area is chained in front of the current area list through area->next, but
 *       log_Gl.trantable.area itself is not updated here; the caller publishes the area.
 */
static LOG_ADDR_TDESAREA *
logtb_allocate_tdes_area (int num_indices)
{
  LOG_ADDR_TDESAREA *area;	/* Contiguous area for new transaction indices */
  LOG_TDES *tdes;		/* Transaction descriptor */
  int i, tran_index;

  /*
   * Allocate an area for the transaction descriptors, set the address of
   * each transaction descriptor, and keep the address of the area for
   * deallocation purposes at shutdown time.
   */
  area = (LOG_ADDR_TDESAREA *) malloc (sizeof (LOG_ADDR_TDESAREA));
  if (area == NULL)
    {
      er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_OUT_OF_VIRTUAL_MEMORY, 1, sizeof (LOG_ADDR_TDESAREA));
      return NULL;
    }

  /* *INDENT-OFF* */
  area->tdesarea = new LOG_TDES[num_indices];
  /* *INDENT-ON* */
  if (area->tdesarea == NULL)
    {
      er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_OUT_OF_VIRTUAL_MEMORY, 1, num_indices * sizeof (LOG_TDES));
      /* do not leak the area header when the descriptor array could not be obtained */
      free_and_init (area);
      return NULL;
    }
  area->next = log_Gl.trantable.area;

  /*
   * Initialize every newly created transaction descriptor index
   */
  for (i = 0, tran_index = NUM_TOTAL_TRAN_INDICES; i < num_indices; tran_index++, i++)
    {
      tdes = log_Gl.trantable.all_tdes[tran_index] = &area->tdesarea[i];
      /* NOTE(review): the descriptor is initialized with its position inside this area (i), not with its table
       * position (tran_index); the two differ when the table already holds descriptors - confirm this is intended */
      logtb_initialize_tdes (tdes, i);
    }

  return area;
}
/*
 * logtb_expand_trantable - expand the transaction table
 *
 * return: NO_ERROR if all OK, ER_ status otherwise
 *
 *   thread_p(in): thread entry
 *   num_new_indices(in): Number of indices of expansion.
 *                        (i.e., threads/clients of execution)
 *
 * Note: Expand the transaction table with the number of given indices. Requests larger than
 *       LOG_MAX_NUM_CONTIGUOUS_TDES are served in several steps of at most that many descriptors.
 *       In SERVER_MODE, once the log is restarted and the table already holds MAX_NTRANS indices,
 *       the call is a no-op.
 *       If growing the descriptor pointer array fails, the existing array is kept intact so that
 *       the table stays usable (and can still be released by logtb_undefine_trantable).
 */
static int
logtb_expand_trantable (THREAD_ENTRY * thread_p, int num_new_indices)
{
  LOG_ADDR_TDESAREA *area;	/* Contiguous area for new transaction indices */
  LOG_TDES **new_all_tdes;	/* Result of growing the descriptor pointer array */
  int total_indices;		/* Total number of transaction indices */
  size_t alloc_size;		/* Size in bytes of the grown pointer array */
  int error_code = NO_ERROR;

#if defined(SERVER_MODE)
  /*
   * When second time this function invoked during normal processing,
   * just return.
   */
  total_indices = MAX_NTRANS;
  if (log_Gl.rcv_phase == LOG_RESTARTED && total_indices <= NUM_TOTAL_TRAN_INDICES)
    {
      return NO_ERROR;
    }
#endif /* SERVER_MODE */

  while (num_new_indices > LOG_MAX_NUM_CONTIGUOUS_TDES)
    {
      error_code = logtb_expand_trantable (thread_p, LOG_MAX_NUM_CONTIGUOUS_TDES);
      if (error_code != NO_ERROR)
	{
	  return error_code;
	}
      num_new_indices -= LOG_MAX_NUM_CONTIGUOUS_TDES;
    }

  if (num_new_indices <= 0)
    {
      return NO_ERROR;
    }

#if defined(SERVER_MODE)
  if (log_Gl.rcv_phase != LOG_RESTARTED)
    {
      total_indices = NUM_TOTAL_TRAN_INDICES + num_new_indices;
    }
#else /* SERVER_MODE */
  total_indices = NUM_TOTAL_TRAN_INDICES + num_new_indices;
#endif

  /*
   * NOTE that this realloc is OK since we are in a critical section.
   * Nobody should have pointer to transaction table.
   * The result goes to a temporary first: overwriting all_tdes directly would lose (and leak) the
   * current array when realloc fails.
   */
  alloc_size = (size_t) total_indices * sizeof (*log_Gl.trantable.all_tdes);
  new_all_tdes = (LOG_TDES **) realloc (log_Gl.trantable.all_tdes, alloc_size);
  if (new_all_tdes == NULL)
    {
      er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_OUT_OF_VIRTUAL_MEMORY, 1, alloc_size);
      return ER_OUT_OF_VIRTUAL_MEMORY;
    }
  log_Gl.trantable.all_tdes = new_all_tdes;

  area = logtb_allocate_tdes_area (num_new_indices);
  if (area == NULL)
    {
      return ER_OUT_OF_VIRTUAL_MEMORY;
    }

  /*
   * Notify other modules of new number of transaction indices
   */
#if defined(ENABLE_UNUSED_FUNCTION)
  error_code = wfg_alloc_nodes (thread_p, total_indices);
  if (error_code != NO_ERROR)
    {
      /* *INDENT-OFF* */
      delete [] area->tdesarea;
      /* *INDENT-ON* */
      free_and_init (area);
      return error_code;
    }
#endif

  if (qmgr_allocate_tran_entries (thread_p, total_indices) != NO_ERROR)
    {
      /* *INDENT-OFF* */
      delete [] area->tdesarea;
      /* *INDENT-ON* */
      free_and_init (area);
      return ER_FAILED;
    }

  /* publish the new area only after every dependent module accepted the new size */
  log_Gl.trantable.area = area;
  log_Gl.trantable.hint_free_index = NUM_TOTAL_TRAN_INDICES;
  logtb_set_number_of_total_tran_indices (total_indices);

  // make sure MVCC table resizes if necessary
  log_Gl.mvcc_table.alloc_transaction_lowest_active ();

  return error_code;
}
/*
 * logtb_define_trantable - define the transaction table
 *
 * return: nothing
 *
 *   thread_p(in): thread entry
 *   num_expected_tran_indices(in): number of expected concurrent transactions
 *                                  (i.e., threads/clients of execution)
 *   num_expected_locks(in): number of expected locks (not used by this function)
 *
 * Note: Wrapper around logtb_define_trantable_log_latch that acquires the log critical section
 *       and the transaction table critical section itself. A log page buffer pool that is already
 *       initialized is finalized first. The status returned by the latched variant is discarded.
 */
void
logtb_define_trantable (THREAD_ENTRY * thread_p, int num_expected_tran_indices, int num_expected_locks)
{
  /* run as the system transaction while the table is (re)built */
  LOG_SET_CURRENT_TRAN_INDEX (thread_p, LOG_SYSTEM_TRAN_INDEX);

  LOG_CS_ENTER (thread_p);
  TR_TABLE_CS_ENTER (thread_p);

  if (logpb_is_pool_initialized ())
    {
      logpb_finalize_pool (thread_p);
    }

  (void) logtb_define_trantable_log_latch (thread_p, num_expected_tran_indices);

  LOG_SET_CURRENT_TRAN_INDEX (thread_p, LOG_SYSTEM_TRAN_INDEX);

  /* leave the critical sections in reverse order of entry */
  TR_TABLE_CS_EXIT (thread_p);
  LOG_CS_EXIT (thread_p);
}
/*
* logtb_define_trantable_log_latch - define the transaction table
*
* return: NO_ERROR if all OK, ER status otherwise
*
* thread_p(in): thread entry; must own the log critical section in write mode (asserted)
* num_expected_tran_indices(in): Number of expected concurrent transactions
* (i.e., threads/clients of execution); raised to at least MAX_NTRANS
* and LOG_SYSTEM_TRAN_INDEX + 1
*
* Note: This function is only called by the log manager when the log
* latch has already been acquired. (See logtb_define_trantable for
* other uses).
* An existing table is released first. After the table is created, the system
* transaction descriptor, the MVCC table, the lock manager, the page buffer pool and
* the file manager are initialized, in that order. Every failure path except the
* retry with LOG_ESTIMATE_NACTIVE_TRANS ends in logpb_fatal_error.
*/
int
logtb_define_trantable_log_latch (THREAD_ENTRY * thread_p, int num_expected_tran_indices)
{
int error_code = NO_ERROR;
assert (LOG_CS_OWN_WRITE_MODE (thread_p));
/*
* for XA support: there is prepared transaction after recovery.
* so, can not recreate transaction description
* table after recovery.
*
* Total number of transaction descriptor is set to the value of
* MAX_NTRANS
*/
num_expected_tran_indices = MAX (num_expected_tran_indices, MAX_NTRANS);
num_expected_tran_indices = MAX (num_expected_tran_indices, LOG_SYSTEM_TRAN_INDEX + 1);
/* If there is an already defined table, free such a table */
if (log_Gl.trantable.area != NULL)
{
logtb_undefine_trantable (thread_p);
}
else
{
/* Initialize the transaction table as empty */
logtb_initialize_trantable (&log_Gl.trantable);
}
/*
* Create an area to keep the number of desired transaction descriptors
*/
error_code = logtb_expand_trantable (thread_p, num_expected_tran_indices);
if (error_code != NO_ERROR)
{
/*
* Unable to create transaction table to hold the desired number
* of indices. Probably, a lot of indices were requested.
* try again with defaults.
*/
if (log_Gl.trantable.area != NULL)
{
logtb_undefine_trantable (thread_p);
}
/* Give up when the request was already at or below the default estimate (or, in SERVER_MODE, once restarted) */
#if defined(SERVER_MODE)
if (num_expected_tran_indices <= LOG_ESTIMATE_NACTIVE_TRANS || log_Gl.rcv_phase == LOG_RESTARTED)
#else /* SERVER_MODE */
if (num_expected_tran_indices <= LOG_ESTIMATE_NACTIVE_TRANS)
#endif /* SERVER_MODE */
{
/* Out of memory */
logpb_fatal_error (thread_p, true, ARG_FILE_LINE, "log_def_trantable");
return ER_OUT_OF_VIRTUAL_MEMORY;
}
else
{
/* NOTE(review): the retry re-applies MAX (..., MAX_NTRANS) at the top of this function, so when MAX_NTRANS is
* larger than LOG_ESTIMATE_NACTIVE_TRANS it asks for the same size again - confirm the recursion terminates */
error_code = logtb_define_trantable_log_latch (thread_p, LOG_ESTIMATE_NACTIVE_TRANS);
return error_code;
}
}
logtb_set_number_of_assigned_tran_indices (1); /* sys tran */
/*
* Assign the first entry for the system transaction. System transaction
* has an infinite timeout
*/
error_code = logtb_initialize_system_tdes (thread_p);
if (error_code != NO_ERROR)
{
er_set (ER_FATAL_ERROR_SEVERITY, ARG_FILE_LINE, ER_LOG_UNKNOWN_TRANINDEX, 1, LOG_SYSTEM_TRAN_INDEX);
logpb_fatal_error (thread_p, true, ARG_FILE_LINE, "log_def_trantable");
return error_code;
}
LOG_SET_CURRENT_TRAN_INDEX (thread_p, LOG_SYSTEM_TRAN_INDEX);
log_Gl.mvcc_table.initialize ();
/* Initialize the lock manager and the page buffer pool */
error_code = lock_initialize ();
if (error_code != NO_ERROR)
{
goto error;
}
error_code = pgbuf_initialize ();
if (error_code != NO_ERROR)
{
goto error;
}
error_code = file_manager_init ();
if (error_code != NO_ERROR)
{
goto error;
}
return error_code;
/* Common failure exit for the module initializations above: release the table and report a fatal error */
error:
logtb_undefine_trantable (thread_p);
logpb_fatal_error (thread_p, true, ARG_FILE_LINE, "log_def_trantable");
return error_code;
}
/*
 * logtb_initialize_trantable - reset a transaction table structure to the empty state
 *
 * return: nothing
 *
 *   trantable_p(in/out): table to reset
 *
 * Note: Only the fields are overwritten; memory referenced by area/all_tdes is not released here.
 *       The assigned-index counter starts at 1, all other counters at 0.
 */
static void
logtb_initialize_trantable (TRANTABLE * trantable_p)
{
  /* storage */
  trantable_p->area = NULL;
  trantable_p->all_tdes = NULL;

  /* index bookkeeping */
  trantable_p->num_total_indices = 0;
  trantable_p->num_assigned_indices = 1;
  trantable_p->hint_free_index = 0;

  /* loose-end and interrupt counters */
  trantable_p->num_coord_loose_end_indices = 0;
  trantable_p->num_prepared_loose_end_indices = 0;
  trantable_p->num_interrupts = 0;
}
/*
 * logtb_initialize_system_tdes - set up the descriptor of the system transaction
 *
 * return: NO_ERROR if all OK, ER_LOG_UNKNOWN_TRANINDEX when the system descriptor cannot be found
 *
 *   thread_p(in): thread entry
 *
 * Note: The descriptor at LOG_SYSTEM_TRAN_INDEX is cleared and then marked as a loose end with
 *       infinite lock wait, the default isolation level and no client id. A missing descriptor
 *       is reported through logpb_fatal_error.
 */
static int
logtb_initialize_system_tdes (THREAD_ENTRY * thread_p)
{
  LOG_TDES *sys_tdes = LOG_FIND_TDES (LOG_SYSTEM_TRAN_INDEX);

  if (sys_tdes == NULL)
    {
      er_set (ER_FATAL_ERROR_SEVERITY, ARG_FILE_LINE, ER_LOG_UNKNOWN_TRANINDEX, 1, LOG_SYSTEM_TRAN_INDEX);
      logpb_fatal_error (thread_p, true, ARG_FILE_LINE, "log_initialize_system_tdes");
      return ER_LOG_UNKNOWN_TRANINDEX;
    }

  logtb_clear_tdes (thread_p, sys_tdes);

  /* identity */
  sys_tdes->tran_index = LOG_SYSTEM_TRAN_INDEX;
  sys_tdes->trid = LOG_SYSTEM_TRANID;
  sys_tdes->mvccinfo.reset ();

  /* behaviour */
  sys_tdes->isloose_end = true;
  sys_tdes->wait_msecs = TRAN_LOCK_INFINITE_WAIT;
  sys_tdes->isolation = TRAN_DEFAULT_ISOLATION_LEVEL ();

  /* client information */
  sys_tdes->client_id = -1;
  sys_tdes->client.set_system_internal ();

  sys_tdes->query_timeout = 0;
  sys_tdes->tran_abort_reason = TRAN_NORMAL;
  sys_tdes->block_global_oldest_active_until_commit = false;

  return NO_ERROR;
}
/*
 * logtb_undefine_trantable - undefine the transaction table
 *
 * return: nothing
 *
 *   thread_p(in): thread entry
 *
 * Note: Undefine and free the transaction table space. The MVCC table, lock manager, page buffer
 *       pool and file manager are finalized first. Every descriptor is finalized, then the
 *       descriptor pointer array and all descriptor areas are released, and the table is reset
 *       to the empty state.
 *       The descriptor pointer array is checked for NULL before it is walked, so a table whose
 *       pointer array is missing while areas are still chained can be released without crashing.
 */
void
logtb_undefine_trantable (THREAD_ENTRY * thread_p)
{
  LOG_ADDR_TDESAREA *area;
  LOG_TDES *tdes;		/* Transaction descriptor */
  int i;

  log_Gl.mvcc_table.finalize ();
  lock_finalize ();
  pgbuf_finalize ();
  file_manager_final ();

  if (log_Gl.trantable.area != NULL)
    {
      /*
       * If there is any memory allocated in the transaction descriptors,
       * release it
       */
      if (log_Gl.trantable.all_tdes != NULL)
	{
	  for (i = 0; i < NUM_TOTAL_TRAN_INDICES; i++)
	    {
	      tdes = log_Gl.trantable.all_tdes[i];
	      if (tdes != NULL)
		{
#if defined(SERVER_MODE)
		  assert (tdes->tran_index == i);
#endif
		  logtb_finalize_tdes (thread_p, tdes);
		}
	    }
	}

#if defined(ENABLE_UNUSED_FUNCTION)
      wfg_free_nodes (thread_p);
#endif

      if (log_Gl.trantable.all_tdes != NULL)
	{
	  free_and_init (log_Gl.trantable.all_tdes);
	}

      /* release the chained descriptor areas, newest first */
      area = log_Gl.trantable.area;
      while (area != NULL)
	{
	  log_Gl.trantable.area = area->next;
	  /* *INDENT-OFF* */
	  delete[] area->tdesarea;
	  /* *INDENT-ON* */
	  free_and_init (area);
	  area = log_Gl.trantable.area;
	}
    }

  logtb_initialize_trantable (&log_Gl.trantable);
}
/*
 * logtb_get_number_assigned_tran_indices - number of transaction indices currently assigned
 *
 * return: number of assigned transaction indices
 *
 * Note: The counter is read without entering the TR_TABLE critical section; an estimated value
 *       is sufficient for the callers.
 */
int
logtb_get_number_assigned_tran_indices (void)
{
  /* Do not use TR_TABLE_CS_ENTER()/TR_TABLE_CS_EXIT(), Estimated value is sufficient for the caller */
  const int num_assigned = NUM_ASSIGNED_TRAN_INDICES;

  return num_assigned;
}
/*
 * logtb_set_number_of_assigned_tran_indices - overwrite the assigned-index counter
 *
 * return: nothing
 *
 *   num_trans(in): new number of assigned transaction indices
 *
 * Note: Callers have to call this function in the 'TR_TABLE' critical section.
 */
static void
logtb_set_number_of_assigned_tran_indices (int num_trans)
{
  log_Gl.trantable.num_assigned_indices = num_trans;
}
/*
 * logtb_increment_number_of_assigned_tran_indices - add one to the assigned-index counter
 *
 * return: nothing
 *
 * Note: Callers have to call this function in the 'TR_TABLE' critical section.
 */
static void
logtb_increment_number_of_assigned_tran_indices (void)
{
  log_Gl.trantable.num_assigned_indices += 1;
}
/*
 * logtb_decrement_number_of_assigned_tran_indices - subtract one from the assigned-index counter
 *
 * return: nothing
 *
 * Note: Callers have to call this function in the 'TR_TABLE' critical section.
 */
static void
logtb_decrement_number_of_assigned_tran_indices (void)
{
  log_Gl.trantable.num_assigned_indices -= 1;
}
/*
 * logtb_get_number_of_total_tran_indices - number of indices the transaction table holds
 *
 * return: total number of transaction indices
 *
 * Note: Some of these indices may not be assigned.
 *       See logtb_get_number_assigned_tran_indices for the assigned count.
 */
int
logtb_get_number_of_total_tran_indices (void)
{
  const int num_total = log_Gl.trantable.num_total_indices;

  return num_total;
}
/*
 * logtb_set_number_of_total_tran_indices - overwrite the total-index counter
 *
 * return: nothing
 *
 *   num_total_trans(in): new total number of transaction indices
 *
 * Note: Callers have to call this function in the 'TR_TABLE' critical section.
 */
static void
logtb_set_number_of_total_tran_indices (int num_total_trans)
{
  log_Gl.trantable.num_total_indices = num_total_trans;
}
#if defined(ENABLE_UNUSED_FUNCTION)
/*
 * logtb_am_i_sole_tran - check whether no other transaction is running
 *
 * return: true when at most two transaction indices are assigned, false otherwise
 *
 *   thread_p(in): thread entry
 *
 * Note: The TR_TABLE critical section is entered and is NOT exited before returning.
 *       When true is returned, the caller should call logtb_i_am_not_sole_tran() to exit
 *       the critical section (TR_TABLE_CS_EXIT()).
 *       NOTE(review): the critical section is also still held when false is returned; confirm
 *       that callers release it on that path as well.
 */
bool
logtb_am_i_sole_tran (THREAD_ENTRY * thread_p)
{
  TR_TABLE_CS_ENTER (thread_p);

  return NUM_ASSIGNED_TRAN_INDICES <= 2;
}

/*
 * logtb_i_am_not_sole_tran - exit the critical section entered by logtb_am_i_sole_tran
 *
 * return: nothing
 *
 *   thread_p(in): thread entry
 */
void
logtb_i_am_not_sole_tran (THREAD_ENTRY * thread_p)
{
  TR_TABLE_CS_EXIT (thread_p);
}
#endif /* ENABLE_UNUSED_FUNCTION */
/*
 * logtb_am_i_dba_client - check whether the current client user is DBA
 *
 * return: true when the current client name equals "DBA" ignoring case,
 *         false otherwise (including when no client name is available)
 *
 *   thread_p(in): thread entry
 */
bool
logtb_am_i_dba_client (THREAD_ENTRY * thread_p)
{
  const char *client_name = logtb_find_current_client_name (thread_p);

  if (client_name == NULL)
    {
      return false;
    }

  return strcasecmp (client_name, "DBA") == 0;
}
/*
 * logtb_assign_tran_index - assign a transaction index for a sequence of
 *                           transactions (thread of execution.. a client)
 *
 * return: transaction index, or NULL_TRAN_INDEX when none could be allocated
 *
 *   thread_p(in): thread entry; in SERVER_MODE a NULL value is replaced by the current thread entry
 *   trid(in): Transaction identifier or NULL_TRANID
 *   state(in): Transaction state (Usually active)
 *   client_credential(in): credential of the client, or NULL
 *   current_state(in/out): Set as a side effect to state of transaction, when
 *                          a valid pointer is given.
 *   wait_msecs(in): Wait for at least this number of milliseconds to acquire a
 *                   lock. Negative value is infinite
 *   isolation(in): Isolation level. One of the following:
 *                     TRAN_SERIALIZABLE
 *                     TRAN_REPEATABLE_READ
 *                     TRAN_READ_COMMITTED
 *
 * Note: The allocation itself is done by logtb_allocate_tran_index inside the TR_TABLE critical
 *       section. Afterwards the thread's current transaction index is set to the new index, or
 *       to LOG_SYSTEM_TRAN_INDEX when the allocation failed.
 *
 *       This function must be called when a client is restarted.
 */
int
logtb_assign_tran_index (THREAD_ENTRY * thread_p, TRANID trid, TRAN_STATE state,
			 const BOOT_CLIENT_CREDENTIAL * client_credential, TRAN_STATE * current_state, int wait_msecs,
			 TRAN_ISOLATION isolation)
{
  int tran_index;		/* The allocated transaction index */
  int index_to_install;		/* Index made current for the thread */

#if defined(SERVER_MODE)
  if (thread_p == NULL)
    {
      thread_p = thread_get_thread_entry_info ();
    }
#endif /* SERVER_MODE */

  LOG_SET_CURRENT_TRAN_INDEX (thread_p, LOG_SYSTEM_TRAN_INDEX);

  TR_TABLE_CS_ENTER (thread_p);
  tran_index =
    logtb_allocate_tran_index (thread_p, trid, state, client_credential, current_state, wait_msecs, isolation);
  TR_TABLE_CS_EXIT (thread_p);

  /* fall back to the system transaction when no index was obtained */
  index_to_install = LOG_SYSTEM_TRAN_INDEX;
  if (tran_index != NULL_TRAN_INDEX)
    {
      index_to_install = tran_index;
    }
  LOG_SET_CURRENT_TRAN_INDEX (thread_p, index_to_install);

  return tran_index;
}
/*
 * logtb_set_tdes - fill a transaction descriptor for a newly assigned client
 *
 * return: nothing
 *
 *   thread_p(in): thread entry; in SERVER_MODE a NULL value is replaced by the current thread entry
 *   tdes(in/out): Transaction descriptor
 *   client_credential(in): credential of the client; the file-level log_Client_credential is used when NULL
 *   wait_msecs(in): Wait for at least this number of milliseconds to acquire a lock.
 *   isolation(in): Isolation level
 *
 * Note: In SERVER_MODE the client id is taken from the thread's connection entry when there is
 *       one; in every other case it is -1. The top operation stack pointer is reset to NULL
 *       without being freed here.
 */
static void
logtb_set_tdes (THREAD_ENTRY * thread_p, LOG_TDES * tdes, const BOOT_CLIENT_CREDENTIAL * client_credential,
		int wait_msecs, TRAN_ISOLATION isolation)
{
#if defined(SERVER_MODE)
  CSS_CONN_ENTRY *conn;
#endif /* SERVER_MODE */

  if (client_credential == NULL)
    {
      client_credential = &log_Client_credential;
    }
  tdes->client.set_ids (*client_credential);
  tdes->is_user_active = false;

#if defined(SERVER_MODE)
  if (thread_p == NULL)
    {
      thread_p = thread_get_thread_entry_info ();
    }
  conn = thread_p->conn_entry;
  tdes->client_id = (conn != NULL) ? conn->client_id : -1;
#else /* SERVER_MODE */
  tdes->client_id = -1;
#endif /* SERVER_MODE */

  /* locking behaviour */
  tdes->wait_msecs = wait_msecs;
  tdes->isolation = isolation;

  /* status flags */
  tdes->isloose_end = false;
  tdes->interrupt = false;

  /* empty top system operation stack */
  tdes->topops.stack = NULL;
  tdes->topops.max = 0;
  tdes->topops.last = -1;

  /* per-transaction bookkeeping */
  tdes->m_modified_classes.clear ();
  tdes->num_transient_classnames = 0;
  tdes->first_save_entry = NULL;
  tdes->lob_locator_root.init ();
  tdes->m_log_postpone_cache.reset ();
}
/*
* logtb_allocate_tran_index - allocate a transaction index for a sequence of
* transactions (thread of execution.. a client)
*
* return: tran_index or NULL_TRAN_INDEX
*
* thread_p(in): thread entry; in SERVER_MODE a NULL value is replaced by the current thread entry
* trid(in): Transaction identifier or NULL_TRANID
* state(in): Transaction state (Usually active)
* client_credential(in): credential of the client, or NULL
* current_state(in/out): Set as a side effect to state of transaction, when
* a valid pointer is given.
* wait_msecs(in): Wait for at least this number of milliseconds to acquire a
* lock. That is, wait this much before the transaction
* is timed out. Negative value is infinite.
* isolation(in): Isolation level. One of the following:
* TRAN_SERIALIZABLE
* TRAN_REPEATABLE_READ
* TRAN_READ_COMMITTED
*
* Note:Allocate a transaction index for a sequence of transactions
* (i.e., a client) and initialize the state structure for the
* first transaction in the sequence. If trid is equal to
* NULL_TRANID, a new transaction identifier is assigned to the
* descriptor and the transaction is declared active; otherwise,
* the given transaction with the given state is assigned to the
* index.
* The table is searched circularly, starting at hint_free_index, for a
* descriptor whose trid is NULL_TRANID. When every index is assigned the
* table is expanded first, except in SERVER_MODE after restart, where
* ER_TM_TOO_MANY_CLIENTS is set instead.
*
* This function is only called by the log manager when the log
* latch has already been acquired. (See logtb_assign_tran_index)
*/
static int
logtb_allocate_tran_index (THREAD_ENTRY * thread_p, TRANID trid, TRAN_STATE state,
const BOOT_CLIENT_CREDENTIAL * client_credential, TRAN_STATE * current_state, int wait_msecs,
TRAN_ISOLATION isolation)
{
int i;
int visited_loop_start_pos;
LOG_TDES *tdes; /* Transaction descriptor */
int tran_index; /* The assigned index */
int save_tran_index; /* Save as a good index to assign */
#if defined(SERVER_MODE)
if (thread_p == NULL)
{
thread_p = thread_get_thread_entry_info ();
}
#endif /* SERVER_MODE */
save_tran_index = tran_index = NULL_TRAN_INDEX;
/* Is there any free index ? */
if (NUM_ASSIGNED_TRAN_INDICES >= NUM_TOTAL_TRAN_INDICES)
{
#if defined(SERVER_MODE)
/* When normal processing, we never expand trantable */
if (log_Gl.rcv_phase == LOG_RESTARTED && NUM_TOTAL_TRAN_INDICES > 0)
{
er_set (ER_FATAL_ERROR_SEVERITY, ARG_FILE_LINE, ER_TM_TOO_MANY_CLIENTS, 1, NUM_TOTAL_TRAN_INDICES - 1);
return NULL_TRAN_INDEX;
}
#endif /* SERVER_MODE */
/* NOTE(review): i is passed as the number of NEW indices, so the table grows by 125% of its current size rather
* than by the 25% the ratio constant's comment describes - confirm which is intended */
i = (int) (((float) NUM_TOTAL_TRAN_INDICES * LOG_EXPAND_TRANTABLE_RATIO) + 0.5);
if (logtb_expand_trantable (thread_p, i) != NO_ERROR)
{
/* Out of memory or something like that */
return NULL_TRAN_INDEX;
}
}
/*
* Circular scan for a free descriptor. visited_loop_start_pos counts how many times the scan has been at the hint
* position; reaching it a second time means the whole table was inspected.
* NOTE(review): the modulo below assumes NUM_TOTAL_TRAN_INDICES is non-zero at this point - confirm.
*/
for (i = log_Gl.trantable.hint_free_index, visited_loop_start_pos = 0;
tran_index == NULL_TRAN_INDEX && visited_loop_start_pos < 2; i = (i + 1) % NUM_TOTAL_TRAN_INDICES)
{
if (log_Gl.trantable.all_tdes[i]->trid == NULL_TRANID)
{
tran_index = i;
}
if (i == log_Gl.trantable.hint_free_index)
{
visited_loop_start_pos++;
}
}
if (tran_index != NULL_TRAN_INDEX)
{
log_Gl.trantable.hint_free_index = (tran_index + 1) % NUM_TOTAL_TRAN_INDICES;
/* NOTE(review): the assigned counter is incremented before the descriptor lookup below is validated; the early
* return on a NULL descriptor leaves the counter incremented */
logtb_increment_number_of_assigned_tran_indices ();
tdes = LOG_FIND_TDES (tran_index);
if (tdes == NULL)
{
er_set (ER_FATAL_ERROR_SEVERITY, ARG_FILE_LINE, ER_LOG_UNKNOWN_TRANINDEX, 1, tran_index);
return NULL_TRAN_INDEX;
}
tdes->tran_index = tran_index;
logtb_clear_tdes (thread_p, tdes);
logtb_set_tdes (thread_p, tdes, client_credential, wait_msecs, isolation);
if (trid == NULL_TRANID)
{
/* Assign a new transaction identifier for the new index */
logtb_get_new_tran_id (thread_p, tdes);
state = TRAN_ACTIVE;
}
else
{
tdes->trid = trid;
tdes->state = state;
}
if (current_state)
{
*current_state = state;
}
LOG_SET_CURRENT_TRAN_INDEX (thread_p, tran_index);
tdes->tran_abort_reason = TRAN_NORMAL;
}
return tran_index;
}
/*
 * logtb_is_tran_modification_disabled - tell whether modifications are disabled for the current transaction
 *
 * return: the descriptor's disable_modifications value, or the global db_Disable_modifications
 *         when the thread's transaction has no descriptor
 *
 *   thread_p(in): thread entry
 */
int
logtb_is_tran_modification_disabled (THREAD_ENTRY * thread_p)
{
  int current_index = LOG_FIND_THREAD_TRAN_INDEX (thread_p);
  LOG_TDES *current_tdes = LOG_FIND_TDES (current_index);

  if (current_tdes != NULL)
    {
      return current_tdes->disable_modifications;
    }

  return db_Disable_modifications;
}
/*
 * logtb_rv_find_allocate_tran_index - find/alloc a transaction during the recovery
 *                                     analysis process
 *
 * return: The transaction descriptor, or NULL when no index could be assigned
 *
 *   thread_p(in): thread entry
 *   trid(in): The desired transaction identifier (must not be NULL_TRANID)
 *   log_lsa(in): Log address where the transaction was seen in the log
 *
 * Note: Find or allocate the transaction descriptor for the given
 *       transaction identifier. If the descriptor was allocated, it is
 *       assumed that this is the first time the transaction is seen.
 *       Thus, the head of the transaction in the log is located at the
 *       given location and the transaction is registered as
 *       TRAN_UNACTIVE_UNILATERALLY_ABORTED.
 *       System worker transaction identifiers are delegated to
 *       log_system_tdes::rv_get_or_alloc_tdes.
 *       This function should be called only by the recovery process
 *       (the analysis phase).
 */
LOG_TDES *
logtb_rv_find_allocate_tran_index (THREAD_ENTRY * thread_p, TRANID trid, const LOG_LSA * log_lsa)
{
  LOG_TDES *found_tdes;		/* Transaction descriptor */
  int index;

  assert (trid != NULL_TRANID);

  if (logtb_is_system_worker_tranid (trid))
    {
      // *INDENT-OFF*
      return log_system_tdes::rv_get_or_alloc_tdes (trid, *log_lsa);
      // *INDENT-ON*
    }

  index = logtb_find_tran_index (thread_p, trid);
  if (index != NULL_TRAN_INDEX)
    {
      /* already known: just hand back its descriptor */
      found_tdes = LOG_FIND_TDES (index);
      return found_tdes;
    }

  /*
   * First time the transaction is seen. Assign a new index to describe it and
   * assume that the transaction was active at the time of the crash, and thus
   * it will be unilaterally aborted
   */
  index =
    logtb_allocate_tran_index (thread_p, trid, TRAN_UNACTIVE_UNILATERALLY_ABORTED, NULL, NULL,
			       TRAN_LOCK_INFINITE_WAIT, TRAN_SERIALIZABLE);
  found_tdes = LOG_FIND_TDES (index);
  if (index == NULL_TRAN_INDEX || found_tdes == NULL)
    {
      /*
       * Unable to assign a transaction index. The recovery process
       * cannot continue
       */
      logpb_fatal_error (thread_p, true, ARG_FILE_LINE, "log_recovery_find_or_alloc");
      return NULL;
    }

  LSA_COPY (&found_tdes->head_lsa, log_lsa);

  return found_tdes;
}
/*
 * logtb_rv_assign_mvccid_for_undo_recovery () - Store an MVCCID in the current transaction
 *                                               descriptor for transactions that need to
 *                                               undo at recovery.
 *
 * return        : Void.
 * thread_p (in) : Thread entry.
 * mvccid (in)   : MVCCID to assign; asserted to be valid.
 */
void
logtb_rv_assign_mvccid_for_undo_recovery (THREAD_ENTRY * thread_p, MVCCID mvccid)
{
  LOG_TDES *current_tdes;

  current_tdes = LOG_FIND_CURRENT_TDES (thread_p);

  assert (current_tdes != NULL);
  assert (MVCCID_IS_VALID (mvccid));

  current_tdes->mvccinfo.id = mvccid;
}
/*
 * logtb_release_tran_index - return an assigned transaction index
 *
 * return: nothing
 *
 *   thread_p(in): thread entry
 *   tran_index(in): Transaction index
 *
 * Note: Return a transaction index which was used for a sequence of
 *       transactions (i.e., a client). Nothing beyond the query manager and heap
 *       clean-up calls is done for the system transaction index or for an index
 *       without a descriptor.
 *       A transaction in 2PC prepare state, or a coordinator informing its
 *       participants, keeps its index and is only marked as a loose end; any other
 *       transaction has its index freed.
 *
 *       This function must be called when a client is shutdown (i.e.,
 *       unregistered).
 */
void
logtb_release_tran_index (THREAD_ENTRY * thread_p, int tran_index)
{
  LOG_TDES *tdes;		/* Transaction descriptor */

  qmgr_clear_trans_wakeup (thread_p, tran_index, true, false);
  heap_chnguess_clear (thread_p, tran_index);

  tdes = LOG_FIND_TDES (tran_index);
  if (tran_index == LOG_SYSTEM_TRAN_INDEX || tdes == NULL)
    {
      return;
    }

  tdes->mvccinfo.reset ();

  TR_TABLE_CS_ENTER (thread_p);

  /*
   * Free the top system operation stack since the transaction entry may
   * not be freed (i.e., left as loose end distributed transaction)
   */
  if (tdes->topops.max != 0)
    {
      free_and_init (tdes->topops.stack);
      tdes->topops.max = 0;
      tdes->topops.last = -1;
    }

  if (LOG_ISTRAN_2PC_PREPARE (tdes))
    {
      /* prepared participant: keep the entry as a loose end */
      tdes->isloose_end = true;
      log_Gl.trantable.num_prepared_loose_end_indices++;
    }
  else if (LOG_ISTRAN_2PC_INFORMING_PARTICIPANTS (tdes))
    {
      /* coordinator still informing participants: keep the entry as a loose end */
      tdes->isloose_end = true;
      log_Gl.trantable.num_coord_loose_end_indices++;
    }
  else
    {
      logtb_free_tran_index (thread_p, tran_index);
      tdes->client.reset ();
      tdes->is_user_active = false;
    }

  TR_TABLE_CS_EXIT (thread_p);
}
/*
 * logtb_free_tran_index - free a transaction index
 *
 * return: nothing
 *
 *   thread_p(in): thread entry; in SERVER_MODE a NULL value is replaced by the current thread entry
 *   tran_index(in): Transaction index
 *
 * Note: Free a transaction index which was used for a sequence of
 *       transactions (i.e., a client). The request is ignored when the index is
 *       outside [0, NUM_TOTAL_TRAN_INDICES), has no descriptor, or is not assigned
 *       (trid == NULL_TRANID).
 *       The descriptor is cleared and its replication records and top operation stack
 *       are released. For every index but the system one the entry is then marked free,
 *       the assigned counter is decremented and hint_free_index is lowered if needed.
 *       When the freed index is the calling thread's current index, the thread is switched
 *       to LOG_SYSTEM_TRAN_INDEX before restart completes and to NULL_TRAN_INDEX afterwards.
 *
 *       This function is only called by the log manager when the log
 *       latch has already been acquired. (See logtb_release_tran_index
 *       for other cases).
 */
void
logtb_free_tran_index (THREAD_ENTRY * thread_p, int tran_index)
{
  LOG_TDES *tdes;		/* Transaction descriptor */
  int log_tran_index;

#if defined(SERVER_MODE)
  if (thread_p == NULL)
    {
      thread_p = thread_get_thread_entry_info ();
    }
#endif /* SERVER_MODE */

  log_tran_index = LOG_FIND_THREAD_TRAN_INDEX (thread_p);
  tdes = LOG_FIND_TDES (tran_index);

  /* valid indices are 0 .. NUM_TOTAL_TRAN_INDICES - 1, so the upper bound itself must be rejected too */
  if (tran_index < 0 || tran_index >= NUM_TOTAL_TRAN_INDICES || tdes == NULL || tdes->trid == NULL_TRANID)
    {
#if defined(CUBRID_DEBUG)
      er_log_debug (ARG_FILE_LINE, "log_free_tran_index: Unknown index = %d. Operation is ignored", tran_index);
#endif /* CUBRID_DEBUG */
      return;
    }

  logtb_clear_tdes (thread_p, tdes);

  if (tdes->repl_records)
    {
      free_and_init (tdes->repl_records);
    }
  tdes->num_repl_records = 0;

  if (tdes->topops.max != 0)
    {
      free_and_init (tdes->topops.stack);
      tdes->topops.max = 0;
      tdes->topops.last = -1;
    }

  if (tran_index != LOG_SYSTEM_TRAN_INDEX)
    {
      /* mark the entry as free */
      tdes->trid = NULL_TRANID;
      tdes->client_id = -1;

      TR_TABLE_CS_ENTER (thread_p);
      logtb_decrement_number_of_assigned_tran_indices ();
      if (log_Gl.trantable.hint_free_index > tran_index)
	{
	  /* remember the lowest known free slot for the next allocation */
	  log_Gl.trantable.hint_free_index = tran_index;
	}
      TR_TABLE_CS_EXIT (thread_p);

      if (log_tran_index == tran_index)
	{
	  /* the calling thread was running under the freed index: detach it */
	  if (!LOG_ISRESTARTED ())
	    {
	      log_tran_index = LOG_SYSTEM_TRAN_INDEX;
	    }
	  else
	    {
	      log_tran_index = NULL_TRAN_INDEX;
	    }
	  LOG_SET_CURRENT_TRAN_INDEX (thread_p, log_tran_index);
	}
    }
}
/*
 * logtb_free_tran_index_with_undo_lsa - free the transaction indices whose next undo LSA matches
 *
 * return: nothing
 *
 *   thread_p(in): thread entry
 *   undo_lsa(in): Undo log sequence address; nothing is freed when it is NULL or a null LSA
 *
 * Note: The transaction table is scanned sequentially under the transaction
 *       table critical section. Every assigned, non-system descriptor in state
 *       TRAN_UNACTIVE_UNILATERALLY_ABORTED whose undo_nxlsa equals undo_lsa
 *       has its index freed. The sequential scan is acceptable because this
 *       is only expected to be called after a system error.
 */
void
logtb_free_tran_index_with_undo_lsa (THREAD_ENTRY * thread_p, const LOG_LSA * undo_lsa)
{
  int idx;
  LOG_TDES *candidate;

  TR_TABLE_CS_ENTER (thread_p);

  if (undo_lsa != NULL && !LSA_ISNULL (undo_lsa))
    {
      for (idx = 0; idx < NUM_TOTAL_TRAN_INDICES; idx++)
	{
	  if (idx == LOG_SYSTEM_TRAN_INDEX)
	    {
	      continue;
	    }

	  candidate = log_Gl.trantable.all_tdes[idx];
	  if (candidate == NULL || candidate->trid == NULL_TRANID)
	    {
	      continue;
	    }
	  if (candidate->state != TRAN_UNACTIVE_UNILATERALLY_ABORTED)
	    {
	      continue;
	    }

	  if (LSA_EQ (undo_lsa, &candidate->undo_nxlsa))
	    {
	      logtb_free_tran_index (thread_p, idx);
	    }
	}
    }

  TR_TABLE_CS_EXIT (thread_p);
}
/*
* logtb_dump_tdes -
*
* return: nothing
*
* tdes(in):
*
* Note:
*/
static void
logtb_dump_tdes (FILE * out_fp, LOG_TDES * tdes)
{
fprintf (out_fp,
"Tran_index = %2d, Trid = %d,\n State = %s,\n Isolation = %s,\n"
" Wait_msecs = %d, isloose_end = %d,\n Head_lsa = %lld|%d, Tail_lsa = %lld|%d,"
" Postpone_lsa = %lld|%d,\n SaveLSA = %lld|%d, UndoNextLSA = %lld|%d,\n"
" Client_User: (Type = %d, User = %s, Program = %s, Login = %s, Host = %s, Pid = %d)\n",
tdes->tran_index, tdes->trid, log_state_string (tdes->state), log_isolation_string (tdes->isolation),
tdes->wait_msecs, tdes->isloose_end, (long long int) tdes->head_lsa.pageid, (int) tdes->head_lsa.offset,
(long long int) tdes->tail_lsa.pageid, (int) tdes->tail_lsa.offset, (long long int) tdes->posp_nxlsa.pageid,
(int) tdes->posp_nxlsa.offset, (long long int) tdes->savept_lsa.pageid, (int) tdes->savept_lsa.offset,
(long long int) tdes->undo_nxlsa.pageid, (int) tdes->undo_nxlsa.offset, tdes->client.client_type,
tdes->client.get_db_user (), tdes->client.get_program_name (), tdes->client.get_login_name (),
tdes->client.get_host_name (), tdes->client.process_id);
if (tdes->topops.max != 0 && tdes->topops.last >= 0)
{
logtb_dump_top_operations (out_fp, &tdes->topops);
}
if (tdes->gtrid != LOG_2PC_NULL_GTRID || tdes->coord != NULL)
{
logtb_dump_tdes_distribute_transaction (out_fp, tdes->gtrid, tdes->coord);
}
}
/*
 * logtb_dump_top_operations - print the active top system operations of a transaction
 *
 * return: nothing
 *
 *   out_fp(in): output stream
 *   topops_p(in): top system operation stack; entries last..0 are printed
 *
 * Note: Entries are printed from the most recently pushed one down to the
 *       first one.
 */
static void
logtb_dump_top_operations (FILE * out_fp, LOG_TOPOPS_STACK * topops_p)
{
  int depth;

  fprintf (out_fp, " Active top system operations for tran:\n");

  depth = topops_p->last;
  while (depth >= 0)
    {
      fprintf (out_fp, " Head = %lld|%d, Posp_Head = %lld|%d\n",
	       LSA_AS_ARGS (&topops_p->stack[depth].lastparent_lsa), LSA_AS_ARGS (&topops_p->stack[depth].posp_lsa));
      depth--;
    }
}
/*
 * logtb_dump_tdes_distribute_transaction - print the distributed (2PC) part of a transaction
 *
 * return: nothing
 *
 *   out_fp(in): output stream
 *   global_tran_id(in): global transaction identifier
 *   coord(in): coordinator information, or NULL when this site is only a participant
 *
 * Note: For a participant only the site role and global identifier are
 *       printed. For a coordinator the participant list is printed too, each
 *       participant block being read as a DBLINK_CONN_INFO.
 */
static void
logtb_dump_tdes_distribute_transaction (FILE * out_fp, int global_tran_id, LOG_2PC_COORDINATOR * coord)
{
  int i;
  DBLINK_CONN_INFO *dblink;

  if (coord == NULL)
    {
      fprintf (out_fp, " PARTICIPANT SITE");
      fprintf (out_fp, " of global tranid = %d\n", global_tran_id);
      return;
    }

  fprintf (out_fp, " COORDINATOR SITE(or NESTED PARTICIPANT SITE)");
  fprintf (out_fp, " of global tranid = %d\n", global_tran_id);

  fprintf (out_fp, " Num_participants = %d, Partids = ", coord->num_particps);
  for (i = 0; i < coord->num_particps; i++)
    {
      /* participant identifiers are stored back to back, particp_id_length bytes each */
      dblink = (DBLINK_CONN_INFO *) ((char *) coord->block_particps_ids + i * coord->particp_id_length);

      if (i > 0)
	{
	  fprintf (out_fp, ", [handle = %d, url = %s, user = %s]", dblink->conn_handle, dblink->conn_url,
		   dblink->user_name);
	}
      else
	{
	  fprintf (out_fp, " [handle = %d, url = %s, user = %s]", dblink->conn_handle, dblink->conn_url,
		   dblink->user_name);
	}
    }
  fprintf (out_fp, "\n");

#ifdef LOG_2PC_ACK_RECV_REQUIRED
  if (coord->ack_received)
    {
      fprintf (out_fp, " Acknowledgement vector =");
      for (i = 0; i < coord->num_particps; i++)
	{
	  if (i > 0)
	    {
	      fprintf (out_fp, ", %d", coord->ack_received[i]);
	    }
	  else
	    {
	      fprintf (out_fp, " %d", coord->ack_received[i]);
	    }
	}
    }
#endif
  fprintf (out_fp, "\n");
}
/*
 * xlogtb_dump_trantable - dump the transaction table
 *
 * return: nothing
 *
 *   thread_p(in): thread entry
 *   out_fp(in): output stream
 *
 * Note: Dump the transaction state table. Every index is listed; unassigned
 *       ones are reported as free. The table is read under the transaction
 *       table critical section in read mode.
 *       This function is used for debugging purposes.
 */
void
xlogtb_dump_trantable (THREAD_ENTRY * thread_p, FILE * out_fp)
{
  int idx;
  LOG_TDES *tdes;

  fprintf (out_fp, "\n ** DUMPING TABLE OF ACTIVE TRANSACTIONS **\n");

  TR_TABLE_CS_ENTER_READ_MODE (thread_p);
  for (idx = 0; idx < NUM_TOTAL_TRAN_INDICES; idx++)
    {
      tdes = log_Gl.trantable.all_tdes[idx];
      if (tdes != NULL && tdes->trid != NULL_TRANID)
	{
	  logtb_dump_tdes (out_fp, tdes);
	  continue;
	}

      fprintf (out_fp, "Tran_index = %2d... Free transaction index\n", idx);
    }
  TR_TABLE_CS_EXIT (thread_p);

  fprintf (out_fp, "\n");
}
/*
 * logtb_free_tran_mvcc_info - release the MVCC resources of a transaction descriptor
 *
 * return: nothing
 *
 *   tdes(in/out): Transaction descriptor
 *
 * Note: Finalizes the snapshot's active MVCCID set and clears the list of
 *       sub-transaction identifiers.
 */
static void
logtb_free_tran_mvcc_info (LOG_TDES * tdes)
{
  tdes->mvccinfo.snapshot.m_active_mvccs.finalize ();
  tdes->mvccinfo.sub_ids.clear ();
}
/*
 * logtb_clear_tdes - clear the transaction descriptor
 *
 * return: nothing
 *
 * thread_p(in): thread entry (used for the critical section and the private heap)
 * tdes(in/out): Transaction descriptor
 *
 * Note: Brings the per-transaction part of the descriptor back to the state
 * of a transaction that has done no work: state TRAN_ACTIVE, null LSAs,
 * no 2PC information, no pending interrupt, no replication record data,
 * no bind history. The identity of the slot is NOT touched here: trid,
 * tran_index, client, isolation and wait_msecs keep their values. The
 * topops stack and the repl_records array are only emptied, not freed.
 */
void
logtb_clear_tdes (THREAD_ENTRY * thread_p, LOG_TDES * tdes)
{
int i, j;
DB_VALUE *dbval;
HL_HEAPID save_heap_id;
tdes->isloose_end = false;
tdes->state = TRAN_ACTIVE;
/* forget every log position recorded for the previous transaction */
LSA_SET_NULL (&tdes->head_lsa);
LSA_SET_NULL (&tdes->tail_lsa);
LSA_SET_NULL (&tdes->undo_nxlsa);
LSA_SET_NULL (&tdes->posp_nxlsa);
LSA_SET_NULL (&tdes->savept_lsa);
LSA_SET_NULL (&tdes->topop_lsa);
LSA_SET_NULL (&tdes->tail_topresult_lsa);
LSA_SET_NULL (&tdes->commit_abort_lsa);
/* empty the top system operation stack; its memory (topops.stack / topops.max) is kept */
tdes->topops.last = -1;
/* drop distributed (2PC) transaction information */
tdes->gtrid = LOG_2PC_NULL_GTRID;
tdes->gtrinfo.info_length = 0;
if (tdes->gtrinfo.info_data != NULL)
{
free_and_init (tdes->gtrinfo.info_data);
}
if (tdes->coord != NULL)
{
log_2pc_free_coord_info (tdes);
}
tdes->m_multiupd_stats.clear ();
/* a pending interrupt is cancelled and removed from the global interrupt counter */
if (tdes->interrupt == (int) true)
{
tdes->interrupt = false;
#if defined (HAVE_ATOMIC_BUILTINS)
ATOMIC_INC_32 (&log_Gl.trantable.num_interrupts, -1);
#else
/* no atomics available: protect the counter with the transaction table critical section */
TR_TABLE_CS_ENTER (thread_p);
log_Gl.trantable.num_interrupts--;
TR_TABLE_CS_EXIT (thread_p);
#endif
}
tdes->m_modified_classes.clear ();
/* release the data of the replication records in use; the repl_records array itself is not freed here */
for (i = 0; i < tdes->cur_repl_record; i++)
{
if (tdes->repl_records[i].repl_data)
{
free_and_init (tdes->repl_records[i].repl_data);
}
}
/* the bind history is cleared and freed with the private heap switched to 0; the previous heap is restored
 * right after the loop */
save_heap_id = db_change_private_heap (thread_p, 0);
for (i = 0; i < tdes->num_exec_queries && i < MAX_NUM_EXEC_QUERY_HISTORY; i++)
{
if (tdes->bind_history[i].vals == NULL)
{
continue;
}
dbval = tdes->bind_history[i].vals;
for (j = 0; j < tdes->bind_history[i].size; j++)
{
db_value_clear (dbval);
dbval++;
}
db_private_free_and_init (thread_p, tdes->bind_history[i].vals);
tdes->bind_history[i].size = 0;
}
(void) db_change_private_heap (thread_p, save_heap_id);
/* replication bookkeeping back to "no records" */
tdes->cur_repl_record = 0;
tdes->append_repl_recidx = -1;
tdes->fl_mark_repl_recidx = -1;
LSA_SET_NULL (&tdes->repl_insert_lsa);
LSA_SET_NULL (&tdes->repl_update_lsa);
tdes->first_save_entry = NULL;
/* query / transaction monitoring information */
tdes->query_timeout = 0;
tdes->query_start_time = 0;
tdes->tran_start_time = 0;
XASL_ID_SET_NULL (&tdes->xasl_id);
tdes->waiting_for_res = NULL;
tdes->tran_abort_reason = TRAN_NORMAL;
tdes->num_exec_queries = 0;
tdes->suppress_replication = 0;
tdes->m_log_postpone_cache.reset ();
tdes->has_supplemental_log = false;
if (tdes->ddl_sql_user_text != NULL)
{
free_and_init (tdes->ddl_sql_user_text);
}
logtb_tran_clear_update_stats (&tdes->log_upd_stats);
/* the MVCCID is expected to be already null at this point; it is not reset by this function */
assert (tdes->mvccinfo.id == MVCCID_NULL);
/* client types matched by BOOT_WRITE_ON_STANDY_CLIENT_TYPE are never blocked from modifying; every other
 * client follows the global db_Disable_modifications setting */
if (BOOT_WRITE_ON_STANDY_CLIENT_TYPE (tdes->client.client_type))
{
tdes->disable_modifications = 0;
}
else
{
tdes->disable_modifications = db_Disable_modifications;
}
tdes->has_deadlock_priority = false;
tdes->num_log_records_written = 0;
/* recovery-only positions */
LSA_SET_NULL (&tdes->rcv.tran_start_postpone_lsa);
LSA_SET_NULL (&tdes->rcv.sysop_start_postpone_lsa);
LSA_SET_NULL (&tdes->rcv.atomic_sysop_start_lsa);
LSA_SET_NULL (&tdes->rcv.analysis_last_aborted_sysop_lsa);
LSA_SET_NULL (&tdes->rcv.analysis_last_aborted_sysop_start_lsa);
}
/*
 * logtb_initialize_tdes - initialize the transaction descriptor
 *
 * return: nothing
 *
 * tdes(in/out): Transaction descriptor
 * tran_index(in): Transaction index stored in the descriptor
 *
 * Note: Sets every field to its "unassigned" value (trid is NULL_TRANID),
 * initializes the top-operation recursive mutex and creates the two
 * per-transaction hash tables used for update statistics. Pointers are
 * overwritten without being freed, so this is meant for a descriptor
 * that does not own any resource yet.
 */
void
logtb_initialize_tdes (LOG_TDES * tdes, int tran_index)
{
int i, r;
tdes->tran_index = tran_index;
/* NULL_TRANID marks the index as not assigned */
tdes->trid = NULL_TRANID;
tdes->isloose_end = false;
tdes->coord = NULL;
tdes->client_id = -1;
tdes->gtrid = LOG_2PC_NULL_GTRID;
tdes->gtrinfo.info_length = 0;
tdes->gtrinfo.info_data = NULL;
tdes->interrupt = false;
/* defaults: wait for locks forever, serializable isolation */
tdes->wait_msecs = TRAN_LOCK_INFINITE_WAIT;
tdes->isolation = TRAN_SERIALIZABLE;
LSA_SET_NULL (&tdes->head_lsa);
LSA_SET_NULL (&tdes->tail_lsa);
LSA_SET_NULL (&tdes->undo_nxlsa);
LSA_SET_NULL (&tdes->posp_nxlsa);
LSA_SET_NULL (&tdes->savept_lsa);
LSA_SET_NULL (&tdes->topop_lsa);
LSA_SET_NULL (&tdes->tail_topresult_lsa);
LSA_SET_NULL (&tdes->commit_abort_lsa);
/* the result is only verified by the assert below */
r = rmutex_initialize (&tdes->rmutex_topop, RMUTEX_NAME_TDES_TOPOP);
assert (r == NO_ERROR);
/* empty top system operation stack, no memory allocated yet */
tdes->topops.stack = NULL;
tdes->topops.last = -1;
tdes->topops.max = 0;
tdes->num_unique_btrees = 0;
tdes->max_unique_btrees = 0;
tdes->m_multiupd_stats.construct ();
tdes->num_transient_classnames = 0;
/* no replication records */
tdes->num_repl_records = 0;
tdes->cur_repl_record = 0;
tdes->append_repl_recidx = -1;
tdes->fl_mark_repl_recidx = -1;
tdes->repl_records = NULL;
LSA_SET_NULL (&tdes->repl_insert_lsa);
LSA_SET_NULL (&tdes->repl_update_lsa);
tdes->first_save_entry = NULL;
tdes->suppress_replication = 0;
tdes->lob_locator_root.init ();
tdes->query_timeout = 0;
tdes->query_start_time = 0;
tdes->tran_start_time = 0;
XASL_ID_SET_NULL (&tdes->xasl_id);
tdes->waiting_for_res = NULL;
tdes->disable_modifications = db_Disable_modifications;
tdes->tran_abort_reason = TRAN_NORMAL;
tdes->num_exec_queries = 0;
tdes->ddl_sql_user_text = NULL;
/* empty bind-value history for every tracked query slot */
for (i = 0; i < MAX_NUM_EXEC_QUERY_HISTORY; i++)
{
tdes->bind_history[i].size = 0;
tdes->bind_history[i].vals = NULL;
}
tdes->has_deadlock_priority = false;
tdes->num_log_records_written = 0;
tdes->mvccinfo.init ();
/* update statistics: no chunks allocated */
tdes->log_upd_stats.cos_count = 0;
tdes->log_upd_stats.cos_first_chunk = NULL;
tdes->log_upd_stats.cos_current_chunk = NULL;
/* NOTE(review): classes_cos_hash and unique_stats_hash are set to NULL here and then immediately overwritten
 * by the mht_create calls below; the results of mht_create are stored without a NULL check. */
tdes->log_upd_stats.classes_cos_hash = NULL;
tdes->log_upd_stats.stats_count = 0;
tdes->log_upd_stats.stats_first_chunk = NULL;
tdes->log_upd_stats.stats_current_chunk = NULL;
tdes->log_upd_stats.unique_stats_hash = NULL;
tdes->log_upd_stats.unique_stats_hash =
mht_create ("Tran_unique_stats", 101, logtb_tran_btid_hash_func, logtb_tran_btid_hash_cmp_func);
tdes->log_upd_stats.classes_cos_hash = mht_create ("Tran_classes_cos", 101, oid_hash, oid_compare_equals);
tdes->block_global_oldest_active_until_commit = false;
tdes->is_user_active = false;
tdes->has_supplemental_log = false;
/* recovery-only positions */
LSA_SET_NULL (&tdes->rcv.tran_start_postpone_lsa);
LSA_SET_NULL (&tdes->rcv.sysop_start_postpone_lsa);
LSA_SET_NULL (&tdes->rcv.atomic_sysop_start_lsa);
LSA_SET_NULL (&tdes->rcv.analysis_last_aborted_sysop_lsa);
LSA_SET_NULL (&tdes->rcv.analysis_last_aborted_sysop_start_lsa);
}
/*
 * logtb_finalize_tdes - release everything owned by a transaction descriptor
 *
 * return: nothing
 *
 *   thread_p(in): thread entry
 *   tdes(in/out): Transaction descriptor
 *
 * Note: The descriptor is cleared first, then its MVCC info, update
 *       statistics, top-operation mutex and top-operation stack are
 *       released, in that order.
 */
void
logtb_finalize_tdes (THREAD_ENTRY * thread_p, LOG_TDES * tdes)
{
  int rmutex_status;

  logtb_clear_tdes (thread_p, tdes);
  logtb_free_tran_mvcc_info (tdes);
  logtb_tran_free_update_stats (&tdes->log_upd_stats);

  rmutex_status = rmutex_finalize (&tdes->rmutex_topop);
  assert (rmutex_status == NO_ERROR);

  if (tdes->topops.max == 0)
    {
      /* no top system operation stack was ever allocated */
      return;
    }

  free_and_init (tdes->topops.stack);
  tdes->topops.max = 0;
  tdes->topops.last = -1;
}
/*
 * logtb_get_new_tran_id - assign a new transaction identifier
 *
 * return: the transaction identifier that was stored in tdes->trid
 *
 *   thread_p(in): thread entry
 *   tdes(in/out): Transaction descriptor; it is cleared with logtb_clear_tdes before the identifier is stored
 *
 * Note: Identifiers are taken from log_Gl.hdr.next_trid. When the counter
 *       cannot be incremented any further (DB_INT32_MAX) or holds a negative
 *       value, numbering restarts from LOG_SYSTEM_TRANID + 1. The wrap-around
 *       is decided before incrementing, so a signed integer overflow (which
 *       is undefined behaviour) is never evaluated.
 */
int
logtb_get_new_tran_id (THREAD_ENTRY * thread_p, LOG_TDES * tdes)
{
#if defined (HAVE_ATOMIC_BUILTINS)
  int trid, next_trid;

  logtb_clear_tdes (thread_p, tdes);

  do
    {
      trid = VOLATILE_ACCESS (log_Gl.hdr.next_trid, int);

      /* The first test short-circuits the second one, so trid + 1 is only computed when it cannot overflow. */
      if (trid == DB_INT32_MAX || trid + 1 < 0)
	{
	  /* the counter is exhausted (or invalid): start again from its base */
	  next_trid = LOG_SYSTEM_TRANID + 1;
	}
      else
	{
	  next_trid = trid + 1;
	}

      /* Need to check (trid < LOG_SYSTEM_TRANID + 1) for robustness. If log_Gl.hdr.next_trid was reset to 0 (see
       * log_rv_analysis_log_end), this prevents us from correctly generating trids. */
    }
  while (!ATOMIC_CAS_32 (&log_Gl.hdr.next_trid, trid, next_trid) || (trid < LOG_SYSTEM_TRANID + 1));

  assert (LOG_SYSTEM_TRANID + 1 <= trid && trid <= DB_INT32_MAX);

  tdes->trid = trid;
  return trid;
#else
  TR_TABLE_CS_ENTER (thread_p);

  logtb_clear_tdes (thread_p, tdes);

  tdes->trid = log_Gl.hdr.next_trid;
  if (tdes->trid < 0)
    {
      /* the counter holds an invalid value: restart from its base */
      tdes->trid = LOG_SYSTEM_TRANID + 1;
    }

  /* advance the counter for the next caller, wrapping to the base instead of overflowing */
  if (tdes->trid == DB_INT32_MAX)
    {
      log_Gl.hdr.next_trid = LOG_SYSTEM_TRANID + 1;
    }
  else
    {
      log_Gl.hdr.next_trid = tdes->trid + 1;
    }

  TR_TABLE_CS_EXIT (thread_p);

  return tdes->trid;
#endif
}
/*
 * logtb_find_tran_index - find index of transaction
 *
 * return: tran index, or NULL_TRAN_INDEX when no assigned descriptor has the given trid
 *
 *   thread_p(in): thread entry
 *   trid(in): Transaction identifier (must not be NULL_TRANID)
 *
 * Note: The transaction attached to the calling thread is checked first, so
 *       the common case needs no search. Otherwise the transaction table is
 *       scanned sequentially under the critical section in read mode.
 *
 *       The assumption is that the table is small and the search is mostly
 *       avoided; if that stops being true, a hash table from trid to tdes
 *       may be needed.
 */
int
logtb_find_tran_index (THREAD_ENTRY * thread_p, TRANID trid)
{
  int slot;
  int found_index;
  LOG_TDES *tdes;

  assert (trid != NULL_TRANID);

  /* fast path: the caller's own transaction */
  found_index = LOG_FIND_THREAD_TRAN_INDEX (thread_p);
  tdes = LOG_FIND_TDES (found_index);
  if (tdes != NULL && tdes->trid == trid)
    {
      return found_index;
    }

  found_index = NULL_TRAN_INDEX;

  TR_TABLE_CS_ENTER_READ_MODE (thread_p);
  for (slot = 0; slot < NUM_TOTAL_TRAN_INDICES; slot++)
    {
      tdes = log_Gl.trantable.all_tdes[slot];
      if (tdes == NULL || tdes->trid == NULL_TRANID || tdes->trid != trid)
	{
	  continue;
	}

      found_index = slot;
      break;
    }
  TR_TABLE_CS_EXIT (thread_p);

  return found_index;
}
#if defined (ENABLE_UNUSED_FUNCTION)
/*
 * logtb_find_tran_index_host_pid - find index of transaction by client host and process
 *
 * return: tran index of the first match, or NULL_TRAN_INDEX
 *
 *   thread_p(in): thread entry
 *   host_name(in): Name of host machine
 *   process_id(in): Process id of client
 *
 * Note: Sequential search of the transaction table (critical section in read
 *       mode) for an assigned descriptor whose client has the given process
 *       id and host name. The table is assumed to be small.
 */
int
logtb_find_tran_index_host_pid (THREAD_ENTRY * thread_p, const char *host_name, int process_id)
{
  int slot;
  int found_index = NULL_TRAN_INDEX;
  LOG_TDES *tdes;

  TR_TABLE_CS_ENTER_READ_MODE (thread_p);
  for (slot = 0; slot < NUM_TOTAL_TRAN_INDICES; slot++)
    {
      tdes = log_Gl.trantable.all_tdes[slot];
      if (tdes == NULL || tdes->trid == NULL_TRANID)
	{
	  continue;
	}
      if (tdes->client.process_id != process_id)
	{
	  continue;
	}
      if (strcmp (tdes->client.get_host_name (), host_name) == 0)
	{
	  found_index = slot;
	  break;
	}
    }
  TR_TABLE_CS_EXIT (thread_p);

  return found_index;
}
#endif /* ENABLE_UNUSED_FUNCTION */
/*
 * logtb_find_tranid - find TRANID of transaction index
 *
 * return: TRANID stored in the descriptor, or NULL_TRANID when the index has no descriptor
 *
 *   tran_index(in): Index of transaction
 */
TRANID
logtb_find_tranid (int tran_index)
{
  LOG_TDES *tdes = LOG_FIND_TDES (tran_index);

  if (tdes == NULL)
    {
      return NULL_TRANID;
    }
  return tdes->trid;
}
/*
 * logtb_find_current_tranid - find the transaction identifier of the calling thread
 *
 * return: TRANID (see logtb_find_tranid)
 *
 *   thread_p(in): thread entry
 */
TRANID
logtb_find_current_tranid (THREAD_ENTRY * thread_p)
{
  int current_index = LOG_FIND_THREAD_TRAN_INDEX (thread_p);

  return logtb_find_tranid (current_index);
}
#if defined (ENABLE_UNUSED_FUNCTION)
/*
 * logtb_count_clients_with_type - count the assigned transaction indices whose client has the given type
 *
 * return: number of clients
 *
 *   thread_p(in): thread entry
 *   client_type(in): client type to count
 */
int
logtb_count_clients_with_type (THREAD_ENTRY * thread_p, int client_type)
{
  LOG_TDES *tdes;
  int slot;
  int matches;

  TR_TABLE_CS_ENTER_READ_MODE (thread_p);

  matches = 0;
  for (slot = 0; slot < log_Gl.trantable.num_total_indices; slot++)
    {
      tdes = log_Gl.trantable.all_tdes[slot];
      if (tdes == NULL || tdes->trid == NULL_TRANID)
	{
	  continue;
	}
      if (tdes->client.client_type == client_type)
	{
	  matches++;
	}
    }

  TR_TABLE_CS_EXIT (thread_p);

  return matches;
}
#endif /* ENABLE_UNUSED_FUNCTION */
/*
 * logtb_count_clients - count the assigned transaction indices that belong to normal clients
 *
 * return: number of clients
 *
 *   thread_p(in): thread entry
 *
 * Note: A client is counted when BOOT_NORMAL_CLIENT_TYPE accepts its type.
 */
int
logtb_count_clients (THREAD_ENTRY * thread_p)
{
  LOG_TDES *tdes;
  int slot;
  int matches;

  TR_TABLE_CS_ENTER_READ_MODE (thread_p);

  matches = 0;
  for (slot = 0; slot < log_Gl.trantable.num_total_indices; slot++)
    {
      tdes = log_Gl.trantable.all_tdes[slot];
      if (tdes == NULL || tdes->trid == NULL_TRANID)
	{
	  continue;
	}
      if (BOOT_NORMAL_CLIENT_TYPE (tdes->client.client_type))
	{
	  matches++;
	}
    }

  TR_TABLE_CS_EXIT (thread_p);

  return matches;
}
/*
 * logtb_count_not_allowed_clients_in_maintenance_mode -
 *                        count the assigned transaction indices whose client
 *                        connection is not allowed in maintenance mode
 *
 * return: number of clients
 *
 *   thread_p(in): thread entry
 *
 * Note: A client is counted when BOOT_IS_ALLOWED_CLIENT_TYPE_IN_MT_MODE
 *       rejects the combination of its host name, the server host name
 *       (boot_Host_name) and its client type.
 */
int
logtb_count_not_allowed_clients_in_maintenance_mode (THREAD_ENTRY * thread_p)
{
  LOG_TDES *tdes;
  int slot;
  int rejected;

  TR_TABLE_CS_ENTER_READ_MODE (thread_p);

  rejected = 0;
  for (slot = 0; slot < log_Gl.trantable.num_total_indices; slot++)
    {
      tdes = log_Gl.trantable.all_tdes[slot];
      if (tdes == NULL || tdes->trid == NULL_TRANID)
	{
	  continue;
	}
      if (!BOOT_IS_ALLOWED_CLIENT_TYPE_IN_MT_MODE (tdes->client.get_host_name (), boot_Host_name,
						   tdes->client.client_type))
	{
	  rejected++;
	}
    }

  TR_TABLE_CS_EXIT (thread_p);

  return rejected;
}
/*
 * logtb_find_client_type - find client type of transaction index
 *
 * return: client type, or -1 when the index has no descriptor or is not assigned
 *
 *   tran_index(in): Index of transaction
 */
int
logtb_find_client_type (int tran_index)
{
  LOG_TDES *tdes = LOG_FIND_TDES (tran_index);

  if (tdes == NULL || tdes->trid == NULL_TRANID)
    {
      return -1;
    }
  return tdes->client.client_type;
}
/*
 * logtb_find_client_name - find client (database user) name of transaction index
 *
 * return: client name, or NULL when the index has no descriptor or is not assigned
 *
 *   tran_index(in): Index of transaction
 */
const char *
logtb_find_client_name (int tran_index)
{
  LOG_TDES *tdes = LOG_FIND_TDES (tran_index);

  if (tdes == NULL || tdes->trid == NULL_TRANID)
    {
      return NULL;
    }
  return tdes->client.get_db_user ();
}
/*
 * logtb_set_user_name - set client (database user) name of transaction index
 *
 * return: nothing
 *
 *   tran_index(in): Index of transaction
 *   user_name(in): new user name; NULL is stored as clientids::UNKNOWN_ID
 *
 * Note: Nothing is changed when the index has no descriptor or is not
 *       assigned.
 */
void
logtb_set_user_name (int tran_index, const char *user_name)
{
  LOG_TDES *tdes = LOG_FIND_TDES (tran_index);
  const char *name_to_store;

  if (tdes == NULL || tdes->trid == NULL_TRANID)
    {
      return;
    }

  // *INDENT-OFF*
  name_to_store = (user_name) ? user_name : clientids::UNKNOWN_ID;
  tdes->client.set_user (name_to_store);
  // *INDENT-ON*
}
/*
 * logtb_set_current_user_name - set client (database user) name of the calling thread's transaction
 *
 * return: nothing
 *
 *   thread_p(in): thread entry
 *   user_name(in): new user name (see logtb_set_user_name)
 */
void
logtb_set_current_user_name (THREAD_ENTRY * thread_p, const char *user_name)
{
  int current_index = LOG_FIND_THREAD_TRAN_INDEX (thread_p);

  logtb_set_user_name (current_index, user_name);
}
/*
 * logtb_set_current_user_active() - set the user-active flag of the calling thread's transaction
 *
 * return: nothing
 *
 *   thread_p(in): thread entry
 *   is_user_active(in): new value of the flag
 *
 * Note: Nothing is changed when the thread's transaction index has no
 *       descriptor.
 */
void
logtb_set_current_user_active (THREAD_ENTRY * thread_p, bool is_user_active)
{
  int current_index = LOG_FIND_THREAD_TRAN_INDEX (thread_p);
  LOG_TDES *tdes = LOG_FIND_TDES (current_index);

  if (tdes == NULL)
    {
      return;
    }
  tdes->is_user_active = is_user_active;
}
/*
 * logtb_find_client_hostname - find client hostname of transaction index
 *
 * return: client hostname, or NULL when the index has no descriptor or is not assigned
 *
 *   tran_index(in): Index of transaction
 */
const char *
logtb_find_client_hostname (int tran_index)
{
  LOG_TDES *tdes = LOG_FIND_TDES (tran_index);

  if (tdes == NULL || tdes->trid == NULL_TRANID)
    {
      return NULL;
    }
  return tdes->client.get_host_name ();
}
/*
 * logtb_find_client_name_host_pid - find client identifiers (program name, user name,
 *                                   host name, process id) of a transaction index
 *
 * return: NO_ERROR if all OK, ER_FAILED when the index has no descriptor or is not assigned
 *
 *   tran_index(in): Index of transaction
 *   client_prog_name(out): Name of the client program
 *   client_user_name(out): Name of the client user
 *   client_host_name(out): Name of the client host
 *   client_pid(out): Identifier of the process of the host where the client
 *                    transaction runs
 *
 * Note: On failure the three names are set to clientids::UNKNOWN_ID and the
 *       process id to -1, so the outputs are always usable.
 *
 *       The returned pointers are valid until the client is unregistered.
 */
int
logtb_find_client_name_host_pid (int tran_index, const char **client_prog_name, const char **client_user_name,
				 const char **client_host_name, int *client_pid)
{
  LOG_TDES *tdes = LOG_FIND_TDES (tran_index);

  if (tdes != NULL && tdes->trid != NULL_TRANID)
    {
      *client_prog_name = tdes->client.get_program_name ();
      *client_user_name = tdes->client.get_db_user ();
      *client_host_name = tdes->client.get_host_name ();
      *client_pid = tdes->client.process_id;
      return NO_ERROR;
    }

  // *INDENT-OFF*
  *client_prog_name = clientids::UNKNOWN_ID;
  *client_user_name = clientids::UNKNOWN_ID;
  *client_host_name = clientids::UNKNOWN_ID;
  // *INDENT-ON*
  *client_pid = -1;
  return ER_FAILED;
}
#if defined (SERVER_MODE)
/*
 * logtb_find_client_tran_name_host_pid - same as logtb_find_client_name_host_pid, but for the current
 *                                        transaction index, which is also returned
 *
 * return: result of logtb_find_client_name_host_pid
 *
 *   tran_index(out): current transaction index (logtb_get_current_tran_index)
 *   client_prog_name(out): Name of the client program
 *   client_user_name(out): Name of the client user
 *   client_host_name(out): Name of the client host
 *   client_pid(out): Process identifier of the client
 */
int
logtb_find_client_tran_name_host_pid (int &tran_index, const char **client_prog_name, const char **client_user_name,
				      const char **client_host_name, int *client_pid)
{
  int status;

  tran_index = logtb_get_current_tran_index ();
  status =
    logtb_find_client_name_host_pid (tran_index, client_prog_name, client_user_name, client_host_name, client_pid);

  return status;
}
#endif // SERVER_MODE
/*
 * logtb_get_client_ids - copy the client identifiers of a transaction index
 *
 * return: NO_ERROR if all OK, ER_FAILED when the index has no descriptor or is not assigned
 *
 *   tran_index(in): Index of transaction
 *   client_info(out): pointer to CLIENTIDS structure; left untouched on failure
 */
int
logtb_get_client_ids (int tran_index, CLIENTIDS * client_info)
{
  LOG_TDES *tdes = LOG_FIND_TDES (tran_index);

  if (tdes != NULL && tdes->trid != NULL_TRANID)
    {
      *client_info = tdes->client;
      return NO_ERROR;
    }

  return ER_FAILED;
}
/*
 * xlogtb_get_pack_tran_table - return transaction info stored on transaction table
 *
 * return: NO_ERROR if all OK, ER status otherwise
 *
 *   thread_p(in): thread entry
 *   buffer_p(out): returned buffer pointer; only set on success
 *   size_p(out): returned buffer size (number of bytes actually packed); only set on success
 *   include_query_exec_info(in): when non-zero (SERVER_MODE only) query/transaction times, lock wait
 *                                information, statement text, SQL id and XASL id are packed as well
 *
 * Note: This is a support function which is used mainly for the
 *       killtran utility. It returns a variety of client information
 *       of transactions. This will be displayed by killtran so that
 *       the user can select which transaction id needs to be aborted.
 *
 *       The buffer is allocated using malloc and must be freed by the
 *       caller.
 *
 *       The table is read in two passes under the critical section in read
 *       mode: the first pass computes the buffer size (and gathers the
 *       optional query information), the second one packs the data.
 */
int
xlogtb_get_pack_tran_table (THREAD_ENTRY * thread_p, char **buffer_p, int *size_p, int include_query_exec_info)
{
  int error_code = NO_ERROR;
  int num_clients = 0, num_clients_packed = 0;
  int i;
  int size;
  char *buffer, *ptr;
  LOG_TDES *tdes;		/* Transaction descriptor */
  int num_total_indices;
#if defined(SERVER_MODE)
  INT64 current_msec = 0;
  TRAN_QUERY_EXEC_INFO *query_exec_info = NULL;
  XASL_CACHE_ENTRY *ent = NULL;
#endif

  /* Note, we'll be in a critical section while we gather the data but the section ends as soon as we return the data.
   * This means that the transaction table can change after the information is used. */
  TR_TABLE_CS_ENTER_READ_MODE (thread_p);

  num_total_indices = NUM_TOTAL_TRAN_INDICES;

#if defined(SERVER_MODE)
  if (include_query_exec_info)
    {
      /* one zero-initialized slot per transaction index, addressed by index */
      query_exec_info = (TRAN_QUERY_EXEC_INFO *) calloc (num_total_indices, sizeof (TRAN_QUERY_EXEC_INFO));
      if (query_exec_info == NULL)
	{
	  error_code = ER_OUT_OF_VIRTUAL_MEMORY;
	  goto error;
	}
      current_msec = log_get_clock_msec ();
    }
#endif

  size = OR_INT_SIZE;		/* Number of client transactions */

  /* Find size of needed buffer */
  for (i = 0; i < num_total_indices; i++)
    {
      tdes = log_Gl.trantable.all_tdes[i];
      if (tdes == NULL || tdes->trid == NULL_TRANID || tdes->tran_index == LOG_SYSTEM_TRAN_INDEX)
	{
	  /* The index is not assigned or is system transaction (no-client) */
	  continue;
	}

      /* each string is reserved at its maximum length, so the packed size can only be smaller */
      size += (3 * OR_INT_SIZE	/* tran index + tran state + process id */
	       + OR_INT_SIZE + DB_ALIGN (LOG_USERNAME_MAX, INT_ALIGNMENT)
	       + OR_INT_SIZE + DB_ALIGN (PATH_MAX, INT_ALIGNMENT)
	       + OR_INT_SIZE + DB_ALIGN (L_cuserid, INT_ALIGNMENT)
	       + OR_INT_SIZE + DB_ALIGN (CUB_MAXHOSTNAMELEN, INT_ALIGNMENT));

#if defined(SERVER_MODE)
      if (include_query_exec_info)
	{
	  if (tdes->query_start_time > 0)
	    {
	      query_exec_info[i].query_time = (float) (current_msec - tdes->query_start_time) / 1000.0f;
	    }
	  if (tdes->tran_start_time > 0)
	    {
	      query_exec_info[i].tran_time = (float) (current_msec - tdes->tran_start_time) / 1000.0f;
	    }

	  lock_get_lock_holder_tran_index (thread_p, &query_exec_info[i].wait_for_tran_index_string, tdes->tran_index,
					   tdes->waiting_for_res);

	  if (!XASL_ID_IS_NULL (&tdes->xasl_id))
	    {
	      /* retrieve query statement in the xasl_cache entry */
	      error_code = xcache_find_sha1 (thread_p, &tdes->xasl_id.sha1, XASL_CACHE_SEARCH_GENERIC, &ent, NULL);
	      if (error_code != NO_ERROR)
		{
		  ASSERT_ERROR ();
		  goto error;
		}

	      /* entry can be NULL, if xasl cache entry is deleted */
	      if (ent != NULL)
		{
		  if (ent->sql_info.sql_hash_text != NULL)
		    {
		      char *sql = ent->sql_info.sql_hash_text;

		      /* Keep the failure code: leaving with error_code == NO_ERROR would report success to the
		       * caller although *buffer_p and *size_p were never set. */
		      error_code = qmgr_get_sql_id (thread_p, &query_exec_info[i].sql_id, sql, (int) strlen (sql));
		      if (error_code != NO_ERROR)
			{
			  goto error;
			}

		      /* the user text is preferred for display when available */
		      if (ent->sql_info.sql_user_text != NULL)
			{
			  sql = ent->sql_info.sql_user_text;
			}

		      /* copy query string */
		      query_exec_info[i].query_stmt = strdup (sql);
		      if (query_exec_info[i].query_stmt == NULL)
			{
			  error_code = ER_OUT_OF_VIRTUAL_MEMORY;
			  goto error;
			}
		    }
		  xcache_unfix (thread_p, ent);
		  ent = NULL;
		}

	      /* structure copy */
	      XASL_ID_COPY (&query_exec_info[i].xasl_id, &tdes->xasl_id);
	    }
	  else
	    {
	      XASL_ID_SET_NULL (&query_exec_info[i].xasl_id);
	      if (tdes->query_start_time > 0 && tdes->ddl_sql_user_text)
		{
		  query_exec_info[i].query_stmt = strdup (tdes->ddl_sql_user_text);
		}
	      else
		{
		  query_exec_info[i].query_stmt = NULL;
		}
	    }

	  size += (2 * OR_FLOAT_SIZE	/* query time + tran time */
		   + or_packed_string_length (query_exec_info[i].wait_for_tran_index_string, NULL)
		   + or_packed_string_length (query_exec_info[i].query_stmt, NULL)
		   + or_packed_string_length (query_exec_info[i].sql_id, NULL) + OR_XASL_ID_SIZE);
	}
#endif
      num_clients++;
    }

  /* Now allocate the area and pack the information */
  buffer = (char *) malloc (size);
  if (buffer == NULL)
    {
      er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_OUT_OF_VIRTUAL_MEMORY, 1, (size_t) size);
      error_code = ER_OUT_OF_VIRTUAL_MEMORY;
      goto error;
    }

  ptr = buffer;
  ptr = or_pack_int (ptr, num_clients);

  /* Pack the information of every client transaction counted above */
  for (i = 0; i < num_total_indices; i++)
    {
      tdes = log_Gl.trantable.all_tdes[i];
      if (tdes == NULL || tdes->trid == NULL_TRANID || tdes->tran_index == LOG_SYSTEM_TRAN_INDEX)
	{
	  /* The index is not assigned or is system transaction (no-client) */
	  continue;
	}

      ptr = or_pack_int (ptr, tdes->tran_index);
      ptr = or_pack_int (ptr, tdes->state);
      ptr = or_pack_int (ptr, tdes->client.process_id);
      ptr = or_pack_string_with_length (ptr, tdes->client.get_db_user (), tdes->client.db_user.size ());
      ptr = or_pack_string_with_length (ptr, tdes->client.get_program_name (), tdes->client.program_name.size ());
      ptr = or_pack_string_with_length (ptr, tdes->client.get_login_name (), tdes->client.login_name.size ());
      ptr = or_pack_string_with_length (ptr, tdes->client.get_host_name (), tdes->client.host_name.size ());

#if defined(SERVER_MODE)
      if (include_query_exec_info)
	{
	  ptr = or_pack_float (ptr, query_exec_info[i].query_time);
	  ptr = or_pack_float (ptr, query_exec_info[i].tran_time);
	  ptr = or_pack_string (ptr, query_exec_info[i].wait_for_tran_index_string);
	  ptr = or_pack_string (ptr, query_exec_info[i].query_stmt);
	  ptr = or_pack_string (ptr, query_exec_info[i].sql_id);
	  OR_PACK_XASL_ID (ptr, &query_exec_info[i].xasl_id);
	}
#endif

      num_clients_packed++;
      assert (num_clients_packed <= num_clients);
      assert (ptr <= buffer + size);
    }

  assert (num_clients_packed == num_clients);
  assert (num_total_indices == NUM_TOTAL_TRAN_INDICES);
  assert (ptr <= buffer + size);

  *buffer_p = buffer;
  *size_p = CAST_BUFLEN (ptr - buffer);

  /* the success path falls through: the cleanup below is common to both outcomes */
error:
  TR_TABLE_CS_EXIT (thread_p);

#if defined(SERVER_MODE)
  if (query_exec_info != NULL)
    {
      for (i = 0; i < num_total_indices; i++)
	{
	  if (query_exec_info[i].wait_for_tran_index_string)
	    {
	      free_and_init (query_exec_info[i].wait_for_tran_index_string);
	    }
	  if (query_exec_info[i].query_stmt)
	    {
	      free_and_init (query_exec_info[i].query_stmt);
	    }
	  if (query_exec_info[i].sql_id)
	    {
	      free_and_init (query_exec_info[i].sql_id);
	    }
	}
      free_and_init (query_exec_info);
    }

  /* only still fixed when an error interrupted the first pass */
  if (ent != NULL)
    {
      xcache_unfix (thread_p, ent);
    }
#endif

  return error_code;
}
/*
 * logtb_find_current_client_type - find client type of the calling thread's transaction
 *
 * return: client type (see logtb_find_client_type)
 *
 *   thread_p(in): thread entry
 */
int
logtb_find_current_client_type (THREAD_ENTRY * thread_p)
{
  int current_index = LOG_FIND_THREAD_TRAN_INDEX (thread_p);

  return logtb_find_client_type (current_index);
}
/*
 * logtb_find_current_client_name - find client name of the calling thread's transaction
 *
 * return: client name (see logtb_find_client_name)
 *
 *   thread_p(in): thread entry
 */
const char *
logtb_find_current_client_name (THREAD_ENTRY * thread_p)
{
  int current_index = LOG_FIND_THREAD_TRAN_INDEX (thread_p);

  return logtb_find_client_name (current_index);
}
/*
 * logtb_find_current_client_hostname - find client hostname of the calling thread's transaction
 *
 * return: client hostname (see logtb_find_client_hostname)
 *
 *   thread_p(in): thread entry
 */
const char *
logtb_find_current_client_hostname (THREAD_ENTRY * thread_p)
{
  int current_index = LOG_FIND_THREAD_TRAN_INDEX (thread_p);

  return logtb_find_client_hostname (current_index);
}
/*
 * logtb_find_current_tran_lsa - find the tail log sequence address of the current transaction
 *
 * return: pointer to the tail LSA inside the descriptor, or NULL when there is no current descriptor
 *
 *   thread_p(in): thread entry
 */
LOG_LSA *
logtb_find_current_tran_lsa (THREAD_ENTRY * thread_p)
{
  LOG_TDES *tdes = LOG_FIND_CURRENT_TDES (thread_p);

  if (tdes == NULL)
    {
      return NULL;
    }
  return &tdes->tail_lsa;
}
/*
 * logtb_find_state - find the state of the transaction
 *
 * return: TRAN_STATE of the descriptor, or TRAN_UNACTIVE_UNKNOWN when the index has no descriptor
 *
 *   tran_index(in): transaction index
 */
TRAN_STATE
logtb_find_state (int tran_index)
{
  LOG_TDES *tdes = LOG_FIND_TDES (tran_index);

  if (tdes == NULL)
    {
      return TRAN_UNACTIVE_UNKNOWN;
    }
  return tdes->state;
}
/*
 * xlogtb_reset_wait_msecs - reset future waiting times
 *
 * return: The old wait_msecs, or -1 (with ER_LOG_UNKNOWN_TRANINDEX set) when the calling thread's
 *         transaction index has no descriptor
 *
 *   thread_p(in): thread entry
 *   wait_msecs(in): Wait for at least this number of milliseconds to acquire a lock
 *                   before the transaction is timed out.
 *                   A negative value (e.g., -1) means wait forever until a lock
 *                   is granted or transaction is selected as a victim of a
 *                   deadlock.
 *                   A value of zero means do not wait at all, timeout immediately
 *                   (in milliseconds)
 *
 * Note: Reset the default waiting time for the current transaction index (client).
 */
int
xlogtb_reset_wait_msecs (THREAD_ENTRY * thread_p, int wait_msecs)
{
  int current_index = LOG_FIND_THREAD_TRAN_INDEX (thread_p);
  LOG_TDES *tdes = LOG_FIND_TDES (current_index);
  int previous_wait_msecs;

  if (tdes == NULL)
    {
      er_set (ER_FATAL_ERROR_SEVERITY, ARG_FILE_LINE, ER_LOG_UNKNOWN_TRANINDEX, 1, current_index);
      return -1;
    }

  previous_wait_msecs = tdes->wait_msecs;
  tdes->wait_msecs = wait_msecs;

  return previous_wait_msecs;
}
/*
* logtb_find_wait_msecs - find waiting times for given transaction
*
* return: wait_msecs...
*
* tran_index(in): Index of transaction
*/
int
logtb_find_wait_msecs (int tran_index)
{
LOG_TDES *tdes; /* Transaction descriptor */
tdes = LOG_FIND_TDES (tran_index);
if (tdes != NULL)
{
return tdes->wait_msecs;
}
else
{
assert (false);
return 0;
}
}
/*
* logtb_find_interrupt -
*
* return :
*
* tran_index(in):
* interrupt(out):
*
*/
int
logtb_find_interrupt (int tran_index, bool * interrupt)
{
LOG_TDES *tdes;
assert (interrupt);
tdes = LOG_FIND_TDES (tran_index);
if (tdes == NULL || tdes->trid == NULL_TRANID)
{
return ER_FAILED;
}
else
{
*interrupt = tdes->interrupt ? true : false;
}
return NO_ERROR;
}
/*
* logtb_find_log_records_count - find log records count for given transaction
*
* return: num_log_records_written...
*
* tran_index(in): Index of transaction
*/
int
logtb_find_log_records_count (int tran_index)
{
LOG_TDES *tdes; /* Transaction descriptor */
tdes = LOG_FIND_TDES (tran_index);
if (tdes != NULL)
{
return tdes->num_log_records_written;
}
else
{
return 0;
}
}
/*
* xlogtb_reset_isolation - reset consistency of transaction
*
* return: error code.
*
* isolation(in): New Isolation level. One of the following:
* TRAN_SERIALIZABLE
* TRAN_REPEATABLE_READ
* TRAN_READ_COMMITTED
*
* Note:Reset the default isolation level for the current transaction index (client).
*
* Note/Warning: This function must be called when the current transaction has
* not been done any work (i.e, just after restart, commit, or
* abort), otherwise, its isolation behaviour will be undefined.
*/
int
xlogtb_reset_isolation (THREAD_ENTRY * thread_p, TRAN_ISOLATION isolation)
{
  int tran_index = LOG_FIND_THREAD_TRAN_INDEX (thread_p);
  LOG_TDES *tdes = LOG_FIND_TDES (tran_index);	/* Transaction descriptor */

  if (!IS_VALID_ISOLATION_LEVEL (isolation) || tdes == NULL)
    {
      /* Both an invalid level and a missing descriptor are reported with the same error code. */
      er_set (ER_SYNTAX_ERROR_SEVERITY, ARG_FILE_LINE, ER_MVCC_LOG_INVALID_ISOLATION_LEVEL, 0);
      return ER_MVCC_LOG_INVALID_ISOLATION_LEVEL;
    }

  /* The previous isolation level is not needed by anyone, so it is simply overwritten. */
  tdes->isolation = isolation;

  return NO_ERROR;
}
/*
* logtb_find_isolation - find the isolation level for given trans
*
* return: isolation
*
* tran_index(in):Index of transaction
*/
TRAN_ISOLATION
logtb_find_isolation (int tran_index)
{
  LOG_TDES *found = LOG_FIND_TDES (tran_index);

  if (found == NULL)
    {
      /* no descriptor at this index */
      return TRAN_UNKNOWN_ISOLATION;
    }

  return found->isolation;
}
/*
* logtb_find_current_isolation - find the isolation level for current
* transaction
*
* return: isolation...
*
* Note: Find the isolation level for the current transaction
*/
TRAN_ISOLATION
logtb_find_current_isolation (THREAD_ENTRY * thread_p)
{
  /* same lookup as logtb_find_isolation, keyed by the calling thread's transaction index */
  return logtb_find_isolation (LOG_FIND_THREAD_TRAN_INDEX (thread_p));
}
/*
* xlogtb_set_interrupt - indicate interrupt to a future caller of current
* transaction
*
* return: nothing
*
* set(in): true for set and false for clear
*
* Note: Set the interrupt flag for the current execution, so that the
* next caller obtains an interrupt.
*/
void
xlogtb_set_interrupt (THREAD_ENTRY * thread_p, int set)
{
  const int current_index = LOG_FIND_THREAD_TRAN_INDEX (thread_p);

  /* the result (whether the index maps to a live transaction) is intentionally not propagated */
  logtb_set_tran_index_interrupt (thread_p, current_index, set);
}
/*
* logtb_set_tran_index_interrupt - indicate interrupt to a future caller for an
* specific transaction index
*
* return: false is returned when the tran_index is not associated
* with a transaction
*
* tran_index(in): Transaction index
* set(in): true for set and false for clear
*
* Note:Set the interrupt flag for the execution of the given transaction,
* so that the next caller obtains an interrupt.
*/
bool
logtb_set_tran_index_interrupt (THREAD_ENTRY * thread_p, int tran_index, bool set)
{
LOG_TDES *tdes; /* Transaction descriptor */
/* The system transaction index is never flagged: return false (and assert in SERVER_MODE builds). */
if (tran_index == LOG_SYSTEM_TRAN_INDEX)
{
#if defined (SERVER_MODE)
assert (false);
#endif // SERVER_MODE
return false;
}
/* get thread by tran_index, if thread_p is NULL */
if (!thread_p)
{
thread_p = logtb_find_thread_by_tran_index (tran_index);
}
/* Nothing can be flagged while the transaction table area is not allocated. */
if (log_Gl.trantable.area != NULL)
{
tdes = LOG_FIND_TDES (tran_index);
/* Only a descriptor that currently carries a transaction id is eligible. */
if (tdes != NULL && tdes->trid != NULL_TRANID)
{
/* The flag and the global num_interrupts counter are only touched when the requested state differs from the
 * current one: the counter is incremented on set and decremented on clear. */
if (tdes->interrupt != (int) set)
{
#if defined (HAVE_ATOMIC_BUILTINS)
/* Atomic build: plain store to the flag, counter adjusted through ATOMIC_INC_32.
 * NOTE(review): the compare above and this store are not one atomic step; confirm concurrent callers for the
 * same tran_index cannot double-count. */
tdes->interrupt = (int) set;
if (set == true)
{
ATOMIC_INC_32 (&log_Gl.trantable.num_interrupts, 1);
}
else
{
ATOMIC_INC_32 (&log_Gl.trantable.num_interrupts, -1);
}
#else
/* Non-atomic build: flag and counter are both updated inside the transaction table critical section. */
TR_TABLE_CS_ENTER (thread_p);
tdes->interrupt = set;
if (set == true)
{
log_Gl.trantable.num_interrupts++;
}
else
{
log_Gl.trantable.num_interrupts--;
}
TR_TABLE_CS_EXIT (thread_p);
#endif
}
/* The notifications below run on every "set" request, even if the flag was already set. */
if (set == true)
{
pgbuf_force_to_check_for_interrupts ();
er_set (ER_NOTIFICATION_SEVERITY, ARG_FILE_LINE, ER_INTERRUPTING, 1, tran_index);
/* collect stat, if thread_p is not NULL */
if (thread_p)
{
perfmon_inc_stat (thread_p, PSTAT_TRAN_NUM_INTERRUPTS);
}
// Only TT_WORKER threads use pl_session
if (thread_p && thread_p->type == TT_WORKER)
{
/* NOTE(review): get_session () takes no thread argument; presumably it resolves the session of the calling
 * thread - confirm that is the intended target when thread_p was looked up from tran_index. */
cubpl::session * session = cubpl::get_session ();
if (session)
{
session->set_interrupt (ER_INTERRUPTED);
}
}
}
/* tran_index maps to a live transaction */
return true;
}
}
/* table not allocated, or tran_index is not associated with a transaction */
return false;
}
/*
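* logtb_is_interrupted_tdes - find if the transaction of the given descriptor must be stopped due to an interrupt
* or an expired query timeout
*
* return: true if the transaction is interrupted or its query timeout has expired, false otherwise
*
* tdes(in/out): transaction descriptor; its interrupt flag is reset when an interrupt is found and clear is true
* clear(in): true if the interrupt should be cleared
* continue_checking(out): set to true if there are more interrupts to check, false if there are none
*
* Note: continue_checking is written only when the transaction is not active or when an interrupt is cleared.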
*/
static bool
logtb_is_interrupted_tdes (THREAD_ENTRY * thread_p, LOG_TDES * tdes, bool clear, bool * continue_checking)
{
bool interrupt;
INT64 now;
#if !defined(SERVER_MODE)
struct timeval tv;
#else /* SERVER_MODE */
/* vacuum threads should not be interruptible (unless this is still recovery). */
assert (!BO_IS_SERVER_RESTARTED () || !VACUUM_IS_THREAD_VACUUM (thread_p));
#endif /* SERVER_MODE */
/* Start from the descriptor's interrupt flag; the branches below may override it. */
interrupt = tdes->interrupt;
if (!LOG_ISTRAN_ACTIVE (tdes))
{
/* A transaction that is not active is never reported as interrupted; the flag itself is left as it is.
 * continue_checking reflects whether any interrupt is still pending globally. */
interrupt = false;
if (log_Gl.trantable.num_interrupts > 0)
{
*continue_checking = true;
}
else
{
*continue_checking = false;
}
}
else if (interrupt == true)
{
/* Active and flagged. NOTE(review): when clear is false, *continue_checking is not written on this path;
 * callers presumably pre-initialize it - confirm. */
if (clear)
{
/* Consume the interrupt: reset the flag and decrement the global counter. Without atomic builtins the counter
 * update and the re-read below are done inside the transaction table critical section (the flag reset above it
 * is not). */
tdes->interrupt = false;
#if !defined (HAVE_ATOMIC_BUILTINS)
TR_TABLE_CS_ENTER (thread_p);
log_Gl.trantable.num_interrupts--;
#else
ATOMIC_INC_32 (&log_Gl.trantable.num_interrupts, -1);
#endif
if (log_Gl.trantable.num_interrupts > 0)
{
*continue_checking = true;
}
else
{
*continue_checking = false;
}
#if !defined (HAVE_ATOMIC_BUILTINS)
TR_TABLE_CS_EXIT (thread_p);
#endif
}
/* Propagate the interrupt to the PL session, if one is returned. Unlike logtb_set_tran_index_interrupt, there is
 * no thread-type check here. */
cubpl::session * session = cubpl::get_session ();
if (session)
{
session->set_interrupt (ER_INTERRUPTED);
}
}
else if (interrupt == false && tdes->query_timeout > 0)
{
/* In order to prevent performance degradation, we use log_Clock_msec set by thread_log_clock_thread instead of
 * calling gettimeofday here if the system supports atomic built-ins. */
#if defined(SERVER_MODE)
now = log_get_clock_msec ();
#else /* SERVER_MODE */
gettimeofday (&tv, NULL);
now = (tv.tv_sec * 1000LL) + (tv.tv_usec / 1000LL);
#endif /* !SERVER_MODE */
/* query_timeout is compared directly with the current time in milliseconds, i.e. it is used as a deadline.
 * An expired deadline is reported as an interrupt; *continue_checking is not written on this path. */
if (tdes->query_timeout < now)
{
er_log_debug (ARG_FILE_LINE,
"logtb_is_interrupted_tdes: timeout %lld milliseconds delayed (expected=%lld, now=%lld)",
now - tdes->query_timeout, tdes->query_timeout, now);
interrupt = true;
}
}
return interrupt;
}
/*
* logtb_is_interrupted - find if execution must be stopped due to
* an interrupt (^C)
*
* return:
*
* clear(in): true if the interrupt should be cleared.
* continue_checking(out): Set as a side effect to true if there are more
* interrupts to check or to false if there are not
* more interrupts.
*
* Note: Find if the current execution must be stopped due to an interrupt (^C). If clear is true, the interruption flag
* is cleared; This is the expected case, once someone is notified, we do not have to keep the flag on.
*
* If the transaction is not active, false is returned. For example, in the middle of an undo action, the
* transaction will not be interrupted. The recovery manager will interrupt the transaction at the end of the undo
* action... in this case the transaction will be partially aborted.
*/
bool
logtb_is_interrupted (THREAD_ENTRY * thread_p, bool clear, bool * continue_checking)
{
  if (log_Gl.trantable.area == NULL)
    {
      /* transaction table not allocated: nothing can be interrupted */
      return false;
    }

  LOG_TDES *current_tdes = LOG_FIND_TDES (LOG_FIND_THREAD_TRAN_INDEX (thread_p));

  if (current_tdes != NULL)
    {
      return logtb_is_interrupted_tdes (thread_p, current_tdes, clear, continue_checking);
    }

  /* no descriptor for the calling thread; continue_checking is left untouched */
  return false;
}
/*
* logtb_is_interrupted_tran - find if the execution of the given transaction
* must be stopped due to an interrupt (^C)
*
* return:
*
* clear(in): true if the interrupt should be cleared.
* continue_checking(out): Set as a side effect to true if there are more
* interrupts to check or to false if there are not
* more interrupts.
* tran_index(in):
*
* Note: Find if the execution of the given transaction must be stopped due to an interrupt (^C). If clear is true, the
* interruption flag is cleared; This is the expected case, once someone is notified, we do not have to keep the
* flag on.
* This function is called to see if a transaction that is waiting (e.g., suspended on a lock) on an event must
* be interrupted.
*/
bool
logtb_is_interrupted_tran (THREAD_ENTRY * thread_p, bool clear, bool * continue_checking, int tran_index)
{
  LOG_TDES *tdes;		/* Transaction descriptor */

  /* Check that the transaction table exists before looking anything up in it, exactly as logtb_is_interrupted
   * does. In both early-return cases continue_checking is left untouched. */
  if (log_Gl.trantable.area == NULL)
    {
      return false;
    }

  tdes = LOG_FIND_TDES (tran_index);
  if (tdes == NULL)
    {
      /* tran_index is not associated with a descriptor */
      return false;
    }

  return logtb_is_interrupted_tdes (thread_p, tdes, clear, continue_checking);
}
/*
* xlogtb_set_suppress_repl_on_transaction - set or unset suppress_replication flag
* on transaction descriptor
*
* return: nothing
*
* set(in): non-zero to set, zero to unset
*/
void
xlogtb_set_suppress_repl_on_transaction (THREAD_ENTRY * thread_p, int set)
{
  const int current_index = LOG_FIND_THREAD_TRAN_INDEX (thread_p);

  /* the result (whether the index maps to a live transaction) is intentionally not propagated */
  logtb_set_suppress_repl_on_transaction (thread_p, current_index, set);
}
/*
* logtb_set_suppress_repl_on_transaction - set or unset suppress_replication flag
* on transaction descriptor
*
* return: false is returned when the tran_index is not associated
* with a transaction
*
* tran_index(in): Transaction index
* set(in): non-zero to set, zero to unset
*/
bool
logtb_set_suppress_repl_on_transaction (THREAD_ENTRY * thread_p, int tran_index, int set)
{
  if (log_Gl.trantable.area == NULL)
    {
      /* transaction table not allocated */
      return false;
    }

  LOG_TDES *target = LOG_FIND_TDES (tran_index);

  if (target == NULL || target->trid == NULL_TRANID)
    {
      /* tran_index is not associated with a transaction */
      return false;
    }

  /* write only when the value actually changes */
  if (target->suppress_replication != set)
    {
      target->suppress_replication = set;
    }

  return true;
}
/*
* logtb_is_active - is transaction active ?
*
* return:
*
* trid(in): Transaction identifier
*
* Note: Find if given transaction is an active one. This function
* execute a sequential search in the transaction table to find
* out the given transaction. The function bypasses the search if
* the given trid is current transaction.
*
* Note: The assumption of this function is that the transaction table
* is not very big and that most of the time the search is
* avoided. if this assumption becomes false, we may need to
* define a hash table from trid to tdes to speed up the search.
*/
bool
logtb_is_active (THREAD_ENTRY * thread_p, TRANID trid)
{
  if (!LOG_ISRESTARTED ())
    {
      return false;
    }

  /* Fast path: the requested transaction is the one bound to the calling thread. */
  LOG_TDES *candidate = LOG_FIND_TDES (LOG_FIND_THREAD_TRAN_INDEX (thread_p));

  if (candidate != NULL && candidate->trid == trid)
    {
      return LOG_ISTRAN_ACTIVE (candidate);
    }

  /* Slow path: sequential scan of the transaction table under a read lock. */
  bool is_active = false;

  TR_TABLE_CS_ENTER_READ_MODE (thread_p);
  for (int slot = 0; slot < NUM_TOTAL_TRAN_INDICES; slot++)
    {
      candidate = log_Gl.trantable.all_tdes[slot];
      if (candidate == NULL || candidate->trid == NULL_TRANID || candidate->trid != trid)
	{
	  continue;
	}

      is_active = LOG_ISTRAN_ACTIVE (candidate);
      break;
    }
  TR_TABLE_CS_EXIT (thread_p);

  return is_active;
}
/*
* logtb_is_current_active - is current transaction active ?
*
* return:
*
* Note: Find if the current transaction is an active one.
*/
bool
logtb_is_current_active (THREAD_ENTRY * thread_p)
{
  LOG_TDES *current_tdes = LOG_FIND_TDES (LOG_FIND_THREAD_TRAN_INDEX (thread_p));

  if (current_tdes == NULL || !LOG_ISTRAN_ACTIVE (current_tdes))
    {
      /* debug-only sanity check on a descriptor that exists but is not active */
      assert (current_tdes == NULL || current_tdes->is_active_worker_transaction ());
      return false;
    }

  return true;
}
/*
* logtb_istran_finished - is transaction finished?
*
* return:
*
* trid(in): Transaction identifier
*
* Note: Find if given transaction exists. That is, find if the
* transaction has completely finished its execution.
* Note that this function differs from log_istran_active in
* that a transaction in commit or abort state is not active but
* it is still alive (i.e., has not done completely).
* This function execute a sequential search in the transaction
* table to find out the given transaction. The function bypasses
* the search if the given trid is current transaction.
*
* The assumption of this function is that the transaction table
* is not very big and that most of the time the search is
* avoided. if this assumption becomes false, we may need to
* define a hash table from trid to tdes to speed up the search.
*/
bool
logtb_istran_finished (THREAD_ENTRY * thread_p, TRANID trid)
{
  /* Fast path: if trid belongs to the calling thread's descriptor it still exists, hence not finished. */
  LOG_TDES *candidate = LOG_FIND_TDES (LOG_FIND_THREAD_TRAN_INDEX (thread_p));

  if (candidate != NULL && candidate->trid == trid)
    {
      return false;
    }

  /* Slow path: the transaction is finished unless some table slot still carries its trid. */
  bool finished = true;

  TR_TABLE_CS_ENTER_READ_MODE (thread_p);
  for (int slot = 0; slot < NUM_TOTAL_TRAN_INDICES; slot++)
    {
      candidate = log_Gl.trantable.all_tdes[slot];
      if (candidate != NULL && candidate->trid != NULL_TRANID && candidate->trid == trid)
	{
	  finished = false;
	  break;
	}
    }
  TR_TABLE_CS_EXIT (thread_p);

  return finished;
}
/*
* logtb_has_updated - has transaction updated the database ?
*
* return:
*
*/
bool
logtb_has_updated (THREAD_ENTRY * thread_p)
{
  LOG_TDES *current_tdes = LOG_FIND_TDES (LOG_FIND_THREAD_TRAN_INDEX (thread_p));

  if (current_tdes == NULL)
    {
      return false;
    }

  /* the answer is derived solely from whether the descriptor's tail LSA is non-null */
  if (LSA_ISNULL (&current_tdes->tail_lsa))
    {
      return false;
    }

  return true;
}
/*
* logtb_disable_update -
* return: none
*/
void
logtb_disable_update (THREAD_ENTRY * thread_p)
{
  /* raise the global modification-disable switch, then trace its new value */
  db_Disable_modifications = 1;

  er_log_debug (ARG_FILE_LINE, "logtb_disable_update: db_Disable_modifications = %d\n", db_Disable_modifications);
}
/*
* logtb_enable_update -
* return: none
*/
void
logtb_enable_update (THREAD_ENTRY * thread_p)
{
  if (prm_get_bool_value (PRM_ID_READ_ONLY_MODE))
    {
      /* read-only mode is configured: leave the switch as it is */
      return;
    }

  db_Disable_modifications = 0;
  er_log_debug (ARG_FILE_LINE, "logtb_enable_update: db_Disable_modifications = %d\n", db_Disable_modifications);
}
/*
* logtb_set_to_system_tran_index - set to tran index system
*
* return: nothing
*
* Note: The current log_tran_index is set to the system transaction index.
*/
void
logtb_set_to_system_tran_index (THREAD_ENTRY * thread_p)
{
  /* rebind the calling thread to the system transaction index */
  LOG_SET_CURRENT_TRAN_INDEX (thread_p, LOG_SYSTEM_TRAN_INDEX);
}
/*
* logtb_tran_free_update_stats () - Free logged list of update statistics.
*
* return : Void.
* log_upd_stats (in) : List of logged update statistics records.
*/
static void
logtb_tran_free_update_stats (LOG_TRAN_UPDATE_STATS * log_upd_stats)
{
  /* Reset counters, chunk cursors and hash contents first; the memory itself is released below. */
  logtb_tran_clear_update_stats (log_upd_stats);

  /* release every chunk of the count optimization list */
  if (log_upd_stats->cos_first_chunk != NULL)
    {
      LOG_TRAN_CLASS_COS_CHUNK *cos_node = log_upd_stats->cos_first_chunk;

      while (cos_node != NULL)
	{
	  /* read the link before the node is released */
	  LOG_TRAN_CLASS_COS_CHUNK *cos_next = cos_node->next_chunk;

	  free (cos_node);
	  cos_node = cos_next;
	}

      log_upd_stats->cos_first_chunk = NULL;
      log_upd_stats->cos_current_chunk = NULL;
      log_upd_stats->cos_count = 0;
    }

  if (log_upd_stats->classes_cos_hash != NULL)
    {
      mht_destroy (log_upd_stats->classes_cos_hash);
      log_upd_stats->classes_cos_hash = NULL;
    }

  /* release every chunk of the unique statistics list */
  if (log_upd_stats->stats_first_chunk != NULL)
    {
      LOG_TRAN_BTID_UNIQUE_STATS_CHUNK *stats_node = log_upd_stats->stats_first_chunk;

      while (stats_node != NULL)
	{
	  /* read the link before the node is released */
	  LOG_TRAN_BTID_UNIQUE_STATS_CHUNK *stats_next = stats_node->next_chunk;

	  free (stats_node);
	  stats_node = stats_next;
	}

      log_upd_stats->stats_first_chunk = NULL;
      log_upd_stats->stats_current_chunk = NULL;
      log_upd_stats->stats_count = 0;
    }

  if (log_upd_stats->unique_stats_hash != NULL)
    {
      mht_destroy (log_upd_stats->unique_stats_hash);
      log_upd_stats->unique_stats_hash = NULL;
    }
}
/*
* logtb_tran_clear_update_stats () - Clear logged update statistics.
* Entries are not actually freed, they are
* appended to a list of free entries ready
* to be reused.
*
* return : Void.
* log_upd_stats (in) : Pointer to update statistics log.
*/
static void
logtb_tran_clear_update_stats (LOG_TRAN_UPDATE_STATS * log_upd_stats)
{
  /* Rewind both chunk cursors to the head of their lists and zero the entry counts. No chunk is released. */
  log_upd_stats->cos_current_chunk = log_upd_stats->cos_first_chunk;
  log_upd_stats->cos_count = 0;
  log_upd_stats->stats_current_chunk = log_upd_stats->stats_first_chunk;
  log_upd_stats->stats_count = 0;

  /* Empty the lookup hashes (when they exist) without destroying them. */
  if (log_upd_stats->classes_cos_hash != NULL)
    {
      mht_clear (log_upd_stats->classes_cos_hash, NULL, NULL);
    }

  if (log_upd_stats->unique_stats_hash != NULL)
    {
      mht_clear (log_upd_stats->unique_stats_hash, NULL, NULL);
    }
}
/*
* logtb_tran_btid_hash_func() - Hash function for BTIDs
* return: hash value
* key(in): BTID to hash
* ht_size(in): Size of hash table
*/
static unsigned int
logtb_tran_btid_hash_func (const void *key, const unsigned int ht_size)
{
return ((BTID *) key)->vfid.fileid % ht_size;
}
/*
* logtb_tran_btid_hash_cmp_func() - Comparison function for BTIDs (equal or not)
* return: 0 not equal, 1 otherwise
* key1(in): left key
* key2(in): right key
*/
static int
logtb_tran_btid_hash_cmp_func (const void *key1, const void *key2)
{
return BTID_IS_EQUAL ((BTID *) key1, (BTID *) key2);
}
/*
* logtb_tran_create_btid_unique_stats () - allocates memory and initializes
* statistics associated with btid
*
* return : The address of newly created statistics structure or NULL
* in case of error.
* thread_p(in) :
* btid (in) : Id of unique index for which the statistics will be
* created
*/
static LOG_TRAN_BTID_UNIQUE_STATS *
logtb_tran_create_btid_unique_stats (THREAD_ENTRY * thread_p, const BTID * btid)
{
LOG_TRAN_BTID_UNIQUE_STATS *unique_stats = NULL;
/* NOTE(review): tdes is dereferenced below without a NULL check; the caller visible in this file
 * (logtb_tran_find_btid_stats) resolves the same descriptor first. */
LOG_TDES *tdes = LOG_FIND_TDES (LOG_FIND_THREAD_TRAN_INDEX (thread_p));
if (btid == NULL)
{
assert (false);
return NULL;
}
/* Entries are handed out from fixed-size chunks. When stats_count is a multiple of the chunk size, the current
 * chunk is full (or none exists yet) and the cursor must move to another chunk. */
if (tdes->log_upd_stats.stats_count % TRAN_UNIQUE_STATS_CHUNK_SIZE == 0)
{
LOG_TRAN_BTID_UNIQUE_STATS_CHUNK *chunk = NULL;
if (tdes->log_upd_stats.stats_current_chunk != NULL
&& tdes->log_upd_stats.stats_current_chunk->next_chunk != NULL)
{
/* reuse the old chunk */
/* NOTE(review): right after logtb_tran_clear_update_stats (count 0, cursor on the first chunk) this selects the
 * chunk after the first one, so the first chunk itself is not refilled - confirm this is intended. */
chunk = tdes->log_upd_stats.stats_current_chunk->next_chunk;
}
else
{
/* if the entire allocated space was exhausted then alloc a new chunk */
/* The chunk header is sized as if it already holds one entry, hence the "- 1"; presumably the struct declares a
 * one-element trailing buffer - TODO confirm against the type definition. */
int size =
sizeof (LOG_TRAN_BTID_UNIQUE_STATS_CHUNK) + (TRAN_UNIQUE_STATS_CHUNK_SIZE -
1) * sizeof (LOG_TRAN_BTID_UNIQUE_STATS);
chunk = (LOG_TRAN_BTID_UNIQUE_STATS_CHUNK *) malloc (size);
if (chunk == NULL)
{
er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_OUT_OF_VIRTUAL_MEMORY, 1, size);
return NULL;
}
/* Link the new chunk: either as list head or after the chunk the cursor currently points to. */
if (tdes->log_upd_stats.stats_first_chunk == NULL)
{
tdes->log_upd_stats.stats_first_chunk = chunk;
}
else
{
tdes->log_upd_stats.stats_current_chunk->next_chunk = chunk;
}
chunk->next_chunk = NULL;
}
tdes->log_upd_stats.stats_current_chunk = chunk;
}
/* get a new entry */
/* The slot index is the pre-increment count modulo the chunk size; stats_count is advanced here. */
unique_stats =
&tdes->log_upd_stats.stats_current_chunk->buffer[tdes->log_upd_stats.stats_count++ % TRAN_UNIQUE_STATS_CHUNK_SIZE];
BTID_COPY (&unique_stats->btid, btid);
unique_stats->deleted = false;
/* init transaction local statistics */
unique_stats->tran_stats.num_keys = 0;
unique_stats->tran_stats.num_oids = 0;
unique_stats->tran_stats.num_nulls = 0;
/* init global statistics */
/* -1 in all three global counters; presumably a "not loaded yet" marker - TODO confirm with the readers. */
unique_stats->global_stats.num_keys = -1;
unique_stats->global_stats.num_oids = -1;
unique_stats->global_stats.num_nulls = -1;
/* Store the new entry in hash */
/* The hash key is the BTID copy stored inside the entry itself, not the caller's pointer.
 * NOTE(review): if mht_put fails, NULL is returned but stats_count has already been advanced, so the slot stays
 * consumed. */
if (mht_put (tdes->log_upd_stats.unique_stats_hash, &unique_stats->btid, unique_stats) == NULL)
{
return NULL;
}
return unique_stats;
}
/*
* logtb_tran_create_class_cos () - creates a new count optimization state entry
* for a class
*
* return : the address of the newly created entry, or NULL in case of error
* thread_p(in) :
* class_oid (in) : OID of the class for which the entry will be created
*/
static LOG_TRAN_CLASS_COS *
logtb_tran_create_class_cos (THREAD_ENTRY * thread_p, const OID * class_oid)
{
/* NOTE(review): neither tdes nor class_oid is checked here; the caller visible in this file
 * (logtb_tran_find_class_cos) asserts both are non-NULL before calling. */
LOG_TDES *tdes = LOG_FIND_TDES (LOG_FIND_THREAD_TRAN_INDEX (thread_p));
LOG_TRAN_CLASS_COS *entry = NULL;
/* Entries are handed out from fixed-size chunks. When cos_count is a multiple of the chunk size, the current chunk
 * is full (or none exists yet) and the cursor must move to another chunk. */
if (tdes->log_upd_stats.cos_count % COS_CLASSES_CHUNK_SIZE == 0)
{
LOG_TRAN_CLASS_COS_CHUNK *chunk = NULL;
if (tdes->log_upd_stats.cos_current_chunk != NULL && tdes->log_upd_stats.cos_current_chunk->next_chunk != NULL)
{
/* reuse the old chunk */
/* NOTE(review): right after logtb_tran_clear_update_stats (count 0, cursor on the first chunk) this selects the
 * chunk after the first one, so the first chunk itself is not refilled - confirm this is intended. */
chunk = tdes->log_upd_stats.cos_current_chunk->next_chunk;
}
else
{
/* if the entire allocated space was exhausted then alloc a new chunk */
/* The chunk header is sized as if it already holds one entry, hence the "- 1"; presumably the struct declares a
 * one-element trailing buffer - TODO confirm against the type definition. */
int size = sizeof (LOG_TRAN_CLASS_COS_CHUNK) + (COS_CLASSES_CHUNK_SIZE - 1) * sizeof (LOG_TRAN_CLASS_COS);
chunk = (LOG_TRAN_CLASS_COS_CHUNK *) malloc (size);
if (chunk == NULL)
{
er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_OUT_OF_VIRTUAL_MEMORY, 1, size);
return NULL;
}
/* Link the new chunk: either as list head or after the chunk the cursor currently points to. */
if (tdes->log_upd_stats.cos_first_chunk == NULL)
{
tdes->log_upd_stats.cos_first_chunk = chunk;
}
else
{
tdes->log_upd_stats.cos_current_chunk->next_chunk = chunk;
}
chunk->next_chunk = NULL;
}
tdes->log_upd_stats.cos_current_chunk = chunk;
}
/* get a new entry */
/* The slot index is the pre-increment count modulo the chunk size; cos_count is advanced here. */
entry = &tdes->log_upd_stats.cos_current_chunk->buffer[tdes->log_upd_stats.cos_count++ % COS_CLASSES_CHUNK_SIZE];
/* init newly created entry */
COPY_OID (&entry->class_oid, class_oid);
entry->count_state = COS_NOT_LOADED;
/* The hash key is the OID copy stored inside the entry itself, not the caller's pointer.
 * NOTE(review): if mht_put fails, NULL is returned but cos_count has already been advanced, so the slot stays
 * consumed. */
if (mht_put (tdes->log_upd_stats.classes_cos_hash, &entry->class_oid, entry) == NULL)
{
return NULL;
}
return entry;
}
/*
* logtb_tran_find_class_cos () - searches the hash of count optimization states
* for a specific class oid
*
* return : address of found (or newly created) entry or null
* otherwise
* thread_p(in) :
* class_oid (in) : OID of the class for which the entry will be created
* create(in) : true if the caller needs a new entry to be created if not
* found an already existing one
*/
LOG_TRAN_CLASS_COS *
logtb_tran_find_class_cos (THREAD_ENTRY * thread_p, const OID * class_oid, bool create)
{
  LOG_TDES *tdes = LOG_FIND_TDES (LOG_FIND_THREAD_TRAN_INDEX (thread_p));

  assert (tdes != NULL && class_oid != NULL);

  /* look the class up in the transaction's count optimization hash */
  LOG_TRAN_CLASS_COS *found = (LOG_TRAN_CLASS_COS *) mht_get (tdes->log_upd_stats.classes_cos_hash, class_oid);

  if (found != NULL || !create)
    {
      /* either a hit, or a miss the caller does not want filled in */
      return found;
    }

  /* miss: create a fresh entry on request (may still yield NULL on failure) */
  return logtb_tran_create_class_cos (thread_p, class_oid);
}
/*
* logtb_tran_find_btid_stats () - searches the hash of statistics for a
* specific index.
*
* return : address of found (or newly created) statistics or null
* otherwise
* thread_p(in) :
* btid (in) : index id to be searched
* create(in) : true if the caller needs a new entry to be created if not
* found an already existing one
*/
LOG_TRAN_BTID_UNIQUE_STATS *
logtb_tran_find_btid_stats (THREAD_ENTRY * thread_p, const BTID * btid, bool create)
{
  LOG_TRAN_BTID_UNIQUE_STATS *unique_stats = NULL;
  LOG_TDES *tdes = LOG_FIND_TDES (LOG_FIND_THREAD_TRAN_INDEX (thread_p));

  /* Without a transaction descriptor or a BTID there is nothing to search. Callers already treat NULL as
   * "not found / failed", so a missing descriptor is reported the same way instead of being dereferenced. */
  if (tdes == NULL || btid == NULL)
    {
      return NULL;
    }

  /* search */
  unique_stats = (LOG_TRAN_BTID_UNIQUE_STATS *) mht_get (tdes->log_upd_stats.unique_stats_hash, btid);
  if (unique_stats == NULL && create)
    {
      /* create if not found; the result is NULL if the creation fails */
      unique_stats = logtb_tran_create_btid_unique_stats (thread_p, btid);
    }

  return unique_stats;
}
/*
* logtb_tran_update_btid_unique_stats () - updates statistics associated with
* the given btid
*
* return : error code or NO_ERROR
* thread_p(in) :
* btid (in) : index id to be searched
* n_keys(in) : number of keys to be added to statistics
* n_oids(in) : number of oids to be added to statistics
* n_nulls(in) : number of nulls to be added to statistics
*
* Note: the statistics are searched and created if they not exist.
*/
int
logtb_tran_update_btid_unique_stats (THREAD_ENTRY * thread_p, const BTID * btid, long long n_keys, long long n_oids,
				     long long n_nulls)
{
  /* find the entry for this index, creating it on first use */
  LOG_TRAN_BTID_UNIQUE_STATS *entry = logtb_tran_find_btid_stats (thread_p, btid, true);

  if (entry != NULL)
    {
      /* accumulate the deltas into the transaction-local counters */
      entry->tran_stats.num_keys += n_keys;
      entry->tran_stats.num_oids += n_oids;
      entry->tran_stats.num_nulls += n_nulls;
      return NO_ERROR;
    }

  return ER_FAILED;
}
/*
* logtb_tran_update_unique_stats () - updates statistics associated with
* the given class and btid
*
* return : error code or NO_ERROR
* thread_p(in) :
* btid (in) : B-tree id to be searched
* n_keys(in) : number of keys to be added to statistics
* n_oids(in) : number of oids to be added to statistics
* n_nulls(in) : number of nulls to be added to statistics
* write_to_log(in) : if true then the new statistics will be written to log
*
* Note: the statistics are searched and created if they not exist.
*/
int
logtb_tran_update_unique_stats (THREAD_ENTRY * thread_p, const BTID * btid, long long n_keys, long long n_oids,
				long long n_nulls, bool write_to_log)
{
  /* update the transaction-local statistics (the entry is created if it does not exist yet) */
  int error = logtb_tran_update_btid_unique_stats (thread_p, btid, n_keys, n_oids, n_nulls);
  if (error != NO_ERROR)
    {
      return error;
    }

  if (write_to_log)
    {
      /* Log the change as undo-only data: the record carries the negated increments so that undo reverts them.
       * No redo record is built because nothing consumes it. */
      char undo_rec_buf[3 * OR_BIGINT_SIZE + OR_BTID_ALIGNED_SIZE + MAX_ALIGNMENT];
      RECDES undo_rec;

      /* the buffer is over-allocated by MAX_ALIGNMENT so that the data pointer can be aligned */
      undo_rec.area_size = ((3 * OR_BIGINT_SIZE) + OR_BTID_ALIGNED_SIZE);
      undo_rec.data = PTR_ALIGN (undo_rec_buf, MAX_ALIGNMENT);

      btree_rv_mvcc_save_increments (btid, -n_keys, -n_oids, -n_nulls, &undo_rec);

      log_append_undo_data2 (thread_p, RVBT_MVCC_INCREMENTS_UPD, NULL, NULL, NULL_OFFSET, undo_rec.length,
			     undo_rec.data);
    }

  return error;
}
// *INDENT-OFF*
int
logtb_tran_update_unique_stats (THREAD_ENTRY * thread_p, const BTID &btid, const btree_unique_stats &ustats,
                                bool write_to_log)
{
  // Forward only non-zero statistics to the pointer-based overload; an all-zero delta is a no-op.
  if (!ustats.is_zero ())
    {
      return logtb_tran_update_unique_stats (thread_p, &btid, ustats.get_key_count (), ustats.get_row_count (),
                                             ustats.get_null_count (), write_to_log);
    }

  return NO_ERROR;
}
int
logtb_tran_update_unique_stats (THREAD_ENTRY * thread_p, const multi_index_unique_stats &multi_stats, bool write_to_log)
{
  // Apply each (BTID, statistics) pair in map order; stop at the first failure.
  for (const auto &btid_and_stats : multi_stats.get_map ())
    {
      const int rc = logtb_tran_update_unique_stats (thread_p, btid_and_stats.first, btid_and_stats.second,
                                                     write_to_log);
      if (rc == NO_ERROR)
        {
          continue;
        }

      ASSERT_ERROR ();
      return rc;
    }

  return NO_ERROR;
}
// *INDENT-ON*
/*
* logtb_tran_update_delta_hash_func () - updates statistics associated with
* the given btid by local statistics
*
* return : error code or NO_ERROR
* thread_p(in) :
* data(in) : unique statistics
* args(in) : not used
*
* Note: This is a function that is called for each element during the hash
* iteration.
*/
static int
logtb_tran_update_delta_hash_func (THREAD_ENTRY * thread_p, void *data, void *args)
{
  LOG_TRAN_BTID_UNIQUE_STATS *local_stats = (LOG_TRAN_BTID_UNIQUE_STATS *) data;

  if (local_stats->deleted)
    {
      /* entries marked as deleted do not contribute to the global statistics */
      return NO_ERROR;
    }

  /* note the argument order expected by the callee: oids, nulls, keys */
  return logtb_update_global_unique_stats_by_delta (thread_p, &local_stats->btid, local_stats->tran_stats.num_oids,
						    local_stats->tran_stats.num_nulls,
						    local_stats->tran_stats.num_keys, true);
}
/*
* logtb_tran_update_all_global_unique_stats () - update global statistics
* by local statistics for all
* indexes found in transaction's
* unique statistics hash
*
* return : error code or NO_ERROR
* thread_p(in) :
*
* Note: this function must be called at the end of transaction (commit)
*/
int
logtb_tran_update_all_global_unique_stats (THREAD_ENTRY * thread_p)
{
  LOG_TDES *tdes = LOG_FIND_TDES (LOG_FIND_THREAD_TRAN_INDEX (thread_p));

  if (tdes == NULL)
    {
      return ER_FAILED;
    }

  /* Interrupt checking is switched off for the duration of the hash walk: the transaction is still in TRAN_ACTIVE
   * state and may have been interrupted earlier, yet a commit issued by the user must complete regardless. The
   * previous setting is restored afterwards. */
  const bool saved_check_interrupt = logtb_set_check_interrupt (thread_p, false);

  const int map_result =
    mht_map_no_key (thread_p, tdes->log_upd_stats.unique_stats_hash, logtb_tran_update_delta_hash_func, thread_p);

  (void) logtb_set_check_interrupt (thread_p, saved_check_interrupt);

  return map_result;
}
/*
* logtb_tran_load_global_stats_func () - load global statistics into the
* current transaction for a given class.
*
* return : error code or NO_ERROR
* thread_p(in) :
* data(in) : count optimization state entry
* args(in) : not used
*
* Note: This function is called for each element of the count optimization
* states hash. If the statistics were successfully loaded then set state
* to COS_LOADED. In case of a partitioned class, statistics for each
* partition are loaded.
*/
static int
logtb_tran_load_global_stats_func (THREAD_ENTRY * thread_p, void *data, void *args)
{
int error_code = NO_ERROR, idx;
PRUNING_CONTEXT context;
LOG_TRAN_CLASS_COS *entry = NULL, *new_entry = NULL;
OR_CLASSREP *classrepr = NULL;
int classrepr_cacheindex = -1;
/* Tracks whether context was initialized, so cleanup only clears it in that case. */
bool clear_pcontext = false;
entry = (LOG_TRAN_CLASS_COS *) data;
/* Only entries explicitly marked COS_TO_LOAD are processed; everything else is skipped successfully. */
if (entry->count_state != COS_TO_LOAD)
{
return NO_ERROR;
}
/* get class representation to find partition information */
classrepr = heap_classrepr_get (thread_p, &entry->class_oid, NULL, NULL_REPRID, &classrepr_cacheindex);
if (classrepr == NULL)
{
/* NOTE(review): error_code is still NO_ERROR here, so a failed representation lookup is reported as success and
 * the entry stays in COS_TO_LOAD - confirm this is deliberate. */
goto cleanup;
}
if (classrepr->has_partition_info > 0)
{
partition_init_pruning_context (&context);
clear_pcontext = true;
/* In case of partitioned class load statistics for each partition */
error_code = partition_load_pruning_context (thread_p, &entry->class_oid, DB_PARTITIONED_CLASS, &context);
if (error_code != NO_ERROR)
{
goto cleanup;
}
}
if (classrepr->has_partition_info > 0 && context.count > 0)
{
/* Walk the partitions found by the pruning context: make sure each has a count optimization entry, load its
 * statistics and mark it COS_LOADED. Slots with a NULL class OID are skipped. */
for (idx = 0; idx < context.count; idx++)
{
if (OID_ISNULL (&context.partitions[idx].class_oid))
{
continue;
}
new_entry = logtb_tran_find_class_cos (thread_p, &context.partitions[idx].class_oid, true);
if (new_entry == NULL)
{
error_code = ER_FAILED;
goto cleanup;
}
error_code = logtb_create_unique_stats_from_repr (thread_p, &new_entry->class_oid);
if (error_code != NO_ERROR)
{
goto cleanup;
}
new_entry->count_state = COS_LOADED;
}
}
/* Finally load the statistics of the class itself (partitioned or not) and mark it COS_LOADED. */
error_code = logtb_create_unique_stats_from_repr (thread_p, &entry->class_oid);
if (error_code != NO_ERROR)
{
goto cleanup;
}
entry->count_state = COS_LOADED;
cleanup:
/* Common exit: release the pruning context (if it was initialized) and the class representation (if obtained). */
if (clear_pcontext == true)
{
partition_clear_pruning_context (&context);
}
if (classrepr != NULL)
{
heap_classrepr_free_and_init (classrepr, &classrepr_cacheindex);
}
return error_code;
}
/*
* logtb_load_global_statistics_to_tran () - load global statistics into the
* current transaction for all classes
* with COS_TO_LOAD count optimization
* state.
*
* return : error code or NO_ERROR
* thread_p(in) :
*
* Note: The statistics will be loaded only for classes that have COS_TO_LOAD
* count optimization state. This function is used when a snapshot is
* taken.
*/
int
logtb_load_global_statistics_to_tran (THREAD_ENTRY * thread_p)
{
  LOG_TDES *tdes = LOG_FIND_TDES (LOG_FIND_THREAD_TRAN_INDEX (thread_p));

  if (tdes == NULL)
    {
      return ER_FAILED;
    }

  /* visit every count optimization entry; the result of the hash walk is returned unchanged */
  return mht_map_no_key (thread_p, tdes->log_upd_stats.classes_cos_hash, logtb_tran_load_global_stats_func,
			 thread_p);
}
/*
* logtb_invalidate_snapshot_data () - Make sure the MVCC snapshot is invalidated.
*
* return : NO_ERROR.
* thread_p (in) : Thread entry.
*/
int
logtb_invalidate_snapshot_data (THREAD_ENTRY * thread_p)
{
  LOG_TDES *tran_desc = LOG_FIND_TDES (LOG_FIND_THREAD_TRAN_INDEX (thread_p));

  /* Work is needed only when a descriptor exists, its isolation is below TRAN_REPEATABLE_READ and it
   * currently holds a valid snapshot. Every other case is a no-op. */
  if (tran_desc != NULL && tran_desc->isolation < TRAN_REPEATABLE_READ && tran_desc->mvccinfo.snapshot.valid)
    {
      tran_desc->mvccinfo.snapshot.valid = false;
      /* the count optimization states are reset together with the snapshot */
      logtb_tran_reset_count_optim_state (thread_p);
    }

  return NO_ERROR;
}
/*
* xlogtb_get_mvcc_snapshot () - Make sure snapshot is generated.
*
* return : NO_ERROR, or an error code if no snapshot could be obtained.
* thread_p (in) : Thread entry.
*/
int
xlogtb_get_mvcc_snapshot (THREAD_ENTRY * thread_p)
{
  int rc = NO_ERROR;

  /* Only the side effect (snapshot generation) matters here; the pointer itself is not kept. */
  if (logtb_get_mvcc_snapshot (thread_p) == NULL)
    {
      ASSERT_ERROR_AND_SET (rc);
    }

  return rc;
}
/*
* logtb_find_current_mvccid - find current transaction MVCC id
*
* return: MVCCID
*
* thread_p(in):
*/
MVCCID
logtb_find_current_mvccid (THREAD_ENTRY * thread_p)
{
  LOG_TDES *tran_desc = LOG_FIND_TDES (LOG_FIND_THREAD_TRAN_INDEX (thread_p));

  if (tran_desc == NULL)
    {
      /* no descriptor, no MVCCID */
      return MVCCID_NULL;
    }

  if (tran_desc->mvccinfo.sub_ids.empty ())
    {
      /* no sub-transaction is open: report the transaction MVCCID as stored (no new id is assigned here) */
      return tran_desc->mvccinfo.id;
    }

  /* the most recently pushed sub-transaction id takes precedence */
  return tran_desc->mvccinfo.sub_ids.back ();
}
/*
* logtb_get_current_mvccid - return current transaction MVCC id. Assign
* a new ID if not previously set.
*
* return: current MVCCID
*
* thread_p(in):
*/
MVCCID
logtb_get_current_mvccid (THREAD_ENTRY * thread_p)
{
  LOG_TDES *tdes = LOG_FIND_TDES (LOG_FIND_THREAD_TRAN_INDEX (thread_p));

#if defined (SA_MODE)
  /* We shouldn't be here */
  assert (false);
#endif /* SA_MODE */

  /* Check the descriptor BEFORE deriving any member address from it. (Taking &tdes->mvccinfo first and
   * asserting afterwards made the check come too late, and the derived pointer could never compare NULL.) */
  assert (tdes != NULL);

  MVCC_INFO *curr_mvcc_info = &tdes->mvccinfo;

  /* Lazily assign a transaction MVCCID the first time one is requested. */
  if (MVCCID_IS_VALID (curr_mvcc_info->id) == false)
    {
      curr_mvcc_info->id = log_Gl.mvcc_table.get_new_mvccid ();
    }

  /* When a sub-transaction is open, its (most recent) id is the current one. */
  if (!curr_mvcc_info->sub_ids.empty ())
    {
      return curr_mvcc_info->sub_ids.back ();
    }

  return curr_mvcc_info->id;
}
/*
* logtb_is_current_mvccid - check whether given mvccid is current mvccid
*
* return: bool
*
* thread_p(in): thread entry
* mvccid(in): MVCC id
*/
bool
logtb_is_current_mvccid (THREAD_ENTRY * thread_p, MVCCID mvccid)
{
  LOG_TDES *tran_desc = LOG_FIND_TDES (LOG_FIND_THREAD_TRAN_INDEX (thread_p));

  assert (tran_desc != NULL);

  MVCC_INFO *info = &tran_desc->mvccinfo;

  /* First candidate: the transaction MVCCID itself. */
  if (info->id == mvccid)
    {
      return true;
    }

  /* Otherwise any of the sub-transaction ids may match. */
  size_t pos = 0;
  while (pos < info->sub_ids.size ())
    {
      if (info->sub_ids[pos] == mvccid)
	{
	  return true;
	}
      pos++;
    }

  return false;
}
/*
* logtb_get_mvcc_snapshot - get MVCC snapshot
*
* return: MVCC snapshot
*
* thread_p(in): thread entry
*/
MVCC_SNAPSHOT *
logtb_get_mvcc_snapshot (THREAD_ENTRY * thread_p)
{
  LOG_TDES *tdes = LOG_FIND_TDES (LOG_FIND_THREAD_TRAN_INDEX (thread_p));

  /* The descriptor must be validated before its first use; previously this assert came after
   * tdes->is_active_worker_transaction () had already dereferenced the pointer. */
  assert (tdes != NULL);

  if (!tdes->is_active_worker_transaction ())
    {
      /* System transactions do not have snapshots */
      return NULL;
    }

  /* When this thread entry refers to an original thread entry (m_px_orig_thread_entry is set), the snapshot
   * check/build below is done while holding the main thread's m_px_lock_mutex. */
  THREAD_ENTRY *main_thread_p = NULL;
  if (thread_p->m_px_orig_thread_entry != NULL)
    {
      main_thread_p = thread_get_main_thread (thread_p);
      pthread_mutex_lock (&main_thread_p->m_px_lock_mutex);
    }

  /* Build the snapshot only if the transaction does not already hold a valid one. */
  if (!tdes->mvccinfo.snapshot.valid)
    {
      log_Gl.mvcc_table.build_mvcc_info (*tdes);
    }

  if (main_thread_p != NULL)
    {
      pthread_mutex_unlock (&main_thread_p->m_px_lock_mutex);
    }

  return &tdes->mvccinfo.snapshot;
}
/*
* logtb_complete_mvcc () - Called at commit or rollback, completes MVCC info
* for current transaction.
*
* return : Void.
* thread_p (in) : Thread entry.
* tdes (in) : Transaction descriptor.
* committed (in) : True if transaction was committed false if it was aborted.
*/
void
logtb_complete_mvcc (THREAD_ENTRY * thread_p, LOG_TDES * tdes, bool committed)
{
  TSC_TICKS tick_begin, tick_end;
  TSCTIMEVAL elapsed;

  assert (tdes != NULL);

  /* Time the whole completion only when performance tracking is on. */
  const bool track_perf = perfmon_is_perf_tracking ();
  if (track_perf)
    {
      tsc_getticks (&tick_begin);
    }

  MVCC_INFO *info = &tdes->mvccinfo;
  const MVCCID tran_mvccid = info->id;
  const int tran_index = LOG_FIND_THREAD_TRAN_INDEX (thread_p);

  if (MVCCID_IS_VALID (tran_mvccid))
    {
      /* the transaction owns an MVCCID: let the MVCC table complete it */
      log_Gl.mvcc_table.complete_mvcc (tran_index, tran_mvccid, committed);
    }
  else
    {
      /* no MVCCID was assigned: on commit push the unique statistics to the global table ... */
      if (committed && logtb_tran_update_all_global_unique_stats (thread_p) != NO_ERROR)
	{
	  assert (false);
	}

      /* ... and atomically reset the transaction lowest active MVCCID */
      log_Gl.mvcc_table.reset_transaction_lowest_active (tran_index);
    }

  info->recent_snapshot_lowest_active_mvccid = MVCCID_NULL;

  /* A still-valid snapshot implies count optimization states that must be reset before the info is wiped. */
  if (info->snapshot.valid)
    {
      logtb_tran_reset_count_optim_state (thread_p);
    }

  info->reset ();
  logtb_tran_clear_update_stats (&tdes->log_upd_stats);

  if (track_perf)
    {
      tsc_getticks (&tick_end);
      tsc_elapsed_time_usec (&elapsed, tick_end, tick_begin);

      /* elapsed time in microseconds; recorded only when non-zero */
      UINT64 elapsed_usec = elapsed.tv_sec * 1000000LL + elapsed.tv_usec;
      if (elapsed_usec > 0)
	{
	  perfmon_add_stat (thread_p, PSTAT_LOG_TRAN_COMPLETE_TIME_COUNTERS, elapsed_usec);
	}
    }
}
/*
* logtb_set_loose_end_tdes -
*
* return:
*
* tdes(in/out):
*
* Note:
*/
static void
logtb_set_loose_end_tdes (LOG_TDES * tdes)
{
  /* A descriptor is a "prepared" loose end when it is a 2PC participant in prepare state, and a
   * "coordinator" loose end when it is in the second 2PC phase or still collecting participant votes. */
  const bool is_prepared = LOG_ISTRAN_2PC_PREPARE (tdes);

  if (!is_prepared && !(LOG_ISTRAN_2PC_IN_SECOND_PHASE (tdes)
			|| tdes->state == TRAN_UNACTIVE_2PC_COLLECTING_PARTICIPANT_VOTES))
    {
      /* not a loose end at all */
      return;
    }

  tdes->isloose_end = true;

  if (is_prepared)
    {
      log_Gl.trantable.num_prepared_loose_end_indices++;
#if !defined(NDEBUG)
      if (prm_get_bool_value (PRM_ID_LOG_TRACE_DEBUG))
	{
	  fprintf (stdout,
		   "\n*** Transaction = %d (index = %d) is prepared to commit as gobal tran = %d\n"
		   " The coordinator site (maybe the client user = %s) needs to attach\n"
		   " to this transaction and either commit or abort it. ***\n", tdes->trid, tdes->tran_index,
		   tdes->gtrid, tdes->client.get_db_user ());
	  fflush (stdout);
	}
#endif
    }
  else
    {
      log_Gl.trantable.num_coord_loose_end_indices++;
#if !defined(NDEBUG)
      if (prm_get_bool_value (PRM_ID_LOG_TRACE_DEBUG))
	{
	  fprintf (stdout,
		   "\n*** Transaction = %d (index = %d) needs to complete informing participants\n"
		   " about its fate = %s and collect participant acknowledgements.\n"
		   " This transaction has been disassociated from the client user = %s.\n"
		   " The transaction will be completely finished by the system ***\n", tdes->trid,
		   tdes->tran_index, ((LOG_ISTRAN_COMMITTED (tdes)) ? "COMMIT" : "ABORT"), tdes->client.get_db_user ());
	  fflush (stdout);
	}
#endif
    }
}
/*
* logtb_set_num_loose_end_trans - set the number of loose end transactions
*
* return: num loose ends
*
* Note: The number of loose ends transactions is set by searching the
* transaction table.
*/
int
logtb_set_num_loose_end_trans (THREAD_ENTRY * thread_p)
{
  int total_loose_ends;

  TR_TABLE_CS_ENTER (thread_p);

  /* Recount from scratch: both counters are rebuilt by classifying every assigned descriptor. */
  log_Gl.trantable.num_coord_loose_end_indices = 0;
  log_Gl.trantable.num_prepared_loose_end_indices = 0;

  for (int slot = 0; slot < NUM_TOTAL_TRAN_INDICES; slot++)
    {
      if (slot == LOG_SYSTEM_TRAN_INDEX)
	{
	  continue;
	}

      LOG_TDES *cur = log_Gl.trantable.all_tdes[slot];
      if (cur == NULL || cur->trid == NULL_TRANID)
	{
	  /* unused slot */
	  continue;
	}

      logtb_set_loose_end_tdes (cur);
    }

  total_loose_ends =
    (log_Gl.trantable.num_coord_loose_end_indices + log_Gl.trantable.num_prepared_loose_end_indices);

  TR_TABLE_CS_EXIT (thread_p);

  return total_loose_ends;
}
/*
* logtb_rv_read_only_map_undo_tdes - map func to all tdes to abort in the UNDO recovery phase.
*/
void
logtb_rv_read_only_map_undo_tdes (THREAD_ENTRY * thread_p, const std::function < void (const log_tdes &) > map_func)
{
  TR_TABLE_CS_ENTER_READ_MODE (thread_p);

  /* Regular transaction slots: only assigned descriptors in one of the two aborted states are visited. */
  for (int slot = 0; slot < NUM_TOTAL_TRAN_INDICES; slot++)
    {
      if (slot == LOG_SYSTEM_TRAN_INDEX)
	{
	  continue;
	}

      LOG_TDES *cur = log_Gl.trantable.all_tdes[slot];
      if (cur == NULL || cur->trid == NULL_TRANID)
	{
	  continue;
	}

      if (cur->state == TRAN_UNACTIVE_UNILATERALLY_ABORTED || cur->state == TRAN_UNACTIVE_ABORTED)
	{
	  map_func (*cur);
	}
    }

  /* System worker transactions are visited without any state filtering here. */
  // *INDENT-OFF*
  log_system_tdes::map_all_tdes (map_func);
  // *INDENT-ON*

  TR_TABLE_CS_EXIT (thread_p);
}
/*
* logtb_find_smallest_lsa - smallest lsa address of all active transactions
*
* return:
*
* lsa(in):
*
*/
void
logtb_find_smallest_lsa (THREAD_ENTRY * thread_p, LOG_LSA * lsa)
{
  LOG_LSA *lowest = NULL;	/* points at the smallest head_lsa seen so far */

  /* The output stays NULL LSA when no descriptor qualifies. */
  LSA_SET_NULL (lsa);

  TR_TABLE_CS_ENTER_READ_MODE (thread_p);

  for (int slot = 0; slot < NUM_TOTAL_TRAN_INDICES; slot++)
    {
      if (slot == LOG_SYSTEM_TRAN_INDEX)
	{
	  continue;
	}

      LOG_TDES *cur = log_Gl.trantable.all_tdes[slot];
      if (cur == NULL || cur->trid == NULL_TRANID || LSA_ISNULL (&cur->head_lsa))
	{
	  /* unused slot, or a transaction that has not logged anything */
	  continue;
	}

      if (lowest == NULL || LSA_LT (&cur->head_lsa, lowest))
	{
	  lowest = &cur->head_lsa;
	}
    }

  /* Copy while still inside the critical section, since 'lowest' points into the transaction table. */
  if (lowest != NULL)
    {
      LSA_COPY (lsa, lowest);
    }

  TR_TABLE_CS_EXIT (thread_p);
}
/*
* logtb_tran_prepare_count_optim_classes - prepare classes for count
* optimization (for unique statistics
* loading)
*
* return: error code
*
* thread_p(in): thread entry
* classes(in): classes names list
* flags(in): flags associated with class names
* n_classes(in): number of classes names
*
* Note: This function is called at prefetch request. It receives a list of
* classes and for each class the COS_TO_LOAD flag will be set if the
* statistics were not already loaded. The statistics will be loaded at
* snapshot.
*/
int
logtb_tran_prepare_count_optim_classes (THREAD_ENTRY * thread_p, const char **classes, LC_PREFETCH_FLAGS * flags,
					int n_classes)
{
  /* The list is walked from the last class name to the first. */
  for (int pos = n_classes - 1; pos >= 0; pos--)
    {
      if (!(flags[pos] & LC_PREF_FLAG_COUNT_OPTIM))
	{
	  /* count optimization was not requested for this class */
	  continue;
	}

      /* translate the class name into its OID */
      OID class_oid;
      LC_FIND_CLASSNAME lookup = xlocator_find_class_oid (thread_p, classes[pos], &class_oid, NULL_LOCK);

      if (lookup == LC_CLASSNAME_ERROR)
	{
	  return ER_FAILED;
	}
      if (lookup != LC_CLASSNAME_EXIST)
	{
	  /* any other lookup outcome is silently skipped */
	  continue;
	}
      if (OID_ISNULL (&class_oid))
	{
	  /* The class is reported to exist but its OID could not be retrieved. */
	  return ER_FAILED;
	}

      /* find the count-optimization entry of the class, creating it when missing */
      LOG_TRAN_CLASS_COS *cos_entry = logtb_tran_find_class_cos (thread_p, &class_oid, true);
      if (cos_entry == NULL)
	{
	  return ER_FAILED;
	}

      /* Only mark the class for loading; the statistics themselves are loaded when the snapshot is taken.
       * Entries already loaded are left untouched. */
      if (cos_entry->count_state != COS_LOADED)
	{
	  cos_entry->count_state = COS_TO_LOAD;
	}
    }

  return NO_ERROR;
}
/*
* logtb_tran_reset_cos_func - working function for
* logtb_tran_reset_count_optim_state
*
* return:
* data(in): count optimization state entry.
* args(in): not used.
*
* thread_p(in): thread entry
*/
static int
logtb_tran_reset_cos_func (THREAD_ENTRY * thread_p, void *data, void *args)
{
  /* 'data' is the hash entry being visited; thread_p and args are not used. */
  LOG_TRAN_CLASS_COS *cos_entry = (LOG_TRAN_CLASS_COS *) data;

  cos_entry->count_state = COS_NOT_LOADED;

  return NO_ERROR;
}
/*
* logtb_tran_reset_count_optim_state - reset count optimization state for all
* class statistics instances
*
* return:
*
* thread_p(in): thread entry
*/
void
logtb_tran_reset_count_optim_state (THREAD_ENTRY * thread_p)
{
  LOG_TDES *tran_desc = LOG_FIND_TDES (LOG_FIND_THREAD_TRAN_INDEX (thread_p));

  /* Visit every class entry of the transaction and set it back to COS_NOT_LOADED; the result of the
   * mapping is intentionally not propagated. */
  (void) mht_map_no_key (thread_p, tran_desc->log_upd_stats.classes_cos_hash, logtb_tran_reset_cos_func, NULL);
}
/*
* logtb_create_unique_stats_from_repr - create unique statistics instances
* for all unique indexes of a class
*
* return: error code
*
* thread_p(in) : thread entry
* class_oid(in) : class for which the unique statistics will be created
*/
static int
logtb_create_unique_stats_from_repr (THREAD_ENTRY * thread_p, OID * class_oid)
{
  int rc = NO_ERROR;
  int repr_cache_idx = -1;
  bool failed = false;

  /* the class representation lists all indexes of the class */
  OR_CLASSREP *repr = heap_classrepr_get (thread_p, class_oid, NULL, NULL_REPRID, &repr_cache_idx);
  if (repr == NULL)
    {
      failed = true;
    }
  else
    {
      /* indexes are processed from the last one to the first one */
      for (int pos = repr->n_indexes - 1; pos >= 0; pos--)
	{
	  if (!btree_is_unique_type (repr->indexes[pos].type))
	    {
	      continue;
	    }

	  /* find (or create) the transaction-local statistics of this unique index */
	  LOG_TRAN_BTID_UNIQUE_STATS *btid_stats = logtb_tran_find_btid_stats (thread_p, &repr->indexes[pos].btid, true);
	  if (btid_stats == NULL)
	    {
	      rc = ER_FAILED;
	      failed = true;
	      break;
	    }

	  /* seed them with the current global values */
	  rc = logtb_get_global_unique_stats (thread_p, &btid_stats->btid, &btid_stats->global_stats.num_oids,
					      &btid_stats->global_stats.num_nulls,
					      &btid_stats->global_stats.num_keys);
	  if (rc != NO_ERROR)
	    {
	      failed = true;
	      break;
	    }
	}

      /* the representation is released on success and on failure alike */
      heap_classrepr_free_and_init (repr, &repr_cache_idx);
    }

  if (!failed)
    {
      return NO_ERROR;
    }

  /* Failure without an explicit code: fall back to the last recorded error, then to ER_FAILED. */
  if (rc == NO_ERROR)
    {
      rc = er_errid ();
      if (rc == NO_ERROR)
	{
	  rc = ER_FAILED;
	}
    }
  return rc;
}
#if defined(ENABLE_UNUSED_FUNCTION)
/*
* logtb_find_largest_lsa - largest lsa address of all active transactions
*
* return: LOG_LSA *
*
* Note: Find the largest LSA address of all active transactions.
*/
LOG_LSA *
logtb_find_largest_lsa (THREAD_ENTRY * thread_p)
{
  LOG_LSA *highest = NULL;	/* points at the largest tail_lsa seen so far */

  TR_TABLE_CS_ENTER_READ_MODE (thread_p);

  for (int slot = 0; slot < NUM_TOTAL_TRAN_INDICES; slot++)
    {
      if (slot == LOG_SYSTEM_TRAN_INDEX)
	{
	  continue;
	}

      LOG_TDES *cur = log_Gl.trantable.all_tdes[slot];
      if (cur == NULL || cur->trid == NULL_TRANID || LSA_ISNULL (&cur->tail_lsa))
	{
	  continue;
	}

      if (highest == NULL || LSA_GT (&cur->tail_lsa, highest))
	{
	  highest = &cur->tail_lsa;
	}
    }

  TR_TABLE_CS_EXIT (thread_p);

  /* NOTE(review): the returned pointer refers to a descriptor field and is handed out after the critical
   * section has been left; NULL means no descriptor qualified. */
  return highest;
}
#endif /* ENABLE_UNUSED_FUNCTION */
/*
* logtb_find_smallest_and_largest_active_pages - smallest and largest active pages
*
* return: nothing...
*
* smallest(in/out): smallest active log page
* largest(in/out): largest active log page
*
* Note: Find the smallest and largest active pages.
*/
void
logtb_find_smallest_and_largest_active_pages (THREAD_ENTRY * thread_p, LOG_PAGEID * smallest, LOG_PAGEID * largest)
{
  TR_TABLE_CS_ENTER_READ_MODE (thread_p);

  /* Both outputs stay NULL_PAGEID when no descriptor qualifies. */
  *smallest = NULL_PAGEID;
  *largest = NULL_PAGEID;

  for (int slot = 0; slot < NUM_TOTAL_TRAN_INDICES; slot++)
    {
      if (slot == LOG_SYSTEM_TRAN_INDEX)
	{
	  continue;
	}

      LOG_TDES *cur = log_Gl.trantable.all_tdes[slot];
      if (cur == NULL || cur->trid == NULL_TRANID || LSA_ISNULL (&cur->head_lsa))
	{
	  continue;
	}

      /* lower bound: page of the first log record of the transaction */
      if (*smallest == NULL_PAGEID || cur->head_lsa.pageid < *smallest)
	{
	  *smallest = cur->head_lsa.pageid;
	}

      /* upper bound: the larger of the tail_lsa page and the posp_nxlsa page */
      if (*largest == NULL_PAGEID || cur->tail_lsa.pageid > *largest)
	{
	  *largest = cur->tail_lsa.pageid;
	}
      if (*largest == NULL_PAGEID || cur->posp_nxlsa.pageid > *largest)
	{
	  *largest = cur->posp_nxlsa.pageid;
	}
    }

  TR_TABLE_CS_EXIT (thread_p);
}
/*
* logtb_has_deadlock_priority -
*
* return: whether this transaction has deadlock priority
*/
bool
logtb_has_deadlock_priority (int tran_index)
{
LOG_TDES *tdes; /* Transaction descriptor */
tdes = LOG_FIND_TDES (tran_index);
if (tdes)
{
return tdes->has_deadlock_priority;
}
return false;
}
/*
* logtb_get_new_subtransaction_mvccid - assign a new sub-transaction MVCCID
*
* return: void
*
* thread_p(in): Thread entry
* curr_mvcc_info(in): current MVCC info
*
* Note: If transaction MVCCID is NULL then a new transaction MVCCID is
* allocated first.
*/
void
logtb_get_new_subtransaction_mvccid (THREAD_ENTRY * thread_p, MVCC_INFO * curr_mvcc_info)
{
  MVCCID new_sub_id;

  assert (curr_mvcc_info != NULL);

  if (MVCCID_IS_VALID (curr_mvcc_info->id))
    {
      /* the transaction already has its MVCCID: only the sub-transaction id is needed */
      new_sub_id = log_Gl.mvcc_table.get_new_mvccid ();
    }
  else
    {
      /* the transaction MVCCID must be valid as well, so obtain both ids in a single request */
      log_Gl.mvcc_table.get_two_new_mvccid (curr_mvcc_info->id, new_sub_id);
    }

  logtb_assign_subtransaction_mvccid (thread_p, curr_mvcc_info, new_sub_id);
}
/*
* logtb_assign_subtransaction_mvccid () - Assign sub-transaction MVCCID.
*
* return : Void.
* thread_p (in) : Thread entry.
* curr_mvcc_info (in) : Current transaction MVCC information.
* mvcc_subid (in) : Sub-transaction MVCCID.
*/
static void
logtb_assign_subtransaction_mvccid (THREAD_ENTRY * thread_p, MVCC_INFO * curr_mvcc_info, MVCCID mvcc_subid)
{
  /* A sub-transaction id may only be stacked on top of a valid transaction MVCCID. thread_p is not used. */
  assert (curr_mvcc_info != NULL);
  assert (MVCCID_IS_VALID (curr_mvcc_info->id));

  /* the newest sub-transaction id is always the last element */
  curr_mvcc_info->sub_ids.push_back (mvcc_subid);
}
/*
* logtb_complete_sub_mvcc () - Called at end of sub-transaction
*
* return : Void.
* thread_p (in) : Thread entry.
* tdes (in) : Transaction descriptor.
*/
void
logtb_complete_sub_mvcc (THREAD_ENTRY * thread_p, LOG_TDES * tdes)
{
  assert (tdes != NULL);

  MVCC_INFO *info = &tdes->mvccinfo;

  /* Complete the most recent sub-transaction id in the MVCC table, then drop it from the stack. */
  const MVCCID finished_sub_id = info->sub_ids.back ();
  log_Gl.mvcc_table.complete_sub_mvcc (finished_sub_id);
  info->sub_ids.pop_back ();

  if (!info->snapshot.valid)
    {
      /* no snapshot to keep in sync */
      return;
    }

  /* The parent transaction has not finished yet, so its snapshot is adjusted to reflect the committed
   * sub-transaction. */
  MVCC_SNAPSHOT *snap = &info->snapshot;
  if (finished_sub_id >= snap->highest_completed_mvccid)
    {
      snap->highest_completed_mvccid = finished_sub_id;
      MVCCID_FORWARD (snap->highest_completed_mvccid);
    }
  snap->m_active_mvccs.set_inactive_mvccid (finished_sub_id);
}
/*
* logtb_global_unique_stat_alloc () - allocate a new structure of unique
* statistics for a btree
* returns: new pointer or NULL on error
*/
static void *
logtb_global_unique_stat_alloc (void)
{
GLOBAL_UNIQUE_STATS *unique_stat;
unique_stat = (GLOBAL_UNIQUE_STATS *) malloc (sizeof (GLOBAL_UNIQUE_STATS));
if (unique_stat == NULL)
{
er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_OUT_OF_VIRTUAL_MEMORY, 1, sizeof (GLOBAL_UNIQUE_STATS));
return NULL;
}
pthread_mutex_init (&unique_stat->mutex, NULL);
return (void *) unique_stat;
}
/*
* logtb_global_unique_stat_free () - free a global unique statistics of a btree
* returns: error code or NO_ERROR
* unique_stat(in): global unique statistics entry to free
* (GLOBAL_UNIQUE_STATS)
*/
static int
logtb_global_unique_stat_free (void *unique_stat)
{
if (unique_stat != NULL)
{
pthread_mutex_destroy (&((GLOBAL_UNIQUE_STATS *) unique_stat)->mutex);
free (unique_stat);
return NO_ERROR;
}
else
{
return ER_FAILED;
}
}
/*
* logtb_global_unique_stat_init () - initialize global unique statistics element
* returns: error code or NO_ERROR
* unique_stat(in): global unique statistics element
*/
static int
logtb_global_unique_stat_init (void *unique_stat)
{
GLOBAL_UNIQUE_STATS *unique_stat_p = (GLOBAL_UNIQUE_STATS *) unique_stat;
if (unique_stat == NULL)
{
return ER_FAILED;
}
/* initialize fields */
BTID_SET_NULL (&unique_stat_p->btid);
unique_stat_p->unique_stats.num_nulls = 0;
unique_stat_p->unique_stats.num_keys = 0;
unique_stat_p->unique_stats.num_oids = 0;
LSA_SET_NULL (&unique_stat_p->last_log_lsa);
return NO_ERROR;
}
/*
* logtb_global_unique_stat_key_copy () - copy a global unique statistics key
* returns: error code or NO_ERROR
* src(in): source
* dest(in): destination
*/
static int
logtb_global_unique_stat_key_copy (void *src, void *dest)
{
  /* both ends of the copy are mandatory */
  if (src != NULL && dest != NULL)
    {
      /* the key of an entry is a BTID */
      BTID_COPY ((BTID *) dest, (BTID *) src);
      return NO_ERROR;
    }

  return ER_FAILED;
}
/*
* logtb_initialize_global_unique_stats_table () - Creates and initializes
* global structure for global
* unique statistics
* return: error code
* thread_p (in) :
*/
int
logtb_initialize_global_unique_stats_table (THREAD_ENTRY * thread_p)
{
  int ret = NO_ERROR;
  LF_ENTRY_DESCRIPTOR *edesc = &log_Gl.unique_stats_table.unique_stats_descriptor;

  /* Initialization is idempotent: a second call is a no-op. */
  if (log_Gl.unique_stats_table.initialized)
    {
      return NO_ERROR;
    }

  /* Describe the GLOBAL_UNIQUE_STATS entry layout and callbacks to the lock-free structures. */
  edesc->of_local_next = offsetof (GLOBAL_UNIQUE_STATS, stack);
  edesc->of_next = offsetof (GLOBAL_UNIQUE_STATS, next);
  edesc->of_del_tran_id = offsetof (GLOBAL_UNIQUE_STATS, del_id);
  edesc->of_key = offsetof (GLOBAL_UNIQUE_STATS, btid);
  edesc->of_mutex = offsetof (GLOBAL_UNIQUE_STATS, mutex);
  edesc->using_mutex = LF_EM_USING_MUTEX;
  edesc->max_alloc_cnt = LF_ENTRY_DESCRIPTOR_MAX_ALLOC;
  edesc->f_alloc = logtb_global_unique_stat_alloc;
  edesc->f_free = logtb_global_unique_stat_free;
  edesc->f_init = logtb_global_unique_stat_init;
  edesc->f_uninit = NULL;
  edesc->f_key_copy = logtb_global_unique_stat_key_copy;
  edesc->f_key_cmp = btree_compare_btids;
  edesc->f_hash = btree_hash_btid;
  edesc->f_duplicate = NULL;

  /* initialize freelist */
  ret = lf_freelist_init (&log_Gl.unique_stats_table.unique_stats_freelist, 1, 100, edesc, &global_unique_stats_Ts);
  if (ret != NO_ERROR)
    {
      return ret;
    }

  /* initialize hash table */
  ret =
    lf_hash_init (&log_Gl.unique_stats_table.unique_stats_hash, &log_Gl.unique_stats_table.unique_stats_freelist,
		  GLOBAL_UNIQUE_STATS_HASH_SIZE, edesc);
  if (ret != NO_ERROR)
    {
      /* The hash table failed to initialize, so there is no hash to destroy. What must be released is the
       * freelist that was successfully initialized just above; otherwise it would leak. */
      lf_freelist_destroy (&log_Gl.unique_stats_table.unique_stats_freelist);
      return ret;
    }

  LSA_SET_NULL (&log_Gl.unique_stats_table.curr_rcv_rec_lsa);
  log_Gl.unique_stats_table.initialized = true;

  return ret;
}
/*
* logtb_finalize_global_unique_stats_table () - Finalize global structure for
* global unique statistics
* return: void
* thread_p (in) :
*/
void
logtb_finalize_global_unique_stats_table (THREAD_ENTRY * thread_p)
{
  if (!log_Gl.unique_stats_table.initialized)
    {
      /* never initialized (or already finalized): nothing to tear down */
      return;
    }

  /* the hash table is destroyed first, then the freelist it was initialized with */
  lf_hash_destroy (&log_Gl.unique_stats_table.unique_stats_hash);
  lf_freelist_destroy (&log_Gl.unique_stats_table.unique_stats_freelist);

  log_Gl.unique_stats_table.initialized = false;
}
/*
* logtb_get_global_unique_stats_entry () - returns the entry into the global
* unique statistics associated with
* the given btid
* return: the entry associated with btid or NULL in case of error
* thread_p (in) :
* btid (in) : the btree id for which the entry will be returned
* load_at_creation (in) : if true and there is no entry in hash for btid then
* load statistics from btree header into hash
*
* NOTE: The statistics are searched in the global hash. If they are found
* then the found entry is returned. Otherwise a new entry will be
* created and inserted into hash. If load_at_creation is true then the
* statistics will be loaded from btree header.
*
* NOTE: !!! DO NOT CALL THIS FUNCTION IF YOU HAVE A LATCH ON THE BTREE
* HEADER. THIS CAN CAUSE A DEADLOCK BETWEEN THE LATCH AND THE MUTEX !!!
*/
static GLOBAL_UNIQUE_STATS *
logtb_get_global_unique_stats_entry (THREAD_ENTRY * thread_p, BTID * btid, bool load_at_creation)
{
  LF_TRAN_ENTRY *lf_tran = thread_get_tran_entry (thread_p, THREAD_TS_GLOBAL_UNIQUE_STATS);
  GLOBAL_UNIQUE_STATS *entry = NULL;
  long long hdr_oids, hdr_nulls, hdr_keys;	/* set and used only when load_at_creation is true */

  assert (btid != NULL);

#if !defined(NDEBUG)
  {
    /* debug builds verify that the caller does not hold the b-tree root page fixed */
    VPID root_vpid;

    root_vpid.pageid = btid->root_pageid;
    root_vpid.volid = btid->vfid.volid;
    assert (!pgbuf_is_page_fixed_by_thread (thread_p, &root_vpid));
  }
#endif

  /* fast path: the entry is already in the hash */
  if (lf_hash_find (lf_tran, &log_Gl.unique_stats_table.unique_stats_hash, btid, (void **) &entry) != NO_ERROR)
    {
      return NULL;
    }
  if (entry != NULL)
    {
      return entry;
    }

  /* The statistics are read from the b-tree before the entry is inserted. */
  if (load_at_creation
      && btree_get_unique_statistics (thread_p, btid, &hdr_oids, &hdr_nulls, &hdr_keys) != NO_ERROR)
    {
      return NULL;
    }

  if (lf_hash_find_or_insert (lf_tran, &log_Gl.unique_stats_table.unique_stats_hash, btid, (void **) &entry, NULL)
      != NO_ERROR || entry == NULL)
    {
      return NULL;
    }

  if (load_at_creation)
    {
      entry->unique_stats.num_oids = hdr_oids;
      entry->unique_stats.num_nulls = hdr_nulls;
      entry->unique_stats.num_keys = hdr_keys;
    }

  return entry;
}
/*
* logtb_get_global_unique_stats () - returns the global unique statistics for
* the given btid
* return: error code
* thread_p (in) :
* btid (in) : the btree id for which the statistics will be returned
* num_oids (out) : address of a long long that will receive the global number
* of oids for the given btid
* num_nulls (out) : address of a long long that will receive the global number
* of nulls for the given btid
* num_keys (out) : address of a long long that will receive the global number
* of keys for the given btid
*/
int
logtb_get_global_unique_stats (THREAD_ENTRY * thread_p, BTID * btid, long long *num_oids, long long *num_nulls,
			       long long *num_keys)
{
  assert (btid != NULL);

  /* look the entry up, loading it from the b-tree when it is not cached yet */
  GLOBAL_UNIQUE_STATS *entry = logtb_get_global_unique_stats_entry (thread_p, btid, true);
  if (entry == NULL)
    {
      return ER_FAILED;
    }

  /* copy the three counters out, then release the entry mutex */
  *num_oids = entry->unique_stats.num_oids;
  *num_nulls = entry->unique_stats.num_nulls;
  *num_keys = entry->unique_stats.num_keys;

  pthread_mutex_unlock (&entry->mutex);

  return NO_ERROR;
}
/*
* logtb_rv_update_global_unique_stats_by_abs () - updates the global unique
* statistics associated with
* the given btid by absolute
* values. used for recovery.
* return: error code
* thread_p (in) :
* btid (in) : the btree id for which the statistics will be updated
* num_oids (in) : the new number of oids
* num_nulls (in) : the new number of nulls
* num_keys (in) : the new number of keys
*/
int
logtb_rv_update_global_unique_stats_by_abs (THREAD_ENTRY * thread_p, BTID * btid, long long num_oids,
					    long long num_nulls, long long num_keys)
{
  int error_code = NO_ERROR;
  GLOBAL_UNIQUE_STATS *stats = NULL;

  /* Because we update the statistics with absolute values (this means that we override old values) we don't need to
   * load from btree header old values and, therefore, we give a 'false' value to 'load_at_creation' parameter */
  stats = logtb_get_global_unique_stats_entry (thread_p, btid, false);
  if (stats == NULL)
    {
      return ER_FAILED;
    }

  if (!LSA_ISNULL (&log_Gl.unique_stats_table.curr_rcv_rec_lsa))
    {
      /* Here we assume that we are at recovery stage */
      LSA_COPY (&stats->last_log_lsa, &log_Gl.unique_stats_table.curr_rcv_rec_lsa);
    }

  if (prm_get_bool_value (PRM_ID_LOG_UNIQUE_STATS))
    {
      /* The three counters are long long, so they must be printed with %lld. (Using %d for them mismatches
       * the variadic argument types and also garbles the LSA values that follow.) */
      _er_log_debug (ARG_FILE_LINE,
		     "Update stats for index (%d, %d|%d) to nulls=%lld, oids=%lld, keys=%lld. LSA=%lld|%d.\n",
		     btid->root_pageid, btid->vfid.volid, btid->vfid.fileid, num_nulls, num_oids, num_keys,
		     (long long int) stats->last_log_lsa.pageid, (int) stats->last_log_lsa.offset);
    }

  /* overwrite the cached counters with the absolute values */
  stats->unique_stats.num_oids = num_oids;
  stats->unique_stats.num_nulls = num_nulls;
  stats->unique_stats.num_keys = num_keys;

  /* release the entry mutex before returning */
  pthread_mutex_unlock (&stats->mutex);

  return error_code;
}
/*
* logtb_update_global_unique_stats_by_delta () - updates the global unique
* statistics associated with the
* given btid by delta values
* return: error code
* thread_p (in) :
* btid (in) : the btree id for which the statistics will be updated
* oid_delta (in) : the delta of oids that will be added
* null_delta (in) : the delta of nulls that will be added
* key_delta (in) : the delta of keys that will be added
* log (in) : true if we need to log the changes
*/
int
logtb_update_global_unique_stats_by_delta (THREAD_ENTRY * thread_p, BTID * btid, long long oid_delta,
long long null_delta, long long key_delta, bool log)
{
int error_code = NO_ERROR;
GLOBAL_UNIQUE_STATS *stats = NULL;
LOG_TDES *tdes = LOG_FIND_CURRENT_TDES (thread_p);
long long num_oids, num_nulls, num_keys;
/* nothing changes when all three deltas are zero: skip the lookup and the logging */
if (oid_delta == 0 && key_delta == 0 && null_delta == 0)
{
return NO_ERROR;
}
/* find the entry, loading the statistics from the b-tree if it has to be created */
stats = logtb_get_global_unique_stats_entry (thread_p, btid, true);
if (stats == NULL)
{
return ER_FAILED;
}
/* new absolute values; they are stored into the entry only at the end of the function */
num_oids = stats->unique_stats.num_oids + oid_delta;
num_nulls = stats->unique_stats.num_nulls + null_delta;
num_keys = stats->unique_stats.num_keys + key_delta;
if (log)
{
RECDES undo_rec, redo_rec;
/* each buffer has BTREE_MAX_ALIGN spare bytes so that the record data can start at an aligned address */
char undo_rec_buf[(3 * OR_BIGINT_SIZE) + OR_BTID_ALIGNED_SIZE + BTREE_MAX_ALIGN], *datap = NULL;
char redo_rec_buf[(3 * OR_BIGINT_SIZE) + OR_BTID_ALIGNED_SIZE + BTREE_MAX_ALIGN];
/* although we don't change the btree header, we still need to log here the new values of statistics so that they
* can be recovered at recover stage. For undo purposes we log the increments. */
/* undo record layout: BTID, null delta, oid delta, key delta */
undo_rec.data = NULL;
undo_rec.area_size = 3 * OR_BIGINT_SIZE + OR_BTID_ALIGNED_SIZE;
undo_rec.data = PTR_ALIGN (undo_rec_buf, BTREE_MAX_ALIGN);
undo_rec.length = 0;
datap = (char *) undo_rec.data;
OR_PUT_BTID (datap, btid);
datap += OR_BTID_ALIGNED_SIZE;
OR_PUT_BIGINT (datap, &null_delta);
datap += OR_BIGINT_SIZE;
OR_PUT_BIGINT (datap, &oid_delta);
datap += OR_BIGINT_SIZE;
OR_PUT_BIGINT (datap, &key_delta);
datap += OR_BIGINT_SIZE;
undo_rec.length = CAST_BUFLEN (datap - undo_rec.data);
/* redo record layout: BTID, new number of nulls, new number of oids, new number of keys */
redo_rec.data = NULL;
redo_rec.area_size = 3 * OR_BIGINT_SIZE + OR_BTID_ALIGNED_SIZE;
redo_rec.data = PTR_ALIGN (redo_rec_buf, BTREE_MAX_ALIGN);
redo_rec.length = 0;
datap = (char *) redo_rec.data;
OR_PUT_BTID (datap, btid);
datap += OR_BTID_ALIGNED_SIZE;
OR_PUT_BIGINT (datap, &num_nulls);
datap += OR_BIGINT_SIZE;
OR_PUT_BIGINT (datap, &num_oids);
datap += OR_BIGINT_SIZE;
OR_PUT_BIGINT (datap, &num_keys);
datap += OR_BIGINT_SIZE;
redo_rec.length = CAST_BUFLEN (datap - redo_rec.data);
log_append_undoredo_data2 (thread_p, RVBT_LOG_GLOBAL_UNIQUE_STATS_COMMIT, NULL, NULL, HEADER, undo_rec.length,
redo_rec.length, undo_rec.data, redo_rec.data);
/* remember the transaction's tail LSA after the append as the last logged change of this entry */
/* NOTE(review): tdes is dereferenced here without a NULL check - confirm callers always have a descriptor */
LSA_COPY (&stats->last_log_lsa, &tdes->tail_lsa);
}
else if (!LSA_ISNULL (&log_Gl.unique_stats_table.curr_rcv_rec_lsa))
{
/* Here we assume that we are at recovery stage */
LSA_COPY (&stats->last_log_lsa, &log_Gl.unique_stats_table.curr_rcv_rec_lsa);
}
/* optional debug trace of the deltas, the resulting values and the recorded LSA */
if (prm_get_bool_value (PRM_ID_LOG_UNIQUE_STATS))
{
_er_log_debug (ARG_FILE_LINE,
"Update stats for index (%d, %d|%d) by nulls=%lld, "
"oids=%lld, keys=%lld to nulls=%lld, oids=%lld, keys=%lld. LSA=%lld|%d.\n", btid->root_pageid,
btid->vfid.volid, btid->vfid.fileid, null_delta, oid_delta, key_delta, num_nulls, num_oids,
num_keys, (long long int) stats->last_log_lsa.pageid, (int) stats->last_log_lsa.offset);
}
/* publish the new values into the entry */
stats->unique_stats.num_oids = num_oids;
stats->unique_stats.num_nulls = num_nulls;
stats->unique_stats.num_keys = num_keys;
/* release the entry mutex (presumably acquired by the hash lookup, since the entry descriptor is set up with
* LF_EM_USING_MUTEX - TODO confirm) */
pthread_mutex_unlock (&stats->mutex);
return error_code;
}
/*
* logtb_delete_global_unique_stats () - deletes the entry associated with
* the given btid from global unique
* statistics hash
* return: error code
* thread_p (in) :
* btid (in) : the btree id for which the statistics entry will be deleted
*/
int
logtb_delete_global_unique_stats (THREAD_ENTRY * thread_p, BTID * btid)
{
  LF_TRAN_ENTRY *lf_tran = thread_get_tran_entry (thread_p, THREAD_TS_GLOBAL_UNIQUE_STATS);

  assert (!BTID_IS_NULL (btid));

#if !defined(NDEBUG)
  {
    /* debug builds verify that the caller does not hold the b-tree root page fixed */
    VPID root_vpid;

    root_vpid.pageid = btid->root_pageid;
    root_vpid.volid = btid->vfid.volid;
    assert (!pgbuf_is_page_fixed_by_thread (thread_p, &root_vpid));
  }
#endif

  /* remove the entry keyed by btid; a failure of the hash delete is passed through unchanged */
  int rc = lf_hash_delete (lf_tran, &log_Gl.unique_stats_table.unique_stats_hash, btid, NULL);

  return (rc != NO_ERROR) ? rc : NO_ERROR;
}
/*
* logtb_reflect_global_unique_stats_to_btree () - reflects the global
* statistics into the btree
* header
* return: error code
* thread_p (in) :
*/
int
logtb_reflect_global_unique_stats_to_btree (THREAD_ENTRY * thread_p)
{
  int rc = NO_ERROR;
  LF_HASH_TABLE_ITERATOR iter;
  LF_TRAN_ENTRY *lf_tran = thread_get_tran_entry (thread_p, THREAD_TS_GLOBAL_UNIQUE_STATS);
  GLOBAL_UNIQUE_STATS *entry = NULL;

  if (!log_Gl.unique_stats_table.initialized)
    {
      /* no table, nothing to reflect */
      return NO_ERROR;
    }

  // reflecting stats should not be interrupted
  bool saved_check_interrupt = logtb_set_check_interrupt (thread_p, false);

  lf_hash_create_iterator (&iter, lf_tran, &log_Gl.unique_stats_table.unique_stats_hash);

  while ((entry = (GLOBAL_UNIQUE_STATS *) lf_hash_iterate (&iter)) != NULL)
    {
      /* With logging enabled, entries without a logged change (NULL last_log_lsa) are skipped. */
      if (!log_is_no_logging () && LSA_ISNULL (&entry->last_log_lsa))
	{
	  continue;
	}

      rc = btree_reflect_global_unique_statistics (thread_p, entry, false);
      if (rc != NO_ERROR)
	{
	  ASSERT_ERROR ();
	  // the iteration is abandoned: unlock the current entry and finish the lock-free transaction
	  pthread_mutex_unlock (&entry->mutex);
	  lf_tran_end_with_mb (lf_tran);
	  break;
	}

      /* reflected: mark the entry as having no pending logged change */
      LSA_SET_NULL (&entry->last_log_lsa);
    }

  /* restore the interrupt checking mode on the success and the failure path alike */
  (void) logtb_set_check_interrupt (thread_p, saved_check_interrupt);

  return rc;
}
/*
* xlogtb_does_active_user_exist - check whether the specified user is an active user
* or not. An active user is one that is currently in the trantable.
*
* return: true if such a user exists, false otherwise
* thread_p(in):
* user_name(in): the specified user name
*
*/
bool
xlogtb_does_active_user_exist (THREAD_ENTRY * thread_p, const char *user_name)
{
  bool found = false;

  TR_TABLE_CS_ENTER_READ_MODE (thread_p);

  /* Scan every slot (the system slot included) and stop at the first matching active user. */
  for (int slot = 0; !found && slot < NUM_TOTAL_TRAN_INDICES; slot++)
    {
      LOG_TDES *cur = log_Gl.trantable.all_tdes[slot];

      if (cur == NULL || !cur->is_user_active)
	{
	  continue;
	}

      /* user names are compared exactly (case-sensitive strcmp) */
      if (strcmp (cur->client.get_db_user (), user_name) == 0)
	{
	  found = true;
	}
    }

  TR_TABLE_CS_EXIT (thread_p);

  return found;
}
#if !defined (NDEBUG) && !defined (WINDOWS)
/*
 * logtb_collect_local_clients () - collect the process ids of clients connected from this host
 *
 * return: number of collected pids, or ER_FAILED when the table cannot be allocated
 *   local_clients_pids(out): malloc'ed, zero-filled table of NUM_TOTAL_TRAN_INDICES ints holding the pids;
 *                            NULL on failure (presumably to be freed by the caller - confirm)
 *
 * Note: a client is considered local when its host name equals boot_Host_name or "localhost".
 */
int
logtb_collect_local_clients (int **local_clients_pids)
{
  *local_clients_pids = NULL;

  const size_t table_size = NUM_TOTAL_TRAN_INDICES * sizeof (int);
  int *pids = (int *) malloc (table_size);
  if (pids == NULL)
    {
      return ER_FAILED;
    }
  memset (pids, 0, table_size);

  int count = 0;
  for (int idx = 0; idx < NUM_TOTAL_TRAN_INDICES; idx++)
    {
      LOG_TDES *tdes = log_Gl.trantable.all_tdes[idx];	/* Transaction descriptor */

      if (tdes == NULL || tdes->client.process_id <= 0 || tdes->client.host_name.empty ())
	{
	  continue;
	}

      const bool is_local = (strcmp (tdes->client.get_host_name (), boot_Host_name) == 0
			     || strcmp (tdes->client.get_host_name (), "localhost") == 0);
      if (is_local)
	{
	  pids[count++] = tdes->client.process_id;
	}
    }

  *local_clients_pids = pids;

  return count;
}
#endif /* !defined (NDEBUG) && !defined (WINDOWS) */
/*
* logtb_descriptors_start_scan () - start scan function for tran descriptors; builds one tuple of num_cols values
*                                   for every assigned transaction descriptor
* return: NO_ERROR, or ER_code
*
* thread_p(in): thread entry
* type(in): not referenced by this function
* arg_values(in): not referenced by this function
* arg_cnt(in): not referenced by this function
* ptr(in/out): set to NULL on entry; receives the filled array context on success
*
* Note: the transaction table is read under TR_TABLE_CS in read mode. On failure the critical section is left, the
*       array context is freed and *ptr stays NULL.
*/
int
logtb_descriptors_start_scan (THREAD_ENTRY * thread_p, int type, DB_VALUE ** arg_values, int arg_cnt, void **ptr)
{
SHOWSTMT_ARRAY_CONTEXT *ctx = NULL;
int i, idx, msecs, error = NO_ERROR;
char buf[512];
const char *str;
time_t tval;
INT64 i64val;
XASL_ID xasl_val;
DB_DATETIME time_val;
void *ptr_val;
LOG_TDES *tdes;
DB_VALUE *vals = NULL;
/* number of columns per tuple; must match the number of values filled below (asserted at the end of the loop) */
const int num_cols = 46;
*ptr = NULL;
ctx = showstmt_alloc_array_context (thread_p, NUM_TOTAL_TRAN_INDICES, num_cols);
if (ctx == NULL)
{
error = er_errid ();
return error;
}
TR_TABLE_CS_ENTER_READ_MODE (thread_p);
for (i = 0; i < NUM_TOTAL_TRAN_INDICES; i++)
{
tdes = log_Gl.trantable.all_tdes[i];
if (tdes == NULL || tdes->trid == NULL_TRANID)
{
/* The index is not assigned or is system transaction (no-client) */
continue;
}
/* idx is the column cursor inside the current tuple; it is advanced after each value is set */
idx = 0;
vals = showstmt_alloc_tuple_in_context (thread_p, ctx);
if (vals == NULL)
{
error = er_errid ();
goto exit_on_error;
}
/* Tran_index */
db_make_int (&vals[idx], tdes->tran_index);
idx++;
/* Tran_id */
db_make_int (&vals[idx], tdes->trid);
idx++;
/* Is_loose_end */
db_make_int (&vals[idx], tdes->isloose_end);
idx++;
/* State */
db_make_string (&vals[idx], log_state_short_string (tdes->state));
idx++;
/* isolation */
db_make_string (&vals[idx], log_isolation_string (tdes->isolation));
idx++;
/* Wait_msecs */
db_make_int (&vals[idx], tdes->wait_msecs);
idx++;
/* Head_lsa */
/* LSA columns are formatted into the shared buf, so the value must be a copy of the string */
lsa_to_string (buf, sizeof (buf), &tdes->head_lsa);
error = db_make_string_copy (&vals[idx], buf);
idx++;
if (error != NO_ERROR)
{
goto exit_on_error;
}
/* Tail_lsa */
lsa_to_string (buf, sizeof (buf), &tdes->tail_lsa);
error = db_make_string_copy (&vals[idx], buf);
idx++;
if (error != NO_ERROR)
{
goto exit_on_error;
}
/* Undo_next_lsa */
lsa_to_string (buf, sizeof (buf), &tdes->undo_nxlsa);
error = db_make_string_copy (&vals[idx], buf);
idx++;
if (error != NO_ERROR)
{
goto exit_on_error;
}
/* Postpone_next_lsa */
lsa_to_string (buf, sizeof (buf), &tdes->posp_nxlsa);
error = db_make_string_copy (&vals[idx], buf);
idx++;
if (error != NO_ERROR)
{
goto exit_on_error;
}
/* Savepoint_lsa */
lsa_to_string (buf, sizeof (buf), &tdes->savept_lsa);
error = db_make_string_copy (&vals[idx], buf);
idx++;
if (error != NO_ERROR)
{
goto exit_on_error;
}
/* Topop_lsa */
lsa_to_string (buf, sizeof (buf), &tdes->topop_lsa);
error = db_make_string_copy (&vals[idx], buf);
idx++;
if (error != NO_ERROR)
{
goto exit_on_error;
}
/* Tail_top_result_lsa */
lsa_to_string (buf, sizeof (buf), &tdes->tail_topresult_lsa);
error = db_make_string_copy (&vals[idx], buf);
idx++;
if (error != NO_ERROR)
{
goto exit_on_error;
}
/* Client_id */
db_make_int (&vals[idx], tdes->client_id);
idx++;
/* Client_type */
str = boot_client_type_to_string ((BOOT_CLIENT_TYPE) tdes->client.client_type);
error = db_make_string_copy (&vals[idx], str);
idx++;
if (error != NO_ERROR)
{
goto exit_on_error;
}
/* Client_info */
error = db_make_string_copy (&vals[idx], tdes->client.get_client_info ());
idx++;
if (error != NO_ERROR)
{
goto exit_on_error;
}
/* Client_db_user */
error = db_make_string_copy (&vals[idx], tdes->client.get_db_user ());
idx++;
if (error != NO_ERROR)
{
goto exit_on_error;
}
/* Client_program */
error = db_make_string_copy (&vals[idx], tdes->client.get_program_name ());
idx++;
if (error != NO_ERROR)
{
goto exit_on_error;
}
/* Client_login_user */
error = db_make_string_copy (&vals[idx], tdes->client.get_login_name ());
idx++;
if (error != NO_ERROR)
{
goto exit_on_error;
}
/* Client_host */
error = db_make_string_copy (&vals[idx], tdes->client.get_host_name ());
idx++;
if (error != NO_ERROR)
{
goto exit_on_error;
}
/* Client_pid */
db_make_int (&vals[idx], tdes->client.process_id);
idx++;
/* Topop_depth */
db_make_int (&vals[idx], tdes->topops.last + 1);
idx++;
/* Num_unique_btrees */
db_make_int (&vals[idx], tdes->num_unique_btrees);
idx++;
/* Max_unique_btrees */
db_make_int (&vals[idx], tdes->max_unique_btrees);
idx++;
/* Interrupt */
db_make_int (&vals[idx], tdes->interrupt);
idx++;
/* Num_transient_classnames */
db_make_int (&vals[idx], tdes->num_transient_classnames);
idx++;
/* Repl_max_records */
db_make_int (&vals[idx], tdes->num_repl_records);
idx++;
/* Repl_records */
/* pointer-valued columns are shown as a hexadecimal address string, or NULL when the pointer is NULL */
ptr_val = tdes->repl_records;
if (ptr_val == NULL)
{
db_make_null (&vals[idx]);
}
else
{
snprintf (buf, sizeof (buf), "0x%08" PRIx64, (UINT64) ptr_val);
error = db_make_string_copy (&vals[idx], buf);
if (error != NO_ERROR)
{
goto exit_on_error;
}
}
idx++;
/* Repl_current_index */
db_make_int (&vals[idx], tdes->cur_repl_record);
idx++;
/* Repl_append_index */
db_make_int (&vals[idx], tdes->append_repl_recidx);
idx++;
/* Repl_flush_marked_index */
db_make_int (&vals[idx], tdes->fl_mark_repl_recidx);
idx++;
/* Repl_insert_lsa */
lsa_to_string (buf, sizeof (buf), &tdes->repl_insert_lsa);
error = db_make_string_copy (&vals[idx], buf);
idx++;
if (error != NO_ERROR)
{
goto exit_on_error;
}
/* Repl_update_lsa */
lsa_to_string (buf, sizeof (buf), &tdes->repl_update_lsa);
error = db_make_string_copy (&vals[idx], buf);
idx++;
if (error != NO_ERROR)
{
goto exit_on_error;
}
/* First_save_entry */
ptr_val = tdes->first_save_entry;
if (ptr_val == NULL)
{
db_make_null (&vals[idx]);
}
else
{
snprintf (buf, sizeof (buf), "0x%08" PRIx64, (UINT64) ptr_val);
error = db_make_string_copy (&vals[idx], buf);
if (error != NO_ERROR)
{
goto exit_on_error;
}
}
idx++;
/* Tran_unique_stats */
if (tdes->m_multiupd_stats.empty ())
{
db_make_null (&vals[idx]);
}
else
{
string_buffer strbuf (cubmem::PRIVATE_BLOCK_ALLOCATOR);
tdes->m_multiupd_stats.to_string (strbuf);
/* the released buffer is handed over to the value; need_clear marks it to be freed when the value is cleared */
error = db_make_string (&vals[idx], strbuf.release_ptr ());
vals[idx].need_clear = true;
if (error != NO_ERROR)
{
goto exit_on_error;
}
}
idx++;
/* modified class list */
if (tdes->m_modified_classes.empty ())
{
db_make_null (&vals[idx]);
}
else
{
/* NOTE(review): need_clear is set, so to_string () presumably returns a buffer the value may own - confirm */
(void) db_make_string (&vals[idx], tdes->m_modified_classes.to_string ());
vals[idx].need_clear = true;
}
idx++;
/* Num_temp_files */
/* NOTE(review): the count is requested through thread_p, not through tdes - confirm it is the listed transaction's */
db_make_int (&vals[idx], file_get_tran_num_temp_files (thread_p));
idx++;
/* Waiting_for_res */
ptr_val = tdes->waiting_for_res;
if (ptr_val == NULL)
{
db_make_null (&vals[idx]);
}
else
{
snprintf (buf, sizeof (buf), "0x%08" PRIx64, (UINT64) ptr_val);
error = db_make_string_copy (&vals[idx], buf);
if (error != NO_ERROR)
{
goto exit_on_error;
}
}
idx++;
/* Has_deadlock_priority */
db_make_int (&vals[idx], tdes->has_deadlock_priority);
idx++;
/* Suppress_replication */
db_make_int (&vals[idx], tdes->suppress_replication);
idx++;
/* Query_timeout */
/* time columns: the stored value is split into value / 1000 and value % 1000 (seconds and milliseconds, as the
* names tval and msecs suggest) and shown as a local datetime; non-positive values are shown as NULL */
i64val = tdes->query_timeout;
if (i64val <= 0)
{
db_make_null (&vals[idx]);
}
else
{
tval = i64val / 1000;
msecs = i64val % 1000;
db_localdatetime_msec (&tval, msecs, &time_val);
db_make_datetime (&vals[idx], &time_val);
}
idx++;
/* Query_start_time */
i64val = tdes->query_start_time;
if (i64val <= 0)
{
db_make_null (&vals[idx]);
}
else
{
tval = i64val / 1000;
msecs = i64val % 1000;
db_localdatetime_msec (&tval, msecs, &time_val);
db_make_datetime (&vals[idx], &time_val);
}
idx++;
/* Tran_start_time */
i64val = tdes->tran_start_time;
if (i64val <= 0)
{
db_make_null (&vals[idx]);
}
else
{
tval = i64val / 1000;
msecs = i64val % 1000;
db_localdatetime_msec (&tval, msecs, &time_val);
db_make_datetime (&vals[idx], &time_val);
}
idx++;
/* Xasl_id */
/* work on a local copy of the descriptor's XASL id */
XASL_ID_COPY (&xasl_val, &tdes->xasl_id);
if (XASL_ID_IS_NULL (&xasl_val))
{
db_make_null (&vals[idx]);
}
else
{
snprintf (buf, sizeof (buf), "sha1 = %08x | %08x | %08x | %08x | %08x, time_stored = %d sec %d usec",
SHA1_AS_ARGS (&xasl_val.sha1), CACHE_TIME_AS_ARGS (&xasl_val.time_stored));
error = db_make_string_copy (&vals[idx], buf);
if (error != NO_ERROR)
{
goto exit_on_error;
}
}
idx++;
/* Disable_modifications */
db_make_int (&vals[idx], tdes->disable_modifications);
idx++;
/* Abort_reason */
str = tran_abort_reason_to_string (tdes->tran_abort_reason);
db_make_string (&vals[idx], str);
idx++;
/* every column of the tuple must have been filled */
assert (idx == num_cols);
}
TR_TABLE_CS_EXIT (thread_p);
*ptr = ctx;
return NO_ERROR;
exit_on_error:
/* reached only from inside the loop, i.e. while TR_TABLE_CS is still held */
TR_TABLE_CS_EXIT (thread_p);
if (ctx != NULL)
{
showstmt_free_array_context (thread_p, ctx);
}
return error;
}
/*
 * tran_abort_reason_to_string() - map a transaction abort reason to its constant string alias
 *
 *   return: constant string; "UNKNOWN" for a value that is not handled below
 *
 *   val(in): the enum value
 */
const char *
tran_abort_reason_to_string (TRAN_ABORT_REASON val)
{
  const char *alias = "UNKNOWN";

  switch (val)
    {
    case TRAN_NORMAL:
      alias = "NORMAL";
      break;
    case TRAN_ABORT_DUE_DEADLOCK:
      alias = "ABORT_DUE_DEADLOCK";
      break;
    case TRAN_ABORT_DUE_ROLLBACK_ON_ESCALATION:
      alias = "ABORT_DUE_ROLLBACK_ON_ESCALATION";
      break;
    }

  return alias;
}
/*
* logtb_check_class_for_rr_isolation_err () - Check if the class have to be checked against serializable conflicts
*
* return : true if the class is not root/trigger/user class, otherwise false
* class_oid (in) : Class object identifier.
*
* Note: Do not check system classes that are not part of catalog for rr isolation level error. Isolation consistency
* is secured using locks anyway. These classes are in a way related to table schema's and can be accessed
* before the actual classes. db_user instances are fetched to check authorizations, while db_root and _db_trigger
* are accessed when triggers are modified.
* The RR isolation has to check if an instance that we want to lock was modified by concurrent transaction.
* If the instance was modified, then this means we have an isolation conflict. The check must verify last
* instance version visibility over transaction snapshot. The version is visible if and only if it was not
* modified by concurrent transaction. To check visibility, we must first generate a transaction snapshot.
* Since instances from these classes are accessed before locking tables, the snapshot is generated before
* transaction is blocked on table lock. The results will then seem to be inconsistent with most cases when table
* locks are acquired before snapshot.
*/
bool
logtb_check_class_for_rr_isolation_err (const OID * class_oid)
{
  assert (class_oid != NULL && !OID_ISNULL (class_oid));

  /* the cached root, user and trigger classes are exempt; any other class has to be checked */
  const bool is_exempt_class = (oid_check_cached_class_oid (OID_CACHE_DB_ROOT_CLASS_ID, class_oid)
				|| oid_check_cached_class_oid (OID_CACHE_USER_CLASS_ID, class_oid)
				|| oid_check_cached_class_oid (OID_CACHE_TRIGGER_CLASS_ID, class_oid));

  return !is_exempt_class;
}
/*
* logtb_slam_transaction () - interrupt the given transaction and, in server mode, shut down its connection
*
* thread_p(in): thread entry
* tran_index(in): index of the transaction to slam
*
* Note: ER_CSS_CONN_SHUTDOWN is always set as an error by this function.
*/
void
logtb_slam_transaction (THREAD_ENTRY * thread_p, int tran_index)
{
/* raise the interrupt flag of the target transaction */
logtb_set_tran_index_interrupt (thread_p, tran_index, true);
er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_CSS_CONN_SHUTDOWN, 0);
#if defined (SERVER_MODE)
/* only a server has connections to shut down; LOGTB_RETRY_SLAM_MAX_TIMES is passed as the second argument */
css_shutdown_conn_by_tran_index (tran_index, LOGTB_RETRY_SLAM_MAX_TIMES);
#endif // SERVER_MODE
}
/*
 * logtb_check_kill_tran_auth () - User who is not DBA can kill only own transaction
 *   return: NO_ERROR, or ER_CSS_KILL_UNKNOWN_TRANSACTION when either client name cannot be found
 *   thread_p(in): thread entry of the requester
 *   tran_id(in): identifier of the target transaction, as accepted by logtb_find_client_name
 *   has_authorization(out): true if the requester is a DBA client or its client name matches the target's
 *                           (case-insensitive comparison); false otherwise, including on error
 */
static int
logtb_check_kill_tran_auth (THREAD_ENTRY * thread_p, int tran_id, bool * has_authorization)
{
  assert (has_authorization);

  *has_authorization = false;

  /* DBA clients are always authorized */
  if (logtb_am_i_dba_client (thread_p) == true)
    {
      *has_authorization = true;
      return NO_ERROR;
    }

  const char *target_name = logtb_find_client_name (tran_id);
  const char *requester_name = logtb_find_current_client_name (thread_p);

  if (target_name == NULL || requester_name == NULL)
    {
      return ER_CSS_KILL_UNKNOWN_TRANSACTION;
    }

  *has_authorization = (strcasecmp (target_name, requester_name) == 0);

  return NO_ERROR;
}
/*
 * xlogtb_kill_tran_index() - Kill given transaction.
 *   return: NO_ERROR once the transaction is gone (or no longer matches) after having been slammed,
 *           ER_CSS_KILL_BAD_INTERFACE, ER_KILL_TR_NOT_ALLOWED, ER_CSS_KILL_UNKNOWN_TRANSACTION,
 *           ER_CSS_KILL_DOES_NOTMATCH, or ER_FAILED when all retries are used up
 *   thread_p(in): thread entry
 *   kill_tran_index(in): index of the transaction to kill
 *   kill_user(in): expected client user name of the transaction
 *   kill_host(in): expected client host name of the transaction
 *   kill_pid(in): expected client process id of the transaction
 *
 * Note: the transaction is slammed at most LOGTB_RETRY_SLAM_MAX_TIMES times, sleeping one second after each
 *       attempt, and only while its client user/host/pid still match the given ones.
 */
int
xlogtb_kill_tran_index (THREAD_ENTRY * thread_p, int kill_tran_index, char *kill_user_p, char *kill_host_p,
			int kill_pid)
{
  const char *cur_progname_p;	/* Client program name for tran */
  const char *cur_user_p;	/* Client user name for tran */
  const char *cur_host_p;	/* Client host for tran */
  int cur_pid;			/* Client process id for tran */
  bool was_slammed = false;	/* true once the transaction has been slammed at least once */
  size_t attempt;

  if (kill_tran_index == NULL_TRAN_INDEX || kill_user_p == NULL || kill_host_p == NULL || strcmp (kill_user_p, "") == 0
      || strcmp (kill_host_p, "") == 0)
    {
      /*
       * Not enough information to kill specific transaction..
       *
       * For now.. I am setting an er_set..since I have so many files out..and
       * I cannot compile more junk..
       */
      er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_CSS_KILL_BAD_INTERFACE, 0);
      return ER_CSS_KILL_BAD_INTERFACE;
    }

  if (kill_tran_index == LOG_SYSTEM_TRAN_INDEX)
    {
      // cannot kill system transaction; not even if this is dba
      er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_KILL_TR_NOT_ALLOWED, 1, kill_tran_index);
      return ER_KILL_TR_NOT_ALLOWED;
    }

  for (attempt = 0; attempt < LOGTB_RETRY_SLAM_MAX_TIMES; attempt++)
    {
      if (logtb_find_client_name_host_pid (kill_tran_index, &cur_progname_p, &cur_user_p, &cur_host_p, &cur_pid) !=
	  NO_ERROR)
	{
	  if (was_slammed)
	    {
	      /* it was there when we slammed it and now it is gone: killed */
	      return NO_ERROR;
	    }
	  er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_CSS_KILL_UNKNOWN_TRANSACTION, 4, kill_tran_index,
		  kill_user_p, kill_host_p, kill_pid);
	  return ER_CSS_KILL_UNKNOWN_TRANSACTION;
	}

      const bool is_same_client = (kill_pid == cur_pid && strcmp (kill_user_p, cur_user_p) == 0
				   && strcmp (kill_host_p, cur_host_p) == 0);
      if (!is_same_client)
	{
	  if (was_slammed)
	    {
	      /* the index no longer belongs to the client we slammed: killed */
	      return NO_ERROR;
	    }
	  er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_CSS_KILL_DOES_NOTMATCH, 8, kill_tran_index, kill_user_p,
		  kill_host_p, kill_pid, kill_tran_index, cur_user_p, cur_host_p, cur_pid);
	  return ER_CSS_KILL_DOES_NOTMATCH;
	}

      logtb_slam_transaction (thread_p, kill_tran_index);
      was_slammed = true;

      thread_sleep_for (std::chrono::seconds (1));
    }

  /* timeout */
  return ER_FAILED;
}
/*
 * xlogtb_kill_or_interrupt_tran() - kill a transaction or only interrupt it
 *   return: NO_ERROR, ER_KILL_TR_NOT_ALLOWED, the error of the authorization check, or ER_FAILED (the connection
 *           could not be shut down, or the transaction does not exist)
 *   thread_p(in): thread entry of the requester
 *   tran_index(in): index of the target transaction; the system transaction is never allowed
 *   is_dba_group_member(in): true to skip the "own transaction only" authorization check
 *   interrupt_only(in): true to only set the interrupt flag of the transaction (KILLSTMT_QUERY);
 *                       false to kill it (KILLSTMT_TRAN), which in server mode shuts down its connection
 *
 * Note: the function sleeps for one second before returning its final result.
 */
int
xlogtb_kill_or_interrupt_tran (THREAD_ENTRY * thread_p, int tran_index, bool is_dba_group_member, bool interrupt_only)
{
  bool has_authorization;
  bool is_trx_exists;
  KILLSTMT_TYPE kill_type;
  int error;

  if (tran_index == LOG_SYSTEM_TRAN_INDEX)
    {
      // cannot kill system transaction; not even if this is dba
      er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_KILL_TR_NOT_ALLOWED, 1, tran_index);
      return ER_KILL_TR_NOT_ALLOWED;
    }

  if (!is_dba_group_member)
    {
      error = logtb_check_kill_tran_auth (thread_p, tran_index, &has_authorization);
      if (error != NO_ERROR)
	{
	  return error;
	}

      if (has_authorization == false)
	{
	  er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_KILL_TR_NOT_ALLOWED, 1, tran_index);
	  return ER_KILL_TR_NOT_ALLOWED;
	}
    }

  kill_type = interrupt_only ? KILLSTMT_QUERY : KILLSTMT_TRAN;
  if (kill_type == KILLSTMT_TRAN)
    {
#if defined (SERVER_MODE)
      /* remember whether the transaction exists before its connection is shut down */
      LOG_TDES *tdes;

      is_trx_exists = false;
      if (log_Gl.trantable.area != NULL)
	{
	  tdes = LOG_FIND_TDES (tran_index);
	  if (tdes != NULL && tdes->trid != NULL_TRANID)
	    {
	      is_trx_exists = true;
	    }
	}

      if (css_shutdown_conn_by_tran_index (tran_index, LOGTB_RETRY_SLAM_MAX_TIMES) != NO_ERROR)
	{
	  return ER_FAILED;
	}
#else
      /* no connection to shut down outside server mode; fall back to interrupting the transaction */
      is_trx_exists = logtb_set_tran_index_interrupt (thread_p, tran_index, true);
#endif
    }
  else
    {
      is_trx_exists = logtb_set_tran_index_interrupt (thread_p, tran_index, true);
    }

  thread_sleep_for (std::chrono::seconds (1));

  if (is_trx_exists == false)
    {
      /*
       * Note that the following error will be ignored by
       * sthread_kill_or_interrupt_tran().
       */
      return ER_FAILED;
    }

  return NO_ERROR;
}
//
// logtb_find_thread_entry_mapfunc - function mapped over thread manager's entries to find a thread belonging to
//                                   the given transaction index
//
// thread_ref (in)   : thread entry being visited
// stop_mapper (out) : set to true when the entry is accepted, to stop mapping; left untouched otherwise
// tran_index (in)   : searched transaction index
// except_me (in)    : true to skip the entry that is on the current thread, false to accept it as well
// found_ptr (out)   : receives the pointer to the accepted thread entry; left untouched otherwise
//
static void
logtb_find_thread_entry_mapfunc (THREAD_ENTRY & thread_ref, bool & stop_mapper, int tran_index, bool except_me,
				 REFPTR (THREAD_ENTRY, found_ptr))
{
  const bool is_match = (thread_ref.tran_index == tran_index
			 && !(except_me && thread_ref.is_on_current_thread ()));

  if (is_match)
    {
      found_ptr = &thread_ref;
      stop_mapper = true;	// stop searching
    }
}
//
// logtb_find_thread_by_tran_index - find a thread entry by transaction index; the current thread is accepted too
//
// return          : NULL or pointer to found thread
// tran_index (in) : searched transaction index
//
THREAD_ENTRY *
logtb_find_thread_by_tran_index (int tran_index)
{
  const bool except_me = false;
  THREAD_ENTRY *match = NULL;

  thread_get_manager ()->map_entries (logtb_find_thread_entry_mapfunc, tran_index, except_me, match);

  return match;
}
//
// logtb_find_thread_by_tran_index_except_me - find a thread entry by transaction index; ignore current thread
//
// return          : NULL or pointer to found thread
// tran_index (in) : searched transaction index
//
THREAD_ENTRY *
logtb_find_thread_by_tran_index_except_me (int tran_index)
{
  const bool except_me = true;
  THREAD_ENTRY *match = NULL;

  thread_get_manager ()->map_entries (logtb_find_thread_entry_mapfunc, tran_index, except_me, match);

  return match;
}
#if defined (SERVER_MODE)
//
// logtb_wakeup_thread_with_tran_index - find thread by transaction index and wake it
//
// tran_index (in) : searched transaction index
// resume_reason (in) : the reason thread is resumed
void
logtb_wakeup_thread_with_tran_index (int tran_index, thread_resume_suspend_status resume_reason)
{
// find thread with transaction index; ignore current thread
THREAD_ENTRY *thread_p = logtb_find_thread_by_tran_index_except_me (tran_index);
if (thread_p == NULL)
{
// not found
return;
}
thread_wakeup (thread_p, resume_reason);
}
#endif // SERVER_MODE
/*
* logtb_get_current_tran_index() - get transaction index of current thread
* return:
*/
int
logtb_get_current_tran_index (void)
{
THREAD_ENTRY *thread_p = thread_get_thread_entry_info ();
assert (thread_p != NULL);
return thread_p->tran_index;
}
/*
 * logtb_set_current_tran_index - set transaction index on a thread entry
 *   thread_p(in): thread entry to update; when NULL, the current thread's entry is looked up and used
 *   tran_index(in): transaction index to store
 */
void
logtb_set_current_tran_index (THREAD_ENTRY * thread_p, int tran_index)
{
  THREAD_ENTRY *target = (thread_p != NULL) ? thread_p : thread_get_thread_entry_info ();

  target->tran_index = tran_index;
}
/*
 * logtb_set_check_interrupt() - set the check-interrupt flag and report its previous value
 *   return: the previous flag value; in server mode true is returned, and nothing is changed, while
 *           BO_IS_SERVER_RESTARTED () is false
 *   thread_p(in): thread entry; in server mode NULL means the current thread, otherwise it is not used
 *   flag(in): the new value
 */
bool
logtb_set_check_interrupt (THREAD_ENTRY * thread_p, bool flag)
{
#if defined (SERVER_MODE)
  if (!BO_IS_SERVER_RESTARTED ())
    {
      return true;
    }

  if (thread_p == NULL)
    {
      thread_p = thread_get_thread_entry_info ();
    }

  /* safe guard: vacuum workers should not check for interrupt */
  assert (flag == false || !VACUUM_IS_THREAD_VACUUM (thread_p));

  const bool previous = thread_p->check_interrupt;
  thread_p->check_interrupt = flag;

  return previous;
#else // not SERVER_MODE = SA_MODE
  return tran_set_check_interrupt (flag);
#endif // not SERVER_MODE = SA_MODE
}
/*
 * logtb_get_check_interrupt() - read the check-interrupt flag
 *   return: the flag value; in server mode true is returned while BO_IS_SERVER_RESTARTED () is false
 *   thread_p(in): thread entry; in server mode NULL means the current thread, otherwise it is not used
 */
bool
logtb_get_check_interrupt (THREAD_ENTRY * thread_p)
{
#if defined (SERVER_MODE)
  if (!BO_IS_SERVER_RESTARTED ())
    {
      return true;
    }

  if (thread_p == NULL)
    {
      thread_p = thread_get_thread_entry_info ();
    }

  return thread_p->check_interrupt;
#else // not SERVER_MODE = SA_MODE
  return tran_get_check_interrupt ();
#endif // not SERVER_MODE = SA_MODE
}
/*
 * logtb_get_system_tdes () - get the system transaction descriptor to be used by a thread
 *   return: the thread's own system descriptor when it runs on the system transaction index and has one;
 *           otherwise the descriptor stored at LOG_SYSTEM_TRAN_INDEX in the transaction table
 *   thread_p(in): thread entry; NULL means the current thread
 */
LOG_TDES *
logtb_get_system_tdes (THREAD_ENTRY * thread_p)
{
  if (thread_p == NULL)
    {
      thread_p = thread_get_thread_entry_info ();
    }

  // a system worker has its own log_tdes
  const bool has_own_system_tdes = (thread_p->tran_index == LOG_SYSTEM_TRAN_INDEX
				    && thread_p->get_system_tdes () != NULL);
  if (!has_own_system_tdes)
    {
      return log_Gl.trantable.all_tdes[LOG_SYSTEM_TRAN_INDEX];
    }

  return thread_p->get_system_tdes ()->get_tdes ();
}
// *INDENT-OFF*
// C++
// true when tran_index lies strictly between LOG_SYSTEM_TRAN_INDEX and the total number of transaction indices
bool
log_tdes::is_active_worker_transaction () const
{
  return LOG_SYSTEM_TRAN_INDEX < tran_index && tran_index < log_Gl.trantable.num_total_indices;
}
// true when this descriptor uses the system transaction index
bool
log_tdes::is_system_transaction () const
{
  const bool on_system_index = (tran_index == LOG_SYSTEM_TRAN_INDEX);
  return on_system_index;
}
// true for a system transaction whose transaction id is LOG_SYSTEM_TRANID
bool
log_tdes::is_system_main_transaction () const
{
  if (!is_system_transaction ())
    {
      return false;
    }
  return trid == LOG_SYSTEM_TRANID;
}
// true for a system transaction whose transaction id is below NULL_TRANID
bool
log_tdes::is_system_worker_transaction () const
{
  if (!is_system_transaction ())
    {
      return false;
    }
  return trid < NULL_TRANID;
}
// system operations are allowed for active worker transactions and for system worker transactions
bool
log_tdes::is_allowed_sysop () const
{
  if (is_active_worker_transaction ())
    {
      return true;
    }
  return is_system_worker_transaction ();
}
// true when the stack of system operations (topops) has at least one entry
bool
log_tdes::is_under_sysop () const
{
  return !(topops.last < 0);
}
// undo is allowed for active worker transactions and for any transaction that is inside a system operation
bool
log_tdes::is_allowed_undo () const
{
  if (is_active_worker_transaction ())
    {
      return true;
    }
  return is_under_sysop ();
}
// lock the rmutex_topop of this descriptor; only done for active worker transactions once LOG_ISRESTARTED () holds.
// The lock result is checked by assert only.
void
log_tdes::lock_topop ()
{
if (LOG_ISRESTARTED () && is_active_worker_transaction ())
{
// thread entry handed to rmutex_lock; stays NULL unless the workaround below finds the owner's entry
cubthread::entry *thread_p = NULL;
// TODO [PL/CSQL]: It will be fixed at CBRD-25641.
// The following code inside of #if block is a workaround for the issue.
#if 1
// when the mutex has an owner and the PL session reports that owner thread as involved, pass the owner's thread
// entry instead of NULL
if (rmutex_topop.owner != thread_id_t ())
{
cubpl::session *session = cubpl::get_session();
if (session
&& session->is_thread_involved (rmutex_topop.owner))
{
thread_p = thread_get_manager ()->find_by_tid (rmutex_topop.owner);
}
}
#endif
int r = rmutex_lock (thread_p, &rmutex_topop);
assert (r == NO_ERROR);
}
}
// unlock the rmutex_topop of this descriptor; only done for active worker transactions once LOG_ISRESTARTED ()
// holds. The unlock result is checked by assert only.
void
log_tdes::unlock_topop ()
{
if (LOG_ISRESTARTED () && is_active_worker_transaction ())
{
// thread entry handed to rmutex_unlock; stays NULL unless the workaround below finds the owner's entry
cubthread::entry *thread_p = NULL;
// TODO [PL/CSQL]: It will be fixed at CBRD-25641.
// The following code inside of #if block is a workaround for the issue.
#if 1
// when the mutex has an owner and the PL session reports that owner thread as involved, pass the owner's thread
// entry instead of NULL
if (rmutex_topop.owner != thread_id_t ())
{
cubpl::session *session = cubpl::get_session();
if (session
&& session->is_thread_involved (rmutex_topop.owner))
{
thread_p = thread_get_manager ()->find_by_tid (rmutex_topop.owner);
}
}
#endif
int r = rmutex_unlock (thread_p, &rmutex_topop);
assert (r == NO_ERROR);
}
}
// called when a system operation starts; for a system worker transaction that is not inside a system operation,
// all LSA links of the descriptor are reset (unless LOG_ISRESTARTED () is false)
void
log_tdes::on_sysop_start ()
{
  assert (is_allowed_sysop ());

  if (!is_system_worker_transaction () || topops.last >= 0)
    {
      // nothing to reset
      return;
    }
  if (!LOG_ISRESTARTED ())
    {
      /* The links are used at recovery. */
      return;
    }

  // make sure all links to previous records are lost
  assert (topops.last == -1);
  LSA_SET_NULL (&head_lsa);
  LSA_SET_NULL (&tail_lsa);
  LSA_SET_NULL (&undo_nxlsa);
  LSA_SET_NULL (&tail_topresult_lsa);
  assert (commit_abort_lsa.is_null ());

  LSA_SET_NULL (&rcv.tran_start_postpone_lsa);
  LSA_SET_NULL (&rcv.sysop_start_postpone_lsa);
}
// called when a system operation ends; for a system worker transaction that is no longer inside a system operation,
// the LSA links of the descriptor are reset
void
log_tdes::on_sysop_end ()
{
  assert (is_allowed_sysop ());

  if (!is_system_worker_transaction () || topops.last >= 0)
    {
      // nothing to reset
      return;
    }

  // make sure this system operation cannot be linked
  assert (topops.last == -1);
  LSA_SET_NULL (&head_lsa);
  LSA_SET_NULL (&tail_lsa);
  LSA_SET_NULL (&undo_nxlsa);
  LSA_SET_NULL (&tail_topresult_lsa);
  assert (commit_abort_lsa.is_null ());
}
// lock the global oldest visible MVCCID on behalf of this descriptor; does nothing when the descriptor is already
// flagged as holding the lock
void
log_tdes::lock_global_oldest_visible_mvccid ()
{
  if (block_global_oldest_active_until_commit)
    {
      // already locked by this descriptor
      return;
    }

  log_Gl.mvcc_table.lock_global_oldest_visible ();
  block_global_oldest_active_until_commit = true;
}
// release the global oldest visible MVCCID lock taken by this descriptor; does nothing when the descriptor is not
// flagged as holding the lock
void
log_tdes::unlock_global_oldest_visible_mvccid ()
{
  if (!block_global_oldest_active_until_commit)
    {
      // not locked by this descriptor
      return;
    }

  assert (log_Gl.mvcc_table.is_global_oldest_visible_locked ());
  log_Gl.mvcc_table.unlock_global_oldest_visible ();
  block_global_oldest_active_until_commit = false;
}
// copy the members of this descriptor into dest. Most members are copied by plain assignment through the
// REPLACE_COPY_2_DEST helper macro; mvccinfo, m_multiupd_stats and m_log_postpone_cache use their own copy_to, and
// bind_history is copied with memcpy.
//
// dest (out) : descriptor receiving the copy
//
// NOTE(review): pointer members (e.g. repl_records, first_save_entry, waiting_for_res) are assigned as-is, so dest
//               presumably shares the pointed-to data with this descriptor - confirm against the callers.
void
log_tdes::copy_to (LOG_TDES & dest) const
{
// helper: assign member _name of this descriptor to the same member of d; the macro is not #undef'd afterwards
#define REPLACE_COPY_2_DEST(d, _name) ((d)._name = this->_name)
this->mvccinfo.copy_to (dest.mvccinfo); // MVCC_INFO
REPLACE_COPY_2_DEST (dest, tran_index);
REPLACE_COPY_2_DEST (dest, trid);
REPLACE_COPY_2_DEST (dest, isloose_end);
REPLACE_COPY_2_DEST (dest, state);
REPLACE_COPY_2_DEST (dest, isolation);
REPLACE_COPY_2_DEST (dest, wait_msecs);
REPLACE_COPY_2_DEST (dest, head_lsa);
REPLACE_COPY_2_DEST (dest, tail_lsa);
REPLACE_COPY_2_DEST (dest, undo_nxlsa);
REPLACE_COPY_2_DEST (dest, posp_nxlsa);
REPLACE_COPY_2_DEST (dest, savept_lsa);
REPLACE_COPY_2_DEST (dest, topop_lsa);
REPLACE_COPY_2_DEST (dest, tail_topresult_lsa);
REPLACE_COPY_2_DEST (dest, commit_abort_lsa);
REPLACE_COPY_2_DEST (dest, client_id);
REPLACE_COPY_2_DEST (dest, gtrid);
REPLACE_COPY_2_DEST (dest, client); // CLIENTIDS
REPLACE_COPY_2_DEST (dest, rmutex_topop); // SYNC_RMUTEX
REPLACE_COPY_2_DEST (dest, topops); // LOG_TOPOPS_STACK
REPLACE_COPY_2_DEST (dest, gtrinfo);
REPLACE_COPY_2_DEST (dest, coord);
REPLACE_COPY_2_DEST (dest, num_unique_btrees);
REPLACE_COPY_2_DEST (dest, max_unique_btrees);
this->m_multiupd_stats.copy_to (dest.m_multiupd_stats); // multi_index_unique_stats
REPLACE_COPY_2_DEST (dest, interrupt); // sig_atomic_t
REPLACE_COPY_2_DEST (dest, m_modified_classes); // tx_transient_class_registry
REPLACE_COPY_2_DEST (dest, num_transient_classnames);
REPLACE_COPY_2_DEST (dest, num_repl_records);
REPLACE_COPY_2_DEST (dest, cur_repl_record);
REPLACE_COPY_2_DEST (dest, append_repl_recidx);
REPLACE_COPY_2_DEST (dest, fl_mark_repl_recidx);
REPLACE_COPY_2_DEST (dest, repl_records);
REPLACE_COPY_2_DEST (dest, repl_insert_lsa);
REPLACE_COPY_2_DEST (dest, repl_update_lsa);
REPLACE_COPY_2_DEST (dest, first_save_entry);
REPLACE_COPY_2_DEST (dest, suppress_replication);
REPLACE_COPY_2_DEST (dest, lob_locator_root);
REPLACE_COPY_2_DEST (dest, query_timeout);
REPLACE_COPY_2_DEST (dest, query_start_time);
REPLACE_COPY_2_DEST (dest, tran_start_time);
REPLACE_COPY_2_DEST (dest, xasl_id);
REPLACE_COPY_2_DEST (dest, waiting_for_res);
REPLACE_COPY_2_DEST (dest, disable_modifications);
REPLACE_COPY_2_DEST (dest, tran_abort_reason);
REPLACE_COPY_2_DEST (dest, num_exec_queries);
// bind_history is copied bytewise, sized by the destination member
memcpy (dest.bind_history, this->bind_history, sizeof (dest.bind_history));
REPLACE_COPY_2_DEST (dest, num_log_records_written);
REPLACE_COPY_2_DEST (dest, log_upd_stats);
REPLACE_COPY_2_DEST (dest, has_deadlock_priority);
REPLACE_COPY_2_DEST (dest, block_global_oldest_active_until_commit);
REPLACE_COPY_2_DEST (dest, is_user_active);
REPLACE_COPY_2_DEST (dest, rcv);
this->m_log_postpone_cache.copy_to (dest.m_log_postpone_cache); // log_postpone_cache
REPLACE_COPY_2_DEST (dest, has_supplemental_log);
}
// *INDENT-ON*