File heartbeat.c¶
File List > connection > heartbeat.c
Go to the documentation of this file
/*
* Copyright 2008 Search Solution Corporation
* Copyright 2016 CUBRID Corporation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
/*
* heartbeat.c - heartbeat resource process common
*/
#ident "$Id$"
#include "config.h"
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <sys/types.h>
#include <assert.h>
#include <signal.h>
#if !defined(SERVER_MODE)
#include <atomic>
#include <mutex>
#endif
#if defined(WINDOWS)
#include <winsock2.h>
#include <windows.h>
#else /* WINDOWS */
#include <fcntl.h>
#include <sys/time.h>
#include <sys/ioctl.h>
#include <sys/uio.h>
#include <sys/socket.h>
#include <netinet/in.h>
#endif /* WINDOWS */
#if defined(_AIX)
#include <sys/select.h>
#endif /* _AIX */
#if defined(SOLARIS)
#include <sys/filio.h>
#include <netdb.h>
#endif /* SOLARIS */
#if defined(SOLARIS) || defined(LINUX)
#include <unistd.h>
#include <pthread.h>
#endif /* SOLARIS || LINUX */
#include "environment_variable.h"
#include "error_context.hpp"
#include "porting.h"
#include "system_parameter.h"
#include "error_manager.h"
#include "connection_defs.h"
#if defined(CS_MODE)
#include "client_support.h"
#else
#include "connection_support.hpp"
#endif
#if defined(WINDOWS)
#include "wintcp.h"
#else /* WINDOWS */
#include "tcp.h"
#endif /* WINDOWS */
#include "release_string.h"
#include "heartbeat.h"
#if defined(CS_MODE)
static THREAD_RET_T THREAD_CALLING_CONVENTION hb_thread_master_reader (void *arg);
static char *hb_pack_server_name (const char *server_name, int *name_length, const char *log_path, HB_PROC_TYPE type);
static CSS_CONN_ENTRY *hb_connect_to_master (const char *server_name, const char *log_path, HB_PROC_TYPE type);
static int hb_create_master_reader (void);
static int hb_process_master_request_info (CSS_CONN_ENTRY * conn);
static const char *hb_type_to_str (HB_PROC_TYPE type);
static int hb_process_master_request (void);
static CSS_CONN_ENTRY *hb_Conn = NULL;
#endif //#if defined(CS_MODE)
static char hb_Exec_path[PATH_MAX];
static char **hb_Argv;
bool hb_Proc_shutdown = false;
SOCKET hb_Pipe_to_master = INVALID_SOCKET;
/*
* hb_process_type_string () -
* return: process type string
*
* ptype(in):
*/
const char *
hb_process_type_string (int ptype)
{
switch (ptype)
{
case HB_PTYPE_SERVER:
return HB_PTYPE_SERVER_STR;
case HB_PTYPE_COPYLOGDB:
return HB_PTYPE_COPYLOGDB_STR;
case HB_PTYPE_APPLYLOGDB:
return HB_PTYPE_APPLYLOGDB_STR;
}
return "invalid";
}
/*
* hb_set_exec_path () -
* return: none
*
* exec_path(in):
*/
void
hb_set_exec_path (char *exec_path)
{
strncpy (hb_Exec_path, exec_path, sizeof (hb_Exec_path) - 1);
}
/*
* hb_set_argv () -
* return: none
*
* argv(in):
*/
void
hb_set_argv (char **argv)
{
hb_Argv = argv;
}
/*
* css_send_heartbeat_request () -
* return:
*
* conn(in):
* command(in):
*/
int
css_send_heartbeat_request (CSS_CONN_ENTRY * conn, int command)
{
int nbytes;
int request;
request = htonl (command);
if (conn && !IS_INVALID_SOCKET (conn->fd))
{
nbytes = send (conn->fd, (char *) &request, sizeof (int), 0);
if (nbytes == sizeof (int))
{
return (NO_ERRORS);
}
return (ERROR_ON_WRITE);
}
return CONNECTION_CLOSED;
}
/*
* css_send_heartbeat_data () -
* return:
*
* conn(in):
* data(in):
* size(in):
*/
int
css_send_heartbeat_data (CSS_CONN_ENTRY * conn, const char *data, int size)
{
int nbytes;
if (conn && !IS_INVALID_SOCKET (conn->fd))
{
nbytes = send (conn->fd, (char *) data, size, 0);
if (nbytes == size)
{
return (NO_ERRORS);
}
return (ERROR_ON_WRITE);
}
return CONNECTION_CLOSED;
}
/*
* css_receive_heartbeat_request () -
* return:
*
* conn(in):
* command(in):
*/
int
css_receive_heartbeat_request (CSS_CONN_ENTRY * conn, int *command)
{
int nbytes;
int request;
int size = sizeof (request);
if (conn && !IS_INVALID_SOCKET (conn->fd))
{
nbytes = css_readn (conn->fd, (char *) &request, size, -1);
if (nbytes == size)
{
*command = ntohl (request);
return NO_ERRORS;
}
return ERROR_ON_READ;
}
return CONNECTION_CLOSED;
}
/*
* css_receive_heartbeat_data () -
* return:
*
* conn(in):
* data(in):
* size(in):
*/
int
css_receive_heartbeat_data (CSS_CONN_ENTRY * conn, char *data, int size)
{
int nbytes;
if (conn && !IS_INVALID_SOCKET (conn->fd))
{
nbytes = css_readn (conn->fd, data, size, -1);
if (nbytes == size)
{
return NO_ERRORS;
}
return ERROR_ON_READ;
}
return CONNECTION_CLOSED;
}
/*
* hb_make_set_hbp_register () -
* return:
*
* type(in):
*/
HBP_PROC_REGISTER *
hb_make_set_hbp_register (int type)
{
HBP_PROC_REGISTER *hbp_register;
char *p, *last;
char **argv;
hbp_register = (HBP_PROC_REGISTER *) malloc (sizeof (HBP_PROC_REGISTER));
if (NULL == hbp_register)
{
er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_OUT_OF_VIRTUAL_MEMORY, 1, sizeof (HBP_PROC_REGISTER));
return (NULL);
}
memset ((void *) hbp_register, 0, sizeof (HBP_PROC_REGISTER));
hbp_register->pid = htonl (getpid ());
hbp_register->type = htonl (type);
strncpy_bufsize (hbp_register->exec_path, hb_Exec_path);
p = (char *) &hbp_register->args[0];
last = (char *) (p + sizeof (hbp_register->args));
for (argv = hb_Argv; *argv; argv++)
{
p += snprintf (p, MAX ((last - p), 0), "%s ", *argv);
}
return (hbp_register);
}
/*
* hb_register_to_master () -
* return: NO_ERROR or ER_FAILED
*
* conn(in):
* type(in):
*/
int
hb_register_to_master (CSS_CONN_ENTRY * conn, int type)
{
int error;
HBP_PROC_REGISTER *hbp_register = NULL;
if (NULL == conn)
{
er_log_debug (ARG_FILE_LINE, "invalid conn. (conn:NULL).\n");
return (ER_FAILED);
}
hbp_register = hb_make_set_hbp_register (type);
if (NULL == hbp_register)
{
er_log_debug (ARG_FILE_LINE, "hbp_register failed. \n");
return (ER_FAILED);
}
if (!IS_INVALID_SOCKET (conn->fd))
{
error = css_send_heartbeat_request (conn, SERVER_REGISTER_HA_PROCESS);
if (error != NO_ERRORS)
{
goto error_return;
}
error = css_send_heartbeat_data (conn, (const char *) hbp_register, sizeof (*hbp_register));
if (error != NO_ERRORS)
{
goto error_return;
}
}
free_and_init (hbp_register);
return (NO_ERROR);
error_return:
free_and_init (hbp_register);
return (ER_FAILED);
}
#if defined(CS_MODE)
/*
* hb_thread_master_reader () -
* return: none
*
* arg(in):
*/
static THREAD_RET_T THREAD_CALLING_CONVENTION
hb_thread_master_reader (void *arg)
{
#if !defined(WINDOWS)
int error;
/* *INDENT-OFF* */
cuberr::context er_context (true);
/* *INDENT-ON* */
error = hb_process_master_request ();
if (error != NO_ERROR)
{
hb_process_term ();
/* wait 1 sec */
sleep (1);
/* is it ok? */
kill (getpid (), SIGTERM);
}
#endif
return (THREAD_RET_T) 0;
}
/*
* hb_deregister_from_master () -
* return: NO_ERROR or ER_FAILED
*
*/
int
hb_deregister_from_master (void)
{
int css_error;
int pid;
if (hb_Conn == NULL || IS_INVALID_SOCKET (hb_Conn->fd))
{
return ER_FAILED;
}
css_error = css_send_heartbeat_request (hb_Conn, SERVER_DEREGISTER_HA_PROCESS);
if (css_error != NO_ERRORS)
{
return ER_FAILED;
}
pid = htonl (getpid ());
css_error = css_send_heartbeat_data (hb_Conn, (char *) &pid, sizeof (pid));
if (css_error != NO_ERRORS)
{
return ER_FAILED;
}
return NO_ERROR;
}
/*
* hb_process_master_request_info () -
* return: NO_ERROR or ER_FAILED
*
* conn(in):
*/
static int
hb_process_master_request_info (CSS_CONN_ENTRY * conn)
{
int rc;
int command;
if (NULL == conn)
{
er_log_debug (ARG_FILE_LINE, "invalid conn. (conn:NULL).\n");
return (ER_FAILED);
}
rc = css_receive_heartbeat_request (conn, &command);
if (rc == NO_ERRORS)
{
/* Ignore request, just check connection is alive or not */
return (NO_ERROR);
}
return (ER_FAILED);
}
static const char *
hb_type_to_str (HB_PROC_TYPE type)
{
if (type == HB_PTYPE_COPYLOGDB)
{
return "copylogdb";
}
else if (type == HB_PTYPE_APPLYLOGDB)
{
return "applylogdb";
}
else
{
return "";
}
}
/*
* hb_process_to_master () -
* return: NO_ERROR or ER_FAILED
*
* argv(in):
*/
int
hb_process_master_request (void)
{
int error;
int r, status = 0;
struct pollfd po[1] = { {0, 0, 0} };
if (NULL == hb_Conn)
{
er_log_debug (ARG_FILE_LINE, "hb_Conn did not allocated yet. \n");
return (ER_FAILED);
}
while (false == hb_Proc_shutdown)
{
po[0].fd = hb_Conn->fd;
po[0].events = POLLIN;
r = css_platform_independent_poll (po, 1, (prm_get_integer_value (PRM_ID_TCP_CONNECTION_TIMEOUT) * 1000));
switch (r)
{
case 0:
break;
case -1:
if (!IS_INVALID_SOCKET (hb_Conn->fd)
#if defined(WINDOWS)
&& ioctlsocket (hb_Conn->fd, FIONREAD, (u_long *) (&status)) == SOCKET_ERROR
#else /* WINDOWS */
&& fcntl (hb_Conn->fd, F_GETFL, status) < 0
#endif /* WINDOWS */
)
hb_Proc_shutdown = true;
break;
default:
error = hb_process_master_request_info (hb_Conn);
if (NO_ERROR != error)
{
hb_Proc_shutdown = true;
}
break;
}
}
return (ER_FAILED);
}
/*
* hb_pack_server_name() - make a "server_name" string
* return: packed name
* server_name(in) : server name
* name_length(out) : length of packed name
* log_path(in) : log path
* copylogdbyn(in) : true if copylogdb
*
* Note:
* make a "server_name" string to connect to master
* server_name = server_type ( # ) +
* server_name +
* release_string +
* env_name + ($CUBRID path)
* pid_string (process id)
*/
static char *
hb_pack_server_name (const char *server_name, int *name_length, const char *log_path, HB_PROC_TYPE type)
{
char *packed_name = NULL;
const char *env_name = NULL;
char pid_string[16];
int n_len, l_len, r_len, e_len, p_len;
if (server_name != NULL)
{
env_name = envvar_root ();
if (env_name == NULL)
{
return NULL;
}
/* here we changed the 2nd string in packed_name from rel_release_string() to rel_major_release_string() solely
* for the purpose of matching the name of the CUBRID driver. */
snprintf (pid_string, sizeof (pid_string), "%d", getpid ());
n_len = (int) strlen (server_name) + 1;
l_len = (log_path) ? (int) strlen (log_path) + 1 : 0;
r_len = (int) strlen (rel_major_release_string ()) + 1;
e_len = (int) strlen (env_name) + 1;
p_len = (int) strlen (pid_string) + 1;
*name_length = n_len + l_len + r_len + e_len + p_len + 5;
packed_name = (char *) malloc (*name_length);
if (packed_name == NULL)
{
er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_OUT_OF_VIRTUAL_MEMORY, 1, (*name_length));
return NULL;
}
if (type == HB_PTYPE_COPYLOGDB)
{
packed_name[0] = '$';
}
else if (type == HB_PTYPE_APPLYLOGDB)
{
packed_name[0] = '%';
}
else
{
assert (0);
free_and_init (packed_name);
return NULL;
}
memcpy (packed_name + 1, server_name, n_len);
if (l_len)
{
packed_name[(1 + n_len) - 1] = ':';
memcpy (packed_name + 1 + n_len, log_path, l_len);
}
memcpy (packed_name + 1 + n_len + l_len, rel_major_release_string (), r_len);
memcpy (packed_name + 1 + n_len + l_len + r_len, env_name, e_len);
memcpy (packed_name + 1 + n_len + l_len + r_len + e_len, pid_string, p_len);
}
return (packed_name);
}
/*
* hb_connect_to_master() - connect to the master server
* return: conn
* server_name(in): server name
* log_path(in): log path
* copylogdbyn(in):
*/
static CSS_CONN_ENTRY *
hb_connect_to_master (const char *server_name, const char *log_path, HB_PROC_TYPE type)
{
CSS_CONN_ENTRY *conn;
char *packed_name;
int name_length = 0;
packed_name = hb_pack_server_name (server_name, &name_length, log_path, type);
if (packed_name == NULL)
{
return NULL;
}
conn = __gv_cvar.css_connect_to_master_server (prm_get_master_port_id (), packed_name, name_length);
if (conn == NULL)
{
free_and_init (packed_name);
return NULL;
}
hb_Pipe_to_master = conn->fd;
free_and_init (packed_name);
return conn;
}
/*
* hb_create_master_reader () -
* return: NO_ERROR or ER_FAILED
*
* conn(in):
*/
static int
hb_create_master_reader (void)
{
#if !defined (WINDOWS)
int rv;
pthread_attr_t thread_attr;
size_t ts_size;
pthread_t master_reader_th;
rv = pthread_attr_init (&thread_attr);
if (rv != 0)
{
er_set_with_oserror (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_CSS_PTHREAD_ATTR_INIT, 0);
return ER_CSS_PTHREAD_ATTR_INIT;
}
rv = pthread_attr_setdetachstate (&thread_attr, PTHREAD_CREATE_DETACHED);
if (rv != 0)
{
er_set_with_oserror (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_CSS_PTHREAD_ATTR_SETDETACHSTATE, 0);
return ER_CSS_PTHREAD_ATTR_SETDETACHSTATE;
}
#if defined(AIX)
/* AIX's pthread is slightly different from other systems. Its performance highly depends on the pthread's scope and
* it's related kernel parameters. */
rv =
pthread_attr_setscope (&thread_attr,
prm_get_bool_value (PRM_ID_PTHREAD_SCOPE_PROCESS) ? PTHREAD_SCOPE_PROCESS :
PTHREAD_SCOPE_SYSTEM);
#else /* AIX */
rv = pthread_attr_setscope (&thread_attr, PTHREAD_SCOPE_SYSTEM);
#endif /* AIX */
if (rv != 0)
{
er_set_with_oserror (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_CSS_PTHREAD_ATTR_SETSCOPE, 0);
return ER_CSS_PTHREAD_ATTR_SETSCOPE;
}
#if defined(_POSIX_THREAD_ATTR_STACKSIZE)
rv = pthread_attr_getstacksize (&thread_attr, &ts_size);
if (ts_size != (size_t) prm_get_bigint_value (PRM_ID_THREAD_STACKSIZE))
{
rv = pthread_attr_setstacksize (&thread_attr, prm_get_bigint_value (PRM_ID_THREAD_STACKSIZE));
if (rv != 0)
{
er_set_with_oserror (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_CSS_PTHREAD_ATTR_SETSTACKSIZE, 0);
return ER_CSS_PTHREAD_ATTR_SETSTACKSIZE;
}
}
#endif /* _POSIX_THREAD_ATTR_STACKSIZE */
rv = pthread_create (&master_reader_th, &thread_attr, hb_thread_master_reader, (void *) NULL);
if (rv != 0)
{
er_set_with_oserror (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_CSS_PTHREAD_CREATE, 0);
return ER_CSS_PTHREAD_CREATE;
}
return (NO_ERROR);
#else
return ER_FAILED;
#endif
}
/*
* hb_process_init () -
* return: NO_ERROR or ER_FAILED
*
* server_name(in):
* log_path(in):
* copylogdbyn(in):
*/
int
hb_process_init (const char *server_name, const char *log_path, HB_PROC_TYPE type)
{
#if !defined(SERVER_MODE)
// *INDENT-OFF*
static std::atomic <bool> is_first{true};
static std::mutex init_mtx;
int error = NO_ERROR;
if (is_first.load (std::memory_order_acquire) == false)
{
return (NO_ERROR);
}
std::lock_guard <std::mutex> lock (init_mtx);
// *INDENT-ON*
if (is_first.load (std::memory_order_relaxed) == false)
{
return (NO_ERROR);
}
er_log_debug (ARG_FILE_LINE, "hb_process_init. (type:%s). \n", hb_type_to_str (type));
if (hb_Exec_path[0] == '\0' || *(hb_Argv) == 0)
{
er_log_debug (ARG_FILE_LINE, "hb_Exec_path or hb_Argv is not set. \n");
return (ER_FAILED);
}
hb_Conn = hb_connect_to_master (server_name, log_path, type);
/* wait 1 sec */
sleep (1);
error = hb_register_to_master (hb_Conn, type);
if (NO_ERROR != error)
{
er_log_debug (ARG_FILE_LINE, "hb_register_to_master failed. \n");
return (error);
}
error = hb_create_master_reader ();
if (NO_ERROR != error)
{
er_log_debug (ARG_FILE_LINE, "hb_create_master_reader failed. \n");
return (error);
}
is_first.store (false, std::memory_order_release);
return (NO_ERROR);
#else
return (ER_FAILED);
#endif
}
/*
* hb_process_term () -
* return: none
*
* type(in):
*/
void
hb_process_term (void)
{
if (hb_Conn)
{
css_shutdown_conn (hb_Conn);
hb_Conn = NULL;
}
hb_Proc_shutdown = true;
}
#endif // #if defined(CS_MODE)
/*
* hb_node_state_string -
* return: node state sring
*
* nstate(in):
*/
const char *
hb_node_state_string (HB_NODE_STATE_TYPE nstate)
{
switch (nstate)
{
case HB_NSTATE_UNKNOWN:
return HB_NSTATE_UNKNOWN_STR;
case HB_NSTATE_SLAVE:
return HB_NSTATE_SLAVE_STR;
case HB_NSTATE_TO_BE_MASTER:
return HB_NSTATE_TO_BE_MASTER_STR;
case HB_NSTATE_TO_BE_SLAVE:
return HB_NSTATE_TO_BE_SLAVE_STR;
case HB_NSTATE_MASTER:
return HB_NSTATE_MASTER_STR;
case HB_NSTATE_REPLICA:
return HB_NSTATE_REPLICA_STR;
default:
return "invalid";
}
}