38 # include <dsn/tool-api/task.h> 39 # include <dsn/tool-api/perf_counter.h> 40 # include <dsn/utility/dlib.h> 45 class task_worker_pool;
46 class admission_controller;
58 template <
typename T>
static task_queue* create(task_worker_pool* pool,
int index,
task_queue* inner_provider)
60 return new T(pool, index, inner_provider);
// Virtual destructor, tagged with the DSN_API macro. Only the declaration is
// present here; the definition is not part of this listing.
67 DSN_API
virtual ~task_queue();
// Pure virtual hook that receives a task pointer and returns nothing.
// NOTE(review): presumably adds the task to this queue; the actual semantics
// are defined by the implementing classes, which are not visible here.
69 virtual void enqueue(
task*
task) = 0;
// Pure virtual hook that returns a task pointer.
// batch_size is taken by non-const reference, so an implementation may read
// and/or update it.
// NOTE(review): the exact in/out contract of batch_size is not visible here - confirm.
74 virtual task* dequeue(
int& batch_size) = 0;
76 int count()
const {
return _queue_length.load(std::memory_order_relaxed); }
77 int decrease_count(
int count = 1) { _queue_length_counter->add((uint64_t)(-count));
return _queue_length.fetch_sub(count, std::memory_order_relaxed) - count;}
78 int increase_count(
int count = 1) { _queue_length_counter->add(count);
return _queue_length.fetch_add(count, std::memory_order_relaxed) + count;}
79 const std::string & get_name() {
return _name; }
80 task_worker_pool* pool()
const {
return _pool; }
81 bool is_shared()
const {
return _worker_count > 1; }
82 int worker_count()
const {
return _worker_count; }
83 task_worker* owner_worker()
const {
return _owner_worker; }
84 int index()
const {
return _index; }
85 volatile int* get_virtual_length_ptr() {
return &_virtual_queue_length; }
87 admission_controller* controller()
const {
return _controller; }
88 void set_controller(admission_controller* controller) { _controller = controller; }
// Friend declaration: task_worker_pool may access this class's non-public members.
91 friend class task_worker_pool;
92 void set_owner_worker(
task_worker* worker) { _owner_worker = worker; }
// Declaration only; the definition is not part of this listing.
// NOTE(review): presumably the internal entry point that ends up calling
// enqueue() - confirm against the implementation file.
93 void enqueue_internal(task* task);
// Worker pool pointer; returned by pool().
96 task_worker_pool* _pool;
// Admission controller pointer; read by controller(), written by set_controller().
100 admission_controller* _controller;
// Atomic queue length; read by count(), adjusted by increase_count()/decrease_count().
102 std::atomic<int> _queue_length;
// Perf counter updated together with _queue_length in increase_count()/decrease_count().
103 mutable perf_counter_ptr _queue_length_counter;
// NOTE(review): not referenced by the code visible here; presumably the spec
// of the owning thread pool - confirm.
104 threadpool_spec* _spec;
// Plain volatile int whose address is exposed by get_virtual_length_ptr().
105 volatile int _virtual_queue_length;
A task queue batches the input queue for the bound task worker(s) (threads).
Definition: task_queue.h:55
A task worker processes the input tasks from the bound task queue.
Definition: task_worker.h:54