-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy paththrottling_middleware.py
93 lines (75 loc) · 2.9 KB
/
throttling_middleware.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
import asyncio
from aiogram import types, Dispatcher
from aiogram.dispatcher import DEFAULT_RATE_LIMIT
from aiogram.dispatcher.handler import current_handler, CancelHandler
from aiogram.dispatcher.middlewares import BaseMiddleware
from aiogram.utils.exceptions import Throttled
def rate_limit(limit: int, key=None):
"""
Decorator for configuring rate limit and key in different functions.
:param limit:
:param key:
:return:
"""
def decorator(func):
setattr(func, 'throttling_rate_limit', limit)
if key:
setattr(func, 'throttling_key', key)
return func
return decorator
class ThrottlingMiddleware(BaseMiddleware):
"""
Simple middleware
"""
def __init__(self, limit=DEFAULT_RATE_LIMIT, key_prefix='antiflood_'):
self.rate_limit = limit
self.prefix = key_prefix
super(ThrottlingMiddleware, self).__init__()
async def on_process_message(self, message: types.Message, data: dict):
"""
This handler is called when dispatcher receives a message
:param message:
"""
# Get current handler
handler = current_handler.get()
# Get dispatcher from context
dispatcher = Dispatcher.get_current()
# If handler was configured, get rate limit and key from handler
if handler:
limit = getattr(handler, 'throttling_rate_limit', self.rate_limit)
key = getattr(handler, 'throttling_key', f"{self.prefix}_{handler.__name__}")
else:
limit = self.rate_limit
key = f"{self.prefix}_message"
# Use Dispatcher.throttle method.
try:
await dispatcher.throttle(key, rate=limit)
except Throttled as t:
# Execute action
await self.message_throttled(message, t)
# Cancel current handler
raise CancelHandler()
async def message_throttled(self, message: types.Message, throttled: Throttled):
"""
Notify user only on first exceed and notify about unlocking only on last exceed
:param message:
:param throttled:
"""
handler = current_handler.get()
dispatcher = Dispatcher.get_current()
if handler:
key = getattr(handler, 'throttling_key', f"{self.prefix}_{handler.__name__}")
else:
key = f"{self.prefix}_message"
# Calculate how much time is left till the block ends
delta = throttled.rate - throttled.delta
# Prevent flooding
if throttled.exceeded_count <= 2:
await message.reply('Too many requests!')
# Sleep.
await asyncio.sleep(delta)
# Check lock status
thr = await dispatcher.check_key(key)
# If current message is not last with current key - do not send message
if thr.exceeded_count == throttled.exceeded_count:
await message.reply('Unlocked.')