File log_2pc.c¶
File List > cubrid > src > transaction > log_2pc.c
Go to the documentation of this file
/*
* Copyright 2008 Search Solution Corporation
* Copyright 2016 CUBRID Corporation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
/*
* log_2pc.c -
*/
#ident "$Id$"
#include "log_2pc.h"
#include "config.h"
#if defined(SERVER_MODE)
#include "connection_error.h"
#endif /* SERVER_MODE */
#include "error_manager.h"
#include "dblink_2pc.h"
#include "lock_manager.h"
#include "log_append.hpp"
#include "log_comm.h"
#include "log_impl.h"
#include "log_lsa.hpp"
#include "log_reader.hpp"
#include "log_manager.h"
#include "memory_alloc.h"
#include "page_buffer.h"
#include "storage_common.h"
#include "system_parameter.h"
#if !defined(WINDOWS)
#include "tcp.h" /* for css_gethostid */
#else /* !WINDOWS */
#include "wintcp.h"
#include "porting.h"
#endif /* !WINDOWS */
#include <stddef.h>
#include <string.h>
#include <limits.h>
#include <assert.h>
/* The following two are for getpid */
#include <sys/types.h>
// XXX: SHOULD BE THE LAST INCLUDE HEADER
#include "memory_wrapper.hpp"
#if !defined(SERVER_MODE)
#define CSS_ENABLE_INTERRUPTS
#endif /* !SERVER_MODE */
/* Variables */
/*
 * Table of callbacks through which this module talks to the participants
 * of a distributed (2PC) transaction. Every entry may be NULL when no
 * implementation is registered.
 */
struct log_2pc_global_data
{
/* Fills in the participant id length and the block of participant ids; its return value is used as the number of participants. */
int (*get_participants) (THREAD_ENTRY * thread_p, int *particp_id_length, void **block_particps_ids);
/* Not called in the part of the file reviewed here. */
int (*lookup_participant) (void *particp_id, int num_particps, void *block_particps_ids);
/* Prints the participants stored in the given block to fp. */
void (*dump_participants) (FILE * fp, int block_length, void *block_particps_id);
/* Sends the prepare request; the boolean result is used as the collected vote. */
bool (*send_prepare) (THREAD_ENTRY * thread_p, int gtrid, int num_particps, void *block_particps_ids);
/* Invoked by log_2pc_send_commit_decision with the boolean flag set to true. */
void (*send_commit) (THREAD_ENTRY * thread_p, int gtrid, int num_particps, bool is_commit, void *block_particps_ids);
/* Invoked by log_2pc_send_abort_decision with the boolean flag set to false. */
/* NOTE(review): the flag is named is_abort here, yet abort passes false - it looks like an "is commit" flag; confirm. */
void (*send_abort) (THREAD_ENTRY * thread_p, int gtrid, int num_particps, bool is_abort, void *block_particps_ids);
};
/*
 * The callback table used by this module.
 * With CCI_XA the dblink_2pc_* implementations are registered: there is no
 * lookup_participant callback, and commit and abort share the same
 * dblink_2pc_end_tran function. Without CCI_XA every callback is NULL.
 */
#ifdef CCI_XA
struct log_2pc_global_data log_2pc_Userfun =
{ dblink_2pc_get_participants, NULL, dblink_2pc_dump_participants, dblink_2pc_send_prepare,
dblink_2pc_end_tran,
dblink_2pc_end_tran
};
#else
struct log_2pc_global_data log_2pc_Userfun = { NULL, NULL, NULL, NULL, NULL, NULL };
#endif
/* Forward declarations of the file-local (static) 2PC helpers. */
static int log_2pc_get_num_participants (int *partid_len, void **block_particps_ids);
static int log_2pc_make_global_tran_id (TRANID tranid);
static bool log_2pc_check_duplicate_global_tran_id (int gtrid);
static int log_2pc_commit_first_phase (THREAD_ENTRY * thread_p, LOG_TDES * tdes, LOG_2PC_EXECUTE execute_2pc_type,
bool * decision);
static TRAN_STATE log_2pc_commit_second_phase (THREAD_ENTRY * thread_p, LOG_TDES * tdes, bool * decision);
static void log_2pc_append_start (THREAD_ENTRY * thread_p, LOG_TDES * tdes);
static void log_2pc_append_decision (THREAD_ENTRY * thread_p, LOG_TDES * tdes, LOG_RECTYPE decsion);
static LOG_TDES *log_2pc_find_tran_descriptor (int gtrid);
static int log_2pc_attach_client (THREAD_ENTRY * thread_p, LOG_TDES * tdes, LOG_TDES * client_tdes);
static void log_2pc_recovery_prepare (THREAD_ENTRY * thread_p, LOG_TDES * tdes, LOG_LSA * log_lsa,
LOG_PAGE * log_page_p);
static int log_2pc_recovery_start (THREAD_ENTRY * thread_p, LOG_TDES * tdes, LOG_LSA * log_lsa, LOG_PAGE * log_page_p,
int *ack_list, int *ack_count);
static int *log_2pc_expand_ack_list (THREAD_ENTRY * thread_p, int *ack_list, int *ack_count, int *size_ack_list);
static void log_2pc_recovery_recv_ack (THREAD_ENTRY * thread_p, LOG_LSA * log_lsa, LOG_PAGE * log_page_p, int *ack_list,
int *ack_count);
static int log_2pc_recovery_analysis_record (THREAD_ENTRY * thread_p, LOG_RECTYPE record_type, LOG_TDES * tdes,
LOG_LSA * log_lsa, LOG_PAGE * log_page_p, int **ack_list, int *ack_count,
int *size_ack_list, bool * search_2pc_prepare, bool * search_2pc_start);
static void log_2pc_recovery_collecting_participant_votes (THREAD_ENTRY * thread_p, LOG_TDES * tdes);
static void log_2pc_recovery_abort_decision (THREAD_ENTRY * thread_p, LOG_TDES * tdes);
static void log_2pc_recovery_commit_decision (THREAD_ENTRY * thread_p, LOG_TDES * tdes);
static void log_2pc_recovery_committed_informing_participants (THREAD_ENTRY * thread_p, LOG_TDES * tdes);
static void log_2pc_recovery_aborted_informing_participants (THREAD_ENTRY * thread_p, LOG_TDES * tdes);
/*
* FUNCTIONS RELATED TO COMMUNICATION BETWEEN THIS MODULE AND YOUR
* APPLICATION (I.E, BETWEEN COORDINATOR AND PARTICIPANTS OR VICE VERSA)
*/
/*
 * log_2pc_get_num_participants - Find all participants
 *
 * return: number of participants (0 when no get_participants callback is
 *         registered)
 *
 *   partid_len(out): Length of each participant id
 *   block_particps_ids(out): An array of participant ids where each
 *                            participant id is of length "partid_len"
 *
 * NOTE: Finds the participants of the current transaction through the
 *       registered get_participants callback. When no callback is
 *       registered the transaction is treated as non-distributed: the
 *       out parameters are reset (length 0, NULL block) and zero is
 *       returned.
 */
static int
log_2pc_get_num_participants (int *partid_len, void **block_particps_ids)
{
  THREAD_ENTRY *thread_p = thread_get_thread_entry_info ();

  if (log_2pc_Userfun.get_participants == NULL)
    {
      /*
       * Write through the pointers: assigning to the parameters themselves
       * would leave the caller's variables untouched.
       */
      if (partid_len != NULL)
        {
          *partid_len = 0;
        }
      if (block_particps_ids != NULL)
        {
          *block_particps_ids = NULL;
        }
      return 0;
    }

  return (*log_2pc_Userfun.get_participants) (thread_p, partid_len, block_particps_ids);
}
/*
 * log_2pc_dump_participants - Dump all participants
 *
 * return: nothing
 *
 *   fp(in): Output stream handed to the dump callback
 *   block_length(in): Length of the participant block
 *   block_particps_ids(in): An array of participant ids. The length of each
 *                           element should be known by the callee.
 *
 * NOTE: Prints all participants in a readable form by delegating to the
 *       registered dump_participants callback. Nothing is printed when no
 *       callback is registered (all callbacks are NULL unless CCI_XA is
 *       defined).
 */
void
log_2pc_dump_participants (FILE * fp, int block_length, void *block_particps_ids)
{
  if (log_2pc_Userfun.dump_participants == NULL)
    {
      /* No dumper registered; calling through the NULL pointer would crash. */
      return;
    }

  (*log_2pc_Userfun.dump_participants) (fp, block_length, block_particps_ids);
}
/*
 * log_2pc_send_prepare - SEND A PREPARE MESSAGE TO PARTICIPANTS
 *
 * return: true if all participants are willing to commit, false otherwise
 *
 *   gtrid(in): Global/distributed transaction identifier
 *   num_particps(in): Number of participants at block_particps_ids
 *   block_particps_ids(in): An array of participant ids. The length of each
 *                           element should be known by the callee.
 *
 * NOTE: Sends a prepare to commit message to all participants through the
 *       registered send_prepare callback, which collects the votes and
 *       reports whether all participants are willing to commit.
 *
 *       When no callback is registered nobody can be asked, so the vote is
 *       only (vacuously) unanimous when there is no participant at all;
 *       otherwise false is returned, which leads to an abort decision.
 *
 *       Currently, our communication subsystem does not provide
 *       asynchronous capabilities for multicasting. Once this is
 *       provided, the job of this function will change. For example,
 *       the collecting of votes will be done through interrupts, and
 *       so on.
 */
bool
log_2pc_send_prepare (THREAD_ENTRY * thread_p, int gtrid, int num_particps, void *block_particps_ids)
{
  if (log_2pc_Userfun.send_prepare == NULL)
    {
      /* Avoid calling through a NULL callback (see NOTE above). */
      return num_particps <= 0;
    }

  return (*log_2pc_Userfun.send_prepare) (thread_p, gtrid, num_particps, block_particps_ids);
}
/*
 * log_2pc_send_commit_decision - SEND A COMMIT TO PARTICIPANTS
 *
 * return: nothing
 *
 *   gtrid(in): Global/distributed transaction identifier
 *   num_particps(in): Number of participants at block_particps_ids
 *   block_particps_ids(in): An array of participant ids. The length of each
 *                           element should be known by the callee.
 *
 * NOTE: Forwards the commit decision to the participants through the
 *       registered send_commit callback, passing true as the decision
 *       flag. Nothing is sent when no callback is registered.
 *
 *       Currently, our communication subsystem does not provide
 *       asynchronous capabilities for multicasting. Once this is
 *       provided, the job of this function will change. For example,
 *       the collecting of acknowledgments will be done through interrupts,
 *       and so on.
 */
void
log_2pc_send_commit_decision (THREAD_ENTRY * thread_p, int gtrid, int num_particps, void *block_particps_ids)
{
  if (log_2pc_Userfun.send_commit == NULL)
    {
      /* No transport registered; calling through the NULL pointer would crash. */
      return;
    }

  (*log_2pc_Userfun.send_commit) (thread_p, gtrid, num_particps, true /* commit */ , block_particps_ids);
}
/*
 * log_2pc_send_abort_decision - SEND AN ABORT TO PARTICIPANTS
 *
 * return: nothing
 *
 *   gtrid(in): Global/distributed transaction identifier
 *   num_particps(in): Number of participants at block_particps_ids
 *   block_particps_ids(in): An array of participant ids. The length of each
 *                           element should be known by the callee.
 *
 * NOTE: Forwards the abort decision to the participants through the
 *       registered send_abort callback, passing false as the decision
 *       flag. Nothing is sent when no callback is registered.
 *
 *       Currently, our communication subsystem does not provide
 *       asynchronous capabilities for multicasting. Once this is
 *       provided, the job of this function will change. For example,
 *       the collecting of acknowledgments will be done through interrupts,
 *       and so on.
 */
void
log_2pc_send_abort_decision (THREAD_ENTRY * thread_p, int gtrid, int num_particps, void *block_particps_ids)
{
  if (log_2pc_Userfun.send_abort == NULL)
    {
      /* No transport registered; calling through the NULL pointer would crash. */
      return;
    }

  (*log_2pc_Userfun.send_abort) (thread_p, gtrid, num_particps, false /* abort */ , block_particps_ids);
}
/*
*
* THE REST OF SUPPORTING 2PC FUNCTIONS
*
*/
#if defined (ENABLE_UNUSED_FUNCTION)
/*
 * log_get_global_tran_id - FIND CURRENT GLOBAL TRANID
 *
 * return: gtrid of the calling thread's transaction, or LOG_2PC_NULL_GTRID
 *         when the transaction descriptor cannot be found
 *
 * NOTE: Looks up the global transaction identifier of the current
 *       transaction. A distributed transaction that has no identifier yet
 *       gets one assigned here, under the transaction table critical
 *       section.
 */
