diff options
Diffstat (limited to 'src/main/java/com/keuin')
3 files changed, 70 insertions, 49 deletions
diff --git a/src/main/java/com/keuin/crosslink/messaging/endpoint/remote/psmb/PsmbEndpoint.java b/src/main/java/com/keuin/crosslink/messaging/endpoint/remote/psmb/PsmbEndpoint.java index 48df6a8..771268a 100644 --- a/src/main/java/com/keuin/crosslink/messaging/endpoint/remote/psmb/PsmbEndpoint.java +++ b/src/main/java/com/keuin/crosslink/messaging/endpoint/remote/psmb/PsmbEndpoint.java @@ -63,38 +63,39 @@ public class PsmbEndpoint implements IEndpoint { } private void publish() { - final var pub = new PublishClient(host, port); final var logger = LoggerFactory.getLogger(LoggerNaming.name().of("endpoint").of("psmb") .of(String.format("%s,%s", host, pubTopic)).of("pub").toString()); try { // reconnect loop while (true) { - // try to connect - try { - pub.connect(); - pub.setPublish(pubTopic); - } catch (IOException | CommandFailureException ex) { - logger.error("Cannot connect to server", ex); - //noinspection BusyWait - Thread.sleep(RETRY_INTERVAL_MILLIS); - continue; - } - // connected successfully, send messages - try { - // publish loop - long lastBeat = -1; - while (true) { - if (Math.abs(System.currentTimeMillis() - lastBeat) >= keepAliveInterval) { - pub.keepAlive(); - lastBeat = System.currentTimeMillis(); + try (final var pub = new PublishClient(host, port)) { + // try to connect + try { + pub.connect(); + pub.setPublish(pubTopic); + } catch (IOException | CommandFailureException ex) { + logger.error("Cannot connect to server", ex); + Util.SleepBeforeReconnect(logger); + continue; + } + // connected successfully, send messages + try { + // publish loop + long lastBeat = -1; + while (true) { + if (Math.abs(System.currentTimeMillis() - lastBeat) >= keepAliveInterval) { + pub.keepAlive(); + lastBeat = System.currentTimeMillis(); + } + var message = pubQueue.poll(Math.max(keepAliveInterval, 0), TimeUnit.MILLISECONDS); + if (message == null) continue; + pub.publish(PsmbMessageSerializer.serialize(message)); } - var message = pubQueue.poll(Math.max(keepAliveInterval, 0), TimeUnit.MILLISECONDS); - if (message == null) continue; - pub.publish(PsmbMessageSerializer.serialize(message)); + } catch (IOException | CommandFailureException ex) { + logger.error("Cannot publish message", ex); + pub.disconnect(); // reconnect in the outer loop + Util.SleepBeforeReconnect(logger); } - } catch (IOException | CommandFailureException ex) { - logger.error("Cannot publish message", ex); - pub.disconnect(); // reconnect in the outer loop } } } catch (InterruptedException ignored) { @@ -105,35 +106,36 @@ public class PsmbEndpoint implements IEndpoint { } private void subscribe() { - final var sub = new SubscribeClient(host, port, subPattern, keepAliveInterval, subId); final var logger = LoggerFactory.getLogger(LoggerNaming.name().of("endpoint").of("psmb") .of(String.format("%s,%d,%s", host, subId, subPattern)).of("sub").toString()); try { // reconnect loop while (true) { - // try to connect - try { - sub.connect(); - sub.setSubscribe(subPattern, subId); - } catch (IOException | CommandFailureException ex) { - logger.error("Cannot connect to server", ex); - //noinspection BusyWait - Thread.sleep(RETRY_INTERVAL_MILLIS); - continue; - } - // connected successfully, receive messages - try { - // subscribe loop - sub.subscribe(raw -> { - try { - onMessage(PsmbMessageSerializer.deserialize(raw, this)); - } catch (PsmbMessageSerializer.IllegalPackedMessageException ex) { - logger.error("Cannot decode message", ex); - } - }); - } catch (IOException | CommandFailureException ex) { - logger.error("Cannot receive message", ex); - sub.disconnect(); // reconnect in the outer loop + try (final var sub = new SubscribeClient(host, port, subPattern, keepAliveInterval, subId)) { + // try to connect + try { + sub.connect(); + sub.setSubscribe(subPattern, subId); + } catch (IOException | CommandFailureException ex) { + logger.error("Cannot connect to server", ex); + Util.SleepBeforeReconnect(logger); + continue; + } + // connected successfully, receive messages + try { + // subscribe loop + sub.subscribe(raw -> { + try { + onMessage(PsmbMessageSerializer.deserialize(raw, this)); + } catch (PsmbMessageSerializer.IllegalPackedMessageException ex) { + logger.error("Cannot decode message", ex); + } + }); + } catch (IOException | CommandFailureException ex) { + logger.error("Cannot receive message", ex); + sub.disconnect(); // reconnect in the outer loop + Util.SleepBeforeReconnect(logger); + } } } } catch (InterruptedException ignored) { diff --git a/src/main/java/com/keuin/crosslink/messaging/endpoint/remote/psmb/Util.java b/src/main/java/com/keuin/crosslink/messaging/endpoint/remote/psmb/Util.java new file mode 100644 index 0000000..3fc1d0f --- /dev/null +++ b/src/main/java/com/keuin/crosslink/messaging/endpoint/remote/psmb/Util.java @@ -0,0 +1,18 @@ +package com.keuin.crosslink.messaging.endpoint.remote.psmb; + +import org.slf4j.Logger; + +import static com.keuin.crosslink.messaging.endpoint.remote.psmb.PsmbEndpoint.RETRY_INTERVAL_MILLIS; + +public class Util { + public static void SleepBeforeReconnect(Logger logger) throws InterruptedException { + String time; + if (RETRY_INTERVAL_MILLIS > 1000) { + time = String.format("%.1fs", RETRY_INTERVAL_MILLIS / 1000.0); + } else { + time = String.format("%dms", RETRY_INTERVAL_MILLIS); + } + logger.info("Wait for {} before reconnecting.", time); + Thread.sleep(RETRY_INTERVAL_MILLIS); + } +} diff --git a/src/main/java/com/keuin/psmb4j/BaseClient.java b/src/main/java/com/keuin/psmb4j/BaseClient.java index bdcd97c..94f46cb 100644 --- a/src/main/java/com/keuin/psmb4j/BaseClient.java +++ b/src/main/java/com/keuin/psmb4j/BaseClient.java @@ -109,6 +109,7 @@ public abstract class BaseClient implements AutoCloseable { try { socket.close(); } catch (IOException ignored) { + } finally { socket = null; } } |