/* server_support.c (doxygen navigation header removed) */
/*
* Copyright 2008 Search Solution Corporation
* Copyright 2016 CUBRID Corporation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
/*
* server_support.c - server interface
*/
#ident "$Id$"
#include "server_support.h"
#include "config.h"
#include "load_worker_manager.hpp"
#include "log_append.hpp"
#include "session.h"
#include "thread_entry_task.hpp"
#include "thread_entry.hpp"
#include "thread_manager.hpp"
#include "master_connector.hpp"
#include "connection_pool.hpp"
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#if !defined(WINDOWS)
#include <signal.h>
#include <unistd.h>
#if defined(SOLARIS)
#include <sys/filio.h>
#endif /* SOLARIS */
#include <sys/socket.h>
#include <fcntl.h>
#include <netinet/in.h>
#endif /* !WINDOWS */
#include <assert.h>
#include "porting.h"
#include "memory_alloc.h"
#include "boot_sr.h"
#include "connection_defs.h"
#include "connection_globals.h"
#include "release_string.h"
#include "system_parameter.h"
#include "environment_variable.h"
#include "error_manager.h"
#include "connection_error.h"
#include "message_catalog.h"
#include "critical_section.h"
#include "lock_manager.h"
#include "log_lsa.hpp"
#include "log_manager.h"
#include "network.h"
#include "object_representation.h"
#include "pl_sr.h"
#include "show_scan.h"
#if defined(WINDOWS)
#include "wintcp.h"
#else /* WINDOWS */
#include "tcp.h"
#endif /* WINDOWS */
#include "connection_sr.h"
#include "xserver_interface.h"
#include "utility.h"
#include "vacuum.h"
#if !defined(WINDOWS)
#include "heartbeat.h"
#endif
#include "dbtype.h"
// XXX: SHOULD BE THE LAST INCLUDE HEADER
#include "memory_wrapper.hpp"
#if !defined (SERVER_MODE)
#error Belongs to server module
#endif /* !defined (SERVER_MODE) */
#define CSS_WAIT_COUNT 5 /* # of retry to connect to master */
#define CSS_GOING_DOWN_IMMEDIATELY "Server going down immediately"

/* platform-neutral alias for the socket-call error return value */
#if defined(WINDOWS)
#define SockError SOCKET_ERROR
#else /* WINDOWS */
#define SockError -1
#endif /* WINDOWS */

#define RMUTEX_NAME_TEMP_CONN_ENTRY "TEMP_CONN_ENTRY"

/* set once css_start_shutdown_server () has been called */
static bool css_Server_shutdown_inited = false;
/* absolute deadline for shutdown; {0,0} means no shutdown was requested */
static struct timeval css_Shutdown_timeout = { 0, 0 };

static char *css_Master_server_name = NULL;	/* database identifier */
static int css_Master_port_id;
static CSS_CONN_ENTRY *css_Master_conn;
static IP_INFO *css_Server_accessible_ip_info;
static char *ip_list_file_name = NULL;
static char ip_file_real_path[PATH_MAX];

/* internal request handler function; installed by css_initialize_server_interfaces () */
static int (*css_Server_request_handler) (THREAD_ENTRY *, unsigned int, int, int, char *);

/* server's state for HA feature */
static HA_SERVER_STATE ha_Server_state = HA_SERVER_STATE_IDLE;
static bool ha_Repl_delay_detected = false;
static int ha_Server_num_of_hosts = 0;

/* maximum number of HA log applier clients tracked simultaneously */
#define HA_LOG_APPLIER_STATE_TABLE_MAX 5
typedef struct ha_log_applier_state_table HA_LOG_APPLIER_STATE_TABLE;
/* one tracked log applier client; client_id == -1 marks a free slot */
struct ha_log_applier_state_table
{
  int client_id;
  HA_LOG_APPLIER_STATE state;
};

static HA_LOG_APPLIER_STATE_TABLE ha_Log_applier_state[HA_LOG_APPLIER_STATE_TABLE_MAX] = {
  {-1, HA_LOG_APPLIER_STATE_NA},
  {-1, HA_LOG_APPLIER_STATE_NA},
  {-1, HA_LOG_APPLIER_STATE_NA},
  {-1, HA_LOG_APPLIER_STATE_NA},
  {-1, HA_LOG_APPLIER_STATE_NA}
};
/* number of slots currently occupied in ha_Log_applier_state */
static int ha_Log_applier_state_num = 0;
// *INDENT-OFF*
/* worker pool servicing client requests; created and destroyed in css_init () */
static cubthread::stats_worker_pool_type *css_Server_request_worker_pool = NULL;

// css_server_task - entry task bound to one connection; when executed by a
// pool worker it services a client request arriving on that connection.
class css_server_task : public cubthread::entry_task
{
  public:
    css_server_task (void) = delete;
    css_server_task (CSS_CONN_ENTRY &conn)
      : m_conn (conn)
    {
    }

    void execute (context_type &thread_ref) override final;

    // retire not overwritten; task is automatically deleted

  private:
    CSS_CONN_ENTRY &m_conn;	// connection to service; must outlive this task
};
// css_server_external_task - class used for legacy design; external modules may push tasks on css worker pool and we
// need to make sure conn_entry is properly initialized.
//
// TODO: remove me
class css_server_external_task : public cubthread::entry_task
{
  public:
    css_server_external_task (void) = delete;
    css_server_external_task (CSS_CONN_ENTRY *conn, cubthread::entry_task *task)
      : m_conn (conn)
      , m_task (task)
    {
    }

    ~css_server_external_task (void)
    {
      // the wrapped task is always retired together with this wrapper
      m_task->retire ();
    }

    void execute (context_type &thread_ref) override final;

    // retire not overwritten; task is automatically deleted

  private:
    CSS_CONN_ENTRY *m_conn;		// connection installed on the executing thread
    cubthread::entry_task *m_task;	// wrapped external task; owned by this wrapper
};
/* number of columns produced per row by css_job_queues_start_scan () */
static const size_t CSS_JOB_QUEUE_SCAN_COLUMN_COUNT = 4;

/* forward declarations of file-local helpers */
static void css_set_shutdown_timeout (int timeout);
static int css_get_master_request (SOCKET master_fd);
static void css_process_shutdown_request (SOCKET master_fd);
static int css_internal_request_handler (THREAD_ENTRY & thread_ref, CSS_CONN_ENTRY & conn_ref);
static int css_test_for_client_errors (CSS_CONN_ENTRY * conn, unsigned int eid);
static unsigned int css_enqueue_and_notify (cubconn::connection::worker::queue_type type,
					    cubconn::connection::worker::message &&item, int wait_time = 0);
static bool css_check_ha_log_applier_done (void);
static bool css_check_ha_log_applier_working (void);

/* thread mappers used while stopping/inspecting workers */
static void css_stop_non_log_writer (THREAD_ENTRY & thread_ref, bool &, THREAD_ENTRY & stopper_thread_ref);
static void css_stop_log_writer (THREAD_ENTRY & thread_ref, bool &);
static void css_find_not_stopped (THREAD_ENTRY & thread_ref, bool & stop, bool is_log_writer, bool & found);
static bool css_is_log_writer (const THREAD_ENTRY & thread_arg);
static void css_stop_all_workers (THREAD_ENTRY & thread_ref, css_thread_stop_type stop_phase);
static void css_wp_worker_get_busy_count_mapper (THREAD_ENTRY & thread_ref, bool & stop_mapper, int &busy_count);
// cubthread::stats_worker_pool_type::core_impl confuses indent
static void css_wp_core_job_scan_mapper (const cubthread::stats_worker_pool_type::core_impl & wp_core, bool & stop_mapper,
					 THREAD_ENTRY * thread_p, SHOWSTMT_ARRAY_CONTEXT * ctx, size_t & core_index,
					 int & error_code);
static void
css_is_any_thread_not_suspended_mapfunc (THREAD_ENTRY & thread_ref, bool & stop_mapper, size_t & count, bool & found);
static void
css_count_transaction_worker_threads_mapfunc (THREAD_ENTRY & thread_ref, bool & stop_mapper,
					      THREAD_ENTRY * caller_thread, int tran_index, int client_id,
					      size_t & count);

/* HA state transitions and worker-pool configuration helpers */
static HA_SERVER_STATE css_transit_ha_server_state (THREAD_ENTRY * thread_p, HA_SERVER_STATE req_state);
static bool css_get_server_request_thread_pooling_configuration (void);
static cubthread::wait_seconds css_get_server_request_thread_timeout_configuration (void);
static void css_start_all_threads (void);
// *INDENT-ON*
#if defined (SERVER_MODE)
/*
 * css_job_queues_start_scan() - start scan function for 'SHOW JOB QUEUES'
 *   return: NO_ERROR, or ER_code
 *   thread_p(in): thread entry
 *   show_type(in): unused; required by the show-statement interface
 *   arg_values(in): unused
 *   arg_cnt(in): unused
 *   ptr(in/out): receives the allocated 'show job queues' context
 *
 * NOTE: job queues don't really exist anymore, at least not the way SHOW JOB QUEUES statement was created for.
 *       we now have worker pool "cores" that act as partitions of workers and queued tasks.
 *       for backward compatibility, the statement is not changed; only its columns are reinterpreted
 *         1. job queue index => core index
 *         2. job queue max workers => core max workers
 *         3. job queue busy workers => core busy workers
 *         4. job queue connection workers => 0 // connection workers are separated in a different worker pool
 */
int
css_job_queues_start_scan (THREAD_ENTRY * thread_p, int show_type, DB_VALUE ** arg_values, int arg_cnt, void **ptr)
{
  int err_code = NO_ERROR;

  *ptr = NULL;

  /* one output row per worker-pool core */
  SHOWSTMT_ARRAY_CONTEXT *scan_ctx =
    showstmt_alloc_array_context (thread_p, (int) css_Server_request_worker_pool->get_core_count (),
				  (int) CSS_JOB_QUEUE_SCAN_COLUMN_COUNT);
  if (scan_ctx == NULL)
    {
      ASSERT_ERROR_AND_SET (err_code);
      return err_code;
    }

  size_t core_idx = 0;		/* mapper fills rows starting with core 0 */
  css_Server_request_worker_pool->map_cores (css_wp_core_job_scan_mapper, thread_p, scan_ctx, core_idx, err_code);
  if (err_code == NO_ERROR)
    {
      *ptr = scan_ctx;
      return NO_ERROR;
    }

  ASSERT_ERROR ();
  showstmt_free_array_context (thread_p, scan_ctx);
  return err_code;
}
#endif // SERVER_MODE
/*
 * css_check_conn() - check whether a connection entry is still usable
 *   return: NO_ERROR if the socket is healthy and the connection is open,
 *           ER_FAILED otherwise
 *   p(in): connection entry to probe
 *
 * Probes the underlying socket with a cheap, non-destructive call
 * (ioctlsocket/FIONREAD on Windows, fcntl/F_GETFL elsewhere) and also
 * requires the logical status to be CONN_OPEN.
 */
int
css_check_conn (CSS_CONN_ENTRY * p)
{
#if defined(WINDOWS)
  u_long status = 0;

  if (ioctlsocket (p->fd, FIONREAD, &status) == SockError || p->status != CONN_OPEN)
    {
      return ER_FAILED;
    }
#else /* WINDOWS */
  /* F_GETFL takes no third argument; the previous code passed a dummy
   * variable that the kernel silently ignored. */
  if (fcntl (p->fd, F_GETFL) < 0 || p->status != CONN_OPEN)
    {
      return ER_FAILED;
    }
#endif /* WINDOWS */

  return NO_ERROR;
}
/*
 * css_set_shutdown_timeout () - arm the shutdown deadline
 *   return: none
 *   timeout(in): number of seconds from now
 *
 * Stores "now + timeout" into css_Shutdown_timeout; if the clock cannot be
 * read, the previous deadline is left untouched.
 */
static void
css_set_shutdown_timeout (int timeout)
{
  if (gettimeofday (&css_Shutdown_timeout, NULL) != 0)
    {
      /* clock unavailable; keep whatever deadline was set before */
      return;
    }
  css_Shutdown_timeout.tv_sec += timeout;
}
/*
 * css_get_master_request () - read one request code sent by the master
 *   return: the request code in host byte order, or -1 on a short/failed read
 *   master_fd(in): socket connected to the master process
 */
static int
css_get_master_request (SOCKET master_fd)
{
  int request;
  int nbytes = css_readn (master_fd, (char *) &request, sizeof (int), -1);

  if (nbytes != sizeof (int))
    {
      return -1;
    }
  return (int) ntohl (request);
}
/*
 * css_process_shutdown_request () - consume the shutdown message sent by the
 *                                   master process
 *   return: none
 *   master_fd(in): socket connected to the master
 *
 * Reads the shutdown timeout value and then the fixed-size message body.
 * The values are currently unused; the reads only drain the socket, and an
 * error is reported if the body cannot be read.
 */
static void
css_process_shutdown_request (SOCKET master_fd)
{
  char buffer[MASTER_TO_SRV_MSG_SIZE];
  int r, timeout;

  /* first int on the wire is the shutdown timeout (currently ignored) */
  timeout = (int) css_get_master_request (master_fd);

  r = css_readn (master_fd, buffer, MASTER_TO_SRV_MSG_SIZE, -1);
  if (r < 0)
    {
      er_set_with_oserror (ER_ERROR_SEVERITY, ARG_FILE_LINE, ERR_CSS_SHUTDOWN_ERROR, 0);
      return;
    }
}
/*
 * css_is_shutdown_timeout_expired () - has the armed shutdown deadline passed?
 *   return: true when a shutdown was requested and its deadline is in the past
 *
 * css_Shutdown_timeout is armed by css_set_shutdown_timeout (); tv_sec == 0
 * means no shutdown has been requested yet.
 */