int
log_get_global_tran_id (THREAD_ENTRY * thread_p)
{
  int result = LOG_2PC_NULL_GTRID;
  int index = LOG_FIND_THREAD_TRAN_INDEX (thread_p);
  LOG_TDES *current;            /* Descriptor of the calling transaction */

  TR_TABLE_CS_ENTER (thread_p);

  current = LOG_FIND_TDES (index);
  if (current != NULL)
    {
      /* Lazily create the identifier for a distributed transaction. */
      if (current->gtrid == LOG_2PC_NULL_GTRID && log_2pc_is_tran_distributed (current) == true)
        {
          current->gtrid = log_2pc_make_global_tran_id (current->trid);
        }
      result = current->gtrid;
    }

  TR_TABLE_CS_EXIT (thread_p);

  return result;
}
#endif
/*
 * log_2pc_hash_bytes - Hash the bytes of an unsigned int
 *
 * return: hash value
 *
 *   value(in): Value whose object representation is hashed byte by byte
 *
 * NOTE: Each step computes hash * 31 + byte in unsigned arithmetic, which
 *       is the same as (hash << 5) - hash + byte.
 */
static unsigned int
log_2pc_hash_bytes (unsigned int value)
{
  const unsigned char *bytes = (const unsigned char *) &value;
  unsigned int hash = 0;
  unsigned int k;

  for (k = 0; k < sizeof (value); k++)
    {
      hash = hash * 31 + bytes[k];
    }

  return hash;
}

/*
 * log_2pc_make_global_tran_id - Make a global transaction identifier for
 *                               2pc purposes
 *
 * return: global transaction identifier not used by any other local
 *         transaction and different from LOG_2PC_NULL_GTRID
 *
 *   tranid(in): Transaction identifier
 *
 * NOTE: The identifier is packed from three hashes: one byte (MSB cleared)
 *       from the host identifier, one byte from the process identifier and
 *       two bytes from the transaction identifier. The candidate is then
 *       decremented until it neither equals LOG_2PC_NULL_GTRID nor
 *       duplicates the gtrid of another local transaction. The identifier
 *       may still be duplicated in a remote site; we have no control over
 *       that.
 */
static int
log_2pc_make_global_tran_id (TRANID tranid)
{
  unsigned short halves[2];     /* The two halves of the transaction id */
  unsigned int host_part;
  unsigned int pid_part;
  unsigned int tran_part;
  unsigned int packed;
  int candidate;

  /* Host identifier: one byte, most significant bit forced to zero */
  host_part = (log_2pc_hash_bytes (css_gethostid ()) % UCHAR_MAX) & 0x7F;

  /* Process identifier: one byte */
  pid_part = log_2pc_hash_bytes ((unsigned int) getpid ()) % UCHAR_MAX;

  /* Transaction identifier: fold its two halves into at most two bytes */
  memcpy (halves, &tranid, sizeof (halves));
  tran_part = ((unsigned int) halves[0] * 31 + halves[1]) % SHRT_MAX;

  packed = (((host_part << 8) + pid_part) << 16) + tran_part;
  candidate = (int) packed;

  /* Step downwards past the null identifier and past identifiers in use. */
  for (;;)
    {
      if (candidate == LOG_2PC_NULL_GTRID)
        {
          candidate--;
        }
      if (!log_2pc_check_duplicate_global_tran_id (candidate))
        {
          break;
        }
      candidate--;
    }

  return candidate;
}
/*
 * log_2pc_check_duplicate_global_tran_id - Is the identifier already in use?
 *
 * return: true when a non-system transaction table entry with a valid
 *         transaction id carries the same, non-null gtrid; false otherwise
 *
 *   gtrid(in): Global transaction identifier
 *
 * NOTE: A linear scan of the transaction table that skips the system
 *       transaction index.
 */
static bool
log_2pc_check_duplicate_global_tran_id (int gtrid)
{
  int slot;

  for (slot = 0; slot < log_Gl.trantable.num_total_indices; slot++)
    {
      LOG_TDES *entry;          /* Transaction descriptor at this slot */

      if (slot == LOG_SYSTEM_TRAN_INDEX)
        {
          continue;
        }

      entry = LOG_FIND_TDES (slot);
      if (entry == NULL || entry->trid == NULL_TRANID)
        {
          continue;
        }

      if (entry->gtrid != LOG_2PC_NULL_GTRID && entry->gtrid == gtrid)
        {
          return true;
        }
    }

  return false;
}
/*
 * log_2pc_commit_first_phase - Run the voting phase of 2PC at the coordinator
 *
 * return: NO_ERROR, ER_FAILED when the transaction is not active, or
 *         ER_OUT_OF_VIRTUAL_MEMORY (only with LOG_2PC_ACK_RECV_REQUIRED)
 *
 *   tdes(in/out): Transaction descriptor of the coordinator
 *   execute_2pc_type(in): Phase of two phase commit to execute
 *   decision(out): Vote collected from the participants; set to false when
 *                  the transaction is not active
 *
 * NOTE: The transient classnames table entries should already have been
 *       taken care of by the caller. Per the original comments,
 *       log_2pc_append_start flushes the log and changes the state of the
 *       transaction to "2PC collecting votes".
 */
static int
log_2pc_commit_first_phase (THREAD_ENTRY * thread_p, LOG_TDES * tdes, LOG_2PC_EXECUTE execute_2pc_type, bool * decision)
{
  if (tdes->state != TRAN_ACTIVE)
    {
      /* We are not in the right mode for the prepare to commit phase */
      *decision = false;
      return ER_FAILED;
    }

  /* Record that the 2PC has started for this coordinator */
  log_2pc_append_start (thread_p, tdes);

  if (execute_2pc_type == LOG_2PC_EXECUTE_FULL)
    {
      /* This is the coordinator root */
      lock_unlock_all_shared_get_all_exclusive (thread_p, NULL);
    }

#ifdef LOG_2PC_ACK_RECV_REQUIRED
  /*
   * One zero-initialized acknowledgment flag per participant, since we do
   * not know yet which participants have received the decision.
   */
  tdes->coord->ack_received = (bool *) calloc (tdes->coord->num_particps, sizeof (*tdes->coord->ack_received));
  if (tdes->coord->ack_received == NULL)
    {
      /* Out of memory */
      logpb_fatal_error (thread_p, true, ARG_FILE_LINE, "log_2pc_commit");
      return ER_OUT_OF_VIRTUAL_MEMORY;
    }
#endif

  *decision =
    log_2pc_send_prepare (thread_p, tdes->gtrid, tdes->coord->num_particps, tdes->coord->block_particps_ids);

  return NO_ERROR;
}
/*
 * log_2pc_commit_second_phase - Apply and propagate the 2PC decision
 *
 * return: state returned by log_complete_for_2pc
 *
 *   tdes(in/out): Transaction descriptor of the coordinator
 *   decision(in): true to commit, anything else to abort
 *
 * NOTE: Logs the decision, executes it locally (commit or abort), sends it
 *       to the participants and finally completes the transaction through
 *       log_complete_for_2pc.
 */
static TRAN_STATE
log_2pc_commit_second_phase (THREAD_ENTRY * thread_p, LOG_TDES * tdes, bool * decision)
{
TRAN_STATE state;
if (*decision == true)
{
/*
 * DECISION is COMMIT
 * The coordinator and all participants of the distributed transaction
 * have agreed to commit the transaction. The commit decision is
 * forced to the log so that it can be found in the case of a crash.
 */
log_2pc_append_decision (thread_p, tdes, LOG_2PC_COMMIT_DECISION);
/*
 * The transaction has been declared as 2PC commit. We could execute the
 * LOCAL COMMIT AND THE REMOTE COMMITS IN PARALLEL, however our
 * communication subsystem does not support asynchronous communication
 * types. The commitment of the participants is done after the local
 * commitment is completed.
 */
/* Save the state so it can be reverted to the 2PC state after the local commit */
state = tdes->state;
/* 2PC protocol does not support RETAIN LOCK */
(void) log_commit_local (thread_p, tdes, false, false);
tdes->state = state; /* Revert to 2PC state... */
/*
 * If the following function fails, the transaction will be dangling
 * and we need to retry sending the decision at another point.
 * We have already decided and logged the decision in the log file.
 */
(void) log_2pc_send_commit_decision (thread_p, tdes->gtrid, tdes->coord->num_particps,
tdes->coord->block_particps_ids);
/* Check if all the acknowledgments have been received */
state = log_complete_for_2pc (thread_p, tdes, LOG_COMMIT, LOG_NEED_NEWTRID);
}
else
{
/*
 * DECISION is ABORT
 * The coordinator and/or some of the participants of the distributed
 * transaction could not agree to commit the transaction. The abort
 * decision is logged. It does not need to be forced since the default is
 * abort. It does not matter whether this is a root coordinator or not;
 * the current site has decided to abort.
 */
/*
 * If the transaction is active and there are no acknowledgments
 * needed, the abort for the distributed transaction was decided
 * without using the 2PC. The check below only exists when
 * LOG_2PC_ACK_RECV_REQUIRED is defined; otherwise the decision is
 * always logged.
 */
#ifdef LOG_2PC_ACK_RECV_REQUIRED
if (tdes->state != TRAN_ACTIVE || tdes->coord->ack_received != NULL)
#endif
{
log_2pc_append_decision (thread_p, tdes, LOG_2PC_ABORT_DECISION);
}
/*
 * The transaction has been declared as 2PC abort. We could execute the
 * LOCAL ABORT AND THE REMOTE ABORTS IN PARALLEL, however our
 * communication subsystem does not support asynchronous communication
 * types. The abort of the participants is done after the local abort
 * is completed.
 */
/* Save the state so it can be reverted to the 2PC state after the local abort */
state = tdes->state;
/* 2PC protocol does not support RETAIN LOCK */
(void) log_abort_local (thread_p, tdes, false);
/* The saved state is restored only when the local abort left the transaction in the aborted state */
if (tdes->state == TRAN_UNACTIVE_ABORTED)
{
tdes->state = state; /* Revert to 2PC state... */
}
/*
 * Execute the abort at the participants' sites at this time.
 * Both branches of the conditional below issue the same call; they only
 * differ in the reasoning documented in each branch.
 */
#ifdef LOG_2PC_ACK_RECV_REQUIRED
if (tdes->coord->ack_received)
{
/*
 * Current site was also a coordinator site (of course not the root
 * coordinator). Thus, we need to collect acknowledgments.
 *
 * If the following function fails, the transaction will be dangling
 * and we need to retry sending the decision at another point.
 * We have already decided and logged the decision in the log file.
 */
(void) log_2pc_send_abort_decision (thread_p, tdes->gtrid, tdes->coord->num_particps,
tdes->coord->block_particps_ids);
}
else
#endif
{
/*
 * Abort was decided without using the 2PC protocol at this site
 * (that is, the participants are not prepared to commit). Therefore,
 * there is no need to collect acknowledgments.
 *
 * If the following function fails, the transaction will be dangling
 * and we need to retry sending the decision at another point.
 * We have already decided and logged the decision in the log file.
 */
(void) log_2pc_send_abort_decision (thread_p, tdes->gtrid, tdes->coord->num_particps,
tdes->coord->block_particps_ids);
}
/* Check if all the acknowledgments have been received */
state = log_complete_for_2pc (thread_p, tdes, LOG_ABORT, LOG_NEED_NEWTRID);
}
return state;
}
/*
 * log_2pc_commit - Follow two phase commit protocol (coordinator's algorithm)
 *
 * return: TRAN_STATE
 *
 *   tdes(in/out): Transaction descriptor of the coordinator
 *   execute_2pc_type(in): Phase of two phase commit to execute
 *   decision(in/out): Whether coordinator and its participants agree to
 *                     commit or abort the transaction
 *
 * NOTE: A global transaction identifier is assigned first when the
 *       transaction does not have one yet.
 */
