38 # include <dsn/service_api_c.h>    39 # include <dsn/cpp/utils.h>    40 # include <dsn/cpp/config_helper.h>    41 # include <dsn/utility/enum_helper.h>    42 # include <dsn/utility/customizable_id.h>    43 # include <dsn/utility/singleton_vector_store.h>    44 # include <dsn/utility/join_point.h>    45 # include <dsn/utility/extensible_object.h>    46 # include <dsn/utility/exp_delay.h>    47 # include <dsn/utility/dlib.h>    48 # include <dsn/tool-api/perf_counter.h>    50 ENUM_BEGIN(dsn_log_level_t, LOG_LEVEL_INVALID)
    // Name table for dsn_log_level_t: lists the five log levels by identifier.
    // LOG_LEVEL_INVALID is the second ENUM_BEGIN argument -- presumably the value
    // produced when a name does not match; confirm against enum_helper.h.
    51     ENUM_REG(LOG_LEVEL_INFORMATION)
    52     ENUM_REG(LOG_LEVEL_DEBUG)
    53     ENUM_REG(LOG_LEVEL_WARNING)
    54     ENUM_REG(LOG_LEVEL_ERROR)
    55     ENUM_REG(LOG_LEVEL_FATAL)
    56 ENUM_END(dsn_log_level_t)
    64 ENUM_END(dsn_task_type_t)
    67     ENUM_REG(TASK_PRIORITY_LOW)
    68     ENUM_REG(TASK_PRIORITY_COMMON)
    69     ENUM_REG(TASK_PRIORITY_HIGH)
    70 ENUM_END(dsn_task_priority_t)
    // Priority levels for worker threads; threadpool_spec::worker_priority holds
    // one of these values.
    // The enumerators carry no explicit values, so THREAD_xPRIORITY_COUNT equals
    // the number of real levels listed before it (5), and THREAD_xPRIORITY_INVALID
    // comes after it.
    74 enum worker_priority_t
    76     THREAD_xPRIORITY_LOWEST,
    77     THREAD_xPRIORITY_BELOW_NORMAL,
    78     THREAD_xPRIORITY_NORMAL,
    79     THREAD_xPRIORITY_ABOVE_NORMAL,
    80     THREAD_xPRIORITY_HIGHEST,
    81     THREAD_xPRIORITY_COUNT,
    82     THREAD_xPRIORITY_INVALID
    // Name table for worker_priority_t. Only the five real levels are listed;
    // THREAD_xPRIORITY_COUNT is not registered, and THREAD_xPRIORITY_INVALID is
    // passed to ENUM_BEGIN (presumably as the "no match" value -- confirm in
    // enum_helper.h).
    85 ENUM_BEGIN(worker_priority_t, THREAD_xPRIORITY_INVALID)
    86     ENUM_REG(THREAD_xPRIORITY_LOWEST)
    87     ENUM_REG(THREAD_xPRIORITY_BELOW_NORMAL)
    88     ENUM_REG(THREAD_xPRIORITY_NORMAL)
    89     ENUM_REG(THREAD_xPRIORITY_ABOVE_NORMAL)
    90     ENUM_REG(THREAD_xPRIORITY_HIGHEST)
    91 ENUM_END(worker_priority_t)
   // Name table for task_state, listing the READY, RUNNING, FINISHED and
   // CANCELLED states; TASK_STATE_INVALID is the ENUM_BEGIN second argument.
   103 ENUM_BEGIN(task_state, TASK_STATE_INVALID)
   104     ENUM_REG(TASK_STATE_READY)
   105     ENUM_REG(TASK_STATE_RUNNING)
   106     ENUM_REG(TASK_STATE_FINISHED)
   107     ENUM_REG(TASK_STATE_CANCELLED)
   // ioe_mode enum followed by its name table.
   110 typedef enum ioe_mode
   // The table registers IOE_PER_NODE and IOE_PER_QUEUE, with IOE_INVALID as the
   // ENUM_BEGIN second argument.
   118 ENUM_BEGIN(ioe_mode, IOE_INVALID)
   119     ENUM_REG(IOE_PER_NODE)
   120     ENUM_REG(IOE_PER_QUEUE)
   // Group rpc mode, stored in task_spec::grpc_mode; the configuration help text
   // for that field names the same three values registered below.
   123 typedef enum grpc_mode_t
   // Name table for grpc_mode_t; GRPC_INVALID is the ENUM_BEGIN second argument.
   132 ENUM_BEGIN(grpc_mode_t, GRPC_INVALID)
   133     ENUM_REG(GRPC_TO_LEADER)
   134     ENUM_REG(GRPC_TO_ALL)
   135     ENUM_REG(GRPC_TO_ANY)
   136 ENUM_END(grpc_mode_t)
   // Throttling mode for rpc requests, stored in
   // task_spec::rpc_request_throttling_mode. That field's configuration help text
   // names TM_NONE, TM_REJECT and TM_DELAY as the accepted values.
   138 typedef enum throttling_mode_t
   // Name table for throttling_mode_t; TM_INVALID is the ENUM_BEGIN second argument.
   147 ENUM_BEGIN(throttling_mode_t, TM_INVALID)
   151 ENUM_END(throttling_mode_t)
   154     ENUM_REG(DSF_THRIFT_BINARY)
   155     ENUM_REG(DSF_THRIFT_COMPACT)
   156     ENUM_REG(DSF_THRIFT_JSON)
   157     ENUM_REG(DSF_PROTOC_BINARY)
   158     ENUM_REG(DSF_PROTOC_JSON)
   159 ENUM_END(dsn_msg_serialize_format)
   // Customized ID types (macros from customizable_id.h) and their initial IDs.
   // network_header_format: used for task_spec::rpc_call_header_format.
   162 DEFINE_CUSTOMIZED_ID_TYPE(network_header_format)
   163 DEFINE_CUSTOMIZED_ID(network_header_format, NET_HDR_INVALID)
   164 DEFINE_CUSTOMIZED_ID(network_header_format, NET_HDR_DSN)
   // rpc_channel: used for task_spec::rpc_call_channel (TCP or UDP).
   167 DEFINE_CUSTOMIZED_ID_TYPE(rpc_channel)
   168 DEFINE_CUSTOMIZED_ID(rpc_channel, RPC_CHANNEL_TCP)
   169 DEFINE_CUSTOMIZED_ID(rpc_channel, RPC_CHANNEL_UDP)
   // threadpool_code2: the ID type named by the pool_code configuration fields.
   172 DEFINE_CUSTOMIZED_ID_TYPE(threadpool_code2)
   // Forward declarations: these types are only referred to through pointers
   // in the declarations that follow.
   177 class rpc_request_task;
   178 class rpc_response_task;
   180 class admission_controller;
   // Function-pointer type taking (task*, admission_controller*) and returning
   // void; task_spec::rejection_handler stores one.
   181 typedef 
