CUBRID Engine  latest
shard_statement.c
Go to the documentation of this file.
1 /*
2  * Copyright 2008 Search Solution Corporation
3  * Copyright 2016 CUBRID Corporation
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 
20 /*
21  * shard_statement.c -
22  */
23 
24 #ident "$Id$"
25 
26 
27 #include <stdio.h>
28 #include <stdlib.h>
29 #include <string.h>
30 #include <assert.h>
31 
32 #include "cas_common.h"
33 #include "shard_proxy.h"
34 #include "shard_proxy_common.h"
35 #include "shard_statement.h"
36 #include "shard_shm.h"
37 
38 #if defined (SUPPRESS_STRLEN_WARNING)
39 #define strlen(s1) ((int) strlen(s1))
40 #endif /* defined (SUPPRESS_STRLEN_WARNING) */
41 
44 
45 static int shard_stmt_lru_insert (T_SHARD_STMT * stmt_p);
46 static int shard_stmt_lru_delete (T_SHARD_STMT * stmt_p);
47 static T_SHARD_STMT *shard_stmt_get_lru (void);
48 
49 static int *shard_stmt_pos_srv_h_id (T_SHARD_STMT * stmt_p, int shard_id, int cas_id);
50 static int shard_stmt_find_srv_h_id (T_SHARD_STMT * stmt_p, int shard_id, int cas_id);
51 static int shard_stmt_set_srv_h_id (T_SHARD_STMT * stmt_p, int shard_id, int cas_id, int srv_h_id);
52 
54 
55 static T_SHARD_STMT *shard_stmt_new_internal (int stmt_type, char *sql_stmt, int ctx_cid, unsigned int ctx_uid,
56  T_BROKER_VERSION client_version);
57 
58 static int shard_stmt_change_shard_val_to_id (char **sql_stmt, const char **buf, char appl_server);
59 static char *shard_stmt_write_buf_to_sql (char *sql_stmt, const char *buf, int length, bool is_to_upper,
60  char appl_server);
61 
63 static void shard_stmt_put_statement_to_map (const char *sql_stmt, T_SHARD_STMT * stmt_p);
64 static void shard_stmt_del_statement_from_map (T_SHARD_STMT * stmt_p);
65 static void shard_stmt_set_status (int stmt_h_id, int status);
66 
68 
70 
71 static int
73 {
74  if ((stmt_p->num_pinned != 0) || (stmt_p->lru_prev != NULL) || (stmt_p->lru_next != NULL))
75  {
76  PROXY_DEBUG_LOG ("Invalid statement status. statement(%s).", shard_str_stmt (stmt_p));
77 
78  assert (false);
79  return -1;
80  }
81 
82  stmt_p->lru_next = NULL;
83  stmt_p->lru_prev = shard_Stmt.mru;
84 
85  if (shard_Stmt.mru)
86  {
87  shard_Stmt.mru->lru_next = stmt_p;
88  }
89  else
90  {
91  shard_Stmt.lru = stmt_p;
92  }
93 
94  shard_Stmt.mru = stmt_p;
95 
96  return 0;
97 }
98 
99 static int
101 {
102  ENTER_FUNC ();
103 
104  if ((stmt_p->lru_prev == NULL && shard_Stmt.lru != stmt_p) || (stmt_p->lru_next == NULL && shard_Stmt.mru != stmt_p))
105  {
106  PROXY_DEBUG_LOG ("Invalid statement lru list. " "(stmt_p:%p, lru:%p, mru:%p). " "statement(%s).", stmt_p,
107  shard_Stmt.lru, shard_Stmt.mru, shard_str_stmt (stmt_p));
108 
109  assert (false);
110 
111  EXIT_FUNC ();
112  return -1;
113  }
114 
115  if (stmt_p->lru_next)
116  {
117  stmt_p->lru_next->lru_prev = stmt_p->lru_prev;
118  }
119  else
120  {
121  shard_Stmt.mru = stmt_p->lru_prev;
122  }
123 
124  if (stmt_p->lru_prev)
125  {
126  stmt_p->lru_prev->lru_next = stmt_p->lru_next;
127  }
128  else
129  {
130  shard_Stmt.lru = stmt_p->lru_next;
131  }
132 
133  stmt_p->lru_prev = NULL;
134  stmt_p->lru_next = NULL;
135 
136  EXIT_FUNC ();
137  return 0;
138 }
139 
140 static T_SHARD_STMT *
142 {
143  return shard_Stmt.lru;
144 }
145 
146 static int *
147 shard_stmt_pos_srv_h_id (T_SHARD_STMT * stmt_p, int shard_id, int cas_id)
148 {
149  int pos;
150  int srv_h_id;
151 
152  if ((shard_id < 0 || shard_Stmt.max_num_shard <= shard_id) || (cas_id < 0 || shard_Stmt.num_cas_per_shard <= cas_id))
153  {
154  PROXY_DEBUG_LOG ("Invalid statement shard/CAS id. " "(shard_id:%d, cas_id:%d, max_num_shard:%d, "
155  "num_cas_per_shard:%d). statement(%s).", shard_id, cas_id, shard_Stmt.max_num_shard,
156  shard_Stmt.num_cas_per_shard, shard_str_stmt (stmt_p));
157 
158  assert (false);
159  return NULL;
160  }
161 
162  pos = (shard_id * shard_Stmt.num_cas_per_shard) + cas_id;
163  srv_h_id = stmt_p->srv_h_id_ent[pos];
164 
165  return &(stmt_p->srv_h_id_ent[pos]);
166 }
167 
168 static int
169 shard_stmt_find_srv_h_id (T_SHARD_STMT * stmt_p, int shard_id, int cas_id)
170 {
171  int *srv_h_id_p;
172 
173  srv_h_id_p = shard_stmt_pos_srv_h_id (stmt_p, shard_id, cas_id);
174  if (srv_h_id_p == NULL)
175  {
176  return -1;
177  }
178 
179  return (*srv_h_id_p);
180 }
181 
182 static int
183 shard_stmt_set_srv_h_id (T_SHARD_STMT * stmt_p, int shard_id, int cas_id, int srv_h_id)
184 {
185  int *srv_h_id_p;
186 
187  srv_h_id_p = shard_stmt_pos_srv_h_id (stmt_p, shard_id, cas_id);
188  if (srv_h_id_p == NULL)
189  {
190  return -1;
191  }
192 
193  *srv_h_id_p = srv_h_id;
194  return 0;
195 }
196 
197 static T_BROKER_VERSION
199 {
200  /* protocol version used in only shard_Stmt->stmt_entry's client_version */
201  if (client_version < CAS_MAKE_VER (8, 3, 0))
202  {
203  /* old protocol */
204  return CAS_MAKE_VER (8, 2, 0);
205  }
206  else if (client_version < CAS_MAKE_VER (8, 4, 0))
207  {
208  /* error indicator protocol added */
209  return CAS_MAKE_VER (8, 3, 0);
210  }
211  else if (client_version <= CAS_PROTO_MAKE_VER (PROTOCOL_V1))
212  {
213  /* prepare result info added */
215  }
216  /* send columns meta-data with the result for executing */
217  /* PROTOCOL_V2 is current protocol version */
218  return CAS_PROTO_CURRENT_VER;
219 }
220 
221 T_SHARD_STMT *
222 shard_stmt_find_by_sql (char *sql_stmt, const char *db_user, T_BROKER_VERSION client_version)
223 {
224  T_SHARD_STMT *stmt_p = NULL;
225  T_BROKER_VERSION client_protocol_version;
226 
227  client_protocol_version = shard_stmt_make_protocol_version (client_version);
228 
229  stmt_p = (T_SHARD_STMT *) mht_get (shard_Stmt.stmt_map, sql_stmt);
230  for (; stmt_p != NULL; stmt_p = stmt_p->hash_next)
231  {
232  if (stmt_p->stmt_type != SHARD_STMT_TYPE_PREPARED)
233  {
234  assert (false);
235  continue;
236  }
237 
239  {
240  continue;
241  }
242 
243  if (stmt_p->client_version != client_protocol_version)
244  {
245  continue;
246  }
247 
248  if (proxy_info_p->appl_server == APPL_SERVER_CAS_MYSQL)
249  {
250  if (strcmp (db_user, stmt_p->database_user))
251  {
252  continue;
253  }
254  }
255  else
256  {
257  if (strcasecmp (db_user, stmt_p->database_user))
258  {
259  continue;
260  }
261  }
262 
263  assert (strcmp (sp_get_sql_stmt (stmt_p->parser), sql_stmt) == 0);
264 
265  return stmt_p;
266  }
267 
268  return NULL;
269 }
270 
271 T_SHARD_STMT *
273 {
274  T_SHARD_STMT *stmt_p = NULL;
275  unsigned hash_res;
276 
277  hash_res = (stmt_h_id % shard_Stmt.max_num_stmt);
278 
279  stmt_p = &(shard_Stmt.stmt_ent[hash_res]);
280  if (stmt_p->status != SHARD_STMT_STATUS_UNUSED && stmt_p->stmt_h_id == stmt_h_id)
281  {
282  return stmt_p;
283  }
284 
285  return NULL;
286 }
287 
288 static T_SHARD_STMT *
290 {
291  T_SHARD_STMT *stmt_p = NULL;
292  int i;
293 
294  for (i = 0; i < shard_Stmt.max_num_stmt; i++)
295  {
296  stmt_p = &(shard_Stmt.stmt_ent[i]);
297  if (stmt_p->status != SHARD_STMT_STATUS_UNUSED)
298  {
299  continue;
300  }
301 
302  return stmt_p;
303  }
304 
305  return NULL;
306 }
307 
308 int
310 {
311  int error = 0;
312 
313  if (stmt_p->status == SHARD_STMT_STATUS_UNUSED)
314  {
315  PROXY_LOG (PROXY_LOG_MODE_ERROR, "Invalid statement status. statement(%s).", shard_str_stmt (stmt_p));
316  assert (false);
317  return -1;
318  }
319 
320  stmt_p->num_pinned += 1;
321 
322  if ((shard_Stmt.lru == stmt_p) || (stmt_p->lru_prev || stmt_p->lru_next))
323  {
324  PROXY_DEBUG_LOG ("Pin statement. statement(%s).", shard_str_stmt (stmt_p));
325 
326  error = shard_stmt_lru_delete (stmt_p);
327  if (error < 0)
328  {
329  stmt_p->num_pinned -= 1;
330  return error;
331  }
332  }
333 
334  return error;
335 }
336 
337 int
339 {
340  int error = 0;
341 
342  if (stmt_p->status == SHARD_STMT_STATUS_UNUSED)
343  {
344  PROXY_LOG (PROXY_LOG_MODE_ERROR, "Invalid statement status. statement(%s).", shard_str_stmt (stmt_p));
345  assert (false);
346  return -1;
347  }
348 
349  stmt_p->num_pinned -= 1;
350  assert (stmt_p->num_pinned >= 0);
351 
352  if (stmt_p->num_pinned <= 0)
353  {
354  stmt_p->num_pinned = 0;
355 
356  error = shard_stmt_lru_insert (stmt_p);
357  if (error < 0)
358  {
359  stmt_p->num_pinned += 1;
360  return error;
361  }
362  }
363 
364  return error;
365 }
366 
367 void
369 {
370  int error;
371  T_WAIT_CONTEXT *waiter_p = NULL;
372 
373  ENTER_FUNC ();
374 
375  assert (stmt_p);
376 
377  /* clear context owner */
378  stmt_p->ctx_cid = PROXY_INVALID_CONTEXT;
379  stmt_p->ctx_uid = 0;
380 
381  while ((waiter_p = (T_WAIT_CONTEXT *) shard_queue_dequeue (&stmt_p->waitq)) != NULL)
382  {
384 
385  if (proxy_info_p->stmt_waiter_count > 0)
386  {
387  proxy_info_p->stmt_waiter_count--;
388  }
389 
390  PROXY_DEBUG_LOG ("Wakeup context by statement. statement(%s) waiter_count(%d).", shard_str_stmt (stmt_p),
391  proxy_info_p->stmt_waiter_count);
392 
393  error = proxy_wakeup_context_by_statement (waiter_p);
394  if (error)
395  {
397  "Failed to wakeup context by statement. "
398  "(error:%d, context id:%d, context uid:%d). statement(%s).", error, waiter_p->ctx_cid,
399  waiter_p->ctx_uid, shard_str_stmt (stmt_p));
400 
401  assert (false);
402  }
403 
404  FREE_MEM (waiter_p);
405  }
406 
407  EXIT_FUNC ();
408  return;
409 }
410 
411 static T_SHARD_STMT *
412 shard_stmt_new_internal (int stmt_type, char *sql_stmt, int ctx_cid, unsigned int ctx_uid,
413  T_BROKER_VERSION client_version)
414 {
415  int error;
416  int i, num_cas;
417  T_SHARD_STMT *stmt_p = NULL;
418  T_PROXY_CONTEXT *ctx_p = NULL;
419 
420  assert ((stmt_type != SHARD_STMT_TYPE_SCHEMA_INFO && sql_stmt != NULL)
421  || (stmt_type == SHARD_STMT_TYPE_SCHEMA_INFO && sql_stmt == NULL && client_version == 0));
422 
423  ctx_p = proxy_context_find (ctx_cid, ctx_uid);
424  assert (ctx_p != NULL);
425 
426  assert (stmt_type != SHARD_STMT_TYPE_PREPARED
427  || shard_stmt_find_by_sql (sql_stmt, ctx_p->database_user, client_version) == NULL);
428 
429  num_cas = shard_Stmt.max_num_shard * shard_Stmt.num_cas_per_shard;
430 
431  stmt_p = shard_stmt_find_unused ();
432  if (stmt_p)
433  {
434  assert (stmt_p->num_pinned == 0);
435  assert (stmt_p->lru_next == NULL && stmt_p->lru_prev == NULL);
436 
437  if (!(stmt_p->num_pinned == 0) || !(stmt_p->lru_next == NULL && stmt_p->lru_prev == NULL))
438  {
439  PROXY_LOG (PROXY_LOG_MODE_ERROR, "Invalid statement LRU. statement(%s).", shard_str_stmt (stmt_p));
440  return NULL;
441  }
442  }
443  else
444  {
445  stmt_p = shard_stmt_get_lru ();
446  if (stmt_p)
447  {
448  shard_stmt_free (stmt_p);
449  }
450  else
451  {
452  PROXY_LOG (PROXY_LOG_MODE_ERROR, "Exceeds max prepared statement. ");
453  return NULL;
454  }
455  }
456 
457  if (stmt_p)
458  {
459  if ((int) stmt_p->num_alloc >= shard_Stmt_max_num_alloc)
460  {
461  stmt_p->num_alloc = 0;
462  }
463  stmt_p->stmt_h_id = (stmt_p->num_alloc * shard_Stmt.max_num_stmt) + stmt_p->index;
464 
466  stmt_p->stmt_type = stmt_type;
468  {
469  stmt_p->client_version = shard_stmt_make_protocol_version (client_version);
470 
471  stmt_p->parser = sp_create_parser (sql_stmt);
472  if (stmt_p->parser == NULL)
473  {
474  shard_stmt_free (stmt_p);
475  return NULL;
476  }
477  }
478  else
479  {
480  stmt_p->client_version = 0;
481  assert (stmt_p->parser == NULL);
482  stmt_p->parser = NULL;
483  }
484 
485  stmt_p->ctx_cid = ctx_cid;
486  stmt_p->ctx_uid = ctx_uid;
487  strncpy_bufsize (stmt_p->database_user, ctx_p->database_user);
488 
489  stmt_p->num_pinned = 0;
490  stmt_p->lru_prev = NULL;
491  stmt_p->lru_next = NULL;
492 
493  /* __FOR_DEBUG */
494  assert (stmt_p->request_buffer_length == 0);
495  assert (stmt_p->request_buffer == NULL);
496  assert (stmt_p->reply_buffer_length == 0);
497  assert (stmt_p->reply_buffer == NULL);
498 
499  stmt_p->request_buffer_length = 0;
500  stmt_p->request_buffer = NULL;
501  stmt_p->reply_buffer_length = 0;
502  stmt_p->reply_buffer = NULL;
503 
504  for (i = 0; i < num_cas; i++)
505  {
507  }
508 
509  error = shard_queue_initialize (&stmt_p->waitq);
510  if (error)
511  {
512  assert (false);
513 
514  shard_stmt_free (stmt_p);
515  return NULL;
516  }
517 
518  stmt_p->num_alloc += 1;
519 
520  if (stmt_p->stmt_type == SHARD_STMT_TYPE_PREPARED)
521  {
522  shard_stmt_put_statement_to_map (sql_stmt, stmt_p);
523  }
524  }
525 
526  return stmt_p;
527 }
528 
529 T_SHARD_STMT *
530 shard_stmt_new_prepared_stmt (char *sql_stmt, int ctx_cid, unsigned int ctx_uid, T_BROKER_VERSION client_version)
531 {
532  T_SHARD_STMT *stmt_p = NULL;
533 
534  stmt_p = shard_stmt_new_internal (SHARD_STMT_TYPE_PREPARED, sql_stmt, ctx_cid, ctx_uid, client_version);
535  return stmt_p;
536 }
537 
538 T_SHARD_STMT *
539 shard_stmt_new_schema_info (int ctx_cid, unsigned int ctx_uid)
540 {
541  T_SHARD_STMT *stmt_p = NULL;
542 
543  stmt_p = shard_stmt_new_internal (SHARD_STMT_TYPE_SCHEMA_INFO, NULL, ctx_cid, ctx_uid, 0);
544  return stmt_p;
545 }
546 
547 T_SHARD_STMT *
548 shard_stmt_new_exclusive (char *sql_stmt, int ctx_cid, unsigned int ctx_uid, T_BROKER_VERSION client_version)
549 {
550  T_SHARD_STMT *stmt_p = NULL;
551 
552  stmt_p = shard_stmt_new_internal (SHARD_STMT_TYPE_EXCLUSIVE, sql_stmt, ctx_cid, ctx_uid, client_version);
553  return stmt_p;
554 }
555 
556 void
558 {
559  T_SHARD_STMT *stmt_p = NULL;
560  int i;
561 
562  if (shard_Stmt.stmt_map)
563  {
564  mht_destroy (shard_Stmt.stmt_map);
565  }
566 
567  if (shard_Stmt.stmt_ent)
568  {
569  for (i = 0; i < shard_Stmt.max_num_stmt; i++)
570  {
571  stmt_p = &(shard_Stmt.stmt_ent[i]);
572 
575 
576  stmt_p->ctx_cid = PROXY_INVALID_CONTEXT;
577  stmt_p->ctx_uid = 0;
578 
579  stmt_p->num_pinned = 0;
580  if ((shard_Stmt.lru == stmt_p) || (stmt_p->lru_prev || stmt_p->lru_next))
581  {
582  PROXY_DEBUG_LOG ("Delete statement from lru. " "statement(%p, %s). ", stmt_p, shard_str_stmt (stmt_p));
583 
584  shard_stmt_lru_delete (stmt_p);
585  }
586  stmt_p->lru_prev = NULL;
587  stmt_p->lru_next = NULL;
588 
589  if (stmt_p->parser)
590  {
591  sp_destroy_parser (stmt_p->parser);
592  }
593  stmt_p->parser = NULL;
594 
595  FREE_MEM (stmt_p->request_buffer);
596  stmt_p->request_buffer_length = 0;
597 
598  FREE_MEM (stmt_p->reply_buffer);
599  stmt_p->reply_buffer_length = 0;
600 
601 
602  FREE_MEM (stmt_p->srv_h_id_ent);
603 
604  shard_queue_destroy (&stmt_p->waitq);
605  }
606 
607  FREE_MEM (shard_Stmt.stmt_ent);
608  }
609 
610  shard_Stmt.max_num_stmt = 0;
611  shard_Stmt.max_num_shard = 0;
612  shard_Stmt.num_cas_per_shard = 0;
613 
614  return;
615 }
616 
617 void
619 {
620  int num_cas;
621  int i;
622 
623  ENTER_FUNC ();
624 
625  assert (stmt_p);
626 
627  num_cas = shard_Stmt.max_num_shard * shard_Stmt.num_cas_per_shard;
628 
629  if (stmt_p->stmt_type == SHARD_STMT_TYPE_PREPARED)
630  {
632  }
633 
636  stmt_p->stmt_type = -1;
637 
638  stmt_p->client_version = 0;
639 
640  stmt_p->ctx_cid = PROXY_INVALID_CONTEXT;
641  stmt_p->ctx_uid = 0;
642  stmt_p->database_user[0] = '\0';
643 
644  stmt_p->num_pinned = 0;
645  if ((shard_Stmt.lru == stmt_p) || (stmt_p->lru_prev || stmt_p->lru_next))
646  {
647  PROXY_DEBUG_LOG ("Delete statement from lru. " "statement(%p, %s).", stmt_p, shard_str_stmt (stmt_p));
648 
649  shard_stmt_lru_delete (stmt_p);
650  }
651  stmt_p->lru_prev = NULL;
652  stmt_p->lru_next = NULL;
653  stmt_p->hash_next = NULL;
654  stmt_p->hash_prev = NULL;
655 
656  if (stmt_p->parser)
657  {
658  sp_destroy_parser (stmt_p->parser);
659  }
660  stmt_p->parser = NULL;
661 
662  FREE_MEM (stmt_p->request_buffer);
663  stmt_p->request_buffer_length = 0;
664 
665  FREE_MEM (stmt_p->reply_buffer);
666  stmt_p->reply_buffer_length = 0;
667 
668 
669  for (i = 0; i < num_cas; i++)
670  {
672  }
673 
674  shard_queue_destroy (&stmt_p->waitq);
675 
676  EXIT_FUNC ();
677  return;
678 }
679 
680 
681 /*
682  *
683  * return : stored server handle id
684  */
685 int
686 shard_stmt_find_srv_h_id_for_shard_cas (int stmt_h_id, int shard_id, int cas_id)
687 {
688  int srv_h_id;
689  T_SHARD_STMT *stmt_p = NULL;
690 
691  stmt_p = shard_stmt_find_by_stmt_h_id (stmt_h_id);
692  if (stmt_p == NULL)
693  {
694  PROXY_LOG (PROXY_LOG_MODE_ERROR, "Unable to find statement. (stmt_h_id:%d).", stmt_h_id);
695  return -1;
696  }
697 
698  srv_h_id = shard_stmt_find_srv_h_id (stmt_p, shard_id, cas_id);
699  if (srv_h_id == SHARD_STMT_INVALID_HANDLE_ID)
700  {
702  "Unable to find saved statement handle id. " "(shard_id:%d, cas_id:%d). stmt(%s).", shard_id, cas_id,
703  shard_str_stmt (stmt_p));
704  return -1;
705  }
706 
707  return srv_h_id;
708 }
709 
710 /*
711  *
712  * return : success or fail
713  */
714 int
715 shard_stmt_add_srv_h_id_for_shard_cas (int stmt_h_id, int shard_id, int cas_id, int srv_h_id)
716 {
717  int error;
718 
719  T_SHARD_STMT *stmt_p = NULL;
720 
721  stmt_p = shard_stmt_find_by_stmt_h_id (stmt_h_id);
722  if (stmt_p == NULL)
723  {
724  PROXY_LOG (PROXY_LOG_MODE_ERROR, "Unable to find statement. (stmt_h_id:%d).", stmt_h_id);
725  return -1;
726  }
727 
728  if (stmt_p->status == SHARD_STMT_STATUS_IN_PROGRESS)
729  {
730  shard_stmt_set_status_complete (stmt_h_id);
731 
732  /* check and wakeup statement waiter */
734  }
735 
736  error = shard_stmt_set_srv_h_id (stmt_p, shard_id, cas_id, srv_h_id);
737  if (error)
738  {
740  "Unable to save statement handle id. " "(shard_id:%d, cas_id:%d, srv_h_id:%d). stmt(%s).", shard_id,
741  cas_id, srv_h_id, shard_str_stmt (stmt_p));
742  return -1;
743  }
744 
745  PROXY_DEBUG_LOG ("save statement handle id. " "(shard_id:%d, cas_id:%d, srv_h_id:%d). statement(%s).", shard_id,
746  cas_id, srv_h_id, shard_str_stmt (stmt_p));
747 
748  return 0;
749 }
750 
751 void
752 shard_stmt_del_srv_h_id_for_shard_cas (int stmt_h_id, int shard_id, int cas_id)
753 {
754  int *srv_h_id_p;
755  T_SHARD_STMT *stmt_p = NULL;
756 
757  ENTER_FUNC ();
758 
759  stmt_p = shard_stmt_find_by_stmt_h_id (stmt_h_id);
760  if (stmt_p == NULL)
761  {
762  PROXY_LOG (PROXY_LOG_MODE_ERROR, "Unable to find statement. (stmt_h_id:%d).", stmt_h_id);
763  return;
764  }
765 
766  srv_h_id_p = shard_stmt_pos_srv_h_id (stmt_p, shard_id, cas_id);
767  if (srv_h_id_p)
768  {
769  *srv_h_id_p = SHARD_STMT_INVALID_HANDLE_ID;
770  }
771 
772  EXIT_FUNC ();
773  return;
774 }
775 
776 void
778 {
779  int i;
780  int *srv_h_id_p;
781 
782  T_SHARD_STMT *stmt_p = NULL;
783 
784  ENTER_FUNC ();
785 
786  for (i = 0; i < shard_Stmt.max_num_stmt; i++)
787  {
788  stmt_p = &(shard_Stmt.stmt_ent[i]);
789  if (stmt_p->status == SHARD_STMT_STATUS_UNUSED)
790  {
791  continue;
792  }
793 
794  srv_h_id_p = shard_stmt_pos_srv_h_id (stmt_p, shard_id, cas_id);
795  if (srv_h_id_p)
796  {
797  *srv_h_id_p = SHARD_STMT_INVALID_HANDLE_ID;
798  }
799  }
800 
801  EXIT_FUNC ();
802  return;
803 }
804 
805 int
807 {
808  int ret;
809  SP_PARSER_HINT *hint_p = NULL;
810  SP_HINT_TYPE hint_type = HT_NONE;
811 
813  if (stmt_p->stmt_type == SHARD_STMT_TYPE_SCHEMA_INFO)
814  {
815  PROXY_LOG (PROXY_LOG_MODE_ERROR, "Unexpected statement type. (expect:%d, current:%d).", SHARD_STMT_TYPE_PREPARED,
816  stmt_p->stmt_type);
817  return -1;
818  }
819 
820  ret = sp_parse_sql (stmt_p->parser);
821  if (ret < 0)
822  {
823  PROXY_LOG (PROXY_LOG_MODE_ERROR, "Failed to parse sql statement. " "(error:%d). stmt(%s).", ret,
824  shard_str_stmt (stmt_p));
825  return -1;
826  }
827 
828  hint_p = sp_get_first_hint (stmt_p->parser);
829  while (hint_p != NULL)
830  {
831  if (hint_type == HT_NONE)
832  {
833  hint_type = hint_p->hint_type;
834  }
835  else if (hint_type != hint_p->hint_type)
836  {
837  PROXY_LOG (PROXY_LOG_MODE_ERROR, "Unexpected hint type. (expect:%d, current:%d).", hint_p->hint_type,
838  hint_type);
839  return -1;
840  }
841 
842  /* currently only support HT_KEY and HT_ID */
843  if (hint_type != HT_KEY && hint_type != HT_ID)
844  {
845  PROXY_LOG (PROXY_LOG_MODE_ERROR, "Unsupported hint type. (hint_type:%d).", hint_type);
846  return -1;
847  }
848  hint_p = sp_get_next_hint (hint_p);
849  }
850  return 1;
851 }
852 
853 int
855 {
856  SP_PARSER_HINT *hint_p = NULL;
857 
858  if (stmt_p == NULL)
859  {
860  PROXY_LOG (PROXY_LOG_MODE_ERROR, "Invalid statement. Statement couldn't be NULL.");
861  return HT_INVAL;
862  }
863 
865  if (stmt_p->stmt_type == SHARD_STMT_TYPE_SCHEMA_INFO)
866  {
867  PROXY_LOG (PROXY_LOG_MODE_ERROR, "Unexpected statement type. (expect:%d, current:%d).", SHARD_STMT_TYPE_PREPARED,
868  stmt_p->stmt_type);
869  return -1;
870  }
871 
872  if (stmt_p->parser == NULL)
873  {
874  PROXY_LOG (PROXY_LOG_MODE_ERROR, "Invalid parser. Statement parser couldn't be NULL.");
875  return HT_INVAL;
876  }
877 
878  hint_p = sp_get_first_hint (stmt_p->parser);
879  if (hint_p == NULL)
880  {
881  return HT_NONE;
882  }
883 
884  return hint_p->hint_type;
885 }
886 
887 int
888 shard_stmt_save_prepare_request (T_SHARD_STMT * stmt_p, bool has_shard_val_hint, char **prepare_req,
889  int *prepare_req_len, char *argv_sql_stmt, char *argv_remainder, char *orgzd_sql)
890 {
891  int sql_size;
892  int orgzd_sql_size, n_orgzd_sql_size;
893  int req_msg_size;
894  int remainder_size;
895  int expand_size;
896  unsigned int offset;
897  char *prepare_req_header;
898  char *tmp_prepare_req;
899  char *sql_stmt;
900  char *cur_p;
901 
902  prepare_req_header = *(prepare_req);
903 
904  net_arg_get_str (&sql_stmt, &sql_size, argv_sql_stmt);
905 
906  if (has_shard_val_hint == true)
907  {
908  req_msg_size = get_msg_length (prepare_req_header);
909  orgzd_sql_size = strlen (orgzd_sql) + 1;
910 
911  if (sql_size >= orgzd_sql_size)
912  {
913  expand_size = 0;
914  }
915  else
916  {
917  expand_size = orgzd_sql_size - sql_size;
918  }
919 
920 
921  if (expand_size == 0)
922  {
923  /* replace organized sql */
924  memcpy ((argv_sql_stmt + NET_SIZE_INT), orgzd_sql, orgzd_sql_size);
925  }
926  else
927  {
928  stmt_p->request_buffer = (char *) malloc (req_msg_size + expand_size);
929 
930  if (stmt_p->request_buffer == NULL)
931  {
932  PROXY_LOG (PROXY_LOG_MODE_ERROR, "Malloc failed.");
933  return -1;
934  }
935 
936  /* 1. append header and function code */
937  offset = argv_sql_stmt - prepare_req_header;
938  memcpy (stmt_p->request_buffer, prepare_req_header, offset);
939 
940  /* 2. append argv_sql_stmt : sql statement */
941  /* 2.1 set length */
942  cur_p = (char *) stmt_p->request_buffer + offset;
943  n_orgzd_sql_size = htonl (orgzd_sql_size);
944  memcpy (cur_p, &n_orgzd_sql_size, NET_SIZE_INT);
945 
946  /* 2.2 set organized sql string */
947  cur_p += NET_SIZE_INT;
948  memcpy (cur_p, orgzd_sql, orgzd_sql_size);
949 
950  /* 3. append argv_remainder ~ : the rest argvs */
951  cur_p += orgzd_sql_size;
952  remainder_size = (req_msg_size) - (argv_remainder - prepare_req_header);
953  memcpy (cur_p, argv_remainder, remainder_size);
954 
955  /* 4. set length */
956  stmt_p->request_buffer_length = (req_msg_size + expand_size);
958 
959  tmp_prepare_req = proxy_dup_msg ((char *) stmt_p->request_buffer);
960  if (tmp_prepare_req == NULL)
961  {
962  PROXY_LOG (PROXY_LOG_MODE_ERROR, "Failed to duplicate prepare request.");
963  return -1;
964  }
965 
966  FREE_MEM (*prepare_req);
967  *prepare_req = tmp_prepare_req;
968  *prepare_req_len = stmt_p->request_buffer_length;
969 
970  assert (stmt_p->request_buffer_length == get_msg_length (tmp_prepare_req));
971 
972  return 0;
973  }
974  }
975 
976  stmt_p->request_buffer = proxy_dup_msg (prepare_req_header);
977  if (stmt_p->request_buffer == NULL)
978  {
979  PROXY_LOG (PROXY_LOG_MODE_ERROR, "Failed to duplicate prepare request.");
980  return -1;
981  }
982  stmt_p->request_buffer_length = get_msg_length (prepare_req_header);
983 
984  return 0;
985 }
986 
987 #if defined (PROXY_VERBOSE_DEBUG)
988 void
989 shard_stmt_dump_title (FILE * fp)
990 {
991  fprintf (fp, "[%-10s][%-5s] %-10s %-2s %-10s %-10s %-10s\n", "INDEX", "NUSED", "STMT_H_ID", "ST", "NUM_PINNED",
992  "LRU_NEXT", "LRU_PREV");
993 
994  return;
995 }
996 
997 void
998 shard_stmt_dump (FILE * fp, T_SHARD_STMT * stmt_p)
999 {
1000  fprintf (fp, "[%-10u][%-5u] %-10d %-2d %-10d %-10p %-10p\n", stmt_p->index, stmt_p->num_alloc, stmt_p->stmt_h_id,
1001  stmt_p->status, stmt_p->num_pinned, stmt_p->lru_next, stmt_p->lru_prev);
1002 
1003  return;
1004 }
1005 
1006 void
1007 shard_stmt_dump_all (FILE * fp)
1008 {
1009  T_SHARD_STMT *stmt_p;
1010  int i;
1011 
1012  fprintf (fp, "\n");
1013  fprintf (fp, "* %-20s : %d\n", "MAX_NUM_STMT", shard_Stmt.max_num_stmt);
1014  fprintf (fp, "* %-20s : %d\n", "MAX_NUM_SHARD", shard_Stmt.max_num_shard);
1015  fprintf (fp, "* %-20s : %d\n", "NUM_CAS_PER_SHARD", shard_Stmt.num_cas_per_shard);
1016  fprintf (fp, "\n");
1017 
1018  shard_stmt_dump_title (fp);
1019  for (i = 0; i < shard_Stmt.max_num_stmt; i++)
1020  {
1021  stmt_p = &(shard_Stmt.stmt_ent[i]);
1022  shard_stmt_dump (fp, stmt_p);
1023  }
1024 
1025  return;
1026 }
1027 #endif /* ENABLE_UNUSED_FUNCTION */
1028 
1029 char *
1031 {
1032  static char buffer[BUFSIZ];
1033 
1034  if (stmt_p == NULL)
1035  {
1036  return (char *) "-";
1037  }
1038 
1039  snprintf (buffer, sizeof (buffer),
1040  "index:%u, num_alloc:%u, " "stmt_h_id:%d, status:%d, " "stmt_type:%d, " "context id:%d, context uid:%d, "
1041  "num pinned:%d, " "lru_next:%p, lru_prev:%p, " "hash_next:%p, hash_prev:%p, " "db_user:%s, sql_stmt:[%s]",
1042  stmt_p->index, stmt_p->num_alloc, stmt_p->stmt_h_id, stmt_p->status, stmt_p->stmt_type, stmt_p->ctx_cid,
1043  stmt_p->ctx_uid, stmt_p->num_pinned, stmt_p->lru_next, stmt_p->lru_prev, stmt_p->hash_next,
1044  stmt_p->hash_prev, stmt_p->database_user,
1045  (stmt_p->parser) ? ((stmt_p->parser->sql_stmt) ? shard_str_sqls (stmt_p->parser->sql_stmt) : "-") : "-");
1046 
1047  return (char *) buffer;
1048 }
1049 
1050 int
1051 shard_stmt_initialize (int initial_size)
1052 {
1053  T_SHARD_STMT *stmt_p;
1054  T_SHARD_INFO *shard_info_p;
1055  int mem_size;
1056  int num_cas;
1057  int i, j;
1058 
1059  static bool is_init = false;
1060 
1061  if (is_init)
1062  {
1063  PROXY_LOG (PROXY_LOG_MODE_ERROR, "Statement have not been initialized.");
1064  return -1;
1065  }
1066 
1067  shard_info_p = shard_shm_find_shard_info (proxy_info_p, 0);
1068 
1069  mem_size = initial_size * sizeof (T_SHARD_STMT);
1070  shard_Stmt.stmt_ent = (T_SHARD_STMT *) malloc (mem_size);
1071  if (shard_Stmt.stmt_ent == NULL)
1072  {
1074  "Not enough virtual memory. " "Failed to alloc statement entries. " "(errno:%d, size:%d).", errno,
1075  mem_size);
1076 
1077  return -1;
1078  }
1079  memset (shard_Stmt.stmt_ent, 0, mem_size);
1080 
1081  shard_Stmt.max_num_stmt = initial_size;
1082  shard_Stmt.max_num_shard = proxy_info_p->max_shard;
1083  shard_Stmt.num_cas_per_shard = shard_info_p->max_appl_server;
1084  shard_Stmt.mru = NULL;
1085  shard_Stmt.lru = NULL;
1086 
1087  shard_Stmt_max_num_alloc = MAX (shard_Stmt_max_num_alloc, (INT_MAX / shard_Stmt.max_num_stmt) - 1);
1088 
1089  shard_Stmt.stmt_map =
1090  mht_create ("Proxy statement pool", shard_Stmt.max_num_stmt, mht_1strhash, mht_compare_strings_are_equal);
1091  if (shard_Stmt.stmt_map == NULL)
1092  {
1093  PROXY_LOG (PROXY_LOG_MODE_ERROR, "Not enough virtual memory. " "Failed to alloc map of statement. " "(errno:%d).",
1094  errno);
1095 
1096  return -1;
1097  }
1098 
1099  num_cas = shard_Stmt.max_num_shard * shard_Stmt.num_cas_per_shard;
1100 
1101  for (i = 0; i < shard_Stmt.max_num_stmt; i++)
1102  {
1103  stmt_p = &(shard_Stmt.stmt_ent[i]);
1104  memset (stmt_p, 0, sizeof (T_SHARD_STMT));
1105 
1106  stmt_p->index = i;
1107  stmt_p->ctx_cid = PROXY_INVALID_CONTEXT;
1108  stmt_p->ctx_uid = 0;
1109  stmt_p->database_user[0] = '\0';
1110 
1111  stmt_p->client_version = 0;
1112 
1113  stmt_p->srv_h_id_ent = (int *) malloc (num_cas * sizeof (int));
1114  if (stmt_p->srv_h_id_ent == NULL)
1115  {
1117  "Not enough virtual memory. " "Failed to alloc statement handle id entries. "
1118  "(errno:%d, size:%d).", errno, mem_size);
1119 
1120 
1121  return -1;
1122  }
1123  for (j = 0; j < num_cas; j++)
1124  {
1126  }
1127  }
1128 
1129  return 0;
1130 }
1131 
1132 char *
1133 shard_stmt_rewrite_sql (bool * has_shard_val_hint, char *sql_stmt, char appl_server)
1134 {
1135  int error = NO_ERROR;
1136  const char *p = NULL;
1137  const char *next_p = NULL;
1138  char *q = NULL;
1139  char *organized_sql_stmt = NULL;
1140  SP_HINT_TYPE hint_type = HT_NONE;
1141 #if 0 /* for fully rewriting sql statement */
1142  SP_TOKEN curr_token;
1143  SP_TOKEN prev_token;
1144  int whitespace_count = 0;
1145 #endif
1146 
1147  while (*sql_stmt && isspace (*sql_stmt))
1148  {
1149  sql_stmt++;
1150  }
1151 
1158  organized_sql_stmt = (char *) malloc (sizeof (char) * (strlen (sql_stmt) + 2));
1159  if (organized_sql_stmt == NULL)
1160  {
1161  return NULL;
1162  }
1163 
1164  p = sql_stmt;
1165  q = organized_sql_stmt;
1166 
1167 #if 0 /* for fully rewriting sql statement */
1168  prev_token = curr_token = TT_NONE;
1169  while (*p)
1170  {
1171  next_p = sp_get_token_type (p, &curr_token);
1172 
1173  switch (prev_token)
1174  {
1175  case TT_SINGLE_QUOTED:
1176  case TT_DOUBLE_QUOTED:
1177  switch (curr_token)
1178  {
1179  case TT_SINGLE_QUOTED:
1180  case TT_DOUBLE_QUOTED:
1181  q = shard_stmt_write_buf_to_sql (q, "'", 1, true, appl_server);
1182  break;
1183  default:
1184  q = shard_stmt_write_buf_to_sql (q, p, next_p - p, false, appl_server);
1185  }
1186  break;
1187  case TT_CPP_COMMENT:
1188  case TT_CSQL_COMMENT:
1189  if (curr_token == TT_NEWLINE)
1190  {
1191  whitespace_count = 0;
1192  q = shard_stmt_write_buf_to_sql (q, "*/", 2, true, appl_server);
1193  break;
1194  }
1195  default:
1196  switch (curr_token)
1197  {
1198  case TT_NEWLINE:
1199  case TT_WHITESPACE:
1200  if (whitespace_count++ < 1)
1201  {
1202  q = shard_stmt_write_buf_to_sql (q, " ", 1, true, appl_server);
1203  }
1204  break;
1205  case TT_SINGLE_QUOTED:
1206  case TT_DOUBLE_QUOTED:
1207  q = shard_stmt_write_buf_to_sql (q, "'", 1, true, appl_server);
1208  break;
1209  case TT_CSQL_COMMENT:
1210  case TT_CPP_COMMENT:
1211  q = shard_stmt_write_buf_to_sql (q, "/*", 2, true, appl_server);
1212  p = next_p;
1213  if (*p == '+')
1214  {
1215  p++;
1216  p = sp_get_hint_type (p, &hint_type);
1217  if (hint_type == HT_VAL)
1218  {
1219  error = shard_stmt_change_shard_val_to_id (&q, &p, appl_server);
1220  if (error != NO_ERROR)
1221  {
1222  free (organized_sql_stmt);
1223  organized_sql_stmt = NULL;
1224 
1225  return NULL;
1226  }
1227  next_p = p;
1228  }
1229  }
1230  break;
1231  case TT_HINT:
1232  q = shard_stmt_write_buf_to_sql (q, "/*+", 3, true, appl_server);
1233  p = next_p;
1234  p = sp_get_hint_type (p, &hint_type);
1235  if (hint_type == HT_VAL)
1236  {
1237  error = shard_stmt_change_shard_val_to_id (&q, &p, appl_server);
1238  if (error != NO_ERROR)
1239  {
1240  free (organized_sql_stmt);
1241  organized_sql_stmt = NULL;
1242 
1243  return NULL;
1244  }
1245  next_p = p;
1246  }
1247  break;
1248  default:
1249  q = shard_stmt_write_buf_to_sql (q, p, next_p - p, true, appl_server);
1250  break;
1251  }
1252  }
1253 
1254  if (curr_token != TT_WHITESPACE && curr_token != TT_NEWLINE)
1255  {
1256  whitespace_count = 0;
1257  }
1258 
1259  if (prev_token == TT_NONE)
1260  {
1261  if (sp_is_exist_pair_token (curr_token))
1262  {
1263  prev_token = curr_token;
1264  }
1265  }
1266  else if (sp_is_pair_token (prev_token, curr_token))
1267  {
1268  prev_token = TT_NONE;
1269  }
1270 
1271  p = next_p;
1272  }
1273 #else
1274  while (*(p))
1275  {
1276  if (*(p) == '/' && *(p + 1) == '*' && *(p + 2) == '+')
1277  {
1278  q = shard_stmt_write_buf_to_sql (q, "/*+", 3, true, appl_server);
1279  p = p + 3;
1280  next_p = p;
1281  p = sp_get_hint_type (p, &hint_type);
1282  if (hint_type == HT_VAL)
1283  {
1284  error = shard_stmt_change_shard_val_to_id (&q, &p, appl_server);
1285  if (error != NO_ERROR)
1286  {
1287  free (organized_sql_stmt);
1288  organized_sql_stmt = NULL;
1289 
1290  return NULL;
1291  }
1292 
1293  *(has_shard_val_hint) = true;
1294  }
1295  else
1296  {
1297  p = next_p;
1298  }
1299  }
1300  else
1301  {
1302  q = shard_stmt_write_buf_to_sql (q, p, 1, true, appl_server);
1303  p += 1;
1304  }
1305  }
1306 #endif
1307 
1308  *(q++) = '\0'; /* NTS */
1309 
1310  return organized_sql_stmt;
1311 }
1312 
1313 static int
1314 shard_stmt_change_shard_val_to_id (char **sql_stmt, const char **buf, char appl_server)
1315 {
1316  int error = NO_ERROR;
1317  SP_PARSER_HINT *hint_p = NULL;
1318  T_SHARD_KEY *key_p = NULL;
1319  T_SHARD_KEY_RANGE *range_p = NULL;
1320  const char *key_column;
1321  int shard_key_id = -1;
1322  char shard_key_id_string[14];
1323 
1324  hint_p = sp_create_parser_hint ();
1325  if (hint_p == NULL)
1326  {
1327  error = ER_SP_OUT_OF_MEMORY;
1328  goto FINALLY;
1329  }
1330 
1331  hint_p->hint_type = HT_VAL;
1332 
1333  *buf = sp_get_hint_arg (*buf, hint_p, &error);
1334  if (error != NO_ERROR)
1335  {
1336  goto FINALLY;
1337  }
1338 
1339  *buf = sp_check_end_of_hint (*buf, &error);
1340  if (error != NO_ERROR)
1341  {
1342  goto FINALLY;
1343  }
1344 
1345  key_p = (T_SHARD_KEY *) (&(shm_key_p->shard_key[0]));
1346  key_column = key_p->key_column;
1347 
1348  shard_key_id = proxy_find_shard_id_by_hint_value (&hint_p->arg, key_column);
1349  if (shard_key_id < 0)
1350  {
1351  error = ER_SP_INVALID_HINT;
1352  goto FINALLY;
1353  }
1354 
1355  range_p = shard_metadata_find_shard_range (shm_key_p, key_column, shard_key_id);
1356  if (range_p == NULL)
1357  {
1358  error = ER_SP_INVALID_HINT;
1359 
1360  PROXY_LOG (PROXY_LOG_MODE_ERROR, "Unable to find shard key range. " "(key:[%s], key_id:%d).", key_column,
1361  shard_key_id);
1362 
1363  goto FINALLY;
1364  }
1365 
1366  sprintf (shard_key_id_string, "shard_id(%d)*/", range_p->shard_id);
1367  *sql_stmt =
1368  shard_stmt_write_buf_to_sql (*sql_stmt, shard_key_id_string, strlen (shard_key_id_string), true, appl_server);
1369 
1370 FINALLY:
1371  if (hint_p != NULL)
1372  {
1373  sp_free_parser_hint (hint_p);
1374  free (hint_p);
1375  }
1376  return error;
1377 }
1378 
1379 char *
1380 shard_stmt_write_buf_to_sql (char *sql_stmt, const char *buf, int length, bool is_to_upper, char appl_server)
1381 {
1382  int i = 0;
1383 
1384  for (; i < length; i++)
1385  {
1386 #if 0
1387  if (is_to_upper && appl_server == APPL_SERVER_CAS)
1388  {
1389  *(sql_stmt++) = toupper (*(buf++));
1390  }
1391  else
1392  {
1393  *(sql_stmt++) = *(buf++);
1394  }
1395 #else
1396  *(sql_stmt++) = *(buf++);
1397 #endif
1398  }
1399 
1400  return sql_stmt;
1401 }
1402 
1403 void
1405 {
1406  T_SHARD_STMT *stmt_p;
1407  int i, now;
1408 
1409  now = time (NULL);
1410 
1411  for (i = 0; i < shard_Stmt.max_num_stmt; i++)
1412  {
1413  stmt_p = &(shard_Stmt.stmt_ent[i]);
1414  if (stmt_p->status == SHARD_STMT_STATUS_UNUSED)
1415  {
1416  continue;
1417  }
1418 
1419  proxy_waiter_timeout (&stmt_p->waitq, &proxy_info_p->stmt_waiter_count, now);
1420  }
1421 
1422  return;
1423 }
1424 
1425 void
1427 {
1429 }
1430 
1431 void
1433 {
1435 }
1436 
1437 static void
1438 shard_stmt_put_statement_to_map (const char *sql_stmt, T_SHARD_STMT * stmt_p)
1439 {
1440  T_SHARD_STMT *hash_stmt_p = NULL;
1441 
1442  hash_stmt_p = (T_SHARD_STMT *) mht_get (shard_Stmt.stmt_map, sql_stmt);
1443  if (hash_stmt_p == NULL)
1444  {
1445  mht_put (shard_Stmt.stmt_map, sp_get_sql_stmt (stmt_p->parser), stmt_p);
1446  stmt_p->hash_prev = NULL;
1447  stmt_p->hash_next = NULL;
1448 
1449  PROXY_DEBUG_LOG ("SHARD_STMT PUT : %s", shard_str_stmt (stmt_p));
1450  }
1451  else
1452  {
1453  while (hash_stmt_p->hash_next)
1454  {
1455  hash_stmt_p = hash_stmt_p->hash_next;
1456  }
1457 
1458  hash_stmt_p->hash_next = stmt_p;
1459  stmt_p->hash_prev = hash_stmt_p;
1460  stmt_p->hash_next = NULL;
1461  }
1462 }
1463 
1464 static void
1466 {
1467  const char *key = NULL;
1468  T_SHARD_STMT *hash_stmt_p = NULL;
1469  T_SHARD_STMT *prev_stmt_p = NULL;
1470  T_SHARD_STMT *next_stmt_p = NULL;
1471 
1472  key = sp_get_sql_stmt (stmt_p->parser);
1473 
1474  hash_stmt_p = (T_SHARD_STMT *) mht_get (shard_Stmt.stmt_map, key);
1475  if (hash_stmt_p == NULL)
1476  {
1477  return;
1478  }
1479 
1480  while (hash_stmt_p != NULL && stmt_p != hash_stmt_p)
1481  {
1482  hash_stmt_p = hash_stmt_p->hash_next;
1483  }
1484 
1485  if (hash_stmt_p == NULL || stmt_p != hash_stmt_p)
1486  {
1487  assert (false);
1488  return;
1489  }
1490 
1491  if (stmt_p->hash_prev)
1492  {
1493  prev_stmt_p = stmt_p->hash_prev;
1494  next_stmt_p = stmt_p->hash_next;
1495 
1496  prev_stmt_p->hash_next = next_stmt_p;
1497  if (next_stmt_p)
1498  {
1499  next_stmt_p->hash_prev = prev_stmt_p;
1500  }
1501  }
1502  else
1503  {
1504  mht_rem (shard_Stmt.stmt_map, key, NULL, NULL);
1505 
1506  if (stmt_p->hash_next)
1507  {
1508  next_stmt_p = stmt_p->hash_next;
1509 
1510  next_stmt_p->hash_prev = NULL;
1511  mht_put (shard_Stmt.stmt_map, sp_get_sql_stmt (next_stmt_p->parser), next_stmt_p);
1512  }
1513  }
1514 }
1515 
1516 static void
1517 shard_stmt_set_status (int stmt_h_id, int status)
1518 {
1519  T_SHARD_STMT *stmt_p = NULL;
1520 
1521  stmt_p = shard_stmt_find_by_stmt_h_id (stmt_h_id);
1522  if (stmt_p == NULL)
1523  {
1524  assert (false);
1525  return;
1526  }
1527 
1528  if (stmt_p->status == SHARD_STMT_STATUS_INVALID && status == SHARD_STMT_STATUS_COMPLETE)
1529  {
1530  assert (false);
1531  }
1532 
1533  stmt_p->status = status;
1534 }
T_SHARD_STMT * lru_prev
static int shard_stmt_lru_insert(T_SHARD_STMT *stmt_p)
#define APPL_SERVER_CAS
Definition: broker_config.h:34
enum sp_hint_type SP_HINT_TYPE
Definition: shard_parser.h:59
#define MSG_HEADER_SIZE
Definition: cas_protocol.h:114
#define NO_ERROR
Definition: error_code.h:46
void set_data_length(char *buffer, int length)
T_SHARD_QUEUE waitq
unsigned int mht_1strhash(const void *key, const unsigned int ht_size)
Definition: memory_hash.c:447
T_SHARD_INFO * shard_shm_find_shard_info(T_PROXY_INFO *proxy_info_p, int shard_id)
Definition: shard_shm.c:433
void sp_free_parser_hint(SP_PARSER_HINT *hint_p)
Definition: shard_parser.c:416
T_PROXY_CONTEXT * proxy_context_find(int cid, unsigned int uid)
void sp_destroy_parser(SP_PARSER_CTX *parser_p)
Definition: shard_parser.c:116
#define PROXY_DEBUG_LOG(fmt, args...)
#define CAS_PROTO_MAKE_VER(VER)
Definition: cas_protocol.h:284
unsigned int htonl(unsigned int from)
char database_user[SRV_CON_DBUSER_SIZE]
int shard_stmt_unpin(T_SHARD_STMT *stmt_p)
static int shard_stmt_set_srv_h_id(T_SHARD_STMT *stmt_p, int shard_id, int cas_id, int srv_h_id)
T_SHARD_STMT_GLOBAL shard_Stmt
#define PROXY_INVALID_CONTEXT
int mht_rem(MHT_TABLE *ht, const void *key, int(*rem_func)(const void *key, void *data, void *args), void *func_args)
Definition: memory_hash.c:1952
static int shard_stmt_find_srv_h_id(T_SHARD_STMT *stmt_p, int shard_id, int cas_id)
char database_user[SRV_CON_DBUSER_SIZE]
int get_msg_length(char *buffer)
void shard_stmt_destroy(void)
const void * mht_put(MHT_TABLE *ht, const void *key, void *data)
Definition: memory_hash.c:1778
int shard_stmt_add_srv_h_id_for_shard_cas(int stmt_h_id, int shard_id, int cas_id, int srv_h_id)
#define EXIT_FUNC()
T_SHARD_KEY_RANGE * shard_metadata_find_shard_range(T_SHM_SHARD_KEY *shm_key_p, const char *key, unsigned int hash_res)
static T_BROKER_VERSION shard_stmt_make_protocol_version(T_BROKER_VERSION client_version)
unsigned int num_alloc
unsigned int index
SP_HINT_TYPE hint_type
Definition: shard_parser.h:122
static T_SHARD_STMT * shard_stmt_get_lru(void)
static T_SHARD_STMT * shard_stmt_find_unused(void)
#define CAS_MAKE_VER(MAJOR, MINOR, PATCH)
Definition: cas_protocol.h:304
int max_appl_server
Definition: broker_shm.h:416
T_PROXY_INFO * proxy_info_p
Definition: shard_proxy.c:48
T_SHARD_STMT * lru
static char * shard_stmt_write_buf_to_sql(char *sql_stmt, const char *buf, int length, bool is_to_upper, char appl_server)
int request_buffer_length
T_SHARD_STMT * shard_stmt_new_schema_info(int ctx_cid, unsigned int ctx_uid)
char * shard_str_sqls(char *sql)
SP_PARSER_HINT * sp_get_first_hint(SP_PARSER_CTX *parser_p)
Definition: shard_parser.c:155
#define ENTER_FUNC()
SP_PARSER_CTX * sp_create_parser(const char *sql_stmt)
Definition: shard_parser.c:63
const char * sp_check_end_of_hint(const char *sql, int *error)
Definition: shard_parser.c:649
static int shard_stmt_lru_delete(T_SHARD_STMT *stmt_p)
#define APPL_SERVER_CAS_MYSQL
Definition: broker_config.h:37
void * reply_buffer
void mht_destroy(MHT_TABLE *ht)
Definition: memory_hash.c:1140
#define CAS_PROTO_CURRENT_VER
Definition: cas_protocol.h:286
T_BROKER_VERSION client_version
T_SHARD_STMT * shard_stmt_new_prepared_stmt(char *sql_stmt, int ctx_cid, unsigned int ctx_uid, T_BROKER_VERSION client_version)
#define assert(x)
void shard_statement_wait_timer(void)
struct t_shard_stmt T_SHARD_STMT
const char * sp_get_token_type(const char *sql, SP_TOKEN *token)
Definition: shard_parser.c:489
int shard_Stmt_max_num_alloc
SP_PARSER_CTX * parser
int shard_stmt_get_hint_type(T_SHARD_STMT *stmt_p)
bool sp_is_pair_token(SP_TOKEN start_token, SP_TOKEN end_token)
Definition: shard_parser.c:203
INT64 stmt_waiter_count
Definition: broker_shm.h:480
void net_arg_get_str(char **value, int *size, void *arg)
Definition: cas_net_buf.c:528
unsigned int ctx_uid
T_SHARD_STMT * shard_stmt_new_exclusive(char *sql_stmt, int ctx_cid, unsigned int ctx_uid, T_BROKER_VERSION client_version)
static int shard_stmt_change_shard_val_to_id(char **sql_stmt, const char **buf, char appl_server)
int shard_stmt_set_hint_list(T_SHARD_STMT *stmt_p)
void * request_buffer
char appl_server
Definition: broker_shm.h:470
void shard_stmt_set_status_invalid(int stmt_h_id)
void * mht_get(MHT_TABLE *ht, const void *key)
Definition: memory_hash.c:1419
#define NULL
Definition: freelistheap.h:34
#define strncpy_bufsize(buf, str)
Definition: porting.h:340
char * shard_str_stmt(T_SHARD_STMT *stmt_p)
int shard_stmt_save_prepare_request(T_SHARD_STMT *stmt_p, bool has_shard_val_hint, char **prepare_req, int *prepare_req_len, char *argv_sql_stmt, char *argv_remainder, char *orgzd_sql)
if(extra_options)
Definition: dynamic_load.c:958
#define FREE_MEM(PTR)
Definition: cas_common.h:58
MHT_TABLE * mht_create(const char *name, int est_size, unsigned int(*hash_func)(const void *key, unsigned int ht_size), int(*cmp_func)(const void *key1, const void *key2))
Definition: memory_hash.c:894
char * proxy_dup_msg(char *msg)
int proxy_find_shard_id_by_hint_value(SP_VALUE *value_p, const char *key_column)
const char * sp_get_hint_arg(const char *sql, SP_PARSER_HINT *hint_p, int *error)
Definition: shard_parser.c:610
enum sp_token SP_TOKEN
Definition: shard_parser.h:94
SP_PARSER_HINT * sp_create_parser_hint(void)
Definition: shard_parser.c:333
T_SHARD_STMT * stmt_ent
T_SHARD_STMT * shard_stmt_find_by_sql(char *sql_stmt, const char *db_user, T_BROKER_VERSION client_version)
char * shard_stmt_rewrite_sql(bool *has_shard_val_hint, char *sql_stmt, char appl_server)
static void error(const char *msg)
Definition: gencat.c:331
int shard_stmt_pin(T_SHARD_STMT *stmt_p)
static T_SHARD_STMT * shard_stmt_new_internal(int stmt_type, char *sql_stmt, int ctx_cid, unsigned int ctx_uid, T_BROKER_VERSION client_version)
void proxy_waiter_timeout(T_SHARD_QUEUE *waitq, INT64 *counter, int now)
T_SHARD_KEY shard_key[MAX_SHARD_KEY]
Definition: broker_shm.h:270
T_SHARD_STMT * lru_next
T_SHARD_STMT * mru
void shard_stmt_check_waiter_and_wakeup(T_SHARD_STMT *stmt_p)
void shard_queue_destroy(T_SHARD_QUEUE *q)
static int * shard_stmt_pos_srv_h_id(T_SHARD_STMT *stmt_p, int shard_id, int cas_id)
void shard_stmt_set_status_complete(int stmt_h_id)
#define strlen(s1)
Definition: intl_support.c:43
void shard_stmt_del_all_srv_h_id_for_shard_cas(int shard_id, int cas_id)
#define PROXY_LOG(level, fmt, args...)
char key_column[SHARD_KEY_COLUMN_LEN]
Definition: broker_shm.h:261
static void shard_stmt_put_statement_to_map(const char *sql_stmt, T_SHARD_STMT *stmt_p)
#define SHARD_STMT_MAX_NUM_ALLOC
const char * sp_get_hint_type(const char *sql, SP_HINT_TYPE *hint_type)
Definition: shard_parser.c:575
int T_BROKER_VERSION
Definition: cas_protocol.h:342
void shard_stmt_free(T_SHARD_STMT *stmt_p)
static void shard_stmt_set_status(int stmt_h_id, int status)
int i
Definition: dynamic_load.c:954
void * shard_queue_dequeue(T_SHARD_QUEUE *q)
#define NET_SIZE_INT
Definition: cas_network.h:43
int shard_stmt_initialize(int initial_size)
bool sp_is_exist_pair_token(SP_TOKEN token)
Definition: shard_parser.c:852
int shard_queue_initialize(T_SHARD_QUEUE *q)
int shard_stmt_find_srv_h_id_for_shard_cas(int stmt_h_id, int shard_id, int cas_id)
T_SHARD_STMT * shard_stmt_find_by_stmt_h_id(int stmt_h_id)
int sp_parse_sql(SP_PARSER_CTX *parser_p)
Definition: shard_parser.c:96
const char * sp_get_sql_stmt(SP_PARSER_CTX *parser_p)
Definition: shard_parser.c:197
SP_PARSER_HINT * sp_get_next_hint(SP_PARSER_HINT *hint_p)
Definition: shard_parser.c:161
static void shard_stmt_del_statement_from_map(T_SHARD_STMT *stmt_p)
T_SHARD_STMT * hash_prev
int mht_compare_strings_are_equal(const void *key1, const void *key2)
Definition: memory_hash.c:767
T_SHARD_STMT * hash_next
T_SHM_SHARD_KEY * shm_key_p
Definition: shard_proxy.c:51
int proxy_wakeup_context_by_statement(T_WAIT_CONTEXT *waiter_p)
const char ** p
Definition: dynamic_load.c:945
void shard_stmt_del_srv_h_id_for_shard_cas(int stmt_h_id, int shard_id, int cas_id)
#define SHARD_STMT_INVALID_HANDLE_ID