Skip to content

Commit 67da0ee

Browse files
committed
Remove usages of deprecated in ktor APIs
* introduce BufferPool as a placeholder for something that will (or will not) available after migration to ktor 3.0 and kotlinx-io
1 parent 37315f8 commit 67da0ee

File tree

78 files changed

+366
-428
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

78 files changed

+366
-428
lines changed

rsocket-core/api/rsocket-core.api

Lines changed: 45 additions & 46 deletions
Large diffs are not rendered by default.

rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/Connection.kt

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2015-2022 the original author or authors.
2+
* Copyright 2015-2024 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -17,28 +17,25 @@
1717
package io.rsocket.kotlin
1818

1919
import io.ktor.utils.io.core.*
20-
import io.ktor.utils.io.core.internal.*
21-
import io.ktor.utils.io.pool.*
2220
import io.rsocket.kotlin.frame.*
2321
import io.rsocket.kotlin.internal.*
22+
import io.rsocket.kotlin.internal.io.*
2423
import kotlinx.coroutines.*
2524

2625
/**
2726
* That interface isn't stable for inheritance.
2827
*/
2928
@TransportApi
3029
public interface Connection : CoroutineScope {
31-
public val pool: ObjectPool<ChunkBuffer> get() = ChunkBuffer.Pool
32-
3330
public suspend fun send(packet: ByteReadPacket)
3431
public suspend fun receive(): ByteReadPacket
3532
}
3633

3734
@OptIn(TransportApi::class)
38-
internal suspend inline fun <T> Connection.receiveFrame(block: (frame: Frame) -> T): T =
35+
internal suspend inline fun <T> Connection.receiveFrame(pool: BufferPool, block: (frame: Frame) -> T): T =
3936
receive().readFrame(pool).closeOnError(block)
4037

