Skip to content

File thread_manager.hpp

File List > cubrid > src > thread > thread_manager.hpp

Go to the documentation of this file

/*
 * Copyright 2008 Search Solution Corporation
 * Copyright 2016 CUBRID Corporation
 *
 *  Licensed under the Apache License, Version 2.0 (the "License");
 *  you may not use this file except in compliance with the License.
 *  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 *  Unless required by applicable law or agreed to in writing, software
 *  distributed under the License is distributed on an "AS IS" BASIS,
 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *  See the License for the specific language governing permissions and
 *  limitations under the License.
 *
 */

/*
 * thread_manager.hpp - interface of tracker for all thread resources
 */

#ifndef _THREAD_MANAGER_HPP_
#define _THREAD_MANAGER_HPP_

#if !defined (SERVER_MODE) && !defined (SA_MODE)
#error Wrong module
#endif // not SERVER_MODE and not SA_MODE

// same module includes
#include "thread_entry.hpp"
#include "thread_entry_task.hpp"
#include "thread_task.hpp"
#include "thread_waiter.hpp"
#if defined (SERVER_MODE)
#include "thread_worker_pool_impl.hpp"
#endif

// other module includes
#include "count_registry.hpp"
#include "base_flag.hpp"

#include <mutex>
#include <vector>

// forward definitions
template <typename T>
class resource_shared_pool;

namespace lockfree
{
  namespace tran
  {
    class system;
  }
}

namespace cubthread
{

  // forward definition
  class connection;
  class worker_pool;
  class looper;
  class daemon;
  class daemon_entry_manager;

  // cubthread::manager
  //
  //  description:
  //    thread and thread context (entry) manager
  //    CUBRID interface for using daemons and worker pools with thread entries
  //
  //    daemons -
  //      creates, destroys and tracks all daemons
  //      provides thread entries to daemons
  //      available in SERVER_MODE only
  //      see more details in thread_daemon.hpp
  //
  //    worker pools -
  //      create, destroys and tracks all worker pools
  //      provides thread entries to daemons
  //      available in both SERVER_MODE and SA_MODE; SA_MODE however does not actually create worker pools, but instead
  //        execute required tasks immediately (on current thread)
  //      see more details in thread_worker_pool.hpp
  //
  //    entries -
  //      creates a pool of entries; pool cannot be extended
  //      reserves entries for daemons and worker pools; if entry resources are depleted, it will refuse to create
  //      additional daemons and worker pools
  //      dispatches entries when worker/daemon threads start execution and manages entry retirement/reuse
  //      note -
  //        thread entries should be seen as thread local variables. however, they are bulky structures that may take
  //        a long time to initialize/finalize, so they are pooled by manager; expensive initialize/finalize are
  //        replaced by claim from pool and retire to pool. note that claim also saves the entry to thread local
  //        pointer to thread_entry (see claim_entry/retire_entry)
  //
  //  how to use:
  //     1. daemon -
  //          REGISTER_DAEMON (name);
  //          daemon *my_daemon = cubthread::get_manager ()->create_daemon (daemon_looper, daemon_task_p);
  //          // daemon loops and execute task on each iteration
  //          cubthread::get_manager ()->destroy_daemon (my_daemon);
  //
  //     2. worker_pool -
  //          REGISTER_WORKERPOOL (name, getter);
  //          worker_pool *my_workpool = cubthread::get_manager ()->create_worker_pool<pool_type> (MAX_THREADS, MAX_JOBS);
  //          cubthread::get_manager ()->push_task (my_workpool, entry_task_p);
  //          cubthread::get_manager ()->destroy_worker_pool (my_workpool);
  //
  class manager
  {
    public:
      using connection_registry_t = cubbase::count_registry<connection>;
      using workerpool_registry_t = cubbase::count_registry<worker_pool>;
      using daemon_registry_t = cubbase::count_registry<daemon>;

      manager ();
      ~manager ();

      // entry manager

      void alloc_entries (void);
      void init_entries (bool with_lock_free = false);
      void init_lockfree_system ();

      // worker pool management

      // create a worker pool with pool_size number of threads
      // notes: if there are not pool_size number of entries available, worker pool is not created and NULL is returned
      //        signature emulates worker_pool constructor signature
      template<typename Res, typename ... CtArgs>
      Res *create_worker_pool (std::size_t pool_size, std::size_t core_count, CtArgs &&... args);

      template <typename Res>
      void destroy_worker_pool (Res *&worker_pool_arg);

