Skip to content
This repository was archived by the owner on Feb 18, 2023. It is now read-only.

Commit 3fc3eb4

Browse files
authored
Merge pull request #216 from theunkn0wn1/fix/SPARK-241
[SPARK-241] Abstract API execution by one more level, attempt to recover from hard fails.
2 parents 0efce57 + 1abd16b commit 3fc3eb4

2 files changed

Lines changed: 60 additions & 14 deletions

File tree

src/packages/fuelrats_api/v3/interface.py

Lines changed: 50 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
from .models.v1.rats import Rat as ApiRat, RAT_TYPE
1515
from .models.v1.rescue import Rescue as ApiRescue
1616
from .models.jsonapi.resource import Resource
17-
from .websocket.client import Connection
17+
from .websocket.client import Connection, Hardfail
1818
from .websocket.protocol import Request, Response
1919
from .._base import FuelratsApiABC, Impersonation
2020
from ...rat import Rat as InternalRat
@@ -84,6 +84,8 @@ async def run_task(self):
8484
A
8585
TASK
8686
"""
87+
# Ensure we have a new event signal.
88+
self.connected_event = asyncio.Event()
8789
logger.info("creating new socket connection....")
8890
async with websockets.connect(
8991
uri=f"{self.config.uri}?bearer={self.config.authorization}",
@@ -115,15 +117,15 @@ async def update_rescue(self, rescue: Rescue, impersonating: Impersonation) -> N
115117
)
116118
if not Impersonation:
117119
del work.query["representing"]
118-
response = await self.connection.execute(work)
120+
response = await self.execute(work)
119121
return response
120122

121123
async def _get_rescue(self, key: UUID, impersonation: Impersonation) -> Optional[ApiRescue]:
122124
await self.ensure_connection()
123125
work = Request(
124126
endpoint=["rescues", "read"], query={"id": f"{key}", "representing": impersonation}
125127
)
126-
response = await self.connection.execute(work)
128+
response = await self.execute(work)
127129
return cattr.structure(response.body["data"], Optional[ApiRescue])
128130

129131
async def get_rescue(self, key: UUID, impersonation: Impersonation) -> typing.Optional[Rescue]:
@@ -133,8 +135,10 @@ async def get_rescue(self, key: UUID, impersonation: Impersonation) -> typing.Op
133135

134136
async def ensure_connection(self):
135137
if not self.connected_event.is_set():
136-
logger.debug("waiting for the connected event to be set...")
137-
await self.connected_event.wait()
138+
logger.trace("waiting for the connected event to be set...")
139+
# wait for a short period for the connection to be established, but not indefinitely.
140+
await asyncio.wait_for(fut=await self.connected_event.wait(), timeout=5)
141+
138142
logger.trace("connected event is set!")
139143

140144
async def create_rescue(self, rescue: Rescue, impersonating: Impersonation) -> Rescue:
@@ -144,7 +148,7 @@ async def create_rescue(self, rescue: Rescue, impersonating: Impersonation) -> R
144148
query={"representing": impersonating},
145149
body={"data": attr.asdict(ApiRescue.from_internal(rescue), recurse=True)},
146150
)
147-
result = await self.connection.execute(work)
151+
result = await self.execute(work)
148152
# if we get this far, we got a OK response; which means the data field contains our rescue.
149153
payload: ApiRescue = cattr.structure(result.body["data"], ApiRescue)
150154
return payload.into_internal()
@@ -169,7 +173,7 @@ async def _get_nicknames(self, key: str, impersonation: Impersonation) -> Respon
169173
)
170174
# TODO: offline check
171175
logger.info(f"querying nickname {key}")
172-
return await self.connection.execute(work)
176+
return await self.execute(work)
173177

174178
async def _get_rat_uuid(self, key: UUID, impersonation: Impersonation):
175179
await self.ensure_connection()
@@ -178,15 +182,15 @@ async def _get_rat_uuid(self, key: UUID, impersonation: Impersonation):
178182
endpoint=["rats", "read"], query={"id": f"{key}", "representing": impersonation}
179183
)
180184
logger.debug("requesting rat {}", work)
181-
return await self.connection.execute(work)
185+
return await self.execute(work)
182186

183187
async def _get_open_rescues(self, impersonate: Impersonation) -> List[ApiRescue]:
184188
await self.ensure_connection()
185189
work = Request(
186190
endpoint=["rescues", "search"], query={"filter": {"status": {"eq": "open"}}}, body={}
187191
)
188192
logger.trace("requesting open rescues...")
189-
results = await self.connection.execute(work)
193+
results = await self.execute(work)
190194
# Iterators are less expensive than comprehensions (differed compute).
191195
structured_data = cattr.structure(results.body["data"], List[ApiRescue])
192196
return structured_data
@@ -204,3 +208,40 @@ async def _get_rats_from_nickname(self, key: str, impersonation: Impersonation)
204208
logger.debug("filtered Rats from nickname result: {!r}", rats)
205209

206210
return rats
211+
212+
async def execute(self, work: Request, retry: bool = False) -> Response:
213+
"""
214+
Attempts to execute the work item against the underlying connection.
215+
216+
This method will attempt to retry if a retry has not been attempted (`retry`=False).
217+
This method achieves that by destroying the existing connection and restarting the
218+
Relevant worker, then recursively calling itself with retry=True
219+
220+
Args:
221+
work: work item
222+
retry: is this call a retry attempt?
223+
224+
Returns:
225+
Response object
226+
227+
Raises:
228+
Hardfail from underlying API error, if connection is still dead after a retry.
229+
"""
230+
await self.ensure_connection()
231+
232+
try:
233+
# attempt to invoke the underlying connection work item
234+
return await self.connection.execute(work=work)
235+
# If this fails hard, spark needs to attempt to reconnect (unless its already tried.)
236+
except Hardfail:
237+
# unconditionally kill the connection.
238+
self.connection.shutdown.set()
239+
if not retry:
240+
logger.exception("API hard failure detected, attempting to recover...")
241+
# kill the old connection, since its in a hardfail state
242+
self.connection.shutdown.set()
243+
# re-create the run_task, which creates a new connection.
244+
asyncio.create_task(self.run_task())
245+
# recursively call this routine, as its possible to fail more than once.
246+
return await self.execute(work=work, retry=True)
247+
raise

src/packages/fuelrats_api/v3/websocket/client.py

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,11 @@
1111
from .events import RescueUpdate, CLS_FOR_EVENT
1212
from .. import event_converter
1313
from ..models.v1.apierror import APIException, ApiError, UnauthorizedImpersonation
14+
from ..._base import ApiException
15+
16+
17+
class Hardfail(ApiException):
18+
""" API Hard failure. the underlying transport is in an unrecoverable fail state. """
1419

1520

1621
class Connection:
@@ -152,8 +157,8 @@ async def check_fail(self):
152157
# we do this here since exceptions cannot otherwise be meaningfully handled,
153158
# as the caller of this method is probably not doing so from a unmonitored task
154159
if self._rx_worker.done() and self._rx_worker.exception():
155-
raise self._rx_worker.exception()
156-
if self._tx_worker.done() and self._tx_worker.exception():
157-
raise self._tx_worker.exception()
158-
if self._fail_worker.done() and self._fail_worker.exception():
159-
raise self._fail_worker.exception()
160+
raise Hardfail from self._rx_worker.exception()
161+
elif self._tx_worker.done() and self._tx_worker.exception():
162+
raise Hardfail from self._tx_worker.exception()
163+
elif self._fail_worker.done() and self._fail_worker.exception():
164+
raise Hardfail from self._fail_worker.exception()

0 commit comments

Comments
 (0)