$darkmode
Eigen  5.0.1-dev
ForkJoin.h
1 // This file is part of Eigen, a lightweight C++ template library
2 // for linear algebra.
3 //
4 // Copyright (C) 2025 Weiwei Kong <weiweikong@google.com>
5 //
6 // This Source Code Form is subject to the terms of the Mozilla
7 // Public License v. 2.0. If a copy of the MPL was not distributed
8 // with this file, You can obtain one at http://mozilla.org/MPL/2.0/.
9 
10 #ifndef EIGEN_THREADPOOL_FORKJOIN_H
11 #define EIGEN_THREADPOOL_FORKJOIN_H
12 
13 // IWYU pragma: private
14 #include "./InternalHeaderCheck.h"
15 
16 namespace Eigen {
17 
18 // ForkJoinScheduler provides implementations of various non-blocking ParallelFor algorithms for unary
19 // and binary parallel tasks. More specifically, the implementations follow the binary tree-based
20 // algorithm from the following paper:
21 //
22 // Lea, D. (2000, June). A java fork/join framework. *In Proceedings of the
23 // ACM 2000 conference on Java Grande* (pp. 36-43).
24 //
25 // For a given binary task function `f(i,j)` and integers `num_threads`, `granularity`, `start`, and `end`,
26 // the implemented parallel for algorithm schedules and executes at most `num_threads` of the functions
27 // from the following set in parallel (either synchronously or asynchronously):
28 //
29 // f(start,start+s_1), f(start+s_1,start+s_2), ..., f(start+s_n,end)
30 //
31 // where the chunk lengths `s_1`, `s_{j+1} - s_j`, and `end - (start+s_n)` are roughly within a factor of two of `granularity`. For a unary
32 // task function `g(k)`, the same operation is applied with
33 //
34 // f = [&](Index i, Index j) { for (Index k = i; k < j; ++k) g(k); };
35 //
36 // Note that the parameter `granularity` should be tuned by the user based on the trade-off of running the
37 // given task function sequentially vs. scheduling individual tasks in parallel. An example of a partially
38 // tuned `granularity` is in `Eigen::CoreThreadPoolDevice::parallelFor(...)` where the template
39 // parameter `PacketSize` and float input `cost` are used to indirectly compute a granularity level for a
40 // given task function.
41 //
42 // Example usage #1 (synchronous):
43 // ```
44 // ThreadPool thread_pool(num_threads);
45 // ForkJoinScheduler::ParallelFor(0, num_tasks, granularity, std::move(parallel_task), &thread_pool);
46 // ```
47 //
48 // Example usage #2 (executing multiple tasks asynchronously, each one parallelized with ParallelFor):
49 // ```
50 // ThreadPool thread_pool(num_threads);
51 // Barrier barrier(num_async_calls);
52 // auto done = [&](){ barrier.Notify(); };
53 // for (Index k=0; k<num_async_calls; ++k) {
54 // ForkJoinScheduler::ParallelForAsync(task_start[k], task_end[k], granularity[k], parallel_task[k], done,
55 // &thread_pool);
56 // }
57 // barrier.Wait();
58 // ```
59 class ForkJoinScheduler {
60  public:
61  // Runs `do_func` asynchronously for the range [start, end) with a specified
62  // granularity. `do_func` should be of type `std::function<void(Index,
63  // Index)`. `done()` is called exactly once after all tasks have been executed.
64  template <typename DoFnType, typename DoneFnType, typename ThreadPoolEnv>
65  static void ParallelForAsync(Index start, Index end, Index granularity, DoFnType&& do_func, DoneFnType&& done,
66  ThreadPoolTempl<ThreadPoolEnv>* thread_pool) {
67  if (start >= end) {
68  done();
69  return;
70  }
71  thread_pool->Schedule([start, end, granularity, thread_pool, do_func = std::forward<DoFnType>(do_func),
72  done = std::forward<DoneFnType>(done)]() {
73  RunParallelFor(start, end, granularity, do_func, thread_pool);
74  done();
75  });
76  }
77 
78  // Synchronous variant of ParallelForAsync.
79  // WARNING: Making nested calls to `ParallelFor`, e.g., calling `ParallelFor` inside a task passed into another
80  // `ParallelFor` call, may lead to deadlocks due to how task stealing is implemented.
81  template <typename DoFnType, typename ThreadPoolEnv>
82  static void ParallelFor(Index start, Index end, Index granularity, DoFnType&& do_func,
83  ThreadPoolTempl<ThreadPoolEnv>* thread_pool) {
84  if (start >= end) return;
85  Barrier barrier(1);
86  auto done = [&barrier]() { barrier.Notify(); };
87  ParallelForAsync(start, end, granularity, do_func, done, thread_pool);
88  barrier.Wait();
89  }
90 
91  private:
92  // Schedules `right_thunk`, runs `left_thunk`, and runs other tasks until `right_thunk` has finished.
93  template <typename LeftType, typename RightType, typename ThreadPoolEnv>
94  static void ForkJoin(LeftType&& left_thunk, RightType&& right_thunk, ThreadPoolTempl<ThreadPoolEnv>* thread_pool) {
95  typedef typename ThreadPoolTempl<ThreadPoolEnv>::Task Task;
96  std::atomic<bool> right_done(false);
97  auto execute_right = [&right_thunk, &right_done]() {
98  std::forward<RightType>(right_thunk)();
99  right_done.store(true, std::memory_order_release);
100  };
101  thread_pool->Schedule(execute_right);
102  std::forward<LeftType>(left_thunk)();
103  Task task;
104  while (!right_done.load(std::memory_order_acquire)) {
105  thread_pool->MaybeGetTask(&task);
106  if (task.f) task.f();
107  }
108  }
109 
110  static Index ComputeMidpoint(Index start, Index end, Index granularity) {
111  // Typical workloads choose initial values of `{start, end, granularity}` such that `start - end` and
112  // `granularity` are powers of two. Since modern processors usually implement (2^x)-way
113  // set-associative caches, we minimize the number of cache misses by choosing midpoints that are not
114  // powers of two (to avoid having two addresses in the main memory pointing to the same point in the
115  // cache). More specifically, we choose the midpoint at (roughly) the 9/16 mark.
116  const Index size = end - start;
117  const Index offset = numext::round_down(9 * (size + 1) / 16, granularity);
118  return start + offset;
119  }
120 
121  template <typename DoFnType, typename ThreadPoolEnv>
122  static void RunParallelFor(Index start, Index end, Index granularity, DoFnType&& do_func,
123  ThreadPoolTempl<ThreadPoolEnv>* thread_pool) {
124  Index mid = ComputeMidpoint(start, end, granularity);
125  if ((end - start) < granularity || mid == start || mid == end) {
126  do_func(start, end);
127  return;
128  }
129  ForkJoin([start, mid, granularity, &do_func,
130  thread_pool]() { RunParallelFor(start, mid, granularity, do_func, thread_pool); },
131  [mid, end, granularity, &do_func, thread_pool]() {
132  RunParallelFor(mid, end, granularity, do_func, thread_pool);
133  },
134  thread_pool);
135  }
136 };
137 
138 } // namespace Eigen
139 
140 #endif // EIGEN_THREADPOOL_FORKJOIN_H
static constexpr lastp1_t end
Definition: IndexedViewHelper.h:79
Namespace containing all symbols from the Eigen library.
Definition: B01_Experimental.dox:1
EIGEN_DEFAULT_DENSE_INDEX_TYPE Index
The Index type as used for the API.
Definition: Meta.h:82