Robust Distributed System Nucleus (rDSN)  ver 1.0.0
task_spec.h
1 /*
2  * The MIT License (MIT)
3  *
4  * Copyright (c) 2015 Microsoft Corporation
5  *
6  * -=- Robust Distributed System Nucleus (rDSN) -=-
7  *
8  * Permission is hereby granted, free of charge, to any person obtaining a copy
9  * of this software and associated documentation files (the "Software"), to deal
10  * in the Software without restriction, including without limitation the rights
11  * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
12  * copies of the Software, and to permit persons to whom the Software is
13  * furnished to do so, subject to the following conditions:
14  *
15  * The above copyright notice and this permission notice shall be included in
16  * all copies or substantial portions of the Software.
17  *
18  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
19  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
20  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
21  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
22  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
23  * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
24  * THE SOFTWARE.
25  */
26 
27 /*
28  * Description:
29  * specification for the labeled tasks (task kinds)
30  *
31  * Revision history:
32  * Mar., 2015, @imzhenyu (Zhenyu Guo), first version
33  * xxxx-xx-xx, author, fix bug about xxx
34  */
35 
36 # pragma once
37 
38 # include <dsn/service_api_c.h>
39 # include <dsn/cpp/utils.h>
40 # include <dsn/cpp/config_helper.h>
41 # include <dsn/utility/enum_helper.h>
42 # include <dsn/utility/customizable_id.h>
43 # include <dsn/utility/singleton_vector_store.h>
44 # include <dsn/utility/join_point.h>
45 # include <dsn/utility/extensible_object.h>
46 # include <dsn/utility/exp_delay.h>
47 # include <dsn/utility/dlib.h>
48 # include <dsn/tool-api/perf_counter.h>
49 
// Lists the dsn_log_level_t enumerators to the enum_helper macros
// (presumably to enable enum <-> string conversion -- confirm in
// dsn/utility/enum_helper.h). The enum type itself is not declared in the
// visible part of this header. LOG_LEVEL_INVALID is supplied as the
// "invalid" value and is not itself registered.
50 ENUM_BEGIN(dsn_log_level_t, LOG_LEVEL_INVALID)
51  ENUM_REG(LOG_LEVEL_INFORMATION)
52  ENUM_REG(LOG_LEVEL_DEBUG)
53  ENUM_REG(LOG_LEVEL_WARNING)
54  ENUM_REG(LOG_LEVEL_ERROR)
55  ENUM_REG(LOG_LEVEL_FATAL)
56 ENUM_END(dsn_log_level_t)
57 
// Lists the dsn_task_type_t enumerators (task/event kinds) to the
// enum_helper macros. The enum is declared outside this header.
// TASK_TYPE_INVALID is supplied as the "invalid" value and is not registered.
58 ENUM_BEGIN(dsn_task_type_t, TASK_TYPE_INVALID)
59  ENUM_REG(TASK_TYPE_RPC_REQUEST)
60  ENUM_REG(TASK_TYPE_RPC_RESPONSE)
61  ENUM_REG(TASK_TYPE_COMPUTE)
62  ENUM_REG(TASK_TYPE_AIO)
63  ENUM_REG(TASK_TYPE_CONTINUATION)
64 ENUM_END(dsn_task_type_t)
65 
// Lists the dsn_task_priority_t enumerators (LOW / COMMON / HIGH) to the
// enum_helper macros. The enum is declared outside this header.
// TASK_PRIORITY_INVALID is supplied as the "invalid" value and is not registered.
66 ENUM_BEGIN(dsn_task_priority_t, TASK_PRIORITY_INVALID)
67  ENUM_REG(TASK_PRIORITY_LOW)
68  ENUM_REG(TASK_PRIORITY_COMMON)
69  ENUM_REG(TASK_PRIORITY_HIGH)
70 ENUM_END(dsn_task_priority_t)
71 
72 namespace dsn {
73 
// Priority levels for the worker threads of a thread pool; used as the type of
// threadpool_spec::worker_priority. Enumerators take implicit values starting
// at 0, so their order is significant.
74 enum worker_priority_t
75 {
76  THREAD_xPRIORITY_LOWEST,
77  THREAD_xPRIORITY_BELOW_NORMAL,
78  THREAD_xPRIORITY_NORMAL,
79  THREAD_xPRIORITY_ABOVE_NORMAL,
80  THREAD_xPRIORITY_HIGHEST,
81  THREAD_xPRIORITY_COUNT, // == 5, the number of real priority levels listed above
82  THREAD_xPRIORITY_INVALID // sentinel passed as the "invalid" value to ENUM_BEGIN / CONFIG_FLD_ENUM
83 };
84 
// Lists the five real worker_priority_t levels to the enum_helper macros.
// THREAD_xPRIORITY_COUNT and THREAD_xPRIORITY_INVALID are intentionally not
// registered; the latter is supplied as the "invalid" value.
85 ENUM_BEGIN(worker_priority_t, THREAD_xPRIORITY_INVALID)
86  ENUM_REG(THREAD_xPRIORITY_LOWEST)
87  ENUM_REG(THREAD_xPRIORITY_BELOW_NORMAL)
88  ENUM_REG(THREAD_xPRIORITY_NORMAL)
89  ENUM_REG(THREAD_xPRIORITY_ABOVE_NORMAL)
90  ENUM_REG(THREAD_xPRIORITY_HIGHEST)
91 ENUM_END(worker_priority_t)
92 
// States a task can be in. Enumerators take implicit values starting at 0,
// so their order is significant.
93 enum task_state
94 {
95  TASK_STATE_READY,
96  TASK_STATE_RUNNING,
97  TASK_STATE_FINISHED,
98  TASK_STATE_CANCELLED,
99  TASK_STATE_COUNT, // == 4, the number of real states listed above
100  TASK_STATE_INVALID // sentinel passed as the "invalid" value to ENUM_BEGIN
101 };
102 
// Lists the four real task_state values to the enum_helper macros.
// TASK_STATE_COUNT and TASK_STATE_INVALID are not registered.
103 ENUM_BEGIN(task_state, TASK_STATE_INVALID)
104  ENUM_REG(TASK_STATE_READY)
105  ENUM_REG(TASK_STATE_RUNNING)
106  ENUM_REG(TASK_STATE_FINISHED)
107  ENUM_REG(TASK_STATE_CANCELLED)
108 ENUM_END(task_state)
109 
// Selects the granularity at which an io engine (rpc/disk/nfs/timer) is shared.
// Used as the type of io_modifer::mode.
110 typedef enum ioe_mode
111 {
112  IOE_PER_NODE, // each node has shared io engine (rpc/disk/nfs/timer)
113  IOE_PER_QUEUE, // each queue has shared io engine (rpc/disk/nfs/timer)
114  IOE_COUNT, // == 2, the number of real modes listed above
115  IOE_INVALID // sentinel passed as the "invalid" value to ENUM_BEGIN
116 } ioe_mode;
117 
// Lists the two real ioe_mode values to the enum_helper macros.
// IOE_COUNT and IOE_INVALID are not registered.
118 ENUM_BEGIN(ioe_mode, IOE_INVALID)
119  ENUM_REG(IOE_PER_NODE)
120  ENUM_REG(IOE_PER_QUEUE)
121 ENUM_END(ioe_mode)
122 
// Group-rpc routing mode: decides which member(s) of a group address receive
// an rpc request. Used as the type of task_spec::grpc_mode.
123 typedef enum grpc_mode_t
124 {
125  GRPC_TO_LEADER, // the rpc is sent to the leader (if exist)
126  GRPC_TO_ALL, // the rpc is sent to all
127  GRPC_TO_ANY, // the rpc is sent to one of the group member
128  GRPC_COUNT, // == 3, the number of real modes listed above
129  GRPC_INVALID // sentinel passed as the "invalid" value to ENUM_BEGIN / CONFIG_FLD_ENUM
130 } grpc_mode_t;
131 
// Lists the three real grpc_mode_t values to the enum_helper macros.
// GRPC_COUNT and GRPC_INVALID are not registered.
132 ENUM_BEGIN(grpc_mode_t, GRPC_INVALID)
133  ENUM_REG(GRPC_TO_LEADER)
134  ENUM_REG(GRPC_TO_ALL)
135  ENUM_REG(GRPC_TO_ANY)
136 ENUM_END(grpc_mode_t)
137 
// How incoming rpc requests are throttled. Used as the type of
// task_spec::rpc_request_throttling_mode.
138 typedef enum throttling_mode_t
139 {
140  TM_NONE, // no throttling applied
141  TM_REJECT, // reject the incoming request
142  TM_DELAY, // delay network receive ops to reduce the incoming rate
143  TM_COUNT, // == 3, the number of real modes listed above
144  TM_INVALID // sentinel passed as the "invalid" value to ENUM_BEGIN / CONFIG_FLD_ENUM
145 } throttling_mode_t;
146 
// Lists the three real throttling_mode_t values to the enum_helper macros.
// TM_COUNT and TM_INVALID are not registered.
147 ENUM_BEGIN(throttling_mode_t, TM_INVALID)
148  ENUM_REG(TM_NONE)
149  ENUM_REG(TM_REJECT)
150  ENUM_REG(TM_DELAY)
151 ENUM_END(throttling_mode_t)
152 
// Lists the dsn_msg_serialize_format enumerators (message payload
// serialization formats) to the enum_helper macros. The enum is declared
// outside this header, so this list must be kept in sync with that
// declaration by hand. DSF_INVALID is supplied as the "invalid" value.
153 ENUM_BEGIN(dsn_msg_serialize_format, DSF_INVALID)
154  ENUM_REG(DSF_THRIFT_BINARY)
155  ENUM_REG(DSF_THRIFT_COMPACT)
156  ENUM_REG(DSF_THRIFT_JSON)
157  ENUM_REG(DSF_PROTOC_BINARY)
158  ENUM_REG(DSF_PROTOC_JSON)
159 ENUM_END(dsn_msg_serialize_format)
160 
161 // define the network header formats available for RPC
// (DEFINE_CUSTOMIZED_ID_TYPE / DEFINE_CUSTOMIZED_ID presumably come from
// dsn/utility/customizable_id.h; other modules may register further ids -- confirm)
162 DEFINE_CUSTOMIZED_ID_TYPE(network_header_format)
163 DEFINE_CUSTOMIZED_ID(network_header_format, NET_HDR_INVALID)
164 DEFINE_CUSTOMIZED_ID(network_header_format, NET_HDR_DSN) // default for task_spec::rpc_call_header_format
165 
166 // define the network channel types available for RPC
167 DEFINE_CUSTOMIZED_ID_TYPE(rpc_channel)
168 DEFINE_CUSTOMIZED_ID(rpc_channel, RPC_CHANNEL_TCP) // default for task_spec::rpc_call_channel
169 DEFINE_CUSTOMIZED_ID(rpc_channel, RPC_CHANNEL_UDP)
170 
171 // define the thread pool code id type; no ids are defined in this header
// NOTE(review): CONFIG_BEGIN(task_spec) references THREAD_POOL_DEFAULT with
// this id type; that id must be defined elsewhere -- confirm where.
172 DEFINE_CUSTOMIZED_ID_TYPE(threadpool_code2)
173 
// Forward declarations: this header only uses these types through pointers
// (join_point signatures, io_modifer::queue, task_rejection_handler).
174 class task;
175 class task_queue;
176 class aio_task;
177 class rpc_request_task;
178 class rpc_response_task;
179 class message_ex;
180 class admission_controller;
// Callback type stored in task_spec::rejection_handler; receives a task and
// an admission_controller (presumably invoked when the controller rejects the
// task -- confirm against the call site).
181 typedef void (*task_rejection_handler)(task*, admission_controller*);
182 struct rpc_handler_info;
183 
// Describes how the io engine is selected and how the port is adjusted.
// NOTE(review): the typedef name is spelled "io_modifer" (sic) and the struct
// tag uses a reserved double-underscore identifier; both are part of the
// public interface, so renaming them requires updating every user.
184 typedef struct __io_mode_modifier__
185 {
186  ioe_mode mode; // see ioe_mode for details
187  task_queue* queue; // when mode == IOE_PER_QUEUE
188  int port_shift_value; // port += port_shift_value
189 } io_modifer;
190 
// Specification of one labeled task kind (task code): its fixed identity
// (code, type, name), its configurable scheduling and rpc settings (the
// fields between "configurable [" and "]", mirrored by the
// CONFIG_BEGIN(task_spec) section in this header), and a set of join points
// (presumably hook points that tools can attach to -- see
// dsn/utility/join_point.h).
// NOTE(review): the embedded listing numbers jump from 234 to 239 and from
// 265 to 268, so a few lines of the original header (likely comments) are
// missing from this rendering.
191 class task_spec : public extensible_object<task_spec, 4>
192 {
193 public:
194  DSN_API static task_spec* get(int ec); // presumably looks up the spec registered for task code `ec` -- confirm in the implementation
195  DSN_API static void register_task_code(dsn_task_code_t code, dsn_task_type_t type, dsn_task_priority_t pri, dsn_threadpool_code_t pool);
196 
197 public:
198  // not configurable [
199  dsn_task_code_t code;
200  dsn_task_type_t type;
201  std::string name;
202  dsn_task_code_t rpc_paired_code;
203  shared_exp_delay rpc_request_delayer;
204  // ]
205 
206  // configurable [
207  dsn_task_priority_t priority;
208  grpc_mode_t grpc_mode; // used when a rpc request is sent to a group address
209  dsn_threadpool_code_t pool_code;
210 
211  // allow task executed in other thread pools or tasks
212  // for TASK_TYPE_COMPUTE - allow-inline allows a task being executed in its caller site
213  // for other tasks - allow-inline allows a task being executed in io-thread
214  bool allow_inline;
215  bool randomize_timer_delay_if_zero; // to avoid many timers executing at the same time
216  network_header_format rpc_call_header_format;
217  dsn_msg_serialize_format rpc_msg_payload_serialize_default_format;
218  rpc_channel rpc_call_channel;
219  bool rpc_message_crc_required;
220 
221  int32_t rpc_timeout_milliseconds;
222  int32_t rpc_request_resend_timeout_milliseconds; // 0 for no auto-resend
223  throttling_mode_t rpc_request_throttling_mode; // see throttling_mode_t (TM_NONE / TM_REJECT / TM_DELAY)
224  std::vector<int> rpc_request_delays_milliseconds; // see exp_delay for delaying receiving
225  bool rpc_request_dropped_before_execution_when_timeout;
226 
227  // layer 2 configurations
228  bool rpc_request_layer2_handler_required; // need layer 2 handler
229  bool rpc_request_is_write_operation; // need stateful replication
230  // ]
231 
232  task_rejection_handler rejection_handler;
233 
234  // COMPUTE
239  join_point<void, task*, task*> on_task_create;
240 
241  join_point<void, task*, task*> on_task_enqueue;
242  join_point<void, task*> on_task_begin; // TODO: parent task
243  join_point<void, task*> on_task_end;
244  join_point<void, task*> on_task_cancelled;
245 
246  join_point<void, task*, task*, uint32_t> on_task_wait_pre; // waitor, waitee, timeout
247  join_point<void, task*> on_task_wait_notified;
248  join_point<void, task*, task*, bool> on_task_wait_post; // wait succeeded or timed out
249  join_point<void, task*, task*, bool> on_task_cancel_post; // cancel succeeded or not
250 
251 
252  // AIO
253  join_point<bool, task*, aio_task*> on_aio_call; // return true means continue, otherwise early terminate with task::set_error_code
254  join_point<void, aio_task*> on_aio_enqueue; // aio done, enqueue callback
255 
256  // RPC_REQUEST
257  join_point<bool, task*, message_ex*, rpc_response_task*> on_rpc_call; // return true means continue, otherwise dropped and (optionally) timed out
258  join_point<bool, rpc_request_task*> on_rpc_request_enqueue;
259 
260  // RPC_RESPONSE
261  join_point<bool, task*, message_ex*> on_rpc_reply;
262  join_point<bool, rpc_response_task*> on_rpc_response_enqueue; // response, task
263 
264  // message data flow
265  join_point<void, message_ex*, message_ex*> on_rpc_create_response;
268 public:
269  DSN_API task_spec(int code, const char* name, dsn_task_type_t type, dsn_task_priority_t pri, dsn_threadpool_code_t pool);
270 
271 public:
272  DSN_API static bool init();
273  DSN_API void init_profiling(bool profile);
274 };
275 
// Configuration schema for the configurable fields of task_spec. Each entry
// names a field, its default value and a description string; the macros
// presumably come from dsn/cpp/config_helper.h. The meaning of the boolean
// argument of CONFIG_FLD_ENUM / CONFIG_FLD_ID is not visible here -- confirm
// in config_helper.h before changing it.
// NOTE(review): rpc_timeout_milliseconds and
// rpc_request_resend_timeout_milliseconds are int32_t fields but are declared
// with the uint64 config kind; values above INT32_MAX would presumably be
// narrowed -- confirm.
276 CONFIG_BEGIN(task_spec)
277  CONFIG_FLD_ENUM(dsn_task_priority_t, priority, TASK_PRIORITY_COMMON, TASK_PRIORITY_INVALID, true, "task priority")
278  CONFIG_FLD_ENUM(grpc_mode_t, grpc_mode, GRPC_TO_LEADER, GRPC_INVALID, false, "group rpc mode: GRPC_TO_LEADER, GRPC_TO_ALL, GRPC_TO_ANY")
279  CONFIG_FLD_ID(threadpool_code2, pool_code, THREAD_POOL_DEFAULT, true, "thread pool to execute the task")
280  CONFIG_FLD(bool, bool, allow_inline, false,
281  "allow task executed in other thread pools or tasks "
282  "for TASK_TYPE_COMPUTE - allow-inline allows a task being executed in its caller site "
283  "for other tasks - allow-inline allows a task being execution in io-thread "
284  )
285  CONFIG_FLD(bool, bool, randomize_timer_delay_if_zero, false, "whether to randomize the timer delay to random(0, timer_interval), if the initial delay is zero, to avoid multiple timers executing at the same time (e.g., checkpointing)")
286  CONFIG_FLD_ID(network_header_format, rpc_call_header_format, NET_HDR_DSN, false, "what kind of header format for this kind of rpc calls")
287  CONFIG_FLD_ENUM(dsn_msg_serialize_format, rpc_msg_payload_serialize_default_format, DSF_THRIFT_BINARY, DSF_INVALID, false, "what kind of payload serialization format for this kind of msgs")
288  CONFIG_FLD_ID(rpc_channel, rpc_call_channel, RPC_CHANNEL_TCP, false, "what kind of network channel for this kind of rpc calls")
289  CONFIG_FLD(bool, bool, rpc_message_crc_required, false, "whether to calculate the crc checksum when send request/response")
290  CONFIG_FLD(int32_t, uint64, rpc_timeout_milliseconds, 5000, "what is the default timeout (ms) for this kind of rpc calls")
291  CONFIG_FLD(int32_t, uint64, rpc_request_resend_timeout_milliseconds, 0, "for how long (ms) the request will be resent if no response is received yet, 0 for disable this feature")
292  CONFIG_FLD_ENUM(throttling_mode_t, rpc_request_throttling_mode, TM_NONE, TM_INVALID, false, "throttling mode for rpc requets: TM_NONE, TM_REJECT, TM_DELAY when queue length > pool.queue_length_throttling_threshold")
293  CONFIG_FLD_INT_LIST(rpc_request_delays_milliseconds, "how many milliseconds to delay recving rpc session for when queue length ~= [1.0, 1.2, 1.4, 1.6, 1.8, >=2.0] x pool.queue_length_throttling_threshold, e.g., 0, 0, 1, 2, 5, 10")
294  CONFIG_FLD(bool, bool, rpc_request_dropped_before_execution_when_timeout, false, "whether to drop a request right before execution when its queueing time is already greater than its timeout value")
295 
296  // layer 2 configurations
297  CONFIG_FLD(bool, bool, rpc_request_layer2_handler_required, false, "whether this request needs to be handled by a layer2 handler (e.g., replicated or partitioned)")
298  CONFIG_FLD(bool, bool, rpc_request_is_write_operation, false, "whether this request updates app's state which needs to be replicated using a replication layer2 handler")
299 
300 CONFIG_END
301 
// Specification of one thread pool: identity (name, pool_code), worker
// settings, queue/worker provider names and aspects, and throttling /
// admission-control settings. Field defaults and descriptions are listed in
// the CONFIG_BEGIN(threadpool_spec) section in this header.
302 struct threadpool_spec
303 {
304  std::string name;
305  dsn_threadpool_code_t pool_code;
306  int worker_count;
307  worker_priority_t worker_priority;
308  bool worker_share_core;
309  uint64_t worker_affinity_mask;
310  int dequeue_batch_size;
311  bool partitioned; // false by default
312  std::string queue_factory_name;
313  std::string worker_factory_name;
314  std::list<std::string> queue_aspects;
315  std::list<std::string> worker_aspects;
316  int queue_length_throttling_threshold;
317  bool enable_virtual_queue_throttling;
318  std::string admission_controller_factory_name;
319  std::string admission_controller_arguments;
320 
// NOTE(review): this constructor initializes only name and pool_code; the
// scalar members (ints, bools, worker_priority, worker_affinity_mask) are left
// uninitialized here and are presumably filled from configuration afterwards
// -- confirm that no copy or read happens before that.
321  threadpool_spec(const dsn_threadpool_code_t& code) : name(dsn_threadpool_code_to_string(code)), pool_code(code) {}
322  threadpool_spec(const threadpool_spec& source) = default;
323  threadpool_spec& operator=(const threadpool_spec& source) = default;
324 
// Fills `specs` (output parameter) and reports success as a bool; the
// implementation is not part of this header.
325  DSN_API static bool init(/*out*/ std::vector<threadpool_spec>& specs);
326 };
327 
// Configuration schema for threadpool_spec. Each entry names a field, its
// default value (where the macro takes one) and a description string.
// pool_code is deliberately not listed (see the commented-out entry below).
// NOTE(review): worker_count, dequeue_batch_size and
// queue_length_throttling_threshold are int fields declared with the uint64
// config kind; large values would presumably be narrowed -- confirm in
// dsn/cpp/config_helper.h.
328 CONFIG_BEGIN(threadpool_spec)
329  // CONFIG_FLD_ID(dsn_threadpool_code_t, pool_code) // no need to define it inside section
330  CONFIG_FLD_STRING(name, "", "thread pool name")
331  CONFIG_FLD(int, uint64, worker_count, 2, "thread/worker count")
332  CONFIG_FLD(int, uint64, dequeue_batch_size, 5, "how many tasks (if available) should be returned for one dequeue call for best batching performance")
333  CONFIG_FLD_ENUM(worker_priority_t, worker_priority, THREAD_xPRIORITY_NORMAL, THREAD_xPRIORITY_INVALID, false, "thread priority")
334  CONFIG_FLD(bool, bool, worker_share_core, true, "whether the threads share all assigned cores")
335  CONFIG_FLD(uint64_t, uint64, worker_affinity_mask, 0, "what CPU cores are assigned to this pool, 0 for all")
336  CONFIG_FLD(bool, bool, partitioned, false, "whethe the threads share a single queue(partitioned=false) or not; the latter is usually for workload hash partitioning for avoiding locking")
337  CONFIG_FLD_STRING(queue_factory_name, "", "task queue provider name")
338  CONFIG_FLD_STRING(worker_factory_name, "", "task worker provider name")
339  CONFIG_FLD_STRING_LIST(queue_aspects, "task queue aspects names, usually for tooling purpose")
340  CONFIG_FLD_STRING_LIST(worker_aspects, "task aspects names, usually for tooling purpose")
341  CONFIG_FLD(int, uint64, queue_length_throttling_threshold, 1000000, "throttling: throttling threshold above which rpc requests will be dropped")
342  CONFIG_FLD(bool, bool, enable_virtual_queue_throttling, false, "throttling: whether to enable throttling with virtual queues")
343  CONFIG_FLD_STRING(admission_controller_factory_name, "", "customized admission controller for the task queues")
344  CONFIG_FLD_STRING(admission_controller_arguments, "", "arguments for the cusotmized admission controller")
345 CONFIG_END
346 
347 } // end namespace
348 
dsn_task_priority_t
task priority
Definition: api_task.h:90
task handling rpc response or timeout
Definition: api_task.h:53
callback for file read and write
Definition: api_task.h:55
Definition: task_spec.h:184
Definition: task_spec.h:191
Definition: address.h:52
task handling rpc request
Definition: api_task.h:52
async calls or timers
Definition: api_task.h:54
above tasks are separated into several continuation tasks by thread-synchronization operations...
Definition: api_task.h:56
dsn_task_type_t
task/event type definition
Definition: api_task.h:50
dsn_msg_serialize_format
define various serialization format supported by rDSN, note any changes here must also be reflected i...
Definition: api_layer1.h:642