Skip to content

Dev #120

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Feb 15, 2025
Merged

Dev #120

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions CHANGELOG.rst
Original file line number Diff line number Diff line change
@@ -1,6 +1,13 @@
Changelog
=========

1.13.0 (2025-02-15)
------------------

- initialize dispatcher middleware stack in constructor.
- generic context type added.


1.12.2 (2025-01-15)
------------------

Expand Down
2 changes: 1 addition & 1 deletion pjrpc/__about__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
__description__ = 'Extensible JSON-RPC library'
__url__ = 'https://github.com/dapper91/pjrpc'

__version__ = '1.12.2'
__version__ = '1.13.0'

__author__ = 'Dmitry Pershin'
__email__ = '[email protected]'
Expand Down
74 changes: 39 additions & 35 deletions pjrpc/client/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,15 +40,15 @@ def wrapped(*args: Any, **kwargs: Any) -> 'BaseBatch.BaseProxy':
return wrapped

@abc.abstractmethod
def __call__(self, _trace_ctx: SimpleNamespace = SimpleNamespace()) -> Union[Awaitable[Any], Any]:
def __call__(self, _trace_ctx: Optional[SimpleNamespace] = None) -> Union[Awaitable[Any], Any]:
"""
Makes an RPC call.

:param _trace_ctx: tracers request context
"""

@abc.abstractmethod
def call(self, _trace_ctx: SimpleNamespace = SimpleNamespace()) -> Union[Awaitable[Any], Any]:
def call(self, _trace_ctx: Optional[SimpleNamespace] = None) -> Union[Awaitable[Any], Any]:
"""
Makes an RPC call.

Expand Down Expand Up @@ -97,7 +97,7 @@ def __call__(self, method: str, *args: Any, **kwargs: Any) -> 'BaseBatch':
return self.add(method, *args, **kwargs)

@abc.abstractmethod
def call(self, _trace_ctx: SimpleNamespace = SimpleNamespace()) -> Union[Awaitable[Any], Any]:
def call(self, _trace_ctx: Optional[SimpleNamespace] = None) -> Union[Awaitable[Any], Any]:
"""
Makes a JSON-RPC request.

