CUBRID Engine  latest
shard_proxy_handler.c
Go to the documentation of this file.
1 /*
2  * Copyright 2008 Search Solution Corporation
3  * Copyright 2016 CUBRID Corporation
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 
20 /*
21  * shard_proxy_handler.c
22  */
23 
24 #ident "$Id$"
25 
26 
27 #include <assert.h>
28 
29 #if defined(WINDOWS)
30 #include "porting.h"
31 #endif /* WINDOWS */
32 
33 #include "broker_config.h"
34 #include "broker_shm.h"
35 #include "shard_proxy_common.h"
36 #include "cas_protocol.h"
37 #include "shard_shm.h"
38 
39 #include "shard_proxy_handler.h"
40 #include "shard_proxy_io.h"
41 #include "shard_proxy_queue.h"
42 #include "shard_parser.h"
43 #include "shard_proxy_function.h"
44 
45 #define PROXY_MAX_IGNORE_TIMER_CHECK 10
46 #define PROXY_TIMER_CHECK_INTERVAL 1 /* sec */
47 
49 extern T_SHM_PROXY *shm_proxy_p;
51 
53 
56 
58 static int proxy_handler_process_cas_error (T_PROXY_EVENT * event_p);
60 static void proxy_handler_process_cas_event (T_PROXY_EVENT * event_p);
66 
67 static void proxy_context_clear (T_PROXY_CONTEXT * ctx_p);
68 static void proxy_context_free_client (T_PROXY_CONTEXT * ctx_p);
69 static void proxy_context_free_shard (T_PROXY_CONTEXT * ctx_p);
70 
71 static int proxy_context_initialize (void);
72 static void proxy_context_destroy (void);
73 
75  fn_proxy_client_end_tran, /* fn_end_tran */
76  fn_proxy_client_prepare, /* fn_prepare */
77  fn_proxy_client_execute, /* fn_execute */
78  fn_proxy_client_get_db_parameter, /* fn_get_db_parameter */
79  fn_proxy_client_set_db_parameter, /* fn_set_db_parameter */
80  fn_proxy_client_close_req_handle, /* fn_close_req_handle */
81  fn_proxy_client_cursor, /* fn_cursor */
82  fn_proxy_client_fetch, /* fn_fetch */
83  fn_proxy_client_schema_info, /* fn_schema_info */
84  fn_proxy_client_not_supported, /* fn_oid_get */
85  fn_proxy_client_not_supported, /* fn_oid_put */
86  fn_proxy_client_not_supported, /* fn_deprecated */
87  fn_proxy_client_not_supported, /* fn_deprecated */
88  fn_proxy_client_not_supported, /* fn_deprecated */
89  fn_proxy_client_get_db_version, /* fn_get_db_version */
90  fn_proxy_client_not_supported, /* fn_get_class_num_objs */
91  fn_proxy_client_not_supported, /* fn_oid */
92  fn_proxy_client_not_supported, /* fn_collection */
93  fn_proxy_client_not_supported, /* fn_next_result */
94  fn_proxy_client_not_supported, /* fn_execute_batch */
95  fn_proxy_client_execute_array, /* fn_execute_array */
96  fn_proxy_client_not_supported, /* fn_cursor_update */
97  fn_proxy_client_not_supported, /* fn_get_attr_type_str */
98  fn_proxy_client_not_supported, /* fn_get_query_info */
99  fn_proxy_client_not_supported, /* fn_deprecated */
100  fn_proxy_client_not_supported, /* fn_savepoint */
101  fn_proxy_client_not_supported, /* fn_parameter_info */
102  fn_proxy_client_not_supported, /* fn_xa_prepare */
103  fn_proxy_client_not_supported, /* fn_xa_recover */
104  fn_proxy_client_not_supported, /* fn_xa_end_tran */
105  fn_proxy_client_con_close, /* fn_con_close */
106  fn_proxy_client_check_cas, /* fn_check_cas */
107  fn_proxy_client_not_supported, /* fn_make_out_rs */
108  fn_proxy_client_not_supported, /* fn_get_generated_keys */
109  fn_proxy_client_not_supported, /* fn_lob_new */
110  fn_proxy_client_not_supported, /* fn_lob_write */
111  fn_proxy_client_not_supported, /* fn_lob_read */
112  fn_proxy_client_not_supported, /* fn_end_session */
113  fn_proxy_client_not_supported, /* fn_get_row_count */
114  fn_proxy_client_not_supported, /* fn_get_last_insert_id */
115  fn_proxy_client_prepare_and_execute, /* fn_prepare_and_execute */
116  fn_proxy_client_cursor_close, /* fn_cursor_close */
117  fn_proxy_get_shard_info /* fn_get_shard_info */
118 };
119 
120 
122  fn_proxy_cas_end_tran, /* fn_end_tran */
123  fn_proxy_cas_prepare, /* fn_prepare */
124  fn_proxy_cas_execute, /* fn_execute */
125  fn_proxy_cas_relay_only, /* fn_get_db_parameter */
126  fn_proxy_cas_relay_only, /* fn_set_db_parameter */
127  fn_proxy_cas_relay_only, /* fn_close_req_handle */
128  fn_proxy_cas_relay_only, /* fn_cursor */
129  fn_proxy_cas_fetch, /* fn_fetch */
130  fn_proxy_cas_schema_info, /* fn_schema_info */
131  fn_proxy_cas_relay_only, /* fn_oid_get */
132  fn_proxy_cas_relay_only, /* fn_oid_put */
133  fn_proxy_cas_relay_only, /* fn_deprecated */
134  fn_proxy_cas_relay_only, /* fn_deprecated */
135  fn_proxy_cas_relay_only, /* fn_deprecated */
136  fn_proxy_cas_relay_only, /* fn_get_db_version */
137  fn_proxy_cas_relay_only, /* fn_get_class_num_objs */
138  fn_proxy_cas_relay_only, /* fn_oid */
139  fn_proxy_cas_relay_only, /* fn_collection */
140  fn_proxy_cas_relay_only, /* fn_next_result */
141  fn_proxy_cas_relay_only, /* fn_execute_batch */
142  fn_proxy_cas_execute_array, /* fn_execute_array */
143  fn_proxy_cas_relay_only, /* fn_cursor_update */
144  fn_proxy_cas_relay_only, /* fn_get_attr_type_str */
145  fn_proxy_cas_relay_only, /* fn_get_query_info */
146  fn_proxy_cas_relay_only, /* fn_deprecated */
147  fn_proxy_cas_relay_only, /* fn_savepoint */
148  fn_proxy_cas_relay_only, /* fn_parameter_info */
149  fn_proxy_cas_relay_only, /* fn_xa_prepare */
150  fn_proxy_cas_relay_only, /* fn_xa_recover */
151  fn_proxy_cas_relay_only, /* fn_xa_end_tran */
152  fn_proxy_cas_relay_only, /* fn_con_close */
153  fn_proxy_cas_check_cas, /* fn_check_cas */
154  fn_proxy_cas_relay_only, /* fn_make_out_rs */
155  fn_proxy_cas_relay_only, /* fn_get_generated_keys */
156  fn_proxy_cas_relay_only, /* fn_lob_new */
157  fn_proxy_cas_relay_only, /* fn_lob_write */
158  fn_proxy_cas_relay_only, /* fn_lob_read */
159  fn_proxy_cas_relay_only, /* fn_end_session */
160  fn_proxy_cas_relay_only, /* fn_get_row_count */
161  fn_proxy_cas_relay_only, /* fn_get_last_insert_id */
162  fn_proxy_cas_prepare_and_execute, /* fn_prepare_and_execute */
163  fn_proxy_cas_relay_only, /* fn_cursor_close */
164  fn_proxy_cas_relay_only /* fn_get_shard_info */
165 };
166 
167 
169 proxy_waiter_new (int ctx_cid, unsigned int ctx_uid, int timeout)
170 {
171  T_WAIT_CONTEXT *n;
172 
173  n = (T_WAIT_CONTEXT *) malloc (sizeof (T_WAIT_CONTEXT));
174  if (n)
175  {
176  n->ctx_cid = ctx_cid;
177  n->ctx_uid = ctx_uid;
178  if (timeout <= 0)
179  {
180  n->expire_time = INT_MAX;
181  }
182  else
183  {
184  n->expire_time = time (NULL) + timeout;
185  }
186  }
187  return n;
188 }
189 
190 void
192 {
193  assert (waiter);
194 
195  FREE_MEM (waiter);
196 }
197 
198 void
199 proxy_waiter_timeout (T_SHARD_QUEUE * waitq, INT64 * counter, int now)
200 {
201  T_PROXY_CONTEXT *ctx_p;
202  T_WAIT_CONTEXT *waiter_p;
203 
204  while (1)
205  {
206  waiter_p = (T_WAIT_CONTEXT *) shard_queue_peek_value (waitq);
207  if (waiter_p == NULL)
208  {
209  break;
210  }
211 
212  if (waiter_p->expire_time >= now)
213  {
214  break;
215  }
216 
217  waiter_p = (T_WAIT_CONTEXT *) shard_queue_dequeue (waitq);
218  assert (waiter_p != NULL);
219 
220  if (counter != NULL && *counter > 0)
221  {
222  (*counter)--;
223  PROXY_DEBUG_LOG ("Waiter timeout. (counter:%d).", *counter);
224  }
225 
226  ctx_p = proxy_context_find (waiter_p->ctx_cid, waiter_p->ctx_uid);
227  if (ctx_p == NULL)
228  {
229  /* context was freed already */
230  proxy_waiter_free (waiter_p);
231  waiter_p = NULL;
232  continue;
233  }
234 
235  proxy_context_timeout (ctx_p);
236 
237  proxy_waiter_free (waiter_p);
238  waiter_p = NULL;
239  }
240 
241  return;
242 }
243 
244 int
245 proxy_waiter_comp_fn (const void *arg1, const void *arg2)
246 {
247  T_WAIT_CONTEXT *l, *r;
248 
249  l = (T_WAIT_CONTEXT *) arg1;
250  r = (T_WAIT_CONTEXT *) arg2;
251 
252  return (l->expire_time - r->expire_time);
253 }
254 
255 bool
256 proxy_handler_is_cas_in_tran (int shard_id, int cas_id)
257 {
259 
260  assert (shard_id >= 0);
261  assert (cas_id >= 0);
262 
263  as_info = shard_shm_get_as_info (proxy_info_p, shm_as_p, shard_id, cas_id);
264  if (as_info)
265  {
266  if (as_info->con_status == CON_STATUS_IN_TRAN || as_info->num_holdable_results > 0
267  || as_info->cas_change_mode == CAS_CHANGE_MODE_KEEP)
268  {
269  return true;
270  }
271  else
272  {
273  return false;
274  }
275  }
276 
277  return false;
278 }
279 
280 void
281 proxy_context_set_error (T_PROXY_CONTEXT * ctx_p, int error_ind, int error_code)
282 {
283  assert (ctx_p);
284 
285  assert (ctx_p->error_ind == CAS_NO_ERROR);
286  assert (ctx_p->error_code == CAS_NO_ERROR);
287  assert (ctx_p->error_msg[0] == '\0');
288 
289  ctx_p->error_ind = error_ind;
290  ctx_p->error_code = error_code;
291  ctx_p->error_msg[0] = '\0';
292 
293  return;
294 }
295 
296 void
297 proxy_context_set_error_with_msg (T_PROXY_CONTEXT * ctx_p, int error_ind, int error_code, const char *error_msg)
298 {
299  proxy_context_set_error (ctx_p, error_ind, error_code);
300  snprintf (ctx_p->error_msg, sizeof (ctx_p->error_msg), error_msg);
301 
302  return;
303 }
304 
305 void
307 {
308  assert (ctx_p);
309 
310  ctx_p->error_ind = CAS_NO_ERROR;
311  ctx_p->error_code = CAS_NO_ERROR;
312  ctx_p->error_msg[0] = '\0';
313 
314  return;
315 }
316 
317 int
319 {
320  int error;
321  T_PROXY_EVENT *event_p;
322  T_CLIENT_INFO *client_info_p = NULL;
323  char *driver_info;
324 
325  ENTER_FUNC ();
326 
327  assert (ctx_p->error_ind != CAS_NO_ERROR);
328  assert (ctx_p->error_code != CAS_NO_ERROR);
329 
330  proxy_info_p->num_proxy_error_processed++;
331 
332  /* reset request and response timeout */
333  client_info_p = shard_shm_get_client_info (proxy_info_p, ctx_p->client_id);
334  if (client_info_p != NULL)
335  {
336  shard_shm_init_client_info_request (client_info_p);
337  }
338 
339  driver_info = proxy_get_driver_info_by_ctx (ctx_p);
340 
341  event_p =
343  ctx_p->error_ind, ctx_p->error_code, ctx_p->error_msg, ctx_p->is_client_in_tran);
344  if (event_p == NULL)
345  {
346  PROXY_LOG (PROXY_LOG_MODE_ERROR, "Failed to make error message. " "context(%s).", proxy_str_context (ctx_p));
347  goto error_return;
348  }
349 
350  error = proxy_send_response_to_client (ctx_p, event_p);
351  if (error)
352  {
353  PROXY_LOG (PROXY_LOG_MODE_ERROR, "Failed to send response " "to the client. (error:%d). context(%s). event(%s). ",
354  error, proxy_str_context (ctx_p), proxy_str_event (event_p));
355  goto error_return;
356  }
357  event_p = NULL;
358 
359  EXIT_FUNC ();
360  return 0;
361 
362 error_return:
363 
364  if (event_p)
365  {
366  proxy_event_free (event_p);
367  event_p = NULL;
368  }
369 
370  ctx_p->free_context = true;
371  return -1;
372 }
373 
374 static void
376 {
377  int error = NO_ERROR;
378  int error_ind;
379  char *response_p;
380  T_PROXY_CONTEXT *ctx_p = NULL;
381  T_CLIENT_INFO *client_info_p;
382  char func_code;
383 
384  T_PROXY_CAS_FUNC proxy_cas_fn;
385 
386  ENTER_FUNC ();
387 
388  assert (event_p->from_cas == PROXY_EVENT_FROM_CAS);
389 
390  response_p = event_p->buffer.data;
391  assert (response_p);
392 
393  ctx_p = proxy_context_find (event_p->cid, event_p->uid);
394  if (ctx_p == NULL)
395  {
396  PROXY_LOG (PROXY_LOG_MODE_ERROR, "Unable to find context. event(%s).", proxy_str_event (event_p));
397 
398  proxy_event_free (event_p);
399  event_p = NULL;
400 
401  EXIT_FUNC ();
402  return;
403  }
404 
405  client_info_p = shard_shm_get_client_info (proxy_info_p, ctx_p->client_id);
406 
407  error_ind = proxy_check_cas_error (response_p);
408  if (error_ind < 0)
409  {
410  error = proxy_handler_process_cas_error (event_p);
411  if (error)
412  {
413  proxy_event_free (event_p);
414  event_p = NULL;
415  goto end;
416  }
417  }
418 
419  func_code = ctx_p->func_code;
420  if (func_code <= 0 || func_code >= CAS_FC_MAX)
421  {
422  PROXY_LOG (PROXY_LOG_MODE_ERROR, "Unsupported function code. " "(func_code:%d). context(%s).", func_code,
423  proxy_str_context (ctx_p));
424  /*
425  * 1*) drop unexpected messages from cas ?
426  * 2) free context ?
427  */
428  proxy_event_free (event_p);
429  goto end;
430  }
431 
432  PROXY_DEBUG_LOG ("process cas response. (func_code:%d, context:%s)", func_code, proxy_str_context (ctx_p));
433 
434  proxy_cas_fn = proxy_cas_fn_table[func_code - 1];
435  error = proxy_cas_fn (ctx_p, event_p);
436  if (error)
437  {
438  PROXY_LOG (PROXY_LOG_MODE_ERROR, "Error returned. (CAS function, func_code:%d, error:%d).", func_code, error);
439 
440  event_p = NULL;
441  goto end;
442  }
443  event_p = NULL;
444 
446  ctx_p->is_in_tran = ctx_p->is_cas_in_tran;
447  if (ctx_p->is_in_tran == false)
448  {
449  /* release shard/cas */
450  proxy_cas_release_by_ctx (ctx_p->shard_id, ctx_p->cas_id, ctx_p->cid, ctx_p->uid);
451 
453  ctx_p->prepared_stmt = NULL;
455 
456  if (ctx_p->dont_free_statement == false)
457  {
458  proxy_context_free_stmt (ctx_p);
459  }
460  }
461 
462  PROXY_DEBUG_LOG ("process cas response end. (func_code:%d, context:%s)", func_code, proxy_str_context (ctx_p));
463 
464 end:
466  ctx_p->wait_timeout = proxy_info_p->wait_timeout;
467 
468  if (ctx_p->free_context)
469  {
470  proxy_context_free (ctx_p);
471  }
472 
473  shard_shm_set_client_info_response (client_info_p);
474 
475  EXIT_FUNC ();
476  return;
477 }
478 
479 static int
481 {
482  T_PROXY_CONTEXT *ctx_p;
483 
484  ENTER_FUNC ();
485 
486  assert (event_p);
487 
488  ctx_p = proxy_context_find (event_p->cid, event_p->uid);
489  if (ctx_p == NULL)
490  {
491  PROXY_LOG (PROXY_LOG_MODE_ERROR, "Unable to find context. event(%s).", proxy_str_event (event_p));
492 
493  goto end;
494  }
495 
496  ctx_p->wait_timeout = proxy_info_p->wait_timeout;
497  if (ctx_p->is_in_tran)
498  {
499  if (ctx_p->func_code)
500  {
501  /*
502  * we should relay error event to the client
503  * so, must not call proxy_event_free().
504  */
505  return 0;
506  }
507  else
508  {
509  /* unexpected error, free context */
510  /* it may be session timeout error, ... */
511  ctx_p->free_context = true;
512  }
513  }
514  else
515  {
516  /* discard unexpected error */
517  }
518 
519 end:
520  proxy_event_free (event_p);
521 
522  EXIT_FUNC ();
523  return -1;
524 }
525 
526 static void
528 {
529  int error;
530  T_PROXY_CONTEXT *ctx_p;
531 
532  ENTER_FUNC ();
533 
534  assert (event_p->from_cas == PROXY_EVENT_FROM_CAS);
535 
536  ctx_p = proxy_context_find (event_p->cid, event_p->uid);
537  if (ctx_p == NULL)
538  {
539  PROXY_LOG (PROXY_LOG_MODE_ERROR, "Unable to find context. event(%s).", proxy_str_event (event_p));
540  goto end;
541  }
542 
543  PROXY_DEBUG_LOG ("Process CAS connection error. context(%s).", proxy_str_context (ctx_p));
544 
545  if (ctx_p->is_in_tran)
546  {
547  if (ctx_p->waiting_event && ctx_p->is_cas_in_tran == false
548  && (ctx_p->func_code == CAS_FC_PREPARE || ctx_p->func_code == CAS_FC_EXECUTE
550  {
551  PROXY_DEBUG_LOG ("Context is in_tran status " "and waiting prepare/execute response. "
552  "retransmit reqeust to the CAS. context(%s).", proxy_str_context (ctx_p));
553  /* release this shard/cas */
554  proxy_context_free_shard (ctx_p);
555 
556  /* set transaction status 'out_tran' */
558 
559  /* retry previous request */
560  error = shard_queue_enqueue (&proxy_Handler.cli_ret_q, (void *) ctx_p->waiting_event);
561  if (error)
562  {
563  assert (false);
564 
565  proxy_context_free (ctx_p);
566  goto end;
567  }
568 
569  /* reset waiting event */
570  ctx_p->waiting_event = NULL;
571  }
572  else
573  {
574  PROXY_DEBUG_LOG ("context is in_tran status. " "and function code %d, " "cas_is_in_ran %s, "
575  "waiting_event %p.", ctx_p->func_code, (ctx_p->is_cas_in_tran) ? "in_tran" : "out_tran",
576  ctx_p->waiting_event);
577 
578  /* TODO : send error to the client */
579  proxy_context_free (ctx_p);
580  }
581  }
582  else
583  {
584  PROXY_DEBUG_LOG ("context is out_tran status.");
585  proxy_context_free (ctx_p);
586  }
587 
588 end:
589  proxy_event_free (event_p);
590  event_p = NULL;
591 
592  EXIT_FUNC ();
593  return;
594 }
595 
596 static void
598 {
599  ENTER_FUNC ();
600 
601  assert (event_p);
602 
603  switch (event_p->type)
604  {
607  break;
608 
611  break;
612 
613  default:
614  assert (false);
615  break;
616  }
617 
618  EXIT_FUNC ();
619  return;
620 }
621 
622 static void
624 {
625  int error = NO_ERROR;
626  char *request_p;
627  char *payload;
628  int length;
629  int argc;
630  char **argv = NULL;
631  char func_code;
632  T_PROXY_CONTEXT *ctx_p;
633  T_CLIENT_INFO *client_info_p;
634  T_BROKER_VERSION client_version;
635  char *driver_info;
636 
637  T_PROXY_CLIENT_FUNC proxy_client_fn;
638 
639  ENTER_FUNC ();
640 
641  assert (event_p);
643 
644  ctx_p = proxy_context_find (event_p->cid, event_p->uid);
645  if (ctx_p == NULL)
646  {
647  PROXY_LOG (PROXY_LOG_MODE_ERROR, "Unable to find context. event(%s).", proxy_str_event (event_p));
648 
649  proxy_event_free (event_p);
650 
651  EXIT_FUNC ();
652  return;
653  }
654  client_info_p = shard_shm_get_client_info (proxy_info_p, ctx_p->client_id);
655 
656  request_p = event_p->buffer.data;
657  assert (request_p);
658 
659  payload = request_p + MSG_HEADER_SIZE;
660  length = get_data_length (request_p);
661 
662  argc = net_decode_str (payload, length, &func_code, (void ***) (&argv));
663  if (argc < 0)
664  {
665  error = ER_FAILED;
666 
667  PROXY_LOG (PROXY_LOG_MODE_ERROR, "Invalid argument. (argc:%d). event(%s).", argc, proxy_str_event (event_p));
668 
669  proxy_event_free (event_p);
670  goto end;
671  }
672 
673  if (func_code <= 0 || func_code >= CAS_FC_MAX)
674  {
675  error = ER_FAILED;
676 
677  PROXY_LOG (PROXY_LOG_MODE_ERROR, "Unsupported function code. " "(func_code:%d). context(%s).", func_code,
678  proxy_str_context (ctx_p));
679 
680  proxy_event_free (event_p);
681  goto end;
682  }
683 
684  /* SHARD TODO : fix cci/jdbc */
685  proxy_unset_force_out_tran (request_p);
686 
687  driver_info = proxy_get_driver_info_by_ctx (ctx_p);
688  client_version = CAS_MAKE_PROTO_VER (driver_info);
689  if (DOES_CLIENT_MATCH_THE_PROTOCOL (client_version, PROTOCOL_V2))
690  {
691  switch (func_code)
692  {
695  break;
696  case CAS_FC_CURSOR_CLOSE:
698  break;
699  default:
700  break;
701  }
702  }
703 
704  shard_shm_set_client_info_request (client_info_p, func_code);
705 
706  PROXY_LOG (PROXY_LOG_MODE_DEBUG, "process client request. (func_code:%d, context:%s)", func_code,
707  proxy_str_context (ctx_p));
708 
709  proxy_client_fn = proxy_client_fn_table[func_code - 1];
710  error = proxy_client_fn (ctx_p, event_p, argc, argv);
711  if (error)
712  {
713  PROXY_LOG (PROXY_LOG_MODE_ERROR, "Error returned. (client function, func_code:%d, error:%d).", func_code, error);
714 
715  event_p = NULL;
716  goto end;
717  }
718  event_p = NULL;
719 
720 end:
721 
722  if (ctx_p->error_ind)
723  {
724  proxy_context_send_error (ctx_p);
726  }
727 
728  if (ctx_p->free_context)
729  {
730  proxy_context_free (ctx_p);
731  }
732 
733  if (error)
734  {
735  shard_shm_init_client_info_request (client_info_p);
736  }
737 
738  if (argv)
739  {
740  FREE_MEM (argv);
741  }
742 
743  EXIT_FUNC ();
744 
745  return;
746 }
747 
748 static void
750 {
751  int error;
752  T_PROXY_CONTEXT *ctx_p;
753 
754  ENTER_FUNC ();
755 
757 
758  ctx_p = proxy_context_find (event_p->cid, event_p->uid);
759  if (ctx_p == NULL)
760  {
761  PROXY_LOG (PROXY_LOG_MODE_ERROR, "Unable to find context. event(%s).", proxy_str_event (event_p));
762 
763  proxy_event_free (event_p);
764  event_p = NULL;
765  EXIT_FUNC ();
766  return;
767  }
768 
769  error = fn_proxy_client_conn_error (ctx_p);
770  if (error)
771  {
772  ;
773  }
774 
775  proxy_event_free (event_p);
776  event_p = NULL;
777 
778  if (ctx_p->free_context)
779  {
780  proxy_context_free (ctx_p);
781  }
782 
783  EXIT_FUNC ();
784  return;
785 }
786 
787 static void
789 {
790  T_PROXY_EVENT *waiting_event;
791  T_PROXY_CONTEXT *ctx_p;
792  T_CLIENT_INFO *client_info_p;
793 
794  ENTER_FUNC ();
795 
797 
798  ctx_p = proxy_context_find (event_p->cid, event_p->uid);
799  if (ctx_p == NULL)
800  {
801  proxy_event_free (event_p);
802  event_p = NULL;
803 
804  EXIT_FUNC ();
805  return;
806  }
807 
808  /* set in_tran, shard/cas */
809  proxy_context_set_in_tran (ctx_p, event_p->shard_id, event_p->cas_id);
810 
811  client_info_p = shard_shm_get_client_info (proxy_info_p, ctx_p->client_id);
812  if (client_info_p == NULL)
813  {
815  "Unable to find cilent info in shared memory. " "(context id:%d, context uid:%d)", ctx_p->cid,
816  ctx_p->uid);
817  }
818  else
819  {
821  (proxy_info_p, shm_as_p, event_p->shard_id, event_p->cas_id, client_info_p->client_ip,
822  client_info_p->driver_info, client_info_p->driver_info) == false)
823  {
824 
825  PROXY_LOG (PROXY_LOG_MODE_ERROR, "Unable to find CAS info in shared memory. " "(shard_id:%d, cas_id:%d).",
826  event_p->shard_id, event_p->cas_id);
827  }
828  assert (CAS_MAKE_PROTO_VER (client_info_p->driver_info) != 0);
829  }
830 
831  /* retry */
832  waiting_event = ctx_p->waiting_event;
833  ctx_p->waiting_event = NULL;
834 
835  proxy_handler_process_client_request (waiting_event); /* DO NOT MOVE */
836 
837  proxy_event_free (event_p);
838  event_p = NULL;
839 
840  EXIT_FUNC ();
841  return;
842 }
843 
844 static void
846 {
847  T_PROXY_EVENT *waiting_event;
848  T_PROXY_CONTEXT *ctx_p;
849 
850  ENTER_FUNC ();
851 
853 
854  ctx_p = proxy_context_find (event_p->cid, event_p->uid);
855  if (ctx_p == NULL)
856  {
857  proxy_event_free (event_p);
858  event_p = NULL;
859 
860  EXIT_FUNC ();
861  return;
862  }
863 
864  /* retry */
865  waiting_event = ctx_p->waiting_event;
866  ctx_p->waiting_event = NULL;
867 
868  proxy_handler_process_client_request (waiting_event); /* DO NOT MOVE */
869 
870  proxy_event_free (event_p);
871  event_p = NULL;
872 
873  EXIT_FUNC ();
874  return;
875 }
876 
877 static void
879 {
880  ENTER_FUNC ();
881 
882  assert (event_p);
883 
884  switch (event_p->type)
885  {
888  break;
889 
892  break;
893 
896  break;
897 
900  break;
901 
902  default:
903  assert (false);
904  break;
905  }
906 
907  EXIT_FUNC ();
908  return;
909 }
910 
911 static int
913 {
914  int error;
915  int i, size;
916 
917  T_PROXY_CONTEXT *ctx_p;
918 
919  static bool is_init = false;
920 
921  if (is_init == true)
922  {
923  return 0;
924  }
925 
926  proxy_Context.size = shm_proxy_p->max_context;
927 
928  error = shard_cqueue_initialize (&proxy_Context.freeq, proxy_Context.size);
929  if (error)
930  {
931  PROXY_LOG (PROXY_LOG_MODE_ERROR, "Failed to initialize context free queue. " "(error:%d).", error);
932 
933  goto error_return;
934  }
935 
936  size = proxy_Context.size * sizeof (T_PROXY_CONTEXT);
937  proxy_Context.ent = (T_PROXY_CONTEXT *) malloc (size);
938  if (proxy_Context.ent == NULL)
939  {
941  "Not enough virtual memory. " "Failed to alloc context entries. " "(errno:%d, size:%d).", errno, size);
942  goto error_return;
943  }
944  memset (proxy_Context.ent, 0, size);
945 
946  for (i = 0; i < proxy_Context.size; i++)
947  {
948  ctx_p = &(proxy_Context.ent[i]);
949 
950  proxy_context_clear (ctx_p);
951  ctx_p->cid = i;
952 
953  error = shard_cqueue_enqueue (&proxy_Context.freeq, (void *) ctx_p);
954  if (error)
955  {
956  PROXY_LOG (PROXY_LOG_MODE_ERROR, "Failed to initialize context free queue entries." "(error:%d).", error);
957  goto error_return;
958  }
959  }
960 
961  is_init = true;
962  return 0;
963 
964 error_return:
965  return -1;
966 }
967 
968 static void
970 {
971  shard_cqueue_destroy (&proxy_Context.freeq);
972 
973  FREE_MEM (proxy_Context.ent);
974 
975  return;
976 }
977 
978 #if defined(PROXY_VERBOSE_DEBUG)
979 void
980 proxy_context_dump_stmt (FILE * fp, T_PROXY_CONTEXT * ctx_p)
981 {
982  T_CONTEXT_STMT *stmt_list_p;
983 
984  fprintf (fp, "* STMT_LIST: ");
985  for (stmt_list_p = ctx_p->stmt_list; stmt_list_p; stmt_list_p = stmt_list_p->next)
986  {
987  fprintf (fp, "%-10d ", stmt_list_p->stmt_h_id);
988  }
989  fprintf (fp, "\n");
990 
991  return;
992 }
993 
/*
 * proxy_context_dump_title - print the column header for context dump rows
 *   return: none
 *   fp(in): output stream
 *
 * Note: Writes a single line with one label per value printed by
 *       proxy_context_dump(), using the same field widths, so that the
 *       header lines up with the rows. There are 17 columns; the label for
 *       the free_on_client_io_write flag is included, and the column that
 *       shows is_client_in_tran is labelled accordingly.
 */
