44
55import java .nio .BufferOverflowException ;
66import java .nio .ByteBuffer ;
7- import java .nio .CharBuffer ;
8- import java .nio .charset .Charset ;
9- import java .nio .charset .CharsetEncoder ;
10- import java .nio .charset .CoderResult ;
11- import java .nio .charset .CodingErrorAction ;
127
138import java .util .Queue ;
149import java .util .concurrent .ArrayBlockingQueue ;
2015import java .util .concurrent .atomic .AtomicInteger ;
2116
2217public abstract class StatsDProcessor {
23- protected static final Charset MESSAGE_CHARSET = Charset .forName ("UTF-8" );
24-
2518 protected static final String MESSAGE_TOO_LONG = "Message longer than size of sendBuffer" ;
2619 protected static final int WAIT_SLEEP_MS = 10 ; // 10 ms would be a 100HZ slice
2720
2821 protected final StatsDClientErrorHandler handler ;
2922
23+ final int maxPacketSizeBytes ;
3024 protected final BufferPool bufferPool ;
3125 protected final Queue <Message > highPrioMessages ; // FIFO queue for high priority messages
3226 protected final BlockingQueue <ByteBuffer > outboundQueue ; // FIFO queue with outbound buffers
@@ -47,10 +41,9 @@ public abstract class StatsDProcessor {
4741
4842 protected abstract class ProcessingTask implements Runnable {
4943 protected StringBuilder builder = new StringBuilder ();
50- protected CharBuffer buffer = CharBuffer .wrap (builder );
51- protected final CharsetEncoder utf8Encoder = MESSAGE_CHARSET .newEncoder ()
52- .onMalformedInput (CodingErrorAction .REPLACE )
53- .onUnmappableCharacter (CodingErrorAction .REPLACE );
44+ char [] charBuffer = new char [maxPacketSizeBytes ];
45+ // + 4 so that we can check for buffer overflow without computing encoded length first
46+ final byte [] byteBuffer = new byte [maxPacketSizeBytes + 4 ];
5447
5548 public final void run () {
5649 try {
@@ -148,20 +141,44 @@ protected void processLoop() {
148141 abstract Message getMessage () throws InterruptedException ;
149142
150143 protected void writeBuilderToSendBuffer (ByteBuffer sendBuffer ) {
151-
152144 int length = builder .length ();
153- // use existing charbuffer if possible, otherwise re-wrap
154- if (length <= buffer .capacity ()) {
155- buffer .limit (length ).position (0 );
156- } else {
157- buffer = CharBuffer .wrap (builder );
145+ if (length > charBuffer .length ) {
146+ charBuffer = new char [length ];
158147 }
159148
160- sendBuffer .mark ();
161- if (utf8Encoder .encode (buffer , sendBuffer , true ) == CoderResult .OVERFLOW ) {
162- sendBuffer .reset ();
163- throw new BufferOverflowException ();
149+ // We trust this returns valid UTF-16.
150+ builder .getChars (0 , length , charBuffer , 0 );
151+
152+ int blen = 0 ;
153+ for (int i = 0 ; i < length ; i ++) {
154+ char ch = charBuffer [i ];
155+ // https://en.wikipedia.org/wiki/UTF-8#Description
156+ // https://en.wikipedia.org/wiki/UTF-16#Description
157+ if (ch < 0x80 ) {
158+ byteBuffer [blen ++] = (byte )ch ;
159+ } else if (ch < 0x800 ) {
160+ byteBuffer [blen ++] = (byte )(192 | (ch >> 6 ));
161+ byteBuffer [blen ++] = (byte )(128 | (ch & 63 ));
162+ } else if (ch < 0xd800 || ch >= 0xe000 ) {
163+ byteBuffer [blen ++] = (byte )(224 | (ch >> 12 ));
164+ byteBuffer [blen ++] = (byte )(128 | ((ch >> 6 ) & 63 ));
165+ byteBuffer [blen ++] = (byte )(128 | (ch & 63 ));
166+ } else {
167+ // surrogate pair
168+ int decoded = ((ch & 0x3ff ) << 10 ) | (charBuffer [++i ] & 0x3ff ) | 0x10000 ;
169+ byteBuffer [blen ++] = (byte )(240 | (decoded >> 18 ));
170+ byteBuffer [blen ++] = (byte )(128 | ((decoded >> 12 ) & 63 ));
171+ byteBuffer [blen ++] = (byte )(128 | ((decoded >> 6 ) & 63 ));
172+ byteBuffer [blen ++] = (byte )(128 | (decoded & 63 ));
173+ }
174+
175+ if (blen >= maxPacketSizeBytes ) {
176+ throw new BufferOverflowException ();
177+ }
164178 }
179+
180+ sendBuffer .mark ();
181+ sendBuffer .put (byteBuffer , 0 , blen );
165182 }
166183 }
167184
@@ -175,6 +192,7 @@ protected void writeBuilderToSendBuffer(ByteBuffer sendBuffer) {
175192 this .workers = new Thread [workers ];
176193 this .qcapacity = queueSize ;
177194
195+ this .maxPacketSizeBytes = maxPacketSizeBytes ;
178196 this .bufferPool = new BufferPool (poolSize , maxPacketSizeBytes , true );
179197 this .highPrioMessages = new ConcurrentLinkedQueue <>();
180198 this .outboundQueue = new ArrayBlockingQueue <ByteBuffer >(poolSize );
0 commit comments