File px_interrupt.hpp
File List > cubrid > src > query > parallel > px_interrupt.hpp
Go to the documentation of this file
/*
*
* Copyright 2016 CUBRID Corporation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
#ifndef _PX_INTERRUPT_HPP_
#define _PX_INTERRUPT_HPP_
#include <atomic>
#include <memory>
#include <mutex>
#include <vector>
#include "error_context.hpp"
namespace parallel_query
{
/*
 * interrupt - thin wrapper around one atomically accessed interrupt_code.
 *
 * Reads use acquire ordering and writes use release ordering. The object starts in
 * NO_INTERRUPT unless an explicit code is passed to the constructor.
 */
class interrupt
{
  public:
    /*
     * Values the interrupt state can take. NO_INTERRUPT is the value used by the default
     * constructor and by clear (). The enumerator order is kept as is, since it fixes the
     * underlying integer values.
     */
    enum class interrupt_code
    {
      NO_INTERRUPT,
      USER_INTERRUPTED_FROM_MAIN_THREAD,
      USER_INTERRUPTED_FROM_WORKER_THREAD,
      ERROR_INTERRUPTED_FROM_MAIN_THREAD,
      ERROR_INTERRUPTED_FROM_WORKER_THREAD,
      INST_NUM_SATISFIED,
      JOB_ENDED,
    };

    /* Current state. Public, so it can also be accessed directly. */
    std::atomic<interrupt_code> m_code;

    /* Start with the given code (intentionally not explicit: implicit conversion is allowed). */
    inline interrupt (interrupt_code code)
      : m_code (code)
    {
    }

    /* Start without any interrupt pending. */
    inline interrupt ()
      : interrupt (interrupt_code::NO_INTERRUPT)
    {
    }

    /* Return the current code (acquire load). */
    inline interrupt_code get_code () const noexcept
    {
      const interrupt_code current = m_code.load (std::memory_order_acquire);
      return current;
    }

    /* Overwrite the current code (release store). */
    inline void set_code (interrupt_code code) noexcept
    {
      m_code.store (code, std::memory_order_release);
    }

    /* Reset the state back to NO_INTERRUPT (release store). */
    inline void clear () noexcept
    {
      set_code (interrupt_code::NO_INTERRUPT);
    }
};
/*
 * atomic_instnum - atomic tuple counter compared against an optional target count.
 *
 * While no target has been set (m_is_instnum_set == false) the check always answers false
 * and the counter is left untouched.
 */
class atomic_instnum
{
  public:
    std::size_t m_destination_tuple_cnt;           /* target count the counter is compared against */
    std::atomic<std::size_t> m_current_tuple_cnt;  /* incremented by every check once a target is set */
    bool m_is_instnum_set;                         /* true once a target count has been provided */

    /* No target: the check is disabled. */
    inline atomic_instnum ()
      : m_destination_tuple_cnt (0)
      , m_current_tuple_cnt (0)
      , m_is_instnum_set (false)
    {
    }

    /* Target known up front: the check is enabled immediately. */
    inline atomic_instnum (std::size_t destination_tuple_cnt)
      : m_destination_tuple_cnt (destination_tuple_cnt)
      , m_current_tuple_cnt (0)
      , m_is_instnum_set (true)
    {
    }

    /* Install (or replace) the target count and enable the check. Plain, non-atomic stores. */
    inline void set_destination_tuple_cnt (std::size_t destination_tuple_cnt) noexcept
    {
      m_destination_tuple_cnt = destination_tuple_cnt;
      m_is_instnum_set = true;
    }

    /*
     * Atomically increment the counter and report whether its value *before* the increment
     * had already reached the target. Returns false without counting when no target is set.
     */
    inline bool is_instnum_satisfies_after_1tuple_insert () noexcept
    {
      if (!m_is_instnum_set)
	{
	  return false;
	}

      const std::size_t count_before_insert = m_current_tuple_cnt.fetch_add (1);
      return count_before_insert >= m_destination_tuple_cnt;
    }
};
class err_messages_with_lock
{
using er_message = cuberr::er_message;
public:
std::mutex m_mutex;
std::vector<er_message *> m_error_messages;
err_messages_with_lock()
:m_mutex (),
m_error_messages ()
{}
~err_messages_with_lock()
{
for (auto *msg : m_error_messages)
{
delete msg;
}
m_error_messages.clear();
}
inline int move_top_error_message_to_this ()
{
int err_id = 0;
std::lock_guard<std::mutex> lock (m_mutex);
m_error_messages.push_back (new cuberr::er_message (false));
err_id = cuberr::context::get_thread_local_context ().get_current_error_level ().err_id;
m_error_messages.back()->swap (cuberr::context::get_thread_local_context ().get_current_error_level ());
return err_id;
}
};
}
#endif