@@ -131,22 +131,29 @@ handle_info(#'basic.cancel'{}, State) ->
131
131
handle_info ({'EXIT' , _Conn , Reason }, State ) ->
132
132
{stop , {connection_died , Reason }, State };
133
133
134
- handle_info ({inet_reply , _Ref , ok }, State ) ->
135
- {noreply , State , hibernate };
136
-
137
- handle_info ({inet_async , Sock , _Ref , {ok , Data }},
138
- State = # state { socket = Sock , connection_state = blocked }) ->
134
+ handle_info ({Tag , Sock , Data },
135
+ State = # state { socket = Sock , connection_state = blocked })
136
+ when Tag =:= tcp ; Tag =:= ssl ->
139
137
{noreply , State # state { deferred_recv = Data }, hibernate };
140
138
141
- handle_info ({inet_async , Sock , _Ref , {ok , Data }},
142
- State = # state { socket = Sock , connection_state = running }) ->
139
+ handle_info ({Tag , Sock , Data },
140
+ State = # state { socket = Sock , connection_state = running })
141
+ when Tag =:= tcp ; Tag =:= ssl ->
143
142
process_received_bytes (
144
143
Data , control_throttle (State # state { await_recv = false }));
145
144
146
- handle_info ({inet_async , _Sock , _Ref , {error , Reason }}, State = # state {}) ->
145
+ handle_info ({Tag , Sock }, State = # state {socket = Sock })
146
+ when Tag =:= tcp_closed ; Tag =:= ssl_closed ->
147
+ network_error (closed , State );
148
+
149
+ handle_info ({Tag , Sock , Reason }, State = # state {socket = Sock })
150
+ when Tag =:= tcp_error ; Tag =:= ssl_error ->
147
151
network_error (Reason , State );
148
152
149
- handle_info ({inet_reply , _Sock , {error , Reason }}, State = # state {}) ->
153
+ handle_info ({inet_reply , Sock , ok }, State = # state {socket = Sock }) ->
154
+ {noreply , State , hibernate };
155
+
156
+ handle_info ({inet_reply , Sock , {error , Reason }}, State = # state {socket = Sock }) ->
150
157
network_error (Reason , State );
151
158
152
159
handle_info ({conserve_resources , Conserve }, State ) ->
@@ -348,7 +355,7 @@ run_socket(State = #state{ deferred_recv = Data }) when Data =/= undefined ->
348
355
run_socket (State = # state { await_recv = true }) ->
349
356
State ;
350
357
run_socket (State = # state { socket = Sock }) ->
351
- rabbit_net :async_recv (Sock , 0 , infinity ),
358
+ rabbit_net :setopts (Sock , [{ active , once }] ),
352
359
State # state { await_recv = true }.
353
360
354
361
control_throttle (State = # state { connection_state = Flow ,
@@ -367,7 +374,7 @@ control_throttle(State = #state{ connection_state = Flow,
367
374
maybe_process_deferred_recv (State = # state { deferred_recv = undefined }) ->
368
375
{noreply , State , hibernate };
369
376
maybe_process_deferred_recv (State = # state { deferred_recv = Data , socket = Sock }) ->
370
- handle_info ({inet_async , Sock , noref , { ok , Data } },
377
+ handle_info ({tcp , Sock , Data },
371
378
State # state { deferred_recv = undefined }).
372
379
373
380
maybe_emit_stats (undefined ) ->
0 commit comments