44 #include <sys/filio.h> 46 #include <sys/socket.h> 48 #include <netinet/in.h> 85 #define CSS_WAIT_COUNT 5 86 #define CSS_GOING_DOWN_IMMEDIATELY "Server going down immediately" 89 #define SockError SOCKET_ERROR 94 #define RMUTEX_NAME_TEMP_CONN_ENTRY "TEMP_CONN_ENTRY" 115 #define HA_LOG_APPLIER_STATE_TABLE_MAX 5 197 void execute (
context_type & thread_ref)
override final;
229 static int css_process_new_connection_request (
void);
262 #if defined (SERVER_MODE) 296 size_t core_index = 0;
308 #endif // SERVER_MODE 317 #if !defined(WINDOWS) 321 #if defined(SA_MODE) && (defined(LINUX) || defined(x86_SOLARIS) || defined(HPUX)) 393 int r, run_code = 1, status = 0, nfds;
394 struct pollfd po[] = { {0, 0, 0}, {0, 0, 0} };
413 po[0].events = POLLIN;
420 po[1].events = POLLIN;
465 "Disconnected with the cub_master and will shut itself down",
"");
468 #if !defined(WINDOWS) 477 css_process_new_connection_request ();
514 r =
css_readn (master_fd, (
char *) &request,
sizeof (
int), -1);
515 if (r ==
sizeof (
int))
517 return ((
int)
ntohl (request));
562 #if !defined(WINDOWS) 603 char reply_buf[
sizeof (int)];
608 memcpy (reply_buf, (
char *) &t,
sizeof (
int));
610 css_send_data (conn, rid, reply_buf, (
int)
sizeof (reply_buf));
631 #if defined (WINDOWS) 724 r = send (master_fd, (
char *) &response,
sizeof (
int), 0);
740 #if !defined(WINDOWS) 775 #if !defined(WINDOWS) 810 css_Master_conn =
NULL;
820 struct timeval timeout;
846 css_process_new_connection_request (
void)
849 int reason, buffer_size,
rc;
895 buffer_size = reason = rid = 0;
940 char *packed_server_name;
950 if (packed_server_name !=
NULL)
960 css_Master_conn = conn;
984 int n, type,
rv, status;
985 volatile int conn_status;
986 int css_peer_alive_timeout, poll_timeout;
987 int max_num_loop, num_loop;
989 struct pollfd po[1] = { {0, 0, 0} };
991 if (thread_p ==
NULL)
1002 css_peer_alive_timeout = 5000;
1004 max_num_loop = css_peer_alive_timeout / poll_timeout;
1009 while (thread_p->shutdown ==
false && conn->stop_talk ==
false)
1012 conn_status = conn->
status;
1023 conn_status = conn->
status;
1037 po[0].events = POLLIN;
1039 n = poll (po, 1, poll_timeout);
1042 if (num_loop < max_num_loop)
1049 #if !defined (WINDOWS) 1090 if (po[0].revents & POLLERR || po[0].revents & POLLHUP)
1119 "css_connection_handler_thread: status %d conn { status %d transaction_id %d " 1120 "db_error %d stop_talk %d stop_phase %d }\n", status, conn->
status, conn->get_tran_index (),
1121 conn->
db_error, conn->stop_talk, conn->stop_phase);
1123 (*css_Connection_error_handler) (thread_p, conn);
1127 assert (thread_p->shutdown ==
true || conn->stop_talk ==
true);
1152 if (conn->stop_phase != stop_phase)
1161 conn->stop_talk =
true;
1208 int request,
rc, size = 0;
1209 char *buffer =
NULL;
1210 int local_tran_index;
1213 assert (thread_ref.conn_entry == &conn_ref);
1215 local_tran_index = thread_ref.tran_index;
1221 thread_ref.tran_index = conn_ref.get_tran_index ();
1271 int size,
char *buffer),
1309 if (server_name ==
NULL || port_id <= 0)
1314 #if defined(WINDOWS) 1317 fprintf (stderr,
"Winsock startup error\n");
1324 const std::size_t MAX_TASK_COUNT = 2 * MAX_WORKERS;
1328 css_Server_request_worker_pool =
1335 if (css_Server_request_worker_pool ==
NULL)
1344 css_Connection_worker_pool =
1350 if (css_Connection_worker_pool ==
NULL)
1369 css_Master_conn = conn;
1371 #if !defined(WINDOWS) 1377 fprintf (stderr,
"failed to heartbeat register.\n");
1408 #if !defined(NDEBUG) 1418 perfmon_er_log_current_stats (thread_p);
1440 #if defined(WINDOWS) 1488 if (buffer_size > 0 && buffer !=
NULL)
1490 rc = css_send_two_data (conn,
CSS_RID_FROM_EID (eid), reply, reply_size, buffer, buffer_size);
1516 css_send_reply_and_large_data_to_client (
unsigned int eid,
char *reply,
int reply_size,
char *buffer, INT64 buffer_size)
1523 int *buffers_size,
i;
1527 if (buffer_size > 0 && buffer !=
NULL)
1529 num_buffers = (int) (buffer_size / INT_MAX) + 2;
1531 buffers = (
char **) malloc (
sizeof (
char *) * num_buffers);
1532 if (buffers ==
NULL)
1538 buffers_size = (
int *) malloc (
sizeof (
int) * num_buffers);
1539 if (buffers_size ==
NULL)
1547 buffers_size[0] = reply_size;
1549 for (i = 1; i < num_buffers; i++)
1551 buffers[
i] = &buffer[pos];
1552 if (buffer_size > INT_MAX)
1554 buffers_size[
i] = INT_MAX;
1558 buffers_size[
i] = buffer_size;
1560 pos += buffers_size[
i];
1563 rc = css_send_large_data (conn,
CSS_RID_FROM_EID (eid), (
const char **) buffers, buffers_size, num_buffers);
1566 free (buffers_size);
1594 char *buffer1,
int buffer1_size,
char *buffer2,
int buffer2_size)
1600 if (buffer2 ==
NULL || buffer2_size <= 0)
1605 css_send_three_data (conn,
CSS_RID_FROM_EID (eid), reply, reply_size, buffer1, buffer1_size, buffer2, buffer2_size);
1629 char *buffer1,
int buffer1_size,
char *buffer2,
int buffer2_size,
char *buffer3,
1636 if (buffer3 ==
NULL || buffer3_size <= 0)
1642 rc = css_send_four_data (conn,
CSS_RID_FROM_EID (eid), reply, reply_size, buffer1, buffer1_size, buffer2,
1643 buffer2_size, buffer3, buffer3_size);
1792 char *packed_name =
NULL;
1793 const char *env_name =
NULL;
1794 char pid_string[16], *s;
1797 if (server_name !=
NULL)
1800 if (env_name ==
NULL)
1815 sprintf (pid_string,
"%d", getpid ());
1818 strlen (pid_string) + 1);
1826 packed_name = (
char *) malloc (*name_length);
1827 if (packed_name ==
NULL)
1880 char *ver_str =
NULL;
1885 conn = thread_p->conn_entry;
1888 if (conn->version_string ==
NULL)
1890 ver_str = (
char *) malloc (
strlen (version_string) + 1);
1891 if (ver_str !=
NULL)
1893 strcpy (ver_str, version_string);
1894 conn->version_string = ver_str;
1899 (
size_t) (
strlen (version_string) + 1));
1905 ver_str = conn->version_string;
1912 #if defined (ENABLE_UNUSED_FUNCTION) 1919 css_get_client_version_string (
void)
1926 return entry->version_string;
2018 struct ha_server_state_transition_table
2024 static struct ha_server_state_transition_table ha_Server_state_transition[] = {
2055 struct ha_server_state_transition_table *table;
2065 for (table = ha_Server_state_transition; table->cur_state !=
HA_SERVER_STATE_NA; table++)
2067 if (table->cur_state ==
ha_Server_state && table->req_state == req_state)
2071 new_state = table->next_state;
2105 #define FROM_OTHERS 0 2106 #define FROM_REGISTER_CLIENT 1 2107 #define FROM_UNREGISTER_CLIENT 2 2127 "Connection rejected. " "The server is changing to standby mode.");
2139 "logtb_count_clients () = 1 including me " 2140 "transit state from 'to-be-standby' to 'standby'\n");
2176 if (i == ha_Server_num_of_hosts
2201 if (i == ha_Server_num_of_hosts
2226 (heartbeat ?
't' :
'f'));
2344 for (i = 0; i < timeout; i++)
2413 if (table->
state == state)
2430 table = &ha_Log_applier_state[ha_Log_applier_state_num++];
2463 #if defined(SERVER_MODE) 2467 #if defined(WINDOWS) || defined(SOLARIS) 2469 #elif defined(UNIXWARE7) 2472 socklen_t saddr_len;
2474 struct sockaddr_in clt_sock_addr;
2475 unsigned char *ip_addr;
2478 saddr_len =
sizeof (clt_sock_addr);
2480 if (getpeername (new_fd, (
struct sockaddr *) &clt_sock_addr, &saddr_len) != 0)
2485 ip_addr = (
unsigned char *) &(clt_sock_addr.sin_addr);
2487 if (clt_sock_addr.sin_family == AF_UNIX
2488 || (ip_addr[0] == 127 && ip_addr[1] == 0 && ip_addr[2] == 0 && ip_addr[3] == 1))
2493 if (css_Server_accessible_ip_info ==
NULL)
2497 sprintf (ip_str,
"%d.%d.%d.%d", (
unsigned char) ip_addr[0], ip_addr[1], ip_addr[2], ip_addr[3]);
2505 err_code =
css_check_ip (css_Server_accessible_ip_info, ip_addr);
2512 sprintf (ip_str,
"%d.%d.%d.%d", (
unsigned char) ip_addr[0], ip_addr[1], ip_addr[2], ip_addr[3]);
2524 IP_INFO *tmp_accessible_ip_info;
2528 css_Server_accessible_ip_info =
NULL;
2532 #if defined (WINDOWS) 2553 if (css_Server_accessible_ip_info !=
NULL)
2557 css_Server_accessible_ip_info = tmp_accessible_ip_info;
2571 css_Server_accessible_ip_info =
NULL;
2596 for (i = 0; i < css_Server_accessible_ip_info->
num_list; i++)
2600 for (j = 0; j < css_Server_accessible_ip_info->
address_list[address_index]; j++)
2602 fprintf (outfp,
"%d%s", css_Server_accessible_ip_info->
address_list[address_index + j + 1],
2603 ((j != 3) ?
"." :
""));
2607 fprintf (outfp,
"*");
2609 fprintf (outfp,
"\n");
2612 fprintf (outfp,
"\n");
2636 if (thread_p ==
NULL)
2643 conn_p = thread_p->conn_entry;
2666 thread_p->rid = rid;
2667 thread_p->tran_index = tran_index;
2668 thread_p->net_request_index = net_request_index;
2669 thread_p->victim_request_fail =
false;
2670 thread_p->next_wait_thrd =
NULL;
2671 thread_p->wait_for_latch_promote =
false;
2672 thread_p->lockwait =
NULL;
2673 thread_p->lockwait_state = -1;
2674 thread_p->query_entry =
NULL;
2675 thread_p->tran_next_wait =
NULL;
2677 thread_p->end_resource_tracks ();
2691 if (thread_p ==
NULL)
2698 return thread_p->rid;
2713 return thread_p->conn_entry;
2735 conn_ref.add_pending_request ();
2737 static_cast<size_t> (conn_ref.idx));
2749 m_conn.start_request ();
2751 thread_ref.conn_entry = &m_conn;
2752 session_state *session_p = thread_ref.conn_entry->session_p;
2754 if (session_p !=
NULL)
2760 assert (thread_ref.private_lru_index == -1);
2770 thread_ref.conn_entry =
NULL;
2777 thread_ref.conn_entry = m_conn;
2780 if (session_p !=
NULL)
2786 assert (thread_ref.private_lru_index == -1);
2793 m_task->execute (thread_ref);
2795 thread_ref.conn_entry =
NULL;
2801 thread_ref.conn_entry = &m_conn;
2808 thread_ref.conn_entry =
NULL;
2830 int tran_index = thread_ref.tran_index;
2844 && thread_ref.lockwait ==
NULL && thread_ref.check_interrupt)
2846 thread_ref.interrupted =
true;
2872 if (thread_ref.tran_index == -1)
2880 thread_ref.interrupted =
true;
2898 if (thread_ref.conn_entry ==
NULL)
2939 bool is_not_stopped;
2941 if (css_Server_request_worker_pool ==
NULL)
2975 std::this_thread::sleep_for (std::chrono::milliseconds (50));
2978 is_not_stopped =
false;
2981 if (!is_not_stopped)
2987 if (!is_not_stopped)
3015 css_Server_request_worker_pool->
get_stats (stats_out);
3093 size_t val_index = 0;
3094 (void)
db_make_int (&vals[val_index++], (
int) core_index);
3102 (void)
db_make_int (&vals[val_index++], (
int) busy_count);
3141 bool is_any_not_suspended =
false;
3142 size_t checked_threads_count = 0;
3145 is_any_not_suspended);
3146 if (is_any_not_suspended)
3152 if (checked_threads_count == css_Server_request_worker_pool->
get_max_count ())
3184 bool does_belong =
false;
3186 if (caller_thread == &thread_ref || thread_ref.type !=
TT_WORKER)
3194 if (!thread_ref.is_on_current_thread ()
3199 conn_p = thread_ref.conn_entry;
3205 else if (tran_index == thread_ref.tran_index)
3235 tran_index, client_id, count);
3270 if (css_Connection_worker_pool ==
NULL || css_Server_request_worker_pool ==
NULL)
3277 using clock_type = std::chrono::system_clock;
3283 if (start_connections)
3294 "css_start_all_threads: \n" 3295 "\tstarting connection threads: %s\n" 3296 "\tstarting transaction workers: %s\n" 3297 "\telapsed time: %lld microseconds",
3298 start_connections ?
"true" :
"false",
3299 start_workers ?
"true" :
"false",
3300 std::chrono::duration_cast<std::chrono::microseconds> (end_time - start_time).
count ());
static void css_stop_log_writer(THREAD_ENTRY &thread_ref, bool &)
int css_free_accessible_ip_info(void)
cubthread::entry_task * m_task
const char * envvar_root(void)
size_t css_get_num_request_workers(void)
unsigned char address_list[ACL_MAX_IP_COUNT *IP_BYTE_COUNT]
unsigned int css_send_data_to_client(CSS_CONN_ENTRY *conn, unsigned int eid, char *buffer, int buffer_size)
#define FROM_UNREGISTER_CLIENT
static void css_wp_worker_get_busy_count_mapper(THREAD_ENTRY &thread_ref, bool &stop_mapper, int &busy_count)
cubthread::entry * thread_get_thread_entry_info(void)
THREAD_RET_T THREAD_CALLING_CONVENTION css_master_thread(void)
static bool css_check_ha_log_applier_done(void)
static int css_read_header(CSS_CONN_ENTRY *conn, NET_HEADER *local_header)
css_server_external_task(CSS_CONN_ENTRY *conn, cubthread::entry_task *task)
static void css_send_reply_to_new_client_request(CSS_CONN_ENTRY *conn, unsigned short rid, int reason)
void logtb_disable_update(THREAD_ENTRY *thread_p)
void css_close_server_connection_socket(void)
void css_set_ha_repl_delayed(void)
int css_init(THREAD_ENTRY *thread_p, char *server_name, int name_length, int port_id)
static cubthread::entry_workpool * css_Server_request_worker_pool
#define START_EXCLUSIVE_ACCESS_ACTIVE_CONN_ANCHOR(r)
int(* CSS_THREAD_FN)(THREAD_ENTRY *thrd, CSS_THREAD_ARG)
#define HA_CHANGE_MODE_IMMEDIATELY
static void css_start_all_threads(void)
int xacl_reload(THREAD_ENTRY *thread_p)
size_t css_count_transaction_worker_threads(THREAD_ENTRY *thread_p, int tran_index, int client_id)
#define rmutex_unlock(a, b)
unsigned int css_get_comm_request_id(THREAD_ENTRY *thread_p)
#define ERR_CSS_MASTER_PIPE_ERROR
void showstmt_free_array_context(THREAD_ENTRY *thread_p, SHOWSTMT_ARRAY_CONTEXT *ctx)
static int css_internal_request_handler(THREAD_ENTRY &thread_ref, CSS_CONN_ENTRY &conn_ref)
void css_shutdown_conn(CSS_CONN_ENTRY *conn)
void execute(context_type &thread_ref) overridefinal
unsigned int htonl(unsigned int from)
unsigned int css_send_reply_and_3_data_to_client(CSS_CONN_ENTRY *conn, unsigned int eid, char *reply, int reply_size, char *buffer1, int buffer1_size, char *buffer2, int buffer2_size, char *buffer3, int buffer3_size)
void css_free_conn(CSS_CONN_ENTRY *conn)
SOCKET css_Pipe_to_master
LOG_LSA * log_get_eof_lsa(void)
#define csect_enter(a, b, c)
static void css_refuse_connection_request(SOCKET new_fd, unsigned short rid, int reason, int error)
#define pthread_mutex_unlock(a)
void logpb_force_flush_pages(THREAD_ENTRY *thread_p)
#define CHECK_CLIENT_IS_ALIVE()
int logtb_count_not_allowed_clients_in_maintenance_mode(THREAD_ENTRY *thread_p)
int css_change_ha_server_state(THREAD_ENTRY *thread_p, HA_SERVER_STATE state, bool force, int timeout, bool heartbeat)
#define ASSERT_ERROR_AND_SET(error_code)
~css_server_external_task(void)
bool logtb_set_tran_index_interrupt(THREAD_ENTRY *thread_p, int tran_index, bool set)
void thread_sleep(double millisec)
#define assert_release(e)
#define CSS_RID_FROM_EID(eid)
void thread_wakeup_already_had_mutex(cubthread::entry *thread_p, thread_resume_suspend_status resume_reason)
const char * get_host_name() const
static void css_push_server_task(CSS_CONN_ENTRY &conn_ref)
void css_initialize_server_interfaces(int(*request_handler)(THREAD_ENTRY *thrd, unsigned int eid, int request, int size, char *buffer), CSS_THREAD_FN connection_error_function)
static bool css_Server_shutdown_inited
char * er_get_area_error(char *buffer, int *length)
static bool ha_Repl_delay_detected
cubthread::manager * thread_get_manager(void)
HA_LOG_APPLIER_STATE state
#define OR_ALIGNED_BUF(size)
static CSS_CONN_ENTRY * css_Master_conn
#define ER_CSS_SERVER_HA_MODE_CHANGE
#define MASTER_TO_SRV_MSG_SIZE
void push_task_on_core(entry_workpool *worker_pool_arg, entry_task *exec_p, std::size_t core_hash)
const int LOG_WORKER_POOL_CONNECTIONS
const int LOG_WORKER_POOL_TRAN_WORKERS
void map_running_contexts(bool &stop, Func &&func, Args &&...args) const
int jsp_jvm_is_loaded(void)
void logtb_enable_update(THREAD_ENTRY *thread_p)
entry_workpool * create_worker_pool(std::size_t pool_size, std::size_t task_max_count, const char *name, entry_manager *context_manager, std::size_t core_count, bool debug_logging, bool pool_threads=false, wait_seconds wait_for_task_time=std::chrono::seconds(5))
#define OR_ALIGNED_BUF_SIZE(abuf)
void css_set_thread_info(THREAD_ENTRY *thread_p, int client_id, int rid, int tran_index, int net_request_index)
int css_read_ip_info(IP_INFO **out_ip_info, char *filename)
static css_error_code css_internal_connection_handler(CSS_CONN_ENTRY *conn)
std::chrono::high_resolution_clock clock_type
SHOWSTMT_ARRAY_CONTEXT * showstmt_alloc_array_context(THREAD_ENTRY *thread_p, int num_total, int num_cols)
unsigned int css_send_reply_and_2_data_to_client(CSS_CONN_ENTRY *conn, unsigned int eid, char *reply, int reply_size, char *buffer1, int buffer1_size, char *buffer2, int buffer2_size)
static int css_Master_port_id
static int css_check_conn(CSS_CONN_ENTRY *p)
#define er_log_debug(...)
int css_return_queued_error(CSS_CONN_ENTRY *conn, unsigned short request_id, char **buffer, int *buffer_size, int *rc)
void prm_set_integer_value(PARAM_ID prm_id, int value)
static char * css_Master_server_name
static bool css_get_connection_thread_pooling_configuration(void)
#define HA_LOG_APPLIER_STATE_TABLE_MAX
unsigned int css_receive_data_from_client_with_timeout(CSS_CONN_ENTRY *conn, unsigned int eid, char **buffer, int *size, int timeout)
int css_receive_request(CSS_CONN_ENTRY *conn, unsigned short *rid, int *request, int *buffer_size)
size_t css_get_num_connection_workers(void)
void css_set_ha_num_of_hosts(int num)
static cubthread::wait_seconds css_get_connection_thread_timeout_configuration(void)
static void css_wp_core_job_scan_mapper(const cubthread::entry_workpool::core &wp_core, bool &stop_mapper, THREAD_ENTRY *thread_p, SHOWSTMT_ARRAY_CONTEXT *ctx, size_t &core_index, int &error_code)
#define TR_TABLE_CS_ENTER(thread_p)
static void css_find_not_stopped(THREAD_ENTRY &thread_ref, bool &stop, bool is_log_writer, bool &found)
#define OR_ALIGNED_BUF_START(abuf)
int css_send_heartbeat_data(CSS_CONN_ENTRY *conn, const char *data, int size)
char boot_Host_name[CUB_MAXHOSTNAMELEN]
int css_set_accessible_ip_info(void)
void css_insert_into_active_conn_list(CSS_CONN_ENTRY *conn)
static int css_reestablish_connection_to_master(void)
static HA_LOG_APPLIER_STATE_TABLE ha_Log_applier_state[HA_LOG_APPLIER_STATE_TABLE_MAX]
manager * get_manager(void)
static IP_INFO * css_Server_accessible_ip_info
void er_set(int severity, const char *file_name, const int line_no, int err_id, int num_args,...)
void css_push_external_task(CSS_CONN_ENTRY *conn, cubthread::entry_task *task)
db_client_type client_type
void LOG_CS_ENTER_READ_MODE(THREAD_ENTRY *thread_p)
static void css_count_transaction_worker_threads_mapfunc(THREAD_ENTRY &thread_ref, bool &stop_mapper, THREAD_ENTRY *caller_thread, int tran_index, int client_id, size_t &count)
clock::time_point time_point
static bool css_is_log_writer(const THREAD_ENTRY &thread_arg)
css_error_code(* css_Connect_handler)(CSS_CONN_ENTRY *)
#define ERR_CSS_SHUTDOWN_ERROR
#define BOOT_IS_ALLOWED_CLIENT_TYPE_IN_MT_MODE(host1, host2, client_type)
std::size_t get_core_count(void) const
int prm_get_integer_value(PARAM_ID prm_id)
int er_set_area_error(char *server_area)
int logtb_count_clients(THREAD_ENTRY *thread_p)
static const size_t CSS_JOB_QUEUE_SCAN_COLUMN_COUNT
static int(* css_Server_request_handler)(THREAD_ENTRY *, unsigned int, int, int, char *)
#define ER_OUT_OF_VIRTUAL_MEMORY
void LOG_CS_EXIT(THREAD_ENTRY *thread_p)
static void css_process_get_server_ha_mode_request(SOCKET master_fd)
void css_register_handler_routines(css_error_code(*connect_handler)(CSS_CONN_ENTRY *conn), CSS_THREAD_FN request_handler, CSS_THREAD_FN connection_error_handler)
void map_cores(Func &&func, Args &&...args)
const char * css_ha_server_state_string(HA_SERVER_STATE state)
#define ER_HB_PROCESS_EVENT
bool css_are_all_request_handlers_suspended(void)
#define IS_INVALID_SOCKET(socket)
SOCKET css_server_accept(SOCKET sockfd)
#define DEFAULT_HEADER_DATA
bool css_is_shutdowning_server()
void thread_check_suspend_reason_and_wakeup(cubthread::entry *thread_p, thread_resume_suspend_status resume_reason, thread_resume_suspend_status suspend_reason)
static void css_setup_server_loop(void)
static void css_set_shutdown_timeout(int timeout)
static void css_initialize_conn(CSS_CONN_ENTRY *conn, SOCKET fd)
#define ER_INACCESSIBLE_IP
bool logtb_is_current_active(THREAD_ENTRY *thread_p)
static void css_close_connection_to_master(void)
static char * ip_list_file_name
void logtb_slam_transaction(THREAD_ENTRY *thread_p, int tran_index)
enum ha_log_applier_state HA_LOG_APPLIER_STATE
HA_SERVER_STATE css_ha_server_state(void)
static char ip_file_real_path[PATH_MAX]
bool css_is_shutdown_timeout_expired(void)
void lock_force_thread_timeout_lock(THREAD_ENTRY *thrd)
static int ha_Log_applier_state_num
unsigned int css_receive_data_from_client(CSS_CONN_ENTRY *conn, unsigned int eid, char **buffer, int *size)
void vacuum_stop_workers(THREAD_ENTRY *thread_p)
void er_log_stats(void) const
#define rmutex_lock(a, b)
void css_end_server_request(CSS_CONN_ENTRY *conn)
static void css_process_change_server_ha_mode_request(SOCKET master_fd)
int session_get_private_lru_idx(const void *session_p)
css_connection_task(CSS_CONN_ENTRY &conn)
static void css_process_shutdown_request(SOCKET master_fd)
bool css_peer_alive(SOCKET sd, int timeout)
SOCKET css_Server_connection_socket
SOCKET css_open_new_socket_from_master(SOCKET fd, unsigned short *rid)
void get_stats(cubperf::stat_value *stats_out) const
void thread_lock_entry(cubthread::entry *thread_p)
struct packet_header NET_HEADER
int css_get_ha_num_of_hosts(void)
void css_unset_ha_repl_delayed(void)
DB_VALUE * showstmt_alloc_tuple_in_context(THREAD_ENTRY *thread_p, SHOWSTMT_ARRAY_CONTEXT *ctx)
CSS_CONN_ENTRY * css_get_current_conn_entry(void)
static struct timeval start_time
#define OR_LOG_LSA_ALIGNED_SIZE
int css_get_client_id(THREAD_ENTRY *thread_p)
std::size_t system_core_count(void)
void er_set_with_oserror(int severity, const char *file_name, const int line_no, int err_id, int num_args,...)
int count(int &result, const cub_regex_object ®, const std::string &src, const int position, const INTL_CODESET codeset)
static void css_process_get_eof_request(SOCKET master_fd)
static void css_stop_all_workers(THREAD_ENTRY &thread_ref, css_thread_stop_type stop_phase)
void log_append_ha_server_state(THREAD_ENTRY *thread_p, int state)
wait_duration< std::chrono::seconds > wait_seconds
CSS_CONN_ENTRY * css_Active_conn_anchor
static int css_get_master_request(SOCKET master_fd)
void map_running_contexts(Func &&func, Args &&...args)
unsigned int css_send_abort_to_client(CSS_CONN_ENTRY *conn, unsigned int eid)
void execute(context_type &thread_ref) overridefinal
void destroy_worker_pool(entry_workpool *&worker_pool_arg)
static void css_is_any_thread_not_suspended_mapfunc(THREAD_ENTRY &thread_ref, bool &stop_mapper, size_t &count, bool &found)
int css_send_heartbeat_request(CSS_CONN_ENTRY *conn, int command)
#define RMUTEX_NAME_TEMP_CONN_ENTRY
static struct timeval css_Shutdown_timeout
static void error(const char *msg)
void worker_manager_stop_all()
#define ER_CSS_CLIENTS_EXCEEDED
void thread_unlock_entry(cubthread::entry *thread_p)
int css_readn(SOCKET fd, char *ptr, int nbytes, int timeout)
int css_receive_data(CSS_CONN_ENTRY *conn, unsigned short req_id, char **buffer, int *buffer_size, int timeout)
static cubthread::entry_workpool * css_Connection_worker_pool
void css_get_thread_stats(UINT64 *stats_out)
int css_get_max_conn(void)
css_server_task(CSS_CONN_ENTRY &conn)
static cubthread::wait_seconds css_get_server_request_thread_timeout_configuration(void)
int css_windows_startup(void)
unsigned short ntohs(unsigned short from)
static bool css_get_server_request_thread_pooling_configuration(void)
static bool css_check_ha_log_applier_working(void)
int css_check_ip(IP_INFO *ip_info, unsigned char *address)
void css_dealloc_conn_rmutex(CSS_CONN_ENTRY *conn)
#define csect_enter_as_reader(a, b, c)
int css_free_ip_info(IP_INFO *ip_info)
#define FROM_REGISTER_CLIENT
void thread_clear_recursion_depth(cubthread::entry *thread_p)
#define free_and_init(ptr)
char * prm_get_string_value(PARAM_ID prm_id)
static void css_stop_non_log_writer(THREAD_ENTRY &thread_ref, bool &, THREAD_ENTRY &stopper_thread_ref)
char * css_pack_server_name(const char *server_name, int *name_length)
std::size_t get_max_count(void) const
void xacl_dump(THREAD_ENTRY *thread_p, FILE *outfp)
void css_windows_shutdown(void)
bool prm_get_bool_value(PARAM_ID prm_id)
static int css_process_master_request(SOCKET master_fd)
CSS_CONN_ENTRY * css_connect_to_master_server(int master_port_id, const char *server_name, int name_length)
bool css_is_ha_repl_delayed(void)
int css_check_ha_server_state_for_client(THREAD_ENTRY *thread_p, int whence)
void start_all_workers(void)
int css_job_queues_start_scan(THREAD_ENTRY *thread_p, int show_type, DB_VALUE **arg_values, int arg_cnt, void **ptr)
unsigned int css_return_eid_from_conn(CSS_CONN_ENTRY *conn, CSS_MAP_ENTRY **anchor, unsigned short rid)
int hb_register_to_master(CSS_CONN_ENTRY *conn, int type)
unsigned int css_send_reply_and_data_to_client(CSS_CONN_ENTRY *conn, unsigned int eid, char *reply, int reply_size, char *buffer, int buffer_size)
static int css_connection_handler_thread(THREAD_ENTRY *thrd, CSS_CONN_ENTRY *conn)
#define ERR_CSS_ERROR_FROM_SERVER
void css_start_shutdown_server()
size_t css_get_num_total_workers(void)
static void css_process_new_client(SOCKET master_fd)
static HA_SERVER_STATE css_transit_ha_server_state(THREAD_ENTRY *thread_p, HA_SERVER_STATE req_state)
char * or_pack_log_lsa(const char *ptr, const struct log_lsa *lsa)
char * strdup(const char *str)
int css_read_and_queue(CSS_CONN_ENTRY *conn, int *type)
#define pthread_mutex_lock(a)
#define CSS_ENTRYID_FROM_EID(eid)
static int ha_Server_num_of_hosts
int db_make_int(DB_VALUE *value, const int num)
unsigned int ntohl(unsigned int from)
int css_send_data(CSS_CONN_ENTRY *conn, unsigned short rid, const char *buffer, int buffer_size)
enum ha_server_state HA_SERVER_STATE
#define rmutex_initialize(a, b)
void push_task(entry_workpool *worker_pool_arg, entry_task *exec_p)
char * css_add_client_version_string(THREAD_ENTRY *thread_p, const char *version_string)
std::size_t get_max_worker_count(void) const
int css_send_error(CSS_CONN_ENTRY *conn, unsigned short rid, const char *buffer, int buffer_size)
int css_send_abort_request(CSS_CONN_ENTRY *conn, unsigned short request_id)
static int css_test_for_client_errors(CSS_CONN_ENTRY *conn, unsigned int eid)
bool log_prior_has_worker_log_records(THREAD_ENTRY *thread_p)
void css_block_all_active_conn(unsigned short stop_phase)
#define TR_TABLE_CS_EXIT(thread_p)
CSS_CONN_ENTRY * css_Conn_array
void log_set_ha_promotion_time(THREAD_ENTRY *thread_p, INT64 ha_promotion_time)
unsigned int css_send_error_to_client(CSS_CONN_ENTRY *conn, unsigned int eid, char *buffer, int buffer_size)
static HA_SERVER_STATE ha_Server_state
void execute(context_type &thread_ref) overridefinal
#define END_EXCLUSIVE_ACCESS_ACTIVE_CONN_ANCHOR(r)
bool is_logging_configured(const int logging_flag)
int css_notify_ha_log_applier_state(THREAD_ENTRY *thread_p, HA_LOG_APPLIER_STATE state)
SIGNAL_HANDLER_FUNCTION os_set_signal_handler(const int sig_no, SIGNAL_HANDLER_FUNCTION sig_handler)
#define THREAD_CALLING_CONVENTION
const char * rel_major_release_string(void)
void css_cleanup_server_queues(unsigned int eid)
static int css_check_accessibility(SOCKET new_fd)
const char * css_ha_applier_state_string(HA_LOG_APPLIER_STATE state)
char * envvar_confdir_file(char *path, size_t size, const char *filename)
void css_remove_all_unexpected_packets(CSS_CONN_ENTRY *conn)
CSS_CONN_ENTRY * css_make_conn(SOCKET fd)