Skip to content

File broker_log_replay.c

File List > broker > broker_log_replay.c

Go to the documentation of this file

/*
 * Copyright 2008 Search Solution Corporation
 * Copyright 2016 CUBRID Corporation
 *
 *  Licensed under the Apache License, Version 2.0 (the "License");
 *  you may not use this file except in compliance with the License.
 *  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 *  Unless required by applicable law or agreed to in writing, software
 *  distributed under the License is distributed on an "AS IS" BASIS,
 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *  See the License for the specific language governing permissions and
 *  limitations under the License.
 *
 */


/*
 * broker_log_replay.c -
 */

#ident "$Id$"

#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#if defined(WINDOWS)
#include <io.h>
#else /* WINDOWS */
#include <unistd.h>
#endif /* !WINDOWS */
#include <assert.h>

#include "cubrid_getopt.h"
#include "cas_common.h"
#include "cas_cci.h"
#include "broker_log_util.h"
#include "porting.h"
#include "error_code.h"

#include "filesys.hpp"
#include "filesys_temp.hpp"

#define STAT_MAX_DIFF_TIME              (60000) /* 60 * 1000 : 10 min */
#define SORT_BUF_MAX                    (4096)
#define BIND_STR_BUF_SIZE               (4096)
#define SQL_INFO_TITLE_LEN              (128)

#define CCI_ERR_FILE_NAME               "replay.err"
#define PASS_SQL_FILE_NAME              "skip.sql"

#define DEFAULT_DIFF_TIME_LOWER         (0.01)  /* 10 millisecond */
#define DEFAULT_BREAK_TIME              (0.01)  /* 10 millisecond */

#define INVALID_PORT_NUM                (-1)

#define free_and_init(ptr) \
        do { \
          if ((ptr)) { \
            free ((ptr)); \
            (ptr) = NULL; \
          } \
        } while (0)

enum temp_read_result
{
  READ_STOP = -1,
  READ_CONTINUE = 0,
  READ_SUCCESS = 1
};
typedef enum temp_read_result READ_RESULT;

typedef struct t_sql_info T_SQL_INFO;
struct t_sql_info
{
  char *sql;
  char *rewrite_sql;
  char *bind_str;
  double exec_time;
  double sql_log_time;
  double diff_time;
};

typedef struct t_summary_info T_SUMMARY_INFO;
struct t_summary_info
{
  int num_total_query;
  int num_exec_query;
  int num_skip_query;
  int num_err_query;
  int num_diff_time_query;
  double sum_diff_time;
  double max_diff_time;
  double min_diff_time;
};

typedef struct t_sql_result T_SQL_RESULT;
struct t_sql_result
{
  int diff_time;
  char *sql_info;
};

static int log_replay (char *infilename, char *outfilename);
static char *get_next_log_line (FILE * infp, T_STRING * linebuf_tstr, const off_t last_offset, int *lineno);

static char *get_query_stmt_from_plan (int req);
static int log_prepare (FILE * cci_err, FILE * pass_sql, int con, char *sql_log, T_SQL_INFO * sql_info,
            T_SUMMARY_INFO * summary);
static int get_cci_type (char *p);
static int log_bind_value (int req, T_STRING * linebuf, char *sql_log, char *output_result, int remain_bind_buf);
static int log_execute (int con_h, int req, char *sql_log, double *execute_time);

static void get_sql_time_info (char *sql_log, T_SQL_INFO * info);
static void update_diff_time_statistics (double diff_time);

static int print_temp_result (char *sql_log, T_SQL_INFO * info);
static void update_summary_info (T_SUMMARY_INFO * summary, T_SQL_INFO * sql_info);
static void print_summary_info (T_SUMMARY_INFO * summary);
static void print_result (FILE * outfp, double max_diff_time, double min_diff_time, int tmp_line_len_max);

static char *make_sql_info (char *info_buf, char *start_p, int diff_time, int buf_size);
static int result_sort_func (const void *arg1, const void *arg2);
static READ_RESULT get_temp_file_line (char *read_buf, unsigned int read_buf_size, int *diff_time, char **endp);
static int print_result_without_sort (FILE * outfp, int print_diff_time_lower, int read_buf_max);
static int print_result_with_sort (FILE * outfp, int print_diff_time_lower, int num_query, int read_buf_max);

static int get_args (int argc, char *argv[]);

static int open_file (char *infilename, char *outfilename, FILE ** infp, FILE ** outfp, FILE ** cci_errfp,
              FILE ** skip_sqlfp);
static void close_file (FILE * infp, FILE * outfp, FILE * cci_errfp, FILE * skip_sqlfp);

static char *host = NULL;
static int broker_port = INVALID_PORT_NUM;
static char *dbname = NULL;
static char *dbuser = NULL;
static char *dbpasswd = NULL;

static char from_date[128] = "";
static char to_date[128] = "";
static char check_date_flag = 0;

static char rewrite_query_flag = 0;

static int break_time = 0;
static double print_result_diff_time_lower = 0;

static FILE *br_tmpfp = NULL;
static char br_tmp_filename[PATH_MAX];

static unsigned int num_slower_queries[STAT_MAX_DIFF_TIME] = { 0 };
static unsigned int num_faster_queries[STAT_MAX_DIFF_TIME] = { 0 };

/*
 * log_replay() - log_replay main routine
 *   return: NO_ERROR or ER_FAILED
 *   infp(in): input file pointer
 *   outfp(in): output file pointer
 *   last_offset(in): last offset of input file
 */
