Skip to content

File dblink_2pc_daemon.c

File List > cubrid > src > query > dblink_2pc_daemon.c

Go to the documentation of this file

/*
 *
 * Copyright 2016 CUBRID Corporation
 *
 *  Licensed under the Apache License, Version 2.0 (the "License");
 *  you may not use this file except in compliance with the License.
 *  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 *  Unless required by applicable law or agreed to in writing, software
 *  distributed under the License is distributed on an "AS IS" BASIS,
 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *  See the License for the specific language governing permissions and
 *  limitations under the License.
 *
 */

/*
 * dblink_2pc_daemon.c - send_2pc_decision_daemon for coordinator recovery
 *
 * global_tran_queue: coordinator -> daemon (participant data for _db_global_tran insert/update).
 * Daemon: 1) recovery from _db_global_tran (state 'A'/'C'); 2) wait on queue; 3) process: persist and send decision.
 */

#ident "$Id$"

#ifdef CCI_XA

#include "dblink_2pc_daemon.h"
#include "dblink_2pc.h"
#include "dblink_global_tran_catalog.h"
#include "dblink_scan.h"
#include "error_manager.h"
#include "log_impl.h"
#include "log_manager.h"
#include "memory_alloc.h"
#ifndef SA_MODE
#include "thread_daemon.hpp"
#endif
#include "thread_entry_task.hpp"
#include "thread_looper.hpp"
#include "thread_manager.hpp"
#include "xserver_interface.h"
#include "fault_injection.h"

#include <assert.h>
#include <chrono>
#include <stddef.h>
#include <string.h>
#include <time.h>

#include <pthread.h>

// XXX: SHOULD BE THE LAST INCLUDE HEADER
#include "memory_wrapper.hpp"

#ifdef SERVER_MODE

/* Initial capacity of the queue and number of entries added by each expansion */
#define GLOBAL_TRAN_QUEUE_INIT_SIZE  64
#define GLOBAL_TRAN_QUEUE_GROW_SIZE  64

/*
 * Dynamic circular queue (coordinator -> daemon).
 *
 * The array is allocated by dblink_2pc_daemon_init () and released by dblink_2pc_daemon_stop ().
 * dblink_2pc_daemon_enqueue () and dblink_2pc_daemon_dequeue () access these variables while holding
 * global_tran_queue_mutex.
 */
static GLOBAL_TRAN_QUEUE_ENTRY *global_tran_queue = NULL;
/* allocated size (entries); kept at 0 while global_tran_queue is NULL, as init/stop also do */
static int global_tran_queue_size = 0;
static int global_tran_queue_head = 0;	/* index of the next entry to dequeue */
static int global_tran_queue_tail = 0;	/* index of the next free slot to enqueue into */
static int global_tran_queue_count = 0;	/* number of entries currently stored */
static pthread_mutex_t global_tran_queue_mutex = PTHREAD_MUTEX_INITIALIZER;

/* *INDENT-OFF* */
/*
 * dblink_2pc_daemon_context_manager - entry manager passed to create_daemon () for the dblink 2PC daemon.
 *
 * On daemon retire, the thread entry gives back its system worker, but only when it has a system tdes
 * (dblink_2pc_daemon_execute () claims one lazily, so it may never have been claimed).
 */
class dblink_2pc_daemon_context_manager:public cubthread::daemon_entry_manager
{
private:
  void on_daemon_retire (cubthread::entry & context) final
  {
    if (context.get_system_tdes () == NULL)
      {
        /* no system worker was claimed; nothing to retire */
        return;
      }

    context.retire_system_worker ();
  }
};

/* Daemon handle: created in dblink_2pc_daemon_init (), destroyed in dblink_2pc_daemon_stop (), woken by enqueue */
static cubthread::daemon * dblink_2pc_Daemon = NULL;
/* Entry manager handed to create_daemon (); allocated in dblink_2pc_daemon_init (), deleted in dblink_2pc_daemon_stop () */
static dblink_2pc_daemon_context_manager * dblink_2pc_Daemon_context_manager = NULL;
/* *INDENT-ON* */

/*
 * global_tran_queue_expand - Expand queue by GLOBAL_TRAN_QUEUE_GROW_SIZE entries
 * Must be called with mutex held and with global_tran_queue already allocated.
 * Returns: NO_ERROR on success, ER_OUT_OF_VIRTUAL_MEMORY on failure.
 *          On failure the existing queue and its indices are left untouched.
 *
 * Note: We use malloc + copy instead of realloc because the circular buffer
 * may have wrapped around (head > tail). In this case, we need to linearize
 * the data anyway, so realloc would not save any copying. After the copy the
 * oldest entry sits at index 0 (head) and tail equals the entry count.
 */
