Skip to content

File broker_proxy_conn.c

File List > broker > broker_proxy_conn.c

Go to the documentation of this file

/*
 * Copyright 2008 Search Solution Corporation
 * Copyright 2016 CUBRID Corporation
 *
 *  Licensed under the Apache License, Version 2.0 (the "License");
 *  you may not use this file except in compliance with the License.
 *  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 *  Unless required by applicable law or agreed to in writing, software
 *  distributed under the License is distributed on an "AS IS" BASIS,
 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *  See the License for the specific language governing permissions and
 *  limitations under the License.
 *
 */


/*
 * broker_proxy_conn.c -
 */

#ident "$Id$"

#include <sys/types.h>
#include <assert.h>
#if !defined(WINDOWS)
#include <sys/time.h>
#include <unistd.h>
#include <pthread.h>
#endif /* !WINDOWS */

#include "porting.h"
#include "broker_proxy_conn.h"
#include "shard_proxy_common.h"
#include "shard_shm.h"

#if !defined(WINDOWS)
T_PROXY_CONN broker_Proxy_conn = {
  -1,               /* max_num_proxy */
  0,                /* cur_num_proxy */
  NULL              /* proxy_sockfd */
};

#define PROXY_SVR_CON_RETRY_COUNT   3
#define PROXY_SVR_CON_RETRY_MSEC    400

pthread_mutex_t proxy_conn_mutex;

static void broker_free_all_proxy_conn_ent (void);
static T_PROXY_CONN_ENT *broker_find_proxy_conn_by_fd (SOCKET fd);
static T_PROXY_CONN_ENT *broker_find_proxy_conn_by_id (int proxy_id);

static void
broker_free_all_proxy_conn_ent (void)
{
  T_PROXY_CONN_ENT *ent_p, *next_ent_p;

  for (ent_p = broker_Proxy_conn.proxy_conn_ent; ent_p; ent_p = next_ent_p)
    {
      next_ent_p = ent_p->next;

      FREE_MEM (ent_p);
      ent_p = NULL;
    }

  broker_Proxy_conn.proxy_conn_ent = NULL;
}

int
broker_set_proxy_fds (fd_set * fds)
{
  int ret = 0;
  T_PROXY_CONN_ENT *ent_p;

  pthread_mutex_lock (&proxy_conn_mutex);
  for (ent_p = broker_Proxy_conn.proxy_conn_ent; ent_p; ent_p = ent_p->next)
    {
      if (ent_p->fd != INVALID_SOCKET && ent_p->status == PROXY_CONN_CONNECTED)
    {
      FD_SET (ent_p->fd, fds);
    }
    }
  pthread_mutex_unlock (&proxy_conn_mutex);

  return ret;
}

SOCKET
broker_get_readable_proxy_conn (fd_set * fds)
{
  SOCKET fd;
  T_PROXY_CONN_ENT *ent_p;

  pthread_mutex_lock (&proxy_conn_mutex);
  for (ent_p = broker_Proxy_conn.proxy_conn_ent; ent_p; ent_p = ent_p->next)
    {
      if (ent_p->fd == INVALID_SOCKET)
    {
      continue;
    }

      if (ent_p->status != PROXY_CONN_CONNECTED)
    {
      continue;
    }

      if (FD_ISSET (ent_p->fd, fds))
    {
      fd = ent_p->fd;
      FD_CLR (ent_p->fd, fds);

      pthread_mutex_unlock (&proxy_conn_mutex);
      return fd;
    }
    }
  pthread_mutex_unlock (&proxy_conn_mutex);

  return INVALID_SOCKET;

}

static T_PROXY_CONN_ENT *
broker_find_proxy_conn_by_fd (SOCKET fd)
{
  T_PROXY_CONN_ENT *ent_p;

  for (ent_p = broker_Proxy_conn.proxy_conn_ent; ent_p; ent_p = ent_p->next)
    {
      if (ent_p->fd != fd)
    {
      continue;
    }

      return ent_p;
    }

  return NULL;
}

static T_PROXY_CONN_ENT *
broker_find_proxy_conn_by_id (int proxy_id)
{
  T_PROXY_CONN_ENT *ent_p;

  for (ent_p = broker_Proxy_conn.proxy_conn_ent; ent_p; ent_p = ent_p->next)
    {
      if (ent_p->proxy_id != proxy_id)
    {
      continue;
    }

      return ent_p;
    }

  return NULL;
}