static int
log_replay (char *infilename, char *outfilename)
{
  char *linebuf;
  int lineno = 0;
  char *msg_p;
  T_STRING *linebuf_tstr = NULL;
  char date_str[DATE_STR_LEN + 1];
  char bind_str_buf[BIND_STR_BUF_SIZE];
  int remain_bind_buf = 0;
  char conn_url[1024];
  int con_h, req;
  int result = 0;
  int bind_str_offset = 0;
  int temp_line_len = 0;
  int temp_line_len_max = 0;
  FILE *cci_errfp = NULL;
  FILE *skip_sqlfp = NULL;
  FILE *outfp = NULL;
  FILE *infp = NULL;
  struct timeval begin, end;
  double program_run_time;
  off_t last_offset;
  T_SQL_INFO sql_info;
  T_SUMMARY_INFO summary;
  T_CCI_ERROR err_buf;

  gettimeofday (&begin, NULL);

  memset (&summary, 0, sizeof (T_SUMMARY_INFO));

  snprintf (conn_url, sizeof (conn_url), "cci:cubrid:%s:%u:%s:::", host, broker_port, dbname);
  con_h = cci_connect_with_url (conn_url, dbuser, dbpasswd);
  if (con_h < 0)
    {
      fprintf (stderr, "cci connect error. url [%s]\n", conn_url);
      return ER_FAILED;
    }

  result = cci_set_autocommit (con_h, CCI_AUTOCOMMIT_FALSE);
  if (result < 0)
    {
      fprintf (stderr, "cannot set autocommit mode\n");
      goto end;
    }

  if (open_file (infilename, outfilename, &infp, &outfp, &cci_errfp, &skip_sqlfp) < 0)
    {
      result = ER_FAILED;
      goto end;
    }

  assert (infp != NULL);
  last_offset = lseek (fileno (infp), (off_t) 0, SEEK_END);

  fseek (infp, (off_t) 0, SEEK_SET);

  linebuf_tstr = t_string_make (1024);
  if (linebuf_tstr == NULL)
    {
      fprintf (stderr, "memory allocation failed\n");
      result = ER_FAILED;
      goto end;
    }

  while (1)
    {
      linebuf = get_next_log_line (infp, linebuf_tstr, last_offset, &lineno);
      if (linebuf == NULL)
    {
      break;
    }
      if (is_cas_log (linebuf) == 0)
    {
      continue;
    }

      if (check_date_flag == 1)
    {
      GET_CUR_DATE_STR (date_str, linebuf);
      if (ut_check_log_valid_time (date_str, from_date, to_date) < 0)
        {
          continue;
        }
    }

      msg_p = get_msg_start_ptr (linebuf);
      if (strncmp (msg_p, "execute", 7) != 0)
    {
      continue;
    }

      memset (&sql_info, '\0', sizeof (T_SQL_INFO));

      req = log_prepare (cci_errfp, skip_sqlfp, con_h, msg_p, &sql_info, &summary);
      if (req < 0)
    {
      free_and_init (sql_info.sql);
      free_and_init (sql_info.rewrite_sql);
      continue;
    }

      while (1)
    {
      linebuf = get_next_log_line (infp, linebuf_tstr, last_offset, &lineno);
      if (linebuf == NULL)
        {
          break;
        }
      if (is_cas_log (linebuf) == 0)
        {
          continue;
        }

      msg_p = get_msg_start_ptr (linebuf);
      if (strncmp (msg_p, "bind ", 5) == 0)
        {
          if (sql_info.bind_str == NULL)
        {
          memset (bind_str_buf, '\0', sizeof (bind_str_buf));
          sql_info.bind_str = bind_str_buf;
          remain_bind_buf = sizeof (bind_str_buf);
        }

          bind_str_offset = log_bind_value (req, linebuf_tstr, msg_p, sql_info.bind_str, remain_bind_buf);
          if (bind_str_offset < 0)
        {
          fprintf (stderr, "log bind error [line:%d]\n", lineno);
          break;
        }
          sql_info.bind_str += bind_str_offset;
          remain_bind_buf -= bind_str_offset;
        }
      else if (strncmp (msg_p, "execute", 7) == 0)
        {
          result = log_execute (con_h, req, msg_p, &sql_info.exec_time);
          if (result < 0)
        {
          fprintf (cci_errfp, "cci execute error\n");
          if (sql_info.rewrite_sql)
            {
              fprintf (cci_errfp, "rewrite sql[%s]\n", sql_info.rewrite_sql);
            }
          fprintf (cci_errfp, "sql[%s]\n", sql_info.sql);
          if (sql_info.bind_str)
            {
              fprintf (cci_errfp, "bind[%s]\n", bind_str_buf);
            }
          summary.num_err_query++;
          cci_close_req_handle (req);
          break;
        }
          cci_close_req_handle (req);

          summary.num_exec_query++;

          msg_p = strstr (msg_p, "time");
          if (msg_p == NULL)
        {
          /* unexpected sql log. pass this sql to write result */
          break;
        }

          if (sql_info.bind_str != NULL)
        {
          /* restore bind_str with first address of bind string buffer */
          sql_info.bind_str = bind_str_buf;
        }

          get_sql_time_info (msg_p + 5, &sql_info);

          update_summary_info (&summary, &sql_info);

          if (sql_info.diff_time < print_result_diff_time_lower)
        {
          break;
        }

          update_diff_time_statistics (sql_info.diff_time);

          /* we write result in temp file to order after all sql_log execute */
          temp_line_len = print_temp_result (msg_p, &sql_info);

          temp_line_len_max = MAX (temp_line_len_max, temp_line_len);

          break;
        }
      else
        {
          break;
        }
    }

      free_and_init (sql_info.sql);
      free_and_init (sql_info.rewrite_sql);
    }

  fflush (br_tmpfp);

  print_summary_info (&summary);

  print_result (outfp, summary.max_diff_time, summary.min_diff_time, temp_line_len_max + 1);

  gettimeofday (&end, NULL);
  program_run_time = ut_diff_time (&begin, &end);

  fprintf (stdout, "\n%s: %f sec\n", "cubrid_replay run time", program_run_time);

end:
  cci_disconnect (con_h, &err_buf);

  close_file (infp, outfp, cci_errfp, skip_sqlfp);

  if (linebuf_tstr)
    {
      free_and_init (linebuf_tstr->data);
    }
  free_and_init (linebuf_tstr);

  return result;
}