bool
css_is_shutdown_timeout_expired (void)
{
  struct timeval now;

  if (css_Shutdown_timeout.tv_sec == 0)
    {
      /* no shutdown request armed */
      return false;
    }
  if (gettimeofday (&now, NULL) != 0)
    {
      /* cannot read the clock; treat the deadline as not yet expired */
      return false;
    }
  return css_Shutdown_timeout.tv_sec <= now.tv_sec;
}
/*
 * css_get_shutdown_timeout () - expose the shutdown deadline
 *   return: pointer to the file-scope timeval holding the absolute shutdown
 *           deadline ({0,0} when no shutdown has been requested)
 */
struct timeval *
css_get_shutdown_timeout (void)
{
  return &css_Shutdown_timeout;
}
/*
 * css_block_all_active_conn() - Before shutdown, stop all server thread
 *   return: none
 *   stop_phase(in): only connections registered with this stop phase are blocked
 *
 * Note: All communication will be stopped
 */
void
css_block_all_active_conn (unsigned short stop_phase)
{
  CSS_CONN_ENTRY *conn;
  int r;

  /* take exclusive access so no connection can be added/removed while iterating */
  START_EXCLUSIVE_ACCESS_ACTIVE_CONN_ANCHOR (r);

  for (conn = css_Active_conn_anchor; conn != NULL; conn = conn->next)
    {
      r = rmutex_lock (NULL, &conn->rmutex);
      assert (r == NO_ERROR);

      if (conn->stop_phase != stop_phase)
	{
	  /* belongs to a different shutdown phase; leave it running for now */
	  r = rmutex_unlock (NULL, &conn->rmutex);
	  assert (r == NO_ERROR);
	  continue;
	}

      css_end_server_request (conn);

      if (!IS_INVALID_SOCKET (conn->fd) && conn->fd != css_Pipe_to_master)
	{
	  /* silence further traffic on the socket and interrupt the transaction */
	  conn->stop_talk = true;
	  logtb_set_tran_index_interrupt (NULL, conn->get_tran_index (), 1);
	}

      r = rmutex_unlock (NULL, &conn->rmutex);
      assert (r == NO_ERROR);
    }

  END_EXCLUSIVE_ACCESS_ACTIVE_CONN_ANCHOR (r);
}
/*
 * css_internal_request_handler() - dispatch one client request on a connection
 *   return: CSS_NO_ERRORS when no request was actually pending,
 *           CSS_UNPLANNED_SHUTDOWN on receive failure, or the status
 *           returned by the registered request handler
 *   thread_ref(in): worker thread entry servicing this connection
 *   conn_ref(in): connection the request arrives on
 *
 * Note: This routine is "registered" to be called when a new request is
 *       initiated by the client.
 *
 *       To now support multiple concurrent requests from the same client,
 *       check if a request is actually sent on the socket. If data was sent
 *       (not a request), then just return and the scheduler will wake up the
 *       thread that is blocking for data.
 *
 *       The caller holds thread_ref.tran_index_lock; this function unlocks it
 *       on every path.
 */
static int
css_internal_request_handler (THREAD_ENTRY & thread_ref, CSS_CONN_ENTRY & conn_ref)
{
  unsigned short rid;
  unsigned int eid;
  int request, rc, size = 0;
  char *buffer = NULL;
  int local_tran_index;
  int status = CSS_UNPLANNED_SHUTDOWN;

  assert (thread_ref.conn_entry == &conn_ref);

  /* remember the thread's current transaction so it can be restored later */
  local_tran_index = thread_ref.tran_index;

  rc = css_receive_request (&conn_ref, &rid, &request, &size);
  if (rc == NO_ERRORS)
    {
      /* 1. change thread's transaction id to this connection's */
      thread_ref.tran_index = conn_ref.get_tran_index ();
      pthread_mutex_unlock (&thread_ref.tran_index_lock);

      if (size)
	{
	  rc = css_receive_data (&conn_ref, rid, &buffer, &size, -1);
	  if (rc != NO_ERRORS)
	    {
	      /* failed to read the request payload; report unplanned shutdown */
	      return status;
	    }
	}

      conn_ref.db_error = 0;	/* This will reset the error indicator */

      eid = css_return_eid_from_conn (&conn_ref, rid);
      /* 2. change thread's client, rid, tran_index for this request */
      css_set_thread_info (&thread_ref, conn_ref.client_id, eid, conn_ref.get_tran_index (), request);

      /* 3. Call server_request() function */
      /* NOTE(review): buffer ownership appears to pass to the handler — confirm
       * css_Server_request_handler releases it on every path */
      status = css_Server_request_handler (&thread_ref, eid, request, size, buffer);

      /* 4. reset thread transaction id(may be NULL_TRAN_INDEX) */
      css_set_thread_info (&thread_ref, -1, 0, local_tran_index, -1);
    }
  else
    {
      pthread_mutex_unlock (&thread_ref.tran_index_lock);

      if (rc == ERROR_WHEN_READING_SIZE || rc == NO_DATA_AVAILABLE)
	{
	  /* data (not a request) arrived, or a soft read error: nothing to do */
	  status = CSS_NO_ERRORS;
	}
    }

  return status;
}
/*
 * css_initialize_server_interfaces() - install the request handler callback
 *   return: none
 *   request_handler(in): function invoked for every client request; receives
 *                        the servicing thread, enquiry id, request code,
 *                        payload size and payload buffer
 *
 * Note: must be called before css_init ().
 */
void
css_initialize_server_interfaces (int (*request_handler) (THREAD_ENTRY * thrd, unsigned int eid, int request,
							  int size, char *buffer))
{
  css_Server_request_handler = request_handler;
}
/*
 * css_is_shutdowning_server () - has a server shutdown been initiated?
 *   return: true once css_start_shutdown_server () has been called
 */
bool
css_is_shutdowning_server ()
{
  return css_Server_shutdown_inited;
}
/*
 * css_start_shutdown_server () - mark shutdown as started and arm the
 *                                shutdown deadline from system parameters
 *   return: none
 */
void
css_start_shutdown_server ()
{
  css_Server_shutdown_inited = true;
  css_set_shutdown_timeout (prm_get_integer_value (PRM_ID_SHUTDOWN_WAIT_TIME_IN_SECS));
}
/*
 * css_init() - server connection main loop
 *   return: NO_ERROR, or ER_FAILED
 *   thread_p(in): starting thread entry
 *   server_name(in): name of this database server (need not be NUL-terminated)
 *   name_length(in): length of server_name
 *   port_id(in): master port to register with; must be positive
 *
 * Note: This routine is the entry point for the server interface. Once this
 *       routine is called, control will not return to the caller until the
 *       server/scheduler is stopped. Please call
 *       css_initialize_server_interfaces before calling this function.
 */
// *INDENT-OFF*
REGISTER_WORKERPOOL (transaction, []() { return (int) prm_get_integer_value (PRM_ID_TASK_WORKER); });
// *INDENT-ON*

int
css_init (THREAD_ENTRY * thread_p, char *server_name, int name_length, int port_id)
{
  cubconn::master::connector connector;
  cubconn::connection::pool connections;
  std::size_t task_group, task_worker;
  std::size_t max_connection_workers, min_connection_workers;
  std::string name;
  int status = NO_ERROR;

  if (server_name == NULL || port_id <= 0)
    {
      return ER_FAILED;
    }

  name = std::string (server_name, name_length);

  // initialize worker pool for server requests
  /* NOTE(review): MAX_WORKERS and MAX_TASK_COUNT are defined but only
   * MAX_CONNECTIONS is used below — confirm whether they are still needed */
#define MAX_WORKERS css_get_max_workers ()
#define MAX_TASK_COUNT css_get_max_task_count ()
#define MAX_CONNECTIONS css_get_max_connections ()

  task_group = (int) prm_get_integer_value (PRM_ID_TASK_GROUP);
  task_worker = (int) prm_get_integer_value (PRM_ID_TASK_WORKER);
  max_connection_workers = (int) prm_get_integer_value (PRM_ID_CSS_MAX_CONNECTION_WORKER);
  min_connection_workers = (int) prm_get_integer_value (PRM_ID_CSS_MIN_CONNECTION_WORKER);

  // create request worker pool
  css_Server_request_worker_pool =
    thread_create_stats_worker_pool (task_worker, task_group, "transaction", thread_get_entry_manager (),
				     css_get_server_request_thread_pooling_configuration (),
				     css_get_server_request_thread_timeout_configuration ());
  // m_log = cubthread::is_logging_configured (cubthread::LOG_WORKER_POOL_TRAN_WORKERS)
  if (css_Server_request_worker_pool == NULL)
    {
      assert (false);
      er_set (ER_FATAL_ERROR_SEVERITY, ARG_FILE_LINE, ER_GENERIC_ERROR, 0);
      status = ER_FAILED;
      goto shutdown;
    }

  /* initialize epoll worker pool */
  connections.initialize (MAX_CONNECTIONS, max_connection_workers, min_connection_workers);

  /* attach thread entry */
  connector.attach (*thread_p);
  /* attach pool */
  connector.attach (connections);

  /* handshake and dispatch connection; blocks until the server is stopped */
  connector.run (port_id, name);

shutdown:
  /*
   * start to shutdown server
   */
  css_start_shutdown_server ();

  connector.stop ();
  connections.finalize ();

  // stop threads; in first phase we need to stop active workers, but keep log writers for a while longer to make sure
  // all log is transfered
  css_stop_all_workers (*thread_p, THREAD_STOP_WORKERS_EXCEPT_LOGWR);

  /* stop vacuum threads. */
  vacuum_stop_workers (thread_p);

  // stop load sessions
  cubload::worker_manager_stop_all ();

  /* we should flush all append pages before stop log writer */
  logpb_force_flush_pages (thread_p);

#if !defined(NDEBUG)
  /* All active transaction and vacuum workers should have been stopped. Only system transactions are still running. */
  assert (!log_prior_has_worker_log_records (thread_p));
#endif

  // stop log writers
  css_stop_all_workers (*thread_p, THREAD_STOP_LOGWR);

  if (prm_get_bool_value (PRM_ID_STATS_ON))
    {
      perfmon_er_log_current_stats (thread_p);
    }
  css_Server_request_worker_pool->er_log_stats ();

  // destroy thread worker pools
  thread_get_manager ()->destroy_worker_pool (css_Server_request_worker_pool);

  /* If this was opened for the new style connection protocol, make sure it gets closed. */
  css_close_server_connection_socket ();

  return status;
}
// *INDENT-OFF*
/*
 * css_enqueue_and_notify () - enqueue the request and notify to worker
 *   return: 0 on success (or when the message is silently dropped because the
 *           connection has no worker/context), INTERNAL_CSS_ERROR on enqueue failure
 *   type (in): queue to be inserted
 *   item (in): request; ownership moves to the worker on success
 *   wait_time (in): time to wait for the enqueue (0 = default, see declaration)
 */
static unsigned int
css_enqueue_and_notify (cubconn::connection::worker::queue_type type, cubconn::connection::worker::message &&item,
			int wait_time)
{
  CSS_CONN_ENTRY * conn;
  int r;

  assert (item.conn);
  conn = item.conn;

  /* lock to access worker and context */
  r = rmutex_lock (NULL, &conn->cmutex);
  assert (r == NO_ERROR);

  if (conn->worker == nullptr || conn->context == nullptr)
    {
      /* connection is being torn down: drop the message */
      /* unlock */
      r = rmutex_unlock (NULL, &conn->cmutex);
      assert (r == NO_ERROR);

      if (item.deleter)
	{
	  item.deleter ();	/* release buffers owned by the message */
	}
      return 0;
    }

  /* cmutex stays held here; the worker releases it through this callback */
  auto func =[conn] ()noexcept {
    /* unlock */
    rmutex_unlock (NULL, &conn->cmutex);
  };

  /* NOTE(review): on a false return it is assumed enqueue_and_notify has run
   * (or owns) the unlock callback and the message deleter — confirm against
   * the worker implementation, otherwise cmutex would stay locked here */
  if (!conn->worker->enqueue_and_notify (type, std::move (item), func, wait_time))
    {
      return INTERNAL_CSS_ERROR;
    }
  return 0;
}
// *INDENT-ON*
/*
 * css_send_data_to_client() - send a data buffer to the client
 *   return: 0 on success, CONNECTION_CLOSED if the connection is closed,
 *           or an enqueue error code
 *   conn(in): connection entry
 *   eid(in): enquiry id
 *   buffer(in): data buffer to queue for expected data.
 *   buffer_size(in): size of data buffer
 *   wait_time(in): time to wait for the enqueue
 *
 * Note: This is to be used ONLY by the server to return data to the client.
 *       The header and a private copy of the buffer are heap-allocated; the
 *       message's deleter releases them after the packet has been consumed.
 */
