Skip to content

File pl_sr.cpp

File List > cubrid > src > sp > pl_sr.cpp

Go to the documentation of this file

/*
 *
 * Copyright 2016 CUBRID Corporation
 *
 *  Licensed under the Apache License, Version 2.0 (the "License");
 *  you may not use this file except in compliance with the License.
 *  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 *  Unless required by applicable law or agreed to in writing, software
 *  distributed under the License is distributed on an "AS IS" BASIS,
 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *  See the License for the specific language governing permissions and
 *  limitations under the License.
 *
 */

/*
 * pl_sr.cpp - PL Server Module Source
 */

#include "pl_sr.h"

#if defined (SERVER_MODE) || defined (SA_MODE)
#include "boot_sr.h"
#endif

#if !defined(WINDOWS)
#include <sys/types.h>
#include <sys/wait.h>
#endif

#include "thread_manager.hpp"
#include "thread_task.hpp"
#if defined (SERVER_MODE)
#include "thread_entry.hpp"
#include "thread_looper.hpp"
#include "thread_daemon.hpp"
#include "boot_sr.h"
#else
#include "dbi.h"
#include "boot.h"
#endif

#include "dbtype.h"
#include "pl_comm.h"
#include "pl_connection.hpp"
#include "process_util.h"
#include "environment_variable.h"
#include "system_parameter.h"
#include "release_string.h"
#include "memory_alloc.h"
#include "error_manager.h"
#include "method_struct_invoke.hpp"
#include "method_struct_value.hpp"
#include "pl_session.hpp"

// XXX: SHOULD BE THE LAST INCLUDE HEADER
#include "memory_wrapper.hpp"
namespace cubpl
{
// Declarations

  class server_monitor_task;
  struct bootstrap_request;

  /*********************************************************************
   * server_manager - declaration
   *********************************************************************/
  class server_manager final
  {
    public:
      static constexpr std::size_t CONNECTION_POOL_SIZE = 10;

      explicit server_manager (const char *db_name);

      ~server_manager ();

      server_manager (const server_manager &copy) = delete; // Not CopyConstructible
      server_manager &operator= (const server_manager &copy) = delete;  // Not CopyAssignable

      server_manager (server_manager &&other) = delete; // Not MoveConstructible
      server_manager &operator= (server_manager &&other) = delete;  // Not MoveAssignable

      /*
      * start () - start the PL server through monitoring task
      */
      void start ();

      /*
      * wait_for_server_ready() - check if the server is ready to accept connection
      */
      int wait_for_server_ready ();

      /*
      * get_connection_pool() - get the connection pool
      */
      connection_pool *get_connection_pool ();

      /*
      * get_pl_ctx_params() - get the PL context parameters
      */
      SYSPRM_ASSIGN_VALUE *get_pl_ctx_params ();

      /*
      * get_db_name () - get the database name
      */
      std::string get_db_name () const
      {
    return m_db_name;
      }

    private:
      std::string m_db_name;
      server_monitor_task *m_server_monitor_task;
      connection_pool *m_connection_pool;

#if defined (SERVER_MODE)
      cubthread::daemon *m_monitor_helper_daemon = nullptr;
#endif

      SYSPRM_ASSIGN_VALUE *m_pl_ctx_params;
  };

