CUBRID Engine  latest
shard_proxy_function.c
Go to the documentation of this file.
1 /*
2  * Copyright 2008 Search Solution Corporation
3  * Copyright 2016 CUBRID Corporation
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 
20 /*
21  * shard_proxy_function.c -
22  *
23  */
24 
25 #ident "$Id$"
26 
27 
28 #include <stdio.h>
29 #include <stdlib.h>
30 #include <assert.h>
31 #include <math.h>
32 
33 #include "porting.h"
34 #include "shard_proxy.h"
35 #include "shard_proxy_handler.h"
36 #include "shard_proxy_function.h"
37 #include "shard_statement.h"
38 #include "shard_parser.h"
39 #include "shard_key_func.h"
40 #include "system_parameter.h"
41 #include "dbtype.h"
42 
47 
/* Helpers defined in another translation unit. */
48 extern int make_net_buf (T_NET_BUF * net_buf, int size);
49 extern int make_header_info (T_NET_BUF * net_buf, MSG_HEADER * client_msg_header);
/* File-local helpers; the definitions appear later in this file. */
50 static void proxy_set_wait_timeout (T_PROXY_CONTEXT * ctx_p, int query_timeout);
51 
52 static int proxy_get_shard_id (T_SHARD_STMT * stmt_p, void **argv, T_SHARD_KEY_RANGE ** range_p_out);
/* NOTE(review): listing line 53 is missing here; presumably the prototype of the static helper that
 * proxy_get_shard_id () calls as proxy_get_range_by_param (hint_p, argv) - confirm against the original file. */
54 static void proxy_update_shard_stats (T_SHARD_STMT * stmt_p, T_SHARD_KEY_RANGE * range_p);
55 static void proxy_update_shard_stats_without_hint (int shard_id);
56 static int proxy_client_execute_internal (T_PROXY_CONTEXT * ctx_p, T_PROXY_EVENT * event_p, int argc, char **argv,
57  char _func_code, int query_timeout, int bind_value_index);
58 static int proxy_cas_execute_internal (T_PROXY_CONTEXT * ctx_p, T_PROXY_EVENT * event_p);
59 static bool proxy_is_invalid_statement (int error_ind, int error_code, char *driver_info,
60  T_BROKER_VERSION client_version);
61 static bool proxy_has_different_column_info (const char *r1, size_t r1_len, const char *r2, size_t r2_len);
62 
63 
64 void
66 {
67  int proxy_wait_timeout_sec;
68  int query_timeout_sec;
69 
70  query_timeout_sec = ceil ((double) query_timeout / 1000);
71  proxy_wait_timeout_sec = ctx_p->wait_timeout;
72 
73  if (proxy_wait_timeout_sec == 0 || query_timeout_sec == 0)
74  {
75  ctx_p->wait_timeout = proxy_wait_timeout_sec + query_timeout_sec;
76  }
77  else if (proxy_wait_timeout_sec < query_timeout_sec)
78  {
79  ctx_p->wait_timeout = proxy_wait_timeout_sec;
80  }
81  else
82  {
83  ctx_p->wait_timeout = query_timeout_sec;
84  }
85 
86  return;
87 }
88 
89 int
90 proxy_check_cas_error (char *read_msg)
91 {
92  char *data;
93 
94  /* error_ind is ... old < R8.3.0 : err_no or svr_h_id R8.3.0 ~ current : error_indicator or srv_h_id */
95  int error_ind;
96 
97  data = read_msg + MSG_HEADER_SIZE;
98  error_ind = (int) ntohl (*(int *) data);
99 
100  return error_ind;
101 }
102 
103 int
104 proxy_get_cas_error_code (char *read_msg, T_BROKER_VERSION client_version)
105 {
106  char *data;
107  int error_code;
108 
109  if (client_version >= CAS_MAKE_VER (8, 3, 0))
110  {
111  data = read_msg + MSG_HEADER_SIZE + sizeof (int) /* error indicator */ ;
112  }
113  else
114  {
115  data = read_msg + MSG_HEADER_SIZE;
116  }
117  error_code = (int) ntohl (*(int *) data);
118 
119  return error_code;
120 }
121 
122 int
123 proxy_send_request_to_cas (T_PROXY_CONTEXT * ctx_p, T_PROXY_EVENT * event_p, int func_code)
124 {
125  int error = 0;
126  T_CAS_IO *cas_io_p;
127 
128  ENTER_FUNC ();
129 
130  cas_io_p =
131  proxy_cas_alloc_by_ctx (ctx_p->client_id, ctx_p->shard_id, ctx_p->cas_id, ctx_p->cid, ctx_p->uid,
132  ctx_p->wait_timeout, func_code);
133  if (cas_io_p == NULL)
134  {
136  "Failed to allocate CAS. " "(shard_id:%d, cas_id:%d, context id:%d, context uid:%d).", ctx_p->shard_id,
137  ctx_p->cas_id, ctx_p->cid, ctx_p->uid);
138  EXIT_FUNC ();
139  return -1;
140  }
141  else if (cas_io_p == (T_CAS_IO *) SHARD_TEMPORARY_UNAVAILABLE)
142  {
143  assert (ctx_p->shard_id < 0 || ctx_p->cas_id < 0);
144 
145  EXIT_FUNC ();
146  return 1 /* waiting idle cas */ ;
147  }
148 
149  if (ctx_p->is_in_tran == false)
150  {
151  proxy_context_set_in_tran (ctx_p, cas_io_p->shard_id, cas_io_p->cas_id);
152  }
153 
154  error = proxy_cas_io_write (cas_io_p, event_p);
155  if (error)
156  {
157  PROXY_LOG (PROXY_LOG_MODE_ERROR, "Failed to write to CAS. " "CAS(%s). event(%s).", proxy_str_cas_io (cas_io_p),
158  proxy_str_event (event_p));
159  EXIT_FUNC ();
160  return -1;
161  }
162  event_p = NULL;
163 
164  EXIT_FUNC ();
165  return 0;
166 }
167 
168 static int
169 proxy_send_request_to_cas_with_new_event (T_PROXY_CONTEXT * ctx_p, unsigned int type, int from,
170  T_PROXY_EVENT_FUNC req_func)
171 {
172  int error = 0;
173  T_PROXY_EVENT *event_p = NULL;
174  char *driver_info;
175 
176  driver_info = proxy_get_driver_info_by_ctx (ctx_p);
177 
178  event_p = proxy_event_new_with_req (driver_info, type, from, req_func);
179  if (event_p == NULL)
180  {
181  error = -1;
182  PROXY_LOG (PROXY_LOG_MODE_ERROR, "Failed to make request event. (error:%d). context(%s).", error,
183  proxy_str_context (ctx_p));
184 
185  goto error_return;
186  }
187 
188  error = proxy_send_request_to_cas (ctx_p, event_p, ctx_p->func_code);
189  if (error < 0)
190  {
191  PROXY_LOG (PROXY_LOG_MODE_ERROR, "Failed to send request to CAS. " "(error=%d). context(%s). evnet(%s).", error,
192  proxy_str_context (ctx_p), proxy_str_event (event_p));
193  goto error_return;
194  }
195  else if (error > 0) /* TODO : DELETE */
196  {
197  goto error_return;
198  }
199 
200  event_p = NULL;
201 
202  return error;
203 
204 error_return:
205  if (event_p != NULL)
206  {
207  proxy_event_free (event_p);
208  event_p = NULL;
209  }
210  return error;
211 }
212 
/*
 * proxy_send_request_to_cas_with_stored_server_handle_id -
 *   replace the server handle id in argv[0] with the one stored for the
 *   context's shard/cas and write the event to that CAS
 *   return: 0 on success; -1 on any failure, in which case the event is freed
 *           and ctx_p->free_context is set to true
 *   ctx_p(in/out): proxy context
 *   event_p(in): request event to write
 *   argc(in): not referenced in the visible body
 *   argv(in/out): argv[0] holds the server handle id and is rewritten in place
 *
 * Note: compiled only when ENABLE_UNUSED_FUNCTION is defined.
 * NOTE(review): proxy_cas_alloc_by_ctx () is called with five arguments here but
 *   with seven arguments elsewhere in this file - presumably this code is stale;
 *   confirm before enabling.
 */
213 #if defined(ENABLE_UNUSED_FUNCTION)
214 static int
215 proxy_send_request_to_cas_with_stored_server_handle_id (T_PROXY_CONTEXT * ctx_p, T_PROXY_EVENT * event_p, int argc,
216  char **argv)
217 {
218  int error = 0;
219  int srv_h_id, cas_srv_h_id;
220 
221  T_CAS_IO *cas_io_p;
222 
223  ENTER_FUNC ();
224 
225  /* find idle cas */
226  cas_io_p = proxy_cas_alloc_by_ctx (ctx_p->shard_id, ctx_p->cas_id, ctx_p->cid, ctx_p->uid, ctx_p->wait_timeout);
227  if (cas_io_p == NULL)
228  {
/* NOTE(review): listing line 229 is missing here; presumably the opening of a PROXY_LOG (...) call - confirm. */
230  "Failed to allocate CAS. " "(shard_id:%d, cas_id:%d, context id:%d, context uid:%d).", ctx_p->shard_id,
231  ctx_p->cas_id, ctx_p->cid, ctx_p->uid);
232 
233  goto free_context;
234  }
235  else if (cas_io_p == (T_CAS_IO *) SHARD_TEMPORARY_UNAVAILABLE)
236  {
237  assert (ctx_p->shard_id < 0 || ctx_p->cas_id < 0);
238 
239  goto free_context;
240  }
241 
242  assert (ctx_p->shard_id == cas_io_p->shard_id);
243  assert (ctx_p->cas_id == cas_io_p->cas_id);
244 
245  if (ctx_p->is_in_tran == false)
246  {
247  proxy_context_set_in_tran (ctx_p, cas_io_p->shard_id, cas_io_p->cas_id);
248  }
249 
250  /* find stored server handle id for this shard/cas */
251  net_arg_get_int (&srv_h_id, argv[0]);
252 
253  cas_srv_h_id = shard_stmt_find_srv_h_id_for_shard_cas (srv_h_id, ctx_p->shard_id, ctx_p->cas_id);
254 
255  if (cas_srv_h_id <= 0)
256  {
/* NOTE(review): listing line 257 is missing here; its content cannot be determined from this listing. */
258 
259  goto free_context;
260  }
261 
/* overwrite the handle id carried in the request with the CAS-side handle id */
262  net_arg_put_int (argv[0], &(cas_srv_h_id));
263 
264 
265  error = proxy_cas_io_write (cas_io_p, event_p);
266  if (error)
267  {
268  PROXY_LOG (PROXY_LOG_MODE_ERROR, "Failed to write to CAS. " "CAS(%s). event(%s).", proxy_str_cas_io (cas_io_p),
269  proxy_str_event (event_p));
270 
271  goto free_context;
272  }
273 
274  event_p = NULL;
275 
276  EXIT_FUNC ();
277  return 0;
278 
279 free_context:
280 
281  if (event_p)
282  {
283  proxy_event_free (event_p);
284  event_p = NULL;
285  }
286 
287  ctx_p->free_context = true;
288 
289  EXIT_FUNC ();
290  return -1;
291 }
292 #endif /* ENABLE_UNUSED_FUNCTION */
293 
/*
 * Look up the client I/O entry for the context and write the event to it.
 *   return: 0 on success; -1 when the client cannot be found or the write fails
 *   ctx_p(in): proxy context; client_id, cid and uid are used for the lookup
 *   event_p(in): event to write to the client
 *
 * NOTE(review): the declarator line (listing line 295) is missing from this
 *   listing; presumably this is
 *   proxy_send_response_to_client (T_PROXY_CONTEXT * ctx_p, T_PROXY_EVENT * event_p),
 *   judging by the callers in this file - confirm against the original file.
 */
294 int
296 {
297  int error = 0;
298  T_CLIENT_IO *cli_io_p;
299 
300  ENTER_FUNC ();
301 
302  /* find client io */
303  cli_io_p = proxy_client_io_find_by_ctx (ctx_p->client_id, ctx_p->cid, ctx_p->uid);
304  if (cli_io_p == NULL)
305  {
306  PROXY_LOG (PROXY_LOG_MODE_ERROR, "Failed to find client. " "(client_id:%d, context id:%d, context uid:%u).",
307  ctx_p->client_id, ctx_p->cid, ctx_p->uid);
308 
309  EXIT_FUNC ();
310  return -1;
311  }
312 
313  error = proxy_client_io_write (cli_io_p, event_p);
314  if (error)
315  {
316  PROXY_LOG (PROXY_LOG_MODE_ERROR, "Failed to write to client. " "client(%s). event(%s).",
317  proxy_str_client_io (cli_io_p), proxy_str_event (event_p));
318  EXIT_FUNC ();
319  return -1;
320  }
321 
322  EXIT_FUNC ();
323  return 0;
324 }
325 
326 int
327 proxy_send_response_to_client_with_new_event (T_PROXY_CONTEXT * ctx_p, unsigned int type, int from,
328  T_PROXY_EVENT_FUNC resp_func)
329 {
330  int error = 0;
331  char *driver_info;
332  T_PROXY_EVENT *event_p = NULL;
333  T_CLIENT_INFO *client_info_p = NULL;
334 
335  client_info_p = shard_shm_get_client_info (proxy_info_p, ctx_p->client_id);
336  if (client_info_p != NULL)
337  {
338  client_info_p->res_time = time (NULL);
339  }
340 
341  driver_info = proxy_get_driver_info_by_ctx (ctx_p);
342 
343  event_p = proxy_event_new_with_rsp (driver_info, type, from, resp_func);
344  if (event_p == NULL)
345  {
346  error = -1;
347  PROXY_LOG (PROXY_LOG_MODE_ERROR, "Failed to make response event. (error:%d). context(%s).", error,
348  proxy_str_context (ctx_p));
349 
350  goto error_return;
351  }
352 
353  error = proxy_send_response_to_client (ctx_p, event_p);
354  if (error)
355  {
356  PROXY_LOG (PROXY_LOG_MODE_ERROR, "Failed to response to the client. " "(error=%d). context(%s). evnet(%s).",
357  error, proxy_str_context (ctx_p), proxy_str_event (event_p));
358 
359  goto error_return;
360  }
361 
362  event_p = NULL;
363 
364  return error;
365 
366 error_return:
367  if (event_p != NULL)
368  {
369  proxy_event_free (event_p);
370  event_p = NULL;
371  }
372  return -1;
373 }
374 
/*
 * Send a copy of the statement's stored reply buffer to the client.
 *   return: 0 on success; -1 when the reply cannot be duplicated, the event is
 *           not available, or the send fails
 *   ctx_p(in): proxy context
 *   stmt_p(in): statement whose reply_buffer is duplicated and sent
 *
 * NOTE(review): the declarator line (listing line 376) is missing from this
 *   listing; presumably this is
 *   proxy_send_prepared_stmt_to_client (T_PROXY_CONTEXT * ctx_p, T_SHARD_STMT * stmt_p),
 *   judging by the caller in this file - confirm against the original file.
 */
375 static int
377 {
378  int error;
379  int length;
380  char *prepare_resp = NULL;
381  T_PROXY_EVENT *event_p = NULL;
382 
383  prepare_resp = proxy_dup_msg ((char *) stmt_p->reply_buffer);
384  if (prepare_resp == NULL)
385  {
386  PROXY_LOG (PROXY_LOG_MODE_ERROR, "Not enough virtual memory. " "failed to duplicate prepare request. ");
387 
388  EXIT_FUNC ();
389  return -1;
390  }
391 
/* NOTE(review): listing line 392 is missing here; presumably it creates event_p (the log below names
 * PROXY_EVENT_IO_WRITE / PROXY_EVENT_FROM_CLIENT) - confirm against the original file. */
393  if (event_p == NULL)
394  {
395  PROXY_LOG (PROXY_LOG_MODE_ERROR, "Failed to make event. " "(%s, %s). context(%s).", "PROXY_EVENT_IO_WRITE",
396  "PROXY_EVENT_FROM_CLIENT", proxy_str_context (ctx_p));
397 
398  goto error_return;
399  }
400 
401  length = get_msg_length (prepare_resp);
402  proxy_event_set_buffer (event_p, prepare_resp, length);
/* the buffer has been attached to the event; clear the local pointer so error_return does not free it */
403  prepare_resp = NULL;
404 
405  error = proxy_send_response_to_client (ctx_p, event_p);
406  if (error)
407  {
408  PROXY_LOG (PROXY_LOG_MODE_ERROR, "Failed to send response " "to the client. (error:%d). context(%s). event(%s). ",
409  error, proxy_str_context (ctx_p), proxy_str_event (event_p));
410 
411  goto error_return;
412  }
413  event_p = NULL;
414 
415  EXIT_FUNC ();
416  return 0;
417 
418 error_return:
419  if (prepare_resp != NULL)
420  {
421  FREE_MEM (prepare_resp);
422  }
423 
424  if (event_p != NULL)
425  {
426  proxy_event_free (event_p);
427  event_p = NULL;
428  }
429  EXIT_FUNC ();
430  return -1;
431 }
432 
433 static void
435 {
436  SP_PARSER_HINT *first_hint_p;
437 
438  T_SHM_SHARD_KEY_STAT *key_stat_p;
439  T_SHM_SHARD_CONN_STAT *shard_stat_p;
440 
441  int shard_id, key_index, range_index;
442 
443  first_hint_p = sp_get_first_hint (stmt_p->parser);
444  if (first_hint_p == NULL)
445  {
446  PROXY_LOG (PROXY_LOG_MODE_ERROR, "Failed to update stats. No hint available. ");
447  return;
448  }
449 
450  switch (first_hint_p->hint_type)
451  {
452  case HT_KEY:
453  if (range_p == NULL)
454  {
455  PROXY_LOG (PROXY_LOG_MODE_ERROR, "Failed to update stats. Invalid hint key range. (hint_type:%s).", "HT_KEY");
456  assert (range_p);
457  return;
458  }
459  shard_id = range_p->shard_id;
460 
461  shard_stat_p = shard_shm_get_shard_stat (proxy_info_p, shard_id);
462  if (shard_stat_p == NULL)
463  {
465  "Failed to update stats. " "Invalid shm shard stats. (hint_type:%s, proxy_id:%d, shard_id:%d).",
466  "HT_KEY", proxy_info_p->proxy_id, shard_id);
467 
468  assert (shard_stat_p);
469  return;
470  }
471 
472  shard_stat_p->num_hint_key_queries_requested++;
473 
474  key_index = range_p->key_index;
475  range_index = range_p->range_index;
476 
477  key_stat_p = shard_shm_get_key_stat (proxy_info_p, key_index);
478  if (key_stat_p == NULL)
479  {
481  "Failed to update stats. "
482  "Invalid shm shard key stats. (hint_type:%s, proxy_id:%d, key_index:%d).", "HT_KEY",
483  proxy_info_p->proxy_id, key_index);
484 
485  assert (key_stat_p);
486  return;
487  }
488 
489  key_stat_p->stat[range_index].num_range_queries_requested++;
490  return;
491 
492  case HT_ID:
493  assert (first_hint_p->arg.type == VT_INTEGER);
494  if (first_hint_p->arg.type == VT_INTEGER)
495  {
496  INT64 integer = first_hint_p->arg.integer;
497  if (integer < 0 || integer >= MAX_SHARD_CONN)
498  {
499  shard_id = PROXY_INVALID_SHARD;
500  }
501  else
502  {
503  shard_id = (int) integer;
504  }
505 
506  if (proxy_info_p->num_shard_conn <= shard_id)
507  {
509  "Failed to update stats. " "Invalid shard id. (hint_type:%s, shard_id:%d).", "HT_ID",
510  shard_id);
511  return;
512  }
513 
514  shard_stat_p = shard_shm_get_shard_stat (proxy_info_p, shard_id);
515  if (shard_stat_p == NULL)
516  {
518  "Failed to update stats. "
519  "Invalid shm shard stats. (hint_type:%s, proxy_id:%d, shard_id:%d).", "HT_ID",
520  proxy_info_p->proxy_id, shard_id);
521 
522  assert (shard_stat_p);
523  return;
524  }
525 
526  shard_stat_p->num_hint_id_queries_requested++;
527  return;
528  }
529  else
530  {
531  PROXY_LOG (PROXY_LOG_MODE_ERROR, "Unsupported hint value type. (hint_value_type:%d).",
532  first_hint_p->arg.type);
533 
534  return;
535  }
536  break;
537 
538  default:
539  PROXY_LOG (PROXY_LOG_MODE_ERROR, "Unsupported hint type. (hint_type:%d).", first_hint_p->hint_type);
540 
541  assert (false);
542  break;
543  }
544 
545  return;
546 }
547 
548 static void
550 {
551  T_SHM_SHARD_CONN_STAT *shard_stat_p;
552 
553  assert (shard_id >= 0);
554  if (shard_id < 0)
555  {
556  return;
557  }
558 
559  shard_stat_p = shard_shm_get_shard_stat (proxy_info_p, shard_id);
560  if (shard_stat_p == NULL)
561  {
562  PROXY_LOG (PROXY_LOG_MODE_ERROR, "Failed to update stats. (shard_id:%d).", shard_id);
563 
564  assert (shard_stat_p);
565  return;
566  }
567 
568  shard_stat_p->num_no_hint_queries_requested++;
569 
570  return;
571 }
572 
573 static int
574 proxy_get_shard_id (T_SHARD_STMT * stmt_p, void **argv, T_SHARD_KEY_RANGE ** range_p_out)
575 {
576  int compare_flag = 0;
577  int shard_id = -1, next_shard_id = -1;
578  SP_PARSER_HINT *hint_p;
579  T_SHARD_KEY_RANGE *range_p = NULL;
580 
581  *range_p_out = NULL;
582 
583  hint_p = sp_get_first_hint (stmt_p->parser);
584  if (hint_p == NULL)
585  {
586  PROXY_LOG (PROXY_LOG_MODE_ERROR, "Unable to get shard id. No hint available.");
587  proxy_info_p->num_hint_err_queries_processed++;
588 
589  return PROXY_INVALID_SHARD;
590  }
591 
592  for (; hint_p; hint_p = sp_get_next_hint (hint_p))
593  {
594  switch (hint_p->hint_type)
595  {
596  case HT_KEY:
597  range_p = proxy_get_range_by_param (hint_p, argv);
598  if (range_p == NULL)
599  {
600  PROXY_LOG (PROXY_LOG_MODE_ERROR, "Unable to get shard id. Invalid hint key range. (hint_type:%s).",
601  "HT_KEY");
602  return PROXY_INVALID_SHARD;
603  }
604  assert (range_p->shard_id >= 0);
605  if (shard_id < 0)
606  {
607  shard_id = range_p->shard_id;
608  }
609  else
610  {
611  next_shard_id = range_p->shard_id;
612  compare_flag = 1;
613  }
614  break;
615 
616  case HT_ID:
617  if (hint_p->arg.type == VT_INTEGER)
618  {
619  INT64 integer = hint_p->arg.integer;
620  if (integer < 0 || integer >= MAX_SHARD_CONN)
621  {
622  PROXY_LOG (PROXY_LOG_MODE_ERROR, "Unable to get shard id. Invalid hint id. (hint_type:%s).", "HT_ID");
623  return PROXY_INVALID_SHARD;
624  }
625 
626  if (shard_id < 0)
627  {
628  shard_id = (int) integer;
629  }
630  else
631  {
632  next_shard_id = (int) integer;
633  compare_flag = 1;
634  }
635  }
636  else
637  {
639  "Unable to get shard id. shard id is not integer type. (hint_type:%s, type:%d).", "HT_ID",
640  hint_p->arg.type);
641 
642  return PROXY_INVALID_SHARD;
643  }
644  break;
645 
646  default:
647 
648  PROXY_LOG (PROXY_LOG_MODE_ERROR, "Unsupported hint type. (hint_type:%d).", hint_p->hint_type);
649  return PROXY_INVALID_SHARD;
650  }
651 
652  if (compare_flag > 0 && shard_id != next_shard_id)
653  {
654  PROXY_LOG (PROXY_LOG_MODE_ERROR, "Shard id is different. " "(first_shard_id:%d, next_shard_id:%d). ",
655  shard_id, next_shard_id);
656  return PROXY_INVALID_SHARD;
657  }
658 
659  }
660 
661  if (range_p != NULL)
662  {
663  *range_p_out = range_p;
664  }
665 
666  return shard_id;
667 }
668 
/*
 * Map a key hint to the shard key range it falls into.
 *   return: the matching range, or NULL when the configuration does not have
 *           exactly one shard key, the bind type or bind value type is
 *           unsupported, the bind value is empty, the shard key id is
 *           negative, or no range is found
 *   hint_p(in): key hint; bind_type selects a static hint value or a bind parameter
 *   argv(in): bind arguments; read only for BT_DYNAMIC, as (type, value) pairs
 *             at indexes 2 * bind_position and 2 * bind_position + 1
 *
 * NOTE(review): the declarator line (listing line 670) is missing from this
 *   listing; presumably this is
 *   proxy_get_range_by_param (SP_PARSER_HINT * hint_p, void **argv),
 *   judging by the caller in proxy_get_shard_id () - confirm against the original file.
 */
