From b721f1eb0f2460bfb7524d8067bddc7905d64876 Mon Sep 17 00:00:00 2001 From: Keuin Date: Sat, 13 Aug 2022 23:21:48 +0800 Subject: Add HistoricMessageRecorder. --- .../messaging/history/HistoricMessageRecorder.java | 51 ++++++++++++++++++++++ .../history/IHistoricMessageRecorder.java | 25 +++++++++++ .../history/HistoricMessageRecorderTest.java | 31 +++++++++++++ 3 files changed, 107 insertions(+) create mode 100644 src/main/java/com/keuin/crosslink/messaging/history/HistoricMessageRecorder.java create mode 100644 src/main/java/com/keuin/crosslink/messaging/history/IHistoricMessageRecorder.java create mode 100644 src/test/java/com/keuin/crosslink/messaging/history/HistoricMessageRecorderTest.java diff --git a/src/main/java/com/keuin/crosslink/messaging/history/HistoricMessageRecorder.java b/src/main/java/com/keuin/crosslink/messaging/history/HistoricMessageRecorder.java new file mode 100644 index 0000000..2412bb4 --- /dev/null +++ b/src/main/java/com/keuin/crosslink/messaging/history/HistoricMessageRecorder.java @@ -0,0 +1,51 @@ +package com.keuin.crosslink.messaging.history; + +import com.keuin.crosslink.messaging.message.IMessage; +import com.keuin.crosslink.util.Pair; + +import java.util.LinkedList; +import java.util.List; +import java.util.Objects; +import java.util.stream.Collectors; + +public class HistoricMessageRecorder implements IHistoricMessageRecorder { + private final long ttlMillis; + private final LinkedList> que = new LinkedList<>(); + private final Object lock = new Object(); + + public HistoricMessageRecorder(long ttlMillis) { + this.ttlMillis = ttlMillis; + } + + private void clean() { + for (var head = que.peek(); + head != null && + Math.abs(System.currentTimeMillis() - head.getK()) > ttlMillis; + head = que.peek() + ) { + // head has expired, remove it + que.removeFirst(); + } + } + /** + * Add and memorize a message. + * Note: this implementation is synchronized. Be caution if you requires a high performance. + * @param message the message to save. + */ + @Override + public void addMessage(IMessage message) { + Objects.requireNonNull(message); + synchronized (lock) { + que.add(new Pair<>(System.currentTimeMillis(), message)); + clean(); + } + } + + @Override + public List getMessages() { + synchronized (lock) { + clean(); + return que.stream().map(Pair::getV).collect(Collectors.toList()); + } + } +} diff --git a/src/main/java/com/keuin/crosslink/messaging/history/IHistoricMessageRecorder.java b/src/main/java/com/keuin/crosslink/messaging/history/IHistoricMessageRecorder.java new file mode 100644 index 0000000..f5f245d --- /dev/null +++ b/src/main/java/com/keuin/crosslink/messaging/history/IHistoricMessageRecorder.java @@ -0,0 +1,25 @@ +package com.keuin.crosslink.messaging.history; + +import com.keuin.crosslink.messaging.message.IMessage; + +import java.util.List; + +/** + * Save and keep messages with a TTL. + * The recorder behaves like an unlimited queue which features TTL on the messages. + * Messages can be appended and traverse at any time. + * Outdated messages will be removed from the internal storage. + */ +public interface IHistoricMessageRecorder { + /** + * Add and memorize a message. + * @param message the message to save. + */ + void addMessage(IMessage message); + + /** + * Get recent messages. + * @return an {@link List} containing all recent messages in the time scope. + */ + List getMessages(); +} diff --git a/src/test/java/com/keuin/crosslink/messaging/history/HistoricMessageRecorderTest.java b/src/test/java/com/keuin/crosslink/messaging/history/HistoricMessageRecorderTest.java new file mode 100644 index 0000000..4dfdfd4 --- /dev/null +++ b/src/test/java/com/keuin/crosslink/messaging/history/HistoricMessageRecorderTest.java @@ -0,0 +1,31 @@ +package com.keuin.crosslink.messaging.history; + +import com.keuin.crosslink.messaging.endpoint.IEndpoint; +import com.keuin.crosslink.messaging.message.IMessage; +import com.keuin.crosslink.messaging.sender.ISender; +import com.keuin.crosslink.testable.FakeEndpoint; +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; +import java.util.UUID; + +import static org.junit.jupiter.api.Assertions.*; + +class HistoricMessageRecorderTest { + + @Test + void testTTL() throws InterruptedException { + var recorder = new HistoricMessageRecorder(1000); + var ep0 = new FakeEndpoint("z"); + var sender = ISender.create("sender", UUID.randomUUID()); + var msg = IMessage.create(ep0, sender, "MSG,,,"); + recorder.addMessage(msg); + Thread.sleep(500); + var list = recorder.getMessages(); + assertEquals(1, list.size()); + assertEquals(msg, list.get(0)); + Thread.sleep(510); + list = recorder.getMessages(); + assertEquals(0, list.size()); + } +} \ No newline at end of file -- cgit v1.2.3