summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorKeuin <[email protected]>2022-08-13 23:21:48 +0800
committerKeuin <[email protected]>2022-08-13 23:21:48 +0800
commitb721f1eb0f2460bfb7524d8067bddc7905d64876 (patch)
treef99ca1a4be062c1b5ec182fdb3c31f5241427a6c /src
parentb068d11b1d8423ae07b7deaf4cdb57b47b828b26 (diff)
Add HistoricMessageRecorder.
Diffstat (limited to 'src')
-rw-r--r--src/main/java/com/keuin/crosslink/messaging/history/HistoricMessageRecorder.java51
-rw-r--r--src/main/java/com/keuin/crosslink/messaging/history/IHistoricMessageRecorder.java25
-rw-r--r--src/test/java/com/keuin/crosslink/messaging/history/HistoricMessageRecorderTest.java31
3 files changed, 107 insertions, 0 deletions
diff --git a/src/main/java/com/keuin/crosslink/messaging/history/HistoricMessageRecorder.java b/src/main/java/com/keuin/crosslink/messaging/history/HistoricMessageRecorder.java
new file mode 100644
index 0000000..2412bb4
--- /dev/null
+++ b/src/main/java/com/keuin/crosslink/messaging/history/HistoricMessageRecorder.java
@@ -0,0 +1,51 @@
+package com.keuin.crosslink.messaging.history;
+
+import com.keuin.crosslink.messaging.message.IMessage;
+import com.keuin.crosslink.util.Pair;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+public class HistoricMessageRecorder implements IHistoricMessageRecorder {
+ private final long ttlMillis;
+ private final LinkedList<Pair<Long, IMessage>> que = new LinkedList<>();
+ private final Object lock = new Object();
+
+ public HistoricMessageRecorder(long ttlMillis) {
+ this.ttlMillis = ttlMillis;
+ }
+
+ private void clean() {
+ for (var head = que.peek();
+ head != null &&
+ Math.abs(System.currentTimeMillis() - head.getK()) > ttlMillis;
+ head = que.peek()
+ ) {
+ // head has expired, remove it
+ que.removeFirst();
+ }
+ }
+ /**
+ * Add and memorize a message.
+ * Note: this implementation is synchronized. Be caution if you requires a high performance.
+ * @param message the message to save.
+ */
+ @Override
+ public void addMessage(IMessage message) {
+ Objects.requireNonNull(message);
+ synchronized (lock) {
+ que.add(new Pair<>(System.currentTimeMillis(), message));
+ clean();
+ }
+ }
+
+ @Override
+ public List<IMessage> getMessages() {
+ synchronized (lock) {
+ clean();
+ return que.stream().map(Pair::getV).collect(Collectors.toList());
+ }
+ }
+}
diff --git a/src/main/java/com/keuin/crosslink/messaging/history/IHistoricMessageRecorder.java b/src/main/java/com/keuin/crosslink/messaging/history/IHistoricMessageRecorder.java
new file mode 100644
index 0000000..f5f245d
--- /dev/null
+++ b/src/main/java/com/keuin/crosslink/messaging/history/IHistoricMessageRecorder.java
@@ -0,0 +1,25 @@
+package com.keuin.crosslink.messaging.history;
+
+import com.keuin.crosslink.messaging.message.IMessage;
+
+import java.util.List;
+
+/**
+ * Save and keep messages with a TTL.
+ * The recorder behaves like an unlimited queue which features TTL on the messages.
+ * Messages can be appended and traverse at any time.
+ * Outdated messages will be removed from the internal storage.
+ */
+public interface IHistoricMessageRecorder {
+ /**
+ * Add and memorize a message.
+ * @param message the message to save.
+ */
+ void addMessage(IMessage message);
+
+ /**
+ * Get recent messages.
+ * @return an {@link List} containing all recent messages in the time scope.
+ */
+ List<IMessage> getMessages();
+}
diff --git a/src/test/java/com/keuin/crosslink/messaging/history/HistoricMessageRecorderTest.java b/src/test/java/com/keuin/crosslink/messaging/history/HistoricMessageRecorderTest.java
new file mode 100644
index 0000000..4dfdfd4
--- /dev/null
+++ b/src/test/java/com/keuin/crosslink/messaging/history/HistoricMessageRecorderTest.java
@@ -0,0 +1,31 @@
+package com.keuin.crosslink.messaging.history;
+
+import com.keuin.crosslink.messaging.endpoint.IEndpoint;
+import com.keuin.crosslink.messaging.message.IMessage;
+import com.keuin.crosslink.messaging.sender.ISender;
+import com.keuin.crosslink.testable.FakeEndpoint;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.UUID;
+
+import static org.junit.jupiter.api.Assertions.*;
+
+class HistoricMessageRecorderTest {
+
+ @Test
+ void testTTL() throws InterruptedException {
+ var recorder = new HistoricMessageRecorder(1000);
+ var ep0 = new FakeEndpoint("z");
+ var sender = ISender.create("sender", UUID.randomUUID());
+ var msg = IMessage.create(ep0, sender, "MSG,,,");
+ recorder.addMessage(msg);
+ Thread.sleep(500);
+ var list = recorder.getMessages();
+ assertEquals(1, list.size());
+ assertEquals(msg, list.get(0));
+ Thread.sleep(510);
+ list = recorder.getMessages();
+ assertEquals(0, list.size());
+ }
+} \ No newline at end of file