File load_session.cpp¶
File List > cubrid > src > loaddb > load_session.cpp
Go to the documentation of this file
/*
* Copyright 2008 Search Solution Corporation
* Copyright 2016 CUBRID Corporation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
/*
* load_session.cpp - entry point for server side loaddb
*/
#include "load_session.hpp"
#include "load_driver.hpp"
#include "load_server_loader.hpp"
#include "load_worker_manager.hpp"
#include "resource_shared_pool.hpp"
#include "xserver_interface.h"
#include <sstream>
// XXX: SHOULD BE THE LAST INCLUDE HEADER
#include "memory_wrapper.hpp"
namespace cubload
{
void init_driver (driver *driver, session &session);
bool invoke_parser (driver *driver, const batch &batch_);
}
namespace cubload
{
void
init_driver (driver *driver, session &session)
{
if (driver == NULL)
{
session.fail ();
assert (false);
return;
}
// avoid driver being initialized twice
if (driver->is_initialized ())
{
return;
}
error_handler *error_handler_ = new error_handler (session);
class_installer *cls_installer = new server_class_installer (session, *error_handler_);
object_loader *obj_loader = new server_object_loader (session, *error_handler_);
driver->initialize (cls_installer, obj_loader, error_handler_);
}
bool
invoke_parser (driver *driver, const batch &batch_)
{
if (driver == NULL || !driver->is_initialized ())
{
return false;
}
driver->get_object_loader ().init (batch_.get_class_id ());
driver->get_class_installer ().set_class_id (batch_.get_class_id ());
// parse doc says that 0 is returned if parsing succeeds
std::istringstream iss (batch_.get_content ());
int parser_result = driver->parse (iss, batch_.get_line_offset ());
driver->get_object_loader ().destroy ();
return parser_result == 0;
}
/*
* cubload::load_worker
* extends cubthread::entry_task
*
* description
* Loaddb worker thread task, which does parsing and inserting of data rows within a transaction
*/
class load_task : public cubthread::entry_task
{
public:
load_task () = delete; // Default c-tor: deleted.
~load_task () override
{
if (!m_was_session_notified)
{
notify_done ();
}
delete &m_batch;
}
load_task (const batch &batch, session &session, css_conn_entry &conn_entry)
: m_batch (batch)
, m_session (session)
, m_conn_entry (conn_entry)
, m_was_session_notified (false)
{
//
}
void execute (cubthread::entry &thread_ref) final
{
if (m_session.is_failed ())
{
return;
}
thread_ref.conn_entry = &m_conn_entry;
driver *driver = thread_ref.m_loaddb_driver;
assert (driver != NULL &&!driver->is_initialized ());
init_driver (driver, m_session);
bool is_syntax_check_only = m_session.get_args ().syntax_check;
const class_entry *cls_entry = m_session.get_class_registry ().get_class_entry (m_batch.get_class_id ());
if (cls_entry == NULL)
{
if (!is_syntax_check_only)
{
driver->get_error_handler ().on_failure_with_line (LOADDB_MSG_TABLE_IS_MISSING);
}
else
{
driver->get_error_handler ().on_error_with_line (LOADDB_MSG_TABLE_IS_MISSING);
}
driver->clear ();
notify_done ();
return;
}
logtb_assign_tran_index (&thread_ref, NULL_TRANID, TRAN_ACTIVE, NULL, NULL, TRAN_LOCK_INFINITE_WAIT,
TRAN_DEFAULT_ISOLATION_LEVEL ());
int tran_index = thread_ref.tran_index;
m_session.register_tran_start (tran_index);
// Get the clientids from the session and set it on the current worker.
LOG_TDES *session_tdes = log_Gl.trantable.all_tdes[m_conn_entry.get_tran_index ()];
LOG_TDES *worker_tdes = log_Gl.trantable.all_tdes[tran_index];
worker_tdes->client.set_ids (session_tdes->client);
bool parser_result = invoke_parser (driver, m_batch);
// Get the class name.
std::string class_name = cls_entry->get_class_name ();
// We need this to update the stats.
int line_no = driver->get_scanner ().lineno ();
// Get the inserted lines
std::size_t rows_number = driver->get_object_loader ().get_rows_number ();
// We don't need anything from the driver anymore.
driver->clear ();
if (m_session.is_failed () || (!is_syntax_check_only && (!parser_result || er_has_error ())))
{
// if a batch transaction was aborted and syntax only is not enabled then abort entire loaddb session
m_session.fail ();
xtran_server_abort (&thread_ref);
}
else
{
// order batch commits, therefore wait until previous batch is committed
m_session.wait_for_previous_batch (m_batch.get_id ());
xtran_server_commit (&thread_ref, false);
// update load statistics after commit
m_session.stats_update_rows_committed (rows_number);
m_session.stats_update_last_committed_line (line_no + 1);
MSGCAT_LOADDB_MSG msg_type;
if (m_session.get_args ().syntax_check)
{
msg_type = LOADDB_MSG_INSTANCE_COUNT;
}
else
{
msg_type = LOADDB_MSG_COMMITTED_INSTANCES;
}
m_session.append_log_msg (msg_type, class_name.c_str (), rows_number);
}
// Clear the clientids.
worker_tdes->client.reset ();
// notify session that batch is done
notify_done_and_tran_end (tran_index);
}
private:
void notify_done ()
{
assert (!m_was_session_notified);
m_session.notify_batch_done (m_batch.get_id ());
m_was_session_notified = true;
}
void notify_done_and_tran_end (int tran_index)
{
assert (!m_was_session_notified);
m_session.notify_batch_done_and_register_tran_end (m_batch.get_id (), tran_index);
m_was_session_notified = true;
}
const batch &m_batch;
session &m_session;
css_conn_entry &m_conn_entry;
bool m_was_session_notified;
};
session::session (load_args &args)
: m_mutex ()
, m_cond_var ()
, m_tran_indexes ()
, m_args (args)
, m_last_batch_id {NULL_BATCH_ID}
, m_max_batch_id {NULL_BATCH_ID}
, m_active_task_count {0}
, m_class_registry ()
, m_load_client_type (DB_CLIENT_TYPE_LOADDB_UTILITY)
, m_stats ()
, m_is_failed (false)
, m_collected_stats ()
, m_driver (NULL)
, m_temp_task (NULL)
{
worker_manager_register_session (*this);
m_driver = new driver ();
init_driver (m_driver, *this);
if (!m_args.table_name.empty ())
{
// just set class id to 1 since only one table can be specified as command line argument
cubthread::entry &thread_ref = cubthread::get_entry ();
{
const char *dot = NULL;
const char *class_name = NULL;
int len = 0;
class_name = m_args.table_name.c_str ();
len = STATIC_CAST (int, strlen (class_name));
dot = strchr (class_name, '.');
if (dot)
{
/* user specified name */
/* user name of user specified name */
len = STATIC_CAST (int, dot - class_name);
if (len >= DB_MAX_USER_LENGTH)
{
m_driver->get_error_handler ().on_error (LOADDB_MSG_EXCEED_MAX_USER_LEN, DB_MAX_USER_LENGTH - 1);
return;
}
/* class name of user specified name */
len = STATIC_CAST (int, strlen (dot + 1));
}
if (len >= DB_MAX_IDENTIFIER_LENGTH - DB_MAX_USER_LENGTH)
{
m_driver->get_error_handler ().on_error (LOADDB_MSG_EXCEED_MAX_LEN, DB_MAX_IDENTIFIER_LENGTH - DB_MAX_USER_LENGTH - 1);
return;
}
}
thread_ref.m_loaddb_driver = m_driver;
m_driver->get_class_installer ().set_class_id (FIRST_CLASS_ID);
m_driver->get_class_installer ().install_class (m_args.table_name.c_str ());
thread_ref.m_loaddb_driver = NULL;
}
}
session::~session ()
{
delete m_driver;
worker_manager_unregister_session (*this);
}
bool
session::is_completed ()
{
return m_last_batch_id == m_max_batch_id;
}
void
session::wait_for_previous_batch (const batch_id id)
{
auto pred = [this, &id] () -> bool { return is_failed () || id == (m_last_batch_id + 1); };
if (id == FIRST_BATCH_ID || pred ())
{
return;
}
std::unique_lock<std::mutex> ulock (m_mutex);
m_cond_var.wait (ulock, pred);
}
void
session::wait_for_completion ()
{
auto pred = [this] () -> bool
{
// condition of finish and no active tasks
return (is_failed () || is_completed ()) && (m_active_task_count == 0);
};
if (pred ())
{
return;
}
std::unique_lock<std::mutex> ulock (m_mutex);
m_cond_var.wait (ulock, pred);
}
void
session::notify_batch_done (batch_id id)
{
std::unique_lock<std::mutex> ulock (m_mutex);
assert (m_active_task_count > 0);
--m_active_task_count;
if (!is_failed ())
{
assert (m_last_batch_id == id - 1);
m_last_batch_id = id;
}
ulock.unlock ();
notify_waiting_threads ();
er_clear ();
}
void
session::notify_batch_done_and_register_tran_end (batch_id id, int tran_index)
{
std::unique_lock<std::mutex> ulock (m_mutex);
// free transaction index
logtb_free_tran_index (&cubthread::get_entry (), tran_index);
assert (m_active_task_count > 0);
--m_active_task_count;
if (!is_failed ())
{
assert (m_last_batch_id == id - 1);
m_last_batch_id = id;
}
if (m_tran_indexes.erase (tran_index) != 1)
{
assert (false);
}
collect_stats ();
ulock.unlock ();
notify_waiting_threads ();
er_clear ();
}
void
session::register_tran_start (int tran_index)
{
std::unique_lock<std::mutex> ulock (m_mutex);
auto ret = m_tran_indexes.insert (tran_index);
assert (ret.second); // it means it was inserted
}
void
session::on_error (std::string &err_msg)
{
std::unique_lock<std::mutex> ulock (m_mutex);
m_stats.rows_failed++;
m_stats.error_message.append (err_msg);
collect_stats ();
ulock.unlock ();
notify_waiting_threads ();
}
void
session::fail (bool has_lock)
{
std::unique_lock<std::mutex> ulock (m_mutex, std::defer_lock);
if (!has_lock)
{
ulock.lock ();
}
// check if failed after lock was acquired
if (m_is_failed)
{
return;
}
m_is_failed = true;
if (!has_lock)
{
ulock.unlock ();
// notify waiting threads that session was aborted
notify_waiting_threads ();
}
else
{
// caller should manage notifications too
}
}
bool
session::is_failed ()
{
return m_is_failed;
}
void
session::interrupt ()
{
cubthread::entry *thread_p = &cubthread::get_entry ();
std::unique_lock<std::mutex> ulock (m_mutex);
for (auto &it : m_tran_indexes)
{
(void) logtb_set_tran_index_interrupt (thread_p, it, true);
}
fail (true);
ulock.unlock ();
notify_waiting_threads ();
}
void
session::stats_update_rows_committed (int64_t rows_committed)
{
std::unique_lock<std::mutex> ulock (m_mutex);
m_stats.rows_committed += rows_committed;
}
int64_t
session::stats_get_rows_committed ()
{
return m_stats.rows_committed;
}
void
session::stats_update_last_committed_line (int64_t last_committed_line)
{
if (last_committed_line <= m_stats.last_committed_line)
{
return;
}
std::unique_lock<std::mutex> ulock (m_mutex);
// check if again after lock was acquired
if (last_committed_line <= m_stats.last_committed_line)
{
return;
}
m_stats.last_committed_line = last_committed_line;
}
void
session::stats_update_current_line (int64_t current_line)
{
update_atomic_value_with_max (m_stats.current_line, current_line);
}
template<typename T>
void
session::update_atomic_value_with_max (std::atomic<T> &atomic_val, T new_max)
{
int64_t curr_max;
do
{
curr_max = atomic_val.load ();
if (curr_max >= new_max)
{
// max is already stored
break;
}
}
while (!atomic_val.compare_exchange_strong (curr_max, new_max));
}
class_registry &
session::get_class_registry ()
{
return m_class_registry;
}
const load_args &
session::get_args ()
{
return m_args;
}
int
session::get_client_type ()
{
return m_load_client_type.load ();
}
void
session::set_client_type (int client_type)
{
m_load_client_type.store (client_type);
}
void
session::notify_waiting_threads ()
{
m_cond_var.notify_all ();
}
int
session::install_class (cubthread::entry &thread_ref, const batch &batch, bool &is_ignored, std::string &cls_name)
{
thread_ref.m_loaddb_driver = m_driver;
int error_code = NO_ERROR;
bool parser_result = invoke_parser (m_driver, batch);
const class_entry *cls_entry = get_class_registry ().get_class_entry (batch.get_class_id ());
if (cls_entry != NULL)
{
is_ignored = cls_entry->is_ignored ();
cls_name = cls_entry->get_class_name ();
}
else
{
is_ignored = false;
}
if (is_ignored)
{
thread_ref.m_loaddb_driver = NULL;
return NO_ERROR;
}
if (is_failed () || !parser_result || er_has_error ())
{
fail ();
error_code = er_errid_if_has_error ();
if (error_code == NO_ERROR)
{
error_code = ER_FAILED;
}
}
thread_ref.m_loaddb_driver = NULL;
return error_code;
}
int
session::load_batch (cubthread::entry &thread_ref, const batch *batch, bool use_temp_batch, bool &is_batch_accepted,
load_status &status)
{
if (is_failed ())
{
return ER_FAILED;
}
if (batch != NULL && batch->get_content ().empty ())
{
assert (false);
return ER_FAILED;
}
cubthread::entry_task *task = NULL;
if (use_temp_batch)
{
assert (m_temp_task != NULL && batch == NULL);
task = m_temp_task;
}
else
{
assert (m_temp_task == NULL && batch != NULL);
update_atomic_value_with_max (m_max_batch_id, batch->get_id ());
task = new load_task (*batch, *this, *thread_ref.conn_entry);
}
std::unique_lock<std::mutex> ulock (m_mutex);
auto pred = [&] () -> bool
{
is_batch_accepted = worker_manager_try_task (task);
if (is_batch_accepted)
{
++m_active_task_count;
if (use_temp_batch)
{
m_temp_task = NULL;
}
}
else if (!use_temp_batch)
{
m_temp_task = task;
use_temp_batch = true;
}
return !m_collected_stats.empty () || is_batch_accepted;
};
// if worker pool is full, but all jobs belong to other sessions, nobody will notify me when a job is finished.
// loop & use timed waits instead of infinite wait
while (true)
{
const std::chrono::milliseconds WAIT_MS { 10 }; // wakeup every 10 milliseconds
if (m_cond_var.wait_for (ulock, WAIT_MS, pred))
{
break;
}
// go back to waiting
}
fetch_status (status, true);
return NO_ERROR;
}
void
session::collect_stats ()
{
m_collected_stats.emplace_back (m_stats);
// since client periodically fetches the stats, clear error_message in order not to send twice same message
// However, for syntax checking we do not clear the messages since we throw the errors at the end
if (!m_args.syntax_check)
{
m_stats.error_message.clear ();
}
m_stats.log_message.clear ();
}
void
session::fetch_status (load_status &status, bool has_lock)
{
std::unique_lock<std::mutex> ulock (m_mutex, std::defer_lock);
if (!has_lock)
{
ulock.lock ();
}
std::vector<stats> stats_;
if (!m_collected_stats.empty ())
{
stats_ = std::move (m_collected_stats);
assert (!stats_.empty ());
assert (m_collected_stats.empty ());
}
status = load_status (get_client_type (), is_completed (), is_failed (), stats_);
if (!has_lock)
{
ulock.unlock ();
}
}
} // namespace cubload