unsigned int
css_send_data_to_client (CSS_CONN_ENTRY * conn, unsigned int eid, char *buffer, int buffer_size, int wait_time)
{
  // *INDENT-OFF*
  cubconn::connection::worker::message request;
  NET_HEADER *mem_header;
  std::byte * mem_reply = nullptr;

  assert (conn != NULL);

  /* NOTE(review): siblings such as css_send_reply_and_data_to_client () reject
   * any status != CONN_OPEN, while this one only rejects CONN_CLOSED — confirm
   * whether the difference is intentional */
  rmutex_lock (NULL, &conn->rmutex);
  if (conn->status == CONN_CLOSED)
    {
      rmutex_unlock (NULL, &conn->rmutex);
      return CONNECTION_CLOSED;
    }
  rmutex_unlock (NULL, &conn->rmutex);

  request.type = cubconn::connection::worker::message_type::SEND_PACKET;
  request.conn = conn;
  request.packet.clear ();

  /* header */
  mem_header = new NET_HEADER {};
  css_set_net_header (mem_header, DATA_TYPE, 0, CSS_RID_FROM_EID (eid), buffer_size, conn->get_tran_index (),
		      conn->invalidate_snapshot, conn->db_error);
  request.packet.emplace_back ((std::byte *) mem_header, sizeof (NET_HEADER));

  if (buffer && buffer_size > 0)
    {
      /* reply: copy the caller's buffer so the caller may free it immediately */
      mem_reply = new std::byte[buffer_size];
      std::memcpy (mem_reply, buffer, buffer_size);
    }
  request.packet.emplace_back (mem_reply, (std::size_t) buffer_size);

  /* deleter: frees the header and the buffer copy once the packet is consumed */
  request.deleter =[mem_header, mem_reply] () noexcept
  {
    delete mem_header;
    if (mem_reply)
      {
	delete[] mem_reply;
      }
  };
  // *INDENT-ON*

  return css_enqueue_and_notify (cubconn::connection::worker::queue_type::IMMEDIATE, std::move (request), wait_time);
}
/*
 * css_send_reply_and_data_to_client_direct () - synchronously send a reply
 *                                               and an optional data buffer
 *   return: NO_ERROR on success, or the css_send_* error code
 *   conn(in): connection entry
 *   eid(in): enquiry id
 *   reply(in): reply bytes
 *   reply_size(in): size of reply
 *   buffer(in): optional extra data (may be NULL)
 *   buffer_size(in): size of buffer
 *
 * Unlike the queued senders, this writes directly on the connection.
 */
unsigned int
css_send_reply_and_data_to_client_direct (CSS_CONN_ENTRY * conn, unsigned int eid, char *reply, int reply_size,
					  char *buffer, int buffer_size)
{
  assert (conn != NULL);

  int send_status;
  if (buffer != NULL && buffer_size > 0)
    {
      send_status = css_send_two_data (conn, CSS_RID_FROM_EID (eid), reply, reply_size, buffer, buffer_size);
    }
  else
    {
      /* no extra payload: reply only */
      send_status = css_send_data (conn, CSS_RID_FROM_EID (eid), reply, reply_size);
    }

  return (send_status == NO_ERRORS) ? NO_ERROR : send_status;
}
/*
 * css_send_reply_and_data_to_client() - send a reply to the client,
 *                                       and optionally, an additional data
 *                                       buffer
 *   return: 0 on success, CONNECTION_CLOSED if the connection is not open,
 *           or an enqueue error code
 *   conn(in): connection entry
 *   eid(in): enquiry id
 *   reply(in): the reply data (error or no error)
 *   reply_size(in): the size of the reply data.
 *   buffer(in): data buffer to queue for expected data.
 *   buffer_size(in): size of data buffer
 *   deleter(in): callback releasing the caller-owned buffer; invoked after
 *                the packet is consumed or on any early-out path
 *
 * Note: This is to be used only by the server. The reply is copied; buffer
 *       is NOT copied and must stay valid until deleter runs.
 */
unsigned int
css_send_reply_and_data_to_client (CSS_CONN_ENTRY * conn, unsigned int eid, char *reply, int reply_size, char *buffer,
				   int buffer_size, std::function < void () > &&deleter)
{
  // *INDENT-OFF*
  cubconn::connection::worker::message request;
  NET_HEADER *mem_header[2] = { nullptr, nullptr };
  std::byte * mem_reply = nullptr;

  assert (conn != NULL);
  assert (!!buffer == !!buffer_size);	/* buffer and its size must agree */

  rmutex_lock (NULL, &conn->rmutex);
  if (conn->status != CONN_OPEN)
    {
      rmutex_unlock (NULL, &conn->rmutex);
      if (deleter)
	{
	  deleter ();	/* still release caller-owned resources */
	}
      return CONNECTION_CLOSED;
    }
  rmutex_unlock (NULL, &conn->rmutex);

  request.type = cubconn::connection::worker::message_type::SEND_PACKET;
  request.conn = conn;
  request.packet.clear ();

  /* reply: header + private copy of the reply bytes */
  mem_header[0] = new NET_HEADER {};
  css_set_net_header (mem_header[0], DATA_TYPE, 0, CSS_RID_FROM_EID (eid), reply_size, conn->get_tran_index (),
		      conn->invalidate_snapshot, conn->db_error);
  request.packet.emplace_back (reinterpret_cast < std::byte * >(mem_header[0]), sizeof (NET_HEADER));
  if (reply && reply_size > 0)
    {
      mem_reply = new std::byte[reply_size];
      std::memcpy (mem_reply, reply, reply_size);
    }
  request.packet.emplace_back (mem_reply, (std::size_t) reply_size);

  /* data: header + the caller's buffer (not copied; released via deleter) */
  if (buffer && buffer_size > 0)
    {
      mem_header[1] = new NET_HEADER {};
      css_set_net_header (mem_header[1], DATA_TYPE, 0, CSS_RID_FROM_EID (eid), buffer_size, conn->get_tran_index (),
			  conn->invalidate_snapshot, conn->db_error);
      request.packet.emplace_back (reinterpret_cast < std::byte * >(mem_header[1]), sizeof (NET_HEADER));
      request.packet.emplace_back (reinterpret_cast < std::byte * >(buffer), static_cast < std::size_t > (buffer_size));
    }

  /* deleter: frees the headers and the reply copy, then runs the caller's deleter */
  request.deleter =[header1 = mem_header[0],
		    header2 = mem_header[1], body1 = mem_reply, deleter = std::move (deleter)] () noexcept
  {
    delete header1;
    if (header2)
      {
	delete header2;
      }
    if (body1)
      {
	delete[] body1;
      }
    if (deleter)
      {
	deleter ();
      }
  };
  // *INDENT-ON*

  return css_enqueue_and_notify (cubconn::connection::worker::queue_type::IMMEDIATE, std::move (request));
}
#if 0
/*
 * css_send_reply_and_large_data_to_client() - send a reply to the server,
 *                                             and optionaly, an additional l
 *                                             large data
 *                                             buffer
 *   return:
 *   eid(in): enquiry id
 *   reply(in): the reply data (error or no error)
 *   reply_size(in): the size of the reply data.
 *   buffer(in): data buffer to queue for expected data.
 *   buffer_size(in): size of data buffer
 *
 * Note: This is to be used only by the server
 *
 * NOTE(review): this function is compiled out. If it is ever revived, note
 * that the chunking loop below never decreases the remaining byte count, so
 * every chunk after the first gets the same size, and buffer_size (INT64) is
 * narrowed to int — both need fixing before re-enabling.
 */
unsigned int
css_send_reply_and_large_data_to_client (unsigned int eid, char *reply, int reply_size, char *buffer, INT64 buffer_size)
{
  CSS_CONN_ENTRY *conn;
  int rc = 0;
  int idx = CSS_ENTRYID_FROM_EID (eid);
  int num_buffers;
  char **buffers;
  int *buffers_size, i;
  INT64 pos = 0;

  conn = &css_Conn_array[idx];

  if (buffer_size > 0 && buffer != NULL)
    {
      /* split the large buffer into INT_MAX-sized chunks, plus the reply */
      num_buffers = (int) (buffer_size / INT_MAX) + 2;

      buffers = (char **) malloc (sizeof (char *) * num_buffers);
      if (buffers == NULL)
	{
	  er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_OUT_OF_VIRTUAL_MEMORY, 1, sizeof (char *) * num_buffers);
	  return ER_OUT_OF_VIRTUAL_MEMORY;
	}

      buffers_size = (int *) malloc (sizeof (int) * num_buffers);
      if (buffers_size == NULL)
	{
	  free (buffers);
	  er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_OUT_OF_VIRTUAL_MEMORY, 1, sizeof (int) * num_buffers);
	  return ER_OUT_OF_VIRTUAL_MEMORY;
	}

      buffers[0] = reply;
      buffers_size[0] = reply_size;

      for (i = 1; i < num_buffers; i++)
	{
	  buffers[i] = &buffer[pos];
	  if (buffer_size > INT_MAX)
	    {
	      buffers_size[i] = INT_MAX;
	    }
	  else
	    {
	      buffers_size[i] = buffer_size;
	    }
	  pos += buffers_size[i];
	}

      rc = css_send_large_data (conn, CSS_RID_FROM_EID (eid), (const char **) buffers, buffers_size, num_buffers);

      free (buffers);
      free (buffers_size);
    }
  else
    {
      rc = css_send_data (conn, CSS_RID_FROM_EID (eid), reply, reply_size);
    }

  return (rc == NO_ERRORS) ? NO_ERROR : rc;
}
#endif
/*
 * css_send_reply_and_2_data_to_client() - send a reply to the client,
 *                                         and optionally, two additional data
 *                                         buffers
 *   return: 0 on success, CONNECTION_CLOSED if the connection is not open,
 *           or an enqueue error code
 *   conn(in): connection entry
 *   eid(in): enquiry id
 *   reply(in): the reply data (error or no error)
 *   reply_size(in): the size of the reply data.
 *   buffer1(in): data buffer to queue for expected data.
 *   buffer1_size(in): size of data buffer
 *   buffer2(in): data buffer to queue for expected data.
 *   buffer2_size(in): size of data buffer
 *   deleter(in): callback releasing the caller-owned buffers; invoked after
 *                the packet is consumed or on any early-out path
 *
 * Note: This is to be used only by the server. The reply is copied; buffer1
 *       and buffer2 are NOT copied and must stay valid until deleter runs.
 */
unsigned int
css_send_reply_and_2_data_to_client (CSS_CONN_ENTRY * conn, unsigned int eid, char *reply, int reply_size,
				     char *buffer1, int buffer1_size, char *buffer2, int buffer2_size,
				     std::function < void () > &&deleter)
{
  // *INDENT-OFF*
  cubconn::connection::worker::message request;
  NET_HEADER *mem_header[3] = { nullptr, nullptr, nullptr };
  std::byte * mem_reply = nullptr;

  assert (conn != NULL);
  assert (reply && reply_size > 0);
  assert (!!buffer1 == !!buffer1_size);
  assert (!!buffer2 == !!buffer2_size);

  rmutex_lock (NULL, &conn->rmutex);
  if (conn->status != CONN_OPEN)
    {
      rmutex_unlock (NULL, &conn->rmutex);
      if (deleter)
	{
	  deleter ();	/* still release caller-owned resources */
	}
      return CONNECTION_CLOSED;
    }
  rmutex_unlock (NULL, &conn->rmutex);

  request.type = cubconn::connection::worker::message_type::SEND_PACKET;
  request.conn = conn;
  request.packet.clear ();

  /* reply: header + private copy of the reply bytes */
  mem_header[0] = new NET_HEADER {};
  css_set_net_header (mem_header[0], DATA_TYPE, 0, CSS_RID_FROM_EID (eid), reply_size, conn->get_tran_index (),
		      conn->invalidate_snapshot, conn->db_error);
  request.packet.emplace_back (reinterpret_cast < std::byte * >(mem_header[0]), sizeof (NET_HEADER));
  if (reply && reply_size > 0)
    {
      mem_reply = new std::byte[reply_size];
      std::memcpy (mem_reply, reply, reply_size);
    }
  request.packet.emplace_back (mem_reply, (std::size_t) reply_size);

  /* don't refactor! I've split the conditions to make the code easier to read. */
  /* these conditions will be optimized at the compiler level. */
  /* data1: a header is emitted whenever ANY trailing buffer exists, so the
   * client receives the buffers in positional order */
  if ((buffer1 && buffer1_size > 0) || (buffer2 && buffer2_size > 0))
    {
      mem_header[1] = new NET_HEADER {};
      css_set_net_header (mem_header[1], DATA_TYPE, 0, CSS_RID_FROM_EID (eid), buffer1_size, conn->get_tran_index (),
			  conn->invalidate_snapshot, conn->db_error);
      request.packet.emplace_back (reinterpret_cast < std::byte * >(mem_header[1]), sizeof (NET_HEADER));
      request.packet.emplace_back (reinterpret_cast < std::byte * >(buffer1),
				   static_cast < std::size_t > (buffer1_size));
    }

  /* data2 */
  if (buffer2 && buffer2_size > 0)
    {
      mem_header[2] = new NET_HEADER {};
      css_set_net_header (mem_header[2], DATA_TYPE, 0, CSS_RID_FROM_EID (eid), buffer2_size, conn->get_tran_index (),
			  conn->invalidate_snapshot, conn->db_error);
      request.packet.emplace_back (reinterpret_cast < std::byte * >(mem_header[2]), sizeof (NET_HEADER));
      request.packet.emplace_back (reinterpret_cast < std::byte * >(buffer2),
				   static_cast < std::size_t > (buffer2_size));
    }

  /* deleter: frees the headers and the reply copy, then runs the caller's deleter */
  request.deleter =[header1 = mem_header[0],
		    header2 = mem_header[1],
		    header3 = mem_header[2], body1 = mem_reply, deleter = std::move (deleter)] () noexcept
  {
    delete header1;
    if (header2)
      {
	delete header2;
      }
    if (header3)
      {
	delete header3;
      }
    if (body1)
      {
	delete[]body1;
      }
    if (deleter)
      {
	deleter ();
      }
  };
  // *INDENT-ON*

  return css_enqueue_and_notify (cubconn::connection::worker::queue_type::IMMEDIATE, std::move (request));
}
/*
 * css_send_reply_and_3_data_to_client() - send a reply to the client,
 *                                         and optionally, three additional
 *                                         data buffers
 *   return: 0 on success, CONNECTION_CLOSED if the connection is closed,
 *           or an enqueue error code
 *   conn(in): connection entry
 *   eid(in): enquiry id
 *   reply(in): the reply data (error or no error)
 *   reply_size(in): the size of the reply data.
 *   buffer1(in): data buffer to queue for expected data.
 *   buffer1_size(in): size of data buffer
 *   buffer2(in): data buffer to queue for expected data.
 *   buffer2_size(in): size of data buffer
 *   buffer3(in): data buffer to queue for expected data.
 *   buffer3_size(in): size of data buffer
 *   deleter(in): callback releasing the caller-owned buffers; invoked after
 *                the packet is consumed or on any early-out path
 *
 * Note: This is to be used only by the server. The reply is copied; the data
 *       buffers are NOT copied and must stay valid until deleter runs.
 */
