CUBRID Engine  latest
connection_list_cl.c
Go to the documentation of this file.
1 /*
2  * Copyright 2008 Search Solution Corporation
3  * Copyright 2016 CUBRID Corporation
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 /*
20  * connection_list_cl.c - Queuing routines used for saving data and commands
21  */
22 
23 #ident "$Id$"
24 
25 #include "config.h"
26 
27 #include <stdio.h>
28 #include <string.h>
29 #if defined(WINDOWS)
30 #include <winsock2.h>
31 #else /* WINDOWS */
32 #include <sys/types.h>
33 #include <netinet/in.h>
34 #endif /* WINDOWS */
35 
36 #include "connection_cl.h"
37 #if defined(WINDOWS)
38 #include "wintcp.h"
39 #else /* WINDOWS */
40 #include "tcp.h"
41 #endif /* WINDOWS */
42 #include "system_parameter.h"
43 #include "connection_list_cl.h"
44 
/* Prototypes for file-local (static) helpers that are defined later in this file. */
45 static CSS_QUEUE_ENTRY *css_make_queue_entry (unsigned int key, char *buffer, int size, CSS_QUEUE_ENTRY * next, int rc,
46  int transid, int invalidate_snapshot, int db_error);
47 static void css_free_queue_entry (CSS_QUEUE_ENTRY * entry_p);
48 static int css_add_entry_to_header (CSS_QUEUE_ENTRY ** anchor, unsigned short request_id, char *buffer, int buffer_size,
49  int rc, int transid, int invalidate_snapshot, int db_error);
50 static bool css_is_request_aborted (CSS_CONN_ENTRY * conn, unsigned short request_id);
51 static void css_queue_data_packet (CSS_CONN_ENTRY * conn, unsigned short request_id, NET_HEADER * header);
52 static void css_queue_error_packet (CSS_CONN_ENTRY * conn, unsigned short request_id, NET_HEADER * header);
53 static void css_queue_command_packet (CSS_CONN_ENTRY * conn, unsigned short request_id, NET_HEADER * header, int size);
54 static void css_process_abort_packet (CSS_CONN_ENTRY * conn, unsigned short request_id);
55 
56 /*
57  * css_make_queue_entry() -
58  * return:
59  * key(in):
60  * buffer(in):
61  * size(in):
62  * next(in):
63  * rc(in):
64  * transid(in):
65  * invalidate_snapshot(in): true, if need to invalidate the snapshot
66  * db_error(in):
67  */
68 static CSS_QUEUE_ENTRY *
69 css_make_queue_entry (unsigned int key, char *buffer, int size, CSS_QUEUE_ENTRY * next, int rc, int transid,
70  int invalidate_snapshot, int db_error)
71 {
72  CSS_QUEUE_ENTRY *entry_p;
73 
74  entry_p = (CSS_QUEUE_ENTRY *) malloc (sizeof (CSS_QUEUE_ENTRY));
75  if (entry_p == NULL)
76  {
77  return NULL;
78  }
79 
80  entry_p->key = key;
81  entry_p->buffer = buffer;
82  entry_p->next = next;
83  entry_p->size = size;
84  entry_p->rc = rc;
85  entry_p->transaction_id = transid;
86  entry_p->invalidate_snapshot = invalidate_snapshot;
87  entry_p->db_error = db_error;
88 
89  return entry_p;
90 }
91 
92 /*
93  * css_free_queue_entry() -
94  * return:
95  * header(in):
96  */
97 static void
99 {
100  if (entry_p != NULL)
101  {
102  if (entry_p->buffer)
103  {
104  free_and_init (entry_p->buffer);
105  }
106  free_and_init (entry_p);
107  }
108 }
109 
110 /*
111  * css_find_queue_entry() -
112  * return:
113  * header(in):
114  * key(in):
115  */
117 css_find_queue_entry (CSS_QUEUE_ENTRY * header, unsigned int key)
118 {
119  CSS_QUEUE_ENTRY *entry_p;
120 
121  for (entry_p = header; entry_p; entry_p = entry_p->next)
122  {
123  if (entry_p->key == key)
124  {
125  return entry_p;
126  }
127  }
128 
129  return NULL;
130 }
131 
132 /*
133  * css_add_entry_to_header() - add an entry to a queue header
134  * return:
135  * anchor(in/out):
136  * request_id(in):
137  * buffer(in):
138  * buffer_size(in):
139  * rc(in):
140  * transid(in):
141  * invalidate_snapshot(in):
142  * db_error(in):
143  *
144  * Note: this will add an entry to the end of the header
145  */
146 static int
147 css_add_entry_to_header (CSS_QUEUE_ENTRY ** anchor, unsigned short request_id, char *buffer, int buffer_size, int rc,
148  int transid, int invalidate_snapshot, int db_error)
149 {
150  CSS_QUEUE_ENTRY *enrty_p, *new_entry_p;
151 
152  new_entry_p =
153  css_make_queue_entry (request_id, buffer, buffer_size, NULL, rc, transid, invalidate_snapshot, db_error);
154  if (new_entry_p == NULL)
155  {
156  return CANT_ALLOC_BUFFER;
157  }
158 
159  if (*anchor == NULL)
160  {
161  *anchor = new_entry_p;
162  }
163  else
164  {
165  enrty_p = *anchor;
166  while (enrty_p->next)
167  {
168  enrty_p = enrty_p->next;
169  }
170 
171  enrty_p->next = new_entry_p;
172  }
173 
174  return NO_ERRORS;
175 }
176 
177 /*
178  * css_queue_remove_header() - remove an entire column from the queue anchor
179  * return:
180  * anchor(in/out):
181  */
182 void
184 {
185  CSS_QUEUE_ENTRY *entry_p, *prev_p;
186 
187  if (*anchor == NULL)
188  {
189  return;
190  }
191 
192  prev_p = *anchor;
193  entry_p = (*anchor)->next;
194 
195  while (prev_p)
196  {
197  css_free_queue_entry (prev_p);
198  prev_p = entry_p;
199 
200  if (entry_p)
201  {
202  entry_p = entry_p->next;
203  }
204  else
205  {
206  entry_p = NULL;
207  }
208  }
209 
210  *anchor = NULL;
211 }
212 
213 /*
214  * css_queue_remove_header_entry() - remove an entry from the header
215  * return:
216  * anchor(in/out):
217  * request_id(in):
218  */
219 void
220 css_queue_remove_header_entry (CSS_QUEUE_ENTRY ** anchor, unsigned short request_id)
221 {
222  CSS_QUEUE_ENTRY *entry_p, *prev_p;
223 
224  if (*anchor == NULL)
225  {
226  return;
227  }
228 
229  entry_p = *anchor;
230  prev_p = NULL;
231 
232  while (entry_p)
233  {
234  if (entry_p->key == request_id)
235  {
236  if (*anchor == entry_p)
237  {
238  *anchor = entry_p->next;
239  }
240  else
241  {
242  prev_p->next = entry_p->next;
243  }
244 
245  css_free_queue_entry (entry_p);
246  break;
247  }
248 
249  prev_p = entry_p;
250  entry_p = entry_p->next;
251  }
252 }
253 
254 /*
255  * css_queue_remove_header_entry_ptr() -
256  * return:
257  * anchor(in):
258  * entry(in):
259  */
260 void
262 {
263  CSS_QUEUE_ENTRY *entry_p, *prev_p;
264 
265  if (*anchor == NULL)
266  {
267  return;
268  }
269 
270  entry_p = *anchor;
271  prev_p = nullptr;
272 
273  while (entry_p)
274  {
275  if (entry_p == entry)
276  {
277  if (*anchor == entry_p)
278  {
279  *anchor = entry_p->next;
280  }
281  else if (prev_p != nullptr)
282  {
283  prev_p->next = entry_p->next;
284  }
285  else
286  {
287  assert (false);
288  }
289 
290  css_free_queue_entry (entry_p);
291  break;
292  }
293 
294  prev_p = entry_p;
295  entry_p = entry_p->next;
296  }
297 }
298 
299 /*
300  * css_request_aborted() -
301  * return:
302  * conn(in):
303  * request_id(in):
304  */
305 static bool
306 css_is_request_aborted (CSS_CONN_ENTRY * conn, unsigned short request_id)
307 {
308  if (css_find_queue_entry (conn->abort_queue, request_id) != NULL)
309  {
310  return true;
311  }
312 
313  return false;
314 }
315 
316 static int
317 css_queue_packet (CSS_CONN_ENTRY * conn, CSS_QUEUE_ENTRY ** queue_p, unsigned short request_id, char *buffer, int size,
318  int rc)
319 {
320  if (!css_is_request_aborted (conn, request_id))
321  {
322  return css_add_entry_to_header (queue_p, request_id, buffer, size, rc, conn->get_tran_index (),
323  conn->invalidate_snapshot, conn->db_error);
324  }
325 
326  return NO_ERRORS;
327 }
328 
329 
330 /*
331  * css_queue_user_data_buffer () -
332  * return:
333  * conn(in/out):
334  * request_id(in):
335  * size(in):
336  * buffer(in):
337  *
338  * Note: If a client queues a data buffer when starting a request, we will
339  * save the buffer until data is returned from the client.
340  */
341 int
342 css_queue_user_data_buffer (CSS_CONN_ENTRY * conn, unsigned short request_id, int size, char *buffer)
343 {
344  if (buffer)
345  {
346  return css_queue_packet (conn, &conn->buffer_queue, request_id, buffer, size, 0);
347  }
348 
349  return NO_ERRORS;
350 }
351 
352 static bool
353 css_recv_and_queue_packet (CSS_CONN_ENTRY * conn, unsigned short request_id, char *buffer, int size,
354  CSS_QUEUE_ENTRY ** queue_p)
355 {
356  int rc;
357 
358  rc = css_net_recv (conn->fd, buffer, &size, -1);
359  if (rc == NO_ERRORS || rc == RECORD_TRUNCATED)
360  {
361  if (!css_is_request_aborted (conn, request_id))
362  {
363  css_add_entry_to_header (queue_p, request_id, buffer, size, rc, conn->get_tran_index (),
364  conn->invalidate_snapshot, conn->db_error);
365  return true;
366  }
367  }
368 
369  return false;
370 }
371 
372 /*
373  * css_queue_unexpected_data_packet () -
374  * return: void
375  * conn(in/out):
376  * request_id(in):
377  * header(in):
378  * size(in):
379  * rc(in):
380  *
381  * Note: This indicates that a data packet has arrived for a different
382  * request id. Save it for future processing.
383  */
384 void
385 css_queue_unexpected_data_packet (CSS_CONN_ENTRY * conn, unsigned short request_id, char *buffer, int size, int rc)
386 {
387  (void) css_queue_packet (conn, &conn->data_queue, request_id, buffer, size, rc);
388 }
389 
390 /*
391  * css_queue_data_packet () - read the data packet following the header packet
392  * return: void
393  * conn(in/out):
394  * request_id(in):
395  * header(in):
396  *
397  * Note: The data packet will then be queued.
398  */
399 static void
400 css_queue_data_packet (CSS_CONN_ENTRY * conn, unsigned short request_id, NET_HEADER * header)
401 {
402  char *buffer;
403  int size;
404 
405  size = ntohl (header->buffer_size);
406  buffer = css_return_data_buffer (conn, request_id, &size);
407 
408  if (buffer != NULL)
409  {
410  if (css_recv_and_queue_packet (conn, request_id, buffer, size, &conn->data_queue) == false)
411  {
412  free_and_init (buffer);
413  }
414  }
415  else
416  {
417  css_read_remaining_bytes (conn->fd, sizeof (int) + size);
419  }
420 }
421 
422 /*
423  * css_queue_unexpected_error_packet () -
424  * return: void
425  * conn(in/out):
426  * request_id(in):
427  * header(in):
428  * size(in):
429  * rc(in):
430  *
431  * Note: This indicates that an error packet has arrived for a different
432  * request id. Save it for future processing.
433  */
434 void
435 css_queue_unexpected_error_packet (CSS_CONN_ENTRY * conn, unsigned short request_id, char *buffer, int size, int rc)
436 {
437  (void) css_queue_packet (conn, &conn->error_queue, request_id, buffer, size, rc);
438 }
439 
440 /*
441  * css_queue_error_packet () - read the error packet following the header
442  * packet
443  * return: void
444  * conn(in/out):
445  * request_id(in):
446  * header(in):
447  *
448  * Note: The data packet will then be queued.
449  */
450 static void
451 css_queue_error_packet (CSS_CONN_ENTRY * conn, unsigned short request_id, NET_HEADER * header)
452 {
453  char *buffer;
454  int size;
455 
456  size = ntohl (header->buffer_size);
457  buffer = (char *) malloc (size);
458 
459  if (buffer != NULL)
460  {
461  if (css_recv_and_queue_packet (conn, request_id, buffer, size, &conn->error_queue) == false)
462  {
463  free_and_init (buffer);
464  }
465  }
466  else
467  {
468  css_read_remaining_bytes (conn->fd, sizeof (int) + size);
470  }
471 }
472 
473 /*
474  * css_queue_command_packet () -
475  * return: void
476  * conn(in/out):
477  * request_id(in):
478  * header(in):
479  * size(in):
480  *
481  * Note: This indicates that an unexpected command packet has arrived.
482  * Save it for future processing.
483  */
484 static void
485 css_queue_command_packet (CSS_CONN_ENTRY * conn, unsigned short request_id, NET_HEADER * header, int size)
486 {
487  NET_HEADER *temp;
488 
489  if (!css_is_request_aborted (conn, request_id))
490  {
491  temp = (NET_HEADER *) malloc (sizeof (NET_HEADER));
492 
493  if (temp != NULL)
494  {
495  memcpy ((char *) temp, (char *) header, sizeof (NET_HEADER));
496  css_add_entry_to_header (&conn->request_queue, request_id, (char *) temp, size, 0, conn->get_tran_index (),
497  conn->invalidate_snapshot, conn->db_error);
498  }
499  }
500 }
501 
502 /*
503  * css_process_abort_packet () - the server side of processing an aborted
504  * request
505  * return: void
506  * conn(in/out):
507  * request_id(in):
508  */
509 static void
510 css_process_abort_packet (CSS_CONN_ENTRY * conn, unsigned short request_id)
511 {
512  css_queue_remove_header_entry (&conn->request_queue, request_id);
513  css_queue_remove_header_entry (&conn->data_queue, request_id);
514 
515  if (css_find_queue_entry (conn->abort_queue, request_id) == NULL)
516  {
517  css_add_entry_to_header (&conn->abort_queue, request_id, NULL, 0, 0, conn->get_tran_index (),
518  conn->invalidate_snapshot, conn->db_error);
519  }
520 }
521 
522 /*
523  * css_process_close_packet () -
524  * return: void
525  * conn(in/out):
526  */
527 static void
529 {
530  if (conn->fd >= 0)
531  {
532  css_shutdown_socket (conn->fd);
533  conn->fd = -1;
534  }
535  conn->status = CONN_CLOSED;
536 }
537 
538 /*
539  * css_queue_unexpected_packet () -
540  * return: void
541  * type(in):
542  * conn(in/out):
543  * request_id(in):
544  * header(in):
545  * size(in):
546  *
547  * Note: This is used by the client when data or commands are
548  * encountered when not expected.
549  */
550 void
551 css_queue_unexpected_packet (int type, CSS_CONN_ENTRY * conn, unsigned short request_id, NET_HEADER * header, int size)
552 {
553  unsigned short flags = 0;
554 
555  conn->set_tran_index (ntohl (header->transaction_id));
556  flags = ntohs (header->flags);
558  conn->db_error = (int) ntohl (header->db_error);
559 
560  switch (type)
561  {
562  case CLOSE_TYPE:
564  break;
565 
566  case ABORT_TYPE:
567  css_process_abort_packet (conn, request_id);
568  break;
569 
570  case DATA_TYPE:
571  css_queue_data_packet (conn, request_id, header);
572  break;
573 
574  case ERROR_TYPE:
575  css_queue_error_packet (conn, request_id, header);
576  break;
577 
578  case COMMAND_TYPE:
579  css_queue_command_packet (conn, request_id, header, size);
580  break;
581 
582  default:
583  TPRINTF ("Asked to queue an unknown packet id = %d.\n", type);
584  }
585 }
int status
static bool css_is_request_aborted(CSS_CONN_ENTRY *conn, unsigned short request_id)
CSS_QUEUE_ENTRY * buffer_queue
static int css_add_entry_to_header(CSS_QUEUE_ENTRY **anchor, unsigned short request_id, char *buffer, int buffer_size, int rc, int transid, int invalidate_snapshot, int db_error)
void css_queue_remove_header_entry_ptr(CSS_QUEUE_ENTRY **anchor, CSS_QUEUE_ENTRY *entry)
void css_queue_remove_header_entry(CSS_QUEUE_ENTRY **anchor, unsigned short request_id)
int db_error
#define NET_HEADER_FLAG_INVALIDATE_SNAPSHOT
int size
SOCKET fd
static void css_queue_data_packet(CSS_CONN_ENTRY *conn, unsigned short request_id, NET_HEADER *header)
unsigned short flags
void css_queue_unexpected_data_packet(CSS_CONN_ENTRY *conn, unsigned short request_id, char *buffer, int size, int rc)
void css_read_remaining_bytes(SOCKET fd, int len)
unsigned int key
void css_queue_unexpected_error_packet(CSS_CONN_ENTRY *conn, unsigned short request_id, char *buffer, int size, int rc)
CSS_QUEUE_ENTRY * data_queue
#define assert(x)
int transaction_id
int rc
void css_queue_unexpected_packet(int type, CSS_CONN_ENTRY *conn, unsigned short request_id, NET_HEADER *header, int size)
void css_shutdown_socket(SOCKET fd)
Definition: tcp.c:1179
#define NULL
Definition: freelistheap.h:34
void css_queue_remove_header(CSS_QUEUE_ENTRY **anchor)
#define TPRINTF(error_string, arg)
static void css_queue_command_packet(CSS_CONN_ENTRY *conn, unsigned short request_id, NET_HEADER *header, int size)
CSS_QUEUE_ENTRY * abort_queue
static void css_queue_error_packet(CSS_CONN_ENTRY *conn, unsigned short request_id, NET_HEADER *header)
char * css_return_data_buffer(CSS_CONN_ENTRY *conn, unsigned short request_id, int *buffer_size)
int invalidate_snapshot
static int rc
Definition: serial.c:50
static CSS_QUEUE_ENTRY * css_make_queue_entry(unsigned int key, char *buffer, int size, CSS_QUEUE_ENTRY *next, int rc, int transid, int invalidate_snapshot, int db_error)
CSS_QUEUE_ENTRY * css_find_queue_entry(CSS_QUEUE_ENTRY *header, unsigned int key)
static int css_queue_packet(CSS_CONN_ENTRY *conn, CSS_QUEUE_ENTRY **queue_p, unsigned short request_id, char *buffer, int size, int rc)
unsigned short ntohs(unsigned short from)
static void css_free_queue_entry(CSS_QUEUE_ENTRY *entry_p)
#define free_and_init(ptr)
Definition: memory_alloc.h:147
CSS_QUEUE_ENTRY * next
int css_queue_user_data_buffer(CSS_CONN_ENTRY *conn, unsigned short request_id, int size, char *buffer)
CSS_QUEUE_ENTRY * error_queue
int css_net_recv(SOCKET fd, char *buffer, int *maxlen, int timeout)
static void css_process_close_packet(CSS_CONN_ENTRY *conn)
unsigned int ntohl(unsigned int from)
int invalidate_snapshot
CSS_QUEUE_ENTRY * request_queue
static bool css_recv_and_queue_packet(CSS_CONN_ENTRY *conn, unsigned short request_id, char *buffer, int size, CSS_QUEUE_ENTRY **queue_p)
int db_error
static void css_process_abort_packet(CSS_CONN_ENTRY *conn, unsigned short request_id)
char * buffer