/** * weejobs * Copyright 2024 Pelican Mapping * https://github.com/pelicanmapping/weejobs * MIT License */ #pragma once #include #include #include #include #include #include #include #include #include #include #include #include #include // OPTIONAL: Define WEEJOBS_EXPORT if you want to use this library from multiple modules (DLLs) #ifndef WEEJOBS_EXPORT #define WEEJOBS_EXPORT #endif // OPTIONAL: Customize the namespace by defining WEEJOBS_NAMESPACE before including this file. #ifndef WEEJOBS_NAMESPACE #define WEEJOBS_NAMESPACE jobs #endif // Version #define WEEJOBS_VERSION_MAJOR 1 #define WEEJOBS_VERSION_MINOR 0 #define WEEJOBS_VERSION_REV 1 #define WEEJOBS_STR_NX(s) #s #define WEEJOBS_STR(s) WEEJOBS_STR_NX(s) #define WEEJOBS_COMPUTE_VERSION(major, minor, patch) ((major) * 10000 + (minor) * 100 + (patch)) #define WEEJOBS_VERSION_NUMBER WEEJOBS_COMPUTE_VERSION(WEEJOBS_VERSION_MAJOR, WEEJOBS_VERSION_MINOR, WEEJOBS_VERSION_REV) #define WEEJOBS_VERSION_STRING WEEJOBS_STR(WEEJOBS_VERSION_MAJOR) "." WEEJOBS_STR(WEEJOBS_VERSION_MINOR) "." WEEJOBS_STR(WEEJOBS_VERSION_REV) #if __cplusplus >= 201703L || _MSVC_LANG >= 201703L #define WEEJOBS_NO_DISCARD [[nodiscard]] #else #define WEEJOBS_NO_DISCARD #endif /** * weejobs is an API for scheduling a task to run in the background. * Please read the README.md file for more information. */ namespace WEEJOBS_NAMESPACE { /** * Interface for something that can be canceled */ class cancelable { public: virtual bool canceled() const { return false; } }; namespace detail { /** * Event with a binary signaled state, for multi-threaded sychronization. * * The event has two states: * "set" means that a call to wait() will not block; * "unset" means that calls to wait() will block until another thread calls set(). * * The event starts out unset. * * Typical usage: Thread A creates Thread B to run asynchronous code. Thread A * then calls wait(), which blocks Thread A. When Thread B is finished, it calls * set(). Thread A then wakes up and continues execution. * * NOTE: ALL waiting threads will wake up when the Event is cleared. */ struct event { public: //! Construct a new event event() : _set(false) { } //! DTOR ~event() { _set = false; for (int i = 0; i < 255; ++i) // workaround buggy broadcast _cond.notify_all(); } //! Block until the event is set, then return true. inline bool wait() { while (!_set) { std::unique_lock lock(_m); if (!_set) _cond.wait(lock); } return _set; } //! Block until the event is set or the timout expires. //! Return true if the event has set, otherwise false. template inline bool wait(T timeout) { if (!_set) { std::unique_lock lock(_m); if (!_set) _cond.wait_for(lock, timeout); } return _set; } //! Block until the event is set; then reset it. inline bool waitAndReset() { std::unique_lock lock(_m); if (!_set) _cond.wait(lock); _set = false; return true; } //! Set the event state, causing any waiters to unblock. inline void set() { if (!_set) { std::unique_lock lock(_m); if (!_set) { _set = true; _cond.notify_all(); } } } //! Reset (unset) the event state; new waiters will block until set() is called. inline void reset() { std::unique_lock lock(_m); _set = false; } //! Whether the event state is set (waiters will not block). inline bool isSet() const { return _set; } protected: bool _set; std::condition_variable_any _cond; std::mutex _m; // do not use Mutex, we never want tracking }; /** * Sempahore lets N users aquire it and then notifies when the * count goes back down to zero. */ class semaphore { public: //! Acquire, increasing the usage count by one void acquire() { std::unique_lock lock(_m); ++_count; } //! Release, decreasing the usage count by one. //! When the count reaches zero, joiners will be notified and //! the semaphore will reset to its initial state. void release() { std::unique_lock lock(_m); _count = std::max(_count - 1, 0); if (_count == 0) _cv.notify_all(); } //! Reset to initialize state; this will cause a join to occur //! even if no acquisitions have taken place. void reset() { std::unique_lock lock(_m); _count = 0; _cv.notify_all(); } //! Current count in the semaphore std::size_t count() const { std::unique_lock lock(_m); return _count; } //! Block until the semaphore count returns to zero. //! (It must first have left zero) //! Warning: this method will block forever if the count //! never reaches zero! void join() { std::unique_lock lock(_m); while (_count > 0) _cv.wait(lock); } //! Block until the semaphore count returns to zero, or //! the operation is canceled. //! (It must first have left zero) void join(cancelable* c) { _cv.wait_for(_m, std::chrono::seconds(1), [this, c]() { return (_count == 0) || (c && c->canceled()); } ); _count = 0; } private: int _count = 0; std::condition_variable_any _cv; mutable std::mutex _m; }; #if __cplusplus >= 201703L || _MSVC_LANG >= 201703L template using result_of_t = typename std::invoke_result::type; #else template using result_of_t = typename std::result_of::type; #endif } /** * Include a jobgroup in a context to group together multiple jobs. * You can then call jobgroup::join() to wait for the whole group * to finish. */ struct jobgroup : public detail::semaphore { static std::shared_ptr create() { return std::make_shared(); } }; /** * Context object you can pass to dispatch(...) to control aspects of * how the background task is run. */ struct context { std::string name; // readable name of the job class jobpool* pool = nullptr; // job pool to run in std::function priority = {}; // priority of the job std::shared_ptr group = nullptr; // join group for this job bool can_cancel = true; // if true, the job will cancel if its future goes out of scope }; /** * Future holds the future result of an asynchronous operation. * * Usage: * Producer (usually an asynchronous function call) creates a future * (the promise of a future result) and immediately returns it. The Consumer * then performs other work, and eventually (or immediately) checks available() * for a result or canceled() for cancelation. If availabile() is true, * Consumer calls value() to fetch the valid result. * * As long as at least two equivalent Future object (i.e. Futures pointing to the * same internal shared data) exist, the Future is considered valid. Once * that count goes to one, the Future is either available (the value is ready) * or empty (i.e., canceled or abandoned). */ template class future : public cancelable { private: // internal structure to track references to the result // One instance of this is shared among all Future instances // created from the copy constructor. struct shared_t { T _obj; mutable detail::event _ev; std::mutex _continuation_mutex; std::function _continuation; std::atomic_bool _continuation_ran = { false }; }; public: //! Default constructor future() { _shared = std::make_shared(); } //! Default copy constructor future(const future& rhs) = default; //! True is this Future is unused and not connected to any other Future bool empty() const { return !available() && _shared.use_count() == 1; } //! True if the promise was resolved and a result if available. bool available() const { return _shared->_ev.isSet(); } //! True if a promise exists, but has not yet been resolved; //! Presumably the asynchronous task is still working. bool working() const { return !empty() && !available(); } // cancelable interface bool canceled() const override { return empty(); } //! Deference the result object. Make sure you check available() //! to check that the future was actually resolved; otherwise you //! will just get the default object. const T& value() const { return _shared->_obj; } //! Dereference this object to const pointer to the result. const T* operator -> () const { return &_shared->_obj; } //! Result is available AND equal to the argument. bool has_value(const T& arg) const { return available() && value() == arg; } //! Same as value(), but if the result is available will reset the //! future before returning the result object. T release() { bool avail = available(); T result = value(); if (avail) reset(); return result; } //! Blocks until the result becomes available or the future is abandoned; //! then returns the result object. const T& join() const { while ( !empty() && !_shared->_ev.wait(std::chrono::milliseconds(1))); return value(); } //! Blocks until the result becomes available or the future is abandoned //! or a cancelation flag is set; then returns the result object. Be sure to //! check canceled() after calling join() to see if the return value is valid. const T& join(const cancelable* p) const { while (working() && (p == nullptr || !p->canceled())) { _shared->_ev.wait(std::chrono::milliseconds(1)); } return value(); } //! Blocks until the result becomes available or the future is abandoned //! or a cancelation flag is set; then returns the result object. Be sure to //! check canceled() after calling join() to see if the return value is valid. const T& join(const cancelable& p) const { return join(&p); } //! Release reference to a promise, resetting this future to its default state void abandon() { _shared.reset(new shared_t()); } //! synonym for abandon. void reset() { abandon(); } //! Resolve (fulfill) the promise with the provided result value. void resolve(const T& value) { _shared->_obj = value; _shared->_ev.set(); fire_continuation(); } //! Resolve (fulfill) the promise with an rvalue void resolve(T&& value) { _shared->_obj = std::move(value); _shared->_ev.set(); fire_continuation(); } //! Resolve (fulfill) the promise with a default result void resolve() { _shared->_ev.set(); fire_continuation(); } //! The number of objects, including this one, that //! reference the shared container. If this method //! returns 1, that means this is the only object with //! access to the data. This method will never return zero. unsigned refs() const { return _shared.use_count(); } //! Add a continuation to this future. The continuation will be dispatched //! when this object's result becomes available; that result will be the input //! value to the continuation function. The continuation function in turn must //! return a value (cannot be void). template> WEEJOBS_NO_DISCARD inline future then_dispatch(F func, const context& con = {}); //! Add a continuation to this future. Instead of the functor returning a value, //! it will instead have the option of resolving the incoming future/promise object. //! This is useful for operations that have their own way of running asynchronous code. //! Note: for some reason when you use this variant you must specific the template //! argument, e.g. result.then(auto i, promise p) template WEEJOBS_NO_DISCARD inline future then_dispatch(std::function&)> func, const context& con = {}); //! Add a continuation to this future. The functor only takes an input value and has no //! return value (fire and forget). inline void then_dispatch(std::function func, const context& con = {}); private: std::shared_ptr _shared; void fire_continuation() { std::lock_guard lock(_shared->_continuation_mutex); if (_shared->_continuation && !_shared->_continuation_ran.exchange(true)) _shared->_continuation(); // Zero out the continuation function immediately after running it. // This is important because the continuation might hold a reference to a promise // that might hamper cancelation. _shared->_continuation = nullptr; } }; //! in the "promise/future" pattern, we use the same object for both, //! but here's an alias for clarity. template using promise = future; namespace detail { struct job { context ctx; std::function _delegate; bool operator < (const job& rhs) const { float lp = ctx.priority ? ctx.priority() : -FLT_MAX; float rp = rhs.ctx.priority ? rhs.ctx.priority() : -FLT_MAX; return lp < rp; } }; inline bool steal_job(class jobpool* thief, detail::job& stolen); } /** * A priority-sorted collection of jobs that are running or waiting * to run in a thread pool. */ class jobpool { public: /** * Metrics of a thread pool. */ struct metrics_t { std::string name; std::atomic_uint concurrency = { 0u }; std::atomic_uint pending = { 0u }; std::atomic_uint running = { 0u }; std::atomic_uint postprocessing = { 0u }; std::atomic_uint canceled = { 0u }; std::atomic_uint total = { 0u }; }; public: //! Destroy ~jobpool() { stop_threads(); } //! Name of this job pool const std::string& name() const { return _metrics.name; } metrics_t* metrics() { return &_metrics; } //! Set the concurrency of this job scheduler void set_concurrency(unsigned value) { value = std::max(value, 1u); if (_target_concurrency != value) { _target_concurrency = value; start_threads(); } } //! Get the target concurrency (thread count) unsigned concurrency() const { return _target_concurrency; } //! Whether this job pool is allowed to steal work from other job pools //! when it is idle. Default = true. void set_can_steal_work(bool value) { _can_steal_work = value; } //! Discard all queued jobs void cancel_all() { std::lock_guard lock(_queue_mutex); _queue.clear(); _queue_size = 0; _metrics.canceled += _metrics.pending; _metrics.pending = 0; } //! Schedule an asynchronous task on this scheduler //! Use job::dispatch to run jobs (usually no need to call this directly) //! @param delegate Function to execute //! @param context Job details void _dispatch_delegate(std::function& delegate, const context& context) { if (!_done) { // If we have a group semaphore, acquire it BEFORE queuing the job if (context.group) { context.group->acquire(); } if (_target_concurrency > 0) { std::lock_guard lock(_queue_mutex); _queue.emplace_back(detail::job{ context, delegate }); _queue_size++; _metrics.pending++; _metrics.total++; _block.notify_one(); } else { // no threads? run synchronously. delegate(); if (context.group) { context.group->release(); } } } } //! removes the highest priority job from the queue and places it //! in output. Returns true if a job was taken, false if the queue //! was empty. inline bool _take_job(detail::job& output, bool lock) { if (lock) { std::lock_guard lock(_queue_mutex); return _take_job(output, false); } else if (!_done && _queue_size > 0) { auto ptr = _queue.end(); float highest_priority = -FLT_MAX; for (auto iter = _queue.begin(); iter != _queue.end(); ++iter) { float priority = iter->ctx.priority != nullptr ? iter->ctx.priority() : 0.0f; if (ptr == _queue.end() || priority > highest_priority) { ptr = iter; highest_priority = priority; } } if (ptr == _queue.end()) ptr = _queue.begin(); output = std::move(*ptr); _queue.erase(ptr); _queue_size--; _metrics.pending--; return true; } return false; } //! Construct a new job pool. //! Do not call this directly - call getPool(name) instead. jobpool(const std::string& name, unsigned concurrency) : _target_concurrency(concurrency) { _metrics.name = name; _metrics.concurrency = 0; } //! Pulls queued jobs and runs them in whatever thread run() is called from. //! Runs in a loop until _done is set. inline void run(); //! Spawn all threads in this scheduler inline void start_threads(); //! Signall all threads to stop inline void stop_threads(); //! Wait for all threads to exit (after calling stop_threads) inline void join_threads(); bool _can_steal_work = true; std::list _queue; std::atomic_int _queue_size = { 0 }; // don't use list::size(), it's slow and not atomic mutable std::mutex _queue_mutex; // protect access to the queue mutable std::mutex _quit_mutex; // protects access to _done std::atomic _target_concurrency; // target number of concurrent threads in the pool std::condition_variable_any _block; // thread waiter block bool _done = false; // set to true when threads should exit std::vector _threads; // threads in the pool metrics_t _metrics; // metrics for this pool }; class metrics { public: //! Total number of pending jobs across all schedulers int total_pending() const; //! Total number of running jobs across all schedulers int total_running() const; //! Total number of running jobs across all schedulers int total_postprocessing() const; //! Total number of canceled jobs across all schedulers int total_canceled() const; //! Total number of active jobs in the system int total() const; //! Gets a vector of all jobpool metrics structures. inline const std::vector all() { return _pools; } std::vector _pools; }; /** * Runtime singleton object; * Declare with WEEJOBS_INSTANCE in one of your .cpp files. */ namespace detail { struct runtime { inline runtime(); inline ~runtime(); inline void shutdown(); bool _alive = true; bool _stealing_allowed = false; std::mutex _pools_mutex; std::vector _pools; metrics _metrics; std::function _set_thread_name; }; } //! Access to the runtime singleton - users need not call this extern WEEJOBS_EXPORT detail::runtime& instance(); //! Returns the job pool with the given name, creating a new one if it doesn't //! already exist. If you don't specify a name, a default pool is used. inline jobpool* get_pool(const std::string& name = {}) { std::lock_guard lock(instance()._pools_mutex); for (auto pool : instance()._pools) { if (pool->name() == name) return pool; } auto new_pool = new jobpool(name, 2u); instance()._pools.push_back(new_pool); instance()._metrics._pools.push_back(&new_pool->_metrics); new_pool->start_threads(); return new_pool; } namespace detail { // dispatches a function to the appropriate job pool. inline void pool_dispatch(std::function delegate, const context& context) { auto pool = context.pool ? context.pool : get_pool({}); if (pool) { pool->_dispatch_delegate(delegate, context); // if work stealing is enabled, wake up all pools if (instance()._stealing_allowed) { std::lock_guard lock(instance()._pools_mutex); for (auto pool : instance()._pools) { pool->_block.notify_all(); } } } } } //! Dispatches a job with no return value. Fire and forget. //! @param task Function to run in a thread. Prototype is void(void). //! @param context Optional configuration for the asynchronous function call inline void dispatch(std::function task, const context& context = {}) { auto delegate = [task]() mutable -> bool { task(); return true; }; detail::pool_dispatch(delegate, context); } //! Dispatches a job and immediately returns a future result. //! @param task Function to run in a thread. Prototype is T(cancelable&) //! @param context Optional configuration for the asynchronous function call //! @return Future result of the async function call template> WEEJOBS_NO_DISCARD inline future dispatch(F task, const context& context = {}) { future promise; bool can_cancel = context.can_cancel; std::function delegate = [task, promise, can_cancel]() mutable { bool good = true; if (can_cancel) { good = !promise.canceled(); if (good) promise.resolve(task(promise)); } else { cancelable dummy; promise.resolve(task(dummy)); } return good; }; detail::pool_dispatch(delegate, context); return promise; } //! Dispatches a job and immediately returns a future result. //! @param task Function to run in a thread. Prototype is T(cancelable&) //! @param promise Optional user-supplied promise object //! @param context Optional configuration for the asynchronous function call //! @return Future result of the async function call template> WEEJOBS_NO_DISCARD inline future dispatch(F task, future promise, const context& context = {}) { bool can_cancel = context.can_cancel; std::function delegate = [task, promise, can_cancel]() mutable { bool run = !can_cancel || !promise.canceled(); if (run) { task(promise); } return run; }; detail::pool_dispatch(delegate, context); return promise; } //! Metrics for all job pool inline metrics* get_metrics() { return &instance()._metrics; } //! stop all threads, wait for them to exit, and shut down the system inline void shutdown() { instance().shutdown(); } //! Whether the weejobs runtime is still alive (has not been shutdown) inline bool alive() { return instance()._alive; } //! Install a function that the SDK can use to set job pool thread names //! when it spawns them. inline void set_thread_name_function(std::function f) { instance()._set_thread_name = f; } //! Whether to allow jobpools to steal work from other jobpools when they are idle. inline void set_allow_work_stealing(bool value) { instance()._stealing_allowed = value; } inline detail::runtime::runtime() { //nop } inline detail::runtime::~runtime() { shutdown(); } inline void detail::runtime::shutdown() { _alive = false; //std::cout << "stopping " << _pools.size() << " threads..." << std::endl; for (auto& pool : _pools) if (pool) pool->stop_threads(); //std::cout << "joining " << _pools.size() << " threads..." << std::endl; for (auto& pool : _pools) if (pool) pool->join_threads(); } inline void jobpool::run() { while (!_done) { detail::job next; bool have_next = false; { if (_can_steal_work && instance()._stealing_allowed) { { std::unique_lock lock(_queue_mutex); // work-stealing enabled: wait until any queue is non-empty _block.wait(lock, [this]() { return get_metrics()->total_pending() > 0 || _done; }); if (!_done && _queue_size > 0) { have_next = _take_job(next, false); } } if (!_done && !have_next) { have_next = detail::steal_job(this, next); } } else { std::unique_lock lock(_queue_mutex); // wait until just our local queue is non-empty _block.wait(lock, [this] { return (_queue_size > 0) || _done; }); if (!_done && _queue_size > 0) { have_next = _take_job(next, false); } } } if (have_next) { _metrics.running++; auto t0 = std::chrono::steady_clock::now(); bool job_executed = next._delegate(); auto duration = std::chrono::steady_clock::now() - t0; if (job_executed == false) { _metrics.canceled++; } // release the group semaphore if necessary if (next.ctx.group != nullptr) { next.ctx.group->release(); } _metrics.running--; } // See if we no longer need this thread because the // target concurrency has been reduced std::lock_guard lock(_quit_mutex); if (_target_concurrency < _metrics.concurrency) { _metrics.concurrency--; break; } } } inline void jobpool::start_threads() { _done = false; // Not enough? Start up more while (_metrics.concurrency < _target_concurrency) { _metrics.concurrency++; _threads.push_back(std::thread([this] { if (instance()._set_thread_name) { instance()._set_thread_name(_metrics.name.c_str()); } run(); } )); } } inline void jobpool::stop_threads() { _done = true; // Clear out the queue std::lock_guard lock(_queue_mutex); // reset any group semaphores so that JobGroup.join() // will not deadlock. for (auto& queuedjob : _queue) { if (queuedjob.ctx.group != nullptr) { queuedjob.ctx.group->release(); } } _queue.clear(); _queue_size = 0; // wake up all threads so they can exit _block.notify_all(); } //! Wait for all threads to exit (after calling stop_threads) inline void jobpool::join_threads() { // wait for them to exit for (unsigned i = 0; i < _threads.size(); ++i) { if (_threads[i].joinable()) { _threads[i].join(); } } _threads.clear(); } // steal a job from another jobpool's queue (other than "thief"). inline bool detail::steal_job(jobpool* thief, detail::job& stolen) { jobpool* pool_with_most_jobs = nullptr; { std::lock_guard lock(instance()._pools_mutex); std::size_t max_num_jobs = 0u; for (auto pool : instance()._pools) { if (pool != thief) { if (static_cast(pool->_queue_size) > max_num_jobs) { max_num_jobs = pool->_queue_size; pool_with_most_jobs = pool; } } } } if (pool_with_most_jobs) { return pool_with_most_jobs->_take_job(stolen, true); } return false; } template template inline future future::then_dispatch(F func, const context& con) { // The future result of F. // In this case, the continuation task will return a value that the system will use to resolve the promise. future continuation_promise; // lock the continuation and set it: { std::lock_guard lock(_shared->_continuation_mutex); if (_shared->_continuation) { return {}; // only one continuation allowed } // take a weak ptr to this future's shared data. If this future goes away we'll still // have access to its result. std::weak_ptr weak_shared = _shared; context copy_of_con = con; _shared->_continuation = [func, copy_of_con, weak_shared, continuation_promise]() mutable { auto shared = weak_shared.lock(); // verify the parent's result is actually available (simulate available()) if (shared && shared->_ev.isSet()) { // copy it and dispatch it as the input to a new job: T copy_of_value = shared->_obj; // Once this wrapper gets created, note that we now have 2 refereces to the continuation_promise. // To prevent this from hampering cancelation, the continuation fuction is set to nullptr // immediately after being called. auto wrapper = [func, copy_of_value, continuation_promise]() mutable { continuation_promise.resolve(func(copy_of_value, continuation_promise)); }; jobs::dispatch(wrapper, copy_of_con); } }; } // maybe the future is already available? if (available()) { fire_continuation(); } return continuation_promise; } template template inline future future::then_dispatch(std::function&)> func, const context& con) { // The future we will return to the caller. // Note, the user function "func" is responsible for resolving this promise. future continuation_promise; // lock the continuation and set it: { std::lock_guard lock(_shared->_continuation_mutex); if (_shared->_continuation) { return {}; // only one continuation allowed } // take a weak ptr to this future's shared data. If this future goes away we'll still // have access to its result. std::weak_ptr weak_shared = _shared; // The user task is responsible for resolving the promise. // This continuation executes the user function directly instead of dispatching it // to the job pool. This is because we expect the user function to use some external // asynchronous mechanism to resolve the promise. _shared->_continuation = [func, weak_shared, continuation_promise]() mutable { auto shared = weak_shared.lock(); if (shared) { func(shared->_obj, continuation_promise); } }; } if (available()) { fire_continuation(); } return continuation_promise; } template inline void future::then_dispatch(std::function func, const context& con) { // lock the continuation and set it: { std::lock_guard lock(_shared->_continuation_mutex); if (_shared->_continuation) { return; // only one continuation allowed } // take a weak ptr to this future's shared data. If this future goes away we'll still // have access to its result. std::weak_ptr weak_shared = _shared; auto copy_of_con = con; _shared->_continuation = [func, weak_shared, copy_of_con]() mutable { auto shared = weak_shared.lock(); if (shared) { auto copy_of_value = shared->_obj; auto fire_and_forget_delegate = [func, copy_of_value]() mutable { func(copy_of_value); return true; }; detail::pool_dispatch(fire_and_forget_delegate, copy_of_con); } }; } if (available()) { fire_continuation(); } } //! Total number of pending jobs across all schedulers inline int metrics::total_pending() const { std::lock_guard lock(instance()._pools_mutex); int count = 0; for (auto pool : _pools) count += pool->pending; return count; } //! Total number of running jobs across all schedulers inline int metrics::total_running() const { std::lock_guard lock(instance()._pools_mutex); int count = 0; for (auto pool : _pools) count += pool->running; return count; } //! Total number of running jobs across all schedulers inline int metrics::total_postprocessing() const { std::lock_guard lock(instance()._pools_mutex); int count = 0; for (auto pool : _pools) count += pool->postprocessing; return count; } //! Total number of canceled jobs across all schedulers inline int metrics::total_canceled() const { std::lock_guard lock(instance()._pools_mutex); int count = 0; for (auto pool : _pools) count += pool->canceled; return count; } //! Total number of active jobs in the system inline int metrics::total() const { std::lock_guard lock(instance()._pools_mutex); int count = 0; for (auto pool : _pools) count += pool->pending + pool->running + pool->postprocessing; return count; } // Use this macro ONCE in your application in a .cpp file to // instaniate the weejobs runtime singleton. #define WEEJOBS_INSTANCE \ namespace WEEJOBS_NAMESPACE { \ static detail::runtime runtime_singleton_instance; \ detail::runtime& instance() { return runtime_singleton_instance; } \ } }