DYT/Tool/OpenSceneGraph-3.6.5/include/osgEarth/weejobs.h

1248 lines
40 KiB
C
Raw Permalink Normal View History

2024-12-24 23:49:36 +00:00
/**
* weejobs
* Copyright 2024 Pelican Mapping
* https://github.com/pelicanmapping/weejobs
* MIT License
*/
#pragma once
#include <algorithm>
#include <atomic>
#include <cfloat>
#include <chrono>
#include <condition_variable>
#include <cstdlib>
#include <functional>
#include <list>
#include <memory>
#include <mutex>
#include <string>
#include <thread>
#include <type_traits>
#include <utility>
#include <vector>
// OPTIONAL: Define WEEJOBS_EXPORT if you want to use this library from multiple modules (DLLs)
#ifndef WEEJOBS_EXPORT
#define WEEJOBS_EXPORT
#endif
// OPTIONAL: Customize the namespace by defining WEEJOBS_NAMESPACE before including this file.
#ifndef WEEJOBS_NAMESPACE
#define WEEJOBS_NAMESPACE jobs
#endif
// Version
#define WEEJOBS_VERSION_MAJOR 1
#define WEEJOBS_VERSION_MINOR 0
#define WEEJOBS_VERSION_REV 1
#define WEEJOBS_STR_NX(s) #s
#define WEEJOBS_STR(s) WEEJOBS_STR_NX(s)
#define WEEJOBS_COMPUTE_VERSION(major, minor, patch) ((major) * 10000 + (minor) * 100 + (patch))
#define WEEJOBS_VERSION_NUMBER WEEJOBS_COMPUTE_VERSION(WEEJOBS_VERSION_MAJOR, WEEJOBS_VERSION_MINOR, WEEJOBS_VERSION_REV)
#define WEEJOBS_VERSION_STRING WEEJOBS_STR(WEEJOBS_VERSION_MAJOR) "." WEEJOBS_STR(WEEJOBS_VERSION_MINOR) "." WEEJOBS_STR(WEEJOBS_VERSION_REV)
#if __cplusplus >= 201703L || _MSVC_LANG >= 201703L
#define WEEJOBS_NO_DISCARD [[nodiscard]]
#else
#define WEEJOBS_NO_DISCARD
#endif
/**
* weejobs is an API for scheduling a task to run in the background.
* Please read the README.md file for more information.
*/
namespace WEEJOBS_NAMESPACE
{
/**
* Interface for something that can be canceled.
*
* Long-running tasks receive a cancelable& and may poll canceled() to
* find out whether they should stop early.
*/
class cancelable
{
public:
    cancelable() = default;
    cancelable(const cancelable&) = default;
    cancelable(cancelable&&) = default;
    cancelable& operator=(const cancelable&) = default;
    cancelable& operator=(cancelable&&) = default;

    //! Virtual destructor: this is a polymorphic base, so destroying a
    //! derived object through a cancelable* must run the derived destructor.
    virtual ~cancelable() = default;

    //! Whether the operation has been canceled.
    //! The base implementation never cancels and always returns false.
    virtual bool canceled() const { return false; }
};
namespace detail
{
/**
* Event with a binary signaled state, for multi-threaded synchronization.
*
* The event has two states:
* "set" means that a call to wait() will not block;
* "unset" means that calls to wait() will block until another thread calls set().
*
* The event starts out unset.
*
* Typical usage: Thread A creates Thread B to run asynchronous code. Thread A
* then calls wait(), which blocks Thread A. When Thread B is finished, it calls
* set(). Thread A then wakes up and continues execution.
*
* NOTE: ALL waiting threads are notified when the event is set.
*/
struct event
{
public:
    //! Construct a new event in the unset state
    event() : _set(false) { }

    //! DTOR
    ~event() {
        _set = false;
        for (int i = 0; i < 255; ++i) // workaround buggy broadcast
            _cond.notify_all();
    }

    //! Block until the event is set, then return true.
    inline bool wait() {
        // fast path: no need to take the lock if the flag is already set
        if (_set)
            return true;
        std::unique_lock<std::mutex> lock(_m);
        // the predicate form re-checks the flag after every (possibly spurious) wakeup
        _cond.wait(lock, [this]() { return _set.load(); });
        return true;
    }

    //! Block until the event is set or the timeout expires.
    //! Return true if the event is set, otherwise false.
    //! @param timeout A std::chrono duration
    template<typename T>
    inline bool wait(T timeout) {
        if (!_set) {
            std::unique_lock<std::mutex> lock(_m);
            // predicate form: a spurious wakeup does not cut the timeout short
            _cond.wait_for(lock, timeout, [this]() { return _set.load(); });
        }
        return _set;
    }

