Home | History | Annotate | Line # | Download | only in Support
      1 //===- llvm/Support/Parallel.h - Parallel algorithms ----------------------===//
      2 //
      3 // Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions.
      4 // See https://llvm.org/LICENSE.txt for license information.
      5 // SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
      6 //
      7 //===----------------------------------------------------------------------===//
      8 
      9 #ifndef LLVM_SUPPORT_PARALLEL_H
     10 #define LLVM_SUPPORT_PARALLEL_H
     11 
     12 #include "llvm/ADT/STLExtras.h"
     13 #include "llvm/Config/llvm-config.h"
     14 #include "llvm/Support/Error.h"
     15 #include "llvm/Support/MathExtras.h"
     16 #include "llvm/Support/Threading.h"
     17 
     18 #include <algorithm>
     19 #include <condition_variable>
     20 #include <functional>
     21 #include <mutex>
     22 
     23 namespace llvm {
     24 
     25 namespace parallel {
     26 
     27 // Strategy for the default executor used by the parallel routines provided by
     28 // this file. It defaults to using all hardware threads and should be
     29 // initialized before the first use of parallel routines.
     30 extern ThreadPoolStrategy strategy;
     31 
     32 namespace detail {
     33 
     34 #if LLVM_ENABLE_THREADS
     35 
     36 class Latch {
     37   uint32_t Count;
     38   mutable std::mutex Mutex;
     39   mutable std::condition_variable Cond;
     40 
     41 public:
     42   explicit Latch(uint32_t Count = 0) : Count(Count) {}
     43   ~Latch() { sync(); }
     44 
     45   void inc() {
     46     std::lock_guard<std::mutex> lock(Mutex);
     47     ++Count;
     48   }
     49 
     50   void dec() {
     51     std::lock_guard<std::mutex> lock(Mutex);
     52     if (--Count == 0)
     53       Cond.notify_all();
     54   }
     55 
     56   void sync() const {
     57     std::unique_lock<std::mutex> lock(Mutex);
     58     Cond.wait(lock, [&] { return Count == 0; });
     59   }
     60 };
     61 
     62 class TaskGroup {
     63   Latch L;
     64   bool Parallel;
     65 
     66 public:
     67   TaskGroup();
     68   ~TaskGroup();
     69 
     70   void spawn(std::function<void()> f);
     71 
     72   void sync() const { L.sync(); }
     73 };
     74 
     75 const ptrdiff_t MinParallelSize = 1024;
     76 
     77 /// Inclusive median.
     78 template <class RandomAccessIterator, class Comparator>
     79 RandomAccessIterator medianOf3(RandomAccessIterator Start,
     80                                RandomAccessIterator End,
     81                                const Comparator &Comp) {
     82   RandomAccessIterator Mid = Start + (std::distance(Start, End) / 2);
     83   return Comp(*Start, *(End - 1))
     84              ? (Comp(*Mid, *(End - 1)) ? (Comp(*Start, *Mid) ? Mid : Start)
     85                                        : End - 1)
     86              : (Comp(*Mid, *Start) ? (Comp(*(End - 1), *Mid) ? Mid : End - 1)
     87                                    : Start);
     88 }
     89 
     90 template <class RandomAccessIterator, class Comparator>
     91 void parallel_quick_sort(RandomAccessIterator Start, RandomAccessIterator End,
     92                          const Comparator &Comp, TaskGroup &TG, size_t Depth) {
     93   // Do a sequential sort for small inputs.
     94   if (std::distance(Start, End) < detail::MinParallelSize || Depth == 0) {
     95     llvm::sort(Start, End, Comp);
     96     return;
     97   }
     98 
     99   // Partition.
    100   auto Pivot = medianOf3(Start, End, Comp);
    101   // Move Pivot to End.
    102   std::swap(*(End - 1), *Pivot);
    103   Pivot = std::partition(Start, End - 1, [&Comp, End](decltype(*Start) V) {
    104     return Comp(V, *(End - 1));
    105   });
    106   // Move Pivot to middle of partition.
    107   std::swap(*Pivot, *(End - 1));
    108 
    109   // Recurse.
    110   TG.spawn([=, &Comp, &TG] {
    111     parallel_quick_sort(Start, Pivot, Comp, TG, Depth - 1);
    112   });
    113   parallel_quick_sort(Pivot + 1, End, Comp, TG, Depth - 1);
    114 }
    115 
    116 template <class RandomAccessIterator, class Comparator>
    117 void parallel_sort(RandomAccessIterator Start, RandomAccessIterator End,
    118                    const Comparator &Comp) {
    119   TaskGroup TG;
    120   parallel_quick_sort(Start, End, Comp, TG,
    121                       llvm::Log2_64(std::distance(Start, End)) + 1);
    122 }
    123 
    124 // TaskGroup has a relatively high overhead, so we want to reduce
    125 // the number of spawn() calls. We'll create up to 1024 tasks here.
    126 // (Note that 1024 is an arbitrary number. This code probably needs
    127 // improving to take the number of available cores into account.)
    128 enum { MaxTasksPerGroup = 1024 };
    129 
    130 template <class IterTy, class FuncTy>
    131 void parallel_for_each(IterTy Begin, IterTy End, FuncTy Fn) {
    132   // If we have zero or one items, then do not incur the overhead of spinning up
    133   // a task group.  They are surprisingly expensive, and because they do not
    134   // support nested parallelism, a single entry task group can block parallel
    135   // execution underneath them.
    136   auto NumItems = std::distance(Begin, End);
    137   if (NumItems <= 1) {
    138     if (NumItems)
    139       Fn(*Begin);
    140     return;
    141   }
    142 
    143   // Limit the number of tasks to MaxTasksPerGroup to limit job scheduling
    144   // overhead on large inputs.
    145   ptrdiff_t TaskSize = NumItems / MaxTasksPerGroup;
    146   if (TaskSize == 0)
    147     TaskSize = 1;
    148 
    149   TaskGroup TG;
    150   while (TaskSize < std::distance(Begin, End)) {
    151     TG.spawn([=, &Fn] { std::for_each(Begin, Begin + TaskSize, Fn); });
    152     Begin += TaskSize;
    153   }
    154   std::for_each(Begin, End, Fn);
    155 }
    156 
    157 template <class IndexTy, class FuncTy>
    158 void parallel_for_each_n(IndexTy Begin, IndexTy End, FuncTy Fn) {
    159   // If we have zero or one items, then do not incur the overhead of spinning up
    160   // a task group.  They are surprisingly expensive, and because they do not
    161   // support nested parallelism, a single entry task group can block parallel
    162   // execution underneath them.
    163   auto NumItems = End - Begin;
    164   if (NumItems <= 1) {
    165     if (NumItems)
    166       Fn(Begin);
    167     return;
    168   }
    169 
    170   // Limit the number of tasks to MaxTasksPerGroup to limit job scheduling
    171   // overhead on large inputs.
    172   ptrdiff_t TaskSize = NumItems / MaxTasksPerGroup;
    173   if (TaskSize == 0)
    174     TaskSize = 1;
    175 
    176   TaskGroup TG;
    177   IndexTy I = Begin;
    178   for (; I + TaskSize < End; I += TaskSize) {
    179     TG.spawn([=, &Fn] {
    180       for (IndexTy J = I, E = I + TaskSize; J != E; ++J)
    181         Fn(J);
    182     });
    183   }
    184   for (IndexTy J = I; J < End; ++J)
    185     Fn(J);
    186 }
    187 
    188 template <class IterTy, class ResultTy, class ReduceFuncTy,
    189           class TransformFuncTy>
    190 ResultTy parallel_transform_reduce(IterTy Begin, IterTy End, ResultTy Init,
    191                                    ReduceFuncTy Reduce,
    192                                    TransformFuncTy Transform) {
    193   // Limit the number of tasks to MaxTasksPerGroup to limit job scheduling
    194   // overhead on large inputs.
    195   size_t NumInputs = std::distance(Begin, End);
    196   if (NumInputs == 0)
    197     return std::move(Init);
    198   size_t NumTasks = std::min(static_cast<size_t>(MaxTasksPerGroup), NumInputs);
    199   std::vector<ResultTy> Results(NumTasks, Init);
    200   {
    201     // Each task processes either TaskSize or TaskSize+1 inputs. Any inputs
    202     // remaining after dividing them equally amongst tasks are distributed as
    203     // one extra input over the first tasks.
    204     TaskGroup TG;
    205     size_t TaskSize = NumInputs / NumTasks;
    206     size_t RemainingInputs = NumInputs % NumTasks;
    207     IterTy TBegin = Begin;
    208     for (size_t TaskId = 0; TaskId < NumTasks; ++TaskId) {
    209       IterTy TEnd = TBegin + TaskSize + (TaskId < RemainingInputs ? 1 : 0);
    210       TG.spawn([=, &Transform, &Reduce, &Results] {
    211         // Reduce the result of transformation eagerly within each task.
    212         ResultTy R = Init;
    213         for (IterTy It = TBegin; It != TEnd; ++It)
    214           R = Reduce(R, Transform(*It));
    215         Results[TaskId] = R;
    216       });
    217       TBegin = TEnd;
    218     }
    219     assert(TBegin == End);
    220   }
    221 
    222   // Do a final reduction. There are at most 1024 tasks, so this only adds
    223   // constant single-threaded overhead for large inputs. Hopefully most
    224   // reductions are cheaper than the transformation.
    225   ResultTy FinalResult = std::move(Results.front());
    226   for (ResultTy &PartialResult :
    227        makeMutableArrayRef(Results.data() + 1, Results.size() - 1))
    228     FinalResult = Reduce(FinalResult, std::move(PartialResult));
    229   return std::move(FinalResult);
    230 }
    231 
    232 #endif
    233 
    234 } // namespace detail
    235 } // namespace parallel
    236 
    237 template <class RandomAccessIterator,
    238           class Comparator = std::less<
    239               typename std::iterator_traits<RandomAccessIterator>::value_type>>
    240 void parallelSort(RandomAccessIterator Start, RandomAccessIterator End,
    241                   const Comparator &Comp = Comparator()) {
    242 #if LLVM_ENABLE_THREADS
    243   if (parallel::strategy.ThreadsRequested != 1) {
    244     parallel::detail::parallel_sort(Start, End, Comp);
    245     return;
    246   }
    247 #endif
    248   llvm::sort(Start, End, Comp);
    249 }
    250 
    251 template <class IterTy, class FuncTy>
    252 void parallelForEach(IterTy Begin, IterTy End, FuncTy Fn) {
    253 #if LLVM_ENABLE_THREADS
    254   if (parallel::strategy.ThreadsRequested != 1) {
    255     parallel::detail::parallel_for_each(Begin, End, Fn);
    256     return;
    257   }
    258 #endif
    259   std::for_each(Begin, End, Fn);
    260 }
    261 
    262 template <class FuncTy>
    263 void parallelForEachN(size_t Begin, size_t End, FuncTy Fn) {
    264 #if LLVM_ENABLE_THREADS
    265   if (parallel::strategy.ThreadsRequested != 1) {
    266     parallel::detail::parallel_for_each_n(Begin, End, Fn);
    267     return;
    268   }
    269 #endif
    270   for (size_t I = Begin; I != End; ++I)
    271     Fn(I);
    272 }
    273 
    274 template <class IterTy, class ResultTy, class ReduceFuncTy,
    275           class TransformFuncTy>
    276 ResultTy parallelTransformReduce(IterTy Begin, IterTy End, ResultTy Init,
    277                                  ReduceFuncTy Reduce,
    278                                  TransformFuncTy Transform) {
    279 #if LLVM_ENABLE_THREADS
    280   if (parallel::strategy.ThreadsRequested != 1) {
    281     return parallel::detail::parallel_transform_reduce(Begin, End, Init, Reduce,
    282                                                        Transform);
    283   }
    284 #endif
    285   for (IterTy I = Begin; I != End; ++I)
    286     Init = Reduce(std::move(Init), Transform(*I));
    287   return std::move(Init);
    288 }
    289 
    290 // Range wrappers.
    291 template <class RangeTy,
    292           class Comparator = std::less<decltype(*std::begin(RangeTy()))>>
    293 void parallelSort(RangeTy &&R, const Comparator &Comp = Comparator()) {
    294   parallelSort(std::begin(R), std::end(R), Comp);
    295 }
    296 
    297 template <class RangeTy, class FuncTy>
    298 void parallelForEach(RangeTy &&R, FuncTy Fn) {
    299   parallelForEach(std::begin(R), std::end(R), Fn);
    300 }
    301 
    302 template <class RangeTy, class ResultTy, class ReduceFuncTy,
    303           class TransformFuncTy>
    304 ResultTy parallelTransformReduce(RangeTy &&R, ResultTy Init,
    305                                  ReduceFuncTy Reduce,
    306                                  TransformFuncTy Transform) {
    307   return parallelTransformReduce(std::begin(R), std::end(R), Init, Reduce,
    308                                  Transform);
    309 }
    310 
    311 // Parallel for-each, but with error handling.
    312 template <class RangeTy, class FuncTy>
    313 Error parallelForEachError(RangeTy &&R, FuncTy Fn) {
    314   // The transform_reduce algorithm requires that the initial value be copyable.
    315   // Error objects are uncopyable. We only need to copy initial success values,
    316   // so work around this mismatch via the C API. The C API represents success
    317   // values with a null pointer. The joinErrors discards null values and joins
    318   // multiple errors into an ErrorList.
    319   return unwrap(parallelTransformReduce(
    320       std::begin(R), std::end(R), wrap(Error::success()),
    321       [](LLVMErrorRef Lhs, LLVMErrorRef Rhs) {
    322         return wrap(joinErrors(unwrap(Lhs), unwrap(Rhs)));
    323       },
    324       [&Fn](auto &&V) { return wrap(Fn(V)); }));
    325 }
    326 
    327 } // namespace llvm
    328 
    329 #endif // LLVM_SUPPORT_PARALLEL_H
    330