TRAN_STATE
log_2pc_commit (THREAD_ENTRY * thread_p, log_tdes * tdes, LOG_2PC_EXECUTE execute_2pc_type, bool * decision)
{
  if (tdes->gtrid == LOG_2PC_NULL_GTRID)
    {
      TR_TABLE_CS_ENTER (thread_p);
      tdes->gtrid = log_2pc_make_global_tran_id (tdes->trid);
      TR_TABLE_CS_EXIT (thread_p);
    }

  /*
   * PHASE I of 2PC: Guarantee commitment (i.e., coordinator and participants
   * are voting). It is skipped when the decision is already known.
   */
  switch (execute_2pc_type)
    {
    case LOG_2PC_EXECUTE_FULL:
    case LOG_2PC_EXECUTE_PREPARE:
      if (log_2pc_commit_first_phase (thread_p, tdes, execute_2pc_type, decision) != NO_ERROR)
        {
          return tdes->state;
        }
      break;

    case LOG_2PC_EXECUTE_COMMIT_DECISION:
      *decision = true;
      break;

    default:
      *decision = false;
      break;
    }

  /*
   * PHASE II of 2PC: Inform the participants of the decision (i.e., either
   * commit or abort). A prepare-only request that was voted "commit" stops
   * here and waits for the decision.
   */
  if (execute_2pc_type == LOG_2PC_EXECUTE_PREPARE && *decision != false)
    {
      return tdes->state;
    }

  return log_2pc_commit_second_phase (thread_p, tdes, decision);
}
/*
 * log_2pc_set_global_tran_info - SET GLOBAL TRANSACTION INFORMATION
 *
 * return: NO_ERROR, ER_LOG_CANNOT_SET_GTRINFO when the transaction is in the
 *         middle of the 2PC protocol, ER_OUT_OF_VIRTUAL_MEMORY, or
 *         ER_LOG_2PC_UNKNOWN_GTID when no transaction carries 'gtrid'
 *
 *   gtrid(in): global transaction identifier
 *   info(in): pointer to the user information to be set
 *   size(in): size of the user information to be set
 *
 * NOTE: Sets the user information related with the global transaction.
 *       The global transaction identified by the 'gtrid' should exist
 *       and should be the value returned by 'db_2pc_start_transaction'.
 *       You can use this function to set the longer format of global
 *       transaction identifier such as XID of XA interface.
 *       Information that was set before is overwritten; the buffer is
 *       copied, so the caller keeps ownership of 'info'.
 */
int
log_2pc_set_global_tran_info (THREAD_ENTRY * thread_p, int gtrid, void *info, int size)
{
  LOG_TDES *tdes;
  int i;

  if (gtrid != LOG_2PC_NULL_GTRID)
    {
      TR_TABLE_CS_ENTER (thread_p);
      for (i = 0; i < log_Gl.trantable.num_total_indices; i++)
        {
          tdes = LOG_FIND_TDES (i);
          if (tdes == NULL || tdes->trid == NULL_TRANID || tdes->gtrid != gtrid)
            {
              continue;
            }

          /* The transaction is in the middle of the 2PC protocol, we cannot set. */
          if (LOG_ISTRAN_2PC (tdes))
            {
              er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_LOG_CANNOT_SET_GTRINFO, 2, gtrid,
                      log_state_string (tdes->state));
              TR_TABLE_CS_EXIT (thread_p);
              return ER_LOG_CANNOT_SET_GTRINFO;
            }

          /* Set the global transaction information. If already set, overwrite it. */
          if (tdes->gtrinfo.info_data != NULL)
            {
              free_and_init (tdes->gtrinfo.info_data);
            }
          /* Keep the length consistent with the (now empty) buffer until the copy succeeds. */
          tdes->gtrinfo.info_length = 0;

          tdes->gtrinfo.info_data = malloc (size);
          if (tdes->gtrinfo.info_data == NULL)
            {
              /* Register the error so that the returned code matches the error state. */
              er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_OUT_OF_VIRTUAL_MEMORY, 1, (size_t) size);
              TR_TABLE_CS_EXIT (thread_p);
              return ER_OUT_OF_VIRTUAL_MEMORY;
            }

          tdes->gtrinfo.info_length = size;
          (void) memcpy (tdes->gtrinfo.info_data, info, size);

          TR_TABLE_CS_EXIT (thread_p);
          return NO_ERROR;
        }
      TR_TABLE_CS_EXIT (thread_p);
    }

  er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_LOG_2PC_UNKNOWN_GTID, 1, gtrid);
  return ER_LOG_2PC_UNKNOWN_GTID;
}
/*
 * log_2pc_get_global_tran_info - Get global transaction information
 *
 * return: NO_ERROR or error code
 *
 *   gtrid(in): global transaction identifier
 *   buffer(out): pointer to the buffer into which the user information is
 *                copied
 *   size(in): size of the buffer
 *
 * NOTE: Gets the user information of the global transaction identified
 *       by the 'gtrid'. At most 'size' bytes are copied; shorter
 *       information is copied completely.
 *       You can use this function to get the longer format of global
 *       transaction identifier such as XID of XA interface. This
 *       function is designed to be used if you want to get the XID after
 *       calling 'db_2pc_prepared_transactions' to support xa_recover().
 *
 * NOTE(review): the "information not set" error reuses
 *       ER_LOG_CANNOT_SET_GTRINFO with one argument, while the setter
 *       passes two arguments for the same code - confirm the intended
 *       error code and argument count.
 */
int
log_2pc_get_global_tran_info (THREAD_ENTRY * thread_p, int gtrid, void *buffer, int size)
{
  LOG_TDES *match = NULL;       /* Descriptor carrying 'gtrid', if any */
  int slot;

  assert (buffer != NULL);
  assert (size >= 0);

  if (gtrid != LOG_2PC_NULL_GTRID)
    {
      TR_TABLE_CS_ENTER (thread_p);

      for (slot = 0; slot < log_Gl.trantable.num_total_indices; slot++)
        {
          LOG_TDES *entry = LOG_FIND_TDES (slot);

          if (entry != NULL && entry->trid != NULL_TRANID && entry->gtrid == gtrid)
            {
              match = entry;
              break;
            }
        }

      if (match != NULL)
        {
          int copy_len;

          /* If the global transaction information is not set, error. */
          if (match->gtrinfo.info_data == NULL)
            {
              er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_LOG_CANNOT_SET_GTRINFO, 1, gtrid);
              TR_TABLE_CS_EXIT (thread_p);
              return ER_LOG_CANNOT_SET_GTRINFO;
            }

          /* Never copy more than what is stored. */
          copy_len = (size > match->gtrinfo.info_length) ? match->gtrinfo.info_length : size;
          (void) memcpy (buffer, match->gtrinfo.info_data, copy_len);

          TR_TABLE_CS_EXIT (thread_p);
          return NO_ERROR;
        }

      TR_TABLE_CS_EXIT (thread_p);
    }

  er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_LOG_2PC_UNKNOWN_GTID, 1, gtrid);
  return ER_LOG_2PC_UNKNOWN_GTID;
}
/*
 * log_2pc_start - START TRANSACTION AS A PART OF GLOBAL TRANSACTION
 *
 * return: global transaction identifier, or LOG_2PC_NULL_GTRID on error
 *
 * NOTE: Makes the current transaction a part of a global transaction by
 *       assigning a global transaction identifier (gtrid); an identifier
 *       that is already assigned is returned unchanged.
 *       It is recommended to call this function just after the end of
 *       a transaction (commit or abort) before executing other work.
 *       This function is one way of getting the gtrid of the transaction.
 *       The other way is to use 'db_2pc_prepare_to_commit_transaction'.
 *       The function 'db_2pc_prepare_transaction' should be used if
 *       this function is called.
 */
int
log_2pc_start (THREAD_ENTRY * thread_p)
{
  int index = LOG_FIND_THREAD_TRAN_INDEX (thread_p);
  LOG_TDES *current = LOG_FIND_TDES (index);

  if (current == NULL)
    {
      er_set (ER_FATAL_ERROR_SEVERITY, ARG_FILE_LINE, ER_LOG_UNKNOWN_TRANINDEX, 1, index);
      return LOG_2PC_NULL_GTRID;
    }

  if (!LOG_ISTRAN_ACTIVE (current))
    {
      er_set (ER_FATAL_ERROR_SEVERITY, ARG_FILE_LINE, ER_LOG_2PC_CANNOT_START, 1, log_state_string (current->state));
      return LOG_2PC_NULL_GTRID;
    }

  if (current->gtrid != LOG_2PC_NULL_GTRID)
    {
      /* Already part of a global transaction */
      return current->gtrid;
    }

  TR_TABLE_CS_ENTER (thread_p);
  current->gtrid = log_2pc_make_global_tran_id (current->trid);
  TR_TABLE_CS_EXIT (thread_p);

  return current->gtrid;
}
/*
 * log_2pc_prepare - PREPARE TRANSACTION TO COMMIT
 *
 * return: TRAN_STATE
 *
 * NOTE: Prepares the current transaction for commitment in 2PC. The
 *       transaction must have been made a part of a global transaction
 *       before (see 'db_2pc_start_transaction', the counterpart of this
 *       function); otherwise an error is set and the current state is
 *       returned.
 *       The system promises not to unilaterally abort the transaction.
 *       After this function call, the only API functions that should
 *       be executed are 'db_commit_transaction' &
 *       'db_abort_transaction'.
 */
TRAN_STATE
log_2pc_prepare (THREAD_ENTRY * thread_p)
{
  int index = LOG_FIND_THREAD_TRAN_INDEX (thread_p);
  LOG_TDES *current = LOG_FIND_TDES (index);

  if (current == NULL)
    {
      er_set (ER_FATAL_ERROR_SEVERITY, ARG_FILE_LINE, ER_LOG_UNKNOWN_TRANINDEX, 1, index);
      return TRAN_UNACTIVE_UNKNOWN;
    }

  if (current->gtrid != LOG_2PC_NULL_GTRID)
    {
      return log_2pc_prepare_global_tran (thread_p, current->gtrid);
    }

  /* Transaction is not started by 'log_2pc_start', cannot be prepared. */
  er_set (ER_FATAL_ERROR_SEVERITY, ARG_FILE_LINE, ER_LOG_2PC_NOT_STARTED, 1, current->trid);
  return current->state;
}
/*
 * log_2pc_recovery_prepared - OBTAIN LIST OF PREPARED TRANSACTIONS
 *
 * return: the number of ids copied into 'gtrids[]'
 *
 *   gtrids(out): array into which global transaction identifiers are copied
 *   size(in): size of 'gtrids[]' array
 *
 * NOTE: For restart recovery of global transactions, this function
 *       returns the gtrids of the transactions in prepared state which
 *       are a part of a global transaction.
 *       If the return value is less than 'size', there are no more
 *       transactions to recover.
 */
int
log_2pc_recovery_prepared (THREAD_ENTRY * thread_p, int gtrids[], int size)
{
  int found = 0;
  int slot;

  assert (size >= 0);

  TR_TABLE_CS_ENTER (thread_p);

  for (slot = 0; slot < log_Gl.trantable.num_total_indices; slot++)
    {
      LOG_TDES *entry = LOG_FIND_TDES (slot);

      if (entry == NULL || entry->trid == NULL_TRANID || entry->gtrid == LOG_2PC_NULL_GTRID
          || !(LOG_ISTRAN_2PC_PREPARE (entry)))
        {
          continue;
        }

      if (found >= size)
        {
          /* The output array is full */
          break;
        }

      gtrids[found] = entry->gtrid;
      found++;
    }

  TR_TABLE_CS_EXIT (thread_p);

  return found;
}
/*
 * log_2pc_find_tran_descriptor - Find the prepared transaction of a gtrid
 *
 * return: transaction descriptor, or NULL when there is none
 *
 *   gtrid(in): global transaction identifier
 *
 * NOTE: Scans the transaction table for an entry that has a valid
 *       transaction id, a non-null gtrid equal to the given one and that
 *       is in the 2PC prepare state.
 */