void
proxy_context_dump_title (FILE * fp)
{
  fprintf (fp,
           "%-5s %-10s %-10s %-5s %-5s " "%-5s %-5s %-15s " "%-15s %-15s %-15s %-15s " "%-10s %-10s %-10s "
           "%-10s %-10s \n",
           "CID", "UID", "CLIENT", "SHARD", "CAS", "BUSY", "IN_TRAN", "PREPARE_FOR_EXEC", "FREE_ON_END_TRAN",
           "FREE_ON_IO_WRITE", "FREE_CONTEXT", "CLIENT_IN_TRAN", "FUNC_CODE", "STMT_H_ID", "STMT_HINT_TYPE",
           "ERROR_IND", "ERROR_CODE");
}
1004 
1005 
1006 void
1007 proxy_context_dump (FILE * fp, T_PROXY_CONTEXT * ctx_p)
1008 {
1009  fprintf (fp,
1010  "%-5d %-10u %-10d %-5d %-5d " "%-5s %-5s %-15s " "%-15s %-15s %-15s %-15s " "%-10d %-10d %-10d "
1011  "%-10d %-10d \n", ctx_p->cid, ctx_p->uid, ctx_p->client_id, ctx_p->shard_id, ctx_p->cas_id,
1012  (ctx_p->is_busy) ? "YES" : "NO", (ctx_p->is_in_tran) ? "YES" : "NO",
1013  (ctx_p->is_prepare_for_execute) ? "YES" : "NO", (ctx_p->free_on_end_tran) ? "YES" : "NO",
1014  (ctx_p->free_on_client_io_write) ? "YES" : "NO", (ctx_p->free_context) ? "YES" : "NO",
1015  (ctx_p->is_client_in_tran) ? "YES" : "NO", ctx_p->func_code, ctx_p->stmt_h_id, ctx_p->stmt_hint_type,
1016  ctx_p->error_ind, ctx_p->error_code);
1017 
1018  proxy_context_dump_stmt (fp, ctx_p);
1019 
1020  return;
1021 }
1022 
1023 void
1024 proxy_context_dump_all (FILE * fp)
1025 {
1026  T_PROXY_CONTEXT *ctx_p;
1027  int i;
1028 
1029  fprintf (fp, "\n");
1030  fprintf (fp, "* %-20s : %d\n", "SIZE", proxy_Context.size);
1031  fprintf (fp, "\n");
1032 
1033  if (proxy_Context.ent == NULL)
1034  {
1035  return;
1036  }
1037 
1038  proxy_context_dump_title (fp);
1039  for (i = 0; i < proxy_Context.size; i++)
1040  {
1041  ctx_p = &(proxy_Context.ent[i]);
1042  proxy_context_dump (fp, ctx_p);
1043  }
1044  return;
1045 }
1046 
1047 void
1048 proxy_context_print (bool print_all)
1049 {
1050  T_PROXY_CONTEXT *ctx_p;
1051  int i;
1052 
1053  PROXY_LOG (PROXY_LOG_MODE_SCHEDULE_DETAIL, "* CONTEXT *");
1054  PROXY_LOG (PROXY_LOG_MODE_SCHEDULE_DETAIL, "%-20s = %d", "size", proxy_Context.size);
1055 
1056  PROXY_LOG (PROXY_LOG_MODE_SCHEDULE_DETAIL, "%-7s %-5s %-10s %-5s %-5s %-8s " "%-10s %-10s %-10s %-10s %-10s",
1057  "idx", "cid", "uid", "busy", "wait", "in_tran", "client_id", "shard_id", "cas_id", "func_code", "stmt_id");
1058  if (proxy_Context.ent)
1059  {
1060  for (i = 0; i < proxy_Context.size; i++)
1061  {
1062  ctx_p = &(proxy_Context.ent[i]);
1063 
1064  if (!print_all && !ctx_p->is_busy)
1065  {
1066  continue;
1067  }
1068  PROXY_LOG (PROXY_LOG_MODE_SCHEDULE_DETAIL, "[%-5d] %-5d %-10u %-5s %-8s " "%-10d %-10d %-10d %-10d %-10d",
1069  i, ctx_p->cid, ctx_p->uid, (ctx_p->is_busy) ? "YES" : "NO", (ctx_p->is_in_tran) ? "YES" : "NO",
1070  ctx_p->client_id, ctx_p->shard_id, ctx_p->cas_id, ctx_p->func_code, ctx_p->stmt_h_id);
1071  }
1072  }
1073 
1074  return;
1075 }
1076 #endif /* PROXY_VERBOSE_DEBUG */
1077 
1078 char *
1080 {
1081  static char buffer[BUFSIZ];
1082 
1083  if (ctx_p == NULL)
1084  {
1085  return (char *) "-";
1086  }
1087 
1088  snprintf (buffer, sizeof (buffer),
1089  "cid:%d, uid:%u, " "is_busy:%s, is_in_tran:%s, " "is_prepare_for_execute:%s, free_on_end_tran:%s, "
1090  "free_on_client_io_write:%s, " "free_context:%s, is_client_in_tran:%s, " "is_cas_in_tran:%s, "
1091  "waiting_event:(%p, %s), " "func_code:%d, stmt_h_id:%d, stmt_hint_type:%d, " "wait_timeout:%d, "
1092  "client_id:%d, shard_id:%d, cas_id:%d, " "error_ind:%d, error_code:%d, error_msg:[%s] ", ctx_p->cid,
1093  ctx_p->uid, (ctx_p->is_busy) ? "Y" : "N", (ctx_p->is_in_tran) ? "Y" : "N",
1094  (ctx_p->is_prepare_for_execute) ? "Y" : "N", (ctx_p->free_on_end_tran) ? "Y" : "N",
1095  (ctx_p->free_on_client_io_write) ? "Y" : "N", (ctx_p->free_context) ? "Y" : "N",
1096  (ctx_p->is_client_in_tran) ? "Y" : "N", (ctx_p->is_cas_in_tran) ? "Y" : "N", ctx_p->waiting_event,
1097  proxy_str_event (ctx_p->waiting_event), ctx_p->func_code, ctx_p->stmt_h_id, ctx_p->stmt_hint_type,
1098  ctx_p->wait_timeout, ctx_p->client_id, ctx_p->shard_id, ctx_p->cas_id, ctx_p->error_ind, ctx_p->error_code,
1099  (ctx_p->error_msg[0]) ? ctx_p->error_msg : "-");
1100 
1101  return (char *) buffer;
1102 }
1103 
1104 
1105 static void
1107 {
1108  assert (ctx_p);
1109 
1110  ctx_p->uid = 0;
1111  ctx_p->is_busy = false;
1112  ctx_p->is_in_tran = false;
1113  ctx_p->is_prepare_for_execute = false;
1114  ctx_p->free_on_end_tran = false;
1115  ctx_p->free_on_client_io_write = false;
1116  ctx_p->free_context = false;
1117  ctx_p->is_client_in_tran = false;
1118  ctx_p->is_cas_in_tran = false;
1119  ctx_p->waiting_dummy_prepare = false;
1120  ctx_p->dont_free_statement = false;
1121  ctx_p->wait_timeout = 0;
1122 
1123  if (ctx_p->waiting_event)
1124  {
1126  ctx_p->waiting_event = NULL;
1127  }
1128 
1131  ctx_p->stmt_hint_type = HT_INVAL;
1132 
1133  if (ctx_p->prepared_stmt != NULL)
1134  {
1136  {
1137  /* check and wakeup statement waiter */
1139 
1140  /* free statement */
1141  shard_stmt_free (ctx_p->prepared_stmt);
1142  }
1143  else if (ctx_p->prepared_stmt->stmt_type != SHARD_STMT_TYPE_PREPARED)
1144  {
1145  /*
1146  * schema info server handle can't be shared with other context
1147  * so, we can free statement at this time.
1148  */
1149 
1150  shard_stmt_free (ctx_p->prepared_stmt);
1151  }
1152  }
1153  ctx_p->prepared_stmt = NULL;
1154 
1155  ctx_p->is_connected = false;
1156  ctx_p->database_user[0] = '\0';
1157  ctx_p->database_passwd[0] = '\0';
1158 
1159  proxy_context_free_stmt (ctx_p);
1160 
1162  ctx_p->shard_id = PROXY_INVALID_SHARD;
1163  ctx_p->cas_id = PROXY_INVALID_CAS;
1164 
1165  proxy_context_clear_error (ctx_p);
1166  return;
1167 }
1168 
1169 void
1170 proxy_context_set_in_tran (T_PROXY_CONTEXT * ctx_p, int shard_id, int cas_id)
1171 {
1172  assert (ctx_p);
1173  assert (shard_id >= 0);
1174  assert (cas_id >= 0);
1175 
1176  ctx_p->is_in_tran = true;
1177  ctx_p->shard_id = shard_id;
1178  ctx_p->cas_id = cas_id;
1179  ctx_p->dont_free_statement = false;
1180 
1181  return;
1182 }
1183 
1184 void
1186 {
1187  assert (ctx_p);
1188 
1189  ctx_p->is_in_tran = false;
1190  ctx_p->shard_id = PROXY_INVALID_SHARD;
1191  ctx_p->cas_id = PROXY_INVALID_CAS;
1192 
1193  return;
1194 }
1195 
1196 
1199 {
1200  T_PROXY_CONTEXT *ctx_p;
1201  static unsigned int uid = 0;
1202 
1203  ctx_p = (T_PROXY_CONTEXT *) shard_cqueue_dequeue (&proxy_Context.freeq);
1204  if (ctx_p)
1205  {
1206  assert (ctx_p->is_busy == false);
1207  assert (ctx_p->is_in_tran == false);
1208 
1209  proxy_context_clear (ctx_p);
1210  ctx_p->uid = (++uid == 0) ? ++uid : uid;
1211  ctx_p->is_busy = true;
1212  ctx_p->wait_timeout = proxy_info_p->wait_timeout;
1213 
1214  PROXY_LOG (PROXY_LOG_MODE_SHARD_DETAIL, "New context created. context(%s).", proxy_str_context (ctx_p));
1215  }
1216 
1217 #if defined(PROXY_VERBOSE_DEBUG)
1218  proxy_context_print (false);
1219 #endif /* PROXY_VERBOSE_DEBUG */
1220 
1221  return ctx_p;
1222 }
1223 
1224 
1225 static void
1227 {
1228  T_CLIENT_INFO *client_info_p = NULL;
1229 
1230  ENTER_FUNC ();
1231 
1232  assert (ctx_p);
1233 
1234  if (ctx_p->client_id < 0)
1235  {
1236  PROXY_LOG (PROXY_LOG_MODE_ERROR, "Invalid client identifier. " "(client_id:%d). context(%s).", ctx_p->client_id,
1237  proxy_str_context (ctx_p));
1238  EXIT_FUNC ();
1239  return;
1240  }
1241 
1242  client_info_p = shard_shm_get_client_info (proxy_info_p, ctx_p->client_id);
1243  if (client_info_p != NULL)
1244  {
1245  shard_shm_init_client_info (client_info_p);
1246  }
1247 
1248  proxy_client_io_free_by_ctx (ctx_p->client_id, ctx_p->cid, ctx_p->uid);
1249 
1250  EXIT_FUNC ();
1251  return;
1252 }
1253 
1254 static void
1256 {
1257  ENTER_FUNC ();
1258 
1259  assert (ctx_p);
1260 
1261  if (ctx_p->is_in_tran == false)
1262  {
1263  EXIT_FUNC ();
1264  return;
1265  }
1266 
1267  proxy_cas_io_free_by_ctx (ctx_p->shard_id, ctx_p->cas_id, ctx_p->cid, ctx_p->uid);
1268 
1269  EXIT_FUNC ();
1270  return;
1271 }
1272 
1273 void
1275 {
1276  int error;
1277 
1278  ENTER_FUNC ();
1279 
1280  assert (ctx_p);
1281 
1282  proxy_context_free_shard (ctx_p);
1283  proxy_context_free_client (ctx_p);
1284  proxy_context_clear (ctx_p);
1285 
1286  error = shard_cqueue_enqueue (&proxy_Context.freeq, (void *) ctx_p);
1287  if (error)
1288  {
1289  PROXY_LOG (PROXY_LOG_MODE_ERROR, "Failed to queue context to free queue. (error:%d).", error);
1290  assert (false);
1291  }
1292 
1293  EXIT_FUNC ();
1294  return;
1295 }
1296 
#if defined (ENABLE_UNUSED_FUNCTION)
/*
 * proxy_context_free_by_cid - free the busy context identified by (cid, uid)
 *   return: none
 *   cid(in): context slot index
 *   uid(in): unique id the slot must currently carry
 *
 * Note: nothing happens when proxy_context_find () returns NULL.
 */
