Skip to content

File broker.c

FileList > broker > broker.c

Go to the source code of this file

  • #include <stdio.h>
  • #include <stdlib.h>
  • #include <string.h>
  • #include <signal.h>
  • #include <errno.h>
  • #include <fcntl.h>
  • #include <assert.h>
  • #include <pthread.h>
  • #include <unistd.h>
  • #include <sys/file.h>
  • #include <sys/socket.h>
  • #include <netinet/in.h>
  • #include <arpa/inet.h>
  • #include <netinet/tcp.h>
  • #include <sys/time.h>
  • #include <sys/un.h>
  • #include "connection_defs.h"
  • #include "client_support.h"
  • #include "system_parameter.h"
  • #include "databases_file.h"
  • #include "util_func.h"
  • #include "cas_error.h"
  • #include "cas_common.h"
  • #include "broker_error.h"
  • #include "broker_env_def.h"
  • #include "broker_shm.h"
  • #include "broker_msg.h"
  • #include "broker_process_size.h"
  • #include "broker_util.h"
  • #include "broker_access_list.h"
  • #include "broker_filename.h"
  • #include "broker_er_html.h"
  • #include "broker_send_fd.h"
  • #include "error_manager.h"
  • #include "shard_shm.h"
  • #include "shard_metadata.h"
  • #include "broker_proxy_conn.h"
  • #include "dbtype_def.h"

Classes

Type Name
struct t_clt_table

Public Types

Type Name
enum SERVER_STATE
typedef struct t_clt_table T_CLT_TABLE

Public Static Attributes

