/* Source file: cubrid/src/query/parallel/px_hash_join/px_hash_join_task_manager.hpp (text extracted from the generated documentation page) */

/*
 * Copyright 2008 Search Solution Corporation
 * Copyright 2016 CUBRID Corporation
 *
 *  Licensed under the Apache License, Version 2.0 (the "License");
 *  you may not use this file except in compliance with the License.
 *  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 *  Unless required by applicable law or agreed to in writing, software
 *  distributed under the License is distributed on an "AS IS" BASIS,
 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *  See the License for the specific language governing permissions and
 *  limitations under the License.
 *
 */

/*
 * px_hash_join_task_manager.hpp
 */

#pragma once

#include "query_hash_join.h"

#include <atomic>
#include <condition_variable>
#include <mutex>

#include "error_context.hpp"        /* cuberr::context */
#include "px_worker_manager.hpp"    /* parallel_query::worker_manager */
#include "storage_common.h"     /* NULL_TRAN_INDEX */
#include "thread_entry.hpp"     /* cubthread::entry */
#include "thread_entry_task.hpp"    /* cubthread::entry_task */

/*
 * Forward Declarations
 */

struct qmgr_temp_file;

typedef struct qmgr_temp_file QMGR_TEMP_FILE;

/*
 * Class Definitions
 */

namespace parallel_query
{
  namespace hash_join
  {
    /* Forward Declarations */
    class base_task;

    /*
     * task_manager - coordination object handed to every hash join task declared in this header
     *
     * Only the two accessors are defined here; every other member function is defined outside this header, so
     * their exact behavior cannot be stated from this file alone.
     * NOTE(review): judging by the member names, m_active_tasks is presumably protected by m_active_tasks_mutex
     * and join () presumably waits on m_all_tasks_done_cv - confirm against the implementation file.
     */

    class task_manager
    {
      public:
    task_manager (worker_manager *worker_manager, cubthread::entry &main_thread_ref);

    /* task life cycle */
    void push_task (base_task *task);
    void end_task ();
    void join ();

    /* error reporting and interruption */
    void handle_error (cubthread::entry &thread_ref);
    void notify_stop ();
    bool check_interrupt (cubthread::entry &thread_ref);
    void clear_interrupt (cubthread::entry &thread_ref);

    /* returns the stored reference to the main thread entry */
    cubthread::entry &get_main_thread_ref () const noexcept
    {
      return m_main_thread_ref;
    }

    /* reads the error flag with acquire ordering */
    bool has_error () const noexcept
    {
      return m_has_error.load (std::memory_order_acquire);
    }

      private:
    /* NOTE: data member order is kept as is; it determines layout and constructor initialization order */
    worker_manager *m_worker_manager;

    cubthread::entry &m_main_thread_ref;
    cuberr::context &m_main_error_context;

    std::condition_variable m_all_tasks_done_cv;
    std::mutex m_active_tasks_mutex;
    int m_active_tasks;

    std::atomic<bool> m_has_error;
    };

    /*
     * task_execution_guard - RAII helper that sets up worker thread context (main thread emulation and resource tracking)
     *
     * Construction makes the worker thread entry borrow the main thread's connection entry, transaction index and
     * trace flag, records the main thread entry in m_px_orig_thread_entry and pushes a resource tracking level.
     * Destruction clears the borrowed connection entry and trace flag and pops the resource tracking level.
     *
     * The guard is neither copyable nor movable: a copy would run the destructor a second time and pop the
     * resource tracks twice for a single push.
     */

    class task_execution_guard
    {
      public:
    /*
     * thread_ref (in/out) : worker thread entry to set up; must outlive the guard
     * task_manager (in)   : provides the main thread entry through get_main_thread_ref ()
     */
    inline task_execution_guard (cubthread::entry &thread_ref, task_manager &task_manager)
      : m_thread_ref (thread_ref)
    {
      cubthread::entry &main_thread_ref = task_manager.get_main_thread_ref ();

      m_thread_ref.m_px_orig_thread_entry = &main_thread_ref;
      m_thread_ref.conn_entry = main_thread_ref.conn_entry;
      m_thread_ref.tran_index = main_thread_ref.tran_index;
      m_thread_ref.on_trace = main_thread_ref.on_trace;

      /* the main thread is expected to have a connection and a transaction at this point */
      assert (m_thread_ref.conn_entry != nullptr);
      assert (m_thread_ref.tran_index != NULL_TRAN_INDEX);

      m_thread_ref.push_resource_tracks ();
    }

