Skip to content

File unittests_snapshot.c

File List > cubrid > src > executables > unittests_snapshot.c

Go to the documentation of this file

/*
 * Copyright 2008 Search Solution Corporation
 * Copyright 2016 CUBRID Corporation
 *
 *  Licensed under the Apache License, Version 2.0 (the "License");
 *  you may not use this file except in compliance with the License.
 *  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 *  Unless required by applicable law or agreed to in writing, software
 *  distributed under the License is distributed on an "AS IS" BASIS,
 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *  See the License for the specific language governing permissions and
 *  limitations under the License.
 *
 */

/*
 * unittests_snapshot.c : unit tests for snapshot
 */

#include "porting.h"

#include "thread_manager.hpp"

#include <stdio.h>
#include <pthread.h>
#include <log_impl.h>
#include <sys/time.h>

#define strlen(s1) ((int) strlen(s1))

#define NOPS_SNAPSHOT   1000000
#define NOPS_COMPLPETE  1000000
#define NOPS_OLDEST     2000000

/* bit area sizes expressed in bits */
#define MVCC_BITAREA_ELEMENT_BITS 64
#define MVCC_BITAREA_ELEMENT_ALL_COMMITTED 0xffffffffffffffffULL
#define MVCC_BITAREA_BIT_COMMITTED 1
#define MVCC_BITAREA_BIT_ACTIVE 0

/* bit area size after cleanup */
#define MVCC_BITAREA_ELEMENTS_AFTER_FULL_CLEANUP      16

/* maximum size - 500 UINT64 */
#define MVCC_BITAREA_MAXIMUM_ELEMENTS            500

/* maximum size - 32000 bits */
#define MVCC_BITAREA_MAXIMUM_BITS          32000

#define MVCC_BITAREA_BITS_TO_ELEMENTS(count_bits) (((count_bits) + 63) >> 6)
#define MVCC_BITAREA_BITS_TO_BYTES(count_bits) ((((count_bits) + 63) >> 6) << 3)
#define MVCC_BITAREA_ELEMENTS_TO_BYTES(count_elements) ((count_elements) << 3)
#define MVCC_BITAREA_ELEMENTS_TO_BITS(count_elements) ((count_elements) << 6)

/* print function */
static struct timeval start_time;

static void
begin (char *test_name)
{
#define MSG_LEN   40
  int i;

  printf ("Testing %s", test_name);
  for (i = 0; i < MSG_LEN - strlen (test_name); i++)
    {
      putchar (' ');
    }
  printf ("...\n");

  gettimeofday (&start_time, NULL);

#undef MSG_LEN
}

static int
success ()
{
  struct timeval end_time;
  long long int elapsed_msec = 0;

  gettimeofday (&end_time, NULL);

  elapsed_msec = (end_time.tv_usec - start_time.tv_usec) / 1000;
  elapsed_msec += (end_time.tv_sec - start_time.tv_sec) * 1000;

  printf (" %s [%9.3f sec]\n", "OK", (float) elapsed_msec / 1000.0f);
  return NO_ERROR;
}

static void
logtb_initialize_mvcctable (void)
{
  log_Gl.mvcc_table.initialize ();
}


static void
logtb_finalize_mvcctable ()
{
  log_Gl.mvcc_table.finalize ();
}

static unsigned int
logtb_tran_btid_hash_func (const void *key, const unsigned int ht_size)
{
  return 0;
}

static int
logtb_tran_btid_hash_cmp_func (const void *key1, const void *key2)
{
  return 0;
}


static void
logtb_initialize_tdes_for_mvcc_testing (LOG_TDES * tdes, int tran_index)
{
  /* TODO: this is completely unsafe.
   * Using memset() on LOG_TDES is not appropriate.
   * However, since this is the only place in the entire codebase where memset() is used and it is for testing purposes,
   * we ensure that no compile warnings are triggered.   */
  memset ((void *) tdes, 0, sizeof (LOG_TDES));

  tdes->tran_index = tran_index;
  tdes->trid = NULL_TRANID;

  tdes->mvccinfo.init ();

  tdes->log_upd_stats.unique_stats_hash =
    mht_create ("Tran_unique_stats", 101, logtb_tran_btid_hash_func, logtb_tran_btid_hash_cmp_func);
  tdes->log_upd_stats.classes_cos_hash = mht_create ("Tran_classes_cos", 101, oid_hash, oid_compare_equals);
}