unsigned int
css_send_reply_and_3_data_to_client (CSS_CONN_ENTRY * conn, unsigned int eid, char *reply, int reply_size,
				     char *buffer1, int buffer1_size, char *buffer2, int buffer2_size, char *buffer3,
				     int buffer3_size, std::function < void () > &&deleter)
{
  // *INDENT-OFF*
  cubconn::connection::worker::message request;
  NET_HEADER *mem_header[4] = { nullptr, nullptr, nullptr, nullptr };
  std::byte * mem_reply = nullptr;

  assert (conn != NULL);
  assert (reply && reply_size > 0);
  assert (!!buffer1 == !!buffer1_size);
  assert (!!buffer2 == !!buffer2_size);
  assert (!!buffer3 == !!buffer3_size);

  /* NOTE(review): this checks status == CONN_CLOSED while the 2-data variant
   * rejects any status != CONN_OPEN — confirm the difference is intentional */
  rmutex_lock (NULL, &conn->rmutex);
  if (conn->status == CONN_CLOSED)
    {
      rmutex_unlock (NULL, &conn->rmutex);
      if (deleter)
	{
	  deleter ();	/* still release caller-owned resources */
	}
      return CONNECTION_CLOSED;
    }
  rmutex_unlock (NULL, &conn->rmutex);

  request.type = cubconn::connection::worker::message_type::SEND_PACKET;
  request.conn = conn;
  request.packet.clear ();

  /* reply: header + private copy of the reply bytes */
  mem_header[0] = new NET_HEADER {};
  css_set_net_header (mem_header[0], DATA_TYPE, 0, CSS_RID_FROM_EID (eid), reply_size, conn->get_tran_index (),
		      conn->invalidate_snapshot, conn->db_error);
  request.packet.emplace_back (reinterpret_cast < std::byte * >(mem_header[0]), sizeof (NET_HEADER));
  if (reply && reply_size > 0)
    {
      mem_reply = new std::byte[reply_size];
      std::memcpy (mem_reply, reply, reply_size);
    }
  request.packet.emplace_back (mem_reply, (std::size_t) reply_size);

  /* data1: a header is emitted whenever ANY trailing buffer exists, so the
   * client receives the buffers in positional order */
  if ((buffer1 && buffer1_size > 0) || (buffer2 && buffer2_size > 0) || (buffer3 && buffer3_size > 0))
    {
      mem_header[1] = new NET_HEADER {};
      css_set_net_header (mem_header[1], DATA_TYPE, 0, CSS_RID_FROM_EID (eid), buffer1_size, conn->get_tran_index (),
			  conn->invalidate_snapshot, conn->db_error);
      request.packet.emplace_back (reinterpret_cast < std::byte * >(mem_header[1]), sizeof (NET_HEADER));
      request.packet.emplace_back (reinterpret_cast < std::byte * >(buffer1),
				   static_cast < std::size_t > (buffer1_size));
    }

  /* data2 */
  if ((buffer2 && buffer2_size > 0) || (buffer3 && buffer3_size > 0))
    {
      mem_header[2] = new NET_HEADER {};
      css_set_net_header (mem_header[2], DATA_TYPE, 0, CSS_RID_FROM_EID (eid), buffer2_size, conn->get_tran_index (),
			  conn->invalidate_snapshot, conn->db_error);
      request.packet.emplace_back (reinterpret_cast < std::byte * >(mem_header[2]), sizeof (NET_HEADER));
      request.packet.emplace_back (reinterpret_cast < std::byte * >(buffer2),
				   static_cast < std::size_t > (buffer2_size));
    }

  /* data3 */
  if (buffer3 && buffer3_size > 0)
    {
      mem_header[3] = new NET_HEADER {};
      css_set_net_header (mem_header[3], DATA_TYPE, 0, CSS_RID_FROM_EID (eid), buffer3_size, conn->get_tran_index (),
			  conn->invalidate_snapshot, conn->db_error);
      request.packet.emplace_back (reinterpret_cast < std::byte * >(mem_header[3]), sizeof (NET_HEADER));
      request.packet.emplace_back (reinterpret_cast < std::byte * >(buffer3),
				   static_cast < std::size_t > (buffer3_size));
    }

  /* deleter: frees the headers and the reply copy, then runs the caller's deleter */
  request.deleter =[header1 = mem_header[0],
		    header2 = mem_header[1],
		    header3 = mem_header[2],
		    header4 = mem_header[3], body1 = mem_reply, deleter = std::move (deleter)] () noexcept
  {
    delete header1;
    if (header2)
      {
	delete header2;
      }
    if (header3)
      {
	delete header3;
      }
    if (header4)
      {
	delete header4;
      }
    if (body1)
      {
	delete[]body1;
      }
    if (deleter)
      {
	deleter ();
      }
  };
  // *INDENT-ON*

  return css_enqueue_and_notify (cubconn::connection::worker::queue_type::IMMEDIATE, std::move (request));
}
/*
 * css_send_error_to_client() - send an error buffer to the client
 *   return: 0 on success, CONNECTION_CLOSED if the connection is closed,
 *           or an enqueue error code
 *   conn(in): connection entry
 *   eid(in): enquiry id
 *   buffer(in): data buffer to queue for expected data.
 *   buffer_size(in): size of data buffer
 *
 * Note: This is to be used ONLY by the server to return error data to the
 *       client. The header and a private copy of the buffer are heap-allocated
 *       and released by the message's deleter.
 */
