Skip to content

File thread_entry.hpp

File List > cubrid > src > thread > thread_entry.hpp

Go to the documentation of this file

/*
 * Copyright 2008 Search Solution Corporation
 * Copyright 2016 CUBRID Corporation
 *
 *  Licensed under the Apache License, Version 2.0 (the "License");
 *  you may not use this file except in compliance with the License.
 *  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 *  Unless required by applicable law or agreed to in writing, software
 *  distributed under the License is distributed on an "AS IS" BASIS,
 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *  See the License for the specific language governing permissions and
 *  limitations under the License.
 *
 */

/*
 * thread_entry - interface of thread contextual cache. for backward compatibility it has the unintuitive name entry
 */

#ifndef _THREAD_ENTRY_HPP_
#define _THREAD_ENTRY_HPP_

#if !defined (SERVER_MODE) && !defined (SA_MODE)
#error Wrong module
#endif // not SERVER_MODE and not SA_MODE

#include "error_context.hpp"
#include "lockfree_transaction_def.hpp"
#include "porting.h"        // for pthread_mutex_t, drand48_data
#include "system.h"         // for UINTPTR, INT64, HL_HEAPID

#include <atomic>
#include <thread>

#include <cassert>

// forward definitions

// from connection_defs.h
struct css_conn_entry;
// from connection_defs.h
struct dblink_conn_entry;
// from fault_injection.h
struct fi_test_item;
// from log_system_tran.hpp
class log_system_tdes;
// from log_compress.h
struct log_zip;
// from vacuum.h
struct vacuum_worker;
// from xasl_unpack_info.hpp
struct xasl_unpack_info;
// from page_buffer.h
struct pgbuf_holder_anchor;

// forward resource trackers
namespace cubbase
{
  template <typename Res>
  class resource_tracker;

  // trackers
  // memory allocations
  using alloc_tracker = resource_tracker<const void *>;
  // page fix
  using pgbuf_tracker = resource_tracker<const char *>;
}
namespace cubsync
{
  class critical_section_tracker;
}
namespace cubload
{
  class driver;
}

// for lock-free - FIXME
enum
{
  THREAD_TS_SPAGE_SAVING = 0,
  THREAD_TS_OBJ_LOCK_RES,
  THREAD_TS_OBJ_LOCK_ENT,
  THREAD_TS_CATALOG,
  THREAD_TS_SESSIONS,
  THREAD_TS_FREE_SORT_LIST,
  THREAD_TS_GLOBAL_UNIQUE_STATS,
  THREAD_TS_HFID_TABLE,
  THREAD_TS_XCACHE,
  THREAD_TS_FPCACHE,
  THREAD_TS_DWB_SLOTS,
  THREAD_TS_LAST
};
#define THREAD_TS_COUNT  THREAD_TS_LAST
struct lf_tran_entry;

// for what?? - FIXME
/* stats for event logging */
typedef struct event_stat EVENT_STAT;
struct event_stat
{
  // todo - replace timeval with std::chrono::milliseconds
  /* slow query stats */
  struct timeval cs_waits;
  struct timeval lock_waits;
  struct timeval latch_waits;

  /* volume expand stats */
  struct timeval extend_time;
  int extend_pages;

  /* save PRM_ID_SQL_TRACE_SLOW_MSECS for performance */
  bool trace_slow_query;

  /* log flush thread wait time */
  int trace_log_flush_time;
};

typedef std::thread::id thread_id_t;

// FIXME - move these enum to cubthread::entry
enum thread_type
{
  TT_MASTER,
  TT_SERVER,
  // used to designate generic 'user' operations threads
  TT_WORKER,
  // used to designate generic system operations
  TT_DAEMON,
  TT_LOADDB,
  TT_VACUUM_MASTER,
  TT_VACUUM_WORKER,
  TT_RECOVERY,
  TT_NONE
};