669 static T_SHARD_KEY_RANGE *
671 {
672  int hint_position, num_bind;
673  int type_idx, val_idx;
674 
675  char type;
676  int data_size;
677  void *net_type, *net_value;
678 
679  T_SHARD_KEY *key_p;
680  T_SHARD_KEY_RANGE *range_p = NULL;
681 
682  int shard_key_id = 0;
683  const char *key_column;
684 
685  /* Phase 0 : hint position get */
686  /* SHARD TODO : find statement entry, and param position & etc */
687  /* SHARD TODO : multiple key_value */
688 
689  assert (shm_key_p->num_shard_key == 1);
690  if (shm_key_p->num_shard_key != 1)
691  {
692  PROXY_LOG (PROXY_LOG_MODE_ERROR, "Too may shard key column in config. " "(num_shard_key:%d).",
693  shm_key_p->num_shard_key);
694  return NULL;
695  }
696 
697  key_p = (T_SHARD_KEY *) (&(shm_key_p->shard_key[0]));
698  key_column = key_p->key_column;
699 
700  switch (hint_p->bind_type)
701  {
702  case BT_STATIC:
/* static hint: the key id is derived from the hint's own value, no bind parameter is read */
703  shard_key_id = proxy_find_shard_id_by_hint_value (&hint_p->value, key_column);
704  if (shard_key_id < 0)
705  {
706  return NULL;
707  }
708  num_bind = 0;
709  break;
710  case BT_DYNAMIC:
711  assert (hint_p->bind_position >= 0);
712  hint_position = hint_p->bind_position;
713  num_bind = 1;
714  break;
715  default:
716  PROXY_LOG (PROXY_LOG_MODE_ERROR, "Unsupported hint bind type. (bind_type:%d).", hint_p->bind_type);
717  return NULL;
718  }
719 
720  /* Phase 1 : Param value get */
721  if (num_bind > 0)
722  {
/* bind arguments come in (type, value) pairs */
723  type_idx = 2 * hint_position;
724  val_idx = 2 * hint_position + 1;
725  net_type = argv[type_idx];
726  net_value = argv[val_idx];
727 
728  net_arg_get_char (type, net_type);
729 
730  switch (type)
731  {
732  case CCI_U_TYPE_INT:
733  case CCI_U_TYPE_UINT:
734  {
735  int i_val;
736  net_arg_get_size (&data_size, net_value);
737  if (data_size <= 0)
738  {
739  PROXY_LOG (PROXY_LOG_MODE_ERROR, "Unexpected integer hint value size." "(size:%d).", data_size);
740  return NULL;
741  }
742 
743  net_arg_get_int (&i_val, net_value);
744  shard_key_id = (*fn_get_shard_key) (key_column, (T_SHARD_U_TYPE) type, &i_val, sizeof (int));
745  }
746  break;
747  case CCI_U_TYPE_SHORT:
748  case CCI_U_TYPE_USHORT:
749  {
750  short s_val;
751  net_arg_get_size (&data_size, net_value);
752  if (data_size <= 0)
753  {
754  PROXY_LOG (PROXY_LOG_MODE_ERROR, "Unexpected short hint value size." "(size:%d).", data_size);
755  return NULL;
756  }
757 
758  net_arg_get_short (&s_val, net_value);
759  shard_key_id = (*fn_get_shard_key) (key_column, (T_SHARD_U_TYPE) type, &s_val, sizeof (short));
760 
761  }
762  break;
763  case CCI_U_TYPE_BIGINT:
764  case CCI_U_TYPE_UBIGINT:
765  {
766  INT64 l_val;
767  net_arg_get_size (&data_size, net_value);
768  if (data_size <= 0)
769  {
770  PROXY_LOG (PROXY_LOG_MODE_ERROR, "Unexpected big integer hint value size." "(size:%d).", data_size);
771  return NULL;
772  }
773 
774  net_arg_get_bigint (&l_val, net_value);
775  shard_key_id = (*fn_get_shard_key) (key_column, (T_SHARD_U_TYPE) type, &l_val, sizeof (INT64));
776 
777  }
778  break;
779  case CCI_U_TYPE_ENUM:
780  case CCI_U_TYPE_STRING:
781  {
782  char *s_val;
783  int s_len;
784  net_arg_get_str (&s_val, &s_len, net_value);
785  if (s_val == NULL || s_len == 0)
786  {
787  PROXY_LOG (PROXY_LOG_MODE_ERROR, "Invalid string hint values. (len:%d).", s_len);
788  return NULL;
789  }
/* enum and string values are both passed to the key function as SHARD_U_TYPE_STRING */
790  shard_key_id = (*fn_get_shard_key) (key_column, SHARD_U_TYPE_STRING, s_val, s_len);
791  }
792  break;
793 
794  /* SHARD TODO : support other hint data types */
795 
796  default:
797  PROXY_LOG (PROXY_LOG_MODE_ERROR, "Unsupported hint value type. (type:%d).", type);
798 
799  return NULL;
800 
801  }
802  }
803 
804  if (shard_key_id < 0)
805  {
806  PROXY_LOG (PROXY_LOG_MODE_ERROR, "Failed to get shard key id. " "(shard_key_id:%d).", shard_key_id);
807  return NULL;
808  }
809 
810  /* Phase 2 : Shard range get */
811  range_p = shard_metadata_find_shard_range (shm_key_p, (char *) key_column, shard_key_id);
812  if (range_p == NULL)
813  {
814  PROXY_LOG (PROXY_LOG_MODE_ERROR, "Unable to find shm shard range. (key:%s, key_id:%d).", key_column,
815  shard_key_id);
816  return NULL;
817  }
818 
819  PROXY_LOG (PROXY_LOG_MODE_SHARD_DETAIL, "Select shard. (shard_id:%d, key_column:[%s], shard_key_id:%d).",
820  range_p->shard_id, key_column, shard_key_id);
821 
822  return range_p;
823 }
824 
825 /*
826  * request event
827  */
/*
 * Handle a client request using function code CAS_FC_END_TRAN.
 *   return: 0 on success; -1 on failure, in which case the event is freed and
 *           ctx_p->free_context is set to true
 *   ctx_p(in/out): proxy context; is_client_in_tran is cleared, and func_code
 *                  is set when the request is relayed to CAS
 *   event_p(in): the request event
 *
 * Note: when the context is in a transaction the request is relayed to CAS;
 *       otherwise the event is freed here. The context's statement list is
 *       freed on success in both cases.
 * NOTE(review): the declarator line (listing line 829) is missing from this
 *   listing, so the function name and full parameter list cannot be confirmed
 *   here - check the original file.
 */
828 int
830 {
831  int error = 0;
832  const char func_code = CAS_FC_END_TRAN;
833 
834  ENTER_FUNC ();
835 
836  /* set client tran status as OUT_TRAN, when end_tran */
837  ctx_p->is_client_in_tran = false;
838 
839  if (ctx_p->is_in_tran)
840  {
841  error = proxy_send_request_to_cas (ctx_p, event_p, func_code);
842  if (error < 0)
843  {
844  PROXY_LOG (PROXY_LOG_MODE_ERROR, "Failed to send request to CAS. " "(error=%d). context(%s). evnet(%s).",
845  error, proxy_str_context (ctx_p), proxy_str_event (event_p));
846 
847  goto free_context;
848  }
849  else if (error > 0)
850  {
/* a positive result means "waiting for an idle CAS", which is not expected for a context already in a transaction */
851  assert (false);
852  goto free_context;
853  }
854 
855  ctx_p->func_code = func_code;
856  }
857  else
858  {
859  error =
/* NOTE(review): listing lines 860-861 are missing here (the right-hand side of the assignment);
 * presumably a direct reply to the client without involving CAS - confirm against the original file. */
862  if (error)
863  {
864  goto free_context;
865  }
866 
867  proxy_event_free (event_p);
868  event_p = NULL;
869  }
870 
871  /* free statement list */
872  proxy_context_free_stmt (ctx_p);
873 
874  EXIT_FUNC ();
875  return 0;
876 
877 free_context:
878  if (event_p)
879  {
880  proxy_event_free (event_p);
881  event_p = NULL;
882  }
883 
884  ctx_p->free_context = true;
885 
886  EXIT_FUNC ();
887  return -1;
888 }
889 
/*
 * Handle a client request using function code CAS_FC_PREPARE.
 *   return: 0 when the request was answered from the statement pool, queued,
 *           or relayed to CAS; non-zero (error) otherwise
 *   ctx_p(in/out): proxy context
 *   event_p(in): the request event; freed, stored in ctx_p->waiting_event, or
 *                written to CAS depending on the path taken
 *   argc(in): number of request arguments; at least 2 are required
 *   argv(in): argv[0] is the sql text, argv[1] the prepare flag, argv[2] (if
 *             present) the auto-commit mode, the rest deferred close handles
 *
 * Flow, as far as it is visible in this listing:
 *   1. rewrite the sql text and look it up in the statement pool;
 *   2. if a pooled statement is found, either answer from it or enqueue this
 *      context as a waiter on it, depending on the statement status;
 *   3. otherwise create a new statement, attach it to the context, save the
 *      prepare request, and relay it to a shard/cas.
 *
 * NOTE(review): the declarator line (listing line 891) and several other listing
 *   lines are missing (marked below), so the function name, the exact case
 *   labels and some statements cannot be confirmed here - check the original file.
 */
