Skip to content

File thread_worker_pool_taskcap.cpp

File List > cubrid > src > thread > thread_worker_pool_taskcap.cpp

Go to the documentation of this file

/*
 * Copyright 2008 Search Solution Corporation
 * Copyright 2016 CUBRID Corporation
 *
 *  Licensed under the Apache License, Version 2.0 (the "License");
 *  you may not use this file except in compliance with the License.
 *  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 *  Unless required by applicable law or agreed to in writing, software
 *  distributed under the License is distributed on an "AS IS" BASIS,
 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *  See the License for the specific language governing permissions and
 *  limitations under the License.
 *
 */

//
// cap the number of tasks that are pushed into an worker pool and block incoming pushes when maxed
//

#include "thread_worker_pool_taskcap.hpp"

// XXX: SHOULD BE THE LAST INCLUDE HEADER
#include "memory_wrapper.hpp"

// worker_pool_task_capper template implementation

namespace cubthread
{
  worker_pool_task_capper::worker_pool_task_capper (worker_pool *worker_pool)
    : m_worker_pool (worker_pool)
    , m_tasks_available (0)
    , m_max_tasks (0)
    , m_mutex ()
    , m_cond_var ()
  {
    m_tasks_available = m_max_tasks = worker_pool->get_worker_count ();
  }

  bool
  worker_pool_task_capper::try_task (task<entry> *task)
  {
    std::unique_lock<std::mutex> ulock (m_mutex);

    if (m_tasks_available == 0)
      {
    // is full
    return false;
      }

    execute (task);
    return true;
  }

  void
  worker_pool_task_capper::push_task (task<entry> *task)
  {
    std::unique_lock<std::mutex> ulock (m_mutex);

    auto pred = [&] () -> bool { return (m_tasks_available > 0); };
    m_cond_var.wait (ulock, pred);

    // Make sure we have the lock.
    assert (ulock.owns_lock ());

    execute (task);
  }

  void
  worker_pool_task_capper::execute (task<entry> *task)
  {
    // Safeguard.
    assert (m_tasks_available > 0);

    m_tasks_available--;
    m_worker_pool->execute (new capped_task (*this, task));
  }

  void worker_pool_task_capper::end_task ()
  {
    std::unique_lock<std::mutex> ulock (m_mutex);
    m_tasks_available++;

    // Safeguard
    assert (m_tasks_available <= m_max_tasks && m_tasks_available > 0);

    ulock.unlock ();
    m_cond_var.notify_all ();
  }

  worker_pool *worker_pool_task_capper::get_worker_pool () const
  {
    return m_worker_pool;
  }

  worker_pool_task_capper::capped_task::capped_task (worker_pool_task_capper &capper, task<entry> *task)
    : m_capper (capper)
    , m_nested_task (task)
  {
  }

  worker_pool_task_capper::capped_task::~capped_task ()
  {
    m_nested_task->retire ();
  }

  void
  worker_pool_task_capper::capped_task::execute (entry &ctx)
  {
    m_nested_task->execute (ctx);
    m_capper.end_task ();
  }
}