enum thread_resume_suspend_status
{
  THREAD_RESUME_NONE = 0,
  THREAD_RESUME_DUE_TO_INTERRUPT = 1,
  THREAD_RESUME_DUE_TO_SHUTDOWN = 2,
  THREAD_PGBUF_SUSPENDED = 3,
  THREAD_PGBUF_RESUMED = 4,
  THREAD_JOB_QUEUE_SUSPENDED = 5,
  THREAD_JOB_QUEUE_RESUMED = 6,
  THREAD_CSECT_READER_SUSPENDED = 7,
  THREAD_CSECT_READER_RESUMED = 8,
  THREAD_CSECT_WRITER_SUSPENDED = 9,
  THREAD_CSECT_WRITER_RESUMED = 10,
  THREAD_CSECT_PROMOTER_SUSPENDED = 11,
  THREAD_CSECT_PROMOTER_RESUMED = 12,
  THREAD_CSS_QUEUE_SUSPENDED = 13,
  THREAD_CSS_QUEUE_RESUMED = 14,
  THREAD_HEAP_CLSREPR_SUSPENDED = 15,
  THREAD_HEAP_CLSREPR_RESUMED = 16,
  THREAD_LOCK_SUSPENDED = 17,
  THREAD_LOCK_RESUMED = 18,
  THREAD_LOGWR_SUSPENDED = 19,
  THREAD_LOGWR_RESUMED = 20,
  THREAD_ALLOC_BCB_SUSPENDED = 21,
  THREAD_ALLOC_BCB_RESUMED = 22,
  THREAD_DWB_QUEUE_SUSPENDED = 23,
  THREAD_DWB_QUEUE_RESUMED = 24
};

namespace cubthread
{

  // cubthread::entry
  //
  //  description
  //    this is the thread context used by most server module functions to access thread-specific information quickly
  //
  //  note
  //    in CUBRID, thread entries are pooled and dispatched by cubthread::manager. see thread_manager.hpp for details.
  //
  //    this is an implementation in progress. for backward compatibility, all legacy members in this class are public;
  //    however, they will be gradually converted into private members with access functions
  //
  //    this header is paired with thread_compat.hpp which is used for compatibility between modules. only server
  //    module (with its SERVER_MODE and SA_MODE versions) has access to the content of this entry. client modules only
  //    sees a void pointer.
  //
  //    to avoid major refactoring, the THREAD_ENTRY alias is kept
  //
  //  todo
  //    make member variable private
  //
  //    remove content that does not belong here
  //
  //    migrate here thread entry related functionality from thread.c/h
  //
  class entry
  {
    public:
      entry ();
      ~entry ();

      // enumerations
      enum class status
      {
    TS_DEAD,
    TS_FREE,
    TS_RUN,
    TS_WAIT,
    TS_CHECK
      };

      // public functions
      void request_lock_free_transactions (void);   // todo: lock-free refactoring

      // The rules of thumbs is to always use private members. Until a complete refactoring, these members will remain
      // public
      int index;            /* thread entry index */
      thread_type type;     /* thread type */
      thread_id_t emulate_tid;  /* emulated thread id; applies to non-worker threads, when works on behalf of a worker
                   * thread */
      int client_id;        /* client id whom this thread is responding */
      int tran_index;       /* tran index to which this thread belongs */
      int private_lru_index;    /* private lru index when transaction quota is used */
      pthread_mutex_t tran_index_lock;
      unsigned int rid;     /* request id which this thread is processing */
      status m_status;          /* thread status */

      pthread_mutex_t th_entry_lock;    /* latch for this thread entry */
      pthread_cond_t wakeup_cond;   /* wakeup condition */

      HL_HEAPID private_heap_id;    /* id of thread private memory allocator */

      css_conn_entry *conn_entry;   /* conn entry ptr */

      xasl_unpack_info *xasl_unpack_info_ptr;     /* XASL_UNPACK_INFO * */
      int xasl_errcode;     /* xasl errorcode */
      int xasl_recursion_depth;

      unsigned int rand_seed;   /* seed for rand_r() */
      struct drand48_data rand_buf; /* seed for lrand48_r(), drand48_r() */

      thread_resume_suspend_status resume_status;       /* resume status */
      int request_latch_mode;   /* for page latch support */
      int request_fix_count;
      bool victim_request_fail;
      bool interrupted;     /* is this request/transaction interrupted ? */
      std::atomic_bool shutdown;        /* is server going down? */
      bool check_interrupt;     /* check_interrupt == false, during fl_alloc* function call. */
      bool wait_for_latch_promote;  /* this thread is waiting for latch promotion */
      entry *next_wait_thrd;