void
proxy_context_free_by_cid (int cid, unsigned int uid)
{
  T_PROXY_CONTEXT *ctx_p;

  ctx_p = proxy_context_find (cid, uid);
  if (ctx_p != NULL)
    {
      proxy_context_free (ctx_p);
    }
}
#endif /* ENABLE_UNUSED_FUNCTION */
1312 
1314 proxy_context_find (int cid, unsigned int uid)
1315 {
1316  T_PROXY_CONTEXT *ctx_p;
1317 
1318  ctx_p = &(proxy_Context.ent[cid]);
1319 
1320  return (ctx_p->is_busy && ctx_p->uid == uid) ? ctx_p : NULL;
1321 }
1322 
1325 {
1326  T_PROXY_CONTEXT *ctx_p = NULL;
1327  T_CLIENT_IO *cli_io_p = NULL;
1328 
1329  assert (sock_io_p);
1330 
1331  cli_io_p = proxy_client_io_find_by_fd (sock_io_p->id.client_id, sock_io_p->fd);
1332  if (cli_io_p == NULL)
1333  {
1334  proxy_socket_io_delete (sock_io_p->fd);
1335 
1336  PROXY_LOG (PROXY_LOG_MODE_ERROR, "Unable to find client entry. " "(client_id:%d, fd:%d).",
1337  sock_io_p->id.client_id, sock_io_p->fd);
1338  return NULL;
1339  }
1340 
1341  ctx_p = proxy_context_find (cli_io_p->ctx_cid, cli_io_p->ctx_uid);
1342  if (ctx_p == NULL)
1343  {
1344  PROXY_LOG (PROXY_LOG_MODE_ERROR, "Unable to find context. client(%s).", proxy_str_client_io (cli_io_p));
1345 
1346  proxy_client_io_free (cli_io_p);
1347  return NULL;
1348  }
1349 
1350  return ctx_p;
1351 }
1352 
1355 {
1356  T_CONTEXT_STMT *stmt_list_p;
1357 
1358  for (stmt_list_p = ctx_p->stmt_list; stmt_list_p; stmt_list_p = stmt_list_p->next)
1359  {
1360  if (stmt_list_p->stmt_h_id != stmt_h_id)
1361  {
1362  continue;
1363  }
1364 
1365  return stmt_list_p;
1366  }
1367 
1368  return NULL;
1369 }
1370 
1373 {
1374  int error = 0;
1375  T_CONTEXT_STMT *stmt_list_p;
1376 
1377  stmt_list_p = proxy_context_find_stmt (ctx_p, stmt_p->stmt_h_id);
1378  if (stmt_list_p)
1379  {
1380  return stmt_list_p;
1381  }
1382 
1383  stmt_list_p = (T_CONTEXT_STMT *) malloc (sizeof (T_CONTEXT_STMT));
1384  if (stmt_list_p == NULL)
1385  {
1386  PROXY_DEBUG_LOG ("malloc failed. ");
1387  return NULL;
1388  }
1389 
1390  error = shard_stmt_pin (stmt_p);
1391  if (error < 0)
1392  {
1393  FREE_MEM (stmt_list_p);
1394 
1395  return NULL;
1396  }
1397 
1398  stmt_list_p->stmt_h_id = stmt_p->stmt_h_id;
1399 
1400  stmt_list_p->next = ctx_p->stmt_list;
1401  ctx_p->stmt_list = stmt_list_p;
1402 
1403  PROXY_DEBUG_LOG ("add prepared statement to context. " "(context:(%s), stmt:(%s))", proxy_str_context (ctx_p),
1404  shard_str_stmt (stmt_p));
1405 
1406  return stmt_list_p;
1407 }
1408 
1409 void
1411 {
1412  T_CONTEXT_STMT *stmt_list_p, *stmt_list_np;
1413  T_SHARD_STMT *stmt_p;
1414 
1415  for (stmt_list_p = ctx_p->stmt_list; stmt_list_p; stmt_list_p = stmt_list_np)
1416  {
1417  stmt_list_np = stmt_list_p->next;
1418 
1419  stmt_p = shard_stmt_find_by_stmt_h_id (stmt_list_p->stmt_h_id);
1420  if (stmt_p)
1421  {
1422  PROXY_DEBUG_LOG ("remove prepared statement from context. " "(context:(%s), stmt:(%s))",
1423  proxy_str_context (ctx_p), shard_str_stmt (stmt_p));
1424 
1425  shard_stmt_unpin (stmt_p);
1426 
1427  if (stmt_p->num_pinned <= 0 && stmt_p->status == SHARD_STMT_STATUS_INVALID)
1428  {
1429  shard_stmt_free (stmt_p);
1430  }
1431  else if (stmt_p->stmt_type != SHARD_STMT_TYPE_PREPARED)
1432  {
1433  assert (stmt_p->num_pinned == 0);
1434  shard_stmt_free (stmt_p);
1435  }
1436  }
1437 
1438  FREE_MEM (stmt_list_p);
1439  }
1440 
1441  ctx_p->stmt_list = NULL;
1442 
1443  return;
1444 }
1445 
/*
 * proxy_context_timeout - handle a context whose wait has timed out
 *
 * Logs the timeout, resets the context's waiting state, drops its prepared
 * statement reference, and sends an error to the client.
 *
 * NOTE(review): this listing omits the function-name line and several
 * statements (gaps in the embedded line numbers: 1447, 1454, 1461-1462, 1466,
 * 1469, 1489). The comments below describe only the statements that are
 * visible.
 */