/*
 * get_next_log_line() -
 *   return: address of linebuf
 *   infp(in):
 *   linebuf_tstr(in):
 *   last_offset(in):
 *   lineno(in/out):
 */
static char *
get_next_log_line (FILE * infp, T_STRING * linebuf_tstr, const off_t last_offset, int *lineno)
{
  char *linebuf;
  off_t cur_offset;

  assert (lineno != NULL);

  cur_offset = ftell (infp);
  if (cur_offset >= last_offset)
    {
      return NULL;
    }

  if (ut_get_line (infp, linebuf_tstr, NULL, NULL) < 0)
    {
      fprintf (stderr, "memory allocation failed\n");
      return NULL;
    }
  if (t_string_len (linebuf_tstr) <= 0)
    {
      return NULL;
    }
  linebuf = t_string_str (linebuf_tstr);

  if (linebuf[strlen (linebuf) - 1] == '\n')
    {
      linebuf[strlen (linebuf) - 1] = '\0';
    }

  (*lineno)++;

  return linebuf;
}

/*
 * get_query_stmt_from_plan() -
 *   return: sql statment from query plan or NULL
 *   req(in):
 */
static char *
get_query_stmt_from_plan (int req)
{
  char *plan = NULL;
  char *sql_stmt = NULL, *p;
  char *result_sql = NULL;
  int rewrite_sql_len;
  const char *query_header_str = "Query stmt:";

  if (cci_get_query_plan (req, &plan) < 0)
    {
      return NULL;
    }

  p = plan;

  while (1)
    {
      /* we find the last Query stmt: */

      p = strstr (p, query_header_str);
      if (p == NULL)
    {
      break;
    }
      sql_stmt = p;
      p += strlen (query_header_str);
    }

  if (sql_stmt == NULL)
    {
      goto error;
    }

  sql_stmt += strlen (query_header_str);

  trim (sql_stmt);
  if (*sql_stmt == '\0')
    {
      goto error;
    }

  rewrite_sql_len = (int) strlen (sql_stmt);

  result_sql = (char *) malloc (rewrite_sql_len);
  if (result_sql)
    {
      p = strstr (sql_stmt, "class ");
      if (p == NULL)
    {
      snprintf (result_sql, rewrite_sql_len, "%s", sql_stmt);
      cci_query_info_free (plan);
      return result_sql;
    }
      p += 6;

      p = strchr (p, ' ');
      if (p == NULL)
    {
      /* in this case, we found invalid pattern. just return NULL */
      goto error;
    }
      if (*(p - 1) == ',')
    {
      p--;
    }

      snprintf (result_sql, rewrite_sql_len, "select 1 %s", p);
    }
  else
    {
      fprintf (stderr, "memory allocation failed\n");
    }

  cci_query_info_free (plan);

  return result_sql;

error:
  free_and_init (result_sql);

  if (plan)
    {
      cci_query_info_free (plan);
    }

  return NULL;
}

/*
 * log_prepare() -
 *   return: request handle id or ER_FAILED
 *   cci_errfp(in): cci error file pointer
 *   pass_sql(in): pass sql file pointer
 *   con(in): connection handle id
 *   sql_log(in):
 *   sql_info(out):
 *   summary(out):
 */
static int
log_prepare (FILE * cci_errfp, FILE * pass_sql, int con, char *sql_log, T_SQL_INFO * sql_info, T_SUMMARY_INFO * summary)
{
  int req, exec_h_id;
  int prepare_flag, execute_flag;
  int result;
  char *endp;
  char *rewrite_query;
  T_CCI_ERROR err_buf;
  T_CCI_CUBRID_STMT cmd_type = CUBRID_STMT_NONE;

  sql_log = ut_get_execute_type (sql_log, &prepare_flag, &execute_flag);
  if (sql_log == NULL)
    {
      return ER_FAILED;
    }

  if (strncmp (sql_log, "srv_h_id ", 9) != 0)
    {
      return ER_FAILED;
    }

  result = str_to_int32 (&exec_h_id, &endp, (sql_log + 9), 10);
  if (result != 0)
    {
      return ER_FAILED;
    }

  summary->num_total_query++;

  sql_log = endp + 1;

  req = cci_prepare (con, sql_log, prepare_flag, &err_buf);
  if (req < 0)
    {
      summary->num_err_query++;
      fprintf (cci_errfp, "cci prepare error [sql:%s]\n", sql_log);
      return ER_FAILED;
    }

  (void) cci_get_result_info (req, &cmd_type, NULL);

  if (rewrite_query_flag == 1 && (cmd_type == CUBRID_STMT_UPDATE || cmd_type == CUBRID_STMT_DELETE))
    {
      rewrite_query = get_query_stmt_from_plan (req);
      if (rewrite_query == NULL)
    {
      summary->num_skip_query++;
      fprintf (pass_sql, "skip sql [%s]\n", sql_log);
      cci_close_req_handle (req);
      return ER_FAILED;
    }

      cci_close_req_handle (req);

      req = cci_prepare (con, rewrite_query, prepare_flag, &err_buf);
      if (req < 0)
    {
      summary->num_err_query++;
      fprintf (cci_errfp, "cci prepare error [sql:%s]\n", rewrite_query);
      free_and_init (rewrite_query);
      return ER_FAILED;
    }

      sql_info->rewrite_sql = rewrite_query;

      (void) cci_get_result_info (req, &cmd_type, NULL);
    }

  if (cmd_type != CUBRID_STMT_SELECT)
    {
      /* skip this sql stmt */
      cci_close_req_handle (req);
      return ER_FAILED;
    }

  sql_info->sql = strdup (sql_log);
  if (sql_info->sql == NULL)
    {
      fprintf (stderr, "memory allocation failed\n");
      return ER_FAILED;
    }

  return req;
}

