summaryrefslogtreecommitdiff
path: root/src/main/java/com
diff options
context:
space:
mode:
authorKeuin <[email protected]>2022-09-24 11:44:51 +0800
committerKeuin <[email protected]>2022-09-25 19:31:34 +0800
commit1f697291f91aec5f76e017ae955822356adb22ad (patch)
tree52528bcaa5cfffe63bd293320acaec129070f5ce /src/main/java/com
parentfac973923d378c7111ba04114ca145b7ba03dfb9 (diff)
Fix PSMB reconnect not working. Use try-with.HEADmaster
Diffstat (limited to 'src/main/java/com')
-rw-r--r--src/main/java/com/keuin/crosslink/messaging/endpoint/remote/psmb/PsmbEndpoint.java100
-rw-r--r--src/main/java/com/keuin/crosslink/messaging/endpoint/remote/psmb/Util.java18
-rw-r--r--src/main/java/com/keuin/psmb4j/BaseClient.java1
3 files changed, 70 insertions, 49 deletions
diff --git a/src/main/java/com/keuin/crosslink/messaging/endpoint/remote/psmb/PsmbEndpoint.java b/src/main/java/com/keuin/crosslink/messaging/endpoint/remote/psmb/PsmbEndpoint.java
index 48df6a8..771268a 100644
--- a/src/main/java/com/keuin/crosslink/messaging/endpoint/remote/psmb/PsmbEndpoint.java
+++ b/src/main/java/com/keuin/crosslink/messaging/endpoint/remote/psmb/PsmbEndpoint.java
@@ -63,38 +63,39 @@ public class PsmbEndpoint implements IEndpoint {
}
private void publish() {
- final var pub = new PublishClient(host, port);
final var logger = LoggerFactory.getLogger(LoggerNaming.name().of("endpoint").of("psmb")
.of(String.format("%s,%s", host, pubTopic)).of("pub").toString());
try {
// reconnect loop
while (true) {
- // try to connect
- try {
- pub.connect();
- pub.setPublish(pubTopic);
- } catch (IOException | CommandFailureException ex) {
- logger.error("Cannot connect to server", ex);
- //noinspection BusyWait
- Thread.sleep(RETRY_INTERVAL_MILLIS);
- continue;
- }
- // connected successfully, send messages
- try {
- // publish loop
- long lastBeat = -1;
- while (true) {
- if (Math.abs(System.currentTimeMillis() - lastBeat) >= keepAliveInterval) {
- pub.keepAlive();
- lastBeat = System.currentTimeMillis();
+ try (final var pub = new PublishClient(host, port)) {
+ // try to connect
+ try {
+ pub.connect();
+ pub.setPublish(pubTopic);
+ } catch (IOException | CommandFailureException ex) {
+ logger.error("Cannot connect to server", ex);
+ Util.SleepBeforeReconnect(logger);
+ continue;
+ }
+ // connected successfully, send messages
+ try {
+ // publish loop
+ long lastBeat = -1;
+ while (true) {
+ if (Math.abs(System.currentTimeMillis() - lastBeat) >= keepAliveInterval) {
+ pub.keepAlive();
+ lastBeat = System.currentTimeMillis();
+ }
+ var message = pubQueue.poll(Math.max(keepAliveInterval, 0), TimeUnit.MILLISECONDS);
+ if (message == null) continue;
+ pub.publish(PsmbMessageSerializer.serialize(message));
}
- var message = pubQueue.poll(Math.max(keepAliveInterval, 0), TimeUnit.MILLISECONDS);
- if (message == null) continue;
- pub.publish(PsmbMessageSerializer.serialize(message));
+ } catch (IOException | CommandFailureException ex) {
+ logger.error("Cannot publish message", ex);
+ pub.disconnect(); // reconnect in the outer loop
+ Util.SleepBeforeReconnect(logger);
}
- } catch (IOException | CommandFailureException ex) {
- logger.error("Cannot publish message", ex);
- pub.disconnect(); // reconnect in the outer loop
}
}
} catch (InterruptedException ignored) {
@@ -105,35 +106,36 @@ public class PsmbEndpoint implements IEndpoint {
}
private void subscribe() {
- final var sub = new SubscribeClient(host, port, subPattern, keepAliveInterval, subId);
final var logger = LoggerFactory.getLogger(LoggerNaming.name().of("endpoint").of("psmb")
.of(String.format("%s,%d,%s", host, subId, subPattern)).of("sub").toString());
try {
// reconnect loop
while (true) {
- // try to connect
- try {
- sub.connect();
- sub.setSubscribe(subPattern, subId);
- } catch (IOException | CommandFailureException ex) {
- logger.error("Cannot connect to server", ex);
- //noinspection BusyWait
- Thread.sleep(RETRY_INTERVAL_MILLIS);
- continue;
- }
- // connected successfully, receive messages
- try {
- // subscribe loop
- sub.subscribe(raw -> {
- try {
- onMessage(PsmbMessageSerializer.deserialize(raw, this));
- } catch (PsmbMessageSerializer.IllegalPackedMessageException ex) {
- logger.error("Cannot decode message", ex);
- }
- });
- } catch (IOException | CommandFailureException ex) {
- logger.error("Cannot receive message", ex);
- sub.disconnect(); // reconnect in the outer loop
+ try (final var sub = new SubscribeClient(host, port, subPattern, keepAliveInterval, subId)) {
+ // try to connect
+ try {
+ sub.connect();
+ sub.setSubscribe(subPattern, subId);
+ } catch (IOException | CommandFailureException ex) {
+ logger.error("Cannot connect to server", ex);
+ Util.SleepBeforeReconnect(logger);
+ continue;
+ }
+ // connected successfully, receive messages
+ try {
+ // subscribe loop
+ sub.subscribe(raw -> {
+ try {
+ onMessage(PsmbMessageSerializer.deserialize(raw, this));
+ } catch (PsmbMessageSerializer.IllegalPackedMessageException ex) {
+ logger.error("Cannot decode message", ex);
+ }
+ });
+ } catch (IOException | CommandFailureException ex) {
+ logger.error("Cannot receive message", ex);
+ sub.disconnect(); // reconnect in the outer loop
+ Util.SleepBeforeReconnect(logger);
+ }
}
}
} catch (InterruptedException ignored) {
diff --git a/src/main/java/com/keuin/crosslink/messaging/endpoint/remote/psmb/Util.java b/src/main/java/com/keuin/crosslink/messaging/endpoint/remote/psmb/Util.java
new file mode 100644
index 0000000..3fc1d0f
--- /dev/null
+++ b/src/main/java/com/keuin/crosslink/messaging/endpoint/remote/psmb/Util.java
@@ -0,0 +1,18 @@
+package com.keuin.crosslink.messaging.endpoint.remote.psmb;
+
+import org.slf4j.Logger;
+
+import static com.keuin.crosslink.messaging.endpoint.remote.psmb.PsmbEndpoint.RETRY_INTERVAL_MILLIS;
+
+public class Util {
+ public static void SleepBeforeReconnect(Logger logger) throws InterruptedException {
+ String time;
+ if (RETRY_INTERVAL_MILLIS > 1000) {
+ time = String.format("%.1fs", RETRY_INTERVAL_MILLIS / 1000.0);
+ } else {
+ time = String.format("%dms", RETRY_INTERVAL_MILLIS);
+ }
+ logger.info("Wait for {} before reconnecting.", time);
+ Thread.sleep(RETRY_INTERVAL_MILLIS);
+ }
+}
diff --git a/src/main/java/com/keuin/psmb4j/BaseClient.java b/src/main/java/com/keuin/psmb4j/BaseClient.java
index bdcd97c..94f46cb 100644
--- a/src/main/java/com/keuin/psmb4j/BaseClient.java
+++ b/src/main/java/com/keuin/psmb4j/BaseClient.java
@@ -109,6 +109,7 @@ public abstract class BaseClient implements AutoCloseable {
try {
socket.close();
} catch (IOException ignored) {
+ } finally {
socket = null;
}
}