890 int
892 {
893  int error = 0;
894  int i;
895 
896  /* sql statement */
897  char *sql_stmt;
898  char *organized_sql_stmt = NULL;
899  int sql_size;
900 
901  /* argv */
902  char flag;
903  char auto_commit_mode;
904 
905  /* io/statement entries */
906  T_CAS_IO *cas_io_p;
907  T_SHARD_STMT *stmt_p;
908  T_WAIT_CONTEXT *waiter_p;
909 
910  int shard_id;
911  T_SHARD_KEY_RANGE *dummy_range_p = NULL;
912 
913  char *driver_info;
914  T_BROKER_VERSION client_version;
915 
916  char *request_p;
917 
918  bool has_shard_val_hint = false;
919  bool use_temp_statement = false;
920 
921  const char func_code = CAS_FC_PREPARE;
922 
923 
924  ENTER_FUNC ();
925 
926  /* set client tran status as IN_TRAN, when prepare */
927  ctx_p->is_client_in_tran = true;
928 
929  request_p = event_p->buffer.data;
930  assert (request_p); // __FOR_DEBUG
931 
932  if (ctx_p->waiting_event)
933  {
934  assert (false);
935 
/* NOTE(review): listing line 936 is missing here; its content cannot be determined from this listing. */
937  ctx_p->waiting_event = NULL;
938 
939  goto free_context;
940  }
941 
942  /* process argv */
943  if (argc < 2)
944  {
945  PROXY_LOG (PROXY_LOG_MODE_ERROR, "Invalid argument. (argc:%d). context(%s).", argc, proxy_str_context (ctx_p));
/* NOTE(review): listing line 946 is missing here; its content cannot be determined from this listing. */
947 
948  proxy_event_free (event_p);
949  event_p = NULL;
950 
951  EXIT_FUNC ();
952  return -1;
953  }
954 
955  net_arg_get_str (&sql_stmt, &sql_size, argv[0]);
956  net_arg_get_char (flag, argv[1]);
957  if (argc > 2)
958  {
959  net_arg_get_char (auto_commit_mode, argv[2]);
960  for (i = 3; i < argc; i++)
961  {
962  int deferred_close_handle;
963  net_arg_get_int (&deferred_close_handle, argv[i]);
964 
965  /* SHARD TODO : what to do for deferred close handle? */
966  }
967  }
968  else
969  {
970  auto_commit_mode = FALSE;
971  }
972 
973  PROXY_DEBUG_LOG ("Process requested prepare sql statement. " "(sql_stmt:[%s]). context(%s).",
974  shard_str_sqls (sql_stmt), proxy_str_context (ctx_p));
975 
976  organized_sql_stmt = shard_stmt_rewrite_sql (&has_shard_val_hint, sql_stmt, proxy_info_p->appl_server);
977 
978  if (organized_sql_stmt == NULL)
979  {
980  PROXY_LOG (PROXY_LOG_MODE_ERROR, "Failed to rewrite sql statement. (sql_stmt:[%s]).", sql_stmt);
/* NOTE(review): listing line 981 is missing here; its content cannot be determined from this listing. */
982 
983  proxy_event_free (event_p);
984  event_p = NULL;
985 
986  error = -1;
987  goto end;
988  }
989  PROXY_DEBUG_LOG ("Rewrite sql statement. " "(organized_sql_stmt:[%s]). context(%s).",
990  shard_str_sqls (organized_sql_stmt), proxy_str_context (ctx_p));
991 
992  driver_info = proxy_get_driver_info_by_ctx (ctx_p);
993  client_version = CAS_MAKE_PROTO_VER (driver_info);
994 
/* the request counters are skipped when the prepare is issued on behalf of an execute */
995  if (ctx_p->is_prepare_for_execute == false)
996  {
997  proxy_info_p->num_request_stmt++;
998  }
999 
1000  stmt_p = shard_stmt_find_by_sql (organized_sql_stmt, ctx_p->database_user, client_version);
1001  if (stmt_p)
1002  {
1003  if (ctx_p->is_prepare_for_execute == false)
1004  {
1005  proxy_info_p->num_request_stmt_in_pool++;
1006  }
1007 
1008  PROXY_DEBUG_LOG ("success to find statement. (stmt:%s).", shard_str_stmt (stmt_p));
1009 
1010  switch (stmt_p->status)
1011  {
/* NOTE(review): listing lines 1012-1016 are missing here; presumably the case label (and a comment)
 * for a statement whose prepare has already completed - confirm against the original file. */
1017  if (proxy_context_find_stmt (ctx_p, stmt_p->stmt_h_id) != NULL)
1018  {
/* this context already holds the pooled statement, so a separate (exclusive) statement is created below */
1019  use_temp_statement = true;
1020  break;
1021  }
1022 
1023  error = proxy_send_prepared_stmt_to_client (ctx_p, stmt_p);
1024  if (error)
1025  {
/* NOTE(review): listing line 1026 is missing here; presumably the opening of a PROXY_LOG (...) call - confirm. */
1027  "Failed to send prepared statment to client. " "(error:%d). context(%s).", error,
1028  proxy_str_context (ctx_p));
1029  goto free_context;
1030  }
1031 
1032  /* save statement to the context */
1033  if (proxy_context_add_stmt (ctx_p, stmt_p) == NULL)
1034  {
1035  PROXY_LOG (PROXY_LOG_MODE_ERROR, "Failed to add statement to context. " "statement(%s). context(%s).",
1036  shard_str_stmt (stmt_p), proxy_str_context (ctx_p));
1037  goto free_context;
1038  }
1039 
1040  /* we should free event right now */
1041  proxy_event_free (event_p);
1042  event_p = NULL;
1043 
1044 
1045  /* do not relay request to the shard/cas */
1046  error = 0;
1047  goto end;
1048 
/* NOTE(review): listing line 1049 is missing here; presumably the case label for a statement whose
 * prepare is still in progress (see the status check after relay_prepare_request) - confirm. */
1050 
1051  /* if this context was woken up by shard/cas resource and try dummy prepare again */
1052  if (stmt_p->ctx_cid == ctx_p->cid && stmt_p->ctx_uid == ctx_p->uid)
1053  {
1054  assert (ctx_p->prepared_stmt == stmt_p);
1055  goto relay_prepare_request;
1056  }
1057 
1058  if (stmt_p->stmt_type != SHARD_STMT_TYPE_PREPARED)
1059  {
1060  assert (false);
1061 
1062  PROXY_LOG (PROXY_LOG_MODE_ERROR, "Unexpected statement type. expect(%d), statement(%s). context(%s).",
/* NOTE(review): listing line 1063 is missing here; presumably the remaining arguments of the log call - confirm. */
1064  goto free_context;
1065  }
1066 
/* another context is preparing this statement: queue this context as a waiter and park the event */
1067  waiter_p = proxy_waiter_new (ctx_p->cid, ctx_p->uid, ctx_p->wait_timeout);
1068  if (waiter_p == NULL)
1069  {
1070  goto free_context;
1071  }
1072 
1073  error = shard_queue_ordered_enqueue (&stmt_p->waitq, (void *) waiter_p, proxy_waiter_comp_fn);
1074  if (error)
1075  {
1076  proxy_waiter_free (waiter_p);
1077  waiter_p = NULL;
1078 
1079  goto free_context;
1080  }
1081 
1082  proxy_info_p->stmt_waiter_count++;
1083  PROXY_DEBUG_LOG ("Add stmt waiter. (waiter_coutn:%d).", proxy_info_p->stmt_waiter_count);
1084 
1085  ctx_p->waiting_event = event_p;
1086  event_p = NULL; /* DO NOT DELETE */
1087 
1088  error = 0;
1089  goto end;
1090 
1091  default:
1092  assert (false);
1093  goto free_context;
1094  }
1095  }
1096 
1097  if (use_temp_statement)
1098  {
1099  stmt_p = shard_stmt_new_exclusive (organized_sql_stmt, ctx_p->cid, ctx_p->uid, client_version);
1100  }
1101  else
1102  {
1103  stmt_p = shard_stmt_new_prepared_stmt (organized_sql_stmt, ctx_p->cid, ctx_p->uid, client_version);
1104  }
1105  if (stmt_p == NULL)
1106  {
1107  PROXY_LOG (PROXY_LOG_MODE_ERROR, "Failed to create new statement. context(%s).", proxy_str_context (ctx_p));
/* NOTE(review): listing line 1108 is missing here; its content cannot be determined from this listing. */
1109 
1110  proxy_event_free (event_p);
1111  event_p = NULL;
1112 
1113  error = -1;
1114  goto end;
1115  }
1116 
1117  PROXY_DEBUG_LOG ("Create new sql statement. " "(index:%d). statement(%s). context(%s).", stmt_p->index,
1118  shard_str_stmt (stmt_p), proxy_str_context (ctx_p));
1119 
1120  if (proxy_info_p->ignore_shard_hint == OFF)
1121  {
1122  error = shard_stmt_set_hint_list (stmt_p);
1123  if (error < 0)
1124  {
1125  PROXY_LOG (PROXY_LOG_MODE_ERROR, "Failed to set hint list. statement(%s). context(%s).",
1126  shard_str_stmt (stmt_p), proxy_str_context (ctx_p));
1127 
/* NOTE(review): listing line 1128 is missing here; its content cannot be determined from this listing. */
1129 
1130  /* check and wakeup statement waiter */
/* NOTE(review): listing line 1131 is missing here; presumably the call that wakes up statement waiters - confirm. */
1132 
1133  /*
1134  * there must be no context sharing this statement at this time.
1135  * so, we can free statement.
1136  */
1137  shard_stmt_free (stmt_p);
1138  stmt_p = NULL;
1139 
1140  proxy_event_free (event_p);
1141  event_p = NULL;
1142 
1143  error = -1;
1144  goto end;
1145  }
1146  }
1147 
1148  ctx_p->prepared_stmt = stmt_p;
1149 
1150  /* save statement to the context */
1151  if (proxy_context_add_stmt (ctx_p, stmt_p) == NULL)
1152  {
1153  PROXY_LOG (PROXY_LOG_MODE_ERROR, "Failed to link statement to context. statement(%s). context(%s).",
1154  shard_str_stmt (stmt_p), proxy_str_context (ctx_p));
1155  goto free_context;
1156  }
1157 
1158  error =
1159  shard_stmt_save_prepare_request (stmt_p, has_shard_val_hint, &event_p->buffer.data, &event_p->buffer.length,
1160  argv[0], argv[1], organized_sql_stmt);
1161  if (error)
1162  {
1163  PROXY_LOG (PROXY_LOG_MODE_ERROR, "Failed to save prepared statement request. statement(%s). context(%s).",
1164  shard_str_stmt (stmt_p), proxy_str_context (ctx_p));
1165  goto free_context;
1166  }
/* the save call receives the address of the event buffer pointer, so the cached pointer is re-read here */
1167  request_p = event_p->buffer.data; /* DO NOT DELETE */
1168 
1169 relay_prepare_request:
1170 
1171  if (ctx_p->is_in_tran == false || ctx_p->waiting_dummy_prepare == true)
1172  {
1173  proxy_set_force_out_tran (request_p);
1174  }
1175  ctx_p->waiting_dummy_prepare = false;
1176 
1177  if (stmt_p->status != SHARD_STMT_STATUS_IN_PROGRESS)
1178  {
1179  PROXY_DEBUG_LOG ("Unexpected statement status. (status=%d). statement(%s). context(%s).", stmt_p->status,
1180  shard_str_stmt (stmt_p), proxy_str_context (ctx_p));
1181  assert (false);
1182  goto free_context;
1183  }
1184 
1185  /* if shard_key is static value(or shard_id), dummy prepare send to values's shard */
1186  if (proxy_info_p->ignore_shard_hint == OFF)
1187  {
1188  if (sp_is_hint_static (stmt_p->parser))
1189  {
/* argv is NULL here: only static hint values are resolved, no bind parameters are available at prepare time */
1190  shard_id = proxy_get_shard_id (stmt_p, NULL, &dummy_range_p);
1191  if (shard_id == PROXY_INVALID_SHARD)
1192  {
1193  PROXY_LOG (PROXY_LOG_MODE_ERROR, "Invalid shard id. (shard_id:%d). context(%s).", shard_id,
1194  proxy_str_context (ctx_p));
1195 
/* NOTE(review): listing line 1196 is missing here; its content cannot be determined from this listing. */
1197 
1198  /* wakeup and reset statment */
/* NOTE(review): listing line 1199 is missing here; presumably the call that wakes up statement waiters - confirm. */
1200  shard_stmt_free (ctx_p->prepared_stmt);
1201  ctx_p->prepared_stmt = NULL;
1202 
1203  proxy_event_free (event_p);
1204  event_p = NULL;
1205 
1206  error = -1;
1207  goto end;
1208  }
1209  ctx_p->shard_id = shard_id;
1210  }
1211  }
1212 
1213  cas_io_p =
1214  proxy_cas_alloc_by_ctx (ctx_p->client_id, ctx_p->shard_id, ctx_p->cas_id, ctx_p->cid, ctx_p->uid,
1215  ctx_p->wait_timeout, func_code);
1216  if (cas_io_p == NULL)
1217  {
1218  PROXY_LOG (PROXY_LOG_MODE_ERROR, "Failed to allocate CAS. context(%s).", proxy_str_context (ctx_p));
1219 
/* NOTE(review): listing line 1220 is missing here; its content cannot be determined from this listing. */
1221 
1222  /* wakeup and reset statment */
/* NOTE(review): listing line 1223 is missing here; presumably the call that wakes up statement waiters - confirm. */
1224  shard_stmt_free (ctx_p->prepared_stmt);
1225  ctx_p->prepared_stmt = NULL;
1226 
1227  proxy_event_free (event_p);
1228  event_p = NULL;
1229 
1230  if (ctx_p->shard_id != PROXY_INVALID_SHARD)
1231  {
1232  EXIT_FUNC ();
1233  goto free_context;
1234  }
1235  else
1236  {
1237  error = -1;
1238  goto end;
1239  }
1240  }
1241  else if (cas_io_p == (T_CAS_IO *) SHARD_TEMPORARY_UNAVAILABLE)
1242  {
1243  /* waiting idle shard/cas */
1244  ctx_p->waiting_event = event_p;
1245  event_p = NULL;
1246 
1247  ctx_p->waiting_dummy_prepare = true;
1248 
1249  error = 0;
1250  goto end;
1251  }
1252 
1253  /* we should bind context and shard/cas after complete to allocate */
1254  proxy_context_set_in_tran (ctx_p, cas_io_p->shard_id, cas_io_p->cas_id);
1255 
/* a duplicate of the request is kept on the context while the original is written to CAS */
1256  ctx_p->waiting_event = proxy_event_dup (event_p);
1257  if (ctx_p->waiting_event == NULL)
1258  {
1259  goto free_context;
1260  }
1261 
1262  error = proxy_cas_io_write (cas_io_p, event_p);
1263  if (error)
1264  {
1265  goto free_context;
1266  }
1267  event_p = NULL;
1268 
1269  ctx_p->func_code = func_code;
1270 
1271 end:
/* the rewritten sql is freed only when it is a different buffer from the original sql text */
1272  if (organized_sql_stmt && organized_sql_stmt != sql_stmt)
1273  {
1274  FREE_MEM (organized_sql_stmt);
1275  }
1276 
1277  EXIT_FUNC ();
1278  return error;
1279 
1280 free_context:
1281  if (organized_sql_stmt && organized_sql_stmt != sql_stmt)
1282  {
1283  FREE_MEM (organized_sql_stmt);
1284  }
1285 
/* avoid leaving a dangling waiting_event when it aliases the event freed just below */
1286  if (ctx_p->waiting_event == event_p)
1287  {
1288  ctx_p->waiting_event = NULL;
1289  }
1290 
1291  if (event_p)
1292  {
1293  proxy_event_free (event_p);
1294  event_p = NULL;
1295  }
1296 
1297  ctx_p->free_context = true;
1298 
1299  EXIT_FUNC ();
1300  return error;
1301 }
1302 
/*
 * Client request handler for a plain execute (function code CAS_FC_EXECUTE).
 *
 * Works out two protocol-dependent values and then delegates to
 * proxy_client_execute_internal():
 *   - bind_value_index: 9, or 10 when the client protocol is at least
 *     PROTOCOL_V1 (such clients carry a query timeout in argv[9]).
 *   - query_timeout: taken from argv[9] for PROTOCOL_V1+ clients, otherwise 0.
 *
 * return: the result of proxy_client_execute_internal().
 *
 * NOTE(review): the line carrying the function name and parameter list is
 * not present in this listing; ctx_p, event_p, argc and argv are used below.
 */
1303 int
1305 {
1306  int query_timeout;
1307  int bind_value_index = 9;
1308  T_BROKER_VERSION client_version;
1309  char *driver_info;
1310 
1311  char func_code = CAS_FC_EXECUTE;
1312 
1313  driver_info = proxy_get_driver_info_by_ctx (ctx_p);
1314  client_version = CAS_MAKE_PROTO_VER (driver_info);
1315  if (client_version >= CAS_PROTO_MAKE_VER (PROTOCOL_V1))
1316  {
      /* argv[9] holds the query timeout, so bind arguments start one slot later */
1317  bind_value_index++;
1318 
1319  net_arg_get_int (&query_timeout, argv[9]);
1320  if (!DOES_CLIENT_UNDERSTAND_THE_PROTOCOL (client_version, PROTOCOL_V2))
1321  {
1322  /* protocol version v1 driver send query timeout in second */
1323  query_timeout *= 1000;
1324  }
1325  }
1326  else
1327  {
      /* clients older than PROTOCOL_V1 send no timeout argument */
1328  query_timeout = 0;
1329  }
1330 
1331  return proxy_client_execute_internal (ctx_p, event_p, argc, argv, func_code, query_timeout, bind_value_index);
1332 }
1333 
/*
 * proxy_client_execute_internal - common body of the execute and
 *                                 execute-array request handlers
 *
 * return: 0 when the request was written to a CAS or parked in
 *         ctx_p->waiting_event until a CAS becomes available; -1 on error.
 *         On the free_context path ctx_p->free_context is also set to true.
 *
 *   ctx_p(in/out): proxy context of the requesting client
 *   event_p(in): request event; on every visible path it is freed, parked in
 *                ctx_p->waiting_event, or handed to proxy_cas_io_write()
 *   argc(in): number of request arguments; must be >= bind_value_index and
 *             have the same parity as bind_value_index
 *   argv(in/out): request arguments; argv[0] holds the proxy-side statement
 *                 handle and is overwritten with the CAS-side handle when the
 *                 statement is already prepared on the chosen CAS
 *   _func_code(in): function code of the request; CAS_FC_EXECUTE_ARRAY
 *                   enables the per-batch shard check
 *   query_timeout(in): forwarded to proxy_set_wait_timeout()
 *   bind_value_index(in): index in argv of the first bind argument
 *
 * Flow: validate arguments, find the statement, choose the shard from the
 * shard hint (unless ignore_shard_hint is set), allocate a CAS, then either
 * relay the execute request or first send the statement's stored prepare
 * request when this CAS has no server handle for the statement yet.
 */
1334 static int
1335 proxy_client_execute_internal (T_PROXY_CONTEXT * ctx_p, T_PROXY_EVENT * event_p, int argc, char **argv, char _func_code,
1336  int query_timeout, int bind_value_index)
1337 {
1338  int error = 0;
1339  int srv_h_id;
1340  int cas_srv_h_id;
1341  char *prepare_request = NULL;
1342  int i;
1343  int shard_id, next_shard_id;
1344  int length;
1345  SP_HINT_TYPE hint_type = HT_NONE;
1346  int bind_value_size;
1347  int argc_mod_2;
1348  T_CAS_IO *cas_io_p;
1349  T_SHARD_STMT *stmt_p = NULL;
1350  T_SHARD_KEY_RANGE *range_p = NULL;
1351 
1352  T_PROXY_EVENT *new_event_p = NULL;
1353 
1354  char *request_p;
1355 
1356  char func_code = _func_code;
1357 
1358  ENTER_FUNC ();
1359 
1360  request_p = event_p->buffer.data;
1361  assert (request_p); // __FOR_DEBUG
1362 
  /* a new execute must not arrive while another event is still parked */
1363  if (ctx_p->waiting_event)
1364  {
1365  assert (false);
1366 
1368  ctx_p->waiting_event = NULL;
1369 
1370  goto free_context;
1371  }
1372 
  /* bind arguments come in (type, value) pairs, so argc must share the parity of bind_value_index */
1373  argc_mod_2 = bind_value_index % 2;
1374 
1375  if ((argc < bind_value_index) || (argc % 2 != argc_mod_2))
1376  {
1377  PROXY_LOG (PROXY_LOG_MODE_ERROR, "Invalid argument. (argc:%d). context(%s).", argc, proxy_str_context (ctx_p));
1378 
1380 
1381  proxy_event_free (event_p);
1382  event_p = NULL;
1383 
1384  EXIT_FUNC ();
1385  return -1;
1386  }
1387 
1388  net_arg_get_int (&srv_h_id, argv[0]);
1389 
1390  /* bind variables, even:bind type, odd:bind value */
1391 
1392  stmt_p = shard_stmt_find_by_stmt_h_id (srv_h_id);
1393  if (stmt_p == NULL || stmt_p->status == SHARD_STMT_STATUS_INVALID)
1394  {
1395  PROXY_LOG (PROXY_LOG_MODE_ERROR, "Unable to find statement handle identifier. " "(srv_h_id:%d). context(%s).",
1396  srv_h_id, proxy_str_context (ctx_p));
1397 
1399 
1400  proxy_event_free (event_p);
1401  event_p = NULL;
1402 
1403  EXIT_FUNC ();
1404  return -1;
1405  }
1406 
  /* shard selection is done only when shard hints are honored */
1407  if (proxy_info_p->ignore_shard_hint == OFF)
1408  {
1409  hint_type = (SP_HINT_TYPE) shard_stmt_get_hint_type (stmt_p);
1410  if (hint_type <= HT_INVAL || hint_type > HT_EOF)
1411  {
1412  PROXY_LOG (PROXY_LOG_MODE_ERROR, "Unsupported hint type. (hint_type:%d). context(%s).", hint_type,
1413  proxy_str_context (ctx_p));
1414 
1416 
1417  proxy_event_free (event_p);
1418  event_p = NULL;
1419 
1420  EXIT_FUNC ();
1421  return -1;
1422  }
1423 
1424  shard_id = proxy_get_shard_id (stmt_p, (void **) (argv + bind_value_index), &range_p);
1425 
1426  PROXY_LOG (PROXY_LOG_MODE_SHARD_DETAIL, "Select shard. " "(prev_shard_id:%d, curr_shard_id:%d). context(%s).",
1427  ctx_p->shard_id, shard_id, proxy_str_context (ctx_p));
1428 
1429  /* check shard_id */
1430  if (shard_id == PROXY_INVALID_SHARD)
1431  {
1432  PROXY_LOG (PROXY_LOG_MODE_ERROR, "Invalid shard id. (shard_id:%d). context(%s).", shard_id,
1433  proxy_str_context (ctx_p));
1434 
1436 
1437  proxy_event_free (event_p);
1438  event_p = NULL;
1439 
1440  EXIT_FUNC ();
1441  return -1;
1442  }
1443 
1444  if (func_code == CAS_FC_EXECUTE_ARRAY)
1445  {
1446  /* bind value size = bind count * 2(type:value) */
1447  bind_value_size = stmt_p->parser->bind_count * 2;
1448 
1449  /* compare with next batch shard_id when execute_array */
      /* every batch of an execute-array must resolve to the shard of the first batch */
1450  for (i = bind_value_index + bind_value_size; i < argc; i += bind_value_size)
1451  {
1452  next_shard_id = proxy_get_shard_id (stmt_p, (void **) (argv + i), &range_p);
1453 
1454  if (shard_id != next_shard_id)
1455  {
1457  "Shard id is different. " "(first_shard_id:%d, next_shard_id:%d). context(%s).", shard_id,
1458  next_shard_id, proxy_str_context (ctx_p));
1459 
1461 
1462  proxy_event_free (event_p);
1463  event_p = NULL;
1464 
1465  EXIT_FUNC ();
1466  return -1;
1467  }
1468  }
1469  }
1470 
  /* a context already bound to a shard may not move to another one */
1471  if (ctx_p->shard_id != PROXY_INVALID_SHARD && ctx_p->shard_id != shard_id)
1472  {
1474  "Shard id couldn't be changed in a transaction. "
1475  "(prev_shard_id:%d, curr_shard_id:%d). context(%s).", ctx_p->shard_id, shard_id,
1476  proxy_str_context (ctx_p));
1477 
1479 
1480  proxy_event_free (event_p);
1481  event_p = NULL;
1482 
1483  EXIT_FUNC ();
1484  goto free_context;
1485  }
1486 
1487  ctx_p->shard_id = shard_id;
1488  }
1489 
1490  proxy_set_wait_timeout (ctx_p, query_timeout);
1491 
1492  cas_io_p =
1493  proxy_cas_alloc_by_ctx (ctx_p->client_id, ctx_p->shard_id, ctx_p->cas_id, ctx_p->cid, ctx_p->uid,
1494  ctx_p->wait_timeout, func_code);
1495  if (cas_io_p == NULL)
1496  {
1497  PROXY_LOG (PROXY_LOG_MODE_ERROR, "Failed to allocate CAS. context(%s).", proxy_str_context (ctx_p));
1498 
1500 
1501  proxy_event_free (event_p);
1502  event_p = NULL;
1503 
1504  EXIT_FUNC ();
1505  return -1;
1506  }
1507  else if (cas_io_p == (T_CAS_IO *) SHARD_TEMPORARY_UNAVAILABLE)
1508  {
1509  /* waiting idle shard/cas */
      /* the context keeps the event; returning 0 reports "no error" to the caller */
1510  ctx_p->waiting_event = event_p;
1511  event_p = NULL;
1512 
1513  EXIT_FUNC ();
1514  return 0;
1515  }
1516 
  /* bind the context to the shard/CAS that was actually allocated */
