Robust Distributed System Nucleus (rDSN)  ver 1.0.0
task_queue.h
1 /*
2  * The MIT License (MIT)
3  *
4  * Copyright (c) 2015 Microsoft Corporation
5  *
6  * -=- Robust Distributed System Nucleus (rDSN) -=-
7  *
8  * Permission is hereby granted, free of charge, to any person obtaining a copy
9  * of this software and associated documentation files (the "Software"), to deal
10  * in the Software without restriction, including without limitation the rights
11  * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
12  * copies of the Software, and to permit persons to whom the Software is
13  * furnished to do so, subject to the following conditions:
14  *
15  * The above copyright notice and this permission notice shall be included in
16  * all copies or substantial portions of the Software.
17  *
18  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
19  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
20  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
21  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
22  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
23  * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
24  * THE SOFTWARE.
25  */
26 
27 /*
28  * Description:
29  * task queue abstraction
30  *
31  * Revision history:
32  * Mar., 2015, @imzhenyu (Zhenyu Guo), first version
33  * xxxx-xx-xx, author, fix bug about xxx
34  */
35 
36 # pragma once
37 
38 # include <dsn/tool-api/task.h>
39 # include <dsn/tool-api/perf_counter.h>
40 # include <dsn/utility/dlib.h>
41 
42 namespace dsn {
43 
44 class task_worker;
45 class task_worker_pool;
46 class admission_controller;
47 
56 {
57 public:
58  template <typename T> static task_queue* create(task_worker_pool* pool, int index, task_queue* inner_provider)
59  {
60  return new T(pool, index, inner_provider);
61  }
62 
63  typedef task_queue* (*factory)(task_worker_pool*, int, task_queue*);
64 
65 public:
66  DSN_API task_queue(task_worker_pool* pool, int index, task_queue* inner_provider);
67  DSN_API virtual ~task_queue();
68 
69  virtual void enqueue(task* task) = 0;
70  // dequeue may return more than 1 tasks, but there is a configured
71  // best batch size for each worker so that load among workers
72  // are balanced,
73  // returned batch size is stored in parameter batch_size
74  virtual task* dequeue(/*inout*/int& batch_size) = 0;
75 
76  int count() const { return _queue_length.load(std::memory_order_relaxed); }
77  int decrease_count(int count = 1) { _queue_length_counter->add((uint64_t)(-count)); return _queue_length.fetch_sub(count, std::memory_order_relaxed) - count;}
78  int increase_count(int count = 1) { _queue_length_counter->add(count); return _queue_length.fetch_add(count, std::memory_order_relaxed) + count;}
79  const std::string & get_name() { return _name; }
80  task_worker_pool* pool() const { return _pool; }
81  bool is_shared() const { return _worker_count > 1; }
82  int worker_count() const { return _worker_count; }
83  task_worker* owner_worker() const { return _owner_worker; } // when not is_shared()
84  int index() const { return _index; }
85  volatile int* get_virtual_length_ptr() { return &_virtual_queue_length; }
86 
87  admission_controller* controller() const { return _controller; }
88  void set_controller(admission_controller* controller) { _controller = controller; }
89 
90 private:
91  friend class task_worker_pool;
92  void set_owner_worker(task_worker* worker) { _owner_worker = worker; }
93  void enqueue_internal(task* task);
94 
95 private:
96  task_worker_pool* _pool;
97  task_worker* _owner_worker;
98  std::string _name;
99  int _index;
100  admission_controller* _controller;
101  int _worker_count;
102  std::atomic<int> _queue_length;
103  mutable perf_counter_ptr _queue_length_counter;
104  threadpool_spec* _spec;
105  volatile int _virtual_queue_length;
106 };
108 } // end namespace
Definition: task.h:101
task queue batches the input queue for the bound task worker(s) (threads)
Definition: task_queue.h:55
task worker processes the input tasks from the bound task queue
Definition: task_worker.h:54
Definition: address.h:52