static int
logtb_initialize_mvcc_testing (int num_threads, THREAD_ENTRY ** thread_array)
{
  LOG_ADDR_TDESAREA *area = NULL;   /* Contiguous area for new transaction indices */
  size_t size, area_size;
  int i;
  THREAD_ENTRY *thread_p;
  int error_code = NO_ERROR;
  LOG_TDES *tdes;

  if (num_threads == 0 || thread_array == NULL)
    {
      return ER_FAILED;
    }

  log_Gl.trantable.area = NULL;
  log_Gl.trantable.all_tdes = NULL;

  size = num_threads * sizeof (THREAD_ENTRY);
  *thread_array = (THREAD_ENTRY *) malloc (size);
  if (*thread_array == NULL)
    {
      error_code = ER_OUT_OF_VIRTUAL_MEMORY;
      goto error;
    }

  for (i = 0; i < num_threads; i++)
    {
      // placement new
      new ((*thread_array) + i) THREAD_ENTRY ();
    }

  for (i = 0; i < num_threads; i++)
    {
      thread_p = *thread_array + i;
      thread_p->type = TT_WORKER;   /* init */
      thread_p->index = i;
      thread_p->tran_index = i + 1; /* quick fix to avoid issue in logtb_get_mvcc_snapshot - LOG_SYSTEM_TRAN_INDEX */
    }

  size = num_threads * sizeof (*log_Gl.trantable.all_tdes);
  log_Gl.trantable.all_tdes = (LOG_TDES **) malloc (size);
  if (log_Gl.trantable.all_tdes == NULL)
    {
      error_code = ER_OUT_OF_VIRTUAL_MEMORY;
      goto error;
    }

  area_size = num_threads * sizeof (LOG_TDES) + sizeof (LOG_ADDR_TDESAREA);
  area = (LOG_ADDR_TDESAREA *) malloc (area_size);
  if (area == NULL)
    {
      error_code = ER_OUT_OF_VIRTUAL_MEMORY;
      goto error;
    }

  area->tdesarea = ((LOG_TDES *) ((char *) area + sizeof (LOG_ADDR_TDESAREA)));
  area->next = NULL;

  /*
   * Initialize every newly created transaction descriptor index
   */
  for (i = 0; i < num_threads; i++)
    {
      tdes = log_Gl.trantable.all_tdes[i] = &area->tdesarea[i];
      logtb_initialize_tdes_for_mvcc_testing (tdes, i);
    }

  log_Gl.trantable.area = area;
  log_Gl.trantable.num_total_indices = num_threads;

  logtb_initialize_mvcctable ();
  log_Gl.hdr.mvcc_next_id = MVCCID_FIRST;

  return NO_ERROR;

error:
  if (*thread_array)
    {
      free_and_init (*thread_array);
    }

  if (log_Gl.trantable.all_tdes)
    {
      free_and_init (log_Gl.trantable.all_tdes);
    }

  if (log_Gl.trantable.area)
    {
      free_and_init (log_Gl.trantable.area);
    }

  return error_code;
}

static void
logtb_finalize_mvcc_testing (THREAD_ENTRY ** thread_array)
{
  LOG_TDES *tdes;
  MVCC_INFO *curr_mvcc_info;
  int i;

  logtb_finalize_mvcctable ();

  for (i = 0; i < log_Gl.trantable.num_total_indices; i++)
    {
      tdes = log_Gl.trantable.all_tdes[i];
      curr_mvcc_info = &tdes->mvccinfo;

      curr_mvcc_info->snapshot.m_active_mvccs.finalize ();

      if (tdes->log_upd_stats.unique_stats_hash != NULL)
    {
      mht_destroy (tdes->log_upd_stats.unique_stats_hash);
      tdes->log_upd_stats.unique_stats_hash = NULL;
    }
    }

  if (thread_array && *thread_array)
    {
      free_and_init (*thread_array);
    }

  if (log_Gl.trantable.all_tdes)
    {
      free_and_init (log_Gl.trantable.all_tdes);
    }

  if (log_Gl.trantable.area)
    {
      free_and_init (log_Gl.trantable.area);
    }
}

static UINT64 count_snapshots = 0;
static UINT64 count_complete = 0;
static UINT64 count_oldest = 0;

