From 483a27ab82b9d2f8a1ef19761744cf41febaaad1 Mon Sep 17 00:00:00 2001 From: George Fu Date: Fri, 11 Aug 2023 21:00:54 +0000 Subject: [PATCH 1/2] feat(lib-storage): improve performance by reducing buffer copies --- lib/lib-storage/src/Upload.spec.ts | 243 +++++++++--------- lib/lib-storage/src/bytelength.ts | 8 +- lib/lib-storage/src/chunker.ts | 36 ++- .../src/chunks/getChunkBuffer.spec.ts | 66 ----- lib/lib-storage/src/chunks/getChunkStream.ts | 16 +- .../src/chunks/getChunkUint8Array.spec.ts | 70 +++++ ...etChunkBuffer.ts => getChunkUint8Array.ts} | 9 +- lib/lib-storage/src/chunks/getDataReadable.ts | 10 +- .../src/chunks/getDataReadableStream.ts | 16 +- 9 files changed, 253 insertions(+), 221 deletions(-) delete mode 100644 lib/lib-storage/src/chunks/getChunkBuffer.spec.ts create mode 100644 lib/lib-storage/src/chunks/getChunkUint8Array.spec.ts rename lib/lib-storage/src/chunks/{getChunkBuffer.ts => getChunkUint8Array.ts} (60%) diff --git a/lib/lib-storage/src/Upload.spec.ts b/lib/lib-storage/src/Upload.spec.ts index a92fcb1c612bb..65a2e5c5f77ed 100644 --- a/lib/lib-storage/src/Upload.spec.ts +++ b/lib/lib-storage/src/Upload.spec.ts @@ -312,135 +312,136 @@ describe(Upload.name, () => { expect(result.Location).toEqual("https://example-bucket.example-host.com/folder/example-key"); }); - it("should upload using multi-part when parts are larger than part size", async () => { - // create a string that's larger than 5MB. - const partSize = 1024 * 1024 * 5; - const largeBuffer = Buffer.from("#".repeat(partSize + 10)); - const firstBuffer = largeBuffer.subarray(0, partSize); - const secondBuffer = largeBuffer.subarray(partSize); - const actionParams = { ...params, Body: largeBuffer }; - const upload = new Upload({ - params: actionParams, - client: new S3({}), - }); - await upload.done(); - expect(sendMock).toHaveBeenCalledTimes(4); - // create multipartMock is called correctly. - expect(createMultipartMock).toHaveBeenCalledTimes(1); - expect(createMultipartMock).toHaveBeenCalledWith({ - ...actionParams, - Body: undefined, - }); - // upload parts is called correctly. - expect(uploadPartMock).toHaveBeenCalledTimes(2); - expect(uploadPartMock).toHaveBeenNthCalledWith(1, { - ...actionParams, - // @ts-ignore extended custom matcher - Body: expect.toHaveSameHashAsBuffer(firstBuffer), - PartNumber: 1, - UploadId: "mockuploadId", - }); - expect(uploadPartMock).toHaveBeenNthCalledWith(2, { - ...actionParams, - // @ts-ignore extended custom matcher - Body: expect.toHaveSameHashAsBuffer(secondBuffer), - PartNumber: 2, - UploadId: "mockuploadId", - }); - // complete multipart upload is called correctly. 
-    expect(completeMultipartMock).toHaveBeenCalledTimes(1);
-    expect(completeMultipartMock).toHaveBeenLastCalledWith({
-      ...actionParams,
-      Body: undefined,
-      UploadId: "mockuploadId",
-      MultipartUpload: {
-        Parts: [
-          {
-            ETag: "mock-upload-Etag",
-            PartNumber: 1,
-          },
-          {
-            ETag: "mock-upload-Etag-2",
-            PartNumber: 2,
-          },
-        ],
-      },
-    });
+    [
+      { type: "Buffer", largeBuffer: Buffer.from("#".repeat(DEFAULT_PART_SIZE + 10)) },
+      { type: "Uint8Array", largeBuffer: Uint8Array.from(Buffer.from("#".repeat(DEFAULT_PART_SIZE + 10))) },
+    ].forEach(({ type, largeBuffer }) => {
+      it(`should upload using multi-part when parts are larger than part size ${type}`, async () => {
+        const firstBuffer = largeBuffer.subarray(0, DEFAULT_PART_SIZE);
+        const secondBuffer = largeBuffer.subarray(DEFAULT_PART_SIZE);
+        const actionParams = { ...params, Body: largeBuffer };
+        const upload = new Upload({
+          params: actionParams,
+          client: new S3({}),
+        });
+        await upload.done();
+        expect(sendMock).toHaveBeenCalledTimes(4);
+        // create multipartMock is called correctly.
+        expect(createMultipartMock).toHaveBeenCalledTimes(1);
+        expect(createMultipartMock).toHaveBeenCalledWith({
+          ...actionParams,
+          Body: undefined,
+        });
+        // upload parts is called correctly.
+        expect(uploadPartMock).toHaveBeenCalledTimes(2);
+        expect(uploadPartMock).toHaveBeenNthCalledWith(1, {
+          ...actionParams,
+          // @ts-ignore extended custom matcher
+          Body: expect.toHaveSameHashAsBuffer(firstBuffer),
+          PartNumber: 1,
+          UploadId: "mockuploadId",
+        });
+        expect(uploadPartMock).toHaveBeenNthCalledWith(2, {
+          ...actionParams,
+          // @ts-ignore extended custom matcher
+          Body: expect.toHaveSameHashAsBuffer(secondBuffer),
+          PartNumber: 2,
+          UploadId: "mockuploadId",
+        });
+        // complete multipart upload is called correctly.
+        expect(completeMultipartMock).toHaveBeenCalledTimes(1);
+        expect(completeMultipartMock).toHaveBeenLastCalledWith({
+          ...actionParams,
+          Body: undefined,
+          UploadId: "mockuploadId",
+          MultipartUpload: {
+            Parts: [
+              {
+                ETag: "mock-upload-Etag",
+                PartNumber: 1,
+              },
+              {
+                ETag: "mock-upload-Etag-2",
+                PartNumber: 2,
+              },
+            ],
+          },
+        });

+        // no tags were passed.
+        expect(putObjectTaggingMock).toHaveBeenCalledTimes(0);
+        // put was not called
+        expect(putObjectMock).toHaveBeenCalledTimes(0);
+      });
+
+      it(`should upload using multi-part when parts are larger than part size stream ${type}`, async () => {
+        // the body is larger than one part size, so it is uploaded as two parts.
+        const firstBuffer = largeBuffer.subarray(0, DEFAULT_PART_SIZE);
+        const secondBuffer = largeBuffer.subarray(DEFAULT_PART_SIZE);
+        const streamBody = Readable.from(
+          (function* () {
+            yield largeBuffer;
+          })()
+        );
+        const actionParams = { ...params, Body: streamBody };
+        const upload = new Upload({
+          params: actionParams,
+          client: new S3({}),
+        });

-  it("should upload using multi-part when parts are larger than part size stream", async () => {
-    // create a string that's larger than 5MB.
- const largeBuffer = Buffer.from("#".repeat(DEFAULT_PART_SIZE + 10)); - const firstBuffer = largeBuffer.subarray(0, DEFAULT_PART_SIZE); - const secondBuffer = largeBuffer.subarray(DEFAULT_PART_SIZE); - const streamBody = Readable.from( - (function* () { - yield largeBuffer; - })() - ); - const actionParams = { ...params, Body: streamBody }; - const upload = new Upload({ - params: actionParams, - client: new S3({}), - }); + await upload.done(); - await upload.done(); + expect(sendMock).toHaveBeenCalledTimes(4); + // create multipartMock is called correctly. + expect(createMultipartMock).toHaveBeenCalledTimes(1); + expect(createMultipartMock).toHaveBeenCalledWith({ + ...actionParams, + Body: undefined, + }); - expect(sendMock).toHaveBeenCalledTimes(4); - // create multipartMock is called correctly. - expect(createMultipartMock).toHaveBeenCalledTimes(1); - expect(createMultipartMock).toHaveBeenCalledWith({ - ...actionParams, - Body: undefined, - }); + // upload parts is called correctly. + expect(uploadPartMock).toHaveBeenCalledTimes(2); + expect(uploadPartMock).toHaveBeenNthCalledWith(1, { + ...actionParams, + // @ts-ignore extended custom matcher + Body: expect.toHaveSameHashAsBuffer(firstBuffer), + PartNumber: 1, + UploadId: "mockuploadId", + }); - // upload parts is called correctly. - expect(uploadPartMock).toHaveBeenCalledTimes(2); - expect(uploadPartMock).toHaveBeenNthCalledWith(1, { - ...actionParams, - // @ts-ignore extended custom matcher - Body: expect.toHaveSameHashAsBuffer(firstBuffer), - PartNumber: 1, - UploadId: "mockuploadId", - }); + expect(uploadPartMock).toHaveBeenNthCalledWith(2, { + ...actionParams, + // @ts-ignore extended custom matcher + Body: expect.toHaveSameHashAsBuffer(secondBuffer), + PartNumber: 2, + UploadId: "mockuploadId", + }); - expect(uploadPartMock).toHaveBeenNthCalledWith(2, { - ...actionParams, - // @ts-ignore extended custom matcher - Body: expect.toHaveSameHashAsBuffer(secondBuffer), - PartNumber: 2, - UploadId: "mockuploadId", - }); + // complete multipart upload is called correctly. + expect(completeMultipartMock).toHaveBeenCalledTimes(1); + expect(completeMultipartMock).toHaveBeenLastCalledWith({ + ...actionParams, + Body: undefined, + UploadId: "mockuploadId", + MultipartUpload: { + Parts: [ + { + ETag: "mock-upload-Etag", + PartNumber: 1, + }, + { + ETag: "mock-upload-Etag-2", + PartNumber: 2, + }, + ], + }, + }); - // complete multipart upload is called correctly. - expect(completeMultipartMock).toHaveBeenCalledTimes(1); - expect(completeMultipartMock).toHaveBeenLastCalledWith({ - ...actionParams, - Body: undefined, - UploadId: "mockuploadId", - MultipartUpload: { - Parts: [ - { - ETag: "mock-upload-Etag", - PartNumber: 1, - }, - { - ETag: "mock-upload-Etag-2", - PartNumber: 2, - }, - ], - }, + // no tags were passed. + expect(putObjectTaggingMock).toHaveBeenCalledTimes(0); + // put was not called + expect(putObjectMock).toHaveBeenCalledTimes(0); }); - - // no tags were passed. - expect(putObjectTaggingMock).toHaveBeenCalledTimes(0); - // put was not called - expect(putObjectMock).toHaveBeenCalledTimes(0); }); it("should add tags to the object if tags have been added PUT", async () => { diff --git a/lib/lib-storage/src/bytelength.ts b/lib/lib-storage/src/bytelength.ts index ab9ea3d94cf69..caefb943d0a82 100644 --- a/lib/lib-storage/src/bytelength.ts +++ b/lib/lib-storage/src/bytelength.ts @@ -1,8 +1,14 @@ +import { Buffer } from "buffer"; // do not remove this import: Node.js buffer or buffer NPM module for browser. 
+
 import { ClientDefaultValues } from "./runtimeConfig";
 
 export const byteLength = (input: any) => {
   if (input === null || input === undefined) return 0;
-  if (typeof input === "string") input = Buffer.from(input);
+
+  if (typeof input === "string") {
+    return Buffer.byteLength(input);
+  }
+
   if (typeof input.byteLength === "number") {
     return input.byteLength;
   } else if (typeof input.length === "number") {
diff --git a/lib/lib-storage/src/chunker.ts b/lib/lib-storage/src/chunker.ts
index 40f0cd98be7b7..fb9c205bfb7a4 100644
--- a/lib/lib-storage/src/chunker.ts
+++ b/lib/lib-storage/src/chunker.ts
@@ -1,29 +1,37 @@
-import { Buffer } from "buffer";
+import { Buffer } from "buffer"; // do not remove this import: Node.js buffer or buffer NPM module for browser.
 import { Readable } from "stream";
 
-import { getChunkBuffer } from "./chunks/getChunkBuffer";
 import { getChunkStream } from "./chunks/getChunkStream";
+import { getChunkUint8Array } from "./chunks/getChunkUint8Array";
 import { getDataReadable } from "./chunks/getDataReadable";
 import { getDataReadableStream } from "./chunks/getDataReadableStream";
 import { BodyDataTypes } from "./types";
+import type { RawDataPart } from "./Upload";
 
-export const getChunk = (data: BodyDataTypes, partSize: number) => {
-  if (data instanceof Buffer) {
-    return getChunkBuffer(data, partSize);
-  } else if (data instanceof Readable) {
+export const getChunk = (data: BodyDataTypes, partSize: number): AsyncGenerator<RawDataPart, void> => {
+  if (data instanceof Uint8Array) {
+    // includes Buffer (extends Uint8Array)
+    return getChunkUint8Array(data, partSize);
+  }
+
+  if (data instanceof Readable) {
     return getChunkStream(data, partSize, getDataReadable);
-  } else if (data instanceof String || typeof data === "string" || data instanceof Uint8Array) {
-    // chunk Strings, Uint8Array.
-    return getChunkBuffer(Buffer.from(data), partSize);
   }
+
+  if (data instanceof String || typeof data === "string") {
+    return getChunkUint8Array(Buffer.from(data), partSize);
+  }
+
   if (typeof (data as any).stream === "function") {
     // approximate support for Blobs.
     return getChunkStream((data as any).stream(), partSize, getDataReadableStream);
-  } else if (data instanceof ReadableStream) {
+  }
+
+  if (data instanceof ReadableStream) {
     return getChunkStream(data, partSize, getDataReadableStream);
-  } else {
-    throw new Error(
-      "Body Data is unsupported format, expected data to be one of: string | Uint8Array | Buffer | Readable | ReadableStream | Blob;."
-    );
   }
+
+  throw new Error(
+    "Body data is in an unsupported format; expected one of: string | Uint8Array | Buffer | Readable | ReadableStream | Blob."
+  );
 };
diff --git a/lib/lib-storage/src/chunks/getChunkBuffer.spec.ts b/lib/lib-storage/src/chunks/getChunkBuffer.spec.ts
deleted file mode 100644
index 8f4335dcdc818..0000000000000
--- a/lib/lib-storage/src/chunks/getChunkBuffer.spec.ts
+++ /dev/null
@@ -1,66 +0,0 @@
-import { byteLength } from "../bytelength";
-import { getChunkBuffer } from "./getChunkBuffer";
-
-describe.only(getChunkBuffer.name, () => {
-  const getBuffer = (size: number) => Buffer.from("#".repeat(size));
-
-  describe("Buffer chunking", () => {
-    it("should come back with small sub buffers", async () => {
-      const chunklength = 100;
-      const totalLength = 1000;
-      const buffer = getBuffer(totalLength);
-      const chunker = getChunkBuffer(buffer, chunklength);
-
-      let chunkNum = 0;
-      const expectedNumberOfChunks = totalLength / chunklength;
-      for await (const chunk of chunker) {
-        chunkNum += 1;
-        expect(byteLength(chunk.data)).toEqual(chunklength);
-        expect(chunk.partNumber).toEqual(chunkNum);
-        if (chunkNum < expectedNumberOfChunks) {
-          expect(chunk.lastPart).toBe(undefined);
-        } else {
-          expect(chunk.lastPart).toBe(true);
-        }
-      }
-
-      expect(chunkNum).toEqual(expectedNumberOfChunks);
-    });
-
-    it("should come back with the last chunk the remainder size", async () => {
-      const chunklength = 1000;
-      const totalLength = 2200;
-      const buffer = getBuffer(totalLength);
-
-      const chunker = getChunkBuffer(buffer, chunklength);
-      const chunks = [];
-      for await (const chunk of chunker) {
-        chunks.push(chunk);
-      }
-
-      expect(chunks.length).toEqual(3);
-      expect(byteLength(chunks[0].data)).toBe(chunklength);
-      expect(chunks[0].lastPart).toBe(undefined);
-      expect(byteLength(chunks[1].data)).toBe(chunklength);
-      expect(chunks[1].lastPart).toBe(undefined);
-      expect(byteLength(chunks[2].data)).toBe(totalLength % chunklength);
-      expect(chunks[2].lastPart).toBe(true);
-    });
-
-    it("should come back with one chunk if it is a small buffer", async () => {
-      const chunklength = 1000;
-      const totalLength = 200;
-      const buffer = getBuffer(totalLength);
-
-      const chunker = getChunkBuffer(buffer, chunklength);
-      const chunks = [];
-      for await (const chunk of chunker) {
-        chunks.push(chunk);
-      }
-
-      expect(chunks.length).toEqual(1);
-      expect(byteLength(chunks[0].data)).toBe(totalLength % chunklength);
-      expect(chunks[0].lastPart).toBe(true);
-    });
-  });
-});
diff --git a/lib/lib-storage/src/chunks/getChunkStream.ts b/lib/lib-storage/src/chunks/getChunkStream.ts
index 6165e4afd67fa..c8de2ffd0af48 100644
--- a/lib/lib-storage/src/chunks/getChunkStream.ts
+++ b/lib/lib-storage/src/chunks/getChunkStream.ts
@@ -1,23 +1,23 @@
-import { Buffer } from "buffer";
+import { Buffer } from "buffer"; // do not remove this import: Node.js buffer or buffer NPM module for browser.
 
 import { RawDataPart } from "../Upload";
 
 interface Buffers {
-  chunks: Buffer[];
+  chunks: Uint8Array[];
   length: number;
 }
 
 export async function* getChunkStream<T>(
   data: T,
   partSize: number,
-  getNextData: (data: T) => AsyncGenerator<Buffer>
+  getNextData: (data: T) => AsyncGenerator<Uint8Array>
 ): AsyncGenerator<RawDataPart, void, undefined> {
   let partNumber = 1;
   const currentBuffer: Buffers = { chunks: [], length: 0 };
 
   for await (const datum of getNextData(data)) {
     currentBuffer.chunks.push(datum);
-    currentBuffer.length += datum.length;
+    currentBuffer.length += datum.byteLength;
 
     while (currentBuffer.length >= partSize) {
       /**
@@ -28,18 +28,18 @@ export async function* getChunkStream<T>(
 
       yield {
         partNumber,
-        data: dataChunk.slice(0, partSize),
+        data: dataChunk.subarray(0, partSize),
       };
 
       // Reset the buffer.
-      currentBuffer.chunks = [dataChunk.slice(partSize)];
-      currentBuffer.length = currentBuffer.chunks[0].length;
+      currentBuffer.chunks = [dataChunk.subarray(partSize)];
+      currentBuffer.length = currentBuffer.chunks[0].byteLength;
       partNumber += 1;
     }
   }
 
   yield {
     partNumber,
-    data: Buffer.concat(currentBuffer.chunks),
+    data: currentBuffer.chunks.length !== 1 ? Buffer.concat(currentBuffer.chunks) : currentBuffer.chunks[0],
     lastPart: true,
   };
 }
diff --git a/lib/lib-storage/src/chunks/getChunkUint8Array.spec.ts b/lib/lib-storage/src/chunks/getChunkUint8Array.spec.ts
new file mode 100644
index 0000000000000..eae36ca8c07c1
--- /dev/null
+++ b/lib/lib-storage/src/chunks/getChunkUint8Array.spec.ts
@@ -0,0 +1,70 @@
+import { byteLength } from "../bytelength";
+import { RawDataPart } from "../Upload";
+import { getChunkUint8Array } from "./getChunkUint8Array";
+
+describe(getChunkUint8Array.name, () => {
+  [
+    { type: "Buffer", getBuffer: (size: number) => Buffer.from("#".repeat(size)) },
+    { type: "Uint8Array", getBuffer: (size: number) => Uint8Array.from(Buffer.from("#".repeat(size))) },
+  ].forEach(({ getBuffer, type }) => {
+    describe(`${type} chunking`, () => {
+      it("should come back with small sub buffers", async () => {
+        const chunklength = 100;
+        const totalLength = 1000;
+        const buffer = getBuffer(totalLength);
+        const chunker = getChunkUint8Array(buffer, chunklength);
+
+        let chunkNum = 0;
+        const expectedNumberOfChunks = totalLength / chunklength;
+        for await (const chunk of chunker) {
+          chunkNum += 1;
+          expect(byteLength(chunk.data)).toEqual(chunklength);
+          expect(chunk.partNumber).toEqual(chunkNum);
+          if (chunkNum < expectedNumberOfChunks) {
+            expect(chunk.lastPart).toBe(undefined);
+          } else {
+            expect(chunk.lastPart).toBe(true);
+          }
+        }
+
+        expect(chunkNum).toEqual(expectedNumberOfChunks);
+      });
+
+      it("should come back with the last chunk the remainder size", async () => {
+        const chunklength = 1000;
+        const totalLength = 2200;
+        const buffer = getBuffer(totalLength);
+
+        const chunker = getChunkUint8Array(buffer, chunklength);
+        const chunks = [] as RawDataPart[];
+        for await (const chunk of chunker) {
+          chunks.push(chunk);
+        }
+
+        expect(chunks.length).toEqual(3);
+        expect(byteLength(chunks[0].data)).toBe(chunklength);
+        expect(chunks[0].lastPart).toBe(undefined);
+        expect(byteLength(chunks[1].data)).toBe(chunklength);
+        expect(chunks[1].lastPart).toBe(undefined);
+        expect(byteLength(chunks[2].data)).toBe(totalLength % chunklength);
+        expect(chunks[2].lastPart).toBe(true);
+      });
+
+      it("should come back with one chunk if it is a small buffer", async () => {
+        const chunklength = 1000;
+        const totalLength = 200;
+        const buffer = getBuffer(totalLength);
+
+        const chunker = getChunkUint8Array(buffer, chunklength);
+        const chunks = [] as RawDataPart[];
+        for await (const chunk of chunker) {
+          chunks.push(chunk);
+        }
+
+        expect(chunks.length).toEqual(1);
+        expect(byteLength(chunks[0].data)).toBe(totalLength % chunklength);
+        expect(chunks[0].lastPart).toBe(true);
+      });
+    });
+  });
+});
diff --git a/lib/lib-storage/src/chunks/getChunkBuffer.ts b/lib/lib-storage/src/chunks/getChunkUint8Array.ts
similarity index 60%
rename from lib/lib-storage/src/chunks/getChunkBuffer.ts
rename to lib/lib-storage/src/chunks/getChunkUint8Array.ts
index e3553a69fb0a2..255d6789cbbcc 100644
--- a/lib/lib-storage/src/chunks/getChunkBuffer.ts
+++ b/lib/lib-storage/src/chunks/getChunkUint8Array.ts
@@ -1,6 +1,9 @@
 import { RawDataPart } from "../Upload";
 
-export async function* getChunkBuffer(data: Buffer, partSize: number): AsyncGenerator<RawDataPart, void, undefined> {
+export async function* getChunkUint8Array(
+  data: Uint8Array,
+  partSize: number
+): AsyncGenerator<RawDataPart, void, undefined> {
   let partNumber = 1;
   let startByte = 0;
   let endByte = partSize;
@@ -8,7 +11,7 @@ export async function* getChunkBuffer(data: Buffer, partSize: number): AsyncGene
   while (endByte < data.byteLength) {
     yield {
       partNumber,
-      data: data.slice(startByte, endByte),
+      data: data.subarray(startByte, endByte),
     };
     partNumber += 1;
     startByte = endByte;
@@ -17,7 +20,7 @@ export async function* getChunkBuffer(data: Buffer, partSize: number): AsyncGene
 
   yield {
     partNumber,
-    data: data.slice(startByte),
+    data: data.subarray(startByte),
     lastPart: true,
   };
 }
diff --git a/lib/lib-storage/src/chunks/getDataReadable.ts b/lib/lib-storage/src/chunks/getDataReadable.ts
index 7e71acfddf7f6..5ab5f705dba0a 100644
--- a/lib/lib-storage/src/chunks/getDataReadable.ts
+++ b/lib/lib-storage/src/chunks/getDataReadable.ts
@@ -1,8 +1,12 @@
-import { Buffer } from "buffer";
+import { Buffer } from "buffer"; // do not remove this import: Node.js buffer or buffer NPM module for browser.
 import { Readable } from "stream";
 
-export async function* getDataReadable(data: Readable): AsyncGenerator<Buffer> {
+export async function* getDataReadable(data: Readable): AsyncGenerator<Uint8Array> {
   for await (const chunk of data) {
-    yield Buffer.from(chunk);
+    if (Buffer.isBuffer(chunk) || chunk instanceof Uint8Array) {
+      yield chunk;
+    } else {
+      yield Buffer.from(chunk);
+    }
   }
 }
diff --git a/lib/lib-storage/src/chunks/getDataReadableStream.ts b/lib/lib-storage/src/chunks/getDataReadableStream.ts
index 0f7671614faf5..b6894f4c7367b 100644
--- a/lib/lib-storage/src/chunks/getDataReadableStream.ts
+++ b/lib/lib-storage/src/chunks/getDataReadableStream.ts
@@ -1,6 +1,6 @@
-import { Buffer } from "buffer";
+import { Buffer } from "buffer"; // do not remove this import: Node.js buffer or buffer NPM module for browser.
 
-export async function* getDataReadableStream(data: ReadableStream): AsyncGenerator<Buffer> {
+export async function* getDataReadableStream(data: ReadableStream): AsyncGenerator<Uint8Array> {
   // Get a lock on the stream.
   const reader = data.getReader();
 
@@ -9,9 +9,15 @@ export async function* getDataReadableStream(data: ReadableStream): AsyncGenerat
     // Read from the stream.
    const { done, value } = await reader.read();
     // Exit if we're done.
-    if (done) return;
-    // Else yield the chunk.
-    yield Buffer.from(value);
+    if (done) {
+      return;
+    }
+
+    if (Buffer.isBuffer(value) || value instanceof Uint8Array) {
+      yield value;
+    } else {
+      yield Buffer.from(value);
+    }
   }
 } catch (e) {
   throw e;

From ff37a21c2e127bbc2a5ad9d393a532d03e23772d Mon Sep 17 00:00:00 2001
From: George Fu
Date: Tue, 12 Mar 2024 15:30:16 +0000
Subject: [PATCH 2/2] test(lib-storage): add e2e tests

---
 lib/lib-storage/jest.config.e2e.js           |  5 ++
 lib/lib-storage/package.json                 |  3 +-
 lib/lib-storage/src/chunks/getChunkStream.ts |  1 +
 lib/lib-storage/src/lib-storage.e2e.spec.ts  | 52 ++++++++++++++++++++
 4 files changed, 60 insertions(+), 1 deletion(-)
 create mode 100644 lib/lib-storage/jest.config.e2e.js
 create mode 100644 lib/lib-storage/src/lib-storage.e2e.spec.ts

diff --git a/lib/lib-storage/jest.config.e2e.js b/lib/lib-storage/jest.config.e2e.js
new file mode 100644
index 0000000000000..c3aa6055ef756
--- /dev/null
+++ b/lib/lib-storage/jest.config.e2e.js
@@ -0,0 +1,5 @@
+module.exports = {
+  preset: "ts-jest",
+  testMatch: ["**/*.e2e.spec.ts"],
+  bail: true,
+};
diff --git a/lib/lib-storage/package.json b/lib/lib-storage/package.json
index 46c3c9dc8c2f1..a69a4a83048dc 100644
--- a/lib/lib-storage/package.json
+++ b/lib/lib-storage/package.json
@@ -14,7 +14,8 @@
     "build:types:downlevel": "downlevel-dts dist-types dist-types/ts3.4",
     "clean": "rimraf ./dist-* && rimraf *.tsbuildinfo",
     "extract:docs": "api-extractor run --local",
-    "test": "jest"
+    "test": "jest",
+    "test:e2e": "jest -c jest.config.e2e.js"
   },
   "engines": {
     "node": ">=14.0.0"
diff --git a/lib/lib-storage/src/chunks/getChunkStream.ts b/lib/lib-storage/src/chunks/getChunkStream.ts
index c8de2ffd0af48..21ac3c779aff6 100644
--- a/lib/lib-storage/src/chunks/getChunkStream.ts
+++ b/lib/lib-storage/src/chunks/getChunkStream.ts
@@ -37,6 +37,7 @@ export async function* getChunkStream<T>(
       partNumber += 1;
     }
   }
+
   yield {
     partNumber,
     data: currentBuffer.chunks.length !== 1 ? Buffer.concat(currentBuffer.chunks) : currentBuffer.chunks[0],
diff --git a/lib/lib-storage/src/lib-storage.e2e.spec.ts b/lib/lib-storage/src/lib-storage.e2e.spec.ts
new file mode 100644
index 0000000000000..c0909ab1e3a8c
--- /dev/null
+++ b/lib/lib-storage/src/lib-storage.e2e.spec.ts
@@ -0,0 +1,52 @@
+import { S3 } from "@aws-sdk/client-s3";
+import { Upload } from "@aws-sdk/lib-storage";
+import type { AwsCredentialIdentity } from "@smithy/types";
+import { randomBytes } from "crypto";
+import { Readable } from "stream";
+
+const region: string | undefined = process?.env?.AWS_SMOKE_TEST_REGION;
+const credentials: AwsCredentialIdentity | undefined = (globalThis as any).credentials || undefined;
+const Bucket = process?.env?.AWS_SMOKE_TEST_BUCKET;
+
+jest.setTimeout(45_000);
+
+describe("@aws-sdk/lib-storage", () => {
+  let Key = ``;
+  const data = randomBytes(20_240_000);
+  const dataString = data.toString();
+
+  const client = new S3({
+    region,
+    credentials,
+  });
+
+  describe("Upload", () => {
+    beforeAll(() => {
+      Key = `multi-part-file-${Date.now()}`;
+    });
+    afterAll(async () => {
+      await client.deleteObject({ Bucket, Key });
+    });
+
+    for (const body of [data, dataString, Readable.from(data)]) {
+      it("should upload in parts for input type " + body.constructor.name, async () => {
+        const s3Upload = new Upload({
+          client,
+          params: {
+            Bucket,
+            Key,
+            Body: body,
+          },
+        });
+        await s3Upload.done();
+
+        const object = await client.getObject({
+          Bucket,
+          Key,
+        });
+
+        expect(await object.Body?.transformToString()).toEqual(dataString);
+      });
+    }
+  });
+});
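Note: the changes above lean on two Node.js behaviors worth spelling out. The following is an illustrative sketch only, not part of either commit; it assumes nothing beyond Node's built-in Buffer API.

import { Buffer } from "buffer";

const whole = Buffer.from("#".repeat(10));

// Buffer#subarray returns a view over the same memory; no bytes are copied.
const view = whole.subarray(0, 5);
console.log(view.buffer === whole.buffer); // true: shared backing ArrayBuffer

// Uint8Array#slice, by contrast, allocates a copy. Since the chunkers now
// accept any Uint8Array rather than only Buffer, subarray is the copy-free choice.
const u8 = Uint8Array.from(whole);
console.log(u8.slice(0, 5).buffer === u8.buffer); // false: slice copied the bytes

// Buffer.byteLength measures a string's UTF-8 size without allocating a Buffer,
// which is what replaces the Buffer.from(input) round trip in bytelength.ts.
console.log(Buffer.byteLength("aé")); // 3 (1 + 2 bytes), while "aé".length is 2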