Skip to content

File cas_network.c

File List > broker > cas_network.c

Go to the documentation of this file

/*
 * Copyright 2008 Search Solution Corporation
 * Copyright 2016 CUBRID Corporation
 *
 *  Licensed under the Apache License, Version 2.0 (the "License");
 *  you may not use this file except in compliance with the License.
 *  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 *  Unless required by applicable law or agreed to in writing, software
 *  distributed under the License is distributed on an "AS IS" BASIS,
 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *  See the License for the specific language governing permissions and
 *  limitations under the License.
 *
 */


/*
 * cas_network.c - socket setup and stream/message I/O helpers used by the CAS
 */

#ident "$Id$"

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <fcntl.h>
#include <errno.h>
#include <assert.h>

#if defined(WINDOWS)
#include <winsock2.h>
#include <windows.h>
#include <io.h>
#else /* WINDOWS */
#include <sys/time.h>
#include <unistd.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <sys/un.h>
#include <poll.h>
#endif /* WINDOWS */

#include "porting.h"
#include "cas_common.h"
#include "cas_common_vars.h"
#include "cas_network.h"
#include "cas_error.h"
#include "cas_protocol.h"
#include "broker_config.h"
#include "broker_shm.h"
#include "error_code.h"
#include "broker_util.h"
#include "cas_ssl.h"

#if defined(WINDOWS)
#include "broker_wsa_init.h"
#endif /* WINDOWS */

/* Alias for fd_set; not referenced by any function visible in this file. */
#define SELECT_MASK fd_set

/* Low-level single-shot socket write/read; both honour net_error_flag. */
static int write_buffer (SOCKET sock_fd, const char *buf, int size);
static int read_buffer (SOCKET sock_fd, char *buf, int size);

/* Helpers that set/clear net_timeout_flag. */
static void set_net_timeout_flag (void);
static void unset_net_timeout_flag (void);

#if defined(WINDOWS)
static int get_host_ip (unsigned char *ip_addr);
#endif /* WINDOWS */

/*
 * Set by read_buffer () when poll () times out; cleared at the start of every
 * read_buffer () call. Exposed through is_net_timed_out ().
 */
static bool net_timeout_flag = false;

/*
 * Sticky error marker: set when a read or write fails, reset to 0 when a new
 * connection is established (net_connect_proxy / net_connect_client). While it
 * is non-zero, read_buffer () and write_buffer () fail immediately.
 */
static char net_error_flag;
/* Poll timeout in seconds; a negative value makes poll () wait indefinitely. */
static int net_timeout = NET_DEFAULT_TIMEOUT;

/*
 * Dispatch a raw read/write either to the SSL layer (when ssl_client is set)
 * or to the plain socket primitive.
 *
 * The whole expansion and every argument are parenthesized so the macros stay
 * correct when used inside larger expressions or with compound arguments.
 */
#define READ_FROM_NET(sd, buf, size) \
    (ssl_client ? cas_ssl_read ((sd), (buf), (size)) : READ_FROM_SOCKET ((sd), (buf), (size)))
#define WRITE_TO_NET(sd, buf, size) \
    (ssl_client ? cas_ssl_write ((sd), (buf), (size)) : WRITE_TO_SOCKET ((sd), (buf), (size)))

/*
 * net_init_env () - create, bind and start listening on the server socket
 *   return: the listening socket, or INVALID_SOCKET on any failure
 *   new_port(in/out): (WINDOWS) TCP port to bind on INADDR_ANY; overwritten
 *                     with the port reported by getsockname () after bind
 *   port_name(in): (non-WINDOWS) path used as the AF_UNIX socket address
 *
 * Note: every failure after the socket has been created closes it before
 *       returning.
 */
