// File: log_postpone_cache.cpp (cubrid > src > transaction)
/*
* Copyright 2008 Search Solution Corporation
* Copyright 2016 CUBRID Corporation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
/*
* log_postpone_cache.cpp - log postpone cache module
*/
#include "log_postpone_cache.hpp"
#include "memory_alloc.h"
#include "memory_private_allocator.hpp"
#include "object_representation.h"
#include "log_manager.h"
#include <cstring>
// XXX: SHOULD BE THE LAST INCLUDE HEADER
#include "memory_wrapper.hpp"
/*
 * copy_to - duplicate this cache's entire state into another cache instance.
 *
 * dest (out) : cache that receives a copy of every member, including the
 *              raw redo-data buffer contents.
 */
void
log_postpone_cache::copy_to (log_postpone_cache &dest) const
{
  // Entry bookkeeping first, then the raw redo-data buffer.
  dest.m_max_cache_entries = m_max_cache_entries;
  dest.m_cache_entries = m_cache_entries;
  dest.m_cursor = m_cursor;
  dest.m_is_redo_data_buf_full = m_is_redo_data_buf_full;
  dest.m_redo_data_offset = m_redo_data_offset;
  m_redo_data_buf.copy_to (dest.m_redo_data_buf);
}
/*
 * reset - discard all cached postpone entries and rewind the redo-data state.
 *
 * The backing buffer is actually released only when it has grown beyond
 * BUFFER_RESET_SIZE; otherwise its capacity is kept for reuse.
 */
void
log_postpone_cache::reset ()
{
  m_is_redo_data_buf_full = false;
  m_redo_data_offset = 0;
  m_cursor = 0;

  const bool buffer_grew_too_large = m_redo_data_buf.get_size () > BUFFER_RESET_SIZE;
  if (buffer_grew_too_large)
    {
      m_redo_data_buf.freemem ();
    }
}
/*
 * reset_from - roll the cache back so that only entries before the given
 *              cursor position remain valid.
 *
 * cursor (in) : position to rewind to; must be strictly less than the
 *               current cursor. A value of zero performs a full reset.
 */
void
log_postpone_cache::reset_from (std::size_t cursor)
{
  if (cursor >= m_cursor)
    {
      // There is nothing at or beyond this position to roll back.
      assert (false);
      return ;
    }

  if (cursor == 0)
    {
      // Rewinding to the very beginning is a full reset.
      reset ();
      return;
    }

  m_cursor = cursor;
  m_redo_data_offset = m_cache_entries[cursor].m_offset;
  m_is_redo_data_buf_full = false;
}
/*
 * add_redo_data - cache the redo header and recovery data of a postpone log
 *                 record at the current cursor position.
 *
 * node (in) : prior log node carrying the log_rec_redo header
 *             (node.data_header) and the redo payload (node.rdata/rlength).
 *
 * Note: if the cache is already full, or the data would overflow
 * REDO_DATA_MAX_SIZE, the record is silently not cached; callers fall back
 * to log_do_postpone (see do_postpone returning false in that case).
 * The entry is only made visible later, when add_lsa increments m_cursor.
 */
void
log_postpone_cache::add_redo_data (const log_prior_node &node)
{
  assert (node.data_header != NULL);
  assert (node.rlength == 0 || node.rdata != NULL);
  assert (m_cursor <= m_max_cache_entries);
  if (is_full ())
    {
      // Cannot cache postpones, cache reached max capacity
      return;
    }
  // Check if recovery data fits in preallocated buffer.
  // 2 * MAX_ALIGNMENT is the worst-case padding added by the two PTR_ALIGN
  // steps below (after the header and after the payload).
  std::size_t redo_data_size = sizeof (log_rec_redo) + node.rlength + (2 * MAX_ALIGNMENT);
  std::size_t total_size = redo_data_size + m_redo_data_offset;
  if (total_size > REDO_DATA_MAX_SIZE)
    {
      // Cannot store all recovery data; mark the buffer full so is_full ()
      // reports it and no further records are cached.
      m_is_redo_data_buf_full = true;
      return;
    }
  else
    {
      m_redo_data_buf.extend_to (total_size);
    }
  // Cache a new postpone log record entry
  cache_entry &new_entry = m_cache_entries[m_cursor];
  new_entry.m_offset = m_redo_data_offset;
  // first and only first entry has m_offset equal to zero
  assert ((m_cursor == 0) == (new_entry.m_offset == 0));
  // LSA stays null until add_lsa fills it in and publishes the entry.
  new_entry.m_lsa.set_null ();
  // Cache log_rec_redo from data_header
  char *redo_data_ptr = m_redo_data_buf.get_ptr () + m_redo_data_offset;
  memcpy (redo_data_ptr, node.data_header, sizeof (log_rec_redo));
  redo_data_ptr += sizeof (log_rec_redo);
  redo_data_ptr = PTR_ALIGN (redo_data_ptr, MAX_ALIGNMENT);
  // Cache recovery data
  assert (((log_rec_redo *) node.data_header)->length == node.rlength);
  if (node.rlength > 0)
    {
      memcpy (redo_data_ptr, node.rdata, node.rlength);
      redo_data_ptr += node.rlength;
      redo_data_ptr = PTR_ALIGN (redo_data_ptr, MAX_ALIGNMENT);
    }
  // Next entry's data starts at the aligned end of this one.
  m_redo_data_offset = redo_data_ptr - m_redo_data_buf.get_ptr ();
}
/*
 * add_lsa - attach the log record's LSA to the entry prepared by
 *           add_redo_data and publish it by advancing the cursor.
 *
 * lsa (in) : address of the postpone log record; must not be null.
 */
