$darkmode
Eigen-unsupported  5.0.1-dev
TensorDeviceThreadPool.h
1 // This file is part of Eigen, a lightweight C++ template library
2 // for linear algebra.
3 //
4 // Copyright (C) 2014 Benoit Steiner <benoit.steiner.goog@gmail.com>
5 //
6 // This Source Code Form is subject to the terms of the Mozilla
7 // Public License v. 2.0. If a copy of the MPL was not distributed
8 // with this file, You can obtain one at http://mozilla.org/MPL/2.0/.
9 
10 #if defined(EIGEN_USE_THREADS) && !defined(EIGEN_CXX11_TENSOR_TENSOR_DEVICE_THREAD_POOL_H)
11 #define EIGEN_CXX11_TENSOR_TENSOR_DEVICE_THREAD_POOL_H
12 
13 // IWYU pragma: private
14 #include "./InternalHeaderCheck.h"
15 
16 namespace Eigen {
17 
18 // An abstract interface to a device specific memory allocator.
19 class Allocator {
20  public:
21  virtual ~Allocator() {}
22  virtual void* allocate(size_t num_bytes) const = 0;
23  virtual void deallocate(void* buffer) const = 0;
24 };
25 
26 // Build a thread pool device on top the an existing pool of threads.
27 struct ThreadPoolDevice {
28  // The ownership of the thread pool remains with the caller.
29  ThreadPoolDevice(ThreadPoolInterface* pool, int num_cores, Allocator* allocator = nullptr)
30  : pool_(pool), num_threads_(num_cores), allocator_(allocator) {}
31 
32  EIGEN_STRONG_INLINE void* allocate(size_t num_bytes) const {
33  return allocator_ ? allocator_->allocate(num_bytes) : internal::aligned_malloc(num_bytes);
34  }
35 
36  EIGEN_STRONG_INLINE void deallocate(void* buffer) const {
37  if (allocator_) {
38  allocator_->deallocate(buffer);
39  } else {
40  internal::aligned_free(buffer);
41  }
42  }
43 
44  EIGEN_STRONG_INLINE void* allocate_temp(size_t num_bytes) const { return allocate(num_bytes); }
45 
46  EIGEN_STRONG_INLINE void deallocate_temp(void* buffer) const { deallocate(buffer); }
47 
48  template <typename Type>
49  EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE Type get(Type data) const {
50  return data;
51  }
52 
53  EIGEN_STRONG_INLINE void memcpy(void* dst, const void* src, size_t n) const {
54 #ifdef __ANDROID__
55  ::memcpy(dst, src, n);
56 #else
57  // TODO(rmlarsen): Align blocks on cache lines.
58  // We have observed that going beyond 4 threads usually just wastes
59  // CPU cycles due to the threads competing for memory bandwidth, so we
60  // statically schedule at most 4 block copies here.
61  const size_t kMinBlockSize = 32768;
62  const size_t num_threads = CostModel::numThreads(n, TensorOpCost(1.0, 1.0, 0), 4);
63  if (n <= kMinBlockSize || num_threads < 2) {
64  ::memcpy(dst, src, n);
65  } else {
66  const char* src_ptr = static_cast<const char*>(src);
67  char* dst_ptr = static_cast<char*>(dst);
68  const size_t blocksize = (n + (num_threads - 1)) / num_threads;
69  Barrier barrier(static_cast<int>(num_threads - 1));
70  // Launch the last 3 blocks on worker threads.
71  for (size_t i = 1; i < num_threads; ++i) {
72  pool_->Schedule([n, i, src_ptr, dst_ptr, blocksize, &barrier] {
73  ::memcpy(dst_ptr + i * blocksize, src_ptr + i * blocksize, numext::mini(blocksize, n - (i * blocksize)));
74  barrier.Notify();
75  });
76  }
77  // Launch the first block on the main thread.
78  ::memcpy(dst_ptr, src_ptr, blocksize);
79  barrier.Wait();
80  }
81 #endif
82  }
83  EIGEN_STRONG_INLINE void memcpyHostToDevice(void* dst, const void* src, size_t n) const { memcpy(dst, src, n); }
84  EIGEN_STRONG_INLINE void memcpyDeviceToHost(void* dst, const void* src, size_t n) const { memcpy(dst, src, n); }
85 
86  EIGEN_STRONG_INLINE void memset(void* buffer, int c, size_t n) const { ::memset(buffer, c, n); }
87 
88  template <typename T>
89  EIGEN_STRONG_INLINE void fill(T* begin, T* end, const T& value) const {
90  std::fill(begin, end, value);
91  }
92 
93  EIGEN_STRONG_INLINE int numThreads() const { return num_threads_; }
94 
95  // Number of theads available in the underlying thread pool. This number can
96  // be different from the value returned by numThreads().
97  EIGEN_STRONG_INLINE int numThreadsInPool() const { return pool_->NumThreads(); }
98 
99  EIGEN_STRONG_INLINE size_t firstLevelCacheSize() const { return l1CacheSize(); }
100 
101  EIGEN_STRONG_INLINE size_t lastLevelCacheSize() const {
102  // The l3 cache size is shared between all the cores.
103  return l3CacheSize() / num_threads_;
104  }
105 
106  EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE void synchronize() const {
107  // Nothing. Threadpool device operations are synchronous.
108  }
109 
110  EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE int majorDeviceVersion() const {
111  // Should return an enum that encodes the ISA supported by the CPU
112  return 1;
113  }
114 
115  // TODO(rmlarsen): Remove this deprecated interface when all users have been converted.
116  template <class Function, class... Args>
117  EIGEN_STRONG_INLINE void enqueueNoNotification(Function&& f, Args&&... args) const {
118  enqueue(std::forward<Function>(f), std::forward<Args>(args)...);
119  }
120 
121  template <class Function, class... Args>
122  EIGEN_STRONG_INLINE void enqueue(Function&& f, Args&&... args) const {
123 #if EIGEN_COMP_CXXVER >= 20
124  if constexpr (sizeof...(args) > 0) {
125  auto run_f = [f = std::forward<Function>(f), ... args = std::forward<Args>(args)]() { f(args...); };
126 #else
127  if (sizeof...(args) > 0) {
128  auto run_f = [f = std::forward<Function>(f), &args...]() { f(args...); };
129 #endif
130  pool_->Schedule(std::move(run_f));
131  } else {
132  pool_->Schedule(std::forward<Function>(f));
133  }
134  }
135 
136  // Returns a logical thread index between 0 and pool_->NumThreads() - 1 if
137  // called from one of the threads in pool_. Returns -1 otherwise.
138  EIGEN_STRONG_INLINE int currentThreadId() const { return pool_->CurrentThreadId(); }
139 
140  // WARNING: This function is synchronous and will block the calling thread.
141  //
142  // Synchronous parallelFor executes f with [0, n) arguments in parallel and
143  // waits for completion. F accepts a half-open interval [first, last). Block
144  // size is chosen based on the iteration cost and resulting parallel
145  // efficiency. If block_align is not nullptr, it is called to round up the
146  // block size.
147  void parallelFor(Index n, const TensorOpCost& cost, std::function<Index(Index)> block_align,
148  std::function<void(Index, Index)> f) const {
149  if (EIGEN_PREDICT_FALSE(n <= 0)) {
150  return;
151  // Compute small problems directly in the caller thread.
152  } else if (n == 1 || numThreads() == 1 || CostModel::numThreads(n, cost, static_cast<int>(numThreads())) == 1) {
153  f(0, n);
154  return;
155  }
156 
157  // Compute block size and total count of blocks.
158  ParallelForBlock block = CalculateParallelForBlock(n, cost, block_align);
159 
160  // Recursively divide size into halves until we reach block_size.
161  // Division code rounds mid to block_size, so we are guaranteed to get
162  // block_count leaves that do actual computations.
163  Barrier barrier(static_cast<unsigned int>(block.count));
164  if (block.count <= numThreads()) {
165  // Avoid a thread hop by running the root of the tree and one block on the
166  // main thread.
167  handleRange(0, n, block.size, &barrier, pool_, f);
168  } else {
169  // Execute the root in the thread pool to avoid running work on more than
170  // numThreads() threads.
171  pool_->Schedule([this, n, &block, &barrier, &f]() { handleRange(0, n, block.size, &barrier, pool_, f); });
172  }
173 
174  barrier.Wait();
175  }
176 
177  // Convenience wrapper for parallelFor that does not align blocks.
178  void parallelFor(Index n, const TensorOpCost& cost, std::function<void(Index, Index)> f) const {
179  parallelFor(n, cost, nullptr, std::move(f));
180  }
181 
182  // WARNING: This function is asynchronous and will not block the calling thread.
183  //
184  // Asynchronous parallelFor executes f with [0, n) arguments in parallel
185  // without waiting for completion. When the last block finished, it will call
186  // 'done' callback. F accepts a half-open interval [first, last). Block size
187  // is chosen based on the iteration cost and resulting parallel efficiency. If
188  // block_align is not nullptr, it is called to round up the block size.
189  void parallelForAsync(Index n, const TensorOpCost& cost, std::function<Index(Index)> block_align,
190  std::function<void(Index, Index)> f, std::function<void()> done) const {
191  // Compute small problems directly in the caller thread.
192  if (n <= 1 || numThreads() == 1 || CostModel::numThreads(n, cost, static_cast<int>(numThreads())) == 1) {
193  f(0, n);
194  done();
195  return;
196  }
197 
198  // Compute block size and total count of blocks.
199  ParallelForBlock block = CalculateParallelForBlock(n, cost, block_align);
200 
201  ParallelForAsyncContext* const ctx = new ParallelForAsyncContext(block.count, std::move(f), std::move(done));
202 
203  // Recursively divide size into halves until we reach block_size.
204  // Division code rounds mid to block_size, so we are guaranteed to get
205  // block_count leaves that do actual computations.
206  ctx->handle_range = [this, ctx, block](Index firstIdx, Index lastIdx) {
207  while (lastIdx - firstIdx > block.size) {
208  // Split into halves and schedule the second half on a different thread.
209  const Index midIdx = firstIdx + numext::div_ceil((lastIdx - firstIdx) / 2, block.size) * block.size;
210  pool_->Schedule([ctx, midIdx, lastIdx]() { ctx->handle_range(midIdx, lastIdx); });
211  lastIdx = midIdx;
212  }
213 
214  // Single block or less, execute directly.
215  ctx->f(firstIdx, lastIdx);
216 
217  // Delete async context if it was the last block.
218  if (ctx->count.fetch_sub(1) == 1) delete ctx;
219  };
220 
221  if (block.count <= numThreads()) {
222  // Avoid a thread hop by running the root of the tree and one block on the
223  // main thread.
224  ctx->handle_range(0, n);
225  } else {
226  // Execute the root in the thread pool to avoid running work on more than
227  // numThreads() threads.
228  pool_->Schedule([ctx, n]() { ctx->handle_range(0, n); });
229  }
230  }
231 
232  // Convenience wrapper for parallelForAsync that does not align blocks.
233  void parallelForAsync(Index n, const TensorOpCost& cost, std::function<void(Index, Index)> f,
234  std::function<void()> done) const {
235  parallelForAsync(n, cost, nullptr, std::move(f), std::move(done));
236  }
237 
238  // Thread pool accessor.
239  ThreadPoolInterface* getPool() const { return pool_; }
240 
241  // Allocator accessor.
242  Allocator* allocator() const { return allocator_; }
243 
244  private:
245  typedef TensorCostModel<ThreadPoolDevice> CostModel;
246 
247  static void handleRange(Index firstIdx, Index lastIdx, Index granularity, Barrier* barrier, ThreadPoolInterface* pool,
248  const std::function<void(Index, Index)>& f) {
249  while (lastIdx - firstIdx > granularity) {
250  // Split into halves and schedule the second half on a different thread.
251  const Index midIdx = firstIdx + numext::div_ceil((lastIdx - firstIdx) / 2, granularity) * granularity;
252  pool->Schedule([=, &f]() { handleRange(midIdx, lastIdx, granularity, barrier, pool, f); });
253  lastIdx = midIdx;
254  }
255  // Single block or less, execute directly.
256  f(firstIdx, lastIdx);
257  barrier->Notify();
258  }
259 
260  // For parallelForAsync we must keep passed in closures on the heap, and
261  // delete them only after `done` callback finished.
262  struct ParallelForAsyncContext {
263  ParallelForAsyncContext(Index block_count, std::function<void(Index, Index)> block_f,
264  std::function<void()> done_callback)
265  : count(block_count), f(std::move(block_f)), done(std::move(done_callback)) {}
266  ~ParallelForAsyncContext() { done(); }
267 
268  std::atomic<Index> count;
269  std::function<void(Index, Index)> f;
270  std::function<void()> done;
271 
272  std::function<void(Index, Index)> handle_range;
273  };
274 
275  struct ParallelForBlock {
276  Index size; // block size
277  Index count; // number of blocks
278  };
279 
280  // Calculates block size based on (1) the iteration cost and (2) parallel
281  // efficiency. We want blocks to be not too small to mitigate parallelization
282  // overheads; not too large to mitigate tail effect and potential load
283  // imbalance and we also want number of blocks to be evenly dividable across
284  // threads.
285  ParallelForBlock CalculateParallelForBlock(const Index n, const TensorOpCost& cost,
286  std::function<Index(Index)> block_align) const {
287  const double block_size_f = 1.0 / CostModel::taskSize(1, cost);
288  const Index max_oversharding_factor = 4;
289  Index block_size = numext::mini(
290  n, numext::maxi<Index>(numext::div_ceil<Index>(n, max_oversharding_factor * numThreads()), block_size_f));
291  const Index max_block_size = numext::mini(n, 2 * block_size);
292 
293  if (block_align) {
294  Index new_block_size = block_align(block_size);
295  eigen_assert(new_block_size >= block_size);
296  block_size = numext::mini(n, new_block_size);
297  }
298 
299  Index block_count = numext::div_ceil(n, block_size);
300 
301  // Calculate parallel efficiency as fraction of total CPU time used for
302  // computations:
303  double max_efficiency =
304  static_cast<double>(block_count) / (numext::div_ceil<Index>(block_count, numThreads()) * numThreads());
305 
306  // Now try to increase block size up to max_block_size as long as it
307  // doesn't decrease parallel efficiency.
308  for (Index prev_block_count = block_count; max_efficiency < 1.0 && prev_block_count > 1;) {
309  // This is the next block size that divides size into a smaller number
310  // of blocks than the current block_size.
311  Index coarser_block_size = numext::div_ceil(n, prev_block_count - 1);
312  if (block_align) {
313  Index new_block_size = block_align(coarser_block_size);
314  eigen_assert(new_block_size >= coarser_block_size);
315  coarser_block_size = numext::mini(n, new_block_size);
316  }
317  if (coarser_block_size > max_block_size) {
318  break; // Reached max block size. Stop.
319  }
320  // Recalculate parallel efficiency.
321  const Index coarser_block_count = numext::div_ceil(n, coarser_block_size);
322  eigen_assert(coarser_block_count < prev_block_count);
323  prev_block_count = coarser_block_count;
324  const double coarser_efficiency = static_cast<double>(coarser_block_count) /
325  (numext::div_ceil<Index>(coarser_block_count, numThreads()) * numThreads());
326  if (coarser_efficiency + 0.01 >= max_efficiency) {
327  // Taking it.
328  block_size = coarser_block_size;
329  block_count = coarser_block_count;
330  if (max_efficiency < coarser_efficiency) {
331  max_efficiency = coarser_efficiency;
332  }
333  }
334  }
335 
336  return {block_size, block_count};
337  }
338 
339  ThreadPoolInterface* pool_;
340  int num_threads_;
341  Allocator* allocator_;
342 };
343 
344 } // end namespace Eigen
345 
346 #endif // EIGEN_CXX11_TENSOR_TENSOR_DEVICE_THREAD_POOL_H
static constexpr lastp1_t end
Namespace containing all symbols from the Eigen library.
Definition: AutoDiffScalar.h:629
std::ptrdiff_t l3CacheSize()
EIGEN_DEFAULT_DENSE_INDEX_TYPE Index
std::ptrdiff_t l1CacheSize()