      // push task to worker pool created with this manager
      // if worker_pool_arg is NULL, the task is executed immediately
      void push_task (worker_pool *worker_pool_arg, entry_task *exec_p);
      // push task on the given core of entry worker pool.
      // read cubthread::worker_pool::execute_on_core for details.
      void push_task_on_core (worker_pool *worker_pool_arg, entry_task *exec_p, std::size_t core_hash, bool method_mode);

      // daemon management

      // there are two types of daemons:
      //
      //    1. daemons based on thread_entry context
      //    2. daemons without context
      //
      // first types of daemons will also have to reserve a thread entry. there can be unlimited second type daemons
      //
      // create_daemon/destroy_daemon and create_daemon_without_entry/destroy_daemon_without_entry are not
      // interchangeable. expect safe-guard failures if not used appropriately.
      //

      // create daemon thread
      //
      // note: signature should match context-based daemon constructor. only exception is entry manager which is
      //       moved at the end to allow a default value
      //
      // todo: remove default daemon name
      daemon *create_daemon (const looper &looper_arg, entry_task *exec_p,
                 const char *daemon_name = "", entry_manager *entry_mgr = NULL);
      // destroy daemon thread
      void destroy_daemon (daemon *&daemon_arg);

      // create & destroy daemon thread without thread entry
      //
      // note: create signature should match context-less daemon constructor
      daemon *create_daemon_without_entry (const looper &looper_arg, task_without_context *exec_p,
                       const char *daemon_name);
      void destroy_daemon_without_entry (daemon *&daemon_arg);

      // other member functions

      // get the maximum thread count
      std::size_t get_max_thread_count (void) const;

      // verify all threads (workers and daemons) are killed
      void check_all_killed (void);

      // get entry array; required for thread.c/h backward compatibility
      // todo: remove me
      entry *get_all_entries (void)
      {
    return m_all_entries;
      }

      entry_manager &get_entry_manager (void)
      {
    return m_entry_manager;
      }

      lockfree::tran::system &get_lockfree_transys ()
      {
    return *m_lf_tran_sys;
      }

      void set_max_thread_count_from_config ();
      void set_max_thread_count (std::size_t count);

      void return_lock_free_transaction_entries (void);
      entry *find_by_tid (thread_id_t tid);

      // mappers

      // map all entries
      // function signature is:
      //    bool & stop_mapper - output true to stop mapping over threads
      template <typename Func, typename ... Args>
      void map_entries (Func &&func, Args &&... args);

      void clear_all_holder_anchor (void)
      {
    for (std::size_t it = 0; it < m_max_threads; it++)
      {
        m_all_entries[it].m_holder_anchor = NULL;
      }
      }

      // claim/retire entries
      entry *claim_entry (void);
      void retire_entry (entry &entry_p);

    private:

      // define friend classes/functions to access claim_entry/retire_entry functions
      friend class entry_manager;
      friend void initialize (entry *&my_entry);
      friend void finalize (void);

      // private type aliases
      using entry_dispatcher = resource_shared_pool<entry>;

      // generic implementation to create and destroy resources (specialize through daemon and worker pool)
      template <typename Res, typename Base, typename ... CtArgs>
      Res *create_and_track_resource (std::vector<Base *> &tracker, size_t entries_count, CtArgs &&... args);
      template <typename Res>
      void destroy_and_untrack_resource (std::vector<Res *> &tracker, Res *&res, std::size_t entries_count);
      template <typename Res>
      void destroy_and_untrack_all_resources (std::vector<Res *> &tracker);

      // private members

      // max thread count
      std::size_t m_max_threads;

      // guard for thread resources
      std::mutex m_entries_mutex;
      // worker pools
      std::vector<worker_pool *> m_worker_pools;
      // daemons
      std::vector<daemon *> m_daemons;
      // daemons without entries
      std::vector<daemon *> m_daemons_without_entries;

      // entries
      entry *m_all_entries;
      // entry pool
      entry_dispatcher *m_entry_dispatcher;
      // available entries count
      std::size_t m_available_entries_count;
      entry_manager m_entry_manager;
      daemon_entry_manager m_daemon_entry_manager;

      // lock-free transaction system
      lockfree::tran::system *m_lf_tran_sys;
  };

  // alias
#if defined (SERVER_MODE)
  using worker_pool_type = cubthread::worker_pool_impl<false>;
  using stats_worker_pool_type = cubthread::worker_pool_impl<true>;
#else
  using worker_pool_type = cubthread::worker_pool;
  using stats_worker_pool_type = cubthread::worker_pool;
#endif