1517  proxy_context_set_in_tran (ctx_p, cas_io_p->shard_id, cas_io_p->cas_id);
1518 
1519  /* save statement to the context */
1520  if (proxy_context_add_stmt (ctx_p, stmt_p) == NULL)
1521  {
1522  PROXY_LOG (PROXY_LOG_MODE_ERROR, "Failed to link statement to context. statement(%s). context(%s).",
1523  shard_str_stmt (stmt_p), proxy_str_context (ctx_p));
1524  goto free_context;
1525  }
1526 
1527  /*
1528  * find stored server handle id for this shard/cas, if exist, do execute
1529  * with it. otherwise, do dummy prepare for execute to get server handle id.
1530  */
1531  cas_srv_h_id = shard_stmt_find_srv_h_id_for_shard_cas (srv_h_id, cas_io_p->shard_id, cas_io_p->cas_id);
1532  if (cas_srv_h_id < 0) /* not prepared */
1533  {
1534  PROXY_LOG (PROXY_LOG_MODE_SHARD_DETAIL, "Do prepare before execute. cas(%s). context(%s).",
1535  proxy_str_cas_io (cas_io_p), proxy_str_context (ctx_p));
1536 
1537  /* make prepare request event */
1538  prepare_request = proxy_dup_msg ((char *) stmt_p->request_buffer);
1539  if (prepare_request == NULL)
1540  {
1541  goto free_context;
1542  }
1543  length = get_msg_length (prepare_request);
1544 
1545  /* unset force_out_tran bitmask */
1546  proxy_unset_force_out_tran (request_p);
1547 
      /* NOTE(review): the statement that creates new_event_p is not visible in this listing */
1549  if (new_event_p == NULL)
1550  {
1551  goto free_context;
1552  }
      /* the message buffer is handed to the event; clear the local pointer so it is not freed below */
1553  proxy_event_set_buffer (new_event_p, prepare_request, length);
1554  prepare_request = NULL;
1555 
      /* park the execute event on the context and send the prepare request in its place */
1556  ctx_p->is_prepare_for_execute = true;
1557  ctx_p->prepared_stmt = stmt_p;
1558  ctx_p->waiting_event = event_p;
1559  event_p = NULL; /* DO NOT DELETE */
1560 
1561  /* __FOR_DEBUG */
1563 
1564  func_code = CAS_FC_PREPARE;
1565 
1566  goto relay_request;
1567  }
1568  else
1569  {
1570  /* If we will fail to execute query, then we have to retry. */
1571  ctx_p->waiting_event = proxy_event_dup (event_p);
1572  if (ctx_p->waiting_event == NULL)
1573  {
1574  goto free_context;
1575  }
1576 
      /* replace the proxy-side handle in the request with the handle known to this CAS */
1577  net_arg_put_int (argv[0], &cas_srv_h_id);
1578  }
1579 
1580  ctx_p->stmt_hint_type = hint_type;
1581  ctx_p->stmt_h_id = srv_h_id;
1582 
1583  if (proxy_info_p->ignore_shard_hint == OFF)
1584  {
1585  /* update shard statistics */
1586  if (stmt_p)
1587  {
1588  proxy_update_shard_stats (stmt_p, range_p);
1589  }
1590  else
1591  {
1592  assert (false);
1593  }
1594  }
1595  else
1596  {
1598  }
1599 
1600  new_event_p = event_p; /* already prepared on this CAS: relay the execute event itself */
1601 
1602 relay_request:
1603 
  /* new_event_p is either the original execute event or the dummy prepare event */
1604  error = proxy_cas_io_write (cas_io_p, new_event_p);
1605  if (error)
1606  {
1607  goto free_context;
1608  }
1609 
1610  ctx_p->func_code = func_code;
1611 
1612  if (event_p && event_p != new_event_p)
1613  {
1614  proxy_event_free (event_p);
1615  event_p = NULL;
1616  }
1617  new_event_p = NULL;
1618 
1619  EXIT_FUNC ();
1620  return 0;
1621 
1622 free_context:
  /* undo the dummy-prepare bookkeeping; the pointers on the context are cleared, not freed, here */
1623  if (ctx_p->is_prepare_for_execute)
1624  {
1625  ctx_p->is_prepare_for_execute = false;
1626  }
1627 
1628  if (ctx_p->prepared_stmt)
1629  {
1630  ctx_p->prepared_stmt = NULL;
1631  }
1632 
1633  if (ctx_p->waiting_event)
1634  {
1635  ctx_p->waiting_event = NULL;
1636  }
1637 
  /* event_p and new_event_p may alias the same event; free it only once */
1638  if (event_p == new_event_p)
1639  {
1640  if (event_p)
1641  {
1642  proxy_event_free (event_p);
1643  }
1644  event_p = new_event_p = NULL;
1645  }
1646  else
1647  {
1648  if (event_p)
1649  {
1650  proxy_event_free (event_p);
1651  event_p = NULL;
1652  }
1653 
1654  if (new_event_p)
1655  {
1656  proxy_event_free (new_event_p);
1657  new_event_p = NULL;
1658  }
1659  }
1660 
1661  if (prepare_request)
1662  {
1663  FREE_MEM (prepare_request);
1664  }
1665 
1666  ctx_p->free_context = true;
1667 
1668  EXIT_FUNC ();
1669  return -1;
1670 }
1671 
/*
 * Client request handler that stores a session parameter sent by the client.
 *
 * argv[0] is the parameter id and argv[1] its value. The isolation level
 * (CCI_PARAM_ISOLATION_LEVEL) and the lock timeout (CCI_PARAM_LOCK_TIMEOUT)
 * are saved in the client's T_CLIENT_INFO entry; other ids are not stored.
 * A response event is then sent back to the client.
 *
 * return: 0 on success; -1 on invalid arguments, missing client info or an
 *         invalid isolation level (free_and_return, context kept); -1 with
 *         ctx_p->free_context set when the response cannot be built or sent.
 *         event_p is freed on every path.
 *
 * NOTE(review): the line carrying the function name and parameter list is
 * not present in this listing.
 */
1672 int
1674 {
1675  int error = 0;
1676  int param_name;
1677  T_CLIENT_INFO *client_info_p = NULL;
1678  char *driver_info;
1679  T_PROXY_EVENT *new_event_p = NULL;
1680 
1681  if (argc < 2)
1682  {
1683  PROXY_LOG (PROXY_LOG_MODE_ERROR, "Invalid argument. (argc:%d). context(%s).", argc, proxy_str_context (ctx_p));
1685  error = -1;
1686  goto free_and_return;
1687  }
1688 
1689  client_info_p = shard_shm_get_client_info (proxy_info_p, ctx_p->client_id);
1690  if (client_info_p == NULL)
1691  {
1693  "Unable to find cilent info in shared memory. " "(context id:%d, context uid:%d)", ctx_p->cid,
1694  ctx_p->uid);
1696  error = -1;
1697  goto free_and_return;
1698  }
1699 
1700  net_arg_get_int (&param_name, argv[0]);
1701  if (param_name == CCI_PARAM_ISOLATION_LEVEL)
1702  {
1703  int isolation_level = 0;
1704 
1705  net_arg_get_int (&isolation_level, argv[1]);
1706  if (!IS_VALID_ISOLATION_LEVEL (isolation_level))
1707  {
1709  "Invalid isolation level. (isolation:%d). " "(context id:%d, context uid:%d)", isolation_level,
1710  ctx_p->cid, ctx_p->uid);
1712  error = -1;
1713  goto free_and_return;
1714  }
1715 
1716  client_info_p->isolation_level = isolation_level;
1717  }
1718  else if (param_name == CCI_PARAM_LOCK_TIMEOUT)
1719  {
1720  int lock_timeout = -1;
1721 
1722  net_arg_get_int (&lock_timeout, argv[1]);
      /* values below CAS_USE_DEFAULT_DB_PARAM are normalized to -1 */
1723  if (lock_timeout < CAS_USE_DEFAULT_DB_PARAM)
1724  {
1725  lock_timeout = -1;
1726  }
1727 
1728  client_info_p->lock_timeout = lock_timeout;
1729  }
1730 
1731  driver_info = proxy_get_driver_info_by_ctx (ctx_p);
  /* NOTE(review): the call that builds the response event is not visible in this listing */
1732  new_event_p =
1735  if (new_event_p == NULL)
1736  {
1737  goto free_context;
1738  }
1739 
1740  error = proxy_send_response_to_client (ctx_p, new_event_p);
1741  if (error)
1742  {
1743  PROXY_LOG (PROXY_LOG_MODE_ERROR, "Failed to response to the client. " "(error=%d). context(%s). evnet(%s).",
1744  error, proxy_str_context (ctx_p), proxy_str_event (new_event_p));
1745 
1746  proxy_event_free (new_event_p);
1747  goto free_context;
1748  }
1749 
  /* success falls through here with error == 0 */
1750 free_and_return:
1751  if (event_p != NULL)
1752  {
1753  proxy_event_free (event_p);
1754  }
1755 
1756  return error;
1757 
1758 free_context:
1759  ctx_p->free_context = true;
1760 
1761  if (event_p != NULL)
1762  {
1763  proxy_event_free (event_p);
1764  }
1765 
1766  return -1;
1767 }
1768 
/*
 * Client request handler that answers a query for a session parameter.
 *
 * argv[0] is the parameter id. A response event is built for
 * CCI_PARAM_ISOLATION_LEVEL or CCI_PARAM_LOCK_TIMEOUT and sent to the client.
 * For any other id new_event_p stays NULL and the free_context path is taken.
 *
 * return: 0 on success; -1 on invalid arguments or missing client info
 *         (free_and_return, context kept); -1 with ctx_p->free_context set
 *         when no response could be built or sent. event_p is freed on
 *         every path.
 *
 * NOTE(review): the line carrying the function name and parameter list is
 * not present in this listing.
 */
1769 int
1771 {
1772  int error = 0;
1773  int param_name;
1774  T_CLIENT_INFO *client_info_p = NULL;
1775  char *driver_info;
1776  T_PROXY_EVENT *new_event_p = NULL;
1777 
1778  if (argc < 1)
1779  {
1780  PROXY_LOG (PROXY_LOG_MODE_ERROR, "Invalid argument. (argc:%d). context(%s).", argc, proxy_str_context (ctx_p));
1782  error = -1;
1783  goto free_and_return;
1784  }
1785 
1786  client_info_p = shard_shm_get_client_info (proxy_info_p, ctx_p->client_id);
1787  if (client_info_p == NULL)
1788  {
1790  "Unable to find cilent info in shared memory. " "(context id:%d, context uid:%d)", ctx_p->cid,
1791  ctx_p->uid);
1793  error = -1;
1794  goto free_and_return;
1795  }
1796 
1797  net_arg_get_int (&param_name, argv[0]);
1798 
1799  driver_info = proxy_get_driver_info_by_ctx (ctx_p);
1800 
  /* NOTE(review): the calls that build the two response events are not visible in this listing */
1801  if (param_name == CCI_PARAM_ISOLATION_LEVEL)
1802  {
1803  new_event_p =
1806  }
1807  else if (param_name == CCI_PARAM_LOCK_TIMEOUT)
1808  {
1809  new_event_p =
1812  }
1813 
  /* also reached with NULL when param_name matched neither branch above */
1814  if (new_event_p == NULL)
1815  {
1816  goto free_context;
1817  }
1818 
1819  error = proxy_send_response_to_client (ctx_p, new_event_p);
1820  if (error)
1821  {
1822  PROXY_LOG (PROXY_LOG_MODE_ERROR, "Failed to response to the client. " "(error=%d). context(%s). evnet(%s).",
1823  error, proxy_str_context (ctx_p), proxy_str_event (new_event_p));
1824 
1825  proxy_event_free (new_event_p);
1826  goto free_context;
1827  }
1828 
  /* success falls through here with error == 0 */
1829 free_and_return:
1830  if (event_p != NULL)
1831  {
1832  proxy_event_free (event_p);
1833  }
1834 
1835  return error;
1836 
1837 free_context:
1838  ctx_p->free_context = true;
1839 
1840  if (event_p != NULL)
1841  {
1842  proxy_event_free (event_p);
1843  }
1844 
1845  return -1;
1846 }
1847 
/*
 * Client request handler for closing a request handle
 * (function code CAS_FC_CLOSE_REQ_HANDLE).
 *
 * When the context is in a transaction, the proxy-side statement handle in
 * argv[0] is translated to the handle known to the context's CAS, the
 * proxy-side mapping for that shard/CAS is removed, and the request is
 * relayed to the CAS. When the context is not in a transaction, or no
 * CAS-side handle is found, control reaches out_tran, where the request is
 * completed without contacting a CAS.
 *
 * return: 0 on success; -1 with ctx_p->free_context set on failure.
 *
 * NOTE(review): the line carrying the function name and parameter list is
 * not present in this listing.
 */
1848 int
1850 {
1851  int error = 0;
1852  T_CAS_IO *cas_io_p;
1853  int srv_h_id;
1854  int cas_srv_h_id;
1855 
1856  const char func_code = CAS_FC_CLOSE_REQ_HANDLE;
1857 
1858  ENTER_FUNC ();
1859 
1860  if (ctx_p->is_in_tran)
1861  {
1862  cas_io_p =
1863  proxy_cas_alloc_by_ctx (ctx_p->client_id, ctx_p->shard_id, ctx_p->cas_id, ctx_p->cid, ctx_p->uid,
1864  ctx_p->wait_timeout, func_code);
1865  if (cas_io_p == NULL)
1866  {
1867  goto free_context;
1868  }
1869  else if (cas_io_p == (T_CAS_IO *) SHARD_TEMPORARY_UNAVAILABLE)
1870  {
1871  assert (false); /* it cannot happen */
1872 
1873  goto free_context;
1874  }
1875  assert (ctx_p->shard_id == cas_io_p->shard_id);
1876  assert (ctx_p->cas_id == cas_io_p->cas_id);
1877  proxy_context_set_in_tran (ctx_p, cas_io_p->shard_id, cas_io_p->cas_id);
1878 
1879  net_arg_get_int (&srv_h_id, argv[0]);
1880 
1881  cas_srv_h_id = shard_stmt_find_srv_h_id_for_shard_cas (srv_h_id, cas_io_p->shard_id, cas_io_p->cas_id);
1882  if (cas_srv_h_id < 0)
1883  {
1885  "Failed to close requested handle. " "cannot find cas server handle. context(%s).",
1886  proxy_str_context (ctx_p));
1887  assert (false);
1888  goto out_tran;
1889  }
1890 
1891  /* remove statement handle id for this cas */
1892  shard_stmt_del_srv_h_id_for_shard_cas (srv_h_id, ctx_p->shard_id, ctx_p->cas_id);
1893 
1894  /* relay request to the CAS */
1895  net_arg_put_int (argv[0], &cas_srv_h_id);
1896 
1897  error = proxy_cas_io_write (cas_io_p, event_p);
1898  if (error)
1899  {
1900  goto free_context;
1901  }
1902 
      /* the event was handed to proxy_cas_io_write(); do not free it here */
1903  event_p = NULL;
1904 
1905  ctx_p->func_code = func_code;
1906 
1907  EXIT_FUNC ();
1908  return 0;
1909  }
1910 
1911 out_tran:
1912  /* send close_req_handle response to the client, if this context is not IN_TRAN, or context is IN_TRAN status but
1913  * cas_srv_h_id is not found */
1914  /* FIXME : if context is IN_TRAN and cas_srv_hd_id is not found ? */
  /* NOTE(review): the call assigned to error is not visible in this listing */
1915  error =
1918  if (error)
1919  {
1920  goto free_context;
1921  }
1922 
1923  proxy_event_free (event_p);
1924  event_p = NULL;
1925 
1926  EXIT_FUNC ();
1927  return 0;
1928 
1929 free_context:
1930  if (event_p)
1931  {
1932  proxy_event_free (event_p);
1933  event_p = NULL;
1934  }
1935 
1936  ctx_p->free_context = true;
1937 
1938  EXIT_FUNC ();
1939  return -1;
1940 }
1941 
/*
 * Client request handler for a cursor request (function code CAS_FC_CURSOR).
 *
 * The request is valid only while the context is in a transaction. The
 * proxy-side statement handle in argv[0] is replaced with the handle known
 * to the context's CAS and the request is relayed to that CAS.
 *
 * return: 0 when the request was written to the CAS; -1 when the context is
 *         not in a transaction or no CAS-side handle is found (event freed,
 *         context kept); -1 with ctx_p->free_context set when no CAS could
 *         be obtained or the write failed.
 *
 * NOTE(review): the line carrying the function name and parameter list is
 * not present in this listing.
 */
1942 int
1944 {
1945  int error = 0;
1946  int srv_h_id, cas_srv_h_id;
1947 
1948  T_CAS_IO *cas_io_p;
1949 
1950  const char func_code = CAS_FC_CURSOR;
1951 
1952  ENTER_FUNC ();
1953 
1954  if (ctx_p->is_in_tran == false)
1955  {
1957 
1958  proxy_event_free (event_p);
1959  event_p = NULL;
1960 
1961  EXIT_FUNC ();
1962  return -1;
1963  }
1964 
1965  /* find idle cas */
1966  cas_io_p =
1967  proxy_cas_alloc_by_ctx (ctx_p->client_id, ctx_p->shard_id, ctx_p->cas_id, ctx_p->cid, ctx_p->uid,
1968  ctx_p->wait_timeout, func_code);
1969  if (cas_io_p == NULL)
1970  {
1971  PROXY_LOG (PROXY_LOG_MODE_ERROR, "Unable to find avaiable idle CAS. " "context(%s). evnet(%s).",
1972  proxy_str_context (ctx_p), proxy_str_event (event_p));
1973 
1974  goto free_context;
1975  }
1976  else if (cas_io_p == (T_CAS_IO *) SHARD_TEMPORARY_UNAVAILABLE)
1977  {
1978  assert (false); /* it cannot happen */
1979 
1980  goto free_context;
1981  }
1982  assert (ctx_p->shard_id == cas_io_p->shard_id);
1983  assert (ctx_p->cas_id == cas_io_p->cas_id);
1984 
1985  /*
1986  * find stored server handle id for this shard/cas, if exist, do fetch
1987  * with it. otherwise, returns proxy internal error to the client.
1988  */
1989  net_arg_get_int (&srv_h_id, argv[0]);
1990 
1991  cas_srv_h_id = shard_stmt_find_srv_h_id_for_shard_cas (srv_h_id, ctx_p->shard_id, ctx_p->cas_id);
  /* NOTE(review): 0 is rejected here, while the execute and close handlers in this file test "< 0" - confirm which is intended */
1992  if (cas_srv_h_id <= 0)
1993  {
1995 
1996  proxy_event_free (event_p);
1997  event_p = NULL;
1998 
1999  EXIT_FUNC ();
2000  return -1;
2001  }
2002 
  /* replace the proxy-side handle in the request with the handle known to this CAS */
2003  net_arg_put_int (argv[0], &(cas_srv_h_id));
2004 
2005  error = proxy_cas_io_write (cas_io_p, event_p);
2006  if (error)
2007  {
2008  goto free_context;
2009  }
2010 
2011  /* in this case, we must not free event */
2012  event_p = NULL;
2013 
2014  ctx_p->func_code = func_code;
2015 
2016  EXIT_FUNC ();
2017  return 0;
2018 
2019 free_context:
2020  if (event_p)
2021  {
2022  proxy_event_free (event_p);
2023  event_p = NULL;
2024  }
2025 
2026  ctx_p->free_context = true;
2027 
2028  EXIT_FUNC ();
2029  return -1;
2030 }
2031 
/*
 * Client request handler for a fetch request (function code CAS_FC_FETCH).
 *
 * The request is valid only while the context is in a transaction. The
 * proxy-side statement handle in argv[0] is replaced with the handle known
 * to the context's CAS and the request is relayed to that CAS.
 *
 * return: 0 when the request was written to the CAS; -1 when the context is
 *         not in a transaction or no CAS-side handle is found (event freed,
 *         context kept); -1 with ctx_p->free_context set when no CAS could
 *         be obtained or the write failed.
 *
 * NOTE(review): the line carrying the function name and parameter list is
 * not present in this listing.
 */
2032 int
2034 {
2035  int error = 0;
2036  int srv_h_id, cas_srv_h_id;
2037 
2038  T_CAS_IO *cas_io_p;
2039 
2040  const char func_code = CAS_FC_FETCH;
2041 
2042  ENTER_FUNC ();
2043 
2044  if (ctx_p->is_in_tran == false)
2045  {
2047 
2048  proxy_event_free (event_p);
2049  event_p = NULL;
2050 
2051  EXIT_FUNC ();
2052  return -1;
2053  }
2054 
2055  /* find idle cas */
2056  cas_io_p =
2057  proxy_cas_alloc_by_ctx (ctx_p->client_id, ctx_p->shard_id, ctx_p->cas_id, ctx_p->cid, ctx_p->uid,
2058  ctx_p->wait_timeout, func_code);
2059  if (cas_io_p == NULL)
2060  {
2061  goto free_context;
2062  }
2063  else if (cas_io_p == (T_CAS_IO *) SHARD_TEMPORARY_UNAVAILABLE)
2064  {
2065  assert (false); /* it cannot happen */
2066 
2067  goto free_context;
2068  }
2069  assert (ctx_p->shard_id == cas_io_p->shard_id);
2070  assert (ctx_p->cas_id == cas_io_p->cas_id);
2071  proxy_context_set_in_tran (ctx_p, cas_io_p->shard_id, cas_io_p->cas_id);
2072 
2073  /*
2074  * find stored server handle id for this shard/cas, if exist, do fetch
2075  * with it. otherwise, returns proxy internal error to the client.
2076  */
2077  net_arg_get_int (&srv_h_id, argv[0]);
2078 
2079  cas_srv_h_id = shard_stmt_find_srv_h_id_for_shard_cas (srv_h_id, ctx_p->shard_id, ctx_p->cas_id);
  /* NOTE(review): 0 is rejected here, while the execute and close handlers in this file test "< 0" - confirm which is intended */
2080  if (cas_srv_h_id <= 0)
2081  {
2083 
2084  proxy_event_free (event_p);
2085  event_p = NULL;
2086 
2087  EXIT_FUNC ();
2088  return -1;
2089  }
2090 
  /* replace the proxy-side handle in the request with the handle known to this CAS */
2091  net_arg_put_int (argv[0], &(cas_srv_h_id));
2092 
2093  error = proxy_cas_io_write (cas_io_p, event_p);
2094  if (error)
2095  {
2096  goto free_context;
2097  }
2098 
2099  /* in this case, we must not free event */
2100  event_p = NULL;
2101 
2102  ctx_p->func_code = func_code;
2103 
2104  EXIT_FUNC ();
2105  return 0;
2106 
2107 free_context:
2108 
2109  if (event_p)
2110  {
2111  proxy_event_free (event_p);
2112  event_p = NULL;
2113  }
2114 
2115  ctx_p->free_context = true;
2116 
2117  EXIT_FUNC ();
2118  return -1;
2119 }
2120 
/*
 * Client request handler for a schema-info request
 * (function code CAS_FC_SCHEMA_INFO).
 *
 * Creates a schema-info statement for the context (unless one is already
 * set as ctx_p->prepared_stmt, which happens when the request is re-invoked
 * by the shard waiter), chooses the target shard, and sends the request to
 * a CAS. Clients that understand PROTOCOL_V5 pass the shard id in argv[4];
 * older clients are routed to shard 0.
 *
 * return: 0 when the request was sent or parked in ctx_p->waiting_event;
 *         -1 when the statement cannot be created or the shard id is out of
 *         range or differs from the shard the context is bound to (event
 *         freed, context kept); -1 with ctx_p->free_context set otherwise.
 *
 * NOTE(review): the line carrying the function name and parameter list is
 * not present in this listing.
 */