    /* copying (and therefore moving) would unbalance push_resource_tracks ()/pop_resource_tracks () */
    task_execution_guard (const task_execution_guard &) = delete;
    task_execution_guard &operator= (const task_execution_guard &) = delete;

    inline ~task_execution_guard ()
    {
      /*
       * NOTE(review): tran_index and m_px_orig_thread_entry are left as set by the constructor; presumably they
       * are reset elsewhere (or intentionally kept) - confirm.
       */
      m_thread_ref.conn_entry = nullptr;
      m_thread_ref.on_trace = false;

      m_thread_ref.pop_resource_tracks ();
    }

      private:
    cubthread::entry &m_thread_ref;
    };

    /*
     * base_task - common base class of split_task and join_task
     *
     * Derives from cubthread::entry_task and overrides retire (); execute () is not declared here and is
     * overridden by each derived task. The constructor and retire () are defined outside this header.
     */

    class base_task: public cubthread::entry_task
    {
      public:
    base_task (task_manager &task_manager, HASHJOIN_MANAGER *manager, int index);
    void retire () override;

      protected:
    /* state shared with the derived tasks */
    task_manager &m_task_manager;
    HASHJOIN_MANAGER *m_manager;
    /* NOTE(review): presumably the ordinal of this task among its siblings - confirm against the callers */
    const int m_index;
    };

    /*
     * split_task - base_task specialization that carries split info and page iteration state
     *
     * The constructor, execute () and get_next_page () are defined outside this header.
     */

    class split_task: public base_task
    {
      public:
    split_task (task_manager &task_manager,
            HASHJOIN_MANAGER *manager,
            HASHJOIN_INPUT_SPLIT_INFO *split_info,
            HASHJOIN_SHARED_SPLIT_INFO *shared_info,
            int index);

    void execute (cubthread::entry &thread_ref) override;

      private:
    /* page fetch helper; returns a PAGE_PTR (see the iteration state below) */
    PAGE_PTR get_next_page (cubthread::entry &thread_ref);

      private:
    HASHJOIN_INPUT_SPLIT_INFO *m_split_info;
    HASHJOIN_SHARED_SPLIT_INFO *m_shared_info;

    /* membuf iteration state, kept per thread: -1 means not the owner, a value >= 0 is the current membuf page index */
    int m_membuf_index;

    /* sector iteration state, kept per thread */
    int m_sector_index;     /* index of the current sector in page_map; -1 means the next sector is needed */
    UINT64 m_current_bitmap;    /* page bits still remaining in the current sector */
    VSID m_current_vsid;        /* VSID of the current sector */
    QMGR_TEMP_FILE *m_current_tfile;    /* tfile owning the page returned last */
    };

    /*
     * join_task - base_task specialization that carries the hash join contexts and shared join info
     *
     * The constructor, execute () and get_next_context () are defined outside this header.
     */

    class join_task: public base_task
    {
      public:
    join_task (task_manager &task_manager,
           HASHJOIN_MANAGER *manager,
           HASHJOIN_CONTEXT *contexts,
           HASHJOIN_SHARED_JOIN_INFO *shared_info,
           int index);

    void execute (cubthread::entry &thread_ref) override;

      private:
    /* context selection helper; returns a HASHJOIN_CONTEXT pointer */
    HASHJOIN_CONTEXT *get_next_context ();

      private:
    HASHJOIN_CONTEXT *m_contexts;
    HASHJOIN_SHARED_JOIN_INFO *m_shared_info;
    };
  } /* namespace hash_join */
} /* namespace parallel_query */