Expand All @@ -107,7 +107,7 @@ def call(self, _trace_ctx: SimpleNamespace = SimpleNamespace()) -> Union[Awaitab

@abc.abstractmethod
def send(
self, request: BatchRequest, _trace_ctx: SimpleNamespace = SimpleNamespace(), **kwargs: Any,
self, request: BatchRequest, _trace_ctx: Optional[SimpleNamespace] = None, **kwargs: Any,
) -> Union[Awaitable[Optional[BatchResponse]], Optional[BatchResponse]]:
"""
Sends a JSON-RPC batch request.
Expand Down Expand Up @@ -183,10 +183,10 @@ class Proxy(BaseBatch.BaseProxy):
def __init__(self, batch: 'Batch'):
super().__init__(batch)

def __call__(self, _trace_ctx: SimpleNamespace = SimpleNamespace()) -> Any:
def __call__(self, _trace_ctx: Optional[SimpleNamespace] = None) -> Any:
return self.call(_trace_ctx)

def call(self, _trace_ctx: SimpleNamespace = SimpleNamespace()) -> Any:
def call(self, _trace_ctx: Optional[SimpleNamespace] = None) -> Any:
return self._batch.call(_trace_ctx)

@property
Expand All @@ -196,13 +196,13 @@ def proxy(self) -> 'Proxy':
def __init__(self, client: 'AbstractClient'):
super().__init__(client)

def call(self, _trace_ctx: SimpleNamespace = SimpleNamespace()) -> Optional[Any]:
response = self.send(self._requests)
def call(self, _trace_ctx: Optional[SimpleNamespace] = None) -> Optional[Any]:
response = self.send(self._requests, _trace_ctx=_trace_ctx)

return response.result if response is not None else None

def send(
self, request: BatchRequest, _trace_ctx: SimpleNamespace = SimpleNamespace(), **kwargs: Any,
self, request: BatchRequest, _trace_ctx: Optional[SimpleNamespace] = None, **kwargs: Any,
) -> Optional[BatchResponse]:
return cast(
Optional[BatchResponse], self._client._send(
Expand All @@ -225,10 +225,10 @@ class Proxy(BaseBatch.BaseProxy):
def __init__(self, batch: 'AsyncBatch'):
super().__init__(batch)

async def __call__(self, _trace_ctx: SimpleNamespace = SimpleNamespace()) -> Any:
async def __call__(self, _trace_ctx: Optional[SimpleNamespace] = None) -> Any:
return await self.call(_trace_ctx)

async def call(self, _trace_ctx: SimpleNamespace = SimpleNamespace()) -> Any:
async def call(self, _trace_ctx: Optional[SimpleNamespace] = None) -> Any:
return await self._batch.call(_trace_ctx)

@property
Expand All @@ -238,13 +238,13 @@ def proxy(self) -> 'Proxy':
def __init__(self, client: 'AbstractAsyncClient'):
super().__init__(client)

async def call(self, _trace_ctx: SimpleNamespace = SimpleNamespace()) -> Optional[Any]:
async def call(self, _trace_ctx: Optional[SimpleNamespace] = None) -> Optional[Any]:
response = await self.send(self._requests, _trace_ctx=_trace_ctx)

return response.result if response is not None else None

async def send(
self, request: BatchRequest, _trace_ctx: SimpleNamespace = SimpleNamespace(), **kwargs: Any,
self, request: BatchRequest, _trace_ctx: Optional[SimpleNamespace] = None, **kwargs: Any,
) -> Optional[BatchResponse]:
return await cast(
Awaitable[Optional[BatchResponse]], self._client._send(
Expand Down Expand Up @@ -304,7 +304,7 @@ def __init__(
json_decoder: Optional[json.JSONDecoder] = None,
strict: bool = True,
request_args: Optional[Dict[str, Any]] = None,
tracers: Iterable[Tracer] = (),
tracers: Iterable[Tracer[Any]] = (),
retry_strategy: Optional[retry.RetryStrategy] = None,
):
self.request_class = request_class
Expand All @@ -326,7 +326,7 @@ def __call__(
self,
method: str,
*args: Any,
_trace_ctx: SimpleNamespace = SimpleNamespace(),
_trace_ctx: Optional[SimpleNamespace] = None,
**kwargs: Any,
) -> Union[Awaitable[Any], Any]:
"""
Expand All @@ -339,7 +339,7 @@ def __call__(
:returns: response result
"""

return self.call(method, *args, **kwargs)
return self.call(method, *args, _trace_ctx=_trace_ctx, **kwargs)

@property
def proxy(self) -> 'Proxy':
Expand All @@ -359,7 +359,7 @@ def _send(
request: AbstractRequest,
response_class: Type[AbstractResponse],
validator: Callable[..., None],
_trace_ctx: SimpleNamespace = SimpleNamespace(),
_trace_ctx: Optional[SimpleNamespace] = None,
**kwargs: Any,
) -> Union[Awaitable[Optional[AbstractResponse]], Optional[AbstractResponse]]:
pass
Expand Down Expand Up @@ -407,7 +407,7 @@ def notify(
self,
method: str,
*args: Any,
_trace_ctx: SimpleNamespace = SimpleNamespace(),
_trace_ctx: Optional[SimpleNamespace] = None,
**kwargs: Any,
) -> Optional[Response]:
"""
Expand All @@ -429,7 +429,7 @@ def notify(
return self.send(request, _trace_ctx=_trace_ctx)

def call(
self, method: str, *args: Any, _trace_ctx: SimpleNamespace = SimpleNamespace(), **kwargs: Any,
self, method: str, *args: Any, _trace_ctx: Optional[SimpleNamespace] = None, **kwargs: Any,
) -> Any:
"""
Makes JSON-RPC call.
Expand All @@ -456,7 +456,7 @@ def call(
def send(
self,
request: Request,
_trace_ctx: SimpleNamespace = SimpleNamespace(),
_trace_ctx: Optional[SimpleNamespace] = None,
_retry_strategy: MaybeSet[retry.RetryStrategy] = UNSET,
**kwargs: Any,
) -> Optional[Response]:
Expand Down Expand Up @@ -486,25 +486,27 @@ def traced(method: Callable[..., Any]) -> Callable[..., Any]:
def wrapper(
self: 'AbstractClient',
request: AbstractRequest,
_trace_ctx: SimpleNamespace,
_trace_ctx: Optional[SimpleNamespace] = None,
**kwargs: Any,
) -> Optional[AbstractResponse]:
"""
Adds tracing logic to the method.
"""

trace_ctx = _trace_ctx or SimpleNamespace()

for tracer in self._tracers:
tracer.on_request_begin(_trace_ctx, request)
tracer.on_request_begin(trace_ctx, request)

try:
response = method(self, request, _trace_ctx=_trace_ctx, **kwargs)
response = method(self, request, _trace_ctx=trace_ctx, **kwargs)
except BaseException as e:
for tracer in self._tracers:
tracer.on_error(_trace_ctx, request, e)
tracer.on_error(trace_ctx, request, e)
raise

for tracer in self._tracers:
tracer.on_request_end(_trace_ctx, request, response)
tracer.on_request_end(trace_ctx, request, response)

return response

Expand Down Expand Up @@ -541,7 +543,7 @@ def _send(
request: AbstractRequest,
response_class: Type[AbstractResponse],
validator: Callable[..., None],
_trace_ctx: SimpleNamespace = SimpleNamespace(),
_trace_ctx: Optional[SimpleNamespace] = None,
**kwargs: Any,
) -> Optional[AbstractResponse]:
kwargs = {**self._request_args, **kwargs}
Expand Down Expand Up @@ -588,7 +590,7 @@ async def _request(self, request_text: str, is_notification: bool = False, **kwa
async def send(
self,
request: Request,
_trace_ctx: SimpleNamespace = SimpleNamespace(),
_trace_ctx: Optional[SimpleNamespace] = None,
_retry_strategy: MaybeSet[retry.RetryStrategy] = UNSET,
**kwargs: Any,
) -> Optional[Response]:
Expand Down Expand Up @@ -618,25 +620,27 @@ def traced(method: Callable[..., Any]) -> Callable[..., Any]:
async def wrapper(
self: 'AbstractAsyncClient',
request: Request,
_trace_ctx: SimpleNamespace = SimpleNamespace(),
_trace_ctx: Optional[SimpleNamespace] = None,
**kwargs: Any,
) -> Response:
"""
Adds tracing logic to the method.
"""

trace_ctx = _trace_ctx or SimpleNamespace()

for tracer in self._tracers:
tracer.on_request_begin(_trace_ctx, request)
tracer.on_request_begin(trace_ctx, request)

try:
response = await method(self, request, _trace_ctx=_trace_ctx, **kwargs)
response = await method(self, request, _trace_ctx=trace_ctx, **kwargs)
except BaseException as e:
for tracer in self._tracers:
tracer.on_error(_trace_ctx, request, e)
tracer.on_error(trace_ctx, request, e)
raise

for tracer in self._tracers:
tracer.on_request_end(_trace_ctx, request, response)
tracer.on_request_end(trace_ctx, request, response)

return response

Expand Down Expand Up @@ -673,7 +677,7 @@ async def _send(
request: AbstractRequest,
response_class: Type[AbstractResponse],
validator: Callable[..., None],
_trace_ctx: SimpleNamespace = SimpleNamespace(),
_trace_ctx: Optional[SimpleNamespace] = None,
**kwargs: Any,
) -> Optional[AbstractResponse]:
kwargs = {**self._request_args, **kwargs}
Expand All @@ -697,7 +701,7 @@ async def notify(
self,
method: str,
*args: Any,
_trace_ctx: SimpleNamespace = SimpleNamespace(),
_trace_ctx: Optional[SimpleNamespace] = None,
**kwargs: Any,
) -> Optional[Response]:
"""
Expand All @@ -719,7 +723,7 @@ async def notify(
return await self.send(request, _trace_ctx=_trace_ctx)

async def call(
self, method: str, *args: Any, _trace_ctx: SimpleNamespace = SimpleNamespace(), **kwargs: Any,
self, method: str, *args: Any, _trace_ctx: Optional[SimpleNamespace] = None, **kwargs: Any,
) -> Any:
"""
Makes JSON-RPC call.
Expand Down
21 changes: 11 additions & 10 deletions pjrpc/client/tracer.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,19 @@
import logging
from types import SimpleNamespace
from typing import Optional
from typing import Generic, Optional, TypeVar

from pjrpc import AbstractRequest, AbstractResponse

client_logger = logging.getLogger(__package__)

ContextType = TypeVar('ContextType')

class Tracer:

class Tracer(Generic[ContextType]):
"""
JSON-RPC client tracer.
"""

def on_request_begin(self, trace_context: SimpleNamespace, request: AbstractRequest) -> None:
def on_request_begin(self, trace_context: ContextType, request: AbstractRequest) -> None:
"""
Handler called before JSON-RPC request begins.

Expand All @@ -21,7 +22,7 @@ def on_request_begin(self, trace_context: SimpleNamespace, request: AbstractRequ
"""

def on_request_end(
self, trace_context: SimpleNamespace, request: AbstractRequest, response: Optional[AbstractResponse],
self, trace_context: ContextType, request: AbstractRequest, response: Optional[AbstractResponse],
) -> None:
"""
Handler called after JSON-RPC request ends.
Expand All @@ -32,7 +33,7 @@ def on_request_end(
"""

def on_error(
self, trace_context: SimpleNamespace, request: AbstractRequest, error: BaseException,
self, trace_context: ContextType, request: AbstractRequest, error: BaseException,
) -> None:
"""
Handler called when JSON-RPC request failed.
Expand All @@ -43,7 +44,7 @@ def on_error(
"""


class LoggingTracer(Tracer):
class LoggingTracer(Tracer[ContextType], Generic[ContextType]):
"""
JSON-RPC client logging tracer.
"""
Expand All @@ -52,15 +53,15 @@ def __init__(self, logger: logging.Logger = client_logger, level: int = logging.
self._logger = logger
self._level = level

def on_request_begin(self, trace_context: SimpleNamespace, request: AbstractRequest) -> None:
def on_request_begin(self, trace_context: ContextType, request: AbstractRequest) -> None:
self._logger.log(self._level, "sending request: %r", request)

def on_request_end(
self, trace_context: SimpleNamespace, request: AbstractRequest, response: Optional[AbstractResponse],
self, trace_context: ContextType, request: AbstractRequest, response: Optional[AbstractResponse],
) -> None:
self._logger.log(self._level, "received response: %r", response)

def on_error(
self, trace_context: SimpleNamespace, request: AbstractRequest, error: BaseException,
self, trace_context: ContextType, request: AbstractRequest, error: BaseException,
) -> None:
self._logger.log(self._level, "request '%s' sending error: %r", request, error)
Loading