  /*********************************************************************
   * server_monitor_task - declaration
   *********************************************************************/

#if defined (SERVER_MODE)
  class server_monitor_task : public cubthread::entry_task
#else
  class server_monitor_task
#endif
  {
    public:
      enum server_monitor_state
      {
    SERVER_MONITOR_STATE_RUNNING,
    SERVER_MONITOR_STATE_STOPPED,
    SERVER_MONITOR_STATE_READY_TO_INITIALIZE,
    SERVER_MONITOR_STATE_FAILED_TO_FORK,
    SERVER_MONITOR_STATE_FAILED_TO_INITIALIZE,
    SERVER_MONITOR_STATE_UNKNOWN
      };

      server_monitor_task (server_manager *manager, std::string db_name);
      ~server_monitor_task ();

      server_monitor_task (const server_monitor_task &copy) = delete;   // Not CopyConstructible
      server_monitor_task &operator= (const server_monitor_task &copy) = delete;    // Not CopyAssignable

      server_monitor_task (server_monitor_task &&other) = delete;   // Not MoveConstructible
      server_monitor_task &operator= (server_monitor_task &&other) = delete;    // Not MoveAssignable

#if defined (SERVER_MODE)
      // called by daemon thread
      void execute (context_type &thread_ref) override;
#endif

      // internal main routine
      // This function is called by daemon thread (SERVER_MODE) or main thread (SA_MODE)
      void do_monitor ();

      // wait until PL server is initialized
      void wait_for_ready ();

      bool is_running () const;

    private:
      int do_initialize ();

      // check functions for PL server state
      void do_check_state (bool hang_check);

      int do_check_connection (int fail_cnt);
      int do_ping_connection ();

      /*
      * do_bootstrap_request() - send a bootstrap request to PL server
      */
      int do_bootstrap_request ();

      server_manager *m_manager;

      int m_pid;
      server_monitor_state m_state;
      std::string m_db_name;
      std::string m_binary_name;
      std::string m_executable_path;
      const char *m_argv[3];
      int m_failure_count;

      connection_pool *m_sys_conn_pool;
      bootstrap_request *m_bootstrap_request;

#if defined (SERVER_MODE)
      std::mutex m_monitor_mutex;
      std::condition_variable m_monitor_cv;
#endif
  };

  struct bootstrap_request : public cubpacking::packable_object
  {
    cubmethod::header req_header;
    std::vector <sys_param> server_params;

    bootstrap_request (SYSPRM_ASSIGN_VALUE *pl_ctx_values);
    ~bootstrap_request () = default;

    void pack (cubpacking::packer &serializator) const override;
    void unpack (cubpacking::unpacker &deserializator) override;
    size_t get_packed_size (cubpacking::packer &serializator, std::size_t start_offset) const override;
  };

// Definitions

  REGISTER_DAEMON (pl_monitor);

  /*********************************************************************
   * server_manager - definition
   *********************************************************************/
  server_manager::server_manager (const char *db_name)
    : m_db_name (db_name)
  {
    m_server_monitor_task = new server_monitor_task (this, m_db_name);
#if defined (SERVER_MODE)
    m_monitor_helper_daemon = nullptr;
#endif
    m_connection_pool = new connection_pool (server_manager::CONNECTION_POOL_SIZE, db_name);

    m_pl_ctx_params = nullptr;
  }

  server_manager::~server_manager ()
  {
#if defined (SERVER_MODE)
    if (m_monitor_helper_daemon)
      {
    cubthread::get_manager ()->destroy_daemon (m_monitor_helper_daemon);
    m_monitor_helper_daemon = nullptr;
      }

    if (m_connection_pool)
      {
    delete m_connection_pool;
    m_connection_pool = nullptr;
      }
#endif

    if (m_pl_ctx_params)
      {
    sysprm_free_assign_values (&m_pl_ctx_params);
      }
  }

  void
  server_manager::start ()
  {
#if defined (SERVER_MODE)
    cubthread::looper looper = cubthread::looper (std::chrono::milliseconds (1000));
    m_monitor_helper_daemon = cubthread::get_manager ()->create_daemon (looper, m_server_monitor_task, "pl-monitor");
#else
    m_server_monitor_task->do_monitor ();
#endif
  }

  int
  server_manager::wait_for_server_ready ()
  {
    m_server_monitor_task->wait_for_ready ();
    if (m_server_monitor_task->is_running ())
      {
    return NO_ERROR;
      }
    else
      {
    er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_SP_CANNOT_START_PL_SERVER, 1,
        m_db_name.c_str ());
    return er_errid ();
      }
  }

  connection_pool *
  server_manager::get_connection_pool ()
  {
    return m_connection_pool;
  }

  SYSPRM_ASSIGN_VALUE *
  server_manager::get_pl_ctx_params ()
  {
    if (m_pl_ctx_params == nullptr)
      {
    /* late initialization */
    m_pl_ctx_params = xsysprm_get_pl_context_parameters (PRM_ALL_FLAGS);
      }
    return m_pl_ctx_params;
  }

  /*********************************************************************
   * server_monitor_task - definition
   *********************************************************************/
  server_monitor_task::server_monitor_task (server_manager *manager, std::string db_name)
    : m_manager (manager)
    , m_pid (-1)
    , m_state (SERVER_MONITOR_STATE_STOPPED)
    , m_db_name (db_name)