static LOG_TDES *
log_2pc_find_tran_descriptor (int gtrid)
{
  int slot;

  for (slot = 0; slot < log_Gl.trantable.num_total_indices; slot++)
    {
      LOG_TDES *entry = LOG_FIND_TDES (slot);

      if (entry == NULL || entry->trid == NULL_TRANID || entry->gtrid == LOG_2PC_NULL_GTRID)
        {
          continue;
        }

      if ((LOG_ISTRAN_2PC_PREPARE (entry)) && entry->gtrid == gtrid)
        {
          return entry;
        }
    }

  return NULL;
}
/*
 * log_2pc_attach_client - Attach the current client to a 2PC transaction
 *
 * return: NO_ERROR (no failure path exists in this function)
 *
 *   tdes(in/out): Descriptor of the 2PC transaction to attach to
 *   client_tdes(in): Descriptor of the current (client) transaction
 *
 * Note: The statements below depend on their order: the client transaction
 *       is aborted first, then its settings are copied, and only then is
 *       its transaction table entry released.
 */
static int
log_2pc_attach_client (THREAD_ENTRY * thread_p, LOG_TDES * tdes, LOG_TDES * client_tdes)
{
/*
 * Abort the current client transaction, then attach to the desired
 * transaction.
 */
(void) log_abort (thread_p, NULL_TRAN_INDEX);
/*
 * Carry the client's isolation level and lock wait time over to the
 * 2PC transaction descriptor, which stops being a loose end.
 */
tdes->isloose_end = false;
tdes->isolation = client_tdes->isolation;
tdes->wait_msecs = client_tdes->wait_msecs;
/*
 * The client identification remains the same. So there is no need
 * to set clientids.
 */
/* Return the table entry that is not going to be used anymore */
logtb_free_tran_index (thread_p, client_tdes->tran_index);
/* From now on the calling thread works on the 2PC transaction's index */
LOG_SET_CURRENT_TRAN_INDEX (thread_p, tdes->tran_index);
/* Reduce the number of loose end transactions by one */
log_Gl.trantable.num_prepared_loose_end_indices--;
return NO_ERROR;
}
/*
 * log_2pc_attach_global_tran - Join coordinator to 2pc transaction
 *
 * return: New transaction index, or NULL_TRAN_INDEX on error
 *
 *   gtrid(in): Global transaction identifier
 *
 * NOTE: The current client index is freed and the one with the given
 *       2PC loose end (i.e., transaction waiting for decision)
 *       transaction is returned. The old client transaction is aborted
 *       before the attachment; the old client transaction must not be
 *       in the middle of a 2PC.
 *       It is recommended to attach a client to a 2PC loose end
 *       transaction just after the client restart or after a commit
 *       or abort.
 *       The attachment is done by copying some information of the
 *       current transaction client index over the 2PC loose end index
 *       and the previously assigned client index is freed.
 *       With CCI_XA, a client that is already in the middle of the 2PC
 *       protocol keeps its own transaction index.
 */
int
log_2pc_attach_global_tran (THREAD_ENTRY * thread_p, int gtrid)
{
  LOG_TDES *client_tdes;        /* The current (client) transaction descriptor */
  LOG_TDES *tdes = NULL;        /* Transaction descriptor to attach to */
  int tran_index;

  assert (gtrid != LOG_2PC_NULL_GTRID);

  tran_index = LOG_FIND_THREAD_TRAN_INDEX (thread_p);
  client_tdes = LOG_FIND_TDES (tran_index);
  if (client_tdes == NULL)
    {
      er_set (ER_FATAL_ERROR_SEVERITY, ARG_FILE_LINE, ER_LOG_UNKNOWN_TRANINDEX, 1, tran_index);
      return NULL_TRAN_INDEX;
    }

  if (LOG_ISTRAN_2PC (client_tdes))
    {
#ifdef CCI_XA
      /*
       * The current transaction is in the middle of the 2PC protocol, we
       * don't need to attach it.
       */
      return tran_index;
#else
      /*
       * The current transaction is in the middle of the 2PC protocol, we
       * cannot attach at this moment
       */
      er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_LOG_2PC_CANNOT_ATTACH, 2, client_tdes->trid, gtrid);
      return NULL_TRAN_INDEX;
#endif
    }

  TR_TABLE_CS_ENTER (thread_p);

  /* Only search when there is at least one prepared loose end transaction */
  if (log_Gl.trantable.num_prepared_loose_end_indices > 0)
    {
      tdes = log_2pc_find_tran_descriptor (gtrid);
    }

  if (tdes == NULL)
    {
      TR_TABLE_CS_EXIT (thread_p);
      /* There is no such transaction to attach to */
      er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_LOG_2PC_UNKNOWN_GTID, 1, gtrid);
      return NULL_TRAN_INDEX;
    }

  if (log_2pc_attach_client (thread_p, tdes, client_tdes) != NO_ERROR)
    {
      /* Same argument order (client trid, gtrid) as the other use of this error above */
      er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_LOG_2PC_CANNOT_ATTACH, 2, client_tdes->trid, gtrid);
      TR_TABLE_CS_EXIT (thread_p);
      return NULL_TRAN_INDEX;
    }

  TR_TABLE_CS_EXIT (thread_p);

  return tdes->tran_index;
}
/*
* log_2pc_prepare_global_tran - Prepare the transaction to commit
*
* return: TRAN_STATE
*
* gtrid(in): Identifier of the global transaction. The coordinator is
* responsible for generating the global transaction identifier.
*
* NOTE:This function prepares the transaction identified by "gtrid"
* for commitment. Any objects and data that the transaction held
* or modified are placed in a state that can guarantee the
* commitment of the transaction by coordinator request
* regardless of failures. The shared type locks (IS, S) acquired
* by the transaction are released (SIX is demoted to IX lock)
* and the exclusive type locks (IX, X) acquired by the
* transaction are saved in the log as part of the prepare to
* commit log record. This is needed since, we must guarantee the
* consistency of the updated data until the transaction is
* either committed or aborted by the coordinator regardless of
* failures. If the transaction cannot be committed, it was
* previously aborted, and the coordinator is notified of such
* state.
*/
TRAN_STATE
log_2pc_prepare_global_tran (THREAD_ENTRY * thread_p, int gtrid)
{
LOG_TDES *tdes; /* Transaction descriptor */
LOG_TDES *other_tdes; /* Transaction descriptor */
LOG_REC_2PC_PREPCOMMIT *prepared; /* A prepare to commit log record */
LK_ACQUIRED_LOCKS acq_locks; /* List of acquired locks */
bool decision; /* The decision: success or failure */
TRAN_STATE state; /* The state of the transaction */
int size;
int i;
int tran_index;
LOG_PRIOR_NODE *node;
LOG_LSA start_lsa;
tran_index = LOG_FIND_THREAD_TRAN_INDEX (thread_p);
tdes = LOG_FIND_TDES (tran_index);
if (tdes == NULL)
{
er_set (ER_FATAL_ERROR_SEVERITY, ARG_FILE_LINE, ER_LOG_UNKNOWN_TRANINDEX, 1, tran_index);
return TRAN_UNACTIVE_UNKNOWN;
}
if (!LOG_ISTRAN_ACTIVE (tdes))
{
/*
* May be a system error since transaction is not active.. cannot be
* prepared to committed
*/
#if defined(CUBRID_DEBUG)
er_log_debug (ARG_FILE_LINE,
"log_2pc_prepare: Transaction %d " "(index = %d) is not active for prepare to commit."
" Its state is %s\n", tdes->trid, tdes->tran_index, log_state_string (tdes->state));
#endif /* CUBRID_DEBUG */
return tdes->state;
}
if (tdes->topops.last >= 0)
{
/*
* This is likely a system error since the transaction is being prepared
* to commit when there are system permanent operations attached to it.
* We assume that the transaction finished those top actions and that a
* commit is required on them.
*/
#if defined(CUBRID_DEBUG)
er_log_debug (ARG_FILE_LINE,
"log_2pc_prepare: May be a system error.\n" "Prepare to commit requested on the transaction = %d"
" (index = %d) which has permanent operations attached"
" to it.\n Will attach those system operations to the" " transaction\n", tdes->trid,
tdes->tran_index);
#endif /* CUBRID_DEBUG */
assert (false);
while (tdes->topops.last >= 0)
{
log_sysop_attach_to_outer (thread_p);
}
}
/* Check if the given global transaction identifier is unique. Perform a linear search on the transaction table and
* make sure that the given identifier is not being used by another transaction. Since the number of entries in the
* transaction table is expected to be reasonably small, there is no need to use a hashing mechanism here. */
TR_TABLE_CS_ENTER (thread_p);
for (i = 0; i < log_Gl.trantable.num_total_indices; i++)
{
other_tdes = LOG_FIND_TDES (i);
if (other_tdes == NULL)
{
er_set (ER_FATAL_ERROR_SEVERITY, ARG_FILE_LINE, ER_LOG_UNKNOWN_TRANINDEX, 1,
LOG_FIND_THREAD_TRAN_INDEX (thread_p));
TR_TABLE_CS_EXIT (thread_p);
return TRAN_UNACTIVE_UNKNOWN;
}
if (other_tdes != tdes && other_tdes->trid != NULL_TRANID && other_tdes->gtrid != LOG_2PC_NULL_GTRID
&& other_tdes->gtrid == gtrid)
{
/* This gtrid is not unique; It is already been used */
er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_LOG_2PC_NON_UNIQUE_GTID, 1, gtrid);
TR_TABLE_CS_EXIT (thread_p);
return tdes->state;
}
}
TR_TABLE_CS_EXIT (thread_p);
/*
* Check if the current site is not only a participant but also a
* coordinator for some other participnats. If the current site is a
* coordinator of the transaction,its participants must prepare to commit
* before we can proceed with the prepare to commit. If not all the
* participants are willing to commit, the prepare to commit cannot be
* guaranteed; thus, the transaction is aborted at this site and its
* participants that were willing to commit teh transaction.
*/
tdes->gtrid = gtrid;
if (log_2pc_is_tran_distributed (tdes))
{
/*
* Site is also a coordinator, so we need to execute a nested 2PC
*/
state = log_2pc_commit (thread_p, tdes, LOG_2PC_EXECUTE_PREPARE, &decision);
if (decision == false)
{
return state;
}
/* Now proceed as participant of the distributed transaction */
}
lock_unlock_all_shared_get_all_exclusive (thread_p, &acq_locks);
/*
* Indicate that we are willing to commit the transaction
*/
size = 0;
if (acq_locks.obj != NULL)
{
size = acq_locks.nobj_locks * sizeof (LK_ACQOBJ_LOCK);
}
node =
prior_lsa_alloc_and_copy_data (thread_p, LOG_2PC_PREPARE, RV_NOT_DEFINED, NULL, tdes->gtrinfo.info_length,
(char *) tdes->gtrinfo.info_data, size, (char *) acq_locks.obj);
if (node == NULL)
{
if (acq_locks.obj != NULL)
{
free_and_init (acq_locks.obj);
}
return TRAN_UNACTIVE_UNKNOWN;
}
prepared = (LOG_REC_2PC_PREPCOMMIT *) node->data_header;
memcpy (prepared->user_name, tdes->client.get_db_user (), DB_MAX_USER_LENGTH);
prepared->gtrid = gtrid;
prepared->gtrinfo_length = tdes->gtrinfo.info_length;
prepared->num_object_locks = acq_locks.nobj_locks;
/* ignore num_page_locks */
prepared->num_page_locks = 0;
start_lsa = prior_lsa_next_record (thread_p, node, tdes);
if (acq_locks.obj != NULL)
{
free_and_init (acq_locks.obj);
}
/*
* END append. The log need to be flushed since we need to guarantee
* the commitment of the transaction if the coordinator requests commit
*/
tdes->state = TRAN_UNACTIVE_2PC_PREPARE;
logpb_flush_pages (thread_p, &start_lsa);
return tdes->state;
}
/*
 * log_2pc_read_prepare - READ PREPARE_TO_COMMIT LOG RECORD
 *
 * return: nothing
 *
 *   acquire_locks(in): specify if the list of locks needs to be read from the
 *                      log record and the listed locks need to be acquired.
 *   tdes(in/out): Transaction descriptor; receives the client user, the
 *                 global transaction identifier and the global transaction
 *                 user information stored in the record
 *   log_lsa(in/out): Log address identifier containing the log record
 *   log_page_p(in/out): the buffer containing the log page
 *
 * NOTE: This function is used to read the prepared log record from the
 *       system log at the specified location. If the "acquire_locks"
 *       parameter is off only the global transaction information is
 *       read. If this parameter is on, on the other hand, the list of
 *       update-type locks that the transaction had acquired at the
 *       time of the crash is also read from the log record, and the listed
 *       locks are acquired.
 *       Note that the parameters specifying the location of the log
 *       record to be read are updated to point to the end of the
 *       record.
 *       Failures (out of memory, locks not granted) are reported through
 *       logpb_fatal_error and the function returns early.
 */
