/*
* Copyright 2008 Search Solution Corporation
* Copyright 2016 CUBRID Corporation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
#include "log_recovery_redo_parallel.hpp"
#include "log_manager.h"
#include "vpid.hpp"
// XXX: SHOULD BE THE LAST INCLUDE HEADER
#include "memory_wrapper.hpp"
namespace cublog
{
/*********************************************************************
* redo_parallel::task_active_state_bookkeeping - definition
*********************************************************************/
redo_parallel::task_active_state_bookkeeping::task_active_state_bookkeeping (std::size_t a_size)
: m_size { a_size }
{
assert (a_size < BITSET_MAX_SIZE);
}
inline void
redo_parallel::task_active_state_bookkeeping::set_active (std::size_t a_index)
{
assert (a_index < m_size);
std::lock_guard<std::mutex> lockg { m_values_mtx };
assert (!m_values.test (a_index));
m_values.set (a_index);
}
inline bool
redo_parallel::task_active_state_bookkeeping::is_active (std::size_t a_index) const
{
assert (a_index < m_size);
std::lock_guard<std::mutex> lockg { m_values_mtx };
return m_values.test (a_index);
}
inline void
redo_parallel::task_active_state_bookkeeping::set_inactive (std::size_t a_index)
{
assert (a_index < m_size);
{
std::lock_guard<std::mutex> lockg { m_values_mtx };
assert (m_values.test (a_index));
m_values.reset (a_index);
}
// since active/inactive state is kept per entry, notification has to be per entry as well
m_values_cv.notify_one ();
}
bool
redo_parallel::task_active_state_bookkeeping::is_any_active () const
{
std::lock_guard<std::mutex> lockg { m_values_mtx };
return m_values.any ();
}
inline void
redo_parallel::task_active_state_bookkeeping::wait_for_termination ()
{
std::unique_lock<std::mutex> ulock (m_values_mtx);
m_values_cv.wait (ulock, [this] ()
{
return m_values.none ();
});
}
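// a minimal lifecycle sketch of this bookkeeping, as wired up further below in this file
// (set_active in the redo_task constructor, set_inactive at the end of redo_task::execute,
// wait_for_termination from the owning redo_parallel); 'task_count' and 'task_idx' are
// illustrative:
//
//   task_active_state_bookkeeping bookkeeping { task_count };
//   bookkeeping.set_active (task_idx);       // before the task is handed to the worker pool
//   // ... task loops and executes jobs ...
//   bookkeeping.set_inactive (task_idx);     // last action in the task's execute function
//   bookkeeping.wait_for_termination ();     // owner blocks until every index is inactive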
/*********************************************************************
* redo_parallel::redo_task - declaration
*********************************************************************/
/* a long-running task that loops and processes redo log jobs;
* offers 'support' instances to the log jobs passing through: a log reader,
* unzip support memory for undo data, and unzip support memory for redo data;
* internal implementation detail; not to be used externally
*/
class redo_parallel::redo_task final : public cubthread::task<cubthread::entry>
{
private:
using redo_job_vector_t = std::vector<redo_parallel::redo_job_base *>;
public:
redo_task () = delete;
redo_task (std::size_t a_task_idx, bool a_do_monitor_unapplied_log_lsa,
redo_parallel::task_active_state_bookkeeping &task_state_bookkeeping,
const log_rv_redo_context &copy_context);
redo_task (const redo_task &) = delete;
redo_task (redo_task &&) = delete;
~redo_task () override;
redo_task &operator = (const redo_task &) = delete;
redo_task &operator = (redo_task &&) = delete;
void execute (context_type &context) override;
void retire () override;
void log_perf_stats () const;
void accumulate_perf_stats (cubperf::stat_value *a_output_stats, std::size_t a_output_stats_size) const;
bool is_idle () const;
inline void push_job (redo_parallel::redo_job_base *a_job);
inline void notify_adding_finished ();
inline log_lsa get_unapplied_log_lsa () const;
private:
inline void pop_jobs (redo_job_vector_t &a_out_job_vec, bool &a_out_adding_finished);
inline void set_unapplied_log_lsa_from_push (const log_lsa &a_log_lsa,
const std::lock_guard<std::mutex> &);
inline void set_unapplied_log_lsa_from_pop_func (const log_lsa &a_log_lsa,
std::unique_lock<std::mutex> &a_ulock);
inline void set_unapplied_log_lsa_from_execute_func (const log_lsa &a_log_lsa);
private:
const std::size_t m_task_idx;
const bool m_do_monitor_unapplied_log_lsa;
redo_parallel::task_active_state_bookkeeping &m_task_state_bookkeeping;
cubperf::statset_definition m_perf_stats_definition;
perf_stats m_perf_stats;
log_rv_redo_context m_redo_context;
redo_job_vector_t m_produce_vec;
mutable std::mutex m_produce_vec_mtx;
std::condition_variable m_produce_vec_cv;
bool m_adding_finished;
/* minimum still-to-be-applied (not-applied) log_lsa for a single log applying task
* scenarios:
* - a job is pushed for processing to the internal waiting queue:
* - internal waiting queue and processing queue are still empty:
* - old: MAX_LSA
* - new: pushed job's log_lsa
* - either the internal waiting queue, or the processing queue, or both are non-empty:
* - old: the minimum log_lsa is smaller than that of the currently pushed job since
* jobs' log_lsa's are in ever-increasing order
* - task picks up existing jobs from the internal waiting queue:
* - if the processing queue was previously non-empty:
* - minimum log_lsa from the previous update from the processing queue
* - if the processing queue was previously empty:
* - if the waiting queue is empty:
* - MAX_LSA
* - if the waiting queue is non-empty:
* - minimum log_lsa from the waiting queue
* - task has just finished processing a job:
* - there are still jobs to process in the processing queue:
* - minimum log_lsa from the processing queue (ie: the log_lsa of the next job, since
* log_lsa's are inserted in ever-increasing order)
* - the processing queue is empty:
* - no change; the minimum log_lsa will be advanced upon a subsequent request to pop
* jobs from the internal waiting queue (see corresponding step)
*
* - a job is pushed for processing to the internal waiting queue:
* - if the current minimum log_lsa is MAX_LSA, the newly added job's log_lsa
* becomes the new minimum log_lsa
* - if the current minimum log_lsa is not MAX_LSA, assert that the newly added
* job has a log_lsa greater than the existing minimum log_lsa
*/
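// note: the member below is atomic because it is read by the monitoring thread (via
// get_unapplied_log_lsa) concurrently with the updates done on the push side (under
// m_produce_vec_mtx) and by the executing task itself (without the mutex)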
std::atomic<log_lsa> m_unapplied_log_lsa;
};
/*********************************************************************
* redo_parallel::redo_task - definition
*********************************************************************/
redo_parallel::redo_task::redo_task (std::size_t a_task_idx, bool a_do_monitor_unapplied_log_lsa,
redo_parallel::task_active_state_bookkeeping &task_state_bookkeeping,
const log_rv_redo_context &copy_context)
: m_task_idx { a_task_idx }
, m_do_monitor_unapplied_log_lsa { a_do_monitor_unapplied_log_lsa }
, m_task_state_bookkeeping (task_state_bookkeeping)
, m_perf_stats_definition { perf_stats_async_definition_init_list }
, m_perf_stats { perf_stats_is_active_for_async (), m_perf_stats_definition }
, m_redo_context { copy_context }
, m_adding_finished { false }
, m_unapplied_log_lsa { MAX_LSA }
{
// important to set this here, and not when execution begins, to circumvent
// the race condition where the bookkeeping is already being checked while
// some tasks have not yet started work
m_task_state_bookkeeping.set_active (m_task_idx);
m_produce_vec.reserve (PARALLEL_REDO_JOB_VECTOR_RESERVE_SIZE);
}
redo_parallel::redo_task::~redo_task ()
{
assert (m_adding_finished); // unguarded read
assert (is_idle ());
}
void
redo_parallel::redo_task::execute (context_type &context)
{
redo_job_vector_t jobs_vec;
// according to the spec, a vector's capacity survives clearing of the vector
// which should help to allocate/reserve only once
jobs_vec.reserve (PARALLEL_REDO_JOB_VECTOR_RESERVE_SIZE);
for ( ; ; )
{
bool adding_finished { false };
pop_jobs (jobs_vec, adding_finished);
m_perf_stats.time_and_increment (cublog::PERF_STAT_ID_PARALLEL_POP);
if (jobs_vec.empty () && adding_finished)
{
// do not finish before executing all jobs
break;
}
else
{
assert (!jobs_vec.empty ());
THREAD_ENTRY *const thread_entry = &context;
for (auto &job : jobs_vec)
{
if (m_do_monitor_unapplied_log_lsa)
{
set_unapplied_log_lsa_from_execute_func (job->get_log_lsa ());
}
job->execute (thread_entry, m_redo_context);
m_perf_stats.time_and_increment (cublog::PERF_STAT_ID_PARALLEL_EXECUTE);
job->retire (m_task_idx);
m_perf_stats.time_and_increment (cublog::PERF_STAT_ID_PARALLEL_RETIRE);
}
// pointers still present in the vector are either:
// - already passed on to the reusable job container
// - dangling, as they have deleted themselves
jobs_vec.clear ();
}
}
assert (jobs_vec.empty ());
m_task_state_bookkeeping.set_inactive (m_task_idx);
}
void
redo_parallel::redo_task::retire ()
{
// avoid self destruct, will be deleted by owning class
// NOTE: this is needed to be able to collect post-execution perf stats from the tasks
}
void
redo_parallel::redo_task::log_perf_stats () const
{
std::stringstream ss;
ss << "Log recovery redo worker thread " << m_task_idx << " perf stats";
m_perf_stats.log (ss.str ().c_str ());
}
void
redo_parallel::redo_task::accumulate_perf_stats (
cubperf::stat_value *a_output_stats, std::size_t a_output_stats_size) const
{
m_perf_stats.accumulate (a_output_stats, a_output_stats_size);
}
bool
redo_parallel::redo_task::is_idle () const
{
std::lock_guard<std::mutex> lockg { m_produce_vec_mtx };
return !m_task_state_bookkeeping.is_active (m_task_idx);
}
inline void
redo_parallel::redo_task::push_job (redo_parallel::redo_job_base *a_job)
{
assert (false == m_adding_finished); // unguarded read
static std::size_t minimum_jobs_in_queue = 0;
if (minimum_jobs_in_queue == 0)
{
minimum_jobs_in_queue = (std::size_t) prm_get_integer_value (PRM_ID_RECOVERY_REDO_MINIMUM_JOB_COUNT);
}
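// jobs are handed over in batches: the consuming task is only notified once more than
// 'minimum_jobs_in_queue' jobs have accumulated, which amortizes the cost of condition
// variable notifications; an idle task will still pick up a smaller batch when its
// periodic wait times out (see pop_jobs below)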
bool do_notify = false;
{
std::lock_guard<std::mutex> lockg { m_produce_vec_mtx };
const bool first_job_in_produce_vec = m_produce_vec.empty ();
if (m_do_monitor_unapplied_log_lsa && first_job_in_produce_vec)
{
set_unapplied_log_lsa_from_push (a_job->get_log_lsa (), lockg);
}
m_produce_vec.push_back (a_job);
// read the size under the same lock; the consuming task may concurrently swap the vector out
do_notify = m_produce_vec.size () > minimum_jobs_in_queue;
}
if (do_notify)
{
m_produce_vec_cv.notify_one ();
}
}
inline void
redo_parallel::redo_task::notify_adding_finished ()
{
{
// for a condition variable, checking the predicate and waiting are not performed atomically
// (only unlocking the lock and going to sleep are performed atomically);
// therefore, take the lock to avoid the situation where the notification occurs just after
// the predicate is checked and before the waiter goes to sleep
std::lock_guard<std::mutex> lockg { m_produce_vec_mtx };
m_adding_finished = true;
}
m_produce_vec_cv.notify_one ();
}
inline void
redo_parallel::redo_task::pop_jobs (redo_parallel::redo_task::redo_job_vector_t &a_out_job_vec,
bool &a_out_adding_finished)
{
assert (a_out_job_vec.empty ());
static int period_in_sec = -1;
if (period_in_sec == -1)
{
period_in_sec = prm_get_integer_value (PRM_ID_RECOVERY_REDO_JOB_PERIOD_IN_SECS);
}
a_out_adding_finished = false;
std::unique_lock<std::mutex> ulock { m_produce_vec_mtx };
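// note: wait_for returns the predicate's value; a 'false' return means the timeout expired
// with the predicate still unsatisfied, so the loop simply waits again; the periodic
// wake-up also gives an idle task the chance to reset its unapplied log_lsa to MAX_LSA
// (see the predicate below)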
while (!m_produce_vec_cv.wait_for (ulock, std::chrono::seconds (period_in_sec), [this, &ulock,
&a_out_adding_finished] ()
{
if (m_produce_vec.empty ())
{
if (m_do_monitor_unapplied_log_lsa)
{
set_unapplied_log_lsa_from_pop_func (MAX_LSA, ulock);
}
// the producer having finished adding jobs is also a termination condition
a_out_adding_finished = m_adding_finished;
return a_out_adding_finished;
}
return true;
}));
assert ((!m_produce_vec.empty () && !a_out_adding_finished) ||
(m_produce_vec.empty () && a_out_adding_finished)); // xor
m_produce_vec.swap (a_out_job_vec);
if (m_do_monitor_unapplied_log_lsa)
{
// just before adding finishes, there might be no more jobs
const log_lsa new_log_lsa = a_out_job_vec.empty ()
? MAX_LSA
: (*a_out_job_vec.begin ())->get_log_lsa ();
set_unapplied_log_lsa_from_pop_func (new_log_lsa, ulock);
}
}
inline void
redo_parallel::redo_task::set_unapplied_log_lsa_from_push (const log_lsa &a_log_lsa,
const std::lock_guard<std::mutex> &)
{
assert (m_do_monitor_unapplied_log_lsa);
const log_lsa snapshot_unapplied_log_lsa = m_unapplied_log_lsa.load ();
if (snapshot_unapplied_log_lsa.is_max ())
{
m_unapplied_log_lsa.store (a_log_lsa);
}
else
{
// strict comparison because jobs have ever-increasing log_lsa's
assert (snapshot_unapplied_log_lsa < a_log_lsa);
}
}
inline void
redo_parallel::redo_task::set_unapplied_log_lsa_from_pop_func (const log_lsa &a_log_lsa,
std::unique_lock<std::mutex> &a_ulock)
{
assert (m_do_monitor_unapplied_log_lsa);
assert (a_ulock.owns_lock ());
// either replace a valid value with max - when the task goes into idle mode -
// or replace it with a greater-or-equal value when the task moves log_lsa's to execution
// -or-equal because the push side can fill in a minimum log_lsa while the task is still idle (ie: has
// nothing yet to execute), which is then found and only re-checked on the pop side (first in the pop
// function, and then in the execute function before the first job in the newly popped vector is executed)
assert (m_unapplied_log_lsa.load () <= a_log_lsa);
m_unapplied_log_lsa.store (a_log_lsa);
}
inline void
redo_parallel::redo_task::set_unapplied_log_lsa_from_execute_func (const log_lsa &a_log_lsa)
{
assert (m_do_monitor_unapplied_log_lsa);
const log_lsa snapshot_unapplied_log_lsa = m_unapplied_log_lsa.load ();
// can never be max because it was already set to a valid value in the pop function
assert (!snapshot_unapplied_log_lsa.is_max ());
// -or-equal because the value was set once to a valid log_lsa in the pop function, while the execute
// function will once again attempt to set the value for the first job in the newly popped vector
assert (snapshot_unapplied_log_lsa <= a_log_lsa);
m_unapplied_log_lsa.store (a_log_lsa);
}
inline log_lsa
redo_parallel::redo_task::get_unapplied_log_lsa () const
{
assert (m_do_monitor_unapplied_log_lsa);
return m_unapplied_log_lsa.load ();
}
/*********************************************************************
* redo_parallel::min_unapplied_log_lsa_monitoring - definition
*********************************************************************/
redo_parallel::min_unapplied_log_lsa_monitoring::min_unapplied_log_lsa_monitoring (
bool a_do_monitor, const log_lsa &a_start_main_thread_log_lsa,
const std::vector<std::unique_ptr<redo_task>> &a_redo_task)
: m_do_monitor { a_do_monitor }
, m_main_thread_unapplied_log_lsa { a_start_main_thread_log_lsa }
, m_redo_tasks { a_redo_task }
, m_calculated_log_lsa { a_do_monitor
? a_start_main_thread_log_lsa
: MAX_LSA }
, m_terminate_calculation { false }
{
assert ((a_do_monitor &&
!a_start_main_thread_log_lsa.is_max () && !a_start_main_thread_log_lsa.is_null ())
|| (!a_do_monitor && a_start_main_thread_log_lsa.is_max ()));
}
redo_parallel::min_unapplied_log_lsa_monitoring::~min_unapplied_log_lsa_monitoring ()
{
if (m_do_monitor)
{
m_terminate_calculation = true;
assert (m_calculate_thread.joinable ());
// multiple external threads can wait on this cv (wait_past_target_log_lsa)
// calculating loop might also temporarily wait on this
m_calculate_cv.notify_all ();
m_calculate_thread.join ();
}
else
{
assert (!m_calculate_thread.joinable ());
assert (m_calculated_log_lsa == MAX_LSA);
}
}
void
redo_parallel::min_unapplied_log_lsa_monitoring::start ()
{
if (m_do_monitor)
{
assert (m_redo_tasks.size () > 0);
// an upfront value for the minimum unapplied log_lsa will be calculated quasi-instantaneously
m_calculate_thread = std::thread
{
std::bind (&redo_parallel::min_unapplied_log_lsa_monitoring::calculate_loop,
std::ref (*this))
};
}
}
void
redo_parallel::min_unapplied_log_lsa_monitoring::set_main_thread_unapplied_log_lsa (const log_lsa &a_log_lsa)
{
assert (m_do_monitor);
assert (m_main_thread_unapplied_log_lsa.load () < a_log_lsa);
assert (!a_log_lsa.is_max () && !a_log_lsa.is_null ());
m_main_thread_unapplied_log_lsa.store (a_log_lsa);
// multiple external threads can wait on this cv (wait_past_target_log_lsa)
// calculating loop might also temporarily wait on this
m_calculate_cv.notify_all ();
}
void
redo_parallel::min_unapplied_log_lsa_monitoring::wait_past_target_log_lsa (const log_lsa &a_target_lsa)
{
assert (m_do_monitor);
assert (a_target_lsa != MAX_LSA);
assert (a_target_lsa != NULL_LSA);
assert (!m_calculated_log_lsa.is_max ());
// avoid gratuitously notifying the calculating internal thread if the condition is already satisfied
// no need to lock the check because, by design, the value will be calculated
// immediately after initialization starting from a main thread log_lsa which will always
// be a valid value (i.e., neither max, nor null)
if (a_target_lsa < m_calculated_log_lsa)
{
return;
}
// multiple external threads can wait on this cv (wait_past_target_log_lsa)
// calculating loop might also temporarily wait on this
// since it is needed to wake the calculating loop, wake all
m_calculate_cv.notify_all ();
std::unique_lock<std::mutex> ulock { m_calculate_mtx };
m_calculate_cv.wait (ulock, [this, &a_target_lsa] ()
{
return m_calculated_log_lsa > a_target_lsa;
});
}
log_lsa
redo_parallel::min_unapplied_log_lsa_monitoring::calculate ()
{
assert (m_do_monitor);
// the log_lsa supplied by the main thread will always be a valid one, as asserted in the
// ctor; therefore, if the system is idle (ie: no job is currently being executed), this
// main thread log_lsa will dictate the actual "progress" of the system;
log_lsa calculated_log_lsa { m_main_thread_unapplied_log_lsa.load () };
for (const auto &redo_task: m_redo_tasks)
{
const log_lsa task_unapplied_log_lsa { redo_task->get_unapplied_log_lsa () };
if (task_unapplied_log_lsa != MAX_LSA && task_unapplied_log_lsa < calculated_log_lsa)
{
calculated_log_lsa = task_unapplied_log_lsa;
}
}
// - the assert might be invalid because the calculating thread might kick off pro-active
// calculation before any client requests it
// - also, a MAX_LSA result might mean that all threads have finished processing jobs
//assert (calculated_log_lsa != MAX_LSA);
//assert (!calculated_log_lsa.is_max ());
return calculated_log_lsa;
}
void
redo_parallel::min_unapplied_log_lsa_monitoring::calculate_loop ()
{
assert (m_do_monitor);
assert (m_redo_tasks.size () > 0);
while (!m_terminate_calculation)
{
// calculation happens outside the lock so as not to hold up waiting threads
const log_lsa calculated_log_lsa = calculate ();
{
std::lock_guard<std::mutex> lockg { m_calculate_mtx };
m_calculated_log_lsa = calculated_log_lsa;
}
// multiple external threads can wait on this cv (wait_past_target_log_lsa)
m_calculate_cv.notify_all ();
{
std::unique_lock<std::mutex> ulock { m_calculate_mtx };
// might be interrupted from the outside (wait_past_target_log_lsa)
// or by the termination sequence (dtor)
m_calculate_cv.wait_for (ulock, std::chrono::milliseconds (10));
}
}
}
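// design note: the loop above polls at a ~10 ms cadence, but it can be woken earlier
// through m_calculate_cv, which is notified by set_main_thread_unapplied_log_lsa,
// wait_past_target_log_lsa and the destructor's termination sequence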
/*********************************************************************
* redo_parallel - definition
*********************************************************************/
redo_parallel::redo_parallel (unsigned a_task_count, bool a_do_monitor_min_unapplied_log_lsa,
const log_lsa &a_start_main_thread_log_lsa, const log_rv_redo_context &copy_context)
: m_task_count { a_task_count }
, m_task_state_bookkeeping { a_task_count }
, m_worker_pool { nullptr }
, m_min_unapplied_log_lsa_calculation { a_do_monitor_min_unapplied_log_lsa, a_start_main_thread_log_lsa, m_redo_tasks }
{
assert (a_task_count > 0);
const thread_type tt = TT_RECOVERY;
m_pool_entry_manager = std::make_unique<cubthread::system_worker_entry_manager> (tt);
do_init_worker_pool (a_task_count);
do_init_tasks (a_task_count, a_do_monitor_min_unapplied_log_lsa, copy_context);
}
redo_parallel::~redo_parallel ()
{
assert (!m_task_state_bookkeeping.is_any_active ());
assert (m_worker_pool == nullptr);
for (auto &redo_task: m_redo_tasks)
{
assert (redo_task->is_idle ());
}
}
void
redo_parallel::add (redo_job_base *a_job)
{
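// dispatch by hashing the job's page identifier (vpid): all jobs touching the same page
// land on the same task, so redo for any single page is applied in log order without
// further synchronization between tasks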
const std::size_t task_index = m_vpid_hash (a_job->get_vpid ()) % m_task_count;
redo_task *const task = m_redo_tasks[task_index].get ();
task->push_job (a_job);
}
void
redo_parallel::wait_for_termination_and_stop_execution ()
{
for (auto &redo_task: m_redo_tasks)
{
redo_task->notify_adding_finished ();
}
// blocking call
m_task_state_bookkeeping.wait_for_termination ();
for (auto &redo_task: m_redo_tasks)
{
assert (redo_task->is_idle ());
}
m_worker_pool->stop_execution ();
cubthread::manager *thread_manager = cubthread::get_manager ();
thread_manager->destroy_worker_pool (m_worker_pool);
assert (m_worker_pool == nullptr);
}
void
redo_parallel::do_init_worker_pool (std::size_t a_task_count)
{
assert (a_task_count > 0);
assert (m_worker_pool == nullptr);
m_worker_pool = thread_create_worker_pool (a_task_count, a_task_count, "recovery-redo",
*m_pool_entry_manager.get ());
// m_log = false
}
void
redo_parallel::do_init_tasks (std::size_t a_task_count, bool a_do_monitor_unapplied_log_lsa,
const log_rv_redo_context &copy_context)
{
assert (a_task_count > 0);
assert (m_worker_pool != nullptr);
for (unsigned task_idx = 0; task_idx < a_task_count; ++task_idx)
{
auto task = std::make_unique<redo_parallel::redo_task> (task_idx, a_do_monitor_unapplied_log_lsa,
m_task_state_bookkeeping, copy_context);
m_worker_pool->execute (task.get ());
m_redo_tasks.push_back (std::move (task));
}
m_min_unapplied_log_lsa_calculation.start ();
}
void
redo_parallel::log_perf_stats () const
{
if (!perf_stats_is_active_for_async ())
{
return;
}
const cubperf::statset_definition definition
{
perf_stats_async_definition_init_list
};
std::vector<cubperf::stat_value> accum_perf_stat_results;
accum_perf_stat_results.resize (definition.get_value_count ());
for (auto &redo_task: m_redo_tasks)
{
redo_task->accumulate_perf_stats (accum_perf_stat_results.data (), accum_perf_stat_results.size ());
redo_task->log_perf_stats ();
}
// average
const std::size_t task_count = m_redo_tasks.size ();
const std::size_t value_count = accum_perf_stat_results.size ();
std::vector<cubperf::stat_value> avg_perf_stat_results;
avg_perf_stat_results.resize (value_count, 0);
for (std::size_t idx = 0; idx < value_count; ++idx)
{
avg_perf_stat_results[idx] = accum_perf_stat_results[idx] / task_count;
}
log_perf_stats_values_with_definition ("Log recovery redo worker threads averaged perf stats",
definition, avg_perf_stat_results.data (), value_count);
}
void
redo_parallel::set_main_thread_unapplied_log_lsa (const log_lsa &a_log_lsa)
{
m_min_unapplied_log_lsa_calculation.set_main_thread_unapplied_log_lsa (a_log_lsa);
}
void
redo_parallel::wait_past_target_lsa (const log_lsa &a_target_lsa)
{
m_min_unapplied_log_lsa_calculation.wait_past_target_log_lsa (a_target_lsa);
}
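// a minimal usage sketch based on the public interface above (identifiers such as 'job',
// 'start_lsa' and 'target_lsa' are illustrative, not taken from a real call site):
//
//   redo_parallel redo { task_count, /* monitor min unapplied lsa */ true, start_lsa, redo_context };
//   // main thread, per log record:
//   redo.add (job);                                     // dispatched to a task by vpid hash
//   redo.set_main_thread_unapplied_log_lsa (next_lsa);  // report main thread progress
//   // optionally block until recovery has advanced past a given lsa:
//   redo.wait_past_target_lsa (target_lsa);
//   // shutdown:
//   redo.wait_for_termination_and_stop_execution ();
//   redo.log_perf_stats ();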
/*********************************************************************
* redo_job_impl - definition
*********************************************************************/
redo_job_impl::redo_job_impl (reusable_jobs_stack *a_reusable_job_stack)
: redo_parallel::redo_job_base (VPID_INITIALIZER, NULL_LSA)
, m_reusable_job_stack { a_reusable_job_stack }
, m_log_rtype { LOG_SMALLER_LOGREC_TYPE }
{
assert (a_reusable_job_stack != nullptr);
}
void redo_job_impl::set_record_info (VPID a_vpid, const log_lsa &a_rcv_lsa, LOG_RECTYPE a_log_rtype)
{
this->redo_job_base::set_record_info (a_vpid, a_rcv_lsa);
assert (a_log_rtype > LOG_SMALLER_LOGREC_TYPE && a_log_rtype < LOG_LARGER_LOGREC_TYPE);
m_log_rtype = a_log_rtype;
}
int redo_job_impl::execute (THREAD_ENTRY *thread_p, log_rv_redo_context &redo_context)
{
/* perf data for processing log redo asynchronously, enabled:
* - during log crash recovery
* - on the page server, when replication is executing in asynchronous mode
* in both cases, the measurement includes the part that effectively calls the redo function; so, for
* an accurate evaluation, the part that effectively executes the redo function must be accounted for
*/
const int err_fetch =
redo_context.m_reader.set_lsa_and_fetch_page (get_log_lsa (), redo_context.m_reader_fetch_page_mode);
if (err_fetch != NO_ERROR)
{
return err_fetch;
}
redo_context.m_reader.add_align (sizeof (LOG_RECORD_HEADER));
switch (m_log_rtype)
{
case LOG_REDO_DATA:
read_record_and_redo<log_rec_redo> (thread_p, redo_context);
break;
case LOG_MVCC_REDO_DATA:
read_record_and_redo<log_rec_mvcc_redo> (thread_p, redo_context);
break;
case LOG_UNDOREDO_DATA:
case LOG_DIFF_UNDOREDO_DATA:
read_record_and_redo<log_rec_undoredo> (thread_p, redo_context);
break;
case LOG_MVCC_UNDOREDO_DATA:
case LOG_MVCC_DIFF_UNDOREDO_DATA:
read_record_and_redo<log_rec_mvcc_undoredo> (thread_p, redo_context);
break;
case LOG_RUN_POSTPONE:
read_record_and_redo<log_rec_run_postpone> (thread_p, redo_context);
break;
case LOG_COMPENSATE:
read_record_and_redo<log_rec_compensate> (thread_p, redo_context);
break;
default:
assert (false);
break;
}
return NO_ERROR;
}
void redo_job_impl::retire (std::size_t a_task_idx)
{
// return the job back to the pool of available reusable jobs
m_reusable_job_stack->push (a_task_idx, this);
}
template <typename T>
inline void
redo_job_impl::read_record_and_redo (THREAD_ENTRY *thread_p, log_rv_redo_context &redo_context)
{
redo_context.m_reader.advance_when_does_not_fit (sizeof (T));
log_rv_redo_rec_info<T> record_info (get_log_lsa (), m_log_rtype,
redo_context.m_reader.reinterpret_copy_and_add_align<T> ());
log_rv_redo_record_sync<T> (thread_p, redo_context, record_info, get_vpid ());
}
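// the helper above advances the log reader past the typed record header, copies the
// header out with proper alignment, and hands it to the synchronous redo path
// (log_rv_redo_record_sync), with the concrete record type selected by the switch in
// redo_job_impl::execute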
/*********************************************************************
* reusable_jobs_stack - definition
*********************************************************************/
reusable_jobs_stack::reusable_jobs_stack ()
: m_flush_push_at_count { cublog::PARALLEL_REDO_REUSABLE_JOBS_FLUSH_BACK_COUNT }
{
}
void reusable_jobs_stack::initialize (std::size_t a_push_task_count)
{
assert (m_pop_jobs.empty ());
assert (m_push_jobs.empty ());
assert (m_per_task_push_jobs_vec.empty ());
assert (a_push_task_count > 0);
m_job_pool.resize (PARALLEL_REDO_REUSABLE_JOBS_COUNT, redo_job_impl (this));
m_pop_jobs.reserve (m_job_pool.size ());
for (std::size_t idx = 0; idx < m_job_pool.size (); ++idx)
{
m_pop_jobs.push_back (&m_job_pool[idx]);
}
m_push_jobs.reserve (m_job_pool.size ());
m_per_task_push_jobs_vec.resize (a_push_task_count);
for (job_container_t &jobs: m_per_task_push_jobs_vec)
{
jobs.reserve (m_flush_push_at_count);
}
}
reusable_jobs_stack::~reusable_jobs_stack ()
{
// consistency check that all job instances have been 'returned to the source'
assert ([this] ()
{
const std::size_t pop_size = m_pop_jobs.size ();
const std::size_t push_size = m_push_jobs.size ();
std::size_t per_task_push_size = 0;
for (auto &push_container: m_per_task_push_jobs_vec)
{
per_task_push_size += push_container.size ();
}
assert ((pop_size + push_size + per_task_push_size) == m_job_pool.size ());
return true;
}
());
}
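// lock amortization in the pop/push flow, assuming a single popping (producing) thread:
// - the consumer-private m_pop_jobs container is consumed without any locking
// - only the refill path locks m_push_mtx, and only when m_pop_jobs has been exhausted,
//   at which point the shared m_push_jobs container is swapped in wholesale
// - symmetrically, each executing task buffers retired jobs in its own slot of
//   m_per_task_push_jobs_vec and only takes the mutex to flush a full batch (see push below)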
redo_job_impl *reusable_jobs_stack::blocking_pop (perf_stats &a_rcv_redo_perf_stat)
{
if (!m_pop_jobs.empty ())
{
redo_job_impl *const pop_job = m_pop_jobs.back ();
m_pop_jobs.pop_back ();
a_rcv_redo_perf_stat.time_and_increment (PERF_STAT_ID_REDO_OR_PUSH_POP_REUSABLE_DIRECT);
return pop_job;
}
else
{
{
std::unique_lock<std::mutex> locku { m_push_mtx };
m_push_jobs_available_cv.wait (locku, [this] ()
{
return !m_push_jobs.empty ();
});
m_pop_jobs.swap (m_push_jobs);
}
redo_job_impl *const pop_job = m_pop_jobs.back ();
m_pop_jobs.pop_back ();
a_rcv_redo_perf_stat.time_and_increment (PERF_STAT_ID_REDO_OR_PUSH_POP_REUSABLE_WAIT);
return pop_job;
}
assert ("unreachable state reached" == nullptr);
return nullptr;
}
void reusable_jobs_stack::push (std::size_t a_task_idx, redo_job_impl *a_job)
{
job_container_t &push_jobs = m_per_task_push_jobs_vec[a_task_idx];
push_jobs.push_back (a_job);
if (push_jobs.size () > m_flush_push_at_count)
{
{
std::lock_guard<std::mutex> locku { m_push_mtx };
m_push_jobs.insert (m_push_jobs.end (), push_jobs.cbegin (), push_jobs.cend ());
}
push_jobs.clear ();
m_push_jobs_available_cv.notify_one ();
}
}
}