SOCKET
#if defined(WINDOWS)
net_init_env (int *new_port)
#else /* WINDOWS */
net_init_env (char *port_name)
#endif /* WINDOWS */
{
#if defined(WINDOWS)
  struct sockaddr_in listen_addr;
  int any_addr = INADDR_ANY;
#else /* WINDOWS */
  struct sockaddr_un listen_addr;
#endif /* WINDOWS */
  SOCKET listen_fd;
  int addr_len;
  int reuse_on = 1;

#if defined(WINDOWS)
  /* WSA startup has to succeed before any socket call */
  if (wsa_initialize ())
    {
      return INVALID_SOCKET;
    }
  listen_fd = socket (AF_INET, SOCK_STREAM, 0);
#else /* WINDOWS */
  listen_fd = socket (AF_UNIX, SOCK_STREAM, 0);
#endif /* WINDOWS */
  if (IS_INVALID_SOCKET (listen_fd))
    {
      return INVALID_SOCKET;
    }

  if (setsockopt (listen_fd, SOL_SOCKET, SO_REUSEADDR, (char *) &reuse_on, sizeof (reuse_on)) < 0)
    {
      goto close_and_fail;
    }

  /* build the platform specific address to bind to */
#if defined(WINDOWS)
  memset (&listen_addr, 0, sizeof (struct sockaddr_in));
  listen_addr.sin_family = AF_INET;
  listen_addr.sin_port = htons ((unsigned short) (*new_port));
  addr_len = sizeof (struct sockaddr_in);
  memcpy (&listen_addr.sin_addr, &any_addr, sizeof (int));
#else /* WINDOWS */
  memset (&listen_addr, 0, sizeof (struct sockaddr_un));
  listen_addr.sun_family = AF_UNIX;
  snprintf (listen_addr.sun_path, sizeof (listen_addr.sun_path), "%s", port_name);
  addr_len = strlen (listen_addr.sun_path) + sizeof (listen_addr.sun_family) + 1;
#endif /* WINDOWS */

  if (bind (listen_fd, (struct sockaddr *) &listen_addr, addr_len) < 0)
    {
      goto close_and_fail;
    }

#if defined(WINDOWS)
  /* report back the port that was actually bound */
  if (getsockname (listen_fd, (struct sockaddr *) &listen_addr, &addr_len) < 0)
    {
      goto close_and_fail;
    }
  *new_port = ntohs (listen_addr.sin_port);
#endif /* WINDOWS */

  if (listen (listen_fd, 3) < 0)
    {
      goto close_and_fail;
    }

  return listen_fd;

close_and_fail:
  CLOSE_SOCKET (listen_fd);
  return INVALID_SOCKET;
}


/*
 * net_connect_proxy () - open a client connection to the proxy socket
 *   return: connected socket, or INVALID_SOCKET on failure
 *   proxy_id(in): (WINDOWS only) proxy index; the TCP port is computed as
 *                 <PORT_NUMBER_ENV_STR value> + 2 + proxy_id * 2
 *
 * Note: on non-WINDOWS builds the UNIX domain socket path is obtained from
 *       ut_get_proxy_port_name () using shm_appl->broker_name and
 *       as_info->proxy_id.
 *       On success net_error_flag is reset. Every failure path after the
 *       socket has been created closes it before returning.
 */
