$darkmode
Eigen  5.0.1-dev
CoreThreadPoolDevice.h
1 // This file is part of Eigen, a lightweight C++ template library
2 // for linear algebra.
3 //
4 // Copyright (C) 2023 Charlie Schlosser <cs.schlosser@gmail.com>
5 //
6 // This Source Code Form is subject to the terms of the Mozilla
7 // Public License v. 2.0. If a copy of the MPL was not distributed
8 // with this file, You can obtain one at http://mozilla.org/MPL/2.0/.
9 
10 #ifndef EIGEN_CORE_THREAD_POOL_DEVICE_H
11 #define EIGEN_CORE_THREAD_POOL_DEVICE_H
12 
13 namespace Eigen {
14 
15 // CoreThreadPoolDevice provides an easy-to-understand Device for parallelizing Eigen Core expressions with
16 // Threadpool. Expressions are recursively split evenly until the evaluation cost is less than the threshold for
17 // delegating the task to a thread.
18 /*
19                  a
20                /   \
21               /     \
22              /       \
23             /         \
24            /           \
25           /             \
26          /               \
27         a                 e
28        / \               / \
29       /   \             /   \
30      /     \           /     \
31     a       c         e       g
32    / \     / \       / \     / \
33   /   \   /   \     /   \   /   \
34  a     b c     d   e     f g     h
35 */
36 // Each task descends the binary tree to the left, delegates the right task to a new thread, and continues to the
37 // left. This ensures that work is evenly distributed to the thread pool as quickly as possible and minimizes the number
38 // of tasks created during the evaluation. Consider an expression that is divided into 8 chunks. The
39 // primary task 'a' creates tasks 'e' 'c' and 'b', and executes its portion of the expression at the bottom of the
40 // tree. Likewise, task 'e' creates tasks 'g' and 'f', and executes its portion of the expression.
41 
42 struct CoreThreadPoolDevice {
43  using Task = std::function<void()>;
44  EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE CoreThreadPoolDevice(ThreadPool& pool, float threadCostThreshold = 3e-5f)
45  : m_pool(pool) {
46  eigen_assert(threadCostThreshold >= 0.0f && "threadCostThreshold must be non-negative");
47  m_costFactor = threadCostThreshold;
48  }
49 
50  template <int PacketSize>
51  EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE int calculateLevels(Index size, float cost) const {
52  eigen_assert(cost >= 0.0f && "cost must be non-negative");
53  Index numOps = size / PacketSize;
54  int actualThreads = numOps < m_pool.NumThreads() ? static_cast<int>(numOps) : m_pool.NumThreads();
55  float totalCost = static_cast<float>(numOps) * cost;
56  float idealThreads = totalCost * m_costFactor;
57  if (idealThreads < static_cast<float>(actualThreads)) {
58  idealThreads = numext::maxi(idealThreads, 1.0f);
59  actualThreads = numext::mini(actualThreads, static_cast<int>(idealThreads));
60  }
61  int maxLevel = internal::log2_ceil(actualThreads);
62  return maxLevel;
63  }
64 
65 // MSVC does not like inlining parallelForImpl
66 #if EIGEN_COMP_MSVC && !EIGEN_COMP_CLANG
67 #define EIGEN_PARALLEL_FOR_INLINE
68 #else
69 #define EIGEN_PARALLEL_FOR_INLINE EIGEN_STRONG_INLINE
70 #endif
71 
72  template <typename UnaryFunctor, int PacketSize>
73  EIGEN_DEVICE_FUNC EIGEN_PARALLEL_FOR_INLINE void parallelForImpl(Index begin, Index end, UnaryFunctor& f,
74  Barrier& barrier, int level) {
75  while (level > 0) {
76  level--;
77  Index size = end - begin;
78  eigen_assert(size % PacketSize == 0 && "this function assumes size is a multiple of PacketSize");
79  Index mid = begin + numext::round_down(size >> 1, PacketSize);
80  Task right = [this, mid, end, &f, &barrier, level]() {
81  parallelForImpl<UnaryFunctor, PacketSize>(mid, end, f, barrier, level);
82  };
83  m_pool.Schedule(std::move(right));
84  end = mid;
85  }
86  for (Index i = begin; i < end; i += PacketSize) f(i);
87  barrier.Notify();
88  }
89 
90  template <typename BinaryFunctor, int PacketSize>
91  EIGEN_DEVICE_FUNC EIGEN_PARALLEL_FOR_INLINE void parallelForImpl(Index outerBegin, Index outerEnd, Index innerBegin,
92  Index innerEnd, BinaryFunctor& f, Barrier& barrier,
93  int level) {
94  while (level > 0) {
95  level--;
96  Index outerSize = outerEnd - outerBegin;
97  if (outerSize > 1) {
98  Index outerMid = outerBegin + (outerSize >> 1);
99  Task right = [this, &f, &barrier, outerMid, outerEnd, innerBegin, innerEnd, level]() {
100  parallelForImpl<BinaryFunctor, PacketSize>(outerMid, outerEnd, innerBegin, innerEnd, f, barrier, level);
101  };
102  m_pool.Schedule(std::move(right));
103  outerEnd = outerMid;
104  } else {
105  Index innerSize = innerEnd - innerBegin;
106  eigen_assert(innerSize % PacketSize == 0 && "this function assumes innerSize is a multiple of PacketSize");
107  Index innerMid = innerBegin + numext::round_down(innerSize >> 1, PacketSize);
108  Task right = [this, &f, &barrier, outerBegin, outerEnd, innerMid, innerEnd, level]() {
109  parallelForImpl<BinaryFunctor, PacketSize>(outerBegin, outerEnd, innerMid, innerEnd, f, barrier, level);
110  };
111  m_pool.Schedule(std::move(right));
112  innerEnd = innerMid;
113  }
114  }
115  for (Index outer = outerBegin; outer < outerEnd; outer++)
116  for (Index inner = innerBegin; inner < innerEnd; inner += PacketSize) f(outer, inner);
117  barrier.Notify();
118  }
119 
120 #undef EIGEN_PARALLEL_FOR_INLINE
121 
122  template <typename UnaryFunctor, int PacketSize>
123  EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE void parallelFor(Index begin, Index end, UnaryFunctor& f, float cost) {
124  Index size = end - begin;
125  int maxLevel = calculateLevels<PacketSize>(size, cost);
126  Barrier barrier(1 << maxLevel);
127  parallelForImpl<UnaryFunctor, PacketSize>(begin, end, f, barrier, maxLevel);
128  barrier.Wait();
129  }
130 
131  template <typename BinaryFunctor, int PacketSize>
132  EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE void parallelFor(Index outerBegin, Index outerEnd, Index innerBegin,
133  Index innerEnd, BinaryFunctor& f, float cost) {
134  Index outerSize = outerEnd - outerBegin;
135  Index innerSize = innerEnd - innerBegin;
136  Index size = outerSize * innerSize;
137  int maxLevel = calculateLevels<PacketSize>(size, cost);
138  Barrier barrier(1 << maxLevel);
139  parallelForImpl<BinaryFunctor, PacketSize>(outerBegin, outerEnd, innerBegin, innerEnd, f, barrier, maxLevel);
140  barrier.Wait();
141  }
142 
143  ThreadPool& m_pool;
144  // costFactor is the cost of delegating a task to a thread
145  // the inverse is used to avoid a floating point division
146  float m_costFactor;
147 };
148 
149 // specialization of coefficient-wise assignment loops for CoreThreadPoolDevice
150 
151 namespace internal {
152 
153 #ifdef EIGEN_PARSED_BY_DOXYGEN
154 struct Kernel;
155 #endif
156 
157 template <typename Kernel>
158 struct cost_helper {
159  using SrcEvaluatorType = typename Kernel::SrcEvaluatorType;
160  using DstEvaluatorType = typename Kernel::DstEvaluatorType;
161  using SrcXprType = typename SrcEvaluatorType::XprType;
162  using DstXprType = typename DstEvaluatorType::XprType;
163  static constexpr Index Cost = functor_cost<SrcXprType>::Cost + functor_cost<DstXprType>::Cost;
164 };
165 
166 template <typename Kernel>
167 struct dense_assignment_loop_with_device<Kernel, CoreThreadPoolDevice, DefaultTraversal, NoUnrolling> {
168  static constexpr Index XprEvaluationCost = cost_helper<Kernel>::Cost;
169  struct AssignmentFunctor : public Kernel {
170  EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE AssignmentFunctor(Kernel& kernel) : Kernel(kernel) {}
171  EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE void operator()(Index outer, Index inner) {
172  this->assignCoeffByOuterInner(outer, inner);
173  }
174  };
175 
176  static EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE void run(Kernel& kernel, CoreThreadPoolDevice& device) {
177  const Index innerSize = kernel.innerSize();
178  const Index outerSize = kernel.outerSize();
179  constexpr float cost = static_cast<float>(XprEvaluationCost);
180  AssignmentFunctor functor(kernel);
181  device.template parallelFor<AssignmentFunctor, 1>(0, outerSize, 0, innerSize, functor, cost);
182  }
183 };
184 
185 template <typename Kernel>
186 struct dense_assignment_loop_with_device<Kernel, CoreThreadPoolDevice, DefaultTraversal, InnerUnrolling> {
187  using DstXprType = typename Kernel::DstEvaluatorType::XprType;
188  static constexpr Index XprEvaluationCost = cost_helper<Kernel>::Cost, InnerSize = DstXprType::InnerSizeAtCompileTime;
189  struct AssignmentFunctor : public Kernel {
190  EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE AssignmentFunctor(Kernel& kernel) : Kernel(kernel) {}
191  EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE void operator()(Index outer) {
192  copy_using_evaluator_DefaultTraversal_InnerUnrolling<Kernel, 0, InnerSize>::run(*this, outer);
193  }
194  };
195  static EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE void run(Kernel& kernel, CoreThreadPoolDevice& device) {
196  const Index outerSize = kernel.outerSize();
197  AssignmentFunctor functor(kernel);
198  constexpr float cost = static_cast<float>(XprEvaluationCost) * static_cast<float>(InnerSize);
199  device.template parallelFor<AssignmentFunctor, 1>(0, outerSize, functor, cost);
200  }
201 };
202 
203 template <typename Kernel>
204 struct dense_assignment_loop_with_device<Kernel, CoreThreadPoolDevice, InnerVectorizedTraversal, NoUnrolling> {
205  using PacketType = typename Kernel::PacketType;
206  static constexpr Index XprEvaluationCost = cost_helper<Kernel>::Cost, PacketSize = unpacket_traits<PacketType>::size,
207  SrcAlignment = Kernel::AssignmentTraits::SrcAlignment,
208  DstAlignment = Kernel::AssignmentTraits::DstAlignment;
209  struct AssignmentFunctor : public Kernel {
210  EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE AssignmentFunctor(Kernel& kernel) : Kernel(kernel) {}
211  EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE void operator()(Index outer, Index inner) {
212  this->template assignPacketByOuterInner<Unaligned, Unaligned, PacketType>(outer, inner);
213  }
214  };
215  static EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE void run(Kernel& kernel, CoreThreadPoolDevice& device) {
216  const Index innerSize = kernel.innerSize();
217  const Index outerSize = kernel.outerSize();
218  const float cost = static_cast<float>(XprEvaluationCost) * static_cast<float>(innerSize);
219  AssignmentFunctor functor(kernel);
220  device.template parallelFor<AssignmentFunctor, PacketSize>(0, outerSize, 0, innerSize, functor, cost);
221  }
222 };
223 
224 template <typename Kernel>
225 struct dense_assignment_loop_with_device<Kernel, CoreThreadPoolDevice, InnerVectorizedTraversal, InnerUnrolling> {
226  using PacketType = typename Kernel::PacketType;
227  using DstXprType = typename Kernel::DstEvaluatorType::XprType;
228  static constexpr Index XprEvaluationCost = cost_helper<Kernel>::Cost, PacketSize = unpacket_traits<PacketType>::size,
229  SrcAlignment = Kernel::AssignmentTraits::SrcAlignment,
230  DstAlignment = Kernel::AssignmentTraits::DstAlignment,
231  InnerSize = DstXprType::InnerSizeAtCompileTime;
232  struct AssignmentFunctor : public Kernel {
233  EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE AssignmentFunctor(Kernel& kernel) : Kernel(kernel) {}
234  EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE void operator()(Index outer) {
235  copy_using_evaluator_innervec_InnerUnrolling<Kernel, 0, InnerSize, SrcAlignment, DstAlignment>::run(*this, outer);
236  }
237  };
238  static EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE void run(Kernel& kernel, CoreThreadPoolDevice& device) {
239  const Index outerSize = kernel.outerSize();
240  constexpr float cost = static_cast<float>(XprEvaluationCost) * static_cast<float>(InnerSize);
241  AssignmentFunctor functor(kernel);
242  device.template parallelFor<AssignmentFunctor, PacketSize>(0, outerSize, functor, cost);
243  }
244 };
245 
246 template <typename Kernel>
247 struct dense_assignment_loop_with_device<Kernel, CoreThreadPoolDevice, SliceVectorizedTraversal, NoUnrolling> {
248  using Scalar = typename Kernel::Scalar;
249  using PacketType = typename Kernel::PacketType;
250  static constexpr Index XprEvaluationCost = cost_helper<Kernel>::Cost, PacketSize = unpacket_traits<PacketType>::size;
251  struct PacketAssignmentFunctor : public Kernel {
252  EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE PacketAssignmentFunctor(Kernel& kernel) : Kernel(kernel) {}
253  EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE void operator()(Index outer, Index inner) {
254  this->template assignPacketByOuterInner<Unaligned, Unaligned, PacketType>(outer, inner);
255  }
256  };
257  struct ScalarAssignmentFunctor : public Kernel {
258  EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE ScalarAssignmentFunctor(Kernel& kernel) : Kernel(kernel) {}
259  EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE void operator()(Index outer) {
260  const Index innerSize = this->innerSize();
261  const Index packetAccessSize = numext::round_down(innerSize, PacketSize);
262  for (Index inner = packetAccessSize; inner < innerSize; inner++) this->assignCoeffByOuterInner(outer, inner);
263  }
264  };
265  static EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE void run(Kernel& kernel, CoreThreadPoolDevice& device) {
266  const Index outerSize = kernel.outerSize();
267  const Index innerSize = kernel.innerSize();
268  const Index packetAccessSize = numext::round_down(innerSize, PacketSize);
269  constexpr float packetCost = static_cast<float>(XprEvaluationCost);
270  const float scalarCost = static_cast<float>(XprEvaluationCost) * static_cast<float>(innerSize - packetAccessSize);
271  PacketAssignmentFunctor packetFunctor(kernel);
272  ScalarAssignmentFunctor scalarFunctor(kernel);
273  device.template parallelFor<PacketAssignmentFunctor, PacketSize>(0, outerSize, 0, packetAccessSize, packetFunctor,
274  packetCost);
275  device.template parallelFor<ScalarAssignmentFunctor, 1>(0, outerSize, scalarFunctor, scalarCost);
276  };
277 };
278 
279 template <typename Kernel>
280 struct dense_assignment_loop_with_device<Kernel, CoreThreadPoolDevice, LinearTraversal, NoUnrolling> {
281  static constexpr Index XprEvaluationCost = cost_helper<Kernel>::Cost;
282  struct AssignmentFunctor : public Kernel {
283  EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE AssignmentFunctor(Kernel& kernel) : Kernel(kernel) {}
284  EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE void operator()(Index index) { this->assignCoeff(index); }
285  };
286  static EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE void run(Kernel& kernel, CoreThreadPoolDevice& device) {
287  const Index size = kernel.size();
288  constexpr float cost = static_cast<float>(XprEvaluationCost);
289  AssignmentFunctor functor(kernel);
290  device.template parallelFor<AssignmentFunctor, 1>(0, size, functor, cost);
291  }
292 };
293 
294 template <typename Kernel>
295 struct dense_assignment_loop_with_device<Kernel, CoreThreadPoolDevice, LinearVectorizedTraversal, NoUnrolling> {
296  using Scalar = typename Kernel::Scalar;
297  using PacketType = typename Kernel::PacketType;
298  static constexpr Index XprEvaluationCost = cost_helper<Kernel>::Cost,
299  RequestedAlignment = Kernel::AssignmentTraits::LinearRequiredAlignment,
300  PacketSize = unpacket_traits<PacketType>::size,
301  DstIsAligned = Kernel::AssignmentTraits::DstAlignment >= RequestedAlignment,
302  DstAlignment = packet_traits<Scalar>::AlignedOnScalar ? RequestedAlignment
303  : Kernel::AssignmentTraits::DstAlignment,
304  SrcAlignment = Kernel::AssignmentTraits::JointAlignment;
305  struct AssignmentFunctor : public Kernel {
306  EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE AssignmentFunctor(Kernel& kernel) : Kernel(kernel) {}
307  EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE void operator()(Index index) {
308  this->template assignPacket<DstAlignment, SrcAlignment, PacketType>(index);
309  }
310  };
311  static constexpr bool UsePacketSegment = Kernel::AssignmentTraits::UsePacketSegment;
312  using head_loop =
313  unaligned_dense_assignment_loop<PacketType, DstAlignment, SrcAlignment, UsePacketSegment, DstIsAligned>;
314  using tail_loop = unaligned_dense_assignment_loop<PacketType, DstAlignment, SrcAlignment, UsePacketSegment, false>;
315 
316  static EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE void run(Kernel& kernel, CoreThreadPoolDevice& device) {
317  const Index size = kernel.size();
318  const Index alignedStart =
319  DstIsAligned ? 0 : internal::first_aligned<RequestedAlignment>(kernel.dstDataPtr(), size);
320  const Index alignedEnd = alignedStart + numext::round_down(size - alignedStart, PacketSize);
321 
322  head_loop::run(kernel, 0, alignedStart);
323 
324  constexpr float cost = static_cast<float>(XprEvaluationCost);
325  AssignmentFunctor functor(kernel);
326  device.template parallelFor<AssignmentFunctor, PacketSize>(alignedStart, alignedEnd, functor, cost);
327 
328  tail_loop::run(kernel, alignedEnd, size);
329  }
330 };
331 
332 } // namespace internal
333 
334 } // namespace Eigen
335 
336 #endif // EIGEN_CORE_THREAD_POOL_DEVICE_H
static constexpr lastp1_t end
Definition: IndexedViewHelper.h:79
Namespace containing all symbols from the Eigen library.
Definition: B01_Experimental.dox:1
EIGEN_DEFAULT_DENSE_INDEX_TYPE Index
The Index type as used for the API.
Definition: Meta.h:82