4138
@OptIn(TransportApi::class)
42-
internal suspend fun Connection.sendFrame(frame: Frame) {
39+
internal suspend fun Connection.sendFrame(pool: BufferPool, frame: Frame) {
4340
frame.toPacket(pool).closeOnError { send(it) }
4441
}

rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/core/RSocketConnector.kt

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2015-2022 the original author or authors.
2+
* Copyright 2015-2024 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -20,6 +20,7 @@ import io.rsocket.kotlin.*
2020
import io.rsocket.kotlin.frame.*
2121
import io.rsocket.kotlin.frame.io.*
2222
import io.rsocket.kotlin.internal.*
23+
import io.rsocket.kotlin.internal.io.*
2324
import io.rsocket.kotlin.logging.*
2425
import io.rsocket.kotlin.transport.*
2526
import kotlinx.coroutines.*
@@ -32,6 +33,7 @@ public class RSocketConnector internal constructor(
3233
private val connectionConfigProvider: () -> ConnectionConfig,
3334
private val acceptor: ConnectionAcceptor,
3435
private val reconnectPredicate: ReconnectPredicate?,
36+
private val bufferPool: BufferPool,
3537
) {
3638

3739
public suspend fun connect(transport: ClientTransport): RSocket = when (reconnectPredicate) {
@@ -68,9 +70,10 @@ public class RSocketConnector internal constructor(
6870
maxFragmentSize = maxFragmentSize,
6971
interceptors = interceptors,
7072
connectionConfig = connectionConfig,
71-
acceptor = acceptor
73+
acceptor = acceptor,
74+
bufferPool = bufferPool
7275
)
73-
connection.sendFrame(setupFrame)
76+
connection.sendFrame(bufferPool, setupFrame)
7477
return requester
7578
} catch (cause: Throwable) {
7679
connectionConfig.setupPayload.close()
@@ -82,5 +85,5 @@ public class RSocketConnector internal constructor(
8285

8386
private fun Connection.wrapConnection(): Connection =
8487
interceptors.wrapConnection(this)
85-
.logging(loggerFactory.logger("io.rsocket.kotlin.frame"))
88+
.logging(loggerFactory.logger("io.rsocket.kotlin.frame"), bufferPool)
8689
}

rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/core/RSocketConnectorBuilder.kt

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2015-2022 the original author or authors.
2+
* Copyright 2015-2024 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -18,6 +18,7 @@ package io.rsocket.kotlin.core
1818

1919
import io.rsocket.kotlin.*
2020
import io.rsocket.kotlin.internal.*
21+
import io.rsocket.kotlin.internal.io.*
2122
import io.rsocket.kotlin.keepalive.*
2223
import io.rsocket.kotlin.logging.*
2324
import io.rsocket.kotlin.payload.*
@@ -35,6 +36,9 @@ public class RSocketConnectorBuilder internal constructor() {
3536
field = value
3637
}
3738

39+
@Deprecated("Only for tests in rsocket", level = DeprecationLevel.ERROR)
40+
public var bufferPool: BufferPool = BufferPool.Default
41+
3842
private val connectionConfig: ConnectionConfigBuilder = ConnectionConfigBuilder()
3943
private val interceptors: InterceptorsBuilder = InterceptorsBuilder()
4044
private var acceptor: ConnectionAcceptor? = null
@@ -108,7 +112,8 @@ public class RSocketConnectorBuilder internal constructor() {
108112
interceptors.build(),
109113
connectionConfig.producer(),
110114
acceptor ?: defaultAcceptor,
111-
reconnectPredicate
115+
reconnectPredicate,
116+
@Suppress("DEPRECATION_ERROR") bufferPool
112117
)
113118

114119
private companion object {

rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/core/RSocketServer.kt

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2015-2022 the original author or authors.
2+
* Copyright 2015-2024 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -20,6 +20,7 @@ import io.rsocket.kotlin.*
2020
import io.rsocket.kotlin.frame.*
2121
import io.rsocket.kotlin.frame.io.*
2222
import io.rsocket.kotlin.internal.*
23+
import io.rsocket.kotlin.internal.io.*
2324
import io.rsocket.kotlin.logging.*
2425
import io.rsocket.kotlin.transport.*
2526
import kotlinx.coroutines.*
@@ -29,6 +30,7 @@ public class RSocketServer internal constructor(
2930
private val loggerFactory: LoggerFactory,
3031
private val maxFragmentSize: Int,
3132
private val interceptors: Interceptors,
33+
private val bufferPool: BufferPool,
3234
) {
3335

3436
@DelicateCoroutinesApi
@@ -47,7 +49,7 @@ public class RSocketServer internal constructor(
4749
}
4850
}
4951

50-
private suspend fun Connection.bind(acceptor: ConnectionAcceptor): Job = receiveFrame { setupFrame ->
52+
private suspend fun Connection.bind(acceptor: ConnectionAcceptor): Job = receiveFrame(bufferPool) { setupFrame ->
5153
when {
5254
setupFrame !is SetupFrame -> failSetup(RSocketError.Setup.Invalid("Invalid setup frame: ${setupFrame.type}"))
5355
setupFrame.version != Version.Current -> failSetup(RSocketError.Setup.Unsupported("Unsupported version: ${setupFrame.version}"))
@@ -64,7 +66,8 @@ public class RSocketServer internal constructor(
6466
payloadMimeType = setupFrame.payloadMimeType,
6567
setupPayload = setupFrame.payload
6668
),
67-
acceptor = acceptor
69+
acceptor = acceptor,
70+
bufferPool = bufferPool
6871
)
6972
coroutineContext.job
7073
} catch (e: Throwable) {
@@ -75,13 +78,13 @@ public class RSocketServer internal constructor(
7578

7679
@Suppress("SuspendFunctionOnCoroutineScope")
7780
private suspend fun Connection.failSetup(error: RSocketError.Setup): Nothing {
78-
sendFrame(ErrorFrame(0, error))
81+
sendFrame(bufferPool, ErrorFrame(0, error))
7982
cancel("Connection establishment failed", error)
8083
throw error
8184
}
8285

8386
private fun Connection.wrapConnection(): Connection =
8487
interceptors.wrapConnection(this)
85-
.logging(loggerFactory.logger("io.rsocket.kotlin.frame"))
88+
.logging(loggerFactory.logger("io.rsocket.kotlin.frame"), bufferPool)
8689

8790
}

rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/core/RSocketServerBuilder.kt

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2015-2022 the original author or authors.
2+
* Copyright 2015-2024 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -17,6 +17,7 @@
1717
package io.rsocket.kotlin.core
1818

1919
import io.rsocket.kotlin.*
20+
import io.rsocket.kotlin.internal.io.*
2021
import io.rsocket.kotlin.logging.*
2122

2223
public class RSocketServerBuilder internal constructor() {
@@ -30,14 +31,22 @@ public class RSocketServerBuilder internal constructor() {
3031
field = value
3132
}
3233

34+
@Deprecated("Only for tests in rsocket", level = DeprecationLevel.ERROR)
35+
public var bufferPool: BufferPool = BufferPool.Default
36+
3337
private val interceptors: InterceptorsBuilder = InterceptorsBuilder()
3438

3539
public fun interceptors(configure: InterceptorsBuilder.() -> Unit) {
3640
interceptors.configure()
3741
}
3842

3943
@OptIn(RSocketLoggingApi::class)
40-
internal fun build(): RSocketServer = RSocketServer(loggerFactory, maxFragmentSize, interceptors.build())
44+
internal fun build(): RSocketServer = RSocketServer(
45+
loggerFactory,
46+
maxFragmentSize,
47+
interceptors.build(),
48+
@Suppress("DEPRECATION_ERROR") bufferPool
49+
)
4150
}
4251

4352
public fun RSocketServer(configure: RSocketServerBuilder.() -> Unit = {}): RSocketServer {

rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/frame/ExtensionFrame.kt

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2015-2022 the original author or authors.
2+
* Copyright 2015-2024 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -17,9 +17,8 @@
1717
package io.rsocket.kotlin.frame
1818

1919
import io.ktor.utils.io.core.*
20-
import io.ktor.utils.io.core.internal.*
21-
import io.ktor.utils.io.pool.*
2220
import io.rsocket.kotlin.frame.io.*
21+
import io.rsocket.kotlin.internal.io.*
2322
import io.rsocket.kotlin.payload.*
2423

2524
internal class ExtensionFrame(
@@ -49,7 +48,7 @@ internal class ExtensionFrame(
4948
}
5049
}
5150

52-
internal fun ByteReadPacket.readExtension(pool: ObjectPool<ChunkBuffer>, streamId: Int, flags: Int): ExtensionFrame {
51+
internal fun ByteReadPacket.readExtension(pool: BufferPool, streamId: Int, flags: Int): ExtensionFrame {
5352
val extendedType = readInt()
5453
val payload = readPayload(pool, flags)
5554
return ExtensionFrame(streamId, extendedType, payload)

rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/frame/Frame.kt

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2015-2022 the original author or authors.
2+
* Copyright 2015-2024 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -17,9 +17,8 @@
1717
package io.rsocket.kotlin.frame
1818

1919
import io.ktor.utils.io.core.*
20-
import io.ktor.utils.io.core.internal.*
21-
import io.ktor.utils.io.pool.*
2220
import io.rsocket.kotlin.frame.io.*
21+
import io.rsocket.kotlin.internal.io.*
2322

2423
private const val FlagsMask: Int = 1023
2524
private const val FrameTypeShift: Int = 10
@@ -33,9 +32,9 @@ internal sealed class Frame : Closeable {
3332
protected abstract fun StringBuilder.appendFlags()
3433
protected abstract fun StringBuilder.appendSelf()
3534

36-
internal fun toPacket(pool: ObjectPool<ChunkBuffer>): ByteReadPacket {
35+
internal fun toPacket(pool: BufferPool): ByteReadPacket {
3736
check(type.canHaveMetadata || !(flags check Flags.Metadata)) { "bad value for metadata flag" }
38-
return buildPacket(pool) {
37+
return pool.buildPacket {
3938
writeInt(streamId)
4039
writeShort((type.encodedType shl FrameTypeShift or flags).toShort())
4140
writeSelf()
@@ -54,7 +53,7 @@ internal sealed class Frame : Closeable {
5453
}
5554
}
5655

57-
internal fun ByteReadPacket.readFrame(pool: ObjectPool<ChunkBuffer>): Frame = use {
56+
internal fun ByteReadPacket.readFrame(pool: BufferPool): Frame = use {
5857
val streamId = readInt()
5958
val typeAndFlags = readShort().toInt() and 0xFFFF
6059
val flags = typeAndFlags and FlagsMask
@@ -75,9 +74,11 @@ internal fun ByteReadPacket.readFrame(pool: ObjectPool<ChunkBuffer>): Frame = us
7574
FrameType.RequestFnF,
7675
FrameType.RequestResponse,
7776
-> readRequest(pool, type, streamId, flags, withInitial = false)
77+
7878
FrameType.RequestStream,
7979
FrameType.RequestChannel,
8080
-> readRequest(pool, type, streamId, flags, withInitial = true)
81+
8182
FrameType.Reserved -> error("Reserved")
8283
}
8384
}

rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/frame/KeepAliveFrame.kt

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2015-2022 the original author or authors.
2+
* Copyright 2015-2024 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -17,9 +17,8 @@
1717
package io.rsocket.kotlin.frame
1818

1919
import io.ktor.utils.io.core.*
20-
import io.ktor.utils.io.core.internal.*
21-
import io.ktor.utils.io.pool.*
2220
import io.rsocket.kotlin.frame.io.*
21+
import io.rsocket.kotlin.internal.io.*
2322

2423
private const val RespondFlag = 128
2524

@@ -51,7 +50,7 @@ internal class KeepAliveFrame(
5150
}
5251
}
5352

54-
internal fun ByteReadPacket.readKeepAlive(pool: ObjectPool<ChunkBuffer>, flags: Int): KeepAliveFrame {
53+
internal fun ByteReadPacket.readKeepAlive(pool: BufferPool, flags: Int): KeepAliveFrame {
5554
val respond = flags check RespondFlag
5655
val lastPosition = readLong()
5756
val data = readPacket(pool)

rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/frame/LeaseFrame.kt

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2015-2022 the original author or authors.
2+
* Copyright 2015-2024 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -17,9 +17,8 @@
1717
package io.rsocket.kotlin.frame
1818

1919
import io.ktor.utils.io.core.*
20-
import io.ktor.utils.io.core.internal.*
21-
import io.ktor.utils.io.pool.*
2220
import io.rsocket.kotlin.frame.io.*
21+
import io.rsocket.kotlin.internal.io.*
2322

2423
internal class LeaseFrame(
2524
val ttl: Int,
@@ -50,7 +49,7 @@ internal class LeaseFrame(
5049
}
5150
}
5251

53-
internal fun ByteReadPacket.readLease(pool: ObjectPool<ChunkBuffer>, flags: Int): LeaseFrame {
52+
internal fun ByteReadPacket.readLease(pool: BufferPool, flags: Int): LeaseFrame {
5453
val ttl = readInt()
5554
val numberOfRequests = readInt()
5655
val metadata = if (flags check Flags.Metadata) readMetadata(pool) else null

0 commit comments

Comments
 (0)