2121 int
2123 {
2124  int error = 0;
2125  int shard_id;
2126  T_SHARD_STMT *stmt_p;
2127  char *driver_info;
2128  T_BROKER_VERSION client_version;
2129 
2130  const char func_code = CAS_FC_SCHEMA_INFO;
2131 
2132  ENTER_FUNC ();
2133 
  /* a new request must not arrive while another event is still parked */
2134  if (ctx_p->waiting_event)
2135  {
2136  assert (false);
2137 
2139  ctx_p->waiting_event = NULL;
2140 
2141  goto free_context;
2142  }
2143 
2144  if (ctx_p->prepared_stmt == NULL)
2145  {
2146  stmt_p = shard_stmt_new_schema_info (ctx_p->cid, ctx_p->uid);
2147  if (stmt_p == NULL)
2148  {
2150 
2151  proxy_event_free (event_p);
2152  event_p = NULL;
2153 
2154  EXIT_FUNC ();
2155  return -1;
2156  }
2157 
2158  /* save statement to the context */
2159  ctx_p->prepared_stmt = stmt_p;
2160  if (proxy_context_add_stmt (ctx_p, stmt_p) == NULL)
2161  {
2162  PROXY_LOG (PROXY_LOG_MODE_ERROR, "Failed to link statement to context. statement(%s). context(%s).",
2163  shard_str_stmt (stmt_p), proxy_str_context (ctx_p));
2164  goto free_context;
2165  }
2166  }
2167  else
2168  {
2169  /*
2170  * It can be happened, when schema_info request is re-invoked
2171  * by shard waiter.
2172  */
2173 
      /* NOTE(review): the condition guarding this block is not visible in this listing */
2175  {
2176  assert (false);
2177 
2178  PROXY_LOG (PROXY_LOG_MODE_ERROR, "Unexpected statement type. expect(%d), statement(%s). context(%s).",
2180 
2181  goto free_context;
2182  }
2183 
2184  /* statement linked to the context already */
2185  }
2186 
2187  driver_info = proxy_get_driver_info_by_ctx (ctx_p);
2188  client_version = CAS_MAKE_PROTO_VER (driver_info);
2189  if (DOES_CLIENT_UNDERSTAND_THE_PROTOCOL (client_version, PROTOCOL_V5))
2190  {
2191  net_arg_get_int (&shard_id, argv[4]);
2192  }
2193  else
2194  {
2195  shard_id = 0; /* default SHARD # 0 */
2196  }
2197 
  /* reject an out-of-range shard id, and a shard different from the one the context is bound to */
2198  if ((shard_id < 0 || shard_id >= proxy_info_p->max_shard)
2199  || (ctx_p->shard_id != PROXY_INVALID_SHARD && ctx_p->shard_id != shard_id))
2200  {
2202 
2203  proxy_event_free (event_p);
2204  event_p = NULL;
2205 
2206  if (ctx_p->prepared_stmt)
2207  {
2208  shard_stmt_free (ctx_p->prepared_stmt);
2209  ctx_p->prepared_stmt = NULL;
2210  }
2211 
2212  EXIT_FUNC ();
2213  return -1;
2214  }
2215  ctx_p->shard_id = shard_id;
2216 
2217  error = proxy_send_request_to_cas (ctx_p, event_p, func_code);
2218  if (error < 0)
2219  {
2220  PROXY_LOG (PROXY_LOG_MODE_ERROR, "Failed to send request to CAS. " "(error=%d). context(%s). evnet(%s).", error,
2221  proxy_str_context (ctx_p), proxy_str_event (event_p));
2222 
2223  goto free_context;
2224  }
2225  else if (error > 0)
2226  {
2227  /* waiting idle shard/cas */
2228  ctx_p->waiting_event = event_p;
2229  event_p = NULL;
2230 
2231  EXIT_FUNC ();
2232  return 0;
2233  }
2234 
2235  ctx_p->func_code = func_code;
2236 
2237  EXIT_FUNC ();
2238  return 0;
2239 
2240 free_context:
2241  if (event_p)
2242  {
2243  proxy_event_free (event_p);
2244  event_p = NULL;
2245  }
2246 
2247  ctx_p->free_context = true;
2248 
2249  EXIT_FUNC ();
2250  return -1;
2251 }
2252 
/*
 * Client request handler for a check-CAS request
 * (function code CAS_FC_CHECK_CAS).
 *
 * If the context is already connected, a response event is built and sent
 * to the client directly; it is marked "in transaction" when
 * ctx_p->is_client_in_tran is set. Otherwise the request is flagged with
 * force-out-tran and sent to a CAS.
 *
 * return: 0 on the normal paths; -1 with ctx_p->free_context set when the
 *         response cannot be built or sent, or the event cannot be
 *         duplicated.
 *
 * NOTE(review): the line carrying the function name and parameter list is
 * not present in this listing.
 */
2253 int
2255 {
2256  int error = 0;
2257  char *request_p = NULL;
2258  char *response_p = NULL;
2259  char *driver_info;
2260  T_PROXY_EVENT *new_event_p = NULL;
2261 
2262  ENTER_FUNC ();
2263 
2264  if (ctx_p->is_connected)
2265  {
2266  driver_info = proxy_get_driver_info_by_ctx (ctx_p);
      /* NOTE(review): the call that builds the response event is not visible in this listing */
2267  new_event_p =
2270  if (new_event_p == NULL)
2271  {
2272  goto free_context;
2273  }
2274 
2275  response_p = new_event_p->buffer.data;
2276  assert (response_p); // __FOR_DEBUG
2277 
2278  if (ctx_p->is_client_in_tran)
2279  {
2280  proxy_set_con_status_in_tran (response_p);
2281  }
2282 
2283  error = proxy_send_response_to_client (ctx_p, new_event_p);
2284  if (error)
2285  {
2286  PROXY_LOG (PROXY_LOG_MODE_ERROR, "Failed to response to the client. " "(error=%d). context(%s). evnet(%s).",
2287  error, proxy_str_context (ctx_p), proxy_str_event (new_event_p));
2288 
2289  goto free_context;
2290  }
      /* the response was handed to proxy_send_response_to_client(); do not free it here */
2291  new_event_p = NULL;
2292 
2293  proxy_event_free (event_p);
2294  event_p = NULL;
2295  }
2296  else
2297  {
2298  request_p = event_p->buffer.data;
2299 
2300  proxy_set_force_out_tran (request_p);
2301 
2302  error = proxy_send_request_to_cas (ctx_p, event_p, CAS_FC_CHECK_CAS);
2303  if (error < 0)
2304  {
	  /* NOTE(review): this branch only logs and frees the event; the function still returns 0 below - confirm this is intended */
2305  PROXY_LOG (PROXY_LOG_MODE_ERROR, "Failed to send request to CAS. " "(error=%d). context(%s). evnet(%s).",
2306  error, proxy_str_context (ctx_p), proxy_str_event (event_p));
2307 
2308  proxy_event_free (event_p);
2309  event_p = NULL;
2310  }
2311  else if (error > 0)
2312  {
	  /* no CAS available yet: park the request on the context */
2313  PROXY_LOG (PROXY_LOG_MODE_DEBUG, "Waiting cas to connect db context(%s)", proxy_str_context (ctx_p));
2314  ctx_p->waiting_event = event_p;
2315  event_p = NULL;
2316  }
2317  else
2318  {
	  /* request was sent: keep a duplicate of it on the context */
2319  ctx_p->waiting_event = proxy_event_dup (event_p);
2320  if (ctx_p->waiting_event == NULL)
2321  {
2322  goto free_context;
2323  }
2324  }
2325 
2326  ctx_p->func_code = CAS_FC_CHECK_CAS;
2327  }
2328 
2329  EXIT_FUNC ();
2330  return 0;
2331 
2332 free_context:
2333  if (event_p)
2334  {
2335  proxy_event_free (event_p);
2336  event_p = NULL;
2337  }
2338 
2339  if (new_event_p)
2340  {
2341  proxy_event_free (new_event_p);
2342  new_event_p = NULL;
2343  }
2344 
2345  ctx_p->free_context = true;
2346 
2347  EXIT_FUNC ();
2348  return -1;
2349 }
2350 
/*
 * Client request handler for closing the client connection.
 *
 * Marks the client as out of transaction, sets free_on_client_io_write and
 * clears dont_free_statement on the context. If the context is still in a
 * transaction, func_code is set to CAS_FC_END_TRAN before the (not visible)
 * call in that branch. On success the context's statement list and the
 * request event are freed.
 *
 * return: 0 on success; -1 with ctx_p->free_context set on failure.
 *
 * NOTE(review): the line carrying the function name and parameter list, and
 * the calls assigned to error in both branches, are not present in this
 * listing.
 */
2351 int
2353 {
2354  int error = 0;
2355 
2356  ENTER_FUNC ();
2357 
2358  /* set client tran status as OUT_TRAN, when con_close */
2359  ctx_p->is_client_in_tran = false;
2360  ctx_p->free_on_client_io_write = true;
2361  ctx_p->dont_free_statement = false;
2362 
2363  if (ctx_p->is_in_tran)
2364  {
2365  ctx_p->func_code = CAS_FC_END_TRAN;
2366 
2367  error =
2370  if (error < 0)
2371  {
2372  goto free_context;
2373  }
2374  else if (error > 0)
2375  {
	  /* a positive result is not expected on this path */
2376  assert (false);
2377  goto free_context;
2378  }
2379  }
2380  else
2381  {
2382  error =
2385  if (error)
2386  {
2387  goto free_context;
2388  }
2389  }
2390 
2391  /* free statement list */
2392  proxy_context_free_stmt (ctx_p);
2393 
2394  proxy_event_free (event_p);
2395  event_p = NULL;
2396 
2397  EXIT_FUNC ();
2398  return 0;
2399 
2400 free_context:
2401  if (event_p)
2402  {
2403  proxy_event_free (event_p);
2404  event_p = NULL;
2405  }
2406 
2407  ctx_p->free_context = true;
2408 
2409  EXIT_FUNC ();
2410  return -1;
2411 }
2412 
/*
 * Client request handler that takes one argument (auto-commit mode, argv[0])
 * and completes the request with a single call whose result decides success.
 *
 * return: 0 on success; -1 when argc < 1 (event freed, context kept);
 *         -1 with ctx_p->free_context set when the call fails.
 *
 * NOTE(review): the line carrying the function name and parameter list, and
 * the call assigned to error, are not present in this listing. In the
 * visible code db_version is never used and auto_commit_mode is read but
 * not used afterwards.
 */
2413 int
2415 {
2416  int error = 0;
2417  char *db_version = NULL;
2418  char auto_commit_mode;
2419 
2420  ENTER_FUNC ();
2421 
2422  if (argc < 1)
2423  {
2424  PROXY_LOG (PROXY_LOG_MODE_ERROR, "Invalid argument. (argc:%d). context(%s).", argc, proxy_str_context (ctx_p));
2426 
2427  proxy_event_free (event_p);
2428  event_p = NULL;
2429 
2430  EXIT_FUNC ();
2431  return -1;
2432  }
2433 
2434  /* arg0 */
2435  net_arg_get_char (auto_commit_mode, argv[0]);
2436 
2437 
2438  error =
2441  if (error)
2442  {
2443  goto free_context;
2444  }
2445 
2446  proxy_event_free (event_p);
2447  event_p = NULL;
2448 
2449  EXIT_FUNC ();
2450  return 0;
2451 
2452 free_context:
2453  if (event_p)
2454  {
2455  proxy_event_free (event_p);
2456  event_p = NULL;
2457  }
2458 
2459  ctx_p->free_context = true;
2460 
2461  EXIT_FUNC ();
2462  return -1;
2463 }
2464 
/*
 * Client request handler for closing a cursor
 * (function code CAS_FC_CURSOR_CLOSE).
 *
 * When the context is in a transaction, the proxy-side statement handle in
 * argv[0] is replaced with the handle known to the context's CAS and the
 * request is relayed to that CAS. When the context is not in a transaction,
 * or no CAS-side handle is found, control reaches out_tran, where the
 * request is completed without contacting a CAS.
 *
 * return: 0 on success; -1 with ctx_p->free_context set on failure.
 *
 * NOTE(review): the line carrying the function name and parameter list is
 * not present in this listing.
 */
2465 int
2467 {
2468  int error = 0;
2469  T_CAS_IO *cas_io_p;
2470  int srv_h_id;
2471  int cas_srv_h_id;
2472 
2473  const char func_code = CAS_FC_CURSOR_CLOSE;
2474 
2475  ENTER_FUNC ();
2476 
2477  if (ctx_p->is_in_tran)
2478  {
2479  cas_io_p =
2480  proxy_cas_alloc_by_ctx (ctx_p->client_id, ctx_p->shard_id, ctx_p->cas_id, ctx_p->cid, ctx_p->uid,
2481  ctx_p->wait_timeout, func_code);
2482  if (cas_io_p == NULL)
2483  {
2484  goto free_context;
2485  }
2486  else if (cas_io_p == (T_CAS_IO *) SHARD_TEMPORARY_UNAVAILABLE)
2487  {
2488  assert (false); /* it cannot happen */
2489 
2490  goto free_context;
2491  }
2492  assert (ctx_p->shard_id == cas_io_p->shard_id);
2493  assert (ctx_p->cas_id == cas_io_p->cas_id);
2494  proxy_context_set_in_tran (ctx_p, cas_io_p->shard_id, cas_io_p->cas_id);
2495 
2496  net_arg_get_int (&srv_h_id, argv[0]);
2497 
2498  cas_srv_h_id = shard_stmt_find_srv_h_id_for_shard_cas (srv_h_id, cas_io_p->shard_id, cas_io_p->cas_id);
2499  if (cas_srv_h_id < 0)
2500  {
2501  PROXY_LOG (PROXY_LOG_MODE_ERROR, "Failed to cursoe close. " "cannot find cas server handle. context(%s).",
2502  proxy_str_context (ctx_p));
2503  assert (false);
2504  goto out_tran;
2505  }
2506 
2507  /* relay request to the CAS */
2508  net_arg_put_int (argv[0], &cas_srv_h_id);
2509 
2510  error = proxy_cas_io_write (cas_io_p, event_p);
2511  if (error)
2512  {
2513  goto free_context;
2514  }
2515 
      /* the event was handed to proxy_cas_io_write(); do not free it here */
2516  event_p = NULL;
2517 
2518  ctx_p->func_code = func_code;
2519 
2520  EXIT_FUNC ();
2521  return 0;
2522  }
2523 
2524 out_tran:
2525  /* send cursor_close response to the client, if this context is not IN_TRAN, or context is IN_TRAN status but
2526  * cas_srv_h_id is not found */
2527  /* FIXME : if context is IN_TRAN and cas_srv_hd_id is not found ? */
  /* NOTE(review): the call assigned to error is not visible in this listing */
2528  error =
2531  if (error)
2532  {
2533  goto free_context;
2534  }
2535 
2536  proxy_event_free (event_p);
2537  event_p = NULL;
2538 
2539  EXIT_FUNC ();
2540  return 0;
2541 
2542 free_context:
2543  if (event_p)
2544  {
2545  proxy_event_free (event_p);
2546  event_p = NULL;
2547  }
2548 
2549  ctx_p->free_context = true;
2550 
2551  EXIT_FUNC ();
2552  return -1;
2553 }
2554 
/*
 * Client request handler for a batched execute
 * (function code CAS_FC_EXECUTE_ARRAY).
 *
 * Works out two protocol-dependent values and then delegates to
 * proxy_client_execute_internal():
 *   - bind_value_index: 2, or 3 when the client protocol is at least
 *     PROTOCOL_V4 (such clients carry a query timeout in argv[1]).
 *   - query_timeout: taken from argv[1] for PROTOCOL_V4+ clients, otherwise 0.
 *
 * return: the result of proxy_client_execute_internal().
 *
 * NOTE(review): the line carrying the function name and parameter list is
 * not present in this listing.
 */
2555 int
2557 {
2558  int query_timeout;
2559  int bind_value_index = 2;
2560  char *driver_info;
2561  T_BROKER_VERSION client_version;
2562 
2563  char func_code = CAS_FC_EXECUTE_ARRAY;
2564 
2565  driver_info = proxy_get_driver_info_by_ctx (ctx_p);
2566  client_version = CAS_MAKE_PROTO_VER (driver_info);
2567  if (client_version >= CAS_PROTO_MAKE_VER (PROTOCOL_V4))
2568  {
      /* one extra argument precedes the bind values for these clients */
2569  bind_value_index++;
2570 
2571  net_arg_get_int (&query_timeout, argv[1]);
2572  }
2573  else
2574  {
2575  query_timeout = 0;
2576  }
2577 
2578  return proxy_client_execute_internal (ctx_p, event_p, argc, argv, func_code, query_timeout, bind_value_index);
2579 }
2580 
/*
 * Client request handler that completes the request with a single call and
 * then releases the request event.
 *
 * return: 0 on success; -1 with ctx_p->free_context set when the call fails.
 *         event_p is freed on both paths.
 *
 * NOTE(review): the line carrying the function name and parameter list, and
 * the call assigned to error, are not present in this listing, so what the
 * call sends cannot be stated from here.
 */
2581 int
2583 {
2584  int error = 0;
2585 
2586  ENTER_FUNC ();
2587 
2588  error =
2591  if (error)
2592  {
2593  goto free_context;
2594  }
2595 
2596  proxy_event_free (event_p);
2597  event_p = NULL;
2598 
2599  EXIT_FUNC ();
2600  return 0;
2601 
2602 free_context:
2603  if (event_p)
2604  {
2605  proxy_event_free (event_p);
2606  event_p = NULL;
2607  }
2608 
2609  ctx_p->free_context = true;
2610 
2611  EXIT_FUNC ();
2612  return -1;
2613 }
2614 
2615 int
2617 {
2618  int error = 0;
2619  int i = 0;
2620  int prepare_argc_count = 0;
2621  int query_timeout;
2622 
2623  /* sql statement */
2624  char *sql_stmt = NULL;
2625  int sql_size = 0;
2626 
2627  /* argv */
2628  char flag = 0;
2629  char auto_commit_mode = FALSE;
2630  char func_code = CAS_FC_PREPARE_AND_EXECUTE;
2631 
2632  T_CAS_IO *cas_io_p;
2633  T_SHARD_STMT *stmt_p = NULL;
2634  int shard_id;
2635  SP_HINT_TYPE hint_type = HT_INVAL;
2636  T_SHARD_KEY_RANGE *range_p = NULL;
2637 
2638  char *driver_info;
2639  T_BROKER_VERSION client_version;
2640 
2641  char *request_p;
2642 
2643  /* set client tran status as IN_TRAN, when prepare */
2644  ctx_p->is_client_in_tran = true;
2645 
2646  request_p = event_p->buffer.data;
2647  assert (request_p); // __FOR_DEBUG
2648 
2649  if (ctx_p->waiting_event)
2650  {
2651  assert (false);
2652 
2654  ctx_p->waiting_event = NULL;
2655 
2656  goto free_context;
2657  }
2658 
2659  /* process argv */
2660  if (argc < 3)
2661  {
2662  PROXY_LOG (PROXY_LOG_MODE_ERROR, "Invalid argument. (argc:%d). context(%s).", argc, proxy_str_context (ctx_p));
2664 
2665  proxy_event_free (event_p);
2666  event_p = NULL;
2667 
2668  EXIT_FUNC ();
2669  error = -1;
2670  goto end;
2671  }
2672 
2673  net_arg_get_int (&prepare_argc_count, argv[0]);
2674  net_arg_get_str (&sql_stmt, &sql_size, argv[1]);
2675  net_arg_get_char (flag, argv[2]);
2676  net_arg_get_char (auto_commit_mode, argv[3]);
2677  for (i = 4; i < prepare_argc_count + 1; i++)
2678  {
2679  int deferred_close_handle;
2680  net_arg_get_int (&deferred_close_handle, argv[i]);
2681 
2682  /* SHARD TODO : what to do for deferred close handle? */
2683  }
2684 
2685  driver_info = proxy_get_driver_info_by_ctx (ctx_p);
2686  client_version = CAS_MAKE_PROTO_VER (driver_info);
2687  if (client_version >= CAS_PROTO_MAKE_VER (PROTOCOL_V1))
2688  {
2689  net_arg_get_int (&query_timeout, argv[prepare_argc_count + 6]);
2690  if (!DOES_CLIENT_UNDERSTAND_THE_PROTOCOL (client_version, PROTOCOL_V2))
2691  {
2692  /* protocol version v1 driver send query timeout in second */
2693  query_timeout *= 1000;
2694  }
2695  }
2696  else
2697  {
2698  query_timeout = 0;
2699  }
2700 
2701  PROXY_DEBUG_LOG ("Process requested prepare and execute sql statement. " "(sql_stmt:[%s]). context(%s).",
2702  shard_str_sqls (sql_stmt), proxy_str_context (ctx_p));
2703 
2704  if (ctx_p->prepared_stmt == NULL)
2705  {
2706  stmt_p = shard_stmt_new_exclusive (sql_stmt, ctx_p->cid, ctx_p->uid, client_version);
2707  if (stmt_p == NULL)
2708  {
2710 
2711  proxy_event_free (event_p);
2712  event_p = NULL;
2713 
2714  EXIT_FUNC ();
2715 
2716  error = -1;
2717  goto end;
2718  }
2719 
2720  if (proxy_info_p->ignore_shard_hint == OFF)
2721  {
2722  error = shard_stmt_set_hint_list (stmt_p);
2723  if (error < 0)
2724  {
2725  PROXY_LOG (PROXY_LOG_MODE_ERROR, "Failed to set hint list. statement(%s). context(%s).",
2726  shard_str_stmt (stmt_p), proxy_str_context (ctx_p));
2727 
2729 
2730  /*
2731  * there must be no context sharing this statement at this time.
2732  * so, we can free statement.
2733  */
2734  shard_stmt_free (stmt_p);
2735  stmt_p = NULL;
2736 
2737  proxy_event_free (event_p);
2738  event_p = NULL;
2739 
2740  error = -1;
2741  goto end;
2742  }
2743  }
2744 
2745  /* save statement to the context */
2746  ctx_p->prepared_stmt = stmt_p;
2747  if (proxy_context_add_stmt (ctx_p, stmt_p) == NULL)
2748  {
2749  PROXY_LOG (PROXY_LOG_MODE_ERROR, "Failed to link statement to context. statement(%s). context(%s).",
2750  shard_str_stmt (stmt_p), proxy_str_context (ctx_p));
2751  goto free_context;
2752  }
2753 
2754  if (proxy_info_p->ignore_shard_hint == OFF)
2755  {
2756  hint_type = (SP_HINT_TYPE) shard_stmt_get_hint_type (stmt_p);
2757  if (hint_type <= HT_INVAL || hint_type > HT_EOF)
2758  {
2759  PROXY_LOG (PROXY_LOG_MODE_ERROR, "Unsupported hint type. (hint_type:%d). context(%s).", hint_type,
2760  proxy_str_context (ctx_p));
2761 
2763 
2764  proxy_event_free (event_p);
2765  event_p = NULL;
2766 
2767  error = -1;
2768  goto end;
2769  }
2770 
2771  shard_id = proxy_get_shard_id (stmt_p, NULL, &range_p);
2772  if (shard_id == PROXY_INVALID_SHARD)
2773  {
2774  PROXY_LOG (PROXY_LOG_MODE_ERROR, "Invalid shard id. (shard_id:%d). context(%s).", shard_id,
2775  proxy_str_context (ctx_p));
2776 
2778 
2779  /* wakeup and reset statment */
2781  shard_stmt_free (ctx_p->prepared_stmt);
2782  ctx_p->prepared_stmt = NULL;
2783 
2784  proxy_event_free (event_p);
2785  event_p = NULL;
2786 
2787  error = -1;
2788  goto end;
2789  }
2790 
2791  if (ctx_p->shard_id != PROXY_INVALID_SHARD && ctx_p->shard_id != shard_id)
2792  {
2794  "Shard id couldn't be changed in a transaction. "
2795  "(prev_shard_id:%d, curr_shard_id:%d). context(%s).", ctx_p->shard_id, shard_id,
2796  proxy_str_context (ctx_p));
2797 
2799 
2800  EXIT_FUNC ();
2801  goto free_context;
2802  }
2803 
2804  ctx_p->shard_id = shard_id;
2805  }
2806 
2807  proxy_set_wait_timeout (ctx_p, query_timeout);
2808  }
2809  else
2810  {
2811  if (proxy_info_p->ignore_shard_hint == OFF && ctx_p->shard_id == PROXY_INVALID_SHARD)
2812  {
2813  assert (false);
2814 
2815  proxy_event_free (event_p);
2816  event_p = NULL;
2817 
2818  error = -1;
2819  goto end;
2820  }
2821 
2823  {
2824  assert (false);
2825 
2826  PROXY_LOG (PROXY_LOG_MODE_ERROR, "Unexpected statement type. expect(%d), statement(%s). context(%s).",
2828 
2829  proxy_event_free (event_p);
2830  event_p = NULL;
2831 
2832  error = -1;
2833  goto end;
2834  }
2835  }
2836 
2837  if (stmt_p == NULL)
2838  {
2839  if (ctx_p->prepared_stmt == NULL)
2840  {
2841  assert (false);
2842  PROXY_LOG (PROXY_LOG_MODE_ERROR, "No prepared statement to execute");
2843 
2845  EXIT_FUNC ();
2846  goto free_context;
2847  }
2848 
2849  stmt_p = ctx_p->prepared_stmt;
2850  (void) proxy_get_shard_id (stmt_p, NULL, &range_p);
2851  }
2852 
2853  cas_io_p =
2854  proxy_cas_alloc_by_ctx (ctx_p->client_id, ctx_p->shard_id, ctx_p->cas_id, ctx_p->cid, ctx_p->uid,
2855  ctx_p->wait_timeout, func_code);
2856  if (cas_io_p == NULL)
2857  {
2858  PROXY_LOG (PROXY_LOG_MODE_ERROR, "Failed to allocate CAS. context(%s).", proxy_str_context (ctx_p));
2859 
2861 
2862  EXIT_FUNC ();
2863  error = -1;
2864  goto end;
2865  }
2866  else if (cas_io_p == (T_CAS_IO *) SHARD_TEMPORARY_UNAVAILABLE)
2867  {
2868  /* waiting idle shard/cas */
2869  ctx_p->waiting_event = event_p;
2870  event_p = NULL;
2871 
2872  EXIT_FUNC ();
2873  error = 0;
2874  goto end;
2875  }
2876 
2877  proxy_context_set_in_tran (ctx_p, cas_io_p->shard_id, cas_io_p->cas_id);
2878 
2879  /* If we will fail to execute query, then we have to retry. */
2880  ctx_p->waiting_event = proxy_event_dup (event_p);
2881  if (ctx_p->waiting_event == NULL)
2882  {
2883  goto free_context;
2884  }
2885 
2886  ctx_p->stmt_hint_type = hint_type;
2887  ctx_p->stmt_h_id = stmt_p->stmt_h_id;
2888 
2889  if (proxy_info_p->ignore_shard_hint == OFF)
2890  {
2891  /* update shard statistics */
2892  if (stmt_p)
2893  {
2894  proxy_update_shard_stats (stmt_p, range_p);
2895  }
2896  else
2897  {
2898  assert (false);
2899  }
2900  }
2901  else
2902  {
2904  }
2905 
2906  error = proxy_cas_io_write (cas_io_p, event_p);
2907  if (error)
2908  {
2909  goto free_context;
2910  }
2911 
2912  ctx_p->func_code = func_code;
2913 
2914 end:
2915  EXIT_FUNC ();
2916  return error;
2917 
2918 free_context:
2919  if (ctx_p->waiting_event == event_p)
2920  {
2921  ctx_p->waiting_event = NULL;
2922  }
2923 
2924  if (event_p)
2925  {
2926  proxy_event_free (event_p);
2927  event_p = NULL;
2928  }
2929 
2930  ctx_p->free_context = true;
2931 
2932  EXIT_FUNC ();
2933  return error;
2934 }
2935 
2936 int
2938 {
2940 
2941  ENTER_FUNC ();
2942 
2943  assert (event_p);
2944  if (event_p)
2945  {
2946  proxy_event_free (event_p);
2947  event_p = NULL;
2948  }
2949 
2950  EXIT_FUNC ();
2951  return 0;
2952 }
2953 
2954 /*
2955  * process cas response
2956  */
2957 int
2959 {
2960  int error;
2961 
2962  ENTER_FUNC ();
2963 
2964  ctx_p->dont_free_statement = false;
2965 
2966  if (ctx_p->free_on_client_io_write)
2967  {
2968  PROXY_DEBUG_LOG ("Free context on client io write. " "context will be terminated. context(%s).",
2969  proxy_str_context (ctx_p));
2970 
2971  error =
2974 
2975  goto free_event;
2976  }
2977  else if (ctx_p->free_on_end_tran)
2978  {
2979  PROXY_DEBUG_LOG ("Free context on end tran. " "context will be terminated. context(%s).",
2980  proxy_str_context (ctx_p));
2981 
2982  ctx_p->free_context = true;
2983 
2984  error = 0 /* no error */ ;
2985  goto free_event;
2986  }
2987 
2988  error = proxy_send_response_to_client (ctx_p, event_p);
2989  if (error)
2990  {
2991  PROXY_LOG (PROXY_LOG_MODE_ERROR, "Failed to response to the client. " "(error=%d). context(%s). evnet(%s).",
2992  error, proxy_str_context (ctx_p), proxy_str_event (event_p));
2993 
2994  ctx_p->free_context = true;
2995 
2996  error = -1;
2997  goto free_event;
2998  }
2999  event_p = NULL;
3000 
3001  EXIT_FUNC ();
3002  return error;
3003 
3004 free_event:
3005  if (event_p)
3006  {
3007  proxy_event_free (event_p);
3008  event_p = NULL;
3009  }
3010 
3011  EXIT_FUNC ();
3012  return error;
3013 }
3014 
3015 int
3017 {
3018  /* SHARD TODO : REFACTOR */
3019  /* Protocol Phase 0 : MSG_HEADER */
3020 
3021  /* Protocol Phase 1 : GENERAL INFO */
3022  /* INT : SRV_H_ID */
3023  /* INT : RESULT_CACHE_LIFETIME */
3024  /* CHAR : STMT_TYPE */
3025  /* INT : NUM_MARKERS */
3026  /* CHAR : UPDATABLE_FLAG */
3027 
3028  /* Protocol Phase 2 : PREPARED COLUMN COUNT */
3029  /* CASE OF SELECT */
3030  /* INT : NUM_COLS */
3031  /* CASE OF STMT_CALL, STMT_GET_STATS, STMT_EVALUATE */
3032  /* INT : NUM_COLS(1) */
3033  /* ELSE CASE */
3034  /* INT : NUM_COLS(0) */
3035 
3036  /* Protocol Phase 3 : PREPARED COLUMN INFO LIST (NUM_COLS) -- prepare_column_info_set */
3037 
3038  int error, error_ind, error_code;
3039  int srv_h_id;
3040  int stmt_h_id_n;
3041  int srv_h_id_offset = MSG_HEADER_SIZE;
3042  char *srv_h_id_pos;
3043 
3044  T_SHARD_STMT *stmt_p;
3045 
3046  char *response_p;
3047 
3048  char *driver_info;
3049  T_BROKER_VERSION client_version;
3050 
3051  ENTER_FUNC ();
3052 
3053  response_p = event_p->buffer.data;
3054  assert (response_p); // __FOR_DEBUG
3055 
3056  if (ctx_p->is_client_in_tran)
3057  {
3058  proxy_set_con_status_in_tran (response_p);
3059  }
3060 
3061  if (ctx_p->waiting_event && ctx_p->is_prepare_for_execute == false)
3062  {
3064  ctx_p->waiting_event = NULL;
3065  }
3066 
3067  driver_info = proxy_get_driver_info_by_ctx (ctx_p);
3068  client_version = CAS_MAKE_PROTO_VER (driver_info);
3069 
3070  error_ind = proxy_check_cas_error (response_p);
3071  if (error_ind < 0)
3072  {
3073  error_code = proxy_get_cas_error_code (response_p, client_version);
3074 
3075  PROXY_LOG (PROXY_LOG_MODE_NOTICE, "CAS response error. " "(error_ind:%d, error_code:%d). context(%s).", error_ind,
3076  error_code, proxy_str_context (ctx_p));
3077  /* relay error to the client */
3078  error = proxy_send_response_to_client (ctx_p, event_p);
3079  if (error)
3080  {
3081  PROXY_LOG (PROXY_LOG_MODE_ERROR, "Failed to response to the client. " "(error=%d). context(%s). evnet(%s).",
3082  error, proxy_str_context (ctx_p), proxy_str_event (event_p));
3083 
3084  /* statement waiters may be woken up when freeing context */
3085  goto free_context;
3086  }
3087  event_p = NULL;
3088 
3089  /* if stmt's status is SHARD_STMT_STATUS_IN_PROGRESS, wake up waiters and free statement. */
3090  stmt_p = ctx_p->prepared_stmt;
3091  /* __FOR_DEBUG */
3092  assert (stmt_p);
3093 
3094  if (stmt_p && stmt_p->status == SHARD_STMT_STATUS_IN_PROGRESS)
3095  {
3096  /* check and wakeup statement waiter */
3098 
3099  /*
3100  * there must be no context sharing this statement at this time.
3101  * so, we can free statement.
3102  */
3103  shard_stmt_free (ctx_p->prepared_stmt);
3104  }
3105 
3106  ctx_p->prepared_stmt = NULL;
3107  ctx_p->is_prepare_for_execute = false;
3108 
3109  if (ctx_p->waiting_event)
3110  {
3112  ctx_p->waiting_event = NULL;
3113  }
3114 
3115  EXIT_FUNC ();
3116  return 0;
3117  }
3118 
3119  stmt_p = ctx_p->prepared_stmt;
3120  if (stmt_p == NULL)
3121  {
3122  PROXY_LOG (PROXY_LOG_MODE_ERROR, "Invalid statement. Statement couldn't be NULL. context(%s).",
3123  proxy_str_context (ctx_p));
3124 
3125  goto free_context;
3126  }
3127 
3128  /* save prepare response to the statement */
3129  if (stmt_p->reply_buffer == NULL)
3130  {
3131  stmt_p->reply_buffer = proxy_dup_msg (response_p);
3132  if (stmt_p->reply_buffer == NULL)
3133  {
3134  /* statement waiters may be woken up when freeing context */
3135 #if 0
3136  ctx_p->prepared_stmt = NULL;
3137 #endif
3138  goto free_context;
3139  }
3140  stmt_p->reply_buffer_length = get_msg_length (response_p);
3141 
3142  /* modify stmt_h_id */
3143  stmt_h_id_n = htonl (stmt_p->stmt_h_id);
3144  memcpy ((char *) stmt_p->reply_buffer + srv_h_id_offset, (char *) &stmt_h_id_n, NET_SIZE_INT);
3145  }
3146  else
3147  {
3149  (event_p->buffer.data, event_p->buffer.length, (const char *) stmt_p->reply_buffer,
3150  stmt_p->reply_buffer_length))
3151  {
3152  PROXY_LOG (PROXY_LOG_MODE_DEBUG, "Invalid statement. schema info is different. context(%s). " "stmt(%s).",
3153  proxy_str_context (ctx_p), shard_str_stmt (stmt_p));
3154 
3156 
3157  ctx_p->is_prepare_for_execute = false;
3158 
3159  if (ctx_p->waiting_event)
3160  {
3162  ctx_p->waiting_event = NULL;
3163  }
3164 
3166 
3167  proxy_event_free (event_p);
3168  event_p =
3171  ctx_p->is_client_in_tran);
3172  if (event_p == NULL)
3173  {
3174  goto free_context;
3175  }
3176 
3177  /* relay error to the client */
3178  error = proxy_send_response_to_client (ctx_p, event_p);
3179  if (error)
3180  {
3182  "Failed to response to the client. " "(error=%d). context(%s). evnet(%s).", error,
3183  proxy_str_context (ctx_p), proxy_str_event (event_p));
3184 
3185  /* statement waiters may be woken up when freeing context */
3186  goto free_context;
3187  }
3188 
3189  EXIT_FUNC ();
3190  return 0;
3191  }
3192  }
3193 
3194  /* save server handle id for this statement from shard/cas */
3195  srv_h_id = error_ind;
3196  error = shard_stmt_add_srv_h_id_for_shard_cas (stmt_p->stmt_h_id, ctx_p->shard_id, ctx_p->cas_id, srv_h_id);
3197  if (error < 0)
3198  {
3200  "Failed to save server handle " "id to the statement " "(srv_h_id:%d). statement(%s). context(%s). ",
3201  srv_h_id, shard_str_stmt (stmt_p), proxy_str_context (ctx_p));
3202  goto free_context;
3203  }
3204 
3205  if (ctx_p->is_prepare_for_execute)
3206  {
3207  /*
3208  * after finishing prepare_for_execute, process previous client exeucte
3209  * request again.
3210  */
3211 
3212  ctx_p->prepared_stmt = NULL;
3213  ctx_p->is_prepare_for_execute = false;
3214 
3215  assert (ctx_p->waiting_event); // __FOR_DEBUG
3216  error = shard_queue_enqueue (&proxy_Handler.cli_ret_q, (void *) ctx_p->waiting_event);
3217 
3218  if (error)
3219  {
3220  goto free_context;
3221  }
3222 
3223  ctx_p->waiting_event = NULL; /* DO NOT DELETE */
3224 
3225  /* do not relay response to the client, free event here */
3226  proxy_event_free (event_p);
3227  event_p = NULL;
3228 
3229  EXIT_FUNC ();
3230  return 0;
3231  }
3232  else
3233  {
3234  /*
3235  * after dummy prepare,
3236  * we must not free(unpin) statment event though tran status is OUT_TRAN.
3237  */
3238  ctx_p->dont_free_statement = true;
3239  }
3240 
3241  /* The following codes does not use */
3242 
3243  ctx_p->prepared_stmt = NULL;
3244 
3245  /* REPLACE SRV_H_ID */
3246  stmt_h_id_n = htonl (stmt_p->stmt_h_id);
3247  srv_h_id_pos = (char *) (response_p + srv_h_id_offset);
3248  memcpy (srv_h_id_pos, &stmt_h_id_n, NET_SIZE_INT);
3249 
3250  PROXY_LOG (PROXY_LOG_MODE_SHARD_DETAIL, "Replace server handle id. " "(requested_srv_h_id:%d, saved_srv_h_id:%d).",
3251  srv_h_id, stmt_p->stmt_h_id);
3252 
3253  error = proxy_send_response_to_client (ctx_p, event_p);
3254  if (error)
3255  {
3256  PROXY_LOG (PROXY_LOG_MODE_ERROR, "Failed to response to the client. " "(error=%d). context(%s). evnet(%s).",
3257  error, proxy_str_context (ctx_p), proxy_str_event (event_p));
3258 
3259  goto free_context;
3260  }
3261 
3262  EXIT_FUNC ();
3263  return 0;
3264 
3265 free_context:
3266  if (event_p)
3267  {
3268  proxy_event_free (event_p);
3269  event_p = NULL;
3270  }
3271 
3272  ctx_p->free_context = true;
3273 
3274  EXIT_FUNC ();
3275  return -1;
3276 }
3277 
3278 int
3280 {
3281  /* Protocol Phase 0 : MSG_HEADER */
3282 
3283  /* Protocol Phase 1 : GENERAL INFO */
3284  /* INT : ROW_COUNT */
3285  /* CHAR : CLT_CACHE_REUSABLE(1 or 0) */
3286  /* INT : NUM_QUERY_RESULT */
3287 
3288  /* Protocol Phase 2 : QUERY RESULT(NUM_QEURY_RESULT) */
3289  /* CHAR : STMT_TYPE */
3290  /* INT : TUPLE_COUNT */
3291  /* T_OBJECT : INS_OID */
3292  /* INT : SRV_CACHE_TIME.SEC */
3293  /* INT : SRV_CACHE_TIME.USEC */
3294 
3295  return proxy_cas_execute_internal (ctx_p, event_p);
3296 }
3297 
3298 static int
3300 {
3301  int error;
3302  int error_ind;
3303  int error_code = 0;
3304  bool is_in_tran;
3305  char *response_p;
3306  char *driver_info;
3307  T_BROKER_VERSION client_version;
3308 
3309  ENTER_FUNC ();
3310 
3311  response_p = event_p->buffer.data;
3312  assert (response_p); // __FOR_DEBUG
3313 
3314  if (ctx_p->waiting_event)
3315  {
3317  ctx_p->waiting_event = NULL;
3318  }
3319 
3320  is_in_tran = proxy_handler_is_cas_in_tran (ctx_p->shard_id, ctx_p->cas_id);
3321  if (is_in_tran == false)
3322  {
3323  ctx_p->is_client_in_tran = false;
3324  }
3325 
3326  error_ind = proxy_check_cas_error (response_p);
3327  if (error_ind < 0)
3328  {
3329  driver_info = proxy_get_driver_info_by_ctx (ctx_p);
3330  client_version = CAS_MAKE_PROTO_VER (driver_info);
3331  error_code = proxy_get_cas_error_code (response_p, client_version);
3332 
3333  PROXY_LOG (PROXY_LOG_MODE_NOTICE, "CAS response error. " "(error_ind:%d, error_code:%d). context(%s).", error_ind,
3334  error_code, proxy_str_context (ctx_p));
3335 
3336  assert (ctx_p->stmt_h_id >= 0);
3337 
3339 
3340  if (proxy_is_invalid_statement (error_ind, error_code, driver_info, client_version))
3341  {
3343  }
3344  }
3345 
3346  if (proxy_info_p->ignore_shard_hint == OFF)
3347  {
3348  switch (ctx_p->stmt_hint_type)
3349  {
3350  case HT_NONE:
3351  proxy_info_p->num_hint_none_queries_processed++;
3352  break;
3353  case HT_KEY:
3354  proxy_info_p->num_hint_key_queries_processed++;
3355  break;
3356  case HT_ID:
3357  proxy_info_p->num_hint_id_queries_processed++;
3358  break;
3359  case HT_ALL:
3360  proxy_info_p->num_hint_all_queries_processed++;
3361  break;
3362  default:
3363  PROXY_LOG (PROXY_LOG_MODE_NOTICE, "Unsupported statement hint type. " "(hint_type:%d). context(%s).",
3364  ctx_p->stmt_hint_type, proxy_str_context (ctx_p));
3365  }
3366  }
3367  else
3368  {
3369  proxy_info_p->num_hint_none_queries_processed++;
3370  }
3371 
3372  ctx_p->stmt_hint_type = HT_INVAL;
3374 
3375  error = proxy_send_response_to_client (ctx_p, event_p);
3376  if (error)
3377  {
3378  PROXY_LOG (PROXY_LOG_MODE_ERROR, "Failed to response to the client. " "(error=%d). context(%s). evnet(%s).",
3379  error, proxy_str_context (ctx_p), proxy_str_event (event_p));
3380 
3381  goto free_context;
3382  }
3383  event_p = NULL;
3384 
3385  EXIT_FUNC ();
3386  return 0;
3387 
3388 free_context:
3389  if (event_p)
3390  {
3391  proxy_event_free (event_p);
3392  event_p = NULL;
3393  }
3394 
3395  ctx_p->free_context = true;
3396 
3397  EXIT_FUNC ();
3398  return -1;
3399 }
3400 
3401 int
3403 {
3404  /* Protocol Phase 0 : MSG_HEADER */
3405 
3406  /* Protocol Phase 1 : GENERAL INFO */
3407  /* INT : RESULT_CODE */
3408  /* INT : NUM_QUERY */
3409 
3410  /* Protocol Phase 2 : QUERY RESULT(NUM_QEURY) */
3411  /* CASE 1 : QUERY SUCCESS */
3412  /* CHAR : STMT_TYPE */
3413  /* INT : TUPLE_COUNT */
3414  /* T_OBJECT : INS_OID */
3415 
3416  /* CASE 2 : QUERY FAIL */
3417  /* CHAR : STMT_TYPE */
3418  /* INT : ERROR_INDICATOR */
3419  /* INT : ERROR_CODE */
3420  /* INT : ERROR_MSG_LEN */
3421  /* STR : ERROR_MSG */
3422 
3423  return proxy_cas_execute_internal (ctx_p, event_p);
3424 }
3425 
3426 int
3428 {
3429  int error;
3430  bool is_in_tran;
3431 
3432  ENTER_FUNC ();
3433 
3434  is_in_tran = proxy_handler_is_cas_in_tran (ctx_p->shard_id, ctx_p->cas_id);
3435  if (is_in_tran == false)
3436  {
3437  ctx_p->is_client_in_tran = false;
3438  }
3439 
3440  error = proxy_send_response_to_client (ctx_p, event_p);
3441  if (error)
3442  {
3443  PROXY_LOG (PROXY_LOG_MODE_ERROR, "Failed to response to the client. " "(error=%d). context(%s). evnet(%s).",
3444  error, proxy_str_context (ctx_p), proxy_str_event (event_p));
3445 
3446  goto free_context;
3447  }
3448  event_p = NULL;
3449 
3450  EXIT_FUNC ();
3451  return 0;
3452 
3453 free_context:
3454  if (event_p)
3455  {
3456  proxy_event_free (event_p);
3457  event_p = NULL;
3458  }
3459 
3460  ctx_p->free_context = true;
3461 
3462  EXIT_FUNC ();
3463  return -1;
3464 }
3465 
3466 int
3468 {
3469  int error, error_ind, error_code;
3470  int srv_h_id;
3471  int stmt_h_id_n;
3472  int srv_h_id_offset = MSG_HEADER_SIZE;
3473  char *srv_h_id_pos;
3474  T_SHARD_STMT *stmt_p;
3475  char *response_p;
3476  char *driver_info;
3477  T_BROKER_VERSION client_version;
3478 
3479  ENTER_FUNC ();
3480 
3481  response_p = event_p->buffer.data;
3482 
3483  error_ind = proxy_check_cas_error (response_p);
3484  if (error_ind < 0)
3485  {
3486  driver_info = proxy_get_driver_info_by_ctx (ctx_p);
3487  client_version = CAS_MAKE_PROTO_VER (driver_info);
3488  error_code = proxy_get_cas_error_code (response_p, client_version);
3489 
3490  PROXY_LOG (PROXY_LOG_MODE_NOTICE, "CAS response error. " "(error_ind:%d, error_code:%d). context(%s).", error_ind,
3491  error_code, proxy_str_context (ctx_p));
3492 
3493  /* relay error to the client */
3494  error = proxy_send_response_to_client (ctx_p, event_p);
3495  if (error)
3496  {
3497  PROXY_LOG (PROXY_LOG_MODE_ERROR, "Failed to response to the client. " "(error=%d). context(%s). evnet(%s).",
3498  error, proxy_str_context (ctx_p), proxy_str_event (event_p));
3499 
3500  /* statement waiters may be woken up when freeing context */
3501  goto free_context;
3502  }
3503  event_p = NULL;
3504 
3505  /* if stmt's status is SHARD_STMT_STATUS_IN_PROGRESS, wake up waiters and free statement. */
3506  assert (ctx_p->prepared_stmt != NULL);
3507  stmt_p = ctx_p->prepared_stmt;
3508 
3509  if (stmt_p && stmt_p->status == SHARD_STMT_STATUS_IN_PROGRESS)
3510  {
3511  /* check and wakeup statement waiter */
3513 
3514  /*
3515  * there must be no context sharing this statement at this time.
3516  * so, we can free statement.
3517  */
3518  shard_stmt_free (ctx_p->prepared_stmt);
3519  }
3520  ctx_p->prepared_stmt = NULL;
3521 
3522  assert (ctx_p->waiting_event == NULL);
3523  EXIT_FUNC ();
3524  return 0;
3525  }
3526 
3527  stmt_p = ctx_p->prepared_stmt;
3528  if (stmt_p == NULL)
3529  {
3530  PROXY_LOG (PROXY_LOG_MODE_ERROR, "Invalid statement. Statement couldn't be NULL. context(%s).",
3531  proxy_str_context (ctx_p));
3532 
3533  goto free_context;
3534  }
3535 
3536  /* save server handle id for this statement from shard/cas */
3537  srv_h_id = error_ind;
3538  error = shard_stmt_add_srv_h_id_for_shard_cas (stmt_p->stmt_h_id, ctx_p->shard_id, ctx_p->cas_id, srv_h_id);
3539  if (error < 0)
3540  {
3542  "Failed to save server handle " "id to the statement " "(srv_h_id:%d). statement(%s). context(%s). ",
3543  srv_h_id, shard_str_stmt (stmt_p), proxy_str_context (ctx_p));
3544  goto free_context;
3545  }
3546 
3547  ctx_p->prepared_stmt = NULL;
3548 
3549  /* replace server handle id */
3550  stmt_h_id_n = htonl (stmt_p->stmt_h_id);
3551  srv_h_id_pos = (char *) (response_p + srv_h_id_offset);
3552  memcpy (srv_h_id_pos, &stmt_h_id_n, NET_SIZE_INT);
3553 
3554  PROXY_LOG (PROXY_LOG_MODE_SHARD_DETAIL, "Replace server handle id. " "(requested_srv_h_id:%d, saved_srv_h_id:%d).",
3555  srv_h_id, stmt_p->stmt_h_id);
3556 
3557  error = proxy_send_response_to_client (ctx_p, event_p);
3558  if (error)
3559  {
3560  PROXY_LOG (PROXY_LOG_MODE_ERROR, "Failed to response to the client. " "(error=%d). context(%s). evnet(%s).",
3561  error, proxy_str_context (ctx_p), proxy_str_event (event_p));
3562 
3563  goto free_context;
3564  }
3565  event_p = NULL;
3566 
3567  EXIT_FUNC ();
3568  return 0;
3569 
3570 free_context:
3571  if (event_p)
3572  {
3573  proxy_event_free (event_p);
3574  event_p = NULL;
3575  }
3576 
3577  ctx_p->free_context = true;
3578 
3579  EXIT_FUNC ();
3580  return -1;
3581 }
3582 
3583 int
3585 {
3586  int error, error_ind, error_code;
3587  int srv_h_id;
3588  int stmt_h_id_n;
3589  int srv_h_id_offset = MSG_HEADER_SIZE;
3590  char *srv_h_id_pos;
3591  bool is_in_tran;
3592  char *response_p;
3593  char *driver_info;
3594  T_BROKER_VERSION client_version;
3595  T_SHARD_STMT *stmt_p = NULL;
3596 
3597  ENTER_FUNC ();
3598 
3599  if (ctx_p->waiting_event)
3600  {
3602  ctx_p->waiting_event = NULL;
3603  }
3604 
3605  is_in_tran = proxy_handler_is_cas_in_tran (ctx_p->shard_id, ctx_p->cas_id);
3606  if (is_in_tran == false)
3607  {
3608  ctx_p->is_client_in_tran = false;
3609  }
3610 
3611  response_p = event_p->buffer.data;
3612 
3613  error_ind = proxy_check_cas_error (response_p);
3614  if (error_ind < 0)
3615  {
3616  driver_info = proxy_get_driver_info_by_ctx (ctx_p);
3617  client_version = CAS_MAKE_PROTO_VER (driver_info);
3618  error_code = proxy_get_cas_error_code (response_p, client_version);
3619 
3620  PROXY_LOG (PROXY_LOG_MODE_NOTICE, "CAS response error. " "(error_ind:%d, error_code:%d). context(%s).", error_ind,
3621  error_code, proxy_str_context (ctx_p));
3622 
3623  /* relay error to the client */
3624  error = proxy_send_response_to_client (ctx_p, event_p);
3625  if (error)
3626  {
3627  PROXY_LOG (PROXY_LOG_MODE_ERROR, "Failed to response to the client. " "(error=%d). context(%s). evnet(%s).",
3628  error, proxy_str_context (ctx_p), proxy_str_event (event_p));
3629 
3630  /* statement waiters may be woken up when freeing context */
3631  goto free_context;
3632  }
3633  event_p = NULL;
3634 
3635  assert (ctx_p->prepared_stmt != NULL);
3636  stmt_p = ctx_p->prepared_stmt;
3637 
3638  if (stmt_p)
3639  {
3640  /*
3641  * there must be no context sharing this statement at this time.
3642  * so, we can free statement.
3643  */
3644  shard_stmt_free (ctx_p->prepared_stmt);
3645  }
3646  ctx_p->prepared_stmt = NULL;
3647 
3648  assert (ctx_p->waiting_event == NULL);
3649  EXIT_FUNC ();
3650  return 0;
3651  }
3652 
3653  stmt_p = ctx_p->prepared_stmt;
3654  if (stmt_p == NULL)
3655  {
3656  PROXY_LOG (PROXY_LOG_MODE_ERROR, "Invalid statement. Statement couldn't be NULL. context(%s).",
3657  proxy_str_context (ctx_p));
3658 
3659  goto free_context;
3660  }
3661 
3662  /* save server handle id for this statement from shard/cas */
3663  srv_h_id = error_ind;
3664  error = shard_stmt_add_srv_h_id_for_shard_cas (stmt_p->stmt_h_id, ctx_p->shard_id, ctx_p->cas_id, srv_h_id);
3665  if (error < 0)
3666  {
3668  "Failed to save server handle " "id to the statement " "(srv_h_id:%d). statement(%s). context(%s). ",
3669  srv_h_id, shard_str_stmt (stmt_p), proxy_str_context (ctx_p));
3670  goto free_context;
3671  }
3672 
3673  ctx_p->prepared_stmt = NULL;
3674 
3675  /* replace server handle id */
3676  stmt_h_id_n = htonl (stmt_p->stmt_h_id);
3677  srv_h_id_pos = (char *) (response_p + srv_h_id_offset);
3678  memcpy (srv_h_id_pos, &stmt_h_id_n, NET_SIZE_INT);
3679 
3680  PROXY_LOG (PROXY_LOG_MODE_SHARD_DETAIL, "Replace server handle id. " "(requested_srv_h_id:%d, saved_srv_h_id:%d).",
3681  srv_h_id, stmt_p->stmt_h_id);
3682 
3683  if (proxy_info_p->ignore_shard_hint == OFF)
3684  {
3685  switch (ctx_p->stmt_hint_type)
3686  {
3687  case HT_NONE:
3688  proxy_info_p->num_hint_none_queries_processed++;
3689  break;
3690  case HT_KEY:
3691  proxy_info_p->num_hint_key_queries_processed++;
3692  break;
3693  case HT_ID:
3694  proxy_info_p->num_hint_id_queries_processed++;
3695  break;
3696  case HT_ALL:
3697  proxy_info_p->num_hint_all_queries_processed++;
3698  break;
3699  default:
3700  PROXY_LOG (PROXY_LOG_MODE_NOTICE, "Unsupported statement hint type. " "(hint_type:%d). context(%s).",
3701  ctx_p->stmt_hint_type, proxy_str_context (ctx_p));
3702  }
3703  }
3704  else
3705  {
3706  proxy_info_p->num_hint_none_queries_processed++;
3707  }
3708 
3709  ctx_p->stmt_hint_type = HT_INVAL;
3711 
3712  error = proxy_send_response_to_client (ctx_p, event_p);
3713  if (error)
3714  {
3715  PROXY_LOG (PROXY_LOG_MODE_ERROR, "Failed to response to the client. " "(error=%d). context(%s). evnet(%s).",
3716  error, proxy_str_context (ctx_p), proxy_str_event (event_p));
3717 
3718  goto free_context;
3719  }
3720  event_p = NULL;
3721 
3722  EXIT_FUNC ();
3723  return 0;
3724 
3725 free_context:
3726  if (event_p)
3727  {
3728  proxy_event_free (event_p);
3729  event_p = NULL;
3730  }
3731 
3732  ctx_p->free_context = true;
3733 
3734  EXIT_FUNC ();
3735  return -1;
3736 }
3737 
3738 int
3740 {
3741  int error = 0;
3742  int error_code = 0;
3743  int error_ind = 0;
3744  char *response_p = NULL;
3745  char *driver_info = NULL;
3746  T_BROKER_VERSION client_version;
3747  struct timeval client_start_time;
3748  T_PROXY_EVENT *new_event_p = NULL;
3749 
3750  gettimeofday (&client_start_time, NULL);
3751 
3752  if (ctx_p->waiting_event)
3753  {
3755  ctx_p->waiting_event = NULL;
3756  }
3757 
3758  response_p = event_p->buffer.data;
3759  driver_info = proxy_get_driver_info_by_ctx (ctx_p);
3760 
3761  if (event_p->buffer.length > MSG_HEADER_SIZE)
3762  {
3763  error_ind = proxy_check_cas_error (response_p);
3764  assert (error_ind < 0);
3765 
3766  client_version = CAS_MAKE_PROTO_VER (driver_info);
3767  error_code = proxy_get_cas_error_code (response_p, client_version);
3768 
3769  PROXY_LOG (PROXY_LOG_MODE_NOTICE, "CAS response error. " "(error_ind:%d, error_code:%d). context(%s).", error_ind,
3770  error_code, proxy_str_context (ctx_p));
3771 
3772  error = proxy_send_response_to_client (ctx_p, event_p);
3773  if (error)
3774  {
3775  PROXY_LOG (PROXY_LOG_MODE_ERROR, "Failed to response to the client. " "(error=%d). context(%s). evnet(%s).",
3776  error, proxy_str_context (ctx_p), proxy_str_event (event_p));
3777 
3778  goto free_context;
3779  }
3780  }
3781  else
3782  {
3783  /* send dbinfo_ok to the client */
3784  new_event_p =
3787  if (new_event_p == NULL)
3788  {
3789  goto free_context;
3790  }
3791 
3792  if (event_p)
3793  {
3794  proxy_event_free (event_p);
3795  event_p = NULL;
3796  }
3797 
3798  error = proxy_send_response_to_client (ctx_p, new_event_p);
3799  if (error)
3800  {
3801  PROXY_LOG (PROXY_LOG_MODE_ERROR, "Failed to response to the client. " "(error=%d). context(%s). evnet(%s).",
3802  error, proxy_str_context (ctx_p), proxy_str_event (new_event_p));
3803 
3804  goto free_context;
3805  }
3806  }
3807 
3808  if (error_ind < 0)
3809  {
3810  ctx_p->free_on_client_io_write = true;
3811  }
3812  else
3813  {
3814  ctx_p->is_connected = true;
3816  }
3817 
3818  return 0;
3819 
3820 free_context:
3821  if (event_p)
3822  {
3823  proxy_event_free (event_p);
3824  event_p = NULL;
3825  }
3826 
3827  if (new_event_p)
3828  {
3829  proxy_event_free (new_event_p);
3830  }
3831 
3832  ctx_p->free_context = true;
3833  return -1;
3834 }
3835 
3836 int
3838 {
3839  int error;
3840 
3841  ENTER_FUNC ();
3842 
3843  error = proxy_send_response_to_client (ctx_p, event_p);
3844  if (error)
3845  {
3846  PROXY_LOG (PROXY_LOG_MODE_ERROR, "Failed to response to the client. " "(error=%d). context(%s). evnet(%s).",
3847  error, proxy_str_context (ctx_p), proxy_str_event (event_p));
3848 
3849  goto free_context;
3850  }
3851  event_p = NULL;
3852 
3853  EXIT_FUNC ();
3854  return 0;
3855 
3856 free_context:
3857  if (event_p)
3858  {
3859  proxy_event_free (event_p);
3860  event_p = NULL;
3861  }
3862 
3863  ctx_p->free_context = true;
3864 
3865  EXIT_FUNC ();
3866  return -1;
3867 }
3868 
3869 /* disconnect event */
3870 int
3872 {
3873  int error = 0;
3874 
3875  ENTER_FUNC ();
3876 
3877  /*
3878  * we need not send abort while waiting for a response.
3879  */
3880  if (ctx_p->is_in_tran && !IS_VALID_CAS_FC (ctx_p->func_code))
3881  {
3882  ctx_p->free_on_end_tran = true;
3883  ctx_p->func_code = CAS_FC_END_TRAN;
3884 
3885  error =
3888  if (error < 0)
3889  {
3890  error = -1;
3891  goto free_context;
3892  }
3893  else if (error > 0)
3894  {
3895  assert (false);
3896  error = -1;
3897  goto free_context;
3898  }
3899 
3900  EXIT_FUNC ();
3901  return 0;
3902  }
3903 
3904 free_context:
3905  ctx_p->free_context = true;
3906 
3907  EXIT_FUNC ();
3908  return -1;
3909 }
3910 
3911 int
3913 {
3914  /* nothing to do */
3915  return 0;
3916 }
3917 
3918 static bool
3919 proxy_is_invalid_statement (int error_ind, int error_code, char *driver_info, T_BROKER_VERSION client_version)
3920 {
3921  int new_error_code = 0;
3922 
3923  new_error_code = proxy_convert_error_code (error_ind, error_code, driver_info, client_version, PROXY_CONV_ERR_TO_NEW);
3924 
3925  if (client_version < CAS_MAKE_VER (8, 3, 0) && new_error_code == CAS_ER_STMT_POOLING)
3926  {
3927  return true;
3928  }
3929  else if (error_ind == CAS_ERROR_INDICATOR && new_error_code == CAS_ER_STMT_POOLING)
3930  {
3931  return true;
3932  }
3933 
3934  return false;
3935 }
3936 
3937 static bool
3938 proxy_has_different_column_info (const char *r1, size_t r1_len, const char *r2, size_t r2_len)
3939 {
3940  int ignore_size = MSG_HEADER_SIZE + /* srv_h_id */ NET_SIZE_INT
3941  + /* result_cache_lifetime */ NET_SIZE_INT;
3942 
3943  r1 += ignore_size;
3944  r2 += ignore_size;
3945 
3946  if (r1_len != r2_len)
3947  {
3948  return true;
3949  }
3950 
3951  return (memcmp (r1, r2, r1_len - ignore_size) != 0) ? true : false;
3952 }
int proxy_io_set_established_by_ctx(T_PROXY_CONTEXT *ctx_p)
T_PROXY_EVENT * proxy_event_new_with_req(char *driver_info, unsigned int type, int from, T_PROXY_EVENT_FUNC req_func)
int fn_proxy_client_not_supported(T_PROXY_CONTEXT *ctx_p, T_PROXY_EVENT *event_p, int argc, char **argv)
enum sp_hint_type SP_HINT_TYPE
Definition: shard_parser.h:59
T_SHARD_QUEUE cli_ret_q
int(* T_PROXY_EVENT_FUNC)(char *driver_info, char **buffer)
int fn_proxy_client_con_close(T_PROXY_CONTEXT *ctx_p, T_PROXY_EVENT *event_p, int argc, char **argv)
T_CONTEXT_STMT * proxy_context_add_stmt(T_PROXY_CONTEXT *ctx_p, T_SHARD_STMT *stmt_p)
void proxy_set_force_out_tran(char *msg)
int fn_proxy_client_set_db_parameter(T_PROXY_CONTEXT *ctx_p, T_PROXY_EVENT *event_p, int argc, char **argv)
#define MSG_HEADER_SIZE
Definition: cas_protocol.h:114
static int proxy_get_shard_id(T_SHARD_STMT *stmt_p, void **argv, T_SHARD_KEY_RANGE **range_p_out)
void proxy_event_free(T_PROXY_EVENT *event_p)
int proxy_io_make_shard_info(char *driver_info, char **buffer)
int fn_proxy_client_check_cas(T_PROXY_CONTEXT *ctx_p, T_PROXY_EVENT *event_p, int argc, char **argv)
T_SHARD_QUEUE waitq
void proxy_event_set_buffer(T_PROXY_EVENT *event_p, char *data, unsigned int size)
int fn_proxy_cas_check_cas(T_PROXY_CONTEXT *ctx_p, T_PROXY_EVENT *event_p)
int proxy_send_request_to_cas(T_PROXY_CONTEXT *ctx_p, T_PROXY_EVENT *event_p, int func_code)
char * proxy_str_cas_io(T_CAS_IO *cas_io_p)
INT64 num_no_hint_queries_requested
Definition: broker_shm.h:432
char ignore_shard_hint
Definition: broker_shm.h:483
#define PROXY_DEBUG_LOG(fmt, args...)
#define CAS_PROTO_MAKE_VER(VER)
Definition: cas_protocol.h:284
int argc
Definition: dynamic_load.c:951
INT64 integer
Definition: shard_parser.h:114
unsigned int htonl(unsigned int from)
T_CLIENT_IO * proxy_client_io_find_by_ctx(int client_id, int ctx_cid, unsigned int ctx_uid)
INT64 num_hint_id_queries_processed
Definition: broker_shm.h:497
int fn_proxy_client_get_db_parameter(T_PROXY_CONTEXT *ctx_p, T_PROXY_EVENT *event_p, int argc, char **argv)
int proxy_cas_io_write(T_CAS_IO *cas_io_p, T_PROXY_EVENT *event_p)
int fn_proxy_cas_execute_array(T_PROXY_CONTEXT *ctx_p, T_PROXY_EVENT *event_p)
static int proxy_send_prepared_stmt_to_client(T_PROXY_CONTEXT *ctx_p, T_SHARD_STMT *stmt_p)
char database_user[SRV_CON_DBUSER_SIZE]
#define IS_VALID_CAS_FC(fc)
Definition: cas_protocol.h:274
int get_msg_length(char *buffer)
int fn_proxy_cas_prepare(T_PROXY_CONTEXT *ctx_p, T_PROXY_EVENT *event_p)
static void proxy_set_wait_timeout(T_PROXY_CONTEXT *ctx_p, int query_timeout)
T_PROXY_EVENT * proxy_event_dup(T_PROXY_EVENT *event_p)
int shard_stmt_add_srv_h_id_for_shard_cas(int stmt_h_id, int shard_id, int cas_id, int srv_h_id)
SP_VALUE value
Definition: shard_parser.h:126
int shard_queue_enqueue(T_SHARD_QUEUE *q, void *v)
#define EXIT_FUNC()
#define PROXY_EVENT_FROM_CAS
T_SHARD_KEY_RANGE * shard_metadata_find_shard_range(T_SHM_SHARD_KEY *shm_key_p, const char *key, unsigned int hash_res)
int fn_proxy_client_prepare_and_execute(T_PROXY_CONTEXT *ctx_p, T_PROXY_EVENT *event_p, int argc, char **argv)
T_PROXY_EVENT * proxy_event_new_with_rsp_ex(char *driver_info, unsigned int type, int from, T_PROXY_EVENT_FUNC_EX resp_func, void *argv)
char * proxy_get_driver_info_by_ctx(T_PROXY_CONTEXT *ctx_p)
T_IO_BUFFER buffer
unsigned int index
int shard_queue_ordered_enqueue(T_SHARD_QUEUE *q, void *v, SHARD_COMP_FN comp_fn)
int fn_proxy_client_cursor(T_PROXY_CONTEXT *ctx_p, T_PROXY_EVENT *event_p, int argc, char **argv)
SP_HINT_TYPE hint_type
Definition: shard_parser.h:122
T_SHM_SHARD_CONN_STAT * shard_shm_get_shard_stat(T_PROXY_INFO *proxy_info_p, int idx)
Definition: shard_shm.c:463
int proxy_io_make_get_db_version(char *driver_info, char **buffer)
static int proxy_send_request_to_cas_with_new_event(T_PROXY_CONTEXT *ctx_p, unsigned int type, int from, T_PROXY_EVENT_FUNC req_func)
#define CAS_MAKE_VER(MAJOR, MINOR, PATCH)
Definition: cas_protocol.h:304
void proxy_context_set_error(T_PROXY_CONTEXT *ctx_p, int error_ind, int error_code)
T_PROXY_INFO * proxy_info_p
Definition: shard_proxy.c:48
void net_arg_get_int(int *value, void *arg)
Definition: cas_net_buf.c:484
T_SHARD_STMT * shard_stmt_new_schema_info(int ctx_cid, unsigned int ctx_uid)
int proxy_send_response_to_client(T_PROXY_CONTEXT *ctx_p, T_PROXY_EVENT *event_p)
int fn_proxy_get_shard_info(T_PROXY_CONTEXT *ctx_p, T_PROXY_EVENT *event_p, int argc, char **argv)
char * shard_str_sqls(char *sql)
SP_PARSER_HINT * sp_get_first_hint(SP_PARSER_CTX *parser_p)
Definition: shard_parser.c:155
#define ENTER_FUNC()
T_PROXY_EVENT * proxy_event_new_with_error(char *driver_info, unsigned int type, int from, int(*err_func)(char *driver_info, char **buffer, int error_ind, int error_code, const char *error_msg, char is_in_tran), int error_ind, int error_code, const char *error_msg, char is_in_tran)
void proxy_context_free_stmt(T_PROXY_CONTEXT *ctx_p)
T_SHM_SHARD_KEY_RANGE_STAT stat[SHARD_KEY_RANGE_MAX]
Definition: broker_shm.h:450
T_SHARD_STMT * prepared_stmt
void * reply_buffer
void proxy_context_set_in_tran(T_PROXY_CONTEXT *ctx_p, int shard_id, int cas_id)
bool proxy_handler_is_cas_in_tran(int shard_id, int cas_id)
#define CAS_MAKE_PROTO_VER(DRIVER_INFO)
Definition: cas_protocol.h:307
T_SHARD_STMT * shard_stmt_new_prepared_stmt(char *sql_stmt, int ctx_cid, unsigned int ctx_uid, T_BROKER_VERSION client_version)
int proxy_check_cas_error(char *read_msg)
int proxy_convert_error_code(int error_ind, int error_code, char *driver_info, T_BROKER_VERSION client_version, bool to_new)
char * proxy_str_event(T_PROXY_EVENT *event_p)
#define assert(x)
void proxy_unset_force_out_tran(char *msg)
T_SHM_SHARD_KEY * shm_key_p
Definition: shard_proxy.c:51
INT64 num_hint_all_queries_processed
Definition: broker_shm.h:498
SP_VALUE_TYPE type
Definition: shard_parser.h:106
int fn_proxy_client_execute(T_PROXY_CONTEXT *ctx_p, T_PROXY_EVENT *event_p, int argc, char **argv)
int fn_proxy_cas_conn_error(T_PROXY_CONTEXT *ctx_p)
int fn_proxy_client_conn_error(T_PROXY_CONTEXT *ctx_p)
INT64 num_hint_err_queries_processed
Definition: broker_shm.h:499
SP_PARSER_CTX * parser
int shard_stmt_get_hint_type(T_SHARD_STMT *stmt_p)
void proxy_waiter_free(T_WAIT_CONTEXT *waiter)
static T_SHARD_KEY_RANGE * proxy_get_range_by_param(SP_PARSER_HINT *hint_p, void **argv)
void net_arg_get_short(short *value, void *arg)
Definition: cas_net_buf.c:495
static int proxy_cas_execute_internal(T_PROXY_CONTEXT *ctx_p, T_PROXY_EVENT *event_p)
int num_shard_conn
Definition: broker_shm.h:514
int isolation_level
Definition: broker_shm.h:406
int proxy_waiter_comp_fn(const void *arg1, const void *arg2)
INT64 stmt_waiter_count
Definition: broker_shm.h:480
#define CAS_USE_DEFAULT_DB_PARAM
Definition: cas_common.h:43
void net_arg_get_str(char **value, int *size, void *arg)
Definition: cas_net_buf.c:528
unsigned int ctx_uid
int make_header_info(T_NET_BUF *net_buf, MSG_HEADER *client_msg_header)
T_SHARD_STMT * shard_stmt_new_exclusive(char *sql_stmt, int ctx_cid, unsigned int ctx_uid, T_BROKER_VERSION client_version)
int shard_stmt_set_hint_list(T_SHARD_STMT *stmt_p)
void * request_buffer
char appl_server
Definition: broker_shm.h:470
#define CAS_ERROR_INDICATOR
Definition: cas.h:39
T_SHARD_U_TYPE
Definition: shard_key.h:45
void shard_stmt_set_status_invalid(int stmt_h_id)
bool sp_is_hint_static(SP_PARSER_CTX *parser_p)
Definition: shard_parser.c:131
int proxy_io_make_ex_get_lock_timeout(char *driver_info, char **buffer, void *argv)
#define NULL
Definition: freelistheap.h:34
char * shard_str_stmt(T_SHARD_STMT *stmt_p)
T_CLIENT_INFO * shard_shm_get_client_info(T_PROXY_INFO *proxy_info_p, int idx)
Definition: shard_shm.c:448
int shard_stmt_save_prepare_request(T_SHARD_STMT *stmt_p, bool has_shard_val_hint, char **prepare_req, int *prepare_req_len, char *argv_sql_stmt, char *argv_remainder, char *orgzd_sql)
T_PROXY_CONTEXT proxy_Context
void net_arg_get_bigint(DB_BIGINT *value, void *arg)
Definition: cas_net_buf.c:472
static int proxy_client_execute_internal(T_PROXY_CONTEXT *ctx_p, T_PROXY_EVENT *event_p, int argc, char **argv, char _func_code, int query_timeout, int bind_value_index)
#define FREE_MEM(PTR)
Definition: cas_common.h:58
#define SHARD_TEMPORARY_UNAVAILABLE
void net_arg_get_size(int *size, void *arg)
Definition: cas_net_buf.c:451
INT64 num_hint_none_queries_processed
Definition: broker_shm.h:495
char * proxy_dup_msg(char *msg)
int fn_proxy_client_prepare(T_PROXY_CONTEXT *ctx_p, T_PROXY_EVENT *event_p, int argc, char **argv)
int proxy_find_shard_id_by_hint_value(SP_VALUE *value_p, const char *key_column)
INT64 num_hint_key_queries_requested
Definition: broker_shm.h:430
int proxy_io_make_error_msg(char *driver_info, char **buffer, int error_ind, int error_code, const char *error_msg, char is_in_tran)
#define PROXY_CONV_ERR_TO_NEW
T_CAS_IO * proxy_cas_alloc_by_ctx(int client_id, int shard_id, int cas_id, int ctx_cid, unsigned int ctx_uid, int timeout, int func_code)
int fn_proxy_cas_relay_only(T_PROXY_CONTEXT *ctx_p, T_PROXY_EVENT *event_p)
int make_net_buf(T_NET_BUF *net_buf, int size)
int proxy_io_make_set_db_parameter_ok(char *driver_info, char **buffer)
time_t res_time
Definition: broker_shm.h:403
T_SHARD_STMT * shard_stmt_find_by_sql(char *sql_stmt, const char *db_user, T_BROKER_VERSION client_version)
char * shard_stmt_rewrite_sql(bool *has_shard_val_hint, char *sql_stmt, char appl_server)
static void error(const char *msg)
Definition: gencat.c:331
void net_arg_put_int(void *arg, int *value)
Definition: cas_net_buf.c:778
static void proxy_update_shard_stats_without_hint(int shard_id)
INT64 num_request_stmt_in_pool
Definition: broker_shm.h:504
int fn_proxy_client_cursor_close(T_PROXY_CONTEXT *ctx_p, T_PROXY_EVENT *event_p, int argc, char **argv)
int fn_proxy_cas_fetch(T_PROXY_CONTEXT *ctx_p, T_PROXY_EVENT *event_p)
static void proxy_update_shard_stats(T_SHARD_STMT *stmt_p, T_SHARD_KEY_RANGE *range_p)
T_SHARD_KEY shard_key[MAX_SHARD_KEY]
Definition: broker_shm.h:270
INT64 num_request_stmt
Definition: broker_shm.h:503
int proxy_client_io_write(T_CLIENT_IO *cli_io_p, T_PROXY_EVENT *event_p)
int fn_proxy_client_schema_info(T_PROXY_CONTEXT *ctx_p, T_PROXY_EVENT *event_p, int argc, char **argv)
void shard_stmt_check_waiter_and_wakeup(T_SHARD_STMT *stmt_p)
char * proxy_str_context(T_PROXY_CONTEXT *ctx_p)
int proxy_io_make_check_cas_ok(char *driver_info, char **buffer)
int proxy_io_make_ex_get_isolation_level(char *driver_info, char **buffer, void *argv)
INT64 num_hint_id_queries_requested
Definition: broker_shm.h:431
int fn_proxy_cas_execute(T_PROXY_CONTEXT *ctx_p, T_PROXY_EVENT *event_p)
#define MAX_SHARD_CONN
Definition: broker_shm.h:145
T_SHM_SHARD_KEY_STAT * shard_shm_get_key_stat(T_PROXY_INFO *proxy_info_p, int idx)
Definition: shard_shm.c:478
const char ** argv
Definition: dynamic_load.c:952
int fn_proxy_cas_prepare_and_execute(T_PROXY_CONTEXT *ctx_p, T_PROXY_EVENT *event_p)
T_PROXY_EVENT * waiting_event
#define PROXY_LOG(level, fmt, args...)
char key_column[SHARD_KEY_COLUMN_LEN]
Definition: broker_shm.h:261
int proxy_io_make_client_dbinfo_ok(char *driver_info, char **buffer)
int T_BROKER_VERSION
Definition: cas_protocol.h:342
int query_timeout
Definition: cas.c:160
void shard_stmt_free(T_SHARD_STMT *stmt_p)
#define FALSE
Definition: broker_admin.c:50
#define DOES_CLIENT_UNDERSTAND_THE_PROTOCOL(CLIENT, REQUIRE)
Definition: cas_protocol.h:290
T_PROXY_EVENT * proxy_event_new_with_rsp(char *driver_info, unsigned int type, int from, T_PROXY_EVENT_FUNC resp_func)
int fn_proxy_client_fetch(T_PROXY_CONTEXT *ctx_p, T_PROXY_EVENT *event_p, int argc, char **argv)
int i
Definition: dynamic_load.c:954
int proxy_io_make_end_tran_ok(char *driver_info, char **buffer)
int proxy_get_cas_error_code(char *read_msg, T_BROKER_VERSION client_version)
int fn_proxy_cas_end_tran(T_PROXY_CONTEXT *ctx_p, T_PROXY_EVENT *event_p)
int fn_proxy_client_execute_array(T_PROXY_CONTEXT *ctx_p, T_PROXY_EVENT *event_p, int argc, char **argv)
#define IS_VALID_ISOLATION_LEVEL(isolation_level)
Definition: dbtran_def.h:54
#define NET_SIZE_INT
Definition: cas_network.h:43
T_CONTEXT_STMT * proxy_context_find_stmt(T_PROXY_CONTEXT *ctx_p, int stmt_h_id)
unsigned int ntohl(unsigned int from)
int shard_stmt_find_srv_h_id_for_shard_cas(int stmt_h_id, int shard_id, int cas_id)
T_SHARD_STMT * shard_stmt_find_by_stmt_h_id(int stmt_h_id)
int proxy_send_response_to_client_with_new_event(T_PROXY_CONTEXT *ctx_p, unsigned int type, int from, T_PROXY_EVENT_FUNC resp_func)
void proxy_set_con_status_in_tran(char *msg)
INT64 num_hint_key_queries_processed
Definition: broker_shm.h:496
SP_BIND_TYPE bind_type
Definition: shard_parser.h:123
#define PROXY_INVALID_SHARD
Definition: broker_util.h:38
int proxy_io_make_end_tran_abort(char *driver_info, char **buffer)
int fn_proxy_client_get_db_version(T_PROXY_CONTEXT *ctx_p, T_PROXY_EVENT *event_p, int argc, char **argv)
T_PROXY_EVENT * proxy_event_new(unsigned int type, int from_cas)
SP_PARSER_HINT * sp_get_next_hint(SP_PARSER_HINT *hint_p)
Definition: shard_parser.c:161
int fn_proxy_cas_schema_info(T_PROXY_CONTEXT *ctx_p, T_PROXY_EVENT *event_p)
static bool proxy_is_invalid_statement(int error_ind, int error_code, char *driver_info, T_BROKER_VERSION client_version)
#define PROXY_EVENT_FROM_CLIENT
int proxy_io_make_close_req_handle_out_tran_ok(char *driver_info, char **buffer)
int fn_proxy_client_end_tran(T_PROXY_CONTEXT *ctx_p, T_PROXY_EVENT *event_p, int argc, char **argv)
int fn_proxy_client_close_req_handle(T_PROXY_CONTEXT *ctx_p, T_PROXY_EVENT *event_p, int argc, char **argv)
int proxy_io_make_cursor_close_out_tran_ok(char *driver_info, char **buffer)
void shard_stmt_del_srv_h_id_for_shard_cas(int stmt_h_id, int shard_id, int cas_id)
char * proxy_str_client_io(T_CLIENT_IO *cli_io_p)
#define net_arg_get_char(value, arg)
Definition: cas_net_buf.h:146
T_PROXY_HANDLER proxy_Handler
int proxy_io_make_con_close_ok(char *driver_info, char **buffer)
static bool proxy_has_different_column_info(const char *r1, size_t r1_len, const char *r2, size_t r2_len)
T_WAIT_CONTEXT * proxy_waiter_new(int ctx_cid, unsigned int ctx_uid, int timeout)
#define SHARD_STMT_INVALID_HANDLE_ID