ROSE  0.11.96.11
ParallelSort.h
1 // Parallel sorting using multiple threads. See ParallelSort::quicksort() near the end of this file.
2 #ifndef ROSE_ParallelSort_H
3 #define ROSE_ParallelSort_H
4 
5 #include <boost/thread.hpp>
6 #include <list>
7 #include <vector>
8 
9 namespace Rose {
10 
21 namespace ParallelSort {
22 
23 // This stuff is all private but useful for any parallel sort algorithm.
24 namespace Private {
25 
// One unit of work: a half-open range of values that still has to be sorted. The range starts at "begin" (inclusive)
// and stops just before "end" (exclusive).
template<class RandomAccessIterator>
struct Work {
    RandomAccessIterator begin;                         // first value of the range
    RandomAccessIterator end;                           // one past the last value of the range

    Work(RandomAccessIterator first, RandomAccessIterator last)
        : begin(first), end(last) {}
};
32 
33 // Information about a sorting job. A job is generated when the user requests a sort, and each job may have multiple threads.
34 template<class RandomAccessIterator, class Compare>
35 struct Job {
36  Compare compare; // functor to compare two values, like for std::sort()
37  boost::condition_variable worklistInsertion; // signaled when something is added to the worklist
38  static const int multiThreshold = 10000; // size at which to start multi-threading
39  boost::mutex mutex; // protects all of the following data members
40  std::list<Work<RandomAccessIterator> > worklist; // non-overlapping stuff that needs to be sorted
41  size_t npending; // number of workers that are processing jobs that aren't in worklist
42  Job(Compare compare): compare(compare), npending(0) {}
43 };
44 
// Somewhat like std::partition(). Rearranges the values in [begin, end) around the value that "pivot" points to on entry.
// On return, every value before the returned iterator compares less than the pivot value, the returned iterator points to
// the pivot value itself, and no value after it compares less than the pivot value. Within each part the values are in no
// particular order.
//
// The range must be non-empty and "pivot" must point into it; both conditions are checked with assert().
template<class RandomAccessIterator, class Compare>
RandomAccessIterator partition(RandomAccessIterator begin, RandomAccessIterator end,
                               RandomAccessIterator pivot, Compare compare) {
    assert(begin < end);
    assert(pivot >= begin && pivot < end);

    // Park the pivot value in the last slot so it stays put while the other values are scanned.
    RandomAccessIterator last = end - 1;
    std::swap(*pivot, *last);

    // From here on "pivot" is the boundary: everything before it has compared less than the parked value.
    pivot = begin;
    RandomAccessIterator cur = begin;
    while (cur + 1 < end) {
        if (compare(*cur, *last)) {
            std::swap(*cur, *pivot);
            ++pivot;
        }
        ++cur;
    }

    // Drop the pivot value into its final position between the two parts.
    std::swap(*last, *pivot);
    return pivot;
}
62 
63 // Add work to the work list
64 template<class RandomAccessIterator, class Compare>
65 void addWork(Job<RandomAccessIterator, Compare> &job, const Work<RandomAccessIterator> &work) {
66  boost::lock_guard<boost::mutex> lock(job.mutex);
67  job.worklist.push_back(work);
68  job.worklistInsertion.notify_one();
69 }
70 
71 // Sorts one unit of work, adding additional items to the worklist if necessary.
72 template<class RandomAccessIterator, class Compare>
73 void quicksort(Job<RandomAccessIterator, Compare> &job, Work<RandomAccessIterator> work) {
74  while (work.end - work.begin > 1) {
75  if (work.end - work.begin < job.multiThreshold) {
76  std::sort(work.begin, work.end, job.compare);
77  return;
78  } else {
79  RandomAccessIterator pivot = work.begin + (work.end - work.begin) / 2; // assuming fairly even distribution
80  pivot = partition(work.begin, work.end, pivot, job.compare);
81  addWork(job, Work<RandomAccessIterator>(pivot+1, work.end));
82  work.end = pivot;
83  }
84  }
85 }
86 
87 // A worker thread.
88 template<class RandomAccessIterator, class Compare>
89 struct Worker {
91  size_t id; // only for debugging
92  Worker(Job<RandomAccessIterator, Compare> &job, size_t id): job(job), id(id) {}
93  void operator()() {
94  boost::unique_lock<boost::mutex> lock(job.mutex);
95  while (true) {
96  // Get the next unit of work. If no workers are working and the worklist is empty then we're all done.
97  while (job.worklist.empty() && job.npending>0)
98  job.worklistInsertion.wait(lock);
99  if (job.worklist.empty())
100  return;
101  Work<RandomAccessIterator> work = job.worklist.front();
102  job.worklist.pop_front();
103  ++job.npending;
104 
105  // Sort that unit of work
106  lock.unlock();
107  quicksort(job, work);
108  lock.lock();
109 
110  // Indicate that the work is completed
111  assert(job.npending>0);
112  if (0==--job.npending && job.worklist.empty())
113  job.worklistInsertion.notify_all();
114  }
115  }
116 };
117 
118 } // namespace
119 
120 
139 template<class RandomAccessIterator, class Compare>
140 void quicksort(RandomAccessIterator begin, RandomAccessIterator end, Compare compare, size_t nthreads) {
141  assert(begin < end);
142  using namespace Private;
143 
144  Job<RandomAccessIterator, Compare> job(compare);
145  addWork(job, Work<RandomAccessIterator>(begin, end));
146 
147  // Start worker threads (we can't assume containers with move semantics, so use an array)
148  size_t nworkers = std::max(nthreads, (size_t)1) - 1;
149  boost::thread *workers = new boost::thread[nworkers];
150  for (size_t i=0; i<nworkers; ++i)
151  workers[i] = boost::thread(Worker<RandomAccessIterator, Compare>(job, i+1));
152 
153  // Participate in the work ourselves (we might be the only thread!)
154  Worker<RandomAccessIterator, Compare>(job, 0)();
155 
156  // Wait for all the threads to finish
157  for (size_t i=0; i<nworkers; ++i)
158  workers[i].join();
159 }
160 
161 } // namespace
162 } // namespace
163 
164 #endif
Rose::ParallelSort::Private::Worker
Definition: ParallelSort.h:89
Rose::ParallelSort::quicksort
void quicksort(RandomAccessIterator begin, RandomAccessIterator end, Compare compare, size_t nthreads)
Sort values in parallel.
Definition: ParallelSort.h:140
mutex
Definition: Cxx_Grammar.h:5593
Rose::ParallelSort::Private::Work
Definition: ParallelSort.h:28
Rose
Main namespace for the ROSE library.
Definition: BinaryTutorial.dox:3
Rose::ParallelSort::Private::Job
Definition: ParallelSort.h:35