int
broker_add_proxy_conn (SOCKET fd)
{
  int ret = 0;
  T_PROXY_CONN_ENT *ent_p;

  if (broker_Proxy_conn.max_num_proxy < 0)
    {
      return -1;
    }

  pthread_mutex_lock (&proxy_conn_mutex);
  if (broker_Proxy_conn.max_num_proxy <= broker_Proxy_conn.cur_num_proxy)
    {
      pthread_mutex_unlock (&proxy_conn_mutex);
      return -1;
    }

  ent_p = (T_PROXY_CONN_ENT *) malloc (sizeof (T_PROXY_CONN_ENT));
  if (ent_p == NULL)
    {
      pthread_mutex_unlock (&proxy_conn_mutex);
      return -1;
    }

  ent_p->proxy_id = PROXY_INVALID_ID;
  ent_p->status = PROXY_CONN_CONNECTED;
  ent_p->fd = fd;

  ent_p->next = broker_Proxy_conn.proxy_conn_ent;
  broker_Proxy_conn.proxy_conn_ent = ent_p;

  broker_Proxy_conn.cur_num_proxy++;
  if (broker_Proxy_conn.cur_num_proxy > broker_Proxy_conn.max_num_proxy)
    {
      assert (false);
      broker_Proxy_conn.cur_num_proxy = broker_Proxy_conn.max_num_proxy;
    }

  pthread_mutex_unlock (&proxy_conn_mutex);
  return ret;
}

int
broker_delete_proxy_conn_by_fd (SOCKET fd)
{
  int ret = 0;
  T_PROXY_CONN_ENT *ent_p, *prev_ent_p;

  pthread_mutex_lock (&proxy_conn_mutex);
  for (prev_ent_p = ent_p = broker_Proxy_conn.proxy_conn_ent; ent_p; prev_ent_p = ent_p, ent_p = ent_p->next)
    {
      if (ent_p->fd == fd)
    {
      if (ent_p == broker_Proxy_conn.proxy_conn_ent)
        {
          broker_Proxy_conn.proxy_conn_ent = ent_p->next;
        }
      else
        {
          prev_ent_p->next = ent_p->next;
        }

      broker_Proxy_conn.cur_num_proxy--;
      if (broker_Proxy_conn.cur_num_proxy < 0)
        {
          assert (false);
          broker_Proxy_conn.cur_num_proxy = 0;
        }

      FREE_MEM (ent_p);
      ent_p = NULL;
      break;
    }
    }
  pthread_mutex_unlock (&proxy_conn_mutex);

  return ret;
}

int
broker_delete_proxy_conn_by_proxy_id (int proxy_id)
{
  int ret = 0;
  T_PROXY_CONN_ENT *ent_p, *prev_ent_p;

  if (proxy_id == PROXY_INVALID_ID)
    {
      return -1;
    }

  pthread_mutex_lock (&proxy_conn_mutex);
  for (prev_ent_p = ent_p = broker_Proxy_conn.proxy_conn_ent; ent_p; prev_ent_p = ent_p, ent_p = ent_p->next)
    {
      if (ent_p->proxy_id == proxy_id)
    {
      if (ent_p == broker_Proxy_conn.proxy_conn_ent)
        {
          broker_Proxy_conn.proxy_conn_ent = ent_p->next;
        }
      else
        {
          prev_ent_p->next = ent_p->next;
        }

      broker_Proxy_conn.cur_num_proxy--;
      if (broker_Proxy_conn.cur_num_proxy < 0)
        {
          assert (false);
          broker_Proxy_conn.cur_num_proxy = 0;
        }

      FREE_MEM (ent_p);
      ent_p = NULL;
      break;
    }
    }

  pthread_mutex_unlock (&proxy_conn_mutex);

  return ret;
}

int
broker_register_proxy_conn (SOCKET fd, int proxy_id)
{
  int ret = 0;
  T_PROXY_CONN_ENT *ent_p;

  pthread_mutex_lock (&proxy_conn_mutex);
  ent_p = broker_find_proxy_conn_by_fd (fd);
  if (ent_p == NULL)
    {
      pthread_mutex_unlock (&proxy_conn_mutex);
      return -1;
    }
  assert (ent_p->status != PROXY_CONN_AVAILABLE);
  assert (ent_p->proxy_id == PROXY_INVALID_ID);
  if (ent_p->status == PROXY_CONN_AVAILABLE || ent_p->proxy_id != PROXY_INVALID_ID)
    {
      pthread_mutex_unlock (&proxy_conn_mutex);
      return -1;
    }

  ent_p->status = PROXY_CONN_AVAILABLE;
  ent_p->proxy_id = proxy_id;

  pthread_mutex_unlock (&proxy_conn_mutex);

  return ret;
}
#endif /* !WINDOWS */