  // thread logging flags
  //
  // TODO: complete thread logging for all modules
  //
  // How to use:
  //
  //    do_log = is_logging_configured (LOG_MANAGER);
  //    if (do_log)
  //      _er_log_debug (ARG_FILE_LINE, "something happens\n);
  //
  // Flags explained:
  //
  //    There are three types of flags to be used: manager, worker pool and daemons. For now, only worker pools are
  //    actually logged, others are just declared for future extensions.
  //
  //    To activate a logging flag, should set the thread_logging_flag system parameter value to include flag.
  //    For instance, to log connections, the bit for LOG_WORKER_POOL_CONNECTIONS should be set.
  //
  // system parameter flags for thread logging
  // manager flags
  const int LOG_MANAGER = 0x1;
  const int LOG_MANAGER_ALL = 0xFF;          // reserved for thread manager

  // worker pool flags
  const int LOG_WORKER_POOL_VACUUM = 0x100;
  const int LOG_WORKER_POOL_CONNECTIONS = 0x200;
  const int LOG_WORKER_POOL_TRAN_WORKERS = 0x400;
  const int LOG_WORKER_POOL_INDEX_BUILDER = 0x800;
  const int LOG_WORKER_POOL_ALL = 0xFF00;    // reserved for thread worker pools

  // daemons flags
  const int LOG_DAEMON_VACUUM = 0x10000;
  const int LOG_DAEMON_ALL = 0xFFFF0000;     // reserved for thread daemons

  bool is_logging_configured (const int logging_flag);

  // thread global functions

  // initialize thread manager; note this creates a singleton cubthread::manager instance
  void initialize (entry *&my_entry);

  // finalize thread manager
  void finalize (void);

  // backward compatibility initialization
  int initialize_thread_entries (bool with_lock_free = true);

  // get thread manager
  manager *get_manager (void);

  // get maximum thread count
  std::size_t get_max_thread_count (void);

  // is_single_thread context; e.g. SA_MODE
  // todo: sometimes SERVER_MODE can be single-thread; e.g. during boot
  bool is_single_thread (void);
  // safe-guard for multi-thread features not being used in single-thread context
  void check_not_single_thread (void);

  // get current thread's entry
  entry &get_entry (void);
  void set_thread_local_entry (entry &tl_entry);      // for unit test easy mock-ups
  void clear_thread_local_entry (void);               // for unit test easy mock-ups

  void return_lock_free_transaction_entries (void);

  // template / inline functions

  template<typename Res, typename ... CtArgs>
  Res *
  manager::create_worker_pool (std::size_t pool_size, std::size_t core_count, CtArgs &&... args)
  {
    static_assert (std::is_base_of_v<worker_pool, Res>);

#if defined (SERVER_MODE)
    Res *workerpool;

    assert (m_worker_pools.size () <= workerpool_registry_t::count ());

    // reserve pool_size entries and add to m_worker_pools
    workerpool = create_and_track_resource<Res> (m_worker_pools, pool_size,
         pool_size, core_count, std::forward<CtArgs> (args)...);
    if (workerpool)
      {
    workerpool->initialize (pool_size, core_count);
      }
    return workerpool;
#else // not SERVER_MODE = SA_MODE
    return NULL;
#endif // not SERVER_MODE = SA_MODE
  }

  template <typename Res>
  void
  manager::destroy_worker_pool (Res *&worker_pool_arg)
  {
#if defined (SERVER_MODE)
    if (worker_pool_arg == NULL)
      {
    return;
      }
    // remove from m_worker_pools and free worker_pool_arg->get_worker_count thread entries
    worker_pool *base_arg = worker_pool_arg;
    destroy_and_untrack_resource (m_worker_pools, base_arg, worker_pool_arg->get_worker_count ());
    worker_pool_arg = NULL;
#else // not SERVER_MODE = SA_MODE
    assert (worker_pool_arg == NULL);
#endif // not SERVER_MODE = SA_MODE
  }

  template <typename Func, typename ... Args>
  void
  manager::map_entries (Func &&func, Args &&... args)
  {
    bool stop = false;
    for (std::size_t i = 0; i < m_max_threads; i++)
      {
    func (m_all_entries[i], stop, std::forward<Args> (args)...);
    if (stop)
      {
        break;
      }
      }
  }

  template <typename Res, typename Base, typename ... CtArgs>
  Res *
  manager::create_and_track_resource (std::vector<Base *> &tracker, size_t entries_count, CtArgs &&... args)
  {
    check_not_single_thread ();

    std::lock_guard<std::mutex> lock (m_entries_mutex);

    if (m_available_entries_count < entries_count)
      {
    return NULL;
      }
    m_available_entries_count -= entries_count;

    Res *new_res = new Res (std::forward<CtArgs> (args)...);

    tracker.push_back (new_res);

    return new_res;
  }