void (*task_rejection_handler)(task*, admission_controller*);
   182 struct rpc_handler_info;
   188     int port_shift_value; 
   // Per-task-code specification: identity (code/type/priority/pool), rpc
   // behaviour settings, and a set of join points keyed by task lifecycle event.
   // Derives from extensible_object<task_spec, 4>.
   191 class task_spec : 
public extensible_object<task_spec, 4>
   // Registers a task code together with its type, priority and thread pool.
   195     DSN_API 
static void register_task_code(dsn_task_code_t code, dsn_task_type_t type, dsn_task_priority_t pri, dsn_threadpool_code_t pool);
   // --- identity ---
   199     dsn_task_code_t        code;
   200     dsn_task_type_t        type;
   202     dsn_task_code_t        rpc_paired_code;
   203     shared_exp_delay       rpc_request_delayer;
   // --- configurable fields; the descriptions below are taken from the help
   // strings of the matching configuration entries ---
   // task priority
   207     dsn_task_priority_t    priority;
   // group rpc mode: GRPC_TO_LEADER, GRPC_TO_ALL, GRPC_TO_ANY
   208     grpc_mode_t            grpc_mode; 
   // thread pool to execute the task
   209     dsn_threadpool_code_t  pool_code;
   // whether a zero initial timer delay is randomized to random(0, timer_interval)
   215     bool                   randomize_timer_delay_if_zero; 
   // header format used for this kind of rpc call
   216     network_header_format  rpc_call_header_format;
   // network channel used for this kind of rpc call
   218     rpc_channel            rpc_call_channel;
   // whether a crc checksum is calculated when sending request/response
   219     bool                   rpc_message_crc_required;
   // default timeout (ms) for this kind of rpc call
   221     int32_t                rpc_timeout_milliseconds;
   // resend interval (ms) while no response has arrived; 0 disables resending
   222     int32_t                rpc_request_resend_timeout_milliseconds; 
   // TM_NONE, TM_REJECT or TM_DELAY, applied when the queue length exceeds the
   // pool's queue_length_throttling_threshold
   223     throttling_mode_t      rpc_request_throttling_mode; 
   // delays (ms) applied at increasing multiples of the throttling threshold
   224     std::vector<int>       rpc_request_delays_milliseconds; 
   // drop a request right before execution if it already queued past its timeout
   225     bool                   rpc_request_dropped_before_execution_when_timeout;
   // whether the request must be handled by a layer2 handler
   228     bool                   rpc_request_layer2_handler_required; 
   // whether the request updates app state that needs to be replicated
   229     bool                   rpc_request_is_write_operation;      
   232     task_rejection_handler rejection_handler;
   // --- join points ---
   // NOTE(review): template arguments look like <return type, parameter types...>;
   // the meaning of each parameter is not visible here -- confirm in join_point.h
   // and at the call sites.
   // task lifecycle
   239     join_point<void, task*, task*>               on_task_create;
   241     join_point<void, task*, task*>               on_task_enqueue;    
   242     join_point<void, task*>                      on_task_begin; 
   243     join_point<void, task*>                      on_task_end;
   244     join_point<void, task*>                      on_task_cancelled;
   // waiting on / cancelling a task
   246     join_point<void, task*, task*, uint32_t>     on_task_wait_pre; 
   247     join_point<void, task*>                      on_task_wait_notified;
   248     join_point<void, task*, task*, bool>         on_task_wait_post; 
   249     join_point<void, task*, task*, bool>         on_task_cancel_post; 
   // asynchronous io
   253     join_point<bool, task*, aio_task*>           on_aio_call; 
   254     join_point<void, aio_task*>                  on_aio_enqueue; 
   // rpc request / reply / response
   257     join_point<bool, task*, message_ex*, rpc_response_task*>  on_rpc_call; 
   258     join_point<bool, rpc_request_task*>          on_rpc_request_enqueue;
   261     join_point<bool, task*, message_ex*>         on_rpc_reply;
   262     join_point<bool, rpc_response_task*>         on_rpc_response_enqueue; 
   265     join_point<void, message_ex*, message_ex*>   on_rpc_create_response;
   // Constructs a spec from a task code, its name, type, priority and pool.
   269     DSN_API 
