@@ -59,7 +59,11 @@ public class AsyncProtobufLogWriter extends AbstractProtobufLogWriter
5959
6060 private final Class <? extends Channel > channelClass ;
6161
62- private AsyncFSOutput output ;
62+ private volatile AsyncFSOutput output ;
63+ /**
64+ * Save {@link AsyncFSOutput#getSyncedLength()} when {@link #output} is closed.
65+ */
66+ private volatile long finalSyncedLength = -1 ;
6367
6468 private static final class OutputStreamWrapper extends OutputStream
6569 implements ByteBufferWriter {
@@ -156,6 +160,13 @@ public synchronized void close() throws IOException {
156160 LOG .warn ("normal close failed, try recover" , e );
157161 output .recoverAndClose (null );
158162 }
163+ /**
164+ * We have to call {@link AsyncFSOutput#getSyncedLength()}
165+ * after {@link AsyncFSOutput#close()} to get the final length
166+ * synced to underlying filesystem because {@link AsyncFSOutput#close()}
167+ * may also flush some data to underlying filesystem.
168+ */
169+ this .finalSyncedLength = this .output .getSyncedLength ();
159170 this .output = null ;
160171 }
161172
@@ -234,6 +245,17 @@ protected OutputStream getOutputStreamForCellEncoder() {
234245
235246 @ Override
236247 public long getSyncedLength () {
237- return this .output .getSyncedLength ();
248+ /**
249+ * The statement "this.output = null;" in {@link AsyncProtobufLogWriter#close}
250+ * is a sync point, if output is null, then finalSyncedLength must set,
251+ * so we can return finalSyncedLength, else we return output.getSyncedLength
252+ */
253+ AsyncFSOutput outputToUse = this .output ;
254+ if (outputToUse == null ) {
255+ long finalSyncedLengthToUse = this .finalSyncedLength ;
256+ assert finalSyncedLengthToUse >= 0 ;
257+ return finalSyncedLengthToUse ;
258+ }
259+ return outputToUse .getSyncedLength ();
238260 }
239261}
0 commit comments