Robust Distributed System Nucleus (rDSN)  ver 1.0.0
network.h
1 /*
2  * The MIT License (MIT)
3  *
4  * Copyright (c) 2015 Microsoft Corporation
5  *
6  * -=- Robust Distributed System Nucleus (rDSN) -=-
7  *
8  * Permission is hereby granted, free of charge, to any person obtaining a copy
9  * of this software and associated documentation files (the "Software"), to deal
10  * in the Software without restriction, including without limitation the rights
11  * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
12  * copies of the Software, and to permit persons to whom the Software is
13  * furnished to do so, subject to the following conditions:
14  *
15  * The above copyright notice and this permission notice shall be included in
16  * all copies or substantial portions of the Software.
17  *
18  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
19  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
20  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
21  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
22  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
23  * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
24  * THE SOFTWARE.
25  */
26 
27 /*
28  * Description:
29  * base interface for a network provider
30  *
31  * Revision history:
32  * Mar., 2015, @imzhenyu (Zhenyu Guo), first version
33  * xxxx-xx-xx, author, fix bug about xxx
34  */
35 
36 # pragma once
37 
38 # include <dsn/tool-api/task.h>
39 # include <dsn/utility/synchronize.h>
40 # include <dsn/tool-api/message_parser.h>
41 # include <dsn/cpp/address.h>
42 # include <dsn/utility/exp_delay.h>
43 # include <dsn/utility/dlib.h>
44 # include <atomic>
45 
46 namespace dsn {
47 
// Forward declarations. rpc_engine and service_node are only used through
// pointers in this header; task_worker_pool and task_queue are not referenced
// in the visible part of it.
48  class rpc_engine;
49  class service_node;
50  class task_worker_pool;
51  class task_queue;
// Base interface for a network provider. Concrete providers implement the
// pure-virtual calls (start, address, send_message, inject_drop_message) and
// report completely received messages through on_recv_request / on_recv_reply.
61  class network
62  {
63  public:
64  //
65  // network factory prototype
66  //
    // create<T> has the same signature as the `factory` function-pointer type
    // below; it constructs T from (srv, inner_provider) with a plain `new` and
    // returns the raw pointer.
    // NOTE(review): who deletes the returned object is not visible in this
    // header - presumably the rpc engine; confirm.
67  template <typename T> static network* create(rpc_engine* srv, network* inner_provider)
68  {
69  return new T(srv, inner_provider);
70  }
71 
72  typedef network* (*factory)(rpc_engine*, network*);
73 
74  public:
75  //
76  // srv - the rpc engine, could contain many networks there
77  // inner_provider - when not null, this network is simply a wrapper for tooling purpose (e.g., tracing)
78  // all downcalls should be redirected to the inner provider in the end
79  //
80  DSN_API network(rpc_engine* srv, network* inner_provider);
81  virtual ~network() {}
82 
83  //
84  // when client_only is true, port is faked (equal to app id for tracing purpose)
85  //
    // Returns an error_code; ctx is taken by non-const reference.
86  virtual error_code start(rpc_channel channel, int port, bool client_only, io_modifer& ctx) = 0;
87 
88  //
89  // the named address
90  //
91  virtual ::dsn::rpc_address address() = 0;
92 
93  //
94  // this is where the upper rpc engine calls down for a RPC call
95  // request - the message to be sent, all meta info (e.g., timeout, server address) are
96  // prepared ready in its header; use message_parser to extract
97  // blobs from message for sending
98  //
99  virtual void send_message(message_ex* request) = 0;
100 
101  //
102  // tools in rDSN may decide to drop this msg,
103  // in this case, the network should implement the appropriate
104  // failure model that makes this failure possible in reality
105  //
106  virtual void inject_drop_message(message_ex* msg, bool is_send) = 0;
107 
108  //
109  // utilities
110  //
111  DSN_API service_node* node() const;
112 
113  //
114  // called when network received a complete request message
115  //
116  DSN_API void on_recv_request(message_ex* msg, int delay_ms);
117 
118  //
119  // called when network received a complete reply message or network failed,
120  // if network failed, the 'msg' will be nullptr
121  //
     // NOTE(review): `id` presumably identifies the request this reply belongs
     // to - the matching logic is not visible in this header; confirm.
122  DSN_API void on_recv_reply(uint64_t id, message_ex* msg, int delay_ms);
123 
124  //
125  // create a message parser for
126  // (1) extracting blob from a RPC request message for low layer
127  // (2) parsing an incoming blob message to get the rpc_message
128  //
129  DSN_API message_parser* new_message_parser(network_header_format hdr_format);
130 
131  // for in-place new message parser
     // Returns a (message_parser::factory2, size_t) pair for the given header format.
     // NOTE(review): the size_t is presumably the parser object size needed for
     // placement construction - confirm against the definition.
132  DSN_API std::pair<message_parser::factory2, size_t> get_message_parser_info(network_header_format hdr_format);
133 
     // Read-only accessors for the protected fields declared below.
134  rpc_engine* engine() const { return _engine; }
135  int max_buffer_block_count_per_send() const { return _max_buffer_block_count_per_send; }
136  network_header_format client_hdr_format() const { return _client_hdr_format; }
137  network_header_format unknown_msg_hdr_format() const { return _unknown_msg_header_format; }
138  int message_buffer_block_size() const { return _message_buffer_block_size; }
139 
140  protected:
     // Static helper available to derived providers.
     // NOTE(review): presumably the local IPv4 address packed into 32 bits;
     // the byte order is not visible here - confirm.
141  DSN_API static uint32_t get_local_ipv4();
142 
143  protected:
     // State shared with derived providers. Initial values are set outside this
     // header (the constructor is only declared here).
144  rpc_engine *_engine;
145  network_header_format _client_hdr_format;
146  network_header_format _unknown_msg_header_format; // default is NET_HDR_INVALID
147  int _message_buffer_block_size;
148  int _max_buffer_block_count_per_send;
     // No accessor is declared for this field in this class.
149  int _send_queue_threshold;
150 
151  private:
     // Private: callable only from this class and from its friend rpc_engine.
     // NOTE(review): presumably overwrites _client_hdr_format and
     // _message_buffer_block_size - the definition is not shown; confirm.
152  friend class rpc_engine;
153  DSN_API void reset_parser_attr(network_header_format client_hdr_format, int message_buffer_block_size);
154  };
155 
160  {
     // NOTE(review): the class head (listing line 159) is missing from this view.
     // Judging by the constructor name and the `override` specifiers below, this
     // is the body of connection_oriented_network, presumably derived from
     // network - confirm against the real header.
     //
     // The class keeps one map of client sessions and one of server sessions,
     // each keyed by rpc_address and paired with its own rw lock. It stays
     // abstract: create_client_session is pure virtual.
161  public:
162  DSN_API connection_oriented_network(rpc_engine* srv, network* inner_provider);
163  virtual ~connection_oriented_network() {}
164 
165  // server session management
166  DSN_API rpc_session_ptr get_server_session(::dsn::rpc_address ep);
167  DSN_API void on_server_session_accepted(rpc_session_ptr& s);
168  DSN_API void on_server_session_disconnected(rpc_session_ptr& s);
169 
170  // client session management
171  DSN_API rpc_session_ptr get_client_session(::dsn::rpc_address ep);
172  DSN_API void on_client_session_connected(rpc_session_ptr& s);
173  DSN_API void on_client_session_disconnected(rpc_session_ptr& s);
174 
175  // called upon RPC call, rpc client session is created on demand
176  DSN_API virtual void send_message(message_ex* request) override;
177 
178  // called by rpc engine
179  DSN_API virtual void inject_drop_message(message_ex* msg, bool is_send) override;
180 
181  // to be defined
     // Hook for concrete providers: build the session object used to talk to
     // server_addr.
182  virtual rpc_session_ptr create_client_session(::dsn::rpc_address server_addr) = 0;
183 
184  protected:
     // NOTE(review): std::unordered_map is used below, but <unordered_map> is not
     // among the includes visible at the top of this header - presumably it is
     // pulled in transitively; confirm.
185  typedef std::unordered_map< ::dsn::rpc_address, rpc_session_ptr> client_sessions;
186  client_sessions _clients; // to_address => rpc_session
187  utils::rw_lock_nr _clients_lock;
188 
189  typedef std::unordered_map< ::dsn::rpc_address, rpc_session_ptr> server_sessions;
190  server_sessions _servers; // from_address => rpc_session
191  utils::rw_lock_nr _servers_lock;
192  };
193 
197  class rpc_client_matcher;
// One session with a remote address, used either on the client or on the
// server side (see is_client()). Derives from ref_counter. Concrete transports
// implement the pure-virtual hooks: close_on_fault_injection, connect, send,
// do_read.
//
// NOTE(review): this listing skips some numbered lines inside the class (e.g.
// 210 in the constructor's parameter list and 273 in the member list). net()
// returns a member `_net` whose declaration is not visible here - presumably
// the missing lines carry the connection_oriented_network reference; confirm
// against the real header.
198  class rpc_session : public ref_counter
199  {
200  public:
     // Static join points taking an rpc_session*.
     // NOTE(review): presumably fired when a session connects / disconnects -
     // the call sites are not in this header; confirm.
205  DSN_API static join_point<void, rpc_session*> on_rpc_session_connected;
206  DSN_API static join_point<void, rpc_session*> on_rpc_session_disconnected;
208  public:
209  DSN_API rpc_session(
211  ::dsn::rpc_address remote_addr,
212  message_parser_ptr& parser,
213  bool is_client
214  );
215  DSN_API virtual ~rpc_session();
216 
217  virtual void close_on_fault_injection() = 0;
218 
219  DSN_API bool has_pending_out_msgs();
220  bool is_client() const { return _is_client; }
221  ::dsn::rpc_address remote_address() const { return _remote_addr; }
222  connection_oriented_network& net() const { return _net; }
223  message_parser_ptr parser() const { return _parser; }
224  DSN_API void send_message(message_ex* msg);
225  DSN_API bool cancel(message_ex* request);
     // Defined inline after the class: raises the stored receive delay to
     // delay_ms when delay_ms is larger than the current value.
226  void delay_recv(int delay_ms);
227  DSN_API bool on_recv_message(message_ex* msg, int delay_ms);
228 
229  // for client session
230  public:
231  // return true if the socket should be closed
232  DSN_API bool on_disconnected(bool is_write);
233 
234  virtual void connect() = 0;
235 
236  // for server session
237  public:
     // read_next defaults to 256.
     // NOTE(review): presumably a byte count hint forwarded to do_read - confirm.
238  DSN_API void start_read_next(int read_next = 256);
239 
240  // should be called in do_read() before using _parser when it is nullptr.
241  // returns:
242  // -1 : prepare failed, maybe because of invalid message header type
243  // 0 : prepare succeed, _parser is not nullptr now.
244  // >0 : need read more data, returns read_next.
245  DSN_API int prepare_parser();
246 
247  // shared
248  protected:
249  //
250  // sending messages are put in _sending_msgs
251  // buffer is prepared well in _sending_buffers
252  // always call on_send_completed later
253  //
254  virtual void send(uint64_t signature) = 0;
255  virtual void do_read(int read_next) = 0;
256 
257  protected:
258  DSN_API bool try_connecting(); // return true when it is permitted
259  DSN_API void set_connected();
260  DSN_API bool set_disconnected(); // return true when it is permitted
     // These three read _connect_state directly; no lock is taken in the inline
     // bodies shown here.
261  bool is_disconnected() const { return _connect_state == SS_DISCONNECTED; }
262  bool is_connecting() const { return _connect_state == SS_CONNECTING; }
263  bool is_connected() const { return _connect_state == SS_CONNECTED; }
264  DSN_API void on_send_completed(uint64_t signature = 0); // default value for nothing is sent
265 
266  private:
267  // return whether there are messages for sending; should always be called in lock
268  DSN_API bool unlink_message_for_send();
269  DSN_API void clear_send_queue(bool resend_msgs);
270 
271  protected:
272  // constant info
274  ::dsn::rpc_address _remote_addr;
275  int _max_buffer_block_count_per_send;
276  message_reader _reader;
277  message_parser_ptr _parser;
278 
279  // messages are currently being sent
280  // also locked by _lock later
281  std::vector<message_parser::send_buf> _sending_buffers;
282  std::vector<message_ex*> _sending_msgs;
283 
284  private:
285  const bool _is_client;
286  rpc_client_matcher *_matcher;
287 
288  enum session_state
289  {
290  SS_CONNECTING,
291  SS_CONNECTED,
292  SS_DISCONNECTED
293  };
294 
295  // TODO: expose the queue to be customizable
     // NOTE(review): the "[" / "]" markers below appear to delimit the fields
     // guarded by _lock - confirm. _is_sending_next and _connect_state are
     // declared volatile; volatile by itself gives no inter-thread ordering
     // guarantees in C++, so any access outside _lock deserves a second look.
296  ::dsn::utils::ex_lock_nr _lock; // [
297  volatile bool _is_sending_next;
298  int _message_count; // count of _messages
299  dlink _messages;
300  volatile session_state _connect_state;
301  uint64_t _message_sent;
302  // ]
303 
     // Atomic; updated by delay_recv() without taking _lock.
304  std::atomic_int _delay_server_receive_ms;
305  };
306 
307  // --------- inline implementation --------------
// Raises _delay_server_receive_ms to delay_ms if delay_ms is greater than the
// value currently stored; a smaller or equal delay_ms leaves it unchanged.
// The update is a compare-and-swap retry loop on the std::atomic_int member:
// when compare_exchange_weak fails (another thread changed the value, or a
// spurious failure) it writes the current value back into old_delay_ms, so the
// `delay_ms > old_delay_ms` check is re-evaluated against fresh data.
308  inline void rpc_session::delay_recv(int delay_ms)
309  {
310  int old_delay_ms = _delay_server_receive_ms.load();
311  while (delay_ms > old_delay_ms && !_delay_server_receive_ms.compare_exchange_weak(old_delay_ms, delay_ms)) {}
312  }
313 
315 }
network bound to a specific rpc_channel and port (see start) !!! all threads must be started with tas...
Definition: network.h:61
Definition: network.h:198
Definition: auto_codes.h:303
Definition: task_spec.h:184
Definition: address.h:52
an incomplete network implementation for connection oriented network, e.g., TCP
Definition: network.h:159
Definition: address.h:61