File master_server_monitor.cpp
File List > cubrid > src > executables > master_server_monitor.cpp
Go to the documentation of this file
/*
*
* Copyright 2016 CUBRID Corporation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
//
// master_server_monitor.cpp - Server Revive monitoring module
//
#include <sstream>
#include <algorithm>
#include <unistd.h>
#include <signal.h>
#include "system_parameter.h"
#include "master_server_monitor.hpp"
// Process-wide server monitor instance; starts out empty (no monitor created yet).
std::unique_ptr<server_monitor> master_Server_monitor {nullptr};
// Flag not read in this file; presumably toggles automatic server restart (TODO confirm with users). Defaults to off.
bool auto_Restart_server {false};
/*
 * server_monitor () - build an empty monitor and launch its worker thread.
 *
 * The shutdown flag is cleared under the monitor mutex before the worker
 * thread is started, so the worker observes a consistent "running" state.
 */
server_monitor::server_monitor ()
{
  m_server_entry_map = decltype (m_server_entry_map) {};

  {
    std::unique_lock<std::mutex> init_lock (m_server_monitor_mutex);
    m_thread_shutdown = false;
  }

  start_monitoring_thread ();
  er_log_debug (ARG_FILE_LINE, "[Server Monitor] Monitoring started.");
}
/*
 * ~server_monitor () - stop and join the monitoring thread.
 *
 * The worker thread reads m_job_queue and m_server_entry_map, so it must be
 * terminated before those members are destroyed; stopping it first here
 * guarantees that ordering.
 */
server_monitor::~server_monitor ()
{
  stop_monitoring_thread ();

  er_log_debug (ARG_FILE_LINE, "[Server Monitor] Monitoring finished.");
}
void
server_monitor::start_monitoring_thread ()
{
m_monitoring_thread = std::make_unique<std::thread> (&server_monitor::server_monitor_thread_worker, this);
}
/*
 * stop_monitoring_thread () - ask the monitoring thread to exit and wait for it.
 *
 * Sets m_thread_shutdown under the monitor mutex, wakes the consumer and joins
 * the thread. Safe to call more than once: the join is skipped when there is
 * no thread object or it has already been joined.
 */
void
server_monitor::stop_monitoring_thread ()
{
  {
    std::lock_guard<std::mutex> lock (m_server_monitor_mutex);
    m_thread_shutdown = true;
  }
  m_monitor_cv_consumer.notify_all ();

  // std::thread::join () throws std::system_error when the thread is not
  // joinable (e.g. already joined by an earlier call). This function runs from
  // the destructor, where such an exception would call std::terminate, and a
  // null m_monitoring_thread would be dereferenced; guard both cases.
  if (m_monitoring_thread && m_monitoring_thread->joinable ())
    {
      m_monitoring_thread->join ();
    }
}
void
server_monitor::server_monitor_thread_worker ()
{
job job;
while (true)
{
{
std::unique_lock<std::mutex> lock (m_server_monitor_mutex);
m_monitor_cv_consumer.wait (lock, [this]
{
return !m_job_queue.empty () || m_thread_shutdown;
});
if (m_thread_shutdown)
{
break;
}
else
{
assert (!m_job_queue.empty ());
consume_job (job);
}
}
process_job (job);
}
}
/*
 * register_server_entry () - record (or refresh) the monitored entry of a server.
 *
 * pid         : process id reported for the server
 * exec_path   : executable path used to revive the server (new entries only)
 * args        : space-separated argument string (new entries only)
 * server_name : key of the entry
 *
 * If an entry already exists (for instance a server that revive_server ()
 * restarted is now registering again), its pid is updated, the pending-revive
 * flag is cleared and the registration time is stamped as the last revive
 * time. Otherwise a new entry is created whose last revive time is a
 * default-constructed time_point, which revive_server () treats as
 * "never revived".
 */