#if defined(WINDOWS)
    , m_binary_name ("cub_pl.exe")
#else
    , m_binary_name ("cub_pl")
#endif
    , m_argv {m_binary_name.c_str (), m_db_name.c_str (), 0}
    , m_failure_count (0)
    , m_sys_conn_pool {nullptr}
    , m_bootstrap_request {nullptr}
#if defined (SERVER_MODE)
    , m_monitor_mutex {}
    , m_monitor_cv {}
#endif
  {
    char executable_path[PATH_MAX];
    (void) envvar_bindir_file (executable_path, PATH_MAX, m_binary_name.c_str ());
    m_executable_path.assign (executable_path, PATH_MAX);
  }

  server_monitor_task::~server_monitor_task ()
  {
    if (m_bootstrap_request != nullptr)
      {
    delete m_bootstrap_request;
    m_bootstrap_request = nullptr;
      }

    if (m_sys_conn_pool != nullptr)
      {
    delete m_sys_conn_pool;
    m_sys_conn_pool = nullptr;
      }
  }

#if defined (SERVER_MODE)
  void
  server_monitor_task::execute (context_type &thread_ref)
  {
    do_monitor ();
  }
#endif

  void
  server_monitor_task::do_monitor ()
  {
    (void) do_check_state (false);

    if (m_state == SERVER_MONITOR_STATE_STOPPED || m_state == SERVER_MONITOR_STATE_FAILED_TO_FORK)
      {
    int status;

    pl_reset_info (m_db_name.c_str ());
    int pid = create_child_process (m_executable_path.c_str (), m_argv, 0 /* do not wait */, nullptr, nullptr, nullptr,
                    &status);
    if (pid > 1) // parent
      {
        m_pid = pid;
        sleep (1);
        m_state = SERVER_MONITOR_STATE_READY_TO_INITIALIZE;
      }
    else if (pid == 1) // fork error
      {
        er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ERR_CSS_CANNOT_FORK, 0);
        m_state = SERVER_MONITOR_STATE_FAILED_TO_FORK;
        m_failure_count++;
      }
    else
      {
        // wait flag is not set, never reach here
        assert (false);
      }
      }

    if (m_state == SERVER_MONITOR_STATE_READY_TO_INITIALIZE)
      {
    do_initialize ();
      }
  }

  void
  server_monitor_task::wait_for_ready ()
  {
    if (m_state == SERVER_MONITOR_STATE_READY_TO_INITIALIZE)
      {
#if defined (SA_MODE)
    assert (lang_is_all_initialized ());
#endif
    do_initialize ();
      }

#if defined (SERVER_MODE)
    auto pred = [this] () -> bool { return m_state == SERVER_MONITOR_STATE_RUNNING ||
                       (!BO_IS_SERVER_RESTARTED () && m_state == SERVER_MONITOR_STATE_FAILED_TO_INITIALIZE);
                  };

    std::unique_lock<std::mutex> ulock (m_monitor_mutex);
    m_monitor_cv.wait (ulock, pred);
#else
    if (m_state != SERVER_MONITOR_STATE_RUNNING)
      {
    // retry starting pl server
    int try_count = 0;
    do
      {
        m_state = SERVER_MONITOR_STATE_UNKNOWN;
        do_monitor ();
      }
    while (try_count++ < 10 && m_state != SERVER_MONITOR_STATE_RUNNING);
      }
#endif
  }

  bool
  server_monitor_task::is_running () const
  {
    return m_state == SERVER_MONITOR_STATE_RUNNING;
  }

  int
  server_monitor_task::do_initialize ()
  {
    int error = ER_FAILED;

    assert (m_state == SERVER_MONITOR_STATE_READY_TO_INITIALIZE);
    if (!lang_is_all_initialized ())
      {
    return error;
      }

#if defined (SERVER_MODE)
    std::lock_guard<std::mutex> lock (m_monitor_mutex);
#endif

    // wait PL server is ready to accept connection (polling)

    // TODO: parameterize this
    constexpr int MAX_FAIL_COUNT = 10;
    error = do_check_connection (MAX_FAIL_COUNT);

    // set unknown state here
#if defined (SERVER_MODE)
    m_state = SERVER_MONITOR_STATE_UNKNOWN;
#else
    m_state = SERVER_MONITOR_STATE_FAILED_TO_INITIALIZE;
#endif

    if (error == NO_ERROR)
      {
    error = do_bootstrap_request ();
    if (error == NO_ERROR)
      {
        // notify server is ready
        m_state = SERVER_MONITOR_STATE_RUNNING;
        m_failure_count = 0;
      }
      }

    // re-initialize connection pool
    if (m_manager->get_connection_pool ()->get_db_port () != PL_PORT_UDS_MODE)
      {
    // set the port number possibly randomly assigned in TCP mode
    m_manager->get_connection_pool ()->set_db_port (pl_server_port_from_info ());
      }
    m_manager->get_connection_pool ()->increment_epoch ();

#if defined (SERVER_MODE)
    m_monitor_cv.notify_all();
#endif

    return error;
  }

  void
  server_monitor_task::do_check_state (bool hang_check)
  {
    /* state transition */
    switch (m_state)
      {
      case SERVER_MONITOR_STATE_STOPPED:
#if defined(SA_MODE)
    if (do_check_connection (1) == NO_ERROR)
      {
        // Waiting for PL server in shutdown state
        m_state = SERVER_MONITOR_STATE_UNKNOWN;
      }
#else
    /* do nothing */
#endif
    break;
      case SERVER_MONITOR_STATE_RUNNING:
      case SERVER_MONITOR_STATE_READY_TO_INITIALIZE:
    if (m_pid > 0 && !is_terminated_process (m_pid))
      {
        // stay in the same state
      }
    else
      {
        er_log_debug (ARG_FILE_LINE, "PL server is terminated. pid=%d\n", m_pid);
        m_state = SERVER_MONITOR_STATE_STOPPED;
      }
    break;

      case SERVER_MONITOR_STATE_FAILED_TO_FORK:
      {
    if (m_failure_count > 10)
      {
        // After several failed attempts, we should consider the PL server is not able to start
        m_state = SERVER_MONITOR_STATE_FAILED_TO_INITIALIZE;
        er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_SP_CANNOT_START_PL_SERVER, 1,
            "Failed to initialize the PL server. Verify that the server environment and configurations are properly set up");
#if defined (SERVER_MODE)
        m_monitor_cv.notify_all ();
#endif
      }
      }
      break;

      case SERVER_MONITOR_STATE_UNKNOWN:
      case SERVER_MONITOR_STATE_FAILED_TO_INITIALIZE:
    if (m_pid == -1 || (m_pid > 0 && is_terminated_process (m_pid)))
      {
        // PL server is terminated by user (cubrid pl restart)
        m_state = SERVER_MONITOR_STATE_STOPPED;
        m_failure_count = 0;
      }

    if (m_state == SERVER_MONITOR_STATE_UNKNOWN)
      {
        m_failure_count++;
        if (m_failure_count > 10)
          {
        // After several failed attempts, we should consider the PL server is not able to start
        m_state = SERVER_MONITOR_STATE_FAILED_TO_INITIALIZE;
        er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_SP_CANNOT_START_PL_SERVER, 1,
            "Failed to initialize the PL server. Verify that the server environment and configurations are properly set up");
#if defined (SERVER_MODE)
        m_monitor_cv.notify_all ();
#endif
          }
        else
          {
        m_state = SERVER_MONITOR_STATE_READY_TO_INITIALIZE; // retry initialization
          }
      }
    break;
      }
  }

  int
  server_monitor_task::do_check_connection (int fail_cnt)
  {
    int error = NO_ERROR;
    int c = 0;
    do
      {
    error = do_ping_connection ();
    if (error == NO_ERROR || ++c > fail_cnt)
      {
        break;
      }

    /* The contents of the pl file may have changed, so set it to read again. */
    assert (m_sys_conn_pool);
    m_sys_conn_pool->set_db_port (pl_server_port_from_info ());

    thread_sleep (1000);    /* 1000 msec */
      }
    while (c < fail_cnt);

    return error;
  }

  int
  server_monitor_task::do_ping_connection ()
  {
    int error = NO_ERROR;

    if (m_sys_conn_pool == nullptr)
      {
    m_sys_conn_pool = new connection_pool (5, m_db_name, pl_server_port_from_info (), true);
      }

    cubmem::block ping_response;
    connection_view cv = m_sys_conn_pool->claim ();
    cubmethod::header header (DB_EMPTY_SESSION, SP_CODE_UTIL_PING);

    auto ping = [&] ()
    {
      int error = cv->send_buffer_args (header);
      if (error == NO_ERROR)
    {
      error = cv->receive_buffer (ping_response);
    }
      return error;
    };

    error = ping ();
    if (error != NO_ERROR)
      {
    // retry
    error = ping ();
      }

exit:
    ping_response.freemem ();

    cv.reset ();

    return (error);
  }

  int
  server_monitor_task::do_bootstrap_request ()
  {
    int error = ER_FAILED;
    if (m_bootstrap_request == nullptr)
      {
    m_bootstrap_request = new bootstrap_request (m_manager->get_pl_ctx_params ());
      }

    cubmem::block bootstrap_response;
    connection_view cv = m_sys_conn_pool->claim ();

    error = cv->send_buffer_args (*m_bootstrap_request);
    if (error == NO_ERROR)
      {
    error = cv->receive_buffer (bootstrap_response);
      }

    if (error == NO_ERROR && bootstrap_response.is_valid ())
      {
    packing_unpacker deserializator (bootstrap_response);
    deserializator.unpack_int (error);

    bootstrap_response.freemem ();
      }

    return error;
  }

  /*********************************************************************
   * bootstrap_request - definition
   *********************************************************************/