static int
global_tran_queue_expand (void)
{
  GLOBAL_TRAN_QUEUE_ENTRY *new_queue;
  int new_size, i, j;

  /* Check the precondition before the old array is dereferenced by the copy loop below. */
  assert (global_tran_queue != NULL);

  new_size = global_tran_queue_size + GLOBAL_TRAN_QUEUE_GROW_SIZE;
  new_queue = (GLOBAL_TRAN_QUEUE_ENTRY *) malloc ((size_t) new_size * sizeof (GLOBAL_TRAN_QUEUE_ENTRY));
  if (new_queue == NULL)
    {
      return ER_OUT_OF_VIRTUAL_MEMORY;
    }

  /* Copy existing entries to new queue (linearize circular buffer) */
  for (i = 0, j = global_tran_queue_head; i < global_tran_queue_count; i++)
    {
      new_queue[i] = global_tran_queue[j];
      j = (j + 1) % global_tran_queue_size;
    }

  /* Zero all unused slots behind the copied entries in one pass */
  memset (new_queue + global_tran_queue_count, 0,
          (size_t) (new_size - global_tran_queue_count) * sizeof (GLOBAL_TRAN_QUEUE_ENTRY));

  /* Free old queue and update pointers */
  free (global_tran_queue);

  global_tran_queue = new_queue;
  global_tran_queue_size = new_size;
  global_tran_queue_head = 0;
  global_tran_queue_tail = global_tran_queue_count;

  return NO_ERROR;
}

/* Callback for dblink_global_tran_scan_for_recovery: enqueue participant data to daemon */
static bool
dblink_2pc_recovery_callback (const DBLINK_GLOBAL_TRAN_ROW * row_data)
{
  DBLINK_CONN_INFO participant;
  char state;

  /* For 'P' state (before decision), use ABORT for recovery */
  if (row_data->state == DBLINK_2PC_STATE_PREPARE)
    {
      state = DBLINK_2PC_STATE_ABORT;
    }
  else
    {
      state = row_data->state;
    }

  /* Build participant info from row data */
  memset (&participant, 0, sizeof (participant));
  participant.conn_handle = row_data->bqual;
  snprintf (participant.conn_url, sizeof (participant.conn_url), "%s", row_data->conn_url);
  snprintf (participant.user_name, sizeof (participant.user_name), "%s", row_data->user_name);
  snprintf (participant.password, sizeof (participant.password), "%s", row_data->password);

  /* Enqueue to daemon for processing (one entry per participant) */
  (void) dblink_2pc_daemon_enqueue (row_data->gtrid, state, &participant);

  return true;          /* continue to next row */
}

/*
 * dblink_2pc_daemon_recovery_with_thread - scan for recovery and queue every reported row for the daemon
 *
 * thread_p (in) : thread entry used for the scan; the call is a no-op when it is NULL
 *
 * Note: the result of dblink_global_tran_scan_for_recovery () is ignored.
 */
void
dblink_2pc_daemon_recovery_with_thread (THREAD_ENTRY * thread_p)
{
  if (thread_p != NULL)
    {
      (void) dblink_global_tran_scan_for_recovery (thread_p, dblink_2pc_recovery_callback);
    }
}

/*
 * dblink_2pc_daemon_execute - daemon task body: drain the queue, one participant per entry
 *
 * thread_ref (in) : thread entry of the daemon
 *
 * For every dequeued entry: send the decision to the participant, then delete its row inside a freshly
 * assigned transaction (commit on success, abort on failure). The function returns when the queue is
 * empty, when dequeue fails, or when sending fails (the entry is queued again first); it is then up to
 * the next daemon run to continue.
 */