void
log_postpone_cache::add_lsa (const log_lsa &lsa)
{
  assert (!lsa.is_null ());

  if (is_full ())
    {
      // No room left; this record was not cached by add_redo_data either.
      return;
    }

  assert (m_cursor < m_max_cache_entries);
  m_cache_entries[m_cursor].m_lsa = lsa;

  /* Now that all needed data is saved, increment cached entries counter. */
  ++m_cursor;
}
/*
 * do_postpone - try to execute all cached postpone records starting at the
 *               given LSA, avoiding a scan of the log page buffer.
 *
 * return             : true if the cached entries were executed here; false
 *                      if the caller must fall back to log_do_postpone
 *                      (start LSA not cached, or the cache overflowed).
 * thread_ref (in)    : thread entry
 * start_postpone_lsa (in) : LSA of the first postpone record to run.
 */
bool
log_postpone_cache::do_postpone (cubthread::entry &thread_ref, const log_lsa &start_postpone_lsa)
{
  assert (!start_postpone_lsa.is_null ());
  // First cached postpone entry at start_postpone_lsa
  int start_index = -1;
  for (std::size_t i = 0; i < m_cursor; ++i)
    {
      if (m_cache_entries[i].m_lsa == start_postpone_lsa)
	{
	  // Found start lsa
	  start_index = (int) i;
	  break;
	}
    }
  if (start_index < 0)
    {
      // Start LSA was not found.
      // Should call log_do_postpone.
      return false;
    }
  if (is_full ())
    {
      // Do not reset the entire cache; the postponed logs might be used by an outer loop.
      // Since we cannot handle all of these postponed logs here, we must call log_do_postpone.
      reset_from (static_cast<std::size_t> (start_index));
      return false;
    }
  // Scratch buffer for copying out each entry's redo payload.
  const size_t RCV_DATA_DEFAULT_SIZE = 1024;
  cubmem::extensible_stack_block<RCV_DATA_DEFAULT_SIZE> rcv_data_buffer { cubmem::PRIVATE_BLOCK_ALLOCATOR };
  // Run all postpones after start_index
  for (std::size_t i = start_index; i < m_cursor; ++i)
    {
      cache_entry &entry = m_cache_entries[i];
      // Get redo data header
      char *redo_data = m_redo_data_buf.get_ptr () + entry.m_offset;
      log_rec_redo redo = * (log_rec_redo *) redo_data;
      // Get recovery data; it was stored MAX_ALIGNMENT-aligned after the
      // header by add_redo_data, so re-apply the same alignment here.
      char *data_ptr = redo_data + sizeof (log_rec_redo);
      data_ptr = PTR_ALIGN (data_ptr, MAX_ALIGNMENT);
      rcv_data_buffer.extend_to (redo.length);
      std::memcpy (rcv_data_buffer.get_ptr (), data_ptr, redo.length);
      (void) log_execute_run_postpone (&thread_ref, &entry.m_lsa, &redo, rcv_data_buffer.get_ptr ());
    }
  // Finished running postpones, update the number of entries which should be run on next commit
  m_cursor = start_index;
  m_redo_data_offset = m_cache_entries[start_index].m_offset;
  m_is_redo_data_buf_full = false;
  if (m_cursor == 0)
    {
      assert (m_redo_data_offset == 0); // should be 0 when cursor is back to 0
      // Everything was executed; also gives reset () a chance to free an
      // oversized redo-data buffer.
      reset ();
    }
  return true;
}
/*
 * is_full - tell whether the cache can accept another postpone record.
 *
 * return : true when either the entry table has reached its maximum count
 *          or the redo-data buffer was marked as overflowed.
 */
bool
log_postpone_cache::is_full () const
{
  if (m_is_redo_data_buf_full)
    {
      return true;
    }
  return m_cursor == m_max_cache_entries;
}