1446 void
1448 {
1449  PROXY_LOG (PROXY_LOG_MODE_ERROR, "Context waiter timed out. " "context(%s).", proxy_str_context (ctx_p));
1450 
1451  /* free pending 'prepare/execute' request */
1452  if (ctx_p->waiting_event)
1453  {
/* NOTE(review): listing line 1454 is missing here - presumably the call that frees waiting_event; confirm */
1455  ctx_p->waiting_event = NULL;
1456  }
1457 
/* fall back to the proxy-wide wait timeout and forget the dummy-prepare wait */
1458  ctx_p->waiting_dummy_prepare = false;
1459  ctx_p->wait_timeout = proxy_info_p->wait_timeout;
1460 
1463  ctx_p->stmt_hint_type = HT_INVAL;
1464 
1465  /* if statement owner */
/* NOTE(review): the condition guarding this block (listing line 1466) is missing */
1467  {
1468  /* check and wakeup statement waiter */
1470 
1471  /* free statement */
1472  shard_stmt_free (ctx_p->prepared_stmt);
1473  }
1474  ctx_p->prepared_stmt = NULL;
1475 
1476  /* if shard/cas waiter */
1477  if (ctx_p->is_in_tran == false)
1478  {
/* a context outside a transaction is not expected to still reference a CAS */
1479  if (ctx_p->cas_id != PROXY_INVALID_CAS)
1480  {
1481  assert (false);
1482  PROXY_LOG (PROXY_LOG_MODE_ERROR, "Unexpected transaction status. " "context(%s).", proxy_str_context (ctx_p));
1483 
1484  }
1485  ctx_p->shard_id = PROXY_INVALID_SHARD;
1486  ctx_p->cas_id = PROXY_INVALID_CAS;
1487  }
1488 
/* NOTE(review): the start of this statement (listing line 1489) is missing; only its trailing message argument remains */
1490  "proxy service temporarily unavailable");
/* deliver the error to the client, then clear it from the context */
1491  proxy_context_send_error (ctx_p);
1492  proxy_context_clear_error (ctx_p);
1493 
1494  return;
1495 }
1496 
/*
 * proxy_handler_destroy - destroy the three event queues of proxy_Handler
 *
 * NOTE(review): this listing omits the function-name line (1498) and one
 * statement after the queue teardown (1503).
 */