void
server_monitor::register_server_entry (int pid, const std::string &exec_path, const std::string &args,
				       const std::string &server_name
				      )
{
  auto entry = m_server_entry_map.find (server_name);
  if (entry != m_server_entry_map.end ())
    {
      entry->second.set_pid (pid);
      entry->second.set_need_revive (false);
      entry->second.set_last_revived_time (std::chrono::steady_clock::now ());
      er_log_debug (ARG_FILE_LINE,
		    "[Server Monitor] [%s] Server entry has been registered. (pid : %d)",
		    server_name.c_str(), pid);
    }
  else
    {
      // server_name is a const reference, so the key is copied; the former
      // std::move () was a no-op that only looked like a use-after-move below.
      m_server_entry_map.emplace (server_name, server_entry (pid, exec_path, args,
				  std::chrono::steady_clock::time_point ()));
      er_log_debug (ARG_FILE_LINE,
		    "[Server Monitor] [%s] Server entry has been registered newly. (pid : %d)",
		    server_name.c_str(), pid);
    }
}
/*
 * remove_server_entry () - stop monitoring the named server.
 *
 * When stop commands for the same CUBRID server run concurrently, the entry
 * may already be gone; a missing entry is therefore ignored rather than
 * asserted on.
 */
void
server_monitor::remove_server_entry (const std::string &server_name)
{
  const auto found = m_server_entry_map.find (server_name);

  if (found == m_server_entry_map.end ())
    {
      return;
    }

  er_log_debug (ARG_FILE_LINE,
		"[Server Monitor] [%s] Server entry has been unregistered. (pid : %d)",
		server_name.c_str(), found->second.get_pid());
  m_server_entry_map.erase (found);
}
/*
 * revive_server () - try to restart an abnormally terminated server.
 *
 * Marks the entry as needing revival. If the server was never revived before,
 * or its last revive is older than the allowed window, a new process is forked
 * and a CONFIRM_REVIVE_SERVER job is queued to wait for it to register. A
 * failed fork re-queues REVIVE_SERVER. A repeated failure inside the window
 * drops the entry so the server is no longer revived automatically.
 * Does nothing when no entry exists for server_name.
 */
void
server_monitor::revive_server (const std::string &server_name)
{
  // Unacceptable revive time difference is set to be 120 seconds
  // as the timediff of server restart mechanism of heartbeat.
  constexpr int SERVER_MONITOR_UNACCEPTABLE_REVIVE_TIMEDIFF_IN_SECS = 120;

  auto entry = m_server_entry_map.find (server_name);
  if (entry == m_server_entry_map.end ())
    {
      // The entry may already have been removed (unregister, shutdown, or give-up).
      return;
    }

  entry->second.set_need_revive (true);

  const std::chrono::steady_clock::time_point now = std::chrono::steady_clock::now ();
  const std::chrono::steady_clock::time_point last_revived_time = entry->second.get_last_revived_time ();
  const auto timediff = std::chrono::duration_cast<std::chrono::seconds> (now - last_revived_time).count ();
  // New entries are registered with a default-constructed time_point, meaning "never revived".
  const bool revived_before = last_revived_time != std::chrono::steady_clock::time_point ();

  // If the server is abnormally terminated and revived within a short period of time, it is considered as a repeated failure.
  // For HA server, heartbeat handle this case as demoting the server from master to slave and keep trying to revive the server.
  // However, in this case, the server_monitor will not try to revive the server due to following reasons.
  // 1. preventing repeated creation of core files.
  // 2. The service cannot be recovered even if revived if the server abnormally terminates again within a short time.
  // TODO: Consider retry count for repeated failure case, and give up reviving the server after several retries.
  // TODO: The timediff value continues to increase if REVIVE_SERVER handling is repeated. Thus, the if condition will always be
  // true after the first evaluation. Therefore, evaluating the timediff only once when producing the REVIVE_SERVER job is needed.
  // (Currently, it is impossible since last_revived_time is stored in server_entry, which is not synchronized structure between monitor and main thread.)
  if (!revived_before || timediff > SERVER_MONITOR_UNACCEPTABLE_REVIVE_TIMEDIFF_IN_SECS)
    {
      const int out_pid = try_revive_server (entry->second.get_exec_path(), entry->second.get_argv());
      if (out_pid == -1)
	{
	  er_log_debug (ARG_FILE_LINE,
			"[Server Monitor] [%s] Failed to fork server process. Server monitor try to revive server again.",
			entry->first.c_str());
	  produce_job_internal (job_type::REVIVE_SERVER, -1, "", "", entry->first);
	}
      else
	{
	  entry->second.set_pid (out_pid);
	  er_log_debug (ARG_FILE_LINE,
			"[Server Monitor] [%s] Server monitor is waiting for server to be registered. (pid : %d)",
			entry->first.c_str(), entry->second.get_pid());
	  produce_job_internal (job_type::CONFIRM_REVIVE_SERVER, -1, "", "",
				entry->first);
	}
      return;
    }

  er_log_debug (ARG_FILE_LINE,
		"[Server Monitor] [%s] Server process failures occurred again within a short period of time(%d sec). It will no longer be revived automatically. (pid : %d)",
		entry->first.c_str(), SERVER_MONITOR_UNACCEPTABLE_REVIVE_TIMEDIFF_IN_SECS, entry->second.get_pid());
  m_server_entry_map.erase (entry);
}
/*
 * check_server_revived () - confirm that a revived server came back.
 *
 * Probes the recorded pid with kill (pid, 0):
 *   - ESRCH        : the process is gone; queue REVIVE_SERVER again.
 *   - other error  : give up; SIGKILL the pid and drop the entry.
 *   - still pending: the server has not registered yet (need_revive is set);
 *                    wait one second and queue another CONFIRM_REVIVE_SERVER.
 *   - otherwise    : registration cleared need_revive; log success.
 * Does nothing when no entry exists for server_name.
 */