/*
 * get_cci_type() - get bind cci type from sql log
 *   return: CCI_U_TYPE
 *   p(in):
 */
static int
get_cci_type (char *p)
{
  int type = -1;

  assert (p);
  switch (*p)
    {
    case 'B':
      if (memcmp (p, "BIGINT", 7) == 0)
    {
      type = CCI_U_TYPE_BIGINT;
    }
      else if (memcmp (p, "BIT", 4) == 0)
    {
      type = CCI_U_TYPE_BIT;
    }
      else if (memcmp (p, "BLOB", 5) == 0)
    {
      type = CCI_U_TYPE_NULL;
    }
      break;

    case 'C':
      if (memcmp (p, "CHAR", 5) == 0)
    {
      type = CCI_U_TYPE_CHAR;
    }
      else if (memcmp (p, "CLOB", 5) == 0)
    {
      type = CCI_U_TYPE_NULL;
    }
      break;

    case 'E':
      if (memcmp (p, "ENUM", 5) == 0)
    {
      type = CCI_U_TYPE_ENUM;
    }
      break;

    case 'J':
      if (memcmp (p, "JSON", 5) == 0)
    {
      type = CCI_U_TYPE_JSON;
    }
      break;

    case 'I':
      if (memcmp (p, "INT", 4) == 0)
    {
      type = CCI_U_TYPE_INT;
    }
      break;

    case 'D':
      if (memcmp (p, "DOUBLE", 7) == 0)
    {
      type = CCI_U_TYPE_DOUBLE;
    }
      else if (memcmp (p, "DATE", 5) == 0)
    {
      type = CCI_U_TYPE_DATE;
    }
      else if (memcmp (p, "DATETIME", 9) == 0)
    {
      type = CCI_U_TYPE_DATETIME;
    }
      else if (memcmp (p, "DATETIMETZ", 11) == 0)
    {
      type = CCI_U_TYPE_DATETIMETZ;
    }
      break;

    case 'F':
      if (memcmp (p, "FLOAT", 6) == 0)
    {
      type = CCI_U_TYPE_FLOAT;
    }
      break;

    case 'M':
      if (memcmp (p, "MONETARY", 9) == 0)
    {
      type = CCI_U_TYPE_MONETARY;
    }
      break;

    case 'N':
      if (memcmp (p, "NUMERIC", 8) == 0)
    {
      type = CCI_U_TYPE_NUMERIC;
    }
      else if (memcmp (p, "NULL", 5) == 0)
    {
      type = CCI_U_TYPE_NULL;
    }
      break;

    case 'O':
      if (memcmp (p, "OBJECT", 7) == 0)
    {
      type = CCI_U_TYPE_OBJECT;
    }
      break;

    case 'S':
      if (memcmp (p, "SHORT", 6) == 0)
    {
      type = CCI_U_TYPE_SHORT;
    }
      break;

    case 'T':
      if (memcmp (p, "TIME", 5) == 0)
    {
      type = CCI_U_TYPE_TIME;
    }
      else if (memcmp (p, "TIMESTAMP", 10) == 0)
    {
      type = CCI_U_TYPE_TIMESTAMP;
    }
      else if (memcmp (p, "TIMESTAMPTZ", 12) == 0)
    {
      type = CCI_U_TYPE_TIMESTAMPTZ;
    }
      break;

    case 'U':
      if (memcmp (p, "UINT", 5) == 0)
    {
      type = CCI_U_TYPE_UINT;
    }
      else if (memcmp (p, "UBIGINT", 8) == 0)
    {
      type = CCI_U_TYPE_UBIGINT;
    }
      else if (memcmp (p, "USHORT", 7) == 0)
    {
      type = CCI_U_TYPE_USHORT;
    }
      break;

    case 'V':
      if (memcmp (p, "VARCHAR", 8) == 0)
    {
      type = CCI_U_TYPE_STRING;
    }
      else if (memcmp (p, "VARBIT", 7) == 0)
    {
      type = CCI_U_TYPE_VARBIT;
    }
      break;

    default:
      break;
    }

  return type;
}

/*
 * log_bind_value() -
 *   return: offset of bind string buffer or ER_FAILED
 *   req(in):
 *   linebuf(in):
 *   sql_log(in):
 *   output_result(out):
 *   remain_bind_buf(in):
 */
static int
log_bind_value (int req, T_STRING * linebuf, char *sql_log, char *output_result, int remain_bind_buf)
{
  char *p, *q, *r;
  char *value_p;
  char *endp;
  int type, res;
  int bind_idx;
  int bind_len = 0;
  int result = 0;
  int offset = 0;

  assert (req > 0);

  sql_log += 5;

  result = str_to_int32 (&bind_idx, &endp, sql_log, 10);
  if (result < 0)
    {
      fprintf (stderr, "invalid bind index\n");
      return ER_FAILED;
    }

  p = strchr (sql_log, ':');
  if (p == NULL)
    {
      return ER_FAILED;
    }
  p += 2;
  q = strchr (p, ' ');
  if (q == NULL)
    {
      if (strcmp (p, "NULL") == 0)
    {
      value_p = (char *) "";
    }
      else
    {
      return ER_FAILED;
    }
    }
  else
    {
      bind_len = t_string_bind_len (linebuf);
      if (bind_len > 0)
    {
      r = strchr (q, ')');
      if (r == NULL)
        {
          return ER_FAILED;
        }
      *q = '\0';
      *r = '\0';
      value_p = r + 1;
    }
      else
    {
      *q = '\0';
      value_p = q + 1;
    }
    }

  type = get_cci_type (p);
  if (type == CCI_U_TYPE_NULL)
    {
      value_p = (char *) "";
    }
  else if (type == -1)
    {
      fprintf (stderr, "unknown cci type\n");
      return ER_FAILED;
    }

  if ((type == CCI_U_TYPE_VARBIT) || (type == CCI_U_TYPE_BIT))
    {
      T_CCI_BIT vptr;
      memset ((char *) &vptr, 0x00, sizeof (T_CCI_BIT));
      vptr.size = bind_len;
      vptr.buf = (char *) value_p;
      res = cci_bind_param (req, bind_idx, CCI_A_TYPE_BIT, (void *) &(vptr), (T_CCI_U_TYPE) type, CCI_BIND_PTR);
    }
  else
    {
      res = cci_bind_param (req, bind_idx, CCI_A_TYPE_STR, value_p, (T_CCI_U_TYPE) type, 0);
    }
  if (res != CCI_ER_NO_ERROR)
    {
      return ER_FAILED;
    }

  if (remain_bind_buf <= 0)
    {
      return NO_ERROR;
    }

  if (bind_len > 0)
    {
      offset = snprintf (output_result, remain_bind_buf, "%d: '%s',END", bind_idx, value_p);
    }
  else
    {
      offset = snprintf (output_result, remain_bind_buf, "%d: %s,END", bind_idx, value_p);
    }

  return offset;
}