1497 void
1499 {
1500  shard_queue_destroy (&proxy_Handler.cas_rcv_q);
1501  shard_queue_destroy (&proxy_Handler.cli_ret_q);
1502  shard_queue_destroy (&proxy_Handler.cli_rcv_q);
1504 }
1505 
/*
 * proxy_handler_initialize - one-time setup of the contexts and event queues
 *   return: 0 on success or when already initialized, -1 on failure
 *
 * NOTE(review): this listing omits the function-name line (1507) and the
 * statement under the error_return label (1545).
 */
1506 int
1508 {
1509  int error;
1510  static bool is_init = false;
1511 
/* repeated calls after a successful initialization are no-ops */
1512  if (is_init == true)
1513  {
1514  return 0;
1515  }
1516 
1517  error = proxy_context_initialize ();
1518  if (error)
1519  {
1520  goto error_return;
1521  }
1522 
1523  error = shard_queue_initialize (&proxy_Handler.cas_rcv_q);
1524  if (error)
1525  {
1526  goto error_return;
1527  }
1528 
1529  error = shard_queue_initialize (&proxy_Handler.cli_ret_q);
1530  if (error)
1531  {
1532  goto error_return;
1533  }
1534 
1535  error = shard_queue_initialize (&proxy_Handler.cli_rcv_q);
1536  if (error)
1537  {
1538  goto error_return;
1539  }
1540 
1541  is_init = true;
1542  return 0;
1543 
1544 error_return:
/* NOTE(review): listing line 1545 is missing here - presumably cleanup of whatever was initialized; confirm */
1546 
1547  return -1;
1548 }
1549 
/*
 * proxy_handler_process - drain the handler's three event queues
 *
 * The queues are emptied one after another in a fixed order: cas_rcv_q, then
 * cli_ret_q, then cli_rcv_q.
 *
 * NOTE(review): this listing omits the function-name line (1551) and the body
 * of every loop (1561, 1567, 1573), so what is done with each dequeued
 * message is not visible here.
 */
1550 void
1552 {
1553  T_PROXY_HANDLER *handler_p;
1554  void *msg;
1555 
1556  handler_p = &(proxy_Handler);
1557 
1558  /* process cas response message */
1559  while ((msg = shard_queue_dequeue (&handler_p->cas_rcv_q)) != NULL)
1560  {
1562  }
1563 
1564  /* process client retry message */
1565  while ((msg = shard_queue_dequeue (&handler_p->cli_ret_q)) != NULL)
1566  {
1568  }
1569 
1570  /* process client request message */
1571  while ((msg = shard_queue_dequeue (&handler_p->cli_rcv_q)) != NULL)
1572  {
1574  }
1575 
1576  return;
1577 }
1578 
/*
 * proxy_wakeup_context_by_shard - wake a waiting context with a shard/CAS
 *   return: 0 on success, -1 on failure
 *   waiter_p(in): waiter naming the context (ctx_cid / ctx_uid)
 *   shard_id(in): shard to bind, must be >= 0
 *   cas_id(in): CAS to bind, must be >= 0
 *
 * Binds a CAS io entry to the context, marks the context as in-transaction
 * and queues an event on proxy_Handler.cli_ret_q. On any failure after the
 * context was found, the context is freed.
 *
 * NOTE(review): this listing omits the statement that creates event_p
 * (listing line 1618).
 */