void
server_monitor::check_server_revived (const std::string &server_name)
{
  const auto entry = m_server_entry_map.find (server_name);
  if (entry == m_server_entry_map.end ())
    {
      return;
    }

  const int pid = entry->second.get_pid ();

  // Signal 0 delivers nothing; it only checks whether the pid can be signalled.
  if (kill (pid, 0) != 0)
    {
      const int kill_errno = errno;
      if (kill_errno != ESRCH)
	{
	  er_log_debug (ARG_FILE_LINE,
			"[Server Monitor] [%s] Failed to revive server due to unknown error from kill() function. It will no longer be revived automatically. (pid : %d)",
			entry->first.c_str(), pid);
	  kill (pid, SIGKILL);
	  m_server_entry_map.erase (entry);
	  return;
	}

      er_log_debug (ARG_FILE_LINE,
		    "[Server Monitor] [%s] Revived server process can not be found. Server monitor will try to revive server again. (pid : %d)",
		    entry->first.c_str(), pid);
      produce_job_internal (job_type::REVIVE_SERVER, -1, "", "", entry->first);
      return;
    }

  if (entry->second.get_need_revive ())
    {
      // Server revive confirm interval is set to be 1 second to avoid busy waiting.
      constexpr int SERVER_MONITOR_CONFIRM_REVIVE_INTERVAL_IN_SECS = 1;
      std::this_thread::sleep_for (std::chrono::seconds (SERVER_MONITOR_CONFIRM_REVIVE_INTERVAL_IN_SECS));
      produce_job_internal (job_type::CONFIRM_REVIVE_SERVER, -1, "", "",
			    entry->first);
      return;
    }

  er_log_debug (ARG_FILE_LINE, "[Server Monitor] [%s] Server revive success. (pid : %d)",
		entry->first.c_str(),
		pid);
}
/*
 * try_revive_server () - fork a child process and exec the server binary in it.
 *
 * exec_path : path of the executable to run
 * argv      : nullptr-terminated argument vector; when null or empty,
 *             { exec_path, nullptr } is used instead
 *
 * return    : pid of the child in the parent, or -1 when fork () fails.
 *             The child never returns from this function.
 */
