diff --git a/dd-java-agent/agent-bootstrap/src/main/java/datadog/trace/bootstrap/instrumentation/buffer/InjectingPipeOutputStream.java b/dd-java-agent/agent-bootstrap/src/main/java/datadog/trace/bootstrap/instrumentation/buffer/InjectingPipeOutputStream.java index 74d5b1361d8..6e20308aef5 100644 --- a/dd-java-agent/agent-bootstrap/src/main/java/datadog/trace/bootstrap/instrumentation/buffer/InjectingPipeOutputStream.java +++ b/dd-java-agent/agent-bootstrap/src/main/java/datadog/trace/bootstrap/instrumentation/buffer/InjectingPipeOutputStream.java @@ -1,14 +1,13 @@ package datadog.trace.bootstrap.instrumentation.buffer; -import java.io.FilterOutputStream; import java.io.IOException; import java.io.OutputStream; /** - * A circular buffer with a lookbehind buffer of n bytes. The first time that the latest n bytes - * matches the marker, a content is injected before. + * An OutputStream containing a circular buffer with a lookbehind buffer of n bytes. The first time + * that the latest n bytes matches the marker, a content is injected before. */ -public class InjectingPipeOutputStream extends FilterOutputStream { +public class InjectingPipeOutputStream extends OutputStream { private final byte[] lookbehind; private int pos; private boolean bufferFilled; @@ -18,10 +17,11 @@ public class InjectingPipeOutputStream extends FilterOutputStream { private int matchingPos = 0; private final Runnable onContentInjected; private final int bulkWriteThreshold; + private final OutputStream downstream; /** * @param downstream the delegate output stream - * @param marker the marker to find in the stream + * @param marker the marker to find in the stream. Must at least be one byte. * @param contentToInject the content to inject once before the marker if found. * @param onContentInjected callback called when and if the content is injected. */ @@ -30,7 +30,7 @@ public InjectingPipeOutputStream( final byte[] marker, final byte[] contentToInject, final Runnable onContentInjected) { - super(downstream); + this.downstream = downstream; this.marker = marker; this.lookbehind = new byte[marker.length]; this.pos = 0; @@ -42,12 +42,12 @@ public InjectingPipeOutputStream( @Override public void write(int b) throws IOException { if (found) { - out.write(b); + downstream.write(b); return; } if (bufferFilled) { - out.write(lookbehind[pos]); + downstream.write(lookbehind[pos]); } lookbehind[pos] = (byte) b; @@ -60,7 +60,7 @@ public void write(int b) throws IOException { if (marker[matchingPos++] == b) { if (matchingPos == marker.length) { found = true; - out.write(contentToInject); + downstream.write(contentToInject); if (onContentInjected != null) { onContentInjected.run(); } @@ -72,46 +72,48 @@ public void write(int b) throws IOException { } @Override - public void write(byte[] b, int off, int len) throws IOException { + public void write(byte[] array, int off, int len) throws IOException { if (found) { - out.write(b, off, len); + downstream.write(array, off, len); return; } if (len > bulkWriteThreshold) { // if the content is large enough, we can bulk write everything but the N trail and tail. // This because the buffer can already contain some byte from a previous single write. // Also we need to fill the buffer with the tail since we don't know about the next write. - int idx = arrayContains(b, marker); + int idx = arrayContains(array, off, len, marker); if (idx >= 0) { // we have a full match. just write everything found = true; drain(); - out.write(b, off, idx); - out.write(contentToInject); + downstream.write(array, off, idx); + downstream.write(contentToInject); if (onContentInjected != null) { onContentInjected.run(); } - out.write(b, off + idx, len - idx); + downstream.write(array, off + idx, len - idx); } else { // we don't have a full match. write everything in a bulk except the lookbehind buffer // sequentially for (int i = off; i < off + marker.length - 1; i++) { - write(b[i]); + write(array[i]); } drain(); - out.write(b, off + marker.length - 1, len - bulkWriteThreshold); + downstream.write(array, off + marker.length - 1, len - bulkWriteThreshold); for (int i = len - marker.length + 1; i < len; i++) { - write(b[i]); + write(array[i]); } } } else { // use slow path because the length to write is small and within the lookbehind buffer size - super.write(b, off, len); + for (int i = off; i < off + len; i++) { + write(array[i]); + } } } - private int arrayContains(byte[] array, byte[] search) { - for (int i = 0; i < array.length - search.length; i++) { + private int arrayContains(byte[] array, int off, int len, byte[] search) { + for (int i = off; i < len - search.length; i++) { if (array[i] == search[0]) { boolean found = true; int k = i; @@ -133,10 +135,10 @@ private int arrayContains(byte[] array, byte[] search) { private void drain() throws IOException { if (bufferFilled) { for (int i = 0; i < lookbehind.length; i++) { - out.write(lookbehind[(pos + i) % lookbehind.length]); + downstream.write(lookbehind[(pos + i) % lookbehind.length]); } } else { - out.write(this.lookbehind, 0, pos); + downstream.write(this.lookbehind, 0, pos); } pos = 0; matchingPos = 0; @@ -148,6 +150,6 @@ public void close() throws IOException { if (!found) { drain(); } - super.close(); + downstream.close(); } } diff --git a/dd-java-agent/agent-bootstrap/src/main/java/datadog/trace/bootstrap/instrumentation/buffer/InjectingPipeWriter.java b/dd-java-agent/agent-bootstrap/src/main/java/datadog/trace/bootstrap/instrumentation/buffer/InjectingPipeWriter.java new file mode 100644 index 00000000000..00746a77ecd --- /dev/null +++ b/dd-java-agent/agent-bootstrap/src/main/java/datadog/trace/bootstrap/instrumentation/buffer/InjectingPipeWriter.java @@ -0,0 +1,160 @@ +package datadog.trace.bootstrap.instrumentation.buffer; + +import java.io.IOException; +import java.io.Writer; + +/** + * An Writer containing a circular buffer with a lookbehind buffer of n bytes. The first time that + * the latest n bytes matches the marker, a content is injected before. + */ +public class InjectingPipeWriter extends Writer { + private final char[] lookbehind; + private int pos; + private boolean bufferFilled; + private final char[] marker; + private final char[] contentToInject; + private boolean found = false; + private int matchingPos = 0; + private final Runnable onContentInjected; + private final int bulkWriteThreshold; + private final Writer downstream; + + /** + * @param downstream the delegate writer + * @param marker the marker to find in the stream. Must at least be one char. + * @param contentToInject the content to inject once before the marker if found. + * @param onContentInjected callback called when and if the content is injected. + */ + public InjectingPipeWriter( + final Writer downstream, + final char[] marker, + final char[] contentToInject, + final Runnable onContentInjected) { + this.downstream = downstream; + this.marker = marker; + this.lookbehind = new char[marker.length]; + this.pos = 0; + this.contentToInject = contentToInject; + this.onContentInjected = onContentInjected; + this.bulkWriteThreshold = marker.length * 2 - 2; + } + + @Override + public void write(int b) throws IOException { + if (found) { + downstream.write(b); + return; + } + + if (bufferFilled) { + downstream.write(lookbehind[pos]); + } + + lookbehind[pos] = (char) b; + pos = (pos + 1) % lookbehind.length; + + if (!bufferFilled) { + bufferFilled = pos == 0; + } + + if (marker[matchingPos++] == b) { + if (matchingPos == marker.length) { + found = true; + downstream.write(contentToInject); + if (onContentInjected != null) { + onContentInjected.run(); + } + drain(); + } + } else { + matchingPos = 0; + } + } + + @Override + public void flush() throws IOException { + downstream.flush(); + } + + @Override + public void write(char[] array, int off, int len) throws IOException { + if (found) { + downstream.write(array, off, len); + return; + } + if (len > bulkWriteThreshold) { + // if the content is large enough, we can bulk write everything but the N trail and tail. + // This because the buffer can already contain some byte from a previous single write. + // Also we need to fill the buffer with the tail since we don't know about the next write. + int idx = arrayContains(array, off, len, marker); + if (idx >= 0) { + // we have a full match. just write everything + found = true; + drain(); + downstream.write(array, off, idx); + downstream.write(contentToInject); + if (onContentInjected != null) { + onContentInjected.run(); + } + downstream.write(array, off + idx, len - idx); + } else { + // we don't have a full match. write everything in a bulk except the lookbehind buffer + // sequentially + for (int i = off; i < off + marker.length - 1; i++) { + write(array[i]); + } + drain(); + downstream.write(array, off + marker.length - 1, len - bulkWriteThreshold); + for (int i = len - marker.length + 1; i < len; i++) { + write(array[i]); + } + } + } else { + // use slow path because the length to write is small and within the lookbehind buffer size + for (int i = off; i < off + len; i++) { + write(array[i]); + } + } + } + + private int arrayContains(char[] array, int off, int len, char[] search) { + for (int i = off; i < len - search.length; i++) { + if (array[i] == search[0]) { + boolean found = true; + int k = i; + for (int j = 1; j < search.length; j++) { + k++; + if (array[k] != search[j]) { + found = false; + break; + } + } + if (found) { + return i; + } + } + } + return -1; + } + + private void drain() throws IOException { + if (bufferFilled) { + for (int i = 0; i < lookbehind.length; i++) { + downstream.write(lookbehind[(pos + i) % lookbehind.length]); + } + } else { + downstream.write(this.lookbehind, 0, pos); + } + pos = 0; + matchingPos = 0; + bufferFilled = false; + } + + @Override + public void close() throws IOException { + if (!found) { + drain(); + } + downstream.close(); + } +} diff --git a/dd-java-agent/agent-bootstrap/src/test/groovy/datadog/trace/bootstrap/instrumentation/buffer/InjectingPipeOutputStreamTest.groovy b/dd-java-agent/agent-bootstrap/src/test/groovy/datadog/trace/bootstrap/instrumentation/buffer/InjectingPipeOutputStreamTest.groovy index 3ffa60acc61..457b26577ba 100644 --- a/dd-java-agent/agent-bootstrap/src/test/groovy/datadog/trace/bootstrap/instrumentation/buffer/InjectingPipeOutputStreamTest.groovy +++ b/dd-java-agent/agent-bootstrap/src/test/groovy/datadog/trace/bootstrap/instrumentation/buffer/InjectingPipeOutputStreamTest.groovy @@ -3,24 +3,6 @@ package datadog.trace.bootstrap.instrumentation.buffer import datadog.trace.test.util.DDSpecification class InjectingPipeOutputStreamTest extends DDSpecification { - - static class ExceptionControlledOutputStream extends FilterOutputStream { - - boolean failWrite = false - - ExceptionControlledOutputStream(OutputStream out) { - super(out) - } - - @Override - void write(int b) throws IOException { - if (failWrite) { - throw new IOException("Failed") - } - super.write(b) - } - } - def 'should filter a buffer and inject if found #found'() { setup: def downstream = new ByteArrayOutputStream() diff --git a/dd-java-agent/agent-bootstrap/src/test/groovy/datadog/trace/bootstrap/instrumentation/buffer/InjectingPipeWriterTest.groovy b/dd-java-agent/agent-bootstrap/src/test/groovy/datadog/trace/bootstrap/instrumentation/buffer/InjectingPipeWriterTest.groovy new file mode 100644 index 00000000000..194ce6952a8 --- /dev/null +++ b/dd-java-agent/agent-bootstrap/src/test/groovy/datadog/trace/bootstrap/instrumentation/buffer/InjectingPipeWriterTest.groovy @@ -0,0 +1,39 @@ +package datadog.trace.bootstrap.instrumentation.buffer + +import datadog.trace.test.util.DDSpecification + +class InjectingPipeWriterTest extends DDSpecification { + def 'should filter a buffer and inject if found #found using write'() { + setup: + def downstream = new StringWriter() + def piped = new PrintWriter(new InjectingPipeWriter(downstream, marker.toCharArray(), contentToInject.toCharArray(), null)) + when: + try (def closeme = piped) { + piped.write(body) + } + then: + assert downstream.toString() == expected + where: + body | marker | contentToInject | found | expected + "
" | "" | "" | true | "