/*
 * log_execute() -
 *   return: NO_ERROR or ER_FAILED
 *   con_h(in):
 *   req(in):
 *   sql_log(in):
 *   execute_time(out):
 */
static int
log_execute (int con_h, int req, char *sql_log, double *execute_time)
{
  char *msg_p;
  int prepare_flag;
  int execute_flag;
  int res;
  struct timeval begin, end;
  T_CCI_ERROR cci_error;
  CCI_AUTOCOMMIT_MODE mode;

  assert (req > 0);

  *execute_time = 0;

  mode = cci_get_autocommit (con_h);
  if (mode == CCI_AUTOCOMMIT_TRUE)
    {
      res = cci_set_autocommit (con_h, CCI_AUTOCOMMIT_FALSE);
      if (res < 0)
    {
      fprintf (stderr, "cannot set autocommit mode\n");
      return ER_FAILED;
    }
    }

  msg_p = ut_get_execute_type (sql_log, &prepare_flag, &execute_flag);
  if (msg_p == NULL)
    {
      return ER_FAILED;
    }

  gettimeofday (&begin, NULL);

  res = cci_execute (req, (char) execute_flag, 0, &cci_error);
  if (res < 0)
    {
      if (break_time > 0)
    {
      SLEEP_MILISEC (0, break_time);
    }
      return res;
    }

  gettimeofday (&end, NULL);
  *execute_time = ut_diff_time (&begin, &end);

  res = cci_end_tran (con_h, CCI_TRAN_ROLLBACK, &cci_error);

  if (break_time > 0)
    {
      SLEEP_MILISEC (0, break_time);
    }

  return res;
}

/*
 * get_sql_time_info() -
 *   return: void
 *   sql_info(in):
 *   info(in/out):
 */
static void
get_sql_time_info (char *sql_log, T_SQL_INFO * info)
{
  assert (info != NULL);

  sscanf (sql_log, "%lf", &info->sql_log_time);

  info->diff_time = info->exec_time - info->sql_log_time;

  return;
}

/*
 * update_diff_time_statistics() -
 *   return: void
 *   diff_time(in):
 */
static void
update_diff_time_statistics (double diff_time)
{
  /*
   * we save count of query witch have specific diff time
   * to use when order with diff time
   */
  int diff_in_msec;

  diff_in_msec = (int) (diff_time * 1000);

  if (diff_in_msec >= STAT_MAX_DIFF_TIME)
    {
      num_slower_queries[STAT_MAX_DIFF_TIME - 1]++;
    }
  else if (diff_in_msec >= 0)
    {
      num_slower_queries[diff_in_msec]++;
    }
  else if (diff_in_msec <= (-STAT_MAX_DIFF_TIME))
    {
      num_faster_queries[STAT_MAX_DIFF_TIME - 1]++;
    }
  else
    {
      assert (diff_in_msec < 0);
      num_faster_queries[(-diff_in_msec)]++;
    }

  return;
}

/*
 * print_temp_result() - save temp result until all sql are executed
 *   return: line length
 *   sql_log(in):
 *   info(in):
 */
static int
print_temp_result (char *sql_log, T_SQL_INFO * info)
{
  size_t bind_len;
  int line_len = 0;
  char *rewrite_sql = info->rewrite_sql;
  char *bind_str = info->bind_str;

  line_len +=
    fprintf (br_tmpfp, "%d %d %d %s", (int) (info->diff_time * 1000), (int) (info->exec_time * 1000),
         (int) (info->sql_log_time * 1000), info->sql);

  if (rewrite_sql != NULL)
    {
      line_len += fprintf (br_tmpfp, "REWRITE:%s", rewrite_sql);
    }
  if (bind_str != NULL)
    {
      bind_len = strlen (bind_str);
      if (bind_len > 0)
    {
      line_len += fprintf (br_tmpfp, "BIND:%s", bind_str);
    }
    }

  line_len += fprintf (br_tmpfp, "\n");

  return line_len;
}

/*
 * update_summary_info() -
 *   return: void
 *   summary(out):
 *   sql_info(out):
 */
static void
update_summary_info (T_SUMMARY_INFO * summary, T_SQL_INFO * sql_info)
{
  summary->sum_diff_time += sql_info->diff_time;

  if (sql_info->diff_time >= print_result_diff_time_lower)
    {
      summary->num_diff_time_query++;
    }

  summary->max_diff_time = MAX (summary->max_diff_time, sql_info->diff_time);
  /* min_diff_time is used for knowing lower bound of diff time when order result */
  summary->min_diff_time = MIN (summary->min_diff_time, sql_info->diff_time);

  return;
}

/*
 * print_summary_info () -
 *   return: void
 *   summary(in):
 */