unsigned int
css_send_error_to_client (CSS_CONN_ENTRY * conn, unsigned int eid, char *buffer, int buffer_size)
{
  // *INDENT-OFF*
  cubconn::connection::worker::message request;
  NET_HEADER *mem_header;
  std::byte * mem_reply = nullptr;

  assert (conn != NULL);

  rmutex_lock (NULL, &conn->rmutex);
  if (conn->status == CONN_CLOSED)
    {
      rmutex_unlock (NULL, &conn->rmutex);
      return CONNECTION_CLOSED;
    }
  rmutex_unlock (NULL, &conn->rmutex);

  request.type = cubconn::connection::worker::message_type::SEND_PACKET;
  request.conn = conn;
  request.packet.clear ();

  /* header: ERROR_TYPE distinguishes this packet from ordinary data */
  mem_header = new NET_HEADER {};
  css_set_net_header (mem_header, ERROR_TYPE, 0, CSS_RID_FROM_EID (eid), buffer_size, conn->get_tran_index (),
		      conn->invalidate_snapshot, conn->db_error);
  request.packet.emplace_back ((std::byte *) mem_header, sizeof (NET_HEADER));

  if (buffer && buffer_size > 0)
    {
      /* reply: copy the caller's buffer so the caller may free it immediately */
      mem_reply = new std::byte[buffer_size];
      std::memcpy (mem_reply, buffer, buffer_size);
    }
  request.packet.emplace_back (mem_reply, (std::size_t) buffer_size);

  /* deleter: frees the header and the buffer copy once the packet is consumed */
  request.deleter =[mem_header, mem_reply] () noexcept
  {
    delete mem_header;
    if (mem_reply)
      {
	delete[] mem_reply;
      }
  };
  // *INDENT-ON*

  return css_enqueue_and_notify (cubconn::connection::worker::queue_type::IMMEDIATE, std::move (request));
}
/*
* css_send_abort_to_client() - send an abort message to the client
* return:
* eid(in): enquiry id
*/
unsigned int
css_send_abort_to_client (CSS_CONN_ENTRY * conn, unsigned int eid)
{
  // *INDENT-OFF*
  cubconn::connection::worker::message abort_msg;
  NET_HEADER *abort_header;
  unsigned short header_flags = 0;
  int lock_rc;

  assert (conn != NULL);

  /* bail out early when the connection is no longer open */
  rmutex_lock (NULL, &conn->rmutex);
  bool conn_is_open = (conn->status == CONN_OPEN);
  rmutex_unlock (NULL, &conn->rmutex);
  if (!conn_is_open)
    {
      return CONNECTION_CLOSED;
    }

  abort_msg.type = cubconn::connection::worker::message_type::SEND_PACKET;
  abort_msg.conn = conn;
  abort_msg.packet.clear ();

  /* build the ABORT_TYPE header by hand; an abort packet carries no payload */
  abort_header = new NET_HEADER {};
  abort_header->type = htonl (ABORT_TYPE);
  abort_header->request_id = htonl (CSS_RID_FROM_EID (eid));
  abort_header->transaction_id = htonl (conn->get_tran_index ());
  if (conn->invalidate_snapshot)
    {
      header_flags |= NET_HEADER_FLAG_INVALIDATE_SNAPSHOT;
    }
  if (conn->in_method)
    {
      header_flags |= NET_HEADER_FLAG_METHOD_MODE;
    }
  abort_header->flags = htons (header_flags);
  abort_header->db_error = htonl (conn->db_error);

  abort_msg.packet.emplace_back ((std::byte *) abort_header, sizeof (NET_HEADER));
  abort_msg.deleter = [abort_header] () noexcept
  {
    delete abort_header;
  };
  // *INDENT-ON*
  if (css_enqueue_and_notify (cubconn::connection::worker::queue_type::IMMEDIATE, std::move (abort_msg)) != NO_ERROR)
    {
      return INTERNAL_CSS_ERROR;
    }

  /* drop any packets still queued for this request id */
  lock_rc = rmutex_lock (NULL, &conn->rmutex);
  assert (lock_rc == NO_ERROR);
  css_remove_unexpected_packets (conn, CSS_RID_FROM_EID (eid));
  lock_rc = rmutex_unlock (NULL, &conn->rmutex);
  assert (lock_rc == NO_ERROR);
  return 0;
}
/*
* css_test_for_client_errors () -
* return: error id from the client
* conn(in):
* eid(in):
*/
static int
css_test_for_client_errors (CSS_CONN_ENTRY * conn, unsigned int eid)
{
  char *err_area;
  int err_area_size;
  int queue_rc;
  int error_id = NO_ERROR;

  assert (conn != NULL);

  /* if the client queued an error packet for this request, decode and consume it */
  if (css_return_queued_error (conn, CSS_RID_FROM_EID (eid), &err_area, &err_area_size, &queue_rc))
    {
      error_id = er_set_area_error (err_area);
      conn->release_packet (err_area);
    }
  return error_id;
}
/*
* css_receive_data_from_client() - return data that was sent by the server
* return:
* eid(in): enquiry id
* buffer(out): data buffer to send to client.
* buffer_size(out): size of data buffer
*
* note: caller should know that it returns zero on success and
* returns css error code on failure
*/
unsigned int
css_receive_data_from_client (CSS_CONN_ENTRY * conn, unsigned int eid, char **buffer, int *size)
{
  /* convenience wrapper: receive with an infinite timeout (-1) */
  return css_receive_data_from_client_with_timeout (conn, eid, buffer, size, -1);
}
/*
* css_receive_data_from_client_with_timeout() - return data that was sent by the server
* return:
* eid(in): enquiry id
* buffer(out): data buffer to send to client.
* buffer_size(out): size of data buffer
* timeout(in): timeout in seconds
*
* note: caller should know that it returns zero on success and
* returns css error code on failure
*/
unsigned int
css_receive_data_from_client_with_timeout (CSS_CONN_ENTRY * conn, unsigned int eid, char **buffer, int *size,
					   int timeout)
{
  assert (conn != NULL);

  *size = 0;

  int css_error = css_receive_data (conn, CSS_RID_FROM_EID (eid), buffer, size, timeout);
  if (css_error != NO_ERRORS && css_error != RECORD_TRUNCATED)
    {
      return css_error;
    }

  /* data arrived; also pick up any client-side error queued for this request */
  css_test_for_client_errors (conn, eid);
  return 0;
}
/*
* css_end_server_request() - terminates the request from the client
* return:
* conn(in/out):
*/
void
css_end_server_request (CSS_CONN_ENTRY * conn)
{
  int lock_rc = rmutex_lock (NULL, &conn->rmutex);
  assert (lock_rc == NO_ERROR);

  /* flush leftover packets and mark the connection as closing */
  css_remove_all_unexpected_packets (conn);
  conn->status = CONN_CLOSING;

  lock_rc = rmutex_unlock (NULL, &conn->rmutex);
  assert (lock_rc == NO_ERROR);
  /* no need to make a request to connection thread */
}
/*
* css_pack_server_name() -
* return: a new string containing the server name and the database version
* string
* server_name(in): the name of the database volume
* name_length(out): returned size of the server_name
*
* Note: Builds a character buffer with three embedded strings: the database
* volume name, a string containing the release identifier, and the
* CUBRID environment variable (if exists)
*/
char *
css_pack_server_name (const char *server_name, int *name_length)
{
  char *packed_name = NULL;
  const char *env_name = NULL;
  const char *release_string = NULL;
  char pid_string[16], *s;

  if (server_name != NULL)
    {
      env_name = envvar_root ();
      if (env_name == NULL)
	{
	  return NULL;
	}

      /*
       * here we changed the 2nd string in packed_name from
       * rel_release_string() to rel_major_release_string()
       * solely for the purpose of matching the name of the cubrid driver.
       * That is, the name of the cubrid driver has been changed to use
       * MAJOR_RELEASE_STRING (see drivers/Makefile). So, here we must also
       * use rel_major_release_string(), so master can successfully find and
       * fork cubrid drivers.
       */
      release_string = rel_major_release_string ();

      /* snprintf: bounded formatting instead of sprintf */
      snprintf (pid_string, sizeof (pid_string), "%d", getpid ());

      /* four NUL-terminated strings packed back to back */
      *name_length =
	(int) (strlen (server_name) + 1 + strlen (release_string) + 1 + strlen (env_name) + 1 +
	       strlen (pid_string) + 1);
      /* in order to prepend '#' */
      if (!HA_DISABLED ())
	{
	  (*name_length)++;
	}

      packed_name = (char *) malloc (*name_length);
      if (packed_name == NULL)
	{
	  er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_OUT_OF_VIRTUAL_MEMORY, 1, (size_t) (*name_length));
	  return NULL;
	}

      s = packed_name;
      if (!HA_DISABLED ())
	{
	  /* HA-enabled servers are marked with a leading '#' */
	  *s++ = '#';
	}
      /* copy each component including its terminating NUL; the sizes were
       * computed above, so strcpy cannot overrun */
      strcpy (s, server_name);
      s += strlen (server_name) + 1;
      strcpy (s, release_string);
      s += strlen (release_string) + 1;
      strcpy (s, env_name);
      s += strlen (env_name) + 1;
      strcpy (s, pid_string);
    }
  return packed_name;
}
/*
* css_add_client_version_string() - add the version_string to socket queue
* entry structure
* return: pointer to version_string in the socket queue entry structure
* version_string(in):
*/
char *
css_add_client_version_string (THREAD_ENTRY * thread_p, const char *version_string)
{
  assert (thread_p != NULL);

  CSS_CONN_ENTRY *conn = thread_p->conn_entry;
  if (conn == NULL)
    {
      return NULL;
    }
  if (conn->version_string != NULL)
    {
      /* already registered */
      return conn->version_string;
    }

  /* first registration: keep a private copy on the connection entry */
  size_t alloc_size = strlen (version_string) + 1;
  char *copied = (char *) malloc (alloc_size);
  if (copied == NULL)
    {
      er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_OUT_OF_VIRTUAL_MEMORY, 1, alloc_size);
      return NULL;
    }
  strcpy (copied, version_string);
  conn->version_string = copied;
  return copied;
}
#if defined (ENABLE_UNUSED_FUNCTION)
/*
* css_get_client_version_string() - retrieve the version_string from socket
* queue entry structure
* return:
*/
char *
css_get_client_version_string (void)
{
CSS_CONN_ENTRY *entry;
entry = css_get_current_conn_entry ();
if (entry != NULL)
{
return entry->version_string;
}
else
{
return NULL;
}
}
#endif /* ENABLE_UNUSED_FUNCTION */
/*
* css_cleanup_server_queues () -
* return:
* eid(in):
*/
void
css_cleanup_server_queues (unsigned int eid)
{
  /* drop every packet still queued on the connection entry that owns this eid */
  css_remove_all_unexpected_packets (&css_Conn_array[CSS_ENTRYID_FROM_EID (eid)]);
}
/*
* css_set_ha_num_of_hosts -
* return: none
*
* Note: be careful to use
*/
void
css_set_ha_num_of_hosts (int num)
{
  /* clamp the host count to [1, HA_LOG_APPLIER_STATE_TABLE_MAX] */
  if (num < 1)
    {
      num = 1;
    }
  else if (num > HA_LOG_APPLIER_STATE_TABLE_MAX)
    {
      num = HA_LOG_APPLIER_STATE_TABLE_MAX;
    }
  /* the stored count excludes this server itself */
  ha_Server_num_of_hosts = num - 1;
}
/*
* css_get_ha_num_of_hosts -
* return: return the number of hosts
*
* Note:
*/
int
css_get_ha_num_of_hosts (void)
{
  /* count of other HA hosts; this server is excluded (see css_set_ha_num_of_hosts) */
  return ha_Server_num_of_hosts;
}
/*
* css_ha_server_state - return the current HA server state
* return: one of HA_SERVER_STATE
*/
HA_SERVER_STATE
css_ha_server_state (void)
{
  /* unsynchronized snapshot of the global HA server state */
  return ha_Server_state;
}
bool
css_is_ha_repl_delayed (void)
{
  /* true once a replication delay has been flagged (css_set_ha_repl_delayed) */
  return ha_Repl_delay_detected;
}
void
css_set_ha_repl_delayed (void)
{
  /* flag that replication is lagging; cleared by css_unset_ha_repl_delayed */
  ha_Repl_delay_detected = true;
}
void
css_unset_ha_repl_delayed (void)
{
  /* clear the replication-delay flag */
  ha_Repl_delay_detected = false;
}
/*
* css_transit_ha_server_state - request to transit the current HA server
* state to the required state
* return: new state changed if successful or HA_SERVER_STATE_NA
* req_state(in): the state for the server to transit
*
*/
static HA_SERVER_STATE
css_transit_ha_server_state (THREAD_ENTRY * thread_p, HA_SERVER_STATE req_state)
{
  /* one row per legal (current state, requested state) -> next state transition */
  struct ha_server_state_transition_table
  {
    HA_SERVER_STATE cur_state;
    HA_SERVER_STATE req_state;
    HA_SERVER_STATE next_state;
  };
  static struct ha_server_state_transition_table ha_Server_state_transition[] = {
    /* idle -> active */
    {HA_SERVER_STATE_IDLE, HA_SERVER_STATE_ACTIVE, HA_SERVER_STATE_ACTIVE},
#if 0
    /* idle -> to-be-standby */
    {HA_SERVER_STATE_IDLE, HA_SERVER_STATE_STANDBY, HA_SERVER_STATE_TO_BE_STANDBY},
#else
    /* idle -> standby */
    {HA_SERVER_STATE_IDLE, HA_SERVER_STATE_STANDBY, HA_SERVER_STATE_STANDBY},
#endif
    /* idle -> maintenance */
    {HA_SERVER_STATE_IDLE, HA_SERVER_STATE_MAINTENANCE, HA_SERVER_STATE_MAINTENANCE},
    /* active -> active */
    {HA_SERVER_STATE_ACTIVE, HA_SERVER_STATE_ACTIVE, HA_SERVER_STATE_ACTIVE},
    /* active -> to-be-standby */
    {HA_SERVER_STATE_ACTIVE, HA_SERVER_STATE_STANDBY, HA_SERVER_STATE_TO_BE_STANDBY},
    /* to-be-active -> active */
    {HA_SERVER_STATE_TO_BE_ACTIVE, HA_SERVER_STATE_ACTIVE, HA_SERVER_STATE_ACTIVE},
    /* standby -> standby */
    {HA_SERVER_STATE_STANDBY, HA_SERVER_STATE_STANDBY, HA_SERVER_STATE_STANDBY},
    /* standby -> to-be-active */
    {HA_SERVER_STATE_STANDBY, HA_SERVER_STATE_ACTIVE, HA_SERVER_STATE_TO_BE_ACTIVE},
    /* statndby -> maintenance */
    {HA_SERVER_STATE_STANDBY, HA_SERVER_STATE_MAINTENANCE, HA_SERVER_STATE_MAINTENANCE},
    /* to-be-standby -> standby */
    {HA_SERVER_STATE_TO_BE_STANDBY, HA_SERVER_STATE_STANDBY, HA_SERVER_STATE_STANDBY},
    /* maintenance -> standby */
    {HA_SERVER_STATE_MAINTENANCE, HA_SERVER_STATE_STANDBY, HA_SERVER_STATE_TO_BE_STANDBY},
    /* end of table */
    {HA_SERVER_STATE_NA, HA_SERVER_STATE_NA, HA_SERVER_STATE_NA}
  };
  struct ha_server_state_transition_table *table;
  HA_SERVER_STATE new_state = HA_SERVER_STATE_NA;

  /* already in the requested state: nothing to do, no critical section needed */
  if (ha_Server_state == req_state)
    {
      return req_state;
    }

  csect_enter (thread_p, CSECT_HA_SERVER_STATE, INF_WAIT);
  /* look for a matching transition row; if none matches, HA_SERVER_STATE_NA is
   * returned and the global state is left unchanged */
  for (table = ha_Server_state_transition; table->cur_state != HA_SERVER_STATE_NA; table++)
    {
      if (table->cur_state == ha_Server_state && table->req_state == req_state)
	{
	  er_log_debug (ARG_FILE_LINE, "css_transit_ha_server_state: " "ha_Server_state (%s) -> (%s)\n",
			css_ha_server_state_string (ha_Server_state), css_ha_server_state_string (table->next_state));
	  new_state = table->next_state;
	  /* append a dummy log record for LFT to wake LWTs up */
	  log_append_ha_server_state (thread_p, new_state);
	  if (!HA_DISABLED ())
	    {
	      er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_CSS_SERVER_HA_MODE_CHANGE, 2,
		      css_ha_server_state_string (ha_Server_state), css_ha_server_state_string (new_state));
	    }
	  ha_Server_state = new_state;
	  /* sync up the current HA state with the system parameter */
	  prm_set_integer_value (PRM_ID_HA_SERVER_STATE, ha_Server_state);
	  if (ha_Server_state == HA_SERVER_STATE_ACTIVE)
	    {
	      /* record promotion time and restart worker threads on activation */
	      log_set_ha_promotion_time (thread_p, ((INT64) time (0)));
	      css_start_all_threads ();
	    }
	  break;
	}
    }
  csect_exit (thread_p, CSECT_HA_SERVER_STATE);
  return new_state;
}
/*
* css_check_ha_server_state_for_client
* return: NO_ERROR or errno
* whence(in): 0: others, 1: register_client, 2: unregister_client
*/
int
css_check_ha_server_state_for_client (THREAD_ENTRY * thread_p, int whence)
{
/* NOTE(review): these helper macros are not #undef'd after the function,
 * so they leak into the rest of the translation unit */
#define FROM_OTHERS 0
#define FROM_REGISTER_CLIENT 1
#define FROM_UNREGISTER_CLIENT 2
  int err = NO_ERROR;
  HA_SERVER_STATE state;

  /* csect_enter (thread_p, CSECT_HA_SERVER_STATE, INF_WAIT); */
  switch (ha_Server_state)
    {
    case HA_SERVER_STATE_TO_BE_ACTIVE:
      /* Server accepts clients even though it is in a to-be-active state */
      break;
    case HA_SERVER_STATE_TO_BE_STANDBY:
      /*
       * If the server's state is 'to-be-standby',
       * new connection request will be rejected for HA fail-back action.
       */
      if (whence == FROM_REGISTER_CLIENT)
	{
	  er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ERR_CSS_ERROR_FROM_SERVER, 1,
		  "Connection rejected. " "The server is changing to standby mode.");
	  err = ERR_CSS_ERROR_FROM_SERVER;
	}
      /*
       * If all connected clients are released (by reset-on-commit),
       * change the state to 'standby' as a completion of HA fail-back action.
       */
      else if (whence == FROM_UNREGISTER_CLIENT)
	{
	  /* count of 1 means only the unregistering client itself remains */
	  if (logtb_count_clients (thread_p) == 1)
	    {
	      er_log_debug (ARG_FILE_LINE,
			    "logtb_count_clients () = 1 including me "
			    "transit state from 'to-be-standby' to 'standby'\n");
	      state = css_transit_ha_server_state (thread_p, HA_SERVER_STATE_STANDBY);
	      assert (state == HA_SERVER_STATE_STANDBY);
	      if (state == HA_SERVER_STATE_STANDBY)
		{
		  /* standby servers must not accept updates */
		  er_log_debug (ARG_FILE_LINE, "css_check_ha_server_state_for_client: " "logtb_disable_update() \n");
		  logtb_disable_update (thread_p);
		}
	    }
	}
      break;
    default:
      break;
    }
  /* csect_exit (CSECT_HA_SERVER_STATE); */
  return err;
}
/*
* css_check_ha_log_applier_done - check all log appliers have done
* return: true or false
*/
static bool
css_check_ha_log_applier_done (void)
{
  /* "done" is only meaningful while the server is active or becoming active */
  if (ha_Server_state != HA_SERVER_STATE_TO_BE_ACTIVE && ha_Server_state != HA_SERVER_STATE_ACTIVE)
    {
      return false;
    }

  /* every registered log applier must have reached the DONE state */
  for (int idx = 0; idx < ha_Server_num_of_hosts; idx++)
    {
      if (ha_Log_applier_state[idx].state != HA_LOG_APPLIER_STATE_DONE)
	{
	  return false;
	}
    }
  return true;
}
/*
* css_check_ha_log_applier_working - check all log appliers are working
* return: true or false
*/
static bool
css_check_ha_log_applier_working (void)
{
  int i;

  /* An applier counts as "working" when its state is either WORKING or DONE.
   * The previous condition used '||' between the two inequality tests, which
   * is a tautology (a state always differs from at least one of two distinct
   * values), so the loop broke on the first applier and the function could
   * only ever return true when no hosts were registered. */
  for (i = 0; i < ha_Server_num_of_hosts; i++)
    {
      if (ha_Log_applier_state[i].state != HA_LOG_APPLIER_STATE_WORKING
	  && ha_Log_applier_state[i].state != HA_LOG_APPLIER_STATE_DONE)
	{
	  break;
	}
    }
  /* all appliers working/done AND the server is (to-be-)standby */
  if (i == ha_Server_num_of_hosts
      && (ha_Server_state == HA_SERVER_STATE_TO_BE_STANDBY || ha_Server_state == HA_SERVER_STATE_STANDBY))
    {
      return true;
    }
  return false;
}
// *INDENT-OFF*
/*
* css_change_ha_server_state - change the server's HA state
* return: NO_ERROR or ER_FAILED
* state(in): new state for server to be
* force(in): force to change
* timeout(in): timeout (standby to maintenance)
* heartbeat(in): from heartbeat master
*/
int
css_change_ha_server_state (THREAD_ENTRY * thread_p, HA_SERVER_STATE state, bool force, int timeout, bool heartbeat)
{
  HA_SERVER_STATE orig_state;
  int i;

  er_log_debug (ARG_FILE_LINE, "css_change_ha_server_state: ha_Server_state %s " "state %s force %c heartbeat %c\n",
		css_ha_server_state_string (ha_Server_state), css_ha_server_state_string (state), (force ? 't' : 'f'),
		(heartbeat ? 't' : 'f'));
  assert (state >= HA_SERVER_STATE_IDLE && state <= HA_SERVER_STATE_DEAD);

  /* no-op when already in (or, without force, already transitioning to) the
   * requested state */
  if (state == ha_Server_state
      || (!force && ha_Server_state == HA_SERVER_STATE_TO_BE_ACTIVE && state == HA_SERVER_STATE_ACTIVE)
      || (!force && ha_Server_state == HA_SERVER_STATE_TO_BE_STANDBY && state == HA_SERVER_STATE_STANDBY))
    {
      return NO_ERROR;
    }

  /* non-heartbeat requests are honored only for standby <-> maintenance
   * switches or a forced to-be-active -> active promotion */
  if (heartbeat == false && !(ha_Server_state == HA_SERVER_STATE_STANDBY && state == HA_SERVER_STATE_MAINTENANCE)
      && !(ha_Server_state == HA_SERVER_STATE_MAINTENANCE && state == HA_SERVER_STATE_STANDBY)
      && !(force && ha_Server_state == HA_SERVER_STATE_TO_BE_ACTIVE && state == HA_SERVER_STATE_ACTIVE))
    {
      return NO_ERROR;
    }

  csect_enter (thread_p, CSECT_HA_SERVER_STATE, INF_WAIT);
  orig_state = ha_Server_state;
  if (force)
    {
      /* forced change bypasses the transition table */
      if (ha_Server_state != state)
	{
	  er_log_debug (ARG_FILE_LINE, "css_change_ha_server_state:" " set force from %s to state %s\n",
			css_ha_server_state_string (ha_Server_state), css_ha_server_state_string (state));
	  ha_Server_state = state;
	  /* append a dummy log record for LFT to wake LWTs up */
	  log_append_ha_server_state (thread_p, state);
	  if (!HA_DISABLED ())
	    {
	      er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_CSS_SERVER_HA_MODE_CHANGE, 2,
		      css_ha_server_state_string (ha_Server_state), css_ha_server_state_string (state));
	    }
	  if (ha_Server_state == HA_SERVER_STATE_ACTIVE)
	    {
	      log_set_ha_promotion_time (thread_p, ((INT64) time (0)));
	    }
	}
    }
  switch (state)
    {
    case HA_SERVER_STATE_ACTIVE:
      state = css_transit_ha_server_state (thread_p, HA_SERVER_STATE_ACTIVE);
      if (state == HA_SERVER_STATE_NA)
	{
	  break;
	}
      /* If log appliers have changed their state to done, go directly to active mode */
      if (css_check_ha_log_applier_done ())
	{
	  er_log_debug (ARG_FILE_LINE, "css_change_ha_server_state: " "css_check_ha_log_applier_done ()\n");
	  state = css_transit_ha_server_state (thread_p, HA_SERVER_STATE_ACTIVE);
	  assert (state == HA_SERVER_STATE_ACTIVE);
	}
      if (state == HA_SERVER_STATE_ACTIVE)
	{
	  /* active servers accept updates */
	  er_log_debug (ARG_FILE_LINE, "css_change_ha_server_state: " "logtb_enable_update() \n");
	  logtb_enable_update (thread_p);
	}
      break;
    case HA_SERVER_STATE_STANDBY:
      state = css_transit_ha_server_state (thread_p, HA_SERVER_STATE_STANDBY);
      if (state == HA_SERVER_STATE_NA)
	{
	  break;
	}
      if (orig_state == HA_SERVER_STATE_IDLE)
	{
	  /* If all log appliers have done their recovering actions, go directly to standby mode */
	  if (css_check_ha_log_applier_working ())
	    {
	      er_log_debug (ARG_FILE_LINE, "css_change_ha_server_state: " "css_check_ha_log_applier_working ()\n");
	      state = css_transit_ha_server_state (thread_p, HA_SERVER_STATE_STANDBY);
	      assert (state == HA_SERVER_STATE_STANDBY);
	    }
	}
      else
	{
	  /* If there's no active clients (except me), go directly to standby mode */
	  if (logtb_count_clients (thread_p) == 0)
	    {
	      er_log_debug (ARG_FILE_LINE, "css_change_ha_server_state: " "logtb_count_clients () = 0\n");
	      state = css_transit_ha_server_state (thread_p, HA_SERVER_STATE_STANDBY);
	      assert (state == HA_SERVER_STATE_STANDBY);
	    }
	}
      if (orig_state == HA_SERVER_STATE_MAINTENANCE)
	{
	  /* leaving maintenance: advertise the server as fully up again */
	  boot_server_status (BOOT_SERVER_UP);
	}
      if (state == HA_SERVER_STATE_STANDBY)
	{
	  /* standby servers must not accept updates */
	  er_log_debug (ARG_FILE_LINE, "css_change_ha_server_state: " "logtb_disable_update() \n");
	  logtb_disable_update (thread_p);
	}
      break;
    case HA_SERVER_STATE_MAINTENANCE:
      state = css_transit_ha_server_state (thread_p, HA_SERVER_STATE_MAINTENANCE);
      if (state == HA_SERVER_STATE_NA)
	{
	  break;
	}
      if (state == HA_SERVER_STATE_MAINTENANCE)
	{
	  er_log_debug (ARG_FILE_LINE, "css_change_ha_server_state: " "logtb_enable_update() \n");
	  logtb_enable_update (thread_p);
	  boot_server_status (BOOT_SERVER_MAINTENANCE);
	}
      /* give disallowed clients up to 'timeout' seconds to finish normally */
      for (i = 0; i < timeout; i++)
	{
	  /* waiting timeout second while transaction terminated normally. */
	  if (logtb_count_not_allowed_clients_in_maintenance_mode (thread_p) == 0)
	    {
	      break;
	    }
	  thread_sleep (1000);	/* 1000 msec */
	}
      if (logtb_count_not_allowed_clients_in_maintenance_mode (thread_p) != 0)
	{
	  LOG_TDES *tdes;

	  /* try to kill transaction. */
	  TR_TABLE_CS_ENTER (thread_p);
	  // start from transaction index i = 1; system transaction cannot be killed
	  for (i = 1; i < log_Gl.trantable.num_total_indices; i++)
	    {
	      tdes = log_Gl.trantable.all_tdes[i];
	      if (tdes != NULL && tdes->trid != NULL_TRANID)
		{
		  if (!BOOT_IS_ALLOWED_CLIENT_TYPE_IN_MT_MODE (tdes->client.get_host_name (), boot_Host_name,
							       tdes->client.client_type))
		    {
		      logtb_slam_transaction (thread_p, tdes->tran_index);
		    }
		}
	    }
	  TR_TABLE_CS_EXIT (thread_p);
	  thread_sleep (2000);	/* 2000 msec */
	}
      break;
    default:
      state = HA_SERVER_STATE_NA;
      break;
    }
  csect_exit (thread_p, CSECT_HA_SERVER_STATE);

  /* NA means the requested transition was not legal */
  return (state != HA_SERVER_STATE_NA) ? NO_ERROR : ER_FAILED;
}
// *INDENT-ON*
/*
* css_notify_ha_server_mode - notify the log applier's HA state
* return: NO_ERROR or ER_FAILED
* state(in): new state to be recorded
*/
int
css_notify_ha_log_applier_state (THREAD_ENTRY * thread_p, HA_LOG_APPLIER_STATE state)
{
  HA_LOG_APPLIER_STATE_TABLE *table;
  HA_SERVER_STATE server_state;
  int i, client_id;

  assert (state >= HA_LOG_APPLIER_STATE_UNREGISTERED && state <= HA_LOG_APPLIER_STATE_ERROR);

  csect_enter (thread_p, CSECT_HA_SERVER_STATE, INF_WAIT);
  client_id = css_get_client_id (thread_p);
  er_log_debug (ARG_FILE_LINE, "css_notify_ha_log_applier_state: client %d state %s\n", client_id,
		css_ha_applier_state_string (state));

  /* find this client's slot — or the first unregistered one — and record the state */
  for (i = 0, table = ha_Log_applier_state; i < ha_Log_applier_state_num; i++, table++)
    {
      if (table->client_id == client_id)
	{
	  if (table->state == state)
	    {
	      /* no change: nothing further to do */
	      csect_exit (thread_p, CSECT_HA_SERVER_STATE);
	      return NO_ERROR;
	    }
	  table->state = state;
	  break;
	}
      if (table->state == HA_LOG_APPLIER_STATE_UNREGISTERED)
	{
	  /* reuse a vacated slot for this new client */
	  table->client_id = client_id;
	  table->state = state;
	  break;
	}
    }
  /* no slot matched: append a new entry if the table still has room */
  if (i == ha_Log_applier_state_num && ha_Log_applier_state_num < ha_Server_num_of_hosts)
    {
      table = &ha_Log_applier_state[ha_Log_applier_state_num++];
      table->client_id = client_id;
      table->state = state;
    }

  /* the updated applier states may let a pending server-state transition finish */
  if (css_check_ha_log_applier_done ())
    {
      er_log_debug (ARG_FILE_LINE, "css_notify_ha_log_applier_state: " "css_check_ha_log_applier_done()\n");
      server_state = css_transit_ha_server_state (thread_p, HA_SERVER_STATE_ACTIVE);
      assert (server_state == HA_SERVER_STATE_ACTIVE);
      if (server_state == HA_SERVER_STATE_ACTIVE)
	{
	  er_log_debug (ARG_FILE_LINE, "css_notify_ha_log_applier_state: " "logtb_enable_update() \n");
	  logtb_enable_update (thread_p);
	}
    }
  if (css_check_ha_log_applier_working ())
    {
      er_log_debug (ARG_FILE_LINE, "css_notify_ha_log_applier_state: " "css_check_ha_log_applier_working()\n");
      server_state = css_transit_ha_server_state (thread_p, HA_SERVER_STATE_STANDBY);
      assert (server_state == HA_SERVER_STATE_STANDBY);
      if (server_state == HA_SERVER_STATE_STANDBY)
	{
	  er_log_debug (ARG_FILE_LINE, "css_notify_ha_log_applier_state: " "logtb_disable_update() \n");
	  logtb_disable_update (thread_p);
	}
    }
  csect_exit (thread_p, CSECT_HA_SERVER_STATE);
  return NO_ERROR;
}
#if defined(SERVER_MODE)
int
css_check_accessibility (SOCKET new_fd)
{
#if defined(WINDOWS) || defined(SOLARIS)
  int saddr_len;
#elif defined(UNIXWARE7)
  size_t saddr_len;
#else
  socklen_t saddr_len;
#endif
  struct sockaddr_in clt_sock_addr;
  unsigned char *ip_addr;
  int err_code;

  saddr_len = sizeof (clt_sock_addr);
  if (getpeername (new_fd, (struct sockaddr *) &clt_sock_addr, &saddr_len) != 0)
    {
      return ER_FAILED;
    }

  ip_addr = (unsigned char *) &(clt_sock_addr.sin_addr);

  /* local connections (unix-domain socket or 127.0.0.1 loopback) are always allowed */
  if (clt_sock_addr.sin_family == AF_UNIX
      || (ip_addr[0] == 127 && ip_addr[1] == 0 && ip_addr[2] == 0 && ip_addr[3] == 1))
    {
      return NO_ERROR;
    }

  if (css_Server_accessible_ip_info == NULL)
    {
      /* ACL configured as "reject all remote": report the peer as inaccessible.
       * snprintf keeps the formatting bounded (sprintf replaced). */
      char ip_str[32];

      snprintf (ip_str, sizeof (ip_str), "%d.%d.%d.%d", ip_addr[0], ip_addr[1], ip_addr[2], ip_addr[3]);
      er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_INACCESSIBLE_IP, 1, ip_str);
      return ER_INACCESSIBLE_IP;
    }

  /* check the peer address against the ACL under the reader critical section */
  csect_enter_as_reader (NULL, CSECT_ACL, INF_WAIT);
  err_code = css_check_ip (css_Server_accessible_ip_info, ip_addr);
  csect_exit (NULL, CSECT_ACL);

  if (err_code != NO_ERROR)
    {
      char ip_str[32];

      snprintf (ip_str, sizeof (ip_str), "%d.%d.%d.%d", ip_addr[0], ip_addr[1], ip_addr[2], ip_addr[3]);
      er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_INACCESSIBLE_IP, 1, ip_str);
    }
  return err_code;
}
int
css_set_accessible_ip_info (void)
{
  int ret_val;
  IP_INFO *tmp_accessible_ip_info;

  /* no control file configured: ACL checking is effectively disabled */
  if (prm_get_string_value (PRM_ID_ACCESS_IP_CONTROL_FILE) == NULL)
    {
      css_Server_accessible_ip_info = NULL;
      return NO_ERROR;
    }

  /* absolute path? (on Windows, a "X:" drive-letter prefix) */
#if defined (WINDOWS)
  if (strlen (prm_get_string_value (PRM_ID_ACCESS_IP_CONTROL_FILE)) > 2
      && isalpha (prm_get_string_value (PRM_ID_ACCESS_IP_CONTROL_FILE)[0])
      && prm_get_string_value (PRM_ID_ACCESS_IP_CONTROL_FILE)[1] == ':')
#else
  if (prm_get_string_value (PRM_ID_ACCESS_IP_CONTROL_FILE)[0] == PATH_SEPARATOR)
#endif
    {
      ip_list_file_name = (char *) prm_get_string_value (PRM_ID_ACCESS_IP_CONTROL_FILE);
    }
  else
    {
      /* relative path: resolve against the configuration directory */
      ip_list_file_name =
	envvar_confdir_file (ip_file_real_path, PATH_MAX, prm_get_string_value (PRM_ID_ACCESS_IP_CONTROL_FILE));
    }

  ret_val = css_read_ip_info (&tmp_accessible_ip_info, ip_list_file_name);
  if (ret_val == NO_ERROR)
    {
      /* swap in the freshly read list under the ACL critical section */
      csect_enter (NULL, CSECT_ACL, INF_WAIT);
      if (css_Server_accessible_ip_info != NULL)
	{
	  css_free_accessible_ip_info ();
	}
      css_Server_accessible_ip_info = tmp_accessible_ip_info;
      csect_exit (NULL, CSECT_ACL);
    }
  return ret_val;
}
int
css_free_accessible_ip_info (void)
{
  int ret_val;

  /* release the current ACL and reset the pointer so stale data is never reused;
   * caller is expected to hold CSECT_ACL (see css_set_accessible_ip_info) */
  ret_val = css_free_ip_info (css_Server_accessible_ip_info);
  css_Server_accessible_ip_info = NULL;
  return ret_val;
}
void
xacl_dump (THREAD_ENTRY * thread_p, FILE * outfp)
{
  int i, j;

  /* default to stdout when no stream is supplied */
  if (outfp == NULL)
    {
      outfp = stdout;
    }
  fprintf (outfp, "access_ip_control=%s\n", (prm_get_bool_value (PRM_ID_ACCESS_IP_CONTROL) ? "yes" : "no"));
  fprintf (outfp, "access_ip_control_file=%s\n", (ip_list_file_name != NULL) ? ip_list_file_name : "NULL");
  if (prm_get_bool_value (PRM_ID_ACCESS_IP_CONTROL) == false || css_Server_accessible_ip_info == NULL)
    {
      return;
    }

  csect_enter_as_reader (thread_p, CSECT_ACL, INF_WAIT);
  for (i = 0; i < css_Server_accessible_ip_info->num_list; i++)
    {
      int address_index = i * IP_BYTE_COUNT;

      /* each entry's first byte appears to hold the number of significant
       * octets, followed by the octets themselves — TODO confirm against
       * css_read_ip_info() */
      for (j = 0; j < css_Server_accessible_ip_info->address_list[address_index]; j++)
	{
	  fprintf (outfp, "%d%s", css_Server_accessible_ip_info->address_list[address_index + j + 1],
		   ((j != 3) ? "." : ""));
	}
      /* fewer than 4 octets means a prefix entry; print a wildcard for the rest */
      if (j != 4)
	{
	  fprintf (outfp, "*");
	}
      fprintf (outfp, "\n");
    }
  fprintf (outfp, "\n");
  csect_exit (thread_p, CSECT_ACL);
  return;
}
int
xacl_reload (THREAD_ENTRY * thread_p)
{
  /* re-read the ACL file; thread_p is unused but kept for the xacl_* signature */
  return css_set_accessible_ip_info ();
}
#endif
/*
* css_get_client_id() - returns the unique client identifier
* return: returns the unique client identifier, on error, returns -1
*
* Note: WARN: this function doesn't lock on thread_entry
*/
int
css_get_client_id (THREAD_ENTRY * thread_p)
{
  /* fall back to the calling thread's own entry when none is given */
  if (thread_p == NULL)
    {
      thread_p = thread_get_thread_entry_info ();
    }
  assert (thread_p != NULL);

  /* -1 when the thread is not attached to a client connection */
  CSS_CONN_ENTRY *conn_entry = thread_p->conn_entry;
  return (conn_entry != NULL) ? conn_entry->client_id : -1;
}
/*
* css_set_thread_info () -
* return:
* thread_p(out):
* client_id(in):
* rid(in):
* tran_index(in):
*/
void
css_set_thread_info (THREAD_ENTRY * thread_p, int client_id, int rid, int tran_index, int net_request_index)
{
  /* bind request/transaction identity to the worker thread */
  thread_p->client_id = client_id;
  thread_p->rid = rid;
  thread_p->tran_index = tran_index;
  thread_p->net_request_index = net_request_index;
  /* reset per-request state left over from the previous request */
  thread_p->victim_request_fail = false;
  thread_p->next_wait_thrd = NULL;
  thread_p->wait_for_latch_promote = false;
  thread_p->lockwait = NULL;
  thread_p->lockwait_state = -1;
  thread_p->query_entry = NULL;
  thread_p->tran_next_wait = NULL;
  thread_p->end_resource_tracks ();
  thread_clear_recursion_depth (thread_p);
}
/*
* css_get_comm_request_id() - returns the request id that started the current thread
* return: returns the comm system request id for the client request that
* started the thread. On error, returns -1
*
* Note: WARN: this function doesn't lock on thread_entry
*/
unsigned int
css_get_comm_request_id (THREAD_ENTRY * thread_p)
{
  /* fall back to the calling thread's own entry when none is given */
  if (thread_p == NULL)
    {
      thread_p = thread_get_thread_entry_info ();
    }
  assert (thread_p != NULL);

  return thread_p->rid;
}
/*
* css_get_current_conn_entry() -
* return:
*/
CSS_CONN_ENTRY *
css_get_current_conn_entry (void)
{
  THREAD_ENTRY *thread_p;

  /* connection entry bound to the calling thread; may be NULL */
  thread_p = thread_get_thread_entry_info ();
  assert (thread_p != NULL);

  return thread_p->conn_entry;
}
// *INDENT-OFF*
/*
* css_push_server_task () - push a task on server request worker pool
*
* return : void
* thread_ref (in) : thread context
* task (in) : task to execute
*
* TODO: this is also used externally due to legacy design; should be internalized completely
*/
void
css_push_server_task (CSS_CONN_ENTRY &conn_ref)
{
  // push the task
  //
  // note: cores are partitioned by connection index. this is particularly important in order to avoid having tasks
  //       randomly pushed to cores that are full. some of those tasks may belong to threads holding locks. as a
  //       consequence, lock waiters may wait longer or even indefinitely if we are really unlucky.
  //
  /* bump the connection's pending/working counters before queueing so shutdown
   * logic can account for this task (see css_server_task::execute) */
  conn_ref.add_pending_request ();
  conn_ref.add_working_task ();
  thread_get_manager ()->push_task_on_core (css_Server_request_worker_pool, new css_server_task (conn_ref),
					    static_cast<size_t> (conn_ref.idx), conn_ref.in_method);
}
void
css_push_external_task (CSS_CONN_ENTRY *conn, cubthread::entry_task *task)
{
  /* wrap the caller-provided task so the worker thread gets the connection
   * (and its session) attached before the task runs */
  thread_get_manager ()->push_task (css_Server_request_worker_pool, new css_server_external_task (conn, task));
}
void
css_server_task::execute (context_type &thread_ref)
{
  session_state *session_p;

  /* bind the worker thread to this task's connection */
  thread_ref.conn_entry = &m_conn;
  session_p = thread_ref.conn_entry->session_p;
  m_conn.start_request ();
  if (session_p != NULL)
    {
      /* adopt the session's private page-buffer LRU */
      thread_ref.private_lru_index = session_get_private_lru_idx (session_p);
      pgbuf_thread_variables_init (&thread_ref);
    }
  else
    {
      assert (thread_ref.private_lru_index == -1);
    }
  thread_ref.m_status = cubthread::entry::status::TS_RUN;

  // TODO: we lock tran_index_lock because css_internal_request_handler expects it to be locked. however, I am not
  //       convinced we really need this
  pthread_mutex_lock (&thread_ref.tran_index_lock);
  (void) css_internal_request_handler (thread_ref, m_conn);

  /* detach from the connection before releasing the worker */
  thread_ref.conn_entry = NULL;
  thread_ref.m_status = cubthread::entry::status::TS_FREE;

  /* the last working task on a closing connection triggers the actual shutdown */
  if (m_conn.end_working_task () == 0 && m_conn.status == CONN_CLOSING)
    {
      css_request_shutdown_conn (&m_conn, static_cast <uint8_t> (cubconn::connection::ignore_level::DONT_IGNORE), false, 0 /* no wait */);
    }
  else
    {
      css_wakeup_handler (&m_conn);
    }
}
void
css_server_external_task::execute (context_type &thread_ref)
{
  /* bind the worker thread to the (possibly NULL) connection of this task */
  thread_ref.conn_entry = m_conn;
  session_state *session_p = thread_ref.conn_entry != NULL ? thread_ref.conn_entry->session_p : NULL;
  if (session_p != NULL)
    {
      /* adopt the session's private page-buffer LRU */
      thread_ref.private_lru_index = session_get_private_lru_idx (session_p);
    }
  else
    {
      assert (thread_ref.private_lru_index == -1);
    }
  thread_ref.m_status = cubthread::entry::status::TS_RUN;

  // TODO: We lock tran_index_lock because external task expects it to be locked.
  //       However, I am not convinced we really need this
  pthread_mutex_lock (&thread_ref.tran_index_lock);
  m_task->execute (thread_ref);

  /* detach from the connection before releasing the worker */
  thread_ref.conn_entry = NULL;
  thread_ref.m_status = cubthread::entry::status::TS_FREE;
}
//
// css_stop_non_log_writer () - function mapped over worker pools to search and stop non-log writer workers
//
// thread_ref (in) : entry of thread to check and stop
// stop_mapper (out) : ignored; part of expected signature of mapper function
// stopper_thread_ref (in) : entry of thread mapping this function over worker pool
//
static void
css_stop_non_log_writer (THREAD_ENTRY & thread_ref, bool & stop_mapper, THREAD_ENTRY & stopper_thread_ref)
{
  (void) stop_mapper;	// suppress unused warning

  // porting of legacy code
  if (css_is_log_writer (thread_ref))
    {
      // log writer: not this pass's target (see css_stop_log_writer)
      return;
    }
  int tran_index = thread_ref.tran_index;
  if (tran_index == NULL_TRAN_INDEX)
    {
      // no transaction, no stop
      return;
    }
  /* flag the transaction as interrupted */
  (void) logtb_set_tran_index_interrupt (&stopper_thread_ref, tran_index, true);
  if (thread_ref.m_status == cubthread::entry::status::TS_WAIT && logtb_is_current_active (&thread_ref))
    {
      /* wake a suspended thread so it can observe the interrupt; re-check the
       * conditions under the entry lock since they may change concurrently */
      thread_lock_entry (&thread_ref);
      if (thread_ref.tran_index != NULL_TRAN_INDEX && thread_ref.m_status == cubthread::entry::status::TS_WAIT
	  && thread_ref.lockwait == NULL && thread_ref.check_interrupt)
	{
	  thread_ref.interrupted = true;
	  thread_wakeup_already_had_mutex (&thread_ref, THREAD_RESUME_DUE_TO_INTERRUPT);
	}
      thread_unlock_entry (&thread_ref);
    }
  // make sure not blocked in locks
  lock_force_thread_timeout_lock (&thread_ref);
}
//
// css_stop_log_writer () - function mapped over worker pools to search and stop log writer workers
//
// thread_ref (in) : entry of thread to check and stop
// stop_mapper (out) : ignored; part of expected signature of mapper function
// stopper_thread_ref (in) : entry of thread mapping this function over worker pool
//
static void
css_stop_log_writer (THREAD_ENTRY & thread_ref, bool & stop_mapper)
{
  (void) stop_mapper;	// suppress unused warning

  if (!css_is_log_writer (thread_ref))
    {
      // this is not log writer
      return;
    }
  if (thread_ref.tran_index == -1)
    {
      // no transaction, no stop
      return;
    }
  if (thread_ref.m_status == cubthread::entry::status::TS_WAIT && logtb_is_current_active (&thread_ref))
    {
      /* wake a log writer suspended on THREAD_LOGWR_SUSPENDED and mark it
       * interrupted so it unwinds */
      thread_check_suspend_reason_and_wakeup (&thread_ref, THREAD_RESUME_DUE_TO_INTERRUPT, THREAD_LOGWR_SUSPENDED);
      thread_ref.interrupted = true;
    }
  // make sure not blocked in locks
  lock_force_thread_timeout_lock (&thread_ref);
}
//
// css_find_not_stopped () - find any target thread that is not stopped
//
// thread_ref (in) : entry of thread that should be stopped
// stop_mapper (out) : output true to stop mapping
// is_log_writer (in) : true to target log writers, false to target non-log writers
// found (out) : output true if target thread is not stopped
//
//
// css_find_not_stopped () - mapper that reports whether a target thread has not stopped yet
//
// thread_ref (in)    : entry of thread that should be stopped
// stop_mapper (out)  : set true to end the mapping early
// is_log_writer (in) : true to target log writers, false to target non-log writers
// found (out)        : set true if a target thread is still running
//
static void
css_find_not_stopped (THREAD_ENTRY & thread_ref, bool & stop_mapper, bool is_log_writer, bool & found)
{
  // only threads that still hold a connection entry and match the requested category matter here
  if (thread_ref.conn_entry == NULL || is_log_writer != css_is_log_writer (thread_ref))
    {
      return;
    }

  if (thread_ref.m_status != cubthread::entry::status::TS_FREE)
    {
      // this thread is not stopped yet; report it and end the mapping early
      found = true;
      stop_mapper = true;
    }
}
//
// css_is_log_writer () - does thread entry belong to a log writer?
//
// return : true for log writer, false otherwise
// thread_arg (in) : thread entry
//
static bool
css_is_log_writer (const THREAD_ENTRY &thread_arg)
{
// note - access to thread entry is not exclusive and racing may occur
volatile const css_conn_entry * connp = thread_arg.conn_entry;
return connp != NULL && connp->stop_phase == THREAD_STOP_LOGWR;
}
//
// css_stop_all_workers () - stop target workers based on phase (log writers or non-log writers)
//
// thread_ref (in) : thread local entry
// stop_phase (in) : THREAD_STOP_WORKERS_EXCEPT_LOGWR or THREAD_STOP_LOGWR
//
static void
css_stop_all_workers (THREAD_ENTRY &thread_ref, css_thread_stop_type stop_phase)
{
  bool is_not_stopped;

  if (css_Server_request_worker_pool == NULL)
    {
      // nothing to stop
      return;
    }

  // note: this is legacy code ported from thread.c; the whole log writer management seems complicated, but hopefully
  // it can be removed after HA refactoring.
  //
  // question: is it possible to have more than one log writer thread?
  //

  if (stop_phase == THREAD_STOP_WORKERS_EXCEPT_LOGWR)
    {
      // first block all connections; log writer connections must stay open until the log writers are stopped
      css_block_all_active_conn (stop_phase);
    }

  // loop until all target workers are stopped
  while (true)
    {
      // tell all to stop
      if (stop_phase == THREAD_STOP_LOGWR)
	{
	  css_Server_request_worker_pool->map_running_contexts (css_stop_log_writer);
	}
      else
	{
	  // non-log-writer stop needs the stopper's entry to flag transaction interrupts
	  css_Server_request_worker_pool->map_running_contexts (css_stop_non_log_writer, thread_ref);
	}

      // sleep for 50 milliseconds before re-checking, to give workers a chance to finish
      std::this_thread::sleep_for (std::chrono::milliseconds (50));

      // check if any target thread is not stopped yet
      is_not_stopped = false;
      css_Server_request_worker_pool->map_running_contexts (css_find_not_stopped, stop_phase == THREAD_STOP_LOGWR,
							    is_not_stopped);

      if (!is_not_stopped)
	{
	  // all threads are stopped, break loop
	  break;
	}

      if (css_is_shutdown_timeout_expired ())
	{
	  // shutdown took too long; give up and terminate the process immediately
	  er_log_debug (ARG_FILE_LINE, "could not stop all active workers");
	  _exit (0);
	}
    }

  // we must not block active connections before terminating log writer threads
  if (stop_phase == THREAD_STOP_LOGWR)
    {
      css_block_all_active_conn (stop_phase);
    }
}
//
// css_get_thread_stats () - get statistics for server request handlers
//
// stats_out (out) : output statistics
//
void
css_get_thread_stats (UINT64 *stats_out)
{
  // collect executor statistics from the server request worker pool into stats_out.
  // NOTE(review): dereferences css_Server_request_worker_pool without the NULL guard used by
  // other helpers in this file — confirm callers only run while the pool exists.
  css_Server_request_worker_pool->get_stats (stats_out);
}
//
// css_get_task_stats () - get task statistics for server request handlers
//
// stats_out (out) : output statistics
//
void
css_get_task_stats (UINT64 *stats_out)
{
  // task statistics collection is disabled; this function is currently a no-op stub.
  // NOTE(review): *stats_out is left unmodified — confirm callers pre-initialize the array.
  // css_Server_request_worker_pool->get_task_stats (stats_out);
}
//
// css_get_num_request_workers () - get number of workers executing server requests
//
size_t
css_get_num_request_workers (void)
{
  // return the number of workers in the server request worker pool.
  // NOTE(review): assumes the pool is initialized (no NULL guard) — confirm call sites.
  return css_Server_request_worker_pool->get_worker_count ();
}
//
// css_wp_worker_get_busy_count_mapper () - function to map through worker pool entries and count busy workers
//
// thread_ref (in) : thread entry (context)
// stop_mapper (in/out) : normally used to stop mapping early, ignored here
// busy_count (out) : increment when busy worker is found
//
//
// css_wp_worker_get_busy_count_mapper () - mapper over worker pool entries that counts busy workers
//
// thread_ref (in)      : thread entry (context)
// stop_mapper (in/out) : normally used to stop mapping early; ignored here
// busy_count (out)     : incremented for every busy worker found
//
static void
css_wp_worker_get_busy_count_mapper (THREAD_ENTRY & thread_ref, bool & stop_mapper, int & busy_count)
{
  (void) stop_mapper; // unused; required by the mapper signature

  // a worker with a transaction index assigned is executing a task; otherwise it is waiting for one
  if (thread_ref.tran_index != NULL_TRAN_INDEX)
    {
      ++busy_count;
    }
}
//
// css_wp_core_job_scan_mapper () - function to map worker pool cores and get info required for "job scan"
//
// wp_core (in) : worker pool core
// stop_mapper (in/out) : output true to stop mapper early
// thread_p (in) : thread entry of job scan
// ctx (in) : job scan context
// core_index (in/out) : current core index; is incremented on each call
// error_code (out) : output error_code if any errors occur
//
//
// css_wp_core_job_scan_mapper () - mapper over worker pool cores collecting info for "job scan"
//
// wp_core (in)         : worker pool core
// stop_mapper (in/out) : set true to stop the mapper early
// thread_p (in)        : thread entry of job scan
// ctx (in)             : job scan context
// core_index (in/out)  : current core index; incremented on each successful call
// error_code (out)     : set on allocation failure
//
static void
css_wp_core_job_scan_mapper (const cubthread::stats_worker_pool_type::core_impl & wp_core, bool & stop_mapper,
			     THREAD_ENTRY * thread_p, SHOWSTMT_ARRAY_CONTEXT * ctx, size_t & core_index,
			     int & error_code)
{
  // allocate the output tuple for this core
  DB_VALUE *tuple = showstmt_alloc_tuple_in_context (thread_p, ctx);
  if (tuple == NULL)
    {
      assert (false);
      error_code = ER_FAILED;
      stop_mapper = true;
      return;
    }

  // the core does not track its busy worker count; count them by mapping its running contexts
  int busy_workers = 0;
  wp_core.map_running_contexts (stop_mapper, css_wp_worker_get_busy_count_mapper, busy_workers);

  size_t col = 0;
  (void) db_make_int (&tuple[col++], (int) core_index);	// core index (formerly job queue index)
  (void) db_make_int (&tuple[col++], (int) wp_core.get_worker_count ());	// max workers (formerly per queue)
  (void) db_make_int (&tuple[col++], (int) busy_workers);	// busy workers
  (void) db_make_int (&tuple[col++], 0);	// connection workers; none here, kept for backward compatibility

  ++core_index;
  assert (col == CSS_JOB_QUEUE_SCAN_COLUMN_COUNT);
}
//
// css_is_any_thread_not_suspended_mapfunc
//
// thread_ref (in) : current thread entry
// stop_mapper (out) : output true to stop mapper
// count (out) : count number of threads
// found (out) : output true when not suspended thread is found
//
//
// css_is_any_thread_not_suspended_mapfunc () - mapper that looks for a thread that is not suspended
//
// thread_ref (in)   : current thread entry
// stop_mapper (out) : set true to stop the mapper
// count (out)       : number of suspended threads seen
// found (out)       : set true when a not-suspended thread is found
//
static void
css_is_any_thread_not_suspended_mapfunc (THREAD_ENTRY & thread_ref, bool & stop_mapper, size_t & count, bool & found)
{
  if (thread_ref.m_status == cubthread::entry::status::TS_WAIT)
    {
      // suspended; keep counting
      ++count;
      return;
    }

  // found a thread that is not suspended; no need to map any further
  found = true;
  stop_mapper = true;
}
//
// css_are_all_request_handlers_suspended - are all request handlers suspended?
//
bool
css_are_all_request_handlers_suspended (void)
{
// assume all are suspended
bool is_any_not_suspended = false;
size_t checked_threads_count = 0;
css_Server_request_worker_pool->map_running_contexts (css_is_any_thread_not_suspended_mapfunc, checked_threads_count,
is_any_not_suspended);
if (is_any_not_suspended)
{
// found a thread that was not suspended
return false;
}
if (checked_threads_count == css_Server_request_worker_pool->get_worker_count ())
{
// all threads are suspended
return true;
}
else
{
// at least one thread is free
return false;
}
}
//
// css_count_transaction_worker_threads_mapfunc () - mapper function for worker pool thread entries. tries to identify
// entries belonging to given transaction/client and increment
// counter
//
// thread_ref (in) : thread entry belonging to running worker
// stop_mapper (out) : ignored
// caller_thread (in) : thread entry of caller
// tran_index (in) : transaction index
// client_id (in) : client id
// count (out) : increment counter if thread entry belongs to transaction/client
//
static void
css_count_transaction_worker_threads_mapfunc (THREAD_ENTRY & thread_ref, bool & stop_mapper,
					      THREAD_ENTRY * caller_thread, int tran_index, int client_id,
					      size_t & count)
{
  (void) stop_mapper; // suppress unused parameter warning

  CSS_CONN_ENTRY *conn_p;
  bool does_belong = false;

  if (caller_thread == &thread_ref || thread_ref.type != TT_WORKER)
    {
      // skip the caller's own entry and any non-worker thread
      return;
    }

  // hold tran_index_lock while inspecting the entry's transaction/connection state
  (void) pthread_mutex_lock (&thread_ref.tran_index_lock);
  if (!thread_ref.is_on_current_thread ()
      && thread_ref.m_status != cubthread::entry::status::TS_DEAD
      && thread_ref.m_status != cubthread::entry::status::TS_FREE
      && thread_ref.m_status != cubthread::entry::status::TS_CHECK)
    {
      conn_p = thread_ref.conn_entry;
      if (tran_index == NULL_TRAN_INDEX)
	{
	  // no transaction given: an exact client ID match on a live connection is required
	  does_belong = (conn_p != NULL && conn_p->client_id == client_id);
	}
      else if (tran_index == thread_ref.tran_index)
	{
	  // transaction matches: accept a matching client ID or a missing connection entry
	  does_belong = (conn_p == NULL || conn_p->client_id == client_id);
	}
    }
  pthread_mutex_unlock (&thread_ref.tran_index_lock);

  if (does_belong)
    {
      count++;
    }
}
//
// css_count_transaction_worker_threads () - count thread entries belonging to transaction/client (exclude current
// thread)
//
// return : thread entries count
// thread_p (in) : thread entry of caller
// tran_index (in) : transaction index
// client_id (in) : client id
//
//
// css_count_transaction_worker_threads () - count worker thread entries belonging to the given
//                                           transaction/client, excluding the caller's own thread
//
// return         : thread entries count
// thread_p (in)  : thread entry of caller
// tran_index (in): transaction index
// client_id (in) : client id
//
size_t
css_count_transaction_worker_threads (THREAD_ENTRY * thread_p, int tran_index, int client_id)
{
  size_t count = 0;

  if (css_Server_request_worker_pool != NULL)
    {
      css_Server_request_worker_pool->map_running_contexts (css_count_transaction_worker_threads_mapfunc, thread_p,
							    tran_index, client_id, count);
    }

  return count;
}
// css_get_max_workers () - maximum number of server request workers;
//                          one more than the maximum connection count
size_t css_get_max_workers ()
{
  return css_get_max_conn () + 1; // = css_Num_max_conn in connection_sr.c
}
// css_get_max_task_count () - maximum number of queued tasks; twice the worker count,
//                             chosen loosely (see comment below)
size_t css_get_max_task_count ()
{
  return 2 * css_get_max_workers (); // not that it matters...
}
// css_get_max_connections () - maximum number of connections the server accepts
size_t css_get_max_connections ()
{
  // same value as css_get_max_workers (css_get_max_conn () + 1 = css_Num_max_conn in
  // connection_sr.c); delegate so the formula is maintained in one place
  return css_get_max_workers ();
}
// css_get_server_request_thread_pooling_configuration () - read the system parameter that decides
//                                                          whether request worker threads are pooled
static bool
css_get_server_request_thread_pooling_configuration (void)
{
  return prm_get_bool_value (PRM_ID_THREAD_WORKER_POOLING);
}
// css_get_server_request_thread_timeout_configuration () - read the configured idle timeout for
//                                                          request worker threads, in seconds
static cubthread::wait_seconds
css_get_server_request_thread_timeout_configuration (void)
{
  // todo: need infinite timeout
  return cubthread::wait_seconds (std::chrono::seconds (prm_get_integer_value (PRM_ID_THREAD_WORKER_TIMEOUT_SECONDS)));
}
// css_start_all_threads () - warm up the server request worker pool (when pooling is configured)
//                            and log how long the warm-up took
static void
css_start_all_threads (void)
{
  if (css_Server_request_worker_pool == NULL)
    {
      // worker pool was not created yet; nothing to start
      return;
    }

  using clock_type = std::chrono::system_clock;
  const clock_type::time_point t_begin = clock_type::now ();

  // spawn workers up-front only if pooling is enabled by configuration
  const bool pooling_enabled = css_get_server_request_thread_pooling_configuration ();
  if (pooling_enabled)
    {
      css_Server_request_worker_pool->warmup ();
    }

  const clock_type::time_point t_end = clock_type::now ();
  er_log_debug (ARG_FILE_LINE,
		"css_start_all_threads: \n"
		"\tstarting transaction workers: %s\n"
		"\telapsed time: %lld microseconds",
		pooling_enabled ? "true" : "false",
		std::chrono::duration_cast<std::chrono::microseconds> (t_end - t_begin).count ());
}
// *INDENT-ON*