38 # include <dsn/cpp/serialization.h> 39 # include <dsn/cpp/task_helper.h> 40 # include <dsn/cpp/function_traits.h> 54 dsn_task_tracker_t tracker()
const {
return _tracker; }
57 static uint32_t random32(uint32_t min, uint32_t max) {
return dsn_random32(min, max); }
58 static uint64_t random64(uint64_t min, uint64_t max) {
return dsn_random64(min, max); }
59 static uint64_t now_ns() {
return dsn_now_ns(); }
60 static uint64_t now_us() {
return dsn_now_us(); }
61 static uint64_t now_ms() {
return dsn_now_ms(); }
64 void check_hashed_access();
67 int _access_thread_id;
68 bool _access_thread_id_inited;
69 dsn_task_tracker_t _tracker;
75 void operator()()
const {}
82 constexpr
static bool const value =
false;
84 template<
typename TFunction>
87 using inspect_t = function_traits<TFunction>;
88 constexpr
static bool const value =
89 std::is_same<typename inspect_t::template arg_t<0>,
dsn::error_code>::value
90 && std::is_same<typename inspect_t::template arg_t<1>, dsn_message_t>::value
91 && std::is_same<typename inspect_t::template arg_t<2>, dsn_message_t>::value;
97 constexpr
static bool const value =
false;
99 template<
typename TFunction>
103 using inspect_t = function_traits<TFunction>;
104 constexpr
static bool const value =
105 std::is_same<typename inspect_t::template arg_t<0>,
dsn::error_code>::value
106 && std::is_default_constructible<typename std::decay<typename inspect_t::template arg_t<1>>::type>::value;
107 using response_t =
typename std::decay<typename inspect_t::template arg_t<1>>::type;
113 constexpr
static bool const value =
false;
115 template<
typename TFunction>
116 struct is_aio_callback<TFunction, typename
std::enable_if<function_traits<TFunction>::arity == 2>::type>
118 using inspect_t = function_traits<TFunction>;
119 constexpr
static bool const value =
120 std::is_same<typename inspect_t::template arg_t<0>,
dsn::error_code>::value
121 && std::is_convertible<typename inspect_t::template arg_t<1>, uint64_t>::value;
131 template<
typename TCallback>
132 task_ptr create_task(
135 TCallback&& callback,
138 using callback_storage_t =
typename std::remove_reference<TCallback>::type;
147 svc ? svc->tracker() :
nullptr);
148 tsk->set_task_info(native_tsk);
152 template<
typename TCallback>
153 task_ptr create_timer_task(
156 TCallback&& callback,
157 std::chrono::milliseconds timer_interval,
160 using callback_storage_t =
typename std::remove_reference<TCallback>::type;
169 static_cast<int>(timer_interval.count()),
170 svc ? svc->tracker() :
nullptr);
171 tsk->set_task_info(native_tsk);
175 template<
typename TCallback>
179 TCallback&& callback,
181 std::chrono::milliseconds delay = std::chrono::milliseconds(0))
183 auto tsk = create_task(evt, svc, std::forward<TCallback>(callback), hash);
188 template<
typename TCallback>
189 task_ptr enqueue_timer(
192 TCallback&& callback,
193 std::chrono::milliseconds timer_interval,
195 std::chrono::milliseconds delay = std::chrono::milliseconds(0))
197 auto tsk = create_timer_task(evt, svc, std::forward<TCallback>(callback), timer_interval, hash);
202 template<
typename THandler>
215 tsk, hash, svc ? svc->tracker() :
nullptr);
217 tsk->set_task_info(t);
229 task_ptr create_rpc_response_task(
230 dsn_message_t request,
233 int reply_thread_hash = 0);
235 template<
typename TCallback>
236 typename std::enable_if<is_raw_rpc_callback<TCallback>::value, task_ptr>::type
237 create_rpc_response_task(
238 dsn_message_t request,
240 TCallback&& callback,
241 int reply_thread_hash = 0)
243 using callback_storage_t =
typename std::remove_reference<TCallback>::type;
253 svc ? svc->tracker() : nullptr
255 tsk->set_task_info(t);
259 template<
typename TCallback>
260 typename std::enable_if<is_typed_rpc_callback<TCallback>::value, task_ptr>::type
261 create_rpc_response_task(
262 dsn_message_t request,
264 TCallback&& callback,
265 int reply_thread_hash = 0)
267 return create_rpc_response_task(
270 [cb_fwd = std::forward<TCallback>(callback)](
error_code err, dsn_message_t req, dsn_message_t resp)
mutable 275 ::dsn::unmarshall(resp, response);
277 cb_fwd(err, std::move(response));
282 template<
typename TCallback>
285 dsn_message_t request,
287 TCallback&& callback,
288 int reply_thread_hash = 0
291 task_ptr t = create_rpc_response_task(request, svc, std::forward<TCallback>(callback), reply_thread_hash);
296 template<
typename TRequest,
typename TCallback>
299 dsn_task_code_t code,
302 TCallback&& callback,
304 std::chrono::milliseconds timeout = std::chrono::milliseconds(0),
305 int reply_thread_hash = 0
309 ::dsn::marshall(msg, std::forward<TRequest>(req));
310 return call(server, msg, owner, std::forward<TCallback>(callback), reply_thread_hash);
317 template<
typename TCallback>
321 TCallback&& callback,
322 int reply_thread_hash = 0)
324 return ::dsn::rpc::call(server, request, owner, std::forward<TCallback>(callback), reply_thread_hash);
327 dsn_message_t request;
330 template<
typename TRequest>
332 dsn_task_code_t code,
335 std::chrono::milliseconds timeout = std::chrono::milliseconds(0)
339 ::dsn::marshall(msg, std::forward<TRequest>(req));
355 template<
typename TRequest>
356 void call_one_way_typed(
358 dsn_task_code_t code,
364 ::dsn::marshall(msg, req);
368 template<
typename TResponse>
372 std::pair< ::dsn::error_code, TResponse> result;
373 result.first = task->error();
374 if (task->error() == ::dsn::ERR_OK)
376 ::dsn::unmarshall(task->response(), result.second);
381 template<
typename TResponse,
typename TRequest>
382 std::pair< ::dsn::error_code, TResponse> call_wait(
384 dsn_task_code_t code,
387 std::chrono::milliseconds timeout = std::chrono::milliseconds(0)
390 return wait_and_unwrap<TResponse>(call(server, code, std::forward<TRequest>(req),
nullptr, empty_callback, hash, timeout));
401 task_ptr create_aio_task(
402 dsn_task_code_t callback_code,
407 template<
typename TCallback>
408 task_ptr create_aio_task(
409 dsn_task_code_t callback_code,
411 TCallback&& callback,
415 using callback_storage_t =
typename std::remove_reference<TCallback>::type;
422 tsk, hash, svc ? svc->tracker() : nullptr
425 tsk->set_task_info(t);
429 template<
typename TCallback>
435 dsn_task_code_t callback_code,
437 TCallback&& callback,
441 auto tsk = create_aio_task(callback_code, svc, std::forward<TCallback>(callback), hash);
442 dsn_file_read(fh, buffer, count, offset, tsk->native_handle());
446 template<
typename TCallback>
452 dsn_task_code_t callback_code,
454 TCallback&& callback,
458 auto tsk = create_aio_task(callback_code, svc, std::forward<TCallback>(callback), hash);
463 template<
typename TCallback>
464 task_ptr write_vector(
469 dsn_task_code_t callback_code,
471 TCallback&& callback,
475 auto tsk = create_aio_task(callback_code, svc, std::forward<TCallback>(callback), hash);
480 void copy_remote_files_impl(
482 const std::string& source_dir,
483 const std::vector<std::string>& files,
484 const std::string& dest_dir,
486 dsn_task_t native_task
489 template<
typename TCallback>
490 task_ptr copy_remote_files(
492 const std::string& source_dir,
493 const std::vector<std::string>& files,
494 const std::string& dest_dir,
496 dsn_task_code_t callback_code,
498 TCallback&& callback,
502 auto tsk = create_aio_task(callback_code, svc, std::forward<TCallback>(callback), hash);
503 copy_remote_files_impl(remote, source_dir, files, dest_dir, overwrite, tsk->native_handle());
507 template<
typename TCallback>
508 task_ptr copy_remote_directory(
510 const std::string& source_dir,
511 const std::string& dest_dir,
513 dsn_task_code_t callback_code,
515 TCallback&& callback,
519 return copy_remote_files(
520 remote, source_dir, {}, dest_dir, overwrite,
521 callback_code, svc, std::forward<TCallback>(callback), hash
Definition: clientlet.h:95
Definition: task_helper.h:64
Definition: clientlet.h:313
DSN_API dsn_task_t dsn_file_create_aio_task_ex(dsn_task_code_t code, dsn_aio_handler_t cb, dsn_task_cancelled_handler_t on_cancel, void *context, int hash DEFAULT(0), dsn_task_tracker_t tracker DEFAULT(nullptr))
create aio task which is executed on completion of the file operations
DSN_API dsn_address_t dsn_primary_address()
get the primary address of the rpc engine attached to the current thread
DSN_API void dsn_rpc_call(dsn_address_t server, dsn_task_t rpc_call)
client invokes the RPC call
DSN_API void dsn_file_write_vector(dsn_handle_t file, const dsn_file_buffer_t *buffers, int buffer_count, uint64_t offset, dsn_task_t cb)
write file asynchronously with vector buffers
Definition: api_layer1.h:939
DSN_API uint64_t dsn_random64(uint64_t min, uint64_t max)
return [min, max]
DSN_API void dsn_file_write(dsn_handle_t file, const char *buffer, int count, uint64_t offset, dsn_task_t cb)
write file asynchronously
Definition: auto_codes.h:303
DSN_API dsn_task_t dsn_task_create_ex(dsn_task_code_t code, dsn_task_handler_t cb, dsn_task_cancelled_handler_t on_cancel, void *context, int hash DEFAULT(0), dsn_task_tracker_t tracker DEFAULT(nullptr))
similar to dsn_task_create, except an on_cancel callback is provided to be executed when the task is ...
Definition: clientlet.h:111
DSN_API dsn_task_t dsn_task_create_timer_ex(dsn_task_code_t code, dsn_task_handler_t cb, dsn_task_cancelled_handler_t on_cancel, void *context, int hash, int interval_milliseconds, dsn_task_tracker_t tracker DEFAULT(nullptr))
similar to dsn_task_create_timer, except an on_cancel callback is provided to be executed when the ta...
Definition: task_helper.h:204
DSN_API void dsn_rpc_call_one_way(dsn_address_t server, dsn_message_t request)
one-way RPC from client, no rpc response is expected
Definition: clientlet.h:48
Definition: clientlet.h:80
__inline uint32_t dsn_random32(uint32_t min, uint32_t max)
return [min, max]
Definition: api_layer1.h:1141
DSN_API dsn_message_t dsn_msg_create_request(dsn_task_code_t rpc_code, int timeout_milliseconds DEFAULT(0), uint64_t hash DEFAULT(0))
create a rpc request message
DSN_API dsn_task_t dsn_rpc_create_response_task_ex(dsn_message_t request, dsn_rpc_response_handler_t cb, dsn_task_cancelled_handler_t on_cancel, void *context, int reply_thread_hash DEFAULT(0), dsn_task_tracker_t tracker DEFAULT(nullptr))
create a callback task to handle the response message from RPC server, or timeout.
Definition: clientlet.h:73
Definition: task_helper.h:149
Definition: task_helper.h:252
DSN_API void dsn_file_read(dsn_handle_t file, char *buffer, int count, uint64_t offset, dsn_task_t cb)
read file asynchronously