#if defined(WINDOWS)
SOCKET
net_connect_proxy (int proxy_id)
#else /* WINDOWS */
SOCKET
net_connect_proxy (void)
#endif /* !WINDOWS */
{
  SOCKET fd;
  int len;

#if defined(WINDOWS)
  char *broker_port;
  int port = 0;
  int one = 1;
  unsigned char ip_addr[4];
  struct sockaddr_in shard_sock_addr;

  /* WSA startup */
  if (wsa_initialize ())
    {
      return (INVALID_SOCKET);
    }

  if (get_host_ip (ip_addr) < 0)
    {
      return (INVALID_SOCKET);
    }

  fd = socket (AF_INET, SOCK_STREAM, 0);
  if (IS_INVALID_SOCKET (fd))
    {
      return (INVALID_SOCKET);
    }
  if ((setsockopt (fd, SOL_SOCKET, SO_REUSEADDR, (char *) &one, sizeof (one))) < 0)
    {
      /* do not leak the freshly created socket */
      CLOSE_SOCKET (fd);
      return (INVALID_SOCKET);
    }

  if ((broker_port = getenv (PORT_NUMBER_ENV_STR)) == NULL)
    {
      /* do not leak the freshly created socket */
      CLOSE_SOCKET (fd);
      return (INVALID_SOCKET);
    }

  port = atoi (broker_port) + 2;
  port = proxy_id * 2 + port;

  SHARD_ERR ("<CAS> connect to socket:[%d].\n", port);

  memset (&shard_sock_addr, 0, sizeof (struct sockaddr_in));
  shard_sock_addr.sin_family = AF_INET;
  shard_sock_addr.sin_port = htons ((unsigned short) port);
  len = sizeof (struct sockaddr_in);
  memcpy (&shard_sock_addr.sin_addr, ip_addr, 4);

#else /* WINDOWS */
  struct sockaddr_un shard_sock_addr;
  char port_name[BROKER_PATH_MAX];

  /* port_name is a local array, so it can never be NULL; no check is needed */
  ut_get_proxy_port_name (port_name, shm_appl->broker_name, as_info->proxy_id, BROKER_PATH_MAX);

  /* FOR DEBUG */
  SHARD_ERR ("<CAS> connect to unixdoamin:[%s].\n", port_name);

  fd = socket (AF_UNIX, SOCK_STREAM, 0);
  if (IS_INVALID_SOCKET (fd))
    {
      return (INVALID_SOCKET);
    }

  memset (&shard_sock_addr, 0, sizeof (shard_sock_addr));
  shard_sock_addr.sun_family = AF_UNIX;
  strncpy_bufsize (shard_sock_addr.sun_path, port_name);
#ifdef  _SOCKADDR_LEN		/* 4.3BSD Reno and later */
  len = sizeof (shard_sock_addr.sun_len) + sizeof (shard_sock_addr.sun_family) + strlen (shard_sock_addr.sun_path) + 1;
  shard_sock_addr.sun_len = len;
#else /* vanilla 4.3BSD */
  len = strlen (shard_sock_addr.sun_path) + sizeof (shard_sock_addr.sun_family) + 1;
#endif
#endif /* !WINDOWS */

  if (connect (fd, (struct sockaddr *) &shard_sock_addr, len) < 0)
    {
      CLOSE_SOCKET (fd);
      return (INVALID_SOCKET);
    }

  /* a fresh connection starts without a pending I/O error */
  net_error_flag = 0;
  return (fd);
}


/*
 * net_connect_client () - accept one pending connection on a listening socket
 *   return: the accepted socket, or INVALID_SOCKET if accept () failed
 *   srv_sock_fd(in): listening socket
 *
 * Note: net_error_flag is cleared only when a connection was accepted.
 */
SOCKET
net_connect_client (SOCKET srv_sock_fd)
{
  struct sockaddr_in peer_addr;
  SOCKET accepted_fd;
  /* the type accept () expects for the address length differs per platform */
#if defined(WINDOWS) || defined(SOLARIS)
  int peer_addr_len;
#elif defined(UNIXWARE7)
  size_t peer_addr_len;
#else /* UNIXWARE7 */
  socklen_t peer_addr_len;
#endif /* UNIXWARE7 */

  peer_addr_len = sizeof (peer_addr);
  accepted_fd = accept (srv_sock_fd, (struct sockaddr *) &peer_addr, &peer_addr_len);

  if (!IS_INVALID_SOCKET (accepted_fd))
    {
      net_error_flag = 0;
      return accepted_fd;
    }

  return INVALID_SOCKET;
}

/*
 * net_write_stream () - write exactly 'size' bytes, retrying short writes
 *   return: 0 when everything was written, -1 if any write_buffer () call
 *           returned a value <= 0
 *   sock_fd(in): destination socket
 *   buf(in): data to send
 *   size(in): number of bytes to send; values <= 0 send nothing and return 0
 */
int
net_write_stream (SOCKET sock_fd, const char *buf, int size)
{
  const char *cursor = buf;
  int remaining = size;
  int sent;

  for (; remaining > 0; cursor += sent, remaining -= sent)
    {
      sent = write_buffer (sock_fd, cursor, remaining);
      if (sent <= 0)
	{
#ifdef _DEBUG
	  printf ("write error\n");
#endif
	  return -1;
	}
    }

  return 0;
}

/*
 * net_read_stream () - read exactly 'size' bytes, retrying short reads
 *   return: 0 when the buffer was filled, -1 if any read_buffer () call
 *           returned a value <= 0 (which includes a poll timeout)
 *   sock_fd(in): source socket
 *   buf(out): destination buffer of at least 'size' bytes
 *   size(in): number of bytes to read; values <= 0 read nothing and return 0
 */
