17
17
package io.emeraldpay.dshackle.upstream.ethereum
18
18
19
19
import io.emeraldpay.dshackle.Defaults
20
+ import io.emeraldpay.dshackle.Global
20
21
import io.emeraldpay.dshackle.SilentException
21
22
import io.emeraldpay.dshackle.config.AuthConfig
22
23
import io.emeraldpay.dshackle.data.BlockContainer
23
24
import io.emeraldpay.dshackle.upstream.rpcclient.JsonRpcRequest
24
25
import io.emeraldpay.dshackle.upstream.rpcclient.JsonRpcResponse
25
26
import io.infinitape.etherjar.rpc.json.BlockJson
26
27
import io.infinitape.etherjar.rpc.json.TransactionRefJson
27
- import io.infinitape.etherjar.rpc.ws.WebsocketClient
28
+ import io.infinitape.etherjar.rpc.ws.SubscriptionJson
29
+ import io.netty.buffer.ByteBufInputStream
30
+ import io.netty.handler.codec.http.HttpHeaderNames
28
31
import org.slf4j.LoggerFactory
32
+ import reactor.core.Disposable
29
33
import reactor.core.publisher.Flux
30
34
import reactor.core.publisher.Mono
31
- import reactor.extra.processor.TopicProcessor
35
+ import reactor.core.publisher.Sinks
36
+ import reactor.netty.http.client.HttpClient
37
+ import reactor.netty.http.client.WebsocketClientSpec
32
38
import reactor.retry.Repeat
39
+ import java.io.InputStream
33
40
import java.net.URI
34
41
import java.time.Duration
42
+ import java.util.*
43
+ import java.util.concurrent.TimeUnit
44
+ import java.util.concurrent.atomic.AtomicReference
35
45
36
46
class EthereumWsFactory (
37
47
private val uri : URI ,
@@ -49,36 +59,119 @@ class EthereumWsFactory(
49
59
private val origin : URI ,
50
60
private val upstream : EthereumUpstream ,
51
61
private val basicAuth : AuthConfig .ClientBasicAuth ?
52
- ) {
62
+ ) : AutoCloseable {
53
63
54
64
companion object {
55
65
private val log = LoggerFactory .getLogger(EthereumWs ::class .java)
66
+
67
+ private const val START_REQUEST = " {\" jsonrpc\" :\" 2.0\" , \" method\" :\" eth_subscribe\" , \" id\" :\" blocks\" , \" params\" :[\" newHeads\" ]}"
56
68
}
57
69
58
- private val topic = TopicProcessor
59
- .builder<BlockContainer >()
60
- .name(" new-blocks" )
61
- .build()
70
+ private val topic = Sinks
71
+ .many()
72
+ .unicast()
73
+ .onBackpressureBuffer<BlockContainer >()
74
+ private var keepConnection = true
75
+ private var connection: Disposable ? = null
62
76
63
77
fun connect () {
64
- log.info(" Connecting to WebSocket: $uri " )
65
- val clientBuilder = WebsocketClient .newBuilder()
66
- .connectTo(uri)
67
- .origin(origin)
68
- basicAuth?.let { auth ->
69
- clientBuilder.basicAuth(auth.username, auth.password)
70
- }
71
- val client = clientBuilder.build()
72
- try {
73
- client.connect()
74
- client.onNewBlock(this ::onNewBlock)
75
- } catch (e: Exception ) {
76
- log.error(" Failed to connect to websocket at $uri . Error: ${e.message} " )
78
+ if (keepConnection) {
79
+ connectInternal()
77
80
}
78
81
}
79
82
83
+ private fun tryReconnectLater () {
84
+ Global .control.schedule(
85
+ { connectInternal() },
86
+ Defaults .retryConnection.seconds, TimeUnit .SECONDS )
87
+ }
88
+
89
+ private fun connectInternal () {
90
+ log.info(" Connecting to WebSocket: $uri " )
91
+ connection?.dispose()
92
+ connection = null
93
+
94
+ val subscriptionId = AtomicReference <String >(" NOTSET" )
95
+
96
+ val objectMapper = Global .objectMapper
97
+ connection = HttpClient .create()
98
+ .doOnError(
99
+ { _, t ->
100
+ log.warn(" Failed to connect to $uri . Error: ${t.message} " )
101
+ // going to try to reconnect later
102
+ tryReconnectLater()
103
+ },
104
+ { _, _ ->
105
+
106
+ }
107
+ )
108
+ .headers { headers ->
109
+ headers.add(HttpHeaderNames .ORIGIN , origin)
110
+ basicAuth?.let { auth ->
111
+ val tmp: String = auth.username + " :" + auth.password
112
+ val base64password = Base64 .getEncoder().encodeToString(tmp.toByteArray())
113
+ headers.add(HttpHeaderNames .AUTHORIZATION , " Basic $base64password " )
114
+ }
115
+ }
116
+ .let {
117
+ if (uri.scheme == " wss" ) {
118
+ it.secure()
119
+ } else {
120
+ it
121
+ }
122
+ }
123
+ .websocket(
124
+ WebsocketClientSpec .builder()
125
+ .handlePing(true )
126
+ .compress(false )
127
+ .build()
128
+ )
129
+
130
+ .uri(uri)
131
+ .handle { inbound, outbound ->
132
+ val consumer = inbound.aggregateFrames()
133
+ .aggregateFrames(8 * 65_536 )
134
+ .receiveFrames()
135
+ .flatMap {
136
+ val msg: SubscriptionJson = objectMapper.readerFor(SubscriptionJson ::class .java)
137
+ .readValue(ByteBufInputStream (it.content()) as InputStream )
138
+ when {
139
+ msg.error != null -> {
140
+ Mono .error(IllegalStateException (" Received error from WS upstream" ))
141
+ }
142
+ msg.subscription == subscriptionId.get() -> {
143
+ onNewBlock(msg.blockResult)
144
+ Mono .empty<Int >()
145
+ }
146
+ msg.subscription == null -> {
147
+ // received ID for subscription
148
+ subscriptionId.set(msg.result.asText())
149
+ log.debug(" Connected to $uri " )
150
+ Mono .empty<Int >()
151
+ }
152
+ else -> {
153
+ Mono .error(IllegalStateException (" Unknown message received: ${msg.subscription} " ))
154
+ }
155
+ }
156
+ }
157
+ .onErrorResume { t ->
158
+ log.warn(" Connection dropped to $uri . Error: ${t.message} " )
159
+ // going to try to reconnect later
160
+ tryReconnectLater()
161
+ // completes current outbound flow
162
+ Mono .empty()
163
+ }
164
+
165
+
166
+ outbound.sendString(Mono .just(START_REQUEST ).doOnError {
167
+ println (" !!!!!!!" )
168
+ })
169
+ .then(consumer.then())
170
+ }.subscribe()
171
+ }
172
+
80
173
fun onNewBlock (block : BlockJson <TransactionRefJson >) {
81
- // WS returns incomplete blocks
174
+ // WS returns incomplete blocks, i.e. without some fields, so need to fetch full block data
82
175
if (block.difficulty == null || block.transactions == null ) {
83
176
Mono .just(block.hash)
84
177
.flatMap { hash ->
@@ -100,16 +193,23 @@ class EthereumWsFactory(
100
193
}
101
194
.timeout(Defaults .timeout, Mono .empty())
102
195
.onErrorResume { Mono .empty() }
103
- .subscribe(topic::onNext)
196
+ .subscribe {
197
+ topic.tryEmitNext(it)
198
+ }
104
199
105
200
} else {
106
- topic.onNext (BlockContainer .from(block))
201
+ topic.tryEmitNext (BlockContainer .from(block))
107
202
}
108
203
}
109
204
110
205
fun getFlux (): Flux <BlockContainer > {
111
- return Flux .from(this .topic)
112
- .onBackpressureLatest()
206
+ return this .topic.asFlux()
207
+ }
208
+
209
+ override fun close () {
210
+ keepConnection = false
211
+ connection?.dispose()
212
+ connection = null
113
213
}
114
214
}
115
215
0 commit comments