File load_session.hpp¶
File List > cubrid > src > loaddb > load_session.hpp
Go to the documentation of this file
/*
* Copyright 2008 Search Solution Corporation
* Copyright 2016 CUBRID Corporation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
/*
* load_session.hpp - entry point for server side loaddb
*/
#ifndef _LOAD_SESSION_HPP_
#define _LOAD_SESSION_HPP_
#include "dbtype_def.h"
#include "load_class_registry.hpp"
#include "load_common.hpp"
#include "load_error_handler.hpp"
#include "thread_entry_task.hpp"
#include "utility.h"
#include <atomic>
#include <condition_variable>
#include <memory>
#include <mutex>
#include <set>
#include <string>
#include <vector>
namespace cubload
{
class driver;
/*
* cubload::session
*
* description
* This class serves as an entry point to server side loaddb functionality.
* It has two main public function:
* * load_file : for parsing a loaddb object file directly on server if the file exists on the server machine
* * load_batch: when loaddb object file exists only on client machine, then client must send over
* the network batches from file and then these batches will be parsed by the server
*
* The file is split into batches or batches are received over the network, then each batch is delegated to a
* worker thread from a internal worker pool. The worker thread does the scanning/parsing and inserting of the data
* Loaddb session is attached to database client session in session_state struct
*
* how to use
* cubload::session *session = NULL;
* session_get_loaddb_context (thread_p, session);
*
* session.load_file (*thread_p);
*
* or
*
* cubload::batch batch = "<batch>"; // get batch from client
* session.load_batch (*thread_p, batch);
*/
class session
{
public:
explicit session (load_args &args);
~session ();
session (session &&other) = delete; // Move c-tor: deleted
session (const session ©) = delete; // Copy c-tor: deleted
session &operator= (session &&other) = delete; // Move operator: deleted
session &operator= (const session ©) = delete; // Copy operator: deleted
/*
* Check and install a class from object file on the the server
*
* return: NO_ERROR in case of success or a error code in case of failure.
* thread_ref(in): thread entry
* batch(in) : a batch where content is a line starting with '%id' or '%class' from object file
*/
int install_class (cubthread::entry &thread_ref, const batch &batch, bool &is_ignored, std::string &cls_name);
/*
* Load a batch from object file on the the server
*
* return: NO_ERROR in case of success or a error code in case of failure.
* thread_ref(in): thread entry
* batch(in) : a batch from loaddb object
*/
int load_batch (cubthread::entry &thread_ref, const batch *batch, bool use_temp_batch, bool &is_batch_accepted,
load_status &status);
void wait_for_completion ();
void wait_for_previous_batch (batch_id id);
void notify_batch_done (batch_id id);
void notify_batch_done_and_register_tran_end (batch_id id, int tran_index);
void register_tran_start (int tran_index);
void on_error (std::string &err_msg);
void fail (bool has_lock = false);
bool is_failed ();
void interrupt ();
void fetch_status (load_status &status, bool has_lock = false);
void stats_update_rows_committed (int64_t rows_committed);
int64_t stats_get_rows_committed ();
void stats_update_last_committed_line (int64_t last_committed_line);
void stats_update_current_line (int64_t current_line);
const load_args &get_args ();
class_registry &get_class_registry ();
void set_client_type (int client_type);
int get_client_type ();
template<typename... Args>
void append_log_msg (MSGCAT_LOADDB_MSG msg_id, Args &&... args);
private:
void notify_waiting_threads ();
bool is_completed ();
void collect_stats ();
template<typename T>
void update_atomic_value_with_max (std::atomic<T> &atomic_val, T new_max);
std::mutex m_mutex;
std::condition_variable m_cond_var;
std::set<int> m_tran_indexes;
load_args m_args;
batch_id m_last_batch_id;
std::atomic<batch_id> m_max_batch_id;
std::atomic<size_t> m_active_task_count; // note: all decrements need to be protected by mutex
class_registry m_class_registry;
std::atomic<int> m_load_client_type; /* ADMIN_LOADDB_COMPAT_UNDER_11_2 or ADMIN_LOADDB_COMPAT_UNDER_11_4 */
stats m_stats; // load db stats
bool m_is_failed;
std::vector<stats> m_collected_stats;
driver *m_driver;
cubthread::entry_task *m_temp_task;
};
} // namespace cubload
// alias declaration for legacy C files
using load_session = cubload::session;
namespace cubload
{
// Template implementation
template<typename... Args>
void
session::append_log_msg (MSGCAT_LOADDB_MSG msg_id, Args &&... args)
{
if (get_args ().verbose)
{
std::string log_msg = error_handler::format_log_msg (msg_id, std::forward<Args> (args)...);
std::unique_lock<std::mutex> ulock (m_mutex);
m_stats.log_message.append (log_msg);
collect_stats ();
ulock.unlock ();
notify_waiting_threads ();
}
}
}
#endif /* _LOAD_SESSION_HPP_ */