int
net_read_stream (SOCKET sock_fd, char *buf, int size)
{
  char *cursor = buf;
  int remaining = size;
  int received;

  for (; remaining > 0; cursor += received, remaining -= received)
    {
      received = read_buffer (sock_fd, cursor, remaining);
      if (received <= 0)
	{
#ifdef _DEBUG
	  if (!is_net_timed_out ())
	    printf ("read error %d\n", received);
#endif
	  return -1;
	}
    }

  return 0;
}

/*
 * net_read_header () - read a message header and decode its body size
 *   return: 0 on success, -1 on read failure
 *   sock_fd(in): source socket
 *   header(in/out): header whose msg_body_size_ptr must already point into
 *                   header->buf (see init_msg_header ())
 *
 * Note: when cas_info_size > 0 the full MSG_HEADER_SIZE bytes are read;
 *       otherwise only the 4-byte body size is read. In both cases the body
 *       size arrives in network byte order and is converted to host order.
 */
int
net_read_header (SOCKET sock_fd, MSG_HEADER * header)
{
  int retval = 0;

  if (cas_info_size > 0)
    {
      retval = net_read_stream (sock_fd, header->buf, MSG_HEADER_SIZE);
      *(header->msg_body_size_ptr) = ntohl (*(header->msg_body_size_ptr));
    }
  else
    {
      /*
       * net_read_int () performs the ntohl () conversion; reading the raw
       * bytes with net_read_stream () would leave the size in network order.
       */
      retval = net_read_int (sock_fd, header->msg_body_size_ptr);
    }

  return retval;
}

/*
 * net_write_header () - send a message header
 *   return: 0 on success, -1 if the header could not be written
 *   sock_fd(in): destination socket
 *   header(in/out): header to send
 *
 * Note: the body size stored in the header is converted to network byte
 *       order IN PLACE before sending and is left that way on return.
 */
int
net_write_header (SOCKET sock_fd, MSG_HEADER * header)
{
  *(header->msg_body_size_ptr) = htonl (*(header->msg_body_size_ptr));

  /* propagate the write result instead of unconditionally reporting success */
  return net_write_stream (sock_fd, header->buf, MSG_HEADER_SIZE);
}

/*
 * init_msg_header () - wire up the header's internal pointers and reset it
 *   header(in/out): header to initialize
 *
 * Note: msg_body_size_ptr is pointed at the start of header->buf and info_ptr
 *       at offset MSG_HEADER_MSG_SIZE. The body size is zeroed, the status is
 *       set to CAS_INFO_STATUS_INACTIVE and the remaining info bytes to
 *       CAS_INFO_RESERVED_DEFAULT.
 */
void
init_msg_header (MSG_HEADER * header)
{
  char *info;

  header->msg_body_size_ptr = (int *) (header->buf);
  header->info_ptr = (char *) (header->buf + MSG_HEADER_MSG_SIZE);
  info = header->info_ptr;

  *(header->msg_body_size_ptr) = 0;

  info[CAS_INFO_STATUS] = CAS_INFO_STATUS_INACTIVE;
  info[CAS_INFO_RESERVED_1] = CAS_INFO_RESERVED_DEFAULT;
  info[CAS_INFO_RESERVED_2] = CAS_INFO_RESERVED_DEFAULT;
  info[CAS_INFO_ADDITIONAL_FLAG] = CAS_INFO_RESERVED_DEFAULT;

  /*
   * BROKER_RECONNECT_DOWN_SERVER is not supported, so the
   * CAS_INFO_FLAG_MASK_NEW_SESSION_ID bit must stay cleared.
   */
  info[CAS_INFO_ADDITIONAL_FLAG] &= ~CAS_INFO_FLAG_MASK_NEW_SESSION_ID;
}


/*
 * net_write_int () - send a 4-byte integer in network byte order
 *   return: 4 (number of bytes sent) on success, -1 on failure
 *   sock_fd(in): destination socket
 *   value(in): host-order value to send
 *
 * Note: the write goes through net_write_stream () so that a short write of
 *       the 4 bytes is retried instead of silently truncating the integer on
 *       the wire. The success value stays positive, as it was when
 *       write_buffer () was called directly.
 */