static void
print_summary_info (T_SUMMARY_INFO * summary)
{
  double avg_diff_time = 0;
  char msg_buf[64];

  if (summary->num_exec_query != 0)
    {
      avg_diff_time = summary->sum_diff_time / summary->num_exec_query;
    }

  fprintf (stdout, "------------------- Result Summary --------------------------\n");
  fprintf (stdout, "* %-40s : %d\n", "Total queries", summary->num_total_query);
  fprintf (stdout, "* %-40s : %d\n", "Skipped queries (see skip.sql)", summary->num_skip_query);
  fprintf (stdout, "* %-40s : %d\n", "Failed queries (see replay.err)", summary->num_err_query);
  snprintf (msg_buf, sizeof (msg_buf), "Slow queries (time diff > %.3f secs)", print_result_diff_time_lower);
  fprintf (stdout, "* %-40s : %d\n", msg_buf, summary->num_diff_time_query);
  fprintf (stdout, "* %-40s : %.3f\n", "Max execution time diff", summary->max_diff_time);
  fprintf (stdout, "* %-40s : %.3f\n", "Avg execution time diff", avg_diff_time);

  return;
}

/*
 * make_sql_info() - make output result from temp file
 *   return: memory address of result or NULL
 *   sql_info(in):
 *   start_p(in):
 *   diff_time(in):
 *   buf_size(in):
 */
static char *
make_sql_info (char *sql_info, char *start_p, int diff_time, int buf_size)
{
  char *endp, *p;
  char *sql = NULL, *rewrite_sql = NULL;
  char *bind_str = NULL;
  int res, offset = 0;
  int sql_log_time, exec_time;

  res = str_to_int32 (&exec_time, &endp, start_p, 10);

  p = endp + 1;
  res = str_to_int32 (&sql_log_time, &endp, p, 10);

  sql = endp + 1;
  p = strstr (sql, "REWRITE:");
  if (p)
    {
      *p = '\0';
      p += 8;
      rewrite_sql = p;
      endp = p;
    }

  p = strstr (endp, "BIND:");
  if (p)
    {
      *p = '\0';
      p += 5;
      bind_str = p;
    }

  offset =
    snprintf (sql_info, buf_size, "EXEC TIME (REPLAY / SQL_LOG / DIFF): %.3f / %.3f / %.3f\n" "SQL: %s\n",
          ((double) exec_time) / 1000, ((double) sql_log_time) / 1000, ((double) diff_time) / 1000, sql);

  if (rewrite_sql && (buf_size - offset) > 0)
    {
      offset += snprintf (sql_info + offset, (buf_size - offset), "REWRITE SQL: %s\n", rewrite_sql);
    }

  if (bind_str == NULL)
    {
      return sql_info;
    }

  while ((buf_size - offset) > 0)
    {
      p = strstr (bind_str, ",END");
      if (p)
    {
      *p = '\0';
      offset += snprintf (sql_info + offset, (buf_size - offset), "BIND %s\n", bind_str);
      p += 4;
      bind_str = p;
    }
      else
    {
      break;
    }
    }

  return sql_info;
}

/*
 * result_sort_func() - compare sql with diff time
 *   return: difference of diff time of two sql
 *   arg1(in):
 *   arg2(in):
 */
static int
result_sort_func (const void *arg1, const void *arg2)
{
  int diff_time1;
  int diff_time2;

  diff_time1 = ((T_SQL_RESULT *) arg1)->diff_time;
  diff_time2 = ((T_SQL_RESULT *) arg2)->diff_time;

  return (diff_time2 - diff_time1);
}

/*
 * get_temp_file_lien() -
 *   return: READ_RESULT (SUCCESS / CONTINUE / STOP)
 *   read_buf(in):
 *   read_buf_size(in):
 *   diff_time(out):
 *   endp(out):
 */
static READ_RESULT
get_temp_file_line (char *read_buf, unsigned int read_buf_size, int *diff_time, char **endp)
{
  char *p;
  int res;

  p = fgets (read_buf, read_buf_size, br_tmpfp);
  if (p)
    {
      res = str_to_int32 (diff_time, endp, read_buf, 10);
      if (res != 0)
    {
      return READ_CONTINUE;
    }

      p = strchr (read_buf, '\n');
      if (p)
    {
      *(p) = '\0';

      return READ_SUCCESS;
    }
      else
    {
      /* continue reading temp file. buf this read_buf isn't written in output file */
      return READ_CONTINUE;
    }
    }
  /* end reading temp file */
  return READ_STOP;
}

/*
 * print_result_without_sort() -
 *   return: NO_ERROR or ER_FAILED
 *   outfp(in):
 *   print_diff_time_lower(in): min diff time which will be print
 *   read_buf_max(in): size of read buffer
 */
static int
print_result_without_sort (FILE * outfp, int print_diff_time_lower, int read_buf_max)
{
  int diff_time;
  int res = 0;
  char *endp;
  char *read_buf;
  T_SQL_RESULT result;

  auto[filename, next_tmp_fp] = filesys::open_temp_file ("br_", "w+b");
  if (next_tmp_fp == NULL)
    {
      fprintf (stderr, "cannot open temp file\n");
      return ER_FAILED;
    }

  read_buf = (char *) malloc (read_buf_max);
  if (read_buf == NULL)
    {
      fprintf (stderr, "memory allocation failed\n");
      fclose (next_tmp_fp);
      unlink (filename.c_str ());
      return ER_FAILED;
    }

  fseek (br_tmpfp, (off_t) 0, SEEK_SET);
  while (1)
    {
      res = get_temp_file_line (read_buf, read_buf_max, &diff_time, &endp);
      if (res == READ_STOP)
    {
      break;
    }
      else if (res == READ_CONTINUE)
    {
      continue;
    }

      if (diff_time < print_diff_time_lower)
    {
      fprintf (next_tmp_fp, "%s\n", read_buf);
      continue;
    }

      result.sql_info = (char *) malloc (read_buf_max + SQL_INFO_TITLE_LEN);
      if (result.sql_info == NULL)
    {
      fprintf (stderr, "memory allocation failed\n");
      fclose (next_tmp_fp);
      unlink (filename.c_str ());
      free_and_init (read_buf);
      return ER_FAILED;
    }

      make_sql_info (result.sql_info, endp + 1, diff_time, read_buf_max + SQL_INFO_TITLE_LEN);

      fprintf (outfp, "%s\n", result.sql_info);
      free_and_init (result.sql_info);
    }

  fclose (br_tmpfp);
  unlink (br_tmp_filename);

  fflush (next_tmp_fp);

  /* save next temp file pointer in global variable */
  br_tmpfp = next_tmp_fp;
  strcpy (br_tmp_filename, filename.c_str ());

  free_and_init (read_buf);
  return NO_ERROR;
}