int
server_monitor::try_revive_server (const std::string &exec_path, char *const *argv)
{
  // Exit status of the child when execv () fails (conventional "could not execute" status).
  constexpr int EXEC_FAILURE_EXIT_STATUS = 127;

  // Build the fallback argument vector before fork () so the child only calls
  // async-signal-safe functions (execv, _exit) between fork and exec.
  char *const fallback_argv[] = { const_cast<char *> (exec_path.c_str ()), nullptr };
  char *const *exec_argv = (argv != nullptr && argv[0] != nullptr) ? argv : fallback_argv;

  const pid_t pid = fork ();
  if (pid < 0)
    {
      return -1;
    }

  if (pid == 0)
    {
      execv (exec_path.c_str (), exec_argv);
      // execv () only returns on failure. The child is a copy of the master;
      // returning to the caller would make it treat the failure as a fork
      // failure, re-queue REVIVE_SERVER and keep running the monitor loop in a
      // duplicated process. Terminate immediately, without running atexit
      // handlers or flushing inherited stdio buffers.
      _exit (EXEC_FAILURE_EXIT_STATUS);
    }

  return pid;
}
/*
 * shutdown_server () - stop monitoring a server that is being shut down.
 *
 * If the entry is still waiting for revival (need_revive set), only a log is
 * written. Otherwise the server process is expected to be up, so a shutdown
 * is requested through css_process_start_shutdown_by_name (). In both cases
 * the entry is removed, which makes pending revive/confirm jobs for this
 * server no-ops. Does nothing when no entry exists for server_name.
 */
void
server_monitor::shutdown_server (const std::string &server_name)
{
  auto entry = m_server_entry_map.find (server_name);
  if (entry == m_server_entry_map.end ())
    {
      return;
    }

  if (entry->second.get_need_revive ())
    {
      er_log_debug (ARG_FILE_LINE,
		    "[Server Monitor] [%s] Server is shutdown. Reviving the server will not be tried.",
		    server_name.c_str());
    }
  else
    {
      // const_cast: the shutdown API takes a non-const char * (legacy signature).
      css_process_start_shutdown_by_name (const_cast<char *> (server_name.c_str()));
      er_log_debug (ARG_FILE_LINE,
		    "[Server Monitor] [%s] Server is already revived. Server monitor will terminate the server. (pid : %d)",
		    server_name.c_str(), entry->second.get_pid());
    }

  m_server_entry_map.erase (entry);
}
/*
 * produce_job_internal () - enqueue a job under the monitor mutex.
 *
 * Does not notify the consumer. It is used by job handlers running on the
 * monitoring thread itself, which re-checks the queue before waiting again;
 * produce_job () adds the notification for external producers.
 */
void
server_monitor::produce_job_internal (job_type job_type, int pid, const std::string &exec_path,
				      const std::string &args, const std::string &server_name)
{
  std::unique_lock<std::mutex> queue_lock (m_server_monitor_mutex);
  m_job_queue.emplace (job_type, pid, exec_path, args, server_name);
}
/*
 * produce_job () - enqueue a job and wake up the monitoring thread.
 */
void
server_monitor::produce_job (job_type job_type, int pid, const std::string &exec_path,
			     const std::string &args, const std::string &server_name)
{
  // Enqueue first (under the mutex), then signal the waiting consumer.
  produce_job_internal (job_type, pid, exec_path, args, server_name);

  m_monitor_cv_consumer.notify_all ();
}
/*
 * consume_job () - move the oldest queued job into consume_job and pop it.
 *
 * Expects m_server_monitor_mutex to be held and the queue to be non-empty,
 * as server_monitor_thread_worker () guarantees before calling it.
 */
void
server_monitor::consume_job (job &consume_job)
{
  job &front_job = m_job_queue.front ();
  consume_job = std::move (front_job);

  m_job_queue.pop ();
}
/*
 * process_job () - dispatch a dequeued job to its handler by job type.
 */
void
server_monitor::process_job (job &consume_job)
{
  const job_type type = consume_job.get_job_type ();

  switch (type)
    {
    case job_type::REGISTER_SERVER:
      register_server_entry (consume_job.get_pid (), consume_job.get_exec_path (), consume_job.get_args (),
			     consume_job.get_server_name ());
      return;

    case job_type::UNREGISTER_SERVER:
      remove_server_entry (consume_job.get_server_name ());
      return;

    case job_type::REVIVE_SERVER:
      revive_server (consume_job.get_server_name ());
      return;

    case job_type::CONFIRM_REVIVE_SERVER:
      check_server_revived (consume_job.get_server_name ());
      return;

    case job_type::SHUTDOWN_SERVER:
      shutdown_server (consume_job.get_server_name ());
      return;

    case job_type::JOB_MAX:
    default:
      // JOB_MAX and unknown values are never valid job types.
      assert (false);
      return;
    }
}
/*
 * job () - capture all fields of a monitor job by value.
 */
