CUBRID Engine  latest
load_session.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2008 Search Solution Corporation
3  * Copyright 2016 CUBRID Corporation
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 /*
20  * load_session.cpp - entry point for server side loaddb
21  */
22 
23 #include "load_session.hpp"
24 
25 #include "load_driver.hpp"
26 #include "load_server_loader.hpp"
27 #include "load_worker_manager.hpp"
28 #include "resource_shared_pool.hpp"
29 #include "xserver_interface.h"
30 
31 #include <sstream>
32 
33 namespace cubload
34 {
35 
36  void init_driver (driver *driver, session &session);
37 
38  bool invoke_parser (driver *driver, const batch &batch_);
39 
40 }
41 
42 namespace cubload
43 {
44 
45  void
47  {
48  if (driver == NULL)
49  {
50  session.fail ();
51  assert (false);
52  return;
53  }
54 
55  // avoid driver being initialized twice
56  if (driver->is_initialized ())
57  {
58  return;
59  }
60 
61  error_handler *error_handler_ = new error_handler (session);
62  class_installer *cls_installer = new server_class_installer (session, *error_handler_);
63  object_loader *obj_loader = new server_object_loader (session, *error_handler_);
64 
65  driver->initialize (cls_installer, obj_loader, error_handler_);
66  }
67 
68  bool
69  invoke_parser (driver *driver, const batch &batch_)
70  {
71  if (driver == NULL || !driver->is_initialized ())
72  {
73  return false;
74  }
75 
76  driver->get_object_loader ().init (batch_.get_class_id ());
77  driver->get_class_installer ().set_class_id (batch_.get_class_id ());
78 
79  // parse doc says that 0 is returned if parsing succeeds
80  std::istringstream iss (batch_.get_content ());
81  int parser_result = driver->parse (iss, batch_.get_line_offset ());
82 
83  driver->get_object_loader ().destroy ();
84 
85  return parser_result == 0;
86  }
87 
88  /*
89  * cubload::load_task
90  * extends cubthread::entry_task
91  *
92  * description
93  * Loaddb worker thread task, which does parsing and inserting of data rows within a transaction
94  */
96  {
97  public:
98  load_task () = delete; // Default c-tor: deleted.
99 
100  ~load_task () override
101  {
103  {
104  notify_done ();
105  }
106  delete &m_batch;
107  }
108 
110  : m_batch (batch)
111  , m_session (session)
112  , m_conn_entry (conn_entry)
113  , m_was_session_notified (false)
114  {
115  //
116  }
117 
118  void execute (cubthread::entry &thread_ref) final
119  {
120  if (m_session.is_failed ())
121  {
122  return;
123  }
124 
125  thread_ref.conn_entry = &m_conn_entry;
126  driver *driver = thread_ref.m_loaddb_driver;
127 
128  assert (driver != NULL && !driver->is_initialized ());
129  init_driver (driver, m_session);
130 
131  bool is_syntax_check_only = m_session.get_args ().syntax_check;
133  if (cls_entry == NULL)
134  {
135  if (!is_syntax_check_only)
136  {
138  }
139  else
140  {
142  }
143 
144  driver->clear ();
145  notify_done ();
146  return;
147  }
148 
151  int tran_index = thread_ref.tran_index;
152  m_session.register_tran_start (tran_index);
153 
154  // Get the clientids from the session and set it on the current worker.
155  LOG_TDES *session_tdes = log_Gl.trantable.all_tdes[m_conn_entry.get_tran_index ()];
156  LOG_TDES *worker_tdes = log_Gl.trantable.all_tdes[tran_index];
157  worker_tdes->client.set_ids (session_tdes->client);
158 
159  bool parser_result = invoke_parser (driver, m_batch);
160 
161  // Get the class name.
162  std::string class_name = cls_entry->get_class_name ();
163 
164  // We need this to update the stats.
165  int line_no = driver->get_scanner ().lineno ();
166 
167  // Get the inserted lines
168  std::size_t rows_number = driver->get_object_loader ().get_rows_number ();
169 
170  // We don't need anything from the driver anymore.
171  driver->clear ();
172 
173  if (m_session.is_failed () || (!is_syntax_check_only && (!parser_result || er_has_error ())))
174  {
175  // if a batch transaction was aborted and syntax only is not enabled then abort entire loaddb session
176  m_session.fail ();
177 
178  xtran_server_abort (&thread_ref);
179  }
180  else
181  {
182  // order batch commits, therefore wait until previous batch is committed
184 
185  xtran_server_commit (&thread_ref, false);
186 
187  // update load statistics after commit
190 
191  MSGCAT_LOADDB_MSG msg_type;
193  {
194  msg_type = LOADDB_MSG_INSTANCE_COUNT;
195  }
196  else
197  {
199  }
200 
201  m_session.append_log_msg (msg_type, class_name.c_str (), rows_number);
202  }
203 
204  // Clear the clientids.
205  worker_tdes->client.reset ();
206 
207  // notify session that batch is done
208  notify_done_and_tran_end (tran_index);
209  }
210 
211  private:
212  void notify_done ()
213  {
216  m_was_session_notified = true;
217  }
218 
219  void notify_done_and_tran_end (int tran_index)
220  {
223  m_was_session_notified = true;
224  }
225 
226  const batch &m_batch;
230  };
231 
233  : m_mutex ()
234  , m_cond_var ()
235  , m_tran_indexes ()
236  , m_args (args)
237  , m_last_batch_id {NULL_BATCH_ID}
239  , m_active_task_count {0}
240  , m_class_registry ()
241  , m_stats ()
242  , m_is_failed (false)
243  , m_collected_stats ()
244  , m_driver (NULL)
245  , m_temp_task (NULL)
246  {
248 
249  m_driver = new driver ();
250  init_driver (m_driver, *this);
251 
252  if (!m_args.table_name.empty ())
253  {
254  // just set class id to 1 since only one table can be specified as command line argument
255  cubthread::entry &thread_ref = cubthread::get_entry ();
256 
258  {
259  // This is an error.
261  return;
262  }
263 
264  thread_ref.m_loaddb_driver = m_driver;
267  thread_ref.m_loaddb_driver = NULL;
268  }
269  }
270 
272  {
273  delete m_driver;
274 
276  }
277 
278  bool
280  {
282  }
283 
284  void
286  {
287  auto pred = [this, &id] () -> bool { return is_failed () || id == (m_last_batch_id + 1); };
288 
289  if (id == FIRST_BATCH_ID || pred ())
290  {
291  return;
292  }
293 
294  std::unique_lock<std::mutex> ulock (m_mutex);
295  m_cond_var.wait (ulock, pred);
296  }
297 
298  void
300  {
301  auto pred = [this] () -> bool
302  {
303  // condition of finish and no active tasks
304  return (is_failed () || is_completed ()) && (m_active_task_count == 0);
305  };
306 
307  if (pred ())
308  {
309  return;
310  }
311 
312  std::unique_lock<std::mutex> ulock (m_mutex);
313  m_cond_var.wait (ulock, pred);
314  }
315 
316  void
318  {
319  std::unique_lock<std::mutex> ulock (m_mutex);
322  if (!is_failed ())
323  {
324  assert (m_last_batch_id == id - 1);
326  }
327  ulock.unlock ();
329 
330  er_clear ();
331  }
332 
333  void
335  {
336  std::unique_lock<std::mutex> ulock (m_mutex);
337  // free transaction index
339 
342  if (!is_failed ())
343  {
344  assert (m_last_batch_id == id - 1);
346  }
347  if (m_tran_indexes.erase (tran_index) != 1)
348  {
349  assert (false);
350  }
351  collect_stats ();
352  ulock.unlock ();
354 
355  er_clear ();
356  }
357 
358  void
360  {
361  std::unique_lock<std::mutex> ulock (m_mutex);
362  auto ret = m_tran_indexes.insert (tran_index);
363  assert (ret.second); // it means it was inserted
364  }
365 
366  void
367  session::on_error (std::string &err_msg)
368  {
369  std::unique_lock<std::mutex> ulock (m_mutex);
370 
372  m_stats.error_message.append (err_msg);
373  collect_stats ();
374  ulock.unlock ();
376  }
377 
378  void
379  session::fail (bool has_lock)
380  {
381  std::unique_lock<std::mutex> ulock (m_mutex, std::defer_lock);
382  if (!has_lock)
383  {
384  ulock.lock ();
385  }
386 
387  // check if failed after lock was acquired
388  if (m_is_failed)
389  {
390  return;
391  }
392 
393  m_is_failed = true;
394  if (!has_lock)
395  {
396  ulock.unlock ();
397  // notify waiting threads that session was aborted
399  }
400  else
401  {
402  // caller should manage notifications too
403  }
404  }
405 
406  bool
408  {
409  return m_is_failed;
410  }
411 
412  void
414  {
415  cubthread::entry *thread_p = &cubthread::get_entry ();
416  std::unique_lock<std::mutex> ulock (m_mutex);
417  for (auto &it : m_tran_indexes)
418  {
419  (void) logtb_set_tran_index_interrupt (thread_p, it, true);
420  }
421  fail (true);
422  ulock.unlock ();
424  }
425 
426  void
427  session::stats_update_rows_committed (int64_t rows_committed)
428  {
429  std::unique_lock<std::mutex> ulock (m_mutex);
430  m_stats.rows_committed += rows_committed;
431  }
432 
433  int64_t
435  {
436  return m_stats.rows_committed;
437  }
438 
439  void
440  session::stats_update_last_committed_line (int64_t last_committed_line)
441  {
442  if (last_committed_line <= m_stats.last_committed_line)
443  {
444  return;
445  }
446 
447  std::unique_lock<std::mutex> ulock (m_mutex);
448 
449  // check if again after lock was acquired
450  if (last_committed_line <= m_stats.last_committed_line)
451  {
452  return;
453  }
454 
455  m_stats.last_committed_line = last_committed_line;
456  }
457 
458  void
459  session::stats_update_current_line (int64_t current_line)
460  {
462  }
463 
464  template<typename T>
465  void
466  session::update_atomic_value_with_max (std::atomic<T> &atomic_val, T new_max)
467  {
468  int64_t curr_max;
469 
470  do
471  {
472  curr_max = atomic_val.load ();
473  if (curr_max >= new_max)
474  {
475  // max is already stored
476  break;
477  }
478  }
479  while (!atomic_val.compare_exchange_strong (curr_max, new_max));
480  }
481 
482  void
484  {
486  {
487  return;
488  }
489 
490  std::vector<const class_entry *> class_entries;
491  m_class_registry.get_all_class_entries (class_entries);
492 
494 
495  for (const class_entry *class_entry : class_entries)
496  {
497  if (!class_entry->is_ignored ())
498  {
499  OID *class_oid = const_cast<OID *> (&class_entry->get_class_oid ());
500  xstats_update_statistics (&thread_ref, class_oid, STATS_WITH_SAMPLING);
502  }
503  }
504  }
505 
508  {
509  return m_class_registry;
510  }
511 
512  const load_args &
514  {
515  return m_args;
516  }
517 
518  void
520  {
521  m_cond_var.notify_all ();
522  }
523 
524  int
525  session::install_class (cubthread::entry &thread_ref, const batch &batch, bool &is_ignored, std::string &cls_name)
526  {
527  thread_ref.m_loaddb_driver = m_driver;
528 
529  int error_code = NO_ERROR;
530  bool parser_result = invoke_parser (m_driver, batch);
531  const class_entry *cls_entry = get_class_registry ().get_class_entry (batch.get_class_id ());
532  if (cls_entry != NULL)
533  {
534  is_ignored = cls_entry->is_ignored ();
535  cls_name = cls_entry->get_class_name ();
536  }
537  else
538  {
539  is_ignored = false;
540  }
541 
542  if (is_ignored)
543  {
544  thread_ref.m_loaddb_driver = NULL;
545 
546  return NO_ERROR;
547  }
548 
549  if (is_failed () || !parser_result || er_has_error ())
550  {
551  fail ();
552 
553  error_code = er_errid_if_has_error ();
554  if (error_code == NO_ERROR)
555  {
556  error_code = ER_FAILED;
557  }
558  }
559 
560  thread_ref.m_loaddb_driver = NULL;
561 
562  return error_code;
563  }
564 
565  int
566  session::load_batch (cubthread::entry &thread_ref, const batch *batch, bool use_temp_batch, bool &is_batch_accepted,
567  load_status &status)
568  {
569  if (is_failed ())
570  {
571  return ER_FAILED;
572  }
573 
574  if (batch != NULL && batch->get_content ().empty ())
575  {
576  assert (false);
577  return ER_FAILED;
578  }
579 
580  cubthread::entry_task *task = NULL;
581  if (use_temp_batch)
582  {
583  assert (m_temp_task != NULL && batch == NULL);
584  task = m_temp_task;
585  }
586  else
587  {
588  assert (m_temp_task == NULL && batch != NULL);
590 
591  task = new load_task (*batch, *this, *thread_ref.conn_entry);
592  }
593 
594  std::unique_lock<std::mutex> ulock (m_mutex);
595  auto pred = [&] () -> bool
596  {
597  is_batch_accepted = worker_manager_try_task (task);
598  if (is_batch_accepted)
599  {
601  if (use_temp_batch)
602  {
603  m_temp_task = NULL;
604  }
605  }
606  else if (!use_temp_batch)
607  {
608  m_temp_task = task;
609  use_temp_batch = true;
610  }
611 
612  return !m_collected_stats.empty () || is_batch_accepted;
613  };
614 
615  // if worker pool is full, but all jobs belong to other sessions, nobody will notify me when a job is finished.
616  // loop & use timed waits instead of infinite wait
617  while (true)
618  {
619  const std::chrono::milliseconds WAIT_MS { 10 }; // wakeup every 10 milliseconds
620 
621  if (m_cond_var.wait_for (ulock, WAIT_MS, pred))
622  {
623  break;
624  }
625  // go back to waiting
626  }
627 
628  fetch_status (status, true);
629 
630  return NO_ERROR;
631  }
632 
633  void
635  {
636  m_collected_stats.emplace_back (m_stats);
637 
638  // since client periodically fetches the stats, clear error_message in order not to send twice same message
639  // However, for syntax checking we do not clear the messages since we throw the errors at the end
640  if (!m_args.syntax_check)
641  {
642  m_stats.error_message.clear ();
643  }
644  m_stats.log_message.clear ();
645  }
646 
647  void
648  session::fetch_status (load_status &status, bool has_lock)
649  {
650  std::unique_lock<std::mutex> ulock (m_mutex, std::defer_lock);
651  if (!has_lock)
652  {
653  ulock.lock ();
654  }
655 
656  std::vector<stats> stats_;
657  if (!m_collected_stats.empty ())
658  {
659  stats_ = std::move (m_collected_stats);
660  assert (!stats_.empty ());
661  assert (m_collected_stats.empty ());
662  }
663 
664  status = load_status (is_completed (), is_failed (), stats_);
665 
666  if (!has_lock)
667  {
668  ulock.unlock ();
669  }
670  }
671 
672 } // namespace cubload
void on_error(std::string &err_msg)
virtual void destroy()=0
#define NO_ERROR
Definition: error_code.h:46
void notify_batch_done_and_register_tran_end(batch_id id, int tran_index)
TRANTABLE trantable
Definition: log_impl.h:650
int parse(std::istream &iss, int line_offset=0)
Definition: load_driver.cpp:83
void execute(cubthread::entry &thread_ref) final
std::string error_message
void update_class_statistics(cubthread::entry &thread_ref)
std::mutex m_mutex
void notify_waiting_threads()
#define ER_FAILED
Definition: error_code.h:47
LOG_GLOBAL log_Gl
css_conn_entry * conn_entry
int install_class(cubthread::entry &thread_ref, const batch &batch, bool &is_ignored, std::string &cls_name)
void stats_update_rows_committed(int64_t rows_committed)
#define NULL_TRANID
#define TRAN_DEFAULT_ISOLATION_LEVEL()
Definition: dbtran_def.h:58
virtual int install_class(const char *class_name)=0
load_task(const batch &batch, session &session, css_conn_entry &conn_entry)
int64_t stats_get_rows_committed()
css_conn_entry & m_conn_entry
bool logtb_set_tran_index_interrupt(THREAD_ENTRY *thread_p, int tran_index, bool set)
#define SM_MAX_IDENTIFIER_LENGTH
class_registry m_class_registry
bool invoke_parser(driver *driver, const batch &batch_)
TRAN_STATE xtran_server_commit(THREAD_ENTRY *thrd, bool retain_lock)
std::atomic< batch_id > m_max_batch_id
MSGCAT_LOADDB_MSG
Definition: utility.h:438
TRAN_STATE xtran_server_abort(THREAD_ENTRY *thrd)
const char * get_class_name() const
std::set< int > m_tran_indexes
void notify_batch_done(batch_id id)
virtual void init(class_id clsid)=0
void wait_for_completion()
scanner & get_scanner()
const class_entry * get_class_entry(class_id clsid)
LOG_TDES ** all_tdes
Definition: log_impl.h:595
void append_log_msg(MSGCAT_LOADDB_MSG msg_id, Args &&...args)
std::string log_message
std::atomic< int64_t > current_line
#define assert(x)
const batch_id FIRST_BATCH_ID
Definition: load_common.hpp:45
std::condition_variable m_cond_var
std::string table_name
bool is_initialized()
Definition: load_driver.cpp:77
const batch_id NULL_BATCH_ID
Definition: load_common.hpp:43
class_registry & get_class_registry()
void notify_done_and_tran_end(int tran_index)
#define TRAN_LOCK_INFINITE_WAIT
Definition: log_comm.h:29
cubload::driver * m_loaddb_driver
const class_id FIRST_CLASS_ID
Definition: load_common.hpp:44
#define NULL
Definition: freelistheap.h:34
const load_args & get_args()
std::atomic< size_t > m_active_task_count
virtual std::size_t get_rows_number()=0
int64_t last_committed_line
int logtb_assign_tran_index(THREAD_ENTRY *thread_p, TRANID trid, TRAN_STATE state, const BOOT_CLIENT_CREDENTIAL *client_credential, TRAN_STATE *current_state, int wait_msecs, TRAN_ISOLATION isolation)
int64_t batch_id
Definition: load_common.hpp:39
CLIENTIDS client
Definition: log_impl.h:484
void worker_manager_unregister_session(session &load_session)
const std::string & get_content() const
batch_id m_last_batch_id
void fetch_status(load_status &status, bool has_lock=false)
int load_batch(cubthread::entry &thread_ref, const batch *batch, bool use_temp_batch, bool &is_batch_accepted, load_status &status)
int64_t get_line_offset() const
void logtb_free_tran_index(THREAD_ENTRY *thread_p, int tran_index)
bool er_has_error(void)
void register_tran_start(int tran_index)
void on_failure_with_line(MSGCAT_LOADDB_MSG msg_id, Args &&...args)
void init_driver(driver *driver, session &session)
int er_errid_if_has_error(void)
int intl_identifier_lower_string_size(const char *src)
void update_atomic_value_with_max(std::atomic< T > &atomic_val, T new_max)
int xstats_update_statistics(THREAD_ENTRY *thread_p, OID *classoid, bool with_fullscan)
class_installer & get_class_installer()
Definition: load_driver.cpp:96
object_loader & get_object_loader()
void er_clear(void)
std::vector< stats > m_collected_stats
void stats_update_last_committed_line(int64_t last_committed_line)
void initialize(class_installer *cls_installer, object_loader *obj_loader, error_handler *error_handler)
Definition: load_driver.cpp:65
class_id get_class_id() const
int64_t rows_committed
void fail(bool has_lock=false)
cubthread::entry_task * m_temp_task
bool is_ignored() const
void stats_update_current_line(int64_t current_line)
entry & get_entry(void)
#define STATS_WITH_SAMPLING
Definition: statistics.h:35
void worker_manager_register_session(session &load_session)
bool worker_manager_try_task(cubthread::entry_task *task)
void set_ids(db_client_type type, const char *client_info, const char *db_user, const char *program_name, const char *login_name, const char *host_name, int process_id)
batch_id get_id() const
void wait_for_previous_batch(batch_id id)
const batch & m_batch
const OID & get_class_oid() const
virtual void set_class_id(class_id clsid)=0
error_handler & get_error_handler()
void on_error_with_line(MSGCAT_LOADDB_MSG msg_id, Args &&...args)
session(load_args &args)
void get_all_class_entries(std::vector< const class_entry * > &entries) const
cubload::load_status load_status
void on_error(MSGCAT_LOADDB_MSG msg_id, Args &&...args)