Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 7 additions & 4 deletions kotlinx-coroutines-core/common/src/channels/BufferedChannel.kt
Original file line number Diff line number Diff line change
Expand Up @@ -1580,8 +1580,8 @@ internal open class BufferedChannel<E>(
* [CancellableContinuation] and [SelectInstance].
*
* Roughly, [hasNext] is a [receive] sibling, while [next] simply
* returns the already retrieved element. From the implementation
* side, [receiveResult] stores the element retrieved by [hasNext]
* returns the already retrieved element and [hasNext] being idempotent.
* From the implementation side, [receiveResult] stores the element retrieved by [hasNext]
* (or a special [CHANNEL_CLOSED] token if the channel is closed).
*
* The [invoke] function is a [CancelHandler] implementation,
Expand Down Expand Up @@ -1614,8 +1614,10 @@ internal open class BufferedChannel<E>(
private var continuation: CancellableContinuationImpl<Boolean>? = null

// `hasNext()` is just a special receive operation.
override suspend fun hasNext(): Boolean =
receiveImpl( // <-- this is an inline function
override suspend fun hasNext(): Boolean {
return if (this.receiveResult !== NO_RECEIVE_RESULT && this.receiveResult !== CHANNEL_CLOSED) {
true
} else receiveImpl( // <-- this is an inline function
// Do not create a continuation until it is required;
// it is created later via [onNoWaiterSuspend], if needed.
waiter = null,
Expand All @@ -1636,6 +1638,7 @@ internal open class BufferedChannel<E>(
// The tail-call optimization is applied here.
onNoWaiterSuspend = { segm, i, r -> return hasNextOnNoWaiterSuspend(segm, i, r) }
)
}

private fun onClosedHasNext(): Boolean {
this.receiveResult = CHANNEL_CLOSED
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,40 @@ import kotlinx.coroutines.*
import kotlin.test.*

class BufferedChannelTest : TestBase() {
@Test
fun testIteratorHasNextIsIdempotent() = runTest {
val q = Channel<Int>()
check(q.isEmpty)
val iter = q.iterator()
expect(1)
val sender = launch {
expect(4)
q.send(1) // sent
expect(10)
q.close()
expect(11)
}
expect(2)
val receiver = launch {
expect(5)
check(iter.hasNext())
expect(6)
check(iter.hasNext())
expect(7)
check(iter.hasNext())
expect(8)
check(iter.next() == 1)
expect(9)
check(!iter.hasNext())
expect(12)
}
expect(3)
sender.join()
receiver.join()
check(q.isClosedForReceive)
finish(13)
}

@Test
fun testSimple() = runTest {
val q = Channel<Int>(1)
Expand Down