Skip to content

Commit 2653fbf

Browse files
authored
kn: implement SdkBuffer (#1214)
1 parent 43e39b8 commit 2653fbf

37 files changed

+245
-909
lines changed

runtime/crt-util/jvm/src/aws/smithy/kotlin/runtime/crt/ReadChannelBodyStreamJvm.kt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,5 +7,6 @@ package aws.smithy.kotlin.runtime.crt
77

88
import aws.sdk.kotlin.crt.io.MutableBuffer
99
import aws.smithy.kotlin.runtime.io.SdkBuffer
10+
import aws.smithy.kotlin.runtime.io.read
1011

1112
internal actual fun transferRequestBody(outgoing: SdkBuffer, dest: MutableBuffer) = outgoing.read(dest.buffer)

runtime/runtime-core/api/runtime-core.api

Lines changed: 15 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -760,6 +760,12 @@ public final class aws/smithy/kotlin/runtime/io/ClosedWriteChannelException : ja
760760
public synthetic fun <init> (Ljava/lang/String;ILkotlin/jvm/internal/DefaultConstructorMarker;)V
761761
}
762762

763+
public class aws/smithy/kotlin/runtime/io/EOFException : java/io/EOFException {
764+
public fun <init> ()V
765+
public fun <init> (Ljava/lang/String;)V
766+
public fun <init> (Ljava/lang/String;Ljava/lang/Throwable;)V
767+
}
768+
763769
public final class aws/smithy/kotlin/runtime/io/GzipByteReadChannel : aws/smithy/kotlin/runtime/io/SdkByteReadChannel {
764770
public fun <init> (Laws/smithy/kotlin/runtime/io/SdkByteReadChannel;)V
765771
public fun cancel (Ljava/lang/Throwable;)Z
@@ -805,6 +811,10 @@ public final class aws/smithy/kotlin/runtime/io/HashingSource : aws/smithy/kotli
805811
}
806812