THREAD_RET_T THREAD_CALLING_CONVENTION
test_mvcc_get_snapshot (void *param)
{
  int i;
  THREAD_ENTRY *thread_p = (THREAD_ENTRY *) param;
  int tran_index = LOG_FIND_THREAD_TRAN_INDEX (thread_p);
  LOG_TDES *tdes = LOG_FIND_TDES (tran_index);
  unsigned int local_count_snapshots = 0;
  MVCC_INFO *curr_mvcc_info = &tdes->mvccinfo;

  // *INDENT-OFF*
  cubthread::set_thread_local_entry (*thread_p);
  // *INDENT-ON*

  for (i = 0; i < NOPS_SNAPSHOT; i++)
    {
      if (logtb_get_mvcc_snapshot (thread_p) != NULL)
    {
      local_count_snapshots++;
    }

      /* Invalidate snapshot */
      log_Gl.mvcc_table.reset_transaction_lowest_active (tran_index);
      curr_mvcc_info->reset ();
    }

  ATOMIC_INC_64 (&count_snapshots, local_count_snapshots);
  fprintf (stdout, "snapshot worker thread (%p) is leaving\n", thread_p);
  fflush (stdout);

  // *INDENT-OFF*
  cubthread::clear_thread_local_entry ();
  // *INDENT-ON*

  return (THREAD_RET_T) 0;
}

THREAD_RET_T THREAD_CALLING_CONVENTION
test_new_mvcc_complete (void *param)
{
  int i;
  THREAD_ENTRY *thread_p = (THREAD_ENTRY *) param;
  int tran_index = LOG_FIND_THREAD_TRAN_INDEX (thread_p);
  LOG_TDES *tdes = LOG_FIND_TDES (tran_index);
  unsigned int local_count_complete = 0;
  bool committed = true;
  MVCCID mvccid;

  // *INDENT-OFF*
  cubthread::set_thread_local_entry (*thread_p);
  // *INDENT-ON*

  for (i = 0; i < NOPS_COMPLPETE; i++)
    {
      mvccid = logtb_get_current_mvccid (thread_p);
      if (mvccid == MVCCID_NULL)
    {
      abort ();
    }

      logtb_complete_mvcc (thread_p, tdes, committed);
      committed = !committed;

      /* here we may test whether bit was set */
      local_count_complete++;

      log_Gl.mvcc_table.reset_transaction_lowest_active (tran_index);
    }

  ATOMIC_INC_64 (&count_complete, local_count_complete);
  fprintf (stdout, "complete worker thread (%p) is leaving\n", thread_p);
  fflush (stdout);

  // *INDENT-OFF*
  cubthread::clear_thread_local_entry ();
  // *INDENT-ON*

  return (THREAD_RET_T) 0;
}

THREAD_RET_T THREAD_CALLING_CONVENTION
test_mvcc_get_oldest (void *param)
{
  int i;
  THREAD_ENTRY *thread_p = (THREAD_ENTRY *) param;
  unsigned int local_count_oldest = 0;
  MVCCID prev_oldest, curr_oldest = MVCCID_NULL;

  // *INDENT-OFF*
  cubthread::set_thread_local_entry (*thread_p);
  // *INDENT-ON*

  for (i = 0; i < NOPS_OLDEST; i++)
    {
      prev_oldest = curr_oldest;
      curr_oldest = log_Gl.mvcc_table.get_global_oldest_visible ();
      if (MVCC_ID_PRECEDES (curr_oldest, prev_oldest))
    {
      abort ();
      continue;
    }

      local_count_oldest++;
    }

  ATOMIC_INC_64 (&count_oldest, local_count_oldest);

  fprintf (stdout, "get_oldest thread (%p) is leaving\n", thread_p);
  fflush (stdout);

  // *INDENT-OFF*
  cubthread::clear_thread_local_entry ();
  // *INDENT-ON*

  return (THREAD_RET_T) 0;
}

