/* Source listing of broker/cas_runner.c (captured from the generated documentation file list). */

/*
 * Copyright 2008 Search Solution Corporation
 * Copyright 2016 CUBRID Corporation
 *
 *  Licensed under the Apache License, Version 2.0 (the "License");
 *  you may not use this file except in compliance with the License.
 *  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 *  Unless required by applicable law or agreed to in writing, software
 *  distributed under the License is distributed on an "AS IS" BASIS,
 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *  See the License for the specific language governing permissions and
 *  limitations under the License.
 *
 */


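/*
 * cas_runner.c - multi-threaded tool that replays a script file of
 *                query/prepare/bind/execute/close/end-transaction lines
 *                against a broker through CCI and reports the average and
 *                standard deviation of the measured execute times
 */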

#ident "$Id$"

#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <errno.h>
#include <signal.h>
#include <math.h>
#if defined(WINDOWS)
#include <winsock2.h>
#include <windows.h>
#include <process.h>
#include <sys/timeb.h>
#else
#include <pthread.h>
#include <unistd.h>
#endif

#include "cubrid_getopt.h"
#include "cas_common.h"
#include "porting.h"
#include "cas_cci.h"
#include "broker_log_util.h"

/*
 * PRINT_CCI_ERROR () - report the failure of a CCI call
 *   ERRCODE(in): value returned by the CCI call; evaluated exactly once
 *   CCI_ERROR(in): T_CCI_ERROR * filled in by the call, or NULL
 *   result_fp(in): result stream of the calling thread, or NULL
 *
 * Note: When ERRCODE is CCI_ER_DBMS and CCI_ERROR is not NULL, the server
 *       error code/message is written unless ignore_error () says that the
 *       code must be ignored. Otherwise, any negative ERRCODE is translated
 *       with cci_get_error_msg () and written. Whenever something is
 *       written, cas_error_flag is set. Messages always go to cas_error_fp;
 *       outside of CUBRID manager mode they are also copied to result_fp
 *       (when not NULL) and are prefixed with the script file name.
 */
#define PRINT_CCI_ERROR(ERRCODE, CCI_ERROR, result_fp)  \
    do {                    \
      const int cci_errcode_ = (ERRCODE);   \
      T_CCI_ERROR *cci_error_p = (CCI_ERROR);   \
      if (cci_errcode_ == CCI_ER_DBMS && cci_error_p != NULL) { \
        if (!ignore_error(cci_error_p->err_code)) {         \
          cas_error_flag = 1;           \
          if (cubrid_manager_run) {         \
            fprintf(cas_error_fp, "server error : (%d) %s\n", cci_error_p->err_code, cci_error_p->err_msg); \
          }                     \
          else {                    \
            if (result_fp) {            \
              fprintf(result_fp, "%s: server error : %d %s\n", exec_script_file, cci_error_p->err_code, cci_error_p->err_msg);  \
            }                   \
            fprintf(cas_error_fp, "%s: server error : %d %s\n", exec_script_file, cci_error_p->err_code, cci_error_p->err_msg); \
          }                     \
        }                       \
      }                 \
      else if (cci_errcode_ < 0) {      \
        char msgbuf[1024] = "";     \
        cas_error_flag = 1;         \
        cci_get_error_msg(cci_errcode_, NULL, msgbuf, sizeof(msgbuf)); \
        if (cubrid_manager_run) {       \
          fprintf(cas_error_fp, "%s\n", msgbuf);    \
        }                   \
        else {              \
          if (result_fp) {          \
            fprintf(result_fp, "%s: cci_error : %d %s\n", exec_script_file, cci_errcode_, msgbuf); \
          }                 \
          fprintf(cas_error_fp, "%s: cci_error : %d %s\n", exec_script_file, cci_errcode_, msgbuf);    \
        }                   \
      }                 \
    } while (0)

/*
 * FREE_BIND_INFO () - release the value buffer of the first NUM_BIND entries
 *                     of BIND_INFO and reset NUM_BIND to 0
 *   NUM_BIND(in/out): lvalue holding the number of collected bind values
 *   BIND_INFO(in): array of T_BIND_INFO
 */
#define FREE_BIND_INFO(NUM_BIND, BIND_INFO) \
    do {                    \
      int bind_idx = 0;         \
      while (bind_idx < NUM_BIND) { \
        FREE_MEM(BIND_INFO[bind_idx].value);    \
        bind_idx++;         \
      }                 \
      NUM_BIND = 0;             \
    } while (0)

#define MAX_NODE_INFO   100
#define MAX_IGN_SRV_ERR 100

/*
 * Platform wrappers:
 *   THREAD_FUNC   - return type of a thread entry function (see thr_main ())
 *   SLEEP_SEC     - sleep for X seconds
 *   SLEEP_MILISEC - sleep for sec seconds plus msec milliseconds
 */
#if defined(WINDOWS)
#define strcasecmp(X, Y)        _stricmp(X, Y)
#ifdef THREAD_FUNC
#undef THREAD_FUNC
#endif
#define THREAD_FUNC         unsigned __stdcall
#define SLEEP_SEC(X)            Sleep((X) * 1000)
#define SLEEP_MILISEC(sec, msec)    Sleep((sec) * 1000 + (msec))
#else
#define THREAD_FUNC     void*
#define SLEEP_SEC(X)        sleep(X)
/* select () with no descriptor sets is used purely as a timer */
#define SLEEP_MILISEC(sec, msec)            \
    do {                        \
      struct timeval sleep_time_val;        \
      sleep_time_val.tv_sec = sec;          \
      sleep_time_val.tv_usec = (msec) * 1000;   \
      select(0, 0, 0, 0, &sleep_time_val);      \
    } while(0)
#endif

/*
 * STRDUP () - replace the heap string held by TARGET with a copy of SOURCE
 *   TARGET(in/out): char * lvalue; its previous value (if any) is freed
 *   SOURCE(in): string to duplicate
 *
 * Note: TARGET becomes NULL when strdup () fails.
 */
#define STRDUP(TARGET, SOURCE) \
        do {                                          \
          free (TARGET);                      \
          TARGET = strdup (SOURCE);            \
        } while (0)

/* number of slots in a request handle table; script request ids are accepted in [0, MAX_SERVER_H_ID] */
#define SERVER_HANDLE_ALLOC_SIZE        (MAX_SERVER_H_ID + 1)

/* one bind value collected from a 'B' script line (see process_bind ()) */
typedef struct t_bind_info T_BIND_INFO;
struct t_bind_info
{
  char *value;			/* heap copy of the value text; released by FREE_BIND_INFO */
  int type;			/* numeric type parsed from the line; compared against CCI_U_TYPE_* */
  int len;			/* length field; only parsed for CHAR/STRING/BIT/VARBIT/ENUM/JSON values */
};

/*
 * connection parameters of one named node; entries of node_table[] are
 * filled by make_node_info () from "node" lines of the configuration file
 * (field meanings below are inferred from their names - confirm against
 * make_node_info ())
 */
typedef struct t_node_info T_NODE_INFO;
struct t_node_info
{
  char *node_name;
  char *dbname;
  char *ip;
  char *dbuser;
  char *dbpasswd;
  int port;
};

/* forward declarations of the file-local helpers */

/* statistics over the collected execute times */
static double calc_stddev (double *t, double avg, int count);
static double calc_avg (double *t, int count);
static void calc_min_max (double *t, int count, double *min, double *max);
/* configuration: command line and configuration file */
static int get_args (int argc, char *argv[]);
static int read_conf (void);
/* script replay: one connection per call / one thread per thr_main () */
static void cas_runner (FILE * fp, FILE * result_fp, double *ret_exec_time, double *ret_prepare_time);
static THREAD_FUNC thr_main (void *arg);
/* handlers of the individual script line types ('E', 'B', 'T', 'C') */
static int process_execute (char *msg, int *req_h, int num_bind, T_BIND_INFO * bind_info, FILE * result_fp,
                double *sum_execute_time);
static int process_bind (char *msg, int *num_bind_p, T_BIND_INFO * bind_info, int bind_len);
static int process_endtran (int con_h, int *req_h, FILE * result_fp);
static int process_close_req (char *linebuf, int *req_h, FILE * result_fp);
static void print_result (int cci_res, int req_id, FILE * fp);
/* node table and miscellaneous helpers */
static void free_node (T_NODE_INFO * node);
static int make_node_info (T_NODE_INFO * node, char *node_name, char *info_str);
static int set_args_with_node_info (char *node_name);
static int ignore_error (int code);
static char *make_sql_stmt (char *src);

/* non-static on purpose or by accident is not visible here; presumably read by the CCI library - TODO confirm */
const char *cci_client_name = "JDBC";

/* run parameters; set by read_conf () (configuration key) and/or get_args () (option letter) */
static char *broker_host = NULL;	/* CAS_IP / -I */
static int broker_port = 0;	/* CAS_PORT / -P */
static char *dbname = NULL;	/* DBNAME / -d */
static char *dbuser = NULL;	/* DBUSER / -u; main () falls back to "PUBLIC" */
static char *dbpasswd = NULL;	/* DBPASSWD / -p; main () falls back to "" */
static int num_thread = 1;	/* NUM_THREAD / -t */
static int repeat_count = 1;	/* REPEAT / -r */
static char *exec_script_file;	/* first non-option command line argument */
static int batch_mode = 0;	/* -b */
static char *result_file = NULL;	/* RESULT_FILE / -o; "stdout" and "stderr" are special names */
static char *cas_err_file = NULL;	/* CAS_ERROR_FILE / -e; read_conf () defaults it to "cas_error" */
static int fork_delay = 0;	/* FORK_DELAY / -f; seconds slept between thread creations */
static char *node_name = NULL;	/* -n; resolved through set_args_with_node_info () */
static int think_time = 0;	/* -h; seconds slept after each repetition */
static int qa_test_flag = 0;	/* -q */
static int num_replica = 1;	/* -R */
static int dump_query_plan = 0;	/* -Q */
static int autocommit_mode = 0;	/* -a */
static int statdump_mode = 0;	/* -s */

/* execute time of every (thread, repetition) pair, indexed by id * repeat_count + repetition */
static double *run_time_exec;
/* error log stream opened by main () on cas_err_file */
static FILE *cas_error_fp;
/* set to 1 by PRINT_CCI_ERROR when an error has been reported */
static int cas_error_flag = 0;
/* set to 1 by main () when the CUBRID_MANAGER_OUT_MSG_FILE environment variable exists */
static int cubrid_manager_run = 0;

/* nodes loaded from "node" lines of the configuration file */
static T_NODE_INFO node_table[MAX_NODE_INFO];
static int num_node = 0;
/* (negative) server error codes listed by IGNORE_SERVER_ERROR configuration lines */
static int ign_srv_err_list[MAX_IGN_SRV_ERR];
static int num_ign_srv_err = 0;

int
main (int argc, char *argv[])
{
  pthread_t *thr_id;
  int i;
  const char *err_str = "-";
  double avg;
  double stddev;
  double min, max;
  char *cm_out_msg_fname = NULL;
  FILE *cm_out_msg_fp = NULL;

#if !defined(WINDOWS)
  signal (SIGPIPE, SIG_IGN);
#endif

  if (read_conf () < 0)
    {
      return -1;
    }

  if (get_args (argc, argv) < 0)
    return -1;

  cm_out_msg_fname = getenv ("CUBRID_MANAGER_OUT_MSG_FILE");
  if (cm_out_msg_fname != NULL)
    {
      cubrid_manager_run = 1;
    }

  if (node_name != NULL)
    {
      if (set_args_with_node_info (node_name) < 0)
    {
      fprintf (stderr, "error:node (%s)\n", node_name);
      return -1;
    }
    }

  if (broker_host == NULL || broker_host[0] == '\0')
    {
      fprintf (stderr, "error:broker_host\n");
      return -1;
    }
  if (broker_port <= 0)
    {
      fprintf (stderr, "error:broker_port(%d)\n", broker_port);
      return -1;
    }
  if (dbname == NULL || dbname[0] == '\0')
    {
      fprintf (stderr, "errorr:dbname\n");
      return -1;
    }
  if (dbuser == NULL)
    dbuser = (char *) "PUBLIC";
  if (dbpasswd == NULL)
    dbpasswd = (char *) "";

  cas_error_fp = fopen (cas_err_file, (batch_mode ? "a" : "w"));
  if (cas_error_fp == NULL)
    {
      fprintf (stderr, "fopen error [%s]\n", cas_err_file);
      return -1;
    }

#ifdef DUP_RUN
  num_thread = 1;
#endif

  if (repeat_count < 1)
    repeat_count = 1;
  if (num_thread < 1)
    num_thread = 1;
  if (num_replica < 1)
    num_replica = 1;

  if (!batch_mode && !cubrid_manager_run)
    {
      fprintf (stdout, "broker_host = %s\n", broker_host);
      fprintf (stdout, "broker_port = %d\n", broker_port);
      fprintf (stdout, "num_thread = %d\n", num_thread);
      fprintf (stdout, "repeat = %d\n", repeat_count);
      fprintf (stdout, "dbname = %s\n", dbname);
      fprintf (stdout, "dbuser = %s\n", dbuser);
      fprintf (stdout, "dbpasswd = %s\n", dbpasswd);
      if (result_file)
    {
      fprintf (stdout, "result_file = %s\n", result_file);
    }
    }

  thr_id = (pthread_t *) malloc (sizeof (pthread_t) * num_thread);
  if (thr_id == NULL)
    {
      fprintf (stderr, "malloc error\n");
      return -1;
    }
  run_time_exec = (double *) malloc (sizeof (double) * num_thread * repeat_count);
  if (run_time_exec == NULL)
    {
      FREE_MEM (thr_id);
      fprintf (stderr, "malloc error\n");
      return -1;
    }

  cci_init ();

  if (qa_test_flag == 1)
    {
      int *con_handle;
      T_CCI_ERROR cci_error;
      con_handle = (int *) malloc (sizeof (int) * num_thread);
      if (con_handle == NULL)
    {
      FREE_MEM (thr_id);
      fprintf (stderr, "malloc error\n");
      return -1;
    }
      for (i = 0; i < num_thread; i++)
    {
      con_handle[i] = cci_connect (broker_host, broker_port, dbname, dbuser, dbpasswd);
      cci_get_db_version (con_handle[i], NULL, 0);
    }
      for (i = 0; i < num_thread; i++)
    {
      cci_disconnect (con_handle[i], &cci_error);
    }
      FREE_MEM (con_handle);
    }

  for (i = 0; i < num_thread; i++)
    {
      if (i > 0 && fork_delay > 0)
    {
      SLEEP_SEC (fork_delay);
    }

      if (pthread_create (&thr_id[i], NULL, thr_main, (void *) &i) < 0)
    {
      FREE_MEM (thr_id);
      perror ("Error:cannot create thread");
      return -1;
    }
    }

  for (i = 0; i < num_thread; i++)
    {
      if (pthread_join (thr_id[i], NULL) < 0)
    {
      perror ("pthread_join");
    }
    }

  if (cm_out_msg_fname != NULL)
    {
      cm_out_msg_fp = fopen (cm_out_msg_fname, "w");
    }

  if (cm_out_msg_fp == NULL)
    {
      cm_out_msg_fp = stdout;
    }

  fclose (cas_error_fp);
  if (cas_error_flag)
    {
      FILE *err_fp;
      if (cm_out_msg_fname != NULL)
    {
      err_fp = cm_out_msg_fp;
    }
      else
    {
      err_fp = stderr;
    }
      err_str = "ERR";
      fprintf (err_fp, "\n");
      fprintf (err_fp, "********************************\n");
      if (!cubrid_manager_run)
    {
      fprintf (err_fp, "cas error : %s\n", cas_err_file);
    }
      if (!batch_mode)
    {
      char buf[1024];
      FILE *fp;
      size_t readlen;
      fp = fopen (cas_err_file, "r");
      if (fp != NULL)
        {
          while ((readlen = fread (buf, 1, sizeof (buf), fp)) > 0)
        {
          if (readlen > sizeof (buf))
            {
              readlen = sizeof (buf);
            }
          fwrite (buf, 1, readlen, err_fp);
        }
          fclose (fp);
        }
    }
      fprintf (err_fp, "********************************\n");
    }
  else
    {
      if (!batch_mode)
    unlink (cas_err_file);
    }

  avg = calc_avg (run_time_exec, num_thread * repeat_count);
  stddev = calc_stddev (run_time_exec, avg, num_thread * repeat_count);

  if (cubrid_manager_run)
    {
      calc_min_max (run_time_exec, num_thread * repeat_count, &min, &max);
      fprintf (cm_out_msg_fp, "min : %.6f\n", min);
      fprintf (cm_out_msg_fp, "max : %.6f\n", max);
      fprintf (cm_out_msg_fp, "avg : %.6f\n", avg);
      fprintf (cm_out_msg_fp, "stddev : %.6f\n", stddev);
    }
  else
    {
      fprintf (stdout, "%.6f %.6f %s\n", avg, stddev, err_str);
    }

  if (cm_out_msg_fname != NULL)
    {
      fflush (cm_out_msg_fp);
      if (cm_out_msg_fp != stdout)
    {
      fclose (cm_out_msg_fp);
    }
    }
  FREE_MEM (thr_id);
  FREE_MEM (run_time_exec);

  return 0;
}

/*
 * calc_min_max () - find the smallest and the largest of count samples
 *   return: none
 *   t(in): samples
 *   count(in): number of samples
 *   min(out): smallest sample, 0 when count <= 0
 *   max(out): largest sample, 0 when count <= 0
 */
static void
calc_min_max (double *t, int count, double *min, double *max)
{
  double lowest, highest;
  const double *cur;
  const double *last;

  if (count <= 0)
    {
      *max = 0;
      *min = 0;
      return;
    }

  lowest = highest = t[0];
  last = t + count;
  for (cur = t + 1; cur < last; cur++)
    {
      if (lowest > *cur)
	{
	  lowest = *cur;
	}
      if (highest < *cur)
	{
	  highest = *cur;
	}
    }

  *min = lowest;
  *max = highest;
}

/*
 * calc_avg () - arithmetic mean of count samples
 *   return: the mean, or 0 when count <= 0 (instead of dividing by zero)
 *   t(in): samples
 *   count(in): number of samples
 */
static double
calc_avg (double *t, int count)
{
  double sum = 0;
  int i;

  if (count <= 0)
    {
      return 0;
    }

  for (i = 0; i < count; i++)
    {
      sum += t[i];
    }
  return (sum / count);
}

static double
calc_stddev (double *t, double avg, int count)
{
  double sum = 0;
  int i;
  for (i = 0; i < count; i++)
    {
      sum += ((t[i] - avg) * (t[i] - avg));
    }
  sum /= count;
  return (sqrt (sum));
}

/*
 * get_args () - parse the command line into the file-scope run parameters
 *   return: 0 on success, -1 after printing the usage text (unknown option
 *           or missing script file argument)
 *   argc(in):
 *   argv(in):
 *
 * Note: The first non-option argument becomes exec_script_file. In batch
 *       mode a result file named "stdout" is turned into "no result file".
 */
static int
get_args (int argc, char *argv[])
{
  int opt;

  for (;;)
    {
      opt = getopt (argc, argv, "saQbqI:P:d:u:p:t:r:o:e:f:n:h:R:");
      if (opt == EOF)
	{
	  break;
	}

      switch (opt)
	{
	case '?':
	  goto getargs_err;

	  /* flags */
	case 's':
	  statdump_mode = 1;
	  break;
	case 'a':
	  autocommit_mode = 1;
	  break;
	case 'b':
	  batch_mode = 1;
	  break;
	case 'q':
	  qa_test_flag = 1;
	  break;
	case 'Q':
	  dump_query_plan = 1;
	  break;

	  /* connection parameters */
	case 'I':
	  broker_host = optarg;
	  break;
	case 'P':
	  broker_port = atoi (optarg);
	  break;
	case 'd':
	  dbname = optarg;
	  break;
	case 'u':
	  dbuser = optarg;
	  break;
	case 'p':
	  STRDUP (dbpasswd, optarg);
#if defined (LINUX)
	  /* overwrite the password text in the argument vector after copying it */
	  memset (optarg, '*', strlen (optarg));
#endif
	  break;
	case 'n':
	  node_name = optarg;
	  break;

	  /* workload parameters */
	case 't':
	  num_thread = atoi (optarg);
	  if (num_thread < 1)
	    {
	      num_thread = 1;
	    }
	  break;
	case 'r':
	  repeat_count = atoi (optarg);
	  if (repeat_count < 1)
	    {
	      repeat_count = 1;
	    }
	  break;
	case 'f':
	  fork_delay = atoi (optarg);
	  break;
	case 'h':
	  think_time = atoi (optarg);
	  break;
	case 'R':
	  num_replica = atoi (optarg);
	  break;

	  /* output files */
	case 'o':
	  result_file = optarg;
	  break;
	case 'e':
	  cas_err_file = optarg;
	  break;
	}
    }

  if (optind >= argc)
    {
      goto getargs_err;
    }
  exec_script_file = argv[optind];

  if (batch_mode && result_file != NULL && strcmp (result_file, "stdout") == 0)
    {
      result_file = NULL;
    }

  return 0;

getargs_err:
  fprintf (stderr,
	   "usage : %s [OPTION] exec_script_file\n"
	   "\n"
	   "valid options:\n"
	   "  -I   broker host\n"
	   "  -P   broker port\n"
	   "  -d   database name\n"
	   "  -u   user name\n"
	   "  -p   user password\n"
	   "  -t   the number of thread\n"
	   "  -r   the number of times to execute entire query by each thread\n"
	   "  -Q   enable to print a plan per query\n"
	   "  -o   result file\n"
	   "  -s   enable to print a statdump per query\n"
	   "  -a   enable auto commit mode\n", argv[0]);
  return -1;
}

/*
 * thr_main () - thread entry: replay the script repeat_count times
 *   return: always 0/NULL
 *   arg(in): pointer to the int id of this thread
 *
 * Note: The execute time of every repetition is stored in
 *       run_time_exec[id * repeat_count + repetition]. When repeat_count is
 *       greater than 1 (and DUP_RUN is not defined) one additional untimed
 *       run is done first. A named result file gets ".<id>" appended, so
 *       every thread writes to its own file.
 */
static THREAD_FUNC
thr_main (void *arg)
{
  int id = *(int *) arg;
  FILE *fp;
  int i;
  FILE *result_fp;

  fp = fopen (exec_script_file, "r");
  if (fp == NULL)
    {
      fprintf (stderr, "fopen error [%s]\n", exec_script_file);
      /* nothing was executed: record zero instead of leaving the slots unset */
      for (i = 0; i < repeat_count; i++)
	{
	  run_time_exec[id * repeat_count + i] = 0;
	}
      goto end;
    }

  if (result_file == NULL)
    {
      result_fp = NULL;
    }
  else if (strcmp (result_file, "stdout") == 0)
    {
      result_fp = stdout;
    }
  else if (strcmp (result_file, "stderr") == 0)
    {
      result_fp = stderr;
    }
  else
    {
      char result_filename[256];
      int len;

      /* bounded formatting: a long result file name must not overflow the buffer */
      len = snprintf (result_filename, sizeof (result_filename), "%s.%d", result_file, id);
      if (len < 0 || (size_t) len >= sizeof (result_filename))
	{
	  fprintf (stderr, "result file name too long [%s]\n", result_file);
	  result_fp = NULL;
	}
      else
	{
	  /* a failed fopen () leaves result_fp NULL, i.e. no result output */
	  result_fp = fopen (result_filename, "w");
	}
    }

#ifndef DUP_RUN
  if (repeat_count > 1)
    {
      /* untimed run before the measured repetitions */
      cas_runner (fp, result_fp, NULL, NULL);
      fseek (fp, 0, SEEK_SET);
    }
#endif

  for (i = 0; i < repeat_count; i++)
    {
      double e, p;
      cas_runner (fp, result_fp, &e, &p);
      run_time_exec[id * repeat_count + i] = e;
      fseek (fp, 0, SEEK_SET);
      if (think_time > 0)
	SLEEP_SEC (think_time);
    }

  fclose (fp);
  if (result_fp != NULL && result_fp != stderr && result_fp != stdout)
    fclose (result_fp);

end:
#if defined(WINDOWS)
  return 0;
#else
  return NULL;
#endif
}

/*
 * cas_runner () - replay the script once over a fresh connection
 *   return: none
 *   fp(in): script file, positioned at its beginning
 *   result_fp(in): result stream, or NULL for no result output
 *   ret_exec_time(out): sum of the execute times, may be NULL
 *   ret_prepare_time(out): sum of the prepare times, may be NULL
 *
 * Note: The first character of a script line selects the action:
 *         Q - SQL text for the next prepare
 *         P - prepare ("<req id> <prepare flag>")
 *         B - bind value for the next execute
 *         E - execute
 *         C - close a request handle
 *         T - end of transaction
 *       Other lines are reported as format errors and skipped. A fatal
 *       error ends the replay; the sums accumulated so far are returned.
 */
static void
cas_runner (FILE * fp, FILE * result_fp, double *ret_exec_time, double *ret_prepare_time)
{
  char *sql_stmt = NULL;
  int con_h = -1;
  T_CCI_ERROR cci_error;
  int num_bind = 0;
  double prepare_time = 0;
  double sum_execute_time = 0;
  double sum_prepare_time = 0;
  char *linebuf = NULL;
  char *data = NULL;
  T_BIND_INFO *bind_info = NULL;
  int *req_h = NULL;
  int req_stat_h = -1;
  int error;
  int ind;
  int i, t_str_len;
  T_STRING *linebuf_tstr = NULL;
#ifdef DUP_RUN
  int dup_con_h = -1;		/* -1 so that the clean-up code is safe on an early exit */
  int *dup_req_h = NULL;
#endif

  linebuf_tstr = t_string_make (1000);
  req_h = (int *) malloc (sizeof (int) * SERVER_HANDLE_ALLOC_SIZE);
  bind_info = (T_BIND_INFO *) malloc (sizeof (T_BIND_INFO) * MAX_BIND_VALUE);
#ifdef DUP_RUN
  dup_req_h = (int *) malloc (sizeof (int) * SERVER_HANDLE_ALLOC_SIZE);
#endif

  if (linebuf_tstr == NULL || req_h == NULL || bind_info == NULL
#ifdef DUP_RUN
      || dup_req_h == NULL
#endif
    )
    {
      fprintf (stderr, "malloc error\n");
      goto end_cas_runner;
    }
  memset (req_h, 0, sizeof (int) * SERVER_HANDLE_ALLOC_SIZE);
  memset (bind_info, 0, sizeof (T_BIND_INFO) * MAX_BIND_VALUE);
#ifdef DUP_RUN
  memset (dup_req_h, 0, sizeof (int) * SERVER_HANDLE_ALLOC_SIZE);
#endif

  con_h = cci_connect (broker_host, broker_port, dbname, dbuser, dbpasswd);
  if (con_h < 0)
    {
      PRINT_CCI_ERROR (con_h, NULL, result_fp);
      goto end_cas_runner;
    }
#ifdef DUP_RUN
  dup_con_h = cci_connect (broker_host, broker_port, dbname, dbuser, dbpasswd);
  if (dup_con_h < 0)
    {
      fprintf (stderr, "DUP_RUN cci_connect error\n");
      goto end_cas_runner;
    }
#endif

  if (autocommit_mode)
    {
      if (cci_set_autocommit (con_h, CCI_AUTOCOMMIT_TRUE) < 0)
	{
	  fprintf (stderr, "cannot set autocommit mode");
	  goto end_cas_runner;
	}
#ifdef DUP_RUN
      if (cci_set_autocommit (dup_con_h, CCI_AUTOCOMMIT_TRUE) < 0)
	{
	  fprintf (stderr, "DUP_RUN cannot set autocommit mode");
	  goto end_cas_runner;
	}
#endif
    }

  if (statdump_mode)
    {
      /* turn statistics collection on, then keep a prepared "show" statement in req_stat_h */
      req_stat_h = cci_prepare (con_h, "set @collect_exec_stats = 1", 0, &cci_error);
      if (req_stat_h < 0)
	{
	  fprintf (stderr, "cci_prepare error\n");
	}
      else
	{
	  int res;
	  error = cci_execute (req_stat_h, 0, 0, &cci_error);

	  res = cci_close_req_handle (req_stat_h);
	  if (res < 0)
	    {
	      fprintf (stderr, "cci_close_req_error\n");
	    }
	  /*
	   * The handle is not usable past this point whatever the close
	   * result is; forgetting it keeps the 'T' handler and the clean-up
	   * code from executing/closing a stale handle when the execute
	   * above failed.
	   */
	  req_stat_h = -1;

	  if (error < 0)
	    {
	      fprintf (stderr, "cci_execute error\n");
	    }
	  else
	    {
	      req_stat_h = cci_prepare (con_h, "show exec statistics", 0, &cci_error);
	      if (req_stat_h < 0)
		{
		  fprintf (stderr, "cci_prepare error\n");
		}
	    }
	}
    }

  while (1)
    {
      if (ut_get_line (fp, linebuf_tstr, NULL, NULL) < 0)
	{
	  fprintf (stderr, "malloc error\n");
	  goto end_cas_runner;
	}
      t_str_len = t_string_len (linebuf_tstr);
      if (t_str_len <= 0)
	break;			/* end of script */
      linebuf = t_string_str (linebuf_tstr);

      if (linebuf[t_str_len - 1] == '\n')
	linebuf[t_str_len - 1] = '\0';

      /* empty line */
      if (t_str_len == 1 && linebuf[0] == '\0')
	continue;

      if (linebuf[0] == 'Q')
	{
	  FREE_MEM (sql_stmt);
	  sql_stmt = make_sql_stmt (linebuf + 2);
	  if (sql_stmt == NULL)
	    {
	      goto end_cas_runner;
	    }

	  if (result_fp)
	    {
	      fprintf (result_fp, "-------------- query -----------------\n");
	      fprintf (result_fp, "%s\n", sql_stmt);
	    }
	}
      else if (linebuf[0] == 'P')
	{
	  int req_id, prepare_flag;
	  struct timeval begin, end;

	  if (sscanf (linebuf + 2, "%d %d", &req_id, &prepare_flag) < 2)
	    {
	      fprintf (stderr, "file format error : %s\n", linebuf);
	      FREE_MEM (sql_stmt);
	      goto end_cas_runner;
	    }
	  if (req_id < 0 || req_id >= SERVER_HANDLE_ALLOC_SIZE)
	    {
	      fprintf (stderr, "request id error : %d (valid range 0-%d)\n", req_id, SERVER_HANDLE_ALLOC_SIZE - 1);
	      FREE_MEM (sql_stmt);
	      goto end_cas_runner;
	    }
	  gettimeofday (&begin, NULL);
	  req_h[req_id] = cci_prepare (con_h, sql_stmt, prepare_flag, &cci_error);
	  gettimeofday (&end, NULL);
	  prepare_time = ut_diff_time (&begin, &end);
	  sum_prepare_time += prepare_time;

	  if (result_fp)
	    {
	      fprintf (result_fp, "cci_prepare elapsed time : %.3f \n", prepare_time);
	    }

	  /* a failed prepare is logged only; later 'E' lines skip non-positive handles */
	  if (req_h[req_id] < 0)
	    {
	      fprintf (cas_error_fp, "prepare error\n%s\nrequest id %d\n", linebuf, req_id);
	      PRINT_CCI_ERROR (req_h[req_id], &cci_error, result_fp);
	    }
#ifdef DUP_RUN
	  dup_req_h[req_id] = cci_prepare (dup_con_h, sql_stmt, prepare_flag, &cci_error);
#endif
	  FREE_MEM (sql_stmt);
	}
      else if (linebuf[0] == 'B')
	{
	  if (process_bind (linebuf, &num_bind, bind_info, t_string_bind_len (linebuf_tstr)) < 0)
	    {
	      FREE_BIND_INFO (num_bind, bind_info);
	      goto end_cas_runner;
	    }
	}
      else if (linebuf[0] == 'E')
	{
	  int res;
	  res = process_execute (linebuf, req_h, num_bind, bind_info, result_fp, &sum_execute_time);
#ifdef DUP_RUN
	  process_execute (linebuf, dup_req_h, num_bind, bind_info, result_fp, NULL);
#endif
	  /* bind values are consumed by one execute; the macro also resets num_bind */
	  FREE_BIND_INFO (num_bind, bind_info);
	  if (res < 0)
	    goto end_cas_runner;
	}
      else if (linebuf[0] == 'C')
	{
	  if (process_close_req (linebuf, req_h, result_fp) < 0)
	    goto end_cas_runner;
#ifdef DUP_RUN
	  process_close_req (linebuf, dup_req_h, result_fp);
#endif
	}
      else if (linebuf[0] == 'T')
	{
	  if (process_endtran (con_h, req_h, result_fp) < 0)
	    goto end_cas_runner;
#ifdef DUP_RUN
	  if (process_endtran (dup_con_h, dup_req_h, result_fp) < 0)
	    {
	      fprintf (stderr, "DUP_RUN end_transaction error\n");
	    }
#endif
	  if (statdump_mode && req_stat_h > 0)
	    {
	      /* dump the two columns of every "show exec statistics" row */
	      error = cci_execute (req_stat_h, 0, 0, &cci_error);
	      if (error < 0)
		{
		  fprintf (cas_error_fp, "execute error\nshow exec statistics\nrequest id %d\n", req_stat_h);
		  continue;
		}
	      if (result_fp)
		{
		  fprintf (result_fp, "SHOW EXEC STATISTICS\n");
		}

	      while (1)
		{
		  error = cci_cursor (req_stat_h, 1, CCI_CURSOR_CURRENT, &cci_error);

		  if (error == CCI_ER_NO_MORE_DATA)
		    {
		      break;
		    }

		  if (error < 0)
		    {
		      fprintf (cas_error_fp, "cursor error\nrequest id %d\n", req_stat_h);
		      PRINT_CCI_ERROR (error, &cci_error, result_fp);
		      break;
		    }

		  error = cci_fetch (req_stat_h, &cci_error);
		  if (error < 0)
		    {
		      fprintf (cas_error_fp, "fetch error\nrequest id %d\n", req_stat_h);
		      PRINT_CCI_ERROR (error, &cci_error, result_fp);
		      break;
		    }
		  for (i = 1; i <= 2; i++)
		    {
		      error = cci_get_data (req_stat_h, i, CCI_A_TYPE_STR, &data, &ind);
		      if (error < 0)
			{
			  fprintf (cas_error_fp, "get data error\nrequest id %d\n", req_stat_h);
			  PRINT_CCI_ERROR (error, NULL, result_fp);
			  break;
			}
		      if (ind < 0 || data == NULL)
			{
			  if (result_fp)
			    {
			      fprintf (result_fp, "<NULL>\t|");
			    }
			}
		      else
			{
			  if (result_fp)
			    {
			      fprintf (result_fp, "%s\t|", data);
			    }
			}
		    }
		  if (result_fp)
		    {
		      fprintf (result_fp, "\n");
		    }
		}
	    }
	}
      else
	{
	  fprintf (stderr, "file format error : %s\n", linebuf);
	}
    }

end_cas_runner:
  if (req_stat_h > 0)
    {
      cci_close_req_handle (req_stat_h);
    }

  if (con_h > 0)
    {
      cci_disconnect (con_h, &cci_error);
    }
#ifdef DUP_RUN
  if (dup_con_h > 0)
    {
      cci_disconnect (dup_con_h, &cci_error);
    }
#endif

  /* bind values that were collected but never executed (early exit or end of file) */
  if (bind_info != NULL)
    {
      FREE_BIND_INFO (num_bind, bind_info);
    }

  FREE_MEM (req_h);
  FREE_MEM (bind_info);
  if (linebuf_tstr)
    t_string_free (linebuf_tstr);
#ifdef DUP_RUN
  FREE_MEM (dup_req_h);
#endif
  FREE_MEM (sql_stmt);

  if (ret_exec_time)
    *ret_exec_time = sum_execute_time;
  if (ret_prepare_time)
    *ret_prepare_time = sum_prepare_time;
}

/*
 * read_conf () - load the run parameters from the configuration file
 *   return: 0 on success or when the file cannot be opened,
 *           -1 on an unknown key or when a table is full
 *
 * Note: The file name is taken from the environment variable named by
 *       CAS_RUNNER_CONF_ENV, falling back to CAS_RUNNER_CONF. Everything
 *       after '#' is a comment. A line is either "key value" or
 *       "node <name> <info>"; lines with fewer than two tokens are skipped.
 *       cas_err_file is defaulted to "cas_error" before the file is read.
 */
static int
read_conf (void)
{
  FILE *fp;
  char read_buf[1024];
  char buf1[1024], buf2[1024], buf3[1024];
  int lineno = 0;
  const char *conf_file;
  int num_token;
  char *p;

  /* set initial error file name */
  if (cas_err_file == NULL)
    {
      cas_err_file = strdup ("cas_error");
    }

  conf_file = getenv (CAS_RUNNER_CONF_ENV);
  if (conf_file == NULL)
    conf_file = CAS_RUNNER_CONF;

  fp = fopen (conf_file, "r");
  if (fp == NULL)
    {
      /* a missing configuration file is not an error */
      return 0;
    }

  while (fgets (read_buf, sizeof (read_buf), fp))
    {
      lineno++;

      /* strip the comment */
      p = strchr (read_buf, '#');
      if (p)
	{
	  *p = '\0';
	}
      num_token = sscanf (read_buf, "%1023s %1023s %1023s", buf1, buf2, buf3);
      if (num_token < 2)
	{
	  continue;
	}

      if (num_token == 3)
	{
	  /* three-token lines other than "node" are ignored */
	  if (strcasecmp (buf1, "node") == 0)
	    {
	      if (num_node >= MAX_NODE_INFO)
		{
		  goto error;
		}
	      if (make_node_info (&node_table[num_node], buf2, buf3) < 0)
		{
		  continue;
		}
	      num_node++;
	    }
	}
      else
	{
	  if (strcasecmp (buf1, "CAS_IP") == 0)
	    {
	      STRDUP (broker_host, buf2);
	    }
	  else if (strcasecmp (buf1, "CAS_PORT") == 0)
	    {
	      broker_port = atoi (buf2);
	    }
	  else if (strcasecmp (buf1, "DBNAME") == 0)
	    {
	      STRDUP (dbname, buf2);
	    }
	  else if (strcasecmp (buf1, "NUM_THREAD") == 0)
	    {
	      num_thread = atoi (buf2);
	    }
	  else if (strcasecmp (buf1, "DBUSER") == 0)
	    {
	      STRDUP (dbuser, buf2);
	    }
	  else if (strcasecmp (buf1, "DBPASSWD") == 0)
	    {
	      STRDUP (dbpasswd, buf2);
	    }
	  else if (strcasecmp (buf1, "REPEAT") == 0)
	    {
	      repeat_count = atoi (buf2);
	    }
	  else if (strcasecmp (buf1, "RESULT_FILE") == 0)
	    {
	      STRDUP (result_file, buf2);
	    }
	  else if (strcasecmp (buf1, "CAS_ERROR_FILE") == 0)
	    {
	      STRDUP (cas_err_file, buf2);
	    }
	  else if (strcasecmp (buf1, "FORK_DELAY") == 0)
	    {
	      fork_delay = atoi (buf2);
	    }
	  else if (strcasecmp (buf1, "IGNORE_SERVER_ERROR") == 0)
	    {
	      /* only negative codes are recorded */
	      int ign_err = atoi (buf2);
	      if (ign_err < 0)
		{
		  if (num_ign_srv_err >= MAX_IGN_SRV_ERR)
		    goto error;
		  ign_srv_err_list[num_ign_srv_err++] = ign_err;
		}
	    }
	  else
	    goto error;
	}
    }

  fclose (fp);
  return 0;

error:
  /* name the file that was actually read, which may come from the environment */
  fprintf (stderr, "%s : error [%d] line\n", conf_file, lineno);
  fclose (fp);
  return -1;
}

/*
 * process_bind () - parse a 'B' script line into the next free bind slot
 *   return: 0 on success, -1 on error (slot table full, malformed line,
 *           out of memory)
 *   linebuf(in): script line, "B <type> [<len>] <value>"
 *   num_bind_p(in/out): number of used slots; incremented on success
 *   bind_info(in/out): bind slot table of MAX_BIND_VALUE entries
 *   bind_len(in): number of value bytes to copy; when not positive the
 *                 value is copied as a NUL-terminated string
 *
 * Note: <len> is present only for CHAR, STRING, BIT, VARBIT, ENUM and JSON
 *       types. BLOB/CLOB values are not supported and are bound as NULL.
 */
static int
process_bind (char *linebuf, int *num_bind_p, T_BIND_INFO * bind_info, int bind_len)
{
  const int slot_no = *num_bind_p;
  T_BIND_INFO *slot;
  char *sep;
  int type;

  if (slot_no >= MAX_BIND_VALUE)
    {
      fprintf (stderr, "bind buffer overflow[%d]\n", slot_no);
      return -1;
    }
  slot = &bind_info[slot_no];

  /* <type> */
  slot->type = atoi (linebuf + 2);
  sep = strchr (linebuf + 2, ' ');
  if (sep == NULL)
    {
      fprintf (stderr, "file format error : %s\n", linebuf);
      return -1;
    }

  type = slot->type;
  if (type == CCI_U_TYPE_CHAR || type == CCI_U_TYPE_STRING || type == CCI_U_TYPE_BIT
      || type == CCI_U_TYPE_VARBIT || type == CCI_U_TYPE_ENUM || type == CCI_U_TYPE_JSON)
    {
      /* <len> */
      slot->len = atoi (sep + 1);
      sep = strchr (sep + 1, ' ');
      if (sep == NULL)
	{
	  fprintf (stderr, "file format error : %s\n", linebuf);
	  return -1;
	}
    }
  else if (type == CCI_U_TYPE_BLOB || type == CCI_U_TYPE_CLOB)
    {
      fprintf (stderr, "binding BLOB/CLOB is not implemented : %s\nreplaced with NULL value.\n", sep + 1);
      slot->type = CCI_U_TYPE_NULL;
    }

  /* <value>: everything after the last separator */
  if (bind_len > 0)
    {
      slot->value = (char *) malloc (bind_len + 1);
    }
  else
    {
      slot->value = strdup (sep + 1);
    }
  if (slot->value == NULL)
    {
      fprintf (stderr, "malloc error\n");
      return -1;
    }
  if (bind_len > 0)
    {
      memcpy (slot->value, sep + 1, bind_len);
      slot->value[bind_len] = 0x00;
    }

  *num_bind_p = slot_no + 1;
  return 0;
}

/*
 * process_execute () - handle an 'E' script line: bind the collected values
 *                      and execute the prepared statement
 *   return: 0, or -1 when the line is malformed or the request id is out
 *           of range
 *   linebuf(in): script line, "E <req id> <exec flag>"
 *   req_h(in): request handle table indexed by request id
 *   num_bind(in): number of entries of bind_info
 *   bind_info(in): bind values
 *   result_fp(in): result stream, or NULL
 *   sum_execute_time(in/out): when not NULL, the elapsed execute time is
 *                             added to it
 *
 * Note: Nothing is executed when the handle of the request id is not
 *       positive. Bind and execute failures are logged and do not change
 *       the return value. The bind values are bound num_replica times at
 *       consecutive parameter positions.
 */
static int
process_execute (char *linebuf, int *req_h, int num_bind, T_BIND_INFO * bind_info, FILE * result_fp,
		 double *sum_execute_time)
{
  int req_id, exec_flag;
  T_CCI_ERROR cci_error;
  struct timeval t_start, t_stop;
  double elapsed_time = 0;

  if (sscanf (linebuf + 2, "%d %d", &req_id, &exec_flag) < 2)
    {
      fprintf (stderr, "file format error : %s\n", linebuf);
      return -1;
    }
  if (req_id < 0 || req_id >= SERVER_HANDLE_ALLOC_SIZE)
    {
      fprintf (stderr, "request id error : %d (valid range 0-%d)\n", req_id, SERVER_HANDLE_ALLOC_SIZE - 1);
      return -1;
    }

  if (num_replica > 1)
    {
      exec_flag |= CCI_EXEC_QUERY_ALL;
    }

  if (req_h[req_id] > 0)
    {
      int res;
      int replica, param;

      for (replica = 0; replica < num_replica; replica++)
	{
	  for (param = 0; param < num_bind; param++)
	    {
	      const T_BIND_INFO *bind = &bind_info[param];
	      const int position = (replica * num_bind) + param + 1;

	      if ((bind->type == CCI_U_TYPE_VARBIT) || (bind->type == CCI_U_TYPE_BIT))
		{
		  /* bit values are passed as a (size, buffer) pair */
		  T_CCI_BIT bit_value;

		  memset ((char *) &bit_value, 0x00, sizeof (T_CCI_BIT));
		  bit_value.size = bind->len;
		  bit_value.buf = (char *) bind->value;
		  res = cci_bind_param (req_h[req_id], position, CCI_A_TYPE_BIT, (void *) &(bit_value),
					(T_CCI_U_TYPE) bind->type, CCI_BIND_PTR);
		}
	      else
		{
		  res = cci_bind_param (req_h[req_id], position, CCI_A_TYPE_STR, bind->value,
					(T_CCI_U_TYPE) bind->type, 0);
		}

	      if (res < 0)
		{
		  fprintf (cas_error_fp, "bind error\n%s\nrequest id %d bind %d\n", linebuf, req_id, param);
		  PRINT_CCI_ERROR (res, NULL, result_fp);
		}
	    }
	}

      if (dump_query_plan)
	{
	  exec_flag |= CCI_EXEC_QUERY_INFO;
	}

      /* only the cci_execute () call itself is timed */
      gettimeofday (&t_start, NULL);
      res = cci_execute (req_h[req_id], exec_flag, 0, &cci_error);
      gettimeofday (&t_stop, NULL);
      elapsed_time = ut_diff_time (&t_start, &t_stop);

      if (!batch_mode && !cubrid_manager_run)
	{
	  fprintf (stdout, "exec_time : %.3f \n", elapsed_time);
	}
      if (result_fp)
	{
	  fprintf (result_fp, "cci_execute elapsed_time : %.3f \n", elapsed_time);
	}

      if (res >= 0)
	{
	  print_result (res, req_h[req_id], result_fp);
	}
      else
	{
	  fprintf (cas_error_fp, "execute error\n%s\nrequest id %d\n", linebuf, req_id);
	  PRINT_CCI_ERROR (res, &cci_error, result_fp);
	}
    }

  if (sum_execute_time)
    {
      *sum_execute_time += elapsed_time;
    }

  return 0;
}

/*
 * process_close_req () - handle a 'C' script line: close a request handle
 *   return: always 0; failures are only logged
 *   linebuf(in): script line, "C <req id>"
 *   req_h(in/out): request handle table; the slot of the request id is
 *                  reset to 0
 *   result_fp(in): result stream, or NULL
 *
 * Note: An out-of-range request id is reported as CCI_ER_REQ_HANDLE and
 *       leaves the table untouched.
 */
static int
process_close_req (char *linebuf, int *req_h, FILE * result_fp)
{
  const int req_id = atoi (linebuf + 2);
  int *slot;

  if (req_id < 0 || req_id >= SERVER_HANDLE_ALLOC_SIZE)
    {
      fprintf (cas_error_fp, "close error\n%s\nrequest id %d\n", linebuf, req_id);
      PRINT_CCI_ERROR (CCI_ER_REQ_HANDLE, NULL, result_fp);
      return 0;
    }

  slot = &req_h[req_id];
  if (*slot > 0)
    {
      int res = cci_close_req_handle (*slot);

      if (res < 0)
	{
	  fprintf (cas_error_fp, "close error\n%s\nrequest id %d\n", linebuf, req_id);
	  PRINT_CCI_ERROR (res, NULL, result_fp);
	}
    }
  *slot = 0;

  return 0;
}

static int
process_endtran (int con_h, int *req_h, FILE * result_fp)
{
  int res, i;
  T_CCI_ERROR cci_error;

  struct timeval begin, end;
  double commit_time;

  if (!autocommit_mode)
    {
      gettimeofday (&begin, NULL);
      res = cci_end_tran (con_h, CCI_TRAN_ROLLBACK, &cci_error);
      gettimeofday (&end, NULL);
      commit_time = ut_diff_time (&begin, &end);

      if (result_fp)
    {
      fprintf (result_fp, "cci_end_tran elapsed_time : %.3f \n", commit_time);
    }

      if (res < 0)
    {
      fprintf (cas_error_fp, "end tran error\nconnection handle id %d\n", con_h);
    }
      PRINT_CCI_ERROR (res, &cci_error, result_fp);
    }

  for (i = 0; i < SERVER_HANDLE_ALLOC_SIZE; i++)
    {
      req_h[i] = 0;
    }

  return 0;
}

/*
 * print_result - dump the outcome of an executed request to result_fp
 *   return: none
 *   cci_res(in): value returned by cci_execute; printed verbatim
 *   req_id(in): CCI request handle the plan, metadata and rows are read from
 *   result_fp(in): output stream; the function does nothing when it is NULL
 *
 * Note: the query plan is printed first when dump_query_plan is set, then
 *       every row is printed as '|' terminated column values followed by a
 *       row count trailer.  Failures are reported through cas_error_fp and
 *       PRINT_CCI_ERROR and stop the dump.
 */
static void
print_result (int cci_res, int req_id, FILE * result_fp)
{
  /*
   * Starts at 0 because the return value of cci_get_result_info() is not
   * checked: should that call leave its outputs untouched, the
   * "column_count <= 0" test below still reads a defined value and returns
   * before cmd_type is ever inspected.
   */
  int column_count = 0;
  int res;
  int i;
  int ind;
  char *buffer;
  T_CCI_ERROR cci_error;
  int num_tuple = 0;
  T_CCI_CUBRID_STMT cmd_type;
  char *plan = NULL;

  if (result_fp == NULL)
    return;

  fprintf (result_fp, "cci_execute:%d\n", cci_res);

  if (dump_query_plan)
    {
      if (cci_get_query_plan (req_id, &plan) >= 0)
        {
          fprintf (result_fp, "---------- query plan --------------\n");
          fprintf (result_fp, "%s\n", (plan ? plan : ""));
          cci_query_info_free (plan);
        }
    }

  cci_get_result_info (req_id, &cmd_type, &column_count);

  res = cci_cursor (req_id, 1, CCI_CURSOR_FIRST, &cci_error);
  if (res == CCI_ER_NO_MORE_DATA || column_count <= 0)
    return;
  if (res < 0)
    {
      fprintf (cas_error_fp, "cursor error\nrequest id %d\n", req_id);
      PRINT_CCI_ERROR (res, &cci_error, result_fp);
      return;
    }

  fprintf (result_fp, "---------- query result --------------\n");

  while (1)
    {
      res = cci_fetch (req_id, &cci_error);
      if (res < 0)
        {
          fprintf (cas_error_fp, "fetch error\nrequest id %d\n", req_id);
          PRINT_CCI_ERROR (res, &cci_error, result_fp);
          break;
        }

      for (i = 0; i < column_count; i++)
        {
          /* column indexes passed to cci_get_data are 1-based */
          res = cci_get_data (req_id, i + 1, CCI_A_TYPE_STR, &buffer, &ind);
          if (res < 0)
            {
              /* cci_get_data has no error buffer, hence the NULL */
              fprintf (cas_error_fp, "get data error\nrequest id %d\n", req_id);
              PRINT_CCI_ERROR (res, NULL, result_fp);
              /* abandons the remaining columns only; the row is still counted */
              break;
            }
          if (ind < 0 || buffer == NULL)
            fprintf (result_fp, "<NULL>|");
          else
            fprintf (result_fp, "%s|", buffer);
        }
      fprintf (result_fp, "\n");
      num_tuple++;

      if (cmd_type == CUBRID_STMT_CALL_SP)
        {
          /* only a single row is printed for a stored procedure call */
          break;
        }
      else
        {
          /* step one position forward from the current row */
          res = cci_cursor (req_id, 1, CCI_CURSOR_CURRENT, &cci_error);
          if (res == CCI_ER_NO_MORE_DATA)
            break;
          if (res < 0)
            {
              fprintf (cas_error_fp, "cursor error\nrequest id %d\n", req_id);
              /*
               * Hand over the error buffer cci_cursor just filled so that a
               * server side failure is reported with its code and message,
               * the same way the first cursor call above reports it.
               */
              PRINT_CCI_ERROR (res, &cci_error, result_fp);
              break;
            }
        }
    }

  fprintf (result_fp, "-- %d rows ----------------------------\n", num_tuple);
}

/*
 * make_node_info - build a node entry from a "name" / "spec" pair
 *   return: 0 on success, -1 on allocation failure or malformed spec
 *   node(out): entry to fill; zeroed first, left with no allocated members
 *              on failure
 *   node_name(in/out): node name; trimmed in place before being copied
 *   info_str(in): spec of the form dbname:ip:port:dbuser:dbpasswd; it is
 *                 not modified, a private copy is tokenized instead
 *
 * Note: only the first four ':' are treated as separators, so the last
 *       field (password) receives the whole remainder of the spec.
 */
static int
make_node_info (T_NODE_INFO * node, char *node_name, char *info_str)
{
  char *spec;                   /* writable, trimmed copy of info_str */
  char *sep;
  char *token[5];
  int ntok;
  int i;

  memset (node, 0, sizeof (T_NODE_INFO));

  trim (node_name);

  spec = strdup (info_str);
  if (spec == NULL)
    {
      fprintf (stderr, "malloc error\n");
      return -1;
    }
  trim (spec);

  /* split in place: each separator found is overwritten with a NUL */
  token[0] = spec;
  for (ntok = 1; ntok < 5; ntok++)
    {
      sep = strchr (token[ntok - 1], ':');
      if (sep == NULL)
        goto bad_format;
      *sep = '\0';
      token[ntok] = sep + 1;
    }

  node->node_name = strdup (node_name);
  node->dbname = strdup (token[0]);
  node->ip = strdup (token[1]);
  node->port = atoi (token[2]);
  node->dbuser = strdup (token[3]);
  node->dbpasswd = strdup (token[4]);

  if (node->node_name == NULL || node->dbname == NULL || node->ip == NULL || node->dbuser == NULL
      || node->dbpasswd == NULL)
    {
      /* the spec parsed fine; this is an out of memory condition */
      fprintf (stderr, "malloc error\n");
      goto cleanup;
    }

  FREE_MEM (spec);
  return 0;

bad_format:
  /*
   * Put back the separators that were already replaced by NUL so that the
   * message shows the complete spec instead of just its first field.
   */
  for (i = 1; i < ntok; i++)
    {
      *(token[i] - 1) = ':';
    }
  fprintf (stderr, "invalid node format (%s)\n", spec);

cleanup:
  FREE_MEM (spec);
  free_node (node);
  return -1;
}

/*
 * free_node - release the string members of a node entry
 *   return: none
 *   node(in/out): entry whose node_name, dbname, ip, dbuser and dbpasswd
 *                 members are passed to FREE_MEM; the entry itself and its
 *                 port member are left alone
 */
static void
free_node (T_NODE_INFO * node)
{
  /* same release order as the member list above */
  char **owned[] = {
    &node->node_name,
    &node->dbname,
    &node->ip,
    &node->dbuser,
    &node->dbpasswd
  };
  int n_owned = (int) (sizeof (owned) / sizeof (owned[0]));
  int i;

  for (i = 0; i < n_owned; i++)
    {
      FREE_MEM (*owned[i]);
    }
}

/*
 * set_args_with_node_info - fill unset connection arguments from a node entry
 *   return: 0 when the node was found, -1 when no entry matches
 *   node_name(in): name looked up in node_table, compared case-insensitively
 *
 * Note: only arguments that are still unset (NULL strings, zero port) take
 *       the node's value; the globals end up pointing at the strings held
 *       by the node entry, no copy is made.
 */
static int
set_args_with_node_info (char *node_name)
{
  T_NODE_INFO *found = NULL;
  int idx = 0;

  /* first matching entry wins */
  while (idx < num_node && found == NULL)
    {
      if (strcasecmp (node_table[idx].node_name, node_name) == 0)
        {
          found = &node_table[idx];
        }
      idx++;
    }

  if (found == NULL)
    {
      return -1;
    }

  if (dbname == NULL)
    {
      dbname = found->dbname;
    }
  if (broker_host == NULL)
    {
      broker_host = found->ip;
    }
  if (broker_port == 0)
    {
      broker_port = found->port;
    }
  if (dbuser == NULL)
    {
      dbuser = found->dbuser;
    }
  if (dbpasswd == NULL)
    {
      dbpasswd = found->dbpasswd;
    }

  return 0;
}

/*
 * ignore_error - tell whether a server error code is on the ignore list
 *   return: 1 when code is one of the first num_ign_srv_err entries of
 *           ign_srv_err_list, 0 otherwise
 *   code(in): server error code to look up
 */
static int
ignore_error (int code)
{
  int idx = 0;

  while (idx < num_ign_srv_err)
    {
      if (code == ign_srv_err_list[idx])
        {
          return 1;
        }
      idx++;
    }

  return 0;
}

/*
 * make_sql_stmt - build the statement text to execute from a script line
 *   return: newly allocated statement, or NULL on allocation failure
 *   src(in): statement as read from the script; bytes with value 1 stand
 *            for line breaks
 *
 * Note: unless cubrid_manager_run is set, the text is trimmed and a
 *       non-empty statement is made to end with ";\n".  When num_replica is
 *       not 1 the returned buffer holds num_replica back-to-back copies of
 *       the statement.  Ownership of the returned buffer passes to the
 *       caller.
 */
static char *
make_sql_stmt (char *src)
{
  char *stmt;
  char *cursor;
  char *replicated;
  int stmt_len;
  int copy;

  /* 3 extra bytes: room for an added ';', an added '\n' and the NUL */
  stmt = (char *) malloc (strlen (src) + 3);
  if (stmt == NULL)
    {
      fprintf (stderr, "malloc error\n");
      return NULL;
    }
  strcpy (stmt, src);

  /* turn every byte of value 1 into a real newline */
  cursor = stmt;
  while (*cursor != '\0')
    {
      if (*cursor == 1)
        {
          *cursor = '\n';
        }
      cursor++;
    }

  if (!cubrid_manager_run)
    {
      trim (stmt);
      stmt_len = (int) strlen (stmt);
      if (stmt_len > 0)
        {
          if (stmt[stmt_len - 1] != ';')
            {
              stmt[stmt_len++] = ';';
            }
          stmt[stmt_len++] = '\n';
          stmt[stmt_len] = '\0';
        }
    }
  else
    {
      /* text is used as is */
      stmt_len = (int) strlen (stmt);
    }

  if (num_replica == 1)
    {
      return stmt;
    }

  replicated = (char *) malloc ((stmt_len + 1) * num_replica);
  if (replicated == NULL)
    {
      fprintf (stderr, "malloc error\n");
      FREE_MEM (stmt);
      return NULL;
    }

  /* each copy overwrites the NUL written by the previous one */
  cursor = replicated;
  for (copy = 0; copy < num_replica; copy++)
    {
      strcpy (cursor, stmt);
      cursor += stmt_len;
    }

  FREE_MEM (stmt);
  return replicated;
}