void
log_2pc_read_prepare (THREAD_ENTRY * thread_p, int acquire_locks, log_tdes * tdes, LOG_LSA * log_lsa,
                      LOG_PAGE * log_page_p)
{
  LOG_REC_2PC_PREPCOMMIT *prepared;     /* A 2PC prepare to commit log record */
  LK_ACQUIRED_LOCKS acq_locks;  /* List of acquired locks before the system crash */
  int size;

  LOG_READ_ADVANCE_WHEN_DOESNT_FIT (thread_p, sizeof (*prepared), log_lsa, log_page_p);
  prepared = (LOG_REC_2PC_PREPCOMMIT *) ((char *) log_page_p->area + log_lsa->offset);

  /*
   * Take everything that is needed out of the record header right away.
   * "prepared" points into the page buffer and the read helpers used below
   * move the read position forward (and presumably may refill log_page_p
   * with a following page - TODO confirm), so the header must not be
   * dereferenced once the position has been advanced.
   */
  tdes->client.set_system_internal_with_user (prepared->user_name);
  tdes->gtrid = prepared->gtrid;
  tdes->gtrinfo.info_length = prepared->gtrinfo_length;
  acq_locks.nobj_locks = prepared->num_object_locks;
  acq_locks.obj = NULL;

  LOG_READ_ADD_ALIGN (thread_p, sizeof (*prepared), log_lsa, log_page_p);

  if (tdes->gtrinfo.info_length > 0)
    {
      tdes->gtrinfo.info_data = malloc (tdes->gtrinfo.info_length);
      if (tdes->gtrinfo.info_data == NULL)
        {
          logpb_fatal_error (thread_p, true, ARG_FILE_LINE, "log_2pc_read_prepare");
          return;
        }

      /* Read the global transaction user information data */
      LOG_READ_ALIGN (thread_p, log_lsa, log_page_p);
      logpb_copy_from_log (thread_p, (char *) tdes->gtrinfo.info_data, tdes->gtrinfo.info_length, log_lsa, log_page_p);
    }

  /* If the update-type locks that the transaction had obtained before the crash need to be acquired, read them from
   * the log record and obtain the locks at this time. */
  if (acquire_locks == false)
    {
      return;
    }

  /* Read in the list of locks to acquire */
  LOG_READ_ALIGN (thread_p, log_lsa, log_page_p);

  if (acq_locks.nobj_locks > 0)
    {
      /* obtain the list of locks to acquire on objects */
      size = acq_locks.nobj_locks * sizeof (LK_ACQOBJ_LOCK);
      acq_locks.obj = (LK_ACQOBJ_LOCK *) malloc (size);
      if (acq_locks.obj == NULL)
        {
          logpb_fatal_error (thread_p, true, ARG_FILE_LINE, "log_2pc_read_prepare");
          return;
        }

      logpb_copy_from_log (thread_p, (char *) acq_locks.obj, size, log_lsa, log_page_p);
      LOG_READ_ALIGN (thread_p, log_lsa, log_page_p);

      /* Acquire the locks */
      if (lock_reacquire_crash_locks (thread_p, &acq_locks, tdes->tran_index) != LK_GRANTED)
        {
          logpb_fatal_error (thread_p, true, ARG_FILE_LINE, "log_2pc_read_prepare");
          /* do not leak the temporary lock list if the fatal error handler returns */
          free_and_init (acq_locks.obj);
          return;
        }

      free_and_init (acq_locks.obj);
    }
}
/*
 * log_2pc_read_prepare - READ PREPARE_TO_COMMIT LOG RECORD (log_reader variant)
 *
 * return: nothing
 *
 *   acquire_locks(in): specify if the list of locks needs to be read from the
 *                      log record and the listed locks need to be acquired.
 *   tdes(in/out): Transaction descriptor; receives the client user, the
 *                 global transaction identifier and the global transaction
 *                 user information stored in the record
 *   log_pgptr_reader(in/out): reader positioned at the prepare record data;
 *                             it is advanced as the record is consumed
 *
 * NOTE: Same processing as the LOG_LSA/LOG_PAGE overload, expressed with
 *       the log_reader interface. Failures (out of memory, locks not
 *       granted) are reported through logpb_fatal_error and the function
 *       returns early.
 */
void
log_2pc_read_prepare (THREAD_ENTRY * thread_p, int acquire_locks, log_tdes * tdes, log_reader & log_pgptr_reader)
{
  LOG_REC_2PC_PREPCOMMIT *prepared;     /* A 2PC prepare to commit log record */
  LK_ACQUIRED_LOCKS acq_locks;  /* List of acquired locks before the system crash */
  int size;

  log_pgptr_reader.advance_when_does_not_fit (sizeof (*prepared));
  // *INDENT-OFF*
  prepared = const_cast<LOG_REC_2PC_PREPCOMMIT*> (log_pgptr_reader.reinterpret_cptr<LOG_REC_2PC_PREPCOMMIT> ());
  // *INDENT-ON*

  /*
   * Take everything that is needed out of the record header right away.
   * "prepared" points into the data owned by the reader and the reader is
   * advanced below (presumably it may then expose a different page - TODO
   * confirm), so the header must not be dereferenced after that.
   */
  tdes->client.set_system_internal_with_user (prepared->user_name);
  tdes->gtrid = prepared->gtrid;
  tdes->gtrinfo.info_length = prepared->gtrinfo_length;
  acq_locks.nobj_locks = prepared->num_object_locks;
  acq_locks.obj = NULL;

  log_pgptr_reader.add_align (sizeof (*prepared));

  if (tdes->gtrinfo.info_length > 0)
    {
      tdes->gtrinfo.info_data = malloc (tdes->gtrinfo.info_length);
      if (tdes->gtrinfo.info_data == NULL)
        {
          logpb_fatal_error (thread_p, true, ARG_FILE_LINE, "log_2pc_read_prepare");
          return;
        }

      /* Read the global transaction user information data */
      log_pgptr_reader.align ();
      log_pgptr_reader.copy_from_log ((char *) tdes->gtrinfo.info_data, tdes->gtrinfo.info_length);
    }

  /* If the update-type locks that the transaction had obtained before the crash need to be acquired, read them from
   * the log record and obtain the locks at this time. */
  if (acquire_locks == false)
    {
      return;
    }

  /* Read in the list of locks to acquire */
  log_pgptr_reader.align ();

  if (acq_locks.nobj_locks > 0)
    {
      /* obtain the list of locks to acquire on objects */
      size = acq_locks.nobj_locks * sizeof (LK_ACQOBJ_LOCK);
      acq_locks.obj = (LK_ACQOBJ_LOCK *) malloc (size);
      if (acq_locks.obj == NULL)
        {
          logpb_fatal_error (thread_p, true, ARG_FILE_LINE, "log_2pc_read_prepare");
          return;
        }

      log_pgptr_reader.copy_from_log ((char *) acq_locks.obj, size);
      log_pgptr_reader.align ();

      /* Acquire the locks */
      if (lock_reacquire_crash_locks (thread_p, &acq_locks, tdes->tran_index) != LK_GRANTED)
        {
          logpb_fatal_error (thread_p, true, ARG_FILE_LINE, "log_2pc_read_prepare");
          /* do not leak the temporary lock list if the fatal error handler returns */
          free_and_init (acq_locks.obj);
          return;
        }

      free_and_init (acq_locks.obj);
    }
}
/*
 * log_2pc_dump_gtrinfo - DUMP GLOBAL TRANSACTION USER INFORMATION
 *
 * return: nothing
 *
 *   fp(in): Output stream (not used)
 *   length(in): Length to dump in bytes (not used)
 *   data(in): The data being logged (not used)
 *
 * NOTE: The body performs no work: nothing is written to "fp" and "data"
 *       is neither read nor modified.
 */
void
log_2pc_dump_gtrinfo (FILE * fp, int length, void *data)
{
  /* All arguments are deliberately discarded; there is nothing to print. */
  (void) fp;
  (void) length;
  (void) data;
}
/*
* log_2pc_dump_acqobj_locks - DUMP THE ACQUIRED OBJECT LOCKS
*
* return: nothing
*
* length(in): Length to dump in bytes
* data(in): The data being logged
*
* NOTE: Dump the acquired object lock structure.
*/
void
log_2pc_dump_acqobj_locks (FILE * fp, int length, void *data)
{
LK_ACQUIRED_LOCKS acq_locks;
acq_locks.nobj_locks = length / sizeof (LK_ACQOBJ_LOCK);
acq_locks.obj = (LK_ACQOBJ_LOCK *) data;
lock_dump_acquired (fp, &acq_locks);
}
/*
 * log_2pc_append_start - APPEND A VOTING LOG RECORD FOR THE 2PC PROTOCOL
 *
 * return: nothing
 *
 *   tdes(in/out): State structure of transaction
 *
 * NOTE: A LOG_2PC_START log record is appended to the log to start
 *       the 2PC protocol (i.e., its atomic commitment). The record data is
 *       the block of participant identifiers kept in tdes->coord. The
 *       transaction is declared as collecting votes. This function is
 *       used by the coordinator site of a distributed transaction.
 *       Nothing is changed when the log node cannot be allocated.
 */
static void
log_2pc_append_start (THREAD_ENTRY * thread_p, LOG_TDES * tdes)
{
  LOG_PRIOR_NODE *prior_node;
  LOG_REC_2PC_START *rec_header;        /* Start 2PC log record */
  LOG_LSA record_lsa;

  prior_node =
    prior_lsa_alloc_and_copy_data (thread_p, LOG_2PC_START, RV_NOT_DEFINED, NULL,
                                   (tdes->coord->particp_id_length * tdes->coord->num_particps),
                                   (char *) tdes->coord->block_particps_ids, 0, NULL);
  if (prior_node == NULL)
    {
      return;
    }

  /* Fill in the record header that precedes the participant block */
  rec_header = (LOG_REC_2PC_START *) prior_node->data_header;
  memcpy (rec_header->user_name, tdes->client.get_db_user (), DB_MAX_USER_LENGTH);
  rec_header->gtrid = tdes->gtrid;
  rec_header->num_particps = tdes->coord->num_particps;
  rec_header->particp_id_length = tdes->coord->particp_id_length;

  record_lsa = prior_lsa_next_record (thread_p, prior_node, tdes);

  /*
   * END append
   * The log is flushed so that the participants of the 2PC can be found in
   * the event of a crash. This is needed since the participants do not know
   * about the coordinator or other participants; they always wait for the
   * coordinator. This is not a full 2PC in which participants know about
   * each other and the coordinator.
   */
  tdes->state = TRAN_UNACTIVE_2PC_COLLECTING_PARTICIPANT_VOTES;
  logpb_flush_pages (thread_p, &record_lsa);
}
/*
 * log_2pc_append_decision - THE DECISION FOR THE DISTRIBUTED TRANSACTION HAS
 *                           BEEN TAKEN
 *
 * return: nothing
 *
 *   tdes(in/out): State structure of transaction
 *   decision(in): Either LOG_2PC_COMMIT_DECISION or LOG_2PC_ABORT_DECISION
 *
 * NOTE: A decision was taken to either commit or abort the distributed
 *       transaction. If a commit decision was taken, all participants
 *       and the coordinator have agreed to commit the transaction. On
 *       the other hand, if an abort decision was taken, the
 *       coordinator and all participants did not reach an agreement to
 *       commit the transaction. It is likely that the distributed
 *       transaction was aborted at a remote site for circumstances
 *       beyond our control. A log record of type "decision" is
 *       appended to the log and the transaction state is updated to match.
 *       The second phase of the 2PC starts after the function finishes.
 *       Nothing is changed when the log node cannot be allocated.
 */