static int
test_mvcc_operations (int num_snapshot_threads, int num_complete_threads, int num_oldest_mvccid_threads,
              THREAD_ENTRY * thread_array)
{
  int i;
  int numthreads;
#define MAX_THREADS   100
  pthread_t threads[MAX_THREADS];
  int idx_thread_entry;
  char msg[256];

  numthreads = num_snapshot_threads + num_complete_threads + num_oldest_mvccid_threads;
  sprintf (msg, "test_mvcc_operations (%d snapshot threads, %d complete threads, %d oldest threads)",
       num_snapshot_threads, num_complete_threads, num_oldest_mvccid_threads);
  begin (msg);

  if (num_snapshot_threads < 0 || num_complete_threads < 0 || num_oldest_mvccid_threads < 0)
    {
      printf (" %s: %s\n", "FAILED", "negative number of threads not allowed");
      return ER_FAILED;
    }

  if (numthreads > MAX_THREADS)
    {
      printf (" %s: %s\n", "FAILED", "too many threads");
      return ER_FAILED;
    }

  count_snapshots = count_complete = count_oldest = 0;
  idx_thread_entry = 0;
  for (i = 0; i < num_snapshot_threads; i++, idx_thread_entry++)
    {
      if (pthread_create (&threads[idx_thread_entry], NULL, test_mvcc_get_snapshot,
              (void *) (thread_array + idx_thread_entry)) != NO_ERROR)
    {
      printf (" %s: %s\n", "FAILED", "thread create error");
      return ER_FAILED;
    }
    }

  for (i = 0; i < num_complete_threads; i++, idx_thread_entry++)
    {
      if (pthread_create (&threads[idx_thread_entry], NULL, test_new_mvcc_complete,
              (void *) (thread_array + idx_thread_entry)) != NO_ERROR)
    {
      printf (" %s: %s\n", "FAILED", "thread create error");
      return ER_FAILED;
    }
    }

  for (i = 0; i < num_oldest_mvccid_threads; i++, idx_thread_entry++)
    {
      if (pthread_create (&threads[idx_thread_entry], NULL, test_mvcc_get_oldest,
              (void *) (thread_array + idx_thread_entry)) != NO_ERROR)
    {
      printf (" %s: %s\n", "FAILED", "thread create error");
      return ER_FAILED;
    }
    }

  for (i = 0; i < numthreads; i++)
    {
      void *retval;

      pthread_join (threads[i], &retval);
      if (retval != NO_ERROR)
    {
      printf (" %s: %s\n", "FAILED", "thread proc error");
      return ER_FAILED;
    }
    }

  if (count_snapshots != (UINT64) num_snapshot_threads * NOPS_SNAPSHOT)
    {
      printf ("snapshot count fail (%llu != %llu)",
          (unsigned long long) count_snapshots, (unsigned long long) num_snapshot_threads * NOPS_SNAPSHOT);
      return ER_FAILED;
    }

  if (count_complete != (UINT64) num_complete_threads * NOPS_COMPLPETE)
    {
      printf ("complete count fail (%llu != %llu)",
          (unsigned long long) count_complete, (unsigned long long) num_complete_threads * NOPS_COMPLPETE);
      return ER_FAILED;
    }

  if (count_oldest != (UINT64) num_oldest_mvccid_threads * NOPS_OLDEST)
    {
      printf ("oldest count fail (%llu != %llu)",
          (unsigned long long) count_oldest, (unsigned long long) num_oldest_mvccid_threads * NOPS_OLDEST);
      return ER_FAILED;
    }

  success ();

  return NO_ERROR;
}

/* program entry */
int
main (int argc, char **argv)
{
#define MAX_SNAPSHOT_THREADS 10
#define MAX_COMPLETE_THREADS 10
#define MAX_OLDEST_THREADS 1

  int num_snapshot_threads, num_complete_threads, num_oldest_threads;
  THREAD_ENTRY *thread_array = NULL;

  logtb_initialize_mvcc_testing (100, &thread_array);

  for (num_oldest_threads = 1; num_oldest_threads <= MAX_OLDEST_THREADS; num_oldest_threads++)
    {
      for (num_complete_threads = 1; num_complete_threads <= MAX_COMPLETE_THREADS; num_complete_threads++)
    {
      for (num_snapshot_threads = 1; num_snapshot_threads <= MAX_SNAPSHOT_THREADS; num_snapshot_threads++)
        {
          if (test_mvcc_operations (num_snapshot_threads, num_complete_threads, num_oldest_threads,
                    thread_array) != NO_ERROR)
        {
          goto fail;
        }
        }
    }
    }

  logtb_finalize_mvcc_testing (&thread_array);
  return 0;

fail:
  logtb_finalize_mvcc_testing (&thread_array);
  printf ("Unit tests failed!\n");
  return ER_FAILED;

#undef MAX_SNAPSHOT_THREADS
#undef MAX_COMPLETE_THREADS
#undef MAX_OLDEST_THREADS
}