Chris@102: // Copyright (C) 2014 Vicente J. Botet Escriba Chris@102: // Chris@102: // Distributed under the Boost Software License, Version 1.0. (See accompanying Chris@102: // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) Chris@102: // Chris@102: Chris@102: #ifndef BOOST_THREAD_EXECUTORS_SCHEDULER_HPP Chris@102: #define BOOST_THREAD_EXECUTORS_SCHEDULER_HPP Chris@102: Chris@102: #include Chris@102: #include Chris@102: Chris@102: #include Chris@102: #include Chris@102: #include Chris@102: Chris@102: #include Chris@102: Chris@102: namespace boost Chris@102: { Chris@102: namespace executors Chris@102: { Chris@102: /// Wraps the reference to an executor and a function to make a work that submit the function using the executor. Chris@102: template Chris@102: class resubmitter Chris@102: { Chris@102: public: Chris@102: resubmitter(Executor& ex, Function funct) : Chris@102: ex(ex), Chris@102: funct(boost::move(funct)) Chris@102: {} Chris@102: Chris@102: void operator()() Chris@102: { Chris@102: ex.submit(funct); Chris@102: } Chris@102: Chris@102: private: Chris@102: Executor& ex; Chris@102: Function funct; Chris@102: }; Chris@102: Chris@102: /// resubmitter factory Chris@102: template Chris@102: resubmitter::type> Chris@102: resubmit(Executor& ex, BOOST_THREAD_FWD_REF(Function) funct) { Chris@102: return resubmitter::type >(ex, boost::move(funct)); Chris@102: } Chris@102: Chris@102: /// Wraps references to a @c Scheduler and an @c Executor providing an @c Executor that Chris@102: /// resubmit the function using the referenced Executor at a given @c time_point known at construction. Chris@102: template Chris@102: class resubmit_at_executor Chris@102: { Chris@102: public: Chris@102: typedef typename Scheduler::clock clock; Chris@102: typedef typename Scheduler::work work; Chris@102: Chris@102: template Chris@102: resubmit_at_executor(Scheduler& sch, Executor& ex, chrono::time_point const& tp) : Chris@102: sch(sch), Chris@102: ex(ex), Chris@102: tp(tp), Chris@102: is_closed(false) Chris@102: { Chris@102: } Chris@102: Chris@102: ~resubmit_at_executor() Chris@102: { Chris@102: close(); Chris@102: } Chris@102: Chris@102: template Chris@102: void submit(BOOST_THREAD_FWD_REF(Work) w) Chris@102: { Chris@102: if (closed()) Chris@102: { Chris@102: BOOST_THROW_EXCEPTION( sync_queue_is_closed() ); Chris@102: } Chris@102: sch.submit_at(resubmit(ex,boost::forward(w)), tp); Chris@102: } Chris@102: Chris@102: Executor& underlying_executor() Chris@102: { Chris@102: return ex; Chris@102: } Chris@102: Scheduler& underlying_scheduler() Chris@102: { Chris@102: return sch; Chris@102: } Chris@102: Chris@102: void close() Chris@102: { Chris@102: is_closed = true; Chris@102: } Chris@102: Chris@102: bool closed() Chris@102: { Chris@102: return is_closed || sch.closed() || ex.closed(); Chris@102: } Chris@102: Chris@102: private: Chris@102: Scheduler& sch; Chris@102: Executor& ex; Chris@102: typename clock::time_point tp; Chris@102: bool is_closed; Chris@102: }; Chris@102: Chris@102: Chris@102: /// Expression template helper storing a pair of references to an @c Scheduler and an @c Executor Chris@102: /// It provides factory helper functions such as at/after that convert these a pair of @c Scheduler @c Executor Chris@102: /// into an new @c Executor that submit the work using the referenced @c Executor at/after a specific time/duration Chris@102: /// respectively, using the referenced @Scheduler. Chris@102: template Chris@102: class scheduler_executor_wrapper Chris@102: { Chris@102: public: Chris@102: typedef typename Scheduler::clock clock; Chris@102: typedef typename Scheduler::work work; Chris@102: typedef resubmit_at_executor the_executor; Chris@102: Chris@102: scheduler_executor_wrapper(Scheduler& sch, Executor& ex) : Chris@102: sch(sch), Chris@102: ex(ex) Chris@102: {} Chris@102: Chris@102: ~scheduler_executor_wrapper() Chris@102: { Chris@102: } Chris@102: Chris@102: Executor& underlying_executor() Chris@102: { Chris@102: return ex; Chris@102: } Chris@102: Scheduler& underlying_scheduler() Chris@102: { Chris@102: return sch; Chris@102: } Chris@102: Chris@102: template Chris@102: the_executor after(chrono::duration const& rel_time) Chris@102: { Chris@102: return at(clock::now() + rel_time ); Chris@102: } Chris@102: Chris@102: template Chris@102: the_executor at(chrono::time_point const& abs_time) Chris@102: { Chris@102: return the_executor(sch, ex, abs_time); Chris@102: } Chris@102: Chris@102: private: Chris@102: Scheduler& sch; Chris@102: Executor& ex; Chris@102: }; //end class Chris@102: Chris@102: /// Wraps a reference to a @c Scheduler providing an @c Executor that Chris@102: /// run the function at a given @c time_point known at construction. Chris@102: template Chris@102: class at_executor Chris@102: { Chris@102: public: Chris@102: typedef typename Scheduler::clock clock; Chris@102: typedef typename Scheduler::work work; Chris@102: typedef typename clock::time_point time_point; Chris@102: Chris@102: template Chris@102: at_executor(Scheduler& sch, chrono::time_point const& tp) : Chris@102: sch(sch), Chris@102: tp(tp), Chris@102: is_closed(false) Chris@102: {} Chris@102: Chris@102: ~at_executor() Chris@102: { Chris@102: close(); Chris@102: } Chris@102: Chris@102: Scheduler& underlying_scheduler() Chris@102: { Chris@102: return sch; Chris@102: } Chris@102: Chris@102: void close() Chris@102: { Chris@102: is_closed = true; Chris@102: } Chris@102: Chris@102: bool closed() Chris@102: { Chris@102: return is_closed || sch.closed(); Chris@102: } Chris@102: Chris@102: template Chris@102: void submit(BOOST_THREAD_FWD_REF(Work) w) Chris@102: { Chris@102: if (closed()) Chris@102: { Chris@102: BOOST_THROW_EXCEPTION( sync_queue_is_closed() ); Chris@102: } Chris@102: sch.submit_at(boost::forward(w), tp); Chris@102: } Chris@102: Chris@102: template Chris@102: resubmit_at_executor on(Executor& ex) Chris@102: { Chris@102: return resubmit_at_executor(sch, ex, tp); Chris@102: } Chris@102: Chris@102: private: Chris@102: Scheduler& sch; Chris@102: time_point tp; Chris@102: bool is_closed; Chris@102: }; //end class Chris@102: Chris@102: /// A @c Scheduler using a specific thread. Note that a Scheduler is not an Executor. Chris@102: /// It provides factory helper functions such as at/after that convert a @c Scheduler into an @c Executor Chris@102: /// that submit the work at/after a specific time/duration respectively. Chris@102: template Chris@102: class scheduler : public detail::scheduled_executor_base Chris@102: { Chris@102: public: Chris@102: typedef typename detail::scheduled_executor_base::work work; Chris@102: Chris@102: typedef Clock clock; Chris@102: Chris@102: scheduler() Chris@102: : super(), Chris@102: thr(&super::loop, this) {} Chris@102: Chris@102: ~scheduler() Chris@102: { Chris@102: this->close(); Chris@102: thr.join(); Chris@102: } Chris@102: template Chris@102: scheduler_executor_wrapper on(Ex& ex) Chris@102: { Chris@102: return scheduler_executor_wrapper(*this, ex); Chris@102: } Chris@102: Chris@102: template Chris@102: at_executor after(chrono::duration const& rel_time) Chris@102: { Chris@102: return at(rel_time + clock::now()); Chris@102: } Chris@102: Chris@102: template Chris@102: at_executor at(chrono::time_point const& tp) Chris@102: { Chris@102: return at_executor(*this, tp); Chris@102: } Chris@102: Chris@102: private: Chris@102: typedef detail::scheduled_executor_base super; Chris@102: thread thr; Chris@102: }; Chris@102: Chris@102: Chris@102: } Chris@102: using executors::resubmitter; Chris@102: using executors::resubmit; Chris@102: using executors::resubmit_at_executor; Chris@102: using executors::scheduler_executor_wrapper; Chris@102: using executors::at_executor; Chris@102: using executors::scheduler; Chris@102: } Chris@102: Chris@102: #include Chris@102: Chris@102: #endif