1 //===- llvm/Support/Parallel.h - Parallel algorithms ----------------------===// 2 // 3 // Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions. 4 // See https://llvm.org/LICENSE.txt for license information. 5 // SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception 6 // 7 //===----------------------------------------------------------------------===// 8 9 #ifndef LLVM_SUPPORT_PARALLEL_H 10 #define LLVM_SUPPORT_PARALLEL_H 11 12 #include "llvm/ADT/STLExtras.h" 13 #include "llvm/Config/llvm-config.h" 14 #include "llvm/Support/Error.h" 15 #include "llvm/Support/MathExtras.h" 16 #include "llvm/Support/Threading.h" 17 18 #include <algorithm> 19 #include <condition_variable> 20 #include <functional> 21 #include <mutex> 22 23 namespace llvm { 24 25 namespace parallel { 26 27 // Strategy for the default executor used by the parallel routines provided by 28 // this file. It defaults to using all hardware threads and should be 29 // initialized before the first use of parallel routines. 30 extern ThreadPoolStrategy strategy; 31 32 namespace detail { 33 34 #if LLVM_ENABLE_THREADS 35 36 class Latch { 37 uint32_t Count; 38 mutable std::mutex Mutex; 39 mutable std::condition_variable Cond; 40 41 public: 42 explicit Latch(uint32_t Count = 0) : Count(Count) {} 43 ~Latch() { sync(); } 44 45 void inc() { 46 std::lock_guard<std::mutex> lock(Mutex); 47 ++Count; 48 } 49 50 void dec() { 51 std::lock_guard<std::mutex> lock(Mutex); 52 if (--Count == 0) 53 Cond.notify_all(); 54 } 55 56 void sync() const { 57 std::unique_lock<std::mutex> lock(Mutex); 58 Cond.wait(lock, [&] { return Count == 0; }); 59 } 60 }; 61 62 class TaskGroup { 63 Latch L; 64 bool Parallel; 65 66 public: 67 TaskGroup(); 68 ~TaskGroup(); 69 70 void spawn(std::function<void()> f); 71 72 void sync() const { L.sync(); } 73 }; 74 75 const ptrdiff_t MinParallelSize = 1024; 76 77 /// Inclusive median. 78 template <class RandomAccessIterator, class Comparator> 79 RandomAccessIterator medianOf3(RandomAccessIterator Start, 80 RandomAccessIterator End, 81 const Comparator &Comp) { 82 RandomAccessIterator Mid = Start + (std::distance(Start, End) / 2); 83 return Comp(*Start, *(End - 1)) 84 ? (Comp(*Mid, *(End - 1)) ? (Comp(*Start, *Mid) ? Mid : Start) 85 : End - 1) 86 : (Comp(*Mid, *Start) ? (Comp(*(End - 1), *Mid) ? Mid : End - 1) 87 : Start); 88 } 89 90 template <class RandomAccessIterator, class Comparator> 91 void parallel_quick_sort(RandomAccessIterator Start, RandomAccessIterator End, 92 const Comparator &Comp, TaskGroup &TG, size_t Depth) { 93 // Do a sequential sort for small inputs. 94 if (std::distance(Start, End) < detail::MinParallelSize || Depth == 0) { 95 llvm::sort(Start, End, Comp); 96 return; 97 } 98 99 // Partition. 100 auto Pivot = medianOf3(Start, End, Comp); 101 // Move Pivot to End. 102 std::swap(*(End - 1), *Pivot); 103 Pivot = std::partition(Start, End - 1, [&Comp, End](decltype(*Start) V) { 104 return Comp(V, *(End - 1)); 105 }); 106 // Move Pivot to middle of partition. 107 std::swap(*Pivot, *(End - 1)); 108 109 // Recurse. 110 TG.spawn([=, &Comp, &TG] { 111 parallel_quick_sort(Start, Pivot, Comp, TG, Depth - 1); 112 }); 113 parallel_quick_sort(Pivot + 1, End, Comp, TG, Depth - 1); 114 } 115 116 template <class RandomAccessIterator, class Comparator> 117 void parallel_sort(RandomAccessIterator Start, RandomAccessIterator End, 118 const Comparator &Comp) { 119 TaskGroup TG; 120 parallel_quick_sort(Start, End, Comp, TG, 121 llvm::Log2_64(std::distance(Start, End)) + 1); 122 } 123 124 // TaskGroup has a relatively high overhead, so we want to reduce 125 // the number of spawn() calls. We'll create up to 1024 tasks here. 126 // (Note that 1024 is an arbitrary number. This code probably needs 127 // improving to take the number of available cores into account.) 128 enum { MaxTasksPerGroup = 1024 }; 129 130 template <class IterTy, class FuncTy> 131 void parallel_for_each(IterTy Begin, IterTy End, FuncTy Fn) { 132 // If we have zero or one items, then do not incur the overhead of spinning up 133 // a task group. They are surprisingly expensive, and because they do not 134 // support nested parallelism, a single entry task group can block parallel 135 // execution underneath them. 136 auto NumItems = std::distance(Begin, End); 137 if (NumItems <= 1) { 138 if (NumItems) 139 Fn(*Begin); 140 return; 141 } 142 143 // Limit the number of tasks to MaxTasksPerGroup to limit job scheduling 144 // overhead on large inputs. 145 ptrdiff_t TaskSize = NumItems / MaxTasksPerGroup; 146 if (TaskSize == 0) 147 TaskSize = 1; 148 149 TaskGroup TG; 150 while (TaskSize < std::distance(Begin, End)) { 151 TG.spawn([=, &Fn] { std::for_each(Begin, Begin + TaskSize, Fn); }); 152 Begin += TaskSize; 153 } 154 std::for_each(Begin, End, Fn); 155 } 156 157 template <class IndexTy, class FuncTy> 158 void parallel_for_each_n(IndexTy Begin, IndexTy End, FuncTy Fn) { 159 // If we have zero or one items, then do not incur the overhead of spinning up 160 // a task group. They are surprisingly expensive, and because they do not 161 // support nested parallelism, a single entry task group can block parallel 162 // execution underneath them. 163 auto NumItems = End - Begin; 164 if (NumItems <= 1) { 165 if (NumItems) 166 Fn(Begin); 167 return; 168 } 169 170 // Limit the number of tasks to MaxTasksPerGroup to limit job scheduling 171 // overhead on large inputs. 172 ptrdiff_t TaskSize = NumItems / MaxTasksPerGroup; 173 if (TaskSize == 0) 174 TaskSize = 1; 175 176 TaskGroup TG; 177 IndexTy I = Begin; 178 for (; I + TaskSize < End; I += TaskSize) { 179 TG.spawn([=, &Fn] { 180 for (IndexTy J = I, E = I + TaskSize; J != E; ++J) 181 Fn(J); 182 }); 183 } 184 for (IndexTy J = I; J < End; ++J) 185 Fn(J); 186 } 187 188 template <class IterTy, class ResultTy, class ReduceFuncTy, 189 class TransformFuncTy> 190 ResultTy parallel_transform_reduce(IterTy Begin, IterTy End, ResultTy Init, 191 ReduceFuncTy Reduce, 192 TransformFuncTy Transform) { 193 // Limit the number of tasks to MaxTasksPerGroup to limit job scheduling 194 // overhead on large inputs. 195 size_t NumInputs = std::distance(Begin, End); 196 if (NumInputs == 0) 197 return std::move(Init); 198 size_t NumTasks = std::min(static_cast<size_t>(MaxTasksPerGroup), NumInputs); 199 std::vector<ResultTy> Results(NumTasks, Init); 200 { 201 // Each task processes either TaskSize or TaskSize+1 inputs. Any inputs 202 // remaining after dividing them equally amongst tasks are distributed as 203 // one extra input over the first tasks. 204 TaskGroup TG; 205 size_t TaskSize = NumInputs / NumTasks; 206 size_t RemainingInputs = NumInputs % NumTasks; 207 IterTy TBegin = Begin; 208 for (size_t TaskId = 0; TaskId < NumTasks; ++TaskId) { 209 IterTy TEnd = TBegin + TaskSize + (TaskId < RemainingInputs ? 1 : 0); 210 TG.spawn([=, &Transform, &Reduce, &Results] { 211 // Reduce the result of transformation eagerly within each task. 212 ResultTy R = Init; 213 for (IterTy It = TBegin; It != TEnd; ++It) 214 R = Reduce(R, Transform(*It)); 215 Results[TaskId] = R; 216 }); 217 TBegin = TEnd; 218 } 219 assert(TBegin == End); 220 } 221 222 // Do a final reduction. There are at most 1024 tasks, so this only adds 223 // constant single-threaded overhead for large inputs. Hopefully most 224 // reductions are cheaper than the transformation. 225 ResultTy FinalResult = std::move(Results.front()); 226 for (ResultTy &PartialResult : 227 makeMutableArrayRef(Results.data() + 1, Results.size() - 1)) 228 FinalResult = Reduce(FinalResult, std::move(PartialResult)); 229 return std::move(FinalResult); 230 } 231 232 #endif 233 234 } // namespace detail 235 } // namespace parallel 236 237 template <class RandomAccessIterator, 238 class Comparator = std::less< 239 typename std::iterator_traits<RandomAccessIterator>::value_type>> 240 void parallelSort(RandomAccessIterator Start, RandomAccessIterator End, 241 const Comparator &Comp = Comparator()) { 242 #if LLVM_ENABLE_THREADS 243 if (parallel::strategy.ThreadsRequested != 1) { 244 parallel::detail::parallel_sort(Start, End, Comp); 245 return; 246 } 247 #endif 248 llvm::sort(Start, End, Comp); 249 } 250 251 template <class IterTy, class FuncTy> 252 void parallelForEach(IterTy Begin, IterTy End, FuncTy Fn) { 253 #if LLVM_ENABLE_THREADS 254 if (parallel::strategy.ThreadsRequested != 1) { 255 parallel::detail::parallel_for_each(Begin, End, Fn); 256 return; 257 } 258 #endif 259 std::for_each(Begin, End, Fn); 260 } 261 262 template <class FuncTy> 263 void parallelForEachN(size_t Begin, size_t End, FuncTy Fn) { 264 #if LLVM_ENABLE_THREADS 265 if (parallel::strategy.ThreadsRequested != 1) { 266 parallel::detail::parallel_for_each_n(Begin, End, Fn); 267 return; 268 } 269 #endif 270 for (size_t I = Begin; I != End; ++I) 271 Fn(I); 272 } 273 274 template <class IterTy, class ResultTy, class ReduceFuncTy, 275 class TransformFuncTy> 276 ResultTy parallelTransformReduce(IterTy Begin, IterTy End, ResultTy Init, 277 ReduceFuncTy Reduce, 278 TransformFuncTy Transform) { 279 #if LLVM_ENABLE_THREADS 280 if (parallel::strategy.ThreadsRequested != 1) { 281 return parallel::detail::parallel_transform_reduce(Begin, End, Init, Reduce, 282 Transform); 283 } 284 #endif 285 for (IterTy I = Begin; I != End; ++I) 286 Init = Reduce(std::move(Init), Transform(*I)); 287 return std::move(Init); 288 } 289 290 // Range wrappers. 291 template <class RangeTy, 292 class Comparator = std::less<decltype(*std::begin(RangeTy()))>> 293 void parallelSort(RangeTy &&R, const Comparator &Comp = Comparator()) { 294 parallelSort(std::begin(R), std::end(R), Comp); 295 } 296 297 template <class RangeTy, class FuncTy> 298 void parallelForEach(RangeTy &&R, FuncTy Fn) { 299 parallelForEach(std::begin(R), std::end(R), Fn); 300 } 301 302 template <class RangeTy, class ResultTy, class ReduceFuncTy, 303 class TransformFuncTy> 304 ResultTy parallelTransformReduce(RangeTy &&R, ResultTy Init, 305 ReduceFuncTy Reduce, 306 TransformFuncTy Transform) { 307 return parallelTransformReduce(std::begin(R), std::end(R), Init, Reduce, 308 Transform); 309 } 310 311 // Parallel for-each, but with error handling. 312 template <class RangeTy, class FuncTy> 313 Error parallelForEachError(RangeTy &&R, FuncTy Fn) { 314 // The transform_reduce algorithm requires that the initial value be copyable. 315 // Error objects are uncopyable. We only need to copy initial success values, 316 // so work around this mismatch via the C API. The C API represents success 317 // values with a null pointer. The joinErrors discards null values and joins 318 // multiple errors into an ErrorList. 319 return unwrap(parallelTransformReduce( 320 std::begin(R), std::end(R), wrap(Error::success()), 321 [](LLVMErrorRef Lhs, LLVMErrorRef Rhs) { 322 return wrap(joinErrors(unwrap(Lhs), unwrap(Rhs))); 323 }, 324 [&Fn](auto &&V) { return wrap(Fn(V)); })); 325 } 326 327 } // namespace llvm 328 329 #endif // LLVM_SUPPORT_PARALLEL_H 330