CUBRID Engine  latest
server_support.c
Go to the documentation of this file.
1 /*
2  * Copyright 2008 Search Solution Corporation
3  * Copyright 2016 CUBRID Corporation
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 /*
20  * server_support.c - server interface
21  */
22 
23 #ident "$Id$"
24 
25 #include "server_support.h"
26 
27 #include "config.h"
28 #include "load_worker_manager.hpp"
29 #include "log_append.hpp"
30 #include "session.h"
31 #include "thread_entry_task.hpp"
32 #include "thread_entry.hpp"
33 #include "thread_manager.hpp"
34 #include "thread_worker_pool.hpp"
35 
36 #include <stdio.h>
37 #include <stdlib.h>
38 #include <string.h>
39 #include <errno.h>
40 #if !defined(WINDOWS)
41 #include <signal.h>
42 #include <unistd.h>
43 #if defined(SOLARIS)
44 #include <sys/filio.h>
45 #endif /* SOLARIS */
46 #include <sys/socket.h>
47 #include <fcntl.h>
48 #include <netinet/in.h>
49 #endif /* !WINDOWS */
50 #include <assert.h>
51 
52 #include "porting.h"
53 #include "memory_alloc.h"
54 #include "boot_sr.h"
55 #include "connection_defs.h"
56 #include "connection_globals.h"
57 #include "release_string.h"
58 #include "system_parameter.h"
59 #include "environment_variable.h"
60 #include "error_manager.h"
61 #include "connection_error.h"
62 #include "message_catalog.h"
63 #include "critical_section.h"
64 #include "lock_manager.h"
65 #include "log_lsa.hpp"
66 #include "log_manager.h"
67 #include "network.h"
68 #include "object_representation.h"
69 #include "jsp_sr.h"
70 #include "show_scan.h"
71 #if defined(WINDOWS)
72 #include "wintcp.h"
73 #else /* WINDOWS */
74 #include "tcp.h"
75 #endif /* WINDOWS */
76 #include "connection_sr.h"
77 #include "xserver_interface.h"
78 #include "utility.h"
79 #include "vacuum.h"
80 #if !defined(WINDOWS)
81 #include "heartbeat.h"
82 #endif
83 #include "dbtype.h"
84 
85 #define CSS_WAIT_COUNT 5 /* # of retry to connect to master */
86 #define CSS_GOING_DOWN_IMMEDIATELY "Server going down immediately"
87 
88 #if defined(WINDOWS)
89 #define SockError SOCKET_ERROR
90 #else /* WINDOWS */
91 #define SockError -1
92 #endif /* WINDOWS */
93 
94 #define RMUTEX_NAME_TEMP_CONN_ENTRY "TEMP_CONN_ENTRY"
95 
96 static bool css_Server_shutdown_inited = false;
97 static struct timeval css_Shutdown_timeout = { 0, 0 };
98 
99 static char *css_Master_server_name = NULL; /* database identifier */
103 static char *ip_list_file_name = NULL;
104 static char ip_file_real_path[PATH_MAX];
105 
106 /* internal request handler function */
107 static int (*css_Server_request_handler) (THREAD_ENTRY *, unsigned int, int, int, char *);
108 
109 /* server's state for HA feature */
111 static bool ha_Repl_delay_detected = false;
112 
113 static int ha_Server_num_of_hosts = 0;
114 
115 #define HA_LOG_APPLIER_STATE_TABLE_MAX 5
118 {
121 };
122 
129 };
130 
132 
133 // *INDENT-OFF*
136 
138 {
139 public:
140 
141  css_server_task (void) = delete;
142 
144  : m_conn (conn)
145  {
146  }
147 
148  void execute (context_type &thread_ref) override final;
149 
150  // retire not overwritten; task is automatically deleted
151 
152 private:
154 };
155 
156 // css_server_external_task - class used for legacy design; external modules may push tasks on css worker pool and we
157 // need to make sure conn_entry is properly initialized.
158 //
159 // TODO: remove me
161 {
162 public:
163  css_server_external_task (void) = delete;
164 
166  : m_conn (conn)
167  , m_task (task)
168  {
169  }
170 
172  {
173  m_task->retire ();
174  }
175 
176  void execute (context_type &thread_ref) override final;
177 
178  // retire not overwritten; task is automatically deleted
179 
180 private:
183 };
184 
186 {
187 public:
188 
189  css_connection_task (void) = delete;
190 
192  : m_conn (conn)
193  {
194  //
195  }
196 
197  void execute (context_type & thread_ref) override final;
198 
199  // retire not overwritten; task is automatically deleted
200 
201 private:
203 };
204 
205 static const size_t CSS_JOB_QUEUE_SCAN_COLUMN_COUNT = 4;
206 
207 static void css_setup_server_loop (void);
208 static int css_check_conn (CSS_CONN_ENTRY * p);
209 static void css_set_shutdown_timeout (int timeout);
210 static int css_get_master_request (SOCKET master_fd);
211 static int css_process_master_request (SOCKET master_fd);
212 static void css_process_shutdown_request (SOCKET master_fd);
213 static void css_send_reply_to_new_client_request (CSS_CONN_ENTRY * conn, unsigned short rid, int reason);
214 static void css_refuse_connection_request (SOCKET new_fd, unsigned short rid, int reason, int error);
215 static void css_process_new_client (SOCKET master_fd);
216 static void css_process_get_server_ha_mode_request (SOCKET master_fd);
217 static void css_process_change_server_ha_mode_request (SOCKET master_fd);
218 static void css_process_get_eof_request (SOCKET master_fd);
219 
220 static void css_close_connection_to_master (void);
221 static int css_reestablish_connection_to_master (void);
222 static int css_connection_handler_thread (THREAD_ENTRY * thrd, CSS_CONN_ENTRY * conn);
224 static int css_internal_request_handler (THREAD_ENTRY & thread_ref, CSS_CONN_ENTRY & conn_ref);
225 static int css_test_for_client_errors (CSS_CONN_ENTRY * conn, unsigned int eid);
226 static int css_check_accessibility (SOCKET new_fd);
227 
228 #if defined(WINDOWS)
229 static int css_process_new_connection_request (void);
230 #endif /* WINDOWS */
231 
232 static bool css_check_ha_log_applier_done (void);
233 static bool css_check_ha_log_applier_working (void);
234 
235 static void css_push_server_task (CSS_CONN_ENTRY & conn_ref);
236 static void css_stop_non_log_writer (THREAD_ENTRY & thread_ref, bool &, THREAD_ENTRY & stopper_thread_ref);
237 static void css_stop_log_writer (THREAD_ENTRY & thread_ref, bool &);
238 static void css_find_not_stopped (THREAD_ENTRY & thread_ref, bool & stop, bool is_log_writer, bool & found);
239 static bool css_is_log_writer (const THREAD_ENTRY & thread_arg);
240 static void css_stop_all_workers (THREAD_ENTRY & thread_ref, css_thread_stop_type stop_phase);
241 static void css_wp_worker_get_busy_count_mapper (THREAD_ENTRY & thread_ref, bool & stop_mapper, int &busy_count);
242 // cubthread::entry_workpool::core confuses indent
243 static void css_wp_core_job_scan_mapper (const cubthread::entry_workpool::core & wp_core, bool & stop_mapper,
244  THREAD_ENTRY * thread_p, SHOWSTMT_ARRAY_CONTEXT * ctx, size_t & core_index,
245  int & error_code);
246 static void
247 css_is_any_thread_not_suspended_mapfunc (THREAD_ENTRY & thread_ref, bool & stop_mapper, size_t & count, bool & found);
248 static void
249 css_count_transaction_worker_threads_mapfunc (THREAD_ENTRY & thread_ref, bool & stop_mapper,
250  THREAD_ENTRY * caller_thread, int tran_index, int client_id,
251  size_t & count);
252 
254 
259 static void css_start_all_threads (void);
260 // *INDENT-ON*
261 
262 #if defined (SERVER_MODE)
263 /*
264  * css_job_queues_start_scan() - start scan function for 'SHOW JOB QUEUES'
265  * return: NO_ERROR, or ER_code
266  * thread_p(in): thread entry
267  * show_type(in):
268  * arg_values(in):
269  * arg_cnt(in):
270  * ptr(in/out): 'show job queues' context
271  *
272  * NOTE: job queues don't really exist anymore, at least not the way SHOW JOB QUEUES statement was created for.
273  * we now have worker pool "cores" that act as partitions of workers and queued tasks.
274  * for backward compatibility, the statement is not changed; only its columns are reinterpreted
275  * 1. job queue index => core index
276  * 2. job queue max workers => core max workers
277  * 3. job queue busy workers => core busy workers
278  * 4. job queue connection workers => 0 // connection workers are separated in a different worker pool
279  */
280 int
281 css_job_queues_start_scan (THREAD_ENTRY * thread_p, int show_type, DB_VALUE ** arg_values, int arg_cnt, void **ptr)
282 {
283  int error = NO_ERROR;
285 
286  *ptr = NULL;
287 
288  ctx = showstmt_alloc_array_context (thread_p, (int) css_Server_request_worker_pool->get_core_count (),
290  if (ctx == NULL)
291  {
292  ASSERT_ERROR_AND_SET (error);
293  return error;
294  }
295 
296  size_t core_index = 0; // core index starts with 0
297  css_Server_request_worker_pool->map_cores (css_wp_core_job_scan_mapper, thread_p, ctx, core_index, error);
298  if (error != NO_ERROR)
299  {
300  ASSERT_ERROR ();
301  showstmt_free_array_context (thread_p, ctx);
302  return error;
303  }
304  *ptr = ctx;
305 
306  return NO_ERROR;
307 }
308 #endif // SERVER_MODE
309 
310 /*
311  * css_setup_server_loop() -
312  * return:
313  */
314 static void
316 {
317 #if !defined(WINDOWS)
318  (void) os_set_signal_handler (SIGPIPE, SIG_IGN);
319 #endif /* not WINDOWS */
320 
321 #if defined(SA_MODE) && (defined(LINUX) || defined(x86_SOLARIS) || defined(HPUX))
322  if (!jsp_jvm_is_loaded ())
323  {
324  (void) os_set_signal_handler (SIGFPE, SIG_IGN);
325  }
326 #else /* LINUX || x86_SOLARIS || HPUX */
327  (void) os_set_signal_handler (SIGFPE, SIG_IGN);
328 #endif /* LINUX || x86_SOLARIS || HPUX */
329 
331  {
332  /* execute master thread. */
334  }
335  else
336  {
338  }
339 }
340 
341 /*
342  * css_check_conn() -
343  * return:
344  * p(in):
345  */
346 static int
348 {
349 #if defined(WINDOWS)
350  u_long status = 0;
351 #else
352  int status = 0;
353 #endif
354 
355 #if defined(WINDOWS)
356  if (ioctlsocket (p->fd, FIONREAD, &status) == SockError || p->status != CONN_OPEN)
357  {
358  return ER_FAILED;
359  }
360 #else /* WINDOWS */
361  if (fcntl (p->fd, F_GETFL, status) < 0 || p->status != CONN_OPEN)
362  {
363  return ER_FAILED;
364  }
365 #endif /* WINDOWS */
366 
367  return NO_ERROR;
368 }
369 
370 /*
371  * css_set_shutdown_timeout() -
372  * return:
373  * timeout(in):
374  */
375 static void
377 {
378  if (gettimeofday (&css_Shutdown_timeout, NULL) == 0)
379  {
380  css_Shutdown_timeout.tv_sec += timeout;
381  }
382  return;
383 }
384 
385 /*
386  * css_master_thread() - Master thread, accept/process master process's request
387  * return:
388  * arg(in):
389  */
392 {
393  int r, run_code = 1, status = 0, nfds;
394  struct pollfd po[] = { {0, 0, 0}, {0, 0, 0} };
395 
396  while (run_code)
397  {
398  /* check if socket has error or client is down */
399  if (!IS_INVALID_SOCKET (css_Pipe_to_master) && css_check_conn (css_Master_conn) < 0)
400  {
401  css_shutdown_conn (css_Master_conn);
403  }
404 
405  /* clear the pollfd each time before poll */
406  nfds = 0;
407  po[0].fd = -1;
408  po[0].events = 0;
409 
411  {
412  po[0].fd = css_Pipe_to_master;
413  po[0].events = POLLIN;
414  nfds = 1;
415  }
416 #if defined(WINDOWS)
418  {
419  po[1].fd = css_Server_connection_socket;
420  po[1].events = POLLIN;
421  nfds = 2;
422  }
423 #endif /* WINDOWS */
424 
425  /* wait until a polled socket becomes readable; poll() takes its timeout in milliseconds */
426  r = poll (po, nfds, (prm_get_integer_value (PRM_ID_TCP_CONNECTION_TIMEOUT) * 1000));
427  if (r > 0 && (IS_INVALID_SOCKET (css_Pipe_to_master) || !(po[0].revents & POLLIN))
428 #if defined(WINDOWS)
429  && (IS_INVALID_SOCKET (css_Server_connection_socket) || !(po[1].revents & POLLIN))
430 #endif /* WINDOWS */
431  )
432  {
433  continue;
434  }
435 
436  if (r < 0)
437  {
439 #if defined(WINDOWS)
440  && ioctlsocket (css_Pipe_to_master, FIONREAD, (u_long *) (&status)) == SockError
441 #else /* WINDOWS */
442  && fcntl (css_Pipe_to_master, F_GETFL, status) == SockError
443 #endif /* WINDOWS */
444  )
445  {
447  break;
448  }
449  }
450  else if (r > 0)
451  {
452  if (!IS_INVALID_SOCKET (css_Pipe_to_master) && (po[0].revents & POLLIN))
453  {
455  if (run_code == -1)
456  {
458  /* shutdown message received */
459  run_code = (!HA_DISABLED ())? 0 : 1;
460  }
461 
462  if (run_code == 0 && !HA_DISABLED ())
463  {
465  "Disconnected with the cub_master and will shut itself down", "");
466  }
467  }
468 #if !defined(WINDOWS)
469  else
470  {
471  break;
472  }
473 
474 #else /* !WINDOWS */
475  if (!IS_INVALID_SOCKET (css_Server_connection_socket) && (po[1].revents & POLLIN))
476  {
477  css_process_new_connection_request ();
478  }
479 #endif /* !WINDOWS */
480  }
481 
482  if (run_code)
483  {
485  {
487  }
488  }
489  else
490  {
491  break;
492  }
493  }
494 
496 
497 #if defined(WINDOWS)
498  return 0;
499 #else /* WINDOWS */
500  return NULL;
501 #endif /* WINDOWS */
502 }
503 
504 /*
505  * css_get_master_request () -
506  * return:
507  * master_fd(in):
508  */
509 static int
511 {
512  int request, r;
513 
514  r = css_readn (master_fd, (char *) &request, sizeof (int), -1);
515  if (r == sizeof (int))
516  {
517  return ((int) ntohl (request));
518  }
519  else
520  {
521  return (-1);
522  }
523 }
524 
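525 /*
526  * css_process_master_request () - read one request from the master and dispatch it
527  * return: 1 unless changed by the request: 0 after a shutdown request, -1 when the request is not recognized
528  * master_fd(in): socket the request is read from
531  */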
532 static int
534 {
535  int request, r;
536 
537  r = 1;
538  request = (int) css_get_master_request (master_fd);
539 
540  switch (request)
541  {
543  css_process_new_client (master_fd);
544  break;
545 
547  css_process_shutdown_request (master_fd);
548  r = 0;
549  break;
550 
554  case SERVER_STOP_TRACING:
558  break;
559  case SERVER_GET_HA_MODE:
561  break;
562 #if !defined(WINDOWS)
565  break;
566  case SERVER_GET_EOF:
567  css_process_get_eof_request (master_fd);
568  break;
569 #endif
570  default:
571  /* master do not respond */
572  r = -1;
573  break;
574  }
575 
576  return r;
577 }
578 
579 /*
580  * css_process_shutdown_request () -
581  * return:
582  * master_fd(in):
583  */
584 static void
586 {
587  char buffer[MASTER_TO_SRV_MSG_SIZE];
588  int r, timeout;
589 
590  timeout = (int) css_get_master_request (master_fd);
591 
592  r = css_readn (master_fd, buffer, MASTER_TO_SRV_MSG_SIZE, -1);
593  if (r < 0)
594  {
596  return;
597  }
598 }
599 
600 static void
601 css_send_reply_to_new_client_request (CSS_CONN_ENTRY * conn, unsigned short rid, int reason)
602 {
603  char reply_buf[sizeof (int)];
604  int t;
605 
606  // the first is reason.
607  t = htonl (reason);
608  memcpy (reply_buf, (char *) &t, sizeof (int));
609 
610  css_send_data (conn, rid, reply_buf, (int) sizeof (reply_buf));
611 }
612 
613 static void
614 css_refuse_connection_request (SOCKET new_fd, unsigned short rid, int reason, int error)
615 {
616  CSS_CONN_ENTRY temp_conn;
617  OR_ALIGNED_BUF (1024) a_buffer;
618  char *buffer;
619  char *area;
620  int length = 1024;
621  int r;
622 
623  /* open a temporary connection to send a reply to client.
624  * Note that no name is given for its csect. also see css_is_temporary_conn_csect.
625  */
626 
627  css_initialize_conn (&temp_conn, new_fd);
628  r = rmutex_initialize (&temp_conn.rmutex, RMUTEX_NAME_TEMP_CONN_ENTRY);
629  assert (r == NO_ERROR);
630 
631 #if defined (WINDOWS)
632  // WINDOWS style connection. see css_process_new_connection_request
633 
635 
636  r = css_read_header (&temp_conn, &header);
637  if (r != NO_ERRORS)
638  {
639  assert (r == NO_ERRORS);
640  return;
641  }
642 #endif /* WINDOWS */
643 
644  css_send_reply_to_new_client_request (&temp_conn, rid, reason);
645 
646  buffer = OR_ALIGNED_BUF_START (a_buffer);
647 
648  area = er_get_area_error (buffer, &length);
649 
650  temp_conn.db_error = error;
651  css_send_error (&temp_conn, rid, area, length);
652  css_shutdown_conn (&temp_conn);
653  css_dealloc_conn_rmutex (&temp_conn);
654  er_clear ();
655 }
656 
657 /*
658  * css_process_new_client () -
659  * return:
660  * master_fd(in):
661  */
662 static void
664 {
665  SOCKET new_fd;
666  int error;
667  CSS_CONN_ENTRY *conn;
668  unsigned short rid;
669 
670  /* receive new socket descriptor from the master */
671  new_fd = css_open_new_socket_from_master (master_fd, &rid);
672  if (IS_INVALID_SOCKET (new_fd))
673  {
674  return;
675  }
676 
678  {
679  ASSERT_ERROR_AND_SET (error);
681  return;
682  }
683 
684  conn = css_make_conn (new_fd);
685  if (conn == NULL)
686  {
687  error = ER_CSS_CLIENTS_EXCEEDED;
690  return;
691  }
692 
694 
696  {
697  (void) (*css_Connect_handler) (conn);
698  }
699  else
700  {
701  assert_release (false);
702  }
703 }
704 
705 /*
706  * css_process_get_server_ha_mode_request() -
707  * return:
708  */
709 static void
711 {
712  int r;
713  int response;
714 
715  if (HA_DISABLED ())
716  {
717  response = htonl (HA_SERVER_STATE_NA);
718  }
719  else
720  {
721  response = htonl (ha_Server_state);
722  }
723 
724  r = send (master_fd, (char *) &response, sizeof (int), 0);
725  if (r < 0)
726  {
728  return;
729  }
730 
731 }
732 
733 /*
734  * css_process_change_server_ha_mode_request() -
735  * return:
736  */
737 static void
739 {
740 #if !defined(WINDOWS)
742  THREAD_ENTRY *thread_p;
743 
744  state = (HA_SERVER_STATE) css_get_master_request (master_fd);
745 
746  thread_p = thread_get_thread_entry_info ();
747  assert (thread_p != NULL);
748 
749  if (state == HA_SERVER_STATE_ACTIVE || state == HA_SERVER_STATE_STANDBY)
750  {
751  if (css_change_ha_server_state (thread_p, state, false, HA_CHANGE_MODE_IMMEDIATELY, true) != NO_ERROR)
752  {
753  er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ERR_CSS_ERROR_FROM_SERVER, 1, "Cannot change server HA mode");
754  }
755  }
756  else
757  {
758  er_log_debug (ARG_FILE_LINE, "ERROR : unexpected state. (state :%d). \n", state);
759  }
760 
761  state = (HA_SERVER_STATE) htonl ((int) css_ha_server_state ());
762 
764  css_send_heartbeat_data (css_Master_conn, (char *) &state, sizeof (state));
765 #endif
766 }
767 
768 /*
769  * css_process_get_eof_request() -
770  * return:
771  */
772 static void
774 {
775 #if !defined(WINDOWS)
776  LOG_LSA *eof_lsa;
778  char *reply;
779  THREAD_ENTRY *thread_p;
780 
781  reply = OR_ALIGNED_BUF_START (a_reply);
782 
783  thread_p = thread_get_thread_entry_info ();
784  assert (thread_p != NULL);
785 
786  LOG_CS_ENTER_READ_MODE (thread_p);
787 
788  eof_lsa = log_get_eof_lsa ();
789  (void) or_pack_log_lsa (reply, eof_lsa);
790 
791  LOG_CS_EXIT (thread_p);
792 
793  css_send_heartbeat_request (css_Master_conn, SERVER_GET_EOF);
794  css_send_heartbeat_data (css_Master_conn, reply, OR_ALIGNED_BUF_SIZE (a_reply));
795 #endif
796 }
797 
798 /*
799  * css_close_connection_to_master() -
800  * return:
801  */
802 static void
804 {
806  {
807  css_shutdown_conn (css_Master_conn);
808  }
810  css_Master_conn = NULL;
811 }
812 
813 /*
814  * css_shutdown_timeout() -
815  * return:
816  */
817 bool
819 {
820  struct timeval timeout;
821 
822  /* css_Shutdown_timeout is set by shutdown request */
823  if (css_Shutdown_timeout.tv_sec != 0 && gettimeofday (&timeout, NULL) == 0)
824  {
825  if (css_Shutdown_timeout.tv_sec <= timeout.tv_sec)
826  {
827  return true;
828  }
829  }
830 
831  return false;
832 }
833 
834 #if defined(WINDOWS)
835 /*
836  * css_process_new_connection_request () -
837  * return:
838  *
839  * Note: Called when a connect() is detected on the
840  * css_Server_connection_socket indicating the presence of a new client
841  * attempting to connect. Accept the connection and establish a new FD
842  * for this client. Send him back a little blip so he knows things are
843  * ok.
844  */
845 static int
846 css_process_new_connection_request (void)
847 {
848  SOCKET new_fd;
849  int reason, buffer_size, rc;
850  CSS_CONN_ENTRY *conn;
851  unsigned short rid;
853  int error;
854 
856 
857  if (IS_INVALID_SOCKET (new_fd))
858  {
859  return 1;
860  }
861 
863  {
864  ASSERT_ERROR_AND_SET (error);
866  return -1;
867  }
868 
869  conn = css_make_conn (new_fd);
870  if (conn == NULL)
871  {
872  error = ER_CSS_CLIENTS_EXCEEDED;
875  return -1;
876  }
877 
878  buffer_size = sizeof (NET_HEADER);
879  do
880  {
881  /* css_receive_request */
882  if (!conn || conn->status != CONN_OPEN)
883  {
884  rc = CONNECTION_CLOSED;
885  break;
886  }
887 
888  rc = css_read_header (conn, &header);
889  if (rc == NO_ERRORS)
890  {
891  rid = (unsigned short) ntohl (header.request_id);
892 
893  if (ntohl (header.type) != COMMAND_TYPE)
894  {
895  buffer_size = reason = rid = 0;
896  rc = WRONG_PACKET_TYPE;
897  }
898  else
899  {
900  reason = (int) (unsigned short) ntohs (header.function_code);
901  buffer_size = (int) ntohl (header.buffer_size);
902  }
903  }
904  }
905  while (rc == WRONG_PACKET_TYPE);
906 
907  if (rc == NO_ERRORS)
908  {
909  if (reason == DATA_REQUEST)
910  {
912 
914  {
915  (void) (*css_Connect_handler) (conn);
916  }
917  }
918  else
919  {
921 
922  css_free_conn (conn);
923  }
924  }
925 
926  /* can't let problems accepting client requests terminate the loop */
927  return 1;
928 }
929 #endif /* WINDOWS */
930 
931 /*
932  * css_reestablish_connection_to_master() -
933  * return:
934  */
935 static int
937 {
938  CSS_CONN_ENTRY *conn;
939  static int i = CSS_WAIT_COUNT;
940  char *packed_server_name;
941  int name_length;
942 
943  if (i-- > 0)
944  {
945  return 0;
946  }
947  i = CSS_WAIT_COUNT;
948 
949  packed_server_name = css_pack_server_name (css_Master_server_name, &name_length);
950  if (packed_server_name != NULL)
951  {
952  conn = css_connect_to_master_server (css_Master_port_id, packed_server_name, name_length);
953  if (conn != NULL)
954  {
955  css_Pipe_to_master = conn->fd;
956  if (css_Master_conn)
957  {
958  css_free_conn (css_Master_conn);
959  }
960  css_Master_conn = conn;
961  free_and_init (packed_server_name);
962  return 1;
963  }
964  else
965  {
966  free_and_init (packed_server_name);
967  }
968  }
969 
971  return 0;
972 }
973 
974 /*
975  * css_connection_handler_thread () - Accept/process request from one client
976  * return:
977  * arg(in):
978  *
979  * Note: One server thread per one client
980  */
981 static int
983 {
984  int n, type, rv, status;
985  volatile int conn_status;
986  int css_peer_alive_timeout, poll_timeout;
987  int max_num_loop, num_loop;
988  SOCKET fd;
989  struct pollfd po[1] = { {0, 0, 0} };
990 
991  if (thread_p == NULL)
992  {
993  thread_p = thread_get_thread_entry_info ();
994  }
995 
996  fd = conn->fd;
997 
998  pthread_mutex_unlock (&thread_p->tran_index_lock);
999 
1000  thread_p->type = TT_SERVER; /* server thread */
1001 
1002  css_peer_alive_timeout = 5000;
1003  poll_timeout = 100;
1004  max_num_loop = css_peer_alive_timeout / poll_timeout;
1005  num_loop = 0;
1006 
1007  status = NO_ERRORS;
1008  /* check if socket has error or client is down */
1009  while (thread_p->shutdown == false && conn->stop_talk == false)
1010  {
1011  /* check the connection */
1012  conn_status = conn->status;
1013  if (conn_status == CONN_CLOSING)
1014  {
1015  /* There's an interesting race condition among client, worker thread and connection handler.
1016  * Please find CBRD-21375 for detail and also see sboot_notify_unregister_client.
1017  *
1018  * We have to synchronize here with worker thread which may be in sboot_notify_unregister_client
1019  * to let it have a chance to send reply to client.
1020  */
1021  rmutex_lock (thread_p, &conn->rmutex);
1022 
1023  conn_status = conn->status;
1024 
1025  rmutex_unlock (thread_p, &conn->rmutex);
1026  }
1027 
1028  if (conn_status != CONN_OPEN)
1029  {
1030  er_log_debug (ARG_FILE_LINE, "css_connection_handler_thread: conn->status (%d) is not CONN_OPEN.",
1031  conn_status);
1032  status = CONNECTION_CLOSED;
1033  break;
1034  }
1035 
1036  po[0].fd = fd;
1037  po[0].events = POLLIN;
1038  po[0].revents = 0;
1039  n = poll (po, 1, poll_timeout);
1040  if (n == 0)
1041  {
1042  if (num_loop < max_num_loop)
1043  {
1044  num_loop++;
1045  continue;
1046  }
1047  num_loop = 0;
1048 
1049 #if !defined (WINDOWS)
1050  /* 0 means it timed out and no fd is changed. */
1051  if (CHECK_CLIENT_IS_ALIVE ())
1052  {
1053  if (css_peer_alive (fd, css_peer_alive_timeout) == false)
1054  {
1055  er_log_debug (ARG_FILE_LINE, "css_connection_handler_thread: css_peer_alive() error\n");
1056  status = CONNECTION_CLOSED;
1057  break;
1058  }
1059  }
1060 
1061  /* check server's HA state */
1063  && css_count_transaction_worker_threads (thread_p, conn->get_tran_index (), conn->client_id) == 0)
1064  {
1065  status = REQUEST_REFUSED;
1066  break;
1067  }
1068 #endif /* !WINDOWS */
1069  continue;
1070  }
1071  else if (n < 0)
1072  {
1073  num_loop = 0;
1074 
1075  if (errno == EINTR)
1076  {
1077  continue;
1078  }
1079  else
1080  {
1081  er_log_debug (ARG_FILE_LINE, "css_connection_handler_thread: select() error\n");
1082  status = ERROR_ON_READ;
1083  break;
1084  }
1085  }
1086  else
1087  {
1088  num_loop = 0;
1089 
1090  if (po[0].revents & POLLERR || po[0].revents & POLLHUP)
1091  {
1092  status = ERROR_ON_READ;
1093  break;
1094  }
1095 
1096  /* read command/data/etc request from socket, and enqueue it to appr. queue */
1097  status = css_read_and_queue (conn, &type);
1098  if (status != NO_ERRORS)
1099  {
1100  er_log_debug (ARG_FILE_LINE, "css_connection_handler_thread: css_read_and_queue() error\n");
1101  break;
1102  }
1103  else
1104  {
1105  /* if new command request has arrived, make new job and add it to job queue */
1106  if (type == COMMAND_TYPE)
1107  {
1108  // push new task
1109  css_push_server_task (*conn);
1110  }
1111  }
1112  }
1113  }
1114 
1115  /* check the connection and call connection error handler */
1116  if (status != NO_ERRORS || css_check_conn (conn) != NO_ERROR)
1117  {
1119  "css_connection_handler_thread: status %d conn { status %d transaction_id %d "
1120  "db_error %d stop_talk %d stop_phase %d }\n", status, conn->status, conn->get_tran_index (),
1121  conn->db_error, conn->stop_talk, conn->stop_phase);
1122  rv = pthread_mutex_lock (&thread_p->tran_index_lock);
1123  (*css_Connection_error_handler) (thread_p, conn);
1124  }
1125  else
1126  {
1127  assert (thread_p->shutdown == true || conn->stop_talk == true);
1128  }
1129 
1130  return 0;
1131 }
1132 
1133 /*
1134  * css_block_all_active_conn() - Before shutdown, stop all server threads
1135  * return:
1136  *
1137  * Note: All communication will be stopped
1138  */
1139 void
1140 css_block_all_active_conn (unsigned short stop_phase)
1141 {
1142  CSS_CONN_ENTRY *conn;
1143  int r;
1144 
1146 
1147  for (conn = css_Active_conn_anchor; conn != NULL; conn = conn->next)
1148  {
1149  r = rmutex_lock (NULL, &conn->rmutex);
1150  assert (r == NO_ERROR);
1151 
1152  if (conn->stop_phase != stop_phase)
1153  {
1154  r = rmutex_unlock (NULL, &conn->rmutex);
1155  assert (r == NO_ERROR);
1156  continue;
1157  }
1158  css_end_server_request (conn);
1159  if (!IS_INVALID_SOCKET (conn->fd) && conn->fd != css_Pipe_to_master)
1160  {
1161  conn->stop_talk = true;
1162  logtb_set_tran_index_interrupt (NULL, conn->get_tran_index (), 1);
1163  }
1164 
1165  r = rmutex_unlock (NULL, &conn->rmutex);
1166  assert (r == NO_ERROR);
1167  }
1168 
1170 }
1171 
1172 /*
1173  * css_internal_connection_handler() -
1174  * return:
1175  * conn(in):
1176  *
1177  * Note: This routine is "registered" to be called when a new connection is requested by the client
1178  */
1179 static css_error_code
1181 {
1183 
1184  // push connection handler task
1185  cubthread::get_manager ()->push_task (css_Connection_worker_pool, new css_connection_task (*conn));
1186 
1187  return NO_ERRORS;
1188 }
1189 
1190 /*
1191  * css_internal_request_handler() -
1192  * return:
1193  * arg(in):
1194  *
1195  * Note: This routine is "registered" to be called when a new request is
1196  * initiated by the client.
1197  *
1198  * To now support multiple concurrent requests from the same client,
1199  * check if a request is actually sent on the socket. If data was sent
1200  * (not a request), then just return and the scheduler will wake up the
1201  * thread that is blocking for data.
1202  */
1203 static int
1205 {
1206  unsigned short rid;
1207  unsigned int eid;
1208  int request, rc, size = 0;
1209  char *buffer = NULL;
1210  int local_tran_index;
1211  int status = CSS_UNPLANNED_SHUTDOWN;
1212 
1213  assert (thread_ref.conn_entry == &conn_ref);
1214 
1215  local_tran_index = thread_ref.tran_index;
1216 
1217  rc = css_receive_request (&conn_ref, &rid, &request, &size);
1218  if (rc == NO_ERRORS)
1219  {
1220  /* 1. change thread's transaction id to this connection's */
1221  thread_ref.tran_index = conn_ref.get_tran_index ();
1222 
1223  pthread_mutex_unlock (&thread_ref.tran_index_lock);
1224 
1225  if (size)
1226  {
1227  rc = css_receive_data (&conn_ref, rid, &buffer, &size, -1);
1228  if (rc != NO_ERRORS)
1229  {
1230  return status;
1231  }
1232  }
1233 
1234  conn_ref.db_error = 0; /* This will reset the error indicator */
1235 
1236  eid = css_return_eid_from_conn (&conn_ref, rid);
1237  /* 2. change thread's client, rid, tran_index for this request */
1238  css_set_thread_info (&thread_ref, conn_ref.client_id, eid, conn_ref.get_tran_index (), request);
1239 
1240  /* 3. Call server_request() function */
1241  status = css_Server_request_handler (&thread_ref, eid, request, size, buffer);
1242 
1243  /* 4. reset thread transaction id(may be NULL_TRAN_INDEX) */
1244  css_set_thread_info (&thread_ref, -1, 0, local_tran_index, -1);
1245  }
1246  else
1247  {
1248  pthread_mutex_unlock (&thread_ref.tran_index_lock);
1249 
1250  if (rc == ERROR_WHEN_READING_SIZE || rc == NO_DATA_AVAILABLE)
1251  {
1252  status = CSS_NO_ERRORS;
1253  }
1254  }
1255 
1256  return status;
1257 }
1258 
1259 /*
1260  * css_initialize_server_interfaces() - initialize the server interfaces
1261  * return:
1262  * request_handler(in):
1263  * thrd(in):
1264  * eid(in):
1265  * request(in):
1266  * size(in):
1267  * buffer(in):
1268  */
1269 void
1270 css_initialize_server_interfaces (int (*request_handler) (THREAD_ENTRY * thrd, unsigned int eid, int request,
1271  int size, char *buffer),
1272  CSS_THREAD_FN connection_error_function)
1273 {
1274  css_Server_request_handler = request_handler;
1275  css_register_handler_routines (css_internal_connection_handler, NULL /* disabled */ , connection_error_function);
1276 }
1277 
1278 bool
1280 {
1282 }
1283 
1284 void
1286 {
1288 }
1289 
1290 /*
1291  * css_init() -
1292  * return:
1293  * thread_p(in):
1294  * server_name(in):
1295  * name_length(in):
1296  * port_id(in):
1297  *
1298  * Note: This routine is the entry point for the server interface. Once this
1299  * routine is called, control will not return to the caller until the
1300  * server/scheduler is stopped. Please call
1301  * css_initialize_server_interfaces before calling this function.
1302  */
1303 int
1304 css_init (THREAD_ENTRY * thread_p, char *server_name, int name_length, int port_id)
1305 {
1306  CSS_CONN_ENTRY *conn;
1307  int status = NO_ERROR;
1308 
1309  if (server_name == NULL || port_id <= 0)
1310  {
1311  return ER_FAILED;
1312  }
1313 
1314 #if defined(WINDOWS)
1315  if (css_windows_startup () < 0)
1316  {
1317  fprintf (stderr, "Winsock startup error\n");
1318  return ER_FAILED;
1319  }
1320 #endif /* WINDOWS */
1321 
1322  // initialize worker pool for server requests
1323  const std::size_t MAX_WORKERS = css_get_max_conn () + 1; // = css_Num_max_conn in connection_sr.c
1324  const std::size_t MAX_TASK_COUNT = 2 * MAX_WORKERS; // not that it matters...
1325  const std::size_t MAX_CONNECTIONS = css_get_max_conn () + 1;
1326 
1327  // create request worker pool
1328  css_Server_request_worker_pool =
1329  cubthread::get_manager ()->create_worker_pool (MAX_WORKERS, MAX_TASK_COUNT, "transaction workers", NULL,
1335  if (css_Server_request_worker_pool == NULL)
1336  {
1337  assert (false);
1339  status = ER_FAILED;
1340  goto shutdown;
1341  }
1342 
1343  // create connection worker pool
1344  css_Connection_worker_pool =
1345  cubthread::get_manager ()->create_worker_pool (MAX_CONNECTIONS, MAX_CONNECTIONS, "connection threads", NULL, 1,
1350  if (css_Connection_worker_pool == NULL)
1351  {
1352  assert (false);
1354  status = ER_FAILED;
1355  goto shutdown;
1356  }
1357 
1359 
1360  conn = css_connect_to_master_server (port_id, server_name, name_length);
1361  if (conn != NULL)
1362  {
1363  /* insert conn into active conn list */
1365 
1366  css_Master_server_name = strdup (server_name);
1367  css_Master_port_id = port_id;
1368  css_Pipe_to_master = conn->fd;
1369  css_Master_conn = conn;
1370 
1371 #if !defined(WINDOWS)
1372  if (!HA_DISABLED ())
1373  {
1374  status = hb_register_to_master (css_Master_conn, HB_PTYPE_SERVER);
1375  if (status != NO_ERROR)
1376  {
1377  fprintf (stderr, "failed to heartbeat register.\n");
1378  }
1379  }
1380 #endif
1381 
1382  if (status == NO_ERROR)
1383  {
1384  // server message loop
1386  }
1387  }
1388 
1389 shutdown:
1390  /*
1391  * start to shutdown server
1392  */
1394 
1395  // stop threads; in first phase we need to stop active workers, but keep log writers for a while longer to make sure
1396  // all log is transferred
1398 
1399  /* stop vacuum threads. */
1400  vacuum_stop_workers (thread_p);
1401 
1402  // stop load sessions
1404 
1405  /* we should flush all append pages before stop log writer */
1406  logpb_force_flush_pages (thread_p);
1407 
1408 #if !defined(NDEBUG)
1409  /* All active transaction and vacuum workers should have been stopped. Only system transactions are still running. */
1411 #endif
1412 
1413  // stop log writers
1415 
1417  {
1418  perfmon_er_log_current_stats (thread_p);
1419  }
1420  css_Server_request_worker_pool->er_log_stats ();
1421  css_Connection_worker_pool->er_log_stats ();
1422 
1423  // destroy thread worker pools
1424  thread_get_manager ()->destroy_worker_pool (css_Server_request_worker_pool);
1425  thread_get_manager ()->destroy_worker_pool (css_Connection_worker_pool);
1426 
1427  if (!HA_DISABLED ())
1428  {
1430  }
1431 
1433  {
1435  }
1436 
1437  /* If this was opened for the new style connection protocol, make sure it gets closed. */
1439 
1440 #if defined(WINDOWS)
1442 #endif /* WINDOWS */
1443 
1444  return status;
1445 }
1446 
1447 /*
1448  * css_send_data_to_client() - send a data buffer to the server
1449  * return:
1450  * eid(in): enquiry id
1451  * buffer(in): data buffer to queue for expected data.
1452  * buffer_size(in): size of data buffer
1453  *
1454  * Note: This is to be used ONLY by the server to return data to the client
1455  */
1456 unsigned int
1457 css_send_data_to_client (CSS_CONN_ENTRY * conn, unsigned int eid, char *buffer, int buffer_size)
1458 {
1459  int rc = 0;
1460 
1461  assert (conn != NULL);
1462 
1463  rc = css_send_data (conn, CSS_RID_FROM_EID (eid), buffer, buffer_size);
1464  return (rc == NO_ERRORS) ? 0 : rc;
1465 }
1466 
1467 /*
1468  * css_send_reply_and_data_to_client() - send a reply to the server,
1469  * and optionaly, an additional data
1470  * buffer
1471  * return:
1472  * eid(in): enquiry id
1473  * reply(in): the reply data (error or no error)
1474  * reply_size(in): the size of the reply data.
1475  * buffer(in): data buffer to queue for expected data.
1476  * buffer_size(in): size of data buffer
1477  *
1478  * Note: This is to be used only by the server
1479  */
1480 unsigned int
1481 css_send_reply_and_data_to_client (CSS_CONN_ENTRY * conn, unsigned int eid, char *reply, int reply_size, char *buffer,
1482  int buffer_size)
1483 {
1484  int rc = 0;
1485 
1486  assert (conn != NULL);
1487 
1488  if (buffer_size > 0 && buffer != NULL)
1489  {
1490  rc = css_send_two_data (conn, CSS_RID_FROM_EID (eid), reply, reply_size, buffer, buffer_size);
1491  }
1492  else
1493  {
1494  rc = css_send_data (conn, CSS_RID_FROM_EID (eid), reply, reply_size);
1495  }
1496 
1497  return (rc == NO_ERRORS) ? NO_ERROR : rc;
1498 }
1499 
#if 0
/*
 * css_send_reply_and_large_data_to_client() - send a reply to a client and, optionally, an additional
 *                                             large data buffer
 *   return: NO_ERROR when the css send call reports NO_ERRORS, ER_OUT_OF_VIRTUAL_MEMORY when one of the
 *           bookkeeping arrays cannot be allocated, otherwise the code returned by the css send call
 *   eid(in): enquiry id; supplies both the connection slot (CSS_ENTRYID_FROM_EID) and the request id
 *   reply(in): the reply data (error or no error)
 *   reply_size(in): the size of the reply data
 *   buffer(in): optional large data buffer; sent only when it is non-NULL and buffer_size is positive
 *   buffer_size(in): size of the large data buffer
 *
 * Note: This is to be used only by the server.
 *       The large buffer is handed to css_send_large_data () as consecutive pieces of at most INT_MAX bytes.
 *       This code is compiled out (#if 0).
 */
unsigned int
css_send_reply_and_large_data_to_client (unsigned int eid, char *reply, int reply_size, char *buffer, INT64 buffer_size)
{
  CSS_CONN_ENTRY *conn;
  int rc = 0;
  int idx = CSS_ENTRYID_FROM_EID (eid);
  int num_buffers;
  char **buffers;
  int *buffers_size, i;
  INT64 pos = 0;
  INT64 remaining;

  conn = &css_Conn_array[idx];
  if (buffer_size > 0 && buffer != NULL)
    {
      /* slot 0 carries the reply; the rest is one slot per piece of at most INT_MAX bytes */
      num_buffers = 1 + (int) (buffer_size / INT_MAX) + ((buffer_size % INT_MAX) != 0 ? 1 : 0);

      buffers = (char **) malloc (sizeof (char *) * num_buffers);
      if (buffers == NULL)
	{
	  er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_OUT_OF_VIRTUAL_MEMORY, 1, sizeof (char *) * num_buffers);
	  return ER_OUT_OF_VIRTUAL_MEMORY;
	}

      buffers_size = (int *) malloc (sizeof (int) * num_buffers);
      if (buffers_size == NULL)
	{
	  free (buffers);
	  er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_OUT_OF_VIRTUAL_MEMORY, 1, sizeof (int) * num_buffers);
	  return ER_OUT_OF_VIRTUAL_MEMORY;
	}

      buffers[0] = reply;
      buffers_size[0] = reply_size;

      /*
       * Track how many bytes are still unassigned. Comparing the untouched total against INT_MAX on every
       * pass would give each piece of an oversized buffer the size INT_MAX and run past the end of buffer.
       */
      remaining = buffer_size;
      for (i = 1; i < num_buffers; i++)
	{
	  buffers[i] = &buffer[pos];
	  buffers_size[i] = (remaining > INT_MAX) ? INT_MAX : (int) remaining;
	  pos += buffers_size[i];
	  remaining -= buffers_size[i];
	}

      rc = css_send_large_data (conn, CSS_RID_FROM_EID (eid), (const char **) buffers, buffers_size, num_buffers);

      free (buffers);
      free (buffers_size);
    }
  else
    {
      rc = css_send_data (conn, CSS_RID_FROM_EID (eid), reply, reply_size);
    }

  return (rc == NO_ERRORS) ? NO_ERROR : rc;
}
#endif
1576 
1577 /*
1578  * css_send_reply_and_2_data_to_client() - send a reply to the server,
1579  * and optionaly, an additional data
1580  * buffer
1581  * return:
1582  * eid(in): enquiry id
1583  * reply(in): the reply data (error or no error)
1584  * reply_size(in): the size of the reply data.
1585  * buffer1(in): data buffer to queue for expected data.
1586  * buffer1_size(in): size of data buffer
1587  * buffer2(in): data buffer to queue for expected data.
1588  * buffer2_size(in): size of data buffer
1589  *
1590  * Note: This is to be used only by the server
1591  */
1592 unsigned int
1593 css_send_reply_and_2_data_to_client (CSS_CONN_ENTRY * conn, unsigned int eid, char *reply, int reply_size,
1594  char *buffer1, int buffer1_size, char *buffer2, int buffer2_size)
1595 {
1596  int rc = 0;
1597 
1598  assert (conn != NULL);
1599 
1600  if (buffer2 == NULL || buffer2_size <= 0)
1601  {
1602  return (css_send_reply_and_data_to_client (conn, eid, reply, reply_size, buffer1, buffer1_size));
1603  }
1604  rc =
1605  css_send_three_data (conn, CSS_RID_FROM_EID (eid), reply, reply_size, buffer1, buffer1_size, buffer2, buffer2_size);
1606 
1607  return (rc == NO_ERRORS) ? 0 : rc;
1608 }
1609 
1610 /*
1611  * css_send_reply_and_3_data_to_client() - send a reply to the server,
1612  * and optionaly, an additional data
1613  * buffer
1614  * return:
1615  * eid(in): enquiry id
1616  * reply(in): the reply data (error or no error)
1617  * reply_size(in): the size of the reply data.
1618  * buffer1(in): data buffer to queue for expected data.
1619  * buffer1_size(in): size of data buffer
1620  * buffer2(in): data buffer to queue for expected data.
1621  * buffer2_size(in): size of data buffer
1622  * buffer3(in): data buffer to queue for expected data.
1623  * buffer3_size(in): size of data buffer
1624  *
1625  * Note: This is to be used only by the server
1626  */
1627 unsigned int
1628 css_send_reply_and_3_data_to_client (CSS_CONN_ENTRY * conn, unsigned int eid, char *reply, int reply_size,
1629  char *buffer1, int buffer1_size, char *buffer2, int buffer2_size, char *buffer3,
1630  int buffer3_size)
1631 {
1632  int rc = 0;
1633 
1634  assert (conn != NULL);
1635 
1636  if (buffer3 == NULL || buffer3_size <= 0)
1637  {
1638  return (css_send_reply_and_2_data_to_client (conn, eid, reply, reply_size, buffer1, buffer1_size, buffer2,
1639  buffer2_size));
1640  }
1641 
1642  rc = css_send_four_data (conn, CSS_RID_FROM_EID (eid), reply, reply_size, buffer1, buffer1_size, buffer2,
1643  buffer2_size, buffer3, buffer3_size);
1644 
1645  return (rc == NO_ERRORS) ? 0 : rc;
1646 }
1647 
1648 /*
1649  * css_send_error_to_client() - send an error buffer to the server
1650  * return:
1651  * conn(in): connection entry
1652  * eid(in): enquiry id
1653  * buffer(in): data buffer to queue for expected data.
1654  * buffer_size(in): size of data buffer
1655  *
1656  * Note: This is to be used ONLY by the server to return error data to the
1657  * client.
1658  */
1659 unsigned int
1660 css_send_error_to_client (CSS_CONN_ENTRY * conn, unsigned int eid, char *buffer, int buffer_size)
1661 {
1662  int rc;
1663 
1664  assert (conn != NULL);
1665 
1666  rc = css_send_error (conn, CSS_RID_FROM_EID (eid), buffer, buffer_size);
1667 
1668  return (rc == NO_ERRORS) ? 0 : rc;
1669 }
1670 
1671 /*
1672  * css_send_abort_to_client() - send an abort message to the client
1673  * return:
1674  * eid(in): enquiry id
1675  */
1676 unsigned int
1677 css_send_abort_to_client (CSS_CONN_ENTRY * conn, unsigned int eid)
1678 {
1679  int rc = 0;
1680 
1681  assert (conn != NULL);
1682 
1683  rc = css_send_abort_request (conn, CSS_RID_FROM_EID (eid));
1684 
1685  return (rc == NO_ERRORS) ? 0 : rc;
1686 }
1687 
1688 /*
1689  * css_test_for_client_errors () -
1690  * return: error id from the client
1691  * conn(in):
1692  * eid(in):
1693  */
1694 static int
1695 css_test_for_client_errors (CSS_CONN_ENTRY * conn, unsigned int eid)
1696 {
1697  char *error_buffer;
1698  int error_size, rc, errid = NO_ERROR;
1699 
1700  assert (conn != NULL);
1701 
1702  if (css_return_queued_error (conn, CSS_RID_FROM_EID (eid), &error_buffer, &error_size, &rc))
1703  {
1704  errid = er_set_area_error (error_buffer);
1705  free_and_init (error_buffer);
1706  }
1707  return errid;
1708 }
1709 
1710 /*
1711  * css_receive_data_from_client() - return data that was sent by the server
1712  * return:
1713  * eid(in): enquiry id
1714  * buffer(out): data buffer to send to client.
1715  * buffer_size(out): size of data buffer
1716  *
1717  * note: caller should know that it returns zero on success and
1718  * returns css error code on failure
1719  */
1720 unsigned int
1721 css_receive_data_from_client (CSS_CONN_ENTRY * conn, unsigned int eid, char **buffer, int *size)
1722 {
1723  return css_receive_data_from_client_with_timeout (conn, eid, buffer, size, -1);
1724 }
1725 
1726 /*
1727  * css_receive_data_from_client_with_timeout() - return data that was sent by the server
1728  * return:
1729  * eid(in): enquiry id
1730  * buffer(out): data buffer to send to client.
1731  * buffer_size(out): size of data buffer
1732  * timeout(in): timeout in seconds
1733  *
1734  * note: caller should know that it returns zero on success and
1735  * returns css error code on failure
1736  */
1737 unsigned int
1738 css_receive_data_from_client_with_timeout (CSS_CONN_ENTRY * conn, unsigned int eid, char **buffer, int *size,
1739  int timeout)
1740 {
1741  int rc = 0;
1742 
1743  assert (conn != NULL);
1744 
1745  *size = 0;
1746 
1747  rc = css_receive_data (conn, CSS_RID_FROM_EID (eid), buffer, size, timeout);
1748 
1749  if (rc == NO_ERRORS || rc == RECORD_TRUNCATED)
1750  {
1751  css_test_for_client_errors (conn, eid);
1752  return 0;
1753  }
1754 
1755  return rc;
1756 }
1757 
1758 /*
1759  * css_end_server_request() - terminates the request from the client
1760  * return:
1761  * conn(in/out):
1762  */
1763 void
1765 {
1766  int r;
1767 
1768  r = rmutex_lock (NULL, &conn->rmutex);
1769  assert (r == NO_ERROR);
1770 
1772  conn->status = CONN_CLOSING;
1773 
1774  r = rmutex_unlock (NULL, &conn->rmutex);
1775  assert (r == NO_ERROR);
1776 }
1777 
1778 /*
1779  * css_pack_server_name() -
1780  * return: a new string containing the server name and the database version
1781  * string
1782  * server_name(in): the name of the database volume
1783  * name_length(out): returned size of the server_name
1784  *
1785  * Note: Builds a character buffer with three embedded strings: the database
1786  * volume name, a string containing the release identifier, and the
1787  * CUBRID environment variable (if exists)
1788  */
1789 char *
1790 css_pack_server_name (const char *server_name, int *name_length)
1791 {
1792  char *packed_name = NULL;
1793  const char *env_name = NULL;
1794  char pid_string[16], *s;
1795  const char *t;
1796 
1797  if (server_name != NULL)
1798  {
1799  env_name = envvar_root ();
1800  if (env_name == NULL)
1801  {
1802  return NULL;
1803  }
1804 
1805  /*
1806  * here we changed the 2nd string in packed_name from
1807  * rel_release_string() to rel_major_release_string()
1808  * solely for the purpose of matching the name of the cubrid driver.
1809  * That is, the name of the cubrid driver has been changed to use
1810  * MAJOR_RELEASE_STRING (see drivers/Makefile). So, here we must also
1811  * use rel_major_release_string(), so master can successfully find and
1812  * fork cubrid drivers.
1813  */
1814 
1815  sprintf (pid_string, "%d", getpid ());
1816  *name_length =
1817  (int) (strlen (server_name) + 1 + strlen (rel_major_release_string ()) + 1 + strlen (env_name) + 1 +
1818  strlen (pid_string) + 1);
1819 
1820  /* in order to prepend '#' */
1821  if (!HA_DISABLED ())
1822  {
1823  (*name_length)++;
1824  }
1825 
1826  packed_name = (char *) malloc (*name_length);
1827  if (packed_name == NULL)
1828  {
1829  er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_OUT_OF_VIRTUAL_MEMORY, 1, (size_t) (*name_length));
1830  return NULL;
1831  }
1832 
1833  s = packed_name;
1834  t = server_name;
1835 
1836  if (!HA_DISABLED ())
1837  {
1838  *s++ = '#';
1839  }
1840 
1841  while (*t)
1842  {
1843  *s++ = *t++;
1844  }
1845  *s++ = '\0';
1846 
1847  t = rel_major_release_string ();
1848  while (*t)
1849  {
1850  *s++ = *t++;
1851  }
1852  *s++ = '\0';
1853 
1854  t = env_name;
1855  while (*t)
1856  {
1857  *s++ = *t++;
1858  }
1859  *s++ = '\0';
1860 
1861  t = pid_string;
1862  while (*t)
1863  {
1864  *s++ = *t++;
1865  }
1866  *s++ = '\0';
1867  }
1868  return packed_name;
1869 }
1870 
1871 /*
1872  * css_add_client_version_string() - add the version_string to socket queue
1873  * entry structure
1874  * return: pointer to version_string in the socket queue entry structure
1875  * version_string(in):
1876  */
1877 char *
1878 css_add_client_version_string (THREAD_ENTRY * thread_p, const char *version_string)
1879 {
1880  char *ver_str = NULL;
1881  CSS_CONN_ENTRY *conn;
1882 
1883  assert (thread_p != NULL);
1884 
1885  conn = thread_p->conn_entry;
1886  if (conn != NULL)
1887  {
1888  if (conn->version_string == NULL)
1889  {
1890  ver_str = (char *) malloc (strlen (version_string) + 1);
1891  if (ver_str != NULL)
1892  {
1893  strcpy (ver_str, version_string);
1894  conn->version_string = ver_str;
1895  }
1896  else
1897  {
1899  (size_t) (strlen (version_string) + 1));
1900  }
1901  }
1902  else
1903  {
1904  /* already registered */
1905  ver_str = conn->version_string;
1906  }
1907  }
1908 
1909  return ver_str;
1910 }
1911 
#if defined (ENABLE_UNUSED_FUNCTION)
/*
 * css_get_client_version_string() - fetch the version_string stored in the current connection entry
 *   return: version_string of the entry returned by css_get_current_conn_entry (), or NULL when there is
 *           no such entry
 */
char *
css_get_client_version_string (void)
{
  CSS_CONN_ENTRY *current_conn = css_get_current_conn_entry ();

  return (current_conn != NULL) ? current_conn->version_string : NULL;
}
#endif /* ENABLE_UNUSED_FUNCTION */
1934 
1935 /*
1936  * css_cleanup_server_queues () -
1937  * return:
1938  * eid(in):
1939  */
1940 void
1941 css_cleanup_server_queues (unsigned int eid)
1942 {
1943  int idx = CSS_ENTRYID_FROM_EID (eid);
1944 
1946 }
1947 
1948 /*
1949  * css_set_ha_num_of_hosts -
1950  * return: none
1951  *
1952  * Note: be careful to use
1953  */
1954 void
1956 {
1957  if (num < 1)
1958  {
1959  num = 1;
1960  }
1962  {
1964  }
1965  ha_Server_num_of_hosts = num - 1;
1966 }
1967 
1968 /*
1969  * css_get_ha_num_of_hosts -
1970  * return: return the number of hosts
1971  *
1972  * Note:
1973  */
1974 int
1976 {
1977  return ha_Server_num_of_hosts;
1978 }
1979 
1980 /*
1981  * css_ha_server_state - return the current HA server state
1982  * return: one of HA_SERVER_STATE
1983  */
1986 {
1987  return ha_Server_state;
1988 }
1989 
1990 bool
1992 {
1993  return ha_Repl_delay_detected;
1994 }
1995 
1996 void
1998 {
1999  ha_Repl_delay_detected = true;
2000 }
2001 
2002 void
2004 {
2005  ha_Repl_delay_detected = false;
2006 }
2007 
2008 /*
2009  * css_transit_ha_server_state - request to transit the current HA server
2010  * state to the required state
2011  * return: new state changed if successful or HA_SERVER_STATE_NA
2012  * req_state(in): the state for the server to transit
2013  *
2014  */
2015 static HA_SERVER_STATE
2017 {
2018  struct ha_server_state_transition_table
2019  {
2020  HA_SERVER_STATE cur_state;
2021  HA_SERVER_STATE req_state;
2022  HA_SERVER_STATE next_state;
2023  };
2024  static struct ha_server_state_transition_table ha_Server_state_transition[] = {
2025  /* idle -> active */
2026  {HA_SERVER_STATE_IDLE, HA_SERVER_STATE_ACTIVE, HA_SERVER_STATE_ACTIVE},
2027 #if 0
2028  /* idle -> to-be-standby */
2030 #else
2031  /* idle -> standby */
2032  {HA_SERVER_STATE_IDLE, HA_SERVER_STATE_STANDBY, HA_SERVER_STATE_STANDBY},
2033 #endif
2034  /* idle -> maintenance */
2035  {HA_SERVER_STATE_IDLE, HA_SERVER_STATE_MAINTENANCE, HA_SERVER_STATE_MAINTENANCE},
2036  /* active -> active */
2037  {HA_SERVER_STATE_ACTIVE, HA_SERVER_STATE_ACTIVE, HA_SERVER_STATE_ACTIVE},
2038  /* active -> to-be-standby */
2039  {HA_SERVER_STATE_ACTIVE, HA_SERVER_STATE_STANDBY, HA_SERVER_STATE_TO_BE_STANDBY},
2040  /* to-be-active -> active */
2041  {HA_SERVER_STATE_TO_BE_ACTIVE, HA_SERVER_STATE_ACTIVE, HA_SERVER_STATE_ACTIVE},
2042  /* standby -> standby */
2043  {HA_SERVER_STATE_STANDBY, HA_SERVER_STATE_STANDBY, HA_SERVER_STATE_STANDBY},
2044  /* standby -> to-be-active */
2046  /* standby -> maintenance */
2047  {HA_SERVER_STATE_STANDBY, HA_SERVER_STATE_MAINTENANCE, HA_SERVER_STATE_MAINTENANCE},
2048  /* to-be-standby -> standby */
2049  {HA_SERVER_STATE_TO_BE_STANDBY, HA_SERVER_STATE_STANDBY, HA_SERVER_STATE_STANDBY},
2050  /* maintenance -> standby */
2051  {HA_SERVER_STATE_MAINTENANCE, HA_SERVER_STATE_STANDBY, HA_SERVER_STATE_TO_BE_STANDBY},
2052  /* end of table */
2053  {HA_SERVER_STATE_NA, HA_SERVER_STATE_NA, HA_SERVER_STATE_NA}
2054  };
2055  struct ha_server_state_transition_table *table;
2056  HA_SERVER_STATE new_state = HA_SERVER_STATE_NA;
2057 
2058  if (ha_Server_state == req_state)
2059  {
2060  return req_state;
2061  }
2062 
2064 
2065  for (table = ha_Server_state_transition; table->cur_state != HA_SERVER_STATE_NA; table++)
2066  {
2067  if (table->cur_state == ha_Server_state && table->req_state == req_state)
2068  {
2069  er_log_debug (ARG_FILE_LINE, "css_transit_ha_server_state: " "ha_Server_state (%s) -> (%s)\n",
2071  new_state = table->next_state;
2072  /* append a dummy log record for LFT to wake LWTs up */
2073  log_append_ha_server_state (thread_p, new_state);
2074  if (!HA_DISABLED ())
2075  {
2078  }
2079  ha_Server_state = new_state;
2080  /* sync up the current HA state with the system parameter */
2082 
2084  {
2085  log_set_ha_promotion_time (thread_p, ((INT64) time (0)));
2087  }
2088 
2089  break;
2090  }
2091  }
2092 
2093  csect_exit (thread_p, CSECT_HA_SERVER_STATE);
2094  return new_state;
2095 }
2096 
2097 /*
2098  * css_check_ha_server_state_for_client
2099  * return: NO_ERROR or errno
2100  * whence(in): 0: others, 1: register_client, 2: unregister_client
2101  */
2102 int
2104 {
2105 #define FROM_OTHERS 0
2106 #define FROM_REGISTER_CLIENT 1
2107 #define FROM_UNREGISTER_CLIENT 2
2108  int err = NO_ERROR;
2110 
2111  /* csect_enter (thread_p, CSECT_HA_SERVER_STATE, INF_WAIT); */
2112 
2113  switch (ha_Server_state)
2114  {
2116  /* Server accepts clients even though it is in a to-be-active state */
2117  break;
2118 
2120  /*
2121  * If the server's state is 'to-be-standby',
2122  * new connection request will be rejected for HA fail-back action.
2123  */
2124  if (whence == FROM_REGISTER_CLIENT)
2125  {
2127  "Connection rejected. " "The server is changing to standby mode.");
2129  }
2130  /*
2131  * If all connected clients are released (by reset-on-commit),
2132  * change the state to 'standby' as a completion of HA fail-back action.
2133  */
2134  else if (whence == FROM_UNREGISTER_CLIENT)
2135  {
2136  if (logtb_count_clients (thread_p) == 1)
2137  {
2139  "logtb_count_clients () = 1 including me "
2140  "transit state from 'to-be-standby' to 'standby'\n");
2142  assert (state == HA_SERVER_STATE_STANDBY);
2143  if (state == HA_SERVER_STATE_STANDBY)
2144  {
2145  er_log_debug (ARG_FILE_LINE, "css_check_ha_server_state_for_client: " "logtb_disable_update() \n");
2146  logtb_disable_update (thread_p);
2147  }
2148  }
2149  }
2150  break;
2151 
2152  default:
2153  break;
2154  }
2155 
2156  /* csect_exit (CSECT_HA_SERVER_STATE); */
2157  return err;
2158 }
2159 
2160 /*
2161  * css_check_ha_log_applier_done - check all log appliers have done
2162  * return: true or false
2163  */
2164 static bool
2166 {
2167  int i;
2168 
2169  for (i = 0; i < ha_Server_num_of_hosts; i++)
2170  {
2171  if (ha_Log_applier_state[i].state != HA_LOG_APPLIER_STATE_DONE)
2172  {
2173  break;
2174  }
2175  }
2176  if (i == ha_Server_num_of_hosts
2178  {
2179  return true;
2180  }
2181  return false;
2182 }
2183 
2184 /*
2185  * css_check_ha_log_applier_working - check all log appliers are working
2186  * return: true or false
 *
 * Walks ha_Log_applier_state[0 .. ha_Server_num_of_hosts - 1] and stops at the first entry that fails the
 * loop test. Returns true only when no entry stopped the walk and the remainder of the final condition
 * holds.
 *
 * NOTE(review): in this listing the line carrying the function name and parameter list, and the second
 * line of the final condition, are absent, so that part of the contract cannot be described from here.
 *
 * NOTE(review): the loop test joins two '!=' comparisons of the same value with '||'. Assuming
 * HA_LOG_APPLIER_STATE_WORKING and HA_LOG_APPLIER_STATE_DONE are distinct values, the test is true for
 * every entry, so the loop breaks on its first iteration and the function can only reach "return true"
 * when ha_Server_num_of_hosts is 0 (or negative). '&&' looks like the intended operator - confirm before
 * changing, since callers currently run with this behavior.
2187  */
2188 static bool
2190 {
2191  int i;
2192 
2193  for (i = 0; i < ha_Server_num_of_hosts; i++)
2194  {
 /* NOTE(review): always true for distinct constants, see the header comment */
2195  if (ha_Log_applier_state[i].state != HA_LOG_APPLIER_STATE_WORKING
2196  || ha_Log_applier_state[i].state != HA_LOG_APPLIER_STATE_DONE)
2197  {
2198  break;
2199  }
2200  }
 /* i equals ha_Server_num_of_hosts only when the loop above ran to completion without a break */
2201  if (i == ha_Server_num_of_hosts
2203  {
2204  return true;
2205  }
2206  return false;
2207 }
2208 
2209 // *INDENT-OFF*
2210 /*
2211  * css_change_ha_server_state - change the server's HA state
2212  * return: NO_ERROR or ER_FAILED
2213  * state(in): new state for server to be
2214  * force(in): force to change
2215  * timeout(in): timeout (standby to maintenance)
2216  * heartbeat(in): from heartbeat master
2217  */
2218 int
2219 css_change_ha_server_state (THREAD_ENTRY * thread_p, HA_SERVER_STATE state, bool force, int timeout, bool heartbeat)
2220 {
2221  HA_SERVER_STATE orig_state;
2222  int i;
2223 
2224  er_log_debug (ARG_FILE_LINE, "css_change_ha_server_state: ha_Server_state %s " "state %s force %c heartbeat %c\n",
2226  (heartbeat ? 't' : 'f'));
2227 
2228  assert (state >= HA_SERVER_STATE_IDLE && state <= HA_SERVER_STATE_DEAD);
2229 
2230  if (state == ha_Server_state
2233  {
2234  return NO_ERROR;
2235  }
2236 
2237  if (heartbeat == false && !(ha_Server_state == HA_SERVER_STATE_STANDBY && state == HA_SERVER_STATE_MAINTENANCE)
2240  {
2241  return NO_ERROR;
2242  }
2243 
2245 
2246  orig_state = ha_Server_state;
2247 
2248  if (force)
2249  {
2250  if (ha_Server_state != state)
2251  {
2252  er_log_debug (ARG_FILE_LINE, "css_change_ha_server_state:" " set force from %s to state %s\n",
2255  /* append a dummy log record for LFT to wake LWTs up */
2256  log_append_ha_server_state (thread_p, state);
2257  if (!HA_DISABLED ())
2258  {
2261  }
2262 
2264  {
2265  log_set_ha_promotion_time (thread_p, ((INT64) time (0)));
2266  }
2267  }
2268  }
2269 
2270  switch (state)
2271  {
2274  if (state == HA_SERVER_STATE_NA)
2275  {
2276  break;
2277  }
2278  /* If log appliers have changed their state to done, go directly to active mode */
2280  {
2281  er_log_debug (ARG_FILE_LINE, "css_change_ha_server_state: " "css_check_ha_log_applier_done ()\n");
2283  assert (state == HA_SERVER_STATE_ACTIVE);
2284  }
2285  if (state == HA_SERVER_STATE_ACTIVE)
2286  {
2287  er_log_debug (ARG_FILE_LINE, "css_change_ha_server_state: " "logtb_enable_update() \n");
2288  logtb_enable_update (thread_p);
2289  }
2290  break;
2291 
2294  if (state == HA_SERVER_STATE_NA)
2295  {
2296  break;
2297  }
2298  if (orig_state == HA_SERVER_STATE_IDLE)
2299  {
2300  /* If all log appliers have done their recovering actions, go directly to standby mode */
2302  {
2303  er_log_debug (ARG_FILE_LINE, "css_change_ha_server_state: " "css_check_ha_log_applier_working ()\n");
2305  assert (state == HA_SERVER_STATE_STANDBY);
2306  }
2307  }
2308  else
2309  {
2310  /* If there's no active clients (except me), go directly to standby mode */
2311  if (logtb_count_clients (thread_p) == 0)
2312  {
2313  er_log_debug (ARG_FILE_LINE, "css_change_ha_server_state: " "logtb_count_clients () = 0\n");
2315  assert (state == HA_SERVER_STATE_STANDBY);
2316  }
2317  }
2318  if (orig_state == HA_SERVER_STATE_MAINTENANCE)
2319  {
2321  }
2322  if (state == HA_SERVER_STATE_STANDBY)
2323  {
2324  er_log_debug (ARG_FILE_LINE, "css_change_ha_server_state: " "logtb_disable_update() \n");
2325  logtb_disable_update (thread_p);
2326  }
2327  break;
2328 
2331  if (state == HA_SERVER_STATE_NA)
2332  {
2333  break;
2334  }
2335 
2336  if (state == HA_SERVER_STATE_MAINTENANCE)
2337  {
2338  er_log_debug (ARG_FILE_LINE, "css_change_ha_server_state: " "logtb_enable_update() \n");
2339  logtb_enable_update (thread_p);
2340 
2342  }
2343 
2344  for (i = 0; i < timeout; i++)
2345  {
2346  /* waiting timeout second while transaction terminated normally. */
2348  {
2349  break;
2350  }
2351  thread_sleep (1000); /* 1000 msec */
2352  }
2353 
2355  {
2356  LOG_TDES *tdes;
2357 
2358  /* try to kill transaction. */
2359  TR_TABLE_CS_ENTER (thread_p);
2360  // start from transaction index i = 1; system transaction cannot be killed
2361  for (i = 1; i < log_Gl.trantable.num_total_indices; i++)
2362  {
2363  tdes = log_Gl.trantable.all_tdes[i];
2364  if (tdes != NULL && tdes->trid != NULL_TRANID)
2365  {
2367  tdes->client.client_type))
2368  {
2369  logtb_slam_transaction (thread_p, tdes->tran_index);
2370  }
2371  }
2372  }
2373  TR_TABLE_CS_EXIT (thread_p);
2374 
2375  thread_sleep (2000); /* 2000 msec */
2376  }
2377  break;
2378 
2379  default:
2380  state = HA_SERVER_STATE_NA;
2381  break;
2382  }
2383 
2384  csect_exit (thread_p, CSECT_HA_SERVER_STATE);
2385 
2386  return (state != HA_SERVER_STATE_NA) ? NO_ERROR : ER_FAILED;
2387 }
2388 // *INDENT-ON*
2389 
2390 /*
2391  * css_notify_ha_log_applier_state - notify the log applier's HA state
2392  * return: NO_ERROR or ER_FAILED
2393  * state(in): new state to be recorded
2394  */
2395 int
2397 {
2399  HA_SERVER_STATE server_state;
2400  int i, client_id;
2401 
2403 
2405 
2406  client_id = css_get_client_id (thread_p);
2407  er_log_debug (ARG_FILE_LINE, "css_notify_ha_log_applier_state: client %d state %s\n", client_id,
2408  css_ha_applier_state_string (state));
2409  for (i = 0, table = ha_Log_applier_state; i < ha_Log_applier_state_num; i++, table++)
2410  {
2411  if (table->client_id == client_id)
2412  {
2413  if (table->state == state)
2414  {
2415  csect_exit (thread_p, CSECT_HA_SERVER_STATE);
2416  return NO_ERROR;
2417  }
2418  table->state = state;
2419  break;
2420  }
2422  {
2423  table->client_id = client_id;
2424  table->state = state;
2425  break;
2426  }
2427  }
2428  if (i == ha_Log_applier_state_num && ha_Log_applier_state_num < ha_Server_num_of_hosts)
2429  {
2430  table = &ha_Log_applier_state[ha_Log_applier_state_num++];
2431  table->client_id = client_id;
2432  table->state = state;
2433  }
2434 
2436  {
2437  er_log_debug (ARG_FILE_LINE, "css_notify_ha_log_applier_state: " "css_check_ha_log_applier_done()\n");
2438  server_state = css_transit_ha_server_state (thread_p, HA_SERVER_STATE_ACTIVE);
2439  assert (server_state == HA_SERVER_STATE_ACTIVE);
2440  if (server_state == HA_SERVER_STATE_ACTIVE)
2441  {
2442  er_log_debug (ARG_FILE_LINE, "css_notify_ha_log_applier_state: " "logtb_enable_update() \n");
2443  logtb_enable_update (thread_p);
2444  }
2445  }
2446 
2448  {
2449  er_log_debug (ARG_FILE_LINE, "css_notify_ha_log_applier_state: " "css_check_ha_log_applier_working()\n");
2450  server_state = css_transit_ha_server_state (thread_p, HA_SERVER_STATE_STANDBY);
2451  assert (server_state == HA_SERVER_STATE_STANDBY);
2452  if (server_state == HA_SERVER_STATE_STANDBY)
2453  {
2454  er_log_debug (ARG_FILE_LINE, "css_notify_ha_log_applier_state: " "logtb_disable_update() \n");
2455  logtb_disable_update (thread_p);
2456  }
2457  }
2458 
2459  csect_exit (thread_p, CSECT_HA_SERVER_STATE);
2460  return NO_ERROR;
2461 }
2462 
2463 #if defined(SERVER_MODE)
2464 static int
2466 {
2467 #if defined(WINDOWS) || defined(SOLARIS)
2468  int saddr_len;
2469 #elif defined(UNIXWARE7)
2470  size_t saddr_len;
2471 #else
2472  socklen_t saddr_len;
2473 #endif
2474  struct sockaddr_in clt_sock_addr;
2475  unsigned char *ip_addr;
2476  int err_code;
2477 
2478  saddr_len = sizeof (clt_sock_addr);
2479 
2480  if (getpeername (new_fd, (struct sockaddr *) &clt_sock_addr, &saddr_len) != 0)
2481  {
2482  return ER_FAILED;
2483  }
2484 
2485  ip_addr = (unsigned char *) &(clt_sock_addr.sin_addr);
2486 
2487  if (clt_sock_addr.sin_family == AF_UNIX
2488  || (ip_addr[0] == 127 && ip_addr[1] == 0 && ip_addr[2] == 0 && ip_addr[3] == 1))
2489  {
2490  return NO_ERROR;
2491  }
2492 
2493  if (css_Server_accessible_ip_info == NULL)
2494  {
2495  char ip_str[32];
2496 
2497  sprintf (ip_str, "%d.%d.%d.%d", (unsigned char) ip_addr[0], ip_addr[1], ip_addr[2], ip_addr[3]);
2498 
2500 
2501  return ER_INACCESSIBLE_IP;
2502  }
2503 
2505  err_code = css_check_ip (css_Server_accessible_ip_info, ip_addr);
2507 
2508  if (err_code != NO_ERROR)
2509  {
2510  char ip_str[32];
2511 
2512  sprintf (ip_str, "%d.%d.%d.%d", (unsigned char) ip_addr[0], ip_addr[1], ip_addr[2], ip_addr[3]);
2513 
2515  }
2516 
2517  return err_code;
2518 }
2519 
2520 int
2522 {
2523  int ret_val;
2524  IP_INFO *tmp_accessible_ip_info;
2525 
2527  {
2528  css_Server_accessible_ip_info = NULL;
2529  return NO_ERROR;
2530  }
2531 
2532 #if defined (WINDOWS)
2536 #else
2538 #endif
2539  {
2541  }
2542  else
2543  {
2546  }
2547 
2548  ret_val = css_read_ip_info (&tmp_accessible_ip_info, ip_list_file_name);
2549  if (ret_val == NO_ERROR)
2550  {
2552 
2553  if (css_Server_accessible_ip_info != NULL)
2554  {
2556  }
2557  css_Server_accessible_ip_info = tmp_accessible_ip_info;
2558 
2560  }
2561 
2562  return ret_val;
2563 }
2564 
2565 int
2567 {
2568  int ret_val;
2569 
2570  ret_val = css_free_ip_info (css_Server_accessible_ip_info);
2571  css_Server_accessible_ip_info = NULL;
2572 
2573  return ret_val;
2574 }
2575 
2576 void
2577 xacl_dump (THREAD_ENTRY * thread_p, FILE * outfp)
2578 {
2579  int i, j;
2580 
2581  if (outfp == NULL)
2582  {
2583  outfp = stdout;
2584  }
2585 
2586  fprintf (outfp, "access_ip_control=%s\n", (prm_get_bool_value (PRM_ID_ACCESS_IP_CONTROL) ? "yes" : "no"));
2587  fprintf (outfp, "access_ip_control_file=%s\n", (ip_list_file_name != NULL) ? ip_list_file_name : "NULL");
2588 
2589  if (prm_get_bool_value (PRM_ID_ACCESS_IP_CONTROL) == false || css_Server_accessible_ip_info == NULL)
2590  {
2591  return;
2592  }
2593 
2595 
2596  for (i = 0; i < css_Server_accessible_ip_info->num_list; i++)
2597  {
2598  int address_index = i * IP_BYTE_COUNT;
2599 
2600  for (j = 0; j < css_Server_accessible_ip_info->address_list[address_index]; j++)
2601  {
2602  fprintf (outfp, "%d%s", css_Server_accessible_ip_info->address_list[address_index + j + 1],
2603  ((j != 3) ? "." : ""));
2604  }
2605  if (j != 4)
2606  {
2607  fprintf (outfp, "*");
2608  }
2609  fprintf (outfp, "\n");
2610  }
2611 
2612  fprintf (outfp, "\n");
2613  csect_exit (thread_p, CSECT_ACL);
2614 
2615  return;
2616 }
2617 
2618 int
2619 xacl_reload (THREAD_ENTRY * thread_p)
2620 {
2621  return css_set_accessible_ip_info ();
2622 }
2623 #endif
2624 
2625 /*
2626  * css_get_client_id() - returns the unique client identifier
2627  * return: returns the unique client identifier, on error, returns -1
2628  *
2629  * Note: WARN: this function doesn't lock on thread_entry
2630  */
2631 int
2633 {
2634  CSS_CONN_ENTRY *conn_p;
2635 
2636  if (thread_p == NULL)
2637  {
2638  thread_p = thread_get_thread_entry_info ();
2639  }
2640 
2641  assert (thread_p != NULL);
2642 
2643  conn_p = thread_p->conn_entry;
2644  if (conn_p != NULL)
2645  {
2646  return conn_p->client_id;
2647  }
2648  else
2649  {
2650  return -1;
2651  }
2652 }
2653 
2654 /*
2655  * css_set_thread_info () -
2656  * return:
2657  * thread_p(out):
2658  * client_id(in):
2659  * rid(in):
2660  * tran_index(in):
2661  */
2662 void
2663 css_set_thread_info (THREAD_ENTRY * thread_p, int client_id, int rid, int tran_index, int net_request_index)
2664 {
2665  thread_p->client_id = client_id;
2666  thread_p->rid = rid;
2667  thread_p->tran_index = tran_index;
2668  thread_p->net_request_index = net_request_index;
2669  thread_p->victim_request_fail = false;
2670  thread_p->next_wait_thrd = NULL;
2671  thread_p->wait_for_latch_promote = false;
2672  thread_p->lockwait = NULL;
2673  thread_p->lockwait_state = -1;
2674  thread_p->query_entry = NULL;
2675  thread_p->tran_next_wait = NULL;
2676 
2677  thread_p->end_resource_tracks ();
2678  thread_clear_recursion_depth (thread_p);
2679 }
2680 
2681 /*
2682  * css_get_comm_request_id() - returns the request id that started the current thread
2683  * return: returns the comm system request id for the client request that
2684  * started the thread. On error, returns -1
2685  *
2686  * Note: WARN: this function doesn't lock on thread_entry
2687  */
2688 unsigned int
2690 {
2691  if (thread_p == NULL)
2692  {
2693  thread_p = thread_get_thread_entry_info ();
2694  }
2695 
2696  assert (thread_p != NULL);
2697 
2698  return thread_p->rid;
2699 }
2700 
2701 /*
2702  * css_get_current_conn_entry() -
2703  * return:
2704  */
2707 {
2708  THREAD_ENTRY *thread_p;
2709 
2710  thread_p = thread_get_thread_entry_info ();
2711  assert (thread_p != NULL);
2712 
2713  return thread_p->conn_entry;
2714 }
2715 
2716 // *INDENT-OFF*
2717 /*
2718  * css_push_server_task () - push a task on server request worker pool
2719  *
2720  * return : void
2721  * thread_ref (in) : thread context
2722  * task (in) : task to execute
2723  *
2724  * TODO: this is also used externally due to legacy design; should be internalized completely
2725  */
2726 static void
2728 {
2729  // push the task
2730  //
2731  // note: cores are partitioned by connection index. this is particularly important in order to avoid having tasks
2732  // randomly pushed to cores that are full. some of those tasks may belong to threads holding locks. as a
2733  // consequence, lock waiters may wait longer or even indefinitely if we are really unlucky.
2734  //
2735  conn_ref.add_pending_request ();
2736  thread_get_manager ()->push_task_on_core (css_Server_request_worker_pool, new css_server_task (conn_ref),
2737  static_cast<size_t> (conn_ref.idx));
2738 }
2739 
2740 void
2742 {
2743  thread_get_manager ()->push_task (css_Server_request_worker_pool, new css_server_external_task (conn, task));
2744 }
2745 
2746 void
2748 {
2749  m_conn.start_request ();
2750 
2751  thread_ref.conn_entry = &m_conn;
2752  session_state *session_p = thread_ref.conn_entry->session_p;
2753 
2754  if (session_p != NULL)
2755  {
2756  thread_ref.private_lru_index = session_get_private_lru_idx (session_p);
2757  }
2758  else
2759  {
2760  assert (thread_ref.private_lru_index == -1);
2761  }
2762 
2763  thread_ref.m_status = cubthread::entry::status::TS_RUN;
2764 
2765  // TODO: we lock tran_index_lock because css_internal_request_handler expects it to be locked. however, I am not
2766  // convinced we really need this
2767  pthread_mutex_lock (&thread_ref.tran_index_lock);
2768  (void) css_internal_request_handler (thread_ref, m_conn);
2769 
2770  thread_ref.conn_entry = NULL;
2771  thread_ref.m_status = cubthread::entry::status::TS_FREE;
2772 }
2773 
2774 void
2776 {
2777  thread_ref.conn_entry = m_conn;
2778 
2779  session_state *session_p = thread_ref.conn_entry != NULL ? thread_ref.conn_entry->session_p : NULL;
2780  if (session_p != NULL)
2781  {
2782  thread_ref.private_lru_index = session_get_private_lru_idx (session_p);
2783  }
2784  else
2785  {
2786  assert (thread_ref.private_lru_index == -1);
2787  }
2788 
2789  // TODO: We lock tran_index_lock because external task expects it to be locked.
2790  // However, I am not convinced we really need this
2791  pthread_mutex_lock (&thread_ref.tran_index_lock);
2792 
2793  m_task->execute (thread_ref);
2794 
2795  thread_ref.conn_entry = NULL;
2796 }
2797 
2798 void
2800 {
2801  thread_ref.conn_entry = &m_conn;
2802 
2803  // todo: we lock tran_index_lock because css_connection_handler_thread expects it to be locked. however, I am not
2804  // convinced we really need this
2805  pthread_mutex_lock (&thread_ref.tran_index_lock);
2806  (void) css_connection_handler_thread (&thread_ref, &m_conn);
2807 
2808  thread_ref.conn_entry = NULL;
2809 }
2810 
2811 //
2812 // css_stop_non_log_writer () - function mapped over worker pools to search and stop non-log writer workers
2813 //
2814 // thread_ref (in) : entry of thread to check and stop
2815 // stop_mapper (out) : ignored; part of expected signature of mapper function
2816 // stopper_thread_ref (in) : entry of thread mapping this function over worker pool
2817 //
2818 static void
2819 css_stop_non_log_writer (THREAD_ENTRY & thread_ref, bool & stop_mapper, THREAD_ENTRY & stopper_thread_ref)
2820 {
2821  (void) stop_mapper; // suppress unused warning
2822 
2823  // porting of legacy code
2824 
2825  if (css_is_log_writer (thread_ref))
2826  {
2827  // this is a log writer; it is not handled by this mapper
2828  return;
2829  }
2830  int tran_index = thread_ref.tran_index;
2831  if (tran_index == NULL_TRAN_INDEX)
2832  {
2833  // no transaction, no stop
2834  return;
2835  }
2836 
2837  (void) logtb_set_tran_index_interrupt (&stopper_thread_ref, tran_index, true);
2838 
2839  if (thread_ref.m_status == cubthread::entry::status::TS_WAIT && logtb_is_current_active (&thread_ref))
2840  {
2841  thread_lock_entry (&thread_ref);
2842 
2843  if (thread_ref.tran_index != NULL_TRAN_INDEX && thread_ref.m_status == cubthread::entry::status::TS_WAIT
2844  && thread_ref.lockwait == NULL && thread_ref.check_interrupt)
2845  {
2846  thread_ref.interrupted = true;
2848  }
2849  thread_unlock_entry (&thread_ref);
2850  }
2851  // make sure not blocked in locks
2852  lock_force_thread_timeout_lock (&thread_ref);
2853 }
2854 
2855 //
2856 // css_stop_log_writer () - function mapped over worker pools to search and stop log writer workers
2857 //
2858 // thread_ref (in) : entry of thread to check and stop
2859 // stop_mapper (out) : ignored; part of expected signature of mapper function
2860 // note: this mapper takes no stopper thread argument; only thread_ref and stop_mapper are passed
2861 //
2862 static void
2863 css_stop_log_writer (THREAD_ENTRY & thread_ref, bool & stop_mapper)
2864 {
2865  (void) stop_mapper; // suppress unused warning
2866 
2867  if (!css_is_log_writer (thread_ref))
2868  {
2869  // this is not log writer
2870  return;
2871  }
2872  if (thread_ref.tran_index == -1)
2873  {
2874  // no transaction, no stop
2875  return;
2876  }
2877  if (thread_ref.m_status == cubthread::entry::status::TS_WAIT && logtb_is_current_active (&thread_ref))
2878  {
2880  thread_ref.interrupted = true;
2881  }
2882  // make sure not blocked in locks
2883  lock_force_thread_timeout_lock (&thread_ref);
2884 }
2885 
2886 
2887 //
2888 // css_find_not_stopped () - find any target thread that is not stopped
2889 //
2890 // thread_ref (in) : entry of thread that should be stopped
2891 // stop_mapper (out) : output true to stop mapping
2892 // is_log_writer (in) : true to target log writers, false to target non-log writers
2893 // found (out) : output true if target thread is not stopped
2894 //
2895 static void
2896 css_find_not_stopped (THREAD_ENTRY & thread_ref, bool & stop_mapper, bool is_log_writer, bool & found)
2897 {
2898  if (thread_ref.conn_entry == NULL)
2899  {
2900  // no conn_entry => does not need stopping
2901  return;
2902  }
2903 
2904  if (is_log_writer != css_is_log_writer (thread_ref))
2905  {
2906  // don't care
2907  return;
2908  }
2909  if (thread_ref.m_status != cubthread::entry::status::TS_FREE)
2910  {
2911  found = true;
2912  stop_mapper = true;
2913  }
2914 }
2915 
2916 //
2917 // css_is_log_writer () - does thread entry belong to a log writer?
2918 //
2919 // return : true for log writer, false otherwise
2920 // thread_arg (in) : thread entry
2921 //
2922 static bool
2923 css_is_log_writer (const THREAD_ENTRY &thread_arg)
2924 {
2925  // note - access to thread entry is not exclusive and racing may occur
2926  volatile const css_conn_entry * connp = thread_arg.conn_entry;
2927  return connp != NULL && connp->stop_phase == THREAD_STOP_LOGWR;
2928 }
2929 
2930 //
2931 // css_stop_all_workers () - stop target workers based on phase (log writers or non-log writers)
2932 //
2933 // thread_ref (in) : thread local entry
2934 // stop_phase (in) : THREAD_STOP_WORKERS_EXCEPT_LOGWR or THREAD_STOP_LOGWR
2935 //
2936 static void
2938 {
2939  bool is_not_stopped;
2940 
2941  if (css_Server_request_worker_pool == NULL)
2942  {
2943  // nothing to stop
2944  return;
2945  }
2946 
2947  // note: this is legacy code ported from thread.c; the whole log writer management seems complicated, but hopefully
2948  // it can be removed after HA refactoring.
2949  //
2950  // question: is it possible to have more than one log writer thread?
2951  //
2952 
2953  if (stop_phase == THREAD_STOP_WORKERS_EXCEPT_LOGWR)
2954  {
2955  // first block all connections
2956  css_block_all_active_conn (stop_phase);
2957  }
2958 
2959  // loop until all are stopped
2960  while (true)
2961  {
2962  // tell all to stop
2963  if (stop_phase == THREAD_STOP_LOGWR)
2964  {
2965  css_Server_request_worker_pool->map_running_contexts (css_stop_log_writer);
2966  css_Connection_worker_pool->map_running_contexts (css_stop_log_writer);
2967  }
2968  else
2969  {
2970  css_Server_request_worker_pool->map_running_contexts (css_stop_non_log_writer, thread_ref);
2971  css_Connection_worker_pool->map_running_contexts (css_stop_non_log_writer, thread_ref);
2972  }
2973 
2974  // sleep for 50 milliseconds
2975  std::this_thread::sleep_for (std::chrono::milliseconds (50));
2976 
2977  // check if any thread is not stopped
2978  is_not_stopped = false;
2979  css_Server_request_worker_pool->map_running_contexts (css_find_not_stopped, stop_phase == THREAD_STOP_LOGWR,
2980  is_not_stopped);
2981  if (!is_not_stopped)
2982  {
2983  // check connection threads too
2984  css_Connection_worker_pool->map_running_contexts (css_find_not_stopped, stop_phase == THREAD_STOP_LOGWR,
2985  is_not_stopped);
2986  }
2987  if (!is_not_stopped)
2988  {
2989  // all threads are stopped, break loop
2990  break;
2991  }
2992 
2994  {
2995  er_log_debug (ARG_FILE_LINE, "could not stop all active workers");
2996  _exit (0);
2997  }
2998  }
2999 
3000  // we must not block active connection before terminating log writer thread
3001  if (stop_phase == THREAD_STOP_LOGWR)
3002  {
3003  css_block_all_active_conn (stop_phase);
3004  }
3005 }
3006 
3007 //
3008 // css_get_thread_stats () - get statistics for server request handlers
3009 //
3010 // stats_out (out) : output statistics
3011 //
3012 void
3013 css_get_thread_stats (UINT64 *stats_out)
3014 {
3015  css_Server_request_worker_pool->get_stats (stats_out);
3016 }
3017 
3018 //
3019 // css_get_num_request_workers () - get number of workers executing server requests
3020 //
3021 size_t
3023 {
3024  return css_Server_request_worker_pool->get_max_count ();
3025 }
3026 
3027 //
3028 // css_get_num_connection_workers () - get number of workers handling connections
3029 //
3030 size_t
3032 {
3033  return css_Connection_worker_pool->get_max_count ();
3034 }
3035 
3036 //
3037 // css_get_num_total_workers () - get total number of workers (request and connection handlers)
3038 //
3039 size_t
3041 {
3043 }
3044 
3045 //
3046 // css_wp_worker_get_busy_count_mapper () - function to map through worker pool entries and count busy workers
3047 //
3048 // thread_ref (in) : thread entry (context)
3049 // stop_mapper (in/out) : normally used to stop mapping early, ignored here
3050 // busy_count (out) : increment when busy worker is found
3051 //
3052 static void
3053 css_wp_worker_get_busy_count_mapper (THREAD_ENTRY & thread_ref, bool & stop_mapper, int & busy_count)
3054 {
3055  (void) stop_mapper; // suppress unused parameter warning
3056 
3057  if (thread_ref.tran_index != NULL_TRAN_INDEX)
3058  {
3059  // busy thread
3060  busy_count++;
3061  }
3062  else
3063  {
3064  // must be waiting for task; not busy
3065  }
3066 }
3067 
3068 //
3069 // css_wp_core_job_scan_mapper () - function to map worker pool cores and get info required for "job scan"
3070 //
3071 // wp_core (in) : worker pool core
3072 // stop_mapper (in/out) : output true to stop mapper early
3073 // thread_p (in) : thread entry of job scan
3074 // ctx (in) : job scan context
3075 // core_index (in/out) : current core index; is incremented on each call
3076 // error_code (out) : output error_code if any errors occur
3077 //
3078 static void
3080  THREAD_ENTRY * thread_p, SHOWSTMT_ARRAY_CONTEXT * ctx, size_t & core_index,
3081  int & error_code)
3082 {
3083  DB_VALUE *vals = showstmt_alloc_tuple_in_context (thread_p, ctx);
3084  if (vals == NULL)
3085  {
3086  assert (false);
3087  error_code = ER_FAILED;
3088  stop_mapper = true;
3089  return;
3090  }
3091 
3092  // add core index; it used to be job queue index
3093  size_t val_index = 0;
3094  (void) db_make_int (&vals[val_index++], (int) core_index);
3095 
3096  // add max worker count; it used to be max thread workers per job queue
3097  (void) db_make_int (&vals[val_index++], (int) wp_core.get_max_worker_count ());
3098 
3099  // number of busy workers; core does not keep it, we need to count them manually
3100  int busy_count = 0;
3101  wp_core.map_running_contexts (stop_mapper, css_wp_worker_get_busy_count_mapper, busy_count);
3102  (void) db_make_int (&vals[val_index++], (int) busy_count);
3103 
3104  // number of connection workers; just for backward compatibility, there are no connections workers here
3105  (void) db_make_int (&vals[val_index++], 0);
3106 
3107  // increment core_index
3108  ++core_index;
3109 
3110  assert (val_index == CSS_JOB_QUEUE_SCAN_COLUMN_COUNT);
3111 }
3112 
3113 //
3114 // css_is_any_thread_not_suspended_mapfunc
3115 //
3116 // thread_ref (in) : current thread entry
3117 // stop_mapper (out) : output true to stop mapper
3118 // count (out) : count number of threads
3119 // found (out) : output true when not suspended thread is found
3120 //
3121 static void
3122 css_is_any_thread_not_suspended_mapfunc (THREAD_ENTRY & thread_ref, bool & stop_mapper, size_t & count, bool & found)
3123 {
3124  if (thread_ref.m_status != cubthread::entry::status::TS_WAIT)
3125  {
3126  // found not suspended; stop
3127  stop_mapper = true;
3128  found = true;
3129  return;
3130  }
3131  ++count;
3132 }
3133 
3134 //
3135 // css_are_all_request_handlers_suspended - are all request handlers suspended?
3136 //
3137 bool
3139 {
3140  // assume all are suspended
3141  bool is_any_not_suspended = false;
3142  size_t checked_threads_count = 0;
3143 
3144  css_Server_request_worker_pool->map_running_contexts (css_is_any_thread_not_suspended_mapfunc, checked_threads_count,
3145  is_any_not_suspended);
3146  if (is_any_not_suspended)
3147  {
3148  // found a thread that was not suspended
3149  return false;
3150  }
3151 
3152  if (checked_threads_count == css_Server_request_worker_pool->get_max_count ())
3153  {
3154  // all threads are suspended
3155  return true;
3156  }
3157  else
3158  {
3159  // at least one thread is free
3160  return false;
3161  }
3162 }
3163 
3164 //
3165 // css_count_transaction_worker_threads_mapfunc () - mapper function for worker pool thread entries. tries to identify
3166 // entries belonging to given transaction/client and increment
3167 // counter
3168 //
3169 // thread_ref (in) : thread entry belonging to running worker
3170 // stop_mapper (out) : ignored
3171 // caller_thread (in) : thread entry of caller
3172 // tran_index (in) : transaction index
3173 // client_id (in) : client id
3174 // count (out) : increment counter if thread entry belongs to transaction/client
3175 //
3176 static void
3178  THREAD_ENTRY * caller_thread, int tran_index, int client_id,
3179  size_t & count)
3180 {
3181  (void) stop_mapper; // suppress unused parameter warning
3182 
3183  CSS_CONN_ENTRY *conn_p;
3184  bool does_belong = false;
3185 
3186  if (caller_thread == &thread_ref || thread_ref.type != TT_WORKER)
3187  {
3188  // not what we need
3189  return;
3190  }
3191 
3192  (void) pthread_mutex_lock (&thread_ref.tran_index_lock);
3193 
3194  if (!thread_ref.is_on_current_thread ()
3195  && thread_ref.m_status != cubthread::entry::status::TS_DEAD
3196  && thread_ref.m_status != cubthread::entry::status::TS_FREE
3197  && thread_ref.m_status != cubthread::entry::status::TS_CHECK)
3198  {
3199  conn_p = thread_ref.conn_entry;
3200  if (tran_index == NULL_TRAN_INDEX)
3201  {
3202  // exact match client ID is required
3203  does_belong = (conn_p != NULL && conn_p->client_id == client_id);
3204  }
3205  else if (tran_index == thread_ref.tran_index)
3206  {
3207  // match client ID or null connection
3208  does_belong = (conn_p == NULL || conn_p->client_id == client_id);
3209  }
3210  }
3211 
3212  pthread_mutex_unlock (&thread_ref.tran_index_lock);
3213 
3214  if (does_belong)
3215  {
3216  count++;
3217  }
3218 }
3219 
3220 //
3221 // css_count_transaction_worker_threads () - count thread entries belonging to transaction/client (exclude current
3222 // thread)
3223 //
3224 // return : thread entries count
3225 // thread_p (in) : thread entry of caller
3226 // tran_index (in) : transaction index
3227 // client_id (in) : client id
3228 //
3229 size_t
3231 {
3232  size_t count = 0;
3233 
3234  css_Server_request_worker_pool->map_running_contexts (css_count_transaction_worker_threads_mapfunc, thread_p,
3235  tran_index, client_id, count);
3236 
3237  return count;
3238 }
3239 
3240 static bool
3242 {
3244 }
3245 
3248 {
3249  // todo: need infinite timeout
3250  return
3252 }
3253 
3254 static bool
3256 {
3258 }
3259 
3262 {
3263  // todo: need infinite timeout
3265 }
3266 
3267 static void
3269 {
3270  if (css_Connection_worker_pool == NULL || css_Server_request_worker_pool == NULL)
3271  {
3272  // not started yet
3273  return;
3274  }
3275 
3276  // start if pooling is configured
3277  using clock_type = std::chrono::system_clock;
3278  clock_type::time_point start_time = clock_type::now ();
3279 
3280  bool start_connections = css_get_connection_thread_pooling_configuration ();
3282 
3283  if (start_connections)
3284  {
3285  css_Connection_worker_pool->start_all_workers ();
3286  }
3287  if (start_workers)
3288  {
3289  css_Server_request_worker_pool->start_all_workers ();
3290  }
3291 
3292  clock_type::time_point end_time = clock_type::now ();
3294  "css_start_all_threads: \n"
3295  "\tstarting connection threads: %s\n"
3296  "\tstarting transaction workers: %s\n"
3297  "\telapsed time: %lld microseconds",
3298  start_connections ? "true" : "false",
3299  start_workers ? "true" : "false",
3300  std::chrono::duration_cast<std::chrono::microseconds> (end_time - start_time).count ());
3301 }
3302 // *INDENT-ON*
static void css_stop_log_writer(THREAD_ENTRY &thread_ref, bool &)
#define NUM_NORMAL_TRANS
int css_free_accessible_ip_info(void)
cubthread::entry_task * m_task
const char * envvar_root(void)
size_t css_get_num_request_workers(void)
int status
unsigned char address_list[ACL_MAX_IP_COUNT *IP_BYTE_COUNT]
Definition: broker_shm.h:202
unsigned int css_send_data_to_client(CSS_CONN_ENTRY *conn, unsigned int eid, char *buffer, int buffer_size)
#define FROM_UNREGISTER_CLIENT
static void css_wp_worker_get_busy_count_mapper(THREAD_ENTRY &thread_ref, bool &stop_mapper, int &busy_count)
int tran_index
Definition: log_impl.h:465
css_error_code
cubthread::entry * thread_get_thread_entry_info(void)
#define NO_ERROR
Definition: error_code.h:46
THREAD_RET_T THREAD_CALLING_CONVENTION css_master_thread(void)
static bool css_check_ha_log_applier_done(void)
static int css_read_header(CSS_CONN_ENTRY *conn, NET_HEADER *local_header)
css_server_external_task(CSS_CONN_ENTRY *conn, cubthread::entry_task *task)
static void css_send_reply_to_new_client_request(CSS_CONN_ENTRY *conn, unsigned short rid, int reason)
void logtb_disable_update(THREAD_ENTRY *thread_p)
void css_close_server_connection_socket(void)
Definition: tcp.c:1242
void css_set_ha_repl_delayed(void)
TRANTABLE trantable
Definition: log_impl.h:650
int css_init(THREAD_ENTRY *thread_p, char *server_name, int name_length, int port_id)
static cubthread::entry_workpool * css_Server_request_worker_pool
int num_total_indices
Definition: log_impl.h:581
#define START_EXCLUSIVE_ACCESS_ACTIVE_CONN_ANCHOR(r)
Definition: connection_sr.h:58
int SOCKET
Definition: porting.h:482
#define ASSERT_ERROR()
int(* CSS_THREAD_FN)(THREAD_ENTRY *thrd, CSS_THREAD_ARG)
#define HA_CHANGE_MODE_IMMEDIATELY
static void css_start_all_threads(void)
int xacl_reload(THREAD_ENTRY *thread_p)
size_t css_count_transaction_worker_threads(THREAD_ENTRY *thread_p, int tran_index, int client_id)
#define rmutex_unlock(a, b)
unsigned int css_get_comm_request_id(THREAD_ENTRY *thread_p)
#define ERR_CSS_MASTER_PIPE_ERROR
Definition: error_code.h:424
void showstmt_free_array_context(THREAD_ENTRY *thread_p, SHOWSTMT_ARRAY_CONTEXT *ctx)
Definition: show_scan.c:373
#define CSS_WAIT_COUNT
static int css_internal_request_handler(THREAD_ENTRY &thread_ref, CSS_CONN_ENTRY &conn_ref)
void css_shutdown_conn(CSS_CONN_ENTRY *conn)
void execute(context_type &thread_ref) overridefinal
unsigned int htonl(unsigned int from)
unsigned int css_send_reply_and_3_data_to_client(CSS_CONN_ENTRY *conn, unsigned int eid, char *reply, int reply_size, char *buffer1, int buffer1_size, char *buffer2, int buffer2_size, char *buffer3, int buffer3_size)
#define ER_FAILED
Definition: error_code.h:47
void css_free_conn(CSS_CONN_ENTRY *conn)
int private_lru_index
Definition: session.c:133
SOCKET css_Pipe_to_master
LOG_LSA * log_get_eof_lsa(void)
Definition: log_manager.c:572
LOG_GLOBAL log_Gl
#define csect_enter(a, b, c)
Definition: cnv.c:138
#define NULL_TRANID
static void css_refuse_connection_request(SOCKET new_fd, unsigned short rid, int reason, int error)
#define pthread_mutex_unlock(a)
Definition: area_alloc.c:51
void logpb_force_flush_pages(THREAD_ENTRY *thread_p)
#define CHECK_CLIENT_IS_ALIVE()
int logtb_count_not_allowed_clients_in_maintenance_mode(THREAD_ENTRY *thread_p)
int css_change_ha_server_state(THREAD_ENTRY *thread_p, HA_SERVER_STATE state, bool force, int timeout, bool heartbeat)
#define ASSERT_ERROR_AND_SET(error_code)
bool logtb_set_tran_index_interrupt(THREAD_ENTRY *thread_p, int tran_index, bool set)
void thread_sleep(double millisec)
#define assert_release(e)
Definition: error_manager.h:96
#define CSS_RID_FROM_EID(eid)
void thread_wakeup_already_had_mutex(cubthread::entry *thread_p, thread_resume_suspend_status resume_reason)
const char * get_host_name() const
static void css_push_server_task(CSS_CONN_ENTRY &conn_ref)
void css_initialize_server_interfaces(int(*request_handler)(THREAD_ENTRY *thrd, unsigned int eid, int request, int size, char *buffer), CSS_THREAD_FN connection_error_function)
SOCKET fd
static bool css_Server_shutdown_inited
char * er_get_area_error(char *buffer, int *length)
static bool ha_Repl_delay_detected
cubthread::manager * thread_get_manager(void)
HA_LOG_APPLIER_STATE state
#define OR_ALIGNED_BUF(size)
static CSS_CONN_ENTRY * css_Master_conn
#define ER_CSS_SERVER_HA_MODE_CHANGE
Definition: error_code.h:1209
#define MASTER_TO_SRV_MSG_SIZE
void push_task_on_core(entry_workpool *worker_pool_arg, entry_task *exec_p, std::size_t core_hash)
const int LOG_WORKER_POOL_CONNECTIONS
static int eid
Definition: cas_error_log.c:61
const int LOG_WORKER_POOL_TRAN_WORKERS
void map_running_contexts(bool &stop, Func &&func, Args &&...args) const
int jsp_jvm_is_loaded(void)
Definition: jsp_sr.c:774
void logtb_enable_update(THREAD_ENTRY *thread_p)
entry_workpool * create_worker_pool(std::size_t pool_size, std::size_t task_max_count, const char *name, entry_manager *context_manager, std::size_t core_count, bool debug_logging, bool pool_threads=false, wait_seconds wait_for_task_time=std::chrono::seconds(5))
#define OR_ALIGNED_BUF_SIZE(abuf)
void css_set_thread_info(THREAD_ENTRY *thread_p, int client_id, int rid, int tran_index, int net_request_index)
int css_read_ip_info(IP_INFO **out_ip_info, char *filename)
static css_error_code css_internal_connection_handler(CSS_CONN_ENTRY *conn)
std::chrono::high_resolution_clock clock_type
SHOWSTMT_ARRAY_CONTEXT * showstmt_alloc_array_context(THREAD_ENTRY *thread_p, int num_total, int num_cols)
Definition: show_scan.c:336
unsigned int css_send_reply_and_2_data_to_client(CSS_CONN_ENTRY *conn, unsigned int eid, char *reply, int reply_size, char *buffer1, int buffer1_size, char *buffer2, int buffer2_size)
static int css_Master_port_id
static int css_check_conn(CSS_CONN_ENTRY *p)
#define er_log_debug(...)
int css_return_queued_error(CSS_CONN_ENTRY *conn, unsigned short request_id, char **buffer, int *buffer_size, int *rc)
#define INVALID_SOCKET
Definition: porting.h:483
void prm_set_integer_value(PARAM_ID prm_id, int value)
static char * css_Master_server_name
static bool css_get_connection_thread_pooling_configuration(void)
#define HA_LOG_APPLIER_STATE_TABLE_MAX
unsigned int css_receive_data_from_client_with_timeout(CSS_CONN_ENTRY *conn, unsigned int eid, char **buffer, int *size, int timeout)
int css_receive_request(CSS_CONN_ENTRY *conn, unsigned short *rid, int *request, int *buffer_size)
size_t css_get_num_connection_workers(void)
#define SockError
void css_set_ha_num_of_hosts(int num)
static cubthread::wait_seconds css_get_connection_thread_timeout_configuration(void)
void THREAD_ENTRY
static void css_wp_core_job_scan_mapper(const cubthread::entry_workpool::core &wp_core, bool &stop_mapper, THREAD_ENTRY *thread_p, SHOWSTMT_ARRAY_CONTEXT *ctx, size_t &core_index, int &error_code)
#define TR_TABLE_CS_ENTER(thread_p)
Definition: log_impl.h:88
static void css_find_not_stopped(THREAD_ENTRY &thread_ref, bool &stop, bool is_log_writer, bool &found)
LOG_TDES ** all_tdes
Definition: log_impl.h:595
#define OR_ALIGNED_BUF_START(abuf)
int css_send_heartbeat_data(CSS_CONN_ENTRY *conn, const char *data, int size)
Definition: heartbeat.c:178
char boot_Host_name[CUB_MAXHOSTNAMELEN]
Definition: boot_cl.c:158
int css_set_accessible_ip_info(void)
void css_insert_into_active_conn_list(CSS_CONN_ENTRY *conn)
static int css_reestablish_connection_to_master(void)
static HA_LOG_APPLIER_STATE_TABLE ha_Log_applier_state[HA_LOG_APPLIER_STATE_TABLE_MAX]
manager * get_manager(void)
static IP_INFO * css_Server_accessible_ip_info
#define THREAD_RET_T
Definition: porting.h:713
void er_set(int severity, const char *file_name, const int line_no, int err_id, int num_args,...)
void css_push_external_task(CSS_CONN_ENTRY *conn, cubthread::entry_task *task)
db_client_type client_type
void LOG_CS_ENTER_READ_MODE(THREAD_ENTRY *thread_p)
#define assert(x)
static void css_count_transaction_worker_threads_mapfunc(THREAD_ENTRY &thread_ref, bool &stop_mapper, THREAD_ENTRY *caller_thread, int tran_index, int client_id, size_t &count)
clock::time_point time_point
Definition: perf_def.hpp:40
static bool css_is_log_writer(const THREAD_ENTRY &thread_arg)
css_error_code(* css_Connect_handler)(CSS_CONN_ENTRY *)
#define ERR_CSS_SHUTDOWN_ERROR
Definition: error_code.h:422
#define BOOT_IS_ALLOWED_CLIENT_TYPE_IN_MT_MODE(host1, host2, client_type)
Definition: boot.h:91
std::size_t get_core_count(void) const
int prm_get_integer_value(PARAM_ID prm_id)
#define ER_GENERIC_ERROR
Definition: error_code.h:49
int er_set_area_error(char *server_area)
int logtb_count_clients(THREAD_ENTRY *thread_p)
static const size_t CSS_JOB_QUEUE_SCAN_COLUMN_COUNT
static int(* css_Server_request_handler)(THREAD_ENTRY *, unsigned int, int, int, char *)
#define ER_OUT_OF_VIRTUAL_MEMORY
Definition: error_code.h:50
void LOG_CS_EXIT(THREAD_ENTRY *thread_p)
static void css_process_get_server_ha_mode_request(SOCKET master_fd)
void css_register_handler_routines(css_error_code(*connect_handler)(CSS_CONN_ENTRY *conn), CSS_THREAD_FN request_handler, CSS_THREAD_FN connection_error_handler)
void map_cores(Func &&func, Args &&...args)
const char * css_ha_server_state_string(HA_SERVER_STATE state)
#define ER_HB_PROCESS_EVENT
Definition: error_code.h:1239
bool css_are_all_request_handlers_suspended(void)
#define IS_INVALID_SOCKET(socket)
Definition: porting.h:484
SOCKET css_server_accept(SOCKET sockfd)
Definition: tcp.c:1256
#define DEFAULT_HEADER_DATA
bool css_is_shutdowning_server()
void thread_check_suspend_reason_and_wakeup(cubthread::entry *thread_p, thread_resume_suspend_status resume_reason, thread_resume_suspend_status suspend_reason)
static void css_setup_server_loop(void)
static void css_set_shutdown_timeout(int timeout)
static void css_initialize_conn(CSS_CONN_ENTRY *conn, SOCKET fd)
#define ER_INACCESSIBLE_IP
Definition: error_code.h:1340
bool logtb_is_current_active(THREAD_ENTRY *thread_p)
static void css_close_connection_to_master(void)
static char * ip_list_file_name
void logtb_slam_transaction(THREAD_ENTRY *thread_p, int tran_index)
enum ha_log_applier_state HA_LOG_APPLIER_STATE
HA_SERVER_STATE css_ha_server_state(void)
static char ip_file_real_path[PATH_MAX]
bool css_is_shutdown_timeout_expired(void)
void lock_force_thread_timeout_lock(THREAD_ENTRY *thrd)
static int ha_Log_applier_state_num
static int rv
Definition: area_alloc.c:52
unsigned int css_receive_data_from_client(CSS_CONN_ENTRY *conn, unsigned int eid, char **buffer, int *size)
void vacuum_stop_workers(THREAD_ENTRY *thread_p)
Definition: vacuum.c:1332
void er_log_stats(void) const
#define NULL
Definition: freelistheap.h:34
#define rmutex_lock(a, b)
void css_end_server_request(CSS_CONN_ENTRY *conn)
bool in_transaction
static void css_process_change_server_ha_mode_request(SOCKET master_fd)
int session_get_private_lru_idx(const void *session_p)
Definition: session.c:3084
css_connection_task(CSS_CONN_ENTRY &conn)
static void css_process_shutdown_request(SOCKET master_fd)
bool css_peer_alive(SOCKET sd, int timeout)
Definition: tcp.c:1470
SOCKET css_Server_connection_socket
#define err(fd,...)
Definition: porting.h:431
SOCKET css_open_new_socket_from_master(SOCKET fd, unsigned short *rid)
Definition: tcp.c:1059
void get_stats(cubperf::stat_value *stats_out) const
void thread_lock_entry(cubthread::entry *thread_p)
struct packet_header NET_HEADER
int css_get_ha_num_of_hosts(void)
void css_unset_ha_repl_delayed(void)
#define csect_exit(a, b)
Definition: cnv.c:139
DB_VALUE * showstmt_alloc_tuple_in_context(THREAD_ENTRY *thread_p, SHOWSTMT_ARRAY_CONTEXT *ctx)
Definition: show_scan.c:402
CSS_CONN_ENTRY * css_get_current_conn_entry(void)
static struct timeval start_time
#define OR_LOG_LSA_ALIGNED_SIZE
CLIENTIDS client
Definition: log_impl.h:484
int css_get_client_id(THREAD_ENTRY *thread_p)
std::size_t system_core_count(void)
void er_set_with_oserror(int severity, const char *file_name, const int line_no, int err_id, int num_args,...)
int count(int &result, const cub_regex_object &reg, const std::string &src, const int position, const INTL_CODESET codeset)
static void css_process_get_eof_request(SOCKET master_fd)
static void css_stop_all_workers(THREAD_ENTRY &thread_ref, css_thread_stop_type stop_phase)
void log_append_ha_server_state(THREAD_ENTRY *thread_p, int state)
Definition: log_manager.c:3156
wait_duration< std::chrono::seconds > wait_seconds
CSS_CONN_ENTRY * css_Active_conn_anchor
#define ER_IO_WRITE
Definition: error_code.h:63
static int css_get_master_request(SOCKET master_fd)
void map_running_contexts(Func &&func, Args &&...args)
unsigned int css_send_abort_to_client(CSS_CONN_ENTRY *conn, unsigned int eid)
void execute(context_type &thread_ref) overridefinal
void destroy_worker_pool(entry_workpool *&worker_pool_arg)
static void css_is_any_thread_not_suspended_mapfunc(THREAD_ENTRY &thread_ref, bool &stop_mapper, size_t &count, bool &found)
int css_send_heartbeat_request(CSS_CONN_ENTRY *conn, int command)
Definition: heartbeat.c:151
#define NULL_TRAN_INDEX
Context context_type
Definition: thread_task.hpp:64
#define RMUTEX_NAME_TEMP_CONN_ENTRY
static struct timeval css_Shutdown_timeout
static void error(const char *msg)
Definition: gencat.c:331
void worker_manager_stop_all()
#define ER_CSS_CLIENTS_EXCEEDED
Definition: error_code.h:809
void thread_unlock_entry(cubthread::entry *thread_p)
static int rc
Definition: serial.c:50
int css_readn(SOCKET fd, char *ptr, int nbytes, int timeout)
int css_receive_data(CSS_CONN_ENTRY *conn, unsigned short req_id, char **buffer, int *buffer_size, int timeout)
static cubthread::entry_workpool * css_Connection_worker_pool
void css_get_thread_stats(UINT64 *stats_out)
int css_get_max_conn(void)
#define ARG_FILE_LINE
Definition: error_manager.h:44
css_server_task(CSS_CONN_ENTRY &conn)
static cubthread::wait_seconds css_get_server_request_thread_timeout_configuration(void)
int css_windows_startup(void)
Definition: wintcp.c:94
unsigned short ntohs(unsigned short from)
static bool css_get_server_request_thread_pooling_configuration(void)
static bool css_check_ha_log_applier_working(void)
int css_check_ip(IP_INFO *ip_info, unsigned char *address)
void css_dealloc_conn_rmutex(CSS_CONN_ENTRY *conn)
int num_list
Definition: broker_shm.h:204
#define csect_enter_as_reader(a, b, c)
int css_free_ip_info(IP_INFO *ip_info)
#define FROM_REGISTER_CLIENT
void thread_clear_recursion_depth(cubthread::entry *thread_p)
#define free_and_init(ptr)
Definition: memory_alloc.h:147
#define strlen(s1)
Definition: intl_support.c:43
char * prm_get_string_value(PARAM_ID prm_id)
CSS_CONN_ENTRY * m_conn
CSS_CONN_ENTRY & m_conn
static void css_stop_non_log_writer(THREAD_ENTRY &thread_ref, bool &, THREAD_ENTRY &stopper_thread_ref)
TRANID trid
Definition: log_impl.h:466
char * css_pack_server_name(const char *server_name, int *name_length)
std::size_t get_max_count(void) const
void xacl_dump(THREAD_ENTRY *thread_p, FILE *outfp)
void css_windows_shutdown(void)
Definition: wintcp.c:140
bool prm_get_bool_value(PARAM_ID prm_id)
static int css_process_master_request(SOCKET master_fd)
CSS_CONN_ENTRY * css_connect_to_master_server(int master_port_id, const char *server_name, int name_length)
bool css_is_ha_repl_delayed(void)
int css_check_ha_server_state_for_client(THREAD_ENTRY *thread_p, int whence)
boot_server_status
Definition: boot_sr.h:67
int css_job_queues_start_scan(THREAD_ENTRY *thread_p, int show_type, DB_VALUE **arg_values, int arg_cnt, void **ptr)
unsigned int css_return_eid_from_conn(CSS_CONN_ENTRY *conn, CSS_MAP_ENTRY **anchor, unsigned short rid)
void er_clear(void)
int hb_register_to_master(CSS_CONN_ENTRY *conn, int type)
Definition: heartbeat.c:355
unsigned int css_send_reply_and_data_to_client(CSS_CONN_ENTRY *conn, unsigned int eid, char *reply, int reply_size, char *buffer, int buffer_size)
static int css_connection_handler_thread(THREAD_ENTRY *thrd, CSS_CONN_ENTRY *conn)
int i
Definition: dynamic_load.c:954
#define ERR_CSS_ERROR_FROM_SERVER
Definition: error_code.h:442
void css_start_shutdown_server()
size_t css_get_num_total_workers(void)
static void css_process_new_client(SOCKET master_fd)
static HA_SERVER_STATE css_transit_ha_server_state(THREAD_ENTRY *thread_p, HA_SERVER_STATE req_state)
char * or_pack_log_lsa(const char *ptr, const struct log_lsa *lsa)
char * strdup(const char *str)
Definition: porting.c:901
#define HA_DISABLED()
int css_read_and_queue(CSS_CONN_ENTRY *conn, int *type)
#define pthread_mutex_lock(a)
Definition: area_alloc.c:50
#define CSS_ENTRYID_FROM_EID(eid)
static int ha_Server_num_of_hosts
_exit(1)
int db_make_int(DB_VALUE *value, const int num)
unsigned int ntohl(unsigned int from)
int css_send_data(CSS_CONN_ENTRY *conn, unsigned short rid, const char *buffer, int buffer_size)
CSS_CONN_ENTRY & m_conn
css_thread_stop_type
enum ha_server_state HA_SERVER_STATE
Definition: boot.h:126
CSS_CONN_ENTRY * next
#define rmutex_initialize(a, b)
void push_task(entry_workpool *worker_pool_arg, entry_task *exec_p)
char * css_add_client_version_string(THREAD_ENTRY *thread_p, const char *version_string)
std::size_t get_max_worker_count(void) const
int css_send_error(CSS_CONN_ENTRY *conn, unsigned short rid, const char *buffer, int buffer_size)
int css_send_abort_request(CSS_CONN_ENTRY *conn, unsigned short request_id)
int db_error
static int css_test_for_client_errors(CSS_CONN_ENTRY *conn, unsigned int eid)
bool log_prior_has_worker_log_records(THREAD_ENTRY *thread_p)
Definition: log_append.cpp:152
void css_block_all_active_conn(unsigned short stop_phase)
#define TR_TABLE_CS_EXIT(thread_p)
Definition: log_impl.h:90
CSS_CONN_ENTRY * css_Conn_array
void log_set_ha_promotion_time(THREAD_ENTRY *thread_p, INT64 ha_promotion_time)
Definition: log_manager.c:9334
unsigned int css_send_error_to_client(CSS_CONN_ENTRY *conn, unsigned int eid, char *buffer, int buffer_size)
static HA_SERVER_STATE ha_Server_state
void execute(context_type &thread_ref) overridefinal
#define PATH_SEPARATOR
Definition: porting.h:347
#define IP_BYTE_COUNT
Definition: broker_shm.h:129
#define END_EXCLUSIVE_ACCESS_ACTIVE_CONN_ANCHOR(r)
Definition: connection_sr.h:66
bool is_logging_configured(const int logging_flag)
int css_notify_ha_log_applier_state(THREAD_ENTRY *thread_p, HA_LOG_APPLIER_STATE state)
SIGNAL_HANDLER_FUNCTION os_set_signal_handler(const int sig_no, SIGNAL_HANDLER_FUNCTION sig_handler)
Definition: porting.c:1333
const char ** p
Definition: dynamic_load.c:945
#define THREAD_CALLING_CONVENTION
Definition: porting.h:714
const char * rel_major_release_string(void)
void css_cleanup_server_queues(unsigned int eid)
int client_id
static int css_check_accessibility(SOCKET new_fd)
const char * css_ha_applier_state_string(HA_LOG_APPLIER_STATE state)
char * envvar_confdir_file(char *path, size_t size, const char *filename)
void css_remove_all_unexpected_packets(CSS_CONN_ENTRY *conn)
CSS_CONN_ENTRY * css_make_conn(SOCKET fd)