Skip to content
This repository was archived by the owner on May 1, 2024. It is now read-only.

Back Pressure should be applied per Listener #562

Open
jeremydmiller opened this issue Nov 5, 2019 · 6 comments
Open

Back Pressure should be applied per Listener #562

jeremydmiller opened this issue Nov 5, 2019 · 6 comments

Comments

@jeremydmiller
Copy link
Member

For right now, I think the answer is to delete all the back pressure that's there now, and start over. Hmm. Really going to be "shut down the listener" if the local queue is backed up.

@jeremydmiller
Copy link
Member Author

This is going to be more work than I originally thought.

  • Delete the current back pressure agent
  • RecoverIncomingMessages needs to be smart enough to understand which listeners are currently latched

@jeremydmiller
Copy link
Member Author

This is what BackPressureAgent was:

    /// <summary>
    ///     Background service that, once per <c>BackPressurePollingInterval</c>, compares the
    ///     number of locally queued messages against <c>MaximumLocalEnqueuedBackPressureThreshold</c>
    ///     and toggles the root's <c>ListeningStatus</c> between Accepting and TooBusy.
    /// </summary>
    public class BackPressureAgent : BackgroundService
    {
        private readonly IMessagingRoot _root;

        /// <summary>
        ///     Creates the agent against the messaging root whose options, worker queue
        ///     count, and listening status it reads and updates.
        /// </summary>
        /// <param name="root">The messaging root to monitor</param>
        public BackPressureAgent(IMessagingRoot root)
        {
            _root = root;
        }

        /// <summary>
        ///     Polling loop: waits one polling interval, then applies back pressure, repeating
        ///     until the stopping token is signalled.
        /// </summary>
        /// <param name="stoppingToken">Signals that the service should stop polling</param>
        protected override async Task ExecuteAsync(CancellationToken stoppingToken)
        {
            while (!stoppingToken.IsCancellationRequested)
            {
                // The delay sits outside the guarded call, so a cancellation raised while
                // waiting is not logged as an error.
                await Task.Delay(_root.Options.BackPressurePollingInterval, stoppingToken);
                applyBackPressureAndLogFailures();
            }
        }

        // Runs a single back pressure check; any exception is logged so that the polling
        // loop keeps running.
        private void applyBackPressureAndLogFailures()
        {
            try
            {
                ApplyBackPressure();
            }
            catch (Exception e)
            {
                _root.Logger.LogException(e);
            }
        }

        /// <summary>
        ///     Performs one back pressure evaluation. An Accepting root becomes TooBusy when
        ///     the queued count is strictly above the threshold; a TooBusy root returns to
        ///     Accepting only when the queued count is strictly below 80% of the threshold.
        /// </summary>
        public void ApplyBackPressure()
        {
            var queued = _root.Workers.QueuedCount;
            var threshold = (double) _root.Options.MaximumLocalEnqueuedBackPressureThreshold;
            var ratio = queued / threshold;

            var current = _root.ListeningStatus;

            if (current == ListeningStatus.Accepting)
            {
                if (ratio > 1.0)
                {
                    _root.ListeningStatus = ListeningStatus.TooBusy;
                }

                return;
            }

            // The gap between 0.8 and 1.0 keeps the status from flapping around the threshold.
            if (current == ListeningStatus.TooBusy && ratio < 0.8)
            {
                _root.ListeningStatus = ListeningStatus.Accepting;
            }
        }
    }

@jeremydmiller
Copy link
Member Author

And these settings should now go to ListenerSettings:


        /// <summary>
        ///     Used to govern the incoming and outgoing message recovery process by slowing down
        ///     the recovery process when the local worker queues have this many enqueued
        ///     messages. Default is 10000
        /// </summary>
        public int MaximumLocalEnqueuedBackPressureThreshold { get; set; } = 10000;

        /// <summary>
        ///     Polling interval for applying back pressure checking. Default is 2 seconds
        /// </summary>
        public TimeSpan BackPressurePollingInterval { get; set; } = 2.Seconds();