static void
log_2pc_append_decision (THREAD_ENTRY * thread_p, LOG_TDES * tdes, LOG_RECTYPE decision)
{
  LOG_PRIOR_NODE *prior_node;
  LOG_LSA record_lsa;

  prior_node = prior_lsa_alloc_and_copy_data (thread_p, decision, RV_NOT_DEFINED, NULL, 0, NULL, 0, NULL);
  if (prior_node == NULL)
    {
      return;
    }

  record_lsa = prior_lsa_next_record (thread_p, prior_node, tdes);

  if (decision != LOG_2PC_COMMIT_DECISION)
    {
      /*
       * END append
       * No flush is needed for an abort: if the decision is not found in
       * the log, abort is assumed.
       */
      tdes->state = TRAN_UNACTIVE_2PC_ABORT_DECISION;
      return;
    }

  /*
   * END append
   * A commit decision must be flushed so that it can be found if a
   * participant needs it in the event of a crash. If the decision is not
   * found in the log, abort will be assumed.
   */
  tdes->state = TRAN_UNACTIVE_2PC_COMMIT_DECISION;
  logpb_flush_pages (thread_p, &record_lsa);
}
/*
 * log_2pc_alloc_coord_info - ALLOCATE COORDINATOR RELATED INFORMATION
 *
 * return: tdes, or NULL when the coordinator structure cannot be allocated
 *         (tdes->coord is NULL in that case)
 *
 *   tdes(in/out): Transaction descriptor
 *   num_particps(in): Number of participating sites
 *   particp_id_length(in): Length of particp_ids block
 *   block_particps_ids(in): A block of information about the participants;
 *                           the pointer itself is stored, it is not copied
 *
 * NOTE: This function is used to allocate and initialize coordinator
 *       related information about participants.
 */
log_tdes *
log_2pc_alloc_coord_info (log_tdes * tdes, int num_particps, int particp_id_length, void *block_particps_ids)
{
  LOG_2PC_COORDINATOR *coord_info;

  coord_info = (LOG_2PC_COORDINATOR *) malloc (sizeof (LOG_2PC_COORDINATOR));
  tdes->coord = coord_info;
  if (coord_info == NULL)
    {
      return NULL;
    }

  /* Initialize the coordinator information */
  coord_info->block_particps_ids = block_particps_ids;
  coord_info->particp_id_length = particp_id_length;
  coord_info->num_particps = num_particps;
#ifdef LOG_2PC_ACK_RECV_REQUIRED
  coord_info->ack_received = NULL;
#endif

  return tdes;
}
/*
 * log_2pc_free_coord_info - FREE COORDINATOR RELATED INFORMATION
 *
 * return: nothing
 *
 *   tdes(in/out): Transaction descriptor
 *
 * NOTE: This function is used to free coordinator related information
 *       about participants: the participant block, the acknowledgement
 *       vector (when compiled in) and the coordinator structure itself.
 *       It does nothing when tdes->coord is NULL.
 */
void
log_2pc_free_coord_info (log_tdes * tdes)
{
  if (tdes->coord == NULL)
    {
      /* nothing was allocated for this transaction */
      return;
    }

#ifdef LOG_2PC_ACK_RECV_REQUIRED
  if (tdes->coord->ack_received != NULL)
    {
      free_and_init (tdes->coord->ack_received);
    }
#endif

  if (tdes->coord->block_particps_ids != NULL)
    {
      free_and_init (tdes->coord->block_particps_ids);
    }

  /* members first, then the container */
  free_and_init (tdes->coord);
}
/*
 * log_2pc_recovery_prepare - Process a LOG_2PC_PREPARE record during recovery
 *
 * return: nothing
 *
 *   tdes(in/out): Transaction descriptor filled in from the record
 *   log_lsa(in/out): Address of the log record; advanced while reading
 *   log_page_p(in/out): Buffer holding the log page
 *
 * Note: The record belongs to a participant of the distributed transaction.
 *       The caller needs to continue looking since this participant may be
 *       a non root coordinator.
 */
static void
log_2pc_recovery_prepare (THREAD_ENTRY * thread_p, LOG_TDES * tdes, LOG_LSA * log_lsa, LOG_PAGE * log_page_p)
{
  int acquire_locks;

  /* Skip the record header to get to the DATA HEADER */
  LOG_READ_ADD_ALIGN (thread_p, sizeof (LOG_RECORD_HEADER), log_lsa, log_page_p);

  /* When the transaction was in prepared_to_commit state at the time of the crash, the global transaction
   * identifier and the list of locks are read from the log record and all of the locks are acquired. In any other
   * state the locks are not obtained. */
  acquire_locks = (tdes->state == TRAN_UNACTIVE_2PC_PREPARE) ? LOG_2PC_OBTAIN_LOCKS : LOG_2PC_DONT_OBTAIN_LOCKS;

  log_2pc_read_prepare (thread_p, acquire_locks, tdes, log_lsa, log_page_p);
}
/*
 * log_2pc_recovery_start - Process a LOG_2PC_START record during recovery
 *
 * return: NO_ERROR, ER_OUT_OF_VIRTUAL_MEMORY when a buffer cannot be
 *         allocated, or ER_FAILED when the coordinator information cannot
 *         be set up (a fatal error is logged in the failure cases)
 *
 *   tdes(in/out): Transaction descriptor; receives the client user, the
 *                 gtrid and the coordinator information of the record
 *   log_lsa(in/out): Address of the log record; advanced while reading
 *   log_page_p(in/out): Buffer holding the log page
 *   ack_list(in): Participant indices whose acknowledgement records were
 *                 already seen (may be NULL). The list stays owned by the
 *                 caller and is NOT freed here: it is passed by value, so
 *                 freeing it would leave the caller with a dangling pointer.
 *   ack_count(in/out): Number of entries in ack_list; reset to 0 once the
 *                      entries have been copied into the descriptor
 *
 * Note: ack_list and ack_count are only used when LOG_2PC_ACK_RECV_REQUIRED
 *       is defined.
 */
static int
log_2pc_recovery_start (THREAD_ENTRY * thread_p, LOG_TDES * tdes, LOG_LSA * log_lsa, LOG_PAGE * log_page_p,
                        int *ack_list, int *ack_count)
{
  LOG_REC_2PC_START *start_2pc; /* A 2PC start log record */
  void *block_particps_ids;     /* A block of participant identifiers */
  int num_particps;
  int particp_id_length;
#ifdef LOG_2PC_ACK_RECV_REQUIRED
  int i;
#endif

  /* Obtain the coordinator information */
  LOG_READ_ADD_ALIGN (thread_p, sizeof (LOG_RECORD_HEADER), log_lsa, log_page_p);
  LOG_READ_ADVANCE_WHEN_DOESNT_FIT (thread_p, sizeof (*start_2pc), log_lsa, log_page_p);
  start_2pc = ((LOG_REC_2PC_START *) ((char *) log_page_p->area + log_lsa->offset));

  /*
   * Obtain the participant information for this coordinator. Everything is
   * copied out of the header before the read position is advanced below.
   */
  tdes->client.set_system_internal_with_user (start_2pc->user_name);
  tdes->gtrid = start_2pc->gtrid;
  num_particps = start_2pc->num_particps;
  particp_id_length = start_2pc->particp_id_length;

  block_particps_ids = malloc (particp_id_length * num_particps);
  if (block_particps_ids == NULL)
    {
      logpb_fatal_error (thread_p, true, ARG_FILE_LINE, "log_2pc_recovery_analysis_info");
      return ER_OUT_OF_VIRTUAL_MEMORY;
    }

  LOG_READ_ADD_ALIGN (thread_p, sizeof (*start_2pc), log_lsa, log_page_p);
  LOG_READ_ALIGN (thread_p, log_lsa, log_page_p);

  /* Read in the participants info. block from the log */
  logpb_copy_from_log (thread_p, (char *) block_particps_ids, particp_id_length * num_particps, log_lsa, log_page_p);

  /* Initialize the coordinator information */
  if (log_2pc_alloc_coord_info (tdes, num_particps, particp_id_length, block_particps_ids) == NULL)
    {
      /* the block was not attached to the descriptor; release it here so it is not leaked */
      free_and_init (block_particps_ids);
      logpb_fatal_error (thread_p, true, ARG_FILE_LINE, "log_2pc_recovery_analysis_info");
      return ER_FAILED;
    }

  /* Initialize the Acknowledgement vector to false since we do not know what acknowledgments have already been
   * received. we need to continue reading the log */
#ifdef LOG_2PC_ACK_RECV_REQUIRED
  /* calloc zero-fills the vector, i.e. every entry starts as false */
  tdes->coord->ack_received = (bool *) calloc (tdes->coord->num_particps, sizeof (bool));
  if (tdes->coord->ack_received == NULL)
    {
      log_2pc_free_coord_info (tdes);
      logpb_fatal_error (thread_p, true, ARG_FILE_LINE, "log_2pc_recovery_analysis_info");
      return ER_OUT_OF_VIRTUAL_MEMORY;
    }

  if (*ack_count > 0 && ack_list != NULL)
    {
      /*
       * Some participant acknowledgements have already been
       * received. Copy this acknowledgement into the transaction
       * descriptor.
       */
      for (i = 0; i < *ack_count; i++)
        {
          /* valid participant indices are 0 .. num_particps - 1 */
          if (ack_list[i] < 0 || ack_list[i] >= tdes->coord->num_particps)
            {
              er_log_debug (ARG_FILE_LINE, "log_2pc_recovery_analysis_info:" " SYSTEM ERROR for log located at %lld|%d",
                            (long long) log_lsa->pageid, (int) log_lsa->offset);
            }
          else
            {
              tdes->coord->ack_received[ack_list[i]] = true;
            }
        }
      *ack_count = 0;
    }
#endif

  return NO_ERROR;
}
/*
 * log_2pc_expand_ack_list - Make room for one more entry in the ack list
 *
 * return: the (possibly moved) list, or NULL when memory cannot be obtained
 *
 *   ack_list(in): Current list; may be NULL when *size_ack_list is 0
 *   ack_count(in/out): Number of entries in use
 *   size_ack_list(in/out): Capacity of the list, in entries
 *
 * Note: Nothing is done when the list can already hold *ack_count + 1
 *       entries. The first allocation reserves 10 entries; afterwards the
 *       capacity grows by 30%.
 *       On allocation failure a fatal error is logged, the previous list
 *       is released and both counters are reset to 0, so that the returned
 *       NULL and the counters describe the same (empty) list. The capacity
 *       is only updated once the allocation has succeeded.
 */
static int *
log_2pc_expand_ack_list (THREAD_ENTRY * thread_p, int *ack_list, int *ack_count, int *size_ack_list)
{
  int new_capacity;
  int *new_list;

  if ((*ack_count + 1) <= (*size_ack_list))
    {
      /* there is still room */
      return ack_list;
    }

  if (*size_ack_list == 0)
    {
      /*
       * Initialize the temporary area. Assume no more than 10
       * participants
       */
      *ack_count = 0;
      new_capacity = 10;
      new_list = (int *) malloc (new_capacity * sizeof (int));
    }
  else
    {
      /* expand ack list by 30% */
      new_capacity = ((int) (((float) (*size_ack_list) * 1.30) + 0.5));
      if (new_capacity <= *size_ack_list)
        {
          /* rounding must never prevent the list from growing */
          new_capacity = *size_ack_list + 1;
        }

      /* keep the old pointer until realloc has succeeded: on failure the old block is still allocated */
      new_list = (int *) realloc (ack_list, new_capacity * sizeof (int));
      if (new_list == NULL)
        {
          free_and_init (ack_list);
        }
    }

  if (new_list == NULL)
    {
      /* Out of memory */
      *ack_count = 0;
      *size_ack_list = 0;
      logpb_fatal_error (thread_p, true, ARG_FILE_LINE, "log_2pc_recovery_analysis_info");
      return NULL;
    }

  *size_ack_list = new_capacity;
  return new_list;
}
/*
 * log_2pc_recovery_recv_ack - Record the participant of a LOG_2PC_RECV_ACK
 *                             log record
 *
 * return: nothing
 *
 *   log_lsa(in/out): Address of the log record; advanced while reading
 *   log_page_p(in/out): Buffer holding the log page
 *   ack_list(in/out): List receiving the participant index; the caller
 *                     must have made room for one more entry
 *   ack_count(in/out): Number of entries in ack_list; incremented by one
 *
 * Note: The participant index stored in the record is appended to ack_list.
 */