int
net_write_int (SOCKET sock_fd, int value)
{
  value = htonl (value);

  if (net_write_stream (sock_fd, (const char *) (&value), 4) < 0)
    {
      return -1;
    }

  return 4;
}

/*
 * net_read_int () - receive a 4-byte integer sent in network byte order
 *   return: 0 on success, -1 if the 4 bytes could not be read
 *   sock_fd(in): source socket
 *   value(out): receives the value converted to host byte order; it is not
 *               converted when the read fails
 */
int
net_read_int (SOCKET sock_fd, int *value)
{
  int status = net_read_stream (sock_fd, (char *) value, 4);

  if (status >= 0)
    {
      *value = ntohl (*value);
      return 0;
    }

  return (-1);
}

/*
 * net_decode_str () - split a request message into function code + arguments
 *   return: number of arguments (>= 0) on success,
 *           CAS_ER_COMMUNICATION if the message is malformed,
 *           CAS_ER_NO_MORE_MEMORY if the argument vector cannot grow
 *   msg(in): raw message: 1 byte function code followed by zero or more
 *            [4-byte network-order length][length bytes of data] records
 *   msg_size(in): size of msg in bytes
 *   func_code(out): receives the first byte of the message
 *   ret_argv(out): receives a heap-allocated array whose entries point INTO
 *                  msg at the start of each record's length field; NULL on
 *                  error or when there are no arguments. The caller frees the
 *                  array only (not the entries).
 *
 * Note: a negative length is rejected. Without that check the cursor would
 *       move backwards and remain_size would grow, so entries could end up
 *       pointing outside the message.
 */
int
net_decode_str (char *msg, int msg_size, char *func_code, void ***ret_argv)
{
  int remain_size = msg_size;
  char *cur_p = msg;
  void **argv = NULL;
  int argc = 0;

  *ret_argv = (void **) NULL;

  if (remain_size < 1)
    return CAS_ER_COMMUNICATION;

  *func_code = *cur_p;
  cur_p += 1;
  remain_size -= 1;

  while (remain_size > 0)
    {
      char *argp;
      int i_val;
      void **grown_argv;

      if (remain_size < 4)
	{
	  FREE_MEM (argv);
	  return CAS_ER_COMMUNICATION;
	}
      argp = cur_p;
      /* memcpy: the length field is not guaranteed to be aligned */
      memcpy ((char *) &i_val, cur_p, 4);
      i_val = ntohl (i_val);
      remain_size -= 4;
      cur_p += 4;

      /* the length comes from the peer: it must be non-negative and fit */
      if (i_val < 0 || remain_size < i_val)
	{
	  FREE_MEM (argv);
	  return CAS_ER_COMMUNICATION;
	}

      /*
       * Grow through a temporary so the old vector is not lost on failure.
       * NOTE(review): assumes REALLOC has realloc () semantics, i.e. the
       * original block stays valid when it returns NULL - confirm.
       */
      grown_argv = (void **) REALLOC (argv, sizeof (void *) * (argc + 1));
      if (grown_argv == NULL)
	{
	  FREE_MEM (argv);
	  return CAS_ER_NO_MORE_MEMORY;
	}
      argv = grown_argv;
      argv[argc] = argp;
      argc++;

      cur_p += i_val;
      remain_size -= i_val;
    }

  *ret_argv = argv;
  return argc;
}

/*
 * net_read_to_file () - receive 'file_size' bytes and store them in a file
 *   return: 0 on success,
 *           ERROR_INFO_SET (CAS_ER_COMMUNICATION, ...) if a read fails,
 *           ERROR_INFO_SET (CAS_ER_OPEN_FILE, ...) if the file could not be
 *           opened
 *   sock_fd(in): source socket
 *   file_size(in): number of bytes to receive
 *   filename(in): path of the file to create or truncate
 *
 * Note: when the file cannot be opened the incoming bytes are still read and
 *       discarded, and the open failure is reported only after all of them
 *       have been consumed.
 *       The output descriptor is closed on the communication-error path too,
 *       so a failed read does not leak it.
 */