    //! Block until the event is set; then reset it. Always returns true.
    inline bool waitAndReset() {
        std::unique_lock<std::mutex> lock(_m);
        // loop on the flag so that a spurious wakeup cannot return early
        _cond.wait(lock, [this]() { return _set.load(); });
        _set = false;
        return true;
    }

    //! Set the event state, causing any waiters to unblock.
    inline void set() {
        if (!_set) {
            std::unique_lock<std::mutex> lock(_m);
            if (!_set) {
                _set = true;
                _cond.notify_all();
            }
        }
    }

    //! Reset (unset) the event state; new waiters will block until set() is called.
    inline void reset() {
        std::unique_lock<std::mutex> lock(_m);
        _set = false;
    }

    //! Whether the event state is set (waiters will not block).
    inline bool isSet() const {
        return _set;
    }

protected:
    // atomic because isSet() and the fast paths read it without holding _m
    std::atomic<bool> _set;
    std::condition_variable_any _cond;
    std::mutex _m; // do not use Mutex, we never want tracking
};
/**
* Sempahore lets N users aquire it and then notifies when the
* count goes back down to zero.
*/
class semaphore
{
public:
//! Acquire, increasing the usage count by one
void acquire()
{
std::unique_lock<std::mutex> lock(_m);
++_count;
}
//! Release, decreasing the usage count by one.
//! When the count reaches zero, joiners will be notified and
//! the semaphore will reset to its initial state.
void release()
{
std::unique_lock<std::mutex> lock(_m);
_count = std::max(_count - 1, 0);
if (_count == 0)
_cv.notify_all();
}
//! Reset to initialize state; this will cause a join to occur
//! even if no acquisitions have taken place.
void reset()
{
std::unique_lock<std::mutex> lock(_m);
_count = 0;
_cv.notify_all();
}
//! Current count in the semaphore
std::size_t count() const
{
std::unique_lock<std::mutex> lock(_m);
return _count;
}
//! Block until the semaphore count returns to zero.
//! (It must first have left zero)
//! Warning: this method will block forever if the count
//! never reaches zero!
void join()
{
std::unique_lock<std::mutex> lock(_m);
while (_count > 0)
_cv.wait(lock);
}
//! Block until the semaphore count returns to zero, or
//! the operation is canceled.
//! (It must first have left zero)
void join(cancelable* c)
{
_cv.wait_for(_m, std::chrono::seconds(1), [this, c]() {
return
(_count == 0) ||
(c && c->canceled());
}
);
_count = 0;
}
private:
int _count = 0;
std::condition_variable_any _cv;
mutable std::mutex _m;
};
// result_of_t<F, Args...> is the type returned by invoking an F with Args...
// std::result_of is used before C++17; std::invoke_result from C++17 on.
#if (__cplusplus < 201703L) && !(_MSVC_LANG >= 201703L)
template<typename Fn, typename... Params>
using result_of_t = typename std::result_of<Fn(Params...)>::type;
#else
template<typename Fn, typename... Params>
using result_of_t = std::invoke_result_t<Fn, Params...>;
#endif
}
/**
* A jobgroup ties several jobs together. Put one in the context of each
* job you dispatch; jobgroup::join() then blocks until every job in the
* group has finished.
*/
struct jobgroup : public detail::semaphore
{
    //! Creates a new, empty job group held by a shared pointer.
    static std::shared_ptr<jobgroup> create()
    {
        auto group = std::make_shared<jobgroup>();
        return group;
    }
};
/**
* Context object you can pass to dispatch(...) to control aspects of
* how the background task is run. Every field has a usable default, so
* a default-constructed context is valid.
*/
struct context
{
//! Readable name of the job.
std::string name; // readable name of the job
//! Job pool to run in. When null, the job is dispatched to the default
//! (unnamed) pool.
class jobpool* pool = nullptr; // job pool to run in
//! Optional priority callback. It is evaluated each time a worker picks
//! the next job; the queued job with the highest value is taken first and
//! a job with no callback counts as priority 0.
std::function<float()> priority = {}; // priority of the job
//! Optional join group. It is acquired when the job is queued and
//! released once the job has run (or the queue is discarded on stop).
std::shared_ptr<jobgroup> group = nullptr; // join group for this job
//! If true, a future-returning job is skipped when its future reports
//! canceled() (no result and no other holder) before the task starts.
bool can_cancel = true; // if true, the job will cancel if its future goes out of scope
};
/**
* Future holds the future result of an asynchronous operation.
*
* Usage:
* Producer (usually an asynchronous function call) creates a future<T>
* (the promise of a future result) and immediately returns it. The Consumer
* then performs other work, and eventually (or immediately) checks available()
* for a result or canceled() for cancelation. If available() is true,
* Consumer calls value() to fetch the valid result.
*
* As long as at least two equivalent Future objects (i.e. Futures pointing to the
* same internal shared data) exist, the Future is considered valid. Once
* that count goes to one, the Future is either available (the value is ready)
* or empty (i.e., canceled or abandoned).
*/
template<typename T>
class future : public cancelable
{
private:
    // Internal structure to track references to the result.
    // One instance of this is shared among all future instances
    // created from the copy constructor.
    struct shared_t
    {
        T _obj;                                          // the result value
        mutable detail::event _ev;                       // set once the result is resolved
        std::mutex _continuation_mutex;                  // guards _continuation
        std::function<void()> _continuation;             // at most one continuation
        std::atomic_bool _continuation_ran = { false };  // a continuation runs at most once
    };

public:
    //! Default constructor
    future()
    {
        _shared = std::make_shared<shared_t>();
    }