1579 int
1580 proxy_wakeup_context_by_shard (T_WAIT_CONTEXT * waiter_p, int shard_id, int cas_id)
1581 {
1582  int error;
1583  T_CAS_IO *cas_io_p;
1584  T_PROXY_EVENT *event_p = NULL;
1585  T_PROXY_CONTEXT *ctx_p = NULL;
1586 
1587  assert (waiter_p);
1588  assert (shard_id >= 0);
1589  assert (cas_id >= 0);
1590 
1591  ctx_p = proxy_context_find (waiter_p->ctx_cid, waiter_p->ctx_uid);
1592  if (ctx_p == NULL)
1593  {
/* NOTE(review): ctx_uid is printed with %d here but with %u in the debug log below; confirm the intended format */
1594  PROXY_LOG (PROXY_LOG_MODE_ERROR, "Unable to find context. " "(context id:%d, context uid:%d).", waiter_p->ctx_cid,
1595  waiter_p->ctx_uid);
1596  goto error_return;
1597  }
1598 
1599  PROXY_DEBUG_LOG ("wakeup context(cid:%d, uid:%u) by shard(%d)/cas(%d).", ctx_p->cid, ctx_p->uid, shard_id, cas_id);
1600 
1601  cas_io_p =
1602  proxy_cas_alloc_by_ctx (ctx_p->client_id, shard_id, cas_id, waiter_p->ctx_cid, waiter_p->ctx_uid,
1603  ctx_p->wait_timeout, ctx_p->func_code);
1604  if (cas_io_p == NULL)
1605  {
1606  PROXY_DEBUG_LOG ("failed to proxy_cas_alloc_by_ctx. " "(shard_id:%d, cas_id:%d, ctx_cid:%d, ctx_uid:%d", shard_id,
1607  cas_id, waiter_p->ctx_cid, waiter_p->ctx_uid);
1608  goto error_return;
1609  }
/* the allocator can also return SHARD_TEMPORARY_UNAVAILABLE cast to a pointer; that is not expected on this path */
1610  if (cas_io_p == (T_CAS_IO *) SHARD_TEMPORARY_UNAVAILABLE)
1611  {
1612  assert (false);
1613  goto error_return;
1614  }
1615 
/* bind the ids reported by the allocated CAS io entry, not the requested ones */
1616  proxy_context_set_in_tran (ctx_p, cas_io_p->shard_id, cas_io_p->cas_id);
1617 
/* NOTE(review): listing line 1618 (the assignment to event_p) is missing here */
1619  if (event_p == NULL)
1620  {
1621  PROXY_DEBUG_LOG ("failed to proxy_event_new.");
1622  goto error_return;
1623  }
1624  proxy_event_set_context (event_p, waiter_p->ctx_cid, waiter_p->ctx_uid);
1625  proxy_event_set_shard (event_p, cas_io_p->shard_id, cas_io_p->cas_id);
1626 
1627  error = shard_queue_enqueue (&proxy_Handler.cli_ret_q, (void *) event_p);
1628  if (error)
1629  {
1630  assert (false);
1631 
/* the event was never queued, so it is released here */
1632  proxy_event_free (event_p);
1633  event_p = NULL;
1634 
1635  PROXY_DEBUG_LOG ("failed to shard_queue_enqueue.");
1636  goto error_return;
1637  }
1638 
1639  return 0;
1640 
1641 error_return:
/* ctx_p is still NULL when the lookup itself failed */
1642  if (ctx_p)
1643  {
1644  proxy_context_free (ctx_p);
1645  }
1646  return -1;
1647 }
1648 
/*
 * proxy_wakeup_context_by_statement - queue a wakeup event for a waiter
 *   return: 0 on success, -1 when the event cannot be created or queued
 *
 * The event carries the waiter's context cid/uid and is put on
 * proxy_Handler.cli_ret_q.
 *
 * NOTE(review): this listing omits the function-name line (1650) and the
 * statement that creates event_p (1657).
 */
1649 int
1651 {
1652  int error;
1653  T_PROXY_EVENT *event_p;
1654 
1655  assert (waiter_p);
1656 
/* NOTE(review): listing line 1657 (the assignment to event_p) is missing here */
1658  if (event_p == NULL)
1659  {
1660  PROXY_DEBUG_LOG ("failed to proxy_event_new.");
1661  return -1;
1662  }
1663  proxy_event_set_context (event_p, waiter_p->ctx_cid, waiter_p->ctx_uid);
1664 
1665  error = shard_queue_enqueue (&proxy_Handler.cli_ret_q, (void *) event_p);
1666  if (error)
1667  {
/* NOTE(review): event_p is not passed to proxy_event_free () on this path, unlike proxy_wakeup_context_by_shard - looks like a leak; confirm */
1668  PROXY_DEBUG_LOG ("failed to shard_queue_enqueue.");
1669  assert (false);
1670  return -1;
1671  }
1672 
1673  return 0;
1674 }
1675 
1676 T_PROXY_EVENT *
1677 proxy_event_new (unsigned int type, int from_cas)
1678 {
1679  T_PROXY_EVENT *event_p;
1680 
1681  event_p = (T_PROXY_EVENT *) malloc (sizeof (T_PROXY_EVENT));
1682  if (event_p)
1683  {
1684  event_p->type = type;
1685  event_p->from_cas = from_cas;
1686 
1687  event_p->cid = PROXY_INVALID_CONTEXT;
1688  event_p->uid = 0;
1689  event_p->shard_id = PROXY_INVALID_SHARD;
1690  event_p->cas_id = PROXY_INVALID_CAS;
1691 
1692  memset (((void *) &event_p->buffer), 0, sizeof (event_p->buffer));
1693  }
1694 
1695  return event_p;
1696 }
1697 
1698 T_PROXY_EVENT *
1700 {
1701  T_PROXY_EVENT *new_event_p;
1702 
1703  new_event_p = (T_PROXY_EVENT *) malloc (sizeof (T_PROXY_EVENT));
1704  if (new_event_p)
1705  {
1706  memcpy ((void *) new_event_p, (void *) event_p, offsetof (T_PROXY_EVENT, buffer));
1707 
1708  new_event_p->buffer.length = event_p->buffer.length;
1709  new_event_p->buffer.offset = event_p->buffer.offset;
1710  new_event_p->buffer.data = (char *) malloc (event_p->buffer.length * sizeof (char));
1711  if (new_event_p->buffer.data == NULL)
1712  {
1713  FREE_MEM (new_event_p);
1714  return NULL;
1715  }
1716  memcpy ((void *) new_event_p->buffer.data, (void *) event_p->buffer.data, event_p->buffer.length);
1717  }
1718 
1719  return new_event_p;
1720 }
1721 
1722 T_PROXY_EVENT *
1723 proxy_event_new_with_req (char *driver_info, unsigned int type, int from, T_PROXY_EVENT_FUNC req_func)
1724 {
1725  T_PROXY_EVENT *event_p;
1726  char *msg = NULL;
1727  int length;
1728 
1729  event_p = proxy_event_new (type, from);
1730  if (event_p == NULL)
1731  {
1732  return NULL;
1733  }
1734 
1735  length = req_func (driver_info, &msg);
1736  if (length <= 0)
1737  {
1738  proxy_event_free (event_p);
1739  return NULL;
1740  }
1741 
1742  proxy_event_set_buffer (event_p, msg, length);
1743 
1744  return event_p;
1745 }
1746 
1747 T_PROXY_EVENT *
1748 proxy_event_new_with_rsp (char *driver_info, unsigned int type, int from, T_PROXY_EVENT_FUNC resp_func)
1749 {
1750  return proxy_event_new_with_req (driver_info, type, from, resp_func);
1751 }
1752 
1753 T_PROXY_EVENT *
1754 proxy_event_new_with_rsp_ex (char *driver_info, unsigned int type, int from, T_PROXY_EVENT_FUNC_EX resp_func,
1755  void *argv)
1756 {
1757  T_PROXY_EVENT *event_p;
1758  char *msg = NULL;
1759  int length;
1760 
1761  event_p = proxy_event_new (type, from);
1762  if (event_p == NULL)
1763  {
1764  return NULL;
1765  }
1766 
1767  length = resp_func (driver_info, &msg, argv);
1768  if (length <= 0)
1769  {
1770  proxy_event_free (event_p);
1771  return NULL;
1772  }
1773 
1774  proxy_event_set_buffer (event_p, msg, length);
1775 
1776  return event_p;
1777 }
1778 
1779 T_PROXY_EVENT *
1780 proxy_event_new_with_error (char *driver_info, unsigned int type, int from,
1781  int (*err_func) (char *driver_info, char **buffer, int error_ind, int error_code,
1782  const char *error_msg, char is_in_tran), int error_ind, int error_code,
1783  const char *error_msg, char is_in_tran)
1784 {
1785  T_PROXY_EVENT *event_p;
1786  char *msg = NULL;
1787  int length;
1788 
1789  event_p = proxy_event_new (type, from);
1790  if (event_p == NULL)
1791  {
1792  return NULL;
1793  }
1794 
1795  length = err_func (driver_info, &msg, error_ind, error_code, error_msg, is_in_tran);
1796  if (length <= 0)
1797  {
1798  proxy_event_free (event_p);
1799  return NULL;
1800  }
1801 
1802  proxy_event_set_buffer (event_p, msg, length);
1803 
1804  return event_p;
1805 }
1806 
1807 int
1808 proxy_event_alloc_buffer (T_PROXY_EVENT * event_p, unsigned int size)
1809 {
1810  assert (event_p);
1811 
1812  event_p->buffer.length = size;
1813  event_p->buffer.offset = 0;
1814  event_p->buffer.data = (char *) malloc (size * sizeof (char));
1815 
1816  return (event_p->buffer.data != NULL) ? 0 : -1;
1817 }
1818 
1819 int
1820 proxy_event_realloc_buffer (T_PROXY_EVENT * event_p, unsigned int size)
1821 {
1822  char *old_data;
1823 
1824  assert (event_p);
1825  assert (event_p->buffer.data);
1826 
1827  old_data = event_p->buffer.data;
1828 
1829  assert (event_p->buffer.length < (int) size);
1830 
1831  event_p->buffer.data = (char *) realloc (event_p->buffer.data, size * sizeof (char));
1832  if (event_p->buffer.data == NULL)
1833  {
1834  event_p->buffer.data = old_data;
1835  return -1;
1836  }
1837  event_p->buffer.length = size;
1838 
1839  return 0;
1840 }
1841 
1842 void
1843 proxy_event_set_buffer (T_PROXY_EVENT * event_p, char *data, unsigned int size)
1844 {
1845  assert (event_p);
1846  assert (data != NULL);
1847 
1848  event_p->buffer.length = size;
1849  event_p->buffer.offset = 0;
1850  event_p->buffer.data = data;
1851 }
1852 
1853 void
1854 proxy_event_set_type_from (T_PROXY_EVENT * event_p, unsigned int type, int from_cas)
1855 {
1856  assert (event_p);
1857 
1858  event_p->type = type;
1859  event_p->from_cas = from_cas;
1860 
1861  return;
1862 }
1863 
1864 void
1865 proxy_event_set_context (T_PROXY_EVENT * event_p, int cid, unsigned int uid)
1866 {
1867  assert (event_p);
1868 
1869  event_p->cid = cid;
1870  event_p->uid = uid;
1871 
1872  return;
1873 }
1874 
1875 
1876 void
1877 proxy_event_set_shard (T_PROXY_EVENT * event_p, int shard_id, int cas_id)
1878 {
1879  assert (event_p);
1880 
1881  event_p->shard_id = shard_id;
1882  event_p->cas_id = cas_id;
1883 
1884  return;
1885 }
1886 
1887 bool
1889 {
1890  assert (event_p);
1891  assert (event_p->type == PROXY_EVENT_IO_READ);
1892 
1893  return (event_p->buffer.length != 0 && event_p->buffer.length == event_p->buffer.offset) ? true : false;
1894 }
1895 
#if defined (ENABLE_UNUSED_FUNCTION)
/*
 * proxy_event_io_write_complete - tell whether a write event's buffer has
 *                                 been fully consumed
 *   return: true when the buffer has a non-zero length and the offset has
 *           reached that length, false otherwise
 *   event_p(in): event to inspect; asserted to be of type PROXY_EVENT_IO_WRITE
 */
