3232
3333#include " marshal.h"
3434#include " network/connection_handler_task.h"
35+ #include " network/network_io_wrappers.h"
3536#include " network_state.h"
3637#include " protocol_handler.h"
3738
4142namespace peloton {
4243namespace network {
4344
44- // TODO(tianyu) This class is not refactored in full as rewriting the logic is
45- // not cost-effective. However, readability
46- // improvement and other changes may become desirable in the future. Other than
47- // code clutter, responsibility assignment
48- // is not well thought-out in this class. Abstracting out some type of socket
49- // wrapper would be nice.
5045/* *
5146 * @brief A ConnectionHandle encapsulates all information about a client
52- * connection for its entire duration.
53- * One should not use the constructor to construct a new ConnectionHandle
54- * instance every time as it is expensive
55- * to allocate buffers. Instead, use the ConnectionHandleFactory.
56- *
57- * @see ConnectionHandleFactory
47+ * connection for its entire duration. This includes a state machine and the
48+ * necessary libevent infrastructure for a handler to work on this connection.
5849 */
5950class ConnectionHandle {
6051 public:
6152 /* *
62- * Update the existing event to listen to the passed flags
53+ * Constructs a new ConnectionHandle
54+ * @param sock_fd Client's connection fd
55+ * @param handler The handler responsible for this handle
6356 */
64- void UpdateEventFlags ( short flags );
57+ ConnectionHandle ( int sock_fd, ConnectionHandlerTask *handler );
6558
66- WriteState WritePackets ();
67-
68- std::string WriteBufferToString ();
59+ /* *
60+ * @brief Signal to libevent that this ConnectionHandle is ready to handle
61+ * events
62+ *
63+ * This method needs to be called separately after initialization for the
64+ * connection handle to do anything. The reason why this is not performed in
65+ * the constructor is because it publishes pointers to this object. While the
66+ * object should be fully initialized at that point, it's never a bad idea
67+ * to be careful.
68+ */
69+ inline void RegisterToReceiveEvents () {
70+ workpool_event_ = conn_handler_->RegisterManualEvent (
71+ METHOD_AS_CALLBACK (ConnectionHandle, HandleEvent), this );
72+
73+ // TODO(Tianyi): should put the initialization else where.. check
74+ // correctness first.
75+ tcop_.SetTaskCallback (
76+ [](void *arg) {
77+ struct event *event = static_cast <struct event *>(arg);
78+ event_active (event, EV_WRITE, 0 );
79+ },
80+ workpool_event_);
81+
82+ network_event_ = conn_handler_->RegisterEvent (
83+ io_wrapper_->GetSocketFd (), EV_READ | EV_PERSIST,
84+ METHOD_AS_CALLBACK (ConnectionHandle, HandleEvent), this );
85+ }
6986
87+ /* *
88+ * Handles a libevent event. This simply delegates the the state machine.
89+ */
7090 inline void HandleEvent (int , short ) {
7191 state_machine_.Accept (Transition::WAKEUP, *this );
7292 }
7393
74- // Exposed for testing
75- const std::unique_ptr<ProtocolHandler> &GetProtocolHandler () const {
76- return protocol_handler_;
77- }
94+ /* State Machine Actions */
95+ // TODO(Tianyu): Write some documentation when feeling like it
96+ inline Transition TryRead () { return io_wrapper_->FillReadBuffer (); }
97+ Transition TryWrite ();
98+ Transition Process ();
99+ Transition GetResult ();
100+ Transition TrySslHandshake ();
101+ Transition TryCloseConnection ();
78102
79- // State Machine actions
80103 /* *
81- * refill_read_buffer - Used to repopulate read buffer with a fresh
82- * batch of data from the socket
104+ * Updates the event flags of the network event. This configures how the
105+ * handler reacts to client activity from this connection.
106+ * @param flags new flags for the event handle.
83107 */
84- Transition FillReadBuffer ();
85- Transition Wait ();
86- Transition Process ();
87- Transition ProcessWrite ( );
88- Transition GetResult ();
89- Transition CloseSocket ();
108+ inline void UpdateEventFlags ( short flags) {
109+ conn_handler_-> UpdateEvent (
110+ network_event_, io_wrapper_-> GetSocketFd (), flags,
111+ METHOD_AS_CALLBACK (ConnectionHandle, HandleEvent), this );
112+ }
113+
90114 /* *
91- * Flush out all the responses and do real SSL handshake
115+ * Stops receiving network events from client connection. This is useful when
116+ * we are waiting on peloton to return the result of a query and not handling
117+ * client query.
92118 */
93- Transition ProcessWrite_SSLHandshake ();
119+ inline void StopReceivingNetworkEvent () {
120+ EventUtil::EventDel (network_event_);
121+ }
94122
95123 private:
96124 /* *
@@ -145,55 +173,7 @@ class ConnectionHandle {
145173 };
146174
147175 friend class StateMachine ;
148- friend class ConnectionHandleFactory ;
149-
150- ConnectionHandle (int sock_fd, ConnectionHandlerTask *handler,
151- std::shared_ptr<Buffer> rbuf, std::shared_ptr<Buffer> wbuf);
152-
153- /* *
154- * Writes a packet's header (type, size) into the write buffer
155- */
156- WriteState BufferWriteBytesHeader (OutputPacket *pkt);
157-
158- /* *
159- * Writes a packet's content into the write buffer
160- */
161- WriteState BufferWriteBytesContent (OutputPacket *pkt);
162-
163- /* *
164- * Used to invoke a write into the Socket, returns false if the socket is not
165- * ready for write
166- */
167- WriteState FlushWriteBuffer ();
168-
169- /* *
170- * @brief: process SSL handshake to generate valid SSL
171- * connection context for further communications
172- * @return FINISH when the SSL handshake failed
173- * PROCEED when the SSL handshake success
174- * NEED_DATA when the SSL handshake is partially done due to network
175- * latency
176- */
177- Transition SSLHandshake ();
178-
179- /* *
180- * Set the socket to non-blocking mode
181- */
182- inline void SetNonBlocking (evutil_socket_t fd) {
183- auto flags = fcntl (fd, F_GETFL);
184- flags |= O_NONBLOCK;
185- if (fcntl (fd, F_SETFL, flags) < 0 ) {
186- LOG_ERROR (" Failed to set non-blocking socket" );
187- }
188- }
189-
190- /* *
191- * Set TCP No Delay for lower latency
192- */
193- inline void SetTCPNoDelay (evutil_socket_t fd) {
194- int one = 1 ;
195- setsockopt (fd, IPPROTO_TCP, TCP_NODELAY, &one, sizeof one);
196- }
176+ friend class NetworkIoWrapperFactory ;
197177
198178 /* *
199179 * @brief: Determine if there is still responses in the buffer
@@ -202,27 +182,17 @@ class ConnectionHandle {
202182 */
203183 inline bool HasResponse () {
204184 return (protocol_handler_->responses_ .size () != 0 ) ||
205- (wbuf_->buf_size != 0 );
185+ (io_wrapper_-> wbuf_ ->size_ != 0 );
206186 }
207187
208- int sock_fd_; // socket file descriptor
209- struct event *network_event = nullptr ; // something to read from network
210- struct event *workpool_event = nullptr ; // worker thread done the job
211-
212- SSL *conn_SSL_context = nullptr ; // SSL context for the connection
213-
214- ConnectionHandlerTask *handler_; // reference to the network thread
215- std::unique_ptr<ProtocolHandler>
216- protocol_handler_; // Stores state for this socket
217- tcop::TrafficCop traffic_cop_;
218-
219- std::shared_ptr<Buffer> rbuf_; // Socket's read buffer
220- std::shared_ptr<Buffer> wbuf_; // Socket's write buffer
221- unsigned int next_response_ = 0 ; // The next response in the response buffer
222-
188+ ConnectionHandlerTask *conn_handler_;
189+ std::shared_ptr<NetworkIoWrapper> io_wrapper_;
223190 StateMachine state_machine_;
224-
225- short curr_event_flag_; // current libevent event flag
191+ struct event *network_event_ = nullptr , *workpool_event_ = nullptr ;
192+ std::unique_ptr<ProtocolHandler> protocol_handler_ = nullptr ;
193+ tcop::TrafficCop tcop_;
194+ // TODO(Tianyu): Put this into protocol handler in a later refactor
195+ unsigned int next_response_ = 0 ;
226196};
227197} // namespace network
228198} // namespace peloton
0 commit comments