include/cuda/experimental/__stf/internal/task.cuh
//===----------------------------------------------------------------------===//
//
// Part of CUDASTF in CUDA C++ Core Libraries,
// under the Apache License v2.0 with LLVM Exceptions.
// See https://llvm.org/LICENSE.txt for license information.
// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
// SPDX-FileCopyrightText: Copyright (c) 2022-2024 NVIDIA CORPORATION & AFFILIATES.
//
//===----------------------------------------------------------------------===//
#pragma once
#include <cuda/__cccl_config>
#if defined(_CCCL_IMPLICIT_SYSTEM_HEADER_GCC)
# pragma GCC system_header
#elif defined(_CCCL_IMPLICIT_SYSTEM_HEADER_CLANG)
# pragma clang system_header
#elif defined(_CCCL_IMPLICIT_SYSTEM_HEADER_MSVC)
# pragma system_header
#endif // no system header
/*
* This is a generic class of "tasks" that are synchronized according to
* their accesses on "data", based on in/out dependencies.
*/
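/*
 * A minimal usage sketch (hedged: `ctx` and the logical data `A` are
 * hypothetical and come from a backend context, as in the comment of
 * task::clear() below):
 *
 *   task t;                    // starts in phase::setup
 *   t.set_symbol("my_task");   // optional debugging name
 *   t.add_deps(A.rw());        // declare a read/write dependency
 *   // a backend then calls t.acquire(ctx), runs the work, and finally
 *   // t.release(ctx, done)
 */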
#include <cuda/experimental/__stf/internal/msir.cuh>
#include <cuda/experimental/__stf/internal/task_dep.cuh> // task has-a task_dep_vector_untyped
#include <optional>
namespace cuda::experimental::stf
{
namespace reserved
{
class mapping_id_tag
{};
using mapping_id_t = reserved::unique_id<reserved::mapping_id_tag>;
} // end namespace reserved
class backend_ctx_untyped;
class logical_data_untyped;
class exec_place;
void reclaim_memory(
backend_ctx_untyped& ctx, const data_place& place, size_t requested_s, size_t& reclaimed_s, event_list& prereqs);
class task
{
public:
enum class phase
{
setup, // the task has not started yet
running, // between acquire and release
finished, // we have released
};
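// The phases form a one-way lifecycle: a task starts in phase::setup while
// its dependencies and symbol are declared, moves to phase::running once
// acquire() has resolved its dependencies, and reaches phase::finished
// after release().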
private:
// pimpl
class impl
{
public:
impl(const impl&) = delete;
impl& operator=(const impl&) = delete;
impl(exec_place where = exec_place::current_device())
: e_place(mv(where))
, affine_data_place(e_place.affine_data_place())
{}
// Vector of user provided deps
task_dep_vector_untyped deps;
// This list is only useful when calling the get() method of a task; to
// reduce overheads, we initialize this vector lazily
void initialize_reordered_indexes()
{
// This list converts the original index to the sorted index: the
// dependency submitted with original index i ends up at position
// reordered_indexes[i] after sorting
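// Worked example (hypothetical): if three dependencies were submitted in
// the order {d0, d1, d2} and sorted into {d2, d0, d1}, iterating the
// sorted deps yields reordered_indexes == {1, 2, 0}.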
reordered_indexes.resize(deps.size());
int sorted_index = 0;
for (auto& it : deps)
{
reordered_indexes[it.dependency_index] = sorted_index;
sorted_index++;
}
}
// Maps the original index of a dependency to its index after reordering;
// for example, deps[reordered_indexes[0]] is the first submitted piece of data
::std::vector<size_t> reordered_indexes;
// Indices of logical data which were locked (i.e. not skipped). These
// indices are those obtained after sorting.
::std::vector<::std::pair<size_t, access_mode>> unskipped_indexes;
// Extra events that need to be done before the task starts. These are
// "extra" as these are in addition to the events that will be required to
// acquire the logical_data_untypeds accessed by the task
event_list input_events;
// A string useful for debugging purpose
mutable ::std::string symbol;
// This points to the prerequisites for this task's termination
event_list done_prereqs;
// Used to uniquely identify the task
reserved::unique_id_t unique_id;
// Used to uniquely identify the task for mapping purposes
reserved::mapping_id_t mapping_id;
// This saves the execution place context so that "unset_place" can
// restore the previous context
exec_place saved_place_ctx;
// Indicates the current phase of the task
task::phase phase = task::phase::setup;
// This is where the task is executed
exec_place e_place;
// This is the default data place for the task. In general this is the
// affine data place of the execution place, but this can be a
// composite data place when using a grid of places for example.
data_place affine_data_place;
::std::vector<::std::function<void()>> post_submission_hooks;
};
protected:
// This is the only state
::std::shared_ptr<impl> pimpl;
public:
task()
: pimpl(::std::make_shared<impl>())
{}
task(exec_place ep)
: pimpl(::std::make_shared<impl>(mv(ep)))
{}
task(const task& rhs)
: pimpl(rhs.pimpl)
{}
task(task&&) = default;
task& operator=(const task& rhs) = default;
task& operator=(task&& rhs) = default;
explicit operator bool() const
{
return pimpl != nullptr;
}
bool operator==(const task& rhs) const
{
return pimpl == rhs.pimpl;
}
const ::std::string& get_symbol() const
{
if (pimpl->symbol.empty())
{
pimpl->symbol = "task " + ::std::to_string(pimpl->unique_id);
}
return pimpl->symbol;
}
void set_symbol(::std::string new_symbol)
{
EXPECT(get_task_phase() == phase::setup);
pimpl->symbol = mv(new_symbol);
}
void add_dep(task_dep_untyped d)
{
EXPECT(get_task_phase() == phase::setup);
pimpl->deps.push_back(mv(d));
}
void add_deps(task_dep_vector_untyped input_deps)
{
EXPECT(get_task_phase() == phase::setup);
if (pimpl->deps.empty())
{
// Frequent case
pimpl->deps = mv(input_deps);
}
else
{
pimpl->deps.insert(
pimpl->deps.end(), ::std::make_move_iterator(input_deps.begin()), ::std::make_move_iterator(input_deps.end()));
}
}
template <typename... Pack>
void add_deps(task_dep_untyped first, Pack&&... pack)
{
EXPECT(get_task_phase() == phase::setup);
pimpl->deps.push_back(mv(first));
if constexpr (sizeof...(Pack) > 0)
{
add_deps(::std::forward<Pack>(pack)...);
}
}
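// Usage sketch (hypothetical logical data A and B exposing the usual
// dependency helpers):
//   t.add_deps(A.read(), B.rw()); // equivalent to two add_dep() calls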
const task_dep_vector_untyped& get_task_deps() const
{
return pimpl->deps;
}
task& on(exec_place p)
{
EXPECT(get_task_phase() == phase::setup);
// This defines an affine data place too
set_affine_data_place(p.affine_data_place());
pimpl->e_place = mv(p);
return *this;
}
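// Usage sketch: since on() returns *this, the execution place can be
// selected fluently during setup (exec_place::device(1) is a hypothetical
// example of a place):
//   t.on(exec_place::device(1)).add_deps(A.rw());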
const exec_place& get_exec_place() const
{
return pimpl->e_place;
}
exec_place& get_exec_place()
{
return pimpl->e_place;
}
void set_exec_place(const exec_place& place)
{
pimpl->e_place = place;
}
const data_place& get_affine_data_place() const
{
return pimpl->affine_data_place;
}
void set_affine_data_place(data_place affine_data_place)
{
pimpl->affine_data_place = mv(affine_data_place);
}
dim4 grid_dims() const
{
return get_exec_place().grid_dims();
}
const event_list& get_done_prereqs() const
{
return pimpl->done_prereqs;
}
template <typename T>
void merge_event_list(T&& tail)
{
pimpl->done_prereqs.merge(::std::forward<T>(tail));
}
instance_id_t find_data_instance_id(const logical_data_untyped& d) const;
template <typename T, typename logical_data_untyped = logical_data_untyped>
decltype(auto) get(size_t submitted_index) const;
// If there are extra input dependencies in addition to STF-induced events
void set_input_events(event_list _input_events)
{
EXPECT(get_task_phase() == phase::setup);
pimpl->input_events = mv(_input_events);
}
const event_list& get_input_events() const
{
return pimpl->input_events;
}
// Get the unique task identifier
int get_unique_id() const
{
return pimpl->unique_id;
}
// Get the unique task mapping identifier
int get_mapping_id() const
{
return pimpl->mapping_id;
}
size_t hash() const
{
return ::std::hash<impl*>()(pimpl.get());
}
void add_post_submission_hook(::std::vector<::std::function<void()>>& hooks)
{
for (auto& h : hooks)
{
pimpl->post_submission_hooks.push_back(h);
}
}
// Resolve all dependencies at the specified execution place
// Returns execution prereqs
event_list acquire(backend_ctx_untyped& ctx);
void release(backend_ctx_untyped& ctx, event_list& done_prereqs);
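// A rough sketch of how a backend typically drives acquire()/release()
// (the event plumbing shown here is hypothetical and backend-specific):
//   event_list ready = t.acquire(ctx); // resolve deps, get execution prereqs
//   /* ... submit the actual work once `ready` is satisfied ... */
//   event_list done = /* events marking the completion of that work */;
//   t.release(ctx, done);              // task moves to phase::finished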
// Returns the current state of the task
phase get_task_phase() const
{
EXPECT(pimpl);
return pimpl->phase;
}
/* When the task has ended, we cannot do anything with it. It is possible
* that the user-facing task object is not destroyed when the context is
* synchronized, so we clear it.
*
* This happens, for example, when doing:
* auto t = ctx.task(A.rw());
* t->*[](auto A){...};
* ctx.finalize();
*/
void clear()
{
pimpl.reset();
}
};
namespace reserved
{
/* This method lazily allocates data (possibly reclaiming memory) and copies data if needed */
template <typename Data>
void dep_allocate(
backend_ctx_untyped& ctx,
Data& d,
access_mode mode,
const data_place& dplace,
const ::std::optional<exec_place> eplace,
instance_id_t instance_id,
event_list& prereqs)
{
auto& inst = d.get_data_instance(instance_id);
/*
* DATA LAZY ALLOCATION
*/
bool already_allocated = inst.is_allocated();
if (!already_allocated)
{
// nvtx_range r("acquire::allocate");
/* Try to allocate memory : if we fail to do so, we must try to
* free other instances first, and retry */
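/* Contract with d.allocate() below (as used by this loop): on success s
 * is set to the allocated size (s >= 0); on failure s is negative and
 * its magnitude is the number of bytes we then ask reclaim_memory to free. */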
int alloc_attempts = 0;
while (true)
{
::std::ptrdiff_t s = 0;
prereqs.merge(inst.get_read_prereq(), inst.get_write_prereq());
// The allocation routine may decide to store some extra information
void* extra_args = nullptr;
d.allocate(dplace, instance_id, s, &extra_args, prereqs);
// Save extra_args
inst.set_extra_args(extra_args);
if (s >= 0)
{
// This allocation was successful
inst.allocated_size = s;
inst.set_allocated(true);
inst.reclaimable = true;
break;
}
assert(s < 0);
// Limit the number of attempts if it's simply not possible
EXPECT(alloc_attempts++ < 5);
// We failed to allocate so we try to reclaim
size_t reclaimed_s = 0;
size_t needed = -s;
reclaim_memory(ctx, dplace, needed, reclaimed_s, prereqs);
}
// After allocating a reduction instance, we need to initialize it
if (mode == access_mode::relaxed)
{
assert(eplace.has_value());
// We have just allocated a new piece of data to perform
// reductions, so we need to initialize this with an
// appropriate user-provided operator
// First get the data instance and then its reduction operator
::std::shared_ptr<reduction_operator_base> ops = inst.get_redux_op();
ops->init_op_untyped(d, dplace, instance_id, eplace.value(), prereqs);
}
}
}
} // end namespace reserved
// inline size_t task_state::hash() const {
// size_t h = 0;
// for (auto& e: logical_data_ids) {
// int id = e.first;
// auto handle = e.second.lock();
// // ignore expired handles
// if (handle) {
// hash_combine(h, ::std::hash<int> {}(id));
// hash_combine(h, handle->hash());
// }
// }
// return h;
// }
class data_instance
{
public:
data_instance() {}
data_instance(bool used, data_place dplace)
: used(used)
, dplace(mv(dplace))
{
#if 0
// Since this will default construct a task, we need to decrement the id
reserved::mapping_id_t::decrement_id();
#endif
}
void set_used(bool flag)
{
assert(flag != used);
used = flag;
}
bool get_used() const
{
return used;
}
void set_dplace(data_place _dplace)
{
dplace = mv(_dplace);
}
const data_place& get_dplace() const
{
return dplace;
}
// Returns the reduction operator associated with this data instance
::std::shared_ptr<reduction_operator_base> get_redux_op() const
{
return redux_op;
}
// Sets the reduction operator associated with this data instance
void set_redux_op(::std::shared_ptr<reduction_operator_base> op)
{
redux_op = op;
}
// Indicates whether the data instance is allocated (if not, it needs to
// be allocated prior to use). Note that we may have allocated instances
// that are out of sync too.
bool is_allocated() const
{
return state.is_allocated();
}
void set_allocated(bool b)
{
state.set_allocated(b);
}
reserved::msir_state_id get_msir() const
{
return state.get_msir();
}
void set_msir(reserved::msir_state_id st)
{
state.set_msir(st);
}
const event_list& get_read_prereq() const
{
return state.get_read_prereq();
}
const event_list& get_write_prereq() const
{
return state.get_write_prereq();
}
void set_read_prereq(event_list prereq)
{
state.set_read_prereq(mv(prereq));
}
void set_write_prereq(event_list prereq)
{
state.set_write_prereq(mv(prereq));
}
void add_read_prereq(const event_list& _prereq)
{
state.add_read_prereq(_prereq);
}
void add_write_prereq(const event_list& _prereq)
{
state.add_write_prereq(_prereq);
}
void clear_read_prereq()
{
state.clear_read_prereq();
}
void clear_write_prereq()
{
state.clear_write_prereq();
}
bool has_last_task_relaxed() const
{
return last_task_relaxed.has_value();
}
void set_last_task_relaxed(task t)
{
last_task_relaxed = mv(t);
}
const task& get_last_task_relaxed() const
{
assert(last_task_relaxed.has_value());
return last_task_relaxed.value();
}
int max_prereq_id() const
{
return state.max_prereq_id();
}
// Compute a hash of the MSI/Alloc state
size_t state_hash() const
{
return hash<reserved::per_data_instance_msi_state>{}(state);
}
void set_extra_args(void* args)
{
extra_args = args;
}
void* get_extra_args() const
{
return extra_args;
}
void clear()
{
clear_read_prereq();
clear_write_prereq();
last_task_relaxed.reset();
}
private:
// Is this instance currently in use? If not, we can reuse this data
// instance when looking for an available slot in the vector of data
// instances attached to the logical data
bool used = false;
// If the used flag is set, this tells where this instance is located
data_place dplace;
// Reduction operator attached to the data instance
::std::shared_ptr<reduction_operator_base> redux_op;
// @@@@TODO@@@@ There is a lot of unchecked forwarding with this variable,
// which is public in practice ...
//
// This structure contains everything to implement the MSI protocol,
// including asynchronous prereqs so that we only use a data instance once
// it's ready to do so
reserved::per_data_instance_msi_state state;
// This stores the last task which used this instance with a relaxed coherence mode (redux)
::std::optional<task> last_task_relaxed;
// This generic pointer can be used to store some information in the
// allocator which is passed to the deallocation routine.
void* extra_args = nullptr;
public:
// Size of the memory allocation (bytes). Only valid for allocated instances.
size_t allocated_size = 0;
// A false value indicates that this instance cannot be a candidate for
// memory reclaiming (e.g. because this corresponds to memory allocated by
// the user)
bool reclaimable = false;
bool automatically_pinned = false;
};
template <>
struct hash<task>
{
::std::size_t operator()(const task& t) const
{
return t.hash();
}
};
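// This specialization lets `task` be used as a key with the library's hash
// helper, e.g. (hypothetical): ::std::unordered_set<task, hash<task>> seen;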
} // namespace cuda::experimental::stf