bool
proxy_event_io_write_complete (T_PROXY_EVENT * event_p)
{
  assert (event_p);
  assert (event_p->type == PROXY_EVENT_IO_WRITE);

  if (event_p->buffer.length == 0)
    {
      return false;
    }

  return (event_p->buffer.length == event_p->buffer.offset) ? true : false;
}
#endif /* ENABLE_UNUSED_FUNCTION */
1906 
1907 void
1909 {
1910  assert (event_p);
1911 
1912  proxy_io_buffer_clear (&event_p->buffer);
1913  FREE_MEM (event_p);
1914 
1915  return;
1916 }
1917 
1918 char *
1920 {
1921  static char buffer[LINE_MAX];
1922 
1923  if (event_p == NULL)
1924  {
1925  return (char *) "-";
1926  }
1927 
1928  snprintf (buffer, sizeof (buffer), "type:%d, from_cas:%s, cid:%d, uid:%u, shard_id:%d, cas_id:%d", event_p->type,
1929  (event_p->from_cas) ? "Y" : "N", event_p->cid, event_p->uid, event_p->shard_id, event_p->cas_id);
1930 
1931  return (char *) buffer;
1932 }
1933 
/*
 * proxy_timer_process - rate-limited periodic timer hook
 *
 * Only every PROXY_MAX_IGNORE_TIMER_CHECK-th call looks at the clock, and
 * even then nothing further happens unless at least
 * PROXY_TIMER_CHECK_INTERVAL has elapsed since the last run.
 *
 * NOTE(review): this listing omits the function-name line (1935) and the
 * statements executed once the interval has elapsed (1955-1956).
 */
1934 void
1936 {
1937  static int num_called = 0;
1938  static int old = 0;
1939  int now, diff_time;
1940 
/* cheap call counter: skip the clock read on all but every N-th call */
1941  num_called++;
1942  num_called = (num_called % PROXY_MAX_IGNORE_TIMER_CHECK);
1943  if (num_called != 0)
1944  {
1945  return;
1946  }
1947 
1948  now = time (NULL);
1949  diff_time = now - old;
1950  if (diff_time < PROXY_TIMER_CHECK_INTERVAL)
1951  {
1952  return;
1953  }
1954 
/* NOTE(review): listing lines 1955-1956 are missing here - presumably the periodic timer work itself; confirm */
1957 
/* remember when the timers last ran */
1958  old = now;
1959 
1960  return;
1961 }
1962 
/*
 * shard_str_sqls - shorten an over-long SQL text for logging
 *   return: "-" when sql is NULL; sql itself when it fits in LINE_MAX bytes
 *           including the terminator; otherwise a static buffer holding the
 *           head of the text, a separator, and the tail of the text
 *   sql(in): NUL-terminated SQL text, may be NULL
 *
 * Note: the shortened form is laid out as
 *         [first LINE_MAX/2 bytes]['\n' + 30 dots + '\n'][last bytes]['\0']
 *       One byte of the tail budget is reserved for the terminator so that
 *       the final character of the statement is kept.
 *       The result lives in a function-static buffer and is overwritten by
 *       the next call that shortens a text.
 */
