Skip to content

File px_worker_manager_global.cpp

File List > cubrid > src > query > parallel > px_worker_manager_global.cpp

Go to the documentation of this file

/*
 *
 * Copyright 2016 CUBRID Corporation
 *
 *  Licensed under the Apache License, Version 2.0 (the "License");
 *  you may not use this file except in compliance with the License.
 *  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 *  Unless required by applicable law or agreed to in writing, software
 *  distributed under the License is distributed on an "AS IS" BASIS,
 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *  See the License for the specific language governing permissions and
 *  limitations under the License.
 *
 */

/*
 * px_worker_manager_global.cpp - module that manages parallel worker threads.
 */

#if !defined (SERVER_MODE) && !defined (SA_MODE)
#error Belongs to server module
#endif /* !defined (SERVER_MODE) && !defined (SA_MODE) */

#include "px_worker_manager_global.hpp"
#include "system_parameter.h"

// XXX: SHOULD BE THE LAST INCLUDE HEADER
#include "memory_wrapper.hpp"

namespace parallel_query
{
  worker_manager_global::worker_manager_global()
    : m_worker_pool (nullptr),
      m_init_flag (),
      m_available (0),
      m_capacity (0)
  {
  }

  /*
   * ~worker_manager_global () - tear down through destroy (), which returns
   *                             immediately when no worker pool exists
   */
  worker_manager_global::~worker_manager_global()
  {
    this->destroy ();
  }

  /*
   * Invokes REGISTER_WORKERPOOL for the name parallel_query, passing a callable
   * that returns the current PRM_ID_MAX_PARALLEL_WORKERS setting.
   * NOTE(review): the macro is defined elsewhere; presumably the callable is
   * used to size/account this pool - confirm against the macro definition.
   */
  REGISTER_WORKERPOOL (parallel_query, []()
  {
    return prm_get_integer_value (PRM_ID_MAX_PARALLEL_WORKERS);
  });

  /*
   * init () - create the parallel-query worker pool, at most once
   *
   * The body runs under std::call_once, so only the first call does any work.
   * When PRM_ID_MAX_PARALLEL_WORKERS is below 2, or when the pool cannot be
   * created, the manager is left without a pool and with zero capacity.
   * On success both the capacity and the available count are set to the
   * configured number of workers.
   */
  void worker_manager_global::init()
  {
    std::call_once (m_init_flag, [this] ()
    {
      const int worker_limit = prm_get_integer_value (PRM_ID_MAX_PARALLEL_WORKERS);

      /* first (and only) pass through here: no pool may exist yet */
      assert (m_worker_pool == nullptr);

      if (worker_limit < 2)
        {
          /* parallel execution needs no fewer than 2 workers */
          return;
        }

      m_worker_pool = thread_create_worker_pool (worker_limit, 1, "parallel-query", thread_get_entry_manager ());

      if (m_worker_pool != nullptr)
        {
          /* pool is up: every worker is free to be reserved */
          m_capacity = worker_limit;
          m_available = worker_limit;
        }
    });
  }

  /*
   * destroy () - tear down the worker pool and mark the manager as empty
   *
   * Does nothing when no pool exists (init () was never called, bailed out, or
   * failed). Otherwise the pool is handed back to the thread manager and the
   * counters are zeroed, so that try_reserve_workers () reports no available
   * workers afterwards instead of granting workers that have no pool behind
   * them.
   */
  void worker_manager_global::destroy()
  {
    if (m_worker_pool == nullptr)
      {
        /* init() was not called or failed */
        return;
      }

    /* all workers should be released before destroy */
    assert (m_available.load () == m_capacity);

    /* stop handing out workers before the pool disappears */
    m_available.store (0);
    m_capacity = 0;

    cubthread::get_manager()->destroy_worker_pool (m_worker_pool);
    m_worker_pool = nullptr;
  }

  int worker_manager_global::try_reserve_workers (const int num_workers)
  {
    assert (num_workers > 0);
    assert (num_workers <= PRM_MAX_PARALLELISM);

    /* safe-guard */
    if (num_workers <= 0)
      {
    return 0;
      }

    /* safe-guard */
    int requested = MIN (num_workers, PRM_MAX_PARALLELISM);

    /* minimum parallel degree:
     * - 2 for parallel execution (heap scan, hash join, sort)
     * - 1 for parallel subquery (main thread + 1 worker for uncorrelated subquery)
     */
    const int min_degree = (requested == 1) ? 1 : 2;

    int available = m_available.load ();

    while (true)
      {
    /* check if enough workers available */
    if (available < min_degree)
      {
        return 0;
      }

    /* reserve as many as possible, up to requested */
    int reserved = (requested <= available) ? requested : available;

    if (m_available.compare_exchange_weak (available, available - reserved))
      {
        return reserved;
      }

    /* CAS failed: available is updated with actual value, retry */
    std::this_thread::yield ();
      }
  }

  /*
   * release_workers () - give previously reserved workers back to the manager
   *
   * num_workers (in): number of workers to return; expected to be positive and
   *                   not to push the available count past the capacity
   *
   * Non-positive counts are ignored in release builds.
   */
  void worker_manager_global::release_workers (const int num_workers)
  {
    assert (num_workers > 0);
    assert (m_worker_pool != nullptr);
    assert (m_available.load () + num_workers <= m_capacity);

    /* release-build guard: only a positive count changes anything */
    if (num_workers > 0)
      {
        m_available += num_workers;
      }
  }

  /*
   * push_task () - forward a task to the thread manager for this manager's pool
   *
   * task (in): task to push; must not be null
   *
   * The worker pool must already exist when this is called.
   */
  void worker_manager_global::push_task (cubthread::entry_task *task)
  {
    assert (task != nullptr);
    assert (m_worker_pool != nullptr);

    auto thread_manager = cubthread::get_manager ();
    thread_manager->push_task (m_worker_pool, task);
  }
}