Skip to content

bigger files with a batch in each line #1

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: poc-small-files
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -327,18 +327,22 @@ static class UploadFileTask extends UploadTask {

@Override
public void run() {
if (upload(() -> service.upload(uploadUrl, RequestBody.create(path.toFile(), JSON)).execute())) {
try {
Files.delete(path);
} catch (IOException e) {
// will attempt to submit again (rename file?)
LOGGER.log(Level.WARNING, "Cannot delete file " + path, e);
}
try {
if(Files.lines(path).map(batchLine -> upload(() -> service.upload(uploadUrl, RequestBody.create(batchLine, JSON)).execute())).allMatch(Boolean.TRUE::equals)) {
try {
Files.delete(path);
} catch (IOException e) {
// will attempt to submit again (rename file?)
LOGGER.log(Level.WARNING, "Cannot delete file " + path, e);
}
}
} catch (IOException e) {
LOGGER.log(Level.WARNING, "Cannot process file " + path, e);
}
}
}

public void resubmit(Path path) {
public void resubmit(Path path) throws IOException {
networkExecutor.submit(new UploadFileTask(breaker, service, uploadUrl, path));
}

Expand Down
11 changes: 11 additions & 0 deletions analytics/src/main/java/com/segment/analytics/internal/Config.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,10 @@ public class Config {
public static final int DEFAULT_FALLBACK_QUEUE_FLUSH_SIZE = 50;
public static final int DEFAULT_FALLBACK_QUEUE_FLUSH_MS = 2_000;
public static final int DEFAULT_FALLBACK_ROLLOVER_TIMEOUT_SECONDS = 60;
public static final int DEFAULT_FALLBACK_ROLLOVER_MAX_SIZE = 1024 * 1024 * 5;
public static final String DEFAULT_FALLBACK_FILE = "pending";


public static ThreadFactory defaultThreadFactory() {
return new ThreadFactory() {
@Override
Expand Down Expand Up @@ -184,13 +186,16 @@ public static class FileConfig {
final String filePath;
/** max time to keep a open overflow file before finish the batch */
final int rolloverTimeoutSeconds;
/** max size of files */
final long rolloverMaxSizeBytes;

private FileConfig(Builder builder) {
this.size = builder.size;
this.flushSize = builder.flushSize;
this.flushMs = builder.flushMs;
this.filePath = builder.filePath;
this.rolloverTimeoutSeconds = builder.rolloverTimeoutSeconds;
this.rolloverMaxSizeBytes = builder.rolloverMaxSizeBytes;
}

public static Builder builder() {
Expand All @@ -203,6 +208,7 @@ public static class Builder {
private int flushMs = DEFAULT_FALLBACK_QUEUE_FLUSH_MS;
private String filePath = DEFAULT_FALLBACK_FILE;
private int rolloverTimeoutSeconds = DEFAULT_FALLBACK_ROLLOVER_TIMEOUT_SECONDS;
private long rolloverMaxSizeBytes = DEFAULT_FALLBACK_ROLLOVER_MAX_SIZE;

public Builder size(int value) {
this.size = value;
Expand All @@ -229,6 +235,11 @@ public Builder rolloverTimeoutSeconds(int value) {
return this;
}

public Builder rolloverMaxSizeBytes(long value) {
this.rolloverMaxSizeBytes = value;
return this;
}

public FileConfig build() {
return new FileConfig(this);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,14 @@
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.logging.Level;
import java.util.logging.Logger;

Expand All @@ -41,8 +42,14 @@ public class FallbackAppender implements Closeable {
*/
private static final long MAX_BATCH_SIZE = 475_000; // 475KB.

private Path currentFile;
private Instant currentStart;
private Path currentFileOverflow;
private Instant currentStartOverflow;
private Path currentFileBatch;
private Instant currentStartBatch;
private Lock currentFileBatchLock = new ReentrantLock();
private long currentLineSize = 0;
boolean firstEventInBatch = true;

private final Path directory;
private final Gson gson;
private final FileConfig config;
Expand All @@ -57,7 +64,8 @@ public FallbackAppender(Gson gson, ThreadFactory threadFactory, FileConfig confi
this.config = config;
this.directory = Files.createDirectories(Path.of(config.filePath));

rollover();
rolloverOverflow();
rolloverBatch();

this.queue = new ArrayBlockingQueue<Message>(config.size);
this.writer = threadFactory.newThread(new FileWriter());
Expand All @@ -66,24 +74,53 @@ public FallbackAppender(Gson gson, ThreadFactory threadFactory, FileConfig confi
}

/** Ends the currentFile and start a new one */
private void rollover() {
private void rolloverOverflow() {
String fileName;
if (currentFile != null) {
fileName = currentFile.getFileName().toString();
if (currentFileOverflow != null) {
fileName = currentFileOverflow.getFileName().toString();
try {
Files.move(
currentFile,
currentFile.resolveSibling(fileName.substring(0, fileName.length() - TMP_EXTENSION.length())),
currentFileOverflow,
currentFileOverflow.resolveSibling(
fileName.substring(0, fileName.length() - TMP_EXTENSION.length())),
StandardCopyOption.ATOMIC_MOVE);
} catch (IOException e) {
LOGGER.log(Level.WARNING, "Cannot rollover " + fileName, e);
}
}

currentStart = Instant.now();
fileName = String.format("%s-%s%s", currentStart.toEpochMilli(), UUID.randomUUID(), TMP_EXTENSION);
this.currentFile = directory.resolve(fileName);
LOGGER.log(Level.FINE, "currentFile : {0}", fileName);
currentStartOverflow = Instant.now();
fileName = String.format("%s-%s%s", currentStartOverflow.toEpochMilli(), UUID.randomUUID(), TMP_EXTENSION);
this.currentFileOverflow = directory.resolve(fileName);
LOGGER.log(Level.FINE, "currentFileOverflow : {0}", fileName);
firstEventInBatch = true;
}

private void rolloverBatch() {
currentFileBatchLock.lock();
try {

String fileName;
if (currentFileBatch != null) {
fileName = currentFileBatch.getFileName().toString();
try {
Files.move(
currentFileBatch,
currentFileBatch.resolveSibling(
fileName.substring(0, fileName.length() - TMP_EXTENSION.length())),
StandardCopyOption.ATOMIC_MOVE);
} catch (IOException e) {
LOGGER.log(Level.WARNING, "Cannot rollover " + fileName, e);
}
}

currentStartBatch = Instant.now();
fileName = String.format("%s-%s%s", currentStartBatch.toEpochMilli(), UUID.randomUUID(), TMP_EXTENSION);
this.currentFileBatch = directory.resolve(fileName);
LOGGER.log(Level.FINE, "currentFileBatch : {0}", fileName);
} finally {
currentFileBatchLock.unlock();
}
}

@Override
Expand All @@ -93,24 +130,25 @@ public void close() {

/** Write a new file with the content of a batch */
public void add(Batch batch) {
String fileName = String.format("%s-%s", batch.sentAt().getTime(), UUID.randomUUID());
Path path = directory.resolve(fileName + TMP_EXTENSION);

currentFileBatchLock.lock();
try (FileChannel fileChannel = FileChannel.open(
path, StandardOpenOption.WRITE, StandardOpenOption.APPEND, StandardOpenOption.CREATE_NEW);
currentFileBatch,
StandardOpenOption.WRITE,
StandardOpenOption.APPEND,
StandardOpenOption.CREATE);
Writer w = Channels.newWriter(fileChannel, StandardCharsets.UTF_8)) {

saveBatch(batch, w);

if (fileChannel.size() > config.rolloverMaxSizeBytes) {
rolloverBatch();
} else {
w.write(System.lineSeparator());
}
// TODO fileChannel.force(true);
} catch (IOException e) {
LOGGER.log(Level.WARNING, "Cannot write file batch file " + fileName, e);
}

try {
Files.move(path, path.resolveSibling(fileName), StandardCopyOption.ATOMIC_MOVE);
} catch (IOException e) {
LOGGER.log(Level.WARNING, "Cannot move file batch file " + fileName, e);
LOGGER.log(Level.WARNING, "Cannot write file batch file " + currentFileBatch, e);
} finally {
currentFileBatchLock.unlock();
}
}

Expand All @@ -136,10 +174,17 @@ public void run() {
while (!Thread.currentThread().isInterrupted()) {
try {

if (Duration.between(currentStart, Instant.now()).getSeconds() > config.rolloverTimeoutSeconds
&& currentFile.toFile().exists()) {
endCurrentFile();
rollover();
if ((Duration.between(currentStartOverflow, Instant.now()).getSeconds()
> config.rolloverTimeoutSeconds
&& currentFileOverflow.toFile().exists())
|| currentFileOverflow.toFile().length() > config.rolloverMaxSizeBytes) {
endCurrentLine();
rolloverOverflow();
}

if (Duration.between(currentStartBatch, Instant.now()).getSeconds() > config.rolloverTimeoutSeconds
&& currentFileBatch.toFile().exists()) {
rolloverBatch();
}

final Message msg = queue.poll(config.flushMs, TimeUnit.MILLISECONDS);
Expand All @@ -165,66 +210,68 @@ public void run() {

private static final byte[] BATCH_BEGIN = "{\"batch\":[".getBytes(StandardCharsets.UTF_8);
private static final byte[] COMMA = ",".getBytes(StandardCharsets.UTF_8);
private static final byte[] NEW_LINE = System.lineSeparator().getBytes(StandardCharsets.UTF_8);
private static final byte[] BATCH_END =
"],\"sentAt\":\"2023-04-19T04:03:46.880Z\",\"writeKey\":\"mywrite\"}".getBytes(StandardCharsets.UTF_8);
// FIXME DateTimeUtils
// FIXME mywrite

private void write(List<Message> batch) {
List<Message> remaining = writeInternal(batch);
while (!remaining.isEmpty()) {
rollover();
remaining = writeInternal(remaining);
}

writeInternal(batch);
batch.clear();
}

/** @return messages that do not fit in the current file */
private List<Message> writeInternal(List<Message> batch) {
private void writeInternal(List<Message> batch) {
try (FileChannel fileChannel = FileChannel.open(
currentFile, StandardOpenOption.WRITE, StandardOpenOption.APPEND, StandardOpenOption.CREATE);
currentFileOverflow,
StandardOpenOption.WRITE,
StandardOpenOption.APPEND,
StandardOpenOption.CREATE);
OutputStream os = Channels.newOutputStream(fileChannel)) {

long currentFileSize = fileChannel.size();
boolean first = currentFileSize == 0;
if (first) {
if (firstEventInBatch) {
os.write(BATCH_BEGIN);
currentLineSize = BATCH_BEGIN.length;
}

for (int i = 0; i < batch.size(); i++) {
Message msg = batch.get(i);
byte[] msgBytes = toJson(msg).getBytes(StandardCharsets.UTF_8);
if (msgBytes.length + currentFileSize + COMMA.length + BATCH_END.length > MAX_BATCH_SIZE) {
if (msgBytes.length + currentLineSize + COMMA.length + BATCH_END.length > MAX_BATCH_SIZE) {
os.write(BATCH_END);
// TODO fileChannel.force(true);

return batch.subList(i, batch.size());
os.write(NEW_LINE);
os.write(BATCH_BEGIN);
currentLineSize = BATCH_BEGIN.length;
firstEventInBatch = true;
}

if (first) {
first = false;
if (firstEventInBatch) {
firstEventInBatch = false;
} else {
os.write(COMMA);
currentLineSize += COMMA.length;
}
os.write(msgBytes);
currentLineSize += msgBytes.length;
}
// TODO fileChannel.force(true);
return Collections.emptyList();
} catch (IOException e) {
LOGGER.log(Level.WARNING, "write file " + currentFile, e);
return Collections.emptyList();
LOGGER.log(Level.WARNING, "write file " + currentFileOverflow, e);
}
}

private void endCurrentFile() {
try (FileChannel fileChannel = FileChannel.open(
currentFile, StandardOpenOption.WRITE, StandardOpenOption.APPEND, StandardOpenOption.CREATE);
private void endCurrentLine() {
try (FileChannel fileChannel =
FileChannel.open(currentFileOverflow, StandardOpenOption.WRITE, StandardOpenOption.APPEND);
OutputStream os = Channels.newOutputStream(fileChannel)) {
os.write(BATCH_END);
if (currentLineSize == BATCH_BEGIN.length) {
fileChannel.truncate(fileChannel.size() - (BATCH_BEGIN.length + NEW_LINE.length));
} else {
os.write(BATCH_END);
}
// TODO fileChannel.force(true);
} catch (IOException e) {
LOGGER.log(Level.WARNING, "write file " + currentFile, e);
LOGGER.log(Level.WARNING, "write file " + currentFileOverflow, e);
}
}

Expand Down