int
net_read_to_file (SOCKET sock_fd, int file_size, char *filename)
{
  int out_fd;
  char read_buf[1024];
  int read_len;

  out_fd = open (filename, O_CREAT | O_TRUNC | O_WRONLY, 0666);
#if defined(WINDOWS)
  if (out_fd >= 0)
    setmode (out_fd, O_BINARY);
#endif /* WINDOWS */

  while (file_size > 0)
    {
      int chunk_size = (int) MIN (SSIZEOF (read_buf), file_size);

      read_len = read_buffer (sock_fd, read_buf, chunk_size);
      if (read_len <= 0 || read_len > chunk_size)
	{
	  if (out_fd >= 0)
	    {
	      close (out_fd);
	    }
	  return ERROR_INFO_SET (CAS_ER_COMMUNICATION, CAS_ERROR_INDICATOR);
	}
      if (out_fd >= 0)
	{
	  /* NOTE(review): the result of write () is not checked, as before */
	  write (out_fd, read_buf, read_len);
	}
      file_size -= read_len;
    }

  if (out_fd < 0)
    {
      return ERROR_INFO_SET (CAS_ER_OPEN_FILE, CAS_ERROR_INDICATOR);
    }

  close (out_fd);
  return 0;
}

/*
 * net_write_from_file () - send the first 'file_size' bytes of a file
 *   return: 0 on success, -1 if the file cannot be opened or read, if it ends
 *           before 'file_size' bytes were sent, or if a socket write fails
 *   sock_fd(in): destination socket
 *   file_size(in): number of bytes to send
 *   filename(in): path of the file to read
 *
 * Note: read () returning 0 (end of file) is treated as an error. Otherwise
 *       a file shorter than 'file_size' would make the loop spin forever,
 *       because no progress is made and file_size never decreases.
 */
int
net_write_from_file (SOCKET sock_fd, int file_size, char *filename)
{
  int in_fd;
  char read_buf[1024];
  int read_len;
  int result = 0;

  in_fd = open (filename, O_RDONLY);
  if (in_fd < 0)
    {
      return -1;
    }

#if defined(WINDOWS)
  setmode (in_fd, O_BINARY);
#endif /* WINDOWS */

  while (file_size > 0)
    {
      read_len = read (in_fd, read_buf, (int) MIN (file_size, SSIZEOF (read_buf)));
      if (read_len <= 0)
	{
	  /* read error, or premature end of file */
	  result = -1;
	  break;
	}
      if (net_write_stream (sock_fd, read_buf, read_len) < 0)
	{
	  result = -1;
	  break;
	}
      file_size -= read_len;
    }

  close (in_fd);
  return result;
}

/*
 * net_timeout_set () - set the timeout applied to subsequent socket polls
 *   timeout_sec(in): timeout in seconds; read_buffer () and write_buffer ()
 *                    treat a negative value as "wait indefinitely"
 */
void
net_timeout_set (int timeout_sec)
{
  net_timeout = timeout_sec;
}

/*
 * read_buffer () - perform a single read of up to 'size' bytes
 *   return: number of bytes read (> 0), or a value <= 0 on failure
 *   sock_fd(in): socket to read from
 *   buf(out): destination buffer
 *   size(in): maximum number of bytes to read
 *
 * Note: the timeout flag is cleared on entry. If net_error_flag is already
 *       set the call fails immediately.
 *       With ASYNC_MODE the socket is polled first (net_timeout seconds, or
 *       indefinitely when net_timeout is negative):
 *         - poll timeout: the timeout flag is set and -1 is returned WITHOUT
 *           setting net_error_flag;
 *         - data pending on new_req_sock_fd (only polled when cas_shard_flag
 *           is OFF and that socket is valid): -1 is returned WITHOUT setting
 *           net_error_flag;
 *         - poll error other than EINTR, POLLERR/POLLHUP, or a read result
 *           <= 0: net_error_flag is set.
 *       Without ASYNC_MODE the read is issued directly.
 */
