Robust Distributed System Nucleus (rDSN)  ver 1.0.0
rpc_stream.h
1 /*
2  * The MIT License (MIT)
3  *
4  * Copyright (c) 2015 Microsoft Corporation
5  *
6  * -=- Robust Distributed System Nucleus (rDSN) -=-
7  *
8  * Permission is hereby granted, free of charge, to any person obtaining a copy
9  * of this software and associated documentation files (the "Software"), to deal
10  * in the Software without restriction, including without limitation the rights
11  * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
12  * copies of the Software, and to permit persons to whom the Software is
13  * furnished to do so, subject to the following conditions:
14  *
15  * The above copyright notice and this permission notice shall be included in
16  * all copies or substantial portions of the Software.
17  *
18  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
19  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
20  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
21  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
22  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
23  * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
24  * THE SOFTWARE.
25  */
26 
27 /*
28  * Description:
29  * What is this file about?
30  *
31  * Revision history:
32  * xxxx-xx-xx, author, first version
33  * xxxx-xx-xx, author, fix bug about xxx
34  */
35 
36 # pragma once
37 
38 # include <dsn/service_api_c.h>
39 # include <dsn/cpp/blob.h>
40 # include <dsn/cpp/auto_codes.h>
41 
42 namespace dsn
43 {
48  class rpc_read_stream;
49  class rpc_write_stream;
50  typedef ::dsn::ref_ptr<rpc_read_stream> rpc_read_stream_ptr;
51  typedef ::dsn::ref_ptr<rpc_write_stream> rpc_write_stream_ptr;
52 
53  class rpc_read_stream :
54  public safe_handle<dsn_msg_release_ref>,
55  public binary_reader
56  {
57  public:
58  rpc_read_stream(dsn_message_t msg)
59  {
60  set_read_msg(msg);
61  }
62 
64  {
65  }
66 
67  void set_read_msg(dsn_message_t msg)
68  {
69  assign(msg, false);
70 
71  void* ptr;
72  size_t size;
73  bool r = dsn_msg_read_next(msg, &ptr, &size);
74  dassert(r, "read msg must have one segment of buffer ready");
75 
76  blob bb((const char*)ptr, 0, (int)size);
77  init(bb);
78  }
79 
81  {
82  if (native_handle())
83  {
84  dsn_msg_read_commit(native_handle(), (size_t)(total_size() - get_remaining_size()));
85  }
86  }
87  };
88 
90  public safe_handle<dsn_msg_release_ref>,
91  public binary_writer
92  {
93  public:
94  // for response
95  rpc_write_stream(dsn_message_t msg)
97  {
98  _last_write_next_committed = true;
99  _last_write_next_total_size = 0;
100  }
101 
102  // for request
103  rpc_write_stream(task_code code, int timeout_ms = 0, uint64_t hash = 0)
104  : safe_handle<dsn_msg_release_ref>(dsn_msg_create_request(code, timeout_ms, hash), false)
105  {
106  _last_write_next_committed = true;
107  _last_write_next_total_size = 0;
108  }
109 
110  // write buffer for rpc_write_stream is allocated from
111  // a per-thread pool, and it is expected that
112  // the per-thread pool cannot allocated two outstanding
113  // buffers at the same time.
114  // e.g., alloc1, commit1, alloc2, commit2 is ok
115  // while alloc1, alloc2, commit2, commit 1 is invalid
116  void commit_buffer()
117  {
118  if (!_last_write_next_committed)
119  {
120  dsn_msg_write_commit(native_handle(), (size_t)(total_size() - _last_write_next_total_size));
121  _last_write_next_committed = true;
122  }
123  }
124 
126  {
127  commit_buffer();
128  }
129 
130  private:
131  virtual void create_new_buffer(size_t size, /*out*/blob& bb) override
132  {
133  commit_buffer();
134 
135  void* ptr;
136  size_t sz;
137  dsn_msg_write_next(native_handle(), &ptr, &sz, size);
138  dbg_dassert(sz >= size, "allocated buffer size must be not less than the required size");
139  bb.assign((const char*)ptr, 0, (int)sz);
140 
141  _last_write_next_total_size = total_size();
142  _last_write_next_committed = false;
143  }
144 
145  private:
146  bool _last_write_next_committed;
147  int _last_write_next_total_size;
148  };
150 }
DSN_API bool dsn_msg_read_next(dsn_message_t msg, void **ptr, size_t *size)
get message read buffer
Definition: auto_codes.h:55
Definition: auto_codes.h:174
Definition: rpc_stream.h:89
DSN_API void dsn_msg_write_commit(dsn_message_t msg, size_t size)
commit the write buffer after the message content is written with the real written size ...
Definition: address.h:52
DSN_API void dsn_msg_read_commit(dsn_message_t msg, size_t size)
commit the read buffer after the message content is read with the real read size, it is possible to u...
DSN_API dsn_message_t dsn_msg_create_request(dsn_task_code_t rpc_code, int timeout_milliseconds DEFAULT(0), uint64_t hash DEFAULT(0))
create a rpc request message
Definition: rpc_stream.h:53
DSN_API void dsn_msg_write_next(dsn_message_t msg, void **ptr, size_t *size, size_t min_size)
get message write buffer