@jeremydmiller
Copy link
Member Author

Might want to tap in directly to an IWorkerQueue for the retries to do backpressure

@jeremydmiller
Copy link
Member Author

The tests for back pressure agent:

using Jasper.Messaging;
using Jasper.Messaging.Transports;
using NSubstitute;
using Shouldly;
using Xunit;

namespace Jasper.Testing.Messaging
{
    /// <summary>
    ///     Exercises <c>BackPressureAgent.ApplyBackPressure()</c> against a mock messaging root
    ///     for each combination of starting status and queued message count.
    /// </summary>
    public class BackPressureAgentTests
    {
        private readonly MockMessagingRoot theRoot;
        private readonly BackPressureAgent theAgent;

        public BackPressureAgentTests()
        {
            theRoot = new MockMessagingRoot();
            theAgent = new BackPressureAgent(theRoot);
        }

        // The threshold currently configured on the mock root's options.
        private int theThreshold => theRoot.Options.MaximumLocalEnqueuedBackPressureThreshold;

        // Stubs the queued count, sets the starting status, runs one back pressure
        // evaluation, and hands back the status the root ends up with.
        private ListeningStatus statusAfterApplying(ListeningStatus startingStatus, int queuedCount)
        {
            theRoot.Workers.QueuedCount.Returns(queuedCount);
            theRoot.ListeningStatus = startingStatus;

            theAgent.ApplyBackPressure();

            return theRoot.ListeningStatus;
        }

        [Fact]
        public void status_is_accepting_and_above_the_threshold()
        {
            var status = statusAfterApplying(ListeningStatus.Accepting, theThreshold + 5);

            status.ShouldBe(ListeningStatus.TooBusy);
        }

        [Fact]
        public void status_is_accepting_and_at_the_threshold()
        {
            // Exactly at the threshold is not "above" it, so the status is unchanged
            var status = statusAfterApplying(ListeningStatus.Accepting, theThreshold);

            status.ShouldBe(ListeningStatus.Accepting);
        }

        [Fact]
        public void status_is_accepting_and_below_the_threshold()
        {
            var status = statusAfterApplying(ListeningStatus.Accepting, theThreshold - 5);

            status.ShouldBe(ListeningStatus.Accepting);
        }

        [Fact]
        public void status_is_too_busy_and_above_the_threshold()
        {
            var status = statusAfterApplying(ListeningStatus.TooBusy, theThreshold + 5);

            status.ShouldBe(ListeningStatus.TooBusy);
        }

        [Fact]
        public void status_is_too_busy_and_below_10_percent_of_the_threshold()
        {
            theRoot.Options.MaximumLocalEnqueuedBackPressureThreshold = 2000;

            // 1599 of 2000 is just under the 0.8 ratio
            var status = statusAfterApplying(ListeningStatus.TooBusy, 1599);

            status.ShouldBe(ListeningStatus.Accepting);
        }

        [Fact]
        public void status_is_too_busy_and_within_10_percent_of_the_threshold()
        {
            theRoot.Options.MaximumLocalEnqueuedBackPressureThreshold = 2000;

            // 1801 of 2000 is above the 0.8 ratio
            var status = statusAfterApplying(ListeningStatus.TooBusy, 1801);

            status.ShouldBe(ListeningStatus.TooBusy);
        }
    }
}

@jeremydmiller jeremydmiller added this to the The big 1.0 milestone Nov 14, 2019
@jeremydmiller
Copy link
Member Author

Pushing this back to 1.1. Try to do it in conjunction with the circuit breaker features

@jeremydmiller jeremydmiller modified the milestones: The big 1.0, 1.1 Dec 7, 2019
@jeremydmiller jeremydmiller removed this from the 1.2 milestone Jul 7, 2021
Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Labels
None yet
Projects
None yet
Development

No branches or pull requests

1 participant