Skip to content

Commit

Permalink
EventBus should be object for multi bot support
Browse files Browse the repository at this point in the history
  • Loading branch information
MrXiaoM committed Feb 26, 2024
1 parent 004f495 commit 3ecc323
Show file tree
Hide file tree
Showing 9 changed files with 158 additions and 186 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package cn.evolvefield.onebot.client.connection

import cn.evole.onebot.sdk.util.json.JsonsObject
import cn.evolvefield.onebot.client.handler.ActionHandler
import cn.evolvefield.onebot.client.handler.EventBus
import cn.evolvefield.onebot.client.util.ActionSendUtils
import com.google.gson.JsonSyntaxException
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.launch
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import org.slf4j.Logger

interface IAdapter {
val scope: CoroutineScope
val actionHandler: ActionHandler
val logger: Logger

fun onReceiveMessage(message: String) {
try {
val json = JsonsObject(message)
if (json.optString(META_EVENT) != HEART_BEAT) { // 过滤心跳
logger.debug("Client received <-- {}", json.toString())

if (json.has(API_RESULT_KEY)) { // 接口回调
actionHandler.onReceiveActionResp(json)
} else scope.launch { // 处理事件
mutex.withLock {
EventBus.onReceive(message)
}
}
}
} catch (e: JsonSyntaxException) {
logger.error("Json语法错误: {}", message)
}
}

fun unlockMutex() {
runCatching {
if (mutex.isLocked) mutex.unlock()
if (ActionSendUtils.mutex.isLocked) ActionSendUtils.mutex.unlock()
}
}

companion object {
private const val META_EVENT = "meta_event_type"
private const val API_RESULT_KEY = "echo"
private const val HEART_BEAT = "heartbeat"

val mutex = Mutex()
}
}
Original file line number Diff line number Diff line change
@@ -1,20 +1,13 @@
package cn.evolvefield.onebot.client.connection

import cn.evole.onebot.sdk.util.json.JsonsObject
import cn.evolvefield.onebot.client.core.Bot
import cn.evolvefield.onebot.client.handler.ActionHandler
import cn.evolvefield.onebot.client.handler.EventBus
import cn.evolvefield.onebot.client.util.ActionSendUtils
import com.google.gson.JsonSyntaxException
import kotlinx.coroutines.*
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import org.java_websocket.client.WebSocketClient
import org.java_websocket.framing.CloseFrame
import org.java_websocket.handshake.ServerHandshake
import org.slf4j.Logger
import java.net.URI
import kotlin.math.log