task_spec(
int code, 
const char* name, dsn_task_type_t type, dsn_task_priority_t pri, dsn_threadpool_code_t pool);
   // Static initialisation entry point; returns bool (presumably success -- confirm).
   272     DSN_API 
static bool init();
   // Takes a flag named `profile`; effect is defined out of line.
   273     DSN_API 
void init_profiling(
bool profile);
   // Configuration field table for task_spec. Each entry names a task_spec
   // field, a default value and a help string.
   // NOTE(review): for CONFIG_FLD the arguments look like (storage type, config
   // value kind, field, default, description) and the bool in the ENUM/ID entries
   // is presumably a "required" flag -- confirm in config_helper.h.
   277     CONFIG_FLD_ENUM(dsn_task_priority_t, priority, TASK_PRIORITY_COMMON, TASK_PRIORITY_INVALID, 
true, 
"task priority")
   278     CONFIG_FLD_ENUM(grpc_mode_t, grpc_mode, GRPC_TO_LEADER, GRPC_INVALID, 
false, 
"group rpc mode: GRPC_TO_LEADER, GRPC_TO_ALL, GRPC_TO_ANY")
   279     CONFIG_FLD_ID(threadpool_code2, pool_code, THREAD_POOL_DEFAULT, 
true, 
"thread pool to execute the task")
   // NOTE(review): the allow_inline help text reads "being execution in io-thread";
   // "being executed" was probably intended.
   280     CONFIG_FLD(
bool, 
bool, allow_inline, 
false, 
   281         "allow task executed in other thread pools or tasks "   282         "for TASK_TYPE_COMPUTE - allow-inline allows a task being executed in its caller site "   283         "for other tasks - allow-inline allows a task being execution in io-thread "           285     CONFIG_FLD(
bool, 
bool, randomize_timer_delay_if_zero, 
false, 
"whether to randomize the timer delay to random(0, timer_interval), if the initial delay is zero, to avoid multiple timers executing at the same time (e.g., checkpointing)")
   286     CONFIG_FLD_ID(network_header_format, rpc_call_header_format, NET_HDR_DSN, 
false, 
"what kind of header format for this kind of rpc calls")
   287     CONFIG_FLD_ENUM(
dsn_msg_serialize_format, rpc_msg_payload_serialize_default_format, DSF_THRIFT_BINARY, DSF_INVALID, 
false, 
"what kind of payload serialization format for this kind of msgs")
   288     CONFIG_FLD_ID(rpc_channel, rpc_call_channel, RPC_CHANNEL_TCP, 
false, 
"what kind of network channel for this kind of rpc calls")
   289     CONFIG_FLD(
bool, 
bool, rpc_message_crc_required, 
false, 
"whether to calculate the crc checksum when send request/response")
   // Default rpc timeout is 5000 ms; the field itself is declared int32_t while
   // the entry passes uint64 as its second argument.
   290     CONFIG_FLD(int32_t, uint64, rpc_timeout_milliseconds, 5000, 
"what is the default timeout (ms) for this kind of rpc calls")    
   291     CONFIG_FLD(int32_t, uint64, rpc_request_resend_timeout_milliseconds, 0, 
"for how long (ms) the request will be resent if no response is received yet, 0 for disable this feature")
   // NOTE(review): help text below contains the typo "requets" (requests).
   292     CONFIG_FLD_ENUM(throttling_mode_t, rpc_request_throttling_mode, TM_NONE, TM_INVALID, 
false, 
"throttling mode for rpc requets: TM_NONE, TM_REJECT, TM_DELAY when queue length > pool.queue_length_throttling_threshold")
   // NOTE(review): help text below contains the typo "recving" (receiving).
   293     CONFIG_FLD_INT_LIST(rpc_request_delays_milliseconds, 
"how many milliseconds to delay recving rpc session for when queue length ~= [1.0, 1.2, 1.4, 1.6, 1.8, >=2.0] x pool.queue_length_throttling_threshold, e.g., 0, 0, 1, 2, 5, 10")
   294     CONFIG_FLD(
bool, 
bool, rpc_request_dropped_before_execution_when_timeout, 
false, 
"whether to drop a request right before execution when its queueing time is already greater than its timeout value")    
   297     CONFIG_FLD(
bool, 
bool, rpc_request_layer2_handler_required, 
false, 
"whether this request needs to be handled by a layer2 handler (e.g., replicated or partitioned)")
   298     CONFIG_FLD(
bool, 
bool, rpc_request_is_write_operation, 
false, 
"whether this request updates app's state which needs to be replicated using a replication layer2 handler")
   // Specification of one thread pool: which pool it is, how its workers are
   // scheduled, which queue/worker providers it uses, and throttling settings.
   // Field descriptions below are taken from the help strings of the matching
   // configuration entries.
   302 struct threadpool_spec
   305     dsn_threadpool_code_t   pool_code;
   // thread priority
   307     worker_priority_t       worker_priority;
   // whether the threads share all assigned cores
   308     bool                    worker_share_core;
   // which CPU cores are assigned to this pool; 0 means all
   309     uint64_t                worker_affinity_mask;
   // how many tasks (if available) one dequeue call should return
   310     int                     dequeue_batch_size;
   // task queue / task worker provider names
   312     std::string             queue_factory_name;
   313     std::string             worker_factory_name;
   // aspect names, usually for tooling purposes
   314     std::list<std::string>  queue_aspects;
   315     std::list<std::string>  worker_aspects;
   // threshold above which rpc requests will be dropped
   316     int                     queue_length_throttling_threshold;
   // whether to enable throttling with virtual queues
   317     bool                    enable_virtual_queue_throttling;
   // customized admission controller for the task queues, and its arguments
   318     std::string             admission_controller_factory_name;
   319     std::string             admission_controller_arguments;
   // Builds a spec for `code`: sets `name` from dsn_threadpool_code_to_string(code)
   // and `pool_code` from code. No other member is initialised in this constructor.
   321     threadpool_spec(
const dsn_threadpool_code_t& code) : name(dsn_threadpool_code_to_string(code)), pool_code(code) {}
   // Copy construction and copy assignment use the compiler-generated versions.
   322     threadpool_spec(
const threadpool_spec& source) = 
default;
   323     threadpool_spec& operator=(
const threadpool_spec& source) = 
default;
   // Takes the spec vector by non-const reference and returns bool (presumably
   // fills `specs` and reports success -- confirm in the implementation).
   325     DSN_API 
static bool init( std::vector<threadpool_spec>& specs);
   // Configuration field table for threadpool_spec. Each entry names a field,
   // a default value (where the macro takes one) and a help string.
   // NOTE(review): for CONFIG_FLD the arguments look like (storage type, config
   // value kind, field, default, description) -- confirm in config_helper.h.
   328 CONFIG_BEGIN(threadpool_spec)
   330     CONFIG_FLD_STRING(name, 
"", 
"thread pool name")
   // Defaults: 2 workers, 5 tasks per dequeue call.
   331     CONFIG_FLD(
int, uint64, worker_count, 2, 
"thread/worker count")
   332     CONFIG_FLD(
int, uint64, dequeue_batch_size, 5, 
"how many tasks (if available) should be returned for one dequeue call for best batching performance") 
   333     CONFIG_FLD_ENUM(worker_priority_t, worker_priority, THREAD_xPRIORITY_NORMAL, THREAD_xPRIORITY_INVALID, 
false, 
"thread priority")
   334     CONFIG_FLD(
bool, 
bool, worker_share_core, 
true, 
"whether the threads share all assigned cores")
   335     CONFIG_FLD(uint64_t, uint64, worker_affinity_mask, 0, 
"what CPU cores are assigned to this pool, 0 for all")
   // NOTE(review): help text below contains the typo "whethe" (whether).
   336     CONFIG_FLD(
bool, 
bool, partitioned, 
false, 
"whethe the threads share a single queue(partitioned=false) or not; the latter is usually for workload hash partitioning for avoiding locking")
   337     CONFIG_FLD_STRING(queue_factory_name, 
"", 
"task queue provider name")
   338     CONFIG_FLD_STRING(worker_factory_name, 
"", 
"task worker provider name")
   339     CONFIG_FLD_STRING_LIST(queue_aspects, 
"task queue aspects names, usually for tooling purpose")
   340     CONFIG_FLD_STRING_LIST(worker_aspects, 
"task aspects names, usually for tooling purpose")    
   // Default throttling threshold is 1000000 queued items.
   341     CONFIG_FLD(
int, uint64, queue_length_throttling_threshold, 1000000, 
"throttling: throttling threshold above which rpc requests will be dropped")
   342     CONFIG_FLD(
bool, 
bool, enable_virtual_queue_throttling, 
false, 
"throttling: whether to enable throttling with virtual queues")        
   343     CONFIG_FLD_STRING(admission_controller_factory_name, 
"", 
"customized admission controller for the task queues")
   // NOTE(review): help text below contains the typo "cusotmized" (customized).
   344     CONFIG_FLD_STRING(admission_controller_arguments, 
"", 
"arguments for the cusotmized admission controller")
 dsn_task_priority_t
task priority 
Definition: api_task.h:90
task handling rpc response or timeout 
Definition: api_task.h:53
callback for file read and write 
Definition: api_task.h:55
Definition: task_spec.h:184
Definition: task_spec.h:191
task handling rpc request 
Definition: api_task.h:52
async calls or timers 
Definition: api_task.h:54
above tasks are separated into several continuation tasks by thread-synchronization operations...
Definition: api_task.h:56
dsn_task_type_t
task/event type definition 
Definition: api_task.h:50
dsn_msg_serialize_format
defines the various serialization formats supported by rDSN; note that any changes here must also be reflected i...
Definition: api_layer1.h:642