server_monitor::job::job (job_type job_type, int pid, const std::string &exec_path, const std::string &args,
			  const std::string &server_name)
  : m_job_type (job_type),
    m_pid (pid),
    m_exec_path (exec_path),
    m_args (args),
    m_server_name (server_name)
{
}
server_monitor::job_type
server_monitor::job::get_job_type () const
{
return m_job_type;
}
int
server_monitor::job::get_pid () const
{
return m_pid;
}
std::string
server_monitor::job::get_exec_path () const
{
return m_exec_path;
}
std::string
server_monitor::job::get_args () const
{
return m_args;
}
std::string
server_monitor::job::get_server_name () const
{
return m_server_name;
}
/*
 * server_entry () - create a monitored-server record.
 *
 * pid         : server process id
 * exec_path   : executable used when reviving the server
 * args        : space-separated argument string; when empty, no argument
 *               vector is built here
 * revive_time : initial "last revived" timestamp
 *
 * The entry starts with need_revive cleared.
 */
server_monitor::server_entry::server_entry (int pid, const std::string &exec_path, const std::string &args,
    std::chrono::steady_clock::time_point revive_time)
  : m_pid (pid),
    m_exec_path (exec_path),
    m_need_revive (false),
    m_last_revived_time (revive_time)
{
  if (!args.empty ())
    {
      proc_make_arg (args);
    }
}
/*
 * ~server_entry () - free the argument strings allocated by proc_make_arg ().
 *
 * Each element up to the terminating nullptr was allocated with new[]; the
 * pointer array itself is owned and released by m_argv.
 */
server_monitor::server_entry::~server_entry ()
{
  if (!m_argv)
    {
      return;
    }

  for (auto cursor = m_argv.get (); *cursor != nullptr; ++cursor)
    {
      delete[] *cursor;
    }
  m_argv.reset ();
}
int
server_monitor::server_entry::get_pid () const
{
return m_pid;
}
std::string
server_monitor::server_entry::get_exec_path () const
{
return m_exec_path;
}
char *const *
server_monitor::server_entry::get_argv () const
{
return m_argv.get ();
}
bool
server_monitor::server_entry::get_need_revive () const
{
return m_need_revive;
}
std::chrono::steady_clock::time_point
server_monitor::server_entry::get_last_revived_time () const
{
return m_last_revived_time;
}
void
server_monitor::server_entry::set_pid (int pid)
{
m_pid = pid;
}
void
server_monitor::server_entry::set_exec_path (const std::string &exec_path)
{
m_exec_path = exec_path;
}
void
server_monitor::server_entry::set_need_revive (bool need_revive)
{
m_need_revive = need_revive;
}
void
server_monitor::server_entry::set_last_revived_time (std::chrono::steady_clock::time_point revive_time)
{
m_last_revived_time = revive_time;
}
/*
 * proc_make_arg () - split a space-separated argument string into m_argv.
 *
 * m_argv (a std::unique_ptr<char *[]>) receives a nullptr-terminated,
 * execv ()-style argument vector. Each element is a new[]-allocated C string
 * that ~server_entry () releases. Empty tokens produced by leading or
 * repeated blanks are skipped, so the revived process does not receive
 * spurious "" arguments.
 */
void
server_monitor::server_entry::proc_make_arg (const std::string &args)
{
  // Every non-empty token holds at least one character, so there are at most
  // args.size () tokens; one extra slot holds the terminating nullptr.
  // make_unique<T[]> value-initializes, so all slots start as nullptr.
  m_argv = std::make_unique<char *[]> (args.size () + 1);

  std::istringstream iss (args);
  std::string arg;
  std::size_t i = 0;
  while (std::getline (iss, arg, ' '))
    {
      if (arg.empty ())
	{
	  continue;
	}
      m_argv[i] = new char[arg.size () + 1];
      std::copy (arg.begin (), arg.end (), m_argv[i]);
      m_argv[i][arg.size ()] = '\0';
      i++;
    }
  m_argv[i] = nullptr;
}