Robust Distributed System Nucleus (rDSN)  ver 1.0.0
task.h
1 /*
2  * The MIT License (MIT)
3  *
4  * Copyright (c) 2015 Microsoft Corporation
5  *
6  * -=- Robust Distributed System Nucleus (rDSN) -=-
7  *
8  * Permission is hereby granted, free of charge, to any person obtaining a copy
9  * of this software and associated documentation files (the "Software"), to deal
10  * in the Software without restriction, including without limitation the rights
11  * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
12  * copies of the Software, and to permit persons to whom the Software is
13  * furnished to do so, subject to the following conditions:
14  *
15  * The above copyright notice and this permission notice shall be included in
16  * all copies or substantial portions of the Software.
17  *
18  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
19  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
20  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
21  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
22  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
23  * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
24  * THE SOFTWARE.
25  */
26 
27 /*
28  * Description:
29  * the task abstraction in zion, together with the various derived task
30  * types used in our system
31  *
32  * Revision history:
33  * Mar., 2015, @imzhenyu (Zhenyu Guo), first version
34  * xxxx-xx-xx, author, fix bug about xxx
35  */
36 
37 # pragma once
38 
39 # include <dsn/utility/ports.h>
40 # include <dsn/utility/extensible_object.h>
41 # include <dsn/tool-api/task_spec.h>
42 # include <dsn/tool-api/task_tracker.h>
43 # include <dsn/tool-api/rpc_message.h>
44 # include <dsn/cpp/callocator.h>
45 # include <dsn/cpp/auto_codes.h>
46 # include <dsn/cpp/utils.h>
47 
48 namespace dsn
49 {
50 
// Forward-declared first: lock_checker::check_wait_task() below takes a task*,
// so the name must be visible here rather than relying on a transitive include.
class task;

// Lock-checking hooks; only declared here, defined out of line.
namespace lock_checker
{
    extern __thread int zlock_exclusive_count;
    extern __thread int zlock_shared_count;
    extern void check_wait_safety();
    extern void check_dangling_lock();
    extern void check_wait_task(task* waitee);
}

class task_worker;
class task_worker_pool;
class service_node;
class task_engine;
class task_queue;
class rpc_engine;
class disk_engine;
class env_provider;
class nfs_node;
class timer_service;

73 {
74  uint32_t magic;
75  task *current_task;
76 
77  task_worker *worker;
78  int worker_index;
79  service_node *node;
80  int node_id;
81 
82  rpc_engine *rpc;
83  disk_engine *disk;
84  env_provider *env;
85  nfs_node *nfs;
86  timer_service *tsvc;
87 
88  int last_worker_queue_size;
89  uint64_t node_pool_thread_ids; // 8,8,16 bits
90  uint32_t last_lower32_task_id; // 32bits
91 
92  char scratch_buffer[4][256]; // for temp to_string() etc., 4 buffers in maximum
93  int scratch_buffer_index;
94  char* scratch_next() { return scratch_buffer[++scratch_buffer_index % 4]; }
95 };
96 
97 extern __thread struct __tls_dsn__ tls_dsn;
98 
99 //----------------- common task -------------------------------------------------------
100 
101 class task :
102  public ref_counter,
103  public extensible_object<task, 4>
104 {
105 public:
106  DSN_API task(
107  dsn_task_code_t code,
108  void* context,
109  dsn_task_cancelled_handler_t on_cancel,
110  int hash = 0,
111  service_node* node = nullptr
112  );
113  DSN_API virtual ~task();
114 
115  virtual void exec() = 0;
116 
117  DSN_API void exec_internal();
118  // return whether *this* cancel success,
119  // for timers, even return value is false, the further timer execs are cancelled
120  DSN_API bool cancel(bool wait_until_finished, /*out*/ bool* finished = nullptr);
121  DSN_API bool wait(int timeout_milliseconds = TIME_MS_MAX, bool on_cancel = false);
122  DSN_API virtual void enqueue();
123  DSN_API bool set_retry(bool enqueue_immediately = true); // return true when called inside exec(), false otherwise
124  DSN_API const char* node_name() const;
125  void set_error_code(error_code err) { _error = err; }
126  void set_delay(int delay_milliseconds = 0) { _delay_milliseconds = delay_milliseconds; }
127  void set_tracker(task_tracker* tracker) { _context_tracker.set_tracker(tracker, this); }
128 
129  uint64_t id() const { return _task_id; }
130  task_state state() const { return _state.load(std::memory_order_acquire); }
131  dsn_task_code_t code() const { return _spec->code; }
132  task_spec& spec() const { return *_spec; }
133  int hash() const { return _hash; }
134  int delay_milliseconds() const { return _delay_milliseconds; }
135  error_code error() const { return _error; }
136  service_node* node() const { return _node; }
137  task_tracker* tracker() const { return _context_tracker.tracker(); }
138  bool is_empty() const { return _is_null; }
139 
140  // static helper utilities
141  DSN_API static task* get_current_task();
142  DSN_API static uint64_t get_current_task_id();
143  DSN_API static task_worker* get_current_worker();
144  DSN_API static task_worker* get_current_worker2();
145  DSN_API static service_node* get_current_node();
146  DSN_API static service_node* get_current_node2();
147  DSN_API static int get_current_node_id();
148  DSN_API static int get_current_worker_index();
149  DSN_API static const char* get_current_node_name();
150  DSN_API static rpc_engine* get_current_rpc();
151  DSN_API static disk_engine* get_current_disk();
152  DSN_API static env_provider* get_current_env();
153  DSN_API static nfs_node* get_current_nfs();
154  DSN_API static timer_service* get_current_tsvc();
155  DSN_API static int get_current_queue_length();
156 
157  DSN_API static void set_tls_dsn_context(
158  service_node* node, // cannot be null
159  task_worker* worker, // null for io or timer threads if they are not worker threads
160  task_queue* queue // owner queue if io_mode == IOE_PER_QUEUE
161  );
162  DSN_API static void set_tls_dsn(const __tls_dsn__* ctx);
163  DSN_API static void get_tls_dsn(/*out*/ __tls_dsn__* ctx);
164 
165 protected:
166  DSN_API void signal_waiters();
167  DSN_API void enqueue(task_worker_pool* pool);
168  void set_task_id(uint64_t tid) { _task_id = tid; }
169 
170  bool _is_null;
171  error_code _error;
172  void *_context; // the context for the task/on_cancel callbacks
173  dsn_task_cancelled_handler_t _on_cancel;
174 
175 private:
176  task(const task&);
177  static void check_tls_dsn();
178  static void on_tls_dsn_not_set();
179 
180  mutable std::atomic<task_state> _state;
181  uint64_t _task_id;
182  std::atomic<void*> _wait_event;
183  int _hash;
184  int _delay_milliseconds;
185  bool _wait_for_cancel;
186  task_spec *_spec;
187  service_node *_node;
188  trackable_task _context_tracker; // when tracker is gone, the task is cancelled automatically
189 
190 public:
191  // used by task queue only
192  task* next;
193 };
194 
195 class task_c : public task, public transient_object
196 {
197 public:
198  task_c(
199  dsn_task_code_t code,
200  dsn_task_handler_t cb,
201  void* context,
203  int hash = 0,
204  service_node* node = nullptr
205  )
206  : task(code, context, on_cancel, hash, node)
207  {
208  _cb = cb;
209  }
210 
211  void exec() override
212  {
213  _cb(_context);
214  }
215 
216 private:
217  dsn_task_handler_t _cb;
218 };
219 
220 
221 //----------------- timer task -------------------------------------------------------
222 
223 class timer_task : public task
224 {
225 public:
226  timer_task(
227  dsn_task_code_t code,
228  dsn_task_handler_t cb,
229  void* context,
230  dsn_task_cancelled_handler_t on_cancel,
231  uint32_t interval_milliseconds,
232  int hash = 0,
233  service_node* node = nullptr
234  );
235 
236  DSN_API void exec() override;
237 
238  DSN_API void enqueue() override;
239 
240 private:
241  uint32_t _interval_milliseconds;
242  dsn_task_handler_t _cb;
243 };
244 
245 //----------------- rpc task -------------------------------------------------------
246 
248 {
249  dsn_task_code_t code;
250  std::string name;
251  bool unregistered;
252  std::atomic<int> running_count;
253  dsn_rpc_request_handler_t c_handler;
254  void* parameter;
255 
256  explicit rpc_handler_info(dsn_task_code_t code)
257  : code(code), unregistered(false), running_count(0), c_handler(nullptr), parameter(nullptr)
258  {
259  }
260  ~rpc_handler_info() { }
261 
262  void add_ref()
263  {
264  running_count.fetch_add(1, std::memory_order_relaxed);
265  }
266 
267  int release_ref()
268  {
269  return running_count.fetch_sub(1, std::memory_order_release);
270  }
271 
272  void run(dsn_message_t req)
273  {
274  if (!unregistered)
275  {
276  c_handler(req, parameter);
277  }
278 
279  if (1 == release_ref())
280  {
281  delete this;
282  }
283  }
284 
285  void unregister()
286  {
287  unregistered = true;
288  }
289 };
290 
291 class service_node;
292 class rpc_request_task : public task, public transient_object
293 {
294 public:
295  rpc_request_task(message_ex* request, rpc_handler_info* h, service_node* node);
296  ~rpc_request_task();
297 
298  message_ex* get_request() const { return _request; }
299 
300  DSN_API void enqueue() override;
301 
302  void exec() override
303  {
304  if (0 == _enqueue_ts_ns
305  || dsn_now_ns() - _enqueue_ts_ns <
306  static_cast<uint64_t>(_request->header->client.timeout_ms) * 1000000ULL)
307  {
308  _handler->run(_request);
309  }
310  }
311 
312 protected:
313  message_ex *_request;
314  rpc_handler_info* _handler;
315  uint64_t _enqueue_ts_ns;
316 };
317 
318 typedef void(*dsn_rpc_response_handler_replace_t)(
320  dsn_error_t err,
321  dsn_message_t req,
322  dsn_message_t resp,
323  void* context,
324  uint64_t replace_context
325  );
326 class rpc_response_task : public task, public transient_object
327 {
328 public:
329  DSN_API rpc_response_task(
330  message_ex* request,
332  void* context,
333  dsn_task_cancelled_handler_t on_cancel,
334  int hash = 0,
335  service_node* node = nullptr
336  );
337  DSN_API ~rpc_response_task();
338 
339  // return true for normal case, false for fault injection applied
340  DSN_API bool enqueue(error_code err, message_ex* reply);
341  DSN_API void enqueue() override; // re-enqueue after above enqueue, e.g., after delay
342  message_ex* get_request() const { return _request; }
343  message_ex* get_response() const { return _response; }
344  DSN_API void replace_callback(dsn_rpc_response_handler_replace_t callback, uint64_t context); // not thread-safe
345  DSN_API bool reset_callback(); // used only when replace_callback is called before, not thread-safe
346  task_worker_pool* caller_pool() const { return _caller_pool; }
347  void set_caller_pool(task_worker_pool* pl) { _caller_pool = pl; }
348 
349  void exec() override
350  {
351  if (_cb)
352  {
353  _cb(_error.get(), _request, _response, _context);
354  }
355  else
356  {
357  _error.end_tracking();
358  }
359  }
360 
361 private:
362  message_ex* _request;
363  message_ex* _response;
364  task_worker_pool * _caller_pool;
366 
367  friend class rpc_engine;
368 };
369 
370 //------------------------- disk AIO task ---------------------------------------------------
371 
// Kind of disk AIO operation carried by a disk_aio.
enum aio_type
{
    AIO_Invalid = 0, // default before an operation kind is assigned
    AIO_Read    = 1,
    AIO_Write   = 2
};
378 
379 class disk_engine;
380 class disk_aio
381 {
382 public:
383  // filled by apps
384  dsn_handle_t file;
385  void* buffer;
386  uint32_t buffer_size;
387  uint64_t file_offset;
388 
389  // filled by frameworks
390  aio_type type;
391  disk_engine *engine;
392  void* file_object;
393 
394  disk_aio() : file(nullptr), buffer(nullptr), buffer_size(0), file_offset(0), type(AIO_Invalid), engine(nullptr), file_object(nullptr)
395  {}
396  virtual ~disk_aio(){}
397 };
398 
399 class aio_task : public task, public transient_object
400 {
401 public:
402  DSN_API aio_task(
403  dsn_task_code_t code,
404  dsn_aio_handler_t cb,
405  void* context,
406  dsn_task_cancelled_handler_t on_cancel,
407  int hash = 0,
408  service_node* node = nullptr
409  );
410  DSN_API ~aio_task();
411 
412  DSN_API void enqueue(error_code err, size_t transferred_size);
413  size_t get_transferred_size() const { return _transferred_size; }
414  disk_aio* aio() { return _aio; }
415 
416  void copy_to(char* dest)
417  {
418  if (!_unmerged_write_buffers.empty())
419  {
420  for (auto &buffer : _unmerged_write_buffers)
421  {
422  memcpy(dest, buffer.buffer, buffer.size);
423  dest += buffer.size;
424  }
425  }
426  else
427  {
428  memcpy(dest, _aio->buffer, _aio->buffer_size);
429  }
430  }
431 
432  void collapse() {
433  if (!_unmerged_write_buffers.empty()) {
434  std::shared_ptr<char> buffer(dsn::make_shared_array<char>(_aio->buffer_size));
435  _merged_write_buffer_holder.assign(buffer, 0, _aio->buffer_size);
436  _aio->buffer = buffer.get();
437  copy_to(buffer.get());
438  }
439  }
440 
441  virtual void exec() override // aio completed
442  {
443  if (nullptr != _cb)
444  {
445  _cb(_error.get(), _transferred_size, _context);
446  }
447  else
448  {
449  _error.end_tracking();
450  }
451  }
452 
453  std::vector<dsn_file_buffer_t> _unmerged_write_buffers;
454  blob _merged_write_buffer_holder;
455 protected:
456  disk_aio* _aio;
457  size_t _transferred_size;
458  dsn_aio_handler_t _cb;
459 };
460 
461 // ------------------------ inline implementations --------------------
462 __inline /*static*/ void task::check_tls_dsn()
463 {
464  if (tls_dsn.magic != 0xdeadbeef)
465  {
466  on_tls_dsn_not_set();
467  }
468 }
469 
470 __inline /*static*/ task* task::get_current_task()
471 {
472  check_tls_dsn();
473  return tls_dsn.current_task;
474 }
475 
476 __inline /*static*/ uint64_t task::get_current_task_id()
477 {
478  if (tls_dsn.magic == 0xdeadbeef)
479  return tls_dsn.current_task ? tls_dsn.current_task->id() : 0;
480  else
481  return 0;
482 }
483 
484 
485 __inline /*static*/ task_worker* task::get_current_worker()
486 {
487  check_tls_dsn();
488  return tls_dsn.worker;
489 }
490 
491 __inline /*static*/ task_worker* task::get_current_worker2()
492 {
493  return tls_dsn.magic == 0xdeadbeef ? tls_dsn.worker : nullptr;
494 }
495 
496 __inline /*static*/ service_node* task::get_current_node()
497 {
498  check_tls_dsn();
499  return tls_dsn.node;
500 }
501 
502 __inline /*static*/ int task::get_current_node_id()
503 {
504  return tls_dsn.magic == 0xdeadbeef ? tls_dsn.node_id : 0;
505 }
506 
507 __inline /*static*/ service_node* task::get_current_node2()
508 {
509  return tls_dsn.magic == 0xdeadbeef ? tls_dsn.node : nullptr;
510 }
511 
512 __inline /*static*/ int task::get_current_worker_index()
513 {
514  check_tls_dsn();
515  return tls_dsn.worker_index;
516 }
517 
518 __inline /*static*/ rpc_engine* task::get_current_rpc()
519 {
520  check_tls_dsn();
521  return tls_dsn.rpc;
522 }
523 
524 __inline /*static*/ disk_engine* task::get_current_disk()
525 {
526  check_tls_dsn();
527  return tls_dsn.disk;
528 }
529 
530 __inline /*static*/ env_provider* task::get_current_env()
531 {
532  check_tls_dsn();
533  return tls_dsn.env;
534 }
535 
536 __inline /*static*/ nfs_node* task::get_current_nfs()
537 {
538  check_tls_dsn();
539  return tls_dsn.nfs;
540 }
541 
542 __inline /*static*/ timer_service* task::get_current_tsvc()
543 {
544  check_tls_dsn();
545  return tls_dsn.tsvc;
546 }
547 
548 __inline /*static*/ int task::get_current_queue_length()
549 {
550  check_tls_dsn();
551  return tls_dsn.last_worker_queue_size;
552 }
553 
554 } // end namespace
Definition: task.h:223
Definition: task.h:247
void(* dsn_task_cancelled_handler_t)(void *)
callback prototype for task cancellation (called on task-being-cancelled)
Definition: api_task.h:111
Definition: task.h:101
task queue batches the input queue for the bound task worker(s) (threads)
Definition: task_queue.h:55
void(* dsn_task_handler_t)(void *)
callback prototype for TASK_TYPE_COMPUTE
Definition: api_task.h:64
Definition: task.h:380
Definition: task.h:72
Definition: task.h:399
Definition: task.h:326
Definition: task.h:195
Definition: auto_codes.h:303
void(* dsn_rpc_request_handler_t)(dsn_message_t, void *)
callback prototype for TASK_TYPE_RPC_REQUEST
Definition: api_task.h:69
void(* dsn_rpc_response_handler_t)(dsn_error_t, dsn_message_t, dsn_message_t, void *)
callback prototype for TASK_TYPE_RPC_RESPONSE
Definition: api_task.h:75
task worker processes the input tasks from the bound task queue
Definition: task_worker.h:54
Definition: task_spec.h:191
void(* dsn_aio_handler_t)(dsn_error_t, size_t, void *)
callback prototype for TASK_TYPE_AIO
Definition: api_task.h:83
Definition: address.h:52
Definition: nfs.h:72
timer service schedules the input tasks at specified timepoint
Definition: timer_service.h:51
Definition: env_provider.h:48
Definition: task.h:292