Skip to content

Commit

Permalink
fix retry
Browse files Browse the repository at this point in the history
  • Loading branch information
MrXiaoM committed Jan 10, 2024
1 parent 83e9d88 commit eb6b99c
Showing 1 changed file with 26 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,15 @@ import cn.evolvefield.onebot.client.handler.ActionHandler
import cn.evolvefield.onebot.client.handler.EventBus
import cn.evolvefield.onebot.client.util.ActionSendUtils
import com.google.gson.JsonSyntaxException
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.*
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import org.java_websocket.client.WebSocketClient
import org.java_websocket.framing.CloseFrame
import org.java_websocket.handshake.ServerHandshake
import org.slf4j.Logger
import java.net.URI
import kotlin.math.log

/**
* Project: onebot-client
Expand All @@ -36,6 +34,7 @@ class WSClient(
) : WebSocketClient(uri, header) {
private var retryCount = 0
private var eventBus: EventBus? = null
val connectDef = CompletableDeferred<Boolean>()
fun createBot(): Bot {
return Bot(this, actionHandler)
}
Expand All @@ -44,29 +43,9 @@ class WSClient(
return if (eventBus != null) eventBus!! else EventBus().also { eventBus = it }
}

override fun connectBlocking(): Boolean {
if (retryTimes < 1) return super.connectBlocking()
return runBlocking {
val wait = 0L.coerceAtLeast(retryWaitMills)
val waitFormatted = String.format("%.1f", wait / 1000.0)
val restFormatted = String.format("%.1f", 0L.coerceAtLeast(retryRestMills) / 1000.0)
while (!super.connectBlocking()) {
if (retryCount >= retryTimes) {
if (retryRestMills < 0) {
logger.warn("重连 $retryTimes 次失败,放弃连接")
return@runBlocking false
}
logger.warn("重连 $retryTimes 次失败,将在 $restFormatted 秒后再尝试连接")
delay(retryRestMills)
retryCount = 0
continue
}
logger.warn("连接失败,将在 $waitFormatted 秒后尝试第 ${++retryCount} 次重连")
delay(wait)
}
retryCount = 0
return@runBlocking true
}
suspend fun connectSuspend(): Boolean {
if (super.connectBlocking()) return true
return connectDef.await()
}

override fun onOpen(handshakedata: ServerHandshake) {
Expand Down Expand Up @@ -99,8 +78,25 @@ class WSClient(
if (mutex.isLocked) mutex.unlock()
if (ActionSendUtils.mutex.isLocked) ActionSendUtils.mutex.unlock()
}
if (retryCount < 1 && code != CloseFrame.NORMAL) { // TODO: 测试确认异常关闭码
reconnectBlocking()
if (code != CloseFrame.NORMAL) { // TODO: 测试确认异常关闭码
if (retryTimes < 1) {
connectDef.complete(false)
return
}
scope.launch {
if (retryCount < retryTimes) {
retryCount++
logger.warn("等待 ${String.format("%.1f", retryWaitMills / 1000.0F)} 秒后重连 (第 $retryCount/$retryTimes")
delay(retryWaitMills)
} else {
logger.warn("重连次数耗尽... 休息 ${String.format("%.1f", retryRestMills / 1000.0F)} 秒后重试")
delay(retryRestMills)
}
logger.info("正在重连...")
if (reconnectBlocking()) {
connectDef.complete(true)
}
}
}
}

Expand All @@ -116,7 +112,7 @@ class WSClient(
val mutex = Mutex()
fun createAndConnect(scope: CoroutineScope, uri: URI, logger: Logger, actionHandler: ActionHandler, retryTimes: Int, retryWaitMills: Long, retryRestMills: Long, header: Map<String, String> = mapOf()): WSClient? {
val ws = WSClient(scope, uri, logger, actionHandler, retryTimes, retryWaitMills, retryRestMills, header)
return if (ws.connectBlocking()) ws else null
return ws.takeIf { runBlocking { ws.connectSuspend() } }
}
}
}

0 comments on commit eb6b99c

Please sign in to comment.