summaryrefslogtreecommitdiff
path: root/src/main/java
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java')
-rw-r--r--src/main/java/com/keuin/crosslink/api/ApiServer.java16
-rw-r--r--src/main/java/com/keuin/crosslink/messaging/endpoint/EndpointNamespace.java2
-rw-r--r--src/main/java/com/keuin/crosslink/messaging/endpoint/system/ApiEndpoint.java53
-rw-r--r--src/main/java/com/keuin/crosslink/plugin/common/PluginMain.java7
-rw-r--r--src/main/java/com/keuin/psmb4j/BaseClient.java25
5 files changed, 89 insertions, 14 deletions
diff --git a/src/main/java/com/keuin/crosslink/api/ApiServer.java b/src/main/java/com/keuin/crosslink/api/ApiServer.java
index ebd1820..8a3bbce 100644
--- a/src/main/java/com/keuin/crosslink/api/ApiServer.java
+++ b/src/main/java/com/keuin/crosslink/api/ApiServer.java
@@ -101,6 +101,22 @@ public class ApiServer implements IApiServer {
logger.debug("Finished reading server status. Sending response to HTTP client.");
}
})
+ .put("/message", new JsonReqHandler() {
+ @Override
+ protected void handle(JsonHttpExchange exchange) throws IOException {
+ var req = exchange.getRequestBody();
+ var sender = req.get("sender");
+ var message = req.get("message");
+ if (!sender.isTextual() || !message.isTextual()) {
+ exchange.setResponseCode(400);
+ var resp = exchange.getResponseBody();
+ resp.put("success", false);
+ resp.put("message", "Illegal parameter type.");
+ return;
+ }
+
+ }
+ })
.build().forEach((p, h) -> server.createContext(p).setHandler(h));
server.start();
logger.info("API server is listening on {}:{}.",
diff --git a/src/main/java/com/keuin/crosslink/messaging/endpoint/EndpointNamespace.java b/src/main/java/com/keuin/crosslink/messaging/endpoint/EndpointNamespace.java
index fb3eb12..8dfcd9d 100644
--- a/src/main/java/com/keuin/crosslink/messaging/endpoint/EndpointNamespace.java
+++ b/src/main/java/com/keuin/crosslink/messaging/endpoint/EndpointNamespace.java
@@ -6,7 +6,7 @@ import org.jetbrains.annotations.Nullable;
import java.util.Objects;
public enum EndpointNamespace {
- SERVER("server"), REMOTE("remote");
+ SERVER("server"), REMOTE("remote"), SYSTEM("system");
private final String namespace;
EndpointNamespace(String namespace) {
diff --git a/src/main/java/com/keuin/crosslink/messaging/endpoint/system/ApiEndpoint.java b/src/main/java/com/keuin/crosslink/messaging/endpoint/system/ApiEndpoint.java
new file mode 100644
index 0000000..ea12266
--- /dev/null
+++ b/src/main/java/com/keuin/crosslink/messaging/endpoint/system/ApiEndpoint.java
@@ -0,0 +1,53 @@
+package com.keuin.crosslink.messaging.endpoint.system;
+
+import com.keuin.crosslink.messaging.endpoint.EndpointNamespace;
+import com.keuin.crosslink.messaging.endpoint.IEndpoint;
+import com.keuin.crosslink.messaging.message.IMessage;
+import com.keuin.crosslink.messaging.router.IRouter;
+import com.keuin.crosslink.messaging.sender.ISender;
+import org.jetbrains.annotations.NotNull;
+
+public class ApiEndpoint implements IEndpoint {
+
+ public static final ApiEndpoint INSTANCE = new ApiEndpoint(); // For simplicity, we use singleton.
+ private IRouter router;
+
+ private ApiEndpoint() {
+ }
+
+ /**
+ * The actual API provides message sent from this endpoint.
+ *
+ * @param sender message sender.
+ * @param content message content.
+ */
+ public void offerMessage(@NotNull ISender sender, @NotNull String content) {
+ router.sendMessage(IMessage.create(this, sender, content));
+ }
+
+ @Override
+ public void sendMessage(IMessage message) {
+ // This endpoint does not receive messages,
+ // so we do nothing here.
+ }
+
+ @Override
+ public void setRouter(IRouter router) {
+ this.router = router;
+ }
+
+ @Override
+ public void close() {
+ // Since sendMessage is empty, we do nothing here.
+ }
+
+ @Override
+ public @NotNull String id() {
+ return "api";
+ }
+
+ @Override
+ public @NotNull EndpointNamespace namespace() {
+ return EndpointNamespace.SYSTEM;
+ }
+}
diff --git a/src/main/java/com/keuin/crosslink/plugin/common/PluginMain.java b/src/main/java/com/keuin/crosslink/plugin/common/PluginMain.java
index bdaf31d..69b4299 100644
--- a/src/main/java/com/keuin/crosslink/plugin/common/PluginMain.java
+++ b/src/main/java/com/keuin/crosslink/plugin/common/PluginMain.java
@@ -58,7 +58,8 @@ public final class PluginMain {
logger.info("Initializing components.");
// initialize message routing
logger.info("Initializing message routing.");
- var endpoints = new HashSet<>(coreAccessor.getServerEndpoints()); // All local and remote endpoints. remotes will be added later
+ // Contains all enabled endpoints, including local and remote ones. Remote endpoints will be added later.
+ final var endpoints = new HashSet<>(coreAccessor.getServerEndpoints());
try {
var messaging = GlobalConfigManager.getInstance().messaging();
var routing = Optional.ofNullable(messaging.get("routing"))
@@ -95,11 +96,15 @@ public final class PluginMain {
logger.error("Invalid remote endpoint", ex);
throw new RuntimeException(ex);
}
+
} catch (JsonProcessingException ex) {
logger.error("Failed to parse JSON config", ex);
throw new RuntimeException(ex);
}
+ // API endpoint (send messages from HTTP api)
+
+
// register all endpoints on the router
for (IEndpoint ep : endpoints) {
if (!messageRouter.addEndpoint(ep)) {
diff --git a/src/main/java/com/keuin/psmb4j/BaseClient.java b/src/main/java/com/keuin/psmb4j/BaseClient.java
index 3c5cf08..0fd5274 100644
--- a/src/main/java/com/keuin/psmb4j/BaseClient.java
+++ b/src/main/java/com/keuin/psmb4j/BaseClient.java
@@ -39,6 +39,7 @@ public abstract class BaseClient implements AutoCloseable {
* and should be called only once.
* If an IO error occurred when doing some operation,
* this client must be reconnected before next operations.
+ *
* @throws IOException if a network error occurred
*/
public void connect() throws IOException {
@@ -74,19 +75,19 @@ public abstract class BaseClient implements AutoCloseable {
public void keepAlive() throws IOException {
final var nop = new byte[]{'N', 'O', 'P'};
final var nil = new byte[]{'N', 'I', 'L'};
- synchronized (socketReadLock) {
- synchronized (socketWriteLock) {
- // lock the whole bidirectional communication
- os.write(nop);
- os.flush();
- // wait for a response NIL
- var response = InputStreamUtils.readBytes(is, 3);
- if (!Arrays.equals(response, nil)) {
- throw new RuntimeException("illegal command from server: " +
- new String(response, StandardCharsets.US_ASCII));
- }
- }
+// synchronized (socketReadLock) {
+ synchronized (socketWriteLock) {
+ // lock the whole bidirectional communication
+ os.write(nop);
+ os.flush();
+// // wait for a response NIL
+// var response = InputStreamUtils.readBytes(is, 3);
+// if (!Arrays.equals(response, nil)) {
+// throw new RuntimeException("illegal command from server: " +
+// new String(response, StandardCharsets.US_ASCII));
+// }
}
+// }
}
public void disconnect() {