$darkmode
Eigen  5.0.1-dev
ThreadLocal.h
1 // This file is part of Eigen, a lightweight C++ template library
2 // for linear algebra.
3 //
4 // Copyright (C) 2016 Benoit Steiner <benoit.steiner.goog@gmail.com>
5 //
6 // This Source Code Form is subject to the terms of the Mozilla
7 // Public License v. 2.0. If a copy of the MPL was not distributed
8 // with this file, You can obtain one at http://mozilla.org/MPL/2.0/.
9 
10 #ifndef EIGEN_CXX11_THREADPOOL_THREAD_LOCAL_H
11 #define EIGEN_CXX11_THREADPOOL_THREAD_LOCAL_H
12 
13 #ifdef EIGEN_AVOID_THREAD_LOCAL
14 
15 #ifdef EIGEN_THREAD_LOCAL
16 #undef EIGEN_THREAD_LOCAL
17 #endif
18 
19 #else
20 
21 #if ((EIGEN_COMP_GNUC) || __has_feature(cxx_thread_local) || EIGEN_COMP_MSVC)
22 #define EIGEN_THREAD_LOCAL static thread_local
23 #endif
24 
25 // Disable TLS for Apple and Android builds with older toolchains.
26 #if defined(__APPLE__)
27 // Included for TARGET_OS_IPHONE, __IPHONE_OS_VERSION_MIN_REQUIRED,
28 // __IPHONE_8_0.
29 #include <Availability.h>
30 #include <TargetConditionals.h>
31 #endif
32 // Checks whether C++11's `thread_local` storage duration specifier is
33 // supported.
34 #if EIGEN_COMP_CLANGAPPLE && \
35  ((EIGEN_COMP_CLANGAPPLE < 8000042) || (TARGET_OS_IPHONE && __IPHONE_OS_VERSION_MIN_REQUIRED < __IPHONE_9_0))
36 // Notes: Xcode's clang did not support `thread_local` until version
37 // 8, and even then not for all iOS < 9.0.
38 #undef EIGEN_THREAD_LOCAL
39 
40 #elif defined(__ANDROID__) && EIGEN_COMP_CLANG
41 // There are platforms for which TLS should not be used even though the compiler
42 // makes it seem like it's supported (Android NDK < r12b for example).
43 // This is primarily because of linker problems and toolchain misconfiguration:
44 // TLS isn't supported until NDK r12b per
45 // https://developer.android.com/ndk/downloads/revision_history.html
46 
47 #if defined(__ANDROID__) && defined(__clang__) && defined(__NDK_MAJOR__) && defined(__NDK_MINOR__) && \
48  ((__NDK_MAJOR__ < 12) || ((__NDK_MAJOR__ == 12) && (__NDK_MINOR__ < 1)))
49 #undef EIGEN_THREAD_LOCAL
50 #endif
51 #endif // defined(__ANDROID__) && defined(__clang__)
52 
53 #endif // EIGEN_AVOID_THREAD_LOCAL
54 
55 // IWYU pragma: private
56 #include "./InternalHeaderCheck.h"
57 
58 namespace Eigen {
59 
namespace internal {

// Default `Initialize` policy of ThreadLocal: leaves the freshly
// default-constructed value exactly as it is.
template <typename T>
struct ThreadLocalNoOpInitialize {
  void operator()(T& value) const { static_cast<void>(value); }
};

// Default `Release` policy of ThreadLocal: performs no cleanup before ~T().
template <typename T>
struct ThreadLocalNoOpRelease {
  void operator()(T& value) const { static_cast<void>(value); }
};

}  // namespace internal
72 
73 // Thread local container for elements of type T, that does not use thread local
74 // storage. As long as the number of unique threads accessing this storage
75 // is smaller than `capacity_`, it is lock-free and wait-free. Otherwise it will
76 // use a mutex for synchronization.
77 //
78 // Type `T` has to be default constructible, and by default each thread will get
79 // a default constructed value. It is possible to specify custom `initialize`
80 // callable, that will be called lazily from each thread accessing this object,
81 // and will be passed a default initialized object of type `T`. Also it's
82 // possible to pass a custom `release` callable, that will be invoked before
83 // calling ~T().
84 //
85 // Example:
86 //
87 // struct Counter {
88 // int value = 0;
89 // }
90 //
91 // Eigen::ThreadLocal<Counter> counter(10);
92 //
93 // // Each thread will have access to it's own counter object.
94 // Counter& cnt = counter.local();
95 // cnt++;
96 //
97 // WARNING: Eigen::ThreadLocal uses the OS-specific value returned by
98 // std::this_thread::get_id() to identify threads. This value is not guaranteed
99 // to be unique except for the life of the thread. A newly created thread may
100 // get an OS-specific ID equal to that of an already destroyed thread.
101 //
102 // Somewhat similar to TBB thread local storage, with similar restrictions:
103 // https://www.threadingbuildingblocks.org/docs/help/reference/thread_local_storage/enumerable_thread_specific_cls.html
104 //
105 template <typename T, typename Initialize = internal::ThreadLocalNoOpInitialize<T>,
106  typename Release = internal::ThreadLocalNoOpRelease<T>>
107 class ThreadLocal {
108  // We preallocate default constructed elements in MaxSizedVector.
109  static_assert(std::is_default_constructible<T>::value, "ThreadLocal data type must be default constructible");
110 
111  public:
112  explicit ThreadLocal(int capacity)
113  : ThreadLocal(capacity, internal::ThreadLocalNoOpInitialize<T>(), internal::ThreadLocalNoOpRelease<T>()) {}
114 
115  ThreadLocal(int capacity, Initialize initialize)
116  : ThreadLocal(capacity, std::move(initialize), internal::ThreadLocalNoOpRelease<T>()) {}
117 
118  ThreadLocal(int capacity, Initialize initialize, Release release)
119  : initialize_(std::move(initialize)),
120  release_(std::move(release)),
121  capacity_(capacity),
122  data_(capacity_),
123  ptr_(capacity_),
124  filled_records_(0) {
125  eigen_assert(capacity_ >= 0);
126  data_.resize(capacity_);
127  for (int i = 0; i < capacity_; ++i) {
128  ptr_.emplace_back(nullptr);
129  }
130  }
131 
132  T& local() {
133  std::thread::id this_thread = std::this_thread::get_id();
134  if (capacity_ == 0) return SpilledLocal(this_thread);
135 
136  std::size_t h = std::hash<std::thread::id>()(this_thread);
137  const int start_idx = h % capacity_;
138 
139  // NOTE: From the definition of `std::this_thread::get_id()` it is
140  // guaranteed that we never can have concurrent insertions with the same key
141  // to our hash-map like data structure. If we didn't find an element during
142  // the initial traversal, it's guaranteed that no one else could have
143  // inserted it while we are in this function. This allows to massively
144  // simplify out lock-free insert-only hash map.
145 
146  // Check if we already have an element for `this_thread`.
147  int idx = start_idx;
148  while (ptr_[idx].load() != nullptr) {
149  ThreadIdAndValue& record = *(ptr_[idx].load());
150  if (record.thread_id == this_thread) return record.value;
151 
152  idx += 1;
153  if (idx >= capacity_) idx -= capacity_;
154  if (idx == start_idx) break;
155  }
156 
157  // If we are here, it means that we found an insertion point in lookup
158  // table at `idx`, or we did a full traversal and table is full.
159 
160  // If lock-free storage is full, fallback on mutex.
161  if (filled_records_.load() >= capacity_) return SpilledLocal(this_thread);
162 
163  // We double check that we still have space to insert an element into a lock
164  // free storage. If old value in `filled_records_` is larger than the
165  // records capacity, it means that some other thread added an element while
166  // we were traversing lookup table.
167  int insertion_index = filled_records_.fetch_add(1, std::memory_order_relaxed);
168  if (insertion_index >= capacity_) return SpilledLocal(this_thread);
169 
170  // At this point it's guaranteed that we can access to
171  // data_[insertion_index_] without a data race.
172  data_[insertion_index].thread_id = this_thread;
173  initialize_(data_[insertion_index].value);
174 
175  // That's the pointer we'll put into the lookup table.
176  ThreadIdAndValue* inserted = &data_[insertion_index];
177 
178  // We'll use nullptr pointer to ThreadIdAndValue in a compare-and-swap loop.
179  ThreadIdAndValue* empty = nullptr;
180 
181  // Now we have to find an insertion point into the lookup table. We start
182  // from the `idx` that was identified as an insertion point above, it's
183  // guaranteed that we will have an empty record somewhere in a lookup table
184  // (because we created a record in the `data_`).
185  const int insertion_idx = idx;
186 
187  do {
188  // Always start search from the original insertion candidate.
189  idx = insertion_idx;
190  while (ptr_[idx].load() != nullptr) {
191  idx += 1;
192  if (idx >= capacity_) idx -= capacity_;
193  // If we did a full loop, it means that we don't have any free entries
194  // in the lookup table, and this means that something is terribly wrong.
195  eigen_assert(idx != insertion_idx);
196  }
197  // Atomic CAS of the pointer guarantees that any other thread, that will
198  // follow this pointer will see all the mutations in the `data_`.
199  } while (!ptr_[idx].compare_exchange_weak(empty, inserted));
200 
201  return inserted->value;
202  }
203 
204  // WARN: It's not thread safe to call it concurrently with `local()`.
205  void ForEach(std::function<void(std::thread::id, T&)> f) {
206  // Reading directly from `data_` is unsafe, because only CAS to the
207  // record in `ptr_` makes all changes visible to other threads.
208  for (auto& ptr : ptr_) {
209  ThreadIdAndValue* record = ptr.load();
210  if (record == nullptr) continue;
211  f(record->thread_id, record->value);
212  }
213 
214  // We did not spill into the map based storage.
215  if (filled_records_.load(std::memory_order_relaxed) < capacity_) return;
216 
217  // Adds a happens before edge from the last call to SpilledLocal().
218  EIGEN_MUTEX_LOCK lock(mu_);
219  for (auto& kv : per_thread_map_) {
220  f(kv.first, kv.second);
221  }
222  }
223 
224  // WARN: It's not thread safe to call it concurrently with `local()`.
225  ~ThreadLocal() {
226  // Reading directly from `data_` is unsafe, because only CAS to the record
227  // in `ptr_` makes all changes visible to other threads.
228  for (auto& ptr : ptr_) {
229  ThreadIdAndValue* record = ptr.load();
230  if (record == nullptr) continue;
231  release_(record->value);
232  }
233 
234  // We did not spill into the map based storage.
235  if (filled_records_.load(std::memory_order_relaxed) < capacity_) return;
236 
237  // Adds a happens before edge from the last call to SpilledLocal().
238  EIGEN_MUTEX_LOCK lock(mu_);
239  for (auto& kv : per_thread_map_) {
240  release_(kv.second);
241  }
242  }
243 
244  private:
245  struct ThreadIdAndValue {
246  std::thread::id thread_id;
247  T value;
248  };
249 
250  // Use unordered map guarded by a mutex when lock free storage is full.
251  T& SpilledLocal(std::thread::id this_thread) {
252  EIGEN_MUTEX_LOCK lock(mu_);
253 
254  auto it = per_thread_map_.find(this_thread);
255  if (it == per_thread_map_.end()) {
256  auto result = per_thread_map_.emplace(this_thread, T());
257  eigen_assert(result.second);
258  initialize_((*result.first).second);
259  return (*result.first).second;
260  } else {
261  return it->second;
262  }
263  }
264 
265  Initialize initialize_;
266  Release release_;
267  const int capacity_;
268 
269  // Storage that backs lock-free lookup table `ptr_`. Records stored in this
270  // storage contiguously starting from index 0.
271  MaxSizeVector<ThreadIdAndValue> data_;
272 
273  // Atomic pointers to the data stored in `data_`. Used as a lookup table for
274  // linear probing hash map (https://en.wikipedia.org/wiki/Linear_probing).
275  MaxSizeVector<std::atomic<ThreadIdAndValue*>> ptr_;
276 
277  // Number of records stored in the `data_`.
278  std::atomic<int> filled_records_;
279 
280  // We fallback on per thread map if lock-free storage is full. In practice
281  // this should never happen, if `capacity_` is a reasonable estimate of the
282  // number of threads running in a system.
283  EIGEN_MUTEX mu_; // Protects per_thread_map_.
284  std::unordered_map<std::thread::id, T> per_thread_map_;
285 };
286 
287 } // namespace Eigen
288 
289 #endif // EIGEN_CXX11_THREADPOOL_THREAD_LOCAL_H
Namespace containing all symbols from the Eigen library.
Definition: B01_Experimental.dox:1
Definition: BFloat16.h:231