/*
 * print_result_with_sort() -
 *   return: NO_ERROR or ER_FAILED
 *   outfp(in):
 *   print_diff_time_lower(in):
 *   num_query(in):
 *   read_buf_max(in): size of read buffer
 */
static int
print_result_with_sort (FILE * outfp, int print_diff_time_lower, int num_query, int read_buf_max)
{
  int diff_time;
  int i = 0;
  int res = 0;
  char *endp;
  char *read_buf = NULL;
  T_SQL_RESULT *result;

  if (num_query <= 0)
    {
      return NO_ERROR;
    }

  auto[filename, next_tmp_fp] = filesys::open_temp_file ("br_", "w+b");
  if (next_tmp_fp == NULL)
    {
      fprintf (stderr, "cannot open temp file\n");
      return ER_FAILED;
    }

  result = (T_SQL_RESULT *) malloc (sizeof (T_SQL_RESULT) * num_query);
  if (result == NULL)
    {
      fprintf (stderr, "memory allocation failed\n");
      fclose (next_tmp_fp);
      unlink (filename.c_str ());
      return ER_FAILED;
    }
  memset (result, '\0', sizeof (T_SQL_RESULT) * num_query);

  read_buf = (char *) malloc (read_buf_max);
  if (read_buf == NULL)
    {
      fprintf (stderr, "memory allocation failed\n");
      goto error;
    }

  fseek (br_tmpfp, (off_t) 0, SEEK_SET);
  while (1)
    {
      res = get_temp_file_line (read_buf, read_buf_max, &diff_time, &endp);
      if (res == READ_STOP)
    {
      break;
    }
      else if (res == READ_CONTINUE)
    {
      continue;
    }

      if (diff_time >= print_diff_time_lower)
    {
      result[i].sql_info = (char *) malloc (read_buf_max + SQL_INFO_TITLE_LEN);
      if (result[i].sql_info == NULL)
        {
          fprintf (stderr, "memory allocation failed\n");
          goto error;
        }

      result[i].diff_time = diff_time;
      make_sql_info (result[i].sql_info, endp + 1, diff_time, read_buf_max + SQL_INFO_TITLE_LEN);
      i++;
    }
      else
    {
      /*
       * if sql's diff time is shorter than diff_time_lower,
       * write in next temp file. it will be read next time.
       */
      fprintf (next_tmp_fp, "%s\n", read_buf);
    }
    }

  num_query = i;
  /* sort result order by diff time */
  qsort (result, num_query, sizeof (T_SQL_RESULT), result_sort_func);

  for (i = 0; i < num_query; i++)
    {
      fprintf (outfp, "%s\n", result[i].sql_info);
      free_and_init (result[i].sql_info);
    }

  free_and_init (read_buf);

  free_and_init (result);

  fclose (br_tmpfp);
  unlink (br_tmp_filename);

  fflush (outfp);
  fflush (next_tmp_fp);

  /* save next temp file pointer in global variable */
  br_tmpfp = next_tmp_fp;

  return NO_ERROR;

error:
  free_and_init (read_buf);

  free_and_init (result);

  num_query = i;
  for (i = 0; i < num_query; i++)
    {
      free_and_init (result[i].sql_info);
    }
  fclose (next_tmp_fp);
  unlink (filename.c_str ());

  return ER_FAILED;
}

/*
 * print_result() -
 *   outfp(in):
 *   max_diff_time(in):
 *   min_diff_time(in):
 *   temp_line_len_max(in): line max of temp file result.
 */
static void
print_result (FILE * outfp, double max_diff_time, double min_diff_time, int temp_line_len_max)
{
  int i, num_sort = 0;
  int max_diff_in_msec = (int) (max_diff_time * 1000) + 1;
  int min_diff_in_msec = (int) (min_diff_time * 1000) - 1;

  max_diff_in_msec = MIN (max_diff_in_msec, STAT_MAX_DIFF_TIME - 1);
  min_diff_in_msec = MAX (min_diff_in_msec, -(STAT_MAX_DIFF_TIME - 1));

  for (i = max_diff_in_msec; i >= 0; i--)
    {
      num_sort += num_slower_queries[i];
      if (num_sort > SORT_BUF_MAX)
    {
      if (print_result_with_sort (outfp, i + 1, num_sort - num_slower_queries[i], temp_line_len_max) < 0)
        {
          return;
        }
      /*
       * we don't sort last diff time query to avoid ordering size excess SORT_BUF_SIZE
       * in genaral, many sql have same diff time. so, this help decrease needless sorting
       */
      if (print_result_without_sort (outfp, i, temp_line_len_max) < 0)
        {
          return;
        }

      num_sort = 0;
    }
    }

  for (i = -1; i >= min_diff_in_msec; i--)
    {
      num_sort += num_faster_queries[(-i)];
      if (num_sort > SORT_BUF_MAX)
    {
      if (print_result_with_sort (outfp, i + 1, num_sort - num_faster_queries[(-i)], temp_line_len_max) < 0)
        {
          return;
        }

      if (print_result_without_sort (outfp, i, temp_line_len_max) < 0)
        {
          return;
        }

      num_sort = 0;
    }
    }

  if (num_sort > 0)
    {
      print_result_with_sort (outfp, min_diff_in_msec, num_sort, temp_line_len_max);
    }

  return;
}

