Robust Distributed System Nucleus (rDSN)  ver 1.0.0
task_worker.h
1 /*
2  * The MIT License (MIT)
3  *
4  * Copyright (c) 2015 Microsoft Corporation
5  *
6  * -=- Robust Distributed System Nucleus (rDSN) -=-
7  *
8  * Permission is hereby granted, free of charge, to any person obtaining a copy
9  * of this software and associated documentation files (the "Software"), to deal
10  * in the Software without restriction, including without limitation the rights
11  * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
12  * copies of the Software, and to permit persons to whom the Software is
13  * furnished to do so, subject to the following conditions:
14  *
15  * The above copyright notice and this permission notice shall be included in
16  * all copies or substantial portions of the Software.
17  *
18  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
19  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
20  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
21  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
22  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
23  * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
24  * THE SOFTWARE.
25  */
26 
27 /*
28  * Description:
29  * task worker (thread) abstraction
30  *
31  * Revision history:
32  * Mar., 2015, @imzhenyu (Zhenyu Guo), first version
33  * xxxx-xx-xx, author, fix bug about xxx
34  */
35 
36 # pragma once
37 
38 # include <dsn/tool-api/task_queue.h>
39 # include <dsn/utility/extensible_object.h>
40 # include <dsn/utility/synchronize.h>
41 # include <dsn/utility/dlib.h>
42 # include <dsn/tool-api/perf_counter.h>
43 # include <thread>
44 
45 namespace dsn {
46 
// task worker (thread) abstraction: a task worker processes the input tasks
// from the task queue it is bound to. Each worker records the pool that owns
// it, the queue it reads from, and its index; it derives from
// extensible_object<task_worker, 4>.
//
// NOTE(review): the class declares a destructor and holds a raw std::thread*;
// whether copying is already disabled depends on extensible_object, which is
// not visible in this header - confirm before copying or moving instances.
54 class task_worker : public extensible_object<task_worker, 4>
55 {
56 public:
    // Generic factory: allocates a worker of concrete type T with `new`,
    // forwarding all four arguments to T's constructor, and returns it as a
    // task_worker*. T must therefore be convertible to task_worker* and accept
    // (task_worker_pool*, task_queue*, int, task_worker*).
    // NOTE(review): the result is a raw pointer; presumably the caller (the
    // owning pool) is responsible for deleting it - confirm against callers.
57  template <typename T> static task_worker* create(task_worker_pool* pool, task_queue* q, int index, task_worker* inner_provider)
58  {
59  return new T(pool, q, index, inner_provider);
60  }
61 
    // Function-pointer type with the same signature as create<T>, so that an
    // instantiation of create<T> can be stored and invoked as a factory.
62  typedef task_worker* (*factory)(task_worker_pool*, task_queue*, int, task_worker*);
63 
64 public:
    // Constructs a worker for `pool` reading from queue `q`, with position
    // `index`. `inner_provider` is another task_worker; presumably the wrapped
    // provider when workers are layered - TODO confirm in the implementation.
65  DSN_API task_worker(task_worker_pool* pool, task_queue* q, int index, task_worker* inner_provider);
    // Virtual so that workers created through create<T> can be destroyed via a
    // task_worker*.
66  DSN_API virtual ~task_worker(void);
67 
68  // service management
    // start()/stop() are defined out of line; presumably they launch and join
    // the underlying thread (_thread) - confirm in the implementation.
69  DSN_API void start();
70  DSN_API void stop();
71 
    // Virtual so that derived workers can supply their own processing loop.
72  DSN_API virtual void loop(); // run tasks from _input_queue
73 
74  // inquiry: read-only accessors for the worker's stored state
75  const std::string& name() const { return _name; }
76  int index() const { return _index; }
77  int native_tid() const { return _native_tid; }
78  task_worker_pool* pool() const { return _owner_pool; }
79  task_queue* queue() const { return _input_queue; }
    // Returns the threadpool_spec associated with this worker; defined out of
    // line (presumably obtained from the owning pool - confirm).
80  DSN_API const threadpool_spec& pool_spec() const;
    // Static lookup of a task_worker; presumably the worker bound to the
    // calling thread - confirm in the implementation.
81  DSN_API static task_worker* current();
82 
83 private:
    // Pool returned by pool().
84  task_worker_pool* _owner_pool;
    // Queue returned by queue(); loop() runs tasks from it.
85  task_queue* _input_queue;
    // Value returned by index().
86  int _index;
    // Value returned by native_tid(); presumably the OS-level thread id.
87  int _native_tid;
    // Value returned by name().
88  std::string _name;
    // NOTE(review): raw pointer to the worker's std::thread; allocation and
    // release happen in the out-of-line implementation - verify it is joined
    // and freed on every stop/destruction path.
89  std::thread *_thread;
    // Running flag; how it is set and read is not visible in this header.
90  bool _is_running;
    // Notification event; presumably signalled once the thread has started.
91  utils::notify_event _started;
    // Counter; presumably the number of tasks this worker has executed.
92  int _processed_task_count;
93 
94 public:
    // Static helpers taking a thread name, priority and affinity mask.
    // NOTE(review): being static they cannot target a specific worker
    // instance; presumably they act on the calling thread - confirm.
95  DSN_API static void set_name(const char* name);
96  DSN_API static void set_priority(worker_priority_t pri);
97  DSN_API static void set_affinity(uint64_t affinity);
98 
99 private:
    // Internal helper defined out of line; presumably the entry point executed
    // on _thread - confirm in the implementation.
100  void run_internal();
101 
102 public:
    // Static join points whose handlers receive a task_worker*; presumably
    // fired when a worker starts and when one is created, respectively -
    // confirm where they are invoked.
107  DSN_API static join_point<void, task_worker*> on_start;
108  DSN_API static join_point<void, task_worker*> on_create;
110 };
112 } // end namespace
113 
114 
task queue batches the input queue for the bound task worker(s) (threads)
Definition: task_queue.h:55
task worker processes the input tasks from the bound task queue
Definition: task_worker.h:54
Definition: address.h:52