Type Name
int br_index = -1
T_BROKER_INFO * br_info_p = [**NULL**](freelistheap_8h.md#define-null)
int br_shard_flag = (0)
pthread_mutex_t broker_shm_mutex
const char * cas_client_type_str[] = /* multi line expression */
pthread_cond_t clt_table_cond
pthread_mutex_t clt_table_mutex
int current_dropping_as_index = -1
int hold_job = 0
int max_open_fd = 128
int num_busy_uts = 0
int process_flag = 1
SOCKET proxy_sock_fd
pthread_mutex_t run_appl_mutex
char run_appl_server_flag = 0
char run_proxy_flag = 0
pthread_mutex_t run_proxy_mutex
struct sockaddr_un shard_sock_addr
T_SHM_APPL_SERVER * shm_appl
T_SHM_BROKER * shm_br = [**NULL**](freelistheap_8h.md#define-null)
T_SHM_PROXY * shm_proxy_p = [**NULL**](freelistheap_8h.md#define-null)
struct sockaddr_in sock_addr
int sock_addr_len
SOCKET sock_fd

Public Functions

Type Name
int main (int argc, char * argv[])

Public Static Functions

Type Name
bool broker_add_new_cas (void)
bool broker_drop_one_cas_by_time_to_kill (void)
int broker_init_shm (void)
void * cas_monitor_thr_f (void * arg)
void cas_monitor_worker (T_APPL_SERVER_INFO * as_info_p, int br_index, int as_index, int * busy_uts)
void check_cas_log (char * br_name, T_APPL_SERVER_INFO * as_info_p, int as_index)
void check_proxy_access_log (T_PROXY_INFO * proxy_info_p)
void check_proxy_log (char * br_name, T_PROXY_INFO * proxy_info_p)
void cleanup (int signo)
SOCKET connect_srv (char * br_name, int as_index)
CSS_CONN_ENTRY * connect_to_master_for_server_monitor (const char * db_name, const char * db_host)
bool di_understand_renewed_error_code (const char * driver_info)
void * dispatch_thr_f (void * arg)
int find_add_as_index (void)
int find_drop_as_index (void)
int find_idle_cas (void)
void get_as_slow_log_filename (char * log_filename, int len, char * broker_name, T_APPL_SERVER_INFO * as_info_p, int as_index)
void get_as_sql_log_filename (char * log_filename, int len, char * broker_name, T_APPL_SERVER_INFO * as_info_p, int as_index)
int get_server_state_from_master (CSS_CONN_ENTRY * conn, const char * db_name)
void * hang_check_thr_f (void * arg)
int init_env (void)
int init_proxy_env (void)
int insert_db_server_check_list (T_DB_SERVER * list_p, int check_list_cnt, const char * db_name, const char * db_host)
void proxy_check_worker (int br_index, T_PROXY_INFO * proxy_info_p)
void * proxy_listener_thr_f (void * arg)
void * proxy_monitor_thr_f (void * arg)
void proxy_monitor_worker (T_PROXY_INFO * proxy_info_p, int br_index, int proxy_index)
void * psize_check_thr_f (void * arg)
void psize_check_worker (T_APPL_SERVER_INFO * as_info_p, int br_index, int as_index)
int read_buffer_async (SOCKET sock_fd, char * buf, int size, int timeout)
int read_from_client (SOCKET sock_fd, char * buf, int size)
int read_from_client_with_timeout (SOCKET sock_fd, char * buf, int size, int timeout_sec)
int read_nbytes_from_client (SOCKET sock_fd, char * buf, int size)
void * receiver_thr_f (void * arg)
void restart_appl_server (T_APPL_SERVER_INFO * as_info_p, int br_index, int as_index)
void restart_proxy_server (T_PROXY_INFO * proxy_info_p, int br_index, int proxy_index)
int run_appl_server (T_APPL_SERVER_INFO * as_info_p, int br_index, int as_index)
int run_proxy_server (T_PROXY_INFO * proxy_info_p, int br_index, int proxy_index)
void send_error_to_driver (int sock, int error, char * driver_info)
void * server_monitor_thr_f (void * arg)
void shard_broker_process (void)
void * shard_dispatch_thr_f (void * arg)
int stop_appl_server (T_APPL_SERVER_INFO * as_info_p, int br_index, int as_index)
int stop_proxy_server (T_PROXY_INFO * proxy_info_p, int br_index, int proxy_index)
int write_buffer_async (SOCKET sock_fd, char * buf, int size, int timeout)
int write_to_client (SOCKET sock_fd, char * buf, int size)
int write_to_client_with_timeout (SOCKET sock_fd, char * buf, int size, int timeout_sec)

Macros

Type Name
define ALIGN_ENV_BUF_SIZE (X) ((((X) + [**ENV\_BUF\_INIT\_SIZE**](broker_8c.md#define-env_buf_init_size)) / [**ENV\_BUF\_INIT\_SIZE**](broker_8c.md#define-env_buf_init_size)) \* [**ENV\_BUF\_INIT\_SIZE**](broker_8c.md#define-env_buf_init_size))
define BUFFER_SIZE [**ONE\_K**](porting_8h.md#define-one_k)
define CAS_SEND_ERROR_CODE (FD, VAL) /* multi line expression */
define ENV_BUF_INIT_SIZE 512
define HANG_COUNT_THRESHOLD_RATIO 0.5
define IP_ADDR_STR_LEN 20
define IS_SET_BIT (C, B) (((C) & (B)) == (B))
define JOB_COUNT_MAX 130000000
define MONITOR_SERVER_INTERVAL 5
define NUM_COLLECT_COUNT_PER_INTVL 4
define PS_CHK_PERIOD 600
define SET_BROKER_ERR_CODE () /* multi line expression */
define SET_BROKER_OK_CODE () /* multi line expression */
define SOCKET_TIMEOUT_SEC 2
define V3_WRITE_HEADER_ERR_SOCK (sockfd) /* multi line expression */
define V3_WRITE_HEADER_OK_FILE_SOCK (sock_fd) /* multi line expression */

Public Types Documentation

enum SERVER_STATE

enum SERVER_STATE {
    SERVER_STATE_UNKNOWN = 0,
    SERVER_STATE_DEAD = 1,
    SERVER_STATE_DEREGISTERED = 2,
    SERVER_STATE_STARTED = 3,
    SERVER_STATE_NOT_REGISTERED = 4,
    SERVER_STATE_REGISTERED = 5,
    SERVER_STATE_REGISTERED_AND_TO_BE_STANDBY = 6,
    SERVER_STATE_REGISTERED_AND_ACTIVE = 7,
    SERVER_STATE_REGISTERED_AND_TO_BE_ACTIVE = 8
};

typedef T_CLT_TABLE

typedef struct t_clt_table T_CLT_TABLE;

Public Static Attributes Documentation

variable br_index

int br_index;

variable br_info_p

T_BROKER_INFO* br_info_p;

variable br_shard_flag

int br_shard_flag;

variable broker_shm_mutex

pthread_mutex_t broker_shm_mutex;

variable cas_client_type_str

const char* cas_client_type_str[];

variable clt_table_cond

pthread_cond_t clt_table_cond;

variable clt_table_mutex

pthread_mutex_t clt_table_mutex;

variable current_dropping_as_index

int current_dropping_as_index;

variable hold_job

int hold_job;

variable max_open_fd

int max_open_fd;

variable num_busy_uts

int num_busy_uts;

variable process_flag

int process_flag;

variable proxy_sock_fd

SOCKET proxy_sock_fd;

variable run_appl_mutex

pthread_mutex_t run_appl_mutex;

variable run_appl_server_flag

char run_appl_server_flag;

variable run_proxy_flag

char run_proxy_flag;

variable run_proxy_mutex

pthread_mutex_t run_proxy_mutex;

variable shard_sock_addr

struct sockaddr_un shard_sock_addr;

variable shm_appl

T_SHM_APPL_SERVER* shm_appl;

variable shm_br

T_SHM_BROKER* shm_br;

variable shm_proxy_p

T_SHM_PROXY* shm_proxy_p;

variable sock_addr

struct sockaddr_in sock_addr;

variable sock_addr_len

int sock_addr_len;

variable sock_fd

SOCKET sock_fd;

Public Functions Documentation

function main

int main (
    int argc,
    char * argv[]
) 

Public Static Functions Documentation

function broker_add_new_cas

static bool broker_add_new_cas (
    void
) 

function broker_drop_one_cas_by_time_to_kill

static bool broker_drop_one_cas_by_time_to_kill (
    void
) 

function broker_init_shm

static int broker_init_shm (
    void
) 

function cas_monitor_thr_f

static void * cas_monitor_thr_f (
    void * arg
) 

function cas_monitor_worker

static void cas_monitor_worker (
    T_APPL_SERVER_INFO * as_info_p,
    int br_index,
    int as_index,
    int * busy_uts
) 

function check_cas_log

static void check_cas_log (
    char * br_name,
    T_APPL_SERVER_INFO * as_info_p,
    int as_index
) 

function check_proxy_access_log

static void check_proxy_access_log (
    T_PROXY_INFO * proxy_info_p
) 

function check_proxy_log

static void check_proxy_log (
    char * br_name,
    T_PROXY_INFO * proxy_info_p
) 

function cleanup

static void cleanup (
    int signo
) 

function connect_srv

static SOCKET connect_srv (
    char * br_name,
    int as_index
) 

function connect_to_master_for_server_monitor

static CSS_CONN_ENTRY * connect_to_master_for_server_monitor (
    const  char * db_name,
    const  char * db_host
) 

function di_understand_renewed_error_code

static bool di_understand_renewed_error_code (
    const  char * driver_info
) 

function dispatch_thr_f

static void * dispatch_thr_f (
    void * arg
) 

function find_add_as_index

static int find_add_as_index (
    void
) 

function find_drop_as_index

static int find_drop_as_index (
    void
) 

function find_idle_cas

static int find_idle_cas (
    void
) 

function get_as_slow_log_filename

static void get_as_slow_log_filename (
    char * log_filename,
    int len,
    char * broker_name,
    T_APPL_SERVER_INFO * as_info_p,
    int as_index
) 

function get_as_sql_log_filename

static void get_as_sql_log_filename (
    char * log_filename,
    int len,
    char * broker_name,
    T_APPL_SERVER_INFO * as_info_p,
    int as_index
) 

function get_server_state_from_master

static int get_server_state_from_master (
    CSS_CONN_ENTRY * conn,
    const  char * db_name
) 

function hang_check_thr_f

static void * hang_check_thr_f (
    void * arg
) 

function init_env

static int init_env (
    void
) 

function init_proxy_env

static int init_proxy_env (
    void
) 

function insert_db_server_check_list

static int insert_db_server_check_list (
    T_DB_SERVER * list_p,
    int check_list_cnt,
    const  char * db_name,
    const  char * db_host
) 

function proxy_check_worker

static void proxy_check_worker (
    int br_index,
    T_PROXY_INFO * proxy_info_p
) 

function proxy_listener_thr_f

static void * proxy_listener_thr_f (
    void * arg
) 

function proxy_monitor_thr_f

static void * proxy_monitor_thr_f (
    void * arg
) 

function proxy_monitor_worker

static void proxy_monitor_worker (
    T_PROXY_INFO * proxy_info_p,
    int br_index,
    int proxy_index
) 

function psize_check_thr_f

static void * psize_check_thr_f (
    void * arg
) 

function psize_check_worker

static void psize_check_worker (
    T_APPL_SERVER_INFO * as_info_p,
    int br_index,
    int as_index
) 

function read_buffer_async

static int read_buffer_async (
    SOCKET sock_fd,
    char * buf,
    int size,
    int timeout
) 

function read_from_client

static int read_from_client (
    SOCKET sock_fd,
    char * buf,
    int size
) 

function read_from_client_with_timeout

static int read_from_client_with_timeout (
    SOCKET sock_fd,
    char * buf,
    int size,
    int timeout_sec
) 

function read_nbytes_from_client

static int read_nbytes_from_client (
    SOCKET sock_fd,
    char * buf,
    int size
) 

function receiver_thr_f

static void * receiver_thr_f (
    void * arg
) 

function restart_appl_server

static void restart_appl_server (
    T_APPL_SERVER_INFO * as_info_p,
    int br_index,
    int as_index
) 

function restart_proxy_server

static void restart_proxy_server (
    T_PROXY_INFO * proxy_info_p,
    int br_index,
    int proxy_index
) 

function run_appl_server

static int run_appl_server (
    T_APPL_SERVER_INFO * as_info_p,
    int br_index,
    int as_index
) 

function run_proxy_server

static int run_proxy_server (
    T_PROXY_INFO * proxy_info_p,
    int br_index,
    int proxy_index
) 

function send_error_to_driver

static void send_error_to_driver (
    int sock,
    int error,
    char * driver_info
) 

function server_monitor_thr_f

static void * server_monitor_thr_f (
    void * arg
) 

function shard_broker_process

static void shard_broker_process (
    void
) 

function shard_dispatch_thr_f

static void * shard_dispatch_thr_f (
    void * arg
) 

function stop_appl_server

static int stop_appl_server (
    T_APPL_SERVER_INFO * as_info_p,
    int br_index,
    int as_index
) 

function stop_proxy_server

static int stop_proxy_server (
    T_PROXY_INFO * proxy_info_p,
    int br_index,
    int proxy_index
) 

function write_buffer_async

static int write_buffer_async (
    SOCKET sock_fd,
    char * buf,
    int size,
    int timeout
) 

function write_to_client

static int write_to_client (
    SOCKET sock_fd,
    char * buf,
    int size
) 

function write_to_client_with_timeout

static int write_to_client_with_timeout (
    SOCKET sock_fd,
    char * buf,
    int size,
    int timeout_sec
) 

Macro Definition Documentation

define ALIGN_ENV_BUF_SIZE

#define ALIGN_ENV_BUF_SIZE (
    X
) `(((( X ) + ENV_BUF_INIT_SIZE ) / ENV_BUF_INIT_SIZE ) * ENV_BUF_INIT_SIZE )`

define BUFFER_SIZE

#define BUFFER_SIZE `ONE_K`

define CAS_SEND_ERROR_CODE

#define CAS_SEND_ERROR_CODE (
    FD,
    VAL
) `/* multi line expression */`

define ENV_BUF_INIT_SIZE

#define ENV_BUF_INIT_SIZE `512`

define HANG_COUNT_THRESHOLD_RATIO

#define HANG_COUNT_THRESHOLD_RATIO `0.5`

define IP_ADDR_STR_LEN

#define IP_ADDR_STR_LEN `20`

define IS_SET_BIT

#define IS_SET_BIT (
    C,
    B
) `((( C ) & ( B )) == ( B ))`

define JOB_COUNT_MAX

#define JOB_COUNT_MAX `130000000`

define MONITOR_SERVER_INTERVAL

#define MONITOR_SERVER_INTERVAL `5`

define NUM_COLLECT_COUNT_PER_INTVL

#define NUM_COLLECT_COUNT_PER_INTVL `4`

define PS_CHK_PERIOD

#define PS_CHK_PERIOD `600`

define SET_BROKER_ERR_CODE

#define SET_BROKER_ERR_CODE (

) `/* multi line expression */`

define SET_BROKER_OK_CODE

#define SET_BROKER_OK_CODE (

) `/* multi line expression */`

define SOCKET_TIMEOUT_SEC

#define SOCKET_TIMEOUT_SEC `2`

define V3_WRITE_HEADER_ERR_SOCK

#define V3_WRITE_HEADER_ERR_SOCK (
    sockfd
) `/* multi line expression */`

define V3_WRITE_HEADER_OK_FILE_SOCK

#define V3_WRITE_HEADER_OK_FILE_SOCK (
    sock_fd
) `/* multi line expression */`


The documentation for this file was generated from the following source file: `cubrid/src/broker/broker.c`