File session.c¶
File List > cubrid > src > session > session.c
Go to the documentation of this file
/*
* Copyright 2008 Search Solution Corporation
* Copyright 2016 CUBRID Corporation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
/*
* session.c - session state internal API
*/
#ident "$Id$"
#include <assert.h>
#if !defined(WINDOWS)
#include <sys/time.h>
#include <sys/resource.h>
#endif /* !WINDDOWS */
#include "system.h"
#include "session.h"
#include "boot_sr.h"
#include "jansson.h"
#include "critical_section.h"
#include "error_manager.h"
#include "system_parameter.h"
#include "environment_variable.h"
#if defined(SERVER_MODE)
#include "connection_sr.h"
#else /* !defined (SERVER_MODE) = defined (SA_MODE) */
#include "db.h"
#endif /* defined (SA_MODE) */
#include "lock_free.h"
#include "object_primitive.h"
#include "dbtype.h"
#include "string_opfunc.h"
#if defined (SERVER_MODE)
#include "thread_daemon.hpp"
#endif
#include "thread_entry_task.hpp"
#include "thread_lockfree_hash_map.hpp"
#include "thread_manager.hpp"
#include "xasl_cache.h"
#include "pl_session.hpp"
// XXX: SHOULD BE THE LAST INCLUDE HEADER
#include "memory_wrapper.hpp"
#if !defined(SERVER_MODE)
#define pthread_mutex_init(a, b)
#define pthread_mutex_destroy(a)
#define pthread_mutex_lock(a) 0
#define pthread_mutex_trylock(a) 0
#define pthread_mutex_unlock(a)
static int rv;
#endif /* not SERVER_MODE */
#define SESSIONS_HASH_SIZE 1000
#define MAX_SESSION_VARIABLES_COUNT 20
#define MAX_PREPARED_STATEMENTS_COUNT 20
typedef struct session_info SESSION_INFO;
struct session_info
{
SESSION_ID *session_ids;
int count;
};
typedef struct session_variable SESSION_VARIABLE;
struct session_variable
{
char *name;
DB_VALUE *value;
SESSION_VARIABLE *next;
};
typedef struct prepared_statement PREPARED_STATEMENT;
struct prepared_statement
{
char *name;
char *alias_print;
SHA1Hash sha1;
int info_length;
char *info;
PREPARED_STATEMENT *next;
};
typedef struct session_query_entry SESSION_QUERY_ENTRY;
struct session_query_entry
{
QUERY_ID query_id; /* unique query identifier */
QFILE_LIST_ID *list_id; /* result list file identifier */
QMGR_TEMP_FILE *temp_file; /* temp files */
int num_tmp; /* number of temp files allocated */
int total_count; /* total number of file pages allocated for the entire query */
QUERY_FLAG query_flag;
SESSION_QUERY_ENTRY *next;
};
typedef struct session_state SESSION_STATE;
struct session_state
{
SESSION_ID id; /* session id */
SESSION_STATE *stack; /* used in freelist */
SESSION_STATE *next; /* used in hash table */
pthread_mutex_t mutex; /* state mutex */
UINT64 del_id; /* delete transaction ID (for lock free) */
bool is_keep_session;
bool is_trigger_involved;
bool is_last_insert_id_generated;
bool auto_commit;
DB_VALUE cur_insert_id;
DB_VALUE last_insert_id;
int row_count;
SESSION_VARIABLE *session_variables;
PREPARED_STATEMENT *statements;
SESSION_QUERY_ENTRY *queries;
time_t active_time;
SESSION_PARAM *session_parameters;
char *trace_stats;
char *plan_string;
int trace_format;
int ref_count;
TZ_REGION session_tz_region;
int private_lru_index;
load_session *load_session_p;
PL_SESSION *pl_session_p;
// *INDENT-OFF*
session_state ();
~session_state ();
// *INDENT-ON*
};
/* session state manipulation functions */
static void *session_state_alloc (void);
static int session_state_free (void *st);
static int session_state_init (void *st);
static int session_state_uninit (void *st);
static int session_key_copy (void *src, void *dest);
static unsigned int session_key_hash (void *key, int hash_table_size);
static int session_key_compare (void *k1, void *k2);
static int session_key_increment (void *key, void *existing);
/* session state structure descriptor for hash table */
static LF_ENTRY_DESCRIPTOR session_state_Descriptor = {
offsetof (SESSION_STATE, stack),
offsetof (SESSION_STATE, next),
offsetof (SESSION_STATE, del_id),
offsetof (SESSION_STATE, id),
offsetof (SESSION_STATE, mutex),
LF_EM_USING_MUTEX,
LF_ENTRY_DESCRIPTOR_MAX_ALLOC,
session_state_alloc,
session_state_free,
session_state_init,
session_state_uninit,
session_key_copy,
session_key_compare,
session_key_hash,
session_key_increment
};
// *INDENT-OFF*
using session_hashmap_type = cubthread::lockfree_hashmap<SESSION_ID, session_state>;
using session_hashmap_iterator = session_hashmap_type::iterator;
// *INDENT-ON*
typedef struct active_sessions
{
session_hashmap_type states_hashmap;
SESSION_ID last_session_id;
int num_holdable_cursors;
// *INDENT-OFF*
active_sessions ()
: states_hashmap {}
, last_session_id (0)
, num_holdable_cursors (0)
{
}
// *INDENT-ON*
} ACTIVE_SESSIONS;
/* the active sessions storage */
static ACTIVE_SESSIONS sessions;
static int session_remove_expired_sessions (THREAD_ENTRY * thread_p);
static int session_check_timeout (SESSION_STATE * session_p, SESSION_INFO * active_sessions, bool * remove);
static void session_free_prepared_statement (PREPARED_STATEMENT * stmt_p);
static int session_add_variable (SESSION_STATE * state_p, const DB_VALUE * name, DB_VALUE * value);
static int session_drop_variable (SESSION_STATE * state_p, const DB_VALUE * name);
static void free_session_variable (SESSION_VARIABLE * var);
static void update_session_variable (SESSION_VARIABLE * var, const DB_VALUE * new_value);
static DB_VALUE *db_value_alloc_and_copy (const DB_VALUE * src);
static int session_dump_session (SESSION_STATE * session);
static void session_dump_variable (SESSION_VARIABLE * var);
static void session_dump_prepared_statement (PREPARED_STATEMENT * stmt_p);
static SESSION_QUERY_ENTRY *qentry_to_sentry (QMGR_QUERY_ENTRY * qentry_p);
static int session_preserve_temporary_files (THREAD_ENTRY * thread_p, SESSION_QUERY_ENTRY * q_entry);
static void sentry_to_qentry (const SESSION_QUERY_ENTRY * sentry_p, QMGR_QUERY_ENTRY * qentry_p);
static void session_free_sentry_data (THREAD_ENTRY * thread_p, SESSION_QUERY_ENTRY * sentry_p);
static void session_set_conn_entry_data (THREAD_ENTRY * thread_p, SESSION_STATE * session_p);
static SESSION_STATE *session_get_session_state (THREAD_ENTRY * thread_p);
#if !defined (NDEBUG) && defined (SERVER_MODE)
static int session_state_verify_ref_count (THREAD_ENTRY * thread_p, SESSION_STATE * session_p);
#endif
// *INDENT-OFF*
#if defined (SERVER_MODE)
static cubthread::daemon *session_Control_daemon = NULL;
static void session_control_daemon_init ();
static void session_control_daemon_destroy ();
#endif
session_state::session_state ()
{
pthread_mutex_init (&mutex, NULL);
}
session_state::~session_state ()
{
pthread_mutex_destroy (&mutex);
}
// *INDENT-ON*
/*
* session_state_alloc () - allocate a new session state
* returns: new pointer or NULL on error
*/
static void *
session_state_alloc (void)
{
SESSION_STATE *state;
state = (SESSION_STATE *) malloc (sizeof (SESSION_STATE));
if (state != NULL)
{
pthread_mutex_init (&state->mutex, NULL);
}
return (void *) state;
}
/*
* session_state_free () - free a session state
* returns: error code or NO_ERROR
* st(in): state to free
*/
static int
session_state_free (void *st)
{
if (st != NULL)
{
pthread_mutex_destroy (&((SESSION_STATE *) st)->mutex);
free (st);
return NO_ERROR;
}
else
{
return ER_FAILED;
}
}
/*
* session_state_init () - initialize a session state
* returns: error code or NO_ERROR
* st(in): state to initialize
*/
static int
session_state_init (void *st)
{
SESSION_STATE *session_p = (SESSION_STATE *) st;
if (st == NULL)
{
return ER_FAILED;
}
/* initialize fields */
db_make_null (&session_p->cur_insert_id);
db_make_null (&session_p->last_insert_id);
session_p->is_keep_session = false;
session_p->is_trigger_involved = false;
session_p->is_last_insert_id_generated = false;
session_p->row_count = -1;
session_p->session_variables = NULL;
session_p->statements = NULL;
session_p->queries = NULL;
session_p->session_parameters = NULL;
session_p->trace_stats = NULL;
session_p->plan_string = NULL;
session_p->ref_count = 0;
session_p->trace_format = QUERY_TRACE_TEXT;
session_p->private_lru_index = -1;
session_p->auto_commit = false;
session_p->load_session_p = NULL;
session_p->pl_session_p = NULL;
return NO_ERROR;
}
/*
* session_state_uninit () - uninitialize a session state
* returns: error code or NO_ERROR
* st(in): state to uninitialize
*/
static int
session_state_uninit (void *st)
{
SESSION_STATE *session = (SESSION_STATE *) st;
SESSION_VARIABLE *vcurent = NULL, *vnext = NULL;
PREPARED_STATEMENT *pcurent = NULL, *pnext = NULL;
THREAD_ENTRY *thread_p = thread_get_thread_entry_info ();
SESSION_QUERY_ENTRY *qcurent = NULL, *qnext = NULL;
int cnt = 0;
if (session == NULL)
{
return NO_ERROR;
}
#if defined (SESSION_DEBUG)
er_log_debug (ARG_FILE_LINE, "session_free_session %u\n", session->id);
#endif /* SESSION_DEBUG */
session_stop_attached_threads (thread_p, session);
if (session->pl_session_p)
{
delete session->pl_session_p;
session->pl_session_p = NULL;
}
else
{
er_log_debug (ARG_FILE_LINE, "[unexpected] session %u's pl_session_p is NULL in session_state_uninit()\n",
session->id);
}
/* free session variables */
vcurent = session->session_variables;
while (vcurent != NULL)
{
vnext = vcurent->next;
free_session_variable (vcurent);
vcurent = vnext;
}
session->session_variables = NULL;
/* free session statements */
pcurent = session->statements;
while (pcurent != NULL)
{
pnext = pcurent->next;
session_free_prepared_statement (pcurent);
pcurent = pnext;
}
session->statements = NULL;
/* free holdable queries */
qcurent = session->queries;
while (qcurent)
{
qnext = qcurent->next;
qcurent->next = NULL;
session_free_sentry_data (thread_p, qcurent);
free_and_init (qcurent);
qcurent = qnext;
cnt++;
}
session->queries = NULL;
if (session->session_parameters)
{
sysprm_free_session_parameters (&session->session_parameters);
}
(void) pgbuf_release_private_lru (thread_p, session->private_lru_index);
session->private_lru_index = -1;
#if defined (SESSION_DEBUG)
er_log_debug (ARG_FILE_LINE, "session_free_session closed %d queries for %d\n", cnt, session->id);
#endif /* SESSION_DEBUG */
pr_clear_value (&session->cur_insert_id);
pr_clear_value (&session->last_insert_id);
if (session->trace_stats != NULL)
{
free_and_init (session->trace_stats);
}
if (session->plan_string != NULL)
{
free_and_init (session->plan_string);
}
return NO_ERROR;
}
/*
* session_key_copy () - copy a session key
* returns: error code or NO_ERROR
* src(in): source
* dest(in): destination
*/
static int
session_key_copy (void *src, void *dest)
{
SESSION_ID *src_id, *dest_id;
if (src == NULL || dest == NULL)
{
return ER_FAILED;
}
src_id = (SESSION_ID *) src;
dest_id = (SESSION_ID *) dest;
*dest_id = *src_id;
/* all ok */
return NO_ERROR;
}
/*
* session_key_hash () - hashing function for the session hash
* return: int
* key(in): Session key
* htsize(in): Memory Hash Table Size
*
* Note: Generate a hash number for the given key for the given hash table
* size.
*/
static unsigned int
session_key_hash (void *key, int hash_table_size)
{
SESSION_ID id = *((SESSION_ID *) key);
return (id % hash_table_size);
}
/*
* sessions_key_compare () - Compare two session keys
* return: int (true or false)
* key_left (in) : First session key
* key_right (in) : Second session key
*/
static int
session_key_compare (void *k1, void *k2)
{
SESSION_ID *key1, *key2;
key1 = (SESSION_ID *) k1;
key2 = (SESSION_ID *) k2;
if (k1 == NULL || k2 == NULL)
{
/* should not happen */
assert (false);
return 0;
}
if (*key1 == *key2)
{
/* equal */
return 0;
}
else
{
/* not equal */
return 1;
}
}
/*
* session_key_increment () - increment a key
* returns: error code or NO_ERROR
* key(in): key to increment
* existing(in): existing entry with same key (NOT USED)
*/
static int
session_key_increment (void *key, void *existing)
{
SESSION_ID *key_p = (SESSION_ID *) key;
if (key == NULL)
{
return ER_FAILED;
}
else
{
(*key_p)++;
return NO_ERROR;
}
}
/*
* session_free_prepared_statement () - free memory allocated for a prepared
* statement
* return : void
* stmt_p (in) : prepared statement object
*/
static void
session_free_prepared_statement (PREPARED_STATEMENT * stmt_p)
{
if (stmt_p == NULL)
{
return;
}
#if defined (SESSION_DEBUG)
er_log_debug (ARG_FILE_LINE, "drop statement %s\n", stmt_p->name);
#endif /* SESSION_DEBUG */
if (stmt_p->name != NULL)
{
free_and_init (stmt_p->name);
}
if (stmt_p->alias_print != NULL)
{
free_and_init (stmt_p->alias_print);
}
if (stmt_p->info != NULL)
{
free_and_init (stmt_p->info);
}
free_and_init (stmt_p);
}
// *INDENT-OFF*
#if defined (SERVER_MODE)
void
session_control_daemon_execute (cubthread::entry & thread_ref)
{
if (!BO_IS_SERVER_RESTARTED ())
{
// wait for boot to finish
return;
}
session_remove_expired_sessions (&thread_ref);
}
/*
* session_control_daemon_init () - initialize session control daemon
*/
REGISTER_DAEMON (session_control);
void
session_control_daemon_init ()
{
assert (session_Control_daemon == NULL);
cubthread::looper looper = cubthread::looper (std::chrono::seconds (60));
cubthread::entry_callable_task *daemon_task =
new cubthread::entry_callable_task (std::bind (session_control_daemon_execute, std::placeholders::_1));
// create session control daemon thread
session_Control_daemon = cubthread::get_manager ()->create_daemon (looper, daemon_task, "session-control");
}
/*
* session_control_daemon_destroy () - destroy session control daemon
*/
void
session_control_daemon_destroy ()
{
cubthread::get_manager ()->destroy_daemon (session_Control_daemon);
}
#endif /* SERVER_MODE */
// *INDENT-ON*
/*
* session_states_init () - Initialize session states area
*
* Note: Creates and initializes a main memory hash table that will be
* used by session states operations. This routine should only be
* called once during server boot.
*/
void
session_states_init (THREAD_ENTRY * thread_p)
{
sessions.last_session_id = 0;
sessions.num_holdable_cursors = 0;
#if defined (SESSION_DEBUG)
er_log_debug (ARG_FILE_LINE, "creating session states table\n");
#endif /* SESSION_DEBUG */
sessions.states_hashmap.init (sessions_Ts, THREAD_TS_SESSIONS, SESSIONS_HASH_SIZE, 2, 50, session_state_Descriptor);
#if defined (SERVER_MODE)
session_control_daemon_init ();
#endif /* SERVER_MODE */
}
/*
* session_states_finalize () - cleanup the session states information
* return: NO_ERROR or error code
* thread_p (in) : the thread executing this function
*
* Note: This function deletes the session states global storage area.
* This function should be called only during server shutdown
*/
void
session_states_finalize (THREAD_ENTRY * thread_p)
{
#if defined (SERVER_MODE)
session_control_daemon_destroy ();
#endif /* SERVER_MODE */
const char *env_value = envvar_get ("DUMP_SESSION");
if (env_value != NULL)
{
session_states_dump (thread_p);
}
#if defined (SESSION_DEBUG)
er_log_debug (ARG_FILE_LINE, "deleting session state table\n");
#endif /* SESSION_DEBUG */
/* destroy hash and freelist */
sessions.states_hashmap.destroy ();
}
/*
* session_state_create () - Create a sessions state with the specified id
* return: NO_ERROR or error code
* session_id (in) : the session id
*
* Note: This function creates and adds a sessions state object to the
* sessions state memory hash. This function should be called when a
* session starts.
*/
int
session_state_create (THREAD_ENTRY * thread_p, SESSION_ID * id)
{
SESSION_STATE *session_p = NULL;
SESSION_ID next_session_id;
assert (id != NULL);
#if defined (SERVER_MODE)
if (thread_p && thread_p->conn_entry && thread_p->conn_entry->session_p)
{
SESSION_ID old_id = thread_p->conn_entry->session_id;
/* session_check_session should clear session_p, right? add safe-guard if necessary. */
assert (thread_p->conn_entry->session_p->id == old_id);
session_p = sessions.states_hashmap.find (thread_p, old_id);
if (session_p == NULL)
{
thread_p->conn_entry->session_id = DB_EMPTY_SESSION;
thread_p->conn_entry->session_p = NULL;
}
else
{
assert (session_p == thread_p->conn_entry->session_p);
#if !defined(NDEBUG)
session_state_verify_ref_count (thread_p, session_p);
#endif
thread_p->conn_entry->session_id = DB_EMPTY_SESSION;
thread_p->conn_entry->session_p = NULL;
session_state_decrease_ref_count (thread_p, session_p);
logtb_set_current_user_active (thread_p, false);
pthread_mutex_unlock (&session_p->mutex);
}
}
#endif
/* create search key */
next_session_id = ATOMIC_INC_32 (&sessions.last_session_id, 1);
*id = next_session_id;
/* insert new entry into hash table */
(void) sessions.states_hashmap.insert (thread_p, *id, session_p);
if (session_p == NULL)
{
/* should not happen */
assert (false);
return ER_FAILED;
}
/* inserted key might have been incremented; if last_session_id was not modified in the meantime, store the new value
*/
ATOMIC_CAS_32 (&sessions.last_session_id, next_session_id, *id);
if (session_p->pl_session_p)
{
/* should not happen */
assert (false);
er_log_debug (ARG_FILE_LINE, "(assertion fail) PL session is not NULL for a newly created session\n");
return ER_FAILED;
}
session_p->pl_session_p = new PL_SESSION (session_p->id);
/* initialize session active time */
session_p->active_time = time (NULL);
#if defined (SERVER_MODE)
#if !defined (NDEBUG)
(void) session_state_verify_ref_count (thread_p, session_p);
#endif
/* increase reference count of new session_p */
session_state_increase_ref_count (thread_p, session_p);
session_p->private_lru_index = pgbuf_assign_private_lru (thread_p);
/* set as thread session */
session_set_conn_entry_data (thread_p, session_p);
logtb_set_current_user_active (thread_p, true);
#endif
/* done with the entry */
pthread_mutex_unlock (&session_p->mutex);
#if defined (SESSION_DEBUG)
/* debug logging */
er_log_debug (ARG_FILE_LINE, "adding session with id %u\n", *id);
if (prm_get_bool_value (PRM_ID_ER_LOG_DEBUG) == true)
{
session_hashmap_iterator it = { thread_p, sessions.states_hashmap };
SESSION_STATE *state;
er_log_debug (ARG_FILE_LINE, "printing active sessions\n");
for (state = it.iterate (); state != NULL; state = it.iterate ())
{
er_log_debug (ARG_FILE_LINE, "session %u", state->id);
}
er_log_debug (ARG_FILE_LINE, "finished printing active sessions\n");
}
#endif /* SESSION_DEBUG */
return NO_ERROR;
}
/*
* session_state_destroy () - close a session state
* return : NO_ERROR or error code
* id(in) : the identifier for the session
* is_keep_session(in) : whether to keep the session
*/
int
session_state_destroy (THREAD_ENTRY * thread_p, const SESSION_ID id, bool is_keep_session)
{
SESSION_STATE *session_p;
int error = NO_ERROR, success = 0;
#if defined (SESSION_DEBUG)
er_log_debug (ARG_FILE_LINE, "removing session %u", id);
#endif /* SESSION_DEBUG */
SESSION_ID key_id = id;
session_p = sessions.states_hashmap.find (thread_p, key_id);
if (session_p == NULL)
{
er_set (ER_WARNING_SEVERITY, ARG_FILE_LINE, ER_SES_SESSION_EXPIRED, 0);
return ER_SES_SESSION_EXPIRED;
}
if (is_keep_session == true)
{
session_p->is_keep_session = true;
pthread_mutex_unlock (&session_p->mutex);
return NO_ERROR;
}
#if defined (SERVER_MODE)
if (thread_p != NULL && thread_p->conn_entry != NULL && thread_p->conn_entry->session_p != NULL
&& thread_p->conn_entry->session_p == session_p)
{
thread_p->conn_entry->session_p = NULL;
thread_p->conn_entry->session_id = DB_EMPTY_SESSION;
if (session_p->ref_count > 0)
{
session_state_decrease_ref_count (thread_p, session_p);
}
}
else
{
/* do we accept this case?? if we don't, add safe-guard here. */
}
logtb_set_current_user_active (thread_p, false);
if (session_p->ref_count > 0)
{
/* This session_state is busy, I can't remove */
pthread_mutex_unlock (&session_p->mutex);
return NO_ERROR;
}
/* Now we can destroy this session */
assert (session_p->ref_count == 0);
#endif
/* Destroy the session related resources like session parameters */
(void) session_state_uninit (session_p);
// delete from hash
if (!sessions.states_hashmap.erase_locked (thread_p, key_id, session_p))
{
/* we don't have clear operations on this hash table, this shouldn't happen */
pthread_mutex_unlock (&session_p->mutex);
assert_release (false);
return ER_FAILED;
}
return error;
}
/*
* session_check_session () - check if the session state with id
* exists and update the timeout for it
* return : NO_ERROR or error code
* id(in) : the identifier for the session
*/
int
session_check_session (THREAD_ENTRY * thread_p, const SESSION_ID id)
{
SESSION_STATE *session_p = NULL;
int error = NO_ERROR;
#if defined (SESSION_DEBUG)
er_log_debug (ARG_FILE_LINE, "updating timeout for session_id %u\n", id);
#endif /* SESSION_DEBUG */
#if defined (SERVER_MODE)
if (thread_p && thread_p->conn_entry && thread_p->conn_entry->session_p)
{
SESSION_ID old_id = thread_p->conn_entry->session_id;
assert (thread_p->conn_entry->session_p->id == old_id);
session_p = sessions.states_hashmap.find (thread_p, old_id);
if (session_p == NULL)
{
/* the session in connection entry no longer exists... */
/* todo: add safe guard if we cannot accept this case */
thread_p->conn_entry->session_id = DB_EMPTY_SESSION;
thread_p->conn_entry->session_p = NULL;
return ER_FAILED;
}
assert (session_p == thread_p->conn_entry->session_p);
#if !defined(NDEBUG)
session_state_verify_ref_count (thread_p, session_p);
#endif
thread_p->conn_entry->session_id = DB_EMPTY_SESSION;
thread_p->conn_entry->session_p = NULL;
session_state_decrease_ref_count (thread_p, session_p);
logtb_set_current_user_active (thread_p, false);
pthread_mutex_unlock (&session_p->mutex);
}
#endif
SESSION_ID key_id = id;
session_p = sessions.states_hashmap.find (thread_p, key_id);
if (session_p == NULL)
{
er_set (ER_WARNING_SEVERITY, ARG_FILE_LINE, ER_SES_SESSION_EXPIRED, 0);
return ER_SES_SESSION_EXPIRED;
}
/* update session active time */
session_p->active_time = time (NULL);
#if defined (SERVER_MODE)
#if !defined (NDEBUG)
(void) session_state_verify_ref_count (thread_p, session_p);
#endif
/* increase reference count of new session_p */
session_state_increase_ref_count (thread_p, session_p);
session_set_conn_entry_data (thread_p, session_p);
logtb_set_current_user_active (thread_p, true);
#endif
/* done with the entry */
pthread_mutex_unlock (&session_p->mutex);
return error;
}
/*
* session_remove_expired_sessions () - remove expired sessions
* return : NO_ERROR or error code
*/
static int
session_remove_expired_sessions (THREAD_ENTRY * thread_p)
{
#define EXPIRED_SESSION_BUFFER_SIZE 1024
session_hashmap_iterator it = { thread_p, sessions.states_hashmap };
SESSION_STATE *state = NULL;
int err = NO_ERROR, success = 0;
bool is_expired = false;
SESSION_INFO active_sessions;
SESSION_ID expired_sid_buffer[EXPIRED_SESSION_BUFFER_SIZE];
int n_expired_sids = 0;
int sid_index;
bool finished = false;
active_sessions.count = -1;
active_sessions.session_ids = NULL;
/* Loop until all expired sessions are removed.
* NOTE: We cannot call lf_hash_delete while iterating... lf_hash_delete may have to retry, which also resets the
* lock-free transaction. And resetting lock-free transaction can break our iterator.
*/
while (!finished)
{
it.restart ();
while (true)
{
state = it.iterate ();
if (state == NULL)
{
finished = true;
break;
}
/* iterate next. the mutex lock of the current state will be released */
if (session_check_timeout (state, &active_sessions, &is_expired) != NO_ERROR)
{
pthread_mutex_unlock (&state->mutex);
sessions.states_hashmap.end_tran (thread_p);
err = ER_FAILED;
goto exit_on_end;
}
if (is_expired)
{
/* Now we can destroy this session */
assert (state->ref_count == 0);
if (state->is_keep_session == true)
{
/* keep session */
pthread_mutex_unlock (&state->mutex);
continue;
}
else
{
expired_sid_buffer[n_expired_sids++] = state->id;
/* Destroy the session related resources like session parameters */
(void) session_state_uninit (state);
}
if (n_expired_sids == EXPIRED_SESSION_BUFFER_SIZE)
{
/* No more room in buffer */
/* Interrupt iteration. */
/* Free current entry mutex. */
pthread_mutex_unlock (&state->mutex);
/* End lock-free transaction started by iterator. */
sessions.states_hashmap.end_tran (thread_p);
break;
}
}
}
/* Remove expired sessions. */
for (sid_index = 0; sid_index < n_expired_sids; sid_index++)
{
if (!sessions.states_hashmap.erase (thread_p, expired_sid_buffer[sid_index]))
{
/* we don't have clear operations on this hash table, this shouldn't happen */
assert_release (false);
err = ER_FAILED;
goto exit_on_end;
}
}
n_expired_sids = 0;
}
exit_on_end:
if (active_sessions.session_ids != NULL)
{
assert (active_sessions.count > 0);
free_and_init (active_sessions.session_ids);
}
return err;
#undef EXPIRED_SESSION_BUFFER_SIZE
}
/*
* session_check_timeout () - verify if a session timeout expired
* return : NO_ERROR or error code
* session_p(in) : session id
* active_sessions(in) : array of the active sessions info
* remove(out) : true if session timeout expired and it doesn't have an active connection, false otherwise
*/
static int
session_check_timeout (SESSION_STATE * session_p, SESSION_INFO * active_sessions, bool * remove)
{
int err = NO_ERROR;
time_t curr_time = time (NULL);
(*remove) = false;
if ((curr_time - session_p->active_time) >= prm_get_integer_value (PRM_ID_SESSION_STATE_TIMEOUT))
{
#if defined (SERVER_MODE)
int i;
/* first see if we still have an active connection */
if (active_sessions->count == -1)
{
/* we need to get the active connection list */
err = css_get_session_ids_for_active_connections (&active_sessions->session_ids, &active_sessions->count);
if (err != NO_ERROR)
{
return err;
}
}
for (i = 0; i < active_sessions->count; i++)
{
if (active_sessions->session_ids[i] == session_p->id)
{
/* also update session active time */
session_p->active_time = time (NULL);
return err;
}
}
#endif
/* remove this session: timeout expired and it doesn't have an active connection. */
#if defined (SESSION_DEBUG)
er_log_debug (ARG_FILE_LINE, "timeout expired for session %u\n", session_p->id);
#endif /* SESSION_DEBUG */
(*remove) = true;
}
else
{
#if defined (SESSION_DEBUG)
er_log_debug (ARG_FILE_LINE, "timeout ok for session %u\n", session_p->id);
#endif /* SESSION_DEBUG */
}
return err;
}
/*
* free_session_variable () - free memory allocated for a session variable
* return : void
* var (in) : session variable
*/
static void
free_session_variable (SESSION_VARIABLE * var)
{
if (var == NULL)
{
return;
}
if (var->name != NULL)
{
free_and_init (var->name);
}
if (var->value != NULL)
{
if (QSTR_IS_ANY_CHAR_OR_BIT (DB_VALUE_DOMAIN_TYPE (var->value)))
{
/* free allocated string */
free_and_init (var->value->data.ch.medium.buf);
}
free_and_init (var->value);
}
free_and_init (var);
}
/*
* session_add_variable () - add a session variable to the list
* return: error code
* state_p (in) : session state object
* name (in) : name of the variable
* value (in) : variable value
*/
static int
session_add_variable (SESSION_STATE * state_p, const DB_VALUE * name, DB_VALUE * value)
{
SESSION_VARIABLE *var = NULL;
SESSION_VARIABLE *current = NULL;
int count = 0;
size_t len;
const char *name_str;
assert (DB_VALUE_DOMAIN_TYPE (name) == DB_TYPE_CHAR);
name_str = db_get_string (name);
assert (name_str != NULL);
len = db_get_string_size (name);
len = MAX (len, strlen ("collect_exec_stats"));
if (strncasecmp (name_str, "collect_exec_stats", len) == 0)
{
if (db_get_int (value) == 1)
{
perfmon_start_watch (NULL);
}
else if (db_get_int (value) == 0)
{
perfmon_stop_watch (NULL);
}
}
else if (strncasecmp (name_str, "trace_plan", 10) == 0)
{
if (state_p->plan_string != NULL)
{
free_and_init (state_p->plan_string);
}
state_p->plan_string = strdup (db_get_string (value));
}
current = state_p->session_variables;
while (current)
{
assert (current->name != NULL);
if (intl_identifier_casecmp (name_str, current->name) == 0)
{
/* if it already exists, just update the value */
update_session_variable (current, value);
return NO_ERROR;
}
current = current->next;
count++;
}
if (count >= MAX_SESSION_VARIABLES_COUNT)
{
er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_SES_TOO_MANY_VARIABLES, 0);
return ER_FAILED;
}
/* create a new session variable and add it to the list */
var = (SESSION_VARIABLE *) malloc (sizeof (SESSION_VARIABLE));
if (var == NULL)
{
er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_OUT_OF_VIRTUAL_MEMORY, 1, sizeof (SESSION_VARIABLE));
return ER_FAILED;
}
len = db_get_string_size (name);
var->name = (char *) malloc (len + 1);
if (var->name == NULL)
{
er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_OUT_OF_VIRTUAL_MEMORY, 1, len + 1);
goto error;
}
memcpy (var->name, name_str, len);
var->name[len] = 0;
var->value = db_value_alloc_and_copy (value);
/* add new variable to the beginning of the list */
var->next = state_p->session_variables;
state_p->session_variables = var;
return NO_ERROR;
error:
if (var != NULL)
{
if (var->name)
{
free_and_init (var->name);
}
pr_clear_value (var->value);
free_and_init (var);
}
return ER_FAILED;
}
/*
* db_value_alloc_and_copy () - create a DB_VALUE on the heap
* return : DB_VALUE or NULL
* src (in) : value to copy
*/
static DB_VALUE *
db_value_alloc_and_copy (const DB_VALUE * src)
{
DB_TYPE src_dbtype;
TP_DOMAIN *domain = NULL;
DB_VALUE *dest = NULL;
DB_VALUE conv;
int length = 0, precision = 0, scale = 0;
char *str = NULL;
const char *src_str;
dest = (DB_VALUE *) malloc (sizeof (DB_VALUE));
if (dest == NULL)
{
er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_OUT_OF_VIRTUAL_MEMORY, 1, sizeof (DB_VALUE));
return NULL;
}
src_dbtype = DB_VALUE_DOMAIN_TYPE (src);
if (DB_IS_NULL (src))
{
db_make_null (dest);
return dest;
}
if (TP_IS_NUMERIC_TYPE (src_dbtype))
{
pr_clone_value ((DB_VALUE *) src, dest);
return dest;
}
if (!QSTR_IS_ANY_CHAR_OR_BIT (src_dbtype))
{
/* attempt to convert to varchar */
db_make_null (&conv);
domain = db_type_to_db_domain (DB_TYPE_VARCHAR);
domain->precision = TP_FLOATING_PRECISION_VALUE;
if (tp_value_cast (src, &conv, domain, false) != DOMAIN_COMPATIBLE)
{
db_make_null (dest);
return dest;
}
src_dbtype = DB_TYPE_VARCHAR;
free_and_init (dest);
dest = db_value_alloc_and_copy (&conv);
pr_clear_value (&conv);
return dest;
}
length = db_get_string_size (src);
scale = 0;
str = (char *) malloc (length + 1);
if (str == NULL)
{
er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_OUT_OF_VIRTUAL_MEMORY, 1, (size_t) (length + 1));
return NULL;
}
src_str = db_get_string (src);
if (src_str != NULL)
{
memcpy (str, src_str, length);
}
precision = db_value_precision (src);
db_value_domain_init (dest, src_dbtype, precision, scale);
dest->need_clear = true;
switch (src_dbtype)
{
case DB_TYPE_CHAR:
db_make_char (dest, precision, str, length, db_get_string_codeset (src), db_get_string_collation (src));
break;
case DB_TYPE_VARCHAR:
db_make_varchar (dest, precision, str, length, db_get_string_codeset (src), db_get_string_collation (src));
break;
case DB_TYPE_BIT:
db_make_bit (dest, precision, str, length);
break;
case DB_TYPE_VARBIT:
db_make_varbit (dest, precision, str, length);
break;
default:
assert (false);
return NULL;
}
return dest;
}
/*
* update_session_variable () - update the value of a session variable
* return : void
* var (in/out) : the variable to update
* new_value (in) : the new value
*/
static void
update_session_variable (SESSION_VARIABLE * var, const DB_VALUE * new_value)
{
if (var->value != NULL)
{
if (QSTR_IS_ANY_CHAR_OR_BIT (DB_VALUE_DOMAIN_TYPE (var->value)))
{
/* free allocated string */
free_and_init (var->value->data.ch.medium.buf);
}
free_and_init (var->value);
}
var->value = db_value_alloc_and_copy (new_value);
}
/*
* session_drop_variable () - drop a session variable from the list
* return: error code
* state_p (in) : session state object
* name (in) : name of the variable
*/
static int
session_drop_variable (SESSION_STATE * state_p, const DB_VALUE * name)
{
SESSION_VARIABLE *current = NULL, *prev = NULL;
const char *name_str;
if (state_p->session_variables == NULL)
{
return NO_ERROR;
}
assert (DB_VALUE_DOMAIN_TYPE (name) == DB_TYPE_CHAR);
name_str = db_get_string (name);
assert (name_str != NULL);
current = state_p->session_variables;
while (current)
{
assert (current->name != NULL);
if (intl_identifier_casecmp (name_str, current->name) == 0)
{
SESSION_VARIABLE *next = current->next;
free_session_variable (current);
if (prev == NULL)
{
state_p->session_variables = next;
}
else
{
prev->next = next;
}
return NO_ERROR;
}
prev = current;
current = current->next;
}
return NO_ERROR;
}
/*
* session_get_session_id () - get the session id associated with a thread
* return : NO_ERROR or error code
* thread_p (in) : thread for which to get the session id
* session_id(out): session_id
*/
int
session_get_session_id (THREAD_ENTRY * thread_p, SESSION_ID * id)
{
assert (id != NULL);
#if !defined(SERVER_MODE)
*id = db_Session_id;
return NO_ERROR;
#else
if (thread_p == NULL)
{
return ER_FAILED;
}
if (thread_p->conn_entry == NULL)
{
return ER_FAILED;
}
*id = thread_p->conn_entry->session_id;
return NO_ERROR;
#endif /* SERVER_MODE */
}
/*
* session_get_last_insert_id () - get the value of the last inserted id
* in the session associated with a thread
* return : NO_ERROR or error code
* thread_p (in) : thread that identifies the session
* value (out) : pointer into which to store the last insert id value
* update_last_insert_id(in): whether update the last insert id
*/
int
session_get_last_insert_id (THREAD_ENTRY * thread_p, DB_VALUE * value, bool update_last_insert_id)
{
SESSION_STATE *state_p = NULL;
assert (value != NULL);
state_p = session_get_session_state (thread_p);
if (state_p == NULL)
{
return ER_FAILED;
}
if (update_last_insert_id && !state_p->is_trigger_involved && !DB_IS_NULL (&state_p->cur_insert_id))
{
pr_clone_value (&state_p->cur_insert_id, &state_p->last_insert_id);
pr_clear_value (&state_p->cur_insert_id);
}
pr_clone_value (&state_p->last_insert_id, value);
return NO_ERROR;
}
/*
* session_set_cur_insert_id () - set the value of the current inserted id
* in the session associated with a thread
* return : NO_ERROR or error code
* thread_p (in) : thread that identifies the session
* value (in) : the value of the last inserted id
* force (in) : update the value unconditionally
*
* Note: Even though we allow other data types for serial columns, the session
* keeps the value of the last insert id as a DB_TYPE_NUMERIC. This function
* performs a coercion here if needed.
*/
int
session_set_cur_insert_id (THREAD_ENTRY * thread_p, const DB_VALUE * value, bool force)
{
SESSION_STATE *state_p = NULL;
bool need_coercion = false;
if (DB_VALUE_TYPE (value) != DB_TYPE_NUMERIC)
{
need_coercion = true;
}
else if (DB_VALUE_PRECISION (value) != DB_MAX_NUMERIC_PRECISION || DB_VALUE_SCALE (value) != 0)
{
need_coercion = true;
}
state_p = session_get_session_state (thread_p);
if (state_p == NULL)
{
return ER_FAILED;
}
if ((force == false && state_p->is_last_insert_id_generated == true) || state_p->is_trigger_involved == true)
{
return NO_ERROR;
}
if (!DB_IS_NULL (&state_p->cur_insert_id))
{
pr_clone_value (&state_p->cur_insert_id, &state_p->last_insert_id);
pr_clear_value (&state_p->cur_insert_id);
}
if (!need_coercion)
{
pr_clone_value ((DB_VALUE *) value, &state_p->cur_insert_id);
}
else
{
TP_DOMAIN *num = tp_domain_resolve_default (DB_TYPE_NUMERIC);
num->precision = DB_MAX_NUMERIC_PRECISION;
num->scale = 0;
if (tp_value_cast (value, &state_p->cur_insert_id, num, false) != DOMAIN_COMPATIBLE)
{
pr_clear_value (&state_p->cur_insert_id);
return ER_FAILED;
}
}
state_p->is_last_insert_id_generated = true;
return NO_ERROR;
}
/*
* session_reset_cur_insert_id () - reset the current insert_id as NULL
* when the insert fail.
* return : NO_ERROR or error code
* thread_p (in) : thread that identifies the session
*/
int
session_reset_cur_insert_id (THREAD_ENTRY * thread_p)
{
SESSION_STATE *state_p = NULL;
state_p = session_get_session_state (thread_p);
if (state_p == NULL)
{
return ER_FAILED;
}
if (state_p->is_trigger_involved)
{
return NO_ERROR;
}
if (state_p->is_last_insert_id_generated == false)
{
return NO_ERROR;
}
pr_clear_value (&state_p->cur_insert_id);
state_p->is_last_insert_id_generated = false;
return NO_ERROR;
}
/*
* session_begin_insert_values () - set is_last_insert_id_generated to false
* in the session associated with a thread
* return : NO_ERROR or error code
* thread_p (in) : thread that identifies the session
*/
int
session_begin_insert_values (THREAD_ENTRY * thread_p)
{
SESSION_STATE *state_p = NULL;
state_p = session_get_session_state (thread_p);
if (state_p == NULL)
{
return ER_FAILED;
}
if (state_p->is_trigger_involved)
{
return NO_ERROR;
}
state_p->is_last_insert_id_generated = false;
return NO_ERROR;
}
/*
* session_set_trigger_state () - set is_trigger_involved
*
* return : NO_ERROR or error code
* thread_p (in) : thread that identifies the session
* in_trigger(in):
*/
int
session_set_trigger_state (THREAD_ENTRY * thread_p, bool in_trigger)
{
SESSION_STATE *state_p = NULL;
state_p = session_get_session_state (thread_p);
if (state_p == NULL)
{
return ER_FAILED;
}
state_p->is_trigger_involved = in_trigger;
return NO_ERROR;
}
/*
* session_get_row_count () - get the affected row count from the session
* associated with a thread
* return : NO_ERROR or error code
* thread_p(in) : thread that identifies the session
* row_count(out) : pointer into which to store the count
*
* Note: Row count refers to the number of rows affected by the last INSERT,
* UPDATE or DELETE statement
*/
int
session_get_row_count (THREAD_ENTRY * thread_p, int *row_count)
{
SESSION_STATE *state_p = NULL;
assert (row_count != NULL);
state_p = session_get_session_state (thread_p);
if (state_p == NULL)
{
return ER_FAILED;
}
*row_count = state_p->row_count;
return NO_ERROR;
}
/*
* session_set_row_count () - set the count of affected rows for a session
*
* return : NO_ERROR or error code
* thread_p(in) : thread that identifies the session
* row_count(in) : row count
*/
int
session_set_row_count (THREAD_ENTRY * thread_p, const int row_count)
{
SESSION_STATE *state_p = NULL;
state_p = session_get_session_state (thread_p);
if (state_p == NULL)
{
return ER_FAILED;
}
#if 0
er_log_debug (ARG_FILE_LINE, "setting row_count for session %u to %d\n", state_p->id, state_p->row_count);
#endif
state_p->row_count = row_count;
return NO_ERROR;
}
/*
* session_set_is_keep_session () - set the is_keep_session flag for a session
* return : NO_ERROR or error code
* thread_p (in) : thread that identifies the session
* is_keep_session (in) : whether to keep the session
*/
int
session_set_is_keep_session (THREAD_ENTRY * thread_p, bool is_keep_session)
{
SESSION_STATE *state_p = NULL;
state_p = session_get_session_state (thread_p);
if (state_p == NULL)
{
return ER_FAILED;
}
state_p->is_keep_session = is_keep_session;
return NO_ERROR;
}
/*
* session_get_session_params () - get the list of session parameters stored
* in session_state
*
* return : NO_ERROR or error code
* thread_p(in) : thread that identifies the session
* session_parameters_ptr(out) : pointer to session parameter list
*/
int
session_get_session_parameters (THREAD_ENTRY * thread_p, SESSION_PARAM ** session_parameters_ptr)
{
SESSION_STATE *state_p = NULL;
assert (session_parameters_ptr != NULL);
if (*session_parameters_ptr)
{
free_and_init (*session_parameters_ptr);
}
state_p = session_get_session_state (thread_p);
if (state_p == NULL)
{
return ER_FAILED;
}
*session_parameters_ptr = state_p->session_parameters;
return NO_ERROR;
}
/*
* session_set_session_parameters () - set session parameters to session state
*
* return : error code
* thread_p (in) : worker thread
* session_parameters (in) : array of session parameters
*/
int
session_set_session_parameters (THREAD_ENTRY * thread_p, SESSION_PARAM * session_parameters)
{
SESSION_STATE *state_p = NULL;
assert (session_parameters != NULL);
state_p = session_get_session_state (thread_p);
if (state_p == NULL)
{
return ER_FAILED;
}
state_p->session_parameters = session_parameters;
return NO_ERROR;
}
/*
* session_create_prepared_statement () - create a prepared statement and add
* it to the prepared statements list
* return : NO_ERROR or error code
* thread_p (in) : thread entry
* name (in) : the name of the statement
* alias_print (in) : the printed compiled statement
* sha1 (in) : sha1 hash for printed compiled statement
* info (in) : serialized prepared statement info
* info_len (in) : serialized buffer length
*
* Note: This function assumes that the memory for its arguments was
* dynamically allocated and does not copy the values received. It's important
* that the caller never frees this memory. If an error occurs, this function
* will free the memory allocated for its arguments.
*/
int
session_create_prepared_statement (THREAD_ENTRY * thread_p, char *name, char *alias_print, SHA1Hash * sha1, char *info,
int info_len)
{
SESSION_STATE *state_p = NULL;
PREPARED_STATEMENT *stmt_p = NULL;
int err = NO_ERROR;
stmt_p = (PREPARED_STATEMENT *) malloc (sizeof (PREPARED_STATEMENT));
if (stmt_p == NULL)
{
er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_OUT_OF_VIRTUAL_MEMORY, 1, sizeof (PREPARED_STATEMENT));
err = ER_FAILED;
goto error;
}
stmt_p->name = name;
stmt_p->alias_print = alias_print;
stmt_p->sha1 = *sha1;
stmt_p->info_length = info_len;
stmt_p->info = info;
stmt_p->next = NULL;
state_p = session_get_session_state (thread_p);
if (state_p == NULL)
{
err = ER_FAILED;
goto error;
}
#if defined (SESSION_DEBUG)
er_log_debug (ARG_FILE_LINE, "create statement %s(%d)\n", name, state_p->id);
#endif /* SESSION_DEBUG */
if (state_p->statements == NULL)
{
state_p->statements = stmt_p;
}
else
{
/* find and remove prepared statements with the same name */
int cnt = 0;
PREPARED_STATEMENT *current = NULL, *prev = NULL;
current = state_p->statements;
while (current != NULL)
{
if (intl_identifier_casecmp (current->name, name) == 0)
{
/* we need to remove it */
if (prev == NULL)
{
state_p->statements = current->next;
}
else
{
prev->next = current->next;
}
current->next = NULL;
session_free_prepared_statement (current);
break;
}
cnt++;
prev = current;
current = current->next;
}
if (cnt >= MAX_PREPARED_STATEMENTS_COUNT)
{
er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_SES_TOO_MANY_STATEMENTS, 0);
err = ER_FAILED;
goto error;
}
else if (state_p->statements == NULL)
{
/* add the new statement at the beginning of the list */
state_p->statements = stmt_p;
}
else
{
stmt_p->next = state_p->statements;
state_p->statements = stmt_p;
}
}
#if defined (SESSION_DEBUG)
er_log_debug (ARG_FILE_LINE, "success %s(%d)\n", name, state_p->id);
#endif /* SESSION_DEBUG */
return NO_ERROR;
error:
if (stmt_p != NULL)
{
free_and_init (stmt_p);
}
return err;
}
/*
* session_get_prepared_statement () - get available information about a prepared statement
* return: NO_ERROR or error code
* thread_p (in) :
* name (in) : the name of the prepared statement
* info (out) : serialized prepared statement information
* info_len (out) : serialized buffer length
* xasl_id (out) : XASL ID for this statement
*
* Note: This function allocates memory for query, columns and parameters using db_private_alloc. This memory must be
* freed by the caller by using db_private_free.
*/
int
session_get_prepared_statement (THREAD_ENTRY * thread_p, const char *name, char **info, int *info_len,
xasl_cache_ent ** xasl_entry)
{
SESSION_STATE *state_p = NULL;
PREPARED_STATEMENT *stmt_p = NULL;
int err = NO_ERROR;
const char *alias_print;
char *data = NULL;
assert (xasl_entry != NULL);
state_p = session_get_session_state (thread_p);
if (state_p == NULL)
{
return ER_FAILED;
}
for (stmt_p = state_p->statements; stmt_p != NULL; stmt_p = stmt_p->next)
{
if (intl_identifier_casecmp (stmt_p->name, name) == 0)
{
break;
}
}
if (stmt_p == NULL)
{
/* prepared statement not found */
er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_IT_PREPARED_NAME_NOT_FOUND, 1, name);
return ER_FAILED;
}
/* alias_print */
alias_print = stmt_p->alias_print;
*info_len = stmt_p->info_length;
if (stmt_p->info_length == 0)
{
*info = NULL;
}
else
{
data = (char *) malloc (stmt_p->info_length);
if (data == NULL)
{
er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_OUT_OF_VIRTUAL_MEMORY, 1, (size_t) stmt_p->info_length);
return ER_FAILED;
}
memcpy (data, stmt_p->info, stmt_p->info_length);
*info = data;
}
/* since the xasl id is not session specific, we can fetch it outside of the session critical section */
if (alias_print == NULL)
{
/* if we don't have an alias print, we do not search for the XASL entry */
*xasl_entry = NULL;
#if defined (SESSION_DEBUG)
er_log_debug (ARG_FILE_LINE, "found null xasl_id for %s(%d)\n", name, state_p->id);
#endif /* SESSION_DEBUG */
return NO_ERROR;
}
*xasl_entry = NULL;
err = xcache_find_sha1 (thread_p, &stmt_p->sha1, XASL_CACHE_SEARCH_GENERIC, xasl_entry, NULL);
if (err != NO_ERROR)
{
ASSERT_ERROR ();
return err;
}
return NO_ERROR;
}
/*
* session_delete_prepared_statement () - delete a prepared statement
* return : error code or NO_ERROR
* thread_p (in) :
* name (in) : name of the prepared statement
*/
int
session_delete_prepared_statement (THREAD_ENTRY * thread_p, const char *name)
{
SESSION_STATE *state_p = NULL;
PREPARED_STATEMENT *stmt_p = NULL, *prev = NULL;
bool found = false;
state_p = session_get_session_state (thread_p);
if (state_p == NULL)
{
return ER_FAILED;
}
#if defined (SESSION_DEBUG)
er_log_debug (ARG_FILE_LINE, "dropping %s from session_id %d\n", name, state_p->id);
#endif /* SESSION_DEBUG */
for (stmt_p = state_p->statements, prev = NULL; stmt_p != NULL; prev = stmt_p, stmt_p = stmt_p->next)
{
if (intl_identifier_casecmp (stmt_p->name, name) == 0)
{
if (prev == NULL)
{
state_p->statements = stmt_p->next;
}
else
{
prev->next = stmt_p->next;
}
stmt_p->next = NULL;
session_free_prepared_statement (stmt_p);
found = true;
break;
}
}
if (!found)
{
/* prepared statement not found */
er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_IT_PREPARED_NAME_NOT_FOUND, 1, name);
return ER_FAILED;
}
return NO_ERROR;
}
/*
* login_user () - login user
* return : error code
* thread_p : worker thread
* username(in) : name of the user
*/
int
login_user (THREAD_ENTRY * thread_p, const char *username)
{
int tran_index = LOG_FIND_THREAD_TRAN_INDEX (thread_p);
LOG_TDES *tdes = NULL;
tdes = LOG_FIND_TDES (tran_index);
if (tdes != NULL)
{
tdes->client.set_user (username);
}
return NO_ERROR;
}
/*
* session_set_session_variables () - set session variables
* return : error code
* thread_p (in) : worker thread
* values (in) : array of variables to set
* count (in) : number of elements in array
*/
int
session_set_session_variables (THREAD_ENTRY * thread_p, DB_VALUE * values, const int count)
{
SESSION_STATE *state_p = NULL;
int i = 0;
assert (count % 2 == 0);
state_p = session_get_session_state (thread_p);
if (state_p == NULL)
{
return ER_FAILED;
}
for (i = 0; i < count; i += 2)
{
if (session_add_variable (state_p, &values[i], &values[i + 1]) != NO_ERROR)
{
return ER_FAILED;
}
}
return NO_ERROR;
}
/*
* session_define_variable () - define a session variable
* return : int
* thread_p (in) : worker thread
* name (in) : name of the variable
* value (in) : variable value
*/
int
session_define_variable (THREAD_ENTRY * thread_p, DB_VALUE * name, DB_VALUE * value, DB_VALUE * result)
{
SESSION_STATE *state_p = NULL;
int err = NO_ERROR;
assert (DB_VALUE_DOMAIN_TYPE (name) == DB_TYPE_CHAR);
state_p = session_get_session_state (thread_p);
if (state_p == NULL)
{
return ER_FAILED;
}
err = session_add_variable (state_p, name, value);
if (err == NO_ERROR)
{
pr_clone_value (value, result);
}
else
{
db_make_null (result);
}
return err;
}
/*
* session_get_variable () - get the value of a variable
* return : error code
* thread_p (in) : worker thread
* name (in) : name of the variable
* result (out) : variable value
*/
int
session_get_variable (THREAD_ENTRY * thread_p, const DB_VALUE * name, DB_VALUE * result)
{
SESSION_STATE *state_p = NULL;
const char *name_str;
SESSION_VARIABLE *var;
assert (DB_VALUE_DOMAIN_TYPE (name) == DB_TYPE_CHAR);
name_str = db_get_string (name);
assert (name_str != NULL);
state_p = session_get_session_state (thread_p);
if (state_p == NULL)
{
return ER_FAILED;
}
var = state_p->session_variables;
while (var != NULL)
{
assert (var->name != NULL);
if (intl_identifier_casecmp (var->name, name_str) == 0)
{
pr_clone_value (var->value, result);
break;
}
var = var->next;
}
if (var == NULL)
{
/* we didn't find it, set error and exit */
char *var_name = NULL;
size_t name_len = strlen (name_str);
var_name = (char *) malloc (name_len + 1);
if (var_name != NULL)
{
memcpy (var_name, name_str, name_len);
var_name[name_len] = 0;
}
er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_SES_VARIABLE_NOT_FOUND, 1, var_name);
if (var_name != NULL)
{
free_and_init (var_name);
}
return ER_FAILED;
}
return NO_ERROR;
}
/*
* session_get_session_variable_no_copy () - get the value of a session
* variable
* return : int
* thread_p (in) : worker thread
* name (in) : name of the variable
* value (in/out): variable value
* Note: This function gets a reference to a session variable from the session
* state object. Because it gets the actual pointer, it is not thread safe
* and it should only be called in the stand alone mode
*/
int
session_get_variable_no_copy (THREAD_ENTRY * thread_p, const DB_VALUE * name, DB_VALUE ** result)
{
SESSION_ID id;
SESSION_STATE *state_p = NULL;
size_t name_len;
const char *name_str;
SESSION_VARIABLE *var;
#if defined (SERVER_MODE)
/* do not call this function in a multi-threaded context */
assert (false);
return ER_FAILED;
#endif
assert (name != NULL);
assert (DB_VALUE_DOMAIN_TYPE (name) == DB_TYPE_CHAR);
assert (result != NULL);
name_str = db_get_string (name);
name_len = (name_str != NULL) ? strlen (name_str) : 0;
if (session_get_session_id (thread_p, &id) != NO_ERROR)
{
return ER_FAILED;
}
state_p = sessions.states_hashmap.find (thread_p, id);
if (state_p == NULL)
{
er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_SES_SESSION_EXPIRED, 0);
return ER_FAILED;
}
var = state_p->session_variables;
while (var != NULL)
{
assert (var->name != NULL);
if (name_len != strlen (var->name))
{
var = var->next;
continue;
}
if (strncasecmp (var->name, name_str, name_len) == 0)
{
*result = var->value;
break;
}
var = var->next;
}
if (var == NULL)
{
/* we didn't find it, set error and exit */
char *var_name = NULL;
var_name = (char *) malloc (name_len + 1);
if (var_name != NULL)
{
memcpy (var_name, name_str, name_len);
var_name[name_len] = 0;
}
er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_SES_VARIABLE_NOT_FOUND, 1, var_name);
if (var_name != NULL)
{
free_and_init (var_name);
}
pthread_mutex_unlock (&state_p->mutex);
return ER_FAILED;
}
/* done with the entry */
pthread_mutex_unlock (&state_p->mutex);
return NO_ERROR;
}
/*
* session_drop_session_variables () - drop session variables
* return : error code
* thread_p (in) : worker thread
* values (in) : array of variables to drop
* count (in) : number of elements in array
*/
int
session_drop_session_variables (THREAD_ENTRY * thread_p, DB_VALUE * values, const int count)
{
SESSION_STATE *state_p = NULL;
int i = 0;
state_p = session_get_session_state (thread_p);
if (state_p == NULL)
{
return ER_FAILED;
}
for (i = 0; i < count; i++)
{
if (session_drop_variable (state_p, &values[i]) != NO_ERROR)
{
return ER_FAILED;
}
}
return NO_ERROR;
}
/*
* session_get_exec_stats_and_clear () - get execution statistics
* return : error code
* thread_p (in) : worker thread
* name (in) : name of the stats
* result (out) : stats value
*/
int
session_get_exec_stats_and_clear (THREAD_ENTRY * thread_p, const DB_VALUE * name, DB_VALUE * result)
{
const char *name_str;
UINT64 stat_val;
assert (DB_VALUE_DOMAIN_TYPE (name) == DB_TYPE_CHAR);
name_str = db_get_string (name);
stat_val = perfmon_get_stats_and_clear (thread_p, name_str);
db_make_bigint (result, stat_val);
return NO_ERROR;
}
/*
* session_states_dump () - dump the session states information
* return: void
* thread_p (in) : the thread executing this function
*/
void
session_states_dump (THREAD_ENTRY * thread_p)
{
session_hashmap_iterator it = { thread_p, sessions.states_hashmap };
SESSION_STATE *state;
size_t session_count = sessions.states_hashmap.get_element_count ();
fprintf (stdout, "\nSESSION COUNT = %zu\n", session_count);
for (state = it.iterate (); state != NULL; state = it.iterate ())
{
session_dump_session (state);
}
fflush (stdout);
}
/*
* session_dump_session () - dump a session state
* return : NO_ERROR
* key(in) : the key from the MHT_TABLE for this session
* data(in): session state data
* args(in): not used
*/
static int
session_dump_session (SESSION_STATE * session)
{
SESSION_VARIABLE *vcurent, *vnext;
PREPARED_STATEMENT *pcurent, *pnext;
DB_VALUE v;
fprintf (stdout, "SESSION ID = %d\n", session->id);
db_value_coerce (&session->last_insert_id, &v, db_type_to_db_domain (DB_TYPE_VARCHAR));
fprintf (stdout, "\tLAST_INSERT_ID = %s\n", db_get_string (&v));
db_value_clear (&v);
fprintf (stdout, "\tROW_COUNT = %d\n", session->row_count);
fprintf (stdout, "\tAUTO_COMMIT = %d\n", session->auto_commit);
fprintf (stdout, "\tSESSION VARIABLES\n");
vcurent = session->session_variables;
while (vcurent != NULL)
{
vnext = vcurent->next;
session_dump_variable (vcurent);
vcurent = vnext;
}
fprintf (stdout, "\tPREPRARE STATEMENTS\n");
pcurent = session->statements;
while (pcurent != NULL)
{
pnext = pcurent->next;
session_dump_prepared_statement (pcurent);
pcurent = pnext;
}
fprintf (stdout, "\n");
return NO_ERROR;
}
/*
* session_dump_variable () - dump a session variable
* return : void
* var (in) : session variable
*/
static void
session_dump_variable (SESSION_VARIABLE * var)
{
DB_VALUE v;
if (var == NULL)
{
return;
}
if (var->name != NULL)
{
fprintf (stdout, "\t\t%s = ", var->name);
}
if (var->value != NULL)
{
db_value_coerce (var->value, &v, db_type_to_db_domain (DB_TYPE_VARCHAR));
fprintf (stdout, "%s\n", db_get_string (&v));
db_value_clear (&v);
}
}
/*
* session_dump_prepared_statement () - dump a prepared statement
* return : void
* stmt_p (in) : prepared statement object
*/
static void
session_dump_prepared_statement (PREPARED_STATEMENT * stmt_p)
{
if (stmt_p == NULL)
{
return;
}
if (stmt_p->name != NULL)
{
fprintf (stdout, "\t\t%s = ", stmt_p->name);
}
if (stmt_p->alias_print != NULL)
{
fprintf (stdout, "%s\n", stmt_p->alias_print);
fprintf (stdout, "sha1 = %08x | %08x | %08x | %08x | %08x\n", SHA1_AS_ARGS (&stmt_p->sha1));
}
}
/*
* qentry_to_sentry () - create a session query entry from a query manager
* entry
* return : session query entry or NULL
* qentry_p (in) : query manager query entry
*/
static SESSION_QUERY_ENTRY *
qentry_to_sentry (QMGR_QUERY_ENTRY * qentry_p)
{
SESSION_QUERY_ENTRY *sqentry_p = NULL;
assert (qentry_p != NULL);
sqentry_p = (SESSION_QUERY_ENTRY *) malloc (sizeof (SESSION_QUERY_ENTRY));
if (sqentry_p == NULL)
{
er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_OUT_OF_VIRTUAL_MEMORY, 1, sizeof (SESSION_QUERY_ENTRY));
return NULL;
}
sqentry_p->query_id = qentry_p->query_id;
sqentry_p->list_id = qentry_p->list_id;
sqentry_p->temp_file = qentry_p->temp_vfid;
qentry_p->list_id = NULL;
qentry_p->temp_vfid = NULL;
sqentry_p->num_tmp = qentry_p->num_tmp;
sqentry_p->total_count = qentry_p->total_count;
sqentry_p->query_flag = qentry_p->query_flag;
sqentry_p->next = NULL;
return sqentry_p;
}
/*
* session_preserve_temporary_files () - remove list files used by qentry_p
* from the file manager so that it
* doesn't delete them at transaction
* end
* return : error code or NO_ERROR
* thread_p (in) :
* qentry_p (in) : query entry
*/
static int
session_preserve_temporary_files (THREAD_ENTRY * thread_p, SESSION_QUERY_ENTRY * qentry_p)
{
QMGR_TEMP_FILE *tfile_vfid_p = NULL, *temp = NULL;
if (qentry_p == NULL)
{
assert (false);
return NO_ERROR;
}
if (qentry_p->list_id == NULL)
{
return NO_ERROR;
}
if (qentry_p->temp_file)
{
tfile_vfid_p = qentry_p->temp_file;
tfile_vfid_p->prev->next = NULL;
while (tfile_vfid_p)
{
if (!VFID_ISNULL (&tfile_vfid_p->temp_vfid))
{
if (!tfile_vfid_p->preserved)
{
file_temp_preserve (thread_p, &tfile_vfid_p->temp_vfid);
tfile_vfid_p->preserved = true;
}
}
temp = tfile_vfid_p;
tfile_vfid_p = tfile_vfid_p->next;
}
}
return NO_ERROR;
}
/*
* sentry_to_qentry () - create a query manager entry from a session query
* entry
* return : void
* sentry_p (in) : session query entry
* qentry_p (in/out) : query manager query entry
*/
static void
sentry_to_qentry (const SESSION_QUERY_ENTRY * sentry_p, QMGR_QUERY_ENTRY * qentry_p)
{
qentry_p->query_id = sentry_p->query_id;
qentry_p->list_id = sentry_p->list_id;
qentry_p->temp_vfid = sentry_p->temp_file;
qentry_p->list_ent = NULL;
qentry_p->num_tmp = sentry_p->num_tmp;
qentry_p->total_count = sentry_p->total_count;
qentry_p->query_status = QUERY_COMPLETED;
qentry_p->query_flag = sentry_p->query_flag;
XASL_ID_SET_NULL (&qentry_p->xasl_id);
qentry_p->xasl_ent = NULL;
qentry_p->er_msg = NULL;
qentry_p->is_holdable = true;
}
/*
* session_store_query_entry_info () - create a query entry
* return : void
* thread_p (in) :
* qentry_p (in) : query entry
*/
void
session_store_query_entry_info (THREAD_ENTRY * thread_p, QMGR_QUERY_ENTRY * qentry_p)
{
SESSION_STATE *state_p = NULL;
SESSION_QUERY_ENTRY *sqentry_p = NULL, *current = NULL;
assert (qentry_p != NULL);
state_p = session_get_session_state (thread_p);
if (state_p == NULL)
{
return;
}
/* iterate over queries so we don't add the same query twice */
current = state_p->queries;
while (current != NULL)
{
if (current->query_id == qentry_p->query_id)
{
/* we don't need to add it again, just set list_id to null so that the query manager does not drop it */
qentry_p->list_id = NULL;
qentry_p->temp_vfid = NULL;
return;
}
current = current->next;
}
/* We didn't find it. Create an entry and add it to the list */
sqentry_p = qentry_to_sentry (qentry_p);
if (sqentry_p == NULL)
{
return;
}
session_preserve_temporary_files (thread_p, sqentry_p);
if (state_p->queries == NULL)
{
state_p->queries = sqentry_p;
}
else
{
sqentry_p->next = state_p->queries;
state_p->queries = sqentry_p;
}
sessions.num_holdable_cursors++;
}
/*
* session_free_sentry_data () - close list files associated with a query
* entry
* return : void
* thread_p (in) :
* sentry_p (in) :
*/
static void
session_free_sentry_data (THREAD_ENTRY * thread_p, SESSION_QUERY_ENTRY * sentry_p)
{
if (sentry_p == NULL)
{
return;
}
if (sentry_p->list_id != NULL)
{
qfile_close_list (thread_p, sentry_p->list_id);
qfile_free_list_id (sentry_p->list_id);
}
if (sentry_p->temp_file != NULL)
{
qmgr_free_temp_file_list (thread_p, sentry_p->temp_file, sentry_p->query_id, false);
}
sessions.num_holdable_cursors--;
}
/*
* session_load_query_entry_info () - search for a query entry
* return : error code or NO_ERROR
* thread_p (in) :
* qentry_p (in/out) : query entry
*/
int
session_load_query_entry_info (THREAD_ENTRY * thread_p, QMGR_QUERY_ENTRY * qentry_p)
{
SESSION_STATE *state_p = NULL;
SESSION_QUERY_ENTRY *sentry_p = NULL;
state_p = session_get_session_state (thread_p);
if (state_p == NULL)
{
return ER_FAILED;
}
sentry_p = state_p->queries;
while (sentry_p != NULL)
{
if (sentry_p->query_id == qentry_p->query_id)
{
sentry_to_qentry (sentry_p, qentry_p);
return NO_ERROR;
}
sentry_p = sentry_p->next;
}
return ER_FAILED;
}
/*
* session_remove_query_entry_all () - remove all query entries from the session
* thread_p (in) : active thread
*/
void
session_remove_query_entry_all (THREAD_ENTRY * thread_p)
{
SESSION_STATE *state_p = NULL;
SESSION_QUERY_ENTRY *sentry_p = NULL, *prev = NULL;
state_p = session_get_session_state (thread_p);
if (state_p == NULL)
{
return;
}
sentry_p = state_p->queries;
while (sentry_p != NULL)
{
session_free_sentry_data (thread_p, sentry_p);
prev = sentry_p;
sentry_p = sentry_p->next;
free_and_init (prev);
}
state_p->queries = NULL;
}
/*
* session_remove_query_entry_info () - remove a query entry from the holdable queries list
* return : error code or NO_ERROR
* thread_p (in) : active thread
* query_id (in) : query id
*/
int
session_remove_query_entry_info (THREAD_ENTRY * thread_p, const QUERY_ID query_id)
{
SESSION_STATE *state_p = NULL;
SESSION_QUERY_ENTRY *sentry_p = NULL, *prev = NULL;
state_p = session_get_session_state (thread_p);
if (state_p == NULL)
{
return ER_FAILED;
}
sentry_p = state_p->queries;
while (sentry_p != NULL)
{
if (sentry_p->query_id == query_id)
{
/* remove sentry_p from the queries list */
if (prev == NULL)
{
state_p->queries = sentry_p->next;
}
else
{
prev->next = sentry_p->next;
}
session_free_sentry_data (thread_p, sentry_p);
free_and_init (sentry_p);
break;
}
prev = sentry_p;
sentry_p = sentry_p->next;
}
return NO_ERROR;
}
/*
* session_clear_query_entry_info () - remove a query entry from the holdable queries list but do not close the
* associated list files
* return : error code or NO_ERROR
* thread_p (in) : active thread
* query_id (in) : query id
*/
int
session_clear_query_entry_info (THREAD_ENTRY * thread_p, const QUERY_ID query_id)
{
SESSION_STATE *state_p = NULL;
SESSION_QUERY_ENTRY *sentry_p = NULL, *prev = NULL;
state_p = session_get_session_state (thread_p);
if (state_p == NULL)
{
return ER_FAILED;
}
sentry_p = state_p->queries;
while (sentry_p != NULL)
{
if (sentry_p->query_id == query_id)
{
/* remove sentry_p from the queries list */
if (prev == NULL)
{
state_p->queries = sentry_p->next;
}
else
{
prev->next = sentry_p->next;
}
free_and_init (sentry_p);
sessions.num_holdable_cursors--;
break;
}
prev = sentry_p;
sentry_p = sentry_p->next;
}
return NO_ERROR;
}
/*
* session_is_queryid_idle () - search for a idle query entry among the holable results
* return : true if the given query_id is idle, false otherwise
* thread_p (in) :
* query_id (in) : query id
* max_query_id_uses (out): max query id among the active ones. caller may use it as a hint
*/
bool
session_is_queryid_idle (THREAD_ENTRY * thread_p, const QUERY_ID query_id, QUERY_ID * max_query_id_uses)
{
SESSION_STATE *state_p = NULL;
SESSION_QUERY_ENTRY *sentry_p = NULL;
*max_query_id_uses = 0;
state_p = session_get_session_state (thread_p);
if (state_p == NULL)
{
return true;
}
for (sentry_p = state_p->queries; sentry_p != NULL; sentry_p = sentry_p->next)
{
if (*max_query_id_uses < sentry_p->query_id)
{
*max_query_id_uses = sentry_p->query_id;
}
if (sentry_p->query_id == query_id)
{
return false;
}
}
return true;
}
/*
* session_set_conn_entry_data () - set references to session state objects
* into the connection entry associated
* with this thread
* return : void
* thread_p (in) : current thread
* session_p (in) : session state object
*/
static void
session_set_conn_entry_data (THREAD_ENTRY * thread_p, SESSION_STATE * session_p)
{
#if defined(SERVER_MODE)
if (thread_p == NULL)
{
thread_p = thread_get_thread_entry_info ();
if (thread_p == NULL)
{
return;
}
}
if (thread_p->conn_entry != NULL)
{
/* If we have a connection entry associated with this thread, setup session data for this conn_entry */
thread_p->conn_entry->session_p = session_p;
thread_p->conn_entry->session_id = session_p->id;
}
thread_p->private_lru_index = session_p->private_lru_index;
pgbuf_thread_variables_init (thread_p);
#endif
}
/*
* session_get_session_parameter () - get a reference to a system parameter
* return : session parameter object
* thread_p (in) : thread entry
* id (in) : parameter id
*/
SESSION_PARAM *
session_get_session_parameter (THREAD_ENTRY * thread_p, PARAM_ID id)
{
SESSION_STATE *session_p = session_get_session_state (thread_p);
if (session_p == NULL)
{
return NULL;
}
assert (id <= PRM_LAST_ID);
#ifndef NDEBUG
int i, count;
count = sysprm_get_session_parameters_count ();
for (i = 0; i < count; i++)
{
if (session_p->session_parameters[i].prm_id == id)
{
assert (prm_Def_session_idx[id] == i);
break;
}
}
if (i >= count)
{
assert (prm_Def_session_idx[id] == -1);
}
#endif
return ((prm_Def_session_idx[id] < 0) ? NULL : &session_p->session_parameters[prm_Def_session_idx[id]]);
}
/*
* session_get_session_state () - get the session state object
* return : session state object or NULL in case of error
* thread_p (in) : thread for which to get the session
*/
static SESSION_STATE *
session_get_session_state (THREAD_ENTRY * thread_p)
{
#if defined(SERVER_MODE)
/* The session state object cached in the conn_entry object associated with every server request. Instead of
* accessing the session states hash through a critical section, we can just return the hashed value. */
if (thread_p == NULL)
{
thread_p = thread_get_thread_entry_info ();
}
if (thread_p != NULL && thread_p->conn_entry != NULL && thread_p->conn_entry->session_p != NULL)
{
return thread_p->conn_entry->session_p;
}
else
{
/* any request for this object should find it cached in the connection entry */
if (thread_p->type == TT_WORKER)
{
er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_SES_SESSION_EXPIRED, 0);
}
return NULL;
}
#else
SESSION_ID id;
int error = NO_ERROR;
SESSION_STATE *state_p = NULL;
error = session_get_session_id (thread_p, &id);
if (error != NO_ERROR)
{
return NULL;
}
state_p = sessions.states_hashmap.find (thread_p, id);
if (state_p == NULL)
{
er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_SES_SESSION_EXPIRED, 0);
return NULL;
}
/* not in server mode so no need for mutex or HP */
pthread_mutex_unlock (&state_p->mutex);
return state_p;
#endif
}
/*
* session_get_trace_stats () - return query trace result
* return : query trace
* result(out) :
*/
int
session_get_trace_stats (THREAD_ENTRY * thread_p, DB_VALUE * result)
{
SESSION_STATE *state_p = NULL;
char *trace_str = NULL;
size_t sizeloc;
FILE *fp;
json_t *plan, *xasl, *stats;
DB_VALUE temp_result;
state_p = session_get_session_state (thread_p);
if (state_p == NULL)
{
return ER_FAILED;
}
if (state_p->plan_string == NULL && state_p->trace_stats == NULL)
{
db_make_null (result);
return NO_ERROR;
}
if (state_p->trace_format == QUERY_TRACE_TEXT)
{
fp = port_open_memstream (&trace_str, &sizeloc);
if (fp)
{
if (state_p->plan_string != NULL)
{
fprintf (fp, "\nQuery Plan:\n%s", state_p->plan_string);
}
if (state_p->trace_stats != NULL)
{
fprintf (fp, "\nTrace Statistics:\n%s", state_p->trace_stats);
}
port_close_memstream (fp, &trace_str, &sizeloc);
}
}
else if (state_p->trace_format == QUERY_TRACE_JSON)
{
stats = json_object ();
if (state_p->plan_string != NULL)
{
plan = json_loads (state_p->plan_string, 0, NULL);
if (plan != NULL)
{
json_object_set_new (stats, "Query Plan", plan);
}
}
if (state_p->trace_stats != NULL)
{
xasl = json_loads (state_p->trace_stats, 0, NULL);
if (xasl != NULL)
{
json_object_set_new (stats, "Trace Statistics", xasl);
}
}
trace_str = json_dumps (stats, JSON_INDENT (2) | JSON_PRESERVE_ORDER);
json_object_clear (stats);
json_decref (stats);
}
if (trace_str != NULL)
{
db_make_string (&temp_result, trace_str);
pr_clone_value (&temp_result, result);
free_and_init (trace_str);
}
else
{
db_make_null (result);
}
thread_set_clear_trace (thread_p, true);
return NO_ERROR;
}
/*
* session_set_trace_stats () - save query trace result to session
* return :
* stats(in) :
* format(in) :
*/
int
session_set_trace_stats (THREAD_ENTRY * thread_p, char *stats, int format)
{
SESSION_STATE *state_p = NULL;
state_p = session_get_session_state (thread_p);
if (state_p == NULL)
{
return ER_FAILED;
}
if (state_p->trace_stats != NULL)
{
free_and_init (state_p->trace_stats);
}
state_p->trace_stats = stats;
state_p->trace_format = format;
return NO_ERROR;
}
/*
* session_clear_trace_stats () - clear query trace result from session
* return :
* stats(in) :
* format(in) :
*/
int
session_clear_trace_stats (THREAD_ENTRY * thread_p)
{
SESSION_STATE *state_p = NULL;
assert (thread_need_clear_trace (thread_p) == true);
state_p = session_get_session_state (thread_p);
if (state_p == NULL)
{
return ER_FAILED;
}
if (state_p->plan_string != NULL)
{
free_and_init (state_p->plan_string);
}
if (state_p->trace_stats != NULL)
{
free_and_init (state_p->trace_stats);
}
thread_set_clear_trace (thread_p, false);
return NO_ERROR;
}
/*
* session_get_session_tz_region () - get a reference to the session timezone
* region
* return : reference to session TZ_REGION object
* thread_p (in) : thread entry
*/
TZ_REGION *
session_get_session_tz_region (THREAD_ENTRY * thread_p)
{
SESSION_STATE *session_p = NULL;
session_p = session_get_session_state (thread_p);
if (session_p == NULL)
{
return NULL;
}
return &session_p->session_tz_region;
}
#if !defined (NDEBUG) && defined (SERVER_MODE)
/*
* session_state_verify_ref_count () -
* return :
*
*/
static int
session_state_verify_ref_count (THREAD_ENTRY * thread_p, SESSION_STATE * session_p)
{
int ref_count = 0, r;
CSS_CONN_ENTRY *conn;
if (session_p == NULL)
{
assert (0);
return ER_FAILED;
}
if (css_Active_conn_anchor == NULL)
{
assert (0);
return ER_FAILED;
}
START_SHARED_ACCESS_ACTIVE_CONN_ANCHOR (r);
for (conn = css_Active_conn_anchor; conn != NULL; conn = conn->next)
{
if (session_p->id == conn->session_id)
{
ref_count++;
}
}
if (ref_count != session_p->ref_count)
{
END_SHARED_ACCESS_ACTIVE_CONN_ANCHOR (r);
assert (0);
return ER_FAILED;
}
END_SHARED_ACCESS_ACTIVE_CONN_ANCHOR (r);
return NO_ERROR;
}
#endif
/*
* session_state_increase_ref_count () -
* return :
*
*/
#if defined (SERVER_MODE)
int
session_set_pl_session_parameter (THREAD_ENTRY * thread_p, PARAM_ID id)
{
SESSION_STATE *state_p = NULL;
state_p = session_get_session_state (thread_p);
if (state_p == NULL)
{
return ER_FAILED;
}
assert (state_p->pl_session_p);
state_p->pl_session_p->mark_session_param_changed (id);
return NO_ERROR;
}
int
session_state_increase_ref_count (THREAD_ENTRY * thread_p, SESSION_STATE * state_p)
{
if (state_p == NULL)
{
assert (0);
return ER_FAILED;
}
ATOMIC_INC_32 (&state_p->ref_count, 1);
if (state_p->ref_count <= 0)
{
assert (state_p->ref_count > 0);
ATOMIC_TAS_32 (&state_p->ref_count, 1);
}
return NO_ERROR;
}
/*
* session_state_decrease_ref_count () -
* return :
*
*/
int
session_state_decrease_ref_count (THREAD_ENTRY * thread_p, SESSION_STATE * state_p)
{
if (state_p == NULL)
{
assert (0);
return ER_FAILED;
}
ATOMIC_INC_32 (&state_p->ref_count, -1);
if (state_p->ref_count < 0)
{
assert (state_p->ref_count >= 0);
ATOMIC_TAS_32 (&state_p->ref_count, 0);
}
return NO_ERROR;
}
#endif
/*
* session_get_number_of_holdable_cursors () - return the number of holdable cursors
*
* return : the number of holdable cursors
*
*/
int
session_get_number_of_holdable_cursors (void)
{
return sessions.num_holdable_cursors;
}
/*
* session_get_private_lru_idx () - returns the LRU index of this session
*
*
* return : LRU index
* session_p (in) : session
*
*/
int
session_get_private_lru_idx (const void *session_p)
{
return ((SESSION_STATE *) session_p)->private_lru_index;
}
/*
* session_set_tran_auto_commit () - set transaction auto commit state
*
* return : NO_ERROR or error code
* thread_p(in) : thread
* auto_commit(in) : auto commit
*/
int
session_set_tran_auto_commit (THREAD_ENTRY * thread_p, bool auto_commit)
{
SESSION_STATE *state_p = NULL;
state_p = session_get_session_state (thread_p);
if (state_p == NULL)
{
return ER_FAILED;
}
state_p->auto_commit = auto_commit;
return NO_ERROR;
}
int
session_set_load_session (THREAD_ENTRY * thread_p, load_session * load_session_p)
{
SESSION_STATE *state_p = NULL;
state_p = session_get_session_state (thread_p);
if (state_p == NULL)
{
return ER_FAILED;
}
state_p->load_session_p = load_session_p;
return NO_ERROR;
}
int
session_get_load_session (THREAD_ENTRY * thread_p, REFPTR (load_session, load_session_ref_ptr))
{
SESSION_STATE *state_p = NULL;
state_p = session_get_session_state (thread_p);
if (state_p == NULL)
{
return ER_FAILED;
}
load_session_ref_ptr = state_p->load_session_p;
return NO_ERROR;
}
bool
session_is_pl_session_running (THREAD_ENTRY * thread_p)
{
SESSION_STATE *state_p = NULL;
state_p = session_get_session_state (thread_p);
if (state_p == NULL)
{
return false;
}
return state_p->pl_session_p->is_sp_running ();
}
int
session_get_pl_session (THREAD_ENTRY * thread_p, REFPTR (PL_SESSION, pl_session_ref_ptr))
{
int error = NO_ERROR;
SESSION_STATE *state_p = NULL;
pl_session_ref_ptr = nullptr;
state_p = session_get_session_state (thread_p);
if (state_p == NULL)
{
error = (er_errid () != NO_ERROR) ? er_errid () : ER_FAILED;
}
else
{
assert (state_p->pl_session_p);
if (state_p->pl_session_p->is_sp_running () && state_p->pl_session_p->is_interrupted ())
{
// TODO: should this be an error?
pl_session_ref_ptr = nullptr;
er_set (ER_ERROR_SEVERITY, ARG_FILE_LINE, ER_INTERRUPTED, 0);
error = ER_INTERRUPTED;
}
pl_session_ref_ptr = state_p->pl_session_p;
}
return error;
}
/*
* session_stop_attached_threads - stops extra attached threads (not connection worker thread)
* associated with the session
*
*/
void
session_stop_attached_threads (THREAD_ENTRY * thread_p, void *session_arg)
{
#if defined (SERVER_MODE)
SESSION_STATE *session = (SESSION_STATE *) session_arg;
assert (session != NULL);
// on uninit abort and delete loaddb session
if (session->load_session_p != NULL)
{
session->load_session_p->interrupt ();
session->load_session_p->wait_for_completion ();
delete session->load_session_p;
session->load_session_p = NULL;
}
if (session->pl_session_p)
{
if (thread_p && thread_p->type == TT_WORKER)
{
session->pl_session_p->set_interrupt (er_errid ());
session->pl_session_p->wait_until_pl_session_done ();
}
}
#endif
}