static int
read_buffer (SOCKET sock_fd, char *buf, int size)
{
  int read_len = -1;
  /* NOTE(review): ssl_data_ready is never referenced in this function */
  bool ssl_data_ready = false;
#if defined(ASYNC_MODE)
  struct pollfd po[2] = { {0, 0, 0}, {0, 0, 0} };
  int timeout, po_size, n;
#endif /* ASYNC_MODE */

  unset_net_timeout_flag ();
  if (net_error_flag)
    {
      return -1;
    }

#if defined(ASYNC_MODE)
  /* poll () takes milliseconds; -1 means no timeout */
  timeout = net_timeout < 0 ? -1 : net_timeout * 1000;

  po[0].fd = sock_fd;
  po[0].events = POLLIN;
  po_size = 1;

  if (cas_shard_flag == OFF)
    {
      if (!IS_INVALID_SOCKET (new_req_sock_fd))
    {
      /* also watch the socket on which a new request may arrive */
      po[1].fd = new_req_sock_fd;
      po[1].events = POLLIN;
      po_size = 2;
    }
    }

retry_poll:
  /* when is_ssl_data_ready () reports data for an SSL client, poll () is skipped */
  if (ssl_client && is_ssl_data_ready (sock_fd))
    {
      po[0].revents = POLLIN;
      n = 1;
    }
  else
    {
      n = poll (po, po_size, timeout);
    }
  if (n < 0)
    {
      if (errno == EINTR)
    {
      /* interrupted by a signal: poll again */
      goto retry_poll;
    }
      else
    {
      net_error_flag = 1;
      return -1;
    }
    }
  else if (n == 0)
    {
      /* TIMEOUT */
      set_net_timeout_flag ();
      return -1;
    }
  else
    {
      if (cas_shard_flag == OFF && !IS_INVALID_SOCKET (new_req_sock_fd) && (po[1].revents & POLLIN))
    {
      /* CHANGE CLIENT */
      return -1;
    }
      if (po[0].revents & POLLERR || po[0].revents & POLLHUP)
    {
      read_len = -1;
    }
      else if (po[0].revents & POLLIN)
    {
#endif /* ASYNC_MODE */
      /* RECEIVE NEW REQUEST */
      read_len = READ_FROM_NET (sock_fd, buf, size);
#if defined(ASYNC_MODE)
    }
    }
#endif /* ASYNC_MODE */

  /* any failed or empty read marks the connection as broken */
  if (read_len <= 0)
    {
      net_error_flag = 1;
    }
  return read_len;
}

/*
 * write_buffer () - perform a single write of up to 'size' bytes
 *   return: number of bytes written (> 0), or a value <= 0 on failure
 *   sock_fd(in): socket to write to
 *   buf(in): data to send
 *   size(in): number of bytes available in buf
 *
 * Note: fails immediately if net_error_flag is set or sock_fd is invalid.
 *       With ASYNC_MODE the socket is polled for POLLOUT first (net_timeout
 *       seconds, or indefinitely when net_timeout is negative). Unlike
 *       read_buffer (), a poll timeout here sets net_error_flag.
 *       Any result <= 0 sets net_error_flag.
 */
static int
write_buffer (SOCKET sock_fd, const char *buf, int size)
{
  int write_len = -1;
#ifdef ASYNC_MODE
  struct pollfd po[1] = { {0, 0, 0} };
  int timeout, n;

  /* poll () takes milliseconds; -1 means no timeout */
  timeout = net_timeout < 0 ? -1 : net_timeout * 1000;
#endif /* ASYNC_MODE */

  if (net_error_flag || IS_INVALID_SOCKET (sock_fd))
    {
      return -1;
    }

#ifdef ASYNC_MODE
  po[0].fd = sock_fd;
  po[0].events = POLLOUT;

retry_poll:
  n = poll (po, 1, timeout);
  if (n < 0)
    {
      if (errno == EINTR)
    {
      /* interrupted by a signal: poll again */
      goto retry_poll;
    }
      else
    {
      net_error_flag = 1;
      return -1;
    }
    }
  else if (n == 0)
    {
      /* TIMEOUT */
      net_error_flag = 1;
      return -1;
    }
  else
    {
      if (po[0].revents & POLLERR || po[0].revents & POLLHUP)
    {
      write_len = -1;
    }
      else if (po[0].revents & POLLOUT)
    {
#endif /* ASYNC_MODE */
      write_len = WRITE_TO_NET (sock_fd, buf, size);
#if defined(ASYNC_MODE)
    }
    }
#endif /* ASYNC_MODE */

  /* any failed or empty write marks the connection as broken */
  if (write_len <= 0)
    {
      net_error_flag = 1;
    }
  return write_len;
}