static void
dblink_2pc_daemon_execute (cubthread::entry & thread_ref)
{
  THREAD_ENTRY *thread_p = &thread_ref;
  GLOBAL_TRAN_QUEUE_ENTRY entry;

  /* Claim a system worker on first use, but only once the log has been restarted */
  const bool needs_system_worker = (thread_ref.get_system_tdes () == NULL);
  if (needs_system_worker)
    {
      if (!LOG_ISRESTARTED ())
        {
          return;
        }
      thread_ref.claim_system_worker ();
    }

  for (;;)
    {
      /* Dequeue error: give up for now, a later daemon run retries */
      if (dblink_2pc_daemon_dequeue (&entry) != NO_ERROR)
        {
          return;
        }
      /* Queue is empty: nothing left to do in this run */
      if (entry.state == DBLINK_2PC_STATE_EMPTY)
        {
          return;
        }

      /* An entry still in PREPARE state is resolved as ABORT; otherwise send the recorded state */
      const char decision =
        (entry.state == DBLINK_2PC_STATE_PREPARE) ? DBLINK_2PC_STATE_ABORT : entry.state;
      const bool is_commit = (decision == DBLINK_2PC_STATE_COMMIT);

      if (dblink_2pc_send_decision_one_participant (entry.gtrid, &entry.participant, is_commit) != NO_ERROR)
        {
          /* Sending failed: put this participant back (with the resolved decision) and stop this run */
          (void) dblink_2pc_daemon_enqueue (entry.gtrid, decision, &entry.participant);
          return;
        }

      /* P5: Crash after (6) send decision, before (7) DELETE - recovery: daemon resends decision then DELETE */
      FI_TEST (thread_p, FI_TEST_DBLINK_2PC_CRASH_BETWEEN_6_7, 0);

      /* Use a regular (worker) transaction so that delete runs with normal lock/MVCC semantics. */
      int tran_index = logtb_assign_tran_index (thread_p, NULL_TRANID, TRAN_ACTIVE, NULL, NULL,
                                                TRAN_LOCK_INFINITE_WAIT, TRAN_READ_COMMITTED);
      if (tran_index == NULL_TRAN_INDEX)
        {
          /* No transaction index available: the row is left in place, move on to the next entry */
          continue;
        }

      if (dblink_global_tran_delete_row (thread_p, entry.gtrid, entry.participant.conn_handle) == NO_ERROR)
        {
          xtran_server_commit (thread_p, false);
        }
      else
        {
          (void) xtran_server_abort (thread_p);
        }
      logtb_free_tran_index (thread_p, tran_index);
    }
}

/*
 * dblink_2pc_daemon_dequeue - take the oldest entry out of the queue
 *
 * return  : NO_ERROR (also when the queue is empty), ER_FAILED when the queue is not allocated or e is NULL
 * e (out) : receives a copy of the oldest entry; when the queue is empty only e->state is written and
 *           set to DBLINK_2PC_STATE_EMPTY
 *
 * Note: the queue is accessed under global_tran_queue_mutex.
 */
int
dblink_2pc_daemon_dequeue (GLOBAL_TRAN_QUEUE_ENTRY * e)
{
  int status = NO_ERROR;

  pthread_mutex_lock (&global_tran_queue_mutex);

  if (e == NULL || global_tran_queue == NULL)
    {
      status = ER_FAILED;
    }
  else if (global_tran_queue_count <= 0)
    {
      /* nothing queued: report it through the state field */
      e->state = DBLINK_2PC_STATE_EMPTY;
    }
  else
    {
      /* hand out the head entry and advance head around the ring */
      *e = global_tran_queue[global_tran_queue_head];
      global_tran_queue_head = (global_tran_queue_head + 1) % global_tran_queue_size;
      global_tran_queue_count--;
    }

  pthread_mutex_unlock (&global_tran_queue_mutex);

  if (status != NO_ERROR)
    {
      assert (global_tran_queue != NULL && e != NULL);
    }

  return status;
}

/*
 * dblink_2pc_daemon_enqueue - append one participant entry to the queue and wake the daemon
 *
 * return           : NO_ERROR on success,
 *                    ER_FAILED when participant is NULL or the queue is not allocated,
 *                    ER_OUT_OF_VIRTUAL_MEMORY when a full queue cannot be expanded
 * gtrid (in)       : global transaction id stored in the entry
 * state (in)       : state stored in the entry
 * participant (in) : participant connection info; copied by value into the entry
 *
 * Note: the queue is modified under global_tran_queue_mutex; error reporting and the daemon wakeup
 *       happen after the mutex is released.
 */