#if defined(WINDOWS)
int
broker_find_available_proxy (T_SHM_PROXY * shm_proxy_p, int ip_addr, T_BROKER_VERSION clt_version)
#else /* WINDOWS */
SOCKET
broker_find_available_proxy (T_SHM_PROXY * shm_proxy_p)
#endif              /* !WINDOWS */
{
  int proxy_index;
  int min_cur_client = -1;
  int cur_client = -1;
  int max_context = -1;
  T_PROXY_INFO *proxy_info_p;
#if defined(WINDOWS)
  T_PROXY_INFO *find_proxy_info_p;
#else /* WINDOWS */
  T_PROXY_CONN_ENT *ent_p;
  SOCKET fd = INVALID_SOCKET;
  int retry_count = 0;

  if (broker_Proxy_conn.max_num_proxy < 0)
    {
      return INVALID_SOCKET;
    }

retry:
  pthread_mutex_lock (&proxy_conn_mutex);
#endif /* !WINDOWS */
  for (proxy_index = 0; proxy_index < shm_proxy_p->num_proxy; proxy_index++)
    {
      proxy_info_p = shard_shm_find_proxy_info (shm_proxy_p, proxy_index);

      if (proxy_info_p->pid <= 0)
    {
      continue;
    }

      max_context = proxy_info_p->max_context;
      cur_client = proxy_info_p->cur_client;

#if !defined(WINDOWS)
      ent_p = broker_find_proxy_conn_by_id (proxy_info_p->proxy_id);
      if (ent_p == NULL || ent_p->status != PROXY_CONN_AVAILABLE)
    {
      continue;
    }

      assert (ent_p->fd != INVALID_SOCKET);
#endif /* !WINDOWS */

      if (min_cur_client == -1)
    {
      min_cur_client = cur_client;
    }

      if (cur_client < max_context && cur_client <= min_cur_client)
    {
#if defined(WINDOWS)
      find_proxy_info_p = proxy_info_p;
#else /* WINDOWS */
      fd = ent_p->fd;
#endif /* !WINDOWS */

      min_cur_client = cur_client;
    }
    }

#if !defined(WINDOWS)
  if (shm_proxy_p->num_proxy > 0 && fd < 0 && retry_count++ < PROXY_SVR_CON_RETRY_COUNT)
    {
      pthread_mutex_unlock (&proxy_conn_mutex);
      SLEEP_MILISEC (0, PROXY_SVR_CON_RETRY_MSEC);
      goto retry;
    }
#endif /* !WINDOWS */

#if !defined(WINDOWS)
  pthread_mutex_unlock (&proxy_conn_mutex);
#endif /* !WINDOWS */

#if defined(WINDOWS)
  return find_proxy_info_p->proxy_port;
#else /* WINDOWS */
  return fd;
#endif /* !WINDOWS */
}

#if !defined(WINDOWS)
SOCKET
broker_get_proxy_conn_maxfd (SOCKET proxy_sock_fd)
{
  T_PROXY_CONN_ENT *ent_p;
  int max_fd;

  max_fd = proxy_sock_fd;

  pthread_mutex_lock (&proxy_conn_mutex);
  for (ent_p = broker_Proxy_conn.proxy_conn_ent; ent_p; ent_p = ent_p->next)
    {
      if (ent_p->status != PROXY_CONN_NOT_CONNECTED)
    {
      if (max_fd < ent_p->fd)
        {
          max_fd = ent_p->fd;
        }
    }
    }
  pthread_mutex_unlock (&proxy_conn_mutex);

  return (max_fd + 1);
}

int
broker_init_proxy_conn (int max_proxy)
{
  if (broker_Proxy_conn.max_num_proxy >= 0)
    {
      return 0;
    }

  pthread_mutex_init (&proxy_conn_mutex, NULL);

  broker_Proxy_conn.max_num_proxy = max_proxy;
  broker_Proxy_conn.cur_num_proxy = 0;
  broker_Proxy_conn.proxy_conn_ent = NULL;

  return 0;
}

void
broker_destroy_proxy_conn (void)
{
  if (broker_Proxy_conn.max_num_proxy < 0)
    {
      return;
    }

  pthread_mutex_lock (&proxy_conn_mutex);
  broker_Proxy_conn.max_num_proxy = -1;
  broker_Proxy_conn.cur_num_proxy = 0;

  broker_free_all_proxy_conn_ent ();
  pthread_mutex_unlock (&proxy_conn_mutex);

  pthread_mutex_destroy (&proxy_conn_mutex);

  return;
}
#endif /* !WINDOWS */