Robust Distributed System Nucleus (rDSN)  ver 1.0.0
task_helper.h
1 /*
2  * The MIT License (MIT)
3  *
4  * Copyright (c) 2015 Microsoft Corporation
5  *
6  * -=- Robust Distributed System Nucleus (rDSN) -=-
7  *
8  * Permission is hereby granted, free of charge, to any person obtaining a copy
9  * of this software and associated documentation files (the "Software"), to deal
10  * in the Software without restriction, including without limitation the rights
11  * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
12  * copies of the Software, and to permit persons to whom the Software is
13  * furnished to do so, subject to the following conditions:
14  *
15  * The above copyright notice and this permission notice shall be included in
16  * all copies or substantial portions of the Software.
17  *
18  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
19  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
20  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
21  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
22  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
23  * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
24  * THE SOFTWARE.
25  */
26 
27 /*
28  * Description:
29  * helpers for easier task programming atop the C API
30  *
31  * Revision history:
32  * Sep., 2015, @imzhenyu (Zhenyu Guo), first version
33  * xxxx-xx-xx, author, fix bug about xxx
34  */
35 
36 # pragma once
37 
38 # include <dsn/service_api_c.h>
39 # include <dsn/cpp/auto_codes.h>
40 # include <dsn/cpp/utils.h>
41 # include <dsn/cpp/rpc_stream.h>
42 # include <dsn/cpp/serialization.h>
43 # include <dsn/cpp/zlocks.h>
44 # include <dsn/utility/autoref_ptr.h>
45 # include <dsn/utility/synchronize.h>
46 # include <dsn/utility/link.h>
47 # include <dsn/cpp/callocator.h>
48 # include <set>
49 # include <map>
50 # include <thread>
51 # include <dsn/cpp/optional.h>
52 
53 namespace dsn
54 {
55  typedef std::function<void(error_code, size_t)> aio_handler;
56  class safe_task_handle;
57  typedef ::dsn::ref_ptr<safe_task_handle> task_ptr;
58 
59  //
60  // basic cpp task wrapper
61  // which manages the task handle
62  // and the interaction with task context manager, clientlet
63  //
65  public ::dsn::ref_counter
66  {
67  public:
69  {
70  _task = nullptr;
71  _rpc_response = nullptr;
72  }
73 
74  virtual ~safe_task_handle()
75  {
76  dsn_task_release_ref(_task);
77 
78  if (_rpc_response != nullptr)
79  dsn_msg_release_ref(_rpc_response);
80  }
81 
82  void set_task_info(dsn_task_t t)
83  {
84  _task = t;
86  }
87 
88  dsn_task_t native_handle() const
89  { return _task; }
90 
91  virtual bool cancel(bool wait_until_finished, bool* finished = nullptr)
92  {
93  return dsn_task_cancel2(_task, wait_until_finished, finished);
94  }
95 
96  void set_delay(int delay_ms)
97  {
98  dsn_task_set_delay(_task, delay_ms);
99  }
100 
101  void wait() const
102  {
103  dsn_task_wait(_task);
104  }
105 
106  bool wait(int timeout_millieseconds) const
107  {
108  return dsn_task_wait_timeout(_task, timeout_millieseconds);
109  }
110 
111  ::dsn::error_code error() const
112  {
113  return dsn_task_error(_task);
114  }
115 
116  size_t io_size() const
117  {
118  return dsn_file_get_io_size(_task);
119  }
120 
121  void enqueue(std::chrono::milliseconds delay = std::chrono::milliseconds(0)) const
122  {
123  dsn_task_call(_task, static_cast<int>(delay.count()));
124  }
125 
126  void enqueue_aio(error_code err, size_t size) const
127  {
128  dsn_file_task_enqueue(_task, err.get(), size);
129  }
130 
131  dsn_message_t response()
132  {
133  if (_rpc_response == nullptr)
134  _rpc_response = dsn_rpc_get_response(_task);
135  return _rpc_response;
136  }
137 
138  void enqueue_rpc_response(error_code err, dsn_message_t resp) const
139  {
140  dsn_rpc_enqueue_response(_task, err.get(), resp);
141  }
142 
143  private:
144  dsn_task_t _task;
145  dsn_message_t _rpc_response;
146  };
147 
148  template<typename THandler>
150  public safe_task_handle,
151  public transient_object
152  {
153  public:
154  explicit transient_safe_task(THandler&& h) : _handler(std::move(h))
155  {
156  }
157  explicit transient_safe_task(const THandler& h) : _handler(h)
158  {
159  }
160  virtual bool cancel(bool wait_until_finished, bool* finished = nullptr) override
161  {
162  return safe_task_handle::cancel(wait_until_finished, finished);
163  }
164 
165  static void on_cancel(void* task)
166  {
167  auto t = static_cast<transient_safe_task*>(task);
168  t->_handler.reset();
169  t->release_ref(); // added upon callback exec registration
170  }
171 
172  static void exec(void* task)
173  {
174  auto t = static_cast<transient_safe_task*>(task);
175  dbg_dassert(t->_handler.is_some(), "_handler is missing");
176  t->_handler.unwrap()();
177  t->_handler.reset();
178  t->release_ref(); // added upon callback exec registration
179  }
180 
181  static void exec_rpc_response(dsn_error_t err, dsn_message_t req, dsn_message_t resp, void* task)
182  {
183  auto t = static_cast<transient_safe_task*>(task);
184  dbg_dassert(t->_handler.is_some(), "_handler is missing");
185  t->_handler.unwrap()(err, req, resp);
186  t->_handler.reset();
187  t->release_ref(); // added upon callback exec_rpc_response registration
188  }
189 
190  static void exec_aio(dsn_error_t err, size_t sz, void* task)
191  {
192  auto t = static_cast<transient_safe_task*>(task);
193  dbg_dassert(t->_handler.is_some(), "_handler is missing");
194  t->_handler.unwrap()(err, sz);
195  t->_handler.reset();
196  t->release_ref(); // added upon callback exec_aio registration
197  }
198 
199  private:
200  dsn::optional<THandler> _handler;
201  };
202 
203  template<typename THandler>
205  public safe_task_handle
206  {
207  public:
208  explicit timer_safe_task(THandler&& h) : _handler(std::move(h))
209  {
210  }
211  explicit timer_safe_task(const THandler& h) : _handler(h)
212  {
213  }
214  virtual bool cancel(bool wait_until_finished, bool* finished = nullptr) override
215  {
216  return safe_task_handle::cancel(wait_until_finished, finished);
217  }
218 
219  static void on_cancel(void* task)
220  {
221  auto t = static_cast<timer_safe_task*>(task);
222  t->_handler.reset();
223  t->release_ref(); // added upon callback exec registration
224  }
225 
226  static void exec_timer(void* task)
227  {
228  auto t = static_cast<timer_safe_task*>(task);
229  dbg_dassert(t->_handler.is_some(), "_handler is missing");
230  t->_handler.unwrap()();
231  }
232 
233  private:
234  dsn::optional<THandler> _handler;
235  };
236 
237  //
238  // two staged computation task
239  // this is used when a task handle must be returned at the time a call is made,
240  // while the task itself is enqueued later, after other operations, once
241  // certain parameters to the task are known (e.g., error code after logging).
242  // in this case, we can use a two staged computation task as shown below.
243  //
244  // task_ptr task = tasking::create_late_task(...);
245  // ...
246  // return task;
247  //
248  // ... after logging ....
249  // task->bind_and_enqueue([&](cb)=>{std::bind(cb, error)}, delay);
250  //
251  template<typename THandler>
253  {
254  public:
255  safe_late_task(THandler& h)
256  : _bound_handler(nullptr), _handler(h)
257  {
258  }
259 
260  operator task_ptr() const
261  {
262  return task_ptr(this);
263  }
264 
265  virtual bool cancel(bool wait_until_finished, bool* finished = nullptr) override
266  {
267  bool r = safe_task_handle::cancel(wait_until_finished, finished);
268  if (r)
269  {
270  _bound_handler = nullptr;
271  }
272  return r;
273  }
274 
275  void bind_and_enqueue(
276  std::function<std::function<void()> (THandler&)> binder,
277  int delay_milliseconds = 0
278  )
279  {
280  _bound_handler = binder(_handler);
281  _handler = nullptr;
282  dsn_task_call(native_handle(), delay_milliseconds);
283  }
284 
285  static void on_cancel(void* task)
286  {
287  auto t = (safe_late_task<THandler>*)task;
288  t->release_ref(); // added upon callback exec registration
289  }
290 
291  static void exec(void* task)
292  {
293  auto t = (safe_late_task<THandler>*)task;
294  t->_bound_handler();
295  t->_bound_handler = nullptr;
296  t->release_ref(); // added upon callback exec registration
297  }
298 
299  private:
300  std::function<void()> _bound_handler;
301  THandler _handler;
302  };
303 
304  // ------- inlined implementation ----------
305 }
Definition: task_helper.h:64
DSN_API bool dsn_task_cancel2(dsn_task_t task, bool wait_until_finished, bool *finished)
cancel a task
Definition: task.h:101
DSN_API void dsn_file_task_enqueue(dsn_task_t cb_task, dsn_error_t err, size_t size)
mimic io completion when no io operation is really issued
DSN_API void dsn_task_add_ref(dsn_task_t task)
Add reference count for a task created from dsn_task_create etc.
Definition: auto_codes.h:303
DSN_API void dsn_rpc_enqueue_response(dsn_task_t rpc_call, dsn_error_t err, dsn_message_t response)
this is to mimic a response is received when no real rpc is called
DSN_API void dsn_task_set_delay(dsn_task_t task, int delay_ms)
set delay for a task
Definition: task_helper.h:204
DSN_API void dsn_msg_release_ref(dsn_message_t msg)
release reference to the message, paired with /ref dsn_msg_add_ref
DSN_API void dsn_task_call(dsn_task_t task, int delay_milliseconds DEFAULT(0))
start the task
DSN_API dsn_error_t dsn_task_error(dsn_task_t task)
get result error code of a task
Definition: address.h:52
DSN_API void dsn_task_wait(dsn_task_t task)
wait until a task is completed
DSN_API bool dsn_task_wait_timeout(dsn_task_t task, int timeout_milliseconds)
wait until a task is completed
DSN_API dsn_message_t dsn_rpc_get_response(dsn_task_t rpc_call)
get response message from the response task, note returned msg must be explicitly released using dsn_...
Definition: task_helper.h:149
Definition: task_helper.h:252
DSN_API size_t dsn_file_get_io_size(dsn_task_t cb_task)
get read/written io size for the given aio task
DSN_API void dsn_task_release_ref(dsn_task_t task)
release reference for a given task handle