diff --git a/pom.xml b/pom.xml index ea7135f..91e13c0 100644 --- a/pom.xml +++ b/pom.xml @@ -162,7 +162,7 @@ synanon/video/synanon.mp4 soul/audio/uclapasc.wav - soul/audio/uclapasc.dat + soul/audio/uclapasc.dat.gz diff --git a/src/main/java/edu/ucla/library/avpairtree/verticles/WaveformVerticle.java b/src/main/java/edu/ucla/library/avpairtree/verticles/WaveformVerticle.java index 610fc71..5ba56df 100644 --- a/src/main/java/edu/ucla/library/avpairtree/verticles/WaveformVerticle.java +++ b/src/main/java/edu/ucla/library/avpairtree/verticles/WaveformVerticle.java @@ -1,11 +1,12 @@ package edu.ucla.library.avpairtree.verticles; +import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.InputStream; import java.net.URLEncoder; -import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; import java.nio.file.Path; +import java.util.zip.GZIPOutputStream; import info.freelibrary.util.Logger; import info.freelibrary.util.LoggerFactory; @@ -148,8 +149,9 @@ public void start(final Promise aPromise) { } /** - * Transforms the source audio file at the given path into audiowaveform data, uploads that data to S3, and replies - * to the message with the URL for the data. If either the transformation or upload fails, sends back error details. + * Transforms the source audio file at the given path into audiowaveform data, compresses and uploads that data to + * S3, and replies to the message with the URL for the compressed data. If either the transformation, compression, + * or upload fails, sends back error details. * * @param aMessage A message with the file path of the audio file to transform */ @@ -158,30 +160,37 @@ private void handle(final Message aMessage) { final CsvItem csvItem = aMessage.body(); final Path audioFilePath = AvPtUtils.getInputFilePath(csvItem, mySourceDir); - audiowaveform(audioFilePath).onSuccess(s3ObjectData -> { + audiowaveform(audioFilePath).onSuccess(data -> { final String ark = csvItem.getItemARK(); final String s3ObjectKey = StringUtils.format(S3_OBJECT_KEY_TEMPLATE, ark); - final PutObjectRequest req = PutObjectRequest.builder().bucket(myS3Bucket).key(s3ObjectKey).build(); - final AsyncRequestBody body = AsyncRequestBody.fromByteBuffer(s3ObjectData); - - // Store the audiowaveform data on S3 - myS3Client.putObject(req, body).whenComplete((resp, err) -> { - if (resp != null) { - // Success! - final String audiowaveformURL = StringUtils.format(myS3ObjectUrlTemplate, - URLEncoder.encode(s3ObjectKey, StandardCharsets.UTF_8)); - - // Reply with a JsonObject associating the item ARK with the URL for the audiowaveform data - aMessage.reply(new JsonObject().put(csvItem.getItemARK(), audiowaveformURL)); - } else { - final String s3ErrorMsg = - LOGGER.getMessage(MessageCodes.AVPT_022, s3ObjectKey, err.getMessage()); - - // Since the sender (WatcherVerticle) just logs all errors, should be okay to use a single - // failureCode for all errors - aMessage.fail(Op.ERROR_CODE, s3ErrorMsg); - } - }); + final PutObjectRequest req = + PutObjectRequest.builder().bucket(myS3Bucket).key(s3ObjectKey).contentEncoding("gzip").build(); + + try { + final byte[] compressedData = gzip(data); + final AsyncRequestBody body = AsyncRequestBody.fromBytes(compressedData); + + // Store the compressed audiowaveform data on S3 + myS3Client.putObject(req, body).whenComplete((resp, err) -> { + if (resp != null) { + // Success! + final String audiowaveformURL = StringUtils.format(myS3ObjectUrlTemplate, + URLEncoder.encode(s3ObjectKey, StandardCharsets.UTF_8)); + + // Reply with a JsonObject associating the item ARK with the URL for the audiowaveform data + aMessage.reply(new JsonObject().put(csvItem.getItemARK(), audiowaveformURL)); + } else { + final String s3ErrorMsg = + LOGGER.getMessage(MessageCodes.AVPT_022, s3ObjectKey, err.getMessage()); + + // Since the sender (WatcherVerticle) just logs all errors, should be okay to use a single + // failureCode for all errors + aMessage.fail(Op.ERROR_CODE, s3ErrorMsg); + } + }); + } catch (final IOException details) { + aMessage.fail(Op.ERROR_CODE, details.getMessage()); + } }).onFailure(details -> { aMessage.fail(Op.ERROR_CODE, details.getMessage()); }); @@ -194,11 +203,11 @@ private void handle(final Message aMessage) { * Transforms the source audio file at the given path into binary audiowaveform data. * * @param anAudioFilePath The path to the audio file to transform - * @return A Future that is completed with a ByteBuffer containing the audiowaveform data + * @return A Future that is completed with a byte array containing the audiowaveform data * @throws IOException if an I/O error occurs during the execution of the audiowaveform program */ - private Future audiowaveform(final Path anAudioFilePath) throws IOException { - final Promise asyncResult = Promise.promise(); + private Future audiowaveform(final Path anAudioFilePath) throws IOException { + final Promise asyncResult = Promise.promise(); final String[] cmd = { AUDIOWAVEFORM, "--input-filename", anAudioFilePath.toString(), "--output-format", "dat", "--bits", "8" }; final String cmdline = String.join(SPACE, cmd); @@ -221,7 +230,7 @@ private Future audiowaveform(final Path anAudioFilePath) throws IOEx // Redact the binary audiowaveform data for logging LOGGER.debug(MessageCodes.AVPT_015, cmdline, exitValue, "[binary audiowaveform data]"); - asyncResult.complete(ByteBuffer.wrap(stdout)); + asyncResult.complete(stdout); } else { asyncResult.fail(LOGGER.getMessage(MessageCodes.AVPT_015, cmdline, exitValue, stderr)); } @@ -232,4 +241,24 @@ private Future audiowaveform(final Path anAudioFilePath) throws IOEx return asyncResult.future(); } + + /** + * Compresses the data in the given byte array to GZIP format. + * + * @param aByteArray The uncompressed data + * @return The compressed data + * @throws IOException if an I/O error occurs during the data compression + */ + private byte[] gzip(final byte[] aByteArray) throws IOException { + final ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); + + try (GZIPOutputStream gz = new GZIPOutputStream(outputStream)) { + gz.write(aByteArray); + gz.finish(); + + return outputStream.toByteArray(); + } catch (final IOException details) { + throw new IOException(LOGGER.getMessage(MessageCodes.AVPT_023, details)); + } + } } diff --git a/src/main/resources/av-pairtree_messages.xml b/src/main/resources/av-pairtree_messages.xml index 5792725..83c6d15 100644 --- a/src/main/resources/av-pairtree_messages.xml +++ b/src/main/resources/av-pairtree_messages.xml @@ -28,5 +28,6 @@ The environment variable AUDIOWAVEFORM_S3_BUCKET must be set The environment variable AUDIOWAVEFORM_S3_OBJECT_URL_TEMPLATE must be set Unable to upload audiowaveform for item '{}' to S3: {} + Unable to compress data: {} diff --git a/src/test/java/edu/ucla/library/avpairtree/verticles/WaveformVerticleTest.java b/src/test/java/edu/ucla/library/avpairtree/verticles/WaveformVerticleTest.java index a7e1b25..ee370eb 100644 --- a/src/test/java/edu/ucla/library/avpairtree/verticles/WaveformVerticleTest.java +++ b/src/test/java/edu/ucla/library/avpairtree/verticles/WaveformVerticleTest.java @@ -52,14 +52,28 @@ public void testWaveformGenerationAndS3Storage(final TestContext aContext) { WebClient.create(vertx).getAbs(audiowaveformURL).send().onSuccess(resp -> { final Buffer expected = - vertx.fileSystem().readFileBlocking("src/test/resources/soul/audio/uclapasc.dat"); + vertx.fileSystem().readFileBlocking("src/test/resources/soul/audio/uclapasc.dat.gz"); final Buffer actual = resp.body(); + // Partition the GZIP data into the header, body, and footer (according to RFC 1952) + final Buffer expectedHeader = expected.getBuffer(0, 10); + final Buffer actualHeader = actual.getBuffer(0, 10); + + final Buffer expectedBody = expected.getBuffer(10, expected.length() - 8); + final Buffer actualBody = actual.getBuffer(10, actual.length() - 8); + + final Buffer expectedFooter = expected.getBuffer(expected.length() - 8, expected.length()); + final Buffer actualFooter = actual.getBuffer(actual.length() - 8, actual.length()); + try { - assertEquals(expected, actual); + // Apparently JDK 11 doesn't implement RFC 1952 correctly (i.e., it always sets the OS field (the + // last byte in the header) to "0"), so only compare the first nine bytes + assertEquals(expectedHeader.getBuffer(0, expectedHeader.length() - 1), + actualHeader.getBuffer(0, actualHeader.length() - 1)); + assertEquals(expectedBody, actualBody); + assertEquals(expectedFooter, actualFooter); } catch (final AssertionError details) { - LOGGER.error(details, details.getMessage()); - aContext.fail(); + aContext.fail(details); } finally { // TODO: clean up the S3 bucket asyncTask.complete(); diff --git a/src/test/resources/soul/audio/README.md b/src/test/resources/soul/audio/README.md new file mode 100644 index 0000000..b479344 --- /dev/null +++ b/src/test/resources/soul/audio/README.md @@ -0,0 +1,5 @@ +The file `uclapasc.dat.gz` was generated with the command: + +```bash +audiowaveform --input-filename uclapasc.wav --output-format dat --bits 8 | gzip -n - > uclapasc.dat.gz +``` \ No newline at end of file diff --git a/src/test/resources/soul/audio/uclapasc.dat b/src/test/resources/soul/audio/uclapasc.dat deleted file mode 100644 index 8a4ed8b..0000000 Binary files a/src/test/resources/soul/audio/uclapasc.dat and /dev/null differ diff --git a/src/test/resources/soul/audio/uclapasc.dat.gz b/src/test/resources/soul/audio/uclapasc.dat.gz new file mode 100644 index 0000000..b44fdc9 Binary files /dev/null and b/src/test/resources/soul/audio/uclapasc.dat.gz differ