Robust Distributed System Nucleus (rDSN)  ver 1.0.0
clientlet.h
1 /*
2  * The MIT License (MIT)
3  *
4  * Copyright (c) 2015 Microsoft Corporation
5  *
6  * -=- Robust Distributed System Nucleus (rDSN) -=-
7  *
8  * Permission is hereby granted, free of charge, to any person obtaining a copy
9  * of this software and associated documentation files (the "Software"), to deal
10  * in the Software without restriction, including without limitation the rights
11  * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
12  * copies of the Software, and to permit persons to whom the Software is
13  * furnished to do so, subject to the following conditions:
14  *
15  * The above copyright notice and this permission notice shall be included in
16  * all copies or substantial portions of the Software.
17  *
18  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
19  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
20  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
21  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
22  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
23  * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
24  * THE SOFTWARE.
25  */
26 
27 /*
28  * Description:
29  * c++ client side service API
30  *
31  * Revision history:
32  * Sep., 2015, @imzhenyu (Zhenyu Guo), first version
33  * xxxx-xx-xx, author, fix bug about xxx
34  */
35 
36 # pragma once
37 
38 # include <dsn/cpp/serialization.h>
39 # include <dsn/cpp/task_helper.h>
40 # include <dsn/cpp/function_traits.h>
41 
42 namespace dsn
43 {
44  /*
45  clientlet is the base class for RPC service and client
46  there can be multiple clientlet in the system
47  */
48  class clientlet
49  {
50  public:
51  clientlet(int task_bucket_count = 13);
52  virtual ~clientlet();
53 
54  dsn_task_tracker_t tracker() const { return _tracker; }
55  rpc_address primary_address() { return dsn_primary_address(); }
56 
57  static uint32_t random32(uint32_t min, uint32_t max) { return dsn_random32(min, max); }
58  static uint64_t random64(uint64_t min, uint64_t max) { return dsn_random64(min, max); }
59  static uint64_t now_ns() { return dsn_now_ns(); }
60  static uint64_t now_us() { return dsn_now_us(); }
61  static uint64_t now_ms() { return dsn_now_ms(); }
62 
63  protected:
64  void check_hashed_access();
65 
66  private:
67  int _access_thread_id;
68  bool _access_thread_id_inited;
69  dsn_task_tracker_t _tracker;
70  };
71 
72  //callback function concepts
// NOTE(review): the line that opens this definition is absent from this listing;
// judging from the constant declared below it is presumably `struct empty_callback_t`
// - confirm against the real header.
74  {
// Call operator that takes no arguments and does nothing.
75  void operator()()const {}
76  };
// Ready-made instance, passed where no user callback is wanted (rpc::call_wait does so).
77  constexpr empty_callback_t empty_callback{};
78 
79  // callback(error_code, dsn_message_t request, dsn_message_t response)
80  template<typename TFunction, class Enable = void> struct is_raw_rpc_callback
81  {
82  constexpr static bool const value = false;
83  };
84  template<typename TFunction>
85  struct is_raw_rpc_callback<TFunction, typename std::enable_if<function_traits<TFunction>::arity == 3>::type>
86  {
87  using inspect_t = function_traits<TFunction>;
88  constexpr static bool const value =
89  std::is_same<typename inspect_t::template arg_t<0>, dsn::error_code>::value
90  && std::is_same<typename inspect_t::template arg_t<1>, dsn_message_t>::value
91  && std::is_same<typename inspect_t::template arg_t<2>, dsn_message_t>::value;
92  };
93 
94  // callback(error_code, TResponse&& response)
95  template<typename TFunction, class Enable = void> struct is_typed_rpc_callback
96  {
97  constexpr static bool const value = false;
98  };
99  template<typename TFunction>
100  struct is_typed_rpc_callback<TFunction, typename std::enable_if<function_traits<TFunction>::arity == 2>::type>
101  {
102  //todo: check if response_t is marshallable
103  using inspect_t = function_traits<TFunction>;
104  constexpr static bool const value =
105  std::is_same<typename inspect_t::template arg_t<0>, dsn::error_code>::value
106  && std::is_default_constructible<typename std::decay<typename inspect_t::template arg_t<1>>::type>::value;
107  using response_t = typename std::decay<typename inspect_t::template arg_t<1>>::type;
108  };
109 
110  // callback(error_code, int io_size)
111  template<typename TFunction, class Enable = void> struct is_aio_callback
112  {
113  constexpr static bool const value = false;
114  };
115  template<typename TFunction>
116  struct is_aio_callback<TFunction, typename std::enable_if<function_traits<TFunction>::arity == 2>::type>
117  {
118  using inspect_t = function_traits<TFunction>;
119  constexpr static bool const value =
120  std::is_same<typename inspect_t::template arg_t<0>, dsn::error_code>::value
121  && std::is_convertible<typename inspect_t::template arg_t<1>, uint64_t>::value;
122  };
123 
124 
129  namespace tasking
130  {
// Builds (but does not enqueue) a task for task code `evt` that wraps `callback`.
//
// evt      - task code passed to dsn_task_create_ex
// svc      - optional owner; when non-null its tracker() is passed as the task tracker
// callback - callable, perfect-forwarded into a heap-allocated transient_safe_task
// hash     - passed through to dsn_task_create_ex
//
// Returns the wrapper as a task_ptr after the native task handle has been
// attached to it via set_task_info().
131  template<typename TCallback>
132  task_ptr create_task(
133  dsn_task_code_t evt,
134  clientlet* svc,
135  TCallback&& callback,
136  int hash = 0)
137  {
// Store by value even when the caller passed an lvalue (TCallback deduced as a reference).
138  using callback_storage_t = typename std::remove_reference<TCallback>::type;
139  auto tsk = new transient_safe_task<callback_storage_t>(std::forward<TCallback>(callback));
140  tsk->add_ref(); // released in exec callback
141  auto native_tsk = dsn_task_create_ex(
142  evt,
// NOTE(review): two argument lines are missing from this listing at this point;
// dsn_task_create_ex takes (code, cb, on_cancel, context, hash, tracker), so they
// are presumably the cb and on_cancel handlers - confirm against the real header.
// `tsk` below is the context argument.
145  tsk,
146  hash,
147  svc ? svc->tracker() : nullptr);
148  tsk->set_task_info(native_tsk);
149  return tsk;
150  }
151 
// Builds (but does not enqueue) a timer task for task code `evt` that wraps `callback`.
//
// evt            - task code passed to dsn_task_create_timer_ex
// svc            - optional owner; when non-null its tracker() is passed as the task tracker
// callback       - callable, perfect-forwarded into a heap-allocated timer_safe_task
// timer_interval - interval; its millisecond count is narrowed to int for the C API
// hash           - passed through to dsn_task_create_timer_ex
//
// Returns the wrapper as a task_ptr after the native task handle has been
// attached to it via set_task_info().
152  template<typename TCallback>
153  task_ptr create_timer_task(
154  dsn_task_code_t evt,
155  clientlet* svc,
156  TCallback&& callback,
157  std::chrono::milliseconds timer_interval,
158  int hash = 0)
159  {
// Store by value even when the caller passed an lvalue (TCallback deduced as a reference).
160  using callback_storage_t = typename std::remove_reference<TCallback>::type;
161  auto tsk = new timer_safe_task<callback_storage_t>(std::forward<TCallback>(callback));
162  tsk->add_ref(); // released in exec callback
163  auto native_tsk = dsn_task_create_timer_ex(
164  evt,
// NOTE(review): two argument lines are missing from this listing at this point;
// dsn_task_create_timer_ex takes (code, cb, on_cancel, context, hash,
// interval_milliseconds, tracker), so they are presumably the cb and on_cancel
// handlers - confirm against the real header. `tsk` below is the context argument.
167  tsk,
168  hash,
169  static_cast<int>(timer_interval.count()),
170  svc ? svc->tracker() : nullptr);
171  tsk->set_task_info(native_tsk);
172  return tsk;
173  }
174 
175  template<typename TCallback>
176  task_ptr enqueue(
177  dsn_task_code_t evt,
178  clientlet* svc,
179  TCallback&& callback,
180  int hash = 0,
181  std::chrono::milliseconds delay = std::chrono::milliseconds(0))
182  {
183  auto tsk = create_task(evt, svc, std::forward<TCallback>(callback), hash);
184  tsk->enqueue(delay);
185  return tsk;
186  }
187 
188  template<typename TCallback>
189  task_ptr enqueue_timer(
190  dsn_task_code_t evt,
191  clientlet* svc,
192  TCallback&& callback,
193  std::chrono::milliseconds timer_interval,
194  int hash = 0,
195  std::chrono::milliseconds delay = std::chrono::milliseconds(0))
196  {
197  auto tsk = create_timer_task(evt, svc, std::forward<TCallback>(callback), timer_interval, hash);
198  tsk->enqueue(delay);
199  return tsk;
200  }
201 
// Builds (but does not enqueue) a safe_late_task<THandler> for task code `evt`.
//
// evt      - task code passed to dsn_task_create_ex
// callback - handler, taken by value and copied into the new safe_late_task
// hash     - passed through to dsn_task_create_ex
// svc      - optional owner; when non-null its tracker() is passed as the task tracker
//
// Returns the raw safe_late_task pointer (not a task_ptr) after the native task
// handle has been attached via set_task_info(). One reference has already been
// added on the returned object (see add_ref below).
202  template<typename THandler>
203  inline safe_late_task<THandler>* create_late_task(
204  dsn_task_code_t evt,
205  THandler callback,
206  int hash = 0,
207  clientlet* svc = nullptr
208  )
209  {
210  auto tsk = new safe_late_task<THandler>(callback);
211  tsk->add_ref(); // released in exec callback
212  auto t = dsn_task_create_ex(evt,
// NOTE(review): two argument lines are missing from this listing at this point;
// dsn_task_create_ex takes (code, cb, on_cancel, context, hash, tracker), so they
// are presumably the cb and on_cancel handlers - confirm against the real header.
215  tsk, hash, svc ? svc->tracker() : nullptr);
216 
217  tsk->set_task_info(t);
218  return tsk;
219  }
220  }
227  namespace rpc
228  {
// Non-template overload of create_rpc_response_task; declared here, defined elsewhere.
// NOTE(review): one parameter line is missing from this listing between `svc` and
// `reply_thread_hash`. Since rpc::call_wait passes `empty_callback` in that position,
// it is presumably an empty_callback_t parameter - confirm against the real header.
229  task_ptr create_rpc_response_task(
230  dsn_message_t request,
231  clientlet* svc,
233  int reply_thread_hash = 0);
234 
// Overload enabled only for raw rpc callbacks, i.e. callables for which
// is_raw_rpc_callback<TCallback>::value is true:
//   callback(error_code, dsn_message_t request, dsn_message_t response)
//
// request           - request message the response task is created for
// svc               - optional owner; when non-null its tracker() is passed as the task tracker
// callback          - callable, perfect-forwarded into a heap-allocated transient_safe_task
// reply_thread_hash - passed through to the native task-creation call
//
// Returns the wrapper as a task_ptr after the native handle `t` has been
// attached to it via set_task_info().
235  template<typename TCallback>
236  typename std::enable_if<is_raw_rpc_callback<TCallback>::value, task_ptr>::type
237  create_rpc_response_task(
238  dsn_message_t request,
239  clientlet* svc,
240  TCallback&& callback,
241  int reply_thread_hash = 0)
242  {
// Store by value even when the caller passed an lvalue (TCallback deduced as a reference).
243  using callback_storage_t = typename std::remove_reference<TCallback>::type;
244  auto tsk = new transient_safe_task<callback_storage_t>(std::forward<TCallback>(callback));
245  tsk->add_ref(); // released in exec_rpc_response
246 
// NOTE(review): the line that declares `t` and opens the call is missing from this
// listing, as are two argument lines after `request`. The argument shape matches
// dsn_rpc_create_response_task_ex(request, cb, on_cancel, context, reply_thread_hash,
// tracker), so this is presumably a call to it - confirm against the real header.
248  request,
251  tsk,
252  reply_thread_hash,
253  svc ? svc->tracker() : nullptr
254  );
255  tsk->set_task_info(t);
256  return tsk;
257  }
258 
// Overload enabled only for typed rpc callbacks, i.e. callables for which
// is_typed_rpc_callback<TCallback>::value is true:
//   callback(error_code, TResponse&& response)
//
// Wraps `callback` in a lambda that takes (error_code, request, response message)
// and passes that lambda to create_rpc_response_task together with request, svc
// and reply_thread_hash. The lambda unmarshalls the response message into
// `response` only when err == ERR_OK, then invokes the user callback with the
// error code and the moved response object.
259  template<typename TCallback>
260  typename std::enable_if<is_typed_rpc_callback<TCallback>::value, task_ptr>::type
261  create_rpc_response_task(
262  dsn_message_t request,
263  clientlet* svc,
264  TCallback&& callback,
265  int reply_thread_hash = 0)
266  {
267  return create_rpc_response_task(
268  request,
269  svc,
// The callback is forwarded into the closure (init-capture); `mutable` lets a
// callback with a non-const call operator be invoked from inside the lambda.
270  [cb_fwd = std::forward<TCallback>(callback)](error_code err, dsn_message_t req, dsn_message_t resp) mutable
271  {
// NOTE(review): the line declaring `response` is missing from this listing; it is
// presumably a default-constructed is_typed_rpc_callback<TCallback>::response_t
// - confirm against the real header.
273  if (err == ERR_OK)
274  {
275  ::dsn::unmarshall(resp, response);
276  }
// On error the callback receives `response` without it having been unmarshalled.
277  cb_fwd(err, std::move(response));
278  },
279  reply_thread_hash);
280  }
281 
282  template<typename TCallback>
283  task_ptr call(
284  ::dsn::rpc_address server,
285  dsn_message_t request,
286  clientlet* svc,
287  TCallback&& callback,
288  int reply_thread_hash = 0
289  )
290  {
291  task_ptr t = create_rpc_response_task(request, svc, std::forward<TCallback>(callback), reply_thread_hash);
292  dsn_rpc_call(server.c_addr(), t->native_handle());
293  return t;
294  }
295 
296  template<typename TRequest, typename TCallback>
297  task_ptr call(
298  ::dsn::rpc_address server,
299  dsn_task_code_t code,
300  TRequest&& req,
301  clientlet* owner,
302  TCallback&& callback,
303  uint64_t hash = 0,
304  std::chrono::milliseconds timeout = std::chrono::milliseconds(0),
305  int reply_thread_hash = 0
306  )
307  {
308  dsn_message_t msg = dsn_msg_create_request(code, static_cast<int>(timeout.count()), hash);
309  ::dsn::marshall(msg, std::forward<TRequest>(req));
310  return call(server, msg, owner, std::forward<TCallback>(callback), reply_thread_hash);
311  }
312 
// Small wrapper around one request message that lets the caller send it later.
// NOTE(review): the line that opens this definition is absent from this listing;
// judging from the constructor name it is presumably `class rpc_message_helper`
// - confirm against the real header.
314  {
315  public:
// Stores the request message handle; no other work is done here.
316  explicit rpc_message_helper(dsn_message_t request) : request(request) {}
// Sends the stored request to `server` through ::dsn::rpc::call, forwarding
// owner, callback and reply_thread_hash unchanged, and returns the resulting task.
317  template<typename TCallback>
318  task_ptr call(
319  ::dsn::rpc_address server,
320  clientlet* owner,
321  TCallback&& callback,
322  int reply_thread_hash = 0)
323  {
324  return ::dsn::rpc::call(server, request, owner, std::forward<TCallback>(callback), reply_thread_hash);
325  }
326  private:
// Request message handed to the constructor.
327  dsn_message_t request;
328  };
329 
330  template<typename TRequest>
331  rpc_message_helper create_message(
332  dsn_task_code_t code,
333  TRequest&& req,
334  uint64_t hash = 0,
335  std::chrono::milliseconds timeout = std::chrono::milliseconds(0)
336  )
337  {
338  dsn_message_t msg = dsn_msg_create_request(code, static_cast<int>(timeout.count()), hash);
339  ::dsn::marshall(msg, std::forward<TRequest>(req));
340  return rpc_message_helper(msg);
341  }
342 
343 
344  //
345  // For TRequest/TResponse, we assume that the following routines are defined:
346  // marshall(binary_writer& writer, const T& val);
347  // unmarshall(binary_reader& reader, /*out*/ T& val);
348  // either in namespace ::dsn::utils or in the namespace of T.
349  // Developers may write these helper functions on their own, or use tools
350  // such as Protocol Buffers, Thrift, or Bond to generate them automatically
351  // for their TRequest and TResponse types.
352  //
353 
354  // no callback
355  template<typename TRequest>
356  void call_one_way_typed(
357  ::dsn::rpc_address server,
358  dsn_task_code_t code,
359  const TRequest& req,
360  uint64_t hash = 0
361  )
362  {
363  dsn_message_t msg = dsn_msg_create_request(code, 0, hash);
364  ::dsn::marshall(msg, req);
365  dsn_rpc_call_one_way(server.c_addr(), msg);
366  }
367 
368  template<typename TResponse>
369  std::pair< ::dsn::error_code, TResponse> wait_and_unwrap(safe_task_handle* task)
370  {
371  task->wait();
372  std::pair< ::dsn::error_code, TResponse> result;
373  result.first = task->error();
374  if (task->error() == ::dsn::ERR_OK)
375  {
376  ::dsn::unmarshall(task->response(), result.second);
377  }
378  return result;
379  }
380 
381  template<typename TResponse, typename TRequest>
382  std::pair< ::dsn::error_code, TResponse> call_wait(
383  ::dsn::rpc_address server,
384  dsn_task_code_t code,
385  TRequest&& req,
386  int hash = 0,
387  std::chrono::milliseconds timeout = std::chrono::milliseconds(0)
388  )
389  {
390  return wait_and_unwrap<TResponse>(call(server, code, std::forward<TRequest>(req), nullptr, empty_callback, hash, timeout));
391  }
392  }
399  namespace file
400  {
// Non-template overload of create_aio_task; declared here, defined elsewhere.
// NOTE(review): one parameter line is missing from this listing between `svc` and
// `hash`; by analogy with the template overload it is presumably a callback
// parameter (possibly empty_callback_t) - confirm against the real header.
401  task_ptr create_aio_task(
402  dsn_task_code_t callback_code,
403  clientlet* svc,
405  int hash);
406 
// Builds an aio task for task code `callback_code` that wraps `callback`.
//
// callback_code - task code passed to dsn_file_create_aio_task_ex
// svc           - optional owner; when non-null its tracker() is passed as the task tracker
// callback      - callable that must satisfy is_aio_callback (checked at compile time);
//                 perfect-forwarded into a heap-allocated transient_safe_task
// hash          - passed through to dsn_file_create_aio_task_ex
//
// Returns the wrapper as a task_ptr after the native handle has been attached
// to it via set_task_info().
407  template<typename TCallback>
408  task_ptr create_aio_task(
409  dsn_task_code_t callback_code,
410  clientlet* svc,
411  TCallback&& callback,
412  int hash)
413  {
414  static_assert(is_aio_callback<TCallback>::value, "invalid aio callback");
// Store by value even when the caller passed an lvalue (TCallback deduced as a reference).
415  using callback_storage_t = typename std::remove_reference<TCallback>::type;
416  auto tsk = new transient_safe_task<callback_storage_t>(std::forward<TCallback>(callback));
417  tsk->add_ref(); // released in exec_aio
418 
419  dsn_task_t t = dsn_file_create_aio_task_ex(callback_code,
// NOTE(review): two argument lines are missing from this listing at this point;
// dsn_file_create_aio_task_ex takes (code, cb, on_cancel, context, hash, tracker),
// so they are presumably the cb and on_cancel handlers - confirm against the real header.
422  tsk, hash, svc ? svc->tracker() : nullptr
423  );
424 
425  tsk->set_task_info(t);
426  return tsk;
427  }
428 
429  template<typename TCallback>
430  task_ptr read(
431  dsn_handle_t fh,
432  char* buffer,
433  int count,
434  uint64_t offset,
435  dsn_task_code_t callback_code,
436  clientlet* svc,
437  TCallback&& callback,
438  int hash = 0
439  )
440  {
441  auto tsk = create_aio_task(callback_code, svc, std::forward<TCallback>(callback), hash);
442  dsn_file_read(fh, buffer, count, offset, tsk->native_handle());
443  return tsk;
444  }
445 
446  template<typename TCallback>
447  task_ptr write(
448  dsn_handle_t fh,
449  const char* buffer,
450  int count,
451  uint64_t offset,
452  dsn_task_code_t callback_code,
453  clientlet* svc,
454  TCallback&& callback,
455  int hash = 0
456  )
457  {
458  auto tsk = create_aio_task(callback_code, svc, std::forward<TCallback>(callback), hash);
459  dsn_file_write(fh, buffer, count, offset, tsk->native_handle());
460  return tsk;
461  }
462 
463  template<typename TCallback>
464  task_ptr write_vector(
465  dsn_handle_t fh,
466  const dsn_file_buffer_t* buffers,
467  int buffer_count,
468  uint64_t offset,
469  dsn_task_code_t callback_code,
470  clientlet* svc,
471  TCallback&& callback,
472  int hash = 0
473  )
474  {
475  auto tsk = create_aio_task(callback_code, svc, std::forward<TCallback>(callback), hash);
476  dsn_file_write_vector(fh, buffers, buffer_count, offset, tsk->native_handle());
477  return tsk;
478  }
479 
// Non-template back end of copy_remote_files(); declared here, defined elsewhere.
// copy_remote_files() calls it with its own arguments plus the native handle of
// the aio task it created.
// NOTE(review): presumably `native_task` is the task completed when the copy
// finishes - confirm against the implementation.
480  void copy_remote_files_impl(
481  ::dsn::rpc_address remote,
482  const std::string& source_dir,
483  const std::vector<std::string>& files, // empty for all
484  const std::string& dest_dir,
485  bool overwrite,
486  dsn_task_t native_task
487  );
488 
489  template<typename TCallback>
490  task_ptr copy_remote_files(
491  ::dsn::rpc_address remote,
492  const std::string& source_dir,
493  const std::vector<std::string>& files, // empty for all
494  const std::string& dest_dir,
495  bool overwrite,
496  dsn_task_code_t callback_code,
497  clientlet* svc,
498  TCallback&& callback,
499  int hash = 0
500  )
501  {
502  auto tsk = create_aio_task(callback_code, svc, std::forward<TCallback>(callback), hash);
503  copy_remote_files_impl(remote, source_dir, files, dest_dir, overwrite, tsk->native_handle());
504  return tsk;
505  }
506 
507  template<typename TCallback>
508  task_ptr copy_remote_directory(
509  ::dsn::rpc_address remote,
510  const std::string& source_dir,
511  const std::string& dest_dir,
512  bool overwrite,
513  dsn_task_code_t callback_code,
514  clientlet* svc,
515  TCallback&& callback,
516  int hash = 0
517  )
518  {
519  return copy_remote_files(
520  remote, source_dir, {}, dest_dir, overwrite,
521  callback_code, svc, std::forward<TCallback>(callback), hash
522  );
523  }
524  }
526  // ------------- inline implementation ----------------
527 
528 } // end namespace
Definition: clientlet.h:95
Definition: task_helper.h:64
Definition: clientlet.h:313
Definition: task.h:101
DSN_API dsn_task_t dsn_file_create_aio_task_ex(dsn_task_code_t code, dsn_aio_handler_t cb, dsn_task_cancelled_handler_t on_cancel, void *context, int hash DEFAULT(0), dsn_task_tracker_t tracker DEFAULT(nullptr))
create aio task which is executed on completion of the file operations
DSN_API dsn_address_t dsn_primary_address()
get the primary address of the rpc engine attached to the current thread
DSN_API void dsn_rpc_call(dsn_address_t server, dsn_task_t rpc_call)
client invokes the RPC call
STL namespace.
DSN_API void dsn_file_write_vector(dsn_handle_t file, const dsn_file_buffer_t *buffers, int buffer_count, uint64_t offset, dsn_task_t cb)
write file asynchronously with vector buffers
Definition: api_layer1.h:939
DSN_API uint64_t dsn_random64(uint64_t min, uint64_t max)
return [min, max]
DSN_API void dsn_file_write(dsn_handle_t file, const char *buffer, int count, uint64_t offset, dsn_task_t cb)
write file asynchronously
Definition: auto_codes.h:303
DSN_API dsn_task_t dsn_task_create_ex(dsn_task_code_t code, dsn_task_handler_t cb, dsn_task_cancelled_handler_t on_cancel, void *context, int hash DEFAULT(0), dsn_task_tracker_t tracker DEFAULT(nullptr))
similar to dsn_task_create, except an on_cancel callback is provided to be executed when the task is ...
Definition: clientlet.h:111
DSN_API dsn_task_t dsn_task_create_timer_ex(dsn_task_code_t code, dsn_task_handler_t cb, dsn_task_cancelled_handler_t on_cancel, void *context, int hash, int interval_milliseconds, dsn_task_tracker_t tracker DEFAULT(nullptr))
similar to dsn_task_create_timer, except an on_cancel callback is provided to be executed when the ta...
Definition: task_helper.h:204
DSN_API void dsn_rpc_call_one_way(dsn_address_t server, dsn_message_t request)
one-way RPC from client, no rpc response is expected
Definition: clientlet.h:48
Definition: address.h:52
Definition: clientlet.h:80
__inline uint32_t dsn_random32(uint32_t min, uint32_t max)
return [min, max]
Definition: api_layer1.h:1141
DSN_API dsn_message_t dsn_msg_create_request(dsn_task_code_t rpc_code, int timeout_milliseconds DEFAULT(0), uint64_t hash DEFAULT(0))
create a rpc request message
DSN_API dsn_task_t dsn_rpc_create_response_task_ex(dsn_message_t request, dsn_rpc_response_handler_t cb, dsn_task_cancelled_handler_t on_cancel, void *context, int reply_thread_hash DEFAULT(0), dsn_task_tracker_t tracker DEFAULT(nullptr))
create a callback task to handle the response message from RPC server, or timeout.
Definition: clientlet.h:73
Definition: task_helper.h:149
Definition: task_helper.h:252
Definition: address.h:61
DSN_API void dsn_file_read(dsn_handle_t file, char *buffer, int count, uint64_t offset, dsn_task_t cb)
read file asynchronously