CUBRID Engine  latest
load_session.hpp
Go to the documentation of this file.
1 /*
2  * Copyright 2008 Search Solution Corporation
3  * Copyright 2016 CUBRID Corporation
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 /*
20  * load_session.hpp - entry point for server side loaddb
21  */
22 
23 #ifndef _LOAD_SESSION_HPP_
24 #define _LOAD_SESSION_HPP_
25 
26 #include "dbtype_def.h"
27 #include "load_class_registry.hpp"
28 #include "load_common.hpp"
29 #include "load_error_handler.hpp"
30 #include "thread_entry_task.hpp"
31 #include "utility.h"
32 
33 #include <atomic>
34 #include <condition_variable>
35 #include <memory>
36 #include <mutex>
37 #include <set>
38 #include <string>
39 #include <vector>
40 
41 namespace cubload
42 {
43 
44  class driver;
45 
46  /*
47  * cubload::session
48  *
49  * description
50  * This class serves as an entry point to server side loaddb functionality.
51  * It has two main public functions:
52  * * load_file : for parsing a loaddb object file directly on server if the file exists on the server machine
53  * * load_batch: when loaddb object file exists only on client machine, then client must send over
54  * the network batches from file and then these batches will be parsed by the server
55  *
56  * The file is split into batches or batches are received over the network, then each batch is delegated to a
57  * worker thread from an internal worker pool. The worker thread does the scanning/parsing and inserting of the data.
58  * Loaddb session is attached to database client session in session_state struct
59  *
60  * how to use
61  * cubload::session *session = NULL;
62  * session_get_loaddb_context (thread_p, session);
63  *
64  * session->load_file (*thread_p);
65  *
66  * or
67  *
68  * cubload::batch batch = "<batch>"; // get batch from client
69  * session->load_batch (*thread_p, batch);
70  */
71  class session
72  {
73  public:
74  explicit session (load_args &args);
75  ~session ();
76 
77  session (session &&other) = delete; // Move c-tor: deleted
78  session (const session &copy) = delete; // Copy c-tor: deleted
79 
80  session &operator= (session &&other) = delete; // Move operator: deleted
81  session &operator= (const session &copy) = delete; // Copy operator: deleted
82 
83  /*
84  * Check and install a class from object file on the server
85  *
86  * return: NO_ERROR in case of success or an error code in case of failure.
87  * thread_ref(in): thread entry
88  * batch(in) : a batch where content is a line starting with '%id' or '%class' from object file
    * is_ignored, cls_name: not described in this header; both are non-const references,
    *                       presumably outputs - TODO confirm against the implementation
89  */
90  int install_class (cubthread::entry &thread_ref, const batch &batch, bool &is_ignored, std::string &cls_name);
91 
92  /*
93  * Load a batch from object file on the server
94  *
95  * return: NO_ERROR in case of success or an error code in case of failure.
96  * thread_ref(in): thread entry
97  * batch(in) : a batch from loaddb object (passed by pointer)
    * use_temp_batch, is_batch_accepted, status: not described in this header; is_batch_accepted and
    *                       status are non-const references, presumably outputs - TODO confirm
98  */
99  int load_batch (cubthread::entry &thread_ref, const batch *batch, bool use_temp_batch, bool &is_batch_accepted,
100  load_status &status);
101 
102  void wait_for_completion ();
104  void notify_batch_done (batch_id id);
105  void notify_batch_done_and_register_tran_end (batch_id id, int tran_index);
106  void register_tran_start (int tran_index);
107 
108  void on_error (std::string &err_msg);
109 
110  void fail (bool has_lock = false); // has_lock defaults to false; presumably tells whether the caller already holds the session mutex - TODO confirm
111  bool is_failed ();
112  void interrupt ();
113 
114  void fetch_status (load_status &status, bool has_lock = false); // same has_lock convention as fail () - TODO confirm
115 
116  void stats_update_rows_committed (int64_t rows_committed);
117  int64_t stats_get_rows_committed ();
118 
119  void stats_update_last_committed_line (int64_t last_committed_line);
120  void stats_update_current_line (int64_t current_line);
121 
122  void update_class_statistics (cubthread::entry &thread_ref);
123  const load_args &get_args ();
124 
126 
127  template<typename... Args>
128  void append_log_msg (MSGCAT_LOADDB_MSG msg_id, Args &&... args); // template; its definition follows the class in this header
129 
130  private:
131  void notify_waiting_threads ();
132  bool is_completed ();
133  void collect_stats ();
134 
135  template<typename T>
136  void update_atomic_value_with_max (std::atomic<T> &atomic_val, T new_max);
137 
139  std::condition_variable m_cond_var;
140  std::set<int> m_tran_indexes;
141 
144  std::atomic<batch_id> m_max_batch_id;
145  std::atomic<size_t> m_active_task_count; // note: all decrements need to be protected by mutex
146 
148 
149  stats m_stats; // load db stats
151  std::vector<stats> m_collected_stats;
152 
154  // NOTE(review): append_log_msg locks m_mutex, but no m_mutex declaration is visible in this listing; the embedded line numbers skip several entries, so some declarations appear to be missing from this extract.
156  };
157 
158 } // namespace cubload
159 
160 // alias declaration for legacy C files
162 
163 namespace cubload
164 {
165  // Template implementation: when the verbose load argument is set, format a loaddb message and append it to m_stats.log_message.
166  template<typename... Args>
167  void
168  session::append_log_msg (MSGCAT_LOADDB_MSG msg_id, Args &&... args)
169  {
170  if (get_args ().verbose) // nothing is formatted or stored unless verbose is set
171  {
172  std::string log_msg = error_handler::format_log_msg (msg_id, std::forward<Args> (args)...); // formatted before the lock is taken
173 
174  std::unique_lock<std::mutex> ulock (m_mutex); // m_mutex is held for the append and the collect_stats () call below
175 
176  m_stats.log_message.append (log_msg);
177 
178  collect_stats ();
179  ulock.unlock (); // explicit unlock before the end of the branch; NOTE(review): embedded listing line 180 is absent from this extract
181  }
182  }
183 }
184 
185 #endif /* _LOAD_SESSION_HPP_ */
void on_error(std::string &err_msg)
void notify_batch_done_and_register_tran_end(batch_id id, int tran_index)
void update_class_statistics(cubthread::entry &thread_ref)
std::mutex m_mutex
static API_MUTEX mutex
Definition: api_util.c:72
void notify_waiting_threads()
int install_class(cubthread::entry &thread_ref, const batch &batch, bool &is_ignored, std::string &cls_name)
void stats_update_rows_committed(int64_t rows_committed)
int64_t stats_get_rows_committed()
class_registry m_class_registry
static std::string format_log_msg(MSGCAT_LOADDB_MSG msg_id, Args &&...args)
std::atomic< batch_id > m_max_batch_id
MSGCAT_LOADDB_MSG
Definition: utility.h:438
std::set< int > m_tran_indexes
void notify_batch_done(batch_id id)
void wait_for_completion()
void append_log_msg(MSGCAT_LOADDB_MSG msg_id, Args &&...args)
std::string log_message
std::condition_variable m_cond_var
class_registry & get_class_registry()
const load_args & get_args()
std::atomic< size_t > m_active_task_count
session & operator=(session &&other)=delete
int64_t batch_id
Definition: load_common.hpp:39
batch_id m_last_batch_id
void fetch_status(load_status &status, bool has_lock=false)
int load_batch(cubthread::entry &thread_ref, const batch *batch, bool use_temp_batch, bool &is_batch_accepted, load_status &status)
void register_tran_start(int tran_index)
void update_atomic_value_with_max(std::atomic< T > &atomic_val, T new_max)
std::vector< stats > m_collected_stats
void stats_update_last_committed_line(int64_t last_committed_line)
void fail(bool has_lock=false)
cubthread::entry_task * m_temp_task
void stats_update_current_line(int64_t current_line)
void wait_for_previous_batch(batch_id id)
session(load_args &args)