File px_heap_scan_result_handler.hpp¶
File List > cubrid > src > query > parallel > px_heap_scan > px_heap_scan_result_handler.hpp
Go to the documentation of this file
/*
*
* Copyright 2016 CUBRID Corporation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
/*
* px_heap_scan_result_handler.hpp
*/
#ifndef _PX_HEAP_SCAN_RESULT_HANDLER_HPP_
#define _PX_HEAP_SCAN_RESULT_HANDLER_HPP_
#include "query_list.h"
#include "storage_common.h"
#include "thread_entry.hpp"
#include "px_interrupt.hpp"
#include "xasl.h"
#include "px_heap_scan_result_type.hpp"
#include <atomic>
#include <condition_variable>
#include <cstdint>
#include <mutex>
#include <type_traits>
#include <vector>
namespace parallel_heap_scan
{
class list_id_header;
struct read_spec;
union VPID64_t;
class mergeable_list_variables;
class xasl_snapshot_variables;
class mergeable_list_tls;
class xasl_snapshot_tls;
template <RESULT_TYPE result_type>
class result_handler
{
using interrupt = parallel_query::interrupt;
using err_messages_with_lock = parallel_query::err_messages_with_lock;
using read_dest_type = std::conditional_t<result_type == RESULT_TYPE::MERGEABLE_LIST, QFILE_LIST_ID, VAL_LIST>;
using write_dest_type = std::conditional_t<result_type == RESULT_TYPE::MERGEABLE_LIST, OUTPTR_LIST, VAL_LIST>;
using variables =
std::conditional_t<result_type == RESULT_TYPE::MERGEABLE_LIST, mergeable_list_variables, xasl_snapshot_variables>;
using tls = std::conditional_t<result_type == RESULT_TYPE::MERGEABLE_LIST, mergeable_list_tls, xasl_snapshot_tls>;
public:
result_handler (QUERY_ID query_id, interrupt *interrupt_p, err_messages_with_lock *err_messages_p, int parallelism,
bool g_agg_domain_resolve_need, XASL_NODE *orig_xasl_tree_for_domain_resolve);
void read_initialize (THREAD_ENTRY *thread_p);
SCAN_CODE read (THREAD_ENTRY *thread_p, read_dest_type *dest);
void read_finalize (THREAD_ENTRY *thread_p);
void write_initialize (THREAD_ENTRY *thread_p, OUTPTR_LIST *outptr_list, XASL_NODE *curr_xasl, VAL_DESCR *vd);
bool write (THREAD_ENTRY *thread_p, write_dest_type *src);
void write_finalize (THREAD_ENTRY *thread_p);
private:
void get_valid_read_spec ();
/* common */
int m_parallelism;
std::mutex m_result_mutex;
std::condition_variable m_result_cv;
QUERY_ID m_query_id;
interrupt *m_interrupt_p; /* for interrupt */
err_messages_with_lock *m_err_messages_p; /* for error messages */
/* specific */
variables m_;
thread_local static tls tl;
};
class mergeable_list_variables
{
public:
mergeable_list_variables()
: orig_xasl (nullptr),
active_results (0),
is_list_id_domain_resolved (false) {}
~mergeable_list_variables() = default;
std::vector<QFILE_LIST_ID *> writer_results;
std::mutex writer_results_mutex;
XASL_NODE *orig_xasl;
int active_results;
bool is_list_id_domain_resolved;
std::vector<QFILE_LIST_ID *> hgby_results;
bool g_hash_eligible;
};
class xasl_snapshot_variables
{
public:
xasl_snapshot_variables()
: list_id_header_index (0),
current_read_spec (nullptr) {}
~xasl_snapshot_variables() = default;
std::vector<list_id_header> list_id_headers;
std::vector<read_spec> read_specs;
std::atomic_int list_id_header_index;
read_spec *current_read_spec;
};
class mergeable_list_tls
{
public:
mergeable_list_tls()
: writer_result_p (nullptr),
vd (nullptr),
xasl (nullptr),
val_list_domain_resolved (false),
agg_hash_state (HS_NONE),
g_agg_domains_resolved (TRUE) {}
~mergeable_list_tls() = default;
QFILE_LIST_ID *writer_result_p;
QFILE_TUPLE_RECORD tpl_buf;
VAL_DESCR *vd;
XASL_NODE *xasl;
std::vector<DB_VALUE> dbvals_for_domain_resolve;
bool val_list_domain_resolved;
AGGREGATE_HASH_STATE agg_hash_state;
int g_agg_domains_resolved;
};
class xasl_snapshot_tls
{
public:
xasl_snapshot_tls()
: list_id_header_p (nullptr) {}
~xasl_snapshot_tls() = default;
list_id_header *list_id_header_p;
QFILE_TUPLE_RECORD tpl_buf;
};
union VPID64_t
{
uint64_t uint64;
VPID vpid;
};
class list_id_header
{
public:
std::atomic<VPID64_t> m_first_vpid;
std::atomic<VPID64_t> m_last_vpid;
std::atomic<bool> m_list_closed;
std::atomic<bool> m_valid;
QFILE_LIST_ID *m_list_id_p;
std::vector<std::atomic<TP_DOMAIN *>*> m_type_list;
int m_type_cnt;
list_id_header()
: m_first_vpid(), m_last_vpid(), m_list_closed (false), m_valid (false),
m_list_id_p (nullptr), m_type_cnt (0) {}
list_id_header (const list_id_header &other)
: m_first_vpid (other.m_first_vpid.load()),
m_last_vpid (other.m_last_vpid.load()),
m_list_closed (other.m_list_closed.load()),
m_valid (other.m_valid.load()),
m_list_id_p (other.m_list_id_p),
m_type_list (other.m_type_list),
m_type_cnt (other.m_type_cnt) {}
list_id_header (list_id_header &&other) noexcept
: m_first_vpid (other.m_first_vpid.load()),
m_last_vpid (other.m_last_vpid.load()),
m_list_closed (other.m_list_closed.load()),
m_valid (other.m_valid.load()),
m_list_id_p (other.m_list_id_p),
m_type_list (std::move (other.m_type_list)),
m_type_cnt (other.m_type_cnt)
{
other.m_list_id_p = nullptr;
other.m_type_cnt = 0;
other.m_list_closed.store (false);
other.m_valid.store (false);
}
list_id_header &operator= (const list_id_header &other)
{
if (this != &other)
{
m_first_vpid.store (other.m_first_vpid.load());
m_last_vpid.store (other.m_last_vpid.load());
m_list_closed.store (other.m_list_closed.load());
m_valid.store (other.m_valid.load());
m_list_id_p = other.m_list_id_p;
m_type_list = other.m_type_list;
m_type_cnt = other.m_type_cnt;
}
return *this;
}
list_id_header &operator= (list_id_header &&other) noexcept
{
if (this != &other)
{
m_first_vpid.store (other.m_first_vpid.load());
m_last_vpid.store (other.m_last_vpid.load());
m_list_closed.store (other.m_list_closed.load());
m_valid.store (other.m_valid.load());
m_list_id_p = other.m_list_id_p;
m_type_list = std::move (other.m_type_list);
m_type_cnt = other.m_type_cnt;
other.m_list_id_p = nullptr;
other.m_type_cnt = 0;
other.m_list_closed.store (false);
other.m_valid.store (false);
}
return *this;
}
};
struct read_spec
{
list_id_header *list_id_header_p;
bool read_ended;
bool list_scan_id_opened;
QFILE_LIST_SCAN_ID list_scan_id;
};
template <>
class result_handler <RESULT_TYPE::BUILDVALUE_OPT>
{
using interrupt = parallel_query::interrupt;
using err_messages_with_lock = parallel_query::err_messages_with_lock;
using read_dest_type = AGGREGATE_TYPE;
using write_dest_type = AGGREGATE_TYPE;
public:
result_handler (QUERY_ID query_id, interrupt *interrupt_p, err_messages_with_lock *err_messages_p, int parallelism,
AGGREGATE_TYPE *orig_agg_list);
void read_initialize (THREAD_ENTRY *thread_p);
SCAN_CODE read (THREAD_ENTRY *thread_p, read_dest_type *dest);
void read_finalize (THREAD_ENTRY *thread_p);
void write_initialize (THREAD_ENTRY *thread_p, OUTPTR_LIST *outptr_list, write_dest_type *agg_list, VAL_DESCR *vd,
xasl_node *xasl_p);
bool write (THREAD_ENTRY *thread_p);
void write_finalize (THREAD_ENTRY *thread_p);
private:
int m_parallelism;
std::mutex m_result_mutex;
std::condition_variable m_result_cv;
int m_result_completed;
QUERY_ID m_query_id;
interrupt *m_interrupt_p; /* for interrupt */
err_messages_with_lock *m_err_messages_p; /* for error messages */
AGGREGATE_TYPE *m_orig_agg_list;
std::mutex writer_results_mutex;
thread_local static AGGREGATE_TYPE *tl_agg_p;
thread_local static OUTPTR_LIST *tl_outptr_list_p;
thread_local static VAL_DESCR *tl_vd;
thread_local static xasl_node *tl_xasl_p;
thread_local static QFILE_TUPLE_RECORD tl_tpl_buf;
thread_local static OR_BUF tl_or_buf;
};
}
#endif /*_PX_HEAP_SCAN_RESULT_HANDLER_HPP_ */