Skip to content

proof of concept for message durability with small files (fit a batch request) #2

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 16 commits into
base: master
Choose a base branch
from
Prev Previous commit
Next Next commit
truncate lines on resend
  • Loading branch information
apuig committed Mar 31, 2025
commit 389875f6cd8525d9edf7959df67c83b933f44817
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,10 @@
import java.io.OutputStream;
import java.nio.channels.Channels;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.nio.charset.StandardCharsets;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
import java.util.List;
Expand All @@ -25,6 +25,7 @@
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
import org.apache.commons.io.FileSystem;

public class FallbackAppender {

Expand Down Expand Up @@ -86,25 +87,18 @@ public void run() {
}

List<Message> msgs;
lock.lock();
try {
msgs = read();
msgs = truncate(20); // XXX messageSize
if (msgs.isEmpty()) {
continue;
}
// FIXME now its reading all the msgs and waits until all is processed
// it will be better to work with batch and truncate the file
file.delete();
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
lastMessage = System.currentTimeMillis();
continue;
} finally {
lock.unlock();
}

// FIXME batch
while (!msgs.isEmpty()) {
boolean canEnqueue = true;
for (int i = msgs.size() - 1; canEnqueue && i >= 0; i--) {
Expand All @@ -113,16 +107,16 @@ public void run() {
if (canEnqueue) {
msgs.remove(i);
System.err.println("reenqueued " + msg.messageId());
} else {
// slow down next iteration when http queue overflow
try {
Thread.sleep(1_000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
try {
Thread.sleep(1_000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}

lastMessage = System.currentTimeMillis();
}

try {
Expand Down Expand Up @@ -161,38 +155,43 @@ public void run() {
}
}

List<Message> read() throws IOException {
if (file.exists()) {
try (FileChannel fileChannel = FileChannel.open(file.toPath(), StandardOpenOption.READ)) {
fileChannel.lock(0, Long.MAX_VALUE, true);

final String[] lines = new String(
Channels.newInputStream(fileChannel).readAllBytes(), StandardCharsets.UTF_8)
.split(System.lineSeparator());
return Arrays.stream(lines)
.map(m -> fromJson(m))
.filter(Objects::nonNull)
.collect(Collectors.toList());
}
} else {
List<Message> truncate(int numMessages) throws IOException {
lock.lock();

if (!file.exists()) {
lock.unlock();
return Collections.emptyList();
}

try (ReversedLinesFileReader reader = ReversedLinesFileReader.builder()
.setPath(file.toPath())
.setBufferSize(FileSystem.getCurrent().getBlockSize())
.setCharset(StandardCharsets.UTF_8)
.get()) {

return reader.readLines(numMessages).stream()
.map(this::fromJson)
.filter(Objects::nonNull)
.collect(Collectors.toList());
} finally {
lock.unlock();
}
}

private static final byte[] NEW_LINE = System.lineSeparator().getBytes(StandardCharsets.UTF_8);

private void write(List<Message> batch) {
lock.lock();
try (FileChannel fileChannel = FileChannel.open(
file.toPath(), StandardOpenOption.WRITE, StandardOpenOption.APPEND, StandardOpenOption.CREATE)) {
fileChannel.lock();
file.toPath(), StandardOpenOption.WRITE, StandardOpenOption.APPEND, StandardOpenOption.CREATE);
OutputStream os = Channels.newOutputStream(fileChannel);
FileLock fileLock = fileChannel.lock(); ) {

final String lines = batch.stream()
.map(this::toJson)
.filter(Objects::nonNull)
.collect(Collectors.joining(System.lineSeparator()));
for (Message msg : batch) {
os.write(toJson(msg).getBytes(StandardCharsets.UTF_8));
os.write(NEW_LINE);
}

OutputStream os = Channels.newOutputStream(fileChannel);
os.write(lines.getBytes(StandardCharsets.UTF_8));
os.write(System.lineSeparator().getBytes(StandardCharsets.UTF_8));
fileChannel.force(true);

batch.clear();
Expand Down