static void
log_2pc_recovery_recv_ack (THREAD_ENTRY * thread_p, LOG_LSA * log_lsa, LOG_PAGE * log_page_p, int *ack_list,
                           int *ack_count)
{
  LOG_REC_2PC_PARTICP_ACK *ack_record;  /* A 2PC recv decision ack */
  int next_slot;

  /* Move past the record header to the acknowledgement data */
  LOG_READ_ADD_ALIGN (thread_p, sizeof (LOG_RECORD_HEADER), log_lsa, log_page_p);
  LOG_READ_ADVANCE_WHEN_DOESNT_FIT (thread_p, sizeof (*ack_record), log_lsa, log_page_p);
  ack_record = ((LOG_REC_2PC_PARTICP_ACK *) ((char *) log_page_p->area + log_lsa->offset));

  next_slot = *ack_count;
  ack_list[next_slot] = ack_record->particp_index;
  *ack_count = next_slot + 1;
}
/*
 * log_2pc_recovery_analysis_record - Extract the 2PC information carried by
 *                                    one log record of a transaction
 *
 * return: NO_ERROR, ER_OUT_OF_VIRTUAL_MEMORY when the acknowledgement list
 *         cannot be grown, or ER_LOG_PAGE_CORRUPTED for an unknown record
 *         type
 *
 *   record_type(in): Type of the log record found at log_lsa
 *   tdes(in/out): Transaction descriptor being completed
 *   log_lsa(in/out): Address of the log record; may be advanced
 *   log_page_p(in/out): Buffer holding the log page
 *   ack_list(in/out): List of participant indices that acknowledged
 *   ack_count(in/out): Number of entries in *ack_list
 *   size_ack_list(in/out): Capacity of *ack_list
 *   search_2pc_prepare(in/out): true while the prepare record is still wanted
 *   search_2pc_start(in/out): true while the start record is still wanted
 *
 * Note: LOG_2PC_PREPARE and LOG_2PC_START records are consumed when they
 *       are still being searched for; LOG_2PC_RECV_ACK records are
 *       collected in *ack_list while the start record is still being
 *       searched for and the transaction is in the second phase of the 2PC.
 */
static int
log_2pc_recovery_analysis_record (THREAD_ENTRY * thread_p, LOG_RECTYPE record_type, LOG_TDES * tdes, LOG_LSA * log_lsa,
                                  LOG_PAGE * log_page_p, int **ack_list, int *ack_count, int *size_ack_list,
                                  bool * search_2pc_prepare, bool * search_2pc_start)
{
  switch (record_type)
    {
    case LOG_2PC_PREPARE:
      if (*search_2pc_prepare)
        {
          log_2pc_recovery_prepare (thread_p, tdes, log_lsa, log_page_p);
          *search_2pc_prepare = false;
        }
      break;

    case LOG_2PC_START:
      if (*search_2pc_start)
        {
          if (log_2pc_recovery_start (thread_p, tdes, log_lsa, log_page_p, *ack_list, ack_count) == NO_ERROR)
            {
              *search_2pc_start = false;
            }
        }
      break;

    case LOG_2PC_RECV_ACK:
      /*
       * Coordinator site: The distributed transaction is in the
       * second phase of the 2PC, that is, the coordinator has notified
       * the decision to participants and some of them have acknowledged
       * the execution of the decision.
       */
      if (*search_2pc_start && LOG_ISTRAN_2PC_IN_SECOND_PHASE (tdes))
        {
          *ack_list = log_2pc_expand_ack_list (thread_p, *ack_list, ack_count, size_ack_list);
          if (*ack_list == NULL)
            {
              /* the list could not be grown (already reported as fatal); never write through a NULL list */
              return ER_OUT_OF_VIRTUAL_MEMORY;
            }
          log_2pc_recovery_recv_ack (thread_p, log_lsa, log_page_p, *ack_list, ack_count);
        }
      break;

    case LOG_COMPENSATE:
    case LOG_RUN_POSTPONE:
    case LOG_COMMIT_WITH_POSTPONE:
    case LOG_COMMIT_WITH_POSTPONE_OBSOLETE:
    case LOG_2PC_COMMIT_DECISION:
    case LOG_2PC_ABORT_DECISION:
    case LOG_2PC_COMMIT_INFORM_PARTICPS:
    case LOG_2PC_ABORT_INFORM_PARTICPS:
      /* Skip over these log record types */
      break;

    case LOG_UNDOREDO_DATA:
    case LOG_DIFF_UNDOREDO_DATA:
    case LOG_UNDO_DATA:
    case LOG_REDO_DATA:
    case LOG_MVCC_UNDOREDO_DATA:
    case LOG_MVCC_DIFF_UNDOREDO_DATA:
    case LOG_MVCC_UNDO_DATA:
    case LOG_MVCC_REDO_DATA:
    case LOG_DBEXTERN_REDO_DATA:
    case LOG_DUMMY_HEAD_POSTPONE:
    case LOG_POSTPONE:
    case LOG_SAVEPOINT:
    case LOG_COMMIT:
    case LOG_ABORT:
    case LOG_SYSOP_START_POSTPONE:
    case LOG_SYSOP_END:
    case LOG_START_CHKPT:
    case LOG_END_CHKPT:
    case LOG_DUMMY_CRASH_RECOVERY:
    case LOG_REPLICATION_DATA:
    case LOG_REPLICATION_STATEMENT:
    case LOG_END_OF_LOG:
      /*
       * Either the prepare to commit or start 2PC record should
       * have already been found by now. Otherwise, it is likely that the
       * transaction is not a distributed transaction that has loose end
       * client actions
       */
      if (*search_2pc_start == false)
        {
          *search_2pc_prepare = false;
        }
      else if (*search_2pc_prepare == false)
        {
          *search_2pc_start = false;
        }
      break;

    case LOG_SMALLER_LOGREC_TYPE:
    case LOG_LARGER_LOGREC_TYPE:
    default:
#if defined(CUBRID_DEBUG)
      /* report the type received as argument; no log record header is in scope in this function */
      er_log_debug (ARG_FILE_LINE, "log_2pc_recovery_analysis_info:" " Unknown record type = %d May be a system error",
                    (int) record_type);
#endif /* CUBRID_DEBUG */
      er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_LOG_PAGE_CORRUPTED, 1, log_lsa->pageid);
      return ER_LOG_PAGE_CORRUPTED;
    }

  return NO_ERROR;
}
/*
 * log_2pc_recovery_analysis_info - FIND 2PC information of given transaction
 *                                  up to the given lsa address of transaction
 *
 * return: nothing
 *
 *   tdes(in/out): State structure of transaction of the log record
 *   upto_chain_lsa(in): Stop at this lsa of the transaction (This lsa MUST
 *                       be of the given transaction)
 *
 * NOTE: Obtain 2PC information of the given transaction up to the
 *       given lsa address. This function is needed at the end of the
 *       recovery analysis phase for transactions that were active at
 *       the time of the crash and at the time of a checkpoint log
 *       record. The upto_chain_lsa address is the undo_tail address of
 *       the transaction at the moment of the checkpoint log record.
 *       We should point out that a checkpoint log record does not
 *       contain all information related to 2PC due to the big space
 *       overhead (e.g., locks) and the closeness to the end of the
 *       transaction. The rest of the 2PC information of this
 *       transaction is read by the redo phase of the recovery process.
 *
 *       The chain is followed backwards through prev_tranlsa. The list of
 *       acknowledgements collected on the way is kept for the whole walk
 *       and released once, when the walk is over.
 */
void
log_2pc_recovery_analysis_info (THREAD_ENTRY * thread_p, log_tdes * tdes, LOG_LSA * upto_chain_lsa)
{
  LOG_RECORD_HEADER *log_rec;   /* Pointer to log record */
  char log_pgbuf[IO_MAX_PAGE_SIZE + MAX_ALIGNMENT], *aligned_log_pgbuf;
  LOG_PAGE *log_page_p = NULL;  /* Log page pointer where LSA is located */
  LOG_LSA lsa;
  LOG_LSA prev_tranlsa;         /* prev LSA of transaction */
  bool search_2pc_prepare = false;
  bool search_2pc_start = false;
  int ack_count = 0;
  int *ack_list = NULL;
  int size_ack_list = 0;

  aligned_log_pgbuf = PTR_ALIGN (log_pgbuf, MAX_ALIGNMENT);

  if (!LOG_ISTRAN_2PC (tdes))
    {
      return;
    }

  /* For a transaction that was prepared to commit at the time of the crash, make sure that its global transaction
   * identifier is obtained from the log and that the update_type locks that were acquired before the time of the crash
   * are reacquired. */
  if (tdes->gtrid == LOG_2PC_NULL_GTRID)
    {
      search_2pc_prepare = true;
    }

  /* If this is a coordinator transaction performing 2PC and voting record has not been read from the log in the
   * recovery redo phase, read the voting record and any acknowledgement records logged for this transaction */
  if (tdes->coord == NULL)
    {
      search_2pc_start = true;
    }

  /*
   * Follow the undo tail chain starting at upto_chain_lsa finding all
   * 2PC related information
   */
  log_page_p = (LOG_PAGE *) aligned_log_pgbuf;

  LSA_COPY (&prev_tranlsa, upto_chain_lsa);
  while (!LSA_ISNULL (&prev_tranlsa) && (search_2pc_prepare || search_2pc_start))
    {
      LSA_COPY (&lsa, &prev_tranlsa);
      if ((logpb_fetch_page (thread_p, &lsa, LOG_CS_FORCE_USE, log_page_p)) != NO_ERROR)
        {
          logpb_fatal_error (thread_p, true, ARG_FILE_LINE, "log_2pc_recovery_analysis_info");
          break;
        }

      while (prev_tranlsa.pageid == lsa.pageid && (search_2pc_prepare || search_2pc_start))
        {
          lsa.offset = prev_tranlsa.offset;

          log_rec = LOG_GET_LOG_RECORD_HEADER (log_page_p, &lsa);
          LSA_COPY (&prev_tranlsa, &log_rec->prev_tranlsa);

          if (log_2pc_recovery_analysis_record
              (thread_p, log_rec->type, tdes, &lsa, log_page_p, &ack_list, &ack_count, &size_ack_list,
               &search_2pc_prepare, &search_2pc_start) != NO_ERROR)
            {
              /* stop following the chain */
              LSA_SET_NULL (&prev_tranlsa);
            }
          /*
           * ack_list is intentionally kept across records: ack_count and
           * size_ack_list describe it, and the acknowledgements gathered so
           * far are consumed when the start record is reached.
           */
        }                       /* while */
    }                           /* while */

  /* The walk is over (normally or not); the temporary acknowledgement list is no longer needed */
  if (ack_list != NULL)
    {
      free_and_init (ack_list);
    }

  /* Check for error conditions */
  if (tdes->state == TRAN_UNACTIVE_2PC_PREPARE && tdes->gtrid == LOG_2PC_NULL_GTRID)
    {
#if defined(CUBRID_DEBUG)
      er_log_debug (ARG_FILE_LINE,
                    "log_2pc_recovery_analysis_info:" " SYSTEM ERROR... Either the LOG_2PC_PREPARE/LOG_2PC_START\n"
                    " log record was not found for participant of distributed" " trid = %d with state = %s", tdes->trid,
                    log_state_string (tdes->state));
#endif /* CUBRID_DEBUG */
    }

  /*
   * Now the client should attach to this prepared transaction and
   * provide the decision (commit/abort). Until then this thread
   * is suspended.
   */
  if (search_2pc_start)
    {
      /*
       * A 2PC start log record was not found for the coordinator
       */
      if (tdes->state != TRAN_UNACTIVE_2PC_PREPARE)
        {
#if defined(CUBRID_DEBUG)
          er_log_debug (ARG_FILE_LINE,
                        "log_2pc_recovery_analysis_info:" " SYSTEM ERROR... The LOG_2PC_START log record was"
                        " not found for coordinator of distributed trid = %d" " with state = %s", tdes->trid,
                        log_state_string (tdes->state));
#endif /* CUBRID_DEBUG */
        }
    }
}
/*
 * log_2pc_recovery_collecting_participant_votes - Finish a transaction that
 *                      was still collecting votes at the time of the crash
 *
 * return: nothing
 *
 *   tdes(in/out): State structure of transaction of the log record
 *
 * Note: The fate of the distributed transaction had not been decided, so
 *       the transaction is aborted: an abort decision is logged and the
 *       processing continues exactly as for a transaction found in the
 *       TRAN_UNACTIVE_2PC_ABORT_DECISION state.
 */
