-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathDXFeed.py
117 lines (99 loc) · 3.36 KB
/
DXFeed.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
import aiocometd, asyncio
from aiocometd import ConnectionType, Client
from DXAuthExtension import DXAuthExtension
from enum import Enum
# https://github.com/LordKaT/tastytrade_api_thing/blob/main/lib/DXFeed.py
class DXAction(str, Enum):
ADD = "add"
ADD_TIME_SERIES = "addTimeSeries"
class DXEvent(str, Enum):
TRADE = "Trade"
QUOTE = "Quote"
SUMMARY = "Summary"
PROFILE = "Profile"
ORDER = "Order"
TIME_AND_SALE = "TimeAndSale"
CANDLE = "Candle"
TRADE_ETH = "TradETH"
SPREAD_ORDER = "SpreadOrder"
GREEKS = "Greeks"
THEORETICAL_PRICE = "TheoPrice"
UNDERLYING = "Underlying"
SERIES = "Series"
CONFIGURATION = "Configuration"
class DXService(str, Enum):
DATA = "/service/data"
SUBSCRIBE = "/service/sub"
class DXFeed:
uri: str = None
auth_token: str = None
streamer: any = None
active: bool = False
def __init__(self, uri: str = None, auth_token: str = None) -> None:
self.uri = uri
self.auth_token = auth_token
async def connect(self) -> bool:
aiocometd.client.DEFAULT_CONNECTION_TYPE = ConnectionType.WEBSOCKET
self.streamer = Client(
url=self.uri,
connection_types=ConnectionType.WEBSOCKET,
auth=DXAuthExtension(self.auth_token),
)
await self.streamer.open()
await self.streamer.subscribe(DXService.DATA.value)
await self.streamer.publish(DXService.SUBSCRIBE.value, {"reset": True})
self.active = True
async def disconnect(self) -> bool:
self.active = False
await self.streamer.close()
async def subscribe(
self,
events: list[DXEvent] = [],
symbols: list[str] = [],
) -> bool:
for event in events:
print(f"Subscribing to {event.value}: {symbols}")
body = {}
body[event.value] = symbols
await self.streamer.publish(
DXService.SUBSCRIBE.value,
{DXAction.ADD: {event.value: symbols}},
)
return True
async def subscribe_time_series(
self, symbol: str = None, from_time: int = None, to_time: int = None
) -> bool:
body = {}
body[DXEvent.CANDLE.value] = [
{
"eventSymbol": symbol,
"fromTime": from_time,
"toTime": to_time,
}
]
print({DXAction.ADD_TIME_SERIES.value: body})
await self.streamer.publish(
DXService.SUBSCRIBE.value, {DXAction.ADD_TIME_SERIES.value: body}
)
return True
async def unsubscribe(
self, events: list[DXEvent] = [], symbols: list[str] = []
) -> bool:
for event in events:
print(f"Unsubscribing from {event.value}: {symbols}")
await self.streamer.publish(
DXService.SUBSCRIBE, {"remove": {event.value: symbols}}
)
return True
async def listen(self, callback) -> bool:
try:
async with asyncio.timeout(0.1):
async for msg in self.streamer:
if msg["channel"] != DXService.DATA:
print(f"dxfeed other {msg}")
continue
# print(f"dxfeed get {msg}")
await callback(msg)
return True
except asyncio.TimeoutError:
return True