Program Listing for File thread-pool.h¶
↰ Return to documentation for file (aslam_cv2/aslam_cv_common/include/aslam/common/thread-pool.h
)
#ifndef ASLAM_THREAD_POOL_H
#define ASLAM_THREAD_POOL_H
// Adapted from https://github.com/progschj/ThreadPool on September 3, 2014
//
// Original copyright:
// Copyright (c) 2012 Jakob Progsch
//
// This software is provided 'as-is', without any express or implied
// warranty. In no event will the authors be held liable for any damages
// arising from the use of this software.
//
// Permission is granted to anyone to use this software for any purpose,
// including commercial applications, and to alter it and redistribute it
// freely, subject to the following restrictions:
//
// 1. The origin of this software must not be misrepresented; you must not
// claim that you wrote the original software. If you use this software
// in a product, an acknowledgment in the product documentation would be
// appreciated but is not required.
//
// 2. Altered source versions must be plainly marked as such, and must not be
// misrepresented as being the original software.
//
// 3. This notice may not be removed or altered from any source
// distribution.
#include <condition_variable>
#include <future>
#include <functional>
#include <limits>
#include <memory>
#include <mutex>
#include <queue>
#include <stdexcept>
#include <thread>
#include <unordered_map>
#include <vector>
#include <glog/logging.h>
namespace aslam {
class ThreadPool {
public:
ThreadPool(const size_t numThreads);
~ThreadPool();
template<class Function, class ... Args>
std::future<typename std::result_of<Function(Args...)>::type>
enqueueOrdered(const size_t exclusivity_group_id, Function&& function,
Args&&... args);
template<class Function, class... Args>
std::future<typename std::result_of<Function(Args...)>::type>
enqueue(Function&& function, Args&&... args);
void stop(){ stop_ = true; }
// Number of queued tasks.
size_t numQueuedTasks() const;
void waitForEmptyQueue() const;
static constexpr size_t kGroupdIdNonExclusiveTask =
std::numeric_limits<size_t>::max();
size_t numActiveThreads() const;
private:
// This version is not threadsafe.
size_t numQueuedTasksImpl() const;
void run();
std::vector<std::thread> workers_;
// The group id is a size_t where the number kGroupdIdNonExclusiveTask
// represents a non-exclusive task that needs no guarantees on its execution
// order. All tasks with other group ids have guaranteed execution order that
// corresponds to order of enqueing the task.
typedef std::deque<std::pair<size_t, std::function<void()>>> TaskDeque;
TaskDeque groupid_tasks_;
// The guard map should never contain negative group ids.
typedef std::unordered_map<size_t, bool> GuardMap;
GuardMap groupid_exclusivity_guards_;
// Count the number of non-exclusive tasks in groupid_tasks_;
// where groupid == kGroupdIdNonExclusiveTask.
size_t num_queued_nonexclusive_tasks = 0u;
// A mutex to protect the list of tasks of the group list.
mutable std::mutex tasks_mutex_;
// A condition variable that signals a change in the task queue; either a
// removed or inserted element.
mutable std::condition_variable tasks_queue_change_;
// A counter of active threads
unsigned active_threads_;
// A signal to stop the threads.
volatile bool stop_;
};
// Add new work item to the pool.
template<class Function, class... Args>
std::future<typename std::result_of<Function(Args...)>::type>
ThreadPool::enqueue(Function&& function, Args&&... args) {
return enqueueOrdered(kGroupdIdNonExclusiveTask, function,
std::forward<Args>(args)...);
}
// Add new work item to the pool.
template<class Function, class... Args>
std::future<typename std::result_of<Function(Args...)>::type>
ThreadPool::enqueueOrdered(const size_t exclusivity_group_id,
Function&& function, Args&&... args) {
typedef typename std::result_of<Function(Args...)>::type return_type;
// Don't allow enqueueing after stopping the pool.
if(stop_) {
LOG(ERROR) << "enqueue() called on stopped ThreadPool";
// An empty future will return valid() == false.
return std::future<typename std::result_of<Function(Args...)>::type>();
}
auto task = std::make_shared<std::packaged_task<return_type()>>(
std::bind(function, std::forward<Args>(args)...));
std::future<return_type> res = task->get_future();
{
std::unique_lock<std::mutex> lock(tasks_mutex_);
groupid_tasks_.emplace_back(exclusivity_group_id, [task](){ (*task)();});
// Initialize a group id exclusivity guard.
if (exclusivity_group_id == kGroupdIdNonExclusiveTask) {
++num_queued_nonexclusive_tasks;
} else if (groupid_exclusivity_guards_.count(
exclusivity_group_id) == 0u) {
groupid_exclusivity_guards_.emplace(exclusivity_group_id, false);
}
}
tasks_queue_change_.notify_one();
return res;
}
} // namespace aslam
#endif // ASLAM_THREAD_POOL_H