diff options
author | Keuin <[email protected]> | 2022-08-13 23:21:48 +0800 |
---|---|---|
committer | Keuin <[email protected]> | 2022-08-13 23:21:48 +0800 |
commit | b721f1eb0f2460bfb7524d8067bddc7905d64876 (patch) | |
tree | f99ca1a4be062c1b5ec182fdb3c31f5241427a6c /src | |
parent | b068d11b1d8423ae07b7deaf4cdb57b47b828b26 (diff) |
Add HistoricMessageRecorder.
Diffstat (limited to 'src')
3 files changed, 107 insertions, 0 deletions
diff --git a/src/main/java/com/keuin/crosslink/messaging/history/HistoricMessageRecorder.java b/src/main/java/com/keuin/crosslink/messaging/history/HistoricMessageRecorder.java new file mode 100644 index 0000000..2412bb4 --- /dev/null +++ b/src/main/java/com/keuin/crosslink/messaging/history/HistoricMessageRecorder.java @@ -0,0 +1,51 @@ +package com.keuin.crosslink.messaging.history; + +import com.keuin.crosslink.messaging.message.IMessage; +import com.keuin.crosslink.util.Pair; + +import java.util.LinkedList; +import java.util.List; +import java.util.Objects; +import java.util.stream.Collectors; + +public class HistoricMessageRecorder implements IHistoricMessageRecorder { + private final long ttlMillis; + private final LinkedList<Pair<Long, IMessage>> que = new LinkedList<>(); + private final Object lock = new Object(); + + public HistoricMessageRecorder(long ttlMillis) { + this.ttlMillis = ttlMillis; + } + + private void clean() { + for (var head = que.peek(); + head != null && + Math.abs(System.currentTimeMillis() - head.getK()) > ttlMillis; + head = que.peek() + ) { + // head has expired, remove it + que.removeFirst(); + } + } + /** + * Add and memorize a message. + * Note: this implementation is synchronized. Be caution if you requires a high performance. + * @param message the message to save. + */ + @Override + public void addMessage(IMessage message) { + Objects.requireNonNull(message); + synchronized (lock) { + que.add(new Pair<>(System.currentTimeMillis(), message)); + clean(); + } + } + + @Override + public List<IMessage> getMessages() { + synchronized (lock) { + clean(); + return que.stream().map(Pair::getV).collect(Collectors.toList()); + } + } +} diff --git a/src/main/java/com/keuin/crosslink/messaging/history/IHistoricMessageRecorder.java b/src/main/java/com/keuin/crosslink/messaging/history/IHistoricMessageRecorder.java new file mode 100644 index 0000000..f5f245d --- /dev/null +++ b/src/main/java/com/keuin/crosslink/messaging/history/IHistoricMessageRecorder.java @@ -0,0 +1,25 @@ +package com.keuin.crosslink.messaging.history; + +import com.keuin.crosslink.messaging.message.IMessage; + +import java.util.List; + +/** + * Save and keep messages with a TTL. + * The recorder behaves like an unlimited queue which features TTL on the messages. + * Messages can be appended and traverse at any time. + * Outdated messages will be removed from the internal storage. + */ +public interface IHistoricMessageRecorder { + /** + * Add and memorize a message. + * @param message the message to save. + */ + void addMessage(IMessage message); + + /** + * Get recent messages. + * @return an {@link List} containing all recent messages in the time scope. + */ + List<IMessage> getMessages(); +} diff --git a/src/test/java/com/keuin/crosslink/messaging/history/HistoricMessageRecorderTest.java b/src/test/java/com/keuin/crosslink/messaging/history/HistoricMessageRecorderTest.java new file mode 100644 index 0000000..4dfdfd4 --- /dev/null +++ b/src/test/java/com/keuin/crosslink/messaging/history/HistoricMessageRecorderTest.java @@ -0,0 +1,31 @@ +package com.keuin.crosslink.messaging.history; + +import com.keuin.crosslink.messaging.endpoint.IEndpoint; +import com.keuin.crosslink.messaging.message.IMessage; +import com.keuin.crosslink.messaging.sender.ISender; +import com.keuin.crosslink.testable.FakeEndpoint; +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; +import java.util.UUID; + +import static org.junit.jupiter.api.Assertions.*; + +class HistoricMessageRecorderTest { + + @Test + void testTTL() throws InterruptedException { + var recorder = new HistoricMessageRecorder(1000); + var ep0 = new FakeEndpoint("z"); + var sender = ISender.create("sender", UUID.randomUUID()); + var msg = IMessage.create(ep0, sender, "MSG,,,"); + recorder.addMessage(msg); + Thread.sleep(500); + var list = recorder.getMessages(); + assertEquals(1, list.size()); + assertEquals(msg, list.get(0)); + Thread.sleep(510); + list = recorder.getMessages(); + assertEquals(0, list.size()); + } +}
\ No newline at end of file |