CUBRID Engine  latest
broker_proxy_conn.c
Go to the documentation of this file.
1 /*
2  * Copyright 2008 Search Solution Corporation
3  * Copyright 2016 CUBRID Corporation
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 
20 /*
21  * broker_proxy_conn.c -
22  */
23 
24 #ident "$Id$"
25 
26 #include <sys/types.h>
27 #include <assert.h>
28 #if !defined(WINDOWS)
29 #include <sys/time.h>
30 #include <unistd.h>
31 #include <pthread.h>
32 #endif /* !WINDOWS */
33 
34 #include "porting.h"
35 #include "broker_proxy_conn.h"
36 #include "shard_proxy_common.h"
37 #include "shard_shm.h"
38 
39 #if !defined(WINDOWS)
41  -1, /* max_num_proxy */
42  0, /* cur_num_proxy */
43  NULL /* proxy_sockfd */
44 };
45 
46 pthread_mutex_t proxy_conn_mutex;
47 
48 static void broker_free_all_proxy_conn_ent (void);
51 
52 static void
54 {
55  T_PROXY_CONN_ENT *ent_p, *next_ent_p;
56 
57  for (ent_p = broker_Proxy_conn.proxy_conn_ent; ent_p; ent_p = next_ent_p)
58  {
59  next_ent_p = ent_p->next;
60 
61  FREE_MEM (ent_p);
62  ent_p = NULL;
63  }
64 
65  broker_Proxy_conn.proxy_conn_ent = NULL;
66 }
67 
68 int
69 broker_set_proxy_fds (fd_set * fds)
70 {
71  int ret = 0;
72  T_PROXY_CONN_ENT *ent_p;
73 
75  for (ent_p = broker_Proxy_conn.proxy_conn_ent; ent_p; ent_p = ent_p->next)
76  {
77  if (ent_p->fd != INVALID_SOCKET && ent_p->status == PROXY_CONN_CONNECTED)
78  {
79  FD_SET (ent_p->fd, fds);
80  }
81  }
83 
84  return ret;
85 }
86 
87 SOCKET
89 {
90  SOCKET fd;
91  T_PROXY_CONN_ENT *ent_p;
92 
94  for (ent_p = broker_Proxy_conn.proxy_conn_ent; ent_p; ent_p = ent_p->next)
95  {
96  if (ent_p->fd == INVALID_SOCKET)
97  {
98  continue;
99  }
100 
101  if (ent_p->status != PROXY_CONN_CONNECTED)
102  {
103  continue;
104  }
105 
106  if (FD_ISSET (ent_p->fd, fds))
107  {
108  fd = ent_p->fd;
109  FD_CLR (ent_p->fd, fds);
110 
112  return fd;
113  }
114  }
116 
117  return INVALID_SOCKET;
118 
119 }
120 
121 static T_PROXY_CONN_ENT *
123 {
124  T_PROXY_CONN_ENT *ent_p;
125 
126  for (ent_p = broker_Proxy_conn.proxy_conn_ent; ent_p; ent_p = ent_p->next)
127  {
128  if (ent_p->fd != fd)
129  {
130  continue;
131  }
132 
133  return ent_p;
134  }
135 
136  return NULL;
137 }
138 
139 static T_PROXY_CONN_ENT *
141 {
142  T_PROXY_CONN_ENT *ent_p;
143 
144  for (ent_p = broker_Proxy_conn.proxy_conn_ent; ent_p; ent_p = ent_p->next)
145  {
146  if (ent_p->proxy_id != proxy_id)
147  {
148  continue;
149  }
150 
151  return ent_p;
152  }
153 
154  return NULL;
155 }
156 
157 int
159 {
160  int ret = 0;
161  T_PROXY_CONN_ENT *ent_p;
162 
163  if (broker_Proxy_conn.max_num_proxy < 0)
164  {
165  return -1;
166  }
167 
169  if (broker_Proxy_conn.max_num_proxy <= broker_Proxy_conn.cur_num_proxy)
170  {
172  return -1;
173  }
174 
175  ent_p = (T_PROXY_CONN_ENT *) malloc (sizeof (T_PROXY_CONN_ENT));
176  if (ent_p == NULL)
177  {
179  return -1;
180  }
181 
182  ent_p->proxy_id = PROXY_INVALID_ID;
183  ent_p->status = PROXY_CONN_CONNECTED;
184  ent_p->fd = fd;
185 
186  ent_p->next = broker_Proxy_conn.proxy_conn_ent;
187  broker_Proxy_conn.proxy_conn_ent = ent_p;
188 
189  broker_Proxy_conn.cur_num_proxy++;
190  if (broker_Proxy_conn.cur_num_proxy > broker_Proxy_conn.max_num_proxy)
191  {
192  assert (false);
193  broker_Proxy_conn.cur_num_proxy = broker_Proxy_conn.max_num_proxy;
194  }
195 
197  return ret;
198 }
199 
200 int
202 {
203  int ret = 0;
204  T_PROXY_CONN_ENT *ent_p, *prev_ent_p;
205 
207  for (prev_ent_p = ent_p = broker_Proxy_conn.proxy_conn_ent; ent_p; prev_ent_p = ent_p, ent_p = ent_p->next)
208  {
209  if (ent_p->fd == fd)
210  {
211  if (ent_p == broker_Proxy_conn.proxy_conn_ent)
212  {
213  broker_Proxy_conn.proxy_conn_ent = ent_p->next;
214  }
215  else
216  {
217  prev_ent_p->next = ent_p->next;
218  }
219 
220  broker_Proxy_conn.cur_num_proxy--;
221  if (broker_Proxy_conn.cur_num_proxy < 0)
222  {
223  assert (false);
224  broker_Proxy_conn.cur_num_proxy = 0;
225  }
226 
227  FREE_MEM (ent_p);
228  ent_p = NULL;
229  break;
230  }
231  }
233 
234  return ret;
235 }
236 
237 int
239 {
240  int ret = 0;
241  T_PROXY_CONN_ENT *ent_p, *prev_ent_p;
242 
243  if (proxy_id == PROXY_INVALID_ID)
244  {
245  return -1;
246  }
247 
249  for (prev_ent_p = ent_p = broker_Proxy_conn.proxy_conn_ent; ent_p; prev_ent_p = ent_p, ent_p = ent_p->next)
250  {
251  if (ent_p->proxy_id == proxy_id)
252  {
253  if (ent_p == broker_Proxy_conn.proxy_conn_ent)
254  {
255  broker_Proxy_conn.proxy_conn_ent = ent_p->next;
256  }
257  else
258  {
259  prev_ent_p->next = ent_p->next;
260  }
261 
262  broker_Proxy_conn.cur_num_proxy--;
263  if (broker_Proxy_conn.cur_num_proxy < 0)
264  {
265  assert (false);
266  broker_Proxy_conn.cur_num_proxy = 0;
267  }
268 
269  FREE_MEM (ent_p);
270  ent_p = NULL;
271  break;
272  }
273  }
274 
276 
277  return ret;
278 }
279 
280 int
282 {
283  int ret = 0;
284  T_PROXY_CONN_ENT *ent_p;
285 
287  ent_p = broker_find_proxy_conn_by_fd (fd);
288  if (ent_p == NULL)
289  {
291  return -1;
292  }
293  assert (ent_p->status != PROXY_CONN_AVAILABLE);
294  assert (ent_p->proxy_id == PROXY_INVALID_ID);
295  if (ent_p->status == PROXY_CONN_AVAILABLE || ent_p->proxy_id != PROXY_INVALID_ID)
296  {
298  return -1;
299  }
300 
301  ent_p->status = PROXY_CONN_AVAILABLE;
302  ent_p->proxy_id = proxy_id;
303 
305 
306  return ret;
307 }
308 #endif /* !WINDOWS */
309 
310 #if defined(WINDOWS)
311 int
313 #else /* WINDOWS */
314 SOCKET
316 #endif /* !WINDOWS */
317 {
318  int proxy_index;
319  int min_cur_client = -1;
320  int cur_client = -1;
321  int max_context = -1;
323 #if defined(WINDOWS)
324  T_PROXY_INFO *find_proxy_info_p;
325 #else /* WINDOWS */
326  T_PROXY_CONN_ENT *ent_p;
327  SOCKET fd = INVALID_SOCKET;
328 
329  if (broker_Proxy_conn.max_num_proxy < 0)
330  {
331  return INVALID_SOCKET;
332  }
333 
335 #endif /* !WINDOWS */
336  for (proxy_index = 0; proxy_index < shm_proxy_p->num_proxy; proxy_index++)
337  {
338  proxy_info_p = shard_shm_find_proxy_info (shm_proxy_p, proxy_index);
339 
340  if (proxy_info_p->pid <= 0)
341  {
342  continue;
343  }
344 
345  max_context = proxy_info_p->max_context;
346  cur_client = proxy_info_p->cur_client;
347 
348 #if !defined(WINDOWS)
349  ent_p = broker_find_proxy_conn_by_id (proxy_info_p->proxy_id);
350  if (ent_p == NULL || ent_p->status != PROXY_CONN_AVAILABLE)
351  {
352  continue;
353  }
354 
355  assert (ent_p->fd != INVALID_SOCKET);
356 #endif /* !WINDOWS */
357 
358  if (min_cur_client == -1)
359  {
360  min_cur_client = cur_client;
361  }
362 
363  if (cur_client < max_context && cur_client <= min_cur_client)
364  {
365 #if defined(WINDOWS)
366  find_proxy_info_p = proxy_info_p;
367 #else /* WINDOWS */
368  fd = ent_p->fd;
369 #endif /* !WINDOWS */
370 
371  min_cur_client = cur_client;
372  }
373  }
374 #if !defined(WINDOWS)
376 #endif /* !WINDOWS */
377 
378 #if defined(WINDOWS)
379  return find_proxy_info_p->proxy_port;
380 #else /* WINDOWS */
381  return fd;
382 #endif /* !WINDOWS */
383 }
384 
385 #if !defined(WINDOWS)
386 SOCKET
388 {
389  T_PROXY_CONN_ENT *ent_p;
390  int max_fd;
391 
392  max_fd = proxy_sock_fd;
393 
395  for (ent_p = broker_Proxy_conn.proxy_conn_ent; ent_p; ent_p = ent_p->next)
396  {
397  if (ent_p->status != PROXY_CONN_NOT_CONNECTED)
398  {
399  if (max_fd < ent_p->fd)
400  {
401  max_fd = ent_p->fd;
402  }
403  }
404  }
406 
407  return (max_fd + 1);
408 }
409 
410 int
411 broker_init_proxy_conn (int max_proxy)
412 {
413  if (broker_Proxy_conn.max_num_proxy >= 0)
414  {
415  return 0;
416  }
417 
419 
420  broker_Proxy_conn.max_num_proxy = max_proxy;
421  broker_Proxy_conn.cur_num_proxy = 0;
422  broker_Proxy_conn.proxy_conn_ent = NULL;
423 
424  return 0;
425 }
426 
427 void
429 {
430  if (broker_Proxy_conn.max_num_proxy < 0)
431  {
432  return;
433  }
434 
436  broker_Proxy_conn.max_num_proxy = -1;
437  broker_Proxy_conn.cur_num_proxy = 0;
438 
441 
443 
444  return;
445 }
446 #endif /* !WINDOWS */
int broker_init_proxy_conn(int max_proxy)
SOCKET broker_find_available_proxy(T_SHM_PROXY *shm_proxy_p)
int SOCKET
Definition: porting.h:482
#define pthread_mutex_init(a, b)
Definition: area_alloc.c:48
int broker_add_proxy_conn(SOCKET fd)
T_PROXY_INFO * shard_shm_find_proxy_info(T_SHM_PROXY *proxy_p, int proxy_id)
Definition: shard_shm.c:419
#define pthread_mutex_unlock(a)
Definition: area_alloc.c:51
int broker_set_proxy_fds(fd_set *fds)
T_PROXY_CONN broker_Proxy_conn
static T_SHM_PROXY * shm_proxy_p
Definition: broker.c:313
SOCKET broker_get_proxy_conn_maxfd(SOCKET proxy_sock_fd)
#define INVALID_SOCKET
Definition: porting.h:483
static void broker_free_all_proxy_conn_ent(void)
#define assert(x)
int proxy_id
Definition: shard_proxy.c:45
static SOCKET proxy_sock_fd
Definition: broker.c:306
#define NULL
Definition: freelistheap.h:34
T_PROXY_CONN_ENT * proxy_conn_ent
#define FREE_MEM(PTR)
Definition: cas_common.h:58
static T_PROXY_CONN_ENT * broker_find_proxy_conn_by_fd(SOCKET fd)
T_PROXY_INFO * proxy_info_p
Definition: shard_proxy.c:48
static T_PROXY_CONN_ENT * broker_find_proxy_conn_by_id(int proxy_id)
T_PROXY_CONN_ENT * next
int broker_delete_proxy_conn_by_proxy_id(int proxy_id)
int T_BROKER_VERSION
Definition: cas_protocol.h:342
int broker_register_proxy_conn(SOCKET fd, int proxy_id)
SOCKET broker_get_readable_proxy_conn(fd_set *fds)
pthread_mutex_t proxy_conn_mutex
#define pthread_mutex_lock(a)
Definition: area_alloc.c:50
#define PROXY_INVALID_ID
Definition: broker_util.h:35
int broker_delete_proxy_conn_by_fd(SOCKET fd)
void broker_destroy_proxy_conn(void)
#define pthread_mutex_destroy(a)
Definition: area_alloc.c:49