807813
public final class aws/smithy/kotlin/runtime/io/JavaIOKt {
814+
public static final fun inputStream (Laws/smithy/kotlin/runtime/io/SdkBuffer;)Ljava/io/InputStream;
815+
public static final fun isOpen (Laws/smithy/kotlin/runtime/io/SdkBuffer;)Z
816+
public static final fun outputStream (Laws/smithy/kotlin/runtime/io/SdkBuffer;)Ljava/io/OutputStream;
817+
public static final fun read (Laws/smithy/kotlin/runtime/io/SdkBuffer;Ljava/nio/ByteBuffer;)I
808818
public static final fun sink (Ljava/io/File;)Laws/smithy/kotlin/runtime/io/SdkSink;
809819
public static final fun sink (Ljava/io/OutputStream;)Laws/smithy/kotlin/runtime/io/SdkSink;
810820
public static final fun sink (Ljava/nio/file/Path;)Laws/smithy/kotlin/runtime/io/SdkSink;
@@ -815,6 +825,7 @@ public final class aws/smithy/kotlin/runtime/io/JavaIOKt {
815825
public static final fun source (Ljava/nio/file/Path;Lkotlin/ranges/LongRange;)Laws/smithy/kotlin/runtime/io/SdkSource;
816826
public static synthetic fun source$default (Ljava/io/File;JJILjava/lang/Object;)Laws/smithy/kotlin/runtime/io/SdkSource;
817827
public static synthetic fun source$default (Ljava/nio/file/Path;JJILjava/lang/Object;)Laws/smithy/kotlin/runtime/io/SdkSource;
828+
public static final fun write (Laws/smithy/kotlin/runtime/io/SdkBuffer;Ljava/nio/ByteBuffer;)I
818829
}
819830

820831
public final class aws/smithy/kotlin/runtime/io/SdkBuffer : aws/smithy/kotlin/runtime/io/SdkBufferedSink, aws/smithy/kotlin/runtime/io/SdkBufferedSource {
@@ -827,12 +838,8 @@ public final class aws/smithy/kotlin/runtime/io/SdkBuffer : aws/smithy/kotlin/ru
827838
public fun getBuffer ()Laws/smithy/kotlin/runtime/io/SdkBuffer;
828839
public final fun getSize ()J
829840
public fun hashCode ()I
830-
public fun inputStream ()Ljava/io/InputStream;
831-
public fun isOpen ()Z
832-
public fun outputStream ()Ljava/io/OutputStream;
833841
public fun peek ()Laws/smithy/kotlin/runtime/io/SdkBufferedSource;
834842
public fun read (Laws/smithy/kotlin/runtime/io/SdkBuffer;J)J
835-
public fun read (Ljava/nio/ByteBuffer;)I
836843
public fun read ([BII)I
837844
public fun readAll (Laws/smithy/kotlin/runtime/io/SdkSink;)J
838845
public fun readByte ()B
@@ -852,7 +859,6 @@ public final class aws/smithy/kotlin/runtime/io/SdkBuffer : aws/smithy/kotlin/ru
852859
public fun toString ()Ljava/lang/String;
853860
public fun write (Laws/smithy/kotlin/runtime/io/SdkBuffer;J)V
854861
public fun write (Laws/smithy/kotlin/runtime/io/SdkSource;J)V
855-
public fun write (Ljava/nio/ByteBuffer;)I
856862
public fun write ([BII)V
857863
public fun writeAll (Laws/smithy/kotlin/runtime/io/SdkSource;)J
858864
public fun writeByte (B)V
@@ -865,11 +871,9 @@ public final class aws/smithy/kotlin/runtime/io/SdkBuffer : aws/smithy/kotlin/ru
865871
public fun writeUtf8 (Ljava/lang/String;II)V
866872
}
867873

868-
public abstract interface class aws/smithy/kotlin/runtime/io/SdkBufferedSink : aws/smithy/kotlin/runtime/io/SdkSink, java/nio/channels/WritableByteChannel {
874+
public abstract interface class aws/smithy/kotlin/runtime/io/SdkBufferedSink : aws/smithy/kotlin/runtime/io/SdkSink {
869875
public abstract fun emit ()V
870-
public abstract fun flush ()V
871876
public abstract fun getBuffer ()Laws/smithy/kotlin/runtime/io/SdkBuffer;
872-
public abstract fun outputStream ()Ljava/io/OutputStream;
873877
public abstract fun write (Laws/smithy/kotlin/runtime/io/SdkSource;J)V
874878
public abstract fun write ([BII)V
875879
public abstract fun writeAll (Laws/smithy/kotlin/runtime/io/SdkSource;)J
@@ -888,10 +892,9 @@ public final class aws/smithy/kotlin/runtime/io/SdkBufferedSink$DefaultImpls {
888892
public static synthetic fun writeUtf8$default (Laws/smithy/kotlin/runtime/io/SdkBufferedSink;Ljava/lang/String;IIILjava/lang/Object;)V
889893
}
890894

891-
public abstract interface class aws/smithy/kotlin/runtime/io/SdkBufferedSource : aws/smithy/kotlin/runtime/io/SdkSource, java/nio/channels/ReadableByteChannel {
895+
public abstract interface class aws/smithy/kotlin/runtime/io/SdkBufferedSource : aws/smithy/kotlin/runtime/io/SdkSource {
892896
public abstract fun exhausted ()Z
893897
public abstract fun getBuffer ()Laws/smithy/kotlin/runtime/io/SdkBuffer;
894-
public abstract fun inputStream ()Ljava/io/InputStream;
895898
public abstract fun peek ()Laws/smithy/kotlin/runtime/io/SdkBufferedSource;
896899
public abstract fun read ([BII)I
897900
public abstract fun readAll (Laws/smithy/kotlin/runtime/io/SdkSink;)J
@@ -1023,16 +1026,13 @@ public abstract interface class aws/smithy/kotlin/runtime/io/SdkSource : java/io
10231026
public abstract fun read (Laws/smithy/kotlin/runtime/io/SdkBuffer;J)J
10241027
}
10251028

1026-
public final class aws/smithy/kotlin/runtime/io/SdkSourceJVMKt {
1029+
public final class aws/smithy/kotlin/runtime/io/SdkSourceKt {
1030+
public static final fun readFully (Laws/smithy/kotlin/runtime/io/SdkSource;Laws/smithy/kotlin/runtime/io/SdkBuffer;J)V
10271031
public static final fun readToByteArray (Laws/smithy/kotlin/runtime/io/SdkSource;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
10281032
public static final fun toSdkByteReadChannel (Laws/smithy/kotlin/runtime/io/SdkSource;Lkotlinx/coroutines/CoroutineScope;)Laws/smithy/kotlin/runtime/io/SdkByteReadChannel;
10291033
public static synthetic fun toSdkByteReadChannel$default (Laws/smithy/kotlin/runtime/io/SdkSource;Lkotlinx/coroutines/CoroutineScope;ILjava/lang/Object;)Laws/smithy/kotlin/runtime/io/SdkByteReadChannel;
10301034
}
10311035

1032-
public final class aws/smithy/kotlin/runtime/io/SdkSourceKt {
1033-
public static final fun readFully (Laws/smithy/kotlin/runtime/io/SdkSource;Laws/smithy/kotlin/runtime/io/SdkBuffer;J)V
1034-
}
1035-
10361036
public final class aws/smithy/kotlin/runtime/io/internal/ConvertKt {
10371037
public static final fun toOkio (Laws/smithy/kotlin/runtime/io/SdkBuffer;)Lokio/Buffer;
10381038
public static final fun toOkio (Laws/smithy/kotlin/runtime/io/SdkSink;)Lokio/Sink;

runtime/runtime-core/common/src/aws/smithy/kotlin/runtime/content/ByteStream.kt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ package aws.smithy.kotlin.runtime.content
77
import aws.smithy.kotlin.runtime.io.*
88
import aws.smithy.kotlin.runtime.io.internal.SdkDispatchers
99
import kotlinx.coroutines.CoroutineScope
10+
import kotlinx.coroutines.IO
1011
import kotlinx.coroutines.flow.*
1112
import kotlinx.coroutines.launch
1213

runtime/runtime-core/common/src/aws/smithy/kotlin/runtime/io/BufferedSinkAdapter.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ internal expect class BufferedSinkAdapter(sink: okio.BufferedSink) : SdkBuffered
3030
// base class that fills in most of the common implementation, platforms just need to implement the platform specific
3131
// part of the interface
3232
internal abstract class AbstractBufferedSinkAdapter(
33-
protected val delegate: okio.BufferedSink,
33+
internal val delegate: okio.BufferedSink,
3434
) : SdkBufferedSink {
3535
override fun toString(): String = delegate.toString()
3636

runtime/runtime-core/common/src/aws/smithy/kotlin/runtime/io/BuffereredSourceAdapter.kt

Lines changed: 44 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -32,53 +32,76 @@ internal expect class BufferedSourceAdapter(source: okio.BufferedSource) : SdkBu
3232
override fun close()
3333
}
3434

35+
/**
36+
* Used to wrap calls to Okio, catching Okio exceptions (e.g. okio.EOFException) and throwing our own (e.g. aws.smithy.kotlin.runtime.io.EOFException).
37+
*/
38+
internal inline fun <T> SdkBufferedSource.wrapOkio(block: SdkBufferedSource.() -> T): T = try {
39+
block()
40+
} catch (e: okio.EOFException) {
41+
throw EOFException("Okio operation failed: ${e.message}", e)
42+
} catch (e: okio.IOException) {
43+
throw IOException("Okio operation failed: ${e.message}", e)
44+
}
45+
3546
// base class that fills in most of the common implementation, platforms just need to implement the platform specific
3647
// part of the interface
3748
internal abstract class AbstractBufferedSourceAdapter(
38-
protected val delegate: okio.BufferedSource,
49+
internal val delegate: okio.BufferedSource,
3950
) : SdkBufferedSource {
4051
override val buffer: SdkBuffer
4152
get() = delegate.buffer.toSdk()
4253

43-
override fun skip(byteCount: Long): Unit = delegate.skip(byteCount)
54+
override fun skip(byteCount: Long): Unit = wrapOkio { delegate.skip(byteCount) }
4455

45-
override fun readByte(): Byte = delegate.readByte()
56+
override fun readByte(): Byte = wrapOkio { delegate.readByte() }
4657

47-
override fun readShort(): Short = delegate.readShort()
58+
override fun readShort(): Short = wrapOkio { delegate.readShort() }
4859

49-
override fun readShortLe(): Short = delegate.readShortLe()
60+
override fun readShortLe(): Short = wrapOkio { delegate.readShortLe() }
5061

51-
override fun readLong(): Long = delegate.readLong()
62+
override fun readLong(): Long = wrapOkio { delegate.readLong() }
5263

53-
override fun readLongLe(): Long = delegate.readLongLe()
64+
override fun readLongLe(): Long = wrapOkio { delegate.readLongLe() }
5465

55-
override fun readInt(): Int = delegate.readInt()
66+
override fun readInt(): Int = wrapOkio { delegate.readInt() }
5667

57-
override fun readIntLe(): Int = delegate.readIntLe()
68+
override fun readIntLe(): Int = wrapOkio { delegate.readIntLe() }
5869

59-
override fun readAll(sink: SdkSink): Long =
70+
override fun readAll(sink: SdkSink): Long = wrapOkio {
6071
delegate.readAll(sink.toOkio())
72+
}
6173

62-
override fun read(sink: ByteArray, offset: Int, limit: Int): Int =
74+
override fun read(sink: ByteArray, offset: Int, limit: Int): Int = wrapOkio {
6375
delegate.read(sink, offset, limit)
76+
}
6477

65-
override fun read(sink: SdkBuffer, limit: Long): Long =
78+
override fun read(sink: SdkBuffer, limit: Long): Long = wrapOkio {
6679
delegate.read(sink.toOkio(), limit)
80+
}
6781

68-
override fun readByteArray(): ByteArray = delegate.readByteArray()
82+
override fun readByteArray(): ByteArray = wrapOkio { delegate.readByteArray() }
6983

70-
override fun readByteArray(byteCount: Long): ByteArray = delegate.readByteArray(byteCount)
84+
override fun readByteArray(byteCount: Long): ByteArray = wrapOkio {
85+
delegate.readByteArray(byteCount)
86+
}
7187

72-
override fun readUtf8(): String = delegate.readUtf8()
88+
override fun readUtf8(): String = wrapOkio { delegate.readUtf8() }
7389

74-
override fun readUtf8(byteCount: Long): String = delegate.readUtf8(byteCount)
90+
override fun readUtf8(byteCount: Long): String = wrapOkio {
91+
delegate.readUtf8(byteCount)
92+
}
7593

76-
override fun peek(): SdkBufferedSource =
94+
override fun peek(): SdkBufferedSource = wrapOkio {
7795
delegate.peek().toSdk().buffer()
78-
override fun exhausted(): Boolean = delegate.exhausted()
79-
override fun request(byteCount: Long): Boolean = delegate.request(byteCount)
96+
}
97+
98+
override fun exhausted(): Boolean = wrapOkio { delegate.exhausted() }
99+
100+
override fun request(byteCount: Long): Boolean = wrapOkio {
101+
delegate.request(byteCount)
102+
}
80103

81-
override fun require(byteCount: Long): Unit = delegate.require(byteCount)
104+
override fun require(byteCount: Long): Unit = wrapOkio { delegate.require(byteCount) }
82105

83-
override fun close() = delegate.close()
106+
override fun close() = wrapOkio { delegate.close() }
84107
}

runtime/runtime-core/common/src/aws/smithy/kotlin/runtime/io/Exceptions.kt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,9 @@ public expect open class IOException(message: String?, cause: Throwable?) : Exce
1010
public constructor(message: String?)
1111
}
1212

13-
public expect open class EOFException(message: String?) : IOException {
13+
public expect open class EOFException(message: String?, cause: Throwable?) : IOException {
1414
public constructor()
15+
public constructor(message: String?)
1516
}
1617

1718
/**

runtime/runtime-core/common/src/aws/smithy/kotlin/runtime/io/SdkByteReadChannel.kt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
package aws.smithy.kotlin.runtime.io
66

77
import aws.smithy.kotlin.runtime.io.internal.SdkDispatchers
8+
import kotlinx.coroutines.IO
89
import kotlinx.coroutines.withContext
910

1011
/**

runtime/runtime-core/common/src/aws/smithy/kotlin/runtime/io/SdkSource.kt

Lines changed: 36 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,15 @@
66
package aws.smithy.kotlin.runtime.io
77

88
import aws.smithy.kotlin.runtime.InternalApi
9+
import aws.smithy.kotlin.runtime.io.internal.JobChannel
10+
import aws.smithy.kotlin.runtime.io.internal.SdkDispatchers
11+
import kotlinx.coroutines.CoroutineName
912
import kotlinx.coroutines.CoroutineScope
13+
import kotlinx.coroutines.DelicateCoroutinesApi
14+
import kotlinx.coroutines.GlobalScope
15+
import kotlinx.coroutines.ensureActive
16+
import kotlinx.coroutines.launch
17+
import kotlinx.coroutines.withContext
1018

1119
/**
1220
* A source for reading a stream of bytes (e.g. from file, network, or in-memory buffer). Sources may
@@ -43,16 +51,42 @@ public interface SdkSource : Closeable {
4351
* Consume the [SdkSource] and pull the entire contents into memory as a [ByteArray].
4452
*/
4553
@InternalApi
46-
public expect suspend fun SdkSource.readToByteArray(): ByteArray
54+
public suspend fun SdkSource.readToByteArray(): ByteArray = withContext(SdkDispatchers.IO) {
55+
use { it.buffer().readByteArray() }
56+
}
4757

4858
/**
4959
* Convert the [SdkSource] to an [SdkByteReadChannel]. Content is read from the source and forwarded
5060
* to the channel.
5161
* @param coroutineScope the coroutine scope to use to launch a background reader channel responsible for propagating data
5262
* between source and the returned channel
5363
*/
64+
@OptIn(DelicateCoroutinesApi::class)
5465
@InternalApi
55-
public expect fun SdkSource.toSdkByteReadChannel(coroutineScope: CoroutineScope? = null): SdkByteReadChannel
66+
public fun SdkSource.toSdkByteReadChannel(coroutineScope: CoroutineScope? = null): SdkByteReadChannel {
67+
val source = this
68+
val ch = JobChannel()
69+
val scope = coroutineScope ?: GlobalScope
70+
val job = scope.launch(SdkDispatchers.IO + CoroutineName("sdk-source-reader")) {
71+
val buffer = SdkBuffer()
72+
val result = runCatching {
73+
source.use {
74+
while (true) {
75+
ensureActive()
76+
val rc = source.read(buffer, DEFAULT_BYTE_CHANNEL_MAX_BUFFER_SIZE.toLong())
77+
if (rc == -1L) break
78+
ch.write(buffer)
79+
}
80+
}
81+
}
82+
83+
ch.close(result.exceptionOrNull())
84+
}
85+
86+
ch.attachJob(job)
87+
88+
return ch
89+
}
5690

5791
/**
5892
* Remove exactly [byteCount] bytes from this source and appends them to [sink].

0 commit comments

Comments
 (0)