Skip to content

File shard_proxy_queue.c

File List > broker > shard_proxy_queue.c

Go to the documentation of this file

/*
 * Copyright 2008 Search Solution Corporation
 * Copyright 2016 CUBRID Corporation
 *
 *  Licensed under the Apache License, Version 2.0 (the "License");
 *  you may not use this file except in compliance with the License.
 *  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 *  Unless required by applicable law or agreed to in writing, software
 *  distributed under the License is distributed on an "AS IS" BASIS,
 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *  See the License for the specific language governing permissions and
 *  limitations under the License.
 *
 */


/*
 * shard_proxy_queue.c - singly linked queue (T_SHARD_QUEUE) and
 *                       fixed-size circular queue (T_SHARD_CQUEUE) routines
 */

#ident "$Id$"


#include <stdio.h>
#include <stdlib.h>
#include <assert.h>

#include "cas_common.h"
#include "shard_proxy_common.h"
#include "shard_proxy_queue.h"


/*
 * shard_queue_insert_after () - link an entry into the queue after a given entry
 *   return: none
 *   q(in/out): queue to modify
 *   prev(in): entry after which curr is linked; NULL means "insert at the head"
 *   curr(in/out): entry to link in; its next pointer is overwritten
 *
 * Note: when the queue is empty, curr becomes both head and tail and prev is
 *       expected to be NULL. The tail is moved to curr whenever prev was the
 *       tail.
 */
static void
shard_queue_insert_after (T_SHARD_QUEUE * q, T_SHARD_QUEUE_ENT * prev, T_SHARD_QUEUE_ENT * curr)
{
  /* empty queue: the new entry is the only element */
  if (q->head == NULL)
    {
      assert (prev == NULL);
      assert (q->tail == NULL);

      curr->next = NULL;
      q->head = curr;
      q->tail = curr;
      return;
    }

  if (prev == NULL)
    {
      /* no predecessor: push in front of the current head */
      curr->next = q->head;
      q->head = curr;
    }
  else
    {
      /* splice between prev and its successor */
      curr->next = prev->next;
      prev->next = curr;
    }

  /* inserting behind the old tail makes curr the new tail */
  if (prev == q->tail)
    {
      q->tail = curr;
    }
}

/*
 * shard_queue_enqueue () - append a value at the tail of the queue
 *   return: 0 on success, -1 when the queue entry cannot be allocated
 *   q(in/out): queue to append to
 *   v(in): value pointer to store; the pointer itself is stored, not a copy
 */
int
shard_queue_enqueue (T_SHARD_QUEUE * q, void *v)
{
  T_SHARD_QUEUE_ENT *node;

  assert (q);
  assert (v);

  node = (T_SHARD_QUEUE_ENT *) malloc (sizeof (T_SHARD_QUEUE_ENT));
  if (node == NULL)
    {
      PROXY_LOG (PROXY_LOG_MODE_ERROR, "Not enough virtual memory. " "Failed to alloc shard queue entry. " "(errno:%d).",
                 errno);

      /* FAILED */
      return -1;
    }

  node->v = v;
  shard_queue_insert_after (q, q->tail, node);

  /* SUCCESS */
  return 0;
}

/*
 * shard_queue_ordered_enqueue () - insert a value at a position chosen by comp_fn
 *   return: 0 on success, -1 when the queue entry cannot be allocated
 *   q(in/out): queue to insert into
 *   v(in): value pointer to store; the pointer itself is stored, not a copy
 *   comp_fn(in): comparison callback, called as comp_fn (existing, new);
 *                NULL means plain append at the tail
 *
 * Note: the new entry is placed in front of the first existing entry for
 *       which comp_fn (existing, new) > 0. If there is no such entry it is
 *       appended at the tail.
 */
int
shard_queue_ordered_enqueue (T_SHARD_QUEUE * q, void *v, SHARD_COMP_FN comp_fn)
{
  T_SHARD_QUEUE_ENT *node;
  T_SHARD_QUEUE_ENT *scan;
  T_SHARD_QUEUE_ENT *trail;
  T_SHARD_QUEUE_ENT *insert_pos;

  node = (T_SHARD_QUEUE_ENT *) malloc (sizeof (T_SHARD_QUEUE_ENT));
  if (node == NULL)
    {
      PROXY_LOG (PROXY_LOG_MODE_ERROR, "Not enough virtual memory. " "Failed to alloc shard queue entry. " "(errno:%d).",
                 errno);
      return -1;
    }

  node->v = v;

  /* default position: behind the current tail */
  insert_pos = q->tail;

  if (comp_fn != NULL)
    {
      trail = NULL;
      scan = q->head;
      while (scan != NULL && comp_fn (scan->v, node->v) <= 0)
        {
          trail = scan;
          scan = scan->next;
        }

      if (scan != NULL)
        {
          /* found an entry that must follow the new one: insert before it */
          insert_pos = trail;
        }
    }

  shard_queue_insert_after (q, insert_pos, node);

  return 0;
}

/*
 * shard_queue_dequeue () - remove the head entry and return its value
 *   return: value pointer stored in the head entry, or NULL if the queue is empty
 *   q(in/out): queue to remove from
 *
 * Note: the queue entry is released with FREE_MEM; the stored value is handed
 *       back to the caller untouched.
 */
void *
shard_queue_dequeue (T_SHARD_QUEUE * q)
{
  T_SHARD_QUEUE_ENT *first;
  void *value;

  first = q->head;
  if (first == NULL)
    {
      return NULL;
    }

  value = first->v;

  if (first == q->tail)
    {
      /* removing the only entry empties the queue */
      q->head = NULL;
      q->tail = NULL;
    }
  else
    {
      q->head = first->next;
    }

  FREE_MEM (first);

  return value;
}