static void
log_2pc_recovery_collecting_participant_votes (THREAD_ENTRY * thread_p, LOG_TDES * tdes)
{
  /* Step 1: record that the decision is to abort */
  log_2pc_append_decision (thread_p, tdes, LOG_2PC_ABORT_DECISION);

  /* Step 2: carry on as in the TRAN_UNACTIVE_2PC_ABORT_DECISION case */
  log_2pc_recovery_abort_decision (thread_p, tdes);
}
/*
 * log_2pc_recovery_abort_decision - Retry the abort of a transaction whose
 *                                   abort decision had already been taken
 *
 * return: nothing
 *
 *   tdes(in/out): State structure of transaction of the log record
 *
 * Note: An abort decision has already been taken and the system crashed
 *       during the local abort, so it is retried. The local abort and the
 *       remote aborts could run in parallel, however our communication
 *       subsystem does not support asynchronous communication types;
 *       the participants are aborted after the local abort is completed.
 */
static void
log_2pc_recovery_abort_decision (THREAD_ENTRY * thread_p, LOG_TDES * tdes)
{
  /* Remember the 2PC state so that it can be put back after the local abort */
  const TRAN_STATE saved_2pc_state = tdes->state;
  LOG_2PC_COORDINATOR *coord;

  /* 2PC protocol does not support RETAIN LOCK */
  (void) log_abort_local (thread_p, tdes, false);

  if (tdes->state == TRAN_UNACTIVE_ABORTED)
    {
      /* Revert to 2PC state... */
      tdes->state = saved_2pc_state;
    }

  /*
   * Try to reconnect to participants that have not sent ACK yet.
   * If the following function fails, the transaction will be dangling and we
   * need to retry sending the decision at another point.
   * We have already decided and logged the decision in the log file.
   */
  coord = tdes->coord;
  (void) log_2pc_send_abort_decision (thread_p, tdes->gtrid, coord->num_particps, coord->block_particps_ids);

  /* Check if all the acknowledgements have been received */
  (void) log_complete_for_2pc (thread_p, tdes, LOG_ABORT, LOG_DONT_NEED_NEWTRID);
}
/*
 * log_2pc_recovery_commit_decision - Finish a transaction whose commit
 *                                    decision had already been taken
 *
 * return: nothing
 *
 *   tdes(in/out): State structure of transaction of the log record
 *
 * Note: The local commit is executed first, then the commit decision is
 *       sent to the participants and the completion of the 2PC is checked.
 */
static void
log_2pc_recovery_commit_decision (THREAD_ENTRY * thread_p, LOG_TDES * tdes)
{
  /* Remember the 2PC state so that it can be put back after the local commit */
  const TRAN_STATE saved_2pc_state = tdes->state;
  LOG_2PC_COORDINATOR *coord;

  /* First perform local commit; 2PC protocol does not support RETAIN LOCK */
  (void) log_commit_local (thread_p, tdes, false, false);

  /* Revert to 2PC state... */
  tdes->state = saved_2pc_state;

  /*
   * If the following function fails, the transaction will be dangling and we
   * need to retry sending the decision at another point.
   * We have already decided and logged the decision in the log file.
   */
  coord = tdes->coord;
  (void) log_2pc_send_commit_decision (thread_p, tdes->gtrid, coord->num_particps, coord->block_particps_ids);

  /* Check if all the acknowledgments have been received */
  (void) log_complete_for_2pc (thread_p, tdes, LOG_COMMIT, LOG_DONT_NEED_NEWTRID);
}
/*
 * log_2pc_recovery_committed_informing_participants - Resume informing the
 *                      participants of a committed distributed transaction
 *
 * return: nothing
 *
 *   tdes(in/out): State structure of transaction of the log record
 *
 * Note: The commit is broadcast to the participants that have not sent an
 *       acknowledgement yet, then the completion of the 2PC is checked.
 */
static void
log_2pc_recovery_committed_informing_participants (THREAD_ENTRY * thread_p, LOG_TDES * tdes)
{
  LOG_2PC_COORDINATOR *coord = tdes->coord;

  /*
   * If the following function fails, the transaction will be
   * dangling and we need to retry sending the decision at another
   * point.
   * We have already decided and logged the decision in the log file.
   */
  (void) log_2pc_send_commit_decision (thread_p, tdes->gtrid, coord->num_particps, coord->block_particps_ids);

  (void) log_complete_for_2pc (thread_p, tdes, LOG_COMMIT, LOG_DONT_NEED_NEWTRID);
}
/*
 * log_2pc_recovery_aborted_informing_participants - Resume informing the
 *                      participants of an aborted distributed transaction
 *
 * return: nothing
 *
 *   tdes(in/out): State structure of transaction of the log record
 *
 * Note: The abort is broadcast to the participants that have not sent an
 *       acknowledgement yet, then the completion of the 2PC is checked.
 */
static void
log_2pc_recovery_aborted_informing_participants (THREAD_ENTRY * thread_p, LOG_TDES * tdes)
{
  LOG_2PC_COORDINATOR *coord = tdes->coord;

  /*
   * If the following function fails, the transaction will be
   * dangling and we need to retry sending the decision at another
   * point.
   * We have already decided and logged the decision in the log file.
   */
  (void) log_2pc_send_abort_decision (thread_p, tdes->gtrid, coord->num_particps, coord->block_particps_ids);

  (void) log_complete_for_2pc (thread_p, tdes, LOG_ABORT, LOG_DONT_NEED_NEWTRID);
}
/*
 * log_2pc_recovery - TRY TO FINISH TRANSACTIONS THAT WERE IN THE 2PC PROTOCOL
 *                    AT THE TIME OF CRASH
 *
 * return: nothing
 *
 *   thread_p(in): Thread entry
 *
 * NOTE: Walks every index of the transaction table (log_Gl.trantable) and,
 *       for each descriptor that exists, has a valid transaction identifier
 *       and satisfies LOG_ISTRAN_2PC, dispatches on the descriptor state to
 *       the matching log_2pc_recovery_* helper. The goal is to finish up the
 *       transactions that were in the two phase commit protocol at the time
 *       of the crash.
 */
void
log_2pc_recovery (THREAD_ENTRY * thread_p)
{
  int tran_index;

  /*
   * Try to finish distributed transaction that are in the uncertain phase
   * of the two phase commit
   */
  for (tran_index = 0; tran_index < log_Gl.trantable.num_total_indices; tran_index++)
    {
      LOG_TDES *tdes = LOG_FIND_TDES (tran_index);	/* Transaction descriptor */
      bool needs_2pc_recovery = (tdes != NULL && tdes->trid != NULL_TRANID && LOG_ISTRAN_2PC (tdes));

      if (!needs_2pc_recovery)
	{
	  continue;
	}

      /* Presumably binds this thread to the transaction being recovered before any helper runs. */
      LOG_SET_CURRENT_TRAN_INDEX (thread_p, tran_index);

      switch (tdes->state)
	{
	case TRAN_UNACTIVE_2PC_COLLECTING_PARTICIPANT_VOTES:
	  log_2pc_recovery_collecting_participant_votes (thread_p, tdes);
	  break;

	case TRAN_UNACTIVE_2PC_ABORT_DECISION:
	  log_2pc_recovery_abort_decision (thread_p, tdes);
	  break;

	case TRAN_UNACTIVE_2PC_COMMIT_DECISION:
	  log_2pc_recovery_commit_decision (thread_p, tdes);
	  break;

	case TRAN_UNACTIVE_WILL_COMMIT:
	case TRAN_UNACTIVE_COMMITTED_WITH_POSTPONE:
	  /*
	   * All the local postpone actions had been completed; there are
	   * not any client postpone actions. Thus, we can inform the
	   * participants at this time: move the descriptor to the
	   * informing-participants state and handle it exactly like the
	   * TRAN_UNACTIVE_COMMITTED_INFORMING_PARTICIPANTS case.
	   */
	  tdes->state = TRAN_UNACTIVE_COMMITTED_INFORMING_PARTICIPANTS;
	  log_2pc_recovery_committed_informing_participants (thread_p, tdes);
	  break;

	case TRAN_UNACTIVE_COMMITTED_INFORMING_PARTICIPANTS:
	  log_2pc_recovery_committed_informing_participants (thread_p, tdes);
	  break;

	case TRAN_UNACTIVE_ABORTED_INFORMING_PARTICIPANTS:
	  log_2pc_recovery_aborted_informing_participants (thread_p, tdes);
	  break;

	  /* Nothing is done here for the states below; they are listed by name and there is no default label. */
	case TRAN_RECOVERY:
	case TRAN_ACTIVE:
	case TRAN_UNACTIVE_COMMITTED:
	case TRAN_UNACTIVE_TOPOPE_COMMITTED_WITH_POSTPONE:
	case TRAN_UNACTIVE_ABORTED:
	case TRAN_UNACTIVE_UNILATERALLY_ABORTED:
	case TRAN_UNACTIVE_2PC_PREPARE:
	case TRAN_UNACTIVE_UNKNOWN:
	  break;
	}
    }
}
#if defined (ENABLE_UNUSED_FUNCTION)
/*
 * log_is_tran_in_2pc - IS TRANSACTION IN THE MIDDLE OF 2PC ?
 *
 * return: value of LOG_ISTRAN_2PC for the descriptor of the calling thread's
 *         transaction; false when no descriptor exists for that index
 *
 *   thread_p(in): Thread entry whose transaction index is looked up
 *
 * NOTE: This function finds if the transaction of the given thread is in
 *       prepare to commit state (i.e., it is waiting for either commit or
 *       abort from its coordinator). When the descriptor cannot be found,
 *       ER_LOG_UNKNOWN_TRANINDEX is set with fatal severity.
 */
bool
log_is_tran_in_2pc (THREAD_ENTRY * thread_p)
{
  int tran_index = LOG_FIND_THREAD_TRAN_INDEX (thread_p);
  LOG_TDES *tdes = LOG_FIND_TDES (tran_index);	/* Transaction descriptor */

  if (tdes != NULL)
    {
      return (LOG_ISTRAN_2PC (tdes));
    }

  /* No descriptor is registered under this index: report it and answer "not in 2PC". */
  er_set (ER_FATAL_ERROR_SEVERITY, ARG_FILE_LINE, ER_LOG_UNKNOWN_TRANINDEX, 1, tran_index);
  return false;
}
#endif
/*
 * log_2pc_is_tran_distributed - IS THIS A COORDINATOR OF A DISTRIBUTED TRANSACTION
 *
 * return: true when tdes->coord is set on exit, false otherwise
 *
 *   tdes(in/out): Transaction descriptor; coordinator information may be
 *                 attached to it by this call
 *
 * NOTE: Is this the coordinator of a distributed transaction ? When the
 *       descriptor already carries coordinator information the answer is
 *       immediately yes. Otherwise the participants are queried and, if there
 *       is at least one, log_2pc_alloc_coord_info is called to record them in
 *       the descriptor. The result of that call is discarded; success is
 *       judged solely by whether tdes->coord ends up non-NULL.
 */
bool
log_2pc_is_tran_distributed (log_tdes * tdes)
{
  if (tdes->coord == NULL)
    {
      int id_length;		/* Length of a participant identifier */
      void *ids_block;		/* A block of participant identifiers */
      int participant_count;	/* Number of participating sites */

      participant_count = log_2pc_get_num_participants (&id_length, &ids_block);
      if (participant_count > 0)
	{
	  /*
	   * This is a distributed transaction and our site is the coordinator.
	   * The coordinator info has not been recorded in the tdes yet, do it now.
	   */
	  (void) log_2pc_alloc_coord_info (tdes, participant_count, id_length, ids_block);
	}
    }

  return (tdes->coord != NULL);
}
/*
 * log_2pc_clear_and_is_tran_distributed - FIND IF TRANSACTION IS DISTRIBUTED AFTER
 *                                         CLEARING OLD COORDINATOR INFORMATION.
 *
 * return: the answer of log_2pc_is_tran_distributed once the old coordinator
 *         information has been released
 *
 *   tdes(in/out): Transaction descriptor
 *
 * NOTE: The coordinator information about participants is freed first and
 *       then the distributed check is run again, which caches any related
 *       information anew.
 *       This function is used during commit/abort time to make sure
 *       we have all participants. This is needed since CUBRID does
 *       not inform this module of new participants.
 */
bool
log_2pc_clear_and_is_tran_distributed (log_tdes * tdes)
{
  bool is_distributed;

  /* Drop whatever participant list is cached so that the check below starts from scratch. */
  log_2pc_free_coord_info (tdes);
  is_distributed = log_2pc_is_tran_distributed (tdes);

  return is_distributed;
}