17#ifndef KOKKOS_IMPL_PUBLIC_INCLUDE
18#include <Kokkos_Macros.hpp>
20 "Including non-public Kokkos header files is not allowed.");
22#ifndef KOKKOS_TASKSCHEDULER_HPP
23#define KOKKOS_TASKSCHEDULER_HPP
27#include <Kokkos_Macros.hpp>
28#if defined(KOKKOS_ENABLE_TASKDAG)
30#include <Kokkos_Core_fwd.hpp>
31#include <Kokkos_TaskScheduler_fwd.hpp>
34#include <Kokkos_MemoryPool.hpp>
36#include <Kokkos_Future.hpp>
37#include <impl/Kokkos_TaskQueue.hpp>
38#include <impl/Kokkos_SingleTaskQueue.hpp>
39#include <impl/Kokkos_TaskQueueMultiple.hpp>
40#include <impl/Kokkos_TaskPolicyData.hpp>
41#include <impl/Kokkos_TaskTeamMember.hpp>
42#include <impl/Kokkos_SimpleTaskScheduler.hpp>
51template <
class,
class>
// Task scheduler parameterized on an execution space (ExecSpace) and on the
// queue type (QueueType) it schedules tasks through. Publicly derives from
// Impl::TaskSchedulerBase.
56template <
class ExecSpace,
class QueueType>
57class BasicTaskScheduler :
public Impl::TaskSchedulerBase {
// Alias for this class itself, so generic code can name the scheduler type.
59 using scheduler_type = BasicTaskScheduler;
60 using execution_space = ExecSpace;
61 using queue_type = QueueType;
// The memory space and memory pool types are taken from the queue type.
62 using memory_space =
typename queue_type::memory_space;
63 using memory_pool =
typename queue_type::memory_pool;
// Impl::TaskQueueSpecialization instantiated for this scheduler; member_type
// is taken from it.
64 using specialization = Impl::TaskQueueSpecialization<BasicTaskScheduler>;
65 using member_type =
typename specialization::member_type;
// The team scheduler type is this same scheduler type.
66 using team_scheduler_type = BasicTaskScheduler;
// Task type wrapping Functor; its value type is Functor::value_type.
67 template <
class Functor>
68 using runnable_task_type =
69 Impl::Task<scheduler_type, typename Functor::value_type, Functor>;
// Future of ValueType associated with this scheduler type.
70 template <
class ValueType>
71 using future_type = Kokkos::BasicFuture<ValueType, BasicTaskScheduler>;
// Future whose value type is FunctorType::value_type.
72 template <
class FunctorType>
73 using future_type_for_functor = future_type<typename FunctorType::value_type>;
// track_type is the type of m_track; task_base is the type that task
// pointers are expressed in throughout this class.
76 using track_type = Kokkos::Impl::SharedAllocationTracker;
77 using task_base = Impl::TaskBase;
// Friend declarations: the Impl queue, specialization, team-member adapter
// and executor types below are granted access to this class's members.
84 template <
typename,
typename>
85 friend class Impl::TaskQueue;
// NOTE(review): no template header is visible for this declaration in this
// excerpt, although TaskQueueSpecialization is used as a template elsewhere
// in the class — confirm against the full file.
87 friend struct Impl::TaskQueueSpecialization;
88 template <
typename,
typename>
89 friend class Impl::TaskQueueSpecializationConstrained;
90 template <
typename,
typename>
91 friend class Impl::TaskTeamMemberAdapter;
92 template <
typename,
typename>
93 friend class Impl::TaskExec;
// Constructs a scheduler from an existing allocation tracker and a queue
// pointer, storing them in m_track and m_queue respectively.
97 KOKKOS_INLINE_FUNCTION
98 BasicTaskScheduler(track_type arg_track, queue_type* arg_queue)
// arg_queue is a raw pointer, so std::move on it is equivalent to a copy.
99 : m_track(std::move(arg_track)), m_queue(std::move(arg_queue)) {}
// Returns a scheduler that is built from this scheduler's m_track and the
// address of the queue returned by m_queue->get_team_queue(team_rank).
// m_queue is dereferenced without a null check.
101 KOKKOS_INLINE_FUNCTION
102 team_scheduler_type get_team_scheduler(
int team_rank)
const {
103 return {m_track, &m_queue->get_team_queue(team_rank)};
// Overload for a nullptr argument: there is no task, so the result is a null
// task_base pointer.
108 KOKKOS_INLINE_FUNCTION
109 static constexpr task_base* _get_task_ptr(std::nullptr_t) {
return nullptr; }
// Overload taking an rvalue future of this scheduler and yielding a
// task_base pointer.
// NOTE(review): the function body is not visible in this excerpt; presumably
// it returns the task pointer held by f — TODO confirm.
111 template <
class ValueType>
112 KOKKOS_INLINE_FUNCTION
static constexpr task_base* _get_task_ptr(
113 future_type<ValueType>&& f) {
// Allocates storage for one task from the queue, constructs the task in that
// storage, fills in its bookkeeping fields and hands it to the queue.
//
// Template parameters:
//   TaskEnum    - value stored in the new task's m_task_type.
//   DepTaskType - pointee type of the predecessor pointer.
//   FunctorType - functor type; forwarded into the task_type constructor.
// Parameters:
//   arg_predecessor_task - stored in the new task's m_next.
//   arg_priority         - converted to int16_t and stored in m_priority.
//   arg_function         - stored in m_apply.
//   (unnamed destroy_type parameter) - not used in the visible body.
//   arg_functor          - forwarded to the task_type constructor.
// Declared return type: Kokkos::BasicFuture<FunctorType::value_type,
// scheduler_type>.
// NOTE(review): some statements of this function are not visible in this
// excerpt (e.g. where f.m_task is assigned and the return statement);
// presumably f is what is returned — TODO confirm.
117 template <
int TaskEnum,
typename DepTaskType,
typename FunctorType>
119 Kokkos::BasicFuture<typename FunctorType::value_type, scheduler_type>
120 _spawn_impl(DepTaskType* arg_predecessor_task, TaskPriority arg_priority,
121 typename task_base::function_type arg_function,
122 typename task_base::destroy_type ,
123 FunctorType&& arg_functor) {
// Future type for the decayed functor type.
124 using functor_future_type =
125 future_type_for_functor<std::decay_t<FunctorType>>;
127 Impl::Task<BasicTaskScheduler,
typename functor_future_type::value_type,
139 functor_future_type f;
// Ask the queue how many bytes a task for this functor type needs.
143 const size_t alloc_size =
144 m_queue->template spawn_allocation_size<FunctorType>();
// Raw storage for the task comes from the queue.
146 void* task_storage = m_queue->allocate(alloc_size);
// Construct the task in place, forwarding the functor into it.
154 new (task_storage) task_type(std::forward<FunctorType>(arg_functor));
156 f.m_task->m_apply = arg_function;
158 f.m_task->m_queue = m_queue;
// The predecessor task is recorded in m_next.
159 f.m_task->m_next = arg_predecessor_task;
// Initial reference count is 2 — presumably one reference for the future and
// one released when the task completes; TODO confirm.
160 f.m_task->m_ref_count = 2;
161 f.m_task->m_alloc_size = alloc_size;
162 f.m_task->m_task_type = TaskEnum;
// NOTE(review): C-style cast to int16_t here, whereas the respawn()
// overloads of this class convert the priority with static_cast<int>.
163 f.m_task->m_priority = (int16_t)arg_priority;
// Fence placed after the task's fields are written and before scheduling.
165 Kokkos::memory_fence();
// Hand the initialized task to the queue.
172 m_queue->schedule_runnable(f.m_task);
// Default constructor: default-constructed tracker and a null queue pointer.
181 KOKKOS_INLINE_FUNCTION
182 BasicTaskScheduler() : m_track(), m_queue(nullptr) {}
// Move constructor: takes over rhs's allocation tracker and queue pointer.
//
// m_track is move-constructed from rhs.m_track so that moving a scheduler
// does not make a copy of the tracker. If track_type declares no move
// constructor, std::move falls back to its copy constructor and the tracker
// is simply copied.
// m_queue is a raw pointer, so std::move on it copies the pointer value; rhs
// keeps the same m_queue value after the move.
// NOTE(review): assumes no caller relies on a moved-from scheduler still
// holding its tracker — confirm.
184 KOKKOS_INLINE_FUNCTION
185 BasicTaskScheduler(BasicTaskScheduler&& rhs) noexcept
186 : m_track(std::move(rhs.m_track)),
188 m_queue(std::move(rhs.m_queue)) {}
// Copy constructor: copies both the allocation tracker and the queue
// pointer, so the new scheduler refers to the same queue as rhs.
190 KOKKOS_INLINE_FUNCTION
191 BasicTaskScheduler(BasicTaskScheduler
const& rhs)
192 : m_track(rhs.m_track), m_queue(rhs.m_queue) {}
// Move assignment: assigns rhs's tracker and queue pointer to this object.
// NOTE(review): the return statement is not visible in this excerpt.
194 KOKKOS_INLINE_FUNCTION
195 BasicTaskScheduler& operator=(BasicTaskScheduler&& rhs)
noexcept {
// NOTE(review): rhs.m_track is an lvalue expression here, so this is a copy
// assignment of the tracker even though rhs is an rvalue reference.
196 m_track = rhs.m_track;
// m_queue is a raw pointer, so std::move on it is equivalent to a copy.
198 m_queue = std::move(rhs.m_queue);
// Copy assignment: copies rhs's tracker and queue pointer into this object.
// NOTE(review): the return statement is not visible in this excerpt.
202 KOKKOS_INLINE_FUNCTION
203 BasicTaskScheduler& operator=(BasicTaskScheduler
const& rhs) {
204 m_track = rhs.m_track;
205 m_queue = rhs.m_queue;
// Constructs a scheduler with a newly created queue that is built from
// arg_memory_pool.
//
// Steps visible below: allocate a record sized for one queue_type, construct
// the queue in the record's data area, store the queue pointer in the
// record's m_destroy member, then hand the record to m_track.
// NOTE(review): this constructor is noexcept; if record_type::allocate or the
// queue_type constructor can throw, that would call std::terminate — confirm
// that this is intended.
209 explicit BasicTaskScheduler(memory_pool
const& arg_memory_pool) noexcept
210 : m_track(), m_queue(
nullptr) {
// NOTE(review): the head of this alias declaration is not visible in this
// excerpt; presumably it defines record_type, which is used below.
212 Kokkos::Impl::SharedAllocationRecord<memory_space,
213 typename queue_type::Destroy>;
// Allocate a record labelled "Kokkos::TaskQueue" with room for one queue.
215 record_type* record = record_type::allocate(
216 memory_space(),
"Kokkos::TaskQueue",
sizeof(queue_type));
// Construct the queue in place inside the record's data area.
218 m_queue =
new (record->data()) queue_type(arg_memory_pool);
// Store the queue pointer in the record's m_destroy member.
220 record->m_destroy.m_queue = m_queue;
// Make m_track refer to the newly allocated record.
222 m_track.assign_allocated_record_to_uninitialized(record);
// Constructs a scheduler by first building a memory_pool from the given
// memory space and pool sizing arguments, then delegating to the constructor
// that takes a memory_pool.
//
// Parameters (all passed through, in this order, to the memory_pool
// constructor): arg_memory_space, mempool_capacity, mempool_min_block_size,
// mempool_max_block_size, mempool_superblock_size.
225 BasicTaskScheduler(memory_space
const& arg_memory_space,
226 size_t const mempool_capacity,
227 unsigned const mempool_min_block_size
229 unsigned const mempool_max_block_size
231 unsigned const mempool_superblock_size
233 : BasicTaskScheduler(memory_pool(
234 arg_memory_space, mempool_capacity, mempool_min_block_size,
235 mempool_max_block_size, mempool_superblock_size)) {}
// Accessor declared to return a reference to the scheduler's queue.
// Precondition, checked with KOKKOS_EXPECTS: m_queue is not null.
// NOTE(review): the return statement is not visible in this excerpt.
239 KOKKOS_INLINE_FUNCTION
240 queue_type& queue() const noexcept {
241 KOKKOS_EXPECTS(m_queue !=
nullptr);
// Returns the address of the queue's m_memory member, or a null pointer when
// this scheduler has no queue.
// NOTE(review): `(memory_pool*)0` is a C-style cast of the literal 0;
// `nullptr` would express the same null pointer.
245 KOKKOS_INLINE_FUNCTION
246 memory_pool* memory() const noexcept {
247 return m_queue ? &(m_queue->m_memory) : (memory_pool*)0;
// Returns the allocation size the queue reports for spawning a task of
// FunctorType; forwards to queue_type::spawn_allocation_size<FunctorType>().
// m_queue is dereferenced without a null check.
252 template <
typename FunctorType>
253 KOKKOS_FUNCTION
size_t spawn_allocation_size()
const {
254 return m_queue->template spawn_allocation_size<FunctorType>();
// Returns the allocation size the queue reports for a when_all over narg
// arguments; forwards to queue_type::when_all_allocation_size(narg).
// m_queue is dereferenced without a null check.
259 size_t when_all_allocation_size(
int narg)
const {
260 return m_queue->when_all_allocation_size(narg);
// Static spawn entry point for a policy that carries its own scheduler.
//
// Calls _spawn_impl<TaskEnum> on the scheduler obtained from
// arg_policy.scheduler(), passing the task pointer of the policy's
// predecessor, the policy's priority, the supplied function and destroy
// pointers, and the forwarded functor. The result of _spawn_impl is returned.
// NOTE(review): the end of the return type and the declarator of the policy
// parameter (used below as arg_policy) are not visible in this excerpt.
265 template <
int TaskEnum,
typename DepFutureType,
typename FunctorType>
266 KOKKOS_FUNCTION
static Kokkos::BasicFuture<
typename FunctorType::value_type,
268 spawn(Impl::TaskPolicyWithScheduler<TaskEnum, scheduler_type, DepFutureType>&&
270 typename task_base::function_type arg_function,
271 typename task_base::destroy_type arg_destroy,
272 FunctorType&& arg_functor) {
273 return std::move(arg_policy.scheduler())
274 .template _spawn_impl<TaskEnum>(
275 _get_task_ptr(std::move(arg_policy.predecessor())),
276 arg_policy.priority(), arg_function, arg_destroy,
277 std::forward<FunctorType>(arg_functor));
// Member spawn for a policy that carries only a predecessor future.
//
// Takes the apply and destroy function pointers from
// runnable_task_type<FunctorType> and calls _spawn_impl<TaskEnum> on this
// scheduler with the predecessor's task pointer, the policy's priority and
// the forwarded functor. The result of _spawn_impl is returned.
280 template <
int TaskEnum,
typename DepFutureType,
typename FunctorType>
281 KOKKOS_FUNCTION future_type_for_functor<std::decay_t<FunctorType>> spawn(
282 Impl::TaskPolicyWithPredecessor<TaskEnum, DepFutureType>&& arg_policy,
283 FunctorType&& arg_functor) {
284 using task_type = runnable_task_type<FunctorType>;
285 typename task_type::function_type
const ptr = task_type::apply;
286 typename task_type::destroy_type
const dtor = task_type::destroy;
// NOTE(review): arg_policy is read via priority() in the same argument list
// in which std::move(arg_policy).predecessor() is evaluated; presumably
// predecessor() does not invalidate the priority — TODO confirm.
288 return _spawn_impl<TaskEnum>(
289 _get_task_ptr(std::move(arg_policy).predecessor()),
290 arg_policy.priority(), ptr, dtor,
291 std::forward<FunctorType>(arg_functor));
// Respawn overload that makes the task depend on a future.
//
// Parameters:
//   arg_self       - pointer to the functor; cast to the enclosing
//                    Impl::Task<BasicTaskScheduler, value_type, FunctorType>.
//   arg_dependence - future whose m_task is passed to add_dependence().
//   arg_priority   - converted to int and stored in the task's m_priority.
294 template <
typename FunctorType,
typename ValueType,
typename Scheduler>
295 KOKKOS_FUNCTION
static void respawn(
296 FunctorType* arg_self,
297 BasicFuture<ValueType, Scheduler>
const& arg_dependence,
298 TaskPriority
const& arg_priority) {
301 using value_type =
typename FunctorType::value_type;
302 using task_type = Impl::Task<BasicTaskScheduler, value_type, FunctorType>;
// The static_cast from FunctorType* to task_type* assumes the functor is a
// base-class subobject of a task_type object — TODO confirm.
304 task_type*
const task =
static_cast<task_type*
>(arg_self);
306 task->m_priority =
static_cast<int>(arg_priority);
308 task->add_dependence(arg_dependence.m_task);
// Respawn overload taking a scheduler instead of a future.
//
// Parameters:
//   arg_self     - pointer to the functor; cast to the enclosing
//                  Impl::Task<BasicTaskScheduler, value_type, FunctorType>.
//   (unnamed BasicTaskScheduler const&) - not used in the body.
//   arg_priority - converted to int and stored in the task's m_priority.
// add_dependence() is called with nullptr, i.e. with no dependence task.
313 template <
typename FunctorType>
314 KOKKOS_FUNCTION
static void respawn(FunctorType* arg_self,
315 BasicTaskScheduler
const&,
316 TaskPriority
const& arg_priority) {
319 using value_type =
typename FunctorType::value_type;
320 using task_type = Impl::Task<BasicTaskScheduler, value_type, FunctorType>;
// The static_cast from FunctorType* to task_type* assumes the functor is a
// base-class subobject of a task_type object — TODO confirm.
322 task_type*
const task =
static_cast<task_type*
>(arg_self);
324 task->m_priority =
static_cast<int>(arg_priority);
326 task->add_dependence(
nullptr);
// Builds one aggregate task that depends on the narg futures in arg[] and
// schedules it on this scheduler's queue.
//
// Parameters:
//   arg  - array of futures of this scheduler type.
//   narg - number of entries of arg that are read.
// Declared return type: BasicFuture<void, scheduler_type>.
// NOTE(review): several statements are not visible in this excerpt (the
// declaration of f, any guards around the loop body and the allocation, the
// call that reports the error message, and the return statement).
335 template <
typename ValueType>
336 KOKKOS_FUNCTION BasicFuture<void, scheduler_type> when_all(
337 BasicFuture<ValueType, BasicTaskScheduler>
const arg[],
int narg) {
341 queue_type* q = m_queue;
// First pass over the inputs: take a reference on each task and compare its
// queue with this scheduler's queue.
345 for (
int i = 0; i < narg; ++i) {
346 task_base*
const t = arg[i].m_task;
// Atomic increment of the task's reference count (sequentially consistent,
// device scope).
350 Kokkos::Impl::desul_atomic_inc(&(t->m_ref_count),
351 Kokkos::Impl::MemoryOrderSeqCst(),
352 Kokkos::Impl::MemoryScopeDevice());
// A task whose queue differs from q triggers the message below.
353 if (q !=
static_cast<queue_type const*
>(t->m_queue)) {
355 "Kokkos when_all Futures must be in the same scheduler");
// Size of the aggregate task, as reported by the queue for narg inputs.
363 size_t const alloc_size = q->when_all_allocation_size(narg);
365 f.m_task =
reinterpret_cast<task_base*
>(q->allocate(alloc_size));
// Construct a task_base in the allocated storage, then fill in its fields.
373 new (f.m_task) task_base();
375 f.m_task->m_queue = q;
// Initial reference count is 2 — presumably one reference for the future and
// one released when the task completes; TODO confirm.
376 f.m_task->m_ref_count = 2;
377 f.m_task->m_alloc_size =
static_cast<int32_t
>(alloc_size);
378 f.m_task->m_dep_count = narg;
379 f.m_task->m_task_type = task_base::Aggregate;
// Array of dependence slots provided by the aggregate task.
383 task_base*
volatile*
const dep = f.m_task->aggregate_dependences();
// Second pass: record each input task as a dependence.
385 for (
int i = 0; i < narg; ++i) {
386 dep[i] = arg[i].m_task;
// Fence placed after the dependences are written and before scheduling.
389 Kokkos::memory_fence();
391 q->schedule_aggregate(f.m_task);
// Builds one aggregate task over narg futures that are produced by calling
// func(i) for i in [0, narg), and schedules it on this scheduler's queue.
//
// func(0) must have a type for which is_future is true (static_assert below).
// When narg is 0, f is returned without allocating a task.
// Declared return type: BasicFuture<void, scheduler_type>.
// NOTE(review): several statements are not visible in this excerpt (the
// template header and the func parameter, the declaration of f, any guard on
// the allocation result, and the final return statement).
401 KOKKOS_FUNCTION BasicFuture<void, scheduler_type> when_all(
int narg,
// Type of the futures produced by the functor.
403 using input_type =
decltype(func(0));
405 static_assert(is_future<input_type>::value,
406 "Functor must return a Kokkos::Future");
// Nothing to aggregate.
410 if (0 == narg)
return f;
// Size of the aggregate task, as reported by the queue for narg inputs.
412 size_t const alloc_size = m_queue->when_all_allocation_size(narg);
414 f.m_task =
reinterpret_cast<task_base*
>(m_queue->allocate(alloc_size));
// Construct a task_base in the allocated storage, then fill in its fields.
421 new (f.m_task) task_base();
425 f.m_task->m_queue = m_queue;
// Initial reference count is 2 — presumably one reference for the future and
// one released when the task completes; TODO confirm.
426 f.m_task->m_ref_count = 2;
427 f.m_task->m_alloc_size =
static_cast<int32_t
>(alloc_size);
428 f.m_task->m_dep_count = narg;
429 f.m_task->m_task_type = task_base::Aggregate;
// Array of dependence slots provided by the aggregate task.
435 task_base*
volatile*
const dep = f.m_task->aggregate_dependences();
437 for (
int i = 0; i < narg; ++i) {
438 const input_type arg_f = func(i);
// Only futures that hold a task take a reference and fill their slot.
439 if (
nullptr != arg_f.m_task) {
// Atomic increment of the task's reference count (sequentially consistent,
// device scope).
448 Kokkos::Impl::desul_atomic_inc(&(arg_f.m_task->m_ref_count),
449 Kokkos::Impl::MemoryOrderSeqCst(),
450 Kokkos::Impl::MemoryScopeDevice());
451 dep[i] = arg_f.m_task;
// Fence placed after the dependences are written and before scheduling.
455 Kokkos::memory_fence();
457 m_queue->schedule_aggregate(f.m_task);
// Returns m_queue->m_memory.capacity(), converted to int.
// m_queue is dereferenced without a null check.
465 KOKKOS_INLINE_FUNCTION
466 int allocation_capacity() const noexcept {
467 return m_queue->m_memory.capacity();
// Returns the queue's m_count_alloc counter.
// m_queue is dereferenced without a null check.
470 KOKKOS_INLINE_FUNCTION
471 int allocated_task_count() const noexcept {
return m_queue->m_count_alloc; }
// Returns the queue's m_max_alloc counter.
// m_queue is dereferenced without a null check.
473 KOKKOS_INLINE_FUNCTION
474 int allocated_task_count_max() const noexcept {
return m_queue->m_max_alloc; }
// Returns the queue's m_accum_alloc counter as a long.
// m_queue is dereferenced without a null check.
476 KOKKOS_INLINE_FUNCTION
477 long allocated_task_count_accum() const noexcept {
478 return m_queue->m_accum_alloc;
// Friend declaration of the free function template wait() for any
// BasicTaskScheduler<S, Q>.
483 template <
class S,
class Q>
484 friend void wait(Kokkos::BasicTaskScheduler<S, Q>
const&);
// Creates a TaskTeam policy that carries only a predecessor future.
//
// Parameters:
//   arg_future   - predecessor future; moved into the returned policy.
//   arg_priority - task priority; defaults to TaskPriority::Regular.
// Returns Impl::TaskPolicyWithPredecessor<TaskTeam, BasicFuture<T, Scheduler>>.
497template <
class T,
class Scheduler>
498Impl::TaskPolicyWithPredecessor<Impl::TaskType::TaskTeam,
499 Kokkos::BasicFuture<T, Scheduler>>
500 KOKKOS_INLINE_FUNCTION
501 TaskTeam(Kokkos::BasicFuture<T, Scheduler> arg_future,
502 TaskPriority arg_priority = TaskPriority::Regular) {
503 return {std::move(arg_future), arg_priority};
// Creates a TaskTeam policy that carries a scheduler and no predecessor.
//
// Parameters:
//   arg_scheduler - scheduler; moved into the returned policy.
//   arg_priority  - task priority; defaults to TaskPriority::Regular. Its
//                   type is an enable_if_t on is_scheduler<Scheduler>, so
//                   this overload only participates when Scheduler satisfies
//                   Kokkos::is_scheduler.
// Returns Impl::TaskPolicyWithScheduler<TaskTeam, Scheduler>.
506template <
class Scheduler>
507Impl::TaskPolicyWithScheduler<Impl::TaskType::TaskTeam, Scheduler>
508 KOKKOS_INLINE_FUNCTION TaskTeam(
509 Scheduler arg_scheduler,
510 std::enable_if_t<Kokkos::is_scheduler<Scheduler>::value, TaskPriority>
511 arg_priority = TaskPriority::Regular) {
512 return {std::move(arg_scheduler), arg_priority};
// Creates a TaskTeam policy that carries both a scheduler and a predecessor
// future.
//
// Parameters:
//   arg_scheduler - scheduler; moved into the returned policy.
//   arg_future    - predecessor future; moved into the returned policy.
//   arg_priority  - task priority; defaults to TaskPriority::Regular. Its
//                   type is an enable_if_t requiring is_scheduler<Scheduler>
//                   and is_future<PredecessorFuture>.
// A static_assert compares PredecessorFuture::scheduler_type against another
// type using std::is_same.
// NOTE(review): parts of the return type, of the enable_if_t and of the
// static_assert condition are not visible in this excerpt.
515template <
class Scheduler,
class PredecessorFuture>
516Impl::TaskPolicyWithScheduler<Kokkos::Impl::TaskType::TaskTeam, Scheduler,
518 KOKKOS_INLINE_FUNCTION
519 TaskTeam(Scheduler arg_scheduler, PredecessorFuture arg_future,
520 std::enable_if_t<Kokkos::is_scheduler<Scheduler>::value &&
521 Kokkos::is_future<PredecessorFuture>::value,
523 arg_priority = TaskPriority::Regular) {
524 static_assert(std::is_same<
typename PredecessorFuture::scheduler_type,
526 "Can't create a task policy from a scheduler and a future from "
527 "a different scheduler");
529 return {std::move(arg_scheduler), std::move(arg_future), arg_priority};
// Creates a TaskSingle policy that carries only a predecessor future.
//
// Parameters:
//   arg_future   - predecessor future; moved into the returned policy.
//   arg_priority - task priority; defaults to TaskPriority::Regular.
// Returns Impl::TaskPolicyWithPredecessor<TaskSingle,
// BasicFuture<T, Scheduler>>.
534template <
class T,
class Scheduler>
535Impl::TaskPolicyWithPredecessor<Impl::TaskType::TaskSingle,
536 Kokkos::BasicFuture<T, Scheduler>>
537 KOKKOS_INLINE_FUNCTION
538 TaskSingle(Kokkos::BasicFuture<T, Scheduler> arg_future,
539 TaskPriority arg_priority = TaskPriority::Regular) {
540 return {std::move(arg_future), arg_priority};
// Creates a TaskSingle policy that carries a scheduler and no predecessor.
//
// Parameters:
//   arg_scheduler - scheduler; moved into the returned policy.
//   arg_priority  - task priority; defaults to TaskPriority::Regular. Its
//                   type is an enable_if_t on is_scheduler<Scheduler>, so
//                   this overload only participates when Scheduler satisfies
//                   Kokkos::is_scheduler.
// Returns Impl::TaskPolicyWithScheduler<TaskSingle, Scheduler>.
543template <
class Scheduler>
544Impl::TaskPolicyWithScheduler<Impl::TaskType::TaskSingle, Scheduler>
545 KOKKOS_INLINE_FUNCTION TaskSingle(
546 Scheduler arg_scheduler,
547 std::enable_if_t<Kokkos::is_scheduler<Scheduler>::value, TaskPriority>
548 arg_priority = TaskPriority::Regular) {
549 return {std::move(arg_scheduler), arg_priority};
// Creates a TaskSingle policy that carries both a scheduler and a
// predecessor future.
//
// Parameters:
//   arg_scheduler - scheduler; moved into the returned policy.
//   arg_future    - predecessor future; moved into the returned policy.
//   arg_priority  - task priority; defaults to TaskPriority::Regular. Its
//                   type is an enable_if_t requiring is_scheduler<Scheduler>
//                   and is_future<PredecessorFuture>.
// A static_assert compares PredecessorFuture::scheduler_type against another
// type using std::is_same.
// NOTE(review): parts of the return type, of the enable_if_t and of the
// static_assert condition are not visible in this excerpt.
552template <
class Scheduler,
class PredecessorFuture>
553Impl::TaskPolicyWithScheduler<Kokkos::Impl::TaskType::TaskSingle, Scheduler,
555 KOKKOS_INLINE_FUNCTION
556 TaskSingle(Scheduler arg_scheduler, PredecessorFuture arg_future,
557 std::enable_if_t<Kokkos::is_scheduler<Scheduler>::value &&
558 Kokkos::is_future<PredecessorFuture>::value,
560 arg_priority = TaskPriority::Regular) {
561 static_assert(std::is_same<
typename PredecessorFuture::scheduler_type,
563 "Can't create a task policy from a scheduler and a future from "
564 "a different scheduler");
566 return {std::move(arg_scheduler), std::move(arg_future), arg_priority};
// Spawns a task for arg_functor under the given scheduler-carrying policy.
//
// TaskEnum must be TaskTeam or TaskSingle (static_assert below). The apply
// and destroy function pointers are obtained through
// Impl::TaskQueueSpecialization<scheduler_type>::get_function_pointer and are
// passed, with the moved policy and the forwarded functor, to
// scheduler_type::spawn, whose result is returned.
// Return type: Scheduler::future_type_for_functor<std::decay_t<FunctorType>>.
// NOTE(review): the declarator of the policy parameter (used below as
// arg_policy) and the head of the task_type alias are not visible in this
// excerpt.
577template <
int TaskEnum,
typename Scheduler,
typename DepFutureType,
578 typename FunctorType>
579typename Scheduler::template future_type_for_functor<std::decay_t<FunctorType>>
580host_spawn(Impl::TaskPolicyWithScheduler<TaskEnum, Scheduler, DepFutureType>
582 FunctorType&& arg_functor) {
583 using scheduler_type = Scheduler;
585 typename scheduler_type::template runnable_task_type<FunctorType>;
587 static_assert(TaskEnum == Impl::TaskType::TaskTeam ||
588 TaskEnum == Impl::TaskType::TaskSingle,
589 "Kokkos host_spawn requires TaskTeam or TaskSingle");
// ptr and dtor are filled in by get_function_pointer below.
593 typename task_type::function_type ptr;
594 typename task_type::destroy_type dtor;
595 Kokkos::Impl::TaskQueueSpecialization<
596 scheduler_type>::template get_function_pointer<task_type>(ptr, dtor);
598 return scheduler_type::spawn(std::move(arg_policy), ptr, dtor,
599 std::forward<FunctorType>(arg_functor));
// Spawns a task for arg_functor under the given scheduler-carrying policy.
//
// TaskEnum must be TaskTeam or TaskSingle (static_assert below). Unlike
// host_spawn, the apply and destroy function pointers are taken directly from
// task_type::apply and task_type::destroy. They are passed, with the moved
// policy and the forwarded functor, to scheduler_type::spawn, whose result is
// returned.
// Return type: Scheduler::future_type_for_functor<std::decay_t<FunctorType>>.
// NOTE(review): the declarator of the policy parameter (used below as
// arg_policy) and the head of the task_type alias are not visible in this
// excerpt.
608template <
int TaskEnum,
typename Scheduler,
typename DepFutureType,
609 typename FunctorType>
610typename Scheduler::template future_type_for_functor<std::decay_t<FunctorType>>
611 KOKKOS_INLINE_FUNCTION
612 task_spawn(Impl::TaskPolicyWithScheduler<TaskEnum, Scheduler, DepFutureType>
614 FunctorType&& arg_functor) {
615 using scheduler_type = Scheduler;
618 typename scheduler_type::template runnable_task_type<FunctorType>;
620 static_assert(TaskEnum == Impl::TaskType::TaskTeam ||
621 TaskEnum == Impl::TaskType::TaskSingle,
622 "Kokkos task_spawn requires TaskTeam or TaskSingle");
624 typename task_type::function_type
const ptr = task_type::apply;
625 typename task_type::destroy_type
const dtor = task_type::destroy;
627 return scheduler_type::spawn(std::move(arg_policy), ptr, dtor,
628 std::forward<FunctorType>(arg_functor));
// Free-function respawn: forwards to the static respawn() of
// T::scheduler_type.
//
// Parameters:
//   arg_self     - pointer to the functor being respawned.
//   arg          - a future or a scheduler (enforced by the static_assert).
//   arg_priority - task priority; defaults to TaskPriority::Regular.
636template <
typename FunctorType,
typename T>
637void KOKKOS_INLINE_FUNCTION
638respawn(FunctorType* arg_self, T
const& arg,
639 TaskPriority
const& arg_priority = TaskPriority::Regular) {
640 static_assert(Kokkos::is_future<T>::value || Kokkos::is_scheduler<T>::value,
641 "Kokkos respawn argument must be Future or TaskScheduler");
// The overload of scheduler_type::respawn is selected by the type of arg.
643 T::scheduler_type::respawn(arg_self, arg, arg_priority);
// Free-function wait for a BasicTaskScheduler: calls the static execute() of
// the scheduler's specialization type, passing the scheduler.
659template <
class ExecSpace,
class QueueType>
660inline void wait(BasicTaskScheduler<ExecSpace, QueueType>
const& scheduler) {
661 using scheduler_type = BasicTaskScheduler<ExecSpace, QueueType>;
662 scheduler_type::specialization::execute(scheduler);