    //! Default copy constructor; the copy shares the same result storage.
    future(const future& rhs) = default;

    //! True if this Future is unused and not connected to any other Future
    bool empty() const
    {
        return !available() && _shared.use_count() == 1;
    }

    //! True if the promise was resolved and a result is available.
    bool available() const
    {
        return _shared->_ev.isSet();
    }

    //! True if a promise exists, but has not yet been resolved;
    //! Presumably the asynchronous task is still working.
    bool working() const
    {
        return !empty() && !available();
    }

    // cancelable interface: a future counts as canceled when it is empty().
    bool canceled() const override
    {
        return empty();
    }

    //! Dereference the result object. Make sure you check available()
    //! to check that the future was actually resolved; otherwise you
    //! will just get the default object.
    const T& value() const
    {
        return _shared->_obj;
    }

    //! Dereference this object to const pointer to the result.
    const T* operator -> () const
    {
        return &_shared->_obj;
    }

    //! Result is available AND equal to the argument.
    bool has_value(const T& arg) const
    {
        return available() && value() == arg;
    }

    //! Same as value(), but if the result is available will reset the
    //! future before returning the result object.
    T release()
    {
        bool avail = available();
        T result = value();
        if (avail)
            reset();
        return result;
    }

    //! Blocks until the result becomes available or the future is abandoned;
    //! then returns the result object.
    const T& join() const
    {
        // poll in 1ms slices so that abandonment (empty()) is noticed too
        while (
            !empty() &&
            !_shared->_ev.wait(std::chrono::milliseconds(1)));
        return value();
    }

    //! Blocks until the result becomes available or the future is abandoned
    //! or a cancelation flag is set; then returns the result object. Be sure to
    //! check canceled() after calling join() to see if the return value is valid.
    const T& join(const cancelable* p) const
    {
        while (working() && (p == nullptr || !p->canceled()))
        {
            _shared->_ev.wait(std::chrono::milliseconds(1));
        }
        return value();
    }

    //! Blocks until the result becomes available or the future is abandoned
    //! or a cancelation flag is set; then returns the result object. Be sure to
    //! check canceled() after calling join() to see if the return value is valid.
    const T& join(const cancelable& p) const
    {
        return join(&p);
    }

    //! Release reference to a promise, resetting this future to its default state
    void abandon()
    {
        _shared = std::make_shared<shared_t>();
    }

    //! synonym for abandon.
    void reset()
    {
        abandon();
    }

    //! Resolve (fulfill) the promise with the provided result value.
    void resolve(const T& value)
    {
        _shared->_obj = value;
        _shared->_ev.set();
        fire_continuation();
    }

    //! Resolve (fulfill) the promise with an rvalue
    void resolve(T&& value)
    {
        _shared->_obj = std::move(value);
        _shared->_ev.set();
        fire_continuation();
    }

    //! Resolve (fulfill) the promise with a default result
    void resolve()
    {
        _shared->_ev.set();
        fire_continuation();
    }

    //! The number of objects, including this one, that
    //! reference the shared container. If this method
    //! returns 1, that means this is the only object with
    //! access to the data. This method will never return zero.
    unsigned refs() const
    {
        return static_cast<unsigned>(_shared.use_count());
    }

    //! Add a continuation to this future. The continuation will be dispatched
    //! when this object's result becomes available; that result will be the input
    //! value to the continuation function. The continuation function in turn must
    //! return a value (cannot be void).
    //! Only one continuation may be pending on a future; if one is already
    //! installed, an unconnected future is returned instead.
    template<typename F, typename R = typename detail::result_of_t<F, const T&, cancelable&>>
    WEEJOBS_NO_DISCARD inline future<R> then_dispatch(F func, const context& con = {});

    //! Add a continuation to this future. Instead of the functor returning a value,
    //! it will instead have the option of resolving the incoming future/promise object.
    //! This is useful for operations that have their own way of running asynchronous code.
    //! Note: with this variant you must specify the template argument explicitly,
    //! because R cannot be deduced through the std::function parameter, e.g.
    //! result.then_dispatch<int>([](const T& in, promise<int>& out) { ... })
    template<typename R>
    WEEJOBS_NO_DISCARD inline future<R> then_dispatch(std::function<void(const T&, future<R>&)> func, const context& con = {});

