38 # include <dsn/tool-api/task.h> 39 # include <dsn/utility/synchronize.h> 40 # include <dsn/tool-api/message_parser.h> 41 # include <dsn/cpp/address.h> 42 # include <dsn/utility/exp_delay.h> 43 # include <dsn/utility/dlib.h> 50 class task_worker_pool;
67 template <
typename T>
static network* create(rpc_engine* srv,
network* inner_provider)
69 return new T(srv, inner_provider);
91 virtual ::dsn::rpc_address address() = 0;
99 virtual void send_message(message_ex* request) = 0;
106 virtual void inject_drop_message(message_ex* msg,
bool is_send) = 0;
111 DSN_API service_node* node()
const;
116 DSN_API
void on_recv_request(message_ex* msg,
int delay_ms);
122 DSN_API
void on_recv_reply(uint64_t
id, message_ex* msg,
int delay_ms);
129 DSN_API message_parser* new_message_parser(network_header_format hdr_format);
132 DSN_API std::pair<message_parser::factory2, size_t> get_message_parser_info(network_header_format hdr_format);
134 rpc_engine* engine()
const {
return _engine; }
135 int max_buffer_block_count_per_send()
const {
return _max_buffer_block_count_per_send; }
136 network_header_format client_hdr_format()
const {
return _client_hdr_format; }
137 network_header_format unknown_msg_hdr_format()
const {
return _unknown_msg_header_format; }
138 int message_buffer_block_size()
const {
return _message_buffer_block_size; }
141 DSN_API
static uint32_t get_local_ipv4();
145 network_header_format _client_hdr_format;
146 network_header_format _unknown_msg_header_format;
147 int _message_buffer_block_size;
148 int _max_buffer_block_count_per_send;
149 int _send_queue_threshold;
152 friend class rpc_engine;
153 DSN_API
void reset_parser_attr(network_header_format client_hdr_format,
int message_buffer_block_size);
163 virtual ~connection_oriented_network() {}
167 DSN_API
void on_server_session_accepted(rpc_session_ptr& s);
168 DSN_API
void on_server_session_disconnected(rpc_session_ptr& s);
172 DSN_API
void on_client_session_connected(rpc_session_ptr& s);
173 DSN_API
void on_client_session_disconnected(rpc_session_ptr& s);
176 DSN_API
virtual void send_message(message_ex* request)
override;
179 DSN_API
virtual void inject_drop_message(message_ex* msg,
bool is_send)
override;
182 virtual rpc_session_ptr create_client_session(::
dsn::rpc_address server_addr) = 0;
185 typedef std::unordered_map< ::dsn::rpc_address, rpc_session_ptr> client_sessions;
186 client_sessions _clients;
187 utils::rw_lock_nr _clients_lock;
189 typedef std::unordered_map< ::dsn::rpc_address, rpc_session_ptr> server_sessions;
190 server_sessions _servers;
191 utils::rw_lock_nr _servers_lock;
197 class rpc_client_matcher;
205 DSN_API
static join_point<void, rpc_session*> on_rpc_session_connected;
206 DSN_API
static join_point<void, rpc_session*> on_rpc_session_disconnected;
212 message_parser_ptr& parser,
215 DSN_API
virtual ~rpc_session();
217 virtual void close_on_fault_injection() = 0;
219 DSN_API
bool has_pending_out_msgs();
220 bool is_client()
const {
return _is_client; }
223 message_parser_ptr parser()
const {
return _parser; }
224 DSN_API
void send_message(message_ex* msg);
225 DSN_API
bool cancel(message_ex* request);
226 void delay_recv(
int delay_ms);
227 DSN_API
bool on_recv_message(message_ex* msg,
int delay_ms);
232 DSN_API
bool on_disconnected(
bool is_write);
234 virtual void connect() = 0;
238 DSN_API
void start_read_next(
int read_next = 256);
245 DSN_API
int prepare_parser();
254 virtual void send(uint64_t signature) = 0;
255 virtual void do_read(
int read_next) = 0;
258 DSN_API
bool try_connecting();
259 DSN_API
void set_connected();
260 DSN_API
bool set_disconnected();
261 bool is_disconnected()
const {
return _connect_state == SS_DISCONNECTED; }
262 bool is_connecting()
const {
return _connect_state == SS_CONNECTING; }
263 bool is_connected()
const {
return _connect_state == SS_CONNECTED; }
264 DSN_API
void on_send_completed(uint64_t signature = 0);
268 DSN_API
bool unlink_message_for_send();
269 DSN_API
void clear_send_queue(
bool resend_msgs);
275 int _max_buffer_block_count_per_send;
276 message_reader _reader;
277 message_parser_ptr _parser;
281 std::vector<message_parser::send_buf> _sending_buffers;
282 std::vector<message_ex*> _sending_msgs;
285 const bool _is_client;
286 rpc_client_matcher *_matcher;
296 ::dsn::utils::ex_lock_nr _lock;
297 volatile bool _is_sending_next;
300 volatile session_state _connect_state;
301 uint64_t _message_sent;
304 std::atomic_int _delay_server_receive_ms;
308 inline void rpc_session::delay_recv(
int delay_ms)
310 int old_delay_ms = _delay_server_receive_ms.load();
311 while (delay_ms > old_delay_ms && !_delay_server_receive_ms.compare_exchange_weak(old_delay_ms, delay_ms)) {}
network bound to a specific rpc_channel and port (see start) !!! all threads must be started with tas...
Definition: network.h:61
Definition: network.h:198
Definition: auto_codes.h:303
Definition: task_spec.h:184
an incomplete network implementation for connection oriented network, e.g., TCP
Definition: network.h:159