I'm following the local rate limit filter code to write my own custom filter.
The logic is as follows: when the service is rate limited to x requests/sec and the actual input rate is higher than x, instead of responding with "request {} is being rate limited", this filter buffers all incoming requests and keeps forwarding them to the service at rate x.
Implementation: the code skeleton is similar to the local rate limit filter, and a queue is maintained in the LocalRateLimiterImpl object.
Every time a request comes in, it is paused in the OnMessageDecode() function and its callbacks are pushed into the queue.
The LocalRateLimiterImpl has a periodic timer that calls TSQueue.pop(), which checks a timeout condition and resumes the requests via callbacks->continueDecoding(). The queue implementation is as follows (a simplified sketch of how it is wired into the filter and the timer comes after the queue code):
// Requires <chrono>, <mutex>, <queue>, plus the Envoy headers that provide
// Logger and DecoderFilterCallbacks.
#include <chrono>
#include <mutex>
#include <queue>

struct QueueElement {
  DecoderFilterCallbacks* callbacks;
  std::chrono::time_point<std::chrono::system_clock> timeout;
};

class TSQueue : public Logger::Loggable<Logger::Id::filter> {
private:
  // Underlying queue.
  std::queue<QueueElement> queue_;
  // Mutex for thread synchronization.
  mutable std::mutex mutex_;
  std::chrono::time_point<std::chrono::system_clock> last_timeout_;
  std::chrono::microseconds delay_;

public:
  TSQueue(std::chrono::microseconds delay) : delay_(delay) {}

  // Pushes an element onto the queue; timeouts are spaced `delay_` apart so
  // that queued requests are resumed at the target rate.
  void push(DecoderFilterCallbacks* callbacks) {
    std::lock_guard<std::mutex> lock(mutex_);
    if (queue_.empty()) {
      last_timeout_ = std::chrono::system_clock::now();
    } else {
      last_timeout_ += delay_;
    }
    queue_.push(QueueElement{callbacks, last_timeout_});
  }

  // Pops every element whose timeout has passed and resumes its request.
  void pop() {
    std::lock_guard<std::mutex> lock(mutex_);
    while (!queue_.empty()) {
      QueueElement ele = queue_.front();
      if (ele.timeout < std::chrono::system_clock::now()) {
        ele.callbacks->continueDecoding();
        queue_.pop();
      } else {
        break;
      }
    }
  }
};
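Here is the simplified wiring sketch mentioned above, showing how the queue is hooked into the decode path and the periodic timer. BufferingFilter, rate_limiter_, callbacks_, onFillTimer(), the constructor signature, and the FilterStatus return type are illustrative assumptions rather than my exact code; Event::Dispatcher::createTimer() and Timer::enableTimer() are the Envoy Event API calls I rely on for the periodic timer.

```cpp
// Sketch only; class declarations trimmed to the members used here.
// (Code lives inside namespace Envoy, as in the rest of the filter sources.)
#include <chrono>

#include "envoy/event/dispatcher.h"
#include "envoy/event/timer.h"

class LocalRateLimiterImpl {
public:
  LocalRateLimiterImpl(Event::Dispatcher& dispatcher, std::chrono::milliseconds fill_interval)
      : queue_(std::chrono::duration_cast<std::chrono::microseconds>(fill_interval)),
        fill_interval_(fill_interval),
        fill_timer_(dispatcher.createTimer([this] { onFillTimer(); })) {
    fill_timer_->enableTimer(fill_interval_);
  }

  TSQueue& queue() { return queue_; }

private:
  void onFillTimer() {
    queue_.pop();                             // resume requests whose timeout has passed
    fill_timer_->enableTimer(fill_interval_); // re-arm the periodic timer
  }

  TSQueue queue_;
  std::chrono::milliseconds fill_interval_;
  Event::TimerPtr fill_timer_;
};

// Decode path of the filter: pause the request and hand its callbacks to the
// shared queue. callbacks_ was captured when the filter was created.
FilterStatus BufferingFilter::onMessageDecode() {
  rate_limiter_->queue().push(callbacks_);
  return FilterStatus::StopIteration; // pause until TSQueue::pop() resumes it
}
```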
However, when I compiled and ran it, it successfully pauses the first request, but when it tries to resume that request there is a thread-safety error after onMessageEncoding is called. It is probably buggy to just call callbacks->continueDecoding() from a shared FilterConfig.
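One direction I'm considering, though I'm not sure it's correct: since the timer fires on the thread that owns LocalRateLimiterImpl while each stream is owned by a worker thread, continueDecoding() should probably be posted back onto the dispatcher of the worker that owns the request instead of being called directly from pop(). The extra dispatcher field in QueueElement below is an assumption; how to obtain it depends on the filter type (Http::StreamDecoderFilterCallbacks exposes dispatcher(); otherwise the dispatcher could be captured when the request is decoded).

```cpp
#include <chrono>
#include <mutex>

#include "envoy/event/dispatcher.h"

struct QueueElement {
  DecoderFilterCallbacks* callbacks;
  Event::Dispatcher* dispatcher; // worker dispatcher captured at decode time (assumed)
  std::chrono::time_point<std::chrono::system_clock> timeout;
};

void TSQueue::pop() {
  std::lock_guard<std::mutex> lock(mutex_);
  while (!queue_.empty()) {
    QueueElement ele = queue_.front();
    if (ele.timeout < std::chrono::system_clock::now()) {
      // Resume the stream on the thread that owns it. The filter also has to
      // make sure the stream is still alive when the posted callback runs,
      // e.g. by removing/invalidating its queue entry in onDestroy().
      DecoderFilterCallbacks* callbacks = ele.callbacks;
      ele.dispatcher->post([callbacks] { callbacks->continueDecoding(); });
      queue_.pop();
    } else {
      break;
    }
  }
}
```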
Any suggestions to fix this? Thanks!