  template<typename Res>
  void
  manager::destroy_and_untrack_resource (std::vector<Res *> &tracker, Res *&res, std::size_t entries_count)
  {
    check_not_single_thread ();

    std::lock_guard<std::mutex> lock (m_entries_mutex);

    for (auto iter = tracker.begin (); iter != tracker.end (); ++iter)
      {
    if (res == *iter)
      {
        // remove resource from tracker
        (void) tracker.erase (iter);

        // stop resource and delete
        res->stop_execution ();
        delete res;
        res = NULL;

        // update available entries
        m_available_entries_count += entries_count;

        return;
      }
      }
    // resource not found
    assert (false);
  }

  template<typename Res>
  void
  manager::destroy_and_untrack_all_resources (std::vector<Res *> &tracker)
  {
    assert (tracker.empty ());

#if defined (SERVER_MODE)
    for (; !tracker.empty ();)
      {
    const auto iter = tracker.begin ();
    (*iter)->stop_execution ();
    delete *iter;
    tracker.erase (iter);
      }
#endif // SERVER_MODE
  }

} // namespace cubthread

// macros to count the number of entries

#define REGISTER_CONNECTION(name, getter) static cubthread::manager::connection_registry_t _gl_reg_conn_##name (#name, getter)
#define REGISTER_WORKERPOOL(name, getter) static cubthread::manager::workerpool_registry_t _gl_reg_wp_##name (#name, getter)
#define REGISTER_DAEMON(name) static cubthread::manager::daemon_registry_t _gl_reg_daemon_##name (#name, 1)

// alias functions to be used in C legacy code
//
// use inline functions instead of definitions

inline cubthread::manager *
thread_get_manager (void)
{
  return cubthread::get_manager ();
}

inline cubthread::entry_manager &
thread_get_entry_manager (void)
{
  return cubthread::get_manager ()->get_entry_manager ();
}

inline cubthread::worker_pool_type *
thread_create_worker_pool (std::size_t pool_size, std::size_t core_count, const char *name,
               cubthread::entry_manager &entry_mgr, bool pool_threads = false)
{
  return cubthread::get_manager ()->create_worker_pool<cubthread::worker_pool_type> (pool_size, core_count, name,
     entry_mgr, pool_threads);
}

inline cubthread::stats_worker_pool_type *
thread_create_stats_worker_pool (std::size_t pool_size, std::size_t core_count, const char *name,
                 cubthread::entry_manager &entry_mgr, bool pool_threads = false,
                 cubthread::wait_seconds idle_timeout = std::chrono::seconds (5))
{
  return cubthread::get_manager ()->create_worker_pool<cubthread::stats_worker_pool_type> (pool_size, core_count, name,
     entry_mgr, pool_threads, idle_timeout);
}

inline std::size_t
thread_num_total_threads (void)
{
  return cubthread::get_max_thread_count ();
}

inline cubthread::entry *
thread_get_thread_entry_info (void)
{
  cubthread::entry &te = cubthread::get_entry ();
  return &te;
}

inline int
thread_get_entry_index (cubthread::entry *thread_p)
{
  if (thread_p == NULL)
    {
      thread_p = thread_get_thread_entry_info ();
    }

  return thread_p->index;
}

inline int
thread_get_current_entry_index (void)
{
  return thread_get_entry_index (thread_get_thread_entry_info ());
}

inline void
thread_return_lock_free_transaction_entries (void)
{
  return cubthread::return_lock_free_transaction_entries ();
}

// todo - we really need to do some refactoring for lock-free structures
inline lf_tran_entry *
thread_get_tran_entry (cubthread::entry *thread_p, int entry_idx)
{
  if (thread_p == NULL)
    {
      thread_p = thread_get_thread_entry_info ();
    }
  if (entry_idx >= 0 && entry_idx < THREAD_TS_LAST)
    {
      return thread_p->tran_entries[entry_idx];
    }
  else
    {
      assert (false);
      return NULL;
    }
}

template <typename Duration>
inline void
thread_sleep_for (Duration d)
{
  std::this_thread::sleep_for (d);
}

inline void
thread_sleep (double millisec)
{
  // try to avoid this and use thread_sleep_for instead
  std::chrono::duration<double, std::milli> duration_millis (millisec);
  thread_sleep_for (duration_millis);
}

inline void
thread_clear_all_holder_anchor (void)
{
  cubthread::get_entry ().m_holder_anchor = NULL;
  return cubthread::get_manager ()->clear_all_holder_anchor ();
}

#endif  // _THREAD_MANAGER_HPP_