/*
 * shard_queue_peek_value () - return the value at the head without removing it
 *   return: value pointer stored in the head entry, or NULL if the queue is empty
 *   q(in): queue to inspect
 */
void *
shard_queue_peek_value (T_SHARD_QUEUE * q)
{
  return (q->head != NULL) ? q->head->v : NULL;
}

/*
 * shard_queue_initialize () - reset a queue to the empty state
 *   return: always 0
 *   q(out): queue whose head and tail are set to NULL
 */
int
shard_queue_initialize (T_SHARD_QUEUE * q)
{
  assert (q);

  q->head = q->tail = NULL;

  return 0;
}

/*
 * shard_queue_destroy () - remove every entry and release the stored values
 *   return: none
 *   q(in/out): queue to drain; head and tail are NULL afterwards
 *
 * Note: each stored value is released with FREE_MEM, so the queue must own
 *       the values that are still enqueued when this is called.
 */
void
shard_queue_destroy (T_SHARD_QUEUE * q)
{
  void *v;

  /*
   * Loop on the head pointer rather than on the dequeued value:
   * shard_queue_dequeue () returns NULL both for an empty queue and for an
   * entry whose stored value is NULL, so testing the return value would stop
   * early and leak every entry behind a NULL value.
   */
  while (q->head != NULL)
    {
      v = shard_queue_dequeue (q);
      FREE_MEM (v);
    }

  q->head = NULL;
  q->tail = NULL;

  return;
}

/*
 * shard_cqueue_is_full () - check whether the circular queue has no free slot
 *   return: true if count equals size, false otherwise
 *   q(in): circular queue to inspect
 */
static bool
shard_cqueue_is_full (T_SHARD_CQUEUE * q)
{
  assert (q);

  if (q->count != q->size)
    {
      return false;
    }

  return true;
}

/*
 * shard_cqueue_is_empty () - check whether the circular queue holds no element
 *   return: true if count is 0, false otherwise
 *   q(in): circular queue to inspect
 */
static bool
shard_cqueue_is_empty (T_SHARD_CQUEUE * q)
{
  assert (q);

  if (q->count != 0)
    {
      return false;
    }

  return true;
}


/*
 * shard_cqueue_enqueue () - store an element in the circular queue
 *   return: 0 on success, -1 if the queue is full
 *   q(in/out): circular queue to add to
 *   e(in): element pointer to store
 *
 * Note: elements are written at index 'front', which then advances and wraps
 *       around at 'size'.
 */
int
shard_cqueue_enqueue (T_SHARD_CQUEUE * q, void *e)
{
  assert (q);

  if (!shard_cqueue_is_full (q))
    {
      q->ent[q->front] = e;

      q->front = (q->front + 1) % q->size;
      q->count++;

      /* SUCCESS */
      return 0;
    }

  PROXY_LOG (PROXY_LOG_MODE_ERROR, "Queue is full. (q_size:%d, q_count:%d).", q->size, q->count);

  /* FAILED */
  return -1;
}

/*
 * shard_cqueue_dequeue () - take the oldest element out of the circular queue
 *   return: element pointer read from index 'rear', or NULL if the queue is empty
 *   q(in/out): circular queue to remove from
 *
 * Note: elements are read at index 'rear', which then advances and wraps
 *       around at 'size'.
 */
void *
shard_cqueue_dequeue (T_SHARD_CQUEUE * q)
{
  void *item = NULL;

  assert (q);

  if (!shard_cqueue_is_empty (q))
    {
      item = q->ent[q->rear];

      q->rear = (q->rear + 1) % q->size;
      q->count--;
    }

  /* NULL here means the queue was empty (or a NULL element was stored) */
  return item;
}

/*
 * shard_cqueue_initialize () - allocate the slot array of a circular queue
 *   return: 0 on success, -1 when the slot array cannot be allocated
 *   q(in/out): circular queue to set up; q->ent is expected to be NULL on entry
 *   size(in): number of element slots to allocate
 *
 * Note: on failure the queue is left with size 0 and ent NULL, so that
 *       shard_cqueue_enqueue () reports "full" and shard_cqueue_dequeue ()
 *       reports "empty" instead of touching the missing slot array.
 */
int
shard_cqueue_initialize (T_SHARD_CQUEUE * q, int size)
{
  assert (q);
  assert (q->ent == NULL);

  q->size = size;
  q->count = 0;
  q->front = 0;
  q->rear = 0;

  q->ent = (void **) malloc (sizeof (void *) * size);
  if (q->ent == NULL)
    {
      /*
       * The byte count is a size_t expression; cast it so the argument type
       * matches the %d conversion in the format string.
       */
      PROXY_LOG (PROXY_LOG_MODE_ERROR,
                 "Not enough virtual memory. " "Failed to alloc shard cqueue entry. " "(errno:%d, size:%d).", errno,
                 (int) (size * sizeof (void *)));

      /* keep the queue unusable-but-safe: no capacity, no slot array */
      q->size = 0;

      /* FAILED */
      return -1;
    }

  /* SUCCESS */
  return 0;
}

/*
 * shard_cqueue_destroy () - release the slot array and reset the bookkeeping
 *   return: none
 *   q(in/out): circular queue to tear down
 *
 * Note: only the slot array is released with FREE_MEM; elements still stored
 *       in the slots are not touched here.
 */
void
shard_cqueue_destroy (T_SHARD_CQUEUE * q)
{
  FREE_MEM (q->ent);

  q->rear = 0;
  q->front = 0;
  q->count = 0;
  q->size = 0;
}