    //! Add a continuation to this future. The functor only takes an input value and has no
    //! return value (fire and forget).
    inline void then_dispatch(std::function<void(const T&)> func, const context& con = {});

private:
    std::shared_ptr<shared_t> _shared;

    //! Runs the installed continuation, if there is one and none has run before.
    void fire_continuation()
    {
        std::function<void()> continuation;
        {
            std::lock_guard<std::mutex> lock(_shared->_continuation_mutex);

            if (_shared->_continuation && !_shared->_continuation_ran.exchange(true))
                continuation = std::move(_shared->_continuation);

            // Zero out the stored continuation function immediately.
            // This is important because the continuation might hold a reference to a promise
            // that might hamper cancelation.
            _shared->_continuation = nullptr;
        }

        // Invoke OUTSIDE the lock: the continuation runs arbitrary user code, and
        // holding the (non-recursive) mutex here would deadlock if that code
        // called then_dispatch() on this same future.
        if (continuation)
            continuation();
    }
};
//! The "promise/future" pattern is served by a single class here: the
//! producer and the consumer each hold a copy of the same future<T>.
//! "promise" is merely an alias that makes producer-side code read clearly.
template<typename T>
using promise = future<T>;
namespace detail
{
    //! One queued unit of work: the dispatch context plus the callable to run.
    //! The callable returns true if the work ran and false if it was skipped.
    struct job
    {
        context ctx;
        std::function<bool()> _delegate;

        //! Orders jobs by ascending priority. A job without a priority
        //! callback compares as the lowest possible priority (-FLT_MAX).
        //! NOTE(review): jobpool::_take_job treats a missing priority as 0.0f
        //! instead - confirm which default is intended.
        bool operator < (const job& rhs) const
        {
            const float mine = ctx.priority ? ctx.priority() : -FLT_MAX;
            const float theirs = rhs.ctx.priority ? rhs.ctx.priority() : -FLT_MAX;
            return mine < theirs;
        }
    };

