Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions .changes/2de8162f-e5a0-4618-b00d-8da3cfdbc6a2.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
{
"id": "2de8162f-e5a0-4618-b00d-8da3cfdbc6a2",
"type": "feature",
"description": "Enhance support for replayable instances of `InputStream`",
"issues": [
"https://github.com/awslabs/aws-sdk-kotlin/issues/1473"
]
}
Original file line number Diff line number Diff line change
Expand Up @@ -114,11 +114,30 @@ public fun ByteStream.Companion.fromInputStream(
* @param contentLength If specified, indicates how many bytes remain in this stream. Defaults to `null`.
*/
public fun InputStream.asByteStream(contentLength: Long? = null): ByteStream.SourceStream {
val source = source()
if (markSupported() && contentLength != null) {
mark(contentLength.toInt())
}

return object : ByteStream.SourceStream() {
override val contentLength: Long? = contentLength
override val isOneShot: Boolean = !markSupported()
override fun readFrom(): SdkSource = source
override fun readFrom(): SdkSource {
if (markSupported() && contentLength != null) {
reset()
mark(contentLength.toInt())
return object : SdkSource by source() {
/*
* This is a no-op close to prevent body hashing from closing the underlying InputStream, which causes
* `IOException: Stream closed` on subsequent reads. Consider making [ByteStream.ChannelStream]/[ByteStream.SourceStream]
* (or possibly even [ByteStream] itself) implement [Closeable] to better handle closing streams.
* This should allow us to clean up our usage of [ByteStream.cancel()].
*/
override fun close() { }
}
}

return source()
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,11 @@

package aws.smithy.kotlin.runtime.content

import aws.smithy.kotlin.runtime.io.readToByteArray
import aws.smithy.kotlin.runtime.testing.RandomTempFile
import kotlinx.coroutines.test.runTest
import java.io.BufferedInputStream
import java.io.ByteArrayInputStream
import java.io.ByteArrayOutputStream
import java.io.InputStream
import java.io.OutputStream
Expand Down Expand Up @@ -228,6 +231,31 @@ class ByteStreamJVMTest {
assertFalse(sos.closed)
}

// https://github.com/awslabs/aws-sdk-kotlin/issues/1473
@Test
fun testReplayableInputStreamAsByteStream() = runTest {
val content = "Hello, Bytes!".encodeToByteArray()
val byteArrayIns = ByteArrayInputStream(content)
val nonReplayableIns = NonReplayableInputStream(byteArrayIns)

// buffer the non-replayable stream, making it replayable...
val bufferedIns = BufferedInputStream(nonReplayableIns)

val byteStream = bufferedIns.asByteStream(content.size.toLong())

// Test that it can be read at least twice (e.g. once for hashing the body, once for transmitting the body)
assertContentEquals(content, byteStream.readFrom().use { it.readToByteArray() })
assertContentEquals(content, byteStream.readFrom().use { it.readToByteArray() })
}

private class NonReplayableInputStream(val inputStream: InputStream) : InputStream() {
override fun markSupported(): Boolean = false // not replayable

override fun read(): Int = inputStream.read()
override fun mark(readlimit: Int) = inputStream.mark(readlimit)
override fun reset() = inputStream.reset()
}

private class StatusTrackingOutputStream(val os: OutputStream) : OutputStream() {
var closed: Boolean = false

Expand Down
Loading