Robust Distributed System Nucleus (rDSN)
ver 1.0.0
|
rDSN adopts the event-driven programming model, where all computations (event handlers) are represented as individual tasks; each is the execution of a sequential piece of code in one thread. Specifically, rDSN categorizes the tasks into several types, as defined in dsn_task_type_t.
Unlike traditional event-driven programming, rDSN enhances the model in the following ways, which let developers control many aspects of the application declaratively.
Each task is labeled with a task code, with which developers can configure many aspects in config files. Developers can define new task codes using DEFINE_TASK_CODE, or dsn_task_code_register.
[task..default] ; allow the task to be executed in other thread pools or tasks ; for TASK_TYPE_COMPUTE - allow-inline allows a task to be executed at its caller site ; for other tasks - allow-inline allows a task to be executed in the io-thread allow_inline = false
; group rpc mode with group address: GRPC_TO_LEADER, GRPC_TO_ALL, GRPC_TO_ANY grpc_mode = GRPC_TO_LEADER
; when toollet profiler is enabled is_profile = true
; when toollet tracer is enabled is_trace = true
; thread pool to execute the task pool_code = THREAD_POOL_DEFAULT
; task priority priority = TASK_PRIORITY_COMMON
; whether to randomize the timer delay to random(0, timer_interval), ; if the initial delay is zero, to avoid multiple timers executing at the same time (e.g., checkpointing) randomize_timer_delay_if_zero = false
; what kind of network channel for this kind of rpc calls rpc_call_channel = RPC_CHANNEL_TCP
; what kind of header format for this kind of rpc calls rpc_call_header_format = NET_HDR_DSN
; how many milliseconds to delay receiving rpc session for ; when queue length ~= [1.0, 1.2, 1.4, 1.6, 1.8, >=2.0] x pool.queue_length_throttling_threshold, ; e.g., 0, 0, 1, 2, 5, 10 rpc_request_delays_milliseconds = 0, 0, 1, 2, 5, 10
; whether to drop a request right before execution when its queueing time ; is already greater than its timeout value rpc_request_dropped_before_execution_when_timeout = false
; for how long (ms) the request will be resent if no response ; is received yet, 0 to disable this feature rpc_request_resend_timeout_milliseconds = 0
; throttling mode for rpc requests: TM_NONE, TM_REJECT, TM_DELAY when ; queue length > pool.queue_length_throttling_threshold rpc_request_throttling_mode = TM_NONE
; what is the default timeout (ms) for this kind of rpc calls rpc_timeout_milliseconds = 5000
[task.LPC_AIO_IMMEDIATE_CALLBACK] ; override the option in [task..default] allow_inline = true
Each task code is bound to a thread pool, which can be customized as follows. Developers can define new thread pools using DEFINE_THREAD_POOL_CODE, or dsn_threadpool_code_register.
[threadpool..default]
; how many tasks (if available) should be returned for ; one dequeue call for best batching performance dequeue_batch_size = 5
; throttling: whether to enable throttling with virtual queues enable_virtual_queue_throttling = false
; thread pool name name = THREAD_POOL_INVALID
; whether the threads share a single queue(partitioned=false) or not; ; the latter is usually for workload hash partitioning for avoiding locking partitioned = false
; task queue aspects names, usually for tooling purpose queue_aspects =
; task queue provider name queue_factory_name = dsn::tools::hpc_concurrent_task_queue
; throttling: throttling threshold above which rpc requests will be dropped queue_length_throttling_threshold = 1000000
; what CPU cores are assigned to this pool, 0 for all worker_affinity_mask = 0
; task aspects names, usually for tooling purpose worker_aspects =
; thread/worker count worker_count = 2
; task worker provider name worker_factory_name =
; thread priority worker_priority = THREAD_xPRIORITY_NORMAL
; whether the threads share all assigned cores worker_share_core = true
[threadpool.THREAD_POOL_DEFAULT] ; override default options in [threadpool..default] dequeue_batch_size = 5
Typedefs | |
typedef enum dsn_task_type_t | dsn_task_type_t |
typedef void(* | dsn_task_handler_t) (void *) |
typedef void(* | dsn_rpc_request_handler_t) (dsn_message_t, void *) |
typedef void(* | dsn_rpc_response_handler_t) (dsn_error_t, dsn_message_t, dsn_message_t, void *) |
typedef void(* | dsn_aio_handler_t) (dsn_error_t, size_t, void *) |
typedef enum dsn_task_priority_t | dsn_task_priority_t |
typedef void(* | dsn_task_cancelled_handler_t) (void *) |
Enumerations | |
enum | dsn_task_type_t { TASK_TYPE_RPC_REQUEST, TASK_TYPE_RPC_RESPONSE, TASK_TYPE_COMPUTE, TASK_TYPE_AIO, TASK_TYPE_CONTINUATION, TASK_TYPE_COUNT, TASK_TYPE_INVALID } |
enum | dsn_task_priority_t { TASK_PRIORITY_LOW, TASK_PRIORITY_COMMON, TASK_PRIORITY_HIGH, TASK_PRIORITY_COUNT, TASK_PRIORITY_INVALID } |
Functions | |
DSN_API dsn_threadpool_code_t | dsn_threadpool_code_register (const char *name) |
DSN_API const char * | dsn_threadpool_code_to_string (dsn_threadpool_code_t pool_code) |
DSN_API dsn_threadpool_code_t | dsn_threadpool_code_from_string (const char *s, dsn_threadpool_code_t default_code) |
DSN_API int | dsn_threadpool_code_max () |
DSN_API int | dsn_threadpool_get_current_tid () |
DSN_API dsn_task_code_t | dsn_task_code_register (const char *name, dsn_task_type_t type, dsn_task_priority_t, dsn_threadpool_code_t pool) |
DSN_API void | dsn_task_code_query (dsn_task_code_t code, dsn_task_type_t *ptype, dsn_task_priority_t *ppri, dsn_threadpool_code_t *ppool) |
DSN_API void | dsn_task_code_set_threadpool (dsn_task_code_t code, dsn_threadpool_code_t pool) |
DSN_API void | dsn_task_code_set_priority (dsn_task_code_t code, dsn_task_priority_t pri) |
DSN_API const char * | dsn_task_code_to_string (dsn_task_code_t code) |
DSN_API dsn_task_code_t | dsn_task_code_from_string (const char *s, dsn_task_code_t default_code) |
DSN_API int | dsn_task_code_max () |
DSN_API const char * | dsn_task_type_to_string (dsn_task_type_t tt) |
DSN_API const char * | dsn_task_priority_to_string (dsn_task_priority_t tt) |
DSN_API volatile int * | dsn_task_queue_virtual_length_ptr (dsn_task_code_t code, int hash DEFAULT(0)) |
DSN_API void | dsn_task_add_ref (dsn_task_t task) |
DSN_API void | dsn_task_release_ref (dsn_task_t task) |
DSN_API int | dsn_task_get_ref (dsn_task_t task) |
DSN_API bool | dsn_task_cancel (dsn_task_t task, bool wait_until_finished) |
DSN_API void | dsn_task_set_delay (dsn_task_t task, int delay_ms) |
DSN_API bool | dsn_task_cancel2 (dsn_task_t task, bool wait_until_finished, bool *finished) |
DSN_API void | dsn_task_cancel_current_timer () |
DSN_API void | dsn_task_wait (dsn_task_t task) |
DSN_API bool | dsn_task_wait_timeout (dsn_task_t task, int timeout_milliseconds) |
DSN_API dsn_error_t | dsn_task_error (dsn_task_t task) |
DSN_API bool | dsn_task_is_running_inside (dsn_task_t t) |
DSN_API dsn_task_tracker_t | dsn_task_tracker_create (int task_bucket_count) |
DSN_API void | dsn_task_tracker_destroy (dsn_task_tracker_t tracker) |
DSN_API void | dsn_task_tracker_cancel_all (dsn_task_tracker_t tracker) |
DSN_API void | dsn_task_tracker_wait_all (dsn_task_tracker_t tracker) |
typedef void(* dsn_task_cancelled_handler_t) (void * ) |
callback prototype for task cancellation (called on task-being-cancelled)
In rDSN, tasks can be cancelled. For languages such as C++, when there are explicit resource release operations (e.g., ::free, release_ref()) in the task handlers, cancellation will cause resource leaks because the task handlers are never executed. In order to support such scenarios, rDSN provides dsn_task_cancelled_handler_t, which is executed when a task is cancelled. Note that this callback does not have thread affinity similar to the task handlers above (which are configured to be executed in certain thread pools or even a fixed thread). Therefore, it is the developers' responsibility to ensure this cancellation callback only does thread-insensitive operations (e.g., release_ref()).
enum dsn_task_type_t |
task/event type definition
DSN_API void dsn_task_add_ref | ( | dsn_task_t | task | ) |
Add reference count for a task created from dsn_task_create etc.
task | the task handle. |
Memory usage of tasks is controlled using reference counting. All returned dsn_task_t handles are NOT add_ref'ed by rDSN, so you DO NOT need to call dsn_task_release_ref to release the tasks. This decision is made for easier programming, and you may consider that the later dsn_rpc_xxx calls do the resource gc work for you.
However, if you want to keep holding the task handle after emitting the task (e.g., via dsn_task_call, dsn_rpc_call), you need to call dsn_task_add_ref before the emit call to ensure the handle is still valid, and also call dsn_task_release_ref later to release the handle.
DSN_API void dsn_task_release_ref | ( | dsn_task_t | task | ) |
release reference for a given task handle
task | the task handle |
See more details of the comment in dsn_task_add_ref
DSN_API int dsn_task_get_ref | ( | dsn_task_t | task | ) |
get reference count for a given task handle
task | the task handle |
See more details of the comment in dsn_task_add_ref
DSN_API bool dsn_task_cancel | ( | dsn_task_t | task, |
bool | wait_until_finished | ||
) |
cancel a task
task | the task handle |
wait_until_finished | true if wait until finished is needed |
DSN_API void dsn_task_set_delay | ( | dsn_task_t | task, |
int | delay_ms | ||
) |
set delay for a task
task | the task handle |
delay_ms | the delay milliseconds for a task |
DSN_API bool dsn_task_cancel2 | ( | dsn_task_t | task, |
bool | wait_until_finished, | ||
bool * | finished | ||
) |
cancel a task
task | the task handle |
wait_until_finished | true if wait until finished is needed |
finished | after the call, whether the task is finished (completed successfully, or cancelled) |
DSN_API void dsn_task_wait | ( | dsn_task_t | task | ) |
wait until a task is completed
task | the task handle |
DSN_API bool dsn_task_wait_timeout | ( | dsn_task_t | task, |
int | timeout_milliseconds | ||
) |
wait until a task is completed, or until the given timeout elapses
task | the task handle |
timeout_milliseconds | maximum time to wait |
DSN_API dsn_error_t dsn_task_error | ( | dsn_task_t | task | ) |
get result error code of a task
task | the task handle. |
DSN_API bool dsn_task_is_running_inside | ( | dsn_task_t | t | ) |
check whether the task is currently running inside the given task
t | the given task handle |
DSN_API dsn_task_tracker_t dsn_task_tracker_create | ( | int | task_bucket_count | ) |
task trackers are used to track task context
When a task executes, it usually accesses certain context. When the context is gone, all tasks accessing this context need to be cancelled automatically to avoid invalid context access. To release this burden from developers, rDSN provides task tracker which can be embedded into a context, and destroyed when the context is gone.
task_bucket_count | number of task buckets to reduce thread conflicts |
DSN_API void dsn_task_tracker_destroy | ( | dsn_task_tracker_t | tracker | ) |
destroy a task tracker, which cancels all pending tasks as well
tracker | task tracker handle |
DSN_API void dsn_task_tracker_cancel_all | ( | dsn_task_tracker_t | tracker | ) |
cancels all pending tasks bound to this tracker
tracker | task tracker handle |
DSN_API void dsn_task_tracker_wait_all | ( | dsn_task_tracker_t | tracker | ) |
wait for all pending tasks bound to this tracker to be completed
tracker | task tracker handle |