39 # include <dsn/utility/ports.h> 40 # include <dsn/utility/extensible_object.h> 41 # include <dsn/tool-api/task_spec.h> 42 # include <dsn/tool-api/task_tracker.h> 43 # include <dsn/tool-api/rpc_message.h> 44 # include <dsn/cpp/callocator.h> 45 # include <dsn/cpp/auto_codes.h> 46 # include <dsn/cpp/utils.h> 51 namespace lock_checker
53 extern __thread
int zlock_exclusive_count;
54 extern __thread
int zlock_shared_count;
55 extern void check_wait_safety();
56 extern void check_dangling_lock();
57 extern void check_wait_task(task* waitee);
61 class task_worker_pool;
88 int last_worker_queue_size;
89 uint64_t node_pool_thread_ids;
90 uint32_t last_lower32_task_id;
// Four 256-byte scratch buffers handed out in rotation by scratch_next().
char scratch_buffer[4][256];
// Index of the buffer most recently returned by scratch_next().
// scratch_next() always leaves it in the range [0, 3].
int scratch_buffer_index;

// Advances to the next scratch buffer in the rotation and returns it.
//
// The index is advanced with unsigned arithmetic and stored already reduced
// modulo 4. The previous form (`++scratch_buffer_index % 4`) let the signed
// counter grow without bound: it would eventually overflow (undefined
// behaviour), and any negative value yields a negative remainder and hence an
// out-of-bounds row. For every non-negative starting index the sequence of
// returned buffers is unchanged.
char* scratch_next()
{
    const unsigned next_slot = (static_cast<unsigned>(scratch_buffer_index) + 1u) % 4u;
    scratch_buffer_index = static_cast<int>(next_slot);
    return scratch_buffer[next_slot];
}
103 public extensible_object<task, 4>
107 dsn_task_code_t code,
111 service_node* node =
nullptr 113 DSN_API
virtual ~task();
115 virtual void exec() = 0;
117 DSN_API
void exec_internal();
120 DSN_API
bool cancel(
bool wait_until_finished,
bool* finished =
nullptr);
121 DSN_API
bool wait(
int timeout_milliseconds = TIME_MS_MAX,
bool on_cancel =
false);
122 DSN_API
virtual void enqueue();
123 DSN_API
bool set_retry(
bool enqueue_immediately =
true);
124 DSN_API
const char* node_name()
const;
125 void set_error_code(
error_code err) { _error = err; }
126 void set_delay(
int delay_milliseconds = 0) { _delay_milliseconds = delay_milliseconds; }
127 void set_tracker(task_tracker* tracker) { _context_tracker.set_tracker(tracker,
this); }
129 uint64_t id()
const {
return _task_id; }
130 task_state state()
const {
return _state.load(std::memory_order_acquire); }
131 dsn_task_code_t code()
const {
return _spec->code; }
132 task_spec& spec()
const {
return *_spec; }
133 int hash()
const {
return _hash; }
134 int delay_milliseconds()
const {
return _delay_milliseconds; }
136 service_node* node()
const {
return _node; }
137 task_tracker* tracker()
const {
return _context_tracker.tracker(); }
138 bool is_empty()
const {
return _is_null; }
141 DSN_API
static task* get_current_task();
142 DSN_API
static uint64_t get_current_task_id();
145 DSN_API
static service_node* get_current_node();
146 DSN_API
static service_node* get_current_node2();
147 DSN_API
static int get_current_node_id();
148 DSN_API
static int get_current_worker_index();
149 DSN_API
static const char* get_current_node_name();
150 DSN_API
static rpc_engine* get_current_rpc();
151 DSN_API
static disk_engine* get_current_disk();
153 DSN_API
static nfs_node* get_current_nfs();
155 DSN_API
static int get_current_queue_length();
157 DSN_API
static void set_tls_dsn_context(
162 DSN_API
static void set_tls_dsn(
const __tls_dsn__* ctx);
163 DSN_API
static void get_tls_dsn(
__tls_dsn__* ctx);
166 DSN_API
void signal_waiters();
167 DSN_API
void enqueue(task_worker_pool* pool);
168 void set_task_id(uint64_t tid) { _task_id = tid; }
177 static void check_tls_dsn();
178 static void on_tls_dsn_not_set();
180 mutable std::atomic<task_state> _state;
182 std::atomic<void*> _wait_event;
184 int _delay_milliseconds;
185 bool _wait_for_cancel;
188 trackable_task _context_tracker;
199 dsn_task_code_t code,
204 service_node* node =
nullptr 206 :
task(code, context, on_cancel, hash, node)
227 dsn_task_code_t code,
231 uint32_t interval_milliseconds,
233 service_node* node =
nullptr 236 DSN_API
void exec()
override;
238 DSN_API
void enqueue()
override;
241 uint32_t _interval_milliseconds;
249 dsn_task_code_t code;
252 std::atomic<int> running_count;
257 : code(code), unregistered(
false), running_count(0), c_handler(
nullptr), parameter(
nullptr)
264 running_count.fetch_add(1, std::memory_order_relaxed);
269 return running_count.fetch_sub(1, std::memory_order_release);
272 void run(dsn_message_t req)
276 c_handler(req, parameter);
279 if (1 == release_ref())
298 message_ex* get_request()
const {
return _request; }
300 DSN_API
void enqueue()
override;
304 if (0 == _enqueue_ts_ns
305 || dsn_now_ns() - _enqueue_ts_ns <
306 static_cast<uint64_t>(_request->header->client.timeout_ms) * 1000000ULL)
308 _handler->run(_request);
313 message_ex *_request;
315 uint64_t _enqueue_ts_ns;
318 typedef void(*dsn_rpc_response_handler_replace_t)(
324 uint64_t replace_context
335 service_node* node =
nullptr 337 DSN_API ~rpc_response_task();
340 DSN_API
bool enqueue(
error_code err, message_ex* reply);
341 DSN_API
void enqueue()
override;
342 message_ex* get_request()
const {
return _request; }
343 message_ex* get_response()
const {
return _response; }
344 DSN_API
void replace_callback(dsn_rpc_response_handler_replace_t callback, uint64_t context);
345 DSN_API
bool reset_callback();
346 task_worker_pool* caller_pool()
const {
return _caller_pool; }
347 void set_caller_pool(task_worker_pool* pl) { _caller_pool = pl; }
353 _cb(_error.get(), _request, _response, _context);
357 _error.end_tracking();
362 message_ex* _request;
363 message_ex* _response;
364 task_worker_pool * _caller_pool;
367 friend class rpc_engine;
386 uint32_t buffer_size;
387 uint64_t file_offset;
394 disk_aio() : file(
nullptr), buffer(
nullptr), buffer_size(0), file_offset(0), type(AIO_Invalid), engine(
nullptr), file_object(
nullptr)
403 dsn_task_code_t code,
408 service_node* node =
nullptr 412 DSN_API
void enqueue(
error_code err,
size_t transferred_size);
413 size_t get_transferred_size()
const {
return _transferred_size; }
416 void copy_to(
char* dest)
418 if (!_unmerged_write_buffers.empty())
420 for (
auto &buffer : _unmerged_write_buffers)
422 memcpy(dest, buffer.buffer, buffer.size);
428 memcpy(dest, _aio->buffer, _aio->buffer_size);
433 if (!_unmerged_write_buffers.empty()) {
434 std::shared_ptr<char> buffer(dsn::make_shared_array<char>(_aio->buffer_size));
435 _merged_write_buffer_holder.assign(buffer, 0, _aio->buffer_size);
436 _aio->buffer = buffer.get();
437 copy_to(buffer.get());
441 virtual void exec()
override 445 _cb(_error.get(), _transferred_size, _context);
449 _error.end_tracking();
453 std::vector<dsn_file_buffer_t> _unmerged_write_buffers;
454 blob _merged_write_buffer_holder;
457 size_t _transferred_size;
462 __inline
void task::check_tls_dsn()
464 if (tls_dsn.magic != 0xdeadbeef)
466 on_tls_dsn_not_set();
470 __inline
task* task::get_current_task()
473 return tls_dsn.current_task;
476 __inline uint64_t task::get_current_task_id()
478 if (tls_dsn.magic == 0xdeadbeef)
479 return tls_dsn.current_task ? tls_dsn.current_task->id() : 0;
488 return tls_dsn.worker;
493 return tls_dsn.magic == 0xdeadbeef ? tls_dsn.worker :
nullptr;
496 __inline service_node* task::get_current_node()
502 __inline
int task::get_current_node_id()
504 return tls_dsn.magic == 0xdeadbeef ? tls_dsn.node_id : 0;
507 __inline service_node* task::get_current_node2()
509 return tls_dsn.magic == 0xdeadbeef ? tls_dsn.node :
nullptr;
512 __inline
int task::get_current_worker_index()
515 return tls_dsn.worker_index;
518 __inline rpc_engine* task::get_current_rpc()
524 __inline disk_engine* task::get_current_disk()
536 __inline
nfs_node* task::get_current_nfs()
548 __inline
int task::get_current_queue_length()
551 return tls_dsn.last_worker_queue_size;
void(* dsn_task_cancelled_handler_t)(void *)
callback prototype for task cancellation (called on task-being-cancelled)
Definition: api_task.h:111
task queue batches the input queue for the bound task worker(s) (threads)
Definition: task_queue.h:55
void(* dsn_task_handler_t)(void *)
callback prototype for TASK_TYPE_COMPUTE
Definition: api_task.h:64
Definition: auto_codes.h:303
void(* dsn_rpc_request_handler_t)(dsn_message_t, void *)
callback prototype for TASK_TYPE_RPC_REQUEST
Definition: api_task.h:69
void(* dsn_rpc_response_handler_t)(dsn_error_t, dsn_message_t, dsn_message_t, void *)
callback prototype for TASK_TYPE_RPC_RESPONSE
Definition: api_task.h:75
task worker processes the input tasks from the bound task queue
Definition: task_worker.h:54
Definition: task_spec.h:191
void(* dsn_aio_handler_t)(dsn_error_t, size_t, void *)
callback prototype for TASK_TYPE_AIO
Definition: api_task.h:83
timer service schedules the input tasks at specified timepoint
Definition: timer_service.h:51
Definition: env_provider.h:48