38 # include <dsn/service_api_c.h> 39 # include <dsn/cpp/utils.h> 40 # include <dsn/cpp/config_helper.h> 41 # include <dsn/utility/enum_helper.h> 42 # include <dsn/utility/customizable_id.h> 43 # include <dsn/utility/singleton_vector_store.h> 44 # include <dsn/utility/join_point.h> 45 # include <dsn/utility/extensible_object.h> 46 # include <dsn/utility/exp_delay.h> 47 # include <dsn/utility/dlib.h> 48 # include <dsn/tool-api/perf_counter.h> 50 ENUM_BEGIN(dsn_log_level_t, LOG_LEVEL_INVALID)
51 ENUM_REG(LOG_LEVEL_INFORMATION)
52 ENUM_REG(LOG_LEVEL_DEBUG)
53 ENUM_REG(LOG_LEVEL_WARNING)
54 ENUM_REG(LOG_LEVEL_ERROR)
55 ENUM_REG(LOG_LEVEL_FATAL)
56 ENUM_END(dsn_log_level_t)
64 ENUM_END(dsn_task_type_t)
67 ENUM_REG(TASK_PRIORITY_LOW)
68 ENUM_REG(TASK_PRIORITY_COMMON)
69 ENUM_REG(TASK_PRIORITY_HIGH)
70 ENUM_END(dsn_task_priority_t)
74 enum worker_priority_t
76 THREAD_xPRIORITY_LOWEST,
77 THREAD_xPRIORITY_BELOW_NORMAL,
78 THREAD_xPRIORITY_NORMAL,
79 THREAD_xPRIORITY_ABOVE_NORMAL,
80 THREAD_xPRIORITY_HIGHEST,
81 THREAD_xPRIORITY_COUNT,
82 THREAD_xPRIORITY_INVALID
85 ENUM_BEGIN(worker_priority_t, THREAD_xPRIORITY_INVALID)
86 ENUM_REG(THREAD_xPRIORITY_LOWEST)
87 ENUM_REG(THREAD_xPRIORITY_BELOW_NORMAL)
88 ENUM_REG(THREAD_xPRIORITY_NORMAL)
89 ENUM_REG(THREAD_xPRIORITY_ABOVE_NORMAL)
90 ENUM_REG(THREAD_xPRIORITY_HIGHEST)
91 ENUM_END(worker_priority_t)
103 ENUM_BEGIN(task_state, TASK_STATE_INVALID)
104 ENUM_REG(TASK_STATE_READY)
105 ENUM_REG(TASK_STATE_RUNNING)
106 ENUM_REG(TASK_STATE_FINISHED)
107 ENUM_REG(TASK_STATE_CANCELLED)
110 typedef enum ioe_mode
118 ENUM_BEGIN(ioe_mode, IOE_INVALID)
119 ENUM_REG(IOE_PER_NODE)
120 ENUM_REG(IOE_PER_QUEUE)
123 typedef enum grpc_mode_t
132 ENUM_BEGIN(grpc_mode_t, GRPC_INVALID)
133 ENUM_REG(GRPC_TO_LEADER)
134 ENUM_REG(GRPC_TO_ALL)
135 ENUM_REG(GRPC_TO_ANY)
136 ENUM_END(grpc_mode_t)
138 typedef enum throttling_mode_t
147 ENUM_BEGIN(throttling_mode_t, TM_INVALID)
151 ENUM_END(throttling_mode_t)
154 ENUM_REG(DSF_THRIFT_BINARY)
155 ENUM_REG(DSF_THRIFT_COMPACT)
156 ENUM_REG(DSF_THRIFT_JSON)
157 ENUM_REG(DSF_PROTOC_BINARY)
158 ENUM_REG(DSF_PROTOC_JSON)
159 ENUM_END(dsn_msg_serialize_format)
162 DEFINE_CUSTOMIZED_ID_TYPE(network_header_format)
163 DEFINE_CUSTOMIZED_ID(network_header_format, NET_HDR_INVALID)
164 DEFINE_CUSTOMIZED_ID(network_header_format, NET_HDR_DSN)
167 DEFINE_CUSTOMIZED_ID_TYPE(rpc_channel)
168 DEFINE_CUSTOMIZED_ID(rpc_channel, RPC_CHANNEL_TCP)
169 DEFINE_CUSTOMIZED_ID(rpc_channel, RPC_CHANNEL_UDP)
172 DEFINE_CUSTOMIZED_ID_TYPE(threadpool_code2)
177 class rpc_request_task;
178 class rpc_response_task;
180 class admission_controller;
181 typedef
void (*task_rejection_handler)(task*, admission_controller*);
182 struct rpc_handler_info;
188 int port_shift_value;
191 class task_spec :
public extensible_object<task_spec, 4>
195 DSN_API
static void register_task_code(dsn_task_code_t code, dsn_task_type_t type, dsn_task_priority_t pri, dsn_threadpool_code_t pool);
199 dsn_task_code_t code;
200 dsn_task_type_t type;
202 dsn_task_code_t rpc_paired_code;
203 shared_exp_delay rpc_request_delayer;
207 dsn_task_priority_t priority;
208 grpc_mode_t grpc_mode;
209 dsn_threadpool_code_t pool_code;
215 bool randomize_timer_delay_if_zero;
216 network_header_format rpc_call_header_format;
218 rpc_channel rpc_call_channel;
219 bool rpc_message_crc_required;
221 int32_t rpc_timeout_milliseconds;
222 int32_t rpc_request_resend_timeout_milliseconds;
223 throttling_mode_t rpc_request_throttling_mode;
224 std::vector<int> rpc_request_delays_milliseconds;
225 bool rpc_request_dropped_before_execution_when_timeout;
228 bool rpc_request_layer2_handler_required;
229 bool rpc_request_is_write_operation;
232 task_rejection_handler rejection_handler;
239 join_point<void, task*, task*> on_task_create;
241 join_point<void, task*, task*> on_task_enqueue;
242 join_point<void, task*> on_task_begin;
243 join_point<void, task*> on_task_end;
244 join_point<void, task*> on_task_cancelled;
246 join_point<void, task*, task*, uint32_t> on_task_wait_pre;
247 join_point<void, task*> on_task_wait_notified;
248 join_point<void, task*, task*, bool> on_task_wait_post;
249 join_point<void, task*, task*, bool> on_task_cancel_post;
253 join_point<bool, task*, aio_task*> on_aio_call;
254 join_point<void, aio_task*> on_aio_enqueue;
257 join_point<bool, task*, message_ex*, rpc_response_task*> on_rpc_call;
258 join_point<bool, rpc_request_task*> on_rpc_request_enqueue;
261 join_point<bool, task*, message_ex*> on_rpc_reply;
262 join_point<bool, rpc_response_task*> on_rpc_response_enqueue;
265 join_point<void, message_ex*, message_ex*> on_rpc_create_response;
269 DSN_API
task_spec(
int code,
const char* name, dsn_task_type_t type, dsn_task_priority_t pri, dsn_threadpool_code_t pool);
272 DSN_API
static bool init();
273 DSN_API
void init_profiling(
bool profile);
277 CONFIG_FLD_ENUM(dsn_task_priority_t, priority, TASK_PRIORITY_COMMON, TASK_PRIORITY_INVALID,
true,
"task priority")
278 CONFIG_FLD_ENUM(grpc_mode_t, grpc_mode, GRPC_TO_LEADER, GRPC_INVALID,
false,
"group rpc mode: GRPC_TO_LEADER, GRPC_TO_ALL, GRPC_TO_ANY")
279 CONFIG_FLD_ID(threadpool_code2, pool_code, THREAD_POOL_DEFAULT,
true,
"thread pool to execute the task")
280 CONFIG_FLD(
bool,
bool, allow_inline,
false,
281 "allow task executed in other thread pools or tasks " 282 "for TASK_TYPE_COMPUTE - allow-inline allows a task being executed in its caller site " 283 "for other tasks - allow-inline allows a task being execution in io-thread " 285 CONFIG_FLD(
bool,
bool, randomize_timer_delay_if_zero,
false,
"whether to randomize the timer delay to random(0, timer_interval), if the initial delay is zero, to avoid multiple timers executing at the same time (e.g., checkpointing)")
286 CONFIG_FLD_ID(network_header_format, rpc_call_header_format, NET_HDR_DSN,
false,
"what kind of header format for this kind of rpc calls")
287 CONFIG_FLD_ENUM(
dsn_msg_serialize_format, rpc_msg_payload_serialize_default_format, DSF_THRIFT_BINARY, DSF_INVALID,
false,
"what kind of payload serialization format for this kind of msgs")
288 CONFIG_FLD_ID(rpc_channel, rpc_call_channel, RPC_CHANNEL_TCP,
false,
"what kind of network channel for this kind of rpc calls")
289 CONFIG_FLD(
bool,
bool, rpc_message_crc_required,
false,
"whether to calculate the crc checksum when send request/response")
290 CONFIG_FLD(int32_t, uint64, rpc_timeout_milliseconds, 5000,
"what is the default timeout (ms) for this kind of rpc calls")
291 CONFIG_FLD(int32_t, uint64, rpc_request_resend_timeout_milliseconds, 0,
"for how long (ms) the request will be resent if no response is received yet, 0 for disable this feature")
292 CONFIG_FLD_ENUM(throttling_mode_t, rpc_request_throttling_mode, TM_NONE, TM_INVALID,
false,
"throttling mode for rpc requets: TM_NONE, TM_REJECT, TM_DELAY when queue length > pool.queue_length_throttling_threshold")
293 CONFIG_FLD_INT_LIST(rpc_request_delays_milliseconds,
"how many milliseconds to delay recving rpc session for when queue length ~= [1.0, 1.2, 1.4, 1.6, 1.8, >=2.0] x pool.queue_length_throttling_threshold, e.g., 0, 0, 1, 2, 5, 10")
294 CONFIG_FLD(
bool,
bool, rpc_request_dropped_before_execution_when_timeout,
false,
"whether to drop a request right before execution when its queueing time is already greater than its timeout value")
297 CONFIG_FLD(
bool,
bool, rpc_request_layer2_handler_required,
false,
"whether this request needs to be handled by a layer2 handler (e.g., replicated or partitioned)")
298 CONFIG_FLD(
bool,
bool, rpc_request_is_write_operation,
false,
"whether this request updates app's state which needs to be replicated using a replication layer2 handler")
302 struct threadpool_spec
305 dsn_threadpool_code_t pool_code;
307 worker_priority_t worker_priority;
308 bool worker_share_core;
309 uint64_t worker_affinity_mask;
310 int dequeue_batch_size;
312 std::string queue_factory_name;
313 std::string worker_factory_name;
314 std::list<std::string> queue_aspects;
315 std::list<std::string> worker_aspects;
316 int queue_length_throttling_threshold;
317 bool enable_virtual_queue_throttling;
318 std::string admission_controller_factory_name;
319 std::string admission_controller_arguments;
321 threadpool_spec(
const dsn_threadpool_code_t& code) : name(dsn_threadpool_code_to_string(code)), pool_code(code) {}
322 threadpool_spec(
const threadpool_spec& source) =
default;
323 threadpool_spec& operator=(
const threadpool_spec& source) =
default;
325 DSN_API
static bool init( std::vector<threadpool_spec>& specs);
328 CONFIG_BEGIN(threadpool_spec)
330 CONFIG_FLD_STRING(name,
"",
"thread pool name")
331 CONFIG_FLD(
int, uint64, worker_count, 2,
"thread/worker count")
332 CONFIG_FLD(
int, uint64, dequeue_batch_size, 5,
"how many tasks (if available) should be returned for one dequeue call for best batching performance")
333 CONFIG_FLD_ENUM(worker_priority_t, worker_priority, THREAD_xPRIORITY_NORMAL, THREAD_xPRIORITY_INVALID,
false,
"thread priority")
334 CONFIG_FLD(
bool,
bool, worker_share_core,
true,
"whether the threads share all assigned cores")
335 CONFIG_FLD(uint64_t, uint64, worker_affinity_mask, 0,
"what CPU cores are assigned to this pool, 0 for all")
336 CONFIG_FLD(
bool,
bool, partitioned,
false,
"whethe the threads share a single queue(partitioned=false) or not; the latter is usually for workload hash partitioning for avoiding locking")
337 CONFIG_FLD_STRING(queue_factory_name,
"",
"task queue provider name")
338 CONFIG_FLD_STRING(worker_factory_name,
"",
"task worker provider name")
339 CONFIG_FLD_STRING_LIST(queue_aspects,
"task queue aspects names, usually for tooling purpose")
340 CONFIG_FLD_STRING_LIST(worker_aspects,
"task aspects names, usually for tooling purpose")
341 CONFIG_FLD(
int, uint64, queue_length_throttling_threshold, 1000000,
"throttling: throttling threshold above which rpc requests will be dropped")
342 CONFIG_FLD(
bool,
bool, enable_virtual_queue_throttling,
false,
"throttling: whether to enable throttling with virtual queues")
343 CONFIG_FLD_STRING(admission_controller_factory_name,
"",
"customized admission controller for the task queues")
344 CONFIG_FLD_STRING(admission_controller_arguments,
"",
"arguments for the cusotmized admission controller")
dsn_task_priority_t
task priority
Definition: api_task.h:90
task handling rpc response or timeout
Definition: api_task.h:53
callback for file read and write
Definition: api_task.h:55
Definition: task_spec.h:184
Definition: task_spec.h:191
task handling rpc request
Definition: api_task.h:52
async calls or timers
Definition: api_task.h:54
above tasks are seperated into several continuation tasks by thread-synchronization operations...
Definition: api_task.h:56
dsn_task_type_t
task/event type definition
Definition: api_task.h:50
dsn_msg_serialize_format
define various serialization format supported by rDSN, note any changes here must also be reflected i...
Definition: api_layer1.h:642