#if defined(WINDOWS)
/*
 * get_host_ip () - look up the first address registered for the local host
 *   return: 0 on success, -1 if the host name or its address entry cannot be
 *           obtained
 *   ip_addr(out): receives the first 4 bytes of the first address in the
 *                 host entry; must have room for 4 bytes
 */
static int
get_host_ip (unsigned char *ip_addr)
{
  struct hostent *host_entry;
  char local_name[CUB_MAXHOSTNAMELEN];

  if (gethostname (local_name, sizeof (local_name)) < 0)
    {
      return -1;
    }

  host_entry = gethostbyname_uhost (local_name);
  if (host_entry == NULL)
    {
      return -1;
    }

  memcpy (ip_addr, host_entry->h_addr_list[0], 4);
  return 0;
}
#endif /* WINDOWS */

/*
 * is_net_timed_out () - tell whether the last read_buffer () call timed out
 *   return: true if the timeout flag is set (read_buffer () clears it on
 *           entry and sets it when its poll () times out)
 */
bool
is_net_timed_out (void)
{
  return net_timeout_flag;
}

/*
 * set_net_timeout_flag () - record that a network read timed out
 */
static void
set_net_timeout_flag (void)
{
  net_timeout_flag = true;
}

/*
 * unset_net_timeout_flag () - clear the network read timeout marker
 */
static void
unset_net_timeout_flag (void)
{
  net_timeout_flag = false;
}

/*
 * net_write_error () - send an error response to the client
 *   sock(in): destination socket
 *   version(in): client version; from CAS_MAKE_VER (8, 3, 0) on, an indicator
 *                integer is included in the response
 *   driver_info(in): passed to cas_di_understand_renewed_error_code ()
 *   cas_info(in): CAS info bytes, sent right after the length when
 *                 cas_info_size > 0
 *   cas_info_size(in): number of bytes in cas_info
 *   indicator(in): error indicator
 *   code(in): error code; must not be NO_ERROR
 *   msg(in): message handed to net_error_append_shard_info ()
 *
 * Note: wire order is: length, [cas_info], [indicator], code, [message + NUL].
 *       The length covers the code, plus the indicator and the message with
 *       its terminator when present. When the client does not match
 *       PROTOCOL_V2 and its driver does not understand renewed error codes,
 *       the code is mapped with CAS_CONV_ERROR_TO_OLD () if the indicator is
 *       CAS_ERROR_INDICATOR or the code is CAS_ER_NOT_AUTHORIZED_CLIENT.
 *       Results of the individual writes are not checked.
 */
void
net_write_error (SOCKET sock, int version, char *driver_info, char *cas_info, int cas_info_size, int indicator,
		 int code, const char *msg)
{
  char err_msg[ERR_MSG_LENGTH];
  size_t err_msg_len = 0;
  size_t body_len;
  const int sends_indicator = (version >= CAS_MAKE_VER (8, 3, 0));

  assert (code != NO_ERROR);

  /* work out the payload length: code [+ indicator] [+ message + NUL] */
  body_len = NET_SIZE_INT;
  if (sends_indicator)
    {
      body_len += NET_SIZE_INT;
    }

  err_msg_len = net_error_append_shard_info (err_msg, msg, ERR_MSG_LENGTH);
  if (err_msg_len > 0)
    {
      body_len += err_msg_len + 1;
    }

  net_write_int (sock, (int) body_len);

  if (cas_info_size > 0)
    {
      net_write_stream (sock, cas_info, cas_info_size);
    }

  if (sends_indicator)
    {
      net_write_int (sock, indicator);
    }

  /* older clients only know the legacy error numbering */
  if (!DOES_CLIENT_MATCH_THE_PROTOCOL (version, PROTOCOL_V2) && !cas_di_understand_renewed_error_code (driver_info)
      && code != NO_ERROR && (indicator == CAS_ERROR_INDICATOR || code == CAS_ER_NOT_AUTHORIZED_CLIENT))
    {
      code = CAS_CONV_ERROR_TO_OLD (code);
    }

  net_write_int (sock, code);

  if (err_msg_len > 0)
    {
      net_write_stream (sock, err_msg, (int) err_msg_len + 1);
    }
}