CUBRID Engine  latest
shard_proxy_queue.c
Go to the documentation of this file.
1 /*
2  * Copyright 2008 Search Solution Corporation
3  * Copyright 2016 CUBRID Corporation
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 
20 /*
21  * shard_proxy_queue.c -
22  *
23  */
24 
25 #ident "$Id$"
26 
27 
28 #include <stdio.h>
29 #include <stdlib.h>
30 #include <assert.h>
31 
32 #include "cas_common.h"
33 #include "shard_proxy_common.h"
34 #include "shard_proxy_queue.h"
35 
36 
37 static void
39 {
40  curr->next = NULL;
41 
42  if (q->head == NULL)
43  {
44  assert (prev == NULL);
45  assert (q->tail == NULL);
46 
47  q->head = q->tail = curr;
48  return;
49  }
50 
51  if (prev)
52  {
53  curr->next = prev->next;
54  prev->next = curr;
55  }
56  else
57  {
58  curr->next = q->head;
59  q->head = curr;
60  }
61 
62  if (q->tail == prev)
63  {
64  q->tail = curr;
65  }
66 
67  return;
68 }
69 
70 int
72 {
73  T_SHARD_QUEUE_ENT *q_ent;
74 
75  assert (q);
76  assert (v);
77 
78  q_ent = (T_SHARD_QUEUE_ENT *) malloc (sizeof (T_SHARD_QUEUE_ENT));
79  if (q_ent)
80  {
81  q_ent->v = v;
82  shard_queue_insert_after (q, q->tail, q_ent);
83  return 0; /* SUCCESS */
84  }
85 
86  PROXY_LOG (PROXY_LOG_MODE_ERROR, "Not enough virtual memory. " "Failed to alloc shard queue entry. " "(errno:%d).",
87  errno);
88 
89  return -1; /* FAILED */
90 }
91 
92 int
94 {
95  T_SHARD_QUEUE_ENT *q_ent;
96  T_SHARD_QUEUE_ENT *curr, *prev;
97 
98  q_ent = (T_SHARD_QUEUE_ENT *) malloc (sizeof (T_SHARD_QUEUE_ENT));
99  if (q_ent)
100  {
101  q_ent->v = v;
102 
103  if (comp_fn == NULL)
104  {
105  shard_queue_insert_after (q, q->tail, q_ent);
106  return 0;
107  }
108 
109  prev = NULL;
110  for (curr = q->head; curr; curr = curr->next)
111  {
112  if (comp_fn (curr->v, q_ent->v) > 0)
113  {
114  shard_queue_insert_after (q, prev, q_ent);
115  return 0;
116  }
117 
118  prev = curr;
119  }
120 
121  shard_queue_insert_after (q, q->tail, q_ent);
122 
123  return 0;
124  }
125 
126  PROXY_LOG (PROXY_LOG_MODE_ERROR, "Not enough virtual memory. " "Failed to alloc shard queue entry. " "(errno:%d).",
127  errno);
128  return -1;
129 }
130 
131 void *
133 {
134  T_SHARD_QUEUE_ENT *q_ent;
135  void *ret;
136 
137  if (q->head == NULL)
138  {
139  return NULL;
140  }
141 
142  q_ent = q->head;
143  ret = q_ent->v;
144 
145  if (q->head == q->tail)
146  {
147  q->head = q->tail = NULL;
148  }
149  else
150  {
151  q->head = q->head->next;
152  }
153 
154  FREE_MEM (q_ent);
155 
156  return ret;
157 }
158 
159 void *
161 {
162  T_SHARD_QUEUE_ENT *q_ent;
163  void *ret;
164 
165  if (q->head == NULL)
166  {
167  return NULL;
168  }
169 
170  q_ent = q->head;
171  ret = q_ent->v;
172 
173  return ret;
174 }
175 
176 int
178 {
179  assert (q);
180 
181  q->head = NULL;
182  q->tail = NULL;
183 
184  return 0;
185 }
186 
187 void
189 {
190  void *v;
191 
192  while ((v = shard_queue_dequeue (q)) != NULL)
193  {
194  FREE_MEM (v);
195  }
196 
197  q->head = NULL;
198  q->tail = NULL;
199 
200  return;
201 }
202 
203 static bool
205 {
206  assert (q);
207 
208  return (q->count == q->size) ? true : false;
209 }
210 
211 static bool
213 {
214  assert (q);
215 
216  return (q->count == 0) ? true : false;
217 }
218 
219 
220 int
222 {
223  assert (q);
224 
225  if (shard_cqueue_is_full (q))
226  {
227  PROXY_LOG (PROXY_LOG_MODE_ERROR, "Queue is full. (q_size:%d, q_count:%d).", q->size, q->count);
228  /* FAILED */
229  return -1;
230  }
231 
232  q->ent[q->front] = e;
233 
234  q->count++;
235  q->front = (q->front + 1) % q->size;
236 
237  /* SUCCESS */
238  return 0;
239 }
240 
241 void *
243 {
244  void *e;
245 
246  assert (q);
247 
248  if (shard_cqueue_is_empty (q))
249  {
250  /* FAIELD */
251  return NULL;
252  }
253 
254  e = q->ent[q->rear];
255 
256  q->count--;
257  q->rear = (q->rear + 1) % q->size;
258 
259  return e;
260 }
261 
262 int
264 {
265  assert (q);
266  assert (q->ent == NULL);
267 
268  q->size = size;
269  q->count = 0;
270  q->front = 0;
271  q->rear = 0;
272 
273  q->ent = (void **) malloc (sizeof (void *) * size);
274  if (q->ent == NULL)
275  {
277  "Not enough virtual memory. " "Failed to alloc shard cqueue entry. " "(errno:%d, size:%d).", errno,
278  (size * sizeof (void *)));
279 
280  /* FAILED */
281  return -1;
282  }
283 
284  /* SUCCESS */
285  return 0;
286 }
287 
288 void
290 {
291  q->size = 0;
292  q->count = 0;
293  q->front = 0;
294  q->rear = 0;
295  FREE_MEM (q->ent);
296 }
int shard_queue_enqueue(T_SHARD_QUEUE *q, void *v)
int shard_queue_ordered_enqueue(T_SHARD_QUEUE *q, void *v, SHARD_COMP_FN comp_fn)
T_SHARD_QUEUE_ENT * tail
int(* SHARD_COMP_FN)(const void *arg1, const void *arg2)
T_SHARD_QUEUE_ENT * head
#define assert(x)
static bool shard_cqueue_is_empty(T_SHARD_CQUEUE *q)
#define NULL
Definition: freelistheap.h:34
#define FREE_MEM(PTR)
Definition: cas_common.h:58
void * shard_cqueue_dequeue(T_SHARD_CQUEUE *q)
void shard_queue_destroy(T_SHARD_QUEUE *q)
#define PROXY_LOG(level, fmt, args...)
static bool shard_cqueue_is_full(T_SHARD_CQUEUE *q)
void shard_cqueue_destroy(T_SHARD_CQUEUE *q)
void * shard_queue_dequeue(T_SHARD_QUEUE *q)
int shard_cqueue_enqueue(T_SHARD_CQUEUE *q, void *e)
void * shard_queue_peek_value(T_SHARD_QUEUE *q)
int shard_queue_initialize(T_SHARD_QUEUE *q)
static void shard_queue_insert_after(T_SHARD_QUEUE *q, T_SHARD_QUEUE_ENT *prev, T_SHARD_QUEUE_ENT *curr)
T_SHARD_QUEUE_ENT * next
int shard_cqueue_initialize(T_SHARD_CQUEUE *q, int size)