char *
shard_str_sqls (char *sql)
{
  static char buffer[LINE_MAX];
  const size_t ws_len = 32;
  const size_t head_len = sizeof (buffer) / 2;
  const size_t tail_len = sizeof (buffer) - head_len - ws_len - 1;
  size_t len;
  char *out;

  if (sql == NULL)
    {
      return (char *) "-";
    }

  len = strlen (sql);
  if (len < sizeof (buffer))
    {
      return sql;
    }

  /* head */
  memcpy (buffer, sql, head_len);
  out = buffer + head_len;

  /* separator: newline, dots, newline */
  out[0] = '\n';
  memset (out + 1, (int) '.', ws_len - 2);
  out[ws_len - 1] = '\n';
  out += ws_len;

  /* tail, followed by the terminator in the last byte of the buffer */
  memcpy (out, sql + len - tail_len, tail_len);
  out[tail_len] = '\0';

  return buffer;
}
T_PROXY_EVENT * proxy_event_new_with_req(char *driver_info, unsigned int type, int from, T_PROXY_EVENT_FUNC req_func)
void proxy_context_set_out_tran(T_PROXY_CONTEXT *ctx_p)
int fn_proxy_client_not_supported(T_PROXY_CONTEXT *ctx_p, T_PROXY_EVENT *event_p, int argc, char **argv)
T_SHARD_QUEUE cli_ret_q
int(* T_PROXY_EVENT_FUNC)(char *driver_info, char **buffer)
int fn_proxy_client_con_close(T_PROXY_CONTEXT *ctx_p, T_PROXY_EVENT *event_p, int argc, char **argv)
char driver_info[SRV_CON_CLIENT_INFO_SIZE]
Definition: broker_shm.h:391
T_CONTEXT_STMT * proxy_context_add_stmt(T_PROXY_CONTEXT *ctx_p, T_SHARD_STMT *stmt_p)
int fn_proxy_client_set_db_parameter(T_PROXY_CONTEXT *ctx_p, T_PROXY_EVENT *event_p, int argc, char **argv)
#define MSG_HEADER_SIZE
Definition: cas_protocol.h:114
#define NO_ERROR
Definition: error_code.h:46
int(* T_PROXY_CLIENT_FUNC)(T_PROXY_CONTEXT *ctx_p, T_PROXY_EVENT *event_p, int argc, char **argv)
void proxy_event_free(T_PROXY_EVENT *event_p)
static void proxy_handler_process_client_wakeup_by_shard(T_PROXY_EVENT *event_p)
int fn_proxy_client_check_cas(T_PROXY_CONTEXT *ctx_p, T_PROXY_EVENT *event_p, int argc, char **argv)
int proxy_wakeup_context_by_shard(T_WAIT_CONTEXT *waiter_p, int shard_id, int cas_id)
void proxy_io_buffer_clear(T_IO_BUFFER *io_buffer)
void proxy_event_set_buffer(T_PROXY_EVENT *event_p, char *data, unsigned int size)
int fn_proxy_cas_check_cas(T_PROXY_CONTEXT *ctx_p, T_PROXY_EVENT *event_p)
int proxy_handler_initialize(void)
T_PROXY_CONTEXT * proxy_context_find(int cid, unsigned int uid)
#define PROXY_DEBUG_LOG(fmt, args...)
int argc
Definition: dynamic_load.c:951
int fn_proxy_client_get_db_parameter(T_PROXY_CONTEXT *ctx_p, T_PROXY_EVENT *event_p, int argc, char **argv)
T_PROXY_INFO * proxy_info_p
Definition: shard_proxy.c:48
static void proxy_handler_process_cas_response(T_PROXY_EVENT *event_p)
void proxy_event_set_shard(T_PROXY_EVENT *event_p, int shard_id, int cas_id)
int fn_proxy_cas_execute_array(T_PROXY_CONTEXT *ctx_p, T_PROXY_EVENT *event_p)
#define ER_FAILED
Definition: error_code.h:47
int shard_stmt_unpin(T_SHARD_STMT *stmt_p)
T_PROXY_CONTEXT * proxy_context_find_by_socket_client_io(T_SOCKET_IO *sock_io_p)
#define PROXY_INVALID_CONTEXT
void proxy_available_cas_wait_timer(void)
char database_user[SRV_CON_DBUSER_SIZE]
int fn_proxy_cas_prepare(T_PROXY_CONTEXT *ctx_p, T_PROXY_EVENT *event_p)
T_PROXY_EVENT * proxy_event_dup(T_PROXY_EVENT *event_p)
int shard_queue_enqueue(T_SHARD_QUEUE *q, void *v)
#define EXIT_FUNC()
#define PROXY_EVENT_FROM_CAS
void proxy_event_set_type_from(T_PROXY_EVENT *event_p, unsigned int type, int from_cas)
int fn_proxy_client_prepare_and_execute(T_PROXY_CONTEXT *ctx_p, T_PROXY_EVENT *event_p, int argc, char **argv)
T_PROXY_EVENT * proxy_event_new_with_rsp_ex(char *driver_info, unsigned int type, int from, T_PROXY_EVENT_FUNC_EX resp_func, void *argv)
void shard_shm_init_client_info_request(T_CLIENT_INFO *client_info_p)
Definition: shard_shm.c:773
bool proxy_event_io_read_complete(T_PROXY_EVENT *event_p)
char * proxy_get_driver_info_by_ctx(T_PROXY_CONTEXT *ctx_p)
T_IO_BUFFER buffer
T_SHARD_QUEUE cli_rcv_q
int fn_proxy_client_cursor(T_PROXY_CONTEXT *ctx_p, T_PROXY_EVENT *event_p, int argc, char **argv)
static int proxy_context_initialize(void)
void proxy_context_set_error(T_PROXY_CONTEXT *ctx_p, int error_ind, int error_code)
T_APPL_SERVER_INFO * shard_shm_get_as_info(T_PROXY_INFO *proxy_info_p, T_SHM_APPL_SERVER *shm_as_p, int shard_id, int as_id)
Definition: shard_shm.c:671
void proxy_timer_process(void)
T_CONTEXT_STMT * next
static void proxy_handler_process_cas_event(T_PROXY_EVENT *event_p)
int proxy_send_response_to_client(T_PROXY_CONTEXT *ctx_p, T_PROXY_EVENT *event_p)
int fn_proxy_get_shard_info(T_PROXY_CONTEXT *ctx_p, T_PROXY_EVENT *event_p, int argc, char **argv)
char * shard_str_sqls(char *sql)
#define ENTER_FUNC()
void proxy_handler_process(void)
T_PROXY_EVENT * proxy_event_new_with_error(char *driver_info, unsigned int type, int from, int(*err_func)(char *driver_info, char **buffer, int error_ind, int error_code, const char *error_msg, char is_in_tran), int error_ind, int error_code, const char *error_msg, char is_in_tran)
void proxy_context_free_stmt(T_PROXY_CONTEXT *ctx_p)
int max_context
Definition: broker_shm.h:534
T_SHARD_STMT * prepared_stmt
void proxy_cas_io_free_by_ctx(int shard_id, int cas_id, int ctx_cid, int unsigned ctx_uid)
void proxy_context_set_in_tran(T_PROXY_CONTEXT *ctx_p, int shard_id, int cas_id)
int proxy_context_send_error(T_PROXY_CONTEXT *ctx_p)
bool proxy_handler_is_cas_in_tran(int shard_id, int cas_id)
#define CAS_MAKE_PROTO_VER(DRIVER_INFO)
Definition: cas_protocol.h:307
#define DOES_CLIENT_MATCH_THE_PROTOCOL(CLIENT, MATCH)
Definition: cas_protocol.h:289
#define PROXY_TIMER_CHECK_INTERVAL
int proxy_check_cas_error(char *read_msg)
void proxy_client_io_free_by_ctx(int client_id, int ctx_cid, int ctx_uid)
char * proxy_str_event(T_PROXY_EVENT *event_p)
#define assert(x)
T_CONTEXT_STMT * stmt_list
void proxy_unset_force_out_tran(char *msg)
char database_passwd[SRV_CON_DBPASSWD_SIZE]
void shard_statement_wait_timer(void)
int fn_proxy_client_execute(T_PROXY_CONTEXT *ctx_p, T_PROXY_EVENT *event_p, int argc, char **argv)
void shard_shm_init_client_info(T_CLIENT_INFO *client_info_p)
Definition: shard_shm.c:756
static void proxy_handler_process_client_conn_error(T_PROXY_EVENT *event_p)
int fn_proxy_client_conn_error(T_PROXY_CONTEXT *ctx_p)
#define PROXY_INVALID_FUNC_CODE
void proxy_waiter_free(T_WAIT_CONTEXT *waiter)
void proxy_context_timeout(T_PROXY_CONTEXT *ctx_p)
T_PROXY_CONTEXT_GLOBAL proxy_Context
int proxy_waiter_comp_fn(const void *arg1, const void *arg2)
void proxy_context_set_error_with_msg(T_PROXY_CONTEXT *ctx_p, int error_ind, int error_code, const char *error_msg)
T_SHARD_QUEUE cas_rcv_q
int proxy_event_realloc_buffer(T_PROXY_EVENT *event_p, unsigned int size)
#define CAS_ERROR_INDICATOR
Definition: cas.h:39
int get_data_length(char *buffer)
SP_PARSER_CTX * parser
#define NULL
Definition: freelistheap.h:34
union t_socket_io::@41 id
char * shard_str_stmt(T_SHARD_STMT *stmt_p)
T_CLIENT_INFO * shard_shm_get_client_info(T_PROXY_INFO *proxy_info_p, int idx)
Definition: shard_shm.c:448
static void proxy_context_clear(T_PROXY_CONTEXT *ctx_p)
#define FREE_MEM(PTR)
Definition: cas_common.h:58
#define SHARD_TEMPORARY_UNAVAILABLE
void * shard_cqueue_dequeue(T_SHARD_CQUEUE *q)
int fn_proxy_client_prepare(T_PROXY_CONTEXT *ctx_p, T_PROXY_EVENT *event_p, int argc, char **argv)
void proxy_client_io_free(T_CLIENT_IO *cli_io_p)
#define PROXY_INVALID_CAS
Definition: broker_util.h:41
int proxy_event_alloc_buffer(T_PROXY_EVENT *event_p, unsigned int size)
static void proxy_handler_process_client_request(T_PROXY_EVENT *event_p)
static void proxy_context_destroy(void)
int proxy_io_make_error_msg(char *driver_info, char **buffer, int error_ind, int error_code, const char *error_msg, char is_in_tran)
void proxy_handler_destroy(void)
static int proxy_handler_process_cas_error(T_PROXY_EVENT *event_p)
T_CAS_IO * proxy_cas_alloc_by_ctx(int client_id, int shard_id, int cas_id, int ctx_cid, unsigned int ctx_uid, int timeout, int func_code)
struct t_proxy_context T_PROXY_CONTEXT
int fn_proxy_cas_relay_only(T_PROXY_CONTEXT *ctx_p, T_PROXY_EVENT *event_p)
void proxy_context_clear_error(T_PROXY_CONTEXT *ctx_p)
static void error(const char *msg)
Definition: gencat.c:331
T_CLIENT_IO * proxy_client_io_find_by_fd(int client_id, SOCKET fd)
int fn_proxy_client_cursor_close(T_PROXY_CONTEXT *ctx_p, T_PROXY_EVENT *event_p, int argc, char **argv)
int shard_stmt_pin(T_SHARD_STMT *stmt_p)
void shard_shm_set_client_info_response(T_CLIENT_INFO *client_info_p)
Definition: shard_shm.c:793
static void proxy_handler_process_client_wakeup_by_statement(T_PROXY_EVENT *event_p)
T_PROXY_CONTEXT * proxy_context_new(void)
static void proxy_context_free_shard(T_PROXY_CONTEXT *ctx_p)
static void proxy_handler_process_cas_conn_error(T_PROXY_EVENT *event_p)
void proxy_waiter_timeout(T_SHARD_QUEUE *waitq, INT64 *counter, int now)
int fn_proxy_cas_fetch(T_PROXY_CONTEXT *ctx_p, T_PROXY_EVENT *event_p)
int fn_proxy_client_schema_info(T_PROXY_CONTEXT *ctx_p, T_PROXY_EVENT *event_p, int argc, char **argv)
unsigned short type
void proxy_cas_release_by_ctx(int shard_id, int cas_id, int ctx_cid, unsigned int ctx_uid)
void shard_stmt_check_waiter_and_wakeup(T_SHARD_STMT *stmt_p)
char * proxy_str_context(T_PROXY_CONTEXT *ctx_p)
T_PROXY_HANDLER proxy_Handler
void shard_queue_destroy(T_SHARD_QUEUE *q)
int fn_proxy_cas_execute(T_PROXY_CONTEXT *ctx_p, T_PROXY_EVENT *event_p)
const char ** argv
Definition: dynamic_load.c:952
int fn_proxy_cas_prepare_and_execute(T_PROXY_CONTEXT *ctx_p, T_PROXY_EVENT *event_p)
#define strlen(s1)
Definition: intl_support.c:43
int net_decode_str(char *msg, int msg_size, char *func_code, void ***ret_argv)
Definition: cas_network.c:393
T_PROXY_EVENT * waiting_event
static T_PROXY_CAS_FUNC proxy_cas_fn_table[]
#define PROXY_LOG(level, fmt, args...)
bool shard_shm_set_as_client_info(T_PROXY_INFO *proxy_info_p, T_SHM_APPL_SERVER *shm_as_p, int shard_id, int as_id, unsigned int ip_addr, char *driver_info, char *driver_version)
Definition: shard_shm.c:689
int T_BROKER_VERSION
Definition: cas_protocol.h:342
void shard_stmt_free(T_SHARD_STMT *stmt_p)
T_APPL_SERVER_INFO * as_info
Definition: cas.c:153
static void proxy_context_free_client(T_PROXY_CONTEXT *ctx_p)
void shard_cqueue_destroy(T_SHARD_CQUEUE *q)
T_SHM_APPL_SERVER * shm_as_p
Definition: shard_proxy.c:43
T_PROXY_EVENT * proxy_event_new_with_rsp(char *driver_info, unsigned int type, int from, T_PROXY_EVENT_FUNC resp_func)
int wait_timeout
Definition: broker_shm.h:479
T_SHM_PROXY * shm_proxy_p
int fn_proxy_client_fetch(T_PROXY_CONTEXT *ctx_p, T_PROXY_EVENT *event_p, int argc, char **argv)
int i
Definition: dynamic_load.c:954
#define PROXY_MAX_IGNORE_TIMER_CHECK
static void proxy_handler_process_client_event(T_PROXY_EVENT *event_p)
void * shard_queue_dequeue(T_SHARD_QUEUE *q)
int fn_proxy_cas_end_tran(T_PROXY_CONTEXT *ctx_p, T_PROXY_EVENT *event_p)
int fn_proxy_client_execute_array(T_PROXY_CONTEXT *ctx_p, T_PROXY_EVENT *event_p, int argc, char **argv)
int shard_cqueue_enqueue(T_SHARD_CQUEUE *q, void *e)
INT64 num_proxy_error_processed
Definition: broker_shm.h:501
void * shard_queue_peek_value(T_SHARD_QUEUE *q)
T_CONTEXT_STMT * proxy_context_find_stmt(T_PROXY_CONTEXT *ctx_p, int stmt_h_id)
int shard_queue_initialize(T_SHARD_QUEUE *q)
unsigned int uid
T_SHARD_STMT * shard_stmt_find_by_stmt_h_id(int stmt_h_id)
void proxy_context_free(T_PROXY_CONTEXT *ctx_p)
#define PROXY_INVALID_CLIENT
#define PROXY_INVALID_SHARD
Definition: broker_util.h:38
int fn_proxy_client_get_db_version(T_PROXY_CONTEXT *ctx_p, T_PROXY_EVENT *event_p, int argc, char **argv)
T_PROXY_EVENT * proxy_event_new(unsigned int type, int from_cas)
int fn_proxy_cas_schema_info(T_PROXY_CONTEXT *ctx_p, T_PROXY_EVENT *event_p)
int proxy_socket_io_delete(SOCKET fd)
static T_PROXY_CLIENT_FUNC proxy_client_fn_table[]
#define PROXY_EVENT_FROM_CLIENT
int fn_proxy_client_end_tran(T_PROXY_CONTEXT *ctx_p, T_PROXY_EVENT *event_p, int argc, char **argv)
int fn_proxy_client_close_req_handle(T_PROXY_CONTEXT *ctx_p, T_PROXY_EVENT *event_p, int argc, char **argv)
void proxy_event_set_context(T_PROXY_EVENT *event_p, int cid, unsigned int uid)
int proxy_wakeup_context_by_statement(T_WAIT_CONTEXT *waiter_p)
int(* T_PROXY_EVENT_FUNC_EX)(char *driver_info, char **buffer, void *argv)
char * proxy_str_client_io(T_CLIENT_IO *cli_io_p)
int shard_cqueue_initialize(T_SHARD_CQUEUE *q, int size)
int(* T_PROXY_CAS_FUNC)(T_PROXY_CONTEXT *ctx_p, T_PROXY_EVENT *event_p)
#define CAS_NO_ERROR
Definition: cas.h:41
void shard_shm_set_client_info_request(T_CLIENT_INFO *client_info_p, int func_code)
Definition: shard_shm.c:783
unsigned int ctx_uid
T_WAIT_CONTEXT * proxy_waiter_new(int ctx_cid, unsigned int ctx_uid, int timeout)
#define SHARD_STMT_INVALID_HANDLE_ID