8 #include <Sawyer/Exception.h>
9 #include <Sawyer/Graph.h>
10 #include <Sawyer/Map.h>
11 #include <Sawyer/Sawyer.h>
12 #include <Sawyer/Stack.h>
14 #include <boost/foreach.hpp>
15 #include <boost/thread/condition_variable.hpp>
16 #include <boost/thread/locks.hpp>
17 #include <boost/thread/mutex.hpp>
18 #include <boost/thread/thread.hpp>
19 #include <boost/version.hpp>
// Template header and part of the data members of the worker-pool class (referred to as
// ThreadWorkers by startWorker). The class-head line and several member declarations that
// other code in this excerpt uses (hasStarted_, hasWaited_, mutex_, nWorkers_, workQueue_)
// are not visible here.
//
// DependencyGraph: graph type; this excerpt uses its nVertices(), nEdges(), vertices(),
//                  findVertex(), clearInEdges() and clear() members and its nested Vertex, Edge,
//                  ConstVertexIterator and VertexValue types.
// Functor:         callable passed by value to every worker and invoked as
//                  functor(workItemId, workItem).
31 template<
class DependencyGraph,
class Functor>
// Private copy of the caller's graph (assigned in start). Workers remove the incoming edges of
// each vertex they finish, and vertices with no outgoing edges are the ones that get queued.
34 DependencyGraph dependencies_;
// Workers block on this while the work queue is empty and are notified after an item completes.
38 boost::condition_variable workInserted_;
// Array of nWorkers_ threads allocated with new[] in startWorkersNS; NULL until then.
40 boost::thread *workers_;
// Item counters compared in the worker's wait condition and returned by accessors.
// NOTE(review): the statements that increment them are not visible in this excerpt.
41 size_t nItemsStarted_;
42 size_t nItemsFinished_;
// Worker counters reported by the accessor that returns
// (nWorkers_ - nWorkersFinished_, nWorkersRunning_).
43 size_t nWorkersRunning_;
44 size_t nWorkersFinished_;
// IDs of the work items whose functor call is currently in progress.
45 std::set<size_t> runningTasks_;
// Member-initializer list and empty body of a constructor whose signature is not visible in this
// excerpt: both state flags start false, every counter starts at zero, and no thread array is
// allocated yet (workers_ is NULL).
53 : hasStarted_(false), hasWaited_(false), nWorkers_(0), workers_(NULL), nItemsStarted_(0), nItemsFinished_(0),
54 nWorkersRunning_(0), nWorkersFinished_(0) {}
// Member-initializer list of a second constructor (signature and body not visible in this
// excerpt). It sets the same initial state as the other constructor; the body opened by the
// trailing brace continues on lines that are not shown.
72 : hasStarted_(false), hasWaited_(false), nWorkers_(0), workers_(NULL), nItemsStarted_(0), nItemsFinished_(0),
73 nWorkersRunning_(0), nWorkersFinished_(0) {
// Copy the dependency graph, choose the number of worker threads, and launch them.
//
// dependencies: graph to process; copied into dependencies_.
// nWorkers:     requested number of threads (see the clamping below).
// functor:      callable handed to each worker thread.
//
// Throws std::runtime_error with the message below. NOTE(review): the condition guarding the
// throw, and a few other statements of this function, are not visible in this excerpt.
103 void start(
const DependencyGraph &dependencies,
size_t nWorkers, Functor functor) {
// mutex_ is held for the remainder of the function, including the call to startWorkersNS.
104 boost::lock_guard<boost::mutex> lock(mutex_);
106 throw std::runtime_error(
"work can start only once per object");
108 dependencies_ = dependencies;
// NOTE(review): the guard for this assignment is not shown; presumably it applies only when the
// caller asked for zero workers -- confirm against the full file.
110 nWorkers = boost::thread::hardware_concurrency();
// Use at most one worker per vertex, but never fewer than one worker (even for an empty graph).
111 nWorkers_ = std::max((
size_t)1, std::min(
nWorkers, dependencies.nVertices()));
112 nItemsStarted_ = nWorkersFinished_ = 0;
113 runningTasks_.clear();
115 startWorkersNS(functor);
// Fragment of a member function whose signature is not visible in this excerpt (presumably the
// blocking wait -- TODO confirm). The bodies of the `if` and `for` statements below are also
// missing, so only the visible checks are described.
// The lock is a unique_lock (rather than lock_guard), which permits unlocking before the end of
// scope; whether that happens is not visible here.
123 boost::unique_lock<boost::mutex> lock(mutex_);
// Tests whether work was never started or has already been waited for.
124 if (!hasStarted_ || hasWaited_)
// Visits each of the nWorkers_ worker slots.
129 for (
size_t i=0; i<nWorkers_; ++i)
// Tests whether any edges are still left in the private graph copy.
133 if (dependencies_.nEdges() != 0)
135 dependencies_.clear();
// Time-limited counterpart of waiting for the workers: tries to join every worker thread before
// a deadline of now + relTime.
//
// relTime: maximum duration to wait, as a boost::chrono::duration.
// Returns bool. NOTE(review): the return statements and the bodies of the `if` statements are
// not visible in this excerpt, so the exact result for each case must be confirmed in the full
// file.
142 template<
class Rep,
class Period>
143 bool tryWaitFor(
const boost::chrono::duration<Rep, Period> &relTime) {
// The deadline is computed once, before locking, so it is shared by all join attempts.
144 const boost::chrono::steady_clock::time_point endAt = boost::chrono::steady_clock::now() + relTime;
145 boost::unique_lock<boost::mutex> lock(mutex_);
// Tests whether work was never started or has already been waited for.
146 if (!hasStarted_ || hasWaited_)
150 for (
size_t i = 0; i < nWorkers_; ++i) {
// A false result means this worker had not finished by endAt.
151 if (!workers_[i].try_join_until(endAt))
// Tests whether any edges are still left in the private graph copy.
157 if (dependencies_.nEdges() != 0)
159 dependencies_.clear();
// Takes the same three arguments as start(). Only the signature is visible in this excerpt.
// NOTE(review): presumably starts the work and then blocks until it completes -- confirm against
// the body in the full file.
168 void run(
const DependencyGraph &dependencies,
size_t nWorkers, Functor functor) {
// Body of an accessor whose signature is not visible in this excerpt. Under mutex_, yields true
// when work has not been started or when nWorkersFinished_ equals nWorkers_.
177 boost::lock_guard<boost::mutex> lock(mutex_);
178 return !hasStarted_ || nWorkersFinished_ == nWorkers_;
// Body of an accessor whose signature is not visible in this excerpt. Reads nItemsStarted_ while
// holding mutex_.
186 boost::lock_guard<boost::mutex> lock(mutex_);
187 return nItemsStarted_;
// Body of an accessor whose signature is not visible in this excerpt. Reads nItemsFinished_
// while holding mutex_.
194 boost::lock_guard<boost::mutex> lock(mutex_);
195 return nItemsFinished_;
// Body of an accessor whose signature is not visible in this excerpt. Returns runningTasks_ (the
// IDs of items whose functor call is in progress) while holding mutex_.
// NOTE(review): whether this returns a copy or a reference depends on the unseen return type.
203 boost::lock_guard<boost::mutex> lock(mutex_);
204 return runningTasks_;
// Body of an accessor whose signature is not visible in this excerpt. Under mutex_, returns a
// pair: first the number of workers not yet counted as finished (nWorkers_ - nWorkersFinished_),
// second the value of nWorkersRunning_.
212 boost::lock_guard<boost::mutex> lock(mutex_);
213 return std::make_pair(nWorkers_-nWorkersFinished_, nWorkersRunning_);
// Seed the work queue with the ID of every vertex that has no outgoing edges. In worker(), a
// vertex is only run when it has no outgoing edges, and finishing it removes its incoming edges.
// NOTE(review): the "NS" suffix presumably marks functions that expect the caller to already
// hold mutex_ (no lock is taken here) -- confirm.
// The closing braces of this function are not visible in this excerpt.
218 void fillWorkQueueNS() {
// The queue must be empty on entry.
219 ASSERT_require(workQueue_.
isEmpty());
220 BOOST_FOREACH (
const typename DependencyGraph::Vertex &vertex, dependencies_.vertices()) {
221 if (vertex.nOutEdges() == 0)
222 workQueue_.
push(vertex.id());
// Allocate the thread array and launch nWorkers_ threads, each running startWorker(this, functor).
// Called from start() while start() holds mutex_.
// NOTE(review): workers_ is allocated with a raw new[]; the matching delete[] is not visible in
// this excerpt -- confirm it exists (e.g. in the destructor).
227 void startWorkersNS(Functor functor) {
// Default-constructed boost::thread objects; each slot is given a real thread below.
228 workers_ =
new boost::thread[nWorkers_];
229 for (
size_t i=0; i<nWorkers_; ++i)
// Each thread receives its own copy of functor.
230 workers_[i] = boost::thread(startWorker,
this, functor);
// Static thread entry point: forwards to the worker() member function of the object passed as
// `self`, handing along the functor.
234 static void startWorker(
ThreadWorkers *
self, Functor functor) {
235 self->worker(functor);
// Work routine executed by every worker thread (entered through startWorker). It takes an item
// from the queue, runs the functor on it, removes the finished vertex's incoming edges, and
// queues any vertex that no longer has outgoing edges.
//
// NOTE(review): this excerpt is missing lines from the function -- the surrounding loop (if
// any), the counter updates, the closing braces, and any unlocking/relocking around the functor
// call are not visible. The comments below describe only what the visible lines do.
238 void worker(Functor functor) {
241 boost::unique_lock<boost::mutex> lock(mutex_);
// Sleep while the queue is empty but some started item has not finished yet: an item that is
// still running may queue more work when it completes.
242 while (nItemsFinished_ < nItemsStarted_ && workQueue_.
isEmpty())
243 workInserted_.wait(lock);
// Queue empty and every started item finished: nothing more can be queued by running items.
// NOTE(review): the body of this branch is not visible; presumably the worker exits here.
244 if (nItemsFinished_ == nItemsStarted_ && workQueue_.
isEmpty()) {
// NOTE(review): ASSERT_forbid presumably fails when its argument is true, i.e. the queue is
// expected to be non-empty at this point -- confirm against the Sawyer assertion macros.
248 ASSERT_forbid(workQueue_.
isEmpty());
// Take the next item ID and look up its vertex.
249 size_t workItemId = workQueue_.
pop();
250 typename DependencyGraph::ConstVertexIterator workVertex = dependencies_.findVertex(workItemId);
// Only vertices with no outgoing edges are ever queued.
251 ASSERT_require(workVertex->nOutEdges() == 0);
// Copy the vertex value so the functor receives its own object.
252 typename DependencyGraph::VertexValue workItem = workVertex->value();
// Mark the item as running for the duration of the functor call.
257 runningTasks_.insert(workItemId);
// NOTE(review): whether mutex_ is released while the functor runs is not visible in this excerpt.
259 functor(workItemId, workItem);
263 runningTasks_.erase(workItemId);
// Remember the source vertex of every incoming edge, keyed by vertex ID, before the edges are
// removed.
266 Container::Map<size_t, typename DependencyGraph::ConstVertexIterator> candidateWorkItems;
267 BOOST_FOREACH (
const typename DependencyGraph::Edge &edge, workVertex->inEdges())
268 candidateWorkItems.insert(edge.source()->
id(), edge.source());
// Drop the incoming edges of the finished vertex.
269 dependencies_.clearInEdges(workVertex);
// Queue every remembered vertex that now has no outgoing edges.
// NOTE(review): the statement that increments newWorkInserted is not visible; presumably it
// counts the pushes below.
270 size_t newWorkInserted = 0;
271 BOOST_FOREACH (const typename DependencyGraph::ConstVertexIterator &candidate, candidateWorkItems.values()) {
272 if (candidate->nOutEdges() == 0) {
273 workQueue_.
push(candidate->id());
// Choose how many waiting workers to wake based on how many items were just queued.
// NOTE(review): a line is missing between the test below and the notify_all() call, so that
// call may be subject to a further condition.
279 if (0 == newWorkInserted) {
281 workInserted_.notify_all();
282 }
// No notification is visible in the one-new-item branch.
else if (1 == newWorkInserted) {
284 }
// Two new items: one other worker is woken.
else if (2 == newWorkInserted) {
285 workInserted_.notify_one();
// NOTE(review): presumably the start of a final `else` branch (more than two new items); the
// line introducing it is not visible.
287 workInserted_.notify_all();
// Free-function template taking a dependency graph, a worker count, and a functor. Only the
// template header and parameter list are visible in this excerpt; the return type and body are
// not shown.
314 template<
class DependencyGraph,
class Functor>
316 workInParallel(
const DependencyGraph &dependencies,
size_t nWorkers, Functor functor) {
// Overload of workInParallel that additionally takes a monitor and a period in milliseconds.
// The return type, the declaration of `workers`, and the rest of the body are not visible in
// this excerpt.
// NOTE(review): presumably `monitor` is invoked every `period` while work is in progress --
// confirm against the full body.
321 template<
class DependencyGraph,
class Functor,
class Monitor>
323 workInParallel(
const DependencyGraph &dependencies,
size_t nWorkers, Functor functor,
324 Monitor monitor, boost::chrono::milliseconds period) {
// Launch the work without blocking.
326 workers.
start(dependencies, nWorkers, functor);