File pl_session.cpp¶
File List > cubrid > src > sp > pl_session.cpp
Go to the documentation of this file
/*
*
* Copyright 2016 CUBRID Corporation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
#include "pl_session.hpp"
#include "pl_comm.h"
#include "pl_query_cursor.hpp"
#include "pl_sr.h"
#include "query_manager.h"
#include "session.h"
#include "xserver_interface.h"
#include "thread_manager.hpp"
#include "method_error.hpp"
#include "method_struct_parameter_info.hpp"
// XXX: SHOULD BE THE LAST INCLUDE HEADER
#include "memory_wrapper.hpp"
#define CONN_EPOCH_NEVER (-2)
#define CONN_EPOCH_NONE (-1)
namespace cubpl
{
// Global interface
session *get_session ()
{
session *s = nullptr;
cubthread::entry *thread_p = thread_get_thread_entry_info ();
#if defined (SERVER_MODE)
// only worker thread can access session
if (thread_p && thread_p->type != TT_WORKER)
{
return nullptr;
}
#endif
int error = session_get_pl_session (thread_p, s);
if (error != NO_ERROR)
{
// session expired or internal error
er_set (ER_NOTIFICATION_SEVERITY, ARG_FILE_LINE, ER_INTERRUPTING, 1, thread_p->tran_index);
}
return s; // s can be null on error cases, especially, on session expriation
}
// Runtime Context
session::session (SESSION_ID id)
: m_mutex_stack ()
, m_mutex_connection ()
, m_mutex_cursor ()
, m_session_cursors {}
, m_stack_map {}
, m_exec_stack {}
, m_cursor_map {}
, m_req_id {0}
, m_param_info {nullptr}
, m_all_session_params_required {true}
, m_last_conn_epoch (CONN_EPOCH_NEVER)
, m_stack_idx {-1}
, m_interrupt_id (NO_ERROR)
, m_id (id)
{
m_exec_stack.reserve (METHOD_MAX_RECURSION_DEPTH + 1);
}
session::~session ()
{
er_log_debug (ARG_FILE_LINE, "pl_session (delete): %d\n", m_id);
destroy_pl_context_jvm ();
std::lock_guard<std::mutex> lock (m_mutex_connection);
m_session_connections.clear ();
// TODO: delete other resources
}
execution_stack *
session::create_and_push_stack (cubthread::entry *thread_p)
{
if (thread_p == nullptr)
{
thread_p = thread_get_thread_entry_info ();
}
std::lock_guard<std::mutex> lock (m_mutex_stack);
if (m_stack_idx >= METHOD_MAX_RECURSION_DEPTH)
{
er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_SP_TOO_MANY_NESTED_CALL, 0);
set_interrupt (ER_SP_TOO_MANY_NESTED_CALL);
return nullptr;
}
if (m_stack_idx == -1)
{
// clear previous interrupt state
clear_interrupt ();
}
else
{
assert (m_stack_idx >= 0);
// check interrupt
if (m_interrupt_id != NO_ERROR)
{
// block creating a new stack
set_local_error_for_interrupt ();
return nullptr;
}
}
execution_stack *stack = new execution_stack (thread_p, this);
if (stack)
{
// update stack index
m_stack_idx++;
// push to exec_stack
PL_STACK_ID stack_id = stack->get_id ();
m_stack_map [stack_id] = stack;
int stack_size = (int) m_exec_stack.size ();
if (m_stack_idx < stack_size)
{
m_exec_stack [m_stack_idx] = stack_id;
}
else
{
m_exec_stack.emplace_back (stack_id);
}
}
else
{
set_interrupt (ER_OUT_OF_VIRTUAL_MEMORY);
}
return stack;
}
void
session::pop_and_destroy_stack (const PL_STACK_ID sid)
{
auto target_stack_is_at_top = [&] () -> bool
{
// condition to check
return m_exec_stack[m_stack_idx] == sid;
};
std::unique_lock<std::mutex> ulock (m_mutex_stack);
// Guaranteed to be removed from the topmost element
m_cond_target_stack_at_top.wait (ulock, target_stack_is_at_top);
assert (m_stack_idx >= 0);
m_exec_stack[m_stack_idx] = -1;
m_stack_idx--;
m_cond_target_stack_at_top.notify_all();
m_stack_map.erase (sid);
if (m_stack_idx == -1)
{
clear_interrupt ();
m_cond_pl_session_done.notify_all();
}
else
{
assert (m_stack_idx >= 0);
}
}
execution_stack *
session::top_stack ()
{
std::lock_guard<std::mutex> lock (m_mutex_stack);
if (m_stack_idx == -1)
{
return nullptr;
}
assert (m_stack_idx >= 0);
PL_STACK_ID top = m_exec_stack[m_stack_idx];
const auto &it = m_stack_map.find (top);
if (it == m_stack_map.end ())
{
// should not happended
assert (false);
return nullptr;
}
return it->second;
}
connection_view
session::claim_connection ()
{
std::lock_guard<std::mutex> lock (m_mutex_connection);
connection_view cv = nullptr;
if (m_session_connections.empty ())
{
connection_pool *pool = get_connection_pool ();
if (pool)
{
m_session_connections.emplace_back (std::move (pool->claim ()));
}
}
if (!m_session_connections.empty ())
{
cv = std::move (m_session_connections.front());
m_session_connections.pop_front();
}
if (check_reloading_pl_context_required (cv))
{
set_session_params_all_required (true);
}
return cv;
}
void
session::release_connection (connection_view &conn)
{
std::lock_guard<std::mutex> lock (m_mutex_connection);
if (conn != nullptr)
{
// if there was no error, update the connection epoch and session parameters state
if (conn->get_last_error () == NO_ERROR)
{
if (m_last_conn_epoch == conn->get_epoch ())
{
set_session_params_all_required (false);
}
m_last_conn_epoch = conn->get_epoch ();
}
m_session_connections.emplace_back (std::move (conn));
}
}
void
session::destroy_pl_context_jvm ()
{
if (m_last_conn_epoch != CONN_EPOCH_NEVER)
{
cubmethod::header header (m_id, SP_CODE_DESTROY);
m_last_conn_epoch = CONN_EPOCH_NONE;
connection_view cv = claim_connection ();
if (cv)
{
if (cv->is_valid ())
{
cv->send_buffer_args (header);
}
release_connection (cv);
}
}
}
bool
session::is_thread_involved (thread_id_t id)
{
std::lock_guard<std::mutex> lock (m_mutex_stack);
for (const auto &it : m_stack_map)
{
execution_stack *stack = it.second;
if (stack->get_thread_entry () && id == stack->get_thread_entry ()->get_id ())
{
return true;
}
}
return false;
}
void
session::set_interrupt (int reason, std::string msg)
{
if (m_interrupt_id != NO_ERROR)
{
// do not overwrite interrupt
return;
}
switch (reason)
{
/* no arg */
case ER_INTERRUPTED:
case ER_SP_TOO_MANY_NESTED_CALL:
case ER_NET_SERVER_SHUTDOWN:
case ER_SP_NOT_RUNNING_PL_SERVER:
case ER_SES_SESSION_EXPIRED:
m_interrupt_id = reason;
m_interrupt_msg.assign ("");
break;
/* 1 arg */
case ER_SP_CANNOT_CONNECT_PL_SERVER:
case ER_SP_NETWORK_ERROR:
case ER_OUT_OF_VIRTUAL_MEMORY:
m_interrupt_id = reason;
m_interrupt_msg.assign (msg);
break;
default:
/* do nothing */
break;
}
#if !defined (NDEBUG)
if (m_interrupt_id != NO_ERROR)
{
er_log_debug (ARG_FILE_LINE, "pl_session (interrupted): %d\n", m_id);
}
#endif
destroy_pl_context_jvm ();
}
void
session::set_local_error_for_interrupt ()
{
cubmethod::handle_method_error (get_interrupt_id (), get_interrupt_msg ());
}
bool
session::is_interrupted ()
{
return m_interrupt_id != NO_ERROR;
}
int
session::get_interrupt_id ()
{
return m_interrupt_id;
}
std::string
session::get_interrupt_msg ()
{
return m_interrupt_msg;
}
void
session::clear_interrupt ()
{
m_interrupt_id = NO_ERROR;
m_interrupt_msg.clear ();
}
void
session::wait_until_pl_session_done ()
{
auto pl_session_is_not_running = [this] () -> bool
{
// condition of finish
return m_stack_idx == -1; // not running
};
std::unique_lock<std::mutex> ulock (m_mutex_stack);
m_cond_pl_session_done.wait (ulock, pl_session_is_not_running);
}
SESSION_ID
session::get_id ()
{
return m_id;
}
bool
session::is_sp_running ()
{
std::lock_guard<std::mutex> lock (m_mutex_stack);
return m_stack_idx >= 0;
}
query_cursor *
session::get_cursor (cubthread::entry *thread_p, QUERY_ID query_id)
{
if (query_id == NULL_QUERY_ID)
{
return nullptr;
}
std::lock_guard<std::mutex> lock (m_mutex_cursor);
// find in map
auto search = m_cursor_map.find (query_id);
if (search != m_cursor_map.end ())
{
// found
return search->second;
}
return nullptr;
}
query_cursor *
session::create_cursor (cubthread::entry *thread_p, QUERY_ID query_id, bool is_oid_included)
{
if (query_id == NULL_QUERY_ID || query_id >= SHRT_MAX)
{
// false query e.g) SELECT * FROM db_class WHERE 0 <> 0
return nullptr;
}
std::lock_guard<std::mutex> lock (m_mutex_cursor);
query_cursor *cursor = nullptr;
// find in map
auto search = m_cursor_map.find (query_id);
if (search != m_cursor_map.end ())
{
// found
cursor = search->second;
assert (cursor != nullptr);
cursor->change_owner (thread_p);
return cursor;
}
else
{
// not found, create a new server-side cursor
cursor = new query_cursor (thread_p, query_id, is_oid_included);
// store a new cursor in map
m_cursor_map [query_id] = cursor;
assert (cursor != nullptr);
}
return cursor;
}
void
session::destroy_cursor (cubthread::entry *thread_p, QUERY_ID query_id)
{
if (query_id == NULL_QUERY_ID)
{
/* do nothing */
return;
}
std::lock_guard<std::mutex> lock (m_mutex_cursor);
// find in map
auto search = m_cursor_map.find (query_id);
if (search != m_cursor_map.end ())
{
delete search->second; // search->second cannot be null
m_cursor_map.erase (search);
}
}
void
session::add_session_cursor (cubthread::entry *thread_p, QUERY_ID query_id)
{
if (query_id == NULL_QUERY_ID)
{
/* do nothing */
return;
}
std::lock_guard<std::mutex> lock (m_mutex_cursor);
m_session_cursors.insert (query_id);
}
void
session::remove_session_cursor (cubthread::entry *thread_p, QUERY_ID query_id)
{
if (query_id == NULL_QUERY_ID)
{
/* do nothing */
return;
}
std::lock_guard<std::mutex> lock (m_mutex_cursor);
m_session_cursors.erase (query_id);
}
bool
session::is_session_cursor (QUERY_ID query_id)
{
std::lock_guard<std::mutex> lock (m_mutex_cursor);
return (m_session_cursors.find (query_id) != m_session_cursors.end ());
}
void
session::destroy_all_cursors ()
{
std::lock_guard<std::mutex> lock (m_mutex_cursor);
for (auto &it : m_cursor_map)
{
query_cursor *cursor = it.second;
if (cursor)
{
delete cursor;
}
}
m_cursor_map.clear ();
m_session_cursors.clear ();
}
cubmethod::db_parameter_info *
session::get_db_parameter_info () const
{
return m_param_info;
}
void
session::set_db_parameter_info (cubmethod::db_parameter_info *param_info)
{
m_param_info = param_info;
}
bool
session::check_reloading_pl_context_required (const connection_view &conn)
{
return m_last_conn_epoch < 0 || !conn || m_last_conn_epoch != conn->get_epoch () || !conn->is_valid ();
}
const std::vector <sys_param>
session::obtain_session_parameters (const connection_view &conn)
{
std::vector<sys_param> changed_sys_params;
if (check_reloading_pl_context_required (conn))
{
set_session_params_all_required (true);
}
if (m_session_param_changed_ids.size () == 0 && !m_all_session_params_required)
{
// there was no change in session parameters
return changed_sys_params;
}
// obtain DB's session parameters
SYSPRM_ASSIGN_VALUE *session_params = xsysprm_get_pl_context_parameters (PRM_USER_CHANGE | PRM_FOR_SESSION);
SYSPRM_ASSIGN_VALUE *next_param = session_params;
while (next_param != NULL)
{
bool is_param_changed = m_session_param_changed_ids.find (next_param->prm_id) != m_session_param_changed_ids.end ();
if (m_all_session_params_required || is_param_changed)
{
changed_sys_params.emplace_back (next_param);
}
next_param = next_param->next;
}
// obtain PL's session parameters
auto to_int = [] (sys_param_id id)
{
return static_cast<std::underlying_type_t<sys_param_id>> (id);
};
for (auto i = to_int (sys_param_id::PRM_ID_BEGIN);
i < to_int (sys_param_id::PRM_ID_END); ++i)
{
bool is_param_changed = m_session_param_changed_ids.find (i) != m_session_param_changed_ids.end ();
if (m_all_session_params_required || is_param_changed)
{
auto param = m_session_params.find (i);
if (param != m_session_params.end ())
{
changed_sys_params.emplace_back (param->second);
}
}
}
if (session_params)
{
sysprm_free_assign_values (&session_params);
}
if (changed_sys_params.size () > 0)
{
for (auto ¶m : changed_sys_params)
{
set_session_param (param);
}
}
return changed_sys_params;
}
void
session::mark_session_param_changed (int prm_id)
{
m_session_param_changed_ids.insert (prm_id);
}
void
session::set_session_param (const sys_param ¶m)
{
m_session_params.insert_or_assign (param.prm_id, param);
}
void
session::set_session_params_all_required (bool is_required)
{
m_all_session_params_required = is_required;
if (!m_all_session_params_required)
{
m_session_param_changed_ids.clear ();
}
}
sys_param::sys_param (const SYSPRM_ASSIGN_VALUE *db_param)
: sys_param (GET_PRM (db_param->prm_id))
{
//
}
sys_param::sys_param (const SYSPRM_PARAM *db_param)
: sys_param ((int) db_param->id, (int) GET_PRM (db_param->id)->datatype, "")
{
if (prm_id < static_cast<int> (sys_param_id::PRM_ID_BEGIN))
{
set_prm_value (db_param);
}
}
sys_param::sys_param (int prm_id, int prm_type, std::string prm_value)
: prm_id (prm_id), prm_type (prm_type), prm_value (prm_value)
{
}
void
sys_param::set_prm_value (const SYSPRM_PARAM *prm)
{
if (prm_type == PRM_BOOLEAN)
{
bool val = prm_get_bool_value (prm->id);
prm_value = val ? "t" : "f";
}
else if (prm_type == PRM_STRING)
{
const char *val = prm_get_string_value (prm->id);
if (val == NULL)
{
switch (prm->id)
{
case PRM_ID_INTL_COLLATION:
val = lang_get_collation_name (LANG_GET_BINARY_COLLATION (LANG_SYS_CODESET));
break;
case PRM_ID_INTL_DATE_LANG:
case PRM_ID_INTL_NUMBER_LANG:
val = lang_get_Lang_name ();
break;
case PRM_ID_TIMEZONE:
val = prm_get_string_value (PRM_ID_SERVER_TIMEZONE);
break;
default:
/* do nothing */
break;
}
}
prm_value = val;
}
else if (prm_type == PRM_INTEGER)
{
int val = prm_get_integer_value (prm->id);
prm_value = std::to_string (val);
}
else if (prm_type == PRM_BIGINT)
{
UINT64 val = prm_get_bigint_value (prm->id);
prm_value = std::to_string (val);
}
else if (prm_type == PRM_FLOAT)
{
float val = prm_get_float_value (prm->id);
prm_value = std::to_string (val);
}
else
{
assert (false);
prm_value = "*unknown*";
}
}
#define SYS_PARAM_PACKER_ARGS() \
prm_id, prm_type, prm_value
void
sys_param::pack (cubpacking::packer &serializator) const
{
serializator.pack_all (SYS_PARAM_PACKER_ARGS());
}
size_t
sys_param::get_packed_size (cubpacking::packer &serializator, std::size_t start_offset) const
{
return serializator.get_all_packed_size_starting_offset (start_offset, SYS_PARAM_PACKER_ARGS ());
}
void
sys_param::unpack (cubpacking::unpacker &deserializator)
{
deserializator.unpack_all (SYS_PARAM_PACKER_ARGS ());
}
} // cubmethod