From 394626437a714aba1d313523f4020bdc7dca6837 Mon Sep 17 00:00:00 2001 From: olme04 Date: Thu, 3 Feb 2022 07:56:53 +0000 Subject: [PATCH] Update code after new MM * remove SharedImmutable annotations * rewrite IntMap to not use atomics on native * remove synchronization from IntMap code at all, and move it to StreamsStorage * remove expect/actual from FrameHandler * drop TrackingSet and move set directly to buffer poll --- .../io/rsocket/kotlin/RequestStrategy.kt | 2 - .../kotlin/io/rsocket/kotlin/frame/io/Dump.kt | 4 - .../kotlin/internal/CloseOperations.kt | 2 - .../io/rsocket/kotlin/internal/IntMap.kt | 358 +++++++++--------- .../io/rsocket/kotlin/internal/Prioritizer.kt | 2 - .../rsocket/kotlin/internal/StreamsStorage.kt | 56 +-- .../kotlin/internal/handler/FrameHandler.kt | 17 +- .../io/rsocket/kotlin/keepalive/KeepAlive.kt | 2 - .../rsocket/kotlin/payload/PayloadMimeType.kt | 2 - .../io/rsocket/kotlin/internal/IntMap.kt | 59 --- .../kotlin/internal/handler/FrameHandler.kt | 30 -- .../io/rsocket/kotlin/internal/IntMap.kt | 59 --- .../kotlin/internal/handler/FrameHandler.kt | 30 -- .../io/rsocket/kotlin/internal/IntMap.kt | 51 --- .../kotlin/internal/handler/FrameHandler.kt | 31 -- .../rsocket/kotlin/test/InUseTrackingPool.kt | 34 +- .../io/rsocket/kotlin/test/Test.common.kt | 2 + .../io/rsocket/kotlin/test/TrackingSet.kt | 55 --- .../kotlin/io/rsocket/kotlin/test/MSet.kt | 27 -- .../kotlin/io/rsocket/kotlin/test/Test.kt | 2 + .../kotlin/io/rsocket/kotlin/test/MSet.kt | 27 -- .../kotlin/io/rsocket/kotlin/test/Test.kt | 2 + .../kotlin/io/rsocket/kotlin/test/MSet.kt | 170 --------- .../kotlin/io/rsocket/kotlin/test/Test.kt | 3 + 24 files changed, 252 insertions(+), 775 deletions(-) delete mode 100644 rsocket-core/src/jsMain/kotlin/io/rsocket/kotlin/internal/IntMap.kt delete mode 100644 rsocket-core/src/jsMain/kotlin/io/rsocket/kotlin/internal/handler/FrameHandler.kt delete mode 100644 rsocket-core/src/jvmMain/kotlin/io/rsocket/kotlin/internal/IntMap.kt delete mode 100644 rsocket-core/src/jvmMain/kotlin/io/rsocket/kotlin/internal/handler/FrameHandler.kt delete mode 100644 rsocket-core/src/nativeMain/kotlin/io/rsocket/kotlin/internal/IntMap.kt delete mode 100644 rsocket-core/src/nativeMain/kotlin/io/rsocket/kotlin/internal/handler/FrameHandler.kt delete mode 100644 rsocket-test/src/commonMain/kotlin/io/rsocket/kotlin/test/TrackingSet.kt delete mode 100644 rsocket-test/src/jsMain/kotlin/io/rsocket/kotlin/test/MSet.kt delete mode 100644 rsocket-test/src/jvmMain/kotlin/io/rsocket/kotlin/test/MSet.kt delete mode 100644 rsocket-test/src/nativeMain/kotlin/io/rsocket/kotlin/test/MSet.kt diff --git a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/RequestStrategy.kt b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/RequestStrategy.kt index a18c8047d..b24bc0e9b 100644 --- a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/RequestStrategy.kt +++ b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/RequestStrategy.kt @@ -19,9 +19,7 @@ package io.rsocket.kotlin import kotlinx.atomicfu.* import kotlinx.coroutines.channels.* import kotlin.coroutines.* -import kotlin.native.concurrent.* -@SharedImmutable @ExperimentalStreamsApi private val DefaultStrategy: RequestStrategy = PrefetchStrategy(64, 16) diff --git a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/frame/io/Dump.kt b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/frame/io/Dump.kt index 2682eb34a..3b48f70ad 100644 --- a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/frame/io/Dump.kt +++ b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/frame/io/Dump.kt @@ -18,15 +18,11 @@ package io.rsocket.kotlin.frame.io import io.ktor.utils.io.core.* import io.rsocket.kotlin.payload.* -import kotlin.native.concurrent.* -@SharedImmutable private val digits = "0123456789abcdef".toCharArray() -@SharedImmutable private const val divider = "+--------+-------------------------------------------------+----------------+" -@SharedImmutable private const val header = """ +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f | diff --git a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/CloseOperations.kt b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/CloseOperations.kt index cfc79dcac..617c9dc12 100644 --- a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/CloseOperations.kt +++ b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/CloseOperations.kt @@ -18,7 +18,6 @@ package io.rsocket.kotlin.internal import io.ktor.utils.io.core.* import kotlinx.coroutines.channels.* -import kotlin.native.concurrent.* internal inline fun T.closeOnError(block: (T) -> R): R { try { @@ -29,7 +28,6 @@ internal inline fun T.closeOnError(block: (T) -> R): R { } } -@SharedImmutable private val onUndeliveredCloseable: (Closeable) -> Unit = Closeable::close @Suppress("FunctionName") diff --git a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/IntMap.kt b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/IntMap.kt index 640596ea8..f8a3c8ee1 100644 --- a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/IntMap.kt +++ b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/IntMap.kt @@ -25,223 +25,219 @@ private fun safeFindNextPositivePowerOfTwo(value: Int): Int = when { else -> 1 shl 32 - (value - 1).countLeadingZeroBits() } +//TODO decide, is it needed, or can be replaced by simple map, or concurrent map on JVM? +// do benchmarks /** - * Synchronized IntMap implementation based on Netty IntObjectHashMap. - * Uses atomics on K/N to support mutation. - * On JVM and JS no atomics used. + * IntMap implementation based on Netty IntObjectHashMap. */ -internal class IntMap(initialCapacity: Int = 8, private val loadFactor: Float = 0.5f) : SynchronizedObject() { +internal class IntMap( + initialCapacity: Int = 8, + private val loadFactor: Float = 0.5f +) : SynchronizedObject() { init { require(loadFactor > 0.0f && loadFactor <= 1.0f) { "loadFactor must be > 0 and <= 1" } } - private val store = MutableValue(MapState(0, safeFindNextPositivePowerOfTwo(initialCapacity))) + var size: Int = 0 + private set - val size: Int get() = store.value.store.size + private var capacity: Int = safeFindNextPositivePowerOfTwo(initialCapacity) + private var mask: Int = capacity - 1 - private inline fun sync(block: MapState.() -> T): T = synchronized(this) { block(store.value) } + // Clip the upper bound so that there will always be at least one available slot. + private var maxSize = min(mask, (capacity * loadFactor).toInt()) + private var keys: IntArray = IntArray(capacity) - operator fun get(key: Int): V? = sync { get(key) } - operator fun set(key: Int, value: V): V? = sync { setAndGrow(key, value, this@IntMap.store.update) } - fun remove(key: Int): V? = sync { remove(key) } - fun clear() = sync { clear() } - operator fun contains(key: Int): Boolean = sync { contains(key) } - fun values(): List = sync { values() } - fun keys(): Set = sync { keys() } + @Suppress("UNCHECKED_CAST") + private var values: Array = arrayOfNulls(capacity) as Array - private inner class MapState(size: Int, private val capacity: Int) { - private val mask = capacity - 1 + //called when capacity is updated + private fun init() { + mask = capacity - 1 // Clip the upper bound so that there will always be at least one available slot. - private val maxSize = min(mask, (capacity * loadFactor).toInt()) + maxSize = min(mask, (capacity * loadFactor).toInt()) + keys = IntArray(capacity) - val store: ValueStore = ValueStore(size, capacity) - - operator fun contains(key: Int): Boolean = indexOf(key) >= 0 - - operator fun get(key: Int): V? { - val index = indexOf(key) - if (index == -1) return null - return store.value(index) - } + @Suppress("UNCHECKED_CAST") + values = arrayOfNulls(capacity) as Array + } - fun remove(key: Int): V? { - val index = indexOf(key) - if (index == -1) return null - val prev = store.value(index) - removeAt(index) - return prev - } + operator fun contains(key: Int): Boolean = indexOf(key) >= 0 - fun setAndGrow(key: Int, value: V, grow: (newState: MapState) -> Unit): V? { - val startIndex = hashIndex(key) - var index = startIndex - while (true) { - if (store.value(index) == null) { - // Found empty slot, use it. - set(index, key, value) - growSize(grow) - return null - } - if (store.key(index) == key) { - // Found existing entry with this key, just replace the value. - val previousValue = store.value(index) - store.setValue(index, value) - return previousValue - } + operator fun get(key: Int): V? { + val index = indexOf(key) + if (index == -1) return null + return values[index] + } - // Conflict, keep probing ... - index = probeNext(index) + fun remove(key: Int): V? { + val index = indexOf(key) + if (index == -1) return null + val prev = values[index] + removeAt(index) + return prev + } - // Can only happen if the map was full at MAX_ARRAY_SIZE and couldn't grow. - check(index != startIndex) { "Unable to insert" } + operator fun set(key: Int, value: V): V? { + val startIndex = hashIndex(key) + var index = startIndex + while (true) { + if (values[index] == null) { + // Found empty slot, use it. + set(index, key, value) + grow() + return null + } + if (keys[index] == key) { + // Found existing entry with this key, just replace the value. + val previousValue = values[index] + values[index] = value + return previousValue } - } - fun clear() { - repeat(capacity, this::clear) - store.clearSize() - } + // Conflict, keep probing ... + index = probeNext(index) - fun values(): List { - val list = mutableListOf() - repeat(capacity) { - store.value(it)?.let(list::add) - } - return list.toList() + // Can only happen if the map was full at MAX_ARRAY_SIZE and couldn't grow. + check(index != startIndex) { "Unable to insert" } } + } - fun keys(): Set { - val set = mutableSetOf() - repeat(capacity) { - val key = store.key(it) - if (this@MapState.contains(key)) set.add(key) - } - return set.toSet() + fun clear() { + repeat(capacity, this::clear) + size = 0 + } + + fun values(): List { + val list = mutableListOf() + repeat(capacity) { + values[it]?.let(list::add) } + return list.toList() + } - private fun set(index: Int, key: Int, value: V?) { - store.setKey(index, key) - store.setValue(index, value) + fun keys(): Set { + val set = mutableSetOf() + repeat(capacity) { + val key = keys[it] + if (contains(key)) set.add(key) } + return set.toSet() + } - private fun clear(index: Int): Unit = set(index, 0, null) - - /** - * Returns the hashed index for the given key. - * The array lengths are always a power of two, so we can use a bitmask to stay inside the array bounds. - */ - private fun hashIndex(key: Int): Int = key and mask - - /** - * Get the next sequential index after `index` and wraps if necessary. - * The array lengths are always a power of two, so we can use a bitmask to stay inside the array bounds. - */ - private fun probeNext(index: Int): Int = index + 1 and mask - - /** - * Locates the index for the given key. This method probes using double hashing. - * - * @param key the key for an entry in the map. - * @return the index where the key was found, or `-1` if no entry is found for that key. - */ - private fun indexOf(key: Int): Int { - val startIndex = hashIndex(key) - var index = startIndex - while (true) { - // It's available, so no chance that this value exists anywhere in the map. - if (store.value(index) == null) return -1 - if (store.key(index) == key) return index + private fun set(index: Int, key: Int, value: V?) { + keys[index] = key + values[index] = value + } - // Conflict, keep probing ... - index = probeNext(index) - if (index == startIndex) return -1 - } + private fun clear(index: Int): Unit = set(index, 0, null) + + /** + * Returns the hashed index for the given key. + * The array lengths are always a power of two, so we can use a bitmask to stay inside the array bounds. + */ + private fun hashIndex(key: Int): Int = key and mask + + /** + * Get the next sequential index after `index` and wraps if necessary. + * The array lengths are always a power of two, so we can use a bitmask to stay inside the array bounds. + */ + private fun probeNext(index: Int): Int = index + 1 and mask + + /** + * Locates the index for the given key. This method probes using double hashing. + * + * @param key the key for an entry in the map. + * @return the index where the key was found, or `-1` if no entry is found for that key. + */ + private fun indexOf(key: Int): Int { + val startIndex = hashIndex(key) + var index = startIndex + while (true) { + // It's available, so no chance that this value exists anywhere in the map. + if (values[index] == null) return -1 + if (keys[index] == key) return index + + // Conflict, keep probing ... + index = probeNext(index) + if (index == startIndex) return -1 } + } - /** - * Removes entry at the given index position. Also performs opportunistic, incremental rehashing - * if necessary to not break conflict chains. - * - * @param index the index position of the element to remove. - * @return `true` if the next item was moved back. `false` otherwise. - */ - private fun removeAt(index: Int): Boolean { - store.decrementSize() - // Clearing the key is not strictly necessary (for GC like in a regular collection), - // but recommended for security. The memory location is still fresh in the cache anyway. - clear(index) - - // In the interval from index to the next available entry, the arrays may have entries - // that are displaced from their base position due to prior conflicts. Iterate these - // entries and move them back if possible, optimizing future lookups. - // Knuth Section 6.4 Algorithm R, also used by the JDK's IdentityHashMap. - var nextFree = index - var i = probeNext(index) - var value = store.value(i) - while (value != null) { - val key = store.key(i) - val bucket = hashIndex(key) - if (i < bucket && (bucket <= nextFree || nextFree <= i) || bucket <= nextFree && nextFree <= i) { - // Move the displaced entry "back" to the first available position. - set(nextFree, key, value) - // Put the first entry after the displaced entry - clear(i) - nextFree = i - } - i = probeNext(i) - value = store.value(i) + /** + * Removes entry at the given index position. Also performs opportunistic, incremental rehashing + * if necessary to not break conflict chains. + * + * @param index the index position of the element to remove. + * @return `true` if the next item was moved back. `false` otherwise. + */ + private fun removeAt(index: Int): Boolean { + size -= 1 + // Clearing the key is not strictly necessary (for GC like in a regular collection), + // but recommended for security. The memory location is still fresh in the cache anyway. + clear(index) + + // In the interval from index to the next available entry, the arrays may have entries + // that are displaced from their base position due to prior conflicts. Iterate these + // entries and move them back if possible, optimizing future lookups. + // Knuth Section 6.4 Algorithm R, also used by the JDK's IdentityHashMap. + var nextFree = index + var i = probeNext(index) + var value = values[i] + while (value != null) { + val key = keys[i] + val bucket = hashIndex(key) + if (i < bucket && (bucket <= nextFree || nextFree <= i) || bucket <= nextFree && nextFree <= i) { + // Move the displaced entry "back" to the first available position. + set(nextFree, key, value) + // Put the first entry after the displaced entry + clear(i) + nextFree = i } - return nextFree != index + i = probeNext(i) + value = values[i] } + return nextFree != index + } - /** - * Grows the map size after an insertion. If necessary, performs a rehash of the map. - */ - private fun growSize(grow: (newState: MapState) -> Unit) { - val size = store.incrementSize() - if (size <= maxSize) return + /** + * Grows the map size after an insertion. If necessary, performs a rehash of the map. + */ + private fun grow() { + size += 1 + if (size <= maxSize) return - check(capacity != Int.MAX_VALUE) { "Max capacity reached at size=$size" } - grow(rehash()) - } + check(capacity != Int.MAX_VALUE) { "Max capacity reached at size=$size" } + rehash() + } - /** - * Rehashes the map for the given capacity. - * Double the capacity. - */ - private fun rehash(): MapState = MapState(store.size, capacity shl 1).also { newState -> - // Insert to the new arrays. - repeat(capacity) { - val oldValue = store.value(it) ?: return@repeat - val oldKey = store.key(it) - var index = newState.hashIndex(oldKey) - while (true) { - if (newState.store.value(index) == null) { - newState.set(index, oldKey, oldValue) - break - } - // Conflict, keep probing. Can wrap around, but never reaches startIndex again. - index = newState.probeNext(index) + /** + * Rehashes the map for the given capacity. + * Double the capacity. + */ + private fun rehash() { + + val oldCapacity = capacity + val oldValues = values + val oldKeys = keys + + capacity = capacity shl 1 + init() + + // Insert to the new arrays. + repeat(oldCapacity) { + val oldValue = oldValues[it] ?: return@repeat + val oldKey = oldKeys[it] + var index = hashIndex(oldKey) + while (true) { + if (values[index] == null) { + set(index, oldKey, oldValue) + break } + // Conflict, keep probing. Can wrap around, but never reaches startIndex again. + index = probeNext(index) } - clear() } } } - -internal expect class MutableValue(initial: V) { - val value: V - val update: (V) -> Unit -} - -internal expect class ValueStore(size: Int, capacity: Int) { - val size: Int - fun key(index: Int): Int - fun value(index: Int): V? - fun setKey(index: Int, key: Int) - fun setValue(index: Int, value: V?) - fun incrementSize(): Int - fun decrementSize(): Int - fun clearSize() -} diff --git a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/Prioritizer.kt b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/Prioritizer.kt index 63fbd64eb..758c47ace 100644 --- a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/Prioritizer.kt +++ b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/Prioritizer.kt @@ -20,9 +20,7 @@ import io.rsocket.kotlin.frame.* import kotlinx.coroutines.* import kotlinx.coroutines.channels.* import kotlinx.coroutines.selects.* -import kotlin.native.concurrent.* -@SharedImmutable private val selectFrame: suspend (Frame) -> Frame = { it } internal class Prioritizer { diff --git a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/StreamsStorage.kt b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/StreamsStorage.kt index 1d422a5d0..f69e29f5f 100644 --- a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/StreamsStorage.kt +++ b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/StreamsStorage.kt @@ -20,28 +20,27 @@ import io.ktor.utils.io.core.internal.* import io.ktor.utils.io.pool.* import io.rsocket.kotlin.frame.* import io.rsocket.kotlin.internal.handler.* +import kotlinx.atomicfu.locks.* -internal class StreamsStorage(private val isServer: Boolean, private val pool: ObjectPool) { +internal class StreamsStorage( + private val isServer: Boolean, + private val pool: ObjectPool +) : SynchronizedObject() { private val streamId: StreamId = StreamId(isServer) private val handlers: IntMap = IntMap() - fun nextId(): Int = streamId.next(handlers) - - fun save(id: Int, handler: FrameHandler) { - handlers[id] = handler - } - - fun remove(id: Int): FrameHandler? { - return handlers.remove(id)?.also(FrameHandler::close) - } - - fun contains(id: Int): Boolean { - return id in handlers - } + fun nextId(): Int = synchronized(this) { streamId.next(handlers) } + fun save(id: Int, handler: FrameHandler) = synchronized(this) { handlers[id] = handler } + fun remove(id: Int): FrameHandler? = synchronized(this) { handlers.remove(id) }?.also(FrameHandler::close) + fun contains(id: Int): Boolean = synchronized(this) { id in handlers } + private fun get(id: Int): FrameHandler? = synchronized(this) { handlers[id] } fun cleanup(error: Throwable?) { - val values = handlers.values() - handlers.clear() + val values = synchronized(this) { + val values = handlers.values() + handlers.clear() + values + } values.forEach { it.cleanup(error) it.close() @@ -51,19 +50,32 @@ internal class StreamsStorage(private val isServer: Boolean, private val pool: O fun handleFrame(frame: Frame, responder: RSocketResponder) { val id = frame.streamId when (frame) { - is RequestNFrame -> handlers[id]?.handleRequestN(frame.requestN) - is CancelFrame -> handlers[id]?.handleCancel() - is ErrorFrame -> handlers[id]?.handleError(frame.throwable) + is RequestNFrame -> get(id)?.handleRequestN(frame.requestN) + is CancelFrame -> get(id)?.handleCancel() + is ErrorFrame -> get(id)?.handleError(frame.throwable) is RequestFrame -> when { - frame.type == FrameType.Payload -> handlers[id]?.handleRequest(frame) ?: frame.close() // release on unknown stream id + frame.type == FrameType.Payload -> get(id)?.handleRequest(frame) + ?: frame.close() // release on unknown stream id isServer.xor(id % 2 != 0) -> frame.close() // request frame on wrong stream id else -> { val initialRequest = frame.initialRequest val handler = when (frame.type) { FrameType.RequestFnF -> ResponderFireAndForgetFrameHandler(id, this, responder, pool) FrameType.RequestResponse -> ResponderRequestResponseFrameHandler(id, this, responder, pool) - FrameType.RequestStream -> ResponderRequestStreamFrameHandler(id, this, responder, initialRequest, pool) - FrameType.RequestChannel -> ResponderRequestChannelFrameHandler(id, this, responder, initialRequest, pool) + FrameType.RequestStream -> ResponderRequestStreamFrameHandler( + id, + this, + responder, + initialRequest, + pool + ) + FrameType.RequestChannel -> ResponderRequestChannelFrameHandler( + id, + this, + responder, + initialRequest, + pool + ) else -> error("Wrong request frame type") // should never happen } save(id, handler) diff --git a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/handler/FrameHandler.kt b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/handler/FrameHandler.kt index 9e828fa9c..228f71f64 100644 --- a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/handler/FrameHandler.kt +++ b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/handler/FrameHandler.kt @@ -26,7 +26,7 @@ import kotlinx.coroutines.* internal abstract class FrameHandler(pool: ObjectPool) : Closeable { private val data = BytePacketBuilder(pool) private val metadata = BytePacketBuilder(pool) - protected abstract var hasMetadata: Boolean + private var hasMetadata: Boolean = false fun handleRequest(frame: RequestFrame) { if (frame.next || frame.type.isRequestType) handleNextFragment(frame) @@ -73,7 +73,7 @@ internal interface SendFrameHandler { fun onSendFailed(cause: Throwable): Boolean // if true, then request is failed } -internal abstract class BaseRequesterFrameHandler(pool: ObjectPool) : FrameHandler(pool), ReceiveFrameHandler { +internal abstract class RequesterFrameHandler(pool: ObjectPool) : FrameHandler(pool), ReceiveFrameHandler { override fun handleCancel() { //should be called only for RC } @@ -83,8 +83,8 @@ internal abstract class BaseRequesterFrameHandler(pool: ObjectPool) } } -internal abstract class BaseResponderFrameHandler(pool: ObjectPool) : FrameHandler(pool), SendFrameHandler { - protected abstract var job: Job? +internal abstract class ResponderFrameHandler(pool: ObjectPool) : FrameHandler(pool), SendFrameHandler { + protected var job: Job? = null protected abstract fun start(payload: Payload): Job @@ -105,12 +105,3 @@ internal abstract class BaseResponderFrameHandler(pool: ObjectPool) //should be called only for RC } } - -internal expect abstract class ResponderFrameHandler(pool: ObjectPool) : BaseResponderFrameHandler { - override var job: Job? - override var hasMetadata: Boolean -} - -internal expect abstract class RequesterFrameHandler(pool: ObjectPool) : BaseRequesterFrameHandler { - override var hasMetadata: Boolean -} diff --git a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/keepalive/KeepAlive.kt b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/keepalive/KeepAlive.kt index 9c17fc1b9..afdc6ad3b 100644 --- a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/keepalive/KeepAlive.kt +++ b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/keepalive/KeepAlive.kt @@ -16,7 +16,6 @@ package io.rsocket.kotlin.keepalive -import kotlin.native.concurrent.* import kotlin.time.* import kotlin.time.Duration.Companion.seconds @@ -33,7 +32,6 @@ public class KeepAlive( public val maxLifetimeMillis: Int = 90 * 1000 // 90 seconds ) -@SharedImmutable internal val DefaultKeepAlive = KeepAlive( intervalMillis = 20 * 1000, // 20 seconds maxLifetimeMillis = 90 * 1000 // 90 seconds diff --git a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/payload/PayloadMimeType.kt b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/payload/PayloadMimeType.kt index d1ba3162c..f29f25ed5 100644 --- a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/payload/PayloadMimeType.kt +++ b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/payload/PayloadMimeType.kt @@ -18,7 +18,6 @@ package io.rsocket.kotlin.payload import io.rsocket.kotlin.core.* import io.rsocket.kotlin.frame.io.* -import kotlin.native.concurrent.* public fun PayloadMimeType( data: MimeTypeWithName, @@ -35,7 +34,6 @@ public class PayloadMimeType( } } -@SharedImmutable internal val DefaultPayloadMimeType = PayloadMimeType( data = WellKnownMimeType.ApplicationOctetStream, metadata = WellKnownMimeType.ApplicationOctetStream diff --git a/rsocket-core/src/jsMain/kotlin/io/rsocket/kotlin/internal/IntMap.kt b/rsocket-core/src/jsMain/kotlin/io/rsocket/kotlin/internal/IntMap.kt deleted file mode 100644 index c033c962e..000000000 --- a/rsocket-core/src/jsMain/kotlin/io/rsocket/kotlin/internal/IntMap.kt +++ /dev/null @@ -1,59 +0,0 @@ -/* - * Copyright 2015-2020 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.rsocket.kotlin.internal - -internal actual class MutableValue actual constructor(initial: V) { - private var _value = initial - actual val value: V get() = _value - actual val update: (V) -> Unit = { _value = it } -} - -internal actual class ValueStore actual constructor(size: Int, capacity: Int) { - private var _size = size - private val keys: IntArray = IntArray(capacity) - - @Suppress("UNCHECKED_CAST") - private val values: Array = arrayOfNulls(capacity) as Array - - actual val size: Int get() = _size - - actual fun key(index: Int): Int = keys[index] - - actual fun value(index: Int): V? = values[index] - - actual fun setKey(index: Int, key: Int) { - keys[index] = key - } - - actual fun setValue(index: Int, value: V?) { - values[index] = value - } - - actual fun incrementSize(): Int { - _size += 1 - return _size - } - - actual fun decrementSize(): Int { - _size -= 1 - return _size - } - - actual fun clearSize() { - _size = 0 - } -} diff --git a/rsocket-core/src/jsMain/kotlin/io/rsocket/kotlin/internal/handler/FrameHandler.kt b/rsocket-core/src/jsMain/kotlin/io/rsocket/kotlin/internal/handler/FrameHandler.kt deleted file mode 100644 index 4e40aa98d..000000000 --- a/rsocket-core/src/jsMain/kotlin/io/rsocket/kotlin/internal/handler/FrameHandler.kt +++ /dev/null @@ -1,30 +0,0 @@ -/* - * Copyright 2015-2020 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.rsocket.kotlin.internal.handler - -import io.ktor.utils.io.core.internal.* -import io.ktor.utils.io.pool.* -import kotlinx.coroutines.* - -internal actual abstract class ResponderFrameHandler actual constructor(pool: ObjectPool) : BaseResponderFrameHandler(pool) { - actual override var job: Job? = null - actual override var hasMetadata: Boolean = false -} - -internal actual abstract class RequesterFrameHandler actual constructor(pool: ObjectPool) : BaseRequesterFrameHandler(pool) { - actual override var hasMetadata: Boolean = false -} diff --git a/rsocket-core/src/jvmMain/kotlin/io/rsocket/kotlin/internal/IntMap.kt b/rsocket-core/src/jvmMain/kotlin/io/rsocket/kotlin/internal/IntMap.kt deleted file mode 100644 index c033c962e..000000000 --- a/rsocket-core/src/jvmMain/kotlin/io/rsocket/kotlin/internal/IntMap.kt +++ /dev/null @@ -1,59 +0,0 @@ -/* - * Copyright 2015-2020 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.rsocket.kotlin.internal - -internal actual class MutableValue actual constructor(initial: V) { - private var _value = initial - actual val value: V get() = _value - actual val update: (V) -> Unit = { _value = it } -} - -internal actual class ValueStore actual constructor(size: Int, capacity: Int) { - private var _size = size - private val keys: IntArray = IntArray(capacity) - - @Suppress("UNCHECKED_CAST") - private val values: Array = arrayOfNulls(capacity) as Array - - actual val size: Int get() = _size - - actual fun key(index: Int): Int = keys[index] - - actual fun value(index: Int): V? = values[index] - - actual fun setKey(index: Int, key: Int) { - keys[index] = key - } - - actual fun setValue(index: Int, value: V?) { - values[index] = value - } - - actual fun incrementSize(): Int { - _size += 1 - return _size - } - - actual fun decrementSize(): Int { - _size -= 1 - return _size - } - - actual fun clearSize() { - _size = 0 - } -} diff --git a/rsocket-core/src/jvmMain/kotlin/io/rsocket/kotlin/internal/handler/FrameHandler.kt b/rsocket-core/src/jvmMain/kotlin/io/rsocket/kotlin/internal/handler/FrameHandler.kt deleted file mode 100644 index 4e40aa98d..000000000 --- a/rsocket-core/src/jvmMain/kotlin/io/rsocket/kotlin/internal/handler/FrameHandler.kt +++ /dev/null @@ -1,30 +0,0 @@ -/* - * Copyright 2015-2020 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.rsocket.kotlin.internal.handler - -import io.ktor.utils.io.core.internal.* -import io.ktor.utils.io.pool.* -import kotlinx.coroutines.* - -internal actual abstract class ResponderFrameHandler actual constructor(pool: ObjectPool) : BaseResponderFrameHandler(pool) { - actual override var job: Job? = null - actual override var hasMetadata: Boolean = false -} - -internal actual abstract class RequesterFrameHandler actual constructor(pool: ObjectPool) : BaseRequesterFrameHandler(pool) { - actual override var hasMetadata: Boolean = false -} diff --git a/rsocket-core/src/nativeMain/kotlin/io/rsocket/kotlin/internal/IntMap.kt b/rsocket-core/src/nativeMain/kotlin/io/rsocket/kotlin/internal/IntMap.kt deleted file mode 100644 index fef058edc..000000000 --- a/rsocket-core/src/nativeMain/kotlin/io/rsocket/kotlin/internal/IntMap.kt +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Copyright 2015-2020 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.rsocket.kotlin.internal - -import kotlinx.atomicfu.* - -internal actual class MutableValue actual constructor(initial: V) { - private val _value = atomic(initial) - actual val value: V get() = _value.value - actual val update: (V) -> Unit = { _value.value = it } -} - -internal actual class ValueStore actual constructor(size: Int, capacity: Int) { - private val _size: AtomicInt = atomic(size) - private val keys: AtomicIntArray = AtomicIntArray(capacity) - private val values: AtomicArray = atomicArrayOfNulls(capacity) - - actual val size: Int get() = _size.value - - actual fun key(index: Int): Int = keys[index].value - - actual fun value(index: Int): V? = values[index].value - - actual fun setKey(index: Int, key: Int) { - keys[index].value = key - } - - actual fun setValue(index: Int, value: V?) { - values[index].value = value - } - - actual fun incrementSize(): Int = _size.incrementAndGet() - actual fun decrementSize(): Int = _size.decrementAndGet() - actual fun clearSize() { - _size.value = 0 - } -} diff --git a/rsocket-core/src/nativeMain/kotlin/io/rsocket/kotlin/internal/handler/FrameHandler.kt b/rsocket-core/src/nativeMain/kotlin/io/rsocket/kotlin/internal/handler/FrameHandler.kt deleted file mode 100644 index a27c3f94d..000000000 --- a/rsocket-core/src/nativeMain/kotlin/io/rsocket/kotlin/internal/handler/FrameHandler.kt +++ /dev/null @@ -1,31 +0,0 @@ -/* - * Copyright 2015-2020 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.rsocket.kotlin.internal.handler - -import io.ktor.utils.io.core.internal.* -import io.ktor.utils.io.pool.* -import kotlinx.atomicfu.* -import kotlinx.coroutines.* - -internal actual abstract class ResponderFrameHandler actual constructor(pool: ObjectPool) : BaseResponderFrameHandler(pool) { - actual override var job: Job? by atomic(null) - actual override var hasMetadata: Boolean by atomic(false) -} - -internal actual abstract class RequesterFrameHandler actual constructor(pool: ObjectPool) : BaseRequesterFrameHandler(pool) { - actual override var hasMetadata: Boolean by atomic(false) -} diff --git a/rsocket-test/src/commonMain/kotlin/io/rsocket/kotlin/test/InUseTrackingPool.kt b/rsocket-test/src/commonMain/kotlin/io/rsocket/kotlin/test/InUseTrackingPool.kt index 865c242fc..cd6f0119a 100644 --- a/rsocket-test/src/commonMain/kotlin/io/rsocket/kotlin/test/InUseTrackingPool.kt +++ b/rsocket-test/src/commonMain/kotlin/io/rsocket/kotlin/test/InUseTrackingPool.kt @@ -20,20 +20,25 @@ import io.ktor.utils.io.bits.* import io.ktor.utils.io.core.* import io.ktor.utils.io.core.internal.* import io.ktor.utils.io.pool.* +import kotlinx.atomicfu.locks.* import kotlin.test.* -object InUseTrackingPool : ObjectPool { +object InUseTrackingPool : ObjectPool, SynchronizedObject() { override val capacity: Int get() = BufferPool.capacity - private val inUse = TrackingSet() + private val inUse = mutableSetOf() override fun borrow(): ChunkBuffer { val instance = BufferPool.borrow() - inUse.add(instance) + synchronized(this) { + check(inUse.add(IdentityWrapper(instance, Throwable()))) + } return instance } override fun recycle(instance: ChunkBuffer) { - inUse.remove(instance) + synchronized(this) { + check(inUse.remove(IdentityWrapper(instance, null))) + } BufferPool.recycle(instance) } @@ -42,11 +47,16 @@ object InUseTrackingPool : ObjectPool { } fun resetInUse() { - inUse.clear() + synchronized(this) { + inUse.clear() + } } fun assertNoInUse() { - val traces = inUse.traces() ?: return + val traces = synchronized(this) { + inUse.mapNotNull(IdentityWrapper::throwable) + } + if (traces.isEmpty()) return fail(traceMessage(traces)) } @@ -111,6 +121,18 @@ object InUseTrackingPool : ObjectPool { } } +private class IdentityWrapper( + private val instance: ChunkBuffer, + val throwable: Throwable? +) { + override fun equals(other: Any?): Boolean { + if (other !is IdentityWrapper) return false + return other.instance === this.instance + } + + override fun hashCode(): Int = identityHashCode(instance) +} + //TODO leak tracking don't work on JS in SuspendTest // due to `hack`, that to run `suspend` tests on JS, we return `Promise`, for which kotlin JS test runner will subscribe // in such cases `AfterTest` will be called just after `Promise` returned, and not when it resolved diff --git a/rsocket-test/src/commonMain/kotlin/io/rsocket/kotlin/test/Test.common.kt b/rsocket-test/src/commonMain/kotlin/io/rsocket/kotlin/test/Test.common.kt index 75e6edeb8..09a32c34a 100644 --- a/rsocket-test/src/commonMain/kotlin/io/rsocket/kotlin/test/Test.common.kt +++ b/rsocket-test/src/commonMain/kotlin/io/rsocket/kotlin/test/Test.common.kt @@ -24,3 +24,5 @@ internal expect fun runTest(ignoreNative: Boolean, block: suspend CoroutineScope expect val anotherDispatcher: CoroutineDispatcher expect val TestLoggerFactory: LoggerFactory + +expect fun identityHashCode(instance: Any): Int diff --git a/rsocket-test/src/commonMain/kotlin/io/rsocket/kotlin/test/TrackingSet.kt b/rsocket-test/src/commonMain/kotlin/io/rsocket/kotlin/test/TrackingSet.kt deleted file mode 100644 index b91afd719..000000000 --- a/rsocket-test/src/commonMain/kotlin/io/rsocket/kotlin/test/TrackingSet.kt +++ /dev/null @@ -1,55 +0,0 @@ -/* - * Copyright 2015-2020 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.rsocket.kotlin.test - -import io.ktor.utils.io.core.internal.* -import kotlinx.atomicfu.locks.* - -class TrackingSet : SynchronizedObject() { - private val set = MSet() - private inline fun sync(block: MSet.() -> T): T = synchronized(this) { block(set) } - - fun add(element: ChunkBuffer) = sync { check(add(IdentityWrapper(element, Throwable()))) } - fun remove(element: ChunkBuffer) = sync { check(remove(IdentityWrapper(element, null))) } - fun clear() = sync { clear() } - fun traces(): List? { - val list = sync { values() } - if (list.isEmpty()) return null - return list.mapNotNull(IdentityWrapper::throwable) - } -} - -private class IdentityWrapper( - private val instance: ChunkBuffer, - val throwable: Throwable? -) { - override fun equals(other: Any?): Boolean { - if (other !is IdentityWrapper) return false - return other.instance === this.instance - } - - override fun hashCode(): Int = identityHashCode(instance) -} - -expect fun identityHashCode(instance: Any): Int - -expect class MSet() { - fun add(element: T): Boolean - fun remove(element: T): Boolean - fun clear() - fun values(): List -} diff --git a/rsocket-test/src/jsMain/kotlin/io/rsocket/kotlin/test/MSet.kt b/rsocket-test/src/jsMain/kotlin/io/rsocket/kotlin/test/MSet.kt deleted file mode 100644 index db3568e41..000000000 --- a/rsocket-test/src/jsMain/kotlin/io/rsocket/kotlin/test/MSet.kt +++ /dev/null @@ -1,27 +0,0 @@ -/* - * Copyright 2015-2020 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.rsocket.kotlin.test - -actual fun identityHashCode(instance: Any): Int = instance.hashCode() - -actual class MSet actual constructor() { - private val delegate = mutableSetOf() - actual fun add(element: T): Boolean = delegate.add(element) - actual fun remove(element: T): Boolean = delegate.remove(element) - actual fun clear(): Unit = delegate.clear() - actual fun values(): List = delegate.toList() -} diff --git a/rsocket-test/src/jsMain/kotlin/io/rsocket/kotlin/test/Test.kt b/rsocket-test/src/jsMain/kotlin/io/rsocket/kotlin/test/Test.kt index 3622c7ba2..b8c212ae1 100644 --- a/rsocket-test/src/jsMain/kotlin/io/rsocket/kotlin/test/Test.kt +++ b/rsocket-test/src/jsMain/kotlin/io/rsocket/kotlin/test/Test.kt @@ -28,3 +28,5 @@ internal actual fun runTest( actual val anotherDispatcher: CoroutineDispatcher get() = Dispatchers.Default actual val TestLoggerFactory: LoggerFactory = ConsoleLogger + +actual fun identityHashCode(instance: Any): Int = instance.hashCode() diff --git a/rsocket-test/src/jvmMain/kotlin/io/rsocket/kotlin/test/MSet.kt b/rsocket-test/src/jvmMain/kotlin/io/rsocket/kotlin/test/MSet.kt deleted file mode 100644 index c2b743da1..000000000 --- a/rsocket-test/src/jvmMain/kotlin/io/rsocket/kotlin/test/MSet.kt +++ /dev/null @@ -1,27 +0,0 @@ -/* - * Copyright 2015-2020 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.rsocket.kotlin.test - -actual fun identityHashCode(instance: Any): Int = System.identityHashCode(instance) - -actual class MSet actual constructor() { - private val delegate = mutableSetOf() - actual fun add(element: T): Boolean = delegate.add(element) - actual fun remove(element: T): Boolean = delegate.remove(element) - actual fun clear(): Unit = delegate.clear() - actual fun values(): List = delegate.toList() -} diff --git a/rsocket-test/src/jvmMain/kotlin/io/rsocket/kotlin/test/Test.kt b/rsocket-test/src/jvmMain/kotlin/io/rsocket/kotlin/test/Test.kt index 3e3a423c8..f7d295ace 100644 --- a/rsocket-test/src/jvmMain/kotlin/io/rsocket/kotlin/test/Test.kt +++ b/rsocket-test/src/jvmMain/kotlin/io/rsocket/kotlin/test/Test.kt @@ -37,3 +37,5 @@ actual val TestLoggerFactory: LoggerFactory = run { JavaLogger } + +actual fun identityHashCode(instance: Any): Int = System.identityHashCode(instance) diff --git a/rsocket-test/src/nativeMain/kotlin/io/rsocket/kotlin/test/MSet.kt b/rsocket-test/src/nativeMain/kotlin/io/rsocket/kotlin/test/MSet.kt deleted file mode 100644 index fe3d53b0e..000000000 --- a/rsocket-test/src/nativeMain/kotlin/io/rsocket/kotlin/test/MSet.kt +++ /dev/null @@ -1,170 +0,0 @@ -/* - * Copyright 2015-2020 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.rsocket.kotlin.test - -import kotlinx.atomicfu.* -import kotlin.math.* -import kotlin.native.* - -actual fun identityHashCode(instance: Any): Int = instance.identityHashCode() - -//implementation is based on SharedSet/SharedVector from ktor-io tests -//supports freezing -actual class MSet { - private var _size: Int by atomic(0) - private var content by atomic(MVector>()) - private val loadFactor: Float get() = _size.toFloat() / content.size - - init { - initBuckets(8) - } - - actual fun add(element: T): Boolean { - val bucket = findBucket(element) - - if (bucket.find(element) >= 0) { - return false - } - - bucket.push(element) - _size++ - - if (loadFactor > 0.75) { - doubleSize() - } - - return true - } - - actual fun remove(element: T): Boolean { - val bucket = findBucket(element) - val result = bucket.remove(element) - - if (result) { - _size-- - } - - return result - } - - private fun doubleSize() { - val old = content - initBuckets(content.size * 2) - _size = 0 - - for (bucketId in 0 until old.size) { - val bucket = old[bucketId] - for (itemId in 0 until bucket.size) { - add(bucket[itemId]) - } - } - } - - private fun initBuckets(count: Int) { - val newContent = MVector>() - repeat(count) { - newContent.push(MVector()) - } - - content = newContent - } - - private fun findBucket(element: T): MVector = content[(element.hashCode().absoluteValue) % content.size] - - actual fun clear() { - content.values().forEach(MVector::clear) - content.clear() - initBuckets(8) - _size = 0 - } - - actual fun values(): List = content.values().flatMap(MVector::values) -} - - -private class MVector { - private var _size by atomic(0) - private var content: AtomicArray by atomic(atomicArrayOfNulls(10)) - - val size: Int get() = _size - - fun values(): List = buildList { - repeat(content.size) { - content[it].value?.let(::add) - } - } - - fun push(element: T) { - if (_size >= content.size) { - increaseCapacity() - } - - content[_size].value = element - _size++ - } - - fun find(element: T): Int { - for (index in 0 until _size) { - if (element == content[index].value) { - return index - } - } - - return -1 - } - - operator fun get(index: Int): T { - if (index >= _size) { - throw IndexOutOfBoundsException("Index: $index, size: $size.") - } - - return content[index].value!! - } - - fun remove(element: T): Boolean { - val index = find(element) - if (index < 0) { - return false - } - - if (index >= size) { - throw IndexOutOfBoundsException("Index: $index, size: $size.") - } - - for (current in index until _size - 1) { - content[current].value = content[current + 1].value - } - - content[_size - 1].value = null - _size-- - return true - } - - fun clear() { - _size = 0 - content = atomicArrayOfNulls(0) - } - - private fun increaseCapacity() { - val newContent = atomicArrayOfNulls(content.size * 2) - for (index in 0 until content.size) { - newContent[index].value = content[index].value - } - - content = newContent - } -} diff --git a/rsocket-test/src/nativeMain/kotlin/io/rsocket/kotlin/test/Test.kt b/rsocket-test/src/nativeMain/kotlin/io/rsocket/kotlin/test/Test.kt index 6b374287a..22d546a78 100644 --- a/rsocket-test/src/nativeMain/kotlin/io/rsocket/kotlin/test/Test.kt +++ b/rsocket-test/src/nativeMain/kotlin/io/rsocket/kotlin/test/Test.kt @@ -18,6 +18,7 @@ package io.rsocket.kotlin.test import io.rsocket.kotlin.logging.* import kotlinx.coroutines.* +import kotlin.native.* internal actual fun runTest( ignoreNative: Boolean, @@ -32,3 +33,5 @@ actual val anotherDispatcher: CoroutineDispatcher get() = newSingleThreadContext @SharedImmutable actual val TestLoggerFactory: LoggerFactory = PrintLogger + +actual fun identityHashCode(instance: Any): Int = instance.identityHashCode()