int
dblink_2pc_daemon_enqueue (int gtrid, char state, const DBLINK_CONN_INFO * participant)
{
  assert (participant != NULL);
  if (participant == NULL)
    {
      /* Release builds: fail like dblink_2pc_daemon_dequeue () does instead of dereferencing NULL below */
      return ER_FAILED;
    }

  pthread_mutex_lock (&global_tran_queue_mutex);

  if (global_tran_queue == NULL)
    {
      pthread_mutex_unlock (&global_tran_queue_mutex);
      assert (global_tran_queue != NULL);
      return ER_FAILED;
    }

  /* check: queue is full */
  if (global_tran_queue_count >= global_tran_queue_size)
    {
      if (global_tran_queue_expand () != NO_ERROR)
        {
          /*
           * Expansion allocates the whole enlarged array, so that is the size that failed.
           * global_tran_queue_size is unchanged on failure; read it before releasing the mutex.
           */
          size_t failed_bytes =
            ((size_t) global_tran_queue_size + GLOBAL_TRAN_QUEUE_GROW_SIZE) * sizeof (GLOBAL_TRAN_QUEUE_ENTRY);

          pthread_mutex_unlock (&global_tran_queue_mutex);
          er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_OUT_OF_VIRTUAL_MEMORY, 1, failed_bytes);
          assert (false);
          return ER_OUT_OF_VIRTUAL_MEMORY;
        }
    }

  /* Store the entry at tail and advance tail around the ring */
  global_tran_queue[global_tran_queue_tail].gtrid = gtrid;
  global_tran_queue[global_tran_queue_tail].state = state;
  global_tran_queue[global_tran_queue_tail].participant = *participant;
  global_tran_queue_tail = (global_tran_queue_tail + 1) % global_tran_queue_size;
  global_tran_queue_count++;

  pthread_mutex_unlock (&global_tran_queue_mutex);

  /* The daemon may not exist (yet); in that case the entry simply stays queued */
  if (dblink_2pc_Daemon != NULL)
    {
      dblink_2pc_Daemon->wakeup ();
    }

  return NO_ERROR;
}

void
dblink_2pc_daemon_init (void)
{
  global_tran_queue_head = 0;
  global_tran_queue_tail = 0;
  global_tran_queue_count = 0;
  global_tran_queue_size = 0;
  global_tran_queue = NULL;

  global_tran_queue =
    (GLOBAL_TRAN_QUEUE_ENTRY *) malloc (GLOBAL_TRAN_QUEUE_INIT_SIZE * sizeof (GLOBAL_TRAN_QUEUE_ENTRY));
  if (global_tran_queue == NULL)
    {
      er_set (ER_FATAL_ERROR_SEVERITY, ARG_FILE_LINE, ER_OUT_OF_VIRTUAL_MEMORY, 1,
          GLOBAL_TRAN_QUEUE_INIT_SIZE * sizeof (GLOBAL_TRAN_QUEUE_ENTRY));
      /* falls through to exit/abort below */
    }
  else
    {
      global_tran_queue_size = GLOBAL_TRAN_QUEUE_INIT_SIZE;
      memset (global_tran_queue, 0, global_tran_queue_size * sizeof (GLOBAL_TRAN_QUEUE_ENTRY));

      {
    cubthread::looper looper = cubthread::looper (std::chrono::seconds (1));
    cubthread::entry_callable_task * daemon_task = new cubthread::entry_callable_task (dblink_2pc_daemon_execute);

    dblink_2pc_Daemon_context_manager = new dblink_2pc_daemon_context_manager ();
    dblink_2pc_Daemon =
      cubthread::get_manager ()->create_daemon (looper, daemon_task, "dblink_2pc_daemon",
                            dblink_2pc_Daemon_context_manager);
    if (dblink_2pc_Daemon == NULL)
      {
        er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_FAILED, 0);
        delete daemon_task;
        delete dblink_2pc_Daemon_context_manager;
        dblink_2pc_Daemon_context_manager = NULL;
      }
      }
    }

  if (dblink_2pc_Daemon == NULL)
    {
#if defined(NDEBUG)
      exit (EXIT_FAILURE);
#else
      abort ();
#endif
    }
}

void
dblink_2pc_daemon_stop (void)
{
  int i;

  if (dblink_2pc_Daemon != NULL)
    {
      cubthread::get_manager ()->destroy_daemon (dblink_2pc_Daemon);
    }
  if (dblink_2pc_Daemon_context_manager != NULL)
    {
      delete dblink_2pc_Daemon_context_manager;
      dblink_2pc_Daemon_context_manager = NULL;
    }

  if (global_tran_queue != NULL)
    {
      free_and_init (global_tran_queue);
    }
  global_tran_queue_size = 0;
  global_tran_queue_head = 0;
  global_tran_queue_tail = 0;
  global_tran_queue_count = 0;
}
#endif
#endif /* CCI_XA */