/**
* Project: onebot-client
Expand All @@ -23,26 +16,21 @@ import kotlin.math.log
* Description:
*/
class WSClient(
private val scope: CoroutineScope,
override val scope: CoroutineScope,
uri: URI,
private val logger: Logger,
private val actionHandler: ActionHandler,
val retryTimes: Int = 5,
val retryWaitMills: Long = 5000L,
val retryRestMills: Long = 60000L,
override val logger: Logger,
override val actionHandler: ActionHandler,
private val retryTimes: Int = 5,
private val retryWaitMills: Long = 5000L,
private val retryRestMills: Long = 60000L,
header: Map<String, String> = mapOf(),
) : WebSocketClient(uri, header) {
) : WebSocketClient(uri, header), IAdapter {
private var retryCount = 0
private var eventBus: EventBus? = null
val connectDef = CompletableDeferred<Boolean>()
private val connectDef = CompletableDeferred<Boolean>()
fun createBot(): Bot {
return Bot(this, actionHandler)
}

fun createEventBus(): EventBus {
return if (eventBus != null) eventBus!! else EventBus().also { eventBus = it }
}

suspend fun connectSuspend(): Boolean {
if (super.connectBlocking()) return true
return connectDef.await()
Expand All @@ -52,61 +40,44 @@ class WSClient(
logger.info("▌ 已连接到服务器 ┈━═☆")
}

override fun onMessage(message: String) {
try {
val jsonObject = JsonsObject(message)
if (HEART_BEAT != jsonObject.optString(META_EVENT)) { //过滤心跳
logger.debug("Client received <-- {}", jsonObject.toString())
if (jsonObject.has(API_RESULT_KEY)) {
actionHandler.onReceiveActionResp(jsonObject) //请求执行
} else {
scope.launch {
mutex.withLock {
eventBus?.onReceive(message)
}
}
}
}
} catch (e: JsonSyntaxException) {
logger.error("Json语法错误:{}", message)
}
}
override fun onMessage(message: String) = onReceiveMessage(message)

override fun onClose(code: Int, reason: String, remote: Boolean) {
logger.info(
"▌ 服务器连接因 {} 已关闭 (关闭码: {})",
reason.ifEmpty { "未知原因" },
CloseCode.valueOf(code) ?: code
)
runCatching {
if (mutex.isLocked) mutex.unlock()
if (ActionSendUtils.mutex.isLocked) ActionSendUtils.mutex.unlock()
unlockMutex()

// 自动重连
if (code != CloseFrame.NORMAL) retry()
}

private fun retry() {
if (retryTimes < 1 || retryWaitMills < 0) {
logger.warn("连接失败,未开启自动重连,放弃连接")
connectDef.complete(false)
return
}
if (code != CloseFrame.NORMAL) { // TODO: 测试确认异常关闭码
if (retryTimes < 1 || retryWaitMills < 0) {
logger.warn("连接失败,未开启自动重连,放弃连接")
connectDef.complete(false)
return
}
scope.launch {
if (retryCount < retryTimes) {
retryCount++
logger.warn("等待 ${String.format("%.1f", retryWaitMills / 1000.0F)} 秒后重连 (第 $retryCount/$retryTimes 次)")
delay(retryWaitMills)
} else {
retryCount = 0
if (retryRestMills < 0) {
logger.warn("重连次数耗尽... 放弃重试")
return@launch
}
logger.warn("重连次数耗尽... 休息 ${String.format("%.1f", retryRestMills / 1000.0F)} 秒后重试")
delay(retryRestMills)
}
logger.info("正在重连...")
if (reconnectBlocking()) {
retryCount = 0
connectDef.complete(true)
scope.launch {
if (retryCount < retryTimes) {
retryCount++
logger.warn("等待 ${String.format("%.1f", retryWaitMills / 1000.0F)} 秒后重连 (第 $retryCount/$retryTimes 次)")
delay(retryWaitMills)
} else {
retryCount = 0
if (retryRestMills < 0) {
logger.warn("重连次数耗尽... 放弃重试")
return@launch
}
logger.warn("重连次数耗尽... 休息 ${String.format("%.1f", retryRestMills / 1000.0F)} 秒后重试")
delay(retryRestMills)
}
logger.info("正在重连...")
if (reconnectBlocking()) {
retryCount = 0
connectDef.complete(true)
}
}
}
Expand All @@ -116,11 +87,6 @@ class WSClient(
}

companion object {
private const val META_EVENT = "meta_event_type"
private const val API_RESULT_KEY = "echo"
private const val HEART_BEAT = "heartbeat"

val mutex = Mutex()
fun createAndConnect(scope: CoroutineScope, uri: URI, logger: Logger, actionHandler: ActionHandler, retryTimes: Int, retryWaitMills: Long, retryRestMills: Long, header: Map<String, String> = mapOf()): WSClient? {
val ws = WSClient(scope, uri, logger, actionHandler, retryTimes, retryWaitMills, retryRestMills, header)
return ws.takeIf { runBlocking { ws.connectSuspend() } }
Expand Down
Original file line number Diff line number Diff line change
@@ -1,16 +1,9 @@
package cn.evolvefield.onebot.client.connection

import cn.evole.onebot.sdk.util.json.JsonsObject
import cn.evolvefield.onebot.client.core.Bot
import cn.evolvefield.onebot.client.handler.ActionHandler
import cn.evolvefield.onebot.client.handler.EventBus
import cn.evolvefield.onebot.client.util.ActionSendUtils
import com.google.gson.JsonSyntaxException
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.launch
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import org.java_websocket.WebSocket
import org.java_websocket.framing.CloseFrame
import org.java_websocket.handshake.ClientHandshake
Expand All @@ -25,20 +18,15 @@ import java.net.InetSocketAddress
* Description:
*/
class WSServer(
private val scope: CoroutineScope,
override val scope: CoroutineScope,
address: InetSocketAddress,
private val logger: Logger,
private val actionHandler: ActionHandler,
override val logger: Logger,
override val actionHandler: ActionHandler,
private val token: String
) : WebSocketServer(address) {
private var eventBus: EventBus? = null
) : WebSocketServer(address), IAdapter {
private var bot: Bot? = null
val def = CompletableDeferred<Bot>()

fun createEventBus(): EventBus {
return if (eventBus != null) eventBus!! else EventBus().also { eventBus = it }
}

override fun onStart() {
logger.info("▌ 反向 WebSocket 服务端已在 $address 启动")
logger.info("▌ 正在等待客户端连接...")
Expand Down Expand Up @@ -76,48 +64,22 @@ class WSServer(
}).conn = conn
}

override fun onMessage(conn: WebSocket, message: String) {
try {
val jsonObject = JsonsObject(message)
if (HEART_BEAT != jsonObject.optString(META_EVENT)) { //过滤心跳
logger.debug("Client received <-- {}", jsonObject.toString())
if (jsonObject.has(API_RESULT_KEY)) {
actionHandler.onReceiveActionResp(jsonObject) //请求执行
} else {
scope.launch {
mutex.withLock {
eventBus?.onReceive(message)
}
}
}
}
} catch (e: JsonSyntaxException) {
logger.error("Json语法错误: {}", message)
}
}
override fun onMessage(conn: WebSocket, message: String) = onReceiveMessage(message)

override fun onClose(conn: WebSocket, code: Int, reason: String, remote: Boolean) {
logger.info(
"▌ 反向 WebSocket 客户端连接因 {} 已关闭 (关闭码: {})",
reason.ifEmpty { "未知原因" },
CloseCode.valueOf(code) ?: code
)
runCatching {
if (mutex.isLocked) mutex.unlock()
if (ActionSendUtils.mutex.isLocked) ActionSendUtils.mutex.unlock()
}
unlockMutex()
}

override fun onError(conn: WebSocket, ex: Exception) {
logger.error("▌ 反向 WebSocket 客户端连接出现错误 {} 或未连接 ┈━═☆", ex.localizedMessage)
}

companion object {
private const val META_EVENT = "meta_event_type"
private const val API_RESULT_KEY = "echo"
private const val HEART_BEAT = "heartbeat"

val mutex = Mutex()
suspend fun createAndWaitConnect(scope: CoroutineScope, address: InetSocketAddress, logger: Logger, actionHandler: ActionHandler, token: String): Pair<WSServer, Bot> {
val ws = WSServer(scope, address, logger, actionHandler, token)
ws.start()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,21 +15,20 @@ import java.util.concurrent.ConcurrentHashMap
* Description:
*/
@Suppress("unused")
class EventBus {

object EventBus {
private val log = LoggerFactory.getLogger(EventBus::class.java)
//存储监听器对象
private val eventlistenerlist: MutableList<EventListener<out Event>> = ArrayList()

private val listeners: MutableList<EventListener<out Event>> = ArrayList()
//缓存类型与监听器的关系
private val cache: MutableMap<Class<out Event>, List<EventListener<out Event>>> = ConcurrentHashMap()

fun addListener(listener: EventListener<out Event>) {
eventlistenerlist.add(listener)
listeners.add(listener)
}

fun stop() {
cache.clear()
eventlistenerlist.clear()
listeners.clear()
}

/**
Expand Down Expand Up @@ -63,7 +62,7 @@ class EventBus {
*/
private fun getMethod(messageType: Class<out Event>): List<EventListener<out Event>> {
val eventListeners: MutableList<EventListener<out Event>> = ArrayList()
for (eventListener in eventlistenerlist) {
for (eventListener in listeners) {
try {
if (eventListener.javaClass.declaredMethods.none {
it.name == "onMessage" && it.parameterTypes.any { par -> messageType == par }
Expand All @@ -78,16 +77,12 @@ class EventBus {
}

val listenerList: List<EventListener<out Event>>
get() = eventlistenerlist
get() = listeners

/**
* 清除类型缓存
*/
fun cleanCache() {
cache.clear()
}

companion object {
private val log = LoggerFactory.getLogger(EventBus::class.java)
}
}
Loading

0 comments on commit 3ecc323

Please sign in to comment.