Skip to content

Commit 4575023

Browse files
authored
misc: enhance support for replayable instances of InputStream (#1197)
1 parent 7085c8a commit 4575023

File tree

3 files changed

+57
-2
lines changed

3 files changed

+57
-2
lines changed
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
{
2+
"id": "2de8162f-e5a0-4618-b00d-8da3cfdbc6a2",
3+
"type": "feature",
4+
"description": "Enhance support for replayable instances of `InputStream`",
5+
"issues": [
6+
"https://github.com/awslabs/aws-sdk-kotlin/issues/1473"
7+
]
8+
}

runtime/runtime-core/jvm/src/aws/smithy/kotlin/runtime/content/ByteStreamJVM.kt

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -114,11 +114,30 @@ public fun ByteStream.Companion.fromInputStream(
114114
* @param contentLength If specified, indicates how many bytes remain in this stream. Defaults to `null`.
115115
*/
116116
public fun InputStream.asByteStream(contentLength: Long? = null): ByteStream.SourceStream {
117-
val source = source()
117+
if (markSupported() && contentLength != null) {
118+
mark(contentLength.toInt())
119+
}
120+
118121
return object : ByteStream.SourceStream() {
119122
override val contentLength: Long? = contentLength
120123
override val isOneShot: Boolean = !markSupported()
121-
override fun readFrom(): SdkSource = source
124+
override fun readFrom(): SdkSource {
125+
if (markSupported() && contentLength != null) {
126+
reset()
127+
mark(contentLength.toInt())
128+
return object : SdkSource by source() {
129+
/*
130+
* This is a no-op close to prevent body hashing from closing the underlying InputStream, which causes
131+
* `IOException: Stream closed` on subsequent reads. Consider making [ByteStream.ChannelStream]/[ByteStream.SourceStream]
132+
* (or possibly even [ByteStream] itself) implement [Closeable] to better handle closing streams.
133+
* This should allow us to clean up our usage of [ByteStream.cancel()].
134+
*/
135+
override fun close() { }
136+
}
137+
}
138+
139+
return source()
140+
}
122141
}
123142
}
124143

runtime/runtime-core/jvm/test/aws/smithy/kotlin/runtime/content/ByteStreamJVMTest.kt

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,11 @@
55

66
package aws.smithy.kotlin.runtime.content
77

8+
import aws.smithy.kotlin.runtime.io.readToByteArray
89
import aws.smithy.kotlin.runtime.testing.RandomTempFile
910
import kotlinx.coroutines.test.runTest
11+
import java.io.BufferedInputStream
12+
import java.io.ByteArrayInputStream
1013
import java.io.ByteArrayOutputStream
1114
import java.io.InputStream
1215
import java.io.OutputStream
@@ -228,6 +231,31 @@ class ByteStreamJVMTest {
228231
assertFalse(sos.closed)
229232
}
230233

234+
// https://github.com/awslabs/aws-sdk-kotlin/issues/1473
235+
@Test
236+
fun testReplayableInputStreamAsByteStream() = runTest {
237+
val content = "Hello, Bytes!".encodeToByteArray()
238+
val byteArrayIns = ByteArrayInputStream(content)
239+
val nonReplayableIns = NonReplayableInputStream(byteArrayIns)
240+
241+
// buffer the non-replayable stream, making it replayable...
242+
val bufferedIns = BufferedInputStream(nonReplayableIns)
243+
244+
val byteStream = bufferedIns.asByteStream(content.size.toLong())
245+
246+
// Test that it can be read at least twice (e.g. once for hashing the body, once for transmitting the body)
247+
assertContentEquals(content, byteStream.readFrom().use { it.readToByteArray() })
248+
assertContentEquals(content, byteStream.readFrom().use { it.readToByteArray() })
249+
}
250+
251+
private class NonReplayableInputStream(val inputStream: InputStream) : InputStream() {
252+
override fun markSupported(): Boolean = false // not replayable
253+
254+
override fun read(): Int = inputStream.read()
255+
override fun mark(readlimit: Int) = inputStream.mark(readlimit)
256+
override fun reset() = inputStream.reset()
257+
}
258+
231259
private class StatusTrackingOutputStream(val os: OutputStream) : OutputStream() {
232260
var closed: Boolean = false
233261

0 commit comments

Comments
 (0)