/*
 * get_args() -
 *   return: option indicator or ER_FAILED
 *   argc(in):
 *   argv(in):
 */
static int
get_args (int argc, char *argv[])
{
  int c;
  int num_file_arg;
  double break_time_in_sec = DEFAULT_BREAK_TIME;
  print_result_diff_time_lower = DEFAULT_DIFF_TIME_LOWER;

  while (1)
    {
      c = getopt (argc, argv, "rd:u:p:I:P:F:T:h:D:");
      if (c == EOF)
    {
      break;
    }
      switch (c)
    {
    case 'I':
      host = optarg;
      break;
    case 'P':
      if (parse_int (&broker_port, optarg, 10) < 0)
        {
          goto usage;
        }
      break;
    case 'd':
      dbname = optarg;
      break;
    case 'u':
      dbuser = optarg;
      break;
    case 'p':
      dbpasswd = strdup (optarg);
#if defined (LINUX)
      memset (optarg, '*', strlen (optarg));
#endif
      break;
    case 'r':
      rewrite_query_flag = 1;
      break;
    case 'h':
      break_time_in_sec = atof (optarg);
      break;
    case 'D':
      print_result_diff_time_lower = atof (optarg);
      break;
    case 'F':
      if (str_to_log_date_format (optarg, from_date) < 0)
        {
          goto date_format_err;
        }
      check_date_flag = 1;
      break;
    case 'T':
      if (str_to_log_date_format (optarg, to_date) < 0)
        {
          goto date_format_err;
        }
      check_date_flag = 1;
      break;
    default:
      goto usage;
    }
    }

  break_time = (int) (break_time_in_sec * 1000);

  num_file_arg = argc - optind;
  if (num_file_arg != 2)
    {
      goto usage;
    }

  if (host == NULL || broker_port == INVALID_PORT_NUM || dbname == NULL)
    {
      goto usage;
    }

  return optind;

usage:
  fprintf (stderr,
       "usage : %s -I broker_host -P broker_port -d database_name infile outfile [OPTION] \n" "\n"
       "valid options:\n" "  -u   user name\n" "  -p   user password\n"
       "  -h   break time between query execute; default: %.3f(sec)\n"
       "  -r   enable to rewrite update/delete query to select query\n"
       "  -D   minimum value of time difference make print result; default: %.3f(sec)\n"
       "  -F   datetime when start to replay sql_log\n" "  -T   datetime when end to replay sql_log\n", argv[0],
       DEFAULT_BREAK_TIME, DEFAULT_DIFF_TIME_LOWER);

  return ER_FAILED;
date_format_err:
  fprintf (stderr, "invalid date. valid date format is yy-mm-dd hh:mm:ss.\n");
  return ER_FAILED;
}

/*
 * open_file() -
 *   return: NO_ERROR or ER_FAILED
 *   infilename(in):
 *   outfilename(in):
 *   infp(out):
 *   outfp(out):
 *   cci_errfp(out):
 *   skip_sqlfp(out):
 */

#define PROC_ERR(code){ \
        close_file (*infp, *outfp, *cci_errfp, *skip_sqlfp); \
        return code; \
}

static int
open_file (char *infilename, char *outfilename, FILE ** infp, FILE ** outfp, FILE ** cci_errfp, FILE ** skip_sqlfp)
{
  *infp = fopen (infilename, "r");
  if (*infp == NULL)
    {
      fprintf (stderr, "cannot open input file '%s'\n", infilename);
      return ER_FAILED;
    }

  *outfp = fopen (outfilename, "w");
  if (*outfp == NULL)
    {
      fprintf (stderr, "cannot open output file '%s'\n", outfilename);
      PROC_ERR (ER_FAILED);
    }

  auto[filename, fileptr] = filesys::open_temp_file ("br_", "w+b");

  br_tmpfp = fileptr;

  if (br_tmpfp == NULL)
    {
      fprintf (stderr, "cannot open temp file\n");
      PROC_ERR (ER_FAILED);
    }
  strcpy (br_tmp_filename, filename.c_str ());

  *cci_errfp = fopen (CCI_ERR_FILE_NAME, "w");
  if (*cci_errfp == NULL)
    {
      fprintf (stderr, "cannot open output file '%s'\n", CCI_ERR_FILE_NAME);
      PROC_ERR (ER_FAILED);
    }

  *skip_sqlfp = fopen (PASS_SQL_FILE_NAME, "w");
  if (*skip_sqlfp == NULL)
    {
      fprintf (stderr, "cannot open output file '%s'\n", PASS_SQL_FILE_NAME);
      PROC_ERR (ER_FAILED);
    }

  return NO_ERROR;

}

/*
 * close_file() -
 *   return: void
 *   infp(in):
 *   outfp(in):
 *   cci_errfp(in):
 *   skip_sqlfp(in):
 */
static void
close_file (FILE * infp, FILE * outfp, FILE * cci_errfp, FILE * skip_sqlfp)
{
  if (infp != NULL)
    {
      fclose (infp);
    }

  if (outfp != NULL)
    {
      fclose (outfp);
    }

  if (br_tmpfp != NULL)
    {
      fclose (br_tmpfp);
    }

  if (cci_errfp != NULL)
    {
      fclose (cci_errfp);
    }

  if (skip_sqlfp != NULL)
    {
      fclose (skip_sqlfp);
    }

  return;
}

int
main (int argc, char *argv[])
{
  int start_arg;
  char *infilename = NULL;
  char *outfilename = NULL;
  int res;

  start_arg = get_args (argc, argv);
  if (start_arg < 0)
    {
      return ER_FAILED;
    }

  assert (start_arg < argc);

  infilename = argv[start_arg];

  outfilename = argv[start_arg + 1];

  res = log_replay (infilename, outfilename);

  if (dbpasswd != NULL)
    {
      free (dbpasswd);
    }

  return res;
}