#define BOOTSTRAP_REQ_ARGS() \
  req_header, server_params

  bootstrap_request::bootstrap_request (SYSPRM_ASSIGN_VALUE *pl_ctx_values)
    : req_header (DB_EMPTY_SESSION, SP_CODE_UTIL_BOOTSTRAP)
    , server_params ()
  {
    while (pl_ctx_values != nullptr)
      {
    server_params.emplace_back (pl_ctx_values);
    pl_ctx_values = pl_ctx_values->next;
      }
  }

  void
  bootstrap_request::pack (cubpacking::packer &serializator) const
  {
    serializator.pack_all (BOOTSTRAP_REQ_ARGS ());
  }

  void
  bootstrap_request::unpack (cubpacking::unpacker &deserializator)
  {
    // do nothing
  }

  size_t
  bootstrap_request::get_packed_size (cubpacking::packer &serializator, std::size_t start_offset) const
  {
    return serializator.get_all_packed_size_starting_offset (start_offset, BOOTSTRAP_REQ_ARGS ());
  }
} // namespace cubpl

// High Level API for PL server module

static cubpl::server_manager *pl_server_manager = nullptr;

int
pl_server_init (const char *db_name)
{
  if (pl_server_manager != nullptr || prm_get_bool_value (PRM_ID_STORED_PROCEDURE) == false)
    {
      return NO_ERROR;
    }

#if defined (SA_MODE)
  if (!BOOT_NORMAL_CLIENT_TYPE (db_get_client_type ()))
    {
      return NO_ERROR;
    }
#endif

  pl_server_manager = new cubpl::server_manager (db_name);
  pl_server_manager->start ();

  return NO_ERROR;
}

void
pl_server_destroy ()
{
  if (pl_server_manager != nullptr)
    {
      delete pl_server_manager;
      pl_server_manager = nullptr;
    }
}

int
pl_server_wait_for_ready ()
{
  if (pl_server_manager)
    {
      return pl_server_manager->wait_for_server_ready ();
    }

  return NO_ERROR;
}

PL_CONNECTION_POOL *get_connection_pool ()
{
  if (pl_server_manager)
    {
      return pl_server_manager->get_connection_pool ();
    }
  else
    {
      er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_SP_NOT_RUNNING_PL_SERVER, 0);
      return nullptr;
    }
}


/*
 * pl_server_port_from_info
 *   return: if jsp is disabled return -2 (PL_PORT_DISABLED)
 *           else if jsp is UDS mode return -1
 *           else return a port (TCP mode)
 *
 *
 * Note:
 */

static int sp_port = PL_PORT_DISABLED;

int
pl_server_port_from_info (void)
{
  // check $CUBRID/var/pl_<db_name>.info
  PL_SERVER_INFO pl_info {-1, -1};
  pl_read_info (boot_db_name (), pl_info);
  sp_port = pl_info.port;

  return sp_port;
}