Skip to content

Commit 32646f7

Browse files
authored
Subscribe Unsubscribe message handlers (#7)
1 parent 6634b95 commit 32646f7

13 files changed

+266
-160
lines changed
Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,13 @@
11
import pytest
22

33
from app.testing import MockedWebSocketServerProtocol
4-
from storage import SubscriptionStorage
54

65

76
@pytest.fixture
8-
def storage():
9-
return SubscriptionStorage()
7+
def ws():
8+
return MockedWebSocketServerProtocol()
109

1110

1211
@pytest.fixture
13-
def ws():
12+
def ya_ws():
1413
return MockedWebSocketServerProtocol()

src/conftest.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,11 @@
22

33
from app.conf import Settings
44

5+
pytest_plugins = [
6+
"app.fixtures",
7+
"storage.fixtures",
8+
]
9+
510

611
@pytest.fixture(autouse=True)
712
def settings(mocker):

src/handlers/__init__.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
1-
from handlers.websocket_auth_message_handler import WebsocketAuthMessageHandler
1+
from handlers.message_handler import WebSocketMessageHandler
22

33
__all__ = [
4-
"WebsocketAuthMessageHandler",
4+
"WebSocketMessageHandler",
55
]

src/handlers/message_handler.py

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
from dataclasses import dataclass
2+
from typing import Coroutine, Any, Callable
3+
4+
from websockets import WebSocketServerProtocol
5+
6+
from a12n.jwk_client import AsyncJWKClient
7+
from a12n.jwk_client import AsyncJWKClientException
8+
from app import conf
9+
from handlers.dto import AuthMessage
10+
from handlers.dto import SubscribeMessage
11+
from handlers.dto import UnsubscribeMessage
12+
from handlers.dto import SuccessResponseMessage
13+
from handlers.exceptions import WebsocketMessageException
14+
from storage.exceptions import StorageOperationException
15+
from storage.storage_updaters import StorageWebSocketRegister
16+
from storage import SubscriptionStorage
17+
from storage.storage_updaters import StorageUserSubscriber
18+
from storage.storage_updaters import StorageUserUnsubscriber
19+
from handlers.dto import IncomingMessage
20+
21+
AsyncMessageHandler = Callable[[WebSocketServerProtocol, Any], Coroutine[Any, Any, SuccessResponseMessage]]
22+
23+
24+
@dataclass
25+
class WebSocketMessageHandler:
26+
storage: SubscriptionStorage
27+
28+
def __post_init__(self) -> None:
29+
settings = conf.get_app_settings()
30+
self.jwk_client = AsyncJWKClient(jwks_url=settings.AUTH_JWKS_URL, supported_signing_algorithms=settings.AUTH_SUPPORTED_SIGNING_ALGORITHMS)
31+
32+
self.message_handlers: dict[str, AsyncMessageHandler] = {
33+
"Authenticate": self.handle_auth_message,
34+
"Subscribe": self.handle_subscribe_message,
35+
"Unsubscribe": self.handle_unsubscribe_message,
36+
}
37+
38+
async def handle_message(self, websocket: WebSocketServerProtocol, message: IncomingMessage) -> SuccessResponseMessage:
39+
return await self.message_handlers[message.message_type](websocket, message)
40+
41+
async def handle_auth_message(self, websocket: WebSocketServerProtocol, message: AuthMessage) -> SuccessResponseMessage:
42+
try:
43+
validated_token = await self.jwk_client.decode(message.params.token)
44+
StorageWebSocketRegister(storage=self.storage, websocket=websocket, validated_token=validated_token)()
45+
except (AsyncJWKClientException, StorageOperationException) as exc:
46+
raise WebsocketMessageException(str(exc), message)
47+
48+
return SuccessResponseMessage.model_construct(incoming_message=message)
49+
50+
async def handle_subscribe_message(self, websocket: WebSocketServerProtocol, message: SubscribeMessage) -> SuccessResponseMessage:
51+
StorageUserSubscriber(storage=self.storage, websocket=websocket, event=message.params.event)()
52+
return SuccessResponseMessage.model_construct(incoming_message=message)
53+
54+
async def handle_unsubscribe_message(self, websocket: WebSocketServerProtocol, message: UnsubscribeMessage) -> SuccessResponseMessage:
55+
StorageUserUnsubscriber(storage=self.storage, websocket=websocket, event=message.params.event)()
56+
return SuccessResponseMessage.model_construct(incoming_message=message)
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
import pytest
2+
3+
from handlers import WebSocketMessageHandler
4+
from handlers.dto import AuthMessage, SubscribeMessage, UnsubscribeMessage
5+
6+
7+
@pytest.fixture(autouse=True)
8+
def settings(settings):
9+
settings.AUTH_JWKS_URL = "https://auth.clowns.com/auth/realms/clowns-realm/protocol/openid-connect/certs"
10+
settings.AUTH_SUPPORTED_SIGNING_ALGORITHMS = ["RS256"]
11+
return settings
12+
13+
14+
@pytest.fixture
15+
def force_token_on_validation(mocker, valid_token):
16+
return mocker.patch("a12n.jwk_client.AsyncJWKClient.decode", return_value=valid_token)
17+
18+
19+
@pytest.fixture
20+
def message_handler(storage):
21+
return WebSocketMessageHandler(storage=storage)
22+
23+
24+
@pytest.fixture
25+
def auth_message():
26+
return AuthMessage(message_id=23, message_type="Authenticate", params={"token": "some-valid-token-value"})
27+
28+
29+
@pytest.fixture
30+
def subscribe_message():
31+
return SubscribeMessage(message_id=24, message_type="Subscribe", params={"event": "channel1"})
32+
33+
34+
@pytest.fixture
35+
def unsubscribe_message():
36+
return UnsubscribeMessage(message_id=25, message_type="Unsubscribe", params={"event": "channel1"})
Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
import pytest
2+
3+
from a12n.jwk_client import AsyncJWKClientException
4+
from app.types import DecodedValidToken
5+
from handlers.dto import SuccessResponseMessage
6+
from handlers.exceptions import WebsocketMessageException
7+
from handlers import WebSocketMessageHandler
8+
from storage.storage_updaters import StorageWebSocketRegister
9+
10+
pytestmark = [
11+
pytest.mark.usefixtures("force_token_on_validation"),
12+
]
13+
14+
15+
@pytest.fixture
16+
def ya_user_decoded_valid_token():
17+
return DecodedValidToken(sub="ya_user", exp="4852128170")
18+
19+
20+
@pytest.fixture
21+
def auth_handler(message_handler: WebSocketMessageHandler, ws):
22+
return lambda auth_message: message_handler.handle_auth_message(ws, auth_message)
23+
24+
25+
async def test_auth_handler_response_on_correct_auth_message(auth_handler, auth_message):
26+
auth_response = await auth_handler(auth_message)
27+
28+
assert isinstance(auth_response, SuccessResponseMessage)
29+
assert auth_response.message_type == "SuccessResponse"
30+
assert auth_response.incoming_message == auth_message
31+
32+
33+
async def test_auth_handler_register_websocket_in_storage(auth_handler, ws, auth_message, mocker, storage, valid_token):
34+
spy_websocket_register = mocker.spy(StorageWebSocketRegister, "__call__")
35+
36+
await auth_handler(auth_message)
37+
38+
assert storage.is_websocket_registered(ws) is True
39+
spy_websocket_register.assert_called_once()
40+
called_service = spy_websocket_register.call_args.args[0]
41+
assert called_service.storage == storage
42+
assert called_service.websocket == ws
43+
assert called_service.validated_token == valid_token
44+
45+
46+
async def test_auth_handler_raise_if_user_send_token_for_different_user(auth_handler, auth_message, storage, ws, register_ws, ya_user_decoded_valid_token):
47+
register_ws(ws, ya_user_decoded_valid_token)
48+
49+
with pytest.raises(WebsocketMessageException) as exc_info:
50+
await auth_handler(auth_message) # send valid user1 token while connection registered with ya_user
51+
52+
raised_exception = exc_info.value
53+
assert raised_exception.error_detail == "The user has different public id"
54+
assert raised_exception.incoming_message == auth_message
55+
assert storage.is_websocket_registered(ws) is True, "The existed connection should not be touched"
56+
57+
58+
async def test_auth_handler_raise_if_user_try_to_auth_with_expired_token(auth_handler, ws, auth_message, force_token_on_validation, storage):
59+
force_token_on_validation.side_effect = AsyncJWKClientException("The token is expired")
60+
61+
with pytest.raises(WebsocketMessageException) as exc_info:
62+
await auth_handler(auth_message)
63+
64+
raised_exception = exc_info.value
65+
assert raised_exception.error_detail == "The token is expired"
66+
assert raised_exception.incoming_message == auth_message
67+
assert storage.is_websocket_registered(ws) is False, "The ws should not be added to registered websockets"
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
import pytest
2+
from handlers.message_handler import WebSocketMessageHandler
3+
4+
5+
@pytest.fixture
6+
def get_message_handler(storage):
7+
return lambda: WebSocketMessageHandler(storage)
8+
9+
10+
def test_message_handler_jwk_client_settings(message_handler):
11+
assert message_handler.jwk_client.jwks_url == "https://auth.clowns.com/auth/realms/clowns-realm/protocol/openid-connect/certs"
12+
assert message_handler.jwk_client.supported_signing_algorithms == ["RS256"]
13+
14+
15+
@pytest.mark.usefixtures("force_token_on_validation")
16+
async def test_message_handler_call_auth_handler_on_auth_message(get_message_handler, auth_message, mocker, ws):
17+
spy_auth_handler = mocker.spy(WebSocketMessageHandler, "handle_auth_message")
18+
19+
await get_message_handler().handle_message(ws, auth_message)
20+
21+
spy_auth_handler.assert_awaited_once()
22+
23+
24+
async def test_message_handler_call_subscribe_handler_on_subscribe_message(get_message_handler, subscribe_message, mocker, ws_registered):
25+
spy_subscribe_handler = mocker.spy(WebSocketMessageHandler, "handle_subscribe_message")
26+
27+
await get_message_handler().handle_message(ws_registered, subscribe_message)
28+
29+
spy_subscribe_handler.assert_awaited_once()
30+
31+
32+
async def test_message_handler_call_unsubscribe_handler_on_unsubscribe_message(get_message_handler, unsubscribe_message, mocker, ws_subscribed):
33+
spy_unsubscribe_handler = mocker.spy(WebSocketMessageHandler, "handle_unsubscribe_message")
34+
35+
await get_message_handler().handle_message(ws_subscribed, unsubscribe_message)
36+
37+
spy_unsubscribe_handler.assert_awaited_once()
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
import pytest
2+
from handlers.message_handler import WebSocketMessageHandler
3+
from storage.storage_updaters import StorageUserSubscriber
4+
5+
6+
@pytest.fixture
7+
def subscribe_handler(message_handler: WebSocketMessageHandler, ws_registered):
8+
return lambda subscribe_message: message_handler.handle_subscribe_message(ws_registered, subscribe_message)
9+
10+
11+
async def test_subscribe_handler_return_success_response(subscribe_handler, subscribe_message):
12+
subscribe_response = await subscribe_handler(subscribe_message)
13+
14+
assert subscribe_response.message_type == "SuccessResponse"
15+
assert subscribe_response.incoming_message == subscribe_message
16+
17+
18+
async def test_subscribe_handler_call_storage_subscriber_under_the_hood(subscribe_handler, subscribe_message, mocker, storage, ws_registered):
19+
spy_storage_subscriber = mocker.spy(StorageUserSubscriber, "__call__")
20+
21+
await subscribe_handler(subscribe_message)
22+
23+
spy_storage_subscriber.assert_called_once()
24+
called_service = spy_storage_subscriber.call_args.args[0]
25+
assert called_service.storage == storage
26+
assert called_service.websocket == ws_registered
27+
assert called_service.event == subscribe_message.params.event
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
import pytest
2+
from handlers.message_handler import WebSocketMessageHandler
3+
from storage.storage_updaters import StorageUserUnsubscriber
4+
5+
6+
@pytest.fixture
7+
def unsubscribe_handler(message_handler: WebSocketMessageHandler, ws_subscribed):
8+
return lambda unsubscribe_message: message_handler.handle_unsubscribe_message(ws_subscribed, unsubscribe_message)
9+
10+
11+
async def test_unsubscribe_handler_return_success_response(unsubscribe_handler, unsubscribe_message):
12+
unsubscribe_response = await unsubscribe_handler(unsubscribe_message)
13+
14+
assert unsubscribe_response.message_type == "SuccessResponse"
15+
assert unsubscribe_response.incoming_message == unsubscribe_message
16+
17+
18+
async def test_unsubscribe_handler_call_storage_unsubscriber_under_the_hood(unsubscribe_handler, unsubscribe_message, mocker, storage, ws_subscribed):
19+
spy_storage_unsubscriber = mocker.spy(StorageUserUnsubscriber, "__call__")
20+
21+
await unsubscribe_handler(unsubscribe_message)
22+
23+
spy_storage_unsubscriber.assert_called_once()
24+
called_service = spy_storage_unsubscriber.call_args.args[0]
25+
assert called_service.storage == storage
26+
assert called_service.websocket == ws_subscribed
27+
assert called_service.event == unsubscribe_message.params.event

src/handlers/tests/tests_auth_message_handler.py

Lines changed: 0 additions & 104 deletions
This file was deleted.

0 commit comments

Comments
 (0)