    //! Takes a job from another pool's queue on behalf of "thief".
    //! Returns true and fills "stolen" on success.
    inline bool steal_job(class jobpool* thief, detail::job& stolen);
}
/**
* A priority-sorted collection of jobs that are running or waiting
* to run in a thread pool.
*/
class jobpool
{
public:
/**
* Metrics of a thread pool.
*/
struct metrics_t
{
std::string name;
std::atomic_uint concurrency = { 0u };
std::atomic_uint pending = { 0u };
std::atomic_uint running = { 0u };
std::atomic_uint postprocessing = { 0u };
std::atomic_uint canceled = { 0u };
std::atomic_uint total = { 0u };
};
public:
//! Destroy
~jobpool()
{
stop_threads();
}
//! Name of this job pool
const std::string& name() const
{
return _metrics.name;
}
metrics_t* metrics()
{
return &_metrics;
}
//! Set the concurrency of this job scheduler
void set_concurrency(unsigned value)
{
value = std::max(value, 1u);
if (_target_concurrency != value)
{
_target_concurrency = value;
start_threads();
}
}
//! Get the target concurrency (thread count)
unsigned concurrency() const
{
return _target_concurrency;
}
//! Whether this job pool is allowed to steal work from other job pools
//! when it is idle. Default = true.
void set_can_steal_work(bool value)
{
_can_steal_work = value;
}
//! Discard all queued jobs
void cancel_all()
{
std::lock_guard<std::mutex> lock(_queue_mutex);
_queue.clear();
_queue_size = 0;
_metrics.canceled += _metrics.pending;
_metrics.pending = 0;
}
//! Schedule an asynchronous task on this scheduler
//! Use job::dispatch to run jobs (usually no need to call this directly)
//! @param delegate Function to execute
//! @param context Job details
void _dispatch_delegate(std::function<bool()>& delegate, const context& context)
{
if (!_done)
{
// If we have a group semaphore, acquire it BEFORE queuing the job
if (context.group)
{
context.group->acquire();
}
if (_target_concurrency > 0)
{
std::lock_guard<std::mutex> lock(_queue_mutex);
_queue.emplace_back(detail::job{ context, delegate });
_queue_size++;
_metrics.pending++;
_metrics.total++;
_block.notify_one();
}
else
{
// no threads? run synchronously.
delegate();
if (context.group)
{
context.group->release();
}
}
}
}
//! removes the highest priority job from the queue and places it
//! in output. Returns true if a job was taken, false if the queue
//! was empty.
inline bool _take_job(detail::job& output, bool lock)
{
if (lock)
{
std::lock_guard<std::mutex> lock(_queue_mutex);
return _take_job(output, false);
}
else if (!_done && _queue_size > 0)
{
auto ptr = _queue.end();
float highest_priority = -FLT_MAX;
for (auto iter = _queue.begin(); iter != _queue.end(); ++iter)
{
float priority = iter->ctx.priority != nullptr ?
iter->ctx.priority() :
0.0f;
if (ptr == _queue.end() || priority > highest_priority)
{
ptr = iter;
highest_priority = priority;
}
}
if (ptr == _queue.end())
ptr = _queue.begin();
output = std::move(*ptr);
_queue.erase(ptr);
_queue_size--;
_metrics.pending--;
return true;
}
return false;
}
//! Construct a new job pool.
//! Do not call this directly - call getPool(name) instead.
jobpool(const std::string& name, unsigned concurrency) :
_target_concurrency(concurrency)
{
_metrics.name = name;
_metrics.concurrency = 0;
}
//! Pulls queued jobs and runs them in whatever thread run() is called from.
//! Runs in a loop until _done is set.
inline void run();
//! Spawn all threads in this scheduler
inline void start_threads();
//! Signall all threads to stop
inline void stop_threads();
//! Wait for all threads to exit (after calling stop_threads)
inline void join_threads();
bool _can_steal_work = true;
std::list<detail::job> _queue;
std::atomic_int _queue_size = { 0 }; // don't use list::size(), it's slow and not atomic
mutable std::mutex _queue_mutex; // protect access to the queue
mutable std::mutex _quit_mutex; // protects access to _done
std::atomic<unsigned> _target_concurrency; // target number of concurrent threads in the pool
std::condition_variable_any _block; // thread waiter block
bool _done = false; // set to true when threads should exit
std::vector<std::thread> _threads; // threads in the pool
metrics_t _metrics; // metrics for this pool
};
//! Aggregated view over the metrics of every job pool. The totals below
//! are sums over the per-pool counters registered in _pools.
class metrics
{
public:
//! Total number of pending jobs across all schedulers
int total_pending() const;
//! Total number of running jobs across all schedulers
int total_running() const;
//! Total number of postprocessing jobs across all schedulers
int total_postprocessing() const;
//! Total number of canceled jobs across all schedulers
int total_canceled() const;
//! Total number of active jobs in the system
//! (pending + running + postprocessing, summed over all schedulers)
int total() const;
//! Gets a vector of all jobpool metrics structures.
//! The vector is returned by value; the pointers refer to the live,
//! per-pool metrics objects.
inline const std::vector<struct jobpool::metrics_t*> all()
{
return _pools;
}
//! One entry per job pool; get_pool() appends to this when it creates a pool.
std::vector<struct jobpool::metrics_t*> _pools;
};
/**
* Runtime singleton object;
* Declare with WEEJOBS_INSTANCE in one of your .cpp files.
*/
namespace detail
{
//! Process-wide state shared by all job pools.
struct runtime
{
inline runtime();
//! Calls shutdown().
inline ~runtime();
//! Stops the worker threads of every pool, then joins them.
inline void shutdown();
//! Cleared by shutdown(); reported by alive().
bool _alive = true;
//! Global switch for work stealing between pools (off by default);
//! see set_allow_work_stealing().
bool _stealing_allowed = false;
//! Guards _pools and the pool list inside _metrics.
std::mutex _pools_mutex;
//! Every pool created through get_pool(). The pointers are raw and
//! nothing in this header deletes them.
std::vector<jobpool*> _pools;
//! Aggregated metrics across all pools.
metrics _metrics;
//! Optional hook invoked at the start of each worker thread with the
//! name of its pool; see set_thread_name_function().
std::function<void(const char*)> _set_thread_name;
};
}
//! Access to the runtime singleton - users need not call this.
//! This is only a declaration; the definition is emitted by the
//! WEEJOBS_INSTANCE macro, which must appear in exactly one .cpp file.
extern WEEJOBS_EXPORT detail::runtime& instance();
//! Looks up a job pool by name and returns it. If no pool has that name
//! yet, one is created with two worker threads, registered with the
//! runtime (and its metrics), started, and returned. Calling this without
//! a name yields the default pool.
inline jobpool* get_pool(const std::string& name = {})
{
    std::lock_guard<std::mutex> lock(instance()._pools_mutex);

    detail::runtime& rt = instance();

    // reuse an existing pool when the name matches
    for (jobpool* candidate : rt._pools)
    {
        if (candidate->name() == name)
        {
            return candidate;
        }
    }

    // otherwise create, register and start a fresh one
    jobpool* created = new jobpool(name, 2u);
    rt._pools.push_back(created);
    rt._metrics._pools.push_back(&created->_metrics);
    created->start_threads();
    return created;
}
namespace detail
{
// dispatches a function to the appropriate job pool.
inline void pool_dispatch(std::function<bool()> delegate, const context& context)
{
auto pool = context.pool ? context.pool : get_pool({});
if (pool)
{
pool->_dispatch_delegate(delegate, context);
// if work stealing is enabled, wake up all pools
if (instance()._stealing_allowed)
{
std::lock_guard<std::mutex> lock(instance()._pools_mutex);
for (auto pool : instance()._pools)
{
pool->_block.notify_all();
}
}
}
}
}
//! Fire-and-forget dispatch of a job that produces no result.
//! @param task Function to run in a thread. Prototype is void(void).
//! @param context Optional configuration for the asynchronous function call
inline void dispatch(std::function<void()> task, const context& context = {})
{
    // Adapt the void task to the bool-returning delegate the pool expects;
    // a fire-and-forget job always counts as executed.
    auto run_and_report = [task]() mutable -> bool
    {
        task();
        return true;
    };

    detail::pool_dispatch(run_and_report, context);
}
//! Queues a job and hands back a future that will receive its result.
//! @param task Function to run in a thread. Prototype is T(cancelable&)
//! @param context Optional configuration for the asynchronous function call
//! @return Future result of the async function call
template<typename F, typename T = typename detail::result_of_t<F, cancelable&>>
WEEJOBS_NO_DISCARD inline future<T> dispatch(F task, const context& context = {})
{
    future<T> promise;
    const bool can_cancel = context.can_cancel;

    // The delegate returns true when the task actually ran.
    std::function<bool()> delegate = [task, promise, can_cancel]() mutable
    {
        if (!can_cancel)
        {
            // not cancelable: give the task a cancelable that never cancels
            cancelable dummy;
            promise.resolve(task(dummy));
            return true;
        }

        // cancelable: skip the work if the promise was already abandoned
        if (promise.canceled())
        {
            return false;
        }

        promise.resolve(task(promise));
        return true;
    };

    detail::pool_dispatch(delegate, context);
    return promise;
}
//! Queues a job that works with a promise supplied by the caller.
//! The task is invoked with that promise as its argument; its return
//! value is ignored, so the task itself is expected to resolve the promise.
//! @param task Function to run in a thread; called as task(promise)
//! @param promise User-supplied promise object
//! @param context Optional configuration for the asynchronous function call
//! @return The same promise that was passed in
template<typename F, typename T = typename detail::result_of_t<F, cancelable&>>
WEEJOBS_NO_DISCARD inline future<T> dispatch(F task, future<T> promise, const context& context = {})
{
    const bool cancelable_job = context.can_cancel;

    // The delegate returns true when the task actually ran.
    std::function<bool()> delegate = [task, promise, cancelable_job]() mutable
    {
        // skip the work if cancelation is allowed and already happened
        if (cancelable_job && promise.canceled())
        {
            return false;
        }

        task(promise);
        return true;
    };

    detail::pool_dispatch(delegate, context);
    return promise;
}
//! Aggregated metrics covering every job pool
inline metrics* get_metrics()
{
    detail::runtime& rt = instance();
    return &rt._metrics;
}
//! stop all threads, wait for them to exit, and shut down the system
inline void shutdown()
{
instance().shutdown();
}
//! True until the weejobs runtime has been shut down
inline bool alive()
{
    const bool still_alive = instance()._alive;
    return still_alive;
}
//! Install a function that the SDK can use to set job pool thread names
//! when it spawns them. The function is called on each new worker thread
//! with the name of the pool the thread belongs to.
//! @param f Naming callback; pass an empty function to remove it
inline void set_thread_name_function(std::function<void(const char*)> f)
{
    // "f" is a by-value sink parameter: move it into place instead of copying
    instance()._set_thread_name = std::move(f);
}
//! Whether to allow jobpools to steal work from other jobpools when they are idle.
inline void set_allow_work_stealing(bool value)
{
instance()._stealing_allowed = value;
}
// Nothing to set up: every member carries its own default initializer.
inline detail::runtime::runtime() = default;
// Make sure all worker threads are stopped and joined before the
// runtime goes away.
inline detail::runtime::~runtime()
{
    this->shutdown();
}
inline void detail::runtime::shutdown()
{
_alive = false;
//std::cout << "stopping " << _pools.size() << " threads..." << std::endl;
for (auto& pool : _pools)
if (pool)
pool->stop_threads();
//std::cout << "joining " << _pools.size() << " threads..." << std::endl;
for (auto& pool : _pools)
if (pool)
pool->join_threads();
}
inline void jobpool::run()
{
while (!_done)
{
detail::job next;
bool have_next = false;
{
if (_can_steal_work && instance()._stealing_allowed)
{
{
std::unique_lock<std::mutex> lock(_queue_mutex);
// work-stealing enabled: wait until any queue is non-empty
_block.wait(lock, [this]() { return get_metrics()->total_pending() > 0 || _done; });
if (!_done && _queue_size > 0)
{
have_next = _take_job(next, false);
}
}
if (!_done && !have_next)
{
have_next = detail::steal_job(this, next);
}
}
else
{
std::unique_lock<std::mutex> lock(_queue_mutex);
// wait until just our local queue is non-empty
_block.wait(lock, [this] { return (_queue_size > 0) || _done; });
if (!_done && _queue_size > 0)
{
have_next = _take_job(next, false);
}
}
}
if (have_next)
{
_metrics.running++;
auto t0 = std::chrono::steady_clock::now();
bool job_executed = next._delegate();
auto duration = std::chrono::steady_clock::now() - t0;
if (job_executed == false)
{
_metrics.canceled++;
}
// release the group semaphore if necessary
if (next.ctx.group != nullptr)
{
next.ctx.group->release();
}
_metrics.running--;
}
// See if we no longer need this thread because the
// target concurrency has been reduced
std::lock_guard<std::mutex> lock(_quit_mutex);
if (_target_concurrency < _metrics.concurrency)
{
_metrics.concurrency--;
break;
}
}
}
//! Clears the stop flag and spawns worker threads until the number of
//! accounted-for workers reaches the target concurrency.
inline void jobpool::start_threads()
{
    _done = false;

    while (_metrics.concurrency < _target_concurrency)
    {
        // account for the worker before it starts running
        _metrics.concurrency++;

        auto worker = [this]()
        {
            // give the thread a readable name if a naming hook is installed
            const auto& name_thread = instance()._set_thread_name;
            if (name_thread)
            {
                name_thread(_metrics.name.c_str());
            }
            run();
        };

        _threads.push_back(std::thread(worker));
    }
}
//! Raises the stop flag, throws away all queued jobs and wakes up every
//! worker so that it can notice the flag and exit.
inline void jobpool::stop_threads()
{
    _done = true;

    std::lock_guard<std::mutex> lock(_queue_mutex);

    // The discarded jobs will never run, so release their group semaphores
    // here; otherwise a jobgroup join() would deadlock.
    for (auto it = _queue.begin(); it != _queue.end(); ++it)
    {
        const auto& group = it->ctx.group;
        if (group)
        {
            group->release();
        }
    }

    // empty the queue
    _queue.clear();
    _queue_size = 0;

    // wake up all threads so they can exit
    _block.notify_all();
}
//! Blocks until every worker thread has exited, then forgets them.
//! Call stop_threads() first, or the workers will never exit.
inline void jobpool::join_threads()
{
    for (std::thread& worker : _threads)
    {
        if (!worker.joinable())
        {
            continue;
        }
        worker.join();
    }

    _threads.clear();
}
// Takes a job away from some other pool on behalf of "thief". The victim
// is the pool (other than the thief itself) with the longest queue.
// Returns true and fills "stolen" if a job was obtained.
inline bool detail::steal_job(jobpool* thief, detail::job& stolen)
{
    jobpool* victim = nullptr;

    {
        std::lock_guard<std::mutex> lock(instance()._pools_mutex);

        std::size_t best_backlog = 0u;
        for (jobpool* candidate : instance()._pools)
        {
            if (candidate == thief)
            {
                continue;
            }
            if (static_cast<std::size_t>(candidate->_queue_size) > best_backlog)
            {
                best_backlog = candidate->_queue_size;
                victim = candidate;
            }
        }
    }

    // no other pool has anything queued
    if (victim == nullptr)
    {
        return false;
    }

    // the pool list lock is released by now; the victim locks its own queue
    return victim->_take_job(stolen, true);
}
//! Installs a continuation whose return value resolves the returned future.
//! @param func Callable invoked as func(const T&, cancelable&) with a copy of
//!     this future's result; its return value becomes the new future's result
//! @param con Context used to dispatch the continuation job
//! @return Future for the continuation's result, or an unconnected future if
//!     a continuation is already installed
template<typename T>
template<typename F, typename R>
inline future<R> future<T>::then_dispatch(F func, const context& con)
{
    // The future result of F.
    // In this case, the continuation task will return a value that the system will use to resolve the promise.
    future<R> continuation_promise;

    // lock the continuation and set it:
    {
        std::lock_guard<std::mutex> lock(_shared->_continuation_mutex);

        if (_shared->_continuation)
        {
            return {}; // only one continuation allowed
        }

        // Take a WEAK ptr to this future's shared data: the continuation is
        // stored inside that same shared data, so a strong reference here
        // would make the shared data keep itself alive.
        std::weak_ptr<shared_t> weak_shared = _shared;

        context copy_of_con = con;

        _shared->_continuation = [func, copy_of_con, weak_shared, continuation_promise]() mutable
            {
                auto shared = weak_shared.lock();

                // verify the parent's result is actually available (simulate available())
                if (shared && shared->_ev.isSet())
                {
                    // copy it and dispatch it as the input to a new job:
                    T copy_of_value = shared->_obj;

                    // Once this wrapper gets created, note that we now have 2 references to the continuation_promise.
                    // To prevent this from hampering cancelation, the continuation function is set to nullptr
                    // immediately after being called.
                    auto wrapper = [func, copy_of_value, continuation_promise]() mutable
                        {
                            continuation_promise.resolve(func(copy_of_value, continuation_promise));
                        };

                    // Qualify with WEEJOBS_NAMESPACE rather than a literal "jobs" so this
                    // still compiles when the namespace has been customized.
                    WEEJOBS_NAMESPACE::dispatch(wrapper, copy_of_con);
                }
            };
    }

    // maybe the future is already available?
    if (available())
    {
        fire_continuation();
    }

    return continuation_promise;
}
//! Installs a continuation that is given this future's result together
//! with a promise that it must resolve by its own means.
//! @param func Callable invoked as func(result, promise)
//! @param con Not used by this overload
//! @return The promise handed to func, or an unconnected future if a
//!     continuation is already installed
template<typename T>
template<typename R>
inline future<R> future<T>::then_dispatch(std::function<void(const T&, future<R>&)> func, const context& con)
{
    // This is what the caller gets back; resolving it is up to "func".
    future<R> downstream;

    {
        std::lock_guard<std::mutex> guard(_shared->_continuation_mutex);

        // a future carries at most one continuation
        if (_shared->_continuation)
        {
            return {};
        }

        // Refer to our shared data weakly; the continuation lives inside
        // that data and must not keep it alive.
        std::weak_ptr<shared_t> parent_state = _shared;

        // Unlike the other overloads, the user function is called directly
        // rather than being dispatched to a job pool: it is expected to hand
        // the work to some asynchronous mechanism of its own and to resolve
        // the promise from there.
        _shared->_continuation = [func, parent_state, downstream]() mutable
            {
                if (auto parent = parent_state.lock())
                {
                    func(parent->_obj, downstream);
                }
            };
    }

    // the result may already be in; if so, run the continuation right away
    if (available())
    {
        fire_continuation();
    }

    return downstream;
}
//! Installs a fire-and-forget continuation: once this future has a
//! result, a job is dispatched that calls func with a copy of it.
//! Does nothing if a continuation is already installed.
//! @param func Callable invoked as func(result)
//! @param con Context used to dispatch the continuation job
template<typename T>
inline void future<T>::then_dispatch(std::function<void(const T&)> func, const context& con)
{
    {
        std::lock_guard<std::mutex> guard(_shared->_continuation_mutex);

        // a future carries at most one continuation
        if (_shared->_continuation)
        {
            return;
        }

        // Refer to our shared data weakly; the continuation lives inside
        // that data and must not keep it alive.
        std::weak_ptr<shared_t> parent_state = _shared;
        context job_context = con;

        _shared->_continuation = [func, parent_state, job_context]() mutable
            {
                auto parent = parent_state.lock();
                if (!parent)
                {
                    return;
                }

                // the job owns its own copy of the input value
                auto input = parent->_obj;
                auto job_body = [func, input]() mutable
                    {
                        func(input);
                        return true;
                    };

                detail::pool_dispatch(job_body, job_context);
            };
    }

    // the result may already be in; if so, run the continuation right away
    if (available())
    {
        fire_continuation();
    }
}
//! Total number of pending jobs across all schedulers
inline int metrics::total_pending() const
{
std::lock_guard<std::mutex> lock(instance()._pools_mutex);
int count = 0;
for (auto pool : _pools)
count += pool->pending;
return count;
}
//! Total number of running jobs across all schedulers
inline int metrics::total_running() const
{
std::lock_guard<std::mutex> lock(instance()._pools_mutex);
int count = 0;
for (auto pool : _pools)
count += pool->running;
return count;
}
//! Total number of running jobs across all schedulers
inline int metrics::total_postprocessing() const
{
std::lock_guard<std::mutex> lock(instance()._pools_mutex);
int count = 0;
for (auto pool : _pools)
count += pool->postprocessing;
return count;
}
//! Total number of canceled jobs across all schedulers
inline int metrics::total_canceled() const
{
std::lock_guard<std::mutex> lock(instance()._pools_mutex);
int count = 0;
for (auto pool : _pools)
count += pool->canceled;
return count;
}
//! Total number of active jobs in the system
inline int metrics::total() const
{
std::lock_guard<std::mutex> lock(instance()._pools_mutex);
int count = 0;
for (auto pool : _pools)
count += pool->pending + pool->running + pool->postprocessing;
return count;
}
// Use this macro ONCE in your application in a .cpp file to
// instantiate the weejobs runtime singleton. It expands to the runtime
// object itself plus the definition of instance(), which this header
// only declares.
#define WEEJOBS_INSTANCE \
namespace WEEJOBS_NAMESPACE { \
static detail::runtime runtime_singleton_instance; \
detail::runtime& instance() { return runtime_singleton_instance; } \
}
}