CUBRID Engine  latest
scan_json_table.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2008 Search Solution Corporation
3  * Copyright 2016 CUBRID Corporation
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 #include "scan_json_table.hpp"
20 
21 #include "access_json_table.hpp"
22 #include "db_json.hpp"
23 #include "dbtype.h"
24 #include "fetch.h"
25 #include "object_primitive.h"
26 #include "scan_manager.h"
27 #include "storage_common.h"
28 
29 #include <algorithm>
30 
31 namespace cubscan
32 {
33  namespace json_table
34  {
36  {
37  std::size_t m_child; // current child
38  cubxasl::json_table::node *m_node; // pointer to access node
39  JSON_DOC_STORE m_input_doc; // input JSON document value
40  const JSON_DOC *m_process_doc; // for no expand, it matched input document. when node is expanded, it will
41  // point iterator value
42  bool m_is_row_fetched; // set to true when current row is fetched
43  bool m_need_advance_row; // set to true when next node action is to advance row
44  bool m_is_node_consumed; // set to true when all node rows (based on current input) are consumed
45  bool m_iteration_started; // set to true when iteration was started by at least one child.
46  // note: when all children are consumed, if row was never expanded, it is
47  // generated by leaving all children values as nil
48 
49  void advance_row_cursor (void); // advance to next row
50  void start_json_iterator (void); // start json iteration of changing input document
51  int fetch_row (void); // fetch current row (if not fetched)
52  void end (void); // finish current node scan
53 
54  cursor (void);
55  };
56 
58  : m_child (0)
59  , m_node (NULL)
60  , m_input_doc ()
62  , m_is_row_fetched (false)
63  , m_need_advance_row (false)
64  , m_is_node_consumed (true)
65  , m_iteration_started (false)
66  {
67  //
68  }
69 
70  void
72  {
73  // don't advance again in row
74  m_need_advance_row = false;
75 
76  // reset row expansion
77  m_iteration_started = false;
78 
80  {
81  end ();
82  return;
83  }
84 
85  // advance with row
87  m_is_row_fetched = false;
88 
89  // advance also with ordinality
91 
92  // reset child to first branch
93  m_child = 0;
94  }
95 
96  void
98  {
99  m_is_node_consumed = false;
101  {
102  assert (db_json_get_type (m_input_doc.get_immutable ()) == DB_JSON_ARRAY);
104  }
105  }
106 
107  int
109  {
110  if (m_is_row_fetched)
111  {
112  // already fetched
113  return NO_ERROR;
114  }
115 
116  // if we have an iterator, value is obtained from iterator. otherwise, use m_input_doc
117  if (m_node->m_iterator != NULL)
118  {
120  }
121  else
122  {
124  // todo: is it guaranteed we do not use m_process_doc after we delete input_doc?
125  m_process_doc = m_input_doc.get_immutable ();
126  }
127 
128  if (m_process_doc == NULL)
129  {
130  assert (false);
131  return ER_FAILED;
132  }
133 
134  int error_code = NO_ERROR;
135  for (size_t i = 0; i < m_node->m_output_columns_size; ++i)
136  {
138  if (error_code != NO_ERROR)
139  {
140  ASSERT_ERROR ();
141  return error_code;
142  }
143  }
144 
145  return NO_ERROR;
146  }
147 
148  void
150  {
151  m_is_node_consumed = true;
152 
154 
156  m_node->clear_columns (false);
157  }
158 
159  size_t
161  {
162  size_t max_child_height = 0;
163 
164  for (size_t i = 0; i < node.m_nested_nodes_size; ++i)
165  {
166  const cubxasl::json_table::node &child = node.m_nested_nodes[i];
167  max_child_height = std::max (max_child_height, get_tree_height (child));
168  }
169 
170  return 1 + max_child_height;
171  }
172 
173  void
175  {
176  m_specp = &spec;
177 
178  assert (m_specp->m_node_count > 0);
179 
182 
184 
185  // init cursor nodes to left-most branch
187  m_scan_cursor[0].m_node = t;
188  for (int i = 1; t->m_nested_nodes_size != 0; t = &t->m_nested_nodes[0], ++i)
189  {
190  m_scan_cursor[i].m_node = t;
191  }
192 
194  }
195 
// Release resources held by the scan.
//
// xasl_p         : not referenced by the statements in this body (NOTE(review): confirm it is intentionally unused)
// is_final       : when true, each cursor up to m_tree_height has its input document cleared and its state
//                  reset to the "nothing to scan" values, and the root node's iterators are cleared
// is_final_clear : forwarded to clear_xasl () and clear_iterators (); when true together with is_final,
//                  the cursor array itself is deleted
196  void
197  scanner::clear (xasl_node *xasl_p, bool is_final, bool is_final_clear)
198  {
199  // columns should be released every time
200  m_specp->m_root_node->clear_xasl (is_final_clear);
202 
203  // all json documents should be released depending on is_final
204  if (is_final)
205  {
206  for (size_t i = 0; i < m_tree_height; ++i)
207  {
209  cursor.m_input_doc.clear ();
210 
     // put the cursor back into its initial state: first child, no fetched row, node consumed
211  cursor.m_child = 0;
212  cursor.m_is_row_fetched = false;
213  cursor.m_need_advance_row = false;
214  cursor.m_is_node_consumed = true;
215  cursor.m_iteration_started = false;
216  }
217 
218  m_specp->m_root_node->clear_iterators (is_final_clear);
219 
220  if (is_final_clear)
221  {
     // NOTE(review): m_scan_cursor is not reset after delete [], so the member keeps a dangling pointer;
     // confirm nothing touches it (or calls clear again) after a final clear
222  delete [] m_scan_cursor;
223  }
224  }
225  }
226 
227  int
229  {
230  int error_code = NO_ERROR;
231 
232  // we need the starting value to expand into a list of records
233  DB_VALUE *value_p = NULL;
234  error_code = fetch_peek_dbval (thread_p, m_specp->m_json_reguvar, m_vd, NULL, NULL, NULL, &value_p);
235  if (error_code != NO_ERROR)
236  {
237  ASSERT_ERROR ();
238  return error_code;
239  }
240  if (value_p == NULL)
241  {
242  assert (false);
243  return ER_FAILED;
244  }
245 
246  if (db_value_is_null (value_p))
247  {
249  return NO_ERROR;
250  }
251 
252  // build m_scan_cursor
253 
254  if (db_value_type (value_p) == DB_TYPE_JSON)
255  {
256  error_code = init_cursor (*db_get_json_document (value_p), *m_specp->m_root_node, m_scan_cursor[0]);
257  if (error_code != NO_ERROR)
258  {
259  ASSERT_ERROR ();
260  return error_code;
261  }
262  }
263  else
264  {
265  JSON_DOC_STORE document;
266 
267  error_code = db_value_to_json_doc (*value_p, false, document);
268  if (error_code != NO_ERROR)
269  {
270  ASSERT_ERROR ();
271  return error_code;
272  }
273  error_code = init_cursor (*document.get_immutable (), *m_specp->m_root_node, m_scan_cursor[0]);
274  if (error_code != NO_ERROR)
275  {
276  ASSERT_ERROR ();
277  return error_code;
278  }
279  }
280 
281  // if we gather expr from another table, for each row we need to reset the ordinality
284 
285  return NO_ERROR;
286  }
287 
288  void
290  {
291  assert (thread_p != NULL);
292  }
293 
294  int
296  {
297  bool has_row = false;
298  int error_code = NO_ERROR;
299  DB_LOGICAL logical = V_FALSE;
300 
301  if (sid.position == S_BEFORE)
302  {
303  error_code = open (thread_p);
304  if (error_code != NO_ERROR)
305  {
306  sc = S_ERROR;
307  return error_code;
308  }
309  sid.position = S_ON;
310  sid.status = S_STARTED;
311  }
312  else if (sid.position != S_ON)
313  {
314  assert (false);
315  sc = S_END;
316  return ER_FAILED;
317  }
318 
319  while (true)
320  {
321  error_code = scan_next_internal (thread_p, 0, has_row);
322  if (error_code != NO_ERROR)
323  {
324  ASSERT_ERROR ();
325  sc = S_ERROR;
326  return error_code;
327  }
328  if (!has_row)
329  {
330  sid.position = S_AFTER;
331  sc = S_END;
332  return NO_ERROR;
333  }
334 
336  {
337  break;
338  }
339 
340  logical = m_scan_predicate.pr_eval_fnc (thread_p, m_scan_predicate.pred_expr, sid.vd, NULL);
341  if (logical == V_TRUE)
342  {
343  break;
344  }
345  if (logical == V_ERROR)
346  {
347  ASSERT_ERROR_AND_SET (error_code);
348  sc = S_ERROR;
349  return error_code;
350  }
351  }
352 
353  sc = S_SUCCESS;
354  return NO_ERROR;
355  }
356 
357  int
358  scanner::set_input_document (cursor &cursor_arg, const cubxasl::json_table::node &node, const JSON_DOC &document)
359  {
360  int error_code = NO_ERROR;
361  cursor_arg.m_input_doc.clear ();
362 
363  // extract input document
364  error_code = db_json_extract_document_from_path (&document, node.m_path, cursor_arg.m_input_doc);
365  if (error_code != NO_ERROR)
366  {
367  ASSERT_ERROR ();
368  return error_code;
369  }
370 
371  if (cursor_arg.m_input_doc.is_null ())
372  {
373  // cannot retrieve input_doc from path
374  cursor_arg.m_is_node_consumed = true;
375  }
376  else
377  {
378  // start cursor based on input document
379  cursor_arg.start_json_iterator ();
380  }
381 
382  return NO_ERROR;
383  }
384 
385  int
387  {
388  cursor_out.m_is_row_fetched = false;
389  cursor_out.m_child = 0;
390  cursor_out.m_node = &node;
391 
392  return set_input_document (cursor_out, node, doc);
393  }
394 
395  int
396  scanner::set_next_cursor (const cursor &current_cursor, size_t next_depth)
397  {
398  return init_cursor (*current_cursor.m_process_doc,
399  current_cursor.m_node->m_nested_nodes[current_cursor.m_child],
400  m_scan_cursor[next_depth]);
401  }
402 
403  void
405  {
406  for (size_t i = 0; i < node.m_output_columns_size; ++i)
407  {
410  }
411  }
412 
413  void
415  {
416  node.init_iterator ();
417 
418  for (size_t i = 0; i < node.m_nested_nodes_size; ++i)
419  {
421  }
422  }
423 
424  void
426  {
427  node.init_ordinality ();
428 
429  for (size_t i = 0; i < node.m_nested_nodes_size; ++i)
430  {
432  }
433  }
434 
// Try to produce the next output row using the cursor stored at m_scan_cursor[depth], recursing into the
// cursors at depth + 1 for nested nodes.
//
// thread_p         : thread entry; only passed on to the recursive calls
// depth            : index in m_scan_cursor of the cursor handled by this call
// found_row_output : output; true when a row was produced, false when this cursor has no more rows.
//                    it is assigned on every path that returns NO_ERROR
// return           : NO_ERROR, or the error code returned by fetch_row (), set_next_cursor () or a
//                    recursive call
435  int
436  scanner::scan_next_internal (cubthread::entry *thread_p, size_t depth, bool &found_row_output)
437  {
438  int error_code = NO_ERROR;
439  cursor &this_cursor = m_scan_cursor[depth];
440 
441  // check if cursor is already in child node
442  if (m_scan_cursor_depth >= depth + 1)
443  {
444  // a deeper cursor is still active from a previous call; let it continue first
445  error_code = scan_next_internal (thread_p, depth + 1, found_row_output);
446  if (error_code != NO_ERROR)
447  {
448  return error_code;
449  }
450  if (found_row_output)
451  {
452  // the child produced a row; nothing more to do at this level
453  return NO_ERROR;
454  }
455  else
456  {
     // the active child has no more rows; move on to the next child of this node
457  this_cursor.m_child++;
458  }
459  }
460 
461  // get the cursor from the current depth
462  assert (this_cursor.m_node != NULL);
463 
464  // loop through node's rows and children until all possible rows are generated
465  while (!this_cursor.m_is_node_consumed)
466  {
467  // note - do not loop without taking new action
468  // an action is either advancing to new row or advancing to new child
469  if (this_cursor.m_need_advance_row)
470  {
471  this_cursor.advance_row_cursor ();
472  if (this_cursor.m_is_node_consumed)
473  {
     // advancing ran past the last row of this node
474  break;
475  }
476  }
477 
478  // first things first, fetch current row
     // (fetch_row returns immediately if the current row was already fetched)
479  error_code = this_cursor.fetch_row ();
480  if (error_code != NO_ERROR)
481  {
482  return error_code;
483  }
484 
485  // if this is leaf node, then we have a new complete row
486  if (this_cursor.m_node->m_nested_nodes_size == 0)
487  {
488  found_row_output = true;
489  // next time, cursor will have to be incremented
490  this_cursor.m_need_advance_row = true;
491  return NO_ERROR;
492  }
493 
494  // non-leaf
495  // m_child == m_nested_nodes_size means every child was already tried for the current row
496  if (this_cursor.m_child == this_cursor.m_node->m_nested_nodes_size)
497  {
498  // next time, cursor will have to be incremented
499  this_cursor.m_need_advance_row = true;
500 
501  if (this_cursor.m_iteration_started)
502  {
     // at least one child started iterating for this row, so the row is not emitted again here;
     // loop to advance to the next row
503  continue;
504  }
505 
     // no child started iterating for this row: report the row as it currently stands
506  found_row_output = true;
507  return NO_ERROR;
508  }
509 
510  // create cursor for next child
511  error_code = set_next_cursor (this_cursor, depth + 1);
512  if (error_code != NO_ERROR)
513  {
514  ASSERT_ERROR ();
515  return error_code;
516  }
517  cursor &next_cursor = m_scan_cursor[depth + 1];
518 
519  if (!next_cursor.m_is_node_consumed)
520  {
521  // advance current level in tree
523 
     // remember that a child has input for this row (checked above once all children were tried)
524  this_cursor.m_iteration_started = true;
525 
526  error_code = scan_next_internal (thread_p, depth + 1, found_row_output);
527  if (error_code != NO_ERROR)
528  {
529  return error_code;
530  }
531  }
532  else
533  {
     // the child cursor has nothing to scan for this row; try the next child
534  this_cursor.m_child++;
535  continue;
536  }
537 
538  if (found_row_output)
539  {
540  // found a row; scan is stopped
541  return NO_ERROR;
542  }
543  else
544  {
545  // child could not generate a row. advance to next
546  this_cursor.m_child++;
547  }
548  }
549 
550  // no more rows...
551  found_row_output = false;
552 
553  if (m_scan_cursor_depth > 0)
554  {
555  // remove this cursor
557  }
558 
559  return NO_ERROR;
560  }
561 
562  SCAN_PRED &
564  {
565  return m_scan_predicate;
566  }
567 
568  void
570  {
571  m_vd = vd;
572  }
573  } // namespace json_table
574 } // namespace cubscan
val_descr * vd
Definition: scan_manager.h:350
#define NO_ERROR
Definition: error_code.h:46
void init_iterators(cubxasl::json_table::node &node)
#define ASSERT_ERROR()
SCAN_CODE
void end(cubthread::entry *thread_p)
regu_variable_node * m_json_reguvar
JSON_DOC * db_get_json_document(const DB_VALUE *value)
const JSON_DOC * db_json_iterator_get_document(JSON_ITERATOR &json_itr)
Definition: db_json.cpp:930
#define ER_FAILED
Definition: error_code.h:47
void clear_xasl(bool is_final_clear=true)
bool db_json_iterator_has_next(JSON_ITERATOR &json_itr)
Definition: db_json.cpp:936
#define ASSERT_ERROR_AND_SET(error_code)
void set_value_descriptor(val_descr *vd)
int db_json_extract_document_from_path(const JSON_DOC *document, const std::string &path, JSON_DOC_STORE &result, bool allow_wildcards)
Definition: db_json.cpp:1153
void clear_node_columns(cubxasl::json_table::node &node)
int init_cursor(const JSON_DOC &doc, cubxasl::json_table::node &node, cursor &cursor_out)
void db_json_iterator_next(JSON_ITERATOR &json_itr)
Definition: db_json.cpp:924
#define assert(x)
DB_TYPE db_value_type(const DB_VALUE *value)
int fetch_peek_dbval(THREAD_ENTRY *thread_p, REGU_VARIABLE *regu_var, val_descr *vd, OID *class_oid, OID *obj_oid, QFILE_TUPLE tpl, DB_VALUE **peek_dbval)
Definition: fetch.c:3773
PRED_EXPR * pred_expr
void db_json_set_iterator(JSON_ITERATOR *&json_itr, const JSON_DOC &new_doc)
Definition: db_json.cpp:942
#define NULL
Definition: freelistheap.h:34
SCAN_POSITION position
Definition: scan_manager.h:334
SCAN_STATUS status
Definition: scan_manager.h:333
int pr_clear_value(DB_VALUE *value)
DB_JSON_TYPE db_json_get_type(const JSON_DOC *doc)
Definition: db_json.cpp:2519
int scan_next_internal(cubthread::entry *thread_p, size_t depth, bool &found_row_output)
#define max(a, b)
cubxasl::json_table::node * m_node
bool db_value_is_null(const DB_VALUE *value)
void db_json_reset_iterator(JSON_ITERATOR *&json_itr)
Definition: db_json.cpp:948
int evaluate(const JSON_DOC &input, size_t ordinality)
PR_EVAL_FNC pr_eval_fnc
void clear_columns(bool is_final_clear)
int set_input_document(cursor &cursor, const cubxasl::json_table::node &node, const JSON_DOC &document)
cubxasl::json_table::spec_node * m_specp
int open(cubthread::entry *thread_p)
void init(cubxasl::json_table::spec_node &spec)
DB_LOGICAL
Definition: dbtype_def.h:1218
int next_scan(cubthread::entry *thread_p, scan_id_struct &sid, SCAN_CODE &sc)
int i
Definition: dynamic_load.c:954
int db_make_null(DB_VALUE *value)
size_t get_tree_height(const cubxasl::json_table::node &node)
void reset_ordinality(cubxasl::json_table::node &node)
int set_next_cursor(const cursor &current_cursor, size_t next_depth)
void clear(xasl_node *xasl_p, bool is_final, bool is_final_clear)
int db_value_to_json_doc(const DB_VALUE &db_val, bool force_copy, JSON_DOC_STORE &json_doc)
Definition: db_json.cpp:3183
void clear_iterators(bool is_final_clear)