      void *lockwait;
      INT64 lockwait_stime;     /* time in milliseconds */
      int lockwait_msecs;       /* time in milliseconds */
      int lockwait_state;
      void *query_entry;
      entry *tran_next_wait;
      entry *worker_thrd_list;  /* worker thread on job queue */

      struct log_zip *log_zip_undo;
      struct log_zip *log_zip_redo;
      char *log_data_ptr;
      int log_data_length;

      bool no_logging;

      int net_request_index;    /* request index of net server functions */

      struct vacuum_worker *vacuum_worker;  /* Vacuum worker info */

      bool sort_stats_active;

      EVENT_STAT event_stats;

      /* for query profile */
      int trace_format;
      bool on_trace;
      bool clear_trace;

      /* for lock free structures */
      lf_tran_entry *tran_entries[THREAD_TS_COUNT];

      /* for supplemental log */
      bool no_supplemental_log;
      bool trigger_involved;
      bool is_cdc_daemon;

      /* support multi-process unloaddb
       * _unload_parallel_process_idx is only valid when (_unload_cnt_parallel_process > 1).
       * At this time, _unload_parallel_process_idx can have values ​​between 0 and (_unload_cnt_parallel_process-1).
       */
#define  NO_UNLOAD_PARALLEL_PROCESSIING (-1)
      int _unload_parallel_process_idx;
      int _unload_cnt_parallel_process;

#if !defined(NDEBUG)
      fi_test_item *fi_test_array;

      int count_private_allocators;
#endif
      std::atomic_int m_qlist_count;
      int read_ovfl_pages_count; // For Vacuum only.

      cubload::driver *m_loaddb_driver;

      pthread_mutex_t m_px_lock_mutex;
      pthread_mutex_t m_px_stats_mutex;
      UINT64 *m_px_stats;
      entry *m_px_orig_thread_entry;
      bool m_uses_px_stats;

      bool m_is_private_lru_enabled;
      struct pgbuf_holder_anchor *m_holder_anchor;

      thread_id_t get_id ();
      pthread_t get_posix_id ();
      void register_id ();
      void unregister_id ();
      bool is_on_current_thread () const;

      void return_lock_free_transaction_entries (void);

      void release_packet (void *buffer);

      void lock (void);
      void unlock (void);

      cuberr::context &get_error_context (void)
      {
    return m_error;
      }

      cubbase::alloc_tracker &get_alloc_tracker (void)
      {
    return m_alloc_tracker;
      }
      cubbase::pgbuf_tracker &get_pgbuf_tracker (void)
      {
    return m_pgbuf_tracker;
      }
      cubsync::critical_section_tracker &get_csect_tracker (void)
      {
    return m_csect_tracker;
      }

      log_system_tdes *get_system_tdes (void)
      {
    return m_systdes;
      }
      void set_system_tdes (log_system_tdes *sys_tdes)
      {
    m_systdes = sys_tdes;
      }
      void reset_system_tdes (void)
      {
    m_systdes = NULL;
      }
      void claim_system_worker ();
      void retire_system_worker ();

      void end_resource_tracks (void);
      void push_resource_tracks (void);
      void pop_resource_tracks (void);

      void assign_lf_tran_index (lockfree::tran::index idx);
      lockfree::tran::index pull_lf_tran_index ();
      lockfree::tran::index get_lf_tran_index ();

    private:
      void clear_resources (void);

      thread_id_t m_id;

      // error manager context
      cuberr::context m_error;

      // TODO: move all members her
      bool m_cleared;

      // trackers
      cubbase::alloc_tracker &m_alloc_tracker;
      cubbase::pgbuf_tracker &m_pgbuf_tracker;
      cubsync::critical_section_tracker &m_csect_tracker;
      log_system_tdes *m_systdes;

      lockfree::tran::index m_lf_tran_index;
  };

} // namespace cubthread

