1717package io.rsocket.kotlin.core
1818
1919import app.cash.turbine.*
20+ import io.ktor.utils.io.core.*
2021import io.rsocket.kotlin.*
2122import io.rsocket.kotlin.logging.*
2223import io.rsocket.kotlin.payload.*
@@ -54,7 +55,7 @@ class ReconnectableRSocketTest : SuspendTest, TestWithLeakCheck {
5455 val connect: suspend () -> RSocket = {
5556 if (first.value) {
5657 first.value = false
57- rrHandler (firstJob)
58+ handler (firstJob)
5859 } else {
5960 error(" Failed to connect" )
6061 }
@@ -89,7 +90,7 @@ class ReconnectableRSocketTest : SuspendTest, TestWithLeakCheck {
8990 first.value = false
9091 error(" Failed to connect" )
9192 } else {
92- rrHandler (handlerJob)
93+ handler (handlerJob)
9394 }
9495 }
9596 val rSocket = ReconnectableRSocket (logger, connect) { cause, attempt ->
@@ -114,7 +115,7 @@ class ReconnectableRSocketTest : SuspendTest, TestWithLeakCheck {
114115 error(" Failed to connect" )
115116 } else {
116117 delay(200 ) // emulate connection establishment
117- rrHandler (Job ())
118+ handler (Job ())
118119 }
119120 }
120121 val rSocket = ReconnectableRSocket (logger, connect) { cause, attempt ->
@@ -137,13 +138,13 @@ class ReconnectableRSocketTest : SuspendTest, TestWithLeakCheck {
137138 when {
138139 first.value -> {
139140 first.value = false
140- rrHandler (firstJob) // first connection
141+ handler (firstJob) // first connection
141142 }
142143 fails.value < 5 -> {
143144 delay(100 )
144145 error(" Failed to connect" )
145146 }
146- else -> rrHandler (Job ())
147+ else -> handler (Job ())
147148 }
148149 }
149150 val rSocket = ReconnectableRSocket (logger, connect) { cause, attempt ->
@@ -170,13 +171,13 @@ class ReconnectableRSocketTest : SuspendTest, TestWithLeakCheck {
170171 when {
171172 first.value -> {
172173 first.value = false
173- streamHandler (firstJob) // first connection
174+ handler (firstJob) // first connection
174175 }
175176 fails.value < 5 -> {
176177 delay(100 )
177178 error(" Failed to connect" )
178179 }
179- else -> streamHandler (Job ())
180+ else -> handler (Job ())
180181 }
181182 }
182183 val rSocket = ReconnectableRSocket (logger, connect) { cause, attempt ->
@@ -206,8 +207,52 @@ class ReconnectableRSocketTest : SuspendTest, TestWithLeakCheck {
206207 assertEquals(5 , fails.value)
207208 }
208209
209- private fun rrHandler (job : Job ): RSocket = RSocketRequestHandler (job) { requestResponse { it } }
210- private fun streamHandler (job : Job ): RSocket = RSocketRequestHandler (job) {
210+ @Test
211+ fun testNoLeakMetadataPush () = testNoLeaksInteraction { metadataPush(it.data) }
212+
213+ @Test
214+ fun testNoLeakFireAndForget () = testNoLeaksInteraction { fireAndForget(it) }
215+
216+ @Test
217+ fun testNoLeakRequestResponse () = testNoLeaksInteraction { requestResponse(it) }
218+
219+ @Test
220+ fun testNoLeakRequestStream () = testNoLeaksInteraction { requestStream(it).collect() }
221+
222+ private inline fun testNoLeaksInteraction (crossinline interaction : suspend RSocket .(payload: Payload ) -> Unit ) = test {
223+ val firstJob = Job ()
224+ val connect: suspend () -> RSocket = {
225+ if (first.compareAndSet(true , false )) {
226+ handler(firstJob)
227+ } else {
228+ error(" Failed to connect" )
229+ }
230+ }
231+ val rSocket = ReconnectableRSocket (logger, connect) { _, attempt ->
232+ delay(100 )
233+ attempt < 5
234+ }
235+
236+ rSocket.requestResponse(Payload .Empty ) // first request to be sure, that connected
237+ firstJob.cancelAndJoin() // cancel
238+
239+ val p = payload(" text" )
240+ assertFails {
241+ rSocket.interaction(p) // test release on reconnecting
242+ }
243+ assertTrue(p.data.isEmpty)
244+
245+ val p2 = payload(" text" )
246+ assertFails {
247+ rSocket.interaction(p2) // test release on failed
248+ }
249+ assertTrue(p2.data.isEmpty)
250+ }
251+
252+ private fun handler (job : Job ): RSocket = RSocketRequestHandler (job) {
253+ requestResponse { payload ->
254+ payload
255+ }
211256 requestStream {
212257 flow {
213258 repeat(5 ) {
0 commit comments