From b721f1eb0f2460bfb7524d8067bddc7905d64876 Mon Sep 17 00:00:00 2001 From: Keuin Date: Sat, 13 Aug 2022 23:21:48 +0800 Subject: Add HistoricMessageRecorder. --- .../messaging/history/HistoricMessageRecorder.java | 51 ++++++++++++++++++++++ .../history/IHistoricMessageRecorder.java | 25 +++++++++++ 2 files changed, 76 insertions(+) create mode 100644 src/main/java/com/keuin/crosslink/messaging/history/HistoricMessageRecorder.java create mode 100644 src/main/java/com/keuin/crosslink/messaging/history/IHistoricMessageRecorder.java (limited to 'src/main') diff --git a/src/main/java/com/keuin/crosslink/messaging/history/HistoricMessageRecorder.java b/src/main/java/com/keuin/crosslink/messaging/history/HistoricMessageRecorder.java new file mode 100644 index 0000000..2412bb4 --- /dev/null +++ b/src/main/java/com/keuin/crosslink/messaging/history/HistoricMessageRecorder.java @@ -0,0 +1,51 @@ +package com.keuin.crosslink.messaging.history; + +import com.keuin.crosslink.messaging.message.IMessage; +import com.keuin.crosslink.util.Pair; + +import java.util.LinkedList; +import java.util.List; +import java.util.Objects; +import java.util.stream.Collectors; + +public class HistoricMessageRecorder implements IHistoricMessageRecorder { + private final long ttlMillis; + private final LinkedList> que = new LinkedList<>(); + private final Object lock = new Object(); + + public HistoricMessageRecorder(long ttlMillis) { + this.ttlMillis = ttlMillis; + } + + private void clean() { + for (var head = que.peek(); + head != null && + Math.abs(System.currentTimeMillis() - head.getK()) > ttlMillis; + head = que.peek() + ) { + // head has expired, remove it + que.removeFirst(); + } + } + /** + * Add and memorize a message. + * Note: this implementation is synchronized. Be caution if you requires a high performance. + * @param message the message to save. + */ + @Override + public void addMessage(IMessage message) { + Objects.requireNonNull(message); + synchronized (lock) { + que.add(new Pair<>(System.currentTimeMillis(), message)); + clean(); + } + } + + @Override + public List getMessages() { + synchronized (lock) { + clean(); + return que.stream().map(Pair::getV).collect(Collectors.toList()); + } + } +} diff --git a/src/main/java/com/keuin/crosslink/messaging/history/IHistoricMessageRecorder.java b/src/main/java/com/keuin/crosslink/messaging/history/IHistoricMessageRecorder.java new file mode 100644 index 0000000..f5f245d --- /dev/null +++ b/src/main/java/com/keuin/crosslink/messaging/history/IHistoricMessageRecorder.java @@ -0,0 +1,25 @@ +package com.keuin.crosslink.messaging.history; + +import com.keuin.crosslink.messaging.message.IMessage; + +import java.util.List; + +/** + * Save and keep messages with a TTL. + * The recorder behaves like an unlimited queue which features TTL on the messages. + * Messages can be appended and traverse at any time. + * Outdated messages will be removed from the internal storage. + */ +public interface IHistoricMessageRecorder { + /** + * Add and memorize a message. + * @param message the message to save. + */ + void addMessage(IMessage message); + + /** + * Get recent messages. + * @return an {@link List} containing all recent messages in the time scope. + */ + List getMessages(); +} -- cgit v1.2.3