#ifndef _THREAD_COMPAT_HPP_
// The whole code uses THREAD_ENTRY... It is ridiculous to change entire code to rename.
typedef cubthread::entry THREAD_ENTRY;
typedef std::thread::id thread_id_t;
#endif // _THREAD_COMPAT_HPP_

// alias functions for C legacy code
//
// use inline functions instead definitions

inline int
thread_get_recursion_depth (cubthread::entry *thread_p)
{
  return thread_p->xasl_recursion_depth;
}

inline void
thread_inc_recursion_depth (cubthread::entry *thread_p)
{
  thread_p->xasl_recursion_depth++;
}

inline void
thread_dec_recursion_depth (cubthread::entry *thread_p)
{
  thread_p->xasl_recursion_depth--;
}

inline void
thread_clear_recursion_depth (cubthread::entry *thread_p)
{
  thread_p->xasl_recursion_depth = 0;
}

inline void
thread_trace_on (cubthread::entry *thread_p)
{
  thread_p->on_trace = true;
}

inline void
thread_trace_off (cubthread::entry *thread_p)
{
  thread_p->on_trace = false;
}

inline void
thread_set_trace_format (cubthread::entry *thread_p, int format)
{
  thread_p->trace_format = format;
}

inline bool
thread_is_on_trace (cubthread::entry *thread_p)
{
  return thread_p->on_trace;
}

inline void
thread_set_clear_trace (cubthread::entry *thread_p, bool clear)
{
  thread_p->clear_trace = clear;
}

inline bool
thread_need_clear_trace (cubthread::entry *thread_p)
{
  return thread_p->clear_trace;
}

inline bool
thread_get_sort_stats_active (cubthread::entry *thread_p)
{
  return thread_p->sort_stats_active;
}

inline bool
thread_set_sort_stats_active (cubthread::entry *thread_p, bool new_flag)
{
  bool old_flag = thread_p->sort_stats_active;
  thread_p->sort_stats_active = new_flag;
  return old_flag;
}

inline cubthread::entry *
thread_get_main_thread (cubthread::entry *thread_p)
{
  assert (thread_p != nullptr);

  cubthread::entry *current = thread_p;

  // Safety limit to prevent infinite traversal in case of corrupted hierarchy
  constexpr int MAX_DEPTH = 8;

  for (int i = 0; i < MAX_DEPTH; ++i)
    {
      cubthread::entry *parent = current->m_px_orig_thread_entry;

      // Found root (nullptr) or a self-referencing main thread
      if (parent == nullptr || parent == current)
    {

      return current;
    }

      // Detect logical cycles (looping back to the starting thread)
      if ( unlikely (parent == thread_p))
    {
      assert (false && "Cycle detected in thread hierarchy");
      return thread_p;
    }

      current = parent;
    }

  // Fallback for unexpectedly deep chains or undetected complex cycles
  assert (false && "Thread hierarchy depth exceeded limit");
  return thread_p;
}

inline void
thread_lock_entry (cubthread::entry *thread_p)
{
  thread_p->lock ();
}

inline void
thread_unlock_entry (cubthread::entry *thread_p)
{
  thread_p->unlock ();
}

void thread_suspend_wakeup_and_unlock_entry (cubthread::entry *p, thread_resume_suspend_status suspended_reason);
int thread_suspend_timeout_wakeup_and_unlock_entry (cubthread::entry *p, struct timespec *t,
    thread_resume_suspend_status suspended_reason);
void thread_wakeup (cubthread::entry *p, thread_resume_suspend_status resume_reason);
void thread_check_suspend_reason_and_wakeup (cubthread::entry *thread_p, thread_resume_suspend_status resume_reason,
    thread_resume_suspend_status suspend_reason);
void thread_wakeup_already_had_mutex (cubthread::entry *p, thread_resume_suspend_status resume_reason);
int thread_suspend_with_other_mutex (cubthread::entry *p, pthread_mutex_t *mutexp, int timeout, struct timespec *to,
                     thread_resume_suspend_status suspended_reason);

const char *thread_type_to_string (thread_type type);
const char *thread_status_to_string (cubthread::entry::status status);
const char *thread_resume_status_to_string (thread_resume_suspend_status resume_status);
#endif // _THREAD_ENTRY_HPP_